-
Notifications
You must be signed in to change notification settings - Fork 29
CF-1871 : Add Catalog Update Command #3304
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
| @@ -0,0 +1,54 @@ | ||||||
| package flink | ||||||
|
|
||||||
| import ( | ||||||
| "fmt" | ||||||
|
|
||||||
| "github.com/spf13/cobra" | ||||||
|
|
||||||
| pcmd "github.com/confluentinc/cli/v4/pkg/cmd" | ||||||
| ) | ||||||
|
|
||||||
| func (c *command) newCatalogUpdateCommand() *cobra.Command { | ||||||
| cmd := &cobra.Command{ | ||||||
| Use: "update <resourceFilePath>", | ||||||
| Short: "Update a Flink catalog.", | ||||||
|
paras-negi-flink marked this conversation as resolved.
Outdated
|
||||||
| Long: "Update an existing Kafka Catalog in Confluent Platform from a resource file.", | ||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We want to keep it consistent.
Suggested change
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. And by the way, is there a convention for the user to know which fields are mutable from an update operation? |
||||||
| Args: cobra.ExactArgs(1), | ||||||
| RunE: c.catalogUpdate, | ||||||
| } | ||||||
|
|
||||||
| addCmfFlagSet(cmd) | ||||||
| pcmd.AddOutputFlag(cmd) | ||||||
|
|
||||||
| return cmd | ||||||
| } | ||||||
|
|
||||||
| func (c *command) catalogUpdate(cmd *cobra.Command, args []string) error { | ||||||
| resourceFilePath := args[0] | ||||||
|
|
||||||
| client, err := c.GetCmfClient(cmd) | ||||||
| if err != nil { | ||||||
| return err | ||||||
| } | ||||||
|
|
||||||
| sdkCatalog, err := readCatalogResourceFile(resourceFilePath) | ||||||
| if err != nil { | ||||||
| return err | ||||||
| } | ||||||
|
|
||||||
| catalogName := sdkCatalog.Metadata.Name | ||||||
| if catalogName == "" { | ||||||
| return fmt.Errorf("catalog name is required: ensure the resource file contains a non-empty \"metadata.name\" field") | ||||||
| } | ||||||
|
|
||||||
| if err := client.UpdateCatalog(c.createContext(), catalogName, sdkCatalog); err != nil { | ||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Same comment as the catalog database, is it the design that Update command doesn't return the updated catalog to make this an atomic operation? I am seeing an additional describe call below.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes thats correct. UpdateKafkaCatalogExecute returns (*http.Response, error) only — no body. The follow-up DescribeCatalog is the only way to render the updated resource until CMF changes the PUT endpoint to return the catalog. As discussed, we will address this in a follow up PR. |
||||||
| return err | ||||||
| } | ||||||
|
|
||||||
| sdkOutputCatalog, err := client.DescribeCatalog(c.createContext(), catalogName) | ||||||
|
paras-negi-flink marked this conversation as resolved.
|
||||||
| if err != nil { | ||||||
| return fmt.Errorf("catalog %q was updated successfully, but failed to retrieve updated details: %w", catalogName, err) | ||||||
| } | ||||||
|
|
||||||
| return printCatalogOutput(cmd, sdkOutputCatalog) | ||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Let's use this unified print out function for the
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done. catalogCreate now ends with return printCatalogOutput(cmd, sdkOutputCatalog) |
||||||
| } | ||||||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,17 @@ | ||
| { | ||
| "apiVersion": "cmf/api/v1/catalog", | ||
| "kind": "KafkaCatalog", | ||
| "metadata": { | ||
| "name": "invalid-catalog" | ||
| }, | ||
| "spec": { | ||
| "kafkaClusters": [ | ||
| { | ||
| "databaseName": "dev" | ||
| }, | ||
| { | ||
| "databaseName": "prod" | ||
| } | ||
| ] | ||
| } | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,8 @@ | ||
| apiVersion: cmf/api/v1/catalog | ||
| kind: KafkaCatalog | ||
| metadata: | ||
| name: invalid-catalog | ||
| spec: | ||
| kafkaClusters: | ||
| - databaseName: dev | ||
| - databaseName: prod |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,17 @@ | ||
| { | ||
| "apiVersion": "cmf/api/v1/catalog", | ||
| "kind": "KafkaCatalog", | ||
| "metadata": { | ||
| "name": "test-catalog" | ||
| }, | ||
| "spec": { | ||
| "kafkaClusters": [ | ||
| { | ||
| "databaseName": "dev" | ||
| }, | ||
| { | ||
| "databaseName": "prod" | ||
| } | ||
| ] | ||
| } | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,8 @@ | ||
| apiVersion: cmf/api/v1/catalog | ||
| kind: KafkaCatalog | ||
| metadata: | ||
| name: test-catalog | ||
| spec: | ||
| kafkaClusters: | ||
| - databaseName: dev | ||
| - databaseName: prod |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,16 @@ | ||
| Update an existing Kafka Catalog in Confluent Platform from a resource file. | ||
|
|
||
| Usage: | ||
| confluent flink catalog update <resourceFilePath> [flags] | ||
|
|
||
| Flags: | ||
| --url string Base URL of the Confluent Manager for Apache Flink (CMF). Environment variable "CONFLUENT_CMF_URL" may be set in place of this flag. | ||
| --client-key-path string Path to client private key for mTLS authentication. Environment variable "CONFLUENT_CMF_CLIENT_KEY_PATH" may be set in place of this flag. | ||
| --client-cert-path string Path to client cert to be verified by Confluent Manager for Apache Flink. Include for mTLS authentication. Environment variable "CONFLUENT_CMF_CLIENT_CERT_PATH" may be set in place of this flag. | ||
| --certificate-authority-path string Path to a PEM-encoded Certificate Authority to verify the Confluent Manager for Apache Flink connection. Environment variable "CONFLUENT_CMF_CERTIFICATE_AUTHORITY_PATH" may be set in place of this flag. | ||
| -o, --output string Specify the output format as "human", "json", or "yaml". (default "human") | ||
|
|
||
| Global Flags: | ||
| -h, --help Show help for this command. | ||
| --unsafe-trace Equivalent to -vvvv, but also log HTTP requests and responses which might contain plaintext secrets. | ||
| -v, --verbose count Increase verbosity (-v for warn, -vv for info, -vvv for debug, -vvvv for trace). |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1 @@ | ||
| Error: failed to update Kafka Catalog "invalid-catalog": The catalog name is invalid |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,23 @@ | ||
| { | ||
| "apiVersion": "", | ||
| "kind": "", | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why are apiVersion and kind empty?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fixed. |
||
| "metadata": { | ||
| "name": "test-catalog", | ||
| "creationTimestamp": "2025-08-05 12:00:00 +0000 UTC" | ||
| }, | ||
| "spec": { | ||
| "srInstance": { | ||
| "connectionConfig": null | ||
| }, | ||
| "kafkaClusters": [ | ||
| { | ||
| "databaseName": "test-database", | ||
| "connectionConfig": null | ||
| }, | ||
| { | ||
| "databaseName": "test-database-2", | ||
| "connectionConfig": null | ||
| } | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. question: Are the
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. No, it should always be a populated map in CMF's response. Fixed now |
||
| ] | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,13 @@ | ||
| apiVersion: "" | ||
| kind: "" | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this seems off
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fixed.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please fill these fields, and similar golden files.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fixed. |
||
| metadata: | ||
| name: test-catalog | ||
| creationTimestamp: 2025-08-05 12:00:00 +0000 UTC | ||
| spec: | ||
| srInstance: | ||
| connectionConfig: {} | ||
| kafkaClusters: | ||
| - databaseName: test-database | ||
| connectionConfig: {} | ||
| - databaseName: test-database-2 | ||
| connectionConfig: {} | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,5 @@ | ||
| +---------------+--------------------------------+ | ||
| | Creation Time | 2025-08-05 12:00:00 +0000 UTC | | ||
| | Name | test-catalog | | ||
| | Databases | test-database, test-database-2 | | ||
| +---------------+--------------------------------+ |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -410,6 +410,19 @@ | |
| runIntegrationTestsWithMultipleAuth(s, tests) | ||
| } | ||
|
|
||
| func (s *CLITestSuite) TestFlinkCatalogUpdateOnPrem() { | ||
| tests := []CLITest{ | ||
| // success | ||
| {args: "flink catalog update test/fixtures/input/flink/catalog/update-successful.json", fixture: "flink/catalog/update-success.golden"}, | ||
|
Check failure on line 416 in test/flink_onprem_test.go
|
||
| {args: "flink catalog update test/fixtures/input/flink/catalog/update-successful.json --output json", fixture: "flink/catalog/update-success-json.golden"}, | ||
|
Check failure on line 417 in test/flink_onprem_test.go
|
||
| {args: "flink catalog update test/fixtures/input/flink/catalog/update-successful.json --output yaml", fixture: "flink/catalog/update-success-yaml.golden"}, | ||
|
Check failure on line 418 in test/flink_onprem_test.go
|
||
| // failure | ||
| {args: "flink catalog update test/fixtures/input/flink/catalog/update-invalid-failure.json", fixture: "flink/catalog/update-invalid-failure.golden", exitCode: 1}, | ||
|
Check failure on line 420 in test/flink_onprem_test.go
|
||
| } | ||
|
|
||
| runIntegrationTestsWithMultipleAuth(s, tests) | ||
| } | ||
|
|
||
| func (s *CLITestSuite) TestFlinkStatementCreateOnPrem() { | ||
| tests := []CLITest{ | ||
| // success | ||
|
|
@@ -607,6 +620,25 @@ | |
| runIntegrationTestsWithMultipleAuth(s, tests) | ||
| } | ||
|
|
||
| func (s *CLITestSuite) TestFlinkCatalogUpdateWithYAML() { | ||
|
paras-negi-flink marked this conversation as resolved.
Outdated
|
||
| tests := []CLITest{ | ||
| // success scenarios with JSON files | ||
| {args: "flink catalog update test/fixtures/input/flink/catalog/update-successful.json", fixture: "flink/catalog/update-success.golden"}, | ||
| {args: "flink catalog update test/fixtures/input/flink/catalog/update-successful.json --output json", fixture: "flink/catalog/update-success-json.golden"}, | ||
| {args: "flink catalog update test/fixtures/input/flink/catalog/update-successful.json --output yaml", fixture: "flink/catalog/update-success-yaml.golden"}, | ||
| // failure scenarios with JSON files | ||
| {args: "flink catalog update test/fixtures/input/flink/catalog/update-invalid-failure.json", fixture: "flink/catalog/update-invalid-failure.golden", exitCode: 1}, | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should these be json format files? Or are these totally redundant?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Dropped them and renamed the function to TestFlinkCatalogUpdateOnPremWithYAML, so it's now YAML-only. |
||
| // YAML file tests | ||
| {args: "flink catalog update test/fixtures/input/flink/catalog/update-successful.yaml", fixture: "flink/catalog/update-success.golden"}, | ||
| {args: "flink catalog update test/fixtures/input/flink/catalog/update-successful.yaml --output json", fixture: "flink/catalog/update-success-json.golden"}, | ||
| {args: "flink catalog update test/fixtures/input/flink/catalog/update-successful.yaml --output yaml", fixture: "flink/catalog/update-success-yaml.golden"}, | ||
| // YAML file failure scenarios | ||
| {args: "flink catalog update test/fixtures/input/flink/catalog/update-invalid-failure.yaml", fixture: "flink/catalog/update-invalid-failure.golden", exitCode: 1}, | ||
| } | ||
|
paras-negi-flink marked this conversation as resolved.
Outdated
|
||
|
|
||
| runIntegrationTestsWithMultipleAuth(s, tests) | ||
| } | ||
|
|
||
| func (s *CLITestSuite) TestFlinkShellOnPrem() { | ||
| tests := []flinkShellTest{ | ||
| { | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
printCatalogOutputcentralizes catalog rendering, butcatalogCreateandcatalogDescribestill contain their own (nearly identical) output formatting logic. To avoid future drift between commands, consider switching those commands to callprintCatalogOutputas well.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Robert Metzger (@rmetzger) Kumar Mallikarjuna (@kumar-mallikarjuna) does it make sense to touch other existing files to reduce duplication, or should we tackle that in another change ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it makes sense to fix this as part of this PR
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agree with Copilot and Robert Metzger (@rmetzger) 's comment, maybe we can apply this idea to the catalog database as well.
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done — catalogCreate and catalogDescribe now both call printCatalogOutput.