feat(activitypub): index hashtags from incoming federated notes
Some checks failed
lint / lint (push) Has been cancelled
test / unit (push) Has been cancelled
test / integration (push) Has been cancelled
lint / lint (pull_request) Failing after 5m2s
test / unit (pull_request) Successful in 16m11s
test / integration (pull_request) Failing after 18m6s
Some checks failed
lint / lint (push) Has been cancelled
test / unit (push) Has been cancelled
test / integration (push) Has been cancelled
lint / lint (pull_request) Failing after 5m2s
test / unit (pull_request) Successful in 16m11s
test / integration (pull_request) Failing after 18m6s
- accept_note now returns ThoughtId (INSERT then SELECT by ap_id) - ThoughtsObjectHandler extracts Hashtag entries from AP tag array, strips #, lowercases - Calls TagRepository.find_or_create + attach_to_thought for each tag; failures silenced - TagRepository injected into handler via bootstrap and worker factories
This commit is contained in:
@@ -72,7 +72,7 @@ pub trait ActivityPubRepository: Send + Sync {
|
|||||||
content_warning: Option<String>,
|
content_warning: Option<String>,
|
||||||
visibility: &str,
|
visibility: &str,
|
||||||
in_reply_to: Option<&str>,
|
in_reply_to: Option<&str>,
|
||||||
) -> Result<(), DomainError>;
|
) -> Result<ThoughtId, DomainError>;
|
||||||
|
|
||||||
/// Apply an Update to a previously accepted remote Note.
|
/// Apply an Update to a previously accepted remote Note.
|
||||||
async fn apply_note_update(&self, ap_id: &str, new_content: &str) -> Result<(), DomainError>;
|
async fn apply_note_update(&self, ap_id: &str, new_content: &str) -> Result<(), DomainError>;
|
||||||
|
|||||||
@@ -10,13 +10,14 @@ use url::Url;
|
|||||||
use crate::note::ThoughtNote;
|
use crate::note::ThoughtNote;
|
||||||
use crate::urls::ThoughtsUrls;
|
use crate::urls::ThoughtsUrls;
|
||||||
use activitypub_base::{ActivityPubRepository, ApObjectHandler};
|
use activitypub_base::{ActivityPubRepository, ApObjectHandler};
|
||||||
use domain::ports::EventPublisher;
|
use domain::ports::{EventPublisher, TagRepository};
|
||||||
use domain::value_objects::UserId;
|
use domain::value_objects::UserId;
|
||||||
|
|
||||||
pub struct ThoughtsObjectHandler {
|
pub struct ThoughtsObjectHandler {
|
||||||
repo: Arc<dyn ActivityPubRepository>,
|
repo: Arc<dyn ActivityPubRepository>,
|
||||||
urls: ThoughtsUrls,
|
urls: ThoughtsUrls,
|
||||||
event_publisher: Option<Arc<dyn EventPublisher>>,
|
event_publisher: Option<Arc<dyn EventPublisher>>,
|
||||||
|
tag_repo: Arc<dyn TagRepository>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl ThoughtsObjectHandler {
|
impl ThoughtsObjectHandler {
|
||||||
@@ -24,11 +25,13 @@ impl ThoughtsObjectHandler {
|
|||||||
repo: Arc<dyn ActivityPubRepository>,
|
repo: Arc<dyn ActivityPubRepository>,
|
||||||
base_url: &str,
|
base_url: &str,
|
||||||
event_publisher: Option<Arc<dyn EventPublisher>>,
|
event_publisher: Option<Arc<dyn EventPublisher>>,
|
||||||
|
tag_repo: Arc<dyn TagRepository>,
|
||||||
) -> Self {
|
) -> Self {
|
||||||
Self {
|
Self {
|
||||||
repo,
|
repo,
|
||||||
urls: ThoughtsUrls::new(base_url),
|
urls: ThoughtsUrls::new(base_url),
|
||||||
event_publisher,
|
event_publisher,
|
||||||
|
tag_repo,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -138,7 +141,7 @@ impl ApObjectHandler for ThoughtsObjectHandler {
|
|||||||
"direct"
|
"direct"
|
||||||
};
|
};
|
||||||
|
|
||||||
self.repo
|
let thought_id = self.repo
|
||||||
.accept_note(
|
.accept_note(
|
||||||
ap_id.as_str(),
|
ap_id.as_str(),
|
||||||
&author_id,
|
&author_id,
|
||||||
@@ -152,6 +155,22 @@ impl ApObjectHandler for ThoughtsObjectHandler {
|
|||||||
.await
|
.await
|
||||||
.map_err(|e| anyhow!("{e}"))?;
|
.map_err(|e| anyhow!("{e}"))?;
|
||||||
|
|
||||||
|
// Extract and index hashtags from the AP tag array.
|
||||||
|
let hashtag_names: Vec<String> = note
|
||||||
|
.tag
|
||||||
|
.iter()
|
||||||
|
.filter(|t| t.get("type").and_then(|v| v.as_str()) == Some("Hashtag"))
|
||||||
|
.filter_map(|t| t.get("name").and_then(|v| v.as_str()))
|
||||||
|
.map(|name| name.trim_start_matches('#').to_lowercase())
|
||||||
|
.filter(|name| !name.is_empty())
|
||||||
|
.collect();
|
||||||
|
|
||||||
|
for name in hashtag_names {
|
||||||
|
if let Ok(tag) = self.tag_repo.find_or_create(&name).await {
|
||||||
|
let _ = self.tag_repo.attach_to_thought(&thought_id, tag.id).await;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Fire mention notifications for local @mentions in the note's tag array.
|
// Fire mention notifications for local @mentions in the note's tag array.
|
||||||
let base_url = url::Url::parse(&self.urls.base_url)
|
let base_url = url::Url::parse(&self.urls.base_url)
|
||||||
.ok()
|
.ok()
|
||||||
|
|||||||
@@ -220,7 +220,7 @@ impl ActivityPubRepository for PgActivityPubRepository {
|
|||||||
content_warning: Option<String>,
|
content_warning: Option<String>,
|
||||||
visibility: &str,
|
visibility: &str,
|
||||||
in_reply_to: Option<&str>,
|
in_reply_to: Option<&str>,
|
||||||
) -> Result<(), DomainError> {
|
) -> Result<ThoughtId, DomainError> {
|
||||||
let capped: String = content.chars().take(MAX_REMOTE_CONTENT_CHARS).collect();
|
let capped: String = content.chars().take(MAX_REMOTE_CONTENT_CHARS).collect();
|
||||||
let (in_reply_to_id, in_reply_to_url) = match in_reply_to {
|
let (in_reply_to_id, in_reply_to_url) = match in_reply_to {
|
||||||
Some(url) => {
|
Some(url) => {
|
||||||
@@ -251,8 +251,16 @@ impl ActivityPubRepository for PgActivityPubRepository {
|
|||||||
.bind(&in_reply_to_url)
|
.bind(&in_reply_to_url)
|
||||||
.execute(&self.pool)
|
.execute(&self.pool)
|
||||||
.await
|
.await
|
||||||
.into_domain()
|
.into_domain()?;
|
||||||
.map(|_| ())
|
|
||||||
|
// SELECT the id — works whether the INSERT was a no-op or not (idempotent).
|
||||||
|
let row: (uuid::Uuid,) =
|
||||||
|
sqlx::query_as("SELECT id FROM thoughts WHERE ap_id=$1")
|
||||||
|
.bind(ap_id)
|
||||||
|
.fetch_one(&self.pool)
|
||||||
|
.await
|
||||||
|
.into_domain()?;
|
||||||
|
Ok(ThoughtId::from_uuid(row.0))
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn apply_note_update(&self, ap_id: &str, new_content: &str) -> Result<(), DomainError> {
|
async fn apply_note_update(&self, ap_id: &str, new_content: &str) -> Result<(), DomainError> {
|
||||||
@@ -365,4 +373,34 @@ mod tests {
|
|||||||
let repo = PgActivityPubRepository::new(pool);
|
let repo = PgActivityPubRepository::new(pool);
|
||||||
assert_eq!(repo.count_local_notes().await.unwrap(), 0);
|
assert_eq!(repo.count_local_notes().await.unwrap(), 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[sqlx::test(migrations = "./migrations")]
|
||||||
|
async fn accept_note_returns_thought_id(pool: sqlx::PgPool) {
|
||||||
|
let repo = PgActivityPubRepository::new(pool.clone());
|
||||||
|
let actor_user_id = repo
|
||||||
|
.intern_remote_actor("https://remote.example/users/alice")
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
let thought_id = repo
|
||||||
|
.accept_note(
|
||||||
|
"https://remote.example/notes/1",
|
||||||
|
&actor_user_id,
|
||||||
|
"Hello #rust world",
|
||||||
|
chrono::Utc::now(),
|
||||||
|
false,
|
||||||
|
None,
|
||||||
|
"public",
|
||||||
|
None,
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
let row: (uuid::Uuid,) = sqlx::query_as("SELECT id FROM thoughts WHERE ap_id=$1")
|
||||||
|
.bind("https://remote.example/notes/1")
|
||||||
|
.fetch_one(&pool)
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
assert_eq!(thought_id.as_uuid(), row.0);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -103,8 +103,8 @@ impl ActivityPubRepository for TestApRepo {
|
|||||||
_content_warning: Option<String>,
|
_content_warning: Option<String>,
|
||||||
_visibility: &str,
|
_visibility: &str,
|
||||||
_in_reply_to: Option<&str>,
|
_in_reply_to: Option<&str>,
|
||||||
) -> Result<(), DomainError> {
|
) -> Result<ThoughtId, DomainError> {
|
||||||
Ok(())
|
Ok(ThoughtId::from_uuid(uuid::Uuid::new_v4()))
|
||||||
}
|
}
|
||||||
async fn apply_note_update(
|
async fn apply_note_update(
|
||||||
&self,
|
&self,
|
||||||
|
|||||||
@@ -77,6 +77,7 @@ pub async fn build(cfg: &Config) -> Infrastructure {
|
|||||||
Arc::new(PgActivityPubRepository::new(pool.clone())),
|
Arc::new(PgActivityPubRepository::new(pool.clone())),
|
||||||
&cfg.base_url,
|
&cfg.base_url,
|
||||||
Some(event_publisher.clone()),
|
Some(event_publisher.clone()),
|
||||||
|
Arc::new(postgres::tag::PgTagRepository::new(pool.clone())),
|
||||||
)),
|
)),
|
||||||
cfg.base_url.clone(),
|
cfg.base_url.clone(),
|
||||||
cfg.allow_registration,
|
cfg.allow_registration,
|
||||||
|
|||||||
@@ -76,8 +76,8 @@ impl ActivityPubRepository for NoOpApRepo {
|
|||||||
_content_warning: Option<String>,
|
_content_warning: Option<String>,
|
||||||
_visibility: &str,
|
_visibility: &str,
|
||||||
_in_reply_to: Option<&str>,
|
_in_reply_to: Option<&str>,
|
||||||
) -> Result<(), DomainError> {
|
) -> Result<ThoughtId, DomainError> {
|
||||||
Ok(())
|
Ok(ThoughtId::from_uuid(uuid::Uuid::new_v4()))
|
||||||
}
|
}
|
||||||
async fn apply_note_update(
|
async fn apply_note_update(
|
||||||
&self,
|
&self,
|
||||||
|
|||||||
@@ -49,6 +49,7 @@ pub async fn build(database_url: &str, base_url: &str, nats_url: &str) -> Worker
|
|||||||
Arc::new(PgActivityPubRepository::new(pool.clone())),
|
Arc::new(PgActivityPubRepository::new(pool.clone())),
|
||||||
base_url,
|
base_url,
|
||||||
None,
|
None,
|
||||||
|
Arc::new(postgres::tag::PgTagRepository::new(pool.clone())),
|
||||||
)),
|
)),
|
||||||
base_url.to_string(),
|
base_url.to_string(),
|
||||||
false,
|
false,
|
||||||
|
|||||||
Reference in New Issue
Block a user