Skip to main content

Primary Key Tables

Primary key tables (KV tables) support upsert, delete, and lookup operations.

Creating a Primary Key Table

use fluss::metadata::{DataTypes, Schema, TableDescriptor, TablePath};

let table_descriptor = TableDescriptor::builder()
.schema(
Schema::builder()
.column("id", DataTypes::int())
.column("name", DataTypes::string())
.column("age", DataTypes::bigint())
.primary_key(vec!["id"])
.build()?,
)
.build()?;

let table_path = TablePath::new("fluss", "users");
admin.create_table(&table_path, &table_descriptor, true).await?;

Upserting Records

use fluss::row::{GenericRow, InternalRow};

let table = conn.get_table(&table_path).await?;
let table_upsert = table.new_upsert()?;
let upsert_writer = table_upsert.create_writer()?;

for (id, name, age) in [(1, "Alice", 25i64), (2, "Bob", 30), (3, "Charlie", 35)] {
let mut row = GenericRow::new(3);
row.set_field(0, id);
row.set_field(1, name);
row.set_field(2, age);
upsert_writer.upsert(&row)?;
}
upsert_writer.flush().await?;

Updating Records

Upsert with the same primary key to update an existing record.

let mut row = GenericRow::new(3);
row.set_field(0, 1); // id (primary key)
row.set_field(1, "Alice");
row.set_field(2, 26i64); // updated age

upsert_writer.upsert(&row)?;
upsert_writer.flush().await?;

Deleting Records

// Only primary key field needs to be set
let mut row = GenericRow::new(3);
row.set_field(0, 2); // id of record to delete

upsert_writer.delete(&row)?;
upsert_writer.flush().await?;

Partial Updates

Update only specific columns while preserving others.

// By column indices
let partial_upsert = table_upsert.partial_update(Some(vec![0, 2]))?;
let partial_writer = partial_upsert.create_writer()?;

let mut row = GenericRow::new(3);
row.set_field(0, 1); // id (primary key, required)
row.set_field(2, 27i64); // age (will be updated)
// name will remain unchanged

partial_writer.upsert(&row)?;
partial_writer.flush().await?;

// By column names
let partial_upsert = table_upsert.partial_update_with_column_names(&["id", "age"])?;
let partial_writer = partial_upsert.create_writer()?;

Looking Up Records

let mut lookuper = table.new_lookup()?.create_lookuper()?;

let mut key = GenericRow::new(1);
key.set_field(0, 1); // id to lookup

let result = lookuper.lookup(&key).await?;

if let Some(row) = result.get_single_row()? {
println!(
"Found: id={}, name={}, age={}",
row.get_int(0)?,
row.get_string(1)?,
row.get_long(2)?
);
} else {
println!("Record not found");
}

Looking Up Records as Arrow RecordBatch

Use to_record_batch() to get lookup results in Arrow format, for example when integrating with DataFusion.

let result = lookuper.lookup(&key).await?;
let batch = result.to_record_batch()?;
println!("Rows: {}", batch.num_rows());