Refactor ActivityPub integration and add SQLite federation support

- Removed event-publisher dependency from Cargo.lock and Cargo.toml.
- Introduced sqlite-federation crate with necessary dependencies and implementation.
- Updated activitypub crate to use new ActivityPubPort trait for better abstraction.
- Refactored event handling to utilize domain ports instead of direct dependencies.
- Adjusted presentation layer to accommodate new ActivityPub service structure.
- Removed unused test setup for ActivityPub service in favor of NoopActivityPubService.
- Cleaned up SQLite adapter to remove unnecessary dependencies and streamline functionality.
This commit is contained in:
2026-05-09 18:21:16 +02:00
parent 7a43eb4de6
commit 2120044f1a
21 changed files with 786 additions and 188 deletions

17
Cargo.lock generated
View File

@@ -12,7 +12,6 @@ dependencies = [
"async-trait", "async-trait",
"chrono", "chrono",
"domain", "domain",
"event-publisher",
"serde", "serde",
"serde_json", "serde_json",
"tokio", "tokio",
@@ -3066,6 +3065,7 @@ dependencies = [
"serde", "serde",
"serde_json", "serde_json",
"sqlite", "sqlite",
"sqlite-federation",
"sqlx", "sqlx",
"template-askama", "template-askama",
"thiserror 2.0.18", "thiserror 2.0.18",
@@ -4019,6 +4019,20 @@ dependencies = [
[[package]] [[package]]
name = "sqlite" name = "sqlite"
version = "0.1.0" version = "0.1.0"
dependencies = [
"anyhow",
"async-trait",
"chrono",
"domain",
"sqlx",
"tokio",
"tracing",
"uuid",
]
[[package]]
name = "sqlite-federation"
version = "0.1.0"
dependencies = [ dependencies = [
"activitypub", "activitypub",
"activitypub-base", "activitypub-base",
@@ -4027,7 +4041,6 @@ dependencies = [
"chrono", "chrono",
"domain", "domain",
"sqlx", "sqlx",
"tokio",
"tracing", "tracing",
"uuid", "uuid",
] ]

View File

@@ -7,6 +7,7 @@ members = [
"crates/adapters/poster-storage", "crates/adapters/poster-storage",
"crates/adapters/rss", "crates/adapters/rss",
"crates/adapters/sqlite", "crates/adapters/sqlite",
"crates/adapters/sqlite-federation",
"crates/adapters/template-askama", "crates/adapters/template-askama",
"crates/adapters/activitypub", "crates/adapters/activitypub",
"crates/adapters/activitypub-base", "crates/adapters/activitypub-base",
@@ -48,6 +49,7 @@ poster-storage = { path = "crates/adapters/poster-storage" }
event-publisher = { path = "crates/adapters/event-publisher" } event-publisher = { path = "crates/adapters/event-publisher" }
rss = { path = "crates/adapters/rss" } rss = { path = "crates/adapters/rss" }
sqlite = { path = "crates/adapters/sqlite" } sqlite = { path = "crates/adapters/sqlite" }
sqlite-federation = { path = "crates/adapters/sqlite-federation" }
template-askama = { path = "crates/adapters/template-askama" } template-askama = { path = "crates/adapters/template-askama" }
activitypub = { path = "crates/adapters/activitypub" } activitypub = { path = "crates/adapters/activitypub" }
activitypub-base = { path = "crates/adapters/activitypub-base" } activitypub-base = { path = "crates/adapters/activitypub-base" }

View File

@@ -16,6 +16,7 @@ COPY crates/adapters/poster-fetcher/Cargo.toml crates/adapters/poster-fetcher
COPY crates/adapters/poster-storage/Cargo.toml crates/adapters/poster-storage/Cargo.toml COPY crates/adapters/poster-storage/Cargo.toml crates/adapters/poster-storage/Cargo.toml
COPY crates/adapters/rss/Cargo.toml crates/adapters/rss/Cargo.toml COPY crates/adapters/rss/Cargo.toml crates/adapters/rss/Cargo.toml
COPY crates/adapters/sqlite/Cargo.toml crates/adapters/sqlite/Cargo.toml COPY crates/adapters/sqlite/Cargo.toml crates/adapters/sqlite/Cargo.toml
COPY crates/adapters/sqlite-federation/Cargo.toml crates/adapters/sqlite-federation/Cargo.toml
COPY crates/adapters/template-askama/Cargo.toml crates/adapters/template-askama/Cargo.toml COPY crates/adapters/template-askama/Cargo.toml crates/adapters/template-askama/Cargo.toml
COPY crates/application/Cargo.toml crates/application/Cargo.toml COPY crates/application/Cargo.toml crates/application/Cargo.toml
COPY crates/domain/Cargo.toml crates/domain/Cargo.toml COPY crates/domain/Cargo.toml crates/domain/Cargo.toml

View File

@@ -14,7 +14,6 @@ chrono = { workspace = true }
anyhow = { workspace = true } anyhow = { workspace = true }
tracing = { workspace = true } tracing = { workspace = true }
async-trait = { workspace = true } async-trait = { workspace = true }
event-publisher = { workspace = true }
activitypub_federation = "0.7.0-beta.11" activitypub_federation = "0.7.0-beta.11"
url = { version = "2", features = ["serde"] } url = { version = "2", features = ["serde"] }

View File

@@ -5,7 +5,7 @@ use domain::{
ports::MovieRepository, ports::MovieRepository,
value_objects::{ReviewId, UserId}, value_objects::{ReviewId, UserId},
}; };
use event_publisher::EventHandler; use domain::ports::EventHandler;
use std::sync::Arc; use std::sync::Arc;
use activitypub_base::ActivityPubService; use activitypub_base::ActivityPubService;

View File

@@ -1,5 +1,6 @@
pub mod event_handler; pub mod event_handler;
pub mod objects; pub mod objects;
pub mod port;
pub mod remote_review_repository; pub mod remote_review_repository;
pub mod review_handler; pub mod review_handler;
pub mod user_adapter; pub mod user_adapter;
@@ -12,6 +13,7 @@ pub use activitypub_base::{
}; };
pub use event_handler::ActivityPubEventHandler; pub use event_handler::ActivityPubEventHandler;
pub use port::{ActivityPubPort, NoopActivityPubService};
pub use remote_review_repository::RemoteReviewRepository; pub use remote_review_repository::RemoteReviewRepository;
pub use review_handler::ReviewObjectHandler; pub use review_handler::ReviewObjectHandler;
pub use user_adapter::DomainUserRepoAdapter; pub use user_adapter::DomainUserRepoAdapter;

View File

@@ -0,0 +1,73 @@
use async_trait::async_trait;
use uuid::Uuid;
use activitypub_base::{ActivityPubService, RemoteActor};
#[async_trait]
pub trait ActivityPubPort: Send + Sync {
async fn actor_json(&self, user_id: &str) -> anyhow::Result<String>;
async fn count_following(&self, local_user_id: Uuid) -> anyhow::Result<usize>;
async fn count_accepted_followers(&self, local_user_id: Uuid) -> anyhow::Result<usize>;
async fn get_pending_followers(&self, local_user_id: Uuid) -> anyhow::Result<Vec<RemoteActor>>;
async fn follow(&self, local_user_id: Uuid, handle: &str) -> anyhow::Result<()>;
async fn unfollow(&self, local_user_id: Uuid, actor_url: &str) -> anyhow::Result<()>;
async fn accept_follower(&self, local_user_id: Uuid, remote_actor_url: &str) -> anyhow::Result<()>;
async fn reject_follower(&self, local_user_id: Uuid, remote_actor_url: &str) -> anyhow::Result<()>;
async fn get_following(&self, local_user_id: Uuid) -> anyhow::Result<Vec<RemoteActor>>;
async fn get_accepted_followers(&self, local_user_id: Uuid) -> anyhow::Result<Vec<RemoteActor>>;
async fn remove_follower(&self, local_user_id: Uuid, actor_url: &str) -> anyhow::Result<()>;
}
#[async_trait]
impl ActivityPubPort for ActivityPubService {
async fn actor_json(&self, user_id: &str) -> anyhow::Result<String> {
self.actor_json(user_id).await
}
async fn count_following(&self, local_user_id: Uuid) -> anyhow::Result<usize> {
self.count_following(local_user_id).await
}
async fn count_accepted_followers(&self, local_user_id: Uuid) -> anyhow::Result<usize> {
self.count_accepted_followers(local_user_id).await
}
async fn get_pending_followers(&self, local_user_id: Uuid) -> anyhow::Result<Vec<RemoteActor>> {
self.get_pending_followers(local_user_id).await
}
async fn follow(&self, local_user_id: Uuid, handle: &str) -> anyhow::Result<()> {
self.follow(local_user_id, handle).await
}
async fn unfollow(&self, local_user_id: Uuid, actor_url: &str) -> anyhow::Result<()> {
self.unfollow(local_user_id, actor_url).await
}
async fn accept_follower(&self, local_user_id: Uuid, remote_actor_url: &str) -> anyhow::Result<()> {
self.accept_follower(local_user_id, remote_actor_url).await
}
async fn reject_follower(&self, local_user_id: Uuid, remote_actor_url: &str) -> anyhow::Result<()> {
self.reject_follower(local_user_id, remote_actor_url).await
}
async fn get_following(&self, local_user_id: Uuid) -> anyhow::Result<Vec<RemoteActor>> {
self.get_following(local_user_id).await
}
async fn get_accepted_followers(&self, local_user_id: Uuid) -> anyhow::Result<Vec<RemoteActor>> {
self.get_accepted_followers(local_user_id).await
}
async fn remove_follower(&self, local_user_id: Uuid, actor_url: &str) -> anyhow::Result<()> {
self.remove_follower(local_user_id, actor_url).await
}
}
pub struct NoopActivityPubService;
#[async_trait]
impl ActivityPubPort for NoopActivityPubService {
async fn actor_json(&self, _: &str) -> anyhow::Result<String> { Ok(String::new()) }
async fn count_following(&self, _: Uuid) -> anyhow::Result<usize> { Ok(0) }
async fn count_accepted_followers(&self, _: Uuid) -> anyhow::Result<usize> { Ok(0) }
async fn get_pending_followers(&self, _: Uuid) -> anyhow::Result<Vec<RemoteActor>> { Ok(vec![]) }
async fn follow(&self, _: Uuid, _: &str) -> anyhow::Result<()> { Ok(()) }
async fn unfollow(&self, _: Uuid, _: &str) -> anyhow::Result<()> { Ok(()) }
async fn accept_follower(&self, _: Uuid, _: &str) -> anyhow::Result<()> { Ok(()) }
async fn reject_follower(&self, _: Uuid, _: &str) -> anyhow::Result<()> { Ok(()) }
async fn get_following(&self, _: Uuid) -> anyhow::Result<Vec<RemoteActor>> { Ok(vec![]) }
async fn get_accepted_followers(&self, _: Uuid) -> anyhow::Result<Vec<RemoteActor>> { Ok(vec![]) }
async fn remove_follower(&self, _: Uuid, _: &str) -> anyhow::Result<()> { Ok(()) }
}

View File

@@ -2,6 +2,8 @@ use async_trait::async_trait;
use domain::{errors::DomainError, events::DomainEvent, ports::EventPublisher}; use domain::{errors::DomainError, events::DomainEvent, ports::EventPublisher};
use tokio::sync::mpsc; use tokio::sync::mpsc;
pub use domain::ports::EventHandler;
pub struct EventPublisherConfig { pub struct EventPublisherConfig {
pub channel_buffer: usize, pub channel_buffer: usize,
} }
@@ -16,11 +18,6 @@ impl EventPublisherConfig {
} }
} }
#[async_trait]
pub trait EventHandler: Send + Sync {
async fn handle(&self, event: &DomainEvent) -> Result<(), DomainError>;
}
pub struct ChannelEventPublisher { pub struct ChannelEventPublisher {
sender: mpsc::Sender<DomainEvent>, sender: mpsc::Sender<DomainEvent>,
} }

