From 59f1ff740f4114ddb4a2d75d8f01c36b8d733229 Mon Sep 17 00:00:00 2001 From: m1a2st Date: Tue, 21 Apr 2026 01:41:07 +0800 Subject: [PATCH 1/3] fix the bug --- .../org/apache/kafka/tools/ConfigCommand.java | 26 ++--- .../apache/kafka/tools/ConfigCommandTest.java | 108 +++--------------- 2 files changed, 23 insertions(+), 111 deletions(-) diff --git a/tools/src/main/java/org/apache/kafka/tools/ConfigCommand.java b/tools/src/main/java/org/apache/kafka/tools/ConfigCommand.java index dffcbc9122d9c..b7d5b04bffef9 100644 --- a/tools/src/main/java/org/apache/kafka/tools/ConfigCommand.java +++ b/tools/src/main/java/org/apache/kafka/tools/ConfigCommand.java @@ -249,12 +249,15 @@ static void alterConfig(Admin adminClient, ConfigCommandOptions opts) throws IOE } else if (CLIENT_METRICS_TYPE.equals(entityType)) { configResourceType = ConfigResource.Type.CLIENT_METRICS; } else if (BROKER_TYPE.equals(entityType)) { + if (!BROKER_DEFAULT_ENTITY_NAME.equals(entityName)) { + validateBrokerId(entityName, entityType); + } configResourceType = ConfigResource.Type.BROKER; } else { configResourceType = ConfigResource.Type.GROUP; } try { - alterResourceConfig(adminClient, entityType, entityName, configsToBeDeleted, configsToBeAdded, configResourceType); + alterResourceConfig(adminClient, entityName, configsToBeDeleted, configsToBeAdded, configResourceType); } catch (ExecutionException ee) { if (ee.getCause() instanceof UnsupportedVersionException) { throw new UnsupportedVersionException("The " + ApiKeys.INCREMENTAL_ALTER_CONFIGS + " API is not supported by the cluster. The API is supported starting from version 2.3.0." @@ -263,7 +266,7 @@ static void alterConfig(Admin adminClient, ConfigCommandOptions opts) throws IOE throw ee; } } else if (BROKER_LOGGER_CONFIG_TYPE.equals(entityType)) { - List validLoggers = getResourceConfig(adminClient, entityType, entityName, true, false).stream().map(ConfigEntry::name).toList(); + List validLoggers = getResourceConfig(adminClient, entityType, entityName, false).stream().map(ConfigEntry::name).toList(); // fail the command if any of the configured broker loggers do not exist List invalidBrokerLoggers = Stream.concat( configsToBeDeleted.stream().filter(c -> !validLoggers.contains(c)), @@ -568,7 +571,7 @@ private static void describeResourceConfig(Admin adminClient, String entityType, String entityTypeSingular = entityType.substring(0, entityType.length() - 1); System.out.println(configSourceStr + " configs for " + entityTypeSingular + " " + entity + " are:"); } - getResourceConfig(adminClient, entityType, entity, true, describeAll).forEach(entry -> { + getResourceConfig(adminClient, entityType, entity, describeAll).forEach(entry -> { String synonyms = entry.synonyms().stream() .map(synonym -> synonym.source() + ":" + synonym.name() + "=" + synonym.value()) .collect(Collectors.joining(", ", "{", "}")); @@ -577,18 +580,7 @@ private static void describeResourceConfig(Admin adminClient, String entityType, } } - private static void alterResourceConfig(Admin adminClient, String entityTypeHead, String entityNameHead, List configsToBeDeleted, Map configsToBeAdded, ConfigResource.Type resourceType) throws ExecutionException, InterruptedException, TimeoutException { - Map oldConfig = getResourceConfig(adminClient, entityTypeHead, entityNameHead, false, false) - .stream() - .collect(Collectors.toMap(ConfigEntry::name, entry -> entry)); - - // fail the command if any of the configs to be deleted does not exist - List invalidConfigs = configsToBeDeleted.stream() - .filter(config -> !oldConfig.containsKey(config)) - .toList(); - if (!invalidConfigs.isEmpty()) - throw new InvalidConfigurationException("Invalid config(s): " + String.join(",", invalidConfigs)); - + private static void alterResourceConfig(Admin adminClient, String entityNameHead, List configsToBeDeleted, Map configsToBeAdded, ConfigResource.Type resourceType) throws ExecutionException, InterruptedException, TimeoutException { ConfigResource configResource = new ConfigResource(resourceType, entityNameHead); AlterConfigsOptions alterOptions = new AlterConfigsOptions().timeoutMs(30000).validateOnly(false); List addEntries = configsToBeAdded.values().stream().map(k -> new AlterConfigOp(k, AlterConfigOp.OpType.SET)).toList(); @@ -605,7 +597,7 @@ static void validateBrokerId(String entityName, String entityType) { } } - private static List getResourceConfig(Admin adminClient, String entityType, String entityName, boolean includeSynonyms, boolean describeAll) throws ExecutionException, InterruptedException, TimeoutException { + private static List getResourceConfig(Admin adminClient, String entityType, String entityName, boolean describeAll) throws ExecutionException, InterruptedException, TimeoutException { ConfigResource.Type configResourceType; Optional dynamicConfigSource; @@ -643,7 +635,7 @@ private static List getResourceConfig(Admin adminClient, String ent Optional configSourceFilter = describeAll ? Optional.empty() : dynamicConfigSource; ConfigResource configResource = new ConfigResource(configResourceType, entityName); - DescribeConfigsOptions describeOptions = new DescribeConfigsOptions().includeSynonyms(includeSynonyms); + DescribeConfigsOptions describeOptions = new DescribeConfigsOptions().includeSynonyms(true); Map configs = adminClient.describeConfigs(Collections.singleton(configResource), describeOptions) .all().get(30, TimeUnit.SECONDS); diff --git a/tools/src/test/java/org/apache/kafka/tools/ConfigCommandTest.java b/tools/src/test/java/org/apache/kafka/tools/ConfigCommandTest.java index a0743d5ba00d0..fdb073827a4e0 100644 --- a/tools/src/test/java/org/apache/kafka/tools/ConfigCommandTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/ConfigCommandTest.java @@ -805,13 +805,6 @@ public void shouldAlterTopicConfig(boolean file) throws Exception { "--delete-config", "unclean.leader.election.enable")); AtomicBoolean alteredConfigs = new AtomicBoolean(); - ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, resourceName); - List configEntries = List.of(newConfigEntry("min.insync.replicas", "1"), newConfigEntry("unclean.leader.election.enable", "1")); - KafkaFutureImpl> future = new KafkaFutureImpl<>(); - future.complete(Map.of(resource, new Config(configEntries))); - DescribeConfigsResult describeResult = mock(DescribeConfigsResult.class); - when(describeResult.all()).thenReturn(future); - KafkaFutureImpl alterFuture = new KafkaFutureImpl<>(); alterFuture.complete(null); AlterConfigsResult alterResult = mock(AlterConfigsResult.class); @@ -819,16 +812,6 @@ public void shouldAlterTopicConfig(boolean file) throws Exception { Node node = new Node(1, "localhost", 9092); MockAdminClient mockAdminClient = new MockAdminClient(List.of(node), node) { - @Override - public synchronized DescribeConfigsResult describeConfigs(Collection resources, DescribeConfigsOptions options) { - assertFalse(options.includeSynonyms(), "Config synonyms requested unnecessarily"); - assertEquals(1, resources.size()); - ConfigResource res = resources.iterator().next(); - assertEquals(ConfigResource.Type.TOPIC, res.type()); - assertEquals(resourceName, res.name()); - return describeResult; - } - @Override public synchronized AlterConfigsResult incrementalAlterConfigs(Map> configs, AlterConfigsOptions options) { assertEquals(1, configs.size()); @@ -858,7 +841,7 @@ public synchronized AlterConfigsResult incrementalAlterConfigs(Map resourceOpts) throws Exception { + public void verifyAlterBrokerConfig(Node node, List resourceOpts) throws Exception { String[] optsList = toArray(List.of("--bootstrap-server", "localhost:9092", "--entity-type", "brokers", "--alter", @@ -989,29 +972,12 @@ public void verifyAlterBrokerConfig(Node node, String resourceName, List Map brokerConfigs = new HashMap<>(); brokerConfigs.put("num.io.threads", "5"); - ConfigResource resource = new ConfigResource(ConfigResource.Type.BROKER, resourceName); - List configEntries = List.of(new ConfigEntry("num.io.threads", "5")); - KafkaFutureImpl> future = new KafkaFutureImpl<>(); - future.complete(Map.of(resource, new Config(configEntries))); - DescribeConfigsResult describeResult = mock(DescribeConfigsResult.class); - when(describeResult.all()).thenReturn(future); - KafkaFutureImpl alterFuture = new KafkaFutureImpl<>(); alterFuture.complete(null); AlterConfigsResult alterResult = mock(AlterConfigsResult.class); when(alterResult.all()).thenReturn(alterFuture); MockAdminClient mockAdminClient = new MockAdminClient(List.of(node), node) { - @Override - public synchronized DescribeConfigsResult describeConfigs(Collection resources, DescribeConfigsOptions options) { - assertFalse(options.includeSynonyms(), "Config synonyms requested unnecessarily"); - assertEquals(1, resources.size()); - ConfigResource res = resources.iterator().next(); - assertEquals(ConfigResource.Type.BROKER, res.type()); - assertEquals(resourceName, res.name()); - return describeResult; - } - @Override public synchronized AlterConfigsResult incrementalAlterConfigs(Map> configs, AlterConfigsOptions options) { assertEquals(1, configs.size()); @@ -1029,7 +995,7 @@ public synchronized AlterConfigsResult incrementalAlterConfigs(Map configEntries = List.of(); - KafkaFutureImpl> future = new KafkaFutureImpl<>(); - future.complete(Map.of(resource, new Config(configEntries))); - DescribeConfigsResult describeResult = mock(DescribeConfigsResult.class); - when(describeResult.all()).thenReturn(future); + KafkaFutureImpl alterFuture = new KafkaFutureImpl<>(); + alterFuture.complete(null); + AlterConfigsResult alterResult = mock(AlterConfigsResult.class); + when(alterResult.all()).thenReturn(alterFuture); Node node = new Node(1, "localhost", 9092); MockAdminClient mockAdminClient = new MockAdminClient(List.of(node), node) { @Override - public synchronized DescribeConfigsResult describeConfigs(Collection resources, DescribeConfigsOptions options) { - assertEquals(1, resources.size()); - ConfigResource res = resources.iterator().next(); - assertEquals(ConfigResource.Type.TOPIC, res.type()); - assertEquals(resourceName, res.name()); - return describeResult; + public synchronized AlterConfigsResult incrementalAlterConfigs(Map> configs, AlterConfigsOptions options) { + return alterResult; } }; - assertThrows(InvalidConfigurationException.class, () -> ConfigCommand.alterConfig(mockAdminClient, createOpts)); - verify(describeResult).all(); + ConfigCommand.alterConfig(mockAdminClient, createOpts); + verify(alterResult).all(); } @Test @@ -1207,31 +1167,12 @@ private void verifyAlterClientMetricsConfig(Node node, String resourceName, List "match=[client_software_name=kafka.python,client_software_version=1\\.2\\..*]"), resourceOpts); ConfigCommand.ConfigCommandOptions alterOpts = new ConfigCommand.ConfigCommandOptions(toArray(optsList)); - ConfigResource resource = new ConfigResource(ConfigResource.Type.CLIENT_METRICS, resourceName); - List configEntries = List.of(new ConfigEntry("interval.ms", "1000", - ConfigEntry.ConfigSource.DYNAMIC_CLIENT_METRICS_CONFIG, false, false, List.of(), - ConfigEntry.ConfigType.UNKNOWN, null)); - KafkaFutureImpl> future = new KafkaFutureImpl<>(); - future.complete(Map.of(resource, new Config(configEntries))); - DescribeConfigsResult describeResult = mock(DescribeConfigsResult.class); - when(describeResult.all()).thenReturn(future); - KafkaFutureImpl alterFuture = new KafkaFutureImpl<>(); alterFuture.complete(null); AlterConfigsResult alterResult = mock(AlterConfigsResult.class); when(alterResult.all()).thenReturn(alterFuture); MockAdminClient mockAdminClient = new MockAdminClient(List.of(node), node) { - @Override - public synchronized DescribeConfigsResult describeConfigs(Collection resources, DescribeConfigsOptions options) { - assertFalse(options.includeSynonyms(), "Config synonyms requested unnecessarily"); - assertEquals(1, resources.size()); - ConfigResource res = resources.iterator().next(); - assertEquals(ConfigResource.Type.CLIENT_METRICS, res.type()); - assertEquals(resourceName, res.name()); - return describeResult; - } - @Override public synchronized AlterConfigsResult incrementalAlterConfigs(Map> configs, AlterConfigsOptions options) { assertEquals(1, configs.size()); @@ -1255,7 +1196,6 @@ public synchronized AlterConfigsResult incrementalAlterConfigs(Map "--add-config", "consumer.heartbeat.interval.ms=6000"), resourceOpts); ConfigCommand.ConfigCommandOptions alterOpts = new ConfigCommand.ConfigCommandOptions(toArray(optsList)); - ConfigResource resource = new ConfigResource(ConfigResource.Type.GROUP, resourceName); - List configEntries = List.of(new ConfigEntry("consumer.session.timeout.ms", "45000", - ConfigEntry.ConfigSource.DYNAMIC_GROUP_CONFIG, false, false, List.of(), - ConfigEntry.ConfigType.UNKNOWN, null)); - KafkaFutureImpl> future = new KafkaFutureImpl<>(); - future.complete(Map.of(resource, new Config(configEntries))); - DescribeConfigsResult describeResult = mock(DescribeConfigsResult.class); - when(describeResult.all()).thenReturn(future); - KafkaFutureImpl alterFuture = new KafkaFutureImpl<>(); alterFuture.complete(null); AlterConfigsResult alterResult = mock(AlterConfigsResult.class); when(alterResult.all()).thenReturn(alterFuture); MockAdminClient mockAdminClient = new MockAdminClient(List.of(node), node) { - @Override - public synchronized DescribeConfigsResult describeConfigs(Collection resources, DescribeConfigsOptions options) { - assertFalse(options.includeSynonyms(), "Config synonyms requested unnecessarily"); - assertEquals(1, resources.size()); - ConfigResource res = resources.iterator().next(); - assertEquals(ConfigResource.Type.GROUP, res.type()); - assertEquals(resourceName, res.name()); - return describeResult; - } - @Override public synchronized AlterConfigsResult incrementalAlterConfigs(Map> configs, AlterConfigsOptions options) { assertEquals(1, configs.size()); @@ -1364,7 +1285,6 @@ public synchronized AlterConfigsResult incrementalAlterConfigs(Map Date: Tue, 21 Apr 2026 20:28:53 +0800 Subject: [PATCH 2/3] addressed by comment and add test --- docs/getting-started/upgrade.md | 1 + .../org/apache/kafka/tools/ConfigCommand.java | 8 +++---- .../tools/ConfigCommandIntegrationTest.java | 23 +++++++++++++++++++ 3 files changed, 28 insertions(+), 4 deletions(-) diff --git a/docs/getting-started/upgrade.md b/docs/getting-started/upgrade.md index fb5ad37d83ea7..efeae5151e2b0 100644 --- a/docs/getting-started/upgrade.md +++ b/docs/getting-started/upgrade.md @@ -32,6 +32,7 @@ type: docs ### Notable changes in 4.4.0 + * `kafka-configs.sh --alter --delete-config` no longer requires the specified config keys to exist on the target resource. Previously, attempting to delete a non-existent config key raised an `InvalidConfigurationException`. The deletion is now a no-op when the key does not exist, which allows managing configs for offline brokers via `--bootstrap-controller`. For further details, please refer to [KAFKA-20506](https://issues.apache.org/jira/browse/KAFKA-20506). * The `ClientQuotaCallback#updateClusterMetadata` method is deprecated and will be removed in Kafka 5.0. Custom implementations of `ClientQuotaCallback` no longer need to override this method, as a default no-op implementation is now provided. For further details, please refer to [KIP-1200](https://cwiki.apache.org/confluence/x/axBJFg). ## Upgrading to 4.3.0 diff --git a/tools/src/main/java/org/apache/kafka/tools/ConfigCommand.java b/tools/src/main/java/org/apache/kafka/tools/ConfigCommand.java index b7d5b04bffef9..12f28980222a5 100644 --- a/tools/src/main/java/org/apache/kafka/tools/ConfigCommand.java +++ b/tools/src/main/java/org/apache/kafka/tools/ConfigCommand.java @@ -266,7 +266,7 @@ static void alterConfig(Admin adminClient, ConfigCommandOptions opts) throws IOE throw ee; } } else if (BROKER_LOGGER_CONFIG_TYPE.equals(entityType)) { - List validLoggers = getResourceConfig(adminClient, entityType, entityName, false).stream().map(ConfigEntry::name).toList(); + List validLoggers = getResourceConfig(adminClient, entityType, entityName, false, false).stream().map(ConfigEntry::name).toList(); // fail the command if any of the configured broker loggers do not exist List invalidBrokerLoggers = Stream.concat( configsToBeDeleted.stream().filter(c -> !validLoggers.contains(c)), @@ -571,7 +571,7 @@ private static void describeResourceConfig(Admin adminClient, String entityType, String entityTypeSingular = entityType.substring(0, entityType.length() - 1); System.out.println(configSourceStr + " configs for " + entityTypeSingular + " " + entity + " are:"); } - getResourceConfig(adminClient, entityType, entity, describeAll).forEach(entry -> { + getResourceConfig(adminClient, entityType, entity, true, describeAll).forEach(entry -> { String synonyms = entry.synonyms().stream() .map(synonym -> synonym.source() + ":" + synonym.name() + "=" + synonym.value()) .collect(Collectors.joining(", ", "{", "}")); @@ -597,7 +597,7 @@ static void validateBrokerId(String entityName, String entityType) { } } - private static List getResourceConfig(Admin adminClient, String entityType, String entityName, boolean describeAll) throws ExecutionException, InterruptedException, TimeoutException { + private static List getResourceConfig(Admin adminClient, String entityType, String entityName, boolean includeSynonyms, boolean describeAll) throws ExecutionException, InterruptedException, TimeoutException { ConfigResource.Type configResourceType; Optional dynamicConfigSource; @@ -635,7 +635,7 @@ private static List getResourceConfig(Admin adminClient, String ent Optional configSourceFilter = describeAll ? Optional.empty() : dynamicConfigSource; ConfigResource configResource = new ConfigResource(configResourceType, entityName); - DescribeConfigsOptions describeOptions = new DescribeConfigsOptions().includeSynonyms(true); + DescribeConfigsOptions describeOptions = new DescribeConfigsOptions().includeSynonyms(includeSynonyms); Map configs = adminClient.describeConfigs(Collections.singleton(configResource), describeOptions) .all().get(30, TimeUnit.SECONDS); diff --git a/tools/src/test/java/org/apache/kafka/tools/ConfigCommandIntegrationTest.java b/tools/src/test/java/org/apache/kafka/tools/ConfigCommandIntegrationTest.java index 7fca102f8a627..c5764c4bbf863 100644 --- a/tools/src/test/java/org/apache/kafka/tools/ConfigCommandIntegrationTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/ConfigCommandIntegrationTest.java @@ -621,6 +621,29 @@ public void testUpdateInvalidTopicConfigs() throws ExecutionException, Interrupt } } + @ClusterTest + public void testDeleteNonExistentConfigIsIdempotent() throws Exception { + String topicName = "test-delete-nonexistent-topic"; + try (Admin client = cluster.admin()) { + client.createTopics(List.of(new NewTopic(topicName, 1, (short) 1))).all().get(); + + ConfigCommand.alterConfig(client, new ConfigCommand.ConfigCommandOptions(toArray( + List.of("--bootstrap-server", cluster.bootstrapServers(), + "--entity-type", "topics", "--entity-name", topicName, + "--alter", "--delete-config", "non.existent.config")))); + + ConfigCommand.alterConfig(client, new ConfigCommand.ConfigCommandOptions(toArray( + List.of("--bootstrap-server", cluster.bootstrapServers(), + "--entity-type", "brokers", "--entity-name", defaultBrokerId, + "--alter", "--delete-config", "non.existent.config")))); + + ConfigCommand.alterConfig(client, new ConfigCommand.ConfigCommandOptions(toArray( + List.of("--bootstrap-server", cluster.bootstrapServers(), + "--entity-type", "brokers", "--entity-default", + "--alter", "--delete-config", "non.existent.config")))); + } + } + // Test case from KAFKA-13788 @ClusterTest(serverProperties = { // Must be at greater than 1MB per cleaner thread, set to 2M+2 so that we can set 2 cleaner threads. From bda0cf8430bfa3165a91061f77f273a0341a9431 Mon Sep 17 00:00:00 2001 From: Ken Huang Date: Thu, 23 Apr 2026 08:53:08 +0800 Subject: [PATCH 3/3] move to 4.3 --- docs/getting-started/upgrade.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/getting-started/upgrade.md b/docs/getting-started/upgrade.md index efeae5151e2b0..1f30f2b2e7f3e 100644 --- a/docs/getting-started/upgrade.md +++ b/docs/getting-started/upgrade.md @@ -32,7 +32,6 @@ type: docs ### Notable changes in 4.4.0 - * `kafka-configs.sh --alter --delete-config` no longer requires the specified config keys to exist on the target resource. Previously, attempting to delete a non-existent config key raised an `InvalidConfigurationException`. The deletion is now a no-op when the key does not exist, which allows managing configs for offline brokers via `--bootstrap-controller`. For further details, please refer to [KAFKA-20506](https://issues.apache.org/jira/browse/KAFKA-20506). * The `ClientQuotaCallback#updateClusterMetadata` method is deprecated and will be removed in Kafka 5.0. Custom implementations of `ClientQuotaCallback` no longer need to override this method, as a default no-op implementation is now provided. For further details, please refer to [KIP-1200](https://cwiki.apache.org/confluence/x/axBJFg). ## Upgrading to 4.3.0 @@ -41,6 +40,7 @@ type: docs ### Notable changes in 4.3.0 + * `kafka-configs.sh --alter --delete-config` no longer requires the specified config keys to exist on the target resource. Previously, attempting to delete a non-existent config key raised an `InvalidConfigurationException`. The deletion is now a no-op when the key does not exist, which allows managing configs for offline brokers via `--bootstrap-controller`. For further details, please refer to [KAFKA-20506](https://issues.apache.org/jira/browse/KAFKA-20506). * Support dynamically changing configs for dynamic quorum controllers. Previously only brokers and static quorum controllers were supported. For further details, please refer to [KAFKA-18928](https://issues.apache.org/jira/browse/KAFKA-18928). * Two new configs have been introduced: `group.coordinator.cached.buffer.max.bytes` and `share.coordinator.cached.buffer.max.bytes`. They allow the respective coordinators to set the maximum buffer size retained for reuse. For further details, please refer to [KIP-1196](https://cwiki.apache.org/confluence/x/hA5JFg). * The new config have been introduced: `remote.log.metadata.topic.min.isr` with 2 as default value. You can correct the min.insync.replicas for the existed __remote_log_metadata topic via kafka-configs.sh if needed. For further details, please refer to [KIP-1235](https://cwiki.apache.org/confluence/x/yommFw).