refactor(ports): ActivityPubRepository takes &str instead of url::Url — infra type stays in adapter

This commit is contained in:
2026-05-15 14:06:33 +02:00
parent c76894e527
commit 3f6b91c943
5 changed files with 69 additions and 85 deletions

View File

@@ -117,7 +117,7 @@ impl ApObjectHandler for ThoughtsObjectHandler {
let note: ThoughtNote = serde_json::from_value(object)?; let note: ThoughtNote = serde_json::from_value(object)?;
let author_id = self let author_id = self
.repo .repo
.intern_remote_actor(actor_url) .intern_remote_actor(actor_url.as_str())
.await .await
.map_err(|e| anyhow!("{e}"))?; .map_err(|e| anyhow!("{e}"))?;
@@ -140,14 +140,14 @@ impl ApObjectHandler for ThoughtsObjectHandler {
self.repo self.repo
.accept_note( .accept_note(
ap_id, ap_id.as_str(),
&author_id, &author_id,
&note.content, &note.content,
note.published, note.published,
note.sensitive, note.sensitive,
note.summary, note.summary,
visibility, visibility,
note.in_reply_to.as_ref(), note.in_reply_to.as_ref().map(|u| u.as_str()),
) )
.await .await
.map_err(|e| anyhow!("{e}"))?; .map_err(|e| anyhow!("{e}"))?;
@@ -198,21 +198,21 @@ impl ApObjectHandler for ThoughtsObjectHandler {
) -> Result<()> { ) -> Result<()> {
let note: ThoughtNote = serde_json::from_value(object)?; let note: ThoughtNote = serde_json::from_value(object)?;
self.repo self.repo
.apply_note_update(ap_id, &note.content) .apply_note_update(ap_id.as_str(), &note.content)
.await .await
.map_err(|e| anyhow!("{e}")) .map_err(|e| anyhow!("{e}"))
} }
async fn on_delete(&self, ap_id: &Url, _actor_url: &Url) -> Result<()> { async fn on_delete(&self, ap_id: &Url, _actor_url: &Url) -> Result<()> {
self.repo self.repo
.retract_note(ap_id) .retract_note(ap_id.as_str())
.await .await
.map_err(|e| anyhow!("{e}")) .map_err(|e| anyhow!("{e}"))
} }
async fn on_actor_removed(&self, actor_url: &Url) -> Result<()> { async fn on_actor_removed(&self, actor_url: &Url) -> Result<()> {
self.repo self.repo
.retract_actor_notes(actor_url) .retract_actor_notes(actor_url.as_str())
.await .await
.map_err(|e| anyhow!("{e}")) .map_err(|e| anyhow!("{e}"))
} }
@@ -234,7 +234,7 @@ impl ApObjectHandler for ThoughtsObjectHandler {
let actor_user_id = self let actor_user_id = self
.repo .repo
.find_remote_actor_id(actor_url) .find_remote_actor_id(actor_url.as_str())
.await .await
.map_err(|e| anyhow!("{e}"))?; .map_err(|e| anyhow!("{e}"))?;
@@ -278,7 +278,7 @@ impl ApObjectHandler for ThoughtsObjectHandler {
let actor_user_id = self let actor_user_id = self
.repo .repo
.find_remote_actor_id(actor_url) .find_remote_actor_id(actor_url.as_str())
.await .await
.map_err(|e| anyhow!("{e}"))?; .map_err(|e| anyhow!("{e}"))?;
@@ -310,7 +310,7 @@ impl ApObjectHandler for ThoughtsObjectHandler {
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
let author_user_id = match self let author_user_id = match self
.repo .repo
.find_remote_actor_id(actor_url) .find_remote_actor_id(actor_url.as_str())
.await .await
.map_err(|e| anyhow!("{e}"))? .map_err(|e| anyhow!("{e}"))?
{ {
@@ -356,7 +356,7 @@ impl ApObjectHandler for ThoughtsObjectHandler {
let actor_user_id = self let actor_user_id = self
.repo .repo
.find_remote_actor_id(actor_url) .find_remote_actor_id(actor_url.as_str())
.await .await
.map_err(|e| anyhow!("{e}"))?; .map_err(|e| anyhow!("{e}"))?;

View File

@@ -5,7 +5,6 @@ const MAX_REMOTE_CONTENT_CHARS: usize = 500;
const THOUGHTS_PATH_PREFIX: &str = "/thoughts/"; const THOUGHTS_PATH_PREFIX: &str = "/thoughts/";
use chrono::{DateTime, Utc}; use chrono::{DateTime, Utc};
use sqlx::PgPool; use sqlx::PgPool;
use url::Url;
use domain::{ use domain::{
errors::DomainError, errors::DomainError,
@@ -139,17 +138,17 @@ impl ActivityPubRepository for PgActivityPubRepository {
async fn find_remote_actor_id( async fn find_remote_actor_id(
&self, &self,
actor_ap_url: &Url, actor_ap_url: &str,
) -> Result<Option<UserId>, DomainError> { ) -> Result<Option<UserId>, DomainError> {
sqlx::query_scalar::<_, uuid::Uuid>("SELECT id FROM users WHERE ap_id=$1") sqlx::query_scalar::<_, uuid::Uuid>("SELECT id FROM users WHERE ap_id=$1")
.bind(actor_ap_url.as_str()) .bind(actor_ap_url)
.fetch_optional(&self.pool) .fetch_optional(&self.pool)
.await .await
.into_domain() .into_domain()
.map(|o| o.map(UserId::from_uuid)) .map(|o| o.map(UserId::from_uuid))
} }
async fn intern_remote_actor(&self, actor_ap_url: &Url) -> Result<UserId, DomainError> { async fn intern_remote_actor(&self, actor_ap_url: &str) -> Result<UserId, DomainError> {
if let Some(id) = self.find_remote_actor_id(actor_ap_url).await? { if let Some(id) = self.find_remote_actor_id(actor_ap_url).await? {
return Ok(id); return Ok(id);
} }
@@ -157,11 +156,13 @@ impl ActivityPubRepository for PgActivityPubRepository {
// Use the last path segment as username (e.g. /users/alice → "alice"). // Use the last path segment as username (e.g. /users/alice → "alice").
// Falls back to a random short id for long segments (e.g. UUID-based actor URLs). // Falls back to a random short id for long segments (e.g. UUID-based actor URLs).
// username column is VARCHAR(32). // username column is VARCHAR(32).
let last_seg = actor_ap_url let last_seg = url::Url::parse(actor_ap_url)
.path_segments() .ok()
.and_then(|mut s| s.next_back()) .and_then(|u| {
.unwrap_or("") u.path_segments()
.to_string(); .and_then(|mut s| s.next_back().map(|s| s.to_string()))
})
.unwrap_or_default();
let handle = if last_seg.is_empty() { let handle = if last_seg.is_empty() {
format!("remote_{}", &new_id.to_string()[..13]) format!("remote_{}", &new_id.to_string()[..13])
} else if last_seg.len() <= 32 { } else if last_seg.len() <= 32 {
@@ -176,7 +177,7 @@ impl ActivityPubRepository for PgActivityPubRepository {
.bind(new_id) .bind(new_id)
.bind(&handle) .bind(&handle)
.bind(format!("{}@remote", new_id)) .bind(format!("{}@remote", new_id))
.bind(actor_ap_url.as_str()) .bind(actor_ap_url)
.execute(&self.pool) .execute(&self.pool)
.await .await
.into_domain()?; .into_domain()?;
@@ -211,25 +212,26 @@ impl ActivityPubRepository for PgActivityPubRepository {
async fn accept_note( async fn accept_note(
&self, &self,
ap_id: &Url, ap_id: &str,
author_id: &UserId, author_id: &UserId,
content: &str, content: &str,
published: DateTime<Utc>, published: DateTime<Utc>,
sensitive: bool, sensitive: bool,
content_warning: Option<String>, content_warning: Option<String>,
visibility: &str, visibility: &str,
in_reply_to: Option<&Url>, in_reply_to: Option<&str>,
) -> Result<(), DomainError> { ) -> Result<(), DomainError> {
let capped: String = content.chars().take(MAX_REMOTE_CONTENT_CHARS).collect(); let capped: String = content.chars().take(MAX_REMOTE_CONTENT_CHARS).collect();
let (in_reply_to_id, in_reply_to_url) = match in_reply_to { let (in_reply_to_id, in_reply_to_url) = match in_reply_to {
Some(url) => { Some(url) => {
// If the parent is a local thought, extract its UUID for in_reply_to_id. // If the parent is a local thought, extract its UUID for in_reply_to_id.
let local_uuid = url let local_uuid = url::Url::parse(url).ok().and_then(|u| {
.path() u.path()
.strip_prefix(THOUGHTS_PATH_PREFIX) .strip_prefix(THOUGHTS_PATH_PREFIX)
.and_then(|s| s.split('/').next()) .and_then(|s| s.split('/').next())
.and_then(|s| uuid::Uuid::parse_str(s).ok()); .and_then(|s| uuid::Uuid::parse_str(s).ok())
(local_uuid, Some(url.as_str().to_string())) });
(local_uuid, Some(url.to_string()))
} }
None => (None, None), None => (None, None),
}; };
@@ -240,7 +242,7 @@ impl ActivityPubRepository for PgActivityPubRepository {
.bind(uuid::Uuid::new_v4()) .bind(uuid::Uuid::new_v4())
.bind(author_id.as_uuid()) .bind(author_id.as_uuid())
.bind(&capped) .bind(&capped)
.bind(ap_id.as_str()) .bind(ap_id)
.bind(sensitive) .bind(sensitive)
.bind(content_warning) .bind(content_warning)
.bind(published) .bind(published)
@@ -253,12 +255,12 @@ impl ActivityPubRepository for PgActivityPubRepository {
.map(|_| ()) .map(|_| ())
} }
async fn apply_note_update(&self, ap_id: &Url, new_content: &str) -> Result<(), DomainError> { async fn apply_note_update(&self, ap_id: &str, new_content: &str) -> Result<(), DomainError> {
let capped: String = new_content.chars().take(MAX_REMOTE_CONTENT_CHARS).collect(); let capped: String = new_content.chars().take(MAX_REMOTE_CONTENT_CHARS).collect();
sqlx::query( sqlx::query(
"UPDATE thoughts SET content=$2,updated_at=NOW() WHERE ap_id=$1 AND local=false", "UPDATE thoughts SET content=$2,updated_at=NOW() WHERE ap_id=$1 AND local=false",
) )
.bind(ap_id.as_str()) .bind(ap_id)
.bind(&capped) .bind(&capped)
.execute(&self.pool) .execute(&self.pool)
.await .await
@@ -266,20 +268,20 @@ impl ActivityPubRepository for PgActivityPubRepository {
.map(|_| ()) .map(|_| ())
} }
async fn retract_note(&self, ap_id: &Url) -> Result<(), DomainError> { async fn retract_note(&self, ap_id: &str) -> Result<(), DomainError> {
sqlx::query("DELETE FROM thoughts WHERE ap_id=$1 AND local=false") sqlx::query("DELETE FROM thoughts WHERE ap_id=$1 AND local=false")
.bind(ap_id.as_str()) .bind(ap_id)
.execute(&self.pool) .execute(&self.pool)
.await .await
.into_domain() .into_domain()
.map(|_| ()) .map(|_| ())
} }
async fn retract_actor_notes(&self, actor_ap_url: &Url) -> Result<(), DomainError> { async fn retract_actor_notes(&self, actor_ap_url: &str) -> Result<(), DomainError> {
sqlx::query( sqlx::query(
"DELETE FROM thoughts WHERE local=false AND user_id=(SELECT id FROM users WHERE ap_id=$1)", "DELETE FROM thoughts WHERE local=false AND user_id=(SELECT id FROM users WHERE ap_id=$1)",
) )
.bind(actor_ap_url.as_str()) .bind(actor_ap_url)
.execute(&self.pool) .execute(&self.pool)
.await .await
.into_domain() .into_domain()
@@ -331,20 +333,20 @@ mod tests {
#[sqlx::test(migrations = "./migrations")] #[sqlx::test(migrations = "./migrations")]
async fn intern_remote_actor_is_idempotent(pool: sqlx::PgPool) { async fn intern_remote_actor_is_idempotent(pool: sqlx::PgPool) {
let repo = PgActivityPubRepository::new(pool); let repo = PgActivityPubRepository::new(pool);
let url = url::Url::parse("https://mastodon.social/users/alice").unwrap(); let url = "https://mastodon.social/users/alice";
let id1 = repo.intern_remote_actor(&url).await.unwrap(); let id1 = repo.intern_remote_actor(url).await.unwrap();
let id2 = repo.intern_remote_actor(&url).await.unwrap(); let id2 = repo.intern_remote_actor(url).await.unwrap();
assert_eq!(id1, id2); assert_eq!(id1, id2);
} }
#[sqlx::test(migrations = "./migrations")] #[sqlx::test(migrations = "./migrations")]
async fn accept_and_retract_note(pool: sqlx::PgPool) { async fn accept_and_retract_note(pool: sqlx::PgPool) {
let repo = PgActivityPubRepository::new(pool); let repo = PgActivityPubRepository::new(pool);
let actor_url = url::Url::parse("https://remote.example/users/bob").unwrap(); let actor_url = "https://remote.example/users/bob";
let ap_id = url::Url::parse("https://remote.example/notes/1").unwrap(); let ap_id = "https://remote.example/notes/1";
let author = repo.intern_remote_actor(&actor_url).await.unwrap(); let author = repo.intern_remote_actor(actor_url).await.unwrap();
repo.accept_note( repo.accept_note(
&ap_id, ap_id,
&author, &author,
"hello from remote", "hello from remote",
chrono::Utc::now(), chrono::Utc::now(),
@@ -355,7 +357,7 @@ mod tests {
) )
.await .await
.unwrap(); .unwrap();
repo.retract_note(&ap_id).await.unwrap(); repo.retract_note(ap_id).await.unwrap();
} }
#[sqlx::test(migrations = "./migrations")] #[sqlx::test(migrations = "./migrations")]

View File

@@ -81,10 +81,9 @@ pub async fn get_remote_actor_posts(
viewer_id: Option<&UserId>, viewer_id: Option<&UserId>,
) -> Result<Paginated<FeedEntry>, DomainError> { ) -> Result<Paginated<FeedEntry>, DomainError> {
let actor = federation.lookup_actor(handle).await?; let actor = federation.lookup_actor(handle).await?;
let ap_url = url::Url::parse(&actor.url).map_err(|e| DomainError::Internal(e.to_string()))?; let author_id = match ap_repo.find_remote_actor_id(&actor.url).await? {
let author_id = match ap_repo.find_remote_actor_id(&ap_url).await? {
Some(id) => id, Some(id) => id,
None => ap_repo.intern_remote_actor(&ap_url).await?, None => ap_repo.intern_remote_actor(&actor.url).await?,
}; };
let result = feed.user_feed(&author_id, &page, viewer_id).await?; let result = feed.user_feed(&author_id, &page, viewer_id).await?;
if let Some(outbox_url) = actor.outbox_url { if let Some(outbox_url) = actor.outbox_url {

View File

@@ -401,14 +401,12 @@ pub trait ActivityPubRepository: Send + Sync {
// ── Remote actor resolution ────────────────────────────────────── // ── Remote actor resolution ──────────────────────────────────────
/// Find the local UserId for a remote actor by its AP URL. /// Find the local UserId for a remote actor by its AP URL.
async fn find_remote_actor_id( async fn find_remote_actor_id(&self, actor_ap_url: &str)
&self, -> Result<Option<UserId>, DomainError>;
actor_ap_url: &url::Url,
) -> Result<Option<UserId>, DomainError>;
/// Ensure a remote actor placeholder exists; create one if absent. /// Ensure a remote actor placeholder exists; create one if absent.
/// Idempotent — safe to call multiple times with the same URL. /// Idempotent — safe to call multiple times with the same URL.
async fn intern_remote_actor(&self, actor_ap_url: &url::Url) -> Result<UserId, DomainError>; async fn intern_remote_actor(&self, actor_ap_url: &str) -> Result<UserId, DomainError>;
/// Update display_name and avatar_url for an already-interned remote actor. /// Update display_name and avatar_url for an already-interned remote actor.
async fn update_remote_actor_display( async fn update_remote_actor_display(
@@ -424,29 +422,25 @@ pub trait ActivityPubRepository: Send + Sync {
#[allow(clippy::too_many_arguments)] #[allow(clippy::too_many_arguments)]
async fn accept_note( async fn accept_note(
&self, &self,
ap_id: &url::Url, ap_id: &str,
author_id: &UserId, author_id: &UserId,
content: &str, content: &str,
published: chrono::DateTime<chrono::Utc>, published: chrono::DateTime<chrono::Utc>,
sensitive: bool, sensitive: bool,
content_warning: Option<String>, content_warning: Option<String>,
visibility: &str, visibility: &str,
in_reply_to: Option<&url::Url>, in_reply_to: Option<&str>,
) -> Result<(), DomainError>; ) -> Result<(), DomainError>;
/// Apply an Update to a previously accepted remote Note. /// Apply an Update to a previously accepted remote Note.
async fn apply_note_update( async fn apply_note_update(&self, ap_id: &str, new_content: &str) -> Result<(), DomainError>;
&self,
ap_id: &url::Url,
new_content: &str,
) -> Result<(), DomainError>;
/// Remove a specific remote Note (Delete activity). Only touches /// Remove a specific remote Note (Delete activity). Only touches
/// remotely-originated thoughts. /// remotely-originated thoughts.
async fn retract_note(&self, ap_id: &url::Url) -> Result<(), DomainError>; async fn retract_note(&self, ap_id: &str) -> Result<(), DomainError>;
/// Remove all Notes from a remote actor (actor-level Delete/Tombstone). /// Remove all Notes from a remote actor (actor-level Delete/Tombstone).
async fn retract_actor_notes(&self, actor_ap_url: &url::Url) -> Result<(), DomainError>; async fn retract_actor_notes(&self, actor_ap_url: &str) -> Result<(), DomainError>;
// ── Node-level stats ───────────────────────────────────────────── // ── Node-level stats ─────────────────────────────────────────────

View File

@@ -21,7 +21,6 @@ use async_trait::async_trait;
use chrono::Utc; use chrono::Utc;
use std::collections::HashMap; use std::collections::HashMap;
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
use url;
#[derive(Default, Clone)] #[derive(Default, Clone)]
pub struct TestStore { pub struct TestStore {
@@ -820,24 +819,18 @@ impl ActivityPubRepository for TestStore {
} }
async fn find_remote_actor_id( async fn find_remote_actor_id(
&self, &self,
actor_ap_url: &url::Url, actor_ap_url: &str,
) -> Result<Option<UserId>, DomainError> { ) -> Result<Option<UserId>, DomainError> {
Ok(self Ok(self.actor_ap_ids.lock().unwrap().get(actor_ap_url).cloned())
.actor_ap_ids
.lock()
.unwrap()
.get(actor_ap_url.as_str())
.cloned())
} }
async fn intern_remote_actor(&self, actor_ap_url: &url::Url) -> Result<UserId, DomainError> { async fn intern_remote_actor(&self, actor_ap_url: &str) -> Result<UserId, DomainError> {
if let Some(uid) = self.find_remote_actor_id(actor_ap_url).await? { if let Some(uid) = self.find_remote_actor_id(actor_ap_url).await? {
return Ok(uid); return Ok(uid);
} }
let uid = UserId::new(); let uid = UserId::new();
let handle = actor_ap_url let handle = url::Url::parse(actor_ap_url)
.path() .map(|u| u.path().trim_start_matches('/').replace('/', "_"))
.trim_start_matches('/') .unwrap_or_else(|_| format!("remote_{}", &uid.to_string()[..8]));
.replace('/', "_");
let user = crate::models::user::User { let user = crate::models::user::User {
id: uid.clone(), id: uid.clone(),
username: Username::from_trusted(handle.clone()), username: Username::from_trusted(handle.clone()),
@@ -869,28 +862,24 @@ impl ActivityPubRepository for TestStore {
} }
async fn accept_note( async fn accept_note(
&self, &self,
_ap_id: &url::Url, _ap_id: &str,
_author_id: &UserId, _author_id: &UserId,
_content: &str, _content: &str,
_published: chrono::DateTime<chrono::Utc>, _published: chrono::DateTime<chrono::Utc>,
_sensitive: bool, _sensitive: bool,
_content_warning: Option<String>, _content_warning: Option<String>,
_visibility: &str, _visibility: &str,
_in_reply_to: Option<&url::Url>, _in_reply_to: Option<&str>,
) -> Result<(), DomainError> { ) -> Result<(), DomainError> {
Ok(()) Ok(())
} }
async fn apply_note_update( async fn apply_note_update(&self, _ap_id: &str, _new_content: &str) -> Result<(), DomainError> {
&self,
_ap_id: &url::Url,
_new_content: &str,
) -> Result<(), DomainError> {
Ok(()) Ok(())
} }
async fn retract_note(&self, _ap_id: &url::Url) -> Result<(), DomainError> { async fn retract_note(&self, _ap_id: &str) -> Result<(), DomainError> {
Ok(()) Ok(())
} }
async fn retract_actor_notes(&self, _actor_ap_url: &url::Url) -> Result<(), DomainError> { async fn retract_actor_notes(&self, _actor_ap_url: &str) -> Result<(), DomainError> {
Ok(()) Ok(())
} }
async fn count_local_notes(&self) -> Result<u64, DomainError> { async fn count_local_notes(&self) -> Result<u64, DomainError> {
@@ -966,9 +955,9 @@ mod ap_repo_tests {
#[tokio::test] #[tokio::test]
async fn test_store_intern_creates_placeholder() { async fn test_store_intern_creates_placeholder() {
let store = TestStore::default(); let store = TestStore::default();
let url = url::Url::parse("https://example.com/users/alice").unwrap(); let url = "https://example.com/users/alice";
let id1 = store.intern_remote_actor(&url).await.unwrap(); let id1 = store.intern_remote_actor(url).await.unwrap();
let id2 = store.intern_remote_actor(&url).await.unwrap(); let id2 = store.intern_remote_actor(url).await.unwrap();
assert_eq!(id1, id2, "intern must be idempotent"); assert_eq!(id1, id2, "intern must be idempotent");
} }
} }