View File

@@ -0,0 +1,15 @@
[package]
name = "sqlite-federation"
version = "0.1.0"
edition = "2024"
[dependencies]
sqlx = { workspace = true }
activitypub = { workspace = true }
activitypub-base = { workspace = true }
domain = { workspace = true }
anyhow = { workspace = true }
uuid = { workspace = true }
chrono = { workspace = true }
tracing = { workspace = true }
async-trait = { workspace = true }

View File

@@ -0,0 +1,459 @@
use anyhow::{anyhow, Result};
use async_trait::async_trait;
use chrono::{NaiveDateTime, Utc};
use sqlx::{Row, SqlitePool};
use activitypub_base::{FederationRepository, Follower, FollowerStatus, FollowingStatus, RemoteActor};
use activitypub::RemoteReviewRepository;
use domain::models::{Review, ReviewSource};
fn datetime_to_str(dt: &NaiveDateTime) -> String {
dt.format("%Y-%m-%d %H:%M:%S").to_string()
}
pub struct SqliteFederationRepository {
pool: SqlitePool,
}
impl SqliteFederationRepository {
pub fn new(pool: SqlitePool) -> Self {
Self { pool }
}
}
fn status_to_str(status: &FollowerStatus) -> &'static str {
match status {
FollowerStatus::Pending => "pending",
FollowerStatus::Accepted => "accepted",
FollowerStatus::Rejected => "rejected",
}
}
fn str_to_status(s: &str) -> FollowerStatus {
match s {
"accepted" => FollowerStatus::Accepted,
"rejected" => FollowerStatus::Rejected,
_ => FollowerStatus::Pending,
}
}
#[async_trait]
impl FederationRepository for SqliteFederationRepository {
async fn add_follower(
&self,
local_user_id: uuid::Uuid,
remote_actor_url: &str,
status: FollowerStatus,
follow_activity_id: &str,
) -> Result<()> {
let uid = local_user_id.to_string();
let status_str = status_to_str(&status);
let now = Utc::now().naive_utc();
let created_at = datetime_to_str(&now);
sqlx::query(
"INSERT INTO ap_followers (local_user_id, remote_actor_url, status, created_at, follow_activity_id)
VALUES (?1, ?2, ?3, ?4, ?5)
ON CONFLICT(local_user_id, remote_actor_url) DO UPDATE SET
status = excluded.status,
follow_activity_id = excluded.follow_activity_id",
)
.bind(&uid)
.bind(remote_actor_url)
.bind(status_str)
.bind(&created_at)
.bind(follow_activity_id)
.execute(&self.pool)
.await?;
Ok(())
}
async fn get_follower_follow_activity_id(
&self,
local_user_id: uuid::Uuid,
remote_actor_url: &str,
) -> Result<Option<String>> {
let uid = local_user_id.to_string();
let row: Option<Option<String>> = sqlx::query_scalar(
"SELECT follow_activity_id FROM ap_followers WHERE local_user_id = ? AND remote_actor_url = ?",
)
.bind(&uid)
.bind(remote_actor_url)
.fetch_optional(&self.pool)
.await?;
Ok(row.flatten())
}
async fn remove_follower(&self, local_user_id: uuid::Uuid, remote_actor_url: &str) -> Result<()> {
let uid = local_user_id.to_string();
sqlx::query("DELETE FROM ap_followers WHERE local_user_id = ? AND remote_actor_url = ?")
.bind(&uid)
.bind(remote_actor_url)
.execute(&self.pool)
.await?;
Ok(())
}
async fn get_followers(&self, local_user_id: uuid::Uuid) -> Result<Vec<Follower>> {
let uid = local_user_id.to_string();
let rows = sqlx::query(
"SELECT f.remote_actor_url, f.status,
a.handle, a.inbox_url, a.shared_inbox_url, a.display_name
FROM ap_followers f
LEFT JOIN ap_remote_actors a ON a.url = f.remote_actor_url
WHERE f.local_user_id = ?",
)
.bind(&uid)
.fetch_all(&self.pool)
.await?;
let followers = rows
.into_iter()
.map(|row| {
let url: String = row.get("remote_actor_url");
let status_str: String = row.get("status");
let handle: String = row.try_get("handle").unwrap_or_default();
let inbox_url: String = row.try_get("inbox_url").unwrap_or_default();
let shared_inbox_url: Option<String> = row.try_get("shared_inbox_url").ok().flatten();
let display_name: Option<String> = row.try_get("display_name").ok().flatten();
Follower {
actor: RemoteActor { url, handle, inbox_url, shared_inbox_url, display_name },
status: str_to_status(&status_str),
}
})
.collect();
Ok(followers)
}
async fn update_follower_status(
&self,
local_user_id: uuid::Uuid,
remote_actor_url: &str,
status: FollowerStatus,
) -> Result<()> {
let uid = local_user_id.to_string();
let status_str = status_to_str(&status);
let result = sqlx::query(
"UPDATE ap_followers SET status = ? WHERE local_user_id = ? AND remote_actor_url = ?",
)
.bind(status_str)
.bind(&uid)
.bind(remote_actor_url)
.execute(&self.pool)
.await?;
if result.rows_affected() == 0 {
tracing::warn!(local_user_id = %local_user_id, remote_actor_url, "update_follower_status: no row found");
}
Ok(())
}
async fn add_following(&self, local_user_id: uuid::Uuid, actor: RemoteActor, follow_activity_id: &str) -> Result<()> {
let uid = local_user_id.to_string();
let now = Utc::now().naive_utc();
let created_at = datetime_to_str(&now);
self.upsert_remote_actor(actor.clone()).await?;
sqlx::query(
"INSERT OR IGNORE INTO ap_following (local_user_id, remote_actor_url, follow_activity_id, created_at)
VALUES (?, ?, ?, ?)",
)
.bind(&uid)
.bind(&actor.url)
.bind(follow_activity_id)
.bind(&created_at)
.execute(&self.pool)
.await?;
Ok(())
}
async fn get_follow_activity_id(&self, local_user_id: uuid::Uuid, remote_actor_url: &str) -> Result<Option<String>> {
let uid = local_user_id.to_string();
let row: Option<Option<String>> = sqlx::query_scalar(
"SELECT follow_activity_id FROM ap_following WHERE local_user_id = ? AND remote_actor_url = ?",
)
.bind(&uid)
.bind(remote_actor_url)
.fetch_optional(&self.pool)
.await?;
Ok(row.flatten())
}
async fn remove_following(&self, local_user_id: uuid::Uuid, actor_url: &str) -> Result<()> {
let uid = local_user_id.to_string();
sqlx::query("DELETE FROM ap_following WHERE local_user_id = ? AND remote_actor_url = ?")
.bind(&uid)
.bind(actor_url)
.execute(&self.pool)
.await?;
Ok(())
}
async fn get_following(&self, local_user_id: uuid::Uuid) -> Result<Vec<RemoteActor>> {
let uid = local_user_id.to_string();
let rows = sqlx::query(
"SELECT a.url, a.handle, a.inbox_url, a.shared_inbox_url, a.display_name
FROM ap_following f
INNER JOIN ap_remote_actors a ON a.url = f.remote_actor_url
WHERE f.local_user_id = ? AND f.status = 'accepted'",
)
.bind(&uid)
.fetch_all(&self.pool)
.await?;
Ok(rows.into_iter().map(|row| RemoteActor {
url: row.get("url"),
handle: row.get("handle"),
inbox_url: row.get("inbox_url"),
shared_inbox_url: row.try_get("shared_inbox_url").ok().flatten(),
display_name: row.try_get("display_name").ok().flatten(),
}).collect())
}
async fn count_following(&self, local_user_id: uuid::Uuid) -> Result<usize> {
let uid = local_user_id.to_string();
let count: i64 = sqlx::query_scalar(
"SELECT COUNT(*) FROM ap_following WHERE local_user_id = ? AND status = 'accepted'",
)
.bind(&uid)
.fetch_one(&self.pool)
.await?;
Ok(count as usize)
}
async fn upsert_remote_actor(&self, actor: RemoteActor) -> Result<()> {
let now = Utc::now().naive_utc();
let fetched_at = datetime_to_str(&now);
sqlx::query(
"INSERT INTO ap_remote_actors (url, handle, inbox_url, shared_inbox_url, display_name, fetched_at)
VALUES (?, ?, ?, ?, ?, ?)
ON CONFLICT(url) DO UPDATE SET
handle = excluded.handle,
inbox_url = excluded.inbox_url,
shared_inbox_url = excluded.shared_inbox_url,
display_name = excluded.display_name,
fetched_at = excluded.fetched_at",
)
.bind(&actor.url)
.bind(&actor.handle)
.bind(&actor.inbox_url)
.bind(&actor.shared_inbox_url)
.bind(&actor.display_name)
.bind(&fetched_at)
.execute(&self.pool)
.await?;
Ok(())
}
async fn get_remote_actor(&self, actor_url: &str) -> Result<Option<RemoteActor>> {
let row = sqlx::query(
"SELECT url, handle, inbox_url, shared_inbox_url, display_name
FROM ap_remote_actors WHERE url = ?",
)
.bind(actor_url)
.fetch_optional(&self.pool)
.await?;
Ok(row.map(|row| RemoteActor {
url: row.get("url"),
handle: row.get("handle"),
inbox_url: row.get("inbox_url"),
shared_inbox_url: row.try_get("shared_inbox_url").ok().flatten(),
display_name: row.try_get("display_name").ok().flatten(),
}))
}
async fn get_local_actor_keypair(&self, user_id: uuid::Uuid) -> Result<Option<(String, String)>> {
let uid = user_id.to_string();
let row = sqlx::query("SELECT public_key, private_key FROM ap_local_actors WHERE user_id = ?")
.bind(&uid)
.fetch_optional(&self.pool)
.await?;
Ok(row.map(|r| (r.get("public_key"), r.get("private_key"))))
}
async fn save_local_actor_keypair(&self, user_id: uuid::Uuid, public_key: String, private_key: String) -> Result<()> {
let uid = user_id.to_string();
let now = Utc::now().naive_utc();
let created_at = datetime_to_str(&now);
sqlx::query(
"INSERT INTO ap_local_actors (user_id, public_key, private_key, created_at)
VALUES (?, ?, ?, ?)
ON CONFLICT(user_id) DO UPDATE SET
public_key = excluded.public_key,
private_key = excluded.private_key",
)
.bind(&uid)
.bind(&public_key)
.bind(&private_key)
.bind(&created_at)
.execute(&self.pool)
.await?;
Ok(())
}
async fn get_pending_followers(&self, local_user_id: uuid::Uuid) -> Result<Vec<RemoteActor>> {
let uid = local_user_id.to_string();
let rows = sqlx::query(
"SELECT f.remote_actor_url,
a.handle, a.inbox_url, a.shared_inbox_url, a.display_name
FROM ap_followers f
LEFT JOIN ap_remote_actors a ON a.url = f.remote_actor_url
WHERE f.local_user_id = ? AND f.status = 'pending'",
)
.bind(&uid)
.fetch_all(&self.pool)
.await?;
Ok(rows.into_iter().map(|row| RemoteActor {
url: row.get("remote_actor_url"),
handle: row.try_get("handle").unwrap_or_default(),
inbox_url: row.try_get("inbox_url").unwrap_or_default(),
shared_inbox_url: row.try_get("shared_inbox_url").ok().flatten(),
display_name: row.try_get("display_name").ok().flatten(),
}).collect())
}
async fn update_following_status(
&self,
local_user_id: uuid::Uuid,
remote_actor_url: &str,
status: FollowingStatus,
) -> Result<()> {
let uid = local_user_id.to_string();
let status_str = match status {
FollowingStatus::Pending => "pending",
FollowingStatus::Accepted => "accepted",
};
let result = sqlx::query(
"UPDATE ap_following SET status = ? WHERE local_user_id = ? AND remote_actor_url = ?",
)
.bind(status_str)
.bind(&uid)
.bind(remote_actor_url)
.execute(&self.pool)
.await?;
if result.rows_affected() == 0 {
tracing::warn!(local_user_id = %local_user_id, remote_actor_url, "update_following_status: no row found");
}
Ok(())
}
}
// --- Content-specific repository (movies-diary) ---
#[async_trait]
impl RemoteReviewRepository for SqliteFederationRepository {
async fn save_remote_review(
&self,
review: &Review,
ap_id: &str,
movie_title: &str,
release_year: u16,
poster_url: Option<&str>,
) -> Result<()> {
let actor_url = match review.source() {
ReviewSource::Remote { actor_url } => actor_url.clone(),
ReviewSource::Local => {
return Err(anyhow!("save_remote_review called with a local review"));
}
};
let movie_id = review.movie_id().value().to_string();
let _ = sqlx::query(
"INSERT INTO movies (id, external_metadata_id, title, release_year, director, poster_path)
VALUES (?, NULL, ?, ?, NULL, ?)
ON CONFLICT(id) DO UPDATE SET
poster_path = COALESCE(excluded.poster_path, movies.poster_path)",
)
.bind(&movie_id)
.bind(movie_title)
.bind(release_year.max(1888) as i64)
.bind(poster_url)
.execute(&self.pool)
.await?;
let id = review.id().value().to_string();
let user_id = review.user_id().value().to_string();
let rating = review.rating().value() as i64;
let comment = review.comment().map(|c| c.value().to_string());
let watched_at = datetime_to_str(review.watched_at());
let created_at = datetime_to_str(review.created_at());
sqlx::query(
"INSERT OR IGNORE INTO reviews (id, movie_id, user_id, rating, comment, watched_at, created_at, remote_actor_url, ap_id)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)",
)
.bind(&id)
.bind(&movie_id)
.bind(&user_id)
.bind(rating)
.bind(&comment)
.bind(&watched_at)
.bind(&created_at)
.bind(&actor_url)
.bind(ap_id)
.execute(&self.pool)
.await?;
Ok(())
}
async fn delete_remote_review(&self, ap_id: &str, actor_url: &str) -> Result<()> {
sqlx::query("DELETE FROM reviews WHERE ap_id = ? AND remote_actor_url = ?")
.bind(ap_id)
.bind(actor_url)
.execute(&self.pool)
.await?;
Ok(())
}
async fn update_remote_review(
&self,
ap_id: &str,
actor_url: &str,
rating: u8,
comment: Option<&str>,
watched_at: chrono::NaiveDateTime,
) -> Result<()> {
let watched_at_str = datetime_to_str(&watched_at);
sqlx::query(
"UPDATE reviews SET rating = ?, comment = ?, watched_at = ?
WHERE ap_id = ? AND remote_actor_url = ?",
)
.bind(rating as i64)
.bind(comment)
.bind(&watched_at_str)
.bind(ap_id)
.bind(actor_url)
.execute(&self.pool)
.await?;
Ok(())
}
async fn delete_by_actor(&self, actor_url: &str) -> Result<()> {
sqlx::query("DELETE FROM reviews WHERE remote_actor_url = ?")
.bind(actor_url)
.execute(&self.pool)
.await?;
Ok(())
}
}

