PipelineResult
The PipelineResult class contains the outcome and metadata from a workflow execution. It provides information about execution status, timing, errors, and the final context state.
Attributes: status (str) — the final execution status (e.g., "Completed", "Failed"); workflow_name (str) — name of the executed workflow; execution_id (str) — unique identifier for this execution; final_context (Context) — the context after all tasks completed; start_time (datetime) — when execution began; end_time (datetime) — when execution finished; duration (timedelta) — total execution time.
import cloaca
# Execute workflow
# NOTE(review): `context` must be a cloaca.Context created before this call —
# it is not defined in this snippet.
runner = cloaca.DefaultRunner("sqlite:///:memory:")
result = runner.execute("my_workflow", context)
# Check execution status
# `result` is a PipelineResult exposing status, workflow_name,
# execution_id, final_context, start_time, end_time, and duration.
print(f"Status: {result.status}")
print(f"Workflow: {result.workflow_name}")
print(f"Execution ID: {result.execution_id}")
print(f"Duration: {result.duration}")
The status property is a string indicating the outcome of workflow execution:
"Completed": All tasks completed successfully"Failed": One or more tasks failed
if result.status == "Completed":
print("Workflow completed successfully!")
# Process successful result
final_data = result.final_context.get("output_data")
elif result.status == "Failed":
print("Workflow failed!")
# Handle failure
error_info = result.final_context.get("error")
Access the final context state after execution:
# Get final context data
# NOTE(review): indentation was lost in the original snippet; restored here.
final_context = result.final_context
# Extract specific results only when the workflow actually completed
if result.status == "Completed":
    output_data = final_context.get("processed_data")
    # Default the count to 0 so the print below never sees None
    record_count = final_context.get("records_processed", 0)
    print(f"Processed {record_count} records")
    print(f"Output: {output_data}")
When workflows fail, error information is available:
if result.status == "Failed":
# Check for error information in context
error_message = result.final_context.get("error_message")
failed_task = result.final_context.get("failed_task")
if error_message:
print(f"Error: {error_message}")
if failed_task:
print(f"Failed task: {failed_task}")
Analyze execution performance:
# Execution timing
# NOTE(review): indentation was lost in the original snippet; restored here.
print(f"Started: {result.start_time}")
print(f"Finished: {result.end_time}")
print(f"Duration: {result.duration}")
# Calculate performance metrics; guard against a missing duration
if result.duration:
    seconds = result.duration.total_seconds()
    print(f"Execution took {seconds:.2f} seconds")
    if seconds > 300:  # 5 minutes
        print("Long-running execution detected")
Access individual task results (if available):
# Get task execution details
# NOTE(review): indentation was lost in the original snippet; restored here.
# Defaults to {} so the loop is a no-op when no task results were recorded.
task_results = result.final_context.get("task_results", {})
for task_id, task_result in task_results.items():
    print(f"Task {task_id}:")
    print(f" Status: {task_result.get('status')}")
    print(f" Duration: {task_result.get('duration')}")
    # 'error' is optional; only print it when present
    if task_result.get('error'):
        print(f" Error: {task_result['error']}")
import cloaca
from datetime import datetime
@cloaca.task(id="process_data")
def process_data(context):
"""Example task that processes data."""
input_data = context.get("input_data", [])
# Simulate processing
processed = [x * 2 for x in input_data]
context.set("processed_data", processed)
context.set("records_processed", len(processed))
context.set("processing_complete", True)
return context
# Create and execute workflow
def create_workflow():
    """Build and return the "data_processing" workflow.

    The workflow contains the single task registered under the id
    "process_data".
    """
    # NOTE(review): indentation was lost in the original snippet; restored here.
    builder = cloaca.WorkflowBuilder("data_processing")
    builder.description("Process input data")
    builder.add_task("process_data")
    return builder.build()
# Execute and analyze result
# In-memory SQLite keeps this example self-contained.
runner = cloaca.DefaultRunner("sqlite:///:memory:")
# Register the constructor under the same name passed to execute() below.
cloaca.register_workflow_constructor("data_processing", create_workflow)
input_context = cloaca.Context({"input_data": [1, 2, 3, 4, 5]})
result = runner.execute("data_processing", input_context)
# Comprehensive result analysis
def analyze_result(result):
    """Analyze workflow execution result.

    Prints a header with workflow name, status, and execution id; when
    both start_time and end_time are set, also prints the duration.
    On "Completed" it reports "records_processed" and "processed_data"
    from the final context; on "Failed" it reports "error_message" when
    present. Returns True iff the status is "Completed".
    """
    # NOTE(review): indentation was lost in the original snippet; restored here.
    print("=== Workflow Execution Result ===")
    print(f"Workflow: {result.workflow_name}")
    print(f"Status: {result.status}")
    print(f"Execution ID: {result.execution_id}")
    if result.start_time and result.end_time:
        duration = result.end_time - result.start_time
        print(f"Duration: {duration.total_seconds():.2f} seconds")
    if result.status == "Completed":
        print("\n=== Successful Execution ===")
        records = result.final_context.get("records_processed", 0)
        processed_data = result.final_context.get("processed_data")
        print(f"Records processed: {records}")
        print(f"Output data: {processed_data}")
    elif result.status == "Failed":
        print("\n=== Failed Execution ===")
        error = result.final_context.get("error_message")
        if error:
            print(f"Error: {error}")
    return result.status == "Completed"
# Analyze the result
# analyze_result returns True when result.status == "Completed".
success = analyze_result(result)
print(f"\nExecution successful: {success}")
Always check the execution status before processing results:
def process_workflow_result(result):
    """Safely process workflow result.

    Raises:
        RuntimeError: if the status is anything other than "Completed".

    Returns:
        The "output_data" value from the final context (None if absent).
    """
    # NOTE(review): indentation was lost in the original snippet; restored here.
    if result.status != "Completed":
        raise RuntimeError(f"Workflow failed with status: {result.status}")
    # Safe to process successful result
    return result.final_context.get("output_data")
Implement comprehensive error handling:
def handle_workflow_result(result):
    """Handle workflow result with proper error handling.

    Dispatches to process_successful_result / handle_failed_result
    (defined elsewhere) based on status. Any exception raised while
    handling — including the ValueError for an unexpected status — is
    reported and None is returned.
    """
    # NOTE(review): indentation was lost in the original snippet; restored here.
    try:
        if result.status == "Completed":
            return process_successful_result(result)
        elif result.status == "Failed":
            return handle_failed_result(result)
        else:
            raise ValueError(f"Unexpected status: {result.status}")
    except Exception as e:
        print(f"Error handling result: {e}")
        return None
Track execution performance:
def monitor_performance(result):
    """Monitor workflow performance.

    Prints a message bucketed by execution duration (slow above 10
    minutes, very fast below 1 second, normal otherwise). Does nothing
    when result.duration is falsy (None or zero). Returns None.
    """
    # NOTE(review): indentation was lost in the original snippet; restored here.
    if result.duration:
        seconds = result.duration.total_seconds()
        # Performance thresholds
        if seconds > 600:  # 10 minutes
            print(f"WARNING: Slow execution ({seconds:.1f}s)")
        elif seconds < 1:
            print(f"Very fast execution ({seconds:.3f}s)")
        else:
            print(f"Normal execution time ({seconds:.1f}s)")
- DefaultRunner - Execute workflows and get results
- Context - Access final context data
- Workflow - Workflows that produce results