feat: add federation processed activities table and update dependencies
- Created a new SQL migration to add the `federation_processed_activities` table with an index on `processed_at`. - Updated dependencies in `Cargo.toml` files across `bootstrap` and `worker` crates, including version updates for `k-ap`. - Enhanced the event publishing mechanism in the `factory.rs` file to include a new `KapPublisher` for handling federation events. - Refactored the `build` function in `factory.rs` to accommodate the new event publisher and improve ActivityPub service initialization. - Modified the worker's main loop to handle new federation event types and improved error handling for event processing. Co-authored-by: Copilot <copilot@github.com>
This commit is contained in:
@@ -8,23 +8,25 @@ name = "thoughts-worker"
|
||||
path = "src/main.rs"
|
||||
|
||||
[dependencies]
|
||||
domain = { workspace = true }
|
||||
application = { workspace = true }
|
||||
nats = { workspace = true }
|
||||
event-transport = { workspace = true }
|
||||
event-payload = { workspace = true }
|
||||
k-ap = { git = "https://git.gabrielkaszewski.dev/GKaszewski/k-ap.git", tag = "v0.1.10" }
|
||||
activitypub = { workspace = true }
|
||||
postgres = { workspace = true }
|
||||
domain = { workspace = true }
|
||||
application = { workspace = true }
|
||||
nats = { workspace = true }
|
||||
event-transport = { workspace = true }
|
||||
event-payload = { workspace = true }
|
||||
k-ap = { version = "0.3.0", registry = "gitea" }
|
||||
activitypub = { workspace = true }
|
||||
postgres = { workspace = true }
|
||||
postgres-federation = { workspace = true }
|
||||
async-nats = { workspace = true }
|
||||
tokio = { workspace = true, features = ["full"] }
|
||||
futures = { workspace = true }
|
||||
tracing = { workspace = true }
|
||||
tracing-subscriber = { workspace = true }
|
||||
dotenvy = { workspace = true }
|
||||
serde_json = { workspace = true }
|
||||
sqlx = { workspace = true }
|
||||
async-nats = { workspace = true }
|
||||
tokio = { workspace = true, features = ["full"] }
|
||||
futures = { workspace = true }
|
||||
tracing = { workspace = true }
|
||||
tracing-subscriber = { workspace = true }
|
||||
dotenvy = { workspace = true }
|
||||
serde_json = { workspace = true }
|
||||
sqlx = { workspace = true }
|
||||
url = { workspace = true }
|
||||
uuid = { workspace = true }
|
||||
|
||||
[dev-dependencies]
|
||||
domain = { workspace = true, features = ["test-helpers"] }
|
||||
|
||||
@@ -23,10 +23,11 @@ pub struct WorkerHandlers {
|
||||
|
||||
pub struct WorkerInfra {
|
||||
pub pool: PgPool,
|
||||
pub consumer: event_transport::EventConsumerAdapter<nats::NatsMessageSource>,
|
||||
pub message_source: nats::NatsMessageSource,
|
||||
pub handlers: WorkerHandlers,
|
||||
pub dlq_store: Arc<PgFailedEventStore>,
|
||||
pub event_publisher: Arc<dyn EventPublisher>,
|
||||
pub raw_ap_service: Arc<k_ap::ActivityPubService>,
|
||||
}
|
||||
|
||||
pub async fn build(database_url: &str, base_url: &str, nats_url: &str) -> WorkerInfra {
|
||||
@@ -43,28 +44,32 @@ pub async fn build(database_url: &str, base_url: &str, nats_url: &str) -> Worker
|
||||
|
||||
// ActivityPub service (for federation fan-out)
|
||||
let connections_repo_worker = Arc::new(PgRemoteActorConnectionRepository::new(pool.clone()));
|
||||
let fed_repo_worker = Arc::new(PostgresFederationRepository::new(pool.clone()));
|
||||
let ap_handler_worker = Arc::new(ThoughtsObjectHandler::new(
|
||||
Arc::new(PgActivityPubRepository::new(pool.clone())),
|
||||
base_url,
|
||||
None,
|
||||
Arc::new(postgres::tag::PgTagRepository::new(pool.clone())),
|
||||
));
|
||||
let raw_ap_service = Arc::new(
|
||||
ActivityPubService::builder(
|
||||
Arc::new(PostgresFederationRepository::new(pool.clone())),
|
||||
Arc::new(PostgresApUserRepository::new(
|
||||
ActivityPubService::builder(base_url.to_string())
|
||||
.activity_repo(fed_repo_worker.clone())
|
||||
.follow_repo(fed_repo_worker.clone())
|
||||
.actor_repo(fed_repo_worker.clone())
|
||||
.blocklist_repo(fed_repo_worker.clone())
|
||||
.user_repo(Arc::new(PostgresApUserRepository::new(
|
||||
pool.clone(),
|
||||
base_url.to_string(),
|
||||
)),
|
||||
Arc::new(ThoughtsObjectHandler::new(
|
||||
Arc::new(PgActivityPubRepository::new(pool.clone())),
|
||||
base_url,
|
||||
None,
|
||||
Arc::new(postgres::tag::PgTagRepository::new(pool.clone())),
|
||||
)),
|
||||
base_url,
|
||||
)
|
||||
.software_name("thoughts")
|
||||
.build()
|
||||
.await
|
||||
.expect("ActivityPubService build failed"),
|
||||
)))
|
||||
.content_reader(ap_handler_worker.clone())
|
||||
.object_handler(ap_handler_worker)
|
||||
.software_name("thoughts")
|
||||
.build()
|
||||
.await
|
||||
.expect("ActivityPubService build failed"),
|
||||
);
|
||||
let ap_service = Arc::new(ApFederationAdapter::new(
|
||||
raw_ap_service,
|
||||
raw_ap_service.clone(),
|
||||
connections_repo_worker,
|
||||
));
|
||||
let ap_outbound = ap_service.clone() as Arc<dyn OutboundFederationPort>;
|
||||
@@ -110,18 +115,17 @@ pub async fn build(database_url: &str, base_url: &str, nats_url: &str) -> Worker
|
||||
nats::ensure_stream(&nats_client)
|
||||
.await
|
||||
.expect("JetStream stream setup failed");
|
||||
let consumer = event_transport::EventConsumerAdapter::new(nats::NatsMessageSource::new(
|
||||
nats_client.clone(),
|
||||
));
|
||||
let message_source = nats::NatsMessageSource::new(nats_client.clone());
|
||||
let event_publisher: Arc<dyn EventPublisher> = Arc::new(
|
||||
event_transport::EventPublisherAdapter::new(nats::NatsTransport::new(nats_client)),
|
||||
);
|
||||
|
||||
WorkerInfra {
|
||||
pool,
|
||||
consumer,
|
||||
message_source,
|
||||
handlers,
|
||||
dlq_store,
|
||||
event_publisher,
|
||||
raw_ap_service,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -3,9 +3,13 @@ mod factory;
|
||||
mod handlers;
|
||||
mod outbox_relay;
|
||||
|
||||
use domain::ports::EventConsumer;
|
||||
use domain::{errors::DomainError, events::DomainEvent};
|
||||
use event_payload::EventPayload;
|
||||
use event_transport::MessageSource;
|
||||
use futures::StreamExt;
|
||||
use nats::CONSUMER_MAX_DELIVER;
|
||||
use url::Url;
|
||||
use uuid::Uuid;
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() {
|
||||
@@ -21,13 +25,11 @@ async fn main() {
|
||||
tracing::info!("Building worker...");
|
||||
let infra = factory::build(&database_url, &base_url, &nats_url).await;
|
||||
|
||||
// Spawn DLQ processor as a background task.
|
||||
tokio::spawn(dlq::run_dlq_processor(
|
||||
infra.dlq_store.clone(),
|
||||
infra.event_publisher.clone(),
|
||||
));
|
||||
|
||||
// Spawn outbox relay — polls DB for undelivered events and publishes them.
|
||||
tokio::spawn(
|
||||
outbox_relay::OutboxRelay {
|
||||
pool: infra.pool.clone(),
|
||||
@@ -38,71 +40,123 @@ async fn main() {
|
||||
);
|
||||
|
||||
tracing::info!("Worker started, consuming events...");
|
||||
let mut stream = infra.consumer.consume();
|
||||
let mut stream = infra.message_source.messages();
|
||||
while let Some(result) = stream.next().await {
|
||||
match result {
|
||||
Ok(envelope) => {
|
||||
let event = &envelope.event;
|
||||
let event_type = event_payload::EventPayload::from(event).subject();
|
||||
tracing::info!(
|
||||
event_type,
|
||||
delivery = envelope.delivery_count,
|
||||
"received event"
|
||||
);
|
||||
|
||||
let n = infra.handlers.notification.handle(event).await;
|
||||
let f = infra.handlers.federation.handle(event).await;
|
||||
let fm = infra.handlers.federation_management.handle(event).await;
|
||||
|
||||
if n.is_ok() && f.is_ok() && fm.is_ok() {
|
||||
(envelope.ack)();
|
||||
tracing::info!(event_type, "event handled ok");
|
||||
} else {
|
||||
if let Err(e) = &n {
|
||||
tracing::error!("notification handler: {e}");
|
||||
}
|
||||
if let Err(e) = &f {
|
||||
tracing::error!("federation handler: {e}");
|
||||
}
|
||||
if let Err(e) = &fm {
|
||||
tracing::error!("federation management handler: {e}");
|
||||
Err(e) => tracing::error!("consumer error: {e}"),
|
||||
Ok(raw) => {
|
||||
let payload = match serde_json::from_slice::<EventPayload>(&raw.payload) {
|
||||
Ok(p) => p,
|
||||
Err(e) => {
|
||||
tracing::warn!("failed to deserialize event payload — acking: {e}");
|
||||
(raw.ack)();
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
// Last delivery attempt -> move to DLQ then ack.
|
||||
// Earlier attempts -> nack so NATS retries.
|
||||
if envelope.delivery_count >= CONSUMER_MAX_DELIVER as u64 {
|
||||
let error_msg = n
|
||||
.err()
|
||||
.or(f.err())
|
||||
.or(fm.err())
|
||||
.map(|e| e.to_string())
|
||||
.unwrap_or_else(|| "unknown error".into());
|
||||
let event_type = payload.subject();
|
||||
tracing::info!(event_type, delivery = raw.delivery_count, "received event");
|
||||
|
||||
// Serialize event back to payload for storage.
|
||||
let ep = event_payload::EventPayload::from(event);
|
||||
let event_type = ep.subject().to_string();
|
||||
let payload = serde_json::to_value(&ep).unwrap_or(serde_json::Value::Null);
|
||||
|
||||
if let Err(e) = infra
|
||||
.dlq_store
|
||||
.insert(&event_type, &payload, &error_msg)
|
||||
.await
|
||||
{
|
||||
tracing::error!("DLQ insert failed: {e} — message lost");
|
||||
} else {
|
||||
tracing::warn!(
|
||||
event_type,
|
||||
delivery_count = envelope.delivery_count,
|
||||
"event exhausted — moved to DLQ"
|
||||
);
|
||||
let outcome: Result<(), DomainError> = match payload {
|
||||
// ── k-ap federation events ────────────────────────────
|
||||
EventPayload::FederationDeliveryRequested {
|
||||
inbox,
|
||||
activity,
|
||||
signing_actor_id,
|
||||
} => {
|
||||
let result = async {
|
||||
let inbox_url = Url::parse(&inbox)
|
||||
.map_err(|e| DomainError::Internal(e.to_string()))?;
|
||||
let actor_id = Uuid::parse_str(&signing_actor_id)
|
||||
.map_err(|e| DomainError::Internal(e.to_string()))?;
|
||||
infra
|
||||
.raw_ap_service
|
||||
.deliver_to_inbox(inbox_url, activity, actor_id)
|
||||
.await
|
||||
.map_err(|e| DomainError::Internal(e.to_string()))
|
||||
}
|
||||
.await;
|
||||
result
|
||||
}
|
||||
EventPayload::FederationBackfillRequested {
|
||||
owner_user_id,
|
||||
follower_inbox_url,
|
||||
} => {
|
||||
let result = async {
|
||||
let owner_id = Uuid::parse_str(&owner_user_id)
|
||||
.map_err(|e| DomainError::Internal(e.to_string()))?;
|
||||
infra
|
||||
.raw_ap_service
|
||||
.run_backfill_for_follower(owner_id, follower_inbox_url)
|
||||
.await
|
||||
.map_err(|e| DomainError::Internal(e.to_string()))
|
||||
}
|
||||
.await;
|
||||
result
|
||||
}
|
||||
|
||||
// ── domain events ──────────────────────────────────────
|
||||
p => match DomainEvent::try_from(p) {
|
||||
Err(e) => {
|
||||
tracing::warn!("unknown event type — acking: {e}");
|
||||
(raw.ack)();
|
||||
continue;
|
||||
}
|
||||
Ok(event) => {
|
||||
let n = infra.handlers.notification.handle(&event).await;
|
||||
let f = infra.handlers.federation.handle(&event).await;
|
||||
let fm = infra.handlers.federation_management.handle(&event).await;
|
||||
match (n, f, fm) {
|
||||
(Ok(()), Ok(()), Ok(())) => Ok(()),
|
||||
(n, f, fm) => {
|
||||
if let Err(e) = &n {
|
||||
tracing::error!("notification handler: {e}");
|
||||
}
|
||||
if let Err(e) = &f {
|
||||
tracing::error!("federation handler: {e}");
|
||||
}
|
||||
if let Err(e) = &fm {
|
||||
tracing::error!("federation management handler: {e}");
|
||||
}
|
||||
Err(n.err().or(f.err()).or(fm.err()).unwrap())
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
};
|
||||
|
||||
match outcome {
|
||||
Ok(()) => {
|
||||
(raw.ack)();
|
||||
tracing::info!(event_type, "event handled ok");
|
||||
}
|
||||
Err(e) => {
|
||||
if raw.delivery_count >= CONSUMER_MAX_DELIVER as u64 {
|
||||
// Rebuild payload from raw bytes for DLQ storage.
|
||||
let payload_val = serde_json::from_slice::<serde_json::Value>(
|
||||
&raw.payload,
|
||||
)
|
||||
.unwrap_or(serde_json::Value::Null);
|
||||
if let Err(dlq_err) = infra
|
||||
.dlq_store
|
||||
.insert(event_type, &payload_val, &e.to_string())
|
||||
.await
|
||||
{
|
||||
tracing::error!("DLQ insert failed: {dlq_err} — message lost");
|
||||
} else {
|
||||
tracing::warn!(
|
||||
event_type,
|
||||
delivery_count = raw.delivery_count,
|
||||
"event exhausted — moved to DLQ"
|
||||
);
|
||||
}
|
||||
(raw.ack)();
|
||||
} else {
|
||||
(raw.nack)();
|
||||
}
|
||||
(envelope.ack)(); // ack from NATS — DLQ owns it now
|
||||
} else {
|
||||
(envelope.nack)();
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(e) => tracing::error!("consumer error: {e}"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user