diff --git a/Cargo.lock b/Cargo.lock index 9cd04f3..f0565c5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -100,6 +100,7 @@ dependencies = [ "futures", "object_store", "tokio", + "tokio-util", "tracing", ] @@ -188,6 +189,7 @@ dependencies = [ "bytes", "chrono", "domain", + "futures", "serde", "serde_json", "sha2", diff --git a/Cargo.toml b/Cargo.toml index 958376a..79e0b0b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -32,6 +32,7 @@ email_address = "0.2" sha2 = "0.10" uuid = { version = "1.0", features = ["v4", "serde"] } chrono = { version = "0.4", features = ["serde"] } +tokio-util = { version = "0.7", features = ["io"] } dotenvy = "0.15" tracing = "0.1" tracing-subscriber = { version = "0.3", features = ["env-filter", "fmt"] } diff --git a/crates/adapters/postgres/migrations/015_performance_indexes.sql b/crates/adapters/postgres/migrations/015_performance_indexes.sql new file mode 100644 index 0000000..d584633 --- /dev/null +++ b/crates/adapters/postgres/migrations/015_performance_indexes.sql @@ -0,0 +1,5 @@ +CREATE INDEX idx_share_targets_target ON share_targets(target_id); +CREATE INDEX idx_duplicate_groups_status ON duplicate_groups(status); +CREATE INDEX idx_stacks_members ON asset_stacks USING GIN (members); +CREATE INDEX idx_duplicate_candidates ON duplicate_groups USING GIN (candidates); +CREATE INDEX idx_assets_created ON assets(owner_user_id, created_at DESC); diff --git a/crates/adapters/postgres/src/catalog/mod.rs b/crates/adapters/postgres/src/catalog/mod.rs index 50a7661..66fcba4 100644 --- a/crates/adapters/postgres/src/catalog/mod.rs +++ b/crates/adapters/postgres/src/catalog/mod.rs @@ -336,6 +336,23 @@ impl AssetMetadataRepository for PostgresAssetMetadataRepository { Ok(rows.into_iter().map(Into::into).collect()) } + async fn find_by_assets( + &self, + asset_ids: &[SystemId], + ) -> Result, DomainError> { + let uuids: Vec = asset_ids.iter().map(|id| *id.as_uuid()).collect(); + let rows = sqlx::query_as::<_, AssetMetadataRow>( + "SELECT asset_id, metadata_source, data, updated_at + FROM asset_metadata WHERE asset_id = ANY($1)", + ) + .bind(&uuids) + .fetch_all(&self.pool) + .await + .map_pg()?; + + Ok(rows.into_iter().map(Into::into).collect()) + } + async fn find_by_asset_and_source( &self, asset_id: &SystemId, @@ -482,11 +499,18 @@ impl DuplicateRepository for PostgresDuplicateRepository { Ok(row.map(Into::into)) } - async fn find_unresolved(&self) -> Result, DomainError> { + async fn find_unresolved( + &self, + limit: u32, + offset: u32, + ) -> Result, DomainError> { let rows = sqlx::query_as::<_, GroupRow>( "SELECT group_id, detection_method, status, candidates - FROM duplicate_groups WHERE status = 'unresolved'", + FROM duplicate_groups WHERE status = 'unresolved' + ORDER BY group_id LIMIT $1 OFFSET $2", ) + .bind(limit as i64) + .bind(offset as i64) .fetch_all(&self.pool) .await .map_pg()?; diff --git a/crates/adapters/storage/Cargo.toml b/crates/adapters/storage/Cargo.toml index 7c51262..5c1329e 100644 --- a/crates/adapters/storage/Cargo.toml +++ b/crates/adapters/storage/Cargo.toml @@ -16,6 +16,7 @@ tracing = { workspace = true } bytes = { workspace = true } futures = { workspace = true } tokio = { workspace = true, features = ["fs"] } +tokio-util = { workspace = true } object_store = { version = "0.11" } [dev-dependencies] diff --git a/crates/adapters/storage/src/local_file_storage.rs b/crates/adapters/storage/src/local_file_storage.rs index 2df49e2..3ce9f69 100644 --- a/crates/adapters/storage/src/local_file_storage.rs +++ b/crates/adapters/storage/src/local_file_storage.rs @@ -1,8 +1,10 @@ use async_trait::async_trait; use bytes::Bytes; use domain::errors::DomainError; -use domain::ports::{FileEntry, FileStoragePort}; +use domain::ports::{DataStream, FileEntry, FileStoragePort}; +use futures::StreamExt; use std::path::PathBuf; +use tokio_util::io::ReaderStream; pub struct LocalFileStorage { base_path: PathBuf, @@ -51,6 +53,25 @@ impl FileStoragePort for LocalFileStorage { Ok(Bytes::from(data)) } + async fn open_file(&self, path: &str) -> Result<(DataStream, u64), DomainError> { + let full = self.resolve(path)?; + let meta = tokio::fs::metadata(&full) + .await + .map_err(|e| match e.kind() { + std::io::ErrorKind::NotFound => DomainError::NotFound(path.to_string()), + _ => DomainError::Internal(format!("Failed to stat file: {e}")), + })?; + let file = tokio::fs::File::open(&full) + .await + .map_err(|e| match e.kind() { + std::io::ErrorKind::NotFound => DomainError::NotFound(path.to_string()), + _ => DomainError::Internal(format!("Failed to open file: {e}")), + })?; + let stream = ReaderStream::new(file) + .map(|r| r.map_err(|e| DomainError::Internal(format!("Read error: {e}")))); + Ok((Box::pin(stream), meta.len())) + } + async fn delete_file(&self, path: &str) -> Result<(), DomainError> { let full = self.resolve(path)?; match tokio::fs::remove_file(&full).await { diff --git a/crates/application/Cargo.toml b/crates/application/Cargo.toml index f1f503b..6048e4b 100644 --- a/crates/application/Cargo.toml +++ b/crates/application/Cargo.toml @@ -14,6 +14,7 @@ bytes = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } sha2 = { workspace = true } +futures = { workspace = true } [dependencies.chrono] workspace = true diff --git a/crates/application/src/catalog/commands/resolve_duplicate.rs b/crates/application/src/catalog/commands/resolve_duplicate.rs index bd377e2..61d885f 100644 --- a/crates/application/src/catalog/commands/resolve_duplicate.rs +++ b/crates/application/src/catalog/commands/resolve_duplicate.rs @@ -66,7 +66,10 @@ impl ResolveDuplicateHandler { } } -pub struct ListDuplicatesQuery; +pub struct ListDuplicatesQuery { + pub limit: u32, + pub offset: u32, +} pub struct ListDuplicatesHandler { duplicate_repo: Arc, @@ -79,8 +82,10 @@ impl ListDuplicatesHandler { pub async fn execute( &self, - _query: ListDuplicatesQuery, + query: ListDuplicatesQuery, ) -> Result, DomainError> { - self.duplicate_repo.find_unresolved().await + self.duplicate_repo + .find_unresolved(query.limit, query.offset) + .await } } diff --git a/crates/application/src/catalog/queries/get_timeline.rs b/crates/application/src/catalog/queries/get_timeline.rs index c3b9719..ef5450f 100644 --- a/crates/application/src/catalog/queries/get_timeline.rs +++ b/crates/application/src/catalog/queries/get_timeline.rs @@ -62,12 +62,21 @@ impl GetTimelineHandler { .find_by_owner(&query.owner_id, query.limit, query.offset) .await?; - let mut results = Vec::with_capacity(assets.len()); - for asset in assets { - let layers = self.metadata_repo.find_by_asset(&asset.asset_id).await?; - let resolved = resolve_metadata(&layers); - results.push((asset, resolved)); - } + let asset_ids: Vec = assets.iter().map(|a| a.asset_id).collect(); + let all_layers = self.metadata_repo.find_by_assets(&asset_ids).await?; + + let results = assets + .into_iter() + .map(|asset| { + let layers: Vec<_> = all_layers + .iter() + .filter(|m| m.asset_id == asset.asset_id) + .cloned() + .collect(); + let resolved = resolve_metadata(&layers); + (asset, resolved) + }) + .collect(); Ok(results) } diff --git a/crates/application/src/catalog/queries/read_asset_file.rs b/crates/application/src/catalog/queries/read_asset_file.rs index 36bd156..9fc8690 100644 --- a/crates/application/src/catalog/queries/read_asset_file.rs +++ b/crates/application/src/catalog/queries/read_asset_file.rs @@ -1,7 +1,6 @@ -use bytes::Bytes; use domain::{ errors::DomainError, - ports::{AssetRepository, FileStoragePort}, + ports::{AssetRepository, DataStream, FileStoragePort}, value_objects::SystemId, }; use std::sync::Arc; @@ -13,7 +12,8 @@ pub struct ReadAssetFileQuery { } pub struct AssetFileResult { - pub data: Bytes, + pub stream: DataStream, + pub size: u64, pub mime_type: String, pub filename: String, } @@ -45,9 +45,9 @@ impl ReadAssetFileHandler { return Err(DomainError::Forbidden("Access denied".into())); } - let data = self + let (stream, size) = self .file_storage - .read_file(&asset.source_reference.relative_path) + .open_file(&asset.source_reference.relative_path) .await?; let filename = asset @@ -59,7 +59,8 @@ impl ReadAssetFileHandler { .to_string(); Ok(AssetFileResult { - data, + stream, + size, mime_type: asset.mime_type, filename, }) diff --git a/crates/application/src/catalog/queries/read_derivative.rs b/crates/application/src/catalog/queries/read_derivative.rs index 2f231e2..a51e6ab 100644 --- a/crates/application/src/catalog/queries/read_derivative.rs +++ b/crates/application/src/catalog/queries/read_derivative.rs @@ -1,8 +1,7 @@ -use bytes::Bytes; use domain::{ entities::{DerivativeProfile, GenerationStatus}, errors::DomainError, - ports::{DerivativeRepository, FileStoragePort}, + ports::{DataStream, DerivativeRepository, FileStoragePort}, value_objects::SystemId, }; use std::sync::Arc; @@ -14,7 +13,8 @@ pub struct ReadDerivativeQuery { } pub struct DerivativeFileResult { - pub data: Bytes, + pub stream: DataStream, + pub size: u64, pub mime_type: String, } @@ -68,13 +68,14 @@ impl ReadDerivativeHandler { ))); } - let data = self + let (stream, size) = self .file_storage - .read_file(&derivative.storage_path) + .open_file(&derivative.storage_path) .await?; Ok(DerivativeFileResult { - data, + stream, + size, mime_type: derivative.mime_type, }) } diff --git a/crates/application/src/catalog/visibility.rs b/crates/application/src/catalog/visibility.rs index fcce4c9..b8ab5fe 100644 --- a/crates/application/src/catalog/visibility.rs +++ b/crates/application/src/catalog/visibility.rs @@ -3,20 +3,17 @@ use domain::{ catalog::entities::{Asset, AssetFilters}, errors::DomainError, ports::{AssetRepository, ShareRepository}, + sharing::entities::ShareTarget, value_objects::{Checksum, SystemId}, }; use std::sync::Arc; +use tokio::sync::OnceCell; -/// Decorator that wraps an `AssetRepository` and filters query results -/// based on sharing permissions. The caller sees only assets they own -/// or have been granted access to via a `ShareScope` + `ShareTarget`. -/// -/// Write operations (`save`, `delete`) pass through to the inner repository -/// unchanged — authorization for writes is handled at the use-case layer. pub struct VisibilityFilteredAssetRepository { inner: Arc, share_repo: Arc, caller_id: SystemId, + caller_targets: OnceCell>, } impl VisibilityFilteredAssetRepository { @@ -29,17 +26,24 @@ impl VisibilityFilteredAssetRepository { inner, share_repo, caller_id, + caller_targets: OnceCell::new(), } } - /// Returns `true` if the caller owns the asset or has been granted - /// access through a share scope that targets them. + async fn get_caller_targets(&self) -> Result<&[ShareTarget], DomainError> { + self.caller_targets + .get_or_try_init(|| async { + self.share_repo.find_targets_for_user(&self.caller_id).await + }) + .await + .map(|v| v.as_slice()) + } + async fn caller_can_access(&self, asset: &Asset) -> Result { if asset.owner_user_id == self.caller_id { return Ok(true); } - // Find all share scopes that cover this asset let scopes = self .share_repo .find_scopes_for_resource(&asset.asset_id) @@ -49,14 +53,8 @@ impl VisibilityFilteredAssetRepository { return Ok(false); } - // Find all share targets that name this caller - let caller_targets = self - .share_repo - .find_targets_for_user(&self.caller_id) - .await?; + let caller_targets = self.get_caller_targets().await?; - // The caller has access if any of their targets reference a scope - // that covers this asset. for scope in &scopes { if scope.is_expired() { continue; @@ -68,6 +66,16 @@ impl VisibilityFilteredAssetRepository { Ok(false) } + + async fn filter_visible(&self, assets: Vec) -> Result, DomainError> { + let mut visible = Vec::with_capacity(assets.len()); + for asset in assets { + if self.caller_can_access(&asset).await? { + visible.push(asset); + } + } + Ok(visible) + } } #[async_trait] @@ -82,13 +90,7 @@ impl AssetRepository for VisibilityFilteredAssetRepository { async fn find_by_checksum(&self, checksum: &Checksum) -> Result, DomainError> { let assets = self.inner.find_by_checksum(checksum).await?; - let mut visible = Vec::with_capacity(assets.len()); - for asset in assets { - if self.caller_can_access(&asset).await? { - visible.push(asset); - } - } - Ok(visible) + self.filter_visible(assets).await } async fn find_by_owner( @@ -98,18 +100,11 @@ impl AssetRepository for VisibilityFilteredAssetRepository { offset: u32, ) -> Result, DomainError> { if owner_id == &self.caller_id { - // Querying own assets — no filtering needed. return self.inner.find_by_owner(owner_id, limit, offset).await; } let assets = self.inner.find_by_owner(owner_id, limit, offset).await?; - let mut visible = Vec::with_capacity(assets.len()); - for asset in assets { - if self.caller_can_access(&asset).await? { - visible.push(asset); - } - } - Ok(visible) + self.filter_visible(assets).await } async fn search( @@ -124,13 +119,7 @@ impl AssetRepository for VisibilityFilteredAssetRepository { } let assets = self.inner.search(owner_id, filters, limit, offset).await?; - let mut visible = Vec::with_capacity(assets.len()); - for asset in assets { - if self.caller_can_access(&asset).await? { - visible.push(asset); - } - } - Ok(visible) + self.filter_visible(assets).await } async fn save(&self, asset: &Asset) -> Result<(), DomainError> { @@ -219,7 +208,6 @@ mod tests { let share_repo = Arc::new(InMemoryShareRepository::new()); - // Create a share scope on the asset and target the friend let scope = share_asset(asset.asset_id, owner_id); share_repo.save_scope(&scope).await.unwrap(); @@ -241,7 +229,6 @@ mod tests { let asset_a = make_asset(owner_id); let mut asset_b = make_asset(stranger_id); - // Give asset_b the same checksum as asset_a asset_b.source_reference.checksum = asset_a.source_reference.checksum.clone(); let inner = Arc::new(InMemoryAssetRepository::new()); @@ -250,7 +237,6 @@ mod tests { let share_repo = Arc::new(InMemoryShareRepository::new()); - // Stranger queries by checksum — should only see their own let filtered = VisibilityFilteredAssetRepository::new(inner.clone(), share_repo.clone(), stranger_id); @@ -289,7 +275,6 @@ mod tests { let share_repo = Arc::new(InMemoryShareRepository::new()); - // Stranger queries owner's assets without a share — should get nothing let filtered = VisibilityFilteredAssetRepository::new(inner.clone(), share_repo.clone(), stranger_id); diff --git a/crates/application/src/sidecar/commands/full_export.rs b/crates/application/src/sidecar/commands/full_export.rs index 6e5d449..12e3fa9 100644 --- a/crates/application/src/sidecar/commands/full_export.rs +++ b/crates/application/src/sidecar/commands/full_export.rs @@ -8,6 +8,8 @@ use domain::{ }; use std::sync::Arc; +const BATCH_SIZE: u32 = 500; + #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] pub struct FullExportCommand { pub owner_id: SystemId, @@ -36,30 +38,51 @@ impl FullExportHandler { } pub async fn execute(&self, cmd: FullExportCommand) -> Result { - let assets = self - .asset_repo - .find_by_owner(&cmd.owner_id, u32::MAX, 0) - .await?; let mut count = 0u32; + let mut offset = 0u32; - for asset in &assets { - let layers = self.metadata_repo.find_by_asset(&asset.asset_id).await?; - let resolved = resolve_metadata(&layers); - - let mut record = match self.sidecar_repo.find_by_asset(&asset.asset_id).await? { - Some(r) => r, - None => { - SidecarRecord::new(asset.asset_id, format!("sidecars/{}.xmp", asset.asset_id)) - } - }; - - self.writer - .write_sidecar(&resolved, &record.sidecar_storage_path) + loop { + let assets = self + .asset_repo + .find_by_owner(&cmd.owner_id, BATCH_SIZE, offset) .await?; - let hash = hash_structured_data(&resolved); - record.mark_synced(hash); - self.sidecar_repo.save(&record).await?; - count += 1; + + if assets.is_empty() { + break; + } + + let asset_ids: Vec = assets.iter().map(|a| a.asset_id).collect(); + let all_layers = self.metadata_repo.find_by_assets(&asset_ids).await?; + + for asset in &assets { + let layers: Vec<_> = all_layers + .iter() + .filter(|m| m.asset_id == asset.asset_id) + .cloned() + .collect(); + let resolved = resolve_metadata(&layers); + + let mut record = match self.sidecar_repo.find_by_asset(&asset.asset_id).await? { + Some(r) => r, + None => SidecarRecord::new( + asset.asset_id, + format!("sidecars/{}.xmp", asset.asset_id), + ), + }; + + self.writer + .write_sidecar(&resolved, &record.sidecar_storage_path) + .await?; + let hash = hash_structured_data(&resolved); + record.mark_synced(hash); + self.sidecar_repo.save(&record).await?; + count += 1; + } + + offset += assets.len() as u32; + if (assets.len() as u32) < BATCH_SIZE { + break; + } } Ok(count) diff --git a/crates/application/src/sidecar/commands/full_import.rs b/crates/application/src/sidecar/commands/full_import.rs index a8a2279..399c389 100644 --- a/crates/application/src/sidecar/commands/full_import.rs +++ b/crates/application/src/sidecar/commands/full_import.rs @@ -8,6 +8,8 @@ use domain::{ }; use std::sync::Arc; +const BATCH_SIZE: u32 = 500; + #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] pub struct FullImportCommand { pub owner_id: SystemId, @@ -36,38 +38,43 @@ impl FullImportHandler { } pub async fn execute(&self, cmd: FullImportCommand) -> Result { - let assets = self - .asset_repo - .find_by_owner(&cmd.owner_id, u32::MAX, 0) - .await?; let mut count = 0u32; + let mut offset = 0u32; - for asset in &assets { - let record = match self.sidecar_repo.find_by_asset(&asset.asset_id).await? { - Some(r) => r, - None => { - // No sidecar record — try creating one to read from - SidecarRecord::new(asset.asset_id, format!("sidecars/{}.xmp", asset.asset_id)) - } - }; + loop { + let assets = self + .asset_repo + .find_by_owner(&cmd.owner_id, BATCH_SIZE, offset) + .await?; - match self.writer.read_sidecar(&record.sidecar_storage_path).await { - Ok(data) => { - let metadata = AssetMetadata::new( + if assets.is_empty() { + break; + } + + for asset in &assets { + let record = match self.sidecar_repo.find_by_asset(&asset.asset_id).await? { + Some(r) => r, + None => SidecarRecord::new( asset.asset_id, - MetadataSource::ExifExtracted, - data.clone(), - ); - self.metadata_repo.save(&metadata).await?; + format!("sidecars/{}.xmp", asset.asset_id), + ), + }; + + if let Ok(data) = self.writer.read_sidecar(&record.sidecar_storage_path).await { let hash = hash_structured_data(&data); + let metadata = + AssetMetadata::new(asset.asset_id, MetadataSource::ExifExtracted, data); + self.metadata_repo.save(&metadata).await?; let mut record = record; record.mark_synced(hash); self.sidecar_repo.save(&record).await?; count += 1; } - Err(_) => { - // Sidecar file missing — skip - } + } + + offset += assets.len() as u32; + if (assets.len() as u32) < BATCH_SIZE { + break; } } diff --git a/crates/application/src/testing/fakes.rs b/crates/application/src/testing/fakes.rs index 81c713e..3dce0fe 100644 --- a/crates/application/src/testing/fakes.rs +++ b/crates/application/src/testing/fakes.rs @@ -79,6 +79,13 @@ impl FileStoragePort for InMemoryFileStorage { .ok_or_else(|| DomainError::NotFound(format!("File not found: {path}"))) } + async fn open_file(&self, path: &str) -> Result<(domain::ports::DataStream, u64), DomainError> { + let data = self.read_file(path).await?; + let len = data.len() as u64; + let stream = futures::stream::once(async move { Ok(data) }); + Ok((Box::pin(stream), len)) + } + async fn delete_file(&self, path: &str) -> Result<(), DomainError> { self.files.lock().await.remove(path); Ok(()) diff --git a/crates/application/src/testing/repositories.rs b/crates/application/src/testing/repositories.rs index a7cdbcb..401229f 100644 --- a/crates/application/src/testing/repositories.rs +++ b/crates/application/src/testing/repositories.rs @@ -550,6 +550,23 @@ impl AssetMetadataRepository for InMemoryAssetMetadataRepository { .collect()) } + async fn find_by_assets( + &self, + asset_ids: &[SystemId], + ) -> Result, DomainError> { + let data = self.data.lock().await; + let mut results = Vec::new(); + for id in asset_ids { + let prefix = format!("{id}:"); + results.extend( + data.iter() + .filter(|(k, _)| k.starts_with(&prefix)) + .map(|(_, v)| v.clone()), + ); + } + Ok(results) + } + async fn find_by_asset_and_source( &self, asset_id: &SystemId, @@ -785,13 +802,19 @@ impl DuplicateRepository for InMemoryDuplicateRepository { Ok(self.data.lock().await.get(&id.to_string()).cloned()) } - async fn find_unresolved(&self) -> Result, DomainError> { + async fn find_unresolved( + &self, + limit: u32, + offset: u32, + ) -> Result, DomainError> { Ok(self .data .lock() .await .values() .filter(|g| g.status == DuplicateStatus::Unresolved) + .skip(offset as usize) + .take(limit as usize) .cloned() .collect()) } diff --git a/crates/application/tests/catalog/queries/read_asset_file.rs b/crates/application/tests/catalog/queries/read_asset_file.rs index 5a264ae..b6aed02 100644 --- a/crates/application/tests/catalog/queries/read_asset_file.rs +++ b/crates/application/tests/catalog/queries/read_asset_file.rs @@ -5,6 +5,7 @@ use domain::catalog::entities::{Asset, AssetType, SourceReference}; use domain::errors::DomainError; use domain::ports::{AssetRepository, FileStoragePort}; use domain::value_objects::{Checksum, SystemId}; +use futures::StreamExt; use std::sync::Arc; #[tokio::test] @@ -36,7 +37,9 @@ async fn reads_file_successfully() { .await .unwrap(); - assert_eq!(result.data, file_data); + let chunks: Vec = result.stream.map(|r| r.unwrap()).collect().await; + let data: Bytes = chunks.into_iter().flatten().collect(); + assert_eq!(data, file_data); assert_eq!(result.mime_type, "image/jpeg"); assert_eq!(result.filename, "cat.jpg"); } diff --git a/crates/domain/src/catalog/ports.rs b/crates/domain/src/catalog/ports.rs index 9662e94..7111302 100644 --- a/crates/domain/src/catalog/ports.rs +++ b/crates/domain/src/catalog/ports.rs @@ -35,6 +35,10 @@ pub trait AssetRepository: Send + Sync { #[async_trait] pub trait AssetMetadataRepository: Send + Sync { async fn find_by_asset(&self, asset_id: &SystemId) -> Result, DomainError>; + async fn find_by_assets( + &self, + asset_ids: &[SystemId], + ) -> Result, DomainError>; async fn find_by_asset_and_source( &self, asset_id: &SystemId, @@ -78,7 +82,11 @@ pub trait DerivativeRepository: Send + Sync { #[async_trait] pub trait DuplicateRepository: Send + Sync { async fn find_by_id(&self, id: &SystemId) -> Result, DomainError>; - async fn find_unresolved(&self) -> Result, DomainError>; + async fn find_unresolved( + &self, + limit: u32, + offset: u32, + ) -> Result, DomainError>; async fn find_by_asset(&self, asset_id: &SystemId) -> Result, DomainError>; async fn save(&self, group: &DuplicateGroup) -> Result<(), DomainError>; } diff --git a/crates/domain/src/storage/ports.rs b/crates/domain/src/storage/ports.rs index 7ede133..7397a1f 100644 --- a/crates/domain/src/storage/ports.rs +++ b/crates/domain/src/storage/ports.rs @@ -97,6 +97,7 @@ pub struct FileEntry { pub trait FileStoragePort: Send + Sync { async fn store_file(&self, path: &str, data: Bytes) -> Result<(), DomainError>; async fn read_file(&self, path: &str) -> Result; + async fn open_file(&self, path: &str) -> Result<(DataStream, u64), DomainError>; async fn delete_file(&self, path: &str) -> Result<(), DomainError>; async fn list_directory(&self, path: &str) -> Result, DomainError>; async fn file_exists(&self, path: &str) -> Result; diff --git a/crates/presentation/src/handlers/assets.rs b/crates/presentation/src/handlers/assets.rs index 0931895..00a9a11 100644 --- a/crates/presentation/src/handlers/assets.rs +++ b/crates/presentation/src/handlers/assets.rs @@ -203,12 +203,12 @@ pub async fn serve_file( Response::builder() .status(StatusCode::OK) .header(header::CONTENT_TYPE, &result.mime_type) - .header(header::CONTENT_LENGTH, result.data.len()) + .header(header::CONTENT_LENGTH, result.size) .header( header::CONTENT_DISPOSITION, format!("inline; filename=\"{}\"", result.filename), ) - .body(Body::from(result.data)) + .body(Body::from_stream(result.stream)) .map_err(|e| AppError::from(domain::errors::DomainError::Internal(e.to_string()))) } @@ -256,9 +256,9 @@ pub async fn serve_derivative( Response::builder() .status(StatusCode::OK) .header(header::CONTENT_TYPE, &result.mime_type) - .header(header::CONTENT_LENGTH, result.data.len()) + .header(header::CONTENT_LENGTH, result.size) .header(header::CACHE_CONTROL, "public, max-age=31536000, immutable") - .body(Body::from(result.data)) + .body(Body::from_stream(result.stream)) .map_err(|e| AppError::from(DomainError::Internal(e.to_string()))) } diff --git a/crates/presentation/src/handlers/duplicates.rs b/crates/presentation/src/handlers/duplicates.rs index f35394f..1288f9c 100644 --- a/crates/presentation/src/handlers/duplicates.rs +++ b/crates/presentation/src/handlers/duplicates.rs @@ -1,23 +1,35 @@ -use crate::{errors::AppError, extractors::JwtClaims, state::AppState}; +use crate::{ + constants::{DEFAULT_PAGE_SIZE, MAX_PAGE_SIZE}, + errors::AppError, + extractors::JwtClaims, + state::AppState, +}; use api_types::{requests::ResolveDuplicateRequest, responses::DuplicateGroupResponse}; use application::catalog::{ListDuplicatesQuery, ResolveDuplicateCommand}; use axum::{ Json, - extract::{Path, State}, + extract::{Path, Query, State}, http::StatusCode, }; use domain::value_objects::SystemId; +#[derive(Debug, serde::Deserialize)] +pub struct ListDuplicatesParams { + pub limit: Option, + pub offset: Option, +} + pub async fn list_duplicates( State(state): State, claims: JwtClaims, + Query(params): Query, ) -> Result>, AppError> { super::require_admin(&claims)?; - let groups = state - .catalog - .list_duplicates - .execute(ListDuplicatesQuery) - .await?; + let query = ListDuplicatesQuery { + limit: params.limit.unwrap_or(DEFAULT_PAGE_SIZE).min(MAX_PAGE_SIZE), + offset: params.offset.unwrap_or(0), + }; + let groups = state.catalog.list_duplicates.execute(query).await?; let resp = groups .iter() .map(DuplicateGroupResponse::from_domain)