Cloacina Documentation

Packaged Workflow Architecture

This article explains the overall architecture of workflow packages and how they integrate with the Cloacina core system. Understanding this architecture is essential for deploying workflow packages in production and designing systems that leverage both packaged and embedded workflows.

System Overview

Workflow packages extend Cloacina’s core architecture by adding dynamic loading capabilities while maintaining full compatibility with the existing embedded workflow system. The architecture enables seamless mixing of both workflow types within the same application.

Core Design Principles

  1. Unified Execution: Both embedded and packaged workflows use the same execution engine
  2. Registry Integration: Packaged workflows integrate through a registry layer for lifecycle management
  3. Namespace Isolation: Task namespaces prevent conflicts between packages and tenants
  4. Persistent Storage: Workflows and execution state persist across restarts
  5. Hot-swapping: Workflow packages can be updated without application restarts
  6. Minimal Compilation Dependencies: Workflow packages can use cloacina-workflow (minimal types only) instead of the full cloacina crate, enabling fast compilation without database drivers

High-Level Architecture

┌─────────────────────────────────────────────────────────────────────┐
│                        Cloacina Application                         │
├─────────────────────────────────────────────────────────────────────┤
│                          DefaultRunner                              │
│  ┌─────────────────┐  ┌─────────────────┐  ┌─────────────────┐     │
│  │   Task          │  │   Pipeline      │  │   Background    │     │
│  │   Scheduler     │  │   Executor      │  │   Services      │     │
│  └─────────────────┘  └─────────────────┘  └─────────────────┘     │
├─────────────────────────────────────────────────────────────────────┤
│                      Global Task Registry                           │
│         (Unified namespace for embedded + packaged tasks)           │
├─────────────────────────────────────────────────────────────────────┤
│  Embedded Workflows        │        Workflow Packages                │
│  ┌─────────────────┐      │      ┌─────────────────┐                │
│  │ #[workflow]     │      │      │ Workflow        │                │
│  │ #[task] macro   │      │      │ Registry        │                │
│  │ Compile-time    │      │      │ - Storage       │                │
│  │ registration    │      │      │ - Loader        │                │
│  └─────────────────┘      │      │ - Validator     │                │
│                           │      │ - Reconciler    │                │
│                           │      └─────────────────┘                │
├─────────────────────────────────────────────────────────────────────┤
│                        Database Layer                               │
│  ┌─────────────────┐  ┌─────────────────┐  ┌─────────────────┐     │
│  │ Execution State │  │ Package         │  │ Cron Schedules  │     │
│  │ - Pipelines     │  │ Metadata        │  │ - Time-based    │     │
│  │ - Tasks         │  │ - Binary Store  │  │ - Recovery      │     │
│  │ - Context       │  │ - Registry      │  │ - Missed runs   │     │
│  └─────────────────┘  └─────────────────┘  └─────────────────┘     │
└─────────────────────────────────────────────────────────────────────┘

Core Components

DefaultRunner - Unified Execution Engine

The DefaultRunner is the central orchestrator that manages both embedded and packaged workflows:

pub struct DefaultRunner {
    /// Database connection for persistence and state management
    database: Database,
    /// Configuration parameters for the runner
    config: DefaultRunnerConfig,
    /// Task scheduler for managing workflow execution scheduling
    scheduler: Arc<TaskScheduler>,
    /// Task executor for running individual tasks
    executor: Arc<dyn TaskExecutorTrait>,
    /// Optional workflow registry for workflow packages
    workflow_registry: Arc<RwLock<Option<Arc<WorkflowRegistryImpl<FilesystemRegistryStorage>>>>>,
    /// Optional registry reconciler for workflow packages
    registry_reconciler: Arc<RwLock<Option<Arc<RegistryReconciler>>>>,
    /// Optional cron scheduler for time-based workflow execution
    cron_scheduler: Arc<RwLock<Option<Arc<CronScheduler>>>>,
}

WorkflowRegistryImpl - Package Management

The workflow registry provides comprehensive package lifecycle management:

pub struct WorkflowRegistryImpl<S: RegistryStorage> {
    /// Storage backend for binary data (.cloacina files)
    storage: S,
    /// Database for metadata storage
    database: Database,
    /// Package loader for metadata extraction
    loader: PackageLoader,
    /// Task registrar — projects packaged tasks into the host's `Runtime`
    /// task registry so they're discoverable alongside embedded tasks.
    registrar: TaskRegistrar,
    /// Package validator for safety checks
    validator: PackageValidator,
    /// Map of package IDs to registered task namespaces
    loaded_packages: HashMap<Uuid, Vec<TaskNamespace>>,
}
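
The storage backend is pluggable through the `RegistryStorage` trait. As a rough, self-contained sketch of the idea (the trait and method signatures here are illustrative stand-ins; the real trait is async and uses richer error and ID types), a filesystem backend can store one `.cloacina` archive per package ID:

```rust
use std::path::PathBuf;

// Illustrative stand-in for the crate's `RegistryStorage` trait.
trait RegistryStorage {
    fn put(&mut self, package_id: &str, data: &[u8]) -> std::io::Result<()>;
    fn get(&self, package_id: &str) -> std::io::Result<Vec<u8>>;
}

// Filesystem backend sketch: one `.cloacina` archive per package ID.
struct FilesystemRegistryStorage {
    root: PathBuf,
}

impl RegistryStorage for FilesystemRegistryStorage {
    fn put(&mut self, package_id: &str, data: &[u8]) -> std::io::Result<()> {
        std::fs::create_dir_all(&self.root)?;
        std::fs::write(self.root.join(format!("{package_id}.cloacina")), data)
    }

    fn get(&self, package_id: &str) -> std::io::Result<Vec<u8>> {
        std::fs::read(self.root.join(format!("{package_id}.cloacina")))
    }
}

fn main() {
    let mut storage = FilesystemRegistryStorage {
        root: std::env::temp_dir().join("cloacina-registry-demo"),
    };
    storage.put("demo-package", b"archive bytes").unwrap();
    // Binary data survives the round trip, independent of the metadata database.
    assert_eq!(storage.get("demo-package").unwrap(), b"archive bytes");
    println!("round-trip ok");
}
```

Keeping the binary store behind a trait is what allows the registry to be generic over `S: RegistryStorage` while metadata always lives in the database.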

Unified Task Registry

Both embedded and packaged workflows surface tasks through the host’s cloacina::Runtime registry. The mechanisms differ by mode but the lookup surface is identical:

  • Embedded tasks: The #[task] macro emits an inventory::submit! entry of type TaskEntry. At startup, Runtime::seed_from_inventory() walks the process-local inventory section and registers a constructor per entry.
  • Packaged tasks: The package’s cdylib carries its own inventory section. The host calls get_task_metadata() over the FFI vtable (method index 0), receives a PackageTasksMetadata describing every task in the package, and registers a DynamicLibraryTask constructor in the host Runtime per metadata entry. Inventory does not span the cdylib boundary — that’s why the FFI bridge exists.
  • Namespace format: tenant::package::workflow::task_id
  • Unified lookup: All tasks discoverable through the same Runtime interface regardless of registration path.
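
The "different registration path, same lookup surface" idea can be modeled in a few lines. This is a deliberately simplified stand-in (a plain `HashMap` rather than the crate's `Runtime` type, and a function pointer rather than an `Arc<dyn Task>` factory): one constructor is known statically, one arrives "dynamically", and lookup treats them identically.

```rust
use std::collections::HashMap;

// Stand-in for a task constructor; the real registry stores task factories.
type TaskConstructor = fn() -> &'static str;

fn embedded_task() -> &'static str { "collect_data" }
fn packaged_task() -> &'static str { "extract_data" }

// Build a registry keyed by the documented namespace format.
fn build_registry() -> HashMap<String, TaskConstructor> {
    let mut registry: HashMap<String, TaskConstructor> = HashMap::new();
    // Embedded path: known at compile time (inventory-seeded in the real crate).
    registry.insert(
        "public::embedded::embedded_workflow::collect_data".to_string(),
        embedded_task,
    );
    // Packaged path: discovered at runtime via FFI metadata in the real crate.
    registry.insert(
        "acme::data_processor::etl_pipeline::extract_data".to_string(),
        packaged_task,
    );
    registry
}

fn main() {
    let registry = build_registry();
    // Identical lookup regardless of how the task was registered.
    for key in registry.keys() {
        let ctor = registry[key];
        println!("{key} -> {}", ctor());
    }
}
```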

Integration Points

Task Registration

Both workflow types populate the same Runtime task registry but through different mechanisms:

Embedded Workflow Registration:

// Emitted by the #[task] macro for each task in a #[workflow] module.
// `Runtime::seed_from_inventory()` walks these entries at startup.
cloacina_workflow_plugin::inventory::submit! {
    cloacina_workflow_plugin::TaskEntry {
        namespace: || cloacina::TaskNamespace::new(
            "public", "embedded", "embedded_workflow", "collect_data",
        ),
        constructor: || std::sync::Arc::new(CollectDataTask::new()),
    }
}

Packaged Workflow Registration:

// Runtime registration via TaskRegistrar
registrar.register_package_tasks(
    &package_metadata,
    &task_namespaces,
    library_handle
).await?;

Workflow Execution

Both workflow types use identical execution paths:

  1. Task Discovery: Lookup in global task registry
  2. Dependency Resolution: Same graph algorithms via TaskScheduler
  3. Execution: Same ThreadTaskExecutor runs tasks
  4. Context Management: Same Context object for data flow
  5. Persistence: Same database schema for execution state

Configuration and Setup

The DefaultRunner can be configured with or without workflow package support:

// Embedded workflows only
let runner = DefaultRunner::new(&database_url).await?;

// With workflow package support
let mut config = DefaultRunnerConfig::default();
config.enable_registry_reconciler = true;
config.registry_storage_path = Some(PathBuf::from("/path/to/storage"));

let runner = DefaultRunner::with_config(&database_url, config).await?;

Database Schema Integration

Workflow Package Tables

Workflow packages add dedicated tables to the schema:

workflow_packages:

CREATE TABLE workflow_packages (
    id UUID PRIMARY KEY,
    package_name TEXT NOT NULL,
    version TEXT NOT NULL,
    description TEXT,
    author TEXT,
    created_at TIMESTAMPTZ NOT NULL,
    updated_at TIMESTAMPTZ NOT NULL,
    UNIQUE(package_name, version)
);

workflow_registry:

CREATE TABLE workflow_registry (
    id UUID PRIMARY KEY,
    package_id UUID NOT NULL REFERENCES workflow_packages(id),
    binary_data BYTEA NOT NULL,
    created_at TIMESTAMPTZ NOT NULL
);

Shared Execution Tables

Both workflow types use the same execution state tables:

  • pipelines: Workflow execution instances
  • tasks: Individual task execution records
  • cron_schedules: Time-based scheduling
  • pipeline_context: Execution context data

Execution Lifecycle

Embedded Workflow Lifecycle

  1. Compile-time: Macros generate workflow structures and register tasks
  2. Runtime: runner.execute(workflow, context) starts execution
  3. Scheduling: TaskScheduler resolves dependencies
  4. Execution: ThreadTaskExecutor runs tasks in dependency order
  5. Persistence: Results saved to database

Workflow Package Lifecycle

  1. Package Registration: .cloacina file loaded into registry
  2. Task Registration: Task metadata read across the FFI vtable (method index 0); a DynamicLibraryTask constructor is registered in the host Runtime per task
  3. Runtime: Same execution path as embedded workflows
  4. Reconciliation: Background service monitors for package changes
  5. Hot-swapping: New package versions can replace existing ones
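
Hot-swapping in step 5 comes down to replacing what is registered under a package's name when a newer version arrives. A minimal, self-contained model of that decision (version comparison is simplified to a tuple; the real reconciler compares full package versions and also unloads the superseded library):

```rust
use std::collections::HashMap;

type Version = (u32, u32, u32);

// Registry entry: the installed version plus a stand-in for its tasks.
struct Entry {
    version: Version,
    label: &'static str,
}

// Register a package version, swapping out an older one under the same name.
// Returns true if the entry was installed or replaced.
fn register(
    reg: &mut HashMap<String, Entry>,
    name: &str,
    version: Version,
    label: &'static str,
) -> bool {
    match reg.get(name) {
        // An equal or newer version is already installed: keep it.
        Some(existing) if existing.version >= version => false,
        _ => {
            reg.insert(name.to_string(), Entry { version, label });
            true
        }
    }
}

fn main() {
    let mut reg = HashMap::new();
    register(&mut reg, "acme::data_processor", (1, 0, 0), "v1 tasks");
    // Hot-swap: 1.1.0 replaces 1.0.0 without restarting the host.
    register(&mut reg, "acme::data_processor", (1, 1, 0), "v1.1 tasks");
    println!("{}", reg["acme::data_processor"].label);
}
```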

Unified Execution Flow

// Same execution interface for both types
impl DefaultRunner {
    pub async fn execute<T>(&self, workflow_name: &str, context: Context<T>) -> Result<()>
    where T: Clone + Send + Sync + 'static {
        // 1. Create pipeline execution record
        let pipeline_id = self.pipeline_executor.create_pipeline(workflow_name, context).await?;

        // 2. Task discovery (embedded or packaged)
        let tasks = self.scheduler.get_workflow_tasks(workflow_name).await?;

        // 3. Dependency resolution and scheduling
        let execution_plan = self.scheduler.create_execution_plan(tasks).await?;

        // 4. Execute tasks (same executor for both types)
        self.executor.execute_plan(pipeline_id, execution_plan).await?;

        Ok(())
    }
}

Namespace Management

Task Namespacing

Workflow packages use hierarchical namespaces to prevent conflicts:

Format: {tenant}::{package}::{workflow}::{task_id}

Examples:

  • acme::data_processor::etl_pipeline::extract_data
  • acme::data_processor::etl_pipeline::transform_data
  • beta_corp::ml_trainer::model_pipeline::train_model

Namespace Isolation

  • Tenant isolation: Different tenants can use same package names
  • Package isolation: Multiple packages can define same task names
  • Workflow isolation: Multiple workflows in same package are isolated
  • Version isolation: Different package versions maintain separate namespaces
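
All four isolation rules follow from every component being part of the lookup key. A self-contained sketch of the documented four-part namespace (this mirrors, rather than imports, the crate's TaskNamespace type):

```rust
// Mirrors the documented namespace shape; the real type lives in cloacina.
#[derive(PartialEq, Eq, Hash, Debug)]
struct TaskNamespace {
    tenant: String,
    package: String,
    workflow: String,
    task_id: String,
}

impl TaskNamespace {
    fn new(tenant: &str, package: &str, workflow: &str, task_id: &str) -> Self {
        Self {
            tenant: tenant.into(),
            package: package.into(),
            workflow: workflow.into(),
            task_id: task_id.into(),
        }
    }

    // Render in the documented `tenant::package::workflow::task_id` format.
    fn key(&self) -> String {
        format!(
            "{}::{}::{}::{}",
            self.tenant, self.package, self.workflow, self.task_id
        )
    }
}

fn main() {
    // Identical package, workflow, and task names: still distinct per tenant.
    let a = TaskNamespace::new("acme", "data_processor", "etl_pipeline", "extract_data");
    let b = TaskNamespace::new("beta_corp", "data_processor", "etl_pipeline", "extract_data");
    assert_ne!(a, b);
    println!("{}", a.key());
}
```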

Background Services

Registry Reconciler

Monitors the storage backend for package changes and automatically updates the registry:

pub struct RegistryReconciler {
    registry: Arc<WorkflowRegistryImpl<FilesystemRegistryStorage>>,
    config: ReconcilerConfig,
}

impl RegistryReconciler {
    pub async fn run(&self, mut shutdown: broadcast::Receiver<()>) {
        loop {
            // Scan storage for new/updated packages
            self.scan_and_reconcile().await;

            // Wait for next poll interval or shutdown
            tokio::select! {
                _ = shutdown.recv() => break,
                _ = tokio::time::sleep(self.config.poll_interval) => continue,
            }
        }
    }
}

Cron Scheduler

Provides time-based execution for both workflow types:

  • Unified scheduling: Works with embedded and packaged workflows
  • Recovery support: Handles missed executions during downtime
  • Persistent schedules: Cron expressions stored in database
  • Multi-tenant aware: Schedules isolated by tenant namespace
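
Recovery of missed executions can be pictured as enumerating every due time between the last recorded run and now. A deliberately simplified sketch using a fixed interval in place of a parsed cron expression (the real scheduler evaluates the cron expressions stored in cron_schedules):

```rust
// Missed-run recovery, simplified: instead of evaluating a cron expression,
// assume a fixed interval in seconds and list every due timestamp that was
// skipped while the host was down.
fn missed_runs(last_run: u64, now: u64, interval: u64) -> Vec<u64> {
    let mut due = Vec::new();
    let mut t = last_run + interval;
    while t <= now {
        due.push(t);
        t += interval;
    }
    due
}

fn main() {
    // Last run at t=0, host back up at t=3600, a 900-second interval:
    let runs = missed_runs(0, 3600, 900);
    println!("{runs:?}"); // four due times to replay, in order
}
```

Whether each due time is actually replayed, collapsed into one catch-up run, or skipped is a policy choice layered on top of this enumeration.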

Multi-Tenancy Support

Schema-Based Isolation (PostgreSQL)

// Tenant A
let runner_a = DefaultRunnerBuilder::new()
    .database_url("postgresql://user:pass@localhost/cloacina")
    .schema("tenant_a")
    .build()
    .await?;

// Tenant B
let runner_b = DefaultRunnerBuilder::new()
    .database_url("postgresql://user:pass@localhost/cloacina")
    .schema("tenant_b")
    .build()
    .await?;

Storage Isolation

Each tenant can have isolated storage for workflow packages:

// Tenant-specific storage paths
config.registry_storage_path = Some(PathBuf::from("/storage/tenant_a"));
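
Deriving those paths is simply a matter of scoping the registry root by the tenant identifier; a small helper sketch (the helper name is illustrative, not part of the API):

```rust
use std::path::PathBuf;

// Illustrative helper: derive a tenant-scoped registry storage root.
fn tenant_storage_path(base: &str, tenant: &str) -> PathBuf {
    PathBuf::from(base).join(tenant)
}

fn main() {
    let path = tenant_storage_path("/storage", "tenant_a");
    println!("{}", path.display());
}
```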

Crate Structure

Cloacina is organized into separate crates to support both embedded and packaged workflow development:

cloacina/
  cloacina-workflow/     # Minimal types for workflow compilation
  cloacina-macros/       # Procedural macros (#[task], #[workflow])
  cloacina/              # Full runtime (executor, scheduler, database)

cloacina-workflow (Minimal Crate)

Contains only the types needed to compile workflows:

  • Context<T> - Data container for task communication
  • Task trait - Interface for task implementations
  • TaskError, ContextError - Error types
  • RetryPolicy, BackoffStrategy - Retry configuration
  • TaskNamespace - Namespace utilities

Dependencies: async-trait, serde, serde_json, thiserror, chrono

Does NOT include: Database drivers (diesel), connection pools, executor, scheduler, libloading
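
In a workflow package's manifest, this split shows up as a dependency on the minimal crate only. A sketch of what such a Cargo.toml might look like (crate versions are placeholders, and the dependency list beyond cloacina-workflow is illustrative):

```toml
[package]
name = "my-workflow-package"
version = "0.1.0"
edition = "2021"

[lib]
# Workflow packages are built as dynamic libraries loaded by the host.
crate-type = ["cdylib"]

[dependencies]
# Minimal types only: no database drivers, executor, or scheduler.
cloacina-workflow = "x.y"  # version placeholder
async-trait = "0.1"
serde = { version = "1", features = ["derive"] }
serde_json = "1"
```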

cloacina (Full Crate)

Re-exports everything from cloacina-workflow plus:

  • Database backends (PostgreSQL, SQLite)
  • DefaultRunner and execution engine
  • Workflow registry and package loading
  • Cron scheduling
  • Multi-tenancy support

Usage

| Use Case           | Crate                                |
|--------------------|--------------------------------------|
| Workflow packages  | cloacina-workflow (includes macros)  |
| Embedded workflows | cloacina                             |
| Host application   | cloacina                             |

Comparison: Embedded vs Packaged

| Aspect        | Embedded Workflows                | Workflow Packages                          |
|---------------|-----------------------------------|--------------------------------------------|
| Definition    | #[workflow] and #[task] macros    | .cloacina archives                         |
| Registration  | Compile-time via macro expansion  | Runtime via registry loading               |
| Distribution  | Part of application binary        | Separate distributable files               |
| Loading       | Static linking                    | Dynamic loading via libloading             |
| Validation    | Compile-time dependency checking  | Runtime validation during registration     |
| Hot-swapping  | Requires recompilation and restart | Runtime replacement possible              |
| Storage       | In-memory function pointers       | Database + filesystem storage              |
| Deployment    | Application deployment            | Independent package deployment             |
| Versioning    | Application version               | Independent package versioning             |
| Multi-tenancy | Shared across tenants             | Tenant-specific workflow packages possible |
| Performance   | Direct function calls             | FFI overhead                               |
| Development   | Integrated development cycle      | Independent development cycle              |
| Compile Time  | Full crate dependencies           | Minimal dependencies with cloacina-workflow |

Production Deployment Patterns

Hybrid Architecture

Most production systems use both workflow types strategically:

Embedded Workflows for:

  • Core business logic
  • Application-specific workflows
  • Performance-critical paths
  • Workflows that rarely change

Workflow Packages for:

  • Customer-specific customizations
  • Frequently updated workflows
  • Multi-tenant scenarios
  • External integrations

Deployment Strategy

// Production configuration
let mut config = DefaultRunnerConfig::default();
config.max_concurrent_tasks = 50;
config.task_timeout = Duration::from_secs(30 * 60);
config.enable_registry_reconciler = true;
config.enable_cron_scheduling = true;
config.enable_recovery = true;

let runner = DefaultRunner::with_config(&database_url, config).await?;

Operational Considerations

  1. Monitoring: Same monitoring for both workflow types
  2. Logging: Unified logging with namespace identification
  3. Metrics: Combined metrics collection and reporting
  4. Backup: Include both database and package storage
  5. Security: Package validation and namespace isolation

Best Practices

Architecture Design

  1. Start with embedded workflows for core functionality
  2. Add packaged workflows for customization and extensions
  3. Use consistent naming across embedded and packaged workflows
  4. Plan namespace hierarchy before deployment
  5. Consider tenant isolation requirements early

Performance Optimization

  1. Minimize FFI calls in packaged workflows
  2. Cache package metadata to avoid repeated loading
  3. Use appropriate buffer sizes for task communication
  4. Monitor resource usage of both workflow types
  5. Tune database connection pools for expected load
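
Tip 2 amounts to extracting each package's metadata once and reusing it on subsequent lookups. A minimal cache sketch using the standard HashMap entry API (the metadata type and extraction function are stand-ins for the real FFI-backed extraction):

```rust
use std::collections::HashMap;

// Stand-in for extracted package metadata.
#[derive(Clone)]
struct PackageMeta {
    task_count: usize,
}

// Simulated expensive extraction (FFI call + parsing in the real system).
// Counts invocations so the caching behavior is observable.
fn extract_metadata(loads: &mut usize) -> PackageMeta {
    *loads += 1;
    PackageMeta { task_count: 3 }
}

// Return cached metadata, extracting only on first access per package ID.
fn cached_metadata<'a>(
    cache: &'a mut HashMap<String, PackageMeta>,
    package_id: &str,
    loads: &mut usize,
) -> &'a PackageMeta {
    cache
        .entry(package_id.to_string())
        .or_insert_with(|| extract_metadata(loads))
}

fn main() {
    let mut cache = HashMap::new();
    let mut loads = 0;
    cached_metadata(&mut cache, "acme::data_processor", &mut loads);
    cached_metadata(&mut cache, "acme::data_processor", &mut loads);
    // Extraction ran exactly once despite two lookups.
    println!("loads = {loads}");
}
```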

Operational Excellence

  1. Version package dependencies explicitly
  2. Test package compatibility before deployment
  3. Monitor package storage disk usage
  4. Implement gradual rollouts for package updates
  5. Maintain rollback procedures for package issues