09 - Event Triggers
This tutorial guides you through implementing event-driven workflow execution using Cloacina’s trigger system. Unlike cron scheduling (time-based), event triggers poll user-defined conditions and fire workflows when those conditions are met.
Before starting this tutorial, you should:
- Have completed Tutorial 5 - Cron Scheduling
- Be familiar with creating workflows and tasks using Cloacina’s macro system
- Understand async/await patterns in Rust
- Have Rust toolchain installed (rustc, cargo)
Estimated time: 25-35 minutes
- How to implement the `Trigger` trait for custom conditions in Rust
- How to define triggers in Python using the `@trigger` decorator
- Using `TriggerResult` to control trigger behavior
- Context passing from triggers to workflows
- Deduplication strategies for concurrent executions
- Best practices for trigger implementations
Event triggers complement cron scheduling by providing condition-based workflow execution:
| Feature | Event Triggers | Cron Scheduling |
|---|---|---|
| Activation | Condition-based | Time-based |
| Poll Logic | User-defined | Cron expression |
| Context | Dynamic from trigger | Static |
| Deduplication | Context hash | Time window |
| Use Case | Reactive workflows | Scheduled jobs |
Triggers implement the Trigger trait:
#[async_trait]
pub trait Trigger: Send + Sync {
    /// Unique identifier for this trigger
    fn name(&self) -> &str;

    /// How often to poll this trigger
    fn poll_interval(&self) -> Duration;

    /// Whether to allow concurrent executions
    ///
    /// When `false`, the scheduler deduplicates fires by hashing the
    /// context returned from `poll()` and skipping duplicates.
    fn allow_concurrent(&self) -> bool;

    /// Check condition and return result
    ///
    /// Called on every poll tick; implementations should return quickly
    /// and leave heavy work to the triggered workflow's tasks.
    async fn poll(&self) -> Result<TriggerResult, TriggerError>;
}
The poll() function returns one of:
pub enum TriggerResult {
    /// Don't fire, continue polling
    Skip,
    /// Fire the workflow with optional context
    ///
    /// The context (if any) is handed to the workflow and, when
    /// `allow_concurrent` is false, hashed for deduplication.
    Fire(Option<Context<Value>>),
}
When allow_concurrent = false, the trigger scheduler prevents duplicate executions:
- Context is hashed when `TriggerResult::Fire` is returned
- Active executions are tracked by (trigger_name, context_hash)
- If an execution with the same hash is running, the trigger skips
Create a new Rust project for this tutorial:
cargo new event-triggers-tutorial
cd event-triggers-tutorial
Add the required dependencies to your Cargo.toml:
[package]
name = "event-triggers-tutorial"
version = "0.1.0"
edition = "2021"

[dependencies]
# Workflow engine; sqlite backend + task/workflow macros used in this tutorial.
cloacina = { path = "../cloacina", features = ["sqlite", "macros"] }
# Async runtime for the runner and async trigger polls.
tokio = { version = "1.0", features = ["full"] }
# Structured logging used by tasks and triggers.
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
# JSON values stored in the workflow context.
serde_json = "1.0"
# Timestamps for the simulated filenames.
chrono = "0.4"
# Provides #[async_trait] for the Trigger impls.
async-trait = "0.1"
Let’s create a file watcher trigger that monitors for new files:
// src/triggers.rs
use async_trait::async_trait;
use cloacina::trigger::{Trigger, TriggerError, TriggerResult};
use cloacina::Context;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::Duration;
use tracing::info;
/// Counter for simulating file arrivals; incremented on every poll so that
/// every fifth poll "finds" a file (see `check_for_new_files`).
static FILE_COUNTER: AtomicUsize = AtomicUsize::new(0);
/// A trigger that polls for new files in a directory
#[derive(Debug, Clone)]
pub struct FileWatcherTrigger {
    /// Unique trigger name reported to the scheduler.
    name: String,
    /// How often the scheduler polls this trigger.
    poll_interval: Duration,
    /// Directory being watched; copied into the fired workflow's context.
    watch_path: String,
}
impl FileWatcherTrigger {
pub fn new(name: &str, watch_path: &str, poll_interval: Duration) -> Self {
Self {
name: name.to_string(),
poll_interval,
watch_path: watch_path.to_string(),
}
}
/// Check for new files (simulated for demo)
async fn check_for_new_files(&self) -> Option<String> {
let count = FILE_COUNTER.fetch_add(1, Ordering::SeqCst);
if count % 5 == 4 {
// "Find" a file every 5th poll
let filename = format!("data_{}.csv", chrono::Utc::now().timestamp());
info!("Found new file: {}", filename);
Some(filename)
} else {
None
}
}
}
#[async_trait]
impl Trigger for FileWatcherTrigger {
    fn name(&self) -> &str {
        self.name.as_str()
    }

