Decoding the Bluesky Firehose with zero Python dependencies

Last updated: November 22, 2024

Not fully written yet.

The Bluesky Firehose exposes an authenticated stream of all events that take place on the Bluesky social network. The Firehose is a powerful tool that is considered part of Bluesky’s core network infrastructure.

In these notes, I’ll describe how to decode Firehose events using only the Python standard library.

But first: Firehose or Jetstream?

The Firehose can be hard to work with. Firehose events are surfaced in a low-level binary data format that takes a little bit of work to decode.

If you want to play with Bluesky data in real time, you’re almost certainly better off using the Bluesky Jetstream, which surfaces events as simple JSON objects. Most of the time, the Jetstream is all you need.

There are some tradeoffs:

  • Jetstream messages are not authenticated. You must trust the Jetstream endpoint you’re connecting to.
  • The Jetstream is not considered core infrastructure. It’s a convenience feature whose API, schema, and availability may change at any time.
  • Not all event types are surfaced. (Probably, the type you care about is.)

I’ve written a separate set of notes on how to use the Jetstream with just a few lines of Python.

Firehose events and DAG-CBOR

Firehose events are encoded using the Concise Binary Object Representation (CBOR), a relatively straightforward binary data format. Think of it as “JSON, but with bytes”. Or close enough.

But the Firehose doesn’t use vanilla CBOR. Instead, it uses a restricted subset of CBOR called DAG-CBOR, which implements the ATProto data model, a variant of the InterPlanetary Linked Data (IPLD) data model.

That’s a mouthful! In practice, the use of a CBOR subset means we can write fewer lines of Python code to decode Firehose events. We can mostly ignore the IPLD data model.
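To make this concrete, here’s a hand-decode of a tiny CBOR value using nothing but the standard library. The map {"a": 1} encodes as the four bytes a1 61 61 01; every CBOR item starts with an initial byte whose top three bits name the major type and whose low five bits carry the “additional info” (lengths and small values):

```python
# Hand-decoding the CBOR bytes for {"a": 1}: A1 61 61 01
data = bytes([0xA1, 0x61, 0x61, 0x01])

initial = data[0]
major_type = initial >> 5         # top 3 bits: 5 means "map"
additional_info = initial & 0x1F  # low 5 bits: here, 1 key/value pair

key_initial = data[1]             # 0x61: major type 3 (text), length 1
key_len = key_initial & 0x1F
key = data[2:2 + key_len].decode("utf-8")

value_byte = data[2 + key_len]    # 0x01: major type 0; small ints live in the initial byte
value = value_byte & 0x1F

print({key: value})  # {'a': 1}
```

The read_dag_cbor() function we’ll build below is this same bit twiddling, applied recursively.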

Decoding Firehose events

Let’s start by processing Firehose events. Each event contains two CBOR-encoded objects concatenated together: a header and a payload.

Here’s simple code to loop over Firehose events and print them out:

from io import BytesIO

# We take a dependency on httpx-ws for the websocket itself, but no other libraries
from httpx_ws import connect_ws

BSKY_FIREHOSE = "wss://bsky.network/xrpc/com.atproto.sync.subscribeRepos"

with connect_ws(BSKY_FIREHOSE) as ws:
    while True:
        event = ws.receive_bytes()
        with BytesIO(event) as bio:
            header, payload = read_dag_cbor(bio), read_dag_cbor(bio)
            print(header, payload)

We’ll get to the implementation of read_dag_cbor() in a moment, but first let’s see what we get:

$ ./firehose.py
{'t': '#commit', 'op': 1} {'ops': [{'cid': 'bafy...', 'path': 'app.bsky.feed.post/abcd', 'action': 'create'}]...
# ... so much data!

Event headers contain a top-level event type. In this case, we see the event is a commit event. This means that some user or service has made changes to a Bluesky repository.
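For instance, a consumer that only cares about commits can filter on the header alone. (A sketch: the op field follows ATProto’s event-stream framing, where 1 marks a regular message and -1 an error frame.)

```python
def is_commit(header: dict) -> bool:
    """True for a regular (#commit) message frame.

    Assumes ATProto event-stream framing: op == 1 is a normal message,
    op == -1 is an error frame whose payload describes the error.
    """
    return header.get("op") == 1 and header.get("t") == "#commit"

print(is_commit({"t": "#commit", "op": 1}))  # True
print(is_commit({"op": -1}))                 # False: error frame
```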

TODO: write about the event types that the firehose surfaces. Unfortunately, this is comically underspecified. In particular, Bluesky’s official Firehose documentation says nothing. Nor does the official ATProto documentation. The only true source of information I can find is the Indigo codebase itself, which shows me that the following headers are possible:

  • An error frame (TODO what is the payload, if any?)
  • A repository #commit (common, and with well-specified schema for the payload)
  • Three account-related events whose exact semantics seem to overlap a little and be in flux: #handle, #identity, and #account
  • A deprecated #tombstone event that I still see in practice when connecting to the Firehose
  • A #migrate event (presumably for data migration, but can’t find any documentation)
  • A network information #info event (TODO what is this?)

Honestly, teasing all this apart is a bit of a mess.

Further links for teasing apart the specification:

  1. The Indigo codebase, which implements the production firehose service in golang; when in doubt, code is truth
  2. The ATProto typescript Firehose consumer interfaces, which have a few comments like “#tombstone is deprecated, use #account event instead” or “semantics of this field may evolve over time”, although they mostly fail to document the meaning of individual fields
  3. For working with commit events, the ATProto lexicons themselves (for instance, the lexicon for an app.bsky.feed.post record) are handy

Decoding DAG-CBOR

TODO DAVE: so much to say here. None of this is complicated; the only trick was finding the right documentation for each bit of this code. (That took me a little while.) Hopefully I’ve linked each bit of code to the appropriate references…

import struct
import typing as t
from base64 import b32encode
from io import BytesIO

# See the IANA registry for CBOR tags at
# https://www.iana.org/assignments/cbor-tags/cbor-tags.xhtml
CID_CBOR_TAG = 42


def read_dag_cbor(stream: t.IO[bytes]) -> t.Any:
    """
    Decodes a DAG_CBOR encoded byte string from the given stream.

    The base CBOR specification is RFC 8949; details at https://cbor.io

    There's a useful CBOR playground at https://cbor.me

    DAG_CBOR is a more restrictive variant of CBOR defined in IPLD; see:
    https://ipld.io/specs/codecs/dag-cbor/spec/

    """
    initial_byte = stream.read(1)
    if not initial_byte:
        raise ValueError("Unexpected end of input while decoding CBOR.")
    initial_value = initial_byte[0]

    major_type = initial_value >> 5
    additional_info = initial_value & 0x1F

    if major_type == 0:  # Unsigned integer
        return read_cbor_uint(stream, additional_info)
    elif major_type == 1:  # Negative integer
        return -1 - read_cbor_uint(stream, additional_info)
    elif major_type == 2:  # Byte string
        length = read_cbor_uint(stream, additional_info)
        return stream.read(length)
    elif major_type == 3:  # Text string
        length = read_cbor_uint(stream, additional_info)
        return stream.read(length).decode("utf-8")
    elif major_type == 4:  # Array
        length = read_cbor_uint(stream, additional_info)
        return [read_dag_cbor(stream) for _ in range(length)]
    elif major_type == 5:  # Map
        length = read_cbor_uint(stream, additional_info)
        return {read_dag_cbor(stream): read_dag_cbor(stream) for _ in range(length)}
    elif major_type == 6:  # Tagged item
        # DAG_CBOR *requires* all tags to be of type 42 (IPLD CID)
        # We convert these to base32 CID strings by default
        tag = read_cbor_uint(stream, additional_info)
        if tag != CID_CBOR_TAG:
            raise ValueError(f"Unsupported CBOR tag {tag} in DAG_CBOR.")
        value = read_dag_cbor(stream)
        return encode_dag_cbor_cid(value)
    elif major_type == 7:  # Simple values and floats
        if additional_info == 20:  # False
            return False
        elif additional_info == 21:  # True
            return True
        elif additional_info == 22:  # Null
            return None
        elif additional_info == 23:  # Undefined
            # Technically, this is not supported in DAG_CBOR. But we'll allow it.
            return None  # CBOR 'undefined' is translated as None
        elif additional_info == 25:  # Half-precision float (not implemented)
            raise NotImplementedError("Half-precision floats are not supported.")
        elif additional_info == 26:  # Single-precision float
            return struct.unpack(">f", stream.read(4))[0]
        elif additional_info == 27:  # Double-precision float
            return struct.unpack(">d", stream.read(8))[0]
        else:
            raise ValueError(
                f"Unsupported simple value with additional info {additional_info}."
            )
    else:
        raise ValueError(f"Unsupported DAG_CBOR major type {major_type}.")


def read_cbor_uint(stream: t.IO[bytes], additional_info: int) -> int:
    """
    Parses an unsigned integer from the stream based on the additional information.

    See https://cbor.io/spec.html#ints for details.
    """
    if additional_info < 24:
        return additional_info
    elif additional_info == 24:
        return struct.unpack(">B", stream.read(1))[0]
    elif additional_info == 25:
        return struct.unpack(">H", stream.read(2))[0]
    elif additional_info == 26:
        return struct.unpack(">I", stream.read(4))[0]
    elif additional_info == 27:
        return struct.unpack(">Q", stream.read(8))[0]
    else:
        raise ValueError(
            f"Unsupported additional information for integer parsing: {additional_info}."
        )
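As a quick sanity check on those length rules, here’s the same table restated with int.from_bytes and exercised on concrete bytes (a standalone sketch, not the function above):

```python
from io import BytesIO

def cbor_uint(stream, additional_info: int) -> int:
    # Values 0-23 are packed directly into the initial byte; 24-27 say
    # "read 1, 2, 4, or 8 big-endian bytes that follow".
    if additional_info < 24:
        return additional_info
    width = {24: 1, 25: 2, 26: 4, 27: 8}[additional_info]
    return int.from_bytes(stream.read(width), "big")

print(cbor_uint(BytesIO(b""), 23))          # 23: packed in the initial byte
print(cbor_uint(BytesIO(b"\x64"), 24))      # 100: one following byte
print(cbor_uint(BytesIO(b"\x01\x00"), 25))  # 256: two following bytes
```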
		

def encode_dag_cbor_cid(value: bytes) -> str:
    """
    Convert a CID tag value to a base32 encoded CID string with a multibase prefix.

    This is the default representation for CIDs used elsewhere in the ATProto,
    and in examples, so it should be familiar.

    A CID (Content Identifier) is a multiformats self-describing
    content-addressed identifier.

    See the specification for CIDs in general at:
    https://github.com/multiformats/cid

    See the specification for CIDs found in DAG_CBOR (aka tag 42) at:
    https://github.com/ipld/cid-cbor/

    Other useful details about CIDs can be found at:
    https://docs.ipfs.tech/concepts/content-addressing/#cid-versions

    And the reference Go implementation at: https://github.com/ipfs/go-cid
    """
    if len(value) != 37:
        raise NotImplementedError(
            "Only 37-byte CID values (identity multibase prefix "
            "+ 36-byte CIDv1) are supported."
        )
    multibase_prefix = value[0]
    if multibase_prefix != 0x00:  # Multibase identity prefix
        raise ValueError("DAG_CBOR CIDs must have a multibase identity prefix.")
    cid_data = value[1:]
    return multibase_encode_b(cid_data)
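To see where the 37 comes from, here’s a sketch that builds the kind of value tag 42 carries, for a made-up block (the digest is illustrative): a 36-byte binary CIDv1, i.e. version 1, the dag-cbor codec (0x71), and a sha2-256 multihash (code 0x12, length 0x20, then 32 digest bytes), plus one leading 0x00 identity-multibase byte on the wire. Base32-encoding the CID with the 'b' prefix yields the familiar “bafyrei...” form:

```python
import hashlib
from base64 import b32encode

# Build a binary CIDv1 for a made-up dag-cbor block
digest = hashlib.sha256(b"some dag-cbor block").digest()
cid_bytes = bytes([0x01, 0x71, 0x12, 0x20]) + digest  # version, codec, hash fn, length
assert len(cid_bytes) == 36

# As carried inside CBOR tag 42: identity multibase prefix + the CID itself
tag_value = b"\x00" + cid_bytes
assert len(tag_value) == 37

# The human-readable form: multibase 'b' + unpadded lowercase base32
cid_str = "b" + b32encode(cid_bytes).decode("ascii").rstrip("=").lower()
print(cid_str)  # starts with "bafyrei"
```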


def multibase_encode_b(b: bytes) -> str:
    """
    Encode the given byte string using RFC 4648 base32 encoding, case-insensitive,
    without padding. Add a multibase prefix 'b' to indicate the encoding.

    See the raw encoding specification at https://tools.ietf.org/html/rfc4648#section-6

    See the multibase specification at https://github.com/multiformats/multibase
    """
    b32_str = b32encode(b).decode("ascii").replace("=", "").lower()
    return f"b{b32_str}"


def read_uvarint(stream: t.IO[bytes]) -> int:
    """
    Read a multiformats unsigned varint from the given stream.

    See the specification at https://github.com/multiformats/unsigned-varint

    And the reference Go implementation at https://github.com/multiformats/go-varint
    """
    shift = 0
    result = 0

    while True:
        byte = stream.read(1)
        if not byte:
            raise ValueError("Unexpected end of input while parsing varint.")
        byte_val = byte[0]
        result |= (byte_val & 0x7F) << shift
        shift += 7
        if not (byte_val & 0x80):
            break

    return result
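A quick worked example of the encoding this undoes: each byte carries seven payload bits, least-significant group first, and the high bit says “more bytes follow”. So 300 (binary 100101100) goes out as AC 02:

```python
# 300 encodes as the varint bytes AC 02; reassemble it by hand
lo, hi = 0xAC, 0x02
assert lo & 0x80                 # continuation bit set on the first byte
assert not hi & 0x80             # clear on the last byte
value = (lo & 0x7F) | (hi << 7)  # low 7 bits, then the next group shifted up
print(value)  # 300
```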

Decoding blocks and IPLD CAR files

Data inside event payloads is sometimes represented using IPLD CAR (Content Addressable aRchives), a binary format used to store IPLD data structures.
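The framing itself is simple. A sketch, assuming the CARv1 layout from the spec: a stream of varint-length-prefixed sections, where the first section is the DAG-CBOR header and every later section is a binary CID immediately followed by that block’s data:

```python
from io import BytesIO

def iter_car_sections(stream):
    """Yield each varint-length-prefixed section of a CARv1 stream.

    The first section is the DAG-CBOR header; every later section is a
    binary CID immediately followed by the block's bytes.
    """
    while True:
        first = stream.read(1)
        if not first:
            return  # clean end of stream
        # Finish reading the varint whose first byte we just consumed
        shift, length, byte_val = 0, 0, first[0]
        while True:
            length |= (byte_val & 0x7F) << shift
            shift += 7
            if not (byte_val & 0x80):
                break
            byte_val = stream.read(1)[0]
        yield stream.read(length)

# Two fake sections, 3 and 2 bytes long (real ones hold CBOR and CID bytes)
print(list(iter_car_sections(BytesIO(b"\x03abc\x02de"))))  # [b'abc', b'de']
```

The code below does the same walk, but also splits each block section into its CID and data halves.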

TODO DAVE lots to say here.

def read_carv1(stream: t.IO[bytes]) -> t.Any:
    """
    Decodes a CARv1 encoded byte string from the given stream.

    CARv1 is a format used for content-addressed archives in IPLD.

    See the specification at: https://ipld.io/specs/transport/car/carv2/
    (This is the CARv2 specification, but CARv1 is a subset of it.)

    See the reference Go implementation at: https://github.com/ipld/go-car
    """
    # Dict containing the CAR header, with a 'roots' and a 'version' key
    header = read_car_header(stream)
    car_version = header["version"]
    if car_version != 1:
        raise ValueError(f"Unsupported CAR version {car_version}.")
    blocks = []
    while True:
        try:
            node = read_car_node(stream)
            blocks.append(node)
        except ValueError:
            # read_uvarint raises ValueError at end of stream; that's our
            # signal that all blocks have been read
            break
    return {"header": header, "blocks": blocks}


def read_car_header(stream: t.IO[bytes]) -> dict:
    """Read the header of any CAR version from the given stream."""
    cbor_bytes = read_car_ld(stream)
    with BytesIO(cbor_bytes) as bio:
        return read_dag_cbor(bio)


def read_car_node(stream: t.IO[bytes]) -> dict:
    """Read a single CAR node from the given stream."""
    section = read_car_ld(stream)
    # Assume a 36-byte binary CIDv1 (version + codec + sha2-256 multihash),
    # which is what ATProto emits; re-add the identity multibase prefix
    # before handing it to encode_dag_cbor_cid().
    cid_bytes = section[:36]
    cid_str = encode_dag_cbor_cid(b"\x00" + cid_bytes)
    data_cbor = section[36:]
    data = read_dag_cbor(BytesIO(data_cbor))
    return {"cid": cid_str, "data": data}


def read_car_ld(stream: t.IO[bytes]) -> bytes:
    """Read the CAR link data section from the given stream."""
    length = read_uvarint(stream)
    return stream.read(length)

TODO DAVE wow, even more to say! Merkle trees! Commit signatures! So much to document here.

A zero dependency command-line script for decoding Firehose events

TODO DAVE document this

The firehose.py command-line script is currently available on GitHub.

All you have to do is download it and make sure Astral’s uv is installed. Then, just run:

$ ./firehose.py
# ... so much data
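For reference, the zero-install trick presumably relies on uv’s support for PEP 723 inline script metadata; a header along these lines at the top of firehose.py (hypothetical: the exact metadata in the real script may differ) lets uv fetch httpx-ws on first run:

```python
#!/usr/bin/env -S uv run
# /// script
# requires-python = ">=3.11"
# dependencies = ["httpx-ws"]
# ///
```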

TODO I still want to write a little more code here; my ultimate goal is to have a flag that causes firehose.py to emit JSON objects that are schema compatible with whatever the higher-level Jetstream emits. Right now this script still emits the low-level structures (for instance, it emits full commit signature and merkle tree nodes, etc.)