Skip to content

[rust] Add Fluss 1.x protocol support to the admin client#631

Merged
fresh-borzoni merged 6 commits into
apache:mainfrom
gnuhpc:pr/4-admin-methods
Jun 26, 2026
Merged

[rust] Add Fluss 1.x protocol support to the admin client#631
fresh-borzoni merged 6 commits into
apache:mainfrom
gnuhpc:pr/4-admin-methods

Conversation

@gnuhpc

@gnuhpc gnuhpc commented Jun 19, 2026

Copy link
Copy Markdown
Contributor

What

Adds 27 new admin methods to FlussAdmin, consuming the message wrappers from #629/#630:

  • Database/table extensions: list_database_summaries, alter_database, alter_table, get_table_stats
  • KV snapshot operations: get_latest_kv_snapshots, get_kv_snapshot_metadata, create_kv_snapshot_lease, get_lake_snapshot
  • ACL management: create_acls, list_acls, drop_acls
  • Cluster configuration: describe_cluster_configs, alter_cluster_configs
  • Server management: add_server_tag, remove_server_tag, rebalance, list_rebalance_progress, cancel_rebalance
  • Producer offsets: register_producer_offsets, get_producer_offsets, delete_producer_offsets
  • Monitoring: get_cluster_health, list_remote_log_manifests
  • KV snapshots: list_kv_snapshots, release_kv_snapshot_lease, drop_kv_snapshot_lease

