This commit is contained in:
@@ -18,33 +18,42 @@ use payload::DbEventPayload;
|
||||
|
||||
pub struct DbEventQueueConfig {
|
||||
pub poll_interval_ms: u64,
|
||||
pub batch_size: i64,
|
||||
pub max_attempts: i32,
|
||||
pub batch_size: i64,
|
||||
pub max_attempts: i32,
|
||||
}
|
||||
|
||||
impl DbEventQueueConfig {
|
||||
pub fn from_env() -> Self {
|
||||
Self {
|
||||
poll_interval_ms: std::env::var("EVENT_QUEUE_POLL_INTERVAL_MS")
|
||||
.ok().and_then(|v| v.parse().ok()).unwrap_or(500),
|
||||
.ok()
|
||||
.and_then(|v| v.parse().ok())
|
||||
.unwrap_or(500),
|
||||
batch_size: std::env::var("EVENT_QUEUE_BATCH_SIZE")
|
||||
.ok().and_then(|v| v.parse().ok()).unwrap_or(10),
|
||||
.ok()
|
||||
.and_then(|v| v.parse().ok())
|
||||
.unwrap_or(10),
|
||||
max_attempts: std::env::var("EVENT_QUEUE_MAX_ATTEMPTS")
|
||||
.ok().and_then(|v| v.parse().ok()).unwrap_or(5),
|
||||
.ok()
|
||||
.and_then(|v| v.parse().ok())
|
||||
.unwrap_or(5),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct SqliteEventQueue {
|
||||
pool: SqlitePool,
|
||||
pool: SqlitePool,
|
||||
config: Arc<DbEventQueueConfig>,
|
||||
}
|
||||
|
||||
impl SqliteEventQueue {
|
||||
pub async fn create(pool: SqlitePool, config: DbEventQueueConfig) -> anyhow::Result<Self> {
|
||||
migrations::run(&pool).await?;
|
||||
Ok(Self { pool, config: Arc::new(config) })
|
||||
Ok(Self {
|
||||
pool,
|
||||
config: Arc::new(config),
|
||||
})
|
||||
}
|
||||
|
||||
pub async fn create_publisher(pool: SqlitePool) -> anyhow::Result<Arc<dyn EventPublisher>> {
|
||||
@@ -68,14 +77,12 @@ impl EventPublisher for SqliteEventQueue {
|
||||
let payload_json = serde_json::to_string(&db_payload)
|
||||
.map_err(|e| DomainError::InfrastructureError(format!("serialize: {e}")))?;
|
||||
|
||||
sqlx::query(
|
||||
"INSERT INTO event_queue (event_type, payload) VALUES (?, ?)"
|
||||
)
|
||||
.bind(event_type)
|
||||
.bind(payload_json)
|
||||
.execute(&self.pool)
|
||||
.await
|
||||
.map_err(|e| DomainError::InfrastructureError(format!("insert event: {e}")))?;
|
||||
sqlx::query("INSERT INTO event_queue (event_type, payload) VALUES (?, ?)")
|
||||
.bind(event_type)
|
||||
.bind(payload_json)
|
||||
.execute(&self.pool)
|
||||
.await
|
||||
.map_err(|e| DomainError::InfrastructureError(format!("insert event: {e}")))?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -83,10 +90,10 @@ impl EventPublisher for SqliteEventQueue {
|
||||
|
||||
impl EventConsumer for SqliteEventQueue {
|
||||
fn consume(&self) -> BoxStream<'_, Result<EventEnvelope, DomainError>> {
|
||||
let pool = self.pool.clone();
|
||||
let config = Arc::clone(&self.config);
|
||||
let (tx, rx) = mpsc::channel(128);
|
||||
let rx = Arc::new(Mutex::new(rx));
|
||||
let pool = self.pool.clone();
|
||||
let config = Arc::clone(&self.config);
|
||||
let (tx, rx) = mpsc::channel(128);
|
||||
let rx = Arc::new(Mutex::new(rx));
|
||||
|
||||
tokio::spawn(async move {
|
||||
let poll_interval = Duration::from_millis(config.poll_interval_ms);
|
||||
@@ -124,16 +131,18 @@ impl EventConsumer for SqliteEventQueue {
|
||||
|
||||
#[derive(sqlx::FromRow)]
|
||||
struct QueueRow {
|
||||
id: i64,
|
||||
payload: String,
|
||||
id: i64,
|
||||
payload: String,
|
||||
attempts: i32,
|
||||
}
|
||||
|
||||
async fn claim_batch(
|
||||
pool: &SqlitePool,
|
||||
pool: &SqlitePool,
|
||||
config: &DbEventQueueConfig,
|
||||
) -> Result<Vec<QueueRow>, DomainError> {
|
||||
let mut tx = pool.begin().await
|
||||
let mut tx = pool
|
||||
.begin()
|
||||
.await
|
||||
.map_err(|e| DomainError::InfrastructureError(format!("begin tx: {e}")))?;
|
||||
|
||||
let rows = sqlx::query_as::<_, QueueRow>(
|
||||
@@ -141,7 +150,7 @@ async fn claim_batch(
|
||||
WHERE status = 'pending'
|
||||
AND next_attempt_at <= strftime('%Y-%m-%dT%H:%M:%SZ', 'now')
|
||||
ORDER BY next_attempt_at ASC
|
||||
LIMIT ?"
|
||||
LIMIT ?",
|
||||
)
|
||||
.bind(config.batch_size)
|
||||
.fetch_all(&mut *tx)
|
||||
@@ -159,36 +168,43 @@ async fn claim_batch(
|
||||
placeholders
|
||||
);
|
||||
let mut q = sqlx::query(&sql);
|
||||
for r in &rows { q = q.bind(r.id); }
|
||||
q.execute(&mut *tx).await
|
||||
for r in &rows {
|
||||
q = q.bind(r.id);
|
||||
}
|
||||
q.execute(&mut *tx)
|
||||
.await
|
||||
.map_err(|e| DomainError::InfrastructureError(format!("mark processing: {e}")))?;
|
||||
|
||||
tx.commit().await
|
||||
tx.commit()
|
||||
.await
|
||||
.map_err(|e| DomainError::InfrastructureError(format!("commit claim: {e}")))?;
|
||||
|
||||
Ok(rows)
|
||||
}
|
||||
|
||||
fn decode_row(
|
||||
pool: &SqlitePool,
|
||||
row: QueueRow,
|
||||
pool: &SqlitePool,
|
||||
row: QueueRow,
|
||||
max_attempts: i32,
|
||||
) -> Result<EventEnvelope, DomainError> {
|
||||
let db_payload: DbEventPayload = serde_json::from_str(&row.payload)
|
||||
.map_err(|e| DomainError::InfrastructureError(format!("deserialize: {e}")))?;
|
||||
let event = DomainEvent::try_from(db_payload)?;
|
||||
Ok(EventEnvelope::new(event, Box::new(DbAckHandle {
|
||||
pool: pool.clone(),
|
||||
row_id: row.id,
|
||||
attempts: row.attempts,
|
||||
max_attempts,
|
||||
})))
|
||||
Ok(EventEnvelope::new(
|
||||
event,
|
||||
Box::new(DbAckHandle {
|
||||
pool: pool.clone(),
|
||||
row_id: row.id,
|
||||
attempts: row.attempts,
|
||||
max_attempts,
|
||||
}),
|
||||
))
|
||||
}
|
||||
|
||||
struct DbAckHandle {
|
||||
pool: SqlitePool,
|
||||
row_id: i64,
|
||||
attempts: i32,
|
||||
pool: SqlitePool,
|
||||
row_id: i64,
|
||||
attempts: i32,
|
||||
max_attempts: i32,
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user