feat: implement SQLite and Postgres event queue adapters
- Added SQLite and Postgres event queue implementations with migrations and payload structures. - Created migration scripts for both SQLite and Postgres event queues. - Implemented event publishing and consumption logic for both adapters. - Added serialization and deserialization for domain events to database payloads. - Updated presentation and worker crates to support new event queue features. - Refactored event handling to utilize the new database-backed event queues.
This commit is contained in:
30
Cargo.lock
generated
30
Cargo.lock
generated
@@ -3385,6 +3385,19 @@ dependencies = [
|
|||||||
[[package]]
|
[[package]]
|
||||||
name = "postgres-event-queue"
|
name = "postgres-event-queue"
|
||||||
version = "0.1.0"
|
version = "0.1.0"
|
||||||
|
dependencies = [
|
||||||
|
"anyhow",
|
||||||
|
"async-trait",
|
||||||
|
"chrono",
|
||||||
|
"domain",
|
||||||
|
"futures",
|
||||||
|
"serde",
|
||||||
|
"serde_json",
|
||||||
|
"sqlx",
|
||||||
|
"tokio",
|
||||||
|
"tracing",
|
||||||
|
"uuid",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "postgres-federation"
|
name = "postgres-federation"
|
||||||
@@ -3451,11 +3464,13 @@ dependencies = [
|
|||||||
"poster-fetcher",
|
"poster-fetcher",
|
||||||
"poster-storage",
|
"poster-storage",
|
||||||
"postgres",
|
"postgres",
|
||||||
|
"postgres-event-queue",
|
||||||
"postgres-federation",
|
"postgres-federation",
|
||||||
"rss 0.1.0",
|
"rss 0.1.0",
|
||||||
"serde",
|
"serde",
|
||||||
"serde_json",
|
"serde_json",
|
||||||
"sqlite",
|
"sqlite",
|
||||||
|
"sqlite-event-queue",
|
||||||
"sqlite-federation",
|
"sqlite-federation",
|
||||||
"sqlx",
|
"sqlx",
|
||||||
"template-askama",
|
"template-askama",
|
||||||
@@ -4512,6 +4527,19 @@ dependencies = [
|
|||||||
[[package]]
|
[[package]]
|
||||||
name = "sqlite-event-queue"
|
name = "sqlite-event-queue"
|
||||||
version = "0.1.0"
|
version = "0.1.0"
|
||||||
|
dependencies = [
|
||||||
|
"anyhow",
|
||||||
|
"async-trait",
|
||||||
|
"chrono",
|
||||||
|
"domain",
|
||||||
|
"futures",
|
||||||
|
"serde",
|
||||||
|
"serde_json",
|
||||||
|
"sqlx",
|
||||||
|
"tokio",
|
||||||
|
"tracing",
|
||||||
|
"uuid",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "sqlite-federation"
|
name = "sqlite-federation"
|
||||||
@@ -6221,9 +6249,11 @@ dependencies = [
|
|||||||
"poster-fetcher",
|
"poster-fetcher",
|
||||||
"poster-storage",
|
"poster-storage",
|
||||||
"postgres",
|
"postgres",
|
||||||
|
"postgres-event-queue",
|
||||||
"serde",
|
"serde",
|
||||||
"serde_json",
|
"serde_json",
|
||||||
"sqlite",
|
"sqlite",
|
||||||
|
"sqlite-event-queue",
|
||||||
"sqlx",
|
"sqlx",
|
||||||
"thiserror 2.0.18",
|
"thiserror 2.0.18",
|
||||||
"tokio",
|
"tokio",
|
||||||
|
|||||||
@@ -4,3 +4,14 @@ version = "0.1.0"
|
|||||||
edition = "2024"
|
edition = "2024"
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
|
sqlx = { version = "0.8.6", features = ["runtime-tokio-rustls", "postgres", "macros", "chrono", "uuid"] }
|
||||||
|
domain = { workspace = true }
|
||||||
|
anyhow = { workspace = true }
|
||||||
|
async-trait = { workspace = true }
|
||||||
|
serde = { workspace = true }
|
||||||
|
serde_json = { workspace = true }
|
||||||
|
tokio = { workspace = true }
|
||||||
|
futures = { workspace = true }
|
||||||
|
tracing = { workspace = true }
|
||||||
|
chrono = { workspace = true }
|
||||||
|
uuid = { workspace = true }
|
||||||
|
|||||||
@@ -0,0 +1,13 @@
|
|||||||
|
CREATE TABLE IF NOT EXISTS event_queue (
|
||||||
|
id BIGSERIAL PRIMARY KEY,
|
||||||
|
event_type TEXT NOT NULL,
|
||||||
|
payload TEXT NOT NULL,
|
||||||
|
status TEXT NOT NULL DEFAULT 'pending',
|
||||||
|
attempts INTEGER NOT NULL DEFAULT 0,
|
||||||
|
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
|
||||||
|
next_attempt_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
|
||||||
|
last_error TEXT
|
||||||
|
);
|
||||||
|
|
||||||
|
CREATE INDEX IF NOT EXISTS idx_event_queue_poll
|
||||||
|
ON event_queue (status, next_attempt_at);
|
||||||
@@ -1,14 +1,225 @@
|
|||||||
pub fn add(left: u64, right: u64) -> u64 {
|
mod migrations;
|
||||||
left + right
|
mod payload;
|
||||||
|
|
||||||
|
use std::sync::Arc;
|
||||||
|
use std::time::Duration;
|
||||||
|
|
||||||
|
use async_trait::async_trait;
|
||||||
|
use domain::{
|
||||||
|
errors::DomainError,
|
||||||
|
events::{AckHandle, DomainEvent, EventEnvelope},
|
||||||
|
ports::{EventConsumer, EventPublisher},
|
||||||
|
};
|
||||||
|
use futures::stream::{self, BoxStream};
|
||||||
|
use sqlx::PgPool;
|
||||||
|
use tokio::sync::{Mutex, mpsc};
|
||||||
|
|
||||||
|
use payload::DbEventPayload;
|
||||||
|
|
||||||
|
pub struct DbEventQueueConfig {
|
||||||
|
pub poll_interval_ms: u64,
|
||||||
|
pub batch_size: i64,
|
||||||
|
pub max_attempts: i32,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
impl DbEventQueueConfig {
|
||||||
mod tests {
|
pub fn from_env() -> Self {
|
||||||
use super::*;
|
Self {
|
||||||
|
poll_interval_ms: std::env::var("EVENT_QUEUE_POLL_INTERVAL_MS")
|
||||||
#[test]
|
.ok().and_then(|v| v.parse().ok()).unwrap_or(500),
|
||||||
fn it_works() {
|
batch_size: std::env::var("EVENT_QUEUE_BATCH_SIZE")
|
||||||
let result = add(2, 2);
|
.ok().and_then(|v| v.parse().ok()).unwrap_or(10),
|
||||||
assert_eq!(result, 4);
|
max_attempts: std::env::var("EVENT_QUEUE_MAX_ATTEMPTS")
|
||||||
|
.ok().and_then(|v| v.parse().ok()).unwrap_or(5),
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Clone)]
|
||||||
|
pub struct PostgresEventQueue {
|
||||||
|
pool: PgPool,
|
||||||
|
config: Arc<DbEventQueueConfig>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl PostgresEventQueue {
|
||||||
|
pub async fn create(pool: PgPool, config: DbEventQueueConfig) -> anyhow::Result<Self> {
|
||||||
|
migrations::run(&pool).await?;
|
||||||
|
Ok(Self { pool, config: Arc::new(config) })
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn create_publisher(pool: PgPool) -> anyhow::Result<Arc<dyn EventPublisher>> {
|
||||||
|
let q = Self::create(pool, DbEventQueueConfig::from_env()).await?;
|
||||||
|
Ok(Arc::new(q))
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn create_channel(
|
||||||
|
pool: PgPool,
|
||||||
|
) -> anyhow::Result<(Arc<dyn EventPublisher>, Arc<dyn EventConsumer>)> {
|
||||||
|
let q = Self::create(pool, DbEventQueueConfig::from_env()).await?;
|
||||||
|
Ok((Arc::new(q.clone()), Arc::new(q)))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[async_trait]
|
||||||
|
impl EventPublisher for PostgresEventQueue {
|
||||||
|
async fn publish(&self, event: &DomainEvent) -> Result<(), DomainError> {
|
||||||
|
let db_payload = DbEventPayload::from(event);
|
||||||
|
let event_type = db_payload.event_type();
|
||||||
|
let payload_json = serde_json::to_string(&db_payload)
|
||||||
|
.map_err(|e| DomainError::InfrastructureError(format!("serialize: {e}")))?;
|
||||||
|
|
||||||
|
sqlx::query(
|
||||||
|
"INSERT INTO event_queue (event_type, payload) VALUES ($1, $2)"
|
||||||
|
)
|
||||||
|
.bind(event_type)
|
||||||
|
.bind(payload_json)
|
||||||
|
.execute(&self.pool)
|
||||||
|
.await
|
||||||
|
.map_err(|e| DomainError::InfrastructureError(format!("insert event: {e}")))?;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl EventConsumer for PostgresEventQueue {
|
||||||
|
fn consume(&self) -> BoxStream<'_, Result<EventEnvelope, DomainError>> {
|
||||||
|
let pool = self.pool.clone();
|
||||||
|
let config = Arc::clone(&self.config);
|
||||||
|
let (tx, rx) = mpsc::channel(128);
|
||||||
|
let rx = Arc::new(Mutex::new(rx));
|
||||||
|
|
||||||
|
tokio::spawn(async move {
|
||||||
|
let poll_interval = Duration::from_millis(config.poll_interval_ms);
|
||||||
|
loop {
|
||||||
|
match claim_batch(&pool, &config).await {
|
||||||
|
Err(e) => {
|
||||||
|
tracing::error!("postgres event queue claim error: {e}");
|
||||||
|
tokio::time::sleep(poll_interval).await;
|
||||||
|
}
|
||||||
|
Ok(rows) if rows.is_empty() => {
|
||||||
|
tokio::time::sleep(poll_interval).await;
|
||||||
|
}
|
||||||
|
Ok(rows) => {
|
||||||
|
for row in rows {
|
||||||
|
let envelope = decode_row(&pool, row, config.max_attempts);
|
||||||
|
if tx.send(envelope).await.is_err() {
|
||||||
|
tracing::info!("postgres event queue consumer closed");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// no sleep — re-poll immediately when batch was non-empty
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
Box::pin(stream::unfold(rx, |rx| async move {
|
||||||
|
let item = rx.lock().await.recv().await?;
|
||||||
|
Some((item, rx))
|
||||||
|
}))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// ── Internal types ────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
#[derive(sqlx::FromRow)]
|
||||||
|
struct QueueRow {
|
||||||
|
id: i64,
|
||||||
|
payload: String,
|
||||||
|
attempts: i32,
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn claim_batch(
|
||||||
|
pool: &PgPool,
|
||||||
|
config: &DbEventQueueConfig,
|
||||||
|
) -> Result<Vec<QueueRow>, DomainError> {
|
||||||
|
// CTE with FOR UPDATE SKIP LOCKED — atomic and safe for multiple workers
|
||||||
|
let rows = sqlx::query_as::<_, QueueRow>(
|
||||||
|
r#"
|
||||||
|
WITH claimed AS (
|
||||||
|
SELECT id FROM event_queue
|
||||||
|
WHERE status = 'pending' AND next_attempt_at <= NOW()
|
||||||
|
ORDER BY next_attempt_at ASC
|
||||||
|
LIMIT $1
|
||||||
|
FOR UPDATE SKIP LOCKED
|
||||||
|
)
|
||||||
|
UPDATE event_queue q
|
||||||
|
SET status = 'processing'
|
||||||
|
FROM claimed
|
||||||
|
WHERE q.id = claimed.id
|
||||||
|
RETURNING q.id, q.payload, q.attempts
|
||||||
|
"#
|
||||||
|
)
|
||||||
|
.bind(config.batch_size)
|
||||||
|
.fetch_all(pool)
|
||||||
|
.await
|
||||||
|
.map_err(|e| DomainError::InfrastructureError(format!("claim batch: {e}")))?;
|
||||||
|
|
||||||
|
Ok(rows)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn decode_row(
|
||||||
|
pool: &PgPool,
|
||||||
|
row: QueueRow,
|
||||||
|
max_attempts: i32,
|
||||||
|
) -> Result<EventEnvelope, DomainError> {
|
||||||
|
let db_payload: DbEventPayload = serde_json::from_str(&row.payload)
|
||||||
|
.map_err(|e| DomainError::InfrastructureError(format!("deserialize: {e}")))?;
|
||||||
|
let event = DomainEvent::try_from(db_payload)?;
|
||||||
|
Ok(EventEnvelope::new(event, Box::new(DbAckHandle {
|
||||||
|
pool: pool.clone(),
|
||||||
|
row_id: row.id,
|
||||||
|
attempts: row.attempts,
|
||||||
|
max_attempts,
|
||||||
|
})))
|
||||||
|
}
|
||||||
|
|
||||||
|
struct DbAckHandle {
|
||||||
|
pool: PgPool,
|
||||||
|
row_id: i64,
|
||||||
|
attempts: i32,
|
||||||
|
max_attempts: i32,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[async_trait]
|
||||||
|
impl AckHandle for DbAckHandle {
|
||||||
|
async fn ack(&self) -> Result<(), DomainError> {
|
||||||
|
sqlx::query("UPDATE event_queue SET status = 'done' WHERE id = $1")
|
||||||
|
.bind(self.row_id)
|
||||||
|
.execute(&self.pool)
|
||||||
|
.await
|
||||||
|
.map_err(|e| DomainError::InfrastructureError(format!("ack: {e}")))?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn nack(&self) -> Result<(), DomainError> {
|
||||||
|
let new_attempts = self.attempts + 1;
|
||||||
|
if new_attempts >= self.max_attempts {
|
||||||
|
sqlx::query(
|
||||||
|
"UPDATE event_queue SET status = 'dead_lettered', attempts = $1, last_error = 'max attempts reached' WHERE id = $2"
|
||||||
|
)
|
||||||
|
.bind(new_attempts)
|
||||||
|
.bind(self.row_id)
|
||||||
|
.execute(&self.pool)
|
||||||
|
.await
|
||||||
|
.map_err(|e| DomainError::InfrastructureError(format!("nack dead-letter: {e}")))?;
|
||||||
|
} else {
|
||||||
|
let backoff = backoff_seconds(new_attempts).to_string();
|
||||||
|
sqlx::query(
|
||||||
|
"UPDATE event_queue SET status = 'pending', attempts = $1, next_attempt_at = NOW() + ($2 || ' seconds')::interval, last_error = 'nack' WHERE id = $3"
|
||||||
|
)
|
||||||
|
.bind(new_attempts)
|
||||||
|
.bind(backoff)
|
||||||
|
.bind(self.row_id)
|
||||||
|
.execute(&self.pool)
|
||||||
|
.await
|
||||||
|
.map_err(|e| DomainError::InfrastructureError(format!("nack retry: {e}")))?;
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn backoff_seconds(attempts: i32) -> i64 {
|
||||||
|
let base: i64 = 5 * (1i64 << attempts.min(6));
|
||||||
|
base.min(300)
|
||||||
|
}
|
||||||
|
|||||||
6
crates/adapters/postgres-event-queue/src/migrations.rs
Normal file
6
crates/adapters/postgres-event-queue/src/migrations.rs
Normal file
@@ -0,0 +1,6 @@
|
|||||||
|
pub(crate) async fn run(pool: &sqlx::PgPool) -> anyhow::Result<()> {
|
||||||
|
sqlx::migrate!("./migrations")
|
||||||
|
.run(pool)
|
||||||
|
.await
|
||||||
|
.map_err(|e| anyhow::anyhow!("postgres-event-queue migration failed: {e}"))
|
||||||
|
}
|
||||||
189
crates/adapters/postgres-event-queue/src/payload.rs
Normal file
189
crates/adapters/postgres-event-queue/src/payload.rs
Normal file
@@ -0,0 +1,189 @@
|
|||||||
|
use chrono::NaiveDateTime;
|
||||||
|
use domain::{
|
||||||
|
errors::DomainError,
|
||||||
|
events::DomainEvent,
|
||||||
|
value_objects::{ExternalMetadataId, MovieId, Rating, ReviewId, UserId},
|
||||||
|
};
|
||||||
|
use serde::{Deserialize, Serialize};
|
||||||
|
use uuid::Uuid;
|
||||||
|
|
||||||
|
#[derive(Debug, Serialize, Deserialize, PartialEq)]
|
||||||
|
#[serde(tag = "type", content = "data")]
|
||||||
|
pub enum DbEventPayload {
|
||||||
|
ReviewLogged {
|
||||||
|
review_id: String,
|
||||||
|
movie_id: String,
|
||||||
|
user_id: String,
|
||||||
|
rating: u8,
|
||||||
|
watched_at: i64,
|
||||||
|
},
|
||||||
|
ReviewUpdated {
|
||||||
|
review_id: String,
|
||||||
|
movie_id: String,
|
||||||
|
user_id: String,
|
||||||
|
rating: u8,
|
||||||
|
watched_at: i64,
|
||||||
|
},
|
||||||
|
MovieDiscovered {
|
||||||
|
movie_id: String,
|
||||||
|
external_metadata_id: String,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
impl DbEventPayload {
|
||||||
|
pub fn event_type(&self) -> &'static str {
|
||||||
|
match self {
|
||||||
|
DbEventPayload::ReviewLogged { .. } => "ReviewLogged",
|
||||||
|
DbEventPayload::ReviewUpdated { .. } => "ReviewUpdated",
|
||||||
|
DbEventPayload::MovieDiscovered { .. } => "MovieDiscovered",
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn parse_uuid(s: &str, field: &str) -> Result<Uuid, DomainError> {
|
||||||
|
Uuid::parse_str(s)
|
||||||
|
.map_err(|e| DomainError::InfrastructureError(format!("{field}: {e}")))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn parse_ts(ts: i64) -> Result<NaiveDateTime, DomainError> {
|
||||||
|
chrono::DateTime::from_timestamp(ts, 0)
|
||||||
|
.map(|dt| dt.naive_utc())
|
||||||
|
.ok_or_else(|| DomainError::InfrastructureError(format!("invalid timestamp: {ts}")))
|
||||||
|
}
|
||||||
|
|
||||||
|
impl From<&DomainEvent> for DbEventPayload {
|
||||||
|
fn from(event: &DomainEvent) -> Self {
|
||||||
|
match event {
|
||||||
|
DomainEvent::ReviewLogged { review_id, movie_id, user_id, rating, watched_at } => {
|
||||||
|
DbEventPayload::ReviewLogged {
|
||||||
|
review_id: review_id.value().to_string(),
|
||||||
|
movie_id: movie_id.value().to_string(),
|
||||||
|
user_id: user_id.value().to_string(),
|
||||||
|
rating: rating.value(),
|
||||||
|
watched_at: watched_at.and_utc().timestamp(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
DomainEvent::ReviewUpdated { review_id, movie_id, user_id, rating, watched_at } => {
|
||||||
|
DbEventPayload::ReviewUpdated {
|
||||||
|
review_id: review_id.value().to_string(),
|
||||||
|
movie_id: movie_id.value().to_string(),
|
||||||
|
user_id: user_id.value().to_string(),
|
||||||
|
rating: rating.value(),
|
||||||
|
watched_at: watched_at.and_utc().timestamp(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
DomainEvent::MovieDiscovered { movie_id, external_metadata_id } => {
|
||||||
|
DbEventPayload::MovieDiscovered {
|
||||||
|
movie_id: movie_id.value().to_string(),
|
||||||
|
external_metadata_id: external_metadata_id.value().to_owned(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl TryFrom<DbEventPayload> for DomainEvent {
|
||||||
|
type Error = DomainError;
|
||||||
|
fn try_from(payload: DbEventPayload) -> Result<Self, DomainError> {
|
||||||
|
match payload {
|
||||||
|
DbEventPayload::ReviewLogged { review_id, movie_id, user_id, rating, watched_at } => {
|
||||||
|
Ok(DomainEvent::ReviewLogged {
|
||||||
|
review_id: ReviewId::from_uuid(parse_uuid(&review_id, "review_id")?),
|
||||||
|
movie_id: MovieId::from_uuid(parse_uuid(&movie_id, "movie_id")?),
|
||||||
|
user_id: UserId::from_uuid(parse_uuid(&user_id, "user_id")?),
|
||||||
|
rating: Rating::new(rating)?,
|
||||||
|
watched_at: parse_ts(watched_at)?,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
DbEventPayload::ReviewUpdated { review_id, movie_id, user_id, rating, watched_at } => {
|
||||||
|
Ok(DomainEvent::ReviewUpdated {
|
||||||
|
review_id: ReviewId::from_uuid(parse_uuid(&review_id, "review_id")?),
|
||||||
|
movie_id: MovieId::from_uuid(parse_uuid(&movie_id, "movie_id")?),
|
||||||
|
user_id: UserId::from_uuid(parse_uuid(&user_id, "user_id")?),
|
||||||
|
rating: Rating::new(rating)?,
|
||||||
|
watched_at: parse_ts(watched_at)?,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
DbEventPayload::MovieDiscovered { movie_id, external_metadata_id } => {
|
||||||
|
Ok(DomainEvent::MovieDiscovered {
|
||||||
|
movie_id: MovieId::from_uuid(parse_uuid(&movie_id, "movie_id")?),
|
||||||
|
external_metadata_id: ExternalMetadataId::new(external_metadata_id)?,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use super::*;
|
||||||
|
|
||||||
|
fn fixed_dt() -> NaiveDateTime {
|
||||||
|
chrono::DateTime::from_timestamp(1_700_000_000, 0).unwrap().naive_utc()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn review_logged() -> DomainEvent {
|
||||||
|
DomainEvent::ReviewLogged {
|
||||||
|
review_id: ReviewId::from_uuid(Uuid::new_v4()),
|
||||||
|
movie_id: MovieId::from_uuid(Uuid::new_v4()),
|
||||||
|
user_id: UserId::from_uuid(Uuid::new_v4()),
|
||||||
|
rating: Rating::new(4).unwrap(),
|
||||||
|
watched_at: fixed_dt(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn review_updated() -> DomainEvent {
|
||||||
|
DomainEvent::ReviewUpdated {
|
||||||
|
review_id: ReviewId::from_uuid(Uuid::new_v4()),
|
||||||
|
movie_id: MovieId::from_uuid(Uuid::new_v4()),
|
||||||
|
user_id: UserId::from_uuid(Uuid::new_v4()),
|
||||||
|
rating: Rating::new(3).unwrap(),
|
||||||
|
watched_at: fixed_dt(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn movie_discovered() -> DomainEvent {
|
||||||
|
DomainEvent::MovieDiscovered {
|
||||||
|
movie_id: MovieId::from_uuid(Uuid::new_v4()),
|
||||||
|
external_metadata_id: ExternalMetadataId::new("tt1234567".into()).unwrap(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn round_trip(event: DomainEvent) {
|
||||||
|
let payload = DbEventPayload::from(&event);
|
||||||
|
let json = serde_json::to_string(&payload).expect("serialize");
|
||||||
|
let back: DbEventPayload = serde_json::from_str(&json).expect("deserialize");
|
||||||
|
let recovered = DomainEvent::try_from(back).expect("try_from");
|
||||||
|
assert_eq!(DbEventPayload::from(&event), DbEventPayload::from(&recovered));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn round_trip_review_logged() {
|
||||||
|
round_trip(review_logged());
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn round_trip_review_updated() {
|
||||||
|
round_trip(review_updated());
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn round_trip_movie_discovered() {
|
||||||
|
round_trip(movie_discovered());
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn serialized_format_is_tagged() {
|
||||||
|
let payload = DbEventPayload::from(&movie_discovered());
|
||||||
|
let json = serde_json::to_string(&payload).unwrap();
|
||||||
|
assert!(json.contains(r#""type":"MovieDiscovered""#));
|
||||||
|
assert!(json.contains(r#""data":"#));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn event_type_strings() {
|
||||||
|
assert_eq!(DbEventPayload::from(&review_logged()).event_type(), "ReviewLogged");
|
||||||
|
assert_eq!(DbEventPayload::from(&review_updated()).event_type(), "ReviewUpdated");
|
||||||
|
assert_eq!(DbEventPayload::from(&movie_discovered()).event_type(), "MovieDiscovered");
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -4,3 +4,14 @@ version = "0.1.0"
|
|||||||
edition = "2024"
|
edition = "2024"
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
|
sqlx = { version = "0.8.6", features = ["runtime-tokio-rustls", "sqlite", "macros", "chrono"] }
|
||||||
|
domain = { workspace = true }
|
||||||
|
anyhow = { workspace = true }
|
||||||
|
async-trait = { workspace = true }
|
||||||
|
serde = { workspace = true }
|
||||||
|
serde_json = { workspace = true }
|
||||||
|
tokio = { workspace = true }
|
||||||
|
futures = { workspace = true }
|
||||||
|
tracing = { workspace = true }
|
||||||
|
chrono = { workspace = true }
|
||||||
|
uuid = { workspace = true }
|
||||||
|
|||||||
@@ -0,0 +1,13 @@
|
|||||||
|
CREATE TABLE IF NOT EXISTS event_queue (
|
||||||
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||||
|
event_type TEXT NOT NULL,
|
||||||
|
payload TEXT NOT NULL,
|
||||||
|
status TEXT NOT NULL DEFAULT 'pending',
|
||||||
|
attempts INTEGER NOT NULL DEFAULT 0,
|
||||||
|
created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')),
|
||||||
|
next_attempt_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')),
|
||||||
|
last_error TEXT
|
||||||
|
);
|
||||||
|
|
||||||
|
CREATE INDEX IF NOT EXISTS idx_event_queue_poll
|
||||||
|
ON event_queue (status, next_attempt_at);
|
||||||
@@ -1,14 +1,236 @@
|
|||||||
pub fn add(left: u64, right: u64) -> u64 {
|
mod migrations;
|
||||||
left + right
|
mod payload;
|
||||||
|
|
||||||
|
use std::sync::Arc;
|
||||||
|
use std::time::Duration;
|
||||||
|
|
||||||
|
use async_trait::async_trait;
|
||||||
|
use domain::{
|
||||||
|
errors::DomainError,
|
||||||
|
events::{AckHandle, DomainEvent, EventEnvelope},
|
||||||
|
ports::{EventConsumer, EventPublisher},
|
||||||
|
};
|
||||||
|
use futures::stream::{self, BoxStream};
|
||||||
|
use sqlx::SqlitePool;
|
||||||
|
use tokio::sync::{Mutex, mpsc};
|
||||||
|
|
||||||
|
use payload::DbEventPayload;
|
||||||
|
|
||||||
|
pub struct DbEventQueueConfig {
|
||||||
|
pub poll_interval_ms: u64,
|
||||||
|
pub batch_size: i64,
|
||||||
|
pub max_attempts: i32,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
impl DbEventQueueConfig {
|
||||||
mod tests {
|
pub fn from_env() -> Self {
|
||||||
use super::*;
|
Self {
|
||||||
|
poll_interval_ms: std::env::var("EVENT_QUEUE_POLL_INTERVAL_MS")
|
||||||
#[test]
|
.ok().and_then(|v| v.parse().ok()).unwrap_or(500),
|
||||||
fn it_works() {
|
batch_size: std::env::var("EVENT_QUEUE_BATCH_SIZE")
|
||||||
let result = add(2, 2);
|
.ok().and_then(|v| v.parse().ok()).unwrap_or(10),
|
||||||
assert_eq!(result, 4);
|
max_attempts: std::env::var("EVENT_QUEUE_MAX_ATTEMPTS")
|
||||||
|
.ok().and_then(|v| v.parse().ok()).unwrap_or(5),
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Clone)]
|
||||||
|
pub struct SqliteEventQueue {
|
||||||
|
pool: SqlitePool,
|
||||||
|
config: Arc<DbEventQueueConfig>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl SqliteEventQueue {
|
||||||
|
pub async fn create(pool: SqlitePool, config: DbEventQueueConfig) -> anyhow::Result<Self> {
|
||||||
|
migrations::run(&pool).await?;
|
||||||
|
Ok(Self { pool, config: Arc::new(config) })
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn create_publisher(pool: SqlitePool) -> anyhow::Result<Arc<dyn EventPublisher>> {
|
||||||
|
let q = Self::create(pool, DbEventQueueConfig::from_env()).await?;
|
||||||
|
Ok(Arc::new(q))
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn create_channel(
|
||||||
|
pool: SqlitePool,
|
||||||
|
) -> anyhow::Result<(Arc<dyn EventPublisher>, Arc<dyn EventConsumer>)> {
|
||||||
|
let q = Self::create(pool, DbEventQueueConfig::from_env()).await?;
|
||||||
|
Ok((Arc::new(q.clone()), Arc::new(q)))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[async_trait]
|
||||||
|
impl EventPublisher for SqliteEventQueue {
|
||||||
|
async fn publish(&self, event: &DomainEvent) -> Result<(), DomainError> {
|
||||||
|
let db_payload = DbEventPayload::from(event);
|
||||||
|
let event_type = db_payload.event_type();
|
||||||
|
let payload_json = serde_json::to_string(&db_payload)
|
||||||
|
.map_err(|e| DomainError::InfrastructureError(format!("serialize: {e}")))?;
|
||||||
|
|
||||||
|
sqlx::query(
|
||||||
|
"INSERT INTO event_queue (event_type, payload) VALUES (?, ?)"
|
||||||
|
)
|
||||||
|
.bind(event_type)
|
||||||
|
.bind(payload_json)
|
||||||
|
.execute(&self.pool)
|
||||||
|
.await
|
||||||
|
.map_err(|e| DomainError::InfrastructureError(format!("insert event: {e}")))?;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl EventConsumer for SqliteEventQueue {
|
||||||
|
fn consume(&self) -> BoxStream<'_, Result<EventEnvelope, DomainError>> {
|
||||||
|
let pool = self.pool.clone();
|
||||||
|
let config = Arc::clone(&self.config);
|
||||||
|
let (tx, rx) = mpsc::channel(128);
|
||||||
|
let rx = Arc::new(Mutex::new(rx));
|
||||||
|
|
||||||
|
tokio::spawn(async move {
|
||||||
|
let poll_interval = Duration::from_millis(config.poll_interval_ms);
|
||||||
|
loop {
|
||||||
|
match claim_batch(&pool, &config).await {
|
||||||
|
Err(e) => {
|
||||||
|
tracing::error!("sqlite event queue claim error: {e}");
|
||||||
|
tokio::time::sleep(poll_interval).await;
|
||||||
|
}
|
||||||
|
Ok(rows) if rows.is_empty() => {
|
||||||
|
tokio::time::sleep(poll_interval).await;
|
||||||
|
}
|
||||||
|
Ok(rows) => {
|
||||||
|
for row in rows {
|
||||||
|
let envelope = decode_row(&pool, row, config.max_attempts);
|
||||||
|
if tx.send(envelope).await.is_err() {
|
||||||
|
tracing::info!("sqlite event queue consumer closed");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// no sleep — re-poll immediately when batch was non-empty
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
Box::pin(stream::unfold(rx, |rx| async move {
|
||||||
|
let item = rx.lock().await.recv().await?;
|
||||||
|
Some((item, rx))
|
||||||
|
}))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// ── Internal types ────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
#[derive(sqlx::FromRow)]
|
||||||
|
struct QueueRow {
|
||||||
|
id: i64,
|
||||||
|
payload: String,
|
||||||
|
attempts: i32,
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn claim_batch(
|
||||||
|
pool: &SqlitePool,
|
||||||
|
config: &DbEventQueueConfig,
|
||||||
|
) -> Result<Vec<QueueRow>, DomainError> {
|
||||||
|
let mut tx = pool.begin().await
|
||||||
|
.map_err(|e| DomainError::InfrastructureError(format!("begin tx: {e}")))?;
|
||||||
|
|
||||||
|
let rows = sqlx::query_as::<_, QueueRow>(
|
||||||
|
"SELECT id, payload, attempts FROM event_queue
|
||||||
|
WHERE status = 'pending'
|
||||||
|
AND next_attempt_at <= strftime('%Y-%m-%dT%H:%M:%SZ', 'now')
|
||||||
|
ORDER BY next_attempt_at ASC
|
||||||
|
LIMIT ?"
|
||||||
|
)
|
||||||
|
.bind(config.batch_size)
|
||||||
|
.fetch_all(&mut *tx)
|
||||||
|
.await
|
||||||
|
.map_err(|e| DomainError::InfrastructureError(format!("select pending: {e}")))?;
|
||||||
|
|
||||||
|
if rows.is_empty() {
|
||||||
|
tx.rollback().await.ok();
|
||||||
|
return Ok(vec![]);
|
||||||
|
}
|
||||||
|
|
||||||
|
let placeholders = rows.iter().map(|_| "?").collect::<Vec<_>>().join(", ");
|
||||||
|
let sql = format!(
|
||||||
|
"UPDATE event_queue SET status = 'processing' WHERE id IN ({})",
|
||||||
|
placeholders
|
||||||
|
);
|
||||||
|
let mut q = sqlx::query(&sql);
|
||||||
|
for r in &rows { q = q.bind(r.id); }
|
||||||
|
q.execute(&mut *tx).await
|
||||||
|
.map_err(|e| DomainError::InfrastructureError(format!("mark processing: {e}")))?;
|
||||||
|
|
||||||
|
tx.commit().await
|
||||||
|
.map_err(|e| DomainError::InfrastructureError(format!("commit claim: {e}")))?;
|
||||||
|
|
||||||
|
Ok(rows)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn decode_row(
|
||||||
|
pool: &SqlitePool,
|
||||||
|
row: QueueRow,
|
||||||
|
max_attempts: i32,
|
||||||
|
) -> Result<EventEnvelope, DomainError> {
|
||||||
|
let db_payload: DbEventPayload = serde_json::from_str(&row.payload)
|
||||||
|
.map_err(|e| DomainError::InfrastructureError(format!("deserialize: {e}")))?;
|
||||||
|
let event = DomainEvent::try_from(db_payload)?;
|
||||||
|
Ok(EventEnvelope::new(event, Box::new(DbAckHandle {
|
||||||
|
pool: pool.clone(),
|
||||||
|
row_id: row.id,
|
||||||
|
attempts: row.attempts,
|
||||||
|
max_attempts,
|
||||||
|
})))
|
||||||
|
}
|
||||||
|
|
||||||
|
struct DbAckHandle {
|
||||||
|
pool: SqlitePool,
|
||||||
|
row_id: i64,
|
||||||
|
attempts: i32,
|
||||||
|
max_attempts: i32,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[async_trait]
|
||||||
|
impl AckHandle for DbAckHandle {
|
||||||
|
async fn ack(&self) -> Result<(), DomainError> {
|
||||||
|
sqlx::query("UPDATE event_queue SET status = 'done' WHERE id = ?")
|
||||||
|
.bind(self.row_id)
|
||||||
|
.execute(&self.pool)
|
||||||
|
.await
|
||||||
|
.map_err(|e| DomainError::InfrastructureError(format!("ack: {e}")))?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn nack(&self) -> Result<(), DomainError> {
|
||||||
|
let new_attempts = self.attempts + 1;
|
||||||
|
if new_attempts >= self.max_attempts {
|
||||||
|
sqlx::query(
|
||||||
|
"UPDATE event_queue SET status = 'dead_lettered', attempts = ?, last_error = 'max attempts reached' WHERE id = ?"
|
||||||
|
)
|
||||||
|
.bind(new_attempts)
|
||||||
|
.bind(self.row_id)
|
||||||
|
.execute(&self.pool)
|
||||||
|
.await
|
||||||
|
.map_err(|e| DomainError::InfrastructureError(format!("nack dead-letter: {e}")))?;
|
||||||
|
} else {
|
||||||
|
let backoff = backoff_seconds(new_attempts);
|
||||||
|
let sql = format!(
|
||||||
|
"UPDATE event_queue SET status = 'pending', attempts = ?, next_attempt_at = strftime('%Y-%m-%dT%H:%M:%SZ', 'now', '+{backoff} seconds'), last_error = 'nack' WHERE id = ?"
|
||||||
|
);
|
||||||
|
sqlx::query(&sql)
|
||||||
|
.bind(new_attempts)
|
||||||
|
.bind(self.row_id)
|
||||||
|
.execute(&self.pool)
|
||||||
|
.await
|
||||||
|
.map_err(|e| DomainError::InfrastructureError(format!("nack retry: {e}")))?;
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn backoff_seconds(attempts: i32) -> i64 {
|
||||||
|
let base: i64 = 5 * (1i64 << attempts.min(6));
|
||||||
|
base.min(300)
|
||||||
|
}
|
||||||
|
|||||||
6
crates/adapters/sqlite-event-queue/src/migrations.rs
Normal file
6
crates/adapters/sqlite-event-queue/src/migrations.rs
Normal file
@@ -0,0 +1,6 @@
|
|||||||
|
pub(crate) async fn run(pool: &sqlx::SqlitePool) -> anyhow::Result<()> {
|
||||||
|
sqlx::migrate!("./migrations")
|
||||||
|
.run(pool)
|
||||||
|
.await
|
||||||
|
.map_err(|e| anyhow::anyhow!("sqlite-event-queue migration failed: {e}"))
|
||||||
|
}
|
||||||
189
crates/adapters/sqlite-event-queue/src/payload.rs
Normal file
189
crates/adapters/sqlite-event-queue/src/payload.rs
Normal file
@@ -0,0 +1,189 @@
|
|||||||
|
use chrono::NaiveDateTime;
|
||||||
|
use domain::{
|
||||||
|
errors::DomainError,
|
||||||
|
events::DomainEvent,
|
||||||
|
value_objects::{ExternalMetadataId, MovieId, Rating, ReviewId, UserId},
|
||||||
|
};
|
||||||
|
use serde::{Deserialize, Serialize};
|
||||||
|
use uuid::Uuid;
|
||||||
|
|
||||||
|
#[derive(Debug, Serialize, Deserialize, PartialEq)]
|
||||||
|
#[serde(tag = "type", content = "data")]
|
||||||
|
pub enum DbEventPayload {
|
||||||
|
ReviewLogged {
|
||||||
|
review_id: String,
|
||||||
|
movie_id: String,
|
||||||
|
user_id: String,
|
||||||
|
rating: u8,
|
||||||
|
watched_at: i64,
|
||||||
|
},
|
||||||
|
ReviewUpdated {
|
||||||
|
review_id: String,
|
||||||
|
movie_id: String,
|
||||||
|
user_id: String,
|
||||||
|
rating: u8,
|
||||||
|
watched_at: i64,
|
||||||
|
},
|
||||||
|
MovieDiscovered {
|
||||||
|
movie_id: String,
|
||||||
|
external_metadata_id: String,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
impl DbEventPayload {
|
||||||
|
pub fn event_type(&self) -> &'static str {
|
||||||
|
match self {
|
||||||
|
DbEventPayload::ReviewLogged { .. } => "ReviewLogged",
|
||||||
|
DbEventPayload::ReviewUpdated { .. } => "ReviewUpdated",
|
||||||
|
DbEventPayload::MovieDiscovered { .. } => "MovieDiscovered",
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn parse_uuid(s: &str, field: &str) -> Result<Uuid, DomainError> {
|
||||||
|
Uuid::parse_str(s)
|
||||||
|
.map_err(|e| DomainError::InfrastructureError(format!("{field}: {e}")))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn parse_ts(ts: i64) -> Result<NaiveDateTime, DomainError> {
|
||||||
|
chrono::DateTime::from_timestamp(ts, 0)
|
||||||
|
.map(|dt| dt.naive_utc())
|
||||||
|
.ok_or_else(|| DomainError::InfrastructureError(format!("invalid timestamp: {ts}")))
|
||||||
|
}
|
||||||
|
|
||||||
|
impl From<&DomainEvent> for DbEventPayload {
|
||||||
|
fn from(event: &DomainEvent) -> Self {
|
||||||
|
match event {
|
||||||
|
DomainEvent::ReviewLogged { review_id, movie_id, user_id, rating, watched_at } => {
|
||||||
|
DbEventPayload::ReviewLogged {
|
||||||
|
review_id: review_id.value().to_string(),
|
||||||
|
movie_id: movie_id.value().to_string(),
|
||||||
|
user_id: user_id.value().to_string(),
|
||||||
|
rating: rating.value(),
|
||||||
|
watched_at: watched_at.and_utc().timestamp(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
DomainEvent::ReviewUpdated { review_id, movie_id, user_id, rating, watched_at } => {
|
||||||
|
DbEventPayload::ReviewUpdated {
|
||||||
|
review_id: review_id.value().to_string(),
|
||||||
|
movie_id: movie_id.value().to_string(),
|
||||||
|
user_id: user_id.value().to_string(),
|
||||||
|
rating: rating.value(),
|
||||||
|
watched_at: watched_at.and_utc().timestamp(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
DomainEvent::MovieDiscovered { movie_id, external_metadata_id } => {
|
||||||
|
DbEventPayload::MovieDiscovered {
|
||||||
|
movie_id: movie_id.value().to_string(),
|
||||||
|
external_metadata_id: external_metadata_id.value().to_owned(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl TryFrom<DbEventPayload> for DomainEvent {
|
||||||
|
type Error = DomainError;
|
||||||
|
fn try_from(payload: DbEventPayload) -> Result<Self, DomainError> {
|
||||||
|
match payload {
|
||||||
|
DbEventPayload::ReviewLogged { review_id, movie_id, user_id, rating, watched_at } => {
|
||||||
|
Ok(DomainEvent::ReviewLogged {
|
||||||
|
review_id: ReviewId::from_uuid(parse_uuid(&review_id, "review_id")?),
|
||||||
|
movie_id: MovieId::from_uuid(parse_uuid(&movie_id, "movie_id")?),
|
||||||
|
user_id: UserId::from_uuid(parse_uuid(&user_id, "user_id")?),
|
||||||
|
rating: Rating::new(rating)?,
|
||||||
|
watched_at: parse_ts(watched_at)?,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
DbEventPayload::ReviewUpdated { review_id, movie_id, user_id, rating, watched_at } => {
|
||||||
|
Ok(DomainEvent::ReviewUpdated {
|
||||||
|
review_id: ReviewId::from_uuid(parse_uuid(&review_id, "review_id")?),
|
||||||
|
movie_id: MovieId::from_uuid(parse_uuid(&movie_id, "movie_id")?),
|
||||||
|
user_id: UserId::from_uuid(parse_uuid(&user_id, "user_id")?),
|
||||||
|
rating: Rating::new(rating)?,
|
||||||
|
watched_at: parse_ts(watched_at)?,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
DbEventPayload::MovieDiscovered { movie_id, external_metadata_id } => {
|
||||||
|
Ok(DomainEvent::MovieDiscovered {
|
||||||
|
movie_id: MovieId::from_uuid(parse_uuid(&movie_id, "movie_id")?),
|
||||||
|
external_metadata_id: ExternalMetadataId::new(external_metadata_id)?,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use super::*;
|
||||||
|
|
||||||
|
fn fixed_dt() -> NaiveDateTime {
|
||||||
|
chrono::DateTime::from_timestamp(1_700_000_000, 0).unwrap().naive_utc()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn review_logged() -> DomainEvent {
|
||||||
|
DomainEvent::ReviewLogged {
|
||||||
|
review_id: ReviewId::from_uuid(Uuid::new_v4()),
|
||||||
|
movie_id: MovieId::from_uuid(Uuid::new_v4()),
|
||||||
|
user_id: UserId::from_uuid(Uuid::new_v4()),
|
||||||
|
rating: Rating::new(4).unwrap(),
|
||||||
|
watched_at: fixed_dt(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn review_updated() -> DomainEvent {
|
||||||
|
DomainEvent::ReviewUpdated {
|
||||||
|
review_id: ReviewId::from_uuid(Uuid::new_v4()),
|
||||||
|
movie_id: MovieId::from_uuid(Uuid::new_v4()),
|
||||||
|
user_id: UserId::from_uuid(Uuid::new_v4()),
|
||||||
|
rating: Rating::new(3).unwrap(),
|
||||||
|
watched_at: fixed_dt(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn movie_discovered() -> DomainEvent {
|
||||||
|
DomainEvent::MovieDiscovered {
|
||||||
|
movie_id: MovieId::from_uuid(Uuid::new_v4()),
|
||||||
|
external_metadata_id: ExternalMetadataId::new("tt1234567".into()).unwrap(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn round_trip(event: DomainEvent) {
|
||||||
|
let payload = DbEventPayload::from(&event);
|
||||||
|
let json = serde_json::to_string(&payload).expect("serialize");
|
||||||
|
let back: DbEventPayload = serde_json::from_str(&json).expect("deserialize");
|
||||||
|
let recovered = DomainEvent::try_from(back).expect("try_from");
|
||||||
|
assert_eq!(DbEventPayload::from(&event), DbEventPayload::from(&recovered));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn round_trip_review_logged() {
|
||||||
|
round_trip(review_logged());
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn round_trip_review_updated() {
|
||||||
|
round_trip(review_updated());
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn round_trip_movie_discovered() {
|
||||||
|
round_trip(movie_discovered());
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn serialized_format_is_tagged() {
|
||||||
|
let payload = DbEventPayload::from(&movie_discovered());
|
||||||
|
let json = serde_json::to_string(&payload).unwrap();
|
||||||
|
assert!(json.contains(r#""type":"MovieDiscovered""#));
|
||||||
|
assert!(json.contains(r#""data":"#));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn event_type_strings() {
|
||||||
|
assert_eq!(DbEventPayload::from(&review_logged()).event_type(), "ReviewLogged");
|
||||||
|
assert_eq!(DbEventPayload::from(&review_updated()).event_type(), "ReviewUpdated");
|
||||||
|
assert_eq!(DbEventPayload::from(&movie_discovered()).event_type(), "MovieDiscovered");
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -5,8 +5,8 @@ edition = "2024"
|
|||||||
|
|
||||||
[features]
|
[features]
|
||||||
default = ["sqlite", "sqlite-federation"]
|
default = ["sqlite", "sqlite-federation"]
|
||||||
sqlite = ["dep:sqlite"]
|
sqlite = ["dep:sqlite", "dep:sqlite-event-queue"]
|
||||||
postgres = ["dep:postgres"]
|
postgres = ["dep:postgres", "dep:postgres-event-queue"]
|
||||||
# Meta-feature: true when any federation adapter is active — keeps all #[cfg(feature = "federation")] gates working
|
# Meta-feature: true when any federation adapter is active — keeps all #[cfg(feature = "federation")] gates working
|
||||||
federation = []
|
federation = []
|
||||||
sqlite-federation = [
|
sqlite-federation = [
|
||||||
@@ -59,6 +59,8 @@ utoipa = { version = "5.5.0", features = ["axum_extras", "uuid"] }
|
|||||||
# Optional — database backends
|
# Optional — database backends
|
||||||
sqlite = { workspace = true, optional = true }
|
sqlite = { workspace = true, optional = true }
|
||||||
postgres = { workspace = true, optional = true }
|
postgres = { workspace = true, optional = true }
|
||||||
|
sqlite-event-queue = { workspace = true, optional = true }
|
||||||
|
postgres-event-queue = { workspace = true, optional = true }
|
||||||
|
|
||||||
# Optional — federation
|
# Optional — federation
|
||||||
activitypub = { workspace = true, optional = true }
|
activitypub = { workspace = true, optional = true }
|
||||||
|
|||||||
@@ -1,8 +1,6 @@
|
|||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
use anyhow::Context;
|
use anyhow::Context;
|
||||||
use event_publisher::{EventPublisherConfig, NoopEventPublisher, create_event_channel};
|
|
||||||
use application::event_handlers::PosterSyncHandler;
|
|
||||||
use std::str::FromStr;
|
use std::str::FromStr;
|
||||||
|
|
||||||
use tokio::net::TcpListener;
|
use tokio::net::TcpListener;
|
||||||
@@ -20,11 +18,11 @@ use postgres_federation::PostgresFederationRepository;
|
|||||||
|
|
||||||
#[cfg(feature = "federation")]
|
#[cfg(feature = "federation")]
|
||||||
use activitypub::{
|
use activitypub::{
|
||||||
ActivityPubEventHandler, ActivityPubPort, ActivityPubService, DomainUserRepoAdapter,
|
ActivityPubPort, ActivityPubService, DomainUserRepoAdapter,
|
||||||
ReviewObjectHandler,
|
ReviewObjectHandler,
|
||||||
};
|
};
|
||||||
|
|
||||||
use application::{config::AppConfig, context::AppContext, worker::WorkerService};
|
use application::{config::AppConfig, context::AppContext};
|
||||||
use auth::{Argon2PasswordHasher, AuthConfig, JwtAuthService};
|
use auth::{Argon2PasswordHasher, AuthConfig, JwtAuthService};
|
||||||
use export::ExportAdapter;
|
use export::ExportAdapter;
|
||||||
use metadata::MetadataClientImpl;
|
use metadata::MetadataClientImpl;
|
||||||
@@ -38,7 +36,7 @@ use presentation::{openapi::ApiDoc, routes, state::AppState};
|
|||||||
use utoipa::OpenApi as _;
|
use utoipa::OpenApi as _;
|
||||||
|
|
||||||
use domain::ports::{
|
use domain::ports::{
|
||||||
AuthService, DiaryExporter, DiaryRepository, EventHandler, EventPublisher, MetadataClient,
|
AuthService, DiaryExporter, DiaryRepository, EventPublisher, MetadataClient,
|
||||||
MovieRepository, PasswordHasher, PosterFetcherClient, PosterStorage, ReviewRepository,
|
MovieRepository, PasswordHasher, PosterFetcherClient, PosterStorage, ReviewRepository,
|
||||||
StatsRepository, UserRepository,
|
StatsRepository, UserRepository,
|
||||||
};
|
};
|
||||||
@@ -89,10 +87,10 @@ async fn wire_dependencies() -> anyhow::Result<(AppState, axum::Router)> {
|
|||||||
let auth_service: Arc<dyn AuthService> = Arc::new(JwtAuthService::new(auth_config));
|
let auth_service: Arc<dyn AuthService> = Arc::new(JwtAuthService::new(auth_config));
|
||||||
let password_hasher: Arc<dyn PasswordHasher> = Arc::new(Argon2PasswordHasher);
|
let password_hasher: Arc<dyn PasswordHasher> = Arc::new(Argon2PasswordHasher);
|
||||||
|
|
||||||
// Only track pools when the federation feature for that backend needs them
|
// Track pools — needed for federation and DB event queue
|
||||||
#[cfg(feature = "sqlite-federation")]
|
#[cfg(feature = "sqlite")]
|
||||||
let mut sqlite_pool: Option<sqlx::SqlitePool> = None;
|
let mut sqlite_pool: Option<sqlx::SqlitePool> = None;
|
||||||
#[cfg(feature = "postgres-federation")]
|
#[cfg(feature = "postgres")]
|
||||||
let mut pg_pool: Option<sqlx::PgPool> = None;
|
let mut pg_pool: Option<sqlx::PgPool> = None;
|
||||||
|
|
||||||
let (movie_repository, review_repository, diary_repository, stats_repository, user_repository):
|
let (movie_repository, review_repository, diary_repository, stats_repository, user_repository):
|
||||||
@@ -101,39 +99,20 @@ async fn wire_dependencies() -> anyhow::Result<(AppState, axum::Router)> {
|
|||||||
match backend.as_str() {
|
match backend.as_str() {
|
||||||
#[cfg(feature = "postgres")]
|
#[cfg(feature = "postgres")]
|
||||||
"postgres" => {
|
"postgres" => {
|
||||||
let (_pool, m, r, d, s, u) = wire_postgres(&database_url).await?;
|
let (pool, m, r, d, s, u) = wire_postgres(&database_url).await?;
|
||||||
#[cfg(feature = "postgres-federation")]
|
pg_pool = Some(pool);
|
||||||
{ pg_pool = Some(_pool); }
|
|
||||||
(m, r, d, s, u)
|
(m, r, d, s, u)
|
||||||
}
|
}
|
||||||
#[cfg(feature = "sqlite")]
|
#[cfg(feature = "sqlite")]
|
||||||
_ => {
|
_ => {
|
||||||
let (_pool, m, r, d, s, u) = wire_sqlite(&database_url).await?;
|
let (pool, m, r, d, s, u) = wire_sqlite(&database_url).await?;
|
||||||
#[cfg(feature = "sqlite-federation")]
|
sqlite_pool = Some(pool);
|
||||||
{ sqlite_pool = Some(_pool); }
|
|
||||||
(m, r, d, s, u)
|
(m, r, d, s, u)
|
||||||
}
|
}
|
||||||
#[cfg(not(feature = "sqlite"))]
|
#[cfg(not(feature = "sqlite"))]
|
||||||
_ => anyhow::bail!("DATABASE_BACKEND={backend} is not supported by this build (sqlite feature is not enabled)"),
|
_ => anyhow::bail!("DATABASE_BACKEND={backend} is not supported by this build (sqlite feature is not enabled)"),
|
||||||
};
|
};
|
||||||
|
|
||||||
// Build handler context (used for poster sync handler)
|
|
||||||
let handler_ctx = AppContext {
|
|
||||||
movie_repository: Arc::clone(&movie_repository),
|
|
||||||
review_repository: Arc::clone(&review_repository),
|
|
||||||
diary_repository: Arc::clone(&diary_repository),
|
|
||||||
diary_exporter: Arc::new(ExportAdapter) as Arc<dyn DiaryExporter>,
|
|
||||||
stats_repository: Arc::clone(&stats_repository),
|
|
||||||
metadata_client: Arc::clone(&metadata_client),
|
|
||||||
poster_fetcher: Arc::clone(&poster_fetcher),
|
|
||||||
poster_storage: Arc::clone(&poster_storage),
|
|
||||||
event_publisher: Arc::new(NoopEventPublisher),
|
|
||||||
auth_service: Arc::clone(&auth_service),
|
|
||||||
password_hasher: Arc::clone(&password_hasher),
|
|
||||||
user_repository: Arc::clone(&user_repository),
|
|
||||||
config: app_config.clone(),
|
|
||||||
};
|
|
||||||
|
|
||||||
// Wire up event channel, federation service, and ap_router
|
// Wire up event channel, federation service, and ap_router
|
||||||
#[cfg(feature = "federation")]
|
#[cfg(feature = "federation")]
|
||||||
let (event_publisher_arc, ap_router, ap_service, social_query) = {
|
let (event_publisher_arc, ap_router, ap_service, social_query) = {
|
||||||
@@ -176,26 +155,50 @@ async fn wire_dependencies() -> anyhow::Result<(AppState, axum::Router)> {
|
|||||||
.await?,
|
.await?,
|
||||||
);
|
);
|
||||||
let ap_router = concrete_ap_service.router();
|
let ap_router = concrete_ap_service.router();
|
||||||
let ap_event_handler = ActivityPubEventHandler::new(
|
|
||||||
Arc::clone(&concrete_ap_service),
|
|
||||||
Arc::clone(&movie_repository),
|
|
||||||
Arc::clone(&review_repository),
|
|
||||||
app_config.base_url.clone(),
|
|
||||||
);
|
|
||||||
let ap_service_arc: Arc<dyn ActivityPubPort> = concrete_ap_service;
|
let ap_service_arc: Arc<dyn ActivityPubPort> = concrete_ap_service;
|
||||||
|
|
||||||
let ep = build_event_publisher(
|
let ep: Arc<dyn EventPublisher> = if let Ok(cfg) = nats::NatsConfig::from_env() {
|
||||||
handler_ctx,
|
tracing::info!("event bus: NATS ({})", cfg.url);
|
||||||
vec![Arc::new(ap_event_handler) as Arc<dyn EventHandler>],
|
nats::create_publisher(cfg).await?
|
||||||
).await?;
|
} else {
|
||||||
|
tracing::info!("event bus: DB queue");
|
||||||
|
match backend.as_str() {
|
||||||
|
#[cfg(feature = "postgres")]
|
||||||
|
"postgres" => postgres_event_queue::PostgresEventQueue::create_publisher(
|
||||||
|
pg_pool.as_ref().unwrap().clone()
|
||||||
|
).await?,
|
||||||
|
#[cfg(feature = "sqlite")]
|
||||||
|
_ => sqlite_event_queue::SqliteEventQueue::create_publisher(
|
||||||
|
sqlite_pool.as_ref().unwrap().clone()
|
||||||
|
).await?,
|
||||||
|
#[cfg(not(feature = "sqlite"))]
|
||||||
|
_ => anyhow::bail!("no event bus: NATS_URL not set and sqlite feature not enabled"),
|
||||||
|
}
|
||||||
|
};
|
||||||
(ep, ap_router, ap_service_arc, social_query_arc)
|
(ep, ap_router, ap_service_arc, social_query_arc)
|
||||||
};
|
};
|
||||||
|
|
||||||
#[cfg(not(feature = "federation"))]
|
#[cfg(not(feature = "federation"))]
|
||||||
let (event_publisher_arc, ap_router): (Arc<dyn EventPublisher>, axum::Router) = (
|
let event_publisher_arc: Arc<dyn EventPublisher> = if let Ok(cfg) = nats::NatsConfig::from_env() {
|
||||||
build_event_publisher(handler_ctx, vec![]).await?,
|
tracing::info!("event bus: NATS ({})", cfg.url);
|
||||||
axum::Router::new(),
|
nats::create_publisher(cfg).await?
|
||||||
);
|
} else {
|
||||||
|
tracing::info!("event bus: DB queue");
|
||||||
|
match backend.as_str() {
|
||||||
|
#[cfg(feature = "postgres")]
|
||||||
|
"postgres" => postgres_event_queue::PostgresEventQueue::create_publisher(
|
||||||
|
pg_pool.as_ref().unwrap().clone()
|
||||||
|
).await?,
|
||||||
|
#[cfg(feature = "sqlite")]
|
||||||
|
_ => sqlite_event_queue::SqliteEventQueue::create_publisher(
|
||||||
|
sqlite_pool.as_ref().unwrap().clone()
|
||||||
|
).await?,
|
||||||
|
#[cfg(not(feature = "sqlite"))]
|
||||||
|
_ => anyhow::bail!("no event bus: NATS_URL not set and sqlite feature not enabled"),
|
||||||
|
}
|
||||||
|
};
|
||||||
|
#[cfg(not(feature = "federation"))]
|
||||||
|
let ap_router = axum::Router::new();
|
||||||
|
|
||||||
let app_ctx = AppContext {
|
let app_ctx = AppContext {
|
||||||
movie_repository,
|
movie_repository,
|
||||||
@@ -294,23 +297,6 @@ async fn wire_postgres(database_url: &str) -> anyhow::Result<(
|
|||||||
Ok((pool, movie_repository, review_repository, diary_repository, stats_repository, user_repository))
|
Ok((pool, movie_repository, review_repository, diary_repository, stats_repository, user_repository))
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn build_event_publisher(
|
|
||||||
handler_ctx: AppContext,
|
|
||||||
extra_handlers: Vec<Arc<dyn EventHandler>>,
|
|
||||||
) -> anyhow::Result<Arc<dyn EventPublisher>> {
|
|
||||||
if let Ok(cfg) = nats::NatsConfig::from_env() {
|
|
||||||
tracing::info!("event bus: NATS ({})", cfg.url);
|
|
||||||
return nats::create_publisher(cfg).await;
|
|
||||||
}
|
|
||||||
tracing::info!("event bus: in-memory");
|
|
||||||
let poster_handler = Arc::new(PosterSyncHandler::new(handler_ctx, 3));
|
|
||||||
let mut handlers: Vec<Arc<dyn EventHandler>> = vec![poster_handler];
|
|
||||||
handlers.extend(extra_handlers);
|
|
||||||
let (publisher, consumer) = create_event_channel(EventPublisherConfig::from_env());
|
|
||||||
tokio::spawn(WorkerService::new(Arc::new(consumer), handlers).run());
|
|
||||||
Ok(Arc::new(publisher))
|
|
||||||
}
|
|
||||||
|
|
||||||
fn init_tracing() {
|
fn init_tracing() {
|
||||||
tracing_subscriber::registry()
|
tracing_subscriber::registry()
|
||||||
.with(tracing_subscriber::EnvFilter::new(
|
.with(tracing_subscriber::EnvFilter::new(
|
||||||
|
|||||||
@@ -5,8 +5,8 @@ edition = "2024"
|
|||||||
|
|
||||||
[features]
|
[features]
|
||||||
default = ["sqlite"]
|
default = ["sqlite"]
|
||||||
sqlite = ["dep:sqlite"]
|
sqlite = ["dep:sqlite", "dep:sqlite-event-queue"]
|
||||||
postgres = ["dep:postgres"]
|
postgres = ["dep:postgres", "dep:postgres-event-queue"]
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
domain = { workspace = true }
|
domain = { workspace = true }
|
||||||
@@ -35,3 +35,5 @@ sqlx = { workspace = true }
|
|||||||
# Optional — database backends
|
# Optional — database backends
|
||||||
sqlite = { workspace = true, optional = true }
|
sqlite = { workspace = true, optional = true }
|
||||||
postgres = { workspace = true, optional = true }
|
postgres = { workspace = true, optional = true }
|
||||||
|
sqlite-event-queue = { workspace = true, optional = true }
|
||||||
|
postgres-event-queue = { workspace = true, optional = true }
|
||||||
|
|||||||
@@ -4,7 +4,6 @@ use std::str::FromStr;
|
|||||||
use anyhow::Context;
|
use anyhow::Context;
|
||||||
use application::{config::AppConfig, context::AppContext, event_handlers::PosterSyncHandler, worker::WorkerService};
|
use application::{config::AppConfig, context::AppContext, event_handlers::PosterSyncHandler, worker::WorkerService};
|
||||||
use auth::{Argon2PasswordHasher, AuthConfig, JwtAuthService};
|
use auth::{Argon2PasswordHasher, AuthConfig, JwtAuthService};
|
||||||
use event_publisher::{EventPublisherConfig, create_event_channel};
|
|
||||||
use export::ExportAdapter;
|
use export::ExportAdapter;
|
||||||
use metadata::MetadataClientImpl;
|
use metadata::MetadataClientImpl;
|
||||||
use poster_fetcher::{PosterFetcherConfig, ReqwestPosterFetcher};
|
use poster_fetcher::{PosterFetcherConfig, ReqwestPosterFetcher};
|
||||||
@@ -52,18 +51,25 @@ async fn main() -> anyhow::Result<()> {
|
|||||||
let auth_service: Arc<dyn AuthService> = Arc::new(JwtAuthService::new(auth_config));
|
let auth_service: Arc<dyn AuthService> = Arc::new(JwtAuthService::new(auth_config));
|
||||||
let password_hasher: Arc<dyn PasswordHasher> = Arc::new(Argon2PasswordHasher);
|
let password_hasher: Arc<dyn PasswordHasher> = Arc::new(Argon2PasswordHasher);
|
||||||
|
|
||||||
|
#[cfg(feature = "sqlite")]
|
||||||
|
let mut sqlite_pool: Option<sqlx::SqlitePool> = None;
|
||||||
|
#[cfg(feature = "postgres")]
|
||||||
|
let mut pg_pool: Option<sqlx::PgPool> = None;
|
||||||
|
|
||||||
let (movie_repository, review_repository, diary_repository, stats_repository, user_repository):
|
let (movie_repository, review_repository, diary_repository, stats_repository, user_repository):
|
||||||
(Arc<dyn MovieRepository>, Arc<dyn ReviewRepository>, Arc<dyn DiaryRepository>,
|
(Arc<dyn MovieRepository>, Arc<dyn ReviewRepository>, Arc<dyn DiaryRepository>,
|
||||||
Arc<dyn StatsRepository>, Arc<dyn UserRepository>) =
|
Arc<dyn StatsRepository>, Arc<dyn UserRepository>) =
|
||||||
match backend.as_str() {
|
match backend.as_str() {
|
||||||
#[cfg(feature = "postgres")]
|
#[cfg(feature = "postgres")]
|
||||||
"postgres" => {
|
"postgres" => {
|
||||||
let (_, m, r, d, s, u) = wire_postgres(&database_url).await?;
|
let (pool, m, r, d, s, u) = wire_postgres(&database_url).await?;
|
||||||
|
pg_pool = Some(pool);
|
||||||
(m, r, d, s, u)
|
(m, r, d, s, u)
|
||||||
}
|
}
|
||||||
#[cfg(feature = "sqlite")]
|
#[cfg(feature = "sqlite")]
|
||||||
_ => {
|
_ => {
|
||||||
let (_, m, r, d, s, u) = wire_sqlite(&database_url).await?;
|
let (pool, m, r, d, s, u) = wire_sqlite(&database_url).await?;
|
||||||
|
sqlite_pool = Some(pool);
|
||||||
(m, r, d, s, u)
|
(m, r, d, s, u)
|
||||||
}
|
}
|
||||||
#[cfg(not(feature = "sqlite"))]
|
#[cfg(not(feature = "sqlite"))]
|
||||||
@@ -79,12 +85,19 @@ async fn main() -> anyhow::Result<()> {
|
|||||||
nats::create_channel(cfg).await?
|
nats::create_channel(cfg).await?
|
||||||
}
|
}
|
||||||
Err(_) => {
|
Err(_) => {
|
||||||
tracing::info!("event bus: in-memory channel (NATS_URL not set)");
|
tracing::info!("event bus: DB queue");
|
||||||
let (publisher, consumer) = create_event_channel(EventPublisherConfig::from_env());
|
match backend.as_str() {
|
||||||
(
|
#[cfg(feature = "postgres")]
|
||||||
Arc::new(publisher) as Arc<dyn domain::ports::EventPublisher>,
|
"postgres" => postgres_event_queue::PostgresEventQueue::create_channel(
|
||||||
Arc::new(consumer) as Arc<dyn domain::ports::EventConsumer>,
|
pg_pool.unwrap()
|
||||||
)
|
).await?,
|
||||||
|
#[cfg(feature = "sqlite")]
|
||||||
|
_ => sqlite_event_queue::SqliteEventQueue::create_channel(
|
||||||
|
sqlite_pool.unwrap()
|
||||||
|
).await?,
|
||||||
|
#[cfg(not(feature = "sqlite"))]
|
||||||
|
_ => anyhow::bail!("no event bus: NATS_URL not set and sqlite feature not enabled"),
|
||||||
|
}
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user