# Log Tables

Log tables are append-only tables without primary keys, making them well suited to event streaming workloads.
## Creating a Log Table

```rust
use fluss::metadata::{DataTypes, Schema, TableDescriptor, TablePath};

let table_descriptor = TableDescriptor::builder()
    .schema(
        Schema::builder()
            .column("event_id", DataTypes::int())
            .column("event_type", DataTypes::string())
            .column("timestamp", DataTypes::bigint())
            .build()?,
    )
    .build()?;

let table_path = TablePath::new("fluss", "events");

// `true`: do not fail if the table already exists, making the call idempotent
admin.create_table(&table_path, &table_descriptor, true).await?;
```
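The snippet assumes an `admin` handle is already available; as the latest-offset example below shows, one can be obtained from an open connection:

```rust
let admin = conn.get_admin().await?;
```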
## Writing to Log Tables

```rust
use fluss::row::{GenericRow, InternalRow};

let table = conn.get_table(&table_path).await?;
let append_writer = table.new_append()?.create_writer()?;

// Build a row matching the three-column schema above
let mut row = GenericRow::new(3);
row.set_field(0, 1); // event_id
row.set_field(1, "user_login"); // event_type
row.set_field(2, 1704067200000i64); // timestamp

append_writer.append(&row)?;
append_writer.flush().await?;
```
Write operations use a fire-and-forget pattern for efficient batching: each `append()` call queues the write and returns a `WriteResultFuture` immediately. Call `flush()` to ensure all queued writes are sent to the server.
For per-record acknowledgment, await the returned future:

```rust
append_writer.append(&row)?.await?;
```
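As a sketch of the batching pattern, using only the calls shown above (the `"page_view"` event type and loop bound are illustrative): queue many appends and flush once at the end, rather than awaiting each record individually.

```rust
// Sketch: fire-and-forget batching. Each append() is queued immediately;
// the single flush() at the end waits for all queued writes to be sent.
for i in 0..1000 {
    let mut row = GenericRow::new(3);
    row.set_field(0, i); // event_id
    row.set_field(1, "page_view"); // event_type
    row.set_field(2, 1704067200000i64 + i as i64); // timestamp
    append_writer.append(&row)?;
}
append_writer.flush().await?;
```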
## Reading from Log Tables

```rust
use std::time::Duration;

let table = conn.get_table(&table_path).await?;
let log_scanner = table.new_scan().create_log_scanner()?;

// Subscribe to bucket 0 starting from offset 0
log_scanner.subscribe(0, 0).await?;

// Poll for records
let records = log_scanner.poll(Duration::from_secs(10)).await?;

// Per-bucket access
for (bucket, bucket_records) in records.records_by_buckets() {
    println!("Bucket {}: {} records", bucket.bucket_id(), bucket_records.len());
    for record in bucket_records {
        let row = record.row();
        println!(
            "  event_id={}, event_type={} @ offset={}",
            row.get_int(0)?,
            row.get_string(1)?,
            record.offset()
        );
    }
}

// Or flat iteration (consumes ScanRecords)
for record in records {
    let row = record.row();
    println!(
        "event_id={}, event_type={}, timestamp={} @ offset={}",
        row.get_int(0)?,
        row.get_string(1)?,
        row.get_long(2)?,
        record.offset()
    );
}
```
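A minimal continuous-consumption sketch, built from the same calls: poll in a loop and process each batch as it arrives.

```rust
// Sketch: poll indefinitely, processing each batch as it arrives.
// An empty batch simply means no new records arrived within the timeout.
loop {
    let records = log_scanner.poll(Duration::from_secs(10)).await?;
    for record in records {
        let row = record.row();
        println!("event_id={} @ offset={}", row.get_int(0)?, record.offset());
    }
}
```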
Subscribe from special offsets:

```rust
use fluss::client::EARLIEST_OFFSET;

log_scanner.subscribe(0, EARLIEST_OFFSET).await?; // from earliest
log_scanner.subscribe(0, 42).await?; // from specific offset
```
Subscribe from the latest offset (only new records). To read only records appended from now on, first resolve the current latest offset via `list_offsets`, then subscribe at that offset:

```rust
use fluss::rpc::message::OffsetSpec;

let admin = conn.get_admin().await?;
let offsets = admin.list_offsets(&table_path, &[0], OffsetSpec::Latest).await?;
let latest = offsets[&0];
log_scanner.subscribe(0, latest).await?;
```
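Note that the offset is resolved before subscribing, so any records appended between the `list_offsets` call and the `subscribe` call are still delivered; only records written before the offset was resolved are skipped.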
Subscribe to all buckets:

```rust
let num_buckets = table.get_table_info().get_num_buckets();
for bucket_id in 0..num_buckets {
    log_scanner.subscribe(bucket_id, 0).await?;
}
```
Subscribe to multiple buckets at once:

```rust
use std::collections::HashMap;

let mut bucket_offsets = HashMap::new();
bucket_offsets.insert(0, 0i64); // bucket 0 from offset 0
bucket_offsets.insert(1, 100i64); // bucket 1 from offset 100
log_scanner.subscribe_buckets(&bucket_offsets).await?;
```
Unsubscribe from a bucket:

```rust
// Non-partitioned tables
log_scanner.unsubscribe(bucket_id).await?;

// Partitioned tables
log_scanner.unsubscribe_partition(partition_id, bucket_id).await?;
```
## Column Projection

```rust
// Project by column index
let scanner = table.new_scan().project(&[0, 2])?.create_log_scanner()?;

// Project by column name
let scanner = table.new_scan()
    .project_by_name(&["event_id", "timestamp"])?
    .create_log_scanner()?;
```
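When reading through a projected scanner, field indices are assumed here to refer to the projected schema rather than the full table schema (worth verifying against your client version). A sketch, given the `events` table and the `[event_id, timestamp]` projection above:

```rust
// Sketch: read via the projected scanner. Index 0 is event_id and index 1 is
// timestamp here, on the assumption that indices follow the projected schema.
scanner.subscribe(0, EARLIEST_OFFSET).await?;
let records = scanner.poll(Duration::from_secs(10)).await?;
for record in records {
    let row = record.row();
    println!(
        "event_id={}, timestamp={} @ offset={}",
        row.get_int(0)?,
        row.get_long(1)?,
        record.offset()
    );
}
```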