diff --git a/internal/flink/command_catalog.go b/internal/flink/command_catalog.go index 77b540e2e5..2af4794436 100644 --- a/internal/flink/command_catalog.go +++ b/internal/flink/command_catalog.go @@ -1,11 +1,19 @@ package flink import ( + "encoding/json" + "fmt" + "os" + "path/filepath" + "github.com/spf13/cobra" + "gopkg.in/yaml.v3" cmfsdk "github.com/confluentinc/cmf-sdk-go/v1" pcmd "github.com/confluentinc/cli/v4/pkg/cmd" + "github.com/confluentinc/cli/v4/pkg/errors" + "github.com/confluentinc/cli/v4/pkg/output" ) type catalogOut struct { @@ -25,10 +33,67 @@ func (c *command) newCatalogCommand() *cobra.Command { cmd.AddCommand(c.newCatalogDeleteCommand()) cmd.AddCommand(c.newCatalogDescribeCommand()) cmd.AddCommand(c.newCatalogListCommand()) + cmd.AddCommand(c.newCatalogUpdateCommand()) return cmd } +func printCatalogOutput(cmd *cobra.Command, sdkOutputCatalog cmfsdk.KafkaCatalog) error { + if output.GetFormat(cmd) == output.Human { + table := output.NewTable(cmd) + databases := make([]string, 0, len(sdkOutputCatalog.Spec.GetKafkaClusters())) + for _, kafkaCluster := range sdkOutputCatalog.Spec.GetKafkaClusters() { + databases = append(databases, kafkaCluster.DatabaseName) + } + var creationTime string + if sdkOutputCatalog.GetMetadata().CreationTimestamp != nil { + creationTime = *sdkOutputCatalog.GetMetadata().CreationTimestamp + } + table.Add(&catalogOut{ + CreationTime: creationTime, + Name: sdkOutputCatalog.GetMetadata().Name, + Databases: databases, + }) + return table.Print() + } + + localCatalog := convertSdkCatalogToLocalCatalog(sdkOutputCatalog) + return output.SerializedOutput(cmd, localCatalog) +} + +func readCatalogResourceFile(resourceFilePath string) (cmfsdk.KafkaCatalog, error) { + data, err := os.ReadFile(resourceFilePath) + if err != nil { + return cmfsdk.KafkaCatalog{}, fmt.Errorf("failed to read file: %w", err) + } + + var genericData map[string]interface{} + ext := filepath.Ext(resourceFilePath) + switch ext { + case ".json": + err = json.Unmarshal(data, &genericData) + case ".yaml", ".yml": + err = yaml.Unmarshal(data, &genericData) + default: + return cmfsdk.KafkaCatalog{}, errors.NewErrorWithSuggestions(fmt.Sprintf("unsupported file format: %s", ext), "Supported file formats are .json, .yaml, and .yml.") + } + if err != nil { + return cmfsdk.KafkaCatalog{}, fmt.Errorf("failed to parse input file: %w", err) + } + + jsonBytes, err := json.Marshal(genericData) + if err != nil { + return cmfsdk.KafkaCatalog{}, fmt.Errorf("failed to marshal intermediate data: %w", err) + } + + var sdkCatalog cmfsdk.KafkaCatalog + if err = json.Unmarshal(jsonBytes, &sdkCatalog); err != nil { + return cmfsdk.KafkaCatalog{}, fmt.Errorf("failed to bind data to KafkaCatalog model: %w", err) + } + + return sdkCatalog, nil +} + func convertSdkCatalogToLocalCatalog(sdkOutputCatalog cmfsdk.KafkaCatalog) LocalKafkaCatalog { localClusters := make([]LocalKafkaCatalogSpecKafkaClusters, 0, len(sdkOutputCatalog.Spec.GetKafkaClusters())) for _, sdkCluster := range sdkOutputCatalog.Spec.GetKafkaClusters() { diff --git a/internal/flink/command_catalog_create.go b/internal/flink/command_catalog_create.go index 791dec844c..71634056d0 100644 --- a/internal/flink/command_catalog_create.go +++ b/internal/flink/command_catalog_create.go @@ -1,25 +1,15 @@ package flink import ( - "encoding/json" - "fmt" - "os" - "path/filepath" - "github.com/spf13/cobra" - "gopkg.in/yaml.v3" - - cmfsdk "github.com/confluentinc/cmf-sdk-go/v1" pcmd "github.com/confluentinc/cli/v4/pkg/cmd" - "github.com/confluentinc/cli/v4/pkg/errors" - "github.com/confluentinc/cli/v4/pkg/output" ) func (c *command) newCatalogCreateCommand() *cobra.Command { cmd := &cobra.Command{ Use: "create ", - Short: "Create a Flink catalog.", + Short: "Create a Flink catalog in Confluent Platform.", Long: "Create a Flink catalog in Confluent Platform that provides metadata about tables and other database objects such as views and functions.", Args: cobra.ExactArgs(1), RunE: c.catalogCreate, @@ -32,69 +22,20 @@ func (c *command) newCatalogCreateCommand() *cobra.Command { } func (c *command) catalogCreate(cmd *cobra.Command, args []string) error { - resourceFilePath := args[0] - client, err := c.GetCmfClient(cmd) if err != nil { return err } - // Read file contents - data, err := os.ReadFile(resourceFilePath) - if err != nil { - return fmt.Errorf("failed to read file: %v", err) - } - - var genericData map[string]interface{} - ext := filepath.Ext(resourceFilePath) - switch ext { - case ".json": - err = json.Unmarshal(data, &genericData) - case ".yaml", ".yml": - err = yaml.Unmarshal(data, &genericData) - default: - return errors.NewErrorWithSuggestions(fmt.Sprintf("unsupported file format: %s", ext), "Supported file formats are .json, .yaml, and .yml.") - } + sdkCatalog, err := readCatalogResourceFile(args[0]) if err != nil { return err } - jsonBytes, err := json.Marshal(genericData) - if err != nil { - return fmt.Errorf("failed to marshal intermediate data: %w", err) - } - - var sdkCatalog cmfsdk.KafkaCatalog - if err = json.Unmarshal(jsonBytes, &sdkCatalog); err != nil { - return fmt.Errorf("failed to bind data to KafkaCatalog model: %w", err) - } - sdkOutputCatalog, err := client.CreateCatalog(c.createContext(), sdkCatalog) if err != nil { return err } - if output.GetFormat(cmd) == output.Human { - table := output.NewTable(cmd) - // Populate the databases field with the names of the databases - databases := make([]string, 0, len(sdkOutputCatalog.Spec.GetKafkaClusters())) - for _, kafkaCluster := range sdkOutputCatalog.Spec.GetKafkaClusters() { - databases = append(databases, kafkaCluster.DatabaseName) - } - var creationTime string - if sdkOutputCatalog.GetMetadata().CreationTimestamp != nil { - creationTime = *sdkOutputCatalog.GetMetadata().CreationTimestamp - } else { - creationTime = "" - } - table.Add(&catalogOut{ - CreationTime: creationTime, - Name: sdkOutputCatalog.GetMetadata().Name, - Databases: databases, - }) - return table.Print() - } - - localCatalog := convertSdkCatalogToLocalCatalog(sdkOutputCatalog) - return output.SerializedOutput(cmd, localCatalog) + return printCatalogOutput(cmd, sdkOutputCatalog) } diff --git a/internal/flink/command_catalog_describe.go b/internal/flink/command_catalog_describe.go index ae784540b3..5c569a3871 100644 --- a/internal/flink/command_catalog_describe.go +++ b/internal/flink/command_catalog_describe.go @@ -4,7 +4,6 @@ import ( "github.com/spf13/cobra" pcmd "github.com/confluentinc/cli/v4/pkg/cmd" - "github.com/confluentinc/cli/v4/pkg/output" ) func (c *command) newCatalogDescribeCommand() *cobra.Command { @@ -22,39 +21,15 @@ func (c *command) newCatalogDescribeCommand() *cobra.Command { } func (c *command) catalogDescribe(cmd *cobra.Command, args []string) error { - name := args[0] - client, err := c.GetCmfClient(cmd) if err != nil { return err } - sdkOutputCatalog, err := client.DescribeCatalog(c.createContext(), name) + sdkOutputCatalog, err := client.DescribeCatalog(c.createContext(), args[0]) if err != nil { return err } - if output.GetFormat(cmd) == output.Human { - table := output.NewTable(cmd) - // Populate the databases field with the names of the databases - databases := make([]string, 0, len(sdkOutputCatalog.Spec.GetKafkaClusters())) - for _, kafkaCluster := range sdkOutputCatalog.Spec.GetKafkaClusters() { - databases = append(databases, kafkaCluster.DatabaseName) - } - var creationTime string - if sdkOutputCatalog.GetMetadata().CreationTimestamp != nil { - creationTime = *sdkOutputCatalog.GetMetadata().CreationTimestamp - } else { - creationTime = "" - } - table.Add(&catalogOut{ - CreationTime: creationTime, - Name: sdkOutputCatalog.GetMetadata().Name, - Databases: databases, - }) - return table.Print() - } - - localCatalog := convertSdkCatalogToLocalCatalog(sdkOutputCatalog) - return output.SerializedOutput(cmd, localCatalog) + return printCatalogOutput(cmd, sdkOutputCatalog) } diff --git a/internal/flink/command_catalog_update.go b/internal/flink/command_catalog_update.go new file mode 100644 index 0000000000..6ae9372b92 --- /dev/null +++ b/internal/flink/command_catalog_update.go @@ -0,0 +1,54 @@ +package flink + +import ( + "fmt" + + "github.com/spf13/cobra" + + pcmd "github.com/confluentinc/cli/v4/pkg/cmd" +) + +func (c *command) newCatalogUpdateCommand() *cobra.Command { + cmd := &cobra.Command{ + Use: "update ", + Short: "Update a Flink catalog in Confluent Platform.", + Long: "Update an existing Flink catalog in Confluent Platform from a resource file.", + Args: cobra.ExactArgs(1), + RunE: c.catalogUpdate, + } + + addCmfFlagSet(cmd) + pcmd.AddOutputFlag(cmd) + + return cmd +} + +func (c *command) catalogUpdate(cmd *cobra.Command, args []string) error { + resourceFilePath := args[0] + + client, err := c.GetCmfClient(cmd) + if err != nil { + return err + } + + sdkCatalog, err := readCatalogResourceFile(resourceFilePath) + if err != nil { + return err + } + + catalogName := sdkCatalog.Metadata.Name + if catalogName == "" { + return fmt.Errorf("catalog name is required: ensure the resource file contains a non-empty \"metadata.name\" field") + } + + if err := client.UpdateCatalog(c.createContext(), catalogName, sdkCatalog); err != nil { + return err + } + + sdkOutputCatalog, err := client.DescribeCatalog(c.createContext(), catalogName) + if err != nil { + return fmt.Errorf("catalog %q was updated successfully, but failed to retrieve updated details: %w", catalogName, err) + } + + return printCatalogOutput(cmd, sdkOutputCatalog) +} diff --git a/pkg/flink/cmf_rest_client.go b/pkg/flink/cmf_rest_client.go index 97d68971eb..158d587a9a 100644 --- a/pkg/flink/cmf_rest_client.go +++ b/pkg/flink/cmf_rest_client.go @@ -565,6 +565,14 @@ func (cmfClient *CmfRestClient) ListCatalog(ctx context.Context) ([]cmfsdk.Kafka return catalogs, nil } +func (cmfClient *CmfRestClient) UpdateCatalog(ctx context.Context, catalogName string, kafkaCatalog cmfsdk.KafkaCatalog) error { + httpResponse, err := cmfClient.SQLApi.UpdateKafkaCatalog(ctx, catalogName).KafkaCatalog(kafkaCatalog).Execute() + if parsedErr := parseSdkError(httpResponse, err); parsedErr != nil { + return fmt.Errorf(`failed to update Kafka Catalog "%s": %s`, catalogName, parsedErr) + } + return nil +} + func (cmfClient *CmfRestClient) DeleteCatalog(ctx context.Context, catalogName string) error { httpResp, err := cmfClient.SQLApi.DeleteKafkaCatalog(ctx, catalogName).Execute() return parseSdkError(httpResp, err) diff --git a/test/fixtures/input/flink/catalog/update-invalid-failure.json b/test/fixtures/input/flink/catalog/update-invalid-failure.json new file mode 100644 index 0000000000..761b3c784d --- /dev/null +++ b/test/fixtures/input/flink/catalog/update-invalid-failure.json @@ -0,0 +1,17 @@ +{ + "apiVersion": "cmf/api/v1/catalog", + "kind": "KafkaCatalog", + "metadata": { + "name": "invalid-catalog" + }, + "spec": { + "kafkaClusters": [ + { + "databaseName": "dev" + }, + { + "databaseName": "prod" + } + ] + } +} diff --git a/test/fixtures/input/flink/catalog/update-invalid-failure.yaml b/test/fixtures/input/flink/catalog/update-invalid-failure.yaml new file mode 100644 index 0000000000..0f763ec3aa --- /dev/null +++ b/test/fixtures/input/flink/catalog/update-invalid-failure.yaml @@ -0,0 +1,8 @@ +apiVersion: cmf/api/v1/catalog +kind: KafkaCatalog +metadata: + name: invalid-catalog +spec: + kafkaClusters: + - databaseName: dev + - databaseName: prod diff --git a/test/fixtures/input/flink/catalog/update-successful.json b/test/fixtures/input/flink/catalog/update-successful.json new file mode 100644 index 0000000000..651a5de54c --- /dev/null +++ b/test/fixtures/input/flink/catalog/update-successful.json @@ -0,0 +1,17 @@ +{ + "apiVersion": "cmf/api/v1/catalog", + "kind": "KafkaCatalog", + "metadata": { + "name": "test-catalog" + }, + "spec": { + "kafkaClusters": [ + { + "databaseName": "dev" + }, + { + "databaseName": "prod" + } + ] + } +} diff --git a/test/fixtures/input/flink/catalog/update-successful.yaml b/test/fixtures/input/flink/catalog/update-successful.yaml new file mode 100644 index 0000000000..d0de02e1ab --- /dev/null +++ b/test/fixtures/input/flink/catalog/update-successful.yaml @@ -0,0 +1,8 @@ +apiVersion: cmf/api/v1/catalog +kind: KafkaCatalog +metadata: + name: test-catalog +spec: + kafkaClusters: + - databaseName: dev + - databaseName: prod diff --git a/test/fixtures/output/flink/catalog/describe-success-json.golden b/test/fixtures/output/flink/catalog/describe-success-json.golden index 783d7c90f3..a5ba5c18cc 100644 --- a/test/fixtures/output/flink/catalog/describe-success-json.golden +++ b/test/fixtures/output/flink/catalog/describe-success-json.golden @@ -1,22 +1,28 @@ { - "apiVersion": "", - "kind": "", + "apiVersion": "cmf/api/v1/catalog", + "kind": "KafkaCatalog", "metadata": { "name": "test-catalog1", - "creationTimestamp": "2025-08-05 12:00:00 +0000 UTC" + "creationTimestamp": "2025-08-05T12:00:00Z" }, "spec": { "srInstance": { - "connectionConfig": null + "connectionConfig": { + "schema.registry.url": "http://localhost:8081" + } }, "kafkaClusters": [ { "databaseName": "test-database", - "connectionConfig": null + "connectionConfig": { + "bootstrap.servers": "localhost:9092" + } }, { "databaseName": "test-database-2", - "connectionConfig": null + "connectionConfig": { + "bootstrap.servers": "localhost:9092" + } } ] } diff --git a/test/fixtures/output/flink/catalog/describe-success-yaml.golden b/test/fixtures/output/flink/catalog/describe-success-yaml.golden index d5215859b5..f1153a026b 100644 --- a/test/fixtures/output/flink/catalog/describe-success-yaml.golden +++ b/test/fixtures/output/flink/catalog/describe-success-yaml.golden @@ -1,13 +1,16 @@ -apiVersion: "" -kind: "" +apiVersion: cmf/api/v1/catalog +kind: KafkaCatalog metadata: name: test-catalog1 - creationTimestamp: 2025-08-05 12:00:00 +0000 UTC + creationTimestamp: "2025-08-05T12:00:00Z" spec: srInstance: - connectionConfig: {} + connectionConfig: + schema.registry.url: http://localhost:8081 kafkaClusters: - databaseName: test-database - connectionConfig: {} + connectionConfig: + bootstrap.servers: localhost:9092 - databaseName: test-database-2 - connectionConfig: {} + connectionConfig: + bootstrap.servers: localhost:9092 diff --git a/test/fixtures/output/flink/catalog/describe-success.golden b/test/fixtures/output/flink/catalog/describe-success.golden index fb05e097a0..369ab8183a 100644 --- a/test/fixtures/output/flink/catalog/describe-success.golden +++ b/test/fixtures/output/flink/catalog/describe-success.golden @@ -1,5 +1,5 @@ +---------------+--------------------------------+ -| Creation Time | 2025-08-05 12:00:00 +0000 UTC | +| Creation Time | 2025-08-05T12:00:00Z | | Name | test-catalog1 | | Databases | test-database, test-database-2 | +---------------+--------------------------------+ diff --git a/test/fixtures/output/flink/catalog/help-onprem.golden b/test/fixtures/output/flink/catalog/help-onprem.golden index cbc8042d2d..0e936a93e9 100644 --- a/test/fixtures/output/flink/catalog/help-onprem.golden +++ b/test/fixtures/output/flink/catalog/help-onprem.golden @@ -8,6 +8,7 @@ Available Commands: delete Delete one or more Flink catalogs in Confluent Platform. describe Describe a Flink catalog in Confluent Platform. list List Flink catalogs in Confluent Platform. + update Update a Flink catalog. Global Flags: -h, --help Show help for this command. diff --git a/test/fixtures/output/flink/catalog/list-json.golden b/test/fixtures/output/flink/catalog/list-json.golden index 26965642a8..418c054b9f 100644 --- a/test/fixtures/output/flink/catalog/list-json.golden +++ b/test/fixtures/output/flink/catalog/list-json.golden @@ -1,69 +1,87 @@ [ { - "apiVersion": "", - "kind": "", + "apiVersion": "cmf/api/v1/catalog", + "kind": "KafkaCatalog", "metadata": { "name": "test-catalog1", - "creationTimestamp": "2025-08-05 12:00:00 +0000 UTC" + "creationTimestamp": "2025-08-05T12:00:00Z" }, "spec": { "srInstance": { - "connectionConfig": null + "connectionConfig": { + "schema.registry.url": "http://localhost:8081" + } }, "kafkaClusters": [ { "databaseName": "test-database", - "connectionConfig": null + "connectionConfig": { + "bootstrap.servers": "localhost:9092" + } }, { "databaseName": "test-database-2", - "connectionConfig": null + "connectionConfig": { + "bootstrap.servers": "localhost:9092" + } } ] } }, { - "apiVersion": "", - "kind": "", + "apiVersion": "cmf/api/v1/catalog", + "kind": "KafkaCatalog", "metadata": { "name": "test-catalog2", - "creationTimestamp": "2025-08-05 12:00:00 +0000 UTC" + "creationTimestamp": "2025-08-05T12:00:00Z" }, "spec": { "srInstance": { - "connectionConfig": null + "connectionConfig": { + "schema.registry.url": "http://localhost:8081" + } }, "kafkaClusters": [ { "databaseName": "test-database", - "connectionConfig": null + "connectionConfig": { + "bootstrap.servers": "localhost:9092" + } }, { "databaseName": "test-database-2", - "connectionConfig": null + "connectionConfig": { + "bootstrap.servers": "localhost:9092" + } } ] } }, { - "apiVersion": "", - "kind": "", + "apiVersion": "cmf/api/v1/catalog", + "kind": "KafkaCatalog", "metadata": { "name": "test-catalog3", - "creationTimestamp": "2025-08-05 12:00:00 +0000 UTC" + "creationTimestamp": "2025-08-05T12:00:00Z" }, "spec": { "srInstance": { - "connectionConfig": null + "connectionConfig": { + "schema.registry.url": "http://localhost:8081" + } }, "kafkaClusters": [ { "databaseName": "test-database", - "connectionConfig": null + "connectionConfig": { + "bootstrap.servers": "localhost:9092" + } }, { "databaseName": "test-database-2", - "connectionConfig": null + "connectionConfig": { + "bootstrap.servers": "localhost:9092" + } } ] } diff --git a/test/fixtures/output/flink/catalog/list-successful.golden b/test/fixtures/output/flink/catalog/list-successful.golden index 1973b7b8d4..db0906ee20 100644 --- a/test/fixtures/output/flink/catalog/list-successful.golden +++ b/test/fixtures/output/flink/catalog/list-successful.golden @@ -1,5 +1,5 @@ - Creation Time | Name | Databases ---------------------------------+---------------+--------------------------------- - 2025-08-05 12:00:00 +0000 UTC | test-catalog1 | test-database, test-database-2 - 2025-08-05 12:00:00 +0000 UTC | test-catalog2 | test-database, test-database-2 - 2025-08-05 12:00:00 +0000 UTC | test-catalog3 | test-database, test-database-2 + Creation Time | Name | Databases +-----------------------+---------------+--------------------------------- + 2025-08-05T12:00:00Z | test-catalog1 | test-database, test-database-2 + 2025-08-05T12:00:00Z | test-catalog2 | test-database, test-database-2 + 2025-08-05T12:00:00Z | test-catalog3 | test-database, test-database-2 diff --git a/test/fixtures/output/flink/catalog/list-yaml.golden b/test/fixtures/output/flink/catalog/list-yaml.golden index 37eb9df2f5..992df17211 100644 --- a/test/fixtures/output/flink/catalog/list-yaml.golden +++ b/test/fixtures/output/flink/catalog/list-yaml.golden @@ -1,39 +1,48 @@ -- apiVersion: "" - kind: "" +- apiVersion: cmf/api/v1/catalog + kind: KafkaCatalog metadata: name: test-catalog1 - creationTimestamp: 2025-08-05 12:00:00 +0000 UTC + creationTimestamp: "2025-08-05T12:00:00Z" spec: srInstance: - connectionConfig: {} + connectionConfig: + schema.registry.url: http://localhost:8081 kafkaClusters: - databaseName: test-database - connectionConfig: {} + connectionConfig: + bootstrap.servers: localhost:9092 - databaseName: test-database-2 - connectionConfig: {} -- apiVersion: "" - kind: "" + connectionConfig: + bootstrap.servers: localhost:9092 +- apiVersion: cmf/api/v1/catalog + kind: KafkaCatalog metadata: name: test-catalog2 - creationTimestamp: 2025-08-05 12:00:00 +0000 UTC + creationTimestamp: "2025-08-05T12:00:00Z" spec: srInstance: - connectionConfig: {} + connectionConfig: + schema.registry.url: http://localhost:8081 kafkaClusters: - databaseName: test-database - connectionConfig: {} + connectionConfig: + bootstrap.servers: localhost:9092 - databaseName: test-database-2 - connectionConfig: {} -- apiVersion: "" - kind: "" + connectionConfig: + bootstrap.servers: localhost:9092 +- apiVersion: cmf/api/v1/catalog + kind: KafkaCatalog metadata: name: test-catalog3 - creationTimestamp: 2025-08-05 12:00:00 +0000 UTC + creationTimestamp: "2025-08-05T12:00:00Z" spec: srInstance: - connectionConfig: {} + connectionConfig: + schema.registry.url: http://localhost:8081 kafkaClusters: - databaseName: test-database - connectionConfig: {} + connectionConfig: + bootstrap.servers: localhost:9092 - databaseName: test-database-2 - connectionConfig: {} + connectionConfig: + bootstrap.servers: localhost:9092 diff --git a/test/fixtures/output/flink/catalog/update-help-onprem.golden b/test/fixtures/output/flink/catalog/update-help-onprem.golden new file mode 100644 index 0000000000..325c3ef05c --- /dev/null +++ b/test/fixtures/output/flink/catalog/update-help-onprem.golden @@ -0,0 +1,16 @@ +Update an existing Kafka Catalog in Confluent Platform from a resource file. + +Usage: + confluent flink catalog update [flags] + +Flags: + --url string Base URL of the Confluent Manager for Apache Flink (CMF). Environment variable "CONFLUENT_CMF_URL" may be set in place of this flag. + --client-key-path string Path to client private key for mTLS authentication. Environment variable "CONFLUENT_CMF_CLIENT_KEY_PATH" may be set in place of this flag. + --client-cert-path string Path to client cert to be verified by Confluent Manager for Apache Flink. Include for mTLS authentication. Environment variable "CONFLUENT_CMF_CLIENT_CERT_PATH" may be set in place of this flag. + --certificate-authority-path string Path to a PEM-encoded Certificate Authority to verify the Confluent Manager for Apache Flink connection. Environment variable "CONFLUENT_CMF_CERTIFICATE_AUTHORITY_PATH" may be set in place of this flag. + -o, --output string Specify the output format as "human", "json", or "yaml". (default "human") + +Global Flags: + -h, --help Show help for this command. + --unsafe-trace Equivalent to -vvvv, but also log HTTP requests and responses which might contain plaintext secrets. + -v, --verbose count Increase verbosity (-v for warn, -vv for info, -vvv for debug, -vvvv for trace). diff --git a/test/fixtures/output/flink/catalog/update-invalid-failure.golden b/test/fixtures/output/flink/catalog/update-invalid-failure.golden new file mode 100644 index 0000000000..235404bf44 --- /dev/null +++ b/test/fixtures/output/flink/catalog/update-invalid-failure.golden @@ -0,0 +1 @@ +Error: failed to update Kafka Catalog "invalid-catalog": The catalog name is invalid diff --git a/test/fixtures/output/flink/catalog/update-success-json.golden b/test/fixtures/output/flink/catalog/update-success-json.golden new file mode 100644 index 0000000000..0f02ca2699 --- /dev/null +++ b/test/fixtures/output/flink/catalog/update-success-json.golden @@ -0,0 +1,29 @@ +{ + "apiVersion": "cmf/api/v1/catalog", + "kind": "KafkaCatalog", + "metadata": { + "name": "test-catalog", + "creationTimestamp": "2025-08-05T12:00:00Z" + }, + "spec": { + "srInstance": { + "connectionConfig": { + "schema.registry.url": "http://localhost:8081" + } + }, + "kafkaClusters": [ + { + "databaseName": "test-database", + "connectionConfig": { + "bootstrap.servers": "localhost:9092" + } + }, + { + "databaseName": "test-database-2", + "connectionConfig": { + "bootstrap.servers": "localhost:9092" + } + } + ] + } +} diff --git a/test/fixtures/output/flink/catalog/update-success-yaml.golden b/test/fixtures/output/flink/catalog/update-success-yaml.golden new file mode 100644 index 0000000000..b9076e194e --- /dev/null +++ b/test/fixtures/output/flink/catalog/update-success-yaml.golden @@ -0,0 +1,16 @@ +apiVersion: cmf/api/v1/catalog +kind: KafkaCatalog +metadata: + name: test-catalog + creationTimestamp: "2025-08-05T12:00:00Z" +spec: + srInstance: + connectionConfig: + schema.registry.url: http://localhost:8081 + kafkaClusters: + - databaseName: test-database + connectionConfig: + bootstrap.servers: localhost:9092 + - databaseName: test-database-2 + connectionConfig: + bootstrap.servers: localhost:9092 diff --git a/test/fixtures/output/flink/catalog/update-success.golden b/test/fixtures/output/flink/catalog/update-success.golden new file mode 100644 index 0000000000..c55f3f5aea --- /dev/null +++ b/test/fixtures/output/flink/catalog/update-success.golden @@ -0,0 +1,5 @@ ++---------------+--------------------------------+ +| Creation Time | 2025-08-05T12:00:00Z | +| Name | test-catalog | +| Databases | test-database, test-database-2 | ++---------------+--------------------------------+ diff --git a/test/flink_onprem_test.go b/test/flink_onprem_test.go index 5536f0a14c..fc77865470 100644 --- a/test/flink_onprem_test.go +++ b/test/flink_onprem_test.go @@ -410,6 +410,19 @@ func (s *CLITestSuite) TestFlinkCatalogListOnPrem() { runIntegrationTestsWithMultipleAuth(s, tests) } +func (s *CLITestSuite) TestFlinkCatalogUpdateOnPrem() { + tests := []CLITest{ + // success + {args: "flink catalog update test/fixtures/input/flink/catalog/update-successful.json", fixture: "flink/catalog/update-success.golden"}, + {args: "flink catalog update test/fixtures/input/flink/catalog/update-successful.json --output json", fixture: "flink/catalog/update-success-json.golden"}, + {args: "flink catalog update test/fixtures/input/flink/catalog/update-successful.json --output yaml", fixture: "flink/catalog/update-success-yaml.golden"}, + // failure + {args: "flink catalog update test/fixtures/input/flink/catalog/update-invalid-failure.json", fixture: "flink/catalog/update-invalid-failure.golden", exitCode: 1}, + } + + runIntegrationTestsWithMultipleAuth(s, tests) +} + func (s *CLITestSuite) TestFlinkStatementCreateOnPrem() { tests := []CLITest{ // success @@ -607,6 +620,19 @@ func (s *CLITestSuite) TestFlinkCatalogCreateWithYAML() { runIntegrationTestsWithMultipleAuth(s, tests) } +func (s *CLITestSuite) TestFlinkCatalogUpdateOnPremWithYAML() { + tests := []CLITest{ + // success + {args: "flink catalog update test/fixtures/input/flink/catalog/update-successful.yaml", fixture: "flink/catalog/update-success.golden"}, + {args: "flink catalog update test/fixtures/input/flink/catalog/update-successful.yaml --output json", fixture: "flink/catalog/update-success-json.golden"}, + {args: "flink catalog update test/fixtures/input/flink/catalog/update-successful.yaml --output yaml", fixture: "flink/catalog/update-success-yaml.golden"}, + // failure + {args: "flink catalog update test/fixtures/input/flink/catalog/update-invalid-failure.yaml", fixture: "flink/catalog/update-invalid-failure.golden", exitCode: 1}, + } + + runIntegrationTestsWithMultipleAuth(s, tests) +} + func (s *CLITestSuite) TestFlinkShellOnPrem() { tests := []flinkShellTest{ { diff --git a/test/test-server/flink_onprem_handler.go b/test/test-server/flink_onprem_handler.go index 3089876db8..7c9ccbd53c 100644 --- a/test/test-server/flink_onprem_handler.go +++ b/test/test-server/flink_onprem_handler.go @@ -205,19 +205,32 @@ func createComputePool(poolName, phase string) cmfsdk.ComputePool { } func createKafkaCatalog(catName string) cmfsdk.KafkaCatalog { - timeStamp := time.Date(2025, time.August, 5, 12, 00, 0, 0, time.UTC).String() + timeStamp := time.Date(2025, time.August, 5, 12, 0, 0, 0, time.UTC).Format(time.RFC3339) return cmfsdk.KafkaCatalog{ + ApiVersion: "cmf/api/v1/catalog", + Kind: "KafkaCatalog", Metadata: cmfsdk.CatalogMetadata{ Name: catName, CreationTimestamp: &timeStamp, }, Spec: cmfsdk.KafkaCatalogSpec{ + SrInstance: cmfsdk.KafkaCatalogSpecSrInstance{ + ConnectionConfig: map[string]string{ + "schema.registry.url": "http://localhost:8081", + }, + }, KafkaClusters: &[]cmfsdk.KafkaCatalogSpecKafkaClusters{ { DatabaseName: "test-database", + ConnectionConfig: map[string]string{ + "bootstrap.servers": "localhost:9092", + }, }, { DatabaseName: "test-database-2", + ConnectionConfig: map[string]string{ + "bootstrap.servers": "localhost:9092", + }, }, }, }, @@ -918,7 +931,7 @@ func handleCmfCatalogs(t *testing.T) http.HandlerFunc { } // Handler for "cmf/api/v1/catalogs/kafka/{catName}" -// Used by describe, delete catalog, no update catalog. +// Used by describe, update, delete catalog. func handleCmfCatalog(t *testing.T) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { handleLoginType(t, r) @@ -937,6 +950,18 @@ func handleCmfCatalog(t *testing.T) http.HandlerFunc { err := json.NewEncoder(w).Encode(catalog) require.NoError(t, err) return + case http.MethodPut: + if catalogName == "invalid-catalog" { + http.Error(w, "The catalog name is invalid", http.StatusNotFound) + return + } + + req := new(cmfsdk.KafkaCatalog) + err := json.NewDecoder(r.Body).Decode(req) + require.NoError(t, err) + + w.WriteHeader(http.StatusOK) + return case http.MethodDelete: if catalogName == "non-exist-catalog" { http.Error(w, "", http.StatusNotFound)