// NOTE(review): in this copy of the file the generic arguments of many types
// appear to have been stripped (e.g. `Result;`, `Vec,`, `Option,`,
// `Result, DomainError>`). The signatures below are reproduced exactly as
// found; restore the type arguments from version control before compiling.
//
// Doc comments that say "presumably" are inferred from names only and should
// be confirmed against the implementing adapters.

use async_trait::async_trait;
use chrono::{DateTime, NaiveDate, NaiveDateTime, Utc};
use uuid::Uuid;

use crate::{
    errors::DomainError,
    events::{DomainEvent, EventEnvelope},
    models::wrapup::WrapUpReport,
    models::{
        AnnotatedRow, DiaryEntry, DiaryFilter, EntityType, ExportFormat, ExternalPersonId,
        FeedEntry, FieldMapping, FileFormat, Goal, ImportError, ImportProfile, ImportSession,
        IndexableDocument, Movie, MovieFilter, MovieProfile, MovieStats, MovieSummary, ParsedFile,
        ParsedPlaybackEvent, Person, PersonCredits, PersonEnrichmentData, PersonId, RefreshSession,
        RemoteGoalEntry, RemoteWatchlistEntry, Review, ReviewHistory, SearchQuery, SearchResults,
        User, UserSettings, UserStats, UserSummary, UserTrends, WatchEvent, WatchEventStatus,
        WatchlistEntry, WatchlistWithMovie, WebhookToken,
        collections::{self, PageParams, Paginated},
        wrapup::{DateRange, WrapUpRecord, WrapUpScope, WrapUpStatus},
    },
    value_objects::{
        Email, ExternalMetadataId, GoalId, ImportProfileId, ImportSessionId, MovieId, MovieTitle,
        PasswordHash, PosterUrl, ReleaseYear, ReviewId, UserId, Username, WatchEventId,
        WebhookTokenId, WrapUpId,
    },
};

/// Synchronous (non-async) port that parses raw bytes in a given [`FileFormat`]
/// and applies [`FieldMapping`]s to an already parsed [`ParsedFile`].
pub trait DocumentParser: Send + Sync {
    /// Parses `bytes`, interpreting them as `format`.
    fn parse(&self, bytes: &[u8], format: FileFormat) -> Result;
    /// Applies `mappings` to `file`. Infallible by signature (returns a `Vec`, not a `Result`).
    fn apply_mapping(&self, file: &ParsedFile, mappings: &[FieldMapping]) -> Vec;
}

/// Sort order accepted by [`DiaryRepository::query_activity_feed_filtered`].
///
/// The default is [`FeedSortBy::Date`]. NOTE(review): the `*Asc` variants suggest
/// that `Date` / `Rating` mean descending order — confirm against the query adapter.
#[derive(Debug, Clone, Default, PartialEq)]
pub enum FeedSortBy {
    #[default]
    Date,
    DateAsc,
    Rating,
    RatingAsc,
}

/// Lenient parser for sort keys: it never fails.
///
/// Recognised keys are exactly `"date_asc"`, `"rating"` and `"rating_asc"`
/// (the comparison is an exact string match). Every other input — including
/// `"date"`, the empty string and unknown keys — maps to [`FeedSortBy::Date`].
impl std::str::FromStr for FeedSortBy {
    // Parsing cannot fail, so the error type is uninhabited.
    type Err = std::convert::Infallible;
    fn from_str(s: &str) -> Result {
        Ok(match s {
            "date_asc" => Self::DateAsc,
            "rating" => Self::Rating,
            "rating_asc" => Self::RatingAsc,
            // Deliberate catch-all: unknown keys fall back to the default ordering.
            _ => Self::Date,
        })
    }
}

/// Optional `following` argument of [`DiaryRepository::query_activity_feed_filtered`].
///
/// Carries two lists: local user ids and remote actor URLs. Derives `Default`,
/// so both lists start out empty.
#[derive(Debug, Clone, Default)]
pub struct FollowingFilter {
    pub local_user_ids: Vec,
    pub remote_actor_urls: Vec,
}

