Workflow
Rust Implementation: cloacina::python::workflow::PyWorkflowBuilder
Python wrapper for WorkflowBuilder
new(name: str, tenant: Optional[str], package: Optional[str], workflow: Optional[str]) -> Self
Rust Implementation: cloacina::python::workflow::PyWorkflowBuilder::new
Create a new WorkflowBuilder with namespace context
Parameters:
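| Name | Type | Description |
|---|---|---|
| name | str | Workflow name passed to the underlying builder; also the default workflow ID. |
| tenant | Optional[str] | Tenant ID; defaults to `"public"`. |
| package | Optional[str] | Package name; defaults to `"embedded"`. |
| workflow | Optional[str] | Workflow ID; defaults to `name`. |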
Source
pub fn new(
name: &str,
tenant: Option<&str>,
package: Option<&str>,
workflow: Option<&str>,
) -> Self {
let context = PyWorkflowContext::new(
tenant.unwrap_or("public"),
package.unwrap_or("embedded"),
workflow.unwrap_or(name),
);
let (tenant_id, _package_name, _workflow_id) = context.as_components();
let workflow_builder = crate::Workflow::builder(name).tenant(tenant_id);
PyWorkflowBuilder {
inner: workflow_builder,
context,
}
}
description(description: str)
Rust Implementation: cloacina::python::workflow::PyWorkflowBuilder::description
Set the workflow description
Parameters:
| Name | Type | Description |
|---|---|---|
| description | str | Description text to set on the workflow. |
Source
/// Sets the workflow description on the wrapped builder.
///
/// The inner builder is cloned, updated, and then stored back in place.
pub fn description(&mut self, description: &str) {
    let updated = self.inner.clone().description(description);
    self.inner = updated;
}
tag(key: str, value: str)
Rust Implementation: cloacina::python::workflow::PyWorkflowBuilder::tag
Add a tag to the workflow
Parameters:
| Name | Type | Description |
|---|---|---|
| key | str | Tag key. |
| value | str | Tag value. |
Source
/// Adds a `key`/`value` tag to the wrapped builder.
///
/// The inner builder is cloned, updated, and then stored back in place.
pub fn tag(&mut self, key: &str, value: &str) {
    let updated = self.inner.clone().tag(key, value);
    self.inner = updated;
}
add_task(task: Any) -> None
Rust Implementation: cloacina::python::workflow::PyWorkflowBuilder::add_task
Add a task to the workflow by ID or function reference
Parameters:
| Name | Type | Description |
|---|---|---|
| task | Any | Either a task ID string, or an object whose `__name__` attribute is the registered task ID. |
Source
/// Adds a registered task to the workflow, identified by ID or by function object.
///
/// `task` is first tried as a string task ID. If it is not a string, its
/// `__name__` attribute is used as the task ID instead. The ID is combined with
/// this builder's tenant, package and workflow components to form the namespace
/// looked up in the global task registry.
///
/// # Errors
///
/// Returns a `PyValueError` when:
/// - `task` is neither a string nor an object with a `__name__` attribute,
/// - `__name__` cannot be read or is not a string,
/// - no task is registered under the resulting namespace, or
/// - the underlying builder rejects the task.
pub fn add_task(&mut self, py: Python, task: PyObject) -> PyResult<()> {
    // Step 1: work out which task name to look up.
    let task_name: String = match task.extract::<String>(py) {
        Ok(task_id) => task_id,
        Err(_) => {
            let has_name = task.bind(py).hasattr("__name__").map_err(|e| {
                PyValueError::new_err(format!(
                    "Failed to check if object has __name__ attribute: {}",
                    e
                ))
            })?;
            if !has_name {
                return Err(PyValueError::new_err(
                    "Task must be either a string task ID or a function object with __name__ attribute",
                ));
            }

            let name_obj = task.getattr(py, "__name__").map_err(|e| {
                PyValueError::new_err(format!(
                    "Failed to get __name__ from function: {}",
                    e
                ))
            })?;
            name_obj.extract::<String>(py).map_err(|e| {
                PyValueError::new_err(format!(
                    "Function has __name__ but it's not a string: {}",
                    e
                ))
            })?
        }
    };

    // Step 2: resolve the constructor in the registry under this builder's namespace.
    let registry = crate::task::global_task_registry();
    let (tenant_id, package_name, workflow_id) = self.context.as_components();
    let task_namespace =
        crate::TaskNamespace::new(tenant_id, package_name, workflow_id, &task_name);
    let guard = registry.read();
    let constructor = match guard.get(&task_namespace) {
        Some(found) => found,
        None => {
            return Err(PyValueError::new_err(format!(
                "Task '{}' not found in registry. Make sure it was decorated with @task.",
                task_name
            )));
        }
    };

    // Step 3: instantiate the task and hand it to a clone of the builder,
    // which then replaces the stored builder.
    let task_instance = constructor();
    let updated = self
        .inner
        .clone()
        .add_task(task_instance)
        .map_err(|e| PyValueError::new_err(format!("Failed to add task: {}", e)))?;
    self.inner = updated;
    Ok(())
}
build() -> PyWorkflow
Rust Implementation: cloacina::python::workflow::PyWorkflowBuilder::build
Build the workflow
Source
/// Builds a `PyWorkflow` from a clone of the current builder state.
///
/// # Errors
///
/// Returns a `PyValueError` if the underlying build fails.
pub fn build(&self) -> PyResult<PyWorkflow> {
    match self.inner.clone().build() {
        Ok(workflow) => Ok(PyWorkflow { inner: workflow }),
        Err(e) => Err(PyValueError::new_err(format!(
            "Failed to build workflow: {}",
            e
        ))),
    }
}
__enter__(slf: PyRef<Self>) -> PyRef<Self>
Rust Implementation: cloacina::python::workflow::PyWorkflowBuilder::enter
Context manager entry - establish workflow context for task decorators
Parameters:
| Name | Type | Description |
|---|---|---|
| slf | PyRef<Self> | The builder itself; it is returned unchanged after its context is pushed. |
Source
/// Context-manager entry: pushes a clone of this builder's workflow context
/// and returns the builder itself.
pub fn __enter__(slf: PyRef<Self>) -> PyRef<Self> {
    let context = slf.context.clone();
    push_workflow_context(context);
    slf
}
__exit__(_py: Python, _exc_type: Optional[Any], _exc_value: Optional[Any], _traceback: Optional[Any]) -> bool
Rust Implementation: cloacina::python::workflow::PyWorkflowBuilder::exit
Context manager exit - clean up context and build workflow
Parameters:
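| Name | Type | Description |
|---|---|---|
| _py | Python | Python GIL token; unused. |
| _exc_type | Optional[Any] | Exception type passed by the `with` statement; ignored. |
| _exc_value | Optional[Any] | Exception value passed by the `with` statement; ignored. |
| _traceback | Optional[Any] | Traceback passed by the `with` statement; ignored. |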
Source
/// Context-manager exit: pops the workflow context, then assembles, validates
/// and registers the workflow.
///
/// A fresh workflow is created from this builder's workflow ID, tenant and
/// package, and inherits the description and tags set on the builder. Every
/// task in the global registry whose namespace matches all three components is
/// instantiated and added. The finalized workflow is registered under its own
/// name with a constructor that returns clones of it.
///
/// Always returns `Ok(false)` on success.
///
/// NOTE(review): the exception arguments are never inspected, so the workflow
/// is built and registered even when the `with` body raised — confirm this is
/// intended.
///
/// # Errors
///
/// Returns a `PyValueError` if a task cannot be added or validation fails.
pub fn __exit__(
    &mut self,
    _py: Python,
    _exc_type: Option<&Bound<PyAny>>,
    _exc_value: Option<&Bound<PyAny>>,
    _traceback: Option<&Bound<PyAny>>,
) -> PyResult<bool> {
    pop_workflow_context();

    let (tenant_id, package_name, workflow_id) = self.context.as_components();

    let mut workflow = crate::Workflow::new(workflow_id);
    workflow.set_tenant(tenant_id);
    workflow.set_package(package_name);

    // Preserve description and tags set on the builder during the `with` block
    if let Some(desc) = self.inner.get_description() {
        workflow.set_description(desc);
    }
    for (key, value) in self.inner.get_tags() {
        workflow.add_tag(key, value);
    }

    let registry = crate::task::global_task_registry();
    {
        // The read guard is confined to this scope so it is released before
        // validation and registration run.
        let guard = registry.read();
        for (namespace, constructor) in guard.iter() {
            let in_this_workflow = namespace.tenant_id == tenant_id
                && namespace.package_name == package_name
                && namespace.workflow_id == workflow_id;
            if !in_this_workflow {
                continue;
            }

            let task_instance = constructor();
            workflow
                .add_task(task_instance)
                .map_err(|e| PyValueError::new_err(format!("Failed to add task: {}", e)))?;
        }
    }

    if let Err(e) = workflow.validate() {
        return Err(PyValueError::new_err(format!(
            "Workflow validation failed: {}",
            e
        )));
    }

    let registered = workflow.finalize();
    let registered_name = registered.name().to_string();
    crate::workflow::register_workflow_constructor(registered_name, move || registered.clone());

    Ok(false)
}
__repr__() -> str
Rust Implementation: cloacina::python::workflow::PyWorkflowBuilder::repr
String representation
Source
/// Returns a short representation of the form `WorkflowBuilder(name='...')`.
pub fn __repr__(&self) -> String {
    let name = self.inner.name();
    format!("WorkflowBuilder(name='{}')", name)
}
Rust Implementation: cloacina::python::workflow::PyWorkflow
Python wrapper for Workflow
name() -> str
Rust Implementation: cloacina::python::workflow::PyWorkflow::name
Get workflow name
Source
/// Returns the name of the wrapped workflow.
pub fn name(&self) -> &str {
    let workflow = &self.inner;
    workflow.name()
}
description() -> str
Rust Implementation: cloacina::python::workflow::PyWorkflow::description
Get workflow description
Source
/// Returns a copy of the workflow description from its metadata, or an empty
/// string when no description is set.
pub fn description(&self) -> String {
    match &self.inner.metadata().description {
        Some(text) => text.clone(),
        None => String::new(),
    }
}
version() -> str
Rust Implementation: cloacina::python::workflow::PyWorkflow::version
Get workflow version
Source
/// Returns the version string stored in the workflow metadata.
pub fn version(&self) -> &str {
    let metadata = self.inner.metadata();
    &metadata.version
}
topological_sort() -> List[str]
Rust Implementation: cloacina::python::workflow::PyWorkflow::topological_sort
Get topological sort of tasks
Source
/// Returns the workflow's topological sort, with each entry converted to a
/// string via `to_string`.
///
/// # Errors
///
/// Returns a `PyValueError` if the underlying sort fails.
pub fn topological_sort(&self) -> PyResult<Vec<String>> {
    match self.inner.topological_sort() {
        Ok(namespaces) => Ok(namespaces.into_iter().map(|ns| ns.to_string()).collect()),
        Err(e) => Err(PyValueError::new_err(format!(
            "Failed to sort tasks: {}",
            e
        ))),
    }
}
get_execution_levels() -> List[List[str]]
Rust Implementation: cloacina::python::workflow::PyWorkflow::get_execution_levels
Get execution levels (tasks that can run in parallel)
Source
/// Returns the workflow's execution levels, with each entry of each level
/// converted to a string via `to_string`.
///
/// # Errors
///
/// Returns a `PyValueError` if the levels cannot be computed.
pub fn get_execution_levels(&self) -> PyResult<Vec<Vec<String>>> {
    let levels = self.inner.get_execution_levels().map_err(|e| {
        PyValueError::new_err(format!("Failed to get execution levels: {}", e))
    })?;

    let rendered: Vec<Vec<String>> = levels
        .into_iter()
        .map(|level| {
            level
                .into_iter()
                .map(|ns| ns.to_string())
                .collect::<Vec<String>>()
        })
        .collect();
    Ok(rendered)
}
get_roots() -> List[str]
Rust Implementation: cloacina::python::workflow::PyWorkflow::get_roots
Get root tasks (no dependencies)
Source
/// Returns the workflow's root tasks, each converted to a string via `to_string`.
pub fn get_roots(&self) -> Vec<String> {
    let roots = self.inner.get_roots();
    roots.into_iter().map(|ns| ns.to_string()).collect()
}
get_leaves() -> List[str]
Rust Implementation: cloacina::python::workflow::PyWorkflow::get_leaves
Get leaf tasks (no dependents)
Source
/// Returns the workflow's leaf tasks, each converted to a string via `to_string`.
pub fn get_leaves(&self) -> Vec<String> {
    let leaves = self.inner.get_leaves();
    leaves.into_iter().map(|ns| ns.to_string()).collect()
}
validate() -> None
Rust Implementation: cloacina::python::workflow::PyWorkflow::validate
Validate the workflow
Source
/// Validates the wrapped workflow.
///
/// # Errors
///
/// Returns a `PyValueError` carrying the underlying validation error message.
pub fn validate(&self) -> PyResult<()> {
    match self.inner.validate() {
        Ok(()) => Ok(()),
        Err(e) => Err(PyValueError::new_err(format!(
            "Workflow validation failed: {}",
            e
        ))),
    }
}
__repr__() -> str
Rust Implementation: cloacina::python::workflow::PyWorkflow::repr
String representation
Source
/// Returns a short representation of the form `Workflow(name='...', tasks=N)`,
/// where `N` is the number of task IDs in the workflow.
pub fn __repr__(&self) -> String {
    let name = self.inner.name();
    let task_count = self.inner.get_task_ids().len();
    format!("Workflow(name='{}', tasks={})", name, task_count)
}
register_workflow_constructor(name: str, constructor: Any) -> None
Rust Implementation: cloacina::python::workflow::register_workflow_constructor
Register a workflow constructor function
Parameters:
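| Name | Type | Description |
|---|---|---|
| name | str | Name under which the workflow constructor is registered. |
| constructor | Any | Zero-argument callable that returns a `Workflow`. |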
Source
/// Registers a workflow constructor supplied from Python.
///
/// `constructor` is called once, with no arguments, while holding the GIL. Its
/// result is extracted as a `PyWorkflow`, and the wrapped workflow is then
/// registered under `name` with a Rust closure that returns clones of it.
///
/// # Errors
///
/// Returns a `PyValueError` if the call fails or the returned object cannot be
/// extracted as a `PyWorkflow`.
pub fn register_workflow_constructor(name: String, constructor: PyObject) -> PyResult<()> {
    Python::with_gil(|py| {
        let produced = match constructor.call0(py) {
            Ok(obj) => obj,
            Err(e) => {
                return Err(PyValueError::new_err(format!(
                    "Failed to call workflow constructor: {}",
                    e
                )));
            }
        };

        let wrapper: PyWorkflow = match produced.extract(py) {
            Ok(extracted) => extracted,
            Err(e) => {
                return Err(PyValueError::new_err(format!(
                    "Failed to extract workflow from constructor: {}",
                    e
                )));
            }
        };

        // The registered closure hands out clones of this captured workflow.
        let prototype = wrapper.inner.clone();
        crate::workflow::register_workflow_constructor(name, move || prototype.clone());
        Ok(())
    })
}