Skip to main content

Example

Minimal working examples: connect to Fluss, create a table, write data, and read it back.

use fluss::client::FlussConnection;
use fluss::config::Config;
use fluss::error::Result;
use fluss::metadata::{DataTypes, Schema, TableDescriptor, TablePath};
use fluss::row::{GenericRow, InternalRow};
use std::time::Duration;

#[tokio::main]
async fn main() -> Result<()> {
// Connect
let mut config = Config::default();
config.bootstrap_servers = "127.0.0.1:9123".to_string();
let conn = FlussConnection::new(config).await?;
let admin = conn.get_admin().await?;

// Create a log table
let table_path = TablePath::new("fluss", "quickstart_rust");
let descriptor = TableDescriptor::builder()
.schema(
Schema::builder()
.column("id", DataTypes::int())
.column("name", DataTypes::string())
.build()?,
)
.build()?;
admin.create_table(&table_path, &descriptor, true).await?;

// Write
let table = conn.get_table(&table_path).await?;
let writer = table.new_append()?.create_writer()?;
let mut row = GenericRow::new(2);
row.set_field(0, 1);
row.set_field(1, "hello");
writer.append(&row)?;
writer.flush().await?;

// Read
let scanner = table.new_scan().create_log_scanner()?;
scanner.subscribe(0, 0).await?;
let records = scanner.poll(Duration::from_secs(5)).await?;
for record in records {
let row = record.row();
println!("id={}, name={}", row.get_int(0)?, row.get_string(1)?);
}

Ok(())
}