Skip to main content
Cloacina Documentation
Toggle Dark/Light/Auto mode Toggle Dark/Light/Auto mode Toggle Dark/Light/Auto mode Back to homepage

Workflow

cloaca.python.workflow Binding

Classes

cloaca.python.workflow.WorkflowBuilder

Rust Implementation: cloacina::python::workflow::PyWorkflowBuilder

Python wrapper for WorkflowBuilder

Methods

new
new(name: str, tenant: Optional[str], package: Optional[str], workflow: Optional[str]) -> Self

Rust Implementation: cloacina::python::workflow::PyWorkflowBuilder::new

Create a new WorkflowBuilder with namespace context

Parameters:

Name Type Description
name str
tenant Optional[str]
package Optional[str]
workflow Optional[str]
Source
    pub fn new(
        name: &str,
        tenant: Option<&str>,
        package: Option<&str>,
        workflow: Option<&str>,
    ) -> Self {
        let context = PyWorkflowContext::new(
            tenant.unwrap_or("public"),
            package.unwrap_or("embedded"),
            workflow.unwrap_or(name),
        );

        let (tenant_id, _package_name, _workflow_id) = context.as_components();
        let workflow_builder = crate::Workflow::builder(name).tenant(tenant_id);

        PyWorkflowBuilder {
            inner: workflow_builder,
            context,
        }
    }
description
description(description: str)

Rust Implementation: cloacina::python::workflow::PyWorkflowBuilder::description

Set the workflow description

Parameters:

Name Type Description
description str
Source
    pub fn description(&mut self, description: &str) {
        self.inner = self.inner.clone().description(description);
    }
tag
tag(key: str, value: str)

Rust Implementation: cloacina::python::workflow::PyWorkflowBuilder::tag

Add a tag to the workflow

Parameters:

Name Type Description
key str
value str
Source
    pub fn tag(&mut self, key: &str, value: &str) {
        self.inner = self.inner.clone().tag(key, value);
    }
add_task
add_task(task: Any) -> None

Rust Implementation: cloacina::python::workflow::PyWorkflowBuilder::add_task

Add a task to the workflow by ID or function reference

Parameters:

Name Type Description
task Any
Source
    pub fn add_task(&mut self, py: Python, task: PyObject) -> PyResult<()> {
        if let Ok(task_id) = task.extract::<String>(py) {
            let registry = crate::task::global_task_registry();

            let (tenant_id, package_name, workflow_id) = self.context.as_components();
            let task_namespace =
                crate::TaskNamespace::new(tenant_id, package_name, workflow_id, &task_id);
            let guard = registry.read();

            let constructor = guard.get(&task_namespace).ok_or_else(|| {
                PyValueError::new_err(format!(
                    "Task '{}' not found in registry. Make sure it was decorated with @task.",
                    task_id
                ))
            })?;

            let task_instance = constructor();

            self.inner = self
                .inner
                .clone()
                .add_task(task_instance)
                .map_err(|e| PyValueError::new_err(format!("Failed to add task: {}", e)))?;

            Ok(())
        } else {
            match task.bind(py).hasattr("__name__") {
                Ok(true) => {
                    match task.getattr(py, "__name__") {
                        Ok(name_obj) => {
                            match name_obj.extract::<String>(py) {
                                Ok(func_name) => {
                                    let registry = crate::task::global_task_registry();

                                    let (tenant_id, package_name, workflow_id) = self.context.as_components();
                                    let task_namespace = crate::TaskNamespace::new(tenant_id, package_name, workflow_id, &func_name);
                                    let guard = registry.read();

                                    let constructor = guard.get(&task_namespace).ok_or_else(|| {
                                        PyValueError::new_err(format!(
                                            "Task '{}' not found in registry. Make sure it was decorated with @task.",
                                            func_name
                                        ))
                                    })?;

                                    let task_instance = constructor();

                                    self.inner = self.inner.clone().add_task(task_instance)
                                        .map_err(|e| PyValueError::new_err(format!("Failed to add task: {}", e)))?;

                                    Ok(())
                                },
                                Err(e) => {
                                    Err(PyValueError::new_err(format!(
                                        "Function has __name__ but it's not a string: {}",
                                        e
                                    )))
                                }
                            }
                        },
                        Err(e) => {
                            Err(PyValueError::new_err(format!(
                                "Failed to get __name__ from function: {}",
                                e
                            )))
                        }
                    }
                },
                Ok(false) => {
                    Err(PyValueError::new_err(
                        "Task must be either a string task ID or a function object with __name__ attribute"
                    ))
                },
                Err(e) => {
                    Err(PyValueError::new_err(format!(
                        "Failed to check if object has __name__ attribute: {}",
                        e
                    )))
                }
            }
        }
    }
