WebSocket at HFT Scale: Connection Multiplexing, Sequence Resync, and Backpressure Patterns

How to manage WebSocket connections to 12 concurrent exchanges without data loss, sequence gaps, or backpressure failures. Lessons from Akuna Capital's multi-venue trading infrastructure.

14 min
#websocket #hft #exchange-connectivity #python #asyncio #market-data

At Akuna Capital, we maintained live WebSocket connections to 12 exchanges simultaneously. Each exchange sent market data at different rates, used different sequence numbering schemes, had different reconnection protocols, and failed in different ways. The infrastructure that managed this was one of the most operationally demanding systems I’ve worked on - not because any individual exchange was complicated, but because you couldn’t afford a failure on any of them at any time.

The naive implementation - open a connection, read messages in a loop, reconnect on error - works fine in a demo. In production, it burns you in four distinct ways: you reconnect but aren’t resynchronized, you miss sequence numbers without knowing it, your fast exchange backs up behind your slow strategy, and your Python event loop stalls during a burst. This post covers all four.

WS Framing Overhead vs Raw TCP

Before getting into the reconnection machinery, it’s worth understanding why most institutional HFT uses raw TCP or UDP for internal market data and reserves WebSocket for exchange-facing connections.

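A WebSocket frame header adds 2-14 bytes per message: 2 bytes minimum, plus 2 or 8 bytes of extended length for payloads of 126 bytes or more, plus a 4-byte masking key on client-to-server frames: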

 0                   1                   2                   3
 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-------+-+-------------+-------------------------------+
|F|R|R|R| opcode|M| Payload len |    Extended payload length    |
|I|S|S|S|  (4)  |A|     (7)     |             (16/64)           |
|N|V|V|V|       |S|             |   (if payload len==126/127)   |
| |1|2|3|       |K|             |                               |
+-+-+-+-+-------+-+-------------+-------------------------------+

For a 48-byte order book update, the 2-byte minimum header adds ~4% overhead. More importantly, an exchange WebSocket runs over TCP with TLS and opens with an HTTP Upgrade request, adding roughly 1-3 RTTs on top of the TCP handshake before the first message arrives. For exchange connections, you’re paying this handshake cost once per session and then amortizing it over thousands of messages - acceptable. For your internal market data distribution where you’re rebuilding connections every time a process restarts, it adds up. Use raw TCP internally.
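To make the overhead figures concrete, here is a small sketch that computes the header size for a given payload length following the frame layout in RFC 6455. The function names are illustrative, not part of any library.

```python
def ws_header_len(payload_len: int, masked: bool) -> int:
    """Header size in bytes for one WebSocket frame (RFC 6455 layout)."""
    if payload_len < 0:
        raise ValueError("payload_len must be non-negative")
    header = 2  # FIN/RSV/opcode byte + MASK bit and 7-bit length byte
    if payload_len > 0xFFFF:
        header += 8  # 64-bit extended payload length
    elif payload_len >= 126:
        header += 2  # 16-bit extended payload length
    if masked:
        header += 4  # masking key, present on client-to-server frames
    return header


def overhead_pct(payload_len: int, masked: bool) -> float:
    """Header bytes as a percentage of payload bytes (payload_len > 0)."""
    return 100.0 * ws_header_len(payload_len, masked) / payload_len


for size in (48, 126, 70_000):
    unmasked = ws_header_len(size, masked=False)
    masked = ws_header_len(size, masked=True)
    print(f"payload={size} unmasked_header={unmasked} masked_header={masked}")
```

Market data flows server-to-client, so the unmasked figure is the one that matters for feed bandwidth; the masked figure applies to the subscribe and ping messages you send.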

For exchange connections, WebSocket is effectively mandatory. Every major crypto exchange (Binance, OKX, Bybit, Deribit, Kraken) uses WebSocket for streaming market data. The question isn’t whether to use it, but how to operate it correctly.

The “Reconnected vs Resynchronized” Distinction

This is the most important concept in this entire post. I have seen this mistake in production code at multiple firms.

Reconnected means your WebSocket connection is open and receiving messages.

Resynchronized means you have a consistent, gap-free view of the exchange’s state.

These are not the same thing. When you reconnect to Binance’s depth stream, for example, the stream resumes from the current state - it does not replay the messages you missed during the disconnect. If the order book moved while you were disconnected, you now have a stale book that appears fresh.

The correct reconnection flow for an order book stream:

1. WebSocket disconnects (or gap detected); reconnect if the socket is down
2. Buffer all incoming diff messages (do not apply them yet)
3. Request a REST snapshot: GET /api/v3/depth?symbol=BTCUSDT&limit=5000
4. Read the snapshot's lastUpdateId (call it S)
5. Drop all buffered diffs whose final update ID is <= S
6. Verify: first remaining diff must satisfy first update ID <= S+1 <= final update ID
7. Apply remaining buffered diffs and continue
8. Mark state as SYNCHRONIZED

If you skip steps 2-7 and just start applying diffs from the reconnected stream, you will apply diffs against the wrong base state. Your order book will be wrong in a way that’s hard to detect - bids and asks will be plausible but slightly off from reality. Your strategy will trade on stale data.
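The drop-and-verify part of that flow can be isolated as a pure function, which makes it easy to unit test separately from the networking. This is a minimal sketch assuming Binance spot-style diffs carrying U (first update ID) and u (final update ID); the names diffs_to_apply and SnapshotGap are hypothetical.

```python
from typing import Iterable


class SnapshotGap(Exception):
    """Raised when the buffered diffs do not connect to the snapshot."""


def diffs_to_apply(last_update_id: int, buffered: Iterable[dict]) -> list:
    """Drop diffs the snapshot already covers, then verify the first survivor.

    Assumes each diff has 'U' (first update ID) and 'u' (final update ID).
    """
    # A diff whose final ID is at or before the snapshot is already
    # reflected in the snapshot, so it is discarded.
    remaining = [d for d in buffered if d["u"] > last_update_id]
    if not remaining:
        return []  # nothing to replay; the snapshot alone is current

    # The first surviving diff must straddle last_update_id + 1, otherwise
    # updates between the snapshot and the buffer were lost.
    first = remaining[0]
    if not (first["U"] <= last_update_id + 1 <= first["u"]):
        raise SnapshotGap(
            f"snapshot={last_update_id}, first diff=[{first['U']},{first['u']}]"
        )
    return remaining


buffered = [{"U": 90, "u": 95}, {"U": 96, "u": 101}, {"U": 102, "u": 105}]
replay = diffs_to_apply(100, buffered)
print([(d["U"], d["u"]) for d in replay])
```

A SnapshotGap here means the snapshot is older than the oldest buffered diff, so the right response is to fetch another snapshot while continuing to buffer.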

The Reconnection State Machine

Here is the state machine I use for all exchange WebSocket connections. It makes the “reconnected vs resynchronized” distinction explicit in code:

import asyncio
import aiohttp
import enum
import logging
from dataclasses import dataclass, field
from typing import Callable, Awaitable, Any
from collections import deque

logger = logging.getLogger(__name__)


class WSState(enum.Enum):
    DISCONNECTED = "disconnected"
    CONNECTING = "connecting"
    CONNECTED = "connected"          # TCP open, messages flowing
    SYNCHRONIZING = "synchronizing"  # Fetching snapshot
    SYNCHRONIZED = "synchronized"    # State consistent, safe to trade


@dataclass
class WSConnectionConfig:
    exchange: str
    ws_url: str
    snapshot_url: str
    symbol: str
    max_reconnect_delay: float = 30.0
    ping_interval: float = 20.0
    sequence_gap_tolerance: int = 0  # 0 = no gaps tolerated


class ExchangeWSManager:
    """
    Manages a single WebSocket stream to a crypto exchange.
    Handles reconnection, resynchronization, and sequence gap detection.
    """

    def __init__(
        self,
        config: WSConnectionConfig,
        on_message: Callable[[dict], Awaitable[None]],
        on_state_change: Callable[[WSState], Awaitable[None]],
    ):
        self.config = config
        self.on_message = on_message
        self.on_state_change = on_state_change

        self.state = WSState.DISCONNECTED
        self._diff_buffer: deque[dict] = deque(maxlen=10000)
        self._snapshot_id: int | None = None
        self._last_seq: int | None = None
        self._reconnect_delay: float = 1.0
        self._session: aiohttp.ClientSession | None = None

    async def run(self) -> None:
        """Main loop. Runs until cancelled."""
        async with aiohttp.ClientSession() as session:
            self._session = session
            while True:
                try:
                    await self._connect_and_run()
                    # Clean close: reset the backoff and mark the book as not live
                    self._reconnect_delay = 1.0
                    await self._transition(WSState.DISCONNECTED)
                except asyncio.CancelledError:
                    raise
                except Exception as e:
                    logger.error(
                        "WS error on %s/%s: %s - reconnecting in %.1fs",
                        self.config.exchange,
                        self.config.symbol,
                        e,
                        self._reconnect_delay,
                    )
                    await self._transition(WSState.DISCONNECTED)
                    await asyncio.sleep(self._reconnect_delay)
                    self._reconnect_delay = min(
                        self._reconnect_delay * 2,
                        self.config.max_reconnect_delay,
                    )

    async def _connect_and_run(self) -> None:
        await self._transition(WSState.CONNECTING)

        async with self._session.ws_connect(
            self.config.ws_url,
            heartbeat=self.config.ping_interval,
            receive_timeout=self.config.ping_interval * 3,
        ) as ws:
            await self._transition(WSState.CONNECTED)

            # Start buffering immediately - don't apply yet
            self._diff_buffer.clear()
            self._last_seq = None

            # Kick off resync in parallel with buffering
            sync_task = asyncio.create_task(self._synchronize())

            try:
                async for msg in ws:
                    if msg.type == aiohttp.WSMsgType.TEXT:
                        data = msg.json()
                        await self._handle_message(data)
                    elif msg.type == aiohttp.WSMsgType.ERROR:
                        raise ConnectionError(f"WS error frame: {ws.exception()}")
                    # CLOSE/CLOSING/CLOSED frames end the iteration on their own
            finally:
                # Also runs on errors: never let a resync complete and mark
                # the book SYNCHRONIZED against a dead connection
                sync_task.cancel()

    async def _synchronize(self) -> None:
        """
        Fetch REST snapshot and establish consistent state.
        This runs concurrently with message buffering.
        """
        await self._transition(WSState.SYNCHRONIZING)

        try:
            async with self._session.get(self.config.snapshot_url) as resp:
                snapshot = await resp.json()

            self._snapshot_id = snapshot['lastUpdateId']

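            # Drain the buffer: discard diffs whose final update ID ('u') is
            # at or before the snapshot - the snapshot already includes them.
            # Comparing on 'U' here would also drop the diff that straddles
            # the snapshot and make validation fail.
            while self._diff_buffer:
                diff = self._diff_buffer[0]
                if diff.get('u', 0) <= self._snapshot_id:
                    self._diff_buffer.popleft()
                else:
                    break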

            # Validate the first remaining diff
            if self._diff_buffer:
                first_diff = self._diff_buffer[0]
                first_u = first_diff.get('U', first_diff.get('pu', 0))
                last_u = first_diff.get('u', 0)
                if not (first_u <= self._snapshot_id + 1 <= last_u):
                    logger.warning(
                        "Snapshot gap on %s: snapshot=%d, first_diff=[%d,%d] - re-snapshotting",
                        self.config.symbol,
                        self._snapshot_id,
                        first_u,
                        last_u,
                    )
                    # Retry from the beginning
                    await self._synchronize()
                    return

            # Apply snapshot to local state (omitted: actual book management)
            await self._apply_snapshot(snapshot)

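            # Apply buffered diffs, tracking the last applied update ID so the
            # continuity check resumes from the right place
            self._last_seq = self._snapshot_id
            while self._diff_buffer:
                diff = self._diff_buffer.popleft()
                await self._apply_diff(diff)
                self._last_seq = diff.get('u', self._last_seq)

            await self._transition(WSState.SYNCHRONIZED)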

        except Exception as e:
            logger.error("Sync failed for %s: %s", self.config.symbol, e)
            raise

    async def _handle_message(self, data: dict) -> None:
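        if self.state == WSState.SYNCHRONIZED:
            # Check sequence continuity: 'U' is the first update ID in this
            # diff, 'u' is the last. Messages without both are passed through.
            last_id = data.get('u')
            first_id = data.get('U')
            if (
                first_id is not None
                and last_id is not None
                and self._last_seq is not None
            ):
                expected = self._last_seq + 1
                if first_id > expected:
                    # GAP DETECTED
                    logger.error(
                        "Sequence gap on %s: expected %d, got %d - resyncing",
                        self.config.symbol,
                        expected,
                        first_id,
                    )
                    await self._transition(WSState.CONNECTED)
                    self._diff_buffer.clear()
                    # Keep this diff: it may be needed after the new snapshot
                    self._diff_buffer.append(data)
                    # Hold a reference so the task is not garbage collected
                    self._resync_task = asyncio.create_task(self._synchronize())
                    return
                self._last_seq = last_id

            await self.on_message(data)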

        elif self.state in (WSState.CONNECTED, WSState.SYNCHRONIZING):
            # Buffer for resync
            self._diff_buffer.append(data)

    async def _apply_snapshot(self, snapshot: dict) -> None:
        # Actual order book update - implement per exchange schema
        pass

    async def _apply_diff(self, diff: dict) -> None:
        # Actual order book update - implement per exchange schema
        pass

    async def _transition(self, new_state: WSState) -> None:
        if self.state != new_state:
            self.state = new_state
            await self.on_state_change(new_state)
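The reconnect loop in run() doubles the delay up to max_reconnect_delay. When many clients are disconnected at the same moment, identical delays mean they all retry in lockstep. A common variation is to randomize each delay. The sketch below uses "full jitter" (a uniform draw between zero and the current ceiling); this is a swapped-in technique, not what the class above does, and backoff_delays is a hypothetical helper name.

```python
import random
from itertools import islice
from typing import Iterator, Optional


def backoff_delays(
    base: float = 1.0,
    cap: float = 30.0,
    rng: Optional[random.Random] = None,
) -> Iterator[float]:
    """Yield reconnect delays: uniform in [0, ceiling], ceiling doubling to cap."""
    rng = rng or random.Random()
    ceiling = base
    while True:
        yield rng.uniform(0.0, ceiling)
        ceiling = min(ceiling * 2, cap)


# Seeded only so the example is repeatable; production code would not seed.
delays = list(islice(backoff_delays(rng=random.Random(7)), 8))
print([round(d, 2) for d in delays])
```

One way to use it is to create a fresh generator after each successful synchronization and call next() on it in the exception handler instead of reading self._reconnect_delay.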

Sequence Number Tracking Per Exchange

Every major exchange has a different sequence numbering scheme. You need to understand each one precisely:

Binance USDT-M Futures depth stream:

  • Each diff message has U (first update ID), u (last update ID), and pu (the u of the previous message)
  • Update IDs are monotonically increasing integers
  • Gap detection: message.pu == last_message.u (no gaps in continuous stream)
  • The REST snapshot returns lastUpdateId; first valid diff satisfies U <= lastUpdateId <= u

Binance Spot depth stream:

  • Same U and u field names as futures but different semantics - there is no pu field, so the continuity check is message.U == last_message.u + 1
  • First valid diff after a snapshot satisfies U <= lastUpdateId + 1 <= u
  • Snapshots return lastUpdateId with no server-side gap detection

OKX:

  • Uses seqId and prevSeqId fields
  • prevSeqId of message N must equal seqId of message N-1
  • Missing prevSeqId (first message after subscribe) is expected - don’t treat as gap
  • Reset (explicit re-snapshot) triggered when action: "snapshot" arrives vs normal action: "update"

Bybit:

  • Linear perps use seq field in the topic data
  • Spot uses different field names from linear - do not reuse parsers between product types
  • Sends a full snapshot on subscription, then incremental updates
  • No prevSeqId equivalent - you must track seq N-1 and compare to seq N

Abstracting across these requires a per-exchange sequence validator:

from abc import ABC, abstractmethod
from typing import Optional


class SequenceValidator(ABC):
    @abstractmethod
    def validate(self, message: dict) -> Optional[str]:
        """Returns error string if gap detected, None if OK."""
        pass

    @abstractmethod
    def update(self, message: dict) -> None:
        """Update internal state after a valid message."""
        pass


class BinanceSequenceValidator(SequenceValidator):
    def __init__(self):
        self._last_u: Optional[int] = None

    def validate(self, message: dict) -> Optional[str]:
        U = message.get('U')
        u = message.get('u')
        if U is None or u is None:
            return None  # Non-depth message, skip validation

        if self._last_u is not None:
            if U != self._last_u + 1:
                return f"Gap: expected U={self._last_u + 1}, got U={U}"
        return None

    def update(self, message: dict) -> None:
        u = message.get('u')
        if u is not None:
            self._last_u = u


class OKXSequenceValidator(SequenceValidator):
    def __init__(self):
        self._last_seq_id: Optional[int] = None

    def validate(self, message: dict) -> Optional[str]:
        # 'or' also covers an empty data list, which would raise IndexError
        data = (message.get('data') or [{}])[0]
        seq_id = data.get('seqId')
        prev_seq_id = data.get('prevSeqId')

        if seq_id is None:
            return None

        if self._last_seq_id is not None and prev_seq_id is not None:
            if prev_seq_id != self._last_seq_id:
                return f"Gap: expected prevSeqId={self._last_seq_id}, got {prev_seq_id}"
        return None

    def update(self, message: dict) -> None:
        # 'or' also covers an empty data list, which would raise IndexError
        data = (message.get('data') or [{}])[0]
        seq_id = data.get('seqId')
        if seq_id is not None:
            self._last_seq_id = seq_id
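The same validate/update shape extends to exchanges without a prevSeqId-style field. Below is a hedged sketch for Bybit, written standalone so it runs on its own. It assumes each order book message has a top-level type of "snapshot" or "delta" and a numeric seq inside data; both the layout and the class name are assumptions to confirm against the current Bybit documentation. It only checks that seq moves forward, since whether seq is strictly consecutive is also something to verify per product.

```python
from typing import Optional


class BybitSequenceValidator:
    """Hypothetical validator: checks that 'seq' never goes backwards."""

    def __init__(self) -> None:
        self._last_seq: Optional[int] = None

    def validate(self, message: dict) -> Optional[str]:
        seq = (message.get("data") or {}).get("seq")
        if seq is None:
            return None  # not an order book message, skip validation
        if message.get("type") == "snapshot":
            return None  # a snapshot resets state, nothing to compare against
        if self._last_seq is None:
            return "Delta received before any snapshot"
        if seq <= self._last_seq:
            return f"Out of order: last seq={self._last_seq}, got seq={seq}"
        return None

    def update(self, message: dict) -> None:
        seq = (message.get("data") or {}).get("seq")
        if seq is not None:
            self._last_seq = seq


def feed(validator, messages) -> Optional[str]:
    """Validate first, update only on success; return the first error or None."""
    for msg in messages:
        error = validator.validate(msg)
        if error is not None:
            return error
        validator.update(msg)
    return None


validator = BybitSequenceValidator()
stream = [
    {"type": "snapshot", "data": {"seq": 100}},
    {"type": "delta", "data": {"seq": 104}},
    {"type": "delta", "data": {"seq": 109}},
]
print(feed(validator, stream))
```

The feed helper shows the intended call order for any validator with this interface: call update only after validate returns None, so a rejected message never advances the tracked sequence.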

Client-Side Ring Buffer for Burst Absorption

Crypto exchanges send market data in bursts. During high-volatility events, a single Binance WebSocket can deliver 5,000+ messages per second. If your strategy processing takes 200µs per message, 5,000 messages consume a full second of CPU time - the event loop is saturated with zero headroom, and any burst above that rate makes it fall behind.
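The arithmetic is worth writing down, because it also tells you how long a buffer of a given size survives a sustained burst. In this sketch the 8,000 messages per second burst rate and the 50,000-slot capacity are illustrative numbers, and the function names are hypothetical.

```python
def utilization(arrival_rate_hz: float, service_time_s: float) -> float:
    """Fraction of one core consumed; 1.0 means fully saturated."""
    return arrival_rate_hz * service_time_s


def seconds_until_full(
    arrival_rate_hz: float, service_time_s: float, capacity: int
) -> float:
    """Time for an initially empty buffer to fill under a sustained rate."""
    service_rate_hz = 1.0 / service_time_s
    surplus_hz = arrival_rate_hz - service_rate_hz
    if surplus_hz <= 0:
        return float("inf")  # the consumer keeps up, the buffer never fills
    return capacity / surplus_hz


print(f"utilization at 5,000 msg/s: {utilization(5_000, 200e-6):.2f}")
print(f"utilization at 8,000 msg/s: {utilization(8_000, 200e-6):.2f}")
print(f"buffer lasts: {seconds_until_full(8_000, 200e-6, 50_000):.1f}s")
```

Under these assumptions a burst that exceeds the service rate by 3,000 messages per second fills 50,000 slots in under 17 seconds, which is the budget the buffer buys you.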

The fix is a ring buffer between the I/O layer and the strategy layer:

import asyncio
from collections import deque
import time


class MarketDataRingBuffer:
    """
    Single-producer (WebSocket reader), single-consumer (strategy) ring buffer.
    Tracks overruns for monitoring.
    """

    def __init__(self, maxlen: int = 50_000):
        self._buffer: deque = deque(maxlen=maxlen)
        self._maxlen = maxlen
        self._overrun_count = 0
        self._total_received = 0
        self._total_consumed = 0

    def push(self, message: dict) -> None:
        """Called from WebSocket reader coroutine."""
        self._total_received += 1
        if len(self._buffer) >= self._maxlen:
            # Ring buffer full - oldest message will be dropped (deque with maxlen does this)
            self._overrun_count += 1
        self._buffer.append((time.perf_counter_ns(), message))

    def pop_all(self) -> list[tuple[int, dict]]:
        """Called from strategy coroutine. Drains entire buffer."""
        items = list(self._buffer)
        self._buffer.clear()
        self._total_consumed += len(items)
        return items

    @property
    def overrun_rate(self) -> float:
        if self._total_received == 0:
            return 0.0
        return self._overrun_count / self._total_received

    @property
    def depth(self) -> int:
        return len(self._buffer)

The strategy consumer processes the buffer in batches:

async def strategy_loop(buffer: MarketDataRingBuffer) -> None:
    while True:
        messages = buffer.pop_all()

        if not messages:
            await asyncio.sleep(0.001)  # 1ms yield when idle
            continue

        # Process in time order
        for timestamp_ns, msg in messages:
            age_us = (time.perf_counter_ns() - timestamp_ns) / 1000
            if age_us > 5000:  # > 5ms old - we're falling behind
                # Log the backlog, don't skip - but alert ops
                pass

            await process_message(msg)

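        # Alert if overrun rate is climbing
        if buffer.overrun_rate > 0.001:  # >0.1% overruns
            logger.warning("Ring buffer overrun rate: %.4f%%", buffer.overrun_rate * 100)

        # Yield once per batch so the WebSocket reader still gets scheduled
        # even if process_message never actually suspends
        await asyncio.sleep(0)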

Backpressure When Strategy Is Slower Than Feed

The ring buffer handles transient bursts. It does not solve the case where your strategy is structurally slower than the data feed - where you’re consistently receiving faster than you can process.

In this case, you have three options:

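1. Drop old data, keep new. The ring buffer’s maxlen-based eviction already does this. The oldest messages are dropped. For self-contained updates such as top-of-book quotes or full snapshots, this is usually correct - a 500ms-old update is less relevant than the current state. For incremental diff streams, a dropped message is a sequence gap, so an overrun has to trigger the resync flow rather than pass silently.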

2. Throttle the source. For exchange WebSocket subscriptions where you control the subscription, subscribe to fewer symbols or lower-resolution streams (e.g., Binance’s 100ms depth stream instead of 0ms).

3. Scale the consumer. Run the strategy in a separate process with shared memory, or split symbols across multiple asyncio event loops pinned to separate CPU cores. See CPU Pinning with isolcpus and nohz for how to do this correctly.

Backpressure shows up in production as: increasing buffer depth, rising overrun rate, and P99 message age exceeding your budget. Set up metrics on all three.
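A minimal sketch of tracking those three signals in one place follows. BackpressureStats is a hypothetical helper, the P99 uses a simple nearest-rank calculation over a rolling window, and in practice you would export these values to whatever metrics system you already run.

```python
from collections import deque


class BackpressureStats:
    """Rolling view of buffer depth, overrun rate, and P99 message age."""

    def __init__(self, window: int = 10_000) -> None:
        self._ages_us = deque(maxlen=window)  # most recent message ages
        self.received = 0
        self.overruns = 0
        self.max_depth = 0

    def on_push(self, depth: int, overran: bool) -> None:
        """Call from the reader after each push into the ring buffer."""
        self.received += 1
        self.overruns += int(overran)
        self.max_depth = max(self.max_depth, depth)

    def on_process(self, age_us: float) -> None:
        """Call from the consumer with the age of each processed message."""
        self._ages_us.append(age_us)

    @property
    def overrun_rate(self) -> float:
        return self.overruns / self.received if self.received else 0.0

    def p99_age_us(self) -> float:
        if not self._ages_us:
            return 0.0
        ordered = sorted(self._ages_us)
        rank = (99 * len(ordered) + 99) // 100  # ceil(0.99 * n), nearest rank
        return ordered[rank - 1]


stats = BackpressureStats()
for depth, overran in [(10, False), (250, False), (40, True), (5, False)]:
    stats.on_push(depth, overran)
for age in range(1, 101):
    stats.on_process(float(age))
print(stats.max_depth, stats.overrun_rate, stats.p99_age_us())
```

Sorting the window on every read is fine for a periodic metrics scrape; it would be too slow to call per message on the hot path.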

How This Breaks in Production

1. Reconnection completes but book is stale

Symptom: After a brief disconnect, your strategy behaves strangely - placing orders at prices that are clearly wrong. The WebSocket shows connected, sequence numbers look fine.

Root cause: You reconnected and started applying diff messages, but never fetched a fresh REST snapshot. Your local book state is from before the disconnect and diffs are being applied against the wrong base.

Fix: Implement the snapshot-first resync flow. Never mark state as SYNCHRONIZED until you’ve fetched a fresh snapshot and validated the first diff against it.

2. Buffer overflow during exchange reconnect storm

Symptom: Ring buffer overrun rate spikes to 10-20% during an exchange API incident when all clients reconnect simultaneously. The exchange sends a massive snapshot plus all queued diffs when you reconnect, and the burst overwhelms the ring buffer.

Root cause: Exchange restart causes every client to reconnect simultaneously. Each client receives the full order book snapshot (several MB) plus all accumulated diffs. A ring buffer maxlen of 10,000 messages isn’t enough for the reconnect burst.

Fix: Increase ring buffer maxlen to 200,000+ during reconnection events. Detect the reconnect burst (large message size, high inter-message rate) and temporarily drain the buffer more aggressively.

3. Sequence gap on OKX silent disconnect

Symptom: On OKX, occasionally your strategy processes data for minutes before you notice you’ve been reading a dead stream. No errors, no disconnect event.

Root cause: OKX relies on application-level keepalive. As I understand the OKX documentation, the client is expected to send "ping" and receive "pong" when a connection has been quiet, and the server drops connections that stay silent for about 30 seconds. The drop can happen at the application layer while the TCP connection remains open. Your asyncio event loop sees no socket error, no data arrives, and you never detect the silence.

Fix: Implement a watchdog that tracks time since last message per subscription. If no message arrives within 30 seconds on a normally-active stream, treat it as a disconnect and trigger reconnection.
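A watchdog of this kind can be a few lines. The sketch below is one possible shape: StreamWatchdog is a hypothetical name, the clock is injectable so the logic can be tested without waiting, and the 30-second default simply mirrors the threshold mentioned in the fix.

```python
import time
from typing import Callable, Dict, List


class StreamWatchdog:
    """Tracks the last message time per stream and reports silent ones."""

    def __init__(
        self,
        timeout_s: float = 30.0,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self._timeout_s = timeout_s
        self._clock = clock
        self._last_seen: Dict[str, float] = {}

    def touch(self, stream: str) -> None:
        """Call for every message received on a stream."""
        self._last_seen[stream] = self._clock()

    def stale_streams(self) -> List[str]:
        """Streams with no message for longer than the timeout."""
        now = self._clock()
        return [
            name
            for name, seen in self._last_seen.items()
            if now - seen > self._timeout_s
        ]


# Simulated clock so the example runs instantly.
fake_now = [0.0]
watchdog = StreamWatchdog(timeout_s=30.0, clock=lambda: fake_now[0])
watchdog.touch("books/BTC-USDT")
fake_now[0] = 20.0
watchdog.touch("books/ETH-USDT")
fake_now[0] = 31.0
print(watchdog.stale_streams())
```

In the connection manager you would call touch() from the message handler and poll stale_streams() from a periodic task, closing the socket for any stream it returns so the normal reconnect and resync path takes over.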

4. asyncio event loop starvation during burst

Symptom: Your P50 message processing latency is 50µs, but P99 is 50ms during volatile markets. The event loop isn’t cancelling connections, it’s just slow.

Root cause: A single asyncio event loop can only run one coroutine at a time. If your message handler does any I/O (logging to file, writing to database, calling REST APIs), it either blocks the loop outright (synchronous calls) or yields it to other work (awaited calls). During a burst, the queue of pending operations grows, and message processing waits behind them.

Fix: Make your hot-path message handler synchronous and non-blocking. Move all I/O (logging, database writes, metrics) to a separate thread or process. If results need to come back, capture the loop with asyncio.get_running_loop() and have the worker thread call loop.call_soon_threadsafe().
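For logging specifically, the standard library already provides the pieces: logging.handlers.QueueHandler makes the hot-path call a cheap queue put, and logging.handlers.QueueListener runs the real handler on its own thread. The sketch below wires them together; install_queue_logging is a hypothetical helper name, and the in-memory handler only stands in for a file handler.

```python
import logging
import logging.handlers
import queue


def install_queue_logging(
    target_logger: logging.Logger, slow_handler: logging.Handler
) -> logging.handlers.QueueListener:
    """Route a logger through a queue so slow_handler runs off the hot path."""
    log_queue: queue.Queue = queue.Queue()  # unbounded
    target_logger.addHandler(logging.handlers.QueueHandler(log_queue))
    listener = logging.handlers.QueueListener(log_queue, slow_handler)
    listener.start()  # starts the background thread that calls slow_handler
    return listener


class MemoryHandler(logging.Handler):
    """Stand-in for a file or network handler; just collects messages."""

    def __init__(self) -> None:
        super().__init__()
        self.messages = []

    def emit(self, record: logging.LogRecord) -> None:
        self.messages.append(record.getMessage())


hot_logger = logging.getLogger("hot_path_example")
hot_logger.setLevel(logging.INFO)
hot_logger.propagate = False
sink = MemoryHandler()
listener = install_queue_logging(hot_logger, sink)

hot_logger.info("fill qty=%d", 42)  # returns after a queue put
listener.stop()  # flushes queued records and joins the thread
print(sink.messages)
```

Call listener.stop() during shutdown so records still in the queue are written before the process exits.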

5. Bybit disconnects after 20 seconds of no ping

Symptom: Bybit connections randomly drop every 20-30 seconds in dev/test environments where you’re not actively trading.

Root cause: Bybit’s WebSocket documentation states the server disconnects if no message (including ping) is received within 20 seconds. During low-activity periods, you’re not sending anything.

Fix: Implement application-level pings separate from TCP keepalives. Send {"op": "ping"} every 15 seconds regardless of traffic. The server responds with {"op": "pong"}, which also confirms the connection is alive.
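A ping loop along those lines might look like the following. It is a sketch: ping_loop is a hypothetical name, and it takes the send function as a parameter so it is not tied to a particular WebSocket client. With aiohttp you could pass the connection's send_str method.

```python
import asyncio
import json
from typing import Awaitable, Callable


async def ping_loop(
    send_text: Callable[[str], Awaitable[None]],
    interval_s: float = 15.0,
) -> None:
    """Send an application-level ping forever; cancel the task to stop it."""
    payload = json.dumps({"op": "ping"})
    while True:
        await send_text(payload)
        await asyncio.sleep(interval_s)


async def demo() -> list:
    """Run the loop briefly against a fake sender and return what was sent."""
    sent = []

    async def fake_send(text: str) -> None:
        sent.append(text)

    task = asyncio.create_task(ping_loop(fake_send, interval_s=0.01))
    await asyncio.sleep(0.05)
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return sent


sent_pings = asyncio.run(demo())
print(len(sent_pings), sent_pings[0])
```

Start the task right after the connection opens and cancel it when the connection closes, so a reconnect never leaves an old ping loop writing to a dead socket.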

6. Race condition: diff arrives before snapshot request completes

Symptom: Intermittently, your resync fails with “Snapshot gap” errors, requiring a second resync. The error rate increases during high-volume periods.

Root cause: Between when you start the WebSocket connection and when your REST snapshot request completes, the exchange may publish many diffs. If your diff buffer has limited capacity and fills up during the snapshot request, you lose diffs and create a gap. A deque with maxlen makes this worse because it evicts the oldest diffs silently.

Fix: Size the diff buffer for the worst case - at least 10,000 messages, more on busy symbols. Also, request the snapshot quickly after connection - don’t do any other work between “connected” and “snapshot requested”.
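One way to make this failure visible instead of intermittent is a buffer that records overflow rather than evicting silently, so the resync can be restarted on purpose. This is a sketch of that idea; DiffBuffer and BufferOverflow are hypothetical names, and it is an alternative to the deque(maxlen=...) used in the state machine, not a description of it.

```python
from collections import deque


class BufferOverflow(Exception):
    """Raised when diffs were lost while waiting for the snapshot."""


class DiffBuffer:
    """Bounded diff buffer that remembers if it ever had to drop a message."""

    def __init__(self, capacity: int = 10_000) -> None:
        self._items = deque()
        self._capacity = capacity
        self.overflowed = False

    def append(self, diff: dict) -> None:
        if len(self._items) >= self._capacity:
            # Dropping any diff breaks continuity, so flag it loudly.
            self.overflowed = True
            return
        self._items.append(diff)

    def drain(self) -> list:
        """Return buffered diffs in order, or raise if any were lost."""
        if self.overflowed:
            raise BufferOverflow("diffs lost during sync; fetch a new snapshot")
        items = list(self._items)
        self._items.clear()
        return items

    def reset(self) -> None:
        """Start over before a new snapshot attempt."""
        self._items.clear()
        self.overflowed = False


buf = DiffBuffer(capacity=3)
for update_id in range(1, 4):
    buf.append({"U": update_id, "u": update_id})
print([d["u"] for d in buf.drain()])
```

The caller treats BufferOverflow like a snapshot gap: call reset(), keep buffering, and request a fresh snapshot.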


For the exchange-specific sequence number formats in detail, see Binance Connectivity Deep Dive and OKX, Bybit, and Deribit API Guide. For the TCP layer beneath these connections, see TCP Tuning for Trading. For the complete order book reconstruction algorithm, see Order Book Reconstruction at Scale.
