refactor: simplify function signatures and improve code readability
This commit is contained in:
@@ -97,9 +97,7 @@ fn build_note_json(
|
|||||||
note
|
note
|
||||||
}
|
}
|
||||||
|
|
||||||
fn thought_to_ap_visibility(
|
fn thought_to_ap_visibility(v: &domain::models::thought::Visibility) -> k_ap::ApVisibility {
|
||||||
v: &domain::models::thought::Visibility,
|
|
||||||
) -> k_ap::ApVisibility {
|
|
||||||
match v {
|
match v {
|
||||||
domain::models::thought::Visibility::Public => k_ap::ApVisibility::Public,
|
domain::models::thought::Visibility::Public => k_ap::ApVisibility::Public,
|
||||||
domain::models::thought::Visibility::Unlisted => k_ap::ApVisibility::Public,
|
domain::models::thought::Visibility::Unlisted => k_ap::ApVisibility::Public,
|
||||||
|
|||||||
@@ -5,7 +5,7 @@ use sqlx::PgPool;
|
|||||||
|
|
||||||
use k_ap::{
|
use k_ap::{
|
||||||
ActivityRepository, ActorRepository, ApActorType, ApUser, ApUserRepository, BlockedDomain,
|
ActivityRepository, ActorRepository, ApActorType, ApUser, ApUserRepository, BlockedDomain,
|
||||||
BlocklistRepository, Follower, FollowerStatus, FollowingStatus, FollowRepository, RemoteActor,
|
BlocklistRepository, FollowRepository, Follower, FollowerStatus, FollowingStatus, RemoteActor,
|
||||||
};
|
};
|
||||||
|
|
||||||
// ── PostgresFederationRepository ─────────────────────────────────────────────
|
// ── PostgresFederationRepository ─────────────────────────────────────────────
|
||||||
@@ -518,13 +518,12 @@ impl FollowRepository for PostgresFederationRepository {
|
|||||||
}
|
}
|
||||||
|
|
||||||
async fn count_following(&self, local_user_id: uuid::Uuid) -> Result<usize> {
|
async fn count_following(&self, local_user_id: uuid::Uuid) -> Result<usize> {
|
||||||
let n: i64 = sqlx::query_scalar(
|
let n: i64 =
|
||||||
"SELECT COUNT(*) FROM federation_following WHERE local_user_id=$1",
|
sqlx::query_scalar("SELECT COUNT(*) FROM federation_following WHERE local_user_id=$1")
|
||||||
)
|
.bind(local_user_id)
|
||||||
.bind(local_user_id)
|
.fetch_one(&self.pool)
|
||||||
.fetch_one(&self.pool)
|
.await
|
||||||
.await
|
.map_err(|e| anyhow!(e))?;
|
||||||
.map_err(|e| anyhow!(e))?;
|
|
||||||
Ok(n as usize)
|
Ok(n as usize)
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -616,16 +615,14 @@ impl ActorRepository for PostgresFederationRepository {
|
|||||||
public_key: String,
|
public_key: String,
|
||||||
private_key: String,
|
private_key: String,
|
||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
sqlx::query(
|
sqlx::query("UPDATE users SET public_key=$2, private_key=$3, updated_at=NOW() WHERE id=$1")
|
||||||
"UPDATE users SET public_key=$2, private_key=$3, updated_at=NOW() WHERE id=$1",
|
.bind(user_id)
|
||||||
)
|
.bind(&public_key)
|
||||||
.bind(user_id)
|
.bind(&private_key)
|
||||||
.bind(&public_key)
|
.execute(&self.pool)
|
||||||
.bind(&private_key)
|
.await
|
||||||
.execute(&self.pool)
|
.map_err(|e| anyhow!(e))
|
||||||
.await
|
.map(|_| ())
|
||||||
.map_err(|e| anyhow!(e))
|
|
||||||
.map(|_| ())
|
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn upsert_remote_actor(&self, actor: RemoteActor) -> Result<()> {
|
async fn upsert_remote_actor(&self, actor: RemoteActor) -> Result<()> {
|
||||||
@@ -704,25 +701,22 @@ impl ActorRepository for PostgresFederationRepository {
|
|||||||
}
|
}
|
||||||
|
|
||||||
async fn remove_announce(&self, activity_id: &str, actor_url: &str) -> Result<()> {
|
async fn remove_announce(&self, activity_id: &str, actor_url: &str) -> Result<()> {
|
||||||
sqlx::query(
|
sqlx::query("DELETE FROM federation_announces WHERE activity_id=$1 AND actor_url=$2")
|
||||||
"DELETE FROM federation_announces WHERE activity_id=$1 AND actor_url=$2",
|
.bind(activity_id)
|
||||||
)
|
.bind(actor_url)
|
||||||
.bind(activity_id)
|
.execute(&self.pool)
|
||||||
.bind(actor_url)
|
.await
|
||||||
.execute(&self.pool)
|
.map_err(|e| anyhow!(e))
|
||||||
.await
|
.map(|_| ())
|
||||||
.map_err(|e| anyhow!(e))
|
|
||||||
.map(|_| ())
|
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn count_announces(&self, object_url: &str) -> Result<usize> {
|
async fn count_announces(&self, object_url: &str) -> Result<usize> {
|
||||||
let n: i64 = sqlx::query_scalar(
|
let n: i64 =
|
||||||
"SELECT COUNT(*) FROM federation_announces WHERE object_url=$1",
|
sqlx::query_scalar("SELECT COUNT(*) FROM federation_announces WHERE object_url=$1")
|
||||||
)
|
.bind(object_url)
|
||||||
.bind(object_url)
|
.fetch_one(&self.pool)
|
||||||
.fetch_one(&self.pool)
|
.await
|
||||||
.await
|
.map_err(|e| anyhow!(e))?;
|
||||||
.map_err(|e| anyhow!(e))?;
|
|
||||||
Ok(n as usize)
|
Ok(n as usize)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -777,13 +771,12 @@ impl BlocklistRepository for PostgresFederationRepository {
|
|||||||
}
|
}
|
||||||
|
|
||||||
async fn is_domain_blocked(&self, domain: &str) -> Result<bool> {
|
async fn is_domain_blocked(&self, domain: &str) -> Result<bool> {
|
||||||
let n: i64 = sqlx::query_scalar(
|
let n: i64 =
|
||||||
"SELECT COUNT(*) FROM federation_blocked_domains WHERE domain=$1",
|
sqlx::query_scalar("SELECT COUNT(*) FROM federation_blocked_domains WHERE domain=$1")
|
||||||
)
|
.bind(domain)
|
||||||
.bind(domain)
|
.fetch_one(&self.pool)
|
||||||
.fetch_one(&self.pool)
|
.await
|
||||||
.await
|
.map_err(|e| anyhow!(e))?;
|
||||||
.map_err(|e| anyhow!(e))?;
|
|
||||||
Ok(n > 0)
|
Ok(n > 0)
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -799,20 +792,14 @@ impl BlocklistRepository for PostgresFederationRepository {
|
|||||||
.map(|_| ())
|
.map(|_| ())
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn remove_blocked_actor(
|
async fn remove_blocked_actor(&self, local_user_id: uuid::Uuid, actor_url: &str) -> Result<()> {
|
||||||
&self,
|
sqlx::query("DELETE FROM federation_blocked_actors WHERE local_user_id=$1 AND actor_url=$2")
|
||||||
local_user_id: uuid::Uuid,
|
.bind(local_user_id)
|
||||||
actor_url: &str,
|
.bind(actor_url)
|
||||||
) -> Result<()> {
|
.execute(&self.pool)
|
||||||
sqlx::query(
|
.await
|
||||||
"DELETE FROM federation_blocked_actors WHERE local_user_id=$1 AND actor_url=$2",
|
.map_err(|e| anyhow!(e))
|
||||||
)
|
.map(|_| ())
|
||||||
.bind(local_user_id)
|
|
||||||
.bind(actor_url)
|
|
||||||
.execute(&self.pool)
|
|
||||||
.await
|
|
||||||
.map_err(|e| anyhow!(e))
|
|
||||||
.map(|_| ())
|
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn get_blocked_actors(&self, local_user_id: uuid::Uuid) -> Result<Vec<String>> {
|
async fn get_blocked_actors(&self, local_user_id: uuid::Uuid) -> Result<Vec<String>> {
|
||||||
@@ -860,8 +847,7 @@ impl PostgresApUserRepository {
|
|||||||
header_url: Option<String>,
|
header_url: Option<String>,
|
||||||
also_known_as: Option<String>,
|
also_known_as: Option<String>,
|
||||||
) -> ApUser {
|
) -> ApUser {
|
||||||
let profile_url =
|
let profile_url = url::Url::parse(&format!("{}/users/{}", self.base_url, username)).ok();
|
||||||
url::Url::parse(&format!("{}/users/{}", self.base_url, username)).ok();
|
|
||||||
let avatar_url = avatar_url.and_then(|u| url::Url::parse(&u).ok());
|
let avatar_url = avatar_url.and_then(|u| url::Url::parse(&u).ok());
|
||||||
let banner_url = header_url.and_then(|u| url::Url::parse(&u).ok());
|
let banner_url = header_url.and_then(|u| url::Url::parse(&u).ok());
|
||||||
ApUser {
|
ApUser {
|
||||||
|
|||||||
@@ -48,7 +48,11 @@ struct KapPublisher(NatsTransport);
|
|||||||
impl k_ap::data::EventPublisher for KapPublisher {
|
impl k_ap::data::EventPublisher for KapPublisher {
|
||||||
async fn publish(&self, event: FederationEvent) -> anyhow::Result<()> {
|
async fn publish(&self, event: FederationEvent) -> anyhow::Result<()> {
|
||||||
let (subject, payload) = match event {
|
let (subject, payload) = match event {
|
||||||
FederationEvent::DeliveryRequested { inbox, activity, signing_actor_id } => (
|
FederationEvent::DeliveryRequested {
|
||||||
|
inbox,
|
||||||
|
activity,
|
||||||
|
signing_actor_id,
|
||||||
|
} => (
|
||||||
"federation.delivery.requested",
|
"federation.delivery.requested",
|
||||||
serde_json::to_vec(&event_payload::EventPayload::FederationDeliveryRequested {
|
serde_json::to_vec(&event_payload::EventPayload::FederationDeliveryRequested {
|
||||||
inbox: inbox.to_string(),
|
inbox: inbox.to_string(),
|
||||||
@@ -56,7 +60,10 @@ impl k_ap::data::EventPublisher for KapPublisher {
|
|||||||
signing_actor_id: signing_actor_id.to_string(),
|
signing_actor_id: signing_actor_id.to_string(),
|
||||||
})?,
|
})?,
|
||||||
),
|
),
|
||||||
FederationEvent::BackfillRequested { owner_user_id, follower_inbox_url } => (
|
FederationEvent::BackfillRequested {
|
||||||
|
owner_user_id,
|
||||||
|
follower_inbox_url,
|
||||||
|
} => (
|
||||||
"federation.backfill.requested",
|
"federation.backfill.requested",
|
||||||
serde_json::to_vec(&event_payload::EventPayload::FederationBackfillRequested {
|
serde_json::to_vec(&event_payload::EventPayload::FederationBackfillRequested {
|
||||||
owner_user_id: owner_user_id.to_string(),
|
owner_user_id: owner_user_id.to_string(),
|
||||||
@@ -107,7 +114,9 @@ pub async fn build(cfg: &Config) -> Infrastructure {
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
let event_publisher: Arc<dyn EventPublisher> = match &nats_client {
|
let event_publisher: Arc<dyn EventPublisher> = match &nats_client {
|
||||||
Some(client) => Arc::new(EventPublisherAdapter::new(NatsTransport::new(client.clone()))),
|
Some(client) => Arc::new(EventPublisherAdapter::new(NatsTransport::new(
|
||||||
|
client.clone(),
|
||||||
|
))),
|
||||||
None => Arc::new(NoOpEventPublisher),
|
None => Arc::new(NoOpEventPublisher),
|
||||||
};
|
};
|
||||||
let kap_publisher: Option<Arc<dyn k_ap::data::EventPublisher>> = nats_client
|
let kap_publisher: Option<Arc<dyn k_ap::data::EventPublisher>> = nats_client
|
||||||
|
|||||||
@@ -133,10 +133,9 @@ async fn main() {
|
|||||||
Err(e) => {
|
Err(e) => {
|
||||||
if raw.delivery_count >= CONSUMER_MAX_DELIVER as u64 {
|
if raw.delivery_count >= CONSUMER_MAX_DELIVER as u64 {
|
||||||
// Rebuild payload from raw bytes for DLQ storage.
|
// Rebuild payload from raw bytes for DLQ storage.
|
||||||
let payload_val = serde_json::from_slice::<serde_json::Value>(
|
let payload_val =
|
||||||
&raw.payload,
|
serde_json::from_slice::<serde_json::Value>(&raw.payload)
|
||||||
)
|
.unwrap_or(serde_json::Value::Null);
|
||||||
.unwrap_or(serde_json::Value::Null);
|
|
||||||
if let Err(dlq_err) = infra
|
if let Err(dlq_err) = infra
|
||||||
.dlq_store
|
.dlq_store
|
||||||
.insert(event_type, &payload_val, &e.to_string())
|
.insert(event_type, &payload_val, &e.to_string())
|
||||||
|
|||||||
Reference in New Issue
Block a user