Workflow Versioning
Cloacina’s workflow versioning system is a core feature that tracks changes to workflows and their constituent tasks. The system is agnostic to what these changes mean - it simply detects and records them. Consumers of Cloacina can then decide how to handle these changes based on their specific needs.
In workflow systems, versioning is crucial for:
- Change Detection: Identifying when workflows or their components have changed
- State Tracking: Maintaining a history of workflow modifications
- Consumer Control: Allowing consumers to implement their own versioning policies
- Audit Trail: Providing a record of workflow evolution
Cloacina generates unique versions for each workflow based on:
-
Workflow Structure cloacina::workflow::Workflow
- Task IDs and their relationships
- Dependency graph topology
- Task ordering
- Implementation detail: The structure is hashed using a deterministic algorithm that considers task IDs and their dependencies
fn hash_topology(&self, hasher: &mut DefaultHasher) { // Get tasks in deterministic order let mut task_ids: Vec<_> = self.tasks.keys().collect(); task_ids.sort(); for task_id in task_ids { task_id.hash(hasher); // Include dependencies in deterministic order let mut deps: Vec<_> = self.tasks[task_id].dependencies().to_vec(); deps.sort(); deps.hash(hasher); } }
-
Task Definitions cloacina::task::Task
- Task implementation code (fingerprints)
- Task dependencies
- Task configuration
- Implementation detail: Task definitions are hashed using a deterministic algorithm that considers task metadata and code fingerprints
fn hash_task_definitions(&self, hasher: &mut DefaultHasher) { // Get tasks in deterministic order let mut task_ids: Vec<_> = self.tasks.keys().collect(); task_ids.sort(); for task_id in task_ids { let task = &self.tasks[task_id]; // Hash task metadata task.id().hash(hasher); task.dependencies().hash(hasher); // Hash task code fingerprint (if available) if let Some(code_hash) = self.get_task_code_hash(task_id) { code_hash.hash(hasher); } } }
- Task fingerprints are calculated by:
fn calculate_function_fingerprint(func: &ItemFn) -> String { let mut hasher = DefaultHasher::new(); // Hash function signature (excluding name) func.sig.inputs.iter().for_each(|input| { if let syn::FnArg::Typed(pat_type) = input { quote::quote!(#pat_type).to_string().hash(&mut hasher); } }); // Hash return type quote::quote!(#(&func.sig.output)).to_string().hash(&mut hasher); // Hash function body (this is the key part for detecting changes) let body_tokens = quote::quote!(#(&func.block)).to_string(); body_tokens.hash(&mut hasher); // Include async info func.sig.asyncness.is_some().hash(&mut hasher); format!("{:016x}", hasher.finish()) }
-
Workflow Configuration cloacina::workflow::WorkflowMetadata
- Workflow name and description
- Workflow tags
- Global settings
- Implementation detail: Configuration is hashed using a deterministic algorithm that considers all metadata fields
fn hash_configuration(&self, hasher: &mut DefaultHasher) { // Hash Workflow-level configuration (excluding version and timestamps) self.name.hash(hasher); self.metadata.description.hash(hasher); // Hash tags in deterministic order let mut tags: Vec<_> = self.metadata.tags.iter().collect(); tags.sort_by_key(|(k, _)| *k); tags.hash(hasher); }
The final version is calculated by combining all three components:
pub fn calculate_version(&self) -> String {
let mut hasher = DefaultHasher::new();
// 1. Hash Workflow structure (topology)
self.hash_topology(&mut hasher);
// 2. Hash task definitions
self.hash_task_definitions(&mut hasher);
// 3. Hash Workflow configuration
self.hash_configuration(&mut hasher);
// Return hex representation of hash
format!("{:016x}", hasher.finish())
}
The workflow version hash changes when any of these components change:
-
Task Implementation
- Function body changes
- Function signature changes (parameters, return type)
- Async status changes
- Task dependencies changes
-
Workflow Structure
- Task IDs added or removed
- Task dependency relationships changed
- Task ordering changed
- Task configuration changes
-
Workflow Configuration
- Workflow name changes
- Description changes
- Tags added, removed, or modified
- Global settings changes
The version hash is deterministic - the same workflow configuration will always produce the same hash, making it reliable for change detection and version tracking.
Workflow versions are:
- Stored in the database with each pipeline execution
- Tracked using content-based hashing
- Made available to consumers through the API
Example of version tracking in the database:
CREATE TABLE pipeline_executions (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
pipeline_name VARCHAR NOT NULL,
pipeline_version VARCHAR NOT NULL, -- Content-based version hash
status VARCHAR NOT NULL CHECK (status IN ('Pending', 'Running', 'Completed', 'Failed', 'Cancelled')),
context_id UUID REFERENCES contexts(id),
started_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
completed_at TIMESTAMP,
error_details TEXT,
recovery_attempts INTEGER DEFAULT 0 NOT NULL,
last_recovery_at TIMESTAMP,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL
);
-- Indexes for efficient querying
CREATE INDEX pipeline_executions_name_version_idx ON pipeline_executions(pipeline_name, pipeline_version);
CREATE INDEX pipeline_executions_status_idx ON pipeline_executions(status);
Consumers can detect version changes using the following pattern:
let current_version = workflow.metadata().version.clone();
let last_version = dal.pipeline_execution().get_last_version(workflow_name)?;
if last_version.as_ref() != Some(¤t_version) {
// Version has changed - handle as needed
info!(
"Workflow '{}' version changed: {} -> {}",
workflow_name,
last_version.unwrap_or_else(|| "none".to_string()),
current_version
);
}
This pattern allows consumers to detect when a workflow’s version has changed, indicating modifications to the workflow’s structure, task implementations, or configuration.
Cloacina’s workflow versioning system uses content-based hashing to track changes to workflows. The version hash is calculated from the workflow’s structure, task definitions, and configuration, providing a reliable way to detect when any of these components change. This deterministic versioning system enables consumers to detect changes while remaining agnostic to how those changes are handled.