SDKs expose three ways to read from streams:
  • read for a single batch of records.
  • readSession for streaming from a position and optionally following new records.
  • checkTail for inspecting the current end of a stream.

Single-batch read

At the simplest level, you can read a batch of records with a single call:
const batch = await stream.read({
	start: { from: { seqNum: 0 } },
	stop: { limits: { count: 100 } },
});

for (const record of batch.records) {
	console.log(`[${record.seqNum}] ${record.body}`);
}
Single-batch reads return at most one batch: up to 1000 records or 1 MiB of data. For larger reads, or to follow the stream in real time, use a read session.

Read session

A read session streams records from a starting position. It handles reconnection on transient failures and provides a simple iterator interface. Sessions that do not specify a stop condition continue to follow updates in real time.

Starting from a sequence number

const session = await stream.readSession({
	start: { from: { seqNum: 0 } },
});

for await (const record of session) {
	console.log(`[${record.seqNum}] ${record.body}`);
}
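
Starting from a sequence number is also what makes a reader resumable: a consumer that remembers the last seqNum it handled can reopen the stream at the next one. The sketch below illustrates that idea only and is not how the SDK implements reconnection. The names `resumeFrom`, `OpenFrom`, and `SeqRecord` are hypothetical, and `open` stands in for any function that starts reading at a given sequence number.

```typescript
// Hypothetical shapes for this sketch; only the fields used in the examples above.
interface SeqRecord {
	seqNum: number;
	body: string;
}

// Opens a reader at `seqNum`. May return the iterable directly or a promise of it.
type OpenFrom = (seqNum: number) => AsyncIterable<SeqRecord> | Promise<AsyncIterable<SeqRecord>>;

// Yields records from `open`. If iteration throws, reopens at the sequence
// number after the last record yielded, giving up after more than
// `maxRetries` consecutive failures.
async function* resumeFrom(
	open: OpenFrom,
	startSeqNum: number,
	maxRetries = 3
): AsyncGenerator<SeqRecord> {
	let next = startSeqNum;
	let failures = 0;
	while (true) {
		try {
			for await (const record of await open(next)) {
				next = record.seqNum + 1; // remember where to resume
				failures = 0; // progress was made, so reset the failure count
				yield record;
			}
			return; // the source ended normally
		} catch (err) {
			failures += 1;
			if (failures > maxRetries) throw err;
		}
	}
}
```

With the SDK, `open` could plausibly be `(seqNum) => stream.readSession({ start: { from: { seqNum } } })`, reusing the call shown above.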

Starting from a tail offset

Read the last N records in the stream, then follow for new ones:
// Start reading from 10 records before the current tail
const session = await stream.readSession({
	start: { from: { tailOffset: 10 } },
});

for await (const record of session) {
	console.log(`[${record.seqNum}] ${record.body}`);
}

Starting from a timestamp

Read records starting from a point in time:
// Start reading from a specific timestamp
const oneHourAgo = new Date(Date.now() - 3600 * 1000);
const session = await stream.readSession({
	start: { from: { timestamp: oneHourAgo } },
});

for await (const record of session) {
	console.log(`[${record.seqNum}] ${record.body}`);
}

Reading until a timestamp

Read records up to a point in time, then stop:
// Read records until a specific timestamp
const oneHourAgo = new Date(Date.now() - 3600 * 1000);
const session = await stream.readSession({
	start: { from: { seqNum: 0 } },
	stop: { untilTimestamp: oneHourAgo },
});

for await (const record of session) {
	console.log(`[${record.seqNum}] ${record.body}`);
}

Following live updates

By default, a read session without stop conditions follows the stream indefinitely, waiting for new records as they arrive. When you provide a stop condition (count, bytes, or until), the read stops when the condition is met or when it reaches the current tail, whichever comes first. See the API docs on waiting at the tail for the full semantics.
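
The "whichever comes first" rule can be pictured with a toy model over an in-memory array. This is a sketch, not SDK code: `readWithStop`, `TimedRecord`, and `StopSketch` are hypothetical names, the end of the array stands in for the current tail, and the until bound is assumed to be exclusive. The bytes limit is left out for brevity.

```typescript
// Hypothetical record and stop shapes, for illustration only.
interface TimedRecord {
	seqNum: number;
	timestamp: number; // milliseconds since the epoch
	body: string;
}

interface StopSketch {
	count?: number; // stop after this many records
	untilTimestamp?: number; // stop before the first record at or past this time (assumed exclusive)
}

// Returns the records a read would yield from `log`, starting at `startSeqNum`.
// Running off the end of `log` models reaching the current tail.
function readWithStop(log: TimedRecord[], startSeqNum: number, stop: StopSketch): TimedRecord[] {
	const out: TimedRecord[] = [];
	for (const record of log) {
		if (record.seqNum < startSeqNum) continue;
		if (stop.count !== undefined && out.length >= stop.count) break;
		if (stop.untilTimestamp !== undefined && record.timestamp >= stop.untilTimestamp) break;
		out.push(record);
	}
	return out;
}
```

For a five-record log, a count of 3 from seqNum 0 stops on the limit, while a count of 10 from seqNum 3 stops at the tail after two records.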

Long polling

The wait parameter (waitSecs in the example below) controls how long to wait for new records once the read has caught up to the tail. It works for both single-batch reads (long polling) and sessions.
// Read all available records, and once reaching the current tail, wait an additional 30 seconds for new ones
const session = await stream.readSession({
	start: { from: { seqNum: 0 } },
	stop: { waitSecs: 30 },
});

for await (const record of session) {
	console.log(`[${record.seqNum}] ${record.body}`);
}
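
For the single-batch case, a long-polling step might look like the sketch below. It is written against a small local interface rather than the SDK itself. The names `BatchReader` and `pollOnce` are hypothetical. Passing `waitSecs` under `stop` for `read` is assumed to mirror the session example above. A wait that times out is assumed to return an empty batch.

```typescript
// Hypothetical shapes for this sketch; only the fields used in the examples above.
interface PolledRecord {
	seqNum: number;
	body: string;
}

interface BatchReader {
	read(opts: {
		start: { from: { seqNum: number } };
		stop: { waitSecs: number };
	}): Promise<{ records: PolledRecord[] }>;
}

// Performs one long-poll read starting at `fromSeqNum`, passes each record to
// `handle`, and returns the sequence number to use for the next poll.
async function pollOnce(
	reader: BatchReader,
	fromSeqNum: number,
	waitSecs: number,
	handle: (record: PolledRecord) => void
): Promise<number> {
	const batch = await reader.read({
		start: { from: { seqNum: fromSeqNum } },
		stop: { waitSecs },
	});
	let next = fromSeqNum;
	for (const record of batch.records) {
		handle(record);
		next = record.seqNum + 1;
	}
	return next; // unchanged if the wait elapsed with no new records
}
```

Calling `pollOnce` in a loop and feeding each returned value into the next call gives a basic long-polling consumer.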

Check tail

To find the current end of the stream without reading any records:
const { tail } = await stream.checkTail();
console.log(`Stream has ${tail.seqNum} records`);
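
One use for the tail position is estimating how far a consumer has fallen behind. The helper below is a sketch: `recordsBehind` is a hypothetical name, and it assumes that `tail.seqNum` is the sequence number the next appended record will receive, which matches the record-count reading in the example above.

```typescript
// Returns how many records lie between a consumer's position and the tail.
// `tailSeqNum` is assumed to be the sequence number of the next record to be
// appended; `lastProcessedSeqNum` is null if nothing has been processed yet.
function recordsBehind(tailSeqNum: number, lastProcessedSeqNum: number | null): number {
	const nextToRead = lastProcessedSeqNum === null ? 0 : lastProcessedSeqNum + 1;
	return Math.max(0, tailSeqNum - nextToRead);
}
```

For example, with the tail at 10 and the last processed record at seqNum 4, `recordsBehind(10, 4)` is 5.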