feat: add local files provider with indexing and rescan functionality

- Implemented LocalFilesProvider to manage local video files.
- Added LocalIndex for in-memory and SQLite-backed indexing of video files.
- Introduced scanning functionality to detect video files and extract metadata.
- Added API endpoints for listing collections, genres, and series based on provider capabilities.
- Enhanced existing routes to check for provider capabilities before processing requests.
- Updated frontend to utilize provider capabilities for conditional rendering of UI elements.
- Implemented rescan functionality to refresh the local files index.
- Added database migration for local files index schema.
This commit is contained in:
2026-03-14 03:44:32 +01:00
parent 9b6bcfc566
commit 8f42164bce
30 changed files with 1033 additions and 59 deletions

View File

@@ -2,7 +2,7 @@ use async_trait::async_trait;
use domain::{
Collection, ContentType, DomainError, DomainResult, IMediaProvider, MediaFilter, MediaItem,
MediaItemId, SeriesSummary,
MediaItemId, ProviderCapabilities, SeriesSummary, StreamingProtocol,
};
use super::config::JellyfinConfig;
@@ -129,6 +129,19 @@ impl JellyfinMediaProvider {
#[async_trait]
impl IMediaProvider for JellyfinMediaProvider {
/// Advertise the feature set this provider supports so API routes and the
/// frontend can conditionally enable collection/series/genre/tag filtering.
fn capabilities(&self) -> ProviderCapabilities {
    ProviderCapabilities {
        collections: true,
        series: true,
        genres: true,
        tags: true,
        decade: true,
        search: true,
        // Jellyfin content is streamed over HLS (the local-files provider
        // serves direct files instead).
        streaming_protocol: StreamingProtocol::Hls,
        // NOTE(review): rescan is only offered by the local-files provider;
        // presumably the Jellyfin server manages its own library refreshes.
        rescan: false,
    }
}
/// Fetch items matching `filter` from the Jellyfin library.
///
/// When `series_names` has more than one entry the results from each series

View File

@@ -21,6 +21,9 @@ mod channel_repository;
mod schedule_repository;
mod user_repository;
#[cfg(feature = "local-files")]
pub mod local_files;
// Re-export for convenience
pub use db::run_migrations;
@@ -33,3 +36,6 @@ pub use schedule_repository::SqliteScheduleRepository;
#[cfg(feature = "jellyfin")]
pub use jellyfin::{JellyfinConfig, JellyfinMediaProvider};
#[cfg(feature = "local-files")]
pub use local_files::{LocalFilesConfig, LocalFilesProvider, LocalIndex, decode_stream_id};

View File

@@ -0,0 +1,9 @@
use std::path::PathBuf;
/// Configuration for the local files media provider.
///
/// Derives `Debug` so the config can be logged during startup diagnostics and
/// `Clone` so it can be shared between the provider and background tasks.
#[derive(Debug, Clone)]
pub struct LocalFilesConfig {
    /// Root directory containing video files. All files are served relative to this.
    pub root_dir: PathBuf,
    /// Public base URL of this API server, used to build stream URLs.
    pub base_url: String,
}

View File

@@ -0,0 +1,182 @@
use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::Arc;
use chrono::Utc;
use tokio::sync::RwLock;
use tracing::{error, info};
use domain::MediaItemId;
use super::config::LocalFilesConfig;
use super::scanner::{scan_dir, LocalFileItem};
/// Encode a relative-path string into a `MediaItemId`.
///
/// The encoding is URL-safe base64 without padding, so the id can be used
/// directly as a URL path segment.
pub fn encode_id(rel_path: &str) -> MediaItemId {
    use base64::Engine as _;
    let engine = &base64::engine::general_purpose::URL_SAFE_NO_PAD;
    let encoded = engine.encode(rel_path.as_bytes());
    MediaItemId::new(encoded)
}
/// Decode a `MediaItemId` back to a relative path string.
///
/// Returns `None` when the id is not valid URL-safe base64 or the decoded
/// bytes are not UTF-8.
pub fn decode_id(id: &MediaItemId) -> Option<String> {
    use base64::Engine as _;
    let engine = &base64::engine::general_purpose::URL_SAFE_NO_PAD;
    let bytes = engine.decode(id.as_ref()).ok()?;
    String::from_utf8(bytes).ok()
}
/// In-memory (+ SQLite-backed) index of local video files.
///
/// On startup the index is populated from the SQLite cache so the provider can
/// serve requests immediately. A background task calls `rescan()` to pick up
/// any changes on disk and write them back to the cache.
pub struct LocalIndex {
    // Map from encoded id (URL-safe base64 of rel_path, see `encode_id`) to
    // the scanned entry; RwLock so readers aren't blocked except during the
    // map swap in `rescan`.
    items: Arc<RwLock<HashMap<MediaItemId, LocalFileItem>>>,
    // Root directory that every rel_path is relative to. Public so callers
    // outside this module can resolve files on disk (e.g. the stream route —
    // presumably; confirm against the route handlers).
    pub root_dir: PathBuf,
    // Pool used for the `local_files_index` cache table.
    pool: sqlx::SqlitePool,
}
impl LocalIndex {
    /// Create the index, immediately loading persisted entries from SQLite.
    ///
    /// The loaded entries may be stale with respect to the filesystem until
    /// `rescan()` runs (the module docs say a background task does this).
    pub async fn new(config: &LocalFilesConfig, pool: sqlx::SqlitePool) -> Self {
        let idx = Self {
            items: Arc::new(RwLock::new(HashMap::new())),
            root_dir: config.root_dir.clone(),
            pool,
        };
        idx.load_from_db().await;
        idx
    }

    /// Load previously scanned items from SQLite (instant on startup).
    ///
    /// Errors are logged at debug level and otherwise ignored: a missing
    /// table on first run is expected, and the filesystem rescan will
    /// repopulate everything anyway.
    async fn load_from_db(&self) {
        // Row mirrors the local_files_index schema; SQLite INTEGER columns
        // arrive as i64 and are narrowed to the in-memory types below.
        #[derive(sqlx::FromRow)]
        struct Row {
            id: String,
            rel_path: String,
            title: String,
            duration_secs: i64,
            year: Option<i64>,
            tags: String,
            top_dir: String,
        }
        let rows = sqlx::query_as::<_, Row>(
            "SELECT id, rel_path, title, duration_secs, year, tags, top_dir FROM local_files_index",
        )
        .fetch_all(&self.pool)
        .await;
        match rows {
            Ok(rows) => {
                let mut map = self.items.write().await;
                for row in rows {
                    // Tags are stored as a JSON array string; a malformed
                    // value degrades to an empty tag list rather than failing.
                    let tags: Vec<String> =
                        serde_json::from_str(&row.tags).unwrap_or_default();
                    let item = LocalFileItem {
                        rel_path: row.rel_path,
                        title: row.title,
                        duration_secs: row.duration_secs as u32,
                        year: row.year.map(|y| y as u16),
                        tags,
                        top_dir: row.top_dir,
                    };
                    map.insert(MediaItemId::new(row.id), item);
                }
                info!("Local files index: loaded {} items from DB", map.len());
            }
            Err(e) => {
                // Table might not exist yet on first run — that's fine.
                tracing::debug!("Could not load local files index from DB: {}", e);
            }
        }
    }

    /// Scan the filesystem for video files and rebuild the index.
    ///
    /// Returns the number of items found. Called on startup (background task)
    /// and via `POST /files/rescan`.
    pub async fn rescan(&self) -> u32 {
        info!("Local files: scanning {:?}", self.root_dir);
        // scan_dir does the (slow) walk + ffprobe work without holding the lock.
        let new_items = scan_dir(&self.root_dir).await;
        let count = new_items.len() as u32;
        // Swap in-memory map.
        // The write lock is held only for this clear+insert block, so readers
        // are blocked only briefly; they see either the old or the new index.
        {
            let mut map = self.items.write().await;
            map.clear();
            for item in &new_items {
                let id = encode_id(&item.rel_path);
                map.insert(id, item.clone());
            }
        }
        // Persist to SQLite.
        // A persistence failure is logged but not propagated: the in-memory
        // index is already updated and remains authoritative for this run.
        if let Err(e) = self.save_to_db(&new_items).await {
            error!("Failed to persist local files index: {}", e);
        }
        info!("Local files: indexed {} items", count);
        count
    }

    /// Replace the cached rows with `items`.
    ///
    /// DELETE + re-insert inside one transaction makes the rewrite
    /// all-or-nothing: on error the previous cache contents survive.
    async fn save_to_db(&self, items: &[LocalFileItem]) -> Result<(), sqlx::Error> {
        // Rebuild the table in one transaction.
        let mut tx = self.pool.begin().await?;
        sqlx::query("DELETE FROM local_files_index")
            .execute(&mut *tx)
            .await?;
        // Single timestamp for the whole batch so a scan is identifiable.
        let now = Utc::now().to_rfc3339();
        for item in items {
            let id = encode_id(&item.rel_path).into_inner();
            let tags_json = serde_json::to_string(&item.tags).unwrap_or_else(|_| "[]".into());
            sqlx::query(
                "INSERT INTO local_files_index \
                 (id, rel_path, title, duration_secs, year, tags, top_dir, scanned_at) \
                 VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
            )
            .bind(&id)
            .bind(&item.rel_path)
            .bind(&item.title)
            .bind(item.duration_secs as i64)
            .bind(item.year.map(|y| y as i64))
            .bind(&tags_json)
            .bind(&item.top_dir)
            .bind(&now)
            .execute(&mut *tx)
            .await?;
        }
        tx.commit().await
    }

    /// Look up a single item by its encoded id (cloned out of the map).
    pub async fn get(&self, id: &MediaItemId) -> Option<LocalFileItem> {
        self.items.read().await.get(id).cloned()
    }

    /// Snapshot of all items; entries are cloned so the read lock is
    /// released before the caller starts filtering.
    pub async fn get_all(&self) -> Vec<(MediaItemId, LocalFileItem)> {
        self.items
            .read()
            .await
            .iter()
            .map(|(k, v)| (k.clone(), v.clone()))
            .collect()
    }

    /// Return unique top-level directories as collection names.
    pub async fn collections(&self) -> Vec<String> {
        let map = self.items.read().await;
        let mut seen = std::collections::HashSet::new();
        for item in map.values() {
            seen.insert(item.top_dir.clone());
        }
        // Sort for a deterministic, user-friendly ordering.
        let mut dirs: Vec<String> = seen.into_iter().collect();
        dirs.sort();
        dirs
    }
}

View File

@@ -0,0 +1,8 @@
pub mod config;
pub mod index;
pub mod provider;
pub mod scanner;
pub use config::LocalFilesConfig;
pub use index::LocalIndex;
pub use provider::{LocalFilesProvider, decode_stream_id};

View File

@@ -0,0 +1,165 @@
use std::sync::Arc;
use async_trait::async_trait;
use domain::{
Collection, ContentType, DomainError, DomainResult, IMediaProvider, MediaFilter, MediaItem,
MediaItemId, ProviderCapabilities, StreamingProtocol,
};
use super::config::LocalFilesConfig;
use super::index::{LocalIndex, decode_id};
use super::scanner::LocalFileItem;
/// Media provider that serves video files from a local directory tree,
/// backed by a shared [`LocalIndex`].
pub struct LocalFilesProvider {
    /// Shared file index; public so routes (e.g. rescan) can reach it too.
    pub index: Arc<LocalIndex>,
    // Public base URL of this API server, stored without a trailing slash
    // (normalised in `new`) so stream URLs concatenate cleanly.
    base_url: String,
}
/// Duration threshold (20 minutes) below which an item counts as a short.
const SHORT_DURATION_SECS: u32 = 1200;

impl LocalFilesProvider {
    /// Create a provider backed by `index`.
    ///
    /// Any trailing slashes on the configured base URL are stripped so that
    /// stream URLs can be built by simple concatenation.
    pub fn new(index: Arc<LocalIndex>, config: LocalFilesConfig) -> Self {
        let base_url = config.base_url.trim_end_matches('/').to_string();
        Self { index, base_url }
    }
}
fn to_media_item(id: MediaItemId, item: &LocalFileItem) -> MediaItem {
let content_type = if item.duration_secs < 1200 {
ContentType::Short
} else {
ContentType::Movie
};
MediaItem {
id,
title: item.title.clone(),
content_type,
duration_secs: item.duration_secs,
description: None,
genres: vec![],
year: item.year,
tags: item.tags.clone(),
series_name: None,
season_number: None,
episode_number: None,
}
}
#[async_trait]
impl IMediaProvider for LocalFilesProvider {
    /// Advertise supported features: local files have no series/genre
    /// metadata, stream as direct files, and support on-demand rescans.
    fn capabilities(&self) -> ProviderCapabilities {
        ProviderCapabilities {
            collections: true,
            series: false,
            genres: false,
            tags: true,
            decade: true,
            search: true,
            streaming_protocol: StreamingProtocol::DirectFile,
            rescan: true,
        }
    }

    /// Fetch all indexed items matching `filter`.
    ///
    /// All predicates are ANDed together; within `tags` a single
    /// case-insensitive match suffices (OR). Content type is derived
    /// heuristically from duration, matching `to_media_item`.
    async fn fetch_items(&self, filter: &MediaFilter) -> DomainResult<Vec<MediaItem>> {
        // Hoist the search-term lowercasing out of the per-item closure —
        // previously it was recomputed for every candidate item.
        let needle = filter.search_term.as_ref().map(|q| q.to_lowercase());
        let all = self.index.get_all().await;
        let results = all
            .into_iter()
            .filter_map(|(id, item)| {
                // content_type: derive heuristically, then filter.
                let content_type = if item.duration_secs < SHORT_DURATION_SECS {
                    ContentType::Short
                } else {
                    ContentType::Movie
                };
                if let Some(ref ct) = filter.content_type {
                    if &content_type != ct {
                        return None;
                    }
                }
                // collections: match against the top-level directory.
                if !filter.collections.is_empty() && !filter.collections.contains(&item.top_dir) {
                    return None;
                }
                // tags: OR — item must carry at least one requested tag.
                if !filter.tags.is_empty() {
                    let has_tag = filter
                        .tags
                        .iter()
                        .any(|tag| item.tags.iter().any(|t| t.eq_ignore_ascii_case(tag)));
                    if !has_tag {
                        return None;
                    }
                }
                // decade: year must fall in [decade, decade + 9].
                if let Some(decade) = filter.decade {
                    match item.year {
                        Some(y) if (decade..=decade + 9).contains(&y) => {}
                        _ => return None,
                    }
                }
                // duration bounds (inclusive).
                if let Some(min) = filter.min_duration_secs {
                    if item.duration_secs < min {
                        return None;
                    }
                }
                if let Some(max) = filter.max_duration_secs {
                    if item.duration_secs > max {
                        return None;
                    }
                }
                // search_term: case-insensitive substring match on the title.
                if let Some(ref q) = needle {
                    if !item.title.to_lowercase().contains(q) {
                        return None;
                    }
                }
                Some(to_media_item(id, &item))
            })
            .collect();
        Ok(results)
    }

    /// Look up a single item by its encoded id.
    async fn fetch_by_id(&self, item_id: &MediaItemId) -> DomainResult<Option<MediaItem>> {
        Ok(self
            .index
            .get(item_id)
            .await
            .map(|item| to_media_item(item_id.clone(), &item)))
    }

    /// Build the direct-file stream URL served by this API's files route.
    async fn get_stream_url(&self, item_id: &MediaItemId) -> DomainResult<String> {
        Ok(format!(
            "{}/api/v1/files/stream/{}",
            self.base_url,
            item_id.as_ref()
        ))
    }

    /// Expose top-level directories as collections (id doubles as the name).
    async fn list_collections(&self) -> DomainResult<Vec<Collection>> {
        let dirs = self.index.collections().await;
        Ok(dirs
            .into_iter()
            .map(|d| Collection {
                id: d.clone(),
                name: d,
                collection_type: None,
            })
            .collect())
    }
}
/// Decode an encoded ID taken from a URL path segment back to the relative
/// path string it was built from. Returns `None` for malformed ids.
pub fn decode_stream_id(encoded: &str) -> Option<String> {
    let id = MediaItemId::new(encoded);
    decode_id(&id)
}

View File

@@ -0,0 +1,164 @@
use std::path::Path;
use tokio::process::Command;
/// Lowercase file extensions recognised as video files by `scan_dir`.
const VIDEO_EXTENSIONS: &[&str] = &["mp4", "mkv", "avi", "mov", "webm", "m4v"];
/// In-memory representation of a scanned local video file.
#[derive(Debug, Clone)]
pub struct LocalFileItem {
    /// Relative path from root, with forward slashes (used as the stable ID source).
    pub rel_path: String,
    /// Display title derived from the file stem (separators replaced by spaces).
    pub title: String,
    /// Duration in whole seconds; 0 when ffprobe could not determine it.
    pub duration_secs: u32,
    /// Release year (1900–2099) parsed from the filename/path, if any.
    pub year: Option<u16>,
    /// Ancestor directory names between root and file (excluding root itself).
    pub tags: Vec<String>,
    /// First path component under root (used as collection id/name).
    pub top_dir: String,
}
/// Walk `root` and return all recognised video files with metadata.
///
/// ffprobe is called for each file to determine duration. Files that cannot be
/// probed are included with `duration_secs = 0` so they still appear in the index.
pub async fn scan_dir(root: &Path) -> Vec<LocalFileItem> {
let mut items = Vec::new();
let walker = walkdir::WalkDir::new(root).follow_links(true);
for entry in walker.into_iter().filter_map(|e| e.ok()) {
if !entry.file_type().is_file() {
continue;
}
let path = entry.path();
let ext = path
.extension()
.and_then(|e| e.to_str())
.map(|e| e.to_lowercase());
let ext = match ext {
Some(ref e) if VIDEO_EXTENSIONS.contains(&e.as_str()) => e.clone(),
_ => continue,
};
let _ = ext; // extension validated, not needed further
let rel = match path.strip_prefix(root) {
Ok(r) => r,
Err(_) => continue,
};
// Normalise to forward-slash string for cross-platform stability.
let rel_path: String = rel
.components()
.map(|c| c.as_os_str().to_string_lossy().into_owned())
.collect::<Vec<_>>()
.join("/");
// Top-level directory under root.
let top_dir = rel
.components()
.next()
.filter(|_| rel.components().count() > 1) // skip if file is at root level
.map(|c| c.as_os_str().to_string_lossy().into_owned())
.unwrap_or_else(|| "__root__".to_string());
// Title: stem with separator chars replaced by spaces.
let stem = path
.file_stem()
.and_then(|s| s.to_str())
.unwrap_or("")
.to_string();
let title = stem.replace(['_', '-', '.'], " ");
let title = title.trim().to_string();
// Year: first 4-digit number starting with 19xx or 20xx in filename or parent dirs.
let search_str = format!(
"{} {}",
stem,
rel.parent()
.and_then(|p| p.to_str())
.unwrap_or("")
);
let year = extract_year(&search_str);
// Tags: ancestor directory components between root and the file.
let tags: Vec<String> = rel
.parent()
.into_iter()
.flat_map(|p| p.components())
.map(|c| c.as_os_str().to_string_lossy().into_owned())
.filter(|s| !s.is_empty())
.collect();
let duration_secs = get_duration(path).await.unwrap_or(0);
items.push(LocalFileItem {
rel_path,
title,
duration_secs,
year,
tags,
top_dir,
});
}
items
}
/// Extract the first plausible 4-digit year (1900–2099) from `s`.
///
/// A candidate must be exactly four digits at a word boundary: the characters
/// immediately before and after must not themselves be digits.
fn extract_year(s: &str) -> Option<u16> {
    let chars: Vec<char> = s.chars().collect();
    let n = chars.len();
    // `saturating_sub` makes the range empty for strings shorter than 4 chars.
    for i in 0..n.saturating_sub(3) {
        // All four chars must be ASCII digits.
        if !chars[i..i + 4].iter().all(|c| c.is_ascii_digit()) {
            continue;
        }
        // Parse and range-check. (The old code also computed a "prefix" via
        // u8 arithmetic that overflowed and panicked in debug builds; it was
        // dead code and is removed.)
        let s4: String = chars[i..i + 4].iter().collect();
        let num: u16 = match s4.parse() {
            Ok(v) => v,
            // Keep scanning instead of aborting the whole search on a bad parse.
            Err(_) => continue,
        };
        if !(1900..=2099).contains(&num) {
            continue;
        }
        // Word-boundary: char before and after must not be digits.
        let before_ok = i == 0 || !chars[i - 1].is_ascii_digit();
        let after_ok = i + 4 >= n || !chars[i + 4].is_ascii_digit();
        if before_ok && after_ok {
            return Some(num);
        }
    }
    None
}
/// Run ffprobe to get the duration of `path` in whole seconds.
///
/// Returns `None` when ffprobe is missing, exits with an error, or emits
/// output that cannot be parsed.
async fn get_duration(path: &Path) -> Option<u32> {
    // Minimal mirror of ffprobe's `-show_format` JSON output.
    #[derive(serde::Deserialize)]
    struct Fmt {
        duration: Option<String>,
    }
    #[derive(serde::Deserialize)]
    struct Out {
        format: Fmt,
    }
    let output = Command::new("ffprobe")
        .args(["-v", "quiet", "-print_format", "json", "-show_format"])
        // Pass the path as an OsStr so non-UTF-8 paths are probed too
        // (previously `path.to_str()?` silently skipped them).
        .arg(path)
        .output()
        .await
        .ok()?;
    if !output.status.success() {
        return None;
    }
    let parsed: Out = serde_json::from_slice(&output.stdout).ok()?;
    // ffprobe reports duration as a fractional-seconds string; truncate to
    // whole seconds (f64-to-u32 `as` saturates, so negatives/NaN become 0).
    let dur: f64 = parsed.format.duration?.parse().ok()?;
    Some(dur as u32)
}