/// Plain data describing a remote actor: URL, handle and an optional display name.
#[derive(Debug, Clone)]
pub struct RemoteActorInfo {
    pub url: String,
    pub handle: String,
    pub display_name: Option,
}

/// Plain data describing a follower: same fields as [`RemoteActorInfo`] plus an
/// optional avatar URL. Presumably the item type of
/// [`SocialQueryPort::get_pending_followers`] — confirm once return types are restored.
#[derive(Debug, Clone)]
pub struct PendingFollowerInfo {
    pub url: String,
    pub handle: String,
    pub display_name: Option,
    pub avatar_url: Option,
}

/// Async query port for follow relationships.
///
/// Note that user ids are passed as raw `uuid::Uuid` values here, not as the
/// [`UserId`] value object used by most other ports in this file.
#[async_trait]
pub trait SocialQueryPort: Send + Sync {
    async fn get_accepted_following_urls(
        &self,
        user_id: uuid::Uuid,
    ) -> Result, DomainError>;
    /// Takes no user argument, i.e. it is not scoped to a single user.
    async fn list_all_followed_remote_actors(&self) -> Result, DomainError>;
    async fn count_following(&self, user_id: uuid::Uuid) -> Result;
    async fn count_accepted_followers(&self, user_id: uuid::Uuid) -> Result;
    async fn get_pending_followers(
        &self,
        user_id: uuid::Uuid,
    ) -> Result, DomainError>;
}

/// Async persistence port for [`Movie`] records.
#[async_trait]
pub trait MovieRepository: Send + Sync {
    /// Lookup by external metadata id.
    async fn get_movie_by_external_id(
        &self,
        external_metadata_id: &ExternalMetadataId,
    ) -> Result, DomainError>;
    /// Lookup by internal movie id.
    async fn get_movie_by_id(&self, movie_id: &MovieId) -> Result, DomainError>;
    /// Lookup by title and release year.
    async fn get_movies_by_title_and_year(
        &self,
        title: &MovieTitle,
        year: &ReleaseYear,
    ) -> Result, DomainError>;
    async fn upsert_movie(&self, movie: &Movie) -> Result<(), DomainError>;
    async fn delete_movie(&self, movie_id: &MovieId) -> Result<(), DomainError>;
    /// Batch variant taking a slice of external ids; presumably returns the subset
    /// that is already stored — confirm.
    async fn existing_external_ids(
        &self,
        ids: &[ExternalMetadataId],
    ) -> Result, DomainError>;
    /// Batch variant taking `(title, year)` pairs; presumably returns the subset
    /// that is already stored — confirm.
    async fn existing_title_year_pairs(
        &self,
        pairs: &[(MovieTitle, ReleaseYear)],
    ) -> Result, DomainError>;
    /// Paged listing restricted by `filter`.
    async fn list_movies(
        &self,
        page: &collections::PageParams,
        filter: &MovieFilter,
    ) -> Result, DomainError>;
}

/// Async persistence port for [`Review`] records.
#[async_trait]
pub trait ReviewRepository: Send + Sync {
    async fn save_review(&self, review: &Review) -> Result;
    async fn get_review_by_id(&self, review_id: &ReviewId) -> Result, DomainError>;
    async fn delete_review(&self, review_id: &ReviewId) -> Result<(), DomainError>;
    async fn get_all_reviews_for_user(&self, user_id: &UserId) -> Result, DomainError>;
}

