Skip to main content

Admin Operations

Get Admin Interface

fluss::Admin admin;
conn.GetAdmin(admin);

Database Operations

// Create database
fluss::DatabaseDescriptor db_descriptor;
db_descriptor.comment = "My database";
admin.CreateDatabase("my_database", db_descriptor, true);

// List all databases
std::vector<std::string> databases;
admin.ListDatabases(databases);
for (const auto& db : databases) {
std::cout << "Database: " << db << std::endl;
}

// Check if database exists
bool exists = false;
admin.DatabaseExists("my_database", exists);

// Get database information
fluss::DatabaseInfo db_info;
admin.GetDatabaseInfo("my_database", db_info);
std::cout << "Database: " << db_info.database_name << std::endl;

// Drop database
admin.DropDatabase("my_database", true, false);

Table Operations

fluss::TablePath table_path("fluss", "my_table");

auto schema = fluss::Schema::NewBuilder()
.AddColumn("id", fluss::DataType::Int())
.AddColumn("name", fluss::DataType::String())
.AddColumn("score", fluss::DataType::Float())
.AddColumn("age", fluss::DataType::Int())
.Build();

auto descriptor = fluss::TableDescriptor::NewBuilder()
.SetSchema(schema)
.SetBucketCount(3)
.SetComment("Example table")
.Build();

// Create table
admin.CreateTable(table_path, descriptor, true);

// Get table information
fluss::TableInfo table_info;
admin.GetTableInfo(table_path, table_info);
std::cout << "Table ID: " << table_info.table_id << std::endl;
std::cout << "Number of buckets: " << table_info.num_buckets << std::endl;
std::cout << "Has primary key: " << table_info.has_primary_key << std::endl;
std::cout << "Is partitioned: " << table_info.is_partitioned << std::endl;

// Drop table
admin.DropTable(table_path, true);

Schema Builder Options

// Schema with primary key
auto pk_schema = fluss::Schema::NewBuilder()
.AddColumn("id", fluss::DataType::Int())
.AddColumn("name", fluss::DataType::String())
.AddColumn("value", fluss::DataType::Double())
.SetPrimaryKeys({"id"})
.Build();

// Table descriptor with partitioning
auto descriptor = fluss::TableDescriptor::NewBuilder()
.SetSchema(schema)
.SetPartitionKeys({"date"})
.SetBucketCount(3)
.SetBucketKeys({"user_id"})
.SetProperty("retention_days", "7")
.SetComment("Sample table")
.Build();

Partition Operations

// Create a partition
std::unordered_map<std::string, std::string> partition_spec = {{"region", "US"}};
admin.CreatePartition(table_path, partition_spec, true);

// List all partitions
std::vector<fluss::PartitionInfo> partitions;
admin.ListPartitionInfos(table_path, partitions);
for (const auto& p : partitions) {
std::cout << "Partition: id=" << p.partition_id
<< ", name=" << p.partition_name << std::endl;
}

// Drop a partition
admin.DropPartition(table_path, partition_spec, true);

Offset Operations

std::vector<int32_t> bucket_ids = {0, 1, 2};

// Query earliest offsets
std::unordered_map<int32_t, int64_t> earliest_offsets;
admin.ListOffsets(table_path, bucket_ids,
fluss::OffsetSpec::Earliest(), earliest_offsets);

// Query latest offsets
std::unordered_map<int32_t, int64_t> latest_offsets;
admin.ListOffsets(table_path, bucket_ids,
fluss::OffsetSpec::Latest(), latest_offsets);

// Query offsets for a specific timestamp
std::unordered_map<int32_t, int64_t> timestamp_offsets;
admin.ListOffsets(table_path, bucket_ids,
fluss::OffsetSpec::Timestamp(timestamp_ms),
timestamp_offsets);

// Query partition offsets
std::unordered_map<int32_t, int64_t> partition_offsets;
admin.ListPartitionOffsets(table_path, "partition_name",
bucket_ids, fluss::OffsetSpec::Latest(),
partition_offsets);

Lake Snapshot

note

Lake snapshots require lake integration (e.g. Paimon or Iceberg) to be enabled on the server. Without it, GetLatestLakeSnapshot will return an error.

fluss::LakeSnapshot snapshot;
admin.GetLatestLakeSnapshot(table_path, snapshot);
std::cout << "Snapshot ID: " << snapshot.snapshot_id << std::endl;
for (const auto& bucket_offset : snapshot.bucket_offsets) {
std::cout << " Table " << bucket_offset.table_id
<< ", Bucket " << bucket_offset.bucket_id
<< ": offset=" << bucket_offset.offset << std::endl;
}