Skip to content

Commit 3c688ff

Browse files
authored
KAFKA-20506 kafka-configs.sh can't delete the config from a offline broker when using bootstrap controller (#22113)
Remove the pre-flight DescribeConfigs existence check in alterResourceConfig() since deleting a non-existent config is idempotent, and the check causes a timeout when the target broker is offline. Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
1 parent 322b055 commit 3c688ff

4 files changed

Lines changed: 52 additions & 112 deletions

File tree

core/src/main/scala/kafka/admin/ConfigCommand.scala

Lines changed: 14 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -184,12 +184,15 @@ object ConfigCommand extends Logging {
184184
val configResourceType = entityTypeHead match {
185185
case TopicType => ConfigResource.Type.TOPIC
186186
case ClientMetricsType => ConfigResource.Type.CLIENT_METRICS
187-
case BrokerType => ConfigResource.Type.BROKER
187+
case BrokerType =>
188+
if (entityNameHead.nonEmpty)
189+
validateBrokerId(entityNameHead, entityTypeHead)
190+
ConfigResource.Type.BROKER
188191
case GroupType => ConfigResource.Type.GROUP
189192
case _ => throw new IllegalArgumentException(s"$entityNameHead is not a valid entity-type.")
190193
}
191194
try {
192-
alterResourceConfig(adminClient, entityTypeHead, entityNameHead, configsToBeDeleted, configsToBeAdded, configResourceType)
195+
alterResourceConfig(adminClient, entityNameHead, configsToBeDeleted, configsToBeAdded, configResourceType)
193196
} catch {
194197
case e: ExecutionException =>
195198
e.getCause match {
@@ -202,7 +205,7 @@ object ConfigCommand extends Logging {
202205
}
203206

204207
case BrokerLoggerConfigType =>
205-
val validLoggers = getResourceConfig(adminClient, entityTypeHead, entityNameHead, includeSynonyms = true, describeAll = false).map(_.name)
208+
val validLoggers = getResourceConfig(adminClient, entityTypeHead, entityNameHead, includeSynonyms = false, describeAll = false).map(_.name)
206209
// fail the command if any of the configured broker loggers do not exist
207210
val invalidBrokerLoggers = configsToBeDeleted.filterNot(validLoggers.contains) ++ configsToBeAdded.keys.filterNot(validLoggers.contains)
208211
if (invalidBrokerLoggers.nonEmpty)
@@ -405,15 +408,7 @@ object ConfigCommand extends Logging {
405408
}
406409
}
407410

408-
private def alterResourceConfig(adminClient: Admin, entityTypeHead: String, entityNameHead: String, configsToBeDeleted: Seq[String], configsToBeAdded: Map[String, ConfigEntry], resourceType: ConfigResource.Type): Unit = {
409-
val oldConfig = getResourceConfig(adminClient, entityTypeHead, entityNameHead, includeSynonyms = false, describeAll = false)
410-
.map { entry => (entry.name, entry) }.toMap
411-
412-
// fail the command if any of the configs to be deleted does not exist
413-
val invalidConfigs = configsToBeDeleted.filterNot(oldConfig.contains)
414-
if (invalidConfigs.nonEmpty)
415-
throw new InvalidConfigurationException(s"Invalid config(s): ${invalidConfigs.mkString(",")}")
416-
411+
private def alterResourceConfig(adminClient: Admin, entityNameHead: String, configsToBeDeleted: Seq[String], configsToBeAdded: Map[String, ConfigEntry], resourceType: ConfigResource.Type): Unit = {
417412
val configResource = new ConfigResource(resourceType, entityNameHead)
418413
val alterOptions = new AlterConfigsOptions().timeoutMs(30000).validateOnly(false)
419414
val addEntries = configsToBeAdded.values.map(k => new AlterConfigOp(k, AlterConfigOp.OpType.SET))
@@ -422,11 +417,12 @@ object ConfigCommand extends Logging {
422417
adminClient.incrementalAlterConfigs(Map(configResource -> alterEntries).asJava, alterOptions).all().get(60, TimeUnit.SECONDS)
423418
}
424419

420+
private def validateBrokerId(entityName: String, entityType: String): Unit = try entityName.toInt catch {
421+
case _: NumberFormatException =>
422+
throw new IllegalArgumentException(s"The entity name for $entityType must be a valid integer broker id, found: $entityName")
423+
}
424+
425425
private def getResourceConfig(adminClient: Admin, entityType: String, entityName: String, includeSynonyms: Boolean, describeAll: Boolean) = {
426-
def validateBrokerId(): Unit = try entityName.toInt catch {
427-
case _: NumberFormatException =>
428-
throw new IllegalArgumentException(s"The entity name for $entityType must be a valid integer broker id, found: $entityName")
429-
}
430426

431427
val (configResourceType, dynamicConfigSource) = entityType match {
432428
case TopicType =>
@@ -437,12 +433,12 @@ object ConfigCommand extends Logging {
437433
case BrokerDefaultEntityName =>
438434
(ConfigResource.Type.BROKER, Some(ConfigEntry.ConfigSource.DYNAMIC_DEFAULT_BROKER_CONFIG))
439435
case _ =>
440-
validateBrokerId()
436+
validateBrokerId(entityName, entityType)
441437
(ConfigResource.Type.BROKER, Some(ConfigEntry.ConfigSource.DYNAMIC_BROKER_CONFIG))
442438
}
443439
case BrokerLoggerConfigType =>
444440
if (entityName.nonEmpty)
445-
validateBrokerId()
441+
validateBrokerId(entityName, entityType)
446442
(ConfigResource.Type.BROKER_LOGGER, None)
447443
case ClientMetricsType =>
448444
(ConfigResource.Type.CLIENT_METRICS, Some(ConfigEntry.ConfigSource.DYNAMIC_CLIENT_METRICS_CONFIG))

docs/getting-started/upgrade.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ type: docs
3232

3333
### Notable changes in 4.3.0
3434

35+
* `kafka-configs.sh --alter --delete-config` no longer requires the specified config keys to exist on the target resource. Previously, attempting to delete a non-existent config key raised an `InvalidConfigurationException`. The deletion is now a no-op when the key does not exist, which allows managing configs for offline brokers via `--bootstrap-controller`. For further details, please refer to [KAFKA-20506](https://issues.apache.org/jira/browse/KAFKA-20506).
3536
* Support dynamically changing configs for dynamic quorum controllers. Previously only brokers and static quorum controllers were supported. For further details, please refer to [KAFKA-18928](https://issues.apache.org/jira/browse/KAFKA-18928).
3637
* Two new configs have been introduced: `group.coordinator.cached.buffer.max.bytes` and `share.coordinator.cached.buffer.max.bytes`. They allow the respective coordinators to set the maximum buffer size retained for reuse. For further details, please refer to [KIP-1196](https://cwiki.apache.org/confluence/x/hA5JFg).
3738
* The new config have been introduced: `remote.log.metadata.topic.min.isr` with 2 as default value. You can correct the min.insync.replicas for the existed __remote_log_metadata topic via kafka-configs.sh if needed. For further details, please refer to [KIP-1235](https://cwiki.apache.org/confluence/x/yommFw).

tools/src/test/java/org/apache/kafka/tools/ConfigCommandIntegrationTest.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -616,6 +616,29 @@ public void testUpdateBrokerConfigNotAffectedByInvalidConfig() {
616616
}
617617
}
618618

619+
@ClusterTest
620+
public void testDeleteNonExistentConfigIsIdempotent() throws Exception {
621+
String topicName = "test-delete-nonexistent-topic";
622+
try (Admin client = cluster.admin()) {
623+
client.createTopics(List.of(new NewTopic(topicName, 1, (short) 1))).all().get();
624+
625+
ConfigCommand.alterConfig(client, new ConfigCommand.ConfigCommandOptions(toArray(
626+
List.of("--bootstrap-server", cluster.bootstrapServers(),
627+
"--entity-type", "topics", "--entity-name", topicName,
628+
"--alter", "--delete-config", "non.existent.config"))));
629+
630+
ConfigCommand.alterConfig(client, new ConfigCommand.ConfigCommandOptions(toArray(
631+
List.of("--bootstrap-server", cluster.bootstrapServers(),
632+
"--entity-type", "brokers", "--entity-name", defaultBrokerId,
633+
"--alter", "--delete-config", "non.existent.config"))));
634+
635+
ConfigCommand.alterConfig(client, new ConfigCommand.ConfigCommandOptions(toArray(
636+
List.of("--bootstrap-server", cluster.bootstrapServers(),
637+
"--entity-type", "brokers", "--entity-default",
638+
"--alter", "--delete-config", "non.existent.config"))));
639+
}
640+
}
641+
619642
@ClusterTest(
620643
// Must be at greater than 1MB per cleaner thread, set to 2M+2 so that we can set 2 cleaner threads.
621644
serverProperties = {@ClusterConfigProperty(key = "log.cleaner.dedupe.buffer.size", value = "2097154")},

tools/src/test/java/org/apache/kafka/tools/ConfigCommandTest.java

Lines changed: 14 additions & 94 deletions
Original file line numberDiff line numberDiff line change
@@ -808,30 +808,13 @@ public void shouldAlterTopicConfig(boolean file) {
808808
"--delete-config", "unclean.leader.election.enable"));
809809
AtomicBoolean alteredConfigs = new AtomicBoolean();
810810

811-
ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, resourceName);
812-
List<ConfigEntry> configEntries = List.of(newConfigEntry("min.insync.replicas", "1"), newConfigEntry("unclean.leader.election.enable", "1"));
813-
KafkaFutureImpl<Map<ConfigResource, Config>> future = new KafkaFutureImpl<>();
814-
future.complete(Map.of(resource, new Config(configEntries)));
815-
DescribeConfigsResult describeResult = mock(DescribeConfigsResult.class);
816-
when(describeResult.all()).thenReturn(future);
817-
818811
KafkaFutureImpl<Void> alterFuture = new KafkaFutureImpl<>();
819812
alterFuture.complete(null);
820813
AlterConfigsResult alterResult = mock(AlterConfigsResult.class);
821814
when(alterResult.all()).thenReturn(alterFuture);
822815

823816
Node node = new Node(1, "localhost", 9092);
824817
MockAdminClient mockAdminClient = new MockAdminClient(List.of(node), node) {
825-
@Override
826-
public synchronized DescribeConfigsResult describeConfigs(Collection<ConfigResource> resources, DescribeConfigsOptions options) {
827-
assertFalse(options.includeSynonyms(), "Config synonyms requested unnecessarily");
828-
assertEquals(1, resources.size());
829-
ConfigResource res = resources.iterator().next();
830-
assertEquals(ConfigResource.Type.TOPIC, res.type());
831-
assertEquals(resourceName, res.name());
832-
return describeResult;
833-
}
834-
835818
@Override
836819
public synchronized AlterConfigsResult incrementalAlterConfigs(Map<ConfigResource, Collection<AlterConfigOp>> configs, AlterConfigsOptions options) {
837820
assertEquals(1, configs.size());
@@ -861,7 +844,7 @@ public synchronized AlterConfigsResult incrementalAlterConfigs(Map<ConfigResourc
861844
};
862845
ConfigCommand.alterConfig(mockAdminClient, alterOpts);
863846
assertTrue(alteredConfigs.get());
864-
verify(describeResult).all();
847+
verify(alterResult).all();
865848
}
866849

867850
public ConfigEntry newConfigEntry(String name, String value) {
@@ -974,16 +957,16 @@ public void shouldRaiseInvalidConfigurationExceptionWhenAddingInvalidBrokerLogge
974957
@Test
975958
public void shouldAddDefaultBrokerDynamicConfig() {
976959
Node node = new Node(1, "localhost", 9092);
977-
verifyAlterBrokerConfig(node, "", List.of("--entity-default"));
960+
verifyAlterBrokerConfig(node, List.of("--entity-default"));
978961
}
979962

980963
@Test
981964
public void shouldAddBrokerDynamicConfig() {
982965
Node node = new Node(1, "localhost", 9092);
983-
verifyAlterBrokerConfig(node, "1", List.of("--entity-name", "1"));
966+
verifyAlterBrokerConfig(node, List.of("--entity-name", "1"));
984967
}
985968

986-
public void verifyAlterBrokerConfig(Node node, String resourceName, List<String> resourceOpts) {
969+
public void verifyAlterBrokerConfig(Node node, List<String> resourceOpts) {
987970
String[] optsList = toArray(List.of("--bootstrap-server", "localhost:9092",
988971
"--entity-type", "brokers",
989972
"--alter",
@@ -992,29 +975,12 @@ public void verifyAlterBrokerConfig(Node node, String resourceName, List<String>
992975
Map<String, String> brokerConfigs = new HashMap<>();
993976
brokerConfigs.put("num.io.threads", "5");
994977

995-
ConfigResource resource = new ConfigResource(ConfigResource.Type.BROKER, resourceName);
996-
List<ConfigEntry> configEntries = List.of(new ConfigEntry("num.io.threads", "5"));
997-
KafkaFutureImpl<Map<ConfigResource, Config>> future = new KafkaFutureImpl<>();
998-
future.complete(Map.of(resource, new Config(configEntries)));
999-
DescribeConfigsResult describeResult = mock(DescribeConfigsResult.class);
1000-
when(describeResult.all()).thenReturn(future);
1001-
1002978
KafkaFutureImpl<Void> alterFuture = new KafkaFutureImpl<>();
1003979
alterFuture.complete(null);
1004980
AlterConfigsResult alterResult = mock(AlterConfigsResult.class);
1005981
when(alterResult.all()).thenReturn(alterFuture);
1006982

1007983
MockAdminClient mockAdminClient = new MockAdminClient(List.of(node), node) {
1008-
@Override
1009-
public synchronized DescribeConfigsResult describeConfigs(Collection<ConfigResource> resources, DescribeConfigsOptions options) {
1010-
assertFalse(options.includeSynonyms(), "Config synonyms requested unnecessarily");
1011-
assertEquals(1, resources.size());
1012-
ConfigResource res = resources.iterator().next();
1013-
assertEquals(ConfigResource.Type.BROKER, res.type());
1014-
assertEquals(resourceName, res.name());
1015-
return describeResult;
1016-
}
1017-
1018984
@Override
1019985
public synchronized AlterConfigsResult incrementalAlterConfigs(Map<ConfigResource, Collection<AlterConfigOp>> configs, AlterConfigsOptions options) {
1020986
assertEquals(1, configs.size());
@@ -1032,7 +998,7 @@ public synchronized AlterConfigsResult incrementalAlterConfigs(Map<ConfigResourc
1032998
expected.put("num.io.threads", "5");
1033999
expected.put("leader.replication.throttled.rate", "10");
10341000
assertEquals(expected, brokerConfigs);
1035-
verify(describeResult).all();
1001+
verify(alterResult).all();
10361002
}
10371003

10381004
@Test
@@ -1161,35 +1127,29 @@ public void shouldNotUpdateBrokerConfigIfMalformedBracketConfig() {
11611127
}
11621128

11631129
@Test
1164-
public void shouldNotUpdateConfigIfNonExistingConfigIsDeleted() {
1130+
public void shouldAllowDeletingNonExistingConfig() throws Exception {
11651131
String resourceName = "my-topic";
11661132
ConfigCommand.ConfigCommandOptions createOpts = new ConfigCommand.ConfigCommandOptions(toArray("--bootstrap-server", "localhost:9092",
11671133
"--entity-name", resourceName,
11681134
"--entity-type", "topics",
11691135
"--alter",
11701136
"--delete-config", "missing_config1, missing_config2"));
11711137

1172-
ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, resourceName);
1173-
List<ConfigEntry> configEntries = List.of();
1174-
KafkaFutureImpl<Map<ConfigResource, Config>> future = new KafkaFutureImpl<>();
1175-
future.complete(Map.of(resource, new Config(configEntries)));
1176-
DescribeConfigsResult describeResult = mock(DescribeConfigsResult.class);
1177-
when(describeResult.all()).thenReturn(future);
1138+
KafkaFutureImpl<Void> alterFuture = new KafkaFutureImpl<>();
1139+
alterFuture.complete(null);
1140+
AlterConfigsResult alterResult = mock(AlterConfigsResult.class);
1141+
when(alterResult.all()).thenReturn(alterFuture);
11781142

11791143
Node node = new Node(1, "localhost", 9092);
11801144
MockAdminClient mockAdminClient = new MockAdminClient(List.of(node), node) {
11811145
@Override
1182-
public synchronized DescribeConfigsResult describeConfigs(Collection<ConfigResource> resources, DescribeConfigsOptions options) {
1183-
assertEquals(1, resources.size());
1184-
ConfigResource res = resources.iterator().next();
1185-
assertEquals(ConfigResource.Type.TOPIC, res.type());
1186-
assertEquals(resourceName, res.name());
1187-
return describeResult;
1146+
public synchronized AlterConfigsResult incrementalAlterConfigs(Map<ConfigResource, Collection<AlterConfigOp>> configs, AlterConfigsOptions options) {
1147+
return alterResult;
11881148
}
11891149
};
11901150

1191-
assertThrows(InvalidConfigurationException.class, () -> ConfigCommand.alterConfig(mockAdminClient, createOpts));
1192-
verify(describeResult).all();
1151+
ConfigCommand.alterConfig(mockAdminClient, createOpts);
1152+
verify(alterResult).all();
11931153
}
11941154

11951155
@Test
@@ -1210,31 +1170,12 @@ private void verifyAlterClientMetricsConfig(Node node, String resourceName, List
12101170
"match=[client_software_name=kafka.python,client_software_version=1\\.2\\..*]"), resourceOpts);
12111171
ConfigCommand.ConfigCommandOptions alterOpts = new ConfigCommand.ConfigCommandOptions(toArray(optsList));
12121172

1213-
ConfigResource resource = new ConfigResource(ConfigResource.Type.CLIENT_METRICS, resourceName);
1214-
List<ConfigEntry> configEntries = List.of(new ConfigEntry("interval.ms", "1000",
1215-
ConfigEntry.ConfigSource.DYNAMIC_CLIENT_METRICS_CONFIG, false, false, List.of(),
1216-
ConfigEntry.ConfigType.UNKNOWN, null));
1217-
KafkaFutureImpl<Map<ConfigResource, Config>> future = new KafkaFutureImpl<>();
1218-
future.complete(Map.of(resource, new Config(configEntries)));
1219-
DescribeConfigsResult describeResult = mock(DescribeConfigsResult.class);
1220-
when(describeResult.all()).thenReturn(future);
1221-
12221173
KafkaFutureImpl<Void> alterFuture = new KafkaFutureImpl<>();
12231174
alterFuture.complete(null);
12241175
AlterConfigsResult alterResult = mock(AlterConfigsResult.class);
12251176
when(alterResult.all()).thenReturn(alterFuture);
12261177

12271178
MockAdminClient mockAdminClient = new MockAdminClient(List.of(node), node) {
1228-
@Override
1229-
public synchronized DescribeConfigsResult describeConfigs(Collection<ConfigResource> resources, DescribeConfigsOptions options) {
1230-
assertFalse(options.includeSynonyms(), "Config synonyms requested unnecessarily");
1231-
assertEquals(1, resources.size());
1232-
ConfigResource res = resources.iterator().next();
1233-
assertEquals(ConfigResource.Type.CLIENT_METRICS, res.type());
1234-
assertEquals(resourceName, res.name());
1235-
return describeResult;
1236-
}
1237-
12381179
@Override
12391180
public synchronized AlterConfigsResult incrementalAlterConfigs(Map<ConfigResource, Collection<AlterConfigOp>> configs, AlterConfigsOptions options) {
12401181
assertEquals(1, configs.size());
@@ -1258,7 +1199,6 @@ public synchronized AlterConfigsResult incrementalAlterConfigs(Map<ConfigResourc
12581199
}
12591200
};
12601201
ConfigCommand.alterConfig(mockAdminClient, alterOpts);
1261-
verify(describeResult).all();
12621202
verify(alterResult).all();
12631203
}
12641204

@@ -1320,31 +1260,12 @@ private void verifyAlterGroupConfig(Node node, String resourceName, List<String>
13201260
"--add-config", "consumer.heartbeat.interval.ms=6000"), resourceOpts);
13211261
ConfigCommand.ConfigCommandOptions alterOpts = new ConfigCommand.ConfigCommandOptions(toArray(optsList));
13221262

1323-
ConfigResource resource = new ConfigResource(ConfigResource.Type.GROUP, resourceName);
1324-
List<ConfigEntry> configEntries = List.of(new ConfigEntry("consumer.session.timeout.ms", "45000",
1325-
ConfigEntry.ConfigSource.DYNAMIC_GROUP_CONFIG, false, false, List.of(),
1326-
ConfigEntry.ConfigType.UNKNOWN, null));
1327-
KafkaFutureImpl<Map<ConfigResource, Config>> future = new KafkaFutureImpl<>();
1328-
future.complete(Map.of(resource, new Config(configEntries)));
1329-
DescribeConfigsResult describeResult = mock(DescribeConfigsResult.class);
1330-
when(describeResult.all()).thenReturn(future);
1331-
13321263
KafkaFutureImpl<Void> alterFuture = new KafkaFutureImpl<>();
13331264
alterFuture.complete(null);
13341265
AlterConfigsResult alterResult = mock(AlterConfigsResult.class);
13351266
when(alterResult.all()).thenReturn(alterFuture);
13361267

13371268
MockAdminClient mockAdminClient = new MockAdminClient(List.of(node), node) {
1338-
@Override
1339-
public synchronized DescribeConfigsResult describeConfigs(Collection<ConfigResource> resources, DescribeConfigsOptions options) {
1340-
assertFalse(options.includeSynonyms(), "Config synonyms requested unnecessarily");
1341-
assertEquals(1, resources.size());
1342-
ConfigResource res = resources.iterator().next();
1343-
assertEquals(ConfigResource.Type.GROUP, res.type());
1344-
assertEquals(resourceName, res.name());
1345-
return describeResult;
1346-
}
1347-
13481269
@Override
13491270
public synchronized AlterConfigsResult incrementalAlterConfigs(Map<ConfigResource, Collection<AlterConfigOp>> configs, AlterConfigsOptions options) {
13501271
assertEquals(1, configs.size());
@@ -1367,7 +1288,6 @@ public synchronized AlterConfigsResult incrementalAlterConfigs(Map<ConfigResourc
13671288
}
13681289
};
13691290
ConfigCommand.alterConfig(mockAdminClient, alterOpts);
1370-
verify(describeResult).all();
13711291
verify(alterResult).all();
13721292
}
13731293

0 commit comments

Comments
 (0)