feat: add server-sent events for logging and activity tracking

- Implemented a custom tracing layer (`AppLogLayer`) to capture log events and broadcast them to SSE clients.
- Created admin routes for streaming server logs and listing recent activity logs.
- Added an activity log repository interface and SQLite implementation for persisting activity events.
- Integrated activity logging into user authentication and channel CRUD operations.
- Developed frontend components for displaying server logs and activity logs in the admin panel.
- Enhanced the video player with a stats overlay for monitoring streaming metrics.
This commit is contained in:
2026-03-16 02:21:40 +01:00
parent 4df6522952
commit e805028d46
28 changed files with 893 additions and 8 deletions

View File

@@ -90,10 +90,12 @@ dependencies = [
"thiserror 2.0.17",
"time",
"tokio",
"tokio-stream",
"tokio-util",
"tower",
"tower-http",
"tracing",
"tracing-subscriber",
"uuid",
]
@@ -3546,6 +3548,7 @@ dependencies = [
"futures-core",
"pin-project-lite",
"tokio",
"tokio-util",
]
[[package]]

View File

@@ -56,6 +56,8 @@ uuid = { version = "1.19.0", features = ["v4", "serde"] }
# Logging
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter", "fmt"] }
tokio-stream = { version = "0.1", features = ["sync"] }
reqwest = { version = "0.12", features = ["json"] }
handlebars = "6"

View File

@@ -59,6 +59,32 @@ pub struct ConfigResponse {
pub provider_capabilities: domain::ProviderCapabilities,
}
// ============================================================================
// Admin DTOs
// ============================================================================
/// An activity log entry returned by GET /admin/activity.
#[derive(Debug, Serialize)]
pub struct ActivityEventResponse {
pub id: Uuid,
pub timestamp: DateTime<Utc>,
pub event_type: String,
pub detail: String,
pub channel_id: Option<Uuid>,
}
impl From<domain::ActivityEvent> for ActivityEventResponse {
fn from(e: domain::ActivityEvent) -> Self {
Self {
id: e.id,
timestamp: e.timestamp,
event_type: e.event_type,
detail: e.detail,
channel_id: e.channel_id,
}
}
}
// ============================================================================
// Channel DTOs
// ============================================================================

View File

@@ -0,0 +1,72 @@
//! Custom tracing layer that captures log events and broadcasts them to SSE clients.
use chrono::Utc;
use serde::Serialize;
use std::collections::VecDeque;
use std::sync::{Arc, Mutex};
use tokio::sync::broadcast;
use tracing::Event;
use tracing_subscriber::Layer;
/// A single structured log line sent to SSE clients.
#[derive(Debug, Clone, Serialize)]
pub struct LogLine {
pub level: String,
pub target: String,
pub message: String,
pub timestamp: String,
}
/// Tracing layer that fans log events out to a broadcast channel + ring buffer.
pub struct AppLogLayer {
tx: broadcast::Sender<LogLine>,
history: Arc<Mutex<VecDeque<LogLine>>>,
}
impl AppLogLayer {
pub fn new(
tx: broadcast::Sender<LogLine>,
history: Arc<Mutex<VecDeque<LogLine>>>,
) -> Self {
Self { tx, history }
}
}
impl<S: tracing::Subscriber> Layer<S> for AppLogLayer {
fn on_event(&self, event: &Event<'_>, _ctx: tracing_subscriber::layer::Context<'_, S>) {
let mut visitor = MsgVisitor(String::new());
event.record(&mut visitor);
let line = LogLine {
level: event.metadata().level().to_string(),
target: event.metadata().target().to_string(),
message: visitor.0,
timestamp: Utc::now().to_rfc3339(),
};
if let Ok(mut history) = self.history.lock() {
if history.len() >= 200 {
history.pop_front();
}
history.push_back(line.clone());
}
let _ = self.tx.send(line);
}
}
struct MsgVisitor(String);
impl tracing::field::Visit for MsgVisitor {
fn record_str(&mut self, field: &tracing::field::Field, value: &str) {
if field.name() == "message" {
self.0 = value.to_owned();
}
}
fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) {
if field.name() == "message" {
self.0 = format!("{value:?}");
}
}
}

