Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ require (
github.com/confluentinc/ccloud-sdk-go-v2/sso v0.0.1
github.com/confluentinc/ccloud-sdk-go-v2/tableflow v0.5.0
github.com/confluentinc/ccloud-sdk-go-v2/usm v0.1.0
github.com/confluentinc/cmf-sdk-go v0.0.5
github.com/confluentinc/cmf-sdk-go v0.0.6
github.com/confluentinc/confluent-kafka-go/v2 v2.13.0
github.com/confluentinc/go-editor v0.11.0
github.com/confluentinc/go-prompt v0.2.40
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -262,8 +262,8 @@ github.com/confluentinc/ccloud-sdk-go-v2/tableflow v0.5.0 h1:Wh3+AsUCncoxRPfs0zC
github.com/confluentinc/ccloud-sdk-go-v2/tableflow v0.5.0/go.mod h1:unZupel8OU3/o8MRcL9YiJo+56MalsCtHHCc/ZNi0BI=
github.com/confluentinc/ccloud-sdk-go-v2/usm v0.1.0 h1:rF9cKecDCowq+oDWjf8rSpXXZHAnVXowIsT/OXF4MOI=
github.com/confluentinc/ccloud-sdk-go-v2/usm v0.1.0/go.mod h1:umhEDvQp/5h0ALKBpYTQOmFwaWrvilnbE8Rkzh6oJ4Q=
github.com/confluentinc/cmf-sdk-go v0.0.5 h1:TS6S3ClVsM1kanB00mlcmqXczozDTO2t4Du5blDSYvE=
github.com/confluentinc/cmf-sdk-go v0.0.5/go.mod h1:xY6OWDUc4UtKMgmade99v1C8/YovgWTl8tzeZ23LrEg=
github.com/confluentinc/cmf-sdk-go v0.0.6 h1:3BFyPOJb4xBAvBMU1hXSh9+2kn/U2zr4EKDoWf8QM74=
github.com/confluentinc/cmf-sdk-go v0.0.6/go.mod h1:xY6OWDUc4UtKMgmade99v1C8/YovgWTl8tzeZ23LrEg=
github.com/confluentinc/confluent-kafka-go/v2 v2.13.0 h1:y9wh3z7FdqN3RJ9IHW12hzytJx4KjlpviPWn4ncA5u0=
github.com/confluentinc/confluent-kafka-go/v2 v2.13.0/go.mod h1:aR1aciwbULyLhKkv9eq88JhS4XmGOusEnHZx1R93XZI=
github.com/confluentinc/go-editor v0.11.0 h1:fcEALYHj7xV/fRSp54/IHi2DS4GlZMJWVgrYvi/llvU=
Expand Down
36 changes: 34 additions & 2 deletions internal/flink/command_compute_pool.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
package flink

import (
"encoding/json"

"github.com/spf13/cobra"

cmfsdk "github.com/confluentinc/cmf-sdk-go/v1"

"github.com/confluentinc/cli/v4/pkg/config"
"github.com/confluentinc/cli/v4/pkg/output"
)

type computePoolOut struct {
Expand Down Expand Up @@ -60,6 +63,35 @@
return c.autocompleteComputePools(cmd, args)
}

// extractComputePoolPhase extracts the phase string from the generic status map.
// ComputePool.Status changed from *ComputePoolStatus to *map[string]map[string]interface{} in SDK v0.0.6.
// The status is a nested map where phase is at status["phase"]["value"].
func extractComputePoolPhase(pool cmfsdk.ComputePool) string {
if pool.Status == nil {
return ""
}
status := pool.GetStatus()
if phaseMap, ok := status["phase"]; ok {
if value, ok := phaseMap["value"].(string); ok {
return value
}
}
// Fallback: try re-parsing as a simpler type in case the API shape varies.
raw, err := json.Marshal(pool.Status)
if err != nil {
output.ErrPrintf(false, "Warning: failed to marshal compute pool status: %v\n", err)
return ""
}
var flat map[string]string
if err := json.Unmarshal(raw, &flat); err == nil {

Check warning on line 86 in internal/flink/command_compute_pool.go

View check run for this annotation

SonarQube-Confluent / SonarQube Code Analysis

Remove this unnecessary variable declaration and use the expression directly in the condition.

[S8193] Variables in if short statements should be used beyond just the condition See more on https://sonarqube.confluent.io/project/issues?id=cli&pullRequest=3320&issues=a07ade1f-1181-489f-bf05-faab9b384318&open=a07ade1f-1181-489f-bf05-faab9b384318
if phase, ok := flat["phase"]; ok {
return phase
}
}
output.ErrPrintf(false, "Warning: compute pool has status but phase could not be extracted\n")
Comment thread
paras-negi-flink marked this conversation as resolved.
Outdated
return ""
}
Comment on lines +63 to +76
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wouldn't it make sense to move this utility into the SDK?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My understanding is that we don't do human code push in cmf-go-sdk, but if we want to : Adds a custom UnmarshalJSON method on the ComputePool type (in a new file v1/model_compute_pool_custom.go ) seems to be the better fix, which completely removes "custom unmarshaling with json.RawMessage embedding" from cli repo

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My understanding is that we don't do human code push in cmf-go-sdk, but if we want to

My only worry about human pushes in the sdk repo is that the code generator is overwriting human written code / utils.
But when we started the go sdk repo, I actually had additional, non generated code for shared utilities in mind.


func convertSdkComputePoolToLocalComputePool(sdkComputePool cmfsdk.ComputePool) LocalComputePool {
localPool := LocalComputePool{
ApiVersion: sdkComputePool.ApiVersion,
Expand All @@ -77,9 +109,9 @@
},
}

if sdkComputePool.Status != nil {
if phase := extractComputePoolPhase(sdkComputePool); phase != "" {
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we always call this method, also when the CLI runs against older CMF versions? Probably yes? because the response has not changed, only the SDK.

localPool.Status = &LocalComputePoolStatus{
Phase: sdkComputePool.Status.Phase,
Phase: phase,
}
}

Expand Down
2 changes: 1 addition & 1 deletion internal/flink/command_compute_pool_create_onprem.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func (c *command) computePoolCreateOnPrem(cmd *cobra.Command, args []string) err
CreationTime: creationTime,
Name: sdkComputePool.GetMetadata().Name,
Type: sdkComputePool.GetSpec().Type,
Phase: sdkOutputComputePool.GetStatus().Phase,
Phase: extractComputePoolPhase(sdkOutputComputePool),
})
return table.Print()
}
Expand Down
2 changes: 1 addition & 1 deletion internal/flink/command_compute_pool_describe_onprem.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func (c *command) computePoolDescribeOnPrem(cmd *cobra.Command, args []string) e
CreationTime: creationTime,
Name: sdkComputePool.GetMetadata().Name,
Type: sdkComputePool.GetSpec().Type,
Phase: sdkComputePool.GetStatus().Phase,
Phase: extractComputePoolPhase(sdkComputePool),
})
return table.Print()
}
Expand Down
2 changes: 1 addition & 1 deletion internal/flink/command_compute_pool_list_onprem.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func (c *command) computePoolListOnPrem(cmd *cobra.Command, _ []string) error {
CreationTime: creationTime,
Name: pool.GetMetadata().Name,
Type: pool.GetSpec().Type,
Phase: pool.GetStatus().Phase,
Phase: extractComputePoolPhase(pool),
})
}
return list.Print()
Expand Down
2 changes: 1 addition & 1 deletion internal/flink/command_environment_create.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func (c *command) environmentCreate(cmd *cobra.Command, args []string) error {
}

