Some checks failed
lint / lint (push) Has been cancelled
test / unit (push) Has been cancelled
test / integration (push) Has been cancelled
lint / lint (pull_request) Failing after 9m33s
test / unit (pull_request) Failing after 10m46s
test / integration (pull_request) Failing after 16m54s
299 lines
10 KiB
Rust
299 lines
10 KiB
Rust
use async_trait::async_trait;
|
|
use chrono::{DateTime, Utc};
|
|
use sqlx::PgPool;
|
|
use url::Url;
|
|
|
|
use domain::{
|
|
errors::DomainError,
|
|
models::thought::{Thought, Visibility},
|
|
ports::{ActivityPubRepository, OutboxEntry},
|
|
value_objects::{Content, ThoughtId, UserId, Username},
|
|
};
|
|
|
|
pub struct PgActivityPubRepository {
|
|
pool: PgPool,
|
|
}
|
|
|
|
impl PgActivityPubRepository {
|
|
pub fn new(pool: PgPool) -> Self {
|
|
Self { pool }
|
|
}
|
|
}
|
|
|
|
#[async_trait]
|
|
impl ActivityPubRepository for PgActivityPubRepository {
|
|
async fn outbox_entries_for_actor(
|
|
&self,
|
|
user_id: &UserId,
|
|
) -> Result<Vec<OutboxEntry>, DomainError> {
|
|
#[derive(sqlx::FromRow)]
|
|
struct Row {
|
|
id: uuid::Uuid,
|
|
user_id: uuid::Uuid,
|
|
content: String,
|
|
created_at: DateTime<Utc>,
|
|
in_reply_to_id: Option<uuid::Uuid>,
|
|
content_warning: Option<String>,
|
|
sensitive: bool,
|
|
username: String,
|
|
updated_at: Option<DateTime<Utc>>,
|
|
}
|
|
sqlx::query_as::<_, Row>(
|
|
"SELECT t.id, t.user_id, t.content, t.created_at, t.in_reply_to_id, t.content_warning, t.sensitive, u.username, t.updated_at
|
|
FROM thoughts t JOIN users u ON u.id=t.user_id
|
|
WHERE t.user_id=$1 AND t.local=true AND t.visibility='public'
|
|
ORDER BY t.created_at DESC",
|
|
)
|
|
.bind(user_id.as_uuid())
|
|
.fetch_all(&self.pool)
|
|
.await
|
|
.map_err(|e| DomainError::Internal(e.to_string()))
|
|
.map(|rows| {
|
|
rows.into_iter()
|
|
.map(|r| OutboxEntry {
|
|
thought: Thought {
|
|
id: ThoughtId::from_uuid(r.id),
|
|
user_id: UserId::from_uuid(r.user_id),
|
|
content: Content::new_remote(r.content),
|
|
in_reply_to_id: r.in_reply_to_id.map(ThoughtId::from_uuid),
|
|
in_reply_to_url: None,
|
|
ap_id: None,
|
|
visibility: Visibility::Public,
|
|
content_warning: r.content_warning,
|
|
sensitive: r.sensitive,
|
|
local: true,
|
|
created_at: r.created_at,
|
|
updated_at: r.updated_at,
|
|
},
|
|
author_username: Username::from_trusted(r.username),
|
|
})
|
|
.collect()
|
|
})
|
|
}
|
|
|
|
async fn outbox_page_for_actor(
|
|
&self,
|
|
user_id: &UserId,
|
|
before: Option<DateTime<Utc>>,
|
|
limit: usize,
|
|
) -> Result<Vec<OutboxEntry>, DomainError> {
|
|
#[derive(sqlx::FromRow)]
|
|
struct Row {
|
|
id: uuid::Uuid,
|
|
user_id: uuid::Uuid,
|
|
content: String,
|
|
created_at: DateTime<Utc>,
|
|
in_reply_to_id: Option<uuid::Uuid>,
|
|
content_warning: Option<String>,
|
|
sensitive: bool,
|
|
username: String,
|
|
updated_at: Option<DateTime<Utc>>,
|
|
}
|
|
let rows = if let Some(before) = before {
|
|
sqlx::query_as::<_, Row>(
|
|
"SELECT t.id, t.user_id, t.content, t.created_at, t.in_reply_to_id, t.content_warning, t.sensitive, u.username, t.updated_at
|
|
FROM thoughts t JOIN users u ON u.id=t.user_id
|
|
WHERE t.user_id=$1 AND t.local=true AND t.visibility='public' AND t.created_at < $2
|
|
ORDER BY t.created_at DESC LIMIT $3",
|
|
)
|
|
.bind(user_id.as_uuid())
|
|
.bind(before)
|
|
.bind(limit as i64)
|
|
.fetch_all(&self.pool)
|
|
.await
|
|
} else {
|
|
sqlx::query_as::<_, Row>(
|
|
"SELECT t.id, t.user_id, t.content, t.created_at, t.in_reply_to_id, t.content_warning, t.sensitive, u.username, t.updated_at
|
|
FROM thoughts t JOIN users u ON u.id=t.user_id
|
|
WHERE t.user_id=$1 AND t.local=true AND t.visibility='public'
|
|
ORDER BY t.created_at DESC LIMIT $2",
|
|
)
|
|
.bind(user_id.as_uuid())
|
|
.bind(limit as i64)
|
|
.fetch_all(&self.pool)
|
|
.await
|
|
}
|
|
.map_err(|e| DomainError::Internal(e.to_string()))?;
|
|
|
|
Ok(rows
|
|
.into_iter()
|
|
.map(|r| OutboxEntry {
|
|
thought: Thought {
|
|
id: ThoughtId::from_uuid(r.id),
|
|
user_id: UserId::from_uuid(r.user_id),
|
|
content: Content::new_remote(r.content),
|
|
in_reply_to_id: r.in_reply_to_id.map(ThoughtId::from_uuid),
|
|
in_reply_to_url: None,
|
|
ap_id: None,
|
|
visibility: Visibility::Public,
|
|
content_warning: r.content_warning,
|
|
sensitive: r.sensitive,
|
|
local: true,
|
|
created_at: r.created_at,
|
|
updated_at: r.updated_at,
|
|
},
|
|
author_username: Username::from_trusted(r.username),
|
|
})
|
|
.collect())
|
|
}
|
|
|
|
async fn find_remote_actor_id(
|
|
&self,
|
|
actor_ap_url: &Url,
|
|
) -> Result<Option<UserId>, DomainError> {
|
|
sqlx::query_scalar::<_, uuid::Uuid>("SELECT id FROM users WHERE ap_id=$1")
|
|
.bind(actor_ap_url.as_str())
|
|
.fetch_optional(&self.pool)
|
|
.await
|
|
.map_err(|e| DomainError::Internal(e.to_string()))
|
|
.map(|o| o.map(UserId::from_uuid))
|
|
}
|
|
|
|
async fn intern_remote_actor(&self, actor_ap_url: &Url) -> Result<UserId, DomainError> {
|
|
if let Some(id) = self.find_remote_actor_id(actor_ap_url).await? {
|
|
return Ok(id);
|
|
}
|
|
let new_id = uuid::Uuid::new_v4();
|
|
let raw = actor_ap_url
|
|
.path()
|
|
.trim_start_matches('/')
|
|
.replace('/', "_");
|
|
// username column is VARCHAR(32); truncate long paths (e.g. UUID-based actor URLs)
|
|
let handle = if raw.len() <= 32 {
|
|
raw
|
|
} else {
|
|
format!("remote_{}", &new_id.to_string()[..13])
|
|
};
|
|
sqlx::query(
|
|
"INSERT INTO users(id,username,email,password_hash,local,ap_id,created_at,updated_at)
|
|
VALUES($1,$2,$3,'',false,$4,NOW(),NOW()) ON CONFLICT(ap_id) DO NOTHING",
|
|
)
|
|
.bind(new_id)
|
|
.bind(&handle)
|
|
.bind(format!("{}@remote", new_id))
|
|
.bind(actor_ap_url.as_str())
|
|
.execute(&self.pool)
|
|
.await
|
|
.map_err(|e| DomainError::Internal(e.to_string()))?;
|
|
// Re-fetch to get whichever id won the race
|
|
self.find_remote_actor_id(actor_ap_url)
|
|
.await?
|
|
.ok_or_else(|| {
|
|
DomainError::Internal(
|
|
"intern_remote_actor: insert succeeded but row not found".into(),
|
|
)
|
|
})
|
|
}
|
|
|
|
async fn accept_note(
|
|
&self,
|
|
ap_id: &Url,
|
|
author_id: &UserId,
|
|
content: &str,
|
|
published: DateTime<Utc>,
|
|
sensitive: bool,
|
|
content_warning: Option<String>,
|
|
visibility: &str,
|
|
) -> Result<(), DomainError> {
|
|
let capped: String = content.chars().take(500).collect();
|
|
sqlx::query(
|
|
"INSERT INTO thoughts(id,user_id,content,ap_id,visibility,sensitive,local,content_warning,created_at)
|
|
VALUES($1,$2,$3,$4,$8,$5,false,$6,$7) ON CONFLICT(ap_id) DO NOTHING",
|
|
)
|
|
.bind(uuid::Uuid::new_v4())
|
|
.bind(author_id.as_uuid())
|
|
.bind(&capped)
|
|
.bind(ap_id.as_str())
|
|
.bind(sensitive)
|
|
.bind(content_warning)
|
|
.bind(published)
|
|
.bind(visibility)
|
|
.execute(&self.pool)
|
|
.await
|
|
.map_err(|e| DomainError::Internal(e.to_string()))
|
|
.map(|_| ())
|
|
}
|
|
|
|
async fn apply_note_update(&self, ap_id: &Url, new_content: &str) -> Result<(), DomainError> {
|
|
let capped: String = new_content.chars().take(500).collect();
|
|
sqlx::query(
|
|
"UPDATE thoughts SET content=$2,updated_at=NOW() WHERE ap_id=$1 AND local=false",
|
|
)
|
|
.bind(ap_id.as_str())
|
|
.bind(&capped)
|
|
.execute(&self.pool)
|
|
.await
|
|
.map_err(|e| DomainError::Internal(e.to_string()))
|
|
.map(|_| ())
|
|
}
|
|
|
|
async fn retract_note(&self, ap_id: &Url) -> Result<(), DomainError> {
|
|
sqlx::query("DELETE FROM thoughts WHERE ap_id=$1 AND local=false")
|
|
.bind(ap_id.as_str())
|
|
.execute(&self.pool)
|
|
.await
|
|
.map_err(|e| DomainError::Internal(e.to_string()))
|
|
.map(|_| ())
|
|
}
|
|
|
|
async fn retract_actor_notes(&self, actor_ap_url: &Url) -> Result<(), DomainError> {
|
|
sqlx::query(
|
|
"DELETE FROM thoughts WHERE local=false AND user_id=(SELECT id FROM users WHERE ap_id=$1)",
|
|
)
|
|
.bind(actor_ap_url.as_str())
|
|
.execute(&self.pool)
|
|
.await
|
|
.map_err(|e| DomainError::Internal(e.to_string()))
|
|
.map(|_| ())
|
|
}
|
|
|
|
async fn count_local_notes(&self) -> Result<u64, DomainError> {
|
|
let n: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM thoughts WHERE local=true")
|
|
.fetch_one(&self.pool)
|
|
.await
|
|
.map_err(|e| DomainError::Internal(e.to_string()))?;
|
|
Ok(n as u64)
|
|
}
|
|
}
|
|
|
|
#[cfg(test)]
mod tests {
    use super::*;
    use domain::ports::ActivityPubRepository;

