feat: postgresql adapter
This commit is contained in:
12
.env.example
12
.env.example
@@ -1,12 +1,20 @@
|
|||||||
# Database
|
# Database backend — "sqlite" (default) or "postgres"
|
||||||
|
DATABASE_BACKEND=sqlite
|
||||||
|
|
||||||
|
# Option A: SQLite (default, zero external dependencies)
|
||||||
DATABASE_URL=sqlite://movies.db
|
DATABASE_URL=sqlite://movies.db
|
||||||
|
|
||||||
|
# Option B: PostgreSQL
|
||||||
|
# DATABASE_BACKEND=postgres
|
||||||
|
# DATABASE_URL=postgres://user:password@localhost:5432/movies_diary
|
||||||
|
|
||||||
# Authentication
|
# Authentication
|
||||||
JWT_SECRET=change-me
|
JWT_SECRET=change-me
|
||||||
JWT_TTL_SECONDS=86400
|
JWT_TTL_SECONDS=86400
|
||||||
|
|
||||||
# OMDb metadata
|
# OMDb/TMDB metadata
|
||||||
OMDB_API_KEY=your-key
|
OMDB_API_KEY=your-key
|
||||||
|
TMDB_API_KEY=your-key
|
||||||
|
|
||||||
# Poster storage — Option A (local) is active. To use S3, comment it out and uncomment Option B:
|
# Poster storage — Option A (local) is active. To use S3, comment it out and uncomment Option B:
|
||||||
|
|
||||||
|
|||||||
35
Cargo.lock
generated
35
Cargo.lock
generated
@@ -3183,6 +3183,35 @@ dependencies = [
|
|||||||
"uuid",
|
"uuid",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "postgres"
|
||||||
|
version = "0.1.0"
|
||||||
|
dependencies = [
|
||||||
|
"async-trait",
|
||||||
|
"chrono",
|
||||||
|
"domain",
|
||||||
|
"sqlx",
|
||||||
|
"tokio",
|
||||||
|
"tracing",
|
||||||
|
"uuid",
|
||||||
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "postgres-federation"
|
||||||
|
version = "0.1.0"
|
||||||
|
dependencies = [
|
||||||
|
"activitypub",
|
||||||
|
"activitypub-base",
|
||||||
|
"anyhow",
|
||||||
|
"async-trait",
|
||||||
|
"chrono",
|
||||||
|
"domain",
|
||||||
|
"sqlx",
|
||||||
|
"tokio",
|
||||||
|
"tracing",
|
||||||
|
"uuid",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "potential_utf"
|
name = "potential_utf"
|
||||||
version = "0.1.5"
|
version = "0.1.5"
|
||||||
@@ -3230,6 +3259,8 @@ dependencies = [
|
|||||||
"percent-encoding",
|
"percent-encoding",
|
||||||
"poster-fetcher",
|
"poster-fetcher",
|
||||||
"poster-storage",
|
"poster-storage",
|
||||||
|
"postgres",
|
||||||
|
"postgres-federation",
|
||||||
"rss 0.1.0",
|
"rss 0.1.0",
|
||||||
"serde",
|
"serde",
|
||||||
"serde_json",
|
"serde_json",
|
||||||
@@ -4286,6 +4317,7 @@ checksum = "ee6798b1838b6a0f69c007c133b8df5866302197e404e8b6ee8ed3e3a5e68dc6"
|
|||||||
dependencies = [
|
dependencies = [
|
||||||
"base64",
|
"base64",
|
||||||
"bytes",
|
"bytes",
|
||||||
|
"chrono",
|
||||||
"crc",
|
"crc",
|
||||||
"crossbeam-queue",
|
"crossbeam-queue",
|
||||||
"either",
|
"either",
|
||||||
@@ -4364,6 +4396,7 @@ dependencies = [
|
|||||||
"bitflags 2.11.1",
|
"bitflags 2.11.1",
|
||||||
"byteorder",
|
"byteorder",
|
||||||
"bytes",
|
"bytes",
|
||||||
|
"chrono",
|
||||||
"crc",
|
"crc",
|
||||||
"digest",
|
"digest",
|
||||||
"dotenvy",
|
"dotenvy",
|
||||||
@@ -4406,6 +4439,7 @@ dependencies = [
|
|||||||
"base64",
|
"base64",
|
||||||
"bitflags 2.11.1",
|
"bitflags 2.11.1",
|
||||||
"byteorder",
|
"byteorder",
|
||||||
|
"chrono",
|
||||||
"crc",
|
"crc",
|
||||||
"dotenvy",
|
"dotenvy",
|
||||||
"etcetera",
|
"etcetera",
|
||||||
@@ -4441,6 +4475,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
|||||||
checksum = "c2d12fe70b2c1b4401038055f90f151b78208de1f9f89a7dbfd41587a10c3eea"
|
checksum = "c2d12fe70b2c1b4401038055f90f151b78208de1f9f89a7dbfd41587a10c3eea"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"atoi",
|
"atoi",
|
||||||
|
"chrono",
|
||||||
"flume",
|
"flume",
|
||||||
"futures-channel",
|
"futures-channel",
|
||||||
"futures-core",
|
"futures-core",
|
||||||
|
|||||||
@@ -7,7 +7,9 @@ members = [
|
|||||||
"crates/adapters/poster-storage",
|
"crates/adapters/poster-storage",
|
||||||
"crates/adapters/rss",
|
"crates/adapters/rss",
|
||||||
"crates/adapters/sqlite",
|
"crates/adapters/sqlite",
|
||||||
|
"crates/adapters/postgres",
|
||||||
"crates/adapters/sqlite-federation",
|
"crates/adapters/sqlite-federation",
|
||||||
|
"crates/adapters/postgres-federation",
|
||||||
"crates/adapters/template-askama",
|
"crates/adapters/template-askama",
|
||||||
"crates/adapters/activitypub",
|
"crates/adapters/activitypub",
|
||||||
"crates/adapters/activitypub-base",
|
"crates/adapters/activitypub-base",
|
||||||
@@ -54,6 +56,8 @@ rss = { path = "crates/adapters/rss" }
|
|||||||
export = { path = "crates/adapters/export" }
|
export = { path = "crates/adapters/export" }
|
||||||
sqlite = { path = "crates/adapters/sqlite" }
|
sqlite = { path = "crates/adapters/sqlite" }
|
||||||
sqlite-federation = { path = "crates/adapters/sqlite-federation" }
|
sqlite-federation = { path = "crates/adapters/sqlite-federation" }
|
||||||
|
postgres = { path = "crates/adapters/postgres" }
|
||||||
|
postgres-federation = { path = "crates/adapters/postgres-federation" }
|
||||||
template-askama = { path = "crates/adapters/template-askama" }
|
template-askama = { path = "crates/adapters/template-askama" }
|
||||||
activitypub = { path = "crates/adapters/activitypub" }
|
activitypub = { path = "crates/adapters/activitypub" }
|
||||||
activitypub-base = { path = "crates/adapters/activitypub-base" }
|
activitypub-base = { path = "crates/adapters/activitypub-base" }
|
||||||
|
|||||||
@@ -18,6 +18,8 @@ COPY crates/adapters/export/Cargo.toml crates/adapters/export/Cargo.t
|
|||||||
COPY crates/adapters/rss/Cargo.toml crates/adapters/rss/Cargo.toml
|
COPY crates/adapters/rss/Cargo.toml crates/adapters/rss/Cargo.toml
|
||||||
COPY crates/adapters/sqlite/Cargo.toml crates/adapters/sqlite/Cargo.toml
|
COPY crates/adapters/sqlite/Cargo.toml crates/adapters/sqlite/Cargo.toml
|
||||||
COPY crates/adapters/sqlite-federation/Cargo.toml crates/adapters/sqlite-federation/Cargo.toml
|
COPY crates/adapters/sqlite-federation/Cargo.toml crates/adapters/sqlite-federation/Cargo.toml
|
||||||
|
COPY crates/adapters/postgres/Cargo.toml crates/adapters/postgres/Cargo.toml
|
||||||
|
COPY crates/adapters/postgres-federation/Cargo.toml crates/adapters/postgres-federation/Cargo.toml
|
||||||
COPY crates/adapters/template-askama/Cargo.toml crates/adapters/template-askama/Cargo.toml
|
COPY crates/adapters/template-askama/Cargo.toml crates/adapters/template-askama/Cargo.toml
|
||||||
COPY crates/application/Cargo.toml crates/application/Cargo.toml
|
COPY crates/application/Cargo.toml crates/application/Cargo.toml
|
||||||
COPY crates/domain/Cargo.toml crates/domain/Cargo.toml
|
COPY crates/domain/Cargo.toml crates/domain/Cargo.toml
|
||||||
|
|||||||
24
crates/adapters/postgres-federation/Cargo.toml
Normal file
24
crates/adapters/postgres-federation/Cargo.toml
Normal file
@@ -0,0 +1,24 @@
|
|||||||
|
[package]
|
||||||
|
name = "postgres-federation"
|
||||||
|
version = "0.1.0"
|
||||||
|
edition = "2024"
|
||||||
|
|
||||||
|
[dependencies]
|
||||||
|
sqlx = { version = "0.8.6", features = [
|
||||||
|
"runtime-tokio-rustls",
|
||||||
|
"postgres",
|
||||||
|
"uuid",
|
||||||
|
"macros",
|
||||||
|
"chrono",
|
||||||
|
] }
|
||||||
|
activitypub = { workspace = true }
|
||||||
|
activitypub-base = { workspace = true }
|
||||||
|
domain = { workspace = true }
|
||||||
|
uuid = { workspace = true }
|
||||||
|
chrono = { workspace = true }
|
||||||
|
tracing = { workspace = true }
|
||||||
|
async-trait = { workspace = true }
|
||||||
|
anyhow = { workspace = true }
|
||||||
|
|
||||||
|
[dev-dependencies]
|
||||||
|
tokio = { workspace = true }
|
||||||
476
crates/adapters/postgres-federation/src/lib.rs
Normal file
476
crates/adapters/postgres-federation/src/lib.rs
Normal file
@@ -0,0 +1,476 @@
|
|||||||
|
use anyhow::{Result, anyhow};
|
||||||
|
use async_trait::async_trait;
|
||||||
|
use chrono::{NaiveDateTime, Utc};
|
||||||
|
use sqlx::{PgPool, Row};
|
||||||
|
|
||||||
|
use activitypub::RemoteReviewRepository;
|
||||||
|
use activitypub_base::{
|
||||||
|
FederationRepository, Follower, FollowerStatus, FollowingStatus, RemoteActor,
|
||||||
|
};
|
||||||
|
use domain::models::{Review, ReviewSource};
|
||||||
|
|
||||||
|
fn datetime_to_str(dt: &NaiveDateTime) -> String {
|
||||||
|
dt.format("%Y-%m-%d %H:%M:%S").to_string()
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct PostgresFederationRepository {
|
||||||
|
pool: PgPool,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl PostgresFederationRepository {
|
||||||
|
pub fn new(pool: PgPool) -> Self {
|
||||||
|
Self { pool }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn status_to_str(status: &FollowerStatus) -> &'static str {
|
||||||
|
match status {
|
||||||
|
FollowerStatus::Pending => "pending",
|
||||||
|
FollowerStatus::Accepted => "accepted",
|
||||||
|
FollowerStatus::Rejected => "rejected",
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn str_to_status(s: &str) -> FollowerStatus {
|
||||||
|
match s {
|
||||||
|
"accepted" => FollowerStatus::Accepted,
|
||||||
|
"rejected" => FollowerStatus::Rejected,
|
||||||
|
_ => FollowerStatus::Pending,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[async_trait]
|
||||||
|
impl FederationRepository for PostgresFederationRepository {
|
||||||
|
async fn add_follower(
|
||||||
|
&self,
|
||||||
|
local_user_id: uuid::Uuid,
|
||||||
|
remote_actor_url: &str,
|
||||||
|
status: FollowerStatus,
|
||||||
|
follow_activity_id: &str,
|
||||||
|
) -> Result<()> {
|
||||||
|
let uid = local_user_id.to_string();
|
||||||
|
let status_str = status_to_str(&status);
|
||||||
|
let now = Utc::now().naive_utc();
|
||||||
|
let created_at = datetime_to_str(&now);
|
||||||
|
sqlx::query(
|
||||||
|
"INSERT INTO ap_followers (local_user_id, remote_actor_url, status, created_at, follow_activity_id)
|
||||||
|
VALUES ($1, $2, $3, $4::timestamptz, $5)
|
||||||
|
ON CONFLICT(local_user_id, remote_actor_url) DO UPDATE SET
|
||||||
|
status = EXCLUDED.status,
|
||||||
|
follow_activity_id = EXCLUDED.follow_activity_id",
|
||||||
|
)
|
||||||
|
.bind(&uid)
|
||||||
|
.bind(remote_actor_url)
|
||||||
|
.bind(status_str)
|
||||||
|
.bind(&created_at)
|
||||||
|
.bind(follow_activity_id)
|
||||||
|
.execute(&self.pool)
|
||||||
|
.await?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn get_follower_follow_activity_id(
|
||||||
|
&self,
|
||||||
|
local_user_id: uuid::Uuid,
|
||||||
|
remote_actor_url: &str,
|
||||||
|
) -> Result<Option<String>> {
|
||||||
|
let uid = local_user_id.to_string();
|
||||||
|
let row: Option<String> = sqlx::query_scalar(
|
||||||
|
"SELECT follow_activity_id FROM ap_followers WHERE local_user_id = $1 AND remote_actor_url = $2",
|
||||||
|
)
|
||||||
|
.bind(&uid)
|
||||||
|
.bind(remote_actor_url)
|
||||||
|
.fetch_optional(&self.pool)
|
||||||
|
.await?;
|
||||||
|
Ok(row)
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn remove_follower(
|
||||||
|
&self,
|
||||||
|
local_user_id: uuid::Uuid,
|
||||||
|
remote_actor_url: &str,
|
||||||
|
) -> Result<()> {
|
||||||
|
let uid = local_user_id.to_string();
|
||||||
|
sqlx::query("DELETE FROM ap_followers WHERE local_user_id = $1 AND remote_actor_url = $2")
|
||||||
|
.bind(&uid)
|
||||||
|
.bind(remote_actor_url)
|
||||||
|
.execute(&self.pool)
|
||||||
|
.await?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn get_followers(&self, local_user_id: uuid::Uuid) -> Result<Vec<Follower>> {
|
||||||
|
let uid = local_user_id.to_string();
|
||||||
|
let rows = sqlx::query(
|
||||||
|
"SELECT f.remote_actor_url, f.status,
|
||||||
|
a.handle, a.inbox_url, a.shared_inbox_url, a.display_name
|
||||||
|
FROM ap_followers f
|
||||||
|
LEFT JOIN ap_remote_actors a ON a.url = f.remote_actor_url
|
||||||
|
WHERE f.local_user_id = $1",
|
||||||
|
)
|
||||||
|
.bind(&uid)
|
||||||
|
.fetch_all(&self.pool)
|
||||||
|
.await?;
|
||||||
|
Ok(rows.into_iter().map(|row| {
|
||||||
|
let url: String = row.get("remote_actor_url");
|
||||||
|
let status_str: String = row.get("status");
|
||||||
|
let handle: String = row.try_get("handle").unwrap_or_default();
|
||||||
|
let inbox_url: String = row.try_get("inbox_url").unwrap_or_default();
|
||||||
|
let shared_inbox_url: Option<String> = row.try_get("shared_inbox_url").ok().flatten();
|
||||||
|
let display_name: Option<String> = row.try_get("display_name").ok().flatten();
|
||||||
|
Follower {
|
||||||
|
actor: RemoteActor { url, handle, inbox_url, shared_inbox_url, display_name },
|
||||||
|
status: str_to_status(&status_str),
|
||||||
|
}
|
||||||
|
}).collect())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn update_follower_status(
|
||||||
|
&self,
|
||||||
|
local_user_id: uuid::Uuid,
|
||||||
|
remote_actor_url: &str,
|
||||||
|
status: FollowerStatus,
|
||||||
|
) -> Result<()> {
|
||||||
|
let uid = local_user_id.to_string();
|
||||||
|
let status_str = status_to_str(&status);
|
||||||
|
let result = sqlx::query(
|
||||||
|
"UPDATE ap_followers SET status = $1 WHERE local_user_id = $2 AND remote_actor_url = $3",
|
||||||
|
)
|
||||||
|
.bind(status_str)
|
||||||
|
.bind(&uid)
|
||||||
|
.bind(remote_actor_url)
|
||||||
|
.execute(&self.pool)
|
||||||
|
.await?;
|
||||||
|
if result.rows_affected() == 0 {
|
||||||
|
tracing::warn!(local_user_id = %local_user_id, remote_actor_url, "update_follower_status: no row found");
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn add_following(
|
||||||
|
&self,
|
||||||
|
local_user_id: uuid::Uuid,
|
||||||
|
actor: RemoteActor,
|
||||||
|
follow_activity_id: &str,
|
||||||
|
) -> Result<()> {
|
||||||
|
let uid = local_user_id.to_string();
|
||||||
|
let now = Utc::now().naive_utc();
|
||||||
|
let created_at = datetime_to_str(&now);
|
||||||
|
self.upsert_remote_actor(actor.clone()).await?;
|
||||||
|
sqlx::query(
|
||||||
|
"INSERT INTO ap_following (local_user_id, remote_actor_url, follow_activity_id, created_at)
|
||||||
|
VALUES ($1, $2, $3, $4::timestamptz)
|
||||||
|
ON CONFLICT DO NOTHING",
|
||||||
|
)
|
||||||
|
.bind(&uid)
|
||||||
|
.bind(&actor.url)
|
||||||
|
.bind(follow_activity_id)
|
||||||
|
.bind(&created_at)
|
||||||
|
.execute(&self.pool)
|
||||||
|
.await?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn get_follow_activity_id(
|
||||||
|
&self,
|
||||||
|
local_user_id: uuid::Uuid,
|
||||||
|
remote_actor_url: &str,
|
||||||
|
) -> Result<Option<String>> {
|
||||||
|
let uid = local_user_id.to_string();
|
||||||
|
let row: Option<String> = sqlx::query_scalar(
|
||||||
|
"SELECT follow_activity_id FROM ap_following WHERE local_user_id = $1 AND remote_actor_url = $2",
|
||||||
|
)
|
||||||
|
.bind(&uid)
|
||||||
|
.bind(remote_actor_url)
|
||||||
|
.fetch_optional(&self.pool)
|
||||||
|
.await?;
|
||||||
|
Ok(row)
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn remove_following(&self, local_user_id: uuid::Uuid, actor_url: &str) -> Result<()> {
|
||||||
|
let uid = local_user_id.to_string();
|
||||||
|
sqlx::query("DELETE FROM ap_following WHERE local_user_id = $1 AND remote_actor_url = $2")
|
||||||
|
.bind(&uid)
|
||||||
|
.bind(actor_url)
|
||||||
|
.execute(&self.pool)
|
||||||
|
.await?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn get_following(&self, local_user_id: uuid::Uuid) -> Result<Vec<RemoteActor>> {
|
||||||
|
let uid = local_user_id.to_string();
|
||||||
|
let rows = sqlx::query(
|
||||||
|
"SELECT a.url, a.handle, a.inbox_url, a.shared_inbox_url, a.display_name
|
||||||
|
FROM ap_following f
|
||||||
|
INNER JOIN ap_remote_actors a ON a.url = f.remote_actor_url
|
||||||
|
WHERE f.local_user_id = $1 AND f.status = 'accepted'",
|
||||||
|
)
|
||||||
|
.bind(&uid)
|
||||||
|
.fetch_all(&self.pool)
|
||||||
|
.await?;
|
||||||
|
Ok(rows.into_iter().map(|row| RemoteActor {
|
||||||
|
url: row.get("url"),
|
||||||
|
handle: row.get("handle"),
|
||||||
|
inbox_url: row.get("inbox_url"),
|
||||||
|
shared_inbox_url: row.try_get("shared_inbox_url").ok().flatten(),
|
||||||
|
display_name: row.try_get("display_name").ok().flatten(),
|
||||||
|
}).collect())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn count_following(&self, local_user_id: uuid::Uuid) -> Result<usize> {
|
||||||
|
let uid = local_user_id.to_string();
|
||||||
|
let count: i64 = sqlx::query_scalar(
|
||||||
|
"SELECT COUNT(*) FROM ap_following WHERE local_user_id = $1 AND status = 'accepted'",
|
||||||
|
)
|
||||||
|
.bind(&uid)
|
||||||
|
.fetch_one(&self.pool)
|
||||||
|
.await?;
|
||||||
|
Ok(count as usize)
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn upsert_remote_actor(&self, actor: RemoteActor) -> Result<()> {
|
||||||
|
let now = Utc::now().naive_utc();
|
||||||
|
let fetched_at = datetime_to_str(&now);
|
||||||
|
sqlx::query(
|
||||||
|
"INSERT INTO ap_remote_actors (url, handle, inbox_url, shared_inbox_url, display_name, fetched_at)
|
||||||
|
VALUES ($1, $2, $3, $4, $5, $6::timestamptz)
|
||||||
|
ON CONFLICT(url) DO UPDATE SET
|
||||||
|
handle = EXCLUDED.handle,
|
||||||
|
inbox_url = EXCLUDED.inbox_url,
|
||||||
|
shared_inbox_url = EXCLUDED.shared_inbox_url,
|
||||||
|
display_name = EXCLUDED.display_name,
|
||||||
|
fetched_at = EXCLUDED.fetched_at",
|
||||||
|
)
|
||||||
|
.bind(&actor.url)
|
||||||
|
.bind(&actor.handle)
|
||||||
|
.bind(&actor.inbox_url)
|
||||||
|
.bind(&actor.shared_inbox_url)
|
||||||
|
.bind(&actor.display_name)
|
||||||
|
.bind(&fetched_at)
|
||||||
|
.execute(&self.pool)
|
||||||
|
.await?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn get_remote_actor(&self, actor_url: &str) -> Result<Option<RemoteActor>> {
|
||||||
|
let row = sqlx::query(
|
||||||
|
"SELECT url, handle, inbox_url, shared_inbox_url, display_name
|
||||||
|
FROM ap_remote_actors WHERE url = $1",
|
||||||
|
)
|
||||||
|
.bind(actor_url)
|
||||||
|
.fetch_optional(&self.pool)
|
||||||
|
.await?;
|
||||||
|
Ok(row.map(|row| RemoteActor {
|
||||||
|
url: row.get("url"),
|
||||||
|
handle: row.get("handle"),
|
||||||
|
inbox_url: row.get("inbox_url"),
|
||||||
|
shared_inbox_url: row.try_get("shared_inbox_url").ok().flatten(),
|
||||||
|
display_name: row.try_get("display_name").ok().flatten(),
|
||||||
|
}))
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn get_local_actor_keypair(&self, user_id: uuid::Uuid) -> Result<Option<(String, String)>> {
|
||||||
|
let uid = user_id.to_string();
|
||||||
|
let row = sqlx::query("SELECT public_key, private_key FROM ap_local_actors WHERE user_id = $1")
|
||||||
|
.bind(&uid)
|
||||||
|
.fetch_optional(&self.pool)
|
||||||
|
.await?;
|
||||||
|
Ok(row.map(|r| (r.get("public_key"), r.get("private_key"))))
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn save_local_actor_keypair(
|
||||||
|
&self,
|
||||||
|
user_id: uuid::Uuid,
|
||||||
|
public_key: String,
|
||||||
|
private_key: String,
|
||||||
|
) -> Result<()> {
|
||||||
|
let uid = user_id.to_string();
|
||||||
|
let now = Utc::now().naive_utc();
|
||||||
|
let created_at = datetime_to_str(&now);
|
||||||
|
sqlx::query(
|
||||||
|
"INSERT INTO ap_local_actors (user_id, public_key, private_key, created_at)
|
||||||
|
VALUES ($1, $2, $3, $4::timestamptz)
|
||||||
|
ON CONFLICT(user_id) DO UPDATE SET
|
||||||
|
public_key = EXCLUDED.public_key,
|
||||||
|
private_key = EXCLUDED.private_key",
|
||||||
|
)
|
||||||
|
.bind(&uid)
|
||||||
|
.bind(&public_key)
|
||||||
|
.bind(&private_key)
|
||||||
|
.bind(&created_at)
|
||||||
|
.execute(&self.pool)
|
||||||
|
.await?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn get_pending_followers(&self, local_user_id: uuid::Uuid) -> Result<Vec<RemoteActor>> {
|
||||||
|
let uid = local_user_id.to_string();
|
||||||
|
let rows = sqlx::query(
|
||||||
|
"SELECT f.remote_actor_url, a.handle, a.inbox_url, a.shared_inbox_url, a.display_name
|
||||||
|
FROM ap_followers f
|
||||||
|
LEFT JOIN ap_remote_actors a ON a.url = f.remote_actor_url
|
||||||
|
WHERE f.local_user_id = $1 AND f.status = 'pending'",
|
||||||
|
)
|
||||||
|
.bind(&uid)
|
||||||
|
.fetch_all(&self.pool)
|
||||||
|
.await?;
|
||||||
|
Ok(rows.into_iter().map(|row| RemoteActor {
|
||||||
|
url: row.get("remote_actor_url"),
|
||||||
|
handle: row.try_get("handle").unwrap_or_default(),
|
||||||
|
inbox_url: row.try_get("inbox_url").unwrap_or_default(),
|
||||||
|
shared_inbox_url: row.try_get("shared_inbox_url").ok().flatten(),
|
||||||
|
display_name: row.try_get("display_name").ok().flatten(),
|
||||||
|
}).collect())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn update_following_status(
|
||||||
|
&self,
|
||||||
|
local_user_id: uuid::Uuid,
|
||||||
|
remote_actor_url: &str,
|
||||||
|
status: FollowingStatus,
|
||||||
|
) -> Result<()> {
|
||||||
|
let uid = local_user_id.to_string();
|
||||||
|
let status_str = match status {
|
||||||
|
FollowingStatus::Pending => "pending",
|
||||||
|
FollowingStatus::Accepted => "accepted",
|
||||||
|
};
|
||||||
|
let result = sqlx::query(
|
||||||
|
"UPDATE ap_following SET status = $1 WHERE local_user_id = $2 AND remote_actor_url = $3",
|
||||||
|
)
|
||||||
|
.bind(status_str)
|
||||||
|
.bind(&uid)
|
||||||
|
.bind(remote_actor_url)
|
||||||
|
.execute(&self.pool)
|
||||||
|
.await?;
|
||||||
|
if result.rows_affected() == 0 {
|
||||||
|
tracing::warn!(local_user_id = %local_user_id, remote_actor_url, "update_following_status: no row found");
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[async_trait]
|
||||||
|
impl RemoteReviewRepository for PostgresFederationRepository {
|
||||||
|
async fn save_remote_review(
|
||||||
|
&self,
|
||||||
|
review: &Review,
|
||||||
|
ap_id: &str,
|
||||||
|
movie_title: &str,
|
||||||
|
release_year: u16,
|
||||||
|
poster_url: Option<&str>,
|
||||||
|
) -> Result<()> {
|
||||||
|
let actor_url = match review.source() {
|
||||||
|
ReviewSource::Remote { actor_url } => actor_url.clone(),
|
||||||
|
ReviewSource::Local => return Err(anyhow!("save_remote_review called with a local review")),
|
||||||
|
};
|
||||||
|
let movie_id = review.movie_id().value().to_string();
|
||||||
|
sqlx::query(
|
||||||
|
"INSERT INTO movies (id, external_metadata_id, title, release_year, director, poster_path)
|
||||||
|
VALUES ($1, NULL, $2, $3, NULL, $4)
|
||||||
|
ON CONFLICT(id) DO UPDATE SET
|
||||||
|
poster_path = COALESCE(EXCLUDED.poster_path, movies.poster_path)",
|
||||||
|
)
|
||||||
|
.bind(&movie_id)
|
||||||
|
.bind(movie_title)
|
||||||
|
.bind(release_year.max(1888) as i64)
|
||||||
|
.bind(poster_url)
|
||||||
|
.execute(&self.pool)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
let id = review.id().value().to_string();
|
||||||
|
let user_id = review.user_id().value().to_string();
|
||||||
|
let rating = review.rating().value() as i64;
|
||||||
|
let comment = review.comment().map(|c| c.value().to_string());
|
||||||
|
let watched_at = datetime_to_str(review.watched_at());
|
||||||
|
let created_at = datetime_to_str(review.created_at());
|
||||||
|
sqlx::query(
|
||||||
|
"INSERT INTO reviews (id, movie_id, user_id, rating, comment, watched_at, created_at, remote_actor_url, ap_id)
|
||||||
|
VALUES ($1, $2, $3, $4, $5, $6::timestamptz, $7::timestamptz, $8, $9)
|
||||||
|
ON CONFLICT DO NOTHING",
|
||||||
|
)
|
||||||
|
.bind(&id)
|
||||||
|
.bind(&movie_id)
|
||||||
|
.bind(&user_id)
|
||||||
|
.bind(rating)
|
||||||
|
.bind(&comment)
|
||||||
|
.bind(&watched_at)
|
||||||
|
.bind(&created_at)
|
||||||
|
.bind(&actor_url)
|
||||||
|
.bind(ap_id)
|
||||||
|
.execute(&self.pool)
|
||||||
|
.await?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn delete_remote_review(&self, ap_id: &str, actor_url: &str) -> Result<()> {
|
||||||
|
sqlx::query("DELETE FROM reviews WHERE ap_id = $1 AND remote_actor_url = $2")
|
||||||
|
.bind(ap_id)
|
||||||
|
.bind(actor_url)
|
||||||
|
.execute(&self.pool)
|
||||||
|
.await?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn update_remote_review(
|
||||||
|
&self,
|
||||||
|
ap_id: &str,
|
||||||
|
actor_url: &str,
|
||||||
|
rating: u8,
|
||||||
|
comment: Option<&str>,
|
||||||
|
watched_at: chrono::NaiveDateTime,
|
||||||
|
) -> Result<()> {
|
||||||
|
let watched_at_str = datetime_to_str(&watched_at);
|
||||||
|
sqlx::query(
|
||||||
|
"UPDATE reviews SET rating = $1, comment = $2, watched_at = $3::timestamptz
|
||||||
|
WHERE ap_id = $4 AND remote_actor_url = $5",
|
||||||
|
)
|
||||||
|
.bind(rating as i64)
|
||||||
|
.bind(comment)
|
||||||
|
.bind(&watched_at_str)
|
||||||
|
.bind(ap_id)
|
||||||
|
.bind(actor_url)
|
||||||
|
.execute(&self.pool)
|
||||||
|
.await?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn delete_by_actor(&self, actor_url: &str) -> Result<()> {
|
||||||
|
sqlx::query("DELETE FROM reviews WHERE remote_actor_url = $1")
|
||||||
|
.bind(actor_url)
|
||||||
|
.execute(&self.pool)
|
||||||
|
.await?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[async_trait]
|
||||||
|
impl domain::ports::SocialQueryPort for PostgresFederationRepository {
|
||||||
|
async fn get_accepted_following_urls(
|
||||||
|
&self,
|
||||||
|
user_id: uuid::Uuid,
|
||||||
|
) -> Result<Vec<String>, domain::errors::DomainError> {
|
||||||
|
let user_id_str = user_id.to_string();
|
||||||
|
sqlx::query_scalar::<_, String>(
|
||||||
|
"SELECT remote_actor_url FROM ap_following WHERE local_user_id = $1 AND status = 'accepted'",
|
||||||
|
)
|
||||||
|
.bind(&user_id_str)
|
||||||
|
.fetch_all(&self.pool)
|
||||||
|
.await
|
||||||
|
.map_err(|e| domain::errors::DomainError::InfrastructureError(e.to_string()))
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn list_all_followed_remote_actors(
|
||||||
|
&self,
|
||||||
|
) -> Result<Vec<domain::ports::RemoteActorInfo>, domain::errors::DomainError> {
|
||||||
|
let rows = sqlx::query_as::<_, (String, String, Option<String>)>(
|
||||||
|
"SELECT DISTINCT ar.url, ar.handle, ar.display_name
|
||||||
|
FROM ap_remote_actors ar
|
||||||
|
JOIN ap_following f ON f.remote_actor_url = ar.url
|
||||||
|
WHERE f.status = 'accepted'",
|
||||||
|
)
|
||||||
|
.fetch_all(&self.pool)
|
||||||
|
.await
|
||||||
|
.map_err(|e| domain::errors::DomainError::InfrastructureError(e.to_string()))?;
|
||||||
|
Ok(rows.into_iter().map(|(url, handle, display_name)| domain::ports::RemoteActorInfo { url, handle, display_name }).collect())
|
||||||
|
}
|
||||||
|
}
|
||||||
19
crates/adapters/postgres/Cargo.toml
Normal file
19
crates/adapters/postgres/Cargo.toml
Normal file
@@ -0,0 +1,19 @@
|
|||||||
|
[package]
|
||||||
|
name = "postgres"
|
||||||
|
version = "0.1.0"
|
||||||
|
edition = "2024"
|
||||||
|
|
||||||
|
[dependencies]
|
||||||
|
sqlx = { version = "0.8.6", features = [
|
||||||
|
"runtime-tokio-rustls",
|
||||||
|
"postgres",
|
||||||
|
"uuid",
|
||||||
|
"macros",
|
||||||
|
"chrono",
|
||||||
|
] }
|
||||||
|
domain = { workspace = true }
|
||||||
|
uuid = { workspace = true }
|
||||||
|
chrono = { workspace = true }
|
||||||
|
tracing = { workspace = true }
|
||||||
|
async-trait = { workspace = true }
|
||||||
|
tokio = { workspace = true }
|
||||||
69
crates/adapters/postgres/migrations/0001_initial.sql
Normal file
69
crates/adapters/postgres/migrations/0001_initial.sql
Normal file
@@ -0,0 +1,69 @@
|
|||||||
|
CREATE TABLE IF NOT EXISTS movies (
|
||||||
|
id TEXT PRIMARY KEY NOT NULL,
|
||||||
|
external_metadata_id TEXT UNIQUE,
|
||||||
|
title TEXT NOT NULL,
|
||||||
|
release_year BIGINT NOT NULL,
|
||||||
|
director TEXT,
|
||||||
|
poster_path TEXT
|
||||||
|
);
|
||||||
|
|
||||||
|
CREATE INDEX IF NOT EXISTS idx_movies_title_year ON movies (title, release_year);
|
||||||
|
|
||||||
|
CREATE TABLE IF NOT EXISTS users (
|
||||||
|
id TEXT PRIMARY KEY NOT NULL,
|
||||||
|
email TEXT UNIQUE NOT NULL,
|
||||||
|
username TEXT UNIQUE NOT NULL,
|
||||||
|
password_hash TEXT NOT NULL,
|
||||||
|
created_at TIMESTAMPTZ NOT NULL,
|
||||||
|
role TEXT NOT NULL DEFAULT 'standard'
|
||||||
|
);
|
||||||
|
|
||||||
|
CREATE TABLE IF NOT EXISTS reviews (
|
||||||
|
id TEXT PRIMARY KEY NOT NULL,
|
||||||
|
movie_id TEXT NOT NULL REFERENCES movies(id),
|
||||||
|
user_id TEXT NOT NULL,
|
||||||
|
rating BIGINT NOT NULL,
|
||||||
|
comment TEXT,
|
||||||
|
watched_at TIMESTAMPTZ NOT NULL,
|
||||||
|
created_at TIMESTAMPTZ NOT NULL,
|
||||||
|
remote_actor_url TEXT,
|
||||||
|
ap_id TEXT
|
||||||
|
);
|
||||||
|
|
||||||
|
CREATE UNIQUE INDEX IF NOT EXISTS idx_reviews_ap_id ON reviews (ap_id) WHERE ap_id IS NOT NULL;
|
||||||
|
CREATE INDEX IF NOT EXISTS idx_reviews_movie_id ON reviews (movie_id);
|
||||||
|
CREATE INDEX IF NOT EXISTS idx_reviews_watched_at ON reviews (watched_at);
|
||||||
|
|
||||||
|
CREATE TABLE IF NOT EXISTS ap_followers (
|
||||||
|
local_user_id TEXT NOT NULL REFERENCES users(id) ON DELETE CASCADE,
|
||||||
|
remote_actor_url TEXT NOT NULL,
|
||||||
|
status TEXT NOT NULL DEFAULT 'pending',
|
||||||
|
follow_activity_id TEXT,
|
||||||
|
created_at TIMESTAMPTZ NOT NULL,
|
||||||
|
PRIMARY KEY (local_user_id, remote_actor_url)
|
||||||
|
);
|
||||||
|
|
||||||
|
CREATE TABLE IF NOT EXISTS ap_following (
|
||||||
|
local_user_id TEXT NOT NULL REFERENCES users(id) ON DELETE CASCADE,
|
||||||
|
remote_actor_url TEXT NOT NULL,
|
||||||
|
follow_activity_id TEXT,
|
||||||
|
status TEXT NOT NULL DEFAULT 'pending',
|
||||||
|
created_at TIMESTAMPTZ NOT NULL,
|
||||||
|
PRIMARY KEY (local_user_id, remote_actor_url)
|
||||||
|
);
|
||||||
|
|
||||||
|
CREATE TABLE IF NOT EXISTS ap_remote_actors (
|
||||||
|
url TEXT PRIMARY KEY,
|
||||||
|
handle TEXT NOT NULL,
|
||||||
|
inbox_url TEXT NOT NULL,
|
||||||
|
shared_inbox_url TEXT,
|
||||||
|
display_name TEXT,
|
||||||
|
fetched_at TIMESTAMPTZ NOT NULL
|
||||||
|
);
|
||||||
|
|
||||||
|
CREATE TABLE IF NOT EXISTS ap_local_actors (
|
||||||
|
user_id TEXT PRIMARY KEY REFERENCES users(id) ON DELETE CASCADE,
|
||||||
|
public_key TEXT NOT NULL,
|
||||||
|
private_key TEXT NOT NULL,
|
||||||
|
created_at TIMESTAMPTZ NOT NULL
|
||||||
|
);
|
||||||
768
crates/adapters/postgres/src/lib.rs
Normal file
768
crates/adapters/postgres/src/lib.rs
Normal file
@@ -0,0 +1,768 @@
|
|||||||
|
use async_trait::async_trait;
|
||||||
|
use domain::{
|
||||||
|
errors::DomainError,
|
||||||
|
events::DomainEvent,
|
||||||
|
models::{
|
||||||
|
DiaryEntry, DiaryFilter, DirectorStat, FeedEntry, MonthlyRating, Movie, Review,
|
||||||
|
ReviewHistory, ReviewSource, SortDirection, UserStats, UserTrends,
|
||||||
|
collections::{PageParams, Paginated},
|
||||||
|
},
|
||||||
|
ports::{DiaryRepository, MovieRepository, ReviewRepository, StatsRepository},
|
||||||
|
value_objects::{ExternalMetadataId, MovieId, MovieTitle, ReleaseYear, ReviewId, UserId},
|
||||||
|
};
|
||||||
|
use sqlx::PgPool;
|
||||||
|
|
||||||
|
mod models;
|
||||||
|
mod users;
|
||||||
|
|
||||||
|
use models::{
|
||||||
|
DiaryRow, DirectorCountRow, FeedRow, MonthlyRatingRow, MovieRow, ReviewRow, UserTotalsRow,
|
||||||
|
datetime_to_str,
|
||||||
|
};
|
||||||
|
|
||||||
|
pub use users::PostgresUserRepository;
|
||||||
|
|
||||||
|
fn format_year_month(ym: &str) -> String {
|
||||||
|
let parts: Vec<&str> = ym.splitn(2, '-').collect();
|
||||||
|
if parts.len() != 2 {
|
||||||
|
return ym.to_string();
|
||||||
|
}
|
||||||
|
let year = parts[0].get(2..).unwrap_or(parts[0]);
|
||||||
|
let month = match parts[1] {
|
||||||
|
"01" => "Jan",
|
||||||
|
"02" => "Feb",
|
||||||
|
"03" => "Mar",
|
||||||
|
"04" => "Apr",
|
||||||
|
"05" => "May",
|
||||||
|
"06" => "Jun",
|
||||||
|
"07" => "Jul",
|
||||||
|
"08" => "Aug",
|
||||||
|
"09" => "Sep",
|
||||||
|
"10" => "Oct",
|
||||||
|
"11" => "Nov",
|
||||||
|
"12" => "Dec",
|
||||||
|
_ => parts[1],
|
||||||
|
};
|
||||||
|
format!("{} '{}", month, year)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct PostgresRepository {
|
||||||
|
pool: PgPool,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl PostgresRepository {
|
||||||
|
pub fn new(pool: PgPool) -> Self {
|
||||||
|
Self { pool }
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn migrate(&self) -> Result<(), DomainError> {
|
||||||
|
sqlx::migrate!("./migrations")
|
||||||
|
.run(&self.pool)
|
||||||
|
.await
|
||||||
|
.map_err(|e| DomainError::InfrastructureError(format!("Migration failed: {}", e)))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn map_err(e: sqlx::Error) -> DomainError {
|
||||||
|
tracing::error!("Database error: {:?}", e);
|
||||||
|
DomainError::InfrastructureError("Database operation failed".into())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn count_diary_entries(&self, movie_id: Option<&str>) -> Result<i64, DomainError> {
|
||||||
|
match movie_id {
|
||||||
|
None => sqlx::query_scalar::<_, i64>("SELECT COUNT(*) FROM reviews")
|
||||||
|
.fetch_one(&self.pool)
|
||||||
|
.await
|
||||||
|
.map_err(Self::map_err),
|
||||||
|
Some(id) => {
|
||||||
|
sqlx::query_scalar::<_, i64>("SELECT COUNT(*) FROM reviews WHERE movie_id = $1")
|
||||||
|
.bind(id)
|
||||||
|
.fetch_one(&self.pool)
|
||||||
|
.await
|
||||||
|
.map_err(Self::map_err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn fetch_all_diary_rows(
|
||||||
|
&self,
|
||||||
|
sort: &SortDirection,
|
||||||
|
limit: i64,
|
||||||
|
offset: i64,
|
||||||
|
) -> Result<Vec<DiaryRow>, DomainError> {
|
||||||
|
let order = match sort {
|
||||||
|
SortDirection::Ascending => "r.watched_at ASC",
|
||||||
|
_ => "r.watched_at DESC",
|
||||||
|
};
|
||||||
|
let sql = format!(
|
||||||
|
"SELECT m.id, m.external_metadata_id, m.title, m.release_year, m.director, m.poster_path,
|
||||||
|
r.id AS review_id, r.movie_id, r.user_id, r.rating, r.comment,
|
||||||
|
to_char(r.watched_at AT TIME ZONE 'UTC', 'YYYY-MM-DD HH24:MI:SS') AS watched_at,
|
||||||
|
to_char(r.created_at AT TIME ZONE 'UTC', 'YYYY-MM-DD HH24:MI:SS') AS created_at,
|
||||||
|
r.remote_actor_url
|
||||||
|
FROM reviews r
|
||||||
|
INNER JOIN movies m ON m.id = r.movie_id
|
||||||
|
ORDER BY {}
|
||||||
|
LIMIT $1 OFFSET $2",
|
||||||
|
order
|
||||||
|
);
|
||||||
|
sqlx::query_as::<_, DiaryRow>(&sql)
|
||||||
|
.bind(limit)
|
||||||
|
.bind(offset)
|
||||||
|
.fetch_all(&self.pool)
|
||||||
|
.await
|
||||||
|
.map_err(Self::map_err)
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn fetch_movie_diary_rows(
|
||||||
|
&self,
|
||||||
|
movie_id: &str,
|
||||||
|
sort: &SortDirection,
|
||||||
|
limit: i64,
|
||||||
|
offset: i64,
|
||||||
|
) -> Result<Vec<DiaryRow>, DomainError> {
|
||||||
|
let order = match sort {
|
||||||
|
SortDirection::Ascending => "r.watched_at ASC",
|
||||||
|
_ => "r.watched_at DESC",
|
||||||
|
};
|
||||||
|
let sql = format!(
|
||||||
|
"SELECT m.id, m.external_metadata_id, m.title, m.release_year, m.director, m.poster_path,
|
||||||
|
r.id AS review_id, r.movie_id, r.user_id, r.rating, r.comment,
|
||||||
|
to_char(r.watched_at AT TIME ZONE 'UTC', 'YYYY-MM-DD HH24:MI:SS') AS watched_at,
|
||||||
|
to_char(r.created_at AT TIME ZONE 'UTC', 'YYYY-MM-DD HH24:MI:SS') AS created_at,
|
||||||
|
r.remote_actor_url
|
||||||
|
FROM reviews r
|
||||||
|
INNER JOIN movies m ON m.id = r.movie_id
|
||||||
|
WHERE r.movie_id = $1
|
||||||
|
ORDER BY {}
|
||||||
|
LIMIT $2 OFFSET $3",
|
||||||
|
order
|
||||||
|
);
|
||||||
|
sqlx::query_as::<_, DiaryRow>(&sql)
|
||||||
|
.bind(movie_id)
|
||||||
|
.bind(limit)
|
||||||
|
.bind(offset)
|
||||||
|
.fetch_all(&self.pool)
|
||||||
|
.await
|
||||||
|
.map_err(Self::map_err)
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn count_user_diary_entries(
|
||||||
|
&self,
|
||||||
|
user_id: &str,
|
||||||
|
search: Option<&str>,
|
||||||
|
) -> Result<i64, DomainError> {
|
||||||
|
let has_search = search.map(|s| !s.is_empty()).unwrap_or(false);
|
||||||
|
let sql = if has_search {
|
||||||
|
"SELECT COUNT(*) FROM reviews r
|
||||||
|
INNER JOIN movies m ON m.id = r.movie_id
|
||||||
|
WHERE r.user_id = $1 AND m.title ILIKE '%' || $2 || '%'"
|
||||||
|
.to_string()
|
||||||
|
} else {
|
||||||
|
"SELECT COUNT(*) FROM reviews r
|
||||||
|
INNER JOIN movies m ON m.id = r.movie_id
|
||||||
|
WHERE r.user_id = $1"
|
||||||
|
.to_string()
|
||||||
|
};
|
||||||
|
let mut q = sqlx::query_scalar::<_, i64>(&sql).bind(user_id);
|
||||||
|
if has_search {
|
||||||
|
q = q.bind(search.unwrap());
|
||||||
|
}
|
||||||
|
q.fetch_one(&self.pool).await.map_err(Self::map_err)
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn fetch_user_diary_rows(
|
||||||
|
&self,
|
||||||
|
user_id: &str,
|
||||||
|
sort: &SortDirection,
|
||||||
|
search: Option<&str>,
|
||||||
|
limit: i64,
|
||||||
|
offset: i64,
|
||||||
|
) -> Result<Vec<DiaryRow>, DomainError> {
|
||||||
|
let has_search = search.map(|s| !s.is_empty()).unwrap_or(false);
|
||||||
|
let order_clause = match sort {
|
||||||
|
SortDirection::ByRatingDesc => "r.rating DESC, r.watched_at DESC",
|
||||||
|
SortDirection::ByRatingAsc => "r.rating ASC, r.watched_at ASC",
|
||||||
|
SortDirection::Ascending => "r.watched_at ASC",
|
||||||
|
SortDirection::Descending => "r.watched_at DESC",
|
||||||
|
};
|
||||||
|
|
||||||
|
// Build param counter: user_id=$1, optional search=$2, limit=$N-1, offset=$N
|
||||||
|
let mut p: i32 = 1; // $1 is user_id
|
||||||
|
let search_clause = if has_search {
|
||||||
|
p += 1;
|
||||||
|
format!(" AND m.title ILIKE '%' || ${} || '%'", p)
|
||||||
|
} else {
|
||||||
|
String::new()
|
||||||
|
};
|
||||||
|
p += 1;
|
||||||
|
let limit_param = format!("${}", p);
|
||||||
|
p += 1;
|
||||||
|
let offset_param = format!("${}", p);
|
||||||
|
|
||||||
|
let sql = format!(
|
||||||
|
"SELECT m.id, m.external_metadata_id, m.title, m.release_year, m.director, m.poster_path,
|
||||||
|
r.id AS review_id, r.movie_id, r.user_id, r.rating, r.comment,
|
||||||
|
to_char(r.watched_at AT TIME ZONE 'UTC', 'YYYY-MM-DD HH24:MI:SS') AS watched_at,
|
||||||
|
to_char(r.created_at AT TIME ZONE 'UTC', 'YYYY-MM-DD HH24:MI:SS') AS created_at,
|
||||||
|
r.remote_actor_url
|
||||||
|
FROM reviews r
|
||||||
|
INNER JOIN movies m ON m.id = r.movie_id
|
||||||
|
WHERE r.user_id = $1{}
|
||||||
|
ORDER BY {}
|
||||||
|
LIMIT {} OFFSET {}",
|
||||||
|
search_clause, order_clause, limit_param, offset_param
|
||||||
|
);
|
||||||
|
|
||||||
|
let mut q = sqlx::query_as::<_, DiaryRow>(&sql).bind(user_id);
|
||||||
|
if has_search {
|
||||||
|
q = q.bind(search.unwrap());
|
||||||
|
}
|
||||||
|
q.bind(limit)
|
||||||
|
.bind(offset)
|
||||||
|
.fetch_all(&self.pool)
|
||||||
|
.await
|
||||||
|
.map_err(Self::map_err)
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn fetch_user_totals(&self, user_id: &str) -> Result<UserTotalsRow, DomainError> {
|
||||||
|
sqlx::query_as::<_, UserTotalsRow>(
|
||||||
|
r#"SELECT COUNT(DISTINCT movie_id) AS total,
|
||||||
|
AVG(rating::float) AS avg_rating
|
||||||
|
FROM reviews WHERE user_id = $1"#,
|
||||||
|
)
|
||||||
|
.bind(user_id)
|
||||||
|
.fetch_one(&self.pool)
|
||||||
|
.await
|
||||||
|
.map_err(Self::map_err)
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn fetch_user_favorite_director(
|
||||||
|
&self,
|
||||||
|
user_id: &str,
|
||||||
|
) -> Result<Option<String>, DomainError> {
|
||||||
|
sqlx::query_scalar::<_, String>(
|
||||||
|
"SELECT m.director
|
||||||
|
FROM reviews r
|
||||||
|
INNER JOIN movies m ON m.id = r.movie_id
|
||||||
|
WHERE r.user_id = $1 AND m.director IS NOT NULL
|
||||||
|
GROUP BY m.director
|
||||||
|
ORDER BY COUNT(*) DESC
|
||||||
|
LIMIT 1",
|
||||||
|
)
|
||||||
|
.bind(user_id)
|
||||||
|
.fetch_optional(&self.pool)
|
||||||
|
.await
|
||||||
|
.map_err(Self::map_err)
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn fetch_user_most_active_month(
|
||||||
|
&self,
|
||||||
|
user_id: &str,
|
||||||
|
) -> Result<Option<String>, DomainError> {
|
||||||
|
sqlx::query_scalar::<_, String>(
|
||||||
|
"SELECT to_char(watched_at AT TIME ZONE 'UTC', 'YYYY-MM') AS month
|
||||||
|
FROM reviews
|
||||||
|
WHERE user_id = $1
|
||||||
|
GROUP BY to_char(watched_at AT TIME ZONE 'UTC', 'YYYY-MM')
|
||||||
|
ORDER BY COUNT(*) DESC
|
||||||
|
LIMIT 1",
|
||||||
|
)
|
||||||
|
.bind(user_id)
|
||||||
|
.fetch_optional(&self.pool)
|
||||||
|
.await
|
||||||
|
.map_err(Self::map_err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[async_trait]
|
||||||
|
impl MovieRepository for PostgresRepository {
|
||||||
|
async fn get_movie_by_external_id(
|
||||||
|
&self,
|
||||||
|
external_metadata_id: &ExternalMetadataId,
|
||||||
|
) -> Result<Option<Movie>, DomainError> {
|
||||||
|
let id = external_metadata_id.value();
|
||||||
|
sqlx::query_as::<_, MovieRow>(
|
||||||
|
"SELECT id, external_metadata_id, title, release_year, director, poster_path
|
||||||
|
FROM movies WHERE external_metadata_id = $1",
|
||||||
|
)
|
||||||
|
.bind(id)
|
||||||
|
.fetch_optional(&self.pool)
|
||||||
|
.await
|
||||||
|
.map_err(Self::map_err)?
|
||||||
|
.map(MovieRow::to_domain)
|
||||||
|
.transpose()
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn get_movie_by_id(&self, movie_id: &MovieId) -> Result<Option<Movie>, DomainError> {
|
||||||
|
let id = movie_id.value().to_string();
|
||||||
|
sqlx::query_as::<_, MovieRow>(
|
||||||
|
"SELECT id, external_metadata_id, title, release_year, director, poster_path
|
||||||
|
FROM movies WHERE id = $1",
|
||||||
|
)
|
||||||
|
.bind(&id)
|
||||||
|
.fetch_optional(&self.pool)
|
||||||
|
.await
|
||||||
|
.map_err(Self::map_err)?
|
||||||
|
.map(MovieRow::to_domain)
|
||||||
|
.transpose()
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn get_movies_by_title_and_year(
|
||||||
|
&self,
|
||||||
|
title: &MovieTitle,
|
||||||
|
year: &ReleaseYear,
|
||||||
|
) -> Result<Vec<Movie>, DomainError> {
|
||||||
|
let title = title.value();
|
||||||
|
let year = year.value() as i64;
|
||||||
|
sqlx::query_as::<_, MovieRow>(
|
||||||
|
"SELECT id, external_metadata_id, title, release_year, director, poster_path
|
||||||
|
FROM movies WHERE title = $1 AND release_year = $2",
|
||||||
|
)
|
||||||
|
.bind(title)
|
||||||
|
.bind(year)
|
||||||
|
.fetch_all(&self.pool)
|
||||||
|
.await
|
||||||
|
.map_err(Self::map_err)?
|
||||||
|
.into_iter()
|
||||||
|
.map(MovieRow::to_domain)
|
||||||
|
.collect()
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn upsert_movie(&self, movie: &Movie) -> Result<(), DomainError> {
|
||||||
|
let id = movie.id().value().to_string();
|
||||||
|
let external_metadata_id = movie.external_metadata_id().map(|e| e.value().to_string());
|
||||||
|
let title = movie.title().value();
|
||||||
|
let release_year = movie.release_year().value() as i64;
|
||||||
|
let director = movie.director();
|
||||||
|
let poster_path = movie.poster_path().map(|p| p.value().to_string());
|
||||||
|
|
||||||
|
sqlx::query(
|
||||||
|
"INSERT INTO movies (id, external_metadata_id, title, release_year, director, poster_path)
|
||||||
|
VALUES ($1, $2, $3, $4, $5, $6)
|
||||||
|
ON CONFLICT(id) DO UPDATE SET
|
||||||
|
external_metadata_id = excluded.external_metadata_id,
|
||||||
|
title = excluded.title,
|
||||||
|
release_year = excluded.release_year,
|
||||||
|
director = excluded.director,
|
||||||
|
poster_path = excluded.poster_path",
|
||||||
|
)
|
||||||
|
.bind(&id)
|
||||||
|
.bind(&external_metadata_id)
|
||||||
|
.bind(title)
|
||||||
|
.bind(release_year)
|
||||||
|
.bind(director)
|
||||||
|
.bind(&poster_path)
|
||||||
|
.execute(&self.pool)
|
||||||
|
.await
|
||||||
|
.map_err(Self::map_err)?;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn delete_movie(&self, movie_id: &MovieId) -> Result<(), DomainError> {
|
||||||
|
let id = movie_id.value().to_string();
|
||||||
|
sqlx::query("DELETE FROM movies WHERE id = $1")
|
||||||
|
.bind(&id)
|
||||||
|
.execute(&self.pool)
|
||||||
|
.await
|
||||||
|
.map_err(Self::map_err)?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[async_trait]
|
||||||
|
impl ReviewRepository for PostgresRepository {
|
||||||
|
async fn save_review(&self, review: &Review) -> Result<DomainEvent, DomainError> {
|
||||||
|
let id = review.id().value().to_string();
|
||||||
|
let movie_id = review.movie_id().value().to_string();
|
||||||
|
let user_id = review.user_id().value().to_string();
|
||||||
|
let rating = review.rating().value() as i64;
|
||||||
|
let comment = review.comment().map(|c| c.value().to_string());
|
||||||
|
let watched_at = datetime_to_str(review.watched_at());
|
||||||
|
let created_at = datetime_to_str(review.created_at());
|
||||||
|
let remote_actor_url = match review.source() {
|
||||||
|
ReviewSource::Local => None,
|
||||||
|
ReviewSource::Remote { actor_url } => Some(actor_url.clone()),
|
||||||
|
};
|
||||||
|
|
||||||
|
sqlx::query(
|
||||||
|
"INSERT INTO reviews (id, movie_id, user_id, rating, comment, watched_at, created_at, remote_actor_url)
|
||||||
|
VALUES ($1, $2, $3, $4, $5, $6::timestamptz, $7::timestamptz, $8)",
|
||||||
|
)
|
||||||
|
.bind(&id)
|
||||||
|
.bind(&movie_id)
|
||||||
|
.bind(&user_id)
|
||||||
|
.bind(rating)
|
||||||
|
.bind(&comment)
|
||||||
|
.bind(&watched_at)
|
||||||
|
.bind(&created_at)
|
||||||
|
.bind(&remote_actor_url)
|
||||||
|
.execute(&self.pool)
|
||||||
|
.await
|
||||||
|
.map_err(Self::map_err)?;
|
||||||
|
|
||||||
|
Ok(DomainEvent::ReviewLogged {
|
||||||
|
review_id: review.id().clone(),
|
||||||
|
movie_id: review.movie_id().clone(),
|
||||||
|
user_id: review.user_id().clone(),
|
||||||
|
rating: review.rating().clone(),
|
||||||
|
watched_at: *review.watched_at(),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn get_review_by_id(&self, review_id: &ReviewId) -> Result<Option<Review>, DomainError> {
|
||||||
|
let id = review_id.value().to_string();
|
||||||
|
sqlx::query_as::<_, ReviewRow>(
|
||||||
|
"SELECT id, movie_id, user_id, rating, comment,
|
||||||
|
to_char(watched_at AT TIME ZONE 'UTC', 'YYYY-MM-DD HH24:MI:SS') AS watched_at,
|
||||||
|
to_char(created_at AT TIME ZONE 'UTC', 'YYYY-MM-DD HH24:MI:SS') AS created_at,
|
||||||
|
remote_actor_url
|
||||||
|
FROM reviews WHERE id = $1",
|
||||||
|
)
|
||||||
|
.bind(&id)
|
||||||
|
.fetch_optional(&self.pool)
|
||||||
|
.await
|
||||||
|
.map_err(Self::map_err)?
|
||||||
|
.map(ReviewRow::to_domain)
|
||||||
|
.transpose()
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn delete_review(&self, review_id: &ReviewId) -> Result<(), DomainError> {
|
||||||
|
let id = review_id.value().to_string();
|
||||||
|
sqlx::query("DELETE FROM reviews WHERE id = $1")
|
||||||
|
.bind(&id)
|
||||||
|
.execute(&self.pool)
|
||||||
|
.await
|
||||||
|
.map_err(Self::map_err)?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn get_all_reviews_for_user(
|
||||||
|
&self,
|
||||||
|
_user_id: &UserId,
|
||||||
|
) -> Result<Vec<Review>, DomainError> {
|
||||||
|
todo!()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[async_trait]
|
||||||
|
impl DiaryRepository for PostgresRepository {
|
||||||
|
async fn query_diary(
|
||||||
|
&self,
|
||||||
|
filter: &DiaryFilter,
|
||||||
|
) -> Result<Paginated<DiaryEntry>, DomainError> {
|
||||||
|
let limit = filter.page.limit as i64;
|
||||||
|
let offset = filter.page.offset as i64;
|
||||||
|
|
||||||
|
let (total, rows) = match (&filter.movie_id, &filter.user_id) {
|
||||||
|
(None, None) => tokio::try_join!(
|
||||||
|
self.count_diary_entries(None),
|
||||||
|
self.fetch_all_diary_rows(&filter.sort_by, limit, offset)
|
||||||
|
)?,
|
||||||
|
(Some(id), None) => {
|
||||||
|
let id_str = id.value().to_string();
|
||||||
|
tokio::try_join!(
|
||||||
|
self.count_diary_entries(Some(id_str.as_str())),
|
||||||
|
self.fetch_movie_diary_rows(&id_str, &filter.sort_by, limit, offset)
|
||||||
|
)?
|
||||||
|
}
|
||||||
|
(None, Some(uid)) => {
|
||||||
|
let uid_str = uid.value().to_string();
|
||||||
|
let search = filter.search.as_deref();
|
||||||
|
tokio::try_join!(
|
||||||
|
self.count_user_diary_entries(&uid_str, search),
|
||||||
|
self.fetch_user_diary_rows(&uid_str, &filter.sort_by, search, limit, offset)
|
||||||
|
)?
|
||||||
|
}
|
||||||
|
(Some(_), Some(_)) => {
|
||||||
|
return Err(DomainError::ValidationError(
|
||||||
|
"Combined movie_id + user_id filter not supported".into(),
|
||||||
|
));
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
let items = rows
|
||||||
|
.into_iter()
|
||||||
|
.map(DiaryRow::to_domain)
|
||||||
|
.collect::<Result<Vec<_>, _>>()?;
|
||||||
|
|
||||||
|
Ok(Paginated {
|
||||||
|
items,
|
||||||
|
total_count: total as u64,
|
||||||
|
limit: filter.page.limit,
|
||||||
|
offset: filter.page.offset,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn query_activity_feed(
|
||||||
|
&self,
|
||||||
|
page: &PageParams,
|
||||||
|
) -> Result<Paginated<FeedEntry>, DomainError> {
|
||||||
|
self.query_activity_feed_filtered(page, &domain::ports::FeedSortBy::Date, None, None)
|
||||||
|
.await
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn query_activity_feed_filtered(
|
||||||
|
&self,
|
||||||
|
page: &PageParams,
|
||||||
|
sort_by: &domain::ports::FeedSortBy,
|
||||||
|
search: Option<&str>,
|
||||||
|
following: Option<&domain::ports::FollowingFilter>,
|
||||||
|
) -> Result<Paginated<FeedEntry>, DomainError> {
|
||||||
|
use domain::ports::FeedSortBy;
|
||||||
|
|
||||||
|
let limit = page.limit as i64;
|
||||||
|
let offset = page.offset as i64;
|
||||||
|
let has_search = search.map(|s| !s.is_empty()).unwrap_or(false);
|
||||||
|
|
||||||
|
// Dynamic param counter
|
||||||
|
let mut p: i32 = 0;
|
||||||
|
let mut next_param = || {
|
||||||
|
p += 1;
|
||||||
|
format!("${}", p)
|
||||||
|
};
|
||||||
|
|
||||||
|
let mut where_parts = vec!["1=1".to_string()];
|
||||||
|
|
||||||
|
if has_search {
|
||||||
|
let pn = next_param();
|
||||||
|
where_parts.push(format!("m.title ILIKE '%' || {} || '%'", pn));
|
||||||
|
}
|
||||||
|
|
||||||
|
if let Some(f) = following {
|
||||||
|
let local_params: Vec<String> =
|
||||||
|
f.local_user_ids.iter().map(|_| next_param()).collect();
|
||||||
|
let remote_params: Vec<String> =
|
||||||
|
f.remote_actor_urls.iter().map(|_| next_param()).collect();
|
||||||
|
|
||||||
|
let local_in = if local_params.is_empty() {
|
||||||
|
"(SELECT NULL::text WHERE false)".to_string()
|
||||||
|
} else {
|
||||||
|
local_params.join(", ")
|
||||||
|
};
|
||||||
|
let remote_in = if remote_params.is_empty() {
|
||||||
|
"(SELECT NULL::text WHERE false)".to_string()
|
||||||
|
} else {
|
||||||
|
remote_params.join(", ")
|
||||||
|
};
|
||||||
|
where_parts.push(format!(
|
||||||
|
"(r.user_id IN ({}) OR r.remote_actor_url IN ({}))",
|
||||||
|
local_in, remote_in
|
||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
|
let limit_param = next_param();
|
||||||
|
let offset_param = next_param();
|
||||||
|
|
||||||
|
let order_clause = match sort_by {
|
||||||
|
FeedSortBy::Date => "r.watched_at DESC",
|
||||||
|
FeedSortBy::DateAsc => "r.watched_at ASC",
|
||||||
|
FeedSortBy::Rating => "r.rating DESC, r.watched_at DESC",
|
||||||
|
FeedSortBy::RatingAsc => "r.rating ASC, r.watched_at ASC",
|
||||||
|
};
|
||||||
|
|
||||||
|
let where_clause = where_parts.join(" AND ");
|
||||||
|
|
||||||
|
// Reset counter for count query (reuse same where_clause string but re-bind)
|
||||||
|
// We need a separate counter for count SQL — but since where_clause is already built
|
||||||
|
// with the right $N references, both queries share it.
|
||||||
|
let count_sql = format!(
|
||||||
|
"SELECT COUNT(*) FROM reviews r
|
||||||
|
INNER JOIN movies m ON m.id = r.movie_id
|
||||||
|
WHERE {}",
|
||||||
|
where_clause
|
||||||
|
);
|
||||||
|
|
||||||
|
let select_sql = format!(
|
||||||
|
"SELECT m.id, m.external_metadata_id, m.title, m.release_year, m.director, m.poster_path,
|
||||||
|
r.id AS review_id, r.movie_id, r.user_id, r.rating, r.comment,
|
||||||
|
to_char(r.watched_at AT TIME ZONE 'UTC', 'YYYY-MM-DD HH24:MI:SS') AS watched_at,
|
||||||
|
to_char(r.created_at AT TIME ZONE 'UTC', 'YYYY-MM-DD HH24:MI:SS') AS created_at,
|
||||||
|
r.remote_actor_url,
|
||||||
|
COALESCE(u.email, r.remote_actor_url) AS user_email
|
||||||
|
FROM reviews r
|
||||||
|
INNER JOIN movies m ON m.id = r.movie_id
|
||||||
|
LEFT JOIN users u ON u.id = r.user_id
|
||||||
|
WHERE {}
|
||||||
|
ORDER BY {}
|
||||||
|
LIMIT {} OFFSET {}",
|
||||||
|
where_clause, order_clause, limit_param, offset_param
|
||||||
|
);
|
||||||
|
|
||||||
|
// Bind helper closure — binds search + following params in order
|
||||||
|
macro_rules! bind_filter_params {
|
||||||
|
($q:expr) => {{
|
||||||
|
let mut q = $q;
|
||||||
|
if has_search {
|
||||||
|
q = q.bind(search.unwrap());
|
||||||
|
}
|
||||||
|
if let Some(f) = following {
|
||||||
|
for uid in &f.local_user_ids {
|
||||||
|
q = q.bind(uid.to_string());
|
||||||
|
}
|
||||||
|
for url in &f.remote_actor_urls {
|
||||||
|
q = q.bind(url.as_str());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
q
|
||||||
|
}};
|
||||||
|
}
|
||||||
|
|
||||||
|
let count_q = bind_filter_params!(sqlx::query_scalar::<_, i64>(&count_sql));
|
||||||
|
let total = count_q
|
||||||
|
.fetch_one(&self.pool)
|
||||||
|
.await
|
||||||
|
.map_err(Self::map_err)?;
|
||||||
|
|
||||||
|
let rows_q = bind_filter_params!(sqlx::query_as::<_, FeedRow>(&select_sql));
|
||||||
|
let rows = rows_q
|
||||||
|
.bind(limit)
|
||||||
|
.bind(offset)
|
||||||
|
.fetch_all(&self.pool)
|
||||||
|
.await
|
||||||
|
.map_err(Self::map_err)?;
|
||||||
|
|
||||||
|
let items = rows
|
||||||
|
.into_iter()
|
||||||
|
.map(FeedRow::to_domain)
|
||||||
|
.collect::<Result<Vec<_>, _>>()?;
|
||||||
|
|
||||||
|
Ok(Paginated {
|
||||||
|
items,
|
||||||
|
total_count: total as u64,
|
||||||
|
limit: page.limit,
|
||||||
|
offset: page.offset,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn get_review_history(&self, movie_id: &MovieId) -> Result<ReviewHistory, DomainError> {
|
||||||
|
let id_str = movie_id.value().to_string();
|
||||||
|
|
||||||
|
let movie = sqlx::query_as::<_, MovieRow>(
|
||||||
|
"SELECT id, external_metadata_id, title, release_year, director, poster_path
|
||||||
|
FROM movies WHERE id = $1",
|
||||||
|
)
|
||||||
|
.bind(&id_str)
|
||||||
|
.fetch_optional(&self.pool)
|
||||||
|
.await
|
||||||
|
.map_err(Self::map_err)?
|
||||||
|
.ok_or_else(|| DomainError::NotFound(format!("Movie {}", id_str)))?
|
||||||
|
.to_domain()?;
|
||||||
|
|
||||||
|
let viewings = sqlx::query_as::<_, ReviewRow>(
|
||||||
|
"SELECT id, movie_id, user_id, rating, comment,
|
||||||
|
to_char(watched_at AT TIME ZONE 'UTC', 'YYYY-MM-DD HH24:MI:SS') AS watched_at,
|
||||||
|
to_char(created_at AT TIME ZONE 'UTC', 'YYYY-MM-DD HH24:MI:SS') AS created_at,
|
||||||
|
remote_actor_url
|
||||||
|
FROM reviews WHERE movie_id = $1 ORDER BY watched_at ASC",
|
||||||
|
)
|
||||||
|
.bind(&id_str)
|
||||||
|
.fetch_all(&self.pool)
|
||||||
|
.await
|
||||||
|
.map_err(Self::map_err)?
|
||||||
|
.into_iter()
|
||||||
|
.map(ReviewRow::to_domain)
|
||||||
|
.collect::<Result<Vec<_>, _>>()?;
|
||||||
|
|
||||||
|
Ok(ReviewHistory::new(movie, viewings))
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn get_user_history(&self, user_id: &UserId) -> Result<Vec<DiaryEntry>, DomainError> {
|
||||||
|
let uid = user_id.value().to_string();
|
||||||
|
let rows = sqlx::query_as::<_, DiaryRow>(
|
||||||
|
"SELECT m.id, m.external_metadata_id, m.title, m.release_year, m.director, m.poster_path,
|
||||||
|
r.id AS review_id, r.movie_id, r.user_id, r.rating, r.comment,
|
||||||
|
to_char(r.watched_at AT TIME ZONE 'UTC', 'YYYY-MM-DD HH24:MI:SS') AS watched_at,
|
||||||
|
to_char(r.created_at AT TIME ZONE 'UTC', 'YYYY-MM-DD HH24:MI:SS') AS created_at,
|
||||||
|
r.remote_actor_url
|
||||||
|
FROM reviews r
|
||||||
|
INNER JOIN movies m ON m.id = r.movie_id
|
||||||
|
WHERE r.user_id = $1
|
||||||
|
ORDER BY r.watched_at DESC",
|
||||||
|
)
|
||||||
|
.bind(&uid)
|
||||||
|
.fetch_all(&self.pool)
|
||||||
|
.await
|
||||||
|
.map_err(Self::map_err)?;
|
||||||
|
|
||||||
|
rows.into_iter().map(DiaryRow::to_domain).collect()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[async_trait]
|
||||||
|
impl StatsRepository for PostgresRepository {
|
||||||
|
async fn get_user_stats(&self, user_id: &UserId) -> Result<UserStats, DomainError> {
|
||||||
|
let uid = user_id.value().to_string();
|
||||||
|
|
||||||
|
let (totals, fav_director, most_active) = tokio::try_join!(
|
||||||
|
self.fetch_user_totals(&uid),
|
||||||
|
self.fetch_user_favorite_director(&uid),
|
||||||
|
self.fetch_user_most_active_month(&uid)
|
||||||
|
)?;
|
||||||
|
|
||||||
|
let most_active_month = most_active.map(|ym| format_year_month(&ym));
|
||||||
|
|
||||||
|
Ok(UserStats {
|
||||||
|
total_movies: totals.total,
|
||||||
|
avg_rating: totals.avg_rating,
|
||||||
|
favorite_director: fav_director,
|
||||||
|
most_active_month,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn get_user_trends(&self, user_id: &UserId) -> Result<UserTrends, DomainError> {
|
||||||
|
let uid = user_id.value().to_string();
|
||||||
|
|
||||||
|
let (rating_rows, director_rows) = tokio::try_join!(
|
||||||
|
sqlx::query_as::<_, MonthlyRatingRow>(
|
||||||
|
"SELECT to_char(watched_at AT TIME ZONE 'UTC', 'YYYY-MM') AS month,
|
||||||
|
AVG(rating::float) AS avg_rating,
|
||||||
|
COUNT(*) AS count
|
||||||
|
FROM reviews
|
||||||
|
WHERE user_id = $1 AND watched_at >= NOW() - INTERVAL '12 months'
|
||||||
|
GROUP BY to_char(watched_at AT TIME ZONE 'UTC', 'YYYY-MM')
|
||||||
|
ORDER BY to_char(watched_at AT TIME ZONE 'UTC', 'YYYY-MM') ASC"
|
||||||
|
)
|
||||||
|
.bind(&uid)
|
||||||
|
.fetch_all(&self.pool),
|
||||||
|
sqlx::query_as::<_, DirectorCountRow>(
|
||||||
|
"SELECT m.director AS director, COUNT(*) AS count
|
||||||
|
FROM reviews r
|
||||||
|
INNER JOIN movies m ON m.id = r.movie_id
|
||||||
|
WHERE r.user_id = $1 AND m.director IS NOT NULL
|
||||||
|
GROUP BY m.director
|
||||||
|
ORDER BY COUNT(*) DESC
|
||||||
|
LIMIT 5"
|
||||||
|
)
|
||||||
|
.bind(&uid)
|
||||||
|
.fetch_all(&self.pool)
|
||||||
|
)
|
||||||
|
.map_err(Self::map_err)?;
|
||||||
|
|
||||||
|
let max_director_count = director_rows.iter().map(|d| d.count).max().unwrap_or(1);
|
||||||
|
|
||||||
|
let monthly_ratings = rating_rows
|
||||||
|
.into_iter()
|
||||||
|
.map(|r| MonthlyRating {
|
||||||
|
month_label: format_year_month(&r.month),
|
||||||
|
year_month: r.month,
|
||||||
|
avg_rating: r.avg_rating,
|
||||||
|
count: r.count,
|
||||||
|
})
|
||||||
|
.collect();
|
||||||
|
|
||||||
|
let top_directors = director_rows
|
||||||
|
.into_iter()
|
||||||
|
.map(|d| DirectorStat {
|
||||||
|
director: d.director,
|
||||||
|
count: d.count,
|
||||||
|
})
|
||||||
|
.collect();
|
||||||
|
|
||||||
|
Ok(UserTrends {
|
||||||
|
monthly_ratings,
|
||||||
|
top_directors,
|
||||||
|
max_director_count,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
210
crates/adapters/postgres/src/models.rs
Normal file
210
crates/adapters/postgres/src/models.rs
Normal file
@@ -0,0 +1,210 @@
|
|||||||
|
use chrono::NaiveDateTime;
|
||||||
|
use domain::{
|
||||||
|
errors::DomainError,
|
||||||
|
models::{DiaryEntry, FeedEntry, Movie, Review, ReviewSource, UserSummary},
|
||||||
|
value_objects::{
|
||||||
|
Comment, Email, ExternalMetadataId, MovieId, MovieTitle, PosterPath, Rating, ReleaseYear,
|
||||||
|
ReviewId, UserId,
|
||||||
|
},
|
||||||
|
};
|
||||||
|
use uuid::Uuid;
|
||||||
|
|
||||||
|
#[derive(sqlx::FromRow)]
|
||||||
|
pub(crate) struct MovieRow {
|
||||||
|
pub id: String,
|
||||||
|
pub external_metadata_id: Option<String>,
|
||||||
|
pub title: String,
|
||||||
|
pub release_year: i64,
|
||||||
|
pub director: Option<String>,
|
||||||
|
pub poster_path: Option<String>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl MovieRow {
|
||||||
|
pub fn to_domain(self) -> Result<Movie, DomainError> {
|
||||||
|
let id = MovieId::from_uuid(parse_uuid(&self.id)?);
|
||||||
|
let external_metadata_id = self
|
||||||
|
.external_metadata_id
|
||||||
|
.map(ExternalMetadataId::new)
|
||||||
|
.transpose()?;
|
||||||
|
let title = MovieTitle::new(self.title)?;
|
||||||
|
let release_year = ReleaseYear::new(self.release_year as u16)?;
|
||||||
|
let poster_path = self.poster_path.map(PosterPath::new).transpose()?;
|
||||||
|
Ok(Movie::from_persistence(
|
||||||
|
id,
|
||||||
|
external_metadata_id,
|
||||||
|
title,
|
||||||
|
release_year,
|
||||||
|
self.director,
|
||||||
|
poster_path,
|
||||||
|
))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(sqlx::FromRow)]
|
||||||
|
pub(crate) struct ReviewRow {
|
||||||
|
pub id: String,
|
||||||
|
pub movie_id: String,
|
||||||
|
pub user_id: String,
|
||||||
|
pub rating: i64,
|
||||||
|
pub comment: Option<String>,
|
||||||
|
pub watched_at: String,
|
||||||
|
pub created_at: String,
|
||||||
|
pub remote_actor_url: Option<String>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl ReviewRow {
|
||||||
|
pub fn to_domain(self) -> Result<Review, DomainError> {
|
||||||
|
let id = ReviewId::from_uuid(parse_uuid(&self.id)?);
|
||||||
|
let movie_id = MovieId::from_uuid(parse_uuid(&self.movie_id)?);
|
||||||
|
let user_id = UserId::from_uuid(parse_uuid(&self.user_id)?);
|
||||||
|
let rating = Rating::new(self.rating as u8)?;
|
||||||
|
let comment = self.comment.map(Comment::new).transpose()?;
|
||||||
|
let watched_at = parse_datetime(&self.watched_at)?;
|
||||||
|
let created_at = parse_datetime(&self.created_at)?;
|
||||||
|
let source = match self.remote_actor_url {
|
||||||
|
None => ReviewSource::Local,
|
||||||
|
Some(url) => ReviewSource::Remote { actor_url: url },
|
||||||
|
};
|
||||||
|
Ok(Review::from_persistence(
|
||||||
|
id, movie_id, user_id, rating, comment, watched_at, created_at, source,
|
||||||
|
))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(sqlx::FromRow)]
|
||||||
|
pub(crate) struct DiaryRow {
|
||||||
|
pub id: String,
|
||||||
|
pub external_metadata_id: Option<String>,
|
||||||
|
pub title: String,
|
||||||
|
pub release_year: i64,
|
||||||
|
pub director: Option<String>,
|
||||||
|
pub poster_path: Option<String>,
|
||||||
|
pub review_id: String,
|
||||||
|
pub movie_id: String,
|
||||||
|
pub user_id: String,
|
||||||
|
pub rating: i64,
|
||||||
|
pub comment: Option<String>,
|
||||||
|
pub watched_at: String,
|
||||||
|
pub created_at: String,
|
||||||
|
pub remote_actor_url: Option<String>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl DiaryRow {
|
||||||
|
pub fn to_domain(self) -> Result<DiaryEntry, DomainError> {
|
||||||
|
let movie = MovieRow {
|
||||||
|
id: self.id,
|
||||||
|
external_metadata_id: self.external_metadata_id,
|
||||||
|
title: self.title,
|
||||||
|
release_year: self.release_year,
|
||||||
|
director: self.director,
|
||||||
|
poster_path: self.poster_path,
|
||||||
|
}
|
||||||
|
.to_domain()?;
|
||||||
|
let review = ReviewRow {
|
||||||
|
id: self.review_id,
|
||||||
|
movie_id: self.movie_id,
|
||||||
|
user_id: self.user_id,
|
||||||
|
rating: self.rating,
|
||||||
|
comment: self.comment,
|
||||||
|
watched_at: self.watched_at,
|
||||||
|
created_at: self.created_at,
|
||||||
|
remote_actor_url: self.remote_actor_url,
|
||||||
|
}
|
||||||
|
.to_domain()?;
|
||||||
|
Ok(DiaryEntry::new(movie, review))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(sqlx::FromRow)]
|
||||||
|
pub(crate) struct FeedRow {
|
||||||
|
pub id: String,
|
||||||
|
pub external_metadata_id: Option<String>,
|
||||||
|
pub title: String,
|
||||||
|
pub release_year: i64,
|
||||||
|
pub director: Option<String>,
|
||||||
|
pub poster_path: Option<String>,
|
||||||
|
pub review_id: String,
|
||||||
|
pub movie_id: String,
|
||||||
|
pub user_id: String,
|
||||||
|
pub rating: i64,
|
||||||
|
pub comment: Option<String>,
|
||||||
|
pub watched_at: String,
|
||||||
|
pub created_at: String,
|
||||||
|
pub remote_actor_url: Option<String>,
|
||||||
|
pub user_email: String,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl FeedRow {
|
||||||
|
pub fn to_domain(self) -> Result<FeedEntry, DomainError> {
|
||||||
|
let diary = DiaryRow {
|
||||||
|
id: self.id,
|
||||||
|
external_metadata_id: self.external_metadata_id,
|
||||||
|
title: self.title,
|
||||||
|
release_year: self.release_year,
|
||||||
|
director: self.director,
|
||||||
|
poster_path: self.poster_path,
|
||||||
|
review_id: self.review_id,
|
||||||
|
movie_id: self.movie_id,
|
||||||
|
user_id: self.user_id,
|
||||||
|
rating: self.rating,
|
||||||
|
comment: self.comment,
|
||||||
|
watched_at: self.watched_at,
|
||||||
|
created_at: self.created_at,
|
||||||
|
remote_actor_url: self.remote_actor_url,
|
||||||
|
}
|
||||||
|
.to_domain()?;
|
||||||
|
Ok(FeedEntry::new(diary, self.user_email))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(sqlx::FromRow)]
|
||||||
|
pub(crate) struct UserSummaryRow {
|
||||||
|
pub id: String,
|
||||||
|
pub email: String,
|
||||||
|
pub total_movies: i64,
|
||||||
|
pub avg_rating: Option<f64>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl UserSummaryRow {
|
||||||
|
pub fn to_domain(self) -> Result<UserSummary, DomainError> {
|
||||||
|
Ok(UserSummary::new(
|
||||||
|
UserId::from_uuid(parse_uuid(&self.id)?),
|
||||||
|
Email::new(self.email)?,
|
||||||
|
self.total_movies,
|
||||||
|
self.avg_rating,
|
||||||
|
))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(sqlx::FromRow)]
|
||||||
|
pub(crate) struct UserTotalsRow {
|
||||||
|
pub total: i64,
|
||||||
|
pub avg_rating: Option<f64>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(sqlx::FromRow)]
|
||||||
|
pub(crate) struct DirectorCountRow {
|
||||||
|
pub director: String,
|
||||||
|
pub count: i64,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(sqlx::FromRow)]
|
||||||
|
pub(crate) struct MonthlyRatingRow {
|
||||||
|
pub month: String,
|
||||||
|
pub avg_rating: f64,
|
||||||
|
pub count: i64,
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn parse_uuid(s: &str) -> Result<Uuid, DomainError> {
|
||||||
|
Uuid::parse_str(s)
|
||||||
|
.map_err(|e| DomainError::InfrastructureError(format!("Invalid UUID '{}': {}", s, e)))
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn datetime_to_str(dt: &NaiveDateTime) -> String {
|
||||||
|
dt.format("%Y-%m-%d %H:%M:%S").to_string()
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn parse_datetime(s: &str) -> Result<NaiveDateTime, DomainError> {
|
||||||
|
NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S")
|
||||||
|
.map_err(|e| DomainError::InfrastructureError(format!("Invalid datetime '{}': {}", s, e)))
|
||||||
|
}
|
||||||
204
crates/adapters/postgres/src/users.rs
Normal file
204
crates/adapters/postgres/src/users.rs
Normal file
@@ -0,0 +1,204 @@
|
|||||||
|
use async_trait::async_trait;
|
||||||
|
use chrono::Utc;
|
||||||
|
use sqlx::PgPool;
|
||||||
|
|
||||||
|
use domain::{
|
||||||
|
errors::DomainError,
|
||||||
|
models::{User, UserRole},
|
||||||
|
ports::UserRepository,
|
||||||
|
value_objects::{Email, PasswordHash, UserId, Username},
|
||||||
|
};
|
||||||
|
|
||||||
|
use super::models::UserSummaryRow;
|
||||||
|
|
||||||
|
pub struct PostgresUserRepository {
|
||||||
|
pool: PgPool,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl PostgresUserRepository {
|
||||||
|
pub fn new(pool: PgPool) -> Self {
|
||||||
|
Self { pool }
|
||||||
|
}
|
||||||
|
|
||||||
|
fn map_err(e: sqlx::Error) -> DomainError {
|
||||||
|
tracing::error!("Database error: {:?}", e);
|
||||||
|
DomainError::InfrastructureError("Database operation failed".into())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn parse_role(s: &str) -> UserRole {
|
||||||
|
match s {
|
||||||
|
"admin" => UserRole::Admin,
|
||||||
|
_ => UserRole::Standard,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn row_to_user(
|
||||||
|
id_str: String,
|
||||||
|
email_str: String,
|
||||||
|
username_str: String,
|
||||||
|
hash_str: String,
|
||||||
|
role: UserRole,
|
||||||
|
) -> Result<User, DomainError> {
|
||||||
|
let id = uuid::Uuid::parse_str(&id_str)
|
||||||
|
.map_err(|e| DomainError::InfrastructureError(e.to_string()))?;
|
||||||
|
let email = Email::new(email_str)
|
||||||
|
.map_err(|e| DomainError::InfrastructureError(e.to_string()))?;
|
||||||
|
let username = Username::new(username_str)
|
||||||
|
.map_err(|e| DomainError::InfrastructureError(e.to_string()))?;
|
||||||
|
let hash = PasswordHash::new(hash_str)
|
||||||
|
.map_err(|e| DomainError::InfrastructureError(e.to_string()))?;
|
||||||
|
Ok(User::from_persistence(
|
||||||
|
UserId::from_uuid(id),
|
||||||
|
email,
|
||||||
|
username,
|
||||||
|
hash,
|
||||||
|
role,
|
||||||
|
))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[async_trait]
|
||||||
|
impl UserRepository for PostgresUserRepository {
|
||||||
|
async fn find_by_email(&self, email: &Email) -> Result<Option<User>, DomainError> {
|
||||||
|
let email_str = email.value();
|
||||||
|
#[derive(sqlx::FromRow)]
|
||||||
|
struct Row {
|
||||||
|
id: String,
|
||||||
|
email: String,
|
||||||
|
username: String,
|
||||||
|
password_hash: String,
|
||||||
|
role: String,
|
||||||
|
}
|
||||||
|
let row = sqlx::query_as::<_, Row>(
|
||||||
|
"SELECT id, email, username, password_hash, role FROM users WHERE email = $1",
|
||||||
|
)
|
||||||
|
.bind(email_str)
|
||||||
|
.fetch_optional(&self.pool)
|
||||||
|
.await
|
||||||
|
.map_err(Self::map_err)?;
|
||||||
|
row.map(|r| {
|
||||||
|
Self::row_to_user(
|
||||||
|
r.id,
|
||||||
|
r.email,
|
||||||
|
r.username,
|
||||||
|
r.password_hash,
|
||||||
|
Self::parse_role(&r.role),
|
||||||
|
)
|
||||||
|
})
|
||||||
|
.transpose()
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn find_by_username(&self, username: &Username) -> Result<Option<User>, DomainError> {
|
||||||
|
let username_str = username.value();
|
||||||
|
#[derive(sqlx::FromRow)]
|
||||||
|
struct Row {
|
||||||
|
id: String,
|
||||||
|
email: String,
|
||||||
|
username: String,
|
||||||
|
password_hash: String,
|
||||||
|
role: String,
|
||||||
|
}
|
||||||
|
let row = sqlx::query_as::<_, Row>(
|
||||||
|
"SELECT id, email, username, password_hash, role FROM users WHERE username = $1",
|
||||||
|
)
|
||||||
|
.bind(username_str)
|
||||||
|
.fetch_optional(&self.pool)
|
||||||
|
.await
|
||||||
|
.map_err(Self::map_err)?;
|
||||||
|
row.map(|r| {
|
||||||
|
Self::row_to_user(
|
||||||
|
r.id,
|
||||||
|
r.email,
|
||||||
|
r.username,
|
||||||
|
r.password_hash,
|
||||||
|
Self::parse_role(&r.role),
|
||||||
|
)
|
||||||
|
})
|
||||||
|
.transpose()
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn save(&self, user: &User) -> Result<(), DomainError> {
|
||||||
|
if self.find_by_email(user.email()).await?.is_some() {
|
||||||
|
return Err(DomainError::ValidationError(
|
||||||
|
"Email already registered".into(),
|
||||||
|
));
|
||||||
|
}
|
||||||
|
if self.find_by_username(user.username()).await?.is_some() {
|
||||||
|
return Err(DomainError::ValidationError(
|
||||||
|
"Username already taken".into(),
|
||||||
|
));
|
||||||
|
}
|
||||||
|
let id = user.id().value().to_string();
|
||||||
|
let email = user.email().value();
|
||||||
|
let username = user.username().value();
|
||||||
|
let hash = user.password_hash().value();
|
||||||
|
let created_at = Utc::now()
|
||||||
|
.naive_utc()
|
||||||
|
.format("%Y-%m-%d %H:%M:%S")
|
||||||
|
.to_string();
|
||||||
|
let role = match user.role() {
|
||||||
|
UserRole::Admin => "admin",
|
||||||
|
UserRole::Standard => "standard",
|
||||||
|
};
|
||||||
|
sqlx::query(
|
||||||
|
"INSERT INTO users (id, email, username, password_hash, created_at, role) VALUES ($1, $2, $3, $4, $5::timestamptz, $6)",
|
||||||
|
)
|
||||||
|
.bind(&id)
|
||||||
|
.bind(email)
|
||||||
|
.bind(username)
|
||||||
|
.bind(hash)
|
||||||
|
.bind(&created_at)
|
||||||
|
.bind(role)
|
||||||
|
.execute(&self.pool)
|
||||||
|
.await
|
||||||
|
.map_err(Self::map_err)?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn find_by_id(&self, id: &UserId) -> Result<Option<User>, DomainError> {
|
||||||
|
let id_str = id.value().to_string();
|
||||||
|
#[derive(sqlx::FromRow)]
|
||||||
|
struct Row {
|
||||||
|
id: String,
|
||||||
|
email: String,
|
||||||
|
username: String,
|
||||||
|
password_hash: String,
|
||||||
|
role: String,
|
||||||
|
}
|
||||||
|
let row = sqlx::query_as::<_, Row>(
|
||||||
|
"SELECT id, email, username, password_hash, role FROM users WHERE id = $1",
|
||||||
|
)
|
||||||
|
.bind(&id_str)
|
||||||
|
.fetch_optional(&self.pool)
|
||||||
|
.await
|
||||||
|
.map_err(Self::map_err)?;
|
||||||
|
row.map(|r| {
|
||||||
|
Self::row_to_user(
|
||||||
|
r.id,
|
||||||
|
r.email,
|
||||||
|
r.username,
|
||||||
|
r.password_hash,
|
||||||
|
Self::parse_role(&r.role),
|
||||||
|
)
|
||||||
|
})
|
||||||
|
.transpose()
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn list_with_stats(&self) -> Result<Vec<domain::models::UserSummary>, DomainError> {
|
||||||
|
sqlx::query_as::<_, UserSummaryRow>(
|
||||||
|
r#"SELECT u.id, u.email,
|
||||||
|
COUNT(DISTINCT r.movie_id) AS total_movies,
|
||||||
|
AVG(r.rating::float) AS avg_rating
|
||||||
|
FROM users u
|
||||||
|
LEFT JOIN reviews r ON r.user_id = u.id AND r.remote_actor_url IS NULL
|
||||||
|
GROUP BY u.id, u.email
|
||||||
|
ORDER BY u.email ASC"#,
|
||||||
|
)
|
||||||
|
.fetch_all(&self.pool)
|
||||||
|
.await
|
||||||
|
.map_err(Self::map_err)?
|
||||||
|
.into_iter()
|
||||||
|
.map(UserSummaryRow::to_domain)
|
||||||
|
.collect()
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -1,5 +1,3 @@
|
|||||||
use std::default;
|
|
||||||
|
|
||||||
use chrono::{NaiveDateTime, Utc};
|
use chrono::{NaiveDateTime, Utc};
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
|
|||||||
@@ -30,6 +30,8 @@ poster-fetcher = { workspace = true }
|
|||||||
poster-storage = { workspace = true }
|
poster-storage = { workspace = true }
|
||||||
sqlite = { workspace = true }
|
sqlite = { workspace = true }
|
||||||
sqlite-federation = { workspace = true }
|
sqlite-federation = { workspace = true }
|
||||||
|
postgres = { workspace = true }
|
||||||
|
postgres-federation = { workspace = true }
|
||||||
activitypub = { workspace = true }
|
activitypub = { workspace = true }
|
||||||
sqlx = { workspace = true }
|
sqlx = { workspace = true }
|
||||||
template-askama = { workspace = true }
|
template-askama = { workspace = true }
|
||||||
|
|||||||
@@ -14,6 +14,7 @@ use activitypub::{
|
|||||||
ActivityPubEventHandler, ActivityPubPort, ActivityPubService, DomainUserRepoAdapter,
|
ActivityPubEventHandler, ActivityPubPort, ActivityPubService, DomainUserRepoAdapter,
|
||||||
ReviewObjectHandler,
|
ReviewObjectHandler,
|
||||||
};
|
};
|
||||||
|
use activitypub::FederationRepository;
|
||||||
use application::{config::AppConfig, context::AppContext};
|
use application::{config::AppConfig, context::AppContext};
|
||||||
use auth::{Argon2PasswordHasher, AuthConfig, JwtAuthService};
|
use auth::{Argon2PasswordHasher, AuthConfig, JwtAuthService};
|
||||||
use export::ExportAdapter;
|
use export::ExportAdapter;
|
||||||
@@ -23,12 +24,20 @@ use poster_storage::{PosterStorageAdapter, StorageConfig};
|
|||||||
use rss::RssAdapter;
|
use rss::RssAdapter;
|
||||||
use sqlite::{SqliteMovieRepository, SqliteUserRepository};
|
use sqlite::{SqliteMovieRepository, SqliteUserRepository};
|
||||||
use sqlite_federation::SqliteFederationRepository;
|
use sqlite_federation::SqliteFederationRepository;
|
||||||
|
use postgres::{PostgresRepository, PostgresUserRepository};
|
||||||
|
use postgres_federation::PostgresFederationRepository;
|
||||||
use template_askama::AskamaHtmlRenderer;
|
use template_askama::AskamaHtmlRenderer;
|
||||||
|
|
||||||
use doc::ApiDocExt;
|
use doc::ApiDocExt;
|
||||||
use presentation::{openapi::ApiDoc, routes, state::AppState};
|
use presentation::{openapi::ApiDoc, routes, state::AppState};
|
||||||
use utoipa::OpenApi as _;
|
use utoipa::OpenApi as _;
|
||||||
|
|
||||||
|
use domain::ports::{
|
||||||
|
AuthService, DiaryExporter, DiaryRepository, MetadataClient, MovieRepository,
|
||||||
|
PasswordHasher, PosterFetcherClient, PosterStorage, ReviewRepository, StatsRepository,
|
||||||
|
UserRepository,
|
||||||
|
};
|
||||||
|
|
||||||
#[tokio::main]
|
#[tokio::main]
|
||||||
async fn main() -> anyhow::Result<()> {
|
async fn main() -> anyhow::Result<()> {
|
||||||
dotenvy::dotenv().ok();
|
dotenvy::dotenv().ok();
|
||||||
@@ -57,34 +66,8 @@ async fn wire_dependencies() -> anyhow::Result<(AppState, axum::Router)> {
|
|||||||
let omdb_api_key = std::env::var("OMDB_API_KEY").context("OMDB_API_KEY must be set")?;
|
let omdb_api_key = std::env::var("OMDB_API_KEY").context("OMDB_API_KEY must be set")?;
|
||||||
|
|
||||||
let database_url = std::env::var("DATABASE_URL").context("DATABASE_URL must be set")?;
|
let database_url = std::env::var("DATABASE_URL").context("DATABASE_URL must be set")?;
|
||||||
let opts = SqliteConnectOptions::from_str(&database_url)
|
let backend = std::env::var("DATABASE_BACKEND").unwrap_or_else(|_| "sqlite".to_string());
|
||||||
.context("Invalid DATABASE_URL")?
|
|
||||||
.create_if_missing(true)
|
|
||||||
.journal_mode(sqlx::sqlite::SqliteJournalMode::Wal)
|
|
||||||
.busy_timeout(std::time::Duration::from_secs(5));
|
|
||||||
let pool = SqlitePool::connect_with(opts)
|
|
||||||
.await
|
|
||||||
.context("Failed to connect to SQLite database")?;
|
|
||||||
|
|
||||||
let sqlite_repo = Arc::new(SqliteMovieRepository::new(pool.clone()));
|
|
||||||
sqlite_repo
|
|
||||||
.migrate()
|
|
||||||
.await
|
|
||||||
.map_err(|e| anyhow::anyhow!("{}", e))
|
|
||||||
.context("Database migration failed")?;
|
|
||||||
|
|
||||||
use domain::ports::{
|
|
||||||
AuthService, DiaryExporter, DiaryRepository, MetadataClient, MovieRepository,
|
|
||||||
PasswordHasher, PosterFetcherClient, PosterStorage, ReviewRepository, StatsRepository,
|
|
||||||
UserRepository,
|
|
||||||
};
|
|
||||||
let movie_repository: Arc<dyn MovieRepository> = Arc::clone(&sqlite_repo) as _;
|
|
||||||
let review_repository: Arc<dyn ReviewRepository> = Arc::clone(&sqlite_repo) as _;
|
|
||||||
let diary_repository: Arc<dyn DiaryRepository> = Arc::clone(&sqlite_repo) as _;
|
|
||||||
let stats_repository: Arc<dyn StatsRepository> = Arc::clone(&sqlite_repo) as _;
|
|
||||||
|
|
||||||
let user_repository: Arc<dyn UserRepository> =
|
|
||||||
Arc::new(SqliteUserRepository::new(pool.clone()));
|
|
||||||
let metadata_client: Arc<dyn MetadataClient> =
|
let metadata_client: Arc<dyn MetadataClient> =
|
||||||
Arc::new(MetadataClientImpl::new_omdb(omdb_api_key));
|
Arc::new(MetadataClientImpl::new_omdb(omdb_api_key));
|
||||||
let poster_fetcher: Arc<dyn PosterFetcherClient> =
|
let poster_fetcher: Arc<dyn PosterFetcherClient> =
|
||||||
@@ -94,8 +77,14 @@ async fn wire_dependencies() -> anyhow::Result<(AppState, axum::Router)> {
|
|||||||
let auth_service: Arc<dyn AuthService> = Arc::new(JwtAuthService::new(auth_config));
|
let auth_service: Arc<dyn AuthService> = Arc::new(JwtAuthService::new(auth_config));
|
||||||
let password_hasher: Arc<dyn PasswordHasher> = Arc::new(Argon2PasswordHasher);
|
let password_hasher: Arc<dyn PasswordHasher> = Arc::new(Argon2PasswordHasher);
|
||||||
|
|
||||||
// Build a context for the poster handler. sync_poster doesn't publish events,
|
let (movie_repository, review_repository, diary_repository, stats_repository,
|
||||||
// so a noop publisher here is safe and avoids a circular dependency.
|
user_repository, federation_repo_dyn, review_store, social_query) =
|
||||||
|
if backend == "postgres" {
|
||||||
|
wire_postgres(&database_url).await?
|
||||||
|
} else {
|
||||||
|
wire_sqlite(&database_url).await?
|
||||||
|
};
|
||||||
|
|
||||||
let handler_ctx = AppContext {
|
let handler_ctx = AppContext {
|
||||||
movie_repository: Arc::clone(&movie_repository),
|
movie_repository: Arc::clone(&movie_repository),
|
||||||
review_repository: Arc::clone(&review_repository),
|
review_repository: Arc::clone(&review_repository),
|
||||||
@@ -112,19 +101,16 @@ async fn wire_dependencies() -> anyhow::Result<(AppState, axum::Router)> {
|
|||||||
config: app_config.clone(),
|
config: app_config.clone(),
|
||||||
};
|
};
|
||||||
|
|
||||||
// Federation
|
|
||||||
let federation_repo = Arc::new(SqliteFederationRepository::new(pool));
|
|
||||||
let social_query: Arc<dyn domain::ports::SocialQueryPort> = Arc::clone(&federation_repo) as _;
|
|
||||||
let user_repo_adapter = Arc::new(DomainUserRepoAdapter(Arc::clone(&user_repository)));
|
let user_repo_adapter = Arc::new(DomainUserRepoAdapter(Arc::clone(&user_repository)));
|
||||||
let review_handler = Arc::new(ReviewObjectHandler {
|
let review_handler = Arc::new(ReviewObjectHandler {
|
||||||
movie_repository: Arc::clone(&movie_repository),
|
movie_repository: Arc::clone(&movie_repository),
|
||||||
diary_repository: Arc::clone(&diary_repository),
|
diary_repository: Arc::clone(&diary_repository),
|
||||||
review_store: Arc::clone(&federation_repo) as Arc<dyn activitypub::RemoteReviewRepository>,
|
review_store,
|
||||||
base_url: app_config.base_url.clone(),
|
base_url: app_config.base_url.clone(),
|
||||||
});
|
});
|
||||||
let concrete_ap_service = Arc::new(
|
let concrete_ap_service = Arc::new(
|
||||||
ActivityPubService::new(
|
ActivityPubService::new(
|
||||||
federation_repo,
|
federation_repo_dyn,
|
||||||
user_repo_adapter,
|
user_repo_adapter,
|
||||||
review_handler,
|
review_handler,
|
||||||
app_config.base_url.clone(),
|
app_config.base_url.clone(),
|
||||||
@@ -176,6 +162,78 @@ async fn wire_dependencies() -> anyhow::Result<(AppState, axum::Router)> {
|
|||||||
Ok((state, ap_router))
|
Ok((state, ap_router))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type WireResult = anyhow::Result<(
|
||||||
|
Arc<dyn MovieRepository>,
|
||||||
|
Arc<dyn ReviewRepository>,
|
||||||
|
Arc<dyn DiaryRepository>,
|
||||||
|
Arc<dyn StatsRepository>,
|
||||||
|
Arc<dyn UserRepository>,
|
||||||
|
Arc<dyn FederationRepository>,
|
||||||
|
Arc<dyn activitypub::RemoteReviewRepository>,
|
||||||
|
Arc<dyn domain::ports::SocialQueryPort>,
|
||||||
|
)>;
|
||||||
|
|
||||||
|
async fn wire_sqlite(database_url: &str) -> WireResult {
|
||||||
|
let opts = SqliteConnectOptions::from_str(database_url)
|
||||||
|
.context("Invalid DATABASE_URL")?
|
||||||
|
.create_if_missing(true)
|
||||||
|
.journal_mode(sqlx::sqlite::SqliteJournalMode::Wal)
|
||||||
|
.busy_timeout(std::time::Duration::from_secs(5));
|
||||||
|
let pool = SqlitePool::connect_with(opts)
|
||||||
|
.await
|
||||||
|
.context("Failed to connect to SQLite database")?;
|
||||||
|
|
||||||
|
let sqlite_repo = Arc::new(SqliteMovieRepository::new(pool.clone()));
|
||||||
|
sqlite_repo
|
||||||
|
.migrate()
|
||||||
|
.await
|
||||||
|
.map_err(|e| anyhow::anyhow!("{}", e))
|
||||||
|
.context("Database migration failed")?;
|
||||||
|
|
||||||
|
let movie_repository: Arc<dyn MovieRepository> = Arc::clone(&sqlite_repo) as _;
|
||||||
|
let review_repository: Arc<dyn ReviewRepository> = Arc::clone(&sqlite_repo) as _;
|
||||||
|
let diary_repository: Arc<dyn DiaryRepository> = Arc::clone(&sqlite_repo) as _;
|
||||||
|
let stats_repository: Arc<dyn StatsRepository> = Arc::clone(&sqlite_repo) as _;
|
||||||
|
let user_repository: Arc<dyn UserRepository> =
|
||||||
|
Arc::new(SqliteUserRepository::new(pool.clone()));
|
||||||
|
|
||||||
|
let fed = Arc::new(SqliteFederationRepository::new(pool));
|
||||||
|
let federation_repo_dyn: Arc<dyn FederationRepository> = Arc::clone(&fed) as _;
|
||||||
|
let review_store: Arc<dyn activitypub::RemoteReviewRepository> = Arc::clone(&fed) as _;
|
||||||
|
let social_query: Arc<dyn domain::ports::SocialQueryPort> = fed;
|
||||||
|
|
||||||
|
Ok((movie_repository, review_repository, diary_repository, stats_repository,
|
||||||
|
user_repository, federation_repo_dyn, review_store, social_query))
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn wire_postgres(database_url: &str) -> WireResult {
|
||||||
|
let pool = sqlx::PgPool::connect(database_url)
|
||||||
|
.await
|
||||||
|
.context("Failed to connect to PostgreSQL database")?;
|
||||||
|
|
||||||
|
let pg_repo = Arc::new(PostgresRepository::new(pool.clone()));
|
||||||
|
pg_repo
|
||||||
|
.migrate()
|
||||||
|
.await
|
||||||
|
.map_err(|e| anyhow::anyhow!("{}", e))
|
||||||
|
.context("Database migration failed")?;
|
||||||
|
|
||||||
|
let movie_repository: Arc<dyn MovieRepository> = Arc::clone(&pg_repo) as _;
|
||||||
|
let review_repository: Arc<dyn ReviewRepository> = Arc::clone(&pg_repo) as _;
|
||||||
|
let diary_repository: Arc<dyn DiaryRepository> = Arc::clone(&pg_repo) as _;
|
||||||
|
let stats_repository: Arc<dyn StatsRepository> = Arc::clone(&pg_repo) as _;
|
||||||
|
let user_repository: Arc<dyn UserRepository> =
|
||||||
|
Arc::new(PostgresUserRepository::new(pool.clone()));
|
||||||
|
|
||||||
|
let fed = Arc::new(PostgresFederationRepository::new(pool));
|
||||||
|
let federation_repo_dyn: Arc<dyn FederationRepository> = Arc::clone(&fed) as _;
|
||||||
|
let review_store: Arc<dyn activitypub::RemoteReviewRepository> = Arc::clone(&fed) as _;
|
||||||
|
let social_query: Arc<dyn domain::ports::SocialQueryPort> = fed;
|
||||||
|
|
||||||
|
Ok((movie_repository, review_repository, diary_repository, stats_repository,
|
||||||
|
user_repository, federation_repo_dyn, review_store, social_query))
|
||||||
|
}
|
||||||
|
|
||||||
fn init_tracing() {
|
fn init_tracing() {
|
||||||
tracing_subscriber::registry()
|
tracing_subscriber::registry()
|
||||||
.with(tracing_subscriber::EnvFilter::new(
|
.with(tracing_subscriber::EnvFilter::new(
|
||||||
|
|||||||
Reference in New Issue
Block a user