Compare commits
11 Commits
7c08e4a942
...
10fcc27339
| Author | SHA1 | Date | |
|---|---|---|---|
| 10fcc27339 | |||
| bc6c767c29 | |||
| 20e70325c6 | |||
| e92c6789d9 | |||
| ca9a504632 | |||
| 2567103587 | |||
| b1d4b4de2d | |||
| 5cd7409491 | |||
| 1b41e7c1f5 | |||
| faeac18126 | |||
| 80983f1ff2 |
3
Cargo.lock
generated
3
Cargo.lock
generated
@@ -29,7 +29,9 @@ dependencies = [
|
|||||||
"async-trait",
|
"async-trait",
|
||||||
"axum",
|
"axum",
|
||||||
"chrono",
|
"chrono",
|
||||||
|
"domain",
|
||||||
"enum_delegate",
|
"enum_delegate",
|
||||||
|
"reqwest 0.13.3",
|
||||||
"serde",
|
"serde",
|
||||||
"serde_json",
|
"serde_json",
|
||||||
"tokio",
|
"tokio",
|
||||||
@@ -6808,6 +6810,7 @@ dependencies = [
|
|||||||
"activitypub",
|
"activitypub",
|
||||||
"anyhow",
|
"anyhow",
|
||||||
"application",
|
"application",
|
||||||
|
"async-trait",
|
||||||
"auth",
|
"auth",
|
||||||
"domain",
|
"domain",
|
||||||
"dotenvy",
|
"dotenvy",
|
||||||
|
|||||||
@@ -17,3 +17,5 @@ axum = { workspace = true }
|
|||||||
activitypub_federation = "0.7.0-beta.11"
|
activitypub_federation = "0.7.0-beta.11"
|
||||||
url = { version = "2", features = ["serde"] }
|
url = { version = "2", features = ["serde"] }
|
||||||
enum_delegate = "0.2"
|
enum_delegate = "0.2"
|
||||||
|
domain = { workspace = true }
|
||||||
|
reqwest = { workspace = true }
|
||||||
|
|||||||
@@ -17,6 +17,7 @@ use crate::actors::DbActor;
|
|||||||
use crate::data::FederationData;
|
use crate::data::FederationData;
|
||||||
use crate::error::Error;
|
use crate::error::Error;
|
||||||
use crate::repository::{FollowerStatus, FollowingStatus};
|
use crate::repository::{FollowerStatus, FollowingStatus};
|
||||||
|
use domain::{events::DomainEvent, value_objects::UserId};
|
||||||
|
|
||||||
// --- Follow ---
|
// --- Follow ---
|
||||||
|
|
||||||
@@ -141,6 +142,24 @@ impl Activity for AcceptActivity {
|
|||||||
FollowingStatus::Accepted,
|
FollowingStatus::Accepted,
|
||||||
)
|
)
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
|
if let Ok(Some(outbox_url)) = data
|
||||||
|
.federation_repo
|
||||||
|
.get_following_outbox_url(local_user_id, self.actor.inner().as_str())
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
let event = DomainEvent::FollowAccepted {
|
||||||
|
local_user_id: UserId::from_uuid(local_user_id),
|
||||||
|
remote_actor_url: self.actor.inner().to_string(),
|
||||||
|
outbox_url,
|
||||||
|
};
|
||||||
|
if let Some(publisher) = &data.event_publisher {
|
||||||
|
if let Err(e) = publisher.publish(&event).await {
|
||||||
|
tracing::warn!(error = %e, "failed to publish FollowAccepted event");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
tracing::info!(remote_actor = %self.actor.inner(), "follow accepted by remote");
|
tracing::info!(remote_actor = %self.actor.inner(), "follow accepted by remote");
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -219,6 +219,7 @@ impl Object for DbActor {
|
|||||||
shared_inbox_url: None,
|
shared_inbox_url: None,
|
||||||
display_name: json.name.clone(),
|
display_name: json.name.clone(),
|
||||||
avatar_url: json.icon.as_ref().map(|i| i.url.to_string()),
|
avatar_url: json.icon.as_ref().map(|i| i.url.to_string()),
|
||||||
|
outbox_url: Some(json.outbox.to_string()),
|
||||||
};
|
};
|
||||||
data.federation_repo.upsert_remote_actor(actor).await?;
|
data.federation_repo.upsert_remote_actor(actor).await?;
|
||||||
|
|
||||||
|
|||||||
@@ -3,6 +3,7 @@ use std::sync::Arc;
|
|||||||
use crate::content::ApObjectHandler;
|
use crate::content::ApObjectHandler;
|
||||||
use crate::repository::FederationRepository;
|
use crate::repository::FederationRepository;
|
||||||
use crate::user::ApUserRepository;
|
use crate::user::ApUserRepository;
|
||||||
|
use domain::ports::EventPublisher;
|
||||||
|
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
pub struct FederationData {
|
pub struct FederationData {
|
||||||
@@ -13,6 +14,7 @@ pub struct FederationData {
|
|||||||
pub(crate) domain: String,
|
pub(crate) domain: String,
|
||||||
pub(crate) allow_registration: bool,
|
pub(crate) allow_registration: bool,
|
||||||
pub(crate) software_name: String,
|
pub(crate) software_name: String,
|
||||||
|
pub(crate) event_publisher: Option<Arc<dyn EventPublisher>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl FederationData {
|
impl FederationData {
|
||||||
@@ -23,6 +25,7 @@ impl FederationData {
|
|||||||
base_url: String,
|
base_url: String,
|
||||||
allow_registration: bool,
|
allow_registration: bool,
|
||||||
software_name: String,
|
software_name: String,
|
||||||
|
event_publisher: Option<Arc<dyn EventPublisher>>,
|
||||||
) -> Self {
|
) -> Self {
|
||||||
let domain = base_url
|
let domain = base_url
|
||||||
.trim_start_matches("https://")
|
.trim_start_matches("https://")
|
||||||
@@ -39,6 +42,7 @@ impl FederationData {
|
|||||||
domain,
|
domain,
|
||||||
allow_registration,
|
allow_registration,
|
||||||
software_name,
|
software_name,
|
||||||
|
event_publisher,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -22,6 +22,7 @@ pub struct RemoteActor {
|
|||||||
pub shared_inbox_url: Option<String>,
|
pub shared_inbox_url: Option<String>,
|
||||||
pub display_name: Option<String>,
|
pub display_name: Option<String>,
|
||||||
pub avatar_url: Option<String>,
|
pub avatar_url: Option<String>,
|
||||||
|
pub outbox_url: Option<String>,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Clone)]
|
#[derive(Debug, Clone)]
|
||||||
@@ -96,6 +97,11 @@ pub trait FederationRepository: Send + Sync {
|
|||||||
remote_actor_url: &str,
|
remote_actor_url: &str,
|
||||||
status: FollowingStatus,
|
status: FollowingStatus,
|
||||||
) -> Result<()>;
|
) -> Result<()>;
|
||||||
|
async fn get_following_outbox_url(
|
||||||
|
&self,
|
||||||
|
local_user_id: uuid::Uuid,
|
||||||
|
remote_actor_url: &str,
|
||||||
|
) -> Result<Option<String>>;
|
||||||
async fn add_announce(
|
async fn add_announce(
|
||||||
&self,
|
&self,
|
||||||
activity_id: &str,
|
activity_id: &str,
|
||||||
|
|||||||
@@ -81,8 +81,12 @@ impl ActivityPubService {
|
|||||||
allow_registration: bool,
|
allow_registration: bool,
|
||||||
software_name: String,
|
software_name: String,
|
||||||
debug: bool,
|
debug: bool,
|
||||||
|
event_publisher: Option<Arc<dyn domain::ports::EventPublisher>>,
|
||||||
) -> anyhow::Result<Self> {
|
) -> anyhow::Result<Self> {
|
||||||
let data = FederationData::new(repo, user_repo, object_handler, base_url.clone(), allow_registration, software_name);
|
let data = FederationData::new(
|
||||||
|
repo, user_repo, object_handler, base_url.clone(),
|
||||||
|
allow_registration, software_name, event_publisher,
|
||||||
|
);
|
||||||
let federation_config = ApFederationConfig::new(data, debug).await?;
|
let federation_config = ApFederationConfig::new(data, debug).await?;
|
||||||
Ok(Self {
|
Ok(Self {
|
||||||
federation_config,
|
federation_config,
|
||||||
@@ -173,6 +177,7 @@ impl ActivityPubService {
|
|||||||
shared_inbox_url: None,
|
shared_inbox_url: None,
|
||||||
display_name: Some(remote_actor.username.clone()),
|
display_name: Some(remote_actor.username.clone()),
|
||||||
avatar_url: None,
|
avatar_url: None,
|
||||||
|
outbox_url: Some(remote_actor.outbox_url.to_string()),
|
||||||
};
|
};
|
||||||
data.federation_repo
|
data.federation_repo
|
||||||
.add_following(local_user_id, remote, &follow_id_str)
|
.add_following(local_user_id, remote, &follow_id_str)
|
||||||
@@ -867,6 +872,7 @@ impl ActivityPubService {
|
|||||||
shared_inbox_url: None,
|
shared_inbox_url: None,
|
||||||
display_name: None,
|
display_name: None,
|
||||||
avatar_url: None,
|
avatar_url: None,
|
||||||
|
outbox_url: None,
|
||||||
},
|
},
|
||||||
};
|
};
|
||||||
actors.push(actor);
|
actors.push(actor);
|
||||||
@@ -928,6 +934,7 @@ impl ActivityPubService {
|
|||||||
shared_inbox_url: None,
|
shared_inbox_url: None,
|
||||||
display_name: Some(target.username),
|
display_name: Some(target.username),
|
||||||
avatar_url: None,
|
avatar_url: None,
|
||||||
|
outbox_url: None,
|
||||||
};
|
};
|
||||||
data.federation_repo
|
data.federation_repo
|
||||||
.add_following(local_user_id, target_as_remote, &follow_id)
|
.add_following(local_user_id, target_as_remote, &follow_id)
|
||||||
@@ -968,6 +975,91 @@ impl ActivityPubService {
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub async fn backfill_outbox(&self, outbox_url: &str, actor_url: &str) -> anyhow::Result<()> {
|
||||||
|
let client = reqwest::Client::builder()
|
||||||
|
.timeout(std::time::Duration::from_secs(30))
|
||||||
|
.build()?;
|
||||||
|
let data = self.federation_config.to_request_data();
|
||||||
|
let actor = url::Url::parse(actor_url)?;
|
||||||
|
|
||||||
|
let root: serde_json::Value = client
|
||||||
|
.get(outbox_url)
|
||||||
|
.header("Accept", "application/activity+json")
|
||||||
|
.send()
|
||||||
|
.await?
|
||||||
|
.json()
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
let first = match root.get("first").and_then(|v| v.as_str()) {
|
||||||
|
Some(url) => url.to_string(),
|
||||||
|
None => {
|
||||||
|
tracing::debug!(outbox = %outbox_url, "outbox has no first page, nothing to backfill");
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
let mut current_url = first;
|
||||||
|
let mut visited = std::collections::HashSet::new();
|
||||||
|
|
||||||
|
loop {
|
||||||
|
if !visited.insert(current_url.clone()) {
|
||||||
|
tracing::warn!(url = %current_url, "backfill: loop detected, stopping");
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
let page: serde_json::Value = match client
|
||||||
|
.get(¤t_url)
|
||||||
|
.header("Accept", "application/activity+json")
|
||||||
|
.send()
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
Ok(resp) => match resp.json().await {
|
||||||
|
Ok(v) => v,
|
||||||
|
Err(e) => {
|
||||||
|
tracing::error!(error = %e, url = %current_url, "backfill: failed to parse page JSON");
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
},
|
||||||
|
Err(e) => {
|
||||||
|
tracing::error!(error = %e, url = %current_url, "backfill: HTTP request failed");
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
if let Some(items) = page.get("orderedItems").and_then(|v| v.as_array()) {
|
||||||
|
for item in items {
|
||||||
|
let activity_type = item.get("type").and_then(|v| v.as_str()).unwrap_or("");
|
||||||
|
if activity_type != "Create" && activity_type != "Add" {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
let object = match item.get("object") {
|
||||||
|
Some(o) if o.is_object() => o.clone(),
|
||||||
|
_ => continue,
|
||||||
|
};
|
||||||
|
let ap_id = match object
|
||||||
|
.get("id")
|
||||||
|
.and_then(|v| v.as_str())
|
||||||
|
.and_then(|s| url::Url::parse(s).ok())
|
||||||
|
{
|
||||||
|
Some(u) => u,
|
||||||
|
None => continue,
|
||||||
|
};
|
||||||
|
if let Err(e) = data.object_handler.on_create(&ap_id, &actor, object).await {
|
||||||
|
tracing::warn!(ap_id = %ap_id, error = %e, "backfill: failed to process item, skipping");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
match page.get("next").and_then(|v| v.as_str()) {
|
||||||
|
Some(next) => current_url = next.to_string(),
|
||||||
|
None => break,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
tracing::info!(outbox = %outbox_url, pages = visited.len(), "backfill complete");
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
fn spawn_backfill(&self, owner_user_id: uuid::Uuid, follower_inbox_url: String) {
|
fn spawn_backfill(&self, owner_user_id: uuid::Uuid, follower_inbox_url: String) {
|
||||||
let config = self.federation_config.clone();
|
let config = self.federation_config.clone();
|
||||||
let base_url = self.base_url.clone();
|
let base_url = self.base_url.clone();
|
||||||
|
|||||||
@@ -10,6 +10,7 @@ fn make_follower(inbox: &str, shared: Option<&str>) -> Follower {
|
|||||||
shared_inbox_url: shared.map(|s| s.to_string()),
|
shared_inbox_url: shared.map(|s| s.to_string()),
|
||||||
display_name: None,
|
display_name: None,
|
||||||
avatar_url: None,
|
avatar_url: None,
|
||||||
|
outbox_url: None,
|
||||||
},
|
},
|
||||||
status: FollowerStatus::Accepted,
|
status: FollowerStatus::Accepted,
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -36,6 +36,7 @@ pub async fn wire(
|
|||||||
diary_repo: std::sync::Arc<dyn domain::ports::DiaryRepository>,
|
diary_repo: std::sync::Arc<dyn domain::ports::DiaryRepository>,
|
||||||
base_url: String,
|
base_url: String,
|
||||||
allow_registration: bool,
|
allow_registration: bool,
|
||||||
|
event_publisher: std::sync::Arc<dyn domain::ports::EventPublisher>,
|
||||||
) -> anyhow::Result<ActivityPubWire> {
|
) -> anyhow::Result<ActivityPubWire> {
|
||||||
let review_handler = std::sync::Arc::new(ReviewObjectHandler {
|
let review_handler = std::sync::Arc::new(ReviewObjectHandler {
|
||||||
movie_repository: std::sync::Arc::clone(&movie_repo),
|
movie_repository: std::sync::Arc::clone(&movie_repo),
|
||||||
@@ -60,6 +61,7 @@ pub async fn wire(
|
|||||||
allow_registration,
|
allow_registration,
|
||||||
"movies-diary".to_string(),
|
"movies-diary".to_string(),
|
||||||
cfg!(debug_assertions),
|
cfg!(debug_assertions),
|
||||||
|
Some(event_publisher),
|
||||||
)
|
)
|
||||||
.await?,
|
.await?,
|
||||||
);
|
);
|
||||||
|
|||||||
@@ -31,6 +31,7 @@ pub trait ActivityPubPort: Send + Sync {
|
|||||||
async fn add_blocked_domain(&self, domain: &str, reason: Option<&str>) -> anyhow::Result<()>;
|
async fn add_blocked_domain(&self, domain: &str, reason: Option<&str>) -> anyhow::Result<()>;
|
||||||
async fn remove_blocked_domain(&self, domain: &str) -> anyhow::Result<()>;
|
async fn remove_blocked_domain(&self, domain: &str) -> anyhow::Result<()>;
|
||||||
async fn get_blocked_domains(&self) -> anyhow::Result<Vec<BlockedDomain>>;
|
async fn get_blocked_domains(&self) -> anyhow::Result<Vec<BlockedDomain>>;
|
||||||
|
async fn backfill_outbox(&self, outbox_url: &str, actor_url: &str) -> anyhow::Result<()>;
|
||||||
}
|
}
|
||||||
|
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
@@ -97,6 +98,9 @@ impl ActivityPubPort for ActivityPubService {
|
|||||||
async fn get_blocked_domains(&self) -> anyhow::Result<Vec<BlockedDomain>> {
|
async fn get_blocked_domains(&self) -> anyhow::Result<Vec<BlockedDomain>> {
|
||||||
self.get_blocked_domains().await
|
self.get_blocked_domains().await
|
||||||
}
|
}
|
||||||
|
async fn backfill_outbox(&self, outbox_url: &str, actor_url: &str) -> anyhow::Result<()> {
|
||||||
|
self.backfill_outbox(outbox_url, actor_url).await
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct NoopActivityPubService;
|
pub struct NoopActivityPubService;
|
||||||
@@ -154,4 +158,7 @@ impl ActivityPubPort for NoopActivityPubService {
|
|||||||
async fn get_blocked_domains(&self) -> anyhow::Result<Vec<BlockedDomain>> {
|
async fn get_blocked_domains(&self) -> anyhow::Result<Vec<BlockedDomain>> {
|
||||||
Ok(vec![])
|
Ok(vec![])
|
||||||
}
|
}
|
||||||
|
async fn backfill_outbox(&self, _: &str, _: &str) -> anyhow::Result<()> {
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -119,9 +119,11 @@ impl From<&DomainEvent> for EventPayload {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
DomainEvent::ImageStored { key } => EventPayload::ImageStored { key: key.clone() },
|
DomainEvent::ImageStored { key } => EventPayload::ImageStored { key: key.clone() },
|
||||||
DomainEvent::WatchlistEntryAdded { .. } | DomainEvent::WatchlistEntryRemoved { .. } => {
|
DomainEvent::WatchlistEntryAdded { .. }
|
||||||
|
| DomainEvent::WatchlistEntryRemoved { .. }
|
||||||
|
| DomainEvent::FollowAccepted { .. } => {
|
||||||
// federation-only events; not serialized via EventPayload
|
// federation-only events; not serialized via EventPayload
|
||||||
unreachable!("watchlist events are handled by the AP event handler directly")
|
unreachable!("federation events are handled by the AP event handler directly")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -13,6 +13,7 @@ pub fn event_to_subject(prefix: &str, event: &DomainEvent) -> String {
|
|||||||
DomainEvent::WatchlistEntryAdded { .. } | DomainEvent::WatchlistEntryRemoved { .. } => {
|
DomainEvent::WatchlistEntryAdded { .. } | DomainEvent::WatchlistEntryRemoved { .. } => {
|
||||||
unreachable!("watchlist events are not published to NATS")
|
unreachable!("watchlist events are not published to NATS")
|
||||||
}
|
}
|
||||||
|
DomainEvent::FollowAccepted { .. } => "follow.accepted",
|
||||||
};
|
};
|
||||||
format!("{prefix}.{suffix}")
|
format!("{prefix}.{suffix}")
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -121,7 +121,7 @@ impl FederationRepository for PostgresFederationRepository {
|
|||||||
let display_name: Option<String> = row.try_get("display_name").ok().flatten();
|
let display_name: Option<String> = row.try_get("display_name").ok().flatten();
|
||||||
let avatar_url: Option<String> = row.try_get("avatar_url").ok().flatten();
|
let avatar_url: Option<String> = row.try_get("avatar_url").ok().flatten();
|
||||||
Follower {
|
Follower {
|
||||||
actor: RemoteActor { url, handle, inbox_url, shared_inbox_url, display_name, avatar_url },
|
actor: RemoteActor { url, handle, inbox_url, shared_inbox_url, display_name, avatar_url, outbox_url: row.try_get("outbox_url").ok().flatten() },
|
||||||
status: str_to_status(&status_str),
|
status: str_to_status(&status_str),
|
||||||
}
|
}
|
||||||
}).collect())
|
}).collect())
|
||||||
@@ -217,6 +217,7 @@ impl FederationRepository for PostgresFederationRepository {
|
|||||||
shared_inbox_url: row.try_get("shared_inbox_url").ok().flatten(),
|
shared_inbox_url: row.try_get("shared_inbox_url").ok().flatten(),
|
||||||
display_name: row.try_get("display_name").ok().flatten(),
|
display_name: row.try_get("display_name").ok().flatten(),
|
||||||
avatar_url: row.try_get("avatar_url").ok().flatten(),
|
avatar_url: row.try_get("avatar_url").ok().flatten(),
|
||||||
|
outbox_url: row.try_get("outbox_url").ok().flatten(),
|
||||||
}).collect())
|
}).collect())
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -235,14 +236,15 @@ impl FederationRepository for PostgresFederationRepository {
|
|||||||
let now = Utc::now().naive_utc();
|
let now = Utc::now().naive_utc();
|
||||||
let fetched_at = datetime_to_str(&now);
|
let fetched_at = datetime_to_str(&now);
|
||||||
sqlx::query(
|
sqlx::query(
|
||||||
"INSERT INTO ap_remote_actors (url, handle, inbox_url, shared_inbox_url, display_name, avatar_url, fetched_at)
|
"INSERT INTO ap_remote_actors (url, handle, inbox_url, shared_inbox_url, display_name, avatar_url, outbox_url, fetched_at)
|
||||||
VALUES ($1, $2, $3, $4, $5, $6, $7::timestamptz)
|
VALUES ($1, $2, $3, $4, $5, $6, $7, $8::timestamptz)
|
||||||
ON CONFLICT(url) DO UPDATE SET
|
ON CONFLICT(url) DO UPDATE SET
|
||||||
handle = EXCLUDED.handle,
|
handle = EXCLUDED.handle,
|
||||||
inbox_url = EXCLUDED.inbox_url,
|
inbox_url = EXCLUDED.inbox_url,
|
||||||
shared_inbox_url = EXCLUDED.shared_inbox_url,
|
shared_inbox_url = EXCLUDED.shared_inbox_url,
|
||||||
display_name = EXCLUDED.display_name,
|
display_name = EXCLUDED.display_name,
|
||||||
avatar_url = EXCLUDED.avatar_url,
|
avatar_url = EXCLUDED.avatar_url,
|
||||||
|
outbox_url = COALESCE(EXCLUDED.outbox_url, ap_remote_actors.outbox_url),
|
||||||
fetched_at = EXCLUDED.fetched_at",
|
fetched_at = EXCLUDED.fetched_at",
|
||||||
)
|
)
|
||||||
.bind(&actor.url)
|
.bind(&actor.url)
|
||||||
@@ -251,6 +253,7 @@ impl FederationRepository for PostgresFederationRepository {
|
|||||||
.bind(&actor.shared_inbox_url)
|
.bind(&actor.shared_inbox_url)
|
||||||
.bind(&actor.display_name)
|
.bind(&actor.display_name)
|
||||||
.bind(&actor.avatar_url)
|
.bind(&actor.avatar_url)
|
||||||
|
.bind(&actor.outbox_url)
|
||||||
.bind(&fetched_at)
|
.bind(&fetched_at)
|
||||||
.execute(&self.pool)
|
.execute(&self.pool)
|
||||||
.await?;
|
.await?;
|
||||||
@@ -272,6 +275,7 @@ impl FederationRepository for PostgresFederationRepository {
|
|||||||
shared_inbox_url: row.try_get("shared_inbox_url").ok().flatten(),
|
shared_inbox_url: row.try_get("shared_inbox_url").ok().flatten(),
|
||||||
display_name: row.try_get("display_name").ok().flatten(),
|
display_name: row.try_get("display_name").ok().flatten(),
|
||||||
avatar_url: row.try_get("avatar_url").ok().flatten(),
|
avatar_url: row.try_get("avatar_url").ok().flatten(),
|
||||||
|
outbox_url: row.try_get("outbox_url").ok().flatten(),
|
||||||
}))
|
}))
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -327,6 +331,7 @@ impl FederationRepository for PostgresFederationRepository {
|
|||||||
shared_inbox_url: row.try_get("shared_inbox_url").ok().flatten(),
|
shared_inbox_url: row.try_get("shared_inbox_url").ok().flatten(),
|
||||||
display_name: row.try_get("display_name").ok().flatten(),
|
display_name: row.try_get("display_name").ok().flatten(),
|
||||||
avatar_url: row.try_get("avatar_url").ok().flatten(),
|
avatar_url: row.try_get("avatar_url").ok().flatten(),
|
||||||
|
outbox_url: row.try_get("outbox_url").ok().flatten(),
|
||||||
}).collect())
|
}).collect())
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -355,6 +360,25 @@ impl FederationRepository for PostgresFederationRepository {
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn get_following_outbox_url(
|
||||||
|
&self,
|
||||||
|
local_user_id: uuid::Uuid,
|
||||||
|
remote_actor_url: &str,
|
||||||
|
) -> Result<Option<String>> {
|
||||||
|
let uid = local_user_id.to_string();
|
||||||
|
let row: Option<Option<String>> = sqlx::query_scalar(
|
||||||
|
"SELECT a.outbox_url
|
||||||
|
FROM ap_following f
|
||||||
|
INNER JOIN ap_remote_actors a ON a.url = f.remote_actor_url
|
||||||
|
WHERE f.local_user_id = $1 AND f.remote_actor_url = $2",
|
||||||
|
)
|
||||||
|
.bind(&uid)
|
||||||
|
.bind(remote_actor_url)
|
||||||
|
.fetch_optional(&self.pool)
|
||||||
|
.await?;
|
||||||
|
Ok(row.flatten())
|
||||||
|
}
|
||||||
|
|
||||||
async fn add_announce(
|
async fn add_announce(
|
||||||
&self,
|
&self,
|
||||||
activity_id: &str,
|
activity_id: &str,
|
||||||
|
|||||||
@@ -0,0 +1 @@
|
|||||||
|
ALTER TABLE ap_remote_actors ADD COLUMN outbox_url TEXT;
|
||||||
@@ -136,6 +136,7 @@ impl FederationRepository for SqliteFederationRepository {
|
|||||||
shared_inbox_url,
|
shared_inbox_url,
|
||||||
display_name,
|
display_name,
|
||||||
avatar_url,
|
avatar_url,
|
||||||
|
outbox_url: row.try_get("outbox_url").ok().flatten(),
|
||||||
},
|
},
|
||||||
status: str_to_status(&status_str),
|
status: str_to_status(&status_str),
|
||||||
}
|
}
|
||||||
@@ -244,6 +245,7 @@ impl FederationRepository for SqliteFederationRepository {
|
|||||||
shared_inbox_url: row.try_get("shared_inbox_url").ok().flatten(),
|
shared_inbox_url: row.try_get("shared_inbox_url").ok().flatten(),
|
||||||
display_name: row.try_get("display_name").ok().flatten(),
|
display_name: row.try_get("display_name").ok().flatten(),
|
||||||
avatar_url: row.try_get("avatar_url").ok().flatten(),
|
avatar_url: row.try_get("avatar_url").ok().flatten(),
|
||||||
|
outbox_url: row.try_get("outbox_url").ok().flatten(),
|
||||||
})
|
})
|
||||||
.collect())
|
.collect())
|
||||||
}
|
}
|
||||||
@@ -264,14 +266,15 @@ impl FederationRepository for SqliteFederationRepository {
|
|||||||
let fetched_at = datetime_to_str(&now);
|
let fetched_at = datetime_to_str(&now);
|
||||||
|
|
||||||
sqlx::query(
|
sqlx::query(
|
||||||
"INSERT INTO ap_remote_actors (url, handle, inbox_url, shared_inbox_url, display_name, avatar_url, fetched_at)
|
"INSERT INTO ap_remote_actors (url, handle, inbox_url, shared_inbox_url, display_name, avatar_url, outbox_url, fetched_at)
|
||||||
VALUES (?, ?, ?, ?, ?, ?, ?)
|
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
|
||||||
ON CONFLICT(url) DO UPDATE SET
|
ON CONFLICT(url) DO UPDATE SET
|
||||||
handle = excluded.handle,
|
handle = excluded.handle,
|
||||||
inbox_url = excluded.inbox_url,
|
inbox_url = excluded.inbox_url,
|
||||||
shared_inbox_url = excluded.shared_inbox_url,
|
shared_inbox_url = excluded.shared_inbox_url,
|
||||||
display_name = excluded.display_name,
|
display_name = excluded.display_name,
|
||||||
avatar_url = excluded.avatar_url,
|
avatar_url = excluded.avatar_url,
|
||||||
|
outbox_url = COALESCE(excluded.outbox_url, ap_remote_actors.outbox_url),
|
||||||
fetched_at = excluded.fetched_at",
|
fetched_at = excluded.fetched_at",
|
||||||
)
|
)
|
||||||
.bind(&actor.url)
|
.bind(&actor.url)
|
||||||
@@ -280,6 +283,7 @@ impl FederationRepository for SqliteFederationRepository {
|
|||||||
.bind(&actor.shared_inbox_url)
|
.bind(&actor.shared_inbox_url)
|
||||||
.bind(&actor.display_name)
|
.bind(&actor.display_name)
|
||||||
.bind(&actor.avatar_url)
|
.bind(&actor.avatar_url)
|
||||||
|
.bind(&actor.outbox_url)
|
||||||
.bind(&fetched_at)
|
.bind(&fetched_at)
|
||||||
.execute(&self.pool)
|
.execute(&self.pool)
|
||||||
.await?;
|
.await?;
|
||||||
@@ -303,6 +307,7 @@ impl FederationRepository for SqliteFederationRepository {
|
|||||||
shared_inbox_url: row.try_get("shared_inbox_url").ok().flatten(),
|
shared_inbox_url: row.try_get("shared_inbox_url").ok().flatten(),
|
||||||
display_name: row.try_get("display_name").ok().flatten(),
|
display_name: row.try_get("display_name").ok().flatten(),
|
||||||
avatar_url: row.try_get("avatar_url").ok().flatten(),
|
avatar_url: row.try_get("avatar_url").ok().flatten(),
|
||||||
|
outbox_url: row.try_get("outbox_url").ok().flatten(),
|
||||||
}))
|
}))
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -369,6 +374,7 @@ impl FederationRepository for SqliteFederationRepository {
|
|||||||
shared_inbox_url: row.try_get("shared_inbox_url").ok().flatten(),
|
shared_inbox_url: row.try_get("shared_inbox_url").ok().flatten(),
|
||||||
display_name: row.try_get("display_name").ok().flatten(),
|
display_name: row.try_get("display_name").ok().flatten(),
|
||||||
avatar_url: row.try_get("avatar_url").ok().flatten(),
|
avatar_url: row.try_get("avatar_url").ok().flatten(),
|
||||||
|
outbox_url: row.try_get("outbox_url").ok().flatten(),
|
||||||
})
|
})
|
||||||
.collect())
|
.collect())
|
||||||
}
|
}
|
||||||
@@ -401,6 +407,25 @@ impl FederationRepository for SqliteFederationRepository {
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn get_following_outbox_url(
|
||||||
|
&self,
|
||||||
|
local_user_id: uuid::Uuid,
|
||||||
|
remote_actor_url: &str,
|
||||||
|
) -> Result<Option<String>> {
|
||||||
|
let uid = local_user_id.to_string();
|
||||||
|
let row: Option<Option<String>> = sqlx::query_scalar(
|
||||||
|
"SELECT a.outbox_url
|
||||||
|
FROM ap_following f
|
||||||
|
INNER JOIN ap_remote_actors a ON a.url = f.remote_actor_url
|
||||||
|
WHERE f.local_user_id = ? AND f.remote_actor_url = ?",
|
||||||
|
)
|
||||||
|
.bind(&uid)
|
||||||
|
.bind(remote_actor_url)
|
||||||
|
.fetch_optional(&self.pool)
|
||||||
|
.await?;
|
||||||
|
Ok(row.flatten())
|
||||||
|
}
|
||||||
|
|
||||||
async fn add_announce(
|
async fn add_announce(
|
||||||
&self,
|
&self,
|
||||||
activity_id: &str,
|
activity_id: &str,
|
||||||
@@ -780,6 +805,79 @@ pub fn wire(pool: sqlx::SqlitePool) -> (
|
|||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod outbox_url_tests {
|
||||||
|
use super::*;
|
||||||
|
use activitypub_base::{FederationRepository, FollowingStatus, RemoteActor};
|
||||||
|
|
||||||
|
async fn setup_pool() -> SqlitePool {
|
||||||
|
let pool = SqlitePool::connect(":memory:").await.unwrap();
|
||||||
|
sqlx::query(
|
||||||
|
"CREATE TABLE ap_remote_actors (
|
||||||
|
url TEXT PRIMARY KEY, handle TEXT NOT NULL, inbox_url TEXT NOT NULL,
|
||||||
|
shared_inbox_url TEXT, display_name TEXT, avatar_url TEXT,
|
||||||
|
outbox_url TEXT, fetched_at TEXT NOT NULL
|
||||||
|
);
|
||||||
|
CREATE TABLE ap_following (
|
||||||
|
local_user_id TEXT NOT NULL, remote_actor_url TEXT NOT NULL,
|
||||||
|
follow_activity_id TEXT, created_at TEXT NOT NULL,
|
||||||
|
status TEXT NOT NULL DEFAULT 'pending',
|
||||||
|
PRIMARY KEY (local_user_id, remote_actor_url)
|
||||||
|
);",
|
||||||
|
)
|
||||||
|
.execute(&pool)
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
pool
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn get_following_outbox_url_returns_stored_url() {
|
||||||
|
let pool = setup_pool().await;
|
||||||
|
let repo = SqliteFederationRepository::new(pool);
|
||||||
|
let local_user = uuid::Uuid::new_v4();
|
||||||
|
let actor = RemoteActor {
|
||||||
|
url: "https://remote.example/users/alice".to_string(),
|
||||||
|
handle: "alice@remote.example".to_string(),
|
||||||
|
inbox_url: "https://remote.example/users/alice/inbox".to_string(),
|
||||||
|
shared_inbox_url: None,
|
||||||
|
display_name: None,
|
||||||
|
avatar_url: None,
|
||||||
|
outbox_url: Some("https://remote.example/users/alice/outbox".to_string()),
|
||||||
|
};
|
||||||
|
repo.add_following(local_user, actor, "https://local/activities/1")
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
repo.update_following_status(
|
||||||
|
local_user,
|
||||||
|
"https://remote.example/users/alice",
|
||||||
|
FollowingStatus::Accepted,
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
let result = repo
|
||||||
|
.get_following_outbox_url(local_user, "https://remote.example/users/alice")
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
assert_eq!(
|
||||||
|
result,
|
||||||
|
Some("https://remote.example/users/alice/outbox".to_string())
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn get_following_outbox_url_returns_none_when_not_following() {
|
||||||
|
let pool = setup_pool().await;
|
||||||
|
let repo = SqliteFederationRepository::new(pool);
|
||||||
|
let result = repo
|
||||||
|
.get_following_outbox_url(uuid::Uuid::new_v4(), "https://remote.example/users/alice")
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
assert_eq!(result, None);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
#[path = "tests/actor_block_tests.rs"]
|
#[path = "tests/actor_block_tests.rs"]
|
||||||
mod actor_block_tests;
|
mod actor_block_tests;
|
||||||
|
|||||||
@@ -0,0 +1 @@
|
|||||||
|
ALTER TABLE ap_remote_actors ADD COLUMN outbox_url TEXT;
|
||||||
@@ -4,6 +4,12 @@
|
|||||||
{% if let Some(err) = error %}
|
{% if let Some(err) = error %}
|
||||||
<p class="error">{{ err }}</p>
|
<p class="error">{{ err }}</p>
|
||||||
{% endif %}
|
{% endif %}
|
||||||
|
<form method="POST" action="/users/{{ user_id }}/follow" class="follow-form">
|
||||||
|
<input type="hidden" name="_csrf" value="{{ ctx.csrf_token }}">
|
||||||
|
<input type="hidden" name="redirect_after" value="/social/following">
|
||||||
|
<input type="text" name="handle" placeholder="@user@instance.tld" required>
|
||||||
|
<button type="submit">Follow</button>
|
||||||
|
</form>
|
||||||
{% if actors.is_empty() %}
|
{% if actors.is_empty() %}
|
||||||
<p>Not following anyone yet. Follow remote users from your <a href="/users/{{ user_id }}">profile page</a>.</p>
|
<p>Not following anyone yet. Follow remote users from your <a href="/users/{{ user_id }}">profile page</a>.</p>
|
||||||
{% else %}
|
{% else %}
|
||||||
|
|||||||
@@ -2,6 +2,14 @@
|
|||||||
{% block content %}
|
{% block content %}
|
||||||
<div class="users-list">
|
<div class="users-list">
|
||||||
<h2 class="page-title">Members</h2>
|
<h2 class="page-title">Members</h2>
|
||||||
|
{% if let Some(viewer_id) = ctx.user_id %}
|
||||||
|
<form method="POST" action="/users/{{ viewer_id }}/follow" class="follow-form">
|
||||||
|
<input type="hidden" name="_csrf" value="{{ ctx.csrf_token }}">
|
||||||
|
<input type="hidden" name="redirect_after" value="/users">
|
||||||
|
<input type="text" name="handle" placeholder="@user@instance.tld" required>
|
||||||
|
<button type="submit">Follow</button>
|
||||||
|
</form>
|
||||||
|
{% endif %}
|
||||||
{% for user in users %}
|
{% for user in users %}
|
||||||
<div class="user-row">
|
<div class="user-row">
|
||||||
<div class="user-avatar">{{ user.initial }}</div>
|
<div class="user-avatar">{{ user.initial }}</div>
|
||||||
|
|||||||
@@ -1,7 +1,10 @@
|
|||||||
use super::*;
|
use super::*;
|
||||||
use async_trait::async_trait;
|
use async_trait::async_trait;
|
||||||
use domain::{errors::DomainError, events::{AckHandle, DomainEvent}};
|
|
||||||
use domain::value_objects::{ExternalMetadataId, MovieId};
|
use domain::value_objects::{ExternalMetadataId, MovieId};
|
||||||
|
use domain::{
|
||||||
|
errors::DomainError,
|
||||||
|
events::{AckHandle, DomainEvent},
|
||||||
|
};
|
||||||
use futures::{stream, stream::BoxStream};
|
use futures::{stream, stream::BoxStream};
|
||||||
use std::sync::{Arc, Mutex};
|
use std::sync::{Arc, Mutex};
|
||||||
|
|
||||||
@@ -9,8 +12,12 @@ struct NoopAck;
|
|||||||
|
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
impl AckHandle for NoopAck {
|
impl AckHandle for NoopAck {
|
||||||
async fn ack(&self) -> Result<(), DomainError> { Ok(()) }
|
async fn ack(&self) -> Result<(), DomainError> {
|
||||||
async fn nack(&self) -> Result<(), DomainError> { Ok(()) }
|
Ok(())
|
||||||
|
}
|
||||||
|
async fn nack(&self) -> Result<(), DomainError> {
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
struct VecConsumer {
|
struct VecConsumer {
|
||||||
@@ -45,7 +52,10 @@ impl EventHandler for RecordingHandler {
|
|||||||
DomainEvent::UserUpdated { .. } => "user_updated",
|
DomainEvent::UserUpdated { .. } => "user_updated",
|
||||||
DomainEvent::MovieEnrichmentRequested { .. } => "movie_enrichment_requested",
|
DomainEvent::MovieEnrichmentRequested { .. } => "movie_enrichment_requested",
|
||||||
DomainEvent::ImageStored { .. } => "image_stored",
|
DomainEvent::ImageStored { .. } => "image_stored",
|
||||||
DomainEvent::WatchlistEntryAdded { .. } | DomainEvent::WatchlistEntryRemoved { .. } => "watchlist",
|
DomainEvent::WatchlistEntryAdded { .. } | DomainEvent::WatchlistEntryRemoved { .. } => {
|
||||||
|
"watchlist"
|
||||||
|
}
|
||||||
|
DomainEvent::FollowAccepted { .. } => "follow_accepted",
|
||||||
};
|
};
|
||||||
self.calls.lock().unwrap().push(label);
|
self.calls.lock().unwrap().push(label);
|
||||||
Ok(())
|
Ok(())
|
||||||
@@ -62,8 +72,12 @@ fn movie_discovered() -> DomainEvent {
|
|||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn dispatches_to_all_handlers() {
|
async fn dispatches_to_all_handlers() {
|
||||||
let calls = Arc::new(Mutex::new(vec![]));
|
let calls = Arc::new(Mutex::new(vec![]));
|
||||||
let consumer = VecConsumer { events: vec![movie_discovered()] };
|
let consumer = VecConsumer {
|
||||||
let handler = RecordingHandler { calls: Arc::clone(&calls) };
|
events: vec![movie_discovered()],
|
||||||
|
};
|
||||||
|
let handler = RecordingHandler {
|
||||||
|
calls: Arc::clone(&calls),
|
||||||
|
};
|
||||||
|
|
||||||
WorkerService::new(Arc::new(consumer), vec![Arc::new(handler)])
|
WorkerService::new(Arc::new(consumer), vec![Arc::new(handler)])
|
||||||
.run()
|
.run()
|
||||||
@@ -82,7 +96,9 @@ async fn nacks_when_handler_fails() {
|
|||||||
|
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
impl AckHandle for TrackingAck {
|
impl AckHandle for TrackingAck {
|
||||||
async fn ack(&self) -> Result<(), DomainError> { Ok(()) }
|
async fn ack(&self) -> Result<(), DomainError> {
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
async fn nack(&self) -> Result<(), DomainError> {
|
async fn nack(&self) -> Result<(), DomainError> {
|
||||||
*self.nack_called.lock().unwrap() = true;
|
*self.nack_called.lock().unwrap() = true;
|
||||||
Ok(())
|
Ok(())
|
||||||
@@ -98,7 +114,9 @@ async fn nacks_when_handler_fails() {
|
|||||||
fn consume(&self) -> BoxStream<'_, Result<EventEnvelope, DomainError>> {
|
fn consume(&self) -> BoxStream<'_, Result<EventEnvelope, DomainError>> {
|
||||||
let envelope = EventEnvelope::new(
|
let envelope = EventEnvelope::new(
|
||||||
self.event.clone(),
|
self.event.clone(),
|
||||||
Box::new(TrackingAck { nack_called: Arc::clone(&self.nack_called) }),
|
Box::new(TrackingAck {
|
||||||
|
nack_called: Arc::clone(&self.nack_called),
|
||||||
|
}),
|
||||||
);
|
);
|
||||||
Box::pin(stream::iter(vec![Ok(envelope)]))
|
Box::pin(stream::iter(vec![Ok(envelope)]))
|
||||||
}
|
}
|
||||||
@@ -139,7 +157,9 @@ async fn acks_when_all_handlers_succeed() {
|
|||||||
*self.ack_called.lock().unwrap() = true;
|
*self.ack_called.lock().unwrap() = true;
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
async fn nack(&self) -> Result<(), DomainError> { Ok(()) }
|
async fn nack(&self) -> Result<(), DomainError> {
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
struct TrackingConsumer {
|
struct TrackingConsumer {
|
||||||
@@ -151,7 +171,9 @@ async fn acks_when_all_handlers_succeed() {
|
|||||||
fn consume(&self) -> BoxStream<'_, Result<EventEnvelope, DomainError>> {
|
fn consume(&self) -> BoxStream<'_, Result<EventEnvelope, DomainError>> {
|
||||||
let envelope = EventEnvelope::new(
|
let envelope = EventEnvelope::new(
|
||||||
self.event.clone(),
|
self.event.clone(),
|
||||||
Box::new(TrackingAck { ack_called: Arc::clone(&self.ack_called) }),
|
Box::new(TrackingAck {
|
||||||
|
ack_called: Arc::clone(&self.ack_called),
|
||||||
|
}),
|
||||||
);
|
);
|
||||||
Box::pin(stream::iter(vec![Ok(envelope)]))
|
Box::pin(stream::iter(vec![Ok(envelope)]))
|
||||||
}
|
}
|
||||||
@@ -162,9 +184,7 @@ async fn acks_when_all_handlers_succeed() {
|
|||||||
ack_called: Arc::clone(&ack_called),
|
ack_called: Arc::clone(&ack_called),
|
||||||
};
|
};
|
||||||
|
|
||||||
WorkerService::new(Arc::new(consumer), vec![])
|
WorkerService::new(Arc::new(consumer), vec![]).run().await;
|
||||||
.run()
|
|
||||||
.await;
|
|
||||||
|
|
||||||
assert!(*ack_called.lock().unwrap());
|
assert!(*ack_called.lock().unwrap());
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -56,6 +56,11 @@ pub enum DomainEvent {
|
|||||||
user_id: UserId,
|
user_id: UserId,
|
||||||
movie_id: MovieId,
|
movie_id: MovieId,
|
||||||
},
|
},
|
||||||
|
FollowAccepted {
|
||||||
|
local_user_id: UserId,
|
||||||
|
remote_actor_url: String,
|
||||||
|
outbox_url: String,
|
||||||
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
@@ -82,3 +87,23 @@ impl EventEnvelope {
|
|||||||
self.ack.nack().await
|
self.ack.nack().await
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use super::*;
|
||||||
|
use crate::value_objects::UserId;
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn follow_accepted_matches() {
|
||||||
|
let uid = UserId::from_uuid(uuid::Uuid::new_v4());
|
||||||
|
let event = DomainEvent::FollowAccepted {
|
||||||
|
local_user_id: uid.clone(),
|
||||||
|
remote_actor_url: "https://remote.example/users/alice".to_string(),
|
||||||
|
outbox_url: "https://remote.example/users/alice/outbox".to_string(),
|
||||||
|
};
|
||||||
|
let DomainEvent::FollowAccepted { outbox_url, .. } = event else {
|
||||||
|
panic!("wrong variant");
|
||||||
|
};
|
||||||
|
assert_eq!(outbox_url, "https://remote.example/users/alice/outbox");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
@@ -85,6 +85,8 @@ pub struct FollowForm {
|
|||||||
pub handle: String,
|
pub handle: String,
|
||||||
#[serde(rename = "_csrf", default)]
|
#[serde(rename = "_csrf", default)]
|
||||||
pub csrf_token: String,
|
pub csrf_token: String,
|
||||||
|
#[serde(default)]
|
||||||
|
pub redirect_after: Option<String>,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Deserialize)]
|
#[derive(Deserialize)]
|
||||||
|
|||||||
@@ -718,12 +718,20 @@ pub async fn follow_remote_user(
|
|||||||
if crate::csrf::mismatch(&csrf, &form.csrf_token) {
|
if crate::csrf::mismatch(&csrf, &form.csrf_token) {
|
||||||
return StatusCode::FORBIDDEN.into_response();
|
return StatusCode::FORBIDDEN.into_response();
|
||||||
}
|
}
|
||||||
|
let redirect_base = form
|
||||||
|
.redirect_after
|
||||||
|
.as_deref()
|
||||||
|
.filter(|u| u.starts_with('/') && !u.starts_with("//"))
|
||||||
|
.unwrap_or(&format!("/users/{}", profile_user_uuid))
|
||||||
|
.to_string();
|
||||||
|
|
||||||
match state.ap_service.follow(user_id.value(), &form.handle).await {
|
match state.ap_service.follow(user_id.value(), &form.handle).await {
|
||||||
Ok(()) => Redirect::to(&format!("/users/{}", profile_user_uuid)).into_response(),
|
Ok(()) => Redirect::to(&redirect_base).into_response(),
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
tracing::error!("follow error: {:?}", e);
|
tracing::error!("follow error: {:?}", e);
|
||||||
let msg = encode_error(&e.to_string());
|
let msg = encode_error(&e.to_string());
|
||||||
Redirect::to(&format!("/users/{}?error={}", profile_user_uuid, msg)).into_response()
|
let sep = if redirect_base.contains('?') { '&' } else { '?' };
|
||||||
|
Redirect::to(&format!("{}{}error={}", redirect_base, sep, msg)).into_response()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -86,20 +86,6 @@ async fn wire_dependencies() -> anyhow::Result<(AppState, axum::Router)> {
|
|||||||
_ => anyhow::bail!("DATABASE_BACKEND={backend} federation is not supported by this build"),
|
_ => anyhow::bail!("DATABASE_BACKEND={backend} federation is not supported by this build"),
|
||||||
};
|
};
|
||||||
|
|
||||||
let ap = activitypub::wire(
|
|
||||||
federation_repo,
|
|
||||||
review_store,
|
|
||||||
remote_watchlist_repo.clone(),
|
|
||||||
Arc::clone(&user_repository),
|
|
||||||
Arc::clone(&movie_repository),
|
|
||||||
Arc::clone(&review_repository),
|
|
||||||
Arc::clone(&diary_repository),
|
|
||||||
app_config.base_url.clone(),
|
|
||||||
app_config.allow_registration,
|
|
||||||
).await?;
|
|
||||||
let ap_router = ap.router;
|
|
||||||
let ap_service_arc = ap.service;
|
|
||||||
|
|
||||||
let ep: Arc<dyn EventPublisher> = match event_bus {
|
let ep: Arc<dyn EventPublisher> = match event_bus {
|
||||||
EventBusBackend::Db => {
|
EventBusBackend::Db => {
|
||||||
tracing::info!("event bus: DB queue");
|
tracing::info!("event bus: DB queue");
|
||||||
@@ -122,6 +108,22 @@ async fn wire_dependencies() -> anyhow::Result<(AppState, axum::Router)> {
|
|||||||
nats::create_publisher(cfg).await?
|
nats::create_publisher(cfg).await?
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
let ap = activitypub::wire(
|
||||||
|
federation_repo,
|
||||||
|
review_store,
|
||||||
|
remote_watchlist_repo.clone(),
|
||||||
|
Arc::clone(&user_repository),
|
||||||
|
Arc::clone(&movie_repository),
|
||||||
|
Arc::clone(&review_repository),
|
||||||
|
Arc::clone(&diary_repository),
|
||||||
|
app_config.base_url.clone(),
|
||||||
|
app_config.allow_registration,
|
||||||
|
Arc::clone(&ep),
|
||||||
|
).await?;
|
||||||
|
let ap_router = ap.router;
|
||||||
|
let ap_service_arc = ap.service;
|
||||||
|
|
||||||
(ep, ap_router, ap_service_arc, social_query_arc, remote_watchlist_repo)
|
(ep, ap_router, ap_service_arc, social_query_arc, remote_watchlist_repo)
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|||||||
@@ -31,6 +31,7 @@ importer = { workspace = true }
|
|||||||
image-converter = { workspace = true }
|
image-converter = { workspace = true }
|
||||||
nats = { workspace = true, optional = true }
|
nats = { workspace = true, optional = true }
|
||||||
sqlx = { workspace = true }
|
sqlx = { workspace = true }
|
||||||
|
async-trait = { workspace = true }
|
||||||
|
|
||||||
# Optional — database backends
|
# Optional — database backends
|
||||||
sqlite = { workspace = true, optional = true }
|
sqlite = { workspace = true, optional = true }
|
||||||
|
|||||||
22
crates/worker/src/follow_backfill_handler.rs
Normal file
22
crates/worker/src/follow_backfill_handler.rs
Normal file
@@ -0,0 +1,22 @@
|
|||||||
|
use std::sync::Arc;
|
||||||
|
|
||||||
|
use async_trait::async_trait;
|
||||||
|
use domain::{errors::DomainError, events::DomainEvent, ports::EventHandler};
|
||||||
|
|
||||||
|
pub struct FollowBackfillHandler {
|
||||||
|
pub ap_service: Arc<dyn activitypub::ActivityPubPort>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[async_trait]
|
||||||
|
impl EventHandler for FollowBackfillHandler {
|
||||||
|
async fn handle(&self, event: &DomainEvent) -> Result<(), DomainError> {
|
||||||
|
let DomainEvent::FollowAccepted { remote_actor_url, outbox_url, .. } = event else {
|
||||||
|
return Ok(());
|
||||||
|
};
|
||||||
|
tracing::info!(actor = %remote_actor_url, outbox = %outbox_url, "starting outbox backfill");
|
||||||
|
self.ap_service
|
||||||
|
.backfill_outbox(outbox_url, remote_actor_url)
|
||||||
|
.await
|
||||||
|
.map_err(|e| DomainError::InfrastructureError(e.to_string()))
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -1,5 +1,6 @@
|
|||||||
mod db;
|
mod db;
|
||||||
mod event_bus;
|
mod event_bus;
|
||||||
|
mod follow_backfill_handler;
|
||||||
|
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
@@ -166,7 +167,7 @@ async fn main() -> anyhow::Result<()> {
|
|||||||
|
|
||||||
#[cfg(feature = "federation")]
|
#[cfg(feature = "federation")]
|
||||||
{
|
{
|
||||||
let ap = activitypub::wire(
|
let ap_wire = activitypub::wire(
|
||||||
fed_federation_repo,
|
fed_federation_repo,
|
||||||
fed_review_store,
|
fed_review_store,
|
||||||
fed_remote_watchlist_repo,
|
fed_remote_watchlist_repo,
|
||||||
@@ -176,12 +177,18 @@ async fn main() -> anyhow::Result<()> {
|
|||||||
fed_diary_repo,
|
fed_diary_repo,
|
||||||
base_url,
|
base_url,
|
||||||
allow_registration,
|
allow_registration,
|
||||||
).await?.event_handler;
|
Arc::clone(&ctx.event_publisher),
|
||||||
|
).await?;
|
||||||
|
|
||||||
|
let ap_event_handler = ap_wire.event_handler;
|
||||||
|
let backfill = Arc::new(follow_backfill_handler::FollowBackfillHandler {
|
||||||
|
ap_service: ap_wire.service,
|
||||||
|
}) as Arc<dyn EventHandler>;
|
||||||
|
|
||||||
let search_cleanup = Arc::new(SearchCleanupHandler::new(Arc::clone(&ctx.search_command), Arc::clone(&ctx.person_query))) as Arc<dyn EventHandler>;
|
let search_cleanup = Arc::new(SearchCleanupHandler::new(Arc::clone(&ctx.search_command), Arc::clone(&ctx.person_query))) as Arc<dyn EventHandler>;
|
||||||
let discovery_indexer = Arc::new(MovieDiscoveryIndexer::new(Arc::clone(&ctx.movie_repository), Arc::clone(&ctx.search_command))) as Arc<dyn EventHandler>;
|
let discovery_indexer = Arc::new(MovieDiscoveryIndexer::new(Arc::clone(&ctx.movie_repository), Arc::clone(&ctx.search_command))) as Arc<dyn EventHandler>;
|
||||||
tracing::info!("federation event handler registered");
|
tracing::info!("federation event handler registered");
|
||||||
let mut h = vec![poster, cleanup, ap, search_cleanup, discovery_indexer];
|
let mut h = vec![poster, cleanup, ap_event_handler, backfill, search_cleanup, discovery_indexer];
|
||||||
if let Some(e) = enrichment_handler { h.push(e); }
|
if let Some(e) = enrichment_handler { h.push(e); }
|
||||||
if let Some((ref conv_handler, _)) = conversion { h.push(Arc::clone(conv_handler)); }
|
if let Some((ref conv_handler, _)) = conversion { h.push(Arc::clone(conv_handler)); }
|
||||||
h
|
h
|
||||||
|
|||||||
Reference in New Issue
Block a user