feat(storage): add generic object storage adapter with CQRS traits, key validation, StorageConfig, and cargo-generate integration
This commit is contained in:
21
crates/adapters/storage/Cargo.toml
Normal file
21
crates/adapters/storage/Cargo.toml
Normal file
@@ -0,0 +1,21 @@
|
||||
[package]
|
||||
name = "adapters-storage"
|
||||
version = "0.1.0"
|
||||
edition = "2024"
|
||||
|
||||
[features]
|
||||
default = []
|
||||
s3 = ["object_store/aws"]
|
||||
gcs = ["object_store/gcp"]
|
||||
|
||||
[dependencies]
|
||||
domain = { workspace = true }
|
||||
async-trait = { workspace = true }
|
||||
anyhow = { workspace = true }
|
||||
tracing = { workspace = true }
|
||||
bytes = { workspace = true }
|
||||
futures = { workspace = true }
|
||||
object_store = { version = "0.11" }
|
||||
|
||||
[dev-dependencies]
|
||||
tokio = { workspace = true }
|
||||
310
crates/adapters/storage/src/adapter.rs
Normal file
310
crates/adapters/storage/src/adapter.rs
Normal file
@@ -0,0 +1,310 @@
|
||||
use std::sync::Arc;
|
||||
use async_trait::async_trait;
|
||||
use bytes::Bytes;
|
||||
use futures::stream::StreamExt;
|
||||
use object_store::{ObjectStore, path::Path, Error as OsError};
|
||||
use domain::errors::DomainError;
|
||||
use domain::ports::{DataStream, StorageReader, StorageWriter};
|
||||
|
||||
pub struct ObjectStorageAdapter {
|
||||
store: Arc<dyn ObjectStore>,
|
||||
prefix: String,
|
||||
}
|
||||
|
||||
impl ObjectStorageAdapter {
|
||||
pub fn new(store: Arc<dyn ObjectStore>, prefix: impl Into<String>) -> Result<Self, DomainError> {
|
||||
let prefix = prefix.into();
|
||||
if !prefix.is_empty() {
|
||||
validate_key(&prefix)?;
|
||||
}
|
||||
Ok(Self { store, prefix })
|
||||
}
|
||||
|
||||
fn path(&self, key: &str) -> Path {
|
||||
if self.prefix.is_empty() {
|
||||
Path::from(key)
|
||||
} else {
|
||||
Path::from(format!("{}/{key}", self.prefix))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn map_err(e: OsError, key: &str) -> DomainError {
|
||||
match e {
|
||||
OsError::NotFound { .. } => DomainError::NotFound(key.to_string()),
|
||||
e => DomainError::Internal(e.to_string()),
|
||||
}
|
||||
}
|
||||
|
||||
fn validate_key(key: &str) -> Result<(), DomainError> {
|
||||
if key.is_empty() {
|
||||
return Err(DomainError::Validation("storage key must not be empty".into()));
|
||||
}
|
||||
if key.starts_with('/') {
|
||||
return Err(DomainError::Validation(
|
||||
format!("storage key must not start with '/': {key}"),
|
||||
));
|
||||
}
|
||||
if key.split('/').any(|seg| seg == ".." || seg == ".") {
|
||||
return Err(DomainError::Validation(
|
||||
format!("storage key contains invalid path segment: {key}"),
|
||||
));
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl StorageWriter for ObjectStorageAdapter {
|
||||
async fn put(&self, key: &str, data: DataStream) -> Result<(), DomainError> {
|
||||
validate_key(key)?;
|
||||
let path = self.path(key);
|
||||
let mut upload = self
|
||||
.store
|
||||
.put_multipart(&path)
|
||||
.await
|
||||
.map_err(|e| DomainError::Internal(e.to_string()))?;
|
||||
|
||||
let mut stream = data;
|
||||
while let Some(result) = stream.next().await {
|
||||
match result {
|
||||
Ok(bytes) => {
|
||||
if let Err(e) = upload.put_part(bytes.into()).await {
|
||||
let _ = upload.abort().await;
|
||||
return Err(DomainError::Internal(e.to_string()));
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
let _ = upload.abort().await;
|
||||
return Err(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
upload.complete().await.map_err(|e| DomainError::Internal(e.to_string()))?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn delete(&self, key: &str) -> Result<(), DomainError> {
|
||||
validate_key(key)?;
|
||||
let path = self.path(key);
|
||||
match self.store.delete(&path).await {
|
||||
Ok(()) => Ok(()),
|
||||
Err(OsError::NotFound { .. }) => Ok(()),
|
||||
Err(e) => Err(DomainError::Internal(e.to_string())),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl StorageReader for ObjectStorageAdapter {
|
||||
async fn get(&self, key: &str) -> Result<DataStream, DomainError> {
|
||||
validate_key(key)?;
|
||||
let path = self.path(key);
|
||||
let result = self
|
||||
.store
|
||||
.get(&path)
|
||||
.await
|
||||
.map_err(|e| map_err(e, key))?;
|
||||
let s = result
|
||||
.into_stream()
|
||||
.map(|r| r.map_err(|e| DomainError::Internal(e.to_string())));
|
||||
Ok(Box::pin(s))
|
||||
}
|
||||
|
||||
async fn list(&self, prefix: Option<&str>) -> Result<Vec<String>, DomainError> {
|
||||
if let Some(p) = prefix {
|
||||
validate_key(p)?;
|
||||
}
|
||||
let list_prefix = match (prefix, self.prefix.is_empty()) {
|
||||
(Some(p), false) => Some(Path::from(format!("{}/{p}", self.prefix))),
|
||||
(Some(p), true) => Some(Path::from(p)),
|
||||
(None, false) => Some(Path::from(self.prefix.as_str())),
|
||||
(None, true) => None,
|
||||
};
|
||||
|
||||
let mut result = Vec::new();
|
||||
let mut stream = self.store.list(list_prefix.as_ref());
|
||||
while let Some(meta) = stream.next().await {
|
||||
let meta = meta.map_err(|e| DomainError::Internal(e.to_string()))?;
|
||||
let key = meta.location.to_string();
|
||||
let stripped = if !self.prefix.is_empty() {
|
||||
key.strip_prefix(&format!("{}/", self.prefix))
|
||||
.ok_or_else(|| DomainError::Internal(format!(
|
||||
"listed key '{key}' does not start with expected prefix '{}'",
|
||||
self.prefix
|
||||
)))?
|
||||
.to_string()
|
||||
} else {
|
||||
key
|
||||
};
|
||||
result.push(stripped);
|
||||
}
|
||||
Ok(result)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use domain::ports::{StorageReader, StorageWriter};
|
||||
use futures::stream;
|
||||
use object_store::memory::InMemory;
|
||||
|
||||
fn make_adapter() -> ObjectStorageAdapter {
|
||||
ObjectStorageAdapter::new(Arc::new(InMemory::new()), "test").unwrap()
|
||||
}
|
||||
|
||||
fn one_shot(data: &'static [u8]) -> DataStream {
|
||||
Box::pin(stream::once(async move { Ok(Bytes::from(data)) }))
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn put_get_roundtrip() {
|
||||
let a = make_adapter();
|
||||
a.put("hello.txt", one_shot(b"world")).await.unwrap();
|
||||
let mut s = a.get("hello.txt").await.unwrap();
|
||||
let mut out = Vec::new();
|
||||
while let Some(chunk) = s.next().await {
|
||||
out.extend_from_slice(&chunk.unwrap());
|
||||
}
|
||||
assert_eq!(out, b"world");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn get_missing_is_not_found() {
|
||||
let a = make_adapter();
|
||||
assert!(matches!(a.get("nope.txt").await, Err(DomainError::NotFound(_))));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn delete_is_idempotent() {
|
||||
let a = make_adapter();
|
||||
a.delete("nope.txt").await.unwrap();
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn delete_removes_key() {
|
||||
let a = make_adapter();
|
||||
a.put("file.txt", one_shot(b"data")).await.unwrap();
|
||||
a.delete("file.txt").await.unwrap();
|
||||
assert!(matches!(a.get("file.txt").await, Err(DomainError::NotFound(_))));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn list_returns_keys_under_prefix() {
|
||||
let a = make_adapter();
|
||||
a.put("docs/readme.txt", one_shot(b"x")).await.unwrap();
|
||||
a.put("docs/guide.txt", one_shot(b"y")).await.unwrap();
|
||||
a.put("other/file.txt", one_shot(b"z")).await.unwrap();
|
||||
let keys = a.list(Some("docs")).await.unwrap();
|
||||
assert_eq!(keys.len(), 2);
|
||||
assert!(keys.iter().any(|k| k.ends_with("readme.txt")));
|
||||
assert!(keys.iter().any(|k| k.ends_with("guide.txt")));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn list_none_returns_all() {
|
||||
let a = make_adapter();
|
||||
a.put("a.txt", one_shot(b"1")).await.unwrap();
|
||||
a.put("b.txt", one_shot(b"2")).await.unwrap();
|
||||
let keys = a.list(None).await.unwrap();
|
||||
assert_eq!(keys.len(), 2);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn rejects_empty_key() {
|
||||
let a = make_adapter();
|
||||
assert!(matches!(a.put("", one_shot(b"x")).await, Err(DomainError::Validation(_))));
|
||||
assert!(matches!(a.get("").await, Err(DomainError::Validation(_))));
|
||||
assert!(matches!(a.delete("").await, Err(DomainError::Validation(_))));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn rejects_absolute_key() {
|
||||
let a = make_adapter();
|
||||
assert!(matches!(
|
||||
a.put("/etc/passwd", one_shot(b"x")).await,
|
||||
Err(DomainError::Validation(_))
|
||||
));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn rejects_path_traversal() {
|
||||
let a = make_adapter();
|
||||
assert!(matches!(a.get("../escape").await, Err(DomainError::Validation(_))));
|
||||
assert!(matches!(a.get("a/../../../etc").await, Err(DomainError::Validation(_))));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn rejects_dot_segment() {
|
||||
let a = make_adapter();
|
||||
assert!(matches!(
|
||||
a.put("./file.txt", one_shot(b"x")).await,
|
||||
Err(DomainError::Validation(_))
|
||||
));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn rejects_invalid_list_prefix() {
|
||||
let a = make_adapter();
|
||||
assert!(matches!(a.list(Some("")).await, Err(DomainError::Validation(_))));
|
||||
assert!(matches!(a.list(Some("../escape")).await, Err(DomainError::Validation(_))));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn put_overwrites_existing() {
|
||||
let a = make_adapter();
|
||||
a.put("file.txt", one_shot(b"version1")).await.unwrap();
|
||||
a.put("file.txt", one_shot(b"version2")).await.unwrap();
|
||||
let mut s = a.get("file.txt").await.unwrap();
|
||||
let mut out = Vec::new();
|
||||
while let Some(chunk) = s.next().await {
|
||||
out.extend_from_slice(&chunk.unwrap());
|
||||
}
|
||||
assert_eq!(out, b"version2");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn list_returns_exact_key_paths() {
|
||||
let a = make_adapter();
|
||||
a.put("docs/readme.txt", one_shot(b"x")).await.unwrap();
|
||||
let mut keys = a.list(Some("docs")).await.unwrap();
|
||||
keys.sort();
|
||||
assert_eq!(keys, vec!["docs/readme.txt"]);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn put_bytes_get_bytes_roundtrip() {
|
||||
let a = make_adapter();
|
||||
a.put_bytes("data.bin", Bytes::from("hello bytes")).await.unwrap();
|
||||
let got = a.get_bytes("data.bin").await.unwrap();
|
||||
assert_eq!(got.as_ref(), b"hello bytes");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn get_bytes_missing_is_not_found() {
|
||||
let a = make_adapter();
|
||||
assert!(matches!(a.get_bytes("nope.bin").await, Err(DomainError::NotFound(_))));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn new_rejects_traversal_prefix() {
|
||||
let result = ObjectStorageAdapter::new(Arc::new(InMemory::new()), "../evil");
|
||||
assert!(matches!(result, Err(DomainError::Validation(_))));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn new_rejects_absolute_prefix() {
|
||||
let result = ObjectStorageAdapter::new(Arc::new(InMemory::new()), "/root");
|
||||
assert!(matches!(result, Err(DomainError::Validation(_))));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn new_accepts_empty_prefix() {
|
||||
assert!(ObjectStorageAdapter::new(Arc::new(InMemory::new()), "").is_ok());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn new_accepts_valid_prefix() {
|
||||
assert!(ObjectStorageAdapter::new(Arc::new(InMemory::new()), "my-bucket/data").is_ok());
|
||||
}
|
||||
}
|
||||
90
crates/adapters/storage/src/config.rs
Normal file
90
crates/adapters/storage/src/config.rs
Normal file
@@ -0,0 +1,90 @@
|
||||
use std::sync::Arc;
|
||||
use anyhow::{Context, Result};
|
||||
use object_store::ObjectStore;
|
||||
use object_store::local::LocalFileSystem;
|
||||
|
||||
/// All storage configuration. Populate once via `from_env()` and pass
|
||||
/// explicitly to `build_store` and `ObjectStorageAdapter::new`.
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct StorageConfig {
|
||||
pub backend: String,
|
||||
pub prefix: String,
|
||||
// local backend:
|
||||
pub local_path: Option<String>,
|
||||
// s3/minio backend:
|
||||
pub s3_endpoint: Option<String>,
|
||||
pub s3_access_key_id: Option<String>,
|
||||
pub s3_secret_access_key: Option<String>,
|
||||
pub s3_bucket: Option<String>,
|
||||
pub s3_region: Option<String>,
|
||||
// gcs backend:
|
||||
pub gcs_bucket: Option<String>,
|
||||
}
|
||||
|
||||
impl StorageConfig {
|
||||
pub fn from_env() -> Result<Self> {
|
||||
Ok(Self {
|
||||
backend: std::env::var("STORAGE_BACKEND")
|
||||
.context("STORAGE_BACKEND must be set (local, s3, gcs)")?,
|
||||
prefix: std::env::var("STORAGE_PREFIX").unwrap_or_default(),
|
||||
local_path: std::env::var("STORAGE_PATH").ok(),
|
||||
s3_endpoint: std::env::var("S3_ENDPOINT").ok(),
|
||||
s3_access_key_id: std::env::var("S3_ACCESS_KEY_ID").ok(),
|
||||
s3_secret_access_key: std::env::var("S3_SECRET_ACCESS_KEY").ok(),
|
||||
s3_bucket: std::env::var("S3_BUCKET").ok(),
|
||||
s3_region: std::env::var("S3_REGION").ok(),
|
||||
gcs_bucket: std::env::var("GCS_BUCKET").ok(),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
pub fn build_store(config: &StorageConfig) -> Result<Arc<dyn ObjectStore>> {
|
||||
match config.backend.as_str() {
|
||||
"local" => {
|
||||
let path = config.local_path.as_deref()
|
||||
.context("STORAGE_PATH must be set when STORAGE_BACKEND=local")?;
|
||||
std::fs::create_dir_all(path)
|
||||
.with_context(|| format!("failed to create storage dir: {path}"))?;
|
||||
let store = LocalFileSystem::new_with_prefix(path)?;
|
||||
Ok(Arc::new(store))
|
||||
}
|
||||
#[cfg(feature = "s3")]
|
||||
"s3" => {
|
||||
use object_store::aws::AmazonS3Builder;
|
||||
let store = AmazonS3Builder::new()
|
||||
.with_endpoint(
|
||||
config.s3_endpoint.as_deref().context("S3_ENDPOINT must be set")?,
|
||||
)
|
||||
.with_access_key_id(
|
||||
config.s3_access_key_id.as_deref()
|
||||
.context("S3_ACCESS_KEY_ID must be set")?,
|
||||
)
|
||||
.with_secret_access_key(
|
||||
config.s3_secret_access_key.as_deref()
|
||||
.context("S3_SECRET_ACCESS_KEY must be set")?,
|
||||
)
|
||||
.with_bucket_name(
|
||||
config.s3_bucket.as_deref().context("S3_BUCKET must be set")?,
|
||||
)
|
||||
.with_region(config.s3_region.as_deref().unwrap_or("us-east-1"))
|
||||
.with_allow_http(true)
|
||||
.build()?;
|
||||
Ok(Arc::new(store))
|
||||
}
|
||||
#[cfg(feature = "gcs")]
|
||||
"gcs" => {
|
||||
use object_store::gcp::GoogleCloudStorageBuilder;
|
||||
let store = GoogleCloudStorageBuilder::new()
|
||||
.with_bucket_name(
|
||||
config.gcs_bucket.as_deref().context("GCS_BUCKET must be set")?,
|
||||
)
|
||||
.build()?;
|
||||
Ok(Arc::new(store))
|
||||
}
|
||||
other => anyhow::bail!(
|
||||
"unknown STORAGE_BACKEND={other:?}; compiled features: local{}{}",
|
||||
if cfg!(feature = "s3") { ", s3" } else { "" },
|
||||
if cfg!(feature = "gcs") { ", gcs" } else { "" },
|
||||
),
|
||||
}
|
||||
}
|
||||
5
crates/adapters/storage/src/lib.rs
Normal file
5
crates/adapters/storage/src/lib.rs
Normal file
@@ -0,0 +1,5 @@
|
||||
pub mod adapter;
|
||||
pub mod config;
|
||||
|
||||
pub use adapter::ObjectStorageAdapter;
|
||||
pub use config::{build_store, StorageConfig};
|
||||
@@ -10,7 +10,8 @@ path = "src/main.rs"
|
||||
[dependencies]
|
||||
domain = { workspace = true }
|
||||
application = { workspace = true }
|
||||
adapters-auth = { workspace = true }
|
||||
adapters-auth = { workspace = true }
|
||||
adapters-storage = { workspace = true }
|
||||
presentation = { workspace = true }
|
||||
adapters-sqlite = { path = "../adapters/sqlite" }
|
||||
tokio = { workspace = true }
|
||||
|
||||
@@ -11,6 +11,15 @@ path = "src/main.rs"
|
||||
domain = { workspace = true }
|
||||
application = { workspace = true }
|
||||
adapters-auth = { workspace = true }
|
||||
{% if storage and storage_s3 and storage_gcs %}
|
||||
adapters-storage = { workspace = true, features = ["s3", "gcs"] }
|
||||
{% elsif storage and storage_s3 %}
|
||||
adapters-storage = { workspace = true, features = ["s3"] }
|
||||
{% elsif storage and storage_gcs %}
|
||||
adapters-storage = { workspace = true, features = ["gcs"] }
|
||||
{% elsif storage %}
|
||||
adapters-storage = { workspace = true }
|
||||
{% endif %}
|
||||
presentation = { workspace = true }
|
||||
{% if database == "sqlite" %}
|
||||
adapters-sqlite = { path = "../adapters/sqlite" }
|
||||
|
||||
28
crates/bootstrap/src/config.rs.liquid
Normal file
28
crates/bootstrap/src/config.rs.liquid
Normal file
@@ -0,0 +1,28 @@
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct Config {
|
||||
pub host: String,
|
||||
pub port: u16,
|
||||
pub database_url: String,
|
||||
pub jwt_secret: String,
|
||||
pub cors_allowed_origins: Vec<String>,
|
||||
}
|
||||
|
||||
impl Config {
|
||||
pub fn from_env() -> Self {
|
||||
dotenvy::dotenv().ok();
|
||||
Self {
|
||||
host: std::env::var("HOST").unwrap_or_else(|_| "0.0.0.0".to_string()),
|
||||
port: std::env::var("PORT")
|
||||
.ok()
|
||||
.and_then(|p| p.parse().ok())
|
||||
.unwrap_or(3000),
|
||||
database_url: std::env::var("DATABASE_URL").expect("DATABASE_URL must be set"),
|
||||
jwt_secret: std::env::var("JWT_SECRET").expect("JWT_SECRET must be set"),
|
||||
cors_allowed_origins: std::env::var("CORS_ALLOWED_ORIGINS")
|
||||
.unwrap_or_else(|_| "http://localhost:3000".to_string())
|
||||
.split(',')
|
||||
.map(|s| s.trim().to_string())
|
||||
.collect(),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,5 +1,3 @@
|
||||
// If you chose postgres at cargo generate time, replace adapters_sqlite with
|
||||
// adapters_postgres throughout this file (connect, run_migrations, PostgresUserRepository).
|
||||
use std::sync::Arc;
|
||||
use anyhow::Result;
|
||||
use axum::Router;
|
||||
@@ -8,6 +6,7 @@ use tower_http::{cors::{Any, CorsLayer}, trace::TraceLayer};
|
||||
|
||||
use adapters_auth::{BcryptPasswordHasher, JwtTokenIssuer};
|
||||
use adapters_sqlite::{connect, run_migrations, SqliteUserRepository};
|
||||
use adapters_storage::{ObjectStorageAdapter, StorageConfig, build_store};
|
||||
use application::use_cases::{GetProfile, LoginUser, RegisterUser};
|
||||
use presentation::{routes::app_router, state::AppState};
|
||||
|
||||
@@ -25,7 +24,13 @@ pub async fn build_app(config: &Config) -> Result<Router> {
|
||||
let login_uc = Arc::new(LoginUser::new(user_repo.clone(), hasher, issuer.clone()));
|
||||
let get_profile_uc = Arc::new(GetProfile::new(user_repo));
|
||||
|
||||
let state = AppState::new(register_uc, login_uc, get_profile_uc, issuer);
|
||||
let storage_cfg = StorageConfig::from_env()?;
|
||||
let store = build_store(&storage_cfg)?;
|
||||
// To inject storage into a use case, clone it into the constructor:
|
||||
// let my_uc = Arc::new(MyUseCase::new(repo, storage.clone()));
|
||||
let storage = Arc::new(ObjectStorageAdapter::new(store, &storage_cfg.prefix)?);
|
||||
|
||||
let state = AppState::new(register_uc, login_uc, get_profile_uc, issuer, storage);
|
||||
|
||||
let cors = CorsLayer::new()
|
||||
.allow_origin(
|
||||
|
||||
62
crates/bootstrap/src/factory.rs.liquid
Normal file
62
crates/bootstrap/src/factory.rs.liquid
Normal file
@@ -0,0 +1,62 @@
|
||||
use std::sync::Arc;
|
||||
use anyhow::Result;
|
||||
use axum::Router;
|
||||
use axum::http::HeaderValue;
|
||||
use tower_http::{cors::{Any, CorsLayer}, trace::TraceLayer};
|
||||
|
||||
use adapters_auth::{BcryptPasswordHasher, JwtTokenIssuer};
|
||||
{% if database == "sqlite" %}
|
||||
use adapters_sqlite::{connect, run_migrations, SqliteUserRepository};
|
||||
{% endif %}
|
||||
{% if database == "postgres" %}
|
||||
use adapters_postgres::{connect, run_migrations, PostgresUserRepository};
|
||||
{% endif %}
|
||||
{% if storage %}
|
||||
use adapters_storage::{ObjectStorageAdapter, StorageConfig, build_store};
|
||||
{% endif %}
|
||||
use application::use_cases::{GetProfile, LoginUser, RegisterUser};
|
||||
use presentation::{routes::app_router, state::AppState};
|
||||
|
||||
use crate::config::Config;
|
||||
|
||||
pub async fn build_app(config: &Config) -> Result<Router> {
|
||||
let pool = connect(&config.database_url).await?;
|
||||
run_migrations(&pool).await?;
|
||||
|
||||
{% if database == "sqlite" %}
|
||||
let user_repo = Arc::new(SqliteUserRepository::new(pool));
|
||||
{% endif %}
|
||||
{% if database == "postgres" %}
|
||||
let user_repo = Arc::new(PostgresUserRepository::new(pool));
|
||||
{% endif %}
|
||||
let hasher = Arc::new(BcryptPasswordHasher);
|
||||
let issuer = Arc::new(JwtTokenIssuer::new(&config.jwt_secret));
|
||||
|
||||
let register_uc = Arc::new(RegisterUser::new(user_repo.clone(), hasher.clone()));
|
||||
let login_uc = Arc::new(LoginUser::new(user_repo.clone(), hasher, issuer.clone()));
|
||||
let get_profile_uc = Arc::new(GetProfile::new(user_repo));
|
||||
|
||||
{% if storage %}
|
||||
let storage_cfg = StorageConfig::from_env()?;
|
||||
let store = build_store(&storage_cfg)?;
|
||||
// To inject storage into a use case, clone it into the constructor:
|
||||
// let my_uc = Arc::new(MyUseCase::new(repo, storage.clone()));
|
||||
let storage = Arc::new(ObjectStorageAdapter::new(store, &storage_cfg.prefix)?);
|
||||
{% endif %}
|
||||
|
||||
let state = AppState::new(register_uc, login_uc, get_profile_uc, issuer{% if storage %}, storage{% endif %});
|
||||
|
||||
let cors = CorsLayer::new()
|
||||
.allow_origin(
|
||||
config.cors_allowed_origins.iter()
|
||||
.filter_map(|o| o.parse::<HeaderValue>().ok())
|
||||
.collect::<Vec<_>>(),
|
||||
)
|
||||
.allow_methods(Any)
|
||||
.allow_headers(Any);
|
||||
|
||||
Ok(app_router()
|
||||
.with_state(state)
|
||||
.layer(TraceLayer::new_for_http())
|
||||
.layer(cors))
|
||||
}
|
||||
@@ -9,3 +9,5 @@ chrono = { workspace = true }
|
||||
serde = { workspace = true }
|
||||
thiserror = { workspace = true }
|
||||
async-trait = { workspace = true }
|
||||
bytes = { workspace = true }
|
||||
futures = { workspace = true }
|
||||
|
||||
@@ -1,5 +1,7 @@
|
||||
mod auth;
|
||||
mod storage;
|
||||
mod user_repo;
|
||||
|
||||
pub use auth::{PasswordHasher, TokenIssuer};
|
||||
pub use storage::{DataStream, StoragePort, StorageReader, StorageWriter};
|
||||
pub use user_repo::UserRepository;
|
||||
|
||||
7
crates/domain/src/ports/mod.rs.liquid
Normal file
7
crates/domain/src/ports/mod.rs.liquid
Normal file
@@ -0,0 +1,7 @@
|
||||
mod auth;
|
||||
{% if storage %}mod storage;{% endif %}
|
||||
mod user_repo;
|
||||
|
||||
pub use auth::{PasswordHasher, TokenIssuer};
|
||||
{% if storage %}pub use storage::{DataStream, StoragePort, StorageReader, StorageWriter};{% endif %}
|
||||
pub use user_repo::UserRepository;
|
||||
52
crates/domain/src/ports/storage.rs
Normal file
52
crates/domain/src/ports/storage.rs
Normal file
@@ -0,0 +1,52 @@
|
||||
use async_trait::async_trait;
|
||||
use bytes::Bytes;
|
||||
use futures::stream::{self, BoxStream, StreamExt};
|
||||
use crate::errors::DomainError;
|
||||
|
||||
pub type DataStream = BoxStream<'static, Result<Bytes, DomainError>>;
|
||||
|
||||
/// Read operations on object storage. Keys are full paths relative to the adapter root.
|
||||
#[async_trait]
|
||||
pub trait StorageReader: Send + Sync {
|
||||
/// Returns the content of `key` as a stream. Returns `DomainError::NotFound` if absent.
|
||||
async fn get(&self, key: &str) -> Result<DataStream, DomainError>;
|
||||
|
||||
/// Lists all keys whose path begins with `prefix`, or all keys when `prefix` is `None`.
|
||||
/// Returned keys are **full paths from the adapter root**, not relative to `prefix`.
|
||||
/// Example: `list(Some("docs"))` returns `["docs/readme.txt"]`, not `["readme.txt"]`.
|
||||
async fn list(&self, prefix: Option<&str>) -> Result<Vec<String>, DomainError>;
|
||||
|
||||
/// Convenience: reads the entire content of `key` into memory. Wraps `get`.
|
||||
async fn get_bytes(&self, key: &str) -> Result<Bytes, DomainError> {
|
||||
let mut stream = self.get(key).await?;
|
||||
let mut buf: Vec<u8> = Vec::new();
|
||||
while let Some(chunk) = stream.next().await {
|
||||
buf.extend_from_slice(&chunk?);
|
||||
}
|
||||
Ok(Bytes::from(buf))
|
||||
}
|
||||
}
|
||||
|
||||
/// Write operations on object storage.
|
||||
#[async_trait]
|
||||
pub trait StorageWriter: Send + Sync {
|
||||
/// Stores `data` at `key`. Overwrites any existing content at that key silently.
|
||||
async fn put(&self, key: &str, data: DataStream) -> Result<(), DomainError>;
|
||||
|
||||
/// Deletes `key`. Returns `Ok(())` even if the key does not exist (idempotent).
|
||||
async fn delete(&self, key: &str) -> Result<(), DomainError>;
|
||||
|
||||
/// Convenience: stores an in-memory buffer at `key`. Wraps `put`.
|
||||
async fn put_bytes(&self, key: &str, data: Bytes) -> Result<(), DomainError> {
|
||||
self.put(key, Box::pin(stream::once(async move { Ok(data) }))).await
|
||||
}
|
||||
}
|
||||
|
||||
/// Combined read + write storage interface.
|
||||
///
|
||||
/// **Usage note:** `Arc<dyn StoragePort>` is the intended DI type everywhere.
|
||||
/// `StorageReader` and `StorageWriter` exist for implementation clarity, but Rust does not
|
||||
/// support narrowing `Arc<dyn StoragePort>` to `Arc<dyn StorageReader>` at runtime.
|
||||
/// Inject `Arc<dyn StoragePort>` into constructors and pass `.clone()` from the factory.
|
||||
pub trait StoragePort: StorageReader + StorageWriter {}
|
||||
impl<T: StorageReader + StorageWriter> StoragePort for T {}
|
||||
27
crates/presentation/src/handlers/storage_example.rs
Normal file
27
crates/presentation/src/handlers/storage_example.rs
Normal file
@@ -0,0 +1,27 @@
|
||||
// Example: stream a stored file as an HTTP response.
|
||||
// Remove this file or replace with your own handlers.
|
||||
//
|
||||
// To use, add to your router:
|
||||
// .route("/files/*key", get(storage_example::get_file))
|
||||
//
|
||||
// use axum::{
|
||||
// body::Body,
|
||||
// extract::{Path, State},
|
||||
// http::StatusCode,
|
||||
// response::IntoResponse,
|
||||
// };
|
||||
// use futures::StreamExt;
|
||||
// use crate::state::AppState;
|
||||
//
|
||||
// pub async fn get_file(
|
||||
// Path(key): Path<String>,
|
||||
// State(state): State<AppState>,
|
||||
// ) -> Result<impl IntoResponse, StatusCode> {
|
||||
// let stream = state
|
||||
// .storage
|
||||
// .get(&key)
|
||||
// .await
|
||||
// .map_err(|_| StatusCode::NOT_FOUND)?;
|
||||
// let body = Body::from_stream(stream.map(|r| r.map_err(|e| e.to_string())));
|
||||
// Ok(body)
|
||||
// }
|
||||
@@ -1,6 +1,6 @@
|
||||
use std::sync::Arc;
|
||||
use application::use_cases::{GetProfile, LoginUser, RegisterUser};
|
||||
use domain::ports::TokenIssuer;
|
||||
use domain::ports::{StoragePort, TokenIssuer};
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct AppState {
|
||||
@@ -8,6 +8,9 @@ pub struct AppState {
|
||||
pub login_uc: Arc<LoginUser>,
|
||||
pub get_profile_uc: Arc<GetProfile>,
|
||||
pub token_issuer: Arc<dyn TokenIssuer>,
|
||||
/// Direct storage access for handlers. Use cases that need storage should receive
|
||||
/// `Arc<dyn StoragePort>` in their own constructor rather than reading it from `AppState`.
|
||||
pub storage: Arc<dyn StoragePort>,
|
||||
}
|
||||
|
||||
impl AppState {
|
||||
@@ -16,7 +19,8 @@ impl AppState {
|
||||
login_uc: Arc<LoginUser>,
|
||||
get_profile_uc: Arc<GetProfile>,
|
||||
token_issuer: Arc<dyn TokenIssuer>,
|
||||
storage: Arc<dyn StoragePort>,
|
||||
) -> Self {
|
||||
Self { register_uc, login_uc, get_profile_uc, token_issuer }
|
||||
Self { register_uc, login_uc, get_profile_uc, token_issuer, storage }
|
||||
}
|
||||
}
|
||||
|
||||
28
crates/presentation/src/state.rs.liquid
Normal file
28
crates/presentation/src/state.rs.liquid
Normal file
@@ -0,0 +1,28 @@
|
||||
use std::sync::Arc;
|
||||
use application::use_cases::{GetProfile, LoginUser, RegisterUser};
|
||||
{% if storage %}
|
||||
use domain::ports::{StoragePort, TokenIssuer};
|
||||
{% else %}
|
||||
use domain::ports::TokenIssuer;
|
||||
{% endif %}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct AppState {
|
||||
pub register_uc: Arc<RegisterUser>,
|
||||
pub login_uc: Arc<LoginUser>,
|
||||
pub get_profile_uc: Arc<GetProfile>,
|
||||
pub token_issuer: Arc<dyn TokenIssuer>,
|
||||
{% if storage %}pub storage: Arc<dyn StoragePort>,{% endif %}
|
||||
}
|
||||
|
||||
impl AppState {
|
||||
pub fn new(
|
||||
register_uc: Arc<RegisterUser>,
|
||||
login_uc: Arc<LoginUser>,
|
||||
get_profile_uc: Arc<GetProfile>,
|
||||
token_issuer: Arc<dyn TokenIssuer>,
|
||||
{% if storage %}storage: Arc<dyn StoragePort>,{% endif %}
|
||||
) -> Self {
|
||||
Self { register_uc, login_uc, get_profile_uc, token_issuer{% if storage %}, storage{% endif %} }
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user