Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions internal/flink/command_catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ func (c *command) newCatalogCommand() *cobra.Command {
cmd.AddCommand(c.newCatalogDeleteCommand())
cmd.AddCommand(c.newCatalogDescribeCommand())
cmd.AddCommand(c.newCatalogListCommand())
cmd.AddCommand(c.newCatalogDatabaseCommand())

return cmd
}
Expand Down
50 changes: 50 additions & 0 deletions internal/flink/command_catalog_database.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package flink

import (
"github.com/spf13/cobra"

cmfsdk "github.com/confluentinc/cmf-sdk-go/v1"
Comment thread
channingdong marked this conversation as resolved.

pcmd "github.com/confluentinc/cli/v4/pkg/cmd"
)

type databaseOut struct {
CreationTime string `human:"Creation Time" serialized:"creation_time"`
Name string `human:"Name" serialized:"name"`
Catalog string `human:"Catalog" serialized:"catalog"`
}

func (c *command) newCatalogDatabaseCommand() *cobra.Command {
cmd := &cobra.Command{
Use: "database",
Short: "Manage Flink databases in Confluent Platform.",
Annotations: map[string]string{pcmd.RunRequirement: pcmd.RequireCloudLogout},
}

Comment thread
paras-negi-flink marked this conversation as resolved.
cmd.AddCommand(c.newCatalogDatabaseCreateCommand())
cmd.AddCommand(c.newCatalogDatabaseListCommand())

return cmd
}

func convertSdkDatabaseToLocalDatabase(sdkDatabase cmfsdk.KafkaDatabase) LocalKafkaDatabase {
return LocalKafkaDatabase{
ApiVersion: sdkDatabase.ApiVersion,
Kind: sdkDatabase.Kind,
Metadata: LocalDatabaseMetadata{
Name: sdkDatabase.Metadata.Name,
CreationTimestamp: sdkDatabase.Metadata.CreationTimestamp,
UpdateTimestamp: sdkDatabase.Metadata.UpdateTimestamp,
Uid: sdkDatabase.Metadata.Uid,
Labels: sdkDatabase.Metadata.Labels,
Annotations: sdkDatabase.Metadata.Annotations,
},
Spec: LocalKafkaDatabaseSpec{
KafkaCluster: LocalKafkaDatabaseSpecKafkaCluster{
ConnectionConfig: sdkDatabase.Spec.KafkaCluster.ConnectionConfig,
ConnectionSecretId: sdkDatabase.Spec.KafkaCluster.ConnectionSecretId,
},
AlterEnvironments: sdkDatabase.Spec.AlterEnvironments,
},
}
}
102 changes: 102 additions & 0 deletions internal/flink/command_catalog_database_create.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
package flink

import (
"encoding/json"
"fmt"
"os"
"path/filepath"

"github.com/spf13/cobra"
"gopkg.in/yaml.v3"

cmfsdk "github.com/confluentinc/cmf-sdk-go/v1"

pcmd "github.com/confluentinc/cli/v4/pkg/cmd"
"github.com/confluentinc/cli/v4/pkg/errors"
"github.com/confluentinc/cli/v4/pkg/output"
)

func (c *command) newCatalogDatabaseCreateCommand() *cobra.Command {
cmd := &cobra.Command{
Use: "create <resourceFilePath>",
Short: "Create a Flink database.",
Comment thread
channingdong marked this conversation as resolved.
Long: "Create a Flink database in a catalog in Confluent Platform.",
Args: cobra.ExactArgs(1),
RunE: c.catalogDatabaseCreate,
}

cmd.Flags().String("catalog", "", "Name of the catalog.")
cobra.CheckErr(cmd.MarkFlagRequired("catalog"))
addCmfFlagSet(cmd)
pcmd.AddOutputFlag(cmd)

return cmd
}

func (c *command) catalogDatabaseCreate(cmd *cobra.Command, args []string) error {
resourceFilePath := args[0]

catalogName, err := cmd.Flags().GetString("catalog")
if err != nil {
return err
}

client, err := c.GetCmfClient(cmd)
if err != nil {
return err
}

data, err := os.ReadFile(resourceFilePath)
if err != nil {
return fmt.Errorf("failed to read file: %v", err)
}

var genericData map[string]interface{}
ext := filepath.Ext(resourceFilePath)
switch ext {
case ".json":
err = json.Unmarshal(data, &genericData)
case ".yaml", ".yml":
err = yaml.Unmarshal(data, &genericData)
default:
return errors.NewErrorWithSuggestions(
fmt.Sprintf("unsupported file format: %s", ext),
"Supported file formats are .json, .yaml, and .yml.",
)
}
if err != nil {
return err
}

jsonBytes, err := json.Marshal(genericData)
if err != nil {
return fmt.Errorf("failed to marshal intermediate data: %w", err)
}

var sdkDatabase cmfsdk.KafkaDatabase
if err = json.Unmarshal(jsonBytes, &sdkDatabase); err != nil {
return fmt.Errorf("failed to bind data to KafkaDatabase model: %w", err)
}
Comment thread
paras-negi-flink marked this conversation as resolved.
Outdated

sdkOutputDatabase, err := client.CreateDatabase(c.createContext(), catalogName, sdkDatabase)
if err != nil {
return err
}

if output.GetFormat(cmd) == output.Human {
table := output.NewTable(cmd)
var creationTime string
if sdkOutputDatabase.GetMetadata().CreationTimestamp != nil {
creationTime = *sdkOutputDatabase.GetMetadata().CreationTimestamp
}
table.Add(&databaseOut{
CreationTime: creationTime,
Name: sdkOutputDatabase.GetMetadata().Name,
Catalog: catalogName,
})
return table.Print()
}

localDatabase := convertSdkDatabaseToLocalDatabase(sdkOutputDatabase)
return output.SerializedOutput(cmd, localDatabase)
}
64 changes: 64 additions & 0 deletions internal/flink/command_catalog_database_list.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package flink