/// Query port for diary entries, activity feeds and per-movie/per-user history.
///
/// All methods are async except [`DiaryRepository::stream_user_history`], which
/// returns a stream directly.
#[async_trait]
pub trait DiaryRepository: Send + Sync {
    async fn query_diary(&self, filter: &DiaryFilter) -> Result, DomainError>;
    /// Paged activity feed without sorting, search or following restrictions.
    async fn query_activity_feed(
        &self,
        page: &PageParams,
    ) -> Result, DomainError>;
    /// Paged activity feed with an explicit sort order and two optional
    /// restrictions: a `search` string and a `following` filter (`None` = not applied,
    /// presumably — confirm).
    async fn query_activity_feed_filtered(
        &self,
        page: &PageParams,
        sort_by: &FeedSortBy,
        search: Option<&str>,
        following: Option<&FollowingFilter>,
    ) -> Result, DomainError>;
    async fn get_review_history(&self, movie_id: &MovieId) -> Result;
    async fn get_user_history(&self, user_id: &UserId) -> Result, DomainError>;
    /// Streaming counterpart of [`DiaryRepository::get_user_history`].
    ///
    /// Not `async`: the stream is returned immediately. `user_id` is taken by
    /// value and the stream is `'static`, so it cannot borrow from `self` or
    /// from the caller's arguments.
    fn stream_user_history(
        &self,
        user_id: UserId,
    ) -> futures::stream::BoxStream<'static, Result>;
    async fn get_movie_stats(&self, movie_id: &MovieId) -> Result;
    async fn get_movie_social_feed(
        &self,
        movie_id: &MovieId,
        page: &PageParams,
    ) -> Result, DomainError>;
    async fn count_local_posts(&self) -> Result;
}

/// Async query port for per-user statistics and trends.
#[async_trait]
pub trait StatsRepository: Send + Sync {
    async fn get_user_stats(&self, user_id: &UserId) -> Result;
    async fn get_user_trends(&self, user_id: &UserId) -> Result;
}

/// Search key passed to [`MetadataClient::fetch_movie_metadata`]: either an
/// external id or a title with an optional year. Derives no traits (not even `Debug`).
pub enum MetadataSearchCriteria {
    ImdbId(ExternalMetadataId),
    Title {
        title: MovieTitle,
        year: Option,
    },
}

/// Async client port for an external movie metadata source.
#[async_trait]
pub trait MetadataClient: Send + Sync {
    async fn fetch_movie_metadata(
        &self,
        criteria: &MetadataSearchCriteria,
    ) -> Result;
    async fn get_poster_url(
        &self,
        external_metadata_id: &ExternalMetadataId,
    ) -> Result, DomainError>;
}

/// Async client port that downloads the bytes behind a [`PosterUrl`].
#[async_trait]
pub trait PosterFetcherClient: Send + Sync {
    async fn fetch_poster_bytes(&self, poster_url: &PosterUrl) -> Result, DomainError>;
}

/// Async key/value object storage port. Keys are plain `&str`.
#[async_trait]
pub trait ObjectStorage: Send + Sync {
    /// Stores `image_bytes` at `key` and returns the stored key.
    async fn store(&self, key: &str, image_bytes: &[u8]) -> Result;
    /// Fetches the object stored at `key`.
    async fn get(&self, key: &str) -> Result, DomainError>;
    /// Streaming variant of [`ObjectStorage::get`], judging by the name; the
    /// stream type is not visible in this copy.
    async fn get_stream(
        &self,
        key: &str,
    ) -> Result>, DomainError>;
    async fn delete(&self, key: &str) -> Result<(), DomainError>;
}

/// A freshly generated token together with its expiry timestamp.
/// Derives no traits.
pub struct GeneratedToken {
    pub token: String,
    pub expires_at: DateTime,
}

/// Async port for issuing and validating auth tokens.
#[async_trait]
pub trait AuthService: Send + Sync {
    async fn generate_token(&self, user_id: &UserId) -> Result;
    async fn validate_token(&self, token: &str) -> Result;
}

