Skip to main content
Cloacina Documentation
Toggle Dark/Light/Auto mode Toggle Dark/Light/Auto mode Toggle Dark/Light/Auto mode Back to homepage

05 - Cron Scheduling

Cron Scheduling

Welcome to the cron scheduling tutorial! In this tutorial, you’ll learn how to create time-based triggers for your workflows using Cloaca’s built-in cron scheduling capabilities. This is essential for building automated data pipelines, periodic maintenance tasks, and scheduled business processes.

Learning Objectives

  • Understand cron scheduling in Cloaca
  • Create time-based workflow triggers
  • Configure schedule policies and recovery
  • Monitor scheduled executions
  • Handle timezone considerations
  • Implement robust scheduling patterns

Prerequisites

  • Completion of Tutorial 4
  • Understanding of cron expression syntax
  • Basic knowledge of timezone handling
  • Familiarity with Python datetime operations

Time Estimate

25-30 minutes

Cron Scheduling Overview

Cloaca provides built-in cron scheduling that runs within your application process, eliminating the need for external schedulers like crontab or task queues for time-based workflows.

Key Features

  • Cron expressions for flexible scheduling
  • Timezone support for global applications
  • Missed execution recovery when applications restart
  • Execution monitoring and logging
  • Per-schedule configuration for different policies

Basic Cron Scheduling

Let’s start with a simple scheduled workflow:

import sys
import cloaca
from datetime import datetime
import time

# Create workflow using the new workflow-scoped pattern
with cloaca.WorkflowBuilder("daily_report") as builder:
    builder.description("Daily business analytics report")

    # Define a scheduled task within workflow scope
    @cloaca.task(id="daily_report")
    def daily_report(context):
        """Generate daily business report."""
        current_time = datetime.now()

        # Simulate report generation
        report_data = {
            "generated_at": current_time.isoformat(),
            "total_orders": 150,
            "revenue": 12500.50,
            "active_users": 89
        }

        print(f"📊 Daily Report Generated at {current_time}")
        print(f"   Orders: {report_data['total_orders']}")
        print(f"   Revenue: ${report_data['revenue']}")
        print(f"   Users: {report_data['active_users']}")

        context.set("report_data", report_data)
        context.set("report_type", "daily")

        return context
    # Task is automatically registered when defined within WorkflowBuilder context

def basic_cron_scheduling():
    """Demonstrate basic cron scheduling."""
    print("=== Basic Cron Scheduling Demo ===")

    # Create runner with cron scheduling enabled
    runner = cloaca.DefaultRunner(":memory:")

    # Workflow is automatically registered when context exits

    # Create cron schedule
    schedule = cloaca.CronSchedule(
        workflow_name="daily_report",
        cron_expression="0 9 * * *",  # Daily at 9:00 AM
        timezone="UTC",
        enabled=True
    )

    try:
        # Register the schedule
        runner.add_cron_schedule(schedule)
        print("✓ Cron schedule registered: Daily at 9:00 AM UTC")

        # For demo purposes, we'll use a frequent schedule
        demo_schedule = cloaca.CronSchedule(
            workflow_name="daily_report",
            cron_expression="*/30 * * * * *",  # Every 30 seconds for demo
            timezone="UTC",
            enabled=True
        )

        runner.add_cron_schedule(demo_schedule)
        print("✓ Demo schedule added: Every 30 seconds")

        # Run for a short time to see executions
        print("⏰ Waiting for scheduled executions...")
        time.sleep(65)  # Wait for at least 2 executions

    finally:
        runner.shutdown()

if __name__ == "__main__":
    basic_cron_scheduling()

Advanced Scheduling Patterns

Multi-Schedule Workflows

import cloaca
from datetime import datetime, timezone
import json

