From aadad3cfb0a6d8b95c933dd59478832e76b762d5 Mon Sep 17 00:00:00 2001 From: Gabriel Kaszewski Date: Tue, 2 Jun 2026 17:34:16 +0200 Subject: [PATCH] feat: Jellyfin/Plex auto-import via watch queue MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Webhook ingestion from media servers — movies land in a pending watch queue, user rates and confirms to create diary entries. - domain: WatchEvent, WebhookToken models, MediaServerParser port - adapters: jellyfin + plex parser crates, SQLite + Postgres repos - application: ingest/confirm/dismiss/cleanup use cases, token mgmt - presentation: webhook endpoints (bearer + query param auth), watch queue + integrations settings HTML pages, OpenAPI docs - worker: WatchEventCleanupJob (daily, 30d retention) Movie resolution deferred to confirm — single canonical path through log_review for enrichment, poster fetch, federation. --- Cargo.lock | 23 ++ Cargo.toml | 7 + Dockerfile | 2 + README.md | 25 ++ crates/adapters/event-payload/src/lib.rs | 24 ++ crates/adapters/jellyfin/Cargo.toml | 9 + crates/adapters/jellyfin/src/lib.rs | 132 +++++++ crates/adapters/nats/src/subject.rs | 1 + crates/adapters/plex/Cargo.toml | 9 + crates/adapters/plex/src/lib.rs | 172 +++++++++ .../postgres/migrations/0023_watch_events.sql | 28 ++ crates/adapters/postgres/src/lib.rs | 2 + crates/adapters/postgres/src/watch_event.rs | 317 +++++++++++++++++ crates/adapters/sqlite-federation/src/lib.rs | 5 +- .../adapters/sqlite-search/src/tests/lib.rs | 6 +- .../sqlite/migrations/0023_watch_events.sql | 28 ++ crates/adapters/sqlite/src/lib.rs | 2 + crates/adapters/sqlite/src/tests/persons.rs | 1 - crates/adapters/sqlite/src/watch_event.rs | 327 ++++++++++++++++++ crates/adapters/template-askama/src/lib.rs | 43 ++- .../template-askama/templates/base.html | 1 + .../templates/integrations.html | 92 +++++ .../template-askama/templates/profile.html | 1 + .../templates/profile_settings.html | 3 + .../templates/watch_queue.html | 68 ++++ crates/api-types/src/lib.rs | 2 + crates/api-types/src/webhook.rs | 61 ++++ crates/application/Cargo.toml | 3 + crates/application/src/commands.rs | 35 ++ crates/application/src/context.rs | 10 +- crates/application/src/jobs.rs | 25 ++ crates/application/src/ports.rs | 32 ++ crates/application/src/queries.rs | 8 + crates/application/src/test_helpers.rs | 22 +- crates/application/src/tests/worker.rs | 1 + .../src/use_cases/cleanup_watch_events.rs | 11 + .../src/use_cases/confirm_watch_events.rs | 65 ++++ .../src/use_cases/dismiss_watch_events.rs | 33 ++ .../src/use_cases/generate_webhook_token.rs | 38 ++ .../src/use_cases/get_watch_queue.rs | 11 + .../src/use_cases/get_webhook_tokens.rs | 11 + .../src/use_cases/ingest_watch_event.rs | 69 ++++ .../application/src/use_cases/log_review.rs | 2 +- crates/application/src/use_cases/mod.rs | 8 + .../src/use_cases/revoke_webhook_token.rs | 14 + crates/domain/src/events.rs | 5 + crates/domain/src/models/mod.rs | 5 + crates/domain/src/models/watch_event.rs | 236 +++++++++++++ crates/domain/src/ports.rs | 47 ++- crates/domain/src/testing.rs | 80 +++++ crates/domain/src/value_objects.rs | 2 + crates/presentation/Cargo.toml | 2 + crates/presentation/src/factory.rs | 17 +- crates/presentation/src/forms.rs | 37 ++ crates/presentation/src/handlers/html.rs | 227 +++++++++++- crates/presentation/src/handlers/mod.rs | 1 + crates/presentation/src/handlers/webhook.rs | 319 +++++++++++++++++ crates/presentation/src/main.rs | 4 + crates/presentation/src/openapi/mod.rs | 2 + crates/presentation/src/openapi/webhook.rs | 32 ++ crates/presentation/src/routes.rs | 66 +++- crates/presentation/src/tests/extractors.rs | 85 +++++ crates/presentation/tests/api_test.rs | 2 + crates/worker/src/db.rs | 17 +- crates/worker/src/main.rs | 9 +- 65 files changed, 2946 insertions(+), 38 deletions(-) create mode 100644 crates/adapters/jellyfin/Cargo.toml create mode 100644 crates/adapters/jellyfin/src/lib.rs create mode 100644 crates/adapters/plex/Cargo.toml create mode 100644 crates/adapters/plex/src/lib.rs create mode 100644 crates/adapters/postgres/migrations/0023_watch_events.sql create mode 100644 crates/adapters/postgres/src/watch_event.rs create mode 100644 crates/adapters/sqlite/migrations/0023_watch_events.sql create mode 100644 crates/adapters/sqlite/src/watch_event.rs create mode 100644 crates/adapters/template-askama/templates/integrations.html create mode 100644 crates/adapters/template-askama/templates/watch_queue.html create mode 100644 crates/api-types/src/webhook.rs create mode 100644 crates/application/src/use_cases/cleanup_watch_events.rs create mode 100644 crates/application/src/use_cases/confirm_watch_events.rs create mode 100644 crates/application/src/use_cases/dismiss_watch_events.rs create mode 100644 crates/application/src/use_cases/generate_webhook_token.rs create mode 100644 crates/application/src/use_cases/get_watch_queue.rs create mode 100644 crates/application/src/use_cases/get_webhook_tokens.rs create mode 100644 crates/application/src/use_cases/ingest_watch_event.rs create mode 100644 crates/application/src/use_cases/revoke_webhook_token.rs create mode 100644 crates/domain/src/models/watch_event.rs create mode 100644 crates/presentation/src/handlers/webhook.rs create mode 100644 crates/presentation/src/openapi/webhook.rs diff --git a/Cargo.lock b/Cargo.lock index 2a0eaf8..20d9b32 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -317,6 +317,9 @@ dependencies = [ "chrono", "domain", "futures", + "hex", + "rand 0.9.4", + "sha2", "tokio", "tracing", "uuid", @@ -2755,6 +2758,15 @@ version = "1.0.18" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8f42a60cbdf9a97f5d2305f08a87dc4e09308d1276d28c869c684d7777685682" +[[package]] +name = "jellyfin" +version = "0.1.0" +dependencies = [ + "domain", + "serde", + "serde_json", +] + [[package]] name = "jni" version = "0.22.4" @@ -3764,6 +3776,15 @@ version = "0.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b4596b6d070b27117e987119b4dac604f3c58cfb0b191112e24771b2faeac1a6" +[[package]] +name = "plex" +version = "0.1.0" +dependencies = [ + "domain", + "serde", + "serde_json", +] + [[package]] name = "png" version = "0.18.1" @@ -3924,9 +3945,11 @@ dependencies = [ "image-storage", "importer", "infer", + "jellyfin", "metadata", "nats", "percent-encoding", + "plex", "poster-fetcher", "postgres", "postgres-event-queue", diff --git a/Cargo.toml b/Cargo.toml index e6b46c5..4c2846b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -27,6 +27,8 @@ members = [ "crates/tui", "crates/worker", "crates/adapters/importer", + "crates/adapters/jellyfin", + "crates/adapters/plex", "crates/adapters/sqlite-search", "crates/adapters/postgres-search", ] @@ -51,7 +53,10 @@ sqlx = { version = "0.8.6", features = [ "uuid", "macros", ] } +rand = "0.9" reqwest = { version = "0.13", features = ["json", "query"] } +sha2 = "0.10" +hex = "0.4" object_store = { version = "0.11", features = ["aws"] } axum = { version = "0.8.8", features = ["macros", "multipart"] } csv = "1" @@ -80,6 +85,8 @@ nats = { path = "crates/adapters/nats" } sqlite-event-queue = { path = "crates/adapters/sqlite-event-queue" } postgres-event-queue = { path = "crates/adapters/postgres-event-queue" } importer = { path = "crates/adapters/importer" } +jellyfin = { path = "crates/adapters/jellyfin" } +plex = { path = "crates/adapters/plex" } image-converter = { path = "crates/adapters/image-converter" } sqlite-search = { path = "crates/adapters/sqlite-search" } postgres-search = { path = "crates/adapters/postgres-search" } diff --git a/Dockerfile b/Dockerfile index c992049..3831dbb 100644 --- a/Dockerfile +++ b/Dockerfile @@ -18,6 +18,8 @@ COPY crates/adapters/image-storage/Cargo.toml crates/adapters/image-storage/ COPY crates/adapters/poster-sync/Cargo.toml crates/adapters/poster-sync/Cargo.toml COPY crates/adapters/export/Cargo.toml crates/adapters/export/Cargo.toml COPY crates/adapters/importer/Cargo.toml crates/adapters/importer/Cargo.toml +COPY crates/adapters/jellyfin/Cargo.toml crates/adapters/jellyfin/Cargo.toml +COPY crates/adapters/plex/Cargo.toml crates/adapters/plex/Cargo.toml COPY crates/adapters/rss/Cargo.toml crates/adapters/rss/Cargo.toml COPY crates/adapters/sqlite/Cargo.toml crates/adapters/sqlite/Cargo.toml COPY crates/adapters/sqlite-federation/Cargo.toml crates/adapters/sqlite-federation/Cargo.toml diff --git a/README.md b/README.md index 10d79f4..fbe44a8 100644 --- a/README.md +++ b/README.md @@ -16,6 +16,7 @@ A self-hosted, server-side rendered movie logging system with a full REST API. B - Federation moderation — instance-level domain blocking (admin-managed), per-user actor blocking with `Block` activity, delivery filter excludes blocked actors and blocked-domain inboxes - Watchlist — add movies to watch later, per-user; federated watchlist entries visible for remote actors - User profiles — display name, bio, avatar, banner, custom profile fields; editable via HTML settings page or REST API +- Jellyfin/Plex auto-import — media server sends a webhook on playback stop, movies land in a watch queue; review and confirm with a rating to create diary entries; per-user webhook tokens with SHA-256 auth; setup UI at `/settings/integrations` - CSV and JSON diary export - File importer: upload CSV, TSV, JSON, or XLSX from any source (Letterboxd, IMDb, etc.), map columns to domain fields via a step-by-step wizard or REST API, save mapping profiles for repeat imports - REST API v1 (`/api/v1/`) with full feature parity with the HTML interface @@ -48,6 +49,8 @@ adapters/ rss — RSS/Atom feed generation export — CSV and JSON diary serialization importer — CSV/TSV/JSON/XLSX parser and column mapper for bulk import + jellyfin — Jellyfin webhook payload parser (MediaServerParser adapter) + plex — Plex webhook payload parser (MediaServerParser adapter; requires Plex Pass) event-payload — shared event serialization DTOs (used by all event bus adapters) sqlite-event-queue — durable polling event queue backed by SQLite postgres-event-queue — durable polling event queue backed by PostgreSQL @@ -207,6 +210,28 @@ docker run \ To build for PostgreSQL: `--build-arg FEATURES=postgres,postgres-federation,nats` +## Media Server Integration + +Auto-log movies you finish watching. Go to `/settings/integrations` to generate a webhook token, then configure your media server. + +### Jellyfin + +1. Install the **Webhook** plugin (Dashboard > Plugins > Catalog) +2. Add a **Generic** destination: + - **URL**: `https://yourdomain.example.com/api/v1/webhooks/jellyfin` + - **Header**: `Authorization` = `Bearer ` + - **Send All Properties**: enabled + - **Notification Type**: Playback Stop only + - **Item Type**: Movies only + +### Plex (requires Plex Pass) + +1. Go to Settings > Webhooks in your Plex server +2. Add webhook URL: `https://yourdomain.example.com/api/v1/webhooks/plex` +3. Plex does not support custom headers natively — pass the token as a query param: `https://yourdomain.example.com/api/v1/webhooks/plex?token=` + +Movies you finish watching appear in your watch queue at `/watch-queue` — rate and confirm to add to your diary. + ## License MIT License. See [LICENSE](LICENSE). diff --git a/crates/adapters/event-payload/src/lib.rs b/crates/adapters/event-payload/src/lib.rs index d1c5dc3..54dc218 100644 --- a/crates/adapters/event-payload/src/lib.rs +++ b/crates/adapters/event-payload/src/lib.rs @@ -72,6 +72,11 @@ pub enum EventPayload { activity_json: String, signing_actor_id: String, }, + WatchEventIngested { + user_id: String, + title: String, + source: String, + }, } impl EventPayload { @@ -90,6 +95,7 @@ impl EventPayload { EventPayload::FollowAccepted { .. } => "FollowAccepted", EventPayload::BackfillFollower { .. } => "BackfillFollower", EventPayload::FederationDeliveryRequested { .. } => "FederationDeliveryRequested", + EventPayload::WatchEventIngested { .. } => "WatchEventIngested", } } } @@ -208,6 +214,15 @@ impl From<&DomainEvent> for EventPayload { activity_json: activity_json.clone(), signing_actor_id: signing_actor_id.to_string(), }, + DomainEvent::WatchEventIngested { + user_id, + title, + source, + } => EventPayload::WatchEventIngested { + user_id: user_id.value().to_string(), + title: title.clone(), + source: source.clone(), + }, } } } @@ -324,6 +339,15 @@ impl TryFrom for DomainEvent { activity_json, signing_actor_id: parse_uuid(&signing_actor_id, "signing_actor_id")?, }), + EventPayload::WatchEventIngested { + user_id, + title, + source, + } => Ok(DomainEvent::WatchEventIngested { + user_id: UserId::from_uuid(parse_uuid(&user_id, "user_id")?), + title, + source, + }), } } } diff --git a/crates/adapters/jellyfin/Cargo.toml b/crates/adapters/jellyfin/Cargo.toml new file mode 100644 index 0000000..7836826 --- /dev/null +++ b/crates/adapters/jellyfin/Cargo.toml @@ -0,0 +1,9 @@ +[package] +name = "jellyfin" +version = "0.1.0" +edition = "2024" + +[dependencies] +domain = { workspace = true } +serde = { workspace = true } +serde_json = { workspace = true } diff --git a/crates/adapters/jellyfin/src/lib.rs b/crates/adapters/jellyfin/src/lib.rs new file mode 100644 index 0000000..1ee790b --- /dev/null +++ b/crates/adapters/jellyfin/src/lib.rs @@ -0,0 +1,132 @@ +use domain::{errors::DomainError, models::ParsedPlaybackEvent, ports::MediaServerParser}; +use serde::Deserialize; + +pub struct JellyfinParser; + +impl MediaServerParser for JellyfinParser { + fn parse_playback_event( + &self, + body: &[u8], + ) -> Result, DomainError> { + let payload: JellyfinPayload = serde_json::from_slice(body) + .map_err(|e| DomainError::ValidationError(format!("invalid Jellyfin payload: {e}")))?; + + if payload.notification_type != "PlaybackStop" { + return Ok(None); + } + + let item_type = payload.item_type.as_deref().unwrap_or(""); + if item_type != "Movie" { + return Ok(None); + } + + if !payload.played_to_completion.unwrap_or(false) { + return Ok(None); + } + + let title = match payload.name { + Some(t) if !t.is_empty() => t, + _ => return Ok(None), + }; + + let tmdb_id = payload.provider_tmdb.map(|id| format!("tmdb:{id}")); + let imdb_id = payload.provider_imdb; + + Ok(Some(ParsedPlaybackEvent { + title, + year: payload.year, + tmdb_id, + imdb_id, + })) + } +} + +#[derive(Deserialize)] +struct JellyfinPayload { + #[serde(rename = "NotificationType")] + notification_type: String, + #[serde(rename = "ItemType")] + item_type: Option, + #[serde(rename = "Name")] + name: Option, + #[serde(rename = "Year")] + year: Option, + #[serde(rename = "PlayedToCompletion")] + played_to_completion: Option, + #[serde(rename = "Provider_tmdb")] + provider_tmdb: Option, + #[serde(rename = "Provider_imdb")] + provider_imdb: Option, +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn parses_valid_playback_stop() { + let body = serde_json::json!({ + "NotificationType": "PlaybackStop", + "ItemType": "Movie", + "Name": "Blade Runner", + "Year": 1982, + "PlayedToCompletion": true, + "Provider_tmdb": "78", + "Provider_imdb": "tt0083658" + }); + let parser = JellyfinParser; + let result = parser + .parse_playback_event(serde_json::to_vec(&body).unwrap().as_slice()) + .unwrap(); + let event = result.expect("should parse"); + assert_eq!(event.title, "Blade Runner"); + assert_eq!(event.year, Some(1982)); + assert_eq!(event.tmdb_id, Some("tmdb:78".into())); + assert_eq!(event.imdb_id, Some("tt0083658".into())); + } + + #[test] + fn ignores_non_movie() { + let body = serde_json::json!({ + "NotificationType": "PlaybackStop", + "ItemType": "Episode", + "Name": "Some Episode", + "PlayedToCompletion": true + }); + let parser = JellyfinParser; + let result = parser + .parse_playback_event(serde_json::to_vec(&body).unwrap().as_slice()) + .unwrap(); + assert!(result.is_none()); + } + + #[test] + fn ignores_incomplete_playback() { + let body = serde_json::json!({ + "NotificationType": "PlaybackStop", + "ItemType": "Movie", + "Name": "Blade Runner", + "PlayedToCompletion": false + }); + let parser = JellyfinParser; + let result = parser + .parse_playback_event(serde_json::to_vec(&body).unwrap().as_slice()) + .unwrap(); + assert!(result.is_none()); + } + + #[test] + fn ignores_playback_start() { + let body = serde_json::json!({ + "NotificationType": "PlaybackStart", + "ItemType": "Movie", + "Name": "Blade Runner", + "PlayedToCompletion": false + }); + let parser = JellyfinParser; + let result = parser + .parse_playback_event(serde_json::to_vec(&body).unwrap().as_slice()) + .unwrap(); + assert!(result.is_none()); + } +} diff --git a/crates/adapters/nats/src/subject.rs b/crates/adapters/nats/src/subject.rs index 007e0d4..055555c 100644 --- a/crates/adapters/nats/src/subject.rs +++ b/crates/adapters/nats/src/subject.rs @@ -15,6 +15,7 @@ pub fn event_to_subject(prefix: &str, event: &DomainEvent) -> String { DomainEvent::FollowAccepted { .. } => "follow.accepted", DomainEvent::BackfillFollower { .. } => "backfill.follower", DomainEvent::FederationDeliveryRequested { .. } => "federation.delivery.requested", + DomainEvent::WatchEventIngested { .. } => "watch.event.ingested", }; format!("{prefix}.{suffix}") } diff --git a/crates/adapters/plex/Cargo.toml b/crates/adapters/plex/Cargo.toml new file mode 100644 index 0000000..1dbb842 --- /dev/null +++ b/crates/adapters/plex/Cargo.toml @@ -0,0 +1,9 @@ +[package] +name = "plex" +version = "0.1.0" +edition = "2024" + +[dependencies] +domain = { workspace = true } +serde = { workspace = true } +serde_json = { workspace = true } diff --git a/crates/adapters/plex/src/lib.rs b/crates/adapters/plex/src/lib.rs new file mode 100644 index 0000000..4406662 --- /dev/null +++ b/crates/adapters/plex/src/lib.rs @@ -0,0 +1,172 @@ +use domain::{errors::DomainError, models::ParsedPlaybackEvent, ports::MediaServerParser}; +use serde::Deserialize; + +pub struct PlexParser; + +impl MediaServerParser for PlexParser { + /// Plex sends multipart form data with a `payload` JSON field. + /// The caller must extract the JSON string from the multipart body + /// and pass it here as raw bytes. + fn parse_playback_event( + &self, + body: &[u8], + ) -> Result, DomainError> { + let payload: PlexPayload = serde_json::from_slice(body) + .map_err(|e| DomainError::ValidationError(format!("invalid Plex payload: {e}")))?; + + if payload.event != "media.scrobble" { + return Ok(None); + } + + let metadata = match payload.metadata { + Some(m) => m, + None => return Ok(None), + }; + + if metadata.media_type != "movie" { + return Ok(None); + } + + if metadata.title.is_empty() { + return Ok(None); + } + + let mut tmdb_id = None; + let mut imdb_id = None; + for guid in &metadata.guids { + if let Some(id) = guid.id.strip_prefix("tmdb://") { + tmdb_id = Some(format!("tmdb:{id}")); + } else if let Some(id) = guid.id.strip_prefix("imdb://") { + imdb_id = Some(id.to_string()); + } + } + + Ok(Some(ParsedPlaybackEvent { + title: metadata.title, + year: metadata.year.map(|y| y as u16), + tmdb_id, + imdb_id, + })) + } +} + +#[derive(Deserialize)] +struct PlexPayload { + event: String, + #[serde(rename = "Metadata")] + metadata: Option, +} + +#[derive(Deserialize)] +struct PlexMetadata { + #[serde(rename = "type")] + media_type: String, + title: String, + year: Option, + #[serde(rename = "Guid", default)] + guids: Vec, +} + +#[derive(Deserialize)] +struct PlexGuid { + id: String, +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn parses_movie_scrobble() { + let body = serde_json::json!({ + "event": "media.scrobble", + "Metadata": { + "type": "movie", + "title": "Blade Runner", + "year": 1982, + "Guid": [ + {"id": "tmdb://78"}, + {"id": "imdb://tt0083658"} + ] + } + }); + let parser = PlexParser; + let result = parser + .parse_playback_event(serde_json::to_vec(&body).unwrap().as_slice()) + .unwrap(); + let event = result.expect("should parse"); + assert_eq!(event.title, "Blade Runner"); + assert_eq!(event.year, Some(1982)); + assert_eq!(event.tmdb_id, Some("tmdb:78".into())); + assert_eq!(event.imdb_id, Some("tt0083658".into())); + } + + #[test] + fn ignores_tv_episode() { + let body = serde_json::json!({ + "event": "media.scrobble", + "Metadata": { + "type": "episode", + "title": "Pilot", + "grandparentTitle": "Breaking Bad", + "year": 2008, + "Guid": [] + } + }); + let parser = PlexParser; + let result = parser + .parse_playback_event(serde_json::to_vec(&body).unwrap().as_slice()) + .unwrap(); + assert!(result.is_none()); + } + + #[test] + fn ignores_play_event() { + let body = serde_json::json!({ + "event": "media.play", + "Metadata": { + "type": "movie", + "title": "Blade Runner", + "year": 1982, + "Guid": [] + } + }); + let parser = PlexParser; + let result = parser + .parse_playback_event(serde_json::to_vec(&body).unwrap().as_slice()) + .unwrap(); + assert!(result.is_none()); + } + + #[test] + fn handles_no_guids() { + let body = serde_json::json!({ + "event": "media.scrobble", + "Metadata": { + "type": "movie", + "title": "Some Indie Film", + "year": 2023 + } + }); + let parser = PlexParser; + let result = parser + .parse_playback_event(serde_json::to_vec(&body).unwrap().as_slice()) + .unwrap(); + let event = result.expect("should parse"); + assert_eq!(event.title, "Some Indie Film"); + assert!(event.tmdb_id.is_none()); + assert!(event.imdb_id.is_none()); + } + + #[test] + fn handles_missing_metadata() { + let body = serde_json::json!({ + "event": "media.scrobble" + }); + let parser = PlexParser; + let result = parser + .parse_playback_event(serde_json::to_vec(&body).unwrap().as_slice()) + .unwrap(); + assert!(result.is_none()); + } +} diff --git a/crates/adapters/postgres/migrations/0023_watch_events.sql b/crates/adapters/postgres/migrations/0023_watch_events.sql new file mode 100644 index 0000000..eaf0f26 --- /dev/null +++ b/crates/adapters/postgres/migrations/0023_watch_events.sql @@ -0,0 +1,28 @@ +CREATE TABLE IF NOT EXISTS webhook_tokens ( + id TEXT PRIMARY KEY NOT NULL, + user_id TEXT NOT NULL REFERENCES users(id) ON DELETE CASCADE, + token_hash TEXT NOT NULL, + provider TEXT NOT NULL, + label TEXT, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + last_used_at TIMESTAMPTZ +); + +CREATE INDEX IF NOT EXISTS idx_webhook_tokens_hash ON webhook_tokens(token_hash); +CREATE INDEX IF NOT EXISTS idx_webhook_tokens_user ON webhook_tokens(user_id); + +CREATE TABLE IF NOT EXISTS watch_events ( + id TEXT PRIMARY KEY NOT NULL, + user_id TEXT NOT NULL REFERENCES users(id) ON DELETE CASCADE, + movie_id TEXT REFERENCES movies(id) ON DELETE SET NULL, + title TEXT NOT NULL, + year INTEGER, + external_metadata_id TEXT, + source TEXT NOT NULL, + watched_at TIMESTAMPTZ NOT NULL, + status TEXT NOT NULL DEFAULT 'pending', + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW() +); + +CREATE INDEX IF NOT EXISTS idx_watch_events_user_status ON watch_events(user_id, status); +CREATE INDEX IF NOT EXISTS idx_watch_events_dedup ON watch_events(user_id, external_metadata_id, created_at); diff --git a/crates/adapters/postgres/src/lib.rs b/crates/adapters/postgres/src/lib.rs index 036ea4e..1892607 100644 --- a/crates/adapters/postgres/src/lib.rs +++ b/crates/adapters/postgres/src/lib.rs @@ -21,6 +21,7 @@ mod persons; mod profile; mod profile_fields; mod users; +mod watch_event; mod watchlist; use models::{ @@ -36,6 +37,7 @@ pub use persons::{PostgresPersonAdapter, create_person_adapter}; pub use profile::PostgresMovieProfileRepository; pub use profile_fields::PostgresProfileFieldsRepository; pub use users::PostgresUserRepository; +pub use watch_event::{PostgresWatchEventRepository, PostgresWebhookTokenRepository}; pub use watchlist::PostgresWatchlistRepository; fn format_year_month(ym: &str) -> String { diff --git a/crates/adapters/postgres/src/watch_event.rs b/crates/adapters/postgres/src/watch_event.rs new file mode 100644 index 0000000..2802630 --- /dev/null +++ b/crates/adapters/postgres/src/watch_event.rs @@ -0,0 +1,317 @@ +use async_trait::async_trait; +use domain::{ + errors::DomainError, + models::{PersistedWatchEvent, WatchEvent, WatchEventSource, WatchEventStatus, WebhookToken}, + ports::{WatchEventRepository, WebhookTokenRepository}, + value_objects::{MovieId, UserId, WatchEventId, WebhookTokenId}, +}; +use sqlx::{PgPool, Row}; + +use crate::models::{parse_datetime, parse_uuid}; + +fn map_err(e: sqlx::Error) -> DomainError { + tracing::error!("Database error: {:?}", e); + DomainError::InfrastructureError("Database operation failed".into()) +} + +// ── WatchEventRepository ────────────────────────────────────────────────────── + +pub struct PostgresWatchEventRepository { + pool: PgPool, +} + +impl PostgresWatchEventRepository { + pub fn new(pool: PgPool) -> Self { + Self { pool } + } +} + +#[async_trait] +impl WatchEventRepository for PostgresWatchEventRepository { + async fn save(&self, event: &WatchEvent) -> Result<(), DomainError> { + let id = event.id().value().to_string(); + let user_id = event.user_id().value().to_string(); + let movie_id = event.movie_id().map(|m| m.value().to_string()); + let source = event.source().to_string(); + let status = event.status().to_string(); + + sqlx::query( + "INSERT INTO watch_events \ + (id, user_id, movie_id, title, year, external_metadata_id, source, watched_at, status, created_at) \ + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)", + ) + .bind(&id) + .bind(&user_id) + .bind(&movie_id) + .bind(event.title()) + .bind(event.year().map(|y| y as i32)) + .bind(event.external_metadata_id()) + .bind(&source) + .bind(event.watched_at()) + .bind(&status) + .bind(event.created_at()) + .execute(&self.pool) + .await + .map_err(map_err)?; + + Ok(()) + } + + async fn update_status( + &self, + id: &WatchEventId, + status: WatchEventStatus, + ) -> Result<(), DomainError> { + let id_str = id.value().to_string(); + let status_str = status.to_string(); + + sqlx::query("UPDATE watch_events SET status = $1 WHERE id = $2") + .bind(&status_str) + .bind(&id_str) + .execute(&self.pool) + .await + .map_err(map_err)?; + + Ok(()) + } + + async fn list_pending(&self, user_id: &UserId) -> Result, DomainError> { + let uid = user_id.value().to_string(); + + let rows = sqlx::query( + "SELECT id, user_id, movie_id, title, year, external_metadata_id, \ + source, \ + to_char(watched_at AT TIME ZONE 'UTC', 'YYYY-MM-DD HH24:MI:SS') AS watched_at, \ + status, \ + to_char(created_at AT TIME ZONE 'UTC', 'YYYY-MM-DD HH24:MI:SS') AS created_at \ + FROM watch_events \ + WHERE user_id = $1 AND status = 'pending' \ + ORDER BY watched_at DESC", + ) + .bind(&uid) + .fetch_all(&self.pool) + .await + .map_err(map_err)?; + + rows.iter().map(row_to_watch_event).collect() + } + + async fn get_by_id(&self, id: &WatchEventId) -> Result, DomainError> { + let id_str = id.value().to_string(); + + let row = sqlx::query( + "SELECT id, user_id, movie_id, title, year, external_metadata_id, \ + source, \ + to_char(watched_at AT TIME ZONE 'UTC', 'YYYY-MM-DD HH24:MI:SS') AS watched_at, \ + status, \ + to_char(created_at AT TIME ZONE 'UTC', 'YYYY-MM-DD HH24:MI:SS') AS created_at \ + FROM watch_events WHERE id = $1", + ) + .bind(&id_str) + .fetch_optional(&self.pool) + .await + .map_err(map_err)?; + + row.as_ref().map(row_to_watch_event).transpose() + } + + async fn find_duplicate( + &self, + user_id: &UserId, + external_id: &str, + after: chrono::NaiveDateTime, + ) -> Result { + let uid = user_id.value().to_string(); + + let count: i64 = sqlx::query_scalar( + "SELECT COUNT(*) FROM watch_events \ + WHERE user_id = $1 AND external_metadata_id = $2 AND created_at > $3", + ) + .bind(&uid) + .bind(external_id) + .bind(after) + .fetch_one(&self.pool) + .await + .map_err(map_err)?; + + Ok(count > 0) + } + + async fn delete_non_pending_older_than( + &self, + before: chrono::NaiveDateTime, + ) -> Result { + let result = + sqlx::query("DELETE FROM watch_events WHERE status != 'pending' AND created_at < $1") + .bind(before) + .execute(&self.pool) + .await + .map_err(map_err)?; + Ok(result.rows_affected()) + } +} + +fn row_to_watch_event(row: &sqlx::postgres::PgRow) -> Result { + let id_str: String = row.try_get("id").map_err(map_err)?; + let user_id_str: String = row.try_get("user_id").map_err(map_err)?; + let movie_id_str: Option = row.try_get("movie_id").map_err(map_err)?; + let title: String = row.try_get("title").map_err(map_err)?; + let year: Option = row.try_get("year").map_err(map_err)?; + let ext_id: Option = row.try_get("external_metadata_id").map_err(map_err)?; + let source_str: String = row.try_get("source").map_err(map_err)?; + let watched_at_str: String = row.try_get("watched_at").map_err(map_err)?; + let status_str: String = row.try_get("status").map_err(map_err)?; + let created_at_str: String = row.try_get("created_at").map_err(map_err)?; + + let source: WatchEventSource = source_str + .parse() + .map_err(|e: String| DomainError::InfrastructureError(e))?; + let status: WatchEventStatus = status_str + .parse() + .map_err(|e: String| DomainError::InfrastructureError(e))?; + + let movie_id = movie_id_str + .as_deref() + .map(parse_uuid) + .transpose()? + .map(MovieId::from_uuid); + + Ok(WatchEvent::from_persistence(PersistedWatchEvent { + id: WatchEventId::from_uuid(parse_uuid(&id_str)?), + user_id: UserId::from_uuid(parse_uuid(&user_id_str)?), + movie_id, + title, + year: year.map(|y| y as u16), + external_metadata_id: ext_id, + source, + watched_at: parse_datetime(&watched_at_str)?, + status, + created_at: parse_datetime(&created_at_str)?, + })) +} + +// ── WebhookTokenRepository ──────────────────────────────────────────────────── + +pub struct PostgresWebhookTokenRepository { + pool: PgPool, +} + +impl PostgresWebhookTokenRepository { + pub fn new(pool: PgPool) -> Self { + Self { pool } + } +} + +#[async_trait] +impl WebhookTokenRepository for PostgresWebhookTokenRepository { + async fn save(&self, token: &WebhookToken) -> Result<(), DomainError> { + let id = token.id().value().to_string(); + let user_id = token.user_id().value().to_string(); + let provider = token.provider().to_string(); + + sqlx::query( + "INSERT INTO webhook_tokens \ + (id, user_id, token_hash, provider, label, created_at, last_used_at) \ + VALUES ($1, $2, $3, $4, $5, $6, $7)", + ) + .bind(&id) + .bind(&user_id) + .bind(token.token_hash()) + .bind(&provider) + .bind(token.label()) + .bind(token.created_at()) + .bind(token.last_used_at()) + .execute(&self.pool) + .await + .map_err(map_err)?; + + Ok(()) + } + + async fn find_by_token_hash(&self, hash: &str) -> Result, DomainError> { + let row = sqlx::query( + "SELECT id, user_id, token_hash, provider, label, \ + to_char(created_at AT TIME ZONE 'UTC', 'YYYY-MM-DD HH24:MI:SS') AS created_at, \ + to_char(last_used_at AT TIME ZONE 'UTC', 'YYYY-MM-DD HH24:MI:SS') AS last_used_at \ + FROM webhook_tokens WHERE token_hash = $1", + ) + .bind(hash) + .fetch_optional(&self.pool) + .await + .map_err(map_err)?; + + row.as_ref().map(row_to_webhook_token).transpose() + } + + async fn list_by_user(&self, user_id: &UserId) -> Result, DomainError> { + let uid = user_id.value().to_string(); + + let rows = sqlx::query( + "SELECT id, user_id, token_hash, provider, label, \ + to_char(created_at AT TIME ZONE 'UTC', 'YYYY-MM-DD HH24:MI:SS') AS created_at, \ + to_char(last_used_at AT TIME ZONE 'UTC', 'YYYY-MM-DD HH24:MI:SS') AS last_used_at \ + FROM webhook_tokens WHERE user_id = $1 ORDER BY created_at DESC", + ) + .bind(&uid) + .fetch_all(&self.pool) + .await + .map_err(map_err)?; + + rows.iter().map(row_to_webhook_token).collect() + } + + async fn delete(&self, id: &WebhookTokenId, user_id: &UserId) -> Result<(), DomainError> { + let id_str = id.value().to_string(); + let uid = user_id.value().to_string(); + + let result = sqlx::query("DELETE FROM webhook_tokens WHERE id = $1 AND user_id = $2") + .bind(&id_str) + .bind(&uid) + .execute(&self.pool) + .await + .map_err(map_err)?; + + if result.rows_affected() == 0 { + return Err(DomainError::NotFound(format!("Webhook token {id_str}"))); + } + Ok(()) + } + + async fn touch_last_used(&self, id: &WebhookTokenId) -> Result<(), DomainError> { + let id_str = id.value().to_string(); + + sqlx::query("UPDATE webhook_tokens SET last_used_at = NOW() WHERE id = $1") + .bind(&id_str) + .execute(&self.pool) + .await + .map_err(map_err)?; + + Ok(()) + } +} + +fn row_to_webhook_token(row: &sqlx::postgres::PgRow) -> Result { + let id_str: String = row.try_get("id").map_err(map_err)?; + let user_id_str: String = row.try_get("user_id").map_err(map_err)?; + let token_hash: String = row.try_get("token_hash").map_err(map_err)?; + let provider_str: String = row.try_get("provider").map_err(map_err)?; + let label: Option = row.try_get("label").map_err(map_err)?; + let created_at_str: String = row.try_get("created_at").map_err(map_err)?; + let last_used_str: Option = row.try_get("last_used_at").map_err(map_err)?; + + let provider: WatchEventSource = provider_str + .parse() + .map_err(|e: String| DomainError::InfrastructureError(e))?; + + let last_used = last_used_str.as_deref().map(parse_datetime).transpose()?; + + Ok(WebhookToken::from_persistence( + WebhookTokenId::from_uuid(parse_uuid(&id_str)?), + UserId::from_uuid(parse_uuid(&user_id_str)?), + token_hash, + provider, + label, + parse_datetime(&created_at_str)?, + last_used, + )) +} diff --git a/crates/adapters/sqlite-federation/src/lib.rs b/crates/adapters/sqlite-federation/src/lib.rs index 5404a78..8d549b2 100644 --- a/crates/adapters/sqlite-federation/src/lib.rs +++ b/crates/adapters/sqlite-federation/src/lib.rs @@ -72,7 +72,10 @@ fn remote_actor_from_row(row: &sqlx::sqlite::SqliteRow, url_col: &str) -> Remote .and_then(|s| { chrono::NaiveDateTime::parse_from_str(&s, "%Y-%m-%d %H:%M:%S") .map(|ndt| ndt.and_utc()) - .or_else(|_| chrono::DateTime::parse_from_rfc3339(&s).map(|dt| dt.with_timezone(&chrono::Utc))) + .or_else(|_| { + chrono::DateTime::parse_from_rfc3339(&s) + .map(|dt| dt.with_timezone(&chrono::Utc)) + }) .ok() }), } diff --git a/crates/adapters/sqlite-search/src/tests/lib.rs b/crates/adapters/sqlite-search/src/tests/lib.rs index bc42d16..25771dc 100644 --- a/crates/adapters/sqlite-search/src/tests/lib.rs +++ b/crates/adapters/sqlite-search/src/tests/lib.rs @@ -1,10 +1,8 @@ -use super::{create_search_adapter, SqliteSearchAdapter}; +use super::create_search_adapter; use domain::{ models::{ - collections::PageParams, EntityType, ExternalPersonId, IndexableDocument, Movie, Person, - PersonId, SearchFilters, SearchQuery, + collections::PageParams, EntityType, IndexableDocument, Movie, SearchFilters, SearchQuery, }, - ports::{SearchCommand, SearchPort}, value_objects::{MovieId, MovieTitle, ReleaseYear}, }; use sqlx::SqlitePool; diff --git a/crates/adapters/sqlite/migrations/0023_watch_events.sql b/crates/adapters/sqlite/migrations/0023_watch_events.sql new file mode 100644 index 0000000..7859e0f --- /dev/null +++ b/crates/adapters/sqlite/migrations/0023_watch_events.sql @@ -0,0 +1,28 @@ +CREATE TABLE IF NOT EXISTS webhook_tokens ( + id TEXT PRIMARY KEY NOT NULL, + user_id TEXT NOT NULL REFERENCES users(id) ON DELETE CASCADE, + token_hash TEXT NOT NULL, + provider TEXT NOT NULL, + label TEXT, + created_at TEXT NOT NULL, + last_used_at TEXT +); + +CREATE INDEX IF NOT EXISTS idx_webhook_tokens_hash ON webhook_tokens(token_hash); +CREATE INDEX IF NOT EXISTS idx_webhook_tokens_user ON webhook_tokens(user_id); + +CREATE TABLE IF NOT EXISTS watch_events ( + id TEXT PRIMARY KEY NOT NULL, + user_id TEXT NOT NULL REFERENCES users(id) ON DELETE CASCADE, + movie_id TEXT REFERENCES movies(id) ON DELETE SET NULL, + title TEXT NOT NULL, + year INTEGER, + external_metadata_id TEXT, + source TEXT NOT NULL, + watched_at TEXT NOT NULL, + status TEXT NOT NULL DEFAULT 'pending', + created_at TEXT NOT NULL +); + +CREATE INDEX IF NOT EXISTS idx_watch_events_user_status ON watch_events(user_id, status); +CREATE INDEX IF NOT EXISTS idx_watch_events_dedup ON watch_events(user_id, external_metadata_id, created_at); diff --git a/crates/adapters/sqlite/src/lib.rs b/crates/adapters/sqlite/src/lib.rs index 370943e..a5a04c3 100644 --- a/crates/adapters/sqlite/src/lib.rs +++ b/crates/adapters/sqlite/src/lib.rs @@ -22,6 +22,7 @@ mod persons; mod profile; mod profile_fields; mod users; +mod watch_event; mod watchlist; use models::{ @@ -37,6 +38,7 @@ pub use persons::{SqlitePersonAdapter, create_person_adapter}; pub use profile::SqliteMovieProfileRepository; pub use profile_fields::SqliteProfileFieldsRepository; pub use users::SqliteUserRepository; +pub use watch_event::{SqliteWatchEventRepository, SqliteWebhookTokenRepository}; pub use watchlist::SqliteWatchlistRepository; pub fn create_profile_fields_repo( diff --git a/crates/adapters/sqlite/src/tests/persons.rs b/crates/adapters/sqlite/src/tests/persons.rs index 37f3524..8b355ff 100644 --- a/crates/adapters/sqlite/src/tests/persons.rs +++ b/crates/adapters/sqlite/src/tests/persons.rs @@ -1,6 +1,5 @@ use super::super::persons::SqlitePersonAdapter; use domain::{ - errors::DomainError, models::{ExternalPersonId, Person, PersonId}, ports::{PersonCommand, PersonQuery}, }; diff --git a/crates/adapters/sqlite/src/watch_event.rs b/crates/adapters/sqlite/src/watch_event.rs new file mode 100644 index 0000000..c8f1ab5 --- /dev/null +++ b/crates/adapters/sqlite/src/watch_event.rs @@ -0,0 +1,327 @@ +use async_trait::async_trait; +use domain::{ + errors::DomainError, + models::{PersistedWatchEvent, WatchEvent, WatchEventSource, WatchEventStatus, WebhookToken}, + ports::{WatchEventRepository, WebhookTokenRepository}, + value_objects::{MovieId, UserId, WatchEventId, WebhookTokenId}, +}; +use sqlx::{Row, SqlitePool}; + +use crate::models::datetime_to_str; + +fn map_err(e: sqlx::Error) -> DomainError { + tracing::error!("Database error: {:?}", e); + DomainError::InfrastructureError("Database operation failed".into()) +} + +fn parse_uuid(s: &str) -> Result { + s.parse() + .map_err(|_| DomainError::InfrastructureError(format!("invalid UUID: {s}"))) +} + +fn parse_datetime(s: &str) -> Result { + chrono::NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S") + .or_else(|_| chrono::NaiveDateTime::parse_from_str(s, "%Y-%m-%dT%H:%M:%S")) + .map_err(|_| DomainError::InfrastructureError(format!("invalid datetime: {s}"))) +} + +// ── WatchEventRepository ────────────────────────────────────────────────────── + +pub struct SqliteWatchEventRepository { + pool: SqlitePool, +} + +impl SqliteWatchEventRepository { + pub fn new(pool: SqlitePool) -> Self { + Self { pool } + } +} + +#[async_trait] +impl WatchEventRepository for SqliteWatchEventRepository { + async fn save(&self, event: &WatchEvent) -> Result<(), DomainError> { + let id = event.id().value().to_string(); + let user_id = event.user_id().value().to_string(); + let movie_id = event.movie_id().map(|m| m.value().to_string()); + let source = event.source().to_string(); + let watched_at = datetime_to_str(event.watched_at()); + let status = event.status().to_string(); + let created_at = datetime_to_str(event.created_at()); + + sqlx::query( + "INSERT INTO watch_events \ + (id, user_id, movie_id, title, year, external_metadata_id, source, watched_at, status, created_at) \ + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", + ) + .bind(&id) + .bind(&user_id) + .bind(&movie_id) + .bind(event.title()) + .bind(event.year().map(|y| y as i64)) + .bind(event.external_metadata_id()) + .bind(&source) + .bind(&watched_at) + .bind(&status) + .bind(&created_at) + .execute(&self.pool) + .await + .map_err(map_err)?; + + Ok(()) + } + + async fn update_status( + &self, + id: &WatchEventId, + status: WatchEventStatus, + ) -> Result<(), DomainError> { + let id_str = id.value().to_string(); + let status_str = status.to_string(); + + sqlx::query("UPDATE watch_events SET status = ? WHERE id = ?") + .bind(&status_str) + .bind(&id_str) + .execute(&self.pool) + .await + .map_err(map_err)?; + + Ok(()) + } + + async fn list_pending(&self, user_id: &UserId) -> Result, DomainError> { + let uid = user_id.value().to_string(); + + let rows = sqlx::query( + "SELECT id, user_id, movie_id, title, year, external_metadata_id, \ + source, watched_at, status, created_at \ + FROM watch_events \ + WHERE user_id = ? AND status = 'pending' \ + ORDER BY watched_at DESC", + ) + .bind(&uid) + .fetch_all(&self.pool) + .await + .map_err(map_err)?; + + rows.iter().map(row_to_watch_event).collect() + } + + async fn get_by_id(&self, id: &WatchEventId) -> Result, DomainError> { + let id_str = id.value().to_string(); + + let row = sqlx::query( + "SELECT id, user_id, movie_id, title, year, external_metadata_id, \ + source, watched_at, status, created_at \ + FROM watch_events WHERE id = ?", + ) + .bind(&id_str) + .fetch_optional(&self.pool) + .await + .map_err(map_err)?; + + row.as_ref().map(row_to_watch_event).transpose() + } + + async fn find_duplicate( + &self, + user_id: &UserId, + external_id: &str, + after: chrono::NaiveDateTime, + ) -> Result { + let uid = user_id.value().to_string(); + let after_str = datetime_to_str(&after); + + let count: i64 = sqlx::query( + "SELECT COUNT(*) FROM watch_events \ + WHERE user_id = ? AND external_metadata_id = ? AND created_at > ?", + ) + .bind(&uid) + .bind(external_id) + .bind(&after_str) + .fetch_one(&self.pool) + .await + .map_err(map_err)? + .try_get(0) + .unwrap_or(0); + + Ok(count > 0) + } + + async fn delete_non_pending_older_than( + &self, + before: chrono::NaiveDateTime, + ) -> Result { + let before_str = datetime_to_str(&before); + let result = + sqlx::query("DELETE FROM watch_events WHERE status != 'pending' AND created_at < ?") + .bind(&before_str) + .execute(&self.pool) + .await + .map_err(map_err)?; + Ok(result.rows_affected()) + } +} + +fn row_to_watch_event(row: &sqlx::sqlite::SqliteRow) -> Result { + let id_str: &str = row.try_get("id").map_err(map_err)?; + let user_id_str: &str = row.try_get("user_id").map_err(map_err)?; + let movie_id_str: Option<&str> = row.try_get("movie_id").map_err(map_err)?; + let title: String = row.try_get("title").map_err(map_err)?; + let year: Option = row.try_get("year").map_err(map_err)?; + let ext_id: Option = row.try_get("external_metadata_id").map_err(map_err)?; + let source_str: String = row.try_get("source").map_err(map_err)?; + let watched_at_str: String = row.try_get("watched_at").map_err(map_err)?; + let status_str: String = row.try_get("status").map_err(map_err)?; + let created_at_str: String = row.try_get("created_at").map_err(map_err)?; + + let source: WatchEventSource = source_str + .parse() + .map_err(|e: String| DomainError::InfrastructureError(e))?; + let status: WatchEventStatus = status_str + .parse() + .map_err(|e: String| DomainError::InfrastructureError(e))?; + + let movie_id = movie_id_str + .map(parse_uuid) + .transpose()? + .map(MovieId::from_uuid); + + Ok(WatchEvent::from_persistence(PersistedWatchEvent { + id: WatchEventId::from_uuid(parse_uuid(id_str)?), + user_id: UserId::from_uuid(parse_uuid(user_id_str)?), + movie_id, + title, + year: year.map(|y| y as u16), + external_metadata_id: ext_id, + source, + watched_at: parse_datetime(&watched_at_str)?, + status, + created_at: parse_datetime(&created_at_str)?, + })) +} + +// ── WebhookTokenRepository ──────────────────────────────────────────────────── + +pub struct SqliteWebhookTokenRepository { + pool: SqlitePool, +} + +impl SqliteWebhookTokenRepository { + pub fn new(pool: SqlitePool) -> Self { + Self { pool } + } +} + +#[async_trait] +impl WebhookTokenRepository for SqliteWebhookTokenRepository { + async fn save(&self, token: &WebhookToken) -> Result<(), DomainError> { + let id = token.id().value().to_string(); + let user_id = token.user_id().value().to_string(); + let provider = token.provider().to_string(); + let created_at = datetime_to_str(token.created_at()); + let last_used = token.last_used_at().map(datetime_to_str); + + sqlx::query( + "INSERT INTO webhook_tokens \ + (id, user_id, token_hash, provider, label, created_at, last_used_at) \ + VALUES (?, ?, ?, ?, ?, ?, ?)", + ) + .bind(&id) + .bind(&user_id) + .bind(token.token_hash()) + .bind(&provider) + .bind(token.label()) + .bind(&created_at) + .bind(&last_used) + .execute(&self.pool) + .await + .map_err(map_err)?; + + Ok(()) + } + + async fn find_by_token_hash(&self, hash: &str) -> Result, DomainError> { + let row = sqlx::query( + "SELECT id, user_id, token_hash, provider, label, created_at, last_used_at \ + FROM webhook_tokens WHERE token_hash = ?", + ) + .bind(hash) + .fetch_optional(&self.pool) + .await + .map_err(map_err)?; + + row.as_ref().map(row_to_webhook_token).transpose() + } + + async fn list_by_user(&self, user_id: &UserId) -> Result, DomainError> { + let uid = user_id.value().to_string(); + + let rows = sqlx::query( + "SELECT id, user_id, token_hash, provider, label, created_at, last_used_at \ + FROM webhook_tokens WHERE user_id = ? ORDER BY created_at DESC", + ) + .bind(&uid) + .fetch_all(&self.pool) + .await + .map_err(map_err)?; + + rows.iter().map(row_to_webhook_token).collect() + } + + async fn delete(&self, id: &WebhookTokenId, user_id: &UserId) -> Result<(), DomainError> { + let id_str = id.value().to_string(); + let uid = user_id.value().to_string(); + + let result = sqlx::query("DELETE FROM webhook_tokens WHERE id = ? AND user_id = ?") + .bind(&id_str) + .bind(&uid) + .execute(&self.pool) + .await + .map_err(map_err)?; + + if result.rows_affected() == 0 { + return Err(DomainError::NotFound(format!("Webhook token {id_str}"))); + } + Ok(()) + } + + async fn touch_last_used(&self, id: &WebhookTokenId) -> Result<(), DomainError> { + let id_str = id.value().to_string(); + let now = datetime_to_str(&chrono::Utc::now().naive_utc()); + + sqlx::query("UPDATE webhook_tokens SET last_used_at = ? WHERE id = ?") + .bind(&now) + .bind(&id_str) + .execute(&self.pool) + .await + .map_err(map_err)?; + + Ok(()) + } +} + +fn row_to_webhook_token(row: &sqlx::sqlite::SqliteRow) -> Result { + let id_str: &str = row.try_get("id").map_err(map_err)?; + let user_id_str: &str = row.try_get("user_id").map_err(map_err)?; + let token_hash: String = row.try_get("token_hash").map_err(map_err)?; + let provider_str: String = row.try_get("provider").map_err(map_err)?; + let label: Option = row.try_get("label").map_err(map_err)?; + let created_at_str: String = row.try_get("created_at").map_err(map_err)?; + let last_used_str: Option = row.try_get("last_used_at").map_err(map_err)?; + + let provider: WatchEventSource = provider_str + .parse() + .map_err(|e: String| DomainError::InfrastructureError(e))?; + + let last_used = last_used_str.map(|s| parse_datetime(&s)).transpose()?; + + Ok(WebhookToken::from_persistence( + WebhookTokenId::from_uuid(parse_uuid(id_str)?), + UserId::from_uuid(parse_uuid(user_id_str)?), + token_hash, + provider, + label, + parse_datetime(&created_at_str)?, + last_used, + )) +} diff --git a/crates/adapters/template-askama/src/lib.rs b/crates/adapters/template-askama/src/lib.rs index 4bf89b3..3eafd51 100644 --- a/crates/adapters/template-askama/src/lib.rs +++ b/crates/adapters/template-askama/src/lib.rs @@ -2,8 +2,9 @@ use application::ports::{ ActivityFeedPageData, BlockedActorEntry, BlockedActorsPageData, BlockedDomainEntry, BlockedDomainsPageData, FollowersPageData, FollowingPageData, HtmlPageContext, HtmlRenderer, ImportMappingPageData, ImportPreviewPageData, ImportPreviewRow, ImportProfileView, - ImportRowStatus, ImportUploadPageData, LoginPageData, MovieDetailPageData, NewReviewPageData, - ProfilePageData, ProfileSettingsPageData, RegisterPageData, UsersPageData, WatchlistPageData, + ImportRowStatus, ImportUploadPageData, IntegrationsPageData, LoginPageData, + MovieDetailPageData, NewReviewPageData, ProfilePageData, ProfileSettingsPageData, + RegisterPageData, UsersPageData, WatchQueuePageData, WatchlistPageData, WebhookTokenView, }; use askama::Template; use chrono::Datelike; @@ -366,6 +367,23 @@ struct ProfileSettingsTemplate<'a> { saved: bool, } +#[derive(Template)] +#[template(path = "integrations.html")] +struct IntegrationsTemplate<'a> { + ctx: &'a HtmlPageContext, + tokens: &'a [WebhookTokenView], + webhook_base_url: &'a str, + new_token: Option<&'a str>, +} + +#[derive(Template)] +#[template(path = "watch_queue.html")] +struct WatchQueueTemplate<'a> { + ctx: &'a HtmlPageContext, + entries: &'a [application::ports::WatchQueueDisplayEntry], + error: Option<&'a str>, +} + #[derive(Template)] #[template(path = "import_upload.html")] struct ImportUploadTemplate<'a> { @@ -750,4 +768,25 @@ impl HtmlRenderer for AskamaHtmlRenderer { .render() .map_err(|e| e.to_string()) } + + fn render_integrations_page(&self, data: IntegrationsPageData) -> Result { + IntegrationsTemplate { + ctx: &data.ctx, + tokens: &data.tokens, + webhook_base_url: &data.webhook_base_url, + new_token: data.new_token.as_deref(), + } + .render() + .map_err(|e| e.to_string()) + } + + fn render_watch_queue_page(&self, data: WatchQueuePageData) -> Result { + WatchQueueTemplate { + ctx: &data.ctx, + entries: &data.entries, + error: data.error.as_deref(), + } + .render() + .map_err(|e| e.to_string()) + } } diff --git a/crates/adapters/template-askama/templates/base.html b/crates/adapters/template-askama/templates/base.html index 081a79c..0b33b69 100644 --- a/crates/adapters/template-askama/templates/base.html +++ b/crates/adapters/template-askama/templates/base.html @@ -37,6 +37,7 @@ Profile Add Review Import + Queue Logout {% else %} Login diff --git a/crates/adapters/template-askama/templates/integrations.html b/crates/adapters/template-askama/templates/integrations.html new file mode 100644 index 0000000..f5440fd --- /dev/null +++ b/crates/adapters/template-askama/templates/integrations.html @@ -0,0 +1,92 @@ +{% extends "base.html" %} +{% block content %} +

Integrations

+

+ Profile Settings +

+ +
+

Jellyfin / Plex Webhook

+

+ Automatically log movies you finish watching. Configure your media server's + webhook plugin to POST to the URL below. +

+ +
+ + {{ webhook_base_url }}/api/v1/webhooks/jellyfin +
+ +
+ + {{ webhook_base_url }}/api/v1/webhooks/plex?token=YOUR_TOKEN +
+ +
+ Jellyfin setup +
    +
  1. Install the Webhook plugin (Dashboard → Plugins → Catalog)
  2. +
  3. Add Generic Destination with the Jellyfin URL above
  4. +
  5. Add header: Authorization = Bearer YOUR_TOKEN
  6. +
  7. Check Send All Properties
  8. +
  9. Notification Type: Playback Stop only
  10. +
  11. Item Type: Movies only
  12. +
+
+ +
+ Plex setup (requires Plex Pass) +
    +
  1. Go to Settings → Webhooks in your Plex server
  2. +
  3. Add the Plex URL above, replacing YOUR_TOKEN with your generated token
  4. +
  5. Plex automatically sends scrobble events when a movie is watched to 90%+
  6. +
+
+ + {% if let Some(token) = new_token %} +
+ New token (copy now — shown only once):
+ {{ token }} +
+ {% endif %} + +
+ + +
+ + +
+ +
+
+ +{% if !tokens.is_empty() %} +
+

Active Tokens

+
+ {% for t in tokens %} +
+
+
+ {{ t.provider }}{% if let Some(l) = &t.label %} — {{ l }}{% endif %} +
+
+ Created {{ t.created_at }} + {% if let Some(used) = &t.last_used_at %} + Last used {{ used }} + {% else %} + Never used + {% endif %} +
+
+ + +
+
+
+ {% endfor %} +
+
+{% endif %} +{% endblock %} diff --git a/crates/adapters/template-askama/templates/profile.html b/crates/adapters/template-askama/templates/profile.html index 8ec4016..7f4b91f 100644 --- a/crates/adapters/template-askama/templates/profile.html +++ b/crates/adapters/template-askama/templates/profile.html @@ -70,6 +70,7 @@

Account

Watchlist Profile settings + Integrations Blocked users {% if ctx.is_admin %} Admin — blocked domains diff --git a/crates/adapters/template-askama/templates/profile_settings.html b/crates/adapters/template-askama/templates/profile_settings.html index 807531b..f46e9dc 100644 --- a/crates/adapters/template-askama/templates/profile_settings.html +++ b/crates/adapters/template-askama/templates/profile_settings.html @@ -1,6 +1,9 @@ {% extends "base.html" %} {% block content %}

Profile Settings

+

+ Integrations (Jellyfin/Plex) +

{% if saved %}

Saved.

{% endif %} diff --git a/crates/adapters/template-askama/templates/watch_queue.html b/crates/adapters/template-askama/templates/watch_queue.html new file mode 100644 index 0000000..f5dfae2 --- /dev/null +++ b/crates/adapters/template-askama/templates/watch_queue.html @@ -0,0 +1,68 @@ +{% extends "base.html" %} +{% block content %} +
+
Watch Queue
+ + {% if let Some(err) = error %} +

{{ err }}

+ {% endif %} + + {% if entries.is_empty() %} +

+ No pending watches. + Connect Jellyfin to start auto-logging. +

+ {% else %} +

+ Movies you watched via Jellyfin. Rate and confirm to add to your diary, or dismiss. +

+
+ {% for entry in entries %} +
+
+
+ {% if let Some(url) = &entry.movie_url %} + {{ entry.title }} + {% else %} + {{ entry.title }} + {% endif %} + {% if let Some(y) = entry.year %} + ({{ y }}) + {% endif %} +
+
+ Watched {{ entry.watched_at }} + via {{ entry.source }} +
+ +
+ +
+ + +
+
+ + +
+ +
+ +
+ + +
+
+
+ {% endfor %} +
+ {% endif %} +
+{% endblock %} diff --git a/crates/api-types/src/lib.rs b/crates/api-types/src/lib.rs index b9a5056..833f3bf 100644 --- a/crates/api-types/src/lib.rs +++ b/crates/api-types/src/lib.rs @@ -7,6 +7,7 @@ pub mod search; pub mod social; pub mod users; pub mod watchlist; +pub mod webhook; pub use auth::*; pub use common::*; @@ -16,3 +17,4 @@ pub use movies::*; pub use social::*; pub use users::*; pub use watchlist::*; +pub use webhook::*; diff --git a/crates/api-types/src/webhook.rs b/crates/api-types/src/webhook.rs new file mode 100644 index 0000000..57315e0 --- /dev/null +++ b/crates/api-types/src/webhook.rs @@ -0,0 +1,61 @@ +use serde::{Deserialize, Serialize}; +use uuid::Uuid; + +#[derive(Debug, Clone, Deserialize, utoipa::ToSchema)] +pub struct GenerateTokenRequest { + pub provider: String, + pub label: Option, +} + +#[derive(Debug, Clone, Serialize, utoipa::ToSchema)] +pub struct GenerateTokenResponse { + pub id: String, + pub token: String, + pub webhook_url: String, +} + +#[derive(Debug, Clone, Serialize, utoipa::ToSchema)] +pub struct WebhookTokenDto { + pub id: String, + pub provider: String, + pub label: Option, + pub created_at: String, + pub last_used_at: Option, +} + +#[derive(Debug, Clone, Serialize, utoipa::ToSchema)] +pub struct WatchQueueEntryDto { + pub id: String, + pub title: String, + pub year: Option, + pub movie_id: Option, + pub source: String, + pub watched_at: String, +} + +#[derive(Debug, Clone, Deserialize, utoipa::ToSchema)] +pub struct ConfirmWatchRequest { + pub confirmations: Vec, +} + +#[derive(Debug, Clone, Deserialize, utoipa::ToSchema)] +pub struct ConfirmWatchEntry { + pub watch_event_id: Uuid, + pub rating: u8, + pub comment: Option, +} + +#[derive(Debug, Clone, Serialize, utoipa::ToSchema)] +pub struct ConfirmWatchResponse { + pub confirmed: u32, +} + +#[derive(Debug, Clone, Deserialize, utoipa::ToSchema)] +pub struct DismissWatchRequest { + pub event_ids: Vec, +} + +#[derive(Debug, Clone, Serialize, utoipa::ToSchema)] +pub struct DismissWatchResponse { + pub dismissed: u32, +} diff --git a/crates/application/Cargo.toml b/crates/application/Cargo.toml index a4b8334..b0a1706 100644 --- a/crates/application/Cargo.toml +++ b/crates/application/Cargo.toml @@ -11,6 +11,9 @@ chrono = { workspace = true } tracing = { workspace = true } futures = { workspace = true } tokio = { workspace = true } +sha2 = { workspace = true } +rand = { workspace = true } +hex = { workspace = true } [features] xlsx = [] diff --git a/crates/application/src/commands.rs b/crates/application/src/commands.rs index e7c229b..ada0d80 100644 --- a/crates/application/src/commands.rs +++ b/crates/application/src/commands.rs @@ -72,6 +72,41 @@ pub struct DeleteImportProfileCommand { pub profile_id: Uuid, } +// ── Media server integration ────────────────────────────────────────────────── + +pub struct IngestWatchEventCommand { + pub token: String, + pub raw_payload: Vec, + pub source: domain::models::WatchEventSource, +} + +pub struct WatchEventConfirmation { + pub watch_event_id: Uuid, + pub rating: u8, + pub comment: Option, +} + +pub struct ConfirmWatchEventsCommand { + pub user_id: Uuid, + pub confirmations: Vec, +} + +pub struct DismissWatchEventsCommand { + pub user_id: Uuid, + pub event_ids: Vec, +} + +pub struct GenerateWebhookTokenCommand { + pub user_id: Uuid, + pub provider: domain::models::WatchEventSource, + pub label: Option, +} + +pub struct RevokeWebhookTokenCommand { + pub user_id: Uuid, + pub token_id: Uuid, +} + pub struct UpdateProfileCommand { pub user_id: Uuid, pub display_name: Option, diff --git a/crates/application/src/context.rs b/crates/application/src/context.rs index 36f494f..4885d6c 100644 --- a/crates/application/src/context.rs +++ b/crates/application/src/context.rs @@ -1,14 +1,14 @@ use std::sync::Arc; -#[cfg(feature = "federation")] -use domain::ports::RemoteWatchlistRepository; use domain::ports::{ AuthService, DiaryExporter, DiaryRepository, DocumentParser, EventPublisher, ImageStorage, ImportProfileRepository, ImportSessionRepository, MetadataClient, MovieProfileRepository, MovieRepository, PasswordHasher, PersonCommand, PersonQuery, PosterFetcherClient, - ReviewRepository, SearchCommand, SearchPort, SocialQueryPort, StatsRepository, - UserProfileFieldsRepository, UserRepository, WatchlistRepository, + ReviewRepository, SearchCommand, SearchPort, StatsRepository, UserProfileFieldsRepository, + UserRepository, WatchEventRepository, WatchlistRepository, WebhookTokenRepository, }; +#[cfg(feature = "federation")] +use domain::ports::{RemoteWatchlistRepository, SocialQueryPort}; use crate::config::AppConfig; @@ -35,6 +35,8 @@ pub struct AppContext { pub search_port: Arc, pub search_command: Arc, pub watchlist_repository: Arc, + pub watch_event_repository: Arc, + pub webhook_token_repository: Arc, pub profile_fields_repository: Arc, #[cfg(feature = "federation")] pub remote_watchlist_repository: Arc, diff --git a/crates/application/src/jobs.rs b/crates/application/src/jobs.rs index f1996f6..0a5d5e2 100644 --- a/crates/application/src/jobs.rs +++ b/crates/application/src/jobs.rs @@ -28,6 +28,31 @@ impl PeriodicJob for ImportSessionCleanupJob { } } +pub struct WatchEventCleanupJob { + ctx: AppContext, +} + +impl WatchEventCleanupJob { + pub fn new(ctx: AppContext) -> Self { + Self { ctx } + } +} + +#[async_trait] +impl PeriodicJob for WatchEventCleanupJob { + fn interval(&self) -> Duration { + Duration::from_secs(86400) + } + + async fn run(&self) -> Result<(), DomainError> { + let n = crate::use_cases::cleanup_watch_events::execute(&self.ctx).await?; + if n > 0 { + tracing::info!("watch event cleanup: removed {n} old entries"); + } + Ok(()) + } +} + pub struct EnrichmentStalenessJob { ctx: AppContext, } diff --git a/crates/application/src/ports.rs b/crates/application/src/ports.rs index 983a244..e107db9 100644 --- a/crates/application/src/ports.rs +++ b/crates/application/src/ports.rs @@ -206,6 +206,36 @@ pub struct BlockedActorsPageData { pub actors: Vec, } +pub struct WebhookTokenView { + pub id: String, + pub provider: String, + pub label: Option, + pub created_at: String, + pub last_used_at: Option, +} + +pub struct IntegrationsPageData { + pub ctx: HtmlPageContext, + pub tokens: Vec, + pub webhook_base_url: String, + pub new_token: Option, +} + +pub struct WatchQueueDisplayEntry { + pub id: String, + pub title: String, + pub year: Option, + pub source: String, + pub watched_at: String, + pub movie_url: Option, +} + +pub struct WatchQueuePageData { + pub ctx: HtmlPageContext, + pub entries: Vec, + pub error: Option, +} + pub trait HtmlRenderer: Send + Sync { fn render_diary_page( &self, @@ -229,6 +259,8 @@ pub trait HtmlRenderer: Send + Sync { fn render_blocked_domains_page(&self, data: BlockedDomainsPageData) -> Result; fn render_blocked_actors_page(&self, data: BlockedActorsPageData) -> Result; fn render_watchlist_page(&self, data: WatchlistPageData) -> Result; + fn render_integrations_page(&self, data: IntegrationsPageData) -> Result; + fn render_watch_queue_page(&self, data: WatchQueuePageData) -> Result; } pub trait RssFeedRenderer: Send + Sync { diff --git a/crates/application/src/queries.rs b/crates/application/src/queries.rs index b23d7e0..387679b 100644 --- a/crates/application/src/queries.rs +++ b/crates/application/src/queries.rs @@ -105,3 +105,11 @@ pub struct IsOnWatchlistQuery { pub struct GetCurrentProfileQuery { pub user_id: Uuid, } + +pub struct GetWatchQueueQuery { + pub user_id: Uuid, +} + +pub struct GetWebhookTokensQuery { + pub user_id: Uuid, +} diff --git a/crates/application/src/test_helpers.rs b/crates/application/src/test_helpers.rs index 1aba52b..e8b5025 100644 --- a/crates/application/src/test_helpers.rs +++ b/crates/application/src/test_helpers.rs @@ -10,16 +10,16 @@ use domain::{ ImportProfileRepository, ImportSessionRepository, MetadataClient, MovieProfileRepository, MovieRepository, PasswordHasher, PersonCommand, PersonQuery, PosterFetcherClient, ReviewRepository, SearchCommand, SearchPort, StatsRepository, UserProfileFieldsRepository, - UserRepository, WatchlistRepository, + UserRepository, WatchEventRepository, WatchlistRepository, WebhookTokenRepository, }, testing::{ - FakeAuthService, FakeDiaryRepository, FakeMetadataClient, FakePasswordHasher, - InMemoryMovieRepository, InMemoryReviewRepository, InMemoryUserRepository, - InMemoryWatchlistRepository, NoopEventPublisher, NoopImageStorage, PanicDiaryExporter, - PanicDiaryRepository, PanicDocumentParser, PanicImportProfileRepository, - PanicImportSessionRepository, PanicMovieProfileRepository, PanicPersonCommand, - PanicPersonQuery, PanicPosterFetcher, PanicProfileFieldsRepo, PanicSearchCommand, - PanicSearchPort, PanicStatsRepository, + FakeAuthService, FakeMetadataClient, FakePasswordHasher, InMemoryMovieRepository, + InMemoryReviewRepository, InMemoryUserRepository, InMemoryWatchlistRepository, + NoopEventPublisher, NoopImageStorage, PanicDiaryExporter, PanicDiaryRepository, + PanicDocumentParser, PanicImportProfileRepository, PanicImportSessionRepository, + PanicMovieProfileRepository, PanicPersonCommand, PanicPersonQuery, PanicPosterFetcher, + PanicProfileFieldsRepo, PanicSearchCommand, PanicSearchPort, PanicStatsRepository, + PanicWatchEventRepository, PanicWebhookTokenRepository, }, }; @@ -43,6 +43,8 @@ pub struct TestContextBuilder { pub import_profile_repo: Arc, pub movie_profile_repo: Arc, pub watchlist_repo: Arc, + pub watch_event_repo: Arc, + pub webhook_token_repo: Arc, pub profile_fields_repo: Arc, pub person_command: Arc, pub person_query: Arc, @@ -71,6 +73,8 @@ impl TestContextBuilder { import_profile_repo: Arc::new(PanicImportProfileRepository), movie_profile_repo: Arc::new(PanicMovieProfileRepository), watchlist_repo: InMemoryWatchlistRepository::new(), + watch_event_repo: Arc::new(PanicWatchEventRepository), + webhook_token_repo: Arc::new(PanicWebhookTokenRepository), profile_fields_repo: Arc::new(PanicProfileFieldsRepo), person_command: Arc::new(PanicPersonCommand), person_query: Arc::new(PanicPersonQuery), @@ -138,6 +142,8 @@ impl TestContextBuilder { import_profile_repository: self.import_profile_repo, movie_profile_repository: self.movie_profile_repo, watchlist_repository: self.watchlist_repo, + watch_event_repository: self.watch_event_repo, + webhook_token_repository: self.webhook_token_repo, profile_fields_repository: self.profile_fields_repo, person_command: self.person_command, person_query: self.person_query, diff --git a/crates/application/src/tests/worker.rs b/crates/application/src/tests/worker.rs index c8e94f3..c6a1a9d 100644 --- a/crates/application/src/tests/worker.rs +++ b/crates/application/src/tests/worker.rs @@ -58,6 +58,7 @@ impl EventHandler for RecordingHandler { DomainEvent::FollowAccepted { .. } => "follow_accepted", DomainEvent::BackfillFollower { .. } => "backfill_follower", DomainEvent::FederationDeliveryRequested { .. } => "federation_delivery", + DomainEvent::WatchEventIngested { .. } => "watch_event_ingested", }; self.calls.lock().unwrap().push(label); Ok(()) diff --git a/crates/application/src/use_cases/cleanup_watch_events.rs b/crates/application/src/use_cases/cleanup_watch_events.rs new file mode 100644 index 0000000..62d73b7 --- /dev/null +++ b/crates/application/src/use_cases/cleanup_watch_events.rs @@ -0,0 +1,11 @@ +use chrono::Duration; +use domain::errors::DomainError; + +use crate::context::AppContext; + +pub async fn execute(ctx: &AppContext) -> Result { + let cutoff = chrono::Utc::now().naive_utc() - Duration::days(30); + ctx.watch_event_repository + .delete_non_pending_older_than(cutoff) + .await +} diff --git a/crates/application/src/use_cases/confirm_watch_events.rs b/crates/application/src/use_cases/confirm_watch_events.rs new file mode 100644 index 0000000..a28b7c7 --- /dev/null +++ b/crates/application/src/use_cases/confirm_watch_events.rs @@ -0,0 +1,65 @@ +use domain::{ + errors::DomainError, + models::WatchEventStatus, + value_objects::{UserId, WatchEventId}, +}; + +use crate::{ + commands::{ConfirmWatchEventsCommand, LogReviewCommand, MovieInput}, + context::AppContext, + use_cases::log_review, +}; + +pub async fn execute(ctx: &AppContext, cmd: ConfirmWatchEventsCommand) -> Result { + let user_id = UserId::from_uuid(cmd.user_id); + let mut confirmed = 0u32; + + for c in cmd.confirmations { + let event_id = WatchEventId::from_uuid(c.watch_event_id); + let event = ctx + .watch_event_repository + .get_by_id(&event_id) + .await? + .ok_or_else(|| DomainError::NotFound(format!("WatchEvent {}", c.watch_event_id)))?; + + if event.user_id() != &user_id { + return Err(DomainError::Unauthorized("not your watch event".into())); + } + + let input = if let Some(movie_id) = event.movie_id() { + MovieInput { + movie_id: Some(movie_id.value()), + external_metadata_id: None, + manual_title: None, + manual_release_year: None, + manual_director: None, + } + } else { + MovieInput { + movie_id: None, + external_metadata_id: event.external_metadata_id().map(String::from), + manual_title: Some(event.title().to_string()), + manual_release_year: event.year(), + manual_director: None, + } + }; + + let review_cmd = LogReviewCommand { + user_id: cmd.user_id, + input, + rating: c.rating, + comment: c.comment, + watched_at: *event.watched_at(), + }; + + log_review::execute(ctx, review_cmd).await?; + + ctx.watch_event_repository + .update_status(&event_id, WatchEventStatus::Confirmed) + .await?; + + confirmed += 1; + } + + Ok(confirmed) +} diff --git a/crates/application/src/use_cases/dismiss_watch_events.rs b/crates/application/src/use_cases/dismiss_watch_events.rs new file mode 100644 index 0000000..faa7e3f --- /dev/null +++ b/crates/application/src/use_cases/dismiss_watch_events.rs @@ -0,0 +1,33 @@ +use domain::{ + errors::DomainError, + models::WatchEventStatus, + value_objects::{UserId, WatchEventId}, +}; + +use crate::{commands::DismissWatchEventsCommand, context::AppContext}; + +pub async fn execute(ctx: &AppContext, cmd: DismissWatchEventsCommand) -> Result { + let user_id = UserId::from_uuid(cmd.user_id); + let mut dismissed = 0u32; + + for id in cmd.event_ids { + let event_id = WatchEventId::from_uuid(id); + let event = ctx + .watch_event_repository + .get_by_id(&event_id) + .await? + .ok_or_else(|| DomainError::NotFound(format!("WatchEvent {id}")))?; + + if event.user_id() != &user_id { + return Err(DomainError::Unauthorized("not your watch event".into())); + } + + ctx.watch_event_repository + .update_status(&event_id, WatchEventStatus::Dismissed) + .await?; + + dismissed += 1; + } + + Ok(dismissed) +} diff --git a/crates/application/src/use_cases/generate_webhook_token.rs b/crates/application/src/use_cases/generate_webhook_token.rs new file mode 100644 index 0000000..5fd9b4e --- /dev/null +++ b/crates/application/src/use_cases/generate_webhook_token.rs @@ -0,0 +1,38 @@ +use domain::{errors::DomainError, models::WebhookToken, value_objects::UserId}; +use sha2::{Digest, Sha256}; + +use crate::{commands::GenerateWebhookTokenCommand, context::AppContext}; + +pub struct GeneratedWebhookToken { + pub token_plaintext: String, + pub token: WebhookToken, +} + +pub async fn execute( + ctx: &AppContext, + cmd: GenerateWebhookTokenCommand, +) -> Result { + let plaintext = generate_random_token(); + let hash = hash_token(&plaintext); + + let user_id = UserId::from_uuid(cmd.user_id); + let token = WebhookToken::new(user_id, hash, cmd.provider, cmd.label); + + ctx.webhook_token_repository.save(&token).await?; + + Ok(GeneratedWebhookToken { + token_plaintext: plaintext, + token, + }) +} + +fn generate_random_token() -> String { + let bytes: [u8; 32] = rand::random(); + hex::encode(bytes) +} + +pub fn hash_token(plaintext: &str) -> String { + let mut hasher = Sha256::new(); + hasher.update(plaintext.as_bytes()); + hex::encode(hasher.finalize()) +} diff --git a/crates/application/src/use_cases/get_watch_queue.rs b/crates/application/src/use_cases/get_watch_queue.rs new file mode 100644 index 0000000..89bc040 --- /dev/null +++ b/crates/application/src/use_cases/get_watch_queue.rs @@ -0,0 +1,11 @@ +use domain::{errors::DomainError, models::WatchEvent, value_objects::UserId}; + +use crate::{context::AppContext, queries::GetWatchQueueQuery}; + +pub async fn execute( + ctx: &AppContext, + query: GetWatchQueueQuery, +) -> Result, DomainError> { + let user_id = UserId::from_uuid(query.user_id); + ctx.watch_event_repository.list_pending(&user_id).await +} diff --git a/crates/application/src/use_cases/get_webhook_tokens.rs b/crates/application/src/use_cases/get_webhook_tokens.rs new file mode 100644 index 0000000..f263c4d --- /dev/null +++ b/crates/application/src/use_cases/get_webhook_tokens.rs @@ -0,0 +1,11 @@ +use domain::{errors::DomainError, models::WebhookToken, value_objects::UserId}; + +use crate::{context::AppContext, queries::GetWebhookTokensQuery}; + +pub async fn execute( + ctx: &AppContext, + query: GetWebhookTokensQuery, +) -> Result, DomainError> { + let user_id = UserId::from_uuid(query.user_id); + ctx.webhook_token_repository.list_by_user(&user_id).await +} diff --git a/crates/application/src/use_cases/ingest_watch_event.rs b/crates/application/src/use_cases/ingest_watch_event.rs new file mode 100644 index 0000000..66c1183 --- /dev/null +++ b/crates/application/src/use_cases/ingest_watch_event.rs @@ -0,0 +1,69 @@ +use chrono::Duration; +use domain::{ + errors::DomainError, events::DomainEvent, models::WatchEvent, ports::MediaServerParser, +}; + +use crate::{ + commands::IngestWatchEventCommand, context::AppContext, use_cases::generate_webhook_token, +}; + +pub async fn execute( + ctx: &AppContext, + cmd: IngestWatchEventCommand, + parser: &dyn MediaServerParser, +) -> Result<(), DomainError> { + let token_hash = generate_webhook_token::hash_token(&cmd.token); + let webhook_token = ctx + .webhook_token_repository + .find_by_token_hash(&token_hash) + .await? + .ok_or_else(|| DomainError::Unauthorized("invalid webhook token".into()))?; + + let _ = ctx + .webhook_token_repository + .touch_last_used(webhook_token.id()) + .await; + + let parsed = match parser.parse_playback_event(&cmd.raw_payload)? { + Some(event) => event, + None => return Ok(()), + }; + + let external_metadata_id = parsed.tmdb_id.or(parsed.imdb_id); + let user_id = webhook_token.user_id().clone(); + + if let Some(ref ext_id) = external_metadata_id { + let one_hour_ago = chrono::Utc::now().naive_utc() - Duration::hours(1); + if ctx + .watch_event_repository + .find_duplicate(&user_id, ext_id, one_hour_ago) + .await? + { + return Ok(()); + } + } + + let watched_at = chrono::Utc::now().naive_utc(); + let event = WatchEvent::new( + user_id, + parsed.title, + parsed.year, + external_metadata_id, + cmd.source, + watched_at, + None, + ); + + ctx.watch_event_repository.save(&event).await?; + + let _ = ctx + .event_publisher + .publish(&DomainEvent::WatchEventIngested { + user_id: event.user_id().clone(), + title: event.title().to_string(), + source: event.source().to_string(), + }) + .await; + + Ok(()) +} diff --git a/crates/application/src/use_cases/log_review.rs b/crates/application/src/use_cases/log_review.rs index 9a6ef84..77841f3 100644 --- a/crates/application/src/use_cases/log_review.rs +++ b/crates/application/src/use_cases/log_review.rs @@ -66,7 +66,7 @@ mod tests { use domain::{ models::Movie, - value_objects::{MovieId, MovieTitle, ReleaseYear}, + value_objects::{MovieTitle, ReleaseYear}, }; use domain::ports::MovieRepository; diff --git a/crates/application/src/use_cases/mod.rs b/crates/application/src/use_cases/mod.rs index e21f437..7d77c04 100644 --- a/crates/application/src/use_cases/mod.rs +++ b/crates/application/src/use_cases/mod.rs @@ -2,12 +2,16 @@ pub mod add_to_watchlist; pub mod apply_import_mapping; pub mod apply_import_profile; pub mod cleanup_expired_import_sessions; +pub mod cleanup_watch_events; +pub mod confirm_watch_events; pub mod create_import_session; pub mod delete_import_profile; pub mod delete_review; +pub mod dismiss_watch_events; pub mod enrich_movie; pub mod execute_import; pub mod export_diary; +pub mod generate_webhook_token; pub mod get_activity_feed; pub mod get_current_profile; pub mod get_diary; @@ -20,8 +24,11 @@ pub mod get_remote_watchlist; pub mod get_review_history; pub mod get_user_profile; pub mod get_users; +pub mod get_watch_queue; pub mod get_watchlist; pub mod get_watchlist_page; +pub mod get_webhook_tokens; +pub mod ingest_watch_event; pub mod is_on_watchlist; pub mod list_import_profiles; pub mod log_review; @@ -29,6 +36,7 @@ pub mod login; pub mod register; pub mod register_and_login; pub mod remove_from_watchlist; +pub mod revoke_webhook_token; pub mod save_import_profile; pub mod search; pub mod sync_poster; diff --git a/crates/application/src/use_cases/revoke_webhook_token.rs b/crates/application/src/use_cases/revoke_webhook_token.rs new file mode 100644 index 0000000..dc288f4 --- /dev/null +++ b/crates/application/src/use_cases/revoke_webhook_token.rs @@ -0,0 +1,14 @@ +use domain::{ + errors::DomainError, + value_objects::{UserId, WebhookTokenId}, +}; + +use crate::{commands::RevokeWebhookTokenCommand, context::AppContext}; + +pub async fn execute(ctx: &AppContext, cmd: RevokeWebhookTokenCommand) -> Result<(), DomainError> { + let user_id = UserId::from_uuid(cmd.user_id); + let token_id = WebhookTokenId::from_uuid(cmd.token_id); + ctx.webhook_token_repository + .delete(&token_id, &user_id) + .await +} diff --git a/crates/domain/src/events.rs b/crates/domain/src/events.rs index 4abc182..90e5f07 100644 --- a/crates/domain/src/events.rs +++ b/crates/domain/src/events.rs @@ -70,6 +70,11 @@ pub enum DomainEvent { activity_json: String, signing_actor_id: uuid::Uuid, }, + WatchEventIngested { + user_id: UserId, + title: String, + source: String, + }, } #[async_trait] diff --git a/crates/domain/src/models/mod.rs b/crates/domain/src/models/mod.rs index 48adadc..6eb3da1 100644 --- a/crates/domain/src/models/mod.rs +++ b/crates/domain/src/models/mod.rs @@ -18,6 +18,11 @@ pub mod watchlist; pub use watchlist::{WatchlistEntry, WatchlistWithMovie}; pub mod remote_watchlist; pub use remote_watchlist::RemoteWatchlistEntry; +pub mod watch_event; +pub use watch_event::{ + ParsedPlaybackEvent, PersistedWatchEvent, WatchEvent, WatchEventSource, WatchEventStatus, + WebhookToken, +}; pub use import::{ AnnotatedRow, DomainField, FieldMapping, FileFormat, ImportError, ImportRow, ParsedFile, diff --git a/crates/domain/src/models/watch_event.rs b/crates/domain/src/models/watch_event.rs new file mode 100644 index 0000000..de225eb --- /dev/null +++ b/crates/domain/src/models/watch_event.rs @@ -0,0 +1,236 @@ +use chrono::NaiveDateTime; + +use crate::value_objects::{MovieId, UserId, WatchEventId, WebhookTokenId}; + +#[derive(Clone, Debug, PartialEq, Eq)] +pub enum WatchEventSource { + Jellyfin, + Plex, +} + +impl std::fmt::Display for WatchEventSource { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::Jellyfin => write!(f, "jellyfin"), + Self::Plex => write!(f, "plex"), + } + } +} + +impl std::str::FromStr for WatchEventSource { + type Err = String; + fn from_str(s: &str) -> Result { + match s { + "jellyfin" => Ok(Self::Jellyfin), + "plex" => Ok(Self::Plex), + other => Err(format!("unknown watch event source: {other}")), + } + } +} + +#[derive(Clone, Debug, PartialEq, Eq, Default)] +pub enum WatchEventStatus { + #[default] + Pending, + Confirmed, + Dismissed, +} + +impl std::fmt::Display for WatchEventStatus { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::Pending => write!(f, "pending"), + Self::Confirmed => write!(f, "confirmed"), + Self::Dismissed => write!(f, "dismissed"), + } + } +} + +impl std::str::FromStr for WatchEventStatus { + type Err = String; + fn from_str(s: &str) -> Result { + match s { + "pending" => Ok(Self::Pending), + "confirmed" => Ok(Self::Confirmed), + "dismissed" => Ok(Self::Dismissed), + other => Err(format!("unknown watch event status: {other}")), + } + } +} + +pub struct PersistedWatchEvent { + pub id: WatchEventId, + pub user_id: UserId, + pub movie_id: Option, + pub title: String, + pub year: Option, + pub external_metadata_id: Option, + pub source: WatchEventSource, + pub watched_at: NaiveDateTime, + pub status: WatchEventStatus, + pub created_at: NaiveDateTime, +} + +#[derive(Clone, Debug)] +pub struct WatchEvent { + id: WatchEventId, + user_id: UserId, + movie_id: Option, + title: String, + year: Option, + external_metadata_id: Option, + source: WatchEventSource, + watched_at: NaiveDateTime, + status: WatchEventStatus, + created_at: NaiveDateTime, +} + +impl WatchEvent { + pub fn new( + user_id: UserId, + title: String, + year: Option, + external_metadata_id: Option, + source: WatchEventSource, + watched_at: NaiveDateTime, + movie_id: Option, + ) -> Self { + Self { + id: WatchEventId::generate(), + user_id, + movie_id, + title, + year, + external_metadata_id, + source, + watched_at, + status: WatchEventStatus::Pending, + created_at: chrono::Utc::now().naive_utc(), + } + } + + pub fn from_persistence(row: PersistedWatchEvent) -> Self { + Self { + id: row.id, + user_id: row.user_id, + movie_id: row.movie_id, + title: row.title, + year: row.year, + external_metadata_id: row.external_metadata_id, + source: row.source, + watched_at: row.watched_at, + status: row.status, + created_at: row.created_at, + } + } + + pub fn id(&self) -> &WatchEventId { + &self.id + } + pub fn user_id(&self) -> &UserId { + &self.user_id + } + pub fn movie_id(&self) -> Option<&MovieId> { + self.movie_id.as_ref() + } + pub fn title(&self) -> &str { + &self.title + } + pub fn year(&self) -> Option { + self.year + } + pub fn external_metadata_id(&self) -> Option<&str> { + self.external_metadata_id.as_deref() + } + pub fn source(&self) -> &WatchEventSource { + &self.source + } + pub fn watched_at(&self) -> &NaiveDateTime { + &self.watched_at + } + pub fn status(&self) -> &WatchEventStatus { + &self.status + } + pub fn created_at(&self) -> &NaiveDateTime { + &self.created_at + } +} + +#[derive(Clone, Debug)] +pub struct WebhookToken { + id: WebhookTokenId, + user_id: UserId, + token_hash: String, + provider: WatchEventSource, + label: Option, + created_at: NaiveDateTime, + last_used_at: Option, +} + +impl WebhookToken { + pub fn new( + user_id: UserId, + token_hash: String, + provider: WatchEventSource, + label: Option, + ) -> Self { + Self { + id: WebhookTokenId::generate(), + user_id, + token_hash, + provider, + label, + created_at: chrono::Utc::now().naive_utc(), + last_used_at: None, + } + } + + pub fn from_persistence( + id: WebhookTokenId, + user_id: UserId, + token_hash: String, + provider: WatchEventSource, + label: Option, + created_at: NaiveDateTime, + last_used_at: Option, + ) -> Self { + Self { + id, + user_id, + token_hash, + provider, + label, + created_at, + last_used_at, + } + } + + pub fn id(&self) -> &WebhookTokenId { + &self.id + } + pub fn user_id(&self) -> &UserId { + &self.user_id + } + pub fn token_hash(&self) -> &str { + &self.token_hash + } + pub fn provider(&self) -> &WatchEventSource { + &self.provider + } + pub fn label(&self) -> Option<&str> { + self.label.as_deref() + } + pub fn created_at(&self) -> &NaiveDateTime { + &self.created_at + } + pub fn last_used_at(&self) -> Option<&NaiveDateTime> { + self.last_used_at.as_ref() + } +} + +pub struct ParsedPlaybackEvent { + pub title: String, + pub year: Option, + pub tmdb_id: Option, + pub imdb_id: Option, +} diff --git a/crates/domain/src/ports.rs b/crates/domain/src/ports.rs index 5ef97c3..c10def7 100644 --- a/crates/domain/src/ports.rs +++ b/crates/domain/src/ports.rs @@ -8,14 +8,15 @@ use crate::{ AnnotatedRow, DiaryEntry, DiaryFilter, EntityType, ExportFormat, ExternalPersonId, FeedEntry, FieldMapping, FileFormat, ImportError, ImportProfile, ImportSession, IndexableDocument, Movie, MovieFilter, MovieProfile, MovieStats, MovieSummary, ParsedFile, - Person, PersonCredits, PersonId, RemoteWatchlistEntry, Review, ReviewHistory, SearchQuery, - SearchResults, User, UserStats, UserSummary, UserTrends, WatchlistEntry, - WatchlistWithMovie, + ParsedPlaybackEvent, Person, PersonCredits, PersonId, RemoteWatchlistEntry, Review, + ReviewHistory, SearchQuery, SearchResults, User, UserStats, UserSummary, UserTrends, + WatchEvent, WatchEventStatus, WatchlistEntry, WatchlistWithMovie, WebhookToken, collections::{self, PageParams, Paginated}, }, value_objects::{ Email, ExternalMetadataId, ImportProfileId, ImportSessionId, MovieId, MovieTitle, - PasswordHash, PosterUrl, ReleaseYear, ReviewId, UserId, Username, + PasswordHash, PosterUrl, ReleaseYear, ReviewId, UserId, Username, WatchEventId, + WebhookTokenId, }, }; @@ -415,3 +416,41 @@ pub trait LocalApContentQuery: Send + Sync { limit: usize, ) -> Result, DomainError>; } + +// ── Media server integration ────────────────────────────────────────────────── + +pub trait MediaServerParser: Send + Sync { + fn parse_playback_event(&self, body: &[u8]) + -> Result, DomainError>; +} + +#[async_trait] +pub trait WatchEventRepository: Send + Sync { + async fn save(&self, event: &WatchEvent) -> Result<(), DomainError>; + async fn update_status( + &self, + id: &WatchEventId, + status: WatchEventStatus, + ) -> Result<(), DomainError>; + async fn list_pending(&self, user_id: &UserId) -> Result, DomainError>; + async fn get_by_id(&self, id: &WatchEventId) -> Result, DomainError>; + async fn find_duplicate( + &self, + user_id: &UserId, + external_id: &str, + after: chrono::NaiveDateTime, + ) -> Result; + async fn delete_non_pending_older_than( + &self, + before: chrono::NaiveDateTime, + ) -> Result; +} + +#[async_trait] +pub trait WebhookTokenRepository: Send + Sync { + async fn save(&self, token: &WebhookToken) -> Result<(), DomainError>; + async fn find_by_token_hash(&self, hash: &str) -> Result, DomainError>; + async fn list_by_user(&self, user_id: &UserId) -> Result, DomainError>; + async fn delete(&self, id: &WebhookTokenId, user_id: &UserId) -> Result<(), DomainError>; + async fn touch_last_used(&self, id: &WebhookTokenId) -> Result<(), DomainError>; +} diff --git a/crates/domain/src/testing.rs b/crates/domain/src/testing.rs index 9d0f322..d9ad879 100644 --- a/crates/domain/src/testing.rs +++ b/crates/domain/src/testing.rs @@ -839,3 +839,83 @@ impl crate::ports::SocialQueryPort for NoopSocialQueryPort { Ok(vec![]) } } + +// ── PanicWatchEventRepository ──────────────────────────────────────────────── + +pub struct PanicWatchEventRepository; + +#[async_trait] +impl crate::ports::WatchEventRepository for PanicWatchEventRepository { + async fn save(&self, _: &crate::models::WatchEvent) -> Result<(), DomainError> { + panic!("PanicWatchEventRepository called") + } + async fn update_status( + &self, + _: &crate::value_objects::WatchEventId, + _: crate::models::WatchEventStatus, + ) -> Result<(), DomainError> { + panic!("PanicWatchEventRepository called") + } + async fn list_pending( + &self, + _: &UserId, + ) -> Result, DomainError> { + panic!("PanicWatchEventRepository called") + } + async fn get_by_id( + &self, + _: &crate::value_objects::WatchEventId, + ) -> Result, DomainError> { + panic!("PanicWatchEventRepository called") + } + async fn find_duplicate( + &self, + _: &UserId, + _: &str, + _: chrono::NaiveDateTime, + ) -> Result { + panic!("PanicWatchEventRepository called") + } + async fn delete_non_pending_older_than( + &self, + _: chrono::NaiveDateTime, + ) -> Result { + panic!("PanicWatchEventRepository called") + } +} + +// ── PanicWebhookTokenRepository ────────────────────────────────────────────── + +pub struct PanicWebhookTokenRepository; + +#[async_trait] +impl crate::ports::WebhookTokenRepository for PanicWebhookTokenRepository { + async fn save(&self, _: &crate::models::WebhookToken) -> Result<(), DomainError> { + panic!("PanicWebhookTokenRepository called") + } + async fn find_by_token_hash( + &self, + _: &str, + ) -> Result, DomainError> { + panic!("PanicWebhookTokenRepository called") + } + async fn list_by_user( + &self, + _: &UserId, + ) -> Result, DomainError> { + panic!("PanicWebhookTokenRepository called") + } + async fn delete( + &self, + _: &crate::value_objects::WebhookTokenId, + _: &UserId, + ) -> Result<(), DomainError> { + panic!("PanicWebhookTokenRepository called") + } + async fn touch_last_used( + &self, + _: &crate::value_objects::WebhookTokenId, + ) -> Result<(), DomainError> { + panic!("PanicWebhookTokenRepository called") + } +} diff --git a/crates/domain/src/value_objects.rs b/crates/domain/src/value_objects.rs index 42a7e22..f614066 100644 --- a/crates/domain/src/value_objects.rs +++ b/crates/domain/src/value_objects.rs @@ -26,6 +26,8 @@ uuid_id!(UserId); uuid_id!(ImportSessionId); uuid_id!(ImportProfileId); uuid_id!(WatchlistEntryId); +uuid_id!(WatchEventId); +uuid_id!(WebhookTokenId); #[derive(Clone, Debug, PartialEq, Eq)] pub struct ExternalMetadataId(String); diff --git a/crates/presentation/Cargo.toml b/crates/presentation/Cargo.toml index b835c2c..c990546 100644 --- a/crates/presentation/Cargo.toml +++ b/crates/presentation/Cargo.toml @@ -53,6 +53,8 @@ nats = { workspace = true, optional = true } rss = { workspace = true } export = { workspace = true } importer = { workspace = true } +jellyfin = { workspace = true } +plex = { workspace = true } sqlx = { workspace = true } utoipa = { version = "5.5.0", features = ["axum_extras", "uuid"] } utoipa-scalar = { version = "0.3.0", features = ["axum"], default-features = false } diff --git a/crates/presentation/src/factory.rs b/crates/presentation/src/factory.rs index b31703d..856a499 100644 --- a/crates/presentation/src/factory.rs +++ b/crates/presentation/src/factory.rs @@ -6,7 +6,8 @@ use domain::ports::{ AuthService, DiaryRepository, ImageStorage, ImportProfileRepository, ImportSessionRepository, LocalApContentQuery, MetadataClient, MovieProfileRepository, MovieRepository, PasswordHasher, PersonCommand, PersonQuery, PosterFetcherClient, ReviewRepository, SearchCommand, SearchPort, - StatsRepository, UserProfileFieldsRepository, UserRepository, WatchlistRepository, + StatsRepository, UserProfileFieldsRepository, UserRepository, WatchEventRepository, + WatchlistRepository, WebhookTokenRepository, }; pub struct DatabaseAdapters { @@ -25,6 +26,8 @@ pub struct DatabaseAdapters { pub search_port: Arc, pub search_command: Arc, pub profile_fields_repo: Arc, + pub watch_event_repo: Arc, + pub webhook_token_repo: Arc, pub db_pool: DbPool, } @@ -45,6 +48,10 @@ pub async fn build_database_adapters(backend: &str, url: &str) -> anyhow::Result let (pc, pq) = postgres::create_person_adapter(pool.clone()); let (sc, sp) = postgres_search::create_search_adapter(pool.clone()); let pf = postgres::create_profile_fields_repo(pool.clone()); + let we: Arc = + Arc::new(postgres::PostgresWatchEventRepository::new(pool.clone())); + let wt: Arc = + Arc::new(postgres::PostgresWebhookTokenRepository::new(pool.clone())); Ok(DatabaseAdapters { movie_repo: m, review_repo: r, @@ -61,6 +68,8 @@ pub async fn build_database_adapters(backend: &str, url: &str) -> anyhow::Result search_port: sp, search_command: sc, profile_fields_repo: pf, + watch_event_repo: we, + webhook_token_repo: wt, db_pool: DbPool::Postgres(pool), }) } @@ -72,6 +81,10 @@ pub async fn build_database_adapters(backend: &str, url: &str) -> anyhow::Result let (pc, pq) = sqlite::create_person_adapter(pool.clone()); let (sc, sp) = sqlite_search::create_search_adapter(pool.clone()); let pf = sqlite::create_profile_fields_repo(pool.clone()); + let we: Arc = + Arc::new(sqlite::SqliteWatchEventRepository::new(pool.clone())); + let wt: Arc = + Arc::new(sqlite::SqliteWebhookTokenRepository::new(pool.clone())); Ok(DatabaseAdapters { movie_repo: m, review_repo: r, @@ -88,6 +101,8 @@ pub async fn build_database_adapters(backend: &str, url: &str) -> anyhow::Result search_port: sp, search_command: sc, profile_fields_repo: pf, + watch_event_repo: we, + webhook_token_repo: wt, db_pool: DbPool::Sqlite(pool), }) } diff --git a/crates/presentation/src/forms.rs b/crates/presentation/src/forms.rs index 0d5ad8a..b57c69f 100644 --- a/crates/presentation/src/forms.rs +++ b/crates/presentation/src/forms.rs @@ -261,6 +261,43 @@ pub fn to_diary_query(p: DiaryQueryParams) -> GetDiaryQuery { } } +// ── Integrations forms ──────────────────────────────────────────────────────── + +#[derive(Deserialize)] +pub struct GenerateTokenForm { + pub provider: String, + #[serde(default)] + pub label: Option, + #[serde(rename = "_csrf", default)] + pub csrf_token: String, +} + +#[derive(Deserialize)] +pub struct RevokeTokenForm { + #[serde(rename = "_csrf", default)] + pub csrf_token: String, +} + +#[derive(Deserialize, Default)] +pub struct IntegrationsQuery { + pub token: Option, +} + +#[derive(Deserialize)] +pub struct ConfirmWatchForm { + pub rating: u8, + #[serde(default)] + pub comment: Option, + #[serde(rename = "_csrf", default)] + pub csrf_token: String, +} + +#[derive(Deserialize)] +pub struct DismissWatchForm { + #[serde(rename = "_csrf", default)] + pub csrf_token: String, +} + #[cfg(test)] #[path = "tests/forms.rs"] mod tests; diff --git a/crates/presentation/src/handlers/html.rs b/crates/presentation/src/handlers/html.rs index 99bb089..ca4a388 100644 --- a/crates/presentation/src/handlers/html.rs +++ b/crates/presentation/src/handlers/html.rs @@ -16,17 +16,24 @@ use application::ports::{ }; use application::{ commands::{ - AddToWatchlistCommand, DeleteReviewCommand, MovieInput, RemoveFromWatchlistCommand, + AddToWatchlistCommand, ConfirmWatchEventsCommand, DeleteReviewCommand, + DismissWatchEventsCommand, GenerateWebhookTokenCommand, MovieInput, + RemoveFromWatchlistCommand, RevokeWebhookTokenCommand, WatchEventConfirmation, }, ports::{ - HtmlPageContext, LoginPageData, MovieDetailPageData, NewReviewPageData, - ProfileSettingsPageData, RegisterPageData, RemoteActorView, WatchlistPageData, + HtmlPageContext, IntegrationsPageData, LoginPageData, MovieDetailPageData, + NewReviewPageData, ProfileSettingsPageData, RegisterPageData, RemoteActorView, + WatchQueueDisplayEntry, WatchQueuePageData, WatchlistPageData, WebhookTokenView, + }, + queries::{ + ExportQuery, GetMovieSocialPageQuery, GetWatchQueueQuery, GetWebhookTokensQuery, + IsOnWatchlistQuery, LoginQuery, }, - queries::{ExportQuery, GetMovieSocialPageQuery, IsOnWatchlistQuery, LoginQuery}, use_cases::{ - add_to_watchlist, delete_review, export_diary as export_diary_uc, get_movie_social_page, - is_on_watchlist, log_review, login as login_uc, remove_from_watchlist, update_profile, - update_profile_fields, + add_to_watchlist, confirm_watch_events, delete_review, dismiss_watch_events, + export_diary as export_diary_uc, generate_webhook_token, get_movie_social_page, + get_watch_queue, get_webhook_tokens, is_on_watchlist, log_review, login as login_uc, + remove_from_watchlist, revoke_webhook_token, update_profile, update_profile_fields, }, }; use domain::models::ExportFormat; @@ -1499,3 +1506,209 @@ pub async fn post_profile_settings( Redirect::to("/settings/profile?saved=1").into_response() } + +// ── Integrations ────────────────────────────────────────────────────────────── + +pub async fn get_integrations_page( + RequiredCookieUser(user_id): RequiredCookieUser, + State(state): State, + Query(params): Query, + Extension(csrf): Extension, +) -> impl IntoResponse { + let mut ctx = build_page_context(&state, Some(user_id.clone()), csrf.0).await; + ctx.page_title = "Integrations — Movies Diary".to_string(); + ctx.canonical_url = format!("{}/settings/integrations", state.app_ctx.config.base_url); + + let query = GetWebhookTokensQuery { + user_id: user_id.value(), + }; + let tokens = get_webhook_tokens::execute(&state.app_ctx, query) + .await + .unwrap_or_default(); + + let token_views: Vec = tokens + .into_iter() + .map(|t| WebhookTokenView { + id: t.id().value().to_string(), + provider: t.provider().to_string(), + label: t.label().map(String::from), + created_at: t.created_at().format("%Y-%m-%d %H:%M").to_string(), + last_used_at: t + .last_used_at() + .map(|d| d.format("%Y-%m-%d %H:%M").to_string()), + }) + .collect(); + + let data = IntegrationsPageData { + ctx, + tokens: token_views, + webhook_base_url: state.app_ctx.config.base_url.clone(), + new_token: params.token, + }; + + match state.html_renderer.render_integrations_page(data) { + Ok(html) => Html(html).into_response(), + Err(e) => { + tracing::error!("integrations template error: {}", e); + StatusCode::INTERNAL_SERVER_ERROR.into_response() + } + } +} + +pub async fn post_generate_token( + RequiredCookieUser(user_id): RequiredCookieUser, + State(state): State, + Extension(csrf): Extension, + Form(form): Form, +) -> impl IntoResponse { + if crate::csrf::mismatch(&csrf, &form.csrf_token) { + return StatusCode::FORBIDDEN.into_response(); + } + + let provider = match form.provider.parse::() { + Ok(p) => p, + Err(_) => return Redirect::to("/settings/integrations").into_response(), + }; + + let cmd = GenerateWebhookTokenCommand { + user_id: user_id.value(), + provider, + label: form.label.filter(|l| !l.trim().is_empty()), + }; + + match generate_webhook_token::execute(&state.app_ctx, cmd).await { + Ok(result) => { + let encoded = percent_encoding::utf8_percent_encode( + &result.token_plaintext, + percent_encoding::NON_ALPHANUMERIC, + ); + Redirect::to(&format!("/settings/integrations?token={encoded}")).into_response() + } + Err(e) => { + tracing::error!("generate token failed: {:?}", e); + Redirect::to("/settings/integrations").into_response() + } + } +} + +pub async fn post_revoke_token( + RequiredCookieUser(user_id): RequiredCookieUser, + State(state): State, + Path(token_id): Path, + Extension(csrf): Extension, + Form(form): Form, +) -> impl IntoResponse { + if crate::csrf::mismatch(&csrf, &form.csrf_token) { + return StatusCode::FORBIDDEN.into_response(); + } + + let cmd = RevokeWebhookTokenCommand { + user_id: user_id.value(), + token_id, + }; + if let Err(e) = revoke_webhook_token::execute(&state.app_ctx, cmd).await { + tracing::error!("revoke token failed: {:?}", e); + } + + Redirect::to("/settings/integrations").into_response() +} + +// ── Watch Queue ─────────────────────────────────────────────────────────────── + +pub async fn get_watch_queue_page( + RequiredCookieUser(user_id): RequiredCookieUser, + State(state): State, + Query(params): Query, + Extension(csrf): Extension, +) -> impl IntoResponse { + let mut ctx = build_page_context(&state, Some(user_id.clone()), csrf.0).await; + ctx.page_title = "Watch Queue — Movies Diary".to_string(); + ctx.canonical_url = format!("{}/watch-queue", state.app_ctx.config.base_url); + + let query = GetWatchQueueQuery { + user_id: user_id.value(), + }; + let events = get_watch_queue::execute(&state.app_ctx, query) + .await + .unwrap_or_default(); + + let entries: Vec = events + .into_iter() + .map(|e| WatchQueueDisplayEntry { + id: e.id().value().to_string(), + title: e.title().to_string(), + year: e.year(), + source: e.source().to_string(), + watched_at: e.watched_at().format("%Y-%m-%d %H:%M").to_string(), + movie_url: e.movie_id().map(|m| format!("/movies/{}", m.value())), + }) + .collect(); + + let data = WatchQueuePageData { + ctx, + entries, + error: params.error, + }; + + match state.html_renderer.render_watch_queue_page(data) { + Ok(html) => Html(html).into_response(), + Err(e) => { + tracing::error!("watch_queue template error: {}", e); + StatusCode::INTERNAL_SERVER_ERROR.into_response() + } + } +} + +pub async fn post_confirm_single( + RequiredCookieUser(user_id): RequiredCookieUser, + State(state): State, + Path(event_id): Path, + Extension(csrf): Extension, + Form(form): Form, +) -> impl IntoResponse { + if crate::csrf::mismatch(&csrf, &form.csrf_token) { + return StatusCode::FORBIDDEN.into_response(); + } + + let cmd = ConfirmWatchEventsCommand { + user_id: user_id.value(), + confirmations: vec![WatchEventConfirmation { + watch_event_id: event_id, + rating: form.rating, + comment: form.comment.filter(|c| !c.trim().is_empty()), + }], + }; + + match confirm_watch_events::execute(&state.app_ctx, cmd).await { + Ok(_) => Redirect::to("/watch-queue").into_response(), + Err(e) => { + let msg = encode_error(&e.to_string()); + Redirect::to(&format!("/watch-queue?error={msg}")).into_response() + } + } +} + +pub async fn post_dismiss_single( + RequiredCookieUser(user_id): RequiredCookieUser, + State(state): State, + Path(event_id): Path, + Extension(csrf): Extension, + Form(form): Form, +) -> impl IntoResponse { + if crate::csrf::mismatch(&csrf, &form.csrf_token) { + return StatusCode::FORBIDDEN.into_response(); + } + + let cmd = DismissWatchEventsCommand { + user_id: user_id.value(), + event_ids: vec![event_id], + }; + + match dismiss_watch_events::execute(&state.app_ctx, cmd).await { + Ok(_) => Redirect::to("/watch-queue").into_response(), + Err(e) => { + let msg = encode_error(&e.to_string()); + Redirect::to(&format!("/watch-queue?error={msg}")).into_response() + } + } +} diff --git a/crates/presentation/src/handlers/mod.rs b/crates/presentation/src/handlers/mod.rs index d6b9ff5..7383f8e 100644 --- a/crates/presentation/src/handlers/mod.rs +++ b/crates/presentation/src/handlers/mod.rs @@ -3,6 +3,7 @@ pub mod html; pub mod images; pub mod import; pub mod rss; +pub mod webhook; const DEFAULT_PAGE_LIMIT: u32 = 5; const RSS_FEED_LIMIT: u32 = 50; diff --git a/crates/presentation/src/handlers/webhook.rs b/crates/presentation/src/handlers/webhook.rs new file mode 100644 index 0000000..d527767 --- /dev/null +++ b/crates/presentation/src/handlers/webhook.rs @@ -0,0 +1,319 @@ +use axum::{ + Json, + extract::{Multipart, Path, State}, + http::{HeaderMap, StatusCode}, + response::IntoResponse, +}; +use uuid::Uuid; + +use api_types::{ + ConfirmWatchRequest, ConfirmWatchResponse, DismissWatchRequest, DismissWatchResponse, + GenerateTokenRequest, GenerateTokenResponse, WatchQueueEntryDto, WebhookTokenDto, +}; +use application::{ + commands::{ + ConfirmWatchEventsCommand, DismissWatchEventsCommand, GenerateWebhookTokenCommand, + IngestWatchEventCommand, RevokeWebhookTokenCommand, WatchEventConfirmation, + }, + queries::{GetWatchQueueQuery, GetWebhookTokensQuery}, + use_cases::{ + confirm_watch_events, dismiss_watch_events, generate_webhook_token, get_watch_queue, + get_webhook_tokens, ingest_watch_event, revoke_webhook_token, + }, +}; +use domain::models::WatchEventSource; + +use crate::{errors::ApiError, extractors::AuthenticatedUser, state::AppState}; + +fn extract_bearer_token(headers: &HeaderMap) -> Option { + headers + .get("authorization") + .and_then(|v| v.to_str().ok()) + .and_then(|v| v.strip_prefix("Bearer ")) + .map(|t| t.trim().to_string()) +} + +#[derive(serde::Deserialize, Default)] +pub struct WebhookTokenQuery { + pub token: Option, +} + +fn extract_webhook_token(headers: &HeaderMap, query: &WebhookTokenQuery) -> Option { + extract_bearer_token(headers).or_else(|| query.token.clone()) +} + +// ── Webhook ingestion (no JWT, uses webhook bearer token) ───────────────────── + +#[utoipa::path( + post, path = "/api/v1/webhooks/jellyfin", + request_body(content = String, description = "Jellyfin webhook JSON payload (SendAllProperties=true)", content_type = "application/json"), + responses( + (status = 200, description = "Event accepted or ignored"), + (status = 400, description = "Invalid payload"), + (status = 401, description = "Invalid or missing webhook token"), + ), + security(("bearer_auth" = [])) +)] +pub async fn post_jellyfin_webhook( + State(state): State, + headers: HeaderMap, + axum::extract::Query(query): axum::extract::Query, + body: axum::body::Bytes, +) -> impl IntoResponse { + let token = match extract_webhook_token(&headers, &query) { + Some(t) => t, + None => return StatusCode::UNAUTHORIZED, + }; + + let cmd = IngestWatchEventCommand { + token, + raw_payload: body.to_vec(), + source: WatchEventSource::Jellyfin, + }; + + run_ingest(&state, cmd, &jellyfin::JellyfinParser).await +} + +// ── Plex webhook (multipart form data with `payload` JSON field) ────────────── + +#[utoipa::path( + post, path = "/api/v1/webhooks/plex", + request_body(content = String, description = "Plex webhook multipart form (payload field contains JSON)", content_type = "multipart/form-data"), + responses( + (status = 200, description = "Event accepted or ignored"), + (status = 400, description = "Invalid payload"), + (status = 401, description = "Invalid or missing webhook token"), + ), + security(("bearer_auth" = [])) +)] +pub async fn post_plex_webhook( + State(state): State, + headers: HeaderMap, + axum::extract::Query(query): axum::extract::Query, + mut multipart: Multipart, +) -> impl IntoResponse { + let token = match extract_webhook_token(&headers, &query) { + Some(t) => t, + None => return StatusCode::UNAUTHORIZED, + }; + + let mut payload_bytes: Option> = None; + while let Ok(Some(field)) = multipart.next_field().await { + if field.name() == Some("payload") + && let Ok(bytes) = field.bytes().await + { + payload_bytes = Some(bytes.to_vec()); + break; + } + } + + let raw_payload = match payload_bytes { + Some(b) => b, + None => return StatusCode::BAD_REQUEST, + }; + + let cmd = IngestWatchEventCommand { + token, + raw_payload, + source: WatchEventSource::Plex, + }; + + run_ingest(&state, cmd, &plex::PlexParser).await +} + +async fn run_ingest( + state: &AppState, + cmd: IngestWatchEventCommand, + parser: &dyn domain::ports::MediaServerParser, +) -> StatusCode { + match ingest_watch_event::execute(&state.app_ctx, cmd, parser).await { + Ok(()) => StatusCode::OK, + Err(domain::errors::DomainError::Unauthorized(_)) => StatusCode::UNAUTHORIZED, + Err(domain::errors::DomainError::ValidationError(_)) => StatusCode::BAD_REQUEST, + Err(e) => { + tracing::error!("webhook ingestion failed: {e:?}"); + StatusCode::INTERNAL_SERVER_ERROR + } + } +} + +// ── Token management (JWT-authenticated) ────────────────────────────────────── + +#[utoipa::path( + post, path = "/api/v1/settings/webhook-tokens", + request_body = GenerateTokenRequest, + responses( + (status = 200, body = GenerateTokenResponse), + (status = 401, description = "Unauthorized"), + ), + security(("bearer_auth" = [])) +)] +pub async fn post_generate_webhook_token( + State(state): State, + user: AuthenticatedUser, + Json(req): Json, +) -> Result, ApiError> { + let provider: WatchEventSource = req + .provider + .parse() + .map_err(|e: String| domain::errors::DomainError::ValidationError(e))?; + + let cmd = GenerateWebhookTokenCommand { + user_id: user.0.value(), + provider: provider.clone(), + label: req.label, + }; + + let result = generate_webhook_token::execute(&state.app_ctx, cmd).await?; + + let base_url = &state.app_ctx.config.base_url; + let webhook_url = format!("{base_url}/api/v1/webhooks/{provider}"); + + Ok(Json(GenerateTokenResponse { + id: result.token.id().value().to_string(), + token: result.token_plaintext, + webhook_url, + })) +} + +#[utoipa::path( + get, path = "/api/v1/settings/webhook-tokens", + responses( + (status = 200, body = Vec), + (status = 401, description = "Unauthorized"), + ), + security(("bearer_auth" = [])) +)] +pub async fn get_webhook_tokens( + State(state): State, + user: AuthenticatedUser, +) -> Result>, ApiError> { + let query = GetWebhookTokensQuery { + user_id: user.0.value(), + }; + let tokens = get_webhook_tokens::execute(&state.app_ctx, query).await?; + + let dtos = tokens + .into_iter() + .map(|t| WebhookTokenDto { + id: t.id().value().to_string(), + provider: t.provider().to_string(), + label: t.label().map(String::from), + created_at: t.created_at().to_string(), + last_used_at: t.last_used_at().map(|d| d.to_string()), + }) + .collect(); + + Ok(Json(dtos)) +} + +#[utoipa::path( + delete, path = "/api/v1/settings/webhook-tokens/{id}", + params(("id" = Uuid, Path, description = "Webhook token ID")), + responses( + (status = 204, description = "Token revoked"), + (status = 401, description = "Unauthorized"), + (status = 404, description = "Token not found"), + ), + security(("bearer_auth" = [])) +)] +pub async fn delete_webhook_token( + State(state): State, + user: AuthenticatedUser, + Path(id): Path, +) -> Result { + let cmd = RevokeWebhookTokenCommand { + user_id: user.0.value(), + token_id: id, + }; + revoke_webhook_token::execute(&state.app_ctx, cmd).await?; + Ok(StatusCode::NO_CONTENT) +} + +// ── Watch queue (JWT-authenticated) ─────────────────────────────────────────── + +#[utoipa::path( + get, path = "/api/v1/watch-queue", + responses( + (status = 200, body = Vec), + (status = 401, description = "Unauthorized"), + ), + security(("bearer_auth" = [])) +)] +pub async fn get_watch_queue( + State(state): State, + user: AuthenticatedUser, +) -> Result>, ApiError> { + let query = GetWatchQueueQuery { + user_id: user.0.value(), + }; + let events = get_watch_queue::execute(&state.app_ctx, query).await?; + + let dtos = events + .into_iter() + .map(|e| WatchQueueEntryDto { + id: e.id().value().to_string(), + title: e.title().to_string(), + year: e.year(), + movie_id: e.movie_id().map(|m| m.value().to_string()), + source: e.source().to_string(), + watched_at: e.watched_at().to_string(), + }) + .collect(); + + Ok(Json(dtos)) +} + +#[utoipa::path( + post, path = "/api/v1/watch-queue/confirm", + request_body = ConfirmWatchRequest, + responses( + (status = 200, body = ConfirmWatchResponse), + (status = 401, description = "Unauthorized"), + ), + security(("bearer_auth" = [])) +)] +pub async fn post_confirm_watch_events( + State(state): State, + user: AuthenticatedUser, + Json(req): Json, +) -> Result, ApiError> { + let cmd = ConfirmWatchEventsCommand { + user_id: user.0.value(), + confirmations: req + .confirmations + .into_iter() + .map(|c| WatchEventConfirmation { + watch_event_id: c.watch_event_id, + rating: c.rating, + comment: c.comment, + }) + .collect(), + }; + + let confirmed = confirm_watch_events::execute(&state.app_ctx, cmd).await?; + Ok(Json(ConfirmWatchResponse { confirmed })) +} + +#[utoipa::path( + post, path = "/api/v1/watch-queue/dismiss", + request_body = DismissWatchRequest, + responses( + (status = 200, body = DismissWatchResponse), + (status = 401, description = "Unauthorized"), + ), + security(("bearer_auth" = [])) +)] +pub async fn post_dismiss_watch_events( + State(state): State, + user: AuthenticatedUser, + Json(req): Json, +) -> Result, ApiError> { + let cmd = DismissWatchEventsCommand { + user_id: user.0.value(), + event_ids: req.event_ids, + }; + + let dismissed = dismiss_watch_events::execute(&state.app_ctx, cmd).await?; + Ok(Json(DismissWatchResponse { dismissed })) +} diff --git a/crates/presentation/src/main.rs b/crates/presentation/src/main.rs index e918485..121208a 100644 --- a/crates/presentation/src/main.rs +++ b/crates/presentation/src/main.rs @@ -75,6 +75,8 @@ async fn wire_dependencies() -> anyhow::Result<(AppState, axum::Router)> { let search_port = db.search_port; let search_command = db.search_command; let profile_fields_repo = db.profile_fields_repo; + let watch_event_repository = db.watch_event_repo; + let webhook_token_repository = db.webhook_token_repo; let db_pool = db.db_pool; // Wire up event channel, federation service, and ap_router @@ -199,6 +201,8 @@ async fn wire_dependencies() -> anyhow::Result<(AppState, axum::Router)> { import_profile_repository, movie_profile_repository, watchlist_repository, + watch_event_repository, + webhook_token_repository, profile_fields_repository: profile_fields_repo, #[cfg(feature = "federation")] remote_watchlist_repository: remote_watchlist_repo, diff --git a/crates/presentation/src/openapi/mod.rs b/crates/presentation/src/openapi/mod.rs index a2aa719..c466065 100644 --- a/crates/presentation/src/openapi/mod.rs +++ b/crates/presentation/src/openapi/mod.rs @@ -6,6 +6,7 @@ mod search; mod social; mod users; mod watchlist; +mod webhook; use axum::Router; use utoipa::{ @@ -40,6 +41,7 @@ fn build() -> utoipa::openapi::OpenApi { api.merge(import::ImportDoc::openapi()); api.merge(search::SearchDoc::openapi()); api.merge(watchlist::WatchlistDoc::openapi()); + api.merge(webhook::WebhookDoc::openapi()); #[cfg(feature = "federation")] api.merge(social::SocialDoc::openapi()); SecurityAddon.modify(&mut api); diff --git a/crates/presentation/src/openapi/webhook.rs b/crates/presentation/src/openapi/webhook.rs new file mode 100644 index 0000000..c1d5f6a --- /dev/null +++ b/crates/presentation/src/openapi/webhook.rs @@ -0,0 +1,32 @@ +use api_types::{ + ConfirmWatchEntry, ConfirmWatchRequest, ConfirmWatchResponse, DismissWatchRequest, + DismissWatchResponse, GenerateTokenRequest, GenerateTokenResponse, WatchQueueEntryDto, + WebhookTokenDto, +}; +use utoipa::OpenApi; + +#[derive(OpenApi)] +#[openapi( + paths( + crate::handlers::webhook::post_jellyfin_webhook, + crate::handlers::webhook::post_plex_webhook, + crate::handlers::webhook::post_generate_webhook_token, + crate::handlers::webhook::get_webhook_tokens, + crate::handlers::webhook::delete_webhook_token, + crate::handlers::webhook::get_watch_queue, + crate::handlers::webhook::post_confirm_watch_events, + crate::handlers::webhook::post_dismiss_watch_events, + ), + components(schemas( + GenerateTokenRequest, + GenerateTokenResponse, + WebhookTokenDto, + WatchQueueEntryDto, + ConfirmWatchRequest, + ConfirmWatchEntry, + ConfirmWatchResponse, + DismissWatchRequest, + DismissWatchResponse, + )) +)] +pub struct WebhookDoc; diff --git a/crates/presentation/src/routes.rs b/crates/presentation/src/routes.rs index b223148..46a28db 100644 --- a/crates/presentation/src/routes.rs +++ b/crates/presentation/src/routes.rs @@ -139,6 +139,30 @@ fn html_routes(rate_limit: u64) -> Router { .route( "/watchlist/{movie_id}/remove", routing::post(handlers::html::post_watchlist_remove), + ) + .route( + "/settings/integrations", + routing::get(handlers::html::get_integrations_page), + ) + .route( + "/settings/integrations/generate", + routing::post(handlers::html::post_generate_token), + ) + .route( + "/settings/integrations/{id}/revoke", + routing::post(handlers::html::post_revoke_token), + ) + .route( + "/watch-queue", + routing::get(handlers::html::get_watch_queue_page), + ) + .route( + "/watch-queue/{id}/confirm", + routing::post(handlers::html::post_confirm_single), + ) + .route( + "/watch-queue/{id}/dismiss", + routing::post(handlers::html::post_dismiss_single), ); #[cfg(feature = "federation")] @@ -301,12 +325,52 @@ fn api_routes(rate_limit: u64) -> Router { "/watchlist/{movie_id}", routing::get(handlers::api::get_watchlist_status) .delete(handlers::api::delete_watchlist_entry), + ) + .route( + "/settings/webhook-tokens", + routing::get(handlers::webhook::get_webhook_tokens) + .post(handlers::webhook::post_generate_webhook_token), + ) + .route( + "/settings/webhook-tokens/{id}", + routing::delete(handlers::webhook::delete_webhook_token), + ) + .route( + "/watch-queue", + routing::get(handlers::webhook::get_watch_queue), + ) + .route( + "/watch-queue/confirm", + routing::post(handlers::webhook::post_confirm_watch_events), + ) + .route( + "/watch-queue/dismiss", + routing::post(handlers::webhook::post_dismiss_watch_events), ); #[cfg(feature = "federation")] let base = base.merge(federation_api_routes()); - Router::new().nest("/api/v1", base.layer(GovernorLayer::new(cfg))) + let webhook_cfg = GovernorConfigBuilder::default() + .with_extractor(PeerIp::default()) + .expect_connect_info() + .quota_default(per_minute(rate_limit / 4)) + .finish() + .unwrap(); + let webhook_routes = Router::new() + .route( + "/webhooks/jellyfin", + routing::post(handlers::webhook::post_jellyfin_webhook), + ) + .route( + "/webhooks/plex", + routing::post(handlers::webhook::post_plex_webhook), + ) + .layer(GovernorLayer::new(webhook_cfg)); + + Router::new() + .nest("/api/v1", base.layer(GovernorLayer::new(cfg))) + .nest("/api/v1", webhook_routes) } #[cfg(feature = "federation")] diff --git a/crates/presentation/src/tests/extractors.rs b/crates/presentation/src/tests/extractors.rs index 3129b3c..f883904 100644 --- a/crates/presentation/src/tests/extractors.rs +++ b/crates/presentation/src/tests/extractors.rs @@ -488,6 +488,18 @@ impl crate::ports::HtmlRenderer for Panic { ) -> Result { panic!() } + fn render_integrations_page( + &self, + _: application::ports::IntegrationsPageData, + ) -> Result { + panic!() + } + fn render_watch_queue_page( + &self, + _: application::ports::WatchQueuePageData, + ) -> Result { + panic!() + } } impl crate::ports::RssFeedRenderer for Panic { fn render_feed(&self, _: &[DiaryEntry], _: &str) -> Result { @@ -571,6 +583,77 @@ impl domain::ports::RemoteWatchlistRepository for Panic { } } +#[async_trait::async_trait] +impl domain::ports::WatchEventRepository for Panic { + async fn save(&self, _: &domain::models::WatchEvent) -> Result<(), DomainError> { + panic!() + } + async fn update_status( + &self, + _: &domain::value_objects::WatchEventId, + _: domain::models::WatchEventStatus, + ) -> Result<(), DomainError> { + panic!() + } + async fn list_pending( + &self, + _: &domain::value_objects::UserId, + ) -> Result, DomainError> { + panic!() + } + async fn get_by_id( + &self, + _: &domain::value_objects::WatchEventId, + ) -> Result, DomainError> { + panic!() + } + async fn find_duplicate( + &self, + _: &domain::value_objects::UserId, + _: &str, + _: chrono::NaiveDateTime, + ) -> Result { + panic!() + } + async fn delete_non_pending_older_than( + &self, + _: chrono::NaiveDateTime, + ) -> Result { + panic!() + } +} +#[async_trait::async_trait] +impl domain::ports::WebhookTokenRepository for Panic { + async fn save(&self, _: &domain::models::WebhookToken) -> Result<(), DomainError> { + panic!() + } + async fn find_by_token_hash( + &self, + _: &str, + ) -> Result, DomainError> { + panic!() + } + async fn list_by_user( + &self, + _: &domain::value_objects::UserId, + ) -> Result, DomainError> { + panic!() + } + async fn delete( + &self, + _: &domain::value_objects::WebhookTokenId, + _: &domain::value_objects::UserId, + ) -> Result<(), DomainError> { + panic!() + } + async fn touch_last_used( + &self, + _: &domain::value_objects::WebhookTokenId, + ) -> Result<(), DomainError> { + panic!() + } +} + // --- Single state factory — only auth_service varies --- pub fn make_test_state(auth_service: Arc) -> crate::state::AppState { @@ -593,6 +676,8 @@ pub fn make_test_state(auth_service: Arc) -> crate::state::AppS import_profile_repository: Arc::clone(&repo) as _, movie_profile_repository: Arc::clone(&repo) as _, watchlist_repository: Arc::clone(&repo) as _, + watch_event_repository: Arc::clone(&repo) as _, + webhook_token_repository: Arc::clone(&repo) as _, profile_fields_repository: Arc::clone(&repo) as _, #[cfg(feature = "federation")] remote_watchlist_repository: Arc::clone(&repo) as _, diff --git a/crates/presentation/tests/api_test.rs b/crates/presentation/tests/api_test.rs index 006a2e1..a186bf5 100644 --- a/crates/presentation/tests/api_test.rs +++ b/crates/presentation/tests/api_test.rs @@ -411,6 +411,8 @@ async fn test_app() -> Router { import_profile_repository: Arc::new(PanicImportProfile), movie_profile_repository: Arc::new(PanicMovieProfile), watchlist_repository: Arc::new(PanicWatchlist), + watch_event_repository: Arc::new(domain::testing::PanicWatchEventRepository), + webhook_token_repository: Arc::new(domain::testing::PanicWebhookTokenRepository), profile_fields_repository: Arc::new(PanicProfileFields), #[cfg(feature = "federation")] remote_watchlist_repository: Arc::new(PanicRemoteWatchlist), diff --git a/crates/worker/src/db.rs b/crates/worker/src/db.rs index 94887c2..947c614 100644 --- a/crates/worker/src/db.rs +++ b/crates/worker/src/db.rs @@ -5,7 +5,8 @@ use domain::ports::{ DiaryRepository, ImageRefCommand, ImageRefQuery, ImportProfileRepository, ImportSessionRepository, LocalApContentQuery, MovieProfileRepository, MovieRepository, PersonCommand, PersonQuery, ReviewRepository, SearchCommand, SearchPort, StatsRepository, - UserProfileFieldsRepository, UserRepository, WatchlistRepository, + UserProfileFieldsRepository, UserRepository, WatchEventRepository, WatchlistRepository, + WebhookTokenRepository, }; pub enum DbPool { @@ -33,6 +34,8 @@ pub struct Repos { pub search_command: Arc, pub search_port: Arc, pub profile_fields: Arc, + pub watch_event: Arc, + pub webhook_token: Arc, } pub async fn connect(database_url: &str, backend: &str) -> anyhow::Result<(Repos, DbPool)> { @@ -47,6 +50,10 @@ pub async fn connect(database_url: &str, backend: &str) -> anyhow::Result<(Repos let (search_command, search_port) = postgres_search::create_search_adapter(pool.clone()); let pf = postgres::create_profile_fields_repo(pool.clone()); + let we: Arc = + Arc::new(postgres::PostgresWatchEventRepository::new(pool.clone())); + let wt: Arc = + Arc::new(postgres::PostgresWebhookTokenRepository::new(pool.clone())); Ok(( Repos { movie: m, @@ -66,6 +73,8 @@ pub async fn connect(database_url: &str, backend: &str) -> anyhow::Result<(Repos search_command, search_port, profile_fields: pf, + watch_event: we, + webhook_token: wt, }, DbPool::Postgres(pool), )) @@ -79,6 +88,10 @@ pub async fn connect(database_url: &str, backend: &str) -> anyhow::Result<(Repos let (person_command, person_query) = sqlite::create_person_adapter(pool.clone()); let (search_command, search_port) = sqlite_search::create_search_adapter(pool.clone()); let pf = sqlite::create_profile_fields_repo(pool.clone()); + let we: Arc = + Arc::new(sqlite::SqliteWatchEventRepository::new(pool.clone())); + let wt: Arc = + Arc::new(sqlite::SqliteWebhookTokenRepository::new(pool.clone())); Ok(( Repos { movie: m, @@ -98,6 +111,8 @@ pub async fn connect(database_url: &str, backend: &str) -> anyhow::Result<(Repos search_command, search_port, profile_fields: pf, + watch_event: we, + webhook_token: wt, }, DbPool::Sqlite(pool), )) diff --git a/crates/worker/src/main.rs b/crates/worker/src/main.rs index 5e0cc84..0fcd373 100644 --- a/crates/worker/src/main.rs +++ b/crates/worker/src/main.rs @@ -88,6 +88,8 @@ async fn main() -> anyhow::Result<()> { import_profile_repository: repos.import_profile, movie_profile_repository: repos.movie_profile, watchlist_repository: repos.watchlist, + watch_event_repository: repos.watch_event, + webhook_token_repository: repos.webhook_token, profile_fields_repository: Arc::clone(&profile_fields_repo), #[cfg(feature = "federation")] remote_watchlist_repository: fed_remote_watchlist_repo.clone(), @@ -137,9 +139,10 @@ async fn main() -> anyhow::Result<()> { // ── Periodic jobs ───────────────────────────────────────────────────────── - let mut periodic_jobs: Vec> = vec![Arc::new( - application::jobs::ImportSessionCleanupJob::new(ctx.clone()), - )]; + let mut periodic_jobs: Vec> = vec![ + Arc::new(application::jobs::ImportSessionCleanupJob::new(ctx.clone())), + Arc::new(application::jobs::WatchEventCleanupJob::new(ctx.clone())), + ]; if let Some(job) = enrichment_job { periodic_jobs.push(job); }