Guaranteed Cron Scheduling
Traditional cron systems suffer from a fundamental reliability problem: they’re fire-and-forget. When a cron daemon executes a scheduled job, it simply spawns a process and moves on. If that process fails to start, crashes during execution, or gets lost due to system failures, there’s no mechanism to detect or recover from these failures. This creates gaps in scheduled execution that can be particularly problematic for critical business processes.
What “Guaranteed Execution” Means
In Cloacina’s context, guaranteed execution means:
- Every scheduled execution leaves a durable audit trail before handoff
- Failed handoffs can be detected and recovered automatically
- No executions are silently lost due to system failures
- Complete observability into the scheduling and execution pipeline
- Atomic claiming prevents duplicate executions across multiple instances
The system doesn’t guarantee that workflows will complete successfully (business logic can still fail), but it guarantees that scheduled executions won’t be lost in the handoff process between the scheduler and executor.
Cloacina implements a two-phase commit pattern that creates execution intent records before handing off to the pipeline executor. This pattern ensures durability and enables recovery.
sequenceDiagram
    participant CS as CronScheduler
    participant DB as Database
    participant PE as PipelineExecutor
    Note over CS,PE: Two-Phase Commit Pattern
    CS->>DB: 1. Create audit record (Phase 1)
    Note over DB: Execution intent recorded<br/>pipeline_execution_id = NULL
    CS->>PE: 2. Execute workflow
    alt Successful Handoff
        PE-->>CS: Return pipeline_execution_id
        CS->>DB: 3. Complete audit trail (Phase 2)
        Note over DB: Link pipeline_execution_id<br/>Execution guaranteed
    else Handoff Failure
        Note over CS: Audit record remains incomplete<br/>Recovery will detect this
    end
Before any workflow execution begins, the system creates a durable audit record:
// From CronScheduler::process_schedule()
let audit_record_id = match self.create_execution_audit(schedule.id, scheduled_time) {
Ok(id) => id,
Err(e) => {
error!("Failed to create execution audit for schedule {} at {}: {}",
schedule.id, scheduled_time, e);
continue; // This execution is lost, but we continue with others
}
};
The audit record is built with the NewCronExecution::new() constructor:
pub fn new(schedule_id: UniversalUuid, scheduled_time: UniversalTimestamp) -> Self {
Self {
id: Some(UniversalUuid::new_v4()),
schedule_id,
pipeline_execution_id: None, // Will be set after successful handoff
scheduled_time,
claimed_at: None, // Will be set by DAL layer (database-specific)
created_at: None, // Will be set by DAL layer (database-specific)
updated_at: None, // Will be set by DAL layer (database-specific)
}
}
This creates a record with execution intent but no pipeline execution ID, establishing a “claim” on the scheduled time.
After successful handoff to the pipeline executor, the audit record is completed:
// From CronScheduler::process_schedule()
match self.execute_workflow(schedule, scheduled_time).await {
Ok(pipeline_execution_id) => {
// Complete audit trail linking
if let Err(e) = self.complete_execution_audit(audit_record_id, pipeline_execution_id) {
error!("Failed to complete audit trail for schedule {} execution: {}",
schedule.id, e);
// Continue - the execution succeeded, just audit completion failed
}
}
Err(e) => {
error!("Failed to execute workflow {} for schedule {}: {}",
schedule.workflow_name, schedule.id, e);
// Note: Audit record exists without pipeline_execution_id
// Recovery service will detect this as a "lost" execution
}
}
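The two phases can be modeled without a database. The following is a minimal, std-only sketch under simplifying assumptions. AuditStore and run_once are hypothetical stand-ins for the cron_executions table and process_schedule(), u64 values replace UUIDs, and the handoff closure plays the role of the executor call. It illustrates the property that recovery relies on: a failed handoff leaves behind a record with no pipeline execution id.

```rust
use std::collections::HashMap;

/// Hypothetical in-memory stand-in for cron_executions.
/// Key: audit record id; value: linked pipeline execution id (None = handoff unconfirmed).
#[derive(Default)]
pub struct AuditStore {
    next_id: u64,
    records: HashMap<u64, Option<u64>>,
}

impl AuditStore {
    /// Phase 1: record execution intent before any handoff happens.
    pub fn create_intent(&mut self) -> u64 {
        self.next_id += 1;
        self.records.insert(self.next_id, None);
        self.next_id
    }

    /// Phase 2: link the audit record to the pipeline execution it produced.
    pub fn complete(&mut self, audit_id: u64, pipeline_id: u64) {
        if let Some(slot) = self.records.get_mut(&audit_id) {
            *slot = Some(pipeline_id);
        }
    }

    /// Records still lacking a pipeline execution id (what recovery would look for).
    pub fn incomplete(&self) -> Vec<u64> {
        let mut ids: Vec<u64> = self
            .records
            .iter()
            .filter(|(_, p)| p.is_none())
            .map(|(id, _)| *id)
            .collect();
        ids.sort();
        ids
    }
}

/// Run one scheduled execution through both phases.
/// `handoff` is a hypothetical stand-in for the executor call.
pub fn run_once<F>(store: &mut AuditStore, handoff: F) -> Result<u64, String>
where
    F: FnOnce() -> Result<u64, String>,
{
    let audit_id = store.create_intent(); // Phase 1
    let pipeline_id = handoff()?; // on error, the record stays incomplete
    store.complete(audit_id, pipeline_id); // Phase 2
    Ok(pipeline_id)
}
```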
The audit trail serves multiple critical functions:
- Duplicate Prevention: The UNIQUE constraint on (schedule_id, scheduled_time) prevents multiple executions for the same schedule at the same time
- Recovery Detection: Records with no pipeline_execution_id after a threshold indicate lost executions
- Observability: Complete history of what was scheduled versus what actually executed
- Performance Monitoring: Timing data for execution delays and patterns
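Duplicate prevention works because the database rejects a second row with the same key. As a rough in-memory analogue (an illustration, not Cloacina code), a Mutex-guarded HashSet keyed on (schedule_id, scheduled_time) behaves like the UNIQUE constraint. CronExecutionTable and race_for_slot are hypothetical, and ids and times are simplified to u64. When several simulated scheduler instances race to record the same slot, exactly one insert succeeds.

```rust
use std::collections::HashSet;
use std::sync::{Arc, Mutex};
use std::thread;

/// Hypothetical stand-in for cron_executions with UNIQUE(schedule_id, scheduled_time).
#[derive(Default)]
pub struct CronExecutionTable {
    keys: Mutex<HashSet<(u64, u64)>>,
}

impl CronExecutionTable {
    /// Mirrors an INSERT: true if the row was created, false if it would
    /// violate the unique constraint (another instance got there first).
    pub fn insert(&self, schedule_id: u64, scheduled_time: u64) -> bool {
        self.keys.lock().unwrap().insert((schedule_id, scheduled_time))
    }
}

/// Simulates `instances` scheduler processes racing to record the same
/// scheduled execution; returns how many of them succeeded.
pub fn race_for_slot(instances: usize, schedule_id: u64, scheduled_time: u64) -> usize {
    let table = Arc::new(CronExecutionTable::default());
    let handles: Vec<_> = (0..instances)
        .map(|_| {
            let table = Arc::clone(&table);
            thread::spawn(move || table.insert(schedule_id, scheduled_time))
        })
        .collect();
    handles
        .into_iter()
        .map(|h| h.join().unwrap())
        .filter(|won| *won)
        .count()
}
```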
The database schema is carefully designed to support the guaranteed execution pattern while maintaining cross-database compatibility.
PostgreSQL Implementation:
CREATE TABLE cron_executions (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
schedule_id UUID NOT NULL REFERENCES cron_schedules(id) ON DELETE CASCADE,
pipeline_execution_id UUID REFERENCES pipeline_executions(id) ON DELETE CASCADE,
scheduled_time TIMESTAMP NOT NULL,
claimed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL,
-- Prevent duplicate executions for the same schedule at the same time
UNIQUE(schedule_id, scheduled_time)
);
SQLite Implementation:
CREATE TABLE cron_executions (
id BLOB PRIMARY KEY NOT NULL,
schedule_id BLOB NOT NULL REFERENCES cron_schedules(id) ON DELETE CASCADE,
pipeline_execution_id BLOB REFERENCES pipeline_executions(id) ON DELETE CASCADE,
scheduled_time TEXT NOT NULL,
claimed_at TEXT NOT NULL,
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL,
-- Prevent duplicate executions for the same schedule at the same time
UNIQUE(schedule_id, scheduled_time)
);
Cloacina uses a UniversalTypes system to handle differences between PostgreSQL and SQLite:
// From models/cron_execution.rs
pub struct CronExecution {
pub id: UniversalUuid,
pub schedule_id: UniversalUuid,
pub pipeline_execution_id: Option<UniversalUuid>,
pub scheduled_time: UniversalTimestamp,
pub claimed_at: UniversalTimestamp,
// ...
}
This abstraction allows the same business logic to work across both databases while handling their different type systems (PostgreSQL’s native UUID vs SQLite’s BLOB storage).
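The idea behind this abstraction can be sketched as a newtype over 16 raw bytes. The newtype can turn itself into a SQLite BLOB and can print itself in the canonical hyphenated text form that PostgreSQL's UUID type accepts. PortableUuid and its methods are hypothetical. The real UniversalUuid and its database integration are not shown in this section.

```rust
use std::fmt;

/// Hypothetical backend-neutral UUID: the same 16 bytes serve both databases.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub struct PortableUuid([u8; 16]);

impl PortableUuid {
    pub fn from_bytes(bytes: [u8; 16]) -> Self {
        Self(bytes)
    }

    /// SQLite side: store the raw bytes in a BLOB column.
    pub fn to_blob(&self) -> Vec<u8> {
        self.0.to_vec()
    }

    /// SQLite side: read back from a BLOB, rejecting anything that is not 16 bytes.
    pub fn from_blob(blob: &[u8]) -> Result<Self, String> {
        if blob.len() != 16 {
            return Err(format!("expected 16 bytes, got {}", blob.len()));
        }
        let mut bytes = [0u8; 16];
        bytes.copy_from_slice(blob);
        Ok(Self(bytes))
    }
}

impl fmt::Display for PortableUuid {
    /// Canonical 8-4-4-4-12 lowercase hex form.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        for (i, b) in self.0.iter().enumerate() {
            if matches!(i, 4 | 6 | 8 | 10) {
                write!(f, "-")?;
            }
            write!(f, "{:02x}", b)?;
        }
        Ok(())
    }
}
```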
The system uses targeted indexes for efficient queries:
-- Index for efficient lookups by schedule
CREATE INDEX idx_cron_executions_schedule
ON cron_executions (schedule_id, scheduled_time DESC);
-- Index for pipeline execution correlation (only for non-null values)
CREATE INDEX idx_cron_executions_pipeline
ON cron_executions (pipeline_execution_id) WHERE pipeline_execution_id IS NOT NULL;
-- Index for time-based queries
CREATE INDEX idx_cron_executions_claimed_at
ON cron_executions (claimed_at DESC);
These indexes support the most common query patterns:
- Finding executions by schedule for history queries
- Correlating pipeline executions back to their cron origins
- Time-based queries for recovery and monitoring
The recovery system implements the detection and retry side of the guaranteed execution pattern.
The recovery service identifies lost executions using a LEFT JOIN query:
flowchart TD
    A[Recovery Service Poll] --> B[Query Database]
    B --> C{LEFT JOIN Query}
    C --> D[cron_executions table]
    C --> E[pipeline_executions table]
    D --> F{Audit Record EXISTS?}
    E --> G{Pipeline Execution EXISTS?}
    F -->|Yes| H[Has execution intent]
    G -->|No| I[No pipeline execution]
    H --> J{Claimed > threshold?}
    I --> J
    J -->|Yes| K[Lost Execution Detected]
    J -->|No| L[Still in progress]
    K --> M[Attempt Recovery]
    L --> N[Continue monitoring]
// From postgres_dal/cron_execution.rs
pub fn find_lost_executions(
&self,
older_than_minutes: i32,
) -> Result<Vec<CronExecution>, ValidationError> {
let cutoff_time = UniversalTimestamp(
Utc::now() - chrono::Duration::minutes(older_than_minutes as i64)
);
// `conn` is a connection checked out from the DAL's pool (acquisition elided)
// Find cron executions that don't have corresponding pipeline executions
// and were claimed before the cutoff time
let lost_executions = cron_executions::table
.left_join(
pipeline_executions::table.on(
cron_executions::pipeline_execution_id
.eq(pipeline_executions::id.nullable())
)
)
.filter(pipeline_executions::id.is_null())
.filter(cron_executions::claimed_at.lt(cutoff_time))
.select(cron_executions::all_columns)
.load(&mut conn)?;
Ok(lost_executions)
}
This query finds executions that have audit records but no corresponding pipeline executions, indicating they were claimed but never successfully handed off.
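To make the LEFT JOIN semantics concrete, here is an in-memory analogue that assumes simplified rows: u64 ids, and claimed_at as seconds since the epoch. CronExecutionRow and find_lost_in_memory are hypothetical. A row counts as lost when the join finds no pipeline execution for it and the row was claimed before the cutoff.

```rust
use std::collections::HashSet;

/// Hypothetical simplified cron_executions row.
#[derive(Clone, Debug, PartialEq)]
pub struct CronExecutionRow {
    pub id: u64,
    pub pipeline_execution_id: Option<u64>,
    pub claimed_at: u64, // seconds since the epoch
}

/// In-memory equivalent of:
///   cron_executions LEFT JOIN pipeline_executions
///     ON cron_executions.pipeline_execution_id = pipeline_executions.id
///   WHERE pipeline_executions.id IS NULL AND claimed_at < cutoff
pub fn find_lost_in_memory(
    cron_executions: &[CronExecutionRow],
    pipeline_execution_ids: &HashSet<u64>,
    now: u64,
    older_than_minutes: u64,
) -> Vec<CronExecutionRow> {
    let cutoff = now.saturating_sub(older_than_minutes * 60);
    cron_executions
        .iter()
        .filter(|row| {
            // The join has no partner when the link is NULL or points at no row.
            let has_pipeline = row
                .pipeline_execution_id
                .map_or(false, |pid| pipeline_execution_ids.contains(&pid));
            !has_pipeline && row.claimed_at < cutoff
        })
        .cloned()
        .collect()
}
```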
Before retrying, the recovery service checks how old the execution is and how many recovery attempts it has already had:
// From cron_recovery.rs
async fn recover_execution(&self, execution: &CronExecution) -> Result<(), PipelineError> {
let execution_age = Utc::now() - execution.scheduled_time();
// Check if execution is too old to recover
if execution_age > chrono::Duration::from_std(self.config.max_recovery_age).unwrap() {
warn!("Execution {} is too old to recover (age: {:?}), abandoning",
execution.id, execution_age);
return Ok(());
}
// Check recovery attempts
let mut attempts = self.recovery_attempts.lock().await;
let attempt_count = attempts.entry(execution.id).or_insert(0);
*attempt_count += 1;
if *attempt_count > self.config.max_recovery_attempts {
error!("Execution {} has exceeded max recovery attempts ({}), abandoning",
execution.id, self.config.max_recovery_attempts);
return Ok(());
}
// ... recovery logic continues
}
Recovery policies include:
- Age limits: Executions older than 24 hours (configurable) are abandoned
- Retry limits: At most 3 recovery attempts per execution (configurable)
- Schedule validation: Only recover for schedules that still exist and are enabled
- Context preservation: Recovery executions include metadata about being recoveries
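The policies above amount to a small decision function. This sketch rests on several assumptions. RecoveryPolicy, Decision, and decide are hypothetical names. Ages are std::time::Duration values rather than chrono durations. The attempt counter is a plain HashMap, mirroring the in-memory counter in the excerpt. Age is checked first and attempts second, as in the excerpt, and the schedule check is placed last.

```rust
use std::collections::HashMap;
use std::time::Duration;

/// Hypothetical policy knobs, named after the config fields in the excerpt.
pub struct RecoveryPolicy {
    pub max_recovery_age: Duration,
    pub max_recovery_attempts: u32,
}

#[derive(Debug, PartialEq)]
pub enum Decision {
    Retry { attempt: u32 },
    AbandonTooOld,
    AbandonMaxAttempts,
    SkipScheduleUnavailable,
}

/// Decide what to do with one lost execution. `attempts` is a per-execution
/// counter that, like the excerpt's, lives only in process memory.
pub fn decide(
    policy: &RecoveryPolicy,
    attempts: &mut HashMap<u64, u32>,
    execution_id: u64,
    age: Duration,
    schedule_enabled: bool,
) -> Decision {
    // 1. Age limit: too old to be worth replaying.
    if age > policy.max_recovery_age {
        return Decision::AbandonTooOld;
    }
    // 2. Attempt limit: count this attempt, then compare.
    let count = attempts.entry(execution_id).or_insert(0);
    *count += 1;
    if *count > policy.max_recovery_attempts {
        return Decision::AbandonMaxAttempts;
    }
    // 3. Schedule validation (assumed last): missing or disabled schedules are skipped.
    if !schedule_enabled {
        return Decision::SkipScheduleUnavailable;
    }
    Decision::Retry { attempt: *count }
}
```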
The DAL also exposes aggregate statistics for monitoring:
// From postgres_dal/cron_execution.rs
pub struct CronExecutionStats {
/// Total number of executions attempted
pub total_executions: i64,
/// Number of executions that successfully handed off to pipeline executor
pub successful_executions: i64,
/// Number of executions that were lost (claimed but never executed)
pub lost_executions: i64,
/// Success rate as a percentage
pub success_rate: f64,
}
These statistics enable monitoring dashboards and alerting systems to track the health of the cron scheduling system.
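Below is one plausible way to derive these fields from audit rows, not the DAL's actual query. It assumes several things. Rows are (pipeline_execution_id, claimed_at) pairs. A row counts as lost only once it is older than the lost threshold, matching the recovery query. The success rate of an empty history is 0.0. compute_stats is hypothetical, and the real implementation may count differently.

```rust
/// Mirrors the documented CronExecutionStats fields.
#[derive(Debug, PartialEq)]
pub struct CronExecutionStats {
    pub total_executions: i64,
    pub successful_executions: i64,
    pub lost_executions: i64,
    pub success_rate: f64,
}

/// Hypothetical aggregation over (pipeline_execution_id, claimed_at_secs) rows.
pub fn compute_stats(rows: &[(Option<u64>, u64)], now: u64, lost_after_secs: u64) -> CronExecutionStats {
    let cutoff = now.saturating_sub(lost_after_secs);
    let total_executions = rows.len() as i64;
    // Successful: the handoff completed and the record is linked.
    let successful_executions = rows.iter().filter(|(pid, _)| pid.is_some()).count() as i64;
    // Lost: never linked and older than the threshold (same rule as recovery).
    let lost_executions = rows
        .iter()
        .filter(|(pid, claimed_at)| pid.is_none() && *claimed_at < cutoff)
        .count() as i64;
    // Assumption: an empty history reports 0.0 rather than NaN.
    let success_rate = if total_executions == 0 {
        0.0
    } else {
        successful_executions as f64 * 100.0 / total_executions as f64
    };
    CronExecutionStats { total_executions, successful_executions, lost_executions, success_rate }
}
```

The row claimed at 950 in a history like the one tested below is neither successful nor lost: it is still within the threshold, so it is treated as in progress.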
The complete execution flow ties these components together:
flowchart TB
    subgraph "1\. Schedule Discovery"
        A[Poll for Due Schedules] --> B[Check next_run_at <= now]
        B --> C[Atomic Claim & Update]
    end
    subgraph "2\. Execution Planning"
        C --> D[Calculate Execution Times]
        D --> E{Catchup Policy?}
        E -->|Skip| F[Single Execution]
        E -->|RunAll| G[Multiple Executions]
    end
    subgraph "3\. Two-Phase Commit"
        F --> H[Create Audit Record]
        G --> H
        H --> I[Execute Workflow]
        I --> J{Handoff Success?}
        J -->|Yes| K[Complete Audit Trail]
        J -->|No| L[Audit Record Orphaned]
    end
    subgraph "4\. Recovery System"
        M[Recovery Service] --> N[Find Orphaned Audits]
        L -.-> N
        N --> O[Retry Execution]
        O --> P{Recovery Success?}
        P -->|Yes| Q[Update Audit Trail]
        P -->|No| R[Retry Later]
    end
    subgraph "5\. Monitoring"
        S[Statistics Collection] --> T[Track Success Rate]
        K --> S
        Q --> S
        R --> S
    end
    style H fill:#e1f5fe
    style L fill:#ffebee
    style N fill:#f3e5f5
- Schedule Discovery
    let due_schedules = self.dal.cron_schedule().get_due_schedules(now)?;
- Atomic Claiming
    let claimed = self.dal.cron_schedule()
        .claim_and_update(schedule.id, now, now, next_run)?;
    if !claimed {
        debug!("Schedule {} was already claimed by another instance", schedule.id);
        return Ok(());
    }
- Execution Time Calculation (based on catchup policy)
    let execution_times = self.calculate_execution_times(schedule, now)?;
- Audit Record Creation (Phase 1 of two-phase commit)
    let audit_record_id = self.create_execution_audit(schedule.id, scheduled_time)?;
- Pipeline Handoff
    let pipeline_execution_id = self.execute_workflow(schedule, scheduled_time).await?;
- Audit Completion (Phase 2 of two-phase commit)
    self.complete_execution_audit(audit_record_id, pipeline_execution_id)?;
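The execution-time calculation step depends on the catchup policy. This sketch makes three assumptions. The schedule fires every interval_secs seconds instead of evaluating a cron expression. Skip runs only the most recent missed slot. RunAll replays every missed slot. CatchupPolicy and due_execution_times are hypothetical, and Cloacina's actual Skip semantics may differ.

```rust
/// Hypothetical catchup policies named after the diagram's Skip / RunAll branches.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum CatchupPolicy {
    Skip,
    RunAll,
}

/// Due execution times for a schedule that fires every `interval_secs` seconds
/// (a simplification standing in for cron-expression evaluation).
/// `next_run_at` is the earliest slot that has not run yet.
pub fn due_execution_times(next_run_at: u64, interval_secs: u64, now: u64, policy: CatchupPolicy) -> Vec<u64> {
    if interval_secs == 0 || next_run_at > now {
        return Vec::new(); // nothing due (or a degenerate schedule)
    }
    // Number of further slots that also fell due between next_run_at and now.
    let extra_slots = (now - next_run_at) / interval_secs;
    match policy {
        // Replay every missed slot in chronological order.
        CatchupPolicy::RunAll => (0..=extra_slots).map(|k| next_run_at + k * interval_secs).collect(),
        // Assumption: run only the most recent due slot.
        CatchupPolicy::Skip => vec![next_run_at + extra_slots * interval_secs],
    }
}
```

Each returned time would then go through the audit-record, handoff, and audit-completion steps individually, so every replayed slot gets its own audit record.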
Errors are handled per stage, so a failure affects only the execution in question and the scheduler keeps running:
- Schedule claiming failures: Continue with other schedules
- Audit record creation failures: Skip this execution but continue
- Pipeline handoff failures: Leave audit record for recovery detection
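- Audit completion failures: Log the error but don’t fail the execution (the record still has no pipeline_execution_id, so the recovery service may later treat it as lost)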
The cron scheduler reuses Cloacina’s existing workflow execution infrastructure. DefaultRunner implements the PipelineExecutor trait, so the runner itself is passed to the scheduler as its executor:
// From default_runner.rs - CronScheduler creation
let cron_scheduler = CronScheduler::new(
Arc::new(dal),
Arc::new(self.clone()), // DefaultRunner implements PipelineExecutor!
cron_config,
cron_shutdown_rx,
);
This design allows the cron scheduler to leverage all of Cloacina’s existing capabilities:
- Task retry logic
- Dependency resolution
- Context management
- Multi-tenancy support
- Performance monitoring
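Because the scheduler depends only on an executor abstraction, any runner (or a test double) can be plugged in. The sketch uses a hypothetical, synchronous WorkflowExecutor trait as a stand-in for PipelineExecutor, whose real signature is async and takes a Context. Scheduler and RecordingExecutor are also illustrative names.

```rust
use std::sync::{Arc, Mutex};

/// Hypothetical, synchronous stand-in for PipelineExecutor: hand a workflow
/// off and get back a pipeline execution id.
pub trait WorkflowExecutor: Send + Sync {
    fn execute_workflow(&self, workflow_name: &str) -> Result<u64, String>;
}

/// The scheduler holds the executor behind a trait object, so it never
/// depends on a concrete runner type.
pub struct Scheduler {
    executor: Arc<dyn WorkflowExecutor>,
}

impl Scheduler {
    pub fn new(executor: Arc<dyn WorkflowExecutor>) -> Self {
        Self { executor }
    }

    /// Hand one scheduled run to whatever executor was injected.
    pub fn fire(&self, workflow_name: &str) -> Result<u64, String> {
        self.executor.execute_workflow(workflow_name)
    }
}

/// Test double that records handoffs and returns sequential fake ids.
#[derive(Default)]
pub struct RecordingExecutor {
    pub calls: Mutex<Vec<String>>,
}

impl WorkflowExecutor for RecordingExecutor {
    fn execute_workflow(&self, workflow_name: &str) -> Result<u64, String> {
        let mut calls = self.calls.lock().unwrap();
        calls.push(workflow_name.to_string());
        Ok(calls.len() as u64)
    }
}
```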
The recovery system is a critical component that ensures no executions are permanently lost due to system failures. It operates as an independent service within the cron scheduling subsystem.
The recovery service runs on a separate interval from the main scheduler:
// From cron_recovery.rs
impl CronRecovery {
// recv() on the shutdown receiver needs a mutable borrow, so run() takes &mut self
pub async fn run(&mut self) -> Result<(), PipelineError> {
let mut interval = tokio::time::interval(self.config.recovery_interval);
loop {
tokio::select! {
_ = interval.tick() => {
if let Err(e) = self.recover_lost_executions().await {
error!("Recovery cycle failed: {}", e);
}
}
_ = self.shutdown_rx.recv() => {
info!("Recovery service shutting down");
break;
}
}
}
Ok(())
}
}
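Without tokio, the same interval-plus-shutdown structure can be sketched with std::sync::mpsc. Receiver::recv_timeout waits up to one interval for a shutdown message, and a timeout stands in for interval.tick(). spawn_recovery_loop is hypothetical, and the recover closure stands in for recover_lost_executions().

```rust
use std::sync::mpsc::{self, RecvTimeoutError};
use std::thread::{self, JoinHandle};
use std::time::Duration;

/// std-only analogue of the tokio::select! loop: each wait ends either with a
/// shutdown message (stop) or a timeout (run one recovery cycle).
/// `recover` is a hypothetical stand-in for recover_lost_executions().
pub fn spawn_recovery_loop<F>(
    interval: Duration,
    shutdown_rx: mpsc::Receiver<()>,
    mut recover: F,
) -> JoinHandle<()>
where
    F: FnMut() -> Result<(), String> + Send + 'static,
{
    thread::spawn(move || loop {
        match shutdown_rx.recv_timeout(interval) {
            Err(RecvTimeoutError::Timeout) => {
                // A failed cycle is logged, not fatal, as in the async version.
                if let Err(e) = recover() {
                    eprintln!("Recovery cycle failed: {}", e);
                }
            }
            // An explicit shutdown message or a dropped sender both stop the loop.
            Ok(()) | Err(RecvTimeoutError::Disconnected) => break,
        }
    })
}
```

This sketch differs from the tokio version in two ways. First, the first cycle runs after one interval, whereas tokio's first tick completes immediately. Second, the wait restarts after each cycle, so the schedule drifts by however long recovery takes.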
Each recovery cycle finds lost executions and then decides, one by one, whether to retry or abandon them:
sequenceDiagram
    participant RS as Recovery Service
    participant DB as Database
    participant CS as CronScheduler
    participant PE as PipelineExecutor
    Note over RS,PE: Recovery Process Flow
    loop Every recovery_interval
        RS->>DB: Find lost executions
        Note over DB: LEFT JOIN cron_executions<br/>with pipeline_executions
        DB-->>RS: Return orphaned audit records
        loop For each lost execution
            RS->>RS: Validate execution age
            RS->>RS: Check retry attempts
            RS->>DB: Verify schedule still exists
            alt Recovery Criteria Met
                RS->>PE: Retry workflow execution
                alt Retry Success
                    PE-->>RS: Return new pipeline_execution_id
                    RS->>DB: Update audit record
                    Note over DB: Link to new pipeline execution
                else Retry Fails
                    RS->>RS: Increment retry count
                    Note over RS: Will retry later if<br/>under attempt limit
                end
            else Too Old or Max Retries
                RS->>RS: Abandon execution
                Note over RS: Log abandonment
            end
        end
    end
async fn recover_lost_executions(&self) -> Result<(), PipelineError> {
let threshold_minutes = self.config.lost_threshold_minutes;
let lost_executions = self.dal
.cron_execution()
.find_lost_executions(threshold_minutes)?;
info!("Found {} potentially lost executions", lost_executions.len());
for execution in lost_executions {
if let Err(e) = self.recover_execution(&execution).await {
error!("Failed to recover execution {}: {}", execution.id, e);
}
}
Ok(())
}
The detection uses a LEFT JOIN to find audit records without corresponding pipeline executions:
- Time-based filtering: Only consider executions claimed before the cutoff (default threshold: 5 minutes), so that handoffs still in flight are not mistaken for lost ones
- Null check: Keep only rows for which the join finds no pipeline execution, i.e. cron_executions whose pipeline_execution_id is NULL
When recovering a lost execution, the system preserves the original schedule context while adding recovery metadata:
async fn recover_execution(&self, execution: &CronExecution) -> Result<(), PipelineError> {
// (The age and attempt-limit checks shown earlier are elided here.)
// Validate the schedule still exists and is enabled
let schedule = match self.dal.cron_schedule().get_by_id(execution.schedule_id)? {
Some(s) if s.enabled => s,
Some(_) => {
debug!("Schedule {} is disabled, skipping recovery", execution.schedule_id);
return Ok(());
}
None => {
warn!("Schedule {} no longer exists, skipping recovery", execution.schedule_id);
return Ok(());
}
};
// Create recovery context with additional metadata
let mut context = Context::new();
context.insert("is_recovery", json!(true))?;
context.insert("original_scheduled_time", json!(execution.scheduled_time()))?;
context.insert("recovery_attempt_time", json!(Utc::now()))?;
context.insert("execution_id", json!(execution.id))?;
// Execute the workflow through the pipeline
match self.executor.execute_workflow(&schedule.workflow_name, context).await {
Ok(pipeline_execution_id) => {
// Link the recovered execution to the pipeline execution
self.dal.cron_execution().update_pipeline_execution_id(
execution.id,
pipeline_execution_id,
)?;
info!("Successfully recovered execution {} for schedule {}",
execution.id, execution.schedule_id);
}
Err(e) => {
error!("Recovery execution failed for {}: {}", execution.id, e);
return Err(e);
}
}
Ok(())
}
The recovery system implements several policies to prevent runaway recovery attempts:
Age Limits:
let execution_age = Utc::now() - execution.scheduled_time();
if execution_age > chrono::Duration::from_std(self.config.max_recovery_age).unwrap() {
warn!("Execution {} is too old to recover, abandoning", execution.id);
return Ok(());
}
Retry Limits:
let mut attempts = self.recovery_attempts.lock().await;
let attempt_count = attempts.entry(execution.id).or_insert(0);
*attempt_count += 1;
if *attempt_count > self.config.max_recovery_attempts {
error!("Execution {} has exceeded max recovery attempts, abandoning", execution.id);
return Ok(());
}
Schedule Validation:
- Only recover executions for schedules that still exist
- Skip recovery for disabled schedules
- Validate workflow names still exist in the system
The recovery system provides detailed metrics for operational monitoring:
pub struct RecoveryStats {
pub total_recovery_attempts: u64,
pub successful_recoveries: u64,
pub failed_recoveries: u64,
pub abandoned_recoveries: u64,
pub recovery_success_rate: f64,
}
These metrics enable operators to:
- Track recovery effectiveness
- Identify patterns in lost executions
- Tune recovery parameters for optimal performance
- Set up alerting for high recovery rates (indicating system issues)
Several key design decisions make this architecture particularly robust:
The separation between scheduling (CronScheduler) and execution (PipelineExecutor) follows the saga pattern, where each service has clear responsibilities and compensation actions. This prevents the complexity that would arise from trying to handle both concerns in a single component.
By creating audit records in the database before any execution, the system ensures that no execution intent is lost even in the face of process crashes or network failures. This is more reliable than in-memory coordination mechanisms.
The claim_and_update operation atomically updates both the last_run_at and next_run_at timestamps, preventing race conditions between multiple scheduler instances while providing clear semantics for when a schedule was last processed.
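The atomic claim can be pictured as a compare-and-set on next_run_at. One common way to express this in SQL is a single conditional UPDATE whose affected-row count tells the caller whether it won. The in-memory sketch below gets the same atomicity from a Mutex. ScheduleRow, ScheduleTable, and claim_due are hypothetical names, and timestamps are u64 seconds. claim_due takes fewer arguments than the real claim_and_update. Only the first caller for a due schedule advances it. Later callers see false until the new next_run_at comes due.

```rust
use std::collections::HashMap;
use std::sync::Mutex;

/// Hypothetical in-memory schedule row; times are seconds since the epoch.
#[derive(Clone, Debug, PartialEq)]
pub struct ScheduleRow {
    pub next_run_at: u64,
    pub last_run_at: Option<u64>,
}

pub struct ScheduleTable {
    rows: Mutex<HashMap<u64, ScheduleRow>>,
}

impl ScheduleTable {
    pub fn new(rows: HashMap<u64, ScheduleRow>) -> Self {
        Self { rows: Mutex::new(rows) }
    }

    /// Compare-and-set claim, roughly what a conditional
    ///   UPDATE ... SET last_run_at = now, next_run_at = next
    ///   WHERE id = ? AND next_run_at <= now
    /// achieves: check and update happen under one lock, so exactly one caller wins.
    pub fn claim_due(&self, id: u64, now: u64, next_run: u64) -> bool {
        let mut rows = self.rows.lock().unwrap();
        match rows.get_mut(&id) {
            Some(row) if row.next_run_at <= now => {
                row.last_run_at = Some(now);
                row.next_run_at = next_run;
                true
            }
            // Unknown schedule, or not due (possibly already advanced by another instance).
            _ => false,
        }
    }
}
```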
The recovery service runs independently of the main scheduler, providing resilience even if the scheduler itself fails. This separation of concerns allows for different retry policies and prevents recovery logic from complicating the main scheduling loop.
Timestamps are handled per backend. PostgreSQL fills them from column defaults, while SQLite receives them from the application. This keeps behavior consistent across database backends while leveraging each database’s strengths.
This architecture shows how established distributed-systems patterns (durable intent records, atomic claiming, and independent recovery) can close the reliability gaps of fire-and-forget cron scheduling while keeping operations simple.