Skip to content

Trading

Cross-Venue Inventory Management: Risk Aggregation Across 12 Exchanges in Real Time

Aggregating positions across 12 exchanges with different latency and consistency guarantees, without ever getting the net wrong. Reconciliation loop, split-brain handling, and the failure modes that matter.

10 min
#risk-management #inventory #multi-venue #market-making #exchange-connectivity #position-tracking

This was the hardest infrastructure problem at Akuna. Twelve exchanges, multiple strategies per exchange, dozens of instruments - and a single position management system that had to know the net exposure accurately in real-time. Not “within 5 seconds.” Real-time, under all failure conditions, including when exchanges are sending WebSocket updates with 300ms latency spikes during volatility events.

The problem sounds straightforward: track all your fills, aggregate them, maintain a net position per instrument. In practice, it’s a distributed systems problem with financial consequences if you get it wrong. A position management system that thinks you’re flat when you’re actually net long 10 BTC will let you take on more long exposure - and you’re now net long 20 BTC during a 5% drawdown.

This post is a detailed account of the architecture we built and the failure modes we had to handle.

The Position Aggregation Problem

Each exchange knows your position on that exchange. Binance knows your BTCUSDT perp position. Bybit knows your BTCUSDT perp position. OKX knows theirs. None of them know your aggregate position across venues.

Your system has to maintain that aggregate. It must be:

  • Accurate: Never report a position that differs from the true aggregate by more than your risk tolerance threshold
  • Current: Stale position data that’s 30 seconds old is not acceptable during fast markets
  • Consistent under failure: If a WebSocket disconnects, your system must fall back to REST polling and detect/reconcile any position changes during the gap
  • Correct at startup: When the system restarts, it must reconstitute the correct state from exchange REST APIs before allowing new orders

The Latency Challenge

Fill events from exchanges arrive via WebSocket with non-deterministic latency. On a normal day, Binance user data stream delivers fill events in 20-50ms from execution. But during volatile market conditions - the moments when position accuracy matters most - that latency can spike to 200-500ms.

During that 150-300ms window between execution and WebSocket delivery, your position is wrong by the size of the fill.

The risk: if your risk system checks position against limits during this window, it sees a stale position. If you’re near a risk limit and a large fill just happened, the risk check will approve new orders that push you over the limit - because the last fill hasn’t been received yet.

Two architectural philosophies:

Optimistic: Publish the new aggregate position immediately when the fill arrives. Accept that you might be briefly inconsistent (by the amount of any fills in flight at other venues). This maximizes responsiveness but requires careful risk limit buffer management.

Pessimistic: Don’t update the aggregate position until you’ve confirmed it against the exchange REST API. Adds 2-5 seconds latency to position updates but guarantees consistency. Appropriate for strategies where occasional position inaccuracy is unacceptable (e.g., if you’re managing regulatory capital constraints).

For most market-making operations, the optimistic approach with generous risk limit buffers is the practical choice. The key is to make the inconsistency window measurable and bounded.

The Architecture

Here’s the architecture we built at Akuna, simplified for illustration:

┌─────────────────────────────────────────────────────────────────┐
│                     Per-Exchange Trackers                        │
│  ┌──────────────┐  ┌──────────────┐  ┌──────────────┐          │
│  │Binance Tracker│  │ Bybit Tracker│  │  OKX Tracker │   ...    │
│  │ fills: WS     │  │ fills: WS    │  │ fills: WS    │          │
│  │ reconcile:REST│  │ reconcile:REST│  │ reconcile:REST│         │
│  └──────┬───────┘  └──────┬───────┘  └──────┬───────┘          │
│         │                 │                 │                    │
└─────────┼─────────────────┼─────────────────┼────────────────────┘
          │                 │                 │
          └─────────────────┼─────────────────┘

                  ┌─────────▼──────────┐
                  │ Aggregation Service │
                  │  net_position(sym)  │
                  │  all_positions()    │
                  │  risk_limit_check() │
                  └─────────┬──────────┘

              ┌─────────────┼─────────────┐
              │             │             │
        ┌─────▼──┐   ┌──────▼─┐   ┌──────▼──┐
        │ Risk   │   │ Order  │   │Monitor/ │
        │ Engine │   │ Router │   │Dashboard│
        └────────┘   └────────┘   └─────────┘

