Skip to content
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions internal/flink/command_catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ func (c *command) newCatalogCommand() *cobra.Command {
cmd.AddCommand(c.newCatalogDeleteCommand())
cmd.AddCommand(c.newCatalogDescribeCommand())
cmd.AddCommand(c.newCatalogListCommand())
cmd.AddCommand(c.newCatalogDatabaseCommand())

return cmd
}
Expand Down
113 changes: 113 additions & 0 deletions internal/flink/command_catalog_database.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
package flink

import (
"encoding/json"
"fmt"
"os"
"path/filepath"

"github.com/spf13/cobra"
"gopkg.in/yaml.v3"

cmfsdk "github.com/confluentinc/cmf-sdk-go/v1"
Comment thread
channingdong marked this conversation as resolved.

pcmd "github.com/confluentinc/cli/v4/pkg/cmd"
"github.com/confluentinc/cli/v4/pkg/errors"
"github.com/confluentinc/cli/v4/pkg/output"
)

type databaseOut struct {
CreationTime string `human:"Creation Time" serialized:"creation_time"`
Name string `human:"Name" serialized:"name"`
Catalog string `human:"Catalog" serialized:"catalog"`
}

func (c *command) newCatalogDatabaseCommand() *cobra.Command {
cmd := &cobra.Command{
Use: "database",
Short: "Manage Flink databases in Confluent Platform.",
Annotations: map[string]string{pcmd.RunRequirement: pcmd.RequireCloudLogout},
}

Comment thread
paras-negi-flink marked this conversation as resolved.
cmd.AddCommand(c.newCatalogDatabaseCreateCommand())
cmd.AddCommand(c.newCatalogDatabaseDeleteCommand())
cmd.AddCommand(c.newCatalogDatabaseDescribeCommand())
cmd.AddCommand(c.newCatalogDatabaseListCommand())
cmd.AddCommand(c.newCatalogDatabaseUpdateCommand())

return cmd
}

func printDatabaseOutput(cmd *cobra.Command, sdkDatabase cmfsdk.KafkaDatabase, catalogName string) error {
if output.GetFormat(cmd) == output.Human {
table := output.NewTable(cmd)
var creationTime string
if sdkDatabase.GetMetadata().CreationTimestamp != nil {
creationTime = *sdkDatabase.GetMetadata().CreationTimestamp
}
table.Add(&databaseOut{
CreationTime: creationTime,
Name: sdkDatabase.GetMetadata().Name,
Catalog: catalogName,
})
Comment on lines +48 to +52
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will it be a concern if human mode has such a difference compared to json or yml mode through convertSdkDatabaseToLocalDatabase()?

Copy link
Copy Markdown
Contributor

@channingdong Channing Dong (channingdong) May 18, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This comment is not responded yet, the concern here is, is it ok for customers to see different set of fields in json, yaml and human mode?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Intentional and matches every CMF on-prem command in - catalog, savepoint, application, compute_pool . The shape mirrors kubectl get (compact columns) vs kubectl get -o yaml (full K8s resource), which fits since CMF resources are Kubernetes-style.

return table.Print()
}

localDatabase := convertSdkDatabaseToLocalDatabase(sdkDatabase)
return output.SerializedOutput(cmd, localDatabase)
}

func readDatabaseResourceFile(resourceFilePath string) (cmfsdk.KafkaDatabase, error) {
data, err := os.ReadFile(resourceFilePath)
if err != nil {
return cmfsdk.KafkaDatabase{}, fmt.Errorf("failed to read file: %w", err)
}

var genericData map[string]interface{}
ext := filepath.Ext(resourceFilePath)
switch ext {
case ".json":
err = json.Unmarshal(data, &genericData)
case ".yaml", ".yml":
err = yaml.Unmarshal(data, &genericData)
default:
return cmfsdk.KafkaDatabase{}, errors.NewErrorWithSuggestions(fmt.Sprintf("unsupported file format: %s", ext), "Supported file formats are .json, .yaml, and .yml.")
}
if err != nil {
return cmfsdk.KafkaDatabase{}, fmt.Errorf("failed to parse input file: %w", err)
}

jsonBytes, err := json.Marshal(genericData)
if err != nil {
return cmfsdk.KafkaDatabase{}, fmt.Errorf("failed to marshal intermediate data: %w", err)
}

var sdkDatabase cmfsdk.KafkaDatabase
if err = json.Unmarshal(jsonBytes, &sdkDatabase); err != nil {
return cmfsdk.KafkaDatabase{}, fmt.Errorf("failed to bind data to KafkaDatabase model: %w", err)
}

return sdkDatabase, nil
}

