Compare commits
11 Commits
c30243f1c8
...
be4a37546c
| Author | SHA1 | Date | |
|---|---|---|---|
| be4a37546c | |||
| 7a2d8308d9 | |||
| 4a5d5df884 | |||
| 421cb463e3 | |||
| 925f4f8bf3 | |||
| e5c8380ba7 | |||
| 97bc918bbc | |||
| 805240aaf8 | |||
| cd6148eff9 | |||
| 6f1a0572df | |||
| 0841554dbe |
6
Cargo.lock
generated
6
Cargo.lock
generated
@@ -2015,8 +2015,8 @@ dependencies = [
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "k-ap"
|
name = "k-ap"
|
||||||
version = "0.1.9"
|
version = "0.1.10"
|
||||||
source = "git+https://git.gabrielkaszewski.dev/GKaszewski/k-ap.git?tag=v0.1.9#432f39cbb4f8d74255a1f614a9bb7c8bbfe11cde"
|
source = "git+https://git.gabrielkaszewski.dev/GKaszewski/k-ap.git?tag=v0.1.10#d80cfd0431205498161db8665fd884710866ca95"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"activitypub_federation",
|
"activitypub_federation",
|
||||||
"anyhow",
|
"anyhow",
|
||||||
@@ -4566,7 +4566,7 @@ version = "0.1.11"
|
|||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "c2a7b1c03c876122aa43f3020e6c3c3ee5c05081c9a00739faf7503aeba10d22"
|
checksum = "c2a7b1c03c876122aa43f3020e6c3c3ee5c05081c9a00739faf7503aeba10d22"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"windows-sys 0.61.2",
|
"windows-sys 0.48.0",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
|
|||||||
@@ -4,7 +4,7 @@ version = "0.1.0"
|
|||||||
edition = "2021"
|
edition = "2021"
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
k-ap = { git = "https://git.gabrielkaszewski.dev/GKaszewski/k-ap.git", tag = "v0.1.9" }
|
k-ap = { git = "https://git.gabrielkaszewski.dev/GKaszewski/k-ap.git", tag = "v0.1.10" }
|
||||||
domain = { workspace = true }
|
domain = { workspace = true }
|
||||||
url = { workspace = true }
|
url = { workspace = true }
|
||||||
serde = { workspace = true }
|
serde = { workspace = true }
|
||||||
|
|||||||
@@ -807,6 +807,28 @@ impl FederationFollowRequestPort for ApFederationAdapter {
|
|||||||
.await
|
.await
|
||||||
.map_err(|e| DomainError::ExternalService(e.to_string()))
|
.map_err(|e| DomainError::ExternalService(e.to_string()))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn mark_follower_accepted(
|
||||||
|
&self,
|
||||||
|
user_id: &UserId,
|
||||||
|
actor_url: &str,
|
||||||
|
) -> Result<(), DomainError> {
|
||||||
|
self.inner
|
||||||
|
.mark_follower_accepted(user_id.as_uuid(), actor_url)
|
||||||
|
.await
|
||||||
|
.map_err(|e| DomainError::Internal(e.to_string()))
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn mark_follower_rejected(
|
||||||
|
&self,
|
||||||
|
user_id: &UserId,
|
||||||
|
actor_url: &str,
|
||||||
|
) -> Result<(), DomainError> {
|
||||||
|
self.inner
|
||||||
|
.mark_follower_rejected(user_id.as_uuid(), actor_url)
|
||||||
|
.await
|
||||||
|
.map_err(|e| DomainError::Internal(e.to_string()))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// FederationActionPort is a blanket supertrait; no explicit impl needed.
|
// FederationActionPort is a blanket supertrait; no explicit impl needed.
|
||||||
|
|||||||
@@ -71,6 +71,18 @@ pub enum EventPayload {
|
|||||||
ProfileUpdated {
|
ProfileUpdated {
|
||||||
user_id: String,
|
user_id: String,
|
||||||
},
|
},
|
||||||
|
RemoteFollowAccepted {
|
||||||
|
local_user_id: String,
|
||||||
|
remote_actor_url: String,
|
||||||
|
},
|
||||||
|
RemoteFollowRejected {
|
||||||
|
local_user_id: String,
|
||||||
|
remote_actor_url: String,
|
||||||
|
},
|
||||||
|
ActorMoved {
|
||||||
|
user_id: String,
|
||||||
|
new_actor_url: String,
|
||||||
|
},
|
||||||
MentionReceived {
|
MentionReceived {
|
||||||
thought_id: String,
|
thought_id: String,
|
||||||
mentioned_user_id: String,
|
mentioned_user_id: String,
|
||||||
@@ -97,6 +109,9 @@ impl EventPayload {
|
|||||||
Self::UserUnblocked { .. } => "users.unblocked",
|
Self::UserUnblocked { .. } => "users.unblocked",
|
||||||
Self::UserRegistered { .. } => "users.registered",
|
Self::UserRegistered { .. } => "users.registered",
|
||||||
Self::ProfileUpdated { .. } => "users.profile_updated",
|
Self::ProfileUpdated { .. } => "users.profile_updated",
|
||||||
|
Self::RemoteFollowAccepted { .. } => "federation.follow.accepted",
|
||||||
|
Self::RemoteFollowRejected { .. } => "federation.follow.rejected",
|
||||||
|
Self::ActorMoved { .. } => "federation.actor.moved",
|
||||||
Self::MentionReceived { .. } => "mentions.received",
|
Self::MentionReceived { .. } => "mentions.received",
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -210,6 +225,27 @@ impl From<&DomainEvent> for EventPayload {
|
|||||||
DomainEvent::ProfileUpdated { user_id } => Self::ProfileUpdated {
|
DomainEvent::ProfileUpdated { user_id } => Self::ProfileUpdated {
|
||||||
user_id: user_id.to_string(),
|
user_id: user_id.to_string(),
|
||||||
},
|
},
|
||||||
|
DomainEvent::RemoteFollowAccepted {
|
||||||
|
local_user_id,
|
||||||
|
remote_actor_url,
|
||||||
|
} => Self::RemoteFollowAccepted {
|
||||||
|
local_user_id: local_user_id.to_string(),
|
||||||
|
remote_actor_url: remote_actor_url.clone(),
|
||||||
|
},
|
||||||
|
DomainEvent::RemoteFollowRejected {
|
||||||
|
local_user_id,
|
||||||
|
remote_actor_url,
|
||||||
|
} => Self::RemoteFollowRejected {
|
||||||
|
local_user_id: local_user_id.to_string(),
|
||||||
|
remote_actor_url: remote_actor_url.clone(),
|
||||||
|
},
|
||||||
|
DomainEvent::ActorMoved {
|
||||||
|
user_id,
|
||||||
|
new_actor_url,
|
||||||
|
} => Self::ActorMoved {
|
||||||
|
user_id: user_id.to_string(),
|
||||||
|
new_actor_url: new_actor_url.clone(),
|
||||||
|
},
|
||||||
DomainEvent::MentionReceived {
|
DomainEvent::MentionReceived {
|
||||||
thought_id,
|
thought_id,
|
||||||
mentioned_user_id,
|
mentioned_user_id,
|
||||||
@@ -340,6 +376,27 @@ impl TryFrom<EventPayload> for DomainEvent {
|
|||||||
EventPayload::ProfileUpdated { user_id } => DomainEvent::ProfileUpdated {
|
EventPayload::ProfileUpdated { user_id } => DomainEvent::ProfileUpdated {
|
||||||
user_id: UserId::from_uuid(parse_uuid(&user_id, "user_id")?),
|
user_id: UserId::from_uuid(parse_uuid(&user_id, "user_id")?),
|
||||||
},
|
},
|
||||||
|
EventPayload::RemoteFollowAccepted {
|
||||||
|
local_user_id,
|
||||||
|
remote_actor_url,
|
||||||
|
} => DomainEvent::RemoteFollowAccepted {
|
||||||
|
local_user_id: UserId::from_uuid(parse_uuid(&local_user_id, "local_user_id")?),
|
||||||
|
remote_actor_url,
|
||||||
|
},
|
||||||
|
EventPayload::RemoteFollowRejected {
|
||||||
|
local_user_id,
|
||||||
|
remote_actor_url,
|
||||||
|
} => DomainEvent::RemoteFollowRejected {
|
||||||
|
local_user_id: UserId::from_uuid(parse_uuid(&local_user_id, "local_user_id")?),
|
||||||
|
remote_actor_url,
|
||||||
|
},
|
||||||
|
EventPayload::ActorMoved {
|
||||||
|
user_id,
|
||||||
|
new_actor_url,
|
||||||
|
} => DomainEvent::ActorMoved {
|
||||||
|
user_id: UserId::from_uuid(parse_uuid(&user_id, "user_id")?),
|
||||||
|
new_actor_url,
|
||||||
|
},
|
||||||
EventPayload::MentionReceived {
|
EventPayload::MentionReceived {
|
||||||
thought_id,
|
thought_id,
|
||||||
mentioned_user_id,
|
mentioned_user_id,
|
||||||
|
|||||||
@@ -4,7 +4,7 @@ version = "0.1.0"
|
|||||||
edition = "2021"
|
edition = "2021"
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
k-ap = { git = "https://git.gabrielkaszewski.dev/GKaszewski/k-ap.git", tag = "v0.1.9" }
|
k-ap = { git = "https://git.gabrielkaszewski.dev/GKaszewski/k-ap.git", tag = "v0.1.10" }
|
||||||
sqlx = { workspace = true }
|
sqlx = { workspace = true }
|
||||||
uuid = { workspace = true }
|
uuid = { workspace = true }
|
||||||
chrono = { workspace = true }
|
chrono = { workspace = true }
|
||||||
|
|||||||
@@ -32,6 +32,9 @@ fn aggregate_id(event: &DomainEvent) -> Uuid {
|
|||||||
DomainEvent::UserUnblocked { blocker_id, .. } => blocker_id.as_uuid(),
|
DomainEvent::UserUnblocked { blocker_id, .. } => blocker_id.as_uuid(),
|
||||||
DomainEvent::UserRegistered { user_id } => user_id.as_uuid(),
|
DomainEvent::UserRegistered { user_id } => user_id.as_uuid(),
|
||||||
DomainEvent::ProfileUpdated { user_id } => user_id.as_uuid(),
|
DomainEvent::ProfileUpdated { user_id } => user_id.as_uuid(),
|
||||||
|
DomainEvent::RemoteFollowAccepted { local_user_id, .. } => local_user_id.as_uuid(),
|
||||||
|
DomainEvent::RemoteFollowRejected { local_user_id, .. } => local_user_id.as_uuid(),
|
||||||
|
DomainEvent::ActorMoved { user_id, .. } => user_id.as_uuid(),
|
||||||
DomainEvent::MentionReceived { thought_id, .. } => thought_id.as_uuid(),
|
DomainEvent::MentionReceived { thought_id, .. } => thought_id.as_uuid(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -0,0 +1,56 @@
|
|||||||
|
use domain::{errors::DomainError, events::DomainEvent, ports::FederationActionPort};
|
||||||
|
use std::sync::Arc;
|
||||||
|
|
||||||
|
pub struct FederationManagementEventService {
|
||||||
|
pub federation: Arc<dyn FederationActionPort>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl FederationManagementEventService {
|
||||||
|
pub async fn process(&self, event: &DomainEvent) -> Result<(), DomainError> {
|
||||||
|
match event {
|
||||||
|
DomainEvent::RemoteFollowAccepted {
|
||||||
|
local_user_id,
|
||||||
|
remote_actor_url,
|
||||||
|
} => {
|
||||||
|
tracing::info!(
|
||||||
|
local_user_id = %local_user_id,
|
||||||
|
actor = %remote_actor_url,
|
||||||
|
"federation-mgmt: accepting follow — sending Accept + backfill"
|
||||||
|
);
|
||||||
|
self.federation
|
||||||
|
.accept_follow_request(local_user_id, remote_actor_url)
|
||||||
|
.await
|
||||||
|
}
|
||||||
|
DomainEvent::RemoteFollowRejected {
|
||||||
|
local_user_id,
|
||||||
|
remote_actor_url,
|
||||||
|
} => {
|
||||||
|
tracing::info!(
|
||||||
|
local_user_id = %local_user_id,
|
||||||
|
actor = %remote_actor_url,
|
||||||
|
"federation-mgmt: rejecting follow — sending Reject"
|
||||||
|
);
|
||||||
|
self.federation
|
||||||
|
.reject_follow_request(local_user_id, remote_actor_url)
|
||||||
|
.await
|
||||||
|
}
|
||||||
|
DomainEvent::ActorMoved {
|
||||||
|
user_id,
|
||||||
|
new_actor_url,
|
||||||
|
} => {
|
||||||
|
tracing::info!(
|
||||||
|
user_id = %user_id,
|
||||||
|
target = %new_actor_url,
|
||||||
|
"federation-mgmt: broadcasting Move"
|
||||||
|
);
|
||||||
|
let url = url::Url::parse(new_actor_url)
|
||||||
|
.map_err(|e| DomainError::Internal(e.to_string()))?;
|
||||||
|
self.federation
|
||||||
|
.broadcast_move(user_id, url)
|
||||||
|
.await
|
||||||
|
.map_err(|e| DomainError::Internal(e.to_string()))
|
||||||
|
}
|
||||||
|
_ => Ok(()),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -1,5 +1,7 @@
|
|||||||
pub mod federation_event;
|
pub mod federation_event;
|
||||||
|
pub mod federation_management_event;
|
||||||
pub mod notification_event;
|
pub mod notification_event;
|
||||||
|
|
||||||
pub use federation_event::FederationEventService;
|
pub use federation_event::FederationEventService;
|
||||||
|
pub use federation_management_event::FederationManagementEventService;
|
||||||
pub use notification_event::NotificationEventService;
|
pub use notification_event::NotificationEventService;
|
||||||
|
|||||||
@@ -1,6 +1,7 @@
|
|||||||
use activitypub::ActivityPubRepository;
|
use activitypub::ActivityPubRepository;
|
||||||
use domain::{
|
use domain::{
|
||||||
errors::DomainError,
|
errors::DomainError,
|
||||||
|
events::DomainEvent,
|
||||||
models::{
|
models::{
|
||||||
actor_connection_summary::ActorConnectionSummary,
|
actor_connection_summary::ActorConnectionSummary,
|
||||||
feed::{FeedEntry, PageParams, Paginated},
|
feed::{FeedEntry, PageParams, Paginated},
|
||||||
@@ -16,6 +17,20 @@ use domain::{
|
|||||||
|
|
||||||
use super::social;
|
use super::social;
|
||||||
|
|
||||||
|
pub async fn initiate_actor_move(
|
||||||
|
events: &dyn EventPublisher,
|
||||||
|
user_id: &UserId,
|
||||||
|
new_actor_url: url::Url,
|
||||||
|
) -> Result<(), DomainError> {
|
||||||
|
events
|
||||||
|
.publish(&DomainEvent::ActorMoved {
|
||||||
|
user_id: user_id.clone(),
|
||||||
|
new_actor_url: new_actor_url.to_string(),
|
||||||
|
})
|
||||||
|
.await
|
||||||
|
.map_err(|e| DomainError::Internal(e.to_string()))
|
||||||
|
}
|
||||||
|
|
||||||
pub async fn list_pending_requests(
|
pub async fn list_pending_requests(
|
||||||
federation: &dyn FederationFollowRequestPort,
|
federation: &dyn FederationFollowRequestPort,
|
||||||
user_id: &UserId,
|
user_id: &UserId,
|
||||||
@@ -25,18 +40,38 @@ pub async fn list_pending_requests(
|
|||||||
|
|
||||||
pub async fn accept_follow_request(
|
pub async fn accept_follow_request(
|
||||||
federation: &dyn FederationFollowRequestPort,
|
federation: &dyn FederationFollowRequestPort,
|
||||||
|
events: &dyn EventPublisher,
|
||||||
user_id: &UserId,
|
user_id: &UserId,
|
||||||
actor_url: &str,
|
actor_url: &str,
|
||||||
) -> Result<(), DomainError> {
|
) -> Result<(), DomainError> {
|
||||||
federation.accept_follow_request(user_id, actor_url).await
|
federation
|
||||||
|
.mark_follower_accepted(user_id, actor_url)
|
||||||
|
.await?;
|
||||||
|
events
|
||||||
|
.publish(&DomainEvent::RemoteFollowAccepted {
|
||||||
|
local_user_id: user_id.clone(),
|
||||||
|
remote_actor_url: actor_url.to_string(),
|
||||||
|
})
|
||||||
|
.await
|
||||||
|
.map_err(|e| DomainError::Internal(e.to_string()))
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn reject_follow_request(
|
pub async fn reject_follow_request(
|
||||||
federation: &dyn FederationFollowRequestPort,
|
federation: &dyn FederationFollowRequestPort,
|
||||||
|
events: &dyn EventPublisher,
|
||||||
user_id: &UserId,
|
user_id: &UserId,
|
||||||
actor_url: &str,
|
actor_url: &str,
|
||||||
) -> Result<(), DomainError> {
|
) -> Result<(), DomainError> {
|
||||||
federation.reject_follow_request(user_id, actor_url).await
|
federation
|
||||||
|
.mark_follower_rejected(user_id, actor_url)
|
||||||
|
.await?;
|
||||||
|
events
|
||||||
|
.publish(&DomainEvent::RemoteFollowRejected {
|
||||||
|
local_user_id: user_id.clone(),
|
||||||
|
remote_actor_url: actor_url.to_string(),
|
||||||
|
})
|
||||||
|
.await
|
||||||
|
.map_err(|e| DomainError::Internal(e.to_string()))
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn list_remote_followers(
|
pub async fn list_remote_followers(
|
||||||
|
|||||||
@@ -13,7 +13,7 @@ async fn list_pending_returns_empty_by_default() {
|
|||||||
async fn accept_follow_request_returns_ok() {
|
async fn accept_follow_request_returns_ok() {
|
||||||
let store = TestStore::default();
|
let store = TestStore::default();
|
||||||
let uid = UserId::new();
|
let uid = UserId::new();
|
||||||
accept_follow_request(&store, &uid, "https://mastodon.social/users/alice")
|
accept_follow_request(&store, &store, &uid, "https://mastodon.social/users/alice")
|
||||||
.await
|
.await
|
||||||
.unwrap();
|
.unwrap();
|
||||||
}
|
}
|
||||||
@@ -22,7 +22,7 @@ async fn accept_follow_request_returns_ok() {
|
|||||||
async fn reject_follow_request_returns_ok() {
|
async fn reject_follow_request_returns_ok() {
|
||||||
let store = TestStore::default();
|
let store = TestStore::default();
|
||||||
let uid = UserId::new();
|
let uid = UserId::new();
|
||||||
reject_follow_request(&store, &uid, "https://mastodon.social/users/alice")
|
reject_follow_request(&store, &store, &uid, "https://mastodon.social/users/alice")
|
||||||
.await
|
.await
|
||||||
.unwrap();
|
.unwrap();
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -14,7 +14,7 @@ postgres = { workspace = true }
|
|||||||
postgres-search = { workspace = true }
|
postgres-search = { workspace = true }
|
||||||
postgres-federation = { workspace = true }
|
postgres-federation = { workspace = true }
|
||||||
activitypub = { workspace = true }
|
activitypub = { workspace = true }
|
||||||
k-ap = { git = "https://git.gabrielkaszewski.dev/GKaszewski/k-ap.git", tag = "v0.1.9" }
|
k-ap = { git = "https://git.gabrielkaszewski.dev/GKaszewski/k-ap.git", tag = "v0.1.10" }
|
||||||
nats = { workspace = true }
|
nats = { workspace = true }
|
||||||
event-transport = { workspace = true }
|
event-transport = { workspace = true }
|
||||||
auth = { workspace = true }
|
auth = { workspace = true }
|
||||||
|
|||||||
@@ -63,6 +63,18 @@ pub enum DomainEvent {
|
|||||||
ProfileUpdated {
|
ProfileUpdated {
|
||||||
user_id: UserId,
|
user_id: UserId,
|
||||||
},
|
},
|
||||||
|
RemoteFollowAccepted {
|
||||||
|
local_user_id: UserId,
|
||||||
|
remote_actor_url: String,
|
||||||
|
},
|
||||||
|
RemoteFollowRejected {
|
||||||
|
local_user_id: UserId,
|
||||||
|
remote_actor_url: String,
|
||||||
|
},
|
||||||
|
ActorMoved {
|
||||||
|
user_id: UserId,
|
||||||
|
new_actor_url: String,
|
||||||
|
},
|
||||||
MentionReceived {
|
MentionReceived {
|
||||||
thought_id: ThoughtId,
|
thought_id: ThoughtId,
|
||||||
mentioned_user_id: UserId,
|
mentioned_user_id: UserId,
|
||||||
|
|||||||
@@ -322,6 +322,20 @@ pub trait FederationFollowRequestPort: Send + Sync {
|
|||||||
user_id: &UserId,
|
user_id: &UserId,
|
||||||
actor_url: &str,
|
actor_url: &str,
|
||||||
) -> Result<(), DomainError>;
|
) -> Result<(), DomainError>;
|
||||||
|
|
||||||
|
/// Update follower status to Accepted in DB only — no federation activity sent.
|
||||||
|
async fn mark_follower_accepted(
|
||||||
|
&self,
|
||||||
|
user_id: &UserId,
|
||||||
|
actor_url: &str,
|
||||||
|
) -> Result<(), DomainError>;
|
||||||
|
|
||||||
|
/// Remove follower from DB only — no federation activity sent.
|
||||||
|
async fn mark_follower_rejected(
|
||||||
|
&self,
|
||||||
|
user_id: &UserId,
|
||||||
|
actor_url: &str,
|
||||||
|
) -> Result<(), DomainError>;
|
||||||
}
|
}
|
||||||
|
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
|
|||||||
@@ -763,6 +763,22 @@ impl FederationFollowRequestPort for TestStore {
|
|||||||
) -> Result<(), DomainError> {
|
) -> Result<(), DomainError> {
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn mark_follower_accepted(
|
||||||
|
&self,
|
||||||
|
_user_id: &UserId,
|
||||||
|
_actor_url: &str,
|
||||||
|
) -> Result<(), DomainError> {
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn mark_follower_rejected(
|
||||||
|
&self,
|
||||||
|
_user_id: &UserId,
|
||||||
|
_actor_url: &str,
|
||||||
|
) -> Result<(), DomainError> {
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
|
|||||||
@@ -5,8 +5,8 @@ use crate::{
|
|||||||
};
|
};
|
||||||
use api_types::responses::{ProfileField, RemoteActorResponse};
|
use api_types::responses::{ProfileField, RemoteActorResponse};
|
||||||
use application::use_cases::federation_management::{
|
use application::use_cases::federation_management::{
|
||||||
accept_follow_request, list_pending_requests, list_remote_followers, list_remote_following,
|
accept_follow_request, initiate_actor_move, list_pending_requests, list_remote_followers,
|
||||||
reject_follow_request, remove_remote_following,
|
list_remote_following, reject_follow_request, remove_remote_following,
|
||||||
};
|
};
|
||||||
use axum::{http::StatusCode, Json};
|
use axum::{http::StatusCode, Json};
|
||||||
use domain::ports::{EventPublisher, FederationActionPort, FollowRepository, UserRepository};
|
use domain::ports::{EventPublisher, FederationActionPort, FollowRepository, UserRepository};
|
||||||
@@ -67,7 +67,7 @@ pub async fn post_accept_request(
|
|||||||
AuthUser(uid): AuthUser,
|
AuthUser(uid): AuthUser,
|
||||||
Json(body): Json<ActorUrlBody>,
|
Json(body): Json<ActorUrlBody>,
|
||||||
) -> Result<StatusCode, ApiError> {
|
) -> Result<StatusCode, ApiError> {
|
||||||
accept_follow_request(&*d.federation, &uid, &body.actor_url).await?;
|
accept_follow_request(&*d.federation, &*d.events, &uid, &body.actor_url).await?;
|
||||||
Ok(StatusCode::NO_CONTENT)
|
Ok(StatusCode::NO_CONTENT)
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -76,7 +76,7 @@ pub async fn delete_follower(
|
|||||||
AuthUser(uid): AuthUser,
|
AuthUser(uid): AuthUser,
|
||||||
Json(body): Json<ActorUrlBody>,
|
Json(body): Json<ActorUrlBody>,
|
||||||
) -> Result<StatusCode, ApiError> {
|
) -> Result<StatusCode, ApiError> {
|
||||||
reject_follow_request(&*d.federation, &uid, &body.actor_url).await?;
|
reject_follow_request(&*d.federation, &*d.events, &uid, &body.actor_url).await?;
|
||||||
Ok(StatusCode::NO_CONTENT)
|
Ok(StatusCode::NO_CONTENT)
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -120,10 +120,7 @@ pub async fn post_move_account(
|
|||||||
) -> Result<StatusCode, ApiError> {
|
) -> Result<StatusCode, ApiError> {
|
||||||
let new_url = url::Url::parse(&body.new_actor_url)
|
let new_url = url::Url::parse(&body.new_actor_url)
|
||||||
.map_err(|_| ApiError::BadRequest("invalid new_actor_url".into()))?;
|
.map_err(|_| ApiError::BadRequest("invalid new_actor_url".into()))?;
|
||||||
d.federation
|
initiate_actor_move(&*d.events, &uid, new_url).await?;
|
||||||
.broadcast_move(&uid, new_url)
|
|
||||||
.await
|
|
||||||
.map_err(ApiError::from)?;
|
|
||||||
Ok(StatusCode::NO_CONTENT)
|
Ok(StatusCode::NO_CONTENT)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -13,7 +13,7 @@ application = { workspace = true }
|
|||||||
nats = { workspace = true }
|
nats = { workspace = true }
|
||||||
event-transport = { workspace = true }
|
event-transport = { workspace = true }
|
||||||
event-payload = { workspace = true }
|
event-payload = { workspace = true }
|
||||||
k-ap = { git = "https://git.gabrielkaszewski.dev/GKaszewski/k-ap.git", tag = "v0.1.9" }
|
k-ap = { git = "https://git.gabrielkaszewski.dev/GKaszewski/k-ap.git", tag = "v0.1.10" }
|
||||||
activitypub = { workspace = true }
|
activitypub = { workspace = true }
|
||||||
postgres = { workspace = true }
|
postgres = { workspace = true }
|
||||||
postgres-federation = { workspace = true }
|
postgres-federation = { workspace = true }
|
||||||
|
|||||||
@@ -5,17 +5,20 @@ use std::sync::Arc;
|
|||||||
|
|
||||||
use activitypub::{ActivityPubRepository, OutboundFederationPort};
|
use activitypub::{ActivityPubRepository, OutboundFederationPort};
|
||||||
use activitypub::{ApFederationAdapter, ThoughtsObjectHandler};
|
use activitypub::{ApFederationAdapter, ThoughtsObjectHandler};
|
||||||
use application::services::{FederationEventService, NotificationEventService};
|
use application::services::{
|
||||||
|
FederationEventService, FederationManagementEventService, NotificationEventService,
|
||||||
|
};
|
||||||
use domain::ports::EventPublisher;
|
use domain::ports::EventPublisher;
|
||||||
use k_ap::ActivityPubService;
|
use k_ap::ActivityPubService;
|
||||||
use postgres::activitypub::PgActivityPubRepository;
|
use postgres::activitypub::PgActivityPubRepository;
|
||||||
use postgres_federation::{PostgresApUserRepository, PostgresFederationRepository};
|
use postgres_federation::{PostgresApUserRepository, PostgresFederationRepository};
|
||||||
|
|
||||||
use crate::handlers::{FederationHandler, NotificationHandler};
|
use crate::handlers::{FederationHandler, FederationManagementHandler, NotificationHandler};
|
||||||
|
|
||||||
pub struct WorkerHandlers {
|
pub struct WorkerHandlers {
|
||||||
pub notification: NotificationHandler,
|
pub notification: NotificationHandler,
|
||||||
pub federation: FederationHandler,
|
pub federation: FederationHandler,
|
||||||
|
pub federation_management: FederationManagementHandler,
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct WorkerInfra {
|
pub struct WorkerInfra {
|
||||||
@@ -80,6 +83,9 @@ pub async fn build(database_url: &str, base_url: &str, nats_url: &str) -> Worker
|
|||||||
base_url: base_url.to_string(),
|
base_url: base_url.to_string(),
|
||||||
ap_repo: ap_repo_worker,
|
ap_repo: ap_repo_worker,
|
||||||
});
|
});
|
||||||
|
let federation_management_svc = Arc::new(FederationManagementEventService {
|
||||||
|
federation: ap_service.clone() as Arc<dyn domain::ports::FederationActionPort>,
|
||||||
|
});
|
||||||
|
|
||||||
// Thin handlers
|
// Thin handlers
|
||||||
let handlers = WorkerHandlers {
|
let handlers = WorkerHandlers {
|
||||||
@@ -89,6 +95,9 @@ pub async fn build(database_url: &str, base_url: &str, nats_url: &str) -> Worker
|
|||||||
federation: FederationHandler {
|
federation: FederationHandler {
|
||||||
service: federation_svc,
|
service: federation_svc,
|
||||||
},
|
},
|
||||||
|
federation_management: FederationManagementHandler {
|
||||||
|
service: federation_management_svc,
|
||||||
|
},
|
||||||
};
|
};
|
||||||
|
|
||||||
// DLQ store
|
// DLQ store
|
||||||
|
|||||||
@@ -1,4 +1,6 @@
|
|||||||
use application::services::{FederationEventService, NotificationEventService};
|
use application::services::{
|
||||||
|
FederationEventService, FederationManagementEventService, NotificationEventService,
|
||||||
|
};
|
||||||
use domain::{errors::DomainError, events::DomainEvent};
|
use domain::{errors::DomainError, events::DomainEvent};
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
@@ -21,3 +23,13 @@ impl FederationHandler {
|
|||||||
self.service.process(event).await
|
self.service.process(event).await
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub struct FederationManagementHandler {
|
||||||
|
pub service: Arc<FederationManagementEventService>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl FederationManagementHandler {
|
||||||
|
pub async fn handle(&self, event: &DomainEvent) -> Result<(), DomainError> {
|
||||||
|
self.service.process(event).await
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
@@ -52,8 +52,9 @@ async fn main() {
|
|||||||
|
|
||||||
let n = infra.handlers.notification.handle(event).await;
|
let n = infra.handlers.notification.handle(event).await;
|
||||||
let f = infra.handlers.federation.handle(event).await;
|
let f = infra.handlers.federation.handle(event).await;
|
||||||
|
let fm = infra.handlers.federation_management.handle(event).await;
|
||||||
|
|
||||||
if n.is_ok() && f.is_ok() {
|
if n.is_ok() && f.is_ok() && fm.is_ok() {
|
||||||
(envelope.ack)();
|
(envelope.ack)();
|
||||||
tracing::info!(event_type, "event handled ok");
|
tracing::info!(event_type, "event handled ok");
|
||||||
} else {
|
} else {
|
||||||
@@ -63,6 +64,9 @@ async fn main() {
|
|||||||
if let Err(e) = &f {
|
if let Err(e) = &f {
|
||||||
tracing::error!("federation handler: {e}");
|
tracing::error!("federation handler: {e}");
|
||||||
}
|
}
|
||||||
|
if let Err(e) = &fm {
|
||||||
|
tracing::error!("federation management handler: {e}");
|
||||||
|
}
|
||||||
|
|
||||||
// Last delivery attempt -> move to DLQ then ack.
|
// Last delivery attempt -> move to DLQ then ack.
|
||||||
// Earlier attempts -> nack so NATS retries.
|
// Earlier attempts -> nack so NATS retries.
|
||||||
@@ -70,6 +74,7 @@ async fn main() {
|
|||||||
let error_msg = n
|
let error_msg = n
|
||||||
.err()
|
.err()
|
||||||
.or(f.err())
|
.or(f.err())
|
||||||
|
.or(fm.err())
|
||||||
.map(|e| e.to_string())
|
.map(|e| e.to_string())
|
||||||
.unwrap_or_else(|| "unknown error".into());
|
.unwrap_or_else(|| "unknown error".into());
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user