# Create maintenance workflows using the new workflow-scoped pattern
with cloaca.WorkflowBuilder("full_backup") as builder:
    builder.description("Weekly full database backup")

    @cloaca.task(id="data_backup")
    def data_backup(context):
        """Perform database backup."""
        backup_type = context.get("backup_type", "incremental")
        timestamp = datetime.now().isoformat()

        print(f"💾 Performing {backup_type} backup at {timestamp}")

        # Simulate backup process
        if backup_type == "full":
            print("   Full backup: All tables exported")
            backup_size = "2.5GB"
        else:
            print("   Incremental backup: Changed records only")
            backup_size = "150MB"

        context.set("backup_completed_at", timestamp)
        context.set("backup_size", backup_size)
        context.set("backup_type", backup_type)

        return context
    # Task is automatically registered when defined within WorkflowBuilder context

with cloaca.WorkflowBuilder("incremental_backup") as builder:
    builder.description("Daily incremental backup")

    @cloaca.task(id="data_backup")
    def data_backup_incremental(context):
        """Perform database backup."""
        backup_type = context.get("backup_type", "incremental")
        timestamp = datetime.now().isoformat()

        print(f"💾 Performing {backup_type} backup at {timestamp}")

        # Simulate backup process
        if backup_type == "full":
            print("   Full backup: All tables exported")
            backup_size = "2.5GB"
        else:
            print("   Incremental backup: Changed records only")
            backup_size = "150MB"

        context.set("backup_completed_at", timestamp)
        context.set("backup_size", backup_size)
        context.set("backup_type", backup_type)

        return context
    # Task is automatically registered when defined within WorkflowBuilder context

with cloaca.WorkflowBuilder("log_cleanup") as builder:
    builder.description("Weekly log file cleanup")

    @cloaca.task(id="cleanup_logs")
    def cleanup_logs(context):
        """Clean up old log files."""
        retention_days = context.get("retention_days", 30)
        timestamp = datetime.now().isoformat()

        print(f"🧹 Cleaning logs older than {retention_days} days at {timestamp}")

        # Simulate cleanup
        files_removed = 47
        space_freed = "1.2GB"

        context.set("cleanup_completed_at", timestamp)
        context.set("files_removed", files_removed)
        context.set("space_freed", space_freed)

        return context
    # Task is automatically registered when defined within WorkflowBuilder context

with cloaca.WorkflowBuilder("health_check") as builder:
    builder.description("Hourly system health monitoring")

    @cloaca.task(id="system_health_check")
    def system_health_check(context):
        """Perform system health monitoring."""
        timestamp = datetime.now().isoformat()

        print(f"🏥 System health check at {timestamp}")

        # Simulate health checks
        health_status = {
            "cpu_usage": 45.2,
            "memory_usage": 62.8,
            "disk_usage": 73.1,
            "active_connections": 234,
            "response_time_ms": 145
        }

        # Determine overall health
        if health_status["cpu_usage"] > 80 or health_status["memory_usage"] > 90:
            overall_status = "warning"
        elif health_status["cpu_usage"] > 95 or health_status["memory_usage"] > 95:
            overall_status = "critical"
        else:
            overall_status = "healthy"

        print(f"   Status: {overall_status.upper()}")
        print(f"   CPU: {health_status['cpu_usage']}%")
        print(f"   Memory: {health_status['memory_usage']}%")

        context.set("health_check_at", timestamp)
        context.set("health_status", health_status)
        context.set("overall_status", overall_status)

        return context
    # Task is automatically registered when defined within WorkflowBuilder context

