10 - Task Deferral
This tutorial walks through using TaskHandle and its defer_until method to release a concurrency slot while a task polls an external condition. When the condition is met, the task reclaims a slot and resumes execution. This pattern is useful for tasks that spend most of their time waiting on I/O, external services, or file availability without occupying a concurrency slot the whole time.
Before starting this tutorial, you should:
- Have completed Tutorial 1 - First Workflow
- Be comfortable with the #[task] and #[workflow] macros
- Understand async/await patterns in Rust
Estimated time: 15-20 minutes
- How to accept a TaskHandle in a task function
- How defer_until releases and reclaims concurrency slots
- How to compose deferred and non-deferred tasks in a single workflow
- How the executor manages slots across deferred tasks
TaskHandle is an optional second parameter that a #[task] function can accept. The macro system detects parameters named handle or task_handle and automatically arranges for the executor to provide a TaskHandle at runtime.
The handle provides access to concurrency slot management. Tasks that do not need this capability omit the parameter entirely and behave as before.
defer_until is the primary method on TaskHandle. It:
- Releases the executor’s concurrency slot so other tasks can run
- Polls a user-supplied async condition at a given interval
- Reclaims a slot when the condition returns true
- Returns control to the task, which continues executing with the slot held
While deferred, the task’s async future stays parked in the tokio runtime consuming minimal resources.
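The release, poll, reclaim sequence can be modeled with a small synchronous sketch. It uses only the standard library, so SlotPool and defer_until_model are hypothetical stand-ins rather than Cloacina types. It also assumes the condition is checked once right after the slot is released and then once per interval; the real method is async and its exact polling order is not stated in this tutorial.

```rust
use std::sync::{Condvar, Mutex};
use std::time::Duration;

/// Hypothetical stand-in for the executor's pool of concurrency slots.
pub struct SlotPool {
    free: Mutex<usize>,
    freed: Condvar,
}

impl SlotPool {
    pub fn new(slots: usize) -> Self {
        Self { free: Mutex::new(slots), freed: Condvar::new() }
    }

    /// Block until a slot is free, then take it.
    pub fn acquire(&self) {
        let mut free = self.free.lock().unwrap();
        while *free == 0 {
            free = self.freed.wait(free).unwrap();
        }
        *free -= 1;
    }

    /// Return a slot to the pool and wake one waiter.
    pub fn release(&self) {
        *self.free.lock().unwrap() += 1;
        self.freed.notify_one();
    }

    /// Number of slots currently available.
    pub fn free_slots(&self) -> usize {
        *self.free.lock().unwrap()
    }
}

/// Synchronous model of the deferral lifecycle. The caller must already
/// hold a slot. Returns how many times the condition was polled.
pub fn defer_until_model(
    pool: &SlotPool,
    mut condition: impl FnMut() -> bool,
    poll_interval: Duration,
) -> usize {
    pool.release(); // 1. give the slot back so other tasks can run
    let mut polls = 1;
    while !condition() {
        // 2. condition not met yet: wait, then poll again
        std::thread::sleep(poll_interval);
        polls += 1;
    }
    pool.acquire(); // 3. reclaim a slot (may wait if none is free)
    polls // 4. the caller continues with the slot held
}
```

In this model, a pool with one slot shows the slot as free on every poll and as taken again once defer_until_model returns, which is the behavior the list above describes.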
The example lives in examples/features/deferred-tasks/. It defines a two-task pipeline where the first task defers until simulated external data is ready, then the second task processes that data.
use cloacina::{task, workflow, Context, TaskError, TaskHandle};
use serde_json::json;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::Duration;
// Needed for the info! call inside wait_for_data (assumes the tracing crate).
use tracing::info;
#[workflow(
    name = "deferred_pipeline",
    description = "Pipeline demonstrating deferred task execution"
)]
pub mod deferred_pipeline {
    use super::*;

    #[task(id = "wait_for_data", dependencies = [])]
    pub async fn wait_for_data(
        context: &mut Context<serde_json::Value>,
        handle: &mut TaskHandle,
    ) -> Result<(), TaskError> {
        // ...
    }
}
The key difference from a normal task is the second parameter: handle: &mut TaskHandle. The macro detects this by name (handle or task_handle) and sets requires_handle() = true on the generated Task trait implementation.
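Name-based detection can be pictured with the following sketch. It is not the macro's actual code: requires_handle here is a hypothetical free function over parameter names, written only to illustrate the rule stated above (a parameter named handle or task_handle triggers handle injection).

```rust
/// The two parameter names the tutorial says the macro recognizes.
const HANDLE_PARAM_NAMES: [&str; 2] = ["handle", "task_handle"];

/// Hypothetical helper: given a task function's parameter names, decide
/// whether the generated task should ask the executor for a handle.
pub fn requires_handle(param_names: &[&str]) -> bool {
    param_names
        .iter()
        .any(|name| HANDLE_PARAM_NAMES.contains(name))
}
```

Under this reading, a parameter of type TaskHandle with any other name (for example h) would not be detected, so it is worth keeping to one of the two recognized names.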
Inside wait_for_data, the task calls defer_until with a condition closure and a poll interval:
let poll_count = Arc::new(AtomicUsize::new(0));
let pc = poll_count.clone();
handle
    .defer_until(
        move || {
            let pc = pc.clone();
            async move {
                let n = pc.fetch_add(1, Ordering::SeqCst);
                info!("polling external source (attempt {})", n + 1);
                // Simulate: data becomes ready after 3 polls
                n >= 2
            }
        },
        Duration::from_millis(500),
    )
    .await
    .map_err(|e| TaskError::ExecutionFailed {
        message: format!("defer_until failed: {e}"),
        task_id: "wait_for_data".into(),
        timestamp: chrono::Utc::now(),
    })?;
While this loop is running:
- The task's concurrency slot is released after the first call to defer_until
- Other tasks in the executor can use that freed slot
- Once the condition returns true (after 3 polls here), a slot is reclaimed
- Execution continues normally after the await
After resuming, the task writes data into the context for downstream consumers:
context.insert("external_data", json!({"status": "ready", "records": 42}))?;
Ok(())
A second task depends on the deferred task and processes the data:
#[task(id = "process_data", dependencies = ["wait_for_data"])]
pub async fn process_data(context: &mut Context<serde_json::Value>) -> Result<(), TaskError> {
    let data = context
        .get("external_data")
        .ok_or_else(|| TaskError::ExecutionFailed {
            message: "external_data not found in context".into(),
            task_id: "process_data".into(),
            timestamp: chrono::Utc::now(),
        })?
        .clone();
    let records = data.get("records").and_then(|v| v.as_u64()).unwrap_or(0);
    context.insert("processed_count", json!(records))?;
    context.insert("processing_complete", json!(true))?;
    Ok(())
}
This task does not take a TaskHandle – it runs as a normal task and executes once wait_for_data completes.
// Assumes DefaultRunner and DefaultRunnerConfig are in scope; they are not
// part of the import list shown earlier.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let runner =
        DefaultRunner::with_config("sqlite://deferred-tasks.db", DefaultRunnerConfig::default())
            .await?;
    let result = runner.execute("deferred_pipeline", Context::new()).await?;
    println!("Status: {:?}", result.status);
    println!("Processed: {} records", result.final_context.get("processed_count").unwrap());
    runner.shutdown().await?;
    Ok(())
}
The workflow is auto-registered by the #[workflow] macro. The executor detects that wait_for_data requires a handle and provides one at runtime.
The condition closure passed to defer_until must:
- Return impl Future<Output = bool>
- Be callable multiple times (it is invoked every poll_interval)
- Return true when the task should resume
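These requirements can be written as trait bounds. The sketch below is a standard-library-only illustration, not Cloacina's signature: poll_condition, demo_three_polls, and the tiny block_on executor are hypothetical names, and the poll interval is left out so the example needs no async runtime.

```rust
use std::future::Future;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Calls `condition` until it yields true or `max_polls` is reached.
/// Returns the number of polls needed, or None if it never became true.
pub async fn poll_condition<F, Fut>(mut condition: F, max_polls: usize) -> Option<usize>
where
    F: FnMut() -> Fut,          // callable multiple times
    Fut: Future<Output = bool>, // each call returns a future yielding bool
{
    for attempt in 1..=max_polls {
        if condition().await {
            return Some(attempt);
        }
    }
    None
}

/// A condition shaped like the tutorial's: true on the third poll.
pub fn demo_three_polls() -> Option<usize> {
    let poll_count = Arc::new(AtomicUsize::new(0));
    let condition = move || {
        // Clone the counter per call so each returned future owns its own Arc.
        let pc = poll_count.clone();
        async move { pc.fetch_add(1, Ordering::SeqCst) >= 2 }
    };
    block_on(poll_condition(condition, 10))
}

/// Waker that does nothing; enough for futures that never park.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Minimal busy-polling executor for the demo only.
pub fn block_on<T>(fut: impl Future<Output = T>) -> T {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    loop {
        if let Poll::Ready(value) = fut.as_mut().poll(&mut cx) {
            return value;
        }
        std::thread::yield_now();
    }
}
```

The clone inside the closure is what lets it be called repeatedly: the async move block takes ownership of its own copy of the counter while the closure keeps the original for the next poll.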
Common real-world conditions include:
| Pattern | Example |
|---|---|
| File existence | Path::new("/data/input.csv").exists() |
| API readiness | matches!(client.get(url).send().await, Ok(r) if r.status().is_success()) |
| Queue message | queue.peek().await.is_some() |
| Database flag | matches!(db.query("SELECT ready FROM jobs WHERE id = $1", &[&id]).await, Ok(row) if row.ready) |
defer_until returns Result<(), ExecutorError>. It can fail if:
- The executor’s semaphore is closed (typically during shutdown)
- The slot cannot be reclaimed
Map the error to TaskError to propagate it through the normal task error path, as shown in the example above.
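If several tasks defer, the mapping can be factored into one helper. The sketch below is self-contained and therefore uses local stand-ins: the ExecutorError variants are hypothetical (the tutorial only names the two failure causes), TaskError mirrors just the ExecutionFailed fields used earlier, and SystemTime replaces chrono::Utc::now().

```rust
use std::fmt;
use std::time::SystemTime;

/// Stand-in for the executor error; variant names are assumptions.
#[derive(Debug)]
pub enum ExecutorError {
    SemaphoreClosed,
    SlotUnavailable,
}

impl fmt::Display for ExecutorError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            ExecutorError::SemaphoreClosed => write!(f, "semaphore closed"),
            ExecutorError::SlotUnavailable => write!(f, "slot could not be reclaimed"),
        }
    }
}

/// Stand-in with the same field names as the tutorial's ExecutionFailed.
#[derive(Debug)]
pub enum TaskError {
    ExecutionFailed {
        message: String,
        task_id: String,
        timestamp: SystemTime,
    },
}

/// Convert a deferral failure into a task error for the given task.
pub fn deferral_failed(task_id: &str, err: ExecutorError) -> TaskError {
    TaskError::ExecutionFailed {
        message: format!("defer_until failed: {err}"),
        task_id: task_id.to_string(),
        timestamp: SystemTime::now(),
    }
}
```

With a helper like this, the call site shrinks to .map_err(|e| deferral_failed("wait_for_data", e))? and every deferred task reports failures in the same format.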
- Short waits: If the expected wait is under a few seconds, the overhead of releasing and reclaiming a slot may not be worth it. Just use tokio::time::sleep instead.
- CPU-bound polling: The condition should be cheap. Expensive computation in the condition will block a tokio worker thread.
- Single-task pipelines: If only one task is running, releasing the slot provides no benefit since no other task can use it.
- Task Deferral Architecture – internal mechanics of slot tokens and the defer_until lifecycle
- Macro Reference – full #[task] attribute reference including handle detection
- Python TaskHandle – using TaskHandle from Python