Not fully written yet.
The Bluesky Firehose exposes an authenticated stream of all events that take place on the Bluesky social network. The Firehose is a powerful tool that is considered part of Bluesky’s core network infrastructure.
In these notes, I’ll describe how to decode Firehose events using only the Python standard library.
The Firehose can be hard to work with. Firehose events are surfaced in a low-level binary data format that takes a little bit of work to decode.
If you want to play with Bluesky data in realtime, you're almost certainly better off using the Bluesky Jetstream, which surfaces events as simple JSON objects. Most of the time, the Jetstream is all you need, although there are some tradeoffs. I've written a separate set of notes on how to use the Jetstream with just a few lines of Python.
Firehose events are encoded using the Concise Binary Object Representation (CBOR), a relatively straightforward binary data format. Think of it as “JSON, but with bytes”. Or close enough.
But the Firehose doesn't use vanilla CBOR. Instead, it uses a restrictive subset of CBOR called DAG-CBOR, which implements the ATProto data model, a variant of the Interplanetary Linked Data (IPLD) data model.
That’s a mouthful! In practice, the use of a CBOR subset means we can write fewer lines of Python code to decode Firehose events. We can mostly ignore the IPLD data model.
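Before writing a decoder, it helps to see the encoding up close. Every CBOR item begins with an initial byte that packs a major type into its top three bits and a length (or other "additional info") into the bottom five. Here's a hand-built example; the bytes are my own illustration, not captured from the Firehose:
# DAG-CBOR encoding of {"t": "#commit", "op": 1}, byte by byte:
#   0xA2               map with 2 key/value pairs
#   0x61 0x74          text string "t" (length 1)
#   0x67 0x23...0x74   text string "#commit" (length 7)
#   0x62 0x6F 0x70     text string "op" (length 2)
#   0x01               unsigned integer 1
raw = bytes.fromhex("a261746723636f6d6d6974626f7001")
initial = raw[0]
print(initial >> 5)    # 5: major type "map"
print(initial & 0x1F)  # 2: the map has two entries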
Let’s start by processing Firehose events. Each event contains two CBOR-encoded objects concatenated together: a header and a payload.
Here’s simple code to loop over Firehose events and print them out:
import struct
import typing as t
from base64 import b32encode
from io import BytesIO

# We take a dependency on httpx-ws for websockets, but no other
# third-party libraries (struct, typing, and base64 are used further below)
from httpx_ws import connect_ws

BSKY_FIREHOSE = "wss://bsky.network/xrpc/com.atproto.sync.subscribeRepos"

with connect_ws(BSKY_FIREHOSE) as ws:
    while True:
        frame = ws.receive_bytes()
        with BytesIO(frame) as bio:
            header, payload = read_dag_cbor(bio), read_dag_cbor(bio)
            print(header, payload)
We'll get to the implementation of read_dag_cbor() in a moment, but for now let's see what we get:
$ ./firehose.py
{'t': '#commit', 'op': 1} {'ops': [{'cid': 'bafy...', 'path': 'app.bsky.feed.post/abcd', 'action': 'create'}]...
# ... so much data!
Event headers contain a top-level event type. In this case, we see the event is a commit event. This means that some user or service has made changes to a Bluesky repository.
TODO: write about the event types that the firehose surfaces. Unfortunately, this is comically underspecified. In particular, Bluesky’s official Firehose documentation says nothing. Nor does the official ATProto documentation. The only true source of information I can find is the Indigo codebase itself, which shows me that the following headers are possible:
- #commit (common, and with well-specified schema for the payload)
- #handle, #identity, and #account
- a #tombstone event that I still see in practice when connecting to the Firehose
- a #migrate event (presumably for data migration, but I can't find any documentation)
- an #info event (TODO what is this?)

Honestly, teasing all this apart is a bit of a mess.
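Still, once a header is decoded, dispatching on its type is straightforward. Here's a minimal sketch; the handler logic is my own placeholder, not anything Bluesky specifies:
def handle_event(header: dict, payload: dict) -> None:
    """Dispatch a decoded Firehose event on its header type. (Sketch only.)"""
    event_type = header.get("t")
    if event_type == "#commit":
        # A user or service changed a repository; the payload describes the ops
        for op in payload.get("ops", []):
            print(op["action"], op["path"])
    elif event_type in ("#handle", "#identity", "#account"):
        print("identity-related event:", payload)
    else:
        # #tombstone, #migrate, #info, and anything not yet documented
        print("other event:", event_type)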
Further links for teasing apart the specification:
- examples of the app.bsky.feed.post event are handy

TODO DAVE: so much to say here. None of this is complicated; the only trick was finding the right documentation for each bit of this code. (That took me a little while.) Hopefully I've linked each bit of code to the appropriate references…
# See the IANA registry for CBOR tags at
# https://www.iana.org/assignments/cbor-tags/cbor-tags.xhtml
CID_CBOR_TAG = 42


def read_dag_cbor(stream: t.IO[bytes]) -> t.Any:
    """
    Decodes a DAG_CBOR encoded byte string from the given stream.

    The base CBOR specification is RFC 8949; details at https://cbor.io
    There's a useful CBOR playground at https://cbor.me

    DAG_CBOR is a more restrictive variant of CBOR defined in IPLD; see:
    https://ipld.io/specs/codecs/dag-cbor/spec/
    """
    initial_byte = stream.read(1)
    if not initial_byte:
        raise ValueError("Unexpected end of input while decoding CBOR.")
    initial_value = initial_byte[0]
    major_type = initial_value >> 5
    additional_info = initial_value & 0x1F

    if major_type == 0:  # Unsigned integer
        return read_cbor_uint(stream, additional_info)
    elif major_type == 1:  # Negative integer
        return -1 - read_cbor_uint(stream, additional_info)
    elif major_type == 2:  # Byte string
        length = read_cbor_uint(stream, additional_info)
        return stream.read(length)
    elif major_type == 3:  # Text string
        length = read_cbor_uint(stream, additional_info)
        return stream.read(length).decode("utf-8")
    elif major_type == 4:  # Array
        length = read_cbor_uint(stream, additional_info)
        return [read_dag_cbor(stream) for _ in range(length)]
    elif major_type == 5:  # Map
        length = read_cbor_uint(stream, additional_info)
        # NOTE: this relies on dict comprehensions evaluating the key before
        # the value, which is guaranteed in Python 3.8+
        return {read_dag_cbor(stream): read_dag_cbor(stream) for _ in range(length)}
    elif major_type == 6:  # Tagged item
        # DAG_CBOR *requires* all tags to be of type 42 (IPLD CID)
        # We convert these to base32 CID strings by default
        tag = read_cbor_uint(stream, additional_info)
        if tag != CID_CBOR_TAG:
            raise ValueError(f"Unsupported CBOR tag {tag} in DAG_CBOR.")
        value = read_dag_cbor(stream)
        return encode_dag_cbor_cid(value)
    elif major_type == 7:  # Simple values and floats
        if additional_info == 20:  # False
            return False
        elif additional_info == 21:  # True
            return True
        elif additional_info == 22:  # Null
            return None
        elif additional_info == 23:  # Undefined
            # Technically, this is not supported in DAG_CBOR. But we'll allow it.
            return None  # CBOR 'undefined' is translated as None
        elif additional_info == 25:  # Half-precision float (not implemented)
            raise NotImplementedError("Half-precision floats are not supported.")
        elif additional_info == 26:  # Single-precision float
            return struct.unpack(">f", stream.read(4))[0]
        elif additional_info == 27:  # Double-precision float
            return struct.unpack(">d", stream.read(8))[0]
        else:
            raise ValueError(
                f"Unsupported simple value with additional info {additional_info}."
            )
    else:
        raise ValueError(f"Unsupported DAG_CBOR major type {major_type}.")
def read_cbor_uint(stream: t.IO[bytes], additional_info: int) -> int:
    """
    Parses an unsigned integer from the stream based on the additional information.

    See https://cbor.io/spec.html#ints for details.
    """
    if additional_info < 24:
        return additional_info
    elif additional_info == 24:
        return struct.unpack(">B", stream.read(1))[0]
    elif additional_info == 25:
        return struct.unpack(">H", stream.read(2))[0]
    elif additional_info == 26:
        return struct.unpack(">I", stream.read(4))[0]
    elif additional_info == 27:
        return struct.unpack(">Q", stream.read(8))[0]
    else:
        raise ValueError(
            f"Unsupported additional information for integer parsing: {additional_info}."
        )
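With read_dag_cbor() and read_cbor_uint() in hand, we can round-trip the hand-built bytes from earlier:
raw = bytes.fromhex("a261746723636f6d6d6974626f7001")
print(read_dag_cbor(BytesIO(raw)))  # {'t': '#commit', 'op': 1}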
def encode_dag_cbor_cid(value: bytes) -> str:
    """
    Convert a CID tag value to a base32 encoded CID string with a multibase prefix.

    This is the default representation for CIDs used elsewhere in the ATProto,
    and in examples, so it should be familiar.

    A CID (Content Identifier) is a multiformats self-describing
    content-addressed identifier.

    See the specification for CIDs in general at:
    https://github.com/multiformats/cid

    See the specification for CIDs found in DAG_CBOR (aka tag 42) at:
    https://github.com/ipld/cid-cbor/

    Other useful details about CIDs can be found at:
    https://docs.ipfs.tech/concepts/content-addressing/#cid-versions

    And the reference Go implementation at: https://github.com/ipfs/go-cid
    """
    if len(value) != 37:
        raise NotImplementedError("Only DAG_CBOR encoded CIDs are supported.")
    multibase_prefix = value[0]
    if multibase_prefix != 0x00:  # Multibase identity prefix
        raise ValueError("DAG_CBOR CIDs must have a multibase identity prefix.")
    cid_data = value[1:]
    return multibase_encode_b(cid_data)
def multibase_encode_b(b: bytes) -> str:
    """
    Encode the given byte string using RFC 4648 base32 encoding, case-insensitive,
    without padding. Add a multibase prefix 'b' to indicate the encoding.

    See the raw encoding specification at https://tools.ietf.org/html/rfc4648#section-6
    See the multibase specification at https://github.com/multiformats/multibase
    """
    b32_str = b32encode(b).decode("ascii").replace("=", "").lower()
    return f"b{b32_str}"
def read_uvarint(stream: t.IO[bytes]) -> int:
    """
    Read a multiformats unsigned varint from the given stream.

    See the specification at https://github.com/multiformats/unsigned-varint
    And the reference Go implementation at https://github.com/multiformats/go-varint
    """
    shift = 0
    result = 0
    while True:
        byte = stream.read(1)
        if not byte:
            raise ValueError("Unexpected end of input while parsing varint.")
        byte_val = byte[0]
        result |= (byte_val & 0x7F) << shift
        shift += 7
        if not (byte_val & 0x80):
            break
    return result
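Varints pack seven bits per byte, least-significant group first, with the high bit of each byte signaling continuation. A quick check:
# 0x8D -> low 7 bits are 0x0D (13), continuation bit set;
# 0x01 -> contributes 1 << 7 = 128. Total: 141.
print(read_uvarint(BytesIO(bytes([0x8D, 0x01]))))  # 141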
Data inside event payloads is sometimes represented using IPLD CAR (Content Addressable aRchive), a binary archive format that is used to store IPLD data structures.
TODO DAVE lots to say here.
def read_carv1(stream: t.IO[bytes]) -> t.Any:
    """
    Decodes a CARv1 encoded byte string from the given stream.

    CARv1 is a format used for content-addressed archives in IPLD.

    See the specification at: https://ipld.io/specs/transport/car/carv2/
    (This is the CARv2 specification, but CARv1 is a subset of it.)

    See the reference Go implementation at: https://github.com/ipld/go-car
    """
    # Dict containing the CAR header, with a 'roots' and a 'version' key
    header = read_car_header(stream)
    car_version = header["version"]
    if car_version != 1:
        raise ValueError(f"Unsupported CAR version {car_version}.")
    blocks = []
    while True:
        try:
            node = read_car_node(stream)
            blocks.append(node)
        except ValueError:
            # We've reached the end of the stream
            break
    return {"header": header, "blocks": blocks}


def read_car_header(stream: t.IO[bytes]) -> dict:
    """Read the header of any CAR version from the given stream."""
    cbor_bytes = read_car_ld(stream)
    with BytesIO(cbor_bytes) as bio:
        return read_dag_cbor(bio)


def read_car_node(stream: t.IO[bytes]) -> dict:
    """Read a single CAR node from the given stream."""
    block = read_car_ld(stream)
    # Each node is a 36-byte CIDv1 followed by the DAG_CBOR encoded data
    cid_bytes = block[:36]
    # Prepend the multibase identity prefix that encode_dag_cbor_cid() expects
    cid_str = encode_dag_cbor_cid(b"\x00" + cid_bytes)
    data_cbor = block[36:]
    data = read_dag_cbor(BytesIO(data_cbor))
    return {"cid": cid_str, "data": data}


def read_car_ld(stream: t.IO[bytes]) -> bytes:
    """Read the CAR link data section from the given stream."""
    length = read_uvarint(stream)
    return stream.read(length)
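Putting the pieces together: a #commit event's payload carries its repository records in a blocks field, which is a CARv1 archive. Back inside our read loop from earlier, a minimal sketch of unpacking it looks like this:
# Inside the read loop, after decoding header and payload:
if header.get("t") == "#commit":
    car = read_carv1(BytesIO(payload["blocks"]))
    for node in car["blocks"]:
        print(node["cid"], node["data"])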
TODO DAVE wow, even more to say! Merkle trees! Commit signatures! So much to document here.
TODO DAVE document this
The firehose.py command line script is currently available on GitHub. All you have to do is download it and make sure you've installed Astral's uv. Then, just run:
$ ./firehose.py
# ... so much data
TODO: I still want to write a little more code here; my ultimate goal is to have a flag that causes firehose.py to emit JSON objects that are schema-compatible with whatever the higher-level Jetstream emits. Right now, this script still emits the low-level structures (for instance, it emits full commit signatures, merkle tree nodes, etc.).