diff --git a/tools/src/main/java/org/apache/kafka/tools/ConfigCommand.java b/tools/src/main/java/org/apache/kafka/tools/ConfigCommand.java
index a5214a1a32660..5fa797f506a9c 100644
--- a/tools/src/main/java/org/apache/kafka/tools/ConfigCommand.java
+++ b/tools/src/main/java/org/apache/kafka/tools/ConfigCommand.java
@@ -34,6 +34,7 @@
import org.apache.kafka.clients.admin.UserScramCredentialDeletion;
import org.apache.kafka.clients.admin.UserScramCredentialUpsertion;
import org.apache.kafka.clients.admin.UserScramCredentialsDescription;
+import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.ClusterAuthorizationException;
@@ -71,6 +72,8 @@
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -93,7 +96,7 @@
* An entity described or altered by the command may be one of:
*
* - topic: --topic OR --entity-type topics --entity-name
- *
- client: --client OR --entity-type clients --entity-name
+ *
- client: --client OR --entity-type clients --entity-name
*
- user: --user OR --entity-type users --entity-name
*
- : --user --client OR
* --entity-type users --entity-name --entity-type clients --entity-name
@@ -113,7 +116,6 @@ public class ConfigCommand {
private static final Logger LOG = LoggerFactory.getLogger(ConfigCommand.class);
private static final String BROKER_DEFAULT_ENTITY_NAME = "";
- private static final List BROKER_SUPPORTED_CONFIG_TYPES;
private static final int DEFAULT_SCRAM_ITERATIONS = 4096;
private static final String TOPIC_TYPE = ConfigType.TOPIC.value();
private static final String CLIENT_METRICS_TYPE = ConfigType.CLIENT_METRICS.value();
@@ -124,13 +126,10 @@ public class ConfigCommand {
private static final String IP_TYPE = ConfigType.IP.value();
static final String BROKER_LOGGER_CONFIG_TYPE = "broker-loggers";
- static {
- BROKER_SUPPORTED_CONFIG_TYPES = new ArrayList<>();
- BROKER_SUPPORTED_CONFIG_TYPES.add(BROKER_LOGGER_CONFIG_TYPE);
- for (ConfigType configType : ConfigType.values()) {
- BROKER_SUPPORTED_CONFIG_TYPES.add(configType.value());
- }
- }
+ private static final List BROKER_SUPPORTED_CONFIG_TYPES = Stream.concat(
+ Stream.of(BROKER_LOGGER_CONFIG_TYPE),
+ Stream.of(ConfigType.values()).map(ConfigType::value)
+ ).toList();
public static void main(String[] args) {
try {
@@ -273,7 +272,7 @@ static void alterConfig(Admin adminClient, ConfigCommandOptions opts) throws IOE
throw new InvalidConfigurationException("Invalid broker logger(s): " + String.join(",", invalidBrokerLoggers));
ConfigResource configResource = new ConfigResource(ConfigResource.Type.BROKER_LOGGER, entityName);
- AlterConfigsOptions alterOptions = new AlterConfigsOptions().timeoutMs(30000).validateOnly(false);
+ AlterConfigsOptions alterOptions = new AlterConfigsOptions().timeoutMs(30000);
List addEntries = configsToBeAdded.values().stream().map(k -> new AlterConfigOp(k, AlterConfigOp.OpType.SET)).toList();
List deleteEntries = configsToBeDeleted.stream().map(k -> new AlterConfigOp(new ConfigEntry(k, ""), AlterConfigOp.OpType.DELETE)).toList();
Collection alterEntries = Stream.concat(deleteEntries.stream(), addEntries.stream()).toList();
@@ -441,7 +440,7 @@ private static void alterQuotaConfigs(Admin adminClient, List entityType
}
ClientQuotaEntity entity = new ClientQuotaEntity(alterEntityMap);
- AlterClientQuotasOptions alterOptions = new AlterClientQuotasOptions().validateOnly(false);
+ AlterClientQuotasOptions alterOptions = new AlterClientQuotasOptions();
List addOps = configsToBeAddedMap.entrySet().stream()
.map(entry -> {
@@ -527,40 +526,58 @@ private static void describeResourceConfig(Admin adminClient, String entityType,
}
}
- List entities;
+ Set entities;
if (entityName.isPresent()) {
- entities = List.of(entityName.get());
+ entities = Set.of(entityName.get());
} else {
if (TOPIC_TYPE.equals(entityType)) {
- entities = new ArrayList<>(adminClient.listTopics(new ListTopicsOptions().listInternal(true)).names().get());
+ entities = new LinkedHashSet<>(adminClient.listTopics(new ListTopicsOptions().listInternal(true)).names().get());
} else if (BROKER_TYPE.equals(entityType) || BROKER_LOGGER_CONFIG_TYPE.equals(entityType)) {
- List brokerIds = adminClient.describeCluster(new DescribeClusterOptions()).nodes().get().stream()
+ Set brokerIds = adminClient.describeCluster(new DescribeClusterOptions()).nodes().get().stream()
.map(Node::idString)
- .collect(Collectors.toList());
+ .collect(Collectors.toCollection(LinkedHashSet::new));
brokerIds.add(BROKER_DEFAULT_ENTITY_NAME);
entities = brokerIds;
} else if (CLIENT_METRICS_TYPE.equals(entityType)) {
entities = adminClient.listConfigResources(Set.of(ConfigResource.Type.CLIENT_METRICS), new ListConfigResourcesOptions()).all().get().stream()
.map(ConfigResource::name)
- .toList();
+ .collect(Collectors.toCollection(LinkedHashSet::new));
} else if (GROUP_TYPE.equals(entityType)) {
Set groupIds = adminClient.listGroups().all().get().stream()
.map(GroupListing::groupId)
- .collect(Collectors.toSet());
+ .collect(Collectors.toCollection(LinkedHashSet::new));
Set groupResources = listGroupConfigResources(adminClient)
.map(resources -> resources.stream()
.map(ConfigResource::name)
- .collect(Collectors.toSet()))
- .orElse(Set.of());
- Set combined = new HashSet<>(groupIds);
+ .collect(Collectors.toCollection(LinkedHashSet::new)))
+ .orElseGet(LinkedHashSet::new);
+ Set combined = new LinkedHashSet<>(groupIds);
combined.addAll(groupResources);
- entities = new ArrayList<>(combined);
+ entities = combined;
} else {
throw new IllegalArgumentException("Invalid entity type: " + entityType);
}
}
+ if (entities.isEmpty()) {
+ return;
+ }
+
+ Map contextsByEntity = new LinkedHashMap<>();
for (String entity : entities) {
+ contextsByEntity.put(entity, describeConfigContext(entityType, entity));
+ }
+
+ DescribeConfigsOptions describeOptions = new DescribeConfigsOptions().includeSynonyms(true);
+ Map> configs = adminClient.describeConfigs(
+ contextsByEntity.values().stream()
+ .map(DescribeConfigContext::configResource)
+ .toList(),
+ describeOptions
+ ).values();
+
+ for (String entity : entities) {
+ DescribeConfigContext context = contextsByEntity.get(entity);
if (BROKER_DEFAULT_ENTITY_NAME.equals(entity)) {
System.out.println("Default configs for " + entityType + " in the cluster are:");
} else {
@@ -568,7 +585,12 @@ private static void describeResourceConfig(Admin adminClient, String entityType,
String entityTypeSingular = entityType.substring(0, entityType.length() - 1);
System.out.println(configSourceStr + " configs for " + entityTypeSingular + " " + entity + " are:");
}
- getResourceConfig(adminClient, entityType, entity, true, describeAll).forEach(entry -> {
+
+ Optional configSourceFilter = describeAll
+ ? Optional.empty()
+ : context.dynamicConfigSource();
+ Config config = configs.get(context.configResource()).get(30, TimeUnit.SECONDS);
+ filterAndSortEntries(config, configSourceFilter).forEach(entry -> {
String synonyms = entry.synonyms().stream()
.map(synonym -> synonym.source() + ":" + synonym.name() + "=" + synonym.value())
.collect(Collectors.joining(", ", "{", "}"));
@@ -590,7 +612,7 @@ private static void alterResourceConfig(Admin adminClient, String entityTypeHead
throw new InvalidConfigurationException("Invalid config(s): " + String.join(",", invalidConfigs));
ConfigResource configResource = new ConfigResource(resourceType, entityNameHead);
- AlterConfigsOptions alterOptions = new AlterConfigsOptions().timeoutMs(30000).validateOnly(false);
+ AlterConfigsOptions alterOptions = new AlterConfigsOptions().timeoutMs(30000);
List addEntries = configsToBeAdded.values().stream().map(k -> new AlterConfigOp(k, AlterConfigOp.OpType.SET)).toList();
List deleteEntries = configsToBeDeleted.stream().map(k -> new AlterConfigOp(new ConfigEntry(k, ""), AlterConfigOp.OpType.DELETE)).toList();
Collection alterEntries = Stream.concat(deleteEntries.stream(), addEntries.stream()).toList();
@@ -605,7 +627,10 @@ static void validateBrokerId(String entityName, String entityType) {
}
}
- private static List getResourceConfig(Admin adminClient, String entityType, String entityName, boolean includeSynonyms, boolean describeAll) throws ExecutionException, InterruptedException, TimeoutException {
+ private record DescribeConfigContext(ConfigResource configResource, Optional dynamicConfigSource) {
+ }
+
+ private static DescribeConfigContext describeConfigContext(String entityType, String entityName) {
ConfigResource.Type configResourceType;
Optional dynamicConfigSource;
@@ -616,12 +641,11 @@ private static List getResourceConfig(Admin adminClient, String ent
configResourceType = ConfigResource.Type.TOPIC;
dynamicConfigSource = Optional.of(ConfigEntry.ConfigSource.DYNAMIC_TOPIC_CONFIG);
} else if (BROKER_TYPE.equals(entityType)) {
+ configResourceType = ConfigResource.Type.BROKER;
if (BROKER_DEFAULT_ENTITY_NAME.equals(entityName)) {
- configResourceType = ConfigResource.Type.BROKER;
dynamicConfigSource = Optional.of(ConfigEntry.ConfigSource.DYNAMIC_DEFAULT_BROKER_CONFIG);
} else {
validateBrokerId(entityName, entityType);
- configResourceType = ConfigResource.Type.BROKER;
dynamicConfigSource = Optional.of(ConfigEntry.ConfigSource.DYNAMIC_BROKER_CONFIG);
}
} else if (BROKER_LOGGER_CONFIG_TYPE.equals(entityType)) {
@@ -640,19 +664,28 @@ private static List getResourceConfig(Admin adminClient, String ent
throw new IllegalArgumentException("Invalid entity type: " + entityType);
}
- Optional configSourceFilter = describeAll ? Optional.empty() : dynamicConfigSource;
-
- ConfigResource configResource = new ConfigResource(configResourceType, entityName);
- DescribeConfigsOptions describeOptions = new DescribeConfigsOptions().includeSynonyms(includeSynonyms);
- Map configs = adminClient.describeConfigs(Collections.singleton(configResource), describeOptions)
- .all().get(30, TimeUnit.SECONDS);
+ return new DescribeConfigContext(new ConfigResource(configResourceType, entityName), dynamicConfigSource);
+ }
- return configs.get(configResource).entries().stream()
+ private static List filterAndSortEntries(Config config, Optional configSourceFilter) {
+ return config.entries().stream()
.filter(entry -> configSourceFilter.isEmpty() || entry.source() == configSourceFilter.get())
.sorted(Comparator.comparing(ConfigEntry::name))
.toList();
}
+ private static List getResourceConfig(Admin adminClient, String entityType, String entityName, boolean includeSynonyms, boolean describeAll) throws ExecutionException, InterruptedException, TimeoutException {
+ DescribeConfigContext context = describeConfigContext(entityType, entityName);
+ Optional configSourceFilter = describeAll
+ ? Optional.empty()
+ : context.dynamicConfigSource();
+ DescribeConfigsOptions describeOptions = new DescribeConfigsOptions().includeSynonyms(includeSynonyms);
+ Map configs = adminClient.describeConfigs(Collections.singleton(context.configResource()), describeOptions)
+ .all().get(30, TimeUnit.SECONDS);
+
+ return filterAndSortEntries(configs.get(context.configResource()), configSourceFilter);
+ }
+
private static void describeQuotaConfigs(Admin adminClient, List entityTypes, List entityNames) throws ExecutionException, InterruptedException, TimeoutException {
Map> quotaConfigs = getAllClientQuotasConfigs(adminClient, entityTypes, entityNames);
quotaConfigs.forEach((entity, entries) -> {
diff --git a/tools/src/test/java/org/apache/kafka/tools/ConfigCommandTest.java b/tools/src/test/java/org/apache/kafka/tools/ConfigCommandTest.java
index 5e68bea182b2a..98d5ee1614886 100644
--- a/tools/src/test/java/org/apache/kafka/tools/ConfigCommandTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/ConfigCommandTest.java
@@ -80,7 +80,6 @@
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -875,10 +874,10 @@ public void shouldDescribeConfigSynonyms() throws Exception {
"--all"));
ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, resourceName);
- KafkaFutureImpl