feat: add sidecar + processing migrations and postgres adapters
007_sidecar, 008_processing, 009_duplicate_groups migrations. Tag, sidecar, job, batch, plugin, pipeline, duplicate repos.
This commit is contained in:
126
crates/adapters/postgres/src/pipeline_repository.rs
Normal file
126
crates/adapters/postgres/src/pipeline_repository.rs
Normal file
@@ -0,0 +1,126 @@
|
||||
use crate::db::PgPool;
|
||||
use async_trait::async_trait;
|
||||
use domain::{
|
||||
entities::{PipelineStep, ProcessingPipeline},
|
||||
errors::DomainError,
|
||||
ports::PipelineRepository,
|
||||
value_objects::{MetadataValue, StructuredData, SystemId},
|
||||
};
|
||||
use uuid::Uuid;
|
||||
|
||||
#[derive(sqlx::FromRow)]
|
||||
struct PipelineRow {
|
||||
pipeline_id: Uuid,
|
||||
trigger_event: String,
|
||||
steps: serde_json::Value,
|
||||
}
|
||||
|
||||
#[derive(serde::Serialize, serde::Deserialize)]
|
||||
struct StepJson {
|
||||
plugin_id: Uuid,
|
||||
step_order: u32,
|
||||
configuration: serde_json::Map<String, serde_json::Value>,
|
||||
}
|
||||
|
||||
fn steps_from_json(v: serde_json::Value) -> Vec<PipelineStep> {
|
||||
let arr: Vec<StepJson> = serde_json::from_value(v).unwrap_or_default();
|
||||
arr.into_iter()
|
||||
.map(|s| {
|
||||
let mut config = StructuredData::new();
|
||||
for (k, val) in s.configuration {
|
||||
config.insert(k, MetadataValue::from(val));
|
||||
}
|
||||
PipelineStep {
|
||||
plugin_id: SystemId::from_uuid(s.plugin_id),
|
||||
step_order: s.step_order,
|
||||
configuration: config,
|
||||
}
|
||||
})
|
||||
.collect()
|
||||
}
|
||||
|
||||
fn steps_to_json(steps: &[PipelineStep]) -> serde_json::Value {
|
||||
let arr: Vec<StepJson> = steps
|
||||
.iter()
|
||||
.map(|s| {
|
||||
let config: serde_json::Map<String, serde_json::Value> = s
|
||||
.configuration
|
||||
.inner()
|
||||
.iter()
|
||||
.map(|(k, v)| (k.clone(), serde_json::Value::from(v)))
|
||||
.collect();
|
||||
StepJson {
|
||||
plugin_id: *s.plugin_id.as_uuid(),
|
||||
step_order: s.step_order,
|
||||
configuration: config,
|
||||
}
|
||||
})
|
||||
.collect();
|
||||
serde_json::to_value(arr).unwrap_or(serde_json::Value::Array(vec![]))
|
||||
}
|
||||
|
||||
impl From<PipelineRow> for ProcessingPipeline {
|
||||
fn from(r: PipelineRow) -> Self {
|
||||
Self {
|
||||
pipeline_id: SystemId::from_uuid(r.pipeline_id),
|
||||
trigger_event: r.trigger_event,
|
||||
steps: steps_from_json(r.steps),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub struct PostgresPipelineRepository {
|
||||
pool: PgPool,
|
||||
}
|
||||
|
||||
impl PostgresPipelineRepository {
|
||||
pub fn new(pool: PgPool) -> Self {
|
||||
Self { pool }
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl PipelineRepository for PostgresPipelineRepository {
|
||||
async fn find_by_id(&self, id: &SystemId) -> Result<Option<ProcessingPipeline>, DomainError> {
|
||||
let row = sqlx::query_as::<_, PipelineRow>(
|
||||
"SELECT pipeline_id, trigger_event, steps
|
||||
FROM processing_pipelines WHERE pipeline_id = $1",
|
||||
)
|
||||
.bind(*id.as_uuid())
|
||||
.fetch_optional(&self.pool)
|
||||
.await
|
||||
.map_err(|e| DomainError::Internal(e.to_string()))?;
|
||||
|
||||
Ok(row.map(Into::into))
|
||||
}
|
||||
|
||||
async fn find_by_trigger(&self, event: &str) -> Result<Vec<ProcessingPipeline>, DomainError> {
|
||||
let rows = sqlx::query_as::<_, PipelineRow>(
|
||||
"SELECT pipeline_id, trigger_event, steps
|
||||
FROM processing_pipelines WHERE trigger_event = $1",
|
||||
)
|
||||
.bind(event)
|
||||
.fetch_all(&self.pool)
|
||||
.await
|
||||
.map_err(|e| DomainError::Internal(e.to_string()))?;
|
||||
|
||||
Ok(rows.into_iter().map(Into::into).collect())
|
||||
}
|
||||
|
||||
async fn save(&self, pipeline: &ProcessingPipeline) -> Result<(), DomainError> {
|
||||
sqlx::query(
|
||||
"INSERT INTO processing_pipelines (pipeline_id, trigger_event, steps)
|
||||
VALUES ($1, $2, $3)
|
||||
ON CONFLICT (pipeline_id) DO UPDATE SET
|
||||
trigger_event = EXCLUDED.trigger_event,
|
||||
steps = EXCLUDED.steps",
|
||||
)
|
||||
.bind(*pipeline.pipeline_id.as_uuid())
|
||||
.bind(&pipeline.trigger_event)
|
||||
.bind(steps_to_json(&pipeline.steps))
|
||||
.execute(&self.pool)
|
||||
.await
|
||||
.map_err(|e| DomainError::Internal(e.to_string()))?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user