Workflow
The Workflow class represents a built workflow that can be executed by a runner. Workflows are typically created using the WorkflowBuilder class.
A workflow defines the structure and execution order of tasks, including their dependencies and metadata. Once built, workflows are immutable and can be executed multiple times with different contexts.
Workflows are created using the WorkflowBuilder:
import cloaca
# Define tasks
@cloaca.task(id="task_a")
def task_a(context):
    """First task: record that step A finished in the shared context."""
    context.set("step", "A completed")
    return context
@cloaca.task(id="task_b", dependencies=["task_a"])
def task_b(context):
    """Second task: append B's completion note to the trail left by task_a."""
    trail = context.get("step")
    context.set("step", f"{trail}, B completed")
    return context
# Create workflow
# The builder accumulates metadata and task ids; build() assembles (and
# validates) the immutable Workflow object.
builder = cloaca.WorkflowBuilder("my_workflow")
builder.description("Example workflow with dependencies")
builder.add_task("task_a")
builder.add_task("task_b")
workflow = builder.build()
Attributes:
- name (str): Unique identifier for the workflow
- description (str): Human-readable description of the workflow's purpose
- tasks (list): List of tasks in the workflow
- dependencies (dict): Task dependency mapping
# Get workflow information
# A built workflow exposes read-only metadata: name, description, and tasks.
print(f"Workflow name: {workflow.name}")
print(f"Description: {workflow.description}")
print(f"Number of tasks: {len(workflow.tasks)}")
Workflows are executed using a DefaultRunner:
# Create runner
# An in-memory SQLite database keeps the example self-contained.
runner = cloaca.DefaultRunner("sqlite:///:memory:")

# Register workflow
# Registration takes a zero-argument constructor so the runner can obtain
# the workflow on demand; here a lambda closes over the built instance.
cloaca.register_workflow_constructor("my_workflow", lambda: workflow)

# Execute workflow
context = cloaca.Context({"input_data": "example"})
result = runner.execute("my_workflow", context)
print(f"Execution status: {result.status}")
print(f"Final context: {result.final_context.data}")
Workflows are automatically validated during the build process:
# build() raises if the workflow structure is invalid (e.g. cycles,
# missing dependencies), so validation failures surface here.
try:
    workflow = builder.build()
    print("Workflow is valid")
except Exception as e:
    print(f"Validation failed: {e}")
Common validation errors include:
- Circular dependencies between tasks
- Missing task dependencies
- Duplicate task IDs
- Invalid task references
import cloaca
@cloaca.task(id="extract")
def extract_data(context):
    """First ETL stage: pull raw figures from the (simulated) source."""
    # Simulate data extraction
    sample = dict(users=100, orders=250, revenue=15000)
    context.set("raw_data", sample)
    return context
@cloaca.task(id="transform", dependencies=["extract"])
def transform_data(context):
    """Second ETL stage: derive summary metrics from the extracted data."""
    source = context.get("raw_data")
    # Average order value, rounded to cents for reporting.
    aov = source["revenue"] / source["orders"]
    summary = dict(
        total_users=source["users"],
        total_orders=source["orders"],
        total_revenue=source["revenue"],
        avg_order_value=round(aov, 2),
    )
    context.set("metrics", summary)
    return context
@cloaca.task(id="load", dependencies=["transform"])
def load_data(context):
    """Final ETL stage: ship the computed metrics to the destination."""
    metrics = context.get("metrics")
    # Simulate loading to database/file
    print(f"Loading metrics: {metrics}")
    # Flag completion so downstream consumers can verify the load ran.
    context.set("load_complete", True)
    return context
# Create ETL workflow
def create_etl_workflow():
    """Build and validate the three-stage ETL pipeline workflow."""
    builder = cloaca.WorkflowBuilder("etl_pipeline")
    builder.description("Extract, Transform, Load data pipeline")
    # Registration order matches the dependency chain extract -> transform -> load.
    for task_id in ("extract", "transform", "load"):
        builder.add_task(task_id)
    return builder.build()

# Usage
etl_workflow = create_etl_workflow()
Workflows support parallel execution of independent tasks:
@cloaca.task(id="fetch_users")
def fetch_users(context):
    """Fetch user records; has no dependencies, so it can run alongside fetch_orders."""
    # Simulate API call
    users = [{"id": 1, "name": "Alice"}, {"id": 2, "name": "Bob"}]
    context.set("users", users)
    return context
@cloaca.task(id="fetch_orders")
def fetch_orders(context):
    """Fetch order records; has no dependencies, so it can run alongside fetch_users."""
    # Simulate API call
    orders = [{"id": 101, "user_id": 1}, {"id": 102, "user_id": 2}]
    context.set("orders", orders)
    return context
@cloaca.task(id="merge_data", dependencies=["fetch_users", "fetch_orders"])
def merge_data(context):
    """Join point: combine the independently fetched user and order lists."""
    combined = dict(
        users=context.get("users"),
        orders=context.get("orders"),
    )
    context.set("merged_data", combined)
    return context
# Create parallel workflow
def create_parallel_workflow():
    """Build a workflow whose two fetch tasks are independent and may run concurrently."""
    builder = cloaca.WorkflowBuilder("parallel_pipeline")
    builder.description("Fetch data in parallel, then merge")
    # merge_data declares both fetches as dependencies, so it runs last.
    for task_id in ("fetch_users", "fetch_orders", "merge_data"):
        builder.add_task(task_id)
    return builder.build()
- Single Responsibility: Each workflow should have a clear, focused purpose
- Idempotency: Design workflows to be safely re-runnable
- Error Handling: Include error handling and recovery strategies
- Documentation: Provide clear descriptions and task documentation
- Minimize Dependencies: Only declare necessary dependencies to maximize parallelism
- Context Size: Keep context data reasonably sized for better performance
- Task Granularity: Balance between too many small tasks and too few large tasks
Workflows handle task failures gracefully:
@cloaca.task(id="risky_task")
def risky_task(context):
    """Task that might fail.

    Records success/failure in the context instead of raising, so the
    workflow keeps running and a downstream task can react to the outcome.
    """
    try:
        # Potentially failing operation
        result = perform_risky_operation()
        context.set("success", True)
        context.set("result", result)
    except Exception as e:
        # Broad catch is deliberate in this example: capture the error
        # state rather than abort the whole workflow.
        context.set("success", False)
        context.set("error", str(e))
        # Workflow can continue with error state
    return context
@cloaca.task(id="handle_errors", dependencies=["risky_task"])
def handle_errors(context):
    """Inspect the outcome recorded by risky_task and react to failures."""
    if not context.get("success"):
        failure = context.get("error")
        print(f"Task failed with error: {failure}")
        # Implement recovery logic
    else:
        print("Task succeeded!")
    return context
- WorkflowBuilder - Build workflows
- Task Decorator - Define workflow tasks
- DefaultRunner - Execute workflows
- Context - Data flow between tasks