From 0e30b7866b96a378bb0fdf20d3c2993088bbad9a Mon Sep 17 00:00:00 2001 From: Paras Negi Date: Sat, 21 Mar 2026 12:52:16 +0530 Subject: [PATCH 01/16] Add flink catalog database create command --- internal/flink/command_catalog.go | 1 + internal/flink/command_catalog_database.go | 49 +++++++++ .../flink/command_catalog_database_create.go | 102 ++++++++++++++++++ internal/flink/local_types.go | 26 +++++ pkg/flink/cmf_rest_client.go | 9 ++ 5 files changed, 187 insertions(+) create mode 100644 internal/flink/command_catalog_database.go create mode 100644 internal/flink/command_catalog_database_create.go diff --git a/internal/flink/command_catalog.go b/internal/flink/command_catalog.go index 77b540e2e5..8bfe75434f 100644 --- a/internal/flink/command_catalog.go +++ b/internal/flink/command_catalog.go @@ -25,6 +25,7 @@ func (c *command) newCatalogCommand() *cobra.Command { cmd.AddCommand(c.newCatalogDeleteCommand()) cmd.AddCommand(c.newCatalogDescribeCommand()) cmd.AddCommand(c.newCatalogListCommand()) + cmd.AddCommand(c.newCatalogDatabaseCommand()) return cmd } diff --git a/internal/flink/command_catalog_database.go b/internal/flink/command_catalog_database.go new file mode 100644 index 0000000000..8bd27e0c49 --- /dev/null +++ b/internal/flink/command_catalog_database.go @@ -0,0 +1,49 @@ +package flink + +import ( + "github.com/spf13/cobra" + + cmfsdk "github.com/confluentinc/cmf-sdk-go/v1" + + pcmd "github.com/confluentinc/cli/v4/pkg/cmd" +) + +type databaseOut struct { + CreationTime string `human:"Creation Time" serialized:"creation_time"` + Name string `human:"Name" serialized:"name"` + Catalog string `human:"Catalog" serialized:"catalog"` +} + +func (c *command) newCatalogDatabaseCommand() *cobra.Command { + cmd := &cobra.Command{ + Use: "database", + Short: "Manage Flink databases in Confluent Platform.", + Annotations: map[string]string{pcmd.RunRequirement: pcmd.RequireCloudLogout}, + } + + cmd.AddCommand(c.newCatalogDatabaseCreateCommand()) + + return cmd +} + +func convertSdkDatabaseToLocalDatabase(sdkDatabase cmfsdk.KafkaDatabase) LocalKafkaDatabase { + return LocalKafkaDatabase{ + ApiVersion: sdkDatabase.ApiVersion, + Kind: sdkDatabase.Kind, + Metadata: LocalDatabaseMetadata{ + Name: sdkDatabase.Metadata.Name, + CreationTimestamp: sdkDatabase.Metadata.CreationTimestamp, + UpdateTimestamp: sdkDatabase.Metadata.UpdateTimestamp, + Uid: sdkDatabase.Metadata.Uid, + Labels: sdkDatabase.Metadata.Labels, + Annotations: sdkDatabase.Metadata.Annotations, + }, + Spec: LocalKafkaDatabaseSpec{ + KafkaCluster: LocalKafkaDatabaseSpecKafkaCluster{ + ConnectionConfig: sdkDatabase.Spec.KafkaCluster.ConnectionConfig, + ConnectionSecretId: sdkDatabase.Spec.KafkaCluster.ConnectionSecretId, + }, + AlterEnvironments: sdkDatabase.Spec.AlterEnvironments, + }, + } +} diff --git a/internal/flink/command_catalog_database_create.go b/internal/flink/command_catalog_database_create.go new file mode 100644 index 0000000000..4bd6dd3972 --- /dev/null +++ b/internal/flink/command_catalog_database_create.go @@ -0,0 +1,102 @@ +package flink + +import ( + "encoding/json" + "fmt" + "os" + "path/filepath" + + "github.com/spf13/cobra" + "gopkg.in/yaml.v3" + + cmfsdk "github.com/confluentinc/cmf-sdk-go/v1" + + pcmd "github.com/confluentinc/cli/v4/pkg/cmd" + "github.com/confluentinc/cli/v4/pkg/errors" + "github.com/confluentinc/cli/v4/pkg/output" +) + +func (c *command) newCatalogDatabaseCreateCommand() *cobra.Command { + cmd := &cobra.Command{ + Use: "create ", + Short: "Create a Flink database.", + Long: "Create a Flink database in a catalog in Confluent Platform.", + Args: cobra.ExactArgs(1), + RunE: c.catalogDatabaseCreate, + } + + cmd.Flags().String("catalog", "", "Name of the catalog.") + cobra.CheckErr(cmd.MarkFlagRequired("catalog")) + addCmfFlagSet(cmd) + pcmd.AddOutputFlag(cmd) + + return cmd +} + +func (c *command) catalogDatabaseCreate(cmd *cobra.Command, args []string) error { + resourceFilePath := args[0] + + catalogName, err := cmd.Flags().GetString("catalog") + if err != nil { + return err + } + + client, err := c.GetCmfClient(cmd) + if err != nil { + return err + } + + data, err := os.ReadFile(resourceFilePath) + if err != nil { + return fmt.Errorf("failed to read file: %v", err) + } + + var genericData map[string]interface{} + ext := filepath.Ext(resourceFilePath) + switch ext { + case ".json": + err = json.Unmarshal(data, &genericData) + case ".yaml", ".yml": + err = yaml.Unmarshal(data, &genericData) + default: + return errors.NewErrorWithSuggestions( + fmt.Sprintf("unsupported file format: %s", ext), + "Supported file formats are .json, .yaml, and .yml.", + ) + } + if err != nil { + return err + } + + jsonBytes, err := json.Marshal(genericData) + if err != nil { + return fmt.Errorf("failed to marshal intermediate data: %w", err) + } + + var sdkDatabase cmfsdk.KafkaDatabase + if err = json.Unmarshal(jsonBytes, &sdkDatabase); err != nil { + return fmt.Errorf("failed to bind data to KafkaDatabase model: %w", err) + } + + sdkOutputDatabase, err := client.CreateDatabase(c.createContext(), catalogName, sdkDatabase) + if err != nil { + return err + } + + if output.GetFormat(cmd) == output.Human { + table := output.NewTable(cmd) + var creationTime string + if sdkOutputDatabase.GetMetadata().CreationTimestamp != nil { + creationTime = *sdkOutputDatabase.GetMetadata().CreationTimestamp + } + table.Add(&databaseOut{ + CreationTime: creationTime, + Name: sdkOutputDatabase.GetMetadata().Name, + Catalog: catalogName, + }) + return table.Print() + } + + localDatabase := convertSdkDatabaseToLocalDatabase(sdkOutputDatabase) + return output.SerializedOutput(cmd, localDatabase) +} diff --git a/internal/flink/local_types.go b/internal/flink/local_types.go index 19e0b756d9..6342966129 100644 --- a/internal/flink/local_types.go +++ b/internal/flink/local_types.go @@ -142,6 +142,32 @@ type LocalKafkaCatalogSpecSrInstance struct { ConnectionSecretId *string `json:"connectionSecretId,omitempty" yaml:"connectionSecretId,omitempty"` } +type LocalKafkaDatabase struct { + ApiVersion string `json:"apiVersion" yaml:"apiVersion"` + Kind string `json:"kind" yaml:"kind"` + Metadata LocalDatabaseMetadata `json:"metadata" yaml:"metadata"` + Spec LocalKafkaDatabaseSpec `json:"spec" yaml:"spec"` +} + +type LocalDatabaseMetadata struct { + Name string `json:"name" yaml:"name"` + CreationTimestamp *string `json:"creationTimestamp,omitempty" yaml:"creationTimestamp,omitempty"` + UpdateTimestamp *string `json:"updateTimestamp,omitempty" yaml:"updateTimestamp,omitempty"` + Uid *string `json:"uid,omitempty" yaml:"uid,omitempty"` + Labels *map[string]string `json:"labels,omitempty" yaml:"labels,omitempty"` + Annotations *map[string]string `json:"annotations,omitempty" yaml:"annotations,omitempty"` +} + +type LocalKafkaDatabaseSpec struct { + KafkaCluster LocalKafkaDatabaseSpecKafkaCluster `json:"kafkaCluster" yaml:"kafkaCluster"` + AlterEnvironments *[]string `json:"alterEnvironments,omitempty" yaml:"alterEnvironments,omitempty"` +} + +type LocalKafkaDatabaseSpecKafkaCluster struct { + ConnectionConfig map[string]string `json:"connectionConfig" yaml:"connectionConfig"` + ConnectionSecretId *string `json:"connectionSecretId,omitempty" yaml:"connectionSecretId,omitempty"` +} + type LocalResultSchema struct { Columns []LocalResultSchemaColumn `json:"columns" yaml:"columns"` } diff --git a/pkg/flink/cmf_rest_client.go b/pkg/flink/cmf_rest_client.go index 97d68971eb..0ca0282a47 100644 --- a/pkg/flink/cmf_rest_client.go +++ b/pkg/flink/cmf_rest_client.go @@ -570,6 +570,15 @@ func (cmfClient *CmfRestClient) DeleteCatalog(ctx context.Context, catalogName s return parseSdkError(httpResp, err) } +func (cmfClient *CmfRestClient) CreateDatabase(ctx context.Context, catalogName string, kafkaDatabase cmfsdk.KafkaDatabase) (cmfsdk.KafkaDatabase, error) { + databaseName := kafkaDatabase.Metadata.Name + outputDatabase, httpResponse, err := cmfClient.SQLApi.CreateKafkaDatabase(ctx, catalogName).KafkaDatabase(kafkaDatabase).Execute() + if parsedErr := parseSdkError(httpResponse, err); parsedErr != nil { + return cmfsdk.KafkaDatabase{}, fmt.Errorf(`failed to create database "%s" in catalog "%s": %s`, databaseName, catalogName, parsedErr) + } + return outputDatabase, nil +} + // Returns the next page number and whether we need to fetch more pages or not. func extractPageOptions(receivedItemsLength int, currentPageNumber int32) (int32, bool) { if receivedItemsLength == 0 { From 96a12c9676d499ca3576bbed8eb4b5362a0f392d Mon Sep 17 00:00:00 2001 From: Paras Negi Date: Mon, 23 Mar 2026 17:37:28 +0530 Subject: [PATCH 02/16] Add golden test files for flink catalog database command --- .../catalog/database/create-help-onprem.golden | 17 +++++++++++++++++ .../flink/catalog/database/help-onprem.golden | 14 ++++++++++++++ .../output/flink/catalog/help-onprem.golden | 1 + 3 files changed, 32 insertions(+) create mode 100644 test/fixtures/output/flink/catalog/database/create-help-onprem.golden create mode 100644 test/fixtures/output/flink/catalog/database/help-onprem.golden diff --git a/test/fixtures/output/flink/catalog/database/create-help-onprem.golden b/test/fixtures/output/flink/catalog/database/create-help-onprem.golden new file mode 100644 index 0000000000..4b0f03902e --- /dev/null +++ b/test/fixtures/output/flink/catalog/database/create-help-onprem.golden @@ -0,0 +1,17 @@ +Create a Flink database in a catalog in Confluent Platform. + +Usage: + confluent flink catalog database create [flags] + +Flags: + --catalog string REQUIRED: Name of the catalog. + --url string Base URL of the Confluent Manager for Apache Flink (CMF). Environment variable "CONFLUENT_CMF_URL" may be set in place of this flag. + --client-key-path string Path to client private key for mTLS authentication. Environment variable "CONFLUENT_CMF_CLIENT_KEY_PATH" may be set in place of this flag. + --client-cert-path string Path to client cert to be verified by Confluent Manager for Apache Flink. Include for mTLS authentication. Environment variable "CONFLUENT_CMF_CLIENT_CERT_PATH" may be set in place of this flag. + --certificate-authority-path string Path to a PEM-encoded Certificate Authority to verify the Confluent Manager for Apache Flink connection. Environment variable "CONFLUENT_CMF_CERTIFICATE_AUTHORITY_PATH" may be set in place of this flag. + -o, --output string Specify the output format as "human", "json", or "yaml". (default "human") + +Global Flags: + -h, --help Show help for this command. + --unsafe-trace Equivalent to -vvvv, but also log HTTP requests and responses which might contain plaintext secrets. + -v, --verbose count Increase verbosity (-v for warn, -vv for info, -vvv for debug, -vvvv for trace). diff --git a/test/fixtures/output/flink/catalog/database/help-onprem.golden b/test/fixtures/output/flink/catalog/database/help-onprem.golden new file mode 100644 index 0000000000..2c152416ae --- /dev/null +++ b/test/fixtures/output/flink/catalog/database/help-onprem.golden @@ -0,0 +1,14 @@ +Manage Flink databases in Confluent Platform. + +Usage: + confluent flink catalog database [command] + +Available Commands: + create Create a Flink database. + +Global Flags: + -h, --help Show help for this command. + --unsafe-trace Equivalent to -vvvv, but also log HTTP requests and responses which might contain plaintext secrets. + -v, --verbose count Increase verbosity (-v for warn, -vv for info, -vvv for debug, -vvvv for trace). + +Use "confluent flink catalog database [command] --help" for more information about a command. diff --git a/test/fixtures/output/flink/catalog/help-onprem.golden b/test/fixtures/output/flink/catalog/help-onprem.golden index cbc8042d2d..1ce122027c 100644 --- a/test/fixtures/output/flink/catalog/help-onprem.golden +++ b/test/fixtures/output/flink/catalog/help-onprem.golden @@ -5,6 +5,7 @@ Usage: Available Commands: create Create a Flink catalog. + database Manage Flink databases in Confluent Platform. delete Delete one or more Flink catalogs in Confluent Platform. describe Describe a Flink catalog in Confluent Platform. list List Flink catalogs in Confluent Platform. From 33e3b892c84bf99eb9e60680b0b002818cfea0e3 Mon Sep 17 00:00:00 2001 From: Paras Negi Date: Tue, 24 Mar 2026 15:34:33 +0530 Subject: [PATCH 03/16] Add integration tests and fix golden files for flink catalog database command --- .../database/create-invalid-failure.json | 12 +++++ .../catalog/database/create-successful.json | 14 ++++++ .../database/create-invalid-failure.golden | 1 + .../database/create-success-json.golden | 15 ++++++ .../catalog/database/create-success.golden | 5 ++ test/flink_onprem_test.go | 12 +++++ test/test-server/flink_onprem_handler.go | 48 +++++++++++++++++++ test/test-server/flink_onprem_router.go | 1 + 8 files changed, 108 insertions(+) create mode 100644 test/fixtures/input/flink/catalog/database/create-invalid-failure.json create mode 100644 test/fixtures/input/flink/catalog/database/create-successful.json create mode 100644 test/fixtures/output/flink/catalog/database/create-invalid-failure.golden create mode 100644 test/fixtures/output/flink/catalog/database/create-success-json.golden create mode 100644 test/fixtures/output/flink/catalog/database/create-success.golden diff --git a/test/fixtures/input/flink/catalog/database/create-invalid-failure.json b/test/fixtures/input/flink/catalog/database/create-invalid-failure.json new file mode 100644 index 0000000000..e45d244060 --- /dev/null +++ b/test/fixtures/input/flink/catalog/database/create-invalid-failure.json @@ -0,0 +1,12 @@ +{ + "apiVersion": "cmf/api/v1/database", + "kind": "KafkaDatabase", + "metadata": { + "name": "invalid-database" + }, + "spec": { + "kafkaCluster": { + "connectionConfig": {} + } + } +} diff --git a/test/fixtures/input/flink/catalog/database/create-successful.json b/test/fixtures/input/flink/catalog/database/create-successful.json new file mode 100644 index 0000000000..4422ebedb8 --- /dev/null +++ b/test/fixtures/input/flink/catalog/database/create-successful.json @@ -0,0 +1,14 @@ +{ + "apiVersion": "cmf/api/v1/database", + "kind": "KafkaDatabase", + "metadata": { + "name": "test-database" + }, + "spec": { + "kafkaCluster": { + "connectionConfig": { + "bootstrap.servers": "localhost:9092" + } + } + } +} diff --git a/test/fixtures/output/flink/catalog/database/create-invalid-failure.golden b/test/fixtures/output/flink/catalog/database/create-invalid-failure.golden new file mode 100644 index 0000000000..d2b4965a8a --- /dev/null +++ b/test/fixtures/output/flink/catalog/database/create-invalid-failure.golden @@ -0,0 +1 @@ +Error: failed to create database "invalid-database" in catalog "test-catalog": The Kafka database object from resource file is invalid diff --git a/test/fixtures/output/flink/catalog/database/create-success-json.golden b/test/fixtures/output/flink/catalog/database/create-success-json.golden new file mode 100644 index 0000000000..065e5eba7c --- /dev/null +++ b/test/fixtures/output/flink/catalog/database/create-success-json.golden @@ -0,0 +1,15 @@ +{ + "apiVersion": "cmf/api/v1/database", + "kind": "KafkaDatabase", + "metadata": { + "name": "test-database", + "creationTimestamp": "2025-03-12 23:42:00 +0000 UTC" + }, + "spec": { + "kafkaCluster": { + "connectionConfig": { + "bootstrap.servers": "localhost:9092" + } + } + } +} diff --git a/test/fixtures/output/flink/catalog/database/create-success.golden b/test/fixtures/output/flink/catalog/database/create-success.golden new file mode 100644 index 0000000000..b1b1a2a1ff --- /dev/null +++ b/test/fixtures/output/flink/catalog/database/create-success.golden @@ -0,0 +1,5 @@ ++---------------+-------------------------------+ +| Creation Time | 2025-03-12 23:42:00 +0000 UTC | +| Name | test-database | +| Catalog | test-catalog | ++---------------+-------------------------------+ diff --git a/test/flink_onprem_test.go b/test/flink_onprem_test.go index 5536f0a14c..0f7d956714 100644 --- a/test/flink_onprem_test.go +++ b/test/flink_onprem_test.go @@ -410,6 +410,18 @@ func (s *CLITestSuite) TestFlinkCatalogListOnPrem() { runIntegrationTestsWithMultipleAuth(s, tests) } +func (s *CLITestSuite) TestFlinkCatalogDatabaseCreateOnPrem() { + tests := []CLITest{ + // success + {args: "flink catalog database create test/fixtures/input/flink/catalog/database/create-successful.json --catalog test-catalog", fixture: "flink/catalog/database/create-success.golden"}, + {args: "flink catalog database create test/fixtures/input/flink/catalog/database/create-successful.json --catalog test-catalog --output json", fixture: "flink/catalog/database/create-success-json.golden"}, + // failure + {args: "flink catalog database create test/fixtures/input/flink/catalog/database/create-invalid-failure.json --catalog test-catalog", fixture: "flink/catalog/database/create-invalid-failure.golden", exitCode: 1}, + } + + runIntegrationTestsWithMultipleAuth(s, tests) +} + func (s *CLITestSuite) TestFlinkStatementCreateOnPrem() { tests := []CLITest{ // success diff --git a/test/test-server/flink_onprem_handler.go b/test/test-server/flink_onprem_handler.go index 3089876db8..f2c5f031bc 100644 --- a/test/test-server/flink_onprem_handler.go +++ b/test/test-server/flink_onprem_handler.go @@ -224,6 +224,54 @@ func createKafkaCatalog(catName string) cmfsdk.KafkaCatalog { } } +func createKafkaDatabase(dbName string) cmfsdk.KafkaDatabase { + timeStamp := time.Date(2025, time.August, 5, 12, 0, 0, 0, time.UTC).String() + return cmfsdk.KafkaDatabase{ + ApiVersion: "cmf/api/v1/database", + Kind: "KafkaDatabase", + Metadata: cmfsdk.DatabaseMetadata{ + Name: dbName, + CreationTimestamp: &timeStamp, + }, + Spec: cmfsdk.KafkaDatabaseSpec{ + KafkaCluster: cmfsdk.KafkaDatabaseSpecKafkaCluster{ + ConnectionConfig: map[string]string{ + "bootstrap.servers": "localhost:9092", + }, + }, + }, + } +} + +func handleCmfCatalogDatabases(t *testing.T) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + handleLoginType(t, r) + switch r.Method { + case http.MethodPost: + reqBody, err := io.ReadAll(r.Body) + require.NoError(t, err) + var database cmfsdk.KafkaDatabase + err = json.Unmarshal(reqBody, &database) + require.NoError(t, err) + + dbName := database.GetMetadata().Name + + if dbName == "invalid-database" { + http.Error(w, "The Kafka database object from resource file is invalid", http.StatusUnprocessableEntity) + return + } + + timeStamp := time.Date(2025, time.March, 12, 23, 42, 0, 0, time.UTC).String() + database.Metadata.CreationTimestamp = &timeStamp + err = json.NewEncoder(w).Encode(database) + require.NoError(t, err) + return + default: + require.Fail(t, fmt.Sprintf("Unexpected method %s", r.Method)) + } + } +} + func createFlinkStatement(stmtName string, stopped bool, parallelism int32) cmfsdk.Statement { timeStamp := time.Date(2025, time.August, 5, 12, 00, 0, 0, time.UTC).String() status := cmfsdk.StatementStatus{ diff --git a/test/test-server/flink_onprem_router.go b/test/test-server/flink_onprem_router.go index 2267f01d5a..a565b7d3f5 100644 --- a/test/test-server/flink_onprem_router.go +++ b/test/test-server/flink_onprem_router.go @@ -9,6 +9,7 @@ import ( var flinkRoutes = []route{ {"/cmf/api/v1/catalogs/kafka", handleCmfCatalogs}, {"/cmf/api/v1/catalogs/kafka/{catName}", handleCmfCatalog}, + {"/cmf/api/v1/catalogs/kafka/{catName}/databases", handleCmfCatalogDatabases}, {"/cmf/api/v1/environments/{environment}/applications", handleCmfApplications}, {"/cmf/api/v1/environments/{environment}/applications/{application}", handleCmfApplication}, {"/cmf/api/v1/environments", handleCmfEnvironments}, From 36c006d225785d404cb19be2525a2230dd90afbf Mon Sep 17 00:00:00 2001 From: Paras Negi Date: Tue, 24 Mar 2026 15:58:19 +0530 Subject: [PATCH 04/16] Fix flink_onprem_handler.go --- test/test-server/flink_onprem_handler.go | 19 ------------------- 1 file changed, 19 deletions(-) diff --git a/test/test-server/flink_onprem_handler.go b/test/test-server/flink_onprem_handler.go index f2c5f031bc..1aec24fa3a 100644 --- a/test/test-server/flink_onprem_handler.go +++ b/test/test-server/flink_onprem_handler.go @@ -224,25 +224,6 @@ func createKafkaCatalog(catName string) cmfsdk.KafkaCatalog { } } -func createKafkaDatabase(dbName string) cmfsdk.KafkaDatabase { - timeStamp := time.Date(2025, time.August, 5, 12, 0, 0, 0, time.UTC).String() - return cmfsdk.KafkaDatabase{ - ApiVersion: "cmf/api/v1/database", - Kind: "KafkaDatabase", - Metadata: cmfsdk.DatabaseMetadata{ - Name: dbName, - CreationTimestamp: &timeStamp, - }, - Spec: cmfsdk.KafkaDatabaseSpec{ - KafkaCluster: cmfsdk.KafkaDatabaseSpecKafkaCluster{ - ConnectionConfig: map[string]string{ - "bootstrap.servers": "localhost:9092", - }, - }, - }, - } -} - func handleCmfCatalogDatabases(t *testing.T) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { handleLoginType(t, r) From d86fb7ef9fb47712494b9b3c6f7af6fc79ac4ab8 Mon Sep 17 00:00:00 2001 From: Paras Negi Date: Mon, 30 Mar 2026 09:26:11 +0530 Subject: [PATCH 05/16] Add flink catalog database list command --- internal/flink/command_catalog_database.go | 1 + .../flink/command_catalog_database_list.go | 64 +++++++++++++++++++ pkg/flink/cmf_rest_client.go | 18 ++++++ .../database/create-success-yaml.golden | 9 +++ .../flink/catalog/database/help-onprem.golden | 1 + .../catalog/database/list-help-onprem.golden | 17 +++++ .../catalog/database/list-success-json.golden | 32 ++++++++++ .../catalog/database/list-success-yaml.golden | 18 ++++++ .../catalog/database/list-success.golden | 4 ++ test/flink_onprem_test.go | 12 ++++ test/test-server/flink_onprem_handler.go | 10 +++ 11 files changed, 186 insertions(+) create mode 100644 internal/flink/command_catalog_database_list.go create mode 100644 test/fixtures/output/flink/catalog/database/create-success-yaml.golden create mode 100644 test/fixtures/output/flink/catalog/database/list-help-onprem.golden create mode 100644 test/fixtures/output/flink/catalog/database/list-success-json.golden create mode 100644 test/fixtures/output/flink/catalog/database/list-success-yaml.golden create mode 100644 test/fixtures/output/flink/catalog/database/list-success.golden diff --git a/internal/flink/command_catalog_database.go b/internal/flink/command_catalog_database.go index 8bd27e0c49..f0aa00182a 100644 --- a/internal/flink/command_catalog_database.go +++ b/internal/flink/command_catalog_database.go @@ -22,6 +22,7 @@ func (c *command) newCatalogDatabaseCommand() *cobra.Command { } cmd.AddCommand(c.newCatalogDatabaseCreateCommand()) + cmd.AddCommand(c.newCatalogDatabaseListCommand()) return cmd } diff --git a/internal/flink/command_catalog_database_list.go b/internal/flink/command_catalog_database_list.go new file mode 100644 index 0000000000..2e0ff82f6e --- /dev/null +++ b/internal/flink/command_catalog_database_list.go @@ -0,0 +1,64 @@ +package flink + +import ( + "github.com/spf13/cobra" + + pcmd "github.com/confluentinc/cli/v4/pkg/cmd" + "github.com/confluentinc/cli/v4/pkg/output" +) + +func (c *command) newCatalogDatabaseListCommand() *cobra.Command { + cmd := &cobra.Command{ + Use: "list", + Short: "List Flink databases in a catalog in Confluent Platform.", + Args: cobra.NoArgs, + RunE: c.catalogDatabaseList, + } + + cmd.Flags().String("catalog", "", "Name of the catalog.") + cobra.CheckErr(cmd.MarkFlagRequired("catalog")) + addCmfFlagSet(cmd) + pcmd.AddOutputFlag(cmd) + + return cmd +} + +func (c *command) catalogDatabaseList(cmd *cobra.Command, _ []string) error { + catalogName, err := cmd.Flags().GetString("catalog") + if err != nil { + return err + } + + client, err := c.GetCmfClient(cmd) + if err != nil { + return err + } + + sdkDatabases, err := client.ListDatabases(c.createContext(), catalogName) + if err != nil { + return err + } + + if output.GetFormat(cmd) == output.Human { + list := output.NewList(cmd) + for _, db := range sdkDatabases { + var creationTime string + if db.GetMetadata().CreationTimestamp != nil { + creationTime = *db.GetMetadata().CreationTimestamp + } + list.Add(&databaseOut{ + CreationTime: creationTime, + Name: db.GetMetadata().Name, + Catalog: catalogName, + }) + } + return list.Print() + } + + localDatabases := make([]LocalKafkaDatabase, 0, len(sdkDatabases)) + for _, sdkDatabase := range sdkDatabases { + localDatabases = append(localDatabases, convertSdkDatabaseToLocalDatabase(sdkDatabase)) + } + + return output.SerializedOutput(cmd, localDatabases) +} diff --git a/pkg/flink/cmf_rest_client.go b/pkg/flink/cmf_rest_client.go index 0ca0282a47..d1c86800e5 100644 --- a/pkg/flink/cmf_rest_client.go +++ b/pkg/flink/cmf_rest_client.go @@ -579,6 +579,24 @@ func (cmfClient *CmfRestClient) CreateDatabase(ctx context.Context, catalogName return outputDatabase, nil } +func (cmfClient *CmfRestClient) ListDatabases(ctx context.Context, catalogName string) ([]cmfsdk.KafkaDatabase, error) { + databases := make([]cmfsdk.KafkaDatabase, 0) + done := false + const pageSize = 100 + var currentPageNumber int32 = 0 + + for !done { + databasePage, httpResponse, err := cmfClient.SQLApi.GetKafkaDatabases(ctx, catalogName).Page(currentPageNumber).Size(pageSize).Execute() + if parsedErr := parseSdkError(httpResponse, err); parsedErr != nil { + return nil, fmt.Errorf(`failed to list databases in catalog "%s": %s`, catalogName, parsedErr) + } + databases = append(databases, databasePage.GetItems()...) + currentPageNumber, done = extractPageOptions(len(databasePage.GetItems()), currentPageNumber) + } + + return databases, nil +} + // Returns the next page number and whether we need to fetch more pages or not. func extractPageOptions(receivedItemsLength int, currentPageNumber int32) (int32, bool) { if receivedItemsLength == 0 { diff --git a/test/fixtures/output/flink/catalog/database/create-success-yaml.golden b/test/fixtures/output/flink/catalog/database/create-success-yaml.golden new file mode 100644 index 0000000000..0d9dc39dc6 --- /dev/null +++ b/test/fixtures/output/flink/catalog/database/create-success-yaml.golden @@ -0,0 +1,9 @@ +apiVersion: cmf/api/v1/database +kind: KafkaDatabase +metadata: + name: test-database + creationTimestamp: 2025-03-12 23:42:00 +0000 UTC +spec: + kafkaCluster: + connectionConfig: + bootstrap.servers: localhost:9092 diff --git a/test/fixtures/output/flink/catalog/database/help-onprem.golden b/test/fixtures/output/flink/catalog/database/help-onprem.golden index 2c152416ae..f6edf74b3f 100644 --- a/test/fixtures/output/flink/catalog/database/help-onprem.golden +++ b/test/fixtures/output/flink/catalog/database/help-onprem.golden @@ -5,6 +5,7 @@ Usage: Available Commands: create Create a Flink database. + list List Flink databases in a catalog in Confluent Platform. Global Flags: -h, --help Show help for this command. diff --git a/test/fixtures/output/flink/catalog/database/list-help-onprem.golden b/test/fixtures/output/flink/catalog/database/list-help-onprem.golden new file mode 100644 index 0000000000..4dbf71ed16 --- /dev/null +++ b/test/fixtures/output/flink/catalog/database/list-help-onprem.golden @@ -0,0 +1,17 @@ +List Flink databases in a catalog in Confluent Platform. + +Usage: + confluent flink catalog database list [flags] + +Flags: + --catalog string REQUIRED: Name of the catalog. + --url string Base URL of the Confluent Manager for Apache Flink (CMF). Environment variable "CONFLUENT_CMF_URL" may be set in place of this flag. + --client-key-path string Path to client private key for mTLS authentication. Environment variable "CONFLUENT_CMF_CLIENT_KEY_PATH" may be set in place of this flag. + --client-cert-path string Path to client cert to be verified by Confluent Manager for Apache Flink. Include for mTLS authentication. Environment variable "CONFLUENT_CMF_CLIENT_CERT_PATH" may be set in place of this flag. + --certificate-authority-path string Path to a PEM-encoded Certificate Authority to verify the Confluent Manager for Apache Flink connection. Environment variable "CONFLUENT_CMF_CERTIFICATE_AUTHORITY_PATH" may be set in place of this flag. + -o, --output string Specify the output format as "human", "json", or "yaml". (default "human") + +Global Flags: + -h, --help Show help for this command. + --unsafe-trace Equivalent to -vvvv, but also log HTTP requests and responses which might contain plaintext secrets. + -v, --verbose count Increase verbosity (-v for warn, -vv for info, -vvv for debug, -vvvv for trace). diff --git a/test/fixtures/output/flink/catalog/database/list-success-json.golden b/test/fixtures/output/flink/catalog/database/list-success-json.golden new file mode 100644 index 0000000000..42e40f7379 --- /dev/null +++ b/test/fixtures/output/flink/catalog/database/list-success-json.golden @@ -0,0 +1,32 @@ +[ + { + "apiVersion": "cmf/api/v1/database", + "kind": "KafkaDatabase", + "metadata": { + "name": "test-database-1", + "creationTimestamp": "2025-08-05 12:00:00 +0000 UTC" + }, + "spec": { + "kafkaCluster": { + "connectionConfig": { + "bootstrap.servers": "localhost:9092" + } + } + } + }, + { + "apiVersion": "cmf/api/v1/database", + "kind": "KafkaDatabase", + "metadata": { + "name": "test-database-2", + "creationTimestamp": "2025-08-05 12:00:00 +0000 UTC" + }, + "spec": { + "kafkaCluster": { + "connectionConfig": { + "bootstrap.servers": "localhost:9092" + } + } + } + } +] diff --git a/test/fixtures/output/flink/catalog/database/list-success-yaml.golden b/test/fixtures/output/flink/catalog/database/list-success-yaml.golden new file mode 100644 index 0000000000..186a556114 --- /dev/null +++ b/test/fixtures/output/flink/catalog/database/list-success-yaml.golden @@ -0,0 +1,18 @@ +- apiVersion: cmf/api/v1/database + kind: KafkaDatabase + metadata: + name: test-database-1 + creationTimestamp: 2025-08-05 12:00:00 +0000 UTC + spec: + kafkaCluster: + connectionConfig: + bootstrap.servers: localhost:9092 +- apiVersion: cmf/api/v1/database + kind: KafkaDatabase + metadata: + name: test-database-2 + creationTimestamp: 2025-08-05 12:00:00 +0000 UTC + spec: + kafkaCluster: + connectionConfig: + bootstrap.servers: localhost:9092 diff --git a/test/fixtures/output/flink/catalog/database/list-success.golden b/test/fixtures/output/flink/catalog/database/list-success.golden new file mode 100644 index 0000000000..a3a7ea5545 --- /dev/null +++ b/test/fixtures/output/flink/catalog/database/list-success.golden @@ -0,0 +1,4 @@ + Creation Time | Name | Catalog +--------------------------------+------------------+-------------- + 2025-08-05 12:00:00 +0000 UTC | test-database-1 | test-catalog + 2025-08-05 12:00:00 +0000 UTC | test-database-2 | test-catalog diff --git a/test/flink_onprem_test.go b/test/flink_onprem_test.go index 0f7d956714..55be5662b1 100644 --- a/test/flink_onprem_test.go +++ b/test/flink_onprem_test.go @@ -415,6 +415,7 @@ func (s *CLITestSuite) TestFlinkCatalogDatabaseCreateOnPrem() { // success {args: "flink catalog database create test/fixtures/input/flink/catalog/database/create-successful.json --catalog test-catalog", fixture: "flink/catalog/database/create-success.golden"}, {args: "flink catalog database create test/fixtures/input/flink/catalog/database/create-successful.json --catalog test-catalog --output json", fixture: "flink/catalog/database/create-success-json.golden"}, + {args: "flink catalog database create test/fixtures/input/flink/catalog/database/create-successful.json --catalog test-catalog --output yaml", fixture: "flink/catalog/database/create-success-yaml.golden"}, // failure {args: "flink catalog database create test/fixtures/input/flink/catalog/database/create-invalid-failure.json --catalog test-catalog", fixture: "flink/catalog/database/create-invalid-failure.golden", exitCode: 1}, } @@ -422,6 +423,17 @@ func (s *CLITestSuite) TestFlinkCatalogDatabaseCreateOnPrem() { runIntegrationTestsWithMultipleAuth(s, tests) } +func (s *CLITestSuite) TestFlinkCatalogDatabaseListOnPrem() { + tests := []CLITest{ + // success + {args: "flink catalog database list --catalog test-catalog", fixture: "flink/catalog/database/list-success.golden"}, + {args: "flink catalog database list --catalog test-catalog --output json", fixture: "flink/catalog/database/list-success-json.golden"}, + {args: "flink catalog database list --catalog test-catalog --output yaml", fixture: "flink/catalog/database/list-success-yaml.golden"}, + } + + runIntegrationTestsWithMultipleAuth(s, tests) +} + func (s *CLITestSuite) TestFlinkStatementCreateOnPrem() { tests := []CLITest{ // success diff --git a/test/test-server/flink_onprem_handler.go b/test/test-server/flink_onprem_handler.go index 1aec24fa3a..44d302f478 100644 --- a/test/test-server/flink_onprem_handler.go +++ b/test/test-server/flink_onprem_handler.go @@ -228,6 +228,16 @@ func handleCmfCatalogDatabases(t *testing.T) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { handleLoginType(t, r) switch r.Method { + case http.MethodGet: + databases := []cmfsdk.KafkaDatabase{ + createKafkaDatabase("test-database-1"), + createKafkaDatabase("test-database-2"), + } + page := cmfsdk.KafkaDatabasesPage{ + Items: &databases, + } + err := json.NewEncoder(w).Encode(page) + require.NoError(t, err) case http.MethodPost: reqBody, err := io.ReadAll(r.Body) require.NoError(t, err) From 2c09071aec6900756f02e4aefbb6393f2fe1b1b8 Mon Sep 17 00:00:00 2001 From: Paras Negi Date: Mon, 30 Mar 2026 09:35:02 +0530 Subject: [PATCH 06/16] Fix build --- test/test-server/flink_onprem_handler.go | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/test/test-server/flink_onprem_handler.go b/test/test-server/flink_onprem_handler.go index 44d302f478..d9f8f67fe4 100644 --- a/test/test-server/flink_onprem_handler.go +++ b/test/test-server/flink_onprem_handler.go @@ -224,6 +224,25 @@ func createKafkaCatalog(catName string) cmfsdk.KafkaCatalog { } } +func createKafkaDatabase(dbName string) cmfsdk.KafkaDatabase { + timeStamp := time.Date(2025, time.August, 5, 12, 0, 0, 0, time.UTC).String() + return cmfsdk.KafkaDatabase{ + ApiVersion: "cmf/api/v1/database", + Kind: "KafkaDatabase", + Metadata: cmfsdk.DatabaseMetadata{ + Name: dbName, + CreationTimestamp: &timeStamp, + }, + Spec: cmfsdk.KafkaDatabaseSpec{ + KafkaCluster: cmfsdk.KafkaDatabaseSpecKafkaCluster{ + ConnectionConfig: map[string]string{ + "bootstrap.servers": "localhost:9092", + }, + }, + }, + } +} + func handleCmfCatalogDatabases(t *testing.T) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { handleLoginType(t, r) From f85825b252cd7dd8da4fed8c86b6f7cbb6f12e34 Mon Sep 17 00:00:00 2001 From: Paras Negi Date: Tue, 31 Mar 2026 10:46:01 +0530 Subject: [PATCH 07/16] Fix ListDatabases() --- test/test-server/flink_onprem_handler.go | 34 +++++++++++++++++++++--- 1 file changed, 31 insertions(+), 3 deletions(-) diff --git a/test/test-server/flink_onprem_handler.go b/test/test-server/flink_onprem_handler.go index d9f8f67fe4..63fc4b3889 100644 --- a/test/test-server/flink_onprem_handler.go +++ b/test/test-server/flink_onprem_handler.go @@ -252,10 +252,14 @@ func handleCmfCatalogDatabases(t *testing.T) http.HandlerFunc { createKafkaDatabase("test-database-1"), createKafkaDatabase("test-database-2"), } - page := cmfsdk.KafkaDatabasesPage{ - Items: &databases, + databasesPage := cmfsdk.KafkaDatabasesPage{} + page := r.URL.Query().Get("page") + + if page == "0" { + databasesPage.SetItems(databases) } - err := json.NewEncoder(w).Encode(page) + + err := json.NewEncoder(w).Encode(databasesPage) require.NoError(t, err) case http.MethodPost: reqBody, err := io.ReadAll(r.Body) @@ -282,6 +286,30 @@ func handleCmfCatalogDatabases(t *testing.T) http.HandlerFunc { } } +func handleCmfCatalogDatabase(t *testing.T) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + handleLoginType(t, r) + + vars := mux.Vars(r) + dbName := vars["dbName"] + + switch r.Method { + case http.MethodGet: + if dbName == "invalid-database" { + http.Error(w, "The database name is invalid", http.StatusNotFound) + return + } + + database := createKafkaDatabase(dbName) + err := json.NewEncoder(w).Encode(database) + require.NoError(t, err) + return + default: + require.Fail(t, fmt.Sprintf("Unexpected method %s", r.Method)) + } + } +} + func createFlinkStatement(stmtName string, stopped bool, parallelism int32) cmfsdk.Statement { timeStamp := time.Date(2025, time.August, 5, 12, 00, 0, 0, time.UTC).String() status := cmfsdk.StatementStatus{ From 02dfaf2bfc6d81bdee205d121eda14468f637cd5 Mon Sep 17 00:00:00 2001 From: Paras Negi Date: Tue, 31 Mar 2026 11:01:52 +0530 Subject: [PATCH 08/16] Added Describe/GET database cli command --- internal/flink/command_catalog_database.go | 1 + .../command_catalog_database_describe.go | 60 +++++++++++++++++++ pkg/flink/cmf_rest_client.go | 8 +++ .../database/describe-help-onprem.golden | 17 ++++++ .../database/describe-not-found.golden | 1 + .../database/describe-success-json.golden | 15 +++++ .../database/describe-success-yaml.golden | 9 +++ .../catalog/database/describe-success.golden | 5 ++ .../flink/catalog/database/help-onprem.golden | 1 + test/flink_onprem_test.go | 13 ++++ test/test-server/flink_onprem_router.go | 1 + 11 files changed, 131 insertions(+) create mode 100644 internal/flink/command_catalog_database_describe.go create mode 100644 test/fixtures/output/flink/catalog/database/describe-help-onprem.golden create mode 100644 test/fixtures/output/flink/catalog/database/describe-not-found.golden create mode 100644 test/fixtures/output/flink/catalog/database/describe-success-json.golden create mode 100644 test/fixtures/output/flink/catalog/database/describe-success-yaml.golden create mode 100644 test/fixtures/output/flink/catalog/database/describe-success.golden diff --git a/internal/flink/command_catalog_database.go b/internal/flink/command_catalog_database.go index f0aa00182a..356132f443 100644 --- a/internal/flink/command_catalog_database.go +++ b/internal/flink/command_catalog_database.go @@ -22,6 +22,7 @@ func (c *command) newCatalogDatabaseCommand() *cobra.Command { } cmd.AddCommand(c.newCatalogDatabaseCreateCommand()) + cmd.AddCommand(c.newCatalogDatabaseDescribeCommand()) cmd.AddCommand(c.newCatalogDatabaseListCommand()) return cmd diff --git a/internal/flink/command_catalog_database_describe.go b/internal/flink/command_catalog_database_describe.go new file mode 100644 index 0000000000..f6598d5b22 --- /dev/null +++ b/internal/flink/command_catalog_database_describe.go @@ -0,0 +1,60 @@ +package flink + +import ( + "github.com/spf13/cobra" + + pcmd "github.com/confluentinc/cli/v4/pkg/cmd" + "github.com/confluentinc/cli/v4/pkg/output" +) + +func (c *command) newCatalogDatabaseDescribeCommand() *cobra.Command { + cmd := &cobra.Command{ + Use: "describe ", + Short: "Describe a Flink database in Confluent Platform.", + Args: cobra.ExactArgs(1), + RunE: c.catalogDatabaseDescribe, + } + + cmd.Flags().String("catalog", "", "Name of the catalog.") + cobra.CheckErr(cmd.MarkFlagRequired("catalog")) + addCmfFlagSet(cmd) + pcmd.AddOutputFlag(cmd) + + return cmd +} + +func (c *command) catalogDatabaseDescribe(cmd *cobra.Command, args []string) error { + name := args[0] + + catalogName, err := cmd.Flags().GetString("catalog") + if err != nil { + return err + } + + client, err := c.GetCmfClient(cmd) + if err != nil { + return err + } + + sdkOutputDatabase, err := client.DescribeDatabase(c.createContext(), catalogName, name) + if err != nil { + return err + } + + if output.GetFormat(cmd) == output.Human { + table := output.NewTable(cmd) + var creationTime string + if sdkOutputDatabase.GetMetadata().CreationTimestamp != nil { + creationTime = *sdkOutputDatabase.GetMetadata().CreationTimestamp + } + table.Add(&databaseOut{ + CreationTime: creationTime, + Name: sdkOutputDatabase.GetMetadata().Name, + Catalog: catalogName, + }) + return table.Print() + } + + localDatabase := convertSdkDatabaseToLocalDatabase(sdkOutputDatabase) + return output.SerializedOutput(cmd, localDatabase) +} diff --git a/pkg/flink/cmf_rest_client.go b/pkg/flink/cmf_rest_client.go index d1c86800e5..71686af98b 100644 --- a/pkg/flink/cmf_rest_client.go +++ b/pkg/flink/cmf_rest_client.go @@ -579,6 +579,14 @@ func (cmfClient *CmfRestClient) CreateDatabase(ctx context.Context, catalogName return outputDatabase, nil } +func (cmfClient *CmfRestClient) DescribeDatabase(ctx context.Context, catalogName, databaseName string) (cmfsdk.KafkaDatabase, error) { + outputDatabase, httpResponse, err := cmfClient.SQLApi.GetKafkaDatabase(ctx, catalogName, databaseName).Execute() + if parsedErr := parseSdkError(httpResponse, err); parsedErr != nil { + return cmfsdk.KafkaDatabase{}, fmt.Errorf(`failed to get database "%s" in catalog "%s": %s`, databaseName, catalogName, parsedErr) + } + return outputDatabase, nil +} + func (cmfClient *CmfRestClient) ListDatabases(ctx context.Context, catalogName string) ([]cmfsdk.KafkaDatabase, error) { databases := make([]cmfsdk.KafkaDatabase, 0) done := false diff --git a/test/fixtures/output/flink/catalog/database/describe-help-onprem.golden b/test/fixtures/output/flink/catalog/database/describe-help-onprem.golden new file mode 100644 index 0000000000..23cffb3b6d --- /dev/null +++ b/test/fixtures/output/flink/catalog/database/describe-help-onprem.golden @@ -0,0 +1,17 @@ +Describe a Flink database in Confluent Platform. + +Usage: + confluent flink catalog database describe [flags] + +Flags: + --catalog string REQUIRED: Name of the catalog. + --url string Base URL of the Confluent Manager for Apache Flink (CMF). Environment variable "CONFLUENT_CMF_URL" may be set in place of this flag. + --client-key-path string Path to client private key for mTLS authentication. Environment variable "CONFLUENT_CMF_CLIENT_KEY_PATH" may be set in place of this flag. + --client-cert-path string Path to client cert to be verified by Confluent Manager for Apache Flink. Include for mTLS authentication. Environment variable "CONFLUENT_CMF_CLIENT_CERT_PATH" may be set in place of this flag. + --certificate-authority-path string Path to a PEM-encoded Certificate Authority to verify the Confluent Manager for Apache Flink connection. Environment variable "CONFLUENT_CMF_CERTIFICATE_AUTHORITY_PATH" may be set in place of this flag. + -o, --output string Specify the output format as "human", "json", or "yaml". (default "human") + +Global Flags: + -h, --help Show help for this command. + --unsafe-trace Equivalent to -vvvv, but also log HTTP requests and responses which might contain plaintext secrets. + -v, --verbose count Increase verbosity (-v for warn, -vv for info, -vvv for debug, -vvvv for trace). diff --git a/test/fixtures/output/flink/catalog/database/describe-not-found.golden b/test/fixtures/output/flink/catalog/database/describe-not-found.golden new file mode 100644 index 0000000000..0d2858c098 --- /dev/null +++ b/test/fixtures/output/flink/catalog/database/describe-not-found.golden @@ -0,0 +1 @@ +Error: failed to get database "invalid-database" in catalog "test-catalog": The database name is invalid diff --git a/test/fixtures/output/flink/catalog/database/describe-success-json.golden b/test/fixtures/output/flink/catalog/database/describe-success-json.golden new file mode 100644 index 0000000000..fb8c835dc6 --- /dev/null +++ b/test/fixtures/output/flink/catalog/database/describe-success-json.golden @@ -0,0 +1,15 @@ +{ + "apiVersion": "cmf/api/v1/database", + "kind": "KafkaDatabase", + "metadata": { + "name": "test-database", + "creationTimestamp": "2025-08-05 12:00:00 +0000 UTC" + }, + "spec": { + "kafkaCluster": { + "connectionConfig": { + "bootstrap.servers": "localhost:9092" + } + } + } +} diff --git a/test/fixtures/output/flink/catalog/database/describe-success-yaml.golden b/test/fixtures/output/flink/catalog/database/describe-success-yaml.golden new file mode 100644 index 0000000000..a45817dbbb --- /dev/null +++ b/test/fixtures/output/flink/catalog/database/describe-success-yaml.golden @@ -0,0 +1,9 @@ +apiVersion: cmf/api/v1/database +kind: KafkaDatabase +metadata: + name: test-database + creationTimestamp: 2025-08-05 12:00:00 +0000 UTC +spec: + kafkaCluster: + connectionConfig: + bootstrap.servers: localhost:9092 diff --git a/test/fixtures/output/flink/catalog/database/describe-success.golden b/test/fixtures/output/flink/catalog/database/describe-success.golden new file mode 100644 index 0000000000..96b1543c77 --- /dev/null +++ b/test/fixtures/output/flink/catalog/database/describe-success.golden @@ -0,0 +1,5 @@ ++---------------+-------------------------------+ +| Creation Time | 2025-08-05 12:00:00 +0000 UTC | +| Name | test-database | +| Catalog | test-catalog | ++---------------+-------------------------------+ diff --git a/test/fixtures/output/flink/catalog/database/help-onprem.golden b/test/fixtures/output/flink/catalog/database/help-onprem.golden index f6edf74b3f..a566cc318d 100644 --- a/test/fixtures/output/flink/catalog/database/help-onprem.golden +++ b/test/fixtures/output/flink/catalog/database/help-onprem.golden @@ -5,6 +5,7 @@ Usage: Available Commands: create Create a Flink database. + describe Describe a Flink database in Confluent Platform. list List Flink databases in a catalog in Confluent Platform. Global Flags: diff --git a/test/flink_onprem_test.go b/test/flink_onprem_test.go index 55be5662b1..9072b9b8b8 100644 --- a/test/flink_onprem_test.go +++ b/test/flink_onprem_test.go @@ -423,6 +423,19 @@ func (s *CLITestSuite) TestFlinkCatalogDatabaseCreateOnPrem() { runIntegrationTestsWithMultipleAuth(s, tests) } +func (s *CLITestSuite) TestFlinkCatalogDatabaseDescribeOnPrem() { + tests := []CLITest{ + // success + {args: "flink catalog database describe test-database --catalog test-catalog", fixture: "flink/catalog/database/describe-success.golden"}, + {args: "flink catalog database describe test-database --catalog test-catalog --output json", fixture: "flink/catalog/database/describe-success-json.golden"}, + {args: "flink catalog database describe test-database --catalog test-catalog --output yaml", fixture: "flink/catalog/database/describe-success-yaml.golden"}, + // failure + {args: "flink catalog database describe invalid-database --catalog test-catalog", fixture: "flink/catalog/database/describe-not-found.golden", exitCode: 1}, + } + + runIntegrationTestsWithMultipleAuth(s, tests) +} + func (s *CLITestSuite) TestFlinkCatalogDatabaseListOnPrem() { tests := []CLITest{ // success diff --git a/test/test-server/flink_onprem_router.go b/test/test-server/flink_onprem_router.go index a565b7d3f5..8d597732f6 100644 --- a/test/test-server/flink_onprem_router.go +++ b/test/test-server/flink_onprem_router.go @@ -10,6 +10,7 @@ var flinkRoutes = []route{ {"/cmf/api/v1/catalogs/kafka", handleCmfCatalogs}, {"/cmf/api/v1/catalogs/kafka/{catName}", handleCmfCatalog}, {"/cmf/api/v1/catalogs/kafka/{catName}/databases", handleCmfCatalogDatabases}, + {"/cmf/api/v1/catalogs/kafka/{catName}/databases/{dbName}", handleCmfCatalogDatabase}, {"/cmf/api/v1/environments/{environment}/applications", handleCmfApplications}, {"/cmf/api/v1/environments/{environment}/applications/{application}", handleCmfApplication}, {"/cmf/api/v1/environments", handleCmfEnvironments}, From 3271eec05066eda01c41dcde0f0f6f22e6217814 Mon Sep 17 00:00:00 2001 From: Paras Negi Date: Tue, 31 Mar 2026 16:39:58 +0530 Subject: [PATCH 09/16] Fix the golden files --- .../output/flink/catalog/database/list-success.golden | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/test/fixtures/output/flink/catalog/database/list-success.golden b/test/fixtures/output/flink/catalog/database/list-success.golden index a3a7ea5545..983bc1d5df 100644 --- a/test/fixtures/output/flink/catalog/database/list-success.golden +++ b/test/fixtures/output/flink/catalog/database/list-success.golden @@ -1,4 +1,4 @@ - Creation Time | Name | Catalog ---------------------------------+------------------+-------------- - 2025-08-05 12:00:00 +0000 UTC | test-database-1 | test-catalog - 2025-08-05 12:00:00 +0000 UTC | test-database-2 | test-catalog + Creation Time | Name | Catalog +--------------------------------+-----------------+--------------- + 2025-08-05 12:00:00 +0000 UTC | test-database-1 | test-catalog + 2025-08-05 12:00:00 +0000 UTC | test-database-2 | test-catalog From 8f3a1e13850ba60754d32e4f9bf36e1945ce2bd6 Mon Sep 17 00:00:00 2001 From: Paras Negi Date: Tue, 31 Mar 2026 17:50:47 +0530 Subject: [PATCH 10/16] Added the update database cli command --- internal/flink/command_catalog_database.go | 1 + .../flink/command_catalog_database_update.go | 108 ++++++++++++++++++ pkg/flink/cmf_rest_client.go | 8 ++ .../database/update-invalid-failure.json | 14 +++ .../catalog/database/update-successful.json | 14 +++ .../flink/catalog/database/help-onprem.golden | 1 + .../database/update-help-onprem.golden | 17 +++ .../database/update-invalid-failure.golden | 1 + .../database/update-success-json.golden | 15 +++ .../database/update-success-yaml.golden | 9 ++ .../catalog/database/update-success.golden | 5 + test/flink_onprem_test.go | 13 +++ test/test-server/flink_onprem_handler.go | 8 ++ 13 files changed, 214 insertions(+) create mode 100644 internal/flink/command_catalog_database_update.go create mode 100644 test/fixtures/input/flink/catalog/database/update-invalid-failure.json create mode 100644 test/fixtures/input/flink/catalog/database/update-successful.json create mode 100644 test/fixtures/output/flink/catalog/database/update-help-onprem.golden create mode 100644 test/fixtures/output/flink/catalog/database/update-invalid-failure.golden create mode 100644 test/fixtures/output/flink/catalog/database/update-success-json.golden create mode 100644 test/fixtures/output/flink/catalog/database/update-success-yaml.golden create mode 100644 test/fixtures/output/flink/catalog/database/update-success.golden diff --git a/internal/flink/command_catalog_database.go b/internal/flink/command_catalog_database.go index 356132f443..80dfd0bd62 100644 --- a/internal/flink/command_catalog_database.go +++ b/internal/flink/command_catalog_database.go @@ -24,6 +24,7 @@ func (c *command) newCatalogDatabaseCommand() *cobra.Command { cmd.AddCommand(c.newCatalogDatabaseCreateCommand()) cmd.AddCommand(c.newCatalogDatabaseDescribeCommand()) cmd.AddCommand(c.newCatalogDatabaseListCommand()) + cmd.AddCommand(c.newCatalogDatabaseUpdateCommand()) return cmd } diff --git a/internal/flink/command_catalog_database_update.go b/internal/flink/command_catalog_database_update.go new file mode 100644 index 0000000000..64a7202ac1 --- /dev/null +++ b/internal/flink/command_catalog_database_update.go @@ -0,0 +1,108 @@ +package flink + +import ( + "encoding/json" + "fmt" + "os" + "path/filepath" + + "github.com/spf13/cobra" + "gopkg.in/yaml.v3" + + cmfsdk "github.com/confluentinc/cmf-sdk-go/v1" + + pcmd "github.com/confluentinc/cli/v4/pkg/cmd" + "github.com/confluentinc/cli/v4/pkg/errors" + "github.com/confluentinc/cli/v4/pkg/output" +) + +func (c *command) newCatalogDatabaseUpdateCommand() *cobra.Command { + cmd := &cobra.Command{ + Use: "update ", + Short: "Update a Flink database.", + Long: "Update a Flink database in a catalog in Confluent Platform.", + Args: cobra.ExactArgs(1), + RunE: c.catalogDatabaseUpdate, + } + + cmd.Flags().String("catalog", "", "Name of the catalog.") + cobra.CheckErr(cmd.MarkFlagRequired("catalog")) + addCmfFlagSet(cmd) + pcmd.AddOutputFlag(cmd) + + return cmd +} + +func (c *command) catalogDatabaseUpdate(cmd *cobra.Command, args []string) error { + resourceFilePath := args[0] + + catalogName, err := cmd.Flags().GetString("catalog") + if err != nil { + return err + } + + client, err := c.GetCmfClient(cmd) + if err != nil { + return err + } + + data, err := os.ReadFile(resourceFilePath) + if err != nil { + return fmt.Errorf("failed to read file: %v", err) + } + + var genericData map[string]interface{} + ext := filepath.Ext(resourceFilePath) + switch ext { + case ".json": + err = json.Unmarshal(data, &genericData) + case ".yaml", ".yml": + err = yaml.Unmarshal(data, &genericData) + default: + return errors.NewErrorWithSuggestions( + fmt.Sprintf("unsupported file format: %s", ext), + "Supported file formats are .json, .yaml, and .yml.", + ) + } + if err != nil { + return err + } + + jsonBytes, err := json.Marshal(genericData) + if err != nil { + return fmt.Errorf("failed to marshal intermediate data: %w", err) + } + + var sdkDatabase cmfsdk.KafkaDatabase + if err = json.Unmarshal(jsonBytes, &sdkDatabase); err != nil { + return fmt.Errorf("failed to bind data to KafkaDatabase model: %w", err) + } + + databaseName := sdkDatabase.Metadata.Name + + if err := client.UpdateDatabase(c.createContext(), catalogName, databaseName, sdkDatabase); err != nil { + return err + } + + sdkOutputDatabase, err := client.DescribeDatabase(c.createContext(), catalogName, databaseName) + if err != nil { + return err + } + + if output.GetFormat(cmd) == output.Human { + table := output.NewTable(cmd) + var creationTime string + if sdkOutputDatabase.GetMetadata().CreationTimestamp != nil { + creationTime = *sdkOutputDatabase.GetMetadata().CreationTimestamp + } + table.Add(&databaseOut{ + CreationTime: creationTime, + Name: sdkOutputDatabase.GetMetadata().Name, + Catalog: catalogName, + }) + return table.Print() + } + + localDatabase := convertSdkDatabaseToLocalDatabase(sdkOutputDatabase) + return output.SerializedOutput(cmd, localDatabase) +} diff --git a/pkg/flink/cmf_rest_client.go b/pkg/flink/cmf_rest_client.go index 71686af98b..f632682266 100644 --- a/pkg/flink/cmf_rest_client.go +++ b/pkg/flink/cmf_rest_client.go @@ -579,6 +579,14 @@ func (cmfClient *CmfRestClient) CreateDatabase(ctx context.Context, catalogName return outputDatabase, nil } +func (cmfClient *CmfRestClient) UpdateDatabase(ctx context.Context, catalogName, databaseName string, kafkaDatabase cmfsdk.KafkaDatabase) error { + httpResponse, err := cmfClient.SQLApi.UpdateKafkaDatabase(ctx, catalogName, databaseName).KafkaDatabase(kafkaDatabase).Execute() + if parsedErr := parseSdkError(httpResponse, err); parsedErr != nil { + return fmt.Errorf(`failed to update database "%s" in catalog "%s": %s`, databaseName, catalogName, parsedErr) + } + return nil +} + func (cmfClient *CmfRestClient) DescribeDatabase(ctx context.Context, catalogName, databaseName string) (cmfsdk.KafkaDatabase, error) { outputDatabase, httpResponse, err := cmfClient.SQLApi.GetKafkaDatabase(ctx, catalogName, databaseName).Execute() if parsedErr := parseSdkError(httpResponse, err); parsedErr != nil { diff --git a/test/fixtures/input/flink/catalog/database/update-invalid-failure.json b/test/fixtures/input/flink/catalog/database/update-invalid-failure.json new file mode 100644 index 0000000000..62292591c4 --- /dev/null +++ b/test/fixtures/input/flink/catalog/database/update-invalid-failure.json @@ -0,0 +1,14 @@ +{ + "apiVersion": "cmf/api/v1/database", + "kind": "KafkaDatabase", + "metadata": { + "name": "invalid-database" + }, + "spec": { + "kafkaCluster": { + "connectionConfig": { + "bootstrap.servers": "localhost:9092" + } + } + } +} diff --git a/test/fixtures/input/flink/catalog/database/update-successful.json b/test/fixtures/input/flink/catalog/database/update-successful.json new file mode 100644 index 0000000000..4422ebedb8 --- /dev/null +++ b/test/fixtures/input/flink/catalog/database/update-successful.json @@ -0,0 +1,14 @@ +{ + "apiVersion": "cmf/api/v1/database", + "kind": "KafkaDatabase", + "metadata": { + "name": "test-database" + }, + "spec": { + "kafkaCluster": { + "connectionConfig": { + "bootstrap.servers": "localhost:9092" + } + } + } +} diff --git a/test/fixtures/output/flink/catalog/database/help-onprem.golden b/test/fixtures/output/flink/catalog/database/help-onprem.golden index a566cc318d..1020cc1a21 100644 --- a/test/fixtures/output/flink/catalog/database/help-onprem.golden +++ b/test/fixtures/output/flink/catalog/database/help-onprem.golden @@ -7,6 +7,7 @@ Available Commands: create Create a Flink database. describe Describe a Flink database in Confluent Platform. list List Flink databases in a catalog in Confluent Platform. + update Update a Flink database. Global Flags: -h, --help Show help for this command. diff --git a/test/fixtures/output/flink/catalog/database/update-help-onprem.golden b/test/fixtures/output/flink/catalog/database/update-help-onprem.golden new file mode 100644 index 0000000000..f5c10ad850 --- /dev/null +++ b/test/fixtures/output/flink/catalog/database/update-help-onprem.golden @@ -0,0 +1,17 @@ +Update a Flink database in a catalog in Confluent Platform. + +Usage: + confluent flink catalog database update [flags] + +Flags: + --catalog string REQUIRED: Name of the catalog. + --url string Base URL of the Confluent Manager for Apache Flink (CMF). Environment variable "CONFLUENT_CMF_URL" may be set in place of this flag. + --client-key-path string Path to client private key for mTLS authentication. Environment variable "CONFLUENT_CMF_CLIENT_KEY_PATH" may be set in place of this flag. + --client-cert-path string Path to client cert to be verified by Confluent Manager for Apache Flink. Include for mTLS authentication. Environment variable "CONFLUENT_CMF_CLIENT_CERT_PATH" may be set in place of this flag. + --certificate-authority-path string Path to a PEM-encoded Certificate Authority to verify the Confluent Manager for Apache Flink connection. Environment variable "CONFLUENT_CMF_CERTIFICATE_AUTHORITY_PATH" may be set in place of this flag. + -o, --output string Specify the output format as "human", "json", or "yaml". (default "human") + +Global Flags: + -h, --help Show help for this command. + --unsafe-trace Equivalent to -vvvv, but also log HTTP requests and responses which might contain plaintext secrets. + -v, --verbose count Increase verbosity (-v for warn, -vv for info, -vvv for debug, -vvvv for trace). diff --git a/test/fixtures/output/flink/catalog/database/update-invalid-failure.golden b/test/fixtures/output/flink/catalog/database/update-invalid-failure.golden new file mode 100644 index 0000000000..16726790eb --- /dev/null +++ b/test/fixtures/output/flink/catalog/database/update-invalid-failure.golden @@ -0,0 +1 @@ +Error: failed to update database "invalid-database" in catalog "test-catalog": The database name is invalid diff --git a/test/fixtures/output/flink/catalog/database/update-success-json.golden b/test/fixtures/output/flink/catalog/database/update-success-json.golden new file mode 100644 index 0000000000..fb8c835dc6 --- /dev/null +++ b/test/fixtures/output/flink/catalog/database/update-success-json.golden @@ -0,0 +1,15 @@ +{ + "apiVersion": "cmf/api/v1/database", + "kind": "KafkaDatabase", + "metadata": { + "name": "test-database", + "creationTimestamp": "2025-08-05 12:00:00 +0000 UTC" + }, + "spec": { + "kafkaCluster": { + "connectionConfig": { + "bootstrap.servers": "localhost:9092" + } + } + } +} diff --git a/test/fixtures/output/flink/catalog/database/update-success-yaml.golden b/test/fixtures/output/flink/catalog/database/update-success-yaml.golden new file mode 100644 index 0000000000..a45817dbbb --- /dev/null +++ b/test/fixtures/output/flink/catalog/database/update-success-yaml.golden @@ -0,0 +1,9 @@ +apiVersion: cmf/api/v1/database +kind: KafkaDatabase +metadata: + name: test-database + creationTimestamp: 2025-08-05 12:00:00 +0000 UTC +spec: + kafkaCluster: + connectionConfig: + bootstrap.servers: localhost:9092 diff --git a/test/fixtures/output/flink/catalog/database/update-success.golden b/test/fixtures/output/flink/catalog/database/update-success.golden new file mode 100644 index 0000000000..96b1543c77 --- /dev/null +++ b/test/fixtures/output/flink/catalog/database/update-success.golden @@ -0,0 +1,5 @@ ++---------------+-------------------------------+ +| Creation Time | 2025-08-05 12:00:00 +0000 UTC | +| Name | test-database | +| Catalog | test-catalog | ++---------------+-------------------------------+ diff --git a/test/flink_onprem_test.go b/test/flink_onprem_test.go index 9072b9b8b8..f6f1c60415 100644 --- a/test/flink_onprem_test.go +++ b/test/flink_onprem_test.go @@ -447,6 +447,19 @@ func (s *CLITestSuite) TestFlinkCatalogDatabaseListOnPrem() { runIntegrationTestsWithMultipleAuth(s, tests) } +func (s *CLITestSuite) TestFlinkCatalogDatabaseUpdateOnPrem() { + tests := []CLITest{ + // success + {args: "flink catalog database update test/fixtures/input/flink/catalog/database/update-successful.json --catalog test-catalog", fixture: "flink/catalog/database/update-success.golden"}, + {args: "flink catalog database update test/fixtures/input/flink/catalog/database/update-successful.json --catalog test-catalog --output json", fixture: "flink/catalog/database/update-success-json.golden"}, + {args: "flink catalog database update test/fixtures/input/flink/catalog/database/update-successful.json --catalog test-catalog --output yaml", fixture: "flink/catalog/database/update-success-yaml.golden"}, + // failure + {args: "flink catalog database update test/fixtures/input/flink/catalog/database/update-invalid-failure.json --catalog test-catalog", fixture: "flink/catalog/database/update-invalid-failure.golden", exitCode: 1}, + } + + runIntegrationTestsWithMultipleAuth(s, tests) +} + func (s *CLITestSuite) TestFlinkStatementCreateOnPrem() { tests := []CLITest{ // success diff --git a/test/test-server/flink_onprem_handler.go b/test/test-server/flink_onprem_handler.go index 63fc4b3889..27195bb74d 100644 --- a/test/test-server/flink_onprem_handler.go +++ b/test/test-server/flink_onprem_handler.go @@ -304,6 +304,14 @@ func handleCmfCatalogDatabase(t *testing.T) http.HandlerFunc { err := json.NewEncoder(w).Encode(database) require.NoError(t, err) return + case http.MethodPut: + if dbName == "invalid-database" { + http.Error(w, "The database name is invalid", http.StatusNotFound) + return + } + + w.WriteHeader(http.StatusOK) + return default: require.Fail(t, fmt.Sprintf("Unexpected method %s", r.Method)) } From 25d8693b82695fdad5fba70872291c8ee9f0dc22 Mon Sep 17 00:00:00 2001 From: Paras Negi Date: Wed, 1 Apr 2026 20:13:39 +0530 Subject: [PATCH 11/16] Add flink catalog database update command --- internal/flink/command_catalog_database.go | 1 + .../flink/command_catalog_database_delete.go | 53 +++++++++++++++++++ pkg/flink/cmf_rest_client.go | 8 +++ pkg/resource/resource.go | 1 + .../database/delete-help-onprem.golden | 17 ++++++ .../database/delete-non-exist-failure.golden | 1 + .../database/delete-single-force.golden | 1 + .../database/delete-single-successful.golden | 1 + .../flink/catalog/database/help-onprem.golden | 1 + test/flink_onprem_test.go | 12 +++++ test/test-server/flink_onprem_handler.go | 15 ++++-- 11 files changed, 108 insertions(+), 3 deletions(-) create mode 100644 internal/flink/command_catalog_database_delete.go create mode 100644 test/fixtures/output/flink/catalog/database/delete-help-onprem.golden create mode 100644 test/fixtures/output/flink/catalog/database/delete-non-exist-failure.golden create mode 100644 test/fixtures/output/flink/catalog/database/delete-single-force.golden create mode 100644 test/fixtures/output/flink/catalog/database/delete-single-successful.golden diff --git a/internal/flink/command_catalog_database.go b/internal/flink/command_catalog_database.go index 80dfd0bd62..01327f68b8 100644 --- a/internal/flink/command_catalog_database.go +++ b/internal/flink/command_catalog_database.go @@ -22,6 +22,7 @@ func (c *command) newCatalogDatabaseCommand() *cobra.Command { } cmd.AddCommand(c.newCatalogDatabaseCreateCommand()) + cmd.AddCommand(c.newCatalogDatabaseDeleteCommand()) cmd.AddCommand(c.newCatalogDatabaseDescribeCommand()) cmd.AddCommand(c.newCatalogDatabaseListCommand()) cmd.AddCommand(c.newCatalogDatabaseUpdateCommand()) diff --git a/internal/flink/command_catalog_database_delete.go b/internal/flink/command_catalog_database_delete.go new file mode 100644 index 0000000000..8c161f6040 --- /dev/null +++ b/internal/flink/command_catalog_database_delete.go @@ -0,0 +1,53 @@ +package flink + +import ( + "github.com/spf13/cobra" + + pcmd "github.com/confluentinc/cli/v4/pkg/cmd" + "github.com/confluentinc/cli/v4/pkg/deletion" + "github.com/confluentinc/cli/v4/pkg/resource" +) + +func (c *command) newCatalogDatabaseDeleteCommand() *cobra.Command { + cmd := &cobra.Command{ + Use: "delete ", + Short: "Delete a Flink database in Confluent Platform.", + Args: cobra.ExactArgs(1), + RunE: c.catalogDatabaseDelete, + } + + cmd.Flags().String("catalog", "", "Name of the catalog.") + cobra.CheckErr(cmd.MarkFlagRequired("catalog")) + addCmfFlagSet(cmd) + pcmd.AddForceFlag(cmd) + + return cmd +} + +func (c *command) catalogDatabaseDelete(cmd *cobra.Command, args []string) error { + catalogName, err := cmd.Flags().GetString("catalog") + if err != nil { + return err + } + + client, err := c.GetCmfClient(cmd) + if err != nil { + return err + } + + existenceFunc := func(name string) bool { + _, err := client.DescribeDatabase(c.createContext(), catalogName, name) + return err == nil + } + + if err := deletion.ValidateAndConfirm(cmd, args, existenceFunc, resource.FlinkDatabase); err != nil { + return err + } + + deleteFunc := func(name string) error { + return client.DeleteDatabase(c.createContext(), catalogName, name) + } + + _, err = deletion.Delete(cmd, args, deleteFunc, resource.FlinkDatabase) + return err +} diff --git a/pkg/flink/cmf_rest_client.go b/pkg/flink/cmf_rest_client.go index f632682266..05b3ce809b 100644 --- a/pkg/flink/cmf_rest_client.go +++ b/pkg/flink/cmf_rest_client.go @@ -570,6 +570,14 @@ func (cmfClient *CmfRestClient) DeleteCatalog(ctx context.Context, catalogName s return parseSdkError(httpResp, err) } +func (cmfClient *CmfRestClient) DeleteDatabase(ctx context.Context, catalogName, databaseName string) error { + httpResp, err := cmfClient.SQLApi.DeleteKafkaDatabase(ctx, catalogName, databaseName).Execute() + if parsedErr := parseSdkError(httpResp, err); parsedErr != nil { + return fmt.Errorf(`failed to delete database "%s" in catalog "%s": %s`, databaseName, catalogName, parsedErr) + } + return nil +} + func (cmfClient *CmfRestClient) CreateDatabase(ctx context.Context, catalogName string, kafkaDatabase cmfsdk.KafkaDatabase) (cmfsdk.KafkaDatabase, error) { databaseName := kafkaDatabase.Metadata.Name outputDatabase, httpResponse, err := cmfClient.SQLApi.CreateKafkaDatabase(ctx, catalogName).KafkaDatabase(kafkaDatabase).Execute() diff --git a/pkg/resource/resource.go b/pkg/resource/resource.go index a56592ddd9..bd50ce6853 100644 --- a/pkg/resource/resource.go +++ b/pkg/resource/resource.go @@ -45,6 +45,7 @@ const ( FlinkDetachedSavepoint = "Flink detached savepoint" FlinkApplication = "Flink application" FlinkCatalog = "Flink catalog" + FlinkDatabase = "Flink database" FlinkEnvironment = "Flink environment" FlinkRegion = "Flink region" FlinkEndpoint = "Flink endpoint" diff --git a/test/fixtures/output/flink/catalog/database/delete-help-onprem.golden b/test/fixtures/output/flink/catalog/database/delete-help-onprem.golden new file mode 100644 index 0000000000..91228009b3 --- /dev/null +++ b/test/fixtures/output/flink/catalog/database/delete-help-onprem.golden @@ -0,0 +1,17 @@ +Delete a Flink database in Confluent Platform. + +Usage: + confluent flink catalog database delete [flags] + +Flags: + --catalog string REQUIRED: Name of the catalog. + --url string Base URL of the Confluent Manager for Apache Flink (CMF). Environment variable "CONFLUENT_CMF_URL" may be set in place of this flag. + --client-key-path string Path to client private key for mTLS authentication. Environment variable "CONFLUENT_CMF_CLIENT_KEY_PATH" may be set in place of this flag. + --client-cert-path string Path to client cert to be verified by Confluent Manager for Apache Flink. Include for mTLS authentication. Environment variable "CONFLUENT_CMF_CLIENT_CERT_PATH" may be set in place of this flag. + --certificate-authority-path string Path to a PEM-encoded Certificate Authority to verify the Confluent Manager for Apache Flink connection. Environment variable "CONFLUENT_CMF_CERTIFICATE_AUTHORITY_PATH" may be set in place of this flag. + --force Skip the deletion confirmation prompt. + +Global Flags: + -h, --help Show help for this command. + --unsafe-trace Equivalent to -vvvv, but also log HTTP requests and responses which might contain plaintext secrets. + -v, --verbose count Increase verbosity (-v for warn, -vv for info, -vvv for debug, -vvvv for trace). diff --git a/test/fixtures/output/flink/catalog/database/delete-non-exist-failure.golden b/test/fixtures/output/flink/catalog/database/delete-non-exist-failure.golden new file mode 100644 index 0000000000..b6cedfb005 --- /dev/null +++ b/test/fixtures/output/flink/catalog/database/delete-non-exist-failure.golden @@ -0,0 +1 @@ +Are you sure you want to delete Flink database "non-exist-database"? (y/n): Error: failed to delete non-exist-database: failed to delete database "non-exist-database" in catalog "test-catalog": 404 Not Found diff --git a/test/fixtures/output/flink/catalog/database/delete-single-force.golden b/test/fixtures/output/flink/catalog/database/delete-single-force.golden new file mode 100644 index 0000000000..3517fbbf14 --- /dev/null +++ b/test/fixtures/output/flink/catalog/database/delete-single-force.golden @@ -0,0 +1 @@ +Deleted Flink database "test-database-1". diff --git a/test/fixtures/output/flink/catalog/database/delete-single-successful.golden b/test/fixtures/output/flink/catalog/database/delete-single-successful.golden new file mode 100644 index 0000000000..ecc2f170b7 --- /dev/null +++ b/test/fixtures/output/flink/catalog/database/delete-single-successful.golden @@ -0,0 +1 @@ +Are you sure you want to delete Flink database "test-database-1"? (y/n): Deleted Flink database "test-database-1". diff --git a/test/fixtures/output/flink/catalog/database/help-onprem.golden b/test/fixtures/output/flink/catalog/database/help-onprem.golden index 1020cc1a21..49da534a01 100644 --- a/test/fixtures/output/flink/catalog/database/help-onprem.golden +++ b/test/fixtures/output/flink/catalog/database/help-onprem.golden @@ -5,6 +5,7 @@ Usage: Available Commands: create Create a Flink database. + delete Delete a Flink database in Confluent Platform. describe Describe a Flink database in Confluent Platform. list List Flink databases in a catalog in Confluent Platform. update Update a Flink database. diff --git a/test/flink_onprem_test.go b/test/flink_onprem_test.go index f6f1c60415..b1655b19d2 100644 --- a/test/flink_onprem_test.go +++ b/test/flink_onprem_test.go @@ -423,6 +423,18 @@ func (s *CLITestSuite) TestFlinkCatalogDatabaseCreateOnPrem() { runIntegrationTestsWithMultipleAuth(s, tests) } +func (s *CLITestSuite) TestFlinkCatalogDatabaseDeleteOnPrem() { + tests := []CLITest{ + // success scenarios + {args: "flink catalog database delete test-database-1 --catalog test-catalog", input: "y\n", fixture: "flink/catalog/database/delete-single-successful.golden"}, + {args: "flink catalog database delete test-database-1 --catalog test-catalog --force", fixture: "flink/catalog/database/delete-single-force.golden"}, + // failure scenarios + {args: "flink catalog database delete non-exist-database --catalog test-catalog", input: "y\n", fixture: "flink/catalog/database/delete-non-exist-failure.golden", exitCode: 1}, + } + + runIntegrationTestsWithMultipleAuth(s, tests) +} + func (s *CLITestSuite) TestFlinkCatalogDatabaseDescribeOnPrem() { tests := []CLITest{ // success diff --git a/test/test-server/flink_onprem_handler.go b/test/test-server/flink_onprem_handler.go index 27195bb74d..ebc7f3da92 100644 --- a/test/test-server/flink_onprem_handler.go +++ b/test/test-server/flink_onprem_handler.go @@ -16,6 +16,8 @@ import ( cmfsdk "github.com/confluentinc/cmf-sdk-go/v1" ) +const invalidDatabaseName = "invalid-database" + // Helper function to create a Flink application. func createApplication(name string) cmfsdk.FlinkApplication { status := map[string]interface{}{ @@ -270,7 +272,7 @@ func handleCmfCatalogDatabases(t *testing.T) http.HandlerFunc { dbName := database.GetMetadata().Name - if dbName == "invalid-database" { + if dbName == invalidDatabaseName { http.Error(w, "The Kafka database object from resource file is invalid", http.StatusUnprocessableEntity) return } @@ -295,7 +297,7 @@ func handleCmfCatalogDatabase(t *testing.T) http.HandlerFunc { switch r.Method { case http.MethodGet: - if dbName == "invalid-database" { + if dbName == invalidDatabaseName { http.Error(w, "The database name is invalid", http.StatusNotFound) return } @@ -305,11 +307,18 @@ func handleCmfCatalogDatabase(t *testing.T) http.HandlerFunc { require.NoError(t, err) return case http.MethodPut: - if dbName == "invalid-database" { + if dbName == invalidDatabaseName { http.Error(w, "The database name is invalid", http.StatusNotFound) return } + w.WriteHeader(http.StatusOK) + return + case http.MethodDelete: + if dbName == "non-exist-database" { + http.Error(w, "", http.StatusNotFound) + return + } w.WriteHeader(http.StatusOK) return default: From c95ac55c7d54de55fb5f3013f58b5f8f92771b8a Mon Sep 17 00:00:00 2001 From: Paras Negi Date: Thu, 2 Apr 2026 11:24:26 +0530 Subject: [PATCH 12/16] Copilot suggested fixes --- internal/flink/command_catalog_database.go | 40 +++++++++++++++++++ .../flink/command_catalog_database_create.go | 38 +----------------- .../flink/command_catalog_database_delete.go | 7 +++- .../flink/command_catalog_database_update.go | 38 +----------------- 4 files changed, 48 insertions(+), 75 deletions(-) diff --git a/internal/flink/command_catalog_database.go b/internal/flink/command_catalog_database.go index 01327f68b8..aa54e69a3c 100644 --- a/internal/flink/command_catalog_database.go +++ b/internal/flink/command_catalog_database.go @@ -1,11 +1,18 @@ package flink import ( + "encoding/json" + "fmt" + "os" + "path/filepath" + "github.com/spf13/cobra" + "gopkg.in/yaml.v3" cmfsdk "github.com/confluentinc/cmf-sdk-go/v1" pcmd "github.com/confluentinc/cli/v4/pkg/cmd" + "github.com/confluentinc/cli/v4/pkg/errors" ) type databaseOut struct { @@ -30,6 +37,39 @@ func (c *command) newCatalogDatabaseCommand() *cobra.Command { return cmd } +func readDatabaseResourceFile(resourceFilePath string) (cmfsdk.KafkaDatabase, error) { + data, err := os.ReadFile(resourceFilePath) + if err != nil { + return cmfsdk.KafkaDatabase{}, fmt.Errorf("failed to read file: %v", err) + } + + var genericData map[string]interface{} + ext := filepath.Ext(resourceFilePath) + switch ext { + case ".json": + err = json.Unmarshal(data, &genericData) + case ".yaml", ".yml": + err = yaml.Unmarshal(data, &genericData) + default: + return cmfsdk.KafkaDatabase{}, errors.NewErrorWithSuggestions(fmt.Sprintf("unsupported file format: %s", ext), "Supported file formats are .json, .yaml, and .yml.") + } + if err != nil { + return cmfsdk.KafkaDatabase{}, fmt.Errorf("failed to parse input file: %w", err) + } + + jsonBytes, err := json.Marshal(genericData) + if err != nil { + return cmfsdk.KafkaDatabase{}, fmt.Errorf("failed to marshal intermediate data: %w", err) + } + + var sdkDatabase cmfsdk.KafkaDatabase + if err = json.Unmarshal(jsonBytes, &sdkDatabase); err != nil { + return cmfsdk.KafkaDatabase{}, fmt.Errorf("failed to bind data to KafkaDatabase model: %w", err) + } + + return sdkDatabase, nil +} + func convertSdkDatabaseToLocalDatabase(sdkDatabase cmfsdk.KafkaDatabase) LocalKafkaDatabase { return LocalKafkaDatabase{ ApiVersion: sdkDatabase.ApiVersion, diff --git a/internal/flink/command_catalog_database_create.go b/internal/flink/command_catalog_database_create.go index 4bd6dd3972..212606f2fa 100644 --- a/internal/flink/command_catalog_database_create.go +++ b/internal/flink/command_catalog_database_create.go @@ -1,18 +1,9 @@ package flink import ( - "encoding/json" - "fmt" - "os" - "path/filepath" - "github.com/spf13/cobra" - "gopkg.in/yaml.v3" - - cmfsdk "github.com/confluentinc/cmf-sdk-go/v1" pcmd "github.com/confluentinc/cli/v4/pkg/cmd" - "github.com/confluentinc/cli/v4/pkg/errors" "github.com/confluentinc/cli/v4/pkg/output" ) @@ -46,38 +37,11 @@ func (c *command) catalogDatabaseCreate(cmd *cobra.Command, args []string) error return err } - data, err := os.ReadFile(resourceFilePath) - if err != nil { - return fmt.Errorf("failed to read file: %v", err) - } - - var genericData map[string]interface{} - ext := filepath.Ext(resourceFilePath) - switch ext { - case ".json": - err = json.Unmarshal(data, &genericData) - case ".yaml", ".yml": - err = yaml.Unmarshal(data, &genericData) - default: - return errors.NewErrorWithSuggestions( - fmt.Sprintf("unsupported file format: %s", ext), - "Supported file formats are .json, .yaml, and .yml.", - ) - } + sdkDatabase, err := readDatabaseResourceFile(resourceFilePath) if err != nil { return err } - jsonBytes, err := json.Marshal(genericData) - if err != nil { - return fmt.Errorf("failed to marshal intermediate data: %w", err) - } - - var sdkDatabase cmfsdk.KafkaDatabase - if err = json.Unmarshal(jsonBytes, &sdkDatabase); err != nil { - return fmt.Errorf("failed to bind data to KafkaDatabase model: %w", err) - } - sdkOutputDatabase, err := client.CreateDatabase(c.createContext(), catalogName, sdkDatabase) if err != nil { return err diff --git a/internal/flink/command_catalog_database_delete.go b/internal/flink/command_catalog_database_delete.go index 8c161f6040..a836b38e46 100644 --- a/internal/flink/command_catalog_database_delete.go +++ b/internal/flink/command_catalog_database_delete.go @@ -5,6 +5,7 @@ import ( pcmd "github.com/confluentinc/cli/v4/pkg/cmd" "github.com/confluentinc/cli/v4/pkg/deletion" + "github.com/confluentinc/cli/v4/pkg/errors" "github.com/confluentinc/cli/v4/pkg/resource" ) @@ -41,7 +42,11 @@ func (c *command) catalogDatabaseDelete(cmd *cobra.Command, args []string) error } if err := deletion.ValidateAndConfirm(cmd, args, existenceFunc, resource.FlinkDatabase); err != nil { - return err + // We are validating only the existence of the resources (there is no prefix validation). + // Thus, we can add some extra context for the error. + suggestions := "List available Flink databases with `confluent flink catalog database list`." + suggestions += "\nCheck that CMF is running and accessible." + return errors.NewErrorWithSuggestions(err.Error(), suggestions) } deleteFunc := func(name string) error { diff --git a/internal/flink/command_catalog_database_update.go b/internal/flink/command_catalog_database_update.go index 64a7202ac1..293420b881 100644 --- a/internal/flink/command_catalog_database_update.go +++ b/internal/flink/command_catalog_database_update.go @@ -1,18 +1,9 @@ package flink import ( - "encoding/json" - "fmt" - "os" - "path/filepath" - "github.com/spf13/cobra" - "gopkg.in/yaml.v3" - - cmfsdk "github.com/confluentinc/cmf-sdk-go/v1" pcmd "github.com/confluentinc/cli/v4/pkg/cmd" - "github.com/confluentinc/cli/v4/pkg/errors" "github.com/confluentinc/cli/v4/pkg/output" ) @@ -46,38 +37,11 @@ func (c *command) catalogDatabaseUpdate(cmd *cobra.Command, args []string) error return err } - data, err := os.ReadFile(resourceFilePath) - if err != nil { - return fmt.Errorf("failed to read file: %v", err) - } - - var genericData map[string]interface{} - ext := filepath.Ext(resourceFilePath) - switch ext { - case ".json": - err = json.Unmarshal(data, &genericData) - case ".yaml", ".yml": - err = yaml.Unmarshal(data, &genericData) - default: - return errors.NewErrorWithSuggestions( - fmt.Sprintf("unsupported file format: %s", ext), - "Supported file formats are .json, .yaml, and .yml.", - ) - } + sdkDatabase, err := readDatabaseResourceFile(resourceFilePath) if err != nil { return err } - jsonBytes, err := json.Marshal(genericData) - if err != nil { - return fmt.Errorf("failed to marshal intermediate data: %w", err) - } - - var sdkDatabase cmfsdk.KafkaDatabase - if err = json.Unmarshal(jsonBytes, &sdkDatabase); err != nil { - return fmt.Errorf("failed to bind data to KafkaDatabase model: %w", err) - } - databaseName := sdkDatabase.Metadata.Name if err := client.UpdateDatabase(c.createContext(), catalogName, databaseName, sdkDatabase); err != nil { From 021cf0170c72eac90ede7ce3f0950c06aa1913ae Mon Sep 17 00:00:00 2001 From: Paras Negi Date: Fri, 3 Apr 2026 08:53:52 +0530 Subject: [PATCH 13/16] Add claude suggested fixes --- internal/flink/command_catalog_database.go | 22 ++++++++++- .../flink/command_catalog_database_create.go | 18 +-------- .../command_catalog_database_describe.go | 18 +-------- .../flink/command_catalog_database_update.go | 18 +-------- .../database/create-invalid-failure.yaml | 7 ++++ .../catalog/database/create-successful.yaml | 8 ++++ .../database/update-invalid-failure.yaml | 8 ++++ .../catalog/database/update-successful.yaml | 8 ++++ test/flink_onprem_test.go | 38 +++++++++++++++++++ test/test-server/flink_onprem_handler.go | 5 +++ 10 files changed, 99 insertions(+), 51 deletions(-) create mode 100644 test/fixtures/input/flink/catalog/database/create-invalid-failure.yaml create mode 100644 test/fixtures/input/flink/catalog/database/create-successful.yaml create mode 100644 test/fixtures/input/flink/catalog/database/update-invalid-failure.yaml create mode 100644 test/fixtures/input/flink/catalog/database/update-successful.yaml diff --git a/internal/flink/command_catalog_database.go b/internal/flink/command_catalog_database.go index aa54e69a3c..11a77fb6a2 100644 --- a/internal/flink/command_catalog_database.go +++ b/internal/flink/command_catalog_database.go @@ -13,6 +13,7 @@ import ( pcmd "github.com/confluentinc/cli/v4/pkg/cmd" "github.com/confluentinc/cli/v4/pkg/errors" + "github.com/confluentinc/cli/v4/pkg/output" ) type databaseOut struct { @@ -37,10 +38,29 @@ func (c *command) newCatalogDatabaseCommand() *cobra.Command { return cmd } +func printDatabaseOutput(cmd *cobra.Command, sdkDatabase cmfsdk.KafkaDatabase, catalogName string) error { + if output.GetFormat(cmd) == output.Human { + table := output.NewTable(cmd) + var creationTime string + if sdkDatabase.GetMetadata().CreationTimestamp != nil { + creationTime = *sdkDatabase.GetMetadata().CreationTimestamp + } + table.Add(&databaseOut{ + CreationTime: creationTime, + Name: sdkDatabase.GetMetadata().Name, + Catalog: catalogName, + }) + return table.Print() + } + + localDatabase := convertSdkDatabaseToLocalDatabase(sdkDatabase) + return output.SerializedOutput(cmd, localDatabase) +} + func readDatabaseResourceFile(resourceFilePath string) (cmfsdk.KafkaDatabase, error) { data, err := os.ReadFile(resourceFilePath) if err != nil { - return cmfsdk.KafkaDatabase{}, fmt.Errorf("failed to read file: %v", err) + return cmfsdk.KafkaDatabase{}, fmt.Errorf("failed to read file: %w", err) } var genericData map[string]interface{} diff --git a/internal/flink/command_catalog_database_create.go b/internal/flink/command_catalog_database_create.go index 212606f2fa..dbae647b6b 100644 --- a/internal/flink/command_catalog_database_create.go +++ b/internal/flink/command_catalog_database_create.go @@ -4,7 +4,6 @@ import ( "github.com/spf13/cobra" pcmd "github.com/confluentinc/cli/v4/pkg/cmd" - "github.com/confluentinc/cli/v4/pkg/output" ) func (c *command) newCatalogDatabaseCreateCommand() *cobra.Command { @@ -47,20 +46,5 @@ func (c *command) catalogDatabaseCreate(cmd *cobra.Command, args []string) error return err } - if output.GetFormat(cmd) == output.Human { - table := output.NewTable(cmd) - var creationTime string - if sdkOutputDatabase.GetMetadata().CreationTimestamp != nil { - creationTime = *sdkOutputDatabase.GetMetadata().CreationTimestamp - } - table.Add(&databaseOut{ - CreationTime: creationTime, - Name: sdkOutputDatabase.GetMetadata().Name, - Catalog: catalogName, - }) - return table.Print() - } - - localDatabase := convertSdkDatabaseToLocalDatabase(sdkOutputDatabase) - return output.SerializedOutput(cmd, localDatabase) + return printDatabaseOutput(cmd, sdkOutputDatabase, catalogName) } diff --git a/internal/flink/command_catalog_database_describe.go b/internal/flink/command_catalog_database_describe.go index f6598d5b22..24b9952f8a 100644 --- a/internal/flink/command_catalog_database_describe.go +++ b/internal/flink/command_catalog_database_describe.go @@ -4,7 +4,6 @@ import ( "github.com/spf13/cobra" pcmd "github.com/confluentinc/cli/v4/pkg/cmd" - "github.com/confluentinc/cli/v4/pkg/output" ) func (c *command) newCatalogDatabaseDescribeCommand() *cobra.Command { @@ -41,20 +40,5 @@ func (c *command) catalogDatabaseDescribe(cmd *cobra.Command, args []string) err return err } - if output.GetFormat(cmd) == output.Human { - table := output.NewTable(cmd) - var creationTime string - if sdkOutputDatabase.GetMetadata().CreationTimestamp != nil { - creationTime = *sdkOutputDatabase.GetMetadata().CreationTimestamp - } - table.Add(&databaseOut{ - CreationTime: creationTime, - Name: sdkOutputDatabase.GetMetadata().Name, - Catalog: catalogName, - }) - return table.Print() - } - - localDatabase := convertSdkDatabaseToLocalDatabase(sdkOutputDatabase) - return output.SerializedOutput(cmd, localDatabase) + return printDatabaseOutput(cmd, sdkOutputDatabase, catalogName) } diff --git a/internal/flink/command_catalog_database_update.go b/internal/flink/command_catalog_database_update.go index 293420b881..12646a9b66 100644 --- a/internal/flink/command_catalog_database_update.go +++ b/internal/flink/command_catalog_database_update.go @@ -50,23 +50,9 @@ func (c *command) catalogDatabaseUpdate(cmd *cobra.Command, args []string) error sdkOutputDatabase, err := client.DescribeDatabase(c.createContext(), catalogName, databaseName) if err != nil { + output.ErrPrintf(c.Config.EnableColor, "Update request for database \"%s\" in catalog \"%s\" succeeded, but failed to retrieve updated details: %v\n", databaseName, catalogName, err) return err } - if output.GetFormat(cmd) == output.Human { - table := output.NewTable(cmd) - var creationTime string - if sdkOutputDatabase.GetMetadata().CreationTimestamp != nil { - creationTime = *sdkOutputDatabase.GetMetadata().CreationTimestamp - } - table.Add(&databaseOut{ - CreationTime: creationTime, - Name: sdkOutputDatabase.GetMetadata().Name, - Catalog: catalogName, - }) - return table.Print() - } - - localDatabase := convertSdkDatabaseToLocalDatabase(sdkOutputDatabase) - return output.SerializedOutput(cmd, localDatabase) + return printDatabaseOutput(cmd, sdkOutputDatabase, catalogName) } diff --git a/test/fixtures/input/flink/catalog/database/create-invalid-failure.yaml b/test/fixtures/input/flink/catalog/database/create-invalid-failure.yaml new file mode 100644 index 0000000000..ea5bb88064 --- /dev/null +++ b/test/fixtures/input/flink/catalog/database/create-invalid-failure.yaml @@ -0,0 +1,7 @@ +apiVersion: cmf/api/v1/database +kind: KafkaDatabase +metadata: + name: invalid-database +spec: + kafkaCluster: + connectionConfig: {} diff --git a/test/fixtures/input/flink/catalog/database/create-successful.yaml b/test/fixtures/input/flink/catalog/database/create-successful.yaml new file mode 100644 index 0000000000..6e4df0bbf0 --- /dev/null +++ b/test/fixtures/input/flink/catalog/database/create-successful.yaml @@ -0,0 +1,8 @@ +apiVersion: cmf/api/v1/database +kind: KafkaDatabase +metadata: + name: test-database +spec: + kafkaCluster: + connectionConfig: + bootstrap.servers: localhost:9092 diff --git a/test/fixtures/input/flink/catalog/database/update-invalid-failure.yaml b/test/fixtures/input/flink/catalog/database/update-invalid-failure.yaml new file mode 100644 index 0000000000..93e62665e9 --- /dev/null +++ b/test/fixtures/input/flink/catalog/database/update-invalid-failure.yaml @@ -0,0 +1,8 @@ +apiVersion: cmf/api/v1/database +kind: KafkaDatabase +metadata: + name: invalid-database +spec: + kafkaCluster: + connectionConfig: + bootstrap.servers: localhost:9092 diff --git a/test/fixtures/input/flink/catalog/database/update-successful.yaml b/test/fixtures/input/flink/catalog/database/update-successful.yaml new file mode 100644 index 0000000000..6e4df0bbf0 --- /dev/null +++ b/test/fixtures/input/flink/catalog/database/update-successful.yaml @@ -0,0 +1,8 @@ +apiVersion: cmf/api/v1/database +kind: KafkaDatabase +metadata: + name: test-database +spec: + kafkaCluster: + connectionConfig: + bootstrap.servers: localhost:9092 diff --git a/test/flink_onprem_test.go b/test/flink_onprem_test.go index b1655b19d2..632bcf7e82 100644 --- a/test/flink_onprem_test.go +++ b/test/flink_onprem_test.go @@ -472,6 +472,44 @@ func (s *CLITestSuite) TestFlinkCatalogDatabaseUpdateOnPrem() { runIntegrationTestsWithMultipleAuth(s, tests) } +func (s *CLITestSuite) TestFlinkCatalogDatabaseCreateWithYAML() { + tests := []CLITest{ + // success scenarios with JSON files + {args: "flink catalog database create test/fixtures/input/flink/catalog/database/create-successful.json --catalog test-catalog", fixture: "flink/catalog/database/create-success.golden"}, + {args: "flink catalog database create test/fixtures/input/flink/catalog/database/create-successful.json --catalog test-catalog --output json", fixture: "flink/catalog/database/create-success-json.golden"}, + {args: "flink catalog database create test/fixtures/input/flink/catalog/database/create-successful.json --catalog test-catalog --output yaml", fixture: "flink/catalog/database/create-success-yaml.golden"}, + // failure scenarios with JSON files + {args: "flink catalog database create test/fixtures/input/flink/catalog/database/create-invalid-failure.json --catalog test-catalog", fixture: "flink/catalog/database/create-invalid-failure.golden", exitCode: 1}, + // YAML file tests + {args: "flink catalog database create test/fixtures/input/flink/catalog/database/create-successful.yaml --catalog test-catalog", fixture: "flink/catalog/database/create-success.golden"}, + {args: "flink catalog database create test/fixtures/input/flink/catalog/database/create-successful.yaml --catalog test-catalog --output json", fixture: "flink/catalog/database/create-success-json.golden"}, + {args: "flink catalog database create test/fixtures/input/flink/catalog/database/create-successful.yaml --catalog test-catalog --output yaml", fixture: "flink/catalog/database/create-success-yaml.golden"}, + // YAML file failure scenarios + {args: "flink catalog database create test/fixtures/input/flink/catalog/database/create-invalid-failure.yaml --catalog test-catalog", fixture: "flink/catalog/database/create-invalid-failure.golden", exitCode: 1}, + } + + runIntegrationTestsWithMultipleAuth(s, tests) +} + +func (s *CLITestSuite) TestFlinkCatalogDatabaseUpdateWithYAML() { + tests := []CLITest{ + // success scenarios with JSON files + {args: "flink catalog database update test/fixtures/input/flink/catalog/database/update-successful.json --catalog test-catalog", fixture: "flink/catalog/database/update-success.golden"}, + {args: "flink catalog database update test/fixtures/input/flink/catalog/database/update-successful.json --catalog test-catalog --output json", fixture: "flink/catalog/database/update-success-json.golden"}, + {args: "flink catalog database update test/fixtures/input/flink/catalog/database/update-successful.json --catalog test-catalog --output yaml", fixture: "flink/catalog/database/update-success-yaml.golden"}, + // failure scenarios with JSON files + {args: "flink catalog database update test/fixtures/input/flink/catalog/database/update-invalid-failure.json --catalog test-catalog", fixture: "flink/catalog/database/update-invalid-failure.golden", exitCode: 1}, + // YAML file tests + {args: "flink catalog database update test/fixtures/input/flink/catalog/database/update-successful.yaml --catalog test-catalog", fixture: "flink/catalog/database/update-success.golden"}, + {args: "flink catalog database update test/fixtures/input/flink/catalog/database/update-successful.yaml --catalog test-catalog --output json", fixture: "flink/catalog/database/update-success-json.golden"}, + {args: "flink catalog database update test/fixtures/input/flink/catalog/database/update-successful.yaml --catalog test-catalog --output yaml", fixture: "flink/catalog/database/update-success-yaml.golden"}, + // YAML file failure scenarios + {args: "flink catalog database update test/fixtures/input/flink/catalog/database/update-invalid-failure.yaml --catalog test-catalog", fixture: "flink/catalog/database/update-invalid-failure.golden", exitCode: 1}, + } + + runIntegrationTestsWithMultipleAuth(s, tests) +} + func (s *CLITestSuite) TestFlinkStatementCreateOnPrem() { tests := []CLITest{ // success diff --git a/test/test-server/flink_onprem_handler.go b/test/test-server/flink_onprem_handler.go index ebc7f3da92..6a43e51125 100644 --- a/test/test-server/flink_onprem_handler.go +++ b/test/test-server/flink_onprem_handler.go @@ -312,6 +312,11 @@ func handleCmfCatalogDatabase(t *testing.T) http.HandlerFunc { return } + // Read and validate the request body. + req := new(cmfsdk.KafkaDatabase) + err := json.NewDecoder(r.Body).Decode(req) + require.NoError(t, err) + w.WriteHeader(http.StatusOK) return case http.MethodDelete: From 977c3c9610b7270cc5ea5845a078c74e5aef12ee Mon Sep 17 00:00:00 2001 From: Paras Negi Date: Fri, 3 Apr 2026 12:38:35 +0530 Subject: [PATCH 14/16] Fix the update method --- internal/flink/command_catalog_database_update.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/internal/flink/command_catalog_database_update.go b/internal/flink/command_catalog_database_update.go index 12646a9b66..8411a8b5cd 100644 --- a/internal/flink/command_catalog_database_update.go +++ b/internal/flink/command_catalog_database_update.go @@ -1,10 +1,11 @@ package flink import ( + "fmt" + "github.com/spf13/cobra" pcmd "github.com/confluentinc/cli/v4/pkg/cmd" - "github.com/confluentinc/cli/v4/pkg/output" ) func (c *command) newCatalogDatabaseUpdateCommand() *cobra.Command { @@ -50,8 +51,7 @@ func (c *command) catalogDatabaseUpdate(cmd *cobra.Command, args []string) error sdkOutputDatabase, err := client.DescribeDatabase(c.createContext(), catalogName, databaseName) if err != nil { - output.ErrPrintf(c.Config.EnableColor, "Update request for database \"%s\" in catalog \"%s\" succeeded, but failed to retrieve updated details: %v\n", databaseName, catalogName, err) - return err + return fmt.Errorf("database %q was updated successfully, but failed to retrieve updated details: %w", databaseName, err) } return printDatabaseOutput(cmd, sdkOutputDatabase, catalogName) From 30867349a3fdaac9f92febb4208130940a03095d Mon Sep 17 00:00:00 2001 From: Paras Negi Date: Wed, 13 May 2026 11:48:30 +0530 Subject: [PATCH 15/16] [CF-1870] Address Channing's review comments - Use RFC3339 for mock KafkaDatabase timestamp so it matches the prod response shape; regenerate the affected describe/list/update goldens. - Rename TestFlinkCatalogDatabase{Create,Update}WithYAML to include OnPrem and drop the duplicated JSON cases (already covered by the non-YAML tests). Co-Authored-By: Claude Opus 4.7 (1M context) --- .../database/describe-success-json.golden | 2 +- .../database/describe-success-yaml.golden | 2 +- .../catalog/database/describe-success.golden | 10 ++++---- .../catalog/database/list-success-json.golden | 4 ++-- .../catalog/database/list-success-yaml.golden | 4 ++-- .../catalog/database/list-success.golden | 8 +++---- .../database/update-success-json.golden | 2 +- .../database/update-success-yaml.golden | 2 +- .../catalog/database/update-success.golden | 10 ++++---- test/flink_onprem_test.go | 24 +++++-------------- test/test-server/flink_onprem_handler.go | 2 +- 11 files changed, 29 insertions(+), 41 deletions(-) diff --git a/test/fixtures/output/flink/catalog/database/describe-success-json.golden b/test/fixtures/output/flink/catalog/database/describe-success-json.golden index fb8c835dc6..87b3be9876 100644 --- a/test/fixtures/output/flink/catalog/database/describe-success-json.golden +++ b/test/fixtures/output/flink/catalog/database/describe-success-json.golden @@ -3,7 +3,7 @@ "kind": "KafkaDatabase", "metadata": { "name": "test-database", - "creationTimestamp": "2025-08-05 12:00:00 +0000 UTC" + "creationTimestamp": "2025-08-05T12:00:00Z" }, "spec": { "kafkaCluster": { diff --git a/test/fixtures/output/flink/catalog/database/describe-success-yaml.golden b/test/fixtures/output/flink/catalog/database/describe-success-yaml.golden index a45817dbbb..f0878e9e53 100644 --- a/test/fixtures/output/flink/catalog/database/describe-success-yaml.golden +++ b/test/fixtures/output/flink/catalog/database/describe-success-yaml.golden @@ -2,7 +2,7 @@ apiVersion: cmf/api/v1/database kind: KafkaDatabase metadata: name: test-database - creationTimestamp: 2025-08-05 12:00:00 +0000 UTC + creationTimestamp: "2025-08-05T12:00:00Z" spec: kafkaCluster: connectionConfig: diff --git a/test/fixtures/output/flink/catalog/database/describe-success.golden b/test/fixtures/output/flink/catalog/database/describe-success.golden index 96b1543c77..3702c544e4 100644 --- a/test/fixtures/output/flink/catalog/database/describe-success.golden +++ b/test/fixtures/output/flink/catalog/database/describe-success.golden @@ -1,5 +1,5 @@ -+---------------+-------------------------------+ -| Creation Time | 2025-08-05 12:00:00 +0000 UTC | -| Name | test-database | -| Catalog | test-catalog | -+---------------+-------------------------------+ ++---------------+----------------------+ +| Creation Time | 2025-08-05T12:00:00Z | +| Name | test-database | +| Catalog | test-catalog | ++---------------+----------------------+ diff --git a/test/fixtures/output/flink/catalog/database/list-success-json.golden b/test/fixtures/output/flink/catalog/database/list-success-json.golden index 42e40f7379..36349dc6d4 100644 --- a/test/fixtures/output/flink/catalog/database/list-success-json.golden +++ b/test/fixtures/output/flink/catalog/database/list-success-json.golden @@ -4,7 +4,7 @@ "kind": "KafkaDatabase", "metadata": { "name": "test-database-1", - "creationTimestamp": "2025-08-05 12:00:00 +0000 UTC" + "creationTimestamp": "2025-08-05T12:00:00Z" }, "spec": { "kafkaCluster": { @@ -19,7 +19,7 @@ "kind": "KafkaDatabase", "metadata": { "name": "test-database-2", - "creationTimestamp": "2025-08-05 12:00:00 +0000 UTC" + "creationTimestamp": "2025-08-05T12:00:00Z" }, "spec": { "kafkaCluster": { diff --git a/test/fixtures/output/flink/catalog/database/list-success-yaml.golden b/test/fixtures/output/flink/catalog/database/list-success-yaml.golden index 186a556114..c421dc678f 100644 --- a/test/fixtures/output/flink/catalog/database/list-success-yaml.golden +++ b/test/fixtures/output/flink/catalog/database/list-success-yaml.golden @@ -2,7 +2,7 @@ kind: KafkaDatabase metadata: name: test-database-1 - creationTimestamp: 2025-08-05 12:00:00 +0000 UTC + creationTimestamp: "2025-08-05T12:00:00Z" spec: kafkaCluster: connectionConfig: @@ -11,7 +11,7 @@ kind: KafkaDatabase metadata: name: test-database-2 - creationTimestamp: 2025-08-05 12:00:00 +0000 UTC + creationTimestamp: "2025-08-05T12:00:00Z" spec: kafkaCluster: connectionConfig: diff --git a/test/fixtures/output/flink/catalog/database/list-success.golden b/test/fixtures/output/flink/catalog/database/list-success.golden index 983bc1d5df..38bfed9632 100644 --- a/test/fixtures/output/flink/catalog/database/list-success.golden +++ b/test/fixtures/output/flink/catalog/database/list-success.golden @@ -1,4 +1,4 @@ - Creation Time | Name | Catalog ---------------------------------+-----------------+--------------- - 2025-08-05 12:00:00 +0000 UTC | test-database-1 | test-catalog - 2025-08-05 12:00:00 +0000 UTC | test-database-2 | test-catalog + Creation Time | Name | Catalog +-----------------------+-----------------+--------------- + 2025-08-05T12:00:00Z | test-database-1 | test-catalog + 2025-08-05T12:00:00Z | test-database-2 | test-catalog diff --git a/test/fixtures/output/flink/catalog/database/update-success-json.golden b/test/fixtures/output/flink/catalog/database/update-success-json.golden index fb8c835dc6..87b3be9876 100644 --- a/test/fixtures/output/flink/catalog/database/update-success-json.golden +++ b/test/fixtures/output/flink/catalog/database/update-success-json.golden @@ -3,7 +3,7 @@ "kind": "KafkaDatabase", "metadata": { "name": "test-database", - "creationTimestamp": "2025-08-05 12:00:00 +0000 UTC" + "creationTimestamp": "2025-08-05T12:00:00Z" }, "spec": { "kafkaCluster": { diff --git a/test/fixtures/output/flink/catalog/database/update-success-yaml.golden b/test/fixtures/output/flink/catalog/database/update-success-yaml.golden index a45817dbbb..f0878e9e53 100644 --- a/test/fixtures/output/flink/catalog/database/update-success-yaml.golden +++ b/test/fixtures/output/flink/catalog/database/update-success-yaml.golden @@ -2,7 +2,7 @@ apiVersion: cmf/api/v1/database kind: KafkaDatabase metadata: name: test-database - creationTimestamp: 2025-08-05 12:00:00 +0000 UTC + creationTimestamp: "2025-08-05T12:00:00Z" spec: kafkaCluster: connectionConfig: diff --git a/test/fixtures/output/flink/catalog/database/update-success.golden b/test/fixtures/output/flink/catalog/database/update-success.golden index 96b1543c77..3702c544e4 100644 --- a/test/fixtures/output/flink/catalog/database/update-success.golden +++ b/test/fixtures/output/flink/catalog/database/update-success.golden @@ -1,5 +1,5 @@ -+---------------+-------------------------------+ -| Creation Time | 2025-08-05 12:00:00 +0000 UTC | -| Name | test-database | -| Catalog | test-catalog | -+---------------+-------------------------------+ ++---------------+----------------------+ +| Creation Time | 2025-08-05T12:00:00Z | +| Name | test-database | +| Catalog | test-catalog | ++---------------+----------------------+ diff --git a/test/flink_onprem_test.go b/test/flink_onprem_test.go index 632bcf7e82..55b0583a6c 100644 --- a/test/flink_onprem_test.go +++ b/test/flink_onprem_test.go @@ -472,38 +472,26 @@ func (s *CLITestSuite) TestFlinkCatalogDatabaseUpdateOnPrem() { runIntegrationTestsWithMultipleAuth(s, tests) } -func (s *CLITestSuite) TestFlinkCatalogDatabaseCreateWithYAML() { +func (s *CLITestSuite) TestFlinkCatalogDatabaseCreateOnPremWithYAML() { tests := []CLITest{ - // success scenarios with JSON files - {args: "flink catalog database create test/fixtures/input/flink/catalog/database/create-successful.json --catalog test-catalog", fixture: "flink/catalog/database/create-success.golden"}, - {args: "flink catalog database create test/fixtures/input/flink/catalog/database/create-successful.json --catalog test-catalog --output json", fixture: "flink/catalog/database/create-success-json.golden"}, - {args: "flink catalog database create test/fixtures/input/flink/catalog/database/create-successful.json --catalog test-catalog --output yaml", fixture: "flink/catalog/database/create-success-yaml.golden"}, - // failure scenarios with JSON files - {args: "flink catalog database create test/fixtures/input/flink/catalog/database/create-invalid-failure.json --catalog test-catalog", fixture: "flink/catalog/database/create-invalid-failure.golden", exitCode: 1}, - // YAML file tests + // success {args: "flink catalog database create test/fixtures/input/flink/catalog/database/create-successful.yaml --catalog test-catalog", fixture: "flink/catalog/database/create-success.golden"}, {args: "flink catalog database create test/fixtures/input/flink/catalog/database/create-successful.yaml --catalog test-catalog --output json", fixture: "flink/catalog/database/create-success-json.golden"}, {args: "flink catalog database create test/fixtures/input/flink/catalog/database/create-successful.yaml --catalog test-catalog --output yaml", fixture: "flink/catalog/database/create-success-yaml.golden"}, - // YAML file failure scenarios + // failure {args: "flink catalog database create test/fixtures/input/flink/catalog/database/create-invalid-failure.yaml --catalog test-catalog", fixture: "flink/catalog/database/create-invalid-failure.golden", exitCode: 1}, } runIntegrationTestsWithMultipleAuth(s, tests) } -func (s *CLITestSuite) TestFlinkCatalogDatabaseUpdateWithYAML() { +func (s *CLITestSuite) TestFlinkCatalogDatabaseUpdateOnPremWithYAML() { tests := []CLITest{ - // success scenarios with JSON files - {args: "flink catalog database update test/fixtures/input/flink/catalog/database/update-successful.json --catalog test-catalog", fixture: "flink/catalog/database/update-success.golden"}, - {args: "flink catalog database update test/fixtures/input/flink/catalog/database/update-successful.json --catalog test-catalog --output json", fixture: "flink/catalog/database/update-success-json.golden"}, - {args: "flink catalog database update test/fixtures/input/flink/catalog/database/update-successful.json --catalog test-catalog --output yaml", fixture: "flink/catalog/database/update-success-yaml.golden"}, - // failure scenarios with JSON files - {args: "flink catalog database update test/fixtures/input/flink/catalog/database/update-invalid-failure.json --catalog test-catalog", fixture: "flink/catalog/database/update-invalid-failure.golden", exitCode: 1}, - // YAML file tests + // success {args: "flink catalog database update test/fixtures/input/flink/catalog/database/update-successful.yaml --catalog test-catalog", fixture: "flink/catalog/database/update-success.golden"}, {args: "flink catalog database update test/fixtures/input/flink/catalog/database/update-successful.yaml --catalog test-catalog --output json", fixture: "flink/catalog/database/update-success-json.golden"}, {args: "flink catalog database update test/fixtures/input/flink/catalog/database/update-successful.yaml --catalog test-catalog --output yaml", fixture: "flink/catalog/database/update-success-yaml.golden"}, - // YAML file failure scenarios + // failure {args: "flink catalog database update test/fixtures/input/flink/catalog/database/update-invalid-failure.yaml --catalog test-catalog", fixture: "flink/catalog/database/update-invalid-failure.golden", exitCode: 1}, } diff --git a/test/test-server/flink_onprem_handler.go b/test/test-server/flink_onprem_handler.go index 6a43e51125..1c03ab8a61 100644 --- a/test/test-server/flink_onprem_handler.go +++ b/test/test-server/flink_onprem_handler.go @@ -227,7 +227,7 @@ func createKafkaCatalog(catName string) cmfsdk.KafkaCatalog { } func createKafkaDatabase(dbName string) cmfsdk.KafkaDatabase { - timeStamp := time.Date(2025, time.August, 5, 12, 0, 0, 0, time.UTC).String() + timeStamp := time.Date(2025, time.August, 5, 12, 0, 0, 0, time.UTC).Format(time.RFC3339) return cmfsdk.KafkaDatabase{ ApiVersion: "cmf/api/v1/database", Kind: "KafkaDatabase", From 030c098d84557e6a6b418c6ef98bd6642e559320 Mon Sep 17 00:00:00 2001 From: Paras Negi Date: Wed, 13 May 2026 12:12:00 +0530 Subject: [PATCH 16/16] [CF-1870] Rename AlterEnvironments to DdlEnvironments cmf-sdk-go v0.0.7 renamed the KafkaDatabaseSpec field. Pick up the new name in both the SDK use site and the local mirror struct. Co-Authored-By: Claude Opus 4.7 (1M context) --- go.sum | 26 ---------------------- internal/flink/command_catalog_database.go | 2 +- internal/flink/local_types.go | 4 ++-- 3 files changed, 3 insertions(+), 29 deletions(-) diff --git a/go.sum b/go.sum index 2b59513408..91f1821038 100644 --- a/go.sum +++ b/go.sum @@ -307,8 +307,6 @@ github.com/containerd/typeurl/v2 v2.1.1/go.mod h1:IDp2JFvbwZ31H8dQbEIY7sDl2L3o3H github.com/cpuguy83/dockercfg v0.3.1 h1:/FpZ+JaygUR/lZP2NlFI2DVfrOEMAIKP5wWEJdoYe9E= github.com/cpuguy83/dockercfg v0.3.1/go.mod h1:sugsbF4//dDlL/i+S+rtpIWp+5h0BHJHfjj5/jFyUJc= github.com/cpuguy83/go-md2man/v2 v2.0.4/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= -github.com/cyphar/filepath-securejoin v0.4.1 h1:JyxxyPEaktOD+GAnqIqTf9A8tHyAG22rowi7HkoSU1s= -github.com/cyphar/filepath-securejoin v0.4.1/go.mod h1:Sdj7gXlvMcPZsbhwhQ33GguGLDGQL7h7bg04C/+u9jI= github.com/cyphar/filepath-securejoin v0.6.1 h1:5CeZ1jPXEiYt3+Z6zqprSAgSWiggmpVyciv8syjIpVE= github.com/cyphar/filepath-securejoin v0.6.1/go.mod h1:A8hd4EnAeyujCJRrICiOWqjS1AX0a9kM5XL+NwKoYSc= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= @@ -376,14 +374,10 @@ github.com/gliderlabs/ssh v0.3.8 h1:a4YXD1V7xMF9g5nTkdfnja3Sxy1PVDCj1Zg4Wb8vY6c= github.com/gliderlabs/ssh v0.3.8/go.mod h1:xYoytBv1sV0aL3CavoDuJIQNURXkkfPA/wxQ1pL1fAU= github.com/go-git/gcfg v1.5.1-0.20230307220236-3a3c6141e376 h1:+zs/tPmkDkHx3U66DAb0lQFJrpS6731Oaa12ikc+DiI= github.com/go-git/gcfg v1.5.1-0.20230307220236-3a3c6141e376/go.mod h1:an3vInlBmSxCcxctByoQdvwPiA7DTK7jaaFDBTtu0ic= -github.com/go-git/go-billy/v5 v5.8.0 h1:I8hjc3LbBlXTtVuFNJuwYuMiHvQJDq1AT6u4DwDzZG0= -github.com/go-git/go-billy/v5 v5.8.0/go.mod h1:RpvI/rw4Vr5QA+Z60c6d6LXH0rYJo0uD5SqfmrrheCY= github.com/go-git/go-billy/v5 v5.9.0 h1:jItGXszUDRtR/AlferWPTMN4j38BQ88XnXKbilmmBPA= github.com/go-git/go-billy/v5 v5.9.0/go.mod h1:jCnQMLj9eUgGU7+ludSTYoZL/GGmii14RxKFj7ROgHw= github.com/go-git/go-git-fixtures/v4 v4.3.2-0.20231010084843-55a94097c399 h1:eMje31YglSBqCdIqdhKBW8lokaMrL3uTkpGYlE2OOT4= github.com/go-git/go-git-fixtures/v4 v4.3.2-0.20231010084843-55a94097c399/go.mod h1:1OCfN199q1Jm3HZlxleg+Dw/mwps2Wbk9frAWm+4FII= -github.com/go-git/go-git/v5 v5.18.0 h1:O831KI+0PR51hM2kep6T8k+w0/LIAD490gvqMCvL5hM= -github.com/go-git/go-git/v5 v5.18.0/go.mod h1:pW/VmeqkanRFqR6AljLcs7EA7FbZaN5MQqO7oZADXpo= github.com/go-git/go-git/v5 v5.19.0 h1:+WkVUQZSy/F1Gb13udrMKjIM2PrzsNfDKFSfo5tkMtc= github.com/go-git/go-git/v5 v5.19.0/go.mod h1:Pb1v0c7/g8aGQJwx9Us09W85yGoyvSwuhEGMH7zjDKQ= github.com/go-gl/glfw v0.0.0-20190409004039-e6da0acd62b1/go.mod h1:vR7hzQXu2zJy9AVAgeJqvqgH9Q5CA+iKCZ2gyEVpxRU= @@ -715,8 +709,6 @@ github.com/pelletier/go-toml v1.9.5 h1:4yBQzkHv+7BHq2PQUZF3Mx0IYxG7LsP222s7Agd3v github.com/pelletier/go-toml v1.9.5/go.mod h1:u1nR/EPcESfeI/szUZKdtJ0xRNbUoANCkoOuaOx1Y+c= github.com/phayes/freeport v0.0.0-20220201140144-74d24b5ae9f5 h1:Ii+DKncOVM8Cu1Hc+ETb5K+23HdAMvESYE3ZJ5b5cMI= github.com/phayes/freeport v0.0.0-20220201140144-74d24b5ae9f5/go.mod h1:iIss55rKnNBTvrwdmkUpLnDpZoAHvWaiq5+iMmen4AE= -github.com/pjbgf/sha1cd v0.3.2 h1:a9wb0bp1oC2TGwStyn0Umc/IGKQnEgF0vVaZ8QF8eo4= -github.com/pjbgf/sha1cd v0.3.2/go.mod h1:zQWigSxVmsHEZow5qaLtPYxpcKMMQpa09ixqBxuCS6A= github.com/pjbgf/sha1cd v0.6.0 h1:3WJ8Wz8gvDz29quX1OcEmkAlUg9diU4GxJHqs0/XiwU= github.com/pjbgf/sha1cd v0.6.0/go.mod h1:lhpGlyHLpQZoxMv8HcgXvZEhcGs0PG/vsZnEJ7H0iCM= github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c h1:+mdjkGKdHQG3305AYmdv1U2eRNDiU2ErMBj1gwrq8eQ= @@ -946,8 +938,6 @@ golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5y golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= golang.org/x/crypto v0.0.0-20220722155217-630584e8d5aa/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= golang.org/x/crypto v0.19.0/go.mod h1:Iy9bg/ha4yyC70EfRS8jz+B6ybOBKMaSxLj6P6oBDfU= -golang.org/x/crypto v0.46.0 h1:cKRW/pmt1pKAfetfu+RCEvjvZkA9RimPbh7bhFjGVBU= -golang.org/x/crypto v0.46.0/go.mod h1:Evb/oLKmMraqjZ2iQTwDwvCtJkczlDuTmdJXoZVzqU0= golang.org/x/crypto v0.50.0 h1:zO47/JPrL6vsNkINmLoo/PH1gcxpls50DNogFvB5ZGI= golang.org/x/crypto v0.50.0/go.mod h1:3muZ7vA7PBCE6xgPX7nkzzjiUq87kRItoJQM1Yo8S+Q= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= @@ -960,8 +950,6 @@ golang.org/x/exp v0.0.0-20191227195350-da58074b4299/go.mod h1:2RIsYlXP63K8oxa1u0 golang.org/x/exp v0.0.0-20200119233911-0405dc783f0a/go.mod h1:2RIsYlXP63K8oxa1u096TMicItID8zy7Y6sNkU49FU4= golang.org/x/exp v0.0.0-20200207192155-f17229e696bd/go.mod h1:J/WKrq2StrnmMY6+EHIKF9dgMWnmCNThgcyBT1FY9mM= golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6/go.mod h1:3jZMyOhIsHpP37uCMkUooju7aAi5cS1Q23tOzKc+0MU= -golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56 h1:2dVuKD2vS7b0QIHQbpyTISPd0LeHDbnYEryqj5Q1ug8= -golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56/go.mod h1:M4RDyNAINzryxdtnbRXRL/OHtkFuWGRjvuhBJpk2IlY= golang.org/x/exp v0.0.0-20260410095643-746e56fc9e2f h1:W3F4c+6OLc6H2lb//N1q4WpJkhzJCK5J6kUi1NTVXfM= golang.org/x/exp v0.0.0-20260410095643-746e56fc9e2f/go.mod h1:J1xhfL/vlindoeF/aINzNzt2Bket5bjo9sdOYzOsU80= golang.org/x/image v0.0.0-20190227222117-0694c2d4d067/go.mod h1:kZ7UVZpmo3dzQBMxlp+ypCbDeSB+sBbTgSJuh5dn5js= @@ -990,8 +978,6 @@ golang.org/x/mod v0.4.1/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= -golang.org/x/mod v0.31.0 h1:HaW9xtz0+kOcWKwli0ZXy79Ix+UW/vOfmWI5QVd2tgI= -golang.org/x/mod v0.31.0/go.mod h1:43JraMp9cGx1Rx3AqioxrbrhNsLl2l/iNAvuBkrezpg= golang.org/x/mod v0.35.0 h1:Ww1D637e6Pg+Zb2KrWfHQUnH2dQRLBQyAtpr/haaJeM= golang.org/x/mod v0.35.0/go.mod h1:+GwiRhIInF8wPm+4AoT6L0FA1QWAad3OMdTRx4tFYlU= golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= @@ -1032,8 +1018,6 @@ golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qx golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg= -golang.org/x/net v0.48.0 h1:zyQRTTrjc33Lhh0fBgT/H3oZq9WuvRR5gPC70xpDiQU= -golang.org/x/net v0.48.0/go.mod h1:+ndRgGjkh8FGtu1w1FGbEC31if4VrNVMuKTgcAAnQRY= golang.org/x/net v0.53.0 h1:d+qAbo5L0orcWAr0a9JweQpjXF19LMXJE8Ey7hwOdUA= golang.org/x/net v0.53.0/go.mod h1:JvMuJH7rrdiCfbeHoo3fCQU24Lf5JJwT9W3sJFulfgs= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= @@ -1061,8 +1045,6 @@ golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.19.0 h1:vV+1eWNmZ5geRlYjzm2adRgW2/mcpevXNg50YZtPCE4= -golang.org/x/sync v0.19.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI= golang.org/x/sync v0.20.0 h1:e0PTpb7pjO8GAtTs2dQ6jYa5BWYlMuX047Dco/pItO4= golang.org/x/sync v0.20.0/go.mod h1:9xrNwdLfx4jkKbNva9FpL6vEN7evnE43NNNJQ2LF3+0= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -1125,8 +1107,6 @@ golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= -golang.org/x/sys v0.42.0 h1:omrd2nAlyT5ESRdCLYdm3+fMfNFE/+Rf4bDIQImRJeo= -golang.org/x/sys v0.42.0/go.mod h1:4GL1E5IUh+htKOUEOaiffhrAeqysfVGipDYzABqnCmw= golang.org/x/sys v0.43.0 h1:Rlag2XtaFTxp19wS8MXlJwTvoh8ArU6ezoyFsMyCTNI= golang.org/x/sys v0.43.0/go.mod h1:4GL1E5IUh+htKOUEOaiffhrAeqysfVGipDYzABqnCmw= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= @@ -1134,8 +1114,6 @@ golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuX golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k= golang.org/x/term v0.8.0/go.mod h1:xPskH00ivmX89bAKVGSKKtLOWNx2+17Eiy94tnKShWo= golang.org/x/term v0.17.0/go.mod h1:lLRBjIVuehSbZlaOtGMbcMncT+aqLLLmKrsjNrUguwk= -golang.org/x/term v0.38.0 h1:PQ5pkm/rLO6HnxFR7N2lJHOZX6Kez5Y1gDSJla6jo7Q= -golang.org/x/term v0.38.0/go.mod h1:bSEAKrOT1W+VSu9TSCMtoGEOUcKxOKgl3LE5QEF/xVg= golang.org/x/term v0.42.0 h1:UiKe+zDFmJobeJ5ggPwOshJIVt6/Ft0rcfrXZDLWAWY= golang.org/x/term v0.42.0/go.mod h1:Dq/D+snpsbazcBG5+F9Q1n2rXV8Ma+71xEjTRufARgY= golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= @@ -1149,8 +1127,6 @@ golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= -golang.org/x/text v0.32.0 h1:ZD01bjUt1FQ9WJ0ClOL5vxgxOI/sVCNgX1YtKwcY0mU= -golang.org/x/text v0.32.0/go.mod h1:o/rUWzghvpD5TXrTIBuJU77MTaN0ljMWE47kxGJQ7jY= golang.org/x/text v0.36.0 h1:JfKh3XmcRPqZPKevfXVpI1wXPTqbkE5f7JA92a55Yxg= golang.org/x/text v0.36.0/go.mod h1:NIdBknypM8iqVmPiuco0Dh6P5Jcdk8lJL0CUebqK164= golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= @@ -1212,8 +1188,6 @@ golang.org/x/tools v0.1.0/go.mod h1:xkSsbof2nBLbhDlRMhhhyNLN/zl3eTqcnHD5viDpcZ0= golang.org/x/tools v0.1.1/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU= -golang.org/x/tools v0.40.0 h1:yLkxfA+Qnul4cs9QA3KnlFu0lVmd8JJfoq+E41uSutA= -golang.org/x/tools v0.40.0/go.mod h1:Ik/tzLRlbscWpqqMRjyWYDisX8bG13FrdXp3o4Sr9lc= golang.org/x/tools v0.44.0 h1:UP4ajHPIcuMjT1GqzDWRlalUEoY+uzoZKnhOjbIPD2c= golang.org/x/tools v0.44.0/go.mod h1:KA0AfVErSdxRZIsOVipbv3rQhVXTnlU6UhKxHd1seDI= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= diff --git a/internal/flink/command_catalog_database.go b/internal/flink/command_catalog_database.go index 11a77fb6a2..605f875b90 100644 --- a/internal/flink/command_catalog_database.go +++ b/internal/flink/command_catalog_database.go @@ -107,7 +107,7 @@ func convertSdkDatabaseToLocalDatabase(sdkDatabase cmfsdk.KafkaDatabase) LocalKa ConnectionConfig: sdkDatabase.Spec.KafkaCluster.ConnectionConfig, ConnectionSecretId: sdkDatabase.Spec.KafkaCluster.ConnectionSecretId, }, - AlterEnvironments: sdkDatabase.Spec.AlterEnvironments, + DdlEnvironments: sdkDatabase.Spec.DdlEnvironments, }, } } diff --git a/internal/flink/local_types.go b/internal/flink/local_types.go index 6342966129..1c6e52b47f 100644 --- a/internal/flink/local_types.go +++ b/internal/flink/local_types.go @@ -159,8 +159,8 @@ type LocalDatabaseMetadata struct { } type LocalKafkaDatabaseSpec struct { - KafkaCluster LocalKafkaDatabaseSpecKafkaCluster `json:"kafkaCluster" yaml:"kafkaCluster"` - AlterEnvironments *[]string `json:"alterEnvironments,omitempty" yaml:"alterEnvironments,omitempty"` + KafkaCluster LocalKafkaDatabaseSpecKafkaCluster `json:"kafkaCluster" yaml:"kafkaCluster"` + DdlEnvironments *[]string `json:"ddlEnvironments,omitempty" yaml:"ddlEnvironments,omitempty"` } type LocalKafkaDatabaseSpecKafkaCluster struct {