diff --git a/Cargo.lock b/Cargo.lock index 113eb85..bad9056 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3385,6 +3385,19 @@ dependencies = [ [[package]] name = "postgres-event-queue" version = "0.1.0" +dependencies = [ + "anyhow", + "async-trait", + "chrono", + "domain", + "futures", + "serde", + "serde_json", + "sqlx", + "tokio", + "tracing", + "uuid", +] [[package]] name = "postgres-federation" @@ -3451,11 +3464,13 @@ dependencies = [ "poster-fetcher", "poster-storage", "postgres", + "postgres-event-queue", "postgres-federation", "rss 0.1.0", "serde", "serde_json", "sqlite", + "sqlite-event-queue", "sqlite-federation", "sqlx", "template-askama", @@ -4512,6 +4527,19 @@ dependencies = [ [[package]] name = "sqlite-event-queue" version = "0.1.0" +dependencies = [ + "anyhow", + "async-trait", + "chrono", + "domain", + "futures", + "serde", + "serde_json", + "sqlx", + "tokio", + "tracing", + "uuid", +] [[package]] name = "sqlite-federation" @@ -6221,9 +6249,11 @@ dependencies = [ "poster-fetcher", "poster-storage", "postgres", + "postgres-event-queue", "serde", "serde_json", "sqlite", + "sqlite-event-queue", "sqlx", "thiserror 2.0.18", "tokio", diff --git a/crates/adapters/postgres-event-queue/Cargo.toml b/crates/adapters/postgres-event-queue/Cargo.toml index 5c34a2c..961236f 100644 --- a/crates/adapters/postgres-event-queue/Cargo.toml +++ b/crates/adapters/postgres-event-queue/Cargo.toml @@ -4,3 +4,14 @@ version = "0.1.0" edition = "2024" [dependencies] +sqlx = { version = "0.8.6", features = ["runtime-tokio-rustls", "postgres", "macros", "chrono", "uuid"] } +domain = { workspace = true } +anyhow = { workspace = true } +async-trait = { workspace = true } +serde = { workspace = true } +serde_json = { workspace = true } +tokio = { workspace = true } +futures = { workspace = true } +tracing = { workspace = true } +chrono = { workspace = true } +uuid = { workspace = true } diff --git a/crates/adapters/postgres-event-queue/migrations/0001_event_queue.sql b/crates/adapters/postgres-event-queue/migrations/0001_event_queue.sql new file mode 100644 index 0000000..33dc858 --- /dev/null +++ b/crates/adapters/postgres-event-queue/migrations/0001_event_queue.sql @@ -0,0 +1,13 @@ +CREATE TABLE IF NOT EXISTS event_queue ( + id BIGSERIAL PRIMARY KEY, + event_type TEXT NOT NULL, + payload TEXT NOT NULL, + status TEXT NOT NULL DEFAULT 'pending', + attempts INTEGER NOT NULL DEFAULT 0, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + next_attempt_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + last_error TEXT +); + +CREATE INDEX IF NOT EXISTS idx_event_queue_poll + ON event_queue (status, next_attempt_at); diff --git a/crates/adapters/postgres-event-queue/src/lib.rs b/crates/adapters/postgres-event-queue/src/lib.rs index b93cf3f..74dd74b 100644 --- a/crates/adapters/postgres-event-queue/src/lib.rs +++ b/crates/adapters/postgres-event-queue/src/lib.rs @@ -1,14 +1,225 @@ -pub fn add(left: u64, right: u64) -> u64 { - left + right +mod migrations; +mod payload; + +use std::sync::Arc; +use std::time::Duration; + +use async_trait::async_trait; +use domain::{ + errors::DomainError, + events::{AckHandle, DomainEvent, EventEnvelope}, + ports::{EventConsumer, EventPublisher}, +}; +use futures::stream::{self, BoxStream}; +use sqlx::PgPool; +use tokio::sync::{Mutex, mpsc}; + +use payload::DbEventPayload; + +pub struct DbEventQueueConfig { + pub poll_interval_ms: u64, + pub batch_size: i64, + pub max_attempts: i32, } -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn it_works() { - let result = add(2, 2); - assert_eq!(result, 4); +impl DbEventQueueConfig { + pub fn from_env() -> Self { + Self { + poll_interval_ms: std::env::var("EVENT_QUEUE_POLL_INTERVAL_MS") + .ok().and_then(|v| v.parse().ok()).unwrap_or(500), + batch_size: std::env::var("EVENT_QUEUE_BATCH_SIZE") + .ok().and_then(|v| v.parse().ok()).unwrap_or(10), + max_attempts: std::env::var("EVENT_QUEUE_MAX_ATTEMPTS") + .ok().and_then(|v| v.parse().ok()).unwrap_or(5), + } } } + +#[derive(Clone)] +pub struct PostgresEventQueue { + pool: PgPool, + config: Arc, +} + +impl PostgresEventQueue { + pub async fn create(pool: PgPool, config: DbEventQueueConfig) -> anyhow::Result { + migrations::run(&pool).await?; + Ok(Self { pool, config: Arc::new(config) }) + } + + pub async fn create_publisher(pool: PgPool) -> anyhow::Result> { + let q = Self::create(pool, DbEventQueueConfig::from_env()).await?; + Ok(Arc::new(q)) + } + + pub async fn create_channel( + pool: PgPool, + ) -> anyhow::Result<(Arc, Arc)> { + let q = Self::create(pool, DbEventQueueConfig::from_env()).await?; + Ok((Arc::new(q.clone()), Arc::new(q))) + } +} + +#[async_trait] +impl EventPublisher for PostgresEventQueue { + async fn publish(&self, event: &DomainEvent) -> Result<(), DomainError> { + let db_payload = DbEventPayload::from(event); + let event_type = db_payload.event_type(); + let payload_json = serde_json::to_string(&db_payload) + .map_err(|e| DomainError::InfrastructureError(format!("serialize: {e}")))?; + + sqlx::query( + "INSERT INTO event_queue (event_type, payload) VALUES ($1, $2)" + ) + .bind(event_type) + .bind(payload_json) + .execute(&self.pool) + .await + .map_err(|e| DomainError::InfrastructureError(format!("insert event: {e}")))?; + + Ok(()) + } +} + +impl EventConsumer for PostgresEventQueue { + fn consume(&self) -> BoxStream<'_, Result> { + let pool = self.pool.clone(); + let config = Arc::clone(&self.config); + let (tx, rx) = mpsc::channel(128); + let rx = Arc::new(Mutex::new(rx)); + + tokio::spawn(async move { + let poll_interval = Duration::from_millis(config.poll_interval_ms); + loop { + match claim_batch(&pool, &config).await { + Err(e) => { + tracing::error!("postgres event queue claim error: {e}"); + tokio::time::sleep(poll_interval).await; + } + Ok(rows) if rows.is_empty() => { + tokio::time::sleep(poll_interval).await; + } + Ok(rows) => { + for row in rows { + let envelope = decode_row(&pool, row, config.max_attempts); + if tx.send(envelope).await.is_err() { + tracing::info!("postgres event queue consumer closed"); + return; + } + } + // no sleep — re-poll immediately when batch was non-empty + } + } + } + }); + + Box::pin(stream::unfold(rx, |rx| async move { + let item = rx.lock().await.recv().await?; + Some((item, rx)) + })) + } +} + +// ── Internal types ──────────────────────────────────────────────────────────── + +#[derive(sqlx::FromRow)] +struct QueueRow { + id: i64, + payload: String, + attempts: i32, +} + +async fn claim_batch( + pool: &PgPool, + config: &DbEventQueueConfig, +) -> Result, DomainError> { + // CTE with FOR UPDATE SKIP LOCKED — atomic and safe for multiple workers + let rows = sqlx::query_as::<_, QueueRow>( + r#" + WITH claimed AS ( + SELECT id FROM event_queue + WHERE status = 'pending' AND next_attempt_at <= NOW() + ORDER BY next_attempt_at ASC + LIMIT $1 + FOR UPDATE SKIP LOCKED + ) + UPDATE event_queue q + SET status = 'processing' + FROM claimed + WHERE q.id = claimed.id + RETURNING q.id, q.payload, q.attempts + "# + ) + .bind(config.batch_size) + .fetch_all(pool) + .await + .map_err(|e| DomainError::InfrastructureError(format!("claim batch: {e}")))?; + + Ok(rows) +} + +fn decode_row( + pool: &PgPool, + row: QueueRow, + max_attempts: i32, +) -> Result { + let db_payload: DbEventPayload = serde_json::from_str(&row.payload) + .map_err(|e| DomainError::InfrastructureError(format!("deserialize: {e}")))?; + let event = DomainEvent::try_from(db_payload)?; + Ok(EventEnvelope::new(event, Box::new(DbAckHandle { + pool: pool.clone(), + row_id: row.id, + attempts: row.attempts, + max_attempts, + }))) +} + +struct DbAckHandle { + pool: PgPool, + row_id: i64, + attempts: i32, + max_attempts: i32, +} + +#[async_trait] +impl AckHandle for DbAckHandle { + async fn ack(&self) -> Result<(), DomainError> { + sqlx::query("UPDATE event_queue SET status = 'done' WHERE id = $1") + .bind(self.row_id) + .execute(&self.pool) + .await + .map_err(|e| DomainError::InfrastructureError(format!("ack: {e}")))?; + Ok(()) + } + + async fn nack(&self) -> Result<(), DomainError> { + let new_attempts = self.attempts + 1; + if new_attempts >= self.max_attempts { + sqlx::query( + "UPDATE event_queue SET status = 'dead_lettered', attempts = $1, last_error = 'max attempts reached' WHERE id = $2" + ) + .bind(new_attempts) + .bind(self.row_id) + .execute(&self.pool) + .await + .map_err(|e| DomainError::InfrastructureError(format!("nack dead-letter: {e}")))?; + } else { + let backoff = backoff_seconds(new_attempts).to_string(); + sqlx::query( + "UPDATE event_queue SET status = 'pending', attempts = $1, next_attempt_at = NOW() + ($2 || ' seconds')::interval, last_error = 'nack' WHERE id = $3" + ) + .bind(new_attempts) + .bind(backoff) + .bind(self.row_id) + .execute(&self.pool) + .await + .map_err(|e| DomainError::InfrastructureError(format!("nack retry: {e}")))?; + } + Ok(()) + } +} + +fn backoff_seconds(attempts: i32) -> i64 { + let base: i64 = 5 * (1i64 << attempts.min(6)); + base.min(300) +} diff --git a/crates/adapters/postgres-event-queue/src/migrations.rs b/crates/adapters/postgres-event-queue/src/migrations.rs new file mode 100644 index 0000000..66fb3be --- /dev/null +++ b/crates/adapters/postgres-event-queue/src/migrations.rs @@ -0,0 +1,6 @@ +pub(crate) async fn run(pool: &sqlx::PgPool) -> anyhow::Result<()> { + sqlx::migrate!("./migrations") + .run(pool) + .await + .map_err(|e| anyhow::anyhow!("postgres-event-queue migration failed: {e}")) +} diff --git a/crates/adapters/postgres-event-queue/src/payload.rs b/crates/adapters/postgres-event-queue/src/payload.rs new file mode 100644 index 0000000..61ed07f --- /dev/null +++ b/crates/adapters/postgres-event-queue/src/payload.rs @@ -0,0 +1,189 @@ +use chrono::NaiveDateTime; +use domain::{ + errors::DomainError, + events::DomainEvent, + value_objects::{ExternalMetadataId, MovieId, Rating, ReviewId, UserId}, +}; +use serde::{Deserialize, Serialize}; +use uuid::Uuid; + +#[derive(Debug, Serialize, Deserialize, PartialEq)] +#[serde(tag = "type", content = "data")] +pub enum DbEventPayload { + ReviewLogged { + review_id: String, + movie_id: String, + user_id: String, + rating: u8, + watched_at: i64, + }, + ReviewUpdated { + review_id: String, + movie_id: String, + user_id: String, + rating: u8, + watched_at: i64, + }, + MovieDiscovered { + movie_id: String, + external_metadata_id: String, + }, +} + +impl DbEventPayload { + pub fn event_type(&self) -> &'static str { + match self { + DbEventPayload::ReviewLogged { .. } => "ReviewLogged", + DbEventPayload::ReviewUpdated { .. } => "ReviewUpdated", + DbEventPayload::MovieDiscovered { .. } => "MovieDiscovered", + } + } +} + +fn parse_uuid(s: &str, field: &str) -> Result { + Uuid::parse_str(s) + .map_err(|e| DomainError::InfrastructureError(format!("{field}: {e}"))) +} + +fn parse_ts(ts: i64) -> Result { + chrono::DateTime::from_timestamp(ts, 0) + .map(|dt| dt.naive_utc()) + .ok_or_else(|| DomainError::InfrastructureError(format!("invalid timestamp: {ts}"))) +} + +impl From<&DomainEvent> for DbEventPayload { + fn from(event: &DomainEvent) -> Self { + match event { + DomainEvent::ReviewLogged { review_id, movie_id, user_id, rating, watched_at } => { + DbEventPayload::ReviewLogged { + review_id: review_id.value().to_string(), + movie_id: movie_id.value().to_string(), + user_id: user_id.value().to_string(), + rating: rating.value(), + watched_at: watched_at.and_utc().timestamp(), + } + } + DomainEvent::ReviewUpdated { review_id, movie_id, user_id, rating, watched_at } => { + DbEventPayload::ReviewUpdated { + review_id: review_id.value().to_string(), + movie_id: movie_id.value().to_string(), + user_id: user_id.value().to_string(), + rating: rating.value(), + watched_at: watched_at.and_utc().timestamp(), + } + } + DomainEvent::MovieDiscovered { movie_id, external_metadata_id } => { + DbEventPayload::MovieDiscovered { + movie_id: movie_id.value().to_string(), + external_metadata_id: external_metadata_id.value().to_owned(), + } + } + } + } +} + +impl TryFrom for DomainEvent { + type Error = DomainError; + fn try_from(payload: DbEventPayload) -> Result { + match payload { + DbEventPayload::ReviewLogged { review_id, movie_id, user_id, rating, watched_at } => { + Ok(DomainEvent::ReviewLogged { + review_id: ReviewId::from_uuid(parse_uuid(&review_id, "review_id")?), + movie_id: MovieId::from_uuid(parse_uuid(&movie_id, "movie_id")?), + user_id: UserId::from_uuid(parse_uuid(&user_id, "user_id")?), + rating: Rating::new(rating)?, + watched_at: parse_ts(watched_at)?, + }) + } + DbEventPayload::ReviewUpdated { review_id, movie_id, user_id, rating, watched_at } => { + Ok(DomainEvent::ReviewUpdated { + review_id: ReviewId::from_uuid(parse_uuid(&review_id, "review_id")?), + movie_id: MovieId::from_uuid(parse_uuid(&movie_id, "movie_id")?), + user_id: UserId::from_uuid(parse_uuid(&user_id, "user_id")?), + rating: Rating::new(rating)?, + watched_at: parse_ts(watched_at)?, + }) + } + DbEventPayload::MovieDiscovered { movie_id, external_metadata_id } => { + Ok(DomainEvent::MovieDiscovered { + movie_id: MovieId::from_uuid(parse_uuid(&movie_id, "movie_id")?), + external_metadata_id: ExternalMetadataId::new(external_metadata_id)?, + }) + } + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + fn fixed_dt() -> NaiveDateTime { + chrono::DateTime::from_timestamp(1_700_000_000, 0).unwrap().naive_utc() + } + + fn review_logged() -> DomainEvent { + DomainEvent::ReviewLogged { + review_id: ReviewId::from_uuid(Uuid::new_v4()), + movie_id: MovieId::from_uuid(Uuid::new_v4()), + user_id: UserId::from_uuid(Uuid::new_v4()), + rating: Rating::new(4).unwrap(), + watched_at: fixed_dt(), + } + } + + fn review_updated() -> DomainEvent { + DomainEvent::ReviewUpdated { + review_id: ReviewId::from_uuid(Uuid::new_v4()), + movie_id: MovieId::from_uuid(Uuid::new_v4()), + user_id: UserId::from_uuid(Uuid::new_v4()), + rating: Rating::new(3).unwrap(), + watched_at: fixed_dt(), + } + } + + fn movie_discovered() -> DomainEvent { + DomainEvent::MovieDiscovered { + movie_id: MovieId::from_uuid(Uuid::new_v4()), + external_metadata_id: ExternalMetadataId::new("tt1234567".into()).unwrap(), + } + } + + fn round_trip(event: DomainEvent) { + let payload = DbEventPayload::from(&event); + let json = serde_json::to_string(&payload).expect("serialize"); + let back: DbEventPayload = serde_json::from_str(&json).expect("deserialize"); + let recovered = DomainEvent::try_from(back).expect("try_from"); + assert_eq!(DbEventPayload::from(&event), DbEventPayload::from(&recovered)); + } + + #[test] + fn round_trip_review_logged() { + round_trip(review_logged()); + } + + #[test] + fn round_trip_review_updated() { + round_trip(review_updated()); + } + + #[test] + fn round_trip_movie_discovered() { + round_trip(movie_discovered()); + } + + #[test] + fn serialized_format_is_tagged() { + let payload = DbEventPayload::from(&movie_discovered()); + let json = serde_json::to_string(&payload).unwrap(); + assert!(json.contains(r#""type":"MovieDiscovered""#)); + assert!(json.contains(r#""data":"#)); + } + + #[test] + fn event_type_strings() { + assert_eq!(DbEventPayload::from(&review_logged()).event_type(), "ReviewLogged"); + assert_eq!(DbEventPayload::from(&review_updated()).event_type(), "ReviewUpdated"); + assert_eq!(DbEventPayload::from(&movie_discovered()).event_type(), "MovieDiscovered"); + } +} diff --git a/crates/adapters/sqlite-event-queue/Cargo.toml b/crates/adapters/sqlite-event-queue/Cargo.toml index fef4d4a..175b6dc 100644 --- a/crates/adapters/sqlite-event-queue/Cargo.toml +++ b/crates/adapters/sqlite-event-queue/Cargo.toml @@ -4,3 +4,14 @@ version = "0.1.0" edition = "2024" [dependencies] +sqlx = { version = "0.8.6", features = ["runtime-tokio-rustls", "sqlite", "macros", "chrono"] } +domain = { workspace = true } +anyhow = { workspace = true } +async-trait = { workspace = true } +serde = { workspace = true } +serde_json = { workspace = true } +tokio = { workspace = true } +futures = { workspace = true } +tracing = { workspace = true } +chrono = { workspace = true } +uuid = { workspace = true } diff --git a/crates/adapters/sqlite-event-queue/migrations/0001_event_queue.sql b/crates/adapters/sqlite-event-queue/migrations/0001_event_queue.sql new file mode 100644 index 0000000..336e058 --- /dev/null +++ b/crates/adapters/sqlite-event-queue/migrations/0001_event_queue.sql @@ -0,0 +1,13 @@ +CREATE TABLE IF NOT EXISTS event_queue ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + event_type TEXT NOT NULL, + payload TEXT NOT NULL, + status TEXT NOT NULL DEFAULT 'pending', + attempts INTEGER NOT NULL DEFAULT 0, + created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')), + next_attempt_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')), + last_error TEXT +); + +CREATE INDEX IF NOT EXISTS idx_event_queue_poll + ON event_queue (status, next_attempt_at); diff --git a/crates/adapters/sqlite-event-queue/src/lib.rs b/crates/adapters/sqlite-event-queue/src/lib.rs index b93cf3f..c5425fa 100644 --- a/crates/adapters/sqlite-event-queue/src/lib.rs +++ b/crates/adapters/sqlite-event-queue/src/lib.rs @@ -1,14 +1,236 @@ -pub fn add(left: u64, right: u64) -> u64 { - left + right +mod migrations; +mod payload; + +use std::sync::Arc; +use std::time::Duration; + +use async_trait::async_trait; +use domain::{ + errors::DomainError, + events::{AckHandle, DomainEvent, EventEnvelope}, + ports::{EventConsumer, EventPublisher}, +}; +use futures::stream::{self, BoxStream}; +use sqlx::SqlitePool; +use tokio::sync::{Mutex, mpsc}; + +use payload::DbEventPayload; + +pub struct DbEventQueueConfig { + pub poll_interval_ms: u64, + pub batch_size: i64, + pub max_attempts: i32, } -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn it_works() { - let result = add(2, 2); - assert_eq!(result, 4); +impl DbEventQueueConfig { + pub fn from_env() -> Self { + Self { + poll_interval_ms: std::env::var("EVENT_QUEUE_POLL_INTERVAL_MS") + .ok().and_then(|v| v.parse().ok()).unwrap_or(500), + batch_size: std::env::var("EVENT_QUEUE_BATCH_SIZE") + .ok().and_then(|v| v.parse().ok()).unwrap_or(10), + max_attempts: std::env::var("EVENT_QUEUE_MAX_ATTEMPTS") + .ok().and_then(|v| v.parse().ok()).unwrap_or(5), + } } } + +#[derive(Clone)] +pub struct SqliteEventQueue { + pool: SqlitePool, + config: Arc, +} + +impl SqliteEventQueue { + pub async fn create(pool: SqlitePool, config: DbEventQueueConfig) -> anyhow::Result { + migrations::run(&pool).await?; + Ok(Self { pool, config: Arc::new(config) }) + } + + pub async fn create_publisher(pool: SqlitePool) -> anyhow::Result> { + let q = Self::create(pool, DbEventQueueConfig::from_env()).await?; + Ok(Arc::new(q)) + } + + pub async fn create_channel( + pool: SqlitePool, + ) -> anyhow::Result<(Arc, Arc)> { + let q = Self::create(pool, DbEventQueueConfig::from_env()).await?; + Ok((Arc::new(q.clone()), Arc::new(q))) + } +} + +#[async_trait] +impl EventPublisher for SqliteEventQueue { + async fn publish(&self, event: &DomainEvent) -> Result<(), DomainError> { + let db_payload = DbEventPayload::from(event); + let event_type = db_payload.event_type(); + let payload_json = serde_json::to_string(&db_payload) + .map_err(|e| DomainError::InfrastructureError(format!("serialize: {e}")))?; + + sqlx::query( + "INSERT INTO event_queue (event_type, payload) VALUES (?, ?)" + ) + .bind(event_type) + .bind(payload_json) + .execute(&self.pool) + .await + .map_err(|e| DomainError::InfrastructureError(format!("insert event: {e}")))?; + + Ok(()) + } +} + +impl EventConsumer for SqliteEventQueue { + fn consume(&self) -> BoxStream<'_, Result> { + let pool = self.pool.clone(); + let config = Arc::clone(&self.config); + let (tx, rx) = mpsc::channel(128); + let rx = Arc::new(Mutex::new(rx)); + + tokio::spawn(async move { + let poll_interval = Duration::from_millis(config.poll_interval_ms); + loop { + match claim_batch(&pool, &config).await { + Err(e) => { + tracing::error!("sqlite event queue claim error: {e}"); + tokio::time::sleep(poll_interval).await; + } + Ok(rows) if rows.is_empty() => { + tokio::time::sleep(poll_interval).await; + } + Ok(rows) => { + for row in rows { + let envelope = decode_row(&pool, row, config.max_attempts); + if tx.send(envelope).await.is_err() { + tracing::info!("sqlite event queue consumer closed"); + return; + } + } + // no sleep — re-poll immediately when batch was non-empty + } + } + } + }); + + Box::pin(stream::unfold(rx, |rx| async move { + let item = rx.lock().await.recv().await?; + Some((item, rx)) + })) + } +} + +// ── Internal types ──────────────────────────────────────────────────────────── + +#[derive(sqlx::FromRow)] +struct QueueRow { + id: i64, + payload: String, + attempts: i32, +} + +async fn claim_batch( + pool: &SqlitePool, + config: &DbEventQueueConfig, +) -> Result, DomainError> { + let mut tx = pool.begin().await + .map_err(|e| DomainError::InfrastructureError(format!("begin tx: {e}")))?; + + let rows = sqlx::query_as::<_, QueueRow>( + "SELECT id, payload, attempts FROM event_queue + WHERE status = 'pending' + AND next_attempt_at <= strftime('%Y-%m-%dT%H:%M:%SZ', 'now') + ORDER BY next_attempt_at ASC + LIMIT ?" + ) + .bind(config.batch_size) + .fetch_all(&mut *tx) + .await + .map_err(|e| DomainError::InfrastructureError(format!("select pending: {e}")))?; + + if rows.is_empty() { + tx.rollback().await.ok(); + return Ok(vec![]); + } + + let placeholders = rows.iter().map(|_| "?").collect::>().join(", "); + let sql = format!( + "UPDATE event_queue SET status = 'processing' WHERE id IN ({})", + placeholders + ); + let mut q = sqlx::query(&sql); + for r in &rows { q = q.bind(r.id); } + q.execute(&mut *tx).await + .map_err(|e| DomainError::InfrastructureError(format!("mark processing: {e}")))?; + + tx.commit().await + .map_err(|e| DomainError::InfrastructureError(format!("commit claim: {e}")))?; + + Ok(rows) +} + +fn decode_row( + pool: &SqlitePool, + row: QueueRow, + max_attempts: i32, +) -> Result { + let db_payload: DbEventPayload = serde_json::from_str(&row.payload) + .map_err(|e| DomainError::InfrastructureError(format!("deserialize: {e}")))?; + let event = DomainEvent::try_from(db_payload)?; + Ok(EventEnvelope::new(event, Box::new(DbAckHandle { + pool: pool.clone(), + row_id: row.id, + attempts: row.attempts, + max_attempts, + }))) +} + +struct DbAckHandle { + pool: SqlitePool, + row_id: i64, + attempts: i32, + max_attempts: i32, +} + +#[async_trait] +impl AckHandle for DbAckHandle { + async fn ack(&self) -> Result<(), DomainError> { + sqlx::query("UPDATE event_queue SET status = 'done' WHERE id = ?") + .bind(self.row_id) + .execute(&self.pool) + .await + .map_err(|e| DomainError::InfrastructureError(format!("ack: {e}")))?; + Ok(()) + } + + async fn nack(&self) -> Result<(), DomainError> { + let new_attempts = self.attempts + 1; + if new_attempts >= self.max_attempts { + sqlx::query( + "UPDATE event_queue SET status = 'dead_lettered', attempts = ?, last_error = 'max attempts reached' WHERE id = ?" + ) + .bind(new_attempts) + .bind(self.row_id) + .execute(&self.pool) + .await + .map_err(|e| DomainError::InfrastructureError(format!("nack dead-letter: {e}")))?; + } else { + let backoff = backoff_seconds(new_attempts); + let sql = format!( + "UPDATE event_queue SET status = 'pending', attempts = ?, next_attempt_at = strftime('%Y-%m-%dT%H:%M:%SZ', 'now', '+{backoff} seconds'), last_error = 'nack' WHERE id = ?" + ); + sqlx::query(&sql) + .bind(new_attempts) + .bind(self.row_id) + .execute(&self.pool) + .await + .map_err(|e| DomainError::InfrastructureError(format!("nack retry: {e}")))?; + } + Ok(()) + } +} + +fn backoff_seconds(attempts: i32) -> i64 { + let base: i64 = 5 * (1i64 << attempts.min(6)); + base.min(300) +} diff --git a/crates/adapters/sqlite-event-queue/src/migrations.rs b/crates/adapters/sqlite-event-queue/src/migrations.rs new file mode 100644 index 0000000..1a26dca --- /dev/null +++ b/crates/adapters/sqlite-event-queue/src/migrations.rs @@ -0,0 +1,6 @@ +pub(crate) async fn run(pool: &sqlx::SqlitePool) -> anyhow::Result<()> { + sqlx::migrate!("./migrations") + .run(pool) + .await + .map_err(|e| anyhow::anyhow!("sqlite-event-queue migration failed: {e}")) +} diff --git a/crates/adapters/sqlite-event-queue/src/payload.rs b/crates/adapters/sqlite-event-queue/src/payload.rs new file mode 100644 index 0000000..61ed07f --- /dev/null +++ b/crates/adapters/sqlite-event-queue/src/payload.rs @@ -0,0 +1,189 @@ +use chrono::NaiveDateTime; +use domain::{ + errors::DomainError, + events::DomainEvent, + value_objects::{ExternalMetadataId, MovieId, Rating, ReviewId, UserId}, +}; +use serde::{Deserialize, Serialize}; +use uuid::Uuid; + +#[derive(Debug, Serialize, Deserialize, PartialEq)] +#[serde(tag = "type", content = "data")] +pub enum DbEventPayload { + ReviewLogged { + review_id: String, + movie_id: String, + user_id: String, + rating: u8, + watched_at: i64, + }, + ReviewUpdated { + review_id: String, + movie_id: String, + user_id: String, + rating: u8, + watched_at: i64, + }, + MovieDiscovered { + movie_id: String, + external_metadata_id: String, + }, +} + +impl DbEventPayload { + pub fn event_type(&self) -> &'static str { + match self { + DbEventPayload::ReviewLogged { .. } => "ReviewLogged", + DbEventPayload::ReviewUpdated { .. } => "ReviewUpdated", + DbEventPayload::MovieDiscovered { .. } => "MovieDiscovered", + } + } +} + +fn parse_uuid(s: &str, field: &str) -> Result { + Uuid::parse_str(s) + .map_err(|e| DomainError::InfrastructureError(format!("{field}: {e}"))) +} + +fn parse_ts(ts: i64) -> Result { + chrono::DateTime::from_timestamp(ts, 0) + .map(|dt| dt.naive_utc()) + .ok_or_else(|| DomainError::InfrastructureError(format!("invalid timestamp: {ts}"))) +} + +impl From<&DomainEvent> for DbEventPayload { + fn from(event: &DomainEvent) -> Self { + match event { + DomainEvent::ReviewLogged { review_id, movie_id, user_id, rating, watched_at } => { + DbEventPayload::ReviewLogged { + review_id: review_id.value().to_string(), + movie_id: movie_id.value().to_string(), + user_id: user_id.value().to_string(), + rating: rating.value(), + watched_at: watched_at.and_utc().timestamp(), + } + } + DomainEvent::ReviewUpdated { review_id, movie_id, user_id, rating, watched_at } => { + DbEventPayload::ReviewUpdated { + review_id: review_id.value().to_string(), + movie_id: movie_id.value().to_string(), + user_id: user_id.value().to_string(), + rating: rating.value(), + watched_at: watched_at.and_utc().timestamp(), + } + } + DomainEvent::MovieDiscovered { movie_id, external_metadata_id } => { + DbEventPayload::MovieDiscovered { + movie_id: movie_id.value().to_string(), + external_metadata_id: external_metadata_id.value().to_owned(), + } + } + } + } +} + +impl TryFrom for DomainEvent { + type Error = DomainError; + fn try_from(payload: DbEventPayload) -> Result { + match payload { + DbEventPayload::ReviewLogged { review_id, movie_id, user_id, rating, watched_at } => { + Ok(DomainEvent::ReviewLogged { + review_id: ReviewId::from_uuid(parse_uuid(&review_id, "review_id")?), + movie_id: MovieId::from_uuid(parse_uuid(&movie_id, "movie_id")?), + user_id: UserId::from_uuid(parse_uuid(&user_id, "user_id")?), + rating: Rating::new(rating)?, + watched_at: parse_ts(watched_at)?, + }) + } + DbEventPayload::ReviewUpdated { review_id, movie_id, user_id, rating, watched_at } => { + Ok(DomainEvent::ReviewUpdated { + review_id: ReviewId::from_uuid(parse_uuid(&review_id, "review_id")?), + movie_id: MovieId::from_uuid(parse_uuid(&movie_id, "movie_id")?), + user_id: UserId::from_uuid(parse_uuid(&user_id, "user_id")?), + rating: Rating::new(rating)?, + watched_at: parse_ts(watched_at)?, + }) + } + DbEventPayload::MovieDiscovered { movie_id, external_metadata_id } => { + Ok(DomainEvent::MovieDiscovered { + movie_id: MovieId::from_uuid(parse_uuid(&movie_id, "movie_id")?), + external_metadata_id: ExternalMetadataId::new(external_metadata_id)?, + }) + } + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + fn fixed_dt() -> NaiveDateTime { + chrono::DateTime::from_timestamp(1_700_000_000, 0).unwrap().naive_utc() + } + + fn review_logged() -> DomainEvent { + DomainEvent::ReviewLogged { + review_id: ReviewId::from_uuid(Uuid::new_v4()), + movie_id: MovieId::from_uuid(Uuid::new_v4()), + user_id: UserId::from_uuid(Uuid::new_v4()), + rating: Rating::new(4).unwrap(), + watched_at: fixed_dt(), + } + } + + fn review_updated() -> DomainEvent { + DomainEvent::ReviewUpdated { + review_id: ReviewId::from_uuid(Uuid::new_v4()), + movie_id: MovieId::from_uuid(Uuid::new_v4()), + user_id: UserId::from_uuid(Uuid::new_v4()), + rating: Rating::new(3).unwrap(), + watched_at: fixed_dt(), + } + } + + fn movie_discovered() -> DomainEvent { + DomainEvent::MovieDiscovered { + movie_id: MovieId::from_uuid(Uuid::new_v4()), + external_metadata_id: ExternalMetadataId::new("tt1234567".into()).unwrap(), + } + } + + fn round_trip(event: DomainEvent) { + let payload = DbEventPayload::from(&event); + let json = serde_json::to_string(&payload).expect("serialize"); + let back: DbEventPayload = serde_json::from_str(&json).expect("deserialize"); + let recovered = DomainEvent::try_from(back).expect("try_from"); + assert_eq!(DbEventPayload::from(&event), DbEventPayload::from(&recovered)); + } + + #[test] + fn round_trip_review_logged() { + round_trip(review_logged()); + } + + #[test] + fn round_trip_review_updated() { + round_trip(review_updated()); + } + + #[test] + fn round_trip_movie_discovered() { + round_trip(movie_discovered()); + } + + #[test] + fn serialized_format_is_tagged() { + let payload = DbEventPayload::from(&movie_discovered()); + let json = serde_json::to_string(&payload).unwrap(); + assert!(json.contains(r#""type":"MovieDiscovered""#)); + assert!(json.contains(r#""data":"#)); + } + + #[test] + fn event_type_strings() { + assert_eq!(DbEventPayload::from(&review_logged()).event_type(), "ReviewLogged"); + assert_eq!(DbEventPayload::from(&review_updated()).event_type(), "ReviewUpdated"); + assert_eq!(DbEventPayload::from(&movie_discovered()).event_type(), "MovieDiscovered"); + } +} diff --git a/crates/presentation/Cargo.toml b/crates/presentation/Cargo.toml index 8ffbb92..d955fca 100644 --- a/crates/presentation/Cargo.toml +++ b/crates/presentation/Cargo.toml @@ -5,8 +5,8 @@ edition = "2024" [features] default = ["sqlite", "sqlite-federation"] -sqlite = ["dep:sqlite"] -postgres = ["dep:postgres"] +sqlite = ["dep:sqlite", "dep:sqlite-event-queue"] +postgres = ["dep:postgres", "dep:postgres-event-queue"] # Meta-feature: true when any federation adapter is active — keeps all #[cfg(feature = "federation")] gates working federation = [] sqlite-federation = [ @@ -59,6 +59,8 @@ utoipa = { version = "5.5.0", features = ["axum_extras", "uuid"] } # Optional — database backends sqlite = { workspace = true, optional = true } postgres = { workspace = true, optional = true } +sqlite-event-queue = { workspace = true, optional = true } +postgres-event-queue = { workspace = true, optional = true } # Optional — federation activitypub = { workspace = true, optional = true } diff --git a/crates/presentation/src/main.rs b/crates/presentation/src/main.rs index a74af36..25f9f53 100644 --- a/crates/presentation/src/main.rs +++ b/crates/presentation/src/main.rs @@ -1,8 +1,6 @@ use std::sync::Arc; use anyhow::Context; -use event_publisher::{EventPublisherConfig, NoopEventPublisher, create_event_channel}; -use application::event_handlers::PosterSyncHandler; use std::str::FromStr; use tokio::net::TcpListener; @@ -20,11 +18,11 @@ use postgres_federation::PostgresFederationRepository; #[cfg(feature = "federation")] use activitypub::{ - ActivityPubEventHandler, ActivityPubPort, ActivityPubService, DomainUserRepoAdapter, + ActivityPubPort, ActivityPubService, DomainUserRepoAdapter, ReviewObjectHandler, }; -use application::{config::AppConfig, context::AppContext, worker::WorkerService}; +use application::{config::AppConfig, context::AppContext}; use auth::{Argon2PasswordHasher, AuthConfig, JwtAuthService}; use export::ExportAdapter; use metadata::MetadataClientImpl; @@ -38,7 +36,7 @@ use presentation::{openapi::ApiDoc, routes, state::AppState}; use utoipa::OpenApi as _; use domain::ports::{ - AuthService, DiaryExporter, DiaryRepository, EventHandler, EventPublisher, MetadataClient, + AuthService, DiaryExporter, DiaryRepository, EventPublisher, MetadataClient, MovieRepository, PasswordHasher, PosterFetcherClient, PosterStorage, ReviewRepository, StatsRepository, UserRepository, }; @@ -89,10 +87,10 @@ async fn wire_dependencies() -> anyhow::Result<(AppState, axum::Router)> { let auth_service: Arc = Arc::new(JwtAuthService::new(auth_config)); let password_hasher: Arc = Arc::new(Argon2PasswordHasher); - // Only track pools when the federation feature for that backend needs them - #[cfg(feature = "sqlite-federation")] + // Track pools — needed for federation and DB event queue + #[cfg(feature = "sqlite")] let mut sqlite_pool: Option = None; - #[cfg(feature = "postgres-federation")] + #[cfg(feature = "postgres")] let mut pg_pool: Option = None; let (movie_repository, review_repository, diary_repository, stats_repository, user_repository): @@ -101,39 +99,20 @@ async fn wire_dependencies() -> anyhow::Result<(AppState, axum::Router)> { match backend.as_str() { #[cfg(feature = "postgres")] "postgres" => { - let (_pool, m, r, d, s, u) = wire_postgres(&database_url).await?; - #[cfg(feature = "postgres-federation")] - { pg_pool = Some(_pool); } + let (pool, m, r, d, s, u) = wire_postgres(&database_url).await?; + pg_pool = Some(pool); (m, r, d, s, u) } #[cfg(feature = "sqlite")] _ => { - let (_pool, m, r, d, s, u) = wire_sqlite(&database_url).await?; - #[cfg(feature = "sqlite-federation")] - { sqlite_pool = Some(_pool); } + let (pool, m, r, d, s, u) = wire_sqlite(&database_url).await?; + sqlite_pool = Some(pool); (m, r, d, s, u) } #[cfg(not(feature = "sqlite"))] _ => anyhow::bail!("DATABASE_BACKEND={backend} is not supported by this build (sqlite feature is not enabled)"), }; - // Build handler context (used for poster sync handler) - let handler_ctx = AppContext { - movie_repository: Arc::clone(&movie_repository), - review_repository: Arc::clone(&review_repository), - diary_repository: Arc::clone(&diary_repository), - diary_exporter: Arc::new(ExportAdapter) as Arc, - stats_repository: Arc::clone(&stats_repository), - metadata_client: Arc::clone(&metadata_client), - poster_fetcher: Arc::clone(&poster_fetcher), - poster_storage: Arc::clone(&poster_storage), - event_publisher: Arc::new(NoopEventPublisher), - auth_service: Arc::clone(&auth_service), - password_hasher: Arc::clone(&password_hasher), - user_repository: Arc::clone(&user_repository), - config: app_config.clone(), - }; - // Wire up event channel, federation service, and ap_router #[cfg(feature = "federation")] let (event_publisher_arc, ap_router, ap_service, social_query) = { @@ -176,26 +155,50 @@ async fn wire_dependencies() -> anyhow::Result<(AppState, axum::Router)> { .await?, ); let ap_router = concrete_ap_service.router(); - let ap_event_handler = ActivityPubEventHandler::new( - Arc::clone(&concrete_ap_service), - Arc::clone(&movie_repository), - Arc::clone(&review_repository), - app_config.base_url.clone(), - ); let ap_service_arc: Arc = concrete_ap_service; - let ep = build_event_publisher( - handler_ctx, - vec![Arc::new(ap_event_handler) as Arc], - ).await?; + let ep: Arc = if let Ok(cfg) = nats::NatsConfig::from_env() { + tracing::info!("event bus: NATS ({})", cfg.url); + nats::create_publisher(cfg).await? + } else { + tracing::info!("event bus: DB queue"); + match backend.as_str() { + #[cfg(feature = "postgres")] + "postgres" => postgres_event_queue::PostgresEventQueue::create_publisher( + pg_pool.as_ref().unwrap().clone() + ).await?, + #[cfg(feature = "sqlite")] + _ => sqlite_event_queue::SqliteEventQueue::create_publisher( + sqlite_pool.as_ref().unwrap().clone() + ).await?, + #[cfg(not(feature = "sqlite"))] + _ => anyhow::bail!("no event bus: NATS_URL not set and sqlite feature not enabled"), + } + }; (ep, ap_router, ap_service_arc, social_query_arc) }; #[cfg(not(feature = "federation"))] - let (event_publisher_arc, ap_router): (Arc, axum::Router) = ( - build_event_publisher(handler_ctx, vec![]).await?, - axum::Router::new(), - ); + let event_publisher_arc: Arc = if let Ok(cfg) = nats::NatsConfig::from_env() { + tracing::info!("event bus: NATS ({})", cfg.url); + nats::create_publisher(cfg).await? + } else { + tracing::info!("event bus: DB queue"); + match backend.as_str() { + #[cfg(feature = "postgres")] + "postgres" => postgres_event_queue::PostgresEventQueue::create_publisher( + pg_pool.as_ref().unwrap().clone() + ).await?, + #[cfg(feature = "sqlite")] + _ => sqlite_event_queue::SqliteEventQueue::create_publisher( + sqlite_pool.as_ref().unwrap().clone() + ).await?, + #[cfg(not(feature = "sqlite"))] + _ => anyhow::bail!("no event bus: NATS_URL not set and sqlite feature not enabled"), + } + }; + #[cfg(not(feature = "federation"))] + let ap_router = axum::Router::new(); let app_ctx = AppContext { movie_repository, @@ -294,23 +297,6 @@ async fn wire_postgres(database_url: &str) -> anyhow::Result<( Ok((pool, movie_repository, review_repository, diary_repository, stats_repository, user_repository)) } -async fn build_event_publisher( - handler_ctx: AppContext, - extra_handlers: Vec>, -) -> anyhow::Result> { - if let Ok(cfg) = nats::NatsConfig::from_env() { - tracing::info!("event bus: NATS ({})", cfg.url); - return nats::create_publisher(cfg).await; - } - tracing::info!("event bus: in-memory"); - let poster_handler = Arc::new(PosterSyncHandler::new(handler_ctx, 3)); - let mut handlers: Vec> = vec![poster_handler]; - handlers.extend(extra_handlers); - let (publisher, consumer) = create_event_channel(EventPublisherConfig::from_env()); - tokio::spawn(WorkerService::new(Arc::new(consumer), handlers).run()); - Ok(Arc::new(publisher)) -} - fn init_tracing() { tracing_subscriber::registry() .with(tracing_subscriber::EnvFilter::new( diff --git a/crates/worker/Cargo.toml b/crates/worker/Cargo.toml index 6a9eec3..43d1c3f 100644 --- a/crates/worker/Cargo.toml +++ b/crates/worker/Cargo.toml @@ -5,8 +5,8 @@ edition = "2024" [features] default = ["sqlite"] -sqlite = ["dep:sqlite"] -postgres = ["dep:postgres"] +sqlite = ["dep:sqlite", "dep:sqlite-event-queue"] +postgres = ["dep:postgres", "dep:postgres-event-queue"] [dependencies] domain = { workspace = true } @@ -35,3 +35,5 @@ sqlx = { workspace = true } # Optional — database backends sqlite = { workspace = true, optional = true } postgres = { workspace = true, optional = true } +sqlite-event-queue = { workspace = true, optional = true } +postgres-event-queue = { workspace = true, optional = true } diff --git a/crates/worker/src/main.rs b/crates/worker/src/main.rs index 753e61f..c765eb4 100644 --- a/crates/worker/src/main.rs +++ b/crates/worker/src/main.rs @@ -4,7 +4,6 @@ use std::str::FromStr; use anyhow::Context; use application::{config::AppConfig, context::AppContext, event_handlers::PosterSyncHandler, worker::WorkerService}; use auth::{Argon2PasswordHasher, AuthConfig, JwtAuthService}; -use event_publisher::{EventPublisherConfig, create_event_channel}; use export::ExportAdapter; use metadata::MetadataClientImpl; use poster_fetcher::{PosterFetcherConfig, ReqwestPosterFetcher}; @@ -52,18 +51,25 @@ async fn main() -> anyhow::Result<()> { let auth_service: Arc = Arc::new(JwtAuthService::new(auth_config)); let password_hasher: Arc = Arc::new(Argon2PasswordHasher); + #[cfg(feature = "sqlite")] + let mut sqlite_pool: Option = None; + #[cfg(feature = "postgres")] + let mut pg_pool: Option = None; + let (movie_repository, review_repository, diary_repository, stats_repository, user_repository): (Arc, Arc, Arc, Arc, Arc) = match backend.as_str() { #[cfg(feature = "postgres")] "postgres" => { - let (_, m, r, d, s, u) = wire_postgres(&database_url).await?; + let (pool, m, r, d, s, u) = wire_postgres(&database_url).await?; + pg_pool = Some(pool); (m, r, d, s, u) } #[cfg(feature = "sqlite")] _ => { - let (_, m, r, d, s, u) = wire_sqlite(&database_url).await?; + let (pool, m, r, d, s, u) = wire_sqlite(&database_url).await?; + sqlite_pool = Some(pool); (m, r, d, s, u) } #[cfg(not(feature = "sqlite"))] @@ -79,12 +85,19 @@ async fn main() -> anyhow::Result<()> { nats::create_channel(cfg).await? } Err(_) => { - tracing::info!("event bus: in-memory channel (NATS_URL not set)"); - let (publisher, consumer) = create_event_channel(EventPublisherConfig::from_env()); - ( - Arc::new(publisher) as Arc, - Arc::new(consumer) as Arc, - ) + tracing::info!("event bus: DB queue"); + match backend.as_str() { + #[cfg(feature = "postgres")] + "postgres" => postgres_event_queue::PostgresEventQueue::create_channel( + pg_pool.unwrap() + ).await?, + #[cfg(feature = "sqlite")] + _ => sqlite_event_queue::SqliteEventQueue::create_channel( + sqlite_pool.unwrap() + ).await?, + #[cfg(not(feature = "sqlite"))] + _ => anyhow::bail!("no event bus: NATS_URL not set and sqlite feature not enabled"), + } } };