feat: add SQLite and PostgreSQL event queue adapters with migrations

This commit is contained in:
2026-05-10 17:46:16 +02:00
parent 37b0e07055
commit dca50b46d1
10 changed files with 45 additions and 13 deletions

View File

@@ -38,12 +38,22 @@ SECURE_COOKIES=false
ALLOW_REGISTRATION=false ALLOW_REGISTRATION=false
RATE_LIMIT=20 RATE_LIMIT=20
POSTER_FETCH_TIMEOUT_SECONDS=30 POSTER_FETCH_TIMEOUT_SECONDS=30
EVENT_CHANNEL_BUFFER=128
# NATS event bus (optional — falls back to in-memory channel when unset) # Event bus — controls background poster sync and other async work.
# Priority: NATS (if NATS_URL set) → DB queue (default, uses existing DB)
# The worker binary must run alongside the presentation to process events.
# Option A: DB queue (default — no extra infrastructure needed)
# Events are stored in the same database as the app and polled by the worker.
# EVENT_QUEUE_POLL_INTERVAL_MS=500 # polling interval (default: 500ms)
# EVENT_QUEUE_BATCH_SIZE=10 # rows claimed per poll cycle (default: 10)
# EVENT_QUEUE_MAX_ATTEMPTS=5 # retries before dead-lettering (default: 5)
# Option B: NATS (at-least-once delivery, recommended for higher throughput)
# NATS_URL=nats://localhost:4222 # NATS_URL=nats://localhost:4222
# NATS_MODE=jetstream # "jetstream" (default, at-least-once) or "core" (fire-and-forget) # NATS_MODE=jetstream # "jetstream" (default, at-least-once) or "core" (fire-and-forget)
# NATS_SUBJECT_PREFIX=movies-diary.events # NATS_SUBJECT_PREFIX=movies-diary.events
# NATS_STREAM_NAME=MOVIES_DIARY_EVENTS # NATS_STREAM_NAME=MOVIES_DIARY_EVENTS
# NATS_CONSUMER_NAME=worker # NATS_CONSUMER_NAME=worker
RUST_LOG=presentation=debug,tower_http=debug
RUST_LOG=presentation=debug,tower_http=debug,worker=info,application=info

View File

@@ -17,10 +17,12 @@ COPY crates/adapters/poster-fetcher/Cargo.toml crates/adapters/poster-fetcher
COPY crates/adapters/poster-storage/Cargo.toml crates/adapters/poster-storage/Cargo.toml COPY crates/adapters/poster-storage/Cargo.toml crates/adapters/poster-storage/Cargo.toml
COPY crates/adapters/export/Cargo.toml crates/adapters/export/Cargo.toml COPY crates/adapters/export/Cargo.toml crates/adapters/export/Cargo.toml
COPY crates/adapters/rss/Cargo.toml crates/adapters/rss/Cargo.toml COPY crates/adapters/rss/Cargo.toml crates/adapters/rss/Cargo.toml
COPY crates/adapters/sqlite/Cargo.toml crates/adapters/sqlite/Cargo.toml COPY crates/adapters/sqlite/Cargo.toml crates/adapters/sqlite/Cargo.toml
COPY crates/adapters/sqlite-federation/Cargo.toml crates/adapters/sqlite-federation/Cargo.toml COPY crates/adapters/sqlite-federation/Cargo.toml crates/adapters/sqlite-federation/Cargo.toml
COPY crates/adapters/postgres/Cargo.toml crates/adapters/postgres/Cargo.toml COPY crates/adapters/sqlite-event-queue/Cargo.toml crates/adapters/sqlite-event-queue/Cargo.toml
COPY crates/adapters/postgres/Cargo.toml crates/adapters/postgres/Cargo.toml
COPY crates/adapters/postgres-federation/Cargo.toml crates/adapters/postgres-federation/Cargo.toml COPY crates/adapters/postgres-federation/Cargo.toml crates/adapters/postgres-federation/Cargo.toml
COPY crates/adapters/postgres-event-queue/Cargo.toml crates/adapters/postgres-event-queue/Cargo.toml
COPY crates/adapters/template-askama/Cargo.toml crates/adapters/template-askama/Cargo.toml COPY crates/adapters/template-askama/Cargo.toml crates/adapters/template-askama/Cargo.toml
COPY crates/application/Cargo.toml crates/application/Cargo.toml COPY crates/application/Cargo.toml crates/application/Cargo.toml
COPY crates/domain/Cargo.toml crates/domain/Cargo.toml COPY crates/domain/Cargo.toml crates/domain/Cargo.toml

View File