build
build() -> PyWorkflow

Rust Implementation: cloacina::python::workflow::PyWorkflowBuilder::build

Build the workflow

Source
    pub fn build(&self) -> PyResult<PyWorkflow> {
        let workflow = self
            .inner
            .clone()
            .build()
            .map_err(|e| PyValueError::new_err(format!("Failed to build workflow: {}", e)))?;
        Ok(PyWorkflow { inner: workflow })
    }
__enter__
__enter__(slf: PyRef<Self>) -> PyRef<Self>

Rust Implementation: cloacina::python::workflow::PyWorkflowBuilder::enter

Context manager entry - establish workflow context for task decorators

Parameters:

Name Type Description
slf PyRef<Self>
Source
    pub fn __enter__(slf: PyRef<Self>) -> PyRef<Self> {
        push_workflow_context(slf.context.clone());
        slf
    }
__exit__
__exit__(_py: Python, _exc_type: Optional[Any], _exc_value: Optional[Any], _traceback: Optional[Any]) -> bool

Rust Implementation: cloacina::python::workflow::PyWorkflowBuilder::exit

Context manager exit - clean up context and build workflow

Parameters:

Name Type Description
_py Python
_exc_type Optional[Any]
_exc_value Optional[Any]
_traceback Optional[Any]
Source
    pub fn __exit__(
        &mut self,
        _py: Python,
        _exc_type: Option<&Bound<PyAny>>,
        _exc_value: Option<&Bound<PyAny>>,
        _traceback: Option<&Bound<PyAny>>,
    ) -> PyResult<bool> {
        pop_workflow_context();

        let (tenant_id, package_name, workflow_id) = self.context.as_components();

        let mut workflow = crate::Workflow::new(workflow_id);
        workflow.set_tenant(tenant_id);
        workflow.set_package(package_name);

        // Preserve description and tags set on the builder during the `with` block
        if let Some(desc) = self.inner.get_description() {
            workflow.set_description(desc);
        }
        for (key, value) in self.inner.get_tags() {
            workflow.add_tag(key, value);
        }

        let registry = crate::task::global_task_registry();
        let guard = registry.read();

        for (namespace, constructor) in guard.iter() {
            if namespace.tenant_id == tenant_id
                && namespace.package_name == package_name
                && namespace.workflow_id == workflow_id
            {
                let task_instance = constructor();
                workflow
                    .add_task(task_instance)
                    .map_err(|e| PyValueError::new_err(format!("Failed to add task: {}", e)))?;
            }
        }

        drop(guard);

        workflow
            .validate()
            .map_err(|e| PyValueError::new_err(format!("Workflow validation failed: {}", e)))?;
        let final_workflow = workflow.finalize();

        let workflow_name = final_workflow.name().to_string();
        crate::workflow::register_workflow_constructor(workflow_name, move || {
            final_workflow.clone()
        });

        Ok(false)
    }
__repr__
__repr__() -> str

Rust Implementation: cloacina::python::workflow::PyWorkflowBuilder::repr

String representation

Source
    pub fn __repr__(&self) -> String {
        format!("WorkflowBuilder(name='{}')", self.inner.name())
    }

cloaca.python.workflow.Workflow

Rust Implementation: cloacina::python::workflow::PyWorkflow

Python wrapper for Workflow

Methods

