Compare commits

..

17 Commits

Author SHA1 Message Date
fd9e526b81 fix(search): use to_thought_response in search handler — was returning snake_case partial data
Some checks failed
lint / lint (push) Has been cancelled
test / unit (push) Has been cancelled
test / integration (push) Has been cancelled
lint / lint (pull_request) Failing after 9m24s
test / unit (pull_request) Successful in 16m42s
test / integration (pull_request) Failing after 17m1s
2026-05-15 15:44:59 +02:00
6dd4e3366c fix(search): viewer-aware SQL in search_thoughts — ViewerContext now real instead of hardcoded false 2026-05-15 15:24:55 +02:00
48b57abf92 refactor(adapters): update FeedEntry construction to use EngagementStats + ViewerContext 2026-05-15 15:21:48 +02:00
686dc1c91a refactor(domain): FeedEntry — EngagementStats + Option<ViewerContext> sub-structs 2026-05-15 15:20:07 +02:00
693f53b4b6 docs: FeedEntry decoupling implementation plan 2026-05-15 15:18:32 +02:00
a245b7b8b9 docs: FeedEntry decoupling design spec 2026-05-15 15:15:33 +02:00
988f5c75aa fix(tests): use distinct usernames in notification tests 2026-05-15 14:11:58 +02:00
3f6b91c943 refactor(ports): ActivityPubRepository takes &str instead of url::Url — infra type stays in adapter 2026-05-15 14:06:33 +02:00
c76894e527 refactor(domain): remove AP delivery fields (inbox_url, public_key) from domain RemoteActor 2026-05-15 14:02:53 +02:00
1a77e15d70 refactor(application): remove pass-through search use cases — call SearchPort directly 2026-05-15 13:59:30 +02:00
f697267828 fix(domain): from_db_str returns Result — unknown DB values are errors not silent defaults 2026-05-15 13:57:38 +02:00
5a64dd361c refactor(domain): algebraic NotificationKind — invalid states now unrepresentable 2026-05-15 13:53:53 +02:00
189901b778 refactor(ports): CQRS split — FederationActionPort into four focused sub-ports 2026-05-15 13:49:58 +02:00
8ed7f3d5bc refactor(ports): CQRS split — UserRepository = UserReader + UserWriter supertrait 2026-05-15 13:43:43 +02:00
a902154777 refactor(domain): remove FetchRemoteActorPosts/FetchActorConnections from DomainEvent; add FederationSchedulerPort 2026-05-15 13:28:19 +02:00
e935c8973e refactor(domain): remove ap_id/inbox_url from User and Thought; use ActivityPubRepository lookups 2026-05-15 13:21:21 +02:00
bf3e336d0f feat(ports): add get_thought_ap_id and get_actor_ap_urls to ActivityPubRepository 2026-05-15 13:09:37 +02:00
46 changed files with 2812 additions and 908 deletions

View File