func convertSdkDatabaseToLocalDatabase(sdkDatabase cmfsdk.KafkaDatabase) LocalKafkaDatabase {
return LocalKafkaDatabase{
ApiVersion: sdkDatabase.ApiVersion,
Kind: sdkDatabase.Kind,
Metadata: LocalDatabaseMetadata{
Name: sdkDatabase.Metadata.Name,
CreationTimestamp: sdkDatabase.Metadata.CreationTimestamp,
UpdateTimestamp: sdkDatabase.Metadata.UpdateTimestamp,
Uid: sdkDatabase.Metadata.Uid,
Labels: sdkDatabase.Metadata.Labels,
Annotations: sdkDatabase.Metadata.Annotations,
},
Spec: LocalKafkaDatabaseSpec{
KafkaCluster: LocalKafkaDatabaseSpecKafkaCluster{
ConnectionConfig: sdkDatabase.Spec.KafkaCluster.ConnectionConfig,
ConnectionSecretId: sdkDatabase.Spec.KafkaCluster.ConnectionSecretId,
},
AlterEnvironments: sdkDatabase.Spec.AlterEnvironments,
},
}
}
50 changes: 50 additions & 0 deletions internal/flink/command_catalog_database_create.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package flink

import (
"github.com/spf13/cobra"

pcmd "github.com/confluentinc/cli/v4/pkg/cmd"
)

func (c *command) newCatalogDatabaseCreateCommand() *cobra.Command {
cmd := &cobra.Command{
Use: "create <resourceFilePath>",
Short: "Create a Flink database.",
Comment thread
channingdong marked this conversation as resolved.
Long: "Create a Flink database in a catalog in Confluent Platform.",
Args: cobra.ExactArgs(1),
RunE: c.catalogDatabaseCreate,
}

cmd.Flags().String("catalog", "", "Name of the catalog.")
cobra.CheckErr(cmd.MarkFlagRequired("catalog"))
addCmfFlagSet(cmd)
pcmd.AddOutputFlag(cmd)

return cmd
}

func (c *command) catalogDatabaseCreate(cmd *cobra.Command, args []string) error {
resourceFilePath := args[0]

catalogName, err := cmd.Flags().GetString("catalog")
if err != nil {
return err
}

client, err := c.GetCmfClient(cmd)
if err != nil {
return err
}

sdkDatabase, err := readDatabaseResourceFile(resourceFilePath)
if err != nil {
return err
}

sdkOutputDatabase, err := client.CreateDatabase(c.createContext(), catalogName, sdkDatabase)
if err != nil {
return err
}

return printDatabaseOutput(cmd, sdkOutputDatabase, catalogName)
}
58 changes: 58 additions & 0 deletions internal/flink/command_catalog_database_delete.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package flink

import (
"github.com/spf13/cobra"

pcmd "github.com/confluentinc/cli/v4/pkg/cmd"
"github.com/confluentinc/cli/v4/pkg/deletion"
"github.com/confluentinc/cli/v4/pkg/errors"
"github.com/confluentinc/cli/v4/pkg/resource"
)

func (c *command) newCatalogDatabaseDeleteCommand() *cobra.Command {
cmd := &cobra.Command{
Use: "delete <name>",
Short: "Delete a Flink database in Confluent Platform.",
Args: cobra.ExactArgs(1),
RunE: c.catalogDatabaseDelete,
}

cmd.Flags().String("catalog", "", "Name of the catalog.")
cobra.CheckErr(cmd.MarkFlagRequired("catalog"))
addCmfFlagSet(cmd)
pcmd.AddForceFlag(cmd)

return cmd
}

