Computation Graphs
Computation graphs are reactive, directed acyclic graphs (DAGs) of processing nodes. Unlike workflows (which are task-based pipelines driven by a runner), computation graphs react to data arriving at accumulators and propagate results through a fixed topology of nodes.
The Python API mirrors the Rust computation graph system, using decorators and a context-manager builder instead of macros and modules.
ComputationGraphBuilder is a context manager that declares a computation graph’s name, the reactor that fires it, and the topology. Inside the with block you define node functions with the @cloaca.node decorator. When the block exits, the builder validates that every node in the topology has a matching decorated function and vice versa, then registers the graph for execution.
As of CLOACI-I-0101 the firing criterion is its own first-class primitive: a class decorated with @cloaca.reactor. The bundled react={...} kwarg has been removed; pass the reactor class via reactor=.
cloaca.ComputationGraphBuilder(name, *, reactor, graph)
Parameters:
| Parameter | Type | Required | Description |
|---|---|---|---|
| name | str | Yes | Unique name for the computation graph. Used to identify the graph at execution time. |
| reactor | class decorated with @cloaca.reactor | Yes (for reactor-triggered graphs) | The reactor that fires this graph. Its accumulators and mode determine when the graph runs. Omit for a trigger-less graph invoked via builder.execute() only. |
| graph | dict | Yes | Topology dict mapping node names to their configuration. See Topology Dict. |
Returns: ComputationGraphBuilder instance (used as a context manager)
Example:
import cloaca

@cloaca.reactor(
    name="pricing_pipeline_reactor",
    accumulators=["orderbook"],
    mode="when_any",
)
class PricingPipelineReactor:
    pass

with cloaca.ComputationGraphBuilder(
    "pricing_pipeline",
    reactor=PricingPipelineReactor,
    graph={
        "ingest": {"inputs": ["orderbook"], "next": "compute_spread"},
        "compute_spread": {"next": "format_output"},
        "format_output": {},
    },
) as builder:
    # Define @cloaca.node functions here ...
    pass
The reactor’s mode and accumulators control when the graph fires.
| Key | Type | Required | Description |
|---|---|---|---|
| mode | str | Yes | "when_any" – fire when any named accumulator delivers new data. "when_all" – fire only when every named accumulator has delivered at least one value. |
| accumulators | list[str] | Yes | List of accumulator names that feed this reactor. Each name must match a registered accumulator function. |
Example:
# Fire whenever either source delivers data
@cloaca.reactor(name="market_reactor", accumulators=["orderbook", "pricing"], mode="when_any")
class MarketReactor:
    pass

# Fire only after both sources have delivered at least once
@cloaca.reactor(name="market_reactor_strict", accumulators=["orderbook", "pricing"], mode="when_all")
class MarketReactorStrict:
    pass
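The difference between the two modes can be sketched in plain Python. This is a toy model of the firing criterion as described in the table above, not cloaca's implementation; the function name `should_fire` and the `delivered` set are hypothetical.

```python
def should_fire(mode, accumulators, delivered):
    """Toy model of a reactor's firing criterion (hypothetical helper).

    mode         -- "when_any" or "when_all"
    accumulators -- names the reactor listens to
    delivered    -- set of names that have delivered at least one value,
                    including the one that just updated
    """
    if mode == "when_any":
        return any(name in delivered for name in accumulators)
    if mode == "when_all":
        return all(name in delivered for name in accumulators)
    raise ValueError(f"unknown mode: {mode!r}")


sources = ["orderbook", "pricing"]

# Only "orderbook" has delivered so far.
print(should_fire("when_any", sources, {"orderbook"}))  # True
print(should_fire("when_all", sources, {"orderbook"}))  # False

# Both sources have now delivered at least once.
print(should_fire("when_all", sources, {"orderbook", "pricing"}))  # True
```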
The graph parameter is a dict mapping node names (strings) to node configuration dicts. Each key becomes a node in the DAG; the value dict controls how data flows to and from that node.
| Key | Type | Required | Description |
|---|---|---|---|
| inputs | list[str] | No | List of accumulator/source names this node reads from the input cache. Only used on entry nodes. |
| next | str | No | Name of the downstream node for linear (unconditional) edges. Mutually exclusive with routes. |
| routes | dict[str, str] | No | Maps variant name strings to downstream node names for conditional routing. Mutually exclusive with next. |
A node with neither next nor routes is a terminal node – its return value becomes the graph’s output.
Linear topology example:
graph={
    "ingest": {"inputs": ["orderbook"], "next": "compute_spread"},
    "compute_spread": {"next": "format_output"},
    "format_output": {},  # terminal
}
Routing topology example:
graph={
    "decision": {
        "inputs": ["orderbook", "pricing"],
        "routes": {
            "Trade": "signal_handler",
            "NoAction": "audit_logger",
        },
    },
    "signal_handler": {},  # terminal on Trade branch
    "audit_logger": {},  # terminal on NoAction branch
}
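To check a topology dict by hand before passing it to the builder, a small helper along these lines can be useful. It is a sketch based only on the rules stated above (next and routes are mutually exclusive; a node with neither is terminal); `describe_topology` is a hypothetical name, and the check that every edge points at a declared node is an assumption rather than documented builder behavior.

```python
def describe_topology(graph):
    """Summarize each node of a topology dict and check basic consistency."""
    summary = {}
    for name, config in graph.items():
        if "next" in config and "routes" in config:
            raise ValueError(f"node {name!r} sets both 'next' and 'routes'")
        if "routes" in config:
            edge, targets = "routing", list(config["routes"].values())
        elif "next" in config:
            edge, targets = "linear", [config["next"]]
        else:
            edge, targets = "terminal", []
        # Assumption: every edge must point at a node declared in the dict.
        for target in targets:
            if target not in graph:
                raise ValueError(f"node {name!r} points at unknown node {target!r}")
        summary[name] = {"entry": "inputs" in config, "edge": edge}
    return summary


graph = {
    "decision": {
        "inputs": ["orderbook", "pricing"],
        "routes": {"Trade": "signal_handler", "NoAction": "audit_logger"},
    },
    "signal_handler": {},
    "audit_logger": {},
}

for name, info in describe_topology(graph).items():
    print(name, info)
# decision {'entry': True, 'edge': 'routing'}
# signal_handler {'entry': False, 'edge': 'terminal'}
# audit_logger {'entry': False, 'edge': 'terminal'}
```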
Context manager protocol. __enter__ establishes the active graph context so @cloaca.node decorators can register functions. __exit__ validates the topology against registered nodes and builds the internal executor.
Raises:
- AttributeError – if a node name in the topology has no matching @cloaca.node function
- ValueError – if a @cloaca.node function does not appear in the topology
Execute the computation graph synchronously with the given input data.
builder.execute(inputs)
Parameters:
| Parameter | Type | Required | Description |
|---|---|---|---|
| inputs | dict[str, dict] | Yes | Maps source/accumulator names to their data dicts. Each key must match a name from the reactor’s accumulators list. |
Returns: dict – the terminal node’s return value
Raises:
- ValueError – if the graph has not been built yet (called before the with block exits) or if execution fails
Example:
with cloaca.ComputationGraphBuilder("pricing_pipeline", ...) as builder:
    # ... define nodes ...
    pass

# Execute after the with block
result = builder.execute({"orderbook": {"best_bid": 100.50, "best_ask": 100.55}})
print(result)  # {"message": "Mid: 100.52, Spread: 5.0 bps", ...}
Returns a string representation of the builder.
repr(builder)
# "ComputationGraphBuilder(name='pricing_pipeline', nodes=3)"
The @cloaca.node decorator registers a function as a node in the active ComputationGraphBuilder context. It must be used inside a with ComputationGraphBuilder(...) as builder: block.
@cloaca.node
def node_name(arg1, arg2, ...):
    ...
Parameters:
| Parameter | Type | Description |
|---|---|---|
| func | callable | The function to register. The function name must exactly match a key in the graph topology dict. |
Returns: The original function, unchanged (transparent decorator)
Raises:
- ValueError – if used outside a ComputationGraphBuilder context manager
The role of a node is determined by its position in the topology, not by the decorator itself. There are three types:
Entry nodes have "inputs" in their topology config. They receive one positional argument per source listed in "inputs", drawn from the input cache. Arguments are None if the corresponding source has no data yet.
# Topology: "ingest": {"inputs": ["orderbook", "pricing"], "next": "analyze"}
@cloaca.node
def ingest(orderbook, pricing):
    """Each parameter matches a source name from 'inputs'."""
    if orderbook is None:
        return {"spread": 0.0}
    spread = orderbook["best_ask"] - orderbook["best_bid"]
    return {"spread": spread}
Interior nodes have a "next" or "routes" key and no "inputs". They receive a single positional argument: the dict returned by their upstream node.
Linear interior node:
# Topology: "compute_spread": {"next": "format_output"}
@cloaca.node
def compute_spread(input_data):
    """Receives the dict returned by the upstream node."""
    spread_bps = (input_data["spread"] / input_data["mid_price"]) * 10_000
    return {"spread_bps": spread_bps, "mid_price": input_data["mid_price"]}
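Routing node (returns a tuple). In this example the routing node also declares "inputs", so it is an entry node and receives one argument per source rather than a single upstream dict: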
# Topology: "decision": {"inputs": [...], "routes": {"Trade": ..., "NoAction": ...}}
@cloaca.node
def decision(orderbook, pricing):
    """Routing nodes must return a (variant_name, payload_dict) tuple."""
    if orderbook is None:
        return ("NoAction", {"reason": "no data"})
    return ("Trade", {"direction": "BUY", "price": 100.0})
Terminal nodes have neither "next" nor "routes" in their topology config. They receive the upstream node’s output and return a dict that becomes the graph’s final result.
# Topology: "format_output": {}
@cloaca.node
def format_output(input_data):
    """Return value becomes the result of builder.execute()."""
    return {"message": f"Spread: {input_data['spread_bps']:.1f} bps"}
| Node Type | Required Return Type | Description |
|---|---|---|
| Entry node | dict | Passed to the downstream node as its argument |
| Linear interior node | dict | Passed to the downstream node as its argument |
| Routing node | tuple[str, dict] | First element is the variant name (must match a key in "routes"), second element is the payload dict sent to the selected branch |
| Terminal node | dict | Becomes the return value of builder.execute() |
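How these return values travel through a topology can be traced with a tiny pure-Python walker. This is a sketch under simplifying assumptions (exactly one entry node, one path per execution), and `run_graph` is a hypothetical function, not part of cloaca; it only follows the rules in the table above.

```python
def run_graph(graph, nodes, inputs):
    """Walk a topology dict, calling plain functions from `nodes` by name."""
    # Assumption: exactly one node declares "inputs" and acts as the entry.
    current = next(name for name, cfg in graph.items() if "inputs" in cfg)
    # Entry nodes get one positional argument per source, None if missing.
    args = [inputs.get(source) for source in graph[current]["inputs"]]
    result = nodes[current](*args)
    while True:
        cfg = graph[current]
        if "routes" in cfg:
            variant, payload = result       # routing node: (variant, payload)
            current = cfg["routes"][variant]
        elif "next" in cfg:
            payload = result                # linear node: dict passed along
            current = cfg["next"]
        else:
            return result                   # terminal node: final output
        result = nodes[current](payload)


graph = {
    "decision": {
        "inputs": ["orderbook"],
        "routes": {"Trade": "signal_handler", "NoAction": "audit_logger"},
    },
    "signal_handler": {},
    "audit_logger": {},
}


def decision(orderbook):
    if orderbook is None:
        return ("NoAction", {"reason": "no data"})
    return ("Trade", {"price": orderbook["best_bid"]})


def signal_handler(signal):
    return {"executed": True, "price": signal["price"]}


def audit_logger(payload):
    return {"logged": True, "reason": payload["reason"]}


nodes = {f.__name__: f for f in (decision, signal_handler, audit_logger)}

print(run_graph(graph, nodes, {"orderbook": {"best_bid": 100.0}}))
# {'executed': True, 'price': 100.0}
print(run_graph(graph, nodes, {}))
# {'logged': True, 'reason': 'no data'}
```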
When the with block exits, the builder performs two-way validation:
- Every node name in the graph topology must have a corresponding @cloaca.node-decorated function with the same name.
- Every @cloaca.node-decorated function must appear as a key in the graph topology.
A topology node without a matching function raises AttributeError; a decorated function that is missing from the topology raises ValueError.
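The two-way check amounts to comparing two sets of names. The sketch below shows one way to express it; `validate_nodes` is a hypothetical helper, the error messages are illustrative, and the order in which the two checks run is an assumption.

```python
def validate_nodes(graph, registered):
    """Compare topology keys against registered function names."""
    missing = sorted(set(graph) - set(registered))
    if missing:
        # Topology names a node that no decorated function provides.
        raise AttributeError(f"no function registered for node(s): {missing}")
    extra = sorted(set(registered) - set(graph))
    if extra:
        # A decorated function does not appear in the topology.
        raise ValueError(f"function(s) not in topology: {extra}")


graph = {"ingest": {"inputs": ["src"], "next": "format_output"}, "format_output": {}}

validate_nodes(graph, {"ingest": print, "format_output": print})  # passes silently

try:
    validate_nodes(graph, {"ingest": print})
except AttributeError as e:
    print(e)  # no function registered for node(s): ['format_output']

try:
    validate_nodes(graph, {"ingest": print, "format_output": print, "extra": print})
except ValueError as e:
    print(e)  # function(s) not in topology: ['extra']
```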
Accumulators sit between raw data sources and the computation graph. They receive events, optionally transform or buffer them, and emit the processed values that entry nodes consume. In a reactive deployment the runtime manages accumulators automatically; in tutorials and tests you can call the decorated function directly.
The function name of each accumulator becomes its source name, which must match entries in the reactor’s accumulators list and the inputs lists in the topology.
Registers a function as a passthrough accumulator. Each event is transformed one-to-one with no buffering.
@cloaca.passthrough_accumulator
def source_name(event):
    ...
Parameters: None (bare decorator)
Function Signature:
| Parameter | Type | Description |
|---|---|---|
| event | dict | The raw incoming event |
Returns: dict to pass to the graph, or None to suppress the event
Example:
@cloaca.passthrough_accumulator
def pricing(event):
    """Transform raw pricing event into graph-ready format."""
    return {"price": event["mid_price"], "change_pct": 0.0}

# Manual invocation (tutorials/testing)
processed = pricing({"mid_price": 101.25})
result = builder.execute({"pricing": processed})
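Returning None to suppress an event is easiest to see with a filter-style transform. The sketch below uses a plain, undecorated function so it runs without cloaca installed; in real use it would carry the @cloaca.passthrough_accumulator decorator. The validity rule (drop events without a positive mid_price) is an assumption chosen for illustration.

```python
def pricing(event):
    """Keep only events that carry a usable mid price."""
    mid = event.get("mid_price")
    if mid is None or mid <= 0:
        return None  # suppress: nothing is delivered to the graph
    return {"price": mid, "change_pct": 0.0}


events = [{"mid_price": 101.25}, {"status": "heartbeat"}, {"mid_price": -1}]

# Emulate the runtime dropping suppressed events.
delivered = [out for out in map(pricing, events) if out is not None]
print(delivered)  # [{'price': 101.25, 'change_pct': 0.0}]
```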
Registers a function as a stream-backed accumulator. In a packaged deployment, this accumulator subscribes to a streaming backend (e.g., Kafka, Redpanda) and delivers messages to the graph.
@cloaca.stream_accumulator(type=..., topic=..., group=...)
def source_name(event):
    ...
Parameters (keyword-only):
| Parameter | Type | Required | Description |
|---|---|---|---|
| type | str | Yes | Stream backend type (e.g., "kafka", "redpanda") |
| topic | str | Yes | Topic name to subscribe to |
| group | str | No | Consumer group name. Defaults to the graph name if omitted. |
Function Signature:
| Parameter | Type | Description |
|---|---|---|
| event | dict | A deserialized message from the stream |
Returns: dict to pass to the graph, or None to suppress the message
Example:
@cloaca.stream_accumulator(type="kafka", topic="market.orderbook", group="mm-group")
def orderbook(event):
    """Receive order book updates from Kafka."""
    return {
        "best_bid": event["bid"],
        "best_ask": event["ask"],
        "timestamp": event["ts"],
    }
Registers a function as a polling accumulator. The runtime calls the function at a fixed interval to produce data for the graph.
@cloaca.polling_accumulator(interval=...)
def source_name():
    ...
Parameters (keyword-only):
| Parameter | Type | Required | Description |
|---|---|---|---|
| interval | str | Yes | Polling interval as a duration string (e.g., "5s", "100ms", "1m") |
Function Signature: The decorated function takes no arguments and returns a dict.
Returns: dict to deliver to the graph
Example:
@cloaca.polling_accumulator(interval="5s")
def system_metrics():
    """Poll system metrics every 5 seconds."""
    return {
        "cpu_usage": get_cpu_percent(),
        "memory_mb": get_memory_usage(),
    }
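When writing tests or configuration checks around duration strings such as "5s", "100ms", and "1m", a small parser can convert them to milliseconds. The exact grammar cloaca accepts is not specified here, so this sketch assumes only the three suffixes that appear in the examples; `parse_duration_ms` is a hypothetical helper.

```python
import re

# Assumption: only the suffixes shown in the examples ("ms", "s", "m").
_UNIT_TO_MS = {"ms": 1, "s": 1_000, "m": 60_000}
_DURATION = re.compile(r"(\d+)(ms|s|m)")


def parse_duration_ms(text):
    """Convert a duration string like '5s' into integer milliseconds."""
    match = _DURATION.fullmatch(text.strip())
    if match is None:
        raise ValueError(f"unrecognized duration: {text!r}")
    value, unit = match.groups()
    return int(value) * _UNIT_TO_MS[unit]


print(parse_duration_ms("5s"))     # 5000
print(parse_duration_ms("100ms"))  # 100
print(parse_duration_ms("1m"))     # 60000
```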
Registers a function as a batch accumulator. Events are buffered and flushed to the graph either when the buffer reaches max_buffer_size or after flush_interval elapses, whichever comes first.
@cloaca.batch_accumulator(flush_interval=..., max_buffer_size=...)
def source_name(events):
    ...
Parameters (keyword-only):
| Parameter | Type | Required | Description |
|---|---|---|---|
| flush_interval | str | Yes | Maximum time to buffer before flushing (e.g., "10s", "500ms") |
| max_buffer_size | int | No | Maximum number of events to buffer before flushing. If omitted, flushing is driven solely by the interval. |
Function Signature:
| Parameter | Type | Description |
|---|---|---|
| events | list[dict] | The batch of buffered events |
Returns: dict to deliver to the graph (typically an aggregation of the batch)
Example:
@cloaca.batch_accumulator(flush_interval="10s", max_buffer_size=100)
def trade_events(events):
    """Aggregate trade events into a summary every 10s or 100 events."""
    total_volume = sum(e["volume"] for e in events)
    avg_price = sum(e["price"] for e in events) / len(events)
    return {
        "total_volume": total_volume,
        "avg_price": avg_price,
        "trade_count": len(events),
    }
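The "whichever comes first" flushing rule can be modeled with a small buffer that takes the current time as an argument, which keeps it deterministic. This is a toy model, not the runtime's buffer: `ToyBatchBuffer` is a hypothetical class, and skipping the flush when the buffer is empty is an assumption.

```python
class ToyBatchBuffer:
    """Flush when the buffer is full or the interval has elapsed."""

    def __init__(self, flush_interval_s, max_buffer_size=None, start=0.0):
        self.flush_interval_s = flush_interval_s
        self.max_buffer_size = max_buffer_size
        self._events = []
        self._last_flush = start

    def add(self, event, now):
        """Buffer an event; return the batch if the size limit is reached."""
        self._events.append(event)
        full = (self.max_buffer_size is not None
                and len(self._events) >= self.max_buffer_size)
        return self._flush(now) if full else None

    def tick(self, now):
        """Timer callback; return the batch if the interval has elapsed."""
        due = now - self._last_flush >= self.flush_interval_s
        # Assumption: an empty buffer is never flushed.
        return self._flush(now) if (due and self._events) else None

    def _flush(self, now):
        batch, self._events = self._events, []
        self._last_flush = now
        return batch


buf = ToyBatchBuffer(flush_interval_s=10.0, max_buffer_size=3)
print(buf.add({"volume": 1}, now=0.0))  # None
print(buf.add({"volume": 2}, now=1.0))  # None
print(buf.add({"volume": 3}, now=2.0))  # size limit reached: three events
print(buf.add({"volume": 4}, now=3.0))  # None
print(buf.tick(now=5.0))                # None (3s since the last flush)
print(buf.tick(now=12.0))               # interval elapsed: one event
```

A batch returned by add or tick is what a function like trade_events above would receive as its events argument.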
| Decorator | Buffering | Trigger | Use Case |
|---|---|---|---|
| @passthrough_accumulator | None | Each event | Low-latency, one-to-one transforms |
| @stream_accumulator | None | Each message from stream backend | Kafka/Redpanda subscriptions |
| @polling_accumulator | None | Fixed interval | Periodic data fetch (APIs, sensors) |
| @batch_accumulator | Yes | Interval or buffer size | Aggregation, high-throughput reduction |
After the with block exits, call builder.execute() to run the graph synchronously. This is the primary API for tutorials and testing.
with cloaca.ComputationGraphBuilder("my_graph", ...) as builder:
    # ... define nodes ...
    pass

result = builder.execute({"source_name": {"key": "value"}})
The inputs dict maps source names to data dicts. Each key should match an accumulator name from the reactor’s accumulators list. The return value is the terminal node’s output dict.
In a packaged deployment, computation graphs run inside the graph scheduler. The runtime:
- Starts each registered accumulator (subscribing to streams, setting up polling timers, etc.)
- Feeds accumulator output into the input cache
- Evaluates the reaction criteria (when_any or when_all) on each cache update
- Executes the graph when criteria are met
- Delivers terminal node output to downstream consumers
No manual execute() call is needed – the runtime drives the graph automatically. See the Packaging guide for details on deploying computation graphs.
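The steps above can be pictured as a loop around an input cache. The following is a deliberately simplified, single-threaded model; `ToyRuntime`, its method names, and the `summarize` stand-in for a graph are all hypothetical, and the real scheduler may differ in every detail.

```python
class ToyRuntime:
    """Input cache plus firing check, driven by accumulator output."""

    def __init__(self, accumulators, mode, graph_fn):
        self.accumulators = list(accumulators)
        self.mode = mode
        self.graph_fn = graph_fn  # stands in for executing the graph
        self.cache = {}           # latest value per source
        self.outputs = []         # stands in for downstream consumers

    def on_output(self, name, value):
        if value is None:
            return  # the accumulator suppressed this event
        self.cache[name] = value
        if self._criteria_met():
            # Sources with no data yet are passed as None.
            inputs = {n: self.cache.get(n) for n in self.accumulators}
            self.outputs.append(self.graph_fn(inputs))

    def _criteria_met(self):
        if self.mode == "when_all":
            return all(n in self.cache for n in self.accumulators)
        return True  # when_any: the update that just arrived is enough


def summarize(inputs):
    """Report which sources had data when the graph fired."""
    return {name: value is not None for name, value in inputs.items()}


strict = ToyRuntime(["orderbook", "pricing"], "when_all", summarize)
strict.on_output("orderbook", {"best_bid": 100.0})  # cached, no fire yet
strict.on_output("pricing", None)                   # suppressed
strict.on_output("pricing", {"mid_price": 100.05})  # both present: fire
print(strict.outputs)  # [{'orderbook': True, 'pricing': True}]

eager = ToyRuntime(["orderbook", "pricing"], "when_any", summarize)
eager.on_output("orderbook", {"best_bid": 100.0})   # fires immediately
print(eager.outputs)   # [{'orderbook': True, 'pricing': False}]
```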
A three-node pipeline that ingests order book data, computes spread in basis points, and formats the output.
import cloaca

# 1. Define accumulator
@cloaca.passthrough_accumulator
def orderbook(event):
    return {"best_bid": event["bid"], "best_ask": event["ask"]}

# 2. Declare the reactor that fires the graph
@cloaca.reactor(
    name="pricing_pipeline_reactor",
    accumulators=["orderbook"],
    mode="when_any",
)
class PricingPipelineReactor:
    pass

# 3. Declare graph topology and define nodes
with cloaca.ComputationGraphBuilder(
    "pricing_pipeline",
    reactor=PricingPipelineReactor,
    graph={
        "ingest": {"inputs": ["orderbook"], "next": "compute_spread"},
        "compute_spread": {"next": "format_output"},
        "format_output": {},
    },
) as builder:

    @cloaca.node
    def ingest(orderbook):
        if orderbook is None:
            return {"spread": 0.0, "mid_price": 0.0}
        spread = orderbook["best_ask"] - orderbook["best_bid"]
        mid_price = (orderbook["best_ask"] + orderbook["best_bid"]) / 2.0
        return {"spread": spread, "mid_price": mid_price}

    @cloaca.node
    def compute_spread(input_data):
        mid = input_data["mid_price"]
        if mid == 0:
            # Avoid dividing by zero, but still return the keys that
            # format_output reads ("spread_bps" and "mid_price").
            return {"spread_bps": 0.0, "mid_price": mid}
        spread_bps = (input_data["spread"] / mid) * 10_000
        return {"spread_bps": spread_bps, "mid_price": mid}

    @cloaca.node
    def format_output(input_data):
        return {
            "message": f"Mid: {input_data['mid_price']:.2f}, "
            f"Spread: {input_data['spread_bps']:.1f} bps",
        }

# 4. Execute
raw_event = {"bid": 100.50, "ask": 100.55}
processed = orderbook(raw_event)
result = builder.execute({"orderbook": processed})
print(result["message"])
# Mid: 100.52, Spread: 5.0 bps
A market-making graph that takes two inputs, makes a trade/no-trade decision, and routes to different handlers.
import cloaca

@cloaca.passthrough_accumulator
def orderbook(event):
    return event

@cloaca.passthrough_accumulator
def pricing(event):
    return event

@cloaca.reactor(
    name="market_maker_reactor",
    accumulators=["orderbook", "pricing"],
    mode="when_any",
)
class MarketMakerReactor:
    pass

with cloaca.ComputationGraphBuilder(
    "market_maker",
    reactor=MarketMakerReactor,
    graph={
        "decision": {
            "inputs": ["orderbook", "pricing"],
            "routes": {
                "Trade": "signal_handler",
                "NoAction": "audit_logger",
            },
        },
        "signal_handler": {},
        "audit_logger": {},
    },
) as builder:

    @cloaca.node
    def decision(orderbook, pricing):
        if orderbook is None:
            return ("NoAction", {"reason": "no order book data"})
        spread = orderbook["best_ask"] - orderbook["best_bid"]
        if spread < 0.20:
            return ("Trade", {"direction": "BUY", "price": orderbook["best_bid"]})
        return ("NoAction", {"reason": f"spread too wide: {spread:.2f}"})

    @cloaca.node
    def signal_handler(signal):
        return {"executed": True, "message": f"{signal['direction']} @ {signal['price']:.2f}"}

    @cloaca.node
    def audit_logger(reason):
        return {"logged": True, "reason": reason["reason"]}

# Tight spread -> Trade
result = builder.execute({
    "orderbook": {"best_bid": 100.00, "best_ask": 100.10},
    "pricing": {"mid_price": 100.05},
})
print(result)  # {"executed": True, "message": "BUY @ 100.00"}

# Wide spread -> NoAction
result = builder.execute({
    "orderbook": {"best_bid": 99.50, "best_ask": 100.50},
    "pricing": {"mid_price": 100.00},
})
print(result)  # {"logged": True, "reason": "spread too wide: 1.00"}
import cloaca

@cloaca.reactor(name="src_reactor", accumulators=["src"], mode="when_any")
class SrcReactor:
    pass

# Error: @cloaca.node outside a builder context
try:
    @cloaca.node
    def orphan_node(data):
        return data
except ValueError as e:
    print(f"Context error: {e}")
# "@cloaca.node must be used inside a ComputationGraphBuilder context manager"

# Error: topology references a node with no matching function
try:
    with cloaca.ComputationGraphBuilder(
        "broken_graph",
        reactor=SrcReactor,
        graph={"missing_node": {"inputs": ["src"]}},
    ) as builder:
        pass  # no @cloaca.node defined
except AttributeError as e:
    print(f"Missing node: {e}")
# "topology references node 'missing_node' but no @cloaca.node function ..."

# Error: decorated function not in topology
try:
    with cloaca.ComputationGraphBuilder(
        "extra_graph",
        reactor=SrcReactor,
        graph={"ingest": {"inputs": ["src"]}},
    ) as builder:

        @cloaca.node
        def ingest(src):
            return src

        @cloaca.node
        def extra_function(data):
            return data
except ValueError as e:
    print(f"Extra node: {e}")
# "function 'extra_function' was decorated with @cloaca.node but does not appear ..."

# Error: executing before the with block exits
try:
    result = builder.execute({"src": {"key": "value"}})
except ValueError as e:
    print(f"Not built: {e}")
# "graph '...' not built yet — call execute after the 'with' block exits"
# Error: routing node returns wrong type
# Routing nodes must return a (str, dict) tuple.
# Returning a plain dict raises an error at execution time.
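To catch a malformed routing result in a unit test before it reaches the graph, a check like the following can be applied to a routing node's return value. It is a hypothetical helper, not part of cloaca, and the use of ValueError is an assumption chosen to mirror the error documented for failed execution.

```python
def check_routing_result(result, routes):
    """Validate a routing node's return value against its routes dict."""
    if not (isinstance(result, tuple) and len(result) == 2):
        raise ValueError(
            f"expected a (variant, payload) tuple, got {type(result).__name__}"
        )
    variant, payload = result
    if not isinstance(variant, str) or not isinstance(payload, dict):
        raise ValueError("variant must be a str and payload must be a dict")
    if variant not in routes:
        raise ValueError(f"unknown variant {variant!r}; expected one of {sorted(routes)}")
    return variant, payload


routes = {"Trade": "signal_handler", "NoAction": "audit_logger"}

print(check_routing_result(("Trade", {"price": 100.0}), routes))
# ('Trade', {'price': 100.0})

try:
    check_routing_result({"price": 100.0}, routes)  # plain dict, not a tuple
except ValueError as e:
    print(f"Bad routing result: {e}")
# Bad routing result: expected a (variant, payload) tuple, got dict
```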
- Computation Graph Tutorials – step-by-step tutorials covering linear graphs, accumulators, and routing
- WorkflowBuilder – the task-based workflow builder (different execution model)
- DefaultRunner – runs task-based workflows (not computation graphs)