Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ require (
github.com/confluentinc/ccloud-sdk-go-v2/sso v0.0.1
github.com/confluentinc/ccloud-sdk-go-v2/tableflow v0.5.0
github.com/confluentinc/ccloud-sdk-go-v2/usm v0.1.0
github.com/confluentinc/cmf-sdk-go v0.0.5
github.com/confluentinc/cmf-sdk-go v0.0.6
github.com/confluentinc/confluent-kafka-go/v2 v2.13.0
github.com/confluentinc/go-editor v0.11.0
github.com/confluentinc/go-prompt v0.2.40
Expand All @@ -72,6 +72,7 @@ require (
github.com/go-jose/go-jose/v3 v3.0.5
github.com/gobuffalo/flect v1.0.2
github.com/gogo/protobuf v1.3.2
github.com/golang/mock v1.6.0
Copy link

Copilot AI Apr 14, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The repo already depends on go.uber.org/mock (and many tests import go.uber.org/mock/gomock), but this PR adds github.com/golang/mock and regenerates several mocks against that package. Mixing these two gomock packages will break compilation because the Controller/Call types are not interchangeable. Please standardize on one gomock implementation (either switch all tests + remaining mocks to github.com/golang/mock/gomock, or revert these regenerated mocks and the new dependency back to go.uber.org/mock/gomock) and regenerate consistently.

Suggested change
github.com/golang/mock v1.6.0

Copilot uses AI. Check for mistakes.
github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510
github.com/google/uuid v1.6.0
github.com/gorilla/mux v1.8.1
Expand Down Expand Up @@ -184,7 +185,6 @@ require (
github.com/gogo/googleapis v1.4.1 // indirect
github.com/golang-jwt/jwt/v5 v5.2.2 // indirect
github.com/golang/groupcache v0.0.0-20241129210726-2c02b8208cf8 // indirect
github.com/golang/mock v1.6.0 // indirect
github.com/golang/protobuf v1.5.4 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/google/cel-go v0.20.1 // indirect
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -262,8 +262,8 @@ github.com/confluentinc/ccloud-sdk-go-v2/tableflow v0.5.0 h1:Wh3+AsUCncoxRPfs0zC
github.com/confluentinc/ccloud-sdk-go-v2/tableflow v0.5.0/go.mod h1:unZupel8OU3/o8MRcL9YiJo+56MalsCtHHCc/ZNi0BI=
github.com/confluentinc/ccloud-sdk-go-v2/usm v0.1.0 h1:rF9cKecDCowq+oDWjf8rSpXXZHAnVXowIsT/OXF4MOI=
github.com/confluentinc/ccloud-sdk-go-v2/usm v0.1.0/go.mod h1:umhEDvQp/5h0ALKBpYTQOmFwaWrvilnbE8Rkzh6oJ4Q=
github.com/confluentinc/cmf-sdk-go v0.0.5 h1:TS6S3ClVsM1kanB00mlcmqXczozDTO2t4Du5blDSYvE=
github.com/confluentinc/cmf-sdk-go v0.0.5/go.mod h1:xY6OWDUc4UtKMgmade99v1C8/YovgWTl8tzeZ23LrEg=
github.com/confluentinc/cmf-sdk-go v0.0.6 h1:3BFyPOJb4xBAvBMU1hXSh9+2kn/U2zr4EKDoWf8QM74=
github.com/confluentinc/cmf-sdk-go v0.0.6/go.mod h1:xY6OWDUc4UtKMgmade99v1C8/YovgWTl8tzeZ23LrEg=
github.com/confluentinc/confluent-kafka-go/v2 v2.13.0 h1:y9wh3z7FdqN3RJ9IHW12hzytJx4KjlpviPWn4ncA5u0=
github.com/confluentinc/confluent-kafka-go/v2 v2.13.0/go.mod h1:aR1aciwbULyLhKkv9eq88JhS4XmGOusEnHZx1R93XZI=
github.com/confluentinc/go-editor v0.11.0 h1:fcEALYHj7xV/fRSp54/IHi2DS4GlZMJWVgrYvi/llvU=
Expand Down
1 change: 1 addition & 0 deletions internal/flink/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ func New(cfg *config.Config, prerunner pcmd.PreRunner) *cobra.Command {
cmd.AddCommand(c.newCatalogCommand())
cmd.AddCommand(c.newDetachedSavepointCommand())
cmd.AddCommand(c.newEnvironmentCommand())
cmd.AddCommand(c.newKubernetesClusterCommand())
cmd.AddCommand(c.newSavepointCommand())
Comment on lines 41 to 44
Copy link

Copilot AI Apr 14, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The PR description's release notes and checklist are still placeholders/unchecked, but this PR adds a new user-facing CLI command group (flink kubernetes-cluster). Please update the PR description to include real release notes entries and complete the checklist (or remove irrelevant sections) so reviewers can assess breaking changes, customer impact, and verification steps.

Copilot uses AI. Check for mistakes.

// On-Prem and Cloud Commands
Expand Down
19 changes: 18 additions & 1 deletion internal/flink/command_compute_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,9 +79,26 @@ func convertSdkComputePoolToLocalComputePool(sdkComputePool cmfsdk.ComputePool)

if sdkComputePool.Status != nil {
localPool.Status = &LocalComputePoolStatus{
Phase: sdkComputePool.Status.Phase,
Phase: computePoolPhase(sdkComputePool.Status),
}
}

return localPool
}

// computePoolPhase extracts the phase string from ComputePool.Status.
// SDK v0.0.6 incorrectly generates Status as *map[string]map[string]interface{}
// due to additionalProperties: true in the spec; the real API returns a flat
// {"phase": "RUNNING"} object which cannot be decoded into that type.
// Until the SDK is fixed upstream, phase will be empty when calling the real API.
func computePoolPhase(status *map[string]map[string]interface{}) string {
if status == nil {
return ""
}
if phaseMap, ok := (*status)["phase"]; ok {
if val, ok := phaseMap["value"].(string); ok {
return val
}
}
return ""
}
2 changes: 1 addition & 1 deletion internal/flink/command_compute_pool_create_onprem.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func (c *command) computePoolCreateOnPrem(cmd *cobra.Command, args []string) err
CreationTime: creationTime,
Name: sdkComputePool.GetMetadata().Name,
Type: sdkComputePool.GetSpec().Type,
Phase: sdkOutputComputePool.GetStatus().Phase,
Phase: computePoolPhase(sdkOutputComputePool.Status),
})
return table.Print()
}
Expand Down
2 changes: 1 addition & 1 deletion internal/flink/command_compute_pool_describe_onprem.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func (c *command) computePoolDescribeOnPrem(cmd *cobra.Command, args []string) e
CreationTime: creationTime,
Name: sdkComputePool.GetMetadata().Name,
Type: sdkComputePool.GetSpec().Type,
Phase: sdkComputePool.GetStatus().Phase,
Phase: computePoolPhase(sdkComputePool.Status),
})
return table.Print()
}
Expand Down
2 changes: 1 addition & 1 deletion internal/flink/command_compute_pool_list_onprem.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func (c *command) computePoolListOnPrem(cmd *cobra.Command, _ []string) error {
CreationTime: creationTime,
Name: pool.GetMetadata().Name,
Type: pool.GetSpec().Type,
Phase: pool.GetStatus().Phase,
Phase: computePoolPhase(pool.Status),
})
}
return list.Print()
Expand Down
2 changes: 1 addition & 1 deletion internal/flink/command_environment_create.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func (c *command) environmentCreate(cmd *cobra.Command, args []string) error {
}

