Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 65 additions & 0 deletions internal/flink/command_catalog.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,19 @@
package flink

import (
"encoding/json"
"fmt"
"os"
"path/filepath"

"github.com/spf13/cobra"
"gopkg.in/yaml.v3"

cmfsdk "github.com/confluentinc/cmf-sdk-go/v1"

pcmd "github.com/confluentinc/cli/v4/pkg/cmd"
"github.com/confluentinc/cli/v4/pkg/errors"
"github.com/confluentinc/cli/v4/pkg/output"
)

type catalogOut struct {
Expand All @@ -25,10 +33,67 @@ func (c *command) newCatalogCommand() *cobra.Command {
cmd.AddCommand(c.newCatalogDeleteCommand())
cmd.AddCommand(c.newCatalogDescribeCommand())
cmd.AddCommand(c.newCatalogListCommand())
cmd.AddCommand(c.newCatalogUpdateCommand())

return cmd
}

func printCatalogOutput(cmd *cobra.Command, sdkOutputCatalog cmfsdk.KafkaCatalog) error {
if output.GetFormat(cmd) == output.Human {
table := output.NewTable(cmd)
databases := make([]string, 0, len(sdkOutputCatalog.Spec.GetKafkaClusters()))
for _, kafkaCluster := range sdkOutputCatalog.Spec.GetKafkaClusters() {
databases = append(databases, kafkaCluster.DatabaseName)
}
var creationTime string
if sdkOutputCatalog.GetMetadata().CreationTimestamp != nil {
creationTime = *sdkOutputCatalog.GetMetadata().CreationTimestamp
}
table.Add(&catalogOut{
CreationTime: creationTime,
Name: sdkOutputCatalog.GetMetadata().Name,
Databases: databases,
})
return table.Print()
}

localCatalog := convertSdkCatalogToLocalCatalog(sdkOutputCatalog)
return output.SerializedOutput(cmd, localCatalog)
}
Comment on lines +41 to +62
Copy link

Copilot AI Apr 3, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

printCatalogOutput centralizes catalog rendering, but catalogCreate and catalogDescribe still contain their own (nearly identical) output formatting logic. To avoid future drift between commands, consider switching those commands to call printCatalogOutput as well.

Copilot uses AI. Check for mistakes.
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Robert Metzger (@rmetzger) Kumar Mallikarjuna (@kumar-mallikarjuna) does it make sense to touch other existing files to reduce duplication, or should we tackle that in another change ?

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it makes sense to fix this as part of this PR

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree with Copilot and Robert Metzger (@rmetzger) 's comment, maybe we can apply this idea to the catalog database as well.

Copy link
Copy Markdown
Contributor Author

@paras-negi-flink Paras Negi (paras-negi-flink) May 18, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done — catalogCreate and catalogDescribe now both call printCatalogOutput.


func readCatalogResourceFile(resourceFilePath string) (cmfsdk.KafkaCatalog, error) {
data, err := os.ReadFile(resourceFilePath)
if err != nil {
return cmfsdk.KafkaCatalog{}, fmt.Errorf("failed to read file: %w", err)
}

var genericData map[string]interface{}
ext := filepath.Ext(resourceFilePath)
switch ext {
case ".json":
err = json.Unmarshal(data, &genericData)
case ".yaml", ".yml":
err = yaml.Unmarshal(data, &genericData)
default:
return cmfsdk.KafkaCatalog{}, errors.NewErrorWithSuggestions(fmt.Sprintf("unsupported file format: %s", ext), "Supported file formats are .json, .yaml, and .yml.")
}
if err != nil {
return cmfsdk.KafkaCatalog{}, fmt.Errorf("failed to parse input file: %w", err)
}

jsonBytes, err := json.Marshal(genericData)
if err != nil {
return cmfsdk.KafkaCatalog{}, fmt.Errorf("failed to marshal intermediate data: %w", err)
}

var sdkCatalog cmfsdk.KafkaCatalog
if err = json.Unmarshal(jsonBytes, &sdkCatalog); err != nil {
return cmfsdk.KafkaCatalog{}, fmt.Errorf("failed to bind data to KafkaCatalog model: %w", err)
}

return sdkCatalog, nil
}
Comment thread
paras-negi-flink marked this conversation as resolved.

func convertSdkCatalogToLocalCatalog(sdkOutputCatalog cmfsdk.KafkaCatalog) LocalKafkaCatalog {
localClusters := make([]LocalKafkaCatalogSpecKafkaClusters, 0, len(sdkOutputCatalog.Spec.GetKafkaClusters()))
for _, sdkCluster := range sdkOutputCatalog.Spec.GetKafkaClusters() {
Expand Down
65 changes: 3 additions & 62 deletions internal/flink/command_catalog_create.go
Original file line number Diff line number Diff line change
@@ -1,25 +1,15 @@
package flink

import (
"encoding/json"
"fmt"
"os"
"path/filepath"

"github.com/spf13/cobra"
"gopkg.in/yaml.v3"

cmfsdk "github.com/confluentinc/cmf-sdk-go/v1"

pcmd "github.com/confluentinc/cli/v4/pkg/cmd"
"github.com/confluentinc/cli/v4/pkg/errors"
"github.com/confluentinc/cli/v4/pkg/output"
)

func (c *command) newCatalogCreateCommand() *cobra.Command {
cmd := &cobra.Command{
Use: "create <resourceFilePath>",
Short: "Create a Flink catalog.",
Short: "Create a Flink catalog in Confluent Platform.",
Long: "Create a Flink catalog in Confluent Platform that provides metadata about tables and other database objects such as views and functions.",
Args: cobra.ExactArgs(1),
RunE: c.catalogCreate,
Expand All @@ -32,69 +22,20 @@ func (c *command) newCatalogCreateCommand() *cobra.Command {
}

func (c *command) catalogCreate(cmd *cobra.Command, args []string) error {
resourceFilePath := args[0]

client, err := c.GetCmfClient(cmd)
if err != nil {
return err
}

// Read file contents
data, err := os.ReadFile(resourceFilePath)
if err != nil {
return fmt.Errorf("failed to read file: %v", err)
}

var genericData map[string]interface{}
ext := filepath.Ext(resourceFilePath)
switch ext {
case ".json":
err = json.Unmarshal(data, &genericData)
case ".yaml", ".yml":
err = yaml.Unmarshal(data, &genericData)
default:
return errors.NewErrorWithSuggestions(fmt.Sprintf("unsupported file format: %s", ext), "Supported file formats are .json, .yaml, and .yml.")
}
sdkCatalog, err := readCatalogResourceFile(args[0])
if err != nil {
return err
}

jsonBytes, err := json.Marshal(genericData)
if err != nil {
return fmt.Errorf("failed to marshal intermediate data: %w", err)
}

var sdkCatalog cmfsdk.KafkaCatalog
if err = json.Unmarshal(jsonBytes, &sdkCatalog); err != nil {
return fmt.Errorf("failed to bind data to KafkaCatalog model: %w", err)
}

sdkOutputCatalog, err := client.CreateCatalog(c.createContext(), sdkCatalog)
if err != nil {
return err
}

if output.GetFormat(cmd) == output.Human {
table := output.NewTable(cmd)
// Populate the databases field with the names of the databases
databases := make([]string, 0, len(sdkOutputCatalog.Spec.GetKafkaClusters()))
for _, kafkaCluster := range sdkOutputCatalog.Spec.GetKafkaClusters() {
databases = append(databases, kafkaCluster.DatabaseName)
}
var creationTime string
if sdkOutputCatalog.GetMetadata().CreationTimestamp != nil {
creationTime = *sdkOutputCatalog.GetMetadata().CreationTimestamp
} else {
creationTime = ""
}
table.Add(&catalogOut{
CreationTime: creationTime,
Name: sdkOutputCatalog.GetMetadata().Name,
Databases: databases,
})
return table.Print()
}

localCatalog := convertSdkCatalogToLocalCatalog(sdkOutputCatalog)
return output.SerializedOutput(cmd, localCatalog)
return printCatalogOutput(cmd, sdkOutputCatalog)
}
29 changes: 2 additions & 27 deletions internal/flink/command_catalog_describe.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"github.com/spf13/cobra"

pcmd "github.com/confluentinc/cli/v4/pkg/cmd"
"github.com/confluentinc/cli/v4/pkg/output"
)

func (c *command) newCatalogDescribeCommand() *cobra.Command {
Expand All @@ -22,39 +21,15 @@ func (c *command) newCatalogDescribeCommand() *cobra.Command {
}

func (c *command) catalogDescribe(cmd *cobra.Command, args []string) error {
name := args[0]

client, err := c.GetCmfClient(cmd)
if err != nil {
return err
}

sdkOutputCatalog, err := client.DescribeCatalog(c.createContext(), name)
sdkOutputCatalog, err := client.DescribeCatalog(c.createContext(), args[0])
if err != nil {
return err
}

if output.GetFormat(cmd) == output.Human {
table := output.NewTable(cmd)
// Populate the databases field with the names of the databases
databases := make([]string, 0, len(sdkOutputCatalog.Spec.GetKafkaClusters()))
for _, kafkaCluster := range sdkOutputCatalog.Spec.GetKafkaClusters() {
databases = append(databases, kafkaCluster.DatabaseName)
}
var creationTime string
if sdkOutputCatalog.GetMetadata().CreationTimestamp != nil {
creationTime = *sdkOutputCatalog.GetMetadata().CreationTimestamp
} else {
creationTime = ""
}
table.Add(&catalogOut{
CreationTime: creationTime,
Name: sdkOutputCatalog.GetMetadata().Name,
Databases: databases,
})
return table.Print()
}

localCatalog := convertSdkCatalogToLocalCatalog(sdkOutputCatalog)
return output.SerializedOutput(cmd, localCatalog)
return printCatalogOutput(cmd, sdkOutputCatalog)
}
54 changes: 54 additions & 0 deletions internal/flink/command_catalog_update.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package flink

import (
"fmt"

"github.com/spf13/cobra"

pcmd "github.com/confluentinc/cli/v4/pkg/cmd"
)

func (c *command) newCatalogUpdateCommand() *cobra.Command {
cmd := &cobra.Command{
Use: "update <resourceFilePath>",
Short: "Update a Flink catalog in Confluent Platform.",
Long: "Update an existing Flink catalog in Confluent Platform from a resource file.",
Args: cobra.ExactArgs(1),
RunE: c.catalogUpdate,
}

addCmfFlagSet(cmd)
pcmd.AddOutputFlag(cmd)

return cmd
}

func (c *command) catalogUpdate(cmd *cobra.Command, args []string) error {
resourceFilePath := args[0]

client, err := c.GetCmfClient(cmd)
if err != nil {
return err
}

sdkCatalog, err := readCatalogResourceFile(resourceFilePath)
if err != nil {
return err
}

catalogName := sdkCatalog.Metadata.Name
if catalogName == "" {
return fmt.Errorf("catalog name is required: ensure the resource file contains a non-empty \"metadata.name\" field")
}

if err := client.UpdateCatalog(c.createContext(), catalogName, sdkCatalog); err != nil {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same comment as the catalog database, is it the design that Update command doesn't return the updated catalog to make this an atomic operation? I am seeing an additional describe call below.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes thats correct. UpdateKafkaCatalogExecute returns (*http.Response, error) only — no body. The follow-up DescribeCatalog is the only way to render the updated resource until CMF changes the PUT endpoint to return the catalog.

As discussed, we will address this in a follow up PR.

return err
}

sdkOutputCatalog, err := client.DescribeCatalog(c.createContext(), catalogName)
Comment thread
paras-negi-flink marked this conversation as resolved.
if err != nil {
return fmt.Errorf("catalog %q was updated successfully, but failed to retrieve updated details: %w", catalogName, err)
}

return printCatalogOutput(cmd, sdkOutputCatalog)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's use this unified print out function for the create command as well.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done. catalogCreate now ends with return printCatalogOutput(cmd, sdkOutputCatalog)

}
8 changes: 8 additions & 0 deletions pkg/flink/cmf_rest_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -565,6 +565,14 @@ func (cmfClient *CmfRestClient) ListCatalog(ctx context.Context) ([]cmfsdk.Kafka
return catalogs, nil
}

func (cmfClient *CmfRestClient) UpdateCatalog(ctx context.Context, catalogName string, kafkaCatalog cmfsdk.KafkaCatalog) error {
httpResponse, err := cmfClient.SQLApi.UpdateKafkaCatalog(ctx, catalogName).KafkaCatalog(kafkaCatalog).Execute()
if parsedErr := parseSdkError(httpResponse, err); parsedErr != nil {
return fmt.Errorf(`failed to update Kafka Catalog "%s": %s`, catalogName, parsedErr)
}
return nil
}

func (cmfClient *CmfRestClient) DeleteCatalog(ctx context.Context, catalogName string) error {
httpResp, err := cmfClient.SQLApi.DeleteKafkaCatalog(ctx, catalogName).Execute()
return parseSdkError(httpResp, err)
Expand Down
17 changes: 17 additions & 0 deletions test/fixtures/input/flink/catalog/update-invalid-failure.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
{
"apiVersion": "cmf/api/v1/catalog",
"kind": "KafkaCatalog",
"metadata": {
"name": "invalid-catalog"
},
"spec": {
"kafkaClusters": [
{
"databaseName": "dev"
},
{
"databaseName": "prod"
}
]
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
apiVersion: cmf/api/v1/catalog
kind: KafkaCatalog
metadata:
name: invalid-catalog
spec:
kafkaClusters:
- databaseName: dev
- databaseName: prod
17 changes: 17 additions & 0 deletions test/fixtures/input/flink/catalog/update-successful.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
{
"apiVersion": "cmf/api/v1/catalog",
"kind": "KafkaCatalog",
"metadata": {
"name": "test-catalog"
},
"spec": {
"kafkaClusters": [
{
"databaseName": "dev"
},
{
"databaseName": "prod"
}
]
}
}
8 changes: 8 additions & 0 deletions test/fixtures/input/flink/catalog/update-successful.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
apiVersion: cmf/api/v1/catalog
kind: KafkaCatalog
metadata:
name: test-catalog
spec:
kafkaClusters:
- databaseName: dev
- databaseName: prod
18 changes: 12 additions & 6 deletions test/fixtures/output/flink/catalog/describe-success-json.golden
Original file line number Diff line number Diff line change
@@ -1,22 +1,28 @@
{
"apiVersion": "",
"kind": "",
"apiVersion": "cmf/api/v1/catalog",
"kind": "KafkaCatalog",
"metadata": {
"name": "test-catalog1",
"creationTimestamp": "2025-08-05 12:00:00 +0000 UTC"
"creationTimestamp": "2025-08-05T12:00:00Z"
},
"spec": {
"srInstance": {
"connectionConfig": null
"connectionConfig": {
"schema.registry.url": "http://localhost:8081"
}
},
"kafkaClusters": [
{
"databaseName": "test-database",
"connectionConfig": null
"connectionConfig": {
"bootstrap.servers": "localhost:9092"
}
},
{
"databaseName": "test-database-2",
"connectionConfig": null
"connectionConfig": {
"bootstrap.servers": "localhost:9092"
}
}
]
}
Expand Down
Loading