diff --git a/crates/adapters/activitypub/src/handler.rs b/crates/adapters/activitypub/src/handler.rs index 6e4098a..3c36a92 100644 --- a/crates/adapters/activitypub/src/handler.rs +++ b/crates/adapters/activitypub/src/handler.rs @@ -10,7 +10,7 @@ use url::Url; use crate::note::{ThoughtNote, ThoughtNoteInput}; use crate::port::{AcceptNoteInput, ActivityPubRepository}; use crate::urls::ThoughtsUrls; -use domain::ports::{EventPublisher, TagRepository}; +use domain::ports::{BoostRepository, EventPublisher, LikeRepository, TagRepository}; use domain::value_objects::UserId; use k_ap::{ApContentReader, ApObjectHandler}; @@ -19,6 +19,8 @@ pub struct ThoughtsObjectHandler { urls: ThoughtsUrls, event_publisher: Option>, tag_repo: Arc, + likes: Arc, + boosts: Arc, } impl ThoughtsObjectHandler { @@ -27,12 +29,16 @@ impl ThoughtsObjectHandler { base_url: &str, event_publisher: Option>, tag_repo: Arc, + likes: Arc, + boosts: Arc, ) -> Self { Self { repo, urls: ThoughtsUrls::new(base_url), event_publisher, tag_repo, + likes, + boosts, } } } @@ -106,6 +112,10 @@ impl ApObjectHandler for ThoughtsObjectHandler { .intern_remote_actor(actor_url.as_str()) .await .map_err(|e| anyhow!("{e}"))?; + let _ = self + .repo + .sync_remote_actor_to_user(actor_url.as_str()) + .await; let as_public = "https://www.w3.org/ns/activitystreams#Public"; let in_to = note.to.iter().any(|s| s == as_public); @@ -194,17 +204,50 @@ impl ApObjectHandler for ThoughtsObjectHandler { async fn on_update( &self, ap_id: &Url, - _actor_url: &Url, + actor_url: &Url, object: serde_json::Value, ) -> Result<()> { - let Some((note, _)) = ThoughtNote::try_from_ap(object) else { - tracing::debug!(ap_id = %ap_id, "on_update: skipping non-Note object"); - return Ok(()); - }; - self.repo - .apply_note_update(ap_id.as_str(), ¬e.content) - .await - .map_err(|e| anyhow!("{e}")) + let obj_type = object.get("type").and_then(|v| v.as_str()).unwrap_or(""); + match obj_type { + "Note" | "Article" | "Page" => { + let Some((note, _)) = ThoughtNote::try_from_ap(object) else { + return Ok(()); + }; + self.repo + .apply_note_update(ap_id.as_str(), ¬e.content) + .await + .map_err(|e| anyhow!("{e}")) + } + "Person" | "Service" | "Application" | "Group" | "Organization" => { + let display_name = object.get("name").and_then(|v| v.as_str()); + let avatar_url = object + .get("icon") + .and_then(|v| v.get("url")) + .and_then(|v| v.as_str()); + self.repo + .update_remote_actor_display( + &self + .repo + .find_remote_actor_id(actor_url.as_str()) + .await + .map_err(|e| anyhow!("{e}"))? + .ok_or_else(|| anyhow!("unknown actor"))?, + display_name, + avatar_url, + ) + .await + .map_err(|e| anyhow!("{e}"))?; + let _ = self + .repo + .sync_remote_actor_to_user(actor_url.as_str()) + .await; + Ok(()) + } + _ => { + tracing::debug!(ap_id = %ap_id, obj_type, "on_update: skipping"); + Ok(()) + } + } } async fn on_delete(&self, ap_id: &Url, _actor_url: &Url) -> Result<()> { @@ -245,14 +288,24 @@ impl ApObjectHandler for ThoughtsObjectHandler { let actor_user_id = match actor_user_id { Some(id) => id, None => { - tracing::debug!(actor = %actor_url, "on_like: remote actor not interned, skipping notification"); + tracing::debug!(actor = %actor_url, "on_like: remote actor not interned, skipping"); return Ok(()); } }; + let thought_id = domain::value_objects::ThoughtId::from_uuid(thought_uuid); + let like_id = domain::value_objects::LikeId::new(); + + let like = domain::models::social::Like { + id: like_id.clone(), + user_id: actor_user_id.clone(), + thought_id: thought_id.clone(), + ap_id: Some(object_url.to_string()), + created_at: Utc::now(), + }; + let _ = self.likes.save(&like).await; + if let Some(ep) = &self.event_publisher { - let thought_id = domain::value_objects::ThoughtId::from_uuid(thought_uuid); - let like_id = domain::value_objects::LikeId::new(); ep.publish(&domain::events::DomainEvent::LikeAdded { like_id, user_id: actor_user_id, @@ -294,10 +347,13 @@ impl ApObjectHandler for ThoughtsObjectHandler { } }; + let thought_id = domain::value_objects::ThoughtId::from_uuid(thought_uuid); + let _ = self.likes.delete(&actor_user_id, &thought_id).await; + if let Some(ep) = &self.event_publisher { ep.publish(&domain::events::DomainEvent::LikeRemoved { user_id: actor_user_id, - thought_id: domain::value_objects::ThoughtId::from_uuid(thought_uuid), + thought_id, }) .await .map_err(|e| anyhow!("{e}"))?; @@ -369,9 +425,19 @@ impl ApObjectHandler for ThoughtsObjectHandler { None => return Ok(()), }; + let thought_id = domain::value_objects::ThoughtId::from_uuid(thought_uuid); + let boost_id = domain::value_objects::BoostId::new(); + + let boost = domain::models::social::Boost { + id: boost_id.clone(), + user_id: actor_user_id.clone(), + thought_id: thought_id.clone(), + ap_id: Some(object_url.to_string()), + created_at: Utc::now(), + }; + let _ = self.boosts.save(&boost).await; + if let Some(ep) = &self.event_publisher { - let thought_id = domain::value_objects::ThoughtId::from_uuid(thought_uuid); - let boost_id = domain::value_objects::BoostId::new(); ep.publish(&domain::events::DomainEvent::BoostAdded { boost_id, user_id: actor_user_id, @@ -384,6 +450,44 @@ impl ApObjectHandler for ThoughtsObjectHandler { Ok(()) } + async fn on_announce_removed(&self, object_url: &Url, actor_url: &Url) -> Result<()> { + let thought_uuid = object_url + .path() + .strip_prefix(THOUGHTS_PATH_PREFIX) + .and_then(|s| s.split('/').next()) + .and_then(|s| uuid::Uuid::parse_str(s).ok()); + + let thought_uuid = match thought_uuid { + Some(u) => u, + None => return Ok(()), + }; + + let actor_user_id = self + .repo + .find_remote_actor_id(actor_url.as_str()) + .await + .map_err(|e| anyhow!("{e}"))?; + + let actor_user_id = match actor_user_id { + Some(id) => id, + None => return Ok(()), + }; + + let thought_id = domain::value_objects::ThoughtId::from_uuid(thought_uuid); + let _ = self.boosts.delete(&actor_user_id, &thought_id).await; + + if let Some(ep) = &self.event_publisher { + ep.publish(&domain::events::DomainEvent::BoostRemoved { + user_id: actor_user_id, + thought_id, + }) + .await + .map_err(|e| anyhow!("{e}"))?; + } + + Ok(()) + } + async fn on_announce_of_remote(&self, _object_url: &Url, _actor_url: &Url) -> Result<()> { Ok(()) } diff --git a/crates/adapters/activitypub/src/note/mod.rs b/crates/adapters/activitypub/src/note/mod.rs index 5a30004..ee59325 100644 --- a/crates/adapters/activitypub/src/note/mod.rs +++ b/crates/adapters/activitypub/src/note/mod.rs @@ -74,11 +74,15 @@ pub struct ThoughtNoteInput { impl ThoughtNote { /// Returns `(note, extensions)` if `value` is a Note object, `None` otherwise. - pub fn try_from_ap(value: serde_json::Value) -> Option<(Self, Option)> { - if value.get("type").and_then(|v| v.as_str()) != Some("Note") { + pub fn try_from_ap(mut value: serde_json::Value) -> Option<(Self, Option)> { + let obj_type = value.get("type").and_then(|v| v.as_str()); + if !matches!(obj_type, Some("Note" | "Article" | "Page")) { return None; } let extensions = extract_extensions(&value); + if let Some(obj) = value.as_object_mut() { + obj.insert("type".to_string(), serde_json::json!("Note")); + } serde_json::from_value(value) .ok() .map(|note| (note, extensions)) diff --git a/crates/adapters/activitypub/src/port.rs b/crates/adapters/activitypub/src/port.rs index 75e5cb7..c3f86cc 100644 --- a/crates/adapters/activitypub/src/port.rs +++ b/crates/adapters/activitypub/src/port.rs @@ -101,6 +101,9 @@ pub trait ActivityPubRepository: Send + Sync { /// Returns None for users that have not been federated. async fn get_actor_ap_urls(&self, user_id: &UserId) -> Result, DomainError>; + + /// Sync display_name + avatar_url from remote_actors to users table. + async fn sync_remote_actor_to_user(&self, actor_ap_url: &str) -> Result<(), DomainError>; } #[async_trait] diff --git a/crates/adapters/activitypub/src/service.rs b/crates/adapters/activitypub/src/service.rs index 532255b..2cea3bf 100644 --- a/crates/adapters/activitypub/src/service.rs +++ b/crates/adapters/activitypub/src/service.rs @@ -23,7 +23,8 @@ fn content_to_html(text: &str) -> String { .replace('&', "&") .replace('<', "<") .replace('>', ">") - .replace('"', """); + .replace('"', """) + .replace('\'', "'"); let paragraphs: Vec<&str> = escaped.split('\n').filter(|s| !s.is_empty()).collect(); if paragraphs.is_empty() { format!("

{}

", escaped) @@ -116,13 +117,17 @@ fn k_ap_actor_to_domain(a: k_ap::RemoteActor) -> DomainRemoteActor { last_fetched_at: chrono::Utc::now(), bio: a.bio, banner_url: a.banner_url, - also_known_as: a.also_known_as.into_iter().next(), + also_known_as: a.also_known_as, followers_url: a.followers_url, following_url: a.following_url, + inbox_url: Some(a.inbox_url), + shared_inbox_url: a.shared_inbox_url, attachment: vec![], } } +// TODO: these fetches are unsigned — fails on instances with authorized-fetch (Secure Mode). +// Fix requires exposing k-ap's signed HTTP client. async fn resolve_actor_profiles_from_urls( urls: Vec, ) -> Vec { @@ -201,7 +206,9 @@ async fn webfinger_resolve_actor_url(handle: &str) -> anyhow::Result { .and_then(|links| { links.iter().find(|l| { l["rel"].as_str() == Some("self") - && l["type"].as_str() == Some("application/activity+json") + && l["type"].as_str().is_some_and(|t| { + t == "application/activity+json" || t.starts_with("application/ld+json") + }) }) }) .and_then(|l| l["href"].as_str()) @@ -415,11 +422,8 @@ impl FederationSchedulerPort for ApFederationAdapter { actor_ap_url: &str, collection_url: &str, connection_type: &str, - page: u32, + _page: u32, ) -> Result<(), DomainError> { - if page != 1 { - return Ok(()); - } let actor = actor_ap_url.to_string(); let collection = collection_url.to_string(); let conn_type = connection_type.to_string(); @@ -536,9 +540,15 @@ impl FederationLookupPort for ApFederationAdapter { last_fetched_at: chrono::Utc::now(), bio: actor.bio, banner_url: actor.banner_url.as_ref().map(|u| u.to_string()), - also_known_as: actor.also_known_as.into_iter().next(), + also_known_as: actor + .also_known_as + .into_iter() + .map(|u| u.to_string()) + .collect(), followers_url: actor.followers_url.as_ref().map(|u| u.to_string()), following_url: actor.following_url.as_ref().map(|u| u.to_string()), + inbox_url: None, + shared_inbox_url: None, attachment: actor .attachment .into_iter() @@ -599,20 +609,36 @@ impl FederationFetchPort for ApFederationAdapter { .await .map_err(|e| DomainError::ExternalService(e.to_string()))?; - let url = base["first"] + let first_url = base["first"] .as_str() .map(|s| s.to_string()) - .unwrap_or_else(|| format!("{}?page={}", outbox_url, page)); + .unwrap_or_else(|| format!("{}?page=1", outbox_url)); - let resp: serde_json::Value = client - .get(&url) - .header("Accept", "application/activity+json, application/ld+json") - .send() - .await - .map_err(|e| DomainError::ExternalService(e.to_string()))? - .json() - .await - .map_err(|e| DomainError::ExternalService(e.to_string()))?; + let mut current_url = first_url; + let mut hops = 0u32; + let target_page = page.max(1); + let max_hops = 10u32; + + let resp: serde_json::Value = loop { + let page_resp: serde_json::Value = client + .get(¤t_url) + .header("Accept", "application/activity+json, application/ld+json") + .send() + .await + .map_err(|e| DomainError::ExternalService(e.to_string()))? + .json() + .await + .map_err(|e| DomainError::ExternalService(e.to_string()))?; + + hops += 1; + if hops >= target_page || hops >= max_hops { + break page_resp; + } + match page_resp["next"].as_str() { + Some(next) => current_url = next.to_string(), + None => break page_resp, + } + }; let empty = vec![]; let items = resp["orderedItems"].as_array().unwrap_or(&empty); @@ -850,4 +876,33 @@ impl FederationFollowRequestPort for ApFederationAdapter { } } +// ── FederationBlockPort ────────────────────────────────────────────────────── + +#[async_trait] +impl domain::ports::FederationBlockPort for ApFederationAdapter { + async fn block_remote(&self, local_user_id: &UserId, handle: &str) -> Result<(), DomainError> { + let actor_url = webfinger_resolve_actor_url(handle) + .await + .map_err(|e| DomainError::ExternalService(e.to_string()))?; + self.inner + .block_actor(local_user_id.as_uuid(), &actor_url) + .await + .map_err(|e| DomainError::ExternalService(e.to_string())) + } + + async fn unblock_remote( + &self, + local_user_id: &UserId, + handle: &str, + ) -> Result<(), DomainError> { + let actor_url = webfinger_resolve_actor_url(handle) + .await + .map_err(|e| DomainError::ExternalService(e.to_string()))?; + self.inner + .unblock_actor(local_user_id.as_uuid(), &actor_url) + .await + .map_err(|e| DomainError::ExternalService(e.to_string())) + } +} + // FederationActionPort is a blanket supertrait; no explicit impl needed. diff --git a/crates/adapters/activitypub/src/urls/mod.rs b/crates/adapters/activitypub/src/urls/mod.rs index 513c078..65a1b71 100644 --- a/crates/adapters/activitypub/src/urls/mod.rs +++ b/crates/adapters/activitypub/src/urls/mod.rs @@ -11,24 +11,24 @@ impl ThoughtsUrls { } } - pub fn user_url(&self, username: &str) -> Url { - Url::parse(&format!("{}/users/{}", self.base_url, username)).expect("valid URL") + pub fn user_url(&self, id: &str) -> Url { + Url::parse(&format!("{}/users/{}", self.base_url, id)).expect("valid URL") } pub fn thought_url(&self, thought_id: uuid::Uuid) -> Url { Url::parse(&format!("{}/thoughts/{}", self.base_url, thought_id)).expect("valid URL") } - pub fn user_inbox(&self, username: &str) -> Url { - Url::parse(&format!("{}/users/{}/inbox", self.base_url, username)).expect("valid URL") + pub fn user_inbox(&self, id: &str) -> Url { + Url::parse(&format!("{}/users/{}/inbox", self.base_url, id)).expect("valid URL") } - pub fn user_outbox(&self, username: &str) -> Url { - Url::parse(&format!("{}/users/{}/outbox", self.base_url, username)).expect("valid URL") + pub fn user_outbox(&self, id: &str) -> Url { + Url::parse(&format!("{}/users/{}/outbox", self.base_url, id)).expect("valid URL") } - pub fn user_followers(&self, username: &str) -> Url { - Url::parse(&format!("{}/users/{}/followers", self.base_url, username)).expect("valid URL") + pub fn user_followers(&self, id: &str) -> Url { + Url::parse(&format!("{}/users/{}/followers", self.base_url, id)).expect("valid URL") } } diff --git a/crates/adapters/postgres-federation/src/lib.rs b/crates/adapters/postgres-federation/src/lib.rs index f00ddc6..38a50e6 100644 --- a/crates/adapters/postgres-federation/src/lib.rs +++ b/crates/adapters/postgres-federation/src/lib.rs @@ -222,13 +222,12 @@ impl FollowRepository for PostgresFederationRepository { } async fn count_followers(&self, local_user_id: uuid::Uuid) -> Result { - let n: i64 = sqlx::query_scalar( - "SELECT COUNT(*) FROM federation_followers WHERE local_user_id=$1 AND status='accepted'", - ) - .bind(local_user_id) - .fetch_one(&self.pool) - .await - .map_err(|e| anyhow!(e))?; + let n: i64 = + sqlx::query_scalar("SELECT COUNT(*) FROM federation_followers WHERE local_user_id=$1") + .bind(local_user_id) + .fetch_one(&self.pool) + .await + .map_err(|e| anyhow!(e))?; Ok(n as usize) } @@ -428,11 +427,24 @@ impl FollowRepository for PostgresFederationRepository { async fn update_following_status( &self, - _local_user_id: uuid::Uuid, - _remote_actor_url: &str, - _status: FollowingStatus, + local_user_id: uuid::Uuid, + remote_actor_url: &str, + status: FollowingStatus, ) -> Result<()> { - Ok(()) + let s = match status { + FollowingStatus::Pending => "pending", + FollowingStatus::Accepted => "accepted", + }; + sqlx::query( + "UPDATE federation_following SET status=$3 WHERE local_user_id=$1 AND remote_actor_url=$2", + ) + .bind(local_user_id) + .bind(remote_actor_url) + .bind(s) + .execute(&self.pool) + .await + .map_err(|e| anyhow!(e)) + .map(|_| ()) } async fn get_following_outbox_url( @@ -743,7 +755,7 @@ impl PostgresApUserRepository { } fn row_to_ap_user(&self, r: UserRow) -> ApUser { - let profile_url = url::Url::parse(&format!("{}/users/{}", self.base_url, r.username)).ok(); + let profile_url = url::Url::parse(&format!("{}/users/{}", self.base_url, r.id)).ok(); let avatar_url = r.avatar_url.and_then(|u| url::Url::parse(&u).ok()); let banner_url = r.header_url.and_then(|u| url::Url::parse(&u).ok()); ApUser { diff --git a/crates/adapters/postgres/migrations/017_widen_username.sql b/crates/adapters/postgres/migrations/017_widen_username.sql new file mode 100644 index 0000000..159f042 --- /dev/null +++ b/crates/adapters/postgres/migrations/017_widen_username.sql @@ -0,0 +1 @@ +ALTER TABLE users ALTER COLUMN username TYPE VARCHAR(255); diff --git a/crates/adapters/postgres/migrations/018_federation_following_status.sql b/crates/adapters/postgres/migrations/018_federation_following_status.sql new file mode 100644 index 0000000..06b66c3 --- /dev/null +++ b/crates/adapters/postgres/migrations/018_federation_following_status.sql @@ -0,0 +1,2 @@ +ALTER TABLE federation_following + ADD COLUMN IF NOT EXISTS status TEXT NOT NULL DEFAULT 'accepted'; diff --git a/crates/adapters/postgres/migrations/019_remote_actor_attachment.sql b/crates/adapters/postgres/migrations/019_remote_actor_attachment.sql new file mode 100644 index 0000000..16adb18 --- /dev/null +++ b/crates/adapters/postgres/migrations/019_remote_actor_attachment.sql @@ -0,0 +1 @@ +ALTER TABLE remote_actors ADD COLUMN IF NOT EXISTS attachment JSONB DEFAULT '[]'::jsonb; diff --git a/crates/adapters/postgres/src/activitypub/mod.rs b/crates/adapters/postgres/src/activitypub/mod.rs index e91557f..111aaf1 100644 --- a/crates/adapters/postgres/src/activitypub/mod.rs +++ b/crates/adapters/postgres/src/activitypub/mod.rs @@ -1,7 +1,7 @@ use crate::db_error::IntoDbResult; use async_trait::async_trait; -const MAX_REMOTE_CONTENT_CHARS: usize = 500; +const MAX_REMOTE_CONTENT_CHARS: usize = 5000; const THOUGHTS_PATH_PREFIX: &str = "/thoughts/"; use chrono::{DateTime, Utc}; use sqlx::PgPool; @@ -155,24 +155,28 @@ impl ActivityPubRepository for PgActivityPubRepository { return Ok(id); } let new_id = uuid::Uuid::new_v4(); - // Use the last path segment as username (e.g. /users/alice → "alice"). - // Falls back to a random short id for long segments (e.g. UUID-based actor URLs). - // username column is VARCHAR(32). - let last_seg = url::Url::parse(actor_ap_url) - .ok() + let parsed = url::Url::parse(actor_ap_url).ok(); + let domain_str = parsed + .as_ref() + .and_then(|u| u.host_str().map(|s| s.to_string())) + .unwrap_or_default(); + let last_seg = parsed .and_then(|u| { u.path_segments() .and_then(|mut s| s.next_back().map(|s| s.to_string())) }) .unwrap_or_default(); - let handle = if last_seg.is_empty() { - format!("remote_{}", &new_id.to_string()[..13]) - } else if last_seg.len() <= 32 { - last_seg + let handle = if last_seg.is_empty() || domain_str.is_empty() { + format!("r_{}", &new_id.to_string()[..13]) } else { - format!("remote_{}", &new_id.to_string()[..13]) + let candidate = format!("{}@{}", last_seg, domain_str); + if candidate.len() <= 255 { + candidate + } else { + format!("r_{}", &new_id.to_string()[..13]) + } }; - sqlx::query( + let result = sqlx::query( "INSERT INTO users(id,username,email,password_hash,local,ap_id,created_at,updated_at) VALUES($1,$2,$3,'',false,$4,NOW(),NOW()) ON CONFLICT(ap_id) DO NOTHING", ) @@ -181,9 +185,24 @@ impl ActivityPubRepository for PgActivityPubRepository { .bind(format!("{}@remote", new_id)) .bind(actor_ap_url) .execute(&self.pool) - .await - .into_domain()?; - // Re-fetch to get whichever id won the race + .await; + + if result.is_err() { + let fallback = format!("r_{}", &new_id.to_string()[..13]); + let new_id2 = uuid::Uuid::new_v4(); + sqlx::query( + "INSERT INTO users(id,username,email,password_hash,local,ap_id,created_at,updated_at) + VALUES($1,$2,$3,'',false,$4,NOW(),NOW()) ON CONFLICT(ap_id) DO NOTHING", + ) + .bind(new_id2) + .bind(&fallback) + .bind(format!("{}@remote", new_id2)) + .bind(actor_ap_url) + .execute(&self.pool) + .await + .into_domain()?; + } + self.find_remote_actor_id(actor_ap_url) .await? .ok_or_else(|| { @@ -345,6 +364,19 @@ impl ActivityPubRepository for PgActivityPubRepository { .into_domain() .map(|opt| opt.map(|(ap_id, inbox_url)| ActorApUrls { ap_id, inbox_url })) } + + async fn sync_remote_actor_to_user(&self, actor_ap_url: &str) -> Result<(), DomainError> { + sqlx::query( + "UPDATE users SET display_name = ra.display_name, avatar_url = ra.avatar_url, updated_at = NOW() + FROM remote_actors ra + WHERE users.ap_id = ra.url AND users.ap_id = $1 AND users.local = false", + ) + .bind(actor_ap_url) + .execute(&self.pool) + .await + .into_domain() + .map(|_| ()) + } } #[cfg(test)] diff --git a/crates/adapters/postgres/src/feed/mod.rs b/crates/adapters/postgres/src/feed/mod.rs index 8960f49..678e95e 100644 --- a/crates/adapters/postgres/src/feed/mod.rs +++ b/crates/adapters/postgres/src/feed/mod.rs @@ -113,14 +113,14 @@ impl<'a> FeedSqlBuilder<'a> { } } - fn select(&self) -> String { + fn select(&self, viewer_param: &str) -> String { let (viewer_cols, viewer_joins) = match self.viewer { - Some(uid) => ( + Some(_) => ( "(lv.thought_id IS NOT NULL) AS liked_by_viewer, (bv.thought_id IS NOT NULL) AS boosted_by_viewer".to_string(), format!( - "LEFT JOIN (SELECT thought_id FROM likes WHERE user_id='{uid}') lv ON lv.thought_id = t.id - LEFT JOIN (SELECT thought_id FROM boosts WHERE user_id='{uid}') bv ON bv.thought_id = t.id" + "LEFT JOIN (SELECT thought_id FROM likes WHERE user_id={viewer_param}) lv ON lv.thought_id = t.id + LEFT JOIN (SELECT thought_id FROM boosts WHERE user_id={viewer_param}) bv ON bv.thought_id = t.id" ), ), None => ( @@ -164,13 +164,13 @@ impl<'a> FeedSqlBuilder<'a> { ) } - fn fed_clause(&self) -> String { + fn fed_clause(&self, viewer_param: &str) -> String { match self.viewer { - Some(fid) => format!( + Some(_) => format!( " OR t.user_id IN ( SELECT u2.id FROM users u2 JOIN federation_following ff ON u2.ap_id = ff.remote_actor_url - WHERE ff.local_user_id = '{fid}' + WHERE ff.local_user_id = {viewer_param} )" ), None => String::new(), @@ -217,7 +217,7 @@ impl<'a> FeedSqlBuilder<'a> { ); let data = format!( "{} WHERE t.local=true AND t.visibility='public'{} {} LIMIT $1 OFFSET $2", - self.select(), + self.select("$3"), filter, order ); @@ -225,17 +225,16 @@ impl<'a> FeedSqlBuilder<'a> { } fn home(&self) -> (String, String) { - let fed = self.fed_clause(); let filter = self.filter_sql(); let order = self.order_sql(); - let count = format!( + let count = format!( "SELECT COUNT(*) FROM thoughts t WHERE (t.user_id=ANY($1){}) AND t.visibility != 'direct'{}", - fed, filter + self.fed_clause("$2"), filter ); let data = format!( "{} WHERE (t.user_id=ANY($1){}) AND t.visibility != 'direct'{} {} LIMIT $2 OFFSET $3", - self.select(), fed, filter, order + self.select("$4"), self.fed_clause("$4"), filter, order ); (count, data) } @@ -249,7 +248,7 @@ impl<'a> FeedSqlBuilder<'a> { ); let data = format!( "{} WHERE t.content % $1 AND t.visibility='public'{} {} LIMIT $2 OFFSET $3", - self.select(), + self.select("$4"), filter, order ); @@ -271,7 +270,7 @@ impl<'a> FeedSqlBuilder<'a> { JOIN thought_tags tt ON tt.thought_id = t.id JOIN tags tg ON tg.id = tt.tag_id WHERE tg.name = $1 AND t.visibility = 'public'{} {} LIMIT $2 OFFSET $3", - self.select(), + self.select("$4"), filter, order ); @@ -287,7 +286,7 @@ impl<'a> FeedSqlBuilder<'a> { ); let data = format!( "{} WHERE t.user_id = $1 AND ($4::uuid = $1 OR (t.visibility != 'direct' AND (t.visibility IN ('public', 'unlisted') OR (t.visibility = 'followers' AND EXISTS(SELECT 1 FROM follows WHERE follower_id = $4 AND following_id = $1 AND state = 'accepted'))))){} {} LIMIT $2 OFFSET $3", - self.select(), filter, order + self.select("$4"), filter, order ); (count, data) } @@ -300,12 +299,15 @@ impl FeedRepository for PgFeedRepository { let page = &req.query.page; let builder = FeedSqlBuilder::new(&req.options, &req.query.scope, viewer); + let viewer_uuid = viewer.unwrap_or(uuid::Uuid::nil()); + match &req.query.scope { FeedScope::Home { following_ids } => { let ids: Vec = following_ids.iter().map(|id| id.as_uuid()).collect(); let (count_sql, data_sql) = builder.home(); let total: i64 = sqlx::query_scalar(&count_sql) .bind(&ids) + .bind(viewer_uuid) .fetch_one(&self.pool) .await .into_domain()?; @@ -313,6 +315,7 @@ impl FeedRepository for PgFeedRepository { .bind(&ids) .bind(page.limit()) .bind(page.offset()) + .bind(viewer_uuid) .fetch_all(&self.pool) .await .into_domain()?; @@ -336,6 +339,7 @@ impl FeedRepository for PgFeedRepository { let rows = sqlx::query_as::<_, FeedRow>(&data_sql) .bind(page.limit()) .bind(page.offset()) + .bind(viewer_uuid) .fetch_all(&self.pool) .await .into_domain()?; @@ -361,6 +365,7 @@ impl FeedRepository for PgFeedRepository { .bind(query) .bind(page.limit()) .bind(page.offset()) + .bind(viewer_uuid) .fetch_all(&self.pool) .await .into_domain()?; @@ -386,6 +391,7 @@ impl FeedRepository for PgFeedRepository { .bind(tag_name) .bind(page.limit()) .bind(page.offset()) + .bind(viewer_uuid) .fetch_all(&self.pool) .await .into_domain()?; @@ -402,7 +408,6 @@ impl FeedRepository for PgFeedRepository { FeedScope::User { user_id } => { let uid = user_id.as_uuid(); - let viewer_uuid = viewer.unwrap_or(uuid::Uuid::nil()); let (count_sql, data_sql) = builder.user(); let total: i64 = sqlx::query_scalar(&count_sql) .bind(uid) diff --git a/crates/adapters/postgres/src/remote_actor.rs b/crates/adapters/postgres/src/remote_actor.rs index 6f4aecf..3987b52 100644 --- a/crates/adapters/postgres/src/remote_actor.rs +++ b/crates/adapters/postgres/src/remote_actor.rs @@ -18,14 +18,44 @@ impl PgRemoteActorRepository { #[async_trait] impl RemoteActorRepository for PgRemoteActorRepository { async fn upsert(&self, a: &RemoteActor) -> Result<(), DomainError> { + let also_known_as: Option> = if a.also_known_as.is_empty() { + None + } else { + Some(a.also_known_as.iter().map(|s| s.as_str()).collect()) + }; + let attachment_json: serde_json::Value = a + .attachment + .iter() + .map(|(n, v)| serde_json::json!({"name": n, "value": v})) + .collect(); sqlx::query( - "INSERT INTO remote_actors(url,handle,display_name,avatar_url,last_fetched_at) - VALUES($1,$2,$3,$4,$5) - ON CONFLICT(url) DO UPDATE SET handle=EXCLUDED.handle,display_name=EXCLUDED.display_name, - avatar_url=EXCLUDED.avatar_url,last_fetched_at=EXCLUDED.last_fetched_at" + "INSERT INTO remote_actors(url,handle,display_name,avatar_url,last_fetched_at, + bio,banner_url,outbox_url,followers_url,following_url,also_known_as,attachment) + VALUES($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12) + ON CONFLICT(url) DO UPDATE SET + handle=EXCLUDED.handle,display_name=EXCLUDED.display_name, + avatar_url=EXCLUDED.avatar_url,last_fetched_at=EXCLUDED.last_fetched_at, + bio=EXCLUDED.bio,banner_url=EXCLUDED.banner_url, + outbox_url=EXCLUDED.outbox_url,followers_url=EXCLUDED.followers_url, + following_url=EXCLUDED.following_url,also_known_as=EXCLUDED.also_known_as, + attachment=EXCLUDED.attachment", ) - .bind(&a.url).bind(&a.handle).bind(&a.display_name).bind(&a.avatar_url).bind(a.last_fetched_at) - .execute(&self.pool).await.into_domain().map(|_| ()) + .bind(&a.url) + .bind(&a.handle) + .bind(&a.display_name) + .bind(&a.avatar_url) + .bind(a.last_fetched_at) + .bind(&a.bio) + .bind(&a.banner_url) + .bind(&a.outbox_url) + .bind(&a.followers_url) + .bind(&a.following_url) + .bind(also_known_as.as_deref()) + .bind(&attachment_json) + .execute(&self.pool) + .await + .into_domain() + .map(|_| ()) } async fn find_by_url(&self, url: &str) -> Result, DomainError> { @@ -36,24 +66,55 @@ impl RemoteActorRepository for PgRemoteActorRepository { display_name: Option, avatar_url: Option, last_fetched_at: DateTime, + bio: Option, + banner_url: Option, + outbox_url: Option, + followers_url: Option, + following_url: Option, + also_known_as: Option>, + inbox_url: Option, + shared_inbox_url: Option, + attachment: Option, } sqlx::query_as::<_, Row>( - "SELECT url,handle,display_name,avatar_url,last_fetched_at FROM remote_actors WHERE url=$1" - ).bind(url).fetch_optional(&self.pool).await + "SELECT url,handle,display_name,avatar_url,last_fetched_at, + bio,banner_url,outbox_url,followers_url,following_url,also_known_as, + inbox_url,shared_inbox_url,attachment + FROM remote_actors WHERE url=$1", + ) + .bind(url) + .fetch_optional(&self.pool) + .await .into_domain() - .map(|o| o.map(|r| RemoteActor { - url: r.url, - handle: r.handle, - display_name: r.display_name, - avatar_url: r.avatar_url, - last_fetched_at: r.last_fetched_at, - bio: None, - banner_url: None, - also_known_as: None, - outbox_url: None, - followers_url: None, - following_url: None, - attachment: vec![], - })) + .map(|o| { + o.map(|r| RemoteActor { + url: r.url, + handle: r.handle, + display_name: r.display_name, + avatar_url: r.avatar_url, + last_fetched_at: r.last_fetched_at, + bio: r.bio, + banner_url: r.banner_url, + also_known_as: r.also_known_as.unwrap_or_default(), + outbox_url: r.outbox_url, + followers_url: r.followers_url, + following_url: r.following_url, + inbox_url: r.inbox_url, + shared_inbox_url: r.shared_inbox_url, + attachment: r + .attachment + .and_then(|v| v.as_array().cloned()) + .map(|arr| { + arr.into_iter() + .filter_map(|item| { + let name = item.get("name")?.as_str()?.to_string(); + let value = item.get("value")?.as_str()?.to_string(); + Some((name, value)) + }) + .collect() + }) + .unwrap_or_default(), + }) + }) } } diff --git a/crates/api-types/src/responses.rs b/crates/api-types/src/responses.rs index 6d68117..f145c3f 100644 --- a/crates/api-types/src/responses.rs +++ b/crates/api-types/src/responses.rs @@ -114,7 +114,7 @@ pub struct RemoteActorResponse { pub url: String, pub bio: Option, pub banner_url: Option, - pub also_known_as: Option, + pub also_known_as: Vec, pub outbox_url: Option, pub followers_url: Option, pub following_url: Option, diff --git a/crates/application/src/testing.rs b/crates/application/src/testing.rs index 6354ea4..83a42f4 100644 --- a/crates/application/src/testing.rs +++ b/crates/application/src/testing.rs @@ -136,4 +136,7 @@ impl ActivityPubRepository for TestApRepo { ) -> Result, DomainError> { Ok(self.actor_ap_urls.lock().unwrap().get(user_id).cloned()) } + async fn sync_remote_actor_to_user(&self, _actor_ap_url: &str) -> Result<(), DomainError> { + Ok(()) + } } diff --git a/crates/application/src/use_cases/federation_management/mod.rs b/crates/application/src/use_cases/federation_management/mod.rs index b75ae66..7a5f963 100644 --- a/crates/application/src/use_cases/federation_management/mod.rs +++ b/crates/application/src/use_cases/federation_management/mod.rs @@ -44,16 +44,14 @@ pub async fn accept_follow_request( user_id: &UserId, actor_url: &str, ) -> Result<(), DomainError> { - federation - .mark_follower_accepted(user_id, actor_url) - .await?; events .publish(&DomainEvent::RemoteFollowAccepted { local_user_id: user_id.clone(), remote_actor_url: actor_url.to_string(), }) .await - .map_err(|e| DomainError::Internal(e.to_string())) + .map_err(|e| DomainError::Internal(e.to_string()))?; + federation.mark_follower_accepted(user_id, actor_url).await } pub async fn reject_follow_request( @@ -62,16 +60,14 @@ pub async fn reject_follow_request( user_id: &UserId, actor_url: &str, ) -> Result<(), DomainError> { - federation - .mark_follower_rejected(user_id, actor_url) - .await?; events .publish(&DomainEvent::RemoteFollowRejected { local_user_id: user_id.clone(), remote_actor_url: actor_url.to_string(), }) .await - .map_err(|e| DomainError::Internal(e.to_string())) + .map_err(|e| DomainError::Internal(e.to_string()))?; + federation.mark_follower_rejected(user_id, actor_url).await } pub async fn list_remote_followers( @@ -179,8 +175,9 @@ pub async fn get_actor_connections_page( } }; if stale { + // Always fetch from page 1 — the full collection is fetched and chunked. let _ = scheduler - .schedule_connections_fetch(&actor.url, &collection_url, connection_type, page) + .schedule_connections_fetch(&actor.url, &collection_url, connection_type, 1) .await; } let has_more = items.len() >= PAGE_SIZE; diff --git a/crates/application/src/use_cases/federation_management/tests.rs b/crates/application/src/use_cases/federation_management/tests.rs index 76c8207..ff55763 100644 --- a/crates/application/src/use_cases/federation_management/tests.rs +++ b/crates/application/src/use_cases/federation_management/tests.rs @@ -11,10 +11,12 @@ fn remote_actor(url: &str, handle: &str) -> RemoteActor { avatar_url: None, bio: None, banner_url: None, - also_known_as: None, + also_known_as: vec![], outbox_url: None, followers_url: None, following_url: None, + inbox_url: None, + shared_inbox_url: None, attachment: vec![], last_fetched_at: Utc::now(), } diff --git a/crates/application/src/use_cases/social/mod.rs b/crates/application/src/use_cases/social/mod.rs index fd6b31e..cf05706 100644 --- a/crates/application/src/use_cases/social/mod.rs +++ b/crates/application/src/use_cases/social/mod.rs @@ -8,8 +8,8 @@ use domain::{ user::User, }, ports::{ - BlockRepository, BoostRepository, EventPublisher, FederationFollowPort, FollowRepository, - LikeRepository, UserReader, + BlockRepository, BoostRepository, EventPublisher, FederationBlockPort, + FederationFollowPort, FollowRepository, LikeRepository, UserReader, }, value_objects::{BoostId, LikeId, ThoughtId, UserId, Username}, }; @@ -217,10 +217,14 @@ pub async fn reject_follow( pub async fn block_by_username( blocks: &dyn BlockRepository, users: &dyn UserReader, + federation: &dyn FederationBlockPort, events: &dyn EventPublisher, blocker_id: &UserId, username: &str, ) -> Result<(), DomainError> { + if username.contains('@') { + return federation.block_remote(blocker_id, username).await; + } let uname = Username::new(username).map_err(|_| DomainError::NotFound)?; let target = users .find_by_username(&uname) @@ -232,10 +236,14 @@ pub async fn block_by_username( pub async fn unblock_by_username( blocks: &dyn BlockRepository, users: &dyn UserReader, + federation: &dyn FederationBlockPort, events: &dyn EventPublisher, blocker_id: &UserId, username: &str, ) -> Result<(), DomainError> { + if username.contains('@') { + return federation.unblock_remote(blocker_id, username).await; + } let uname = Username::new(username).map_err(|_| DomainError::NotFound)?; let target = users .find_by_username(&uname) diff --git a/crates/bootstrap/src/factory.rs b/crates/bootstrap/src/factory.rs index 0b9899f..1291679 100644 --- a/crates/bootstrap/src/factory.rs +++ b/crates/bootstrap/src/factory.rs @@ -126,11 +126,17 @@ pub async fn build(cfg: &Config) -> Infrastructure { // 3. ActivityPub federation let connections_repo = Arc::new(PgRemoteActorConnectionRepository::new(pool.clone())); let fed_repo = Arc::new(PostgresFederationRepository::new(pool.clone())); + let likes: Arc = + Arc::new(postgres::like::PgLikeRepository::new(pool.clone())); + let boosts: Arc = + Arc::new(postgres::boost::PgBoostRepository::new(pool.clone())); let ap_handler = Arc::new(ThoughtsObjectHandler::new( Arc::new(PgActivityPubRepository::new(pool.clone())), &cfg.base_url, Some(event_publisher.clone()), Arc::new(postgres::tag::PgTagRepository::new(pool.clone())), + likes.clone(), + boosts.clone(), )); let mut ap_builder = ActivityPubService::builder(cfg.base_url.clone()) .activity_repo(fed_repo.clone()) @@ -181,8 +187,8 @@ pub async fn build(cfg: &Config) -> Infrastructure { let state = AppState { users: Arc::new(postgres::user::PgUserRepository::new(pool.clone())), thoughts: Arc::new(postgres::thought::PgThoughtRepository::new(pool.clone())), - likes: Arc::new(postgres::like::PgLikeRepository::new(pool.clone())), - boosts: Arc::new(postgres::boost::PgBoostRepository::new(pool.clone())), + likes: likes.clone(), + boosts: boosts.clone(), follows: Arc::new(postgres::follow::PgFollowRepository::new(pool.clone())), blocks: Arc::new(postgres::block::PgBlockRepository::new(pool.clone())), tags: Arc::new(postgres::tag::PgTagRepository::new(pool.clone())), diff --git a/crates/bootstrap/src/main.rs b/crates/bootstrap/src/main.rs index 1b6f9b3..efea3db 100644 --- a/crates/bootstrap/src/main.rs +++ b/crates/bootstrap/src/main.rs @@ -35,8 +35,13 @@ async fn main() { .allow_headers(tower_http::cors::Any) }; + let ap_router = infra + .ap_service + .router::() + .layer(axum::extract::DefaultBodyLimit::max(256 * 1024)); + let base = presentation::routes::router() - .merge(infra.ap_service.router::()) + .merge(ap_router) .with_state(infra.state) .layer(cors); diff --git a/crates/domain/src/models/remote_actor.rs b/crates/domain/src/models/remote_actor.rs index 97ab795..c78ff0a 100644 --- a/crates/domain/src/models/remote_actor.rs +++ b/crates/domain/src/models/remote_actor.rs @@ -8,10 +8,12 @@ pub struct RemoteActor { pub avatar_url: Option, pub bio: Option, pub banner_url: Option, - pub also_known_as: Option, + pub also_known_as: Vec, pub outbox_url: Option, pub followers_url: Option, pub following_url: Option, + pub inbox_url: Option, + pub shared_inbox_url: Option, pub attachment: Vec<(String, String)>, pub last_fetched_at: DateTime, } diff --git a/crates/domain/src/ports.rs b/crates/domain/src/ports.rs index fb24157..ac10335 100644 --- a/crates/domain/src/ports.rs +++ b/crates/domain/src/ports.rs @@ -360,15 +360,27 @@ pub trait FederationFetchPort: Send + Sync { ) -> Vec; } +#[async_trait] +pub trait FederationBlockPort: Send + Sync { + async fn block_remote(&self, local_user_id: &UserId, handle: &str) -> Result<(), DomainError>; + async fn unblock_remote(&self, local_user_id: &UserId, handle: &str) + -> Result<(), DomainError>; +} + pub trait FederationActionPort: - FederationLookupPort + FederationFollowPort + FederationFollowRequestPort + FederationFetchPort + FederationLookupPort + + FederationFollowPort + + FederationFollowRequestPort + + FederationFetchPort + + FederationBlockPort { } impl< T: FederationLookupPort + FederationFollowPort + FederationFollowRequestPort - + FederationFetchPort, + + FederationFetchPort + + FederationBlockPort, > FederationActionPort for T { } diff --git a/crates/domain/src/testing/mod.rs b/crates/domain/src/testing/mod.rs index 30e0a69..6a38312 100644 --- a/crates/domain/src/testing/mod.rs +++ b/crates/domain/src/testing/mod.rs @@ -846,6 +846,24 @@ impl FederationFetchPort for TestStore { } } +#[async_trait] +impl FederationBlockPort for TestStore { + async fn block_remote( + &self, + _local_user_id: &UserId, + _handle: &str, + ) -> Result<(), DomainError> { + Ok(()) + } + async fn unblock_remote( + &self, + _local_user_id: &UserId, + _handle: &str, + ) -> Result<(), DomainError> { + Ok(()) + } +} + #[async_trait] impl RemoteActorConnectionRepository for TestStore { async fn upsert_connections( diff --git a/crates/presentation/src/handlers/mod.rs b/crates/presentation/src/handlers/mod.rs index 811e515..12e6ebd 100644 --- a/crates/presentation/src/handlers/mod.rs +++ b/crates/presentation/src/handlers/mod.rs @@ -9,3 +9,4 @@ pub mod notifications; pub mod social; pub mod thoughts; pub mod users; +pub mod well_known; diff --git a/crates/presentation/src/handlers/social/mod.rs b/crates/presentation/src/handlers/social/mod.rs index 79a6ff2..92c746b 100644 --- a/crates/presentation/src/handlers/social/mod.rs +++ b/crates/presentation/src/handlers/social/mod.rs @@ -119,7 +119,15 @@ pub async fn post_block( AuthUser(uid): AuthUser, Path(username): Path, ) -> Result { - block_by_username(&*d.blocks, &*d.users, &*d.events, &uid, &username).await?; + block_by_username( + &*d.blocks, + &*d.users, + &*d.federation, + &*d.events, + &uid, + &username, + ) + .await?; Ok(StatusCode::NO_CONTENT) } #[utoipa::path(delete, path = "/users/{username}/block", params(("username" = String, Path, description = "Username")), responses((status = 204, description = "Unblocked")), security(("bearer_auth" = [])))] @@ -128,7 +136,15 @@ pub async fn delete_block( AuthUser(uid): AuthUser, Path(username): Path, ) -> Result { - unblock_by_username(&*d.blocks, &*d.users, &*d.events, &uid, &username).await?; + unblock_by_username( + &*d.blocks, + &*d.users, + &*d.federation, + &*d.events, + &uid, + &username, + ) + .await?; Ok(StatusCode::NO_CONTENT) } #[utoipa::path(put, path = "/users/me/top-friends", request_body = SetTopFriendsRequest, responses((status = 204, description = "Top friends updated")), security(("bearer_auth" = [])))] diff --git a/crates/presentation/src/handlers/well_known.rs b/crates/presentation/src/handlers/well_known.rs new file mode 100644 index 0000000..d445ea8 --- /dev/null +++ b/crates/presentation/src/handlers/well_known.rs @@ -0,0 +1,20 @@ +use axum::{extract::State, http::header, response::IntoResponse}; + +use crate::state::AppState; + +pub async fn host_meta(State(state): State) -> impl IntoResponse { + let domain = url::Url::parse(&state.base_url) + .ok() + .and_then(|u| u.host_str().map(|s| s.to_string())) + .unwrap_or_default(); + let body = format!( + r#" + + +"# + ); + ( + [(header::CONTENT_TYPE, "application/xrd+xml; charset=utf-8")], + body, + ) +} diff --git a/crates/presentation/src/routes.rs b/crates/presentation/src/routes.rs index b0e0289..4490402 100644 --- a/crates/presentation/src/routes.rs +++ b/crates/presentation/src/routes.rs @@ -135,5 +135,7 @@ pub fn router() -> Router { ) .route("/api-keys/{id}", delete(api_keys::delete_api_key_handler)); - openapi::serve(api_routes).route("/media/{*path}", get(media::get_media)) + openapi::serve(api_routes) + .route("/media/{*path}", get(media::get_media)) + .route("/.well-known/host-meta", get(well_known::host_meta)) } diff --git a/crates/presentation/src/testing.rs b/crates/presentation/src/testing.rs index d2ec459..26080d7 100644 --- a/crates/presentation/src/testing.rs +++ b/crates/presentation/src/testing.rs @@ -97,6 +97,9 @@ impl ActivityPubRepository for NoOpApRepo { ) -> Result, DomainError> { Ok(None) } + async fn sync_remote_actor_to_user(&self, _actor_ap_url: &str) -> Result<(), DomainError> { + Ok(()) + } } pub struct NoOpMediaStore; diff --git a/crates/worker/src/factory.rs b/crates/worker/src/factory.rs index e61e195..0c1f28b 100644 --- a/crates/worker/src/factory.rs +++ b/crates/worker/src/factory.rs @@ -50,6 +50,8 @@ pub async fn build(database_url: &str, base_url: &str, nats_url: &str) -> Worker base_url, None, Arc::new(postgres::tag::PgTagRepository::new(pool.clone())), + Arc::new(postgres::like::PgLikeRepository::new(pool.clone())), + Arc::new(postgres::boost::PgBoostRepository::new(pool.clone())), )); let raw_ap_service = Arc::new( ActivityPubService::builder(base_url.to_string()) diff --git a/crates/worker/src/main.rs b/crates/worker/src/main.rs index ae14274..a67afe9 100644 --- a/crates/worker/src/main.rs +++ b/crates/worker/src/main.rs @@ -1,6 +1,7 @@ mod dlq; mod factory; mod handlers; +mod outbox_cleanup; mod outbox_relay; use domain::{errors::DomainError, events::DomainEvent}; @@ -39,6 +40,15 @@ async fn main() { .run(), ); + tokio::spawn( + outbox_cleanup::OutboxCleanup { + pool: infra.pool.clone(), + retention_days: 7, + interval: std::time::Duration::from_secs(3600), + } + .run(), + ); + tracing::info!("Worker started, consuming events..."); let mut stream = infra.message_source.messages(); while let Some(result) = stream.next().await { diff --git a/crates/worker/src/outbox_cleanup.rs b/crates/worker/src/outbox_cleanup.rs new file mode 100644 index 0000000..47120f3 --- /dev/null +++ b/crates/worker/src/outbox_cleanup.rs @@ -0,0 +1,31 @@ +use sqlx::PgPool; +use std::time::Duration; + +pub struct OutboxCleanup { + pub pool: PgPool, + pub retention_days: i64, + pub interval: Duration, +} + +impl OutboxCleanup { + pub async fn run(self) { + loop { + tokio::time::sleep(self.interval).await; + match self.cleanup().await { + Ok(n) if n > 0 => tracing::info!(deleted = n, "outbox cleanup: removed old events"), + Err(e) => tracing::warn!(error = %e, "outbox cleanup failed"), + _ => {} + } + } + } + + async fn cleanup(&self) -> Result { + let result = sqlx::query( + "DELETE FROM outbox_events WHERE delivered = true AND delivered_at < NOW() - make_interval(days => $1)", + ) + .bind(self.retention_days as i32) + .execute(&self.pool) + .await?; + Ok(result.rows_affected()) + } +} diff --git a/thoughts-frontend/components/remote-user-profile/profile-card.tsx b/thoughts-frontend/components/remote-user-profile/profile-card.tsx index d573211..1a394d3 100644 --- a/thoughts-frontend/components/remote-user-profile/profile-card.tsx +++ b/thoughts-frontend/components/remote-user-profile/profile-card.tsx @@ -58,17 +58,22 @@ export function ProfileCard({ actor, action }: ProfileCardProps) { - {actor.alsoKnownAs && ( + {actor.alsoKnownAs.length > 0 && (

Also known as:{" "} - - {actor.alsoKnownAs} - + {actor.alsoKnownAs.map((aka, i) => ( + + {i > 0 && ", "} + + {aka} + + + ))}

)} diff --git a/thoughts-frontend/lib/api.ts b/thoughts-frontend/lib/api.ts index cbacdbf..d876147 100644 --- a/thoughts-frontend/lib/api.ts +++ b/thoughts-frontend/lib/api.ts @@ -29,7 +29,7 @@ export const RemoteActorSchema = z.object({ url: z.string(), bio: z.string().nullable(), bannerUrl: z.string().nullable(), - alsoKnownAs: z.string().nullable(), + alsoKnownAs: z.array(z.string()), outboxUrl: z.string().nullable(), followersUrl: z.string().nullable(), followingUrl: z.string().nullable(),