/// Async persistence port for [`User`] records.
#[async_trait]
pub trait UserRepository: Send + Sync {
    async fn find_by_email(&self, email: &Email) -> Result, DomainError>;
    async fn find_by_username(&self, username: &Username) -> Result, DomainError>;
    async fn save(&self, user: &User) -> Result<(), DomainError>;
    async fn find_by_id(&self, id: &UserId) -> Result, DomainError>;
    async fn list_with_stats(&self) -> Result, DomainError>;
    /// `UserProfile` is referenced by full path because it is not in the
    /// `use crate::models::{…}` list at the top of the file.
    async fn update_profile(
        &self,
        user_id: &UserId,
        profile: &crate::models::UserProfile,
    ) -> Result<(), DomainError>;
}

/// Async persistence port for a user's profile fields.
#[async_trait]
pub trait UserProfileFieldsRepository: Send + Sync {
    async fn get_fields(
        &self,
        user_id: &UserId,
    ) -> Result, DomainError>;
    /// Takes the field list by value. NOTE(review): presumably replaces the
    /// whole set rather than merging — confirm.
    async fn set_fields(
        &self,
        user_id: &UserId,
        fields: Vec,
    ) -> Result<(), DomainError>;
}

/// Async port for publishing a [`DomainEvent`].
#[async_trait]
pub trait EventPublisher: Send + Sync {
    async fn publish(&self, event: &DomainEvent) -> Result<(), DomainError>;
}

/// Consuming side of the event transport. Not an `#[async_trait]`: `consume`
/// returns a stream that borrows from `self` (`'_` lifetime).
pub trait EventConsumer: Send + Sync {
    /// Returns a stream of event envelopes. Each envelope carries a domain event
    /// and an ack handle — callers ack after successful dispatch, nack on failure.
    /// Implementations decide transport (NATS, DB queue, in-memory channel).
    fn consume(&self) -> futures::stream::BoxStream<'_, Result>;
}

/// Async port for hashing and verifying passwords.
#[async_trait]
pub trait PasswordHasher: Send + Sync {
    async fn hash(&self, plain_password: &str) -> Result;
    async fn verify(&self, plain_password: &str, hash: &PasswordHash) -> Result;
}

/// Synchronous stream adapter: takes a `'static` stream plus an [`ExportFormat`]
/// and returns another `'static` stream. Item types are not visible in this copy.
pub trait DiaryExporter: Send + Sync {
    fn stream_entries(
        &self,
        stream: futures::stream::BoxStream<'static, Result>,
        format: ExportFormat,
    ) -> futures::stream::BoxStream<'static, Result>;
}

/// Async handler invoked with a single [`DomainEvent`].
#[async_trait]
pub trait EventHandler: Send + Sync {
    async fn handle(&self, event: &DomainEvent) -> Result<(), DomainError>;
}

/// A job described by an interval and an async `run` step. Scheduling itself is
/// not defined here; presumably a scheduler calls `run` every `interval()` — confirm.
#[async_trait]
pub trait PeriodicJob: Send + Sync {
    fn interval(&self) -> std::time::Duration;
    async fn run(&self) -> Result<(), DomainError>;
}

/// Async persistence port for [`MovieProfile`] records.
#[async_trait]
pub trait MovieProfileRepository: Send + Sync {
    async fn upsert(&self, profile: &MovieProfile) -> Result<(), DomainError>;
    async fn get_by_movie_id(&self, id: &MovieId) -> Result, DomainError>;
    /// Returns (movie_id, external_metadata_id) for movies with no profile or a stale one
    /// (enriched_at older than 30 days).
    async fn list_stale(&self) -> Result, DomainError>;
}

/// Async client port that fetches movie profiles from an external source.
#[async_trait]
pub trait MovieEnrichmentClient: Send + Sync {
    /// Resolves an external ID (TMDb or IMDb) and fetches the full movie profile.
    ///
    /// The external id is a plain `&str` here, not an [`ExternalMetadataId`].
    async fn fetch_profile(
        &self,
        movie_id: MovieId,
        external_metadata_id: &str,
    ) -> Result;
}