var postEnvironment cmfsdk.PostEnvironment
postEnvironment.Name = environmentName
postEnvironment.SetName(environmentName)
postEnvironment.FlinkApplicationDefaults = &defaultsApplicationParsed
postEnvironment.KubernetesNamespace = &kubernetesNamespace
postEnvironment.StatementDefaults = &defaultsStatementParsed
Expand Down
2 changes: 1 addition & 1 deletion internal/flink/command_environment_update.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func (c *command) environmentUpdate(cmd *cobra.Command, args []string) error {
}

var postEnvironment cmfsdk.PostEnvironment
postEnvironment.Name = environmentName
postEnvironment.SetName(environmentName)
postEnvironment.FlinkApplicationDefaults = &defaultsApplicationParsed
postEnvironment.StatementDefaults = &defaultsStatementParsed
postEnvironment.ComputePoolDefaults = &defaultsComputePoolParsed
Expand Down
61 changes: 61 additions & 0 deletions internal/flink/command_kubernetes_cluster.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package flink

import (
"github.com/spf13/cobra"

cmfsdk "github.com/confluentinc/cmf-sdk-go/v1"

pcmd "github.com/confluentinc/cli/v4/pkg/cmd"
)

type kubernetesClusterOutput struct {
Name string `human:"Name" serialized:"name"`
CreatedTime string `human:"Created Time" serialized:"created_time"`
UpdatedTime string `human:"Updated Time" serialized:"updated_time"`
LifecycleState string `human:"Lifecycle State,omitempty" serialized:"lifecycle_state,omitempty"`
ConnectionState string `human:"Connection State,omitempty" serialized:"connection_state,omitempty"`
KubernetesVersion string `human:"Kubernetes Version,omitempty" serialized:"kubernetes_version,omitempty"`
}

