Skip to main content
Cloacina Documentation
Toggle Dark/Light/Auto mode Toggle Dark/Light/Auto mode Toggle Dark/Light/Auto mode Back to homepage

PipelineResult

PipelineResult

The PipelineResult class contains the outcome and metadata from a workflow execution. It provides information about execution status, timing, errors, and the final context state.

Properties

Basic Properties

  • status (str): The final execution status (e.g., "Completed", "Failed")
  • workflow_name (str): Name of the executed workflow
  • execution_id (str): Unique identifier for this execution
  • final_context (Context): The context after all tasks completed
  • start_time (datetime): When execution began
  • end_time (datetime): When execution finished
  • duration (timedelta): Total execution time

Status Information

import cloaca

# Execute workflow
# NOTE: `context` is not defined in this snippet; it is assumed to be a
# cloaca.Context created beforehand (e.g. cloaca.Context({...})).
runner = cloaca.DefaultRunner("sqlite:///:memory:")
result = runner.execute("my_workflow", context)

# Check execution status: print the basic properties of the returned result
print(f"Status: {result.status}")
print(f"Workflow: {result.workflow_name}")
print(f"Execution ID: {result.execution_id}")
print(f"Duration: {result.duration}")

Status Values

The status property is a string indicating the outcome of workflow execution:

  • "Completed": All tasks completed successfully
  • "Failed": One or more tasks failed

Status Checking

# Branch on the two documented status strings
if result.status == "Completed":
    print("Workflow completed successfully!")
    # Process successful result
    final_data = result.final_context.get("output_data")

elif result.status == "Failed":
    print("Workflow failed!")
    # Handle failure
    # NOTE(review): other examples on this page read the "error_message"
    # key rather than "error" — confirm which key is actually populated.
    error_info = result.final_context.get("error")

Context Access

Access the final context state after execution:

# Get final context data
final_context = result.final_context

# Extract specific results (only read here when the workflow completed)
if result.status == "Completed":
    output_data = final_context.get("processed_data")
    # 0 is passed as the default for a missing "records_processed" entry
    record_count = final_context.get("records_processed", 0)

    print(f"Processed {record_count} records")
    print(f"Output: {output_data}")

Error Information

When a workflow fails, error information may be available in the final context:

if result.status == "Failed":
    # Check for error information in context
    # NOTE(review): presumably these keys are written by the workflow's
    # tasks or by the runner — confirm which component populates them.
    error_message = result.final_context.get("error_message")
    failed_task = result.final_context.get("failed_task")

    # Each value is printed only when it is present and truthy
    if error_message:
        print(f"Error: {error_message}")
    if failed_task:
        print(f"Failed task: {failed_task}")

Timing Information

Analyze execution performance:

# Execution timing
print(f"Started: {result.start_time}")
print(f"Finished: {result.end_time}")
print(f"Duration: {result.duration}")

# Calculate performance metrics
# Compare against None instead of relying on truthiness: a zero-length
# timedelta is falsy, so a plain `if result.duration:` would skip the
# metrics for an execution whose measured duration is exactly zero.
if result.duration is not None:
    seconds = result.duration.total_seconds()
    print(f"Execution took {seconds:.2f} seconds")

    if seconds > 300:  # 5 minutes
        print("Long-running execution detected")

Task-Level Results

Access individual task results (if available) from the final context:

# Get task execution details; an empty mapping is used when the
# "task_results" entry is absent from the final context
task_results = result.final_context.get("task_results", {})

for task_id, task_result in task_results.items():
    # Look each field up once, then report it
    task_status = task_result.get('status')
    task_duration = task_result.get('duration')
    task_error = task_result.get('error')

    print(f"Task {task_id}:")
    print(f"  Status: {task_status}")
    print(f"  Duration: {task_duration}")

    # The error line is emitted only when an error value is present
    if task_error:
        print(f"  Error: {task_error}")

Complete Example

import cloaca
from datetime import datetime

@cloaca.task(id="process_data")
def process_data(context):
    """Double every input value and record the outcome on the context.

    Reads the ``input_data`` entry (an empty list is used when it is
    absent), then stores the doubled values under ``processed_data``,
    their count under ``records_processed``, and ``True`` under
    ``processing_complete``.

    Returns:
        The same context object that was passed in.
    """
    source_values = context.get("input_data", [])

    # Simulate processing: every value is multiplied by two
    doubled = [value * 2 for value in source_values]

    # Written in a fixed order: data first, then count, then the flag
    updates = (
        ("processed_data", doubled),
        ("records_processed", len(doubled)),
        ("processing_complete", True),
    )
    for key, value in updates:
        context.set(key, value)

    return context

# Create and execute workflow
def create_workflow():
    """Build and return the ``data_processing`` workflow.

    The workflow is given a description and the single task
    ``process_data`` before it is built.
    """
    workflow_builder = cloaca.WorkflowBuilder("data_processing")
    workflow_builder.description("Process input data")
    workflow_builder.add_task("process_data")

    workflow = workflow_builder.build()
    return workflow

