Refactor schedule and user repositories into modular structure

- Moved schedule repository logic into separate modules for SQLite and PostgreSQL implementations.
- Created a mapping module for shared data structures and mapping functions in the schedule repository.
- Added new mapping module for user repository to handle user data transformations.
- Implemented PostgreSQL and SQLite user repository adapters with necessary CRUD operations.
- Added tests for user repository functionality, including saving, finding, and deleting users.
This commit is contained in:
2026-03-13 01:35:14 +01:00
parent 79ced7b77b
commit eeb4e2cb41
39 changed files with 2288 additions and 2194 deletions

View File

@@ -0,0 +1,57 @@
use std::sync::Arc;
use crate::entities::Channel;
use crate::errors::{DomainError, DomainResult};
use crate::repositories::ChannelRepository;
use crate::value_objects::{ChannelId, UserId};
/// Service for managing channels (CRUD + ownership enforcement).
pub struct ChannelService {
channel_repo: Arc<dyn ChannelRepository>,
}
impl ChannelService {
pub fn new(channel_repo: Arc<dyn ChannelRepository>) -> Self {
Self { channel_repo }
}
pub async fn create(
&self,
owner_id: UserId,
name: &str,
timezone: &str,
) -> DomainResult<Channel> {
let channel = Channel::new(owner_id, name, timezone);
self.channel_repo.save(&channel).await?;
Ok(channel)
}
pub async fn find_by_id(&self, id: ChannelId) -> DomainResult<Channel> {
self.channel_repo
.find_by_id(id)
.await?
.ok_or(DomainError::ChannelNotFound(id))
}
pub async fn find_all(&self) -> DomainResult<Vec<Channel>> {
self.channel_repo.find_all().await
}
pub async fn find_by_owner(&self, owner_id: UserId) -> DomainResult<Vec<Channel>> {
self.channel_repo.find_by_owner(owner_id).await
}
pub async fn update(&self, channel: Channel) -> DomainResult<Channel> {
self.channel_repo.save(&channel).await?;
Ok(channel)
}
/// Delete a channel, enforcing that `requester_id` is the owner.
pub async fn delete(&self, id: ChannelId, requester_id: UserId) -> DomainResult<()> {
let channel = self.find_by_id(id).await?;
if channel.owner_id != requester_id {
return Err(DomainError::forbidden("You don't own this channel"));
}
self.channel_repo.delete(id).await
}
}

View File

@@ -0,0 +1,11 @@
//! Domain Services
//!
//! Services contain the business logic of the application.
pub mod channel;
pub mod schedule;
pub mod user;
pub use channel::ChannelService;
pub use schedule::ScheduleEngineService;
pub use user::UserService;

View File

