Some checks failed
lint / lint (push) Has been cancelled
test / unit (push) Has been cancelled
test / integration (push) Has been cancelled
lint / lint (pull_request) Failing after 5m2s
test / unit (pull_request) Successful in 16m19s
test / integration (pull_request) Failing after 17m15s
- feat(domain): Hashtag value object with canonical extract() — unifies two divergent private implementations; fields pre-compute raw/normalized/url_slug/ap_name - feat(presentation): Deps<S: FromAppState> extractor — each handler now declares its exact dependency surface; AppState unchanged; handlers become unit-testable without mocking all 20 deps - refactor(feed): replace 5 flat FeedRepository methods with FeedQuery/FeedScope — single query() method; SQL shared logic lives once; adding feed types no longer requires 5 edits - refactor(activitypub): ActivityPubRepository + OutboundFederationPort moved out of domain::ports into activitypub-base::ap_ports — domain crate no longer knows about AP IDs, inboxes, or actor URLs - fix(outbox): OutboxRelay now opens a per-row transaction so FOR UPDATE SKIP LOCKED actually holds the lock during publish + mark_delivered
115 lines
4.0 KiB
Rust
115 lines
4.0 KiB
Rust
use domain::{events::DomainEvent, ports::EventPublisher};
|
|
use event_payload::EventPayload;
|
|
use sqlx::PgPool;
|
|
use std::sync::Arc;
|
|
use std::time::Duration;
|
|
|
|
pub struct OutboxRelay {
|
|
pub pool: PgPool,
|
|
pub publisher: Arc<dyn EventPublisher>,
|
|
pub poll_interval: Duration,
|
|
}
|
|
|
|
#[derive(sqlx::FromRow)]
|
|
struct OutboxRow {
|
|
seq: i64,
|
|
event_type: String,
|
|
payload: serde_json::Value,
|
|
}
|
|
|
|
impl OutboxRelay {
|
|
pub async fn run(self) {
|
|
loop {
|
|
if let Err(e) = self.process_batch().await {
|
|
tracing::error!("outbox relay error: {e}");
|
|
}
|
|
tokio::time::sleep(self.poll_interval).await;
|
|
}
|
|
}
|
|
|
|
// NOTE: thoughts.save() and outbox.append() are not in the same DB transaction
|
|
// (known architectural limitation — fixing requires transaction-sharing between
|
|
// repositories, a larger refactor).
|
|
async fn process_batch(&self) -> Result<(), sqlx::Error> {
|
|
// Process one row at a time inside its own transaction so that
|
|
// FOR UPDATE SKIP LOCKED actually holds the lock for the duration
|
|
// of publish + mark_delivered. A batch SELECT without a surrounding
|
|
// transaction releases locks immediately after autocommit.
|
|
loop {
|
|
let mut tx = self.pool.begin().await?;
|
|
|
|
let row = sqlx::query_as::<_, OutboxRow>(
|
|
"SELECT seq, event_type, payload \
|
|
FROM outbox_events \
|
|
WHERE delivered = false \
|
|
ORDER BY seq ASC \
|
|
LIMIT 1 \
|
|
FOR UPDATE SKIP LOCKED",
|
|
)
|
|
.fetch_optional(&mut *tx)
|
|
.await?;
|
|
|
|
let Some(row) = row else {
|
|
tx.rollback().await?;
|
|
break;
|
|
};
|
|
|
|
let payload: EventPayload = match serde_json::from_value(row.payload.clone()) {
|
|
Ok(p) => p,
|
|
Err(e) => {
|
|
tracing::error!(seq = row.seq, event_type = row.event_type, "outbox: failed to deserialize payload: {e}");
|
|
// Mark delivered to avoid blocking; investigate manually.
|
|
sqlx::query(
|
|
"UPDATE outbox_events \
|
|
SET delivered = true, delivered_at = now() \
|
|
WHERE seq = $1",
|
|
)
|
|
.bind(row.seq)
|
|
.execute(&mut *tx)
|
|
.await?;
|
|
tx.commit().await?;
|
|
continue;
|
|
}
|
|
};
|
|
|
|
let domain_event = match DomainEvent::try_from(payload) {
|
|
Ok(ev) => ev,
|
|
Err(e) => {
|
|
tracing::error!(seq = row.seq, "outbox: failed to convert to DomainEvent: {e}");
|
|
sqlx::query(
|
|
"UPDATE outbox_events \
|
|
SET delivered = true, delivered_at = now() \
|
|
WHERE seq = $1",
|
|
)
|
|
.bind(row.seq)
|
|
.execute(&mut *tx)
|
|
.await?;
|
|
tx.commit().await?;
|
|
continue;
|
|
}
|
|
};
|
|
|
|
match self.publisher.publish(&domain_event).await {
|
|
Ok(()) => {
|
|
sqlx::query(
|
|
"UPDATE outbox_events \
|
|
SET delivered = true, delivered_at = now() \
|
|
WHERE seq = $1",
|
|
)
|
|
.bind(row.seq)
|
|
.execute(&mut *tx)
|
|
.await?;
|
|
tx.commit().await?;
|
|
tracing::debug!(seq = row.seq, event_type = row.event_type, "outbox: delivered");
|
|
}
|
|
Err(e) => {
|
|
tracing::warn!(seq = row.seq, "outbox: publish failed (will retry): {e}");
|
|
tx.rollback().await?; // row stays undelivered, retried next poll
|
|
}
|
|
}
|
|
}
|
|
|
|
Ok(())
|
|
}
|
|
}
|