    fn poll_interval(&self) -> Duration {
        self.poll_interval
    }

    fn allow_concurrent(&self) -> bool {
        false // Don't process same file twice
    }

    /// Fire with a context describing the discovered file, or skip.
    async fn poll(&self) -> Result<TriggerResult, TriggerError> {
        match self.check_for_new_files().await {
            None => Ok(TriggerResult::Skip),
            Some(filename) => {
                let mut ctx = Context::new();
                ctx.insert("filename", serde_json::json!(filename))
                    .map_err(|e| TriggerError::PollError {
                        message: e.to_string(),
                    })?;
                ctx.insert("watch_path", serde_json::json!(self.watch_path.clone()))
                    .map_err(|e| TriggerError::PollError {
                        message: e.to_string(),
                    })?;
                Ok(TriggerResult::Fire(Some(ctx)))
            }
        }
    }
}
Create tasks that will be triggered:
// src/tasks.rs
use cloacina::prelude::*;
use cloacina::task;
use tracing::info;
#[task(id = "validate_file", dependencies = [])]
pub async fn validate_file(
    context: &mut Context<serde_json::Value>,
) -> Result<(), TaskError> {
    // Filename comes from the trigger's context; fall back if absent.
    let filename = match context.get("filename").and_then(|v| v.as_str()) {
        Some(name) => name.to_string(),
        None => "unknown".to_string(),
    };
    info!("Validating file: {}", filename);
    // Simulate validation work.
    tokio::time::sleep(std::time::Duration::from_millis(100)).await;
    // Record the outcome for downstream tasks.
    context.insert("validated", serde_json::json!(true))?;
    info!("File '{}' validated", filename);
    Ok(())
}
#[task(id = "process_file", dependencies = ["validate_file"])]
pub async fn process_file(
    context: &mut Context<serde_json::Value>,
) -> Result<(), TaskError> {
    // Runs only after validate_file (declared dependency above).
    let filename = context
        .get("filename")
        .and_then(|v| v.as_str())
        .map(str::to_string)
        .unwrap_or_else(|| "unknown".to_string());
    info!("Processing file: {}", filename);
    // Simulate processing work.
    tokio::time::sleep(std::time::Duration::from_millis(200)).await;
    context.insert("processed", serde_json::json!(true))?;
    info!("File '{}' processed", filename);
    Ok(())
}
Set up the runner with trigger scheduling enabled:
// src/main.rs
use cloacina::runner::{DefaultRunner, DefaultRunnerConfig};
use cloacina::trigger::register_trigger;
use cloacina::workflow;
use std::time::Duration;
use tracing::info;
mod tasks;
mod triggers;
use tasks::*;
use triggers::*;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Log at info level so trigger and task activity is visible.
    tracing_subscriber::fmt()
        .with_env_filter("info")
        .init();

    // Configure runner with trigger scheduling
    let mut config = DefaultRunnerConfig::default();
    config.enable_trigger_scheduling = true;
    config.trigger_base_poll_interval = Duration::from_secs(1);

    // SQLite file-backed database; mode=rwc creates it if missing.
    let runner = DefaultRunner::with_config(
        "sqlite://triggers.db?mode=rwc",
        config,
    ).await?;

    // Create workflow
    let _workflow = workflow! {
        name: "file_processing",
        tasks: [validate_file, process_file]
    };

    // Create and register trigger
    let trigger = FileWatcherTrigger::new(
        "file_watcher",
        "/data/inbox",
        Duration::from_secs(2),
    );
    register_trigger(trigger.clone());

    // Register trigger schedule with DAL — links the trigger name to the
    // workflow it should fire and records its poll interval.
    let dal = runner.dal();
    dal.trigger_schedule().upsert(
        cloacina::models::trigger_schedule::NewTriggerSchedule::new(
            "file_watcher",
            "file_processing",
            Duration::from_secs(2),
        )
    ).await?;

    info!("Trigger registered. Running for 30 seconds...");
    // Let the scheduler poll and fire for a while, then stop cleanly.
    tokio::time::sleep(Duration::from_secs(30)).await;
    runner.shutdown().await?;
    Ok(())
}
Fire when a queue exceeds a threshold:
/// Fires a drain workflow when a queue's depth reaches a threshold.
#[derive(Debug, Clone)]
pub struct QueueDepthTrigger {
    /// Unique trigger name.
    name: String,
    /// Queue whose depth is polled.
    queue_name: String,
    /// Fire when the observed depth reaches this value.
    threshold: usize,
    /// Polling cadence.
    poll_interval: Duration,
}
#[async_trait]
impl Trigger for QueueDepthTrigger {
    // ... other methods

