Compare commits
4 Commits
660a8d618d
...
f2761b8e97
| Author | SHA1 | Date | |
|---|---|---|---|
| f2761b8e97 | |||
| 08d0734d03 | |||
| 595460373b | |||
| 3714b6d7a7 |
22
Cargo.lock
generated
22
Cargo.lock
generated
@@ -10,6 +10,7 @@ dependencies = [
|
||||
"activitypub_federation",
|
||||
"anyhow",
|
||||
"async-trait",
|
||||
"axum",
|
||||
"chrono",
|
||||
"domain",
|
||||
"serde",
|
||||
@@ -1625,6 +1626,17 @@ dependencies = [
|
||||
"pin-project-lite",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "event-payload"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"chrono",
|
||||
"domain",
|
||||
"serde",
|
||||
"uuid",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "event-publisher"
|
||||
version = "0.1.0"
|
||||
@@ -2750,6 +2762,7 @@ dependencies = [
|
||||
name = "metadata"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"async-trait",
|
||||
"domain",
|
||||
"reqwest 0.13.3",
|
||||
@@ -2829,6 +2842,7 @@ dependencies = [
|
||||
"async-trait",
|
||||
"chrono",
|
||||
"domain",
|
||||
"event-payload",
|
||||
"futures",
|
||||
"serde",
|
||||
"serde_json",
|
||||
@@ -3373,6 +3387,7 @@ dependencies = [
|
||||
name = "postgres"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"async-trait",
|
||||
"chrono",
|
||||
"domain",
|
||||
@@ -3390,6 +3405,7 @@ dependencies = [
|
||||
"async-trait",
|
||||
"chrono",
|
||||
"domain",
|
||||
"event-payload",
|
||||
"futures",
|
||||
"serde",
|
||||
"serde_json",
|
||||
@@ -3454,7 +3470,6 @@ dependencies = [
|
||||
"doc",
|
||||
"domain",
|
||||
"dotenvy",
|
||||
"event-publisher",
|
||||
"export",
|
||||
"http-body-util",
|
||||
"infer",
|
||||
@@ -4532,6 +4547,7 @@ dependencies = [
|
||||
"async-trait",
|
||||
"chrono",
|
||||
"domain",
|
||||
"event-payload",
|
||||
"futures",
|
||||
"serde",
|
||||
"serde_json",
|
||||
@@ -6234,6 +6250,7 @@ dependencies = [
|
||||
name = "worker"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"activitypub",
|
||||
"anyhow",
|
||||
"application",
|
||||
"async-trait",
|
||||
@@ -6241,7 +6258,6 @@ dependencies = [
|
||||
"chrono",
|
||||
"domain",
|
||||
"dotenvy",
|
||||
"event-publisher",
|
||||
"export",
|
||||
"futures",
|
||||
"metadata",
|
||||
@@ -6250,10 +6266,12 @@ dependencies = [
|
||||
"poster-storage",
|
||||
"postgres",
|
||||
"postgres-event-queue",
|
||||
"postgres-federation",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"sqlite",
|
||||
"sqlite-event-queue",
|
||||
"sqlite-federation",
|
||||
"sqlx",
|
||||
"thiserror 2.0.18",
|
||||
"tokio",
|
||||
|
||||
@@ -16,6 +16,7 @@ members = [
|
||||
"crates/adapters/activitypub",
|
||||
"crates/adapters/activitypub-base",
|
||||
"crates/adapters/export",
|
||||
"crates/adapters/event-payload",
|
||||
"crates/adapters/nats",
|
||||
"crates/application",
|
||||
"crates/domain",
|
||||
@@ -67,6 +68,7 @@ template-askama = { path = "crates/adapters/template-askama" }
|
||||
activitypub = { path = "crates/adapters/activitypub" }
|
||||
activitypub-base = { path = "crates/adapters/activitypub-base" }
|
||||
doc = { path = "crates/doc" }
|
||||
event-payload = { path = "crates/adapters/event-payload" }
|
||||
nats = { path = "crates/adapters/nats" }
|
||||
sqlite-event-queue = { path = "crates/adapters/sqlite-event-queue" }
|
||||
postgres-event-queue = { path = "crates/adapters/postgres-event-queue" }
|
||||
|
||||
@@ -10,6 +10,7 @@ COPY .sqlx ./.sqlx
|
||||
COPY crates/adapters/activitypub/Cargo.toml crates/adapters/activitypub/Cargo.toml
|
||||
COPY crates/adapters/activitypub-base/Cargo.toml crates/adapters/activitypub-base/Cargo.toml
|
||||
COPY crates/adapters/auth/Cargo.toml crates/adapters/auth/Cargo.toml
|
||||
COPY crates/adapters/event-payload/Cargo.toml crates/adapters/event-payload/Cargo.toml
|
||||
COPY crates/adapters/event-publisher/Cargo.toml crates/adapters/event-publisher/Cargo.toml
|
||||
COPY crates/adapters/nats/Cargo.toml crates/adapters/nats/Cargo.toml
|
||||
COPY crates/adapters/metadata/Cargo.toml crates/adapters/metadata/Cargo.toml
|
||||
|
||||
58
README.md
58
README.md
@@ -24,24 +24,28 @@ Hexagonal (Ports & Adapters) with Domain-Driven Design:
|
||||
```
|
||||
domain — pure types and trait definitions, no external deps
|
||||
application — use cases / business logic orchestration
|
||||
presentation — Axum HTTP router, wires all adapters together
|
||||
presentation — Axum HTTP router, composition root for the HTTP process
|
||||
worker — standalone worker binary (event consumer, poster sync, federation)
|
||||
adapters/
|
||||
auth — JWT issuance and validation (Argon2 passwords)
|
||||
sqlite — SQLite repository via sqlx
|
||||
metadata — OMDb HTTP client
|
||||
poster-fetcher — downloads poster images
|
||||
poster-storage — uploads posters to local filesystem or S3-compatible storage
|
||||
template-askama — Askama HTML rendering
|
||||
rss — RSS/Atom feed generation
|
||||
export — CSV and JSON diary serialization
|
||||
sqlite-event-queue — polling event queue backed by SQLite (DB-queue mode)
|
||||
postgres-event-queue — polling event queue backed by PostgreSQL (DB-queue mode)
|
||||
auth — JWT issuance and validation (Argon2 passwords)
|
||||
sqlite — SQLite repository + connection factory
|
||||
postgres — PostgreSQL repository + connection factory
|
||||
metadata — TMDB / OMDb HTTP client
|
||||
poster-fetcher — downloads poster images
|
||||
poster-storage — stores posters on local filesystem or S3-compatible storage
|
||||
template-askama — Askama HTML rendering
|
||||
rss — RSS/Atom feed generation
|
||||
export — CSV and JSON diary serialization
|
||||
event-payload — shared event serialization DTOs (used by all event bus adapters)
|
||||
sqlite-event-queue — durable polling event queue backed by SQLite
|
||||
postgres-event-queue — durable polling event queue backed by PostgreSQL
|
||||
nats — NATS Core / JetStream event publisher and consumer
|
||||
event-publisher — in-memory event channel (used in tests)
|
||||
activitypub — ActivityPub federation (follow, inbox/outbox, actor)
|
||||
activitypub-base — core ActivityPub types and repository traits
|
||||
event-publisher — in-memory event channel (tests only)
|
||||
activitypub — ActivityPub federation wiring (follow, inbox/outbox, actor)
|
||||
activitypub-base — core ActivityPub protocol types and service
|
||||
sqlite-federation — SQLite-backed federation repository
|
||||
postgres-federation — PostgreSQL-backed federation repository
|
||||
doc — OpenAPI spec assembly and Swagger UI / Scalar serving
|
||||
worker — standalone worker binary (polls the event queue, syncs posters)
|
||||
tui — terminal UI client (ratatui)
|
||||
```
|
||||
|
||||
@@ -136,17 +140,39 @@ cargo test
|
||||
|
||||
## Docker
|
||||
|
||||
The image contains both `presentation` (HTTP server) and `worker` (event processor). Run them as separate containers sharing the same data volume:
|
||||
|
||||
```bash
|
||||
docker build -t movies-diary .
|
||||
# Build (SQLite + federation + NATS support)
|
||||
docker build -t movies-diary \
|
||||
--build-arg FEATURES=sqlite,sqlite-federation,nats .
|
||||
|
||||
# HTTP server
|
||||
docker run -p 3000:3000 \
|
||||
-e DATABASE_URL=sqlite:///data/movies.db \
|
||||
-e JWT_SECRET=change-me \
|
||||
-e OMDB_API_KEY=your-key \
|
||||
-e BASE_URL=https://yourdomain.example.com \
|
||||
-e EVENT_BUS_BACKEND=nats \
|
||||
-e NATS_URL=nats://nats:4222 \
|
||||
-v $(pwd)/data:/data \
|
||||
movies-diary
|
||||
|
||||
# Event worker (separate container, same image)
|
||||
docker run \
|
||||
-e DATABASE_URL=sqlite:///data/movies.db \
|
||||
-e JWT_SECRET=change-me \
|
||||
-e OMDB_API_KEY=your-key \
|
||||
-e BASE_URL=https://yourdomain.example.com \
|
||||
-e EVENT_BUS_BACKEND=nats \
|
||||
-e NATS_URL=nats://nats:4222 \
|
||||
-v $(pwd)/data:/data \
|
||||
--entrypoint ./worker \
|
||||
movies-diary
|
||||
```
|
||||
|
||||
To build for PostgreSQL: `--build-arg FEATURES=postgres,postgres-federation,nats`
|
||||
|
||||
## License
|
||||
|
||||
MIT License. See [LICENSE](LICENSE).
|
||||
|
||||
@@ -6,6 +6,7 @@ edition = "2024"
|
||||
[dependencies]
|
||||
activitypub-base = { workspace = true }
|
||||
domain = { workspace = true }
|
||||
axum = { workspace = true }
|
||||
tokio = { workspace = true }
|
||||
serde = { workspace = true }
|
||||
serde_json = { workspace = true }
|
||||
|
||||
@@ -17,3 +17,49 @@ pub use port::{ActivityPubPort, NoopActivityPubService};
|
||||
pub use remote_review_repository::RemoteReviewRepository;
|
||||
pub use review_handler::ReviewObjectHandler;
|
||||
pub use user_adapter::DomainUserRepoAdapter;
|
||||
|
||||
pub struct ActivityPubWire {
|
||||
pub service: std::sync::Arc<dyn ActivityPubPort>,
|
||||
pub router: axum::Router,
|
||||
pub event_handler: std::sync::Arc<dyn domain::ports::EventHandler>,
|
||||
}
|
||||
|
||||
pub async fn wire(
|
||||
federation_repo: std::sync::Arc<dyn FederationRepository>,
|
||||
review_store: std::sync::Arc<dyn RemoteReviewRepository>,
|
||||
user_repo: std::sync::Arc<dyn domain::ports::UserRepository>,
|
||||
movie_repo: std::sync::Arc<dyn domain::ports::MovieRepository>,
|
||||
review_repo: std::sync::Arc<dyn domain::ports::ReviewRepository>,
|
||||
diary_repo: std::sync::Arc<dyn domain::ports::DiaryRepository>,
|
||||
base_url: String,
|
||||
) -> anyhow::Result<ActivityPubWire> {
|
||||
let concrete = std::sync::Arc::new(
|
||||
ActivityPubService::new(
|
||||
federation_repo,
|
||||
std::sync::Arc::new(DomainUserRepoAdapter(user_repo)),
|
||||
std::sync::Arc::new(ReviewObjectHandler {
|
||||
movie_repository: std::sync::Arc::clone(&movie_repo),
|
||||
diary_repository: diary_repo,
|
||||
review_store,
|
||||
base_url: base_url.clone(),
|
||||
}),
|
||||
base_url.clone(),
|
||||
cfg!(debug_assertions),
|
||||
)
|
||||
.await?,
|
||||
);
|
||||
|
||||
let router = concrete.router();
|
||||
let event_handler = std::sync::Arc::new(ActivityPubEventHandler::new(
|
||||
std::sync::Arc::clone(&concrete),
|
||||
movie_repo,
|
||||
review_repo,
|
||||
base_url,
|
||||
)) as std::sync::Arc<dyn domain::ports::EventHandler>;
|
||||
|
||||
Ok(ActivityPubWire {
|
||||
service: concrete as std::sync::Arc<dyn ActivityPubPort>,
|
||||
router,
|
||||
event_handler,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -105,3 +105,14 @@ impl PasswordHasher for Argon2PasswordHasher {
|
||||
.is_ok())
|
||||
}
|
||||
}
|
||||
|
||||
pub fn create() -> anyhow::Result<(
|
||||
std::sync::Arc<dyn domain::ports::AuthService>,
|
||||
std::sync::Arc<dyn domain::ports::PasswordHasher>,
|
||||
)> {
|
||||
let config = AuthConfig::from_env()?;
|
||||
Ok((
|
||||
std::sync::Arc::new(JwtAuthService::new(config)),
|
||||
std::sync::Arc::new(Argon2PasswordHasher),
|
||||
))
|
||||
}
|
||||
|
||||
11
crates/adapters/event-payload/Cargo.toml
Normal file
11
crates/adapters/event-payload/Cargo.toml
Normal file
@@ -0,0 +1,11 @@
|
||||
[package]
|
||||
name = "event-payload"
|
||||
version = "0.1.0"
|
||||
edition = "2024"
|
||||
|
||||
[dependencies]
|
||||
domain = { workspace = true }
|
||||
serde = { workspace = true }
|
||||
chrono = { workspace = true }
|
||||
uuid = { workspace = true }
|
||||
anyhow = { workspace = true }
|
||||
189
crates/adapters/event-payload/src/lib.rs
Normal file
189
crates/adapters/event-payload/src/lib.rs
Normal file
@@ -0,0 +1,189 @@
|
||||
use chrono::NaiveDateTime;
|
||||
use domain::{
|
||||
errors::DomainError,
|
||||
events::DomainEvent,
|
||||
value_objects::{ExternalMetadataId, MovieId, Rating, ReviewId, UserId},
|
||||
};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use uuid::Uuid;
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize, PartialEq)]
|
||||
#[serde(tag = "type", content = "data")]
|
||||
pub enum EventPayload {
|
||||
ReviewLogged {
|
||||
review_id: String,
|
||||
movie_id: String,
|
||||
user_id: String,
|
||||
rating: u8,
|
||||
watched_at: i64,
|
||||
},
|
||||
ReviewUpdated {
|
||||
review_id: String,
|
||||
movie_id: String,
|
||||
user_id: String,
|
||||
rating: u8,
|
||||
watched_at: i64,
|
||||
},
|
||||
MovieDiscovered {
|
||||
movie_id: String,
|
||||
external_metadata_id: String,
|
||||
},
|
||||
}
|
||||
|
||||
impl EventPayload {
|
||||
pub fn event_type(&self) -> &'static str {
|
||||
match self {
|
||||
EventPayload::ReviewLogged { .. } => "ReviewLogged",
|
||||
EventPayload::ReviewUpdated { .. } => "ReviewUpdated",
|
||||
EventPayload::MovieDiscovered { .. } => "MovieDiscovered",
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn parse_uuid(s: &str, field: &str) -> Result<Uuid, DomainError> {
|
||||
Uuid::parse_str(s)
|
||||
.map_err(|e| DomainError::InfrastructureError(format!("{field}: {e}")))
|
||||
}
|
||||
|
||||
fn parse_ts(ts: i64) -> Result<NaiveDateTime, DomainError> {
|
||||
chrono::DateTime::from_timestamp(ts, 0)
|
||||
.map(|dt| dt.naive_utc())
|
||||
.ok_or_else(|| DomainError::InfrastructureError(format!("invalid timestamp: {ts}")))
|
||||
}
|
||||
|
||||
impl From<&DomainEvent> for EventPayload {
|
||||
fn from(event: &DomainEvent) -> Self {
|
||||
match event {
|
||||
DomainEvent::ReviewLogged { review_id, movie_id, user_id, rating, watched_at } => {
|
||||
EventPayload::ReviewLogged {
|
||||
review_id: review_id.value().to_string(),
|
||||
movie_id: movie_id.value().to_string(),
|
||||
user_id: user_id.value().to_string(),
|
||||
rating: rating.value(),
|
||||
watched_at: watched_at.and_utc().timestamp(),
|
||||
}
|
||||
}
|
||||
DomainEvent::ReviewUpdated { review_id, movie_id, user_id, rating, watched_at } => {
|
||||
EventPayload::ReviewUpdated {
|
||||
review_id: review_id.value().to_string(),
|
||||
movie_id: movie_id.value().to_string(),
|
||||
user_id: user_id.value().to_string(),
|
||||
rating: rating.value(),
|
||||
watched_at: watched_at.and_utc().timestamp(),
|
||||
}
|
||||
}
|
||||
DomainEvent::MovieDiscovered { movie_id, external_metadata_id } => {
|
||||
EventPayload::MovieDiscovered {
|
||||
movie_id: movie_id.value().to_string(),
|
||||
external_metadata_id: external_metadata_id.value().to_owned(),
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl TryFrom<EventPayload> for DomainEvent {
|
||||
type Error = DomainError;
|
||||
fn try_from(payload: EventPayload) -> Result<Self, DomainError> {
|
||||
match payload {
|
||||
EventPayload::ReviewLogged { review_id, movie_id, user_id, rating, watched_at } => {
|
||||
Ok(DomainEvent::ReviewLogged {
|
||||
review_id: ReviewId::from_uuid(parse_uuid(&review_id, "review_id")?),
|
||||
movie_id: MovieId::from_uuid(parse_uuid(&movie_id, "movie_id")?),
|
||||
user_id: UserId::from_uuid(parse_uuid(&user_id, "user_id")?),
|
||||
rating: Rating::new(rating)?,
|
||||
watched_at: parse_ts(watched_at)?,
|
||||
})
|
||||
}
|
||||
EventPayload::ReviewUpdated { review_id, movie_id, user_id, rating, watched_at } => {
|
||||
Ok(DomainEvent::ReviewUpdated {
|
||||
review_id: ReviewId::from_uuid(parse_uuid(&review_id, "review_id")?),
|
||||
movie_id: MovieId::from_uuid(parse_uuid(&movie_id, "movie_id")?),
|
||||
user_id: UserId::from_uuid(parse_uuid(&user_id, "user_id")?),
|
||||
rating: Rating::new(rating)?,
|
||||
watched_at: parse_ts(watched_at)?,
|
||||
})
|
||||
}
|
||||
EventPayload::MovieDiscovered { movie_id, external_metadata_id } => {
|
||||
Ok(DomainEvent::MovieDiscovered {
|
||||
movie_id: MovieId::from_uuid(parse_uuid(&movie_id, "movie_id")?),
|
||||
external_metadata_id: ExternalMetadataId::new(external_metadata_id)?,
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
fn fixed_dt() -> NaiveDateTime {
|
||||
chrono::DateTime::from_timestamp(1_700_000_000, 0).unwrap().naive_utc()
|
||||
}
|
||||
|
||||
fn review_logged() -> DomainEvent {
|
||||
DomainEvent::ReviewLogged {
|
||||
review_id: ReviewId::from_uuid(Uuid::new_v4()),
|
||||
movie_id: MovieId::from_uuid(Uuid::new_v4()),
|
||||
user_id: UserId::from_uuid(Uuid::new_v4()),
|
||||
rating: Rating::new(4).unwrap(),
|
||||
watched_at: fixed_dt(),
|
||||
}
|
||||
}
|
||||
|
||||
fn review_updated() -> DomainEvent {
|
||||
DomainEvent::ReviewUpdated {
|
||||
review_id: ReviewId::from_uuid(Uuid::new_v4()),
|
||||
movie_id: MovieId::from_uuid(Uuid::new_v4()),
|
||||
user_id: UserId::from_uuid(Uuid::new_v4()),
|
||||
rating: Rating::new(3).unwrap(),
|
||||
watched_at: fixed_dt(),
|
||||
}
|
||||
}
|
||||
|
||||
fn movie_discovered() -> DomainEvent {
|
||||
DomainEvent::MovieDiscovered {
|
||||
movie_id: MovieId::from_uuid(Uuid::new_v4()),
|
||||
external_metadata_id: ExternalMetadataId::new("tt1234567".into()).unwrap(),
|
||||
}
|
||||
}
|
||||
|
||||
fn round_trip(event: DomainEvent) {
|
||||
let payload = EventPayload::from(&event);
|
||||
let json = serde_json::to_string(&payload).expect("serialize");
|
||||
let back: EventPayload = serde_json::from_str(&json).expect("deserialize");
|
||||
let recovered = DomainEvent::try_from(back).expect("try_from");
|
||||
assert_eq!(EventPayload::from(&event), EventPayload::from(&recovered));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn round_trip_review_logged() {
|
||||
round_trip(review_logged());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn round_trip_review_updated() {
|
||||
round_trip(review_updated());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn round_trip_movie_discovered() {
|
||||
round_trip(movie_discovered());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn serialized_format_is_tagged() {
|
||||
let payload = EventPayload::from(&movie_discovered());
|
||||
let json = serde_json::to_string(&payload).unwrap();
|
||||
assert!(json.contains(r#""type":"MovieDiscovered""#));
|
||||
assert!(json.contains(r#""data":"#));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn event_type_strings() {
|
||||
assert_eq!(EventPayload::from(&review_logged()).event_type(), "ReviewLogged");
|
||||
assert_eq!(EventPayload::from(&review_updated()).event_type(), "ReviewUpdated");
|
||||
assert_eq!(EventPayload::from(&movie_discovered()).event_type(), "MovieDiscovered");
|
||||
}
|
||||
}
|
||||
@@ -4,6 +4,7 @@ version = "0.1.0"
|
||||
edition = "2024"
|
||||
|
||||
[dependencies]
|
||||
anyhow = { workspace = true }
|
||||
async-trait = { workspace = true }
|
||||
reqwest = { workspace = true }
|
||||
serde = { workspace = true }
|
||||
|
||||
@@ -65,3 +65,14 @@ impl MetadataClient for MetadataClientImpl {
|
||||
Ok(pm.poster_url)
|
||||
}
|
||||
}
|
||||
|
||||
pub fn create() -> anyhow::Result<std::sync::Arc<dyn domain::ports::MetadataClient>> {
|
||||
use anyhow::Context;
|
||||
if let Ok(key) = std::env::var("TMDB_API_KEY") {
|
||||
Ok(std::sync::Arc::new(MetadataClientImpl::new_tmdb(key)))
|
||||
} else {
|
||||
let key = std::env::var("OMDB_API_KEY")
|
||||
.context("either TMDB_API_KEY or OMDB_API_KEY must be set")?;
|
||||
Ok(std::sync::Arc::new(MetadataClientImpl::new_omdb(key)))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -7,6 +7,7 @@ edition = "2024"
|
||||
async-nats = "0.48.0"
|
||||
|
||||
domain = { workspace = true }
|
||||
event-payload = { workspace = true }
|
||||
async-trait = { workspace = true }
|
||||
anyhow = { workspace = true }
|
||||
thiserror = { workspace = true }
|
||||
|
||||
@@ -1,172 +1 @@
|
||||
use chrono::NaiveDateTime;
|
||||
use domain::{
|
||||
errors::DomainError,
|
||||
events::DomainEvent,
|
||||
value_objects::{ExternalMetadataId, MovieId, Rating, ReviewId, UserId},
|
||||
};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use uuid::Uuid;
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize, PartialEq)]
|
||||
#[serde(tag = "type", content = "data")]
|
||||
pub enum NatsEventPayload {
|
||||
ReviewLogged {
|
||||
review_id: String,
|
||||
movie_id: String,
|
||||
user_id: String,
|
||||
rating: u8,
|
||||
watched_at: i64,
|
||||
},
|
||||
ReviewUpdated {
|
||||
review_id: String,
|
||||
movie_id: String,
|
||||
user_id: String,
|
||||
rating: u8,
|
||||
watched_at: i64,
|
||||
},
|
||||
MovieDiscovered {
|
||||
movie_id: String,
|
||||
external_metadata_id: String,
|
||||
},
|
||||
}
|
||||
|
||||
fn parse_uuid(s: &str, field: &str) -> Result<Uuid, DomainError> {
|
||||
Uuid::parse_str(s)
|
||||
.map_err(|e| DomainError::InfrastructureError(format!("{field}: {e}")))
|
||||
}
|
||||
|
||||
fn parse_ts(ts: i64) -> Result<NaiveDateTime, DomainError> {
|
||||
chrono::DateTime::from_timestamp(ts, 0)
|
||||
.map(|dt| dt.naive_utc())
|
||||
.ok_or_else(|| DomainError::InfrastructureError(format!("invalid timestamp: {ts}")))
|
||||
}
|
||||
|
||||
impl From<&DomainEvent> for NatsEventPayload {
|
||||
fn from(event: &DomainEvent) -> Self {
|
||||
match event {
|
||||
DomainEvent::ReviewLogged { review_id, movie_id, user_id, rating, watched_at } => {
|
||||
NatsEventPayload::ReviewLogged {
|
||||
review_id: review_id.value().to_string(),
|
||||
movie_id: movie_id.value().to_string(),
|
||||
user_id: user_id.value().to_string(),
|
||||
rating: rating.value(),
|
||||
watched_at: watched_at.and_utc().timestamp(),
|
||||
}
|
||||
}
|
||||
DomainEvent::ReviewUpdated { review_id, movie_id, user_id, rating, watched_at } => {
|
||||
NatsEventPayload::ReviewUpdated {
|
||||
review_id: review_id.value().to_string(),
|
||||
movie_id: movie_id.value().to_string(),
|
||||
user_id: user_id.value().to_string(),
|
||||
rating: rating.value(),
|
||||
watched_at: watched_at.and_utc().timestamp(),
|
||||
}
|
||||
}
|
||||
DomainEvent::MovieDiscovered { movie_id, external_metadata_id } => {
|
||||
NatsEventPayload::MovieDiscovered {
|
||||
movie_id: movie_id.value().to_string(),
|
||||
external_metadata_id: external_metadata_id.value().to_owned(),
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl TryFrom<NatsEventPayload> for DomainEvent {
|
||||
type Error = DomainError;
|
||||
fn try_from(payload: NatsEventPayload) -> Result<Self, DomainError> {
|
||||
match payload {
|
||||
NatsEventPayload::ReviewLogged { review_id, movie_id, user_id, rating, watched_at } => {
|
||||
Ok(DomainEvent::ReviewLogged {
|
||||
review_id: ReviewId::from_uuid(parse_uuid(&review_id, "review_id")?),
|
||||
movie_id: MovieId::from_uuid(parse_uuid(&movie_id, "movie_id")?),
|
||||
user_id: UserId::from_uuid(parse_uuid(&user_id, "user_id")?),
|
||||
rating: Rating::new(rating)?,
|
||||
watched_at: parse_ts(watched_at)?,
|
||||
})
|
||||
}
|
||||
NatsEventPayload::ReviewUpdated { review_id, movie_id, user_id, rating, watched_at } => {
|
||||
Ok(DomainEvent::ReviewUpdated {
|
||||
review_id: ReviewId::from_uuid(parse_uuid(&review_id, "review_id")?),
|
||||
movie_id: MovieId::from_uuid(parse_uuid(&movie_id, "movie_id")?),
|
||||
user_id: UserId::from_uuid(parse_uuid(&user_id, "user_id")?),
|
||||
rating: Rating::new(rating)?,
|
||||
watched_at: parse_ts(watched_at)?,
|
||||
})
|
||||
}
|
||||
NatsEventPayload::MovieDiscovered { movie_id, external_metadata_id } => {
|
||||
Ok(DomainEvent::MovieDiscovered {
|
||||
movie_id: MovieId::from_uuid(parse_uuid(&movie_id, "movie_id")?),
|
||||
external_metadata_id: ExternalMetadataId::new(external_metadata_id)?,
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
fn fixed_dt() -> NaiveDateTime {
|
||||
chrono::DateTime::from_timestamp(1_700_000_000, 0).unwrap().naive_utc()
|
||||
}
|
||||
|
||||
fn review_logged() -> DomainEvent {
|
||||
DomainEvent::ReviewLogged {
|
||||
review_id: ReviewId::from_uuid(Uuid::new_v4()),
|
||||
movie_id: MovieId::from_uuid(Uuid::new_v4()),
|
||||
user_id: UserId::from_uuid(Uuid::new_v4()),
|
||||
rating: Rating::new(4).unwrap(),
|
||||
watched_at: fixed_dt(),
|
||||
}
|
||||
}
|
||||
|
||||
fn review_updated() -> DomainEvent {
|
||||
DomainEvent::ReviewUpdated {
|
||||
review_id: ReviewId::from_uuid(Uuid::new_v4()),
|
||||
movie_id: MovieId::from_uuid(Uuid::new_v4()),
|
||||
user_id: UserId::from_uuid(Uuid::new_v4()),
|
||||
rating: Rating::new(3).unwrap(),
|
||||
watched_at: fixed_dt(),
|
||||
}
|
||||
}
|
||||
|
||||
fn movie_discovered() -> DomainEvent {
|
||||
DomainEvent::MovieDiscovered {
|
||||
movie_id: MovieId::from_uuid(Uuid::new_v4()),
|
||||
external_metadata_id: ExternalMetadataId::new("tt1234567".into()).unwrap(),
|
||||
}
|
||||
}
|
||||
|
||||
fn round_trip(event: DomainEvent) {
|
||||
let payload = NatsEventPayload::from(&event);
|
||||
let json = serde_json::to_string(&payload).expect("serialize");
|
||||
let back: NatsEventPayload = serde_json::from_str(&json).expect("deserialize");
|
||||
let recovered = DomainEvent::try_from(back).expect("try_from");
|
||||
assert_eq!(NatsEventPayload::from(&event), NatsEventPayload::from(&recovered));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn round_trip_review_logged() {
|
||||
round_trip(review_logged());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn round_trip_review_updated() {
|
||||
round_trip(review_updated());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn round_trip_movie_discovered() {
|
||||
round_trip(movie_discovered());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn serialized_format_is_tagged() {
|
||||
let payload = NatsEventPayload::from(&movie_discovered());
|
||||
let json = serde_json::to_string(&payload).unwrap();
|
||||
assert!(json.contains(r#""type":"MovieDiscovered""#));
|
||||
assert!(json.contains(r#""data":"#));
|
||||
}
|
||||
}
|
||||
pub use event_payload::EventPayload as NatsEventPayload;
|
||||
|
||||
@@ -36,3 +36,7 @@ impl PosterFetcherClient for ReqwestPosterFetcher {
|
||||
Ok(bytes.to_vec())
|
||||
}
|
||||
}
|
||||
|
||||
pub fn create() -> anyhow::Result<std::sync::Arc<dyn domain::ports::PosterFetcherClient>> {
|
||||
Ok(std::sync::Arc::new(ReqwestPosterFetcher::new(PosterFetcherConfig::from_env())?))
|
||||
}
|
||||
|
||||
@@ -68,6 +68,10 @@ impl PosterStorage for PosterStorageAdapter {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn create() -> anyhow::Result<std::sync::Arc<dyn domain::ports::PosterStorage>> {
|
||||
Ok(std::sync::Arc::new(PosterStorageAdapter::from_config(StorageConfig::from_env()?)))
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
@@ -6,6 +6,7 @@ edition = "2024"
|
||||
[dependencies]
|
||||
sqlx = { version = "0.8.6", features = ["runtime-tokio-rustls", "postgres", "macros", "chrono"] }
|
||||
domain = { workspace = true }
|
||||
event-payload = { workspace = true }
|
||||
anyhow = { workspace = true }
|
||||
async-trait = { workspace = true }
|
||||
serde = { workspace = true }
|
||||
|
||||
@@ -1,189 +1 @@
|
||||
use chrono::NaiveDateTime;
|
||||
use domain::{
|
||||
errors::DomainError,
|
||||
events::DomainEvent,
|
||||
value_objects::{ExternalMetadataId, MovieId, Rating, ReviewId, UserId},
|
||||
};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use uuid::Uuid;
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize, PartialEq)]
|
||||
#[serde(tag = "type", content = "data")]
|
||||
pub enum DbEventPayload {
|
||||
ReviewLogged {
|
||||
review_id: String,
|
||||
movie_id: String,
|
||||
user_id: String,
|
||||
rating: u8,
|
||||
watched_at: i64,
|
||||
},
|
||||
ReviewUpdated {
|
||||
review_id: String,
|
||||
movie_id: String,
|
||||
user_id: String,
|
||||
rating: u8,
|
||||
watched_at: i64,
|
||||
},
|
||||
MovieDiscovered {
|
||||
movie_id: String,
|
||||
external_metadata_id: String,
|
||||
},
|
||||
}
|
||||
|
||||
impl DbEventPayload {
|
||||
pub fn event_type(&self) -> &'static str {
|
||||
match self {
|
||||
DbEventPayload::ReviewLogged { .. } => "ReviewLogged",
|
||||
DbEventPayload::ReviewUpdated { .. } => "ReviewUpdated",
|
||||
DbEventPayload::MovieDiscovered { .. } => "MovieDiscovered",
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn parse_uuid(s: &str, field: &str) -> Result<Uuid, DomainError> {
|
||||
Uuid::parse_str(s)
|
||||
.map_err(|e| DomainError::InfrastructureError(format!("{field}: {e}")))
|
||||
}
|
||||
|
||||
fn parse_ts(ts: i64) -> Result<NaiveDateTime, DomainError> {
|
||||
chrono::DateTime::from_timestamp(ts, 0)
|
||||
.map(|dt| dt.naive_utc())
|
||||
.ok_or_else(|| DomainError::InfrastructureError(format!("invalid timestamp: {ts}")))
|
||||
}
|
||||
|
||||
impl From<&DomainEvent> for DbEventPayload {
|
||||
fn from(event: &DomainEvent) -> Self {
|
||||
match event {
|
||||
DomainEvent::ReviewLogged { review_id, movie_id, user_id, rating, watched_at } => {
|
||||
DbEventPayload::ReviewLogged {
|
||||
review_id: review_id.value().to_string(),
|
||||
movie_id: movie_id.value().to_string(),
|
||||
user_id: user_id.value().to_string(),
|
||||
rating: rating.value(),
|
||||
watched_at: watched_at.and_utc().timestamp(),
|
||||
}
|
||||
}
|
||||
DomainEvent::ReviewUpdated { review_id, movie_id, user_id, rating, watched_at } => {
|
||||
DbEventPayload::ReviewUpdated {
|
||||
review_id: review_id.value().to_string(),
|
||||
movie_id: movie_id.value().to_string(),
|
||||
user_id: user_id.value().to_string(),
|
||||
rating: rating.value(),
|
||||
watched_at: watched_at.and_utc().timestamp(),
|
||||
}
|
||||
}
|
||||
DomainEvent::MovieDiscovered { movie_id, external_metadata_id } => {
|
||||
DbEventPayload::MovieDiscovered {
|
||||
movie_id: movie_id.value().to_string(),
|
||||
external_metadata_id: external_metadata_id.value().to_owned(),
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl TryFrom<DbEventPayload> for DomainEvent {
|
||||
type Error = DomainError;
|
||||
fn try_from(payload: DbEventPayload) -> Result<Self, DomainError> {
|
||||
match payload {
|
||||
DbEventPayload::ReviewLogged { review_id, movie_id, user_id, rating, watched_at } => {
|
||||
Ok(DomainEvent::ReviewLogged {
|
||||
review_id: ReviewId::from_uuid(parse_uuid(&review_id, "review_id")?),
|
||||
movie_id: MovieId::from_uuid(parse_uuid(&movie_id, "movie_id")?),
|
||||
user_id: UserId::from_uuid(parse_uuid(&user_id, "user_id")?),
|
||||
rating: Rating::new(rating)?,
|
||||
watched_at: parse_ts(watched_at)?,
|
||||
})
|
||||
}
|
||||
DbEventPayload::ReviewUpdated { review_id, movie_id, user_id, rating, watched_at } => {
|
||||
Ok(DomainEvent::ReviewUpdated {
|
||||
review_id: ReviewId::from_uuid(parse_uuid(&review_id, "review_id")?),
|
||||
movie_id: MovieId::from_uuid(parse_uuid(&movie_id, "movie_id")?),
|
||||
user_id: UserId::from_uuid(parse_uuid(&user_id, "user_id")?),
|
||||
rating: Rating::new(rating)?,
|
||||
watched_at: parse_ts(watched_at)?,
|
||||
})
|
||||
}
|
||||
DbEventPayload::MovieDiscovered { movie_id, external_metadata_id } => {
|
||||
Ok(DomainEvent::MovieDiscovered {
|
||||
movie_id: MovieId::from_uuid(parse_uuid(&movie_id, "movie_id")?),
|
||||
external_metadata_id: ExternalMetadataId::new(external_metadata_id)?,
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
fn fixed_dt() -> NaiveDateTime {
|
||||
chrono::DateTime::from_timestamp(1_700_000_000, 0).unwrap().naive_utc()
|
||||
}
|
||||
|
||||
fn review_logged() -> DomainEvent {
|
||||
DomainEvent::ReviewLogged {
|
||||
review_id: ReviewId::from_uuid(Uuid::new_v4()),
|
||||
movie_id: MovieId::from_uuid(Uuid::new_v4()),
|
||||
user_id: UserId::from_uuid(Uuid::new_v4()),
|
||||
rating: Rating::new(4).unwrap(),
|
||||
watched_at: fixed_dt(),
|
||||
}
|
||||
}
|
||||
|
||||
fn review_updated() -> DomainEvent {
|
||||
DomainEvent::ReviewUpdated {
|
||||
review_id: ReviewId::from_uuid(Uuid::new_v4()),
|
||||
movie_id: MovieId::from_uuid(Uuid::new_v4()),
|
||||
user_id: UserId::from_uuid(Uuid::new_v4()),
|
||||
rating: Rating::new(3).unwrap(),
|
||||
watched_at: fixed_dt(),
|
||||
}
|
||||
}
|
||||
|
||||
fn movie_discovered() -> DomainEvent {
|
||||
DomainEvent::MovieDiscovered {
|
||||
movie_id: MovieId::from_uuid(Uuid::new_v4()),
|
||||
external_metadata_id: ExternalMetadataId::new("tt1234567".into()).unwrap(),
|
||||
}
|
||||
}
|
||||
|
||||
fn round_trip(event: DomainEvent) {
|
||||
let payload = DbEventPayload::from(&event);
|
||||
let json = serde_json::to_string(&payload).expect("serialize");
|
||||
let back: DbEventPayload = serde_json::from_str(&json).expect("deserialize");
|
||||
let recovered = DomainEvent::try_from(back).expect("try_from");
|
||||
assert_eq!(DbEventPayload::from(&event), DbEventPayload::from(&recovered));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn round_trip_review_logged() {
|
||||
round_trip(review_logged());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn round_trip_review_updated() {
|
||||
round_trip(review_updated());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn round_trip_movie_discovered() {
|
||||
round_trip(movie_discovered());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn serialized_format_is_tagged() {
|
||||
let payload = DbEventPayload::from(&movie_discovered());
|
||||
let json = serde_json::to_string(&payload).unwrap();
|
||||
assert!(json.contains(r#""type":"MovieDiscovered""#));
|
||||
assert!(json.contains(r#""data":"#));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn event_type_strings() {
|
||||
assert_eq!(DbEventPayload::from(&review_logged()).event_type(), "ReviewLogged");
|
||||
assert_eq!(DbEventPayload::from(&review_updated()).event_type(), "ReviewUpdated");
|
||||
assert_eq!(DbEventPayload::from(&movie_discovered()).event_type(), "MovieDiscovered");
|
||||
}
|
||||
}
|
||||
pub use event_payload::EventPayload as DbEventPayload;
|
||||
|
||||
@@ -474,3 +474,16 @@ impl domain::ports::SocialQueryPort for PostgresFederationRepository {
|
||||
Ok(rows.into_iter().map(|(url, handle, display_name)| domain::ports::RemoteActorInfo { url, handle, display_name }).collect())
|
||||
}
|
||||
}
|
||||
|
||||
pub fn wire(pool: sqlx::PgPool) -> (
|
||||
std::sync::Arc<dyn activitypub::FederationRepository>,
|
||||
std::sync::Arc<dyn domain::ports::SocialQueryPort>,
|
||||
std::sync::Arc<dyn activitypub::RemoteReviewRepository>,
|
||||
) {
|
||||
let fed = std::sync::Arc::new(PostgresFederationRepository::new(pool));
|
||||
(
|
||||
std::sync::Arc::clone(&fed) as _,
|
||||
std::sync::Arc::clone(&fed) as _,
|
||||
fed as _,
|
||||
)
|
||||
}
|
||||
|
||||
@@ -12,6 +12,7 @@ sqlx = { version = "0.8.6", features = [
|
||||
"chrono",
|
||||
] }
|
||||
domain = { workspace = true }
|
||||
anyhow = { workspace = true }
|
||||
uuid = { workspace = true }
|
||||
chrono = { workspace = true }
|
||||
tracing = { workspace = true }
|
||||
|
||||
@@ -767,3 +767,33 @@ impl StatsRepository for PostgresRepository {
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn wire(database_url: &str) -> anyhow::Result<(
|
||||
sqlx::PgPool,
|
||||
std::sync::Arc<dyn domain::ports::MovieRepository>,
|
||||
std::sync::Arc<dyn domain::ports::ReviewRepository>,
|
||||
std::sync::Arc<dyn domain::ports::DiaryRepository>,
|
||||
std::sync::Arc<dyn domain::ports::StatsRepository>,
|
||||
std::sync::Arc<dyn domain::ports::UserRepository>,
|
||||
)> {
|
||||
use anyhow::Context;
|
||||
|
||||
let pool = sqlx::PgPool::connect(database_url)
|
||||
.await
|
||||
.context("Failed to connect to PostgreSQL database")?;
|
||||
|
||||
let repo = std::sync::Arc::new(PostgresRepository::new(pool.clone()));
|
||||
repo.migrate()
|
||||
.await
|
||||
.map_err(|e| anyhow::anyhow!("{e}"))
|
||||
.context("Database migration failed")?;
|
||||
|
||||
Ok((
|
||||
pool.clone(),
|
||||
std::sync::Arc::clone(&repo) as _,
|
||||
std::sync::Arc::clone(&repo) as _,
|
||||
std::sync::Arc::clone(&repo) as _,
|
||||
std::sync::Arc::clone(&repo) as _,
|
||||
std::sync::Arc::new(PostgresUserRepository::new(pool)) as _,
|
||||
))
|
||||
}
|
||||
|
||||
@@ -6,6 +6,7 @@ edition = "2024"
|
||||
[dependencies]
|
||||
sqlx = { version = "0.8.6", features = ["runtime-tokio-rustls", "sqlite", "macros", "chrono"] }
|
||||
domain = { workspace = true }
|
||||
event-payload = { workspace = true }
|
||||
anyhow = { workspace = true }
|
||||
async-trait = { workspace = true }
|
||||
serde = { workspace = true }
|
||||
|
||||
@@ -1,189 +1 @@
|
||||
use chrono::NaiveDateTime;
|
||||
use domain::{
|
||||
errors::DomainError,
|
||||
events::DomainEvent,
|
||||
value_objects::{ExternalMetadataId, MovieId, Rating, ReviewId, UserId},
|
||||
};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use uuid::Uuid;
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize, PartialEq)]
|
||||
#[serde(tag = "type", content = "data")]
|
||||
pub enum DbEventPayload {
|
||||
ReviewLogged {
|
||||
review_id: String,
|
||||
movie_id: String,
|
||||
user_id: String,
|
||||
rating: u8,
|
||||
watched_at: i64,
|
||||
},
|
||||
ReviewUpdated {
|
||||
review_id: String,
|
||||
movie_id: String,
|
||||
user_id: String,
|
||||
rating: u8,
|
||||
watched_at: i64,
|
||||
},
|
||||
MovieDiscovered {
|
||||
movie_id: String,
|
||||
external_metadata_id: String,
|
||||
},
|
||||
}
|
||||
|
||||
impl DbEventPayload {
|
||||
pub fn event_type(&self) -> &'static str {
|
||||
match self {
|
||||
DbEventPayload::ReviewLogged { .. } => "ReviewLogged",
|
||||
DbEventPayload::ReviewUpdated { .. } => "ReviewUpdated",
|
||||
DbEventPayload::MovieDiscovered { .. } => "MovieDiscovered",
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn parse_uuid(s: &str, field: &str) -> Result<Uuid, DomainError> {
|
||||
Uuid::parse_str(s)
|
||||
.map_err(|e| DomainError::InfrastructureError(format!("{field}: {e}")))
|
||||
}
|
||||
|
||||
fn parse_ts(ts: i64) -> Result<NaiveDateTime, DomainError> {
|
||||
chrono::DateTime::from_timestamp(ts, 0)
|
||||
.map(|dt| dt.naive_utc())
|
||||
.ok_or_else(|| DomainError::InfrastructureError(format!("invalid timestamp: {ts}")))
|
||||
}
|
||||
|
||||
impl From<&DomainEvent> for DbEventPayload {
|
||||
fn from(event: &DomainEvent) -> Self {
|
||||
match event {
|
||||
DomainEvent::ReviewLogged { review_id, movie_id, user_id, rating, watched_at } => {
|
||||
DbEventPayload::ReviewLogged {
|
||||
review_id: review_id.value().to_string(),
|
||||
movie_id: movie_id.value().to_string(),
|
||||
user_id: user_id.value().to_string(),
|
||||
rating: rating.value(),
|
||||
watched_at: watched_at.and_utc().timestamp(),
|
||||
}
|
||||
}
|
||||
DomainEvent::ReviewUpdated { review_id, movie_id, user_id, rating, watched_at } => {
|
||||
DbEventPayload::ReviewUpdated {
|
||||
review_id: review_id.value().to_string(),
|
||||
movie_id: movie_id.value().to_string(),
|
||||
user_id: user_id.value().to_string(),
|
||||
rating: rating.value(),
|
||||
watched_at: watched_at.and_utc().timestamp(),
|
||||
}
|
||||
}
|
||||
DomainEvent::MovieDiscovered { movie_id, external_metadata_id } => {
|
||||
DbEventPayload::MovieDiscovered {
|
||||
movie_id: movie_id.value().to_string(),
|
||||
external_metadata_id: external_metadata_id.value().to_owned(),
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl TryFrom<DbEventPayload> for DomainEvent {
|
||||
type Error = DomainError;
|
||||
fn try_from(payload: DbEventPayload) -> Result<Self, DomainError> {
|
||||
match payload {
|
||||
DbEventPayload::ReviewLogged { review_id, movie_id, user_id, rating, watched_at } => {
|
||||
Ok(DomainEvent::ReviewLogged {
|
||||
review_id: ReviewId::from_uuid(parse_uuid(&review_id, "review_id")?),
|
||||
movie_id: MovieId::from_uuid(parse_uuid(&movie_id, "movie_id")?),
|
||||
user_id: UserId::from_uuid(parse_uuid(&user_id, "user_id")?),
|
||||
rating: Rating::new(rating)?,
|
||||
watched_at: parse_ts(watched_at)?,
|
||||
})
|
||||
}
|
||||
DbEventPayload::ReviewUpdated { review_id, movie_id, user_id, rating, watched_at } => {
|
||||
Ok(DomainEvent::ReviewUpdated {
|
||||
review_id: ReviewId::from_uuid(parse_uuid(&review_id, "review_id")?),
|
||||
movie_id: MovieId::from_uuid(parse_uuid(&movie_id, "movie_id")?),
|
||||
user_id: UserId::from_uuid(parse_uuid(&user_id, "user_id")?),
|
||||
rating: Rating::new(rating)?,
|
||||
watched_at: parse_ts(watched_at)?,
|
||||
})
|
||||
}
|
||||
DbEventPayload::MovieDiscovered { movie_id, external_metadata_id } => {
|
||||
Ok(DomainEvent::MovieDiscovered {
|
||||
movie_id: MovieId::from_uuid(parse_uuid(&movie_id, "movie_id")?),
|
||||
external_metadata_id: ExternalMetadataId::new(external_metadata_id)?,
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
fn fixed_dt() -> NaiveDateTime {
|
||||
chrono::DateTime::from_timestamp(1_700_000_000, 0).unwrap().naive_utc()
|
||||
}
|
||||
|
||||
fn review_logged() -> DomainEvent {
|
||||
DomainEvent::ReviewLogged {
|
||||
review_id: ReviewId::from_uuid(Uuid::new_v4()),
|
||||
movie_id: MovieId::from_uuid(Uuid::new_v4()),
|
||||
user_id: UserId::from_uuid(Uuid::new_v4()),
|
||||
rating: Rating::new(4).unwrap(),
|
||||
watched_at: fixed_dt(),
|
||||
}
|
||||
}
|
||||
|
||||
fn review_updated() -> DomainEvent {
|
||||
DomainEvent::ReviewUpdated {
|
||||
review_id: ReviewId::from_uuid(Uuid::new_v4()),
|
||||
movie_id: MovieId::from_uuid(Uuid::new_v4()),
|
||||
user_id: UserId::from_uuid(Uuid::new_v4()),
|
||||
rating: Rating::new(3).unwrap(),
|
||||
watched_at: fixed_dt(),
|
||||
}
|
||||
}
|
||||
|
||||
fn movie_discovered() -> DomainEvent {
|
||||
DomainEvent::MovieDiscovered {
|
||||
movie_id: MovieId::from_uuid(Uuid::new_v4()),
|
||||
external_metadata_id: ExternalMetadataId::new("tt1234567".into()).unwrap(),
|
||||
}
|
||||
}
|
||||
|
||||
fn round_trip(event: DomainEvent) {
|
||||
let payload = DbEventPayload::from(&event);
|
||||
let json = serde_json::to_string(&payload).expect("serialize");
|
||||
let back: DbEventPayload = serde_json::from_str(&json).expect("deserialize");
|
||||
let recovered = DomainEvent::try_from(back).expect("try_from");
|
||||
assert_eq!(DbEventPayload::from(&event), DbEventPayload::from(&recovered));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn round_trip_review_logged() {
|
||||
round_trip(review_logged());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn round_trip_review_updated() {
|
||||
round_trip(review_updated());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn round_trip_movie_discovered() {
|
||||
round_trip(movie_discovered());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn serialized_format_is_tagged() {
|
||||
let payload = DbEventPayload::from(&movie_discovered());
|
||||
let json = serde_json::to_string(&payload).unwrap();
|
||||
assert!(json.contains(r#""type":"MovieDiscovered""#));
|
||||
assert!(json.contains(r#""data":"#));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn event_type_strings() {
|
||||
assert_eq!(DbEventPayload::from(&review_logged()).event_type(), "ReviewLogged");
|
||||
assert_eq!(DbEventPayload::from(&review_updated()).event_type(), "ReviewUpdated");
|
||||
assert_eq!(DbEventPayload::from(&movie_discovered()).event_type(), "MovieDiscovered");
|
||||
}
|
||||
}
|
||||
pub use event_payload::EventPayload as DbEventPayload;
|
||||
|
||||
@@ -537,6 +537,19 @@ impl domain::ports::SocialQueryPort for SqliteFederationRepository {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn wire(pool: sqlx::SqlitePool) -> (
|
||||
std::sync::Arc<dyn activitypub::FederationRepository>,
|
||||
std::sync::Arc<dyn domain::ports::SocialQueryPort>,
|
||||
std::sync::Arc<dyn activitypub::RemoteReviewRepository>,
|
||||
) {
|
||||
let fed = std::sync::Arc::new(SqliteFederationRepository::new(pool));
|
||||
(
|
||||
std::sync::Arc::clone(&fed) as _,
|
||||
std::sync::Arc::clone(&fed) as _,
|
||||
fed as _,
|
||||
)
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
@@ -759,6 +759,43 @@ impl StatsRepository for SqliteMovieRepository {
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn wire(database_url: &str) -> anyhow::Result<(
|
||||
sqlx::SqlitePool,
|
||||
std::sync::Arc<dyn domain::ports::MovieRepository>,
|
||||
std::sync::Arc<dyn domain::ports::ReviewRepository>,
|
||||
std::sync::Arc<dyn domain::ports::DiaryRepository>,
|
||||
std::sync::Arc<dyn domain::ports::StatsRepository>,
|
||||
std::sync::Arc<dyn domain::ports::UserRepository>,
|
||||
)> {
|
||||
use std::str::FromStr;
|
||||
use anyhow::Context;
|
||||
use sqlx::sqlite::SqliteConnectOptions;
|
||||
|
||||
let opts = SqliteConnectOptions::from_str(database_url)
|
||||
.context("Invalid DATABASE_URL")?
|
||||
.create_if_missing(true)
|
||||
.journal_mode(sqlx::sqlite::SqliteJournalMode::Wal)
|
||||
.busy_timeout(std::time::Duration::from_secs(5));
|
||||
let pool = sqlx::SqlitePool::connect_with(opts)
|
||||
.await
|
||||
.context("Failed to connect to SQLite database")?;
|
||||
|
||||
let repo = std::sync::Arc::new(SqliteMovieRepository::new(pool.clone()));
|
||||
repo.migrate()
|
||||
.await
|
||||
.map_err(|e| anyhow::anyhow!("{e}"))
|
||||
.context("Database migration failed")?;
|
||||
|
||||
Ok((
|
||||
pool.clone(),
|
||||
std::sync::Arc::clone(&repo) as _,
|
||||
std::sync::Arc::clone(&repo) as _,
|
||||
std::sync::Arc::clone(&repo) as _,
|
||||
std::sync::Arc::clone(&repo) as _,
|
||||
std::sync::Arc::new(SqliteUserRepository::new(pool)) as _,
|
||||
))
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod feed_filter_tests {
|
||||
use super::*;
|
||||
|
||||
@@ -49,7 +49,6 @@ metadata = { workspace = true }
|
||||
poster-fetcher = { workspace = true }
|
||||
poster-storage = { workspace = true }
|
||||
template-askama = { workspace = true }
|
||||
event-publisher = { workspace = true }
|
||||
nats = { workspace = true, optional = true }
|
||||
rss = { workspace = true }
|
||||
export = { workspace = true }
|
||||
|
||||
@@ -1,33 +1,12 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use anyhow::Context;
|
||||
use std::str::FromStr;
|
||||
|
||||
use tokio::net::TcpListener;
|
||||
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
|
||||
|
||||
#[cfg(feature = "sqlite")]
|
||||
use sqlite::{SqliteMovieRepository, SqliteUserRepository};
|
||||
#[cfg(feature = "sqlite-federation")]
|
||||
use sqlite_federation::SqliteFederationRepository;
|
||||
|
||||
#[cfg(feature = "postgres")]
|
||||
use postgres::{PostgresRepository, PostgresUserRepository};
|
||||
#[cfg(feature = "postgres-federation")]
|
||||
use postgres_federation::PostgresFederationRepository;
|
||||
|
||||
#[cfg(feature = "federation")]
|
||||
use activitypub::{
|
||||
ActivityPubPort, ActivityPubService, DomainUserRepoAdapter,
|
||||
ReviewObjectHandler,
|
||||
};
|
||||
|
||||
use application::{config::AppConfig, context::AppContext};
|
||||
use auth::{Argon2PasswordHasher, AuthConfig, JwtAuthService};
|
||||
use export::ExportAdapter;
|
||||
use metadata::MetadataClientImpl;
|
||||
use poster_fetcher::{PosterFetcherConfig, ReqwestPosterFetcher};
|
||||
use poster_storage::{PosterStorageAdapter, StorageConfig};
|
||||
use rss::RssAdapter;
|
||||
use template_askama::AskamaHtmlRenderer;
|
||||
|
||||
@@ -35,11 +14,7 @@ use doc::ApiDocExt;
|
||||
use presentation::{openapi::ApiDoc, routes, state::AppState};
|
||||
use utoipa::OpenApi as _;
|
||||
|
||||
use domain::ports::{
|
||||
AuthService, DiaryExporter, DiaryRepository, EventPublisher, MetadataClient,
|
||||
MovieRepository, PasswordHasher, PosterFetcherClient, PosterStorage, ReviewRepository,
|
||||
StatsRepository, UserRepository,
|
||||
};
|
||||
use domain::ports::{DiaryExporter, EventPublisher};
|
||||
|
||||
#[cfg(not(any(feature = "sqlite", feature = "postgres")))]
|
||||
compile_error!("At least one database backend must be enabled. Use --features sqlite or --features postgres");
|
||||
@@ -66,37 +41,25 @@ async fn main() -> anyhow::Result<()> {
|
||||
}
|
||||
|
||||
async fn wire_dependencies() -> anyhow::Result<(AppState, axum::Router)> {
|
||||
let auth_config = AuthConfig::from_env()?;
|
||||
let storage_config = StorageConfig::from_env()?;
|
||||
let app_config = AppConfig::from_env();
|
||||
let database_url = std::env::var("DATABASE_URL").context("DATABASE_URL must be set")?;
|
||||
let backend = std::env::var("DATABASE_BACKEND").unwrap_or_else(|_| "sqlite".to_string());
|
||||
|
||||
let metadata_client: Arc<dyn MetadataClient> =
|
||||
if let Ok(tmdb_key) = std::env::var("TMDB_API_KEY") {
|
||||
Arc::new(MetadataClientImpl::new_tmdb(tmdb_key))
|
||||
} else {
|
||||
let omdb_key = std::env::var("OMDB_API_KEY")
|
||||
.context("Either TMDB_API_KEY or OMDB_API_KEY must be set")?;
|
||||
Arc::new(MetadataClientImpl::new_omdb(omdb_key))
|
||||
};
|
||||
let poster_fetcher: Arc<dyn PosterFetcherClient> =
|
||||
Arc::new(ReqwestPosterFetcher::new(PosterFetcherConfig::from_env())?);
|
||||
let poster_storage: Arc<dyn PosterStorage> =
|
||||
Arc::new(PosterStorageAdapter::from_config(storage_config));
|
||||
let auth_service: Arc<dyn AuthService> = Arc::new(JwtAuthService::new(auth_config));
|
||||
let password_hasher: Arc<dyn PasswordHasher> = Arc::new(Argon2PasswordHasher);
|
||||
let (auth_service, password_hasher) = auth::create()?;
|
||||
let metadata_client = metadata::create()?;
|
||||
let poster_fetcher = poster_fetcher::create()?;
|
||||
let poster_storage = poster_storage::create()?;
|
||||
|
||||
let (movie_repository, review_repository, diary_repository, stats_repository, user_repository, db_pool) =
|
||||
match backend.as_str() {
|
||||
#[cfg(feature = "postgres")]
|
||||
"postgres" => {
|
||||
let (pool, m, r, d, s, u) = wire_postgres(&database_url).await?;
|
||||
let (pool, m, r, d, s, u) = postgres::wire(&database_url).await?;
|
||||
(m, r, d, s, u, DbPool::Postgres(pool))
|
||||
}
|
||||
#[cfg(feature = "sqlite")]
|
||||
_ => {
|
||||
let (pool, m, r, d, s, u) = wire_sqlite(&database_url).await?;
|
||||
let (pool, m, r, d, s, u) = sqlite::wire(&database_url).await?;
|
||||
(m, r, d, s, u, DbPool::Sqlite(pool))
|
||||
}
|
||||
#[cfg(not(feature = "sqlite"))]
|
||||
@@ -108,44 +71,26 @@ async fn wire_dependencies() -> anyhow::Result<(AppState, axum::Router)> {
|
||||
|
||||
#[cfg(feature = "federation")]
|
||||
let (event_publisher_arc, ap_router, ap_service, social_query) = {
|
||||
let (federation_repo, social_query_arc, review_store): (
|
||||
Arc<dyn activitypub::FederationRepository>,
|
||||
Arc<dyn domain::ports::SocialQueryPort>,
|
||||
Arc<dyn activitypub::RemoteReviewRepository>,
|
||||
) = match &db_pool {
|
||||
let (federation_repo, social_query_arc, review_store) = match &db_pool {
|
||||
#[cfg(feature = "postgres-federation")]
|
||||
DbPool::Postgres(pool) => {
|
||||
let fed = Arc::new(PostgresFederationRepository::new(pool.clone()));
|
||||
(Arc::clone(&fed) as _, Arc::clone(&fed) as _, fed as _)
|
||||
}
|
||||
DbPool::Postgres(pool) => postgres_federation::wire(pool.clone()),
|
||||
#[cfg(feature = "sqlite-federation")]
|
||||
DbPool::Sqlite(pool) => {
|
||||
let fed = Arc::new(SqliteFederationRepository::new(pool.clone()));
|
||||
(Arc::clone(&fed) as _, Arc::clone(&fed) as _, fed as _)
|
||||
}
|
||||
DbPool::Sqlite(pool) => sqlite_federation::wire(pool.clone()),
|
||||
#[cfg(not(feature = "sqlite-federation"))]
|
||||
_ => anyhow::bail!("DATABASE_BACKEND={backend} federation is not supported by this build"),
|
||||
};
|
||||
|
||||
let user_repo_adapter = Arc::new(DomainUserRepoAdapter(Arc::clone(&user_repository)));
|
||||
let review_handler = Arc::new(ReviewObjectHandler {
|
||||
movie_repository: Arc::clone(&movie_repository),
|
||||
diary_repository: Arc::clone(&diary_repository),
|
||||
let ap = activitypub::wire(
|
||||
federation_repo,
|
||||
review_store,
|
||||
base_url: app_config.base_url.clone(),
|
||||
});
|
||||
let concrete_ap_service = Arc::new(
|
||||
ActivityPubService::new(
|
||||
federation_repo,
|
||||
user_repo_adapter,
|
||||
review_handler,
|
||||
app_config.base_url.clone(),
|
||||
cfg!(debug_assertions),
|
||||
)
|
||||
.await?,
|
||||
);
|
||||
let ap_router = concrete_ap_service.router();
|
||||
let ap_service_arc: Arc<dyn ActivityPubPort> = concrete_ap_service;
|
||||
Arc::clone(&user_repository),
|
||||
Arc::clone(&movie_repository),
|
||||
Arc::clone(&review_repository),
|
||||
Arc::clone(&diary_repository),
|
||||
app_config.base_url.clone(),
|
||||
).await?;
|
||||
let ap_router = ap.router;
|
||||
let ap_service_arc = ap.service;
|
||||
|
||||
let ep: Arc<dyn EventPublisher> = match event_bus {
|
||||
EventBusBackend::Db => {
|
||||
@@ -230,73 +175,6 @@ async fn wire_dependencies() -> anyhow::Result<(AppState, axum::Router)> {
|
||||
Ok((state, ap_router))
|
||||
}
|
||||
|
||||
#[cfg(feature = "sqlite")]
|
||||
async fn wire_sqlite(database_url: &str) -> anyhow::Result<(
|
||||
sqlx::SqlitePool,
|
||||
Arc<dyn MovieRepository>,
|
||||
Arc<dyn ReviewRepository>,
|
||||
Arc<dyn DiaryRepository>,
|
||||
Arc<dyn StatsRepository>,
|
||||
Arc<dyn UserRepository>,
|
||||
)> {
|
||||
use sqlx::sqlite::SqliteConnectOptions;
|
||||
|
||||
let opts = SqliteConnectOptions::from_str(database_url)
|
||||
.context("Invalid DATABASE_URL")?
|
||||
.create_if_missing(true)
|
||||
.journal_mode(sqlx::sqlite::SqliteJournalMode::Wal)
|
||||
.busy_timeout(std::time::Duration::from_secs(5));
|
||||
let pool = sqlx::SqlitePool::connect_with(opts)
|
||||
.await
|
||||
.context("Failed to connect to SQLite database")?;
|
||||
|
||||
let sqlite_repo = Arc::new(SqliteMovieRepository::new(pool.clone()));
|
||||
sqlite_repo
|
||||
.migrate()
|
||||
.await
|
||||
.map_err(|e| anyhow::anyhow!("{}", e))
|
||||
.context("Database migration failed")?;
|
||||
|
||||
let movie_repository: Arc<dyn MovieRepository> = Arc::clone(&sqlite_repo) as _;
|
||||
let review_repository: Arc<dyn ReviewRepository> = Arc::clone(&sqlite_repo) as _;
|
||||
let diary_repository: Arc<dyn DiaryRepository> = Arc::clone(&sqlite_repo) as _;
|
||||
let stats_repository: Arc<dyn StatsRepository> = Arc::clone(&sqlite_repo) as _;
|
||||
let user_repository: Arc<dyn UserRepository> =
|
||||
Arc::new(SqliteUserRepository::new(pool.clone()));
|
||||
|
||||
Ok((pool, movie_repository, review_repository, diary_repository, stats_repository, user_repository))
|
||||
}
|
||||
|
||||
#[cfg(feature = "postgres")]
|
||||
async fn wire_postgres(database_url: &str) -> anyhow::Result<(
|
||||
sqlx::PgPool,
|
||||
Arc<dyn MovieRepository>,
|
||||
Arc<dyn ReviewRepository>,
|
||||
Arc<dyn DiaryRepository>,
|
||||
Arc<dyn StatsRepository>,
|
||||
Arc<dyn UserRepository>,
|
||||
)> {
|
||||
let pool = sqlx::PgPool::connect(database_url)
|
||||
.await
|
||||
.context("Failed to connect to PostgreSQL database")?;
|
||||
|
||||
let pg_repo = Arc::new(PostgresRepository::new(pool.clone()));
|
||||
pg_repo
|
||||
.migrate()
|
||||
.await
|
||||
.map_err(|e| anyhow::anyhow!("{}", e))
|
||||
.context("Database migration failed")?;
|
||||
|
||||
let movie_repository: Arc<dyn MovieRepository> = Arc::clone(&pg_repo) as _;
|
||||
let review_repository: Arc<dyn ReviewRepository> = Arc::clone(&pg_repo) as _;
|
||||
let diary_repository: Arc<dyn DiaryRepository> = Arc::clone(&pg_repo) as _;
|
||||
let stats_repository: Arc<dyn StatsRepository> = Arc::clone(&pg_repo) as _;
|
||||
let user_repository: Arc<dyn UserRepository> =
|
||||
Arc::new(PostgresUserRepository::new(pool.clone()));
|
||||
|
||||
Ok((pool, movie_repository, review_repository, diary_repository, stats_repository, user_repository))
|
||||
}
|
||||
|
||||
enum DbPool {
|
||||
#[cfg(feature = "sqlite")]
|
||||
Sqlite(sqlx::SqlitePool),
|
||||
|
||||
@@ -8,11 +8,13 @@ default = ["sqlite"]
|
||||
sqlite = ["dep:sqlite", "dep:sqlite-event-queue"]
|
||||
postgres = ["dep:postgres", "dep:postgres-event-queue"]
|
||||
nats = ["dep:nats"]
|
||||
federation = []
|
||||
sqlite-federation = ["sqlite", "dep:sqlite-federation", "dep:activitypub", "federation"]
|
||||
postgres-federation = ["postgres", "dep:postgres-federation", "dep:activitypub", "federation"]
|
||||
|
||||
[dependencies]
|
||||
domain = { workspace = true }
|
||||
application = { workspace = true }
|
||||
event-publisher = { workspace = true }
|
||||
tokio = { workspace = true }
|
||||
anyhow = { workspace = true }
|
||||
thiserror = { workspace = true }
|
||||
@@ -38,3 +40,8 @@ sqlite = { workspace = true, optional = true }
|
||||
postgres = { workspace = true, optional = true }
|
||||
sqlite-event-queue = { workspace = true, optional = true }
|
||||
postgres-event-queue = { workspace = true, optional = true }
|
||||
|
||||
# Optional — federation
|
||||
activitypub = { workspace = true, optional = true }
|
||||
sqlite-federation = { workspace = true, optional = true }
|
||||
postgres-federation = { workspace = true, optional = true }
|
||||
|
||||
@@ -1,26 +1,12 @@
|
||||
use std::sync::Arc;
|
||||
use std::str::FromStr;
|
||||
|
||||
use anyhow::Context;
|
||||
use application::{config::AppConfig, context::AppContext, event_handlers::PosterSyncHandler, worker::WorkerService};
|
||||
use auth::{Argon2PasswordHasher, AuthConfig, JwtAuthService};
|
||||
use export::ExportAdapter;
|
||||
use metadata::MetadataClientImpl;
|
||||
use poster_fetcher::{PosterFetcherConfig, ReqwestPosterFetcher};
|
||||
use poster_storage::{PosterStorageAdapter, StorageConfig};
|
||||
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
|
||||
|
||||
#[cfg(feature = "sqlite")]
|
||||
use sqlite::{SqliteMovieRepository, SqliteUserRepository};
|
||||
|
||||
#[cfg(feature = "postgres")]
|
||||
use postgres::{PostgresRepository, PostgresUserRepository};
|
||||
|
||||
use domain::ports::{
|
||||
AuthService, DiaryExporter, DiaryRepository, MetadataClient, MovieRepository,
|
||||
PasswordHasher, PosterFetcherClient, PosterStorage, ReviewRepository, StatsRepository,
|
||||
UserRepository,
|
||||
};
|
||||
use domain::ports::{DiaryExporter, EventHandler};
|
||||
|
||||
#[cfg(not(any(feature = "sqlite", feature = "postgres")))]
|
||||
compile_error!("At least one database backend must be enabled. Use --features sqlite or --features postgres");
|
||||
@@ -32,35 +18,23 @@ async fn main() -> anyhow::Result<()> {
|
||||
|
||||
let database_url = std::env::var("DATABASE_URL").context("DATABASE_URL must be set")?;
|
||||
let backend = std::env::var("DATABASE_BACKEND").unwrap_or_else(|_| "sqlite".to_string());
|
||||
let auth_config = AuthConfig::from_env()?;
|
||||
let storage_config = StorageConfig::from_env()?;
|
||||
let app_config = AppConfig::from_env();
|
||||
|
||||
let metadata_client: Arc<dyn MetadataClient> =
|
||||
if let Ok(tmdb_key) = std::env::var("TMDB_API_KEY") {
|
||||
Arc::new(MetadataClientImpl::new_tmdb(tmdb_key))
|
||||
} else {
|
||||
let omdb_key = std::env::var("OMDB_API_KEY")
|
||||
.context("Either TMDB_API_KEY or OMDB_API_KEY must be set")?;
|
||||
Arc::new(MetadataClientImpl::new_omdb(omdb_key))
|
||||
};
|
||||
let poster_fetcher: Arc<dyn PosterFetcherClient> =
|
||||
Arc::new(ReqwestPosterFetcher::new(PosterFetcherConfig::from_env())?);
|
||||
let poster_storage: Arc<dyn PosterStorage> =
|
||||
Arc::new(PosterStorageAdapter::from_config(storage_config));
|
||||
let auth_service: Arc<dyn AuthService> = Arc::new(JwtAuthService::new(auth_config));
|
||||
let password_hasher: Arc<dyn PasswordHasher> = Arc::new(Argon2PasswordHasher);
|
||||
let (auth_service, password_hasher) = auth::create()?;
|
||||
let metadata_client = metadata::create()?;
|
||||
let poster_fetcher = poster_fetcher::create()?;
|
||||
let poster_storage = poster_storage::create()?;
|
||||
|
||||
let (movie_repository, review_repository, diary_repository, stats_repository, user_repository, db_pool) =
|
||||
match backend.as_str() {
|
||||
#[cfg(feature = "postgres")]
|
||||
"postgres" => {
|
||||
let (pool, m, r, d, s, u) = wire_postgres(&database_url).await?;
|
||||
let (pool, m, r, d, s, u) = postgres::wire(&database_url).await?;
|
||||
(m, r, d, s, u, DbPool::Postgres(pool))
|
||||
}
|
||||
#[cfg(feature = "sqlite")]
|
||||
_ => {
|
||||
let (pool, m, r, d, s, u) = wire_sqlite(&database_url).await?;
|
||||
let (pool, m, r, d, s, u) = sqlite::wire(&database_url).await?;
|
||||
(m, r, d, s, u, DbPool::Sqlite(pool))
|
||||
}
|
||||
#[cfg(not(feature = "sqlite"))]
|
||||
@@ -73,11 +47,11 @@ async fn main() -> anyhow::Result<()> {
|
||||
) = match EventBusBackend::from_env()? {
|
||||
EventBusBackend::Db => {
|
||||
tracing::info!("event bus: DB queue");
|
||||
match db_pool {
|
||||
match &db_pool {
|
||||
#[cfg(feature = "postgres")]
|
||||
DbPool::Postgres(pool) => postgres_event_queue::PostgresEventQueue::create_channel(pool).await?,
|
||||
DbPool::Postgres(pool) => postgres_event_queue::PostgresEventQueue::create_channel(pool.clone()).await?,
|
||||
#[cfg(feature = "sqlite")]
|
||||
DbPool::Sqlite(pool) => sqlite_event_queue::SqliteEventQueue::create_channel(pool).await?,
|
||||
DbPool::Sqlite(pool) => sqlite_event_queue::SqliteEventQueue::create_channel(pool.clone()).await?,
|
||||
}
|
||||
}
|
||||
#[cfg(feature = "nats")]
|
||||
@@ -89,6 +63,16 @@ async fn main() -> anyhow::Result<()> {
|
||||
}
|
||||
};
|
||||
|
||||
// Clone what federation handler needs before ctx and app_config are consumed.
|
||||
#[cfg(feature = "federation")]
|
||||
let (fed_movie_repo, fed_review_repo, fed_diary_repo, fed_user_repo, base_url) = (
|
||||
Arc::clone(&movie_repository),
|
||||
Arc::clone(&review_repository),
|
||||
Arc::clone(&diary_repository),
|
||||
Arc::clone(&user_repository),
|
||||
app_config.base_url.clone(),
|
||||
);
|
||||
|
||||
let ctx = AppContext {
|
||||
movie_repository,
|
||||
review_repository,
|
||||
@@ -105,8 +89,37 @@ async fn main() -> anyhow::Result<()> {
|
||||
config: app_config,
|
||||
};
|
||||
|
||||
let poster_handler = Arc::new(PosterSyncHandler::new(ctx, 3));
|
||||
let worker = WorkerService::new(consumer_arc, vec![poster_handler]);
|
||||
let handlers: Vec<Arc<dyn EventHandler>> = {
|
||||
let poster = Arc::new(PosterSyncHandler::new(ctx, 3)) as Arc<dyn EventHandler>;
|
||||
|
||||
#[cfg(not(feature = "federation"))]
|
||||
{ vec![poster] }
|
||||
|
||||
#[cfg(feature = "federation")]
|
||||
{
|
||||
let (federation_repo, _social_query, review_store) = match &db_pool {
|
||||
#[cfg(feature = "sqlite-federation")]
|
||||
DbPool::Sqlite(pool) => sqlite_federation::wire(pool.clone()),
|
||||
#[cfg(feature = "postgres-federation")]
|
||||
DbPool::Postgres(pool) => postgres_federation::wire(pool.clone()),
|
||||
};
|
||||
|
||||
let ap = activitypub::wire(
|
||||
federation_repo,
|
||||
review_store,
|
||||
fed_user_repo,
|
||||
fed_movie_repo,
|
||||
fed_review_repo,
|
||||
fed_diary_repo,
|
||||
base_url,
|
||||
).await?.event_handler;
|
||||
|
||||
tracing::info!("federation event handler registered");
|
||||
vec![poster, ap]
|
||||
}
|
||||
};
|
||||
|
||||
let worker = WorkerService::new(consumer_arc, handlers);
|
||||
|
||||
tracing::info!("worker started");
|
||||
worker.run().await;
|
||||
@@ -154,69 +167,3 @@ fn init_tracing() {
|
||||
.init();
|
||||
}
|
||||
|
||||
#[cfg(feature = "sqlite")]
|
||||
async fn wire_sqlite(database_url: &str) -> anyhow::Result<(
|
||||
sqlx::SqlitePool,
|
||||
Arc<dyn MovieRepository>,
|
||||
Arc<dyn ReviewRepository>,
|
||||
Arc<dyn DiaryRepository>,
|
||||
Arc<dyn StatsRepository>,
|
||||
Arc<dyn UserRepository>,
|
||||
)> {
|
||||
use sqlx::sqlite::SqliteConnectOptions;
|
||||
|
||||
let opts = SqliteConnectOptions::from_str(database_url)
|
||||
.context("Invalid DATABASE_URL")?
|
||||
.create_if_missing(true)
|
||||
.journal_mode(sqlx::sqlite::SqliteJournalMode::Wal)
|
||||
.busy_timeout(std::time::Duration::from_secs(5));
|
||||
let pool = sqlx::SqlitePool::connect_with(opts)
|
||||
.await
|
||||
.context("Failed to connect to SQLite database")?;
|
||||
|
||||
let sqlite_repo = Arc::new(SqliteMovieRepository::new(pool.clone()));
|
||||
sqlite_repo
|
||||
.migrate()
|
||||
.await
|
||||
.map_err(|e| anyhow::anyhow!("{}", e))
|
||||
.context("Database migration failed")?;
|
||||
|
||||
let movie_repository: Arc<dyn MovieRepository> = Arc::clone(&sqlite_repo) as _;
|
||||
let review_repository: Arc<dyn ReviewRepository> = Arc::clone(&sqlite_repo) as _;
|
||||
let diary_repository: Arc<dyn DiaryRepository> = Arc::clone(&sqlite_repo) as _;
|
||||
let stats_repository: Arc<dyn StatsRepository> = Arc::clone(&sqlite_repo) as _;
|
||||
let user_repository: Arc<dyn UserRepository> =
|
||||
Arc::new(SqliteUserRepository::new(pool.clone()));
|
||||
|
||||
Ok((pool, movie_repository, review_repository, diary_repository, stats_repository, user_repository))
|
||||
}
|
||||
|
||||
#[cfg(feature = "postgres")]
|
||||
async fn wire_postgres(database_url: &str) -> anyhow::Result<(
|
||||
sqlx::PgPool,
|
||||
Arc<dyn MovieRepository>,
|
||||
Arc<dyn ReviewRepository>,
|
||||
Arc<dyn DiaryRepository>,
|
||||
Arc<dyn StatsRepository>,
|
||||
Arc<dyn UserRepository>,
|
||||
)> {
|
||||
let pool = sqlx::PgPool::connect(database_url)
|
||||
.await
|
||||
.context("Failed to connect to PostgreSQL database")?;
|
||||
|
||||
let pg_repo = Arc::new(PostgresRepository::new(pool.clone()));
|
||||
pg_repo
|
||||
.migrate()
|
||||
.await
|
||||
.map_err(|e| anyhow::anyhow!("{}", e))
|
||||
.context("Database migration failed")?;
|
||||
|
||||
let movie_repository: Arc<dyn MovieRepository> = Arc::clone(&pg_repo) as _;
|
||||
let review_repository: Arc<dyn ReviewRepository> = Arc::clone(&pg_repo) as _;
|
||||
let diary_repository: Arc<dyn DiaryRepository> = Arc::clone(&pg_repo) as _;
|
||||
let stats_repository: Arc<dyn StatsRepository> = Arc::clone(&pg_repo) as _;
|
||||
let user_repository: Arc<dyn UserRepository> =
|
||||
Arc::new(PostgresUserRepository::new(pool.clone()));
|
||||
|
||||
Ok((pool, movie_repository, review_repository, diary_repository, stats_repository, user_repository))
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user