From 597685520c560d41cefcbc3e26a93d3bf5d29676 Mon Sep 17 00:00:00 2001 From: Gabriel Kaszewski Date: Sun, 10 May 2026 01:57:10 +0200 Subject: [PATCH] feat: postgresql adapter --- .env.example | 12 +- Cargo.lock | 35 + Cargo.toml | 4 + Dockerfile | 2 + .../adapters/postgres-federation/Cargo.toml | 24 + .../adapters/postgres-federation/src/lib.rs | 476 +++++++++++ crates/adapters/postgres/Cargo.toml | 19 + .../postgres/migrations/0001_initial.sql | 69 ++ crates/adapters/postgres/src/lib.rs | 768 ++++++++++++++++++ crates/adapters/postgres/src/models.rs | 210 +++++ crates/adapters/postgres/src/users.rs | 204 +++++ crates/domain/src/models/mod.rs | 2 - crates/presentation/Cargo.toml | 2 + crates/presentation/src/main.rs | 126 ++- 14 files changed, 1915 insertions(+), 38 deletions(-) create mode 100644 crates/adapters/postgres-federation/Cargo.toml create mode 100644 crates/adapters/postgres-federation/src/lib.rs create mode 100644 crates/adapters/postgres/Cargo.toml create mode 100644 crates/adapters/postgres/migrations/0001_initial.sql create mode 100644 crates/adapters/postgres/src/lib.rs create mode 100644 crates/adapters/postgres/src/models.rs create mode 100644 crates/adapters/postgres/src/users.rs diff --git a/.env.example b/.env.example index 88e9d61..8efd8c8 100644 --- a/.env.example +++ b/.env.example @@ -1,12 +1,20 @@ -# Database +# Database backend — "sqlite" (default) or "postgres" +DATABASE_BACKEND=sqlite + +# Option A: SQLite (default, zero external dependencies) DATABASE_URL=sqlite://movies.db +# Option B: PostgreSQL +# DATABASE_BACKEND=postgres +# DATABASE_URL=postgres://user:password@localhost:5432/movies_diary + # Authentication JWT_SECRET=change-me JWT_TTL_SECONDS=86400 -# OMDb metadata +# OMDb/TMDB metadata OMDB_API_KEY=your-key +TMDB_API_KEY=your-key # Poster storage — Option A (local) is active. To use S3, comment it out and uncomment Option B: diff --git a/Cargo.lock b/Cargo.lock index 2b5c9a8..3f8869f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3183,6 +3183,35 @@ dependencies = [ "uuid", ] +[[package]] +name = "postgres" +version = "0.1.0" +dependencies = [ + "async-trait", + "chrono", + "domain", + "sqlx", + "tokio", + "tracing", + "uuid", +] + +[[package]] +name = "postgres-federation" +version = "0.1.0" +dependencies = [ + "activitypub", + "activitypub-base", + "anyhow", + "async-trait", + "chrono", + "domain", + "sqlx", + "tokio", + "tracing", + "uuid", +] + [[package]] name = "potential_utf" version = "0.1.5" @@ -3230,6 +3259,8 @@ dependencies = [ "percent-encoding", "poster-fetcher", "poster-storage", + "postgres", + "postgres-federation", "rss 0.1.0", "serde", "serde_json", @@ -4286,6 +4317,7 @@ checksum = "ee6798b1838b6a0f69c007c133b8df5866302197e404e8b6ee8ed3e3a5e68dc6" dependencies = [ "base64", "bytes", + "chrono", "crc", "crossbeam-queue", "either", @@ -4364,6 +4396,7 @@ dependencies = [ "bitflags 2.11.1", "byteorder", "bytes", + "chrono", "crc", "digest", "dotenvy", @@ -4406,6 +4439,7 @@ dependencies = [ "base64", "bitflags 2.11.1", "byteorder", + "chrono", "crc", "dotenvy", "etcetera", @@ -4441,6 +4475,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c2d12fe70b2c1b4401038055f90f151b78208de1f9f89a7dbfd41587a10c3eea" dependencies = [ "atoi", + "chrono", "flume", "futures-channel", "futures-core", diff --git a/Cargo.toml b/Cargo.toml index 2e3e7a9..7d02e99 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,7 +7,9 @@ members = [ "crates/adapters/poster-storage", "crates/adapters/rss", "crates/adapters/sqlite", + "crates/adapters/postgres", "crates/adapters/sqlite-federation", + "crates/adapters/postgres-federation", "crates/adapters/template-askama", "crates/adapters/activitypub", "crates/adapters/activitypub-base", @@ -54,6 +56,8 @@ rss = { path = "crates/adapters/rss" } export = { path = "crates/adapters/export" } sqlite = { path = "crates/adapters/sqlite" } sqlite-federation = { path = "crates/adapters/sqlite-federation" } +postgres = { path = "crates/adapters/postgres" } +postgres-federation = { path = "crates/adapters/postgres-federation" } template-askama = { path = "crates/adapters/template-askama" } activitypub = { path = "crates/adapters/activitypub" } activitypub-base = { path = "crates/adapters/activitypub-base" } diff --git a/Dockerfile b/Dockerfile index bc667c2..aaf6355 100644 --- a/Dockerfile +++ b/Dockerfile @@ -18,6 +18,8 @@ COPY crates/adapters/export/Cargo.toml crates/adapters/export/Cargo.t COPY crates/adapters/rss/Cargo.toml crates/adapters/rss/Cargo.toml COPY crates/adapters/sqlite/Cargo.toml crates/adapters/sqlite/Cargo.toml COPY crates/adapters/sqlite-federation/Cargo.toml crates/adapters/sqlite-federation/Cargo.toml +COPY crates/adapters/postgres/Cargo.toml crates/adapters/postgres/Cargo.toml +COPY crates/adapters/postgres-federation/Cargo.toml crates/adapters/postgres-federation/Cargo.toml COPY crates/adapters/template-askama/Cargo.toml crates/adapters/template-askama/Cargo.toml COPY crates/application/Cargo.toml crates/application/Cargo.toml COPY crates/domain/Cargo.toml crates/domain/Cargo.toml diff --git a/crates/adapters/postgres-federation/Cargo.toml b/crates/adapters/postgres-federation/Cargo.toml new file mode 100644 index 0000000..3b8f524 --- /dev/null +++ b/crates/adapters/postgres-federation/Cargo.toml @@ -0,0 +1,24 @@ +[package] +name = "postgres-federation" +version = "0.1.0" +edition = "2024" + +[dependencies] +sqlx = { version = "0.8.6", features = [ + "runtime-tokio-rustls", + "postgres", + "uuid", + "macros", + "chrono", +] } +activitypub = { workspace = true } +activitypub-base = { workspace = true } +domain = { workspace = true } +uuid = { workspace = true } +chrono = { workspace = true } +tracing = { workspace = true } +async-trait = { workspace = true } +anyhow = { workspace = true } + +[dev-dependencies] +tokio = { workspace = true } diff --git a/crates/adapters/postgres-federation/src/lib.rs b/crates/adapters/postgres-federation/src/lib.rs new file mode 100644 index 0000000..7aa5f72 --- /dev/null +++ b/crates/adapters/postgres-federation/src/lib.rs @@ -0,0 +1,476 @@ +use anyhow::{Result, anyhow}; +use async_trait::async_trait; +use chrono::{NaiveDateTime, Utc}; +use sqlx::{PgPool, Row}; + +use activitypub::RemoteReviewRepository; +use activitypub_base::{ + FederationRepository, Follower, FollowerStatus, FollowingStatus, RemoteActor, +}; +use domain::models::{Review, ReviewSource}; + +fn datetime_to_str(dt: &NaiveDateTime) -> String { + dt.format("%Y-%m-%d %H:%M:%S").to_string() +} + +pub struct PostgresFederationRepository { + pool: PgPool, +} + +impl PostgresFederationRepository { + pub fn new(pool: PgPool) -> Self { + Self { pool } + } +} + +fn status_to_str(status: &FollowerStatus) -> &'static str { + match status { + FollowerStatus::Pending => "pending", + FollowerStatus::Accepted => "accepted", + FollowerStatus::Rejected => "rejected", + } +} + +fn str_to_status(s: &str) -> FollowerStatus { + match s { + "accepted" => FollowerStatus::Accepted, + "rejected" => FollowerStatus::Rejected, + _ => FollowerStatus::Pending, + } +} + +#[async_trait] +impl FederationRepository for PostgresFederationRepository { + async fn add_follower( + &self, + local_user_id: uuid::Uuid, + remote_actor_url: &str, + status: FollowerStatus, + follow_activity_id: &str, + ) -> Result<()> { + let uid = local_user_id.to_string(); + let status_str = status_to_str(&status); + let now = Utc::now().naive_utc(); + let created_at = datetime_to_str(&now); + sqlx::query( + "INSERT INTO ap_followers (local_user_id, remote_actor_url, status, created_at, follow_activity_id) + VALUES ($1, $2, $3, $4::timestamptz, $5) + ON CONFLICT(local_user_id, remote_actor_url) DO UPDATE SET + status = EXCLUDED.status, + follow_activity_id = EXCLUDED.follow_activity_id", + ) + .bind(&uid) + .bind(remote_actor_url) + .bind(status_str) + .bind(&created_at) + .bind(follow_activity_id) + .execute(&self.pool) + .await?; + Ok(()) + } + + async fn get_follower_follow_activity_id( + &self, + local_user_id: uuid::Uuid, + remote_actor_url: &str, + ) -> Result> { + let uid = local_user_id.to_string(); + let row: Option = sqlx::query_scalar( + "SELECT follow_activity_id FROM ap_followers WHERE local_user_id = $1 AND remote_actor_url = $2", + ) + .bind(&uid) + .bind(remote_actor_url) + .fetch_optional(&self.pool) + .await?; + Ok(row) + } + + async fn remove_follower( + &self, + local_user_id: uuid::Uuid, + remote_actor_url: &str, + ) -> Result<()> { + let uid = local_user_id.to_string(); + sqlx::query("DELETE FROM ap_followers WHERE local_user_id = $1 AND remote_actor_url = $2") + .bind(&uid) + .bind(remote_actor_url) + .execute(&self.pool) + .await?; + Ok(()) + } + + async fn get_followers(&self, local_user_id: uuid::Uuid) -> Result> { + let uid = local_user_id.to_string(); + let rows = sqlx::query( + "SELECT f.remote_actor_url, f.status, + a.handle, a.inbox_url, a.shared_inbox_url, a.display_name + FROM ap_followers f + LEFT JOIN ap_remote_actors a ON a.url = f.remote_actor_url + WHERE f.local_user_id = $1", + ) + .bind(&uid) + .fetch_all(&self.pool) + .await?; + Ok(rows.into_iter().map(|row| { + let url: String = row.get("remote_actor_url"); + let status_str: String = row.get("status"); + let handle: String = row.try_get("handle").unwrap_or_default(); + let inbox_url: String = row.try_get("inbox_url").unwrap_or_default(); + let shared_inbox_url: Option = row.try_get("shared_inbox_url").ok().flatten(); + let display_name: Option = row.try_get("display_name").ok().flatten(); + Follower { + actor: RemoteActor { url, handle, inbox_url, shared_inbox_url, display_name }, + status: str_to_status(&status_str), + } + }).collect()) + } + + async fn update_follower_status( + &self, + local_user_id: uuid::Uuid, + remote_actor_url: &str, + status: FollowerStatus, + ) -> Result<()> { + let uid = local_user_id.to_string(); + let status_str = status_to_str(&status); + let result = sqlx::query( + "UPDATE ap_followers SET status = $1 WHERE local_user_id = $2 AND remote_actor_url = $3", + ) + .bind(status_str) + .bind(&uid) + .bind(remote_actor_url) + .execute(&self.pool) + .await?; + if result.rows_affected() == 0 { + tracing::warn!(local_user_id = %local_user_id, remote_actor_url, "update_follower_status: no row found"); + } + Ok(()) + } + + async fn add_following( + &self, + local_user_id: uuid::Uuid, + actor: RemoteActor, + follow_activity_id: &str, + ) -> Result<()> { + let uid = local_user_id.to_string(); + let now = Utc::now().naive_utc(); + let created_at = datetime_to_str(&now); + self.upsert_remote_actor(actor.clone()).await?; + sqlx::query( + "INSERT INTO ap_following (local_user_id, remote_actor_url, follow_activity_id, created_at) + VALUES ($1, $2, $3, $4::timestamptz) + ON CONFLICT DO NOTHING", + ) + .bind(&uid) + .bind(&actor.url) + .bind(follow_activity_id) + .bind(&created_at) + .execute(&self.pool) + .await?; + Ok(()) + } + + async fn get_follow_activity_id( + &self, + local_user_id: uuid::Uuid, + remote_actor_url: &str, + ) -> Result> { + let uid = local_user_id.to_string(); + let row: Option = sqlx::query_scalar( + "SELECT follow_activity_id FROM ap_following WHERE local_user_id = $1 AND remote_actor_url = $2", + ) + .bind(&uid) + .bind(remote_actor_url) + .fetch_optional(&self.pool) + .await?; + Ok(row) + } + + async fn remove_following(&self, local_user_id: uuid::Uuid, actor_url: &str) -> Result<()> { + let uid = local_user_id.to_string(); + sqlx::query("DELETE FROM ap_following WHERE local_user_id = $1 AND remote_actor_url = $2") + .bind(&uid) + .bind(actor_url) + .execute(&self.pool) + .await?; + Ok(()) + } + + async fn get_following(&self, local_user_id: uuid::Uuid) -> Result> { + let uid = local_user_id.to_string(); + let rows = sqlx::query( + "SELECT a.url, a.handle, a.inbox_url, a.shared_inbox_url, a.display_name + FROM ap_following f + INNER JOIN ap_remote_actors a ON a.url = f.remote_actor_url + WHERE f.local_user_id = $1 AND f.status = 'accepted'", + ) + .bind(&uid) + .fetch_all(&self.pool) + .await?; + Ok(rows.into_iter().map(|row| RemoteActor { + url: row.get("url"), + handle: row.get("handle"), + inbox_url: row.get("inbox_url"), + shared_inbox_url: row.try_get("shared_inbox_url").ok().flatten(), + display_name: row.try_get("display_name").ok().flatten(), + }).collect()) + } + + async fn count_following(&self, local_user_id: uuid::Uuid) -> Result { + let uid = local_user_id.to_string(); + let count: i64 = sqlx::query_scalar( + "SELECT COUNT(*) FROM ap_following WHERE local_user_id = $1 AND status = 'accepted'", + ) + .bind(&uid) + .fetch_one(&self.pool) + .await?; + Ok(count as usize) + } + + async fn upsert_remote_actor(&self, actor: RemoteActor) -> Result<()> { + let now = Utc::now().naive_utc(); + let fetched_at = datetime_to_str(&now); + sqlx::query( + "INSERT INTO ap_remote_actors (url, handle, inbox_url, shared_inbox_url, display_name, fetched_at) + VALUES ($1, $2, $3, $4, $5, $6::timestamptz) + ON CONFLICT(url) DO UPDATE SET + handle = EXCLUDED.handle, + inbox_url = EXCLUDED.inbox_url, + shared_inbox_url = EXCLUDED.shared_inbox_url, + display_name = EXCLUDED.display_name, + fetched_at = EXCLUDED.fetched_at", + ) + .bind(&actor.url) + .bind(&actor.handle) + .bind(&actor.inbox_url) + .bind(&actor.shared_inbox_url) + .bind(&actor.display_name) + .bind(&fetched_at) + .execute(&self.pool) + .await?; + Ok(()) + } + + async fn get_remote_actor(&self, actor_url: &str) -> Result> { + let row = sqlx::query( + "SELECT url, handle, inbox_url, shared_inbox_url, display_name + FROM ap_remote_actors WHERE url = $1", + ) + .bind(actor_url) + .fetch_optional(&self.pool) + .await?; + Ok(row.map(|row| RemoteActor { + url: row.get("url"), + handle: row.get("handle"), + inbox_url: row.get("inbox_url"), + shared_inbox_url: row.try_get("shared_inbox_url").ok().flatten(), + display_name: row.try_get("display_name").ok().flatten(), + })) + } + + async fn get_local_actor_keypair(&self, user_id: uuid::Uuid) -> Result> { + let uid = user_id.to_string(); + let row = sqlx::query("SELECT public_key, private_key FROM ap_local_actors WHERE user_id = $1") + .bind(&uid) + .fetch_optional(&self.pool) + .await?; + Ok(row.map(|r| (r.get("public_key"), r.get("private_key")))) + } + + async fn save_local_actor_keypair( + &self, + user_id: uuid::Uuid, + public_key: String, + private_key: String, + ) -> Result<()> { + let uid = user_id.to_string(); + let now = Utc::now().naive_utc(); + let created_at = datetime_to_str(&now); + sqlx::query( + "INSERT INTO ap_local_actors (user_id, public_key, private_key, created_at) + VALUES ($1, $2, $3, $4::timestamptz) + ON CONFLICT(user_id) DO UPDATE SET + public_key = EXCLUDED.public_key, + private_key = EXCLUDED.private_key", + ) + .bind(&uid) + .bind(&public_key) + .bind(&private_key) + .bind(&created_at) + .execute(&self.pool) + .await?; + Ok(()) + } + + async fn get_pending_followers(&self, local_user_id: uuid::Uuid) -> Result> { + let uid = local_user_id.to_string(); + let rows = sqlx::query( + "SELECT f.remote_actor_url, a.handle, a.inbox_url, a.shared_inbox_url, a.display_name + FROM ap_followers f + LEFT JOIN ap_remote_actors a ON a.url = f.remote_actor_url + WHERE f.local_user_id = $1 AND f.status = 'pending'", + ) + .bind(&uid) + .fetch_all(&self.pool) + .await?; + Ok(rows.into_iter().map(|row| RemoteActor { + url: row.get("remote_actor_url"), + handle: row.try_get("handle").unwrap_or_default(), + inbox_url: row.try_get("inbox_url").unwrap_or_default(), + shared_inbox_url: row.try_get("shared_inbox_url").ok().flatten(), + display_name: row.try_get("display_name").ok().flatten(), + }).collect()) + } + + async fn update_following_status( + &self, + local_user_id: uuid::Uuid, + remote_actor_url: &str, + status: FollowingStatus, + ) -> Result<()> { + let uid = local_user_id.to_string(); + let status_str = match status { + FollowingStatus::Pending => "pending", + FollowingStatus::Accepted => "accepted", + }; + let result = sqlx::query( + "UPDATE ap_following SET status = $1 WHERE local_user_id = $2 AND remote_actor_url = $3", + ) + .bind(status_str) + .bind(&uid) + .bind(remote_actor_url) + .execute(&self.pool) + .await?; + if result.rows_affected() == 0 { + tracing::warn!(local_user_id = %local_user_id, remote_actor_url, "update_following_status: no row found"); + } + Ok(()) + } +} + +#[async_trait] +impl RemoteReviewRepository for PostgresFederationRepository { + async fn save_remote_review( + &self, + review: &Review, + ap_id: &str, + movie_title: &str, + release_year: u16, + poster_url: Option<&str>, + ) -> Result<()> { + let actor_url = match review.source() { + ReviewSource::Remote { actor_url } => actor_url.clone(), + ReviewSource::Local => return Err(anyhow!("save_remote_review called with a local review")), + }; + let movie_id = review.movie_id().value().to_string(); + sqlx::query( + "INSERT INTO movies (id, external_metadata_id, title, release_year, director, poster_path) + VALUES ($1, NULL, $2, $3, NULL, $4) + ON CONFLICT(id) DO UPDATE SET + poster_path = COALESCE(EXCLUDED.poster_path, movies.poster_path)", + ) + .bind(&movie_id) + .bind(movie_title) + .bind(release_year.max(1888) as i64) + .bind(poster_url) + .execute(&self.pool) + .await?; + + let id = review.id().value().to_string(); + let user_id = review.user_id().value().to_string(); + let rating = review.rating().value() as i64; + let comment = review.comment().map(|c| c.value().to_string()); + let watched_at = datetime_to_str(review.watched_at()); + let created_at = datetime_to_str(review.created_at()); + sqlx::query( + "INSERT INTO reviews (id, movie_id, user_id, rating, comment, watched_at, created_at, remote_actor_url, ap_id) + VALUES ($1, $2, $3, $4, $5, $6::timestamptz, $7::timestamptz, $8, $9) + ON CONFLICT DO NOTHING", + ) + .bind(&id) + .bind(&movie_id) + .bind(&user_id) + .bind(rating) + .bind(&comment) + .bind(&watched_at) + .bind(&created_at) + .bind(&actor_url) + .bind(ap_id) + .execute(&self.pool) + .await?; + Ok(()) + } + + async fn delete_remote_review(&self, ap_id: &str, actor_url: &str) -> Result<()> { + sqlx::query("DELETE FROM reviews WHERE ap_id = $1 AND remote_actor_url = $2") + .bind(ap_id) + .bind(actor_url) + .execute(&self.pool) + .await?; + Ok(()) + } + + async fn update_remote_review( + &self, + ap_id: &str, + actor_url: &str, + rating: u8, + comment: Option<&str>, + watched_at: chrono::NaiveDateTime, + ) -> Result<()> { + let watched_at_str = datetime_to_str(&watched_at); + sqlx::query( + "UPDATE reviews SET rating = $1, comment = $2, watched_at = $3::timestamptz + WHERE ap_id = $4 AND remote_actor_url = $5", + ) + .bind(rating as i64) + .bind(comment) + .bind(&watched_at_str) + .bind(ap_id) + .bind(actor_url) + .execute(&self.pool) + .await?; + Ok(()) + } + + async fn delete_by_actor(&self, actor_url: &str) -> Result<()> { + sqlx::query("DELETE FROM reviews WHERE remote_actor_url = $1") + .bind(actor_url) + .execute(&self.pool) + .await?; + Ok(()) + } +} + +#[async_trait] +impl domain::ports::SocialQueryPort for PostgresFederationRepository { + async fn get_accepted_following_urls( + &self, + user_id: uuid::Uuid, + ) -> Result, domain::errors::DomainError> { + let user_id_str = user_id.to_string(); + sqlx::query_scalar::<_, String>( + "SELECT remote_actor_url FROM ap_following WHERE local_user_id = $1 AND status = 'accepted'", + ) + .bind(&user_id_str) + .fetch_all(&self.pool) + .await + .map_err(|e| domain::errors::DomainError::InfrastructureError(e.to_string())) + } + + async fn list_all_followed_remote_actors( + &self, + ) -> Result, domain::errors::DomainError> { + let rows = sqlx::query_as::<_, (String, String, Option)>( + "SELECT DISTINCT ar.url, ar.handle, ar.display_name + FROM ap_remote_actors ar + JOIN ap_following f ON f.remote_actor_url = ar.url + WHERE f.status = 'accepted'", + ) + .fetch_all(&self.pool) + .await + .map_err(|e| domain::errors::DomainError::InfrastructureError(e.to_string()))?; + Ok(rows.into_iter().map(|(url, handle, display_name)| domain::ports::RemoteActorInfo { url, handle, display_name }).collect()) + } +} diff --git a/crates/adapters/postgres/Cargo.toml b/crates/adapters/postgres/Cargo.toml new file mode 100644 index 0000000..afd4c74 --- /dev/null +++ b/crates/adapters/postgres/Cargo.toml @@ -0,0 +1,19 @@ +[package] +name = "postgres" +version = "0.1.0" +edition = "2024" + +[dependencies] +sqlx = { version = "0.8.6", features = [ + "runtime-tokio-rustls", + "postgres", + "uuid", + "macros", + "chrono", +] } +domain = { workspace = true } +uuid = { workspace = true } +chrono = { workspace = true } +tracing = { workspace = true } +async-trait = { workspace = true } +tokio = { workspace = true } diff --git a/crates/adapters/postgres/migrations/0001_initial.sql b/crates/adapters/postgres/migrations/0001_initial.sql new file mode 100644 index 0000000..63a2ed0 --- /dev/null +++ b/crates/adapters/postgres/migrations/0001_initial.sql @@ -0,0 +1,69 @@ +CREATE TABLE IF NOT EXISTS movies ( + id TEXT PRIMARY KEY NOT NULL, + external_metadata_id TEXT UNIQUE, + title TEXT NOT NULL, + release_year BIGINT NOT NULL, + director TEXT, + poster_path TEXT +); + +CREATE INDEX IF NOT EXISTS idx_movies_title_year ON movies (title, release_year); + +CREATE TABLE IF NOT EXISTS users ( + id TEXT PRIMARY KEY NOT NULL, + email TEXT UNIQUE NOT NULL, + username TEXT UNIQUE NOT NULL, + password_hash TEXT NOT NULL, + created_at TIMESTAMPTZ NOT NULL, + role TEXT NOT NULL DEFAULT 'standard' +); + +CREATE TABLE IF NOT EXISTS reviews ( + id TEXT PRIMARY KEY NOT NULL, + movie_id TEXT NOT NULL REFERENCES movies(id), + user_id TEXT NOT NULL, + rating BIGINT NOT NULL, + comment TEXT, + watched_at TIMESTAMPTZ NOT NULL, + created_at TIMESTAMPTZ NOT NULL, + remote_actor_url TEXT, + ap_id TEXT +); + +CREATE UNIQUE INDEX IF NOT EXISTS idx_reviews_ap_id ON reviews (ap_id) WHERE ap_id IS NOT NULL; +CREATE INDEX IF NOT EXISTS idx_reviews_movie_id ON reviews (movie_id); +CREATE INDEX IF NOT EXISTS idx_reviews_watched_at ON reviews (watched_at); + +CREATE TABLE IF NOT EXISTS ap_followers ( + local_user_id TEXT NOT NULL REFERENCES users(id) ON DELETE CASCADE, + remote_actor_url TEXT NOT NULL, + status TEXT NOT NULL DEFAULT 'pending', + follow_activity_id TEXT, + created_at TIMESTAMPTZ NOT NULL, + PRIMARY KEY (local_user_id, remote_actor_url) +); + +CREATE TABLE IF NOT EXISTS ap_following ( + local_user_id TEXT NOT NULL REFERENCES users(id) ON DELETE CASCADE, + remote_actor_url TEXT NOT NULL, + follow_activity_id TEXT, + status TEXT NOT NULL DEFAULT 'pending', + created_at TIMESTAMPTZ NOT NULL, + PRIMARY KEY (local_user_id, remote_actor_url) +); + +CREATE TABLE IF NOT EXISTS ap_remote_actors ( + url TEXT PRIMARY KEY, + handle TEXT NOT NULL, + inbox_url TEXT NOT NULL, + shared_inbox_url TEXT, + display_name TEXT, + fetched_at TIMESTAMPTZ NOT NULL +); + +CREATE TABLE IF NOT EXISTS ap_local_actors ( + user_id TEXT PRIMARY KEY REFERENCES users(id) ON DELETE CASCADE, + public_key TEXT NOT NULL, + private_key TEXT NOT NULL, + created_at TIMESTAMPTZ NOT NULL +); diff --git a/crates/adapters/postgres/src/lib.rs b/crates/adapters/postgres/src/lib.rs new file mode 100644 index 0000000..c2c4edd --- /dev/null +++ b/crates/adapters/postgres/src/lib.rs @@ -0,0 +1,768 @@ +use async_trait::async_trait; +use domain::{ + errors::DomainError, + events::DomainEvent, + models::{ + DiaryEntry, DiaryFilter, DirectorStat, FeedEntry, MonthlyRating, Movie, Review, + ReviewHistory, ReviewSource, SortDirection, UserStats, UserTrends, + collections::{PageParams, Paginated}, + }, + ports::{DiaryRepository, MovieRepository, ReviewRepository, StatsRepository}, + value_objects::{ExternalMetadataId, MovieId, MovieTitle, ReleaseYear, ReviewId, UserId}, +}; +use sqlx::PgPool; + +mod models; +mod users; + +use models::{ + DiaryRow, DirectorCountRow, FeedRow, MonthlyRatingRow, MovieRow, ReviewRow, UserTotalsRow, + datetime_to_str, +}; + +pub use users::PostgresUserRepository; + +fn format_year_month(ym: &str) -> String { + let parts: Vec<&str> = ym.splitn(2, '-').collect(); + if parts.len() != 2 { + return ym.to_string(); + } + let year = parts[0].get(2..).unwrap_or(parts[0]); + let month = match parts[1] { + "01" => "Jan", + "02" => "Feb", + "03" => "Mar", + "04" => "Apr", + "05" => "May", + "06" => "Jun", + "07" => "Jul", + "08" => "Aug", + "09" => "Sep", + "10" => "Oct", + "11" => "Nov", + "12" => "Dec", + _ => parts[1], + }; + format!("{} '{}", month, year) +} + +pub struct PostgresRepository { + pool: PgPool, +} + +impl PostgresRepository { + pub fn new(pool: PgPool) -> Self { + Self { pool } + } + + pub async fn migrate(&self) -> Result<(), DomainError> { + sqlx::migrate!("./migrations") + .run(&self.pool) + .await + .map_err(|e| DomainError::InfrastructureError(format!("Migration failed: {}", e))) + } + + fn map_err(e: sqlx::Error) -> DomainError { + tracing::error!("Database error: {:?}", e); + DomainError::InfrastructureError("Database operation failed".into()) + } + + async fn count_diary_entries(&self, movie_id: Option<&str>) -> Result { + match movie_id { + None => sqlx::query_scalar::<_, i64>("SELECT COUNT(*) FROM reviews") + .fetch_one(&self.pool) + .await + .map_err(Self::map_err), + Some(id) => { + sqlx::query_scalar::<_, i64>("SELECT COUNT(*) FROM reviews WHERE movie_id = $1") + .bind(id) + .fetch_one(&self.pool) + .await + .map_err(Self::map_err) + } + } + } + + async fn fetch_all_diary_rows( + &self, + sort: &SortDirection, + limit: i64, + offset: i64, + ) -> Result, DomainError> { + let order = match sort { + SortDirection::Ascending => "r.watched_at ASC", + _ => "r.watched_at DESC", + }; + let sql = format!( + "SELECT m.id, m.external_metadata_id, m.title, m.release_year, m.director, m.poster_path, + r.id AS review_id, r.movie_id, r.user_id, r.rating, r.comment, + to_char(r.watched_at AT TIME ZONE 'UTC', 'YYYY-MM-DD HH24:MI:SS') AS watched_at, + to_char(r.created_at AT TIME ZONE 'UTC', 'YYYY-MM-DD HH24:MI:SS') AS created_at, + r.remote_actor_url + FROM reviews r + INNER JOIN movies m ON m.id = r.movie_id + ORDER BY {} + LIMIT $1 OFFSET $2", + order + ); + sqlx::query_as::<_, DiaryRow>(&sql) + .bind(limit) + .bind(offset) + .fetch_all(&self.pool) + .await + .map_err(Self::map_err) + } + + async fn fetch_movie_diary_rows( + &self, + movie_id: &str, + sort: &SortDirection, + limit: i64, + offset: i64, + ) -> Result, DomainError> { + let order = match sort { + SortDirection::Ascending => "r.watched_at ASC", + _ => "r.watched_at DESC", + }; + let sql = format!( + "SELECT m.id, m.external_metadata_id, m.title, m.release_year, m.director, m.poster_path, + r.id AS review_id, r.movie_id, r.user_id, r.rating, r.comment, + to_char(r.watched_at AT TIME ZONE 'UTC', 'YYYY-MM-DD HH24:MI:SS') AS watched_at, + to_char(r.created_at AT TIME ZONE 'UTC', 'YYYY-MM-DD HH24:MI:SS') AS created_at, + r.remote_actor_url + FROM reviews r + INNER JOIN movies m ON m.id = r.movie_id + WHERE r.movie_id = $1 + ORDER BY {} + LIMIT $2 OFFSET $3", + order + ); + sqlx::query_as::<_, DiaryRow>(&sql) + .bind(movie_id) + .bind(limit) + .bind(offset) + .fetch_all(&self.pool) + .await + .map_err(Self::map_err) + } + + async fn count_user_diary_entries( + &self, + user_id: &str, + search: Option<&str>, + ) -> Result { + let has_search = search.map(|s| !s.is_empty()).unwrap_or(false); + let sql = if has_search { + "SELECT COUNT(*) FROM reviews r + INNER JOIN movies m ON m.id = r.movie_id + WHERE r.user_id = $1 AND m.title ILIKE '%' || $2 || '%'" + .to_string() + } else { + "SELECT COUNT(*) FROM reviews r + INNER JOIN movies m ON m.id = r.movie_id + WHERE r.user_id = $1" + .to_string() + }; + let mut q = sqlx::query_scalar::<_, i64>(&sql).bind(user_id); + if has_search { + q = q.bind(search.unwrap()); + } + q.fetch_one(&self.pool).await.map_err(Self::map_err) + } + + async fn fetch_user_diary_rows( + &self, + user_id: &str, + sort: &SortDirection, + search: Option<&str>, + limit: i64, + offset: i64, + ) -> Result, DomainError> { + let has_search = search.map(|s| !s.is_empty()).unwrap_or(false); + let order_clause = match sort { + SortDirection::ByRatingDesc => "r.rating DESC, r.watched_at DESC", + SortDirection::ByRatingAsc => "r.rating ASC, r.watched_at ASC", + SortDirection::Ascending => "r.watched_at ASC", + SortDirection::Descending => "r.watched_at DESC", + }; + + // Build param counter: user_id=$1, optional search=$2, limit=$N-1, offset=$N + let mut p: i32 = 1; // $1 is user_id + let search_clause = if has_search { + p += 1; + format!(" AND m.title ILIKE '%' || ${} || '%'", p) + } else { + String::new() + }; + p += 1; + let limit_param = format!("${}", p); + p += 1; + let offset_param = format!("${}", p); + + let sql = format!( + "SELECT m.id, m.external_metadata_id, m.title, m.release_year, m.director, m.poster_path, + r.id AS review_id, r.movie_id, r.user_id, r.rating, r.comment, + to_char(r.watched_at AT TIME ZONE 'UTC', 'YYYY-MM-DD HH24:MI:SS') AS watched_at, + to_char(r.created_at AT TIME ZONE 'UTC', 'YYYY-MM-DD HH24:MI:SS') AS created_at, + r.remote_actor_url + FROM reviews r + INNER JOIN movies m ON m.id = r.movie_id + WHERE r.user_id = $1{} + ORDER BY {} + LIMIT {} OFFSET {}", + search_clause, order_clause, limit_param, offset_param + ); + + let mut q = sqlx::query_as::<_, DiaryRow>(&sql).bind(user_id); + if has_search { + q = q.bind(search.unwrap()); + } + q.bind(limit) + .bind(offset) + .fetch_all(&self.pool) + .await + .map_err(Self::map_err) + } + + async fn fetch_user_totals(&self, user_id: &str) -> Result { + sqlx::query_as::<_, UserTotalsRow>( + r#"SELECT COUNT(DISTINCT movie_id) AS total, + AVG(rating::float) AS avg_rating + FROM reviews WHERE user_id = $1"#, + ) + .bind(user_id) + .fetch_one(&self.pool) + .await + .map_err(Self::map_err) + } + + async fn fetch_user_favorite_director( + &self, + user_id: &str, + ) -> Result, DomainError> { + sqlx::query_scalar::<_, String>( + "SELECT m.director + FROM reviews r + INNER JOIN movies m ON m.id = r.movie_id + WHERE r.user_id = $1 AND m.director IS NOT NULL + GROUP BY m.director + ORDER BY COUNT(*) DESC + LIMIT 1", + ) + .bind(user_id) + .fetch_optional(&self.pool) + .await + .map_err(Self::map_err) + } + + async fn fetch_user_most_active_month( + &self, + user_id: &str, + ) -> Result, DomainError> { + sqlx::query_scalar::<_, String>( + "SELECT to_char(watched_at AT TIME ZONE 'UTC', 'YYYY-MM') AS month + FROM reviews + WHERE user_id = $1 + GROUP BY to_char(watched_at AT TIME ZONE 'UTC', 'YYYY-MM') + ORDER BY COUNT(*) DESC + LIMIT 1", + ) + .bind(user_id) + .fetch_optional(&self.pool) + .await + .map_err(Self::map_err) + } +} + +#[async_trait] +impl MovieRepository for PostgresRepository { + async fn get_movie_by_external_id( + &self, + external_metadata_id: &ExternalMetadataId, + ) -> Result, DomainError> { + let id = external_metadata_id.value(); + sqlx::query_as::<_, MovieRow>( + "SELECT id, external_metadata_id, title, release_year, director, poster_path + FROM movies WHERE external_metadata_id = $1", + ) + .bind(id) + .fetch_optional(&self.pool) + .await + .map_err(Self::map_err)? + .map(MovieRow::to_domain) + .transpose() + } + + async fn get_movie_by_id(&self, movie_id: &MovieId) -> Result, DomainError> { + let id = movie_id.value().to_string(); + sqlx::query_as::<_, MovieRow>( + "SELECT id, external_metadata_id, title, release_year, director, poster_path + FROM movies WHERE id = $1", + ) + .bind(&id) + .fetch_optional(&self.pool) + .await + .map_err(Self::map_err)? + .map(MovieRow::to_domain) + .transpose() + } + + async fn get_movies_by_title_and_year( + &self, + title: &MovieTitle, + year: &ReleaseYear, + ) -> Result, DomainError> { + let title = title.value(); + let year = year.value() as i64; + sqlx::query_as::<_, MovieRow>( + "SELECT id, external_metadata_id, title, release_year, director, poster_path + FROM movies WHERE title = $1 AND release_year = $2", + ) + .bind(title) + .bind(year) + .fetch_all(&self.pool) + .await + .map_err(Self::map_err)? + .into_iter() + .map(MovieRow::to_domain) + .collect() + } + + async fn upsert_movie(&self, movie: &Movie) -> Result<(), DomainError> { + let id = movie.id().value().to_string(); + let external_metadata_id = movie.external_metadata_id().map(|e| e.value().to_string()); + let title = movie.title().value(); + let release_year = movie.release_year().value() as i64; + let director = movie.director(); + let poster_path = movie.poster_path().map(|p| p.value().to_string()); + + sqlx::query( + "INSERT INTO movies (id, external_metadata_id, title, release_year, director, poster_path) + VALUES ($1, $2, $3, $4, $5, $6) + ON CONFLICT(id) DO UPDATE SET + external_metadata_id = excluded.external_metadata_id, + title = excluded.title, + release_year = excluded.release_year, + director = excluded.director, + poster_path = excluded.poster_path", + ) + .bind(&id) + .bind(&external_metadata_id) + .bind(title) + .bind(release_year) + .bind(director) + .bind(&poster_path) + .execute(&self.pool) + .await + .map_err(Self::map_err)?; + + Ok(()) + } + + async fn delete_movie(&self, movie_id: &MovieId) -> Result<(), DomainError> { + let id = movie_id.value().to_string(); + sqlx::query("DELETE FROM movies WHERE id = $1") + .bind(&id) + .execute(&self.pool) + .await + .map_err(Self::map_err)?; + Ok(()) + } +} + +#[async_trait] +impl ReviewRepository for PostgresRepository { + async fn save_review(&self, review: &Review) -> Result { + let id = review.id().value().to_string(); + let movie_id = review.movie_id().value().to_string(); + let user_id = review.user_id().value().to_string(); + let rating = review.rating().value() as i64; + let comment = review.comment().map(|c| c.value().to_string()); + let watched_at = datetime_to_str(review.watched_at()); + let created_at = datetime_to_str(review.created_at()); + let remote_actor_url = match review.source() { + ReviewSource::Local => None, + ReviewSource::Remote { actor_url } => Some(actor_url.clone()), + }; + + sqlx::query( + "INSERT INTO reviews (id, movie_id, user_id, rating, comment, watched_at, created_at, remote_actor_url) + VALUES ($1, $2, $3, $4, $5, $6::timestamptz, $7::timestamptz, $8)", + ) + .bind(&id) + .bind(&movie_id) + .bind(&user_id) + .bind(rating) + .bind(&comment) + .bind(&watched_at) + .bind(&created_at) + .bind(&remote_actor_url) + .execute(&self.pool) + .await + .map_err(Self::map_err)?; + + Ok(DomainEvent::ReviewLogged { + review_id: review.id().clone(), + movie_id: review.movie_id().clone(), + user_id: review.user_id().clone(), + rating: review.rating().clone(), + watched_at: *review.watched_at(), + }) + } + + async fn get_review_by_id(&self, review_id: &ReviewId) -> Result, DomainError> { + let id = review_id.value().to_string(); + sqlx::query_as::<_, ReviewRow>( + "SELECT id, movie_id, user_id, rating, comment, + to_char(watched_at AT TIME ZONE 'UTC', 'YYYY-MM-DD HH24:MI:SS') AS watched_at, + to_char(created_at AT TIME ZONE 'UTC', 'YYYY-MM-DD HH24:MI:SS') AS created_at, + remote_actor_url + FROM reviews WHERE id = $1", + ) + .bind(&id) + .fetch_optional(&self.pool) + .await + .map_err(Self::map_err)? + .map(ReviewRow::to_domain) + .transpose() + } + + async fn delete_review(&self, review_id: &ReviewId) -> Result<(), DomainError> { + let id = review_id.value().to_string(); + sqlx::query("DELETE FROM reviews WHERE id = $1") + .bind(&id) + .execute(&self.pool) + .await + .map_err(Self::map_err)?; + Ok(()) + } + + async fn get_all_reviews_for_user( + &self, + _user_id: &UserId, + ) -> Result, DomainError> { + todo!() + } +} + +#[async_trait] +impl DiaryRepository for PostgresRepository { + async fn query_diary( + &self, + filter: &DiaryFilter, + ) -> Result, DomainError> { + let limit = filter.page.limit as i64; + let offset = filter.page.offset as i64; + + let (total, rows) = match (&filter.movie_id, &filter.user_id) { + (None, None) => tokio::try_join!( + self.count_diary_entries(None), + self.fetch_all_diary_rows(&filter.sort_by, limit, offset) + )?, + (Some(id), None) => { + let id_str = id.value().to_string(); + tokio::try_join!( + self.count_diary_entries(Some(id_str.as_str())), + self.fetch_movie_diary_rows(&id_str, &filter.sort_by, limit, offset) + )? + } + (None, Some(uid)) => { + let uid_str = uid.value().to_string(); + let search = filter.search.as_deref(); + tokio::try_join!( + self.count_user_diary_entries(&uid_str, search), + self.fetch_user_diary_rows(&uid_str, &filter.sort_by, search, limit, offset) + )? + } + (Some(_), Some(_)) => { + return Err(DomainError::ValidationError( + "Combined movie_id + user_id filter not supported".into(), + )); + } + }; + + let items = rows + .into_iter() + .map(DiaryRow::to_domain) + .collect::, _>>()?; + + Ok(Paginated { + items, + total_count: total as u64, + limit: filter.page.limit, + offset: filter.page.offset, + }) + } + + async fn query_activity_feed( + &self, + page: &PageParams, + ) -> Result, DomainError> { + self.query_activity_feed_filtered(page, &domain::ports::FeedSortBy::Date, None, None) + .await + } + + async fn query_activity_feed_filtered( + &self, + page: &PageParams, + sort_by: &domain::ports::FeedSortBy, + search: Option<&str>, + following: Option<&domain::ports::FollowingFilter>, + ) -> Result, DomainError> { + use domain::ports::FeedSortBy; + + let limit = page.limit as i64; + let offset = page.offset as i64; + let has_search = search.map(|s| !s.is_empty()).unwrap_or(false); + + // Dynamic param counter + let mut p: i32 = 0; + let mut next_param = || { + p += 1; + format!("${}", p) + }; + + let mut where_parts = vec!["1=1".to_string()]; + + if has_search { + let pn = next_param(); + where_parts.push(format!("m.title ILIKE '%' || {} || '%'", pn)); + } + + if let Some(f) = following { + let local_params: Vec = + f.local_user_ids.iter().map(|_| next_param()).collect(); + let remote_params: Vec = + f.remote_actor_urls.iter().map(|_| next_param()).collect(); + + let local_in = if local_params.is_empty() { + "(SELECT NULL::text WHERE false)".to_string() + } else { + local_params.join(", ") + }; + let remote_in = if remote_params.is_empty() { + "(SELECT NULL::text WHERE false)".to_string() + } else { + remote_params.join(", ") + }; + where_parts.push(format!( + "(r.user_id IN ({}) OR r.remote_actor_url IN ({}))", + local_in, remote_in + )); + } + + let limit_param = next_param(); + let offset_param = next_param(); + + let order_clause = match sort_by { + FeedSortBy::Date => "r.watched_at DESC", + FeedSortBy::DateAsc => "r.watched_at ASC", + FeedSortBy::Rating => "r.rating DESC, r.watched_at DESC", + FeedSortBy::RatingAsc => "r.rating ASC, r.watched_at ASC", + }; + + let where_clause = where_parts.join(" AND "); + + // Reset counter for count query (reuse same where_clause string but re-bind) + // We need a separate counter for count SQL — but since where_clause is already built + // with the right $N references, both queries share it. + let count_sql = format!( + "SELECT COUNT(*) FROM reviews r + INNER JOIN movies m ON m.id = r.movie_id + WHERE {}", + where_clause + ); + + let select_sql = format!( + "SELECT m.id, m.external_metadata_id, m.title, m.release_year, m.director, m.poster_path, + r.id AS review_id, r.movie_id, r.user_id, r.rating, r.comment, + to_char(r.watched_at AT TIME ZONE 'UTC', 'YYYY-MM-DD HH24:MI:SS') AS watched_at, + to_char(r.created_at AT TIME ZONE 'UTC', 'YYYY-MM-DD HH24:MI:SS') AS created_at, + r.remote_actor_url, + COALESCE(u.email, r.remote_actor_url) AS user_email + FROM reviews r + INNER JOIN movies m ON m.id = r.movie_id + LEFT JOIN users u ON u.id = r.user_id + WHERE {} + ORDER BY {} + LIMIT {} OFFSET {}", + where_clause, order_clause, limit_param, offset_param + ); + + // Bind helper closure — binds search + following params in order + macro_rules! bind_filter_params { + ($q:expr) => {{ + let mut q = $q; + if has_search { + q = q.bind(search.unwrap()); + } + if let Some(f) = following { + for uid in &f.local_user_ids { + q = q.bind(uid.to_string()); + } + for url in &f.remote_actor_urls { + q = q.bind(url.as_str()); + } + } + q + }}; + } + + let count_q = bind_filter_params!(sqlx::query_scalar::<_, i64>(&count_sql)); + let total = count_q + .fetch_one(&self.pool) + .await + .map_err(Self::map_err)?; + + let rows_q = bind_filter_params!(sqlx::query_as::<_, FeedRow>(&select_sql)); + let rows = rows_q + .bind(limit) + .bind(offset) + .fetch_all(&self.pool) + .await + .map_err(Self::map_err)?; + + let items = rows + .into_iter() + .map(FeedRow::to_domain) + .collect::, _>>()?; + + Ok(Paginated { + items, + total_count: total as u64, + limit: page.limit, + offset: page.offset, + }) + } + + async fn get_review_history(&self, movie_id: &MovieId) -> Result { + let id_str = movie_id.value().to_string(); + + let movie = sqlx::query_as::<_, MovieRow>( + "SELECT id, external_metadata_id, title, release_year, director, poster_path + FROM movies WHERE id = $1", + ) + .bind(&id_str) + .fetch_optional(&self.pool) + .await + .map_err(Self::map_err)? + .ok_or_else(|| DomainError::NotFound(format!("Movie {}", id_str)))? + .to_domain()?; + + let viewings = sqlx::query_as::<_, ReviewRow>( + "SELECT id, movie_id, user_id, rating, comment, + to_char(watched_at AT TIME ZONE 'UTC', 'YYYY-MM-DD HH24:MI:SS') AS watched_at, + to_char(created_at AT TIME ZONE 'UTC', 'YYYY-MM-DD HH24:MI:SS') AS created_at, + remote_actor_url + FROM reviews WHERE movie_id = $1 ORDER BY watched_at ASC", + ) + .bind(&id_str) + .fetch_all(&self.pool) + .await + .map_err(Self::map_err)? + .into_iter() + .map(ReviewRow::to_domain) + .collect::, _>>()?; + + Ok(ReviewHistory::new(movie, viewings)) + } + + async fn get_user_history(&self, user_id: &UserId) -> Result, DomainError> { + let uid = user_id.value().to_string(); + let rows = sqlx::query_as::<_, DiaryRow>( + "SELECT m.id, m.external_metadata_id, m.title, m.release_year, m.director, m.poster_path, + r.id AS review_id, r.movie_id, r.user_id, r.rating, r.comment, + to_char(r.watched_at AT TIME ZONE 'UTC', 'YYYY-MM-DD HH24:MI:SS') AS watched_at, + to_char(r.created_at AT TIME ZONE 'UTC', 'YYYY-MM-DD HH24:MI:SS') AS created_at, + r.remote_actor_url + FROM reviews r + INNER JOIN movies m ON m.id = r.movie_id + WHERE r.user_id = $1 + ORDER BY r.watched_at DESC", + ) + .bind(&uid) + .fetch_all(&self.pool) + .await + .map_err(Self::map_err)?; + + rows.into_iter().map(DiaryRow::to_domain).collect() + } +} + +#[async_trait] +impl StatsRepository for PostgresRepository { + async fn get_user_stats(&self, user_id: &UserId) -> Result { + let uid = user_id.value().to_string(); + + let (totals, fav_director, most_active) = tokio::try_join!( + self.fetch_user_totals(&uid), + self.fetch_user_favorite_director(&uid), + self.fetch_user_most_active_month(&uid) + )?; + + let most_active_month = most_active.map(|ym| format_year_month(&ym)); + + Ok(UserStats { + total_movies: totals.total, + avg_rating: totals.avg_rating, + favorite_director: fav_director, + most_active_month, + }) + } + + async fn get_user_trends(&self, user_id: &UserId) -> Result { + let uid = user_id.value().to_string(); + + let (rating_rows, director_rows) = tokio::try_join!( + sqlx::query_as::<_, MonthlyRatingRow>( + "SELECT to_char(watched_at AT TIME ZONE 'UTC', 'YYYY-MM') AS month, + AVG(rating::float) AS avg_rating, + COUNT(*) AS count + FROM reviews + WHERE user_id = $1 AND watched_at >= NOW() - INTERVAL '12 months' + GROUP BY to_char(watched_at AT TIME ZONE 'UTC', 'YYYY-MM') + ORDER BY to_char(watched_at AT TIME ZONE 'UTC', 'YYYY-MM') ASC" + ) + .bind(&uid) + .fetch_all(&self.pool), + sqlx::query_as::<_, DirectorCountRow>( + "SELECT m.director AS director, COUNT(*) AS count + FROM reviews r + INNER JOIN movies m ON m.id = r.movie_id + WHERE r.user_id = $1 AND m.director IS NOT NULL + GROUP BY m.director + ORDER BY COUNT(*) DESC + LIMIT 5" + ) + .bind(&uid) + .fetch_all(&self.pool) + ) + .map_err(Self::map_err)?; + + let max_director_count = director_rows.iter().map(|d| d.count).max().unwrap_or(1); + + let monthly_ratings = rating_rows + .into_iter() + .map(|r| MonthlyRating { + month_label: format_year_month(&r.month), + year_month: r.month, + avg_rating: r.avg_rating, + count: r.count, + }) + .collect(); + + let top_directors = director_rows + .into_iter() + .map(|d| DirectorStat { + director: d.director, + count: d.count, + }) + .collect(); + + Ok(UserTrends { + monthly_ratings, + top_directors, + max_director_count, + }) + } +} diff --git a/crates/adapters/postgres/src/models.rs b/crates/adapters/postgres/src/models.rs new file mode 100644 index 0000000..3fe5b8e --- /dev/null +++ b/crates/adapters/postgres/src/models.rs @@ -0,0 +1,210 @@ +use chrono::NaiveDateTime; +use domain::{ + errors::DomainError, + models::{DiaryEntry, FeedEntry, Movie, Review, ReviewSource, UserSummary}, + value_objects::{ + Comment, Email, ExternalMetadataId, MovieId, MovieTitle, PosterPath, Rating, ReleaseYear, + ReviewId, UserId, + }, +}; +use uuid::Uuid; + +#[derive(sqlx::FromRow)] +pub(crate) struct MovieRow { + pub id: String, + pub external_metadata_id: Option, + pub title: String, + pub release_year: i64, + pub director: Option, + pub poster_path: Option, +} + +impl MovieRow { + pub fn to_domain(self) -> Result { + let id = MovieId::from_uuid(parse_uuid(&self.id)?); + let external_metadata_id = self + .external_metadata_id + .map(ExternalMetadataId::new) + .transpose()?; + let title = MovieTitle::new(self.title)?; + let release_year = ReleaseYear::new(self.release_year as u16)?; + let poster_path = self.poster_path.map(PosterPath::new).transpose()?; + Ok(Movie::from_persistence( + id, + external_metadata_id, + title, + release_year, + self.director, + poster_path, + )) + } +} + +#[derive(sqlx::FromRow)] +pub(crate) struct ReviewRow { + pub id: String, + pub movie_id: String, + pub user_id: String, + pub rating: i64, + pub comment: Option, + pub watched_at: String, + pub created_at: String, + pub remote_actor_url: Option, +} + +impl ReviewRow { + pub fn to_domain(self) -> Result { + let id = ReviewId::from_uuid(parse_uuid(&self.id)?); + let movie_id = MovieId::from_uuid(parse_uuid(&self.movie_id)?); + let user_id = UserId::from_uuid(parse_uuid(&self.user_id)?); + let rating = Rating::new(self.rating as u8)?; + let comment = self.comment.map(Comment::new).transpose()?; + let watched_at = parse_datetime(&self.watched_at)?; + let created_at = parse_datetime(&self.created_at)?; + let source = match self.remote_actor_url { + None => ReviewSource::Local, + Some(url) => ReviewSource::Remote { actor_url: url }, + }; + Ok(Review::from_persistence( + id, movie_id, user_id, rating, comment, watched_at, created_at, source, + )) + } +} + +#[derive(sqlx::FromRow)] +pub(crate) struct DiaryRow { + pub id: String, + pub external_metadata_id: Option, + pub title: String, + pub release_year: i64, + pub director: Option, + pub poster_path: Option, + pub review_id: String, + pub movie_id: String, + pub user_id: String, + pub rating: i64, + pub comment: Option, + pub watched_at: String, + pub created_at: String, + pub remote_actor_url: Option, +} + +impl DiaryRow { + pub fn to_domain(self) -> Result { + let movie = MovieRow { + id: self.id, + external_metadata_id: self.external_metadata_id, + title: self.title, + release_year: self.release_year, + director: self.director, + poster_path: self.poster_path, + } + .to_domain()?; + let review = ReviewRow { + id: self.review_id, + movie_id: self.movie_id, + user_id: self.user_id, + rating: self.rating, + comment: self.comment, + watched_at: self.watched_at, + created_at: self.created_at, + remote_actor_url: self.remote_actor_url, + } + .to_domain()?; + Ok(DiaryEntry::new(movie, review)) + } +} + +#[derive(sqlx::FromRow)] +pub(crate) struct FeedRow { + pub id: String, + pub external_metadata_id: Option, + pub title: String, + pub release_year: i64, + pub director: Option, + pub poster_path: Option, + pub review_id: String, + pub movie_id: String, + pub user_id: String, + pub rating: i64, + pub comment: Option, + pub watched_at: String, + pub created_at: String, + pub remote_actor_url: Option, + pub user_email: String, +} + +impl FeedRow { + pub fn to_domain(self) -> Result { + let diary = DiaryRow { + id: self.id, + external_metadata_id: self.external_metadata_id, + title: self.title, + release_year: self.release_year, + director: self.director, + poster_path: self.poster_path, + review_id: self.review_id, + movie_id: self.movie_id, + user_id: self.user_id, + rating: self.rating, + comment: self.comment, + watched_at: self.watched_at, + created_at: self.created_at, + remote_actor_url: self.remote_actor_url, + } + .to_domain()?; + Ok(FeedEntry::new(diary, self.user_email)) + } +} + +#[derive(sqlx::FromRow)] +pub(crate) struct UserSummaryRow { + pub id: String, + pub email: String, + pub total_movies: i64, + pub avg_rating: Option, +} + +impl UserSummaryRow { + pub fn to_domain(self) -> Result { + Ok(UserSummary::new( + UserId::from_uuid(parse_uuid(&self.id)?), + Email::new(self.email)?, + self.total_movies, + self.avg_rating, + )) + } +} + +#[derive(sqlx::FromRow)] +pub(crate) struct UserTotalsRow { + pub total: i64, + pub avg_rating: Option, +} + +#[derive(sqlx::FromRow)] +pub(crate) struct DirectorCountRow { + pub director: String, + pub count: i64, +} + +#[derive(sqlx::FromRow)] +pub(crate) struct MonthlyRatingRow { + pub month: String, + pub avg_rating: f64, + pub count: i64, +} + +pub(crate) fn parse_uuid(s: &str) -> Result { + Uuid::parse_str(s) + .map_err(|e| DomainError::InfrastructureError(format!("Invalid UUID '{}': {}", s, e))) +} + +pub(crate) fn datetime_to_str(dt: &NaiveDateTime) -> String { + dt.format("%Y-%m-%d %H:%M:%S").to_string() +} + +pub(crate) fn parse_datetime(s: &str) -> Result { + NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S") + .map_err(|e| DomainError::InfrastructureError(format!("Invalid datetime '{}': {}", s, e))) +} diff --git a/crates/adapters/postgres/src/users.rs b/crates/adapters/postgres/src/users.rs new file mode 100644 index 0000000..bb6b9f7 --- /dev/null +++ b/crates/adapters/postgres/src/users.rs @@ -0,0 +1,204 @@ +use async_trait::async_trait; +use chrono::Utc; +use sqlx::PgPool; + +use domain::{ + errors::DomainError, + models::{User, UserRole}, + ports::UserRepository, + value_objects::{Email, PasswordHash, UserId, Username}, +}; + +use super::models::UserSummaryRow; + +pub struct PostgresUserRepository { + pool: PgPool, +} + +impl PostgresUserRepository { + pub fn new(pool: PgPool) -> Self { + Self { pool } + } + + fn map_err(e: sqlx::Error) -> DomainError { + tracing::error!("Database error: {:?}", e); + DomainError::InfrastructureError("Database operation failed".into()) + } + + fn parse_role(s: &str) -> UserRole { + match s { + "admin" => UserRole::Admin, + _ => UserRole::Standard, + } + } + + fn row_to_user( + id_str: String, + email_str: String, + username_str: String, + hash_str: String, + role: UserRole, + ) -> Result { + let id = uuid::Uuid::parse_str(&id_str) + .map_err(|e| DomainError::InfrastructureError(e.to_string()))?; + let email = Email::new(email_str) + .map_err(|e| DomainError::InfrastructureError(e.to_string()))?; + let username = Username::new(username_str) + .map_err(|e| DomainError::InfrastructureError(e.to_string()))?; + let hash = PasswordHash::new(hash_str) + .map_err(|e| DomainError::InfrastructureError(e.to_string()))?; + Ok(User::from_persistence( + UserId::from_uuid(id), + email, + username, + hash, + role, + )) + } +} + +#[async_trait] +impl UserRepository for PostgresUserRepository { + async fn find_by_email(&self, email: &Email) -> Result, DomainError> { + let email_str = email.value(); + #[derive(sqlx::FromRow)] + struct Row { + id: String, + email: String, + username: String, + password_hash: String, + role: String, + } + let row = sqlx::query_as::<_, Row>( + "SELECT id, email, username, password_hash, role FROM users WHERE email = $1", + ) + .bind(email_str) + .fetch_optional(&self.pool) + .await + .map_err(Self::map_err)?; + row.map(|r| { + Self::row_to_user( + r.id, + r.email, + r.username, + r.password_hash, + Self::parse_role(&r.role), + ) + }) + .transpose() + } + + async fn find_by_username(&self, username: &Username) -> Result, DomainError> { + let username_str = username.value(); + #[derive(sqlx::FromRow)] + struct Row { + id: String, + email: String, + username: String, + password_hash: String, + role: String, + } + let row = sqlx::query_as::<_, Row>( + "SELECT id, email, username, password_hash, role FROM users WHERE username = $1", + ) + .bind(username_str) + .fetch_optional(&self.pool) + .await + .map_err(Self::map_err)?; + row.map(|r| { + Self::row_to_user( + r.id, + r.email, + r.username, + r.password_hash, + Self::parse_role(&r.role), + ) + }) + .transpose() + } + + async fn save(&self, user: &User) -> Result<(), DomainError> { + if self.find_by_email(user.email()).await?.is_some() { + return Err(DomainError::ValidationError( + "Email already registered".into(), + )); + } + if self.find_by_username(user.username()).await?.is_some() { + return Err(DomainError::ValidationError( + "Username already taken".into(), + )); + } + let id = user.id().value().to_string(); + let email = user.email().value(); + let username = user.username().value(); + let hash = user.password_hash().value(); + let created_at = Utc::now() + .naive_utc() + .format("%Y-%m-%d %H:%M:%S") + .to_string(); + let role = match user.role() { + UserRole::Admin => "admin", + UserRole::Standard => "standard", + }; + sqlx::query( + "INSERT INTO users (id, email, username, password_hash, created_at, role) VALUES ($1, $2, $3, $4, $5::timestamptz, $6)", + ) + .bind(&id) + .bind(email) + .bind(username) + .bind(hash) + .bind(&created_at) + .bind(role) + .execute(&self.pool) + .await + .map_err(Self::map_err)?; + Ok(()) + } + + async fn find_by_id(&self, id: &UserId) -> Result, DomainError> { + let id_str = id.value().to_string(); + #[derive(sqlx::FromRow)] + struct Row { + id: String, + email: String, + username: String, + password_hash: String, + role: String, + } + let row = sqlx::query_as::<_, Row>( + "SELECT id, email, username, password_hash, role FROM users WHERE id = $1", + ) + .bind(&id_str) + .fetch_optional(&self.pool) + .await + .map_err(Self::map_err)?; + row.map(|r| { + Self::row_to_user( + r.id, + r.email, + r.username, + r.password_hash, + Self::parse_role(&r.role), + ) + }) + .transpose() + } + + async fn list_with_stats(&self) -> Result, DomainError> { + sqlx::query_as::<_, UserSummaryRow>( + r#"SELECT u.id, u.email, + COUNT(DISTINCT r.movie_id) AS total_movies, + AVG(r.rating::float) AS avg_rating + FROM users u + LEFT JOIN reviews r ON r.user_id = u.id AND r.remote_actor_url IS NULL + GROUP BY u.id, u.email + ORDER BY u.email ASC"#, + ) + .fetch_all(&self.pool) + .await + .map_err(Self::map_err)? + .into_iter() + .map(UserSummaryRow::to_domain) + .collect() + } +} diff --git a/crates/domain/src/models/mod.rs b/crates/domain/src/models/mod.rs index 5582a17..c7d3a9c 100644 --- a/crates/domain/src/models/mod.rs +++ b/crates/domain/src/models/mod.rs @@ -1,5 +1,3 @@ -use std::default; - use chrono::{NaiveDateTime, Utc}; use crate::{ diff --git a/crates/presentation/Cargo.toml b/crates/presentation/Cargo.toml index 4122a8f..97f547d 100644 --- a/crates/presentation/Cargo.toml +++ b/crates/presentation/Cargo.toml @@ -30,6 +30,8 @@ poster-fetcher = { workspace = true } poster-storage = { workspace = true } sqlite = { workspace = true } sqlite-federation = { workspace = true } +postgres = { workspace = true } +postgres-federation = { workspace = true } activitypub = { workspace = true } sqlx = { workspace = true } template-askama = { workspace = true } diff --git a/crates/presentation/src/main.rs b/crates/presentation/src/main.rs index 4692b3f..5df6e04 100644 --- a/crates/presentation/src/main.rs +++ b/crates/presentation/src/main.rs @@ -14,6 +14,7 @@ use activitypub::{ ActivityPubEventHandler, ActivityPubPort, ActivityPubService, DomainUserRepoAdapter, ReviewObjectHandler, }; +use activitypub::FederationRepository; use application::{config::AppConfig, context::AppContext}; use auth::{Argon2PasswordHasher, AuthConfig, JwtAuthService}; use export::ExportAdapter; @@ -23,12 +24,20 @@ use poster_storage::{PosterStorageAdapter, StorageConfig}; use rss::RssAdapter; use sqlite::{SqliteMovieRepository, SqliteUserRepository}; use sqlite_federation::SqliteFederationRepository; +use postgres::{PostgresRepository, PostgresUserRepository}; +use postgres_federation::PostgresFederationRepository; use template_askama::AskamaHtmlRenderer; use doc::ApiDocExt; use presentation::{openapi::ApiDoc, routes, state::AppState}; use utoipa::OpenApi as _; +use domain::ports::{ + AuthService, DiaryExporter, DiaryRepository, MetadataClient, MovieRepository, + PasswordHasher, PosterFetcherClient, PosterStorage, ReviewRepository, StatsRepository, + UserRepository, +}; + #[tokio::main] async fn main() -> anyhow::Result<()> { dotenvy::dotenv().ok(); @@ -57,34 +66,8 @@ async fn wire_dependencies() -> anyhow::Result<(AppState, axum::Router)> { let omdb_api_key = std::env::var("OMDB_API_KEY").context("OMDB_API_KEY must be set")?; let database_url = std::env::var("DATABASE_URL").context("DATABASE_URL must be set")?; - let opts = SqliteConnectOptions::from_str(&database_url) - .context("Invalid DATABASE_URL")? - .create_if_missing(true) - .journal_mode(sqlx::sqlite::SqliteJournalMode::Wal) - .busy_timeout(std::time::Duration::from_secs(5)); - let pool = SqlitePool::connect_with(opts) - .await - .context("Failed to connect to SQLite database")?; + let backend = std::env::var("DATABASE_BACKEND").unwrap_or_else(|_| "sqlite".to_string()); - let sqlite_repo = Arc::new(SqliteMovieRepository::new(pool.clone())); - sqlite_repo - .migrate() - .await - .map_err(|e| anyhow::anyhow!("{}", e)) - .context("Database migration failed")?; - - use domain::ports::{ - AuthService, DiaryExporter, DiaryRepository, MetadataClient, MovieRepository, - PasswordHasher, PosterFetcherClient, PosterStorage, ReviewRepository, StatsRepository, - UserRepository, - }; - let movie_repository: Arc = Arc::clone(&sqlite_repo) as _; - let review_repository: Arc = Arc::clone(&sqlite_repo) as _; - let diary_repository: Arc = Arc::clone(&sqlite_repo) as _; - let stats_repository: Arc = Arc::clone(&sqlite_repo) as _; - - let user_repository: Arc = - Arc::new(SqliteUserRepository::new(pool.clone())); let metadata_client: Arc = Arc::new(MetadataClientImpl::new_omdb(omdb_api_key)); let poster_fetcher: Arc = @@ -94,8 +77,14 @@ async fn wire_dependencies() -> anyhow::Result<(AppState, axum::Router)> { let auth_service: Arc = Arc::new(JwtAuthService::new(auth_config)); let password_hasher: Arc = Arc::new(Argon2PasswordHasher); - // Build a context for the poster handler. sync_poster doesn't publish events, - // so a noop publisher here is safe and avoids a circular dependency. + let (movie_repository, review_repository, diary_repository, stats_repository, + user_repository, federation_repo_dyn, review_store, social_query) = + if backend == "postgres" { + wire_postgres(&database_url).await? + } else { + wire_sqlite(&database_url).await? + }; + let handler_ctx = AppContext { movie_repository: Arc::clone(&movie_repository), review_repository: Arc::clone(&review_repository), @@ -112,19 +101,16 @@ async fn wire_dependencies() -> anyhow::Result<(AppState, axum::Router)> { config: app_config.clone(), }; - // Federation - let federation_repo = Arc::new(SqliteFederationRepository::new(pool)); - let social_query: Arc = Arc::clone(&federation_repo) as _; let user_repo_adapter = Arc::new(DomainUserRepoAdapter(Arc::clone(&user_repository))); let review_handler = Arc::new(ReviewObjectHandler { movie_repository: Arc::clone(&movie_repository), diary_repository: Arc::clone(&diary_repository), - review_store: Arc::clone(&federation_repo) as Arc, + review_store, base_url: app_config.base_url.clone(), }); let concrete_ap_service = Arc::new( ActivityPubService::new( - federation_repo, + federation_repo_dyn, user_repo_adapter, review_handler, app_config.base_url.clone(), @@ -176,6 +162,78 @@ async fn wire_dependencies() -> anyhow::Result<(AppState, axum::Router)> { Ok((state, ap_router)) } +type WireResult = anyhow::Result<( + Arc, + Arc, + Arc, + Arc, + Arc, + Arc, + Arc, + Arc, +)>; + +async fn wire_sqlite(database_url: &str) -> WireResult { + let opts = SqliteConnectOptions::from_str(database_url) + .context("Invalid DATABASE_URL")? + .create_if_missing(true) + .journal_mode(sqlx::sqlite::SqliteJournalMode::Wal) + .busy_timeout(std::time::Duration::from_secs(5)); + let pool = SqlitePool::connect_with(opts) + .await + .context("Failed to connect to SQLite database")?; + + let sqlite_repo = Arc::new(SqliteMovieRepository::new(pool.clone())); + sqlite_repo + .migrate() + .await + .map_err(|e| anyhow::anyhow!("{}", e)) + .context("Database migration failed")?; + + let movie_repository: Arc = Arc::clone(&sqlite_repo) as _; + let review_repository: Arc = Arc::clone(&sqlite_repo) as _; + let diary_repository: Arc = Arc::clone(&sqlite_repo) as _; + let stats_repository: Arc = Arc::clone(&sqlite_repo) as _; + let user_repository: Arc = + Arc::new(SqliteUserRepository::new(pool.clone())); + + let fed = Arc::new(SqliteFederationRepository::new(pool)); + let federation_repo_dyn: Arc = Arc::clone(&fed) as _; + let review_store: Arc = Arc::clone(&fed) as _; + let social_query: Arc = fed; + + Ok((movie_repository, review_repository, diary_repository, stats_repository, + user_repository, federation_repo_dyn, review_store, social_query)) +} + +async fn wire_postgres(database_url: &str) -> WireResult { + let pool = sqlx::PgPool::connect(database_url) + .await + .context("Failed to connect to PostgreSQL database")?; + + let pg_repo = Arc::new(PostgresRepository::new(pool.clone())); + pg_repo + .migrate() + .await + .map_err(|e| anyhow::anyhow!("{}", e)) + .context("Database migration failed")?; + + let movie_repository: Arc = Arc::clone(&pg_repo) as _; + let review_repository: Arc = Arc::clone(&pg_repo) as _; + let diary_repository: Arc = Arc::clone(&pg_repo) as _; + let stats_repository: Arc = Arc::clone(&pg_repo) as _; + let user_repository: Arc = + Arc::new(PostgresUserRepository::new(pool.clone())); + + let fed = Arc::new(PostgresFederationRepository::new(pool)); + let federation_repo_dyn: Arc = Arc::clone(&fed) as _; + let review_store: Arc = Arc::clone(&fed) as _; + let social_query: Arc = fed; + + Ok((movie_repository, review_repository, diary_repository, stats_repository, + user_repository, federation_repo_dyn, review_store, social_query)) +} + fn init_tracing() { tracing_subscriber::registry() .with(tracing_subscriber::EnvFilter::new(