import (
"github.com/spf13/cobra"

pcmd "github.com/confluentinc/cli/v4/pkg/cmd"
"github.com/confluentinc/cli/v4/pkg/output"
)

func (c *command) newCatalogDatabaseListCommand() *cobra.Command {
cmd := &cobra.Command{
Use: "list",
Short: "List Flink databases in a catalog in Confluent Platform.",
Args: cobra.NoArgs,
RunE: c.catalogDatabaseList,
}

cmd.Flags().String("catalog", "", "Name of the catalog.")
cobra.CheckErr(cmd.MarkFlagRequired("catalog"))
addCmfFlagSet(cmd)
pcmd.AddOutputFlag(cmd)

return cmd
}

func (c *command) catalogDatabaseList(cmd *cobra.Command, _ []string) error {
catalogName, err := cmd.Flags().GetString("catalog")
if err != nil {
return err
}

client, err := c.GetCmfClient(cmd)
if err != nil {
return err
}

sdkDatabases, err := client.ListDatabases(c.createContext(), catalogName)
if err != nil {
return err
}

if output.GetFormat(cmd) == output.Human {
list := output.NewList(cmd)
for _, db := range sdkDatabases {
var creationTime string
if db.GetMetadata().CreationTimestamp != nil {
creationTime = *db.GetMetadata().CreationTimestamp
}
list.Add(&databaseOut{
CreationTime: creationTime,
Name: db.GetMetadata().Name,
Catalog: catalogName,
})
}
return list.Print()
}

localDatabases := make([]LocalKafkaDatabase, 0, len(sdkDatabases))
for _, sdkDatabase := range sdkDatabases {
localDatabases = append(localDatabases, convertSdkDatabaseToLocalDatabase(sdkDatabase))
}

return output.SerializedOutput(cmd, localDatabases)
}
26 changes: 26 additions & 0 deletions internal/flink/local_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,32 @@ type LocalKafkaCatalogSpecSrInstance struct {
ConnectionSecretId *string `json:"connectionSecretId,omitempty" yaml:"connectionSecretId,omitempty"`
}

type LocalKafkaDatabase struct {
ApiVersion string `json:"apiVersion" yaml:"apiVersion"`
Kind string `json:"kind" yaml:"kind"`
Metadata LocalDatabaseMetadata `json:"metadata" yaml:"metadata"`
Spec LocalKafkaDatabaseSpec `json:"spec" yaml:"spec"`
}

type LocalDatabaseMetadata struct {
Name string `json:"name" yaml:"name"`
CreationTimestamp *string `json:"creationTimestamp,omitempty" yaml:"creationTimestamp,omitempty"`
UpdateTimestamp *string `json:"updateTimestamp,omitempty" yaml:"updateTimestamp,omitempty"`
Uid *string `json:"uid,omitempty" yaml:"uid,omitempty"`
Labels *map[string]string `json:"labels,omitempty" yaml:"labels,omitempty"`
Annotations *map[string]string `json:"annotations,omitempty" yaml:"annotations,omitempty"`
}

type LocalKafkaDatabaseSpec struct {
KafkaCluster LocalKafkaDatabaseSpecKafkaCluster `json:"kafkaCluster" yaml:"kafkaCluster"`
AlterEnvironments *[]string `json:"alterEnvironments,omitempty" yaml:"alterEnvironments,omitempty"`
}

type LocalKafkaDatabaseSpecKafkaCluster struct {
ConnectionConfig map[string]string `json:"connectionConfig" yaml:"connectionConfig"`
ConnectionSecretId *string `json:"connectionSecretId,omitempty" yaml:"connectionSecretId,omitempty"`
}

type LocalResultSchema struct {
Columns []LocalResultSchemaColumn `json:"columns" yaml:"columns"`
}
Expand Down
27 changes: 27 additions & 0 deletions pkg/flink/cmf_rest_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -570,6 +570,33 @@ func (cmfClient *CmfRestClient) DeleteCatalog(ctx context.Context, catalogName s
return parseSdkError(httpResp, err)
}