def demonstrate_multiple_schedules():
    """Demonstrate multiple cron schedules for different maintenance tasks."""
    print("=== Multiple Schedule Management ===")

    # Create runner
    runner = cloaca.DefaultRunner("sqlite:///:memory:")

    # Workflows are automatically registered when WorkflowBuilder context exits

    # Define schedules with different patterns
    schedules = [
        # Full backup: Weekly on Sunday at 2:00 AM
        cloaca.CronSchedule(
            workflow_name="full_backup",
            cron_expression="0 2 * * SUN",
            timezone="UTC",
            enabled=True,
            context=cloaca.Context({"backup_type": "full"})
        ),

        # Incremental backup: Daily at 3:00 AM
        cloaca.CronSchedule(
            workflow_name="incremental_backup",
            cron_expression="0 3 * * *",
            timezone="UTC",
            enabled=True,
            context=cloaca.Context({"backup_type": "incremental"})
        ),

        # Log cleanup: Weekly on Saturday at 1:00 AM
        cloaca.CronSchedule(
            workflow_name="log_cleanup",
            cron_expression="0 1 * * SAT",
            timezone="UTC",
            enabled=True,
            context=cloaca.Context({"retention_days": 30})
        ),

        # Health check: Every 15 minutes during business hours
        cloaca.CronSchedule(
            workflow_name="health_check",
            cron_expression="*/15 9-17 * * MON-FRI",
            timezone="UTC",
            enabled=True,
            context=cloaca.Context({})
        )
    ]

    # For demo, use frequent schedules
    demo_schedules = [
        cloaca.CronSchedule(
            workflow_name="full_backup",
            cron_expression="*/45 * * * * *",  # Every 45 seconds
            timezone="UTC",
            enabled=True,
            context=cloaca.Context({"backup_type": "full"})
        ),
        cloaca.CronSchedule(
            workflow_name="health_check",
            cron_expression="*/20 * * * * *",  # Every 20 seconds
            timezone="UTC",
            enabled=True,
            context=cloaca.Context({})
        )
    ]

    try:
        # Register production schedules (commented out for demo)
        for schedule in schedules:
            # runner.add_cron_schedule(schedule)
            print(f"📅 Would register: {schedule.workflow_name} - {schedule.cron_expression}")

        # Register demo schedules
        for schedule in demo_schedules:
            runner.add_cron_schedule(schedule)
            print(f"✓ Demo schedule: {schedule.workflow_name} - {schedule.cron_expression}")

        print("\n⏰ Running demo schedules...")
        time.sleep(70)  # Let schedules run

    finally:
        runner.shutdown()

if __name__ == "__main__":
    demonstrate_multiple_schedules()

Timezone and Recovery Configuration

import cloaca
from datetime import datetime, timezone, timedelta
import time

# Create global synchronization workflow using the new workflow-scoped pattern
with cloaca.WorkflowBuilder("global_sync") as builder:
    builder.description("Cross-region data synchronization")

    @cloaca.task(id="global_sync")
    def global_sync(context):
        """Synchronize data across global regions."""
        region = context.get("region", "unknown")
        sync_type = context.get("sync_type", "delta")
        current_time = datetime.now()

        print(f"🌍 Global sync for {region} at {current_time}")
        print(f"   Sync type: {sync_type}")

        # Simulate region-specific processing
        if region == "us-east":
            records_synced = 15420
        elif region == "eu-west":
            records_synced = 8930
        elif region == "asia-pacific":
            records_synced = 12150
        else:
            records_synced = 5000

        print(f"   Records synced: {records_synced}")

        context.set("sync_completed_at", current_time.isoformat())
        context.set("records_synced", records_synced)
        context.set("region", region)

        return context
    # Task is automatically registered when defined within WorkflowBuilder context