    /// Multiple drain workflows may run at the same time.
    fn allow_concurrent(&self) -> bool {
        true // Allow parallel queue draining
    }

    /// Fire whenever the observed depth reaches the threshold.
    async fn poll(&self) -> Result<TriggerResult, TriggerError> {
        let depth = self.get_queue_depth().await;
        if depth >= self.threshold {
            let mut ctx = Context::new();
            // NOTE(review): `?` here assumes the insert error converts into
            // TriggerError via `From` — confirm, or map_err as in FileWatcherTrigger.
            ctx.insert("queue_depth", serde_json::json!(depth))?;
            Ok(TriggerResult::Fire(Some(ctx)))
        } else {
            Ok(TriggerResult::Skip)
        }
    }
}
Fire recovery workflow after consecutive failures:
/// Fires a recovery workflow after consecutive failed health checks.
#[derive(Debug, Clone)]
pub struct HealthCheckTrigger {
    /// Unique trigger name.
    name: String,
    /// Service whose health is polled.
    service_name: String,
    /// Fire after this many consecutive failures.
    failure_threshold: usize,
    /// Shared failure counter — Arc so cloned triggers see the same count.
    consecutive_failures: Arc<AtomicUsize>,
    /// Polling cadence.
    poll_interval: Duration,
}
#[async_trait]
impl Trigger for HealthCheckTrigger {
    /// Fire only after `failure_threshold` consecutive failed checks;
    /// any successful check resets the streak.
    async fn poll(&self) -> Result<TriggerResult, TriggerError> {
        if self.check_service_health().await {
            // Healthy: reset the streak and keep polling.
            self.consecutive_failures.store(0, Ordering::SeqCst);
            return Ok(TriggerResult::Skip);
        }
        // fetch_add returns the previous value, so +1 gives the current streak.
        let failures = self.consecutive_failures.fetch_add(1, Ordering::SeqCst) + 1;
        if failures >= self.failure_threshold {
            // Reset so the recovery workflow is not re-fired on every poll.
            self.consecutive_failures.store(0, Ordering::SeqCst);
            let mut ctx = Context::new();
            ctx.insert("service_name", serde_json::json!(self.service_name))?;
            ctx.insert("consecutive_failures", serde_json::json!(failures))?;
            Ok(TriggerResult::Fire(Some(ctx)))
        } else {
            Ok(TriggerResult::Skip)
        }
    }
}
// Good: Quick check
// Poll bodies run on every tick, so keep them to a cheap existence test.
async fn poll(&self) -> Result<TriggerResult, TriggerError> {
    if file_exists(&self.path).await? {
        Ok(TriggerResult::Fire(Some(ctx)))
    } else {
        Ok(TriggerResult::Skip)
    }
}
// Bad: Heavy processing in poll
// Expensive work belongs in the triggered workflow's tasks, not in poll().
async fn poll(&self) -> Result<TriggerResult, TriggerError> {
    let data = download_large_file().await?; // Don't do this!
    process_data(&data).await?;
    Ok(TriggerResult::Fire(None))
}
// Good: Context identifies the specific item
// Distinct contexts hash differently, so deduplication works per item.
ctx.insert("filename", json!(filename))?;
ctx.insert("file_hash", json!(hash))?;

