feat: auth hardening + codebase quality sweep
Refresh tokens: RefreshToken entity, PostgresRefreshTokenRepository, login returns refresh token, POST /auth/refresh (rotation), POST /auth/logout, JWT expiry 24h→1h, configurable via with_expiry(). Route protection: require_auth middleware on protected routes, public routes split (register, login, refresh, sharing/access). Authorization: caller_id added to ReadAssetFileQuery, ReadDerivativeQuery, GetStackQuery, DeleteStackCommand with ownership checks. Admin-only gates on processing, storage, sidecar, duplicates handlers. Quality fixes: visibility filtering bypass in search(), unwrap panics in date parsing, DRY auth header parsing, centralized parsers module, email validation via email_address crate, value objects (Username, MimeType, RelativePath), domain events (UserCreated, UserDeleted, AlbumCreated, TagCreated, DuplicateDetected), postgres error mapping for constraint violations, OptionExt::or_not_found helper, in_memory_repo! macro, GetStackQuery moved to queries, album add_entry 200→201.
This commit is contained in:
@@ -2,6 +2,7 @@ use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
use futures::StreamExt;
|
||||
use tokio::sync::watch;
|
||||
use tracing::{error, info, warn};
|
||||
|
||||
use application::processing::{EnqueueJobCommand, ProcessNextJobCommand};
|
||||
@@ -44,7 +45,6 @@ async fn main() -> anyhow::Result<()> {
|
||||
let sidecar_writer: Arc<dyn domain::ports::SidecarWriterPort> =
|
||||
Arc::new(adapters_sidecar::XmpSidecarWriter);
|
||||
|
||||
// Publisher transport consumes a client clone; the consumer gets another.
|
||||
let pub_transport = adapters_nats::NatsTransport::new(nats_client.clone());
|
||||
let nats_publisher: Arc<dyn domain::ports::EventPublisher> = Arc::new(
|
||||
adapters_event_transport::EventPublisherAdapter::new(pub_transport),
|
||||
@@ -72,16 +72,45 @@ async fn main() -> anyhow::Result<()> {
|
||||
));
|
||||
let enqueue = Arc::new(build_enqueue_handler(&repos, event_pub));
|
||||
|
||||
// ── Shutdown signal ───────────────────────────────────────────────
|
||||
let (shutdown_tx, shutdown_rx) = watch::channel(false);
|
||||
|
||||
tokio::spawn(async move {
|
||||
let ctrl_c = tokio::signal::ctrl_c();
|
||||
#[cfg(unix)]
|
||||
{
|
||||
use tokio::signal::unix::{SignalKind, signal};
|
||||
let mut sigterm = signal(SignalKind::terminate()).expect("SIGTERM handler");
|
||||
tokio::select! {
|
||||
_ = ctrl_c => {},
|
||||
_ = sigterm.recv() => {},
|
||||
}
|
||||
}
|
||||
#[cfg(not(unix))]
|
||||
{
|
||||
ctrl_c.await.ok();
|
||||
}
|
||||
info!("shutdown signal received");
|
||||
shutdown_tx.send(true).ok();
|
||||
});
|
||||
|
||||
// ── Fallback sweep task ────────────────────────────────────────────
|
||||
let sweep_interval = Duration::from_secs(config.fallback_sweep_secs);
|
||||
let sweep_handler = Arc::clone(&process_next);
|
||||
let mut sweep_shutdown = shutdown_rx.clone();
|
||||
tokio::spawn(async move {
|
||||
info!(
|
||||
every_secs = config.fallback_sweep_secs,
|
||||
"fallback sweep task started"
|
||||
);
|
||||
loop {
|
||||
tokio::time::sleep(sweep_interval).await;
|
||||
tokio::select! {
|
||||
_ = sweep_shutdown.changed() => {
|
||||
info!("sweep task: shutting down");
|
||||
break;
|
||||
}
|
||||
_ = tokio::time::sleep(sweep_interval) => {}
|
||||
}
|
||||
info!("fallback sweep: draining queued jobs");
|
||||
loop {
|
||||
match sweep_handler.execute(ProcessNextJobCommand).await {
|
||||
@@ -104,69 +133,79 @@ async fn main() -> anyhow::Result<()> {
|
||||
|
||||
info!("event loop: listening for NATS events");
|
||||
let mut stream = event_consumer.consume();
|
||||
let mut event_shutdown = shutdown_rx.clone();
|
||||
|
||||
while let Some(result) = stream.next().await {
|
||||
let envelope = match result {
|
||||
Ok(env) => env,
|
||||
Err(e) => {
|
||||
error!(error = %e, "event loop: consumer error");
|
||||
continue;
|
||||
loop {
|
||||
tokio::select! {
|
||||
_ = event_shutdown.changed() => {
|
||||
info!("event loop: shutting down");
|
||||
break;
|
||||
}
|
||||
};
|
||||
|
||||
match &envelope.event {
|
||||
DomainEvent::AssetIngested { asset_id, .. } => {
|
||||
info!(asset_id = %asset_id, "event loop: AssetIngested → enqueue ExtractMetadata");
|
||||
(envelope.ack)();
|
||||
let cmd = EnqueueJobCommand {
|
||||
job_type: JobType::ExtractMetadata,
|
||||
priority: 10,
|
||||
payload: StructuredData::new(),
|
||||
target_asset_id: Some(*asset_id),
|
||||
batch_id: None,
|
||||
};
|
||||
if let Err(e) = enqueue.execute(cmd).await {
|
||||
error!(error = %e, "event loop: failed to enqueue ExtractMetadata");
|
||||
}
|
||||
}
|
||||
DomainEvent::SidecarSyncRequested { asset_id, .. } => {
|
||||
info!(asset_id = %asset_id, "event loop: SidecarSyncRequested → enqueue SyncSidecar");
|
||||
(envelope.ack)();
|
||||
let cmd = EnqueueJobCommand {
|
||||
job_type: JobType::SyncSidecar,
|
||||
priority: 5,
|
||||
payload: StructuredData::new(),
|
||||
target_asset_id: Some(*asset_id),
|
||||
batch_id: None,
|
||||
};
|
||||
if let Err(e) = enqueue.execute(cmd).await {
|
||||
error!(error = %e, "event loop: failed to enqueue SyncSidecar");
|
||||
}
|
||||
}
|
||||
DomainEvent::JobEnqueued {
|
||||
job_id, job_type, ..
|
||||
} => {
|
||||
info!(job_id = %job_id, job_type = %job_type, "event loop: JobEnqueued → process");
|
||||
(envelope.ack)();
|
||||
match process_next.execute(ProcessNextJobCommand).await {
|
||||
Ok(Some(job)) => {
|
||||
info!(job_id = %job.job_id, status = ?job.status, "event loop: processed job");
|
||||
}
|
||||
Ok(None) => {
|
||||
warn!("event loop: JobEnqueued but no queued job found");
|
||||
}
|
||||
msg = stream.next() => {
|
||||
let Some(result) = msg else { break };
|
||||
let envelope = match result {
|
||||
Ok(env) => env,
|
||||
Err(e) => {
|
||||
error!(error = %e, "event loop: error processing job");
|
||||
error!(error = %e, "event loop: consumer error");
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
match &envelope.event {
|
||||
DomainEvent::AssetIngested { asset_id, .. } => {
|
||||
info!(asset_id = %asset_id, "event loop: AssetIngested → enqueue ExtractMetadata");
|
||||
(envelope.ack)();
|
||||
let cmd = EnqueueJobCommand {
|
||||
job_type: JobType::ExtractMetadata,
|
||||
priority: 10,
|
||||
payload: StructuredData::new(),
|
||||
target_asset_id: Some(*asset_id),
|
||||
batch_id: None,
|
||||
};
|
||||
if let Err(e) = enqueue.execute(cmd).await {
|
||||
error!(error = %e, "event loop: failed to enqueue ExtractMetadata");
|
||||
}
|
||||
}
|
||||
DomainEvent::SidecarSyncRequested { asset_id, .. } => {
|
||||
info!(asset_id = %asset_id, "event loop: SidecarSyncRequested → enqueue SyncSidecar");
|
||||
(envelope.ack)();
|
||||
let cmd = EnqueueJobCommand {
|
||||
job_type: JobType::SyncSidecar,
|
||||
priority: 5,
|
||||
payload: StructuredData::new(),
|
||||
target_asset_id: Some(*asset_id),
|
||||
batch_id: None,
|
||||
};
|
||||
if let Err(e) = enqueue.execute(cmd).await {
|
||||
error!(error = %e, "event loop: failed to enqueue SyncSidecar");
|
||||
}
|
||||
}
|
||||
DomainEvent::JobEnqueued {
|
||||
job_id, job_type, ..
|
||||
} => {
|
||||
info!(job_id = %job_id, job_type = %job_type, "event loop: JobEnqueued → process");
|
||||
(envelope.ack)();
|
||||
match process_next.execute(ProcessNextJobCommand).await {
|
||||
Ok(Some(job)) => {
|
||||
info!(job_id = %job.job_id, status = ?job.status, "event loop: processed job");
|
||||
}
|
||||
Ok(None) => {
|
||||
warn!("event loop: JobEnqueued but no queued job found");
|
||||
}
|
||||
Err(e) => {
|
||||
error!(error = %e, "event loop: error processing job");
|
||||
}
|
||||
}
|
||||
}
|
||||
other => {
|
||||
(envelope.ack)();
|
||||
tracing::debug!(event = ?other, "event loop: unhandled event, acked");
|
||||
}
|
||||
}
|
||||
}
|
||||
other => {
|
||||
(envelope.ack)();
|
||||
tracing::debug!(event = ?other, "event loop: unhandled event, acked");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
error!("event loop: NATS stream ended unexpectedly");
|
||||
info!("worker shutdown complete");
|
||||
Ok(())
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user