Refactor code for improved readability and consistency

- Simplified error handling in `PostgresApContentQuery` and `SqliteApContentQuery` by aligning the formatting of `try_get` calls.
- Removed unnecessary line breaks and improved formatting in various repository implementations for better readability.
- Consolidated imports in `lib.rs` and `factory.rs` to maintain a cleaner structure.
- Enhanced consistency in async function signatures across multiple files.
- Updated test helpers and use cases to streamline code and improve clarity.
- Refactored `InMemory` repositories to enhance readability by aligning method implementations.
This commit is contained in:
2026-05-29 10:58:44 +02:00
parent 412ab12695
commit 68a939f6c4
23 changed files with 578 additions and 224 deletions

View File

@@ -4,12 +4,12 @@ use chrono::{NaiveDateTime, Utc};
use sqlx::{PgPool, Row};
use activitypub::RemoteReviewRepository;
use domain::models::{RemoteWatchlistEntry, Review, ReviewSource};
use domain::ports::RemoteWatchlistRepository;
use k_ap::{
ActivityRepository, ActorRepository, BlockedDomain, BlocklistRepository, FollowRepository,
Follower, FollowerStatus, FollowingStatus, RemoteActor,
};
use domain::models::{RemoteWatchlistEntry, Review, ReviewSource};
use domain::ports::RemoteWatchlistRepository;
fn datetime_to_str(dt: &NaiveDateTime) -> String {
dt.format("%Y-%m-%d %H:%M:%S").to_string()
@@ -116,7 +116,11 @@ impl FollowRepository for PostgresFederationRepository {
Ok(row)
}
async fn remove_follower(&self, local_user_id: uuid::Uuid, remote_actor_url: &str) -> Result<()> {
async fn remove_follower(
&self,
local_user_id: uuid::Uuid,
remote_actor_url: &str,
) -> Result<()> {
let uid = local_user_id.to_string();
sqlx::query("DELETE FROM ap_followers WHERE local_user_id = $1 AND remote_actor_url = $2")
.bind(&uid)
@@ -134,13 +138,24 @@ impl FollowRepository for PostgresFederationRepository {
WHERE f.local_user_id = $1"
);
let rows = sqlx::query(&q).bind(&uid).fetch_all(&self.pool).await?;
Ok(rows.iter().map(|row| {
let status_str: String = row.get("status");
Follower { actor: pg_remote_actor(row, "remote_actor_url"), status: str_to_status(&status_str) }
}).collect())
Ok(rows
.iter()
.map(|row| {
let status_str: String = row.get("status");
Follower {
actor: pg_remote_actor(row, "remote_actor_url"),
status: str_to_status(&status_str),
}
})
.collect())
}
async fn get_followers_page(&self, local_user_id: uuid::Uuid, offset: u32, limit: usize) -> Result<Vec<Follower>> {
async fn get_followers_page(
&self,
local_user_id: uuid::Uuid,
offset: u32,
limit: usize,
) -> Result<Vec<Follower>> {
let uid = local_user_id.to_string();
let q = format!(
"SELECT f.remote_actor_url, f.status, {PG_ACTOR_COLS}
@@ -148,22 +163,41 @@ impl FollowRepository for PostgresFederationRepository {
WHERE f.local_user_id = $1 AND f.status = 'accepted'
ORDER BY f.created_at ASC LIMIT $2 OFFSET $3"
);
let rows = sqlx::query(&q).bind(&uid).bind(limit as i64).bind(offset as i64).fetch_all(&self.pool).await?;
Ok(rows.iter().map(|row| {
let status_str: String = row.get("status");
Follower { actor: pg_remote_actor(row, "remote_actor_url"), status: str_to_status(&status_str) }
}).collect())
let rows = sqlx::query(&q)
.bind(&uid)
.bind(limit as i64)
.bind(offset as i64)
.fetch_all(&self.pool)
.await?;
Ok(rows
.iter()
.map(|row| {
let status_str: String = row.get("status");
Follower {
actor: pg_remote_actor(row, "remote_actor_url"),
status: str_to_status(&status_str),
}
})
.collect())
}
async fn count_followers(&self, local_user_id: uuid::Uuid) -> Result<usize> {
let uid = local_user_id.to_string();
let count: i64 = sqlx::query_scalar(
"SELECT COUNT(*) FROM ap_followers WHERE local_user_id = $1 AND status = 'accepted'",
).bind(&uid).fetch_one(&self.pool).await?;
)
.bind(&uid)
.fetch_one(&self.pool)
.await?;
Ok(count as usize)
}
async fn update_follower_status(&self, local_user_id: uuid::Uuid, remote_actor_url: &str, status: FollowerStatus) -> Result<()> {
async fn update_follower_status(
&self,
local_user_id: uuid::Uuid,
remote_actor_url: &str,
status: FollowerStatus,
) -> Result<()> {
let uid = local_user_id.to_string();
let status_str = status_to_str(&status);
let result = sqlx::query(
@@ -183,10 +217,16 @@ impl FollowRepository for PostgresFederationRepository {
WHERE f.local_user_id = $1 AND f.status = 'pending'"
);
let rows = sqlx::query(&q).bind(&uid).fetch_all(&self.pool).await?;
Ok(rows.iter().map(|row| pg_remote_actor(row, "remote_actor_url")).collect())
Ok(rows
.iter()
.map(|row| pg_remote_actor(row, "remote_actor_url"))
.collect())
}
async fn get_accepted_follower_inboxes(&self, local_user_id: uuid::Uuid) -> Result<Vec<String>> {
async fn get_accepted_follower_inboxes(
&self,
local_user_id: uuid::Uuid,
) -> Result<Vec<String>> {
let uid = local_user_id.to_string();
let rows = sqlx::query(
"SELECT DISTINCT COALESCE(a.shared_inbox_url, a.inbox_url) as inbox
@@ -196,19 +236,33 @@ impl FollowRepository for PostgresFederationRepository {
AND f.remote_actor_url NOT IN (
SELECT remote_actor_url FROM blocked_actors WHERE local_user_id = $1
)",
).bind(&uid).fetch_all(&self.pool).await?;
Ok(rows.iter().filter_map(|r| r.try_get::<String, _>("inbox").ok()).collect())
)
.bind(&uid)
.fetch_all(&self.pool)
.await?;
Ok(rows
.iter()
.filter_map(|r| r.try_get::<String, _>("inbox").ok())
.collect())
}
async fn count_accepted_followers(&self, local_user_id: uuid::Uuid) -> Result<usize> {
let uid = local_user_id.to_string();
let count: i64 = sqlx::query_scalar(
"SELECT COUNT(*) FROM ap_followers WHERE local_user_id = $1 AND status = 'accepted'",
).bind(&uid).fetch_one(&self.pool).await?;
)
.bind(&uid)
.fetch_one(&self.pool)
.await?;
Ok(count as usize)
}
async fn get_accepted_followers_page(&self, local_user_id: uuid::Uuid, offset: u32, limit: usize) -> Result<Vec<RemoteActor>> {
async fn get_accepted_followers_page(
&self,
local_user_id: uuid::Uuid,
offset: u32,
limit: usize,
) -> Result<Vec<RemoteActor>> {
let uid = local_user_id.to_string();
let q = format!(
"SELECT f.remote_actor_url, {PG_ACTOR_COLS}
@@ -216,11 +270,24 @@ impl FollowRepository for PostgresFederationRepository {
WHERE f.local_user_id = $1 AND f.status = 'accepted'
ORDER BY f.created_at ASC LIMIT $2 OFFSET $3"
);
let rows = sqlx::query(&q).bind(&uid).bind(limit as i64).bind(offset as i64).fetch_all(&self.pool).await?;
Ok(rows.iter().map(|row| pg_remote_actor(row, "remote_actor_url")).collect())
let rows = sqlx::query(&q)
.bind(&uid)
.bind(limit as i64)
.bind(offset as i64)
.fetch_all(&self.pool)
.await?;
Ok(rows
.iter()
.map(|row| pg_remote_actor(row, "remote_actor_url"))
.collect())
}
async fn add_following(&self, local_user_id: uuid::Uuid, actor: RemoteActor, follow_activity_id: &str) -> Result<()> {
async fn add_following(
&self,
local_user_id: uuid::Uuid,
actor: RemoteActor,
follow_activity_id: &str,
) -> Result<()> {
let uid = local_user_id.to_string();
let now = Utc::now().naive_utc();
let created_at = datetime_to_str(&now);
@@ -232,7 +299,11 @@ impl FollowRepository for PostgresFederationRepository {
Ok(())
}
async fn get_follow_activity_id(&self, local_user_id: uuid::Uuid, remote_actor_url: &str) -> Result<Option<String>> {
async fn get_follow_activity_id(
&self,
local_user_id: uuid::Uuid,
remote_actor_url: &str,
) -> Result<Option<String>> {
let uid = local_user_id.to_string();
let row: Option<String> = sqlx::query_scalar(
"SELECT follow_activity_id FROM ap_following WHERE local_user_id = $1 AND remote_actor_url = $2",
@@ -243,7 +314,10 @@ impl FollowRepository for PostgresFederationRepository {
async fn remove_following(&self, local_user_id: uuid::Uuid, actor_url: &str) -> Result<()> {
let uid = local_user_id.to_string();
sqlx::query("DELETE FROM ap_following WHERE local_user_id = $1 AND remote_actor_url = $2")
.bind(&uid).bind(actor_url).execute(&self.pool).await?;
.bind(&uid)
.bind(actor_url)
.execute(&self.pool)
.await?;
Ok(())
}
@@ -262,11 +336,19 @@ impl FollowRepository for PostgresFederationRepository {
let uid = local_user_id.to_string();
let count: i64 = sqlx::query_scalar(
"SELECT COUNT(*) FROM ap_following WHERE local_user_id = $1 AND status = 'accepted'",
).bind(&uid).fetch_one(&self.pool).await?;
)
.bind(&uid)
.fetch_one(&self.pool)
.await?;
Ok(count as usize)
}
async fn get_following_page(&self, local_user_id: uuid::Uuid, offset: u32, limit: usize) -> Result<Vec<RemoteActor>> {
async fn get_following_page(
&self,
local_user_id: uuid::Uuid,
offset: u32,
limit: usize,
) -> Result<Vec<RemoteActor>> {
let uid = local_user_id.to_string();
let q = format!(
"SELECT a.url, {PG_ACTOR_COLS}
@@ -274,13 +356,26 @@ impl FollowRepository for PostgresFederationRepository {
WHERE f.local_user_id = $1 AND f.status = 'accepted'
ORDER BY f.created_at ASC LIMIT $2 OFFSET $3"
);
let rows = sqlx::query(&q).bind(&uid).bind(limit as i64).bind(offset as i64).fetch_all(&self.pool).await?;
let rows = sqlx::query(&q)
.bind(&uid)
.bind(limit as i64)
.bind(offset as i64)
.fetch_all(&self.pool)
.await?;
Ok(rows.iter().map(|row| pg_remote_actor(row, "url")).collect())
}
async fn update_following_status(&self, local_user_id: uuid::Uuid, remote_actor_url: &str, status: FollowingStatus) -> Result<()> {
async fn update_following_status(
&self,
local_user_id: uuid::Uuid,
remote_actor_url: &str,
status: FollowingStatus,
) -> Result<()> {
let uid = local_user_id.to_string();
let status_str = match status { FollowingStatus::Pending => "pending", FollowingStatus::Accepted => "accepted" };
let status_str = match status {
FollowingStatus::Pending => "pending",
FollowingStatus::Accepted => "accepted",
};
let result = sqlx::query(
"UPDATE ap_following SET status = $1 WHERE local_user_id = $2 AND remote_actor_url = $3",
).bind(status_str).bind(&uid).bind(remote_actor_url).execute(&self.pool).await?;
@@ -290,7 +385,11 @@ impl FollowRepository for PostgresFederationRepository {
Ok(())
}
async fn get_following_outbox_url(&self, local_user_id: uuid::Uuid, remote_actor_url: &str) -> Result<Option<String>> {
async fn get_following_outbox_url(
&self,
local_user_id: uuid::Uuid,
remote_actor_url: &str,
) -> Result<Option<String>> {
let uid = local_user_id.to_string();
let row: Option<Option<String>> = sqlx::query_scalar(
"SELECT a.outbox_url FROM ap_following f INNER JOIN ap_remote_actors a ON a.url = f.remote_actor_url
@@ -299,33 +398,53 @@ impl FollowRepository for PostgresFederationRepository {
Ok(row.flatten())
}
async fn migrate_follower_actor(&self, old_actor_url: &str, new_actor_url: &str) -> Result<Vec<uuid::Uuid>> {
async fn migrate_follower_actor(
&self,
old_actor_url: &str,
new_actor_url: &str,
) -> Result<Vec<uuid::Uuid>> {
let candidates: Vec<String> = sqlx::query_scalar(
"SELECT local_user_id FROM ap_following WHERE remote_actor_url = $1
AND local_user_id NOT IN (SELECT local_user_id FROM ap_following WHERE remote_actor_url = $2)",
).bind(old_actor_url).bind(new_actor_url).fetch_all(&self.pool).await?;
if candidates.is_empty() { return Ok(vec![]); }
if candidates.is_empty() {
return Ok(vec![]);
}
sqlx::query(
"UPDATE ap_following SET remote_actor_url = $1 WHERE remote_actor_url = $2
AND local_user_id NOT IN (SELECT local_user_id FROM ap_following WHERE remote_actor_url = $1)",
).bind(new_actor_url).bind(old_actor_url).execute(&self.pool).await?;
candidates.into_iter().map(|s| uuid::Uuid::parse_str(&s).map_err(|e| anyhow::anyhow!(e))).collect()
candidates
.into_iter()
.map(|s| uuid::Uuid::parse_str(&s).map_err(|e| anyhow::anyhow!(e)))
.collect()
}
}
#[async_trait]
impl ActorRepository for PostgresFederationRepository {
async fn get_local_actor_keypair(&self, user_id: uuid::Uuid) -> Result<Option<(String, String)>> {
async fn get_local_actor_keypair(
&self,
user_id: uuid::Uuid,
) -> Result<Option<(String, String)>> {
let uid = user_id.to_string();
let row = sqlx::query("SELECT public_key, private_key FROM ap_local_actors WHERE user_id = $1")
.bind(&uid).fetch_optional(&self.pool).await?;
let row =
sqlx::query("SELECT public_key, private_key FROM ap_local_actors WHERE user_id = $1")
.bind(&uid)
.fetch_optional(&self.pool)
.await?;
Ok(row.map(|r| (r.get("public_key"), r.get("private_key"))))
}
async fn save_local_actor_keypair(&self, user_id: uuid::Uuid, public_key: String, private_key: String) -> Result<()> {
async fn save_local_actor_keypair(
&self,
user_id: uuid::Uuid,
public_key: String,
private_key: String,
) -> Result<()> {
let uid = user_id.to_string();
let now = Utc::now().naive_utc();
let created_at = datetime_to_str(&now);
@@ -360,14 +479,21 @@ impl ActorRepository for PostgresFederationRepository {
}
async fn get_remote_actor(&self, actor_url: &str) -> Result<Option<RemoteActor>> {
let q = format!(
"SELECT url, {PG_ACTOR_COLS} FROM ap_remote_actors a WHERE url = $1"
);
let row = sqlx::query(&q).bind(actor_url).fetch_optional(&self.pool).await?;
let q = format!("SELECT url, {PG_ACTOR_COLS} FROM ap_remote_actors a WHERE url = $1");
let row = sqlx::query(&q)
.bind(actor_url)
.fetch_optional(&self.pool)
.await?;
Ok(row.as_ref().map(|r| pg_remote_actor(r, "url")))
}
async fn add_announce(&self, activity_id: &str, object_url: &str, actor_url: &str, announced_at: chrono::DateTime<chrono::Utc>) -> Result<()> {
async fn add_announce(
&self,
activity_id: &str,
object_url: &str,
actor_url: &str,
announced_at: chrono::DateTime<chrono::Utc>,
) -> Result<()> {
let ts = announced_at.format("%Y-%m-%d %H:%M:%S").to_string();
sqlx::query("INSERT INTO ap_announces (id, object_url, actor_url, announced_at) VALUES ($1, $2, $3, $4) ON CONFLICT (id) DO NOTHING")
.bind(activity_id).bind(object_url).bind(actor_url).bind(&ts).execute(&self.pool).await?;
@@ -376,13 +502,18 @@ impl ActorRepository for PostgresFederationRepository {
async fn remove_announce(&self, activity_id: &str, actor_url: &str) -> Result<()> {
sqlx::query("DELETE FROM ap_announces WHERE id = $1 AND actor_url = $2")
.bind(activity_id).bind(actor_url).execute(&self.pool).await?;
.bind(activity_id)
.bind(actor_url)
.execute(&self.pool)
.await?;
Ok(())
}
async fn count_announces(&self, object_url: &str) -> Result<usize> {
let row = sqlx::query("SELECT COUNT(*) as cnt FROM ap_announces WHERE object_url = $1")
.bind(object_url).fetch_one(&self.pool).await?;
.bind(object_url)
.fetch_one(&self.pool)
.await?;
Ok(row.get::<i64, _>("cnt") as usize)
}
}
@@ -396,17 +527,33 @@ impl BlocklistRepository for PostgresFederationRepository {
Ok(())
}
async fn remove_blocked_domain(&self, domain: &str) -> Result<()> {
sqlx::query("DELETE FROM blocked_domains WHERE domain = $1").bind(domain).execute(&self.pool).await?;
sqlx::query("DELETE FROM blocked_domains WHERE domain = $1")
.bind(domain)
.execute(&self.pool)
.await?;
Ok(())
}
async fn get_blocked_domains(&self) -> Result<Vec<BlockedDomain>> {
let rows = sqlx::query("SELECT domain, reason, blocked_at FROM blocked_domains ORDER BY blocked_at DESC")
.fetch_all(&self.pool).await?;
Ok(rows.iter().map(|r| BlockedDomain { domain: r.get("domain"), reason: r.get("reason"), blocked_at: r.get("blocked_at") }).collect())
let rows = sqlx::query(
"SELECT domain, reason, blocked_at FROM blocked_domains ORDER BY blocked_at DESC",
)
.fetch_all(&self.pool)
.await?;
Ok(rows
.iter()
.map(|r| BlockedDomain {
domain: r.get("domain"),
reason: r.get("reason"),
blocked_at: r.get("blocked_at"),
})
.collect())
}
async fn is_domain_blocked(&self, domain: &str) -> Result<bool> {
let count: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM blocked_domains WHERE domain = $1")
.bind(domain).fetch_one(&self.pool).await?;
let count: i64 =
sqlx::query_scalar("SELECT COUNT(*) FROM blocked_domains WHERE domain = $1")
.bind(domain)
.fetch_one(&self.pool)
.await?;
Ok(count > 0)
}
async fn add_blocked_actor(&self, local_user_id: uuid::Uuid, actor_url: &str) -> Result<()> {
@@ -418,15 +565,23 @@ impl BlocklistRepository for PostgresFederationRepository {
}
async fn remove_blocked_actor(&self, local_user_id: uuid::Uuid, actor_url: &str) -> Result<()> {
let uid = local_user_id.to_string();
sqlx::query("DELETE FROM blocked_actors WHERE local_user_id = $1 AND remote_actor_url = $2")
.bind(&uid).bind(actor_url).execute(&self.pool).await?;
sqlx::query(
"DELETE FROM blocked_actors WHERE local_user_id = $1 AND remote_actor_url = $2",
)
.bind(&uid)
.bind(actor_url)
.execute(&self.pool)
.await?;
Ok(())
}
async fn get_blocked_actors(&self, local_user_id: uuid::Uuid) -> Result<Vec<String>> {
let uid = local_user_id.to_string();
let rows = sqlx::query("SELECT remote_actor_url FROM blocked_actors WHERE local_user_id = $1 ORDER BY blocked_at DESC")
.bind(&uid).fetch_all(&self.pool).await?;
Ok(rows.iter().map(|r| r.get::<String, _>("remote_actor_url")).collect())
Ok(rows
.iter()
.map(|r| r.get::<String, _>("remote_actor_url"))
.collect())
}
async fn is_actor_blocked(&self, local_user_id: uuid::Uuid, actor_url: &str) -> Result<bool> {
let uid = local_user_id.to_string();
@@ -440,13 +595,20 @@ impl BlocklistRepository for PostgresFederationRepository {
impl ActivityRepository for PostgresFederationRepository {
async fn is_activity_processed(&self, activity_id: &str) -> Result<bool> {
let count: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM ap_activities WHERE id = $1")
.bind(activity_id).fetch_one(&self.pool).await?;
.bind(activity_id)
.fetch_one(&self.pool)
.await?;
Ok(count > 0)
}
async fn mark_activity_processed(&self, activity_id: &str) -> Result<()> {
let ts = datetime_to_str(&Utc::now().naive_utc());
sqlx::query("INSERT INTO ap_activities (id, processed_at) VALUES ($1, $2) ON CONFLICT DO NOTHING")
.bind(activity_id).bind(&ts).execute(&self.pool).await?;
sqlx::query(
"INSERT INTO ap_activities (id, processed_at) VALUES ($1, $2) ON CONFLICT DO NOTHING",
)
.bind(activity_id)
.bind(&ts)
.execute(&self.pool)
.await?;
Ok(())
}
}