Task Execution
Cloacina implements a robust task execution system that ensures reliable, concurrent execution of workflow tasks while maintaining proper dependency ordering and state management. This article explores the complete lifecycle of a task from initialization to completion.
Tasks progress through a well-defined set of states during their execution:
```mermaid
stateDiagram-v2
    [*] --> NotStarted
    NotStarted --> Ready: Dependencies Satisfied
    Ready --> Running: Claimed by Executor
    Running --> Completed: Success
    Running --> Failed: Error
    Running --> Ready: Retry Needed
    Failed --> Ready: Retry Policy Allows
    Completed --> [*]
    Failed --> [*]: Max Retries Exceeded
```
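These states could be modelled as a simple enum; the sketch below mirrors the diagram above rather than Cloacina's actual internal types:

```rust
/// Illustrative only: an enum mirroring the state diagram above.
/// Cloacina's internal representation may differ.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum TaskState {
    NotStarted,
    Ready,
    Running,
    Completed,
    Failed,
}

impl TaskState {
    /// "Completed", and "Failed" once retries are exhausted, never transition further.
    fn is_terminal(self) -> bool {
        matches!(self, TaskState::Completed | TaskState::Failed)
    }
}
```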
When a workflow is scheduled for execution, tasks are initialized in a specific order:
```rust
async fn initialize_task_executions(
    &self,
    pipeline_execution_id: Uuid,
    workflow: &Workflow,
) -> Result<(), ValidationError> {
    let task_ids = workflow.topological_sort()?;

    for task_id in task_ids {
        // max_attempts, trigger_rules, and task_configuration are resolved from
        // the task's registered metadata (lookup elided here for brevity).
        let new_task = NewTaskExecution {
            pipeline_execution_id: UniversalUuid(pipeline_execution_id),
            task_name: task_id,
            status: "NotStarted".to_string(),
            attempt: 1,
            max_attempts,
            trigger_rules: trigger_rules.to_string(),
            task_configuration: task_configuration.to_string(),
        };
        self.dal.task_execution().create(new_task)?;
    }

    Ok(())
}
```
Key aspects of initialization:
- Tasks are created in topological order (respecting dependencies)
- Each task starts in the “NotStarted” state
- Initial attempt counter is set to 1
- Retry policy and trigger rules are configured
- Task configuration is stored for execution
The system uses a semaphore-based approach to control concurrent task execution:
```rust
pub struct ExecutorConfig {
    pub max_concurrent_tasks: usize, // Default: 4
    pub poll_interval: Duration,     // Default: 1 second
    pub task_timeout: Duration,      // Default: 5 minutes
}
```
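A `Default` implementation matching the documented defaults might look like this sketch (the crate may expose its defaults differently):

```rust
use std::time::Duration;

impl Default for ExecutorConfig {
    fn default() -> Self {
        Self {
            max_concurrent_tasks: 4,
            poll_interval: Duration::from_secs(1),
            task_timeout: Duration::from_secs(5 * 60),
        }
    }
}
```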
```mermaid
sequenceDiagram
    participant EX as TaskExecutor
    participant DB as Database
    participant Task as Task
    EX->>DB: Poll for ready tasks
    DB-->>EX: Return ready task
    EX->>EX: Acquire semaphore permit
    EX->>Task: Execute task
    Task-->>EX: Return result
    EX->>DB: Update task status
    EX->>EX: Release semaphore permit
```
The execution process:
- The task executor polls the database for ready tasks
- Acquires a semaphore permit and atomically claims the task
- Executes the task in a background Tokio task
- Updates the task status based on the execution result
- Releases the semaphore permit
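Put together, the executor's main loop can be sketched roughly as follows; the `poll_ready_tasks` and `execute_and_record` names are illustrative stand-ins for the real polling and execution calls:

```rust
use std::sync::Arc;
use tokio::sync::Semaphore;

async fn run_loop(self: Arc<Self>) {
    // One permit per concurrently running task.
    let semaphore = Arc::new(Semaphore::new(self.config.max_concurrent_tasks));

    loop {
        // 1. Poll the database for tasks whose dependencies are satisfied.
        let ready_tasks = self.poll_ready_tasks().await.unwrap_or_default();

        for task in ready_tasks {
            // 2. Wait for a free slot before claiming the task.
            let permit = semaphore.clone().acquire_owned().await.expect("semaphore closed");
            let executor = Arc::clone(&self);

            // 3. Execute the task in its own Tokio task.
            tokio::spawn(async move {
                let _permit = permit; // held until execution finishes
                // 4./5. Execute, persist the result, then release the permit on drop.
                if let Err(e) = executor.execute_and_record(task).await {
                    error!("Task execution failed: {}", e);
                }
            });
        }

        // Sleep before the next polling cycle.
        tokio::time::sleep(self.config.poll_interval).await;
    }
}
```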
Each task runs in its own isolated Tokio task, providing several important benefits:
```rust
// Execute task in background with pre-loaded context
tokio::spawn(async move {
    let _permit = permit; // Hold permit until task completes

    info!(
        "Executing task with pre-loaded context: {} (attempt {})",
        claimed_task.task_name, claimed_task.attempt
    );

    if let Err(e) = executor
        .execute_claimed_task_with_context(claimed_task, preloaded_context)
        .await
    {
        error!("Task execution failed: {}", e);
    }
});
```
Key aspects of task isolation:
- **Independent Execution Context:**
  - Each task runs in its own Tokio task
  - Tasks can’t interfere with each other’s execution
  - Memory and state are isolated between tasks
- **Resource Management:**
  - Semaphore controls total concurrent tasks
  - Each task holds a permit until completion
  - Resources are automatically released on task completion
- **Error Containment:**
  - Task failures are isolated
  - Panics in one task don’t affect others
  - Error handling is task-specific
- **Timeout Protection:**

  ```rust
  match tokio::time::timeout(self.config.task_timeout, task.execute(context)).await {
      Ok(result) => result.map_err(ExecutorError::TaskExecution),
      Err(_) => Err(ExecutorError::TaskTimeout),
  }
  ```

  - Each task has its own timeout
  - Long-running tasks are automatically terminated
  - System resources are protected from runaway tasks
- **State Management:**
  - Task state is persisted in the database
  - Context is isolated per task
  - Dependencies are managed through database state
This isolation ensures that tasks can execute safely in parallel without interfering with each other, while still maintaining proper coordination through the database and semaphore controls.
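For instance, a panic inside a spawned task surfaces as a `JoinError` on that task's `JoinHandle` instead of taking down the executor; a minimal, standalone demonstration:

```rust
#[tokio::main]
async fn main() {
    // A task that panics...
    let handle = tokio::spawn(async {
        panic!("task blew up");
    });

    // ...only fails its own JoinHandle; the rest of the runtime keeps going.
    match handle.await {
        Err(e) if e.is_panic() => eprintln!("contained panic: {e}"),
        other => println!("unexpected result: {other:?}"),
    }

    println!("executor still alive");
}
```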
While each task executes in its own isolated context, tasks are not scheduled independently of one another. The system manages dependencies through:
```rust
async fn check_task_dependencies(
    &self,
    task_execution: &TaskExecution,
) -> Result<bool, ValidationError> {
    // The workflow definition is looked up from the executor's registry (elided here)
    let dependencies = workflow.get_dependencies(&task_execution.task_name)?;

    // Batch-fetch the status of every dependency in a single query
    let status_map = self.dal.task_execution()
        .get_task_statuses_batch(
            task_execution.pipeline_execution_id,
            &dependencies,
        )?;

    // A dependency is satisfied only once it has reached a terminal state
    let all_satisfied = dependencies.iter().all(|dependency| {
        status_map.get(dependency)
            .map(|status| matches!(status.as_str(),
                "Completed" | "Failed" | "Skipped"))
            .unwrap_or(false)
    });

    Ok(all_satisfied)
}
```
To prevent race conditions in task execution, particularly with retrying tasks, the system implements several safety mechanisms:
```rust
async fn claim_task_for_execution(
    &self,
    task_execution: &TaskExecution,
) -> Result<Option<ClaimedTask>, ExecutorError> {
    // Start a transaction to ensure atomicity
    let mut transaction = self.dal.begin_transaction().await?;

    // Re-check dependencies within the transaction
    let dependencies_satisfied = self.check_task_dependencies_within_tx(
        &mut transaction,
        task_execution
    ).await?;

    if !dependencies_satisfied {
        return Ok(None);
    }

    // Claim the task atomically
    let claimed_task = self.dal.task_execution()
        .claim_task_within_tx(
            &mut transaction,
            task_execution.id,
            self.executor_id
        ).await?;

    // Commit the transaction
    transaction.commit().await?;

    Ok(Some(claimed_task))
}
```
The system prevents race conditions through:
- **Atomic Dependency Checking:**
  - Dependencies are re-checked within a transaction
  - Prevents “greedy” executors from picking up tasks too early
  - Ensures all upstream tasks are truly complete
- **Transaction-based Task Claiming:**
  - Task claiming is atomic with dependency verification
  - Prevents multiple executors from claiming the same task
  - Maintains consistency even with retrying tasks
- **Status Verification:**
  - Only terminal states (“Completed”, “Failed”, “Skipped”) are considered valid
  - The “Ready” state is not considered a valid completion state
  - Prevents premature execution of downstream tasks
- **Retry-Aware Dependency Checking:**
  - System tracks retry attempts and states
  - Downstream tasks wait for final resolution
  - Prevents execution before retry attempts are exhausted
This ensures that even if a task is marked as “Ready” for retry, downstream tasks won’t execute until the retrying task reaches a terminal state (either “Completed” or permanently “Failed”).
{{% notice warning %}} Race Condition Prevention: The system’s transaction-based approach prevents a common race condition where an executor might try to pick up a downstream task before its upstream dependencies (including retrying tasks) have properly completed. This is achieved through atomic dependency checking and strict state requirements that only consider terminal states as valid for dependency satisfaction. {{% /notice %}}
Each task execution maintains its own context:
- **Initial Context:**
  - Loaded from the parent pipeline execution
  - Contains workflow-level variables
  - Includes scheduled time and metadata
- **Dependency Context:**
  - Merged from completed dependency tasks
  - Preserves task outputs
  - Maintains execution history
- **Task Context:**
  - Task-specific configuration
  - Runtime variables
  - Execution metadata
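Conceptually, assembling a task's input context is a layered merge of these three sources. The sketch below illustrates the layering order over plain JSON maps (the real `Context` type has its own API, so treat this only as an illustration):

```rust
use serde_json::{Map, Value};

/// Merge `overlay` into `base`; keys in `overlay` win on conflict.
fn merge(base: &mut Map<String, Value>, overlay: Map<String, Value>) {
    for (key, value) in overlay {
        base.insert(key, value);
    }
}

fn build_task_context(
    pipeline_context: Map<String, Value>,        // workflow-level variables and metadata
    dependency_outputs: Vec<Map<String, Value>>, // outputs of completed upstream tasks
    task_config: Map<String, Value>,             // task-specific configuration
) -> Map<String, Value> {
    let mut context = pipeline_context;
    for output in dependency_outputs {
        merge(&mut context, output);
    }
    merge(&mut context, task_config);
    context
}
```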
The system implements comprehensive error handling:
```rust
async fn handle_task_result(
    &self,
    claimed_task: ClaimedTask,
    result: Result<Context<serde_json::Value>, ExecutorError>,
) -> Result<(), ExecutorError> {
    match result {
        Ok(result_context) => {
            // Success: persist the output context and mark the task "Completed"
            self.complete_task_transaction(&claimed_task, result_context).await?;
        }
        Err(error) => {
            // Failure: consult the task's retry policy
            // (retry_policy is resolved from the task's metadata, elided here)
            if self.should_retry_task(&claimed_task, &error, &retry_policy).await? {
                self.schedule_task_retry(&claimed_task, &retry_policy).await?;
            } else {
                self.mark_task_failed(claimed_task.task_execution_id, &error).await?;
            }
        }
    }
    Ok(())
}
```
When a task fails, the system evaluates whether to retry the task based on the configured retry policy. This is why tasks can transition from “Failed” back to “Ready” state:
```rust
pub struct RetryPolicy {
    pub max_attempts: u32,            // Maximum number of retry attempts
    pub initial_delay: Duration,      // Initial delay before first retry
    pub max_delay: Duration,          // Maximum delay between retries
    pub backoff_factor: f64,          // Exponential backoff multiplier
    pub retry_on_errors: Vec<String>, // Specific error types to retry on
}
```
The retry process works as follows:
- **Failure Detection:**
  - Task execution fails with an error
  - System captures error details and context
- **Retry Evaluation:**
  - Checks whether the error type is retryable
  - Verifies the attempt count against max_attempts
  - Calculates the next retry delay using backoff (see the sketch after this list)
- **State Transition:**
  - If retry is allowed: Failed → Ready
  - If no more retries: Failed → [Terminal]
- **Retry Scheduling:**
  - Task is marked as Ready for the next attempt
  - Backoff delay is applied before execution
  - Attempt counter is incremented
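The backoff delay mentioned above is typically an exponential of the attempt number, capped at `max_delay`; a plausible sketch in terms of the `RetryPolicy` fields:

```rust
use std::time::Duration;

/// Compute the delay before the next retry attempt (attempt is 1-based).
fn next_retry_delay(policy: &RetryPolicy, attempt: u32) -> Duration {
    let exponent = attempt.saturating_sub(1) as i32;
    let factor = policy.backoff_factor.powi(exponent);
    let delay = policy.initial_delay.mul_f64(factor);
    delay.min(policy.max_delay)
}
```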
This retry mechanism ensures that transient failures (like network issues or temporary resource unavailability) don’t permanently fail tasks, while still maintaining control over retry behavior through configurable policies.
When a task completes:
- **Status Update:**
  - Task status is updated to a terminal state
  - Execution time and duration are recorded
  - Error details are preserved if applicable
- **Context Persistence:**
  - Task output context is stored
  - Available for dependent tasks
  - Maintains execution history
- **Pipeline Progress:**
  - Dependent tasks are evaluated for readiness
  - Pipeline completion is checked
  - Final pipeline status is updated
The system includes robust recovery features:
- **Orphaned Task Detection:**
  - Identifies tasks stuck in the “Running” state
  - Implements timeout-based recovery
  - Maintains execution consistency
- **Retry Management:**
  - Configurable retry policies
  - Exponential backoff support
  - Maximum attempt limits
- **Error Recovery:**
  - Detailed error tracking
  - Recovery event logging
  - Pipeline-level error handling
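A recovery pass for orphaned tasks might look roughly like the following sketch; the `find_running_older_than` and `reset_to_ready` DAL methods are hypothetical names used only for illustration:

```rust
async fn recover_orphaned_tasks(&self) -> Result<(), ExecutorError> {
    // Any task still "Running" past the configured timeout is considered orphaned.
    let timeout = chrono::Duration::from_std(self.config.task_timeout)
        .expect("task_timeout fits in chrono::Duration");
    let cutoff = chrono::Utc::now() - timeout;

    // Hypothetical DAL call: fetch "Running" tasks claimed before the cutoff.
    let orphaned = self.dal.task_execution().find_running_older_than(cutoff).await?;

    for task in orphaned {
        warn!("Recovering orphaned task: {}", task.task_name);
        // Hypothetical DAL call: return the task to the pool for another attempt.
        self.dal.task_execution().reset_to_ready(task.id).await?;
    }

    Ok(())
}
```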
The task execution system is optimized for:
- **Concurrency:**
  - Controlled parallel execution
  - Resource-aware scheduling
  - Efficient task claiming
- **Database Operations:**
  - Batch status updates
  - Efficient context loading
  - Optimized dependency checks
- **Resource Management:**
  - Configurable timeouts
  - Memory-efficient context handling
  - Controlled retry behavior
Cloacina’s task execution system provides a robust, reliable framework for executing complex workflows. Through careful state management, dependency handling, and error recovery, it ensures that tasks execute in the correct order while maintaining system stability and performance.
The system’s design emphasizes:
- Reliability through retry mechanisms
- Efficiency through batch operations
- Observability through detailed status tracking
- Flexibility through configurable trigger rules
- Resilience through comprehensive error handling