func (cmfClient *CmfRestClient) CreateDatabase(ctx context.Context, catalogName string, kafkaDatabase cmfsdk.KafkaDatabase) (cmfsdk.KafkaDatabase, error) {
databaseName := kafkaDatabase.Metadata.Name
outputDatabase, httpResponse, err := cmfClient.SQLApi.CreateKafkaDatabase(ctx, catalogName).KafkaDatabase(kafkaDatabase).Execute()
if parsedErr := parseSdkError(httpResponse, err); parsedErr != nil {
return cmfsdk.KafkaDatabase{}, fmt.Errorf(`failed to create database "%s" in catalog "%s": %s`, databaseName, catalogName, parsedErr)
}
return outputDatabase, nil
}

func (cmfClient *CmfRestClient) ListDatabases(ctx context.Context, catalogName string) ([]cmfsdk.KafkaDatabase, error) {
databases := make([]cmfsdk.KafkaDatabase, 0)
done := false
const pageSize = 100
var currentPageNumber int32 = 0

for !done {
databasePage, httpResponse, err := cmfClient.SQLApi.GetKafkaDatabases(ctx, catalogName).Page(currentPageNumber).Size(pageSize).Execute()
if parsedErr := parseSdkError(httpResponse, err); parsedErr != nil {
return nil, fmt.Errorf(`failed to list databases in catalog "%s": %s`, catalogName, parsedErr)
}
databases = append(databases, databasePage.GetItems()...)
currentPageNumber, done = extractPageOptions(len(databasePage.GetItems()), currentPageNumber)
}

return databases, nil
}

// Returns the next page number and whether we need to fetch more pages or not.
func extractPageOptions(receivedItemsLength int, currentPageNumber int32) (int32, bool) {
if receivedItemsLength == 0 {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
{
"apiVersion": "cmf/api/v1/database",
"kind": "KafkaDatabase",
"metadata": {
"name": "invalid-database"
},
"spec": {
"kafkaCluster": {
"connectionConfig": {}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
{
"apiVersion": "cmf/api/v1/database",
"kind": "KafkaDatabase",
"metadata": {
"name": "test-database"
},
"spec": {
"kafkaCluster": {
"connectionConfig": {
"bootstrap.servers": "localhost:9092"
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
Create a Flink database in a catalog in Confluent Platform.

Usage:
confluent flink catalog database create <resourceFilePath> [flags]

Flags:
--catalog string REQUIRED: Name of the catalog.
--url string Base URL of the Confluent Manager for Apache Flink (CMF). Environment variable "CONFLUENT_CMF_URL" may be set in place of this flag.
--client-key-path string Path to client private key for mTLS authentication. Environment variable "CONFLUENT_CMF_CLIENT_KEY_PATH" may be set in place of this flag.
--client-cert-path string Path to client cert to be verified by Confluent Manager for Apache Flink. Include for mTLS authentication. Environment variable "CONFLUENT_CMF_CLIENT_CERT_PATH" may be set in place of this flag.
--certificate-authority-path string Path to a PEM-encoded Certificate Authority to verify the Confluent Manager for Apache Flink connection. Environment variable "CONFLUENT_CMF_CERTIFICATE_AUTHORITY_PATH" may be set in place of this flag.
-o, --output string Specify the output format as "human", "json", or "yaml". (default "human")

Global Flags:
-h, --help Show help for this command.
--unsafe-trace Equivalent to -vvvv, but also log HTTP requests and responses which might contain plaintext secrets.
-v, --verbose count Increase verbosity (-v for warn, -vv for info, -vvv for debug, -vvvv for trace).
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Error: failed to create database "invalid-database" in catalog "test-catalog": The Kafka database object from resource file is invalid
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
{
"apiVersion": "cmf/api/v1/database",
"kind": "KafkaDatabase",
"metadata": {
"name": "test-database",
"creationTimestamp": "2025-03-12 23:42:00 +0000 UTC"
},
"spec": {
"kafkaCluster": {
"connectionConfig": {
"bootstrap.servers": "localhost:9092"
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
apiVersion: cmf/api/v1/database
kind: KafkaDatabase
metadata:
name: test-database
creationTimestamp: 2025-03-12 23:42:00 +0000 UTC
spec:
kafkaCluster:
connectionConfig:
bootstrap.servers: localhost:9092
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
+---------------+-------------------------------+
| Creation Time | 2025-03-12 23:42:00 +0000 UTC |
| Name | test-database |
| Catalog | test-catalog |
+---------------+-------------------------------+
15 changes: 15 additions & 0 deletions test/fixtures/output/flink/catalog/database/help-onprem.golden
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
Manage Flink databases in Confluent Platform.

Usage:
confluent flink catalog database [command]

Available Commands:
create Create a Flink database.
list List Flink databases in a catalog in Confluent Platform.

Global Flags:
-h, --help Show help for this command.
--unsafe-trace Equivalent to -vvvv, but also log HTTP requests and responses which might contain plaintext secrets.
-v, --verbose count Increase verbosity (-v for warn, -vv for info, -vvv for debug, -vvvv for trace).

Use "confluent flink catalog database [command] --help" for more information about a command.
Loading
Loading