perf: scale fixes for 1M+ photo libraries

Indexes: share_targets.target_id, duplicate_groups.status,
GIN on stacks members + duplicate candidates JSONB,
composite (owner_user_id, created_at DESC) on assets.

N+1 elimination: batch metadata loading via find_by_assets(ids)
using WHERE asset_id = ANY($1), used in timeline + sidecar export.

Visibility: cache find_targets_for_user per request via OnceCell,
extract filter_visible helper to reduce duplication.

Streaming: FileStoragePort.open_file() returns (DataStream, u64),
LocalFileStorage uses ReaderStream instead of loading full file.
serve_file/serve_derivative use Body::from_stream().

Unbounded queries: sidecar full_export/import now page through assets
in 500-row batches instead of one query with limit u32::MAX.
find_unresolved is paginated with limit/offset, and the
list_duplicates API accepts pagination params.
This commit is contained in:
2026-05-31 22:40:25 +02:00
parent d879fd6437
commit bcaf49cc81
21 changed files with 263 additions and 123 deletions

View File

@@ -66,7 +66,10 @@ impl ResolveDuplicateHandler {
}
}
pub struct ListDuplicatesQuery;
/// Pagination parameters for listing unresolved duplicate groups.
///
/// Both values are forwarded unchanged to
/// `DuplicateRepository::find_unresolved`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ListDuplicatesQuery {
    /// Maximum number of groups to return.
    pub limit: u32,
    /// Number of matching groups to skip before the first returned one.
    pub offset: u32,
}
pub struct ListDuplicatesHandler {
duplicate_repo: Arc<dyn DuplicateRepository>,
@@ -79,8 +82,10 @@ impl ListDuplicatesHandler {
pub async fn execute(
&self,
_query: ListDuplicatesQuery,
query: ListDuplicatesQuery,
) -> Result<Vec<domain::entities::DuplicateGroup>, DomainError> {
self.duplicate_repo.find_unresolved().await
self.duplicate_repo
.find_unresolved(query.limit, query.offset)
.await
}
}

View File

@@ -62,12 +62,21 @@ impl GetTimelineHandler {
.find_by_owner(&query.owner_id, query.limit, query.offset)
.await?;
let mut results = Vec::with_capacity(assets.len());
for asset in assets {
let layers = self.metadata_repo.find_by_asset(&asset.asset_id).await?;
let resolved = resolve_metadata(&layers);
results.push((asset, resolved));
}
let asset_ids: Vec<SystemId> = assets.iter().map(|a| a.asset_id).collect();
let all_layers = self.metadata_repo.find_by_assets(&asset_ids).await?;
let results = assets
.into_iter()
.map(|asset| {
let layers: Vec<_> = all_layers
.iter()
.filter(|m| m.asset_id == asset.asset_id)
.cloned()
.collect();
let resolved = resolve_metadata(&layers);
(asset, resolved)
})
.collect();
Ok(results)
}

View File

@@ -1,7 +1,6 @@
use bytes::Bytes;
use domain::{
errors::DomainError,
ports::{AssetRepository, FileStoragePort},
ports::{AssetRepository, DataStream, FileStoragePort},
value_objects::SystemId,
};
use std::sync::Arc;
@@ -13,7 +12,8 @@ pub struct ReadAssetFileQuery {
}
pub struct AssetFileResult {
pub data: Bytes,
pub stream: DataStream,
pub size: u64,
pub mime_type: String,
pub filename: String,
}
@@ -45,9 +45,9 @@ impl ReadAssetFileHandler {
return Err(DomainError::Forbidden("Access denied".into()));
}
let data = self
let (stream, size) = self
.file_storage
.read_file(&asset.source_reference.relative_path)
.open_file(&asset.source_reference.relative_path)
.await?;
let filename = asset
@@ -59,7 +59,8 @@ impl ReadAssetFileHandler {
.to_string();
Ok(AssetFileResult {
data,
stream,
size,
mime_type: asset.mime_type,
filename,
})

View File

@@ -1,8 +1,7 @@
use bytes::Bytes;
use domain::{
entities::{DerivativeProfile, GenerationStatus},
errors::DomainError,
ports::{DerivativeRepository, FileStoragePort},
ports::{DataStream, DerivativeRepository, FileStoragePort},
value_objects::SystemId,
};
use std::sync::Arc;
@@ -14,7 +13,8 @@ pub struct ReadDerivativeQuery {
}
pub struct DerivativeFileResult {
pub data: Bytes,
pub stream: DataStream,
pub size: u64,
pub mime_type: String,
}
@@ -68,13 +68,14 @@ impl ReadDerivativeHandler {
)));
}
let data = self
let (stream, size) = self
.file_storage
.read_file(&derivative.storage_path)
.open_file(&derivative.storage_path)
.await?;
Ok(DerivativeFileResult {
data,
stream,
size,
mime_type: derivative.mime_type,
})
}