/// Async client port that fetches person details by an external id string.
#[async_trait]
pub trait PersonEnrichmentClient: Send + Sync {
    async fn fetch_details(&self, external_id: &str) -> Result;
}

/// Async persistence port for [`ImportSession`] records.
///
/// `get` is scoped by both session id and user id, while `delete` takes the
/// session id only.
#[async_trait]
pub trait ImportSessionRepository: Send + Sync {
    async fn create(&self, session: &ImportSession) -> Result<(), DomainError>;
    async fn get(
        &self,
        id: &ImportSessionId,
        user_id: &UserId,
    ) -> Result, DomainError>;
    async fn update(&self, session: &ImportSession) -> Result<(), DomainError>;
    async fn delete(&self, id: &ImportSessionId) -> Result<(), DomainError>;
    /// Not scoped to a user. Presumably returns the number of deleted rows — confirm.
    async fn delete_expired(&self) -> Result;
    async fn delete_expired_for_user(&self, user_id: &UserId) -> Result<(), DomainError>;
}

/// Async persistence port for [`RefreshSession`] records, keyed by token string.
#[async_trait]
pub trait RefreshSessionRepository: Send + Sync {
    async fn create(&self, session: &RefreshSession) -> Result<(), DomainError>;
    async fn get_by_token(&self, token: &str) -> Result, DomainError>;
    async fn revoke(&self, token: &str) -> Result<(), DomainError>;
    async fn revoke_all_for_user(&self, user_id: &UserId) -> Result<(), DomainError>;
    /// Presumably returns the number of deleted rows — confirm.
    async fn delete_expired(&self) -> Result;
}

/// Async persistence port for [`ImportProfile`] records.
///
/// `get` is scoped by both profile id and user id, while `delete` takes the
/// profile id only.
#[async_trait]
pub trait ImportProfileRepository: Send + Sync {
    async fn save(&self, profile: &ImportProfile) -> Result<(), DomainError>;
    async fn list_for_user(&self, user_id: &UserId) -> Result, DomainError>;
    async fn get(
        &self,
        id: &ImportProfileId,
        user_id: &UserId,
    ) -> Result, DomainError>;
    async fn delete(&self, id: &ImportProfileId) -> Result<(), DomainError>;
}

/// Write side for image key references: replaces `old_key` with `new_key`.
/// NOTE(review): which tables/records hold these keys is not visible here.
#[async_trait]
pub trait ImageRefCommand: Send + Sync {
    async fn swap(&self, old_key: &str, new_key: &str) -> Result<(), DomainError>;
}

/// Read side for image key references: lists keys.
#[async_trait]
pub trait ImageRefQuery: Send + Sync {
    async fn list_keys(&self) -> Result, DomainError>;
}

/// Write port — mutates the persons table. No reads.
#[async_trait]
pub trait PersonCommand: Send + Sync {
    /// Upsert a batch of persons. Uses INSERT OR REPLACE (SQLite) / ON CONFLICT DO UPDATE (Postgres).
    async fn upsert_batch(&self, persons: &[Person]) -> Result<(), DomainError>;
    /// Insert a batch of missing persons from movie_cast/movie_crew into the persons table.
    /// Returns (inserted_count, has_more).
    async fn backfill_from_credits_batch(
        &self,
        batch_size: u32,
    ) -> Result<(u64, bool), DomainError>;
    /// Stores enrichment `data` for the person identified by `id`.
    async fn update_enrichment(
        &self,
        id: &PersonId,
        data: &PersonEnrichmentData,
    ) -> Result<(), DomainError>;
}