@@ -74,6 +74,7 @@ fn thought_note_json(
thought: &domain::models::thought::Thought,
local_actor: &crate::actors::DbActor,
base_url: &str,
in_reply_to_url: Option<&str>,
) -> anyhow::Result<(url::Url, serde_json::Value)> {
let ap_id = url::Url::parse(&format!("{}/thoughts/{}", base_url, thought.id))?;
@@ -107,7 +108,7 @@ fn thought_note_json(
if let Some(ref cw) = thought.content_warning {
note["summary"] = serde_json::json!(cw);
}
if let Some(ref reply_url) = thought.in_reply_to_url {
if let Some(reply_url) = in_reply_to_url {
note["inReplyTo"] = serde_json::json!(reply_url);
}
if let Some(updated_at) = thought.updated_at {
@@ -1301,11 +1302,8 @@ impl ActivityPubService {
url: a.url,
handle: a.handle,
display_name: a.display_name,
inbox_url: a.inbox_url,
shared_inbox_url: a.shared_inbox_url,
avatar_url: a.avatar_url,
outbox_url: a.outbox_url,
public_key: String::new(),
last_fetched_at: chrono::Utc::now(),
bio: None,
banner_url: None,
@@ -1415,6 +1413,7 @@ impl domain::ports::OutboundFederationPort for ActivityPubService {
author_user_id: &domain::value_objects::UserId,
thought: &domain::models::thought::Thought,
_author_username: &str,
in_reply_to_url: Option<&str>,
) -> Result<(), domain::errors::DomainError> {
let user_uuid = author_user_id.as_uuid();
let data = self.federation_config.to_request_data();
@@ -1426,8 +1425,9 @@ impl domain::ports::OutboundFederationPort for ActivityPubService {
return Ok(());
};
let (ap_id, note) = thought_note_json(thought, &local_actor, &self.base_url)
.map_err(|e| domain::errors::DomainError::Internal(e.to_string()))?;
let (ap_id, note) =
thought_note_json(thought, &local_actor, &self.base_url, in_reply_to_url)
.map_err(|e| domain::errors::DomainError::Internal(e.to_string()))?;
let create = crate::activities::CreateActivity {
id: ap_id,
@@ -1476,6 +1476,7 @@ impl domain::ports::OutboundFederationPort for ActivityPubService {
author_user_id: &domain::value_objects::UserId,
thought: &domain::models::thought::Thought,
_author_username: &str,
in_reply_to_url: Option<&str>,
) -> Result<(), domain::errors::DomainError> {
let user_uuid = author_user_id.as_uuid();
let data = self.federation_config.to_request_data();
@@ -1487,8 +1488,9 @@ impl domain::ports::OutboundFederationPort for ActivityPubService {
return Ok(());
};
let (_ap_id, note) = thought_note_json(thought, &local_actor, &self.base_url)
.map_err(|e| domain::errors::DomainError::Internal(e.to_string()))?;
let (_ap_id, note) =
thought_note_json(thought, &local_actor, &self.base_url, in_reply_to_url)
.map_err(|e| domain::errors::DomainError::Internal(e.to_string()))?;
let update_id = url::Url::parse(&format!(
"{}/activities/update/{}",
@@ -1588,7 +1590,40 @@ impl domain::ports::OutboundFederationPort for ActivityPubService {
}
#[async_trait::async_trait]
impl domain::ports::FederationActionPort for ActivityPubService {
impl domain::ports::FederationSchedulerPort for ActivityPubService {
async fn schedule_actor_posts_fetch(
&self,
actor_ap_url: &str,
outbox_url: &str,
) -> Result<(), domain::errors::DomainError> {
tracing::debug!(
actor = actor_ap_url,
outbox = outbox_url,
"schedule_actor_posts_fetch: deferred"
);
Ok(())
}
async fn schedule_connections_fetch(
&self,
actor_ap_url: &str,
collection_url: &str,
connection_type: &str,
page: u32,
) -> Result<(), domain::errors::DomainError> {
tracing::debug!(
actor = actor_ap_url,
collection = collection_url,
connection_type,
page,
"schedule_connections_fetch: deferred"
);
Ok(())
}
}
#[async_trait::async_trait]
impl domain::ports::FederationLookupPort for ActivityPubService {
async fn lookup_actor(
&self,
handle: &str,
@@ -1646,9 +1681,6 @@ impl domain::ports::FederationActionPort for ActivityPubService {
url: actor.ap_id.to_string(),
handle: full_handle,
display_name: Some(actor.username.clone()),
inbox_url: actor.inbox_url.to_string(),
shared_inbox_url: actor.shared_inbox_url.as_ref().map(|u| u.to_string()),
public_key: actor.public_key_pem.clone(),
avatar_url: actor.avatar_url.as_ref().map(|u| u.to_string()),
last_fetched_at: actor.last_refreshed_at,
bio: actor.bio.clone(),
@@ -1665,31 +1697,6 @@ impl domain::ports::FederationActionPort for ActivityPubService {
})
}
async fn follow_remote(
&self,
local_user_id: &domain::value_objects::UserId,
handle: &str,
) -> Result<(), domain::errors::DomainError> {
self.follow(local_user_id.as_uuid(), handle)
.await
.map_err(|e| domain::errors::DomainError::ExternalService(e.to_string()))
}
async fn unfollow_remote(
&self,
local_user_id: &domain::value_objects::UserId,
handle: &str,
) -> Result<(), domain::errors::DomainError> {
let data = self.federation_config.to_request_data();
let remote_actor: DbActor = Self::webfinger_https(handle, &data)
.await
.map_err(|e| domain::errors::DomainError::ExternalService(e.to_string()))?;
let actor_url = remote_actor.ap_id.to_string();
self.unfollow(local_user_id.as_uuid(), &actor_url)
.await
.map_err(|e| domain::errors::DomainError::ExternalService(e.to_string()))
}
async fn actor_json(
&self,
user_id: &domain::value_objects::UserId,
@@ -1794,7 +1801,10 @@ impl domain::ports::FederationActionPort for ActivityPubService {
serde_json::to_string(&obj)
.map_err(|e| domain::errors::DomainError::ExternalService(e.to_string()))
}
}
#[async_trait::async_trait]
impl domain::ports::FederationFetchPort for ActivityPubService {
async fn fetch_outbox_page(
&self,
outbox_url: &str,
@@ -1988,7 +1998,48 @@ impl domain::ports::FederationActionPort for ActivityPubService {
})
.collect()
}
}
#[async_trait::async_trait]
impl domain::ports::FederationFollowPort for ActivityPubService {
async fn follow_remote(
&self,
local_user_id: &domain::value_objects::UserId,
handle: &str,
) -> Result<(), domain::errors::DomainError> {
self.follow(local_user_id.as_uuid(), handle)
.await
.map_err(|e| domain::errors::DomainError::ExternalService(e.to_string()))
}
async fn unfollow_remote(
&self,
local_user_id: &domain::value_objects::UserId,
handle: &str,
) -> Result<(), domain::errors::DomainError> {
let data = self.federation_config.to_request_data();
let remote_actor: DbActor = Self::webfinger_https(handle, &data)
.await
.map_err(|e| domain::errors::DomainError::ExternalService(e.to_string()))?;
let actor_url = remote_actor.ap_id.to_string();
self.unfollow(local_user_id.as_uuid(), &actor_url)
.await
.map_err(|e| domain::errors::DomainError::ExternalService(e.to_string()))
}
async fn get_remote_following(
&self,
user_id: &domain::value_objects::UserId,
) -> Result<Vec<domain::models::remote_actor::RemoteActor>, domain::errors::DomainError> {
self.get_following(user_id.as_uuid())
.await
.map(|v| v.into_iter().map(Self::adapter_actor_to_domain).collect())
.map_err(|e| domain::errors::DomainError::ExternalService(e.to_string()))
}
}
#[async_trait::async_trait]
impl domain::ports::FederationFollowRequestPort for ActivityPubService {
async fn get_pending_followers(
&self,
user_id: &domain::value_objects::UserId,
@@ -2038,16 +2089,6 @@ impl domain::ports::FederationActionPort for ActivityPubService {
.await
.map_err(|e| domain::errors::DomainError::ExternalService(e.to_string()))
}
async fn get_remote_following(
&self,
user_id: &domain::value_objects::UserId,
) -> Result<Vec<domain::models::remote_actor::RemoteActor>, domain::errors::DomainError> {
self.get_following(user_id.as_uuid())
.await
.map(|v| v.into_iter().map(Self::adapter_actor_to_domain).collect())
.map_err(|e| domain::errors::DomainError::ExternalService(e.to_string()))
}
}
#[cfg(test)]

View File

@@ -1,10 +1,28 @@
fn _assert_impl_federation_action_port()
fn _assert_impl_federation_lookup_port()
where
crate::service::ActivityPubService: domain::ports::FederationActionPort,
crate::service::ActivityPubService: domain::ports::FederationLookupPort,
{
}
fn _assert_impl_federation_action_port_connections()
fn _assert_impl_federation_follow_port()
where
crate::service::ActivityPubService: domain::ports::FederationFollowPort,
{
}
fn _assert_impl_federation_follow_request_port()
where
crate::service::ActivityPubService: domain::ports::FederationFollowRequestPort,
{
}
fn _assert_impl_federation_fetch_port()
where
crate::service::ActivityPubService: domain::ports::FederationFetchPort,
{
}
fn _assert_impl_federation_action_port()
where
crate::service::ActivityPubService: domain::ports::FederationActionPort,
{

View File

@@ -117,7 +117,7 @@ impl ApObjectHandler for ThoughtsObjectHandler {
let note: ThoughtNote = serde_json::from_value(object)?;
let author_id = self
.repo
.intern_remote_actor(actor_url)
.intern_remote_actor(actor_url.as_str())
.await
.map_err(|e| anyhow!("{e}"))?;
@@ -140,14 +140,14 @@ impl ApObjectHandler for ThoughtsObjectHandler {
self.repo
.accept_note(
ap_id,
ap_id.as_str(),
&author_id,
&note.content,
note.published,
note.sensitive,
note.summary,
visibility,
note.in_reply_to.as_ref(),
note.in_reply_to.as_ref().map(|u| u.as_str()),
)
.await
.map_err(|e| anyhow!("{e}"))?;
@@ -198,21 +198,21 @@ impl ApObjectHandler for ThoughtsObjectHandler {
) -> Result<()> {
let note: ThoughtNote = serde_json::from_value(object)?;
self.repo
.apply_note_update(ap_id, &note.content)
.apply_note_update(ap_id.as_str(), &note.content)
.await
.map_err(|e| anyhow!("{e}"))
}
async fn on_delete(&self, ap_id: &Url, _actor_url: &Url) -> Result<()> {
self.repo
.retract_note(ap_id)
.retract_note(ap_id.as_str())
.await
.map_err(|e| anyhow!("{e}"))
}
async fn on_actor_removed(&self, actor_url: &Url) -> Result<()> {
self.repo
.retract_actor_notes(actor_url)
.retract_actor_notes(actor_url.as_str())
.await
.map_err(|e| anyhow!("{e}"))
}
@@ -234,7 +234,7 @@ impl ApObjectHandler for ThoughtsObjectHandler {
let actor_user_id = self
.repo
.find_remote_actor_id(actor_url)
.find_remote_actor_id(actor_url.as_str())
.await
.map_err(|e| anyhow!("{e}"))?;
@@ -278,7 +278,7 @@ impl ApObjectHandler for ThoughtsObjectHandler {
let actor_user_id = self
.repo
.find_remote_actor_id(actor_url)
.find_remote_actor_id(actor_url.as_str())
.await
.map_err(|e| anyhow!("{e}"))?;
@@ -310,7 +310,7 @@ impl ApObjectHandler for ThoughtsObjectHandler {
) -> anyhow::Result<()> {
let author_user_id = match self
.repo
.find_remote_actor_id(actor_url)
.find_remote_actor_id(actor_url.as_str())
.await
.map_err(|e| anyhow!("{e}"))?
{
@@ -356,7 +356,7 @@ impl ApObjectHandler for ThoughtsObjectHandler {
let actor_user_id = self
.repo
.find_remote_actor_id(actor_url)
.find_remote_actor_id(actor_url.as_str())
.await
.map_err(|e| anyhow!("{e}"))?;

View File

@@ -71,16 +71,6 @@ pub enum EventPayload {
ProfileUpdated {
user_id: String,
},
FetchRemoteActorPosts {
actor_ap_url: String,
outbox_url: String,
},
FetchActorConnections {
actor_ap_url: String,
collection_url: String,
connection_type: String,
page: u32,
},
MentionReceived {
thought_id: String,
mentioned_user_id: String,
@@ -107,8 +97,6 @@ impl EventPayload {
Self::UserUnblocked { .. } => "users.unblocked",
Self::UserRegistered { .. } => "users.registered",
Self::ProfileUpdated { .. } => "users.profile_updated",
Self::FetchRemoteActorPosts { .. } => "federation.fetch_outbox",
Self::FetchActorConnections { .. } => "federation.fetch_connections",
Self::MentionReceived { .. } => "mentions.received",
}
}
@@ -222,24 +210,6 @@ impl From<&DomainEvent> for EventPayload {
DomainEvent::ProfileUpdated { user_id } => Self::ProfileUpdated {
user_id: user_id.to_string(),
},
DomainEvent::FetchRemoteActorPosts {
actor_ap_url,
outbox_url,
} => Self::FetchRemoteActorPosts {
actor_ap_url: actor_ap_url.clone(),
outbox_url: outbox_url.clone(),
},
DomainEvent::FetchActorConnections {
actor_ap_url,
collection_url,
connection_type,
page,
} => Self::FetchActorConnections {
actor_ap_url: actor_ap_url.clone(),
collection_url: collection_url.clone(),
connection_type: connection_type.clone(),
page: *page,
},
DomainEvent::MentionReceived {
thought_id,
mentioned_user_id,
@@ -370,24 +340,6 @@ impl TryFrom<EventPayload> for DomainEvent {
EventPayload::ProfileUpdated { user_id } => DomainEvent::ProfileUpdated {
user_id: UserId::from_uuid(parse_uuid(&user_id, "user_id")?),
},
EventPayload::FetchRemoteActorPosts {
actor_ap_url,
outbox_url,
} => DomainEvent::FetchRemoteActorPosts {
actor_ap_url,
outbox_url,
},
EventPayload::FetchActorConnections {
actor_ap_url,
collection_url,
connection_type,
page,
} => DomainEvent::FetchActorConnections {
actor_ap_url,
collection_url,
connection_type,
page,
},
EventPayload::MentionReceived {
thought_id,
mentioned_user_id,
@@ -481,16 +433,6 @@ mod tests {
EventPayload::UserRegistered {
user_id: "a".into(),
},
EventPayload::FetchRemoteActorPosts {
actor_ap_url: "https://mastodon.social/users/alice".into(),
outbox_url: "https://mastodon.social/users/alice/outbox".into(),
},
EventPayload::FetchActorConnections {
actor_ap_url: "https://mastodon.social/users/alice".into(),
collection_url: "https://mastodon.social/users/alice/followers".into(),
connection_type: "followers".into(),
page: 1,
},
];
let mut subjects: Vec<&str> = samples.iter().map(|p| p.subject()).collect();
subjects.sort();

View File

@@ -29,8 +29,6 @@ struct FeedRow {
t_user_id: uuid::Uuid,
content: String,
in_reply_to_id: Option<uuid::Uuid>,
in_reply_to_url: Option<String>,
t_ap_id: Option<String>,
visibility: String,
content_warning: Option<String>,
sensitive: bool,
@@ -47,39 +45,48 @@ struct FeedRow {
header_url: Option<String>,
custom_css: Option<String>,
author_local: bool,
u_ap_id: Option<String>,
inbox_url: Option<String>,
author_created_at: DateTime<Utc>,
author_updated_at: DateTime<Utc>,
like_count: i64,
boost_count: i64,
reply_count: i64,
liked_by_viewer: bool,
boosted_by_viewer: bool,
}
const FEED_SELECT: &str = "
SELECT
t.id AS thought_id, t.user_id AS t_user_id, t.content,
t.in_reply_to_id, t.in_reply_to_url, t.ap_id AS t_ap_id,
t.visibility, t.content_warning, t.sensitive, t.local AS t_local,
t.created_at AS thought_created_at, t.updated_at,
u.id AS author_id, u.username, u.email, u.password_hash,
u.display_name, u.bio, u.avatar_url, u.header_url, u.custom_css,
u.local AS author_local, u.ap_id AS u_ap_id, u.inbox_url,
u.created_at AS author_created_at, u.updated_at AS author_updated_at,
(SELECT COUNT(*) FROM likes l WHERE l.thought_id=t.id) AS like_count,
(SELECT COUNT(*) FROM boosts b WHERE b.thought_id=t.id) AS boost_count,
(SELECT COUNT(*) FROM thoughts r WHERE r.in_reply_to_id=t.id) AS reply_count
FROM thoughts t JOIN users u ON u.id=t.user_id";
fn feed_select(viewer: Option<uuid::Uuid>) -> String {
let viewer_checks = match viewer {
Some(uid) => format!(
"EXISTS(SELECT 1 FROM likes WHERE user_id='{uid}' AND thought_id=t.id) AS liked_by_viewer,\n\
EXISTS(SELECT 1 FROM boosts WHERE user_id='{uid}' AND thought_id=t.id) AS boosted_by_viewer"
),
None => "false AS liked_by_viewer, false AS boosted_by_viewer".to_string(),
};
format!(
"\n SELECT\n\
t.id AS thought_id, t.user_id AS t_user_id, t.content,\n\
t.in_reply_to_id,\n\
t.visibility, t.content_warning, t.sensitive, t.local AS t_local,\n\
t.created_at AS thought_created_at, t.updated_at,\n\
u.id AS author_id, u.username, u.email, u.password_hash,\n\
u.display_name, u.bio, u.avatar_url, u.header_url, u.custom_css,\n\
u.local AS author_local,\n\
u.created_at AS author_created_at, u.updated_at AS author_updated_at,\n\
(SELECT COUNT(*) FROM likes l WHERE l.thought_id=t.id) AS like_count,\n\
(SELECT COUNT(*) FROM boosts b WHERE b.thought_id=t.id) AS boost_count,\n\
(SELECT COUNT(*) FROM thoughts r WHERE r.in_reply_to_id=t.id) AS reply_count,\n\
{viewer_checks}\n\
FROM thoughts t JOIN users u ON u.id=t.user_id"
)
}
fn row_to_entry(r: FeedRow) -> FeedEntry {
fn row_to_entry(r: FeedRow, viewer: Option<uuid::Uuid>) -> Result<FeedEntry, DomainError> {
let thought = Thought {
id: ThoughtId::from_uuid(r.thought_id),
user_id: UserId::from_uuid(r.t_user_id),
content: Content::new_remote(r.content),
in_reply_to_id: r.in_reply_to_id.map(ThoughtId::from_uuid),
in_reply_to_url: r.in_reply_to_url,
ap_id: r.t_ap_id,
visibility: Visibility::from_db_str(&r.visibility),
visibility: Visibility::from_db_str(&r.visibility)?,
content_warning: r.content_warning,
sensitive: r.sensitive,
local: r.t_local,
@@ -97,20 +104,22 @@ fn row_to_entry(r: FeedRow) -> FeedEntry {
header_url: r.header_url,
custom_css: r.custom_css,
local: r.author_local,
ap_id: r.u_ap_id,
inbox_url: r.inbox_url,
created_at: r.author_created_at,
updated_at: r.author_updated_at,
};
FeedEntry {
Ok(FeedEntry {
thought,
author,
like_count: r.like_count,
boost_count: r.boost_count,
reply_count: r.reply_count,
liked_by_viewer: false,
boosted_by_viewer: false,
}
stats: domain::models::feed::EngagementStats {
like_count: r.like_count,
boost_count: r.boost_count,
reply_count: r.reply_count,
},
viewer: viewer.map(|_| domain::models::feed::ViewerContext {
liked: r.liked_by_viewer,
boosted: r.boosted_by_viewer,
}),
})
}
#[async_trait]
@@ -119,8 +128,11 @@ impl SearchPort for PgSearchRepository {
&self,
query: &str,
page: &PageParams,
_viewer_id: Option<&UserId>,
viewer_id: Option<&UserId>,
) -> Result<Paginated<FeedEntry>, DomainError> {
let viewer = viewer_id.map(|v| v.as_uuid());
let select = feed_select(viewer);
let total: i64 = sqlx::query_scalar(
"SELECT COUNT(*) FROM thoughts t
WHERE t.content % $1 AND t.visibility='public'",
@@ -131,7 +143,7 @@ impl SearchPort for PgSearchRepository {
.map_err(|e| DomainError::Internal(e.to_string()))?;
let sql = format!(
"{FEED_SELECT}
"{select}
WHERE t.content % $1 AND t.visibility='public'
ORDER BY similarity(t.content, $1) DESC
LIMIT $2 OFFSET $3"
@@ -145,7 +157,10 @@ impl SearchPort for PgSearchRepository {
.map_err(|e| DomainError::Internal(e.to_string()))?;
Ok(Paginated {
items: rows.into_iter().map(row_to_entry).collect(),
items: rows
.into_iter()
.map(|r| row_to_entry(r, viewer))
.collect::<Result<Vec<_>, _>>()?,
total,
page: page.page,
per_page: page.per_page,
@@ -197,7 +212,7 @@ mod tests {
thought::{Thought, Visibility},
user::User,
},
ports::{SearchPort, ThoughtRepository, UserRepository},
ports::{SearchPort, ThoughtRepository, UserWriter},
value_objects::*,
};
@@ -291,4 +306,67 @@ mod tests {
.unwrap();
assert_eq!(result.total, 0);
}
#[sqlx::test(migrations = "../postgres/migrations")]
async fn search_thoughts_viewer_context(pool: sqlx::PgPool) {
use domain::models::social::Like;
use domain::ports::{LikeRepository, UserWriter};
use domain::value_objects::LikeId;
use postgres::{like::PgLikeRepository, user::PgUserRepository};
let (alice, thought) = seed_thought(&pool, "alice", "hello world").await;
// alice likes her own thought
let like_repo = PgLikeRepository::new(pool.clone());
like_repo
.save(&Like {
id: LikeId::new(),
user_id: alice.id.clone(),
thought_id: thought.id.clone(),
ap_id: None,
created_at: chrono::Utc::now(),
})
.await
.unwrap();
let repo = PgSearchRepository::new(pool);
// with viewer — should see liked = true
let authed = repo
.search_thoughts(
"hello",
&PageParams {
page: 1,
per_page: 20,
},
Some(&alice.id),
)
.await
.unwrap();
assert_eq!(authed.items.len(), 1);
let ctx = authed.items[0]
.viewer
.as_ref()
.expect("viewer context present");
assert!(ctx.liked, "alice should see the thought as liked");
assert!(!ctx.boosted);
// without viewer — viewer should be None
let anon = repo
.search_thoughts(
"hello",
&PageParams {
page: 1,
per_page: 20,
},
None,
)
.await
.unwrap();
assert_eq!(anon.items.len(), 1);
assert!(
anon.items[0].viewer.is_none(),
"anonymous request has no viewer context"
);
}
}

View File

@@ -0,0 +1 @@
ALTER TABLE notifications RENAME COLUMN "type" TO notification_type;

View File

@@ -5,12 +5,11 @@ const MAX_REMOTE_CONTENT_CHARS: usize = 500;
const THOUGHTS_PATH_PREFIX: &str = "/thoughts/";
use chrono::{DateTime, Utc};
use sqlx::PgPool;
use url::Url;
use domain::{
errors::DomainError,
models::thought::{Thought, Visibility},
ports::{ActivityPubRepository, OutboxEntry},
ports::{ActivityPubRepository, ActorApUrls, OutboxEntry},
value_objects::{Content, ThoughtId, UserId, Username},
};
@@ -60,8 +59,6 @@ impl ActivityPubRepository for PgActivityPubRepository {
user_id: UserId::from_uuid(r.user_id),
content: Content::new_remote(r.content),
in_reply_to_id: r.in_reply_to_id.map(ThoughtId::from_uuid),
in_reply_to_url: None,
ap_id: None,
visibility: Visibility::Public,
content_warning: r.content_warning,
sensitive: r.sensitive,
@@ -127,8 +124,6 @@ impl ActivityPubRepository for PgActivityPubRepository {
user_id: UserId::from_uuid(r.user_id),
content: Content::new_remote(r.content),
in_reply_to_id: r.in_reply_to_id.map(ThoughtId::from_uuid),
in_reply_to_url: None,
ap_id: None,
visibility: Visibility::Public,
content_warning: r.content_warning,
sensitive: r.sensitive,
@@ -143,17 +138,17 @@ impl ActivityPubRepository for PgActivityPubRepository {
async fn find_remote_actor_id(
&self,
actor_ap_url: &Url,
actor_ap_url: &str,
) -> Result<Option<UserId>, DomainError> {
sqlx::query_scalar::<_, uuid::Uuid>("SELECT id FROM users WHERE ap_id=$1")
.bind(actor_ap_url.as_str())
.bind(actor_ap_url)
.fetch_optional(&self.pool)
.await
.into_domain()
.map(|o| o.map(UserId::from_uuid))
}
async fn intern_remote_actor(&self, actor_ap_url: &Url) -> Result<UserId, DomainError> {
async fn intern_remote_actor(&self, actor_ap_url: &str) -> Result<UserId, DomainError> {
if let Some(id) = self.find_remote_actor_id(actor_ap_url).await? {
return Ok(id);
}
@@ -161,11 +156,13 @@ impl ActivityPubRepository for PgActivityPubRepository {
// Use the last path segment as username (e.g. /users/alice → "alice").
// Falls back to a random short id for long segments (e.g. UUID-based actor URLs).
// username column is VARCHAR(32).
let last_seg = actor_ap_url
.path_segments()
.and_then(|mut s| s.next_back())
.unwrap_or("")
.to_string();
let last_seg = url::Url::parse(actor_ap_url)
.ok()
.and_then(|u| {
u.path_segments()
.and_then(|mut s| s.next_back().map(|s| s.to_string()))
})
.unwrap_or_default();
let handle = if last_seg.is_empty() {
format!("remote_{}", &new_id.to_string()[..13])
} else if last_seg.len() <= 32 {
@@ -180,7 +177,7 @@ impl ActivityPubRepository for PgActivityPubRepository {
.bind(new_id)
.bind(&handle)
.bind(format!("{}@remote", new_id))
.bind(actor_ap_url.as_str())
.bind(actor_ap_url)
.execute(&self.pool)
.await
.into_domain()?;
@@ -215,25 +212,26 @@ impl ActivityPubRepository for PgActivityPubRepository {
async fn accept_note(
&self,
ap_id: &Url,
ap_id: &str,
author_id: &UserId,
content: &str,
published: DateTime<Utc>,
sensitive: bool,
content_warning: Option<String>,
visibility: &str,
in_reply_to: Option<&Url>,
in_reply_to: Option<&str>,
) -> Result<(), DomainError> {
let capped: String = content.chars().take(MAX_REMOTE_CONTENT_CHARS).collect();
let (in_reply_to_id, in_reply_to_url) = match in_reply_to {
Some(url) => {
// If the parent is a local thought, extract its UUID for in_reply_to_id.
let local_uuid = url
.path()
.strip_prefix(THOUGHTS_PATH_PREFIX)
.and_then(|s| s.split('/').next())
.and_then(|s| uuid::Uuid::parse_str(s).ok());
(local_uuid, Some(url.as_str().to_string()))
let local_uuid = url::Url::parse(url).ok().and_then(|u| {
u.path()
.strip_prefix(THOUGHTS_PATH_PREFIX)
.and_then(|s| s.split('/').next())
.and_then(|s| uuid::Uuid::parse_str(s).ok())
});
(local_uuid, Some(url.to_string()))
}
None => (None, None),
};
@@ -244,7 +242,7 @@ impl ActivityPubRepository for PgActivityPubRepository {
.bind(uuid::Uuid::new_v4())
.bind(author_id.as_uuid())
.bind(&capped)
.bind(ap_id.as_str())
.bind(ap_id)
.bind(sensitive)
.bind(content_warning)
.bind(published)
@@ -257,12 +255,12 @@ impl ActivityPubRepository for PgActivityPubRepository {
.map(|_| ())
}
async fn apply_note_update(&self, ap_id: &Url, new_content: &str) -> Result<(), DomainError> {
async fn apply_note_update(&self, ap_id: &str, new_content: &str) -> Result<(), DomainError> {
let capped: String = new_content.chars().take(MAX_REMOTE_CONTENT_CHARS).collect();
sqlx::query(
"UPDATE thoughts SET content=$2,updated_at=NOW() WHERE ap_id=$1 AND local=false",
)
.bind(ap_id.as_str())
.bind(ap_id)
.bind(&capped)
.execute(&self.pool)
.await
@@ -270,20 +268,20 @@ impl ActivityPubRepository for PgActivityPubRepository {
.map(|_| ())
}
async fn retract_note(&self, ap_id: &Url) -> Result<(), DomainError> {
async fn retract_note(&self, ap_id: &str) -> Result<(), DomainError> {
sqlx::query("DELETE FROM thoughts WHERE ap_id=$1 AND local=false")
.bind(ap_id.as_str())
.bind(ap_id)
.execute(&self.pool)
.await
.into_domain()
.map(|_| ())
}
async fn retract_actor_notes(&self, actor_ap_url: &Url) -> Result<(), DomainError> {
async fn retract_actor_notes(&self, actor_ap_url: &str) -> Result<(), DomainError> {
sqlx::query(
"DELETE FROM thoughts WHERE local=false AND user_id=(SELECT id FROM users WHERE ap_id=$1)",
)
.bind(actor_ap_url.as_str())
.bind(actor_ap_url)
.execute(&self.pool)
.await
.into_domain()
@@ -297,6 +295,34 @@ impl ActivityPubRepository for PgActivityPubRepository {
.into_domain()?;
Ok(n as u64)
}
async fn get_thought_ap_id(
&self,
thought_id: &ThoughtId,
) -> Result<Option<String>, DomainError> {
sqlx::query_scalar::<_, String>(
"SELECT ap_id FROM thoughts WHERE id = $1 AND ap_id IS NOT NULL",
)
.bind(thought_id.as_uuid())
.fetch_optional(&self.pool)
.await
.into_domain()
}
async fn get_actor_ap_urls(
&self,
user_id: &UserId,
) -> Result<Option<ActorApUrls>, DomainError> {
sqlx::query_as::<_, (String, String)>(
"SELECT ap_id, inbox_url FROM users \
WHERE id = $1 AND ap_id IS NOT NULL AND inbox_url IS NOT NULL",
)
.bind(user_id.as_uuid())
.fetch_optional(&self.pool)
.await
.into_domain()
.map(|opt| opt.map(|(ap_id, inbox_url)| ActorApUrls { ap_id, inbox_url }))
}
}
#[cfg(test)]
@@ -307,20 +333,20 @@ mod tests {
#[sqlx::test(migrations = "./migrations")]
async fn intern_remote_actor_is_idempotent(pool: sqlx::PgPool) {
let repo = PgActivityPubRepository::new(pool);
let url = url::Url::parse("https://mastodon.social/users/alice").unwrap();
let id1 = repo.intern_remote_actor(&url).await.unwrap();
let id2 = repo.intern_remote_actor(&url).await.unwrap();
let url = "https://mastodon.social/users/alice";
let id1 = repo.intern_remote_actor(url).await.unwrap();
let id2 = repo.intern_remote_actor(url).await.unwrap();
assert_eq!(id1, id2);
}
#[sqlx::test(migrations = "./migrations")]
async fn accept_and_retract_note(pool: sqlx::PgPool) {
let repo = PgActivityPubRepository::new(pool);
let actor_url = url::Url::parse("https://remote.example/users/bob").unwrap();
let ap_id = url::Url::parse("https://remote.example/notes/1").unwrap();
let author = repo.intern_remote_actor(&actor_url).await.unwrap();
let actor_url = "https://remote.example/users/bob";
let ap_id = "https://remote.example/notes/1";
let author = repo.intern_remote_actor(actor_url).await.unwrap();
repo.accept_note(
&ap_id,
ap_id,
&author,
"hello from remote",
chrono::Utc::now(),
@@ -331,7 +357,7 @@ mod tests {
)
.await
.unwrap();
repo.retract_note(&ap_id).await.unwrap();
repo.retract_note(ap_id).await.unwrap();
}
#[sqlx::test(migrations = "./migrations")]

View File

@@ -93,7 +93,7 @@ mod tests {
use super::*;
use crate::user::PgUserRepository;
use chrono::Utc;
use domain::ports::UserRepository;
use domain::ports::UserWriter;
use domain::{models::user::User, value_objects::*};
async fn seed_user(pool: &sqlx::PgPool) -> User {

View File

@@ -29,8 +29,6 @@ struct FeedRow {
t_user_id: uuid::Uuid,
content: String,
in_reply_to_id: Option<uuid::Uuid>,
in_reply_to_url: Option<String>,
t_ap_id: Option<String>,
visibility: String,
content_warning: Option<String>,
sensitive: bool,
@@ -47,8 +45,6 @@ struct FeedRow {
header_url: Option<String>,
custom_css: Option<String>,
author_local: bool,
u_ap_id: Option<String>,
inbox_url: Option<String>,
author_created_at: DateTime<Utc>,
author_updated_at: DateTime<Utc>,
like_count: i64,
@@ -83,7 +79,7 @@ fn feed_select(viewer: Option<uuid::Uuid>) -> String {
"
SELECT
t.id AS thought_id, t.user_id AS t_user_id, t.content,
t.in_reply_to_id, t.in_reply_to_url, t.ap_id AS t_ap_id,
t.in_reply_to_id,
t.visibility, t.content_warning, t.sensitive, t.local AS t_local,
t.created_at AS thought_created_at, t.updated_at,
u.id AS author_id,
@@ -98,7 +94,7 @@ fn feed_select(viewer: Option<uuid::Uuid>) -> String {
u.bio,
COALESCE(ra.avatar_url, u.avatar_url) AS avatar_url,
u.header_url, u.custom_css,
u.local AS author_local, u.ap_id AS u_ap_id, u.inbox_url,
u.local AS author_local,
u.created_at AS author_created_at, u.updated_at AS author_updated_at,
(SELECT COUNT(*) FROM likes l WHERE l.thought_id=t.id) AS like_count,
(SELECT COUNT(*) FROM boosts b WHERE b.thought_id=t.id) AS boost_count,
@@ -110,15 +106,13 @@ fn feed_select(viewer: Option<uuid::Uuid>) -> String {
)
}
fn row_to_entry(r: FeedRow) -> FeedEntry {
fn row_to_entry(r: FeedRow, viewer: Option<uuid::Uuid>) -> Result<FeedEntry, DomainError> {
let thought = Thought {
id: ThoughtId::from_uuid(r.thought_id),
user_id: UserId::from_uuid(r.t_user_id),
content: Content::new_remote(r.content),
in_reply_to_id: r.in_reply_to_id.map(ThoughtId::from_uuid),
in_reply_to_url: r.in_reply_to_url,
ap_id: r.t_ap_id,
visibility: Visibility::from_db_str(&r.visibility),
visibility: Visibility::from_db_str(&r.visibility)?,
content_warning: r.content_warning,
sensitive: r.sensitive,
local: r.t_local,
@@ -136,20 +130,22 @@ fn row_to_entry(r: FeedRow) -> FeedEntry {
header_url: r.header_url,
custom_css: r.custom_css,
local: r.author_local,
ap_id: r.u_ap_id,
inbox_url: r.inbox_url,
created_at: r.author_created_at,
updated_at: r.author_updated_at,
};
FeedEntry {
Ok(FeedEntry {
thought,
author,
like_count: r.like_count,
boost_count: r.boost_count,
reply_count: r.reply_count,
liked_by_viewer: r.liked_by_viewer,
boosted_by_viewer: r.boosted_by_viewer,
}
stats: domain::models::feed::EngagementStats {
like_count: r.like_count,
boost_count: r.boost_count,
reply_count: r.reply_count,
},
viewer: viewer.map(|_| domain::models::feed::ViewerContext {
liked: r.liked_by_viewer,
boosted: r.boosted_by_viewer,
}),
})
}
#[async_trait]
@@ -184,7 +180,10 @@ impl FeedRepository for PgFeedRepository {
.into_domain()?;
Ok(Paginated {
items: rows.into_iter().map(row_to_entry).collect(),
items: rows
.into_iter()
.map(|r| row_to_entry(r, viewer))
.collect::<Result<Vec<_>, _>>()?,
total,
page: page.page,
per_page: page.per_page,
@@ -214,7 +213,10 @@ impl FeedRepository for PgFeedRepository {
.into_domain()?;
Ok(Paginated {
items: rows.into_iter().map(row_to_entry).collect(),
items: rows
.into_iter()
.map(|r| row_to_entry(r, viewer))
.collect::<Result<Vec<_>, _>>()?,
total,
page: page.page,
per_page: page.per_page,
@@ -247,7 +249,10 @@ impl FeedRepository for PgFeedRepository {
.into_domain()?;
Ok(Paginated {
items: rows.into_iter().map(row_to_entry).collect(),
items: rows
.into_iter()
.map(|r| row_to_entry(r, viewer))
.collect::<Result<Vec<_>, _>>()?,
total,
page: page.page,
per_page: page.per_page,
@@ -289,7 +294,10 @@ impl FeedRepository for PgFeedRepository {
.into_domain()?;
Ok(Paginated {
items: rows.into_iter().map(row_to_entry).collect(),
items: rows
.into_iter()
.map(|r| row_to_entry(r, viewer))
.collect::<Result<Vec<_>, _>>()?,
total,
page: page.page,
per_page: page.per_page,
@@ -329,7 +337,10 @@ impl FeedRepository for PgFeedRepository {
.into_domain()?;
Ok(Paginated {
items: rows.into_iter().map(row_to_entry).collect(),
items: rows
.into_iter()
.map(|r| row_to_entry(r, viewer))
.collect::<Result<Vec<_>, _>>()?,
total,
page: page.page,
per_page: page.per_page,
@@ -346,7 +357,7 @@ mod tests {
thought::{Thought, Visibility},
user::User,
},
ports::{ThoughtRepository, UserRepository},
ports::{ThoughtRepository, UserWriter},
value_objects::*,
};

View File

@@ -76,13 +76,18 @@ impl FollowRepository for PgFollowRepository {
.fetch_optional(&self.pool)
.await
.into_domain()
.map(|o| o.map(|r| Follow {
follower_id: UserId::from_uuid(r.follower_id),
following_id: UserId::from_uuid(r.following_id),
state: FollowState::from_db_str(&r.state),
ap_id: r.ap_id,
created_at: r.created_at,
}))
.and_then(|o| {
o.map(|r| {
Ok(Follow {
follower_id: UserId::from_uuid(r.follower_id),
following_id: UserId::from_uuid(r.following_id),
state: FollowState::from_db_str(&r.state)?,
ap_id: r.ap_id,
created_at: r.created_at,
})
})
.transpose()
})
}
async fn update_state(

View File

@@ -2,33 +2,11 @@ use crate::db_error::IntoDbResult;
use async_trait::async_trait;
use chrono::{DateTime, Utc};
fn notif_type_from_str(s: &str) -> domain::models::notification::NotificationType {
use domain::models::notification::NotificationType;
match s {
"like" => NotificationType::Like,
"boost" => NotificationType::Boost,
"follow" => NotificationType::Follow,
"mention" => NotificationType::Mention,
_ => NotificationType::Reply,
}
}
fn notif_type_as_str(t: &domain::models::notification::NotificationType) -> &'static str {
use domain::models::notification::NotificationType;
match t {
NotificationType::Like => "like",
NotificationType::Boost => "boost",
NotificationType::Follow => "follow",
NotificationType::Mention => "mention",
NotificationType::Reply => "reply",
}
}
use domain::{
errors::DomainError,
models::{
feed::{PageParams, Paginated},
notification::Notification,
notification::{Notification, NotificationKind},
},
ports::NotificationRepository,
value_objects::{NotificationId, ThoughtId, UserId},
@@ -44,17 +22,83 @@ impl PgNotificationRepository {
}
}
#[derive(sqlx::FromRow)]
struct NotificationRow {
id: uuid::Uuid,
user_id: uuid::Uuid,
notification_type: String,
from_user_id: Option<uuid::Uuid>,
thought_id: Option<uuid::Uuid>,
read: bool,
created_at: DateTime<Utc>,
}
fn row_to_notification(r: NotificationRow) -> Result<Notification, DomainError> {
let from_user_id = r
.from_user_id
.map(UserId::from_uuid)
.ok_or_else(|| DomainError::Internal("notification missing from_user_id".into()))?;
let kind = match r.notification_type.as_str() {
"follow" => NotificationKind::Follow { from_user_id },
other => {
let thought_id = r.thought_id.map(ThoughtId::from_uuid).ok_or_else(|| {
DomainError::Internal(format!("notification type '{other}' missing thought_id"))
})?;
match other {
"like" => NotificationKind::Like {
thought_id,
from_user_id,
},
"boost" => NotificationKind::Boost {
thought_id,
from_user_id,
},
"reply" => NotificationKind::Reply {
thought_id,
from_user_id,
},
"mention" => NotificationKind::Mention {
thought_id,
from_user_id,
},
_ => {
return Err(DomainError::Internal(format!(
"unknown notification type: {other}"
)))
}
}
}
};
Ok(Notification {
id: NotificationId::from_uuid(r.id),
user_id: UserId::from_uuid(r.user_id),
kind,
read: r.read,
created_at: r.created_at,
})
}
#[async_trait]
impl NotificationRepository for PgNotificationRepository {
async fn save(&self, n: &Notification) -> Result<(), DomainError> {
sqlx::query(
"INSERT INTO notifications(id,user_id,type,from_user_id,thought_id,read,created_at) VALUES($1,$2,$3,$4,$5,$6,$7)"
"INSERT INTO notifications(id,user_id,notification_type,from_user_id,thought_id,read,created_at)
VALUES($1,$2,$3,$4,$5,$6,$7)
ON CONFLICT(id) DO NOTHING"
)
.bind(n.id.as_uuid()).bind(n.user_id.as_uuid()).bind(notif_type_as_str(&n.notification_type))
.bind(n.from_user_id.as_ref().map(|u| u.as_uuid()))
.bind(n.thought_id.as_ref().map(|t| t.as_uuid()))
.bind(n.read).bind(n.created_at)
.execute(&self.pool).await.into_domain().map(|_| ())
.bind(n.id.as_uuid())
.bind(n.user_id.as_uuid())
.bind(n.kind.kind_str())
.bind(n.kind.from_user_id().as_uuid())
.bind(n.kind.thought_id().map(|t| t.as_uuid()))
.bind(n.read)
.bind(n.created_at)
.execute(&self.pool)
.await
.into_domain()
.map(|_| ())
}
async fn list_for_user(
@@ -67,32 +111,14 @@ impl NotificationRepository for PgNotificationRepository {
.fetch_one(&self.pool)
.await
.into_domain()?;
#[derive(sqlx::FromRow)]
struct Row {
id: uuid::Uuid,
user_id: uuid::Uuid,
r#type: String,
from_user_id: Option<uuid::Uuid>,
thought_id: Option<uuid::Uuid>,
read: bool,
created_at: DateTime<Utc>,
}
let rows = sqlx::query_as::<_, Row>(
"SELECT id,user_id,type,from_user_id,thought_id,read,created_at FROM notifications WHERE user_id=$1 ORDER BY created_at DESC LIMIT $2 OFFSET $3"
let rows = sqlx::query_as::<_, NotificationRow>(
"SELECT id,user_id,notification_type,from_user_id,thought_id,read,created_at FROM notifications WHERE user_id=$1 ORDER BY created_at DESC LIMIT $2 OFFSET $3"
).bind(user_id.as_uuid()).bind(page.limit()).bind(page.offset())
.fetch_all(&self.pool).await.into_domain()?;
let items = rows
.into_iter()
.map(|r| Notification {
id: NotificationId::from_uuid(r.id),
user_id: UserId::from_uuid(r.user_id),
notification_type: notif_type_from_str(&r.r#type),
from_user_id: r.from_user_id.map(UserId::from_uuid),
thought_id: r.thought_id.map(ThoughtId::from_uuid),
read: r.read,
created_at: r.created_at,
})
.collect();
.map(row_to_notification)
.collect::<Result<Vec<_>, _>>()?;
Ok(Paginated {
items,
total,
@@ -135,37 +161,25 @@ impl NotificationRepository for PgNotificationRepository {
#[cfg(test)]
mod tests {
use super::*;
use crate::user::PgUserRepository;
use crate::test_helpers;
use chrono::Utc;
use domain::ports::UserRepository;
use domain::{
models::{notification::NotificationType, user::User},
models::{notification::NotificationKind, user::User},
value_objects::*,
};
async fn seed_user(pool: &sqlx::PgPool) -> User {
let repo = PgUserRepository::new(pool.clone());
let u = User::new_local(
UserId::new(),
Username::new("alice").unwrap(),
Email::new("alice@ex.com").unwrap(),
PasswordHash("h".into()),
);
repo.save(&u).await.unwrap();
u
}
#[sqlx::test(migrations = "./migrations")]
async fn save_and_list(pool: sqlx::PgPool) {
let user = seed_user(&pool).await;
let user = test_helpers::seed_user(&pool, "alice", "alice@ex.com").await;
let from_user = test_helpers::seed_user(&pool, "bob", "bob@ex.com").await;
let repo = PgNotificationRepository::new(pool);
use domain::models::feed::PageParams;
let n = Notification {
id: NotificationId::new(),
user_id: user.id.clone(),
notification_type: NotificationType::Like,
from_user_id: None,
thought_id: None,
kind: NotificationKind::Follow {
from_user_id: from_user.id.clone(),
},
read: false,
created_at: Utc::now(),
};
@@ -186,15 +200,16 @@ mod tests {
#[sqlx::test(migrations = "./migrations")]
async fn mark_all_read(pool: sqlx::PgPool) {
let user = seed_user(&pool).await;
let user = test_helpers::seed_user(&pool, "alice", "alice@ex.com").await;
let from_user = test_helpers::seed_user(&pool, "bob", "bob@ex.com").await;
let repo = PgNotificationRepository::new(pool);
use domain::models::feed::PageParams;
let n = Notification {
id: NotificationId::new(),
user_id: user.id.clone(),
notification_type: NotificationType::Follow,
from_user_id: None,
thought_id: None,
kind: NotificationKind::Follow {
from_user_id: from_user.id.clone(),
},
read: false,
created_at: Utc::now(),
};

View File

@@ -19,14 +19,12 @@ impl PgRemoteActorRepository {
impl RemoteActorRepository for PgRemoteActorRepository {
async fn upsert(&self, a: &RemoteActor) -> Result<(), DomainError> {
sqlx::query(
"INSERT INTO remote_actors(url,handle,display_name,inbox_url,shared_inbox_url,public_key,avatar_url,last_fetched_at)
VALUES($1,$2,$3,$4,$5,$6,$7,$8)
"INSERT INTO remote_actors(url,handle,display_name,avatar_url,last_fetched_at)
VALUES($1,$2,$3,$4,$5)
ON CONFLICT(url) DO UPDATE SET handle=EXCLUDED.handle,display_name=EXCLUDED.display_name,
inbox_url=EXCLUDED.inbox_url,shared_inbox_url=EXCLUDED.shared_inbox_url,
public_key=EXCLUDED.public_key,avatar_url=EXCLUDED.avatar_url,last_fetched_at=EXCLUDED.last_fetched_at"
avatar_url=EXCLUDED.avatar_url,last_fetched_at=EXCLUDED.last_fetched_at"
)
.bind(&a.url).bind(&a.handle).bind(&a.display_name).bind(&a.inbox_url)
.bind(&a.shared_inbox_url).bind(&a.public_key).bind(&a.avatar_url).bind(a.last_fetched_at)
.bind(&a.url).bind(&a.handle).bind(&a.display_name).bind(&a.avatar_url).bind(a.last_fetched_at)
.execute(&self.pool).await.into_domain().map(|_| ())
}
@@ -36,16 +34,26 @@ impl RemoteActorRepository for PgRemoteActorRepository {
url: String,
handle: String,
display_name: Option<String>,
inbox_url: String,
shared_inbox_url: Option<String>,
public_key: String,
avatar_url: Option<String>,
last_fetched_at: DateTime<Utc>,
}
sqlx::query_as::<_, Row>(
"SELECT url,handle,display_name,inbox_url,shared_inbox_url,public_key,avatar_url,last_fetched_at FROM remote_actors WHERE url=$1"
"SELECT url,handle,display_name,avatar_url,last_fetched_at FROM remote_actors WHERE url=$1"
).bind(url).fetch_optional(&self.pool).await
.into_domain()
.map(|o| o.map(|r| RemoteActor { url: r.url, handle: r.handle, display_name: r.display_name, inbox_url: r.inbox_url, shared_inbox_url: r.shared_inbox_url, public_key: r.public_key, avatar_url: r.avatar_url, last_fetched_at: r.last_fetched_at, bio: None, banner_url: None, also_known_as: None, outbox_url: None, followers_url: None, following_url: None, attachment: vec![] }))
.map(|o| o.map(|r| RemoteActor {
url: r.url,
handle: r.handle,
display_name: r.display_name,
avatar_url: r.avatar_url,
last_fetched_at: r.last_fetched_at,
bio: None,
banner_url: None,
also_known_as: None,
outbox_url: None,
followers_url: None,
following_url: None,
attachment: vec![],
}))
}
}

View File

@@ -105,7 +105,10 @@ impl TagRepository for PgTagRepository {
.fetch_all(&self.pool).await.into_domain()?;
Ok(Paginated {
items: rows.into_iter().map(Thought::from).collect(),
items: rows
.into_iter()
.map(Thought::try_from)
.collect::<Result<Vec<_>, _>>()?,
total,
page: page.page,
per_page: page.per_page,
@@ -132,7 +135,7 @@ impl TagRepository for PgTagRepository {
mod tests {
use super::*;
use crate::{thought::PgThoughtRepository, user::PgUserRepository};
use domain::ports::{ThoughtRepository, UserRepository};
use domain::ports::{ThoughtRepository, UserWriter};
use domain::{
models::{
thought::{Thought, Visibility},

View File

@@ -4,7 +4,7 @@ use domain::{
thought::{Thought, Visibility},
user::User,
},
ports::{ThoughtRepository, UserRepository},
ports::{ThoughtRepository, UserWriter},
value_objects::{Content, Email, PasswordHash, ThoughtId, UserId, Username},
};

View File

@@ -28,8 +28,6 @@ pub(crate) struct ThoughtRow {
pub user_id: uuid::Uuid,
pub content: String,
pub in_reply_to_id: Option<uuid::Uuid>,
pub in_reply_to_url: Option<String>,
pub ap_id: Option<String>,
pub visibility: String,
pub content_warning: Option<String>,
pub sensitive: bool,
@@ -38,42 +36,39 @@ pub(crate) struct ThoughtRow {
pub updated_at: Option<DateTime<Utc>>,
}
impl From<ThoughtRow> for Thought {
fn from(r: ThoughtRow) -> Self {
Thought {
impl TryFrom<ThoughtRow> for Thought {
type Error = DomainError;
fn try_from(r: ThoughtRow) -> Result<Self, DomainError> {
Ok(Thought {
id: ThoughtId::from_uuid(r.id),
user_id: UserId::from_uuid(r.user_id),
content: Content::new_remote(r.content),
in_reply_to_id: r.in_reply_to_id.map(ThoughtId::from_uuid),
in_reply_to_url: r.in_reply_to_url,
ap_id: r.ap_id,
visibility: Visibility::from_db_str(&r.visibility),
visibility: Visibility::from_db_str(&r.visibility)?,
content_warning: r.content_warning,
sensitive: r.sensitive,
local: r.local,
created_at: r.created_at,
updated_at: r.updated_at,
}
})
}
}
const THOUGHT_SELECT: &str =
"SELECT id,user_id,content,in_reply_to_id,in_reply_to_url,ap_id,visibility,content_warning,sensitive,local,created_at,updated_at FROM thoughts";
"SELECT id,user_id,content,in_reply_to_id,visibility,content_warning,sensitive,local,created_at,updated_at FROM thoughts";
#[async_trait]
impl ThoughtRepository for PgThoughtRepository {
async fn save(&self, t: &Thought) -> Result<(), DomainError> {
sqlx::query(
"INSERT INTO thoughts(id,user_id,content,in_reply_to_id,in_reply_to_url,ap_id,visibility,content_warning,sensitive,local,created_at)
VALUES($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11)
"INSERT INTO thoughts(id,user_id,content,in_reply_to_id,visibility,content_warning,sensitive,local,created_at)
VALUES($1,$2,$3,$4,$5,$6,$7,$8,$9)
ON CONFLICT(id) DO UPDATE SET content=EXCLUDED.content,updated_at=NOW()"
)
.bind(t.id.as_uuid())
.bind(t.user_id.as_uuid())
.bind(t.content.as_str())
.bind(t.in_reply_to_id.as_ref().map(|x| x.as_uuid()))
.bind(&t.in_reply_to_url)
.bind(&t.ap_id)
.bind(t.visibility.as_str())
.bind(&t.content_warning)
.bind(t.sensitive)
@@ -91,7 +86,7 @@ impl ThoughtRepository for PgThoughtRepository {
.fetch_optional(&self.pool)
.await
.into_domain()
.map(|o| o.map(Thought::from))
.and_then(|o| o.map(Thought::try_from).transpose())
}
async fn delete(&self, id: &ThoughtId, user_id: &UserId) -> Result<(), DomainError> {
@@ -121,11 +116,11 @@ impl ThoughtRepository for PgThoughtRepository {
// Recursive CTE: fetches the root thought and all nested replies at any depth.
sqlx::query_as::<_, ThoughtRow>(
"WITH RECURSIVE thread AS (
SELECT id,user_id,content,in_reply_to_id,in_reply_to_url,ap_id,
SELECT id,user_id,content,in_reply_to_id,
visibility,content_warning,sensitive,local,created_at,updated_at
FROM thoughts WHERE id = $1
UNION ALL
SELECT t.id,t.user_id,t.content,t.in_reply_to_id,t.in_reply_to_url,t.ap_id,
SELECT t.id,t.user_id,t.content,t.in_reply_to_id,
t.visibility,t.content_warning,t.sensitive,t.local,t.created_at,t.updated_at
FROM thoughts t JOIN thread ON t.in_reply_to_id = thread.id
)
@@ -135,7 +130,7 @@ impl ThoughtRepository for PgThoughtRepository {
.fetch_all(&self.pool)
.await
.into_domain()
.map(|rows| rows.into_iter().map(Thought::from).collect())
.and_then(|rows| rows.into_iter().map(Thought::try_from).collect())
}
async fn list_by_user(
@@ -161,7 +156,10 @@ impl ThoughtRepository for PgThoughtRepository {
.into_domain()?;
Ok(Paginated {
items: rows.into_iter().map(Thought::from).collect(),
items: rows
.into_iter()
.map(Thought::try_from)
.collect::<Result<Vec<_>, _>>()?,
total,
page: page.page,
per_page: page.per_page,

View File

@@ -58,15 +58,13 @@ impl TopFriendRepository for PgTopFriendRepository {
header_url: Option<String>,
custom_css: Option<String>,
local: bool,
ap_id: Option<String>,
inbox_url: Option<String>,
created_at: chrono::DateTime<chrono::Utc>,
updated_at: chrono::DateTime<chrono::Utc>,
}
let rows = sqlx::query_as::<_, Row>(
"SELECT tf.user_id AS tf_user_id, tf.friend_id, tf.position,
u.id, u.username, u.email, u.password_hash, u.display_name, u.bio,
u.avatar_url, u.header_url, u.custom_css, u.local, u.ap_id, u.inbox_url,
u.avatar_url, u.header_url, u.custom_css, u.local,
u.created_at, u.updated_at
FROM top_friends tf JOIN users u ON u.id=tf.friend_id
WHERE tf.user_id=$1 ORDER BY tf.position",
@@ -96,8 +94,6 @@ impl TopFriendRepository for PgTopFriendRepository {
header_url: r.header_url,
custom_css: r.custom_css,
local: r.local,
ap_id: r.ap_id,
inbox_url: r.inbox_url,
created_at: r.created_at,
updated_at: r.updated_at,
};
@@ -111,7 +107,7 @@ impl TopFriendRepository for PgTopFriendRepository {
mod tests {
use super::*;
use crate::user::PgUserRepository;
use domain::ports::UserRepository;
use domain::ports::UserWriter;
use domain::{models::user::User, value_objects::*};
async fn seed_user(pool: &sqlx::PgPool, username: &str, email: &str) -> User {

View File

@@ -4,7 +4,7 @@ use chrono::{DateTime, Utc};
use domain::{
errors::DomainError,
models::{feed::UserSummary, user::User},
ports::UserRepository,
ports::{UserReader, UserWriter},
value_objects::{Email, PasswordHash, UserId, Username},
};
use sqlx::PgPool;
@@ -30,8 +30,6 @@ pub struct UserRow {
pub header_url: Option<String>,
pub custom_css: Option<String>,
pub local: bool,
pub ap_id: Option<String>,
pub inbox_url: Option<String>,
pub created_at: DateTime<Utc>,
pub updated_at: DateTime<Utc>,
}
@@ -49,18 +47,18 @@ impl From<UserRow> for User {
header_url: r.header_url,
custom_css: r.custom_css,
local: r.local,
ap_id: r.ap_id,
inbox_url: r.inbox_url,
created_at: r.created_at,
updated_at: r.updated_at,
}
}
}
pub const USER_SELECT: &str = "SELECT id,username,email,password_hash,display_name,bio,avatar_url,header_url,custom_css,local,ap_id,inbox_url,created_at,updated_at FROM users";
pub const USER_SELECT: &str =
"SELECT id,username,email,password_hash,display_name,bio,avatar_url,header_url,\
custom_css,local,created_at,updated_at FROM users";
#[async_trait]
impl UserRepository for PgUserRepository {
impl UserReader for PgUserRepository {
async fn find_by_id(&self, id: &UserId) -> Result<Option<User>, DomainError> {
sqlx::query_as::<_, UserRow>(&format!("{USER_SELECT} WHERE id=$1"))
.bind(id.as_uuid())
@@ -88,62 +86,6 @@ impl UserRepository for PgUserRepository {
.map(|o| o.map(User::from))
}
async fn save(&self, user: &User) -> Result<(), DomainError> {
sqlx::query(
"INSERT INTO users (id,username,email,password_hash,display_name,bio,avatar_url,header_url,custom_css,local,ap_id,inbox_url,created_at,updated_at)
VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13,$14)
ON CONFLICT(id) DO UPDATE SET
username=EXCLUDED.username, email=EXCLUDED.email,
password_hash=EXCLUDED.password_hash, display_name=EXCLUDED.display_name,
bio=EXCLUDED.bio, avatar_url=EXCLUDED.avatar_url,
header_url=EXCLUDED.header_url, custom_css=EXCLUDED.custom_css,
local=EXCLUDED.local, ap_id=EXCLUDED.ap_id, inbox_url=EXCLUDED.inbox_url,
updated_at=NOW()"
)
.bind(user.id.as_uuid())
.bind(user.username.as_str())
.bind(user.email.as_str())
.bind(&user.password_hash.0)
.bind(&user.display_name)
.bind(&user.bio)
.bind(&user.avatar_url)
.bind(&user.header_url)
.bind(&user.custom_css)
.bind(user.local)
.bind(&user.ap_id)
.bind(&user.inbox_url)
.bind(user.created_at)
.bind(user.updated_at)
.execute(&self.pool)
.await
.into_domain()
.map(|_| ())
}
async fn update_profile(
&self,
user_id: &UserId,
display_name: Option<String>,
bio: Option<String>,
avatar_url: Option<String>,
header_url: Option<String>,
custom_css: Option<String>,
) -> Result<(), DomainError> {
sqlx::query(
"UPDATE users SET display_name=$2,bio=$3,avatar_url=$4,header_url=$5,custom_css=$6,updated_at=NOW() WHERE id=$1"
)
.bind(user_id.as_uuid())
.bind(display_name)
.bind(bio)
.bind(avatar_url)
.bind(header_url)
.bind(custom_css)
.execute(&self.pool)
.await
.into_domain()
.map(|_| ())
}
async fn list_with_stats(&self) -> Result<Vec<UserSummary>, DomainError> {
#[derive(sqlx::FromRow)]
struct Row {
@@ -196,6 +138,63 @@ impl UserRepository for PgUserRepository {
}
}
#[async_trait]
impl UserWriter for PgUserRepository {
async fn save(&self, user: &User) -> Result<(), DomainError> {
sqlx::query(
"INSERT INTO users (id,username,email,password_hash,display_name,bio,avatar_url,header_url,custom_css,local,created_at,updated_at)
VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12)
ON CONFLICT(id) DO UPDATE SET
username=EXCLUDED.username, email=EXCLUDED.email,
password_hash=EXCLUDED.password_hash, display_name=EXCLUDED.display_name,
bio=EXCLUDED.bio, avatar_url=EXCLUDED.avatar_url,
header_url=EXCLUDED.header_url, custom_css=EXCLUDED.custom_css,
local=EXCLUDED.local,
updated_at=NOW()"
)
.bind(user.id.as_uuid())
.bind(user.username.as_str())
.bind(user.email.as_str())
.bind(&user.password_hash.0)
.bind(&user.display_name)
.bind(&user.bio)
.bind(&user.avatar_url)
.bind(&user.header_url)
.bind(&user.custom_css)
.bind(user.local)
.bind(user.created_at)
.bind(user.updated_at)
.execute(&self.pool)
.await
.into_domain()
.map(|_| ())
}
async fn update_profile(
&self,
user_id: &UserId,
display_name: Option<String>,
bio: Option<String>,
avatar_url: Option<String>,
header_url: Option<String>,
custom_css: Option<String>,
) -> Result<(), DomainError> {
sqlx::query(
"UPDATE users SET display_name=$2,bio=$3,avatar_url=$4,header_url=$5,custom_css=$6,updated_at=NOW() WHERE id=$1"
)
.bind(user_id.as_uuid())
.bind(display_name)
.bind(bio)
.bind(avatar_url)
.bind(header_url)
.bind(custom_css)
.execute(&self.pool)
.await
.into_domain()
.map(|_| ())
}
}
#[cfg(test)]
mod tests {
use super::*;

View File

@@ -1,28 +1,26 @@
use domain::{
errors::DomainError,
events::DomainEvent,
models::thought::{Thought, Visibility},
ports::{ActivityPubRepository, OutboundFederationPort, ThoughtRepository, UserRepository},
models::thought::Visibility,
ports::{ActivityPubRepository, OutboundFederationPort, ThoughtRepository, UserReader},
value_objects::ThoughtId,
};
use std::sync::Arc;
pub struct FederationEventService {
pub thoughts: Arc<dyn ThoughtRepository>,
pub users: Arc<dyn UserRepository>,
pub users: Arc<dyn UserReader>,
pub ap: Arc<dyn OutboundFederationPort>,
pub base_url: String,
pub federation_action: Arc<dyn domain::ports::FederationActionPort>,
pub ap_repo: Arc<dyn ActivityPubRepository>,
pub remote_actor_connections: Arc<dyn domain::ports::RemoteActorConnectionRepository>,
}
impl FederationEventService {
fn object_ap_id(&self, thought: &Thought, thought_id: &ThoughtId) -> String {
thought
.ap_id
.clone()
.unwrap_or_else(|| format!("{}/thoughts/{}", self.base_url, thought_id))
async fn object_ap_id(&self, thought_id: &ThoughtId) -> Result<String, DomainError> {
if let Some(ap_id) = self.ap_repo.get_thought_ap_id(thought_id).await? {
return Ok(ap_id);
}
Ok(format!("{}/thoughts/{}", self.base_url, thought_id))
}
pub async fn process(&self, event: &DomainEvent) -> Result<(), DomainError> {
@@ -48,31 +46,24 @@ impl FederationEventService {
Some(u) => u,
None => return Ok(()),
};
// For replies to remote posts: in_reply_to_url is None but in_reply_to_id
// points to the locally-stored remote thought. Resolve its ap_id so the
// outbound Note includes inReplyTo and Mastodon threads it correctly.
let thought = if thought.in_reply_to_url.is_none() {
if let Some(ref reply_id) = thought.in_reply_to_id {
match self.thoughts.find_by_id(reply_id).await? {
Some(parent) => {
let parent_ap_url = parent.ap_id.unwrap_or_else(|| {
format!("{}/thoughts/{}", self.base_url, reply_id)
});
domain::models::thought::Thought {
in_reply_to_url: Some(parent_ap_url),
..thought
}
}
None => thought,
}
} else {
thought
}
// Resolve in_reply_to_url for the parent thought via AP repo.
let in_reply_to_url = if let Some(ref reply_id) = thought.in_reply_to_id {
let ap_id = self
.ap_repo
.get_thought_ap_id(reply_id)
.await?
.unwrap_or_else(|| format!("{}/thoughts/{}", self.base_url, reply_id));
Some(ap_id)
} else {
thought
None
};
self.ap
.broadcast_create(user_id, &thought, user.username.as_str())
.broadcast_create(
user_id,
&thought,
user.username.as_str(),
in_reply_to_url.as_deref(),
)
.await
}
@@ -106,8 +97,21 @@ impl FederationEventService {
Some(u) => u,
None => return Ok(()),
};
let in_reply_to_url = if let Some(ref reply_id) = thought.in_reply_to_id {
self.ap_repo
.get_thought_ap_id(reply_id)
.await?
.or_else(|| Some(format!("{}/thoughts/{}", self.base_url, reply_id)))
} else {
None
};
self.ap
.broadcast_update(user_id, &thought, user.username.as_str())
.broadcast_update(
user_id,
&thought,
user.username.as_str(),
in_reply_to_url.as_deref(),
)
.await
}
@@ -122,11 +126,10 @@ impl FederationEventService {
_ => return Ok(()),
};
let _ = booster;
let thought = match self.thoughts.find_by_id(thought_id).await? {
Some(t) => t,
None => return Ok(()),
};
let object_ap_id = self.object_ap_id(&thought, thought_id);
if self.thoughts.find_by_id(thought_id).await?.is_none() {
return Ok(());
}
let object_ap_id = self.object_ap_id(thought_id).await?;
self.ap.broadcast_announce(user_id, &object_ap_id).await
}
@@ -134,122 +137,15 @@ impl FederationEventService {
user_id,
thought_id,
} => {
let thought = match self.thoughts.find_by_id(thought_id).await? {
Some(t) => t,
None => return Ok(()),
};
let object_ap_id = self.object_ap_id(&thought, thought_id);
if self.thoughts.find_by_id(thought_id).await?.is_none() {
return Ok(());
}
let object_ap_id = self.object_ap_id(thought_id).await?;
self.ap
.broadcast_undo_announce(user_id, &object_ap_id)
.await
}
DomainEvent::FetchRemoteActorPosts {
actor_ap_url,
outbox_url,
} => {
let notes = match self
.federation_action
.fetch_outbox_page(outbox_url, 1)
.await
{
Ok(n) => n,
Err(e) => {
tracing::warn!(outbox_url, error = %e, "failed to fetch remote outbox");
return Ok(());
}
};
let actor_url = url::Url::parse(actor_ap_url)
.map_err(|e| DomainError::ExternalService(e.to_string()))?;
let author_id = self.ap_repo.intern_remote_actor(&actor_url).await?;
// Resolve and cache display info so thought cards show proper names.
let profiles = self
.federation_action
.resolve_actor_profiles(vec![actor_ap_url.clone()])
.await;
if let Some(profile) = profiles.into_iter().next() {
let _ = self
.ap_repo
.update_remote_actor_display(
&author_id,
profile.display_name.as_deref(),
profile.avatar_url.as_deref(),
)
.await;
}
for note in notes {
let ap_id = match url::Url::parse(&note.ap_id) {
Ok(u) => u,
Err(_) => continue,
};
let _ = self
.ap_repo
.accept_note(
&ap_id,
&author_id,
&note.content,
note.published,
note.sensitive,
note.content_warning,
"public",
None,
)
.await;
}
Ok(())
}
DomainEvent::FetchActorConnections {
actor_ap_url,
collection_url,
connection_type,
page,
} => {
let urls = match self
.federation_action
.fetch_actor_urls_from_collection(collection_url)
.await
{
Ok(u) => u,
Err(e) => {
tracing::warn!(
collection_url,
error = %e,
"failed to fetch actor connections collection"
);
return Ok(());
}
};
if urls.is_empty() {
return Ok(());
}
let summaries = self.federation_action.resolve_actor_profiles(urls).await;
if summaries.is_empty() {
return Ok(());
}
tracing::info!(
count = summaries.len(),
connection_type,
actor = actor_ap_url,
"caching actor connections"
);
self.remote_actor_connections
.upsert_connections(actor_ap_url, connection_type, *page, &summaries)
.await?;
Ok(())
}
DomainEvent::LikeAdded {
like_id: _,
user_id,
@@ -262,17 +158,19 @@ impl FederationEventService {
};
let _ = liker;
let thought = match self.thoughts.find_by_id(thought_id).await? {
Some(t) if t.ap_id.is_some() => t,
Some(t) => t,
_ => return Ok(()),
};
let author = match self.users.find_by_id(&thought.user_id).await? {
Some(u) if u.inbox_url.is_some() => u,
_ => return Ok(()),
let thought_ap_id = match self.ap_repo.get_thought_ap_id(thought_id).await? {
Some(id) => id,
None => return Ok(()), // local thought — no federation needed
};
let actor_urls = match self.ap_repo.get_actor_ap_urls(&thought.user_id).await? {
Some(u) => u,
None => return Ok(()),
};
let object_ap_id = thought.ap_id.unwrap();
let inbox_url = author.inbox_url.unwrap();
self.ap
.broadcast_like(user_id, &object_ap_id, &inbox_url)
.broadcast_like(user_id, &thought_ap_id, &actor_urls.inbox_url)
.await
}
@@ -286,17 +184,19 @@ impl FederationEventService {
};
let _ = liker;
let thought = match self.thoughts.find_by_id(thought_id).await? {
Some(t) if t.ap_id.is_some() => t,
Some(t) => t,
_ => return Ok(()),
};
let author = match self.users.find_by_id(&thought.user_id).await? {
Some(u) if u.inbox_url.is_some() => u,
_ => return Ok(()),
let thought_ap_id = match self.ap_repo.get_thought_ap_id(thought_id).await? {
Some(id) => id,
None => return Ok(()),
};
let actor_urls = match self.ap_repo.get_actor_ap_urls(&thought.user_id).await? {
Some(u) => u,
None => return Ok(()),
};
let object_ap_id = thought.ap_id.unwrap();
let inbox_url = author.inbox_url.unwrap();
self.ap
.broadcast_undo_like(user_id, &object_ap_id, &inbox_url)
.broadcast_undo_like(user_id, &thought_ap_id, &actor_urls.inbox_url)
.await
}
@@ -345,6 +245,7 @@ mod tests {
_: &UserId,
thought: &Thought,
_: &str,
_in_reply_to_url: Option<&str>,
) -> Result<(), DomainError> {
self.created.lock().unwrap().push(thought.id.clone());
Ok(())
@@ -358,6 +259,7 @@ mod tests {
_: &UserId,
thought: &Thought,
_: &str,
_in_reply_to_url: Option<&str>,
) -> Result<(), DomainError> {
self.updated.lock().unwrap().push(thought.id.clone());
Ok(())
@@ -428,9 +330,7 @@ mod tests {
users: Arc::new(store.clone()),
ap: spy,
base_url: "https://example.com".to_string(),
federation_action: Arc::new(store.clone()),
ap_repo: Arc::new(store.clone()),
remote_actor_connections: Arc::new(store.clone()),
}
}
@@ -460,10 +360,9 @@ mod tests {
async fn remote_thought_created_does_not_broadcast() {
let store = TestStore::default();
let alice = alice();
// Remote thought: local = false, ap_id = Some(...)
// Remote thought: local = false
let mut thought = local_thought(alice.id.clone());
thought.local = false;
thought.ap_id = Some("https://remote.example/notes/1".into());
store.users.lock().unwrap().push(alice.clone());
store.thoughts.lock().unwrap().push(thought.clone());
@@ -553,7 +452,10 @@ mod tests {
let alice = alice();
let mut thought = local_thought(alice.id.clone());
thought.local = false;
thought.ap_id = Some("https://mastodon.social/users/bob/statuses/123".into());
store.thought_ap_ids.lock().unwrap().insert(
thought.id.clone(),
"https://mastodon.social/users/bob/statuses/123".into(),
);
store.users.lock().unwrap().push(alice.clone());
store.thoughts.lock().unwrap().push(thought.clone());
@@ -702,7 +604,10 @@ mod tests {
let alice = alice();
let mut thought = local_thought(alice.id.clone());
thought.local = false;
thought.ap_id = Some("https://mastodon.social/users/bob/statuses/456".into());
store.thought_ap_ids.lock().unwrap().insert(
thought.id.clone(),
"https://mastodon.social/users/bob/statuses/456".into(),
);
store.thoughts.lock().unwrap().push(thought.clone());
let spy = Arc::new(SpyPort::default());
@@ -757,35 +662,6 @@ mod tests {
assert!(spy.updated.lock().unwrap().is_empty());
}
#[tokio::test]
async fn fetch_remote_actor_posts_is_noop_when_outbox_empty() {
let store = TestStore::default();
let spy = Arc::new(SpyPort::default());
svc(&store, spy.clone())
.process(&DomainEvent::FetchRemoteActorPosts {
actor_ap_url: "https://mastodon.social/users/alice".into(),
outbox_url: "https://mastodon.social/users/alice/outbox".into(),
})
.await
.unwrap();
// TestStore.fetch_outbox_page returns Ok(vec![]) — no notes, no error
}
#[tokio::test]
async fn fetch_actor_connections_is_noop_when_collection_empty() {
let store = TestStore::default();
let spy = Arc::new(SpyPort::default());
svc(&store, spy.clone())
.process(&DomainEvent::FetchActorConnections {
actor_ap_url: "https://mastodon.social/users/alice".into(),
collection_url: "https://mastodon.social/users/alice/followers".into(),
connection_type: "followers".into(),
page: 1,
})
.await
.unwrap();
}
#[tokio::test]
async fn like_added_local_user_remote_thought_broadcasts_like() {
let store = TestStore::default();
@@ -797,10 +673,19 @@ mod tests {
PasswordHash("h".into()),
);
author.local = false;
author.inbox_url = Some("https://mastodon.social/users/author/inbox".into());
store.actor_ap_urls.lock().unwrap().insert(
author.id.clone(),
domain::ports::ActorApUrls {
ap_id: "https://mastodon.social/users/author".into(),
inbox_url: "https://mastodon.social/users/author/inbox".into(),
},
);
let mut thought = local_thought(author.id.clone());
thought.ap_id = Some("https://mastodon.social/posts/123".into());
let thought = local_thought(author.id.clone());
store.thought_ap_ids.lock().unwrap().insert(
thought.id.clone(),
"https://mastodon.social/posts/123".into(),
);
let liker = alice();

View File

@@ -2,9 +2,9 @@ use chrono::Utc;
use domain::{
errors::DomainError,
events::DomainEvent,
models::notification::{Notification, NotificationType},
models::notification::{Notification, NotificationKind},
ports::{NotificationRepository, ThoughtRepository},
value_objects::{NotificationId, UserId},
value_objects::NotificationId,
};
use std::sync::Arc;
@@ -13,7 +13,10 @@ pub struct NotificationEventService {
pub notifications: Arc<dyn NotificationRepository>,
}
fn is_self_action(thought_author: &UserId, actor: &UserId) -> bool {
fn is_self_action(
thought_author: &domain::value_objects::UserId,
actor: &domain::value_objects::UserId,
) -> bool {
thought_author == actor
}
@@ -36,9 +39,10 @@ impl NotificationEventService {
.save(&Notification {
id: NotificationId::new(),
user_id: thought.user_id,
notification_type: NotificationType::Like,
from_user_id: Some(user_id.clone()),
thought_id: Some(thought_id.clone()),
kind: NotificationKind::Like {
thought_id: thought_id.clone(),
from_user_id: user_id.clone(),
},
read: false,
created_at: Utc::now(),
})
@@ -60,9 +64,10 @@ impl NotificationEventService {
.save(&Notification {
id: NotificationId::new(),
user_id: thought.user_id,
notification_type: NotificationType::Boost,
from_user_id: Some(user_id.clone()),
thought_id: Some(thought_id.clone()),
kind: NotificationKind::Boost {
thought_id: thought_id.clone(),
from_user_id: user_id.clone(),
},
read: false,
created_at: Utc::now(),
})
@@ -76,9 +81,9 @@ impl NotificationEventService {
.save(&Notification {
id: NotificationId::new(),
user_id: following_id.clone(),
notification_type: NotificationType::Follow,
from_user_id: Some(follower_id.clone()),
thought_id: None,
kind: NotificationKind::Follow {
from_user_id: follower_id.clone(),
},
read: false,
created_at: Utc::now(),
})
@@ -104,9 +109,10 @@ impl NotificationEventService {
.save(&Notification {
id: NotificationId::new(),
user_id: original.user_id,
notification_type: NotificationType::Reply,
from_user_id: Some(user_id.clone()),
thought_id: Some(thought_id.clone()),
kind: NotificationKind::Reply {
thought_id: thought_id.clone(),
from_user_id: user_id.clone(),
},
read: false,
created_at: Utc::now(),
})
@@ -121,9 +127,10 @@ impl NotificationEventService {
.save(&Notification {
id: NotificationId::new(),
user_id: mentioned_user_id.clone(),
notification_type: NotificationType::Mention,
from_user_id: Some(author_user_id.clone()),
thought_id: Some(thought_id.clone()),
kind: NotificationKind::Mention {
thought_id: thought_id.clone(),
from_user_id: author_user_id.clone(),
},
read: false,
created_at: Utc::now(),
})
@@ -139,6 +146,7 @@ mod tests {
use super::*;
use domain::{
models::{
notification::NotificationKind,
thought::{Thought, Visibility},
user::User,
},
@@ -184,10 +192,7 @@ mod tests {
.unwrap();
let notifs = store.notifications.lock().unwrap();
assert_eq!(notifs.len(), 1);
assert!(matches!(
notifs[0].notification_type,
NotificationType::Like
));
assert!(matches!(notifs[0].kind, NotificationKind::Like { .. }));
}
#[tokio::test]
@@ -235,10 +240,7 @@ mod tests {
.unwrap();
let notifs = store.notifications.lock().unwrap();
assert_eq!(notifs.len(), 1);
assert!(matches!(
notifs[0].notification_type,
NotificationType::Follow
));
assert!(matches!(notifs[0].kind, NotificationKind::Follow { .. }));
}
#[tokio::test]
@@ -269,10 +271,7 @@ mod tests {
.unwrap();
let notifs = store.notifications.lock().unwrap();
assert_eq!(notifs.len(), 1);
assert!(matches!(
notifs[0].notification_type,
NotificationType::Reply
));
assert!(matches!(notifs[0].kind, NotificationKind::Reply { .. }));
}
#[tokio::test]

View File

@@ -2,7 +2,7 @@ use domain::{
errors::DomainError,
events::DomainEvent,
models::user::User,
ports::{AuthService, EventPublisher, PasswordHasher, UserRepository},
ports::{AuthService, EventPublisher, PasswordHasher, UserReader, UserRepository},
value_objects::{Email, UserId, Username},
};
@@ -58,7 +58,7 @@ pub struct LoginOutput {
}
pub async fn login(
users: &dyn UserRepository,
users: &dyn UserReader,
hasher: &dyn PasswordHasher,
auth: &dyn AuthService,
input: LoginInput,

View File

@@ -1,14 +1,14 @@
use domain::{
errors::DomainError,
events::DomainEvent,
models::{
actor_connection_summary::ActorConnectionSummary,
feed::{FeedEntry, PageParams, Paginated},
remote_actor::RemoteActor,
},
ports::{
ActivityPubRepository, EventPublisher, FederationActionPort, FeedRepository,
FollowRepository, RemoteActorConnectionRepository, UserRepository,
ActivityPubRepository, EventPublisher, FederationActionPort, FederationFollowPort,
FederationFollowRequestPort, FederationSchedulerPort, FeedRepository, FollowRepository,
RemoteActorConnectionRepository, UserReader,
},
value_objects::UserId,
};
@@ -16,14 +16,14 @@ use domain::{
use super::social;
pub async fn list_pending_requests(
federation: &dyn FederationActionPort,
federation: &dyn FederationFollowRequestPort,
user_id: &UserId,
) -> Result<Vec<RemoteActor>, DomainError> {
federation.get_pending_followers(user_id).await
}
pub async fn accept_follow_request(
federation: &dyn FederationActionPort,
federation: &dyn FederationFollowRequestPort,
user_id: &UserId,
actor_url: &str,
) -> Result<(), DomainError> {
@@ -31,7 +31,7 @@ pub async fn accept_follow_request(
}
pub async fn reject_follow_request(
federation: &dyn FederationActionPort,
federation: &dyn FederationFollowRequestPort,
user_id: &UserId,
actor_url: &str,
) -> Result<(), DomainError> {
@@ -39,14 +39,14 @@ pub async fn reject_follow_request(
}
pub async fn list_remote_followers(
federation: &dyn FederationActionPort,
federation: &dyn FederationFollowRequestPort,
user_id: &UserId,
) -> Result<Vec<RemoteActor>, DomainError> {
federation.get_remote_followers(user_id).await
}
pub async fn remove_remote_follower(
federation: &dyn FederationActionPort,
federation: &dyn FederationFollowRequestPort,
user_id: &UserId,
actor_url: &str,
) -> Result<(), DomainError> {
@@ -54,7 +54,7 @@ pub async fn remove_remote_follower(
}
pub async fn list_remote_following(
federation: &dyn FederationActionPort,
federation: &dyn FederationFollowPort,
user_id: &UserId,
) -> Result<Vec<RemoteActor>, DomainError> {
federation.get_remote_following(user_id).await
@@ -62,8 +62,8 @@ pub async fn list_remote_following(
pub async fn remove_remote_following(
follows: &dyn FollowRepository,
users: &dyn UserRepository,
federation: &dyn FederationActionPort,
users: &dyn UserReader,
federation: &dyn FederationFollowPort,
events: &dyn EventPublisher,
user_id: &UserId,
handle: &str,
@@ -75,24 +75,20 @@ pub async fn get_remote_actor_posts(
federation: &dyn FederationActionPort,
ap_repo: &dyn ActivityPubRepository,
feed: &dyn FeedRepository,
events: &dyn EventPublisher,
scheduler: &dyn FederationSchedulerPort,
handle: &str,
page: PageParams,
viewer_id: Option<&UserId>,
) -> Result<Paginated<FeedEntry>, DomainError> {
let actor = federation.lookup_actor(handle).await?;
let ap_url = url::Url::parse(&actor.url).map_err(|e| DomainError::Internal(e.to_string()))?;
let author_id = match ap_repo.find_remote_actor_id(&ap_url).await? {
let author_id = match ap_repo.find_remote_actor_id(&actor.url).await? {
Some(id) => id,
None => ap_repo.intern_remote_actor(&ap_url).await?,
None => ap_repo.intern_remote_actor(&actor.url).await?,
};
let result = feed.user_feed(&author_id, &page, viewer_id).await?;
if let Some(outbox_url) = actor.outbox_url {
let _ = events
.publish(&DomainEvent::FetchRemoteActorPosts {
actor_ap_url: actor.url,
outbox_url,
})
let _ = scheduler
.schedule_actor_posts_fetch(&actor.url, &outbox_url)
.await;
}
Ok(result)
@@ -103,7 +99,7 @@ const ACTOR_CONNECTIONS_CACHE_TTL_SECS: i64 = 3600;
pub async fn get_actor_connections_page(
federation: &dyn FederationActionPort,
connections: &dyn RemoteActorConnectionRepository,
events: &dyn EventPublisher,
scheduler: &dyn FederationSchedulerPort,
handle: &str,
connection_type: &str,
page: u32,
@@ -128,13 +124,8 @@ pub async fn get_actor_connections_page(
}
};
if stale {
let _ = events
.publish(&DomainEvent::FetchActorConnections {
actor_ap_url: actor.url,
collection_url,
connection_type: connection_type.to_string(),
page,
})
let _ = scheduler
.schedule_connections_fetch(&actor.url, &collection_url, connection_type, page)
.await;
}
let has_more = items.len() >= PAGE_SIZE;

View File

@@ -4,7 +4,7 @@ use domain::{
feed::{FeedEntry, PageParams, Paginated, UserSummary},
user::User,
},
ports::{FeedRepository, FollowRepository, TagRepository, UserRepository},
ports::{FeedRepository, FollowRepository, TagRepository, UserReader},
value_objects::UserId,
};
@@ -70,12 +70,12 @@ pub async fn search(
feed.search(query, &page, viewer_id).await
}
pub async fn list_users(users: &dyn UserRepository) -> Result<Vec<UserSummary>, DomainError> {
pub async fn list_users(users: &dyn UserReader) -> Result<Vec<UserSummary>, DomainError> {
users.list_with_stats().await
}
pub async fn list_users_paginated(
users: &dyn UserRepository,
users: &dyn UserReader,
page: PageParams,
) -> Result<Paginated<UserSummary>, DomainError> {
let all = users.list_with_stats().await?;

View File

@@ -4,6 +4,5 @@ pub mod federation_management;
pub mod feed;
pub mod notifications;
pub mod profile;
pub mod search;
pub mod social;
pub mod thoughts;

View File

@@ -4,11 +4,11 @@ use domain::{
errors::DomainError,
events::DomainEvent,
models::{top_friend::TopFriend, user::User},
ports::{EventPublisher, TopFriendRepository, UserRepository},
ports::{EventPublisher, TopFriendRepository, UserReader, UserWriter},
value_objects::{UserId, Username},
};
pub async fn get_user(users: &dyn UserRepository, user_id: &UserId) -> Result<User, DomainError> {
pub async fn get_user(users: &dyn UserReader, user_id: &UserId) -> Result<User, DomainError> {
users
.find_by_id(user_id)
.await?
@@ -16,7 +16,7 @@ pub async fn get_user(users: &dyn UserRepository, user_id: &UserId) -> Result<Us
}
pub async fn get_user_by_username(
users: &dyn UserRepository,
users: &dyn UserReader,
username: &str,
) -> Result<User, DomainError> {
let username = Username::new(username).map_err(|_| DomainError::NotFound)?;
@@ -28,7 +28,7 @@ pub async fn get_user_by_username(
/// Resolve a path segment that is either a UUID (AP actor URL) or a username.
pub async fn get_user_by_id_or_username(
users: &dyn UserRepository,
users: &dyn UserReader,
id_or_username: &str,
) -> Result<User, DomainError> {
if let Ok(uuid) = uuid::Uuid::parse_str(id_or_username) {
@@ -43,7 +43,7 @@ pub async fn get_user_by_id_or_username(
#[allow(clippy::too_many_arguments)]
pub async fn update_profile(
users: &dyn UserRepository,
users: &dyn UserWriter,
events: &dyn EventPublisher,
user_id: &UserId,
display_name: Option<String>,

View File

@@ -1,26 +0,0 @@
use domain::{
errors::DomainError,
models::{
feed::{FeedEntry, PageParams, Paginated},
user::User,
},
ports::SearchPort,
value_objects::UserId,
};
pub async fn search_thoughts(
search: &dyn SearchPort,
query: &str,
page: PageParams,
viewer_id: Option<&UserId>,
) -> Result<Paginated<FeedEntry>, DomainError> {
search.search_thoughts(query, &page, viewer_id).await
}
pub async fn search_users(
search: &dyn SearchPort,
query: &str,
page: PageParams,
) -> Result<Paginated<User>, DomainError> {
search.search_users(query, &page).await
}

View File

@@ -4,8 +4,8 @@ use domain::{
events::DomainEvent,
models::social::{Block, Boost, Follow, FollowState, Like},
ports::{
BlockRepository, BoostRepository, EventPublisher, FederationActionPort, FollowRepository,
LikeRepository, UserRepository,
BlockRepository, BoostRepository, EventPublisher, FederationFollowPort, FollowRepository,
LikeRepository, UserReader,
},
value_objects::{BoostId, LikeId, ThoughtId, UserId, Username},
};
@@ -92,8 +92,8 @@ pub async fn unboost_thought(
pub async fn follow_actor(
follows: &dyn FollowRepository,
users: &dyn UserRepository,
federation: &dyn FederationActionPort,
users: &dyn UserReader,
federation: &dyn FederationFollowPort,
events: &dyn EventPublisher,
follower_id: &UserId,
username: &str,
@@ -139,8 +139,8 @@ pub async fn follow_user(
pub async fn unfollow_actor(
follows: &dyn FollowRepository,
users: &dyn UserRepository,
federation: &dyn FederationActionPort,
users: &dyn UserReader,
federation: &dyn FederationFollowPort,
events: &dyn EventPublisher,
follower_id: &UserId,
username: &str,
@@ -212,7 +212,7 @@ pub async fn reject_follow(
pub async fn block_by_username(
blocks: &dyn BlockRepository,
users: &dyn UserRepository,
users: &dyn UserReader,
events: &dyn EventPublisher,
blocker_id: &UserId,
username: &str,
@@ -227,7 +227,7 @@ pub async fn block_by_username(
pub async fn unblock_by_username(
blocks: &dyn BlockRepository,
users: &dyn UserRepository,
users: &dyn UserReader,
events: &dyn EventPublisher,
blocker_id: &UserId,
username: &str,

View File

@@ -2,7 +2,7 @@ use domain::{
errors::DomainError,
events::DomainEvent,
models::thought::{Thought, Visibility},
ports::{EventPublisher, TagRepository, ThoughtRepository, UserRepository},
ports::{EventPublisher, TagRepository, ThoughtRepository, UserReader},
value_objects::{Content, ThoughtId, UserId},
};
@@ -51,7 +51,7 @@ pub struct CreateThoughtOutput {
pub async fn create_thought(
thoughts: &dyn ThoughtRepository,
_users: &dyn UserRepository,
_users: &dyn UserReader,
tags: &dyn TagRepository,
events: &dyn EventPublisher,
input: CreateThoughtInput,

View File

@@ -116,6 +116,7 @@ pub async fn build(cfg: &Config) -> Infrastructure {
federation: ap_service.clone() as Arc<dyn domain::ports::FederationActionPort>,
ap_repo: Arc::new(PgActivityPubRepository::new(pool.clone())),
remote_actor_connections: Arc::new(PgRemoteActorConnectionRepository::new(pool.clone())),
federation_scheduler: ap_service.clone() as Arc<dyn domain::ports::FederationSchedulerPort>,
};
Infrastructure { state, ap_service }

View File

@@ -63,16 +63,6 @@ pub enum DomainEvent {
ProfileUpdated {
user_id: UserId,
},
FetchRemoteActorPosts {
actor_ap_url: String,
outbox_url: String,
},
FetchActorConnections {
actor_ap_url: String,
collection_url: String,
connection_type: String,
page: u32,
},
MentionReceived {
thought_id: ThoughtId,
mentioned_user_id: UserId,

View File

@@ -1,6 +1,30 @@
use crate::models::{thought::Thought, user::User};
use crate::value_objects::UserId;
#[derive(Debug, Clone)]
pub struct EngagementStats {
pub like_count: i64,
pub boost_count: i64,
pub reply_count: i64,
}
/// Present only when an authenticated viewer made the request.
/// `liked`/`boosted` are the viewer's interaction state with this thought.
/// `None` means anonymous request or viewer context unavailable.
#[derive(Debug, Clone)]
pub struct ViewerContext {
pub liked: bool,
pub boosted: bool,
}
#[derive(Debug, Clone)]
pub struct FeedEntry {
pub thought: Thought,
pub author: User,
pub stats: EngagementStats,
pub viewer: Option<ViewerContext>,
}
#[derive(Debug, Clone)]
pub struct UserSummary {
pub id: UserId,
@@ -13,17 +37,6 @@ pub struct UserSummary {
pub following_count: i64,
}
#[derive(Debug, Clone)]
pub struct FeedEntry {
pub thought: Thought,
pub author: User,
pub like_count: i64,
pub boost_count: i64,
pub reply_count: i64,
pub liked_by_viewer: bool,
pub boosted_by_viewer: bool,
}
#[derive(Debug, Clone)]
pub struct PageParams {
pub page: u64,

View File

@@ -1,22 +1,66 @@
use crate::value_objects::{NotificationId, ThoughtId, UserId};
use chrono::{DateTime, Utc};
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum NotificationType {
Like,
Boost,
Follow,
Mention,
Reply,
#[derive(Debug, Clone, PartialEq)]
pub enum NotificationKind {
Like {
thought_id: ThoughtId,
from_user_id: UserId,
},
Boost {
thought_id: ThoughtId,
from_user_id: UserId,
},
Reply {
thought_id: ThoughtId,
from_user_id: UserId,
},
Mention {
thought_id: ThoughtId,
from_user_id: UserId,
},
Follow {
from_user_id: UserId,
},
}
impl NotificationKind {
pub fn from_user_id(&self) -> &UserId {
match self {
Self::Like { from_user_id, .. } => from_user_id,
Self::Boost { from_user_id, .. } => from_user_id,
Self::Reply { from_user_id, .. } => from_user_id,
Self::Mention { from_user_id, .. } => from_user_id,
Self::Follow { from_user_id } => from_user_id,
}
}
pub fn thought_id(&self) -> Option<&ThoughtId> {
match self {
Self::Like { thought_id, .. } => Some(thought_id),
Self::Boost { thought_id, .. } => Some(thought_id),
Self::Reply { thought_id, .. } => Some(thought_id),
Self::Mention { thought_id, .. } => Some(thought_id),
Self::Follow { .. } => None,
}
}
pub fn kind_str(&self) -> &'static str {
match self {
Self::Like { .. } => "like",
Self::Boost { .. } => "boost",
Self::Reply { .. } => "reply",
Self::Mention { .. } => "mention",
Self::Follow { .. } => "follow",
}
}
}
#[derive(Debug, Clone)]
pub struct Notification {
pub id: NotificationId,
pub user_id: UserId,
pub notification_type: NotificationType,
pub from_user_id: Option<UserId>,
pub thought_id: Option<ThoughtId>,
pub kind: NotificationKind,
pub read: bool,
pub created_at: DateTime<Utc>,
}

View File

@@ -5,11 +5,7 @@ pub struct RemoteActor {
pub url: String,
pub handle: String,
pub display_name: Option<String>,
pub inbox_url: String,
pub shared_inbox_url: Option<String>,
pub public_key: String,
pub avatar_url: Option<String>,
pub last_fetched_at: DateTime<Utc>,
pub bio: Option<String>,
pub banner_url: Option<String>,
pub also_known_as: Option<String>,
@@ -17,4 +13,5 @@ pub struct RemoteActor {
pub followers_url: Option<String>,
pub following_url: Option<String>,
pub attachment: Vec<(String, String)>,
pub last_fetched_at: DateTime<Utc>,
}

View File

@@ -35,11 +35,14 @@ impl FollowState {
}
}
pub fn from_db_str(s: &str) -> Self {
pub fn from_db_str(s: &str) -> Result<Self, crate::errors::DomainError> {
match s {
"pending" => Self::Pending,
"rejected" => Self::Rejected,
_ => Self::Accepted,
"pending" => Ok(Self::Pending),
"accepted" => Ok(Self::Accepted),
"rejected" => Ok(Self::Rejected),
other => Err(crate::errors::DomainError::Internal(format!(
"unknown follow_state: '{other}'"
))),
}
}
}

View File

@@ -15,8 +15,6 @@ pub struct Thought {
pub user_id: UserId,
pub content: Content,
pub in_reply_to_id: Option<ThoughtId>,
pub in_reply_to_url: Option<String>,
pub ap_id: Option<String>,
pub visibility: Visibility,
pub content_warning: Option<String>,
pub sensitive: bool,
@@ -35,12 +33,15 @@ impl Visibility {
}
}
pub fn from_db_str(s: &str) -> Self {
pub fn from_db_str(s: &str) -> Result<Self, crate::errors::DomainError> {
match s {
"followers" => Self::Followers,
"unlisted" => Self::Unlisted,
"direct" => Self::Direct,
_ => Self::Public,
"public" => Ok(Self::Public),
"followers" => Ok(Self::Followers),
"unlisted" => Ok(Self::Unlisted),
"direct" => Ok(Self::Direct),
other => Err(crate::errors::DomainError::Internal(format!(
"unknown visibility: '{other}'"
))),
}
}
}
@@ -60,8 +61,6 @@ impl Thought {
user_id,
content,
in_reply_to_id,
in_reply_to_url: None,
ap_id: None,
visibility,
content_warning,
sensitive,

View File

@@ -13,8 +13,6 @@ pub struct User {
pub header_url: Option<String>,
pub custom_css: Option<String>,
pub local: bool,
pub ap_id: Option<String>,
pub inbox_url: Option<String>,
pub created_at: DateTime<Utc>,
pub updated_at: DateTime<Utc>,
}
@@ -38,8 +36,6 @@ impl User {
header_url: None,
custom_css: None,
local: true,
ap_id: None,
inbox_url: None,
created_at: now,
updated_at: now,
}

View File

@@ -45,10 +45,16 @@ pub trait EventConsumer: Send + Sync {
}
#[async_trait]
pub trait UserRepository: Send + Sync {
pub trait UserReader: Send + Sync {
async fn find_by_id(&self, id: &UserId) -> Result<Option<User>, DomainError>;
async fn find_by_username(&self, username: &Username) -> Result<Option<User>, DomainError>;
async fn find_by_email(&self, email: &Email) -> Result<Option<User>, DomainError>;
async fn list_with_stats(&self) -> Result<Vec<UserSummary>, DomainError>;
async fn count(&self) -> Result<i64, DomainError>;
}
#[async_trait]
pub trait UserWriter: Send + Sync {
async fn save(&self, user: &User) -> Result<(), DomainError>;
async fn update_profile(
&self,
@@ -59,10 +65,13 @@ pub trait UserRepository: Send + Sync {
header_url: Option<String>,
custom_css: Option<String>,
) -> Result<(), DomainError>;
async fn list_with_stats(&self) -> Result<Vec<UserSummary>, DomainError>;
async fn count(&self) -> Result<i64, DomainError>;
}
/// Combined supertrait — `AppState.users` stays `Arc<dyn UserRepository>`.
/// Blanket impl: any type implementing both sub-traits gets `UserRepository` for free.
pub trait UserRepository: UserReader + UserWriter {}
impl<T: UserReader + UserWriter> UserRepository for T {}
#[async_trait]
pub trait ThoughtRepository: Send + Sync {
async fn save(&self, thought: &Thought) -> Result<(), DomainError>;
@@ -221,14 +230,35 @@ pub trait RemoteActorConnectionRepository: Send + Sync {
}
#[async_trait]
pub trait FederationActionPort: Send + Sync {
pub trait FederationLookupPort: Send + Sync {
async fn lookup_actor(&self, handle: &str) -> Result<RemoteActor, DomainError>;
async fn actor_json(&self, user_id: &UserId) -> Result<String, DomainError>;
async fn followers_collection_json(
&self,
user_id: &UserId,
page: Option<u32>,
) -> Result<String, DomainError>;
async fn following_collection_json(
&self,
user_id: &UserId,
page: Option<u32>,
) -> Result<String, DomainError>;
}
#[async_trait]
pub trait FederationFollowPort: Send + Sync {
async fn follow_remote(&self, local_user_id: &UserId, handle: &str) -> Result<(), DomainError>;
async fn unfollow_remote(
&self,
local_user_id: &UserId,
handle: &str,
) -> Result<(), DomainError>;
async fn get_remote_following(&self, user_id: &UserId)
-> Result<Vec<RemoteActor>, DomainError>;
}
#[async_trait]
pub trait FederationFollowRequestPort: Send + Sync {
async fn get_pending_followers(
&self,
user_id: &UserId,
@@ -250,36 +280,38 @@ pub trait FederationActionPort: Send + Sync {
user_id: &UserId,
actor_url: &str,
) -> Result<(), DomainError>;
async fn get_remote_following(&self, user_id: &UserId)
-> Result<Vec<RemoteActor>, DomainError>;
async fn actor_json(&self, user_id: &UserId) -> Result<String, DomainError>;
async fn followers_collection_json(
&self,
user_id: &UserId,
page: Option<u32>,
) -> Result<String, DomainError>;
async fn following_collection_json(
&self,
user_id: &UserId,
page: Option<u32>,
) -> Result<String, DomainError>;
}
#[async_trait]
pub trait FederationFetchPort: Send + Sync {
async fn fetch_outbox_page(
&self,
outbox_url: &str,
page: u32,
) -> Result<Vec<crate::models::remote_note::RemoteNote>, DomainError>;
async fn fetch_actor_urls_from_collection(
&self,
collection_url: &str,
) -> Result<Vec<String>, DomainError>;
async fn resolve_actor_profiles(
&self,
urls: Vec<String>,
) -> Vec<crate::models::actor_connection_summary::ActorConnectionSummary>;
}
pub trait FederationActionPort:
FederationLookupPort + FederationFollowPort + FederationFollowRequestPort + FederationFetchPort
{
}
impl<
T: FederationLookupPort
+ FederationFollowPort
+ FederationFollowRequestPort
+ FederationFetchPort,
> FederationActionPort for T
{
}
#[async_trait]
pub trait FeedRepository: Send + Sync {
async fn home_feed(
@@ -331,6 +363,13 @@ pub trait SearchPort: Send + Sync {
) -> Result<Paginated<User>, DomainError>;
}
/// AP-protocol endpoints for a locally-stored user (local or interned remote).
#[derive(Debug, Clone)]
pub struct ActorApUrls {
pub ap_id: String,
pub inbox_url: String,
}
/// A local thought ready for AP serialization, with the author's username
/// pre-joined so the handler can build AP URLs without a second query.
#[derive(Debug, Clone)]
@@ -362,14 +401,12 @@ pub trait ActivityPubRepository: Send + Sync {
// ── Remote actor resolution ──────────────────────────────────────
/// Find the local UserId for a remote actor by its AP URL.
async fn find_remote_actor_id(
&self,
actor_ap_url: &url::Url,
) -> Result<Option<UserId>, DomainError>;
async fn find_remote_actor_id(&self, actor_ap_url: &str)
-> Result<Option<UserId>, DomainError>;
/// Ensure a remote actor placeholder exists; create one if absent.
/// Idempotent — safe to call multiple times with the same URL.
async fn intern_remote_actor(&self, actor_ap_url: &url::Url) -> Result<UserId, DomainError>;
async fn intern_remote_actor(&self, actor_ap_url: &str) -> Result<UserId, DomainError>;
/// Update display_name and avatar_url for an already-interned remote actor.
async fn update_remote_actor_display(
@@ -385,34 +422,42 @@ pub trait ActivityPubRepository: Send + Sync {
#[allow(clippy::too_many_arguments)]
async fn accept_note(
&self,
ap_id: &url::Url,
ap_id: &str,
author_id: &UserId,
content: &str,
published: chrono::DateTime<chrono::Utc>,
sensitive: bool,
content_warning: Option<String>,
visibility: &str,
in_reply_to: Option<&url::Url>,
in_reply_to: Option<&str>,
) -> Result<(), DomainError>;
/// Apply an Update to a previously accepted remote Note.
async fn apply_note_update(
&self,
ap_id: &url::Url,
new_content: &str,
) -> Result<(), DomainError>;
async fn apply_note_update(&self, ap_id: &str, new_content: &str) -> Result<(), DomainError>;
/// Remove a specific remote Note (Delete activity). Only touches
/// remotely-originated thoughts.
async fn retract_note(&self, ap_id: &url::Url) -> Result<(), DomainError>;
async fn retract_note(&self, ap_id: &str) -> Result<(), DomainError>;
/// Remove all Notes from a remote actor (actor-level Delete/Tombstone).
async fn retract_actor_notes(&self, actor_ap_url: &url::Url) -> Result<(), DomainError>;
async fn retract_actor_notes(&self, actor_ap_url: &str) -> Result<(), DomainError>;
// ── Node-level stats ─────────────────────────────────────────────
/// Total locally-authored thought count for NodeInfo responses.
async fn count_local_notes(&self) -> Result<u64, DomainError>;
/// Return the ActivityPub object URL for a thought, if one is stored.
/// Returns None for local thoughts (caller constructs URL from base_url + thought_id).
async fn get_thought_ap_id(
&self,
thought_id: &ThoughtId,
) -> Result<Option<String>, DomainError>;
/// Return the AP actor URL and inbox URL for a user, if stored.
/// Returns None for users that have not been federated.
async fn get_actor_ap_urls(&self, user_id: &UserId)
-> Result<Option<ActorApUrls>, DomainError>;
}
#[async_trait]
@@ -423,6 +468,7 @@ pub trait OutboundFederationPort: Send + Sync {
author_user_id: &UserId,
thought: &Thought,
author_username: &str,
in_reply_to_url: Option<&str>,
) -> Result<(), DomainError>;
/// Fan out a Delete tombstone for a now-deleted local Note.
@@ -440,6 +486,7 @@ pub trait OutboundFederationPort: Send + Sync {
author_user_id: &UserId,
thought: &Thought,
author_username: &str,
in_reply_to_url: Option<&str>,
) -> Result<(), DomainError>;
/// Fan out an Announce(object_ap_id) for a boost.
@@ -476,3 +523,20 @@ pub trait OutboundFederationPort: Send + Sync {
/// Fan out an Update(Actor) to all accepted followers after a profile change.
async fn broadcast_actor_update(&self, user_id: &UserId) -> Result<(), DomainError>;
}
#[async_trait]
pub trait FederationSchedulerPort: Send + Sync {
async fn schedule_actor_posts_fetch(
&self,
actor_ap_url: &str,
outbox_url: &str,
) -> Result<(), DomainError>;
async fn schedule_connections_fetch(
&self,
actor_ap_url: &str,
collection_url: &str,
connection_type: &str,
page: u32,
) -> Result<(), DomainError>;
}

View File

@@ -19,8 +19,8 @@ use crate::{
};
use async_trait::async_trait;
use chrono::Utc;
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use url;
#[derive(Default, Clone)]
pub struct TestStore {
@@ -35,10 +35,16 @@ pub struct TestStore {
pub top_friends: Arc<Mutex<Vec<TopFriend>>>,
pub notifications: Arc<Mutex<Vec<Notification>>>,
pub events: Arc<Mutex<Vec<DomainEvent>>>,
/// AP URL → UserId for remote actors (used by find_remote_actor_id / intern_remote_actor)
pub actor_ap_ids: Arc<Mutex<HashMap<String, UserId>>>,
/// ThoughtId → AP object URL (used by get_thought_ap_id)
pub thought_ap_ids: Arc<Mutex<HashMap<ThoughtId, String>>>,
/// UserId → ActorApUrls (used by get_actor_ap_urls)
pub actor_ap_urls: Arc<Mutex<HashMap<UserId, ActorApUrls>>>,
}
#[async_trait]
impl UserRepository for TestStore {
impl UserReader for TestStore {
async fn find_by_id(&self, id: &UserId) -> Result<Option<User>, DomainError> {
Ok(self
.users
@@ -66,6 +72,22 @@ impl UserRepository for TestStore {
.find(|u| u.email.as_str() == email.as_str())
.cloned())
}
async fn list_with_stats(&self) -> Result<Vec<UserSummary>, DomainError> {
Ok(vec![])
}
async fn count(&self) -> Result<i64, DomainError> {
Ok(self
.users
.lock()
.unwrap()
.iter()
.filter(|u| u.local)
.count() as i64)
}
}
#[async_trait]
impl UserWriter for TestStore {
async fn save(&self, user: &User) -> Result<(), DomainError> {
let mut g = self.users.lock().unwrap();
g.retain(|u| u.id != user.id);
@@ -96,18 +118,6 @@ impl UserRepository for TestStore {
}
Ok(())
}
async fn list_with_stats(&self) -> Result<Vec<UserSummary>, DomainError> {
Ok(vec![])
}
async fn count(&self) -> Result<i64, DomainError> {
Ok(self
.users
.lock()
.unwrap()
.iter()
.filter(|u| u.local)
.count() as i64)
}
}
#[async_trait]
@@ -544,11 +554,34 @@ impl RemoteActorRepository for TestStore {
}
#[async_trait]
impl FederationActionPort for TestStore {
impl FederationLookupPort for TestStore {
async fn lookup_actor(&self, _handle: &str) -> Result<RemoteActor, DomainError> {
Err(DomainError::NotFound)
}
async fn actor_json(&self, _user_id: &UserId) -> Result<String, DomainError> {
Err(DomainError::NotFound)
}
async fn followers_collection_json(
&self,
_user_id: &UserId,
_page: Option<u32>,
) -> Result<String, DomainError> {
Err(DomainError::NotFound)
}
async fn following_collection_json(
&self,
_user_id: &UserId,
_page: Option<u32>,
) -> Result<String, DomainError> {
Err(DomainError::NotFound)
}
}
#[async_trait]
impl FederationFollowPort for TestStore {
async fn follow_remote(
&self,
_local_user_id: &UserId,
@@ -565,6 +598,16 @@ impl FederationActionPort for TestStore {
Ok(())
}
async fn get_remote_following(
&self,
_user_id: &UserId,
) -> Result<Vec<RemoteActor>, DomainError> {
Ok(vec![])
}
}
#[async_trait]
impl FederationFollowRequestPort for TestStore {
async fn get_pending_followers(
&self,
_user_id: &UserId,
@@ -602,34 +645,10 @@ impl FederationActionPort for TestStore {
) -> Result<(), DomainError> {
Ok(())
}
}
async fn get_remote_following(
&self,
_user_id: &UserId,
) -> Result<Vec<RemoteActor>, DomainError> {
Ok(vec![])
}
async fn actor_json(&self, _user_id: &UserId) -> Result<String, DomainError> {
Err(DomainError::NotFound)
}
async fn followers_collection_json(
&self,
_user_id: &UserId,
_page: Option<u32>,
) -> Result<String, DomainError> {
Err(DomainError::NotFound)
}
async fn following_collection_json(
&self,
_user_id: &UserId,
_page: Option<u32>,
) -> Result<String, DomainError> {
Err(DomainError::NotFound)
}
#[async_trait]
impl FederationFetchPort for TestStore {
async fn fetch_outbox_page(
&self,
_outbox_url: &str,
@@ -800,26 +819,18 @@ impl ActivityPubRepository for TestStore {
}
async fn find_remote_actor_id(
&self,
actor_ap_url: &url::Url,
actor_ap_url: &str,
) -> Result<Option<UserId>, DomainError> {
let url = actor_ap_url.to_string();
Ok(self
.users
.lock()
.unwrap()
.iter()
.find(|u| u.ap_id.as_deref() == Some(&url))
.map(|u| u.id.clone()))
Ok(self.actor_ap_ids.lock().unwrap().get(actor_ap_url).cloned())
}
async fn intern_remote_actor(&self, actor_ap_url: &url::Url) -> Result<UserId, DomainError> {
async fn intern_remote_actor(&self, actor_ap_url: &str) -> Result<UserId, DomainError> {
if let Some(uid) = self.find_remote_actor_id(actor_ap_url).await? {
return Ok(uid);
}
let uid = UserId::new();
let handle = actor_ap_url
.path()
.trim_start_matches('/')
.replace('/', "_");
let handle = url::Url::parse(actor_ap_url)
.map(|u| u.path().trim_start_matches('/').replace('/', "_"))
.unwrap_or_else(|_| format!("remote_{}", &uid.to_string()[..8]));
let user = crate::models::user::User {
id: uid.clone(),
username: Username::from_trusted(handle.clone()),
@@ -831,12 +842,14 @@ impl ActivityPubRepository for TestStore {
header_url: None,
custom_css: None,
local: false,
ap_id: Some(actor_ap_url.to_string()),
inbox_url: None,
created_at: chrono::Utc::now(),
updated_at: chrono::Utc::now(),
};
self.users.lock().unwrap().push(user);
self.actor_ap_ids
.lock()
.unwrap()
.insert(actor_ap_url.to_string(), uid.clone());
Ok(uid)
}
async fn update_remote_actor_display(
@@ -849,28 +862,24 @@ impl ActivityPubRepository for TestStore {
}
async fn accept_note(
&self,
_ap_id: &url::Url,
_ap_id: &str,
_author_id: &UserId,
_content: &str,
_published: chrono::DateTime<chrono::Utc>,
_sensitive: bool,
_content_warning: Option<String>,
_visibility: &str,
_in_reply_to: Option<&url::Url>,
_in_reply_to: Option<&str>,
) -> Result<(), DomainError> {
Ok(())
}
async fn apply_note_update(
&self,
_ap_id: &url::Url,
_new_content: &str,
) -> Result<(), DomainError> {
async fn apply_note_update(&self, _ap_id: &str, _new_content: &str) -> Result<(), DomainError> {
Ok(())
}
async fn retract_note(&self, _ap_id: &url::Url) -> Result<(), DomainError> {
async fn retract_note(&self, _ap_id: &str) -> Result<(), DomainError> {
Ok(())
}
async fn retract_actor_notes(&self, _actor_ap_url: &url::Url) -> Result<(), DomainError> {
async fn retract_actor_notes(&self, _actor_ap_url: &str) -> Result<(), DomainError> {
Ok(())
}
async fn count_local_notes(&self) -> Result<u64, DomainError> {
@@ -882,6 +891,34 @@ impl ActivityPubRepository for TestStore {
.filter(|t| t.local)
.count() as u64)
}
async fn get_thought_ap_id(
&self,
thought_id: &ThoughtId,
) -> Result<Option<String>, DomainError> {
Ok(self.thought_ap_ids.lock().unwrap().get(thought_id).cloned())
}
async fn get_actor_ap_urls(
&self,
user_id: &UserId,
) -> Result<Option<ActorApUrls>, DomainError> {
Ok(self.actor_ap_urls.lock().unwrap().get(user_id).cloned())
}
}
#[async_trait]
impl FederationSchedulerPort for TestStore {
async fn schedule_actor_posts_fetch(&self, _: &str, _: &str) -> Result<(), DomainError> {
Ok(())
}
async fn schedule_connections_fetch(
&self,
_: &str,
_: &str,
_: &str,
_: u32,
) -> Result<(), DomainError> {
Ok(())
}
}
#[async_trait]
@@ -918,9 +955,9 @@ mod ap_repo_tests {
#[tokio::test]
async fn test_store_intern_creates_placeholder() {
let store = TestStore::default();
let url = url::Url::parse("https://example.com/users/alice").unwrap();
let id1 = store.intern_remote_actor(&url).await.unwrap();
let id2 = store.intern_remote_actor(&url).await.unwrap();
let url = "https://example.com/users/alice";
let id1 = store.intern_remote_actor(url).await.unwrap();
let id2 = store.intern_remote_actor(url).await.unwrap();
assert_eq!(id1, id2, "intern must be idempotent");
}
}

View File

@@ -29,7 +29,7 @@ pub async fn remote_actor_posts_handler(
&*s.federation,
&*s.ap_repo,
&*s.feed,
&*s.events,
&*s.federation_scheduler,
&handle,
page,
viewer.as_ref(),
@@ -68,7 +68,7 @@ async fn actor_connections_handler(
let (items, has_more) = get_actor_connections_page(
&*s.federation,
&*s.remote_actor_connections,
&*s.events,
&*s.federation_scheduler,
&handle,
connection_type,
page,

View File

@@ -11,7 +11,6 @@ use application::use_cases::feed::{
get_popular_tags as uc_get_popular_tags, get_public_feed, get_user_feed,
};
use application::use_cases::profile::{get_user_by_id_or_username, get_user_by_username};
use application::use_cases::search::{search_thoughts, search_users};
use axum::{
extract::{Path, Query, State},
http::{header, HeaderMap},
@@ -26,15 +25,15 @@ pub fn to_thought_response(e: &domain::models::feed::FeedEntry) -> ThoughtRespon
content: e.thought.content.as_str().to_string(),
author: to_user_response(&e.author),
in_reply_to_id: e.thought.in_reply_to_id.as_ref().map(|id| id.as_uuid()),
in_reply_to_url: e.thought.in_reply_to_url.clone(),
in_reply_to_url: None,
visibility: e.thought.visibility.as_str().to_string(),
content_warning: e.thought.content_warning.clone(),
sensitive: e.thought.sensitive,
like_count: e.like_count,
boost_count: e.boost_count,
reply_count: e.reply_count,
liked_by_viewer: e.liked_by_viewer,
boosted_by_viewer: e.boosted_by_viewer,
like_count: e.stats.like_count,
boost_count: e.stats.boost_count,
reply_count: e.stats.reply_count,
liked_by_viewer: e.viewer.as_ref().map(|v| v.liked).unwrap_or(false),
boosted_by_viewer: e.viewer.as_ref().map(|v| v.boosted).unwrap_or(false),
created_at: e.thought.created_at,
updated_at: e.thought.updated_at,
}
@@ -104,39 +103,14 @@ pub async fn search_handler(
let query = q.q.trim().to_string();
let (thoughts_result, users_result) = tokio::join!(
search_thoughts(
&*s.search,
&query,
PageParams {
page: page.page,
per_page: page.per_page
},
viewer.as_ref()
),
search_users(
&*s.search,
&query,
PageParams {
page: page.page,
per_page: page.per_page
}
),
s.search.search_thoughts(&query, &page, viewer.as_ref()),
s.search.search_users(&query, &page),
);
let thoughts = thoughts_result?
.items
.into_iter()
.map(|e| {
serde_json::json!({
"id": e.thought.id.as_uuid(),
"content": e.thought.content.as_str(),
"author": to_user_response(&e.author),
"like_count": e.like_count,
"boost_count": e.boost_count,
"reply_count": e.reply_count,
"created_at": e.thought.created_at,
})
})
.iter()
.map(to_thought_response)
.collect::<Vec<_>>();
let users = users_result?

View File

@@ -12,7 +12,6 @@ use application::use_cases::feed::list_users_paginated;
use application::use_cases::profile::{
get_user as fetch_user, get_user_by_id_or_username, update_profile,
};
use application::use_cases::search::search_users;
use axum::{
extract::{Path, Query, State},
http::{header, HeaderMap},
@@ -135,7 +134,7 @@ pub async fn get_users(
let page_params = PageParams { page, per_page };
if let Some(q) = params.get("q").filter(|q| !q.trim().is_empty()) {
let result = search_users(&*s.search, q, page_params).await?;
let result = s.search.search_users(q, &page_params).await?;
let users: Vec<_> = result
.items
.iter()

View File

@@ -22,4 +22,5 @@ pub struct AppState {
pub federation: Arc<dyn FederationActionPort>,
pub ap_repo: Arc<dyn ActivityPubRepository>,
pub remote_actor_connections: Arc<dyn RemoteActorConnectionRepository>,
pub federation_scheduler: Arc<dyn FederationSchedulerPort>,
}

View File

@@ -51,5 +51,6 @@ pub fn make_state() -> AppState {
federation: store.clone(),
ap_repo: store.clone(),
remote_actor_connections: store.clone(),
federation_scheduler: store.clone(),
}
}

View File

@@ -4,9 +4,8 @@ use std::sync::Arc;
use activitypub::ThoughtsObjectHandler;
use activitypub_base::ActivityPubService;
use application::services::{FederationEventService, NotificationEventService};
use domain::ports::{ActivityPubRepository, FederationActionPort, OutboundFederationPort};
use domain::ports::{ActivityPubRepository, OutboundFederationPort};
use postgres::activitypub::PgActivityPubRepository;
use postgres::remote_actor_connections::PgRemoteActorConnectionRepository;
use postgres_federation::{PostgresApUserRepository, PostgresFederationRepository};
use crate::handlers::{FederationHandler, NotificationHandler};
@@ -58,11 +57,8 @@ pub async fn build(
.expect("ActivityPubService build failed"),
);
let ap_outbound = ap_service.clone() as Arc<dyn OutboundFederationPort>;
let ap_federation = ap_service.clone() as Arc<dyn FederationActionPort>;
let ap_repo_worker =
Arc::new(PgActivityPubRepository::new(pool.clone())) as Arc<dyn ActivityPubRepository>;
let actor_connections = Arc::new(PgRemoteActorConnectionRepository::new(pool.clone()))
as Arc<dyn domain::ports::RemoteActorConnectionRepository>;
// Application services
let notification_svc = Arc::new(NotificationEventService {
@@ -74,9 +70,7 @@ pub async fn build(
users,
ap: ap_outbound,
base_url: base_url.to_string(),
federation_action: ap_federation,
ap_repo: ap_repo_worker,
remote_actor_connections: actor_connections,
});
// Thin handlers

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,492 @@
# FeedEntry Decoupling Implementation Plan
> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.
**Goal:** Replace the flat `liked_by_viewer`/`boosted_by_viewer` booleans and inline stats fields on `FeedEntry` with two named sub-structs (`EngagementStats`, `Option<ViewerContext>`), and fix the search adapter to compute real viewer context instead of hardcoding `false`.
**Architecture:** Three sequential tasks. Task 1 changes the domain model, which breaks compilation. Task 2 fixes all downstream construction sites and restores compilation. Task 3 adds the functional improvement — viewer-aware SQL in the search adapter.
**Tech Stack:** Rust, SQLx, Postgres trigram search (`pg_trgm`).
---
### Task 1: Add `EngagementStats` and `ViewerContext` to the domain model
**Files:**
- Modify: `crates/domain/src/models/feed.rs`
- [ ] **Step 1: Replace the flat fields on `FeedEntry` with two named sub-structs**
Replace the entire contents of `crates/domain/src/models/feed.rs` with:
```rust
use crate::models::{thought::Thought, user::User};
use crate::value_objects::UserId;
#[derive(Debug, Clone)]
pub struct EngagementStats {
pub like_count: i64,
pub boost_count: i64,
pub reply_count: i64,
}
/// Present only when an authenticated viewer made the request.
/// `liked`/`boosted` are the viewer's interaction state with this thought.
/// `None` means anonymous request or viewer context unavailable.
#[derive(Debug, Clone)]
pub struct ViewerContext {
pub liked: bool,
pub boosted: bool,
}
#[derive(Debug, Clone)]
pub struct FeedEntry {
pub thought: Thought,
pub author: User,
pub stats: EngagementStats,
pub viewer: Option<ViewerContext>,
}
#[derive(Debug, Clone)]
pub struct UserSummary {
pub id: UserId,
pub username: String,
pub display_name: Option<String>,
pub avatar_url: Option<String>,
pub bio: Option<String>,
pub thought_count: i64,
pub follower_count: i64,
pub following_count: i64,
}
#[derive(Debug, Clone)]
pub struct PageParams {
pub page: u64,
pub per_page: u64,
}
impl PageParams {
pub fn offset(&self) -> i64 {
((self.page.saturating_sub(1)) * self.per_page) as i64
}
pub fn limit(&self) -> i64 {
self.per_page as i64
}
}
#[derive(Debug, Clone)]
pub struct Paginated<T> {
pub items: Vec<T>,
pub total: i64,
pub page: u64,
pub per_page: u64,
}
```
- [ ] **Step 2: Verify the domain crate compiles (other crates will break)**
```bash
cargo check -p domain 2>&1 | head -10
```
Expected: `domain` compiles clean. Other crates (`postgres`, `postgres-search`, `presentation`) will show errors referencing the removed fields — that is expected and will be fixed in Task 2.
- [ ] **Step 3: Commit the domain model change**
```bash
git add crates/domain/src/models/feed.rs
git commit -m "refactor(domain): FeedEntry — EngagementStats + Option<ViewerContext> sub-structs"
```
---
### Task 2: Fix downstream compilation — adapters and handler
**Files:**
- Modify: `crates/adapters/postgres/src/feed.rs` (line 136 — `row_to_entry`)
- Modify: `crates/adapters/postgres-search/src/lib.rs` (line 97 — `row_to_entry`)
- Modify: `crates/presentation/src/handlers/feed.rs` (line 22 — `to_thought_response`)
- [ ] **Step 1: Update `row_to_entry` in `postgres/src/feed.rs`**
Find `row_to_entry` in `crates/adapters/postgres/src/feed.rs` (around line 109). Replace the `Ok(FeedEntry { ... })` block (currently lines 136144) with:
```rust
Ok(FeedEntry {
thought,
author,
stats: domain::models::feed::EngagementStats {
like_count: r.like_count,
boost_count: r.boost_count,
reply_count: r.reply_count,
},
viewer: Some(domain::models::feed::ViewerContext {
liked: r.liked_by_viewer,
boosted: r.boosted_by_viewer,
}),
})
```
Note: `postgres/src/feed.rs` already builds `viewer = Some(...)` unconditionally here because its `feed_select(viewer)` function always produces `liked_by_viewer`/`boosted_by_viewer` columns — `false AS liked_by_viewer` when there is no viewer, and the real EXISTS result when there is one. The `Option<ViewerContext>` distinction (`None` = anonymous) is handled by the caller's knowledge of whether a viewer was passed. To preserve the `None`-when-no-viewer semantic, read how `viewer` is passed into the calling functions and thread it through.
Actually, the correct fix: the `row_to_entry` function doesn't know if a viewer was passed. Pass the viewer `Option<uuid::Uuid>` as a parameter so it can decide:
Replace the signature of `row_to_entry`:
```rust
fn row_to_entry(r: FeedRow, viewer: Option<uuid::Uuid>) -> Result<FeedEntry, DomainError> {
```
And change the construction:
```rust
Ok(FeedEntry {
thought,
author,
stats: domain::models::feed::EngagementStats {
like_count: r.like_count,
boost_count: r.boost_count,
reply_count: r.reply_count,
},
viewer: viewer.map(|_| domain::models::feed::ViewerContext {
liked: r.liked_by_viewer,
boosted: r.boosted_by_viewer,
}),
})
```
Then update all call sites of `row_to_entry` inside `postgres/src/feed.rs`. Each `FeedRepository` method already has a `viewer` variable of type `Option<uuid::Uuid>`. Pass it through:
```rust
// Before:
.map(row_to_entry)
.collect::<Result<Vec<_>, _>>()?
// After:
.map(|r| row_to_entry(r, viewer))
.collect::<Result<Vec<_>, _>>()?
```
Read `crates/adapters/postgres/src/feed.rs` to find all five `impl FeedRepository` methods and update each `.map(row_to_entry)` call.
- [ ] **Step 2: Update `row_to_entry` in `postgres-search/src/lib.rs`**
In `crates/adapters/postgres-search/src/lib.rs`, find `row_to_entry` (line 70). Change the `Ok(FeedEntry { ... })` block (lines 97105) to:
```rust
Ok(FeedEntry {
thought,
author,
stats: domain::models::feed::EngagementStats {
like_count: r.like_count,
boost_count: r.boost_count,
reply_count: r.reply_count,
},
viewer: None, // Task 3 will fix this to use real viewer data
})
```
Add `EngagementStats` and `ViewerContext` to the domain import at the top if needed (they're in `domain::models::feed`). The existing import already pulls in `FeedEntry` from that module.
- [ ] **Step 3: Update `to_thought_response` in `presentation/src/handlers/feed.rs`**
Find `to_thought_response` (line 22 in `crates/presentation/src/handlers/feed.rs`). Update it to read from the new sub-structs:
```rust
pub fn to_thought_response(e: &domain::models::feed::FeedEntry) -> ThoughtResponse {
ThoughtResponse {
id: e.thought.id.as_uuid(),
content: e.thought.content.as_str().to_string(),
author: to_user_response(&e.author),
in_reply_to_id: e.thought.in_reply_to_id.as_ref().map(|id| id.as_uuid()),
in_reply_to_url: None,
visibility: e.thought.visibility.as_str().to_string(),
content_warning: e.thought.content_warning.clone(),
sensitive: e.thought.sensitive,
like_count: e.stats.like_count,
boost_count: e.stats.boost_count,
reply_count: e.stats.reply_count,
liked_by_viewer: e.viewer.as_ref().map(|v| v.liked).unwrap_or(false),
boosted_by_viewer: e.viewer.as_ref().map(|v| v.boosted).unwrap_or(false),
created_at: e.thought.created_at,
updated_at: e.thought.updated_at,
}
}
```
`ThoughtResponse` in `api-types/src/responses.rs` keeps `liked_by_viewer: bool` and `boosted_by_viewer: bool` — the wire format is unchanged.
- [ ] **Step 4: Compile check — full workspace must be clean**
```bash
cargo check --workspace 2>&1 | head -20
```
Expected: 0 errors. Fix any remaining references to the old flat fields (`e.like_count`, `e.liked_by_viewer`, etc.) — they must become `e.stats.like_count`, `e.viewer.as_ref().map(|v| v.liked).unwrap_or(false)`.
- [ ] **Step 5: Commit**
```bash
git add crates/adapters/postgres/src/feed.rs \
crates/adapters/postgres-search/src/lib.rs \
crates/presentation/src/handlers/feed.rs
git commit -m "refactor(adapters): update FeedEntry construction to use EngagementStats + ViewerContext"
```
---
### Task 3: Fix search adapter — real viewer context instead of hardcoded `false`
**Files:**
- Modify: `crates/adapters/postgres-search/src/lib.rs`
The `SearchPort::search_thoughts` signature already takes `viewer_id: Option<&UserId>` (the parameter is named `_viewer_id` because it was ignored). This task makes it real.
- [ ] **Step 1: Add `liked_by_viewer` and `boosted_by_viewer` to `FeedRow`**
In `crates/adapters/postgres-search/src/lib.rs`, find the `FeedRow` struct (line 27). Add two fields at the end:
```rust
#[derive(sqlx::FromRow)]
struct FeedRow {
thought_id: uuid::Uuid,
t_user_id: uuid::Uuid,
content: String,
in_reply_to_id: Option<uuid::Uuid>,
visibility: String,
content_warning: Option<String>,
sensitive: bool,
t_local: bool,
thought_created_at: DateTime<Utc>,
updated_at: Option<DateTime<Utc>>,
author_id: uuid::Uuid,
username: String,
email: String,
password_hash: String,
display_name: Option<String>,
bio: Option<String>,
avatar_url: Option<String>,
header_url: Option<String>,
custom_css: Option<String>,
author_local: bool,
author_created_at: DateTime<Utc>,
author_updated_at: DateTime<Utc>,
like_count: i64,
boost_count: i64,
reply_count: i64,
liked_by_viewer: bool, // NEW
boosted_by_viewer: bool, // NEW
}
```
- [ ] **Step 2: Replace `FEED_SELECT` constant with a `feed_select(viewer)` function**
Delete the `const FEED_SELECT` and replace with a function that injects viewer-aware columns — identical pattern to `postgres/src/feed.rs`:
```rust
fn feed_select(viewer: Option<uuid::Uuid>) -> String {
let viewer_checks = match viewer {
Some(uid) => format!(
"EXISTS(SELECT 1 FROM likes WHERE user_id='{uid}' AND thought_id=t.id) AS liked_by_viewer,
EXISTS(SELECT 1 FROM boosts WHERE user_id='{uid}' AND thought_id=t.id) AS boosted_by_viewer"
),
None => "false AS liked_by_viewer, false AS boosted_by_viewer".to_string(),
};
format!(
"
SELECT
t.id AS thought_id, t.user_id AS t_user_id, t.content,
t.in_reply_to_id,
t.visibility, t.content_warning, t.sensitive, t.local AS t_local,
t.created_at AS thought_created_at, t.updated_at,
u.id AS author_id, u.username, u.email, u.password_hash,
u.display_name, u.bio, u.avatar_url, u.header_url, u.custom_css,
u.local AS author_local,
u.created_at AS author_created_at, u.updated_at AS author_updated_at,
(SELECT COUNT(*) FROM likes l WHERE l.thought_id=t.id) AS like_count,
(SELECT COUNT(*) FROM boosts b WHERE b.thought_id=t.id) AS boost_count,
(SELECT COUNT(*) FROM thoughts r WHERE r.in_reply_to_id=t.id) AS reply_count,
{viewer_checks}
FROM thoughts t JOIN users u ON u.id=t.user_id"
)
}
```
- [ ] **Step 3: Update `row_to_entry` to use viewer fields**
Update `row_to_entry` to accept `viewer: Option<uuid::Uuid>` and build the `ViewerContext`:
```rust
fn row_to_entry(r: FeedRow, viewer: Option<uuid::Uuid>) -> Result<FeedEntry, DomainError> {
let thought = Thought {
id: ThoughtId::from_uuid(r.thought_id),
user_id: UserId::from_uuid(r.t_user_id),
content: Content::new_remote(r.content),
in_reply_to_id: r.in_reply_to_id.map(ThoughtId::from_uuid),
visibility: Visibility::from_db_str(&r.visibility)?,
content_warning: r.content_warning,
sensitive: r.sensitive,
local: r.t_local,
created_at: r.thought_created_at,
updated_at: r.updated_at,
};
let author = User {
id: UserId::from_uuid(r.author_id),
username: Username::from_trusted(r.username),
email: Email::from_trusted(r.email),
password_hash: PasswordHash(r.password_hash),
display_name: r.display_name,
bio: r.bio,
avatar_url: r.avatar_url,
header_url: r.header_url,
custom_css: r.custom_css,
local: r.author_local,
created_at: r.author_created_at,
updated_at: r.author_updated_at,
};
Ok(FeedEntry {
thought,
author,
stats: domain::models::feed::EngagementStats {
like_count: r.like_count,
boost_count: r.boost_count,
reply_count: r.reply_count,
},
viewer: viewer.map(|_| domain::models::feed::ViewerContext {
liked: r.liked_by_viewer,
boosted: r.boosted_by_viewer,
}),
})
}
```
- [ ] **Step 4: Update `search_thoughts` to use viewer_id**
Find `search_thoughts` in `crates/adapters/postgres-search/src/lib.rs` (line 110). Rename `_viewer_id``viewer_id`, extract the viewer UUID, and thread it through `feed_select` and `row_to_entry`:
```rust
async fn search_thoughts(
&self,
query: &str,
page: &PageParams,
viewer_id: Option<&UserId>, // was _viewer_id
) -> Result<Paginated<FeedEntry>, DomainError> {
let viewer = viewer_id.map(|v| v.as_uuid());
let select = feed_select(viewer);
let total: i64 = sqlx::query_scalar(
"SELECT COUNT(*) FROM thoughts t
WHERE t.content % $1 AND t.visibility='public'",
)
.bind(query)
.fetch_one(&self.pool)
.await
.map_err(|e| DomainError::Internal(e.to_string()))?;
let sql = format!(
"{select}
WHERE t.content % $1 AND t.visibility='public'
ORDER BY similarity(t.content, $1) DESC
LIMIT $2 OFFSET $3"
);
let rows = sqlx::query_as::<_, FeedRow>(&sql)
.bind(query)
.bind(page.limit())
.bind(page.offset())
.fetch_all(&self.pool)
.await
.map_err(|e| DomainError::Internal(e.to_string()))?;
Ok(Paginated {
items: rows
.into_iter()
.map(|r| row_to_entry(r, viewer))
.collect::<Result<Vec<_>, _>>()?,
total,
page: page.page,
per_page: page.per_page,
})
}
```
Note: `USER_SELECT` from `postgres::user` is no longer used in this file after the switch from const to function. Remove the `use postgres::user::{UserRow, USER_SELECT};` import if `UserRow`/`USER_SELECT` are no longer referenced.
- [ ] **Step 5: Add an integration test for viewer-aware search**
In the `#[cfg(test)]` module in `postgres-search/src/lib.rs`, add after the existing tests:
```rust
#[sqlx::test(migrations = "../postgres/migrations")]
async fn search_thoughts_sets_viewer_context_when_authed(pool: sqlx::PgPool) {
use domain::ports::{LikeRepository, UserWriter};
use postgres::{like::PgLikeRepository, user::PgUserRepository};
use domain::models::social::Like;
use domain::value_objects::LikeId;
let (alice, thought) = seed_thought(&pool, "alice", "hello world").await;
// alice likes her own thought
let like_repo = PgLikeRepository::new(pool.clone());
like_repo.save(&Like {
id: LikeId::new(),
user_id: alice.id.clone(),
thought_id: thought.id.clone(),
ap_id: None,
created_at: chrono::Utc::now(),
}).await.unwrap();
let repo = PgSearchRepository::new(pool);
// with viewer — should see liked = true
let authed = repo
.search_thoughts("hello", &PageParams { page: 1, per_page: 20 }, Some(&alice.id))
.await
.unwrap();
assert_eq!(authed.items.len(), 1);
let ctx = authed.items[0].viewer.as_ref().expect("viewer context present");
assert!(ctx.liked, "alice should see the thought as liked");
assert!(!ctx.boosted);
// without viewer — viewer should be None
let anon = repo
.search_thoughts("hello", &PageParams { page: 1, per_page: 20 }, None)
.await
.unwrap();
assert_eq!(anon.items.len(), 1);
assert!(anon.items[0].viewer.is_none(), "anonymous request has no viewer context");
}
```
- [ ] **Step 6: Compile check**
```bash
cargo check --workspace 2>&1 | head -20
```
Expected: 0 errors.
- [ ] **Step 7: Commit**
```bash
git add crates/adapters/postgres-search/src/lib.rs
git commit -m "fix(search): viewer-aware SQL in search_thoughts — ViewerContext now real instead of hardcoded false"
```
---
## Self-Review
**Spec coverage:**
| Spec requirement | Task |
|---|---|
| Add `EngagementStats` struct | Task 1 |
| Add `ViewerContext` struct | Task 1 |
| `FeedEntry.viewer: Option<ViewerContext>` | Task 1 |
| postgres feed adapter uses new structs | Task 2 |
| Handler `to_thought_response` uses new fields | Task 2 |
| search adapter `viewer: None` (structural fix) | Task 2 |
| search adapter uses real viewer SQL (functional fix) | Task 3 |
| `viewer: None` = anonymous; `Some(...)` = viewer present | Tasks 2 + 3 |
| Wire format (`ThoughtResponse`) unchanged | Task 2 step 3 |
**No placeholders found.**
**Type consistency:** `EngagementStats` and `ViewerContext` defined in Task 1, used by name in Tasks 2 and 3. `row_to_entry(r, viewer)` signature matches in both Task 2 and Task 3. `viewer: Option<uuid::Uuid>` threaded consistently.

View File

@@ -0,0 +1,80 @@
# FeedEntry Decoupling Design
**Goal:** Fix search viewer context (functional), restructure `FeedEntry` for clarity (structural), and make viewer presence explicit via `Option<ViewerContext>` (type-safe).
**Priority:** C (search fix) → B (struct clarity) → A (type safety). All three land in one pass.
---
## Data Model
Replace flat fields on `FeedEntry` with two named sub-structs in `crates/domain/src/models/feed.rs`:
```rust
#[derive(Debug, Clone)]
pub struct EngagementStats {
pub like_count: i64,
pub boost_count: i64,
pub reply_count: i64,
}
#[derive(Debug, Clone)]
pub struct ViewerContext {
pub liked: bool,
pub boosted: bool,
}
#[derive(Debug, Clone)]
pub struct FeedEntry {
pub thought: Thought,
pub author: User,
pub stats: EngagementStats,
pub viewer: Option<ViewerContext>, // None when no authenticated viewer
}
```
`viewer: None` means the request was anonymous or viewer state is unavailable (e.g. search without auth). `viewer: Some(ViewerContext { liked: false, boosted: false })` means a viewer is known and they have not liked or boosted the thought. These two states are now distinct at the type level.
---
## Search Adapter Fix
`SearchPort::search_thoughts` already accepts `viewer_id: Option<&UserId>` but `postgres-search/src/lib.rs` ignores it, always hardcoding `false` for viewer fields.
Fix: conditionally inject EXISTS subqueries into the search SQL, identical to the pattern used in `postgres/src/feed.rs`:
```sql
-- viewer_id = None (anonymous)
false AS liked_by_viewer,
false AS boosted_by_viewer
-- viewer_id = Some(uid)
EXISTS(SELECT 1 FROM likes WHERE user_id='{uid}' AND thought_id=t.id) AS liked_by_viewer,
EXISTS(SELECT 1 FROM boosts WHERE user_id='{uid}' AND thought_id=t.id) AS boosted_by_viewer
```
The `FeedRow` struct in postgres-search already has `liked_by_viewer: bool` and `boosted_by_viewer: bool` columns — they just need to be populated correctly. No schema change required.
---
## Callsite Migration
| File | Change |
|---|---|
| `crates/domain/src/models/feed.rs` | Replace flat stats/viewer fields with `EngagementStats` and `Option<ViewerContext>` |
| `crates/adapters/postgres/src/feed.rs``row_to_entry` | Construct `EngagementStats { ... }` and `viewer: Some/None` based on `FeedRow` |
| `crates/adapters/postgres-search/src/lib.rs``row_to_entry` + SQL | Fix SQL to use viewer_id; build `Option<ViewerContext>` from result |
| `crates/presentation/src/handlers/feed.rs``to_thought_response` | `e.stats.like_count`, `e.viewer.as_ref().map(|v| v.liked).unwrap_or(false)` |
| `crates/domain/src/testing.rs``TestStore` feed impl | Build `FeedEntry` with `stats:` and `viewer:` fields |
`ThoughtResponse` in `api-types/src/responses.rs` keeps `liked_by_viewer: bool` and `boosted_by_viewer: bool` — the wire format is unchanged. `viewer: None` serialises as `false` in `to_thought_response`.
---
## What Does Not Change
- `FeedRepository` port signatures (still returns `Paginated<FeedEntry>`)
- HTTP response shape (`ThoughtResponse`)
- Database schema
- Pagination, filtering, or query logic
- Any code path that doesn't touch `FeedEntry` fields directly