func (c *command) newKubernetesClusterCommand() *cobra.Command {
cmd := &cobra.Command{
Use: "kubernetes-cluster",
Short: "Manage Kubernetes clusters registered with CMF.",
Annotations: map[string]string{pcmd.RunRequirement: pcmd.RequireCloudLogout},
}

cmd.AddCommand(c.newKubernetesClusterListCommand())
cmd.AddCommand(c.newKubernetesClusterDescribeCommand())
cmd.AddCommand(c.newKubernetesClusterUpdateCommand())

return cmd
}

func convertSdkKubernetesClusterToLocal(cluster cmfsdk.KubernetesCluster) LocalKubernetesCluster {
local := LocalKubernetesCluster{
ApiVersion: cluster.ApiVersion,
Kind: cluster.Kind,
Metadata: LocalKubernetesClusterMetadata{
Name: cluster.Metadata.Name,
CreationTimestamp: cluster.Metadata.CreationTimestamp,
UpdateTimestamp: cluster.Metadata.UpdateTimestamp,
Uid: cluster.Metadata.Uid,
Labels: cluster.Metadata.Labels,
Annotations: cluster.Metadata.Annotations,
},
Spec: LocalKubernetesClusterSpec{
LifecycleState: cluster.Spec.LifecycleState,
},
}

if cluster.Status != nil {
local.Status = &LocalKubernetesClusterStatus{
State: cluster.Status.State,
Message: cluster.Status.Message,
LastHeartbeatTimestamp: cluster.Status.LastHeartbeatTimestamp,
KubernetesVersion: cluster.Status.KubernetesVersion,
}
}

return local
}
64 changes: 64 additions & 0 deletions internal/flink/command_kubernetes_cluster_describe.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package flink

import (
"github.com/spf13/cobra"

pcmd "github.com/confluentinc/cli/v4/pkg/cmd"
"github.com/confluentinc/cli/v4/pkg/output"
)

func (c *command) newKubernetesClusterDescribeCommand() *cobra.Command {
cmd := &cobra.Command{
Use: "describe <name>",
Short: "Describe a Kubernetes cluster registered with CMF.",
Args: cobra.ExactArgs(1),
RunE: c.kubernetesClusterDescribe,
}

addCmfFlagSet(cmd)
pcmd.AddOutputFlag(cmd)

return cmd
}

func (c *command) kubernetesClusterDescribe(cmd *cobra.Command, args []string) error {
client, err := c.GetCmfClient(cmd)
if err != nil {
return err
}

clusterName := args[0]
cluster, err := client.DescribeKubernetesCluster(c.createContext(), clusterName)
if err != nil {
return err
}

if output.GetFormat(cmd) == output.Human {
table := output.NewTable(cmd)
o := &kubernetesClusterOutput{
Name: cluster.Metadata.Name,
}
if cluster.Metadata.CreationTimestamp != nil {
o.CreatedTime = *cluster.Metadata.CreationTimestamp
}
if cluster.Metadata.UpdateTimestamp != nil {
o.UpdatedTime = *cluster.Metadata.UpdateTimestamp
}
if cluster.Spec.LifecycleState != nil {
o.LifecycleState = *cluster.Spec.LifecycleState
}
if cluster.Status != nil {
if cluster.Status.State != nil {
o.ConnectionState = *cluster.Status.State
}
if cluster.Status.KubernetesVersion != nil {
o.KubernetesVersion = *cluster.Status.KubernetesVersion
}
}
table.Add(o)
return table.Print()
}

localCluster := convertSdkKubernetesClusterToLocal(cluster)
return output.SerializedOutput(cmd, localCluster)
}
64 changes: 64 additions & 0 deletions internal/flink/command_kubernetes_cluster_list.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package flink

import (
"github.com/spf13/cobra"

pcmd "github.com/confluentinc/cli/v4/pkg/cmd"
"github.com/confluentinc/cli/v4/pkg/output"
)