def demonstrate_timezone_scheduling():
    """Demonstrate timezone-aware scheduling and recovery policies."""
    print("=== Timezone & Recovery Configuration ===")

    # Create runner with recovery settings
    runner = cloaca.DefaultRunner("sqlite:///:memory:")

    # Workflow is automatically registered when WorkflowBuilder context exits

    # Create timezone-specific schedules
    regional_schedules = [
        # US East Coast: 6:00 AM ET (11:00 AM UTC)
        cloaca.CronSchedule(
            workflow_name="global_sync",
            cron_expression="0 11 * * *",
            timezone="UTC",
            enabled=True,
            context=cloaca.Context({
                "region": "us-east",
                "sync_type": "full"
            })
        ),

        # Europe: 6:00 AM CET (5:00 AM UTC)
        cloaca.CronSchedule(
            workflow_name="global_sync",
            cron_expression="0 5 * * *",
            timezone="UTC",
            enabled=True,
            context=cloaca.Context({
                "region": "eu-west",
                "sync_type": "incremental"
            })
        ),

        # Asia Pacific: 6:00 AM JST (9:00 PM UTC previous day)
        cloaca.CronSchedule(
            workflow_name="global_sync",
            cron_expression="0 21 * * *",
            timezone="UTC",
            enabled=True,
            context=cloaca.Context({
                "region": "asia-pacific",
                "sync_type": "delta"
            })
        )
    ]

    # Demo schedules (every 25 seconds for different regions)
    demo_schedules = [
        cloaca.CronSchedule(
            workflow_name="global_sync",
            cron_expression="*/25 * * * * *",
            timezone="UTC",
            enabled=True,
            context=cloaca.Context({
                "region": "us-east",
                "sync_type": "demo"
            })
        ),
        cloaca.CronSchedule(
            workflow_name="global_sync",
            cron_expression="5,30,55 * * * * *",  # Offset by 5 seconds
            timezone="UTC",
            enabled=True,
            context=cloaca.Context({
                "region": "eu-west",
                "sync_type": "demo"
            })
        )
    ]

    try:
        # Show production schedules
        print("📍 Production Regional Schedules:")
        for schedule in regional_schedules:
            print(f"   {schedule.context.get('region')}: {schedule.cron_expression} UTC")

        # Register demo schedules
        print("\n🚀 Starting demo schedules...")
        for schedule in demo_schedules:
            runner.add_cron_schedule(schedule)
            print(f"✓ {schedule.context.get('region')}: {schedule.cron_expression}")

        print("\n⏰ Watching scheduled executions...")
        time.sleep(70)

    finally:
        runner.shutdown()

if __name__ == "__main__":
    demonstrate_timezone_scheduling()

Schedule Management and Monitoring

import cloaca
from datetime import datetime
import time
import json

# Create schedule monitoring workflow using the new workflow-scoped pattern
with cloaca.WorkflowBuilder("schedule_monitor") as builder:
    builder.description("Schedule execution monitoring")

    @cloaca.task(id="schedule_monitor")
    def schedule_monitor(context):
        """Monitor and report on schedule execution."""
        monitor_type = context.get("monitor_type", "health")
        timestamp = datetime.now()

        print(f"📊 Schedule monitoring ({monitor_type}) at {timestamp}")

        # Simulate monitoring data
        if monitor_type == "health":
            metrics = {
                "active_schedules": 12,
                "successful_executions_24h": 87,
                "failed_executions_24h": 2,
                "average_execution_time_ms": 1250,
                "next_execution": (datetime.now() + timedelta(minutes=30)).isoformat()
            }
        else:  # performance
            metrics = {
                "peak_concurrent_workflows": 8,
                "resource_utilization": 34.5,
                "queue_length": 3,
                "avg_wait_time_ms": 150
            }

        print(f"   Metrics: {json.dumps(metrics, indent=2)}")

        context.set("monitoring_data", metrics)
        context.set("monitor_type", monitor_type)
        context.set("measured_at", timestamp.isoformat())

        return context
    # Task is automatically registered when defined within WorkflowBuilder context

