feat: expand workspace to include libertas_infra and libertas_worker
feat(libertas_api): add dependency on libertas_infra and async-nats refactor(libertas_api): consolidate config loading and add broker_url refactor(libertas_api): integrate NATS client into app state and services feat(libertas_core): introduce config module for database and server settings fix(libertas_core): enhance error handling with detailed messages feat(libertas_infra): create infrastructure layer with database repositories feat(libertas_infra): implement Postgres repositories for media and albums feat(libertas_worker): add worker service to process media jobs via NATS
This commit is contained in:
80
libertas_worker/src/main.rs
Normal file
80
libertas_worker/src/main.rs
Normal file
@@ -0,0 +1,80 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use futures_util::StreamExt;
|
||||
use libertas_core::plugins::PluginContext;
|
||||
use libertas_infra::factory::{
|
||||
build_album_repository, build_database_pool, build_media_repository, build_user_repository,
|
||||
};
|
||||
use serde::Deserialize;
|
||||
use uuid::Uuid;
|
||||
|
||||
use crate::config::load_config;
|
||||
|
||||
pub mod config;
|
||||
|
||||
#[derive(Deserialize)]
|
||||
struct MediaJob {
|
||||
media_id: Uuid,
|
||||
}
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> anyhow::Result<()> {
|
||||
println!("Starting libertas worker...");
|
||||
|
||||
let config = load_config()?;
|
||||
|
||||
let db_pool = build_database_pool(&config.database).await?;
|
||||
println!("Worker connected to database.");
|
||||
|
||||
let media_repo = build_media_repository(&config.database, db_pool.clone()).await?;
|
||||
let album_repo = build_album_repository(&config.database, db_pool.clone()).await?;
|
||||
let user_repo = build_user_repository(&config.database, db_pool.clone()).await?;
|
||||
|
||||
// 3. Create the abstracted PluginContext
|
||||
let context = Arc::new(PluginContext {
|
||||
media_repo,
|
||||
album_repo,
|
||||
user_repo,
|
||||
});
|
||||
println!("Plugin context created.");
|
||||
|
||||
let nats_client = async_nats::connect(&config.broker_url).await?;
|
||||
|
||||
println!("Connected to NATS server at {}", config.broker_url);
|
||||
|
||||
let mut subscriber = nats_client
|
||||
.queue_subscribe("media.new", "media_processors".to_string())
|
||||
.await?;
|
||||
println!("Subscribed to 'media.new' queue");
|
||||
|
||||
while let Some(msg) = subscriber.next().await {
|
||||
let context = context.clone();
|
||||
tokio::spawn(async move {
|
||||
if let Err(e) = process_job(msg, context).await {
|
||||
eprintln!("Job failed: {}", e);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn process_job(msg: async_nats::Message, context: Arc<PluginContext>) -> anyhow::Result<()> {
|
||||
let job: MediaJob = serde_json::from_slice(&msg.payload)?;
|
||||
|
||||
let media = context
|
||||
.media_repo
|
||||
.find_by_id(job.media_id)
|
||||
.await?
|
||||
.ok_or_else(|| anyhow::anyhow!("Media not found: {}", job.media_id))?;
|
||||
|
||||
println!("Processing media: {}", media.original_filename);
|
||||
|
||||
// 3. Pass to the (future) PluginManager
|
||||
// plugin_manager.process(&media, &context).await?;
|
||||
|
||||
// For now, we'll just print a success message
|
||||
println!("Successfully processed job for media_id: {}", media.id);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
Reference in New Issue
Block a user