03 - Parallel Processing
Welcome to the third tutorial in our Cloacina series! In this tutorial, you’ll learn how to create parallel processing workflows and implement task parallelization patterns. We’ll build on the concepts from Tutorial 2 and show you how to handle expensive operations efficiently by running them in parallel.
Prerequisites:
- Completion of Tutorial 2
- Basic understanding of Rust
- Rust toolchain installed (rustc, cargo)
- A code editor of your choice
Estimated time: 25-30 minutes
Let’s start by creating a new Rust project. We’ll create it in a directory that’s a sibling to the Cloacina repository:
# Assuming you're in the parent directory of the Cloacina repository
mkdir -p my-cloacina-projects
cd my-cloacina-projects
cargo new parallel-workflow
cd parallel-workflow
Your directory structure should look like this:
.
├── cloacina/ # The Cloacina repository
└── my-cloacina-projects/ # Your projects directory
└── parallel-workflow/ # Your new project
├── Cargo.toml
└── src/
└── main.rs
Now, add Cloacina and its dependencies to your Cargo.toml. The tasks in this tutorial also use serde's derive macros and the rand crate, so include those as well:
[dependencies]
cloacina = { path = "../../cloacina", default-features = false, features = ["macros", "sqlite"] }
tokio = { version = "1.0", features = ["full"] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
rand = "0.8"
tracing = "0.1"
tracing-subscriber = "0.3"
async-trait = "0.1"
ctor = "0.2"
chrono = "0.4"
Important: Normally you'd use cloacina = "0.1.0" in Cargo.toml. For these tutorials, we're using path dependencies to vendor the code locally. The path must be relative to your project. Examples:
- Next to Cloacina: path = "../cloacina"
- In a subdirectory: path = "../../../cloacina"
Note: Use version = "0.1.0" when the crate is available on crates.io.
In Cloacina, tasks can run in parallel when they don’t depend on each other. This is particularly useful for handling expensive operations like:
- Long-running network calls
- CPU-intensive computations
- External API requests
- Database operations
- File I/O operations
By splitting these operations into independent tasks, we can significantly reduce the total execution time of our workflow, as the plain-tokio sketch below illustrates.
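Independent of Cloacina, the underlying effect is easy to see with plain tokio: awaiting three independent operations one after another costs the sum of their durations, while running them concurrently costs only the longest one. A minimal sketch (the operation names and durations here are made up for illustration):

```rust
use std::time::{Duration, Instant};

// Stand-in for an expensive operation (network call, query, file I/O, ...).
async fn expensive_op(label: &str, millis: u64) -> String {
    tokio::time::sleep(Duration::from_millis(millis)).await;
    format!("{label} done")
}

#[tokio::main]
async fn main() {
    // Sequential: roughly 300 + 500 + 400 = 1200 ms of wall-clock time.
    let start = Instant::now();
    let _a = expensive_op("a", 300).await;
    let _b = expensive_op("b", 500).await;
    let _c = expensive_op("c", 400).await;
    println!("sequential: {:?}", start.elapsed());

    // Concurrent: roughly max(300, 500, 400) = 500 ms of wall-clock time.
    let start = Instant::now();
    let (_a, _b, _c) = tokio::join!(
        expensive_op("a", 300),
        expensive_op("b", 500),
        expensive_op("c", 400)
    );
    println!("concurrent: {:?}", start.elapsed());
}
```

Cloacina gives you the same effect at the workflow level: instead of hand-writing join points, you declare task dependencies and the scheduler runs independent tasks concurrently for you. This tutorial builds on four key concepts: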
- Task Parallelization: Running multiple tasks simultaneously when they don’t depend on each other
- Task Partitioning: Splitting expensive operations into independent tasks that can run in parallel
- Context Synchronization: Managing data flow and state between parallel tasks
- Result Combination: Merging results from parallel tasks into a coherent output
Here’s a visual representation of our parallel processing workflow:
graph TD
    A[generate_data] --> B[partition_data]
    B --> C[process_partition_1]
    B --> D[process_partition_2]
    B --> E[process_partition_3]
    C --> F[combine_results]
    D --> F
    E --> F
    F --> G[generate_report]
    F --> H[send_notifications]
    G --> I[cleanup]
    H --> I
The diagram shows:
- Initial data generation
- Data partitioning into three streams
- Parallel processing of each partition
- Final combination of results
- Downstream report generation, notifications, and cleanup
Notice how process_partition_1, process_partition_2, and process_partition_3 can run simultaneously, since they only depend on partition_data and not on each other.
Important Note About Task Parallelization: while we use a product catalog as an example, these principles apply to any expensive operations like API calls, file processing, or database queries.
The key is to identify operations that:
- Can be split into independent tasks
- Are expensive enough to justify parallelization overhead
- Don’t have interdependencies preventing parallel execution
Let’s create a workflow that processes a product catalog in parallel. We’ll:
- Generate a dataset of products
- Split the processing into three independent tasks
- Process each partition in parallel
- Combine the results
- Generate reports and send notifications
First, let’s define our data structures:
use serde::{Deserialize, Serialize};
use serde_json::json;
use std::time::Duration;
use tracing::info;
use rand::Rng;
// You will also need the Cloacina imports that provide the #[task] macro,
// Context, and TaskError; see the cloacina crate documentation for the exact paths.

#[derive(Debug, Serialize, Deserialize, Clone)]
struct Product {
    id: u32,
    name: String,
    category: String,
    price: f64,
    stock: u32,
}

#[derive(Debug, Serialize, Deserialize)]
struct CategoryStats {
    total_value: f64,
    total_stock: u32,
    product_count: u32,
}
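Every value a task publishes travels through the Context as JSON, which is why both structs derive Serialize and Deserialize. If you want to convince yourself the round-trip works, a standalone check like the following will do; the PartialEq derive is added here only so assert_eq! can compare the values:

```rust
use serde::{Deserialize, Serialize};

#[derive(Debug, Serialize, Deserialize, Clone, PartialEq)]
struct Product {
    id: u32,
    name: String,
    category: String,
    price: f64,
    stock: u32,
}

fn main() -> Result<(), serde_json::Error> {
    let product = Product {
        id: 1,
        name: "Product 1".into(),
        category: "Category 2".into(),
        price: 1.5,
        stock: 10,
    };

    // Serialize to a JSON value (the shape the Context stores) and back.
    let value = serde_json::to_value(&product)?;
    let roundtrip: Product = serde_json::from_value(value)?;
    assert_eq!(product, roundtrip);
    Ok(())
}
```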
Now, let’s create our tasks:
// Generate large dataset
#[task(
id = "generate_data",
dependencies = [],
retry_attempts = 2
)]
async fn generate_data(context: &mut Context<serde_json::Value>) -> Result<(), TaskError> {
info!("🚀 Generating product dataset");
// Simulate loading a large dataset
tokio::time::sleep(Duration::from_millis(500)).await;
let total_products = 10000;
let products = (1..=total_products)
.map(|id| Product {
id,
name: format!("Product {}", id),
category: format!("Category {}", (id % 10) + 1),
price: (id as f64 * 1.5) % 100.0,
stock: (id * 10) % 1000,
})
.collect::<Vec<_>>();
context.insert("total_products", json!(total_products))?;
context.insert("products", json!(products))?;
info!("Generated {} products across 10 categories", total_products);
Ok(())
}
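The formulas above spread the synthetic products evenly across ten categories and keep every price under 100. A quick standalone sanity check of those formulas (a hypothetical helper that mirrors generate_data, not part of the workflow):

```rust
// Mirrors the category, price, and stock formulas used in generate_data.
fn sample_product(id: u32) -> (String, f64, u32) {
    let category = format!("Category {}", (id % 10) + 1);
    let price = (id as f64 * 1.5) % 100.0;
    let stock = (id * 10) % 1000;
    (category, price, stock)
}

fn main() {
    assert_eq!(sample_product(7), ("Category 8".to_string(), 10.5, 70));
    assert_eq!(sample_product(100), ("Category 1".to_string(), 50.0, 0));
    println!("synthetic-data formulas behave as expected");
}
```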
// Partition data into three chunks
#[task(
id = "partition_data",
dependencies = ["generate_data"],
retry_attempts = 2
)]
async fn partition_data(context: &mut Context<serde_json::Value>) -> Result<(), TaskError> {
info!("Partitioning product data");
let products: Vec<Product> = context.get("products")?;
let chunk_size = products.len() / 3;
let (chunk1, remainder) = products.split_at(chunk_size);
let (chunk2, chunk3) = remainder.split_at(chunk_size);
context.insert("partition_1", json!(chunk1.to_vec()))?;
context.insert("partition_2", json!(chunk2.to_vec()))?;
context.insert("partition_3", json!(chunk3.to_vec()))?;
info!("Data partitioned into 3 chunks of {} products each", chunk_size);
Ok(())
}
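partition_data splits the catalog into exactly three pieces with split_at, and the last chunk simply absorbs the remainder when the count isn't divisible by three. If you later want to parameterize the number of partitions, the same idea can be written with slice::chunks; this is a sketch, not part of the tutorial workflow:

```rust
// Split a slice into `parts` roughly equal chunks (the last one may be smaller).
fn partition<T: Clone>(items: &[T], parts: usize) -> Vec<Vec<T>> {
    let parts = parts.max(1);
    // Ceiling division so every item lands in some chunk.
    let chunk_size = ((items.len() + parts - 1) / parts).max(1);
    items.chunks(chunk_size).map(|c| c.to_vec()).collect()
}

fn main() {
    let data: Vec<u32> = (1..=10).collect();
    let parts = partition(&data, 3);
    assert_eq!(parts.len(), 3);
    assert_eq!(parts[0], vec![1, 2, 3, 4]);
    assert_eq!(parts[2], vec![9, 10]);
}
```

The tutorial keeps three explicit partitions because each one maps to its own task, with its own dependencies and retry settings.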
// Process first partition
#[task(
id = "process_partition_1",
dependencies = ["partition_data"],
retry_attempts = 3,
retry_delay_ms = 1000
)]
async fn process_partition_1(context: &mut Context<serde_json::Value>) -> Result<(), TaskError> {
let products: Vec<Product> = context.get("partition_1")?;
info!("Processing partition 1: {} products", products.len());
// Simulate CPU-intensive processing
let processing_time = rand::thread_rng().gen_range(1000..3000);
tokio::time::sleep(Duration::from_millis(processing_time)).await;
let mut stats = std::collections::HashMap::new();
for product in &products {
let entry = stats.entry(product.category.clone())
.or_insert(CategoryStats {
total_value: 0.0,
total_stock: 0,
product_count: 0,
});
entry.total_value += product.price * product.stock as f64;
entry.total_stock += product.stock;
entry.product_count += 1;
}
context.insert("stats_1", json!(stats))?;
context.insert("processing_time_1", json!(processing_time))?;
info!("Partition 1 complete: processed {} products in {}ms", products.len(), processing_time);
Ok(())
}
// Process second partition
#[task(
id = "process_partition_2",
dependencies = ["partition_data"],
retry_attempts = 3,
retry_delay_ms = 1000
)]
async fn process_partition_2(context: &mut Context<serde_json::Value>) -> Result<(), TaskError> {
let products: Vec<Product> = context.get("partition_2")?;
info!("Processing partition 2: {} products", products.len());
// Simulate CPU-intensive processing
let processing_time = rand::thread_rng().gen_range(1500..4000);
tokio::time::sleep(Duration::from_millis(processing_time)).await;
let mut stats = std::collections::HashMap::new();
for product in &products {
let entry = stats.entry(product.category.clone())
.or_insert(CategoryStats {
total_value: 0.0,
total_stock: 0,
product_count: 0,
});
entry.total_value += product.price * product.stock as f64;
entry.total_stock += product.stock;
entry.product_count += 1;
}
context.insert("stats_2", json!(stats))?;
context.insert("processing_time_2", json!(processing_time))?;
info!("Partition 2 complete: processed {} products in {}ms", products.len(), processing_time);
Ok(())
}
// Process third partition
#[task(
id = "process_partition_3",
dependencies = ["partition_data"],
retry_attempts = 3,
retry_delay_ms = 1000
)]
async fn process_partition_3(context: &mut Context<serde_json::Value>) -> Result<(), TaskError> {
let products: Vec<Product> = context.get("partition_3")?;
info!("Processing partition 3: {} products", products.len());
// Simulate CPU-intensive processing
let processing_time = rand::thread_rng().gen_range(800..2500);
tokio::time::sleep(Duration::from_millis(processing_time)).await;
let mut stats = std::collections::HashMap::new();
for product in &products {
let entry = stats.entry(product.category.clone())
.or_insert(CategoryStats {
total_value: 0.0,
total_stock: 0,
product_count: 0,
});
entry.total_value += product.price * product.stock as f64;
entry.total_stock += product.stock;
entry.product_count += 1;
}
context.insert("stats_3", json!(stats))?;
context.insert("processing_time_3", json!(processing_time))?;
info!("Partition 3 complete: processed {} products in {}ms", products.len(), processing_time);
Ok(())
}
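The three process_partition_* tasks are identical except for the context keys they touch and the simulated delay range. If the duplication bothers you, the aggregation loop can be factored into a plain helper that each task calls; a sketch, assuming the Product and CategoryStats types defined earlier:

```rust
use std::collections::HashMap;

// The same per-category aggregation each partition task performs.
fn aggregate(products: &[Product]) -> HashMap<String, CategoryStats> {
    let mut stats: HashMap<String, CategoryStats> = HashMap::new();
    for product in products {
        let entry = stats.entry(product.category.clone()).or_insert(CategoryStats {
            total_value: 0.0,
            total_stock: 0,
            product_count: 0,
        });
        entry.total_value += product.price * product.stock as f64;
        entry.total_stock += product.stock;
        entry.product_count += 1;
    }
    stats
}
```

Each task body then shrinks to reading its partition, sleeping for the simulated processing time, and inserting json!(aggregate(&products)) under its own stats key.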
// Combine results
#[task(
id = "combine_results",
dependencies = ["process_partition_1", "process_partition_2", "process_partition_3"],
retry_attempts = 2
)]
async fn combine_results(context: &mut Context<serde_json::Value>) -> Result<(), TaskError> {
info!("🔀 Combining results from parallel processing");
let stats_1: std::collections::HashMap<String, CategoryStats> = context.get("stats_1")?;
let stats_2: std::collections::HashMap<String, CategoryStats> = context.get("stats_2")?;
let stats_3: std::collections::HashMap<String, CategoryStats> = context.get("stats_3")?;
let time_1: u64 = context.get("processing_time_1")?;
let time_2: u64 = context.get("processing_time_2")?;
let time_3: u64 = context.get("processing_time_3")?;
// Combine all stats
let mut combined_stats = stats_1;
for (category, stats) in stats_2 {
let entry = combined_stats.entry(category)
.or_insert(CategoryStats {
total_value: 0.0,
total_stock: 0,
product_count: 0,
});
entry.total_value += stats.total_value;
entry.total_stock += stats.total_stock;
entry.product_count += stats.product_count;
}
for (category, stats) in stats_3 {
let entry = combined_stats.entry(category)
.or_insert(CategoryStats {
total_value: 0.0,
total_stock: 0,
product_count: 0,
});
entry.total_value += stats.total_value;
entry.total_stock += stats.total_stock;
entry.product_count += stats.product_count;
}
// Calculate parallel efficiency
let total_processing_time = time_1 + time_2 + time_3;
let max_parallel_time = std::cmp::max(std::cmp::max(time_1, time_2), time_3);
let parallel_efficiency = (total_processing_time as f64 / max_parallel_time as f64) * 100.0;
context.insert("final_stats", json!(combined_stats))?;
context.insert("total_processing_time_ms", json!(total_processing_time))?;
context.insert("actual_parallel_time_ms", json!(max_parallel_time))?;
context.insert("parallel_efficiency_percent", json!(parallel_efficiency))?;
info!("Results combined: {:.1}% parallel efficiency", parallel_efficiency);
Ok(())
}
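To make the efficiency figure concrete, suppose the three partitions took 2,000 ms, 3,500 ms, and 1,500 ms (made-up numbers; the tutorial uses random delays). Sequentially that is 7,000 ms of work, but the parallel wall-clock time is only as long as the slowest partition, 3,500 ms:

```rust
fn main() {
    let (time_1, time_2, time_3): (u64, u64, u64) = (2_000, 3_500, 1_500);

    // Work done if the partitions ran one after another.
    let total_processing_time = time_1 + time_2 + time_3; // 7_000 ms
    // Wall-clock time when they run in parallel: the slowest partition.
    let max_parallel_time = time_1.max(time_2).max(time_3); // 3_500 ms

    // Same formula combine_results uses above.
    let parallel_efficiency = (total_processing_time as f64 / max_parallel_time as f64) * 100.0;
    println!("{:.1}%", parallel_efficiency); // prints 200.0%
}
```

In other words, the metric is a speedup expressed as a percentage: 200% means the parallel run finished in half the time the sequential sum would have taken, and anything above 100% means the parallelization paid off.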
Parallel Processing Power: Cloacina's parallel processing capabilities provide:
- Task Independence: Run multiple tasks simultaneously when they don’t depend on each other
- Resource Efficiency: Process expensive operations in parallel to reduce total execution time
- Result Coordination: Automatically combine results from parallel tasks
- Error Isolation: Handle failures in parallel tasks without affecting others
- Scalability: Process more work in the same time by utilizing parallel execution
In the next tutorials, you’ll learn how to build more complex parallel processing patterns.
You can run this tutorial in two ways:
If you’re following along with the Cloacina repository, you can use angreal to run the tutorial:
# From the Cloacina repository root
angreal tutorials 03
This will run the tutorial code with all necessary dependencies.
If you're building the project manually, no database server setup is required because this tutorial uses the sqlite feature; simply run your workflow with:
cargo run
Congratulations! You’ve created a parallel processing workflow. In the next tutorial, we’ll explore:
- More complex parallel processing patterns
- Advanced task coordination
- Error handling in parallel workflows
- Resource management strategies