// Bad: No identifying information
// Every fire hashes identically, so deduplication collapses them all.
Ok(TriggerResult::Fire(None)) // All fires look identical!
- `allow_concurrent = false`: File processing, service recovery
- `allow_concurrent = true`: Queue processing, parallel scaling
// Treat transient check errors as Skip so the trigger keeps polling
// instead of dying on the first failure.
async fn poll(&self) -> Result<TriggerResult, TriggerError> {
    match self.check_condition().await {
        Ok(true) => Ok(TriggerResult::Fire(None)),
        Ok(false) => Ok(TriggerResult::Skip),
        Err(e) => {
            tracing::warn!("Check failed: {}", e);
            Ok(TriggerResult::Skip) // Continue polling
        }
    }
}
You can define triggers entirely in Python using the @trigger decorator:
import cloaca
import random

# Create a workflow for the trigger to activate
with cloaca.WorkflowBuilder("data_processor") as builder:
    builder.description("Process incoming data")

    @cloaca.task(id="process_data")
    def process_data(context):
        # Filename is supplied by the trigger's context; default if absent.
        filename = context.get("filename", "unknown")
        # Fixed: interpolate the actual filename instead of a literal placeholder.
        print(f"Processing: {filename}")
        context.set("processed", True)
        return context

# Define a trigger using the @trigger decorator
@cloaca.trigger(
    workflow="data_processor",
    name="file_watcher",
    poll_interval="5s",
    allow_concurrent=False
)
def file_watcher():
    # Check for new files (simulated): fire on roughly 1 in 10 polls.
    if random.randint(1, 10) == 5:
        ctx = cloaca.Context({"filename": "data_123.csv"})
        return cloaca.TriggerResult.fire(ctx)
    return cloaca.TriggerResult.skip()
The TriggerResult class has two static methods:
- `TriggerResult.skip()` - Condition not met, continue polling
- `TriggerResult.fire(context=None)` - Condition met, trigger the workflow
# Skip - don't fire, continue polling
result = cloaca.TriggerResult.skip()
assert result.is_skip_result() == True

# Fire without context
result = cloaca.TriggerResult.fire()
assert result.is_fire_result() == True

# Fire with context
# The context is delivered to the workflow and hashed for deduplication.
ctx = cloaca.Context({"key": "value"})
result = cloaca.TriggerResult.fire(ctx)
You can also manage triggers programmatically:
import cloaca

runner = cloaca.DefaultRunner("sqlite://triggers.db")

# List all triggers
schedules = runner.list_trigger_schedules()
for schedule in schedules:
    print(f"{schedule['trigger_name']}: {schedule['enabled']}")

# Enable/disable triggers
# Disabling stops polling without deleting the schedule.
runner.set_trigger_enabled("file_watcher", False)

# View execution history
history = runner.get_trigger_execution_history("file_watcher")
for execution in history:
    print(f"Started: {execution['started_at']}")
You’ve learned how to:
- Implement the `Trigger` trait for custom conditions in Rust
- Define triggers in Python using the `@trigger` decorator
- Use `TriggerResult.skip()` and `TriggerResult.fire()` to control trigger behavior
- Pass context from triggers to workflows
- Use deduplication to prevent duplicate executions
- Apply common trigger patterns
- Manage triggers programmatically from Python
- Review the Trigger Rules explanation for deeper understanding
- Check `examples/features/event-triggers/` for a full working example