Per-Exchange Trackers maintain position state for a single exchange. They receive fill events from WebSocket streams and update local position state. They run periodic reconciliation against the exchange REST API.

Aggregation Service subscribes to position updates from all per-exchange trackers and maintains the net aggregate. It also enforces risk limits - when a tracker reports a fill that would push aggregate position over a risk limit, it signals to the Order Router to halt new orders.

The Per-Exchange Tracker

import asyncio
import time
from dataclasses import dataclass, field
from typing import Callable, Optional
import logging

logger = logging.getLogger(__name__)

@dataclass
class PositionSnapshot:
    """Position state at a point in time."""
    exchange: str
    instrument: str
    quantity: float  # Positive = long, negative = short
    entry_price: float
    unrealized_pnl: float
    snapshot_time_ms: int
    source: str  # "websocket" or "rest_reconcile"


class ExchangePositionTracker:
    """
    Tracks positions for a single exchange.
    Receives fills via WebSocket, reconciles against REST periodically.
    """

    def __init__(
        self,
        exchange_name: str,
        reconcile_interval_sec: float = 300.0,  # 5 minutes
        max_position_drift_threshold: float = 0.01,  # Alert if drift > 0.01 BTC
    ):
        self.exchange_name = exchange_name
        self.reconcile_interval_sec = reconcile_interval_sec
        self.max_drift_threshold = max_position_drift_threshold

        # Internal position state: instrument → quantity
        self._positions: dict[str, float] = {}
        self._last_fill_seq: dict[str, int] = {}  # sequence number tracking

        # Callbacks
        self._on_position_update: list[Callable] = []
        self._on_drift_detected: list[Callable] = []

        # Reconciliation state
        self._last_reconcile_time: float = 0.0
        self._pending_fills: list[dict] = []  # fills during reconcile gap

    def on_fill(self, fill_event: dict) -> None:
        """
        Process a fill event from the WebSocket user data stream.

        fill_event expected fields:
        - instrument: str (e.g., "BTCUSDT")
        - side: "BUY" or "SELL"
        - quantity: float
        - price: float
        - order_id: str
        - timestamp_ms: int
        """
        instrument = fill_event["instrument"]
        quantity = fill_event["quantity"]
        side = fill_event["side"]

        delta = quantity if side == "BUY" else -quantity

        prev_position = self._positions.get(instrument, 0.0)
        new_position = prev_position + delta
        self._positions[instrument] = new_position

        logger.debug(
            f"{self.exchange_name} fill: {instrument} {side} {quantity:.6f} → "
            f"position now {new_position:.6f}"
        )

        # Notify aggregation layer of position change
        snapshot = PositionSnapshot(
            exchange=self.exchange_name,
            instrument=instrument,
            quantity=new_position,
            entry_price=fill_event["price"],
            unrealized_pnl=0.0,  # Calculated by aggregation layer
            snapshot_time_ms=fill_event["timestamp_ms"],
            source="websocket",
        )
        self._emit_position_update(snapshot)

    def on_rest_position(self, positions: list[dict]) -> None:
        """
        Process position state from REST API reconciliation.
        Compare against WebSocket-derived state and alert on drift.
        """
        for pos in positions:
            instrument = pos["symbol"]
            rest_qty = float(pos["positionAmt"])
            ws_qty = self._positions.get(instrument, 0.0)

            drift = abs(rest_qty - ws_qty)
            if drift > self.max_drift_threshold:
                logger.warning(
                    f"POSITION DRIFT DETECTED: {self.exchange_name} {instrument} "
                    f"WebSocket={ws_qty:.6f} REST={rest_qty:.6f} drift={drift:.6f}"
                )
                for cb in self._on_drift_detected:
                    cb(self.exchange_name, instrument, ws_qty, rest_qty, drift)

            # Reconcile: trust REST as ground truth
            if drift > 0.0:
                self._positions[instrument] = rest_qty
                snapshot = PositionSnapshot(
                    exchange=self.exchange_name,
                    instrument=instrument,
                    quantity=rest_qty,
                    entry_price=float(pos.get("entryPrice", 0)),
                    unrealized_pnl=float(pos.get("unRealizedProfit", 0)),
                    snapshot_time_ms=int(time.time() * 1000),
                    source="rest_reconcile",
                )
                self._emit_position_update(snapshot)

        self._last_reconcile_time = time.time()

    def get_position(self, instrument: str) -> float:
        return self._positions.get(instrument, 0.0)

    def register_position_update_callback(self, cb: Callable) -> None:
        self._on_position_update.append(cb)

    def register_drift_callback(self, cb: Callable) -> None:
        self._on_drift_detected.append(cb)

    def _emit_position_update(self, snapshot: PositionSnapshot) -> None:
        for cb in self._on_position_update:
            cb(snapshot)

