Trigger
Python trigger support for event-driven workflow execution.
This module provides Python bindings for defining triggers that poll user-defined conditions and fire workflows when those conditions are met.
Rust Implementation: cloacina::python::trigger::PyTriggerResult
Python TriggerResult class - represents the result of a trigger poll.
Use TriggerResult.skip() when the condition is not met.
Use TriggerResult.fire(context=None) when the condition is met.
skip() -> Self
Rust Implementation: cloacina::python::trigger::PyTriggerResult::skip
Create a Skip result - condition not met, continue polling.
Source
/// Builds the "skip" outcome of a trigger poll.
///
/// The returned value has `is_fire` set to `false` and carries no data.
fn skip() -> Self {
    let (is_fire, data) = (false, None);
    PyTriggerResult { is_fire, data }
}
fire(context: Optional[PyContext]) -> Self
Rust Implementation: cloacina::python::trigger::PyTriggerResult::fire
Create a Fire result - condition met, trigger the workflow.
Parameters:
| Name | Type | Description |
|---|---|---|
| context | Optional[PyContext] | Optional context to pass to the workflow |
Source
/// Builds the "fire" outcome of a trigger poll.
///
/// When a context is supplied, its data is copied with `get_data_clone()` and
/// stored on the result; when `context` is `None`, the result carries no data.
fn fire(context: Option<&PyContext>) -> Self {
    PyTriggerResult {
        is_fire: true,
        data: context.map(|ctx| ctx.get_data_clone()),
    }
}
__repr__() -> str
Rust Implementation: cloacina::python::trigger::PyTriggerResult::repr
Source
/// Returns the textual representation of this result.
///
/// * `TriggerResult.Skip` when the result is not a fire.
/// * `TriggerResult.Fire(None)` when it is a fire without data.
/// * `TriggerResult.Fire(<context>)` when it is a fire that carries data.
fn __repr__(&self) -> String {
    let label = match (self.is_fire, self.data.is_some()) {
        // The data is irrelevant for a skip result.
        (false, _) => "TriggerResult.Skip",
        (true, false) => "TriggerResult.Fire(None)",
        (true, true) => "TriggerResult.Fire(<context>)",
    };
    label.to_string()
}
is_fire_result() -> bool
Rust Implementation: cloacina::python::trigger::PyTriggerResult::is_fire_result
Check if this is a Fire result
Source
/// Returns `true` when this result is a Fire result, i.e. its `is_fire`
/// flag is set; returns `false` for a Skip result.
fn is_fire_result(&self) -> bool {
self.is_fire
}
is_skip_result() -> bool
Rust Implementation: cloacina::python::trigger::PyTriggerResult::is_skip_result
Check if this is a Skip result
Source
/// Returns `true` when this result is a Skip result.
///
/// A result is a skip exactly when it is not a fire, so this is the
/// negation of `is_fire_result()`.
fn is_skip_result(&self) -> bool {
    !self.is_fire_result()
}
Rust Implementation: cloacina::python::trigger::TriggerDecorator
Decorator class that holds trigger configuration
__call__(func: Any) -> Any
Rust Implementation: cloacina::python::trigger::TriggerDecorator::call
Source
/// Applies the decorator to `func`, registering it as a trigger.
///
/// The trigger name is the configured `name` if one was given, otherwise the
/// `__name__` attribute of `func`. A constructor closure is registered under
/// that name via `crate::trigger::register_trigger_constructor`; each time it
/// runs it builds a fresh `PythonTriggerWrapper` holding its own reference to
/// the Python function together with the decorator's configuration.
///
/// Returns `func` unchanged.
///
/// # Errors
///
/// When no explicit name is configured, propagates the error from reading
/// `__name__` on `func` or from extracting it as a `String`.
pub fn __call__(&self, py: Python, func: PyObject) -> PyResult<PyObject> {
    // An explicit name wins; otherwise fall back to the function's `__name__`.
    let trigger_name = match &self.name {
        Some(explicit) => explicit.clone(),
        None => func.getattr(py, "__name__")?.extract::<String>(py)?,
    };

    // Owned copies of the configuration, moved into the constructor closure.
    let ctor_name = trigger_name.clone();
    let ctor_workflow = self.workflow.clone();
    let ctor_poll_interval = self.poll_interval;
    let ctor_allow_concurrent = self.allow_concurrent;

    // The callable is shared through an `Arc` so the closure can hand out a
    // new reference on every invocation.
    let callable = Arc::new(func.clone_ref(py));

    crate::trigger::register_trigger_constructor(
        trigger_name.clone(),
        move || -> Arc<dyn Trigger> {
            // Taking a new reference to the Python object is done under the GIL.
            let python_function = Python::with_gil(|py| callable.clone_ref(py));
            Arc::new(PythonTriggerWrapper {
                name: ctor_name.clone(),
                workflow_name: ctor_workflow.clone(),
                poll_interval: ctor_poll_interval,
                allow_concurrent: ctor_allow_concurrent,
                python_function,
            })
        },
    );

    tracing::info!(
        trigger_name = %trigger_name,
        workflow = %self.workflow,
        poll_interval_ms = %self.poll_interval.as_millis(),
        "Registered Python trigger"
    );

    // Decorating leaves the Python function itself untouched.
    Ok(func)
}
trigger(workflow: str, name: Optional[str], poll_interval: str, allow_concurrent: bool) -> TriggerDecorator
Rust Implementation: cloacina::python::trigger::trigger
Python @trigger decorator function
This function is exposed to Python as a decorator that registers Python functions as triggers in the Cloacina trigger scheduler.
Parameters:
| Name | Type | Description |
|---|---|---|
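| workflow | str | Name of the workflow associated with the trigger |
| name | Optional[str] | Trigger name; when omitted, the decorated function's `__name__` is used |
| poll_interval | str | Polling interval as a duration string (e.g. `"5s"`); an unparsable value raises `ValueError` |
| allow_concurrent | bool | Concurrency flag stored on the registered trigger (presumably whether overlapping runs of the workflow are permitted) |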
Examples:
import cloaca
import random


@cloaca.trigger(
    workflow="my_workflow",
    poll_interval="5s",
    allow_concurrent=False
)
def my_trigger():
    """Example poll function: fire "my_workflow" when a random draw hits 42."""
    # Check some condition
    if random.randint(1, 100) == 42:
        # Condition met: hand a context to the workflow.
        ctx = cloaca.Context({"triggered_at": "now"})
        return cloaca.TriggerResult.fire(ctx)
    # Condition not met: keep polling.
    return cloaca.TriggerResult.skip()
Source
/// Creates the decorator object behind the Python `@trigger(...)` call.
///
/// `poll_interval` is parsed with `parse_duration`; the resulting duration is
/// stored on the returned `TriggerDecorator` together with `workflow`, `name`
/// and `allow_concurrent`, which are passed through unchanged.
///
/// # Errors
///
/// Returns a `PyValueError` with the message `Invalid poll_interval: <cause>`
/// when `poll_interval` cannot be parsed.
pub fn trigger(
    workflow: String,
    name: Option<String>,
    poll_interval: &str,
    allow_concurrent: bool,
) -> PyResult<TriggerDecorator> {
    match parse_duration(poll_interval) {
        Ok(interval) => Ok(TriggerDecorator {
            name,
            workflow,
            poll_interval: interval,
            allow_concurrent,
        }),
        Err(e) => Err(PyValueError::new_err(format!(
            "Invalid poll_interval: {}",
            e
        ))),
    }
}