feat: v2 rewrite — hexagonal arch, ActivityPub federation, NATS, deployment-ready (#1)
This commit was merged in pull request #1.
This commit is contained in:
30
crates/worker/Cargo.toml
Normal file
30
crates/worker/Cargo.toml
Normal file
@@ -0,0 +1,30 @@
|
||||
[package]
|
||||
name = "worker"
|
||||
version = "0.1.0"
|
||||
edition = "2021"
|
||||
|
||||
[[bin]]
|
||||
name = "thoughts-worker"
|
||||
path = "src/main.rs"
|
||||
|
||||
[dependencies]
|
||||
domain = { workspace = true }
|
||||
application = { workspace = true }
|
||||
nats = { workspace = true }
|
||||
event-transport = { workspace = true }
|
||||
event-payload = { workspace = true }
|
||||
activitypub-base = { workspace = true }
|
||||
activitypub = { workspace = true }
|
||||
postgres = { workspace = true }
|
||||
postgres-federation = { workspace = true }
|
||||
async-nats = { workspace = true }
|
||||
tokio = { workspace = true, features = ["full"] }
|
||||
futures = { workspace = true }
|
||||
tracing = { workspace = true }
|
||||
tracing-subscriber = { workspace = true }
|
||||
dotenvy = { workspace = true }
|
||||
serde_json = { workspace = true }
|
||||
sqlx = { workspace = true }
|
||||
|
||||
[dev-dependencies]
|
||||
domain = { workspace = true, features = ["test-helpers"] }
|
||||
64
crates/worker/src/dlq.rs
Normal file
64
crates/worker/src/dlq.rs
Normal file
@@ -0,0 +1,64 @@
|
||||
use domain::{errors::DomainError, events::DomainEvent, ports::EventPublisher};
|
||||
use postgres::failed_event::{PgFailedEventStore, DLQ_MAX_RETRIES, DLQ_POLL_INTERVAL_SECS};
|
||||
use std::sync::Arc;
|
||||
|
||||
/// Background task: polls `failed_events` and republishes due rows to the event bus.
|
||||
pub async fn run_dlq_processor(store: Arc<PgFailedEventStore>, publisher: Arc<dyn EventPublisher>) {
|
||||
let interval = std::time::Duration::from_secs(DLQ_POLL_INTERVAL_SECS);
|
||||
loop {
|
||||
tokio::time::sleep(interval).await;
|
||||
if let Err(e) = process_due(&store, &*publisher).await {
|
||||
tracing::error!("DLQ processor error: {e}");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn process_due(
|
||||
store: &PgFailedEventStore,
|
||||
publisher: &dyn EventPublisher,
|
||||
) -> Result<(), sqlx::Error> {
|
||||
let due = store.poll_due().await?;
|
||||
if due.is_empty() {
|
||||
return Ok(());
|
||||
}
|
||||
tracing::info!(count = due.len(), "DLQ: processing due events");
|
||||
|
||||
for row in due {
|
||||
if row.retry_count >= DLQ_MAX_RETRIES {
|
||||
tracing::error!(
|
||||
id = %row.id,
|
||||
event_type = %row.event_type,
|
||||
retry_count = row.retry_count,
|
||||
"DLQ: event permanently failed — parking",
|
||||
);
|
||||
store.park_permanently(row.id).await?;
|
||||
continue;
|
||||
}
|
||||
|
||||
let republish_result = republish(&row.payload, publisher).await;
|
||||
|
||||
match republish_result {
|
||||
Ok(()) => {
|
||||
tracing::info!(id = %row.id, "DLQ: republished successfully");
|
||||
store.advance(row.id, None).await?;
|
||||
}
|
||||
Err(e) => {
|
||||
tracing::warn!(id = %row.id, error = %e, "DLQ: republish failed");
|
||||
store.advance(row.id, Some(&e.to_string())).await?;
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn republish(
|
||||
payload: &serde_json::Value,
|
||||
publisher: &dyn EventPublisher,
|
||||
) -> Result<(), DomainError> {
|
||||
use event_payload::EventPayload;
|
||||
let ep: EventPayload = serde_json::from_value(payload.clone())
|
||||
.map_err(|e| DomainError::Internal(format!("DLQ deserialize: {e}")))?;
|
||||
let event = DomainEvent::try_from(ep)
|
||||
.map_err(|e| DomainError::Internal(format!("DLQ event conversion: {e}")))?;
|
||||
publisher.publish(&event).await
|
||||
}
|
||||
114
crates/worker/src/factory.rs
Normal file
114
crates/worker/src/factory.rs
Normal file
@@ -0,0 +1,114 @@
|
||||
use postgres::failed_event::PgFailedEventStore;
|
||||
use sqlx::PgPool;
|
||||
use std::sync::Arc;
|
||||
|
||||
use activitypub::ThoughtsObjectHandler;
|
||||
use activitypub_base::ActivityPubService;
|
||||
use application::services::{FederationEventService, NotificationEventService};
|
||||
use activitypub_base::{ActivityPubRepository, OutboundFederationPort};
|
||||
use domain::ports::EventPublisher;
|
||||
use postgres::activitypub::PgActivityPubRepository;
|
||||
use postgres_federation::{PostgresApUserRepository, PostgresFederationRepository};
|
||||
|
||||
use crate::handlers::{FederationHandler, NotificationHandler};
|
||||
|
||||
pub struct WorkerHandlers {
|
||||
pub notification: NotificationHandler,
|
||||
pub federation: FederationHandler,
|
||||
}
|
||||
|
||||
pub struct WorkerInfra {
|
||||
pub pool: PgPool,
|
||||
pub consumer: event_transport::EventConsumerAdapter<nats::NatsMessageSource>,
|
||||
pub handlers: WorkerHandlers,
|
||||
pub dlq_store: Arc<PgFailedEventStore>,
|
||||
pub event_publisher: Arc<dyn EventPublisher>,
|
||||
}
|
||||
|
||||
pub async fn build(database_url: &str, base_url: &str, nats_url: &str) -> WorkerInfra {
|
||||
let pool = PgPool::connect(database_url)
|
||||
.await
|
||||
.expect("DB connect failed");
|
||||
|
||||
// Repos
|
||||
let thoughts = Arc::new(postgres::thought::PgThoughtRepository::new(pool.clone()));
|
||||
let users = Arc::new(postgres::user::PgUserRepository::new(pool.clone()));
|
||||
let notifications = Arc::new(postgres::notification::PgNotificationRepository::new(
|
||||
pool.clone(),
|
||||
));
|
||||
|
||||
// ActivityPub service (for federation fan-out)
|
||||
let ap_service = Arc::new(
|
||||
ActivityPubService::new(
|
||||
Arc::new(PostgresFederationRepository::new(pool.clone())),
|
||||
Arc::new(PostgresApUserRepository::new(
|
||||
pool.clone(),
|
||||
base_url.to_string(),
|
||||
)),
|
||||
Arc::new(ThoughtsObjectHandler::new(
|
||||
Arc::new(PgActivityPubRepository::new(pool.clone())),
|
||||
base_url,
|
||||
None,
|
||||
Arc::new(postgres::tag::PgTagRepository::new(pool.clone())),
|
||||
)),
|
||||
base_url.to_string(),
|
||||
false,
|
||||
"thoughts".to_string(),
|
||||
false,
|
||||
None,
|
||||
)
|
||||
.await
|
||||
.expect("ActivityPubService build failed"),
|
||||
);
|
||||
let ap_outbound = ap_service.clone() as Arc<dyn OutboundFederationPort>;
|
||||
let ap_repo_worker =
|
||||
Arc::new(PgActivityPubRepository::new(pool.clone())) as Arc<dyn ActivityPubRepository>;
|
||||
|
||||
// Application services
|
||||
let notification_svc = Arc::new(NotificationEventService {
|
||||
thoughts: thoughts.clone(),
|
||||
notifications,
|
||||
});
|
||||
let federation_svc = Arc::new(FederationEventService {
|
||||
thoughts,
|
||||
users,
|
||||
ap: ap_outbound,
|
||||
base_url: base_url.to_string(),
|
||||
ap_repo: ap_repo_worker,
|
||||
});
|
||||
|
||||
// Thin handlers
|
||||
let handlers = WorkerHandlers {
|
||||
notification: NotificationHandler {
|
||||
service: notification_svc,
|
||||
},
|
||||
federation: FederationHandler {
|
||||
service: federation_svc,
|
||||
},
|
||||
};
|
||||
|
||||
// DLQ store
|
||||
let dlq_store = Arc::new(PgFailedEventStore::new(pool.clone()));
|
||||
|
||||
// NATS consumer + publisher
|
||||
let nats_client = async_nats::connect(nats_url)
|
||||
.await
|
||||
.expect("NATS connect failed");
|
||||
nats::ensure_stream(&nats_client)
|
||||
.await
|
||||
.expect("JetStream stream setup failed");
|
||||
let consumer = event_transport::EventConsumerAdapter::new(nats::NatsMessageSource::new(
|
||||
nats_client.clone(),
|
||||
));
|
||||
let event_publisher: Arc<dyn EventPublisher> = Arc::new(
|
||||
event_transport::EventPublisherAdapter::new(nats::NatsTransport::new(nats_client)),
|
||||
);
|
||||
|
||||
WorkerInfra {
|
||||
pool,
|
||||
consumer,
|
||||
handlers,
|
||||
dlq_store,
|
||||
event_publisher,
|
||||
}
|
||||
}
|
||||
23
crates/worker/src/handlers.rs
Normal file
23
crates/worker/src/handlers.rs
Normal file
@@ -0,0 +1,23 @@
|
||||
use application::services::{FederationEventService, NotificationEventService};
|
||||
use domain::{errors::DomainError, events::DomainEvent};
|
||||
use std::sync::Arc;
|
||||
|
||||
pub struct NotificationHandler {
|
||||
pub service: Arc<NotificationEventService>,
|
||||
}
|
||||
|
||||
impl NotificationHandler {
|
||||
pub async fn handle(&self, event: &DomainEvent) -> Result<(), DomainError> {
|
||||
self.service.process(event).await
|
||||
}
|
||||
}
|
||||
|
||||
pub struct FederationHandler {
|
||||
pub service: Arc<FederationEventService>,
|
||||
}
|
||||
|
||||
impl FederationHandler {
|
||||
pub async fn handle(&self, event: &DomainEvent) -> Result<(), DomainError> {
|
||||
self.service.process(event).await
|
||||
}
|
||||
}
|
||||
97
crates/worker/src/main.rs
Normal file
97
crates/worker/src/main.rs
Normal file
@@ -0,0 +1,97 @@
|
||||
mod dlq;
|
||||
mod factory;
|
||||
mod handlers;
|
||||
mod outbox_relay;
|
||||
|
||||
use domain::ports::EventConsumer;
|
||||
use futures::StreamExt;
|
||||
use nats::CONSUMER_MAX_DELIVER;
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() {
|
||||
dotenvy::dotenv().ok();
|
||||
tracing_subscriber::fmt()
|
||||
.with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
|
||||
.init();
|
||||
|
||||
let database_url = std::env::var("DATABASE_URL").expect("DATABASE_URL required");
|
||||
let nats_url = std::env::var("NATS_URL").unwrap_or_else(|_| "nats://localhost:4222".into());
|
||||
let base_url = std::env::var("BASE_URL").expect("BASE_URL required");
|
||||
|
||||
tracing::info!("Building worker...");
|
||||
let infra = factory::build(&database_url, &base_url, &nats_url).await;
|
||||
|
||||
// Spawn DLQ processor as a background task.
|
||||
tokio::spawn(dlq::run_dlq_processor(
|
||||
infra.dlq_store.clone(),
|
||||
infra.event_publisher.clone(),
|
||||
));
|
||||
|
||||
// Spawn outbox relay — polls DB for undelivered events and publishes them.
|
||||
tokio::spawn(
|
||||
outbox_relay::OutboxRelay {
|
||||
pool: infra.pool.clone(),
|
||||
publisher: infra.event_publisher.clone(),
|
||||
poll_interval: std::time::Duration::from_secs(5),
|
||||
}
|
||||
.run(),
|
||||
);
|
||||
|
||||
tracing::info!("Worker started, consuming events...");
|
||||
let mut stream = infra.consumer.consume();
|
||||
while let Some(result) = stream.next().await {
|
||||
match result {
|
||||
Ok(envelope) => {
|
||||
let event = &envelope.event;
|
||||
tracing::debug!(?event, "received event");
|
||||
|
||||
let n = infra.handlers.notification.handle(event).await;
|
||||
let f = infra.handlers.federation.handle(event).await;
|
||||
|
||||
if n.is_ok() && f.is_ok() {
|
||||
(envelope.ack)();
|
||||
} else {
|
||||
if let Err(e) = &n {
|
||||
tracing::error!("notification handler: {e}");
|
||||
}
|
||||
if let Err(e) = &f {
|
||||
tracing::error!("federation handler: {e}");
|
||||
}
|
||||
|
||||
// Last delivery attempt -> move to DLQ then ack.
|
||||
// Earlier attempts -> nack so NATS retries.
|
||||
if envelope.delivery_count >= CONSUMER_MAX_DELIVER as u64 {
|
||||
let error_msg = n
|
||||
.err()
|
||||
.or(f.err())
|
||||
.map(|e| e.to_string())
|
||||
.unwrap_or_else(|| "unknown error".into());
|
||||
|
||||
// Serialize event back to payload for storage.
|
||||
let ep = event_payload::EventPayload::from(event);
|
||||
let event_type = ep.subject().to_string();
|
||||
let payload = serde_json::to_value(&ep).unwrap_or(serde_json::Value::Null);
|
||||
|
||||
if let Err(e) = infra
|
||||
.dlq_store
|
||||
.insert(&event_type, &payload, &error_msg)
|
||||
.await
|
||||
{
|
||||
tracing::error!("DLQ insert failed: {e} — message lost");
|
||||
} else {
|
||||
tracing::warn!(
|
||||
event_type,
|
||||
delivery_count = envelope.delivery_count,
|
||||
"event exhausted — moved to DLQ"
|
||||
);
|
||||
}
|
||||
(envelope.ack)(); // ack from NATS — DLQ owns it now
|
||||
} else {
|
||||
(envelope.nack)();
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(e) => tracing::error!("consumer error: {e}"),
|
||||
}
|
||||
}
|
||||
}
|
||||
114
crates/worker/src/outbox_relay.rs
Normal file
114
crates/worker/src/outbox_relay.rs
Normal file
@@ -0,0 +1,114 @@
|
||||
use domain::{events::DomainEvent, ports::EventPublisher};
|
||||
use event_payload::EventPayload;
|
||||
use sqlx::PgPool;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
pub struct OutboxRelay {
|
||||
pub pool: PgPool,
|
||||
pub publisher: Arc<dyn EventPublisher>,
|
||||
pub poll_interval: Duration,
|
||||
}
|
||||
|
||||
#[derive(sqlx::FromRow)]
|
||||
struct OutboxRow {
|
||||
seq: i64,
|
||||
event_type: String,
|
||||
payload: serde_json::Value,
|
||||
}
|
||||
|
||||
impl OutboxRelay {
|
||||
pub async fn run(self) {
|
||||
loop {
|
||||
if let Err(e) = self.process_batch().await {
|
||||
tracing::error!("outbox relay error: {e}");
|
||||
}
|
||||
tokio::time::sleep(self.poll_interval).await;
|
||||
}
|
||||
}
|
||||
|
||||
// NOTE: thoughts.save() and outbox.append() are not in the same DB transaction
|
||||
// (known architectural limitation — fixing requires transaction-sharing between
|
||||
// repositories, a larger refactor).
|
||||
async fn process_batch(&self) -> Result<(), sqlx::Error> {
|
||||
// Process one row at a time inside its own transaction so that
|
||||
// FOR UPDATE SKIP LOCKED actually holds the lock for the duration
|
||||
// of publish + mark_delivered. A batch SELECT without a surrounding
|
||||
// transaction releases locks immediately after autocommit.
|
||||
loop {
|
||||
let mut tx = self.pool.begin().await?;
|
||||
|
||||
let row = sqlx::query_as::<_, OutboxRow>(
|
||||
"SELECT seq, event_type, payload \
|
||||
FROM outbox_events \
|
||||
WHERE delivered = false \
|
||||
ORDER BY seq ASC \
|
||||
LIMIT 1 \
|
||||
FOR UPDATE SKIP LOCKED",
|
||||
)
|
||||
.fetch_optional(&mut *tx)
|
||||
.await?;
|
||||
|
||||
let Some(row) = row else {
|
||||
tx.rollback().await?;
|
||||
break;
|
||||
};
|
||||
|
||||
let payload: EventPayload = match serde_json::from_value(row.payload.clone()) {
|
||||
Ok(p) => p,
|
||||
Err(e) => {
|
||||
tracing::error!(seq = row.seq, event_type = row.event_type, "outbox: failed to deserialize payload: {e}");
|
||||
// Mark delivered to avoid blocking; investigate manually.
|
||||
sqlx::query(
|
||||
"UPDATE outbox_events \
|
||||
SET delivered = true, delivered_at = now() \
|
||||
WHERE seq = $1",
|
||||
)
|
||||
.bind(row.seq)
|
||||
.execute(&mut *tx)
|
||||
.await?;
|
||||
tx.commit().await?;
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
let domain_event = match DomainEvent::try_from(payload) {
|
||||
Ok(ev) => ev,
|
||||
Err(e) => {
|
||||
tracing::error!(seq = row.seq, "outbox: failed to convert to DomainEvent: {e}");
|
||||
sqlx::query(
|
||||
"UPDATE outbox_events \
|
||||
SET delivered = true, delivered_at = now() \
|
||||
WHERE seq = $1",
|
||||
)
|
||||
.bind(row.seq)
|
||||
.execute(&mut *tx)
|
||||
.await?;
|
||||
tx.commit().await?;
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
match self.publisher.publish(&domain_event).await {
|
||||
Ok(()) => {
|
||||
sqlx::query(
|
||||
"UPDATE outbox_events \
|
||||
SET delivered = true, delivered_at = now() \
|
||||
WHERE seq = $1",
|
||||
)
|
||||
.bind(row.seq)
|
||||
.execute(&mut *tx)
|
||||
.await?;
|
||||
tx.commit().await?;
|
||||
tracing::debug!(seq = row.seq, event_type = row.event_type, "outbox: delivered");
|
||||
}
|
||||
Err(e) => {
|
||||
tracing::warn!(seq = row.seq, "outbox: publish failed (will retry): {e}");
|
||||
tx.rollback().await?; // row stays undelivered, retried next poll
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user