use async_trait::async_trait; use domain::errors::DomainError; use domain::ports::{DataStream, StorageReader, StorageWriter}; use futures::stream::StreamExt; use object_store::{Error as OsError, ObjectStore, path::Path}; use std::sync::Arc; pub struct ObjectStorageAdapter { store: Arc, prefix: String, } impl ObjectStorageAdapter { pub fn new( store: Arc, prefix: impl Into, ) -> Result { let prefix = prefix.into(); if !prefix.is_empty() { validate_key(&prefix)?; } Ok(Self { store, prefix }) } fn path(&self, key: &str) -> Path { if self.prefix.is_empty() { Path::from(key) } else { Path::from(format!("{}/{key}", self.prefix)) } } } fn map_err(e: OsError, key: &str) -> DomainError { match e { OsError::NotFound { .. } => DomainError::NotFound(key.to_string()), e => DomainError::Internal(e.to_string()), } } fn validate_key(key: &str) -> Result<(), DomainError> { if key.is_empty() { return Err(DomainError::Validation( "storage key must not be empty".into(), )); } if key.starts_with('/') { return Err(DomainError::Validation(format!( "storage key must not start with '/': {key}" ))); } if key.split('/').any(|seg| seg == ".." || seg == ".") { return Err(DomainError::Validation(format!( "storage key contains invalid path segment: {key}" ))); } Ok(()) } #[async_trait] impl StorageWriter for ObjectStorageAdapter { async fn put(&self, key: &str, data: DataStream) -> Result<(), DomainError> { validate_key(key)?; let path = self.path(key); let mut upload = self .store .put_multipart(&path) .await .map_err(|e| DomainError::Internal(e.to_string()))?; let mut stream = data; while let Some(result) = stream.next().await { match result { Ok(bytes) => { if let Err(e) = upload.put_part(bytes.into()).await { let _ = upload.abort().await; return Err(DomainError::Internal(e.to_string())); } } Err(e) => { let _ = upload.abort().await; return Err(e); } } } upload .complete() .await .map_err(|e| DomainError::Internal(e.to_string()))?; Ok(()) } async fn delete(&self, key: &str) -> Result<(), DomainError> { validate_key(key)?; let path = self.path(key); match self.store.delete(&path).await { Ok(()) => Ok(()), Err(OsError::NotFound { .. }) => Ok(()), Err(e) => Err(DomainError::Internal(e.to_string())), } } } #[async_trait] impl StorageReader for ObjectStorageAdapter { async fn get(&self, key: &str) -> Result { validate_key(key)?; let path = self.path(key); let result = self.store.get(&path).await.map_err(|e| map_err(e, key))?; let s = result .into_stream() .map(|r| r.map_err(|e| DomainError::Internal(e.to_string()))); Ok(Box::pin(s)) } async fn list(&self, prefix: Option<&str>) -> Result, DomainError> { if let Some(p) = prefix { validate_key(p)?; } let list_prefix = match (prefix, self.prefix.is_empty()) { (Some(p), false) => Some(Path::from(format!("{}/{p}", self.prefix))), (Some(p), true) => Some(Path::from(p)), (None, false) => Some(Path::from(self.prefix.as_str())), (None, true) => None, }; let mut result = Vec::new(); let mut stream = self.store.list(list_prefix.as_ref()); while let Some(meta) = stream.next().await { let meta = meta.map_err(|e| DomainError::Internal(e.to_string()))?; let key = meta.location.to_string(); let stripped = if !self.prefix.is_empty() { key.strip_prefix(&format!("{}/", self.prefix)) .ok_or_else(|| { DomainError::Internal(format!( "listed key '{key}' does not start with expected prefix '{}'", self.prefix )) })? .to_string() } else { key }; result.push(stripped); } Ok(result) } }