debug: add INFO logging to ensure_stream and remote_actor_posts_handler
Some checks failed
lint / lint (push) Has been cancelled
test / unit (push) Has been cancelled
test / integration (push) Has been cancelled
lint / lint (pull_request) Failing after 9m42s
test / unit (pull_request) Failing after 10m52s
test / integration (pull_request) Failing after 17m20s
Some checks failed
lint / lint (push) Has been cancelled
test / unit (push) Has been cancelled
test / integration (push) Has been cancelled
lint / lint (pull_request) Failing after 9m42s
test / unit (pull_request) Failing after 10m52s
test / integration (pull_request) Failing after 17m20s
This commit is contained in:
@@ -39,12 +39,17 @@ pub async fn ensure_stream(client: &async_nats::Client) -> Result<(), DomainErro
|
||||
// Try to update first (covers the case where stream exists with stale subjects).
|
||||
// Falls back to create if the stream doesn't exist yet.
|
||||
match js.update_stream(stream_config()).await {
|
||||
Ok(_) => Ok(()),
|
||||
Err(_) => js
|
||||
.get_or_create_stream(stream_config())
|
||||
Ok(_) => {
|
||||
tracing::info!(subjects = ?STREAM_SUBJECTS, "JetStream stream updated");
|
||||
Ok(())
|
||||
}
|
||||
Err(e) => {
|
||||
tracing::warn!("JetStream stream update failed ({e}), falling back to get_or_create");
|
||||
js.get_or_create_stream(stream_config())
|
||||
.await
|
||||
.map(|_| ())
|
||||
.map_err(|e| DomainError::Internal(format!("JetStream stream setup failed: {e}"))),
|
||||
.map_err(|e| DomainError::Internal(format!("JetStream stream setup failed: {e}")))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -16,32 +16,51 @@ pub async fn remote_actor_posts_handler(
|
||||
Query(q): Query<PaginationQuery>,
|
||||
OptionalAuthUser(viewer): OptionalAuthUser,
|
||||
) -> Result<Json<serde_json::Value>, ApiError> {
|
||||
tracing::info!(%handle, "remote_actor_posts: looking up actor");
|
||||
let actor = s.federation.lookup_actor(&handle).await?;
|
||||
tracing::info!(actor_url = %actor.url, has_outbox = actor.outbox_url.is_some(), "remote_actor_posts: actor found");
|
||||
|
||||
let ap_url = url::Url::parse(&actor.url).map_err(|e| ApiError::BadRequest(e.to_string()))?;
|
||||
|
||||
// Get or create interned local UserId for this remote actor
|
||||
let author_id = match s.ap_repo.find_remote_actor_id(&ap_url).await? {
|
||||
Some(id) => id,
|
||||
None => s.ap_repo.intern_remote_actor(&ap_url).await?,
|
||||
Some(id) => {
|
||||
tracing::info!(?id, "remote_actor_posts: actor already interned");
|
||||
id
|
||||
}
|
||||
None => {
|
||||
tracing::info!("remote_actor_posts: interning actor");
|
||||
let id = s.ap_repo.intern_remote_actor(&ap_url).await?;
|
||||
tracing::info!(?id, "remote_actor_posts: actor interned");
|
||||
id
|
||||
}
|
||||
};
|
||||
|
||||
// Return cached posts from DB
|
||||
let page = PageParams {
|
||||
page: q.page(),
|
||||
per_page: q.per_page(),
|
||||
};
|
||||
let result = get_user_feed(&*s.feed, &author_id, page, viewer.as_ref()).await?;
|
||||
tracing::info!(
|
||||
post_count = result.items.len(),
|
||||
"remote_actor_posts: cached posts fetched"
|
||||
);
|
||||
|
||||
// Trigger background outbox fetch (fire and forget)
|
||||
if let Some(outbox_url) = &actor.outbox_url {
|
||||
let _ = s
|
||||
match &actor.outbox_url {
|
||||
Some(outbox_url) => {
|
||||
tracing::info!(%outbox_url, "remote_actor_posts: publishing FetchRemoteActorPosts");
|
||||
match s
|
||||
.events
|
||||
.publish(&DomainEvent::FetchRemoteActorPosts {
|
||||
actor_ap_url: actor.url.clone(),
|
||||
outbox_url: outbox_url.clone(),
|
||||
})
|
||||
.await;
|
||||
.await
|
||||
{
|
||||
Ok(_) => tracing::info!("remote_actor_posts: event published"),
|
||||
Err(e) => tracing::warn!("remote_actor_posts: event publish failed: {e}"),
|
||||
}
|
||||
}
|
||||
None => tracing::warn!("remote_actor_posts: actor has no outbox_url, skipping fetch"),
|
||||
}
|
||||
|
||||
Ok(Json(serde_json::json!({
|
||||
|
||||
Reference in New Issue
Block a user