View File

@@ -12,8 +12,6 @@ sqlx = { version = "0.8.6", features = [
] } ] }
domain = { workspace = true } domain = { workspace = true }
activitypub = { workspace = true }
activitypub-base = { workspace = true }
anyhow = { workspace = true } anyhow = { workspace = true }
uuid = { workspace = true } uuid = { workspace = true }
chrono = { workspace = true } chrono = { workspace = true }

View File

@@ -12,7 +12,6 @@ use domain::{
}; };
use sqlx::SqlitePool; use sqlx::SqlitePool;
mod federation;
mod migrations; mod migrations;
mod models; mod models;
mod users; mod users;
@@ -22,7 +21,6 @@ use models::{
UserTotalsRow, datetime_to_str, UserTotalsRow, datetime_to_str,
}; };
pub use federation::SqliteFederationRepository;
pub use users::SqliteUserRepository; pub use users::SqliteUserRepository;
fn format_year_month(ym: &str) -> String { fn format_year_month(ym: &str) -> String {

View File

@@ -127,3 +127,8 @@ pub trait PasswordHasher: Send + Sync {
pub trait DiaryExporter: Send + Sync { pub trait DiaryExporter: Send + Sync {
async fn serialize_reviews(&self, reviews: &[Review]) -> Result<Vec<u8>, DomainError>; async fn serialize_reviews(&self, reviews: &[Review]) -> Result<Vec<u8>, DomainError>;
} }
#[async_trait]
pub trait EventHandler: Send + Sync {
async fn handle(&self, event: &DomainEvent) -> Result<(), DomainError>;
}

View File

@@ -26,6 +26,7 @@ metadata = { workspace = true }
poster-fetcher = { workspace = true } poster-fetcher = { workspace = true }
poster-storage = { workspace = true } poster-storage = { workspace = true }
sqlite = { workspace = true } sqlite = { workspace = true }
sqlite-federation = { workspace = true }
activitypub = { workspace = true } activitypub = { workspace = true }
sqlx = { workspace = true } sqlx = { workspace = true }
template-askama = { workspace = true } template-askama = { workspace = true }

View File

@@ -3,7 +3,7 @@ use std::time::Duration;
use application::{commands::SyncPosterCommand, context::AppContext, use_cases::sync_poster}; use application::{commands::SyncPosterCommand, context::AppContext, use_cases::sync_poster};
use async_trait::async_trait; use async_trait::async_trait;
use domain::{errors::DomainError, events::DomainEvent}; use domain::{errors::DomainError, events::DomainEvent};
use event_publisher::EventHandler; use domain::ports::EventHandler;
pub struct PosterSyncHandler { pub struct PosterSyncHandler {
ctx: AppContext, ctx: AppContext,

View File

@@ -181,7 +181,7 @@ mod tests {
}, },
html_renderer: Arc::new(PanicRenderer), html_renderer: Arc::new(PanicRenderer),
rss_renderer: Arc::new(PanicRssRenderer), rss_renderer: Arc::new(PanicRssRenderer),
ap_service: test_ap_service().await, ap_service: std::sync::Arc::new(activitypub::NoopActivityPubService),
}; };
let app = test_router(state); let app = test_router(state);
@@ -231,49 +231,6 @@ mod tests {
} }
} }
async fn test_ap_service() -> std::sync::Arc<activitypub::ActivityPubService> {
use std::sync::Arc;
let pool = sqlx::SqlitePool::connect("sqlite::memory:").await.unwrap();
sqlx::query("CREATE TABLE IF NOT EXISTS ap_keypairs (user_id TEXT PRIMARY KEY, public_key TEXT NOT NULL, private_key TEXT NOT NULL)")
.execute(&pool).await.unwrap();
sqlx::query("CREATE TABLE IF NOT EXISTS ap_remote_actors (url TEXT PRIMARY KEY, handle TEXT NOT NULL, inbox_url TEXT NOT NULL, shared_inbox_url TEXT, display_name TEXT)")
.execute(&pool).await.unwrap();
sqlx::query("CREATE TABLE IF NOT EXISTS ap_followers (local_user_id TEXT NOT NULL, remote_actor_url TEXT NOT NULL, status TEXT NOT NULL DEFAULT 'pending', PRIMARY KEY (local_user_id, remote_actor_url))")
.execute(&pool).await.unwrap();
sqlx::query("CREATE TABLE IF NOT EXISTS ap_following (local_user_id TEXT NOT NULL, remote_actor_url TEXT NOT NULL, PRIMARY KEY (local_user_id, remote_actor_url))")
.execute(&pool).await.unwrap();
let fed_repo = Arc::new(sqlite::SqliteFederationRepository::new(pool));
struct DummyApUserRepo;
#[async_trait::async_trait]
impl activitypub::ApUserRepository for DummyApUserRepo {
async fn find_by_id(&self, _: uuid::Uuid) -> anyhow::Result<Option<activitypub::ApUser>> { Ok(None) }
async fn find_by_username(&self, _: &str) -> anyhow::Result<Option<activitypub::ApUser>> { Ok(None) }
}
struct DummyObjectHandler;
#[async_trait::async_trait]
impl activitypub::ApObjectHandler for DummyObjectHandler {
async fn get_local_objects_for_user(&self, _: uuid::Uuid) -> anyhow::Result<Vec<(url::Url, serde_json::Value)>> { Ok(vec![]) }
async fn on_create(&self, _: &url::Url, _: &url::Url, _: serde_json::Value) -> anyhow::Result<()> { Ok(()) }
async fn on_update(&self, _: &url::Url, _: &url::Url, _: serde_json::Value) -> anyhow::Result<()> { Ok(()) }
async fn on_delete(&self, _: &url::Url, _: &url::Url) -> anyhow::Result<()> { Ok(()) }
async fn on_actor_removed(&self, _: &url::Url) -> anyhow::Result<()> { Ok(()) }
}
Arc::new(
activitypub::ActivityPubService::new(
fed_repo,
Arc::new(DummyApUserRepo),
Arc::new(DummyObjectHandler),
"http://localhost:3000".to_string(),
true,
)
.await
.unwrap(),
)
}
async fn panic_state() -> crate::state::AppState { async fn panic_state() -> crate::state::AppState {
use std::sync::Arc; use std::sync::Arc;
use application::context::AppContext; use application::context::AppContext;
@@ -334,7 +291,7 @@ mod tests {
}, },
html_renderer: Arc::new(PanicRenderer2), html_renderer: Arc::new(PanicRenderer2),
rss_renderer: Arc::new(PanicRssRenderer2), rss_renderer: Arc::new(PanicRssRenderer2),
ap_service: test_ap_service().await, ap_service: std::sync::Arc::new(activitypub::NoopActivityPubService),
} }
} }
@@ -396,7 +353,7 @@ mod tests {
}, },
html_renderer: Arc::new(PanicRenderer3), html_renderer: Arc::new(PanicRenderer3),
rss_renderer: Arc::new(PanicRssRenderer3), rss_renderer: Arc::new(PanicRssRenderer3),
ap_service: test_ap_service().await, ap_service: std::sync::Arc::new(activitypub::NoopActivityPubService),
} }
} }