var postEnvironment cmfsdk.PostEnvironment
postEnvironment.Name = environmentName
postEnvironment.SetName(environmentName)
postEnvironment.FlinkApplicationDefaults = &defaultsApplicationParsed
postEnvironment.KubernetesNamespace = &kubernetesNamespace
postEnvironment.StatementDefaults = &defaultsStatementParsed
Expand Down
2 changes: 1 addition & 1 deletion internal/flink/command_environment_update.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func (c *command) environmentUpdate(cmd *cobra.Command, args []string) error {
}

var postEnvironment cmfsdk.PostEnvironment
postEnvironment.Name = environmentName
postEnvironment.SetName(environmentName)
postEnvironment.FlinkApplicationDefaults = &defaultsApplicationParsed
postEnvironment.StatementDefaults = &defaultsStatementParsed
postEnvironment.ComputePoolDefaults = &defaultsComputePoolParsed
Expand Down
14 changes: 10 additions & 4 deletions pkg/flink/cmf_rest_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,10 @@ func (cmfClient *CmfRestClient) UpdateApplication(ctx context.Context, environme
// CreateEnvironment Create an environment.
// Internally, since the call for Create and Update is the same, we check if the environment exists before creation.
func (cmfClient *CmfRestClient) CreateEnvironment(ctx context.Context, postEnvironment cmfsdk.PostEnvironment) (cmfsdk.Environment, error) {
environmentName := postEnvironment.Name
environmentName := postEnvironment.GetName()
if environmentName == "" {
return cmfsdk.Environment{}, fmt.Errorf("environment name is required")
}
_, httpResponse, _ := cmfClient.EnvironmentsApi.GetEnvironment(ctx, environmentName).Execute()
// check if the environment exists by checking the status code
if httpResponse != nil && httpResponse.StatusCode == http.StatusOK {
Expand Down Expand Up @@ -281,10 +284,13 @@ func (cmfClient *CmfRestClient) ListEnvironments(ctx context.Context) ([]cmfsdk.
return environments, nil
}

// UpdateEnvironment Create an environment.
// Internally, since the call for Create and Update is the same, we check if the environment exists before updation.
// UpdateEnvironment updates an environment.
// Internally, since the call for Create and Update is the same, we check if the environment exists before updating.
func (cmfClient *CmfRestClient) UpdateEnvironment(ctx context.Context, postEnvironment cmfsdk.PostEnvironment) (cmfsdk.Environment, error) {
environmentName := postEnvironment.Name
environmentName := postEnvironment.GetName()
if environmentName == "" {
return cmfsdk.Environment{}, fmt.Errorf("environment name is required")
}
_, httpResponse, err := cmfClient.EnvironmentsApi.GetEnvironment(ctx, environmentName).Execute()
// check if the environment exists by checking the status code
if httpResponse != nil && httpResponse.StatusCode == http.StatusNotFound {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
},
"spec": {
"type": "DEDICATED",
"clusterSpec": null
"clusterSpec": {}
},
"status": {
"phase": "RUNNING"
Expand Down
6 changes: 3 additions & 3 deletions test/fixtures/output/flink/compute-pool/list-json.golden
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
},
"spec": {
"type": "DEDICATED",
"clusterSpec": null
"clusterSpec": {}
},
"status": {
"phase": "RUNNING"
Expand All @@ -23,7 +23,7 @@
},
"spec": {
"type": "DEDICATED",
"clusterSpec": null
"clusterSpec": {}
},
"status": {
"phase": "PENDING"
Expand All @@ -38,7 +38,7 @@
},
"spec": {
"type": "DEDICATED",
"clusterSpec": null
"clusterSpec": {}
},
"status": {
"phase": "COMPLETE"
Expand Down
52 changes: 29 additions & 23 deletions test/test-server/flink_onprem_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,22 +185,26 @@
}
}

func createComputePool(poolName, phase string) cmfsdk.ComputePool {
// createComputePool returns a raw map instead of cmfsdk.ComputePool because
// in SDK v0.0.6, ComputePool.Status is typed as *map[string]map[string]interface{}.
// The status must use nested maps so the SDK client can deserialize the response.
func createComputePool(poolName, phase string) map[string]interface{} {
timeStamp := time.Date(2025, time.March, 12, 23, 42, 0, 0, time.UTC).String()

status := cmfsdk.ComputePoolStatus{
Phase: phase,
}

return cmfsdk.ComputePool{
Metadata: cmfsdk.ComputePoolMetadata{
Name: poolName,
CreationTimestamp: &timeStamp,
return map[string]interface{}{
"metadata": map[string]interface{}{
"name": poolName,
"creationTimestamp": timeStamp,
},
Spec: cmfsdk.ComputePoolSpec{
Type: "DEDICATED",
"spec": map[string]interface{}{
"type": "DEDICATED",
"clusterSpec": map[string]interface{}{},
},
"status": map[string]interface{}{
"phase": map[string]interface{}{
"value": phase,
},
},
Status: &status,
}
}

Expand Down Expand Up @@ -364,14 +368,15 @@
err = json.Unmarshal(reqBody, &environment)
require.NoError(t, err)

if strings.Contains(environment.Name, "failure") {
envName := environment.GetName()
if strings.Contains(envName, "failure") {
http.Error(w, "", http.StatusUnprocessableEntity)
return
}

// Already existing environment: update
if environment.Name == "default" || environment.Name == "test" {
outputEnvironment := createEnvironment(environment.Name, environment.Name+"-namespace")
if envName == "default" || envName == "test" {
outputEnvironment := createEnvironment(envName, envName+"-namespace")

Check failure on line 379 in test/test-server/flink_onprem_handler.go

View check run for this annotation

SonarQube-Confluent / SonarQube Code Analysis

Define a constant instead of duplicating this literal "-namespace" 3 times.

[S1192] String literals should not be duplicated See more on https://sonarqube.confluent.io/project/issues?id=cli&pullRequest=3320&issues=91ed0128-7c1f-4cc3-89e4-28128ad4dcd5&open=91ed0128-7c1f-4cc3-89e4-28128ad4dcd5
// This is a dummy update - only the defaults can be updated anyway.
outputEnvironment.FlinkApplicationDefaults = environment.FlinkApplicationDefaults
outputEnvironment.ComputePoolDefaults = environment.ComputePoolDefaults
Expand All @@ -382,7 +387,7 @@
}

// New environment: create
outputEnvironment := createEnvironment(environment.Name, environment.GetKubernetesNamespace())
outputEnvironment := createEnvironment(envName, environment.GetKubernetesNamespace())
outputEnvironment.FlinkApplicationDefaults = environment.FlinkApplicationDefaults
outputEnvironment.ComputePoolDefaults = environment.ComputePoolDefaults
outputEnvironment.StatementDefaults = environment.StatementDefaults
Expand Down Expand Up @@ -782,16 +787,17 @@

switch r.Method {
case http.MethodGet:
computePool1 := createComputePool("test-pool1", "RUNNING")
computePool2 := createComputePool("test-pool2", "PENDING")
computePool3 := createComputePool("test-pool3", "COMPLETE")

computePools := []cmfsdk.ComputePool{computePool1, computePool2, computePool3}
computePoolsPage := cmfsdk.ComputePoolsPage{}
computePoolsPage := map[string]interface{}{
"items": []interface{}{},
}
page := r.URL.Query().Get("page")

if page == "0" {
computePoolsPage.SetItems(computePools)
computePoolsPage["items"] = []interface{}{
createComputePool("test-pool1", "RUNNING"),
createComputePool("test-pool2", "PENDING"),
createComputePool("test-pool3", "COMPLETE"),
}
}

err := json.NewEncoder(w).Encode(computePoolsPage)
Expand Down