Macro Reference
Cloacina provides three procedural attribute macros for defining tasks, workflows, and triggers. These macros generate trait implementations, registration code, and compile-time validation.
use cloacina::{task, workflow, Context, TaskError};
use serde_json::Value;
Applied to an async fn to define a task with retry policies, dependency declarations, trigger rules, and lifecycle callbacks.
#[task(
id = "my_task",
dependencies = ["dep_a", "dep_b"],
retry_attempts = 3,
retry_backoff = "exponential",
retry_delay_ms = 1000,
retry_max_delay_ms = 30000,
retry_condition = "all",
retry_jitter = true,
trigger_rules = always,
on_success = my_success_handler,
on_failure = my_failure_handler,
)]
pub async fn my_task(context: &mut Context<Value>) -> Result<(), TaskError> {
Ok(())
}
| Attribute | Type | Required | Default | Description |
|---|---|---|---|---|
id |
string literal | yes | – | Unique identifier for the task within its workflow. Used for dependency references. |
dependencies |
array of string literals | no | [] |
List of task IDs that must complete before this task runs. |
retry_attempts |
integer | no | 3 |
Maximum number of retry attempts on failure. |
retry_backoff |
string literal | no | "exponential" |
Backoff strategy between retries. See Backoff Strategies. |
retry_delay_ms |
integer | no | 1000 |
Initial delay between retries in milliseconds. |
retry_max_delay_ms |
integer | no | 30000 |
Maximum delay between retries in milliseconds (caps exponential/linear growth). |
retry_condition |
string literal | no | "all" |
When to retry. See Retry Conditions. |
retry_jitter |
boolean | no | true |
Whether to add random jitter to retry delays to avoid thundering herd. |
trigger_rules |
expression | no | always |
Trigger rule expression controlling when the task should execute. See Trigger Rules. |
on_success |
expression (path) | no | – | Async callback on success. Signature: async fn(&str, &Context<Value>) -> Result<(), E> |
on_failure |
expression (path) | no | – | Async callback on failure. Signature: async fn(&str, &TaskError, &Context<Value>) -> Result<(), E> |
| Value | Behavior |
|---|---|
"fixed" |
Constant delay of retry_delay_ms between every attempt |
"linear" |
Delay increases by retry_delay_ms each attempt (1x, 2x, 3x, …) |
"exponential" |
Delay doubles each attempt (base 2, multiplier 1.0), capped at retry_max_delay_ms |
| Value | Behavior |
|---|---|
"never" |
Never retry, regardless of error type |
"all" |
Retry on all errors |
"transient" |
Retry only on transient errors |
"pattern1,pattern2" |
Retry only when the error message matches one of the comma-separated patterns |
Trigger rules are compile-time expressions that control conditional task execution:
| Expression | Description |
|---|---|
always |
Task always runs when dependencies are satisfied |
task_success("task_id") |
Run only if the named task succeeded |
task_failed("task_id") |
Run only if the named task failed |
task_skipped("task_id") |
Run only if the named task was skipped |
context_value("key", operator, value) |
Run based on a context value comparison |
all(cond1, cond2, ...) |
Run when all conditions are true |
any(cond1, cond2, ...) |
Run when any condition is true |
none(cond1, cond2, ...) |
Run when no conditions are true |
Context value operators: equals, not_equals, greater_than, less_than, contains, not_contains, exists, not_exists
Example:
#[task(
id = "cleanup",
dependencies = ["process"],
trigger_rules = any(
task_failed("process"),
context_value("force_cleanup", equals, true)
)
)]
pub async fn cleanup(context: &mut Context<Value>) -> Result<(), TaskError> {
Ok(())
}
The task function must:
- Have a
contextparameter (or_context) of type&mut Context<Value> - Return
Result<(), TaskError>(or any error type convertible toTaskError) - Be
async(or synchronous – both are supported)
An optional second parameter named handle or task_handle provides access to a TaskHandle for concurrency slot management. When the macro detects a parameter with one of these names, it sets requires_handle() = true on the generated Task trait implementation. The executor then creates a TaskHandle and injects it via task-local storage at runtime.
#[task(id = "wait_for_file")]
pub async fn wait_for_file(
context: &mut Context<Value>,
handle: &mut TaskHandle,
) -> Result<(), TaskError> {
handle.defer_until(
|| async { std::path::Path::new("/data/input.csv").exists() },
Duration::from_secs(5),
).await.map_err(|e| TaskError::ExecutionFailed {
message: format!("defer_until failed: {e}"),
task_id: "wait_for_file".into(),
timestamp: chrono::Utc::now(),
})?;
// File exists -- slot has been reclaimed, proceed with work
Ok(())
}
TaskHandle methods:
| Method | Signature | Description |
|---|---|---|
defer_until |
async fn(&mut self, condition: F, poll_interval: Duration) -> Result<(), ExecutorError> |
Release the concurrency slot, poll condition at poll_interval, reclaim when true |
is_slot_held |
fn(&self) -> bool |
Whether the handle currently holds a concurrency slot |
task_execution_id |
fn(&self) -> UniversalUuid |
The task execution ID for this invocation |
See Task Deferral Architecture for the full lifecycle and Tutorial 10 for a walkthrough.
The macro generates:
- The original function (preserved for direct testing)
- A
{PascalCase}Taskstruct implementingcloacina_workflow::Task - A
{fn_name}_task()constructor function - Static methods:
dependency_task_ids(),code_fingerprint(),create_retry_policy(),trigger_rules()
Applied to a pub mod containing #[task] functions. Auto-discovers tasks, validates dependencies, and generates registration code.
#[workflow(name = "etl_pipeline", description = "Extract, transform, and load data")]
pub mod etl_pipeline {
use super::*;
#[task(id = "extract", dependencies = [])]
pub async fn extract(context: &mut Context<Value>) -> Result<(), TaskError> {
Ok(())
}
#[task(id = "transform", dependencies = ["extract"])]
pub async fn transform(context: &mut Context<Value>) -> Result<(), TaskError> {
Ok(())
}
#[task(id = "load", dependencies = ["transform"])]
pub async fn load(context: &mut Context<Value>) -> Result<(), TaskError> {
Ok(())
}
}
| Attribute | Type | Required | Default | Description |
|---|---|---|---|---|
name |
string literal | yes | – | Unique workflow identifier. Used for registration and execution. |
description |
string literal | no | – | Human-readable description of the workflow. |
tenant |
string literal | no | "public" |
Tenant identifier for multi-tenant deployments. |
author |
string literal | no | – | Author information. |
The #[workflow] macro generates different code depending on compilation features:
| Mode | Feature Flag | Behavior |
|---|---|---|
| Embedded | (default) | Emits inventory::submit! entries for the workflow + each task. cloacina::Runtime::seed_from_inventory() walks those entries at startup and populates the runtime registry. (Pre-I-0096 docs may reference #[ctor]; that path is gone — no ctor dependency is required.) |
| Packaged | features = ["packaged"] |
Generates FFI exports for .cloacina packages. Pair with the cloacina::package!() shell at the cdylib crate root. The workflow is loaded dynamically at runtime by the reconciler. |
The workflow macro performs these validations at compile time:
- Duplicate task IDs: Two tasks with the same
idin the same workflow produce a compile error - Cycle detection: Circular dependencies (e.g., A depends on B, B depends on A) produce a compile error
- Similar name suggestions: If a dependency references a non-existent task, the compiler suggests similar task names
Applied to an async fn to define a trigger that fires a workflow on a schedule or condition. Two modes are available: custom poll triggers and cron triggers.
The function body contains the poll logic. Called at poll_interval frequency.
#[trigger(on = "inbox_processor", poll_interval = "5s")]
pub async fn check_inbox() -> Result<TriggerResult, TriggerError> {
if has_new_messages().await? {
Ok(TriggerResult::Fire(Some(context)))
} else {
Ok(TriggerResult::Skip)
}
}
The cron expression provides the schedule. The function body is ignored (consumed by the macro).
#[trigger(on = "daily_report", cron = "0 2 * * *", timezone = "America/New_York")]
pub async fn nightly_report() {}
| Attribute | Type | Required | Default | Description |
|---|---|---|---|---|
on |
string literal | yes | – | Name of the workflow to trigger. |
poll_interval |
string literal | one of poll_interval or cron |
– | Poll frequency. Format: 100ms, 5s, 2m, 1h. |
cron |
string literal | one of poll_interval or cron |
– | Cron expression (5-7 fields). Validated at compile time. |
timezone |
string literal | no | "UTC" |
IANA timezone for cron evaluation (e.g., "America/New_York"). Only applies to cron triggers. |
allow_concurrent |
boolean | no | false |
Whether multiple trigger firings can overlap. |
name |
string literal | no | function name | Override the trigger name (used for registration and schedule records). |
Validation rules:
- Exactly one of
poll_intervalorcronmust be specified (not both, not neither) - Cron expressions must have 5-7 fields with valid characters (
0-9,,,-,*,/) - Poll interval must use a recognized unit suffix (
ms,s,m,h)
| Suffix | Unit | Example |
|---|---|---|
ms |
Milliseconds | 100ms |
s |
Seconds | 5s |
m |
Minutes | 2m |
h |
Hours | 1h |
Every #[task] function has a code fingerprint – a 16-character hexadecimal hash computed at compile time. The fingerprint is used to detect when a task’s implementation has changed.
The fingerprint includes:
- Function parameter types (excluding parameter names)
- Return type
- Function body (the complete token stream of the block)
- Async-ness (whether the function is
async)
- The function name
- Attributes (retry policy, dependencies, etc.)
- Comments and whitespace (after tokenization)
- Items outside the function body
A new fingerprint is generated when:
- The function body changes (any logic change)
- The function signature changes (parameter types or return type)
- The function changes from sync to async or vice versa
A fingerprint does NOT change when:
- Only the task attributes change (e.g., updating
retry_attempts) - Only the function name changes
- Only comments change
Fingerprints are available via the generated struct:
// Static method
let fp = MyTaskTask::code_fingerprint();
// Via Task trait
let task = my_task_task();
let fp = task.code_fingerprint(); // Returns Option<String>
- Cron Scheduling Architecture – how cron triggers are evaluated
- Errors Reference –
TaskError,TriggerErrorvariants - cloacina-testing API – testing tasks without a database