diff --git a/Cargo.toml b/Cargo.toml index b5d1d50..436b427 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -37,6 +37,7 @@ resolver = "2" [workspace.dependencies] tokio = { version = "1.0", features = ["macros", "net", "rt", "rt-multi-thread", "sync", "time"] } +bytes = "1" futures = "0.3" dotenvy = "0.15" serde = { version = "1.0", features = ["derive"] } diff --git a/crates/adapters/image-storage/Cargo.toml b/crates/adapters/image-storage/Cargo.toml index a985bd1..f7dff3c 100644 --- a/crates/adapters/image-storage/Cargo.toml +++ b/crates/adapters/image-storage/Cargo.toml @@ -8,6 +8,8 @@ domain = { workspace = true } anyhow = { workspace = true } async-trait = { workspace = true } tracing = { workspace = true } +bytes = { workspace = true } +futures = { workspace = true } object_store = { workspace = true } infer = "0.19.0" diff --git a/crates/adapters/image-storage/src/lib.rs b/crates/adapters/image-storage/src/lib.rs index 3d8eb47..9435bf9 100644 --- a/crates/adapters/image-storage/src/lib.rs +++ b/crates/adapters/image-storage/src/lib.rs @@ -7,6 +7,7 @@ use domain::{ events::DomainEvent, ports::{EventHandler, ImageStorage}, }; +use futures::StreamExt; use object_store::{ObjectStore, path::Path}; use std::sync::Arc; @@ -48,6 +49,24 @@ impl ImageStorage for ImageStorageAdapter { .map_err(|e| DomainError::InfrastructureError(e.to_string())) } + async fn get_stream( + &self, + key: &str, + ) -> Result>, DomainError> + { + let path = Path::from(key); + let result = self.store.get(&path).await.map_err(|e| match e { + object_store::Error::NotFound { .. } => DomainError::NotFound("not found".into()), + _ => DomainError::InfrastructureError(e.to_string()), + })?; + let stream = result.into_stream().map(|chunk| { + chunk + .map(|b| bytes::Bytes::from(b.to_vec())) + .map_err(|e| DomainError::InfrastructureError(e.to_string())) + }); + Ok(Box::pin(stream)) + } + async fn delete(&self, key: &str) -> Result<(), DomainError> { let path = Path::from(key); match self.store.delete(&path).await { diff --git a/crates/domain/Cargo.toml b/crates/domain/Cargo.toml index afcb7dd..3331f47 100644 --- a/crates/domain/Cargo.toml +++ b/crates/domain/Cargo.toml @@ -8,6 +8,7 @@ uuid = { workspace = true } chrono = { workspace = true } async-trait = { workspace = true } thiserror = { workspace = true } +bytes = { workspace = true } futures = { workspace = true } serde = { workspace = true } diff --git a/crates/domain/src/ports.rs b/crates/domain/src/ports.rs index 71c0020..0cecc92 100644 --- a/crates/domain/src/ports.rs +++ b/crates/domain/src/ports.rs @@ -188,6 +188,10 @@ pub trait ImageStorage: Send + Sync { /// Stores `image_bytes` at `key` and returns the stored key. async fn store(&self, key: &str, image_bytes: &[u8]) -> Result; async fn get(&self, key: &str) -> Result, DomainError>; + async fn get_stream( + &self, + key: &str, + ) -> Result>, DomainError>; async fn delete(&self, key: &str) -> Result<(), DomainError>; } diff --git a/crates/domain/src/testing.rs b/crates/domain/src/testing.rs index 1e1e4b9..9b6b6ed 100644 --- a/crates/domain/src/testing.rs +++ b/crates/domain/src/testing.rs @@ -365,6 +365,14 @@ impl ImageStorage for NoopImageStorage { Ok(vec![]) } + async fn get_stream( + &self, + _key: &str, + ) -> Result>, DomainError> + { + Ok(Box::pin(futures::stream::empty())) + } + async fn delete(&self, _key: &str) -> Result<(), DomainError> { Ok(()) } diff --git a/crates/presentation/Cargo.toml b/crates/presentation/Cargo.toml index fe3a520..0f4766e 100644 --- a/crates/presentation/Cargo.toml +++ b/crates/presentation/Cargo.toml @@ -74,5 +74,7 @@ sqlite-federation = { workspace = true, optional = true } postgres-federation = { workspace = true, optional = true } [dev-dependencies] +bytes = { workspace = true } +futures = { workspace = true } tower = { version = "0.5", features = ["util"] } http-body-util = "0.1" diff --git a/crates/presentation/src/handlers/wrapup.rs b/crates/presentation/src/handlers/wrapup.rs index 7519511..56920ca 100644 --- a/crates/presentation/src/handlers/wrapup.rs +++ b/crates/presentation/src/handlers/wrapup.rs @@ -162,19 +162,40 @@ pub async fn get_video( State(state): State, Path(id): Path, ) -> impl IntoResponse { - let record = match state.app_ctx.repos.wrapup_repo.get_by_id(&WrapUpId::from_uuid(id)).await { + let record = match state + .app_ctx + .repos + .wrapup_repo + .get_by_id(&WrapUpId::from_uuid(id)) + .await + { Ok(Some(r)) if r.status == WrapUpStatus::Ready => r, _ => return StatusCode::NOT_FOUND.into_response(), }; - let _ = record; // used only for status check + let _ = record; let video_key = format!("wrapups/{}/video.mp4", id); - match state.app_ctx.services.image_storage.get(&video_key).await { - Ok(bytes) => ( - StatusCode::OK, - [(axum::http::header::CONTENT_TYPE, "video/mp4"), - (axum::http::header::CONTENT_DISPOSITION, "attachment; filename=\"wrapup.mp4\"")], - bytes, - ).into_response(), + match state + .app_ctx + .services + .image_storage + .get_stream(&video_key) + .await + { + Ok(stream) => { + let body = axum::body::Body::from_stream(stream); + ( + StatusCode::OK, + [ + (axum::http::header::CONTENT_TYPE, "video/mp4"), + ( + axum::http::header::CONTENT_DISPOSITION, + "attachment; filename=\"wrapup.mp4\"", + ), + ], + body, + ) + .into_response() + } Err(_) => StatusCode::NOT_FOUND.into_response(), } } diff --git a/crates/presentation/src/tests/extractors.rs b/crates/presentation/src/tests/extractors.rs index bf99b0d..f6ed484 100644 --- a/crates/presentation/src/tests/extractors.rs +++ b/crates/presentation/src/tests/extractors.rs @@ -201,6 +201,13 @@ impl ImageStorage for Panic { async fn get(&self, _: &str) -> Result, DomainError> { panic!() } + async fn get_stream( + &self, + _: &str, + ) -> Result>, DomainError> + { + panic!() + } async fn delete(&self, _: &str) -> Result<(), DomainError> { panic!() } @@ -657,6 +664,12 @@ pub fn make_test_state(auth_service: Arc) -> crate::state::AppS allow_registration: false, base_url: "http://localhost:3000".to_string(), rate_limit: 20, + wrapup: application::config::WrapUpConfig { + font_path: None, + logo_path: None, + ffmpeg_path: "ffmpeg".into(), + max_concurrent_renders: 2, + }, }, }, rss_renderer: Arc::new(Panic), diff --git a/crates/presentation/tests/api_test.rs b/crates/presentation/tests/api_test.rs index 8d955d2..8bf0cd7 100644 --- a/crates/presentation/tests/api_test.rs +++ b/crates/presentation/tests/api_test.rs @@ -70,6 +70,13 @@ impl ImageStorage for PanicImageStorage { async fn get(&self, _: &str) -> Result, DomainError> { panic!() } + async fn get_stream( + &self, + _: &str, + ) -> Result>, DomainError> + { + panic!() + } async fn delete(&self, _: &str) -> Result<(), DomainError> { panic!() } @@ -433,6 +440,12 @@ async fn test_app() -> Router { allow_registration: false, base_url: "http://localhost:3000".to_string(), rate_limit: 20, + wrapup: application::config::WrapUpConfig { + font_path: None, + logo_path: None, + ffmpeg_path: "ffmpeg".into(), + max_concurrent_renders: 2, + }, }, }, rss_renderer: Arc::new(RssAdapter::new("http://localhost:3000".into())),