From e805028d46d768987810548423569e00f77d0ce3 Mon Sep 17 00:00:00 2001 From: Gabriel Kaszewski Date: Mon, 16 Mar 2026 02:21:40 +0100 Subject: [PATCH] feat: add server-sent events for logging and activity tracking - Implemented a custom tracing layer (`AppLogLayer`) to capture log events and broadcast them to SSE clients. - Created admin routes for streaming server logs and listing recent activity logs. - Added an activity log repository interface and SQLite implementation for persisting activity events. - Integrated activity logging into user authentication and channel CRUD operations. - Developed frontend components for displaying server logs and activity logs in the admin panel. - Enhanced the video player with a stats overlay for monitoring streaming metrics. --- .gitignore | 1 + k-tv-backend/Cargo.lock | 3 + k-tv-backend/api/Cargo.toml | 2 + k-tv-backend/api/src/dto.rs | 26 +++ k-tv-backend/api/src/log_layer.rs | 72 +++++++++ k-tv-backend/api/src/main.rs | 26 ++- k-tv-backend/api/src/routes/admin.rs | 95 +++++++++++ k-tv-backend/api/src/routes/auth/local.rs | 1 + k-tv-backend/api/src/routes/channels/crud.rs | 3 + .../api/src/routes/channels/schedule.rs | 2 + k-tv-backend/api/src/routes/mod.rs | 2 + k-tv-backend/api/src/state.rs | 19 ++- k-tv-backend/domain/src/repositories.rs | 22 +++ .../infra/src/activity_log_repository/mod.rs | 5 + .../src/activity_log_repository/sqlite.rs | 71 +++++++++ k-tv-backend/infra/src/factory.rs | 21 ++- k-tv-backend/infra/src/lib.rs | 3 + .../20260317000000_add_activity_log.sql | 9 ++ .../admin/components/activity-log-panel.tsx | 73 +++++++++ .../admin/components/server-logs-panel.tsx | 129 +++++++++++++++ k-tv-frontend/app/(main)/admin/page.tsx | 58 +++++++ k-tv-frontend/app/(main)/layout.tsx | 1 + .../app/(main)/tv/components/stats-panel.tsx | 150 ++++++++++++++++++ .../app/(main)/tv/components/video-player.tsx | 18 +++ k-tv-frontend/app/(main)/tv/page.tsx | 16 ++ k-tv-frontend/hooks/use-admin.ts | 52 ++++++ k-tv-frontend/lib/api.ts | 6 + k-tv-frontend/lib/types.ts | 15 ++ 28 files changed, 893 insertions(+), 8 deletions(-) create mode 100644 k-tv-backend/api/src/log_layer.rs create mode 100644 k-tv-backend/api/src/routes/admin.rs create mode 100644 k-tv-backend/infra/src/activity_log_repository/mod.rs create mode 100644 k-tv-backend/infra/src/activity_log_repository/sqlite.rs create mode 100644 k-tv-backend/migrations_sqlite/20260317000000_add_activity_log.sql create mode 100644 k-tv-frontend/app/(main)/admin/components/activity-log-panel.tsx create mode 100644 k-tv-frontend/app/(main)/admin/components/server-logs-panel.tsx create mode 100644 k-tv-frontend/app/(main)/admin/page.tsx create mode 100644 k-tv-frontend/app/(main)/tv/components/stats-panel.tsx create mode 100644 k-tv-frontend/hooks/use-admin.ts diff --git a/.gitignore b/.gitignore index b259350..0cf9c18 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,3 @@ transcode/ .worktrees/ +.superpowers/ \ No newline at end of file diff --git a/k-tv-backend/Cargo.lock b/k-tv-backend/Cargo.lock index 767eb70..1957419 100644 --- a/k-tv-backend/Cargo.lock +++ b/k-tv-backend/Cargo.lock @@ -90,10 +90,12 @@ dependencies = [ "thiserror 2.0.17", "time", "tokio", + "tokio-stream", "tokio-util", "tower", "tower-http", "tracing", + "tracing-subscriber", "uuid", ] @@ -3546,6 +3548,7 @@ dependencies = [ "futures-core", "pin-project-lite", "tokio", + "tokio-util", ] [[package]] diff --git a/k-tv-backend/api/Cargo.toml b/k-tv-backend/api/Cargo.toml index e629ace..ce26728 100644 --- a/k-tv-backend/api/Cargo.toml +++ b/k-tv-backend/api/Cargo.toml @@ -56,6 +56,8 @@ uuid = { version = "1.19.0", features = ["v4", "serde"] } # Logging tracing = "0.1" +tracing-subscriber = { version = "0.3", features = ["env-filter", "fmt"] } +tokio-stream = { version = "0.1", features = ["sync"] } reqwest = { version = "0.12", features = ["json"] } handlebars = "6" diff --git a/k-tv-backend/api/src/dto.rs b/k-tv-backend/api/src/dto.rs index 3e2ab62..a15e1a5 100644 --- a/k-tv-backend/api/src/dto.rs +++ b/k-tv-backend/api/src/dto.rs @@ -59,6 +59,32 @@ pub struct ConfigResponse { pub provider_capabilities: domain::ProviderCapabilities, } +// ============================================================================ +// Admin DTOs +// ============================================================================ + +/// An activity log entry returned by GET /admin/activity. +#[derive(Debug, Serialize)] +pub struct ActivityEventResponse { + pub id: Uuid, + pub timestamp: DateTime, + pub event_type: String, + pub detail: String, + pub channel_id: Option, +} + +impl From for ActivityEventResponse { + fn from(e: domain::ActivityEvent) -> Self { + Self { + id: e.id, + timestamp: e.timestamp, + event_type: e.event_type, + detail: e.detail, + channel_id: e.channel_id, + } + } +} + // ============================================================================ // Channel DTOs // ============================================================================ diff --git a/k-tv-backend/api/src/log_layer.rs b/k-tv-backend/api/src/log_layer.rs new file mode 100644 index 0000000..0fc5575 --- /dev/null +++ b/k-tv-backend/api/src/log_layer.rs @@ -0,0 +1,72 @@ +//! Custom tracing layer that captures log events and broadcasts them to SSE clients. + +use chrono::Utc; +use serde::Serialize; +use std::collections::VecDeque; +use std::sync::{Arc, Mutex}; +use tokio::sync::broadcast; +use tracing::Event; +use tracing_subscriber::Layer; + +/// A single structured log line sent to SSE clients. +#[derive(Debug, Clone, Serialize)] +pub struct LogLine { + pub level: String, + pub target: String, + pub message: String, + pub timestamp: String, +} + +/// Tracing layer that fans log events out to a broadcast channel + ring buffer. +pub struct AppLogLayer { + tx: broadcast::Sender, + history: Arc>>, +} + +impl AppLogLayer { + pub fn new( + tx: broadcast::Sender, + history: Arc>>, + ) -> Self { + Self { tx, history } + } +} + +impl Layer for AppLogLayer { + fn on_event(&self, event: &Event<'_>, _ctx: tracing_subscriber::layer::Context<'_, S>) { + let mut visitor = MsgVisitor(String::new()); + event.record(&mut visitor); + + let line = LogLine { + level: event.metadata().level().to_string(), + target: event.metadata().target().to_string(), + message: visitor.0, + timestamp: Utc::now().to_rfc3339(), + }; + + if let Ok(mut history) = self.history.lock() { + if history.len() >= 200 { + history.pop_front(); + } + history.push_back(line.clone()); + } + + let _ = self.tx.send(line); + } +} + +struct MsgVisitor(String); + +impl tracing::field::Visit for MsgVisitor { + fn record_str(&mut self, field: &tracing::field::Field, value: &str) { + if field.name() == "message" { + self.0 = value.to_owned(); + } + } + + fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) { + if field.name() == "message" { + self.0 = format!("{value:?}"); + } + } +} diff --git a/k-tv-backend/api/src/main.rs b/k-tv-backend/api/src/main.rs index 4848fad..1e57779 100644 --- a/k-tv-backend/api/src/main.rs +++ b/k-tv-backend/api/src/main.rs @@ -2,27 +2,30 @@ //! //! Configures and starts the HTTP server with JWT-based authentication. +use std::collections::VecDeque; use std::net::SocketAddr; +use std::sync::{Arc, Mutex}; use std::time::Duration as StdDuration; use axum::Router; use axum::http::{HeaderName, HeaderValue}; -use std::sync::Arc; +use tokio::sync::broadcast; use tower_http::cors::{AllowHeaders, AllowMethods, AllowOrigin, CorsLayer}; +use tracing::info; +use tracing_subscriber::{EnvFilter, fmt, layer::SubscriberExt, util::SubscriberInitExt}; use domain::{ChannelService, IMediaProvider, IProviderRegistry, ProviderCapabilities, ScheduleEngineService, StreamingProtocol, UserService}; -use infra::factory::{build_channel_repository, build_schedule_repository, build_user_repository}; +use infra::factory::{build_activity_log_repository, build_channel_repository, build_schedule_repository, build_user_repository}; use infra::run_migrations; use k_core::http::server::{ServerConfig, apply_standard_middleware}; -use k_core::logging; use tokio::net::TcpListener; -use tracing::info; mod config; mod dto; mod error; mod events; mod extractors; +mod log_layer; mod poller; mod routes; mod scheduler; @@ -34,7 +37,16 @@ use crate::state::AppState; #[tokio::main] async fn main() -> anyhow::Result<()> { - logging::init("api"); + // Set up broadcast channel + ring buffer for SSE log streaming. + let (log_tx, _) = broadcast::channel::(512); + let log_history = Arc::new(Mutex::new(VecDeque::::new())); + + // Initialize tracing with our custom layer in addition to the fmt layer. + tracing_subscriber::registry() + .with(EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info"))) + .with(fmt::layer()) + .with(log_layer::AppLogLayer::new(log_tx.clone(), Arc::clone(&log_history))) + .init(); let config = Config::from_env(); @@ -71,6 +83,7 @@ async fn main() -> anyhow::Result<()> { let user_repo = build_user_repository(&db_pool).await?; let channel_repo = build_channel_repository(&db_pool).await?; let schedule_repo = build_schedule_repository(&db_pool).await?; + let activity_log_repo = build_activity_log_repository(&db_pool).await?; let user_service = UserService::new(user_repo); let channel_service = ChannelService::new(channel_repo.clone()); @@ -177,6 +190,9 @@ async fn main() -> anyhow::Result<()> { registry, config.clone(), event_tx.clone(), + log_tx, + log_history, + activity_log_repo, ) .await?; diff --git a/k-tv-backend/api/src/routes/admin.rs b/k-tv-backend/api/src/routes/admin.rs new file mode 100644 index 0000000..759f8f1 --- /dev/null +++ b/k-tv-backend/api/src/routes/admin.rs @@ -0,0 +1,95 @@ +//! Admin routes: SSE log stream + activity log. + +use axum::{ + Json, + extract::State, + response::{ + IntoResponse, + sse::{Event, KeepAlive, Sse}, + }, +}; +use tokio_stream::{StreamExt, wrappers::BroadcastStream}; + +use crate::{ + dto::ActivityEventResponse, + error::ApiError, + extractors::OptionalCurrentUser, + state::AppState, +}; + +use axum::Router; +use axum::routing::get; + +pub fn router() -> Router { + Router::new() + .route("/logs", get(stream_logs)) + .route("/activity", get(list_activity)) +} + +/// Stream server log lines as Server-Sent Events. +/// +/// Auth: requires a valid JWT passed as `?token=` (EventSource cannot set headers). +/// On connect: flushes the recent history ring buffer, then streams live events. +pub async fn stream_logs( + State(state): State, + OptionalCurrentUser(user): OptionalCurrentUser, +) -> Result { + if user.is_none() { + return Err(ApiError::Unauthorized( + "Authentication required for log stream".to_string(), + )); + } + + // Snapshot history and subscribe before releasing the lock so we don't miss events. + let rx = state.log_tx.subscribe(); + let history: Vec<_> = state + .log_history + .lock() + .map(|h| h.iter().cloned().collect()) + .unwrap_or_default(); + + let history_stream = tokio_stream::iter(history).map(|line| { + let data = serde_json::to_string(&line).unwrap_or_default(); + Ok::(Event::default().data(data)) + }); + + let live_stream = BroadcastStream::new(rx).filter_map(|result| match result { + Ok(line) => { + let data = serde_json::to_string(&line).unwrap_or_default(); + Some(Ok::(Event::default().data(data))) + } + Err(tokio_stream::wrappers::errors::BroadcastStreamRecvError::Lagged(n)) => { + let data = format!( + r#"{{"level":"WARN","target":"sse","message":"[{n} log lines dropped — buffer overrun]","timestamp":""}}"# + ); + Some(Ok(Event::default().data(data))) + } + }); + + let combined = history_stream.chain(live_stream); + + Ok(Sse::new(combined).keep_alive(KeepAlive::default())) +} + +/// Return recent activity log entries. +/// +/// Auth: requires a valid JWT (Authorization: Bearer or ?token=). +pub async fn list_activity( + State(state): State, + OptionalCurrentUser(user): OptionalCurrentUser, +) -> Result { + if user.is_none() { + return Err(ApiError::Unauthorized( + "Authentication required".to_string(), + )); + } + + let events = state + .activity_log_repo + .recent(50) + .await + .map_err(ApiError::from)?; + + let response: Vec = events.into_iter().map(Into::into).collect(); + Ok(Json(response)) +} diff --git a/k-tv-backend/api/src/routes/auth/local.rs b/k-tv-backend/api/src/routes/auth/local.rs index 03e601f..501b5d5 100644 --- a/k-tv-backend/api/src/routes/auth/local.rs +++ b/k-tv-backend/api/src/routes/auth/local.rs @@ -35,6 +35,7 @@ pub(super) async fn login( } let token = create_jwt(&user, &state)?; + let _ = state.activity_log_repo.log("user_login", user.email.as_ref(), None).await; Ok(( StatusCode::OK, diff --git a/k-tv-backend/api/src/routes/channels/crud.rs b/k-tv-backend/api/src/routes/channels/crud.rs index c9d8e47..5201d4a 100644 --- a/k-tv-backend/api/src/routes/channels/crud.rs +++ b/k-tv-backend/api/src/routes/channels/crud.rs @@ -69,6 +69,7 @@ pub(super) async fn create_channel( } let _ = state.event_tx.send(domain::DomainEvent::ChannelCreated { channel: channel.clone() }); + let _ = state.activity_log_repo.log("channel_created", &channel.name, Some(channel.id)).await; Ok((StatusCode::CREATED, Json(ChannelResponse::from(channel)))) } @@ -144,6 +145,7 @@ pub(super) async fn update_channel( let channel = state.channel_service.update(channel).await?; let _ = state.event_tx.send(domain::DomainEvent::ChannelUpdated { channel: channel.clone() }); + let _ = state.activity_log_repo.log("channel_updated", &channel.name, Some(channel.id)).await; Ok(Json(ChannelResponse::from(channel))) } @@ -155,5 +157,6 @@ pub(super) async fn delete_channel( // ChannelService::delete enforces ownership internally state.channel_service.delete(channel_id, user.id).await?; let _ = state.event_tx.send(domain::DomainEvent::ChannelDeleted { channel_id }); + let _ = state.activity_log_repo.log("channel_deleted", &channel_id.to_string(), Some(channel_id)).await; Ok(StatusCode::NO_CONTENT) } diff --git a/k-tv-backend/api/src/routes/channels/schedule.rs b/k-tv-backend/api/src/routes/channels/schedule.rs index d915d62..f465403 100644 --- a/k-tv-backend/api/src/routes/channels/schedule.rs +++ b/k-tv-backend/api/src/routes/channels/schedule.rs @@ -37,6 +37,8 @@ pub(super) async fn generate_schedule( channel_id, schedule: schedule.clone(), }); + let detail = format!("{} slots", schedule.slots.len()); + let _ = state.activity_log_repo.log("schedule_generated", &detail, Some(channel_id)).await; Ok((StatusCode::CREATED, Json(ScheduleResponse::from(schedule)))) } diff --git a/k-tv-backend/api/src/routes/mod.rs b/k-tv-backend/api/src/routes/mod.rs index 0a6de3d..a766434 100644 --- a/k-tv-backend/api/src/routes/mod.rs +++ b/k-tv-backend/api/src/routes/mod.rs @@ -5,6 +5,7 @@ use crate::state::AppState; use axum::Router; +pub mod admin; pub mod auth; pub mod channels; pub mod config; @@ -15,6 +16,7 @@ pub mod library; /// Construct the API v1 router pub fn api_v1_router() -> Router { Router::new() + .nest("/admin", admin::router()) .nest("/auth", auth::router()) .nest("/channels", channels::router()) .nest("/config", config::router()) diff --git a/k-tv-backend/api/src/state.rs b/k-tv-backend/api/src/state.rs index 911d3e6..76296c5 100644 --- a/k-tv-backend/api/src/state.rs +++ b/k-tv-backend/api/src/state.rs @@ -8,11 +8,14 @@ use axum_extra::extract::cookie::Key; use infra::auth::jwt::{JwtConfig, JwtValidator}; #[cfg(feature = "auth-oidc")] use infra::auth::oidc::OidcService; -use std::sync::Arc; +use std::collections::VecDeque; +use std::sync::{Arc, Mutex}; +use tokio::sync::broadcast; use crate::config::Config; use crate::events::EventBus; -use domain::{ChannelService, ScheduleEngineService, UserService}; +use crate::log_layer::LogLine; +use domain::{ActivityLogRepository, ChannelService, ScheduleEngineService, UserService}; #[derive(Clone)] pub struct AppState { @@ -27,6 +30,12 @@ pub struct AppState { pub jwt_validator: Option>, pub config: Arc, pub event_tx: EventBus, + /// Broadcast channel for streaming log lines to SSE clients. + pub log_tx: broadcast::Sender, + /// Ring buffer of recent log lines sent to new SSE clients on connect. + pub log_history: Arc>>, + /// Repository for persisted in-app activity events. + pub activity_log_repo: Arc, /// Index for the local-files provider, used by the rescan route. #[cfg(feature = "local-files")] pub local_index: Option>, @@ -46,6 +55,9 @@ impl AppState { provider_registry: Arc, config: Config, event_tx: EventBus, + log_tx: broadcast::Sender, + log_history: Arc>>, + activity_log_repo: Arc, ) -> anyhow::Result { let cookie_key = Key::derive_from(config.cookie_secret.as_bytes()); @@ -118,6 +130,9 @@ impl AppState { jwt_validator, config: Arc::new(config), event_tx, + log_tx, + log_history, + activity_log_repo, #[cfg(feature = "local-files")] local_index: None, #[cfg(feature = "local-files")] diff --git a/k-tv-backend/domain/src/repositories.rs b/k-tv-backend/domain/src/repositories.rs index d2dd07b..470629a 100644 --- a/k-tv-backend/domain/src/repositories.rs +++ b/k-tv-backend/domain/src/repositories.rs @@ -12,6 +12,16 @@ use crate::entities::{Channel, GeneratedSchedule, PlaybackRecord, User}; use crate::errors::DomainResult; use crate::value_objects::{ChannelId, UserId}; +/// An in-app activity event stored in the database for the admin log view. +#[derive(Debug, Clone)] +pub struct ActivityEvent { + pub id: Uuid, + pub timestamp: DateTime, + pub event_type: String, + pub detail: String, + pub channel_id: Option, +} + /// Repository port for User persistence #[async_trait] pub trait UserRepository: Send + Sync { @@ -71,3 +81,15 @@ pub trait ScheduleRepository: Send + Sync { async fn save_playback_record(&self, record: &PlaybackRecord) -> DomainResult<()>; } + +/// Repository port for activity log persistence. +#[async_trait] +pub trait ActivityLogRepository: Send + Sync { + async fn log( + &self, + event_type: &str, + detail: &str, + channel_id: Option, + ) -> DomainResult<()>; + async fn recent(&self, limit: u32) -> DomainResult>; +} diff --git a/k-tv-backend/infra/src/activity_log_repository/mod.rs b/k-tv-backend/infra/src/activity_log_repository/mod.rs new file mode 100644 index 0000000..57910cf --- /dev/null +++ b/k-tv-backend/infra/src/activity_log_repository/mod.rs @@ -0,0 +1,5 @@ +#[cfg(feature = "sqlite")] +mod sqlite; + +#[cfg(feature = "sqlite")] +pub use sqlite::SqliteActivityLogRepository; diff --git a/k-tv-backend/infra/src/activity_log_repository/sqlite.rs b/k-tv-backend/infra/src/activity_log_repository/sqlite.rs new file mode 100644 index 0000000..380a473 --- /dev/null +++ b/k-tv-backend/infra/src/activity_log_repository/sqlite.rs @@ -0,0 +1,71 @@ +use async_trait::async_trait; +use chrono::Utc; +use uuid::Uuid; + +use domain::{ActivityEvent, ActivityLogRepository, DomainError, DomainResult}; + +pub struct SqliteActivityLogRepository { + pool: sqlx::SqlitePool, +} + +impl SqliteActivityLogRepository { + pub fn new(pool: sqlx::SqlitePool) -> Self { + Self { pool } + } +} + +#[async_trait] +impl ActivityLogRepository for SqliteActivityLogRepository { + async fn log( + &self, + event_type: &str, + detail: &str, + channel_id: Option, + ) -> DomainResult<()> { + let id = Uuid::new_v4().to_string(); + let timestamp = Utc::now().to_rfc3339(); + let channel_id_str = channel_id.map(|id| id.to_string()); + + sqlx::query( + "INSERT INTO activity_log (id, timestamp, event_type, detail, channel_id) VALUES (?, ?, ?, ?, ?)", + ) + .bind(&id) + .bind(×tamp) + .bind(event_type) + .bind(detail) + .bind(&channel_id_str) + .execute(&self.pool) + .await + .map_err(|e| DomainError::RepositoryError(e.to_string()))?; + + Ok(()) + } + + async fn recent(&self, limit: u32) -> DomainResult> { + let rows: Vec<(String, String, String, String, Option)> = sqlx::query_as( + "SELECT id, timestamp, event_type, detail, channel_id FROM activity_log ORDER BY timestamp DESC LIMIT ?", + ) + .bind(limit) + .fetch_all(&self.pool) + .await + .map_err(|e| DomainError::RepositoryError(e.to_string()))?; + + let events = rows + .into_iter() + .filter_map(|(id, timestamp, event_type, detail, channel_id)| { + let id = Uuid::parse_str(&id).ok()?; + let timestamp = timestamp.parse().ok()?; + let channel_id = channel_id.and_then(|s| Uuid::parse_str(&s).ok()); + Some(ActivityEvent { + id, + timestamp, + event_type, + detail, + channel_id, + }) + }) + .collect(); + + Ok(events) + } +} diff --git a/k-tv-backend/infra/src/factory.rs b/k-tv-backend/infra/src/factory.rs index ada97d0..51cddfe 100644 --- a/k-tv-backend/infra/src/factory.rs +++ b/k-tv-backend/infra/src/factory.rs @@ -1,7 +1,7 @@ use std::sync::Arc; use crate::db::DatabasePool; -use domain::{ChannelRepository, ScheduleRepository, UserRepository}; +use domain::{ActivityLogRepository, ChannelRepository, ScheduleRepository, UserRepository}; #[derive(Debug, thiserror::Error)] pub enum FactoryError { @@ -51,6 +51,25 @@ pub async fn build_channel_repository( } } +pub async fn build_activity_log_repository( + pool: &DatabasePool, +) -> FactoryResult> { + match pool { + #[cfg(feature = "sqlite")] + DatabasePool::Sqlite(pool) => Ok(Arc::new( + crate::activity_log_repository::SqliteActivityLogRepository::new(pool.clone()), + )), + #[cfg(feature = "postgres")] + DatabasePool::Postgres(_pool) => Err(FactoryError::NotImplemented( + "ActivityLogRepository not yet implemented for Postgres".to_string(), + )), + #[allow(unreachable_patterns)] + _ => Err(FactoryError::NotImplemented( + "No database feature enabled".to_string(), + )), + } +} + pub async fn build_schedule_repository( pool: &DatabasePool, ) -> FactoryResult> { diff --git a/k-tv-backend/infra/src/lib.rs b/k-tv-backend/infra/src/lib.rs index c8e1125..6ef5099 100644 --- a/k-tv-backend/infra/src/lib.rs +++ b/k-tv-backend/infra/src/lib.rs @@ -18,6 +18,7 @@ pub mod db; pub mod factory; pub mod jellyfin; pub mod provider_registry; +mod activity_log_repository; mod channel_repository; mod schedule_repository; mod user_repository; @@ -29,6 +30,8 @@ pub mod local_files; pub use db::run_migrations; pub use provider_registry::ProviderRegistry; +#[cfg(feature = "sqlite")] +pub use activity_log_repository::SqliteActivityLogRepository; #[cfg(feature = "sqlite")] pub use user_repository::SqliteUserRepository; #[cfg(feature = "sqlite")] diff --git a/k-tv-backend/migrations_sqlite/20260317000000_add_activity_log.sql b/k-tv-backend/migrations_sqlite/20260317000000_add_activity_log.sql new file mode 100644 index 0000000..218fd59 --- /dev/null +++ b/k-tv-backend/migrations_sqlite/20260317000000_add_activity_log.sql @@ -0,0 +1,9 @@ +CREATE TABLE IF NOT EXISTS activity_log ( + id TEXT PRIMARY KEY NOT NULL, + timestamp TEXT NOT NULL, + event_type TEXT NOT NULL, + detail TEXT NOT NULL, + channel_id TEXT +); + +CREATE INDEX IF NOT EXISTS idx_activity_log_timestamp ON activity_log(timestamp DESC); diff --git a/k-tv-frontend/app/(main)/admin/components/activity-log-panel.tsx b/k-tv-frontend/app/(main)/admin/components/activity-log-panel.tsx new file mode 100644 index 0000000..67cb6ca --- /dev/null +++ b/k-tv-frontend/app/(main)/admin/components/activity-log-panel.tsx @@ -0,0 +1,73 @@ +"use client"; + +import type { ActivityEvent } from "@/lib/types"; + +const eventColors: Record = { + channel_created: "bg-green-900/40 text-green-400", + channel_updated: "bg-blue-900/40 text-blue-400", + channel_deleted: "bg-red-900/40 text-red-400", + schedule_generated: "bg-violet-900/40 text-violet-400", + user_login: "bg-zinc-800 text-zinc-400", +}; + +function fmtTs(ts: string) { + try { + const d = new Date(ts); + return d.toLocaleTimeString(undefined, { hour12: false }); + } catch { + return ts; + } +} + +interface ActivityLogPanelProps { + events: ActivityEvent[]; + isLoading: boolean; +} + +export function ActivityLogPanel({ events, isLoading }: ActivityLogPanelProps) { + return ( +
+
+ + Activity + +
+ +
+ {isLoading && events.length === 0 ? ( +

Loading…

+ ) : events.length === 0 ? ( +

No activity yet.

+ ) : ( +
+ {events.map((event) => ( +
+
+ + {event.event_type.replace(/_/g, " ")} + + + {fmtTs(event.timestamp)} + +
+

{event.detail}

+ {event.channel_id && ( +

+ ch: {event.channel_id.slice(0, 8)}… +

+ )} +
+ ))} +
+ )} +
+
+ ); +} diff --git a/k-tv-frontend/app/(main)/admin/components/server-logs-panel.tsx b/k-tv-frontend/app/(main)/admin/components/server-logs-panel.tsx new file mode 100644 index 0000000..8779667 --- /dev/null +++ b/k-tv-frontend/app/(main)/admin/components/server-logs-panel.tsx @@ -0,0 +1,129 @@ +"use client"; + +import { useEffect, useRef, useState } from "react"; +import type { LogLine } from "@/lib/types"; + +const LEVELS = ["DEBUG", "INFO", "WARN", "ERROR"] as const; + +const levelColor: Record = { + DEBUG: "text-zinc-500", + INFO: "text-zinc-300", + WARN: "text-yellow-400", + ERROR: "text-red-400", +}; + +interface ServerLogsPanelProps { + lines: LogLine[]; + connected: boolean; + onClear: () => void; +} + +export function ServerLogsPanel({ lines, connected, onClear }: ServerLogsPanelProps) { + const scrollRef = useRef(null); + const [autoScroll, setAutoScroll] = useState(true); + const [levelFilter, setLevelFilter] = useState>( + new Set(["DEBUG", "INFO", "WARN", "ERROR"]), + ); + + const filtered = lines.filter((l) => levelFilter.has(l.level.toUpperCase())); + + useEffect(() => { + if (!autoScroll) return; + scrollRef.current?.scrollTo({ top: scrollRef.current.scrollHeight }); + }, [filtered.length, autoScroll]); + + const toggleLevel = (level: string) => { + setLevelFilter((prev) => { + const next = new Set(prev); + if (next.has(level)) next.delete(level); + else next.add(level); + return next; + }); + }; + + const handleScroll = () => { + const el = scrollRef.current; + if (!el) return; + const atBottom = el.scrollHeight - el.scrollTop - el.clientHeight < 40; + setAutoScroll(atBottom); + }; + + const fmtTime = (ts: string) => { + if (!ts) return ""; + try { + return new Date(ts).toLocaleTimeString(undefined, { hour12: false }); + } catch { + return ts; + } + }; + + return ( +
+ {/* Header */} +
+ + Server Logs + + + {connected ? "● live" : "○ disconnected"} + + +
+ {LEVELS.map((lvl) => ( + + ))} + +
+
+ + {/* Log lines */} +
+ {filtered.length === 0 ? ( +

+ {connected ? "Waiting for log events…" : "Connecting to server…"} +

+ ) : ( + filtered.map((line, i) => ( +
+ {fmtTime(line.timestamp)} + + {line.level?.toUpperCase()} + + {line.target} + {line.message} +
+ )) + )} +
{connected && lines.length > 0 ? "▋" : ""}
+
+
+ ); +} diff --git a/k-tv-frontend/app/(main)/admin/page.tsx b/k-tv-frontend/app/(main)/admin/page.tsx new file mode 100644 index 0000000..8adc1cd --- /dev/null +++ b/k-tv-frontend/app/(main)/admin/page.tsx @@ -0,0 +1,58 @@ +"use client"; + +import { useState, useEffect } from "react"; +import { useRouter } from "next/navigation"; +import { useAuthContext } from "@/context/auth-context"; +import { useActivityLog, useServerLogs } from "@/hooks/use-admin"; +import { ServerLogsPanel } from "./components/server-logs-panel"; +import { ActivityLogPanel } from "./components/activity-log-panel"; + +export default function AdminPage() { + const { token, isLoaded } = useAuthContext(); + const router = useRouter(); + + useEffect(() => { + if (isLoaded && !token) { + router.replace("/login"); + } + }, [isLoaded, token, router]); + + const { lines, connected } = useServerLogs(token); + const [localLines, setLocalLines] = useState(lines); + + // Sync external lines into local state so Clear can reset without clearing the hook + useEffect(() => { + setLocalLines(lines); + }, [lines]); + + const { data: events = [], isLoading } = useActivityLog(token); + + if (!isLoaded || !token) return null; + + return ( +
+ {/* Page header */} +
+

Admin

+ System monitoring & logs +
+ + {/* Two-column layout */} +
+ {/* Left: server logs */} +
+ setLocalLines([])} + /> +
+ + {/* Right: activity log */} +
+ +
+
+
+ ); +} diff --git a/k-tv-frontend/app/(main)/layout.tsx b/k-tv-frontend/app/(main)/layout.tsx index 28d376a..2d7da23 100644 --- a/k-tv-frontend/app/(main)/layout.tsx +++ b/k-tv-frontend/app/(main)/layout.tsx @@ -6,6 +6,7 @@ const NAV_LINKS = [ { href: "/tv", label: "TV" }, { href: "/guide", label: "Guide" }, { href: "/dashboard", label: "Dashboard" }, + { href: "/admin", label: "Admin" }, { href: "/docs", label: "Docs" }, ]; diff --git a/k-tv-frontend/app/(main)/tv/components/stats-panel.tsx b/k-tv-frontend/app/(main)/tv/components/stats-panel.tsx new file mode 100644 index 0000000..f3305e9 --- /dev/null +++ b/k-tv-frontend/app/(main)/tv/components/stats-panel.tsx @@ -0,0 +1,150 @@ +"use client"; + +import { useEffect, useState } from "react"; +import type Hls from "hls.js"; +import type { CurrentBroadcastResponse } from "@/lib/types"; + +interface StatsPanelProps { + videoRef: React.RefObject; + hlsRef: React.RefObject; + streamingProtocol?: "hls" | "direct_file"; + broadcast?: CurrentBroadcastResponse | null; +} + +interface Stats { + protocol: string; + resolution: string; + bitrate: string; + bandwidth: string; + buffer: string; + offset: string; + slotEnds: string; +} + +function fmtSecs(s: number) { + const h = Math.floor(s / 3600); + const m = Math.floor((s % 3600) / 60); + const sec = Math.floor(s % 60); + if (h > 0) return `${h}:${String(m).padStart(2, "0")}:${String(sec).padStart(2, "0")}`; + return `${String(m).padStart(2, "0")}:${String(sec).padStart(2, "0")}`; +} + +function fmtKbps(bps: number) { + if (!bps) return "--"; + return `${Math.round(bps / 1000).toLocaleString()} kbps`; +} + +function getBufferAhead(video: HTMLVideoElement) { + const ct = video.currentTime; + for (let i = 0; i < video.buffered.length; i++) { + if (video.buffered.start(i) <= ct && ct <= video.buffered.end(i)) { + return video.buffered.end(i) - ct; + } + } + return 0; +} + +export function StatsPanel({ videoRef, hlsRef, streamingProtocol, broadcast }: StatsPanelProps) { + const [stats, setStats] = useState({ + protocol: "--", + resolution: "--", + bitrate: "--", + bandwidth: "--", + buffer: "--", + offset: "--", + slotEnds: "--", + }); + + useEffect(() => { + const update = () => { + const video = videoRef.current; + const hls = hlsRef.current; + + const protocol = + streamingProtocol === "direct_file" + ? "Direct file" + : hls + ? "HLS (hls.js)" + : "HLS (native)"; + + let resolution = "--"; + let bitrate = "--"; + let bandwidth = "--"; + + if (hls) { + const level = hls.currentLevel >= 0 ? hls.levels[hls.currentLevel] : null; + if (level) { + resolution = `${level.width}×${level.height}`; + bitrate = fmtKbps(level.bitrate); + } + if (hls.bandwidthEstimate > 0) { + bandwidth = fmtKbps(hls.bandwidthEstimate); + } + } + + const buffer = video ? `${getBufferAhead(video).toFixed(1)} s` : "--"; + const offset = video ? fmtSecs(video.currentTime) : "--"; + + let slotEnds = "--"; + if (broadcast?.slot.end_at) { + const secsLeft = (new Date(broadcast.slot.end_at).getTime() - Date.now()) / 1000; + if (secsLeft > 0) { + slotEnds = `in ${fmtSecs(secsLeft)}`; + } else { + slotEnds = "ending…"; + } + } + + setStats({ protocol, resolution, bitrate, bandwidth, buffer, offset, slotEnds }); + }; + + update(); + const id = setInterval(update, 1000); + return () => clearInterval(id); + }, [videoRef, hlsRef, streamingProtocol, broadcast]); + + return ( +
+
+ + Stats for nerds + + + LIVE + +
+ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
Protocol{stats.protocol}
Resolution{stats.resolution}
Bitrate{stats.bitrate}
Bandwidth est.{stats.bandwidth}
Buffer{stats.buffer}
Offset{stats.offset}
Slot ends{stats.slotEnds}
+
+ ); +} diff --git a/k-tv-frontend/app/(main)/tv/components/video-player.tsx b/k-tv-frontend/app/(main)/tv/components/video-player.tsx index 13ee850..9682150 100644 --- a/k-tv-frontend/app/(main)/tv/components/video-player.tsx +++ b/k-tv-frontend/app/(main)/tv/components/video-player.tsx @@ -1,6 +1,8 @@ import { forwardRef, useEffect, useRef, useState } from "react"; import Hls from "hls.js"; import { Loader2 } from "lucide-react"; +import type { CurrentBroadcastResponse } from "@/lib/types"; +import { StatsPanel } from "./stats-panel"; export interface SubtitleTrack { id: number; @@ -19,6 +21,10 @@ interface VideoPlayerProps { muted?: boolean; /** Force direct-file mode (skips hls.js even for .m3u8 URLs). */ streamingProtocol?: "hls" | "direct_file"; + /** When true, renders the Stats for Nerds overlay. */ + showStats?: boolean; + /** Current broadcast data passed to the stats panel for slot timing. */ + broadcast?: CurrentBroadcastResponse | null; onStreamError?: () => void; onSubtitleTracksChange?: (tracks: SubtitleTrack[]) => void; /** Called when the browser blocks autoplay and user interaction is required. */ @@ -37,6 +43,8 @@ const VideoPlayer = forwardRef( subtitleTrack = -1, muted = false, streamingProtocol, + showStats = false, + broadcast, onStreamError, onSubtitleTracksChange, onNeedsInteraction, @@ -162,6 +170,16 @@ const VideoPlayer = forwardRef( )} + + {/* Stats for Nerds overlay */} + {showStats && ( + + )} ); }, diff --git a/k-tv-frontend/app/(main)/tv/page.tsx b/k-tv-frontend/app/(main)/tv/page.tsx index 893eb51..60b05af 100644 --- a/k-tv-frontend/app/(main)/tv/page.tsx +++ b/k-tv-frontend/app/(main)/tv/page.tsx @@ -16,6 +16,7 @@ import type { SubtitleTrack } from "./components/video-player"; import type { LogoPosition } from "@/lib/types"; import { Cast, + Info, Maximize2, Minimize2, Volume1, @@ -101,6 +102,7 @@ function TvPageContent() { // Overlay / idle state const [showOverlays, setShowOverlays] = useState(true); const [showSchedule, setShowSchedule] = useState(false); + const [showStats, setShowStats] = useState(false); const idleTimer = useRef | null>(null); // Video ref — used to resume playback if autoplay was blocked on load @@ -411,6 +413,10 @@ function TvPageContent() { e.preventDefault(); prevChannel(); break; + case "s": + case "S": + setShowStats((v) => !v); + break; case "g": case "G": toggleSchedule(); @@ -623,6 +629,8 @@ function TvPageContent() { } subtitleTrack={activeSubtitleTrack} muted={isMuted} + showStats={showStats} + broadcast={broadcast} onSubtitleTracksChange={setSubtitleTracks} onStreamError={handleStreamError} onEnded={handleVideoEnded} @@ -850,6 +858,14 @@ function TvPageContent() { )} + +