Log Tables
Log tables are append-only tables without primary keys, suitable for event streaming.
Creating a Log Table
// Describe the table: three columns and no primary key, which makes it a
// log (append-only) table.
auto events_schema = fluss::Schema::NewBuilder()
                         .AddColumn("event_id", fluss::DataType::Int())
                         .AddColumn("event_type", fluss::DataType::String())
                         .AddColumn("timestamp", fluss::DataType::BigInt())
                         .Build();

// Wrap the schema in a table descriptor.
auto events_descriptor = fluss::TableDescriptor::NewBuilder()
                             .SetSchema(events_schema)
                             .Build();

// Create table "events" in database "fluss".
// NOTE(review): the trailing `true` presumably means ignore-if-exists —
// confirm against the Admin::CreateTable API docs.
fluss::TablePath table_path("fluss", "events");
admin.CreateTable(table_path, events_descriptor, true);
Writing to Log Tables
// Open the table and create an append-only writer for it.
fluss::Table table;
conn.GetTable(table_path, table);

fluss::AppendWriter writer;
table.NewAppend().CreateWriter(writer);

// Build one row matching the schema: (event_id INT, event_type STRING,
// timestamp BIGINT).
fluss::GenericRow row;
row.SetInt32(0, 1);              // event_id
row.SetString(1, "user_login");  // event_type
// Use the LL suffix so the literal is explicitly 64-bit. A plain L suffix
// yields a 32-bit `long` on LLP64 platforms (e.g. Windows) and only works
// here via implicit promotion of an out-of-range decimal literal.
row.SetInt64(2, 1704067200000LL);  // timestamp (epoch millis)

writer.Append(row);
writer.Flush();  // push buffered appends to the server
Reading from Log Tables
fluss::LogScanner scanner;
table.NewScan().CreateLogScanner(scanner);
auto info = table.GetTableInfo();
for (int b = 0; b < info.num_buckets; ++b) {
scanner.Subscribe(b, 0);
}
fluss::ScanRecords records;
scanner.Poll(5000, records); // timeout in ms
for (const auto& rec : records) {
std::cout << "event_id=" << rec.row.GetInt32(0)
<< " event_type=" << rec.row.GetString(1)
<< " timestamp=" << rec.row.GetInt64(2)
<< " @ offset=" << rec.offset << std::endl;
}
// Or per-bucket access
for (const auto& bucket : records.Buckets()) {
auto view = records.Records(bucket);
std::cout << "Bucket " << bucket.bucket_id << ": "
<< view.Size() << " records" << std::endl;
for (const auto& rec : view) {
std::cout << " event_id=" << rec.row.GetInt32(0)
<< " event_type=" << rec.row.GetString(1)
<< " @ offset=" << rec.offset << std::endl;
}
}
Continuous polling:
// Drive the scanner in a loop: each Poll hands back whatever records
// arrived within the timeout window (here 1000 ms), possibly none.
while (running) {
  fluss::ScanRecords batch;
  scanner.Poll(1000, batch);
  for (const auto& record : batch) {
    process(record);
  }
}
Accumulating records across polls:
ScanRecord is a value type — it can be freely copied, stored, and accumulated. The underlying data stays alive via reference counting (zero-copy).
// Gather at least 1000 records across multiple polls. Copying a
// ScanRecord only bumps a reference count on its backing buffer, so the
// accumulated records stay valid without duplicating the data.
std::vector<fluss::ScanRecord> all_records;
while (all_records.size() < 1000) {
  fluss::ScanRecords batch;
  scanner.Poll(1000, batch);
  for (const auto& record : batch) {
    all_records.push_back(record);  // refcount bump only, no data copy
  }
}
// Every element of all_records remains usable after the loop.
Batch subscribe:
std::vector<fluss::BucketSubscription> subscriptions;
subscriptions.push_back({0, 0}); // bucket 0, offset 0
subscriptions.push_back({1, 100}); // bucket 1, offset 100
scanner.Subscribe(subscriptions);
Unsubscribe from a bucket:
// Stop receiving records from bucket 1; subscriptions to other buckets
// are presumably unaffected — confirm against the LogScanner API docs.
scanner.Unsubscribe(1);
Arrow RecordBatch polling (high performance):
#include <arrow/record_batch.h>

// Scanner variant that surfaces data as whole Arrow RecordBatches
// instead of individual rows (the high-performance path).
fluss::LogScanner batch_scanner;
table.NewScan().CreateRecordBatchLogScanner(batch_scanner);
for (int bucket_id = 0; bucket_id < info.num_buckets; ++bucket_id) {
  batch_scanner.Subscribe(bucket_id, 0);
}

fluss::ArrowRecordBatches batches;
batch_scanner.PollRecordBatch(5000, batches);
for (size_t idx = 0; idx != batches.Size(); ++idx) {
  const auto& entry = batches[idx];
  if (!entry->Available()) {
    continue;  // skip entries with no batch to read
  }
  auto arrow_batch = entry->GetArrowRecordBatch();
  std::cout << "Batch " << idx << ": " << arrow_batch->num_rows() << " rows"
            << ", partition_id=" << entry->GetPartitionId()
            << ", bucket_id=" << entry->GetBucketId() << std::endl;
}
Column Projection
// Restrict a scan to a subset of columns.

// By column index: keep columns 0 and 2 (event_id, timestamp).
fluss::LogScanner index_projected;
table.NewScan().ProjectByIndex({0, 2}).CreateLogScanner(index_projected);

// By column name — selects the same two columns as above.
fluss::LogScanner name_projected;
table.NewScan()
    .ProjectByName({"event_id", "timestamp"})
    .CreateLogScanner(name_projected);

// Projection composes with the Arrow RecordBatch scanner as well.
fluss::LogScanner arrow_projected;
table.NewScan()
    .ProjectByIndex({0, 2})
    .CreateRecordBatchLogScanner(arrow_projected);