@@ -0,0 +1,119 @@
use std::collections::HashSet;
use rand::seq::SliceRandom;
use crate::entities::MediaItem;
use crate::value_objects::{FillStrategy, MediaItemId};
pub(super) fn fill_block<'a>(
candidates: &'a [MediaItem],
pool: &'a [MediaItem],
target_secs: u32,
strategy: &FillStrategy,
last_item_id: Option<&MediaItemId>,
) -> Vec<&'a MediaItem> {
match strategy {
FillStrategy::BestFit => fill_best_fit(pool, target_secs),
FillStrategy::Sequential => fill_sequential(candidates, pool, target_secs, last_item_id),
FillStrategy::Random => {
let mut indices: Vec<usize> = (0..pool.len()).collect();
indices.shuffle(&mut rand::thread_rng());
let mut remaining = target_secs;
let mut result = Vec::new();
for i in indices {
let item = &pool[i];
if item.duration_secs <= remaining {
remaining -= item.duration_secs;
result.push(item);
}
}
result
}
}
}
/// Greedy bin-packing: at each step pick the longest item that still fits
/// in the remaining budget, without repeating items within the same block.
pub(super) fn fill_best_fit(pool: &[MediaItem], target_secs: u32) -> Vec<&MediaItem> {
let mut remaining = target_secs;
let mut selected: Vec<&MediaItem> = Vec::new();
let mut used: HashSet<usize> = HashSet::new();
loop {
let best = pool
.iter()
.enumerate()
.filter(|(idx, item)| {
!used.contains(idx) && item.duration_secs <= remaining
})
.max_by_key(|(_, item)| item.duration_secs);
match best {
Some((idx, item)) => {
remaining -= item.duration_secs;
used.insert(idx);
selected.push(item);
}
None => break,
}
}
selected
}
/// Sequential fill with cross-generation series continuity.
///
/// `candidates` — all items matching the filter, in Jellyfin's natural order
/// (typically by season + episode number for TV shows).
/// `pool` — candidates filtered by the recycle policy (eligible to air).
/// `last_item_id` — the last item scheduled in this block in the previous
/// generation or in an earlier occurrence of this block within
/// the current generation. Used to resume the series from the
/// next episode rather than restarting from episode 1.
///
/// Algorithm:
/// 1. Find `last_item_id`'s position in `candidates` and start from the next index.
/// 2. Walk the full `candidates` list in order (wrapping around at the end),
/// but only pick items that are in `pool` (i.e. not on cooldown).
/// 3. Greedily fill the time budget with items in that order.
///
/// This ensures episodes always air in series order, the series wraps correctly
/// when the last episode has been reached, and cooldowns are still respected.
pub(super) fn fill_sequential<'a>(
candidates: &'a [MediaItem],
pool: &'a [MediaItem],
target_secs: u32,
last_item_id: Option<&MediaItemId>,
) -> Vec<&'a MediaItem> {
if pool.is_empty() {
return vec![];
}
// Set of item IDs currently eligible to air.
let available: HashSet<&MediaItemId> = pool.iter().map(|i| &i.id).collect();
// Find where in the full ordered list to resume.
// Falls back to index 0 if last_item_id is absent or was removed from the library.
let start_idx = last_item_id
.and_then(|id| candidates.iter().position(|c| &c.id == id))
.map(|pos| (pos + 1) % candidates.len())
.unwrap_or(0);
// Walk candidates in order from start_idx, wrapping around once,
// skipping any that are on cooldown (not in `available`).
let ordered: Vec<&MediaItem> = (0..candidates.len())
.map(|i| &candidates[(start_idx + i) % candidates.len()])
.filter(|item| available.contains(&item.id))
.collect();
// Greedily fill the block's time budget in episode order.
let mut remaining = target_secs;
let mut result = Vec::new();
for item in ordered {
if item.duration_secs <= remaining {
remaining -= item.duration_secs;
result.push(item);
}
}
result
}

View File

