feat: streaming video download via ImageStorage::get_stream
Some checks failed
CI / Check / Test (push) Failing after 41s
Some checks failed
CI / Check / Test (push) Failing after 41s
This commit is contained in:
@@ -37,6 +37,7 @@ resolver = "2"
|
|||||||
|
|
||||||
[workspace.dependencies]
|
[workspace.dependencies]
|
||||||
tokio = { version = "1.0", features = ["macros", "net", "rt", "rt-multi-thread", "sync", "time"] }
|
tokio = { version = "1.0", features = ["macros", "net", "rt", "rt-multi-thread", "sync", "time"] }
|
||||||
|
bytes = "1"
|
||||||
futures = "0.3"
|
futures = "0.3"
|
||||||
dotenvy = "0.15"
|
dotenvy = "0.15"
|
||||||
serde = { version = "1.0", features = ["derive"] }
|
serde = { version = "1.0", features = ["derive"] }
|
||||||
|
|||||||
@@ -8,6 +8,8 @@ domain = { workspace = true }
|
|||||||
anyhow = { workspace = true }
|
anyhow = { workspace = true }
|
||||||
async-trait = { workspace = true }
|
async-trait = { workspace = true }
|
||||||
tracing = { workspace = true }
|
tracing = { workspace = true }
|
||||||
|
bytes = { workspace = true }
|
||||||
|
futures = { workspace = true }
|
||||||
object_store = { workspace = true }
|
object_store = { workspace = true }
|
||||||
infer = "0.19.0"
|
infer = "0.19.0"
|
||||||
|
|
||||||
|
|||||||
@@ -7,6 +7,7 @@ use domain::{
|
|||||||
events::DomainEvent,
|
events::DomainEvent,
|
||||||
ports::{EventHandler, ImageStorage},
|
ports::{EventHandler, ImageStorage},
|
||||||
};
|
};
|
||||||
|
use futures::StreamExt;
|
||||||
use object_store::{ObjectStore, path::Path};
|
use object_store::{ObjectStore, path::Path};
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
@@ -48,6 +49,24 @@ impl ImageStorage for ImageStorageAdapter {
|
|||||||
.map_err(|e| DomainError::InfrastructureError(e.to_string()))
|
.map_err(|e| DomainError::InfrastructureError(e.to_string()))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn get_stream(
|
||||||
|
&self,
|
||||||
|
key: &str,
|
||||||
|
) -> Result<futures::stream::BoxStream<'static, Result<bytes::Bytes, DomainError>>, DomainError>
|
||||||
|
{
|
||||||
|
let path = Path::from(key);
|
||||||
|
let result = self.store.get(&path).await.map_err(|e| match e {
|
||||||
|
object_store::Error::NotFound { .. } => DomainError::NotFound("not found".into()),
|
||||||
|
_ => DomainError::InfrastructureError(e.to_string()),
|
||||||
|
})?;
|
||||||
|
let stream = result.into_stream().map(|chunk| {
|
||||||
|
chunk
|
||||||
|
.map(|b| bytes::Bytes::from(b.to_vec()))
|
||||||
|
.map_err(|e| DomainError::InfrastructureError(e.to_string()))
|
||||||
|
});
|
||||||
|
Ok(Box::pin(stream))
|
||||||
|
}
|
||||||
|
|
||||||
async fn delete(&self, key: &str) -> Result<(), DomainError> {
|
async fn delete(&self, key: &str) -> Result<(), DomainError> {
|
||||||
let path = Path::from(key);
|
let path = Path::from(key);
|
||||||
match self.store.delete(&path).await {
|
match self.store.delete(&path).await {
|
||||||
|
|||||||
@@ -8,6 +8,7 @@ uuid = { workspace = true }
|
|||||||
chrono = { workspace = true }
|
chrono = { workspace = true }
|
||||||
async-trait = { workspace = true }
|
async-trait = { workspace = true }
|
||||||
thiserror = { workspace = true }
|
thiserror = { workspace = true }
|
||||||
|
bytes = { workspace = true }
|
||||||
futures = { workspace = true }
|
futures = { workspace = true }
|
||||||
serde = { workspace = true }
|
serde = { workspace = true }
|
||||||
|
|
||||||
|
|||||||
@@ -188,6 +188,10 @@ pub trait ImageStorage: Send + Sync {
|
|||||||
/// Stores `image_bytes` at `key` and returns the stored key.
|
/// Stores `image_bytes` at `key` and returns the stored key.
|
||||||
async fn store(&self, key: &str, image_bytes: &[u8]) -> Result<String, DomainError>;
|
async fn store(&self, key: &str, image_bytes: &[u8]) -> Result<String, DomainError>;
|
||||||
async fn get(&self, key: &str) -> Result<Vec<u8>, DomainError>;
|
async fn get(&self, key: &str) -> Result<Vec<u8>, DomainError>;
|
||||||
|
async fn get_stream(
|
||||||
|
&self,
|
||||||
|
key: &str,
|
||||||
|
) -> Result<futures::stream::BoxStream<'static, Result<bytes::Bytes, DomainError>>, DomainError>;
|
||||||
async fn delete(&self, key: &str) -> Result<(), DomainError>;
|
async fn delete(&self, key: &str) -> Result<(), DomainError>;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -365,6 +365,14 @@ impl ImageStorage for NoopImageStorage {
|
|||||||
Ok(vec![])
|
Ok(vec![])
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn get_stream(
|
||||||
|
&self,
|
||||||
|
_key: &str,
|
||||||
|
) -> Result<futures::stream::BoxStream<'static, Result<bytes::Bytes, DomainError>>, DomainError>
|
||||||
|
{
|
||||||
|
Ok(Box::pin(futures::stream::empty()))
|
||||||
|
}
|
||||||
|
|
||||||
async fn delete(&self, _key: &str) -> Result<(), DomainError> {
|
async fn delete(&self, _key: &str) -> Result<(), DomainError> {
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -74,5 +74,7 @@ sqlite-federation = { workspace = true, optional = true }
|
|||||||
postgres-federation = { workspace = true, optional = true }
|
postgres-federation = { workspace = true, optional = true }
|
||||||
|
|
||||||
[dev-dependencies]
|
[dev-dependencies]
|
||||||
|
bytes = { workspace = true }
|
||||||
|
futures = { workspace = true }
|
||||||
tower = { version = "0.5", features = ["util"] }
|
tower = { version = "0.5", features = ["util"] }
|
||||||
http-body-util = "0.1"
|
http-body-util = "0.1"
|
||||||
|
|||||||
@@ -162,19 +162,40 @@ pub async fn get_video(
|
|||||||
State(state): State<AppState>,
|
State(state): State<AppState>,
|
||||||
Path(id): Path<Uuid>,
|
Path(id): Path<Uuid>,
|
||||||
) -> impl IntoResponse {
|
) -> impl IntoResponse {
|
||||||
let record = match state.app_ctx.repos.wrapup_repo.get_by_id(&WrapUpId::from_uuid(id)).await {
|
let record = match state
|
||||||
|
.app_ctx
|
||||||
|
.repos
|
||||||
|
.wrapup_repo
|
||||||
|
.get_by_id(&WrapUpId::from_uuid(id))
|
||||||
|
.await
|
||||||
|
{
|
||||||
Ok(Some(r)) if r.status == WrapUpStatus::Ready => r,
|
Ok(Some(r)) if r.status == WrapUpStatus::Ready => r,
|
||||||
_ => return StatusCode::NOT_FOUND.into_response(),
|
_ => return StatusCode::NOT_FOUND.into_response(),
|
||||||
};
|
};
|
||||||
let _ = record; // used only for status check
|
let _ = record;
|
||||||
let video_key = format!("wrapups/{}/video.mp4", id);
|
let video_key = format!("wrapups/{}/video.mp4", id);
|
||||||
match state.app_ctx.services.image_storage.get(&video_key).await {
|
match state
|
||||||
Ok(bytes) => (
|
.app_ctx
|
||||||
StatusCode::OK,
|
.services
|
||||||
[(axum::http::header::CONTENT_TYPE, "video/mp4"),
|
.image_storage
|
||||||
(axum::http::header::CONTENT_DISPOSITION, "attachment; filename=\"wrapup.mp4\"")],
|
.get_stream(&video_key)
|
||||||
bytes,
|
.await
|
||||||
).into_response(),
|
{
|
||||||
|
Ok(stream) => {
|
||||||
|
let body = axum::body::Body::from_stream(stream);
|
||||||
|
(
|
||||||
|
StatusCode::OK,
|
||||||
|
[
|
||||||
|
(axum::http::header::CONTENT_TYPE, "video/mp4"),
|
||||||
|
(
|
||||||
|
axum::http::header::CONTENT_DISPOSITION,
|
||||||
|
"attachment; filename=\"wrapup.mp4\"",
|
||||||
|
),
|
||||||
|
],
|
||||||
|
body,
|
||||||
|
)
|
||||||
|
.into_response()
|
||||||
|
}
|
||||||
Err(_) => StatusCode::NOT_FOUND.into_response(),
|
Err(_) => StatusCode::NOT_FOUND.into_response(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -201,6 +201,13 @@ impl ImageStorage for Panic {
|
|||||||
async fn get(&self, _: &str) -> Result<Vec<u8>, DomainError> {
|
async fn get(&self, _: &str) -> Result<Vec<u8>, DomainError> {
|
||||||
panic!()
|
panic!()
|
||||||
}
|
}
|
||||||
|
async fn get_stream(
|
||||||
|
&self,
|
||||||
|
_: &str,
|
||||||
|
) -> Result<futures::stream::BoxStream<'static, Result<bytes::Bytes, DomainError>>, DomainError>
|
||||||
|
{
|
||||||
|
panic!()
|
||||||
|
}
|
||||||
async fn delete(&self, _: &str) -> Result<(), DomainError> {
|
async fn delete(&self, _: &str) -> Result<(), DomainError> {
|
||||||
panic!()
|
panic!()
|
||||||
}
|
}
|
||||||
@@ -657,6 +664,12 @@ pub fn make_test_state(auth_service: Arc<dyn AuthService>) -> crate::state::AppS
|
|||||||
allow_registration: false,
|
allow_registration: false,
|
||||||
base_url: "http://localhost:3000".to_string(),
|
base_url: "http://localhost:3000".to_string(),
|
||||||
rate_limit: 20,
|
rate_limit: 20,
|
||||||
|
wrapup: application::config::WrapUpConfig {
|
||||||
|
font_path: None,
|
||||||
|
logo_path: None,
|
||||||
|
ffmpeg_path: "ffmpeg".into(),
|
||||||
|
max_concurrent_renders: 2,
|
||||||
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
rss_renderer: Arc::new(Panic),
|
rss_renderer: Arc::new(Panic),
|
||||||
|
|||||||
@@ -70,6 +70,13 @@ impl ImageStorage for PanicImageStorage {
|
|||||||
async fn get(&self, _: &str) -> Result<Vec<u8>, DomainError> {
|
async fn get(&self, _: &str) -> Result<Vec<u8>, DomainError> {
|
||||||
panic!()
|
panic!()
|
||||||
}
|
}
|
||||||
|
async fn get_stream(
|
||||||
|
&self,
|
||||||
|
_: &str,
|
||||||
|
) -> Result<futures::stream::BoxStream<'static, Result<bytes::Bytes, DomainError>>, DomainError>
|
||||||
|
{
|
||||||
|
panic!()
|
||||||
|
}
|
||||||
async fn delete(&self, _: &str) -> Result<(), DomainError> {
|
async fn delete(&self, _: &str) -> Result<(), DomainError> {
|
||||||
panic!()
|
panic!()
|
||||||
}
|
}
|
||||||
@@ -433,6 +440,12 @@ async fn test_app() -> Router {
|
|||||||
allow_registration: false,
|
allow_registration: false,
|
||||||
base_url: "http://localhost:3000".to_string(),
|
base_url: "http://localhost:3000".to_string(),
|
||||||
rate_limit: 20,
|
rate_limit: 20,
|
||||||
|
wrapup: application::config::WrapUpConfig {
|
||||||
|
font_path: None,
|
||||||
|
logo_path: None,
|
||||||
|
ffmpeg_path: "ffmpeg".into(),
|
||||||
|
max_concurrent_renders: 2,
|
||||||
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
rss_renderer: Arc::new(RssAdapter::new("http://localhost:3000".into())),
|
rss_renderer: Arc::new(RssAdapter::new("http://localhost:3000".into())),
|
||||||
|
|||||||
Reference in New Issue
Block a user