Skip to main content
Cloacina Documentation
Toggle Dark/Light/Auto mode Toggle Dark/Light/Auto mode Toggle Dark/Light/Auto mode Back to homepage

02 - Context Handling

Context Handling

Welcome to the second tutorial in our Python Cloacina series! In this tutorial, you’ll learn how to effectively manage data flow between tasks using the Context system. We’ll explore different data types, serialization patterns, and best practices for passing complex data through your workflows.

Learning Objectives

  • Understand the Context data model
  • Work with different data types in Context
  • Implement data transformation pipelines
  • Handle context serialization and type safety
  • Apply best practices for data flow

Prerequisites

  • Completion of Tutorial 1
  • Basic understanding of Python data types
  • Familiarity with JSON serialization

Time Estimate

20-25 minutes

Understanding Context

The Context is Cloacina’s data container that flows through your workflow. It provides:

  • Persistent storage across task executions
  • Type-safe access to stored data
  • Automatic serialization for database persistence
  • Dictionary-like interface for Python developers

Context Data Flow

graph TD
    A[Task A<br/>Sets: user_id, timestamp] --> B[Task B<br/>Gets: user_id<br/>Sets: processed_data]
    B --> C[Task C<br/>Gets: user_id, processed_data<br/>Sets: final_result]

    style A fill:#e6e6ff,stroke:#333,stroke-width:2px,color:#000
    style B fill:#ffe6e6,stroke:#333,stroke-width:2px,color:#000
    style C fill:#e6ffe6,stroke:#333,stroke-width:2px,color:#000

Basic Context Operations

Let’s start with a simple example showing basic context operations:

import cloaca

@cloaca.task(id="context_basics")
def context_basics(context):
    """Demonstrate basic context operations."""

    # Set different data types
    context.set("string_value", "Hello, World!")
    context.set("integer_value", 42)
    context.set("float_value", 3.14159)
    context.set("boolean_value", True)
    context.set("list_value", [1, 2, 3, 4, 5])
    context.set("dict_value", {"name": "Alice", "age": 30})

    # Get values with defaults
    name = context.get("dict_value", {}).get("name", "Unknown")
    print(f"User name: {name}")

    return context

def create_basics_workflow():
    builder = cloaca.WorkflowBuilder("context_basics")
    builder.description("Basic context operations")
    builder.add_task("context_basics")
    return builder.build()

cloaca.register_workflow_constructor("context_basics", create_basics_workflow)

Data Transformation Pipeline

Now let’s build a more complex example that demonstrates data transformation through multiple tasks:

import cloaca
import json
from datetime import datetime

# Data transformation pipeline
@cloaca.task(id="extract_data")
def extract_data(context):
    """Extract raw data from a simulated source."""
    print("Extracting data...")

    # Simulate raw data extraction
    raw_data = {
        "users": [
            {"id": 1, "name": "Alice", "email": "alice@example.com", "score": 85},
            {"id": 2, "name": "Bob", "email": "bob@example.com", "score": 92},
            {"id": 3, "name": "Charlie", "email": "charlie@example.com", "score": 78},
            {"id": 4, "name": "Diana", "email": "diana@example.com", "score": 95}
        ],
        "extracted_at": datetime.now().isoformat(),
        "source": "user_database"
    }

    context.set("raw_data", raw_data)
    context.set("extraction_complete", True)

    print(f"Extracted {len(raw_data['users'])} users")
    return context

@cloaca.task(id="transform_data", dependencies=["extract_data"])
def transform_data(context):
    """Transform the raw data into a processed format."""
    print("Transforming data...")

    # Get raw data from previous task
    raw_data = context.get("raw_data")

    if not raw_data:
        raise ValueError("No raw data found in context")

    # Transform data
    transformed_users = []
    total_score = 0

    for user in raw_data["users"]:
        # Calculate grade based on score
        score = user["score"]
        if score >= 90:
            grade = "A"
        elif score >= 80:
            grade = "B"
        elif score >= 70:
            grade = "C"
        else:
            grade = "F"

        transformed_user = {
            "user_id": user["id"],
            "display_name": user["name"].upper(),
            "email_domain": user["email"].split("@")[1],
            "score": score,
            "grade": grade,
            "performance": "high" if score >= 85 else "standard"
        }

        transformed_users.append(transformed_user)
        total_score += score

    # Create summary statistics
    transformation_result = {
        "users": transformed_users,
        "summary": {
            "total_users": len(transformed_users),
            "average_score": total_score / len(transformed_users),
            "high_performers": len([u for u in transformed_users if u["performance"] == "high"]),
            "grade_distribution": {
                "A": len([u for u in transformed_users if u["grade"] == "A"]),
                "B": len([u for u in transformed_users if u["grade"] == "B"]),
                "C": len([u for u in transformed_users if u["grade"] == "C"]),
                "F": len([u for u in transformed_users if u["grade"] == "F"])
            }
        },
        "transformed_at": datetime.now().isoformat()
    }

    context.set("transformed_data", transformation_result)
    context.set("transformation_complete", True)

    print(f"Transformed {len(transformed_users)} users")
    print(f"Average score: {transformation_result['summary']['average_score']:.1f}")

    return context