/// Read port — queries persons and credits. No mutations.
#[async_trait]
pub trait PersonQuery: Send + Sync {
    async fn get_by_id(&self, id: &PersonId) -> Result, DomainError>;
    async fn get_by_external_id(
        &self,
        id: &ExternalPersonId,
    ) -> Result, DomainError>;
    /// Returns the person's full cast and crew credit history across all indexed movies.
    async fn get_credits(&self, id: &PersonId) -> Result;
    /// Returns persons who have no remaining entries in movie_cast or movie_crew.
    /// Called after movie deletion to find index entries that can be pruned.
    async fn list_orphaned_persons(&self) -> Result, DomainError>;
    /// Offset-based paging (`limit` / `offset`), unlike the `PageParams`-based
    /// listings elsewhere in this file.
    async fn list_page(&self, limit: u32, offset: u32) -> Result, DomainError>;
}

/// Read port — executes search queries. No mutations.
#[async_trait]
pub trait SearchPort: Send + Sync {
    async fn search(&self, query: &SearchQuery) -> Result;
}

/// Write port — manages the search index. No reads.
#[async_trait]
pub trait SearchCommand: Send + Sync {
    /// Add or replace a document in the search index.
    async fn index(&self, doc: IndexableDocument) -> Result<(), DomainError>;
    /// Remove a document from the search index by entity type and internal ID string.
    async fn remove(&self, entity_type: EntityType, id: &str) -> Result<(), DomainError>;
}

/// Async persistence port for a local user's watchlist.
#[async_trait]
pub trait WatchlistRepository: Send + Sync {
    /// Add a new entry. Silently succeeds if the entry already exists.
    async fn add(&self, entry: &WatchlistEntry) -> Result<(), DomainError>;
    /// Remove an entry. Returns NotFound if the entry does not exist.
    async fn remove(&self, user_id: &UserId, movie_id: &MovieId) -> Result<(), DomainError>;
    /// Remove an entry if it exists. Never returns NotFound.
    async fn remove_if_present(
        &self,
        user_id: &UserId,
        movie_id: &MovieId,
    ) -> Result;
    /// Paged listing of one user's watchlist.
    async fn get_for_user(
        &self,
        user_id: &UserId,
        page: &collections::PageParams,
    ) -> Result, DomainError>;
    async fn contains(&self, user_id: &UserId, movie_id: &MovieId) -> Result;
}

/// Async persistence port for watchlist entries of remote actors, addressed by
/// `ap_id` and actor URL strings. Entries are saved by value.
#[async_trait]
pub trait RemoteWatchlistRepository: Send + Sync {
    async fn save(&self, entry: RemoteWatchlistEntry) -> Result<(), DomainError>;
    async fn remove_by_ap_id(&self, ap_id: &str, actor_url: &str) -> Result<(), DomainError>;
    async fn get_by_actor_url(
        &self,
        actor_url: &str,
    ) -> Result, DomainError>;
    async fn remove_all_by_actor(&self, actor_url: &str) -> Result<(), DomainError>;
    /// Find entries for a remote actor whose URL hashes (v5 UUID) to the given UUID.
    async fn get_by_derived_uuid(
        &self,
        uuid: uuid::Uuid,
    ) -> Result, DomainError>;
}

// ── Goals ────────────────────────────────────────────────────────────────────

/// Async persistence port for [`Goal`] records. Years are plain `u16` values.
#[async_trait]
pub trait GoalRepository: Send + Sync {
    async fn save(&self, goal: &Goal) -> Result<(), DomainError>;
    async fn update(&self, goal: &Goal) -> Result<(), DomainError>;
    /// Scoped by both goal id and user id.
    async fn delete(&self, id: &GoalId, user_id: &UserId) -> Result<(), DomainError>;
    async fn find_by_user_and_year(
        &self,
        user_id: &UserId,
        year: u16,
    ) -> Result, DomainError>;
    async fn list_for_user(&self, user_id: &UserId) -> Result, DomainError>;
    /// Presumably the progress counter a goal is measured against — confirm.
    async fn count_reviews_in_year(&self, user_id: &UserId, year: u16) -> Result;
}