View File

@@ -3,20 +3,17 @@ use domain::{
catalog::entities::{Asset, AssetFilters},
errors::DomainError,
ports::{AssetRepository, ShareRepository},
sharing::entities::ShareTarget,
value_objects::{Checksum, SystemId},
};
use std::sync::Arc;
use tokio::sync::OnceCell;
/// Decorator that wraps an `AssetRepository` and filters query results
/// based on sharing permissions. The caller sees only assets they own
/// or have been granted access to via a `ShareScope` + `ShareTarget`.
///
/// Write operations (`save`, `delete`) pass through to the inner repository
/// unchanged — authorization for writes is handled at the use-case layer.
pub struct VisibilityFilteredAssetRepository {
/// Wrapped repository that performs the actual queries.
inner: Arc<dyn AssetRepository>,
/// Source of share scopes and share targets consulted for access checks.
share_repo: Arc<dyn ShareRepository>,
/// User on whose behalf queries are made.
caller_id: SystemId,
/// Cache of the caller's share targets: fetched on first use through
/// `find_targets_for_user` and reused by later access checks on this instance.
caller_targets: OnceCell<Vec<ShareTarget>>,
}
impl VisibilityFilteredAssetRepository {
@@ -29,17 +26,24 @@ impl VisibilityFilteredAssetRepository {
inner,
share_repo,
caller_id,
caller_targets: OnceCell::new(),
}
}
/// Returns the share targets that name the caller, loading them on first use.
///
/// The list is fetched with `ShareRepository::find_targets_for_user` and kept
/// in `caller_targets`; subsequent calls hand back the stored slice without
/// querying again. A failed lookup is propagated to the caller.
async fn get_caller_targets(&self) -> Result<&[ShareTarget], DomainError> {
    let load_targets =
        || async { self.share_repo.find_targets_for_user(&self.caller_id).await };
    let cached = self.caller_targets.get_or_try_init(load_targets).await?;
    Ok(cached.as_slice())
}
async fn caller_can_access(&self, asset: &Asset) -> Result<bool, DomainError> {
if asset.owner_user_id == self.caller_id {
return Ok(true);
}
// Find all share scopes that cover this asset
let scopes = self
.share_repo
.find_scopes_for_resource(&asset.asset_id)
@@ -49,14 +53,8 @@ impl VisibilityFilteredAssetRepository {
return Ok(false);
}
// Find all share targets that name this caller
let caller_targets = self
.share_repo
.find_targets_for_user(&self.caller_id)
.await?;
let caller_targets = self.get_caller_targets().await?;
// The caller has access if any of their targets reference a scope
// that covers this asset.
for scope in &scopes {
if scope.is_expired() {
continue;
@@ -68,6 +66,16 @@ impl VisibilityFilteredAssetRepository {
Ok(false)
}
/// Keeps only the assets the caller is allowed to see, preserving input order.
///
/// Every asset is checked with `caller_can_access`; the first check that
/// returns an error aborts the whole call with that error.
async fn filter_visible(&self, assets: Vec<Asset>) -> Result<Vec<Asset>, DomainError> {
    let mut permitted = Vec::with_capacity(assets.len());
    for candidate in assets {
        let allowed = self.caller_can_access(&candidate).await?;
        if !allowed {
            continue;
        }
        permitted.push(candidate);
    }
    Ok(permitted)
}
}
#[async_trait]
@@ -82,13 +90,7 @@ impl AssetRepository for VisibilityFilteredAssetRepository {
async fn find_by_checksum(&self, checksum: &Checksum) -> Result<Vec<Asset>, DomainError> {
let assets = self.inner.find_by_checksum(checksum).await?;
let mut visible = Vec::with_capacity(assets.len());
for asset in assets {
if self.caller_can_access(&asset).await? {
visible.push(asset);
}
}
Ok(visible)
self.filter_visible(assets).await
}
async fn find_by_owner(
@@ -98,18 +100,11 @@ impl AssetRepository for VisibilityFilteredAssetRepository {
offset: u32,
) -> Result<Vec<Asset>, DomainError> {
if owner_id == &self.caller_id {
// Querying own assets — no filtering needed.
return self.inner.find_by_owner(owner_id, limit, offset).await;
}
let assets = self.inner.find_by_owner(owner_id, limit, offset).await?;
let mut visible = Vec::with_capacity(assets.len());
for asset in assets {
if self.caller_can_access(&asset).await? {
visible.push(asset);
}
}
Ok(visible)
self.filter_visible(assets).await
}
async fn search(
@@ -124,13 +119,7 @@ impl AssetRepository for VisibilityFilteredAssetRepository {
}
let assets = self.inner.search(owner_id, filters, limit, offset).await?;
let mut visible = Vec::with_capacity(assets.len());
for asset in assets {
if self.caller_can_access(&asset).await? {
visible.push(asset);
}
}
Ok(visible)
self.filter_visible(assets).await
}
async fn save(&self, asset: &Asset) -> Result<(), DomainError> {
@@ -219,7 +208,6 @@ mod tests {
let share_repo = Arc::new(InMemoryShareRepository::new());
// Create a share scope on the asset and target the friend
let scope = share_asset(asset.asset_id, owner_id);
share_repo.save_scope(&scope).await.unwrap();
@@ -241,7 +229,6 @@ mod tests {
let asset_a = make_asset(owner_id);
let mut asset_b = make_asset(stranger_id);
// Give asset_b the same checksum as asset_a
asset_b.source_reference.checksum = asset_a.source_reference.checksum.clone();
let inner = Arc::new(InMemoryAssetRepository::new());
@@ -250,7 +237,6 @@ mod tests {
let share_repo = Arc::new(InMemoryShareRepository::new());
// Stranger queries by checksum — should only see their own
let filtered =
VisibilityFilteredAssetRepository::new(inner.clone(), share_repo.clone(), stranger_id);
@@ -289,7 +275,6 @@ mod tests {
let share_repo = Arc::new(InMemoryShareRepository::new());
// Stranger queries owner's assets without a share — should get nothing
let filtered =
VisibilityFilteredAssetRepository::new(inner.clone(), share_repo.clone(), stranger_id);

View File

@@ -8,6 +8,8 @@ use domain::{
};
use std::sync::Arc;
/// Number of assets requested per `find_by_owner` page during a full export.
const BATCH_SIZE: u32 = 500;
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct FullExportCommand {
pub owner_id: SystemId,
@@ -36,30 +38,51 @@ impl FullExportHandler {
}
pub async fn execute(&self, cmd: FullExportCommand) -> Result<u32, DomainError> {
let assets = self
.asset_repo
.find_by_owner(&cmd.owner_id, u32::MAX, 0)
.await?;
let mut count = 0u32;
let mut offset = 0u32;
for asset in &assets {
let layers = self.metadata_repo.find_by_asset(&asset.asset_id).await?;
let resolved = resolve_metadata(&layers);
let mut record = match self.sidecar_repo.find_by_asset(&asset.asset_id).await? {
Some(r) => r,
None => {
SidecarRecord::new(asset.asset_id, format!("sidecars/{}.xmp", asset.asset_id))
}
};
self.writer
.write_sidecar(&resolved, &record.sidecar_storage_path)
loop {
let assets = self
.asset_repo
.find_by_owner(&cmd.owner_id, BATCH_SIZE, offset)
.await?;
let hash = hash_structured_data(&resolved);
record.mark_synced(hash);
self.sidecar_repo.save(&record).await?;
count += 1;
if assets.is_empty() {
break;
}
let asset_ids: Vec<SystemId> = assets.iter().map(|a| a.asset_id).collect();
let all_layers = self.metadata_repo.find_by_assets(&asset_ids).await?;
for asset in &assets {
let layers: Vec<_> = all_layers
.iter()
.filter(|m| m.asset_id == asset.asset_id)
.cloned()
.collect();
let resolved = resolve_metadata(&layers);
let mut record = match self.sidecar_repo.find_by_asset(&asset.asset_id).await? {
Some(r) => r,
None => SidecarRecord::new(
asset.asset_id,
format!("sidecars/{}.xmp", asset.asset_id),
),
};
self.writer
.write_sidecar(&resolved, &record.sidecar_storage_path)
.await?;
let hash = hash_structured_data(&resolved);
record.mark_synced(hash);
self.sidecar_repo.save(&record).await?;
count += 1;
}
offset += assets.len() as u32;
if (assets.len() as u32) < BATCH_SIZE {
break;
}
}
Ok(count)

View File

@@ -8,6 +8,8 @@ use domain::{
};
use std::sync::Arc;
/// Number of assets requested per `find_by_owner` page during a full import.
const BATCH_SIZE: u32 = 500;
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct FullImportCommand {
pub owner_id: SystemId,
@@ -36,38 +38,43 @@ impl FullImportHandler {
}
pub async fn execute(&self, cmd: FullImportCommand) -> Result<u32, DomainError> {
let assets = self
.asset_repo
.find_by_owner(&cmd.owner_id, u32::MAX, 0)
.await?;
let mut count = 0u32;
let mut offset = 0u32;
for asset in &assets {
let record = match self.sidecar_repo.find_by_asset(&asset.asset_id).await? {
Some(r) => r,
None => {
// No sidecar record — try creating one to read from
SidecarRecord::new(asset.asset_id, format!("sidecars/{}.xmp", asset.asset_id))
}
};
loop {
let assets = self
.asset_repo
.find_by_owner(&cmd.owner_id, BATCH_SIZE, offset)
.await?;
match self.writer.read_sidecar(&record.sidecar_storage_path).await {
Ok(data) => {
let metadata = AssetMetadata::new(
if assets.is_empty() {
break;
}
for asset in &assets {
let record = match self.sidecar_repo.find_by_asset(&asset.asset_id).await? {
Some(r) => r,
None => SidecarRecord::new(
asset.asset_id,
MetadataSource::ExifExtracted,
data.clone(),
);
self.metadata_repo.save(&metadata).await?;
format!("sidecars/{}.xmp", asset.asset_id),
),
};
if let Ok(data) = self.writer.read_sidecar(&record.sidecar_storage_path).await {
let hash = hash_structured_data(&data);
let metadata =
AssetMetadata::new(asset.asset_id, MetadataSource::ExifExtracted, data);
self.metadata_repo.save(&metadata).await?;
let mut record = record;
record.mark_synced(hash);
self.sidecar_repo.save(&record).await?;
count += 1;
}
Err(_) => {
// Sidecar file missing — skip
}
}
offset += assets.len() as u32;
if (assets.len() as u32) < BATCH_SIZE {
break;
}
}

View File

@@ -79,6 +79,13 @@ impl FileStoragePort for InMemoryFileStorage {
.ok_or_else(|| DomainError::NotFound(format!("File not found: {path}")))
}
/// Opens `path` as a stream together with its length in bytes.
///
/// The in-memory store has nothing to stream incrementally, so the whole
/// buffer returned by `read_file` is emitted as a single chunk. Errors from
/// `read_file` are returned unchanged.
async fn open_file(&self, path: &str) -> Result<(domain::ports::DataStream, u64), DomainError> {
    let bytes = self.read_file(path).await?;
    // Capture the length before the buffer is moved into the stream.
    let size = bytes.len() as u64;
    let single_chunk = futures::stream::once(async move { Ok(bytes) });
    Ok((Box::pin(single_chunk), size))
}
async fn delete_file(&self, path: &str) -> Result<(), DomainError> {
self.files.lock().await.remove(path);
Ok(())

View File

@@ -550,6 +550,23 @@ impl AssetMetadataRepository for InMemoryAssetMetadataRepository {
.collect())
}
/// Returns the metadata entries belonging to any of `asset_ids`.
///
/// Entries are matched by key prefix `"{asset_id}:"` and cloned out of the
/// store. Results are grouped in the order the ids are given. An id that
/// appears more than once in `asset_ids` contributes its entries only once,
/// so each stored entry is returned at most once per call — the same result
/// shape a `WHERE asset_id = ANY($1)` query produces.
async fn find_by_assets(
    &self,
    asset_ids: &[SystemId],
) -> Result<Vec<AssetMetadata>, DomainError> {
    let data = self.data.lock().await;
    // Tracks prefixes already processed so repeated ids do not duplicate rows.
    let mut seen_prefixes = std::collections::HashSet::with_capacity(asset_ids.len());
    let mut results = Vec::new();
    for id in asset_ids {
        // NOTE(review): relies on keys being formatted as "{asset_id}:..." — confirm against `save`.
        let prefix = format!("{id}:");
        if seen_prefixes.contains(&prefix) {
            continue;
        }
        results.extend(
            data.iter()
                .filter(|(k, _)| k.starts_with(&prefix))
                .map(|(_, v)| v.clone()),
        );
        seen_prefixes.insert(prefix);
    }
    Ok(results)
}
async fn find_by_asset_and_source(
&self,
asset_id: &SystemId,
@@ -785,13 +802,19 @@ impl DuplicateRepository for InMemoryDuplicateRepository {
Ok(self.data.lock().await.get(&id.to_string()).cloned())
}
async fn find_unresolved(&self) -> Result<Vec<DuplicateGroup>, DomainError> {
async fn find_unresolved(
&self,
limit: u32,
offset: u32,
) -> Result<Vec<DuplicateGroup>, DomainError> {
Ok(self
.data
.lock()
.await
.values()
.filter(|g| g.status == DuplicateStatus::Unresolved)
.skip(offset as usize)
.take(limit as usize)
.cloned()
.collect())
}