The Aggregation Service

import threading
from collections import defaultdict

@dataclass
class AggregatePosition:
    instrument: str
    net_quantity: float
    per_exchange: dict[str, float]  # exchange → quantity
    last_updated_ms: int

class PositionAggregationService:
    """
    Aggregates positions from all per-exchange trackers.
    Thread-safe: tracker callbacks run on exchange-specific threads.
    """

    def __init__(self, risk_limits: dict[str, float]):
        """
        risk_limits: instrument → max absolute net position
        e.g., {"BTCUSDT": 10.0} means max 10 BTC net long or short
        """
        self._risk_limits = risk_limits
        self._lock = threading.RLock()

        # per-exchange positions: exchange → instrument → quantity
        self._per_exchange: dict[str, dict[str, float]] = defaultdict(dict)

        # Derived aggregates (computed from _per_exchange)
        self._aggregate: dict[str, float] = {}  # instrument → net qty

        # Risk breach callbacks
        self._risk_breach_callbacks: list[Callable] = []

    def on_position_snapshot(self, snapshot: PositionSnapshot) -> None:
        """Callback registered with each ExchangePositionTracker."""
        with self._lock:
            old_qty = self._per_exchange[snapshot.exchange].get(snapshot.instrument, 0.0)

            if old_qty == snapshot.quantity:
                return  # No change, skip

            # Update per-exchange state
            self._per_exchange[snapshot.exchange][snapshot.instrument] = snapshot.quantity

            # Recompute aggregate for this instrument
            new_aggregate = sum(
                ex_positions.get(snapshot.instrument, 0.0)
                for ex_positions in self._per_exchange.values()
            )

            old_aggregate = self._aggregate.get(snapshot.instrument, 0.0)
            self._aggregate[snapshot.instrument] = new_aggregate

            # Risk limit check
            limit = self._risk_limits.get(snapshot.instrument)
            if limit and abs(new_aggregate) > limit:
                self._handle_risk_breach(snapshot.instrument, new_aggregate, limit)

            logger.info(
                f"Aggregate position update: {snapshot.instrument} "
                f"{old_aggregate:+.4f}{new_aggregate:+.4f} "
                f"(via {snapshot.exchange}, source={snapshot.source})"
            )

    def _handle_risk_breach(self, instrument: str, net_qty: float, limit: float) -> None:
        """Called when net position exceeds risk limit."""
        logger.critical(
            f"RISK LIMIT BREACH: {instrument} net={net_qty:.4f} limit={limit:.4f}"
        )
        for cb in self._risk_breach_callbacks:
            cb(instrument, net_qty, limit)

    def net_position(self, instrument: str) -> float:
        with self._lock:
            return self._aggregate.get(instrument, 0.0)

    def all_positions(self) -> dict[str, float]:
        with self._lock:
            return dict(self._aggregate)

    def position_by_exchange(self, instrument: str) -> dict[str, float]:
        with self._lock:
            return {
                exchange: positions.get(instrument, 0.0)
                for exchange, positions in self._per_exchange.items()
                if positions.get(instrument, 0.0) != 0.0
            }