(The proto-compat fix for scanner.rs / admin.rs lives in #628 so that PR leaves the tree building.)

Stack

Part 4/6, stacked on #630#629#628. All target main.

🤖 Generated with Claude Code

@gnuhpc

gnuhpc commented Jun 19, 2026

Copy link
Copy Markdown
Contributor Author

Rebased onto updated #630 and applied the same proto::*/Default cleanup the maintainer requested on #629 (#629 (review)) to every signature this PR adds. All 21 admin methods that previously leaked proto::Pb* in args or returns now take/return domain types. use crate::proto; is gone from admin.rs; only the pre-existing use crate::proto::GetTableInfoResponse; (from main) remains.

Response-side domain types added under crates/fluss/src/metadata/:

  • cluster_health.rsClusterHealth
  • kv_snapshot.rsKvSnapshot, LatestKvSnapshots, RemotePathAndLocalFile, KvSnapshotMetadata, AcquireKvSnapshotLeaseResult, ActiveKvSnapshots
  • lake_snapshot.rsLakeBucketSnapshot, LakeSnapshotInfo (distinct from the existing LakeSnapshot used by get_latest_lake_snapshot)
  • rebalance.rsRebalanceProgress, TableRebalanceProgress, BucketRebalanceProgress, BucketRebalancePlan
  • remote_log.rsRemoteLogManifestEntry
  • Extended config.rs (DescribeConfig), database.rs (DatabaseSummary), table_stats.rs (BucketStats/TableStats/BucketStatsError), acl.rs (CreateAclResult/DropAclsFilterResult/DropAclMatchingAcl), producer_offsets.rs (ProducerOffsets)

For trivially-shaped responses (single field), methods now return the primitive directly instead of wrapping — e.g. rebalance returns String (the rebalance id), register_producer_offsets returns Option<i32>, list_acls returns Vec<AclInfo>, list_remote_log_manifests returns Vec<RemoteLogManifestEntry>.

alter_table parameter fix (same drop-silently-features class as target_columns in #629): previously took only config_changes + add_columns and hardcoded vec![],vec![],vec![] for drop/rename/modify columns. Now takes an AlterTableChanges bundle struct (avoids clippy::too_many_arguments) exposing all 5 change types, plus ignore_if_not_exists is now a parameter instead of hardcoded false.

#[derive(Default)] not present anywhere in this PR (admin.rs has none); the wrappers consumed are already cleaned up on #629/#630.

cargo build + clippy (-D warnings) + 556 unit tests pass locally. Pushed 29a4d0b.

@gnuhpc gnuhpc force-pushed the pr/4-admin-methods branch from 29a4d0b to 9fbe919 Compare June 20, 2026 17:12
@gnuhpc

gnuhpc commented Jun 20, 2026

Copy link
Copy Markdown
Contributor Author

Rebased on top of updated #630. Same pub(crate) visibility pattern applied. All admin signatures use domain types (only the pre-existing use crate::proto::GetTableInfoResponse; import remains). alter_table now exposes drop/rename/modify column changes via an AlterTableChanges bundle struct.

Verified against real Java Fluss 0.9.1-incubating (68/68 integration tests pass) and Fluss 1.0-SNAPSHOT-dev (73/73 with fluss_v1 feature).

@fresh-borzoni fresh-borzoni left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@gnuhpc thank you for the PR, left some comments, PTAL

Comment thread crates/fluss/src/client/admin.rs Outdated
let response = self
.admin_gateway()
.await?
.request(GetLakeSnapshotRequest::new(table_path, snapshot_id, None))

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do we hardcode readable to None?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done. Exposed as a parameter in get_lake_snapshot(table_path, snapshot_id, readable).

pub in_sync_replicas: i32,
pub num_leader_replicas: i32,
pub active_leader_replicas: i32,
pub status: i32,

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

enum?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done. Added ClusterHealthStatus enum (Green/Yellow/Red/Unknown). from_pb now returns Result since the conversion can fail on unknown values.

Comment thread crates/fluss/src/metadata/rebalance.rs Outdated
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BucketRebalanceProgress {
pub rebalance_plan: BucketRebalancePlan,
pub rebalance_status: i32,

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

enum?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done. Added RebalanceStatus enum (NotStarted/Rebalancing/Failed/Completed/Canceled/Timeout). from_pb returns Result with collect::<Result<Vec<_>>>()? for the nested bucket progress.

Comment thread crates/fluss/src/metadata/rebalance.rs Outdated
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RebalanceProgress {
pub rebalance_id: Option<String>,
pub rebalance_status: Option<i32>,

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done. Same enum applied here as Option<RebalanceStatus>.

/// Per-bucket KV snapshot info.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct KvSnapshot {
pub bucket_id: i32,

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

these use raw i64/i32 for table_id/partition_id/bucket_id, but the crate has TableId/PartitionId/BucketId aliases (lib.rs) that the rest of the code use. Let's stay consistent

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done. Applied TableId/PartitionId/BucketId across all metadata types: kv_snapshot, lake_snapshot, producer_offsets, rebalance, table_stats, and kv_snapshot_lease.

gnuhpc pushed a commit to gnuhpc/fluss-rust that referenced this pull request Jun 22, 2026
…dable

- Add ClusterHealthStatus enum (Green/Yellow/Red/Unknown) to cluster_health.rs
- Add RebalanceStatus enum (NotStarted..Timeout) to rebalance.rs
- Replace raw i32/i64 with BucketId/TableId/PartitionId aliases in
  kv_snapshot.rs, lake_snapshot.rs, rebalance.rs, producer_offsets.rs
- Expose readable parameter in admin.get_lake_snapshot() instead of
  hardcoding None

Addresses reviewer feedback on PR apache#631.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@gnuhpc gnuhpc force-pushed the pr/4-admin-methods branch from 9fbe919 to 5e7f29c Compare June 22, 2026 02:50
gnuhpc pushed a commit to gnuhpc/fluss-rust that referenced this pull request Jun 24, 2026
…dable

- Add ClusterHealthStatus enum (Green/Yellow/Red/Unknown) to cluster_health.rs
- Add RebalanceStatus enum (NotStarted..Timeout) to rebalance.rs
- Replace raw i32/i64 with BucketId/TableId/PartitionId aliases in
  kv_snapshot.rs, lake_snapshot.rs, rebalance.rs, producer_offsets.rs
- Expose readable parameter in admin.get_lake_snapshot() instead of
  hardcoding None

Addresses reviewer feedback on PR apache#631.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@gnuhpc gnuhpc force-pushed the pr/4-admin-methods branch from 5e7f29c to 3537c7d Compare June 24, 2026 01:55
@fresh-borzoni

Copy link
Copy Markdown
Member

@gnuhpc Thank you, can you, please, rebase/cherry-pick/resolve conflicts, so it's comfortable to continue review?

warmbupt and others added 5 commits June 25, 2026 07:31
Add 27 new admin methods to FlussAdmin:
- Database/table extensions: list_database_summaries, alter_database,
  alter_table, get_table_stats
- KV snapshot operations: get_latest_kv_snapshots,
  get_kv_snapshot_metadata, create_kv_snapshot_lease, get_lake_snapshot
- ACL management: create_acls, list_acls, drop_acls
- Cluster configuration: describe_cluster_configs, alter_cluster_configs
- Server management: add_server_tag, remove_server_tag, rebalance,
  list_rebalance_progress, cancel_rebalance
- Producer offsets: register_producer_offsets, get_producer_offsets,
  delete_producer_offsets
- Monitoring: get_cluster_health, list_remote_log_manifests
- KV snapshots: list_kv_snapshots, release_kv_snapshot_lease,
  drop_kv_snapshot_lease
…dable

- Add ClusterHealthStatus enum (Green/Yellow/Red/Unknown) to cluster_health.rs
- Add RebalanceStatus enum (NotStarted..Timeout) to rebalance.rs
- Replace raw i32/i64 with BucketId/TableId/PartitionId aliases in
  kv_snapshot.rs, lake_snapshot.rs, rebalance.rs, producer_offsets.rs
- Expose readable parameter in admin.get_lake_snapshot() instead of
  hardcoding None

Addresses reviewer feedback on PR apache#631.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…hot_lease)

Extends the BucketId/TableId/PartitionId alias consistency fix to
table_stats.rs (BucketStatsRequest, BucketStats) and
kv_snapshot_lease.rs (KvSnapshotLeaseForBucket, KvSnapshotLeaseForTable).

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Now that pr/3 provides GoalType and ServerTag enums in the RPC
wrappers, update the admin client methods to use them in their
public signatures too.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Update the new admin method signatures introduced by this PR to use
the i64/i32 type aliases from lib.rs instead of raw primitives, matching
the underlying RPC message wrappers.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
@gnuhpc gnuhpc force-pushed the pr/4-admin-methods branch from 3537c7d to 6c52a1f Compare June 24, 2026 23:34
@gnuhpc

gnuhpc commented Jun 24, 2026

Copy link
Copy Markdown
Contributor Author

@fresh-borzoni Rebased onto main, conflicts resolved. Stack is clean now.

@fresh-borzoni fresh-borzoni left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@gnuhpc Thank you for the changes, left some comments, PTAL

Comment thread crates/fluss/src/client/admin.rs Outdated
producer_id: &str,
table_offsets: Vec<ProducerTableOffsets>,
ttl_ms: Option<i64>,
) -> Result<Option<i32>> {

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Java's registerProducerOffsets returns a RegisterResult enum (fromCode, throws on unknown).
Could we port that as an enum + try_from_i32 like RebalanceStatus/ClusterHealthStatus do

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done — added RegisterProducerResult { Created, AlreadyExists } in metadata/, with to_i32/try_from_i32 matching the other enums. register_producer_offsets now returns Option<RegisterProducerResult> (Option kept because the proto field is optional).

Comment thread crates/fluss/src/client/admin.rs Outdated
pub async fn get_table_stats(
&self,
table_id: TableId,
buckets_req: Vec<crate::metadata::BucketStatsRequest>,

@fresh-borzoni fresh-borzoni Jun 25, 2026

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do we have FQ paths here and in some other parts(AlterConfig, PbDatabaseSummary) as well?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done — BucketStatsRequest added to the import block here; also cleaned up the FQ for PbDatabaseSummary in metadata/database.rs and the self-referencing crate::metadata::AlterConfig in metadata/table_change.rs.

Comment thread crates/fluss/src/client/admin.rs Outdated
}

/// Create ACLs. Returns one result per submitted ACL (success or per-ACL error).
pub async fn create_acls(&self, acl: Vec<AclInfo>) -> Result<Vec<CreateAclResult>> {

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: why acl, if we pass vector, should it be plural?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done — renamed to acls.

Comment thread crates/fluss/src/client/admin.rs Outdated
}

/// Drop ACLs matching filters. Returns one result per submitted filter.
pub async fn drop_acls(&self, acl_filter: Vec<AclFilter>) -> Result<Vec<DropAclsFilterResult>> {

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto: plural

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done — renamed to acl_filters.

@fresh-borzoni fresh-borzoni left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@gnuhpc Also this

Comment thread crates/fluss/src/client/admin.rs Outdated
producer_id: &str,
table_offsets: Vec<ProducerTableOffsets>,
ttl_ms: Option<i64>,
) -> Result<Option<i32>> {

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

enum as well?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done — same enum applies; see reply on the other thread.

}

/// Alter a database's configuration.
pub async fn alter_database(

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why did we drop comment param?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch — restored. Added comment: Option<&str> to both alter_database and the AlterDatabaseRequest wrapper, passing through to the proto field that was being hardcoded to None.

- Add `RegisterProducerResult` enum mirroring Java's `RegisterResult`
  (Created=0, AlreadyExists=1); `register_producer_offsets` now returns
  `Option<RegisterProducerResult>` instead of raw `Option<i32>`.
- Expose `comment: Option<&str>` on `alter_database` and the
  `AlterDatabaseRequest` wrapper (was hardcoded to `None`).
- Replace fully-qualified paths with imports: `BucketStatsRequest` in
  admin.rs, `PbDatabaseSummary` in metadata/database.rs, and
  `AlterConfig` self-reference in metadata/table_change.rs.
- Rename `create_acls(acl)` to `create_acls(acls)` and
  `drop_acls(acl_filter)` to `drop_acls(acl_filters)` for plural
  consistency with the `Vec` argument type.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
@gnuhpc

gnuhpc commented Jun 26, 2026

Copy link
Copy Markdown
Contributor Author

@fresh-borzoni All 5 round-2 items addressed in 828f9fe — new RegisterProducerResult enum, comment exposed on alter_database, FQ paths cleaned in 3 spots, plural ACL param names. pr/5/6 rebased to follow.

@fresh-borzoni fresh-borzoni left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@gnuhpc Thank you for the changes, LGTM 👍

@fresh-borzoni fresh-borzoni merged commit 0dae316 into apache:main Jun 26, 2026
11 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants