feat(channel): add auto-schedule feature to channels with background scheduler
This commit is contained in:
@@ -69,6 +69,7 @@ pub struct UpdateChannelRequest {
|
||||
/// Replace the entire schedule config (template import/edit)
|
||||
pub schedule_config: Option<domain::ScheduleConfig>,
|
||||
pub recycle_policy: Option<domain::RecyclePolicy>,
|
||||
pub auto_schedule: Option<bool>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize)]
|
||||
@@ -80,6 +81,7 @@ pub struct ChannelResponse {
|
||||
pub timezone: String,
|
||||
pub schedule_config: domain::ScheduleConfig,
|
||||
pub recycle_policy: domain::RecyclePolicy,
|
||||
pub auto_schedule: bool,
|
||||
pub created_at: DateTime<Utc>,
|
||||
pub updated_at: DateTime<Utc>,
|
||||
}
|
||||
@@ -94,6 +96,7 @@ impl From<domain::Channel> for ChannelResponse {
|
||||
timezone: c.timezone,
|
||||
schedule_config: c.schedule_config,
|
||||
recycle_policy: c.recycle_policy,
|
||||
auto_schedule: c.auto_schedule,
|
||||
created_at: c.created_at,
|
||||
updated_at: c.updated_at,
|
||||
}
|
||||
|
||||
@@ -21,6 +21,7 @@ mod dto;
|
||||
mod error;
|
||||
mod extractors;
|
||||
mod routes;
|
||||
mod scheduler;
|
||||
mod state;
|
||||
|
||||
use crate::config::Config;
|
||||
@@ -72,6 +73,7 @@ async fn main() -> anyhow::Result<()> {
|
||||
// Build media provider — Jellyfin if configured, no-op fallback otherwise.
|
||||
let media_provider: Arc<dyn IMediaProvider> = build_media_provider(&config);
|
||||
|
||||
let bg_channel_repo = channel_repo.clone();
|
||||
let schedule_engine = ScheduleEngineService::new(
|
||||
Arc::clone(&media_provider),
|
||||
channel_repo,
|
||||
@@ -91,6 +93,9 @@ async fn main() -> anyhow::Result<()> {
|
||||
cors_origins: config.cors_allowed_origins.clone(),
|
||||
};
|
||||
|
||||
let bg_schedule_engine = Arc::clone(&state.schedule_engine);
|
||||
tokio::spawn(scheduler::run_auto_scheduler(bg_schedule_engine, bg_channel_repo));
|
||||
|
||||
let app = Router::new()
|
||||
.nest("/api/v1", routes::api_v1_router())
|
||||
.with_state(state);
|
||||
|
||||
@@ -76,6 +76,9 @@ pub(super) async fn update_channel(
|
||||
if let Some(rp) = payload.recycle_policy {
|
||||
channel.recycle_policy = rp;
|
||||
}
|
||||
if let Some(auto) = payload.auto_schedule {
|
||||
channel.auto_schedule = auto;
|
||||
}
|
||||
channel.updated_at = Utc::now();
|
||||
|
||||
let channel = state.channel_service.update(channel).await?;
|
||||
|
||||
76
k-tv-backend/api/src/scheduler.rs
Normal file
76
k-tv-backend/api/src/scheduler.rs
Normal file
@@ -0,0 +1,76 @@
|
||||
//! Background auto-scheduler task.
|
||||
//!
|
||||
//! Runs every hour, finds channels with `auto_schedule = true`, and regenerates
|
||||
//! their schedule if it is within 24 hours of expiry (or already expired).
|
||||
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
use chrono::Utc;
|
||||
use domain::{ChannelRepository, ScheduleEngineService};
|
||||
|
||||
pub async fn run_auto_scheduler(
|
||||
schedule_engine: Arc<ScheduleEngineService>,
|
||||
channel_repo: Arc<dyn ChannelRepository>,
|
||||
) {
|
||||
loop {
|
||||
tokio::time::sleep(Duration::from_secs(3600)).await;
|
||||
tick(&schedule_engine, &channel_repo).await;
|
||||
}
|
||||
}
|
||||
|
||||
async fn tick(
|
||||
schedule_engine: &Arc<ScheduleEngineService>,
|
||||
channel_repo: &Arc<dyn ChannelRepository>,
|
||||
) {
|
||||
let channels = match channel_repo.find_auto_schedule_enabled().await {
|
||||
Ok(c) => c,
|
||||
Err(e) => {
|
||||
tracing::warn!("auto-scheduler: failed to fetch channels: {}", e);
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
let now = Utc::now();
|
||||
|
||||
for channel in channels {
|
||||
let from = match schedule_engine.get_latest_schedule(channel.id).await {
|
||||
Ok(Some(s)) => {
|
||||
let remaining = s.valid_until - now;
|
||||
if remaining > chrono::Duration::hours(24) {
|
||||
// Still fresh — skip until it gets close to expiry
|
||||
continue;
|
||||
} else if s.valid_until > now {
|
||||
// Seamless handoff: new schedule starts where the old one ends
|
||||
s.valid_until
|
||||
} else {
|
||||
// Expired: start from now to avoid scheduling in the past
|
||||
now
|
||||
}
|
||||
}
|
||||
Ok(None) => now,
|
||||
Err(e) => {
|
||||
tracing::warn!(
|
||||
"auto-scheduler: failed to fetch latest schedule for channel {}: {}",
|
||||
channel.id,
|
||||
e
|
||||
);
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
if let Err(e) = schedule_engine.generate_schedule(channel.id, from).await {
|
||||
tracing::warn!(
|
||||
"auto-scheduler: failed to generate schedule for channel {}: {}",
|
||||
channel.id,
|
||||
e
|
||||
);
|
||||
} else {
|
||||
tracing::info!(
|
||||
"auto-scheduler: generated schedule for channel {} starting at {}",
|
||||
channel.id,
|
||||
from
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -81,6 +81,7 @@ pub struct Channel {
|
||||
pub timezone: String,
|
||||
pub schedule_config: ScheduleConfig,
|
||||
pub recycle_policy: RecyclePolicy,
|
||||
pub auto_schedule: bool,
|
||||
pub created_at: DateTime<Utc>,
|
||||
pub updated_at: DateTime<Utc>,
|
||||
}
|
||||
@@ -100,6 +101,7 @@ impl Channel {
|
||||
timezone: timezone.into(),
|
||||
schedule_config: ScheduleConfig::default(),
|
||||
recycle_policy: RecyclePolicy::default(),
|
||||
auto_schedule: false,
|
||||
created_at: now,
|
||||
updated_at: now,
|
||||
}
|
||||
|
||||
@@ -37,6 +37,7 @@ pub trait ChannelRepository: Send + Sync {
|
||||
async fn find_by_id(&self, id: ChannelId) -> DomainResult<Option<Channel>>;
|
||||
async fn find_by_owner(&self, owner_id: UserId) -> DomainResult<Vec<Channel>>;
|
||||
async fn find_all(&self) -> DomainResult<Vec<Channel>>;
|
||||
async fn find_auto_schedule_enabled(&self) -> DomainResult<Vec<Channel>>;
|
||||
/// Insert or update a channel.
|
||||
async fn save(&self, channel: &Channel) -> DomainResult<()>;
|
||||
async fn delete(&self, id: ChannelId) -> DomainResult<()>;
|
||||
|
||||
@@ -206,6 +206,14 @@ impl ScheduleEngineService {
|
||||
})
|
||||
}
|
||||
|
||||
/// Return the most recently generated schedule for a channel (used by the background scheduler).
|
||||
pub async fn get_latest_schedule(
|
||||
&self,
|
||||
channel_id: ChannelId,
|
||||
) -> DomainResult<Option<GeneratedSchedule>> {
|
||||
self.schedule_repo.find_latest(channel_id).await
|
||||
}
|
||||
|
||||
/// Look up the schedule currently active at `at` without generating a new one.
|
||||
pub async fn get_active_schedule(
|
||||
&self,
|
||||
|
||||
@@ -13,6 +13,7 @@ pub(super) struct ChannelRow {
|
||||
pub timezone: String,
|
||||
pub schedule_config: String,
|
||||
pub recycle_policy: String,
|
||||
pub auto_schedule: i64,
|
||||
pub created_at: String,
|
||||
pub updated_at: String,
|
||||
}
|
||||
@@ -51,6 +52,7 @@ impl TryFrom<ChannelRow> for Channel {
|
||||
timezone: row.timezone,
|
||||
schedule_config,
|
||||
recycle_policy,
|
||||
auto_schedule: row.auto_schedule != 0,
|
||||
created_at: parse_dt(&row.created_at)?,
|
||||
updated_at: parse_dt(&row.updated_at)?,
|
||||
})
|
||||
@@ -58,4 +60,4 @@ impl TryFrom<ChannelRow> for Channel {
|
||||
}
|
||||
|
||||
pub(super) const SELECT_COLS: &str =
|
||||
"id, owner_id, name, description, timezone, schedule_config, recycle_policy, created_at, updated_at";
|
||||
"id, owner_id, name, description, timezone, schedule_config, recycle_policy, auto_schedule, created_at, updated_at";
|
||||
|
||||
@@ -61,14 +61,15 @@ impl ChannelRepository for PostgresChannelRepository {
|
||||
sqlx::query(
|
||||
r#"
|
||||
INSERT INTO channels
|
||||
(id, owner_id, name, description, timezone, schedule_config, recycle_policy, created_at, updated_at)
|
||||
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
|
||||
(id, owner_id, name, description, timezone, schedule_config, recycle_policy, auto_schedule, created_at, updated_at)
|
||||
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
|
||||
ON CONFLICT(id) DO UPDATE SET
|
||||
name = EXCLUDED.name,
|
||||
description = EXCLUDED.description,
|
||||
timezone = EXCLUDED.timezone,
|
||||
schedule_config = EXCLUDED.schedule_config,
|
||||
recycle_policy = EXCLUDED.recycle_policy,
|
||||
auto_schedule = EXCLUDED.auto_schedule,
|
||||
updated_at = EXCLUDED.updated_at
|
||||
"#,
|
||||
)
|
||||
@@ -79,6 +80,7 @@ impl ChannelRepository for PostgresChannelRepository {
|
||||
.bind(&channel.timezone)
|
||||
.bind(&schedule_config)
|
||||
.bind(&recycle_policy)
|
||||
.bind(channel.auto_schedule as i64)
|
||||
.bind(channel.created_at.to_rfc3339())
|
||||
.bind(channel.updated_at.to_rfc3339())
|
||||
.execute(&self.pool)
|
||||
@@ -88,6 +90,18 @@ impl ChannelRepository for PostgresChannelRepository {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn find_auto_schedule_enabled(&self) -> DomainResult<Vec<Channel>> {
|
||||
let sql = format!(
|
||||
"SELECT {SELECT_COLS} FROM channels WHERE auto_schedule = 1 ORDER BY created_at ASC"
|
||||
);
|
||||
let rows: Vec<ChannelRow> = sqlx::query_as(&sql)
|
||||
.fetch_all(&self.pool)
|
||||
.await
|
||||
.map_err(|e| DomainError::RepositoryError(e.to_string()))?;
|
||||
|
||||
rows.into_iter().map(Channel::try_from).collect()
|
||||
}
|
||||
|
||||
async fn delete(&self, id: ChannelId) -> DomainResult<()> {
|
||||
sqlx::query("DELETE FROM channels WHERE id = $1")
|
||||
.bind(id.to_string())
|
||||
|
||||
@@ -61,14 +61,15 @@ impl ChannelRepository for SqliteChannelRepository {
|
||||
sqlx::query(
|
||||
r#"
|
||||
INSERT INTO channels
|
||||
(id, owner_id, name, description, timezone, schedule_config, recycle_policy, created_at, updated_at)
|
||||
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
|
||||
(id, owner_id, name, description, timezone, schedule_config, recycle_policy, auto_schedule, created_at, updated_at)
|
||||
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
|
||||
ON CONFLICT(id) DO UPDATE SET
|
||||
name = excluded.name,
|
||||
description = excluded.description,
|
||||
timezone = excluded.timezone,
|
||||
schedule_config = excluded.schedule_config,
|
||||
recycle_policy = excluded.recycle_policy,
|
||||
auto_schedule = excluded.auto_schedule,
|
||||
updated_at = excluded.updated_at
|
||||
"#,
|
||||
)
|
||||
@@ -79,6 +80,7 @@ impl ChannelRepository for SqliteChannelRepository {
|
||||
.bind(&channel.timezone)
|
||||
.bind(&schedule_config)
|
||||
.bind(&recycle_policy)
|
||||
.bind(channel.auto_schedule as i64)
|
||||
.bind(channel.created_at.to_rfc3339())
|
||||
.bind(channel.updated_at.to_rfc3339())
|
||||
.execute(&self.pool)
|
||||
@@ -88,6 +90,18 @@ impl ChannelRepository for SqliteChannelRepository {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn find_auto_schedule_enabled(&self) -> DomainResult<Vec<Channel>> {
|
||||
let sql = format!(
|
||||
"SELECT {SELECT_COLS} FROM channels WHERE auto_schedule = 1 ORDER BY created_at ASC"
|
||||
);
|
||||
let rows: Vec<ChannelRow> = sqlx::query_as(&sql)
|
||||
.fetch_all(&self.pool)
|
||||
.await
|
||||
.map_err(|e| DomainError::RepositoryError(e.to_string()))?;
|
||||
|
||||
rows.into_iter().map(Channel::try_from).collect()
|
||||
}
|
||||
|
||||
async fn delete(&self, id: ChannelId) -> DomainResult<()> {
|
||||
sqlx::query("DELETE FROM channels WHERE id = ?")
|
||||
.bind(id.to_string())
|
||||
|
||||
@@ -0,0 +1 @@
|
||||
ALTER TABLE channels ADD COLUMN auto_schedule INTEGER NOT NULL DEFAULT 0;
|
||||
Reference in New Issue
Block a user