/// Async persistence port for [`UserSettings`].
#[async_trait]
pub trait UserSettingsRepository: Send + Sync {
    async fn get(&self, user_id: &UserId) -> Result;
    async fn save(&self, settings: &UserSettings) -> Result<(), DomainError>;
}

/// Async persistence port for goals of remote actors, addressed by `ap_id` and
/// actor URL strings. Entries are saved by value.
#[async_trait]
pub trait RemoteGoalRepository: Send + Sync {
    async fn save(&self, entry: RemoteGoalEntry) -> Result<(), DomainError>;
    /// Updates the `target` and `current` counters of the entry identified by
    /// `ap_id`. Unlike `remove_by_ap_id`, no `actor_url` is required.
    async fn update_by_ap_id(
        &self,
        ap_id: &str,
        target: u32,
        current: u32,
    ) -> Result<(), DomainError>;
    async fn remove_by_ap_id(&self, ap_id: &str, actor_url: &str) -> Result<(), DomainError>;
    async fn remove_all_by_actor(&self, actor_url: &str) -> Result<(), DomainError>;
    async fn get_by_actor_url(&self, actor_url: &str) -> Result, DomainError>;
}

/// Read-only query port used exclusively by the ActivityPub adapter.
/// Consolidates all reads the AP adapter needs so it never touches write repositories.
#[async_trait]
pub trait LocalApContentQuery: Send + Sync {
    async fn get_local_reviews_for_user(
        &self,
        user_id: &UserId,
    ) -> Result, DomainError>;
    async fn get_local_watchlist_for_user(
        &self,
        user_id: &UserId,
    ) -> Result, DomainError>;
    async fn get_review_by_id(&self, review_id: &ReviewId) -> Result, DomainError>;
    async fn get_movie_by_id(&self, movie_id: &MovieId) -> Result, DomainError>;
    async fn count_local_posts(&self) -> Result;
    async fn get_local_reviews_for_movie(
        &self,
        movie_id: &MovieId,
    ) -> Result, DomainError>;
    /// Cursor-style paging: an optional `before` bound plus a `limit`.
    /// The cursor's type is not visible in this copy.
    async fn get_local_reviews_page(
        &self,
        user_id: &UserId,
        before: Option,
        limit: usize,
    ) -> Result, DomainError>;
    /// Presumably reads the user's "federate goals" setting — confirm.
    async fn get_user_federate_goals(&self, user_id: &UserId) -> Result;
    async fn get_goal_with_progress(
        &self,
        user_id: &UserId,
        year: u16,
    ) -> Result, DomainError>;
}

// ── Media server integration ──────────────────────────────────────────────────

/// Synchronous parser for a raw request body coming from a media server.
pub trait MediaServerParser: Send + Sync {
    fn parse_playback_event(&self, body: &[u8]) -> Result, DomainError>;
}

/// Async persistence port for [`WatchEvent`] records.
#[async_trait]
pub trait WatchEventRepository: Send + Sync {
    async fn save(&self, event: &WatchEvent) -> Result<(), DomainError>;
    async fn update_status(
        &self,
        id: &WatchEventId,
        status: WatchEventStatus,
    ) -> Result<(), DomainError>;
    async fn list_pending(&self, user_id: &UserId) -> Result, DomainError>;
    async fn get_by_id(&self, id: &WatchEventId) -> Result, DomainError>;
    async fn get_by_ids(&self, ids: &[WatchEventId]) -> Result, DomainError>;
    /// Batch variant of `update_status`; unlike the single-id form its result
    /// carries a value (presumably the number of updated rows — confirm).
    async fn update_status_batch(
        &self,
        ids: &[WatchEventId],
        status: WatchEventStatus,
    ) -> Result;
    /// Duplicate check keyed by user, external id and a lower time bound `after`.
    async fn find_duplicate(
        &self,
        user_id: &UserId,
        external_id: &str,
        after: chrono::NaiveDateTime,
    ) -> Result;
    /// Retention cleanup with cutoff `before`; per the name it spares pending events.
    async fn delete_non_pending_older_than(
        &self,
        before: chrono::NaiveDateTime,
    ) -> Result;
}

