Dispatcher Architecture
Cloacina uses a dispatcher architecture to decouple task scheduling from task execution. This enables pluggable execution backends - you can implement custom executors for Kubernetes jobs, serverless functions, remote workers, or any other execution environment.
```mermaid
flowchart TB
    subgraph Scheduler["Task Scheduler"]
        SM[State Manager]
        SL[Scheduler Loop]
    end

    subgraph Dispatcher["Dispatcher"]
        R[Router]
        D[DefaultDispatcher]
    end

    subgraph Executors["Executors"]
        THREAD[ThreadTaskExecutor]
        K8S[K8sExecutor]
        LAMBDA[LambdaExecutor]
        CUSTOM[CustomExecutor]
    end

    SM -->|TaskReadyEvent| D
    D -->|route| R
    R -->|dispatch| THREAD
    R -->|dispatch| K8S
    R -->|dispatch| LAMBDA
    R -->|dispatch| CUSTOM
    THREAD -->|ExecutionResult| D
    K8S -->|ExecutionResult| D
    LAMBDA -->|ExecutionResult| D
    CUSTOM -->|ExecutionResult| D
    D -->|state update| DB[(Database)]
```
| Component | Purpose |
|---|---|
| TaskReadyEvent | Event emitted when a task becomes ready for execution |
| Dispatcher | Routes events to appropriate executors based on configuration |
| Router | Pattern-matching engine for task-to-executor routing |
| TaskExecutor | Trait implemented by execution backends |
| ExecutionResult | Outcome of task execution (success, failure, retry) |
The Dispatcher trait defines the interface for routing task events:
```rust
pub trait Dispatcher: Send + Sync {
    /// Dispatch a task-ready event to an appropriate executor.
    fn dispatch(&self, event: TaskReadyEvent) -> Result<(), DispatchError>;

    /// Register an executor with a given key.
    fn register_executor(&self, key: &str, executor: Arc<dyn TaskExecutor>);

    /// Check if any registered executor has capacity.
    fn has_capacity(&self) -> bool;

    /// Resolve which executor key should handle a task.
    fn resolve_executor_key(&self, task_namespace: &str) -> String;
}
```
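To make the contract concrete, here is a minimal, self-contained sketch of a dispatcher with an executor registry. All types below are local stubs invented for illustration, and the routing is a hard-coded placeholder; the real cloacina types carry more fields and richer error variants.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

// Local stubs, not the real cloacina types.
struct TaskReadyEvent {
    task_namespace: String,
}

#[derive(Debug)]
struct DispatchError;

trait TaskExecutor: Send + Sync {
    fn execute(&self, event: TaskReadyEvent) -> Result<(), DispatchError>;
}

struct SimpleDispatcher {
    executors: Mutex<HashMap<String, Arc<dyn TaskExecutor>>>,
    default_key: String,
}

impl SimpleDispatcher {
    fn new(default_key: &str) -> Self {
        Self {
            executors: Mutex::new(HashMap::new()),
            default_key: default_key.to_string(),
        }
    }

    fn register_executor(&self, key: &str, executor: Arc<dyn TaskExecutor>) {
        self.executors.lock().unwrap().insert(key.to_string(), executor);
    }

    // Placeholder routing: the real Router pattern-matches the namespace
    // against configured glob rules instead of this hard-coded check.
    fn resolve_executor_key(&self, task_namespace: &str) -> String {
        if task_namespace.contains("::ml::") {
            "gpu".to_string()
        } else {
            self.default_key.clone()
        }
    }

    fn dispatch(&self, event: TaskReadyEvent) -> Result<(), DispatchError> {
        let key = self.resolve_executor_key(&event.task_namespace);
        // Missing executor for a resolved key is a dispatch error.
        let executor = self
            .executors
            .lock()
            .unwrap()
            .get(&key)
            .cloned()
            .ok_or(DispatchError)?;
        executor.execute(event)
    }
}

struct NoopExecutor;
impl TaskExecutor for NoopExecutor {
    fn execute(&self, _event: TaskReadyEvent) -> Result<(), DispatchError> {
        Ok(())
    }
}

fn main() {
    let dispatcher = SimpleDispatcher::new("default");
    dispatcher.register_executor("default", Arc::new(NoopExecutor));

    // Routed to "default": no ML segment in the namespace.
    let event = TaskReadyEvent {
        task_namespace: "public::embedded::etl::extract".to_string(),
    };
    assert!(dispatcher.dispatch(event).is_ok());

    // Resolves to "gpu", which is not registered here, so dispatch fails.
    let ml = TaskReadyEvent {
        task_namespace: "public::ml::train".to_string(),
    };
    assert!(dispatcher.dispatch(ml).is_err());
    println!("ok");
}
```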
To implement a custom executor, implement the TaskExecutor trait:
```rust
pub trait TaskExecutor: Send + Sync {
    /// Execute a task and return the result.
    fn execute(&self, event: TaskReadyEvent) -> Result<ExecutionResult, DispatchError>;

    /// Check if this executor has capacity for more tasks.
    fn has_capacity(&self) -> bool;

    /// Get current executor metrics.
    fn metrics(&self) -> ExecutorMetrics;

    /// Get the executor's name for logging/debugging.
    fn name(&self) -> &str;
}
```
When the scheduler determines a task is ready, it emits a TaskReadyEvent:
```rust
pub struct TaskReadyEvent {
    /// The pipeline execution this task belongs to
    pub pipeline_execution_id: UniversalUuid,
    /// Unique identifier for this task execution record
    pub task_execution_id: UniversalUuid,
    /// Full task namespace (e.g., "public::embedded::workflow::task_name")
    pub task_namespace: String,
    /// Current attempt number (1-based)
    pub attempt: i32,
    /// Maximum allowed attempts
    pub max_attempts: i32,
}
```
Note: The event does not include context data. Executors should load context from the database at execution time to ensure they have the latest state.
Here’s a template for implementing a custom executor:
```rust
use cloacina::dispatcher::{
    TaskExecutor, TaskReadyEvent, ExecutionResult, ExecutionStatus,
    ExecutorMetrics, DispatchError,
};
use std::sync::atomic::{AtomicU64, Ordering};

pub struct MyCustomExecutor {
    name: String,
    max_concurrent: usize,
    active_tasks: AtomicU64,
    total_executed: AtomicU64,
    total_failed: AtomicU64,
    // Your custom fields here (client connections, config, etc.)
}

impl MyCustomExecutor {
    pub fn new(name: &str, max_concurrent: usize) -> Self {
        Self {
            name: name.to_string(),
            max_concurrent,
            active_tasks: AtomicU64::new(0),
            total_executed: AtomicU64::new(0),
            total_failed: AtomicU64::new(0),
        }
    }
}

impl TaskExecutor for MyCustomExecutor {
    fn execute(&self, event: TaskReadyEvent) -> Result<ExecutionResult, DispatchError> {
        self.active_tasks.fetch_add(1, Ordering::SeqCst);

        // 1. Load context from database using event.pipeline_execution_id
        // 2. Resolve the task implementation from registry
        // 3. Execute the task in your custom environment
        // 4. Handle success/failure and update database
        // (`run_task` below is a placeholder for your backend-specific logic.)
        let result = match self.run_task(&event) {
            Ok(()) => {
                self.total_executed.fetch_add(1, Ordering::SeqCst);
                ExecutionResult {
                    task_execution_id: event.task_execution_id,
                    status: ExecutionStatus::Completed,
                    error_message: None,
                    should_retry: false,
                }
            }
            Err(e) => {
                self.total_failed.fetch_add(1, Ordering::SeqCst);
                let should_retry = event.attempt < event.max_attempts;
                ExecutionResult {
                    task_execution_id: event.task_execution_id,
                    status: if should_retry {
                        ExecutionStatus::Retry
                    } else {
                        ExecutionStatus::Failed
                    },
                    error_message: Some(e.to_string()),
                    should_retry,
                }
            }
        };

        self.active_tasks.fetch_sub(1, Ordering::SeqCst);
        Ok(result)
    }

    fn has_capacity(&self) -> bool {
        self.active_tasks.load(Ordering::SeqCst) < self.max_concurrent as u64
    }

    fn metrics(&self) -> ExecutorMetrics {
        ExecutorMetrics {
            active_tasks: self.active_tasks.load(Ordering::SeqCst),
            total_executed: self.total_executed.load(Ordering::SeqCst),
            total_failed: self.total_failed.load(Ordering::SeqCst),
        }
    }

    fn name(&self) -> &str {
        &self.name
    }
}
```
The dispatcher uses glob-pattern routing to direct tasks to executors:
```rust
use cloacina::dispatcher::{RoutingConfig, RoutingRule};

// Route ML tasks to the GPU executor, batch tasks to the K8s executor,
// and everything else to the default executor
let routing = RoutingConfig::new("default")
    .with_rule(RoutingRule::new("*::ml::*", "gpu"))
    .with_rule(RoutingRule::new("*::batch::*", "k8s"));
```
| Pattern | Matches |
|---|---|
| `*` | Single path segment |
| `**` | Multiple path segments |
| `exact` | Exact match |
Examples:
- `*::ml::*` matches `public::ml::train`, `tenant::ml::inference`
- `batch::**` matches `batch::jobs::daily`, `batch::jobs::hourly::cleanup`
- `public::embedded::my_workflow::*` matches all tasks in `my_workflow`
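The segment-wise matching above can be sketched as a small recursive function. This is an illustrative reimplementation, not Cloacina's actual Router; edge-case behavior (for example, whether `**` may match zero segments) may differ in the real engine.

```rust
/// Sketch of namespace glob matching: `*` matches exactly one
/// `::`-separated segment, `**` matches one or more segments, and any
/// other pattern segment must match its namespace segment exactly.
fn matches(pattern: &str, namespace: &str) -> bool {
    fn go(pat: &[&str], ns: &[&str]) -> bool {
        match (pat.split_first(), ns.split_first()) {
            // Both exhausted: the whole namespace matched.
            (None, None) => true,
            // `**` consumes one or more segments: try every split point.
            (Some((&"**", rest_pat)), Some(_)) => {
                (1..=ns.len()).any(|i| go(rest_pat, &ns[i..]))
            }
            // `*` consumes exactly one segment.
            (Some((&"*", rest_pat)), Some((_, rest_ns))) => go(rest_pat, rest_ns),
            // Literal segments must match exactly.
            (Some((&seg, rest_pat)), Some((&s, rest_ns))) => {
                seg == s && go(rest_pat, rest_ns)
            }
            // Pattern or namespace ran out early.
            _ => false,
        }
    }
    let pat: Vec<&str> = pattern.split("::").collect();
    let ns: Vec<&str> = namespace.split("::").collect();
    go(&pat, &ns)
}

fn main() {
    assert!(matches("*::ml::*", "public::ml::train"));
    assert!(matches("*::ml::*", "tenant::ml::inference"));
    assert!(matches("batch::**", "batch::jobs::hourly::cleanup"));
    assert!(!matches("*::ml::*", "public::batch::train"));
    println!("ok");
}
```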
Register executors with the dispatcher before starting the runner:
```rust
use cloacina::runner::DefaultRunner;
use cloacina::dispatcher::{RoutingConfig, RoutingRule};

// Create runner with custom routing
let runner = DefaultRunner::builder()
    .database_url("postgresql://localhost/cloacina")
    .routing_config(
        RoutingConfig::new("default")
            .with_rule(RoutingRule::new("*::gpu::*", "gpu"))
    )
    .build()
    .await?;

// The default ThreadTaskExecutor is registered automatically as "default".
// Register additional executors for custom routing targets.
```
The dispatch flow from ready task to recorded result:

- Scheduler evaluates task dependencies and trigger rules
- State Manager marks the task as Ready and emits a `TaskReadyEvent`
- Dispatcher receives the event and routes it via the `Router`
- Router matches the task namespace against its rules and returns an executor key
- Executor receives the event, executes the task, and returns an `ExecutionResult`
- Dispatcher processes the result and updates database state
```mermaid
sequenceDiagram
    participant S as Scheduler
    participant D as Dispatcher
    participant R as Router
    participant E as Executor
    participant DB as Database

    S->>DB: Mark task Ready
    S->>D: TaskReadyEvent
    D->>R: resolve_executor_key(namespace)
    R-->>D: "gpu"
    D->>E: execute(event)
    E->>DB: Load context
    E->>E: Run task
    E->>DB: Save context
    E-->>D: ExecutionResult
    D->>DB: Update task state
```
The dispatcher handles errors at multiple levels:
| Error Type | Handling |
|---|---|
| DispatchError::NoExecutor | No executor registered for routing target |
| DispatchError::NoCapacity | All executors at capacity (task stays Ready) |
| DispatchError::ExecutionFailed | Task execution failed (retry or fail based on policy) |
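A caller's reaction to each variant might look like the following sketch. The `DispatchError` enum and the `Action` type here are local stubs invented for illustration; cloacina's actual variants carry more data.

```rust
// Stub of the error variants described in the table above.
#[derive(Debug)]
enum DispatchError {
    NoExecutor(String),      // no executor registered for the routing target
    NoCapacity,              // all executors are saturated
    ExecutionFailed(String), // the task itself failed
}

// Hypothetical reactions a scheduler loop might take.
#[derive(Debug, PartialEq)]
enum Action {
    LogConfigError,     // misconfiguration: register the executor or fix routing
    RetryDispatchLater, // backpressure: task stays Ready, redispatch next tick
    ApplyRetryPolicy,   // retry or fail based on attempt count
}

fn on_dispatch_error(err: &DispatchError) -> Action {
    match err {
        DispatchError::NoExecutor(_) => Action::LogConfigError,
        DispatchError::NoCapacity => Action::RetryDispatchLater,
        DispatchError::ExecutionFailed(_) => Action::ApplyRetryPolicy,
    }
}

fn main() {
    assert_eq!(
        on_dispatch_error(&DispatchError::NoCapacity),
        Action::RetryDispatchLater
    );
    assert_eq!(
        on_dispatch_error(&DispatchError::NoExecutor("gpu".into())),
        Action::LogConfigError
    );
    println!("ok");
}
```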
- Idempotency: Design tasks to be idempotent since they may be retried
- Context Loading: Always load fresh context at execution time
- Metrics: Track active tasks and success/failure counts for observability
- Capacity: Implement `has_capacity()` accurately to prevent overload
- Timeouts: Implement execution timeouts in your executor
- Error Messages: Return descriptive error messages for debugging
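For the timeout point, one synchronous approach is to run the task on a worker thread and wait on a channel with a deadline. This is an illustrative sketch, not Cloacina API (an async executor would reach for something like `tokio::time::timeout` instead); note that the worker thread is not cancelled on timeout, so long-running tasks should also observe a deadline internally.

```rust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

/// Run `task` on a worker thread; fail if it does not finish in `timeout`.
fn run_with_timeout<F>(task: F, timeout: Duration) -> Result<(), String>
where
    F: FnOnce() -> Result<(), String> + Send + 'static,
{
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        // Ignore send errors: the receiver may have given up already.
        let _ = tx.send(task());
    });
    match rx.recv_timeout(timeout) {
        Ok(result) => result,
        // Timed out: the worker keeps running, but we report failure now.
        Err(_) => Err(format!("task timed out after {:?}", timeout)),
    }
}

fn main() {
    // A fast task completes within the deadline.
    assert!(run_with_timeout(|| Ok(()), Duration::from_millis(100)).is_ok());

    // A slow task is reported as timed out.
    let slow = run_with_timeout(
        || {
            thread::sleep(Duration::from_millis(200));
            Ok(())
        },
        Duration::from_millis(50),
    );
    assert!(slow.is_err());
    println!("ok");
}
```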
A K8s executor might:
- Create a Kubernetes Job spec from the task configuration
- Submit the job to the cluster
- Wait for completion (or timeout)
- Retrieve logs and results
- Return an appropriate `ExecutionResult`
```rust
impl TaskExecutor for K8sExecutor {
    fn execute(&self, event: TaskReadyEvent) -> Result<ExecutionResult, DispatchError> {
        // `execute` is synchronous, so bridge into the async Kubernetes
        // client via a runtime handle held by the executor (sketch).
        self.runtime.block_on(async {
            // Create job spec from the task configuration
            let job = self.create_job_spec(&event)?;

            // Submit to the cluster
            let job_name = self.k8s_client.create_job(job).await?;

            // Wait for completion (or timeout)
            match self.wait_for_job(&job_name, self.timeout).await {
                Ok(()) => Ok(ExecutionResult::success(event.task_execution_id)),
                Err(e) => Ok(ExecutionResult::failed(
                    event.task_execution_id,
                    e.to_string(),
                    event.attempt < event.max_attempts,
                )),
            }
        })
    }
    // ...
}
```
- Task Execution Sequence - Detailed task lifecycle
- Guaranteed Execution Architecture - Reliability guarantees
- Performance Characteristics - Tuning executors