func (c *command) catalogDatabaseDelete(cmd *cobra.Command, args []string) error {
catalogName, err := cmd.Flags().GetString("catalog")
if err != nil {
return err
}

client, err := c.GetCmfClient(cmd)
if err != nil {
return err
}

existenceFunc := func(name string) bool {
_, err := client.DescribeDatabase(c.createContext(), catalogName, name)
return err == nil
}

if err := deletion.ValidateAndConfirm(cmd, args, existenceFunc, resource.FlinkDatabase); err != nil {
// We are validating only the existence of the resources (there is no prefix validation).
// Thus, we can add some extra context for the error.
suggestions := "List available Flink databases with `confluent flink catalog database list`."
Comment thread
paras-negi-flink marked this conversation as resolved.
Comment thread
channingdong marked this conversation as resolved.
suggestions += "\nCheck that CMF is running and accessible."
return errors.NewErrorWithSuggestions(err.Error(), suggestions)
}
Comment thread
paras-negi-flink marked this conversation as resolved.

deleteFunc := func(name string) error {
return client.DeleteDatabase(c.createContext(), catalogName, name)
}

_, err = deletion.Delete(cmd, args, deleteFunc, resource.FlinkDatabase)
return err
}
44 changes: 44 additions & 0 deletions internal/flink/command_catalog_database_describe.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package flink

import (
"github.com/spf13/cobra"

pcmd "github.com/confluentinc/cli/v4/pkg/cmd"
)

func (c *command) newCatalogDatabaseDescribeCommand() *cobra.Command {
cmd := &cobra.Command{
Use: "describe <name>",
Short: "Describe a Flink database in Confluent Platform.",
Args: cobra.ExactArgs(1),
RunE: c.catalogDatabaseDescribe,
}

cmd.Flags().String("catalog", "", "Name of the catalog.")
cobra.CheckErr(cmd.MarkFlagRequired("catalog"))
addCmfFlagSet(cmd)
pcmd.AddOutputFlag(cmd)

return cmd
}

func (c *command) catalogDatabaseDescribe(cmd *cobra.Command, args []string) error {
name := args[0]

catalogName, err := cmd.Flags().GetString("catalog")
if err != nil {
return err
}

client, err := c.GetCmfClient(cmd)
if err != nil {
return err
}

sdkOutputDatabase, err := client.DescribeDatabase(c.createContext(), catalogName, name)
if err != nil {
return err
}

return printDatabaseOutput(cmd, sdkOutputDatabase, catalogName)
}
64 changes: 64 additions & 0 deletions internal/flink/command_catalog_database_list.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package flink

import (
"github.com/spf13/cobra"

pcmd "github.com/confluentinc/cli/v4/pkg/cmd"
"github.com/confluentinc/cli/v4/pkg/output"
)

func (c *command) newCatalogDatabaseListCommand() *cobra.Command {
cmd := &cobra.Command{
Use: "list",
Short: "List Flink databases in a catalog in Confluent Platform.",
Args: cobra.NoArgs,
RunE: c.catalogDatabaseList,
}

cmd.Flags().String("catalog", "", "Name of the catalog.")
cobra.CheckErr(cmd.MarkFlagRequired("catalog"))
addCmfFlagSet(cmd)
pcmd.AddOutputFlag(cmd)

return cmd
}

func (c *command) catalogDatabaseList(cmd *cobra.Command, _ []string) error {
catalogName, err := cmd.Flags().GetString("catalog")
if err != nil {
return err
}

client, err := c.GetCmfClient(cmd)
if err != nil {
return err
}

sdkDatabases, err := client.ListDatabases(c.createContext(), catalogName)
if err != nil {
return err
}

if output.GetFormat(cmd) == output.Human {
list := output.NewList(cmd)
for _, db := range sdkDatabases {
var creationTime string
if db.GetMetadata().CreationTimestamp != nil {
creationTime = *db.GetMetadata().CreationTimestamp
}
list.Add(&databaseOut{
CreationTime: creationTime,
Name: db.GetMetadata().Name,
Catalog: catalogName,
})
}
return list.Print()
}

localDatabases := make([]LocalKafkaDatabase, 0, len(sdkDatabases))
for _, sdkDatabase := range sdkDatabases {
localDatabases = append(localDatabases, convertSdkDatabaseToLocalDatabase(sdkDatabase))
}

return output.SerializedOutput(cmd, localDatabases)
}
Loading