10 - Accumulators
In this tutorial you’ll add an accumulator to your computation graph. Accumulators sit between raw data sources and the graph: they receive events, transform them, and emit the processed values that the graph’s entry nodes consume.
@cloaca.passthrough_accumulator— defining a simple event transformer- How to call the accumulator manually to simulate event processing
- Wiring accumulator output into
builder.execute() - The separation of concerns between raw events and graph inputs
- Completion of Tutorial 09 — Your First Computation Graph
The full source lives at examples/tutorials/python/computation-graphs/10_accumulators.py.
To run it:
python examples/tutorials/python/computation-graphs/10_accumulators.py
A passthrough accumulator transforms one dict shape into another. Decorate a function with @cloaca.passthrough_accumulator and give it a name that matches the source name in your graph topology.
import cloaca
@cloaca.passthrough_accumulator
def pricing(event):
"""Transform a raw pricing event into a pricing signal.
Input event shape: {"mid_price": float}
Output shape: {"price": float, "change_pct": float}
"""
return {"price": event["mid_price"], "change_pct": 0.0}
The function name (pricing) becomes the source name. This must match the key you use in the graph’s react accumulator list and in builder.execute().
The body receives a raw event dict and returns the processed dict that the graph will see. Returning None would suppress the event — useful for filtering duplicates or invalid data.
The graph topology is identical to Tutorial 09 — only the source name changes.
# Declare the reactor that fires the graph (CLOACI-I-0101 split — the
# bundled `react={...}` kwarg was removed in favour of first-class
# `@cloaca.reactor` classes).
@cloaca.reactor(
name="pricing_graph_reactor",
accumulators=["pricing"],
mode="when_any",
)
class PricingGraphReactor:
pass
with cloaca.ComputationGraphBuilder(
"pricing_graph",
reactor=PricingGraphReactor,
graph={
"ingest": {
"inputs": ["pricing"],
"next": "analyze",
},
"analyze": {
"next": "format_signal",
},
"format_signal": {},
},
) as builder:
@cloaca.node
def ingest(pricing):
"""Entry node: receive pricing data from accumulator."""
if pricing is None:
return {"price": 0.0, "change_pct": 0.0}
return pricing # accumulator already shaped the data
@cloaca.node
def analyze(input_data):
"""Analyze pricing for large moves."""
price = input_data["price"]
change_pct = ((price - 100.0) / 100.0) * 100.0 if price > 100.0 else 0.0
return {"price": price, "change_pct": change_pct}
@cloaca.node
def format_signal(input_data):
"""Terminal node: format the signal."""
return {
"message": f"Price: {input_data['price']:.2f}, Change: {input_data['change_pct']:.2f}%",
}
Notice that ingest simply passes its input through — the accumulator already did the heavy lifting of shaping mid_price into the {price, change_pct} structure. This separation keeps nodes focused: accumulators transform raw external data, nodes process structured graph data.
In a live deployment the accumulator runs as part of the computation graph runtime. For this tutorial you call it directly to simulate the pipeline.
events = [
{"mid_price": 99.50},
{"mid_price": 101.25},
{"mid_price": 103.75},
]
for i, event in enumerate(events, 1):
print(f"Event {i}: {event}")
# Step 1: accumulator transforms the raw event
processed = pricing(event)
print(f" Accumulator output: {processed}")
# Step 2: graph processes the accumulator's output
result = builder.execute({"pricing": processed})
print(f" Graph result: {result.get('message', 'N/A')}\n")
Calling pricing(event) invokes your accumulator function and returns the transformed dict. You then pass that dict to builder.execute() under the same source name ("pricing").
In a reactive deployment the runtime handles this automatically — the accumulator feeds the boundary channel, the reactor calls execute() for you. But calling them manually here makes the data flow explicit.
=== Python Tutorial 10: Accumulators ===
Event 1: {'mid_price': 99.5}
Accumulator output: {'price': 99.5, 'change_pct': 0.0}
Graph result: Price: 99.50, Change: 0.00%
Event 2: {'mid_price': 101.25}
Accumulator output: {'price': 101.25, 'change_pct': 0.0}
Graph result: Price: 101.25, Change: 1.25%
Event 3: {'mid_price': 103.75}
Accumulator output: {'price': 103.75, 'change_pct': 0.0}
Graph result: Price: 103.75, Change: 3.75%
=== Tutorial 10 Complete ===
Event 1 produces Change: 0.00% because the price is below 100. Events 2 and 3 compute the percentage above baseline.
Raw event Accumulator Graph entry node
{"mid_price": 99.5} → pricing(event) → ingest(pricing)
↓ ↓
{"price": 99.5, passed as-is
"change_pct": 0.0} to analyze
The accumulator is responsible for:
- Shape translation — converting the external event format to what the graph expects
- Filtering — returning
Noneto suppress unwanted events - Stateful accumulation — maintaining state between events (e.g. computing running averages) — the decorator keeps the function’s local state alive across calls
| Concept | Rust | Python |
|---|---|---|
| Define accumulator | impl Accumulator for MyAcc |
@cloaca.passthrough_accumulator |
| Transform event | fn process(&mut self, event) -> Option<Output> |
function body, return None to suppress |
| Source name | BoundarySender::new(tx, SourceName::new("pricing")) |
function name (pricing) |
| Invoke manually | serialize(event) → socket channel |
pricing(event) |
You’ve added an accumulator to your pipeline:
@cloaca.passthrough_accumulatorwraps a transformation function as a named accumulator- The function name is the source name — it must match in
react,graph, andexecute() - Returning
Nonesuppresses an event; returning a dict passes it to the graph - Calling the accumulator directly and feeding its output to
execute()makes the pipeline explicit during development
- Tutorial 11 — Routing: add conditional branching with tuple-based enum dispatch