08 - Packaged Triggers
In this tutorial, you’ll learn how to define event-driven triggers alongside your Python workflows so they can be packaged and deployed to the Cloacina daemon. While Tutorial 7 introduced triggers running directly in your Python process, this tutorial focuses on the packaging story — how triggers and workflows are declared, bundled, and auto-registered when loaded by the reconciler (the daemon component that discovers, validates, and registers packages).
- Define triggers alongside tasks in a packaged workflow
- Understand the relationship between @cloaca.trigger decorators and manifest declarations
- Package a trigger-bearing workflow as a .cloacina archive
- See how the reconciler wires triggers to workflows on load
- Completion of Tutorial 7: Event Triggers
- Familiarity with the daemon (see Running the Daemon)
20-25 minutes
Start with a workflow that the trigger will fire. Create a file called data_ingest/__init__.py:
import cloaca
from datetime import datetime

with cloaca.WorkflowBuilder("data_ingest") as builder:
    builder.description("Ingest data files detected by trigger")

    @cloaca.task(id="validate")
    def validate(context):
        """Validate the incoming data file."""
        filename = context.get("filename", "unknown")
        print(f" Validating: {filename}")
        context.set("valid", True)
        return context

    @cloaca.task(id="load", dependencies=["validate"])
    def load(context):
        """Load validated data into the warehouse."""
        filename = context.get("filename", "unknown")
        print(f" Loading: {filename}")
        context.set("loaded_at", datetime.now().isoformat())
        return context
This is a standard workflow — nothing special about triggers yet.
Add a trigger at module level, outside the WorkflowBuilder context. Triggers are registered independently from the workflow’s task graph:
@cloaca.trigger(
    name="inbox_watcher",
    poll_interval="5s",
    allow_concurrent=False
)
def inbox_watcher():
    """
    Poll for new files in the inbox directory.

    Returns TriggerResult.fire() with context when a new file
    is detected, or TriggerResult.skip() when nothing is found.
    """
    # In a real trigger, you'd check a filesystem, API, queue, etc.
    import os

    inbox = os.environ.get("INBOX_PATH", "/data/inbox/")
    new_files = [f for f in os.listdir(inbox) if f.endswith(".parquet")]

    if new_files:
        filename = new_files[0]
        ctx = cloaca.Context({
            "filename": filename,
            "trigger_name": "inbox_watcher",
            "triggered_at": datetime.now().isoformat(),
        })
        return cloaca.TriggerResult.fire(ctx)

    return cloaca.TriggerResult.skip()
Notice the three parameters: name identifies the trigger (and must match the manifest declaration), poll_interval controls how often the function is called, and allow_concurrent=False prevents overlapping executions. See the Package Manifest Reference for the full field listing.
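The placeholder listing inside inbox_watcher has two rough edges: os.listdir raises if the inbox directory does not exist, and it returns entries in arbitrary order, so new_files[0] is not necessarily the oldest file. The sketch below shows one way to harden that step using only the standard library. The helper name pending_files and its suffix parameter are illustrative assumptions and are not part of the cloaca API.

```python
import os


def pending_files(inbox, suffix=".parquet"):
    """Return matching file names in `inbox`, oldest first.

    Hypothetical helper for illustration only. A missing inbox
    directory is treated as "nothing to do" rather than an error.
    """
    try:
        with os.scandir(inbox) as entries:
            # Keep regular files only; skip subdirectories and other suffixes.
            matches = [
                (entry.stat().st_mtime, entry.name)
                for entry in entries
                if entry.is_file() and entry.name.endswith(suffix)
            ]
    except FileNotFoundError:
        return []

    # Sort by modification time, then by name for a stable order.
    matches.sort()
    return [name for _, name in matches]
```

Inside the trigger body, new_files = pending_files(inbox) could stand in for the list comprehension. The fire and skip branches stay the same.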
When this workflow is packaged as a .cloacina archive, the manifest (manifest.json) declares both the tasks and the triggers. Here’s what the triggers section looks like:
{
  "triggers": [
    {
      "name": "inbox_watcher",
      "trigger_type": "python",
      "workflow": "data_ingest",
      "poll_interval": "5s",
      "allow_concurrent": false,
      "config": { "path": "/data/inbox/" }
    }
  ]
}
The name must match your @cloaca.trigger(name=...) value exactly, and workflow tells the reconciler which workflow to fire. See the Package Manifest Reference for the complete schema.
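Because a name mismatch only surfaces when the package is loaded, it can be worth checking before packaging. The following is a minimal sketch of such a check using the standard json module. The function name unmatched_trigger_names is hypothetical, and the list of decorator names is passed in by hand, since this tutorial does not describe a way to enumerate registered triggers.

```python
import json


def unmatched_trigger_names(manifest_text, decorated_names):
    """Compare manifest trigger names with decorator names.

    Returns a pair of sorted lists:
    (declared in the manifest only, used in decorators only).
    Both lists are empty when the two sides agree.
    """
    manifest = json.loads(manifest_text)
    declared = {trigger["name"] for trigger in manifest.get("triggers", [])}
    decorated = set(decorated_names)
    return sorted(declared - decorated), sorted(decorated - declared)


# A trimmed copy of the manifest fragment shown above.
example_manifest = """
{
  "triggers": [
    {"name": "inbox_watcher", "trigger_type": "python", "workflow": "data_ingest"}
  ]
}
"""

only_in_manifest, only_in_code = unmatched_trigger_names(
    example_manifest, ["inbox_watcher"]
)
if only_in_manifest or only_in_code:
    print("Trigger name mismatch:", only_in_manifest, only_in_code)
```

The comparison is an exact string match, mirroring the requirement that the two names agree exactly.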
Create a pyproject.toml for the package:
[project]
name = "data-ingest"
version = "1.0.0"
description = "File ingestion workflow with inbox watcher trigger"
requires-python = ">=3.10"
[tool.cloaca]
entry_module = "data_ingest"
The entry_module tells the loader which Python module to import for task and trigger discovery.
Before packaging, test the trigger and workflow in library mode:
def test_trigger_and_workflow():
    """Simulate what the daemon does on trigger fire."""
    runner = cloaca.DefaultRunner(":memory:")
    try:
        # Simulate a trigger poll that fires
        result = inbox_watcher()
        if result.is_fire_result():
            # Execute the workflow with the trigger's context
            context = cloaca.Context({
                "filename": "orders_20260328.parquet",
                "trigger_name": "inbox_watcher",
            })
            wf_result = runner.execute("data_ingest", context)
            print(f"Workflow status: {wf_result.status}")
    finally:
        runner.shutdown()
Copy your .cloacina package into the daemon’s watch directory. The reconciler will import your module, match your @cloaca.trigger decorator to the manifest declaration, and start polling automatically.
cp data-ingest-1.0.0.cloacina ~/.cloacina/packages/
Name Agreement: The name in @cloaca.trigger(name="inbox_watcher") must match the name in the manifest’s triggers array. If they disagree, the reconciler will reject the package.
- @cloaca.trigger provides the poll implementation; the manifest declares it for the reconciler
- Both must agree on the trigger name
- Triggers are packaged alongside tasks in the same .cloacina archive
- The reconciler wires them together on package load
- Computation Graphs — reactive, event-driven processing
- Package Manifest Reference — full manifest schema
- Running the Daemon — deploy your package