View File

@@ -5,23 +5,29 @@ pub mod html {
use std::str::FromStr; use std::str::FromStr;
use axum::{ use axum::{
Form,
extract::{Path, Query, State}, extract::{Path, Query, State},
http::{HeaderValue, StatusCode, header::SET_COOKIE}, http::{HeaderValue, StatusCode, header::SET_COOKIE},
response::{Html, IntoResponse, Redirect}, response::{Html, IntoResponse, Redirect},
Form,
}; };
use chrono::Utc; use chrono::Utc;
use uuid::Uuid; use uuid::Uuid;
use application::{ use application::{
commands::{DeleteReviewCommand, LoginCommand, RegisterCommand}, commands::{DeleteReviewCommand, LoginCommand, RegisterCommand},
ports::{FollowersPageData, FollowingPageData, HtmlPageContext, LoginPageData, NewReviewPageData, RegisterPageData, RemoteActorView}, ports::{
FollowersPageData, FollowingPageData, HtmlPageContext, LoginPageData,
NewReviewPageData, RegisterPageData, RemoteActorView,
},
use_cases::{delete_review, log_review, login as login_uc, register as register_uc}, use_cases::{delete_review, log_review, login as login_uc, register as register_uc},
}; };
use domain::{errors::DomainError, value_objects::UserId}; use domain::{errors::DomainError, value_objects::UserId};
use crate::{ use crate::{
dtos::{DiaryQueryParams, ErrorQuery, FollowForm, FollowerActionForm, LoginForm, LogReviewData, LogReviewForm, RegisterForm, UnfollowForm}, dtos::{
DiaryQueryParams, ErrorQuery, FollowForm, FollowerActionForm, LogReviewData,
LogReviewForm, LoginForm, RegisterForm, UnfollowForm,
},
extractors::{OptionalCookieUser, RequiredCookieUser}, extractors::{OptionalCookieUser, RequiredCookieUser},
state::AppState, state::AppState,
}; };
@@ -56,15 +62,24 @@ pub mod html {
} }
fn secure_flag() -> &'static str { fn secure_flag() -> &'static str {
if std::env::var("SECURE_COOKIES").as_deref() == Ok("true") { "; Secure" } else { "" } if std::env::var("SECURE_COOKIES").as_deref() == Ok("true") {
"; Secure"
} else {
""
}
} }
fn set_cookie_header(token: &str, max_age: i64) -> (axum::http::HeaderName, HeaderValue) { fn set_cookie_header(token: &str, max_age: i64) -> (axum::http::HeaderName, HeaderValue) {
let val = format!( let val = format!(
"token={}; HttpOnly; Path=/; SameSite=Strict; Max-Age={}{}", "token={}; HttpOnly; Path=/; SameSite=Strict; Max-Age={}{}",
token, max_age, secure_flag() token,
max_age,
secure_flag()
); );
(SET_COOKIE, HeaderValue::from_str(&val).expect("valid cookie")) (
SET_COOKIE,
HeaderValue::from_str(&val).expect("valid cookie"),
)
} }
pub async fn get_login_page( pub async fn get_login_page(
@@ -112,8 +127,14 @@ pub mod html {
} }
pub async fn get_logout() -> impl IntoResponse { pub async fn get_logout() -> impl IntoResponse {
let val = format!("token=; HttpOnly; Path=/; SameSite=Strict; Max-Age=0{}", secure_flag()); let val = format!(
let cookie = (SET_COOKIE, HeaderValue::from_str(&val).expect("valid cookie")); "token=; HttpOnly; Path=/; SameSite=Strict; Max-Age=0{}",
secure_flag()
);
let cookie = (
SET_COOKIE,
HeaderValue::from_str(&val).expect("valid cookie"),
);
([cookie], Redirect::to("/")).into_response() ([cookie], Redirect::to("/")).into_response()
} }
@@ -171,9 +192,8 @@ pub mod html {
Err(_) => Redirect::to("/login").into_response(), Err(_) => Redirect::to("/login").into_response(),
} }
} }
Err(_) => { Err(_) => Redirect::to("/register?error=Registration+failed.+Please+try+again.")
Redirect::to("/register?error=Registration+failed.+Please+try+again.").into_response() .into_response(),
}
} }
} }
@@ -203,7 +223,7 @@ pub mod html {
let data = match LogReviewData::try_from(form) { let data = match LogReviewData::try_from(form) {
Ok(d) => d, Ok(d) => d,
Err(_) => { Err(_) => {
return Redirect::to("/reviews/new?error=Invalid+date+format").into_response() return Redirect::to("/reviews/new?error=Invalid+date+format").into_response();
} }
}; };
@@ -230,7 +250,9 @@ pub mod html {
Ok(()) => { Ok(()) => {
let redirect_url = form let redirect_url = form
.redirect_after .redirect_after
.filter(|url| (url.starts_with('/') && !url.starts_with("//")) || url.starts_with('?')) .filter(|url| {
(url.starts_with('/') && !url.starts_with("//")) || url.starts_with('?')
})
.unwrap_or_else(|| "/".to_string()); .unwrap_or_else(|| "/".to_string());
Redirect::to(&redirect_url).into_response() Redirect::to(&redirect_url).into_response()
} }
@@ -281,7 +303,12 @@ pub mod html {
let mut ctx = build_page_context(&state, user_id).await; let mut ctx = build_page_context(&state, user_id).await;
ctx.page_title = "Members — Movies Diary".to_string(); ctx.page_title = "Members — Movies Diary".to_string();
ctx.canonical_url = format!("{}/users", state.app_ctx.config.base_url); ctx.canonical_url = format!("{}/users", state.app_ctx.config.base_url);
match application::use_cases::get_users::execute(&state.app_ctx, application::queries::GetUsersQuery).await { match application::use_cases::get_users::execute(
&state.app_ctx,
application::queries::GetUsersQuery,
)
.await
{
Ok(users) => { Ok(users) => {
let data = application::ports::UsersPageData { ctx, users }; let data = application::ports::UsersPageData { ctx, users };
match state.html_renderer.render_users_page(data) { match state.html_renderer.render_users_page(data) {
@@ -301,15 +328,24 @@ pub mod html {
Query(params): Query<crate::dtos::ProfileQueryParams>, Query(params): Query<crate::dtos::ProfileQueryParams>,
) -> impl IntoResponse { ) -> impl IntoResponse {
// Content negotiation: AP clients request application/activity+json // Content negotiation: AP clients request application/activity+json
let accept = headers.get(axum::http::header::ACCEPT) let accept = headers
.get(axum::http::header::ACCEPT)
.and_then(|v| v.to_str().ok()) .and_then(|v| v.to_str().ok())
.unwrap_or(""); .unwrap_or("");
if accept.contains("application/activity+json") || accept.contains("application/ld+json") { if accept.contains("application/activity+json") || accept.contains("application/ld+json") {
return match state.ap_service.actor_json(&profile_user_uuid.to_string()).await { return match state
.ap_service
.actor_json(&profile_user_uuid.to_string())
.await
{
Ok(json) => ( Ok(json) => (
[(axum::http::header::CONTENT_TYPE, "application/activity+json")], [(
axum::http::header::CONTENT_TYPE,
"application/activity+json",
)],
json, json,
).into_response(), )
.into_response(),
Err(_) => StatusCode::NOT_FOUND.into_response(), Err(_) => StatusCode::NOT_FOUND.into_response(),
}; };
} }
@@ -318,10 +354,18 @@ pub mod html {
let view_str = params.view.as_deref().unwrap_or("recent"); let view_str = params.view.as_deref().unwrap_or("recent");
let profile_view = match application::queries::ProfileView::from_str(view_str) { let profile_view = match application::queries::ProfileView::from_str(view_str) {
Ok(v) => v, Ok(v) => v,
Err(_) => return (axum::http::StatusCode::BAD_REQUEST, "invalid view parameter").into_response(), Err(_) => {
return (
axum::http::StatusCode::BAD_REQUEST,
"invalid view parameter",
)
.into_response();
}
}; };
let profile_user = match state.app_ctx.user_repository let profile_user = match state
.app_ctx
.user_repository
.find_by_id(&domain::value_objects::UserId::from_uuid(profile_user_uuid)) .find_by_id(&domain::value_objects::UserId::from_uuid(profile_user_uuid))
.await .await
{ {
@@ -332,15 +376,23 @@ pub mod html {
let display_name = profile_user.username().value(); let display_name = profile_user.username().value();
ctx.page_title = format!("{}'s Diary — Movies Diary", display_name); ctx.page_title = format!("{}'s Diary — Movies Diary", display_name);
ctx.canonical_url = format!("{}/users/{}", state.app_ctx.config.base_url, profile_user_uuid); ctx.canonical_url = format!(
"{}/users/{}",
state.app_ctx.config.base_url, profile_user_uuid
);
let is_own_profile = user_id.as_ref() let is_own_profile = user_id
.as_ref()
.map(|u| u.value() == profile_user_uuid) .map(|u| u.value() == profile_user_uuid)
.unwrap_or(false); .unwrap_or(false);
let following_count = if is_own_profile { let following_count = if is_own_profile {
if let Some(ref uid) = user_id { if let Some(ref uid) = user_id {
state.ap_service.count_following(uid.value()).await.unwrap_or(0) state
.ap_service
.count_following(uid.value())
.await
.unwrap_or(0)
} else { } else {
0 0
} }
@@ -349,7 +401,8 @@ pub mod html {
}; };
let followers_count = if is_own_profile { let followers_count = if is_own_profile {
state.ap_service state
.ap_service
.count_accepted_followers(profile_user_uuid) .count_accepted_followers(profile_user_uuid)
.await .await
.unwrap_or(0) .unwrap_or(0)
@@ -358,7 +411,8 @@ pub mod html {
}; };
let pending_followers = if is_own_profile { let pending_followers = if is_own_profile {
state.ap_service state
.ap_service
.get_pending_followers(profile_user_uuid) .get_pending_followers(profile_user_uuid)
.await .await
.unwrap_or_default() .unwrap_or_default()
@@ -382,9 +436,12 @@ pub mod html {
match application::use_cases::get_user_profile::execute(&state.app_ctx, query).await { match application::use_cases::get_user_profile::execute(&state.app_ctx, query).await {
Ok(profile) => { Ok(profile) => {
let (offset, has_more, limit) = profile.entries.as_ref() let (offset, has_more, limit) = profile
.entries
.as_ref()
.map(|e| { .map(|e| {
let has_more = (e.offset as u64).saturating_add(e.limit as u64) < e.total_count; let has_more =
(e.offset as u64).saturating_add(e.limit as u64) < e.total_count;
(e.offset, has_more, e.limit) (e.offset, has_more, e.limit)
}) })
.unwrap_or((0, false, super::DEFAULT_PAGE_LIMIT)); .unwrap_or((0, false, super::DEFAULT_PAGE_LIMIT));
@@ -444,11 +501,20 @@ pub mod html {
if user_id.value() != profile_user_uuid { if user_id.value() != profile_user_uuid {
return StatusCode::FORBIDDEN.into_response(); return StatusCode::FORBIDDEN.into_response();
} }
match state.ap_service.unfollow(user_id.value(), &form.actor_url).await { match state
Ok(()) => Redirect::to(&format!("/users/{}/following-list", profile_user_uuid)).into_response(), .ap_service
.unfollow(user_id.value(), &form.actor_url)
.await
{
Ok(()) => Redirect::to(&format!("/users/{}/following-list", profile_user_uuid))
.into_response(),
Err(e) => { Err(e) => {
let msg = encode_error(&e.to_string()); let msg = encode_error(&e.to_string());
Redirect::to(&format!("/users/{}/following-list?error={}", profile_user_uuid, msg)).into_response() Redirect::to(&format!(
"/users/{}/following-list?error={}",
profile_user_uuid, msg
))
.into_response()
} }
} }
} }
@@ -462,7 +528,11 @@ pub mod html {
if user_id.value() != profile_user_uuid { if user_id.value() != profile_user_uuid {
return StatusCode::FORBIDDEN.into_response(); return StatusCode::FORBIDDEN.into_response();
} }
match state.ap_service.accept_follower(user_id.value(), &form.actor_url).await { match state
.ap_service
.accept_follower(user_id.value(), &form.actor_url)
.await
{
Ok(_) => Redirect::to(&format!("/users/{}", profile_user_uuid)).into_response(), Ok(_) => Redirect::to(&format!("/users/{}", profile_user_uuid)).into_response(),
Err(e) => { Err(e) => {
let msg = encode_error(&e.to_string()); let msg = encode_error(&e.to_string());
@@ -480,7 +550,11 @@ pub mod html {
if user_id.value() != profile_user_uuid { if user_id.value() != profile_user_uuid {
return StatusCode::FORBIDDEN.into_response(); return StatusCode::FORBIDDEN.into_response();
} }
match state.ap_service.reject_follower(user_id.value(), &form.actor_url).await { match state
.ap_service
.reject_follower(user_id.value(), &form.actor_url)
.await
{
Ok(_) => Redirect::to(&format!("/users/{}", profile_user_uuid)).into_response(), Ok(_) => Redirect::to(&format!("/users/{}", profile_user_uuid)).into_response(),
Err(e) => { Err(e) => {
let msg = encode_error(&e.to_string()); let msg = encode_error(&e.to_string());
@@ -500,14 +574,20 @@ pub mod html {
} }
let mut ctx = build_page_context(&state, Some(user_id.clone())).await; let mut ctx = build_page_context(&state, Some(user_id.clone())).await;
ctx.page_title = "Following — Movies Diary".to_string(); ctx.page_title = "Following — Movies Diary".to_string();
ctx.canonical_url = format!("{}/users/{}/following-list", state.app_ctx.config.base_url, profile_user_uuid); ctx.canonical_url = format!(
"{}/users/{}/following-list",
state.app_ctx.config.base_url, profile_user_uuid
);
match state.ap_service.get_following(user_id.value()).await { match state.ap_service.get_following(user_id.value()).await {
Ok(following) => { Ok(following) => {
let actors = following.into_iter().map(|a| RemoteActorView { let actors = following
handle: a.handle, .into_iter()
display_name: a.display_name, .map(|a| RemoteActorView {
url: a.url, handle: a.handle,
}).collect(); display_name: a.display_name,
url: a.url,
})
.collect();
let data = FollowingPageData { let data = FollowingPageData {
ctx, ctx,
user_id: profile_user_uuid, user_id: profile_user_uuid,
@@ -521,7 +601,11 @@ pub mod html {
} }
Err(e) => { Err(e) => {
tracing::error!("get_following error: {:?}", e); tracing::error!("get_following error: {:?}", e);
(StatusCode::INTERNAL_SERVER_ERROR, "Failed to load following list").into_response() (
StatusCode::INTERNAL_SERVER_ERROR,
"Failed to load following list",
)
.into_response()
} }
} }
} }
@@ -537,14 +621,24 @@ pub mod html {
} }
let mut ctx = build_page_context(&state, Some(user_id.clone())).await; let mut ctx = build_page_context(&state, Some(user_id.clone())).await;
ctx.page_title = "Followers — Movies Diary".to_string(); ctx.page_title = "Followers — Movies Diary".to_string();
ctx.canonical_url = format!("{}/users/{}/followers-list", state.app_ctx.config.base_url, profile_user_uuid); ctx.canonical_url = format!(
match state.ap_service.get_accepted_followers(user_id.value()).await { "{}/users/{}/followers-list",
state.app_ctx.config.base_url, profile_user_uuid
);
match state
.ap_service
.get_accepted_followers(user_id.value())
.await
{
Ok(followers) => { Ok(followers) => {
let actors = followers.into_iter().map(|a| RemoteActorView { let actors = followers
handle: a.handle, .into_iter()
display_name: a.display_name, .map(|a| RemoteActorView {
url: a.url, handle: a.handle,
}).collect(); display_name: a.display_name,
url: a.url,
})
.collect();
let data = FollowersPageData { let data = FollowersPageData {
ctx, ctx,
user_id: profile_user_uuid, user_id: profile_user_uuid,
@@ -558,7 +652,11 @@ pub mod html {
} }
Err(e) => { Err(e) => {
tracing::error!("get_followers error: {:?}", e); tracing::error!("get_followers error: {:?}", e);
(StatusCode::INTERNAL_SERVER_ERROR, "Failed to load followers list").into_response() (
StatusCode::INTERNAL_SERVER_ERROR,
"Failed to load followers list",
)
.into_response()
} }
} }
} }
@@ -572,11 +670,20 @@ pub mod html {
if user_id.value() != profile_user_uuid { if user_id.value() != profile_user_uuid {
return StatusCode::FORBIDDEN.into_response(); return StatusCode::FORBIDDEN.into_response();
} }
match state.ap_service.remove_follower(user_id.value(), &form.actor_url).await { match state
Ok(_) => Redirect::to(&format!("/users/{}/followers-list", profile_user_uuid)).into_response(), .ap_service
.remove_follower(user_id.value(), &form.actor_url)
.await
{
Ok(_) => Redirect::to(&format!("/users/{}/followers-list", profile_user_uuid))
.into_response(),
Err(e) => { Err(e) => {
let msg = encode_error(&e.to_string()); let msg = encode_error(&e.to_string());
Redirect::to(&format!("/users/{}/followers-list?error={}", profile_user_uuid, msg)).into_response() Redirect::to(&format!(
"/users/{}/followers-list?error={}",
profile_user_uuid, msg
))
.into_response()
} }
} }
} }
@@ -644,7 +751,10 @@ pub mod rss {
.rss_renderer .rss_renderer
.render_feed(&page.items, "Movie Diary") .render_feed(&page.items, "Movie Diary")
.map_err(|e| ApiError(DomainError::InfrastructureError(e)))?; .map_err(|e| ApiError(DomainError::InfrastructureError(e)))?;
Ok(([(header::CONTENT_TYPE, "application/rss+xml; charset=utf-8")], xml)) Ok((
[(header::CONTENT_TYPE, "application/rss+xml; charset=utf-8")],
xml,
))
} }
pub async fn get_user_feed( pub async fn get_user_feed(
@@ -676,7 +786,10 @@ pub mod rss {
.render_feed(&page.items, &title) .render_feed(&page.items, &title)
.map_err(|e| ApiError(DomainError::InfrastructureError(e)))?; .map_err(|e| ApiError(DomainError::InfrastructureError(e)))?;
Ok(([(header::CONTENT_TYPE, "application/rss+xml; charset=utf-8")], xml)) Ok((
[(header::CONTENT_TYPE, "application/rss+xml; charset=utf-8")],
xml,
))
} }
} }
@@ -692,7 +805,10 @@ pub mod api {
use application::{ use application::{
commands::{DeleteReviewCommand, LoginCommand, RegisterCommand, SyncPosterCommand}, commands::{DeleteReviewCommand, LoginCommand, RegisterCommand, SyncPosterCommand},
queries::GetReviewHistoryQuery, queries::GetReviewHistoryQuery,
use_cases::{delete_review, get_diary, get_review_history, log_review, login as login_uc, register as register_uc, sync_poster}, use_cases::{
delete_review, get_diary, get_review_history, log_review, login as login_uc,
register as register_uc, sync_poster,
},
}; };
use domain::{ use domain::{
errors::DomainError, errors::DomainError,
@@ -703,8 +819,8 @@ pub mod api {
use crate::{ use crate::{
dtos::{ dtos::{
DiaryEntryDto, DiaryQueryParams, DiaryResponse, LoginRequest, LoginResponse, DiaryEntryDto, DiaryQueryParams, DiaryResponse, LogReviewData, LogReviewRequest,
LogReviewData, LogReviewRequest, MovieDto, RegisterRequest, ReviewDto, LoginRequest, LoginResponse, MovieDto, RegisterRequest, ReviewDto,
ReviewHistoryResponse, ReviewHistoryResponse,
}, },
errors::ApiError, errors::ApiError,
@@ -730,11 +846,8 @@ pub mod api {
State(state): State<AppState>, State(state): State<AppState>,
Path(movie_id): Path<Uuid>, Path(movie_id): Path<Uuid>,
) -> Result<Json<ReviewHistoryResponse>, ApiError> { ) -> Result<Json<ReviewHistoryResponse>, ApiError> {
let (history, trend) = get_review_history::execute( let (history, trend) =
&state.app_ctx, get_review_history::execute(&state.app_ctx, GetReviewHistoryQuery { movie_id }).await?;
GetReviewHistoryQuery { movie_id },
)
.await?;
Ok(Json(ReviewHistoryResponse { Ok(Json(ReviewHistoryResponse {
movie: movie_to_dto(history.movie()), movie: movie_to_dto(history.movie()),
@@ -796,10 +909,13 @@ pub mod api {
State(state): State<AppState>, State(state): State<AppState>,
Json(req): Json<LoginRequest>, Json(req): Json<LoginRequest>,
) -> Result<Json<LoginResponse>, ApiError> { ) -> Result<Json<LoginResponse>, ApiError> {
let result = login_uc::execute(&state.app_ctx, LoginCommand { let result = login_uc::execute(
email: req.email, &state.app_ctx,
password: req.password, LoginCommand {
}) email: req.email,
password: req.password,
},
)
.await?; .await?;
Ok(Json(LoginResponse { Ok(Json(LoginResponse {
token: result.token, token: result.token,
@@ -813,11 +929,14 @@ pub mod api {
State(state): State<AppState>, State(state): State<AppState>,
Json(req): Json<RegisterRequest>, Json(req): Json<RegisterRequest>,
) -> Result<StatusCode, ApiError> { ) -> Result<StatusCode, ApiError> {
register_uc::execute(&state.app_ctx, RegisterCommand { register_uc::execute(
email: req.email, &state.app_ctx,
username: req.username, RegisterCommand {
password: req.password, email: req.email,
}) username: req.username,
password: req.password,
},
)
.await?; .await?;
Ok(StatusCode::CREATED) Ok(StatusCode::CREATED)
} }

View File

@@ -15,8 +15,9 @@ use auth::{AuthConfig, Argon2PasswordHasher, JwtAuthService};
use metadata::MetadataClientImpl; use metadata::MetadataClientImpl;
use poster_fetcher::{PosterFetcherConfig, ReqwestPosterFetcher}; use poster_fetcher::{PosterFetcherConfig, ReqwestPosterFetcher};
use poster_storage::{PosterStorageAdapter, StorageConfig}; use poster_storage::{PosterStorageAdapter, StorageConfig};
use activitypub::{ActivityPubEventHandler, ActivityPubService, DomainUserRepoAdapter, ReviewObjectHandler}; use activitypub::{ActivityPubEventHandler, ActivityPubPort, ActivityPubService, DomainUserRepoAdapter, ReviewObjectHandler};
use sqlite::{SqliteFederationRepository, SqliteMovieRepository, SqliteUserRepository}; use sqlite::{SqliteMovieRepository, SqliteUserRepository};
use sqlite_federation::SqliteFederationRepository;
use rss::RssAdapter; use rss::RssAdapter;
use template_askama::AskamaHtmlRenderer; use template_askama::AskamaHtmlRenderer;
@@ -27,11 +28,11 @@ async fn main() -> anyhow::Result<()> {
dotenvy::dotenv().ok(); dotenvy::dotenv().ok();
init_tracing(); init_tracing();
let state = wire_dependencies() let (state, ap_router) = wire_dependencies()
.await .await
.context("Failed to wire dependencies")?; .context("Failed to wire dependencies")?;
let app = routes::build_router(state); let app = routes::build_router(state, ap_router);
let host = std::env::var("HOST").unwrap_or_else(|_| "0.0.0.0".to_string()); let host = std::env::var("HOST").unwrap_or_else(|_| "0.0.0.0".to_string());
let port = std::env::var("PORT").unwrap_or_else(|_| "3000".to_string()); let port = std::env::var("PORT").unwrap_or_else(|_| "3000".to_string());
@@ -43,7 +44,7 @@ async fn main() -> anyhow::Result<()> {
Ok(()) Ok(())
} }
async fn wire_dependencies() -> anyhow::Result<AppState> { async fn wire_dependencies() -> anyhow::Result<(AppState, axum::Router)> {
let auth_config = AuthConfig::from_env()?; let auth_config = AuthConfig::from_env()?;
let storage_config = StorageConfig::from_env()?; let storage_config = StorageConfig::from_env()?;
let app_config = AppConfig::from_env(); let app_config = AppConfig::from_env();
@@ -100,7 +101,7 @@ async fn wire_dependencies() -> anyhow::Result<AppState> {
review_store: Arc::clone(&federation_repo) as Arc<dyn activitypub::RemoteReviewRepository>, review_store: Arc::clone(&federation_repo) as Arc<dyn activitypub::RemoteReviewRepository>,
base_url: app_config.base_url.clone(), base_url: app_config.base_url.clone(),
}); });
let ap_service = Arc::new( let concrete_ap_service = Arc::new(
ActivityPubService::new( ActivityPubService::new(
federation_repo, federation_repo,
user_repo_adapter, user_repo_adapter,
@@ -110,11 +111,13 @@ async fn wire_dependencies() -> anyhow::Result<AppState> {
) )
.await?, .await?,
); );
let ap_router = concrete_ap_service.router();
let ap_event_handler = ActivityPubEventHandler::new( let ap_event_handler = ActivityPubEventHandler::new(
Arc::clone(&ap_service), Arc::clone(&concrete_ap_service),
Arc::clone(&repository), Arc::clone(&repository),
app_config.base_url.clone(), app_config.base_url.clone(),
); );
let ap_service: Arc<dyn ActivityPubPort> = concrete_ap_service;
let poster_handler = PosterSyncHandler::new(handler_ctx, 3); let poster_handler = PosterSyncHandler::new(handler_ctx, 3);
let (event_publisher, event_worker) = create_event_channel( let (event_publisher, event_worker) = create_event_channel(
@@ -135,14 +138,15 @@ async fn wire_dependencies() -> anyhow::Result<AppState> {
config: app_config, config: app_config,
}; };
Ok(AppState { let state = AppState {
app_ctx, app_ctx,
html_renderer: Arc::new(AskamaHtmlRenderer::new()), html_renderer: Arc::new(AskamaHtmlRenderer::new()),
rss_renderer: Arc::new(RssAdapter::new( rss_renderer: Arc::new(RssAdapter::new(
std::env::var("BASE_URL").unwrap_or_else(|_| "http://localhost:3000".into()), std::env::var("BASE_URL").unwrap_or_else(|_| "http://localhost:3000".into()),
)), )),
ap_service, ap_service,
}) };
Ok((state, ap_router))
} }
fn init_tracing() { fn init_tracing() {

View File

@@ -9,21 +9,6 @@ use tower_http::{services::ServeDir, trace::TraceLayer};
use crate::{handlers, state::AppState}; use crate::{handlers, state::AppState};
/// Build an ActivityPub router from the service, excluding routes that
/// conflict with HTML routes (/users/{id} and /users/{id}/following).
/// Those AP endpoints are still served via the federation middleware layer
/// applied to the whole AP router scope; the conflicting paths will need
/// content-negotiation wrappers added in Phase 5.
fn ap_routes(state: &AppState) -> Router {
let config = state.ap_service.federation_config();
Router::new()
.route("/.well-known/webfinger", routing::get(activitypub::webfinger::webfinger_handler))
.route("/users/{user_id}/inbox", routing::post(activitypub::inbox::inbox_handler))
.route("/users/{user_id}/outbox", routing::get(activitypub::outbox::outbox_handler))
.route("/users/{user_id}/followers", routing::get(activitypub::followers_handler::followers_handler))
.layer(config.middleware())
}
/// Simple global rate limiter: tracks request count per 60-second window. /// Simple global rate limiter: tracks request count per 60-second window.
/// Not per-IP — suitable for a low-traffic personal app. /// Not per-IP — suitable for a low-traffic personal app.
#[derive(Clone)] #[derive(Clone)]
@@ -60,9 +45,8 @@ impl RateLimiter {
} }
} }
pub fn build_router(state: AppState) -> Router { pub fn build_router(state: AppState, ap_router: Router) -> Router {
let rate_limit = state.app_ctx.config.rate_limit; let rate_limit = state.app_ctx.config.rate_limit;
let ap_router = ap_routes(&state);
Router::new() Router::new()
.merge(html_routes(rate_limit)) .merge(html_routes(rate_limit))
.merge(api_routes(rate_limit)) .merge(api_routes(rate_limit))

View File

@@ -1,6 +1,6 @@
use std::sync::Arc; use std::sync::Arc;
use activitypub::ActivityPubService; use activitypub::ActivityPubPort;
use application::context::AppContext; use application::context::AppContext;
use crate::ports::{HtmlRenderer, RssFeedRenderer}; use crate::ports::{HtmlRenderer, RssFeedRenderer};
@@ -10,5 +10,5 @@ pub struct AppState {
pub app_ctx: AppContext, pub app_ctx: AppContext,
pub html_renderer: Arc<dyn HtmlRenderer>, pub html_renderer: Arc<dyn HtmlRenderer>,
pub rss_renderer: Arc<dyn RssFeedRenderer>, pub rss_renderer: Arc<dyn RssFeedRenderer>,
pub ap_service: Arc<ActivityPubService>, pub ap_service: Arc<dyn ActivityPubPort>,
} }

View File

@@ -89,35 +89,6 @@ impl UserRepository for NobodyUserRepo {
async fn list_with_stats(&self) -> Result<Vec<domain::models::UserSummary>, DomainError> { panic!() } async fn list_with_stats(&self) -> Result<Vec<domain::models::UserSummary>, DomainError> { panic!() }
} }
async fn test_ap_service() -> Arc<activitypub::ActivityPubService> {
let pool = SqlitePool::connect("sqlite::memory:").await.unwrap();
sqlx::query("CREATE TABLE IF NOT EXISTS ap_keypairs (user_id TEXT PRIMARY KEY, public_key TEXT NOT NULL, private_key TEXT NOT NULL)")
.execute(&pool).await.unwrap();
sqlx::query("CREATE TABLE IF NOT EXISTS ap_remote_actors (url TEXT PRIMARY KEY, handle TEXT NOT NULL, inbox_url TEXT NOT NULL, shared_inbox_url TEXT, display_name TEXT)")
.execute(&pool).await.unwrap();
sqlx::query("CREATE TABLE IF NOT EXISTS ap_followers (local_user_id TEXT NOT NULL, remote_actor_url TEXT NOT NULL, status TEXT NOT NULL DEFAULT 'pending', PRIMARY KEY (local_user_id, remote_actor_url))")
.execute(&pool).await.unwrap();
sqlx::query("CREATE TABLE IF NOT EXISTS ap_following (local_user_id TEXT NOT NULL, remote_actor_url TEXT NOT NULL, PRIMARY KEY (local_user_id, remote_actor_url))")
.execute(&pool).await.unwrap();
let fed_repo = Arc::new(sqlite::SqliteFederationRepository::new(pool));
struct DummyUserRepo;
#[async_trait]
impl UserRepository for DummyUserRepo {
async fn find_by_email(&self, _: &Email) -> Result<Option<User>, DomainError> { Ok(None) }
async fn find_by_username(&self, _: &domain::value_objects::Username) -> Result<Option<User>, DomainError> { Ok(None) }
async fn save(&self, _: &User) -> Result<(), DomainError> { Ok(()) }
async fn find_by_id(&self, _: &UserId) -> Result<Option<User>, DomainError> { Ok(None) }
async fn list_with_stats(&self) -> Result<Vec<domain::models::UserSummary>, DomainError> { Ok(vec![]) }
}
let movie_pool = SqlitePool::connect("sqlite::memory:").await.unwrap();
let movie_repo = Arc::new(sqlite::SqliteMovieRepository::new(movie_pool));
Arc::new(
activitypub::ActivityPubService::new(fed_repo, Arc::new(DummyUserRepo), movie_repo, "http://localhost:3000".to_string(), true)
.await
.unwrap(),
)
}
async fn test_app() -> Router { async fn test_app() -> Router {
let pool = SqlitePool::connect("sqlite::memory:") let pool = SqlitePool::connect("sqlite::memory:")
.await .await
@@ -139,10 +110,10 @@ async fn test_app() -> Router {
}, },
html_renderer: Arc::new(AskamaHtmlRenderer::new()), html_renderer: Arc::new(AskamaHtmlRenderer::new()),
rss_renderer: Arc::new(RssAdapter::new("http://localhost:3000".into())), rss_renderer: Arc::new(RssAdapter::new("http://localhost:3000".into())),
ap_service: test_ap_service().await, ap_service: Arc::new(activitypub::NoopActivityPubService),
}; };
routes::build_router(state) routes::build_router(state, axum::Router::new())
} }
#[tokio::test] #[tokio::test]