Partitioned Tables
Partitioned tables distribute data across partitions based on the values of the partition key columns. A partition must exist before data is written to it; otherwise, the client retries indefinitely by default.
Creating and Managing Partitions
import pyarrow as pa
schema = fluss.Schema(pa.schema([
pa.field("id", pa.int32()),
pa.field("region", pa.string()),
pa.field("value", pa.int64()),
]))
table_path = fluss.TablePath("fluss", "partitioned_events")
await admin.create_table(
table_path,
fluss.TableDescriptor(schema, partition_keys=["region"], bucket_count=1),
ignore_if_exists=True,
)
# Create partitions
await admin.create_partition(table_path, {"region": "US"}, ignore_if_exists=True)
await admin.create_partition(table_path, {"region": "EU"}, ignore_if_exists=True)
# List partitions
partition_infos = await admin.list_partition_infos(table_path)
Writing
Writing works the same way as for non-partitioned tables: include the partition column values in each row. A partition must exist before data is written to it; otherwise, the client retries indefinitely by default.
table = await conn.get_table(table_path)
writer = table.new_append().create_writer()
writer.append({"id": 1, "region": "US", "value": 100})
writer.append({"id": 2, "region": "EU", "value": 200})
await writer.flush()
Reading
Use subscribe_partition() or subscribe_partition_buckets() instead of subscribe():
scanner = await table.new_scan().create_record_batch_log_scanner()
# Subscribe to individual partitions
for p in partition_infos:
scanner.subscribe_partition(partition_id=p.partition_id, bucket_id=0, start_offset=fluss.EARLIEST_OFFSET)
# Or batch-subscribe
scanner.subscribe_partition_buckets({
(p.partition_id, 0): fluss.EARLIEST_OFFSET for p in partition_infos
})
print(scanner.to_pandas())
Unsubscribing
To stop consuming from a specific partition bucket, use unsubscribe_partition():
# Stop consuming bucket 0 of the first listed partition.
first_partition = partition_infos[0]
scanner.unsubscribe_partition(partition_id=first_partition.partition_id, bucket_id=0)
Partitioned Primary Key Tables
Partition columns must be part of the primary key. A partition must exist before data is upserted into it; otherwise, the client retries indefinitely by default.
schema = fluss.Schema(
pa.schema([
pa.field("user_id", pa.int32()),
pa.field("region", pa.string()),
pa.field("score", pa.int64()),
]),
primary_keys=["user_id", "region"],
)
table_path = fluss.TablePath("fluss", "partitioned_users")
await admin.create_table(
table_path,
fluss.TableDescriptor(schema, partition_keys=["region"]),
ignore_if_exists=True,
)
await admin.create_partition(table_path, {"region": "US"}, ignore_if_exists=True)
table = await conn.get_table(table_path)
writer = table.new_upsert().create_writer()
writer.upsert({"user_id": 1, "region": "US", "score": 1234})
await writer.flush()
# Lookup includes partition columns
lookuper = table.new_lookup().create_lookuper()
result = await lookuper.lookup({"user_id": 1, "region": "US"})