Skip to content

Commit 841587a

Browse files
committed
fix the bug
1 parent 282eef9 commit 841587a

2 files changed

Lines changed: 25 additions & 111 deletions

File tree

tools/src/main/java/org/apache/kafka/tools/ConfigCommand.java

Lines changed: 9 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -249,12 +249,15 @@ static void alterConfig(Admin adminClient, ConfigCommandOptions opts) throws IOE
249249
} else if (CLIENT_METRICS_TYPE.equals(entityType)) {
250250
configResourceType = ConfigResource.Type.CLIENT_METRICS;
251251
} else if (BROKER_TYPE.equals(entityType)) {
252+
if (!BROKER_DEFAULT_ENTITY_NAME.equals(entityName)) {
253+
validateBrokerId(entityName, entityType);
254+
}
252255
configResourceType = ConfigResource.Type.BROKER;
253256
} else {
254257
configResourceType = ConfigResource.Type.GROUP;
255258
}
256259
try {
257-
alterResourceConfig(adminClient, entityType, entityName, configsToBeDeleted, configsToBeAdded, configResourceType);
260+
alterResourceConfig(adminClient, entityName, configsToBeDeleted, configsToBeAdded, configResourceType);
258261
} catch (ExecutionException ee) {
259262
if (ee.getCause() instanceof UnsupportedVersionException) {
260263
throw new UnsupportedVersionException("The " + ApiKeys.INCREMENTAL_ALTER_CONFIGS + " API is not supported by the cluster. The API is supported starting from version 2.3.0."
@@ -263,7 +266,7 @@ static void alterConfig(Admin adminClient, ConfigCommandOptions opts) throws IOE
263266
throw ee;
264267
}
265268
} else if (BROKER_LOGGER_CONFIG_TYPE.equals(entityType)) {
266-
List<String> validLoggers = getResourceConfig(adminClient, entityType, entityName, true, false).stream().map(ConfigEntry::name).toList();
269+
List<String> validLoggers = getResourceConfig(adminClient, entityType, entityName, false).stream().map(ConfigEntry::name).toList();
267270
// fail the command if any of the configured broker loggers do not exist
268271
List<String> invalidBrokerLoggers = Stream.concat(
269272
configsToBeDeleted.stream().filter(c -> !validLoggers.contains(c)),
@@ -568,7 +571,7 @@ private static void describeResourceConfig(Admin adminClient, String entityType,
568571
String entityTypeSingular = entityType.substring(0, entityType.length() - 1);
569572
System.out.println(configSourceStr + " configs for " + entityTypeSingular + " " + entity + " are:");
570573
}
571-
getResourceConfig(adminClient, entityType, entity, true, describeAll).forEach(entry -> {
574+
getResourceConfig(adminClient, entityType, entity, describeAll).forEach(entry -> {
572575
String synonyms = entry.synonyms().stream()
573576
.map(synonym -> synonym.source() + ":" + synonym.name() + "=" + synonym.value())
574577
.collect(Collectors.joining(", ", "{", "}"));
@@ -577,18 +580,7 @@ private static void describeResourceConfig(Admin adminClient, String entityType,
577580
}
578581
}
579582

580-
private static void alterResourceConfig(Admin adminClient, String entityTypeHead, String entityNameHead, List<String> configsToBeDeleted, Map<String, ConfigEntry> configsToBeAdded, ConfigResource.Type resourceType) throws ExecutionException, InterruptedException, TimeoutException {
581-
Map<String, ConfigEntry> oldConfig = getResourceConfig(adminClient, entityTypeHead, entityNameHead, false, false)
582-
.stream()
583-
.collect(Collectors.toMap(ConfigEntry::name, entry -> entry));
584-
585-
// fail the command if any of the configs to be deleted does not exist
586-
List<String> invalidConfigs = configsToBeDeleted.stream()
587-
.filter(config -> !oldConfig.containsKey(config))
588-
.toList();
589-
if (!invalidConfigs.isEmpty())
590-
throw new InvalidConfigurationException("Invalid config(s): " + String.join(",", invalidConfigs));
591-
583+
private static void alterResourceConfig(Admin adminClient, String entityNameHead, List<String> configsToBeDeleted, Map<String, ConfigEntry> configsToBeAdded, ConfigResource.Type resourceType) throws ExecutionException, InterruptedException, TimeoutException {
592584
ConfigResource configResource = new ConfigResource(resourceType, entityNameHead);
593585
AlterConfigsOptions alterOptions = new AlterConfigsOptions().timeoutMs(30000).validateOnly(false);
594586
List<AlterConfigOp> addEntries = configsToBeAdded.values().stream().map(k -> new AlterConfigOp(k, AlterConfigOp.OpType.SET)).toList();
@@ -605,7 +597,7 @@ static void validateBrokerId(String entityName, String entityType) {
605597
}
606598
}
607599

608-
private static List<ConfigEntry> getResourceConfig(Admin adminClient, String entityType, String entityName, boolean includeSynonyms, boolean describeAll) throws ExecutionException, InterruptedException, TimeoutException {
600+
private static List<ConfigEntry> getResourceConfig(Admin adminClient, String entityType, String entityName, boolean describeAll) throws ExecutionException, InterruptedException, TimeoutException {
609601
ConfigResource.Type configResourceType;
610602
Optional<ConfigEntry.ConfigSource> dynamicConfigSource;
611603

@@ -643,7 +635,7 @@ private static List<ConfigEntry> getResourceConfig(Admin adminClient, String ent
643635
Optional<ConfigEntry.ConfigSource> configSourceFilter = describeAll ? Optional.empty() : dynamicConfigSource;
644636

645637
ConfigResource configResource = new ConfigResource(configResourceType, entityName);
646-
DescribeConfigsOptions describeOptions = new DescribeConfigsOptions().includeSynonyms(includeSynonyms);
638+
DescribeConfigsOptions describeOptions = new DescribeConfigsOptions().includeSynonyms(true);
647639
Map<ConfigResource, Config> configs = adminClient.describeConfigs(Collections.singleton(configResource), describeOptions)
648640
.all().get(30, TimeUnit.SECONDS);
649641

tools/src/test/java/org/apache/kafka/tools/ConfigCommandTest.java

Lines changed: 16 additions & 94 deletions
Original file line numberDiff line numberDiff line change
@@ -805,30 +805,13 @@ public void shouldAlterTopicConfig(boolean file) throws Exception {
805805
"--delete-config", "unclean.leader.election.enable"));
806806
AtomicBoolean alteredConfigs = new AtomicBoolean();
807807

808-
ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, resourceName);
809-
List<ConfigEntry> configEntries = List.of(newConfigEntry("min.insync.replicas", "1"), newConfigEntry("unclean.leader.election.enable", "1"));
810-
KafkaFutureImpl<Map<ConfigResource, Config>> future = new KafkaFutureImpl<>();
811-
future.complete(Map.of(resource, new Config(configEntries)));
812-
DescribeConfigsResult describeResult = mock(DescribeConfigsResult.class);
813-
when(describeResult.all()).thenReturn(future);
814-
815808
KafkaFutureImpl<Void> alterFuture = new KafkaFutureImpl<>();
816809
alterFuture.complete(null);
817810
AlterConfigsResult alterResult = mock(AlterConfigsResult.class);
818811
when(alterResult.all()).thenReturn(alterFuture);
819812

820813
Node node = new Node(1, "localhost", 9092);
821814
MockAdminClient mockAdminClient = new MockAdminClient(List.of(node), node) {
822-
@Override
823-
public synchronized DescribeConfigsResult describeConfigs(Collection<ConfigResource> resources, DescribeConfigsOptions options) {
824-
assertFalse(options.includeSynonyms(), "Config synonyms requested unnecessarily");
825-
assertEquals(1, resources.size());
826-
ConfigResource res = resources.iterator().next();
827-
assertEquals(ConfigResource.Type.TOPIC, res.type());
828-
assertEquals(resourceName, res.name());
829-
return describeResult;
830-
}
831-
832815
@Override
833816
public synchronized AlterConfigsResult incrementalAlterConfigs(Map<ConfigResource, Collection<AlterConfigOp>> configs, AlterConfigsOptions options) {
834817
assertEquals(1, configs.size());
@@ -858,7 +841,7 @@ public synchronized AlterConfigsResult incrementalAlterConfigs(Map<ConfigResourc
858841
};
859842
ConfigCommand.alterConfig(mockAdminClient, alterOpts);
860843
assertTrue(alteredConfigs.get());
861-
verify(describeResult).all();
844+
verify(alterResult).all();
862845
}
863846

864847
public ConfigEntry newConfigEntry(String name, String value) {
@@ -971,16 +954,16 @@ public void shouldRaiseInvalidConfigurationExceptionWhenAddingInvalidBrokerLogge
971954
@Test
972955
public void shouldAddDefaultBrokerDynamicConfig() throws Exception {
973956
Node node = new Node(1, "localhost", 9092);
974-
verifyAlterBrokerConfig(node, "", List.of("--entity-default"));
957+
verifyAlterBrokerConfig(node, List.of("--entity-default"));
975958
}
976959

977960
@Test
978961
public void shouldAddBrokerDynamicConfig() throws Exception {
979962
Node node = new Node(1, "localhost", 9092);
980-
verifyAlterBrokerConfig(node, "1", List.of("--entity-name", "1"));
963+
verifyAlterBrokerConfig(node, List.of("--entity-name", "1"));
981964
}
982965

983-
public void verifyAlterBrokerConfig(Node node, String resourceName, List<String> resourceOpts) throws Exception {
966+
public void verifyAlterBrokerConfig(Node node, List<String> resourceOpts) throws Exception {
984967
String[] optsList = toArray(List.of("--bootstrap-server", "localhost:9092",
985968
"--entity-type", "brokers",
986969
"--alter",
@@ -989,29 +972,12 @@ public void verifyAlterBrokerConfig(Node node, String resourceName, List<String>
989972
Map<String, String> brokerConfigs = new HashMap<>();
990973
brokerConfigs.put("num.io.threads", "5");
991974

992-
ConfigResource resource = new ConfigResource(ConfigResource.Type.BROKER, resourceName);
993-
List<ConfigEntry> configEntries = List.of(new ConfigEntry("num.io.threads", "5"));
994-
KafkaFutureImpl<Map<ConfigResource, Config>> future = new KafkaFutureImpl<>();
995-
future.complete(Map.of(resource, new Config(configEntries)));
996-
DescribeConfigsResult describeResult = mock(DescribeConfigsResult.class);
997-
when(describeResult.all()).thenReturn(future);
998-
999975
KafkaFutureImpl<Void> alterFuture = new KafkaFutureImpl<>();
1000976
alterFuture.complete(null);
1001977
AlterConfigsResult alterResult = mock(AlterConfigsResult.class);
1002978
when(alterResult.all()).thenReturn(alterFuture);
1003979

1004980
MockAdminClient mockAdminClient = new MockAdminClient(List.of(node), node) {
1005-
@Override
1006-
public synchronized DescribeConfigsResult describeConfigs(Collection<ConfigResource> resources, DescribeConfigsOptions options) {
1007-
assertFalse(options.includeSynonyms(), "Config synonyms requested unnecessarily");
1008-
assertEquals(1, resources.size());
1009-
ConfigResource res = resources.iterator().next();
1010-
assertEquals(ConfigResource.Type.BROKER, res.type());
1011-
assertEquals(resourceName, res.name());
1012-
return describeResult;
1013-
}
1014-
1015981
@Override
1016982
public synchronized AlterConfigsResult incrementalAlterConfigs(Map<ConfigResource, Collection<AlterConfigOp>> configs, AlterConfigsOptions options) {
1017983
assertEquals(1, configs.size());
@@ -1029,7 +995,7 @@ public synchronized AlterConfigsResult incrementalAlterConfigs(Map<ConfigResourc
1029995
expected.put("num.io.threads", "5");
1030996
expected.put("leader.replication.throttled.rate", "10");
1031997
assertEquals(expected, brokerConfigs);
1032-
verify(describeResult).all();
998+
verify(alterResult).all();
1033999
}
10341000

10351001
@Test
@@ -1158,35 +1124,31 @@ public void shouldNotUpdateBrokerConfigIfMalformedBracketConfig() {
11581124
}
11591125

11601126
@Test
1161-
public void shouldNotUpdateConfigIfNonExistingConfigIsDeleted() {
1127+
public void shouldAllowDeletingNonExistingConfig() throws Exception {
1128+
// Deleting a non-existent config is idempotent and should succeed,
1129+
// allowing offline broker configs to be managed via bootstrap-controller.
11621130
String resourceName = "my-topic";
11631131
ConfigCommand.ConfigCommandOptions createOpts = new ConfigCommand.ConfigCommandOptions(toArray("--bootstrap-server", "localhost:9092",
11641132
"--entity-name", resourceName,
11651133
"--entity-type", "topics",
11661134
"--alter",
11671135
"--delete-config", "missing_config1, missing_config2"));
11681136

1169-
ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, resourceName);
1170-
List<ConfigEntry> configEntries = List.of();
1171-
KafkaFutureImpl<Map<ConfigResource, Config>> future = new KafkaFutureImpl<>();
1172-
future.complete(Map.of(resource, new Config(configEntries)));
1173-
DescribeConfigsResult describeResult = mock(DescribeConfigsResult.class);
1174-
when(describeResult.all()).thenReturn(future);
1137+
KafkaFutureImpl<Void> alterFuture = new KafkaFutureImpl<>();
1138+
alterFuture.complete(null);
1139+
AlterConfigsResult alterResult = mock(AlterConfigsResult.class);
1140+
when(alterResult.all()).thenReturn(alterFuture);
11751141

11761142
Node node = new Node(1, "localhost", 9092);
11771143
MockAdminClient mockAdminClient = new MockAdminClient(List.of(node), node) {
11781144
@Override
1179-
public synchronized DescribeConfigsResult describeConfigs(Collection<ConfigResource> resources, DescribeConfigsOptions options) {
1180-
assertEquals(1, resources.size());
1181-
ConfigResource res = resources.iterator().next();
1182-
assertEquals(ConfigResource.Type.TOPIC, res.type());
1183-
assertEquals(resourceName, res.name());
1184-
return describeResult;
1145+
public synchronized AlterConfigsResult incrementalAlterConfigs(Map<ConfigResource, Collection<AlterConfigOp>> configs, AlterConfigsOptions options) {
1146+
return alterResult;
11851147
}
11861148
};
11871149

1188-
assertThrows(InvalidConfigurationException.class, () -> ConfigCommand.alterConfig(mockAdminClient, createOpts));
1189-
verify(describeResult).all();
1150+
ConfigCommand.alterConfig(mockAdminClient, createOpts);
1151+
verify(alterResult).all();
11901152
}
11911153

11921154
@Test
@@ -1207,31 +1169,12 @@ private void verifyAlterClientMetricsConfig(Node node, String resourceName, List
12071169
"match=[client_software_name=kafka.python,client_software_version=1\\.2\\..*]"), resourceOpts);
12081170
ConfigCommand.ConfigCommandOptions alterOpts = new ConfigCommand.ConfigCommandOptions(toArray(optsList));
12091171

1210-
ConfigResource resource = new ConfigResource(ConfigResource.Type.CLIENT_METRICS, resourceName);
1211-
List<ConfigEntry> configEntries = List.of(new ConfigEntry("interval.ms", "1000",
1212-
ConfigEntry.ConfigSource.DYNAMIC_CLIENT_METRICS_CONFIG, false, false, List.of(),
1213-
ConfigEntry.ConfigType.UNKNOWN, null));
1214-
KafkaFutureImpl<Map<ConfigResource, Config>> future = new KafkaFutureImpl<>();
1215-
future.complete(Map.of(resource, new Config(configEntries)));
1216-
DescribeConfigsResult describeResult = mock(DescribeConfigsResult.class);
1217-
when(describeResult.all()).thenReturn(future);
1218-
12191172
KafkaFutureImpl<Void> alterFuture = new KafkaFutureImpl<>();
12201173
alterFuture.complete(null);
12211174
AlterConfigsResult alterResult = mock(AlterConfigsResult.class);
12221175
when(alterResult.all()).thenReturn(alterFuture);
12231176

12241177
MockAdminClient mockAdminClient = new MockAdminClient(List.of(node), node) {
1225-
@Override
1226-
public synchronized DescribeConfigsResult describeConfigs(Collection<ConfigResource> resources, DescribeConfigsOptions options) {
1227-
assertFalse(options.includeSynonyms(), "Config synonyms requested unnecessarily");
1228-
assertEquals(1, resources.size());
1229-
ConfigResource res = resources.iterator().next();
1230-
assertEquals(ConfigResource.Type.CLIENT_METRICS, res.type());
1231-
assertEquals(resourceName, res.name());
1232-
return describeResult;
1233-
}
1234-
12351178
@Override
12361179
public synchronized AlterConfigsResult incrementalAlterConfigs(Map<ConfigResource, Collection<AlterConfigOp>> configs, AlterConfigsOptions options) {
12371180
assertEquals(1, configs.size());
@@ -1255,7 +1198,6 @@ public synchronized AlterConfigsResult incrementalAlterConfigs(Map<ConfigResourc
12551198
}
12561199
};
12571200
ConfigCommand.alterConfig(mockAdminClient, alterOpts);
1258-
verify(describeResult).all();
12591201
verify(alterResult).all();
12601202
}
12611203

@@ -1317,31 +1259,12 @@ private void verifyAlterGroupConfig(Node node, String resourceName, List<String>
13171259
"--add-config", "consumer.heartbeat.interval.ms=6000"), resourceOpts);
13181260
ConfigCommand.ConfigCommandOptions alterOpts = new ConfigCommand.ConfigCommandOptions(toArray(optsList));
13191261

1320-
ConfigResource resource = new ConfigResource(ConfigResource.Type.GROUP, resourceName);
1321-
List<ConfigEntry> configEntries = List.of(new ConfigEntry("consumer.session.timeout.ms", "45000",
1322-
ConfigEntry.ConfigSource.DYNAMIC_GROUP_CONFIG, false, false, List.of(),
1323-
ConfigEntry.ConfigType.UNKNOWN, null));
1324-
KafkaFutureImpl<Map<ConfigResource, Config>> future = new KafkaFutureImpl<>();
1325-
future.complete(Map.of(resource, new Config(configEntries)));
1326-
DescribeConfigsResult describeResult = mock(DescribeConfigsResult.class);
1327-
when(describeResult.all()).thenReturn(future);
1328-
13291262
KafkaFutureImpl<Void> alterFuture = new KafkaFutureImpl<>();
13301263
alterFuture.complete(null);
13311264
AlterConfigsResult alterResult = mock(AlterConfigsResult.class);
13321265
when(alterResult.all()).thenReturn(alterFuture);
13331266

13341267
MockAdminClient mockAdminClient = new MockAdminClient(List.of(node), node) {
1335-
@Override
1336-
public synchronized DescribeConfigsResult describeConfigs(Collection<ConfigResource> resources, DescribeConfigsOptions options) {
1337-
assertFalse(options.includeSynonyms(), "Config synonyms requested unnecessarily");
1338-
assertEquals(1, resources.size());
1339-
ConfigResource res = resources.iterator().next();
1340-
assertEquals(ConfigResource.Type.GROUP, res.type());
1341-
assertEquals(resourceName, res.name());
1342-
return describeResult;
1343-
}
1344-
13451268
@Override
13461269
public synchronized AlterConfigsResult incrementalAlterConfigs(Map<ConfigResource, Collection<AlterConfigOp>> configs, AlterConfigsOptions options) {
13471270
assertEquals(1, configs.size());
@@ -1364,7 +1287,6 @@ public synchronized AlterConfigsResult incrementalAlterConfigs(Map<ConfigResourc
13641287
}
13651288
};
13661289
ConfigCommand.alterConfig(mockAdminClient, alterOpts);
1367-
verify(describeResult).all();
13681290
verify(alterResult).all();
13691291
}
13701292

0 commit comments

Comments
 (0)