Primary Key Tables
Primary key tables (KV tables) support upsert, delete, and lookup operations.
Creating a Primary Key Table
use fluss::metadata::{DataTypes, Schema, TableDescriptor, TablePath};
let table_descriptor = TableDescriptor::builder()
.schema(
Schema::builder()
.column("id", DataTypes::int())
.column("name", DataTypes::string())
.column("age", DataTypes::bigint())
.primary_key(vec!["id"])
.build()?,
)
.build()?;
let table_path = TablePath::new("fluss", "users");
admin.create_table(&table_path, &table_descriptor, true).await?;
Upserting Records
use fluss::row::{GenericRow, InternalRow};
let table = conn.get_table(&table_path).await?;
let table_upsert = table.new_upsert()?;
let upsert_writer = table_upsert.create_writer()?;
for (id, name, age) in [(1, "Alice", 25i64), (2, "Bob", 30), (3, "Charlie", 35)] {
let mut row = GenericRow::new(3);
row.set_field(0, id);
row.set_field(1, name);
row.set_field(2, age);
upsert_writer.upsert(&row)?;
}
upsert_writer.flush().await?;
Updating Records
Upsert with the same primary key to update an existing record.
let mut row = GenericRow::new(3);
row.set_field(0, 1); // id (primary key)
row.set_field(1, "Alice");
row.set_field(2, 26i64); // updated age
upsert_writer.upsert(&row)?;
upsert_writer.flush().await?;
Deleting Records
// Only primary key field needs to be set
let mut row = GenericRow::new(3);
row.set_field(0, 2); // id of record to delete
upsert_writer.delete(&row)?;
upsert_writer.flush().await?;
Partial Updates
Update only specific columns while preserving others.
// By column indices
let partial_upsert = table_upsert.partial_update(Some(vec![0, 2]))?;
let partial_writer = partial_upsert.create_writer()?;
let mut row = GenericRow::new(3);
row.set_field(0, 1); // id (primary key, required)
row.set_field(2, 27i64); // age (will be updated)
// name will remain unchanged
partial_writer.upsert(&row)?;
partial_writer.flush().await?;
// By column names
let partial_upsert = table_upsert.partial_update_with_column_names(&["id", "age"])?;
let partial_writer = partial_upsert.create_writer()?;
Looking Up Records
let mut lookuper = table.new_lookup()?.create_lookuper()?;
let mut key = GenericRow::new(1);
key.set_field(0, 1); // id to lookup
let result = lookuper.lookup(&key).await?;
if let Some(row) = result.get_single_row()? {
println!(
"Found: id={}, name={}, age={}",
row.get_int(0)?,
row.get_string(1)?,
row.get_long(2)?
);
} else {
println!("Record not found");
}
Looking Up Records as Arrow RecordBatch
Use to_record_batch() to get lookup results in Arrow format, for example when integrating with DataFusion.
let result = lookuper.lookup(&key).await?;
let batch = result.to_record_batch()?;
println!("Rows: {}", batch.num_rows());