Log Tables

Log tables are append-only tables without primary keys, suitable for event streaming.

Creating a Log Table

// Define the table schema: three columns and no primary key
// (log tables are append-only and never declare a PK).
auto schema = fluss::Schema::NewBuilder()
.AddColumn("event_id", fluss::DataType::Int())
.AddColumn("event_type", fluss::DataType::String())
.AddColumn("timestamp", fluss::DataType::BigInt())
.Build();

// Wrap the schema in a table descriptor; no other options are set here.
auto descriptor = fluss::TableDescriptor::NewBuilder()
.SetSchema(schema)
.Build();

// Create table "events" in database "fluss".
// NOTE(review): the trailing `true` presumably means "ignore if the table
// already exists" — confirm against the Admin::CreateTable signature.
fluss::TablePath table_path("fluss", "events");
admin.CreateTable(table_path, descriptor, true);

Writing to Log Tables

// Open the table through an existing connection `conn`
// (out-parameter style: `table` is populated by the call).
fluss::Table table;
conn.GetTable(table_path, table);

// Create an append-only writer — the only write path for log tables.
fluss::AppendWriter writer;
table.NewAppend().CreateWriter(writer);

// Build one row; field indexes follow the schema's column order.
fluss::GenericRow row;
row.SetInt32(0, 1); // event_id
row.SetString(1, "user_login"); // event_type
row.SetInt64(2, 1704067200000L); // timestamp (epoch millis = 2024-01-01T00:00:00Z)
writer.Append(row);

// Flush pending appends before expecting them to be readable.
writer.Flush();

Reading from Log Tables

// Create a log scanner for row-oriented reads (out-parameter style).
fluss::LogScanner scanner;
table.NewScan().CreateLogScanner(scanner);

// Subscribe to every bucket of the table, starting from offset 0.
auto info = table.GetTableInfo();
for (int b = 0; b < info.num_buckets; ++b) {
scanner.Subscribe(b, 0);
}

// Single poll; blocks up to the timeout waiting for records.
fluss::ScanRecords records;
scanner.Poll(5000, records); // timeout in ms

// Flat iteration: every record from every subscribed bucket.
for (const auto& rec : records) {
std::cout << "event_id=" << rec.row.GetInt32(0)
<< " event_type=" << rec.row.GetString(1)
<< " timestamp=" << rec.row.GetInt64(2)
<< " @ offset=" << rec.offset << std::endl;
}

// Or per-bucket access: enumerate buckets, then read each bucket's slice.
for (const auto& bucket : records.Buckets()) {
auto view = records.Records(bucket);
std::cout << "Bucket " << bucket.bucket_id << ": "
<< view.Size() << " records" << std::endl;
for (const auto& rec : view) {
std::cout << " event_id=" << rec.row.GetInt32(0)
<< " event_type=" << rec.row.GetString(1)
<< " @ offset=" << rec.offset << std::endl;
}
}

Continuous polling:

// Typical consume loop: a short poll timeout keeps the loop responsive,
// so the `running` flag is re-checked at least once per second.
while (running) {
fluss::ScanRecords records;
scanner.Poll(1000, records);
for (const auto& rec : records) {
process(rec);
}
}

Accumulating records across polls:

ScanRecord is a value type — it can be freely copied, stored, and accumulated. The underlying data stays alive via reference counting (zero-copy).

// Gather at least 1000 records across as many polls as needed.
// Copying a ScanRecord only bumps a refcount, so this is cheap.
// NOTE(review): this loops indefinitely if the stream never yields 1000
// records — fine for a demo; add a deadline/poll budget in real code.
std::vector<fluss::ScanRecord> all_records;
while (all_records.size() < 1000) {
fluss::ScanRecords records;
scanner.Poll(1000, records);
for (const auto& rec : records) {
all_records.push_back(rec); // ref-counted, no data copy
}
}
// all_records is valid — each record keeps its data alive

Batch subscribe:

// Subscribe to several buckets in one call; each entry pairs a bucket id
// with the starting offset to read that bucket from.
std::vector<fluss::BucketSubscription> subscriptions;
subscriptions.push_back({0, 0}); // bucket 0, offset 0
subscriptions.push_back({1, 100}); // bucket 1, offset 100
scanner.Subscribe(subscriptions);

Unsubscribe from a bucket:

// Stop receiving records from bucket 1
// (presumably other subscribed buckets keep streaming — verify in the API docs).
scanner.Unsubscribe(1);

Arrow RecordBatch polling (high performance):

#include <arrow/record_batch.h>

// Batch-oriented scanner: yields Arrow RecordBatches instead of single rows.
fluss::LogScanner arrow_scanner;
table.NewScan().CreateRecordBatchLogScanner(arrow_scanner);

// Subscribe to all buckets from offset 0 (reuses `info` from GetTableInfo()).
for (int b = 0; b < info.num_buckets; ++b) {
arrow_scanner.Subscribe(b, 0);
}

// Poll a set of RecordBatches; blocks up to the timeout (ms).
fluss::ArrowRecordBatches batches;
arrow_scanner.PollRecordBatch(5000, batches);

for (size_t i = 0; i < batches.Size(); ++i) {
const auto& batch = batches[i];
// Available() appears to guard against empty/unreadable batches —
// TODO(review): confirm its exact semantics in the client API.
if (batch->Available()) {
auto arrow_batch = batch->GetArrowRecordBatch();
std::cout << "Batch " << i << ": " << arrow_batch->num_rows() << " rows"
<< ", partition_id=" << batch->GetPartitionId()
<< ", bucket_id=" << batch->GetBucketId() << std::endl;
}
}

Column Projection

// Projection restricts a scan to a subset of columns; indexes refer to
// the schema's column order ({0, 2} = event_id and timestamp here).
// Project by column index
fluss::LogScanner projected_scanner;
table.NewScan().ProjectByIndex({0, 2}).CreateLogScanner(projected_scanner);

// Project by column name
fluss::LogScanner name_projected_scanner;
table.NewScan().ProjectByName({"event_id", "timestamp"}).CreateLogScanner(name_projected_scanner);

// Arrow RecordBatch with projection
fluss::LogScanner projected_arrow_scanner;
table.NewScan().ProjectByIndex({0, 2}).CreateRecordBatchLogScanner(projected_arrow_scanner);