Guaranteed Cron Scheduling
Traditional cron systems suffer from a fundamental reliability problem: they’re fire-and-forget. When a cron daemon executes a scheduled job, it simply spawns a process and moves on. If that process fails to start, crashes during execution, or gets lost due to system failures, there’s no mechanism to detect or recover from these failures. This creates gaps in scheduled execution that can be particularly problematic for critical business processes.
What “Guaranteed Execution” Means
In Cloacina’s context, guaranteed execution means:
- Every scheduled execution leaves a durable audit trail before handoff
- Failed handoffs can be detected and recovered automatically
- No executions are silently lost due to system failures
- Complete observability into the scheduling and execution pipeline
- Atomic claiming prevents duplicate executions across multiple instances
The system doesn’t guarantee that workflows will complete successfully (business logic can still fail), but it guarantees that scheduled executions won’t be lost in the handoff process between the scheduler and executor.
Cloacina implements a two-phase commit pattern that creates execution intent records before handing off to the pipeline executor. This pattern ensures durability and enables recovery.
sequenceDiagram
participant CS as CronScheduler
participant DB as Database
participant PE as PipelineExecutor
Note over CS,PE: Two-Phase Commit Pattern
CS->>DB: 1. Create audit record (Phase 1)
Note over DB: Execution intent recorded<br/>pipeline_execution_id = NULL
CS->>PE: 2. Execute workflow
alt Successful Handoff
PE-->>CS: Return pipeline_execution_id
CS->>DB: 3. Complete audit trail (Phase 2)
Note over DB: Link pipeline_execution_id<br/>Execution guaranteed
else Handoff Failure
Note over CS: Audit record remains incomplete<br/>Recovery will detect this
end
Before any workflow execution begins, the system creates a durable audit record:
// From CronScheduler::process_schedule()
let audit_record_id = match self.create_execution_audit(schedule.id, scheduled_time) {
    Ok(id) => id,
    Err(e) => {
        error!("Failed to create execution audit for schedule {} at {}: {}",
               schedule.id, scheduled_time, e);
        continue; // This execution is lost, but we continue with others
    }
};
The audit record creation uses the NewCronExecution::new() pattern:
pub fn new(schedule_id: UniversalUuid, scheduled_time: UniversalTimestamp) -> Self {
    Self {
        id: Some(UniversalUuid::new_v4()),
        schedule_id,
        pipeline_execution_id: None, // Will be set after successful handoff
        scheduled_time,
        claimed_at: None, // Will be set by DAL layer (database-specific)
        created_at: None, // Will be set by DAL layer (database-specific)
        updated_at: None, // Will be set by DAL layer (database-specific)
    }
}
This creates a record with execution intent but no pipeline execution ID, establishing a “claim” on the scheduled time.
After successful handoff to the pipeline executor, the audit record is completed:
// From CronScheduler::process_schedule()
match self.execute_workflow(schedule, scheduled_time).await {
    Ok(pipeline_execution_id) => {
        // Complete audit trail linking
        if let Err(e) = self.complete_execution_audit(audit_record_id, pipeline_execution_id) {
            error!("Failed to complete audit trail for schedule {} execution: {}",
                   schedule.id, e);
            // Continue - the execution succeeded, just audit completion failed
        }
    }
    Err(e) => {
        error!("Failed to execute workflow {} for schedule {}: {}",
               schedule.workflow_name, schedule.id, e);
        // Note: Audit record exists without pipeline_execution_id
        // Recovery service will detect this as a "lost" execution
    }
}
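The two phases can be modeled without a database. The following is a minimal sketch, assuming a hypothetical in-memory `AuditStore` in place of the `cron_executions` table and a closure in place of the pipeline executor; none of these names come from Cloacina itself.

```rust
use std::collections::HashMap;

/// Hypothetical in-memory stand-in for the `cron_executions` table.
#[derive(Default)]
struct AuditStore {
    next_id: u64,
    // audit_record_id -> pipeline_execution_id (None until Phase 2 completes)
    records: HashMap<u64, Option<u64>>,
}

impl AuditStore {
    /// Phase 1: record execution intent before any handoff.
    fn create_execution_audit(&mut self) -> u64 {
        self.next_id += 1;
        self.records.insert(self.next_id, None);
        self.next_id
    }

    /// Phase 2: link the audit record to the pipeline execution.
    fn complete_execution_audit(&mut self, audit_id: u64, pipeline_id: u64) {
        self.records.insert(audit_id, Some(pipeline_id));
    }

    /// Records still waiting for Phase 2; these are the recovery candidates.
    fn incomplete(&self) -> Vec<u64> {
        let mut ids: Vec<u64> = self
            .records
            .iter()
            .filter(|(_, pipeline_id)| pipeline_id.is_none())
            .map(|(id, _)| *id)
            .collect();
        ids.sort();
        ids
    }
}

/// Runs one scheduled execution; `handoff` stands in for the pipeline executor.
fn run_once<F>(store: &mut AuditStore, handoff: F) -> u64
where
    F: FnOnce() -> Result<u64, String>,
{
    let audit_id = store.create_execution_audit();
    if let Ok(pipeline_id) = handoff() {
        store.complete_execution_audit(audit_id, pipeline_id);
    }
    // On Err the record is deliberately left incomplete for recovery to find.
    audit_id
}
```

Calling `run_once` with a failing closure leaves exactly one entry in `incomplete()`, which is the state the recovery service looks for.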
The audit trail serves multiple critical functions:
- Duplicate Prevention: The UNIQUE constraint on (schedule_id, scheduled_time) prevents multiple executions for the same schedule at the same time
- Recovery Detection: Records with no pipeline_execution_id after a threshold indicate lost executions
- Observability: Complete history of what was scheduled versus what actually executed
- Performance Monitoring: Timing data for execution delays and patterns
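The duplicate-prevention role of the UNIQUE constraint can be illustrated with a set keyed on the same pair. This is a sketch only: `ClaimTable` and `try_claim` are hypothetical names, and a real deployment relies on the database constraint rather than process memory.

```rust
use std::collections::HashSet;

/// Hypothetical stand-in for the UNIQUE(schedule_id, scheduled_time) constraint.
#[derive(Default)]
struct ClaimTable {
    // (schedule_id, scheduled_time as Unix seconds)
    claimed: HashSet<(u32, i64)>,
}

impl ClaimTable {
    /// Returns true if this caller won the slot, false if it was already taken.
    /// `HashSet::insert` reports whether the value was newly inserted, which
    /// mirrors an INSERT that either succeeds or violates the constraint.
    fn try_claim(&mut self, schedule_id: u32, scheduled_time: i64) -> bool {
        self.claimed.insert((schedule_id, scheduled_time))
    }
}
```

A second `try_claim` for the same schedule and time returns `false`, while a different time or a different schedule is accepted.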
The database schema supports the guaranteed execution pattern while remaining compatible with both PostgreSQL and SQLite.
PostgreSQL Implementation:
CREATE TABLE cron_executions (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    schedule_id UUID NOT NULL REFERENCES cron_schedules(id) ON DELETE CASCADE,
    pipeline_execution_id UUID REFERENCES pipeline_executions(id) ON DELETE CASCADE,
    scheduled_time TIMESTAMP NOT NULL,
    claimed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL,
    -- Prevent duplicate executions for the same schedule at the same time
    UNIQUE(schedule_id, scheduled_time)
);
SQLite Implementation:
CREATE TABLE cron_executions (
    id BLOB PRIMARY KEY NOT NULL,
    schedule_id BLOB NOT NULL REFERENCES cron_schedules(id) ON DELETE CASCADE,
    pipeline_execution_id BLOB REFERENCES pipeline_executions(id) ON DELETE CASCADE,
    scheduled_time TEXT NOT NULL,
    claimed_at TEXT NOT NULL,
    created_at TEXT NOT NULL,
    updated_at TEXT NOT NULL,
    -- Prevent duplicate executions for the same schedule at the same time
    UNIQUE(schedule_id, scheduled_time)
);
Cloacina uses a UniversalTypes system to handle differences between PostgreSQL and SQLite:
// From models/cron_execution.rs
pub struct CronExecution {
    pub id: UniversalUuid,
    pub schedule_id: UniversalUuid,
    pub pipeline_execution_id: Option<UniversalUuid>,
    pub scheduled_time: UniversalTimestamp,
    pub claimed_at: UniversalTimestamp,
    // ...
}
This abstraction allows the same business logic to work across both databases while handling their different type systems (PostgreSQL’s native UUID vs SQLite’s BLOB storage).
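To make the idea concrete, here is a hedged sketch of a wrapper type that can be rendered either as 16 raw bytes (suitable for a SQLite BLOB column) or as the hyphenated text form PostgreSQL displays for its native UUID type. `SketchUuid` and its methods are hypothetical; the real `UniversalUuid` may be built quite differently.

```rust
use std::convert::TryFrom;

/// Hypothetical wrapper; not Cloacina's actual `UniversalUuid`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct SketchUuid([u8; 16]);

impl SketchUuid {
    /// SQLite side: the raw 16 bytes stored in a BLOB column.
    fn to_blob(self) -> Vec<u8> {
        self.0.to_vec()
    }

    /// Rebuilds the value from a BLOB, rejecting anything that is not 16 bytes.
    fn from_blob(bytes: &[u8]) -> Option<Self> {
        let array = <[u8; 16]>::try_from(bytes).ok()?;
        Some(SketchUuid(array))
    }

    /// PostgreSQL side: the canonical 8-4-4-4-12 hyphenated text form.
    fn to_hyphenated(self) -> String {
        let hex: String = self.0.iter().map(|b| format!("{:02x}", b)).collect();
        format!(
            "{}-{}-{}-{}-{}",
            &hex[0..8],
            &hex[8..12],
            &hex[12..16],
            &hex[16..20],
            &hex[20..32]
        )
    }
}
```

Business logic that only handles `SketchUuid` values never needs to know which representation the backend uses; the conversion happens at the storage boundary.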
The system uses targeted indexes for efficient queries:
-- Index for efficient lookups by schedule
CREATE INDEX idx_cron_executions_schedule
    ON cron_executions (schedule_id, scheduled_time DESC);
-- Index for pipeline execution correlation (only for non-null values)
CREATE INDEX idx_cron_executions_pipeline
    ON cron_executions (pipeline_execution_id) WHERE pipeline_execution_id IS NOT NULL;
-- Index for time-based queries
CREATE INDEX idx_cron_executions_claimed_at
    ON cron_executions (claimed_at DESC);
These indexes support the most common query patterns:
- Finding executions by schedule for history queries
- Correlating pipeline executions back to their cron origins
- Time-based queries for recovery and monitoring
The recovery system implements the detection and retry side of the guaranteed execution pattern.
The recovery service identifies lost executions using a LEFT JOIN query:
flowchart TD
A[Recovery Service Poll] --> B[Query Database]
B --> C{LEFT JOIN Query}
C --> D[cron_executions table]
C --> E[pipeline_executions table]
D --> F{Audit Record EXISTS?}
E --> G{Pipeline Execution EXISTS?}
F -->|Yes| H[Has execution intent]
G -->|No| I[No pipeline execution]
H --> J{Claimed > threshold?}
I --> J
J -->|Yes| K[Lost Execution Detected]
J -->|No| L[Still in progress]
K --> M[Attempt Recovery]
L --> N[Continue monitoring]
// From postgres_dal/cron_execution.rs
pub fn find_lost_executions(
    &self,
    older_than_minutes: i32,
) -> Result<Vec<CronExecution>, ValidationError> {
    let cutoff_time = UniversalTimestamp(
        Utc::now() - chrono::Duration::minutes(older_than_minutes as i64)
    );
    // Find cron executions that don't have corresponding pipeline executions
    // and were claimed before the cutoff time.
    // (`conn` is the DAL's database connection; acquiring it is omitted from this excerpt.)
    let lost_executions = cron_executions::table
        .left_join(
            pipeline_executions::table.on(
                cron_executions::pipeline_execution_id
                    .eq(pipeline_executions::id.nullable())
            )
        )
        .filter(pipeline_executions::id.is_null())
        .filter(cron_executions::claimed_at.lt(cutoff_time))
        .select(cron_executions::all_columns)
        .load(&mut conn)?;
    Ok(lost_executions)
}
This query finds executions that have audit records but no corresponding pipeline executions, indicating they were claimed but never successfully handed off.
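The same filter can be expressed over plain in-memory rows, which may help when reasoning about the threshold. This is a simplified sketch with hypothetical types (`AuditRow`, `find_lost`) and Unix-second timestamps; it assumes an unlinked row is one whose `pipeline_execution_id` is `None`.

```rust
/// Hypothetical, simplified audit row; timestamps are Unix seconds.
#[derive(Debug, Clone, PartialEq)]
struct AuditRow {
    id: u32,
    pipeline_execution_id: Option<u32>,
    claimed_at: i64,
}

/// Returns rows with no linked pipeline execution that were claimed
/// before `now - older_than_minutes`.
fn find_lost(rows: &[AuditRow], now: i64, older_than_minutes: i64) -> Vec<AuditRow> {
    let cutoff = now - older_than_minutes * 60;
    rows.iter()
        .filter(|row| row.pipeline_execution_id.is_none())
        .filter(|row| row.claimed_at < cutoff)
        .cloned()
        .collect()
}
```

A row claimed one minute ago with no pipeline execution is not reported with a five-minute threshold, because the handoff may still be in progress.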
The recovery service bounds each recovery by execution age and by attempt count:
// From cron_recovery.rs
async fn recover_execution(&self, execution: &CronExecution) -> Result<(), PipelineError> {
    let execution_age = Utc::now() - execution.scheduled_time();
    // Check if execution is too old to recover
    if execution_age > chrono::Duration::from_std(self.config.max_recovery_age).unwrap() {
        warn!("Execution {} is too old to recover (age: {:?}), abandoning",
              execution.id, execution_age);
        return Ok(());
    }
    // Check recovery attempts
    let mut attempts = self.recovery_attempts.lock().await;
    let attempt_count = attempts.entry(execution.id).or_insert(0);
    *attempt_count += 1;
    if *attempt_count > self.config.max_recovery_attempts {
        error!("Execution {} has exceeded max recovery attempts ({}), abandoning",
               execution.id, self.config.max_recovery_attempts);
        return Ok(());
    }
    // ... recovery logic continues
}
Recovery policies include:
- Age limits: Executions older than 24 hours (configurable) are abandoned
- Retry limits: Maximum 3 recovery attempts per execution
- Schedule validation: Only recover for schedules that still exist and are enabled
- Context preservation: Recovery executions include metadata about being recoveries
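The policies above amount to a small decision function. The sketch below is one way to express it, assuming hypothetical names (`RecoveryPolicy`, `RecoveryDecision`, `decide`) and passing the schedule state as `Some(enabled)` or `None` when the schedule no longer exists; the order of checks is an assumption.

```rust
use std::time::Duration;

#[derive(Debug, PartialEq)]
enum RecoveryDecision {
    Recover,
    AbandonTooOld,
    AbandonMaxAttempts,
    SkipScheduleUnavailable,
}

/// Hypothetical policy settings mirroring the limits described above.
struct RecoveryPolicy {
    max_recovery_age: Duration,
    max_recovery_attempts: u32,
}

impl RecoveryPolicy {
    /// `attempts_so_far` counts earlier recovery attempts for this execution.
    fn decide(
        &self,
        age: Duration,
        attempts_so_far: u32,
        schedule_enabled: Option<bool>,
    ) -> RecoveryDecision {
        if age > self.max_recovery_age {
            return RecoveryDecision::AbandonTooOld;
        }
        if attempts_so_far >= self.max_recovery_attempts {
            return RecoveryDecision::AbandonMaxAttempts;
        }
        match schedule_enabled {
            Some(true) => RecoveryDecision::Recover,
            // Disabled (Some(false)) or deleted (None) schedules are not recovered.
            _ => RecoveryDecision::SkipScheduleUnavailable,
        }
    }
}
```

With a 24-hour age limit and 3 attempts, an execution that is 10 minutes old with no earlier attempts is recovered, while one with 3 earlier attempts is abandoned.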
The system provides comprehensive statistics for monitoring:
// From postgres_dal/cron_execution.rs
pub struct CronExecutionStats {
    /// Total number of executions attempted
    pub total_executions: i64,
    /// Number of executions that successfully handed off to pipeline executor
    pub successful_executions: i64,
    /// Number of executions that were lost (claimed but never executed)
    pub lost_executions: i64,
    /// Success rate as a percentage
    pub success_rate: f64,
}
These statistics enable monitoring dashboards and alerting systems to track the health of the cron scheduling system.
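How the success rate is derived is not shown in the excerpt. A plausible sketch, assuming it is simply successful handoffs divided by total executions and that an empty window reports 0%, looks like this (`SketchStats` and `compute_stats` are hypothetical names):

```rust
/// Mirrors the fields shown above; the derivation itself is an assumption.
#[derive(Debug, PartialEq)]
struct SketchStats {
    total_executions: i64,
    successful_executions: i64,
    lost_executions: i64,
    success_rate: f64,
}

/// Builds stats from raw counts. An empty window reports a 0% success rate
/// rather than dividing by zero.
fn compute_stats(total: i64, successful: i64, lost: i64) -> SketchStats {
    let success_rate = if total > 0 {
        successful as f64 / total as f64 * 100.0
    } else {
        0.0
    };
    SketchStats {
        total_executions: total,
        successful_executions: successful,
        lost_executions: lost,
        success_rate,
    }
}
```

An alerting rule could then compare `success_rate` or `lost_executions` against a threshold chosen by the operator.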
The complete execution flow ties these components together:
flowchart TB
subgraph "1\. Schedule Discovery"
A[Poll for Due Schedules] --> B[Check next_run_at <= now]
B --> C[Atomic Claim & Update]
end
subgraph "2\. Execution Planning"
C --> D[Calculate Execution Times]
D --> E{Catchup Policy?}
E -->|Skip| F[Single Execution]
E -->|RunAll| G[Multiple Executions]
end
subgraph "3\. Two-Phase Commit"
F --> H[Create Audit Record]
G --> H
H --> I[Execute Workflow]
I --> J{Handoff Success?}
J -->|Yes| K[Complete Audit Trail]
J -->|No| L[Audit Record Orphaned]
end
subgraph "4\. Recovery System"
M[Recovery Service] --> N[Find Orphaned Audits]
L -.-> N
N --> O[Retry Execution]
O --> P{Recovery Success?}
P -->|Yes| Q[Update Audit Trail]
P -->|No| R[Retry Later]
end
subgraph "5\. Monitoring"
S[Statistics Collection] --> T[Track Success Rate]
K --> S
Q --> S
R --> S
end
style H fill:#e1f5fe
style L fill:#ffebee
style N fill:#f3e5f5
- Schedule Discovery
  let due_schedules = self.dal.cron_schedule().get_due_schedules(now)?;
- Atomic Claiming
  let claimed = self.dal.cron_schedule()
      .claim_and_update(schedule.id, now, now, next_run)?;
  if !claimed {
      debug!("Schedule {} was already claimed by another instance", schedule.id);
      return Ok(());
  }
- Execution Time Calculation (based on catchup policy)
  let execution_times = self.calculate_execution_times(schedule, now)?;
- Audit Record Creation (Phase 1 of two-phase commit)
  let audit_record_id = self.create_execution_audit(schedule.id, scheduled_time)?;
- Pipeline Handoff
  let pipeline_execution_id = self.execute_workflow(schedule, scheduled_time).await?;
- Audit Completion (Phase 2 of two-phase commit)
  self.complete_execution_audit(audit_record_id, pipeline_execution_id)?;
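The execution time calculation step depends on the catchup policy. The sketch below illustrates the difference between the two policies under loud assumptions: a fixed `interval_secs` stands in for real cron expression evaluation, `Skip` is assumed to run once at the current time, and the function signature is hypothetical rather than Cloacina's own.

```rust
#[derive(Debug, Clone, Copy, PartialEq)]
enum CatchupPolicy {
    Skip,
    RunAll,
}

/// Returns the scheduled times (Unix seconds) to execute, given that the
/// schedule was due at `next_run_at` and the scheduler woke up at `now`.
/// A fixed `interval_secs` stands in for real cron expression evaluation.
fn calculate_execution_times(
    policy: CatchupPolicy,
    next_run_at: i64,
    now: i64,
    interval_secs: i64,
) -> Vec<i64> {
    if next_run_at > now || interval_secs <= 0 {
        return Vec::new(); // not due yet, or an invalid interval
    }
    match policy {
        // Skip: collapse any missed runs into a single execution (assumed at `now`).
        CatchupPolicy::Skip => vec![now],
        // RunAll: one execution per missed slot, oldest first.
        CatchupPolicy::RunAll => {
            let mut times = Vec::new();
            let mut t = next_run_at;
            while t <= now {
                times.push(t);
                t += interval_secs;
            }
            times
        }
    }
}
```

Each returned time would then go through its own audit record creation, handoff, and audit completion.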
Each failure mode is handled so that a single failure does not stop the scheduler:
- Schedule claiming failures: Continue with other schedules
- Audit record creation failures: Skip this execution but continue
- Pipeline handoff failures: Leave audit record for recovery detection
- Audit completion failures: Log error but don’t fail the execution
The cron scheduler plugs directly into Cloacina’s existing workflow execution infrastructure:
// From default_runner.rs - CronScheduler creation
let cron_scheduler = CronScheduler::new(
    Arc::new(dal),
    Arc::new(self.clone()), // DefaultRunner implements PipelineExecutor!
    cron_config,
    cron_shutdown_rx,
);
This design allows the cron scheduler to leverage all of Cloacina’s existing capabilities:
- Task retry logic
- Dependency resolution
- Context management
- Multi-tenancy support
- Performance monitoring
The recovery system is a critical component that ensures no executions are permanently lost due to system failures. It operates as an independent service within the cron scheduling subsystem.
The recovery service runs on a separate interval from the main scheduler:
// From cron_recovery.rs
impl CronRecovery {
    pub async fn run(&self) -> Result<(), PipelineError> {
        let mut interval = tokio::time::interval(self.config.recovery_interval);
        loop {
            tokio::select! {
                _ = interval.tick() => {
                    if let Err(e) = self.recover_lost_executions().await {
                        error!("Recovery cycle failed: {}", e);
                    }
                }
                _ = self.shutdown_rx.recv() => {
                    info!("Recovery service shutting down");
                    break;
                }
            }
        }
        Ok(())
    }
}
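The shape of this loop (run a cycle on each tick, stop on a shutdown signal, and never let a failed cycle end the loop) can be reproduced with only the standard library. The sketch below swaps tokio for a blocking `std::sync::mpsc` channel with `recv_timeout`, so it is an analogy rather than the actual implementation; `run_recovery_loop` is a hypothetical name. Unlike tokio's interval, whose first tick completes immediately, this version waits one full interval before the first cycle.

```rust
use std::sync::mpsc::{Receiver, RecvTimeoutError};
use std::time::Duration;

/// Runs `cycle` once per `interval` until a shutdown message arrives or the
/// sender is dropped. Returns the number of cycles that ran.
fn run_recovery_loop<F>(interval: Duration, shutdown_rx: &Receiver<()>, mut cycle: F) -> u32
where
    F: FnMut() -> Result<(), String>,
{
    let mut cycles = 0;
    loop {
        match shutdown_rx.recv_timeout(interval) {
            // No shutdown signal within the interval: run one recovery cycle.
            Err(RecvTimeoutError::Timeout) => {
                cycles += 1;
                if let Err(e) = cycle() {
                    // A failed cycle is logged, not fatal; the loop keeps going.
                    eprintln!("Recovery cycle failed: {}", e);
                }
            }
            // Explicit shutdown, or every sender was dropped.
            Ok(()) | Err(RecvTimeoutError::Disconnected) => break,
        }
    }
    cycles
}
```

Sending `()` on the channel from any thread ends the loop at the next wait, which plays the role of `shutdown_rx.recv()` in the `tokio::select!` version.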
The recovery process checks each lost execution against age, retry, and schedule criteria before retrying it:
sequenceDiagram
participant RS as Recovery Service
participant DB as Database
participant CS as CronScheduler
participant PE as PipelineExecutor
Note over RS,PE: Recovery Process Flow
loop Every recovery_interval
RS->>DB: Find lost executions
Note over DB: LEFT JOIN cron_executions<br/>with pipeline_executions
DB-->>RS: Return orphaned audit records
loop For each lost execution
RS->>RS: Validate execution age
RS->>RS: Check retry attempts
RS->>DB: Verify schedule still exists
alt Recovery Criteria Met
RS->>PE: Retry workflow execution
alt Retry Success
PE-->>RS: Return new pipeline_execution_id
RS->>DB: Update audit record
Note over DB: Link to new pipeline execution
else Retry Fails
RS->>RS: Increment retry count
Note over RS: Will retry later if<br/>under attempt limit
end
else Too Old or Max Retries
RS->>RS: Abandon execution
Note over RS: Log abandonment
end
end
end
async fn recover_lost_executions(&self) -> Result<(), PipelineError> {
    let threshold_minutes = self.config.lost_threshold_minutes;
    let lost_executions = self.dal
        .cron_execution()
        .find_lost_executions(threshold_minutes)?;
    info!("Found {} potentially lost executions", lost_executions.len());
    for execution in lost_executions {
        if let Err(e) = self.recover_execution(&execution).await {
            error!("Failed to recover execution {}: {}", execution.id, e);
        }
    }
    Ok(())
}
The detection uses a LEFT JOIN to find audit records without corresponding pipeline executions:
- Time-based filtering: Only consider executions older than the threshold (default: 5 minutes)
- Null check: Find cron_executions where pipeline_execution_id is NULL
- Age verification: Ensure enough time has passed to confidently declare them “lost”
When recovering a lost execution, the system preserves the original schedule context while adding recovery metadata:
async fn recover_execution(&self, execution: &CronExecution) -> Result<(), PipelineError> {
    // Validate the schedule still exists and is enabled
    let schedule = match self.dal.cron_schedule().get_by_id(execution.schedule_id)? {
        Some(s) if s.enabled => s,
        Some(_) => {
            debug!("Schedule {} is disabled, skipping recovery", execution.schedule_id);
            return Ok(());
        }
        None => {
            warn!("Schedule {} no longer exists, skipping recovery", execution.schedule_id);
            return Ok(());
        }
    };
    // Create recovery context with additional metadata
    let mut context = Context::new();
    context.insert("is_recovery", json!(true))?;
    context.insert("original_scheduled_time", json!(execution.scheduled_time()))?;
    context.insert("recovery_attempt_time", json!(Utc::now()))?;
    context.insert("execution_id", json!(execution.id))?;
    // Execute the workflow through the pipeline
    match self.executor.execute_workflow(&schedule.workflow_name, context).await {
        Ok(pipeline_execution_id) => {
            // Link the recovered execution to the pipeline execution
            self.dal.cron_execution().update_pipeline_execution_id(
                execution.id,
                pipeline_execution_id,
            )?;
            info!("Successfully recovered execution {} for schedule {}",
                  execution.id, execution.schedule_id);
        }
        Err(e) => {
            error!("Recovery execution failed for {}: {}", execution.id, e);
            return Err(e);
        }
    }
    Ok(())
}
The recovery system implements several policies to prevent runaway recovery attempts:
Age Limits:
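let execution_age = Utc::now() - execution.scheduled_time();
// max_recovery_age is a std Duration, so convert it before comparing with a chrono Duration
if execution_age > chrono::Duration::from_std(self.config.max_recovery_age).unwrap() {
    warn!("Execution {} is too old to recover, abandoning", execution.id);
    return Ok(());
}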
Retry Limits:
let mut attempts = self.recovery_attempts.lock().await;
let attempt_count = attempts.entry(execution.id).or_insert(0);
*attempt_count += 1;
if *attempt_count > self.config.max_recovery_attempts {
    error!("Execution {} has exceeded max recovery attempts, abandoning", execution.id);
    return Ok(());
}
Schedule Validation:
- Only recover executions for schedules that still exist
- Skip recovery for disabled schedules
- Validate workflow names still exist in the system
The recovery system provides detailed metrics for operational monitoring:
pub struct RecoveryStats {
    pub total_recovery_attempts: u64,
    pub successful_recoveries: u64,
    pub failed_recoveries: u64,
    pub abandoned_recoveries: u64,
    pub recovery_success_rate: f64,
}
These metrics enable operators to:
- Track recovery effectiveness
- Identify patterns in lost executions
- Tune recovery parameters for optimal performance
- Set up alerting for high recovery rates (indicating system issues)
Several key design decisions make this architecture particularly robust:
The separation between scheduling (CronScheduler) and execution (PipelineExecutor) follows the saga pattern, where each service has clear responsibilities and compensation actions. This prevents the complexity that would arise from trying to handle both concerns in a single component.
By creating audit records in the database before any execution, the system ensures that no execution intent is lost even in the face of process crashes or network failures. This is more reliable than in-memory coordination mechanisms.
The claim_and_update operation atomically updates both the last_run_at and next_run_at timestamps, preventing race conditions between multiple scheduler instances while providing clear semantics for when a schedule was last processed.
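The claim semantics can be sketched as a check-and-advance performed under a single lock. In the example below a `Mutex` around a map stands in for the database's atomic update; `ScheduleTable`, `ScheduleState`, and the three-argument `claim_and_update` are hypothetical and simpler than the real DAL method.

```rust
use std::collections::HashMap;
use std::sync::Mutex;

/// Hypothetical schedule state; timestamps are Unix seconds.
#[derive(Debug, Clone, Copy, PartialEq)]
struct ScheduleState {
    last_run_at: Option<i64>,
    next_run_at: i64,
}

struct ScheduleTable {
    rows: Mutex<HashMap<u32, ScheduleState>>,
}

impl ScheduleTable {
    /// Claims the schedule only if it is still due at `now`, then advances
    /// both timestamps under one lock so no caller observes a half-update.
    fn claim_and_update(&self, id: u32, now: i64, next_run: i64) -> bool {
        let mut rows = self.rows.lock().unwrap();
        if let Some(state) = rows.get_mut(&id) {
            if state.next_run_at <= now {
                state.last_run_at = Some(now);
                state.next_run_at = next_run;
                return true;
            }
        }
        // Unknown schedule, or another instance already advanced it.
        false
    }
}
```

Two scheduler instances racing on the same schedule both call `claim_and_update`; the first advances `next_run_at` past `now`, so the second sees a schedule that is no longer due and gets `false`.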
The recovery service runs independently of the main scheduler, providing resilience even if the scheduler itself fails. This separation of concerns allows for different retry policies and prevents recovery logic from complicating the main scheduling loop.
Timestamps are handled per backend (PostgreSQL uses database defaults, SQLite uses application-level timestamps), which keeps behavior consistent across backends while using each database’s strengths.
This architecture applies established distributed systems patterns to the fundamental reliability problems of cron-based scheduling: execution intent is recorded durably, lost handoffs are detectable, and recovery is automatic, all without adding much operational complexity.