diff --git a/.env.example b/.env.example index 2ebd92d..6696f43 100644 --- a/.env.example +++ b/.env.example @@ -38,12 +38,22 @@ SECURE_COOKIES=false ALLOW_REGISTRATION=false RATE_LIMIT=20 POSTER_FETCH_TIMEOUT_SECONDS=30 -EVENT_CHANNEL_BUFFER=128 -# NATS event bus (optional — falls back to in-memory channel when unset) +# Event bus — controls background poster sync and other async work. +# Priority: NATS (if NATS_URL set) → DB queue (default, uses existing DB) +# The worker binary must run alongside the presentation to process events. + +# Option A: DB queue (default — no extra infrastructure needed) +# Events are stored in the same database as the app and polled by the worker. +# EVENT_QUEUE_POLL_INTERVAL_MS=500 # polling interval (default: 500ms) +# EVENT_QUEUE_BATCH_SIZE=10 # rows claimed per poll cycle (default: 10) +# EVENT_QUEUE_MAX_ATTEMPTS=5 # retries before dead-lettering (default: 5) + +# Option B: NATS (at-least-once delivery, recommended for higher throughput) # NATS_URL=nats://localhost:4222 # NATS_MODE=jetstream # "jetstream" (default, at-least-once) or "core" (fire-and-forget) # NATS_SUBJECT_PREFIX=movies-diary.events # NATS_STREAM_NAME=MOVIES_DIARY_EVENTS # NATS_CONSUMER_NAME=worker -RUST_LOG=presentation=debug,tower_http=debug + +RUST_LOG=presentation=debug,tower_http=debug,worker=info,application=info diff --git a/Dockerfile b/Dockerfile index 4519fca..7f9de61 100644 --- a/Dockerfile +++ b/Dockerfile @@ -17,10 +17,12 @@ COPY crates/adapters/poster-fetcher/Cargo.toml crates/adapters/poster-fetcher COPY crates/adapters/poster-storage/Cargo.toml crates/adapters/poster-storage/Cargo.toml COPY crates/adapters/export/Cargo.toml crates/adapters/export/Cargo.toml COPY crates/adapters/rss/Cargo.toml crates/adapters/rss/Cargo.toml -COPY crates/adapters/sqlite/Cargo.toml crates/adapters/sqlite/Cargo.toml -COPY crates/adapters/sqlite-federation/Cargo.toml crates/adapters/sqlite-federation/Cargo.toml -COPY crates/adapters/postgres/Cargo.toml crates/adapters/postgres/Cargo.toml +COPY crates/adapters/sqlite/Cargo.toml crates/adapters/sqlite/Cargo.toml +COPY crates/adapters/sqlite-federation/Cargo.toml crates/adapters/sqlite-federation/Cargo.toml +COPY crates/adapters/sqlite-event-queue/Cargo.toml crates/adapters/sqlite-event-queue/Cargo.toml +COPY crates/adapters/postgres/Cargo.toml crates/adapters/postgres/Cargo.toml COPY crates/adapters/postgres-federation/Cargo.toml crates/adapters/postgres-federation/Cargo.toml +COPY crates/adapters/postgres-event-queue/Cargo.toml crates/adapters/postgres-event-queue/Cargo.toml COPY crates/adapters/template-askama/Cargo.toml crates/adapters/template-askama/Cargo.toml COPY crates/application/Cargo.toml crates/application/Cargo.toml COPY crates/domain/Cargo.toml crates/domain/Cargo.toml diff --git a/README.md b/README.md index effab9c..0288428 100644 --- a/README.md +++ b/README.md @@ -34,10 +34,14 @@ adapters/ template-askama — Askama HTML rendering rss — RSS/Atom feed generation export — CSV and JSON diary serialization - event-publisher — async event channel for background poster sync - activitypub — ActivityPub federation (follow, inbox/outbox, actor) - activitypub-base — core ActivityPub types and repository traits + sqlite-event-queue — polling event queue backed by SQLite (DB-queue mode) + postgres-event-queue — polling event queue backed by PostgreSQL (DB-queue mode) + nats — NATS Core / JetStream event publisher and consumer + event-publisher — in-memory event channel (used in tests) + activitypub — ActivityPub federation (follow, inbox/outbox, actor) + activitypub-base — core ActivityPub types and repository traits doc — OpenAPI spec assembly and Swagger UI / Scalar serving +worker — standalone worker binary (polls the event queue, syncs posters) tui — terminal UI client (ratatui) ``` @@ -85,16 +89,28 @@ PORT=3000 RATE_LIMIT=60 # requests per minute per IP (default: 60) ALLOW_REGISTRATION=true # set to false to disable new sign-ups SECURE_COOKIES=true # set when serving over HTTPS -RUST_LOG=presentation=info,tower_http=info +RUST_LOG=presentation=info,tower_http=info,worker=info,application=info + +# Event bus — background poster sync runs in the worker binary. +# Default: DB queue (uses the same database, no extra infrastructure). +# Set NATS_URL to use NATS instead (JetStream recommended for at-least-once delivery). +# NATS_URL=nats://localhost:4222 +``` + +The `worker` binary must run alongside `presentation` to process events: + +```bash +cargo run -p worker ``` ## Run ```bash -cargo run -p presentation +cargo run -p presentation # HTTP server (0.0.0.0:3000) +cargo run -p worker # event worker (poster sync, in a separate terminal) ``` -Server listens on `0.0.0.0:3000` by default. +The worker polls the event queue and must run alongside the presentation to process background tasks like poster fetching. Both processes share the same database. ## API diff --git a/crates/adapters/postgres-event-queue/Cargo.toml b/crates/adapters/postgres-event-queue/Cargo.toml index 961236f..1bbb445 100644 --- a/crates/adapters/postgres-event-queue/Cargo.toml +++ b/crates/adapters/postgres-event-queue/Cargo.toml @@ -4,7 +4,7 @@ version = "0.1.0" edition = "2024" [dependencies] -sqlx = { version = "0.8.6", features = ["runtime-tokio-rustls", "postgres", "macros", "chrono", "uuid"] } +sqlx = { version = "0.8.6", features = ["runtime-tokio-rustls", "postgres", "macros", "chrono"] } domain = { workspace = true } anyhow = { workspace = true } async-trait = { workspace = true } diff --git a/crates/adapters/postgres-event-queue/migrations/0001_event_queue.sql b/crates/adapters/postgres-event-queue/migrations/0100_event_queue.sql similarity index 100% rename from crates/adapters/postgres-event-queue/migrations/0001_event_queue.sql rename to crates/adapters/postgres-event-queue/migrations/0100_event_queue.sql diff --git a/crates/adapters/postgres-event-queue/src/migrations.rs b/crates/adapters/postgres-event-queue/src/migrations.rs index 66fb3be..ef49936 100644 --- a/crates/adapters/postgres-event-queue/src/migrations.rs +++ b/crates/adapters/postgres-event-queue/src/migrations.rs @@ -1,5 +1,6 @@ pub(crate) async fn run(pool: &sqlx::PgPool) -> anyhow::Result<()> { sqlx::migrate!("./migrations") + .set_ignore_missing(true) .run(pool) .await .map_err(|e| anyhow::anyhow!("postgres-event-queue migration failed: {e}")) diff --git a/crates/adapters/postgres/src/lib.rs b/crates/adapters/postgres/src/lib.rs index c2c4edd..b58ad6e 100644 --- a/crates/adapters/postgres/src/lib.rs +++ b/crates/adapters/postgres/src/lib.rs @@ -57,6 +57,7 @@ impl PostgresRepository { pub async fn migrate(&self) -> Result<(), DomainError> { sqlx::migrate!("./migrations") + .set_ignore_missing(true) .run(&self.pool) .await .map_err(|e| DomainError::InfrastructureError(format!("Migration failed: {}", e))) diff --git a/crates/adapters/sqlite-event-queue/migrations/0001_event_queue.sql b/crates/adapters/sqlite-event-queue/migrations/0100_event_queue.sql similarity index 100% rename from crates/adapters/sqlite-event-queue/migrations/0001_event_queue.sql rename to crates/adapters/sqlite-event-queue/migrations/0100_event_queue.sql diff --git a/crates/adapters/sqlite-event-queue/src/migrations.rs b/crates/adapters/sqlite-event-queue/src/migrations.rs index 1a26dca..56c8921 100644 --- a/crates/adapters/sqlite-event-queue/src/migrations.rs +++ b/crates/adapters/sqlite-event-queue/src/migrations.rs @@ -1,5 +1,6 @@ pub(crate) async fn run(pool: &sqlx::SqlitePool) -> anyhow::Result<()> { sqlx::migrate!("./migrations") + .set_ignore_missing(true) .run(pool) .await .map_err(|e| anyhow::anyhow!("sqlite-event-queue migration failed: {e}")) diff --git a/crates/adapters/sqlite/src/migrations.rs b/crates/adapters/sqlite/src/migrations.rs index 02eacfd..3df6071 100644 --- a/crates/adapters/sqlite/src/migrations.rs +++ b/crates/adapters/sqlite/src/migrations.rs @@ -3,6 +3,7 @@ use sqlx::SqlitePool; pub(crate) async fn run(pool: &SqlitePool) -> Result<(), DomainError> { sqlx::migrate!("./migrations") + .set_ignore_missing(true) .run(pool) .await .map_err(|e| DomainError::InfrastructureError(e.to_string()))