class ScheduleManager:
    """Advanced schedule management with monitoring."""

    def __init__(self, database_url: str):
        self.runner = cloaca.DefaultRunner(database_url)
        self.active_schedules = {}

    def register_workflow(self, name: str, constructor):
        """Register a workflow constructor (not needed with new workflow-scoped pattern)."""
        # With the new workflow-scoped pattern, workflows are automatically registered
        # when the WorkflowBuilder context exits, so this method is not needed
        pass

    def add_schedule(self, schedule_id: str, schedule: cloaca.CronSchedule):
        """Add a named schedule for management."""
        self.runner.add_cron_schedule(schedule)
        self.active_schedules[schedule_id] = schedule
        print(f"✓ Added schedule '{schedule_id}': {schedule.workflow_name} - {schedule.cron_expression}")

    def list_schedules(self):
        """List all active schedules."""
        print(f"\n📋 Active Schedules ({len(self.active_schedules)}):")
        for schedule_id, schedule in self.active_schedules.items():
            status = "🟢 Enabled" if schedule.enabled else "🔴 Disabled"
            print(f"   {schedule_id}: {schedule.workflow_name}")
            print(f"      Expression: {schedule.cron_expression}")
            print(f"      Timezone: {schedule.timezone}")
            print(f"      Status: {status}")
            print()

    def get_schedule_info(self, schedule_id: str):
        """Get detailed information about a specific schedule."""
        if schedule_id not in self.active_schedules:
            print(f"❌ Schedule '{schedule_id}' not found")
            return None

        schedule = self.active_schedules[schedule_id]
        return {
            "id": schedule_id,
            "workflow_name": schedule.workflow_name,
            "cron_expression": schedule.cron_expression,
            "timezone": schedule.timezone,
            "enabled": schedule.enabled,
            "context": dict(schedule.context.data) if schedule.context else {}
        }

    def shutdown(self):
        """Clean shutdown of the schedule manager."""
        print("\n🛑 Shutting down schedule manager...")
        self.runner.shutdown()

def demonstrate_schedule_management():
    """Demonstrate advanced schedule management capabilities."""
    print("=== Schedule Management & Monitoring ===")

    # Create schedule manager
    manager = ScheduleManager("sqlite:///:memory:")

    # Monitoring workflow is automatically registered when WorkflowBuilder context exits
    # No need to manually register workflows with the new pattern

    # Add various schedules
    schedules = {
        "health_monitor": cloaca.CronSchedule(
            workflow_name="schedule_monitor",
            cron_expression="*/30 * * * * *",  # Every 30 seconds
            timezone="UTC",
            enabled=True,
            context=cloaca.Context({"monitor_type": "health"})
        ),

        "performance_monitor": cloaca.CronSchedule(
            workflow_name="schedule_monitor",
            cron_expression="*/45 * * * * *",  # Every 45 seconds
            timezone="UTC",
            enabled=True,
            context=cloaca.Context({"monitor_type": "performance"})
        )
    }

    try:
        # Add schedules
        for schedule_id, schedule in schedules.items():
            manager.add_schedule(schedule_id, schedule)

        # List active schedules
        manager.list_schedules()

        # Show schedule details
        print("🔍 Schedule Details:")
        for schedule_id in schedules.keys():
            info = manager.get_schedule_info(schedule_id)
            print(f"   {json.dumps(info, indent=2)}")

        print("\n⏰ Running monitoring schedules...")
        time.sleep(60)

    finally:
        manager.shutdown()

if __name__ == "__main__":
    demonstrate_schedule_management()

Running the Complete Example

Save this as python_cron_tutorial.py:

#!/usr/bin/env python3
"""
Cloaca Cron Scheduling Tutorial
Complete example demonstrating time-based workflow execution.
"""

import cloaca
from datetime import datetime, timedelta
import time
import json

# Create all workflow definitions using the new workflow-scoped pattern
with cloaca.WorkflowBuilder("daily_report") as builder:
    builder.description("Daily business analytics")

    @cloaca.task(id="daily_report")
    def daily_report(context):
        """Generate daily business report."""
        current_time = datetime.now()
        report_data = {
            "generated_at": current_time.isoformat(),
            "total_orders": 150,
            "revenue": 12500.50,
            "active_users": 89
        }

        print(f"📊 Daily Report Generated at {current_time}")
        print(f"   Orders: {report_data['total_orders']}")
        print(f"   Revenue: ${report_data['revenue']}")
        print(f"   Users: {report_data['active_users']}")

        context.set("report_data", report_data)
        return context
    # Task is automatically registered when defined within WorkflowBuilder context

