diff --git a/Cargo.lock b/Cargo.lock index 45d9bb0..c4a600f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -12,7 +12,6 @@ dependencies = [ "async-trait", "chrono", "domain", - "event-publisher", "serde", "serde_json", "tokio", @@ -3066,6 +3065,7 @@ dependencies = [ "serde", "serde_json", "sqlite", + "sqlite-federation", "sqlx", "template-askama", "thiserror 2.0.18", @@ -4019,6 +4019,20 @@ dependencies = [ [[package]] name = "sqlite" version = "0.1.0" +dependencies = [ + "anyhow", + "async-trait", + "chrono", + "domain", + "sqlx", + "tokio", + "tracing", + "uuid", +] + +[[package]] +name = "sqlite-federation" +version = "0.1.0" dependencies = [ "activitypub", "activitypub-base", @@ -4027,7 +4041,6 @@ dependencies = [ "chrono", "domain", "sqlx", - "tokio", "tracing", "uuid", ] diff --git a/Cargo.toml b/Cargo.toml index 8f731d2..4e64bcd 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,6 +7,7 @@ members = [ "crates/adapters/poster-storage", "crates/adapters/rss", "crates/adapters/sqlite", + "crates/adapters/sqlite-federation", "crates/adapters/template-askama", "crates/adapters/activitypub", "crates/adapters/activitypub-base", @@ -48,6 +49,7 @@ poster-storage = { path = "crates/adapters/poster-storage" } event-publisher = { path = "crates/adapters/event-publisher" } rss = { path = "crates/adapters/rss" } sqlite = { path = "crates/adapters/sqlite" } +sqlite-federation = { path = "crates/adapters/sqlite-federation" } template-askama = { path = "crates/adapters/template-askama" } activitypub = { path = "crates/adapters/activitypub" } activitypub-base = { path = "crates/adapters/activitypub-base" } diff --git a/Dockerfile b/Dockerfile index e1dd419..4fa6a6a 100644 --- a/Dockerfile +++ b/Dockerfile @@ -16,6 +16,7 @@ COPY crates/adapters/poster-fetcher/Cargo.toml crates/adapters/poster-fetcher COPY crates/adapters/poster-storage/Cargo.toml crates/adapters/poster-storage/Cargo.toml COPY crates/adapters/rss/Cargo.toml crates/adapters/rss/Cargo.toml COPY crates/adapters/sqlite/Cargo.toml crates/adapters/sqlite/Cargo.toml +COPY crates/adapters/sqlite-federation/Cargo.toml crates/adapters/sqlite-federation/Cargo.toml COPY crates/adapters/template-askama/Cargo.toml crates/adapters/template-askama/Cargo.toml COPY crates/application/Cargo.toml crates/application/Cargo.toml COPY crates/domain/Cargo.toml crates/domain/Cargo.toml diff --git a/crates/adapters/activitypub/Cargo.toml b/crates/adapters/activitypub/Cargo.toml index b6abab3..5620fd5 100644 --- a/crates/adapters/activitypub/Cargo.toml +++ b/crates/adapters/activitypub/Cargo.toml @@ -14,7 +14,6 @@ chrono = { workspace = true } anyhow = { workspace = true } tracing = { workspace = true } async-trait = { workspace = true } -event-publisher = { workspace = true } activitypub_federation = "0.7.0-beta.11" url = { version = "2", features = ["serde"] } diff --git a/crates/adapters/activitypub/src/event_handler.rs b/crates/adapters/activitypub/src/event_handler.rs index 50a6591..0c18225 100644 --- a/crates/adapters/activitypub/src/event_handler.rs +++ b/crates/adapters/activitypub/src/event_handler.rs @@ -5,7 +5,7 @@ use domain::{ ports::MovieRepository, value_objects::{ReviewId, UserId}, }; -use event_publisher::EventHandler; +use domain::ports::EventHandler; use std::sync::Arc; use activitypub_base::ActivityPubService; diff --git a/crates/adapters/activitypub/src/lib.rs b/crates/adapters/activitypub/src/lib.rs index a276992..c33a413 100644 --- a/crates/adapters/activitypub/src/lib.rs +++ b/crates/adapters/activitypub/src/lib.rs @@ -1,5 +1,6 @@ pub mod event_handler; pub mod objects; +pub mod port; pub mod remote_review_repository; pub mod review_handler; pub mod user_adapter; @@ -12,6 +13,7 @@ pub use activitypub_base::{ }; pub use event_handler::ActivityPubEventHandler; +pub use port::{ActivityPubPort, NoopActivityPubService}; pub use remote_review_repository::RemoteReviewRepository; pub use review_handler::ReviewObjectHandler; pub use user_adapter::DomainUserRepoAdapter; diff --git a/crates/adapters/activitypub/src/port.rs b/crates/adapters/activitypub/src/port.rs new file mode 100644 index 0000000..7e9484d --- /dev/null +++ b/crates/adapters/activitypub/src/port.rs @@ -0,0 +1,73 @@ +use async_trait::async_trait; +use uuid::Uuid; + +use activitypub_base::{ActivityPubService, RemoteActor}; + +#[async_trait] +pub trait ActivityPubPort: Send + Sync { + async fn actor_json(&self, user_id: &str) -> anyhow::Result; + async fn count_following(&self, local_user_id: Uuid) -> anyhow::Result; + async fn count_accepted_followers(&self, local_user_id: Uuid) -> anyhow::Result; + async fn get_pending_followers(&self, local_user_id: Uuid) -> anyhow::Result>; + async fn follow(&self, local_user_id: Uuid, handle: &str) -> anyhow::Result<()>; + async fn unfollow(&self, local_user_id: Uuid, actor_url: &str) -> anyhow::Result<()>; + async fn accept_follower(&self, local_user_id: Uuid, remote_actor_url: &str) -> anyhow::Result<()>; + async fn reject_follower(&self, local_user_id: Uuid, remote_actor_url: &str) -> anyhow::Result<()>; + async fn get_following(&self, local_user_id: Uuid) -> anyhow::Result>; + async fn get_accepted_followers(&self, local_user_id: Uuid) -> anyhow::Result>; + async fn remove_follower(&self, local_user_id: Uuid, actor_url: &str) -> anyhow::Result<()>; +} + +#[async_trait] +impl ActivityPubPort for ActivityPubService { + async fn actor_json(&self, user_id: &str) -> anyhow::Result { + self.actor_json(user_id).await + } + async fn count_following(&self, local_user_id: Uuid) -> anyhow::Result { + self.count_following(local_user_id).await + } + async fn count_accepted_followers(&self, local_user_id: Uuid) -> anyhow::Result { + self.count_accepted_followers(local_user_id).await + } + async fn get_pending_followers(&self, local_user_id: Uuid) -> anyhow::Result> { + self.get_pending_followers(local_user_id).await + } + async fn follow(&self, local_user_id: Uuid, handle: &str) -> anyhow::Result<()> { + self.follow(local_user_id, handle).await + } + async fn unfollow(&self, local_user_id: Uuid, actor_url: &str) -> anyhow::Result<()> { + self.unfollow(local_user_id, actor_url).await + } + async fn accept_follower(&self, local_user_id: Uuid, remote_actor_url: &str) -> anyhow::Result<()> { + self.accept_follower(local_user_id, remote_actor_url).await + } + async fn reject_follower(&self, local_user_id: Uuid, remote_actor_url: &str) -> anyhow::Result<()> { + self.reject_follower(local_user_id, remote_actor_url).await + } + async fn get_following(&self, local_user_id: Uuid) -> anyhow::Result> { + self.get_following(local_user_id).await + } + async fn get_accepted_followers(&self, local_user_id: Uuid) -> anyhow::Result> { + self.get_accepted_followers(local_user_id).await + } + async fn remove_follower(&self, local_user_id: Uuid, actor_url: &str) -> anyhow::Result<()> { + self.remove_follower(local_user_id, actor_url).await + } +} + +pub struct NoopActivityPubService; + +#[async_trait] +impl ActivityPubPort for NoopActivityPubService { + async fn actor_json(&self, _: &str) -> anyhow::Result { Ok(String::new()) } + async fn count_following(&self, _: Uuid) -> anyhow::Result { Ok(0) } + async fn count_accepted_followers(&self, _: Uuid) -> anyhow::Result { Ok(0) } + async fn get_pending_followers(&self, _: Uuid) -> anyhow::Result> { Ok(vec![]) } + async fn follow(&self, _: Uuid, _: &str) -> anyhow::Result<()> { Ok(()) } + async fn unfollow(&self, _: Uuid, _: &str) -> anyhow::Result<()> { Ok(()) } + async fn accept_follower(&self, _: Uuid, _: &str) -> anyhow::Result<()> { Ok(()) } + async fn reject_follower(&self, _: Uuid, _: &str) -> anyhow::Result<()> { Ok(()) } + async fn get_following(&self, _: Uuid) -> anyhow::Result> { Ok(vec![]) } + async fn get_accepted_followers(&self, _: Uuid) -> anyhow::Result> { Ok(vec![]) } + async fn remove_follower(&self, _: Uuid, _: &str) -> anyhow::Result<()> { Ok(()) } +} diff --git a/crates/adapters/event-publisher/src/lib.rs b/crates/adapters/event-publisher/src/lib.rs index 29b61dd..3ed2c2c 100644 --- a/crates/adapters/event-publisher/src/lib.rs +++ b/crates/adapters/event-publisher/src/lib.rs @@ -2,6 +2,8 @@ use async_trait::async_trait; use domain::{errors::DomainError, events::DomainEvent, ports::EventPublisher}; use tokio::sync::mpsc; +pub use domain::ports::EventHandler; + pub struct EventPublisherConfig { pub channel_buffer: usize, } @@ -16,11 +18,6 @@ impl EventPublisherConfig { } } -#[async_trait] -pub trait EventHandler: Send + Sync { - async fn handle(&self, event: &DomainEvent) -> Result<(), DomainError>; -} - pub struct ChannelEventPublisher { sender: mpsc::Sender, } diff --git a/crates/adapters/sqlite-federation/Cargo.toml b/crates/adapters/sqlite-federation/Cargo.toml new file mode 100644 index 0000000..a76f924 --- /dev/null +++ b/crates/adapters/sqlite-federation/Cargo.toml @@ -0,0 +1,15 @@ +[package] +name = "sqlite-federation" +version = "0.1.0" +edition = "2024" + +[dependencies] +sqlx = { workspace = true } +activitypub = { workspace = true } +activitypub-base = { workspace = true } +domain = { workspace = true } +anyhow = { workspace = true } +uuid = { workspace = true } +chrono = { workspace = true } +tracing = { workspace = true } +async-trait = { workspace = true } diff --git a/crates/adapters/sqlite-federation/src/lib.rs b/crates/adapters/sqlite-federation/src/lib.rs new file mode 100644 index 0000000..a5d0f49 --- /dev/null +++ b/crates/adapters/sqlite-federation/src/lib.rs @@ -0,0 +1,459 @@ +use anyhow::{anyhow, Result}; +use async_trait::async_trait; +use chrono::{NaiveDateTime, Utc}; +use sqlx::{Row, SqlitePool}; + +use activitypub_base::{FederationRepository, Follower, FollowerStatus, FollowingStatus, RemoteActor}; +use activitypub::RemoteReviewRepository; +use domain::models::{Review, ReviewSource}; + +fn datetime_to_str(dt: &NaiveDateTime) -> String { + dt.format("%Y-%m-%d %H:%M:%S").to_string() +} + +pub struct SqliteFederationRepository { + pool: SqlitePool, +} + +impl SqliteFederationRepository { + pub fn new(pool: SqlitePool) -> Self { + Self { pool } + } +} + +fn status_to_str(status: &FollowerStatus) -> &'static str { + match status { + FollowerStatus::Pending => "pending", + FollowerStatus::Accepted => "accepted", + FollowerStatus::Rejected => "rejected", + } +} + +fn str_to_status(s: &str) -> FollowerStatus { + match s { + "accepted" => FollowerStatus::Accepted, + "rejected" => FollowerStatus::Rejected, + _ => FollowerStatus::Pending, + } +} + +#[async_trait] +impl FederationRepository for SqliteFederationRepository { + async fn add_follower( + &self, + local_user_id: uuid::Uuid, + remote_actor_url: &str, + status: FollowerStatus, + follow_activity_id: &str, + ) -> Result<()> { + let uid = local_user_id.to_string(); + let status_str = status_to_str(&status); + let now = Utc::now().naive_utc(); + let created_at = datetime_to_str(&now); + + sqlx::query( + "INSERT INTO ap_followers (local_user_id, remote_actor_url, status, created_at, follow_activity_id) + VALUES (?1, ?2, ?3, ?4, ?5) + ON CONFLICT(local_user_id, remote_actor_url) DO UPDATE SET + status = excluded.status, + follow_activity_id = excluded.follow_activity_id", + ) + .bind(&uid) + .bind(remote_actor_url) + .bind(status_str) + .bind(&created_at) + .bind(follow_activity_id) + .execute(&self.pool) + .await?; + + Ok(()) + } + + async fn get_follower_follow_activity_id( + &self, + local_user_id: uuid::Uuid, + remote_actor_url: &str, + ) -> Result> { + let uid = local_user_id.to_string(); + let row: Option> = sqlx::query_scalar( + "SELECT follow_activity_id FROM ap_followers WHERE local_user_id = ? AND remote_actor_url = ?", + ) + .bind(&uid) + .bind(remote_actor_url) + .fetch_optional(&self.pool) + .await?; + Ok(row.flatten()) + } + + async fn remove_follower(&self, local_user_id: uuid::Uuid, remote_actor_url: &str) -> Result<()> { + let uid = local_user_id.to_string(); + sqlx::query("DELETE FROM ap_followers WHERE local_user_id = ? AND remote_actor_url = ?") + .bind(&uid) + .bind(remote_actor_url) + .execute(&self.pool) + .await?; + Ok(()) + } + + async fn get_followers(&self, local_user_id: uuid::Uuid) -> Result> { + let uid = local_user_id.to_string(); + + let rows = sqlx::query( + "SELECT f.remote_actor_url, f.status, + a.handle, a.inbox_url, a.shared_inbox_url, a.display_name + FROM ap_followers f + LEFT JOIN ap_remote_actors a ON a.url = f.remote_actor_url + WHERE f.local_user_id = ?", + ) + .bind(&uid) + .fetch_all(&self.pool) + .await?; + + let followers = rows + .into_iter() + .map(|row| { + let url: String = row.get("remote_actor_url"); + let status_str: String = row.get("status"); + let handle: String = row.try_get("handle").unwrap_or_default(); + let inbox_url: String = row.try_get("inbox_url").unwrap_or_default(); + let shared_inbox_url: Option = row.try_get("shared_inbox_url").ok().flatten(); + let display_name: Option = row.try_get("display_name").ok().flatten(); + + Follower { + actor: RemoteActor { url, handle, inbox_url, shared_inbox_url, display_name }, + status: str_to_status(&status_str), + } + }) + .collect(); + + Ok(followers) + } + + async fn update_follower_status( + &self, + local_user_id: uuid::Uuid, + remote_actor_url: &str, + status: FollowerStatus, + ) -> Result<()> { + let uid = local_user_id.to_string(); + let status_str = status_to_str(&status); + + let result = sqlx::query( + "UPDATE ap_followers SET status = ? WHERE local_user_id = ? AND remote_actor_url = ?", + ) + .bind(status_str) + .bind(&uid) + .bind(remote_actor_url) + .execute(&self.pool) + .await?; + + if result.rows_affected() == 0 { + tracing::warn!(local_user_id = %local_user_id, remote_actor_url, "update_follower_status: no row found"); + } + + Ok(()) + } + + async fn add_following(&self, local_user_id: uuid::Uuid, actor: RemoteActor, follow_activity_id: &str) -> Result<()> { + let uid = local_user_id.to_string(); + let now = Utc::now().naive_utc(); + let created_at = datetime_to_str(&now); + + self.upsert_remote_actor(actor.clone()).await?; + + sqlx::query( + "INSERT OR IGNORE INTO ap_following (local_user_id, remote_actor_url, follow_activity_id, created_at) + VALUES (?, ?, ?, ?)", + ) + .bind(&uid) + .bind(&actor.url) + .bind(follow_activity_id) + .bind(&created_at) + .execute(&self.pool) + .await?; + + Ok(()) + } + + async fn get_follow_activity_id(&self, local_user_id: uuid::Uuid, remote_actor_url: &str) -> Result> { + let uid = local_user_id.to_string(); + let row: Option> = sqlx::query_scalar( + "SELECT follow_activity_id FROM ap_following WHERE local_user_id = ? AND remote_actor_url = ?", + ) + .bind(&uid) + .bind(remote_actor_url) + .fetch_optional(&self.pool) + .await?; + Ok(row.flatten()) + } + + async fn remove_following(&self, local_user_id: uuid::Uuid, actor_url: &str) -> Result<()> { + let uid = local_user_id.to_string(); + sqlx::query("DELETE FROM ap_following WHERE local_user_id = ? AND remote_actor_url = ?") + .bind(&uid) + .bind(actor_url) + .execute(&self.pool) + .await?; + Ok(()) + } + + async fn get_following(&self, local_user_id: uuid::Uuid) -> Result> { + let uid = local_user_id.to_string(); + + let rows = sqlx::query( + "SELECT a.url, a.handle, a.inbox_url, a.shared_inbox_url, a.display_name + FROM ap_following f + INNER JOIN ap_remote_actors a ON a.url = f.remote_actor_url + WHERE f.local_user_id = ? AND f.status = 'accepted'", + ) + .bind(&uid) + .fetch_all(&self.pool) + .await?; + + Ok(rows.into_iter().map(|row| RemoteActor { + url: row.get("url"), + handle: row.get("handle"), + inbox_url: row.get("inbox_url"), + shared_inbox_url: row.try_get("shared_inbox_url").ok().flatten(), + display_name: row.try_get("display_name").ok().flatten(), + }).collect()) + } + + async fn count_following(&self, local_user_id: uuid::Uuid) -> Result { + let uid = local_user_id.to_string(); + let count: i64 = sqlx::query_scalar( + "SELECT COUNT(*) FROM ap_following WHERE local_user_id = ? AND status = 'accepted'", + ) + .bind(&uid) + .fetch_one(&self.pool) + .await?; + Ok(count as usize) + } + + async fn upsert_remote_actor(&self, actor: RemoteActor) -> Result<()> { + let now = Utc::now().naive_utc(); + let fetched_at = datetime_to_str(&now); + + sqlx::query( + "INSERT INTO ap_remote_actors (url, handle, inbox_url, shared_inbox_url, display_name, fetched_at) + VALUES (?, ?, ?, ?, ?, ?) + ON CONFLICT(url) DO UPDATE SET + handle = excluded.handle, + inbox_url = excluded.inbox_url, + shared_inbox_url = excluded.shared_inbox_url, + display_name = excluded.display_name, + fetched_at = excluded.fetched_at", + ) + .bind(&actor.url) + .bind(&actor.handle) + .bind(&actor.inbox_url) + .bind(&actor.shared_inbox_url) + .bind(&actor.display_name) + .bind(&fetched_at) + .execute(&self.pool) + .await?; + + Ok(()) + } + + async fn get_remote_actor(&self, actor_url: &str) -> Result> { + let row = sqlx::query( + "SELECT url, handle, inbox_url, shared_inbox_url, display_name + FROM ap_remote_actors WHERE url = ?", + ) + .bind(actor_url) + .fetch_optional(&self.pool) + .await?; + + Ok(row.map(|row| RemoteActor { + url: row.get("url"), + handle: row.get("handle"), + inbox_url: row.get("inbox_url"), + shared_inbox_url: row.try_get("shared_inbox_url").ok().flatten(), + display_name: row.try_get("display_name").ok().flatten(), + })) + } + + async fn get_local_actor_keypair(&self, user_id: uuid::Uuid) -> Result> { + let uid = user_id.to_string(); + let row = sqlx::query("SELECT public_key, private_key FROM ap_local_actors WHERE user_id = ?") + .bind(&uid) + .fetch_optional(&self.pool) + .await?; + Ok(row.map(|r| (r.get("public_key"), r.get("private_key")))) + } + + async fn save_local_actor_keypair(&self, user_id: uuid::Uuid, public_key: String, private_key: String) -> Result<()> { + let uid = user_id.to_string(); + let now = Utc::now().naive_utc(); + let created_at = datetime_to_str(&now); + + sqlx::query( + "INSERT INTO ap_local_actors (user_id, public_key, private_key, created_at) + VALUES (?, ?, ?, ?) + ON CONFLICT(user_id) DO UPDATE SET + public_key = excluded.public_key, + private_key = excluded.private_key", + ) + .bind(&uid) + .bind(&public_key) + .bind(&private_key) + .bind(&created_at) + .execute(&self.pool) + .await?; + + Ok(()) + } + + async fn get_pending_followers(&self, local_user_id: uuid::Uuid) -> Result> { + let uid = local_user_id.to_string(); + + let rows = sqlx::query( + "SELECT f.remote_actor_url, + a.handle, a.inbox_url, a.shared_inbox_url, a.display_name + FROM ap_followers f + LEFT JOIN ap_remote_actors a ON a.url = f.remote_actor_url + WHERE f.local_user_id = ? AND f.status = 'pending'", + ) + .bind(&uid) + .fetch_all(&self.pool) + .await?; + + Ok(rows.into_iter().map(|row| RemoteActor { + url: row.get("remote_actor_url"), + handle: row.try_get("handle").unwrap_or_default(), + inbox_url: row.try_get("inbox_url").unwrap_or_default(), + shared_inbox_url: row.try_get("shared_inbox_url").ok().flatten(), + display_name: row.try_get("display_name").ok().flatten(), + }).collect()) + } + + async fn update_following_status( + &self, + local_user_id: uuid::Uuid, + remote_actor_url: &str, + status: FollowingStatus, + ) -> Result<()> { + let uid = local_user_id.to_string(); + let status_str = match status { + FollowingStatus::Pending => "pending", + FollowingStatus::Accepted => "accepted", + }; + + let result = sqlx::query( + "UPDATE ap_following SET status = ? WHERE local_user_id = ? AND remote_actor_url = ?", + ) + .bind(status_str) + .bind(&uid) + .bind(remote_actor_url) + .execute(&self.pool) + .await?; + + if result.rows_affected() == 0 { + tracing::warn!(local_user_id = %local_user_id, remote_actor_url, "update_following_status: no row found"); + } + + Ok(()) + } +} + +// --- Content-specific repository (movies-diary) --- + +#[async_trait] +impl RemoteReviewRepository for SqliteFederationRepository { + async fn save_remote_review( + &self, + review: &Review, + ap_id: &str, + movie_title: &str, + release_year: u16, + poster_url: Option<&str>, + ) -> Result<()> { + let actor_url = match review.source() { + ReviewSource::Remote { actor_url } => actor_url.clone(), + ReviewSource::Local => { + return Err(anyhow!("save_remote_review called with a local review")); + } + }; + + let movie_id = review.movie_id().value().to_string(); + + let _ = sqlx::query( + "INSERT INTO movies (id, external_metadata_id, title, release_year, director, poster_path) + VALUES (?, NULL, ?, ?, NULL, ?) + ON CONFLICT(id) DO UPDATE SET + poster_path = COALESCE(excluded.poster_path, movies.poster_path)", + ) + .bind(&movie_id) + .bind(movie_title) + .bind(release_year.max(1888) as i64) + .bind(poster_url) + .execute(&self.pool) + .await?; + + let id = review.id().value().to_string(); + let user_id = review.user_id().value().to_string(); + let rating = review.rating().value() as i64; + let comment = review.comment().map(|c| c.value().to_string()); + let watched_at = datetime_to_str(review.watched_at()); + let created_at = datetime_to_str(review.created_at()); + + sqlx::query( + "INSERT OR IGNORE INTO reviews (id, movie_id, user_id, rating, comment, watched_at, created_at, remote_actor_url, ap_id) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)", + ) + .bind(&id) + .bind(&movie_id) + .bind(&user_id) + .bind(rating) + .bind(&comment) + .bind(&watched_at) + .bind(&created_at) + .bind(&actor_url) + .bind(ap_id) + .execute(&self.pool) + .await?; + + Ok(()) + } + + async fn delete_remote_review(&self, ap_id: &str, actor_url: &str) -> Result<()> { + sqlx::query("DELETE FROM reviews WHERE ap_id = ? AND remote_actor_url = ?") + .bind(ap_id) + .bind(actor_url) + .execute(&self.pool) + .await?; + Ok(()) + } + + async fn update_remote_review( + &self, + ap_id: &str, + actor_url: &str, + rating: u8, + comment: Option<&str>, + watched_at: chrono::NaiveDateTime, + ) -> Result<()> { + let watched_at_str = datetime_to_str(&watched_at); + sqlx::query( + "UPDATE reviews SET rating = ?, comment = ?, watched_at = ? + WHERE ap_id = ? AND remote_actor_url = ?", + ) + .bind(rating as i64) + .bind(comment) + .bind(&watched_at_str) + .bind(ap_id) + .bind(actor_url) + .execute(&self.pool) + .await?; + Ok(()) + } + + async fn delete_by_actor(&self, actor_url: &str) -> Result<()> { + sqlx::query("DELETE FROM reviews WHERE remote_actor_url = ?") + .bind(actor_url) + .execute(&self.pool) + .await?; + Ok(()) + } +} diff --git a/crates/adapters/sqlite/Cargo.toml b/crates/adapters/sqlite/Cargo.toml index ff4b12f..d2d8825 100644 --- a/crates/adapters/sqlite/Cargo.toml +++ b/crates/adapters/sqlite/Cargo.toml @@ -12,8 +12,6 @@ sqlx = { version = "0.8.6", features = [ ] } domain = { workspace = true } -activitypub = { workspace = true } -activitypub-base = { workspace = true } anyhow = { workspace = true } uuid = { workspace = true } chrono = { workspace = true } diff --git a/crates/adapters/sqlite/src/lib.rs b/crates/adapters/sqlite/src/lib.rs index 90c4d5a..a7ecc77 100644 --- a/crates/adapters/sqlite/src/lib.rs +++ b/crates/adapters/sqlite/src/lib.rs @@ -12,7 +12,6 @@ use domain::{ }; use sqlx::SqlitePool; -mod federation; mod migrations; mod models; mod users; @@ -22,7 +21,6 @@ use models::{ UserTotalsRow, datetime_to_str, }; -pub use federation::SqliteFederationRepository; pub use users::SqliteUserRepository; fn format_year_month(ym: &str) -> String { diff --git a/crates/domain/src/ports.rs b/crates/domain/src/ports.rs index 631f423..566f4e2 100644 --- a/crates/domain/src/ports.rs +++ b/crates/domain/src/ports.rs @@ -127,3 +127,8 @@ pub trait PasswordHasher: Send + Sync { pub trait DiaryExporter: Send + Sync { async fn serialize_reviews(&self, reviews: &[Review]) -> Result, DomainError>; } + +#[async_trait] +pub trait EventHandler: Send + Sync { + async fn handle(&self, event: &DomainEvent) -> Result<(), DomainError>; +} diff --git a/crates/presentation/Cargo.toml b/crates/presentation/Cargo.toml index 5dca7de..d23313d 100644 --- a/crates/presentation/Cargo.toml +++ b/crates/presentation/Cargo.toml @@ -26,6 +26,7 @@ metadata = { workspace = true } poster-fetcher = { workspace = true } poster-storage = { workspace = true } sqlite = { workspace = true } +sqlite-federation = { workspace = true } activitypub = { workspace = true } sqlx = { workspace = true } template-askama = { workspace = true } diff --git a/crates/presentation/src/event_handlers.rs b/crates/presentation/src/event_handlers.rs index 25baed3..a5d8249 100644 --- a/crates/presentation/src/event_handlers.rs +++ b/crates/presentation/src/event_handlers.rs @@ -3,7 +3,7 @@ use std::time::Duration; use application::{commands::SyncPosterCommand, context::AppContext, use_cases::sync_poster}; use async_trait::async_trait; use domain::{errors::DomainError, events::DomainEvent}; -use event_publisher::EventHandler; +use domain::ports::EventHandler; pub struct PosterSyncHandler { ctx: AppContext, diff --git a/crates/presentation/src/extractors.rs b/crates/presentation/src/extractors.rs index 9b82a23..ecd0313 100644 --- a/crates/presentation/src/extractors.rs +++ b/crates/presentation/src/extractors.rs @@ -181,7 +181,7 @@ mod tests { }, html_renderer: Arc::new(PanicRenderer), rss_renderer: Arc::new(PanicRssRenderer), - ap_service: test_ap_service().await, + ap_service: std::sync::Arc::new(activitypub::NoopActivityPubService), }; let app = test_router(state); @@ -231,49 +231,6 @@ mod tests { } } - async fn test_ap_service() -> std::sync::Arc { - use std::sync::Arc; - let pool = sqlx::SqlitePool::connect("sqlite::memory:").await.unwrap(); - sqlx::query("CREATE TABLE IF NOT EXISTS ap_keypairs (user_id TEXT PRIMARY KEY, public_key TEXT NOT NULL, private_key TEXT NOT NULL)") - .execute(&pool).await.unwrap(); - sqlx::query("CREATE TABLE IF NOT EXISTS ap_remote_actors (url TEXT PRIMARY KEY, handle TEXT NOT NULL, inbox_url TEXT NOT NULL, shared_inbox_url TEXT, display_name TEXT)") - .execute(&pool).await.unwrap(); - sqlx::query("CREATE TABLE IF NOT EXISTS ap_followers (local_user_id TEXT NOT NULL, remote_actor_url TEXT NOT NULL, status TEXT NOT NULL DEFAULT 'pending', PRIMARY KEY (local_user_id, remote_actor_url))") - .execute(&pool).await.unwrap(); - sqlx::query("CREATE TABLE IF NOT EXISTS ap_following (local_user_id TEXT NOT NULL, remote_actor_url TEXT NOT NULL, PRIMARY KEY (local_user_id, remote_actor_url))") - .execute(&pool).await.unwrap(); - let fed_repo = Arc::new(sqlite::SqliteFederationRepository::new(pool)); - - struct DummyApUserRepo; - #[async_trait::async_trait] - impl activitypub::ApUserRepository for DummyApUserRepo { - async fn find_by_id(&self, _: uuid::Uuid) -> anyhow::Result> { Ok(None) } - async fn find_by_username(&self, _: &str) -> anyhow::Result> { Ok(None) } - } - - struct DummyObjectHandler; - #[async_trait::async_trait] - impl activitypub::ApObjectHandler for DummyObjectHandler { - async fn get_local_objects_for_user(&self, _: uuid::Uuid) -> anyhow::Result> { Ok(vec![]) } - async fn on_create(&self, _: &url::Url, _: &url::Url, _: serde_json::Value) -> anyhow::Result<()> { Ok(()) } - async fn on_update(&self, _: &url::Url, _: &url::Url, _: serde_json::Value) -> anyhow::Result<()> { Ok(()) } - async fn on_delete(&self, _: &url::Url, _: &url::Url) -> anyhow::Result<()> { Ok(()) } - async fn on_actor_removed(&self, _: &url::Url) -> anyhow::Result<()> { Ok(()) } - } - - Arc::new( - activitypub::ActivityPubService::new( - fed_repo, - Arc::new(DummyApUserRepo), - Arc::new(DummyObjectHandler), - "http://localhost:3000".to_string(), - true, - ) - .await - .unwrap(), - ) - } - async fn panic_state() -> crate::state::AppState { use std::sync::Arc; use application::context::AppContext; @@ -334,7 +291,7 @@ mod tests { }, html_renderer: Arc::new(PanicRenderer2), rss_renderer: Arc::new(PanicRssRenderer2), - ap_service: test_ap_service().await, + ap_service: std::sync::Arc::new(activitypub::NoopActivityPubService), } } @@ -396,7 +353,7 @@ mod tests { }, html_renderer: Arc::new(PanicRenderer3), rss_renderer: Arc::new(PanicRssRenderer3), - ap_service: test_ap_service().await, + ap_service: std::sync::Arc::new(activitypub::NoopActivityPubService), } } diff --git a/crates/presentation/src/handlers.rs b/crates/presentation/src/handlers.rs index 846e245..e64b2eb 100644 --- a/crates/presentation/src/handlers.rs +++ b/crates/presentation/src/handlers.rs @@ -5,23 +5,29 @@ pub mod html { use std::str::FromStr; use axum::{ + Form, extract::{Path, Query, State}, http::{HeaderValue, StatusCode, header::SET_COOKIE}, response::{Html, IntoResponse, Redirect}, - Form, }; use chrono::Utc; use uuid::Uuid; use application::{ commands::{DeleteReviewCommand, LoginCommand, RegisterCommand}, - ports::{FollowersPageData, FollowingPageData, HtmlPageContext, LoginPageData, NewReviewPageData, RegisterPageData, RemoteActorView}, + ports::{ + FollowersPageData, FollowingPageData, HtmlPageContext, LoginPageData, + NewReviewPageData, RegisterPageData, RemoteActorView, + }, use_cases::{delete_review, log_review, login as login_uc, register as register_uc}, }; use domain::{errors::DomainError, value_objects::UserId}; use crate::{ - dtos::{DiaryQueryParams, ErrorQuery, FollowForm, FollowerActionForm, LoginForm, LogReviewData, LogReviewForm, RegisterForm, UnfollowForm}, + dtos::{ + DiaryQueryParams, ErrorQuery, FollowForm, FollowerActionForm, LogReviewData, + LogReviewForm, LoginForm, RegisterForm, UnfollowForm, + }, extractors::{OptionalCookieUser, RequiredCookieUser}, state::AppState, }; @@ -56,15 +62,24 @@ pub mod html { } fn secure_flag() -> &'static str { - if std::env::var("SECURE_COOKIES").as_deref() == Ok("true") { "; Secure" } else { "" } + if std::env::var("SECURE_COOKIES").as_deref() == Ok("true") { + "; Secure" + } else { + "" + } } fn set_cookie_header(token: &str, max_age: i64) -> (axum::http::HeaderName, HeaderValue) { let val = format!( "token={}; HttpOnly; Path=/; SameSite=Strict; Max-Age={}{}", - token, max_age, secure_flag() + token, + max_age, + secure_flag() ); - (SET_COOKIE, HeaderValue::from_str(&val).expect("valid cookie")) + ( + SET_COOKIE, + HeaderValue::from_str(&val).expect("valid cookie"), + ) } pub async fn get_login_page( @@ -112,8 +127,14 @@ pub mod html { } pub async fn get_logout() -> impl IntoResponse { - let val = format!("token=; HttpOnly; Path=/; SameSite=Strict; Max-Age=0{}", secure_flag()); - let cookie = (SET_COOKIE, HeaderValue::from_str(&val).expect("valid cookie")); + let val = format!( + "token=; HttpOnly; Path=/; SameSite=Strict; Max-Age=0{}", + secure_flag() + ); + let cookie = ( + SET_COOKIE, + HeaderValue::from_str(&val).expect("valid cookie"), + ); ([cookie], Redirect::to("/")).into_response() } @@ -171,9 +192,8 @@ pub mod html { Err(_) => Redirect::to("/login").into_response(), } } - Err(_) => { - Redirect::to("/register?error=Registration+failed.+Please+try+again.").into_response() - } + Err(_) => Redirect::to("/register?error=Registration+failed.+Please+try+again.") + .into_response(), } } @@ -203,7 +223,7 @@ pub mod html { let data = match LogReviewData::try_from(form) { Ok(d) => d, Err(_) => { - return Redirect::to("/reviews/new?error=Invalid+date+format").into_response() + return Redirect::to("/reviews/new?error=Invalid+date+format").into_response(); } }; @@ -230,7 +250,9 @@ pub mod html { Ok(()) => { let redirect_url = form .redirect_after - .filter(|url| (url.starts_with('/') && !url.starts_with("//")) || url.starts_with('?')) + .filter(|url| { + (url.starts_with('/') && !url.starts_with("//")) || url.starts_with('?') + }) .unwrap_or_else(|| "/".to_string()); Redirect::to(&redirect_url).into_response() } @@ -281,7 +303,12 @@ pub mod html { let mut ctx = build_page_context(&state, user_id).await; ctx.page_title = "Members — Movies Diary".to_string(); ctx.canonical_url = format!("{}/users", state.app_ctx.config.base_url); - match application::use_cases::get_users::execute(&state.app_ctx, application::queries::GetUsersQuery).await { + match application::use_cases::get_users::execute( + &state.app_ctx, + application::queries::GetUsersQuery, + ) + .await + { Ok(users) => { let data = application::ports::UsersPageData { ctx, users }; match state.html_renderer.render_users_page(data) { @@ -301,15 +328,24 @@ pub mod html { Query(params): Query, ) -> impl IntoResponse { // Content negotiation: AP clients request application/activity+json - let accept = headers.get(axum::http::header::ACCEPT) + let accept = headers + .get(axum::http::header::ACCEPT) .and_then(|v| v.to_str().ok()) .unwrap_or(""); if accept.contains("application/activity+json") || accept.contains("application/ld+json") { - return match state.ap_service.actor_json(&profile_user_uuid.to_string()).await { + return match state + .ap_service + .actor_json(&profile_user_uuid.to_string()) + .await + { Ok(json) => ( - [(axum::http::header::CONTENT_TYPE, "application/activity+json")], + [( + axum::http::header::CONTENT_TYPE, + "application/activity+json", + )], json, - ).into_response(), + ) + .into_response(), Err(_) => StatusCode::NOT_FOUND.into_response(), }; } @@ -318,10 +354,18 @@ pub mod html { let view_str = params.view.as_deref().unwrap_or("recent"); let profile_view = match application::queries::ProfileView::from_str(view_str) { Ok(v) => v, - Err(_) => return (axum::http::StatusCode::BAD_REQUEST, "invalid view parameter").into_response(), + Err(_) => { + return ( + axum::http::StatusCode::BAD_REQUEST, + "invalid view parameter", + ) + .into_response(); + } }; - let profile_user = match state.app_ctx.user_repository + let profile_user = match state + .app_ctx + .user_repository .find_by_id(&domain::value_objects::UserId::from_uuid(profile_user_uuid)) .await { @@ -332,15 +376,23 @@ pub mod html { let display_name = profile_user.username().value(); ctx.page_title = format!("{}'s Diary — Movies Diary", display_name); - ctx.canonical_url = format!("{}/users/{}", state.app_ctx.config.base_url, profile_user_uuid); + ctx.canonical_url = format!( + "{}/users/{}", + state.app_ctx.config.base_url, profile_user_uuid + ); - let is_own_profile = user_id.as_ref() + let is_own_profile = user_id + .as_ref() .map(|u| u.value() == profile_user_uuid) .unwrap_or(false); let following_count = if is_own_profile { if let Some(ref uid) = user_id { - state.ap_service.count_following(uid.value()).await.unwrap_or(0) + state + .ap_service + .count_following(uid.value()) + .await + .unwrap_or(0) } else { 0 } @@ -349,7 +401,8 @@ pub mod html { }; let followers_count = if is_own_profile { - state.ap_service + state + .ap_service .count_accepted_followers(profile_user_uuid) .await .unwrap_or(0) @@ -358,7 +411,8 @@ pub mod html { }; let pending_followers = if is_own_profile { - state.ap_service + state + .ap_service .get_pending_followers(profile_user_uuid) .await .unwrap_or_default() @@ -382,9 +436,12 @@ pub mod html { match application::use_cases::get_user_profile::execute(&state.app_ctx, query).await { Ok(profile) => { - let (offset, has_more, limit) = profile.entries.as_ref() + let (offset, has_more, limit) = profile + .entries + .as_ref() .map(|e| { - let has_more = (e.offset as u64).saturating_add(e.limit as u64) < e.total_count; + let has_more = + (e.offset as u64).saturating_add(e.limit as u64) < e.total_count; (e.offset, has_more, e.limit) }) .unwrap_or((0, false, super::DEFAULT_PAGE_LIMIT)); @@ -444,11 +501,20 @@ pub mod html { if user_id.value() != profile_user_uuid { return StatusCode::FORBIDDEN.into_response(); } - match state.ap_service.unfollow(user_id.value(), &form.actor_url).await { - Ok(()) => Redirect::to(&format!("/users/{}/following-list", profile_user_uuid)).into_response(), + match state + .ap_service + .unfollow(user_id.value(), &form.actor_url) + .await + { + Ok(()) => Redirect::to(&format!("/users/{}/following-list", profile_user_uuid)) + .into_response(), Err(e) => { let msg = encode_error(&e.to_string()); - Redirect::to(&format!("/users/{}/following-list?error={}", profile_user_uuid, msg)).into_response() + Redirect::to(&format!( + "/users/{}/following-list?error={}", + profile_user_uuid, msg + )) + .into_response() } } } @@ -462,7 +528,11 @@ pub mod html { if user_id.value() != profile_user_uuid { return StatusCode::FORBIDDEN.into_response(); } - match state.ap_service.accept_follower(user_id.value(), &form.actor_url).await { + match state + .ap_service + .accept_follower(user_id.value(), &form.actor_url) + .await + { Ok(_) => Redirect::to(&format!("/users/{}", profile_user_uuid)).into_response(), Err(e) => { let msg = encode_error(&e.to_string()); @@ -480,7 +550,11 @@ pub mod html { if user_id.value() != profile_user_uuid { return StatusCode::FORBIDDEN.into_response(); } - match state.ap_service.reject_follower(user_id.value(), &form.actor_url).await { + match state + .ap_service + .reject_follower(user_id.value(), &form.actor_url) + .await + { Ok(_) => Redirect::to(&format!("/users/{}", profile_user_uuid)).into_response(), Err(e) => { let msg = encode_error(&e.to_string()); @@ -500,14 +574,20 @@ pub mod html { } let mut ctx = build_page_context(&state, Some(user_id.clone())).await; ctx.page_title = "Following — Movies Diary".to_string(); - ctx.canonical_url = format!("{}/users/{}/following-list", state.app_ctx.config.base_url, profile_user_uuid); + ctx.canonical_url = format!( + "{}/users/{}/following-list", + state.app_ctx.config.base_url, profile_user_uuid + ); match state.ap_service.get_following(user_id.value()).await { Ok(following) => { - let actors = following.into_iter().map(|a| RemoteActorView { - handle: a.handle, - display_name: a.display_name, - url: a.url, - }).collect(); + let actors = following + .into_iter() + .map(|a| RemoteActorView { + handle: a.handle, + display_name: a.display_name, + url: a.url, + }) + .collect(); let data = FollowingPageData { ctx, user_id: profile_user_uuid, @@ -521,7 +601,11 @@ pub mod html { } Err(e) => { tracing::error!("get_following error: {:?}", e); - (StatusCode::INTERNAL_SERVER_ERROR, "Failed to load following list").into_response() + ( + StatusCode::INTERNAL_SERVER_ERROR, + "Failed to load following list", + ) + .into_response() } } } @@ -537,14 +621,24 @@ pub mod html { } let mut ctx = build_page_context(&state, Some(user_id.clone())).await; ctx.page_title = "Followers — Movies Diary".to_string(); - ctx.canonical_url = format!("{}/users/{}/followers-list", state.app_ctx.config.base_url, profile_user_uuid); - match state.ap_service.get_accepted_followers(user_id.value()).await { + ctx.canonical_url = format!( + "{}/users/{}/followers-list", + state.app_ctx.config.base_url, profile_user_uuid + ); + match state + .ap_service + .get_accepted_followers(user_id.value()) + .await + { Ok(followers) => { - let actors = followers.into_iter().map(|a| RemoteActorView { - handle: a.handle, - display_name: a.display_name, - url: a.url, - }).collect(); + let actors = followers + .into_iter() + .map(|a| RemoteActorView { + handle: a.handle, + display_name: a.display_name, + url: a.url, + }) + .collect(); let data = FollowersPageData { ctx, user_id: profile_user_uuid, @@ -558,7 +652,11 @@ pub mod html { } Err(e) => { tracing::error!("get_followers error: {:?}", e); - (StatusCode::INTERNAL_SERVER_ERROR, "Failed to load followers list").into_response() + ( + StatusCode::INTERNAL_SERVER_ERROR, + "Failed to load followers list", + ) + .into_response() } } } @@ -572,11 +670,20 @@ pub mod html { if user_id.value() != profile_user_uuid { return StatusCode::FORBIDDEN.into_response(); } - match state.ap_service.remove_follower(user_id.value(), &form.actor_url).await { - Ok(_) => Redirect::to(&format!("/users/{}/followers-list", profile_user_uuid)).into_response(), + match state + .ap_service + .remove_follower(user_id.value(), &form.actor_url) + .await + { + Ok(_) => Redirect::to(&format!("/users/{}/followers-list", profile_user_uuid)) + .into_response(), Err(e) => { let msg = encode_error(&e.to_string()); - Redirect::to(&format!("/users/{}/followers-list?error={}", profile_user_uuid, msg)).into_response() + Redirect::to(&format!( + "/users/{}/followers-list?error={}", + profile_user_uuid, msg + )) + .into_response() } } } @@ -644,7 +751,10 @@ pub mod rss { .rss_renderer .render_feed(&page.items, "Movie Diary") .map_err(|e| ApiError(DomainError::InfrastructureError(e)))?; - Ok(([(header::CONTENT_TYPE, "application/rss+xml; charset=utf-8")], xml)) + Ok(( + [(header::CONTENT_TYPE, "application/rss+xml; charset=utf-8")], + xml, + )) } pub async fn get_user_feed( @@ -676,7 +786,10 @@ pub mod rss { .render_feed(&page.items, &title) .map_err(|e| ApiError(DomainError::InfrastructureError(e)))?; - Ok(([(header::CONTENT_TYPE, "application/rss+xml; charset=utf-8")], xml)) + Ok(( + [(header::CONTENT_TYPE, "application/rss+xml; charset=utf-8")], + xml, + )) } } @@ -692,7 +805,10 @@ pub mod api { use application::{ commands::{DeleteReviewCommand, LoginCommand, RegisterCommand, SyncPosterCommand}, queries::GetReviewHistoryQuery, - use_cases::{delete_review, get_diary, get_review_history, log_review, login as login_uc, register as register_uc, sync_poster}, + use_cases::{ + delete_review, get_diary, get_review_history, log_review, login as login_uc, + register as register_uc, sync_poster, + }, }; use domain::{ errors::DomainError, @@ -703,8 +819,8 @@ pub mod api { use crate::{ dtos::{ - DiaryEntryDto, DiaryQueryParams, DiaryResponse, LoginRequest, LoginResponse, - LogReviewData, LogReviewRequest, MovieDto, RegisterRequest, ReviewDto, + DiaryEntryDto, DiaryQueryParams, DiaryResponse, LogReviewData, LogReviewRequest, + LoginRequest, LoginResponse, MovieDto, RegisterRequest, ReviewDto, ReviewHistoryResponse, }, errors::ApiError, @@ -730,11 +846,8 @@ pub mod api { State(state): State, Path(movie_id): Path, ) -> Result, ApiError> { - let (history, trend) = get_review_history::execute( - &state.app_ctx, - GetReviewHistoryQuery { movie_id }, - ) - .await?; + let (history, trend) = + get_review_history::execute(&state.app_ctx, GetReviewHistoryQuery { movie_id }).await?; Ok(Json(ReviewHistoryResponse { movie: movie_to_dto(history.movie()), @@ -796,10 +909,13 @@ pub mod api { State(state): State, Json(req): Json, ) -> Result, ApiError> { - let result = login_uc::execute(&state.app_ctx, LoginCommand { - email: req.email, - password: req.password, - }) + let result = login_uc::execute( + &state.app_ctx, + LoginCommand { + email: req.email, + password: req.password, + }, + ) .await?; Ok(Json(LoginResponse { token: result.token, @@ -813,11 +929,14 @@ pub mod api { State(state): State, Json(req): Json, ) -> Result { - register_uc::execute(&state.app_ctx, RegisterCommand { - email: req.email, - username: req.username, - password: req.password, - }) + register_uc::execute( + &state.app_ctx, + RegisterCommand { + email: req.email, + username: req.username, + password: req.password, + }, + ) .await?; Ok(StatusCode::CREATED) } diff --git a/crates/presentation/src/main.rs b/crates/presentation/src/main.rs index 488c194..94ef15c 100644 --- a/crates/presentation/src/main.rs +++ b/crates/presentation/src/main.rs @@ -15,8 +15,9 @@ use auth::{AuthConfig, Argon2PasswordHasher, JwtAuthService}; use metadata::MetadataClientImpl; use poster_fetcher::{PosterFetcherConfig, ReqwestPosterFetcher}; use poster_storage::{PosterStorageAdapter, StorageConfig}; -use activitypub::{ActivityPubEventHandler, ActivityPubService, DomainUserRepoAdapter, ReviewObjectHandler}; -use sqlite::{SqliteFederationRepository, SqliteMovieRepository, SqliteUserRepository}; +use activitypub::{ActivityPubEventHandler, ActivityPubPort, ActivityPubService, DomainUserRepoAdapter, ReviewObjectHandler}; +use sqlite::{SqliteMovieRepository, SqliteUserRepository}; +use sqlite_federation::SqliteFederationRepository; use rss::RssAdapter; use template_askama::AskamaHtmlRenderer; @@ -27,11 +28,11 @@ async fn main() -> anyhow::Result<()> { dotenvy::dotenv().ok(); init_tracing(); - let state = wire_dependencies() + let (state, ap_router) = wire_dependencies() .await .context("Failed to wire dependencies")?; - let app = routes::build_router(state); + let app = routes::build_router(state, ap_router); let host = std::env::var("HOST").unwrap_or_else(|_| "0.0.0.0".to_string()); let port = std::env::var("PORT").unwrap_or_else(|_| "3000".to_string()); @@ -43,7 +44,7 @@ async fn main() -> anyhow::Result<()> { Ok(()) } -async fn wire_dependencies() -> anyhow::Result { +async fn wire_dependencies() -> anyhow::Result<(AppState, axum::Router)> { let auth_config = AuthConfig::from_env()?; let storage_config = StorageConfig::from_env()?; let app_config = AppConfig::from_env(); @@ -100,7 +101,7 @@ async fn wire_dependencies() -> anyhow::Result { review_store: Arc::clone(&federation_repo) as Arc, base_url: app_config.base_url.clone(), }); - let ap_service = Arc::new( + let concrete_ap_service = Arc::new( ActivityPubService::new( federation_repo, user_repo_adapter, @@ -110,11 +111,13 @@ async fn wire_dependencies() -> anyhow::Result { ) .await?, ); + let ap_router = concrete_ap_service.router(); let ap_event_handler = ActivityPubEventHandler::new( - Arc::clone(&ap_service), + Arc::clone(&concrete_ap_service), Arc::clone(&repository), app_config.base_url.clone(), ); + let ap_service: Arc = concrete_ap_service; let poster_handler = PosterSyncHandler::new(handler_ctx, 3); let (event_publisher, event_worker) = create_event_channel( @@ -135,14 +138,15 @@ async fn wire_dependencies() -> anyhow::Result { config: app_config, }; - Ok(AppState { + let state = AppState { app_ctx, html_renderer: Arc::new(AskamaHtmlRenderer::new()), rss_renderer: Arc::new(RssAdapter::new( std::env::var("BASE_URL").unwrap_or_else(|_| "http://localhost:3000".into()), )), ap_service, - }) + }; + Ok((state, ap_router)) } fn init_tracing() { diff --git a/crates/presentation/src/routes.rs b/crates/presentation/src/routes.rs index ef87f1c..9426705 100644 --- a/crates/presentation/src/routes.rs +++ b/crates/presentation/src/routes.rs @@ -9,21 +9,6 @@ use tower_http::{services::ServeDir, trace::TraceLayer}; use crate::{handlers, state::AppState}; -/// Build an ActivityPub router from the service, excluding routes that -/// conflict with HTML routes (/users/{id} and /users/{id}/following). -/// Those AP endpoints are still served via the federation middleware layer -/// applied to the whole AP router scope; the conflicting paths will need -/// content-negotiation wrappers added in Phase 5. -fn ap_routes(state: &AppState) -> Router { - let config = state.ap_service.federation_config(); - Router::new() - .route("/.well-known/webfinger", routing::get(activitypub::webfinger::webfinger_handler)) - .route("/users/{user_id}/inbox", routing::post(activitypub::inbox::inbox_handler)) - .route("/users/{user_id}/outbox", routing::get(activitypub::outbox::outbox_handler)) - .route("/users/{user_id}/followers", routing::get(activitypub::followers_handler::followers_handler)) - .layer(config.middleware()) -} - /// Simple global rate limiter: tracks request count per 60-second window. /// Not per-IP — suitable for a low-traffic personal app. #[derive(Clone)] @@ -60,9 +45,8 @@ impl RateLimiter { } } -pub fn build_router(state: AppState) -> Router { +pub fn build_router(state: AppState, ap_router: Router) -> Router { let rate_limit = state.app_ctx.config.rate_limit; - let ap_router = ap_routes(&state); Router::new() .merge(html_routes(rate_limit)) .merge(api_routes(rate_limit)) diff --git a/crates/presentation/src/state.rs b/crates/presentation/src/state.rs index c535870..36aa138 100644 --- a/crates/presentation/src/state.rs +++ b/crates/presentation/src/state.rs @@ -1,6 +1,6 @@ use std::sync::Arc; -use activitypub::ActivityPubService; +use activitypub::ActivityPubPort; use application::context::AppContext; use crate::ports::{HtmlRenderer, RssFeedRenderer}; @@ -10,5 +10,5 @@ pub struct AppState { pub app_ctx: AppContext, pub html_renderer: Arc, pub rss_renderer: Arc, - pub ap_service: Arc, + pub ap_service: Arc, } diff --git a/crates/presentation/tests/api_test.rs b/crates/presentation/tests/api_test.rs index 4ede9e1..b95b75a 100644 --- a/crates/presentation/tests/api_test.rs +++ b/crates/presentation/tests/api_test.rs @@ -89,35 +89,6 @@ impl UserRepository for NobodyUserRepo { async fn list_with_stats(&self) -> Result, DomainError> { panic!() } } -async fn test_ap_service() -> Arc { - let pool = SqlitePool::connect("sqlite::memory:").await.unwrap(); - sqlx::query("CREATE TABLE IF NOT EXISTS ap_keypairs (user_id TEXT PRIMARY KEY, public_key TEXT NOT NULL, private_key TEXT NOT NULL)") - .execute(&pool).await.unwrap(); - sqlx::query("CREATE TABLE IF NOT EXISTS ap_remote_actors (url TEXT PRIMARY KEY, handle TEXT NOT NULL, inbox_url TEXT NOT NULL, shared_inbox_url TEXT, display_name TEXT)") - .execute(&pool).await.unwrap(); - sqlx::query("CREATE TABLE IF NOT EXISTS ap_followers (local_user_id TEXT NOT NULL, remote_actor_url TEXT NOT NULL, status TEXT NOT NULL DEFAULT 'pending', PRIMARY KEY (local_user_id, remote_actor_url))") - .execute(&pool).await.unwrap(); - sqlx::query("CREATE TABLE IF NOT EXISTS ap_following (local_user_id TEXT NOT NULL, remote_actor_url TEXT NOT NULL, PRIMARY KEY (local_user_id, remote_actor_url))") - .execute(&pool).await.unwrap(); - let fed_repo = Arc::new(sqlite::SqliteFederationRepository::new(pool)); - struct DummyUserRepo; - #[async_trait] - impl UserRepository for DummyUserRepo { - async fn find_by_email(&self, _: &Email) -> Result, DomainError> { Ok(None) } - async fn find_by_username(&self, _: &domain::value_objects::Username) -> Result, DomainError> { Ok(None) } - async fn save(&self, _: &User) -> Result<(), DomainError> { Ok(()) } - async fn find_by_id(&self, _: &UserId) -> Result, DomainError> { Ok(None) } - async fn list_with_stats(&self) -> Result, DomainError> { Ok(vec![]) } - } - let movie_pool = SqlitePool::connect("sqlite::memory:").await.unwrap(); - let movie_repo = Arc::new(sqlite::SqliteMovieRepository::new(movie_pool)); - Arc::new( - activitypub::ActivityPubService::new(fed_repo, Arc::new(DummyUserRepo), movie_repo, "http://localhost:3000".to_string(), true) - .await - .unwrap(), - ) -} - async fn test_app() -> Router { let pool = SqlitePool::connect("sqlite::memory:") .await @@ -139,10 +110,10 @@ async fn test_app() -> Router { }, html_renderer: Arc::new(AskamaHtmlRenderer::new()), rss_renderer: Arc::new(RssAdapter::new("http://localhost:3000".into())), - ap_service: test_ap_service().await, + ap_service: Arc::new(activitypub::NoopActivityPubService), }; - routes::build_router(state) + routes::build_router(state, axum::Router::new()) } #[tokio::test]