@cloaca.task(id="validate_data", dependencies=["transform_data"])
def validate_data(context):
    """Validate the transformed data meets quality standards."""
    print("Validating data...")

    transformed_data = context.get("transformed_data")

    if not transformed_data:
        raise ValueError("No transformed data found in context")

    validation_results = {
        "total_records": len(transformed_data["users"]),
        "validation_checks": {},
        "errors": [],
        "warnings": []
    }

    # Validation checks
    users = transformed_data["users"]

    # Check 1: All users have required fields
    required_fields = ["user_id", "display_name", "email_domain", "score", "grade"]
    for user in users:
        for field in required_fields:
            if field not in user:
                validation_results["errors"].append(
                    f"User {user.get('user_id', 'unknown')} missing field: {field}"
                )

    # Check 2: Score ranges are valid
    for user in users:
        score = user.get("score", 0)
        if not (0 <= score <= 100):
            validation_results["errors"].append(
                f"User {user['user_id']} has invalid score: {score}"
            )

    # Check 3: Grade consistency
    for user in users:
        score = user.get("score", 0)
        grade = user.get("grade", "")
        expected_grade = "A" if score >= 90 else "B" if score >= 80 else "C" if score >= 70 else "F"
        if grade != expected_grade:
            validation_results["errors"].append(
                f"User {user['user_id']} grade mismatch: expected {expected_grade}, got {grade}"
            )

    # Check 4: Data quality warnings
    summary = transformed_data["summary"]
    if summary["average_score"] < 75:
        validation_results["warnings"].append("Average score is below 75")

    validation_results["validation_checks"] = {
        "required_fields": "passed" if not any("missing field" in error for error in validation_results["errors"]) else "failed",
        "score_ranges": "passed" if not any("invalid score" in error for error in validation_results["errors"]) else "failed",
        "grade_consistency": "passed" if not any("grade mismatch" in error for error in validation_results["errors"]) else "failed"
    }

    # Overall validation status
    validation_status = "passed" if not validation_results["errors"] else "failed"
    validation_results["status"] = validation_status

    context.set("validation_results", validation_results)
    context.set("validation_complete", True)
    context.set("data_valid", validation_status == "passed")

    print(f"Validation {validation_status}")
    if validation_results["errors"]:
        print(f"Errors found: {len(validation_results['errors'])}")
    if validation_results["warnings"]:
        print(f"Warnings: {len(validation_results['warnings'])}")

    return context

@cloaca.task(id="generate_report", dependencies=["validate_data"])
def generate_report(context):
    """Generate a final report combining all data."""
    print("Generating report...")

    # Gather all data from context
    raw_data = context.get("raw_data")
    transformed_data = context.get("transformed_data")
    validation_results = context.get("validation_results")

    # Create comprehensive report
    report = {
        "report_metadata": {
            "generated_at": datetime.now().isoformat(),
            "workflow_id": context.get("tutorial", "02"),
            "report_type": "user_data_processing"
        },
        "data_summary": {
            "source_info": {
                "source": raw_data["source"],
                "extracted_at": raw_data["extracted_at"],
                "total_records": len(raw_data["users"])
            },
            "processing_summary": transformed_data["summary"],
            "validation_summary": {
                "status": validation_results["status"],
                "checks_performed": len(validation_results["validation_checks"]),
                "errors": len(validation_results["errors"]),
                "warnings": len(validation_results["warnings"])
            }
        },
        "processed_users": transformed_data["users"],
        "quality_metrics": validation_results
    }

    context.set("final_report", report)
    context.set("report_complete", True)

    print(f"Report generated with {len(report['processed_users'])} user records")
    return context