@@ -0,0 +1,349 @@
use std::collections::HashMap;
use std::sync::Arc;
use chrono::{DateTime, Duration, TimeZone, Utc};
use chrono_tz::Tz;
use uuid::Uuid;
use crate::entities::{
BlockContent, CurrentBroadcast, GeneratedSchedule, PlaybackRecord, ProgrammingBlock,
ScheduledSlot,
};
use crate::errors::{DomainError, DomainResult};
use crate::ports::IMediaProvider;
use crate::repositories::{ChannelRepository, ScheduleRepository};
use crate::value_objects::{
BlockId, ChannelId, FillStrategy, MediaFilter, MediaItemId, RecyclePolicy,
};
mod fill;
mod recycle;
/// Core scheduling engine.
///
/// Generates 48-hour broadcast schedules by walking through a channel's
/// `ScheduleConfig` day by day, resolving each `ProgrammingBlock` into concrete
/// `ScheduledSlot`s via the `IMediaProvider`, and applying the `RecyclePolicy`
/// to avoid replaying recently aired items.
pub struct ScheduleEngineService {
media_provider: Arc<dyn IMediaProvider>,
channel_repo: Arc<dyn ChannelRepository>,
schedule_repo: Arc<dyn ScheduleRepository>,
}
impl ScheduleEngineService {
pub fn new(
media_provider: Arc<dyn IMediaProvider>,
channel_repo: Arc<dyn ChannelRepository>,
schedule_repo: Arc<dyn ScheduleRepository>,
) -> Self {
Self {
media_provider,
channel_repo,
schedule_repo,
}
}
// -------------------------------------------------------------------------
// Public API
// -------------------------------------------------------------------------
/// Generate and persist a 48-hour schedule for `channel_id` starting at `from`.
///
/// The algorithm:
/// 1. Walk each calendar day in the 48-hour window.
/// 2. For each `ProgrammingBlock`, compute its UTC wall-clock interval for that day.
/// 3. Clip the interval to `[from, from + 48h)`.
/// 4. Resolve the block content via the media provider, applying the recycle policy.
/// 5. For `Sequential` blocks, resume from where the previous generation left off
/// (series continuity — see `fill::fill_sequential`).
/// 6. Record every played item in the playback history.
///
/// Gaps between blocks are left empty — clients render them as a no-signal state.
pub async fn generate_schedule(
&self,
channel_id: ChannelId,
from: DateTime<Utc>,
) -> DomainResult<GeneratedSchedule> {
let channel = self
.channel_repo
.find_by_id(channel_id)
.await?
.ok_or(DomainError::ChannelNotFound(channel_id))?;
let tz: Tz = channel
.timezone
.parse()
.map_err(|_| DomainError::TimezoneError(channel.timezone.clone()))?;
let history = self
.schedule_repo
.find_playback_history(channel_id)
.await?;
// Load the most recent schedule for two purposes:
// 1. Derive the next generation number.
// 2. Know where each Sequential block left off (series continuity).
let latest_schedule = self.schedule_repo.find_latest(channel_id).await?;
let generation = latest_schedule
.as_ref()
.map(|s| s.generation + 1)
.unwrap_or(1);
// Build the initial per-block continuity map from the previous generation's
// last slot per block. The map is updated as each block occurrence is resolved
// within this generation so that the second day of a 48h schedule continues
// from where the first day ended.
let mut block_continuity: HashMap<BlockId, MediaItemId> = latest_schedule
.iter()
.flat_map(|s| &s.slots)
.fold(HashMap::new(), |mut map, slot| {
// keep only the *last* slot per block (slots are sorted ascending)
map.insert(slot.source_block_id, slot.item.id.clone());
map
});
let valid_from = from;
let valid_until = from + Duration::hours(48);
let start_date = from.with_timezone(&tz).date_naive();
let end_date = valid_until.with_timezone(&tz).date_naive();
let mut slots: Vec<ScheduledSlot> = Vec::new();
let mut current_date = start_date;
while current_date <= end_date {
for block in &channel.schedule_config.blocks {
let naive_start = current_date.and_time(block.start_time);
// `earliest()` handles DST gaps — if the local time doesn't exist
// (e.g. clocks spring forward) we skip this block occurrence.
let block_start_utc = match tz.from_local_datetime(&naive_start).earliest() {
Some(dt) => dt.with_timezone(&Utc),
None => continue,
};
let block_end_utc =
block_start_utc + Duration::minutes(block.duration_mins as i64);
// Clip to the 48-hour window.
let slot_start = block_start_utc.max(valid_from);
let slot_end = block_end_utc.min(valid_until);
if slot_end <= slot_start {
continue;
}
// For Sequential blocks: resume from the last item aired in this block.
let last_item_id = block_continuity.get(&block.id);
let mut block_slots = self
.resolve_block(
block,
slot_start,
slot_end,
&history,
&channel.recycle_policy,
generation,
last_item_id,
)
.await?;
// Update continuity so the next occurrence of this block (same
// generation, next calendar day) continues from here.
if let Some(last_slot) = block_slots.last() {
block_continuity.insert(block.id, last_slot.item.id.clone());
}
slots.append(&mut block_slots);
}
current_date = current_date.succ_opt().ok_or_else(|| {
DomainError::validation("Date overflow during schedule generation")
})?;
}
// Blocks in ScheduleConfig are not required to be sorted; sort resolved slots.
slots.sort_by_key(|s| s.start_at);
let schedule = GeneratedSchedule {
id: Uuid::new_v4(),
channel_id,
valid_from,
valid_until,
generation,
slots,
};
self.schedule_repo.save(&schedule).await?;
// Persist playback history so the recycle policy has data for next generation.
for slot in &schedule.slots {
let record =
PlaybackRecord::new(channel_id, slot.item.id.clone(), generation);
self.schedule_repo.save_playback_record(&record).await?;
}
Ok(schedule)
}
/// Determine what is currently broadcasting on a schedule.
///
/// Returns `None` when `now` falls in a gap between blocks — the client
/// should display a no-signal / static screen in that case.
pub fn get_current_broadcast(
schedule: &GeneratedSchedule,
now: DateTime<Utc>,
) -> Option<CurrentBroadcast> {
schedule
.slots
.iter()
.find(|s| s.start_at <= now && now < s.end_at)
.map(|slot| CurrentBroadcast {
slot: slot.clone(),
offset_secs: (now - slot.start_at).num_seconds() as u32,
})
}
/// Look up the schedule currently active at `at` without generating a new one.
pub async fn get_active_schedule(
&self,
channel_id: ChannelId,
at: DateTime<Utc>,
) -> DomainResult<Option<GeneratedSchedule>> {
self.schedule_repo.find_active(channel_id, at).await
}
/// Delegate stream URL resolution to the configured media provider.
pub async fn get_stream_url(&self, item_id: &MediaItemId) -> DomainResult<String> {
self.media_provider.get_stream_url(item_id).await
}
/// Return all slots that overlap the given time window — the EPG data.
pub fn get_epg<'a>(
schedule: &'a GeneratedSchedule,
from: DateTime<Utc>,
until: DateTime<Utc>,
) -> Vec<&'a ScheduledSlot> {
schedule
.slots
.iter()
.filter(|s| s.start_at < until && s.end_at > from)
.collect()
}
// -------------------------------------------------------------------------
// Block resolution
// -------------------------------------------------------------------------
async fn resolve_block(
&self,
block: &ProgrammingBlock,
start: DateTime<Utc>,
end: DateTime<Utc>,
history: &[PlaybackRecord],
policy: &RecyclePolicy,
generation: u32,
last_item_id: Option<&MediaItemId>,
) -> DomainResult<Vec<ScheduledSlot>> {
match &block.content {
BlockContent::Manual { items } => {
self.resolve_manual(items, start, end, block.id).await
}
BlockContent::Algorithmic { filter, strategy } => {
self.resolve_algorithmic(
filter, strategy, start, end, history, policy, generation,
block.id, last_item_id,
)
.await
}
}
}
/// Resolve a manual block by fetching each hand-picked item in order.
/// Stops when the block's time budget (`end`) is exhausted.
async fn resolve_manual(
&self,
item_ids: &[MediaItemId],
start: DateTime<Utc>,
end: DateTime<Utc>,
block_id: BlockId,
) -> DomainResult<Vec<ScheduledSlot>> {
let mut slots = Vec::new();
let mut cursor = start;
for item_id in item_ids {
if cursor >= end {
break;
}
if let Some(item) = self.media_provider.fetch_by_id(item_id).await? {
let item_end =
(cursor + Duration::seconds(item.duration_secs as i64)).min(end);
slots.push(ScheduledSlot {
id: Uuid::new_v4(),
start_at: cursor,
end_at: item_end,
item,
source_block_id: block_id,
});
cursor = item_end;
}
// If item is not found (deleted/unavailable), silently skip it.
}
Ok(slots)
}
/// Resolve an algorithmic block: fetch candidates, apply recycle policy,
/// run the fill strategy, and build slots.
///
/// `last_item_id` is the ID of the last item scheduled in this block in the
/// previous generation. Used only by `Sequential` for series continuity.
async fn resolve_algorithmic(
&self,
filter: &MediaFilter,
strategy: &FillStrategy,
start: DateTime<Utc>,
end: DateTime<Utc>,
history: &[PlaybackRecord],
policy: &RecyclePolicy,
generation: u32,
block_id: BlockId,
last_item_id: Option<&MediaItemId>,
) -> DomainResult<Vec<ScheduledSlot>> {
// `candidates` — all items matching the filter, in provider order.
// Kept separate from `pool` so Sequential can rotate through the full
// ordered list while still honouring cooldowns.
let candidates = self.media_provider.fetch_items(filter).await?;
if candidates.is_empty() {
return Ok(vec![]);
}
let pool = recycle::apply_recycle_policy(&candidates, history, policy, generation);
let target_secs = (end - start).num_seconds() as u32;
let selected = fill::fill_block(&candidates, &pool, target_secs, strategy, last_item_id);
let mut slots = Vec::new();
let mut cursor = start;
for item in selected {
if cursor >= end {
break;
}
let item_end =
(cursor + Duration::seconds(item.duration_secs as i64)).min(end);
slots.push(ScheduledSlot {
id: Uuid::new_v4(),
start_at: cursor,
end_at: item_end,
item: item.clone(),
source_block_id: block_id,
});
cursor = item_end;
}
Ok(slots)
}
}