@@ -34,10 +34,14 @@ adapters/
template-askama — Askama HTML rendering template-askama — Askama HTML rendering
rss — RSS/Atom feed generation rss — RSS/Atom feed generation
export — CSV and JSON diary serialization export — CSV and JSON diary serialization
event-publisher — async event channel for background poster sync sqlite-event-queue — polling event queue backed by SQLite (DB-queue mode)
activitypub — ActivityPub federation (follow, inbox/outbox, actor) postgres-event-queue — polling event queue backed by PostgreSQL (DB-queue mode)
activitypub-base — core ActivityPub types and repository traits nats — NATS Core / JetStream event publisher and consumer
event-publisher — in-memory event channel (used in tests)
activitypub — ActivityPub federation (follow, inbox/outbox, actor)
activitypub-base — core ActivityPub types and repository traits
doc — OpenAPI spec assembly and Swagger UI / Scalar serving doc — OpenAPI spec assembly and Swagger UI / Scalar serving
worker — standalone worker binary (polls the event queue, syncs posters)
tui — terminal UI client (ratatui) tui — terminal UI client (ratatui)
``` ```
@@ -85,16 +89,28 @@ PORT=3000
RATE_LIMIT=60 # requests per minute per IP (default: 60) RATE_LIMIT=60 # requests per minute per IP (default: 60)
ALLOW_REGISTRATION=true # set to false to disable new sign-ups ALLOW_REGISTRATION=true # set to false to disable new sign-ups
SECURE_COOKIES=true # set when serving over HTTPS SECURE_COOKIES=true # set when serving over HTTPS
RUST_LOG=presentation=info,tower_http=info RUST_LOG=presentation=info,tower_http=info,worker=info,application=info
# Event bus — background poster sync runs in the worker binary.
# Default: DB queue (uses the same database, no extra infrastructure).
# Set NATS_URL to use NATS instead (JetStream recommended for at-least-once delivery).
# NATS_URL=nats://localhost:4222
```
The `worker` binary must run alongside `presentation` to process events:
```bash
cargo run -p worker
``` ```
## Run ## Run
```bash ```bash
cargo run -p presentation cargo run -p presentation # HTTP server (0.0.0.0:3000)
cargo run -p worker # event worker (poster sync, in a separate terminal)
``` ```
Server listens on `0.0.0.0:3000` by default. The worker polls the event queue and must run alongside the presentation to process background tasks like poster fetching. Both processes share the same database.
## API ## API

View File

@@ -4,7 +4,7 @@ version = "0.1.0"
edition = "2024" edition = "2024"
[dependencies] [dependencies]
sqlx = { version = "0.8.6", features = ["runtime-tokio-rustls", "postgres", "macros", "chrono", "uuid"] } sqlx = { version = "0.8.6", features = ["runtime-tokio-rustls", "postgres", "macros", "chrono"] }
domain = { workspace = true } domain = { workspace = true }
anyhow = { workspace = true } anyhow = { workspace = true }
async-trait = { workspace = true } async-trait = { workspace = true }

View File

@@ -1,5 +1,6 @@
pub(crate) async fn run(pool: &sqlx::PgPool) -> anyhow::Result<()> { pub(crate) async fn run(pool: &sqlx::PgPool) -> anyhow::Result<()> {
sqlx::migrate!("./migrations") sqlx::migrate!("./migrations")
.set_ignore_missing(true)
.run(pool) .run(pool)
.await .await
.map_err(|e| anyhow::anyhow!("postgres-event-queue migration failed: {e}")) .map_err(|e| anyhow::anyhow!("postgres-event-queue migration failed: {e}"))

View File

@@ -57,6 +57,7 @@ impl PostgresRepository {
pub async fn migrate(&self) -> Result<(), DomainError> { pub async fn migrate(&self) -> Result<(), DomainError> {
sqlx::migrate!("./migrations") sqlx::migrate!("./migrations")
.set_ignore_missing(true)
.run(&self.pool) .run(&self.pool)
.await .await
.map_err(|e| DomainError::InfrastructureError(format!("Migration failed: {}", e))) .map_err(|e| DomainError::InfrastructureError(format!("Migration failed: {}", e)))

View File

@@ -1,5 +1,6 @@
pub(crate) async fn run(pool: &sqlx::SqlitePool) -> anyhow::Result<()> { pub(crate) async fn run(pool: &sqlx::SqlitePool) -> anyhow::Result<()> {
sqlx::migrate!("./migrations") sqlx::migrate!("./migrations")
.set_ignore_missing(true)
.run(pool) .run(pool)
.await .await
.map_err(|e| anyhow::anyhow!("sqlite-event-queue migration failed: {e}")) .map_err(|e| anyhow::anyhow!("sqlite-event-queue migration failed: {e}"))

View File

@@ -3,6 +3,7 @@ use sqlx::SqlitePool;
pub(crate) async fn run(pool: &SqlitePool) -> Result<(), DomainError> { pub(crate) async fn run(pool: &SqlitePool) -> Result<(), DomainError> {
sqlx::migrate!("./migrations") sqlx::migrate!("./migrations")
.set_ignore_missing(true)
.run(pool) .run(pool)
.await .await
.map_err(|e| DomainError::InfrastructureError(e.to_string())) .map_err(|e| DomainError::InfrastructureError(e.to_string()))