Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 9 additions & 17 deletions tools/src/main/java/org/apache/kafka/tools/ConfigCommand.java
Original file line number Diff line number Diff line change
Expand Up @@ -249,12 +249,15 @@ static void alterConfig(Admin adminClient, ConfigCommandOptions opts) throws IOE
} else if (CLIENT_METRICS_TYPE.equals(entityType)) {
configResourceType = ConfigResource.Type.CLIENT_METRICS;
} else if (BROKER_TYPE.equals(entityType)) {
if (!BROKER_DEFAULT_ENTITY_NAME.equals(entityName)) {
validateBrokerId(entityName, entityType);
}
configResourceType = ConfigResource.Type.BROKER;
} else {
configResourceType = ConfigResource.Type.GROUP;
}
try {
alterResourceConfig(adminClient, entityType, entityName, configsToBeDeleted, configsToBeAdded, configResourceType);
alterResourceConfig(adminClient, entityName, configsToBeDeleted, configsToBeAdded, configResourceType);
} catch (ExecutionException ee) {
if (ee.getCause() instanceof UnsupportedVersionException) {
throw new UnsupportedVersionException("The " + ApiKeys.INCREMENTAL_ALTER_CONFIGS + " API is not supported by the cluster. The API is supported starting from version 2.3.0."
Expand All @@ -263,7 +266,7 @@ static void alterConfig(Admin adminClient, ConfigCommandOptions opts) throws IOE
throw ee;
}
} else if (BROKER_LOGGER_CONFIG_TYPE.equals(entityType)) {
List<String> validLoggers = getResourceConfig(adminClient, entityType, entityName, true, false).stream().map(ConfigEntry::name).toList();
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like an existing bug, as the synonyms are not actually being used. Therefore, it should pass true instead of false.

List<String> validLoggers = getResourceConfig(adminClient, entityType, entityName, false).stream().map(ConfigEntry::name).toList();
// fail the command if any of the configured broker loggers do not exist
List<String> invalidBrokerLoggers = Stream.concat(
configsToBeDeleted.stream().filter(c -> !validLoggers.contains(c)),
Expand Down Expand Up @@ -568,7 +571,7 @@ private static void describeResourceConfig(Admin adminClient, String entityType,
String entityTypeSingular = entityType.substring(0, entityType.length() - 1);
System.out.println(configSourceStr + " configs for " + entityTypeSingular + " " + entity + " are:");
}
getResourceConfig(adminClient, entityType, entity, true, describeAll).forEach(entry -> {
getResourceConfig(adminClient, entityType, entity, describeAll).forEach(entry -> {
String synonyms = entry.synonyms().stream()
.map(synonym -> synonym.source() + ":" + synonym.name() + "=" + synonym.value())
.collect(Collectors.joining(", ", "{", "}"));
Expand All @@ -577,18 +580,7 @@ private static void describeResourceConfig(Admin adminClient, String entityType,
}
}

private static void alterResourceConfig(Admin adminClient, String entityTypeHead, String entityNameHead, List<String> configsToBeDeleted, Map<String, ConfigEntry> configsToBeAdded, ConfigResource.Type resourceType) throws ExecutionException, InterruptedException, TimeoutException {
Map<String, ConfigEntry> oldConfig = getResourceConfig(adminClient, entityTypeHead, entityNameHead, false, false)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please add this changes to upgrade.md

.stream()
.collect(Collectors.toMap(ConfigEntry::name, entry -> entry));

// fail the command if any of the configs to be deleted does not exist
List<String> invalidConfigs = configsToBeDeleted.stream()
.filter(config -> !oldConfig.containsKey(config))
.toList();
if (!invalidConfigs.isEmpty())
throw new InvalidConfigurationException("Invalid config(s): " + String.join(",", invalidConfigs));

private static void alterResourceConfig(Admin adminClient, String entityNameHead, List<String> configsToBeDeleted, Map<String, ConfigEntry> configsToBeAdded, ConfigResource.Type resourceType) throws ExecutionException, InterruptedException, TimeoutException {
ConfigResource configResource = new ConfigResource(resourceType, entityNameHead);
AlterConfigsOptions alterOptions = new AlterConfigsOptions().timeoutMs(30000).validateOnly(false);
List<AlterConfigOp> addEntries = configsToBeAdded.values().stream().map(k -> new AlterConfigOp(k, AlterConfigOp.OpType.SET)).toList();
Expand All @@ -605,7 +597,7 @@ static void validateBrokerId(String entityName, String entityType) {
}
}

private static List<ConfigEntry> getResourceConfig(Admin adminClient, String entityType, String entityName, boolean includeSynonyms, boolean describeAll) throws ExecutionException, InterruptedException, TimeoutException {
private static List<ConfigEntry> getResourceConfig(Admin adminClient, String entityType, String entityName, boolean describeAll) throws ExecutionException, InterruptedException, TimeoutException {
ConfigResource.Type configResourceType;
Optional<ConfigEntry.ConfigSource> dynamicConfigSource;

Expand Down Expand Up @@ -643,7 +635,7 @@ private static List<ConfigEntry> getResourceConfig(Admin adminClient, String ent
Optional<ConfigEntry.ConfigSource> configSourceFilter = describeAll ? Optional.empty() : dynamicConfigSource;

ConfigResource configResource = new ConfigResource(configResourceType, entityName);
DescribeConfigsOptions describeOptions = new DescribeConfigsOptions().includeSynonyms(includeSynonyms);
DescribeConfigsOptions describeOptions = new DescribeConfigsOptions().includeSynonyms(true);
Map<ConfigResource, Config> configs = adminClient.describeConfigs(Collections.singleton(configResource), describeOptions)
.all().get(30, TimeUnit.SECONDS);

Expand Down
108 changes: 14 additions & 94 deletions tools/src/test/java/org/apache/kafka/tools/ConfigCommandTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -805,30 +805,13 @@ public void shouldAlterTopicConfig(boolean file) throws Exception {
"--delete-config", "unclean.leader.election.enable"));
AtomicBoolean alteredConfigs = new AtomicBoolean();

ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, resourceName);
List<ConfigEntry> configEntries = List.of(newConfigEntry("min.insync.replicas", "1"), newConfigEntry("unclean.leader.election.enable", "1"));
KafkaFutureImpl<Map<ConfigResource, Config>> future = new KafkaFutureImpl<>();
future.complete(Map.of(resource, new Config(configEntries)));
DescribeConfigsResult describeResult = mock(DescribeConfigsResult.class);
when(describeResult.all()).thenReturn(future);

KafkaFutureImpl<Void> alterFuture = new KafkaFutureImpl<>();
alterFuture.complete(null);
AlterConfigsResult alterResult = mock(AlterConfigsResult.class);
when(alterResult.all()).thenReturn(alterFuture);

Node node = new Node(1, "localhost", 9092);
MockAdminClient mockAdminClient = new MockAdminClient(List.of(node), node) {
@Override
public synchronized DescribeConfigsResult describeConfigs(Collection<ConfigResource> resources, DescribeConfigsOptions options) {
assertFalse(options.includeSynonyms(), "Config synonyms requested unnecessarily");
assertEquals(1, resources.size());
ConfigResource res = resources.iterator().next();
assertEquals(ConfigResource.Type.TOPIC, res.type());
assertEquals(resourceName, res.name());
return describeResult;
}

@Override
public synchronized AlterConfigsResult incrementalAlterConfigs(Map<ConfigResource, Collection<AlterConfigOp>> configs, AlterConfigsOptions options) {
assertEquals(1, configs.size());
Expand Down Expand Up @@ -858,7 +841,7 @@ public synchronized AlterConfigsResult incrementalAlterConfigs(Map<ConfigResourc
};
ConfigCommand.alterConfig(mockAdminClient, alterOpts);
assertTrue(alteredConfigs.get());
verify(describeResult).all();
verify(alterResult).all();
}

public ConfigEntry newConfigEntry(String name, String value) {
Expand Down Expand Up @@ -971,16 +954,16 @@ public void shouldRaiseInvalidConfigurationExceptionWhenAddingInvalidBrokerLogge
@Test
public void shouldAddDefaultBrokerDynamicConfig() throws Exception {
Node node = new Node(1, "localhost", 9092);
verifyAlterBrokerConfig(node, "", List.of("--entity-default"));
verifyAlterBrokerConfig(node, List.of("--entity-default"));
}

@Test
public void shouldAddBrokerDynamicConfig() throws Exception {
Node node = new Node(1, "localhost", 9092);
verifyAlterBrokerConfig(node, "1", List.of("--entity-name", "1"));
verifyAlterBrokerConfig(node, List.of("--entity-name", "1"));
}

public void verifyAlterBrokerConfig(Node node, String resourceName, List<String> resourceOpts) throws Exception {
public void verifyAlterBrokerConfig(Node node, List<String> resourceOpts) throws Exception {
String[] optsList = toArray(List.of("--bootstrap-server", "localhost:9092",
"--entity-type", "brokers",
"--alter",
Expand All @@ -989,29 +972,12 @@ public void verifyAlterBrokerConfig(Node node, String resourceName, List<String>
Map<String, String> brokerConfigs = new HashMap<>();
brokerConfigs.put("num.io.threads", "5");

ConfigResource resource = new ConfigResource(ConfigResource.Type.BROKER, resourceName);
List<ConfigEntry> configEntries = List.of(new ConfigEntry("num.io.threads", "5"));
KafkaFutureImpl<Map<ConfigResource, Config>> future = new KafkaFutureImpl<>();
future.complete(Map.of(resource, new Config(configEntries)));
DescribeConfigsResult describeResult = mock(DescribeConfigsResult.class);
when(describeResult.all()).thenReturn(future);

KafkaFutureImpl<Void> alterFuture = new KafkaFutureImpl<>();
alterFuture.complete(null);
AlterConfigsResult alterResult = mock(AlterConfigsResult.class);
when(alterResult.all()).thenReturn(alterFuture);

MockAdminClient mockAdminClient = new MockAdminClient(List.of(node), node) {
@Override
public synchronized DescribeConfigsResult describeConfigs(Collection<ConfigResource> resources, DescribeConfigsOptions options) {
assertFalse(options.includeSynonyms(), "Config synonyms requested unnecessarily");
assertEquals(1, resources.size());
ConfigResource res = resources.iterator().next();
assertEquals(ConfigResource.Type.BROKER, res.type());
assertEquals(resourceName, res.name());
return describeResult;
}

@Override
public synchronized AlterConfigsResult incrementalAlterConfigs(Map<ConfigResource, Collection<AlterConfigOp>> configs, AlterConfigsOptions options) {
assertEquals(1, configs.size());
Expand All @@ -1029,7 +995,7 @@ public synchronized AlterConfigsResult incrementalAlterConfigs(Map<ConfigResourc
expected.put("num.io.threads", "5");
expected.put("leader.replication.throttled.rate", "10");
assertEquals(expected, brokerConfigs);
verify(describeResult).all();
verify(alterResult).all();
}

@Test
Expand Down Expand Up @@ -1158,35 +1124,29 @@ public void shouldNotUpdateBrokerConfigIfMalformedBracketConfig() {
}

@Test
public void shouldNotUpdateConfigIfNonExistingConfigIsDeleted() {
public void shouldAllowDeletingNonExistingConfig() throws Exception {
String resourceName = "my-topic";
ConfigCommand.ConfigCommandOptions createOpts = new ConfigCommand.ConfigCommandOptions(toArray("--bootstrap-server", "localhost:9092",
"--entity-name", resourceName,
"--entity-type", "topics",
"--alter",
"--delete-config", "missing_config1, missing_config2"));

ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, resourceName);
List<ConfigEntry> configEntries = List.of();
KafkaFutureImpl<Map<ConfigResource, Config>> future = new KafkaFutureImpl<>();
future.complete(Map.of(resource, new Config(configEntries)));
DescribeConfigsResult describeResult = mock(DescribeConfigsResult.class);
when(describeResult.all()).thenReturn(future);
KafkaFutureImpl<Void> alterFuture = new KafkaFutureImpl<>();
alterFuture.complete(null);
AlterConfigsResult alterResult = mock(AlterConfigsResult.class);
when(alterResult.all()).thenReturn(alterFuture);

Node node = new Node(1, "localhost", 9092);
MockAdminClient mockAdminClient = new MockAdminClient(List.of(node), node) {
@Override
public synchronized DescribeConfigsResult describeConfigs(Collection<ConfigResource> resources, DescribeConfigsOptions options) {
assertEquals(1, resources.size());
ConfigResource res = resources.iterator().next();
assertEquals(ConfigResource.Type.TOPIC, res.type());
assertEquals(resourceName, res.name());
return describeResult;
public synchronized AlterConfigsResult incrementalAlterConfigs(Map<ConfigResource, Collection<AlterConfigOp>> configs, AlterConfigsOptions options) {
return alterResult;
}
};

assertThrows(InvalidConfigurationException.class, () -> ConfigCommand.alterConfig(mockAdminClient, createOpts));
verify(describeResult).all();
ConfigCommand.alterConfig(mockAdminClient, createOpts);
verify(alterResult).all();
}

@Test
Expand All @@ -1207,31 +1167,12 @@ private void verifyAlterClientMetricsConfig(Node node, String resourceName, List
"match=[client_software_name=kafka.python,client_software_version=1\\.2\\..*]"), resourceOpts);
ConfigCommand.ConfigCommandOptions alterOpts = new ConfigCommand.ConfigCommandOptions(toArray(optsList));

ConfigResource resource = new ConfigResource(ConfigResource.Type.CLIENT_METRICS, resourceName);
List<ConfigEntry> configEntries = List.of(new ConfigEntry("interval.ms", "1000",
ConfigEntry.ConfigSource.DYNAMIC_CLIENT_METRICS_CONFIG, false, false, List.of(),
ConfigEntry.ConfigType.UNKNOWN, null));
KafkaFutureImpl<Map<ConfigResource, Config>> future = new KafkaFutureImpl<>();
future.complete(Map.of(resource, new Config(configEntries)));
DescribeConfigsResult describeResult = mock(DescribeConfigsResult.class);
when(describeResult.all()).thenReturn(future);

KafkaFutureImpl<Void> alterFuture = new KafkaFutureImpl<>();
alterFuture.complete(null);
AlterConfigsResult alterResult = mock(AlterConfigsResult.class);
when(alterResult.all()).thenReturn(alterFuture);

MockAdminClient mockAdminClient = new MockAdminClient(List.of(node), node) {
@Override
public synchronized DescribeConfigsResult describeConfigs(Collection<ConfigResource> resources, DescribeConfigsOptions options) {
assertFalse(options.includeSynonyms(), "Config synonyms requested unnecessarily");
assertEquals(1, resources.size());
ConfigResource res = resources.iterator().next();
assertEquals(ConfigResource.Type.CLIENT_METRICS, res.type());
assertEquals(resourceName, res.name());
return describeResult;
}

@Override
public synchronized AlterConfigsResult incrementalAlterConfigs(Map<ConfigResource, Collection<AlterConfigOp>> configs, AlterConfigsOptions options) {
assertEquals(1, configs.size());
Expand All @@ -1255,7 +1196,6 @@ public synchronized AlterConfigsResult incrementalAlterConfigs(Map<ConfigResourc
}
};
ConfigCommand.alterConfig(mockAdminClient, alterOpts);
verify(describeResult).all();
verify(alterResult).all();
}

Expand Down Expand Up @@ -1317,31 +1257,12 @@ private void verifyAlterGroupConfig(Node node, String resourceName, List<String>
"--add-config", "consumer.heartbeat.interval.ms=6000"), resourceOpts);
ConfigCommand.ConfigCommandOptions alterOpts = new ConfigCommand.ConfigCommandOptions(toArray(optsList));

ConfigResource resource = new ConfigResource(ConfigResource.Type.GROUP, resourceName);
List<ConfigEntry> configEntries = List.of(new ConfigEntry("consumer.session.timeout.ms", "45000",
ConfigEntry.ConfigSource.DYNAMIC_GROUP_CONFIG, false, false, List.of(),
ConfigEntry.ConfigType.UNKNOWN, null));
KafkaFutureImpl<Map<ConfigResource, Config>> future = new KafkaFutureImpl<>();
future.complete(Map.of(resource, new Config(configEntries)));
DescribeConfigsResult describeResult = mock(DescribeConfigsResult.class);
when(describeResult.all()).thenReturn(future);

KafkaFutureImpl<Void> alterFuture = new KafkaFutureImpl<>();
alterFuture.complete(null);
AlterConfigsResult alterResult = mock(AlterConfigsResult.class);
when(alterResult.all()).thenReturn(alterFuture);

MockAdminClient mockAdminClient = new MockAdminClient(List.of(node), node) {
@Override
public synchronized DescribeConfigsResult describeConfigs(Collection<ConfigResource> resources, DescribeConfigsOptions options) {
assertFalse(options.includeSynonyms(), "Config synonyms requested unnecessarily");
assertEquals(1, resources.size());
ConfigResource res = resources.iterator().next();
assertEquals(ConfigResource.Type.GROUP, res.type());
assertEquals(resourceName, res.name());
return describeResult;
}

@Override
public synchronized AlterConfigsResult incrementalAlterConfigs(Map<ConfigResource, Collection<AlterConfigOp>> configs, AlterConfigsOptions options) {
assertEquals(1, configs.size());
Expand All @@ -1364,7 +1285,6 @@ public synchronized AlterConfigsResult incrementalAlterConfigs(Map<ConfigResourc
}
};
ConfigCommand.alterConfig(mockAdminClient, alterOpts);
verify(describeResult).all();
verify(alterResult).all();
}

Expand Down
Loading