Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
101 changes: 67 additions & 34 deletions tools/src/main/java/org/apache/kafka/tools/ConfigCommand.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.kafka.clients.admin.UserScramCredentialDeletion;
import org.apache.kafka.clients.admin.UserScramCredentialUpsertion;
import org.apache.kafka.clients.admin.UserScramCredentialsDescription;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.ClusterAuthorizationException;
Expand Down Expand Up @@ -71,6 +72,8 @@
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand All @@ -93,7 +96,7 @@
* An entity described or altered by the command may be one of:
* <ul>
* <li> topic: --topic <topic> OR --entity-type topics --entity-name <topic>
* <li> client: --client <client> OR --entity-type clients --entity-name <client-id>
* <li> client: --client <client-id> OR --entity-type clients --entity-name <client-id>
* <li> user: --user <user-principal> OR --entity-type users --entity-name <user-principal>
* <li> <user, client>: --user <user-principal> --client <client-id> OR
* --entity-type users --entity-name <user-principal> --entity-type clients --entity-name <client-id>
Expand All @@ -113,7 +116,6 @@ public class ConfigCommand {
private static final Logger LOG = LoggerFactory.getLogger(ConfigCommand.class);

private static final String BROKER_DEFAULT_ENTITY_NAME = "";
private static final List<String> BROKER_SUPPORTED_CONFIG_TYPES;
private static final int DEFAULT_SCRAM_ITERATIONS = 4096;
private static final String TOPIC_TYPE = ConfigType.TOPIC.value();
private static final String CLIENT_METRICS_TYPE = ConfigType.CLIENT_METRICS.value();
Expand All @@ -124,13 +126,10 @@ public class ConfigCommand {
private static final String IP_TYPE = ConfigType.IP.value();

static final String BROKER_LOGGER_CONFIG_TYPE = "broker-loggers";
static {
BROKER_SUPPORTED_CONFIG_TYPES = new ArrayList<>();
BROKER_SUPPORTED_CONFIG_TYPES.add(BROKER_LOGGER_CONFIG_TYPE);
for (ConfigType configType : ConfigType.values()) {
BROKER_SUPPORTED_CONFIG_TYPES.add(configType.value());
}
}
private static final List<String> BROKER_SUPPORTED_CONFIG_TYPES = Stream.concat(
Stream.of(BROKER_LOGGER_CONFIG_TYPE),
Stream.of(ConfigType.values()).map(ConfigType::value)
).toList();

public static void main(String[] args) {
try {
Expand Down Expand Up @@ -273,7 +272,7 @@ static void alterConfig(Admin adminClient, ConfigCommandOptions opts) throws IOE
throw new InvalidConfigurationException("Invalid broker logger(s): " + String.join(",", invalidBrokerLoggers));

ConfigResource configResource = new ConfigResource(ConfigResource.Type.BROKER_LOGGER, entityName);
AlterConfigsOptions alterOptions = new AlterConfigsOptions().timeoutMs(30000).validateOnly(false);
AlterConfigsOptions alterOptions = new AlterConfigsOptions().timeoutMs(30000);
List<AlterConfigOp> addEntries = configsToBeAdded.values().stream().map(k -> new AlterConfigOp(k, AlterConfigOp.OpType.SET)).toList();
List<AlterConfigOp> deleteEntries = configsToBeDeleted.stream().map(k -> new AlterConfigOp(new ConfigEntry(k, ""), AlterConfigOp.OpType.DELETE)).toList();
Collection<AlterConfigOp> alterEntries = Stream.concat(deleteEntries.stream(), addEntries.stream()).toList();
Expand Down Expand Up @@ -441,7 +440,7 @@ private static void alterQuotaConfigs(Admin adminClient, List<String> entityType
}
ClientQuotaEntity entity = new ClientQuotaEntity(alterEntityMap);

AlterClientQuotasOptions alterOptions = new AlterClientQuotasOptions().validateOnly(false);
AlterClientQuotasOptions alterOptions = new AlterClientQuotasOptions();

List<ClientQuotaAlteration.Op> addOps = configsToBeAddedMap.entrySet().stream()
.map(entry -> {
Expand Down Expand Up @@ -527,48 +526,71 @@ private static void describeResourceConfig(Admin adminClient, String entityType,
}
}

List<String> entities;
Set<String> entities;
if (entityName.isPresent()) {
entities = List.of(entityName.get());
entities = Set.of(entityName.get());
} else {
if (TOPIC_TYPE.equals(entityType)) {
entities = new ArrayList<>(adminClient.listTopics(new ListTopicsOptions().listInternal(true)).names().get());
entities = new LinkedHashSet<>(adminClient.listTopics(new ListTopicsOptions().listInternal(true)).names().get());
} else if (BROKER_TYPE.equals(entityType) || BROKER_LOGGER_CONFIG_TYPE.equals(entityType)) {
List<String> brokerIds = adminClient.describeCluster(new DescribeClusterOptions()).nodes().get().stream()
Set<String> brokerIds = adminClient.describeCluster(new DescribeClusterOptions()).nodes().get().stream()
.map(Node::idString)
.collect(Collectors.toList());
.collect(Collectors.toCollection(LinkedHashSet::new));
brokerIds.add(BROKER_DEFAULT_ENTITY_NAME);
entities = brokerIds;
} else if (CLIENT_METRICS_TYPE.equals(entityType)) {
entities = adminClient.listConfigResources(Set.of(ConfigResource.Type.CLIENT_METRICS), new ListConfigResourcesOptions()).all().get().stream()
.map(ConfigResource::name)
.toList();
.collect(Collectors.toCollection(LinkedHashSet::new));
} else if (GROUP_TYPE.equals(entityType)) {
Set<String> groupIds = adminClient.listGroups().all().get().stream()
.map(GroupListing::groupId)
.collect(Collectors.toSet());
.collect(Collectors.toCollection(LinkedHashSet::new));
Set<String> groupResources = listGroupConfigResources(adminClient)
.map(resources -> resources.stream()
.map(ConfigResource::name)
.collect(Collectors.toSet()))
.orElse(Set.of());
Set<String> combined = new HashSet<>(groupIds);
.collect(Collectors.toCollection(LinkedHashSet::new)))
.orElseGet(LinkedHashSet::new);
Set<String> combined = new LinkedHashSet<>(groupIds);
combined.addAll(groupResources);
entities = new ArrayList<>(combined);
entities = combined;
} else {
throw new IllegalArgumentException("Invalid entity type: " + entityType);
}
}

if (entities.isEmpty()) {
return;
}

Map<String, DescribeConfigContext> contextsByEntity = new LinkedHashMap<>();
for (String entity : entities) {
contextsByEntity.put(entity, describeConfigContext(entityType, entity));
}

DescribeConfigsOptions describeOptions = new DescribeConfigsOptions().includeSynonyms(true);
Map<ConfigResource, KafkaFuture<Config>> configs = adminClient.describeConfigs(
contextsByEntity.values().stream()
.map(DescribeConfigContext::configResource)
.toList(),
describeOptions
).values();

for (String entity : entities) {
DescribeConfigContext context = contextsByEntity.get(entity);
if (BROKER_DEFAULT_ENTITY_NAME.equals(entity)) {
System.out.println("Default configs for " + entityType + " in the cluster are:");
} else {
String configSourceStr = describeAll ? "All" : "Dynamic";
String entityTypeSingular = entityType.substring(0, entityType.length() - 1);
System.out.println(configSourceStr + " configs for " + entityTypeSingular + " " + entity + " are:");
}
getResourceConfig(adminClient, entityType, entity, true, describeAll).forEach(entry -> {

Optional<ConfigEntry.ConfigSource> configSourceFilter = describeAll
? Optional.empty()
: context.dynamicConfigSource();
Config config = configs.get(context.configResource()).get(30, TimeUnit.SECONDS);
filterAndSortEntries(config, configSourceFilter).forEach(entry -> {
String synonyms = entry.synonyms().stream()
.map(synonym -> synonym.source() + ":" + synonym.name() + "=" + synonym.value())
.collect(Collectors.joining(", ", "{", "}"));
Expand All @@ -590,7 +612,7 @@ private static void alterResourceConfig(Admin adminClient, String entityTypeHead
throw new InvalidConfigurationException("Invalid config(s): " + String.join(",", invalidConfigs));

ConfigResource configResource = new ConfigResource(resourceType, entityNameHead);
AlterConfigsOptions alterOptions = new AlterConfigsOptions().timeoutMs(30000).validateOnly(false);
AlterConfigsOptions alterOptions = new AlterConfigsOptions().timeoutMs(30000);
List<AlterConfigOp> addEntries = configsToBeAdded.values().stream().map(k -> new AlterConfigOp(k, AlterConfigOp.OpType.SET)).toList();
List<AlterConfigOp> deleteEntries = configsToBeDeleted.stream().map(k -> new AlterConfigOp(new ConfigEntry(k, ""), AlterConfigOp.OpType.DELETE)).toList();
Collection<AlterConfigOp> alterEntries = Stream.concat(deleteEntries.stream(), addEntries.stream()).toList();
Expand All @@ -605,7 +627,10 @@ static void validateBrokerId(String entityName, String entityType) {
}
}

private static List<ConfigEntry> getResourceConfig(Admin adminClient, String entityType, String entityName, boolean includeSynonyms, boolean describeAll) throws ExecutionException, InterruptedException, TimeoutException {
private record DescribeConfigContext(ConfigResource configResource, Optional<ConfigEntry.ConfigSource> dynamicConfigSource) {
}

private static DescribeConfigContext describeConfigContext(String entityType, String entityName) {
ConfigResource.Type configResourceType;
Optional<ConfigEntry.ConfigSource> dynamicConfigSource;

Expand All @@ -616,12 +641,11 @@ private static List<ConfigEntry> getResourceConfig(Admin adminClient, String ent
configResourceType = ConfigResource.Type.TOPIC;
dynamicConfigSource = Optional.of(ConfigEntry.ConfigSource.DYNAMIC_TOPIC_CONFIG);
} else if (BROKER_TYPE.equals(entityType)) {
configResourceType = ConfigResource.Type.BROKER;
if (BROKER_DEFAULT_ENTITY_NAME.equals(entityName)) {
configResourceType = ConfigResource.Type.BROKER;
dynamicConfigSource = Optional.of(ConfigEntry.ConfigSource.DYNAMIC_DEFAULT_BROKER_CONFIG);
} else {
validateBrokerId(entityName, entityType);
configResourceType = ConfigResource.Type.BROKER;
dynamicConfigSource = Optional.of(ConfigEntry.ConfigSource.DYNAMIC_BROKER_CONFIG);
}
} else if (BROKER_LOGGER_CONFIG_TYPE.equals(entityType)) {
Expand All @@ -640,19 +664,28 @@ private static List<ConfigEntry> getResourceConfig(Admin adminClient, String ent
throw new IllegalArgumentException("Invalid entity type: " + entityType);
}

Optional<ConfigEntry.ConfigSource> configSourceFilter = describeAll ? Optional.empty() : dynamicConfigSource;

ConfigResource configResource = new ConfigResource(configResourceType, entityName);
DescribeConfigsOptions describeOptions = new DescribeConfigsOptions().includeSynonyms(includeSynonyms);
Map<ConfigResource, Config> configs = adminClient.describeConfigs(Collections.singleton(configResource), describeOptions)
.all().get(30, TimeUnit.SECONDS);
return new DescribeConfigContext(new ConfigResource(configResourceType, entityName), dynamicConfigSource);
}

return configs.get(configResource).entries().stream()
private static List<ConfigEntry> filterAndSortEntries(Config config, Optional<ConfigEntry.ConfigSource> configSourceFilter) {
return config.entries().stream()
.filter(entry -> configSourceFilter.isEmpty() || entry.source() == configSourceFilter.get())
.sorted(Comparator.comparing(ConfigEntry::name))
.toList();
}

private static List<ConfigEntry> getResourceConfig(Admin adminClient, String entityType, String entityName, boolean includeSynonyms, boolean describeAll) throws ExecutionException, InterruptedException, TimeoutException {
DescribeConfigContext context = describeConfigContext(entityType, entityName);
Optional<ConfigEntry.ConfigSource> configSourceFilter = describeAll
? Optional.empty()
: context.dynamicConfigSource();
DescribeConfigsOptions describeOptions = new DescribeConfigsOptions().includeSynonyms(includeSynonyms);
Map<ConfigResource, Config> configs = adminClient.describeConfigs(Collections.singleton(context.configResource()), describeOptions)
.all().get(30, TimeUnit.SECONDS);

return filterAndSortEntries(configs.get(context.configResource()), configSourceFilter);
}

private static void describeQuotaConfigs(Admin adminClient, List<String> entityTypes, List<String> entityNames) throws ExecutionException, InterruptedException, TimeoutException {
Map<ClientQuotaEntity, Map<String, Double>> quotaConfigs = getAllClientQuotasConfigs(adminClient, entityTypes, entityNames);
quotaConfigs.forEach((entity, entries) -> {
Expand Down
Loading
Loading