007_sidecar, 008_processing, 009_duplicate_groups migrations. Tag, sidecar, job, batch, plugin, pipeline, duplicate repos.
127 lines
3.8 KiB
Rust
127 lines
3.8 KiB
Rust
use crate::db::PgPool;
|
|
use async_trait::async_trait;
|
|
use domain::{
|
|
entities::{PipelineStep, ProcessingPipeline},
|
|
errors::DomainError,
|
|
ports::PipelineRepository,
|
|
value_objects::{MetadataValue, StructuredData, SystemId},
|
|
};
|
|
use uuid::Uuid;
|
|
|
|
#[derive(sqlx::FromRow)]
|
|
struct PipelineRow {
|
|
pipeline_id: Uuid,
|
|
trigger_event: String,
|
|
steps: serde_json::Value,
|
|
}
|
|
|
|
#[derive(serde::Serialize, serde::Deserialize)]
|
|
struct StepJson {
|
|
plugin_id: Uuid,
|
|
step_order: u32,
|
|
configuration: serde_json::Map<String, serde_json::Value>,
|
|
}
|
|
|
|
fn steps_from_json(v: serde_json::Value) -> Vec<PipelineStep> {
|
|
let arr: Vec<StepJson> = serde_json::from_value(v).unwrap_or_default();
|
|
arr.into_iter()
|
|
.map(|s| {
|
|
let mut config = StructuredData::new();
|
|
for (k, val) in s.configuration {
|
|
config.insert(k, MetadataValue::from(val));
|
|
}
|
|
PipelineStep {
|
|
plugin_id: SystemId::from_uuid(s.plugin_id),
|
|
step_order: s.step_order,
|
|
configuration: config,
|
|
}
|
|
})
|
|
.collect()
|
|
}
|
|
|
|
fn steps_to_json(steps: &[PipelineStep]) -> serde_json::Value {
|
|
let arr: Vec<StepJson> = steps
|
|
.iter()
|
|
.map(|s| {
|
|
let config: serde_json::Map<String, serde_json::Value> = s
|
|
.configuration
|
|
.inner()
|
|
.iter()
|
|
.map(|(k, v)| (k.clone(), serde_json::Value::from(v)))
|
|
.collect();
|
|
StepJson {
|
|
plugin_id: *s.plugin_id.as_uuid(),
|
|
step_order: s.step_order,
|
|
configuration: config,
|
|
}
|
|
})
|
|
.collect();
|
|
serde_json::to_value(arr).unwrap_or(serde_json::Value::Array(vec![]))
|
|
}
|
|
|
|
impl From<PipelineRow> for ProcessingPipeline {
|
|
fn from(r: PipelineRow) -> Self {
|
|
Self {
|
|
pipeline_id: SystemId::from_uuid(r.pipeline_id),
|
|
trigger_event: r.trigger_event,
|
|
steps: steps_from_json(r.steps),
|
|
}
|
|
}
|
|
}
|
|
|
|
pub struct PostgresPipelineRepository {
|
|
pool: PgPool,
|
|
}
|
|
|
|
impl PostgresPipelineRepository {
|
|
pub fn new(pool: PgPool) -> Self {
|
|
Self { pool }
|
|
}
|
|
}
|
|
|
|
#[async_trait]
|
|
impl PipelineRepository for PostgresPipelineRepository {
|
|
async fn find_by_id(&self, id: &SystemId) -> Result<Option<ProcessingPipeline>, DomainError> {
|
|
let row = sqlx::query_as::<_, PipelineRow>(
|
|
"SELECT pipeline_id, trigger_event, steps
|
|
FROM processing_pipelines WHERE pipeline_id = $1",
|
|
)
|
|
.bind(*id.as_uuid())
|
|
.fetch_optional(&self.pool)
|
|
.await
|
|
.map_err(|e| DomainError::Internal(e.to_string()))?;
|
|
|
|
Ok(row.map(Into::into))
|
|
}
|
|
|
|
async fn find_by_trigger(&self, event: &str) -> Result<Vec<ProcessingPipeline>, DomainError> {
|
|
let rows = sqlx::query_as::<_, PipelineRow>(
|
|
"SELECT pipeline_id, trigger_event, steps
|
|
FROM processing_pipelines WHERE trigger_event = $1",
|
|
)
|
|
.bind(event)
|
|
.fetch_all(&self.pool)
|
|
.await
|
|
.map_err(|e| DomainError::Internal(e.to_string()))?;
|
|
|
|
Ok(rows.into_iter().map(Into::into).collect())
|
|
}
|
|
|
|
async fn save(&self, pipeline: &ProcessingPipeline) -> Result<(), DomainError> {
|
|
sqlx::query(
|
|
"INSERT INTO processing_pipelines (pipeline_id, trigger_event, steps)
|
|
VALUES ($1, $2, $3)
|
|
ON CONFLICT (pipeline_id) DO UPDATE SET
|
|
trigger_event = EXCLUDED.trigger_event,
|
|
steps = EXCLUDED.steps",
|
|
)
|
|
.bind(*pipeline.pipeline_id.as_uuid())
|
|
.bind(&pipeline.trigger_event)
|
|
.bind(steps_to_json(&pipeline.steps))
|
|
.execute(&self.pool)
|
|
.await
|
|
.map_err(|e| DomainError::Internal(e.to_string()))?;
|
|
Ok(())
|
|
}
|
|
}
|