View File

@@ -2,27 +2,30 @@
//!
//! Configures and starts the HTTP server with JWT-based authentication.
use std::collections::VecDeque;
use std::net::SocketAddr;
use std::sync::{Arc, Mutex};
use std::time::Duration as StdDuration;
use axum::Router;
use axum::http::{HeaderName, HeaderValue};
use std::sync::Arc;
use tokio::sync::broadcast;
use tower_http::cors::{AllowHeaders, AllowMethods, AllowOrigin, CorsLayer};
use tracing::info;
use tracing_subscriber::{EnvFilter, fmt, layer::SubscriberExt, util::SubscriberInitExt};
use domain::{ChannelService, IMediaProvider, IProviderRegistry, ProviderCapabilities, ScheduleEngineService, StreamingProtocol, UserService};
use infra::factory::{build_channel_repository, build_schedule_repository, build_user_repository};
use infra::factory::{build_activity_log_repository, build_channel_repository, build_schedule_repository, build_user_repository};
use infra::run_migrations;
use k_core::http::server::{ServerConfig, apply_standard_middleware};
use k_core::logging;
use tokio::net::TcpListener;
use tracing::info;
mod config;
mod dto;
mod error;
mod events;
mod extractors;
mod log_layer;
mod poller;
mod routes;
mod scheduler;
@@ -34,7 +37,16 @@ use crate::state::AppState;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
logging::init("api");
// Set up broadcast channel + ring buffer for SSE log streaming.
let (log_tx, _) = broadcast::channel::<log_layer::LogLine>(512);
let log_history = Arc::new(Mutex::new(VecDeque::<log_layer::LogLine>::new()));
// Initialize tracing with our custom layer in addition to the fmt layer.
tracing_subscriber::registry()
.with(EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info")))
.with(fmt::layer())
.with(log_layer::AppLogLayer::new(log_tx.clone(), Arc::clone(&log_history)))
.init();
let config = Config::from_env();
@@ -71,6 +83,7 @@ async fn main() -> anyhow::Result<()> {
let user_repo = build_user_repository(&db_pool).await?;
let channel_repo = build_channel_repository(&db_pool).await?;
let schedule_repo = build_schedule_repository(&db_pool).await?;
let activity_log_repo = build_activity_log_repository(&db_pool).await?;
let user_service = UserService::new(user_repo);
let channel_service = ChannelService::new(channel_repo.clone());
@@ -177,6 +190,9 @@ async fn main() -> anyhow::Result<()> {
registry,
config.clone(),
event_tx.clone(),
log_tx,
log_history,
activity_log_repo,
)
.await?;

View File

@@ -0,0 +1,95 @@
//! Admin routes: SSE log stream + activity log.
use axum::{
Json,
extract::State,
response::{
IntoResponse,
sse::{Event, KeepAlive, Sse},
},
};
use tokio_stream::{StreamExt, wrappers::BroadcastStream};
use crate::{
dto::ActivityEventResponse,
error::ApiError,
extractors::OptionalCurrentUser,
state::AppState,
};
use axum::Router;
use axum::routing::get;
pub fn router() -> Router<AppState> {
Router::new()
.route("/logs", get(stream_logs))
.route("/activity", get(list_activity))
}
/// Stream server log lines as Server-Sent Events.
///
/// Auth: requires a valid JWT passed as `?token=<jwt>` (EventSource cannot set headers).
/// On connect: flushes the recent history ring buffer, then streams live events.
pub async fn stream_logs(
State(state): State<AppState>,
OptionalCurrentUser(user): OptionalCurrentUser,
) -> Result<impl IntoResponse, ApiError> {
if user.is_none() {
return Err(ApiError::Unauthorized(
"Authentication required for log stream".to_string(),
));
}
// Snapshot history and subscribe before releasing the lock so we don't miss events.
let rx = state.log_tx.subscribe();
let history: Vec<_> = state
.log_history
.lock()
.map(|h| h.iter().cloned().collect())
.unwrap_or_default();
let history_stream = tokio_stream::iter(history).map(|line| {
let data = serde_json::to_string(&line).unwrap_or_default();
Ok::<Event, String>(Event::default().data(data))
});
let live_stream = BroadcastStream::new(rx).filter_map(|result| match result {
Ok(line) => {
let data = serde_json::to_string(&line).unwrap_or_default();
Some(Ok::<Event, String>(Event::default().data(data)))
}
Err(tokio_stream::wrappers::errors::BroadcastStreamRecvError::Lagged(n)) => {
let data = format!(
r#"{{"level":"WARN","target":"sse","message":"[{n} log lines dropped — buffer overrun]","timestamp":""}}"#
);
Some(Ok(Event::default().data(data)))
}
});
let combined = history_stream.chain(live_stream);
Ok(Sse::new(combined).keep_alive(KeepAlive::default()))
}
/// Return recent activity log entries.
///
/// Auth: requires a valid JWT (Authorization: Bearer or ?token=).
pub async fn list_activity(
State(state): State<AppState>,
OptionalCurrentUser(user): OptionalCurrentUser,
) -> Result<impl IntoResponse, ApiError> {
if user.is_none() {
return Err(ApiError::Unauthorized(
"Authentication required".to_string(),
));
}
let events = state
.activity_log_repo
.recent(50)
.await
.map_err(ApiError::from)?;
let response: Vec<ActivityEventResponse> = events.into_iter().map(Into::into).collect();
Ok(Json(response))
}

View File

@@ -35,6 +35,7 @@ pub(super) async fn login(
}
let token = create_jwt(&user, &state)?;
let _ = state.activity_log_repo.log("user_login", user.email.as_ref(), None).await;
Ok((
StatusCode::OK,

View File

@@ -69,6 +69,7 @@ pub(super) async fn create_channel(
}
let _ = state.event_tx.send(domain::DomainEvent::ChannelCreated { channel: channel.clone() });
let _ = state.activity_log_repo.log("channel_created", &channel.name, Some(channel.id)).await;
Ok((StatusCode::CREATED, Json(ChannelResponse::from(channel))))
}
@@ -144,6 +145,7 @@ pub(super) async fn update_channel(
let channel = state.channel_service.update(channel).await?;
let _ = state.event_tx.send(domain::DomainEvent::ChannelUpdated { channel: channel.clone() });
let _ = state.activity_log_repo.log("channel_updated", &channel.name, Some(channel.id)).await;
Ok(Json(ChannelResponse::from(channel)))
}
@@ -155,5 +157,6 @@ pub(super) async fn delete_channel(
// ChannelService::delete enforces ownership internally
state.channel_service.delete(channel_id, user.id).await?;
let _ = state.event_tx.send(domain::DomainEvent::ChannelDeleted { channel_id });
let _ = state.activity_log_repo.log("channel_deleted", &channel_id.to_string(), Some(channel_id)).await;
Ok(StatusCode::NO_CONTENT)
}

View File

@@ -37,6 +37,8 @@ pub(super) async fn generate_schedule(
channel_id,
schedule: schedule.clone(),
});
let detail = format!("{} slots", schedule.slots.len());
let _ = state.activity_log_repo.log("schedule_generated", &detail, Some(channel_id)).await;
Ok((StatusCode::CREATED, Json(ScheduleResponse::from(schedule))))
}

View File

@@ -5,6 +5,7 @@
use crate::state::AppState;
use axum::Router;
pub mod admin;
pub mod auth;
pub mod channels;
pub mod config;
@@ -15,6 +16,7 @@ pub mod library;
/// Construct the API v1 router
pub fn api_v1_router() -> Router<AppState> {
Router::new()
.nest("/admin", admin::router())
.nest("/auth", auth::router())
.nest("/channels", channels::router())
.nest("/config", config::router())

View File

@@ -8,11 +8,14 @@ use axum_extra::extract::cookie::Key;
use infra::auth::jwt::{JwtConfig, JwtValidator};
#[cfg(feature = "auth-oidc")]
use infra::auth::oidc::OidcService;
use std::sync::Arc;
use std::collections::VecDeque;
use std::sync::{Arc, Mutex};
use tokio::sync::broadcast;
use crate::config::Config;
use crate::events::EventBus;
use domain::{ChannelService, ScheduleEngineService, UserService};
use crate::log_layer::LogLine;
use domain::{ActivityLogRepository, ChannelService, ScheduleEngineService, UserService};
#[derive(Clone)]
pub struct AppState {
@@ -27,6 +30,12 @@ pub struct AppState {
pub jwt_validator: Option<Arc<JwtValidator>>,
pub config: Arc<Config>,
pub event_tx: EventBus,
/// Broadcast channel for streaming log lines to SSE clients.
pub log_tx: broadcast::Sender<LogLine>,
/// Ring buffer of recent log lines sent to new SSE clients on connect.
pub log_history: Arc<Mutex<VecDeque<LogLine>>>,
/// Repository for persisted in-app activity events.
pub activity_log_repo: Arc<dyn ActivityLogRepository>,
/// Index for the local-files provider, used by the rescan route.
#[cfg(feature = "local-files")]
pub local_index: Option<Arc<infra::LocalIndex>>,
@@ -46,6 +55,9 @@ impl AppState {
provider_registry: Arc<infra::ProviderRegistry>,
config: Config,
event_tx: EventBus,
log_tx: broadcast::Sender<LogLine>,
log_history: Arc<Mutex<VecDeque<LogLine>>>,
activity_log_repo: Arc<dyn ActivityLogRepository>,
) -> anyhow::Result<Self> {
let cookie_key = Key::derive_from(config.cookie_secret.as_bytes());
@@ -118,6 +130,9 @@ impl AppState {
jwt_validator,
config: Arc::new(config),
event_tx,
log_tx,
log_history,
activity_log_repo,
#[cfg(feature = "local-files")]
local_index: None,
#[cfg(feature = "local-files")]

View File

@@ -12,6 +12,16 @@ use crate::entities::{Channel, GeneratedSchedule, PlaybackRecord, User};
use crate::errors::DomainResult;
use crate::value_objects::{ChannelId, UserId};
/// An in-app activity event stored in the database for the admin log view.
#[derive(Debug, Clone)]
pub struct ActivityEvent {
pub id: Uuid,
pub timestamp: DateTime<Utc>,
pub event_type: String,
pub detail: String,
pub channel_id: Option<Uuid>,
}
/// Repository port for User persistence
#[async_trait]
pub trait UserRepository: Send + Sync {
@@ -71,3 +81,15 @@ pub trait ScheduleRepository: Send + Sync {
async fn save_playback_record(&self, record: &PlaybackRecord) -> DomainResult<()>;
}
/// Repository port for activity log persistence.
#[async_trait]
pub trait ActivityLogRepository: Send + Sync {
async fn log(
&self,
event_type: &str,
detail: &str,
channel_id: Option<Uuid>,
) -> DomainResult<()>;
async fn recent(&self, limit: u32) -> DomainResult<Vec<ActivityEvent>>;
}

View File

@@ -0,0 +1,5 @@
#[cfg(feature = "sqlite")]
mod sqlite;
#[cfg(feature = "sqlite")]
pub use sqlite::SqliteActivityLogRepository;

View File

@@ -0,0 +1,71 @@
use async_trait::async_trait;
use chrono::Utc;
use uuid::Uuid;
use domain::{ActivityEvent, ActivityLogRepository, DomainError, DomainResult};
pub struct SqliteActivityLogRepository {
pool: sqlx::SqlitePool,
}
impl SqliteActivityLogRepository {
pub fn new(pool: sqlx::SqlitePool) -> Self {
Self { pool }
}
}
#[async_trait]
impl ActivityLogRepository for SqliteActivityLogRepository {
async fn log(
&self,
event_type: &str,
detail: &str,
channel_id: Option<Uuid>,
) -> DomainResult<()> {
let id = Uuid::new_v4().to_string();
let timestamp = Utc::now().to_rfc3339();
let channel_id_str = channel_id.map(|id| id.to_string());
sqlx::query(
"INSERT INTO activity_log (id, timestamp, event_type, detail, channel_id) VALUES (?, ?, ?, ?, ?)",
)
.bind(&id)
.bind(&timestamp)
.bind(event_type)
.bind(detail)
.bind(&channel_id_str)
.execute(&self.pool)
.await
.map_err(|e| DomainError::RepositoryError(e.to_string()))?;
Ok(())
}
async fn recent(&self, limit: u32) -> DomainResult<Vec<ActivityEvent>> {
let rows: Vec<(String, String, String, String, Option<String>)> = sqlx::query_as(
"SELECT id, timestamp, event_type, detail, channel_id FROM activity_log ORDER BY timestamp DESC LIMIT ?",
)
.bind(limit)
.fetch_all(&self.pool)
.await
.map_err(|e| DomainError::RepositoryError(e.to_string()))?;
let events = rows
.into_iter()
.filter_map(|(id, timestamp, event_type, detail, channel_id)| {
let id = Uuid::parse_str(&id).ok()?;
let timestamp = timestamp.parse().ok()?;
let channel_id = channel_id.and_then(|s| Uuid::parse_str(&s).ok());
Some(ActivityEvent {
id,
timestamp,
event_type,
detail,
channel_id,
})
})
.collect();
Ok(events)
}
}

View File

@@ -1,7 +1,7 @@
use std::sync::Arc;
use crate::db::DatabasePool;
use domain::{ChannelRepository, ScheduleRepository, UserRepository};
use domain::{ActivityLogRepository, ChannelRepository, ScheduleRepository, UserRepository};
#[derive(Debug, thiserror::Error)]
pub enum FactoryError {
@@ -51,6 +51,25 @@ pub async fn build_channel_repository(
}
}
pub async fn build_activity_log_repository(
pool: &DatabasePool,
) -> FactoryResult<Arc<dyn ActivityLogRepository>> {
match pool {
#[cfg(feature = "sqlite")]
DatabasePool::Sqlite(pool) => Ok(Arc::new(
crate::activity_log_repository::SqliteActivityLogRepository::new(pool.clone()),
)),
#[cfg(feature = "postgres")]
DatabasePool::Postgres(_pool) => Err(FactoryError::NotImplemented(
"ActivityLogRepository not yet implemented for Postgres".to_string(),
)),
#[allow(unreachable_patterns)]
_ => Err(FactoryError::NotImplemented(
"No database feature enabled".to_string(),
)),
}
}
pub async fn build_schedule_repository(
pool: &DatabasePool,
) -> FactoryResult<Arc<dyn ScheduleRepository>> {

View File

@@ -18,6 +18,7 @@ pub mod db;
pub mod factory;
pub mod jellyfin;
pub mod provider_registry;
mod activity_log_repository;
mod channel_repository;
mod schedule_repository;
mod user_repository;
@@ -29,6 +30,8 @@ pub mod local_files;
pub use db::run_migrations;
pub use provider_registry::ProviderRegistry;
#[cfg(feature = "sqlite")]
pub use activity_log_repository::SqliteActivityLogRepository;
#[cfg(feature = "sqlite")]
pub use user_repository::SqliteUserRepository;
#[cfg(feature = "sqlite")]

View File

@@ -0,0 +1,9 @@
CREATE TABLE IF NOT EXISTS activity_log (
id TEXT PRIMARY KEY NOT NULL,
timestamp TEXT NOT NULL,
event_type TEXT NOT NULL,
detail TEXT NOT NULL,
channel_id TEXT
);
CREATE INDEX IF NOT EXISTS idx_activity_log_timestamp ON activity_log(timestamp DESC);