Exceptions
Cloaca provides a hierarchy of exception classes for handling different types of errors that can occur during workflow definition, execution, and management.
CloacaException (base)
├── WorkflowError
│ ├── WorkflowExecutionError
│ └── WorkflowTimeoutError
├── TaskError
│ ├── TaskValidationError
│ ├── TaskExecutionError
│ └── TaskTimeoutError
├── ContextError
├── ConfigurationError
└── DatabaseError
├── ConnectionError
└── MigrationError
Base exception class for all Cloaca-related errors.
import cloaca
try:
# Cloaca operation
result = runner.execute("workflow", context)
except cloaca.CloacaException as e:
print(f"Cloaca error: {e}")
Base class for workflow-related errors.
try:
workflow = builder.build()
except cloaca.WorkflowError as e:
print(f"Workflow error: {e}")
Raised when workflow execution fails unexpectedly.
try:
result = runner.execute("my_workflow", context)
except cloaca.WorkflowExecutionError as e:
print(f"Execution failed: {e}")
print(f"Workflow: {e.workflow_name}")
print(f"Execution ID: {e.execution_id}")
Raised when workflow execution exceeds the timeout limit.
import cloaca
# Configure with timeout
config = cloaca.DefaultRunnerConfig(task_timeout_seconds=30)
runner = cloaca.DefaultRunner("sqlite:///:memory:", config)
try:
result = runner.execute("long_workflow", context)
except cloaca.WorkflowTimeoutError as e:
print(f"Workflow timed out after {e.timeout_seconds} seconds")
print(f"Partial result available: {e.partial_result}")
Base class for task-related errors.
@cloaca.task(id="risky_task")
def risky_task(context):
try:
# Risky operation
result = perform_operation()
context.set("result", result)
except Exception as e:
# Convert to TaskError
raise cloaca.TaskError(f"Task failed: {e}") from e
return context
Raised when task definition is invalid.
try:
@cloaca.task(id="") # Empty ID
def invalid_task(context):
return context
except cloaca.TaskValidationError as e:
print(f"Invalid task definition: {e}")
Raised when task execution fails.
@cloaca.task(id="failing_task")
def failing_task(context):
try:
# Operation that might fail
result = risky_operation()
context.set("result", result)
except Exception as e:
# Wrap in TaskExecutionError with context
raise cloaca.TaskExecutionError(
f"Task execution failed: {e}",
task_id="failing_task",
context=context
) from e
return context
Raised when individual task execution times out.
@cloaca.task(id="slow_task", timeout_seconds=60)
def slow_task(context):
# This will raise TaskTimeoutError if it takes > 60 seconds
time.sleep(120) # Simulates long operation
return context
try:
result = runner.execute("workflow_with_slow_task", context)
except cloaca.TaskTimeoutError as e:
print(f"Task {e.task_id} timed out after {e.timeout_seconds} seconds")
Raised for context-related errors.
try:
# Try to get non-existent required data
value = context.get_required("missing_key")
except cloaca.ContextError as e:
print(f"Context error: {e}")
# Handle missing required data
Raised for invalid configuration.
try:
config = cloaca.DefaultRunnerConfig(
max_concurrent_workflows=-5 # Invalid negative value
)
except cloaca.ConfigurationError as e:
print(f"Configuration error: {e}")
Base class for database-related errors.
try:
runner = cloaca.DefaultRunner("invalid://database/url")
except cloaca.DatabaseError as e:
print(f"Database error: {e}")
Raised when database connection fails.
try:
runner = cloaca.DefaultRunner("postgresql://user:pass@nonexistent:5432/db")
result = runner.execute("workflow", context)
except cloaca.ConnectionError as e:
print(f"Database connection failed: {e}")
print(f"Database URL: {e.database_url}")
Raised when database migration fails.
try:
runner = cloaca.DefaultRunner("sqlite:///readonly.db")
except cloaca.MigrationError as e:
print(f"Database migration failed: {e}")
print(f"Migration version: {e.target_version}")
import cloaca
def execute_workflow_safely(runner, workflow_name, context):
"""Execute workflow with comprehensive error handling."""
try:
result = runner.execute(workflow_name, context)
if result.status == "Completed":
return result.final_context
else:
raise cloaca.WorkflowExecutionError(
f"Workflow failed with status: {result.status}"
)
except cloaca.TaskTimeoutError as e:
print(f"Task {e.task_id} timed out")
return None
except cloaca.WorkflowTimeoutError as e:
print(f"Workflow timed out after {e.timeout_seconds}s")
return e.partial_result
except cloaca.DatabaseError as e:
print(f"Database error: {e}")
return None
except cloaca.CloacaException as e:
print(f"Unexpected Cloaca error: {e}")
return None
except Exception as e:
print(f"Unexpected error: {e}")
return None
import time
import random
def execute_with_retry(runner, workflow_name, context, max_attempts=3):
"""Execute workflow with retry logic."""
for attempt in range(max_attempts):
try:
return runner.execute(workflow_name, context)
except cloaca.ConnectionError as e:
if attempt < max_attempts - 1:
wait_time = (2 ** attempt) + random.uniform(0, 1)
print(f"Connection failed, retrying in {wait_time:.1f}s...")
time.sleep(wait_time)
continue
else:
print(f"All {max_attempts} attempts failed")
raise
except cloaca.TaskTimeoutError as e:
print(f"Task timeout on attempt {attempt + 1}")
if attempt < max_attempts - 1:
continue
else:
raise
except cloaca.ConfigurationError:
# Don't retry validation or configuration errors
raise
except cloaca.CloacaException as e:
print(f"Cloaca error on attempt {attempt + 1}: {e}")
if attempt < max_attempts - 1:
time.sleep(1)
continue
else:
raise
class WorkflowManager:
"""Workflow manager with custom error handling."""
def __init__(self, runner):
self.runner = runner
self.error_handlers = {
cloaca.TaskTimeoutError: self._handle_task_timeout,
cloaca.WorkflowTimeoutError: self._handle_workflow_timeout,
cloaca.DatabaseError: self._handle_database_error,
}
def execute_workflow(self, name, context):
"""Execute workflow with custom error handling."""
try:
return self.runner.execute(name, context)
except Exception as e:
# Find appropriate handler
for exception_type, handler in self.error_handlers.items():
if isinstance(e, exception_type):
return handler(e, name, context)
# No specific handler found
return self._handle_generic_error(e, name, context)
def _handle_task_timeout(self, error, workflow_name, context):
print(f"Task {error.task_id} timed out in workflow {workflow_name}")
# Could implement partial result recovery
return None
def _handle_workflow_timeout(self, error, workflow_name, context):
print(f"Workflow {workflow_name} timed out")
return error.partial_result # Return partial results
def _handle_database_error(self, error, workflow_name, context):
print(f"Database error during {workflow_name}: {error}")
# Could implement fallback to different database
return None
def _handle_generic_error(self, error, workflow_name, context):
print(f"Unexpected error in {workflow_name}: {error}")
return None
Always preserve exception context:
try:
result = runner.execute("workflow", context)
except cloaca.TaskExecutionError as e:
# Access exception details
print(f"Task ID: {e.task_id}")
print(f"Error message: {e}")
print(f"Original exception: {e.__cause__}")
# Access context if available
if hasattr(e, 'context'):
error_context = e.context
print(f"Context at error: {error_context.data}")
Implement proper logging:
import logging
logger = logging.getLogger(__name__)
try:
result = runner.execute("workflow", context)
except cloaca.CloacaException as e:
logger.error(
"Workflow execution failed",
extra={
"workflow_name": getattr(e, 'workflow_name', 'unknown'),
"error_type": type(e).__name__,
"error_message": str(e)
},
exc_info=True
)
- Task Decorator - Task-level error handling
- DefaultRunner - Workflow execution and errors
- Configuration - Configuration validation errors