feat: async image conversion service (avif/webp) with backfill
This commit is contained in:
@@ -43,6 +43,9 @@ pub enum EventPayload {
|
||||
movie_id: String,
|
||||
external_metadata_id: String,
|
||||
},
|
||||
ImageStored {
|
||||
key: String,
|
||||
},
|
||||
}
|
||||
|
||||
impl EventPayload {
|
||||
@@ -55,6 +58,7 @@ impl EventPayload {
|
||||
EventPayload::UserUpdated { .. } => "UserUpdated",
|
||||
EventPayload::ReviewDeleted { .. } => "ReviewDeleted",
|
||||
EventPayload::MovieEnrichmentRequested { .. } => "MovieEnrichmentRequested",
|
||||
EventPayload::ImageStored { .. } => "ImageStored",
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -114,6 +118,7 @@ impl From<&DomainEvent> for EventPayload {
|
||||
external_metadata_id: external_metadata_id.clone(),
|
||||
}
|
||||
}
|
||||
DomainEvent::ImageStored { key } => EventPayload::ImageStored { key: key.clone() },
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -171,6 +176,9 @@ impl TryFrom<EventPayload> for DomainEvent {
|
||||
external_metadata_id,
|
||||
})
|
||||
}
|
||||
EventPayload::ImageStored { key } => {
|
||||
Ok(DomainEvent::ImageStored { key })
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -247,4 +255,20 @@ mod tests {
|
||||
assert_eq!(EventPayload::from(&review_updated()).event_type(), "ReviewUpdated");
|
||||
assert_eq!(EventPayload::from(&movie_discovered()).event_type(), "MovieDiscovered");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn round_trip_image_stored() {
|
||||
let event = DomainEvent::ImageStored { key: "avatars/abc123".into() };
|
||||
let payload = EventPayload::from(&event);
|
||||
let json = serde_json::to_string(&payload).unwrap();
|
||||
let back: EventPayload = serde_json::from_str(&json).unwrap();
|
||||
let recovered = DomainEvent::try_from(back).unwrap();
|
||||
assert_eq!(EventPayload::from(&event), EventPayload::from(&recovered));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn image_stored_event_type() {
|
||||
let payload = EventPayload::from(&DomainEvent::ImageStored { key: "posters/x".into() });
|
||||
assert_eq!(payload.event_type(), "ImageStored");
|
||||
}
|
||||
}
|
||||
|
||||
19
crates/adapters/image-converter/Cargo.toml
Normal file
19
crates/adapters/image-converter/Cargo.toml
Normal file
@@ -0,0 +1,19 @@
|
||||
[package]
|
||||
name = "image-converter"
|
||||
version = "0.1.0"
|
||||
edition = "2021"
|
||||
|
||||
[dependencies]
|
||||
domain = { workspace = true }
|
||||
async-trait = { workspace = true }
|
||||
anyhow = { workspace = true }
|
||||
tracing = { workspace = true }
|
||||
tokio = { workspace = true }
|
||||
image = { version = "0.25", default-features = false, features = ["jpeg", "png", "webp"] }
|
||||
ravif = { version = "0.11", default-features = false }
|
||||
webp = "0.3"
|
||||
|
||||
[dev-dependencies]
|
||||
image-storage = { workspace = true }
|
||||
object_store = "0.11"
|
||||
uuid = { workspace = true }
|
||||
141
crates/adapters/image-converter/src/backfill.rs
Normal file
141
crates/adapters/image-converter/src/backfill.rs
Normal file
@@ -0,0 +1,141 @@
|
||||
use std::{sync::Arc, time::Duration};
|
||||
|
||||
use async_trait::async_trait;
|
||||
use domain::{
|
||||
errors::DomainError,
|
||||
events::DomainEvent,
|
||||
ports::{EventPublisher, ImageRefPort, PeriodicJob},
|
||||
};
|
||||
|
||||
pub struct ConversionBackfillJob {
|
||||
image_ref: Arc<dyn ImageRefPort>,
|
||||
event_publisher: Arc<dyn EventPublisher>,
|
||||
}
|
||||
|
||||
impl ConversionBackfillJob {
|
||||
pub fn new(
|
||||
image_ref: Arc<dyn ImageRefPort>,
|
||||
event_publisher: Arc<dyn EventPublisher>,
|
||||
) -> Self {
|
||||
Self { image_ref, event_publisher }
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl PeriodicJob for ConversionBackfillJob {
|
||||
fn interval(&self) -> Duration {
|
||||
Duration::from_secs(60 * 60 * 24) // 24h
|
||||
}
|
||||
|
||||
async fn run(&self) -> Result<(), DomainError> {
|
||||
let keys = self.image_ref.list_keys().await?;
|
||||
|
||||
for key in keys {
|
||||
if key.ends_with(".avif") || key.ends_with(".webp") {
|
||||
continue;
|
||||
}
|
||||
if let Err(e) = self.event_publisher
|
||||
.publish(&DomainEvent::ImageStored { key: key.clone() })
|
||||
.await
|
||||
{
|
||||
tracing::warn!("backfill: failed to emit ImageStored for {key}: {e}");
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use std::sync::Mutex;
|
||||
|
||||
struct MockImageRef {
|
||||
keys: Vec<String>,
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl ImageRefPort for MockImageRef {
|
||||
async fn swap(&self, _: &str, _: &str) -> Result<(), DomainError> { Ok(()) }
|
||||
async fn list_keys(&self) -> Result<Vec<String>, DomainError> {
|
||||
Ok(self.keys.clone())
|
||||
}
|
||||
}
|
||||
|
||||
struct MockPublisher {
|
||||
emitted: Mutex<Vec<String>>,
|
||||
}
|
||||
|
||||
impl MockPublisher {
|
||||
fn new() -> Arc<Self> {
|
||||
Arc::new(Self { emitted: Mutex::new(vec![]) })
|
||||
}
|
||||
|
||||
fn emitted(&self) -> Vec<String> {
|
||||
self.emitted.lock().unwrap().clone()
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl EventPublisher for MockPublisher {
|
||||
async fn publish(&self, event: &DomainEvent) -> Result<(), DomainError> {
|
||||
if let DomainEvent::ImageStored { key } = event {
|
||||
self.emitted.lock().unwrap().push(key.clone());
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn emits_image_stored_for_unconverted_keys() {
|
||||
let image_ref = Arc::new(MockImageRef {
|
||||
keys: vec!["avatars/u1".into(), "posters/m1".into()],
|
||||
});
|
||||
let publisher = MockPublisher::new();
|
||||
let job = ConversionBackfillJob::new(
|
||||
image_ref,
|
||||
Arc::clone(&publisher) as Arc<dyn EventPublisher>,
|
||||
);
|
||||
|
||||
job.run().await.unwrap();
|
||||
|
||||
let mut emitted = publisher.emitted();
|
||||
emitted.sort();
|
||||
assert_eq!(emitted, vec!["avatars/u1", "posters/m1"]);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn skips_already_converted_keys() {
|
||||
let image_ref = Arc::new(MockImageRef {
|
||||
keys: vec![
|
||||
"avatars/u1.avif".into(),
|
||||
"posters/m1".into(),
|
||||
"avatars/u2.webp".into(),
|
||||
],
|
||||
});
|
||||
let publisher = MockPublisher::new();
|
||||
let job = ConversionBackfillJob::new(
|
||||
image_ref,
|
||||
Arc::clone(&publisher) as Arc<dyn EventPublisher>,
|
||||
);
|
||||
|
||||
job.run().await.unwrap();
|
||||
|
||||
assert_eq!(publisher.emitted(), vec!["posters/m1"]);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn empty_keys_emits_nothing() {
|
||||
let image_ref = Arc::new(MockImageRef { keys: vec![] });
|
||||
let publisher = MockPublisher::new();
|
||||
let job = ConversionBackfillJob::new(
|
||||
image_ref,
|
||||
Arc::clone(&publisher) as Arc<dyn EventPublisher>,
|
||||
);
|
||||
|
||||
job.run().await.unwrap();
|
||||
|
||||
assert!(publisher.emitted().is_empty());
|
||||
}
|
||||
}
|
||||
90
crates/adapters/image-converter/src/config.rs
Normal file
90
crates/adapters/image-converter/src/config.rs
Normal file
@@ -0,0 +1,90 @@
|
||||
#[derive(Clone, Copy, Debug, PartialEq)]
|
||||
pub enum Format {
|
||||
Avif,
|
||||
Webp,
|
||||
}
|
||||
|
||||
impl Format {
|
||||
pub fn extension(self) -> &'static str {
|
||||
match self {
|
||||
Format::Avif => ".avif",
|
||||
Format::Webp => ".webp",
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub struct ConversionConfig {
|
||||
pub format: Format,
|
||||
}
|
||||
|
||||
impl ConversionConfig {
|
||||
pub fn from_env() -> anyhow::Result<Option<Self>> {
|
||||
Self::from_vars(
|
||||
std::env::var("IMAGE_CONVERSION_ENABLED").ok().as_deref(),
|
||||
std::env::var("IMAGE_CONVERSION_FORMAT").ok().as_deref(),
|
||||
)
|
||||
}
|
||||
|
||||
fn from_vars(enabled: Option<&str>, format: Option<&str>) -> anyhow::Result<Option<Self>> {
|
||||
if enabled != Some("true") {
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
let format_str = format.ok_or_else(|| {
|
||||
anyhow::anyhow!("IMAGE_CONVERSION_FORMAT required when IMAGE_CONVERSION_ENABLED=true")
|
||||
})?;
|
||||
|
||||
let format = match format_str {
|
||||
"avif" => Format::Avif,
|
||||
"webp" => Format::Webp,
|
||||
other => anyhow::bail!(
|
||||
"Unknown IMAGE_CONVERSION_FORMAT: {other:?}. Valid values: avif, webp"
|
||||
),
|
||||
};
|
||||
|
||||
Ok(Some(Self { format }))
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn disabled_by_default() {
|
||||
assert!(ConversionConfig::from_vars(None, None).unwrap().is_none());
|
||||
assert!(ConversionConfig::from_vars(Some("false"), None).unwrap().is_none());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn enabled_avif() {
|
||||
let cfg = ConversionConfig::from_vars(Some("true"), Some("avif")).unwrap().unwrap();
|
||||
assert_eq!(cfg.format, Format::Avif);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn enabled_webp() {
|
||||
let cfg = ConversionConfig::from_vars(Some("true"), Some("webp")).unwrap().unwrap();
|
||||
assert_eq!(cfg.format, Format::Webp);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn unknown_format_is_error() {
|
||||
assert!(ConversionConfig::from_vars(Some("true"), Some("gif")).is_err());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn missing_format_when_enabled_is_error() {
|
||||
assert!(ConversionConfig::from_vars(Some("true"), None).is_err());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn avif_extension() {
|
||||
assert_eq!(Format::Avif.extension(), ".avif");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn webp_extension() {
|
||||
assert_eq!(Format::Webp.extension(), ".webp");
|
||||
}
|
||||
}
|
||||
224
crates/adapters/image-converter/src/handler.rs
Normal file
224
crates/adapters/image-converter/src/handler.rs
Normal file
@@ -0,0 +1,224 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use async_trait::async_trait;
|
||||
use domain::{
|
||||
errors::DomainError,
|
||||
events::DomainEvent,
|
||||
ports::{EventHandler, ImageRefPort, ImageStorage},
|
||||
};
|
||||
|
||||
use crate::Format;
|
||||
|
||||
pub struct ImageConversionHandler {
|
||||
storage: Arc<dyn ImageStorage>,
|
||||
image_ref: Arc<dyn ImageRefPort>,
|
||||
format: Format,
|
||||
}
|
||||
|
||||
impl ImageConversionHandler {
|
||||
pub fn new(
|
||||
storage: Arc<dyn ImageStorage>,
|
||||
image_ref: Arc<dyn ImageRefPort>,
|
||||
format: Format,
|
||||
) -> Self {
|
||||
Self { storage, image_ref, format }
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl EventHandler for ImageConversionHandler {
|
||||
async fn handle(&self, event: &DomainEvent) -> Result<(), DomainError> {
|
||||
let key = match event {
|
||||
DomainEvent::ImageStored { key } => key.clone(),
|
||||
_ => return Ok(()),
|
||||
};
|
||||
|
||||
if key.ends_with(".avif") || key.ends_with(".webp") {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let bytes = self.storage.get(&key).await?;
|
||||
let format = self.format;
|
||||
|
||||
let converted = tokio::task::spawn_blocking(move || convert(bytes, format))
|
||||
.await
|
||||
.map_err(|e| DomainError::InfrastructureError(e.to_string()))?
|
||||
.map_err(|e| DomainError::InfrastructureError(e))?;
|
||||
|
||||
let ext = format.extension();
|
||||
let new_key = format!("{key}{ext}");
|
||||
self.storage.store(&new_key, &converted).await?;
|
||||
|
||||
if let Err(e) = self.image_ref.swap(&key, &new_key).await {
|
||||
tracing::error!("ImageRefPort::swap failed for {key} → {new_key}: {e}");
|
||||
return Err(e);
|
||||
}
|
||||
|
||||
if let Err(e) = self.storage.delete(&key).await {
|
||||
tracing::warn!("failed to delete old image key {key}: {e}");
|
||||
}
|
||||
|
||||
tracing::info!("converted {key} → {new_key}");
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
fn convert(bytes: Vec<u8>, format: Format) -> Result<Vec<u8>, String> {
|
||||
let img = image::load_from_memory(&bytes).map_err(|e| e.to_string())?;
|
||||
|
||||
match format {
|
||||
Format::Avif => {
|
||||
let rgba = img.to_rgba8();
|
||||
let width = rgba.width() as usize;
|
||||
let height = rgba.height() as usize;
|
||||
let pixels: Vec<ravif::RGBA8> = rgba
|
||||
.pixels()
|
||||
.map(|p| ravif::RGBA8 { r: p.0[0], g: p.0[1], b: p.0[2], a: p.0[3] })
|
||||
.collect();
|
||||
let result = ravif::Encoder::new()
|
||||
.with_quality(80.0)
|
||||
.with_speed(6)
|
||||
.encode_rgba(ravif::Img::new(&pixels, width, height))
|
||||
.map_err(|e| e.to_string())?;
|
||||
Ok(result.avif_file.to_vec())
|
||||
}
|
||||
Format::Webp => {
|
||||
let rgba = img.to_rgba8();
|
||||
let (width, height) = (rgba.width(), rgba.height());
|
||||
let encoder = webp::Encoder::from_rgba(rgba.as_raw(), width, height);
|
||||
Ok(encoder.encode(80.0).to_vec())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use std::sync::Mutex;
|
||||
use object_store::memory::InMemory;
|
||||
use image_storage::ImageStorageAdapter;
|
||||
|
||||
struct MockImageRef {
|
||||
swaps: Mutex<Vec<(String, String)>>,
|
||||
}
|
||||
|
||||
impl MockImageRef {
|
||||
fn new() -> Arc<Self> {
|
||||
Arc::new(Self { swaps: Mutex::new(vec![]) })
|
||||
}
|
||||
|
||||
fn swaps(&self) -> Vec<(String, String)> {
|
||||
self.swaps.lock().unwrap().clone()
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl ImageRefPort for MockImageRef {
|
||||
async fn swap(&self, old: &str, new: &str) -> Result<(), DomainError> {
|
||||
self.swaps.lock().unwrap().push((old.into(), new.into()));
|
||||
Ok(())
|
||||
}
|
||||
async fn list_keys(&self) -> Result<Vec<String>, DomainError> {
|
||||
Ok(vec![])
|
||||
}
|
||||
}
|
||||
|
||||
fn in_memory_storage() -> Arc<ImageStorageAdapter> {
|
||||
Arc::new(ImageStorageAdapter::new(Arc::new(InMemory::new())))
|
||||
}
|
||||
|
||||
fn tiny_jpeg() -> Vec<u8> {
|
||||
use image::{DynamicImage, ImageBuffer, Rgb};
|
||||
let img = DynamicImage::ImageRgb8(
|
||||
ImageBuffer::from_pixel(4, 4, Rgb([200u8, 100, 50])),
|
||||
);
|
||||
let mut buf = std::io::Cursor::new(Vec::new());
|
||||
img.write_to(&mut buf, image::ImageFormat::Jpeg).unwrap();
|
||||
buf.into_inner()
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn ignores_non_image_stored_events() {
|
||||
let storage = in_memory_storage();
|
||||
let image_ref = MockImageRef::new();
|
||||
let handler = ImageConversionHandler::new(
|
||||
Arc::clone(&storage) as Arc<dyn ImageStorage>,
|
||||
Arc::clone(&image_ref) as Arc<dyn ImageRefPort>,
|
||||
Format::Avif,
|
||||
);
|
||||
|
||||
handler.handle(&DomainEvent::UserUpdated {
|
||||
user_id: domain::value_objects::UserId::from_uuid(uuid::Uuid::new_v4()),
|
||||
}).await.unwrap();
|
||||
|
||||
assert!(image_ref.swaps().is_empty());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn skips_already_converted_avif_key() {
|
||||
let storage = in_memory_storage();
|
||||
storage.store("avatars/u1.avif", &tiny_jpeg()).await.unwrap();
|
||||
let image_ref = MockImageRef::new();
|
||||
let handler = ImageConversionHandler::new(
|
||||
Arc::clone(&storage) as Arc<dyn ImageStorage>,
|
||||
Arc::clone(&image_ref) as Arc<dyn ImageRefPort>,
|
||||
Format::Avif,
|
||||
);
|
||||
|
||||
handler.handle(&DomainEvent::ImageStored { key: "avatars/u1.avif".into() }).await.unwrap();
|
||||
|
||||
assert!(image_ref.swaps().is_empty());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn skips_already_converted_webp_key() {
|
||||
let storage = in_memory_storage();
|
||||
storage.store("posters/m1.webp", &tiny_jpeg()).await.unwrap();
|
||||
let image_ref = MockImageRef::new();
|
||||
let handler = ImageConversionHandler::new(
|
||||
Arc::clone(&storage) as Arc<dyn ImageStorage>,
|
||||
Arc::clone(&image_ref) as Arc<dyn ImageRefPort>,
|
||||
Format::Webp,
|
||||
);
|
||||
|
||||
handler.handle(&DomainEvent::ImageStored { key: "posters/m1.webp".into() }).await.unwrap();
|
||||
|
||||
assert!(image_ref.swaps().is_empty());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn converts_jpeg_to_avif_and_swaps_key() {
|
||||
let storage = in_memory_storage();
|
||||
storage.store("avatars/u1", &tiny_jpeg()).await.unwrap();
|
||||
let image_ref = MockImageRef::new();
|
||||
let handler = ImageConversionHandler::new(
|
||||
Arc::clone(&storage) as Arc<dyn ImageStorage>,
|
||||
Arc::clone(&image_ref) as Arc<dyn ImageRefPort>,
|
||||
Format::Avif,
|
||||
);
|
||||
|
||||
handler.handle(&DomainEvent::ImageStored { key: "avatars/u1".into() }).await.unwrap();
|
||||
|
||||
assert_eq!(image_ref.swaps(), vec![("avatars/u1".into(), "avatars/u1.avif".into())]);
|
||||
assert!(storage.get("avatars/u1.avif").await.is_ok());
|
||||
assert!(storage.get("avatars/u1").await.is_err());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn converts_jpeg_to_webp_and_swaps_key() {
|
||||
let storage = in_memory_storage();
|
||||
storage.store("avatars/u1", &tiny_jpeg()).await.unwrap();
|
||||
let image_ref = MockImageRef::new();
|
||||
let handler = ImageConversionHandler::new(
|
||||
Arc::clone(&storage) as Arc<dyn ImageStorage>,
|
||||
Arc::clone(&image_ref) as Arc<dyn ImageRefPort>,
|
||||
Format::Webp,
|
||||
);
|
||||
|
||||
handler.handle(&DomainEvent::ImageStored { key: "avatars/u1".into() }).await.unwrap();
|
||||
|
||||
assert_eq!(image_ref.swaps(), vec![("avatars/u1".into(), "avatars/u1.webp".into())]);
|
||||
assert!(storage.get("avatars/u1.webp").await.is_ok());
|
||||
assert!(storage.get("avatars/u1").await.is_err());
|
||||
}
|
||||
}
|
||||
36
crates/adapters/image-converter/src/lib.rs
Normal file
36
crates/adapters/image-converter/src/lib.rs
Normal file
@@ -0,0 +1,36 @@
|
||||
mod backfill;
|
||||
mod config;
|
||||
mod handler;
|
||||
|
||||
pub use backfill::ConversionBackfillJob;
|
||||
pub use config::{ConversionConfig, Format};
|
||||
pub use handler::ImageConversionHandler;
|
||||
|
||||
use std::sync::Arc;
|
||||
use domain::ports::{EventHandler, EventPublisher, ImageRefPort, ImageStorage, PeriodicJob};
|
||||
|
||||
pub fn build(
|
||||
image_storage: Arc<dyn ImageStorage>,
|
||||
image_ref: Arc<dyn ImageRefPort>,
|
||||
event_publisher: Arc<dyn EventPublisher>,
|
||||
) -> anyhow::Result<Option<(Arc<dyn EventHandler>, Arc<dyn PeriodicJob>)>> {
|
||||
let config = match ConversionConfig::from_env()? {
|
||||
Some(c) => c,
|
||||
None => return Ok(None),
|
||||
};
|
||||
|
||||
let format = config.format;
|
||||
|
||||
let handler = Arc::new(ImageConversionHandler::new(
|
||||
Arc::clone(&image_storage),
|
||||
Arc::clone(&image_ref),
|
||||
format,
|
||||
)) as Arc<dyn EventHandler>;
|
||||
|
||||
let job = Arc::new(ConversionBackfillJob::new(
|
||||
Arc::clone(&image_ref),
|
||||
Arc::clone(&event_publisher),
|
||||
)) as Arc<dyn PeriodicJob>;
|
||||
|
||||
Ok(Some((handler, job)))
|
||||
}
|
||||
@@ -9,6 +9,7 @@ pub fn event_to_subject(prefix: &str, event: &DomainEvent) -> String {
|
||||
DomainEvent::MovieDeleted { .. } => "movie.deleted",
|
||||
DomainEvent::UserUpdated { .. } => "user.updated",
|
||||
DomainEvent::MovieEnrichmentRequested { .. } => "movie.enrichment.requested",
|
||||
DomainEvent::ImageStored { .. } => "image.stored",
|
||||
};
|
||||
format!("{prefix}.{suffix}")
|
||||
}
|
||||
|
||||
@@ -4,7 +4,7 @@ use async_trait::async_trait;
|
||||
use domain::{
|
||||
errors::DomainError,
|
||||
events::DomainEvent,
|
||||
ports::{EventHandler, ImageStorage, MetadataClient, MovieRepository, PosterFetcherClient},
|
||||
ports::{EventHandler, EventPublisher, ImageStorage, MetadataClient, MovieRepository, PosterFetcherClient},
|
||||
value_objects::{ExternalMetadataId, MovieId, PosterPath},
|
||||
};
|
||||
|
||||
@@ -13,6 +13,7 @@ pub struct PosterSyncHandler {
|
||||
metadata_client: Arc<dyn MetadataClient>,
|
||||
poster_fetcher: Arc<dyn PosterFetcherClient>,
|
||||
image_storage: Arc<dyn ImageStorage>,
|
||||
event_publisher: Arc<dyn EventPublisher>,
|
||||
max_retries: u32,
|
||||
}
|
||||
|
||||
@@ -22,9 +23,10 @@ impl PosterSyncHandler {
|
||||
metadata_client: Arc<dyn MetadataClient>,
|
||||
poster_fetcher: Arc<dyn PosterFetcherClient>,
|
||||
image_storage: Arc<dyn ImageStorage>,
|
||||
event_publisher: Arc<dyn EventPublisher>,
|
||||
max_retries: u32,
|
||||
) -> Self {
|
||||
Self { movie_repository, metadata_client, poster_fetcher, image_storage, max_retries }
|
||||
Self { movie_repository, metadata_client, poster_fetcher, image_storage, event_publisher, max_retries }
|
||||
}
|
||||
|
||||
async fn sync(&self, movie_id: MovieId, external_metadata_id: ExternalMetadataId) -> Result<(), DomainError> {
|
||||
@@ -47,6 +49,12 @@ impl PosterSyncHandler {
|
||||
|
||||
let image_bytes = self.poster_fetcher.fetch_poster_bytes(&poster_url).await?;
|
||||
let stored_path = self.image_storage.store(&movie_id.value().to_string(), &image_bytes).await?;
|
||||
if let Err(e) = self.event_publisher
|
||||
.publish(&DomainEvent::ImageStored { key: stored_path.clone() })
|
||||
.await
|
||||
{
|
||||
tracing::warn!("failed to emit ImageStored for {stored_path}: {e}");
|
||||
}
|
||||
let poster_path = PosterPath::new(stored_path)?;
|
||||
|
||||
movie.update_poster(poster_path);
|
||||
|
||||
48
crates/adapters/postgres/src/image_ref.rs
Normal file
48
crates/adapters/postgres/src/image_ref.rs
Normal file
@@ -0,0 +1,48 @@
|
||||
use async_trait::async_trait;
|
||||
use domain::{errors::DomainError, ports::ImageRefPort};
|
||||
use sqlx::PgPool;
|
||||
use std::sync::Arc;
|
||||
|
||||
pub struct PostgresImageRefAdapter {
|
||||
pool: PgPool,
|
||||
}
|
||||
|
||||
impl PostgresImageRefAdapter {
|
||||
pub fn new(pool: PgPool) -> Self {
|
||||
Self { pool }
|
||||
}
|
||||
}
|
||||
|
||||
pub fn create_image_ref(pool: PgPool) -> Arc<dyn ImageRefPort> {
|
||||
Arc::new(PostgresImageRefAdapter::new(pool))
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl ImageRefPort for PostgresImageRefAdapter {
|
||||
async fn swap(&self, old_key: &str, new_key: &str) -> Result<(), DomainError> {
|
||||
let mut tx = self.pool.begin().await
|
||||
.map_err(|e| DomainError::InfrastructureError(e.to_string()))?;
|
||||
sqlx::query("UPDATE users SET avatar_path = $1 WHERE avatar_path = $2")
|
||||
.bind(new_key).bind(old_key)
|
||||
.execute(&mut *tx).await
|
||||
.map_err(|e| DomainError::InfrastructureError(e.to_string()))?;
|
||||
sqlx::query("UPDATE movies SET poster_path = $1 WHERE poster_path = $2")
|
||||
.bind(new_key).bind(old_key)
|
||||
.execute(&mut *tx).await
|
||||
.map_err(|e| DomainError::InfrastructureError(e.to_string()))?;
|
||||
tx.commit().await
|
||||
.map_err(|e| DomainError::InfrastructureError(e.to_string()))
|
||||
}
|
||||
|
||||
async fn list_keys(&self) -> Result<Vec<String>, DomainError> {
|
||||
let rows: Vec<(String,)> = sqlx::query_as(
|
||||
"SELECT avatar_path FROM users WHERE avatar_path IS NOT NULL
|
||||
UNION
|
||||
SELECT poster_path FROM movies WHERE poster_path IS NOT NULL",
|
||||
)
|
||||
.fetch_all(&self.pool)
|
||||
.await
|
||||
.map_err(|e| DomainError::InfrastructureError(e.to_string()))?;
|
||||
Ok(rows.into_iter().map(|(k,)| k).collect())
|
||||
}
|
||||
}
|
||||
@@ -12,6 +12,7 @@ use domain::{
|
||||
};
|
||||
use sqlx::PgPool;
|
||||
|
||||
mod image_ref;
|
||||
mod import_profile;
|
||||
mod import_session;
|
||||
mod models;
|
||||
@@ -23,6 +24,7 @@ use models::{
|
||||
UserTotalsRow, datetime_to_str,
|
||||
};
|
||||
|
||||
pub use image_ref::{PostgresImageRefAdapter, create_image_ref};
|
||||
pub use import_profile::PostgresImportProfileRepository;
|
||||
pub use import_session::PostgresImportSessionRepository;
|
||||
pub use profile::PostgresMovieProfileRepository;
|
||||
|
||||
155
crates/adapters/sqlite/src/image_ref.rs
Normal file
155
crates/adapters/sqlite/src/image_ref.rs
Normal file
@@ -0,0 +1,155 @@
|
||||
use async_trait::async_trait;
|
||||
use domain::{errors::DomainError, ports::ImageRefPort};
|
||||
use sqlx::SqlitePool;
|
||||
use std::sync::Arc;
|
||||
|
||||
pub struct SqliteImageRefAdapter {
|
||||
pool: SqlitePool,
|
||||
}
|
||||
|
||||
impl SqliteImageRefAdapter {
|
||||
pub fn new(pool: SqlitePool) -> Self {
|
||||
Self { pool }
|
||||
}
|
||||
}
|
||||
|
||||
pub fn create_image_ref(pool: SqlitePool) -> Arc<dyn ImageRefPort> {
|
||||
Arc::new(SqliteImageRefAdapter::new(pool))
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl ImageRefPort for SqliteImageRefAdapter {
|
||||
async fn swap(&self, old_key: &str, new_key: &str) -> Result<(), DomainError> {
|
||||
let mut tx = self.pool.begin().await
|
||||
.map_err(|e| DomainError::InfrastructureError(e.to_string()))?;
|
||||
sqlx::query("UPDATE users SET avatar_path = ? WHERE avatar_path = ?")
|
||||
.bind(new_key).bind(old_key)
|
||||
.execute(&mut *tx).await
|
||||
.map_err(|e| DomainError::InfrastructureError(e.to_string()))?;
|
||||
sqlx::query("UPDATE movies SET poster_path = ? WHERE poster_path = ?")
|
||||
.bind(new_key).bind(old_key)
|
||||
.execute(&mut *tx).await
|
||||
.map_err(|e| DomainError::InfrastructureError(e.to_string()))?;
|
||||
tx.commit().await
|
||||
.map_err(|e| DomainError::InfrastructureError(e.to_string()))
|
||||
}
|
||||
|
||||
async fn list_keys(&self) -> Result<Vec<String>, DomainError> {
|
||||
let rows: Vec<(String,)> = sqlx::query_as(
|
||||
"SELECT avatar_path FROM users WHERE avatar_path IS NOT NULL
|
||||
UNION
|
||||
SELECT poster_path FROM movies WHERE poster_path IS NOT NULL",
|
||||
)
|
||||
.fetch_all(&self.pool)
|
||||
.await
|
||||
.map_err(|e| DomainError::InfrastructureError(e.to_string()))?;
|
||||
Ok(rows.into_iter().map(|(k,)| k).collect())
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
async fn setup(pool: &SqlitePool) {
|
||||
sqlx::query(
|
||||
"CREATE TABLE IF NOT EXISTS users (
|
||||
id TEXT PRIMARY KEY,
|
||||
email TEXT NOT NULL,
|
||||
username TEXT NOT NULL,
|
||||
password_hash TEXT NOT NULL,
|
||||
created_at TEXT NOT NULL,
|
||||
role TEXT NOT NULL DEFAULT 'standard',
|
||||
bio TEXT,
|
||||
avatar_path TEXT
|
||||
)",
|
||||
)
|
||||
.execute(pool)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
sqlx::query(
|
||||
"CREATE TABLE IF NOT EXISTS movies (
|
||||
id TEXT PRIMARY KEY,
|
||||
external_metadata_id TEXT,
|
||||
title TEXT NOT NULL,
|
||||
release_year INTEGER,
|
||||
director TEXT,
|
||||
poster_path TEXT
|
||||
)",
|
||||
)
|
||||
.execute(pool)
|
||||
.await
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn list_keys_returns_both_avatar_and_poster_paths() {
|
||||
let pool = SqlitePool::connect("sqlite::memory:").await.unwrap();
|
||||
setup(&pool).await;
|
||||
|
||||
sqlx::query("INSERT INTO users VALUES ('u1','e@e.com','u','h','2024-01-01','standard',NULL,'avatars/u1')")
|
||||
.execute(&pool).await.unwrap();
|
||||
sqlx::query("INSERT INTO movies VALUES ('m1','tt1','Title',2020,'Dir','posters/m1')")
|
||||
.execute(&pool).await.unwrap();
|
||||
|
||||
let adapter = SqliteImageRefAdapter::new(pool);
|
||||
let mut keys = adapter.list_keys().await.unwrap();
|
||||
keys.sort();
|
||||
|
||||
assert_eq!(keys, vec!["avatars/u1", "posters/m1"]);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn list_keys_excludes_nulls() {
|
||||
let pool = SqlitePool::connect("sqlite::memory:").await.unwrap();
|
||||
setup(&pool).await;
|
||||
|
||||
sqlx::query("INSERT INTO users VALUES ('u1','e@e.com','u','h','2024-01-01','standard',NULL,NULL)")
|
||||
.execute(&pool).await.unwrap();
|
||||
|
||||
let adapter = SqliteImageRefAdapter::new(pool);
|
||||
assert_eq!(adapter.list_keys().await.unwrap(), Vec::<String>::new());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn swap_updates_avatar_path() {
|
||||
let pool = SqlitePool::connect("sqlite::memory:").await.unwrap();
|
||||
setup(&pool).await;
|
||||
|
||||
sqlx::query("INSERT INTO users VALUES ('u1','e@e.com','u','h','2024-01-01','standard',NULL,'avatars/u1')")
|
||||
.execute(&pool).await.unwrap();
|
||||
|
||||
let adapter = SqliteImageRefAdapter::new(pool.clone());
|
||||
adapter.swap("avatars/u1", "avatars/u1.avif").await.unwrap();
|
||||
|
||||
let row: (Option<String>,) = sqlx::query_as("SELECT avatar_path FROM users WHERE id='u1'")
|
||||
.fetch_one(&pool).await.unwrap();
|
||||
assert_eq!(row.0.as_deref(), Some("avatars/u1.avif"));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn swap_updates_poster_path() {
|
||||
let pool = SqlitePool::connect("sqlite::memory:").await.unwrap();
|
||||
setup(&pool).await;
|
||||
|
||||
sqlx::query("INSERT INTO movies VALUES ('m1','tt1','Title',2020,'Dir','posters/m1')")
|
||||
.execute(&pool).await.unwrap();
|
||||
|
||||
let adapter = SqliteImageRefAdapter::new(pool.clone());
|
||||
adapter.swap("posters/m1", "posters/m1.avif").await.unwrap();
|
||||
|
||||
let row: (Option<String>,) = sqlx::query_as("SELECT poster_path FROM movies WHERE id='m1'")
|
||||
.fetch_one(&pool).await.unwrap();
|
||||
assert_eq!(row.0.as_deref(), Some("posters/m1.avif"));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn swap_noop_when_key_not_found() {
|
||||
let pool = SqlitePool::connect("sqlite::memory:").await.unwrap();
|
||||
setup(&pool).await;
|
||||
|
||||
let adapter = SqliteImageRefAdapter::new(pool);
|
||||
adapter.swap("missing/key", "missing/key.avif").await.unwrap();
|
||||
}
|
||||
}
|
||||
@@ -12,6 +12,7 @@ use domain::{
|
||||
};
|
||||
use sqlx::SqlitePool;
|
||||
|
||||
mod image_ref;
|
||||
mod import_profile;
|
||||
mod import_session;
|
||||
mod migrations;
|
||||
@@ -24,6 +25,7 @@ use models::{
|
||||
UserTotalsRow, datetime_to_str,
|
||||
};
|
||||
|
||||
pub use image_ref::{SqliteImageRefAdapter, create_image_ref};
|
||||
pub use import_profile::SqliteImportProfileRepository;
|
||||
pub use import_session::SqliteImportSessionRepository;
|
||||
pub use profile::SqliteMovieProfileRepository;
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
use domain::{
|
||||
errors::DomainError,
|
||||
events::DomainEvent,
|
||||
value_objects::{ExternalMetadataId, MovieId, PosterPath},
|
||||
};
|
||||
|
||||
@@ -39,6 +40,14 @@ pub async fn execute(ctx: &AppContext, cmd: SyncPosterCommand) -> Result<(), Dom
|
||||
.image_storage
|
||||
.store(&movie_id.value().to_string(), &image_bytes)
|
||||
.await?;
|
||||
|
||||
if let Err(e) = ctx.event_publisher
|
||||
.publish(&DomainEvent::ImageStored { key: stored_path.clone() })
|
||||
.await
|
||||
{
|
||||
tracing::warn!("failed to emit ImageStored for {stored_path}: {e}");
|
||||
}
|
||||
|
||||
let poster_path = PosterPath::new(stored_path)?;
|
||||
|
||||
movie.update_poster(poster_path);
|
||||
|
||||
@@ -27,6 +27,14 @@ pub async fn execute(ctx: &AppContext, cmd: UpdateProfileCommand) -> Result<(),
|
||||
}
|
||||
let key = format!("avatars/{}", user_id.value());
|
||||
let stored = ctx.image_storage.store(&key, &bytes).await?;
|
||||
|
||||
if let Err(e) = ctx.event_publisher
|
||||
.publish(&DomainEvent::ImageStored { key: stored.clone() })
|
||||
.await
|
||||
{
|
||||
tracing::warn!("failed to emit ImageStored for {stored}: {e}");
|
||||
}
|
||||
|
||||
Some(stored)
|
||||
} else {
|
||||
user.avatar_path().map(|s| s.to_string())
|
||||
|
||||
@@ -97,6 +97,7 @@ mod tests {
|
||||
DomainEvent::MovieDeleted { .. } => "movie_deleted",
|
||||
DomainEvent::UserUpdated { .. } => "user_updated",
|
||||
DomainEvent::MovieEnrichmentRequested { .. } => "movie_enrichment_requested",
|
||||
DomainEvent::ImageStored { .. } => "image_stored",
|
||||
};
|
||||
self.calls.lock().unwrap().push(label);
|
||||
Ok(())
|
||||
|
||||
@@ -41,6 +41,9 @@ pub enum DomainEvent {
|
||||
movie_id: MovieId,
|
||||
external_metadata_id: String,
|
||||
},
|
||||
ImageStored {
|
||||
key: String,
|
||||
},
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
|
||||
@@ -264,3 +264,9 @@ pub trait ImportProfileRepository: Send + Sync {
|
||||
async fn get(&self, id: &ImportProfileId, user_id: &UserId) -> Result<Option<ImportProfile>, DomainError>;
|
||||
async fn delete(&self, id: &ImportProfileId) -> Result<(), DomainError>;
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
pub trait ImageRefPort: Send + Sync {
|
||||
async fn swap(&self, old_key: &str, new_key: &str) -> Result<(), DomainError>;
|
||||
async fn list_keys(&self) -> Result<Vec<String>, DomainError>;
|
||||
}
|
||||
|
||||
@@ -28,6 +28,7 @@ poster-sync = { workspace = true }
|
||||
export = { workspace = true }
|
||||
tmdb-enrichment = { workspace = true }
|
||||
importer = { workspace = true }
|
||||
image-converter = { workspace = true }
|
||||
nats = { workspace = true, optional = true }
|
||||
sqlx = { workspace = true }
|
||||
|
||||
|
||||
@@ -2,8 +2,8 @@ use std::sync::Arc;
|
||||
|
||||
use anyhow::Context;
|
||||
use domain::ports::{
|
||||
DiaryRepository, ImportProfileRepository, ImportSessionRepository, MovieProfileRepository,
|
||||
MovieRepository, ReviewRepository, StatsRepository, UserRepository,
|
||||
DiaryRepository, ImageRefPort, ImportProfileRepository, ImportSessionRepository,
|
||||
MovieProfileRepository, MovieRepository, ReviewRepository, StatsRepository, UserRepository,
|
||||
};
|
||||
|
||||
pub enum DbPool {
|
||||
@@ -22,6 +22,7 @@ pub struct Repos {
|
||||
pub import_session: Arc<dyn ImportSessionRepository>,
|
||||
pub import_profile: Arc<dyn ImportProfileRepository>,
|
||||
pub movie_profile: Arc<dyn MovieProfileRepository>,
|
||||
pub image_ref: Arc<dyn ImageRefPort>,
|
||||
}
|
||||
|
||||
pub async fn connect(database_url: &str, backend: &str) -> anyhow::Result<(Repos, DbPool)> {
|
||||
@@ -30,13 +31,15 @@ pub async fn connect(database_url: &str, backend: &str) -> anyhow::Result<(Repos
|
||||
"postgres" => {
|
||||
let (pool, m, r, d, s, u, is, ip, mp) =
|
||||
postgres::wire(database_url).await.context("PostgreSQL connection failed")?;
|
||||
Ok((Repos { movie: m, review: r, diary: d, stats: s, user: u, import_session: is, import_profile: ip, movie_profile: mp }, DbPool::Postgres(pool)))
|
||||
let image_ref = postgres::create_image_ref(pool.clone());
|
||||
Ok((Repos { movie: m, review: r, diary: d, stats: s, user: u, import_session: is, import_profile: ip, movie_profile: mp, image_ref }, DbPool::Postgres(pool)))
|
||||
}
|
||||
#[cfg(feature = "sqlite")]
|
||||
_ => {
|
||||
let (pool, m, r, d, s, u, is, ip, mp) =
|
||||
sqlite::wire(database_url).await.context("SQLite connection failed")?;
|
||||
Ok((Repos { movie: m, review: r, diary: d, stats: s, user: u, import_session: is, import_profile: ip, movie_profile: mp }, DbPool::Sqlite(pool)))
|
||||
let image_ref = sqlite::create_image_ref(pool.clone());
|
||||
Ok((Repos { movie: m, review: r, diary: d, stats: s, user: u, import_session: is, import_profile: ip, movie_profile: mp, image_ref }, DbPool::Sqlite(pool)))
|
||||
}
|
||||
#[cfg(not(feature = "sqlite"))]
|
||||
_ => anyhow::bail!("DATABASE_BACKEND={backend} is not supported by this build"),
|
||||
|
||||
@@ -31,6 +31,9 @@ async fn main() -> anyhow::Result<()> {
|
||||
let (repos, db_pool) = db::connect(&database_url, &backend).await?;
|
||||
let (event_publisher_arc, consumer_arc) = event_bus::create(&db_pool).await?;
|
||||
|
||||
// Save image_ref before ctx consumes repos.
|
||||
let image_ref = Arc::clone(&repos.image_ref);
|
||||
|
||||
// Clone refs federation handler needs before ctx consumes them.
|
||||
#[cfg(feature = "federation")]
|
||||
let (fed_movie_repo, fed_review_repo, fed_diary_repo, fed_user_repo, base_url, allow_registration) = (
|
||||
@@ -84,12 +87,21 @@ async fn main() -> anyhow::Result<()> {
|
||||
}
|
||||
};
|
||||
|
||||
// ── Image conversion ──────────────────────────────────────────────────────
|
||||
|
||||
let conversion = image_converter::build(
|
||||
Arc::clone(&ctx.image_storage),
|
||||
image_ref,
|
||||
Arc::clone(&ctx.event_publisher),
|
||||
)?;
|
||||
|
||||
// ── Periodic jobs ─────────────────────────────────────────────────────────
|
||||
|
||||
let mut periodic_jobs: Vec<Arc<dyn PeriodicJob>> = vec![
|
||||
Arc::new(application::jobs::ImportSessionCleanupJob::new(ctx.clone())),
|
||||
];
|
||||
if let Some(job) = enrichment_job { periodic_jobs.push(job); }
|
||||
if let Some((_, ref conv_job)) = conversion { periodic_jobs.push(Arc::clone(conv_job)); }
|
||||
|
||||
for job in periodic_jobs {
|
||||
tokio::spawn(async move {
|
||||
@@ -111,6 +123,7 @@ async fn main() -> anyhow::Result<()> {
|
||||
Arc::clone(&ctx.metadata_client),
|
||||
Arc::clone(&ctx.poster_fetcher),
|
||||
Arc::clone(&ctx.image_storage),
|
||||
Arc::clone(&ctx.event_publisher),
|
||||
3,
|
||||
)) as Arc<dyn EventHandler>;
|
||||
|
||||
@@ -122,6 +135,7 @@ async fn main() -> anyhow::Result<()> {
|
||||
{
|
||||
let mut h = vec![poster, cleanup];
|
||||
if let Some(e) = enrichment_handler { h.push(e); }
|
||||
if let Some((ref conv_handler, _)) = conversion { h.push(Arc::clone(conv_handler)); }
|
||||
h
|
||||
}
|
||||
|
||||
@@ -148,6 +162,7 @@ async fn main() -> anyhow::Result<()> {
|
||||
tracing::info!("federation event handler registered");
|
||||
let mut h = vec![poster, cleanup, ap];
|
||||
if let Some(e) = enrichment_handler { h.push(e); }
|
||||
if let Some((ref conv_handler, _)) = conversion { h.push(Arc::clone(conv_handler)); }
|
||||
h
|
||||
}
|
||||
};
|
||||
|
||||
Reference in New Issue
Block a user