feat(worker): handle FetchRemoteActorPosts — fetch and store remote outbox notes

This commit is contained in:
2026-05-14 22:23:20 +02:00
parent f3c3637ade
commit dc3afeca26
3 changed files with 74 additions and 4 deletions

View File

@@ -11,6 +11,8 @@ uuid = { workspace = true }
chrono = { workspace = true } chrono = { workspace = true }
sha2 = "0.10" sha2 = "0.10"
hex = "0.4" hex = "0.4"
tracing = { workspace = true }
url = { workspace = true }
[dev-dependencies] [dev-dependencies]
tokio = { workspace = true, features = ["full"] } tokio = { workspace = true, features = ["full"] }

View File

@@ -2,7 +2,7 @@ use domain::{
errors::DomainError, errors::DomainError,
events::DomainEvent, events::DomainEvent,
models::thought::{Thought, Visibility}, models::thought::{Thought, Visibility},
ports::{OutboundFederationPort, ThoughtRepository, UserRepository}, ports::{ActivityPubRepository, OutboundFederationPort, ThoughtRepository, UserRepository},
value_objects::ThoughtId, value_objects::ThoughtId,
}; };
use std::sync::Arc; use std::sync::Arc;
@@ -12,6 +12,8 @@ pub struct FederationEventService {
pub users: Arc<dyn UserRepository>, pub users: Arc<dyn UserRepository>,
pub ap: Arc<dyn OutboundFederationPort>, pub ap: Arc<dyn OutboundFederationPort>,
pub base_url: String, pub base_url: String,
pub federation_action: Arc<dyn domain::ports::FederationActionPort>,
pub ap_repo: Arc<dyn ActivityPubRepository>,
} }
impl FederationEventService { impl FederationEventService {
@@ -112,6 +114,49 @@ impl FederationEventService {
.await .await
} }
DomainEvent::FetchRemoteActorPosts {
actor_ap_url,
outbox_url,
} => {
let notes = match self
.federation_action
.fetch_outbox_page(outbox_url, 1)
.await
{
Ok(n) => n,
Err(e) => {
tracing::warn!(outbox_url, error = %e, "failed to fetch remote outbox");
return Ok(());
}
};
let actor_url = url::Url::parse(actor_ap_url)
.map_err(|e| DomainError::ExternalService(e.to_string()))?;
let author_id = self.ap_repo.intern_remote_actor(&actor_url).await?;
for note in notes {
let ap_id = match url::Url::parse(&note.ap_id) {
Ok(u) => u,
Err(_) => continue,
};
let _ = self
.ap_repo
.accept_note(
&ap_id,
&author_id,
&note.content,
note.published,
note.sensitive,
note.content_warning,
"public",
)
.await;
}
Ok(())
}
_ => Ok(()), _ => Ok(()),
} }
} }
@@ -126,7 +171,7 @@ mod tests {
events::DomainEvent, events::DomainEvent,
models::thought::{Thought, Visibility}, models::thought::{Thought, Visibility},
models::user::User, models::user::User,
ports::OutboundFederationPort, ports::{ActivityPubRepository, OutboundFederationPort},
testing::TestStore, testing::TestStore,
value_objects::*, value_objects::*,
}; };
@@ -208,6 +253,8 @@ mod tests {
users: Arc::new(store.clone()), users: Arc::new(store.clone()),
ap: spy, ap: spy,
base_url: "https://example.com".to_string(), base_url: "https://example.com".to_string(),
federation_action: Arc::new(store.clone()),
ap_repo: Arc::new(store.clone()),
} }
} }
@@ -531,4 +578,18 @@ mod tests {
assert!(spy.updated.lock().unwrap().is_empty()); assert!(spy.updated.lock().unwrap().is_empty());
} }
#[tokio::test]
async fn fetch_remote_actor_posts_is_noop_when_outbox_empty() {
let store = TestStore::default();
let spy = Arc::new(SpyPort::default());
svc(&store, spy.clone())
.process(&DomainEvent::FetchRemoteActorPosts {
actor_ap_url: "https://mastodon.social/users/alice".into(),
outbox_url: "https://mastodon.social/users/alice/outbox".into(),
})
.await
.unwrap();
// TestStore.fetch_outbox_page returns Ok(vec![]) — no notes, no error
}
} }

View File

@@ -4,6 +4,7 @@ use std::sync::Arc;
use activitypub::ThoughtsObjectHandler; use activitypub::ThoughtsObjectHandler;
use activitypub_base::ActivityPubService; use activitypub_base::ActivityPubService;
use application::services::{FederationEventService, NotificationEventService}; use application::services::{FederationEventService, NotificationEventService};
use domain::ports::{ActivityPubRepository, FederationActionPort, OutboundFederationPort};
use postgres::activitypub::PgActivityPubRepository; use postgres::activitypub::PgActivityPubRepository;
use postgres_federation::{PostgresApUserRepository, PostgresFederationRepository}; use postgres_federation::{PostgresApUserRepository, PostgresFederationRepository};
@@ -34,7 +35,7 @@ pub async fn build(
)); ));
// ActivityPub service (for federation fan-out) // ActivityPub service (for federation fan-out)
let ap_service: Arc<dyn domain::ports::OutboundFederationPort> = Arc::new( let ap_service = Arc::new(
ActivityPubService::new( ActivityPubService::new(
Arc::new(PostgresFederationRepository::new(pool.clone())), Arc::new(PostgresFederationRepository::new(pool.clone())),
Arc::new(PostgresApUserRepository::new( Arc::new(PostgresApUserRepository::new(
@@ -54,6 +55,10 @@ pub async fn build(
.await .await
.expect("ActivityPubService build failed"), .expect("ActivityPubService build failed"),
); );
let ap_outbound = ap_service.clone() as Arc<dyn OutboundFederationPort>;
let ap_federation = ap_service.clone() as Arc<dyn FederationActionPort>;
let ap_repo_worker =
Arc::new(PgActivityPubRepository::new(pool.clone())) as Arc<dyn ActivityPubRepository>;
// Application services // Application services
let notification_svc = Arc::new(NotificationEventService { let notification_svc = Arc::new(NotificationEventService {
@@ -63,8 +68,10 @@ pub async fn build(
let federation_svc = Arc::new(FederationEventService { let federation_svc = Arc::new(FederationEventService {
thoughts, thoughts,
users, users,
ap: ap_service, ap: ap_outbound,
base_url: base_url.to_string(), base_url: base_url.to_string(),
federation_action: ap_federation,
ap_repo: ap_repo_worker,
}); });
// Thin handlers // Thin handlers