07 - Packaging a Computation Graph
In this tutorial you’ll take a computation graph from Rust source code all the way to a running graph loaded inside the Cloacina server. You’ll build it as a shared library, package it into a .cloacina source archive, upload it via the REST API, and verify that the reconciler compiles and loads it automatically.
What you’ll learn:
- The directory layout and package.toml fields for a computation graph package
- The Cargo.toml configuration for cdylib output
- How to write a minimal single-accumulator graph with #[computation_graph]
- Packaging the source into a .cloacina archive and uploading via POST /tenants/public/workflows
- Polling the health endpoints to confirm the graph is live
Prerequisites:
- Completion of the library tutorial 07 - Your First Computation Graph
- The Cloacina server running and reachable (see the workflow service tutorials for server setup)
- A valid PAK token (bootstrap key or one created via POST /auth/keys)
- Rust toolchain installed (rustc, cargo)
- curl and tar available in your shell
Time estimate: 20–30 minutes (most of which is waiting for the first Rust compile)
A computation graph package is a Rust crate compiled as a cdylib. The server’s reconciler watches for newly uploaded .cloacina archives, extracts the source, compiles it, and loads the resulting shared library via fidius FFI. Once loaded, the graph’s accumulators and reactor are registered with the ComputationGraphScheduler and start accepting events.
The key distinction from a packaged workflow: the graph plugin exposes an execute_graph() FFI method that receives a serialized InputCache snapshot and returns the terminal node outputs. The host server owns all accumulator channels and the reactor loop — your plugin only contains the pure computation logic.
mkdir my-price-signal
cd my-price-signal
package.toml is the Cloacina package manifest. It carries identity metadata only: the package shape (workflow vs computation graph vs reactor) is derived from the FFI metadata the cdylib produces, not from manifest keys.
[package]
name = "my-price-signal"
version = "0.1.0"
interface = "cloacina-workflow-plugin"
interface_version = 1
extension = "cloacina"
[metadata]
graph_name = "price_signal"
language = "rust"
description = "Compute a mid-price signal from order book snapshots"
The [metadata] fields for computation graph packages:
| Field | Required | Meaning |
|---|---|---|
| graph_name | Yes | Identifier used for accumulator and reactor names |
| language | Yes | "rust", which tells the reconciler how to compile |
| description | No | Human-readable package description |
Note: Earlier versions accepted package_type = ["computation_graph"] and [[triggers]] stanzas in [metadata]. Both are now hard-rejected at load time — package classification flows through FFI metadata (get_graph_metadata, get_reactor_metadata, get_trigger_metadata) and trigger declarations live on #[trigger] macros in the cdylib. Reaction mode and input strategy are read from the #[computation_graph(reaction = ..., strategy = ...)] attributes on the macro itself.
[package]
name = "my-price-signal"
version = "0.1.0"
edition = "2021"
[workspace]
[features]
default = ["packaged"]
packaged = []
[lib]
crate-type = ["cdylib", "rlib"]
[dependencies]
cloacina-computation-graph = "0.3"
cloacina-macros = "0.3"
cloacina-workflow = { version = "0.3", features = ["packaged"] }
cloacina-workflow-plugin = "0.3"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
async-trait = "0.1"
tokio = { version = "1.0", features = ["full"] }
[build-dependencies]
cloacina-build = "0.3"
Why both cdylib and rlib? cdylib produces the shared library (.so/.dylib/.dll) that the server loads at runtime. rlib lets you run cargo test against the crate; tests cannot link against a cdylib directly.
cloacina-build generates the FFI glue that fidius needs to call your execute_graph() function.
fn main() {
cloacina_build::configure();
}
Create a minimal graph: a single orderbook accumulator drives a compute_signal entry node which produces a PriceSignal terminal output.
use cloacina_macros::reactor;
use serde::{Deserialize, Serialize};

// One invocation per cdylib: emits the unified FFI plugin shell that
// the reconciler calls (`get_task_metadata`, `get_graph_metadata`,
// `get_reactor_metadata`, `get_trigger_metadata`,
// `invoke_trigger_poll`, `get_triggerless_graph_metadata`,
// `invoke_triggerless_graph`).
cloacina_workflow_plugin::package!();

// --- Boundary types ---

/// Input from the orderbook accumulator.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OrderBook {
    pub best_bid: f64,
    pub best_ask: f64,
}

/// Terminal output of the graph.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PriceSignal {
    pub mid_price: f64,
    pub spread: f64,
}

// --- Reactor: publishes the orderbook accumulator ---
#[reactor(
    name = "price_signal_rx",
    accumulators = [orderbook],
    criteria = when_any(orderbook),
)]
pub struct PriceSignalReactor;

// --- Computation graph (reactor-bound) ---
#[cloacina_macros::computation_graph(
    trigger = reactor("price_signal_rx"),
    graph = {
        compute_signal(orderbook) -> emit,
    }
)]
pub mod price_signal {
    use super::*;

    /// Entry node: receives an order book snapshot and computes the mid-price.
    /// Falls back to a zeroed signal when no snapshot is available yet.
    pub async fn compute_signal(orderbook: Option<&OrderBook>) -> PriceSignal {
        match orderbook {
            Some(ob) => PriceSignal {
                mid_price: (ob.best_bid + ob.best_ask) / 2.0,
                spread: ob.best_ask - ob.best_bid,
            },
            None => PriceSignal {
                mid_price: 0.0,
                spread: 0.0,
            },
        }
    }

    /// Terminal node: receives the computed signal and formats it as the
    /// final graph output.
    pub async fn emit(signal: &PriceSignal) -> String {
        format!(
            "mid={:.4} spread={:.4}",
            signal.mid_price, signal.spread
        )
    }
}
The topology compute_signal(orderbook) -> emit means:
- compute_signal is an entry node: it reads from the orderbook accumulator (by receiving Option<&OrderBook>)
- emit is a terminal node: it receives the output of compute_signal, and its return value is the final graph output
- The reactor fires when the orderbook accumulator delivers a new value (when_any)
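Because the crate also builds as an rlib, the arithmetic inside compute_signal can be checked with cargo test before anything is uploaded. The following is a minimal sketch, not part of the Cloacina API: mid_and_spread is a hypothetical helper that mirrors the math in the entry node so it can be tested without the macros or an async runtime.

```rust
/// Hypothetical helper (not a Cloacina API) mirroring the arithmetic in
/// `compute_signal`. Returns `(mid_price, spread)` for a best bid/ask pair.
pub fn mid_and_spread(best_bid: f64, best_ask: f64) -> (f64, f64) {
    ((best_bid + best_ask) / 2.0, best_ask - best_bid)
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn computes_mid_and_spread() {
        // Values chosen so the results are exactly representable as f64.
        let (mid, spread) = mid_and_spread(100.0, 102.0);
        assert_eq!(mid, 101.0);
        assert_eq!(spread, 2.0);
    }
}
```

If you adopt a helper like this, compute_signal could call it in the Some(ob) branch so the tested code and the deployed code stay identical.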
Before packaging, verify the crate compiles:
cargo build --lib
On success you’ll see the shared library in:
target/debug/libmy_price_signal.dylib # macOS
target/debug/libmy_price_signal.so # Linux
target/debug/my_price_signal.dll # Windows
You don’t need to ship this file — the server compiles from source.
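The server expects a .cloacina file, which is a bz2-compressed tar archive. The archive must have a top-level directory named {package-name}-{version}/ containing all source files. The command below relies on the --transform option, which is specific to GNU tar; the bsdtar that ships with macOS uses -s instead (for example -s ',^my-price-signal,my-price-signal-0.1.0,').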
cd .. # go one level above my-price-signal/
tar -cjf my-price-signal.cloacina \
  --transform 's,^my-price-signal,my-price-signal-0.1.0,' \
  my-price-signal/package.toml \
  my-price-signal/Cargo.toml \
  my-price-signal/build.rs \
  my-price-signal/src/lib.rs
Verify the archive structure:
tar -tjf my-price-signal.cloacina
Expected output:
my-price-signal-0.1.0/package.toml
my-price-signal-0.1.0/Cargo.toml
my-price-signal-0.1.0/build.rs
my-price-signal-0.1.0/src/lib.rs
Archive structure matters: The reconciler expects a single top-level directory named {name}-{version}. If the paths inside the archive don’t match this layout, the extract step will fail and the package will be rejected.
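The layout rule can be expressed as a small check over the entry names that tar -tjf prints. This is a sketch of the rule as stated in this tutorial, not the reconciler's actual validation code; has_expected_root is a hypothetical name.

```rust
/// Hypothetical pre-upload check (not the reconciler's own code): returns
/// true when the archive is non-empty and every entry sits under the
/// single top-level directory `{name}-{version}/`.
pub fn has_expected_root(entries: &[&str], name: &str, version: &str) -> bool {
    let root = format!("{}-{}/", name, version);
    !entries.is_empty() && entries.iter().all(|entry| entry.starts_with(&root))
}
```

For the archive built above, passing the four listed paths with name "my-price-signal" and version "0.1.0" satisfies the check, while paths that still start with my-price-signal/ (the untransformed directory name) do not.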
Set your server base URL and PAK token:
BASE_URL="http://localhost:8080"
TOKEN="clk_your_bootstrap_or_api_key_here"
Upload via multipart form:
curl -s -w "\nHTTP %{http_code}\n" \
  -X POST "${BASE_URL}/tenants/public/workflows" \
  -H "Authorization: Bearer ${TOKEN}" \
  -F "file=@my-price-signal.cloacina;type=application/octet-stream"
Expected response (HTTP 201):
{
"id": "a1b2c3d4-...",
"name": "my-price-signal",
"version": "0.1.0",
"status": "pending"
}
The status: "pending" means the reconciler has accepted the archive and queued the compile job.
The first Rust compile of a new package typically takes 60–120 seconds. The reconciler runs cargo build --lib with the Cloacina workspace available as a path dependency, then loads the resulting shared library into the server process.
Poll the graph health endpoint until your graph appears:
# Poll every 5 seconds for up to 2 minutes
for i in $(seq 1 24); do
  echo "--- attempt $i ---"
  curl -s "${BASE_URL}/v1/health/graphs" \
    -H "Authorization: Bearer ${TOKEN}" | \
    python3 -m json.tool
  sleep 5
done
While compiling you’ll see an empty graph list:
{ "graphs": [] }
Once loaded:
{
"graphs": [
{
"name": "price_signal",
"health": { "state": "running" },
"accumulators": ["orderbook"],
"paused": false
}
]
}
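The shell loop above keeps polling for all 24 attempts even after the graph appears. If you script this check, a bounded poll that stops on the first success is usually more convenient. The sketch below shows only the retry logic; poll_until is a hypothetical helper, and the check closure stands in for an HTTP request to /v1/health/graphs, which is omitted because it would need an HTTP client crate.

```rust
use std::thread::sleep;
use std::time::Duration;

/// Hypothetical helper: calls `check` up to `max_attempts` times, sleeping
/// `interval` between attempts. Returns the 1-based attempt number on which
/// `check` first returned true, or `None` if it never did.
pub fn poll_until<F>(max_attempts: u32, interval: Duration, mut check: F) -> Option<u32>
where
    F: FnMut() -> bool,
{
    for attempt in 1..=max_attempts {
        if check() {
            return Some(attempt);
        }
        // Skip the sleep after the final attempt; there is nothing left to wait for.
        if attempt < max_attempts {
            sleep(interval);
        }
    }
    None
}
```

Calling poll_until(24, Duration::from_secs(5), ...) would match the shell loop's cadence of 24 attempts at 5-second intervals.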
Next, check the accumulator health endpoint:
curl -s "${BASE_URL}/v1/health/accumulators" \
  -H "Authorization: Bearer ${TOKEN}" | python3 -m json.tool
Expected:
{
"accumulators": [
{
"name": "orderbook",
"status": "healthy"
}
]
}
If the accumulator is "healthy" and the graph’s health state is "running", your packaged computation graph is live and ready to receive events.
When the server receives a .cloacina source package, the reconciler:
- Extracts the archive to a temporary build directory
- Injects a [patch.crates-io] section into Cargo.toml so path dependencies resolve to the server’s bundled Cloacina version
- Runs cargo build --lib --release (or a debug build without --release, depending on server mode; cargo build has no --debug flag)
- Calls build_declaration_from_ffi() to convert the GraphPackageMetadata returned by the FFI plugin into a ComputationGraphDeclaration
- Calls ComputationGraphScheduler::load_graph() to spawn the accumulator tasks and reactor loop
The FFI boundary uses JSON (debug builds) or bincode (release builds) for the InputCache snapshot passed to execute_graph().
HTTP 400 on upload: The archive is malformed. Check that the top-level directory matches {name}-{version} and that package.toml is present.
Graph never appears in /v1/health/graphs: Check the server logs. Look for cargo build errors — the most common cause is a version mismatch in Cargo.toml. Make sure cloacina-computation-graph, cloacina-macros, cloacina-workflow-plugin, and cloacina-build all use the same version.
Accumulator shows "unhealthy": The accumulator task crashed, usually due to a deserialization failure on the first event. Check that the event payload you send matches the boundary type (OrderBook in this example).
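Since a version mismatch among the Cloacina crates is the most common build failure, it can help to check for one before uploading. The sketch below assumes you have already collected (name, version) pairs from Cargo.toml by hand or with a TOML parser of your choice; cloacina_versions is a hypothetical helper and does not parse TOML itself.

```rust
/// Hypothetical helper: returns the distinct version strings used by
/// dependencies whose name starts with `cloacina-`, sorted and deduplicated.
/// More than one element in the result indicates a version mismatch.
pub fn cloacina_versions<'a>(deps: &[(&str, &'a str)]) -> Vec<&'a str> {
    let mut versions: Vec<&'a str> = deps
        .iter()
        .filter(|(name, _)| name.starts_with("cloacina-"))
        .map(|(_, version)| *version)
        .collect();
    versions.sort_unstable();
    versions.dedup();
    versions
}
```

With the Cargo.toml from this tutorial, every cloacina-* entry is "0.3", so the result would contain a single element.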
Now that your graph is deployed and running, the next step is to push events into it:
- Tutorial 08: WebSocket Event Injection (push events to the orderbook accumulator over a WebSocket connection)
- Tutorial 09: Kafka-Sourced Computation Graphs (drive accumulators from a Kafka topic instead of WebSocket)