04 - Complex Workflows
Welcome to the fourth tutorial in our Python Cloacina series! In this tutorial, you’ll learn how to create sophisticated workflows with advanced dependency patterns. We’ll explore diamond patterns, fan-out/fan-in architectures, multi-level chains, and complex mixed patterns that enable powerful parallel processing and conditional execution.
- Design diamond dependency patterns for fork-join operations
- Implement fan-out patterns for parallel task execution
- Create fan-in patterns for result aggregation
- Build multi-level dependency chains
- Combine patterns for complex workflow architectures
- Optimize workflow performance through smart dependency design
- Completion of Tutorial 3
- Understanding of dependency management
- Familiarity with parallel processing concepts
30-35 minutes
Complex workflows often require sophisticated task orchestration patterns beyond simple linear execution. Let’s explore the key patterns:
Fork-Join Pattern:
- One task branches into multiple parallel tasks
- Multiple tasks converge back to a single task
- Enables parallel processing with synchronization
A
/ \
B C
\ /
D
One-to-Many Distribution:
- Single task triggers multiple parallel tasks
- Enables parallel processing of independent operations
- Often used for batch processing
A
/||\
B C D E
Many-to-One Aggregation:
- Multiple independent tasks feed into one task
- Enables result aggregation and synchronization
- Common for collecting distributed results
A B C D
\||/
E
Sequential Processing:
- Deep dependency chains for complex processing
- Each level depends on the previous
- Enables step-by-step data transformation
A → B → C → D → E
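All four shapes are expressed through the dependencies argument of the task decorator used throughout this tutorial. Here is a minimal sketch of the diamond shape (the task ids and empty bodies are illustrative only, not part of the tutorial workflow):

import cloaca

# Diamond: step_b and step_c fork from step_a; step_d joins them back together.
@cloaca.task(id="step_a")
def step_a(context):
    return context

@cloaca.task(id="step_b", dependencies=["step_a"])
def step_b(context):
    return context

@cloaca.task(id="step_c", dependencies=["step_a"])
def step_c(context):
    return context

@cloaca.task(id="step_d", dependencies=["step_b", "step_c"])  # join point
def step_d(context):
    return context

# Fan-out: several tasks each list only ["step_a"] as their dependency.
# Fan-in: one task lists many dependencies, e.g. ["step_b", "step_c", "step_d"].
# Chain: each task lists exactly the previous one: ["step_a"], then ["step_b"], and so on.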
Let’s implement each pattern with a practical example. We’ll build a comprehensive data processing system that demonstrates all these patterns.
import cloaca
import random
import time
from datetime import datetime
# Diamond Pattern: Fork-Join Processing
@cloaca.task(id="prepare_dataset")
def prepare_dataset(context):
"""Prepare initial dataset for parallel processing."""
print("Preparing dataset...")
# Generate sample dataset
dataset = {
"customers": [
{"id": i, "name": f"Customer_{i}", "segment": random.choice(["premium", "standard", "basic"]), "value": random.randint(100, 10000)}
for i in range(1, 101)
],
"prepared_at": datetime.now().isoformat(),
"total_records": 100
}
context.set("dataset", dataset)
context.set("preparation_complete", True)
print(f"Dataset prepared with {len(dataset['customers'])} customers")
return context
# Fork: Split into parallel processing branches
@cloaca.task(id="analyze_premium_customers", dependencies=["prepare_dataset"])
def analyze_premium_customers(context):
"""Analyze premium customer segment (parallel branch 1)."""
print("Analyzing premium customers...")
dataset = context.get("dataset")
customers = dataset["customers"]
# Filter premium customers
premium_customers = [c for c in customers if c["segment"] == "premium"]
# Analyze premium segment
analysis = {
"segment": "premium",
"count": len(premium_customers),
"total_value": sum(c["value"] for c in premium_customers),
"average_value": sum(c["value"] for c in premium_customers) / len(premium_customers) if premium_customers else 0,
"top_customers": sorted(premium_customers, key=lambda x: x["value"], reverse=True)[:5]
}
context.set("premium_analysis", analysis)
print(f"Premium analysis complete: {analysis['count']} customers, avg value: ${analysis['average_value']:.2f}")
return context
@cloaca.task(id="analyze_standard_customers", dependencies=["prepare_dataset"])
def analyze_standard_customers(context):
"""Analyze standard customer segment (parallel branch 2)."""
print("Analyzing standard customers...")
dataset = context.get("dataset")
customers = dataset["customers"]
# Filter standard customers
standard_customers = [c for c in customers if c["segment"] == "standard"]
# Analyze standard segment
analysis = {
"segment": "standard",
"count": len(standard_customers),
"total_value": sum(c["value"] for c in standard_customers),
"average_value": sum(c["value"] for c in standard_customers) / len(standard_customers) if standard_customers else 0,
"distribution": {"low": 0, "medium": 0, "high": 0}
}
# Value distribution analysis
for customer in standard_customers:
if customer["value"] < 1000:
analysis["distribution"]["low"] += 1
elif customer["value"] < 5000:
analysis["distribution"]["medium"] += 1
else:
analysis["distribution"]["high"] += 1
context.set("standard_analysis", analysis)
print(f"Standard analysis complete: {analysis['count']} customers")
return context
# Join: Combine results from parallel branches
@cloaca.task(id="generate_customer_report", dependencies=["analyze_premium_customers", "analyze_standard_customers"])
def generate_customer_report(context):
"""Generate comprehensive customer report (join point)."""
print("Generating customer report...")
premium_analysis = context.get("premium_analysis")
standard_analysis = context.get("standard_analysis")
dataset = context.get("dataset")
# Combine analyses
report = {
"report_type": "customer_segmentation",
"generated_at": datetime.now().isoformat(),
"dataset_info": {
"total_customers": dataset["total_records"],
"prepared_at": dataset["prepared_at"]
},
"segment_analysis": {
"premium": premium_analysis,
"standard": standard_analysis
},
"summary": {
"total_premium_value": premium_analysis["total_value"],
"total_standard_value": standard_analysis["total_value"],
"premium_percentage": (premium_analysis["count"] / dataset["total_records"]) * 100,
"standard_percentage": (standard_analysis["count"] / dataset["total_records"]) * 100
}
}
context.set("customer_report", report)
print(f"Customer report generated - Premium: {premium_analysis['count']}, Standard: {standard_analysis['count']}")
return context
# Fan-Out Pattern: Distribute processing across multiple parallel tasks
@cloaca.task(id="distribute_workload")
def distribute_workload(context):
"""Distribute workload for parallel processing."""
print("Distributing workload...")
dataset = context.get("dataset")
customers = dataset["customers"]
# Split customers into 4 processing batches
batch_size = len(customers) // 4
batches = {
"batch_1": customers[0:batch_size],
"batch_2": customers[batch_size:batch_size*2],
"batch_3": customers[batch_size*2:batch_size*3],
"batch_4": customers[batch_size*3:]
}
context.set("processing_batches", batches)
context.set("workload_distributed", True)
print(f"Workload distributed into 4 batches: {[len(batch) for batch in batches.values()]}")
return context
# Parallel processing tasks
@cloaca.task(id="process_batch_1", dependencies=["distribute_workload"])
def process_batch_1(context):
"""Process batch 1 in parallel."""
print("Processing batch 1...")
batches = context.get("processing_batches")
batch = batches["batch_1"]
# Simulate processing work
processed_customers = []
for customer in batch:
processed_customer = {
**customer,
"processed": True,
"score": customer["value"] * 0.1 + random.randint(1, 10),
"batch": 1
}
processed_customers.append(processed_customer)
result = {
"batch_id": 1,
"processed_count": len(processed_customers),
"customers": processed_customers,
"batch_total_value": sum(c["value"] for c in processed_customers),
"processing_time": datetime.now().isoformat()
}
context.set("batch_1_result", result)
print(f"Batch 1 complete: {len(processed_customers)} customers processed")
return context
@cloaca.task(id="process_batch_2", dependencies=["distribute_workload"])
def process_batch_2(context):
"""Process batch 2 in parallel."""
print("Processing batch 2...")
batches = context.get("processing_batches")
batch = batches["batch_2"]
processed_customers = []
for customer in batch:
processed_customer = {
**customer,
"processed": True,
"score": customer["value"] * 0.12 + random.randint(1, 10),
"batch": 2
}
processed_customers.append(processed_customer)
result = {
"batch_id": 2,
"processed_count": len(processed_customers),
"customers": processed_customers,
"batch_total_value": sum(c["value"] for c in processed_customers),
"processing_time": datetime.now().isoformat()
}
context.set("batch_2_result", result)
print(f"Batch 2 complete: {len(processed_customers)} customers processed")
return context
@cloaca.task(id="process_batch_3", dependencies=["distribute_workload"])
def process_batch_3(context):
"""Process batch 3 in parallel."""
print("Processing batch 3...")
batches = context.get("processing_batches")
batch = batches["batch_3"]
processed_customers = []
for customer in batch:
processed_customer = {
**customer,
"processed": True,
"score": customer["value"] * 0.08 + random.randint(1, 10),
"batch": 3
}
processed_customers.append(processed_customer)
result = {
"batch_id": 3,
"processed_count": len(processed_customers),
"customers": processed_customers,
"batch_total_value": sum(c["value"] for c in processed_customers),
"processing_time": datetime.now().isoformat()
}
context.set("batch_3_result", result)
print(f"Batch 3 complete: {len(processed_customers)} customers processed")
return context
@cloaca.task(id="process_batch_4", dependencies=["distribute_workload"])
def process_batch_4(context):
"""Process batch 4 in parallel."""
print("Processing batch 4...")
batches = context.get("processing_batches")
batch = batches["batch_4"]
processed_customers = []
for customer in batch:
processed_customer = {
**customer,
"processed": True,
"score": customer["value"] * 0.09 + random.randint(1, 10),
"batch": 4
}
processed_customers.append(processed_customer)
result = {
"batch_id": 4,
"processed_count": len(processed_customers),
"customers": processed_customers,
"batch_total_value": sum(c["value"] for c in processed_customers),
"processing_time": datetime.now().isoformat()
}
context.set("batch_4_result", result)
print(f"Batch 4 complete: {len(processed_customers)} customers processed")
return context
# Fan-In Pattern: Aggregate results from parallel processing
@cloaca.task(id="aggregate_batch_results", dependencies=["process_batch_1", "process_batch_2", "process_batch_3", "process_batch_4"])
def aggregate_batch_results(context):
"""Aggregate results from all parallel batch processing (fan-in)."""
print("Aggregating batch results...")
# Collect all batch results
batch_results = [
context.get("batch_1_result"),
context.get("batch_2_result"),
context.get("batch_3_result"),
context.get("batch_4_result")
]
# Aggregate all processed customers
all_processed_customers = []
total_customers_processed = 0
total_value_processed = 0
batch_statistics = []
for batch_result in batch_results:
all_processed_customers.extend(batch_result["customers"])
total_customers_processed += batch_result["processed_count"]
total_value_processed += batch_result["batch_total_value"]
batch_statistics.append({
"batch_id": batch_result["batch_id"],
"count": batch_result["processed_count"],
"value": batch_result["batch_total_value"],
"avg_value": batch_result["batch_total_value"] / batch_result["processed_count"] if batch_result["processed_count"] > 0 else 0
})
# Create aggregated result
aggregated_result = {
"aggregation_type": "batch_processing",
"total_customers_processed": total_customers_processed,
"total_value_processed": total_value_processed,
"average_customer_value": total_value_processed / total_customers_processed if total_customers_processed > 0 else 0,
"batch_statistics": batch_statistics,
"processed_customers": all_processed_customers,
"aggregated_at": datetime.now().isoformat()
}
context.set("aggregated_results", aggregated_result)
print(f"Aggregation complete: {total_customers_processed} customers, total value: ${total_value_processed:,.2f}")
return context
# Multi-Level Chain: Sequential processing with deep dependencies
@cloaca.task(id="level_1_basic_analysis", dependencies=["aggregate_batch_results"])
def level_1_basic_analysis(context):
"""Level 1: Basic statistical analysis."""
print("Level 1: Basic analysis...")
aggregated = context.get("aggregated_results")
customers = aggregated["processed_customers"]
# Basic statistics
values = [c["value"] for c in customers]
scores = [c["score"] for c in customers]
basic_stats = {
"level": 1,
"customer_count": len(customers),
"value_stats": {
"min": min(values),
"max": max(values),
"avg": sum(values) / len(values),
"total": sum(values)
},
"score_stats": {
"min": min(scores),
"max": max(scores),
"avg": sum(scores) / len(scores)
}
}
context.set("level_1_analysis", basic_stats)
print(f"Level 1 complete: {basic_stats['customer_count']} customers analyzed")
return context
@cloaca.task(id="level_2_segment_analysis", dependencies=["level_1_basic_analysis"])
def level_2_segment_analysis(context):
"""Level 2: Segment-based analysis."""
print("Level 2: Segment analysis...")
aggregated = context.get("aggregated_results")
customers = aggregated["processed_customers"]
# Group by segment
segments = {}
for customer in customers:
segment = customer["segment"]
if segment not in segments:
segments[segment] = []
segments[segment].append(customer)
# Analyze each segment
segment_analysis = {}
for segment_name, segment_customers in segments.items():
values = [c["value"] for c in segment_customers]
scores = [c["score"] for c in segment_customers]
segment_analysis[segment_name] = {
"count": len(segment_customers),
"total_value": sum(values),
"avg_value": sum(values) / len(values),
"avg_score": sum(scores) / len(scores),
"top_customer": max(segment_customers, key=lambda x: x["value"])
}
level_2_result = {
"level": 2,
"segments": segment_analysis,
"segment_count": len(segments)
}
context.set("level_2_analysis", level_2_result)
print(f"Level 2 complete: {len(segments)} segments analyzed")
return context
@cloaca.task(id="level_3_advanced_metrics", dependencies=["level_2_segment_analysis"])
def level_3_advanced_metrics(context):
"""Level 3: Advanced metrics and correlations."""
print("Level 3: Advanced metrics...")
aggregated = context.get("aggregated_results")
level_2 = context.get("level_2_analysis")
customers = aggregated["processed_customers"]
# Advanced calculations
values = [c["value"] for c in customers]
scores = [c["score"] for c in customers]
# Calculate percentiles
sorted_values = sorted(values)
n = len(sorted_values)
advanced_metrics = {
"level": 3,
"percentiles": {
"p25": sorted_values[n//4],
"p50": sorted_values[n//2],
"p75": sorted_values[3*n//4],
"p90": sorted_values[9*n//10],
"p95": sorted_values[19*n//20]
},
"segment_comparison": {},
"outlier_analysis": {
"high_value_customers": [c for c in customers if c["value"] > sorted_values[19*n//20]],
"low_value_customers": [c for c in customers if c["value"] < sorted_values[n//20]]
}
}
# Compare segments
for segment_name, segment_data in level_2["segments"].items():
advanced_metrics["segment_comparison"][segment_name] = {
"value_efficiency": segment_data["avg_score"] / segment_data["avg_value"] if segment_data["avg_value"] > 0 else 0,
"relative_performance": segment_data["avg_value"] / (sum(values) / len(values))
}
context.set("level_3_analysis", advanced_metrics)
print(f"Level 3 complete: Advanced metrics calculated")
return context
@cloaca.task(id="level_4_predictive_insights", dependencies=["level_3_advanced_metrics"])
def level_4_predictive_insights(context):
"""Level 4: Predictive insights and recommendations."""
print("Level 4: Predictive insights...")
level_1 = context.get("level_1_analysis")
level_2 = context.get("level_2_analysis")
level_3 = context.get("level_3_analysis")
# Generate insights based on analysis
insights = {
"level": 4,
"key_insights": [],
"recommendations": [],
"risk_factors": [],
"opportunities": []
}
# Analyze trends and generate insights
avg_value = level_1["value_stats"]["avg"]
for segment_name, segment_data in level_2["segments"].items():
segment_avg = segment_data["avg_value"]
if segment_avg > avg_value * 1.5:
insights["key_insights"].append(f"{segment_name} segment shows high value potential (${segment_avg:.2f} vs ${avg_value:.2f} average)")
insights["recommendations"].append(f"Focus retention efforts on {segment_name} customers")
if segment_data["count"] < 10:
insights["risk_factors"].append(f"{segment_name} segment has low customer count ({segment_data['count']})")
efficiency = level_3["segment_comparison"][segment_name]["value_efficiency"]
if efficiency > 0.02:
insights["opportunities"].append(f"{segment_name} shows high score-to-value efficiency ({efficiency:.4f})")
# Overall recommendations
p95_value = level_3["percentiles"]["p95"]
high_value_count = len(level_3["outlier_analysis"]["high_value_customers"])
insights["recommendations"].extend([
f"Target customers with values above ${p95_value} for premium services",
f"Investigate {high_value_count} high-value customers for expansion opportunities"
])
context.set("level_4_analysis", insights)
print(f"Level 4 complete: {len(insights['key_insights'])} insights generated")
return context
@cloaca.task(id="final_comprehensive_report", dependencies=["level_4_predictive_insights"])
def final_comprehensive_report(context):
"""Level 5: Final comprehensive report combining all analyses."""
print("Generating final comprehensive report...")
# Collect all analysis levels
customer_report = context.get("customer_report")
aggregated_results = context.get("aggregated_results")
level_1 = context.get("level_1_analysis")
level_2 = context.get("level_2_analysis")
level_3 = context.get("level_3_analysis")
level_4 = context.get("level_4_analysis")
# Create comprehensive report
comprehensive_report = {
"report_title": "Comprehensive Customer Analysis Report",
"generated_at": datetime.now().isoformat(),
"workflow_summary": {
"total_tasks_executed": 15,
"patterns_demonstrated": ["diamond", "fan-out", "fan-in", "multi-level-chain"],
"parallel_processing_batches": 4
},
"executive_summary": {
"total_customers_analyzed": level_1["customer_count"],
"total_customer_value": f"${level_1['value_stats']['total']:,.2f}",
"average_customer_value": f"${level_1['value_stats']['avg']:,.2f}",
"segments_identified": level_2["segment_count"],
"key_insights_count": len(level_4["key_insights"]),
"recommendations_count": len(level_4["recommendations"])
},
"detailed_analysis": {
"diamond_pattern_results": customer_report,
"fanout_processing_results": aggregated_results,
"sequential_analysis": {
"level_1_basic": level_1,
"level_2_segments": level_2,
"level_3_advanced": level_3,
"level_4_insights": level_4
}
},
"actionable_recommendations": level_4["recommendations"],
"risk_assessment": level_4["risk_factors"],
"growth_opportunities": level_4["opportunities"]
}
context.set("comprehensive_report", comprehensive_report)
print("Final comprehensive report generated!")
print(f"Analysis complete: {comprehensive_report['executive_summary']['total_customers_analyzed']} customers")
print(f"Total value: {comprehensive_report['executive_summary']['total_customer_value']}")
print(f"Insights generated: {comprehensive_report['executive_summary']['key_insights_count']}")
return context
Now let’s assemble all these patterns into a comprehensive workflow:
def create_complex_workflow():
"""Build the complex workflow with multiple dependency patterns."""
builder = cloaca.WorkflowBuilder("complex_customer_analysis")
builder.description("Complex workflow demonstrating diamond, fan-out, fan-in, and multi-level patterns")
# Diamond pattern tasks
builder.add_task("prepare_dataset")
builder.add_task("analyze_premium_customers")
builder.add_task("analyze_standard_customers")
builder.add_task("generate_customer_report")
# Fan-out pattern tasks
builder.add_task("distribute_workload")
builder.add_task("process_batch_1")
builder.add_task("process_batch_2")
builder.add_task("process_batch_3")
builder.add_task("process_batch_4")
# Fan-in pattern task
builder.add_task("aggregate_batch_results")
# Multi-level chain tasks
builder.add_task("level_1_basic_analysis")
builder.add_task("level_2_segment_analysis")
builder.add_task("level_3_advanced_metrics")
builder.add_task("level_4_predictive_insights")
builder.add_task("final_comprehensive_report")
return builder.build()
# Register the workflow
cloaca.register_workflow_constructor("complex_customer_analysis", create_complex_workflow)
# Main execution
if __name__ == "__main__":
print("=== Complex Workflows Tutorial ===")
# Create runner
runner = cloaca.DefaultRunner("sqlite:///tutorial_04.db")
# Create initial context
context = cloaca.Context({
"tutorial": "04",
"analysis_type": "comprehensive_customer_analysis",
"request_id": f"req_{int(time.time())}"
})
# Execute the complex workflow
print("\nExecuting complex workflow...")
print("This will demonstrate:")
print("- Diamond pattern (fork-join)")
print("- Fan-out pattern (parallel batch processing)")
print("- Fan-in pattern (result aggregation)")
print("- Multi-level chain (sequential analysis)")
start_time = time.time()
result = runner.execute("complex_customer_analysis", context)
execution_time = time.time() - start_time
# Display results
print(f"\nWorkflow Status: {result.status}")
print(f"Execution Time: {execution_time:.2f} seconds")
if result.status == "Completed":
print("Success! Complex workflow completed.")
# Show comprehensive results
final_context = result.final_context
comprehensive_report = final_context.get("comprehensive_report")
if comprehensive_report:
print("\n=== Workflow Results ===")
summary = comprehensive_report["executive_summary"]
print(f"Customers Analyzed: {summary['total_customers_analyzed']}")
print(f"Total Customer Value: {summary['total_customer_value']}")
print(f"Average Customer Value: {summary['average_customer_value']}")
print(f"Segments Identified: {summary['segments_identified']}")
print(f"\n=== Insights Generated ===")
print(f"Key Insights: {summary['key_insights_count']}")
print(f"Recommendations: {summary['recommendations_count']}")
# Show some specific insights
if comprehensive_report["actionable_recommendations"]:
print("\nTop Recommendations:")
for i, rec in enumerate(comprehensive_report["actionable_recommendations"][:3], 1):
print(f" {i}. {rec}")
else:
print(f"Workflow failed with status: {result.status}")
# Cleanup
print("\nCleaning up...")
runner.shutdown()
print("Tutorial completed!")
graph TD
    A[prepare_dataset] --> B[analyze_premium_customers]
    A --> C[analyze_standard_customers]
    A --> D[distribute_workload]
    B --> E[generate_customer_report]
    C --> E
    D --> F[process_batch_1]
    D --> G[process_batch_2]
    D --> H[process_batch_3]
    D --> I[process_batch_4]
    F --> J[aggregate_batch_results]
    G --> J
    H --> J
    I --> J
    J --> K[level_1_basic_analysis]
    K --> L[level_2_segment_analysis]
    L --> M[level_3_advanced_metrics]
    M --> N[level_4_predictive_insights]
    N --> O[final_comprehensive_report]

    style A fill:#e6e6ff,stroke:#333,stroke-width:2px,color:#000
    style E fill:#e6ffe6,stroke:#333,stroke-width:2px,color:#000
    style J fill:#ffe6e6,stroke:#333,stroke-width:2px,color:#000
    style O fill:#ffffe6,stroke:#333,stroke-width:2px,color:#000

    subgraph "Diamond Pattern"
        B
        C
    end

    subgraph "Fan-Out Pattern"
        F
        G
        H
        I
    end

    subgraph "Multi-Level Chain"
        K
        L
        M
        N
    end
The diagram shows how all patterns work together:
- Diamond: Premium/standard analysis branches and rejoins
- Fan-Out: Workload distribution to 4 parallel batch processors
- Fan-In: Batch results aggregation
- Chain: Sequential analysis levels building on each other
Execute the complete example:
python complex_workflows.py
Expected output showing the progression through each pattern:
=== Complex Workflows Tutorial ===
Executing complex workflow...
This will demonstrate:
- Diamond pattern (fork-join)
- Fan-out pattern (parallel batch processing)
- Fan-in pattern (result aggregation)
- Multi-level chain (sequential analysis)
Preparing dataset...
Dataset prepared with 100 customers
Analyzing premium customers...
Analyzing standard customers...
Distributing workload...
Processing batch 1...
Processing batch 2...
Processing batch 3...
Processing batch 4...
...
Workflow Status: Completed
Execution Time: 2.34 seconds
=== Workflow Results ===
Customers Analyzed: 100
Total Customer Value: $487,423.00
Average Customer Value: $4,874.23
Segments Identified: 3
=== Insights Generated ===
Key Insights: 4
Recommendations: 6
Top Recommendations:
1. Focus retention efforts on premium customers
2. Target customers with values above $9,456 for premium services
3. Investigate 5 high-value customers for expansion opportunities
Choose the right pattern for your use case:
- Diamond: When you need parallel processing with a sync point
- Fan-Out: For distributing independent work across parallel tasks
- Fan-In: When collecting and combining results from multiple sources
- Chains: For sequential processing where each step builds on the previous
Optimize for parallel execution:
# Good: Independent parallel tasks
@cloaca.task(id="task_a", dependencies=["common_input"])
def task_a(context):
# Independent processing
pass
@cloaca.task(id="task_b", dependencies=["common_input"])
def task_b(context):
# Independent processing
pass
# Avoid: Unnecessary sequential dependencies
@cloaca.task(id="task_b", dependencies=["task_a"]) # Unnecessary if B doesn't need A's output
def task_b(context):
pass
Manage context data efficiently in complex workflows:
@cloaca.task(id="data_manager")
def data_manager(context):
# Clear intermediate data to save memory
context.delete("large_intermediate_data")
# Use structured data organization
context.set("results", {
"processing_stage": "batch_complete",
"metadata": {"processed_at": datetime.now().isoformat()},
"summary": summary_data # Keep only what's needed
})
return context
Handle failures in complex patterns:
@cloaca.task(id="robust_aggregator", dependencies=["batch_1", "batch_2", "batch_3"])
def robust_aggregator(context):
"""Aggregate results with partial failure handling."""
results = []
failed_batches = []
for batch_id in ["batch_1", "batch_2", "batch_3"]:
batch_result = context.get(f"{batch_id}_result")
if batch_result:
results.append(batch_result)
else:
failed_batches.append(batch_id)
if not results:
raise ValueError("All batch processing failed")
if failed_batches:
print(f"Warning: {len(failed_batches)} batches failed: {failed_batches}")
    # Process available results with a user-defined helper (a sketch follows this example)
    aggregated = aggregate_partial_results(results)
context.set("aggregated_results", aggregated)
context.set("partial_failure", len(failed_batches) > 0)
return context
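The aggregate_partial_results helper is not defined in the tutorial code. One possible sketch, assuming each batch result has the shape produced by the batch tasks earlier in this tutorial:

def aggregate_partial_results(results):
    """Hypothetical helper: combine whichever batch results are available."""
    all_customers = []
    total_value = 0
    for batch_result in results:
        all_customers.extend(batch_result["customers"])
        total_value += batch_result["batch_total_value"]
    return {
        "processed_customers": all_customers,
        "total_customers_processed": len(all_customers),
        "total_value_processed": total_value,
        "batches_included": [r["batch_id"] for r in results]
    }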
Extend the diamond pattern with conditional logic:
@cloaca.task(id="conditional_processor", dependencies=["prepare_dataset"])
def conditional_processor(context):
"""Add conditional processing based on data characteristics."""
dataset = context.get("dataset")
total_value = sum(c["value"] for c in dataset["customers"])
# Set processing strategy based on data
if total_value > 500000:
context.set("processing_strategy", "high_value")
context.set("use_advanced_analytics", True)
elif total_value > 200000:
context.set("processing_strategy", "medium_value")
context.set("use_advanced_analytics", False)
else:
context.set("processing_strategy", "basic")
context.set("use_advanced_analytics", False)
return context
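Downstream tasks can then branch on the flags that conditional_processor stores in the context. The task below is a hypothetical consumer, shown only to illustrate the idea; it is not part of the tutorial workflow:

@cloaca.task(id="strategy_aware_analysis", dependencies=["conditional_processor"])
def strategy_aware_analysis(context):
    """Choose analysis depth based on flags set by conditional_processor."""
    strategy = context.get("processing_strategy")
    use_advanced = context.get("use_advanced_analytics")
    if use_advanced:
        context.set("analysis_depth", "full")  # deeper analysis path for high-value datasets
    else:
        context.set("analysis_depth", "summary")
    print(f"Applying {strategy} strategy ({'advanced' if use_advanced else 'basic'} analytics)")
    return context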
Create a fan-out pattern that adjusts the number of parallel tasks based on data size:
@cloaca.task(id="dynamic_distribution")
def dynamic_distribution(context):
"""Dynamically determine number of processing batches."""
dataset = context.get("dataset")
customer_count = len(dataset["customers"])
# Determine optimal batch count
if customer_count > 1000:
batch_count = 8
elif customer_count > 500:
batch_count = 4
else:
batch_count = 2
# Create dynamic batches
batch_size = customer_count // batch_count
batches = {}
for i in range(batch_count):
start_idx = i * batch_size
end_idx = start_idx + batch_size if i < batch_count - 1 else customer_count
batches[f"batch_{i+1}"] = dataset["customers"][start_idx:end_idx]
context.set("dynamic_batches", batches)
context.set("batch_count", batch_count)
return context
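Because task definitions are static, one simple way to consume a variable number of batches is to let a single downstream task iterate over whatever dynamic_distribution produced. A minimal sketch, assuming the context keys set above:

@cloaca.task(id="process_dynamic_batches", dependencies=["dynamic_distribution"])
def process_dynamic_batches(context):
    """Process however many batches dynamic_distribution created."""
    batches = context.get("dynamic_batches")
    batch_summaries = []
    for batch_name, customers in batches.items():
        batch_summaries.append({
            "batch": batch_name,
            "count": len(customers),
            "total_value": sum(c["value"] for c in customers)
        })
    context.set("dynamic_batch_summaries", batch_summaries)
    print(f"Processed {len(batch_summaries)} dynamic batches")
    return context

This approach trades parallelism for flexibility: the batches are processed inside one task rather than in parallel. If you need true parallel execution, define a fixed maximum number of batch tasks and let tasks whose batch is empty pass through without doing any work.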
Excellent work! You now understand how to build sophisticated workflows with complex dependency patterns. In the next tutorial, we’ll explore:
- Multi-tenant workflow deployment
- Schema-based data isolation
- Tenant-specific configuration management
- Scaling workflows across multiple tenants
- Examples: Parallel Processing - More parallel workflow patterns
- How-to: Performance Optimization - Workflow optimization techniques
- API Reference: WorkflowBuilder - Advanced workflow configuration
Reference Implementation: This tutorial demonstrates patterns from test_scenario_13_complex_dependency_chains.py and parallel processing concepts from the Cloacina test suite.