refactor: split ImageRefPort into ImageRefCommand and ImageRefQuery
This commit is contained in:
@@ -4,17 +4,17 @@ use async_trait::async_trait;
|
|||||||
use domain::{
|
use domain::{
|
||||||
errors::DomainError,
|
errors::DomainError,
|
||||||
events::DomainEvent,
|
events::DomainEvent,
|
||||||
ports::{EventPublisher, ImageRefPort, PeriodicJob},
|
ports::{EventPublisher, ImageRefQuery, PeriodicJob},
|
||||||
};
|
};
|
||||||
|
|
||||||
pub struct ConversionBackfillJob {
|
pub struct ConversionBackfillJob {
|
||||||
image_ref: Arc<dyn ImageRefPort>,
|
image_ref: Arc<dyn ImageRefQuery>,
|
||||||
event_publisher: Arc<dyn EventPublisher>,
|
event_publisher: Arc<dyn EventPublisher>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl ConversionBackfillJob {
|
impl ConversionBackfillJob {
|
||||||
pub fn new(
|
pub fn new(
|
||||||
image_ref: Arc<dyn ImageRefPort>,
|
image_ref: Arc<dyn ImageRefQuery>,
|
||||||
event_publisher: Arc<dyn EventPublisher>,
|
event_publisher: Arc<dyn EventPublisher>,
|
||||||
) -> Self {
|
) -> Self {
|
||||||
Self { image_ref, event_publisher }
|
Self { image_ref, event_publisher }
|
||||||
@@ -56,8 +56,7 @@ mod tests {
|
|||||||
}
|
}
|
||||||
|
|
||||||
#[async_trait::async_trait]
|
#[async_trait::async_trait]
|
||||||
impl ImageRefPort for MockImageRef {
|
impl ImageRefQuery for MockImageRef {
|
||||||
async fn swap(&self, _: &str, _: &str) -> Result<(), DomainError> { Ok(()) }
|
|
||||||
async fn list_keys(&self) -> Result<Vec<String>, DomainError> {
|
async fn list_keys(&self) -> Result<Vec<String>, DomainError> {
|
||||||
Ok(self.keys.clone())
|
Ok(self.keys.clone())
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -4,21 +4,21 @@ use async_trait::async_trait;
|
|||||||
use domain::{
|
use domain::{
|
||||||
errors::DomainError,
|
errors::DomainError,
|
||||||
events::DomainEvent,
|
events::DomainEvent,
|
||||||
ports::{EventHandler, ImageRefPort, ImageStorage},
|
ports::{EventHandler, ImageRefCommand, ImageStorage},
|
||||||
};
|
};
|
||||||
|
|
||||||
use crate::Format;
|
use crate::Format;
|
||||||
|
|
||||||
pub struct ImageConversionHandler {
|
pub struct ImageConversionHandler {
|
||||||
storage: Arc<dyn ImageStorage>,
|
storage: Arc<dyn ImageStorage>,
|
||||||
image_ref: Arc<dyn ImageRefPort>,
|
image_ref: Arc<dyn ImageRefCommand>,
|
||||||
format: Format,
|
format: Format,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl ImageConversionHandler {
|
impl ImageConversionHandler {
|
||||||
pub fn new(
|
pub fn new(
|
||||||
storage: Arc<dyn ImageStorage>,
|
storage: Arc<dyn ImageStorage>,
|
||||||
image_ref: Arc<dyn ImageRefPort>,
|
image_ref: Arc<dyn ImageRefCommand>,
|
||||||
format: Format,
|
format: Format,
|
||||||
) -> Self {
|
) -> Self {
|
||||||
Self { storage, image_ref, format }
|
Self { storage, image_ref, format }
|
||||||
@@ -50,7 +50,7 @@ impl EventHandler for ImageConversionHandler {
|
|||||||
self.storage.store(&new_key, &converted).await?;
|
self.storage.store(&new_key, &converted).await?;
|
||||||
|
|
||||||
if let Err(e) = self.image_ref.swap(&key, &new_key).await {
|
if let Err(e) = self.image_ref.swap(&key, &new_key).await {
|
||||||
tracing::error!("ImageRefPort::swap failed for {key} → {new_key}: {e}");
|
tracing::error!("swap failed for {key} → {new_key}: {e}");
|
||||||
return Err(e);
|
return Err(e);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -113,14 +113,11 @@ mod tests {
|
|||||||
}
|
}
|
||||||
|
|
||||||
#[async_trait::async_trait]
|
#[async_trait::async_trait]
|
||||||
impl ImageRefPort for MockImageRef {
|
impl ImageRefCommand for MockImageRef {
|
||||||
async fn swap(&self, old: &str, new: &str) -> Result<(), DomainError> {
|
async fn swap(&self, old: &str, new: &str) -> Result<(), DomainError> {
|
||||||
self.swaps.lock().unwrap().push((old.into(), new.into()));
|
self.swaps.lock().unwrap().push((old.into(), new.into()));
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
async fn list_keys(&self) -> Result<Vec<String>, DomainError> {
|
|
||||||
Ok(vec![])
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
fn in_memory_storage() -> Arc<ImageStorageAdapter> {
|
fn in_memory_storage() -> Arc<ImageStorageAdapter> {
|
||||||
@@ -143,7 +140,7 @@ mod tests {
|
|||||||
let image_ref = MockImageRef::new();
|
let image_ref = MockImageRef::new();
|
||||||
let handler = ImageConversionHandler::new(
|
let handler = ImageConversionHandler::new(
|
||||||
Arc::clone(&storage) as Arc<dyn ImageStorage>,
|
Arc::clone(&storage) as Arc<dyn ImageStorage>,
|
||||||
Arc::clone(&image_ref) as Arc<dyn ImageRefPort>,
|
Arc::clone(&image_ref) as Arc<dyn ImageRefCommand>,
|
||||||
Format::Avif,
|
Format::Avif,
|
||||||
);
|
);
|
||||||
|
|
||||||
@@ -161,7 +158,7 @@ mod tests {
|
|||||||
let image_ref = MockImageRef::new();
|
let image_ref = MockImageRef::new();
|
||||||
let handler = ImageConversionHandler::new(
|
let handler = ImageConversionHandler::new(
|
||||||
Arc::clone(&storage) as Arc<dyn ImageStorage>,
|
Arc::clone(&storage) as Arc<dyn ImageStorage>,
|
||||||
Arc::clone(&image_ref) as Arc<dyn ImageRefPort>,
|
Arc::clone(&image_ref) as Arc<dyn ImageRefCommand>,
|
||||||
Format::Avif,
|
Format::Avif,
|
||||||
);
|
);
|
||||||
|
|
||||||
@@ -177,7 +174,7 @@ mod tests {
|
|||||||
let image_ref = MockImageRef::new();
|
let image_ref = MockImageRef::new();
|
||||||
let handler = ImageConversionHandler::new(
|
let handler = ImageConversionHandler::new(
|
||||||
Arc::clone(&storage) as Arc<dyn ImageStorage>,
|
Arc::clone(&storage) as Arc<dyn ImageStorage>,
|
||||||
Arc::clone(&image_ref) as Arc<dyn ImageRefPort>,
|
Arc::clone(&image_ref) as Arc<dyn ImageRefCommand>,
|
||||||
Format::Webp,
|
Format::Webp,
|
||||||
);
|
);
|
||||||
|
|
||||||
@@ -193,7 +190,7 @@ mod tests {
|
|||||||
let image_ref = MockImageRef::new();
|
let image_ref = MockImageRef::new();
|
||||||
let handler = ImageConversionHandler::new(
|
let handler = ImageConversionHandler::new(
|
||||||
Arc::clone(&storage) as Arc<dyn ImageStorage>,
|
Arc::clone(&storage) as Arc<dyn ImageStorage>,
|
||||||
Arc::clone(&image_ref) as Arc<dyn ImageRefPort>,
|
Arc::clone(&image_ref) as Arc<dyn ImageRefCommand>,
|
||||||
Format::Avif,
|
Format::Avif,
|
||||||
);
|
);
|
||||||
|
|
||||||
@@ -211,7 +208,7 @@ mod tests {
|
|||||||
let image_ref = MockImageRef::new();
|
let image_ref = MockImageRef::new();
|
||||||
let handler = ImageConversionHandler::new(
|
let handler = ImageConversionHandler::new(
|
||||||
Arc::clone(&storage) as Arc<dyn ImageStorage>,
|
Arc::clone(&storage) as Arc<dyn ImageStorage>,
|
||||||
Arc::clone(&image_ref) as Arc<dyn ImageRefPort>,
|
Arc::clone(&image_ref) as Arc<dyn ImageRefCommand>,
|
||||||
Format::Webp,
|
Format::Webp,
|
||||||
);
|
);
|
||||||
|
|
||||||
|
|||||||
@@ -7,11 +7,12 @@ pub use config::{ConversionConfig, Format};
|
|||||||
pub use handler::ImageConversionHandler;
|
pub use handler::ImageConversionHandler;
|
||||||
|
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use domain::ports::{EventHandler, EventPublisher, ImageRefPort, ImageStorage, PeriodicJob};
|
use domain::ports::{EventHandler, EventPublisher, ImageRefCommand, ImageRefQuery, ImageStorage, PeriodicJob};
|
||||||
|
|
||||||
pub fn build(
|
pub fn build(
|
||||||
image_storage: Arc<dyn ImageStorage>,
|
image_storage: Arc<dyn ImageStorage>,
|
||||||
image_ref: Arc<dyn ImageRefPort>,
|
image_ref_command: Arc<dyn ImageRefCommand>,
|
||||||
|
image_ref_query: Arc<dyn ImageRefQuery>,
|
||||||
event_publisher: Arc<dyn EventPublisher>,
|
event_publisher: Arc<dyn EventPublisher>,
|
||||||
) -> anyhow::Result<Option<(Arc<dyn EventHandler>, Arc<dyn PeriodicJob>)>> {
|
) -> anyhow::Result<Option<(Arc<dyn EventHandler>, Arc<dyn PeriodicJob>)>> {
|
||||||
let config = match ConversionConfig::from_env()? {
|
let config = match ConversionConfig::from_env()? {
|
||||||
@@ -23,12 +24,12 @@ pub fn build(
|
|||||||
|
|
||||||
let handler = Arc::new(ImageConversionHandler::new(
|
let handler = Arc::new(ImageConversionHandler::new(
|
||||||
Arc::clone(&image_storage),
|
Arc::clone(&image_storage),
|
||||||
Arc::clone(&image_ref),
|
image_ref_command,
|
||||||
format,
|
format,
|
||||||
)) as Arc<dyn EventHandler>;
|
)) as Arc<dyn EventHandler>;
|
||||||
|
|
||||||
let job = Arc::new(ConversionBackfillJob::new(
|
let job = Arc::new(ConversionBackfillJob::new(
|
||||||
Arc::clone(&image_ref),
|
image_ref_query,
|
||||||
Arc::clone(&event_publisher),
|
Arc::clone(&event_publisher),
|
||||||
)) as Arc<dyn PeriodicJob>;
|
)) as Arc<dyn PeriodicJob>;
|
||||||
|
|
||||||
|
|||||||
@@ -1,5 +1,5 @@
|
|||||||
use async_trait::async_trait;
|
use async_trait::async_trait;
|
||||||
use domain::{errors::DomainError, ports::ImageRefPort};
|
use domain::{errors::DomainError, ports::{ImageRefCommand, ImageRefQuery}};
|
||||||
use sqlx::PgPool;
|
use sqlx::PgPool;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
@@ -13,12 +13,13 @@ impl PostgresImageRefAdapter {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn create_image_ref(pool: PgPool) -> Arc<dyn ImageRefPort> {
|
pub fn create_image_ref(pool: PgPool) -> (Arc<dyn ImageRefCommand>, Arc<dyn ImageRefQuery>) {
|
||||||
Arc::new(PostgresImageRefAdapter::new(pool))
|
let adapter = Arc::new(PostgresImageRefAdapter::new(pool));
|
||||||
|
(Arc::clone(&adapter) as Arc<dyn ImageRefCommand>, adapter as Arc<dyn ImageRefQuery>)
|
||||||
}
|
}
|
||||||
|
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
impl ImageRefPort for PostgresImageRefAdapter {
|
impl ImageRefCommand for PostgresImageRefAdapter {
|
||||||
async fn swap(&self, old_key: &str, new_key: &str) -> Result<(), DomainError> {
|
async fn swap(&self, old_key: &str, new_key: &str) -> Result<(), DomainError> {
|
||||||
let mut tx = self.pool.begin().await
|
let mut tx = self.pool.begin().await
|
||||||
.map_err(|e| DomainError::InfrastructureError(e.to_string()))?;
|
.map_err(|e| DomainError::InfrastructureError(e.to_string()))?;
|
||||||
@@ -33,7 +34,10 @@ impl ImageRefPort for PostgresImageRefAdapter {
|
|||||||
tx.commit().await
|
tx.commit().await
|
||||||
.map_err(|e| DomainError::InfrastructureError(e.to_string()))
|
.map_err(|e| DomainError::InfrastructureError(e.to_string()))
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[async_trait]
|
||||||
|
impl ImageRefQuery for PostgresImageRefAdapter {
|
||||||
async fn list_keys(&self) -> Result<Vec<String>, DomainError> {
|
async fn list_keys(&self) -> Result<Vec<String>, DomainError> {
|
||||||
let rows: Vec<(String,)> = sqlx::query_as(
|
let rows: Vec<(String,)> = sqlx::query_as(
|
||||||
"SELECT avatar_path FROM users WHERE avatar_path IS NOT NULL
|
"SELECT avatar_path FROM users WHERE avatar_path IS NOT NULL
|
||||||
|
|||||||
@@ -1,5 +1,5 @@
|
|||||||
use async_trait::async_trait;
|
use async_trait::async_trait;
|
||||||
use domain::{errors::DomainError, ports::ImageRefPort};
|
use domain::{errors::DomainError, ports::{ImageRefCommand, ImageRefQuery}};
|
||||||
use sqlx::SqlitePool;
|
use sqlx::SqlitePool;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
@@ -13,12 +13,13 @@ impl SqliteImageRefAdapter {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn create_image_ref(pool: SqlitePool) -> Arc<dyn ImageRefPort> {
|
pub fn create_image_ref(pool: SqlitePool) -> (Arc<dyn ImageRefCommand>, Arc<dyn ImageRefQuery>) {
|
||||||
Arc::new(SqliteImageRefAdapter::new(pool))
|
let adapter = Arc::new(SqliteImageRefAdapter::new(pool));
|
||||||
|
(Arc::clone(&adapter) as Arc<dyn ImageRefCommand>, adapter as Arc<dyn ImageRefQuery>)
|
||||||
}
|
}
|
||||||
|
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
impl ImageRefPort for SqliteImageRefAdapter {
|
impl ImageRefCommand for SqliteImageRefAdapter {
|
||||||
async fn swap(&self, old_key: &str, new_key: &str) -> Result<(), DomainError> {
|
async fn swap(&self, old_key: &str, new_key: &str) -> Result<(), DomainError> {
|
||||||
let mut tx = self.pool.begin().await
|
let mut tx = self.pool.begin().await
|
||||||
.map_err(|e| DomainError::InfrastructureError(e.to_string()))?;
|
.map_err(|e| DomainError::InfrastructureError(e.to_string()))?;
|
||||||
@@ -33,7 +34,10 @@ impl ImageRefPort for SqliteImageRefAdapter {
|
|||||||
tx.commit().await
|
tx.commit().await
|
||||||
.map_err(|e| DomainError::InfrastructureError(e.to_string()))
|
.map_err(|e| DomainError::InfrastructureError(e.to_string()))
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[async_trait]
|
||||||
|
impl ImageRefQuery for SqliteImageRefAdapter {
|
||||||
async fn list_keys(&self) -> Result<Vec<String>, DomainError> {
|
async fn list_keys(&self) -> Result<Vec<String>, DomainError> {
|
||||||
let rows: Vec<(String,)> = sqlx::query_as(
|
let rows: Vec<(String,)> = sqlx::query_as(
|
||||||
"SELECT avatar_path FROM users WHERE avatar_path IS NOT NULL
|
"SELECT avatar_path FROM users WHERE avatar_path IS NOT NULL
|
||||||
|
|||||||
@@ -266,7 +266,11 @@ pub trait ImportProfileRepository: Send + Sync {
|
|||||||
}
|
}
|
||||||
|
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
pub trait ImageRefPort: Send + Sync {
|
pub trait ImageRefCommand: Send + Sync {
|
||||||
async fn swap(&self, old_key: &str, new_key: &str) -> Result<(), DomainError>;
|
async fn swap(&self, old_key: &str, new_key: &str) -> Result<(), DomainError>;
|
||||||
|
}
|
||||||
|
|
||||||
|
#[async_trait]
|
||||||
|
pub trait ImageRefQuery: Send + Sync {
|
||||||
async fn list_keys(&self) -> Result<Vec<String>, DomainError>;
|
async fn list_keys(&self) -> Result<Vec<String>, DomainError>;
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -2,8 +2,9 @@ use std::sync::Arc;
|
|||||||
|
|
||||||
use anyhow::Context;
|
use anyhow::Context;
|
||||||
use domain::ports::{
|
use domain::ports::{
|
||||||
DiaryRepository, ImageRefPort, ImportProfileRepository, ImportSessionRepository,
|
DiaryRepository, ImageRefCommand, ImageRefQuery, ImportProfileRepository,
|
||||||
MovieProfileRepository, MovieRepository, ReviewRepository, StatsRepository, UserRepository,
|
ImportSessionRepository, MovieProfileRepository, MovieRepository, ReviewRepository,
|
||||||
|
StatsRepository, UserRepository,
|
||||||
};
|
};
|
||||||
|
|
||||||
pub enum DbPool {
|
pub enum DbPool {
|
||||||
@@ -22,7 +23,8 @@ pub struct Repos {
|
|||||||
pub import_session: Arc<dyn ImportSessionRepository>,
|
pub import_session: Arc<dyn ImportSessionRepository>,
|
||||||
pub import_profile: Arc<dyn ImportProfileRepository>,
|
pub import_profile: Arc<dyn ImportProfileRepository>,
|
||||||
pub movie_profile: Arc<dyn MovieProfileRepository>,
|
pub movie_profile: Arc<dyn MovieProfileRepository>,
|
||||||
pub image_ref: Arc<dyn ImageRefPort>,
|
pub image_ref_command: Arc<dyn ImageRefCommand>,
|
||||||
|
pub image_ref_query: Arc<dyn ImageRefQuery>,
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn connect(database_url: &str, backend: &str) -> anyhow::Result<(Repos, DbPool)> {
|
pub async fn connect(database_url: &str, backend: &str) -> anyhow::Result<(Repos, DbPool)> {
|
||||||
@@ -31,15 +33,19 @@ pub async fn connect(database_url: &str, backend: &str) -> anyhow::Result<(Repos
|
|||||||
"postgres" => {
|
"postgres" => {
|
||||||
let (pool, m, r, d, s, u, is, ip, mp) =
|
let (pool, m, r, d, s, u, is, ip, mp) =
|
||||||
postgres::wire(database_url).await.context("PostgreSQL connection failed")?;
|
postgres::wire(database_url).await.context("PostgreSQL connection failed")?;
|
||||||
let image_ref = postgres::create_image_ref(pool.clone());
|
let (image_ref_command, image_ref_query) = postgres::create_image_ref(pool.clone());
|
||||||
Ok((Repos { movie: m, review: r, diary: d, stats: s, user: u, import_session: is, import_profile: ip, movie_profile: mp, image_ref }, DbPool::Postgres(pool)))
|
Ok((Repos { movie: m, review: r, diary: d, stats: s, user: u,
|
||||||
|
import_session: is, import_profile: ip, movie_profile: mp,
|
||||||
|
image_ref_command, image_ref_query }, DbPool::Postgres(pool)))
|
||||||
}
|
}
|
||||||
#[cfg(feature = "sqlite")]
|
#[cfg(feature = "sqlite")]
|
||||||
_ => {
|
_ => {
|
||||||
let (pool, m, r, d, s, u, is, ip, mp) =
|
let (pool, m, r, d, s, u, is, ip, mp) =
|
||||||
sqlite::wire(database_url).await.context("SQLite connection failed")?;
|
sqlite::wire(database_url).await.context("SQLite connection failed")?;
|
||||||
let image_ref = sqlite::create_image_ref(pool.clone());
|
let (image_ref_command, image_ref_query) = sqlite::create_image_ref(pool.clone());
|
||||||
Ok((Repos { movie: m, review: r, diary: d, stats: s, user: u, import_session: is, import_profile: ip, movie_profile: mp, image_ref }, DbPool::Sqlite(pool)))
|
Ok((Repos { movie: m, review: r, diary: d, stats: s, user: u,
|
||||||
|
import_session: is, import_profile: ip, movie_profile: mp,
|
||||||
|
image_ref_command, image_ref_query }, DbPool::Sqlite(pool)))
|
||||||
}
|
}
|
||||||
#[cfg(not(feature = "sqlite"))]
|
#[cfg(not(feature = "sqlite"))]
|
||||||
_ => anyhow::bail!("DATABASE_BACKEND={backend} is not supported by this build"),
|
_ => anyhow::bail!("DATABASE_BACKEND={backend} is not supported by this build"),
|
||||||
|
|||||||
@@ -31,8 +31,8 @@ async fn main() -> anyhow::Result<()> {
|
|||||||
let (repos, db_pool) = db::connect(&database_url, &backend).await?;
|
let (repos, db_pool) = db::connect(&database_url, &backend).await?;
|
||||||
let (event_publisher_arc, consumer_arc) = event_bus::create(&db_pool).await?;
|
let (event_publisher_arc, consumer_arc) = event_bus::create(&db_pool).await?;
|
||||||
|
|
||||||
// Save image_ref before ctx consumes repos.
|
let image_ref_command = Arc::clone(&repos.image_ref_command);
|
||||||
let image_ref = Arc::clone(&repos.image_ref);
|
let image_ref_query = Arc::clone(&repos.image_ref_query);
|
||||||
|
|
||||||
// Clone refs federation handler needs before ctx consumes them.
|
// Clone refs federation handler needs before ctx consumes them.
|
||||||
#[cfg(feature = "federation")]
|
#[cfg(feature = "federation")]
|
||||||
@@ -91,7 +91,8 @@ async fn main() -> anyhow::Result<()> {
|
|||||||
|
|
||||||
let conversion = image_converter::build(
|
let conversion = image_converter::build(
|
||||||
Arc::clone(&ctx.image_storage),
|
Arc::clone(&ctx.image_storage),
|
||||||
image_ref,
|
image_ref_command,
|
||||||
|
image_ref_query,
|
||||||
Arc::clone(&ctx.event_publisher),
|
Arc::clone(&ctx.event_publisher),
|
||||||
)?;
|
)?;
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user