name
name() -> str

Rust Implementation: cloacina::python::workflow::PyWorkflow::name

Get workflow name

Source
    pub fn name(&self) -> &str {
        self.inner.name()
    }
description
description() -> str

Rust Implementation: cloacina::python::workflow::PyWorkflow::description

Get workflow description

Source
    pub fn description(&self) -> String {
        self.inner
            .metadata()
            .description
            .clone()
            .unwrap_or_default()
    }
version
version() -> str

Rust Implementation: cloacina::python::workflow::PyWorkflow::version

Get workflow version

Source
    pub fn version(&self) -> &str {
        &self.inner.metadata().version
    }
topological_sort
topological_sort() -> List[str]

Rust Implementation: cloacina::python::workflow::PyWorkflow::topological_sort

Get topological sort of tasks

Source
    pub fn topological_sort(&self) -> PyResult<Vec<String>> {
        self.inner
            .topological_sort()
            .map(|namespaces| namespaces.into_iter().map(|ns| ns.to_string()).collect())
            .map_err(|e| PyValueError::new_err(format!("Failed to sort tasks: {}", e)))
    }
get_execution_levels
get_execution_levels() -> List[List[str]]

Rust Implementation: cloacina::python::workflow::PyWorkflow::get_execution_levels

Get execution levels (tasks that can run in parallel)

Source
    pub fn get_execution_levels(&self) -> PyResult<Vec<Vec<String>>> {
        self.inner
            .get_execution_levels()
            .map(|levels| {
                levels
                    .into_iter()
                    .map(|level| level.into_iter().map(|ns| ns.to_string()).collect())
                    .collect()
            })
            .map_err(|e| PyValueError::new_err(format!("Failed to get execution levels: {}", e)))
    }
get_roots
get_roots() -> List[str]

Rust Implementation: cloacina::python::workflow::PyWorkflow::get_roots

Get root tasks (no dependencies)

Source
    pub fn get_roots(&self) -> Vec<String> {
        self.inner
            .get_roots()
            .into_iter()
            .map(|ns| ns.to_string())
            .collect()
    }
get_leaves
get_leaves() -> List[str]

Rust Implementation: cloacina::python::workflow::PyWorkflow::get_leaves

Get leaf tasks (no dependents)

Source
    pub fn get_leaves(&self) -> Vec<String> {
        self.inner
            .get_leaves()
            .into_iter()
            .map(|ns| ns.to_string())
            .collect()
    }
validate
validate() -> None

Rust Implementation: cloacina::python::workflow::PyWorkflow::validate

Validate the workflow

Source
    pub fn validate(&self) -> PyResult<()> {
        self.inner
            .validate()
            .map_err(|e| PyValueError::new_err(format!("Workflow validation failed: {}", e)))
    }
__repr__
__repr__() -> str

Rust Implementation: cloacina::python::workflow::PyWorkflow::repr

String representation

Source
    pub fn __repr__(&self) -> String {
        format!(
            "Workflow(name='{}', tasks={})",
            self.inner.name(),
            self.inner.get_task_ids().len()
        )
    }

Functions

cloaca.python.workflow.register_workflow_constructor

register_workflow_constructor(name: str, constructor: Any) -> None

Rust Implementation: cloacina::python::workflow::register_workflow_constructor

Register a workflow constructor function

Parameters:

Name Type Description
name str
constructor Any
Source
pub fn register_workflow_constructor(name: String, constructor: PyObject) -> PyResult<()> {
    Python::with_gil(|py| {
        let workflow_obj = constructor.call0(py).map_err(|e| {
            PyValueError::new_err(format!("Failed to call workflow constructor: {}", e))
        })?;

        let py_workflow: PyWorkflow = workflow_obj.extract(py).map_err(|e| {
            PyValueError::new_err(format!(
                "Failed to extract workflow from constructor: {}",
                e
            ))
        })?;

        let workflow = py_workflow.inner.clone();
        crate::workflow::register_workflow_constructor(name, move || workflow.clone());

        Ok(())
    })
}