refactor: 5 architectural improvements (Tasks 2-5 + Task 6 fix)
Some checks failed
lint / lint (push) Has been cancelled
test / unit (push) Has been cancelled
test / integration (push) Has been cancelled
lint / lint (pull_request) Failing after 5m2s
test / unit (pull_request) Successful in 16m19s
test / integration (pull_request) Failing after 17m15s

- feat(domain): Hashtag value object with canonical extract() — unifies two
  divergent private implementations; fields pre-compute raw/normalized/url_slug/ap_name

- feat(presentation): Deps<S: FromAppState> extractor — each handler now
  declares its exact dependency surface; AppState unchanged; handlers
  become unit-testable without mocking all 20 deps

- refactor(feed): replace 5 flat FeedRepository methods with FeedQuery/FeedScope
  — single query() method; SQL shared logic lives once; adding feed types
  no longer requires 5 edits

- refactor(activitypub): ActivityPubRepository + OutboundFederationPort moved
  out of domain::ports into activitypub-base::ap_ports — domain crate no
  longer knows about AP IDs, inboxes, or actor URLs

- fix(outbox): OutboxRelay now opens a per-row transaction so FOR UPDATE
  SKIP LOCKED actually holds the lock during publish + mark_delivered
This commit is contained in:
2026-05-15 18:54:20 +02:00
parent 6024a65060
commit 0592861edd
37 changed files with 1401 additions and 865 deletions

View File

@@ -5,7 +5,8 @@ use std::sync::Arc;
use activitypub::ThoughtsObjectHandler;
use activitypub_base::ActivityPubService;
use application::services::{FederationEventService, NotificationEventService};
use domain::ports::{ActivityPubRepository, EventPublisher, OutboundFederationPort};
use activitypub_base::{ActivityPubRepository, OutboundFederationPort};
use domain::ports::EventPublisher;
use postgres::activitypub::PgActivityPubRepository;
use postgres_federation::{PostgresApUserRepository, PostgresFederationRepository};

View File

@@ -27,25 +27,47 @@ impl OutboxRelay {
}
}
// NOTE: thoughts.save() and outbox.append() are not in the same DB transaction
// (known architectural limitation — fixing requires transaction-sharing between
// repositories, a larger refactor).
async fn process_batch(&self) -> Result<(), sqlx::Error> {
let rows = sqlx::query_as::<_, OutboxRow>(
"SELECT seq, event_type, payload \
FROM outbox_events \
WHERE delivered = false \
ORDER BY seq ASC \
LIMIT 100 \
FOR UPDATE SKIP LOCKED",
)
.fetch_all(&self.pool)
.await?;
// Process one row at a time inside its own transaction so that
// FOR UPDATE SKIP LOCKED actually holds the lock for the duration
// of publish + mark_delivered. A batch SELECT without a surrounding
// transaction releases locks immediately after autocommit.
loop {
let mut tx = self.pool.begin().await?;
let row = sqlx::query_as::<_, OutboxRow>(
"SELECT seq, event_type, payload \
FROM outbox_events \
WHERE delivered = false \
ORDER BY seq ASC \
LIMIT 1 \
FOR UPDATE SKIP LOCKED",
)
.fetch_optional(&mut *tx)
.await?;
let Some(row) = row else {
tx.rollback().await?;
break;
};
for row in rows {
let payload: EventPayload = match serde_json::from_value(row.payload.clone()) {
Ok(p) => p,
Err(e) => {
tracing::error!(seq = row.seq, event_type = row.event_type, "outbox: failed to deserialize payload: {e}");
// Mark delivered to avoid blocking; investigate manually.
self.mark_delivered(row.seq).await?;
sqlx::query(
"UPDATE outbox_events \
SET delivered = true, delivered_at = now() \
WHERE seq = $1",
)
.bind(row.seq)
.execute(&mut *tx)
.await?;
tx.commit().await?;
continue;
}
};
@@ -54,35 +76,39 @@ impl OutboxRelay {
Ok(ev) => ev,
Err(e) => {
tracing::error!(seq = row.seq, "outbox: failed to convert to DomainEvent: {e}");
self.mark_delivered(row.seq).await?;
sqlx::query(
"UPDATE outbox_events \
SET delivered = true, delivered_at = now() \
WHERE seq = $1",
)
.bind(row.seq)
.execute(&mut *tx)
.await?;
tx.commit().await?;
continue;
}
};
match self.publisher.publish(&domain_event).await {
Ok(()) => {
self.mark_delivered(row.seq).await?;
sqlx::query(
"UPDATE outbox_events \
SET delivered = true, delivered_at = now() \
WHERE seq = $1",
)
.bind(row.seq)
.execute(&mut *tx)
.await?;
tx.commit().await?;
tracing::debug!(seq = row.seq, event_type = row.event_type, "outbox: delivered");
}
Err(e) => {
tracing::warn!(seq = row.seq, "outbox: publish failed (will retry): {e}");
// Leave delivered=false — will be retried next poll.
tx.rollback().await?; // row stays undelivered, retried next poll
}
}
}
Ok(())
}
async fn mark_delivered(&self, seq: i64) -> Result<(), sqlx::Error> {
sqlx::query(
"UPDATE outbox_events \
SET delivered = true, delivered_at = now() \
WHERE seq = $1",
)
.bind(seq)
.execute(&self.pool)
.await?;
Ok(())
}
}