diff --git a/crates/adapters/image-converter/src/backfill.rs b/crates/adapters/image-converter/src/backfill.rs index ee73212..e18a7de 100644 --- a/crates/adapters/image-converter/src/backfill.rs +++ b/crates/adapters/image-converter/src/backfill.rs @@ -4,17 +4,17 @@ use async_trait::async_trait; use domain::{ errors::DomainError, events::DomainEvent, - ports::{EventPublisher, ImageRefPort, PeriodicJob}, + ports::{EventPublisher, ImageRefQuery, PeriodicJob}, }; pub struct ConversionBackfillJob { - image_ref: Arc, + image_ref: Arc, event_publisher: Arc, } impl ConversionBackfillJob { pub fn new( - image_ref: Arc, + image_ref: Arc, event_publisher: Arc, ) -> Self { Self { image_ref, event_publisher } @@ -56,8 +56,7 @@ mod tests { } #[async_trait::async_trait] - impl ImageRefPort for MockImageRef { - async fn swap(&self, _: &str, _: &str) -> Result<(), DomainError> { Ok(()) } + impl ImageRefQuery for MockImageRef { async fn list_keys(&self) -> Result, DomainError> { Ok(self.keys.clone()) } diff --git a/crates/adapters/image-converter/src/handler.rs b/crates/adapters/image-converter/src/handler.rs index ea679e0..ac33a33 100644 --- a/crates/adapters/image-converter/src/handler.rs +++ b/crates/adapters/image-converter/src/handler.rs @@ -4,21 +4,21 @@ use async_trait::async_trait; use domain::{ errors::DomainError, events::DomainEvent, - ports::{EventHandler, ImageRefPort, ImageStorage}, + ports::{EventHandler, ImageRefCommand, ImageStorage}, }; use crate::Format; pub struct ImageConversionHandler { storage: Arc, - image_ref: Arc, + image_ref: Arc, format: Format, } impl ImageConversionHandler { pub fn new( storage: Arc, - image_ref: Arc, + image_ref: Arc, format: Format, ) -> Self { Self { storage, image_ref, format } @@ -50,7 +50,7 @@ impl EventHandler for ImageConversionHandler { self.storage.store(&new_key, &converted).await?; if let Err(e) = self.image_ref.swap(&key, &new_key).await { - tracing::error!("ImageRefPort::swap failed for {key} → {new_key}: {e}"); + tracing::error!("swap failed for {key} → {new_key}: {e}"); return Err(e); } @@ -113,14 +113,11 @@ mod tests { } #[async_trait::async_trait] - impl ImageRefPort for MockImageRef { + impl ImageRefCommand for MockImageRef { async fn swap(&self, old: &str, new: &str) -> Result<(), DomainError> { self.swaps.lock().unwrap().push((old.into(), new.into())); Ok(()) } - async fn list_keys(&self) -> Result, DomainError> { - Ok(vec![]) - } } fn in_memory_storage() -> Arc { @@ -143,7 +140,7 @@ mod tests { let image_ref = MockImageRef::new(); let handler = ImageConversionHandler::new( Arc::clone(&storage) as Arc, - Arc::clone(&image_ref) as Arc, + Arc::clone(&image_ref) as Arc, Format::Avif, ); @@ -161,7 +158,7 @@ mod tests { let image_ref = MockImageRef::new(); let handler = ImageConversionHandler::new( Arc::clone(&storage) as Arc, - Arc::clone(&image_ref) as Arc, + Arc::clone(&image_ref) as Arc, Format::Avif, ); @@ -177,7 +174,7 @@ mod tests { let image_ref = MockImageRef::new(); let handler = ImageConversionHandler::new( Arc::clone(&storage) as Arc, - Arc::clone(&image_ref) as Arc, + Arc::clone(&image_ref) as Arc, Format::Webp, ); @@ -193,7 +190,7 @@ mod tests { let image_ref = MockImageRef::new(); let handler = ImageConversionHandler::new( Arc::clone(&storage) as Arc, - Arc::clone(&image_ref) as Arc, + Arc::clone(&image_ref) as Arc, Format::Avif, ); @@ -211,7 +208,7 @@ mod tests { let image_ref = MockImageRef::new(); let handler = ImageConversionHandler::new( Arc::clone(&storage) as Arc, - Arc::clone(&image_ref) as Arc, + Arc::clone(&image_ref) as Arc, Format::Webp, ); diff --git a/crates/adapters/image-converter/src/lib.rs b/crates/adapters/image-converter/src/lib.rs index 16370da..45b76df 100644 --- a/crates/adapters/image-converter/src/lib.rs +++ b/crates/adapters/image-converter/src/lib.rs @@ -7,11 +7,12 @@ pub use config::{ConversionConfig, Format}; pub use handler::ImageConversionHandler; use std::sync::Arc; -use domain::ports::{EventHandler, EventPublisher, ImageRefPort, ImageStorage, PeriodicJob}; +use domain::ports::{EventHandler, EventPublisher, ImageRefCommand, ImageRefQuery, ImageStorage, PeriodicJob}; pub fn build( image_storage: Arc, - image_ref: Arc, + image_ref_command: Arc, + image_ref_query: Arc, event_publisher: Arc, ) -> anyhow::Result, Arc)>> { let config = match ConversionConfig::from_env()? { @@ -23,12 +24,12 @@ pub fn build( let handler = Arc::new(ImageConversionHandler::new( Arc::clone(&image_storage), - Arc::clone(&image_ref), + image_ref_command, format, )) as Arc; let job = Arc::new(ConversionBackfillJob::new( - Arc::clone(&image_ref), + image_ref_query, Arc::clone(&event_publisher), )) as Arc; diff --git a/crates/adapters/postgres/src/image_ref.rs b/crates/adapters/postgres/src/image_ref.rs index 537c559..fe22af7 100644 --- a/crates/adapters/postgres/src/image_ref.rs +++ b/crates/adapters/postgres/src/image_ref.rs @@ -1,5 +1,5 @@ use async_trait::async_trait; -use domain::{errors::DomainError, ports::ImageRefPort}; +use domain::{errors::DomainError, ports::{ImageRefCommand, ImageRefQuery}}; use sqlx::PgPool; use std::sync::Arc; @@ -13,12 +13,13 @@ impl PostgresImageRefAdapter { } } -pub fn create_image_ref(pool: PgPool) -> Arc { - Arc::new(PostgresImageRefAdapter::new(pool)) +pub fn create_image_ref(pool: PgPool) -> (Arc, Arc) { + let adapter = Arc::new(PostgresImageRefAdapter::new(pool)); + (Arc::clone(&adapter) as Arc, adapter as Arc) } #[async_trait] -impl ImageRefPort for PostgresImageRefAdapter { +impl ImageRefCommand for PostgresImageRefAdapter { async fn swap(&self, old_key: &str, new_key: &str) -> Result<(), DomainError> { let mut tx = self.pool.begin().await .map_err(|e| DomainError::InfrastructureError(e.to_string()))?; @@ -33,7 +34,10 @@ impl ImageRefPort for PostgresImageRefAdapter { tx.commit().await .map_err(|e| DomainError::InfrastructureError(e.to_string())) } +} +#[async_trait] +impl ImageRefQuery for PostgresImageRefAdapter { async fn list_keys(&self) -> Result, DomainError> { let rows: Vec<(String,)> = sqlx::query_as( "SELECT avatar_path FROM users WHERE avatar_path IS NOT NULL diff --git a/crates/adapters/sqlite/src/image_ref.rs b/crates/adapters/sqlite/src/image_ref.rs index ce9dce1..b1322cb 100644 --- a/crates/adapters/sqlite/src/image_ref.rs +++ b/crates/adapters/sqlite/src/image_ref.rs @@ -1,5 +1,5 @@ use async_trait::async_trait; -use domain::{errors::DomainError, ports::ImageRefPort}; +use domain::{errors::DomainError, ports::{ImageRefCommand, ImageRefQuery}}; use sqlx::SqlitePool; use std::sync::Arc; @@ -13,12 +13,13 @@ impl SqliteImageRefAdapter { } } -pub fn create_image_ref(pool: SqlitePool) -> Arc { - Arc::new(SqliteImageRefAdapter::new(pool)) +pub fn create_image_ref(pool: SqlitePool) -> (Arc, Arc) { + let adapter = Arc::new(SqliteImageRefAdapter::new(pool)); + (Arc::clone(&adapter) as Arc, adapter as Arc) } #[async_trait] -impl ImageRefPort for SqliteImageRefAdapter { +impl ImageRefCommand for SqliteImageRefAdapter { async fn swap(&self, old_key: &str, new_key: &str) -> Result<(), DomainError> { let mut tx = self.pool.begin().await .map_err(|e| DomainError::InfrastructureError(e.to_string()))?; @@ -33,7 +34,10 @@ impl ImageRefPort for SqliteImageRefAdapter { tx.commit().await .map_err(|e| DomainError::InfrastructureError(e.to_string())) } +} +#[async_trait] +impl ImageRefQuery for SqliteImageRefAdapter { async fn list_keys(&self) -> Result, DomainError> { let rows: Vec<(String,)> = sqlx::query_as( "SELECT avatar_path FROM users WHERE avatar_path IS NOT NULL diff --git a/crates/domain/src/ports.rs b/crates/domain/src/ports.rs index 3ce9a38..cbb1f9e 100644 --- a/crates/domain/src/ports.rs +++ b/crates/domain/src/ports.rs @@ -266,7 +266,11 @@ pub trait ImportProfileRepository: Send + Sync { } #[async_trait] -pub trait ImageRefPort: Send + Sync { +pub trait ImageRefCommand: Send + Sync { async fn swap(&self, old_key: &str, new_key: &str) -> Result<(), DomainError>; +} + +#[async_trait] +pub trait ImageRefQuery: Send + Sync { async fn list_keys(&self) -> Result, DomainError>; } diff --git a/crates/worker/src/db.rs b/crates/worker/src/db.rs index 01be48a..5639a7b 100644 --- a/crates/worker/src/db.rs +++ b/crates/worker/src/db.rs @@ -2,8 +2,9 @@ use std::sync::Arc; use anyhow::Context; use domain::ports::{ - DiaryRepository, ImageRefPort, ImportProfileRepository, ImportSessionRepository, - MovieProfileRepository, MovieRepository, ReviewRepository, StatsRepository, UserRepository, + DiaryRepository, ImageRefCommand, ImageRefQuery, ImportProfileRepository, + ImportSessionRepository, MovieProfileRepository, MovieRepository, ReviewRepository, + StatsRepository, UserRepository, }; pub enum DbPool { @@ -22,7 +23,8 @@ pub struct Repos { pub import_session: Arc, pub import_profile: Arc, pub movie_profile: Arc, - pub image_ref: Arc, + pub image_ref_command: Arc, + pub image_ref_query: Arc, } pub async fn connect(database_url: &str, backend: &str) -> anyhow::Result<(Repos, DbPool)> { @@ -31,15 +33,19 @@ pub async fn connect(database_url: &str, backend: &str) -> anyhow::Result<(Repos "postgres" => { let (pool, m, r, d, s, u, is, ip, mp) = postgres::wire(database_url).await.context("PostgreSQL connection failed")?; - let image_ref = postgres::create_image_ref(pool.clone()); - Ok((Repos { movie: m, review: r, diary: d, stats: s, user: u, import_session: is, import_profile: ip, movie_profile: mp, image_ref }, DbPool::Postgres(pool))) + let (image_ref_command, image_ref_query) = postgres::create_image_ref(pool.clone()); + Ok((Repos { movie: m, review: r, diary: d, stats: s, user: u, + import_session: is, import_profile: ip, movie_profile: mp, + image_ref_command, image_ref_query }, DbPool::Postgres(pool))) } #[cfg(feature = "sqlite")] _ => { let (pool, m, r, d, s, u, is, ip, mp) = sqlite::wire(database_url).await.context("SQLite connection failed")?; - let image_ref = sqlite::create_image_ref(pool.clone()); - Ok((Repos { movie: m, review: r, diary: d, stats: s, user: u, import_session: is, import_profile: ip, movie_profile: mp, image_ref }, DbPool::Sqlite(pool))) + let (image_ref_command, image_ref_query) = sqlite::create_image_ref(pool.clone()); + Ok((Repos { movie: m, review: r, diary: d, stats: s, user: u, + import_session: is, import_profile: ip, movie_profile: mp, + image_ref_command, image_ref_query }, DbPool::Sqlite(pool))) } #[cfg(not(feature = "sqlite"))] _ => anyhow::bail!("DATABASE_BACKEND={backend} is not supported by this build"), diff --git a/crates/worker/src/main.rs b/crates/worker/src/main.rs index 5b0a32f..f1de06f 100644 --- a/crates/worker/src/main.rs +++ b/crates/worker/src/main.rs @@ -31,8 +31,8 @@ async fn main() -> anyhow::Result<()> { let (repos, db_pool) = db::connect(&database_url, &backend).await?; let (event_publisher_arc, consumer_arc) = event_bus::create(&db_pool).await?; - // Save image_ref before ctx consumes repos. - let image_ref = Arc::clone(&repos.image_ref); + let image_ref_command = Arc::clone(&repos.image_ref_command); + let image_ref_query = Arc::clone(&repos.image_ref_query); // Clone refs federation handler needs before ctx consumes them. #[cfg(feature = "federation")] @@ -91,7 +91,8 @@ async fn main() -> anyhow::Result<()> { let conversion = image_converter::build( Arc::clone(&ctx.image_storage), - image_ref, + image_ref_command, + image_ref_query, Arc::clone(&ctx.event_publisher), )?;