// Source file: k-tv/k-tv-backend/domain/src/services.rs
// (570 lines, 19 KiB, Rust)

//! Domain Services
//!
//! Services contain the business logic of the application.
use std::collections::HashSet;
use std::sync::Arc;
use chrono::{DateTime, Duration, TimeZone, Utc};
use chrono_tz::Tz;
use rand::seq::SliceRandom;
use uuid::Uuid;
use crate::entities::{
BlockContent, CurrentBroadcast, GeneratedSchedule, MediaItem, PlaybackRecord,
ProgrammingBlock, ScheduledSlot,
};
use crate::errors::{DomainError, DomainResult};
use crate::ports::IMediaProvider;
use crate::repositories::{ChannelRepository, ScheduleRepository, UserRepository};
use crate::value_objects::{
BlockId, ChannelId, Email, FillStrategy, MediaFilter, MediaItemId, RecyclePolicy,
};
// ============================================================================
// UserService
// ============================================================================
/// Service for managing users.
///
/// Thin layer over a [`UserRepository`]: looks users up by subject, e-mail or
/// id, and creates/saves new ones.
pub struct UserService {
// Shared handle to the persistence port used by every method of the service.
user_repository: Arc<dyn UserRepository>,
}
impl UserService {
pub fn new(user_repository: Arc<dyn UserRepository>) -> Self {
Self { user_repository }
}
pub async fn find_or_create(&self, subject: &str, email: &str) -> DomainResult<crate::entities::User> {
if let Some(user) = self.user_repository.find_by_subject(subject).await? {
return Ok(user);
}
if let Some(mut user) = self.user_repository.find_by_email(email).await? {
if user.subject != subject {
user.subject = subject.to_string();
self.user_repository.save(&user).await?;
}
return Ok(user);
}
let email = Email::try_from(email)?;
let user = crate::entities::User::new(subject, email);
self.user_repository.save(&user).await?;
Ok(user)
}
pub async fn find_by_id(&self, id: Uuid) -> DomainResult<crate::entities::User> {
self.user_repository
.find_by_id(id)
.await?
.ok_or(DomainError::UserNotFound(id))
}
pub async fn find_by_email(&self, email: &str) -> DomainResult<Option<crate::entities::User>> {
self.user_repository.find_by_email(email).await
}
pub async fn create_local(
&self,
email: &str,
password_hash: &str,
) -> DomainResult<crate::entities::User> {
let email = Email::try_from(email)?;
let user = crate::entities::User::new_local(email, password_hash);
self.user_repository.save(&user).await?;
Ok(user)
}
}
// ============================================================================
// ChannelService
// ============================================================================
/// Service for managing channels (create / read / update / delete).
///
/// Ownership is enforced only by `delete`, which compares the channel's
/// `owner_id` with the requester; the other operations perform no ownership
/// check of their own.
pub struct ChannelService {
// Shared handle to the channel persistence port.
channel_repo: Arc<dyn ChannelRepository>,
}
impl ChannelService {
pub fn new(channel_repo: Arc<dyn ChannelRepository>) -> Self {
Self { channel_repo }
}
pub async fn create(
&self,
owner_id: crate::value_objects::UserId,
name: &str,
timezone: &str,
) -> DomainResult<crate::entities::Channel> {
let channel = crate::entities::Channel::new(owner_id, name, timezone);
self.channel_repo.save(&channel).await?;
Ok(channel)
}
pub async fn find_by_id(
&self,
id: ChannelId,
) -> DomainResult<crate::entities::Channel> {
self.channel_repo
.find_by_id(id)
.await?
.ok_or(DomainError::ChannelNotFound(id))
}
pub async fn find_all(&self) -> DomainResult<Vec<crate::entities::Channel>> {
self.channel_repo.find_all().await
}
pub async fn find_by_owner(
&self,
owner_id: crate::value_objects::UserId,
) -> DomainResult<Vec<crate::entities::Channel>> {
self.channel_repo.find_by_owner(owner_id).await
}
pub async fn update(
&self,
channel: crate::entities::Channel,
) -> DomainResult<crate::entities::Channel> {
self.channel_repo.save(&channel).await?;
Ok(channel)
}
/// Delete a channel, enforcing that `requester_id` is the owner.
pub async fn delete(
&self,
id: ChannelId,
requester_id: crate::value_objects::UserId,
) -> DomainResult<()> {
let channel = self.find_by_id(id).await?;
if channel.owner_id != requester_id {
return Err(DomainError::forbidden("You don't own this channel"));
}
self.channel_repo.delete(id).await
}
}
// ============================================================================
// ScheduleEngineService
// ============================================================================
/// Core scheduling engine.
///
/// Generates 48-hour broadcast schedules by walking through a channel's
/// `ScheduleConfig` day by day, resolving each `ProgrammingBlock` into concrete
/// `ScheduledSlot`s via the `IMediaProvider`, and applying the `RecyclePolicy`
/// to avoid replaying recently aired items.
pub struct ScheduleEngineService {
// Source of media items (by id or by filter) and of stream URLs.
media_provider: Arc<dyn IMediaProvider>,
// Used to load the channel whose schedule is being generated.
channel_repo: Arc<dyn ChannelRepository>,
// Stores generated schedules and the playback history read by the recycle policy.
schedule_repo: Arc<dyn ScheduleRepository>,
}
impl ScheduleEngineService {
pub fn new(
media_provider: Arc<dyn IMediaProvider>,
channel_repo: Arc<dyn ChannelRepository>,
schedule_repo: Arc<dyn ScheduleRepository>,
) -> Self {
Self {
media_provider,
channel_repo,
schedule_repo,
}
}
// -------------------------------------------------------------------------
// Public API
// -------------------------------------------------------------------------
/// Generate and persist a 48-hour schedule for `channel_id` starting at `from`.
///
/// The algorithm:
/// 1. Walk each local calendar day that can contribute to the 48-hour window.
///    The walk starts at the local date of `from` minus the longest block
///    duration, so a block that began on an earlier day and is still on air
///    at `from` (e.g. 23:00 + 120 min) is not missed.
/// 2. For each `ProgrammingBlock`, compute its UTC interval for that day.
/// 3. Clip the interval to `[from, from + 48h)`; occurrences that fall
///    entirely outside the window are dropped.
/// 4. Resolve the block content via the media provider, applying the recycle policy.
/// 5. Save the schedule, then record every scheduled item in the playback history.
///
/// Gaps between blocks are left empty — clients render them as a no-signal state.
///
/// # Errors
/// - `DomainError::ChannelNotFound` when the channel does not exist.
/// - `DomainError::TimezoneError` when the channel's timezone string does not parse.
/// - A validation error when stepping to the next date overflows.
/// - Any error returned by the repositories or the media provider.
pub async fn generate_schedule(
    &self,
    channel_id: ChannelId,
    from: DateTime<Utc>,
) -> DomainResult<GeneratedSchedule> {
    let channel = self
        .channel_repo
        .find_by_id(channel_id)
        .await?
        .ok_or(DomainError::ChannelNotFound(channel_id))?;

    let tz: Tz = channel
        .timezone
        .parse()
        .map_err(|_| DomainError::TimezoneError(channel.timezone.clone()))?;

    let history = self
        .schedule_repo
        .find_playback_history(channel_id)
        .await?;

    // Generations count up from 1; the first schedule of a channel is generation 1.
    let generation = self
        .schedule_repo
        .find_latest(channel_id)
        .await?
        .map(|s| s.generation + 1)
        .unwrap_or(1);

    let valid_from = from;
    let valid_until = from + Duration::hours(48);

    // Any block occurrence overlapping the window must start after
    // `from - longest block duration`, so begin the day walk at that instant's
    // local date instead of the local date of `from`.  Occurrences that ended
    // before `from` are discarded by the clipping step below.
    let longest_block_mins = channel
        .schedule_config
        .blocks
        .iter()
        .map(|block| block.duration_mins as i64)
        .max()
        .unwrap_or(0);
    let earliest_relevant_start = valid_from
        .checked_sub_signed(Duration::minutes(longest_block_mins))
        .unwrap_or(valid_from);

    let start_date = earliest_relevant_start.with_timezone(&tz).date_naive();
    let end_date = valid_until.with_timezone(&tz).date_naive();

    let mut slots: Vec<ScheduledSlot> = Vec::new();
    let mut current_date = start_date;

    while current_date <= end_date {
        for block in &channel.schedule_config.blocks {
            let naive_start = current_date.and_time(block.start_time);

            // `earliest()` handles DST gaps — if the local time doesn't exist
            // (e.g. clocks spring forward) we skip this block occurrence.
            let block_start_utc = match tz.from_local_datetime(&naive_start).earliest() {
                Some(dt) => dt.with_timezone(&Utc),
                None => continue,
            };
            let block_end_utc =
                block_start_utc + Duration::minutes(block.duration_mins as i64);

            // Clip to the 48-hour window.
            let slot_start = block_start_utc.max(valid_from);
            let slot_end = block_end_utc.min(valid_until);
            if slot_end <= slot_start {
                continue;
            }

            let mut block_slots = self
                .resolve_block(
                    block,
                    slot_start,
                    slot_end,
                    &history,
                    &channel.recycle_policy,
                    generation,
                )
                .await?;
            slots.append(&mut block_slots);
        }

        current_date = current_date.succ_opt().ok_or_else(|| {
            DomainError::validation("Date overflow during schedule generation")
        })?;
    }

    // Blocks in ScheduleConfig are not required to be sorted; sort resolved slots.
    slots.sort_by_key(|s| s.start_at);

    let schedule = GeneratedSchedule {
        id: Uuid::new_v4(),
        channel_id,
        valid_from,
        valid_until,
        generation,
        slots,
    };
    self.schedule_repo.save(&schedule).await?;

    // Persist playback history so the recycle policy has data for next generation.
    for slot in &schedule.slots {
        let record =
            PlaybackRecord::new(channel_id, slot.item.id.clone(), generation);
        self.schedule_repo.save_playback_record(&record).await?;
    }

    Ok(schedule)
}
/// Find the slot that is on air at `now`.
///
/// Returns the first slot whose half-open interval `[start_at, end_at)`
/// contains `now`, together with the whole seconds elapsed since the slot
/// started. Returns `None` when `now` falls in a gap between slots — the
/// client should display a no-signal / static screen in that case.
pub fn get_current_broadcast(
    schedule: &GeneratedSchedule,
    now: DateTime<Utc>,
) -> Option<CurrentBroadcast> {
    for slot in &schedule.slots {
        let on_air = slot.start_at <= now && now < slot.end_at;
        if on_air {
            let elapsed = now - slot.start_at;
            return Some(CurrentBroadcast {
                slot: slot.clone(),
                offset_secs: elapsed.num_seconds() as u32,
            });
        }
    }
    None
}
/// Ask the schedule repository for the schedule active at `at`.
///
/// Nothing is generated here; `Ok(None)` is passed through when the
/// repository reports no active schedule.
pub async fn get_active_schedule(
    &self,
    channel_id: ChannelId,
    at: DateTime<Utc>,
) -> DomainResult<Option<GeneratedSchedule>> {
    let lookup = self.schedule_repo.find_active(channel_id, at);
    lookup.await
}
/// Resolve the stream URL of `item_id` by forwarding to the media provider.
pub async fn get_stream_url(&self, item_id: &MediaItemId) -> DomainResult<String> {
    let provider = &self.media_provider;
    provider.get_stream_url(item_id).await
}
/// Collect the EPG data for a time window.
///
/// A slot is included when it overlaps `(from, until)`: it starts before
/// `until` and ends after `from`. Slots are returned in schedule order.
pub fn get_epg<'a>(
    schedule: &'a GeneratedSchedule,
    from: DateTime<Utc>,
    until: DateTime<Utc>,
) -> Vec<&'a ScheduledSlot> {
    let mut overlapping = Vec::new();
    for slot in &schedule.slots {
        let starts_in_time = slot.start_at < until;
        let still_running = slot.end_at > from;
        if starts_in_time && still_running {
            overlapping.push(slot);
        }
    }
    overlapping
}
// -------------------------------------------------------------------------
// Block resolution
// -------------------------------------------------------------------------
/// Turn one block occurrence into concrete slots for `[start, end)`.
///
/// Dispatches on the block's content: manual blocks go to `resolve_manual`,
/// algorithmic blocks go to `resolve_algorithmic` (which also receives the
/// playback history, recycle policy and generation number).
async fn resolve_block(
    &self,
    block: &ProgrammingBlock,
    start: DateTime<Utc>,
    end: DateTime<Utc>,
    history: &[PlaybackRecord],
    policy: &RecyclePolicy,
    generation: u32,
) -> DomainResult<Vec<ScheduledSlot>> {
    let block_id = block.id;
    match &block.content {
        BlockContent::Algorithmic { filter, strategy } => {
            let resolving = self.resolve_algorithmic(
                filter, strategy, start, end, history, policy, generation, block_id,
            );
            resolving.await
        }
        BlockContent::Manual { items } => {
            let resolving = self.resolve_manual(items, start, end, block_id);
            resolving.await
        }
    }
}
/// Resolve a manual block: fetch the hand-picked items in order and lay
/// them back to back from `start`.
///
/// Stops once the block's time budget (`end`) is used up; the last slot is
/// truncated at `end` if its item would run over. Items the provider cannot
/// find (deleted/unavailable) are skipped silently.
async fn resolve_manual(
    &self,
    item_ids: &[MediaItemId],
    start: DateTime<Utc>,
    end: DateTime<Utc>,
    block_id: BlockId,
) -> DomainResult<Vec<ScheduledSlot>> {
    let mut slots = Vec::new();
    let mut playhead = start;

    for item_id in item_ids {
        if playhead >= end {
            break;
        }

        let item = match self.media_provider.fetch_by_id(item_id).await? {
            Some(found) => found,
            // Missing item: leave the playhead where it is and try the next one.
            None => continue,
        };

        let natural_end = playhead + Duration::seconds(item.duration_secs as i64);
        let finishes_at = natural_end.min(end);
        slots.push(ScheduledSlot {
            id: Uuid::new_v4(),
            start_at: playhead,
            end_at: finishes_at,
            item,
            source_block_id: block_id,
        });
        playhead = finishes_at;
    }

    Ok(slots)
}
/// Resolve an algorithmic block into slots for `[start, end)`.
///
/// Steps: fetch candidates matching `filter`, narrow them with the recycle
/// policy, let the fill strategy choose items for the available seconds,
/// then lay the chosen items back to back from `start` (truncating at `end`).
/// Returns no slots when the provider has no candidates.
async fn resolve_algorithmic(
    &self,
    filter: &MediaFilter,
    strategy: &FillStrategy,
    start: DateTime<Utc>,
    end: DateTime<Utc>,
    history: &[PlaybackRecord],
    policy: &RecyclePolicy,
    generation: u32,
    block_id: BlockId,
) -> DomainResult<Vec<ScheduledSlot>> {
    let candidates = self.media_provider.fetch_items(filter).await?;
    if candidates.is_empty() {
        return Ok(Vec::new());
    }

    let pool = Self::apply_recycle_policy(candidates, history, policy, generation);
    let budget_secs = (end - start).num_seconds() as u32;
    let picks = Self::fill_block(&pool, budget_secs, strategy);

    let mut slots = Vec::with_capacity(picks.len());
    let mut playhead = start;
    for picked in picks {
        if playhead >= end {
            break;
        }
        let natural_end = playhead + Duration::seconds(picked.duration_secs as i64);
        let finishes_at = natural_end.min(end);
        slots.push(ScheduledSlot {
            id: Uuid::new_v4(),
            start_at: playhead,
            end_at: finishes_at,
            item: picked.clone(),
            source_block_id: block_id,
        });
        playhead = finishes_at;
    }

    Ok(slots)
}
// -------------------------------------------------------------------------
// Recycle policy
// -------------------------------------------------------------------------
/// Narrow `candidates` to the items the recycle `policy` allows.
///
/// A history record puts its item on cooldown when it was played fewer than
/// `cooldown_days` whole days before now, *or* fewer than
/// `cooldown_generations` generations before `current_generation`. A
/// threshold that is `None` never excludes anything.
///
/// If honouring the cooldowns would leave fewer items than
/// `ceil(candidates.len() * policy.min_available_ratio)`, all cooldowns are
/// waived and the full candidate list is returned (prevents small libraries
/// from stalling). Candidate order is preserved in both cases.
fn apply_recycle_policy(
    candidates: Vec<MediaItem>,
    history: &[PlaybackRecord],
    policy: &RecyclePolicy,
    current_generation: u32,
) -> Vec<MediaItem> {
    let now = Utc::now();

    let on_cooldown = |record: &PlaybackRecord| -> bool {
        let within_days = policy
            .cooldown_days
            .map_or(false, |days| (now - record.played_at).num_days() < days as i64);
        let within_generations = policy.cooldown_generations.map_or(false, |gens| {
            current_generation.saturating_sub(record.generation) < gens
        });
        within_days || within_generations
    };

    let mut excluded: HashSet<MediaItemId> = HashSet::new();
    for record in history {
        if on_cooldown(record) {
            excluded.insert(record.item_id.clone());
        }
    }

    let is_eligible = |item: &MediaItem| !excluded.contains(&item.id);

    let required = (candidates.len() as f32 * policy.min_available_ratio).ceil() as usize;
    let eligible_count = candidates.iter().filter(|item| is_eligible(*item)).count();

    if eligible_count < required {
        // Pool too small after applying cooldowns — recycle everything.
        candidates
    } else {
        candidates
            .into_iter()
            .filter(|item| is_eligible(item))
            .collect()
    }
}
// -------------------------------------------------------------------------
// Fill strategies
// -------------------------------------------------------------------------
/// Choose items from `pool` whose total duration stays within `target_secs`,
/// using the given fill `strategy`.
///
/// - `BestFit` and `Sequential` delegate to their dedicated helpers.
/// - `Random` visits the pool in a shuffled order and keeps each item that
///   still fits in the remaining budget.
fn fill_block<'a>(
    pool: &'a [MediaItem],
    target_secs: u32,
    strategy: &FillStrategy,
) -> Vec<&'a MediaItem> {
    match strategy {
        FillStrategy::BestFit => Self::fill_best_fit(pool, target_secs),
        FillStrategy::Sequential => Self::fill_sequential(pool, target_secs),
        FillStrategy::Random => {
            let mut order: Vec<usize> = (0..pool.len()).collect();
            order.shuffle(&mut rand::thread_rng());

            let mut budget = target_secs;
            order
                .into_iter()
                .map(move |idx| &pool[idx])
                .filter(|item| {
                    let fits = item.duration_secs <= budget;
                    if fits {
                        budget -= item.duration_secs;
                    }
                    fits
                })
                .collect()
        }
    }
}
/// Greedy bin-packing: at each step pick the longest item that still fits
/// in the remaining budget, without repeating items within the same block.
fn fill_best_fit(pool: &[MediaItem], target_secs: u32) -> Vec<&MediaItem> {
let mut remaining = target_secs;
let mut selected: Vec<&MediaItem> = Vec::new();
let mut used: HashSet<usize> = HashSet::new();
loop {
let best = pool
.iter()
.enumerate()
.filter(|(idx, item)| {
!used.contains(idx) && item.duration_secs <= remaining
})
.max_by_key(|(_, item)| item.duration_secs);
match best {
Some((idx, item)) => {
remaining -= item.duration_secs;
used.insert(idx);
selected.push(item);
}
None => break,
}
}
selected
}
/// Sequential fill: walk the pool in its given order and keep every item
/// that still fits in the remaining budget; items that do not fit are
/// skipped and the walk continues. Good for series where episode order
/// matters.
fn fill_sequential(pool: &[MediaItem], target_secs: u32) -> Vec<&MediaItem> {
    let mut budget = target_secs;
    pool.iter()
        .filter(|candidate| {
            let fits = candidate.duration_secs <= budget;
            if fits {
                budget -= candidate.duration_secs;
            }
            fits
        })
        .collect()
}
}