diff --git a/crates/application/Cargo.toml b/crates/application/Cargo.toml index b0986bd..a7909fa 100644 --- a/crates/application/Cargo.toml +++ b/crates/application/Cargo.toml @@ -11,6 +11,8 @@ uuid = { workspace = true } chrono = { workspace = true } sha2 = "0.10" hex = "0.4" +tracing = { workspace = true } +url = { workspace = true } [dev-dependencies] tokio = { workspace = true, features = ["full"] } diff --git a/crates/application/src/services/federation_event.rs b/crates/application/src/services/federation_event.rs index 5fa7c10..c698e89 100644 --- a/crates/application/src/services/federation_event.rs +++ b/crates/application/src/services/federation_event.rs @@ -2,7 +2,7 @@ use domain::{ errors::DomainError, events::DomainEvent, models::thought::{Thought, Visibility}, - ports::{OutboundFederationPort, ThoughtRepository, UserRepository}, + ports::{ActivityPubRepository, OutboundFederationPort, ThoughtRepository, UserRepository}, value_objects::ThoughtId, }; use std::sync::Arc; @@ -12,6 +12,8 @@ pub struct FederationEventService { pub users: Arc, pub ap: Arc, pub base_url: String, + pub federation_action: Arc, + pub ap_repo: Arc, } impl FederationEventService { @@ -112,6 +114,49 @@ impl FederationEventService { .await } + DomainEvent::FetchRemoteActorPosts { + actor_ap_url, + outbox_url, + } => { + let notes = match self + .federation_action + .fetch_outbox_page(outbox_url, 1) + .await + { + Ok(n) => n, + Err(e) => { + tracing::warn!(outbox_url, error = %e, "failed to fetch remote outbox"); + return Ok(()); + } + }; + + let actor_url = url::Url::parse(actor_ap_url) + .map_err(|e| DomainError::ExternalService(e.to_string()))?; + + let author_id = self.ap_repo.intern_remote_actor(&actor_url).await?; + + for note in notes { + let ap_id = match url::Url::parse(¬e.ap_id) { + Ok(u) => u, + Err(_) => continue, + }; + let _ = self + .ap_repo + .accept_note( + &ap_id, + &author_id, + ¬e.content, + note.published, + note.sensitive, + note.content_warning, + "public", + ) + .await; + } + + Ok(()) + } + _ => Ok(()), } } @@ -126,7 +171,7 @@ mod tests { events::DomainEvent, models::thought::{Thought, Visibility}, models::user::User, - ports::OutboundFederationPort, + ports::{ActivityPubRepository, OutboundFederationPort}, testing::TestStore, value_objects::*, }; @@ -208,6 +253,8 @@ mod tests { users: Arc::new(store.clone()), ap: spy, base_url: "https://example.com".to_string(), + federation_action: Arc::new(store.clone()), + ap_repo: Arc::new(store.clone()), } } @@ -531,4 +578,18 @@ mod tests { assert!(spy.updated.lock().unwrap().is_empty()); } + + #[tokio::test] + async fn fetch_remote_actor_posts_is_noop_when_outbox_empty() { + let store = TestStore::default(); + let spy = Arc::new(SpyPort::default()); + svc(&store, spy.clone()) + .process(&DomainEvent::FetchRemoteActorPosts { + actor_ap_url: "https://mastodon.social/users/alice".into(), + outbox_url: "https://mastodon.social/users/alice/outbox".into(), + }) + .await + .unwrap(); + // TestStore.fetch_outbox_page returns Ok(vec![]) — no notes, no error + } } diff --git a/crates/worker/src/factory.rs b/crates/worker/src/factory.rs index da47251..8ff0ced 100644 --- a/crates/worker/src/factory.rs +++ b/crates/worker/src/factory.rs @@ -4,6 +4,7 @@ use std::sync::Arc; use activitypub::ThoughtsObjectHandler; use activitypub_base::ActivityPubService; use application::services::{FederationEventService, NotificationEventService}; +use domain::ports::{ActivityPubRepository, FederationActionPort, OutboundFederationPort}; use postgres::activitypub::PgActivityPubRepository; use postgres_federation::{PostgresApUserRepository, PostgresFederationRepository}; @@ -34,7 +35,7 @@ pub async fn build( )); // ActivityPub service (for federation fan-out) - let ap_service: Arc = Arc::new( + let ap_service = Arc::new( ActivityPubService::new( Arc::new(PostgresFederationRepository::new(pool.clone())), Arc::new(PostgresApUserRepository::new( @@ -54,6 +55,10 @@ pub async fn build( .await .expect("ActivityPubService build failed"), ); + let ap_outbound = ap_service.clone() as Arc; + let ap_federation = ap_service.clone() as Arc; + let ap_repo_worker = + Arc::new(PgActivityPubRepository::new(pool.clone())) as Arc; // Application services let notification_svc = Arc::new(NotificationEventService { @@ -63,8 +68,10 @@ pub async fn build( let federation_svc = Arc::new(FederationEventService { thoughts, users, - ap: ap_service, + ap: ap_outbound, base_url: base_url.to_string(), + federation_action: ap_federation, + ap_repo: ap_repo_worker, }); // Thin handlers