Documentation Index
Fetch the complete documentation index at: https://s2.dev/docs/llms.txt
Use this file to discover all available pages before exploring further.
SDKs expose three ways to read from streams:
read for a single batch of records.
readSession for streaming from a position and optionally following new records.
checkTail for inspecting the current end of a stream.
Single-batch read
At the simplest level, you can read a batch of records with a single call:
TypeScript
Python
Go
Rust
const batch = await stream.read({
start: { from: { seqNum: 0 } },
stop: { limits: { count: 100 } },
});
for (const record of batch.records) {
console.log(`[${record.seqNum}] ${record.body}`);
}
batch = await stream.read(
start=SeqNum(0),
limit=ReadLimit(count=100),
)
for record in batch.records:
print(f"[{record.seq_num}] {record.body}")
batch, _ := stream.Read(ctx, &s2.ReadOptions{
SeqNum: s2.Uint64(0),
Count: s2.Uint64(100),
})
for _, record := range batch.Records {
fmt.Printf("[%d] %s\n", record.SeqNum, string(record.Body))
}
let batch = stream
.read(
ReadInput::new()
.with_start(ReadStart::new().with_from(ReadFrom::SeqNum(0)))
.with_stop(ReadStop::new().with_limits(ReadLimits::new().with_count(100))),
)
.await?;
for record in batch.records {
println!("[{}] {:?}", record.seq_num, record.body);
}
Single-batch reads return at most one batch: up to 1000 records or 1 MiB of data. For larger reads, or to follow the stream in real time, use a read session.
Read session
A read session streams records from a starting position. It handles reconnection on transient failures and provides a simple iterator interface. Sessions that do not specify a stop condition will continue to follow updates in real time.
Starting from a sequence number
TypeScript
Python
Go
Rust
const session = await stream.readSession({
start: { from: { seqNum: 0 } },
});
for await (const record of session) {
console.log(`[${record.seqNum}] ${record.body}`);
}
async for batch in stream.read_session(start=SeqNum(0)):
for record in batch.records:
print(f"[{record.seq_num}] {record.body}")
readSession, _ := stream.ReadSession(ctx, &s2.ReadOptions{
SeqNum: s2.Uint64(0),
})
defer readSession.Close()
for readSession.Next() {
record := readSession.Record()
fmt.Printf("[%d] %s\n", record.SeqNum, string(record.Body))
}
if err := readSession.Err(); err != nil {
log.Fatal(err)
}
let mut session = stream
.read_session(ReadInput::new().with_start(ReadStart::new().with_from(ReadFrom::SeqNum(0))))
.await?;
while let Some(batch) = session.next().await {
let batch = batch?;
for record in batch.records {
println!("[{}] {:?}", record.seq_num, record.body);
}
}
Starting from a tail offset
Read the last N records in the stream, then follow for new ones:
TypeScript
Python
Go
Rust
// Start reading from 10 records before the current tail
const session = await stream.readSession({
start: { from: { tailOffset: 10 } },
});
for await (const record of session) {
console.log(`[${record.seqNum}] ${record.body}`);
}
# Start reading from 10 records before the current tail
async for batch in stream.read_session(start=TailOffset(10)):
for record in batch.records:
print(f"[{record.seq_num}] {record.body}")
// Start reading from 10 records before the current tail
tailOffsetSession, _ := stream.ReadSession(ctx, &s2.ReadOptions{
TailOffset: s2.Int64(10),
})
defer tailOffsetSession.Close()
for tailOffsetSession.Next() {
record := tailOffsetSession.Record()
fmt.Printf("[%d] %s\n", record.SeqNum, string(record.Body))
}
// Start reading from 10 records before the current tail
let mut session = stream
.read_session(
ReadInput::new().with_start(ReadStart::new().with_from(ReadFrom::TailOffset(10))),
)
.await?;
while let Some(batch) = session.next().await {
let batch = batch?;
for record in batch.records {
println!("[{}] {:?}", record.seq_num, record.body);
}
}
Starting from a timestamp
Read records starting from a point in time:
TypeScript
Python
Go
Rust
// Start reading from a specific timestamp
const oneHourAgo = new Date(Date.now() - 3600 * 1000);
const session = await stream.readSession({
start: { from: { timestamp: oneHourAgo } },
});
for await (const record of session) {
console.log(`[${record.seqNum}] ${record.body}`);
}
# Start reading from a specific timestamp
one_hour_ago_ms = int((time.time() - 3600) * 1000)
async for batch in stream.read_session(start=Timestamp(one_hour_ago_ms)):
for record in batch.records:
print(f"[{record.seq_num}] {record.body}")
// Start reading from a specific timestamp
oneHourAgo := uint64(time.Now().Add(-time.Hour).UnixMilli())
timestampSession, _ := stream.ReadSession(ctx, &s2.ReadOptions{
Timestamp: &oneHourAgo,
})
defer timestampSession.Close()
for timestampSession.Next() {
record := timestampSession.Record()
fmt.Printf("[%d] %s\n", record.SeqNum, string(record.Body))
}
// Start reading from a specific timestamp
let one_hour_ago = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)?
.as_millis() as u64
- 3600 * 1000;
let mut session = stream
.read_session(
ReadInput::new()
.with_start(ReadStart::new().with_from(ReadFrom::Timestamp(one_hour_ago))),
)
.await?;
while let Some(batch) = session.next().await {
let batch = batch?;
for record in batch.records {
println!("[{}] {:?}", record.seq_num, record.body);
}
}
Reading until a timestamp
Read records up to a point in time, then stop:
TypeScript
Python
Go
Rust
// Read records until a specific timestamp
const oneHourAgo = new Date(Date.now() - 3600 * 1000);
const session = await stream.readSession({
start: { from: { seqNum: 0 } },
stop: { untilTimestamp: oneHourAgo },
});
for await (const record of session) {
console.log(`[${record.seqNum}] ${record.body}`);
}
# Read records until a specific timestamp
one_hour_ago_ms = int((time.time() - 3600) * 1000)
async for batch in stream.read_session(
start=SeqNum(0),
until_timestamp=one_hour_ago_ms,
):
for record in batch.records:
print(f"[{record.seq_num}] {record.body}")
// Read records until a specific timestamp
oneHourAgo = uint64(time.Now().Add(-time.Hour).UnixMilli())
untilSession, _ := stream.ReadSession(ctx, &s2.ReadOptions{
SeqNum: s2.Uint64(0),
Until: &oneHourAgo,
})
defer untilSession.Close()
for untilSession.Next() {
record := untilSession.Record()
fmt.Printf("[%d] %s\n", record.SeqNum, string(record.Body))
}
// Read records until a specific timestamp
let one_hour_ago = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)?
.as_millis() as u64
- 3600 * 1000;
let mut session = stream
.read_session(
ReadInput::new()
.with_start(ReadStart::new().with_from(ReadFrom::SeqNum(0)))
.with_stop(ReadStop::new().with_until(..one_hour_ago)),
)
.await?;
while let Some(batch) = session.next().await {
let batch = batch?;
for record in batch.records {
println!("[{}] {:?}", record.seq_num, record.body);
}
}
Following live updates
By default, a read session without stop conditions will follow the stream indefinitely, waiting for new records as they arrive.
When you provide a stop condition (count, bytes, or until), the read stops when either the condition is met or it reaches the current tail — whichever comes first.
See the API docs on waiting at the tail for the full semantics.
Long polling
The wait parameter controls how long to wait for new records once the read has caught up to the tail. This works for both single-batch reads (long polling) and sessions.
TypeScript
Python
Go
Rust
// Read all available records, and once reaching the current tail, wait an additional 30 seconds for new ones
const session = await stream.readSession({
start: { from: { seqNum: 0 } },
stop: { waitSecs: 30 },
});
for await (const record of session) {
console.log(`[${record.seqNum}] ${record.body}`);
}
# Read all available records, then wait up to 30 seconds for new ones
async for batch in stream.read_session(
start=SeqNum(0),
wait=30,
):
for record in batch.records:
print(f"[{record.seq_num}] {record.body}")
// Read all available records, and once reaching the current tail, wait an additional 30 seconds for new ones
waitSession, _ := stream.ReadSession(ctx, &s2.ReadOptions{
SeqNum: s2.Uint64(0),
Wait: s2.Int32(30),
})
defer waitSession.Close()
for waitSession.Next() {
record := waitSession.Record()
fmt.Printf("[%d] %s\n", record.SeqNum, string(record.Body))
}
// Read all available records, and once reaching the current tail, wait an additional 30 seconds
// for new ones
let mut session = stream
.read_session(
ReadInput::new()
.with_start(ReadStart::new().with_from(ReadFrom::SeqNum(0)))
.with_stop(ReadStop::new().with_wait(30)),
)
.await?;
while let Some(batch) = session.next().await {
let batch = batch?;
for record in batch.records {
println!("[{}] {:?}", record.seq_num, record.body);
}
}
Check tail
To find the current end of the stream without reading any records:
TypeScript
Python
Go
Rust
const { tail } = await stream.checkTail();
console.log(`Stream has ${tail.seqNum} records`);
tail = await stream.check_tail()
print(f"Stream has {tail.seq_num} records")
tail, _ := stream.CheckTail(ctx)
fmt.Printf("Stream has %d records\n", tail.Tail.SeqNum)
let tail = stream.check_tail().await?;
println!("Stream has {} records", tail.seq_num);