refactor(activitypub): ThoughtsObjectHandler uses ActivityPubRepository port, drops postgres dep
This commit is contained in:
@@ -7,8 +7,6 @@ edition = "2021"
|
|||||||
activitypub-base = { workspace = true }
|
activitypub-base = { workspace = true }
|
||||||
activitypub_federation = "0.7.0-beta.11"
|
activitypub_federation = "0.7.0-beta.11"
|
||||||
domain = { workspace = true }
|
domain = { workspace = true }
|
||||||
postgres = { workspace = true }
|
|
||||||
sqlx = { workspace = true }
|
|
||||||
url = { workspace = true }
|
url = { workspace = true }
|
||||||
serde = { workspace = true }
|
serde = { workspace = true }
|
||||||
serde_json = { workspace = true }
|
serde_json = { workspace = true }
|
||||||
|
|||||||
@@ -1,137 +1,113 @@
|
|||||||
|
use std::sync::Arc;
|
||||||
use anyhow::{anyhow, Result};
|
use anyhow::{anyhow, Result};
|
||||||
use async_trait::async_trait;
|
use async_trait::async_trait;
|
||||||
use chrono::{DateTime, Utc};
|
use chrono::{DateTime, Utc};
|
||||||
use sqlx::PgPool;
|
|
||||||
use url::Url;
|
use url::Url;
|
||||||
|
|
||||||
use activitypub_base::ApObjectHandler;
|
use activitypub_base::ApObjectHandler;
|
||||||
|
use domain::ports::ActivityPubRepository;
|
||||||
|
use domain::value_objects::UserId;
|
||||||
use crate::note::ThoughtNote;
|
use crate::note::ThoughtNote;
|
||||||
use crate::urls::ThoughtsUrls;
|
use crate::urls::ThoughtsUrls;
|
||||||
|
|
||||||
pub struct ThoughtsObjectHandler {
|
pub struct ThoughtsObjectHandler {
|
||||||
pool: PgPool,
|
repo: Arc<dyn ActivityPubRepository>,
|
||||||
urls: ThoughtsUrls,
|
urls: ThoughtsUrls,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl ThoughtsObjectHandler {
|
impl ThoughtsObjectHandler {
|
||||||
pub fn new(pool: PgPool, base_url: &str) -> Self {
|
pub fn new(repo: Arc<dyn ActivityPubRepository>, base_url: &str) -> Self {
|
||||||
Self { pool, urls: ThoughtsUrls::new(base_url) }
|
Self { repo, urls: ThoughtsUrls::new(base_url) }
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
impl ApObjectHandler for ThoughtsObjectHandler {
|
impl ApObjectHandler for ThoughtsObjectHandler {
|
||||||
async fn get_local_objects_for_user(&self, user_id: uuid::Uuid) -> Result<Vec<(Url, serde_json::Value)>> {
|
async fn get_local_objects_for_user(
|
||||||
#[derive(sqlx::FromRow)]
|
&self,
|
||||||
struct Row { id: uuid::Uuid, content: String, created_at: DateTime<Utc>, in_reply_to_id: Option<uuid::Uuid>, content_warning: Option<String>, sensitive: bool, username: String }
|
user_id: uuid::Uuid,
|
||||||
let rows = sqlx::query_as::<_, Row>(
|
) -> Result<Vec<(Url, serde_json::Value)>> {
|
||||||
"SELECT t.id, t.content, t.created_at, t.in_reply_to_id, t.content_warning, t.sensitive, u.username
|
let uid = UserId::from_uuid(user_id);
|
||||||
FROM thoughts t JOIN users u ON u.id=t.user_id
|
let entries = self.repo.outbox_entries_for_actor(&uid).await
|
||||||
WHERE t.user_id=$1 AND t.local=true AND t.visibility='public'"
|
.map_err(|e| anyhow!("{e}"))?;
|
||||||
).bind(user_id).fetch_all(&self.pool).await.map_err(|e| anyhow!(e))?;
|
entries.into_iter().map(|e| {
|
||||||
|
let note_url = self.urls.thought_url(e.thought.id.as_uuid());
|
||||||
let mut result = Vec::new();
|
let actor_url = self.urls.user_url(e.author_username.as_str());
|
||||||
for r in rows {
|
let followers = self.urls.user_followers(e.author_username.as_str());
|
||||||
let note_url = self.urls.thought_url(r.id);
|
let in_reply_to = e.thought.in_reply_to_id.map(|id| self.urls.thought_url(id.as_uuid()));
|
||||||
let actor_url = self.urls.user_url(&r.username);
|
let note = ThoughtNote::new_public(
|
||||||
let followers_url = self.urls.user_followers(&r.username);
|
note_url.clone(), actor_url,
|
||||||
let in_reply_to = r.in_reply_to_id.map(|id| self.urls.thought_url(id));
|
e.thought.content.as_str().to_owned(),
|
||||||
let note = ThoughtNote::new_public(note_url.clone(), actor_url, r.content, r.created_at, in_reply_to, r.sensitive, r.content_warning, followers_url);
|
e.thought.created_at, in_reply_to,
|
||||||
result.push((note_url, serde_json::to_value(¬e)?));
|
e.thought.sensitive, e.thought.content_warning, followers,
|
||||||
}
|
);
|
||||||
Ok(result)
|
Ok((note_url, serde_json::to_value(¬e)?))
|
||||||
|
}).collect()
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn get_local_objects_page(
|
async fn get_local_objects_page(
|
||||||
&self, user_id: uuid::Uuid, before: Option<DateTime<Utc>>, limit: usize,
|
&self,
|
||||||
|
user_id: uuid::Uuid,
|
||||||
|
before: Option<DateTime<Utc>>,
|
||||||
|
limit: usize,
|
||||||
) -> Result<Vec<(Url, serde_json::Value, DateTime<Utc>)>> {
|
) -> Result<Vec<(Url, serde_json::Value, DateTime<Utc>)>> {
|
||||||
#[derive(sqlx::FromRow)]
|
let uid = UserId::from_uuid(user_id);
|
||||||
struct Row { id: uuid::Uuid, content: String, created_at: DateTime<Utc>, in_reply_to_id: Option<uuid::Uuid>, content_warning: Option<String>, sensitive: bool, username: String }
|
let entries = self.repo.outbox_page_for_actor(&uid, before, limit).await
|
||||||
let rows = if let Some(before) = before {
|
.map_err(|e| anyhow!("{e}"))?;
|
||||||
sqlx::query_as::<_, Row>(
|
entries.into_iter().map(|e| {
|
||||||
"SELECT t.id, t.content, t.created_at, t.in_reply_to_id, t.content_warning, t.sensitive, u.username
|
let created_at = e.thought.created_at;
|
||||||
FROM thoughts t JOIN users u ON u.id=t.user_id
|
let note_url = self.urls.thought_url(e.thought.id.as_uuid());
|
||||||
WHERE t.user_id=$1 AND t.local=true AND t.visibility='public' AND t.created_at < $2
|
let actor_url = self.urls.user_url(e.author_username.as_str());
|
||||||
ORDER BY t.created_at DESC LIMIT $3"
|
let followers = self.urls.user_followers(e.author_username.as_str());
|
||||||
).bind(user_id).bind(before).bind(limit as i64).fetch_all(&self.pool).await
|
let in_reply_to = e.thought.in_reply_to_id.map(|id| self.urls.thought_url(id.as_uuid()));
|
||||||
} else {
|
let note = ThoughtNote::new_public(
|
||||||
sqlx::query_as::<_, Row>(
|
note_url.clone(), actor_url,
|
||||||
"SELECT t.id, t.content, t.created_at, t.in_reply_to_id, t.content_warning, t.sensitive, u.username
|
e.thought.content.as_str().to_owned(),
|
||||||
FROM thoughts t JOIN users u ON u.id=t.user_id
|
created_at, in_reply_to,
|
||||||
WHERE t.user_id=$1 AND t.local=true AND t.visibility='public'
|
e.thought.sensitive, e.thought.content_warning, followers,
|
||||||
ORDER BY t.created_at DESC LIMIT $2"
|
);
|
||||||
).bind(user_id).bind(limit as i64).fetch_all(&self.pool).await
|
Ok((note_url, serde_json::to_value(¬e)?, created_at))
|
||||||
}.map_err(|e| anyhow!(e))?;
|
}).collect()
|
||||||
|
|
||||||
let mut result = Vec::new();
|
|
||||||
for r in rows {
|
|
||||||
let note_url = self.urls.thought_url(r.id);
|
|
||||||
let actor_url = self.urls.user_url(&r.username);
|
|
||||||
let followers_url = self.urls.user_followers(&r.username);
|
|
||||||
let in_reply_to = r.in_reply_to_id.map(|id| self.urls.thought_url(id));
|
|
||||||
let note = ThoughtNote::new_public(note_url.clone(), actor_url, r.content.clone(), r.created_at, in_reply_to, r.sensitive, r.content_warning, followers_url);
|
|
||||||
result.push((note_url, serde_json::to_value(¬e)?, r.created_at));
|
|
||||||
}
|
|
||||||
Ok(result)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn on_create(&self, ap_id: &Url, actor_url: &Url, object: serde_json::Value) -> Result<()> {
|
async fn on_create(
|
||||||
|
&self,
|
||||||
|
ap_id: &Url,
|
||||||
|
actor_url: &Url,
|
||||||
|
object: serde_json::Value,
|
||||||
|
) -> Result<()> {
|
||||||
let note: ThoughtNote = serde_json::from_value(object)?;
|
let note: ThoughtNote = serde_json::from_value(object)?;
|
||||||
let actor_url_str = actor_url.to_string();
|
let author_id = self.repo.intern_remote_actor(actor_url).await
|
||||||
|
.map_err(|e| anyhow!("{e}"))?;
|
||||||
// Find or create a remote user placeholder
|
self.repo.accept_note(
|
||||||
let existing: Option<uuid::Uuid> = sqlx::query_scalar(
|
ap_id, &author_id,
|
||||||
"SELECT id FROM users WHERE ap_id=$1"
|
¬e.content,
|
||||||
).bind(&actor_url_str).fetch_optional(&self.pool).await.map_err(|e| anyhow!(e))?;
|
note.published,
|
||||||
|
note.sensitive,
|
||||||
let user_id = match existing {
|
note.summary,
|
||||||
Some(id) => id,
|
).await.map_err(|e| anyhow!("{e}"))
|
||||||
None => {
|
|
||||||
let uid = uuid::Uuid::new_v4();
|
|
||||||
let handle = actor_url.path().trim_start_matches('/').replace('/', "_");
|
|
||||||
sqlx::query(
|
|
||||||
"INSERT INTO users(id,username,email,password_hash,local,ap_id,created_at,updated_at)
|
|
||||||
VALUES($1,$2,$3,'',false,$4,NOW(),NOW()) ON CONFLICT DO NOTHING"
|
|
||||||
).bind(uid).bind(&handle).bind(format!("{}@remote", uid)).bind(&actor_url_str)
|
|
||||||
.execute(&self.pool).await.map_err(|e| anyhow!(e))?;
|
|
||||||
uid
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
let thought_id = uuid::Uuid::new_v4();
|
|
||||||
let content: String = note.content.chars().take(500).collect();
|
|
||||||
sqlx::query(
|
|
||||||
"INSERT INTO thoughts(id,user_id,content,ap_id,visibility,sensitive,local,content_warning,created_at)
|
|
||||||
VALUES($1,$2,$3,$4,'public',$5,false,$6,$7) ON CONFLICT(ap_id) DO NOTHING"
|
|
||||||
).bind(thought_id).bind(user_id).bind(&content).bind(ap_id.as_str())
|
|
||||||
.bind(note.sensitive).bind(note.summary).bind(note.published)
|
|
||||||
.execute(&self.pool).await.map_err(|e| anyhow!(e)).map(|_| ())
|
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn on_update(&self, ap_id: &Url, _actor_url: &Url, object: serde_json::Value) -> Result<()> {
|
async fn on_update(
|
||||||
|
&self,
|
||||||
|
ap_id: &Url,
|
||||||
|
_actor_url: &Url,
|
||||||
|
object: serde_json::Value,
|
||||||
|
) -> Result<()> {
|
||||||
let note: ThoughtNote = serde_json::from_value(object)?;
|
let note: ThoughtNote = serde_json::from_value(object)?;
|
||||||
let content: String = note.content.chars().take(500).collect();
|
self.repo.apply_note_update(ap_id, ¬e.content).await
|
||||||
sqlx::query("UPDATE thoughts SET content=$2, updated_at=NOW() WHERE ap_id=$1")
|
.map_err(|e| anyhow!("{e}"))
|
||||||
.bind(ap_id.as_str()).bind(&content)
|
|
||||||
.execute(&self.pool).await.map_err(|e| anyhow!(e)).map(|_| ())
|
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn on_delete(&self, ap_id: &Url, _actor_url: &Url) -> Result<()> {
|
async fn on_delete(&self, ap_id: &Url, _actor_url: &Url) -> Result<()> {
|
||||||
sqlx::query("DELETE FROM thoughts WHERE ap_id=$1 AND local=false")
|
self.repo.retract_note(ap_id).await.map_err(|e| anyhow!("{e}"))
|
||||||
.bind(ap_id.as_str())
|
|
||||||
.execute(&self.pool).await.map_err(|e| anyhow!(e)).map(|_| ())
|
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn on_actor_removed(&self, actor_url: &Url) -> Result<()> {
|
async fn on_actor_removed(&self, actor_url: &Url) -> Result<()> {
|
||||||
sqlx::query(
|
self.repo.retract_actor_notes(actor_url).await.map_err(|e| anyhow!("{e}"))
|
||||||
"DELETE FROM thoughts WHERE local=false AND user_id=(SELECT id FROM users WHERE ap_id=$1)"
|
|
||||||
).bind(actor_url.as_str())
|
|
||||||
.execute(&self.pool).await.map_err(|e| anyhow!(e)).map(|_| ())
|
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn count_local_posts(&self) -> Result<u64> {
|
async fn count_local_posts(&self) -> Result<u64> {
|
||||||
let n: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM thoughts WHERE local=true")
|
self.repo.count_local_notes().await.map_err(|e| anyhow!("{e}"))
|
||||||
.fetch_one(&self.pool).await.map_err(|e| anyhow!(e))?;
|
|
||||||
Ok(n as u64)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user