Reconciliation: The Safety Net

Reconciliation runs every 5 minutes (configurable) and compares the WebSocket-derived position state against the exchange REST API response. If drift is detected beyond the threshold, the REST position is used as ground truth.

When does drift happen?

  1. WebSocket reconnections: During the disconnect-reconnect window, fills may arrive on the exchange but not reach your tracker. REST reconciliation catches these.
  2. Exchange bugs: Rare but real - exchanges occasionally send malformed or duplicate WebSocket events. A drift of exactly 2× the fill size suggests a duplicate event was processed.
  3. ADL/Liquidation events: These arrive as special event types and may be handled by different code paths. If your ADL handler has a bug, it might miss position updates that REST reconciliation catches.
  4. System restarts: After a restart, the tracker should bootstrap from REST before opening WebSocket subscriptions - never from internal state that may be stale.

The reconciliation frequency trade-off: more frequent = less exposure to undetected drift. Less frequent = less REST API weight consumed and fewer Binance rate limit risks. 5 minutes is a reasonable default for most strategies.

The Startup Sequence

Correct startup order is critical. A common mistake is connecting WebSocket streams before loading the initial position state from REST - this creates a race condition where fills arrive before you know your starting position.

1. Load initial positions from REST (GET /fapi/v2/positionRisk)
2. Record the timestamp of this REST snapshot
3. Connect WebSocket user data stream
4. Replay any fill events from after the REST snapshot timestamp
5. Begin normal operation

In practice, step 4 (replaying fills) is hard because WebSocket streams don’t provide historical replay. The practical solution: after loading REST state and connecting WebSocket, discard any WebSocket fill events with timestamps before the REST snapshot. Accept a brief inconsistency window (typically <1 second) between REST snapshot and WebSocket connection as unavoidable.

For higher-stakes operations: load REST state, wait 2 seconds with WebSocket connected (collecting any fills that arrived during REST loading), then cross-reference the WebSocket events against REST state to detect any fills that occurred during the loading gap.

The Mistakes Most Engineers Make

Mistake 1: Single position state, no per-exchange breakdown. If you only track the aggregate without the per-exchange breakdown, you can’t diagnose drift - you can’t tell if a position discrepancy is on Binance or Bybit. Always store per-exchange AND aggregate state.

Mistake 2: Trusting WebSocket as ground truth. WebSocket is the fast path, not the authoritative path. REST reconciliation is how you verify. Every position management system needs periodic REST reconciliation to catch the cases where WebSocket failed silently.

Mistake 3: No atomic initialization. If your system starts accepting orders before it has loaded initial position state from REST, you start from a false zero position and immediately have wrong aggregate state. The initialization sequence must complete before any order can be sent.

Mistake 4: Race conditions in the aggregation service. Fill events from Binance and Bybit arrive on separate threads. Without locks, the aggregate position state can be corrupted by concurrent updates. Use a proper lock (threading.RLock or asyncio-native approaches) on all writes to shared position state.

Mistake 5: Not alerting on drift. Position drift is a symptom of something wrong - a WebSocket gap, a bug in your fill processing, an ADL event your system missed. If you reconcile silently and correct without alerting, you’ll never know the source of the drift. Alert on every drift event above your threshold and investigate.

The cross-venue inventory management system is the foundation of everything else in your trading infrastructure. If your position state is wrong, your risk limits are wrong, your hedge decisions are wrong, and your PnL is wrong. Build it right before building anything else on top of it.

Continue Reading

Enjoyed this?

Get one deep infrastructure insight per week.

Free forever. Unsubscribe anytime.

You're in. Check your inbox.