Packaged Workflow Architecture
This article explains the overall architecture of workflow packages and how they integrate with the Cloacina core system. Understanding this architecture is essential for deploying workflow packages in production and designing systems that leverage both packaged and embedded workflows.
Workflow packages extend Cloacina’s core architecture by adding dynamic loading capabilities while maintaining full compatibility with the existing embedded workflow system. The architecture enables seamless mixing of both workflow types within the same application.
- Unified Execution: Both embedded and packaged workflows use the same execution engine
- Registry Integration: Packaged workflows integrate through a registry layer for lifecycle management
- Namespace Isolation: Task namespaces prevent conflicts between packages and tenants
- Persistent Storage: Workflows and execution state persist across restarts
- Hot-swapping: Workflow packages can be updated without application restarts
- Minimal Compilation Dependencies: Workflow packages can use cloacina-workflow (minimal types only) instead of the full cloacina crate, enabling fast compilation without database drivers
┌─────────────────────────────────────────────────────────────────────┐
│                        Cloacina Application                         │
├─────────────────────────────────────────────────────────────────────┤
│ DefaultRunner                                                       │
│  ┌─────────────────┐   ┌─────────────────┐   ┌─────────────────┐    │
│  │ Task            │   │ Pipeline        │   │ Background      │    │
│  │ Scheduler       │   │ Executor        │   │ Services        │    │
│  └─────────────────┘   └─────────────────┘   └─────────────────┘    │
├─────────────────────────────────────────────────────────────────────┤
│                        Global Task Registry                         │
│          (Unified namespace for embedded + packaged tasks)          │
├──────────────────────────────────┬──────────────────────────────────┤
│ Embedded Workflows               │ Workflow Packages                │
│  ┌─────────────────┐             │  ┌─────────────────┐             │
│  │ #[workflow]     │             │  │ Workflow        │             │
│  │ #[task] macro   │             │  │ Registry        │             │
│  │ Compile-time    │             │  │ - Storage       │             │
│  │ registration    │             │  │ - Loader        │             │
│  └─────────────────┘             │  │ - Validator     │             │
│                                  │  │ - Reconciler    │             │
│                                  │  └─────────────────┘             │
├──────────────────────────────────┴──────────────────────────────────┤
│ Database Layer                                                      │
│  ┌─────────────────┐   ┌─────────────────┐   ┌─────────────────┐    │
│  │ Execution State │   │ Package         │   │ Cron Schedules  │    │
│  │ - Pipelines     │   │ Metadata        │   │ - Time-based    │    │
│  │ - Tasks         │   │ - Binary Store  │   │ - Recovery      │    │
│  │ - Context       │   │ - Registry      │   │ - Missed runs   │    │
│  └─────────────────┘   └─────────────────┘   └─────────────────┘    │
└─────────────────────────────────────────────────────────────────────┘
The DefaultRunner is the central orchestrator that manages both embedded and packaged workflows:
pub struct DefaultRunner {
/// Database connection for persistence and state management
database: Database,
/// Configuration parameters for the runner
config: DefaultRunnerConfig,
/// Task scheduler for managing workflow execution scheduling
scheduler: Arc<TaskScheduler>,
/// Task executor for running individual tasks
executor: Arc<dyn TaskExecutorTrait>,
/// Optional workflow registry for workflow packages
workflow_registry: Arc<RwLock<Option<Arc<WorkflowRegistryImpl<FilesystemRegistryStorage>>>>>,
/// Optional registry reconciler for workflow packages
registry_reconciler: Arc<RwLock<Option<Arc<RegistryReconciler>>>>,
/// Optional cron scheduler for time-based workflow execution
cron_scheduler: Arc<RwLock<Option<Arc<CronScheduler>>>>,
}
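The optional fields all share one shape, Arc<RwLock<Option<Arc<T>>>>, which lets a component be installed or removed after the runner is built while other holders keep cheap Arc clones. The sketch below illustrates that pattern only; it uses std::sync::RwLock (the real fields may use an async lock), and RunnerSketch and CronSchedulerStub are hypothetical names, not Cloacina API.

```rust
use std::sync::{Arc, RwLock};

/// Hypothetical stand-in for an optional background component.
#[derive(Debug)]
pub struct CronSchedulerStub {
    pub poll_secs: u64,
}

/// Runner holding one optional component in the same shape as the
/// DefaultRunner fields: Arc<RwLock<Option<Arc<T>>>>.
#[derive(Clone, Default)]
pub struct RunnerSketch {
    cron_scheduler: Arc<RwLock<Option<Arc<CronSchedulerStub>>>>,
}

impl RunnerSketch {
    /// Install the component after construction; clones of the runner see it too.
    pub fn enable_cron(&self, poll_secs: u64) {
        let mut slot = self.cron_scheduler.write().expect("lock poisoned");
        *slot = Some(Arc::new(CronSchedulerStub { poll_secs }));
    }

    /// Remove the component; callers still holding an Arc keep it alive.
    pub fn disable_cron(&self) -> Option<Arc<CronSchedulerStub>> {
        self.cron_scheduler.write().expect("lock poisoned").take()
    }

    /// Clone the inner Arc out so the lock is released immediately.
    pub fn cron(&self) -> Option<Arc<CronSchedulerStub>> {
        self.cron_scheduler.read().expect("lock poisoned").clone()
    }
}
```

Cloning the inner Arc out of the lock keeps lock hold times short, which matters when background services and request handlers read the same slot.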
The workflow registry manages the full package lifecycle, from archive storage and metadata extraction to validation and task registration:
pub struct WorkflowRegistryImpl<S: RegistryStorage> {
/// Storage backend for binary data (.cloacina files)
storage: S,
/// Database for metadata storage
database: Database,
/// Package loader for metadata extraction
loader: PackageLoader,
/// Task registrar — projects packaged tasks into the host's `Runtime`
/// task registry so they're discoverable alongside embedded tasks.
registrar: TaskRegistrar,
/// Package validator for safety checks
validator: PackageValidator,
/// Map of package IDs to registered task namespaces
loaded_packages: HashMap<Uuid, Vec<TaskNamespace>>,
}
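Making the registry generic over RegistryStorage keeps the binary store swappable; the runner above pins it to FilesystemRegistryStorage. The sketch below shows the idea with a hypothetical storage trait, an in-memory backend, and a loaded_packages map. Package IDs are u64 rather than Uuid so the example needs only std, and the method names are illustrative, not Cloacina's actual signatures.

```rust
use std::collections::HashMap;

/// Hypothetical storage abstraction for .cloacina archive bytes.
pub trait RegistryStorage {
    fn store(&mut self, id: u64, bytes: Vec<u8>);
    fn retrieve(&self, id: u64) -> Option<&[u8]>;
    fn delete(&mut self, id: u64) -> bool;
}

/// In-memory backend, e.g. for tests.
#[derive(Default)]
pub struct MemoryStorage {
    blobs: HashMap<u64, Vec<u8>>,
}

impl RegistryStorage for MemoryStorage {
    fn store(&mut self, id: u64, bytes: Vec<u8>) {
        self.blobs.insert(id, bytes);
    }
    fn retrieve(&self, id: u64) -> Option<&[u8]> {
        self.blobs.get(&id).map(|b| b.as_slice())
    }
    fn delete(&mut self, id: u64) -> bool {
        self.blobs.remove(&id).is_some()
    }
}

/// Registry generic over its storage backend, tracking which task
/// namespaces each loaded package contributed.
pub struct RegistrySketch<S: RegistryStorage> {
    storage: S,
    next_id: u64,
    loaded_packages: HashMap<u64, Vec<String>>,
}

impl<S: RegistryStorage> RegistrySketch<S> {
    pub fn new(storage: S) -> Self {
        Self { storage, next_id: 1, loaded_packages: HashMap::new() }
    }

    /// Store the archive and remember the namespaces it registered.
    pub fn register(&mut self, bytes: Vec<u8>, namespaces: Vec<String>) -> u64 {
        let id = self.next_id;
        self.next_id += 1;
        self.storage.store(id, bytes);
        self.loaded_packages.insert(id, namespaces);
        id
    }

    /// Drop the archive and return the namespaces that must be unregistered.
    pub fn unregister(&mut self, id: u64) -> Option<Vec<String>> {
        self.storage.delete(id);
        self.loaded_packages.remove(&id)
    }

    pub fn archive(&self, id: u64) -> Option<&[u8]> {
        self.storage.retrieve(id)
    }
}
```

Keeping the package-to-namespace map is what makes clean unloading possible: without it, the registry could not tell which tasks to remove when a package goes away.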
Both embedded and packaged workflows surface tasks through the host’s cloacina::Runtime registry. The mechanisms differ by mode, but the lookup surface is identical:
- Embedded tasks: The #[task] macro emits an inventory::submit! entry of type TaskEntry. At startup, Runtime::seed_from_inventory() walks the process-local inventory section and registers a constructor per entry.
- Packaged tasks: The package’s cdylib carries its own inventory section. The host calls get_task_metadata() over the FFI vtable (method index 0), receives a PackageTasksMetadata describing every task in the package, and registers a DynamicLibraryTask constructor in the host Runtime per metadata entry. Inventory does not span the cdylib boundary, which is why the FFI bridge exists.
- Namespace format: tenant::package::workflow::task_id
- Unified lookup: All tasks are discoverable through the same Runtime interface regardless of registration path.
Both workflow types populate the same Runtime task registry but
through different mechanisms:
Embedded Workflow Registration:
// Emitted by the #[task] macro for each task in a #[workflow] module.
// `Runtime::seed_from_inventory()` walks these entries at startup.
cloacina_workflow_plugin::inventory::submit! {
cloacina_workflow_plugin::TaskEntry {
namespace: || cloacina::TaskNamespace::new(
"public", "embedded", "embedded_workflow", "collect_data",
),
constructor: || std::sync::Arc::new(CollectDataTask::new()),
}
}
Packaged Workflow Registration:
// Runtime registration via TaskRegistrar
registrar.register_package_tasks(
&package_metadata,
&task_namespaces,
library_handle
).await?;
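Both paths end in the same place: a map from namespace to task constructor. The sketch below models that with a hypothetical RuntimeSketch. A static slice of entries stands in for the inventory section the #[task] macro populates, and a list of metadata records stands in for the PackageTasksMetadata returned over the FFI vtable; it uses neither the inventory crate nor libloading, and all names are illustrative.

```rust
use std::collections::HashMap;
use std::sync::Arc;

/// Minimal task interface for the sketch.
pub trait Task: Send + Sync {
    fn id(&self) -> String;
}

struct EmbeddedTask(&'static str);
impl Task for EmbeddedTask {
    fn id(&self) -> String { self.0.to_string() }
}

/// Stand-in for a task that would call into a loaded cdylib.
struct PackagedTask { task_id: String }
impl Task for PackagedTask {
    fn id(&self) -> String { self.task_id.clone() }
}

type Constructor = Arc<dyn Fn() -> Arc<dyn Task> + Send + Sync>;

/// Stand-in for a macro-emitted inventory entry.
pub struct Entry {
    pub namespace: &'static str,
    pub constructor: fn() -> Arc<dyn Task>,
}

/// Stand-in for one record of package task metadata.
pub struct PackageTaskMeta {
    pub namespace: String,
    pub task_id: String,
}

#[derive(Default)]
pub struct RuntimeSketch {
    tasks: HashMap<String, Constructor>,
}

impl RuntimeSketch {
    /// Embedded path: walk statically known entries.
    pub fn seed_from_entries(&mut self, entries: &[Entry]) {
        for e in entries {
            let ctor = e.constructor;
            self.tasks.insert(e.namespace.to_string(), Arc::new(move || ctor()));
        }
    }

    /// Packaged path: one constructor per metadata record.
    pub fn register_package(&mut self, metadata: Vec<PackageTaskMeta>) {
        for m in metadata {
            let task_id = m.task_id;
            self.tasks.insert(
                m.namespace,
                Arc::new(move || Arc::new(PackagedTask { task_id: task_id.clone() }) as Arc<dyn Task>),
            );
        }
    }

    /// Unified lookup, regardless of how the task was registered.
    pub fn create(&self, namespace: &str) -> Option<Arc<dyn Task>> {
        self.tasks.get(namespace).map(|ctor| ctor())
    }
}

fn make_collect_data() -> Arc<dyn Task> {
    Arc::new(EmbeddedTask("collect_data"))
}

/// Plays the role of the inventory section in this sketch.
pub static ENTRIES: &[Entry] = &[Entry {
    namespace: "public::embedded::embedded_workflow::collect_data",
    constructor: make_collect_data,
}];
```

Storing constructors rather than task instances means each execution can get a fresh task object, whichever path registered it.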
Both workflow types use identical execution paths:
- Task Discovery: Lookup in the global task registry
- Dependency Resolution: Same graph algorithms via TaskScheduler
- Execution: Same ThreadTaskExecutor runs tasks
- Context Management: Same Context object for data flow
- Persistence: Same database schema for execution state
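Dependency resolution comes down to ordering a directed acyclic graph. The sketch below uses Kahn's algorithm to produce an execution order and to reject cycles; it illustrates the kind of graph algorithm involved and is not TaskScheduler's actual implementation. The task IDs and the dependency map are hypothetical inputs.

```rust
use std::collections::{BTreeMap, BTreeSet, VecDeque};

/// Returns tasks in an order where every dependency precedes its dependents,
/// or the set of tasks stuck in (or behind) a cycle.
pub fn execution_order(
    deps: &BTreeMap<&str, Vec<&str>>,
) -> Result<Vec<String>, BTreeSet<String>> {
    // Count unmet dependencies per task and record reverse edges.
    let mut remaining: BTreeMap<&str, usize> = BTreeMap::new();
    let mut dependents: BTreeMap<&str, Vec<&str>> = BTreeMap::new();
    for (&task, task_deps) in deps {
        remaining.entry(task).or_insert(0);
        for &dep in task_deps {
            remaining.entry(dep).or_insert(0);
            *remaining.get_mut(task).unwrap() += 1;
            dependents.entry(dep).or_default().push(task);
        }
    }

    // Start with tasks that have no dependencies (BTreeMap keeps this deterministic).
    let mut ready: VecDeque<&str> = remaining
        .iter()
        .filter(|&(_, &n)| n == 0)
        .map(|(&t, _)| t)
        .collect();

    let mut order = Vec::new();
    while let Some(task) = ready.pop_front() {
        order.push(task.to_string());
        if let Some(children) = dependents.get(task) {
            for &child in children {
                let n = remaining.get_mut(child).unwrap();
                *n -= 1;
                if *n == 0 {
                    ready.push_back(child);
                }
            }
        }
    }

    if order.len() == remaining.len() {
        Ok(order)
    } else {
        // Anything never released is part of, or depends on, a cycle.
        Err(remaining
            .iter()
            .filter(|&(_, &n)| n > 0)
            .map(|(&t, _)| t.to_string())
            .collect())
    }
}
```

For example, with load depending on transform and transform on extract, the function returns extract, transform, load; two tasks that depend on each other yield an error instead of an order.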
The DefaultRunner can be configured with or without workflow package support:
use std::path::PathBuf;

// Embedded workflows only
let runner = DefaultRunner::new(&database_url).await?;
// With workflow package support
let mut config = DefaultRunnerConfig::default();
config.enable_registry_reconciler = true;
config.registry_storage_path = Some(PathBuf::from("/path/to/storage"));
let runner = DefaultRunner::with_config(&database_url, config).await?;
Workflow packages add dedicated tables to the schema:
workflow_packages:
CREATE TABLE workflow_packages (
id UUID PRIMARY KEY,
package_name TEXT NOT NULL,
version TEXT NOT NULL,
description TEXT,
author TEXT,
created_at TIMESTAMPTZ NOT NULL,
updated_at TIMESTAMPTZ NOT NULL,
UNIQUE(package_name, version)
);
workflow_registry:
CREATE TABLE workflow_registry (
id UUID PRIMARY KEY,
package_id UUID NOT NULL REFERENCES workflow_packages(id),
binary_data BYTEA NOT NULL,
created_at TIMESTAMPTZ NOT NULL
);
Both workflow types use the same execution state tables:
- pipelines: Workflow execution instances
- tasks: Individual task execution records
- cron_schedules: Time-based scheduling
- pipeline_context: Execution context data
Embedded Workflow Lifecycle:
- Compile-time: Macros generate workflow structures and emit inventory entries that register tasks at startup
- Runtime: runner.execute(workflow, context) starts execution
- Scheduling: TaskScheduler resolves dependencies
- Execution: ThreadTaskExecutor runs tasks in dependency order
- Persistence: Results are saved to the database
Packaged Workflow Lifecycle:
- Package Registration: The .cloacina file is loaded into the registry
- Task Registration: Task metadata is read across the FFI vtable (method index 0), and a DynamicLibraryTask constructor is registered in the host Runtime per task
- Runtime: Same execution path as embedded workflows
- Reconciliation: A background service monitors for package changes
- Hot-swapping: New package versions can replace existing ones
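One common way to make replacement safe while tasks are running is to swap an Arc behind a lock: new executions pick up the new version, while executions that already hold the old Arc finish against it. The sketch below assumes that approach; LoadedPackage and PackageSlots are hypothetical names, and this article does not specify how Cloacina coordinates in-flight executions during a swap.

```rust
use std::collections::HashMap;
use std::sync::{Arc, RwLock};

/// Hypothetical loaded package: name plus version string.
#[derive(Debug)]
pub struct LoadedPackage {
    pub name: String,
    pub version: String,
}

/// Current version of each package, keyed by package name.
#[derive(Default)]
pub struct PackageSlots {
    current: RwLock<HashMap<String, Arc<LoadedPackage>>>,
}

impl PackageSlots {
    /// Install or replace a package; returns the previous version, if any.
    pub fn swap(&self, pkg: LoadedPackage) -> Option<Arc<LoadedPackage>> {
        let mut map = self.current.write().expect("lock poisoned");
        map.insert(pkg.name.clone(), Arc::new(pkg))
    }

    /// Executions take a clone of the Arc at start and keep it until they finish.
    pub fn checkout(&self, name: &str) -> Option<Arc<LoadedPackage>> {
        self.current.read().expect("lock poisoned").get(name).cloned()
    }
}
```

The old version is freed only when the last in-flight execution drops its Arc, so a swap never pulls code out from under a running task in this model.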
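// Simplified sketch: the same execution interface serves both workflow types.
// Field and helper names (pipeline_executor, create_execution_plan, execute_plan) are illustrative.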
impl DefaultRunner {
pub async fn execute<T>(&self, workflow_name: &str, context: Context<T>) -> Result<()>
where T: Clone + Send + Sync + 'static {
// 1. Create pipeline execution record
let pipeline_id = self.pipeline_executor.create_pipeline(workflow_name, context).await?;
// 2. Task discovery (embedded or packaged)
let tasks = self.scheduler.get_workflow_tasks(workflow_name).await?;
// 3. Dependency resolution and scheduling
let execution_plan = self.scheduler.create_execution_plan(tasks).await?;
// 4. Execute tasks (same executor for both types)
self.executor.execute_plan(pipeline_id, execution_plan).await?;
Ok(())
}
}
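The execute method above hides how data flows between tasks. The sketch below shows one minimal model: tasks run in a precomputed dependency order, each reading from and writing to a shared context, and the run stops at the first failure. ContextSketch (a plain HashMap<String, i64>) and PlannedTask are hypothetical and far simpler than Cloacina's Context<T> and ThreadTaskExecutor.

```rust
use std::collections::HashMap;

/// Hypothetical simplified context: string keys to i64 values.
pub type ContextSketch = HashMap<String, i64>;

/// A task is a name plus a function over the shared context.
pub struct PlannedTask {
    pub name: &'static str,
    pub run: fn(&mut ContextSketch) -> Result<(), String>,
}

/// Runs tasks in the given (already dependency-sorted) order.
/// Returns the names of completed tasks, or the failing task and its error.
pub fn execute_plan(
    plan: &[PlannedTask],
    ctx: &mut ContextSketch,
) -> Result<Vec<&'static str>, (&'static str, String)> {
    let mut completed = Vec::new();
    for task in plan {
        (task.run)(ctx).map_err(|e| (task.name, e))?;
        completed.push(task.name);
    }
    Ok(completed)
}

/// Example task: writes a value for downstream tasks.
fn extract(ctx: &mut ContextSketch) -> Result<(), String> {
    ctx.insert("rows".to_string(), 3);
    Ok(())
}

/// Example task: reads the upstream value and fails if it is missing.
fn transform(ctx: &mut ContextSketch) -> Result<(), String> {
    let rows = *ctx.get("rows").ok_or("missing 'rows' in context")?;
    ctx.insert("doubled".to_string(), rows * 2);
    Ok(())
}
```

Running [extract, transform] leaves doubled = 6 in the context; running transform alone on an empty context fails and reports transform as the failing task.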
Workflow packages use hierarchical namespaces to prevent conflicts:
Format: {tenant}::{package}::{workflow}::{task_id}
Examples:
- acme::data_processor::etl_pipeline::extract_data
- acme::data_processor::etl_pipeline::transform_data
- beta_corp::ml_trainer::model_pipeline::train_model
- Tenant isolation: Different tenants can use same package names
- Package isolation: Multiple packages can define same task names
- Workflow isolation: Multiple workflows in same package are isolated
- Version isolation: Different package versions maintain separate namespaces
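Isolation follows from using the full four-part namespace as the lookup key. The sketch below is a hypothetical NamespaceSketch type (Cloacina's own type is TaskNamespace, whose exact API is not covered here) that renders and parses the tenant::package::workflow::task_id form, rejecting anything that does not have exactly four non-empty parts.

```rust
use std::fmt;
use std::str::FromStr;

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct NamespaceSketch {
    pub tenant: String,
    pub package: String,
    pub workflow: String,
    pub task_id: String,
}

impl fmt::Display for NamespaceSketch {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "{}::{}::{}::{}", self.tenant, self.package, self.workflow, self.task_id)
    }
}

impl FromStr for NamespaceSketch {
    type Err = String;

    /// Accepts exactly four non-empty `::`-separated parts.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        let parts: Vec<&str> = s.split("::").collect();
        match parts.as_slice() {
            [t, p, w, id] if parts.iter().all(|x| !x.is_empty()) => Ok(Self {
                tenant: t.to_string(),
                package: p.to_string(),
                workflow: w.to_string(),
                task_id: id.to_string(),
            }),
            _ => Err(format!("expected tenant::package::workflow::task_id, got {s:?}")),
        }
    }
}
```

Because the whole struct is the hash key, acme::etl::daily::extract and beta_corp::etl::daily::extract are distinct entries even though their package, workflow, and task names match.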
The RegistryReconciler monitors the storage backend for package changes and automatically updates the registry:
use std::sync::Arc;
use tokio::sync::broadcast;

pub struct RegistryReconciler {
registry: Arc<WorkflowRegistryImpl<FilesystemRegistryStorage>>,
config: ReconcilerConfig,
}
impl RegistryReconciler {
pub async fn run(&self, mut shutdown: broadcast::Receiver<()>) {
loop {
// Scan storage for new/updated packages
self.scan_and_reconcile().await;
// Wait for next poll interval or shutdown
tokio::select! {
_ = shutdown.recv() => break,
_ = tokio::time::sleep(self.config.poll_interval) => continue,
}
}
}
}
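Each scan has to decide what to load and what to unload. The sketch below assumes scan_and_reconcile compares the set of packages found in storage with the set currently loaded; the function, the ReconcilePlan type, and the (name, version) keys are hypothetical, since this article does not describe the reconciler's actual comparison.

```rust
use std::collections::BTreeSet;

/// A package identity as the reconciler might compare it: (name, version).
pub type PackageKey = (String, String);

/// What a reconciliation pass should do.
#[derive(Debug, PartialEq)]
pub struct ReconcilePlan {
    pub to_load: Vec<PackageKey>,
    pub to_unload: Vec<PackageKey>,
}

/// Diff desired state (storage) against actual state (loaded packages).
pub fn plan_reconcile(
    in_storage: &BTreeSet<PackageKey>,
    loaded: &BTreeSet<PackageKey>,
) -> ReconcilePlan {
    ReconcilePlan {
        // Present in storage but not yet loaded.
        to_load: in_storage.difference(loaded).cloned().collect(),
        // Loaded but no longer present in storage.
        to_unload: loaded.difference(in_storage).cloned().collect(),
    }
}

/// Helper for building keys in examples.
pub fn key(name: &str, version: &str) -> PackageKey {
    (name.to_string(), version.to_string())
}
```

Treating storage as the desired state makes each pass idempotent: running it twice with no storage changes produces an empty plan.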
The CronScheduler provides time-based execution for both workflow types:
- Unified scheduling: Works with embedded and packaged workflows
- Recovery support: Handles missed executions during downtime
- Persistent schedules: Cron expressions stored in database
- Multi-tenant aware: Schedules isolated by tenant namespace
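Recovery after downtime means working out which scheduled times were skipped. To stay self-contained, the sketch below uses a fixed interval in seconds instead of a cron expression, and it assumes a hypothetical max_catchup cap on how many missed runs to replay; it illustrates the calculation, not Cloacina's recovery policy.

```rust
/// Scheduled times strictly after `last_run` and at or before `now`,
/// for a schedule that fires every `interval` seconds starting at `anchor`.
/// At most `max_catchup` of the most recent missed runs are returned.
pub fn missed_runs(anchor: u64, interval: u64, last_run: u64, now: u64, max_catchup: usize) -> Vec<u64> {
    assert!(interval > 0, "interval must be positive");
    if now <= last_run || now < anchor {
        return Vec::new();
    }
    // First fire time strictly after last_run (and not before the anchor).
    let first = if last_run < anchor {
        anchor
    } else {
        anchor + ((last_run - anchor) / interval + 1) * interval
    };
    let mut runs: Vec<u64> = (0..)
        .map(|i| first + i * interval)
        .take_while(|&t| t <= now)
        .collect();
    // Keep only the most recent runs if too many were missed.
    if runs.len() > max_catchup {
        let excess = runs.len() - max_catchup;
        runs.drain(..excess);
    }
    runs
}
```

With a 60-second interval, a last run at 120, and a restart at 400, the missed runs are 180, 240, 300, and 360; a cap of 2 keeps only 300 and 360.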
Multiple tenants can share one PostgreSQL database, each with its own schema:
// Tenant A
let runner_a = DefaultRunnerBuilder::new()
.database_url("postgresql://user:pass@localhost/cloacina")
.schema("tenant_a")
.build()
.await?;
// Tenant B
let runner_b = DefaultRunnerBuilder::new()
.database_url("postgresql://user:pass@localhost/cloacina")
.schema("tenant_b")
.build()
.await?;
Each tenant can have isolated storage for workflow packages:
// Tenant-specific storage paths
config.registry_storage_path = Some(PathBuf::from("/storage/tenant_a"));
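Schema isolation and storage isolation combine naturally into per-tenant settings: one schema name and one package directory per tenant. The sketch below assumes a hypothetical TenantSettings type and a conservative naming rule (lowercase ASCII letters, digits, and underscores, not starting with a digit) because schema names end up in SQL; neither is part of Cloacina's API.

```rust
use std::path::{Path, PathBuf};

#[derive(Debug, PartialEq)]
pub struct TenantSettings {
    pub schema: String,
    pub storage_path: PathBuf,
}

/// Conservative identifier check (a hypothetical policy).
/// 63 bytes is PostgreSQL's default maximum identifier length.
fn is_safe_schema_name(name: &str) -> bool {
    let mut chars = name.chars();
    match chars.next() {
        Some(c) if c.is_ascii_lowercase() || c == '_' => {}
        _ => return false,
    }
    name.len() <= 63
        && chars.all(|c| c.is_ascii_lowercase() || c.is_ascii_digit() || c == '_')
}

/// Derive a tenant's schema and package storage directory from its ID.
pub fn tenant_settings(storage_root: &Path, tenant_id: &str) -> Result<TenantSettings, String> {
    if !is_safe_schema_name(tenant_id) {
        return Err(format!("unsafe tenant id for schema name: {tenant_id:?}"));
    }
    Ok(TenantSettings {
        schema: tenant_id.to_string(),
        storage_path: storage_root.join(tenant_id),
    })
}
```

Deriving both values from one validated ID keeps a tenant's schema and its package directory from drifting apart.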
Cloacina is organized into separate crates to support both embedded and packaged workflow development:
cloacina/
cloacina-workflow/ # Minimal types for workflow compilation
cloacina-macros/ # Procedural macros (#[task], #[workflow])
cloacina/ # Full runtime (executor, scheduler, database)
The cloacina-workflow crate contains only the types needed to compile workflows:
- Context<T> - Data container for task communication
- Task trait - Interface for task implementations
- TaskError, ContextError - Error types
- RetryPolicy, BackoffStrategy - Retry configuration
- TaskNamespace - Namespace utilities
Dependencies: async-trait, serde, serde_json, thiserror, chrono
Does NOT include: Database drivers (diesel), connection pools, executor, scheduler, libloading
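Because cloacina-workflow carries only plain types, retry configuration is ordinary data plus pure functions, with nothing that needs a database or executor. The sketch below shows a delay calculation over such types; RetryPolicySketch, its fields, and the Backoff variants are hypothetical and do not reproduce the actual RetryPolicy or BackoffStrategy definitions.

```rust
use std::time::Duration;

/// Hypothetical backoff strategies.
#[derive(Debug, Clone, Copy)]
pub enum Backoff {
    Fixed,
    Linear,
    Exponential { multiplier: u32 },
}

/// Hypothetical retry configuration.
#[derive(Debug, Clone, Copy)]
pub struct RetryPolicySketch {
    pub max_attempts: u32,
    pub initial_delay: Duration,
    pub max_delay: Duration,
    pub backoff: Backoff,
}

impl RetryPolicySketch {
    /// `attempt` is the number of attempts already made (1-based).
    /// Returns the delay before the next attempt, or None once max_attempts is reached.
    pub fn delay_for(&self, attempt: u32) -> Option<Duration> {
        if attempt == 0 || attempt >= self.max_attempts {
            return None;
        }
        let base = self.initial_delay;
        let delay = match self.backoff {
            Backoff::Fixed => base,
            Backoff::Linear => base.saturating_mul(attempt),
            Backoff::Exponential { multiplier } => {
                let factor = multiplier.saturating_pow(attempt - 1);
                base.saturating_mul(factor)
            }
        };
        // Never wait longer than the configured ceiling.
        Some(delay.min(self.max_delay))
    }
}
```

With a 100 ms initial delay, a multiplier of 2, and a 350 ms ceiling, the delays after the first three attempts are 100 ms, 200 ms, and 350 ms (400 ms capped).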
The cloacina crate re-exports everything from cloacina-workflow, plus:
- Database backends (PostgreSQL, SQLite)
- DefaultRunner and execution engine
- Workflow registry and package loading
- Cron scheduling
- Multi-tenancy support
| Use Case | Crate |
|---|---|
| Workflow packages | cloacina-workflow (includes macros) |
| Embedded workflows | cloacina |
| Host application | cloacina |
| Aspect | Embedded Workflows | Workflow Packages |
|---|---|---|
| Definition | #[workflow] and #[task] macros | .cloacina archives |
| Registration | Compile-time via macro expansion, seeded into the Runtime at startup | Runtime via registry loading |
| Distribution | Part of application binary | Separate distributable files |
| Loading | Static linking | Dynamic loading via libloading |
| Validation | Compile-time dependency checking | Runtime validation during registration |
| Hot-swapping | Requires recompilation and restart | Runtime replacement possible |
| Storage | In-memory function pointers | Database + filesystem storage |
| Deployment | Application deployment | Independent package deployment |
| Versioning | Application version | Independent package versioning |
| Multi-tenancy | Shared across tenants | Tenant-specific workflow packages possible |
| Performance | Direct function calls | FFI overhead |
| Development | Integrated development cycle | Independent development cycle |
| Compile Time | Full crate dependencies | Minimal dependencies with cloacina-workflow |
Most production systems use both workflow types strategically:
Embedded Workflows for:
- Core business logic
- Application-specific workflows
- Performance-critical paths
- Workflows that rarely change
Workflow Packages for:
- Customer-specific customizations
- Frequently updated workflows
- Multi-tenant scenarios
- External integrations
// Production configuration
use std::time::Duration;

let mut config = DefaultRunnerConfig::default();
config.max_concurrent_tasks = 50;
config.task_timeout = Duration::from_secs(30 * 60); // 30 minutes
config.enable_registry_reconciler = true;
config.enable_cron_scheduling = true;
config.enable_recovery = true;
let runner = DefaultRunner::with_config(&database_url, config).await?;
- Monitoring: Same monitoring for both workflow types
- Logging: Unified logging with namespace identification
- Metrics: Combined metrics collection and reporting
- Backup: Include both database and package storage
- Security: Package validation and namespace isolation
- Start with embedded workflows for core functionality
- Add packaged workflows for customization and extensions
- Use consistent naming across embedded and packaged workflows
- Plan namespace hierarchy before deployment
- Consider tenant isolation requirements early
- Minimize FFI calls in packaged workflows
- Cache package metadata to avoid repeated loading
- Use appropriate buffer sizes for task communication
- Monitor resource usage of both workflow types
- Tune database connection pools for expected load
- Version package dependencies explicitly
- Test package compatibility before deployment
- Monitor package storage disk usage
- Implement gradual rollouts for package updates
- Maintain rollback procedures for package issues