func (c *command) newKubernetesClusterListCommand() *cobra.Command {
cmd := &cobra.Command{
Use: "list",
Short: "List Kubernetes clusters registered with CMF.",
Args: cobra.NoArgs,
RunE: c.kubernetesClusterList,
}

addCmfFlagSet(cmd)
pcmd.AddOutputFlag(cmd)

return cmd
}

func (c *command) kubernetesClusterList(cmd *cobra.Command, _ []string) error {
client, err := c.GetCmfClient(cmd)
if err != nil {
return err
}

clusters, err := client.ListKubernetesClusters(c.createContext())
if err != nil {
return err
}

if output.GetFormat(cmd) == output.Human {
list := output.NewList(cmd)
list.Filter([]string{"Name", "CreatedTime", "UpdatedTime", "LifecycleState", "ConnectionState"})
for _, cluster := range clusters {
o := &kubernetesClusterOutput{
Name: cluster.Metadata.Name,
}
if cluster.Metadata.CreationTimestamp != nil {
o.CreatedTime = *cluster.Metadata.CreationTimestamp
}
if cluster.Metadata.UpdateTimestamp != nil {
o.UpdatedTime = *cluster.Metadata.UpdateTimestamp
}
if cluster.Spec.LifecycleState != nil {
o.LifecycleState = *cluster.Spec.LifecycleState
}
if cluster.Status != nil && cluster.Status.State != nil {
o.ConnectionState = *cluster.Status.State
}
list.Add(o)
}
return list.Print()
}

localClusters := make([]LocalKubernetesCluster, 0, len(clusters))
for _, cluster := range clusters {
localClusters = append(localClusters, convertSdkKubernetesClusterToLocal(cluster))
}
return output.SerializedOutput(cmd, localClusters)
}
84 changes: 84 additions & 0 deletions internal/flink/command_kubernetes_cluster_update.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
package flink

import (
"fmt"

"github.com/spf13/cobra"

cmfsdk "github.com/confluentinc/cmf-sdk-go/v1"

pcmd "github.com/confluentinc/cli/v4/pkg/cmd"
"github.com/confluentinc/cli/v4/pkg/output"
)

func (c *command) newKubernetesClusterUpdateCommand() *cobra.Command {
cmd := &cobra.Command{
Use: "update <name>",
Short: "Update a Kubernetes cluster registered with CMF.",
Args: cobra.ExactArgs(1),
RunE: c.kubernetesClusterUpdate,
}

addCmfFlagSet(cmd)
cmd.Flags().String("lifecycle-state", "", "Lifecycle state for the Kubernetes cluster (ACTIVE or DECOMMISSIONED).")
cobra.CheckErr(cmd.MarkFlagRequired("lifecycle-state"))
pcmd.AddOutputFlag(cmd)

return cmd
}

func (c *command) kubernetesClusterUpdate(cmd *cobra.Command, args []string) error {
client, err := c.GetCmfClient(cmd)
if err != nil {
return err
}

clusterName := args[0]

lifecycleState, err := cmd.Flags().GetString("lifecycle-state")
if err != nil {
return fmt.Errorf("failed to read lifecycle-state: %v", err)
}

existingCluster, err := client.DescribeKubernetesCluster(c.createContext(), clusterName)
if err != nil {
return err
}

existingCluster.Spec = cmfsdk.KubernetesClusterSpec{}
existingCluster.Spec.SetLifecycleState(lifecycleState)

updatedCluster, err := client.UpdateKubernetesCluster(c.createContext(), clusterName, existingCluster)
if err != nil {
return err
}

if output.GetFormat(cmd) == output.Human {
table := output.NewTable(cmd)
o := &kubernetesClusterOutput{
Name: updatedCluster.Metadata.Name,
}
if updatedCluster.Metadata.CreationTimestamp != nil {
o.CreatedTime = *updatedCluster.Metadata.CreationTimestamp
}
if updatedCluster.Metadata.UpdateTimestamp != nil {
o.UpdatedTime = *updatedCluster.Metadata.UpdateTimestamp
}
if updatedCluster.Spec.LifecycleState != nil {
o.LifecycleState = *updatedCluster.Spec.LifecycleState
}
if updatedCluster.Status != nil {
if updatedCluster.Status.State != nil {
o.ConnectionState = *updatedCluster.Status.State
}
if updatedCluster.Status.KubernetesVersion != nil {
o.KubernetesVersion = *updatedCluster.Status.KubernetesVersion
}
}
table.Add(o)
return table.Print()
}

localCluster := convertSdkKubernetesClusterToLocal(updatedCluster)
return output.SerializedOutput(cmd, localCluster)
}
Loading