Skip to main content
Cloacina Documentation
Toggle Dark/Light/Auto mode Toggle Dark/Light/Auto mode Toggle Dark/Light/Auto mode Back to homepage

Workflow Context

cloaca.python.workflow_context Binding

Classes

cloaca.python.workflow_context.WorkflowContext

Rust Implementation: cloacina::python::workflow_context::PyWorkflowContext

WorkflowContext provides namespace management for Python workflows

Methods

new
new(tenant_id: str, package_name: str, workflow_id: str) -> Self

Rust Implementation: cloacina::python::workflow_context::PyWorkflowContext::new

Create a new WorkflowContext

Parameters:

Name Type Description
tenant_id str
package_name str
workflow_id str
Source
    pub fn new(tenant_id: &str, package_name: &str, workflow_id: &str) -> Self {
        Self {
            tenant_id: tenant_id.to_string(),
            package_name: package_name.to_string(),
            workflow_id: workflow_id.to_string(),
        }
    }
tenant_id
tenant_id() -> str

Rust Implementation: cloacina::python::workflow_context::PyWorkflowContext::tenant_id

Get tenant ID

Source
    pub fn tenant_id(&self) -> &str {
        &self.tenant_id
    }
package_name
package_name() -> str

Rust Implementation: cloacina::python::workflow_context::PyWorkflowContext::package_name

Get package name

Source
    pub fn package_name(&self) -> &str {
        &self.package_name
    }
workflow_id
workflow_id() -> str

Rust Implementation: cloacina::python::workflow_context::PyWorkflowContext::workflow_id

Get workflow ID

Source
    pub fn workflow_id(&self) -> &str {
        &self.workflow_id
    }
task_namespace
task_namespace(task_id: str) -> PyTaskNamespace

Rust Implementation: cloacina::python::workflow_context::PyWorkflowContext::task_namespace

Generate a TaskNamespace for a task within this workflow context

Parameters:

Name Type Description
task_id str
Source
    pub fn task_namespace(&self, task_id: &str) -> PyTaskNamespace {
        PyTaskNamespace::from_rust(crate::TaskNamespace::new(
            &self.tenant_id,
            &self.package_name,
            &self.workflow_id,
            task_id,
        ))
    }
resolve_dependency
resolve_dependency(task_name: str) -> PyTaskNamespace

Rust Implementation: cloacina::python::workflow_context::PyWorkflowContext::resolve_dependency

Resolve a dependency task name to a full TaskNamespace within this context

Parameters:

Name Type Description
task_name str
Source
    pub fn resolve_dependency(&self, task_name: &str) -> PyTaskNamespace {
        self.task_namespace(task_name)
    }
workflow_namespace
workflow_namespace() -> PyTaskNamespace

Rust Implementation: cloacina::python::workflow_context::PyWorkflowContext::workflow_namespace

Get the workflow namespace (without task_id)

Source
    pub fn workflow_namespace(&self) -> PyTaskNamespace {
        PyTaskNamespace::from_rust(crate::TaskNamespace::new(
            &self.tenant_id,
            &self.package_name,
            &self.workflow_id,
            "",
        ))
    }
contains_namespace
contains_namespace(namespace: PyTaskNamespace) -> bool

Rust Implementation: cloacina::python::workflow_context::PyWorkflowContext::contains_namespace

Check if a namespace belongs to this workflow context

Parameters:

Name Type Description
namespace PyTaskNamespace
Source
    pub fn contains_namespace(&self, namespace: &PyTaskNamespace) -> bool {
        namespace.tenant_id() == self.tenant_id
            && namespace.package_name() == self.package_name
            && namespace.workflow_id() == self.workflow_id
    }
__str__
__str__() -> str

Rust Implementation: cloacina::python::workflow_context::PyWorkflowContext::str

String representation

Source
    pub fn __str__(&self) -> String {
        format!(
            "{}::{}::{}",
            self.tenant_id, self.package_name, self.workflow_id
        )
    }
__repr__
__repr__() -> str

Rust Implementation: cloacina::python::workflow_context::PyWorkflowContext::repr

String representation

Source
    pub fn __repr__(&self) -> String {
        format!(
            "WorkflowContext('{}', '{}', '{}')",
            self.tenant_id, self.package_name, self.workflow_id
        )
    }
__eq__
__eq__(other: PyWorkflowContext) -> bool

Rust Implementation: cloacina::python::workflow_context::PyWorkflowContext::eq

Equality comparison

Parameters:

Name Type Description
other PyWorkflowContext
Source
    pub fn __eq__(&self, other: &PyWorkflowContext) -> bool {
        self.tenant_id == other.tenant_id
            && self.package_name == other.package_name
            && self.workflow_id == other.workflow_id
    }