Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/getting-started/upgrade.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ type: docs

### Notable changes in 4.3.0

* `kafka-configs.sh --alter --delete-config` no longer requires the specified config keys to exist on the target resource. Previously, attempting to delete a non-existent config key raised an `InvalidConfigurationException`. The deletion is now a no-op when the key does not exist, which allows managing configs for offline brokers via `--bootstrap-controller`. For further details, please refer to [KAFKA-20506](https://issues.apache.org/jira/browse/KAFKA-20506).
* Support dynamically changing configs for dynamic quorum controllers. Previously only brokers and static quorum controllers were supported. For further details, please refer to [KAFKA-18928](https://issues.apache.org/jira/browse/KAFKA-18928).
* Two new configs have been introduced: `group.coordinator.cached.buffer.max.bytes` and `share.coordinator.cached.buffer.max.bytes`. They allow the respective coordinators to set the maximum buffer size retained for reuse. For further details, please refer to [KIP-1196](https://cwiki.apache.org/confluence/x/hA5JFg).
* The new config have been introduced: `remote.log.metadata.topic.min.isr` with 2 as default value. You can correct the min.insync.replicas for the existed __remote_log_metadata topic via kafka-configs.sh if needed. For further details, please refer to [KIP-1235](https://cwiki.apache.org/confluence/x/yommFw).
Expand Down
20 changes: 6 additions & 14 deletions tools/src/main/java/org/apache/kafka/tools/ConfigCommand.java
Original file line number Diff line number Diff line change
Expand Up @@ -249,12 +249,15 @@ static void alterConfig(Admin adminClient, ConfigCommandOptions opts) throws IOE
} else if (CLIENT_METRICS_TYPE.equals(entityType)) {
configResourceType = ConfigResource.Type.CLIENT_METRICS;
} else if (BROKER_TYPE.equals(entityType)) {
if (!BROKER_DEFAULT_ENTITY_NAME.equals(entityName)) {
validateBrokerId(entityName, entityType);
}
configResourceType = ConfigResource.Type.BROKER;
} else {
configResourceType = ConfigResource.Type.GROUP;
}
try {
alterResourceConfig(adminClient, entityType, entityName, configsToBeDeleted, configsToBeAdded, configResourceType);
alterResourceConfig(adminClient, entityName, configsToBeDeleted, configsToBeAdded, configResourceType);
} catch (ExecutionException ee) {
if (ee.getCause() instanceof UnsupportedVersionException) {
throw new UnsupportedVersionException("The " + ApiKeys.INCREMENTAL_ALTER_CONFIGS + " API is not supported by the cluster. The API is supported starting from version 2.3.0."
Expand All @@ -263,7 +266,7 @@ static void alterConfig(Admin adminClient, ConfigCommandOptions opts) throws IOE
throw ee;
}
} else if (BROKER_LOGGER_CONFIG_TYPE.equals(entityType)) {
List<String> validLoggers = getResourceConfig(adminClient, entityType, entityName, true, false).stream().map(ConfigEntry::name).toList();
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like an existing bug, as the synonyms are not actually being used. Therefore, it should pass true instead of false.

List<String> validLoggers = getResourceConfig(adminClient, entityType, entityName, false, false).stream().map(ConfigEntry::name).toList();
// fail the command if any of the configured broker loggers do not exist
List<String> invalidBrokerLoggers = Stream.concat(
configsToBeDeleted.stream().filter(c -> !validLoggers.contains(c)),
Expand Down Expand Up @@ -577,18 +580,7 @@ private static void describeResourceConfig(Admin adminClient, String entityType,
}
}

private static void alterResourceConfig(Admin adminClient, String entityTypeHead, String entityNameHead, List<String> configsToBeDeleted, Map<String, ConfigEntry> configsToBeAdded, ConfigResource.Type resourceType) throws ExecutionException, InterruptedException, TimeoutException {
Map<String, ConfigEntry> oldConfig = getResourceConfig(adminClient, entityTypeHead, entityNameHead, false, false)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please add this changes to upgrade.md

.stream()
.collect(Collectors.toMap(ConfigEntry::name, entry -> entry));

// fail the command if any of the configs to be deleted does not exist
List<String> invalidConfigs = configsToBeDeleted.stream()
.filter(config -> !oldConfig.containsKey(config))
.toList();
if (!invalidConfigs.isEmpty())
throw new InvalidConfigurationException("Invalid config(s): " + String.join(",", invalidConfigs));

private static void alterResourceConfig(Admin adminClient, String entityNameHead, List<String> configsToBeDeleted, Map<String, ConfigEntry> configsToBeAdded, ConfigResource.Type resourceType) throws ExecutionException, InterruptedException, TimeoutException {
ConfigResource configResource = new ConfigResource(resourceType, entityNameHead);
AlterConfigsOptions alterOptions = new AlterConfigsOptions().timeoutMs(30000).validateOnly(false);
List<AlterConfigOp> addEntries = configsToBeAdded.values().stream().map(k -> new AlterConfigOp(k, AlterConfigOp.OpType.SET)).toList();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -621,6 +621,29 @@ public void testUpdateInvalidTopicConfigs() throws ExecutionException, Interrupt
}
}

@ClusterTest
public void testDeleteNonExistentConfigIsIdempotent() throws Exception {
String topicName = "test-delete-nonexistent-topic";
try (Admin client = cluster.admin()) {
client.createTopics(List.of(new NewTopic(topicName, 1, (short) 1))).all().get();

ConfigCommand.alterConfig(client, new ConfigCommand.ConfigCommandOptions(toArray(
List.of("--bootstrap-server", cluster.bootstrapServers(),
"--entity-type", "topics", "--entity-name", topicName,
"--alter", "--delete-config", "non.existent.config"))));

ConfigCommand.alterConfig(client, new ConfigCommand.ConfigCommandOptions(toArray(
List.of("--bootstrap-server", cluster.bootstrapServers(),
"--entity-type", "brokers", "--entity-name", defaultBrokerId,
"--alter", "--delete-config", "non.existent.config"))));

ConfigCommand.alterConfig(client, new ConfigCommand.ConfigCommandOptions(toArray(
List.of("--bootstrap-server", cluster.bootstrapServers(),
"--entity-type", "brokers", "--entity-default",
"--alter", "--delete-config", "non.existent.config"))));
}
}

// Test case from KAFKA-13788
@ClusterTest(serverProperties = {
// Must be at greater than 1MB per cleaner thread, set to 2M+2 so that we can set 2 cleaner threads.
Expand Down
108 changes: 14 additions & 94 deletions tools/src/test/java/org/apache/kafka/tools/ConfigCommandTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -805,30 +805,13 @@ public void shouldAlterTopicConfig(boolean file) throws Exception {
"--delete-config", "unclean.leader.election.enable"));
AtomicBoolean alteredConfigs = new AtomicBoolean();

ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, resourceName);
List<ConfigEntry> configEntries = List.of(newConfigEntry("min.insync.replicas", "1"), newConfigEntry("unclean.leader.election.enable", "1"));
KafkaFutureImpl<Map<ConfigResource, Config>> future = new KafkaFutureImpl<>();
future.complete(Map.of(resource, new Config(configEntries)));
DescribeConfigsResult describeResult = mock(DescribeConfigsResult.class);
when(describeResult.all()).thenReturn(future);

KafkaFutureImpl<Void> alterFuture = new KafkaFutureImpl<>();
alterFuture.complete(null);
AlterConfigsResult alterResult = mock(AlterConfigsResult.class);
when(alterResult.all()).thenReturn(alterFuture);

Node node = new Node(1, "localhost", 9092);
MockAdminClient mockAdminClient = new MockAdminClient(List.of(node), node) {
@Override
public synchronized DescribeConfigsResult describeConfigs(Collection<ConfigResource> resources, DescribeConfigsOptions options) {
assertFalse(options.includeSynonyms(), "Config synonyms requested unnecessarily");
assertEquals(1, resources.size());
ConfigResource res = resources.iterator().next();
assertEquals(ConfigResource.Type.TOPIC, res.type());
assertEquals(resourceName, res.name());
return describeResult;
}

@Override
public synchronized AlterConfigsResult incrementalAlterConfigs(Map<ConfigResource, Collection<AlterConfigOp>> configs, AlterConfigsOptions options) {
assertEquals(1, configs.size());
Expand Down Expand Up @@ -858,7 +841,7 @@ public synchronized AlterConfigsResult incrementalAlterConfigs(Map<ConfigResourc
};
ConfigCommand.alterConfig(mockAdminClient, alterOpts);
assertTrue(alteredConfigs.get());
verify(describeResult).all();
verify(alterResult).all();
}

public ConfigEntry newConfigEntry(String name, String value) {
Expand Down Expand Up @@ -971,16 +954,16 @@ public void shouldRaiseInvalidConfigurationExceptionWhenAddingInvalidBrokerLogge
@Test
public void shouldAddDefaultBrokerDynamicConfig() throws Exception {
Node node = new Node(1, "localhost", 9092);
verifyAlterBrokerConfig(node, "", List.of("--entity-default"));
verifyAlterBrokerConfig(node, List.of("--entity-default"));
}

@Test
public void shouldAddBrokerDynamicConfig() throws Exception {
Node node = new Node(1, "localhost", 9092);
verifyAlterBrokerConfig(node, "1", List.of("--entity-name", "1"));
verifyAlterBrokerConfig(node, List.of("--entity-name", "1"));
}

public void verifyAlterBrokerConfig(Node node, String resourceName, List<String> resourceOpts) throws Exception {
public void verifyAlterBrokerConfig(Node node, List<String> resourceOpts) throws Exception {
String[] optsList = toArray(List.of("--bootstrap-server", "localhost:9092",
"--entity-type", "brokers",
"--alter",
Expand All @@ -989,29 +972,12 @@ public void verifyAlterBrokerConfig(Node node, String resourceName, List<String>
Map<String, String> brokerConfigs = new HashMap<>();
brokerConfigs.put("num.io.threads", "5");

ConfigResource resource = new ConfigResource(ConfigResource.Type.BROKER, resourceName);
List<ConfigEntry> configEntries = List.of(new ConfigEntry("num.io.threads", "5"));
KafkaFutureImpl<Map<ConfigResource, Config>> future = new KafkaFutureImpl<>();
future.complete(Map.of(resource, new Config(configEntries)));
DescribeConfigsResult describeResult = mock(DescribeConfigsResult.class);
when(describeResult.all()).thenReturn(future);

KafkaFutureImpl<Void> alterFuture = new KafkaFutureImpl<>();
alterFuture.complete(null);
AlterConfigsResult alterResult = mock(AlterConfigsResult.class);
when(alterResult.all()).thenReturn(alterFuture);

MockAdminClient mockAdminClient = new MockAdminClient(List.of(node), node) {
@Override
public synchronized DescribeConfigsResult describeConfigs(Collection<ConfigResource> resources, DescribeConfigsOptions options) {
assertFalse(options.includeSynonyms(), "Config synonyms requested unnecessarily");
assertEquals(1, resources.size());
ConfigResource res = resources.iterator().next();
assertEquals(ConfigResource.Type.BROKER, res.type());
assertEquals(resourceName, res.name());
return describeResult;
}

@Override
public synchronized AlterConfigsResult incrementalAlterConfigs(Map<ConfigResource, Collection<AlterConfigOp>> configs, AlterConfigsOptions options) {
assertEquals(1, configs.size());
Expand All @@ -1029,7 +995,7 @@ public synchronized AlterConfigsResult incrementalAlterConfigs(Map<ConfigResourc
expected.put("num.io.threads", "5");
expected.put("leader.replication.throttled.rate", "10");
assertEquals(expected, brokerConfigs);
verify(describeResult).all();
verify(alterResult).all();
}

@Test
Expand Down Expand Up @@ -1158,35 +1124,29 @@ public void shouldNotUpdateBrokerConfigIfMalformedBracketConfig() {
}

@Test
public void shouldNotUpdateConfigIfNonExistingConfigIsDeleted() {
public void shouldAllowDeletingNonExistingConfig() throws Exception {
String resourceName = "my-topic";
ConfigCommand.ConfigCommandOptions createOpts = new ConfigCommand.ConfigCommandOptions(toArray("--bootstrap-server", "localhost:9092",
"--entity-name", resourceName,
"--entity-type", "topics",
"--alter",
"--delete-config", "missing_config1, missing_config2"));

ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, resourceName);
List<ConfigEntry> configEntries = List.of();
KafkaFutureImpl<Map<ConfigResource, Config>> future = new KafkaFutureImpl<>();
future.complete(Map.of(resource, new Config(configEntries)));
DescribeConfigsResult describeResult = mock(DescribeConfigsResult.class);
when(describeResult.all()).thenReturn(future);
KafkaFutureImpl<Void> alterFuture = new KafkaFutureImpl<>();
alterFuture.complete(null);
AlterConfigsResult alterResult = mock(AlterConfigsResult.class);
when(alterResult.all()).thenReturn(alterFuture);

Node node = new Node(1, "localhost", 9092);
MockAdminClient mockAdminClient = new MockAdminClient(List.of(node), node) {
@Override
public synchronized DescribeConfigsResult describeConfigs(Collection<ConfigResource> resources, DescribeConfigsOptions options) {
assertEquals(1, resources.size());
ConfigResource res = resources.iterator().next();
assertEquals(ConfigResource.Type.TOPIC, res.type());
assertEquals(resourceName, res.name());
return describeResult;
public synchronized AlterConfigsResult incrementalAlterConfigs(Map<ConfigResource, Collection<AlterConfigOp>> configs, AlterConfigsOptions options) {
return alterResult;
}
};

assertThrows(InvalidConfigurationException.class, () -> ConfigCommand.alterConfig(mockAdminClient, createOpts));
verify(describeResult).all();
ConfigCommand.alterConfig(mockAdminClient, createOpts);
verify(alterResult).all();
}

@Test
Expand All @@ -1207,31 +1167,12 @@ private void verifyAlterClientMetricsConfig(Node node, String resourceName, List
"match=[client_software_name=kafka.python,client_software_version=1\\.2\\..*]"), resourceOpts);
ConfigCommand.ConfigCommandOptions alterOpts = new ConfigCommand.ConfigCommandOptions(toArray(optsList));

ConfigResource resource = new ConfigResource(ConfigResource.Type.CLIENT_METRICS, resourceName);
List<ConfigEntry> configEntries = List.of(new ConfigEntry("interval.ms", "1000",
ConfigEntry.ConfigSource.DYNAMIC_CLIENT_METRICS_CONFIG, false, false, List.of(),
ConfigEntry.ConfigType.UNKNOWN, null));
KafkaFutureImpl<Map<ConfigResource, Config>> future = new KafkaFutureImpl<>();
future.complete(Map.of(resource, new Config(configEntries)));
DescribeConfigsResult describeResult = mock(DescribeConfigsResult.class);
when(describeResult.all()).thenReturn(future);

KafkaFutureImpl<Void> alterFuture = new KafkaFutureImpl<>();
alterFuture.complete(null);
AlterConfigsResult alterResult = mock(AlterConfigsResult.class);
when(alterResult.all()).thenReturn(alterFuture);

MockAdminClient mockAdminClient = new MockAdminClient(List.of(node), node) {
@Override
public synchronized DescribeConfigsResult describeConfigs(Collection<ConfigResource> resources, DescribeConfigsOptions options) {
assertFalse(options.includeSynonyms(), "Config synonyms requested unnecessarily");
assertEquals(1, resources.size());
ConfigResource res = resources.iterator().next();
assertEquals(ConfigResource.Type.CLIENT_METRICS, res.type());
assertEquals(resourceName, res.name());
return describeResult;
}

@Override
public synchronized AlterConfigsResult incrementalAlterConfigs(Map<ConfigResource, Collection<AlterConfigOp>> configs, AlterConfigsOptions options) {
assertEquals(1, configs.size());
Expand All @@ -1255,7 +1196,6 @@ public synchronized AlterConfigsResult incrementalAlterConfigs(Map<ConfigResourc
}
};
ConfigCommand.alterConfig(mockAdminClient, alterOpts);
verify(describeResult).all();
verify(alterResult).all();
}

Expand Down Expand Up @@ -1317,31 +1257,12 @@ private void verifyAlterGroupConfig(Node node, String resourceName, List<String>
"--add-config", "consumer.heartbeat.interval.ms=6000"), resourceOpts);
ConfigCommand.ConfigCommandOptions alterOpts = new ConfigCommand.ConfigCommandOptions(toArray(optsList));

ConfigResource resource = new ConfigResource(ConfigResource.Type.GROUP, resourceName);
List<ConfigEntry> configEntries = List.of(new ConfigEntry("consumer.session.timeout.ms", "45000",
ConfigEntry.ConfigSource.DYNAMIC_GROUP_CONFIG, false, false, List.of(),
ConfigEntry.ConfigType.UNKNOWN, null));
KafkaFutureImpl<Map<ConfigResource, Config>> future = new KafkaFutureImpl<>();
future.complete(Map.of(resource, new Config(configEntries)));
DescribeConfigsResult describeResult = mock(DescribeConfigsResult.class);
when(describeResult.all()).thenReturn(future);

KafkaFutureImpl<Void> alterFuture = new KafkaFutureImpl<>();
alterFuture.complete(null);
AlterConfigsResult alterResult = mock(AlterConfigsResult.class);
when(alterResult.all()).thenReturn(alterFuture);

MockAdminClient mockAdminClient = new MockAdminClient(List.of(node), node) {
@Override
public synchronized DescribeConfigsResult describeConfigs(Collection<ConfigResource> resources, DescribeConfigsOptions options) {
assertFalse(options.includeSynonyms(), "Config synonyms requested unnecessarily");
assertEquals(1, resources.size());
ConfigResource res = resources.iterator().next();
assertEquals(ConfigResource.Type.GROUP, res.type());
assertEquals(resourceName, res.name());
return describeResult;
}

@Override
public synchronized AlterConfigsResult incrementalAlterConfigs(Map<ConfigResource, Collection<AlterConfigOp>> configs, AlterConfigsOptions options) {
assertEquals(1, configs.size());
Expand All @@ -1364,7 +1285,6 @@ public synchronized AlterConfigsResult incrementalAlterConfigs(Map<ConfigResourc
}
};
ConfigCommand.alterConfig(mockAdminClient, alterOpts);
verify(describeResult).all();
verify(alterResult).all();
}

Expand Down
Loading