# Execute and analyze result
runner = cloaca.DefaultRunner("sqlite:///:memory:")
# The constructor is registered under the same name ("data_processing")
# that is passed to runner.execute() below.
cloaca.register_workflow_constructor("data_processing", create_workflow)

# Seed the context with the list that process_data reads as "input_data"
input_context = cloaca.Context({"input_data": [1, 2, 3, 4, 5]})
result = runner.execute("data_processing", input_context)

# Comprehensive result analysis
def analyze_result(result):
    """Print a report for a workflow result and say whether it succeeded.

    The report always contains the workflow name, status and execution ID.
    A duration line is added when both ``start_time`` and ``end_time`` are
    truthy. For a ``"Completed"`` status the ``records_processed`` and
    ``processed_data`` context entries are printed; for a ``"Failed"``
    status the ``error_message`` entry is printed when it is truthy.

    Args:
        result: Object exposing ``workflow_name``, ``status``,
            ``execution_id``, ``start_time``, ``end_time`` and
            ``final_context`` (the latter must provide ``get``).

    Returns:
        ``True`` when ``result.status`` equals ``"Completed"``, else ``False``.
    """
    print("=== Workflow Execution Result ===")
    print(f"Workflow: {result.workflow_name}")
    print(f"Status: {result.status}")
    print(f"Execution ID: {result.execution_id}")

    # Duration is derived from the two timestamps, not from result.duration
    if result.start_time and result.end_time:
        elapsed_seconds = (result.end_time - result.start_time).total_seconds()
        print(f"Duration: {elapsed_seconds:.2f} seconds")

    succeeded = result.status == "Completed"

    if succeeded:
        print("\n=== Successful Execution ===")
        context = result.final_context
        record_total = context.get("records_processed", 0)
        output = context.get("processed_data")

        print(f"Records processed: {record_total}")
        print(f"Output data: {output}")

    elif result.status == "Failed":
        print("\n=== Failed Execution ===")
        failure_message = result.final_context.get("error_message")
        if failure_message:
            print(f"Error: {failure_message}")

    return succeeded

# Analyze the result
# analyze_result() prints the report and returns True only when the
# status is "Completed".
success = analyze_result(result)
print(f"\nExecution successful: {success}")

Best Practices

Result Validation

Always check the execution status before processing results:

def process_workflow_result(result):
    """Return the ``output_data`` entry of a completed workflow result.

    Args:
        result: Object exposing ``status`` and ``final_context`` (the
            latter must provide ``get``).

    Returns:
        The value stored under ``output_data`` in the final context, or
        ``None`` when that entry is absent.

    Raises:
        RuntimeError: If ``result.status`` is anything other than
            ``"Completed"``.
    """
    status = result.status

    if status == "Completed":
        # Safe to read results: the workflow finished successfully
        return result.final_context.get("output_data")

    raise RuntimeError(f"Workflow failed with status: {status}")

Error Handling

Implement comprehensive error handling:

def handle_workflow_result(result):
    """Dispatch a workflow result to the matching handler.

    A ``"Completed"`` status is passed to ``process_successful_result`` and
    a ``"Failed"`` status to ``handle_failed_result``; both helpers are
    expected to be defined elsewhere (they are not shown here). Any other
    status raises ``ValueError`` internally.

    Every exception raised along the way, including that ``ValueError``,
    is caught, reported with ``print`` and turned into a ``None`` return.

    Returns:
        Whatever the selected handler returns, or ``None`` after an error.
    """
    try:
        status = result.status
        if status == "Completed":
            outcome = process_successful_result(result)
        elif status == "Failed":
            outcome = handle_failed_result(result)
        else:
            raise ValueError(f"Unexpected status: {status}")
    except Exception as exc:
        print(f"Error handling result: {exc}")
        return None

    return outcome

Performance Monitoring

Track execution performance:

def monitor_performance(result):
    """Print a one-line assessment of how long a workflow took.

    Executions longer than 600 seconds are reported as slow, executions
    shorter than 1 second as very fast, and everything else as normal.
    Nothing is printed when ``result.duration`` is ``None``.

    Args:
        result: Object exposing ``duration`` as a ``timedelta`` or ``None``.

    Returns:
        None. The assessment is written with ``print``.
    """
    # Compare against None instead of relying on truthiness: a zero-length
    # timedelta is falsy, so a plain truth test would skip the report for
    # an execution whose measured duration is exactly zero.
    if result.duration is None:
        return

    seconds = result.duration.total_seconds()

    # Performance thresholds
    if seconds > 600:  # 10 minutes
        print(f"WARNING: Slow execution ({seconds:.1f}s)")
    elif seconds < 1:
        print(f"Very fast execution ({seconds:.3f}s)")
    else:
        print(f"Normal execution time ({seconds:.1f}s)")

See Also