    /// Number of rows in `thoughts` whose `ap_id` equals `ap_id`.
    async fn stored_notes(repo: &PgActivityPubRepository, ap_id: &url::Url) -> i64 {
        sqlx::query_scalar::<_, i64>("SELECT COUNT(*) FROM thoughts WHERE ap_id=$1")
            .bind(ap_id.as_str())
            .fetch_one(&repo.pool)
            .await
            .unwrap()
    }

    /// Interning the same actor URL twice must yield the same user id.
    #[sqlx::test(migrations = "./migrations")]
    async fn intern_remote_actor_is_idempotent(pool: sqlx::PgPool) {
        let repo = PgActivityPubRepository::new(pool);
        let url = url::Url::parse("https://mastodon.social/users/alice").unwrap();
        let id1 = repo.intern_remote_actor(&url).await.unwrap();
        let id2 = repo.intern_remote_actor(&url).await.unwrap();
        assert_eq!(id1, id2);
    }

    /// An accepted note is stored, and retracting it removes the row again.
    #[sqlx::test(migrations = "./migrations")]
    async fn accept_and_retract_note(pool: sqlx::PgPool) {
        let repo = PgActivityPubRepository::new(pool);
        let actor_url = url::Url::parse("https://remote.example/users/bob").unwrap();
        let ap_id = url::Url::parse("https://remote.example/notes/1").unwrap();
        let author = repo.intern_remote_actor(&actor_url).await.unwrap();
        repo.accept_note(
            &ap_id,
            &author,
            "hello from remote",
            chrono::Utc::now(),
            false,
            None,
            "public",
        )
        .await
        .unwrap();
        assert_eq!(stored_notes(&repo, &ap_id).await, 1);

        repo.retract_note(&ap_id).await.unwrap();
        assert_eq!(stored_notes(&repo, &ap_id).await, 0);
    }

    /// A stored remote note must not be included in the local-note count.
    #[sqlx::test(migrations = "./migrations")]
    async fn count_local_notes_excludes_remote(pool: sqlx::PgPool) {
        let repo = PgActivityPubRepository::new(pool);
        assert_eq!(repo.count_local_notes().await.unwrap(), 0);

        // Insert a remote note so the assertion below actually exercises the
        // `local=true` filter instead of counting an empty table.
        let actor_url = url::Url::parse("https://remote.example/users/carol").unwrap();
        let ap_id = url::Url::parse("https://remote.example/notes/2").unwrap();
        let author = repo.intern_remote_actor(&actor_url).await.unwrap();
        repo.accept_note(
            &ap_id,
            &author,
            "remote only",
            chrono::Utc::now(),
            false,
            None,
            "public",
        )
        .await
        .unwrap();
        assert_eq!(stored_notes(&repo, &ap_id).await, 1);

        assert_eq!(repo.count_local_notes().await.unwrap(), 0);
    }
}
|