01 - Your First Workflow
Welcome to your first Cloacina tutorial! In this guide, you’ll learn how to create and execute a simple workflow using Cloacina’s macro system. By the end of this tutorial, you’ll understand the basic concepts of tasks, workflows, context, and execution in Cloacina.
- Basic knowledge of Rust
- Rust toolchain installed (rustc, cargo)
- A code editor of your choice
10-15 minutes
Let’s start by creating a new Rust project. We’ll create it in a directory that’s a sibling to the Cloacina repository:
# Assuming you're in the parent directory of the Cloacina repository
mkdir -p my-cloacina-projects
cd my-cloacina-projects
cargo new first-workflow
cd first-workflow
Your directory structure should look like this:
.
├── cloacina/ # The Cloacina repository
└── my-cloacina-projects/ # Your projects directory
└── first-workflow/ # Your new project
├── Cargo.toml
└── src/
└── main.rs
Now, add Cloacina and its dependencies to your Cargo.toml
. Note that we’re using a relative path to the Cloacina repository:
[dependencies]
cloacina = { path = "../../cloacina", default-features = false, features = ["macros", "sqlite"] }
tokio = { version = "1.0", features = ["full"] }
serde_json = "1.0"
tracing = "0.1"
tracing-subscriber = "0.3"
async-trait = "0.1"
ctor = "0.2"
chrono = "0.4"
ImportantNormally you’d use
cloacina = "0.1.0"
in Cargo.toml. For these tutorials, we’re using path dependencies to vendor code locally.The path must be relative to your project. Examples:
- Next to Cloacina:
path = "../cloacina"
- In subdirectory:
path = "../../../cloacina"
Note: Use
version = "0.1.0"
when available on crates.io.
Each dependency serves a specific purpose in the Cloacina macro system:
async-trait
: Required for async functions in traits (macro expansion)ctor
: Enables static initialization beforemain()
chrono
: Timestamp handling for execution metadataserde_json
: Context serialization
These dependencies must be explicit because macro expansion happens at compile time, where transitive dependencies aren’t available.
Let’s create a simple workflow with a single task that prints a greeting message. Create a new file src/main.rs
with the following content:
//! Simple Cloacina Example
//!
//! This example demonstrates the most basic usage of Cloacina with a single task.
use cloacina::{task, workflow, Context, TaskError};
use cloacina::runner::DefaultRunner;
use serde_json::json;
use tracing::info;
/// A simple task that just logs a message
#[task(
id = "hello_world",
dependencies = []
)]
async fn hello_world(context: &mut Context<serde_json::Value>) -> Result<(), TaskError> {
info!("Hello from Cloacina!");
// Add some data to context for demonstration
context.insert("message", json!("Hello World!"))?;
Ok(())
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Initialize logging
tracing_subscriber::fmt()
.with_env_filter("simple_example=debug,cloacina=debug")
.init();
info!("Starting Simple Cloacina Example");
// Initialize runner with SQLite database (migrations run automatically)
let runner = DefaultRunner::new("simple_workflow.db").await?;
// Create a simple workflow (automatically registers in global registry)
let _workflow = workflow! {
name: "simple_workflow",
description: "A simple workflow with one task",
tasks: [
hello_world
]
};
// Create input context
let input_context = Context::new();
info!("Executing workflow");
// Execute the workflow (scheduler and runner managed automatically)
let result = runner.execute("simple_workflow", input_context).await?;
info!("Workflow completed with status: {:?}", result.status);
info!("Final context: {:?}", result.final_context);
// Shutdown the runner
runner.shutdown().await?;
info!("Simple example completed!");
Ok(())
}
Let’s walk through the code in execution order and understand why each component needs to be set up in this specific sequence:
-
Imports and Dependencies: First, we import all necessary components from Cloacina:
use cloacina::{task, workflow, Context, TaskError}; use cloacina::runner::DefaultRunner; use cloacina::runner::DefaultRunner;
These imports are needed because they define the core types and traits we’ll use throughout the program.
DefaultRunner
provides the interface for executing workflows and managing task pipelines. -
Task Definition: We define our task:
#[task(id = "hello_world", dependencies = [])] async fn hello_world(context: &mut Context<serde_json::Value>) -> Result<(), TaskError>
The task definition includes its ID and dependencies, which are used by the workflow system to build the execution graph.
-
Main Function Setup: The main function follows a specific sequence:
// 1. Initialize logging first - needed for all subsequent operations tracing_subscriber::fmt() .with_env_filter("simple_example=debug,cloacina=debug") .init(); // 2. Create the runner - this must happen before any workflow definition // because the workflow! macro registers workflows in a global registry // that the runner needs to access let runner = DefaultRunner::new("simple_workflow.db").await?; // 3. Define the workflow - the workflow! macro will automatically register // it in the global registry that the executor uses let _workflow = workflow! { name: "simple_workflow", description: "A simple workflow with one task", tasks: [hello_world] };
This sequence is important because:
- Logging must be initialized first to capture all subsequent operations
- The runner must be created before workflows because it manages the workflow registry
- The workflow! macro automatically registers workflows in the global registry that the runner uses
-
Workflow Execution: Only after all components are set up can we execute the workflow:
// Create and execute with input context let input_context = Context::new(); let result = runner.execute("simple_workflow", input_context).await?;
The execution requires:
- A properly initialized runner
- A registered workflow
- An input context
-
Cleanup: Finally, we properly shut down the runner:
runner.shutdown().await?;
This ensures all resources are properly released and the database connection is closed gracefully.
This ordered approach ensures that each component has its dependencies available when needed, and resources are properly managed throughout the workflow’s lifecycle.
Workflow PowerWhile this example shows a single task, Cloacina’s workflows are designed to handle complex business processes through:
- Task Dependencies: Define clear relationships between tasks, ensuring they run in the correct order
- Data Management: Share and transform data between tasks using the Context system
- Error Handling: Consistent error handling and recovery across all tasks
- Parallel Execution: Automatically run independent tasks in parallel
- Retry Management: Configure and manage task retries with:
- Custom retry policies
- Automatic retry scheduling
- State preservation between attempts
In the next tutorials, you’ll learn how to build these features into your workflows.
You can run this tutorial in two ways:
If you’re following along with the Cloacina repository, you can use angreal to run the tutorial:
# From the Cloacina repository root
angreal tutorials 01
This will run the tutorial code with all necessary dependencies.
If you’re building the project manually, simply run your workflow with:
cargo run
You should see output similar to:
INFO simple_example > Starting Simple Cloacina Example
INFO simple_example > Executing workflow
INFO simple_example > Hello from Cloacina!
INFO simple_example > Workflow completed with status: Success
INFO simple_example > Final context: {"message": "Hello World!"}
INFO simple_example > Simple example completed!
Congratulations! You’ve created and executed your first Cloacina workflow. In the next tutorial, we’ll explore:
- Adding dependencies between tasks
- Working with different types of context data
- Error handling and recovery
You can download the complete example code from our GitHub repository.