07 - Your First Computation Graph
In this tutorial you’ll build your first computation graph — a pricing pipeline that reads an order book snapshot, computes spread in basis points, and formats the result. You’ll learn how Cloacina’s two macros work together: #[reactor] declares the firing criterion, and #[computation_graph] references that reactor by name and wires async functions into a compiled, callable graph.
- How to define boundary types (the data that flows between nodes)
- The `#[reactor]` attribute macro: declaring the firing criterion as a top-level primitive
- The `#[computation_graph]` attribute macro and topology declaration syntax, including `trigger = reactor("...")`
- Entry nodes, processing nodes, and terminal nodes
- `InputCache`, `SourceName`, and `serialize()`
- Calling the generated `_compiled()` function and inspecting `GraphResult`
- Completion of a Cloacina workflow tutorial, or basic Rust async familiarity
- Rust toolchain installed
The full source lives at `examples/tutorials/computation-graphs/library/07-computation-graph`.
To run it:
```bash
# From the Cloacina repository root
angreal demos tutorials rust 07
```
Every piece of data that flows between graph nodes must implement `Serialize + Deserialize`. The `InputCache` stores values as serialized bytes, so serde is required throughout.
```rust
use cloacina::computation_graph::types::{serialize, GraphResult, InputCache, SourceName};
use serde::{Deserialize, Serialize};

/// Raw order book snapshot — our input data.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OrderBookSnapshot {
    pub best_bid: f64,
    pub best_ask: f64,
    pub timestamp: u64,
}

/// Computed spread signal — intermediate result.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SpreadSignal {
    pub spread: f64,
    pub mid_price: f64,
}

/// Final formatted output — terminal node result.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FormattedOutput {
    pub message: String,
    pub mid_price: f64,
    pub spread_bps: f64,
}
```
You define one struct per data boundary. `OrderBookSnapshot` enters the graph from outside, `SpreadSignal` flows between nodes internally, and `FormattedOutput` is what the graph produces.
As of CLOACI-I-0101 a graph’s firing criterion is its own top-level primitive. You declare a reactor with `#[reactor]` (giving it a name, an accumulators list, and a criteria expression), then point one or more `#[computation_graph]` declarations at it via `trigger = reactor("name")`. Inside the annotated `mod`, each `pub async fn` becomes a node.
```rust
#[cloacina_macros::reactor(
    name = "pricing_pipeline_reactor",
    accumulators = [orderbook],
    criteria = when_any(orderbook),
)]
pub struct PricingPipelineReactor;

#[cloacina_macros::computation_graph(
    trigger = reactor("pricing_pipeline_reactor"),
    graph = {
        ingest(orderbook) -> compute_spread,
        compute_spread -> format_output,
    }
)]
pub mod pricing_pipeline {
    use super::*;

    /// Entry node: reads the order book from the cache.
    pub async fn ingest(orderbook: Option<&OrderBookSnapshot>) -> SpreadSignal {
        let book = orderbook.expect("orderbook should be present");
        let spread = book.best_ask - book.best_bid;
        let mid_price = (book.best_ask + book.best_bid) / 2.0;
        SpreadSignal { spread, mid_price }
    }

    /// Processing node: converts spread to basis points.
    pub async fn compute_spread(input: &SpreadSignal) -> SpreadSignal {
        let spread_bps = (input.spread / input.mid_price) * 10_000.0;
        SpreadSignal {
            spread: spread_bps,
            mid_price: input.mid_price,
        }
    }

    /// Terminal node: formats the result for display.
    pub async fn format_output(input: &SpreadSignal) -> FormattedOutput {
        FormattedOutput {
            message: format!(
                "Mid: {:.2}, Spread: {:.1} bps",
                input.mid_price, input.spread
            ),
            mid_price: input.mid_price,
            spread_bps: input.spread,
        }
    }
}
```
Topology breakdown:
| Syntax | Meaning |
|---|---|
| `#[reactor(criteria = when_any(orderbook), ...)]` | Declares a reactor that fires whenever the `orderbook` source has new data |
| `trigger = reactor("pricing_pipeline_reactor")` | This graph subscribes to the reactor declared above (referenced by its string name) |
| `ingest(orderbook)` | `ingest` is an entry node; it reads `orderbook` from the cache |
| `-> compute_spread` | `ingest`’s output is passed to `compute_spread` as its input |
| `compute_spread -> format_output` | `format_output` receives `compute_spread`’s output |
Node function signatures:
- Entry nodes take `Option<&T>` for each named cache source. The `Option` is `None` if that source hasn’t been populated yet.
- Processing nodes take `&T` where `T` is the return type of their upstream node.
- The terminal node is whichever node has no downstream; here that’s `format_output`. Its return value ends up in `GraphResult`.
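The arithmetic that flows through `ingest` and `compute_spread` can be checked by hand without any Cloacina machinery. This standalone sketch reproduces it with the tutorial’s sample book (bid 100.50, ask 100.55); it is an illustration of the math, not part of the generated graph:

```rust
/// Mirror of the tutorial's spread math: absolute spread, mid price,
/// and spread expressed in basis points (1 bp = 1/100th of a percent).
fn spread_and_mid(best_bid: f64, best_ask: f64) -> (f64, f64) {
    let spread = best_ask - best_bid;            // 100.55 - 100.50 = 0.05
    let mid_price = (best_ask + best_bid) / 2.0; // 100.525
    let bps = (spread / mid_price) * 10_000.0;   // 0.05 / 100.525 * 10_000 ≈ 4.97
    (mid_price, bps)
}

fn main() {
    let (mid, bps) = spread_and_mid(100.50, 100.55);
    println!("Mid: {:.2}, Spread: {:.2} bps", mid, bps);
}
```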
The macro generates a function called `pricing_pipeline_compiled` (the module name plus `_compiled`).
You don’t need a reactor or accumulator for the simplest case. Build an `InputCache`, serialize your input into it, and call the generated function directly.
```rust
#[tokio::main]
async fn main() {
    tracing_subscriber::fmt::init();

    // Build an InputCache with our order book data
    let mut cache = InputCache::new();
    let orderbook = OrderBookSnapshot {
        best_bid: 100.50,
        best_ask: 100.55,
        timestamp: 1234567890,
    };

    // Serialize and insert under the source name "orderbook"
    // — must match the accumulator name declared on the reactor
    // (#[reactor(accumulators = [orderbook], criteria = when_any(orderbook), ...)])
    cache.update(
        SourceName::new("orderbook"),
        serialize(&orderbook).expect("serialization should succeed"),
    );

    // Call the compiled graph
    let result: GraphResult = pricing_pipeline_compiled(&cache).await;

    match result {
        GraphResult::Completed { outputs } => {
            for output in &outputs {
                if let Some(formatted) = output.downcast_ref::<FormattedOutput>() {
                    println!("Output: {}", formatted.message);
                    println!("Mid price: {:.2}", formatted.mid_price);
                    println!("Spread: {:.1} bps", formatted.spread_bps);
                }
            }
        }
        GraphResult::Error(e) => {
            eprintln!("Graph execution failed: {}", e);
        }
    }
}
```
Key points:
- `SourceName::new("orderbook")` must exactly match the accumulator name in the reactor declaration (`accumulators = [orderbook]`) and in the topology (`ingest(orderbook)`).
- `serialize()` converts your value to `Vec<u8>` using the same codec the cache uses internally.
- `GraphResult::Completed { outputs }` carries a `Vec<Box<dyn Any>>`. Use `downcast_ref::<T>()` to get your concrete type back.
- `GraphResult::Error(e)` carries a string describing what went wrong.
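The downcasting in the `Completed` arm is plain `std::any` machinery rather than anything Cloacina-specific. A minimal sketch (the `FormattedOutput` struct here is a pared-down stand-in) of recovering a concrete type from a slice of type-erased boxes, the same shape as the `outputs` vector:

```rust
use std::any::Any;

struct FormattedOutput {
    message: String,
}

/// Find the first FormattedOutput in a slice of type-erased boxes.
/// downcast_ref returns Some(&T) only when the boxed value really is a T,
/// so values of other types are skipped rather than causing a panic.
fn first_formatted(outputs: &[Box<dyn Any>]) -> Option<&FormattedOutput> {
    outputs.iter().find_map(|o| o.downcast_ref::<FormattedOutput>())
}

fn main() {
    let outputs: Vec<Box<dyn Any>> = vec![
        Box::new(42_u32), // an output of some other type is skipped
        Box::new(FormattedOutput { message: "Mid: 100.52".to_string() }),
    ];
    let formatted = first_formatted(&outputs).expect("one FormattedOutput present");
    println!("{}", formatted.message);
}
```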
Expected output:

```
=== Tutorial 07: Your First Computation Graph ===
Input: OrderBookSnapshot { best_bid: 100.5, best_ask: 100.55, timestamp: 1234567890 }
Graph completed with 1 terminal output(s)
Output: Mid: 100.52, Spread: 4.9 bps
Mid price: 100.52
Spread: 4.9 bps
=== Tutorial 07 Complete ===
```
You’ve built and executed your first computation graph:
- `#[reactor]` declares the firing criterion as a top-level primitive (`name`, `accumulators`, `criteria`)
- `#[computation_graph]` declares the topology, subscribes to a reactor via `trigger = reactor("name")`, and generates the `_compiled` function
- Entry nodes receive `Option<&T>` from the `InputCache`; processing nodes receive `&T` from their upstream peer
- `InputCache` holds named, serialized data that feeds entry nodes
- `GraphResult::Completed` carries boxed terminal outputs; downcast them to your concrete types
The `_compiled` function is the building block for everything that follows. In the next tutorial you’ll wrap it in an accumulator and reactor to create a live, event-driven pipeline.
- Tutorial 08 — Accumulators: wire the compiled graph into a reactor driven by live events