View File

@@ -0,0 +1,55 @@
use std::collections::HashSet;
use chrono::Utc;
use crate::entities::{MediaItem, PlaybackRecord};
use crate::value_objects::{MediaItemId, RecyclePolicy};
/// Filter `candidates` according to `policy`, returning the eligible pool.
///
/// An item is on cooldown if *either* the day-based or generation-based
/// threshold is exceeded. If honouring all cooldowns would leave fewer items
/// than `policy.min_available_ratio` of the total, all cooldowns are waived
/// and the full pool is returned (prevents small libraries from stalling).
pub(super) fn apply_recycle_policy(
candidates: &[MediaItem],
history: &[PlaybackRecord],
policy: &RecyclePolicy,
current_generation: u32,
) -> Vec<MediaItem> {
let now = Utc::now();
let excluded: HashSet<MediaItemId> = history
.iter()
.filter(|record| {
let by_days = policy
.cooldown_days
.map(|days| (now - record.played_at).num_days() < days as i64)
.unwrap_or(false);
let by_gen = policy
.cooldown_generations
.map(|gens| current_generation.saturating_sub(record.generation) < gens)
.unwrap_or(false);
by_days || by_gen
})
.map(|r| r.item_id.clone())
.collect();
let available: Vec<MediaItem> = candidates
.iter()
.filter(|i| !excluded.contains(&i.id))
.cloned()
.collect();
let min_count =
(candidates.len() as f32 * policy.min_available_ratio).ceil() as usize;
if available.len() < min_count {
// Pool too small after applying cooldowns — recycle everything.
candidates.to_vec()
} else {
available
}
}