# Create the workflow
def create_data_pipeline_workflow():
    """Build the complete data transformation pipeline."""
    builder = cloaca.WorkflowBuilder("data_pipeline")
    builder.description("Complete data extraction, transformation, validation, and reporting pipeline")

    # Add tasks in dependency order
    builder.add_task("extract_data")
    builder.add_task("transform_data")
    builder.add_task("validate_data")
    builder.add_task("generate_report")

    return builder.build()

# Register the workflow
cloaca.register_workflow_constructor("data_pipeline", create_data_pipeline_workflow)

# Main execution
if __name__ == "__main__":
    print("=== Context Handling Tutorial ===")

    # Create runner
    runner = cloaca.DefaultRunner("sqlite:///tutorial_02.db")

    # Create initial context with metadata
    context = cloaca.Context({
        "tutorial": "02",
        "pipeline_name": "user_data_processing",
        "started_by": "tutorial_user"
    })

    # Execute the workflow
    print("\nExecuting data pipeline workflow...")
    result = runner.execute("data_pipeline", context)

    # Display results
    print(f"\nWorkflow Status: {result.status}")

    if result.status == "Completed":
        print("Success! Data pipeline completed.")

        # Access specific results from context
        final_context = result.final_context
        report = final_context.get("final_report")

        if report:
            print("\n=== Pipeline Results ===")
            print(f"Total users processed: {report['data_summary']['source_info']['total_records']}")
            print(f"Average score: {report['data_summary']['processing_summary']['average_score']:.1f}")
            print(f"High performers: {report['data_summary']['processing_summary']['high_performers']}")
            print(f"Validation status: {report['data_summary']['validation_summary']['status']}")

            # Show grade distribution
            grade_dist = report['data_summary']['processing_summary']['grade_distribution']
            print(f"Grade distribution: A={grade_dist['A']}, B={grade_dist['B']}, C={grade_dist['C']}, F={grade_dist['F']}")

    else:
        print(f"Pipeline failed with status: {result.status}")

    # Cleanup
    print("\nCleaning up...")
    runner.shutdown()
    print("Tutorial completed!")

Understanding Context Patterns

1. Data Storage and Retrieval

# Store data
context.set("key", value)

# Retrieve data
value = context.get("key")

# Retrieve with default
value = context.get("key", default_value)

# Check if key exists
if "key" in context:
    value = context.get("key")

2. Complex Data Types

The context can store any JSON-serializable data:

# Lists and dictionaries
context.set("user_list", [{"id": 1, "name": "Alice"}, {"id": 2, "name": "Bob"}])
context.set("config", {"timeout": 30, "retries": 3, "debug": True})

# Nested structures
context.set("result", {
    "status": "success",
    "data": {
        "users": [1, 2, 3],
        "metadata": {"processed_at": "2025-01-07T10:00:00Z"}
    }
})

3. Data Transformation Patterns

@cloaca.task(id="transform_example", dependencies=["source_task"])
def transform_example(context):
    # Get input data
    input_data = context.get("input_data", [])

    # Transform data
    transformed = [
        {"id": item["id"], "value": item["value"] * 2}
        for item in input_data
        if item["value"] > 10
    ]

    # Store result
    context.set("transformed_data", transformed)
    return context

Working with Context Data

Data Pipeline Visualization

graph TD
    A[extract_data<br/>Sets: raw_data] --> B[transform_data<br/>Gets: raw_data<br/>Sets: transformed_data]
    B --> C[validate_data<br/>Gets: transformed_data<br/>Sets: validation_results]
    C --> D[generate_report<br/>Gets: all data<br/>Sets: final_report]

    style A fill:#e6e6ff,stroke:#333,stroke-width:2px,color:#000
    style B fill:#ffe6e6,stroke:#333,stroke-width:2px,color:#000
    style C fill:#ffffe6,stroke:#333,stroke-width:2px,color:#000
    style D fill:#e6ffe6,stroke:#333,stroke-width:2px,color:#000

Running the Pipeline

Execute the complete example:

python context_tutorial.py

Expected output:

=== Context Handling Tutorial ===

Executing data pipeline workflow...
Extracting data...
Extracted 4 users
Transforming data...
Transformed 4 users
Average score: 87.5
Validating data...
Validation passed
Generating report...
Report generated with 4 user records