with cloaca.WorkflowBuilder("system_backup") as builder:
    builder.description("System data backup")

    @cloaca.task(id="system_backup")
    def system_backup(context):
        """Perform system backup."""
        backup_type = context.get("backup_type", "incremental")
        timestamp = datetime.now()

        print(f"💾 {backup_type.title()} backup at {timestamp}")

        context.set("backup_completed", timestamp.isoformat())
        context.set("backup_type", backup_type)
        return context
    # Task is automatically registered when defined within WorkflowBuilder context

def main():
    """Main tutorial demonstration."""
    print("🕐 Cloaca Cron Scheduling Tutorial")
    print("=" * 50)

    # Create runner
    runner = cloaca.DefaultRunner("sqlite:///:memory:")

    try:
        # Workflows are automatically registered when WorkflowBuilder context exits

        # Create schedules
        schedules = [
            # Daily report every 20 seconds (demo)
            cloaca.CronSchedule(
                workflow_name="daily_report",
                cron_expression="*/20 * * * * *",
                timezone="UTC",
                enabled=True,
                context=cloaca.Context({})
            ),

            # Backup every 35 seconds (demo)
            cloaca.CronSchedule(
                workflow_name="system_backup",
                cron_expression="*/35 * * * * *",
                timezone="UTC",
                enabled=True,
                context=cloaca.Context({"backup_type": "incremental"})
            )
        ]

        # Register schedules
        for schedule in schedules:
            runner.add_cron_schedule(schedule)
            print(f"✓ Scheduled: {schedule.workflow_name} - {schedule.cron_expression}")

        print(f"\n⏰ Running schedules for 75 seconds...")
        print("   (You should see executions approximately every 20 and 35 seconds)")

        time.sleep(75)

        print("\n✅ Tutorial completed successfully!")
        print("\nWhat you learned:")
        print("- Creating cron schedules with expressions")
        print("- Registering schedules with workflows")
        print("- Configuring timezone and context")
        print("- Managing multiple concurrent schedules")

    finally:
        print("\n🛑 Shutting down...")
        runner.shutdown()

if __name__ == "__main__":
    main()

Run the tutorial:

python python_cron_tutorial.py

Best Practices

Design robust schedules:

  • Use appropriate intervals for your use case
  • Consider business hours and maintenance windows
  • Plan for timezone differences in global applications
  • Avoid overlapping long-running schedules

Handle failures gracefully:

  • Implement retry logic in tasks
  • Monitor for missed executions
  • Log schedule execution outcomes
  • Set up alerting for critical scheduled workflows

Optimize performance:

  • Limit concurrent scheduled executions
  • Use appropriate database backends for scale
  • Monitor resource usage during peak schedule times
  • Consider using separate runners for different schedule types

Production Considerations

Cron Expression Examples

# Common patterns
"0 0 * * *"        # Daily at midnight
"0 */6 * * *"      # Every 6 hours
"0 9 * * MON-FRI"  # Weekdays at 9 AM
"0 0 1 * *"        # First day of month
"0 2 * * SUN"      # Weekly on Sunday at 2 AM

Deployment Tips

  1. Environment Configuration: Use environment variables for schedule expressions
  2. Health Monitoring: Monitor schedule execution in production
  3. Graceful Shutdown: Ensure runners shut down cleanly
  4. Resource Limits: Set appropriate connection and memory limits

What You’ve Learned

Congratulations! You now understand:

  • Cron expressions and how to use them with Cloaca
  • Schedule registration and workflow management
  • Timezone handling for global applications
  • Recovery and monitoring capabilities
  • Production considerations for scheduled workflows

Next Steps

With cron scheduling mastered, you’re ready to:

  1. API Reference - Explore advanced scheduling options
  2. Examples - See real-world scheduling patterns
  3. Performance Guide - Optimize scheduled workflows