01 - Your First Python Workflow
Welcome to your first Cloacina Python tutorial! In this guide, you’ll learn how to create and execute a simple workflow using Python decorators and the workflow builder pattern. By the end of this tutorial, you’ll understand the basic concepts of tasks, workflows, context, and execution in Python.
- Define tasks using Python decorators
- Build workflows with the WorkflowBuilder
- Execute workflows with the DefaultRunner
- Pass data between tasks using Context
- Handle workflow results and cleanup
- Basic knowledge of Python
- Python 3.9 or higher installed
- A code editor of your choice
10-15 minutes
First, install the Python bindings:
pip install cloaca[sqlite]
Let’s create a simple workflow with three tasks that demonstrates the core concepts. Create a new file called first_workflow.py
:
import cloaca
# Define tasks using the @task decorator
@cloaca.task(id="start_process")
def start_process(context):
"""Initialize the workflow with some data."""
print("Starting the workflow...")
# Add initial data to the context
context.set("process_id", "proc_001")
context.set("start_time", "2025-01-07T10:00:00Z")
context.set("items_to_process", 10)
print(f"Process {context.get('process_id')} initialized")
return context
@cloaca.task(id="process_data", dependencies=["start_process"])
def process_data(context):
"""Process the data from the previous task."""
print("Processing data...")
# Get data from the previous task
process_id = context.get("process_id")
items_count = context.get("items_to_process")
# Simulate processing
processed_items = []
for i in range(items_count):
processed_items.append(f"item_{i}_processed")
# Store results in context
context.set("processed_items", processed_items)
context.set("processing_complete", True)
print(f"Processed {len(processed_items)} items for {process_id}")
return context
@cloaca.task(id="finalize_process", dependencies=["process_data"])
def finalize_process(context):
"""Finalize the workflow and generate summary."""
print("Finalizing process...")
# Get all the data
process_id = context.get("process_id")
start_time = context.get("start_time")
processed_items = context.get("processed_items")
# Create summary
summary = {
"process_id": process_id,
"start_time": start_time,
"items_processed": len(processed_items),
"status": "completed"
}
context.set("final_summary", summary)
print(f"Process {process_id} completed successfully")
return context
# Create workflow builder function
def create_simple_workflow():
"""Build and return the workflow."""
builder = cloaca.WorkflowBuilder("simple_workflow")
builder.description("A simple three-task workflow demonstrating basic concepts")
# Add tasks to the workflow
builder.add_task("start_process")
builder.add_task("process_data")
builder.add_task("finalize_process")
return builder.build()
# Register the workflow
cloaca.register_workflow_constructor("simple_workflow", create_simple_workflow)
# Execute the workflow
if __name__ == "__main__":
print("=== Cloacina Python Workflow Tutorial ===")
# Create a runner with SQLite database
runner = cloaca.DefaultRunner("sqlite:///tutorial_01.db")
# Create initial context
context = cloaca.Context({"tutorial": "01", "user": "learner"})
# Execute the workflow
print("\nExecuting workflow...")
result = runner.execute("simple_workflow", context)
# Check results
print(f"\nWorkflow Status: {result.status}")
if result.status == "Completed":
print("Success! Workflow completed.")
# Access the final context and results
final_context = result.final_context
summary = final_context.get("final_summary")
print(f"Final Summary: {summary}")
print(f"All context data: {dict(final_context)}")
else:
print(f"Workflow failed with status: {result.status}")
if hasattr(result, 'error'):
print(f"Error: {result.error}")
# Clean up
print("\nCleaning up...")
runner.shutdown()
print("Tutorial completed!")
Let’s examine the workflow structure and execution flow:
graph TD A[start_process] --> B[process_data] B --> C[finalize_process] style A fill:#e6e6ff,stroke:#333,stroke-width:2px,color:#000 style B fill:#ffe6e6,stroke:#333,stroke-width:2px,color:#000 style C fill:#e6ffe6,stroke:#333,stroke-width:2px,color:#000
The diagram shows:
- Sequential execution from start to finish
- Each task depends on the previous one
- Data flows through the context between tasks
Let’s walk through each component:
@cloaca.task(id="start_process")
def start_process(context):
# Task logic here
return context
Key points:
- Use the
@cloaca.task
decorator - Provide a unique
id
for each task - Tasks receive a
context
parameter - Always return the context (modified or unmodified)
@cloaca.task(id="process_data", dependencies=["start_process"])
def process_data(context):
# This task runs after start_process completes
return context
Dependencies ensure:
- Tasks run in the correct order
- Dependent tasks wait for their prerequisites
- Failed dependencies prevent downstream execution
# Set data in one task
context.set("process_id", "proc_001")
# Get data in another task
process_id = context.get("process_id")
The context provides:
- Persistent data storage across tasks
- Type-safe data access
- Automatic serialization/deserialization
def create_simple_workflow():
builder = cloaca.WorkflowBuilder("simple_workflow")
builder.description("A simple three-task workflow")
builder.add_task("start_process")
builder.add_task("process_data")
builder.add_task("finalize_process")
return builder.build()
The builder pattern:
- Creates workflows programmatically
- Validates task dependencies
- Generates executable workflow objects
# Register the workflow constructor
cloaca.register_workflow_constructor("simple_workflow", create_simple_workflow)
# Create runner and execute
runner = cloaca.DefaultRunner("sqlite:///tutorial_01.db")
result = runner.execute("simple_workflow", context)
Execution involves:
- Registering workflow constructors globally
- Creating a runner with database connection
- Executing by workflow name with initial context
Execute your workflow:
python first_workflow.py
You should see output like this:
=== Cloacina Python Workflow Tutorial ===
Executing workflow...
Starting the workflow...
Process proc_001 initialized
Processing data...
Processed 10 items for proc_001
Finalizing process...
Process proc_001 completed successfully
Workflow Status: Completed
Success! Workflow completed.
Final Summary: {'process_id': 'proc_001', 'start_time': '2025-01-07T10:00:00Z', 'items_processed': 10, 'status': 'completed'}
All context data: {'tutorial': '01', 'user': 'learner', 'process_id': 'proc_001', 'start_time': '2025-01-07T10:00:00Z', 'items_to_process': 10, 'processed_items': ['item_0_processed', 'item_1_processed', ...], 'processing_complete': True, 'final_summary': {...}}
Cleaning up...
Tutorial completed!
Tasks are Python functions decorated with @cloaca.task
:
- Each task has a unique ID
- Tasks can specify dependencies
- Tasks receive and return a context
- Tasks can be async or synchronous
Dependencies control execution order:
- Use
dependencies=["task_id"]
parameter - Multiple dependencies:
dependencies=["task1", "task2"]
- Creates a directed acyclic graph (DAG)
- Automatic parallelization when possible
Context manages data flow:
context.set(key, value)
stores datacontext.get(key)
retrieves data- Persisted throughout workflow execution
- Available in final results
WorkflowBuilder assembles workflows:
- Builder pattern for clean construction
- Automatic dependency validation
- Generates executable workflow objects
- Must be registered before execution
Try these modifications to deepen your understanding:
Add a new task that validates the processed data:
@cloaca.task(id="validate_results", dependencies=["process_data"])
def validate_results(context):
"""Validate that processing was successful."""
processed_items = context.get("processed_items")
# Validation logic
if len(processed_items) > 0:
context.set("validation_passed", True)
print("Validation passed")
else:
context.set("validation_passed", False)
print("Validation failed")
return context
Update the workflow to include validation before finalization:
# Update finalize_process dependencies
@cloaca.task(id="finalize_process", dependencies=["validate_results"])
def finalize_process(context):
# ... existing code ...
Create two parallel processing tasks:
@cloaca.task(id="process_odds", dependencies=["start_process"])
def process_odds(context):
"""Process odd-numbered items."""
items_count = context.get("items_to_process")
odds = [f"odd_item_{i}" for i in range(1, items_count, 2)]
context.set("odd_items", odds)
return context
@cloaca.task(id="process_evens", dependencies=["start_process"])
def process_evens(context):
"""Process even-numbered items."""
items_count = context.get("items_to_process")
evens = [f"even_item_{i}" for i in range(0, items_count, 2)]
context.set("even_items", evens)
return context
@cloaca.task(id="combine_results", dependencies=["process_odds", "process_evens"])
def combine_results(context):
"""Combine parallel processing results."""
odds = context.get("odd_items")
evens = context.get("even_items")
combined = odds + evens
context.set("combined_items", combined)
return context
Congratulations! You’ve created and executed your first Python workflow. In the next tutorial, we’ll explore:
- Advanced context handling and data types
- Working with complex data structures
- Context serialization and type safety
- Best practices for data flow
- Quick Start Guide - Simpler introduction
- API Reference - Complete API documentation
- Examples - More code examples
Reference ImplementationThis tutorial is based on the test patterns found intest_scenario_02_single_task_workflow_execution.py
andtest_scenario_08_multi_task_workflow_execution.py
from the Cloacina test suite.