Workflow Status: Completed
Success! Data pipeline completed.

=== Pipeline Results ===
Total users processed: 4
Average score: 87.5
High performers: 3
Validation status: passed
Grade distribution: A=2, B=2, C=0, F=0

Cleaning up...
Tutorial completed!

Best Practices

Organize your context data consistently:

# Good: Structured data organization
context.set("user_data", {
    "raw": raw_users,
    "processed": processed_users,
    "metadata": {"count": len(raw_users)}
})

# Avoid: Flat, unclear structure
context.set("users", raw_users)
context.set("processed", processed_users)
context.set("count", len(raw_users))

Always validate context data:

@cloaca.task(id="safe_task")
def safe_task(context):
    # Validate input exists
    data = context.get("input_data")
    if not data:
        raise ValueError("Missing required input_data")

    # Validate data structure
    if not isinstance(data, list):
        raise TypeError("input_data must be a list")

    # Process safely
    result = process_data(data)
    context.set("result", result)
    return context

Use type hints for clarity:

from typing import Dict, List, Any

@cloaca.task(id="typed_task")
def typed_task(context):
    """Process user data with clear type expectations."""
    # Get with type hints in comments
    user_data: List[Dict[str, Any]] = context.get("users", [])
    config: Dict[str, Any] = context.get("config", {})

    # Process with type awareness
    processed: List[Dict[str, Any]] = [
        {"id": user["id"], "score": float(user["score"])}
        for user in user_data
    ]

    context.set("processed_users", processed)
    return context

Optimize context usage:

# Good: Store processed data, not intermediate steps
@cloaca.task(id="efficient_task")
def efficient_task(context):
    data = context.get("raw_data")

    # Process data
    result = expensive_processing(data)

    # Store only the final result
    context.set("final_result", result)
    return context

# Avoid: Storing every intermediate step
# context.set("step1", step1_result)
# context.set("step2", step2_result)
# context.set("step3", step3_result)

Exercises

Exercise 1: Add Data Aggregation

Add a new task that creates aggregated statistics:

@cloaca.task(id="aggregate_data", dependencies=["transform_data"])
def aggregate_data(context):
    """Create aggregated statistics from transformed data."""
    transformed_data = context.get("transformed_data")
    users = transformed_data["users"]

    # Calculate aggregations
    aggregations = {
        "score_stats": {
            "min": min(u["score"] for u in users),
            "max": max(u["score"] for u in users),
            "avg": sum(u["score"] for u in users) / len(users)
        },
        "domain_distribution": {},
        "performance_breakdown": {
            "high": len([u for u in users if u["performance"] == "high"]),
            "standard": len([u for u in users if u["performance"] == "standard"])
        }
    }

    # Count email domains
    for user in users:
        domain = user["email_domain"]
        aggregations["domain_distribution"][domain] = aggregations["domain_distribution"].get(domain, 0) + 1

    context.set("aggregations", aggregations)
    return context

Exercise 2: Implement Data Filtering

Create a task that filters data based on criteria:

@cloaca.task(id="filter_data", dependencies=["validate_data"])
def filter_data(context):
    """Filter data based on validation results and criteria."""
    transformed_data = context.get("transformed_data")
    validation_results = context.get("validation_results")

    # Only proceed if validation passed
    if not validation_results.get("status") == "passed":
        context.set("filtered_data", {"users": [], "filter_applied": False})
        return context

    users = transformed_data["users"]

    # Apply filters
    high_performers = [u for u in users if u["performance"] == "high"]
    top_grades = [u for u in users if u["grade"] in ["A", "B"]]

    filtered_result = {
        "high_performers": high_performers,
        "top_grades": top_grades,
        "filter_summary": {
            "total_users": len(users),
            "high_performers": len(high_performers),
            "top_grades": len(top_grades)
        },
        "filter_applied": True
    }

    context.set("filtered_data", filtered_result)
    return context

What’s Next?

Excellent work! You now understand how to manage complex data flows through Cloacina workflows. In the next tutorial, we’ll explore:

  • Error handling and recovery strategies
  • Task failure scenarios and debugging
  • Retry mechanisms and error propagation
  • Building resilient workflows
Continue to Tutorial 03
Reference Implementation
This tutorial is based on patterns from test_scenario_09_context_propagation.py and data processing examples from the Cloacina test suite.