diff --git a/Cargo.lock b/Cargo.lock index a526cd4..3b8a889 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1625,6 +1625,17 @@ dependencies = [ "pin-project-lite", ] +[[package]] +name = "event-payload" +version = "0.1.0" +dependencies = [ + "anyhow", + "chrono", + "domain", + "serde", + "uuid", +] + [[package]] name = "event-publisher" version = "0.1.0" @@ -2829,6 +2840,7 @@ dependencies = [ "async-trait", "chrono", "domain", + "event-payload", "futures", "serde", "serde_json", @@ -3373,6 +3385,7 @@ dependencies = [ name = "postgres" version = "0.1.0" dependencies = [ + "anyhow", "async-trait", "chrono", "domain", @@ -3390,6 +3403,7 @@ dependencies = [ "async-trait", "chrono", "domain", + "event-payload", "futures", "serde", "serde_json", @@ -3454,7 +3468,6 @@ dependencies = [ "doc", "domain", "dotenvy", - "event-publisher", "export", "http-body-util", "infer", @@ -4532,6 +4545,7 @@ dependencies = [ "async-trait", "chrono", "domain", + "event-payload", "futures", "serde", "serde_json", @@ -6242,7 +6256,6 @@ dependencies = [ "chrono", "domain", "dotenvy", - "event-publisher", "export", "futures", "metadata", diff --git a/Cargo.toml b/Cargo.toml index 6f982a4..995bc77 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,6 +16,7 @@ members = [ "crates/adapters/activitypub", "crates/adapters/activitypub-base", "crates/adapters/export", + "crates/adapters/event-payload", "crates/adapters/nats", "crates/application", "crates/domain", @@ -67,6 +68,7 @@ template-askama = { path = "crates/adapters/template-askama" } activitypub = { path = "crates/adapters/activitypub" } activitypub-base = { path = "crates/adapters/activitypub-base" } doc = { path = "crates/doc" } +event-payload = { path = "crates/adapters/event-payload" } nats = { path = "crates/adapters/nats" } sqlite-event-queue = { path = "crates/adapters/sqlite-event-queue" } postgres-event-queue = { path = "crates/adapters/postgres-event-queue" } diff --git a/Dockerfile b/Dockerfile index 0512597..20c8aa2 100644 --- a/Dockerfile +++ b/Dockerfile @@ -10,6 +10,7 @@ COPY .sqlx ./.sqlx COPY crates/adapters/activitypub/Cargo.toml crates/adapters/activitypub/Cargo.toml COPY crates/adapters/activitypub-base/Cargo.toml crates/adapters/activitypub-base/Cargo.toml COPY crates/adapters/auth/Cargo.toml crates/adapters/auth/Cargo.toml +COPY crates/adapters/event-payload/Cargo.toml crates/adapters/event-payload/Cargo.toml COPY crates/adapters/event-publisher/Cargo.toml crates/adapters/event-publisher/Cargo.toml COPY crates/adapters/nats/Cargo.toml crates/adapters/nats/Cargo.toml COPY crates/adapters/metadata/Cargo.toml crates/adapters/metadata/Cargo.toml diff --git a/crates/adapters/event-payload/Cargo.toml b/crates/adapters/event-payload/Cargo.toml new file mode 100644 index 0000000..c56f6ff --- /dev/null +++ b/crates/adapters/event-payload/Cargo.toml @@ -0,0 +1,11 @@ +[package] +name = "event-payload" +version = "0.1.0" +edition = "2024" + +[dependencies] +domain = { workspace = true } +serde = { workspace = true } +chrono = { workspace = true } +uuid = { workspace = true } +anyhow = { workspace = true } diff --git a/crates/adapters/event-payload/src/lib.rs b/crates/adapters/event-payload/src/lib.rs new file mode 100644 index 0000000..97455f7 --- /dev/null +++ b/crates/adapters/event-payload/src/lib.rs @@ -0,0 +1,189 @@ +use chrono::NaiveDateTime; +use domain::{ + errors::DomainError, + events::DomainEvent, + value_objects::{ExternalMetadataId, MovieId, Rating, ReviewId, UserId}, +}; +use serde::{Deserialize, Serialize}; +use uuid::Uuid; + +#[derive(Debug, Serialize, Deserialize, PartialEq)] +#[serde(tag = "type", content = "data")] +pub enum EventPayload { + ReviewLogged { + review_id: String, + movie_id: String, + user_id: String, + rating: u8, + watched_at: i64, + }, + ReviewUpdated { + review_id: String, + movie_id: String, + user_id: String, + rating: u8, + watched_at: i64, + }, + MovieDiscovered { + movie_id: String, + external_metadata_id: String, + }, +} + +impl EventPayload { + pub fn event_type(&self) -> &'static str { + match self { + EventPayload::ReviewLogged { .. } => "ReviewLogged", + EventPayload::ReviewUpdated { .. } => "ReviewUpdated", + EventPayload::MovieDiscovered { .. } => "MovieDiscovered", + } + } +} + +fn parse_uuid(s: &str, field: &str) -> Result { + Uuid::parse_str(s) + .map_err(|e| DomainError::InfrastructureError(format!("{field}: {e}"))) +} + +fn parse_ts(ts: i64) -> Result { + chrono::DateTime::from_timestamp(ts, 0) + .map(|dt| dt.naive_utc()) + .ok_or_else(|| DomainError::InfrastructureError(format!("invalid timestamp: {ts}"))) +} + +impl From<&DomainEvent> for EventPayload { + fn from(event: &DomainEvent) -> Self { + match event { + DomainEvent::ReviewLogged { review_id, movie_id, user_id, rating, watched_at } => { + EventPayload::ReviewLogged { + review_id: review_id.value().to_string(), + movie_id: movie_id.value().to_string(), + user_id: user_id.value().to_string(), + rating: rating.value(), + watched_at: watched_at.and_utc().timestamp(), + } + } + DomainEvent::ReviewUpdated { review_id, movie_id, user_id, rating, watched_at } => { + EventPayload::ReviewUpdated { + review_id: review_id.value().to_string(), + movie_id: movie_id.value().to_string(), + user_id: user_id.value().to_string(), + rating: rating.value(), + watched_at: watched_at.and_utc().timestamp(), + } + } + DomainEvent::MovieDiscovered { movie_id, external_metadata_id } => { + EventPayload::MovieDiscovered { + movie_id: movie_id.value().to_string(), + external_metadata_id: external_metadata_id.value().to_owned(), + } + } + } + } +} + +impl TryFrom for DomainEvent { + type Error = DomainError; + fn try_from(payload: EventPayload) -> Result { + match payload { + EventPayload::ReviewLogged { review_id, movie_id, user_id, rating, watched_at } => { + Ok(DomainEvent::ReviewLogged { + review_id: ReviewId::from_uuid(parse_uuid(&review_id, "review_id")?), + movie_id: MovieId::from_uuid(parse_uuid(&movie_id, "movie_id")?), + user_id: UserId::from_uuid(parse_uuid(&user_id, "user_id")?), + rating: Rating::new(rating)?, + watched_at: parse_ts(watched_at)?, + }) + } + EventPayload::ReviewUpdated { review_id, movie_id, user_id, rating, watched_at } => { + Ok(DomainEvent::ReviewUpdated { + review_id: ReviewId::from_uuid(parse_uuid(&review_id, "review_id")?), + movie_id: MovieId::from_uuid(parse_uuid(&movie_id, "movie_id")?), + user_id: UserId::from_uuid(parse_uuid(&user_id, "user_id")?), + rating: Rating::new(rating)?, + watched_at: parse_ts(watched_at)?, + }) + } + EventPayload::MovieDiscovered { movie_id, external_metadata_id } => { + Ok(DomainEvent::MovieDiscovered { + movie_id: MovieId::from_uuid(parse_uuid(&movie_id, "movie_id")?), + external_metadata_id: ExternalMetadataId::new(external_metadata_id)?, + }) + } + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + fn fixed_dt() -> NaiveDateTime { + chrono::DateTime::from_timestamp(1_700_000_000, 0).unwrap().naive_utc() + } + + fn review_logged() -> DomainEvent { + DomainEvent::ReviewLogged { + review_id: ReviewId::from_uuid(Uuid::new_v4()), + movie_id: MovieId::from_uuid(Uuid::new_v4()), + user_id: UserId::from_uuid(Uuid::new_v4()), + rating: Rating::new(4).unwrap(), + watched_at: fixed_dt(), + } + } + + fn review_updated() -> DomainEvent { + DomainEvent::ReviewUpdated { + review_id: ReviewId::from_uuid(Uuid::new_v4()), + movie_id: MovieId::from_uuid(Uuid::new_v4()), + user_id: UserId::from_uuid(Uuid::new_v4()), + rating: Rating::new(3).unwrap(), + watched_at: fixed_dt(), + } + } + + fn movie_discovered() -> DomainEvent { + DomainEvent::MovieDiscovered { + movie_id: MovieId::from_uuid(Uuid::new_v4()), + external_metadata_id: ExternalMetadataId::new("tt1234567".into()).unwrap(), + } + } + + fn round_trip(event: DomainEvent) { + let payload = EventPayload::from(&event); + let json = serde_json::to_string(&payload).expect("serialize"); + let back: EventPayload = serde_json::from_str(&json).expect("deserialize"); + let recovered = DomainEvent::try_from(back).expect("try_from"); + assert_eq!(EventPayload::from(&event), EventPayload::from(&recovered)); + } + + #[test] + fn round_trip_review_logged() { + round_trip(review_logged()); + } + + #[test] + fn round_trip_review_updated() { + round_trip(review_updated()); + } + + #[test] + fn round_trip_movie_discovered() { + round_trip(movie_discovered()); + } + + #[test] + fn serialized_format_is_tagged() { + let payload = EventPayload::from(&movie_discovered()); + let json = serde_json::to_string(&payload).unwrap(); + assert!(json.contains(r#""type":"MovieDiscovered""#)); + assert!(json.contains(r#""data":"#)); + } + + #[test] + fn event_type_strings() { + assert_eq!(EventPayload::from(&review_logged()).event_type(), "ReviewLogged"); + assert_eq!(EventPayload::from(&review_updated()).event_type(), "ReviewUpdated"); + assert_eq!(EventPayload::from(&movie_discovered()).event_type(), "MovieDiscovered"); + } +} diff --git a/crates/adapters/nats/Cargo.toml b/crates/adapters/nats/Cargo.toml index c9a5fec..086d788 100644 --- a/crates/adapters/nats/Cargo.toml +++ b/crates/adapters/nats/Cargo.toml @@ -7,6 +7,7 @@ edition = "2024" async-nats = "0.48.0" domain = { workspace = true } +event-payload = { workspace = true } async-trait = { workspace = true } anyhow = { workspace = true } thiserror = { workspace = true } diff --git a/crates/adapters/nats/src/payload.rs b/crates/adapters/nats/src/payload.rs index dab12bf..2ca19ec 100644 --- a/crates/adapters/nats/src/payload.rs +++ b/crates/adapters/nats/src/payload.rs @@ -1,172 +1 @@ -use chrono::NaiveDateTime; -use domain::{ - errors::DomainError, - events::DomainEvent, - value_objects::{ExternalMetadataId, MovieId, Rating, ReviewId, UserId}, -}; -use serde::{Deserialize, Serialize}; -use uuid::Uuid; - -#[derive(Debug, Serialize, Deserialize, PartialEq)] -#[serde(tag = "type", content = "data")] -pub enum NatsEventPayload { - ReviewLogged { - review_id: String, - movie_id: String, - user_id: String, - rating: u8, - watched_at: i64, - }, - ReviewUpdated { - review_id: String, - movie_id: String, - user_id: String, - rating: u8, - watched_at: i64, - }, - MovieDiscovered { - movie_id: String, - external_metadata_id: String, - }, -} - -fn parse_uuid(s: &str, field: &str) -> Result { - Uuid::parse_str(s) - .map_err(|e| DomainError::InfrastructureError(format!("{field}: {e}"))) -} - -fn parse_ts(ts: i64) -> Result { - chrono::DateTime::from_timestamp(ts, 0) - .map(|dt| dt.naive_utc()) - .ok_or_else(|| DomainError::InfrastructureError(format!("invalid timestamp: {ts}"))) -} - -impl From<&DomainEvent> for NatsEventPayload { - fn from(event: &DomainEvent) -> Self { - match event { - DomainEvent::ReviewLogged { review_id, movie_id, user_id, rating, watched_at } => { - NatsEventPayload::ReviewLogged { - review_id: review_id.value().to_string(), - movie_id: movie_id.value().to_string(), - user_id: user_id.value().to_string(), - rating: rating.value(), - watched_at: watched_at.and_utc().timestamp(), - } - } - DomainEvent::ReviewUpdated { review_id, movie_id, user_id, rating, watched_at } => { - NatsEventPayload::ReviewUpdated { - review_id: review_id.value().to_string(), - movie_id: movie_id.value().to_string(), - user_id: user_id.value().to_string(), - rating: rating.value(), - watched_at: watched_at.and_utc().timestamp(), - } - } - DomainEvent::MovieDiscovered { movie_id, external_metadata_id } => { - NatsEventPayload::MovieDiscovered { - movie_id: movie_id.value().to_string(), - external_metadata_id: external_metadata_id.value().to_owned(), - } - } - } - } -} - -impl TryFrom for DomainEvent { - type Error = DomainError; - fn try_from(payload: NatsEventPayload) -> Result { - match payload { - NatsEventPayload::ReviewLogged { review_id, movie_id, user_id, rating, watched_at } => { - Ok(DomainEvent::ReviewLogged { - review_id: ReviewId::from_uuid(parse_uuid(&review_id, "review_id")?), - movie_id: MovieId::from_uuid(parse_uuid(&movie_id, "movie_id")?), - user_id: UserId::from_uuid(parse_uuid(&user_id, "user_id")?), - rating: Rating::new(rating)?, - watched_at: parse_ts(watched_at)?, - }) - } - NatsEventPayload::ReviewUpdated { review_id, movie_id, user_id, rating, watched_at } => { - Ok(DomainEvent::ReviewUpdated { - review_id: ReviewId::from_uuid(parse_uuid(&review_id, "review_id")?), - movie_id: MovieId::from_uuid(parse_uuid(&movie_id, "movie_id")?), - user_id: UserId::from_uuid(parse_uuid(&user_id, "user_id")?), - rating: Rating::new(rating)?, - watched_at: parse_ts(watched_at)?, - }) - } - NatsEventPayload::MovieDiscovered { movie_id, external_metadata_id } => { - Ok(DomainEvent::MovieDiscovered { - movie_id: MovieId::from_uuid(parse_uuid(&movie_id, "movie_id")?), - external_metadata_id: ExternalMetadataId::new(external_metadata_id)?, - }) - } - } - } -} - -#[cfg(test)] -mod tests { - use super::*; - - fn fixed_dt() -> NaiveDateTime { - chrono::DateTime::from_timestamp(1_700_000_000, 0).unwrap().naive_utc() - } - - fn review_logged() -> DomainEvent { - DomainEvent::ReviewLogged { - review_id: ReviewId::from_uuid(Uuid::new_v4()), - movie_id: MovieId::from_uuid(Uuid::new_v4()), - user_id: UserId::from_uuid(Uuid::new_v4()), - rating: Rating::new(4).unwrap(), - watched_at: fixed_dt(), - } - } - - fn review_updated() -> DomainEvent { - DomainEvent::ReviewUpdated { - review_id: ReviewId::from_uuid(Uuid::new_v4()), - movie_id: MovieId::from_uuid(Uuid::new_v4()), - user_id: UserId::from_uuid(Uuid::new_v4()), - rating: Rating::new(3).unwrap(), - watched_at: fixed_dt(), - } - } - - fn movie_discovered() -> DomainEvent { - DomainEvent::MovieDiscovered { - movie_id: MovieId::from_uuid(Uuid::new_v4()), - external_metadata_id: ExternalMetadataId::new("tt1234567".into()).unwrap(), - } - } - - fn round_trip(event: DomainEvent) { - let payload = NatsEventPayload::from(&event); - let json = serde_json::to_string(&payload).expect("serialize"); - let back: NatsEventPayload = serde_json::from_str(&json).expect("deserialize"); - let recovered = DomainEvent::try_from(back).expect("try_from"); - assert_eq!(NatsEventPayload::from(&event), NatsEventPayload::from(&recovered)); - } - - #[test] - fn round_trip_review_logged() { - round_trip(review_logged()); - } - - #[test] - fn round_trip_review_updated() { - round_trip(review_updated()); - } - - #[test] - fn round_trip_movie_discovered() { - round_trip(movie_discovered()); - } - - #[test] - fn serialized_format_is_tagged() { - let payload = NatsEventPayload::from(&movie_discovered()); - let json = serde_json::to_string(&payload).unwrap(); - assert!(json.contains(r#""type":"MovieDiscovered""#)); - assert!(json.contains(r#""data":"#)); - } -} +pub use event_payload::EventPayload as NatsEventPayload; diff --git a/crates/adapters/postgres-event-queue/Cargo.toml b/crates/adapters/postgres-event-queue/Cargo.toml index 1bbb445..d1904c2 100644 --- a/crates/adapters/postgres-event-queue/Cargo.toml +++ b/crates/adapters/postgres-event-queue/Cargo.toml @@ -6,6 +6,7 @@ edition = "2024" [dependencies] sqlx = { version = "0.8.6", features = ["runtime-tokio-rustls", "postgres", "macros", "chrono"] } domain = { workspace = true } +event-payload = { workspace = true } anyhow = { workspace = true } async-trait = { workspace = true } serde = { workspace = true } diff --git a/crates/adapters/postgres-event-queue/src/payload.rs b/crates/adapters/postgres-event-queue/src/payload.rs index 61ed07f..6df07ad 100644 --- a/crates/adapters/postgres-event-queue/src/payload.rs +++ b/crates/adapters/postgres-event-queue/src/payload.rs @@ -1,189 +1 @@ -use chrono::NaiveDateTime; -use domain::{ - errors::DomainError, - events::DomainEvent, - value_objects::{ExternalMetadataId, MovieId, Rating, ReviewId, UserId}, -}; -use serde::{Deserialize, Serialize}; -use uuid::Uuid; - -#[derive(Debug, Serialize, Deserialize, PartialEq)] -#[serde(tag = "type", content = "data")] -pub enum DbEventPayload { - ReviewLogged { - review_id: String, - movie_id: String, - user_id: String, - rating: u8, - watched_at: i64, - }, - ReviewUpdated { - review_id: String, - movie_id: String, - user_id: String, - rating: u8, - watched_at: i64, - }, - MovieDiscovered { - movie_id: String, - external_metadata_id: String, - }, -} - -impl DbEventPayload { - pub fn event_type(&self) -> &'static str { - match self { - DbEventPayload::ReviewLogged { .. } => "ReviewLogged", - DbEventPayload::ReviewUpdated { .. } => "ReviewUpdated", - DbEventPayload::MovieDiscovered { .. } => "MovieDiscovered", - } - } -} - -fn parse_uuid(s: &str, field: &str) -> Result { - Uuid::parse_str(s) - .map_err(|e| DomainError::InfrastructureError(format!("{field}: {e}"))) -} - -fn parse_ts(ts: i64) -> Result { - chrono::DateTime::from_timestamp(ts, 0) - .map(|dt| dt.naive_utc()) - .ok_or_else(|| DomainError::InfrastructureError(format!("invalid timestamp: {ts}"))) -} - -impl From<&DomainEvent> for DbEventPayload { - fn from(event: &DomainEvent) -> Self { - match event { - DomainEvent::ReviewLogged { review_id, movie_id, user_id, rating, watched_at } => { - DbEventPayload::ReviewLogged { - review_id: review_id.value().to_string(), - movie_id: movie_id.value().to_string(), - user_id: user_id.value().to_string(), - rating: rating.value(), - watched_at: watched_at.and_utc().timestamp(), - } - } - DomainEvent::ReviewUpdated { review_id, movie_id, user_id, rating, watched_at } => { - DbEventPayload::ReviewUpdated { - review_id: review_id.value().to_string(), - movie_id: movie_id.value().to_string(), - user_id: user_id.value().to_string(), - rating: rating.value(), - watched_at: watched_at.and_utc().timestamp(), - } - } - DomainEvent::MovieDiscovered { movie_id, external_metadata_id } => { - DbEventPayload::MovieDiscovered { - movie_id: movie_id.value().to_string(), - external_metadata_id: external_metadata_id.value().to_owned(), - } - } - } - } -} - -impl TryFrom for DomainEvent { - type Error = DomainError; - fn try_from(payload: DbEventPayload) -> Result { - match payload { - DbEventPayload::ReviewLogged { review_id, movie_id, user_id, rating, watched_at } => { - Ok(DomainEvent::ReviewLogged { - review_id: ReviewId::from_uuid(parse_uuid(&review_id, "review_id")?), - movie_id: MovieId::from_uuid(parse_uuid(&movie_id, "movie_id")?), - user_id: UserId::from_uuid(parse_uuid(&user_id, "user_id")?), - rating: Rating::new(rating)?, - watched_at: parse_ts(watched_at)?, - }) - } - DbEventPayload::ReviewUpdated { review_id, movie_id, user_id, rating, watched_at } => { - Ok(DomainEvent::ReviewUpdated { - review_id: ReviewId::from_uuid(parse_uuid(&review_id, "review_id")?), - movie_id: MovieId::from_uuid(parse_uuid(&movie_id, "movie_id")?), - user_id: UserId::from_uuid(parse_uuid(&user_id, "user_id")?), - rating: Rating::new(rating)?, - watched_at: parse_ts(watched_at)?, - }) - } - DbEventPayload::MovieDiscovered { movie_id, external_metadata_id } => { - Ok(DomainEvent::MovieDiscovered { - movie_id: MovieId::from_uuid(parse_uuid(&movie_id, "movie_id")?), - external_metadata_id: ExternalMetadataId::new(external_metadata_id)?, - }) - } - } - } -} - -#[cfg(test)] -mod tests { - use super::*; - - fn fixed_dt() -> NaiveDateTime { - chrono::DateTime::from_timestamp(1_700_000_000, 0).unwrap().naive_utc() - } - - fn review_logged() -> DomainEvent { - DomainEvent::ReviewLogged { - review_id: ReviewId::from_uuid(Uuid::new_v4()), - movie_id: MovieId::from_uuid(Uuid::new_v4()), - user_id: UserId::from_uuid(Uuid::new_v4()), - rating: Rating::new(4).unwrap(), - watched_at: fixed_dt(), - } - } - - fn review_updated() -> DomainEvent { - DomainEvent::ReviewUpdated { - review_id: ReviewId::from_uuid(Uuid::new_v4()), - movie_id: MovieId::from_uuid(Uuid::new_v4()), - user_id: UserId::from_uuid(Uuid::new_v4()), - rating: Rating::new(3).unwrap(), - watched_at: fixed_dt(), - } - } - - fn movie_discovered() -> DomainEvent { - DomainEvent::MovieDiscovered { - movie_id: MovieId::from_uuid(Uuid::new_v4()), - external_metadata_id: ExternalMetadataId::new("tt1234567".into()).unwrap(), - } - } - - fn round_trip(event: DomainEvent) { - let payload = DbEventPayload::from(&event); - let json = serde_json::to_string(&payload).expect("serialize"); - let back: DbEventPayload = serde_json::from_str(&json).expect("deserialize"); - let recovered = DomainEvent::try_from(back).expect("try_from"); - assert_eq!(DbEventPayload::from(&event), DbEventPayload::from(&recovered)); - } - - #[test] - fn round_trip_review_logged() { - round_trip(review_logged()); - } - - #[test] - fn round_trip_review_updated() { - round_trip(review_updated()); - } - - #[test] - fn round_trip_movie_discovered() { - round_trip(movie_discovered()); - } - - #[test] - fn serialized_format_is_tagged() { - let payload = DbEventPayload::from(&movie_discovered()); - let json = serde_json::to_string(&payload).unwrap(); - assert!(json.contains(r#""type":"MovieDiscovered""#)); - assert!(json.contains(r#""data":"#)); - } - - #[test] - fn event_type_strings() { - assert_eq!(DbEventPayload::from(&review_logged()).event_type(), "ReviewLogged"); - assert_eq!(DbEventPayload::from(&review_updated()).event_type(), "ReviewUpdated"); - assert_eq!(DbEventPayload::from(&movie_discovered()).event_type(), "MovieDiscovered"); - } -} +pub use event_payload::EventPayload as DbEventPayload; diff --git a/crates/adapters/postgres/Cargo.toml b/crates/adapters/postgres/Cargo.toml index afd4c74..b571287 100644 --- a/crates/adapters/postgres/Cargo.toml +++ b/crates/adapters/postgres/Cargo.toml @@ -12,6 +12,7 @@ sqlx = { version = "0.8.6", features = [ "chrono", ] } domain = { workspace = true } +anyhow = { workspace = true } uuid = { workspace = true } chrono = { workspace = true } tracing = { workspace = true } diff --git a/crates/adapters/postgres/src/lib.rs b/crates/adapters/postgres/src/lib.rs index b58ad6e..6f023b2 100644 --- a/crates/adapters/postgres/src/lib.rs +++ b/crates/adapters/postgres/src/lib.rs @@ -767,3 +767,33 @@ impl StatsRepository for PostgresRepository { }) } } + +pub async fn wire(database_url: &str) -> anyhow::Result<( + sqlx::PgPool, + std::sync::Arc, + std::sync::Arc, + std::sync::Arc, + std::sync::Arc, + std::sync::Arc, +)> { + use anyhow::Context; + + let pool = sqlx::PgPool::connect(database_url) + .await + .context("Failed to connect to PostgreSQL database")?; + + let repo = std::sync::Arc::new(PostgresRepository::new(pool.clone())); + repo.migrate() + .await + .map_err(|e| anyhow::anyhow!("{e}")) + .context("Database migration failed")?; + + Ok(( + pool.clone(), + std::sync::Arc::clone(&repo) as _, + std::sync::Arc::clone(&repo) as _, + std::sync::Arc::clone(&repo) as _, + std::sync::Arc::clone(&repo) as _, + std::sync::Arc::new(PostgresUserRepository::new(pool)) as _, + )) +} diff --git a/crates/adapters/sqlite-event-queue/Cargo.toml b/crates/adapters/sqlite-event-queue/Cargo.toml index 175b6dc..a0b20d4 100644 --- a/crates/adapters/sqlite-event-queue/Cargo.toml +++ b/crates/adapters/sqlite-event-queue/Cargo.toml @@ -6,6 +6,7 @@ edition = "2024" [dependencies] sqlx = { version = "0.8.6", features = ["runtime-tokio-rustls", "sqlite", "macros", "chrono"] } domain = { workspace = true } +event-payload = { workspace = true } anyhow = { workspace = true } async-trait = { workspace = true } serde = { workspace = true } diff --git a/crates/adapters/sqlite-event-queue/src/payload.rs b/crates/adapters/sqlite-event-queue/src/payload.rs index 61ed07f..6df07ad 100644 --- a/crates/adapters/sqlite-event-queue/src/payload.rs +++ b/crates/adapters/sqlite-event-queue/src/payload.rs @@ -1,189 +1 @@ -use chrono::NaiveDateTime; -use domain::{ - errors::DomainError, - events::DomainEvent, - value_objects::{ExternalMetadataId, MovieId, Rating, ReviewId, UserId}, -}; -use serde::{Deserialize, Serialize}; -use uuid::Uuid; - -#[derive(Debug, Serialize, Deserialize, PartialEq)] -#[serde(tag = "type", content = "data")] -pub enum DbEventPayload { - ReviewLogged { - review_id: String, - movie_id: String, - user_id: String, - rating: u8, - watched_at: i64, - }, - ReviewUpdated { - review_id: String, - movie_id: String, - user_id: String, - rating: u8, - watched_at: i64, - }, - MovieDiscovered { - movie_id: String, - external_metadata_id: String, - }, -} - -impl DbEventPayload { - pub fn event_type(&self) -> &'static str { - match self { - DbEventPayload::ReviewLogged { .. } => "ReviewLogged", - DbEventPayload::ReviewUpdated { .. } => "ReviewUpdated", - DbEventPayload::MovieDiscovered { .. } => "MovieDiscovered", - } - } -} - -fn parse_uuid(s: &str, field: &str) -> Result { - Uuid::parse_str(s) - .map_err(|e| DomainError::InfrastructureError(format!("{field}: {e}"))) -} - -fn parse_ts(ts: i64) -> Result { - chrono::DateTime::from_timestamp(ts, 0) - .map(|dt| dt.naive_utc()) - .ok_or_else(|| DomainError::InfrastructureError(format!("invalid timestamp: {ts}"))) -} - -impl From<&DomainEvent> for DbEventPayload { - fn from(event: &DomainEvent) -> Self { - match event { - DomainEvent::ReviewLogged { review_id, movie_id, user_id, rating, watched_at } => { - DbEventPayload::ReviewLogged { - review_id: review_id.value().to_string(), - movie_id: movie_id.value().to_string(), - user_id: user_id.value().to_string(), - rating: rating.value(), - watched_at: watched_at.and_utc().timestamp(), - } - } - DomainEvent::ReviewUpdated { review_id, movie_id, user_id, rating, watched_at } => { - DbEventPayload::ReviewUpdated { - review_id: review_id.value().to_string(), - movie_id: movie_id.value().to_string(), - user_id: user_id.value().to_string(), - rating: rating.value(), - watched_at: watched_at.and_utc().timestamp(), - } - } - DomainEvent::MovieDiscovered { movie_id, external_metadata_id } => { - DbEventPayload::MovieDiscovered { - movie_id: movie_id.value().to_string(), - external_metadata_id: external_metadata_id.value().to_owned(), - } - } - } - } -} - -impl TryFrom for DomainEvent { - type Error = DomainError; - fn try_from(payload: DbEventPayload) -> Result { - match payload { - DbEventPayload::ReviewLogged { review_id, movie_id, user_id, rating, watched_at } => { - Ok(DomainEvent::ReviewLogged { - review_id: ReviewId::from_uuid(parse_uuid(&review_id, "review_id")?), - movie_id: MovieId::from_uuid(parse_uuid(&movie_id, "movie_id")?), - user_id: UserId::from_uuid(parse_uuid(&user_id, "user_id")?), - rating: Rating::new(rating)?, - watched_at: parse_ts(watched_at)?, - }) - } - DbEventPayload::ReviewUpdated { review_id, movie_id, user_id, rating, watched_at } => { - Ok(DomainEvent::ReviewUpdated { - review_id: ReviewId::from_uuid(parse_uuid(&review_id, "review_id")?), - movie_id: MovieId::from_uuid(parse_uuid(&movie_id, "movie_id")?), - user_id: UserId::from_uuid(parse_uuid(&user_id, "user_id")?), - rating: Rating::new(rating)?, - watched_at: parse_ts(watched_at)?, - }) - } - DbEventPayload::MovieDiscovered { movie_id, external_metadata_id } => { - Ok(DomainEvent::MovieDiscovered { - movie_id: MovieId::from_uuid(parse_uuid(&movie_id, "movie_id")?), - external_metadata_id: ExternalMetadataId::new(external_metadata_id)?, - }) - } - } - } -} - -#[cfg(test)] -mod tests { - use super::*; - - fn fixed_dt() -> NaiveDateTime { - chrono::DateTime::from_timestamp(1_700_000_000, 0).unwrap().naive_utc() - } - - fn review_logged() -> DomainEvent { - DomainEvent::ReviewLogged { - review_id: ReviewId::from_uuid(Uuid::new_v4()), - movie_id: MovieId::from_uuid(Uuid::new_v4()), - user_id: UserId::from_uuid(Uuid::new_v4()), - rating: Rating::new(4).unwrap(), - watched_at: fixed_dt(), - } - } - - fn review_updated() -> DomainEvent { - DomainEvent::ReviewUpdated { - review_id: ReviewId::from_uuid(Uuid::new_v4()), - movie_id: MovieId::from_uuid(Uuid::new_v4()), - user_id: UserId::from_uuid(Uuid::new_v4()), - rating: Rating::new(3).unwrap(), - watched_at: fixed_dt(), - } - } - - fn movie_discovered() -> DomainEvent { - DomainEvent::MovieDiscovered { - movie_id: MovieId::from_uuid(Uuid::new_v4()), - external_metadata_id: ExternalMetadataId::new("tt1234567".into()).unwrap(), - } - } - - fn round_trip(event: DomainEvent) { - let payload = DbEventPayload::from(&event); - let json = serde_json::to_string(&payload).expect("serialize"); - let back: DbEventPayload = serde_json::from_str(&json).expect("deserialize"); - let recovered = DomainEvent::try_from(back).expect("try_from"); - assert_eq!(DbEventPayload::from(&event), DbEventPayload::from(&recovered)); - } - - #[test] - fn round_trip_review_logged() { - round_trip(review_logged()); - } - - #[test] - fn round_trip_review_updated() { - round_trip(review_updated()); - } - - #[test] - fn round_trip_movie_discovered() { - round_trip(movie_discovered()); - } - - #[test] - fn serialized_format_is_tagged() { - let payload = DbEventPayload::from(&movie_discovered()); - let json = serde_json::to_string(&payload).unwrap(); - assert!(json.contains(r#""type":"MovieDiscovered""#)); - assert!(json.contains(r#""data":"#)); - } - - #[test] - fn event_type_strings() { - assert_eq!(DbEventPayload::from(&review_logged()).event_type(), "ReviewLogged"); - assert_eq!(DbEventPayload::from(&review_updated()).event_type(), "ReviewUpdated"); - assert_eq!(DbEventPayload::from(&movie_discovered()).event_type(), "MovieDiscovered"); - } -} +pub use event_payload::EventPayload as DbEventPayload; diff --git a/crates/adapters/sqlite/src/lib.rs b/crates/adapters/sqlite/src/lib.rs index e1fc6bd..fdcdcfb 100644 --- a/crates/adapters/sqlite/src/lib.rs +++ b/crates/adapters/sqlite/src/lib.rs @@ -759,6 +759,43 @@ impl StatsRepository for SqliteMovieRepository { } } +pub async fn wire(database_url: &str) -> anyhow::Result<( + sqlx::SqlitePool, + std::sync::Arc, + std::sync::Arc, + std::sync::Arc, + std::sync::Arc, + std::sync::Arc, +)> { + use std::str::FromStr; + use anyhow::Context; + use sqlx::sqlite::SqliteConnectOptions; + + let opts = SqliteConnectOptions::from_str(database_url) + .context("Invalid DATABASE_URL")? + .create_if_missing(true) + .journal_mode(sqlx::sqlite::SqliteJournalMode::Wal) + .busy_timeout(std::time::Duration::from_secs(5)); + let pool = sqlx::SqlitePool::connect_with(opts) + .await + .context("Failed to connect to SQLite database")?; + + let repo = std::sync::Arc::new(SqliteMovieRepository::new(pool.clone())); + repo.migrate() + .await + .map_err(|e| anyhow::anyhow!("{e}")) + .context("Database migration failed")?; + + Ok(( + pool.clone(), + std::sync::Arc::clone(&repo) as _, + std::sync::Arc::clone(&repo) as _, + std::sync::Arc::clone(&repo) as _, + std::sync::Arc::clone(&repo) as _, + std::sync::Arc::new(SqliteUserRepository::new(pool)) as _, + )) +} + #[cfg(test)] mod feed_filter_tests { use super::*; diff --git a/crates/presentation/Cargo.toml b/crates/presentation/Cargo.toml index 0894edd..1012893 100644 --- a/crates/presentation/Cargo.toml +++ b/crates/presentation/Cargo.toml @@ -49,7 +49,6 @@ metadata = { workspace = true } poster-fetcher = { workspace = true } poster-storage = { workspace = true } template-askama = { workspace = true } -event-publisher = { workspace = true } nats = { workspace = true, optional = true } rss = { workspace = true } export = { workspace = true } diff --git a/crates/presentation/src/main.rs b/crates/presentation/src/main.rs index 8f789c5..5784597 100644 --- a/crates/presentation/src/main.rs +++ b/crates/presentation/src/main.rs @@ -1,18 +1,12 @@ use std::sync::Arc; use anyhow::Context; -use std::str::FromStr; use tokio::net::TcpListener; use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; -#[cfg(feature = "sqlite")] -use sqlite::{SqliteMovieRepository, SqliteUserRepository}; #[cfg(feature = "sqlite-federation")] use sqlite_federation::SqliteFederationRepository; - -#[cfg(feature = "postgres")] -use postgres::{PostgresRepository, PostgresUserRepository}; #[cfg(feature = "postgres-federation")] use postgres_federation::PostgresFederationRepository; @@ -36,9 +30,8 @@ use presentation::{openapi::ApiDoc, routes, state::AppState}; use utoipa::OpenApi as _; use domain::ports::{ - AuthService, DiaryExporter, DiaryRepository, EventPublisher, MetadataClient, - MovieRepository, PasswordHasher, PosterFetcherClient, PosterStorage, ReviewRepository, - StatsRepository, UserRepository, + AuthService, DiaryExporter, EventPublisher, MetadataClient, + PasswordHasher, PosterFetcherClient, PosterStorage, }; #[cfg(not(any(feature = "sqlite", feature = "postgres")))] @@ -91,12 +84,12 @@ async fn wire_dependencies() -> anyhow::Result<(AppState, axum::Router)> { match backend.as_str() { #[cfg(feature = "postgres")] "postgres" => { - let (pool, m, r, d, s, u) = wire_postgres(&database_url).await?; + let (pool, m, r, d, s, u) = postgres::wire(&database_url).await?; (m, r, d, s, u, DbPool::Postgres(pool)) } #[cfg(feature = "sqlite")] _ => { - let (pool, m, r, d, s, u) = wire_sqlite(&database_url).await?; + let (pool, m, r, d, s, u) = sqlite::wire(&database_url).await?; (m, r, d, s, u, DbPool::Sqlite(pool)) } #[cfg(not(feature = "sqlite"))] @@ -230,73 +223,6 @@ async fn wire_dependencies() -> anyhow::Result<(AppState, axum::Router)> { Ok((state, ap_router)) } -#[cfg(feature = "sqlite")] -async fn wire_sqlite(database_url: &str) -> anyhow::Result<( - sqlx::SqlitePool, - Arc, - Arc, - Arc, - Arc, - Arc, -)> { - use sqlx::sqlite::SqliteConnectOptions; - - let opts = SqliteConnectOptions::from_str(database_url) - .context("Invalid DATABASE_URL")? - .create_if_missing(true) - .journal_mode(sqlx::sqlite::SqliteJournalMode::Wal) - .busy_timeout(std::time::Duration::from_secs(5)); - let pool = sqlx::SqlitePool::connect_with(opts) - .await - .context("Failed to connect to SQLite database")?; - - let sqlite_repo = Arc::new(SqliteMovieRepository::new(pool.clone())); - sqlite_repo - .migrate() - .await - .map_err(|e| anyhow::anyhow!("{}", e)) - .context("Database migration failed")?; - - let movie_repository: Arc = Arc::clone(&sqlite_repo) as _; - let review_repository: Arc = Arc::clone(&sqlite_repo) as _; - let diary_repository: Arc = Arc::clone(&sqlite_repo) as _; - let stats_repository: Arc = Arc::clone(&sqlite_repo) as _; - let user_repository: Arc = - Arc::new(SqliteUserRepository::new(pool.clone())); - - Ok((pool, movie_repository, review_repository, diary_repository, stats_repository, user_repository)) -} - -#[cfg(feature = "postgres")] -async fn wire_postgres(database_url: &str) -> anyhow::Result<( - sqlx::PgPool, - Arc, - Arc, - Arc, - Arc, - Arc, -)> { - let pool = sqlx::PgPool::connect(database_url) - .await - .context("Failed to connect to PostgreSQL database")?; - - let pg_repo = Arc::new(PostgresRepository::new(pool.clone())); - pg_repo - .migrate() - .await - .map_err(|e| anyhow::anyhow!("{}", e)) - .context("Database migration failed")?; - - let movie_repository: Arc = Arc::clone(&pg_repo) as _; - let review_repository: Arc = Arc::clone(&pg_repo) as _; - let diary_repository: Arc = Arc::clone(&pg_repo) as _; - let stats_repository: Arc = Arc::clone(&pg_repo) as _; - let user_repository: Arc = - Arc::new(PostgresUserRepository::new(pool.clone())); - - Ok((pool, movie_repository, review_repository, diary_repository, stats_repository, user_repository)) -} - enum DbPool { #[cfg(feature = "sqlite")] Sqlite(sqlx::SqlitePool), diff --git a/crates/worker/Cargo.toml b/crates/worker/Cargo.toml index 66035ae..fee9067 100644 --- a/crates/worker/Cargo.toml +++ b/crates/worker/Cargo.toml @@ -15,7 +15,6 @@ postgres-federation = ["postgres", "dep:postgres-federation", "dep:activitypub", [dependencies] domain = { workspace = true } application = { workspace = true } -event-publisher = { workspace = true } tokio = { workspace = true } anyhow = { workspace = true } thiserror = { workspace = true } diff --git a/crates/worker/src/main.rs b/crates/worker/src/main.rs index 7eb4d8f..b9fa7bf 100644 --- a/crates/worker/src/main.rs +++ b/crates/worker/src/main.rs @@ -1,5 +1,4 @@ use std::sync::Arc; -use std::str::FromStr; use anyhow::Context; use application::{config::AppConfig, context::AppContext, event_handlers::PosterSyncHandler, worker::WorkerService}; @@ -10,12 +9,6 @@ use poster_fetcher::{PosterFetcherConfig, ReqwestPosterFetcher}; use poster_storage::{PosterStorageAdapter, StorageConfig}; use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; -#[cfg(feature = "sqlite")] -use sqlite::{SqliteMovieRepository, SqliteUserRepository}; - -#[cfg(feature = "postgres")] -use postgres::{PostgresRepository, PostgresUserRepository}; - #[cfg(feature = "sqlite-federation")] use sqlite_federation::SqliteFederationRepository; #[cfg(feature = "postgres-federation")] @@ -27,9 +20,8 @@ use activitypub::{ }; use domain::ports::{ - AuthService, DiaryExporter, DiaryRepository, EventHandler, MetadataClient, MovieRepository, - PasswordHasher, PosterFetcherClient, PosterStorage, ReviewRepository, StatsRepository, - UserRepository, + AuthService, DiaryExporter, EventHandler, MetadataClient, + PasswordHasher, PosterFetcherClient, PosterStorage, }; #[cfg(not(any(feature = "sqlite", feature = "postgres")))] @@ -65,12 +57,12 @@ async fn main() -> anyhow::Result<()> { match backend.as_str() { #[cfg(feature = "postgres")] "postgres" => { - let (pool, m, r, d, s, u) = wire_postgres(&database_url).await?; + let (pool, m, r, d, s, u) = postgres::wire(&database_url).await?; (m, r, d, s, u, DbPool::Postgres(pool)) } #[cfg(feature = "sqlite")] _ => { - let (pool, m, r, d, s, u) = wire_sqlite(&database_url).await?; + let (pool, m, r, d, s, u) = sqlite::wire(&database_url).await?; (m, r, d, s, u, DbPool::Sqlite(pool)) } #[cfg(not(feature = "sqlite"))] @@ -125,50 +117,57 @@ async fn main() -> anyhow::Result<()> { config: app_config, }; - let mut handlers: Vec> = vec![Arc::new(PosterSyncHandler::new(ctx, 3))]; + let handlers: Vec> = { + let poster = Arc::new(PosterSyncHandler::new(ctx, 3)) as Arc; - #[cfg(feature = "federation")] - { - let (federation_repo, review_store): ( - Arc, - Arc, - ) = match &db_pool { - #[cfg(feature = "sqlite-federation")] - DbPool::Sqlite(pool) => { - let fed = Arc::new(SqliteFederationRepository::new(pool.clone())); - (Arc::clone(&fed) as _, fed as _) - } - #[cfg(feature = "postgres-federation")] - DbPool::Postgres(pool) => { - let fed = Arc::new(PostgresFederationRepository::new(pool.clone())); - (Arc::clone(&fed) as _, fed as _) - } - }; + #[cfg(not(feature = "federation"))] + { vec![poster] } - let ap_service = Arc::new( - ActivityPubService::new( - federation_repo, - Arc::new(DomainUserRepoAdapter(fed_user_repo)), - Arc::new(ReviewObjectHandler { - movie_repository: Arc::clone(&fed_movie_repo), - diary_repository: fed_diary_repo, - review_store, - base_url: base_url.clone(), - }), - base_url.clone(), - cfg!(debug_assertions), - ) - .await?, - ); + #[cfg(feature = "federation")] + { + let (federation_repo, review_store): ( + Arc, + Arc, + ) = match &db_pool { + #[cfg(feature = "sqlite-federation")] + DbPool::Sqlite(pool) => { + let fed = Arc::new(SqliteFederationRepository::new(pool.clone())); + (Arc::clone(&fed) as _, fed as _) + } + #[cfg(feature = "postgres-federation")] + DbPool::Postgres(pool) => { + let fed = Arc::new(PostgresFederationRepository::new(pool.clone())); + (Arc::clone(&fed) as _, fed as _) + } + }; - handlers.push(Arc::new(ActivityPubEventHandler::new( - ap_service, - fed_movie_repo, - fed_review_repo, - base_url, - ))); - tracing::info!("federation event handler registered"); - } + let ap_service = Arc::new( + ActivityPubService::new( + federation_repo, + Arc::new(DomainUserRepoAdapter(fed_user_repo)), + Arc::new(ReviewObjectHandler { + movie_repository: Arc::clone(&fed_movie_repo), + diary_repository: fed_diary_repo, + review_store, + base_url: base_url.clone(), + }), + base_url.clone(), + cfg!(debug_assertions), + ) + .await?, + ); + + let ap = Arc::new(ActivityPubEventHandler::new( + ap_service, + fed_movie_repo, + fed_review_repo, + base_url, + )) as Arc; + + tracing::info!("federation event handler registered"); + vec![poster, ap] + } + }; let worker = WorkerService::new(consumer_arc, handlers); @@ -218,69 +217,3 @@ fn init_tracing() { .init(); } -#[cfg(feature = "sqlite")] -async fn wire_sqlite(database_url: &str) -> anyhow::Result<( - sqlx::SqlitePool, - Arc, - Arc, - Arc, - Arc, - Arc, -)> { - use sqlx::sqlite::SqliteConnectOptions; - - let opts = SqliteConnectOptions::from_str(database_url) - .context("Invalid DATABASE_URL")? - .create_if_missing(true) - .journal_mode(sqlx::sqlite::SqliteJournalMode::Wal) - .busy_timeout(std::time::Duration::from_secs(5)); - let pool = sqlx::SqlitePool::connect_with(opts) - .await - .context("Failed to connect to SQLite database")?; - - let sqlite_repo = Arc::new(SqliteMovieRepository::new(pool.clone())); - sqlite_repo - .migrate() - .await - .map_err(|e| anyhow::anyhow!("{}", e)) - .context("Database migration failed")?; - - let movie_repository: Arc = Arc::clone(&sqlite_repo) as _; - let review_repository: Arc = Arc::clone(&sqlite_repo) as _; - let diary_repository: Arc = Arc::clone(&sqlite_repo) as _; - let stats_repository: Arc = Arc::clone(&sqlite_repo) as _; - let user_repository: Arc = - Arc::new(SqliteUserRepository::new(pool.clone())); - - Ok((pool, movie_repository, review_repository, diary_repository, stats_repository, user_repository)) -} - -#[cfg(feature = "postgres")] -async fn wire_postgres(database_url: &str) -> anyhow::Result<( - sqlx::PgPool, - Arc, - Arc, - Arc, - Arc, - Arc, -)> { - let pool = sqlx::PgPool::connect(database_url) - .await - .context("Failed to connect to PostgreSQL database")?; - - let pg_repo = Arc::new(PostgresRepository::new(pool.clone())); - pg_repo - .migrate() - .await - .map_err(|e| anyhow::anyhow!("{}", e)) - .context("Database migration failed")?; - - let movie_repository: Arc = Arc::clone(&pg_repo) as _; - let review_repository: Arc = Arc::clone(&pg_repo) as _; - let diary_repository: Arc = Arc::clone(&pg_repo) as _; - let stats_repository: Arc = Arc::clone(&pg_repo) as _; - let user_repository: Arc = - Arc::new(PostgresUserRepository::new(pool.clone())); - - Ok((pool, movie_repository, review_repository, diary_repository, stats_repository, user_repository)) -}