refactor: rename image-storage crate to object-storage
Some checks failed
CI / Check / Test (push) Failing after 44s
Some checks failed
CI / Check / Test (push) Failing after 44s
This commit is contained in:
18
crates/adapters/object-storage/Cargo.toml
Normal file
18
crates/adapters/object-storage/Cargo.toml
Normal file
@@ -0,0 +1,18 @@
|
||||
[package]
|
||||
name = "object-storage"
|
||||
version = "0.1.0"
|
||||
edition = "2024"
|
||||
|
||||
[dependencies]
|
||||
domain = { workspace = true }
|
||||
anyhow = { workspace = true }
|
||||
async-trait = { workspace = true }
|
||||
tracing = { workspace = true }
|
||||
bytes = { workspace = true }
|
||||
futures = { workspace = true }
|
||||
object_store = { workspace = true }
|
||||
infer = "0.19.0"
|
||||
|
||||
[dev-dependencies]
|
||||
tokio = { workspace = true }
|
||||
uuid = { workspace = true }
|
||||
66
crates/adapters/object-storage/src/config.rs
Normal file
66
crates/adapters/object-storage/src/config.rs
Normal file
@@ -0,0 +1,66 @@
|
||||
use anyhow::Context;
|
||||
use object_store::{ObjectStore, aws::AmazonS3Builder, local::LocalFileSystem};
|
||||
use std::sync::Arc;
|
||||
|
||||
pub struct StorageConfig(Arc<dyn ObjectStore>);
|
||||
|
||||
impl StorageConfig {
|
||||
pub fn from_env() -> anyhow::Result<Self> {
|
||||
let backend = std::env::var("IMAGE_STORAGE_BACKEND")
|
||||
.context("IMAGE_STORAGE_BACKEND required (valid values: s3, local)")?;
|
||||
|
||||
let store: Arc<dyn ObjectStore> = match backend.as_str() {
|
||||
"s3" => build_s3_store(
|
||||
&std::env::var("MINIO_ENDPOINT").context("MINIO_ENDPOINT required")?,
|
||||
&std::env::var("MINIO_ACCESS_KEY_ID").context("MINIO_ACCESS_KEY_ID required")?,
|
||||
&std::env::var("MINIO_SECRET_ACCESS_KEY")
|
||||
.context("MINIO_SECRET_ACCESS_KEY required")?,
|
||||
&std::env::var("MINIO_BUCKET").context("MINIO_BUCKET required")?,
|
||||
&std::env::var("MINIO_REGION").unwrap_or_else(|_| "minio".to_string()),
|
||||
)?,
|
||||
"local" => build_local_store(
|
||||
&std::env::var("IMAGE_STORAGE_PATH")
|
||||
.context("IMAGE_STORAGE_PATH required when IMAGE_STORAGE_BACKEND=local")?,
|
||||
)?,
|
||||
other => {
|
||||
anyhow::bail!("Unknown IMAGE_STORAGE_BACKEND: {other:?}. Valid values: s3, local")
|
||||
}
|
||||
};
|
||||
|
||||
Ok(Self(store))
|
||||
}
|
||||
|
||||
pub fn build_store(self) -> Arc<dyn ObjectStore> {
|
||||
self.0
|
||||
}
|
||||
}
|
||||
|
||||
fn build_s3_store(
|
||||
endpoint: &str,
|
||||
access_key_id: &str,
|
||||
secret_access_key: &str,
|
||||
bucket: &str,
|
||||
region: &str,
|
||||
) -> anyhow::Result<Arc<dyn ObjectStore>> {
|
||||
let store = AmazonS3Builder::new()
|
||||
.with_endpoint(endpoint)
|
||||
.with_access_key_id(access_key_id)
|
||||
.with_secret_access_key(secret_access_key)
|
||||
.with_bucket_name(bucket)
|
||||
.with_region(region)
|
||||
.with_allow_http(true)
|
||||
.build()
|
||||
.context("Failed to build S3/Minio store")?;
|
||||
Ok(Arc::new(store))
|
||||
}
|
||||
|
||||
fn build_local_store(path: &str) -> anyhow::Result<Arc<dyn ObjectStore>> {
|
||||
std::fs::create_dir_all(path).context("Failed to create image storage directory")?;
|
||||
let store = LocalFileSystem::new_with_prefix(path)
|
||||
.context("Failed to initialise local file system store")?;
|
||||
Ok(Arc::new(store))
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
#[path = "tests/config.rs"]
|
||||
mod tests;
|
||||
115
crates/adapters/object-storage/src/lib.rs
Normal file
115
crates/adapters/object-storage/src/lib.rs
Normal file
@@ -0,0 +1,115 @@
|
||||
mod config;
|
||||
pub use config::StorageConfig;
|
||||
|
||||
use async_trait::async_trait;
|
||||
use domain::{
|
||||
errors::DomainError,
|
||||
events::DomainEvent,
|
||||
ports::{EventHandler, ObjectStorage},
|
||||
};
|
||||
use futures::StreamExt;
|
||||
use object_store::{ObjectStore, path::Path};
|
||||
use std::sync::Arc;
|
||||
|
||||
pub struct ObjectStorageAdapter {
|
||||
store: Arc<dyn ObjectStore>,
|
||||
}
|
||||
|
||||
impl ObjectStorageAdapter {
|
||||
pub fn new(store: Arc<dyn ObjectStore>) -> Self {
|
||||
Self { store }
|
||||
}
|
||||
|
||||
pub fn from_config(config: StorageConfig) -> Self {
|
||||
Self::new(config.build_store())
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl ObjectStorage for ObjectStorageAdapter {
|
||||
async fn store(&self, key: &str, image_bytes: &[u8]) -> Result<String, DomainError> {
|
||||
let path = Path::from(key);
|
||||
self.store
|
||||
.put(&path, image_bytes.to_vec().into())
|
||||
.await
|
||||
.map_err(|e| DomainError::InfrastructureError(e.to_string()))?;
|
||||
Ok(key.to_string())
|
||||
}
|
||||
|
||||
async fn get(&self, key: &str) -> Result<Vec<u8>, DomainError> {
|
||||
let path = Path::from(key);
|
||||
let result = self.store.get(&path).await.map_err(|e| match e {
|
||||
object_store::Error::NotFound { .. } => DomainError::NotFound("Image not found".into()),
|
||||
_ => DomainError::InfrastructureError(e.to_string()),
|
||||
})?;
|
||||
result
|
||||
.bytes()
|
||||
.await
|
||||
.map(|b| b.to_vec())
|
||||
.map_err(|e| DomainError::InfrastructureError(e.to_string()))
|
||||
}
|
||||
|
||||
async fn get_stream(
|
||||
&self,
|
||||
key: &str,
|
||||
) -> Result<futures::stream::BoxStream<'static, Result<bytes::Bytes, DomainError>>, DomainError>
|
||||
{
|
||||
let path = Path::from(key);
|
||||
let result = self.store.get(&path).await.map_err(|e| match e {
|
||||
object_store::Error::NotFound { .. } => DomainError::NotFound("not found".into()),
|
||||
_ => DomainError::InfrastructureError(e.to_string()),
|
||||
})?;
|
||||
let stream = result.into_stream().map(|chunk| {
|
||||
chunk
|
||||
.map(|b| bytes::Bytes::from(b.to_vec()))
|
||||
.map_err(|e| DomainError::InfrastructureError(e.to_string()))
|
||||
});
|
||||
Ok(Box::pin(stream))
|
||||
}
|
||||
|
||||
async fn delete(&self, key: &str) -> Result<(), DomainError> {
|
||||
let path = Path::from(key);
|
||||
match self.store.delete(&path).await {
|
||||
Ok(()) => Ok(()),
|
||||
Err(object_store::Error::NotFound { .. }) => Ok(()),
|
||||
Err(e) => Err(DomainError::InfrastructureError(e.to_string())),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub struct ImageCleanupHandler {
|
||||
object_storage: Arc<dyn ObjectStorage>,
|
||||
}
|
||||
|
||||
impl ImageCleanupHandler {
|
||||
pub fn new(object_storage: Arc<dyn ObjectStorage>) -> Self {
|
||||
Self { object_storage }
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl EventHandler for ImageCleanupHandler {
|
||||
async fn handle(&self, event: &DomainEvent) -> Result<(), DomainError> {
|
||||
let poster_path = match event {
|
||||
DomainEvent::MovieDeleted { poster_path, .. } => poster_path,
|
||||
_ => return Ok(()),
|
||||
};
|
||||
let Some(path) = poster_path else {
|
||||
return Ok(());
|
||||
};
|
||||
if let Err(e) = self.object_storage.delete(path.value()).await {
|
||||
tracing::warn!("image cleanup failed for {}: {e}", path.value());
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
pub fn create() -> anyhow::Result<Arc<dyn ObjectStorage>> {
|
||||
Ok(Arc::new(ObjectStorageAdapter::from_config(
|
||||
StorageConfig::from_env()?,
|
||||
)))
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
#[path = "tests/lib.rs"]
|
||||
mod tests;
|
||||
17
crates/adapters/object-storage/src/tests/config.rs
Normal file
17
crates/adapters/object-storage/src/tests/config.rs
Normal file
@@ -0,0 +1,17 @@
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn local_store_creates_dir_and_succeeds() {
|
||||
let dir = std::env::temp_dir().join(format!("image_test_{}", uuid::Uuid::new_v4()));
|
||||
let result = build_local_store(dir.to_str().unwrap());
|
||||
assert!(result.is_ok(), "expected Ok, got: {:?}", result.err());
|
||||
assert!(dir.exists(), "directory should have been created");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn local_store_succeeds_if_dir_already_exists() {
|
||||
let dir = std::env::temp_dir().join(format!("image_test_{}", uuid::Uuid::new_v4()));
|
||||
std::fs::create_dir_all(&dir).unwrap();
|
||||
let result = build_local_store(dir.to_str().unwrap());
|
||||
assert!(result.is_ok());
|
||||
}
|
||||
61
crates/adapters/object-storage/src/tests/lib.rs
Normal file
61
crates/adapters/object-storage/src/tests/lib.rs
Normal file
@@ -0,0 +1,61 @@
|
||||
use super::*;
|
||||
use object_store::memory::InMemory;
|
||||
|
||||
fn adapter() -> ObjectStorageAdapter {
|
||||
ObjectStorageAdapter::new(Arc::new(InMemory::new()))
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn store_and_retrieve_round_trip() {
|
||||
let adapter = adapter();
|
||||
let bytes = b"fake-image-bytes";
|
||||
let path = adapter.store("posters/abc123", bytes).await.unwrap();
|
||||
assert_eq!(path, "posters/abc123");
|
||||
let retrieved = adapter.get("posters/abc123").await.unwrap();
|
||||
assert_eq!(retrieved, bytes);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn get_missing_returns_not_found() {
|
||||
let adapter = adapter();
|
||||
let result = adapter.get("nonexistent").await;
|
||||
assert!(matches!(result, Err(DomainError::NotFound(_))));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn delete_removes_key() {
|
||||
let adapter = adapter();
|
||||
adapter.store("avatars/user1", b"img").await.unwrap();
|
||||
adapter.delete("avatars/user1").await.unwrap();
|
||||
let result = adapter.get("avatars/user1").await;
|
||||
assert!(matches!(result, Err(DomainError::NotFound(_))));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn delete_missing_returns_ok() {
|
||||
let adapter = adapter();
|
||||
assert!(adapter.delete("does-not-exist").await.is_ok());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn cleanup_handler_deletes_on_movie_deleted() {
|
||||
use domain::{
|
||||
events::DomainEvent,
|
||||
value_objects::{MovieId, PosterPath},
|
||||
};
|
||||
let inner = Arc::new(adapter());
|
||||
inner.store("some-uuid", b"img").await.unwrap();
|
||||
let path = PosterPath::new("some-uuid".to_string()).unwrap();
|
||||
let handler = ImageCleanupHandler::new(Arc::clone(&inner) as Arc<dyn ObjectStorage>);
|
||||
handler
|
||||
.handle(&DomainEvent::MovieDeleted {
|
||||
movie_id: MovieId::from_uuid(uuid::Uuid::new_v4()),
|
||||
poster_path: Some(path.clone()),
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
assert!(matches!(
|
||||
inner.get("some-uuid").await,
|
||||
Err(DomainError::NotFound(_))
|
||||
));
|
||||
}
|
||||
Reference in New Issue
Block a user