docs: ActivityPubRepository port implementation plan
This commit is contained in:
639
docs/superpowers/plans/2026-05-14-activitypub-repository-port.md
Normal file
639
docs/superpowers/plans/2026-05-14-activitypub-repository-port.md
Normal file
@@ -0,0 +1,639 @@
|
|||||||
|
# ActivityPubRepository Port Implementation Plan
|
||||||
|
|
||||||
|
> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.
|
||||||
|
|
||||||
|
**Goal:** Eliminate the `activitypub` → `postgres` dependency violation by extracting an `ActivityPubRepository` port into domain and implementing it in the postgres adapter.
|
||||||
|
|
||||||
|
**Architecture:** `ActivityPubRepository` (9 methods, federation vocabulary) is added to `domain/src/ports.rs`. `PgActivityPubRepository` in `postgres/src/activitypub.rs` implements it with all the SQL that currently lives in `activitypub/src/handler.rs`. `ThoughtsObjectHandler` drops its `PgPool` and receives `Arc<dyn ActivityPubRepository>` instead. The dependency chain becomes `activitypub → domain` only; `postgres` drops off the `activitypub` Cargo.toml entirely.
|
||||||
|
|
||||||
|
**Tech Stack:** Rust, sqlx 0.8, async-trait, existing domain value objects
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## File Map
|
||||||
|
|
||||||
|
```
|
||||||
|
Modify: crates/domain/src/ports.rs ← add OutboxEntry struct + ActivityPubRepository trait
|
||||||
|
Modify: crates/domain/src/testing.rs ← add TestStore impl ActivityPubRepository
|
||||||
|
Create: crates/adapters/postgres/src/activitypub.rs ← PgActivityPubRepository (all 9 methods)
|
||||||
|
Modify: crates/adapters/postgres/src/lib.rs ← pub mod activitypub
|
||||||
|
Modify: crates/adapters/activitypub/src/handler.rs ← replace PgPool with Arc<dyn ActivityPubRepository>
|
||||||
|
Modify: crates/adapters/activitypub/Cargo.toml ← remove postgres + sqlx deps
|
||||||
|
Modify: crates/presentation/src/lib.rs ← wire PgActivityPubRepository into ThoughtsObjectHandler
|
||||||
|
```
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
### Task 1: Domain — OutboxEntry + ActivityPubRepository trait
|
||||||
|
|
||||||
|
**Files:**
|
||||||
|
- Modify: `crates/domain/src/ports.rs`
|
||||||
|
- Modify: `crates/domain/src/testing.rs`
|
||||||
|
|
||||||
|
- [ ] **Write the failing test** — add to bottom of `crates/domain/src/testing.rs` inside the existing `#[cfg(any(test, feature = "test-helpers"))]` scope:
|
||||||
|
|
||||||
|
```rust
|
||||||
|
#[cfg(test)]
|
||||||
|
mod ap_repo_tests {
|
||||||
|
use super::*;
|
||||||
|
use crate::models::thought::{Thought, Visibility};
|
||||||
|
use crate::value_objects::*;
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn test_store_outbox_returns_empty() {
|
||||||
|
let store = TestStore::default();
|
||||||
|
let result = store.outbox_entries_for_actor(&UserId::new()).await.unwrap();
|
||||||
|
assert!(result.is_empty());
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn test_store_intern_creates_placeholder() {
|
||||||
|
let store = TestStore::default();
|
||||||
|
let url = url::Url::parse("https://example.com/users/alice").unwrap();
|
||||||
|
let id1 = store.intern_remote_actor(&url).await.unwrap();
|
||||||
|
let id2 = store.intern_remote_actor(&url).await.unwrap();
|
||||||
|
assert_eq!(id1, id2, "intern must be idempotent");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
- [ ] **Run:** `cargo test -p domain` — Expected: FAIL (ActivityPubRepository not defined).
|
||||||
|
|
||||||
|
- [ ] **Add `OutboxEntry` and `ActivityPubRepository` to `crates/domain/src/ports.rs`** — append after the `SearchPort` trait:
|
||||||
|
|
||||||
|
```rust
|
||||||
|
/// A local thought ready for AP serialization, with the author's username
|
||||||
|
/// pre-joined so the handler can build AP URLs without a second query.
|
||||||
|
#[derive(Debug, Clone)]
|
||||||
|
pub struct OutboxEntry {
|
||||||
|
pub thought: crate::models::thought::Thought,
|
||||||
|
pub author_username: Username,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[async_trait]
|
||||||
|
pub trait ActivityPubRepository: Send + Sync {
|
||||||
|
// ── Outbox (local → remote) ──────────────────────────────────────
|
||||||
|
|
||||||
|
/// All public local thoughts for this actor. Used for outbox totals
|
||||||
|
/// and full-collection delivery.
|
||||||
|
async fn outbox_entries_for_actor(
|
||||||
|
&self,
|
||||||
|
user_id: &UserId,
|
||||||
|
) -> Result<Vec<OutboxEntry>, DomainError>;
|
||||||
|
|
||||||
|
/// Cursor page of public local thoughts, newest-first, stopping before
|
||||||
|
/// `before`. Used for OrderedCollectionPage responses.
|
||||||
|
async fn outbox_page_for_actor(
|
||||||
|
&self,
|
||||||
|
user_id: &UserId,
|
||||||
|
before: Option<chrono::DateTime<chrono::Utc>>,
|
||||||
|
limit: usize,
|
||||||
|
) -> Result<Vec<OutboxEntry>, DomainError>;
|
||||||
|
|
||||||
|
// ── Remote actor resolution ──────────────────────────────────────
|
||||||
|
|
||||||
|
/// Find the local UserId for a remote actor by its AP URL.
|
||||||
|
async fn find_remote_actor_id(
|
||||||
|
&self,
|
||||||
|
actor_ap_url: &url::Url,
|
||||||
|
) -> Result<Option<UserId>, DomainError>;
|
||||||
|
|
||||||
|
/// Ensure a remote actor placeholder exists; create one if absent.
|
||||||
|
/// Idempotent — safe to call multiple times with the same URL.
|
||||||
|
async fn intern_remote_actor(
|
||||||
|
&self,
|
||||||
|
actor_ap_url: &url::Url,
|
||||||
|
) -> Result<UserId, DomainError>;
|
||||||
|
|
||||||
|
// ── Inbox processing (remote → local) ───────────────────────────
|
||||||
|
|
||||||
|
/// Persist an incoming remote Note. Idempotent on ap_id.
|
||||||
|
async fn accept_note(
|
||||||
|
&self,
|
||||||
|
ap_id: &url::Url,
|
||||||
|
author_id: &UserId,
|
||||||
|
content: &str,
|
||||||
|
published: chrono::DateTime<chrono::Utc>,
|
||||||
|
sensitive: bool,
|
||||||
|
content_warning: Option<String>,
|
||||||
|
) -> Result<(), DomainError>;
|
||||||
|
|
||||||
|
/// Apply an Update to a previously accepted remote Note.
|
||||||
|
async fn apply_note_update(
|
||||||
|
&self,
|
||||||
|
ap_id: &url::Url,
|
||||||
|
new_content: &str,
|
||||||
|
) -> Result<(), DomainError>;
|
||||||
|
|
||||||
|
/// Remove a specific remote Note (Delete activity). Only touches
|
||||||
|
/// remotely-originated thoughts.
|
||||||
|
async fn retract_note(&self, ap_id: &url::Url) -> Result<(), DomainError>;
|
||||||
|
|
||||||
|
/// Remove all Notes from a remote actor (actor-level Delete/Tombstone).
|
||||||
|
async fn retract_actor_notes(
|
||||||
|
&self,
|
||||||
|
actor_ap_url: &url::Url,
|
||||||
|
) -> Result<(), DomainError>;
|
||||||
|
|
||||||
|
// ── Node-level stats ─────────────────────────────────────────────
|
||||||
|
|
||||||
|
/// Total locally-authored thought count for NodeInfo responses.
|
||||||
|
async fn count_local_notes(&self) -> Result<u64, DomainError>;
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
The imports already present in `ports.rs` cover `DomainError`, `UserId`, `Username`, `async_trait`. The `url::Url` and `chrono::DateTime` types need to be in scope — add these use statements at the top of `ports.rs` if not already present:
|
||||||
|
|
||||||
|
```rust
|
||||||
|
use chrono::{DateTime, Utc};
|
||||||
|
use url::Url;
|
||||||
|
```
|
||||||
|
|
||||||
|
Note: `url` and `chrono` are already in `domain/Cargo.toml`. No dep changes needed.
|
||||||
|
|
||||||
|
- [ ] **Add `TestStore impl ActivityPubRepository`** in `crates/domain/src/testing.rs` — append after `impl SearchPort for TestStore`:
|
||||||
|
|
||||||
|
```rust
|
||||||
|
#[async_trait] impl ActivityPubRepository for TestStore {
|
||||||
|
async fn outbox_entries_for_actor(&self, _uid: &UserId) -> Result<Vec<crate::ports::OutboxEntry>, DomainError> {
|
||||||
|
Ok(vec![])
|
||||||
|
}
|
||||||
|
async fn outbox_page_for_actor(&self, _uid: &UserId, _before: Option<chrono::DateTime<chrono::Utc>>, _limit: usize) -> Result<Vec<crate::ports::OutboxEntry>, DomainError> {
|
||||||
|
Ok(vec![])
|
||||||
|
}
|
||||||
|
async fn find_remote_actor_id(&self, actor_ap_url: &url::Url) -> Result<Option<UserId>, DomainError> {
|
||||||
|
let url = actor_ap_url.to_string();
|
||||||
|
Ok(self.users.lock().unwrap().iter().find(|u| u.ap_id.as_deref() == Some(&url)).map(|u| u.id.clone()))
|
||||||
|
}
|
||||||
|
async fn intern_remote_actor(&self, actor_ap_url: &url::Url) -> Result<UserId, DomainError> {
|
||||||
|
if let Some(uid) = self.find_remote_actor_id(actor_ap_url).await? {
|
||||||
|
return Ok(uid);
|
||||||
|
}
|
||||||
|
let uid = UserId::new();
|
||||||
|
let handle = actor_ap_url.path().trim_start_matches('/').replace('/', "_");
|
||||||
|
let user = crate::models::user::User {
|
||||||
|
id: uid.clone(),
|
||||||
|
username: Username::from_trusted(handle.clone()),
|
||||||
|
email: Email::from_trusted(format!("{}@remote", uid)),
|
||||||
|
password_hash: PasswordHash("".into()),
|
||||||
|
display_name: None, bio: None, avatar_url: None, header_url: None,
|
||||||
|
custom_css: None, local: false,
|
||||||
|
ap_id: Some(actor_ap_url.to_string()),
|
||||||
|
inbox_url: None, public_key: None, private_key: None,
|
||||||
|
created_at: chrono::Utc::now(), updated_at: chrono::Utc::now(),
|
||||||
|
};
|
||||||
|
self.users.lock().unwrap().push(user);
|
||||||
|
Ok(uid)
|
||||||
|
}
|
||||||
|
async fn accept_note(&self, _ap_id: &url::Url, _author_id: &UserId, _content: &str, _published: chrono::DateTime<chrono::Utc>, _sensitive: bool, _content_warning: Option<String>) -> Result<(), DomainError> {
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
async fn apply_note_update(&self, _ap_id: &url::Url, _new_content: &str) -> Result<(), DomainError> { Ok(()) }
|
||||||
|
async fn retract_note(&self, _ap_id: &url::Url) -> Result<(), DomainError> { Ok(()) }
|
||||||
|
async fn retract_actor_notes(&self, _actor_ap_url: &url::Url) -> Result<(), DomainError> { Ok(()) }
|
||||||
|
async fn count_local_notes(&self) -> Result<u64, DomainError> {
|
||||||
|
Ok(self.thoughts.lock().unwrap().iter().filter(|t| t.local).count() as u64)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
- [ ] **Run:** `cargo test -p domain` — Expected: all tests pass including 2 new ap_repo tests.
|
||||||
|
|
||||||
|
- [ ] **Commit:**
|
||||||
|
```bash
|
||||||
|
git add crates/domain/
|
||||||
|
git commit -m "feat(domain): ActivityPubRepository port with federation vocabulary"
|
||||||
|
```
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
### Task 2: Postgres — PgActivityPubRepository
|
||||||
|
|
||||||
|
**Files:**
|
||||||
|
- Create: `crates/adapters/postgres/src/activitypub.rs`
|
||||||
|
- Modify: `crates/adapters/postgres/src/lib.rs`
|
||||||
|
|
||||||
|
- [ ] **Write integration tests** at the bottom of the new `crates/adapters/postgres/src/activitypub.rs` (create the file with tests first):
|
||||||
|
|
||||||
|
```rust
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use super::*;
|
||||||
|
use domain::ports::ActivityPubRepository;
|
||||||
|
|
||||||
|
#[sqlx::test(migrations = "./migrations")]
|
||||||
|
async fn intern_remote_actor_is_idempotent(pool: sqlx::PgPool) {
|
||||||
|
let repo = PgActivityPubRepository::new(pool);
|
||||||
|
let url = url::Url::parse("https://mastodon.social/users/alice").unwrap();
|
||||||
|
let id1 = repo.intern_remote_actor(&url).await.unwrap();
|
||||||
|
let id2 = repo.intern_remote_actor(&url).await.unwrap();
|
||||||
|
assert_eq!(id1, id2);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[sqlx::test(migrations = "./migrations")]
|
||||||
|
async fn accept_and_retract_note(pool: sqlx::PgPool) {
|
||||||
|
let repo = PgActivityPubRepository::new(pool);
|
||||||
|
let actor_url = url::Url::parse("https://remote.example/users/bob").unwrap();
|
||||||
|
let ap_id = url::Url::parse("https://remote.example/notes/1").unwrap();
|
||||||
|
let author = repo.intern_remote_actor(&actor_url).await.unwrap();
|
||||||
|
repo.accept_note(&ap_id, &author, "hello from remote", chrono::Utc::now(), false, None)
|
||||||
|
.await.unwrap();
|
||||||
|
repo.retract_note(&ap_id).await.unwrap();
|
||||||
|
}
|
||||||
|
|
||||||
|
#[sqlx::test(migrations = "./migrations")]
|
||||||
|
async fn count_local_notes_excludes_remote(pool: sqlx::PgPool) {
|
||||||
|
let repo = PgActivityPubRepository::new(pool);
|
||||||
|
assert_eq!(repo.count_local_notes().await.unwrap(), 0);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
- [ ] **Run:** `cargo test -p postgres activitypub` — Expected: FAIL (module does not exist).
|
||||||
|
|
||||||
|
- [ ] **Write `crates/adapters/postgres/src/activitypub.rs`:**
|
||||||
|
|
||||||
|
```rust
|
||||||
|
use async_trait::async_trait;
|
||||||
|
use chrono::{DateTime, Utc};
|
||||||
|
use sqlx::PgPool;
|
||||||
|
use url::Url;
|
||||||
|
|
||||||
|
use domain::{errors::DomainError, ports::{ActivityPubRepository, OutboxEntry}, value_objects::{Content, ThoughtId, UserId, Username}};
|
||||||
|
use domain::models::thought::{Thought, Visibility};
|
||||||
|
|
||||||
|
pub struct PgActivityPubRepository { pool: PgPool }
|
||||||
|
impl PgActivityPubRepository { pub fn new(pool: PgPool) -> Self { Self { pool } } }
|
||||||
|
|
||||||
|
#[async_trait]
|
||||||
|
impl ActivityPubRepository for PgActivityPubRepository {
|
||||||
|
async fn outbox_entries_for_actor(&self, user_id: &UserId) -> Result<Vec<OutboxEntry>, DomainError> {
|
||||||
|
#[derive(sqlx::FromRow)]
|
||||||
|
struct Row { id: uuid::Uuid, user_id: uuid::Uuid, content: String, created_at: DateTime<Utc>, in_reply_to_id: Option<uuid::Uuid>, content_warning: Option<String>, sensitive: bool, username: String, updated_at: Option<DateTime<Utc>> }
|
||||||
|
sqlx::query_as::<_, Row>(
|
||||||
|
"SELECT t.id, t.user_id, t.content, t.created_at, t.in_reply_to_id, t.content_warning, t.sensitive, u.username, t.updated_at
|
||||||
|
FROM thoughts t JOIN users u ON u.id=t.user_id
|
||||||
|
WHERE t.user_id=$1 AND t.local=true AND t.visibility='public'
|
||||||
|
ORDER BY t.created_at DESC"
|
||||||
|
)
|
||||||
|
.bind(user_id.as_uuid())
|
||||||
|
.fetch_all(&self.pool).await
|
||||||
|
.map_err(|e| DomainError::Internal(e.to_string()))
|
||||||
|
.map(|rows| rows.into_iter().map(|r| OutboxEntry {
|
||||||
|
thought: Thought {
|
||||||
|
id: ThoughtId::from_uuid(r.id), user_id: UserId::from_uuid(r.user_id),
|
||||||
|
content: Content::new_remote(r.content), in_reply_to_id: r.in_reply_to_id.map(ThoughtId::from_uuid),
|
||||||
|
in_reply_to_url: None, ap_id: None, visibility: Visibility::Public,
|
||||||
|
content_warning: r.content_warning, sensitive: r.sensitive, local: true,
|
||||||
|
created_at: r.created_at, updated_at: r.updated_at,
|
||||||
|
},
|
||||||
|
author_username: Username::from_trusted(r.username),
|
||||||
|
}).collect())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn outbox_page_for_actor(&self, user_id: &UserId, before: Option<DateTime<Utc>>, limit: usize) -> Result<Vec<OutboxEntry>, DomainError> {
|
||||||
|
#[derive(sqlx::FromRow)]
|
||||||
|
struct Row { id: uuid::Uuid, user_id: uuid::Uuid, content: String, created_at: DateTime<Utc>, in_reply_to_id: Option<uuid::Uuid>, content_warning: Option<String>, sensitive: bool, username: String, updated_at: Option<DateTime<Utc>> }
|
||||||
|
let rows = if let Some(before) = before {
|
||||||
|
sqlx::query_as::<_, Row>(
|
||||||
|
"SELECT t.id, t.user_id, t.content, t.created_at, t.in_reply_to_id, t.content_warning, t.sensitive, u.username, t.updated_at
|
||||||
|
FROM thoughts t JOIN users u ON u.id=t.user_id
|
||||||
|
WHERE t.user_id=$1 AND t.local=true AND t.visibility='public' AND t.created_at < $2
|
||||||
|
ORDER BY t.created_at DESC LIMIT $3"
|
||||||
|
).bind(user_id.as_uuid()).bind(before).bind(limit as i64).fetch_all(&self.pool).await
|
||||||
|
} else {
|
||||||
|
sqlx::query_as::<_, Row>(
|
||||||
|
"SELECT t.id, t.user_id, t.content, t.created_at, t.in_reply_to_id, t.content_warning, t.sensitive, u.username, t.updated_at
|
||||||
|
FROM thoughts t JOIN users u ON u.id=t.user_id
|
||||||
|
WHERE t.user_id=$1 AND t.local=true AND t.visibility='public'
|
||||||
|
ORDER BY t.created_at DESC LIMIT $2"
|
||||||
|
).bind(user_id.as_uuid()).bind(limit as i64).fetch_all(&self.pool).await
|
||||||
|
}.map_err(|e| DomainError::Internal(e.to_string()))?;
|
||||||
|
|
||||||
|
Ok(rows.into_iter().map(|r| OutboxEntry {
|
||||||
|
thought: Thought {
|
||||||
|
id: ThoughtId::from_uuid(r.id), user_id: UserId::from_uuid(r.user_id),
|
||||||
|
content: Content::new_remote(r.content), in_reply_to_id: r.in_reply_to_id.map(ThoughtId::from_uuid),
|
||||||
|
in_reply_to_url: None, ap_id: None, visibility: Visibility::Public,
|
||||||
|
content_warning: r.content_warning, sensitive: r.sensitive, local: true,
|
||||||
|
created_at: r.created_at, updated_at: r.updated_at,
|
||||||
|
},
|
||||||
|
author_username: Username::from_trusted(r.username),
|
||||||
|
}).collect())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn find_remote_actor_id(&self, actor_ap_url: &Url) -> Result<Option<UserId>, DomainError> {
|
||||||
|
sqlx::query_scalar::<_, uuid::Uuid>("SELECT id FROM users WHERE ap_id=$1")
|
||||||
|
.bind(actor_ap_url.as_str())
|
||||||
|
.fetch_optional(&self.pool).await
|
||||||
|
.map_err(|e| DomainError::Internal(e.to_string()))
|
||||||
|
.map(|o| o.map(UserId::from_uuid))
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn intern_remote_actor(&self, actor_ap_url: &Url) -> Result<UserId, DomainError> {
|
||||||
|
// Fast path
|
||||||
|
if let Some(id) = self.find_remote_actor_id(actor_ap_url).await? {
|
||||||
|
return Ok(id);
|
||||||
|
}
|
||||||
|
let new_id = uuid::Uuid::new_v4();
|
||||||
|
let handle = actor_ap_url.path().trim_start_matches('/').replace('/', "_");
|
||||||
|
sqlx::query(
|
||||||
|
"INSERT INTO users(id,username,email,password_hash,local,ap_id,created_at,updated_at)
|
||||||
|
VALUES($1,$2,$3,'',false,$4,NOW(),NOW()) ON CONFLICT(ap_id) DO NOTHING"
|
||||||
|
)
|
||||||
|
.bind(new_id).bind(&handle).bind(format!("{}@remote", new_id)).bind(actor_ap_url.as_str())
|
||||||
|
.execute(&self.pool).await.map_err(|e| DomainError::Internal(e.to_string()))?;
|
||||||
|
// Re-fetch to get whichever id won the race
|
||||||
|
self.find_remote_actor_id(actor_ap_url).await?
|
||||||
|
.ok_or_else(|| DomainError::Internal("intern_remote_actor: insert succeeded but row not found".into()))
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn accept_note(&self, ap_id: &Url, author_id: &UserId, content: &str, published: DateTime<Utc>, sensitive: bool, content_warning: Option<String>) -> Result<(), DomainError> {
|
||||||
|
let capped: String = content.chars().take(500).collect();
|
||||||
|
sqlx::query(
|
||||||
|
"INSERT INTO thoughts(id,user_id,content,ap_id,visibility,sensitive,local,content_warning,created_at)
|
||||||
|
VALUES($1,$2,$3,$4,'public',$5,false,$6,$7) ON CONFLICT(ap_id) DO NOTHING"
|
||||||
|
)
|
||||||
|
.bind(uuid::Uuid::new_v4()).bind(author_id.as_uuid()).bind(&capped)
|
||||||
|
.bind(ap_id.as_str()).bind(sensitive).bind(content_warning).bind(published)
|
||||||
|
.execute(&self.pool).await.map_err(|e| DomainError::Internal(e.to_string())).map(|_| ())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn apply_note_update(&self, ap_id: &Url, new_content: &str) -> Result<(), DomainError> {
|
||||||
|
let capped: String = new_content.chars().take(500).collect();
|
||||||
|
sqlx::query("UPDATE thoughts SET content=$2,updated_at=NOW() WHERE ap_id=$1 AND local=false")
|
||||||
|
.bind(ap_id.as_str()).bind(&capped)
|
||||||
|
.execute(&self.pool).await.map_err(|e| DomainError::Internal(e.to_string())).map(|_| ())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn retract_note(&self, ap_id: &Url) -> Result<(), DomainError> {
|
||||||
|
sqlx::query("DELETE FROM thoughts WHERE ap_id=$1 AND local=false")
|
||||||
|
.bind(ap_id.as_str())
|
||||||
|
.execute(&self.pool).await.map_err(|e| DomainError::Internal(e.to_string())).map(|_| ())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn retract_actor_notes(&self, actor_ap_url: &Url) -> Result<(), DomainError> {
|
||||||
|
sqlx::query(
|
||||||
|
"DELETE FROM thoughts WHERE local=false AND user_id=(SELECT id FROM users WHERE ap_id=$1)"
|
||||||
|
)
|
||||||
|
.bind(actor_ap_url.as_str())
|
||||||
|
.execute(&self.pool).await.map_err(|e| DomainError::Internal(e.to_string())).map(|_| ())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn count_local_notes(&self) -> Result<u64, DomainError> {
|
||||||
|
let n: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM thoughts WHERE local=true")
|
||||||
|
.fetch_one(&self.pool).await.map_err(|e| DomainError::Internal(e.to_string()))?;
|
||||||
|
Ok(n as u64)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
- [ ] **Add `pub mod activitypub;`** to `crates/adapters/postgres/src/lib.rs` — append alongside the other module declarations.
|
||||||
|
|
||||||
|
- [ ] **Run:** `DATABASE_URL=postgres://postgres:postgres@localhost:5434/postgres cargo test -p postgres activitypub`
|
||||||
|
Expected: 3 tests pass.
|
||||||
|
|
||||||
|
- [ ] **Commit:**
|
||||||
|
```bash
|
||||||
|
git add crates/adapters/postgres/src/activitypub.rs crates/adapters/postgres/src/lib.rs
|
||||||
|
git commit -m "feat(postgres): PgActivityPubRepository implementing ActivityPubRepository port"
|
||||||
|
```
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
### Task 3: activitypub adapter — use the port, drop postgres dep
|
||||||
|
|
||||||
|
**Files:**
|
||||||
|
- Modify: `crates/adapters/activitypub/src/handler.rs`
|
||||||
|
- Modify: `crates/adapters/activitypub/Cargo.toml`
|
||||||
|
|
||||||
|
- [ ] **Rewrite `crates/adapters/activitypub/src/handler.rs`:**
|
||||||
|
|
||||||
|
```rust
|
||||||
|
use std::sync::Arc;
|
||||||
|
use anyhow::{anyhow, Result};
|
||||||
|
use async_trait::async_trait;
|
||||||
|
use chrono::{DateTime, Utc};
|
||||||
|
use url::Url;
|
||||||
|
|
||||||
|
use activitypub_base::ApObjectHandler;
|
||||||
|
use domain::ports::ActivityPubRepository;
|
||||||
|
use domain::value_objects::UserId;
|
||||||
|
use crate::note::ThoughtNote;
|
||||||
|
use crate::urls::ThoughtsUrls;
|
||||||
|
|
||||||
|
pub struct ThoughtsObjectHandler {
|
||||||
|
repo: Arc<dyn ActivityPubRepository>,
|
||||||
|
urls: ThoughtsUrls,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl ThoughtsObjectHandler {
|
||||||
|
pub fn new(repo: Arc<dyn ActivityPubRepository>, base_url: &str) -> Self {
|
||||||
|
Self { repo, urls: ThoughtsUrls::new(base_url) }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[async_trait]
|
||||||
|
impl ApObjectHandler for ThoughtsObjectHandler {
|
||||||
|
async fn get_local_objects_for_user(
|
||||||
|
&self,
|
||||||
|
user_id: uuid::Uuid,
|
||||||
|
) -> Result<Vec<(Url, serde_json::Value)>> {
|
||||||
|
let uid = UserId::from_uuid(user_id);
|
||||||
|
let entries = self.repo.outbox_entries_for_actor(&uid).await
|
||||||
|
.map_err(|e| anyhow!("{e}"))?;
|
||||||
|
entries.into_iter().map(|e| {
|
||||||
|
let note_url = self.urls.thought_url(e.thought.id.as_uuid());
|
||||||
|
let actor_url = self.urls.user_url(e.author_username.as_str());
|
||||||
|
let followers = self.urls.user_followers(e.author_username.as_str());
|
||||||
|
let in_reply_to = e.thought.in_reply_to_id.map(|id| self.urls.thought_url(id.as_uuid()));
|
||||||
|
let note = ThoughtNote::new_public(
|
||||||
|
note_url.clone(), actor_url,
|
||||||
|
e.thought.content.as_str().to_owned(),
|
||||||
|
e.thought.created_at, in_reply_to,
|
||||||
|
e.thought.sensitive, e.thought.content_warning, followers,
|
||||||
|
);
|
||||||
|
Ok((note_url, serde_json::to_value(¬e)?))
|
||||||
|
}).collect()
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn get_local_objects_page(
|
||||||
|
&self,
|
||||||
|
user_id: uuid::Uuid,
|
||||||
|
before: Option<DateTime<Utc>>,
|
||||||
|
limit: usize,
|
||||||
|
) -> Result<Vec<(Url, serde_json::Value, DateTime<Utc>)>> {
|
||||||
|
let uid = UserId::from_uuid(user_id);
|
||||||
|
let entries = self.repo.outbox_page_for_actor(&uid, before, limit).await
|
||||||
|
.map_err(|e| anyhow!("{e}"))?;
|
||||||
|
entries.into_iter().map(|e| {
|
||||||
|
let created_at = e.thought.created_at;
|
||||||
|
let note_url = self.urls.thought_url(e.thought.id.as_uuid());
|
||||||
|
let actor_url = self.urls.user_url(e.author_username.as_str());
|
||||||
|
let followers = self.urls.user_followers(e.author_username.as_str());
|
||||||
|
let in_reply_to = e.thought.in_reply_to_id.map(|id| self.urls.thought_url(id.as_uuid()));
|
||||||
|
let note = ThoughtNote::new_public(
|
||||||
|
note_url.clone(), actor_url,
|
||||||
|
e.thought.content.as_str().to_owned(),
|
||||||
|
created_at, in_reply_to,
|
||||||
|
e.thought.sensitive, e.thought.content_warning, followers,
|
||||||
|
);
|
||||||
|
Ok((note_url, serde_json::to_value(¬e)?, created_at))
|
||||||
|
}).collect()
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn on_create(
|
||||||
|
&self,
|
||||||
|
ap_id: &Url,
|
||||||
|
actor_url: &Url,
|
||||||
|
object: serde_json::Value,
|
||||||
|
) -> Result<()> {
|
||||||
|
let note: ThoughtNote = serde_json::from_value(object)?;
|
||||||
|
let author_id = self.repo.intern_remote_actor(actor_url).await
|
||||||
|
.map_err(|e| anyhow!("{e}"))?;
|
||||||
|
self.repo.accept_note(
|
||||||
|
ap_id, &author_id,
|
||||||
|
¬e.content,
|
||||||
|
note.published,
|
||||||
|
note.sensitive,
|
||||||
|
note.summary,
|
||||||
|
).await.map_err(|e| anyhow!("{e}"))
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn on_update(
|
||||||
|
&self,
|
||||||
|
ap_id: &Url,
|
||||||
|
_actor_url: &Url,
|
||||||
|
object: serde_json::Value,
|
||||||
|
) -> Result<()> {
|
||||||
|
let note: ThoughtNote = serde_json::from_value(object)?;
|
||||||
|
self.repo.apply_note_update(ap_id, ¬e.content).await
|
||||||
|
.map_err(|e| anyhow!("{e}"))
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn on_delete(&self, ap_id: &Url, _actor_url: &Url) -> Result<()> {
|
||||||
|
self.repo.retract_note(ap_id).await.map_err(|e| anyhow!("{e}"))
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn on_actor_removed(&self, actor_url: &Url) -> Result<()> {
|
||||||
|
self.repo.retract_actor_notes(actor_url).await.map_err(|e| anyhow!("{e}"))
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn count_local_posts(&self) -> Result<u64> {
|
||||||
|
self.repo.count_local_notes().await.map_err(|e| anyhow!("{e}"))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
- [ ] **Rewrite `crates/adapters/activitypub/Cargo.toml`** — remove `postgres` and `sqlx`:
|
||||||
|
|
||||||
|
```toml
|
||||||
|
[package]
|
||||||
|
name = "activitypub"
|
||||||
|
version = "0.1.0"
|
||||||
|
edition = "2021"
|
||||||
|
|
||||||
|
[dependencies]
|
||||||
|
activitypub-base = { workspace = true }
|
||||||
|
activitypub_federation = "0.7.0-beta.11"
|
||||||
|
domain = { workspace = true }
|
||||||
|
url = { workspace = true }
|
||||||
|
serde = { workspace = true }
|
||||||
|
serde_json = { workspace = true }
|
||||||
|
anyhow = { workspace = true }
|
||||||
|
chrono = { workspace = true }
|
||||||
|
uuid = { workspace = true }
|
||||||
|
async-trait = { workspace = true }
|
||||||
|
tracing = { workspace = true }
|
||||||
|
```
|
||||||
|
|
||||||
|
- [ ] **Run:** `cargo check -p activitypub`
|
||||||
|
Expected: no errors. If there are unused import warnings for `sqlx` or `PgPool` — those are now gone, so the check should be clean.
|
||||||
|
|
||||||
|
- [ ] **Run full test suite:** `DATABASE_URL=postgres://postgres:postgres@localhost:5434/postgres cargo test --workspace`
|
||||||
|
Expected: all 67 tests pass (handler.rs has no unit tests of its own, but the workspace test suite must stay green).
|
||||||
|
|
||||||
|
- [ ] **Commit:**
|
||||||
|
```bash
|
||||||
|
git add crates/adapters/activitypub/
|
||||||
|
git commit -m "refactor(activitypub): ThoughtsObjectHandler uses ActivityPubRepository port, drops postgres dep"
|
||||||
|
```
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
### Task 4: Presentation — wire PgActivityPubRepository
|
||||||
|
|
||||||
|
**Files:**
|
||||||
|
- Modify: `crates/presentation/src/lib.rs`
|
||||||
|
|
||||||
|
The current `build_state` in `src/lib.rs` calls `ThoughtsObjectHandler::new(pool.clone(), &base_url)`. After Task 3, the signature changed to `ThoughtsObjectHandler::new(repo: Arc<dyn ActivityPubRepository>, base_url: &str)`.
|
||||||
|
|
||||||
|
- [ ] **Update the import and wiring in `crates/presentation/src/lib.rs`:**
|
||||||
|
|
||||||
|
Find the existing import line:
|
||||||
|
```rust
|
||||||
|
use activitypub::ThoughtsObjectHandler;
|
||||||
|
```
|
||||||
|
|
||||||
|
Add alongside it:
|
||||||
|
```rust
|
||||||
|
use postgres::activitypub::PgActivityPubRepository;
|
||||||
|
```
|
||||||
|
|
||||||
|
Find the existing call:
|
||||||
|
```rust
|
||||||
|
std::sync::Arc::new(ThoughtsObjectHandler::new(pool.clone(), &base_url)),
|
||||||
|
```
|
||||||
|
|
||||||
|
Replace with:
|
||||||
|
```rust
|
||||||
|
std::sync::Arc::new(ThoughtsObjectHandler::new(
|
||||||
|
std::sync::Arc::new(PgActivityPubRepository::new(pool.clone())),
|
||||||
|
&base_url,
|
||||||
|
)),
|
||||||
|
```
|
||||||
|
|
||||||
|
- [ ] **Run:** `cargo build -p presentation`
|
||||||
|
Expected: clean build, no errors.
|
||||||
|
|
||||||
|
- [ ] **Run full test suite:**
|
||||||
|
```bash
|
||||||
|
DATABASE_URL=postgres://postgres:postgres@localhost:5434/postgres cargo test --workspace 2>&1 | tail -3
|
||||||
|
```
|
||||||
|
Expected: all tests pass.
|
||||||
|
|
||||||
|
- [ ] **Verify dependency is gone:**
|
||||||
|
```bash
|
||||||
|
cargo tree -p activitypub | grep postgres
|
||||||
|
```
|
||||||
|
Expected: no output — `activitypub` no longer depends on `postgres`.
|
||||||
|
|
||||||
|
- [ ] **Commit:**
|
||||||
|
```bash
|
||||||
|
git add crates/presentation/src/lib.rs
|
||||||
|
git commit -m "fix: wire PgActivityPubRepository into ThoughtsObjectHandler — closes activitypub→postgres violation"
|
||||||
|
```
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## Self-Review
|
||||||
|
|
||||||
|
**Spec coverage:**
|
||||||
|
- ✅ `OutboxEntry` struct with `thought: Thought` + `author_username: Username`
|
||||||
|
- ✅ `ActivityPubRepository` trait in `domain/src/ports.rs` — 9 methods with federation vocabulary
|
||||||
|
- ✅ `TestStore impl ActivityPubRepository` — idempotent `intern_remote_actor`, empty stubs for others
|
||||||
|
- ✅ 2 domain unit tests covering idempotency and empty outbox
|
||||||
|
- ✅ `PgActivityPubRepository` in `postgres/src/activitypub.rs` — all 9 methods
|
||||||
|
- ✅ 3 postgres integration tests
|
||||||
|
- ✅ `ThoughtsObjectHandler` drops `PgPool`, receives `Arc<dyn ActivityPubRepository>`
|
||||||
|
- ✅ `activitypub/Cargo.toml` removes `postgres` and `sqlx` deps
|
||||||
|
- ✅ Presentation wires `PgActivityPubRepository` → `ThoughtsObjectHandler`
|
||||||
|
- ✅ `cargo tree` verification confirms violation is resolved
|
||||||
|
|
||||||
|
**Placeholder scan:** None.
|
||||||
|
|
||||||
|
**Type consistency:**
|
||||||
|
- `OutboxEntry` defined in `domain/src/ports.rs`, imported as `domain::ports::OutboxEntry` in postgres — consistent
|
||||||
|
- `ThoughtsObjectHandler::new(repo: Arc<dyn ActivityPubRepository>, base_url: &str)` — matches presentation wiring
|
||||||
|
- `PgActivityPubRepository::new(pool: PgPool)` — matches presentation wiring
|
||||||
|
- All 9 method signatures identical between trait definition (Task 1) and impl (Task 2) and handler calls (Task 3)
|
||||||
Reference in New Issue
Block a user