View File

@@ -0,0 +1,60 @@
use std::sync::Arc;
use uuid::Uuid;
use crate::entities::User;
use crate::errors::{DomainError, DomainResult};
use crate::repositories::UserRepository;
use crate::value_objects::Email;
/// Service for managing users.
pub struct UserService {
user_repository: Arc<dyn UserRepository>,
}
impl UserService {
pub fn new(user_repository: Arc<dyn UserRepository>) -> Self {
Self { user_repository }
}
pub async fn find_or_create(&self, subject: &str, email: &str) -> DomainResult<User> {
if let Some(user) = self.user_repository.find_by_subject(subject).await? {
return Ok(user);
}
if let Some(mut user) = self.user_repository.find_by_email(email).await? {
if user.subject != subject {
user.subject = subject.to_string();
self.user_repository.save(&user).await?;
}
return Ok(user);
}
let email = Email::try_from(email)?;
let user = User::new(subject, email);
self.user_repository.save(&user).await?;
Ok(user)
}
pub async fn find_by_id(&self, id: Uuid) -> DomainResult<User> {
self.user_repository
.find_by_id(id)
.await?
.ok_or(DomainError::UserNotFound(id))
}
pub async fn find_by_email(&self, email: &str) -> DomainResult<Option<User>> {
self.user_repository.find_by_email(email).await
}
pub async fn create_local(
&self,
email: &str,
password_hash: &str,
) -> DomainResult<User> {
let email = Email::try_from(email)?;
let user = User::new_local(email, password_hash);
self.user_repository.save(&user).await?;
Ok(user)
}
}