/// Async persistence port for [`WebhookToken`] records. Lookup is by token
/// hash, not by the raw token.
#[async_trait]
pub trait WebhookTokenRepository: Send + Sync {
    async fn save(&self, token: &WebhookToken) -> Result<(), DomainError>;
    async fn find_by_token_hash(&self, hash: &str) -> Result, DomainError>;
    async fn list_by_user(&self, user_id: &UserId) -> Result, DomainError>;
    /// Scoped by both token id and user id.
    async fn delete(&self, id: &WebhookTokenId, user_id: &UserId) -> Result<(), DomainError>;
    /// Presumably records "now" as the token's last-used time — confirm.
    async fn touch_last_used(&self, id: &WebhookTokenId) -> Result<(), DomainError>;
}

/// Async persistence port for [`WrapUpRecord`]s and their reports.
#[async_trait]
pub trait WrapUpRepository: Send + Sync {
    async fn create(&self, record: &WrapUpRecord) -> Result<(), DomainError>;
    /// Sets a new status, with an optional error message.
    async fn update_status(
        &self,
        id: &WrapUpId,
        status: &WrapUpStatus,
        error: Option<&str>,
    ) -> Result<(), DomainError>;
    /// Attaches the finished `report` to the record identified by `id`.
    async fn set_complete(&self, id: &WrapUpId, report: &WrapUpReport) -> Result<(), DomainError>;
    async fn get_by_id(&self, id: &WrapUpId) -> Result, DomainError>;
    /// Takes a raw `Uuid`, not the [`UserId`] value object.
    async fn list_for_user(&self, user_id: Uuid) -> Result, DomainError>;
    async fn list_global(&self) -> Result, DomainError>;
    /// Looks up a record for the given date range. `user_id` is optional;
    /// presumably `None` selects a global (not user-scoped) wrap-up — confirm.
    async fn find_existing(
        &self,
        user_id: Option,
        start: NaiveDate,
        end: NaiveDate,
    ) -> Result, DomainError>;
    async fn delete(&self, id: &WrapUpId) -> Result<(), DomainError>;
    /// Retention cleanup with cutoff `before`; per the name it targets failed records.
    async fn delete_failed_older_than(
        &self,
        before: chrono::NaiveDateTime,
    ) -> Result;
}

// ── Wrap-up / Year-in-Review ─────────────────────────────────────────────────

/// Flat row combining a watched/rated movie with profile data (runtime,
/// budget, language, genres, keywords, cast).
///
/// NOTE(review): the meaning of the `(String, u32, i64)` tuple in `cast_names`
/// and the scale of `rating` are not visible here — document them at the source.
#[derive(Clone, Debug)]
pub struct WrapUpMovieRow {
    pub movie_id: Uuid,
    pub title: String,
    pub release_year: u16,
    pub director: Option,
    pub poster_path: Option,
    pub rating: u8,
    pub watched_at: NaiveDateTime,
    pub user_id: Uuid,
    pub runtime_minutes: Option,
    pub budget_usd: Option,
    pub original_language: Option,
    pub genres: Vec,
    pub keywords: Vec,
    pub cast_names: Vec<(String, u32, i64)>,
    pub cast_profile_paths: Vec>,
}

/// Async query port feeding wrap-up generation: reviews joined with movie
/// profile data for a scope and date range. Presumably yields
/// [`WrapUpMovieRow`]s — confirm once return types are restored.
#[async_trait]
pub trait WrapUpStatsQuery: Send + Sync {
    async fn get_reviews_with_profiles(
        &self,
        scope: &WrapUpScope,
        range: &DateRange,
    ) -> Result, DomainError>;
}