Primary Key Tables
Primary key tables support upsert, delete, and point lookup operations.
Creating a Primary Key Table
To create a primary key table, pass primary_keys (the list of key column names) to fluss.Schema:
import pyarrow as pa
schema = fluss.Schema(
pa.schema([
pa.field("id", pa.int32()),
pa.field("name", pa.string()),
pa.field("age", pa.int64()),
]),
primary_keys=["id"],
)
table_path = fluss.TablePath("fluss", "users")
await admin.create_table(table_path, fluss.TableDescriptor(schema, bucket_count=3), ignore_if_exists=True)
Upsert, Delete, Lookup
table = await conn.get_table(table_path)
# Upsert (fire-and-forget, flush at the end)
writer = table.new_upsert().create_writer()
writer.upsert({"id": 1, "name": "Alice", "age": 25})
writer.upsert({"id": 2, "name": "Bob", "age": 30})
await writer.flush()
# Per-record acknowledgment (for read-after-write)
handle = writer.upsert({"id": 3, "name": "Charlie", "age": 35})
await handle.wait()
# Delete by primary key
handle = writer.delete({"id": 2})
await handle.wait()
# Lookup
lookuper = table.new_lookup().create_lookuper()
result = await lookuper.lookup({"id": 1})
if result:
print(f"Found: name={result['name']}, age={result['age']}")
Partial Updates
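Update only specific columns of a row while preserving the values of the others. Note that the column list in the example below includes the primary key column (id) along with the column being updated (age):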
partial_writer = table.new_upsert().partial_update_by_name(["id", "age"]).create_writer()
partial_writer.upsert({"id": 1, "age": 27}) # only updates age
await partial_writer.flush()