feat: vertical slice — migrations, postgres adapters, presentation handlers, bootstrap wiring
This commit is contained in:
@@ -153,14 +153,12 @@ impl AssetMetadataRepository for PostgresAssetMetadataRepository {
|
||||
asset_id: &SystemId,
|
||||
source: MetadataSource,
|
||||
) -> Result<(), DomainError> {
|
||||
sqlx::query(
|
||||
"DELETE FROM asset_metadata WHERE asset_id = $1 AND metadata_source = $2",
|
||||
)
|
||||
.bind(*asset_id.as_uuid())
|
||||
.bind(source_to_str(&source))
|
||||
.execute(&self.pool)
|
||||
.await
|
||||
.map_err(|e| DomainError::Internal(e.to_string()))?;
|
||||
sqlx::query("DELETE FROM asset_metadata WHERE asset_id = $1 AND metadata_source = $2")
|
||||
.bind(*asset_id.as_uuid())
|
||||
.bind(source_to_str(&source))
|
||||
.execute(&self.pool)
|
||||
.await
|
||||
.map_err(|e| DomainError::Internal(e.to_string()))?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,9 +1,7 @@
|
||||
use crate::db::PgPool;
|
||||
use async_trait::async_trait;
|
||||
use domain::{
|
||||
entities::StorageVolume,
|
||||
errors::DomainError,
|
||||
ports::StorageVolumeRepository,
|
||||
entities::StorageVolume, errors::DomainError, ports::StorageVolumeRepository,
|
||||
value_objects::SystemId,
|
||||
};
|
||||
use uuid::Uuid;
|
||||
|
||||
@@ -42,7 +42,10 @@ impl PostgresUserRepository {
|
||||
|
||||
#[async_trait]
|
||||
impl UserRepository for PostgresUserRepository {
|
||||
async fn find_by_id(&self, id: &SystemId) -> Result<Option<domain::entities::User>, DomainError> {
|
||||
async fn find_by_id(
|
||||
&self,
|
||||
id: &SystemId,
|
||||
) -> Result<Option<domain::entities::User>, DomainError> {
|
||||
let row = sqlx::query_as::<_, UserRow>(
|
||||
"SELECT id, username, email, password_hash, created_at FROM users WHERE id = $1",
|
||||
)
|
||||
@@ -54,7 +57,10 @@ impl UserRepository for PostgresUserRepository {
|
||||
row.map(TryInto::try_into).transpose()
|
||||
}
|
||||
|
||||
async fn find_by_email(&self, email: &Email) -> Result<Option<domain::entities::User>, DomainError> {
|
||||
async fn find_by_email(
|
||||
&self,
|
||||
email: &Email,
|
||||
) -> Result<Option<domain::entities::User>, DomainError> {
|
||||
let row = sqlx::query_as::<_, UserRow>(
|
||||
"SELECT id, username, email, password_hash, created_at FROM users WHERE email = $1",
|
||||
)
|
||||
@@ -66,7 +72,10 @@ impl UserRepository for PostgresUserRepository {
|
||||
row.map(TryInto::try_into).transpose()
|
||||
}
|
||||
|
||||
async fn find_by_username(&self, username: &str) -> Result<Option<domain::entities::User>, DomainError> {
|
||||
async fn find_by_username(
|
||||
&self,
|
||||
username: &str,
|
||||
) -> Result<Option<domain::entities::User>, DomainError> {
|
||||
let row = sqlx::query_as::<_, UserRow>(
|
||||
"SELECT id, username, email, password_hash, created_at FROM users WHERE username = $1",
|
||||
)
|
||||
|
||||
@@ -1,5 +1,4 @@
|
||||
use async_trait::async_trait;
|
||||
use bytes::Bytes;
|
||||
use domain::errors::DomainError;
|
||||
use domain::ports::{DataStream, StorageReader, StorageWriter};
|
||||
use futures::stream::StreamExt;
|
||||
|
||||
@@ -44,12 +44,10 @@ impl FileStoragePort for LocalFileStorage {
|
||||
|
||||
async fn read_file(&self, path: &str) -> Result<Bytes, DomainError> {
|
||||
let full = self.resolve(path)?;
|
||||
let data = tokio::fs::read(&full)
|
||||
.await
|
||||
.map_err(|e| match e.kind() {
|
||||
std::io::ErrorKind::NotFound => DomainError::NotFound(path.to_string()),
|
||||
_ => DomainError::Internal(format!("Failed to read file: {e}")),
|
||||
})?;
|
||||
let data = tokio::fs::read(&full).await.map_err(|e| match e.kind() {
|
||||
std::io::ErrorKind::NotFound => DomainError::NotFound(path.to_string()),
|
||||
_ => DomainError::Internal(format!("Failed to read file: {e}")),
|
||||
})?;
|
||||
Ok(Bytes::from(data))
|
||||
}
|
||||
|
||||
|
||||
@@ -79,9 +79,7 @@ impl AssetResponse {
|
||||
domain::value_objects::MetadataValue::Float(f) => {
|
||||
serde_json::json!(*f)
|
||||
}
|
||||
domain::value_objects::MetadataValue::Boolean(b) => {
|
||||
serde_json::Value::Bool(*b)
|
||||
}
|
||||
domain::value_objects::MetadataValue::Boolean(b) => serde_json::Value::Bool(*b),
|
||||
domain::value_objects::MetadataValue::Null => serde_json::Value::Null,
|
||||
};
|
||||
(k.clone(), json_val)
|
||||
|
||||
@@ -23,7 +23,10 @@ pub async fn create_album(
|
||||
creator_id: claims.user_id,
|
||||
};
|
||||
let album = state.create_album_handler.execute(cmd).await?;
|
||||
Ok((StatusCode::CREATED, Json(AlbumResponse::from_domain(&album))))
|
||||
Ok((
|
||||
StatusCode::CREATED,
|
||||
Json(AlbumResponse::from_domain(&album)),
|
||||
))
|
||||
}
|
||||
|
||||
pub async fn get_album(
|
||||
|
||||
@@ -40,43 +40,43 @@ pub async fn ingest(
|
||||
match name.as_str() {
|
||||
"file" => {
|
||||
filename = field.file_name().map(|s| s.to_string());
|
||||
let data = field
|
||||
.bytes()
|
||||
.await
|
||||
.map_err(|e| {
|
||||
AppError::from(domain::errors::DomainError::Internal(e.to_string()))
|
||||
})?;
|
||||
let data = field.bytes().await.map_err(|e| {
|
||||
AppError::from(domain::errors::DomainError::Internal(e.to_string()))
|
||||
})?;
|
||||
file_data = Some(data);
|
||||
}
|
||||
"target_path_id" => {
|
||||
let text = field
|
||||
.text()
|
||||
.await
|
||||
.map_err(|e| {
|
||||
AppError::from(domain::errors::DomainError::Validation(e.to_string()))
|
||||
})?;
|
||||
let text = field.text().await.map_err(|e| {
|
||||
AppError::from(domain::errors::DomainError::Validation(e.to_string()))
|
||||
})?;
|
||||
target_path_id = Some(text.parse::<uuid::Uuid>().map_err(|e| {
|
||||
AppError::from(domain::errors::DomainError::Validation(e.to_string()))
|
||||
})?);
|
||||
}
|
||||
"client_device_id" => {
|
||||
client_device_id = field
|
||||
.text()
|
||||
.await
|
||||
.map_err(|e| {
|
||||
AppError::from(domain::errors::DomainError::Validation(e.to_string()))
|
||||
})?;
|
||||
client_device_id = field.text().await.map_err(|e| {
|
||||
AppError::from(domain::errors::DomainError::Validation(e.to_string()))
|
||||
})?;
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
|
||||
let data = file_data
|
||||
.ok_or_else(|| AppError::from(domain::errors::DomainError::Validation("Missing file field".to_string())))?;
|
||||
let fname = filename
|
||||
.ok_or_else(|| AppError::from(domain::errors::DomainError::Validation("Missing filename".to_string())))?;
|
||||
let path_id = target_path_id
|
||||
.ok_or_else(|| AppError::from(domain::errors::DomainError::Validation("Missing target_path_id".to_string())))?;
|
||||
let data = file_data.ok_or_else(|| {
|
||||
AppError::from(domain::errors::DomainError::Validation(
|
||||
"Missing file field".to_string(),
|
||||
))
|
||||
})?;
|
||||
let fname = filename.ok_or_else(|| {
|
||||
AppError::from(domain::errors::DomainError::Validation(
|
||||
"Missing filename".to_string(),
|
||||
))
|
||||
})?;
|
||||
let path_id = target_path_id.ok_or_else(|| {
|
||||
AppError::from(domain::errors::DomainError::Validation(
|
||||
"Missing target_path_id".to_string(),
|
||||
))
|
||||
})?;
|
||||
|
||||
let mut hasher = Sha256::new();
|
||||
hasher.update(&data);
|
||||
|
||||
@@ -18,7 +18,10 @@ pub async fn register_volume(
|
||||
is_writable: req.is_writable,
|
||||
};
|
||||
let volume = state.register_volume_handler.execute(cmd).await?;
|
||||
Ok((StatusCode::CREATED, Json(VolumeResponse::from_domain(&volume))))
|
||||
Ok((
|
||||
StatusCode::CREATED,
|
||||
Json(VolumeResponse::from_domain(&volume)),
|
||||
))
|
||||
}
|
||||
|
||||
pub async fn register_library_path(
|
||||
@@ -33,5 +36,8 @@ pub async fn register_library_path(
|
||||
is_ingest_destination: req.is_ingest_destination,
|
||||
};
|
||||
let path = state.register_library_path_handler.execute(cmd).await?;
|
||||
Ok((StatusCode::CREATED, Json(LibraryPathResponse::from_domain(&path))))
|
||||
Ok((
|
||||
StatusCode::CREATED,
|
||||
Json(LibraryPathResponse::from_domain(&path)),
|
||||
))
|
||||
}
|
||||
|
||||
@@ -18,7 +18,10 @@ pub fn api_v1_router() -> Router<AppState> {
|
||||
.route("/albums", post(albums::create_album))
|
||||
.route("/albums/:id", get(albums::get_album))
|
||||
.route("/albums/:id/entries", post(albums::add_entry))
|
||||
.route("/albums/:id/entries/:asset_id", delete(albums::remove_entry))
|
||||
.route(
|
||||
"/albums/:id/entries/:asset_id",
|
||||
delete(albums::remove_entry),
|
||||
)
|
||||
// assets
|
||||
.route("/assets/ingest", post(assets::ingest))
|
||||
.route("/assets/timeline", get(assets::timeline))
|
||||
@@ -26,7 +29,10 @@ pub fn api_v1_router() -> Router<AppState> {
|
||||
.route("/assets/:id/metadata", put(assets::update_metadata))
|
||||
// storage
|
||||
.route("/storage/volumes", post(storage::register_volume))
|
||||
.route("/storage/library-paths", post(storage::register_library_path))
|
||||
.route(
|
||||
"/storage/library-paths",
|
||||
post(storage::register_library_path),
|
||||
)
|
||||
}
|
||||
|
||||
pub fn app_router() -> Router<AppState> {
|
||||
|
||||
@@ -21,8 +21,8 @@ async fn main() -> anyhow::Result<()> {
|
||||
let config = config::WorkerConfig::from_env();
|
||||
info!("Worker starting");
|
||||
|
||||
let _pool = adapters_sqlite::connect(&config.database_url).await?;
|
||||
adapters_sqlite::run_migrations(&_pool).await?;
|
||||
let _pool = adapters_postgres::connect(&config.database_url).await?;
|
||||
adapters_postgres::run_migrations(&_pool).await?;
|
||||
|
||||
let interval = Duration::from_secs(config.example_job_interval_secs);
|
||||
let runner = JobRunner::new().register(Arc::new(ExampleJob), interval);
|
||||
|
||||
Reference in New Issue
Block a user