From 3bd18f17adb7b6650c9b77dd4088a088c3263554 Mon Sep 17 00:00:00 2001 From: PoAn Yang Date: Wed, 22 Apr 2026 23:40:57 +0900 Subject: [PATCH] KAFKA-19009: Move MetadataCacheTest to metadata module Signed-off-by: PoAn Yang --- .../unit/kafka/server/KafkaApisTest.scala | 14 +- .../unit/kafka/server/MetadataCacheTest.scala | 1021 ---------------- .../kafka/metadata/MetadataCacheTest.java | 1051 +++++++++++++++++ 3 files changed, 1058 insertions(+), 1028 deletions(-) delete mode 100644 core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala create mode 100644 metadata/src/test/java/org/apache/kafka/metadata/MetadataCacheTest.java diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala index 3fdcdaf3e4966..d6df28c371cb7 100644 --- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala @@ -84,7 +84,7 @@ import org.apache.kafka.coordinator.group.streams.StreamsGroupHeartbeatResult import org.apache.kafka.coordinator.share.{ShareCoordinator, ShareCoordinatorTestConfig} import org.apache.kafka.coordinator.transaction.TransactionLogConfig import org.apache.kafka.image.{MetadataDelta, MetadataImage, MetadataProvenance} -import org.apache.kafka.metadata.{ConfigRepository, KRaftMetadataCache, MetadataCache, MockConfigRepository} +import org.apache.kafka.metadata.{ConfigRepository, KRaftMetadataCache, MetadataCache, MetadataCacheTest, MockConfigRepository} import org.apache.kafka.network.Session import org.apache.kafka.network.metrics.{RequestChannelMetrics, RequestMetrics} import org.apache.kafka.raft.{KRaftConfigs, QuorumConfig} @@ -4393,7 +4393,7 @@ class KafkaApisTest extends Logging { .setName(plaintextListener.value) ) MetadataCacheTest.updateCache(metadataCache, - Seq(new RegisterBrokerRecord().setBrokerId(0).setRack("rack").setFenced(false).setEndPoints(endpoints)) + util.List.of(new RegisterBrokerRecord().setBrokerId(0).setRack("rack").setFenced(false).setEndPoints(endpoints)) ) // 2. Set up authorizer @@ -4433,7 +4433,7 @@ class KafkaApisTest extends Logging { } val partitionRecords = Seq(authorizedTopicId, unauthorizedTopicId).map(createDummyPartitionRecord) - MetadataCacheTest.updateCache(metadataCache, partitionRecords) + MetadataCacheTest.updateCache(metadataCache, (partitionRecords : Seq[ApiMessage]).asJava) // 4. Send TopicMetadataReq using topicId val metadataReqByTopicId = MetadataRequest.Builder.forTopicIds(util.Set.of(authorizedTopicId, unauthorizedTopicId)).build() @@ -10101,7 +10101,7 @@ class KafkaApisTest extends Logging { ) MetadataCacheTest.updateCache(metadataCache, - Seq(new RegisterBrokerRecord() + util.List.of(new RegisterBrokerRecord() .setBrokerId(brokerId) .setRack("rack") .setFenced(false) @@ -10155,7 +10155,7 @@ class KafkaApisTest extends Logging { ) MetadataCacheTest.updateCache(metadataCache, - Seq(new RegisterBrokerRecord().setBrokerId(0).setRack("rack").setFenced(false).setEndPoints(endpoints0), + util.List.of(new RegisterBrokerRecord().setBrokerId(0).setRack("rack").setFenced(false).setEndPoints(endpoints0), new RegisterBrokerRecord().setBrokerId(1).setRack("rack").setFenced(false).setEndPoints(endpoints1)) ) @@ -10391,12 +10391,12 @@ class KafkaApisTest extends Logging { private def setupBasicMetadataCache(topic: String, numPartitions: Int, numBrokers: Int, topicId: Uuid): Unit = { val updateMetadata = createBasicMetadata(topic, numPartitions, 0, numBrokers, topicId) - MetadataCacheTest.updateCache(metadataCache, updateMetadata) + MetadataCacheTest.updateCache(metadataCache, updateMetadata.asJava) } private def addTopicToMetadataCache(topic: String, numPartitions: Int, numBrokers: Int = 1, topicId: Uuid = Uuid.ZERO_UUID): Unit = { val updateMetadata = createBasicMetadata(topic, numPartitions, 0, numBrokers, topicId) - MetadataCacheTest.updateCache(metadataCache, updateMetadata) + MetadataCacheTest.updateCache(metadataCache, updateMetadata.asJava) } private def createMetadataBroker(brokerId: Int, diff --git a/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala b/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala deleted file mode 100644 index c61397b03183c..0000000000000 --- a/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala +++ /dev/null @@ -1,1021 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package kafka.server - -import org.apache.kafka.common.message.DescribeTopicPartitionsResponseData.DescribeTopicPartitionsResponsePartition -import org.apache.kafka.common.metadata.RegisterBrokerRecord.{BrokerEndpoint, BrokerEndpointCollection} -import org.apache.kafka.common.metadata._ -import org.apache.kafka.common.network.ListenerName -import org.apache.kafka.common.protocol.{ApiMessage, Errors} -import org.apache.kafka.common.record.internal.RecordBatch -import org.apache.kafka.common.security.auth.SecurityProtocol -import org.apache.kafka.common.{DirectoryId, TopicPartition, Uuid} -import org.apache.kafka.image.{MetadataDelta, MetadataImage, MetadataProvenance} -import org.apache.kafka.metadata.{KRaftMetadataCache, LeaderRecoveryState, MetadataCache} -import org.apache.kafka.server.common.KRaftVersion -import org.junit.jupiter.api.Assertions._ -import org.junit.jupiter.api.Test -import org.junit.jupiter.params.ParameterizedTest -import org.junit.jupiter.params.provider.MethodSource - -import java.util -import java.util.Arrays.asList -import java.util.Collections -import java.util.stream.Collectors -import scala.collection.{Seq, mutable} -import scala.jdk.CollectionConverters._ - -object MetadataCacheTest { - def cacheProvider(): util.stream.Stream[MetadataCache] = - util.stream.Stream.of[MetadataCache]( - new KRaftMetadataCache(1, () => KRaftVersion.KRAFT_VERSION_0) - ) - - def updateCache(cache: MetadataCache, records: Seq[ApiMessage]): Unit = { - cache match { - case c: KRaftMetadataCache => { - val image = c.currentImage() - val partialImage = new MetadataImage( - new MetadataProvenance(100L, 10, 1000L, true), - image.features(), - image.cluster(), - image.topics(), - image.configs(), - image.clientQuotas(), - image.producerIds(), - image.acls(), - image.scram(), - image.delegationTokens()) - val delta = new MetadataDelta.Builder().setImage(partialImage).build() - records.foreach(record => delta.replay(record)) - c.setImage(delta.apply(new MetadataProvenance(100L, 10, 1000L, true))) - } - case _ => throw new RuntimeException("Unsupported cache type") - } - } -} - -class MetadataCacheTest { - val brokerEpoch = 0L - - @ParameterizedTest - @MethodSource(Array("cacheProvider")) - def getTopicMetadataNonExistingTopics(cache: MetadataCache): Unit = { - val topic = "topic" - val topicMetadata = cache.getTopicMetadata(util.Set.of(topic), ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT), false, false) - assertTrue(topicMetadata.isEmpty) - } - - @ParameterizedTest - @MethodSource(Array("cacheProvider")) - def getTopicMetadata(cache: MetadataCache): Unit = { - val topic0 = "topic-0" - val topic1 = "topic-1" - - val topicIds = new util.HashMap[String, Uuid]() - topicIds.put(topic0, Uuid.randomUuid()) - topicIds.put(topic1, Uuid.randomUuid()) - - def endpoints(brokerId: Int): BrokerEndpointCollection = { - val host = s"foo-$brokerId" - new BrokerEndpointCollection(Seq( - new BrokerEndpoint() - .setHost(host) - .setPort(9092) - .setSecurityProtocol(SecurityProtocol.PLAINTEXT.id) - .setName(ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT).value), - new BrokerEndpoint() - .setHost(host) - .setPort(9093) - .setSecurityProtocol(SecurityProtocol.SSL.id) - .setName(ListenerName.forSecurityProtocol(SecurityProtocol.SSL).value) - ).iterator.asJava) - } - - val brokers = (0 to 4).map { brokerId => - new RegisterBrokerRecord() - .setBrokerId(brokerId) - .setEndPoints(endpoints(brokerId)) - .setRack("rack1") - } - - val topic0Record = new TopicRecord().setName(topic0).setTopicId(topicIds.get(topic0)) - val topic1Record = new TopicRecord().setName(topic1).setTopicId(topicIds.get(topic1)) - - val partitionStates = Seq( - new PartitionRecord() - .setTopicId(topicIds.get(topic0)) - .setPartitionId(0) - .setLeader(0) - .setLeaderEpoch(0) - .setIsr(asList(0, 1, 3)) - .setReplicas(asList(0, 1, 3)), - new PartitionRecord() - .setTopicId(topicIds.get(topic0)) - .setPartitionId(1) - .setLeader(1) - .setLeaderEpoch(1) - .setIsr(asList(1, 0)) - .setReplicas(asList(1, 2, 0, 4)), - new PartitionRecord() - .setTopicId(topicIds.get(topic1)) - .setPartitionId(0) - .setLeader(2) - .setLeaderEpoch(2) - .setIsr(asList(2, 1)) - .setReplicas(asList(2, 1, 3))) - MetadataCacheTest.updateCache(cache, brokers ++ Seq(topic0Record, topic1Record) ++ partitionStates) - - for (securityProtocol <- Seq(SecurityProtocol.PLAINTEXT, SecurityProtocol.SSL)) { - val listenerName = ListenerName.forSecurityProtocol(securityProtocol) - - def checkTopicMetadata(topic: String): Unit = { - val topicMetadatas = cache.getTopicMetadata(util.Set.of(topic), listenerName, false, false).asScala - assertEquals(1, topicMetadatas.size) - - val topicMetadata = topicMetadatas.head - assertEquals(Errors.NONE.code, topicMetadata.errorCode) - assertEquals(topic, topicMetadata.name) - assertEquals(topicIds.get(topic), topicMetadata.topicId()) - - val topicPartitionStates = partitionStates.filter { ps => ps.topicId == topicIds.get(topic) } - val partitionMetadatas = topicMetadata.partitions.asScala.sortBy(_.partitionIndex) - assertEquals(topicPartitionStates.size, partitionMetadatas.size, s"Unexpected partition count for topic $topic") - - partitionMetadatas.zipWithIndex.foreach { case (partitionMetadata, partitionId) => - assertEquals(Errors.NONE.code, partitionMetadata.errorCode) - assertEquals(partitionId, partitionMetadata.partitionIndex) - val partitionState = topicPartitionStates.find(_.partitionId == partitionId).getOrElse( - fail(s"Unable to find partition state for partition $partitionId")) - assertEquals(partitionState.leader, partitionMetadata.leaderId) - assertEquals(partitionState.leaderEpoch, partitionMetadata.leaderEpoch) - assertEquals(partitionState.isr, partitionMetadata.isrNodes) - assertEquals(partitionState.replicas, partitionMetadata.replicaNodes) - } - } - - checkTopicMetadata(topic0) - checkTopicMetadata(topic1) - } - - } - - @ParameterizedTest - @MethodSource(Array("cacheProvider")) - def getTopicMetadataPartitionLeaderNotAvailable(cache: MetadataCache): Unit = { - val securityProtocol = SecurityProtocol.PLAINTEXT - val listenerName = ListenerName.forSecurityProtocol(securityProtocol) - val brokers = Seq(new RegisterBrokerRecord() - .setBrokerId(0) - .setFenced(false) - .setEndPoints(new BrokerEndpointCollection(Seq(new BrokerEndpoint() - .setHost("foo") - .setPort(9092) - .setSecurityProtocol(securityProtocol.id) - .setName(listenerName.value) - ).iterator.asJava))) - - // leader is not available. expect LEADER_NOT_AVAILABLE for any metadata version. - verifyTopicMetadataPartitionLeaderOrEndpointNotAvailable(cache, brokers, listenerName, - leader = 1, Errors.LEADER_NOT_AVAILABLE, errorUnavailableListeners = false) - verifyTopicMetadataPartitionLeaderOrEndpointNotAvailable(cache, brokers, listenerName, - leader = 1, Errors.LEADER_NOT_AVAILABLE, errorUnavailableListeners = true) - } - - @ParameterizedTest - @MethodSource(Array("cacheProvider")) - def getTopicMetadataPartitionListenerNotAvailableOnLeader(cache: MetadataCache): Unit = { - // when listener name is not present in the metadata cache for a broker, getTopicMetadata should - // return LEADER_NOT_AVAILABLE or LISTENER_NOT_FOUND errors for old and new versions respectively. - val plaintextListenerName = ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT) - val sslListenerName = ListenerName.forSecurityProtocol(SecurityProtocol.SSL) - val broker0Endpoints = new BrokerEndpointCollection(Seq( - new BrokerEndpoint() - .setHost("host0") - .setPort(9092) - .setSecurityProtocol(SecurityProtocol.PLAINTEXT.id) - .setName(plaintextListenerName.value), - new BrokerEndpoint() - .setHost("host0") - .setPort(9093) - .setSecurityProtocol(SecurityProtocol.SSL.id) - .setName(sslListenerName.value) - ).iterator.asJava) - - val broker1Endpoints = new BrokerEndpointCollection(Seq( - new BrokerEndpoint() - .setHost("host1") - .setPort(9092) - .setSecurityProtocol(SecurityProtocol.PLAINTEXT.id) - .setName(plaintextListenerName.value) - ).iterator.asJava) - - val brokers = Seq( - new RegisterBrokerRecord() - .setBrokerId(0) - .setFenced(false) - .setEndPoints(broker0Endpoints), - new RegisterBrokerRecord() - .setBrokerId(1) - .setFenced(false) - .setEndPoints(broker1Endpoints)) - - // leader available in cache but listener name not present. expect LISTENER_NOT_FOUND error for new metadata version - verifyTopicMetadataPartitionLeaderOrEndpointNotAvailable(cache, brokers, sslListenerName, - leader = 1, Errors.LISTENER_NOT_FOUND, errorUnavailableListeners = true) - // leader available in cache but listener name not present. expect LEADER_NOT_AVAILABLE error for old metadata version - verifyTopicMetadataPartitionLeaderOrEndpointNotAvailable(cache, brokers, sslListenerName, - leader = 1, Errors.LEADER_NOT_AVAILABLE, errorUnavailableListeners = false) - } - - private def verifyTopicMetadataPartitionLeaderOrEndpointNotAvailable(cache: MetadataCache, - brokers: Seq[RegisterBrokerRecord], - listenerName: ListenerName, - leader: Int, - expectedError: Errors, - errorUnavailableListeners: Boolean): Unit = { - val topic = "topic" - val topicId = Uuid.randomUuid() - val topicRecords = Seq(new TopicRecord().setName(topic).setTopicId(topicId)) - - val leaderEpoch = 1 - val partitionEpoch = 3 - val partitionStates = Seq(new PartitionRecord() - .setTopicId(topicId) - .setPartitionId(0) - .setPartitionEpoch(partitionEpoch) - .setLeader(leader) - .setLeaderEpoch(leaderEpoch) - .setIsr(asList(0)) - .setReplicas(asList(0))) - MetadataCacheTest.updateCache(cache, brokers ++ topicRecords ++ partitionStates) - - val topicMetadatas = cache.getTopicMetadata(util.Set.of(topic), listenerName, false, errorUnavailableListeners).asScala - assertEquals(1, topicMetadatas.size) - - val topicMetadata = topicMetadatas.head - assertEquals(Errors.NONE.code, topicMetadata.errorCode) - - val partitionMetadatas = topicMetadata.partitions - assertEquals(1, partitionMetadatas.size) - - val partitionMetadata = partitionMetadatas.get(0) - assertEquals(0, partitionMetadata.partitionIndex) - assertEquals(expectedError.code, partitionMetadata.errorCode) - assertFalse(partitionMetadata.isrNodes.isEmpty) - assertEquals(util.List.of(0), partitionMetadata.replicaNodes) - } - - @ParameterizedTest - @MethodSource(Array("cacheProvider")) - def getTopicMetadataReplicaNotAvailable(cache: MetadataCache): Unit = { - val topic = "topic" - val topicId = Uuid.randomUuid() - - val partitionEpoch = 3 - val securityProtocol = SecurityProtocol.PLAINTEXT - val listenerName = ListenerName.forSecurityProtocol(securityProtocol) - val endPoints = new BrokerEndpointCollection(Seq(new BrokerEndpoint() - .setHost("foo") - .setPort(9092) - .setSecurityProtocol(securityProtocol.id) - .setName(listenerName.value) - ).iterator.asJava) - - val brokers = Seq(new RegisterBrokerRecord() - .setBrokerId(0) - .setFenced(false) - .setEndPoints(endPoints)) - - val topicRecords = Seq(new TopicRecord() - .setName(topic) - .setTopicId(topicId)) - // replica 1 is not available - val leader = 0 - val leaderEpoch = 0 - val replicas = asList[Integer](0, 1) - val isr = asList[Integer](0) - - val partitionStates = Seq( - new PartitionRecord() - .setTopicId(topicId) - .setPartitionId(0) - .setLeader(leader) - .setLeaderEpoch(leaderEpoch) - .setIsr(isr) - .setPartitionEpoch(partitionEpoch) - .setReplicas(replicas)) - MetadataCacheTest.updateCache(cache, brokers ++ topicRecords ++ partitionStates) - - // Validate errorUnavailableEndpoints = false - val topicMetadatas = cache.getTopicMetadata(util.Set.of(topic), listenerName, false, false).asScala - assertEquals(1, topicMetadatas.size) - - val topicMetadata = topicMetadatas.head - assertEquals(Errors.NONE.code(), topicMetadata.errorCode) - - val partitionMetadatas = topicMetadata.partitions - assertEquals(1, partitionMetadatas.size) - - val partitionMetadata = partitionMetadatas.get(0) - assertEquals(0, partitionMetadata.partitionIndex) - assertEquals(Errors.NONE.code, partitionMetadata.errorCode) - assertEquals(Set(0, 1), partitionMetadata.replicaNodes.asScala.toSet) - assertEquals(Set(0), partitionMetadata.isrNodes.asScala.toSet) - - // Validate errorUnavailableEndpoints = true - val topicMetadatasWithError = cache.getTopicMetadata(util.Set.of(topic), listenerName, true, false).asScala - assertEquals(1, topicMetadatasWithError.size) - - val topicMetadataWithError = topicMetadatasWithError.head - assertEquals(Errors.NONE.code, topicMetadataWithError.errorCode) - - val partitionMetadatasWithError = topicMetadataWithError.partitions() - assertEquals(1, partitionMetadatasWithError.size) - - val partitionMetadataWithError = partitionMetadatasWithError.get(0) - assertEquals(0, partitionMetadataWithError.partitionIndex) - assertEquals(Errors.REPLICA_NOT_AVAILABLE.code, partitionMetadataWithError.errorCode) - assertEquals(Set(0), partitionMetadataWithError.replicaNodes.asScala.toSet) - assertEquals(Set(0), partitionMetadataWithError.isrNodes.asScala.toSet) - } - - @ParameterizedTest - @MethodSource(Array("cacheProvider")) - def getTopicMetadataIsrNotAvailable(cache: MetadataCache): Unit = { - val topic = "topic" - val topicId = Uuid.randomUuid() - - val securityProtocol = SecurityProtocol.PLAINTEXT - val listenerName = ListenerName.forSecurityProtocol(securityProtocol) - - val endpoints = new BrokerEndpointCollection(Seq(new BrokerEndpoint() - .setHost("foo") - .setPort(9092) - .setSecurityProtocol(securityProtocol.id) - .setName(listenerName.value) - ).iterator.asJava) - - val brokers = Seq(new RegisterBrokerRecord() - .setBrokerId(0) - .setRack("rack1") - .setFenced(false) - .setEndPoints(endpoints)) - - val topicRecords = Seq(new TopicRecord() - .setName(topic) - .setTopicId(topicId)) - - // replica 1 is not available - val leader = 0 - val leaderEpoch = 0 - val replicas = asList[Integer](0) - val isr = asList[Integer](0, 1) - - val partitionStates = Seq(new PartitionRecord() - .setTopicId(topicId) - .setPartitionId(0) - .setLeader(leader) - .setLeaderEpoch(leaderEpoch) - .setIsr(isr) - .setReplicas(replicas)) - MetadataCacheTest.updateCache(cache, brokers ++ topicRecords ++ partitionStates) - - // Validate errorUnavailableEndpoints = false - val topicMetadatas = cache.getTopicMetadata(util.Set.of(topic), listenerName, false, false).asScala - assertEquals(1, topicMetadatas.size) - - val topicMetadata = topicMetadatas.head - assertEquals(Errors.NONE.code(), topicMetadata.errorCode) - - val partitionMetadatas = topicMetadata.partitions - assertEquals(1, partitionMetadatas.size) - - val partitionMetadata = partitionMetadatas.get(0) - assertEquals(0, partitionMetadata.partitionIndex) - assertEquals(Errors.NONE.code, partitionMetadata.errorCode) - assertEquals(Set(0), partitionMetadata.replicaNodes.asScala.toSet) - assertEquals(Set(0, 1), partitionMetadata.isrNodes.asScala.toSet) - - // Validate errorUnavailableEndpoints = true - val topicMetadatasWithError = cache.getTopicMetadata(util.Set.of(topic), listenerName, true, false).asScala - assertEquals(1, topicMetadatasWithError.size) - - val topicMetadataWithError = topicMetadatasWithError.head - assertEquals(Errors.NONE.code, topicMetadataWithError.errorCode) - - val partitionMetadatasWithError = topicMetadataWithError.partitions - assertEquals(1, partitionMetadatasWithError.size) - - val partitionMetadataWithError = partitionMetadatasWithError.get(0) - assertEquals(0, partitionMetadataWithError.partitionIndex) - assertEquals(Errors.REPLICA_NOT_AVAILABLE.code, partitionMetadataWithError.errorCode) - assertEquals(Set(0), partitionMetadataWithError.replicaNodes.asScala.toSet) - assertEquals(Set(0), partitionMetadataWithError.isrNodes.asScala.toSet) - } - - @ParameterizedTest - @MethodSource(Array("cacheProvider")) - def getTopicMetadataWithNonSupportedSecurityProtocol(cache: MetadataCache): Unit = { - val topic = "topic" - val topicId = Uuid.randomUuid() - val securityProtocol = SecurityProtocol.PLAINTEXT - - val brokers = new RegisterBrokerRecord() - .setBrokerId(0) - .setRack("") - .setEndPoints(new BrokerEndpointCollection(Seq(new BrokerEndpoint() - .setHost("foo") - .setPort(9092) - .setSecurityProtocol(securityProtocol.id) - .setName(ListenerName.forSecurityProtocol(securityProtocol).value) - ).iterator.asJava)) - - val topicRecord = new TopicRecord().setName(topic).setTopicId(topicId) - - val leader = 0 - val leaderEpoch = 0 - val replicas = asList[Integer](0) - val isr = asList[Integer](0, 1) - val partitionStates = Seq(new PartitionRecord() - .setTopicId(topicId) - .setPartitionId(0) - .setLeader(leader) - .setLeaderEpoch(leaderEpoch) - .setIsr(isr) - .setReplicas(replicas)) - MetadataCacheTest.updateCache(cache, Seq(brokers, topicRecord) ++ partitionStates) - - val topicMetadata = cache.getTopicMetadata(util.Set.of(topic), ListenerName.forSecurityProtocol(SecurityProtocol.SSL), false, false).asScala - assertEquals(1, topicMetadata.size) - assertEquals(1, topicMetadata.head.partitions.size) - assertEquals(RecordBatch.NO_PARTITION_LEADER_EPOCH, topicMetadata.head.partitions.get(0).leaderId) - } - - @ParameterizedTest - @MethodSource(Array("cacheProvider")) - def getAliveBrokersShouldNotBeMutatedByUpdateCache(cache: MetadataCache): Unit = { - val topic = "topic" - val topicId = Uuid.randomUuid() - val topicRecords = Seq(new TopicRecord().setName(topic).setTopicId(topicId)) - - def updateCache(brokerIds: Seq[Int]): Unit = { - val brokers = brokerIds.map { brokerId => - val securityProtocol = SecurityProtocol.PLAINTEXT - new RegisterBrokerRecord() - .setBrokerId(brokerId) - .setRack("") - .setFenced(false) - .setBrokerEpoch(brokerEpoch) - .setEndPoints(new BrokerEndpointCollection(Seq(new BrokerEndpoint() - .setHost("foo") - .setPort(9092) - .setSecurityProtocol(securityProtocol.id) - .setName(ListenerName.forSecurityProtocol(securityProtocol).value) - ).iterator.asJava)) - } - val leader = 0 - val leaderEpoch = 0 - val replicas = asList[Integer](0) - val isr = asList[Integer](0, 1) - val partitionStates = Seq(new PartitionRecord() - .setTopicId(topicId) - .setPartitionId(0) - .setLeader(leader) - .setLeaderEpoch(leaderEpoch) - .setIsr(isr) - .setReplicas(replicas)) - - MetadataCacheTest.updateCache(cache, brokers ++ topicRecords ++ partitionStates) - } - - val initialBrokerIds = (0 to 2) - updateCache(initialBrokerIds) - // This should not change `aliveBrokersFromCache` - updateCache((0 to 3)) - initialBrokerIds.foreach { brokerId => - assertTrue(cache.hasAliveBroker(brokerId)) - } - } - - @ParameterizedTest - @MethodSource(Array("cacheProvider")) - def testGetPartitionReplicaEndpoints(cache: MetadataCache): Unit = { - val securityProtocol = SecurityProtocol.PLAINTEXT - val listenerName = ListenerName.forSecurityProtocol(securityProtocol) - - // Set up broker data for the metadata cache - val numBrokers = 10 - val fencedBrokerId = numBrokers / 3 - val brokerRecords = (0 until numBrokers).map { brokerId => - new RegisterBrokerRecord() - .setBrokerId(brokerId) - .setFenced(brokerId == fencedBrokerId) - .setRack("rack" + (brokerId % 3)) - .setEndPoints(new BrokerEndpointCollection( - Seq(new BrokerEndpoint() - .setHost("foo" + brokerId) - .setPort(9092) - .setSecurityProtocol(securityProtocol.id) - .setName(listenerName.value) - ).iterator.asJava)) - } - - // Set up a single topic (with many partitions) for the metadata cache - val topic = "many-partitions-topic" - val topicId = Uuid.randomUuid() - val topicRecords = Seq[ApiMessage](new TopicRecord().setName(topic).setTopicId(topicId)) - - // Set up a number of partitions such that each different combination of - // $replicationFactor brokers is made a replica set for exactly one partition - val replicationFactor = 3 - val replicaSets = getAllReplicaSets(numBrokers, replicationFactor) - val numPartitions = replicaSets.length - val partitionRecords = (0 until numPartitions).map { partitionId => - val replicas = replicaSets(partitionId) - val nonFencedReplicas = replicas.stream().filter(id => id != fencedBrokerId).collect(Collectors.toList()) - new PartitionRecord() - .setTopicId(topicId) - .setPartitionId(partitionId) - .setReplicas(replicas) - .setLeader(replicas.get(0)) - .setIsr(nonFencedReplicas) - .setEligibleLeaderReplicas(nonFencedReplicas) - } - - // Load the prepared data in the metadata cache - MetadataCacheTest.updateCache(cache, brokerRecords ++ topicRecords ++ partitionRecords) - - (0 until numPartitions).foreach { partitionId => - val tp = new TopicPartition(topic, partitionId) - val brokerIdToNodeMap = cache.getPartitionReplicaEndpoints(tp, listenerName).asScala - val replicaSet = brokerIdToNodeMap.keySet - val expectedReplicaSet = partitionRecords(partitionId).replicas().asScala.toSet - // Verify that we have endpoints for exactly the non-fenced brokers of the replica set - if (expectedReplicaSet.contains(fencedBrokerId)) { - assertEquals(expectedReplicaSet, - replicaSet + fencedBrokerId, - s"Unexpected partial replica set for partition $partitionId") - } else { - assertEquals(expectedReplicaSet, - replicaSet, - s"Unexpected replica set for partition $partitionId") - } - // Verify that the endpoint data for each non-fenced replica is as expected - replicaSet.foreach { brokerId => - val brokerNode = - brokerIdToNodeMap.getOrElse( - brokerId, fail(s"No brokerNode for broker $brokerId and partition $partitionId")) - val expectedBroker = brokerRecords(brokerId) - val expectedEndpoint = expectedBroker.endPoints().find(listenerName.value()) - assertEquals(expectedEndpoint.host(), - brokerNode.host(), - s"Unexpected host for broker $brokerId and partition $partitionId") - assertEquals(expectedEndpoint.port(), - brokerNode.port(), - s"Unexpected port for broker $brokerId and partition $partitionId") - assertEquals(expectedBroker.rack(), - brokerNode.rack(), - s"Unexpected rack for broker $brokerId and partition $partitionId") - } - } - - val tp = new TopicPartition(topic, numPartitions) - val brokerIdToNodeMap = cache.getPartitionReplicaEndpoints(tp, listenerName) - assertTrue(brokerIdToNodeMap.isEmpty) - } - - private def getAllReplicaSets(numBrokers: Int, - replicationFactor: Int): Array[util.List[Integer]] = { - (0 until numBrokers) - .combinations(replicationFactor) - .map(replicaSet => replicaSet.map(Integer.valueOf).toList.asJava) - .toArray - } - - @Test - def testIsBrokerFenced(): Unit = { - val metadataCache = new KRaftMetadataCache(0, () => KRaftVersion.KRAFT_VERSION_0) - - val delta = new MetadataDelta.Builder().build() - delta.replay(new RegisterBrokerRecord() - .setBrokerId(0) - .setFenced(false)) - - metadataCache.setImage(delta.apply(MetadataProvenance.EMPTY)) - - assertFalse(metadataCache.isBrokerFenced(0)) - - delta.replay(new BrokerRegistrationChangeRecord() - .setBrokerId(0) - .setFenced(1.toByte)) - - metadataCache.setImage(delta.apply(MetadataProvenance.EMPTY)) - - assertTrue(metadataCache.isBrokerFenced(0)) - } - - @Test - def testIsBrokerInControlledShutdown(): Unit = { - val metadataCache = new KRaftMetadataCache(0, () => KRaftVersion.KRAFT_VERSION_0) - - val delta = new MetadataDelta.Builder().build() - delta.replay(new RegisterBrokerRecord() - .setBrokerId(0) - .setInControlledShutdown(false)) - - metadataCache.setImage(delta.apply(MetadataProvenance.EMPTY)) - - assertFalse(metadataCache.isBrokerShuttingDown(0)) - - delta.replay(new BrokerRegistrationChangeRecord() - .setBrokerId(0) - .setInControlledShutdown(1.toByte)) - - metadataCache.setImage(delta.apply(MetadataProvenance.EMPTY)) - - assertTrue(metadataCache.isBrokerShuttingDown(0)) - } - - @Test - def testGetLiveBrokerEpoch(): Unit = { - val metadataCache = new KRaftMetadataCache(0, () => KRaftVersion.KRAFT_VERSION_0) - - val delta = new MetadataDelta.Builder().build() - delta.replay(new RegisterBrokerRecord() - .setBrokerId(0) - .setBrokerEpoch(100) - .setFenced(false)) - - delta.replay(new RegisterBrokerRecord() - .setBrokerId(1) - .setBrokerEpoch(101) - .setFenced(true)) - - metadataCache.setImage(delta.apply(MetadataProvenance.EMPTY)) - - assertEquals(100L, metadataCache.getAliveBrokerEpoch(0).orElse(-1L)) - assertEquals(-1L, metadataCache.getAliveBrokerEpoch(1).orElse(-1L)) - } - - @Test - def testDescribeTopicResponse(): Unit = { - val metadataCache = new KRaftMetadataCache(0, () => KRaftVersion.KRAFT_VERSION_0) - - val securityProtocol = SecurityProtocol.PLAINTEXT - val listenerName = ListenerName.forSecurityProtocol(securityProtocol) - val topic0 = "test0" - val topic1 = "test1" - - val topicIds = new util.HashMap[String, Uuid]() - topicIds.put(topic0, Uuid.randomUuid()) - topicIds.put(topic1, Uuid.randomUuid()) - - val partitionMap = Map[(String, Int), PartitionRecord]( - (topic0, 0) -> new PartitionRecord() - .setTopicId(topicIds.get(topic0)) - .setPartitionId(0) - .setReplicas(asList(0, 1, 2)) - .setLeader(0) - .setIsr(asList(0)) - .setEligibleLeaderReplicas(asList(1)) - .setLastKnownElr(asList(2)) - .setLeaderEpoch(0) - .setPartitionEpoch(1) - .setLeaderRecoveryState(LeaderRecoveryState.RECOVERED.value()), - (topic0, 2) -> new PartitionRecord() - .setTopicId(topicIds.get(topic0)) - .setPartitionId(2) - .setReplicas(asList(0, 2, 3)) - .setLeader(3) - .setIsr(asList(3)) - .setEligibleLeaderReplicas(asList(2)) - .setLastKnownElr(asList(0)) - .setLeaderEpoch(1) - .setPartitionEpoch(2) - .setLeaderRecoveryState(LeaderRecoveryState.RECOVERED.value()), - (topic0, 1) -> new PartitionRecord() - .setTopicId(topicIds.get(topic0)) - .setPartitionId(1) - .setReplicas(asList(0, 1, 3)) - .setLeader(0) - .setIsr(asList(0)) - .setEligibleLeaderReplicas(asList(1)) - .setLastKnownElr(asList(3)) - .setLeaderEpoch(0) - .setPartitionEpoch(2) - .setLeaderRecoveryState(LeaderRecoveryState.RECOVERED.value()), - (topic1, 0) -> new PartitionRecord() - .setTopicId(topicIds.get(topic1)) - .setPartitionId(0) - .setReplicas(asList(0, 1, 2)) - .setLeader(2) - .setIsr(asList(2)) - .setEligibleLeaderReplicas(asList(1)) - .setLastKnownElr(asList(0)) - .setLeaderEpoch(10) - .setPartitionEpoch(11) - .setLeaderRecoveryState(LeaderRecoveryState.RECOVERED.value()), - ) - new BrokerEndpointCollection() - val brokers = Seq( - new RegisterBrokerRecord().setBrokerEpoch(brokerEpoch).setFenced(false).setBrokerId(0) - .setEndPoints(new BrokerEndpointCollection(Seq(new BrokerEndpoint().setHost("foo0").setPort(9092) - .setSecurityProtocol(securityProtocol.id).setName(listenerName.value) - ).iterator.asJava)), - new RegisterBrokerRecord().setBrokerEpoch(brokerEpoch).setFenced(false).setBrokerId(1) - .setEndPoints(new BrokerEndpointCollection(Seq(new BrokerEndpoint().setHost("foo1").setPort(9093) - .setSecurityProtocol(securityProtocol.id).setName(listenerName.value) - ).iterator.asJava)), - new RegisterBrokerRecord().setBrokerEpoch(brokerEpoch).setFenced(false).setBrokerId(2) - .setEndPoints(new BrokerEndpointCollection(Seq(new BrokerEndpoint().setHost("foo2").setPort(9094) - .setSecurityProtocol(securityProtocol.id).setName(listenerName.value) - ).iterator.asJava)), - new RegisterBrokerRecord().setBrokerEpoch(brokerEpoch).setFenced(false).setBrokerId(3) - .setEndPoints(new BrokerEndpointCollection(Seq(new BrokerEndpoint().setHost("foo3").setPort(9095) - .setSecurityProtocol(securityProtocol.id).setName(listenerName.value) - ).iterator.asJava)), - ) - - var recordSeq = Seq[ApiMessage]( - new TopicRecord().setName(topic0).setTopicId(topicIds.get(topic0)), - new TopicRecord().setName(topic1).setTopicId(topicIds.get(topic1)) - ) - recordSeq = recordSeq ++ partitionMap.values.toSeq - MetadataCacheTest.updateCache(metadataCache, brokers ++ recordSeq) - - def checkTopicMetadata(topic: String, partitionIds: Set[Int], partitions: mutable.Buffer[DescribeTopicPartitionsResponsePartition]): Unit = { - partitions.foreach(partition => { - val partitionId = partition.partitionIndex() - assertTrue(partitionIds.contains(partitionId)) - val expectedPartition = partitionMap.get((topic, partitionId)).get - assertEquals(0, partition.errorCode()) - assertEquals(expectedPartition.leaderEpoch(), partition.leaderEpoch()) - assertEquals(expectedPartition.partitionId(), partition.partitionIndex()) - assertEquals(expectedPartition.eligibleLeaderReplicas(), partition.eligibleLeaderReplicas()) - assertEquals(expectedPartition.isr(), partition.isrNodes()) - assertEquals(expectedPartition.lastKnownElr(), partition.lastKnownElr()) - assertEquals(expectedPartition.leader(), partition.leaderId()) - }) - } - - // Basic test - var result = metadataCache.describeTopicResponse(util.List.of(topic0, topic1).iterator, listenerName, _ => 0, 10, false).topics().asScala.toList - assertEquals(2, result.size) - var resultTopic = result(0) - assertEquals(topic0, resultTopic.name()) - assertEquals(0, resultTopic.errorCode()) - assertEquals(topicIds.get(topic0), resultTopic.topicId()) - assertEquals(3, resultTopic.partitions().size()) - checkTopicMetadata(topic0, Set(0, 1, 2), resultTopic.partitions().asScala) - - resultTopic = result(1) - assertEquals(topic1, resultTopic.name()) - assertEquals(0, resultTopic.errorCode()) - assertEquals(topicIds.get(topic1), resultTopic.topicId()) - assertEquals(1, resultTopic.partitions().size()) - checkTopicMetadata(topic1, Set(0), resultTopic.partitions().asScala) - - // Quota reached - var response = metadataCache.describeTopicResponse(util.List.of(topic0, topic1).iterator, listenerName, _ => 0, 2, false) - result = response.topics().asScala.toList - assertEquals(1, result.size) - resultTopic = result(0) - assertEquals(topic0, resultTopic.name()) - assertEquals(0, resultTopic.errorCode()) - assertEquals(topicIds.get(topic0), resultTopic.topicId()) - assertEquals(2, resultTopic.partitions().size()) - checkTopicMetadata(topic0, Set(0, 1), resultTopic.partitions().asScala) - assertEquals(topic0, response.nextCursor().topicName()) - assertEquals(2, response.nextCursor().partitionIndex()) - - // With start index - result = metadataCache.describeTopicResponse(util.List.of(topic0).iterator, listenerName, t => if (t.equals(topic0)) 1 else 0, 10, false).topics().asScala.toList - assertEquals(1, result.size) - resultTopic = result(0) - assertEquals(topic0, resultTopic.name()) - assertEquals(0, resultTopic.errorCode()) - assertEquals(topicIds.get(topic0), resultTopic.topicId()) - assertEquals(2, resultTopic.partitions().size()) - checkTopicMetadata(topic0, Set(1, 2), resultTopic.partitions().asScala) - - // With start index and quota reached - response = metadataCache.describeTopicResponse(util.List.of(topic0, topic1).iterator, listenerName, t => if (t.equals(topic0)) 2 else 0, 1, false) - result = response.topics().asScala.toList - assertEquals(1, result.size) - - resultTopic = result(0) - assertEquals(topic0, resultTopic.name()) - assertEquals(0, resultTopic.errorCode()) - assertEquals(topicIds.get(topic0), resultTopic.topicId()) - assertEquals(1, resultTopic.partitions().size()) - checkTopicMetadata(topic0, Set(2), resultTopic.partitions().asScala) - assertEquals(topic1, response.nextCursor().topicName()) - assertEquals(0, response.nextCursor().partitionIndex()) - - // When the first topic does not exist - result = metadataCache.describeTopicResponse(util.List.of("Non-exist", topic0).iterator, listenerName, t => if (t.equals("Non-exist")) 1 else 0, 1, false).topics().asScala.toList - assertEquals(2, result.size) - resultTopic = result(0) - assertEquals("Non-exist", resultTopic.name()) - assertEquals(3, resultTopic.errorCode()) - - resultTopic = result(1) - assertEquals(topic0, resultTopic.name()) - assertEquals(0, resultTopic.errorCode()) - assertEquals(topicIds.get(topic0), resultTopic.topicId()) - assertEquals(1, resultTopic.partitions().size()) - checkTopicMetadata(topic0, Set(0), resultTopic.partitions().asScala) - } - - @ParameterizedTest - @MethodSource(Array("cacheProvider")) - def testGetLeaderAndIsr(cache: MetadataCache): Unit = { - val topic = "topic" - val topicId = Uuid.randomUuid() - val partitionIndex = 0 - val leader = 0 - val leaderEpoch = 0 - val isr = asList[Integer](2, 3, 0) - val replicas = asList[Integer](2, 3, 0, 1, 4) - - val topicRecords = Seq(new TopicRecord().setName(topic).setTopicId(topicId)) - - val partitionStates = Seq(new PartitionRecord() - .setTopicId(topicId) - .setPartitionId(partitionIndex) - .setLeader(leader) - .setLeaderEpoch(leaderEpoch) - .setIsr(isr) - .setReplicas(replicas)) - - val securityProtocol = SecurityProtocol.PLAINTEXT - val listenerName = ListenerName.forSecurityProtocol(securityProtocol) - val brokers = Seq(new RegisterBrokerRecord() - .setBrokerId(0) - .setBrokerEpoch(brokerEpoch) - .setRack("rack1") - .setEndPoints(new BrokerEndpointCollection( - Seq(new BrokerEndpoint() - .setHost("foo") - .setPort(9092) - .setSecurityProtocol(securityProtocol.id) - .setName(listenerName.value) - ).iterator.asJava))) - - MetadataCacheTest.updateCache(cache, brokers ++ topicRecords ++ partitionStates) - - val leaderAndIsr = cache.getLeaderAndIsr(topic, partitionIndex) - assertEquals(util.Optional.of(leader), leaderAndIsr.map(_.leader())) - assertEquals(util.Optional.of(leaderEpoch), leaderAndIsr.map(_.leaderEpoch())) - assertEquals(util.Optional.of(util.Set.copyOf(isr)), leaderAndIsr.map(_.isr())) - assertEquals(util.Optional.of(-1), leaderAndIsr.map(_.partitionEpoch())) - assertEquals(util.Optional.of(LeaderRecoveryState.RECOVERED), leaderAndIsr.map(_.leaderRecoveryState())) - } - - @Test - def testGetOfflineReplicasConsidersDirAssignment(): Unit = { - case class Broker(id: Int, dirs: util.List[Uuid]) - case class Partition(id: Int, replicas: util.List[Integer], dirs: util.List[Uuid]) - - def offlinePartitions(brokers: Seq[Broker], partitions: Seq[Partition]): Map[Int, util.List[Integer]] = { - val delta = new MetadataDelta.Builder().build() - brokers.foreach(broker => delta.replay( - new RegisterBrokerRecord().setFenced(false). - setBrokerId(broker.id).setLogDirs(broker.dirs). - setEndPoints(new BrokerEndpointCollection(Collections.singleton( - new RegisterBrokerRecord.BrokerEndpoint().setSecurityProtocol(SecurityProtocol.PLAINTEXT.id). - setPort(9093.toShort).setName("PLAINTEXT").setHost(s"broker-${broker.id}")))))) - val topicId = Uuid.fromString("95OVr1IPRYGrcNCLlpImCA") - delta.replay(new TopicRecord().setTopicId(topicId).setName("foo")) - partitions.foreach(partition => delta.replay( - new PartitionRecord().setTopicId(topicId).setPartitionId(partition.id). - setReplicas(partition.replicas).setDirectories(partition.dirs). - setLeader(partition.replicas.get(0)).setIsr(partition.replicas))) - val cache = new KRaftMetadataCache(1, () => KRaftVersion.KRAFT_VERSION_0) - cache.setImage(delta.apply(MetadataProvenance.EMPTY)) - val topicMetadata = cache.getTopicMetadata(util.Set.of("foo"), ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT), false, false).asScala.head - topicMetadata.partitions().asScala.map(p => (p.partitionIndex(), p.offlineReplicas())).toMap - } - - val brokers = Seq( - Broker(0, asList(Uuid.fromString("broker1logdirjEo71BG0w"))), - Broker(1, asList(Uuid.fromString("broker2logdirRmQQgLxgw"))) - ) - val partitions = Seq( - Partition(0, asList(0, 1), asList(Uuid.fromString("broker1logdirjEo71BG0w"), DirectoryId.LOST)), - Partition(1, asList(0, 1), asList(Uuid.fromString("unknownlogdirjEo71BG0w"), DirectoryId.UNASSIGNED)), - Partition(2, asList(0, 1), asList(DirectoryId.MIGRATING, Uuid.fromString("broker2logdirRmQQgLxgw"))) - ) - assertEquals(Map( - 0 -> asList(1), - 1 -> asList(0), - 2 -> asList(), - ), offlinePartitions(brokers, partitions)) - } - - - val oldRequestControllerEpoch: Int = 122 - val newRequestControllerEpoch: Int = 123 - - val fooTopicName: String = "foo" - val fooTopicId: Uuid = Uuid.fromString("HDceyWK0Ry-j3XLR8DvvGA") - val oldFooPart0 = new PartitionRecord(). - setTopicId(fooTopicId). - setPartitionId(0). - setLeader(4). - setIsr(java.util.Arrays.asList(4, 5, 6)). - setReplicas(java.util.Arrays.asList(4, 5, 6)) - val newFooPart0 = new PartitionRecord(). - setTopicId(fooTopicId). - setPartitionId(0). - setLeader(5). - setIsr(java.util.Arrays.asList(4, 5, 6)). - setReplicas(java.util.Arrays.asList(4, 5, 6)) - val oldFooPart1 = new PartitionRecord(). - setTopicId(fooTopicId). - setPartitionId(1). - setLeader(5). - setIsr(java.util.Arrays.asList(4, 5, 6)). - setReplicas(java.util.Arrays.asList(4, 5, 6)) - val newFooPart1 = new PartitionRecord(). - setTopicId(fooTopicId). - setPartitionId(1). - setLeader(5). - setIsr(java.util.Arrays.asList(4, 5)). - setReplicas(java.util.Arrays.asList(4, 5, 6)) - val barTopicName: String = "bar" - val barTopicId: Uuid = Uuid.fromString("97FBD1g4QyyNNZNY94bkRA") - val recreatedBarTopicId: Uuid = Uuid.fromString("lZokxuaPRty7c5P4dNdTYA") - val oldBarPart0 = new PartitionRecord(). - setTopicId(fooTopicId). - setPartitionId(0). - setLeader(7). - setIsr(java.util.Arrays.asList(7, 8)). - setReplicas(java.util.Arrays.asList(7, 8, 9)) - val newBarPart0 = new PartitionRecord(). - setTopicId(barTopicId). - setPartitionId(0). - setLeader(7). - setIsr(java.util.Arrays.asList(7, 8)). - setReplicas(java.util.Arrays.asList(7, 8, 9)) - val deletedBarPart0 = new PartitionRecord(). - setTopicId(barTopicId). - setPartitionId(0). - setLeader(-2). - setIsr(java.util.Arrays.asList(7, 8)). - setReplicas(java.util.Arrays.asList(7, 8, 9)) - val oldBarPart1 = new PartitionRecord(). - setTopicId(barTopicId). - setPartitionId(1). - setLeader(5). - setIsr(java.util.Arrays.asList(4, 5, 6)). - setReplicas(java.util.Arrays.asList(4, 5, 6)) - val newBarPart1 = new PartitionRecord(). - setTopicId(barTopicId). - setPartitionId(1). - setLeader(5). - setIsr(java.util.Arrays.asList(4, 5, 6)). - setReplicas(java.util.Arrays.asList(4, 5, 6)) - val deletedBarPart1 = new PartitionRecord(). - setTopicId(barTopicId). - setPartitionId(1). - setLeader(-2). - setIsr(java.util.Arrays.asList(4, 5, 6)). - setReplicas(java.util.Arrays.asList(4, 5, 6)) - - val oldBarPart2 = new PartitionRecord(). - setTopicId(barTopicId). - setPartitionId(2). - setLeader(9). - setIsr(java.util.Arrays.asList(7, 8, 9)). - setReplicas(java.util.Arrays.asList(7, 8, 9)) - - val newBarPart2 = new PartitionRecord(). - setTopicId(barTopicId). - setPartitionId(2). - setLeader(8). - setIsr(java.util.Arrays.asList(7, 8)). - setReplicas(java.util.Arrays.asList(7, 8, 9)) - - val deletedBarPart2 = new PartitionRecord(). - setTopicId(barTopicId). - setPartitionId(2). - setLeader(-2). - setIsr(java.util.Arrays.asList(7, 8, 9)). - setReplicas(java.util.Arrays.asList(7, 8, 9)) -} diff --git a/metadata/src/test/java/org/apache/kafka/metadata/MetadataCacheTest.java b/metadata/src/test/java/org/apache/kafka/metadata/MetadataCacheTest.java new file mode 100644 index 0000000000000..a796682d7b351 --- /dev/null +++ b/metadata/src/test/java/org/apache/kafka/metadata/MetadataCacheTest.java @@ -0,0 +1,1051 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.metadata; + +import org.apache.kafka.common.DirectoryId; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.message.DescribeTopicPartitionsResponseData; +import org.apache.kafka.common.message.DescribeTopicPartitionsResponseData.DescribeTopicPartitionsResponsePartition; +import org.apache.kafka.common.message.DescribeTopicPartitionsResponseData.DescribeTopicPartitionsResponseTopic; +import org.apache.kafka.common.message.MetadataResponseData; +import org.apache.kafka.common.metadata.BrokerRegistrationChangeRecord; +import org.apache.kafka.common.metadata.PartitionRecord; +import org.apache.kafka.common.metadata.RegisterBrokerRecord; +import org.apache.kafka.common.metadata.RegisterBrokerRecord.BrokerEndpoint; +import org.apache.kafka.common.metadata.RegisterBrokerRecord.BrokerEndpointCollection; +import org.apache.kafka.common.metadata.TopicRecord; +import org.apache.kafka.common.network.ListenerName; +import org.apache.kafka.common.protocol.ApiMessage; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.record.internal.RecordBatch; +import org.apache.kafka.common.security.auth.SecurityProtocol; +import org.apache.kafka.image.MetadataDelta; +import org.apache.kafka.image.MetadataImage; +import org.apache.kafka.image.MetadataProvenance; +import org.apache.kafka.server.common.KRaftVersion; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.MethodSource; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Stream; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class MetadataCacheTest { + + protected final long brokerEpoch = 0L; + + public static Stream cacheProvider() { + return Stream.of(new KRaftMetadataCache(1, () -> KRaftVersion.KRAFT_VERSION_0)); + } + + public static void updateCache(MetadataCache cache, List records) { + if (cache instanceof KRaftMetadataCache) { + KRaftMetadataCache c = (KRaftMetadataCache) cache; + MetadataImage image = c.currentImage(); + MetadataImage partialImage = new MetadataImage( + new MetadataProvenance(100L, 10, 1000L, true), + image.features(), + image.cluster(), + image.topics(), + image.configs(), + image.clientQuotas(), + image.producerIds(), + image.acls(), + image.scram(), + image.delegationTokens() + ); + MetadataDelta delta = new MetadataDelta.Builder().setImage(partialImage).build(); + for (ApiMessage record : records) { + delta.replay(record); + } + c.setImage(delta.apply(new MetadataProvenance(100L, 10, 1000L, true))); + } else { + throw new RuntimeException("Unsupported cache type"); + } + } + + @ParameterizedTest + @MethodSource("cacheProvider") + public void getTopicMetadataNonExistingTopics(MetadataCache cache) { + String topic = "topic"; + List topicMetadata = cache.getTopicMetadata( + Set.of(topic), ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT), false, false); + assertTrue(topicMetadata.isEmpty()); + } + + @ParameterizedTest + @MethodSource("cacheProvider") + public void getTopicMetadata(MetadataCache cache) { + String topic0 = "topic-0"; + String topic1 = "topic-1"; + + Map topicIds = new HashMap<>(); + topicIds.put(topic0, Uuid.randomUuid()); + topicIds.put(topic1, Uuid.randomUuid()); + + List partitionStates = List.of( + new PartitionRecord() + .setTopicId(topicIds.get(topic0)) + .setPartitionId(0) + .setLeader(0) + .setLeaderEpoch(0) + .setIsr(List.of(0, 1, 3)) + .setReplicas(List.of(0, 1, 3)), + new PartitionRecord() + .setTopicId(topicIds.get(topic0)) + .setPartitionId(1) + .setLeader(1) + .setLeaderEpoch(1) + .setIsr(List.of(1, 0)) + .setReplicas(List.of(1, 2, 0, 4)), + new PartitionRecord() + .setTopicId(topicIds.get(topic1)) + .setPartitionId(0) + .setLeader(2) + .setLeaderEpoch(2) + .setIsr(List.of(2, 1)) + .setReplicas(List.of(2, 1, 3)) + ); + + List records = new ArrayList<>(); + for (int brokerId = 0; brokerId <= 4; brokerId++) { + String host = "foo-" + brokerId; + BrokerEndpointCollection endpoints = new BrokerEndpointCollection(List.of( + new BrokerEndpoint() + .setHost(host) + .setPort(9092) + .setSecurityProtocol(SecurityProtocol.PLAINTEXT.id) + .setName(ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT).value()), + new BrokerEndpoint() + .setHost(host) + .setPort(9093) + .setSecurityProtocol(SecurityProtocol.SSL.id) + .setName(ListenerName.forSecurityProtocol(SecurityProtocol.SSL).value()) + ).iterator()); + records.add(new RegisterBrokerRecord() + .setBrokerId(brokerId) + .setEndPoints(endpoints) + .setRack("rack1")); + } + records.add(new TopicRecord().setName(topic0).setTopicId(topicIds.get(topic0))); + records.add(new TopicRecord().setName(topic1).setTopicId(topicIds.get(topic1))); + records.addAll(partitionStates); + updateCache(cache, records); + + for (SecurityProtocol securityProtocol : new SecurityProtocol[]{SecurityProtocol.PLAINTEXT, SecurityProtocol.SSL}) { + ListenerName listenerName = ListenerName.forSecurityProtocol(securityProtocol); + + for (String topic : new String[]{topic0, topic1}) { + List topicMetadataList = + cache.getTopicMetadata(Set.of(topic), listenerName, false, false); + assertEquals(1, topicMetadataList.size()); + + MetadataResponseData.MetadataResponseTopic topicMetadata = topicMetadataList.get(0); + assertEquals(Errors.NONE.code(), topicMetadata.errorCode()); + assertEquals(topic, topicMetadata.name()); + assertEquals(topicIds.get(topic), topicMetadata.topicId()); + + List topicPartitionStates = new ArrayList<>(); + for (PartitionRecord ps : partitionStates) { + if (ps.topicId().equals(topicIds.get(topic))) { + topicPartitionStates.add(ps); + } + } + + List partitionMetadatas = + new ArrayList<>(topicMetadata.partitions()); + partitionMetadatas.sort((a, b) -> Integer.compare(a.partitionIndex(), b.partitionIndex())); + assertEquals(topicPartitionStates.size(), partitionMetadatas.size(), + "Unexpected partition count for topic " + topic); + + for (int i = 0; i < partitionMetadatas.size(); i++) { + MetadataResponseData.MetadataResponsePartition partitionMetadata = partitionMetadatas.get(i); + int partitionId = i; + assertEquals(Errors.NONE.code(), partitionMetadata.errorCode()); + assertEquals(partitionId, partitionMetadata.partitionIndex()); + PartitionRecord partitionState = topicPartitionStates.stream() + .filter(ps -> ps.partitionId() == partitionId) + .findFirst() + .orElseThrow(() -> new AssertionError("Unable to find partition state for partition " + partitionId)); + assertEquals(partitionState.leader(), partitionMetadata.leaderId()); + assertEquals(partitionState.leaderEpoch(), partitionMetadata.leaderEpoch()); + assertEquals(partitionState.isr(), partitionMetadata.isrNodes()); + assertEquals(partitionState.replicas(), partitionMetadata.replicaNodes()); + } + } + } + } + + @ParameterizedTest + @MethodSource("cacheProvider") + public void getTopicMetadataPartitionLeaderNotAvailable(MetadataCache cache) { + SecurityProtocol securityProtocol = SecurityProtocol.PLAINTEXT; + ListenerName listenerName = ListenerName.forSecurityProtocol(securityProtocol); + List brokers = Collections.singletonList( + new RegisterBrokerRecord() + .setBrokerId(0) + .setFenced(false) + .setEndPoints(new BrokerEndpointCollection(Collections.singletonList( + new BrokerEndpoint() + .setHost("foo") + .setPort(9092) + .setSecurityProtocol(securityProtocol.id) + .setName(listenerName.value()) + ).iterator())) + ); + + // leader is not available. expect LEADER_NOT_AVAILABLE for any metadata version. + verifyTopicMetadataPartitionLeaderOrEndpointNotAvailable(cache, brokers, listenerName, + 1, Errors.LEADER_NOT_AVAILABLE, false); + verifyTopicMetadataPartitionLeaderOrEndpointNotAvailable(cache, brokers, listenerName, + 1, Errors.LEADER_NOT_AVAILABLE, true); + } + + @ParameterizedTest + @MethodSource("cacheProvider") + public void getTopicMetadataPartitionListenerNotAvailableOnLeader(MetadataCache cache) { + // when listener name is not present in the metadata cache for a broker, getTopicMetadata should + // return LEADER_NOT_AVAILABLE or LISTENER_NOT_FOUND errors for old and new versions respectively. + ListenerName plaintextListenerName = ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT); + ListenerName sslListenerName = ListenerName.forSecurityProtocol(SecurityProtocol.SSL); + BrokerEndpointCollection broker0Endpoints = new BrokerEndpointCollection(List.of( + new BrokerEndpoint() + .setHost("host0") + .setPort(9092) + .setSecurityProtocol(SecurityProtocol.PLAINTEXT.id) + .setName(plaintextListenerName.value()), + new BrokerEndpoint() + .setHost("host0") + .setPort(9093) + .setSecurityProtocol(SecurityProtocol.SSL.id) + .setName(sslListenerName.value()) + ).iterator()); + + BrokerEndpointCollection broker1Endpoints = new BrokerEndpointCollection(Collections.singletonList( + new BrokerEndpoint() + .setHost("host1") + .setPort(9092) + .setSecurityProtocol(SecurityProtocol.PLAINTEXT.id) + .setName(plaintextListenerName.value()) + ).iterator()); + + List brokers = List.of( + new RegisterBrokerRecord() + .setBrokerId(0) + .setFenced(false) + .setEndPoints(broker0Endpoints), + new RegisterBrokerRecord() + .setBrokerId(1) + .setFenced(false) + .setEndPoints(broker1Endpoints) + ); + + // leader available in cache but listener name not present. expect LISTENER_NOT_FOUND error for new metadata version + verifyTopicMetadataPartitionLeaderOrEndpointNotAvailable(cache, brokers, sslListenerName, + 1, Errors.LISTENER_NOT_FOUND, true); + // leader available in cache but listener name not present. expect LEADER_NOT_AVAILABLE error for old metadata version + verifyTopicMetadataPartitionLeaderOrEndpointNotAvailable(cache, brokers, sslListenerName, + 1, Errors.LEADER_NOT_AVAILABLE, false); + } + + private void verifyTopicMetadataPartitionLeaderOrEndpointNotAvailable( + MetadataCache cache, + List brokers, + ListenerName listenerName, + int leader, + Errors expectedError, + boolean errorUnavailableListeners + ) { + String topic = "topic"; + Uuid topicId = Uuid.randomUuid(); + List records = new ArrayList<>(brokers); + records.add(new TopicRecord().setName(topic).setTopicId(topicId)); + + int leaderEpoch = 1; + int partitionEpoch = 3; + records.add(new PartitionRecord() + .setTopicId(topicId) + .setPartitionId(0) + .setPartitionEpoch(partitionEpoch) + .setLeader(leader) + .setLeaderEpoch(leaderEpoch) + .setIsr(List.of(0)) + .setReplicas(List.of(0))); + updateCache(cache, records); + + List topicMetadataList = + cache.getTopicMetadata(Set.of(topic), listenerName, false, errorUnavailableListeners); + assertEquals(1, topicMetadataList.size()); + + MetadataResponseData.MetadataResponseTopic topicMetadata = topicMetadataList.get(0); + assertEquals(Errors.NONE.code(), topicMetadata.errorCode()); + + List partitionMetadatas = topicMetadata.partitions(); + assertEquals(1, partitionMetadatas.size()); + + MetadataResponseData.MetadataResponsePartition partitionMetadata = partitionMetadatas.get(0); + assertEquals(0, partitionMetadata.partitionIndex()); + assertEquals(expectedError.code(), partitionMetadata.errorCode()); + assertFalse(partitionMetadata.isrNodes().isEmpty()); + assertEquals(List.of(0), partitionMetadata.replicaNodes()); + } + + @ParameterizedTest + @MethodSource("cacheProvider") + public void getTopicMetadataReplicaNotAvailable(MetadataCache cache) { + String topic = "topic"; + Uuid topicId = Uuid.randomUuid(); + + int partitionEpoch = 3; + SecurityProtocol securityProtocol = SecurityProtocol.PLAINTEXT; + ListenerName listenerName = ListenerName.forSecurityProtocol(securityProtocol); + BrokerEndpointCollection endPoints = new BrokerEndpointCollection(Collections.singletonList( + new BrokerEndpoint() + .setHost("foo") + .setPort(9092) + .setSecurityProtocol(securityProtocol.id) + .setName(listenerName.value()) + ).iterator()); + + List records = new ArrayList<>(); + records.add(new RegisterBrokerRecord() + .setBrokerId(0) + .setFenced(false) + .setEndPoints(endPoints)); + records.add(new TopicRecord().setName(topic).setTopicId(topicId)); + + // replica 1 is not available + int leader = 0; + int leaderEpoch = 0; + List replicas = List.of(0, 1); + List isr = List.of(0); + + records.add(new PartitionRecord() + .setTopicId(topicId) + .setPartitionId(0) + .setLeader(leader) + .setLeaderEpoch(leaderEpoch) + .setIsr(isr) + .setPartitionEpoch(partitionEpoch) + .setReplicas(replicas)); + updateCache(cache, records); + + // Validate errorUnavailableEndpoints = false + List topicMetadataList = + cache.getTopicMetadata(Set.of(topic), listenerName, false, false); + assertEquals(1, topicMetadataList.size()); + + MetadataResponseData.MetadataResponseTopic topicMetadata = topicMetadataList.get(0); + assertEquals(Errors.NONE.code(), topicMetadata.errorCode()); + + List partitionMetadatas = topicMetadata.partitions(); + assertEquals(1, partitionMetadatas.size()); + + MetadataResponseData.MetadataResponsePartition partitionMetadata = partitionMetadatas.get(0); + assertEquals(0, partitionMetadata.partitionIndex()); + assertEquals(Errors.NONE.code(), partitionMetadata.errorCode()); + assertEquals(new HashSet<>(List.of(0, 1)), new HashSet<>(partitionMetadata.replicaNodes())); + assertEquals(new HashSet<>(List.of(0)), new HashSet<>(partitionMetadata.isrNodes())); + + // Validate errorUnavailableEndpoints = true + List topicMetadatasWithError = + cache.getTopicMetadata(Set.of(topic), listenerName, true, false); + assertEquals(1, topicMetadatasWithError.size()); + + MetadataResponseData.MetadataResponseTopic topicMetadataWithError = topicMetadatasWithError.get(0); + assertEquals(Errors.NONE.code(), topicMetadataWithError.errorCode()); + + List partitionMetadatasWithError = topicMetadataWithError.partitions(); + assertEquals(1, partitionMetadatasWithError.size()); + + MetadataResponseData.MetadataResponsePartition partitionMetadataWithError = partitionMetadatasWithError.get(0); + assertEquals(0, partitionMetadataWithError.partitionIndex()); + assertEquals(Errors.REPLICA_NOT_AVAILABLE.code(), partitionMetadataWithError.errorCode()); + assertEquals(new HashSet<>(List.of(0)), new HashSet<>(partitionMetadataWithError.replicaNodes())); + assertEquals(new HashSet<>(List.of(0)), new HashSet<>(partitionMetadataWithError.isrNodes())); + } + + @ParameterizedTest + @MethodSource("cacheProvider") + public void getTopicMetadataIsrNotAvailable(MetadataCache cache) { + String topic = "topic"; + Uuid topicId = Uuid.randomUuid(); + + SecurityProtocol securityProtocol = SecurityProtocol.PLAINTEXT; + ListenerName listenerName = ListenerName.forSecurityProtocol(securityProtocol); + + BrokerEndpointCollection endpoints = new BrokerEndpointCollection(Collections.singletonList( + new BrokerEndpoint() + .setHost("foo") + .setPort(9092) + .setSecurityProtocol(securityProtocol.id) + .setName(listenerName.value()) + ).iterator()); + + List records = new ArrayList<>(); + records.add(new RegisterBrokerRecord() + .setBrokerId(0) + .setRack("rack1") + .setFenced(false) + .setEndPoints(endpoints)); + records.add(new TopicRecord().setName(topic).setTopicId(topicId)); + + // isr member 1 is not a registered broker + int leader = 0; + int leaderEpoch = 0; + List replicas = List.of(0); + List isr = List.of(0, 1); + + records.add(new PartitionRecord() + .setTopicId(topicId) + .setPartitionId(0) + .setLeader(leader) + .setLeaderEpoch(leaderEpoch) + .setIsr(isr) + .setReplicas(replicas)); + updateCache(cache, records); + + // Validate errorUnavailableEndpoints = false + List topicMetadataList = + cache.getTopicMetadata(Set.of(topic), listenerName, false, false); + assertEquals(1, topicMetadataList.size()); + + MetadataResponseData.MetadataResponseTopic topicMetadata = topicMetadataList.get(0); + assertEquals(Errors.NONE.code(), topicMetadata.errorCode()); + + List partitionMetadatas = topicMetadata.partitions(); + assertEquals(1, partitionMetadatas.size()); + + MetadataResponseData.MetadataResponsePartition partitionMetadata = partitionMetadatas.get(0); + assertEquals(0, partitionMetadata.partitionIndex()); + assertEquals(Errors.NONE.code(), partitionMetadata.errorCode()); + assertEquals(new HashSet<>(List.of(0)), new HashSet<>(partitionMetadata.replicaNodes())); + assertEquals(new HashSet<>(List.of(0, 1)), new HashSet<>(partitionMetadata.isrNodes())); + + // Validate errorUnavailableEndpoints = true + List topicMetadatasWithError = + cache.getTopicMetadata(Set.of(topic), listenerName, true, false); + assertEquals(1, topicMetadatasWithError.size()); + + MetadataResponseData.MetadataResponseTopic topicMetadataWithError = topicMetadatasWithError.get(0); + assertEquals(Errors.NONE.code(), topicMetadataWithError.errorCode()); + + List partitionMetadatasWithError = topicMetadataWithError.partitions(); + assertEquals(1, partitionMetadatasWithError.size()); + + MetadataResponseData.MetadataResponsePartition partitionMetadataWithError = partitionMetadatasWithError.get(0); + assertEquals(0, partitionMetadataWithError.partitionIndex()); + assertEquals(Errors.REPLICA_NOT_AVAILABLE.code(), partitionMetadataWithError.errorCode()); + assertEquals(new HashSet<>(List.of(0)), new HashSet<>(partitionMetadataWithError.replicaNodes())); + assertEquals(new HashSet<>(List.of(0)), new HashSet<>(partitionMetadataWithError.isrNodes())); + } + + @ParameterizedTest + @MethodSource("cacheProvider") + public void getTopicMetadataWithNonSupportedSecurityProtocol(MetadataCache cache) { + String topic = "topic"; + Uuid topicId = Uuid.randomUuid(); + SecurityProtocol securityProtocol = SecurityProtocol.PLAINTEXT; + + RegisterBrokerRecord broker = new RegisterBrokerRecord() + .setBrokerId(0) + .setRack("") + .setEndPoints(new BrokerEndpointCollection(Collections.singletonList( + new BrokerEndpoint() + .setHost("foo") + .setPort(9092) + .setSecurityProtocol(securityProtocol.id) + .setName(ListenerName.forSecurityProtocol(securityProtocol).value()) + ).iterator())); + + TopicRecord topicRecord = new TopicRecord().setName(topic).setTopicId(topicId); + + int leader = 0; + int leaderEpoch = 0; + List replicas = List.of(0); + List isr = List.of(0, 1); + + List records = new ArrayList<>(); + records.add(broker); + records.add(topicRecord); + records.add(new PartitionRecord() + .setTopicId(topicId) + .setPartitionId(0) + .setLeader(leader) + .setLeaderEpoch(leaderEpoch) + .setIsr(isr) + .setReplicas(replicas)); + updateCache(cache, records); + + List topicMetadata = + cache.getTopicMetadata(Set.of(topic), ListenerName.forSecurityProtocol(SecurityProtocol.SSL), false, false); + assertEquals(1, topicMetadata.size()); + assertEquals(1, topicMetadata.get(0).partitions().size()); + assertEquals(RecordBatch.NO_PARTITION_LEADER_EPOCH, topicMetadata.get(0).partitions().get(0).leaderId()); + } + + @ParameterizedTest + @MethodSource("cacheProvider") + public void getAliveBrokersShouldNotBeMutatedByUpdateCache(MetadataCache cache) { + String topic = "topic"; + Uuid topicId = Uuid.randomUuid(); + List topicRecords = Collections.singletonList( + new TopicRecord().setName(topic).setTopicId(topicId)); + + List initialBrokerIds = List.of(0, 1, 2); + updateCacheWithBrokers(cache, initialBrokerIds, topicId, topicRecords); + // This should not change the alive brokers + updateCacheWithBrokers(cache, List.of(0, 1, 2, 3), topicId, topicRecords); + for (int brokerId : initialBrokerIds) { + assertTrue(cache.hasAliveBroker(brokerId)); + } + } + + private void updateCacheWithBrokers(MetadataCache cache, List brokerIds, + Uuid topicId, List topicRecords) { + SecurityProtocol securityProtocol = SecurityProtocol.PLAINTEXT; + List records = new ArrayList<>(); + for (int brokerId : brokerIds) { + records.add(new RegisterBrokerRecord() + .setBrokerId(brokerId) + .setRack("") + .setFenced(false) + .setBrokerEpoch(brokerEpoch) + .setEndPoints(new BrokerEndpointCollection(Collections.singletonList( + new BrokerEndpoint() + .setHost("foo") + .setPort(9092) + .setSecurityProtocol(securityProtocol.id) + .setName(ListenerName.forSecurityProtocol(securityProtocol).value()) + ).iterator()))); + } + records.addAll(topicRecords); + records.add(new PartitionRecord() + .setTopicId(topicId) + .setPartitionId(0) + .setLeader(0) + .setLeaderEpoch(0) + .setIsr(List.of(0, 1)) + .setReplicas(List.of(0))); + updateCache(cache, records); + } + + @ParameterizedTest + @MethodSource("cacheProvider") + public void testGetPartitionReplicaEndpoints(MetadataCache cache) { + SecurityProtocol securityProtocol = SecurityProtocol.PLAINTEXT; + ListenerName listenerName = ListenerName.forSecurityProtocol(securityProtocol); + + // Set up broker data for the metadata cache + int numBrokers = 10; + int fencedBrokerId = numBrokers / 3; + List brokerRecords = new ArrayList<>(); + for (int brokerId = 0; brokerId < numBrokers; brokerId++) { + brokerRecords.add(new RegisterBrokerRecord() + .setBrokerId(brokerId) + .setFenced(brokerId == fencedBrokerId) + .setRack("rack" + (brokerId % 3)) + .setEndPoints(new BrokerEndpointCollection(Collections.singletonList( + new BrokerEndpoint() + .setHost("foo" + brokerId) + .setPort(9092) + .setSecurityProtocol(securityProtocol.id) + .setName(listenerName.value()) + ).iterator()))); + } + + // Set up a single topic (with many partitions) for the metadata cache + String topic = "many-partitions-topic"; + Uuid topicId = Uuid.randomUuid(); + + // Set up a number of partitions such that each different combination of + // $replicationFactor brokers is made a replica set for exactly one partition + int replicationFactor = 3; + List> replicaSets = getAllReplicaSets(numBrokers, replicationFactor); + int numPartitions = replicaSets.size(); + + List partitionRecords = new ArrayList<>(); + for (int partitionId = 0; partitionId < numPartitions; partitionId++) { + List replicas = replicaSets.get(partitionId); + List nonFencedReplicas = new ArrayList<>(); + for (Integer id : replicas) { + if (id != fencedBrokerId) nonFencedReplicas.add(id); + } + partitionRecords.add(new PartitionRecord() + .setTopicId(topicId) + .setPartitionId(partitionId) + .setReplicas(replicas) + .setLeader(replicas.get(0)) + .setIsr(nonFencedReplicas) + .setEligibleLeaderReplicas(nonFencedReplicas)); + } + + // Load the prepared data in the metadata cache + List records = new ArrayList<>(brokerRecords); + records.add(new TopicRecord().setName(topic).setTopicId(topicId)); + records.addAll(partitionRecords); + updateCache(cache, records); + + for (int partitionId = 0; partitionId < numPartitions; partitionId++) { + TopicPartition tp = new TopicPartition(topic, partitionId); + Map brokerIdToNodeMap = + cache.getPartitionReplicaEndpoints(tp, listenerName); + Set replicaSet = brokerIdToNodeMap.keySet(); + Set expectedReplicaSet = new HashSet<>(partitionRecords.get(partitionId).replicas()); + + // Verify that we have endpoints for exactly the non-fenced brokers of the replica set + if (expectedReplicaSet.contains(fencedBrokerId)) { + Set replicaSetPlusFenced = new HashSet<>(replicaSet); + replicaSetPlusFenced.add(fencedBrokerId); + assertEquals(expectedReplicaSet, replicaSetPlusFenced, + "Unexpected partial replica set for partition " + partitionId); + } else { + assertEquals(expectedReplicaSet, replicaSet, + "Unexpected replica set for partition " + partitionId); + } + + // Verify that the endpoint data for each non-fenced replica is as expected + for (int brokerId : replicaSet) { + Node brokerNode = brokerIdToNodeMap.get(brokerId); + if (brokerNode == null) { + throw new AssertionError("No brokerNode for broker " + brokerId + " and partition " + partitionId); + } + RegisterBrokerRecord expectedBroker = brokerRecords.get(brokerId); + BrokerEndpoint expectedEndpoint = expectedBroker.endPoints().find(listenerName.value()); + assertEquals(expectedEndpoint.host(), brokerNode.host(), + "Unexpected host for broker " + brokerId + " and partition " + partitionId); + assertEquals(expectedEndpoint.port(), brokerNode.port(), + "Unexpected port for broker " + brokerId + " and partition " + partitionId); + assertEquals(expectedBroker.rack(), brokerNode.rack(), + "Unexpected rack for broker " + brokerId + " and partition " + partitionId); + } + } + + TopicPartition tp = new TopicPartition(topic, numPartitions); + Map brokerIdToNodeMap = + cache.getPartitionReplicaEndpoints(tp, listenerName); + assertTrue(brokerIdToNodeMap.isEmpty()); + } + + private static List> getAllReplicaSets(int numBrokers, int replicationFactor) { + List> result = new ArrayList<>(); + int[] indices = new int[replicationFactor]; + for (int i = 0; i < replicationFactor; i++) indices[i] = i; + while (true) { + List combo = new ArrayList<>(); + for (int idx : indices) combo.add(idx); + result.add(combo); + int i = replicationFactor - 1; + while (i >= 0 && indices[i] == numBrokers - replicationFactor + i) i--; + if (i < 0) break; + indices[i]++; + for (int j = i + 1; j < replicationFactor; j++) indices[j] = indices[j - 1] + 1; + } + return result; + } + + @Test + public void testIsBrokerFenced() { + KRaftMetadataCache metadataCache = new KRaftMetadataCache(0, () -> KRaftVersion.KRAFT_VERSION_0); + + MetadataDelta delta = new MetadataDelta.Builder().build(); + delta.replay(new RegisterBrokerRecord() + .setBrokerId(0) + .setFenced(false)); + + metadataCache.setImage(delta.apply(MetadataProvenance.EMPTY)); + + assertFalse(metadataCache.isBrokerFenced(0)); + + delta.replay(new BrokerRegistrationChangeRecord() + .setBrokerId(0) + .setFenced((byte) 1)); + + metadataCache.setImage(delta.apply(MetadataProvenance.EMPTY)); + + assertTrue(metadataCache.isBrokerFenced(0)); + } + + @Test + public void testIsBrokerInControlledShutdown() { + KRaftMetadataCache metadataCache = new KRaftMetadataCache(0, () -> KRaftVersion.KRAFT_VERSION_0); + + MetadataDelta delta = new MetadataDelta.Builder().build(); + delta.replay(new RegisterBrokerRecord() + .setBrokerId(0) + .setInControlledShutdown(false)); + + metadataCache.setImage(delta.apply(MetadataProvenance.EMPTY)); + + assertFalse(metadataCache.isBrokerShuttingDown(0)); + + delta.replay(new BrokerRegistrationChangeRecord() + .setBrokerId(0) + .setInControlledShutdown((byte) 1)); + + metadataCache.setImage(delta.apply(MetadataProvenance.EMPTY)); + + assertTrue(metadataCache.isBrokerShuttingDown(0)); + } + + @Test + public void testGetLiveBrokerEpoch() { + KRaftMetadataCache metadataCache = new KRaftMetadataCache(0, () -> KRaftVersion.KRAFT_VERSION_0); + + MetadataDelta delta = new MetadataDelta.Builder().build(); + delta.replay(new RegisterBrokerRecord() + .setBrokerId(0) + .setBrokerEpoch(100) + .setFenced(false)); + + delta.replay(new RegisterBrokerRecord() + .setBrokerId(1) + .setBrokerEpoch(101) + .setFenced(true)); + + metadataCache.setImage(delta.apply(MetadataProvenance.EMPTY)); + + assertEquals(100L, metadataCache.getAliveBrokerEpoch(0).orElse(-1L)); + assertEquals(-1L, metadataCache.getAliveBrokerEpoch(1).orElse(-1L)); + } + + @Test + public void testDescribeTopicResponse() { + KRaftMetadataCache metadataCache = new KRaftMetadataCache(0, () -> KRaftVersion.KRAFT_VERSION_0); + + SecurityProtocol securityProtocol = SecurityProtocol.PLAINTEXT; + ListenerName listenerName = ListenerName.forSecurityProtocol(securityProtocol); + String topic0 = "test0"; + String topic1 = "test1"; + + Map topicIds = new HashMap<>(); + topicIds.put(topic0, Uuid.randomUuid()); + topicIds.put(topic1, Uuid.randomUuid()); + + // partitionMap key: "topicName:partitionId" + Map partitionMap = new HashMap<>(); + partitionMap.put(topic0 + ":0", new PartitionRecord() + .setTopicId(topicIds.get(topic0)) + .setPartitionId(0) + .setReplicas(List.of(0, 1, 2)) + .setLeader(0) + .setIsr(List.of(0)) + .setEligibleLeaderReplicas(List.of(1)) + .setLastKnownElr(List.of(2)) + .setLeaderEpoch(0) + .setPartitionEpoch(1) + .setLeaderRecoveryState(LeaderRecoveryState.RECOVERED.value())); + partitionMap.put(topic0 + ":2", new PartitionRecord() + .setTopicId(topicIds.get(topic0)) + .setPartitionId(2) + .setReplicas(List.of(0, 2, 3)) + .setLeader(3) + .setIsr(List.of(3)) + .setEligibleLeaderReplicas(List.of(2)) + .setLastKnownElr(List.of(0)) + .setLeaderEpoch(1) + .setPartitionEpoch(2) + .setLeaderRecoveryState(LeaderRecoveryState.RECOVERED.value())); + partitionMap.put(topic0 + ":1", new PartitionRecord() + .setTopicId(topicIds.get(topic0)) + .setPartitionId(1) + .setReplicas(List.of(0, 1, 3)) + .setLeader(0) + .setIsr(List.of(0)) + .setEligibleLeaderReplicas(List.of(1)) + .setLastKnownElr(List.of(3)) + .setLeaderEpoch(0) + .setPartitionEpoch(2) + .setLeaderRecoveryState(LeaderRecoveryState.RECOVERED.value())); + partitionMap.put(topic1 + ":0", new PartitionRecord() + .setTopicId(topicIds.get(topic1)) + .setPartitionId(0) + .setReplicas(List.of(0, 1, 2)) + .setLeader(2) + .setIsr(List.of(2)) + .setEligibleLeaderReplicas(List.of(1)) + .setLastKnownElr(List.of(0)) + .setLeaderEpoch(10) + .setPartitionEpoch(11) + .setLeaderRecoveryState(LeaderRecoveryState.RECOVERED.value())); + + List records = new ArrayList<>(); + records.add(new RegisterBrokerRecord().setBrokerEpoch(brokerEpoch).setFenced(false).setBrokerId(0) + .setEndPoints(new BrokerEndpointCollection(Collections.singletonList( + new BrokerEndpoint().setHost("foo0").setPort(9092) + .setSecurityProtocol(securityProtocol.id).setName(listenerName.value()) + ).iterator()))); + records.add(new RegisterBrokerRecord().setBrokerEpoch(brokerEpoch).setFenced(false).setBrokerId(1) + .setEndPoints(new BrokerEndpointCollection(Collections.singletonList( + new BrokerEndpoint().setHost("foo1").setPort(9093) + .setSecurityProtocol(securityProtocol.id).setName(listenerName.value()) + ).iterator()))); + records.add(new RegisterBrokerRecord().setBrokerEpoch(brokerEpoch).setFenced(false).setBrokerId(2) + .setEndPoints(new BrokerEndpointCollection(Collections.singletonList( + new BrokerEndpoint().setHost("foo2").setPort(9094) + .setSecurityProtocol(securityProtocol.id).setName(listenerName.value()) + ).iterator()))); + records.add(new RegisterBrokerRecord().setBrokerEpoch(brokerEpoch).setFenced(false).setBrokerId(3) + .setEndPoints(new BrokerEndpointCollection(Collections.singletonList( + new BrokerEndpoint().setHost("foo3").setPort(9095) + .setSecurityProtocol(securityProtocol.id).setName(listenerName.value()) + ).iterator()))); + records.add(new TopicRecord().setName(topic0).setTopicId(topicIds.get(topic0))); + records.add(new TopicRecord().setName(topic1).setTopicId(topicIds.get(topic1))); + records.addAll(partitionMap.values()); + updateCache(metadataCache, records); + + // Basic test + List result = new ArrayList<>(metadataCache + .describeTopicResponse(List.of(topic0, topic1).iterator(), listenerName, t -> 0, 10, false) + .topics()); + assertEquals(2, result.size()); + DescribeTopicPartitionsResponseTopic resultTopic = result.get(0); + assertEquals(topic0, resultTopic.name()); + assertEquals(0, resultTopic.errorCode()); + assertEquals(topicIds.get(topic0), resultTopic.topicId()); + assertEquals(3, resultTopic.partitions().size()); + checkTopicMetadata(topic0, new HashSet<>(List.of(0, 1, 2)), resultTopic.partitions(), partitionMap); + + resultTopic = result.get(1); + assertEquals(topic1, resultTopic.name()); + assertEquals(0, resultTopic.errorCode()); + assertEquals(topicIds.get(topic1), resultTopic.topicId()); + assertEquals(1, resultTopic.partitions().size()); + checkTopicMetadata(topic1, new HashSet<>(List.of(0)), resultTopic.partitions(), partitionMap); + + // Quota reached + DescribeTopicPartitionsResponseData response = metadataCache + .describeTopicResponse(List.of(topic0, topic1).iterator(), listenerName, t -> 0, 2, false); + result = new ArrayList<>(response.topics()); + assertEquals(1, result.size()); + resultTopic = result.get(0); + assertEquals(topic0, resultTopic.name()); + assertEquals(0, resultTopic.errorCode()); + assertEquals(topicIds.get(topic0), resultTopic.topicId()); + assertEquals(2, resultTopic.partitions().size()); + checkTopicMetadata(topic0, new HashSet<>(List.of(0, 1)), resultTopic.partitions(), partitionMap); + assertEquals(topic0, response.nextCursor().topicName()); + assertEquals(2, response.nextCursor().partitionIndex()); + + // With start index + result = new ArrayList<>(metadataCache + .describeTopicResponse(List.of(topic0).iterator(), listenerName, + t -> t.equals(topic0) ? 1 : 0, 10, false) + .topics()); + assertEquals(1, result.size()); + resultTopic = result.get(0); + assertEquals(topic0, resultTopic.name()); + assertEquals(0, resultTopic.errorCode()); + assertEquals(topicIds.get(topic0), resultTopic.topicId()); + assertEquals(2, resultTopic.partitions().size()); + checkTopicMetadata(topic0, new HashSet<>(List.of(1, 2)), resultTopic.partitions(), partitionMap); + + // With start index and quota reached + response = metadataCache.describeTopicResponse(List.of(topic0, topic1).iterator(), listenerName, + t -> t.equals(topic0) ? 2 : 0, 1, false); + result = new ArrayList<>(response.topics()); + assertEquals(1, result.size()); + resultTopic = result.get(0); + assertEquals(topic0, resultTopic.name()); + assertEquals(0, resultTopic.errorCode()); + assertEquals(topicIds.get(topic0), resultTopic.topicId()); + assertEquals(1, resultTopic.partitions().size()); + checkTopicMetadata(topic0, new HashSet<>(List.of(2)), resultTopic.partitions(), partitionMap); + assertEquals(topic1, response.nextCursor().topicName()); + assertEquals(0, response.nextCursor().partitionIndex()); + + // When the first topic does not exist + result = new ArrayList<>(metadataCache.describeTopicResponse(List.of("Non-exist", topic0).iterator(), listenerName, + t -> t.equals("Non-exist") ? 1 : 0, 1, false).topics()); + assertEquals(2, result.size()); + resultTopic = result.get(0); + assertEquals("Non-exist", resultTopic.name()); + assertEquals(3, resultTopic.errorCode()); + + resultTopic = result.get(1); + assertEquals(topic0, resultTopic.name()); + assertEquals(0, resultTopic.errorCode()); + assertEquals(topicIds.get(topic0), resultTopic.topicId()); + assertEquals(1, resultTopic.partitions().size()); + checkTopicMetadata(topic0, new HashSet<>(List.of(0)), resultTopic.partitions(), partitionMap); + } + + private void checkTopicMetadata( + String topic, + Set partitionIds, + List partitions, + Map partitionMap + ) { + for (DescribeTopicPartitionsResponsePartition partition : partitions) { + int partitionId = partition.partitionIndex(); + assertTrue(partitionIds.contains(partitionId)); + PartitionRecord expectedPartition = partitionMap.get(topic + ":" + partitionId); + assertEquals(0, partition.errorCode()); + assertEquals(expectedPartition.leaderEpoch(), partition.leaderEpoch()); + assertEquals(expectedPartition.partitionId(), partition.partitionIndex()); + assertEquals(expectedPartition.eligibleLeaderReplicas(), partition.eligibleLeaderReplicas()); + assertEquals(expectedPartition.isr(), partition.isrNodes()); + assertEquals(expectedPartition.lastKnownElr(), partition.lastKnownElr()); + assertEquals(expectedPartition.leader(), partition.leaderId()); + } + } + + @ParameterizedTest + @MethodSource("cacheProvider") + public void testGetLeaderAndIsr(MetadataCache cache) { + String topic = "topic"; + Uuid topicId = Uuid.randomUuid(); + int partitionIndex = 0; + int leader = 0; + int leaderEpoch = 0; + List isr = List.of(2, 3, 0); + List replicas = List.of(2, 3, 0, 1, 4); + + SecurityProtocol securityProtocol = SecurityProtocol.PLAINTEXT; + ListenerName listenerName = ListenerName.forSecurityProtocol(securityProtocol); + + List records = new ArrayList<>(); + records.add(new RegisterBrokerRecord() + .setBrokerId(0) + .setBrokerEpoch(brokerEpoch) + .setRack("rack1") + .setEndPoints(new BrokerEndpointCollection(Collections.singletonList( + new BrokerEndpoint() + .setHost("foo") + .setPort(9092) + .setSecurityProtocol(securityProtocol.id) + .setName(listenerName.value()) + ).iterator()))); + records.add(new TopicRecord().setName(topic).setTopicId(topicId)); + records.add(new PartitionRecord() + .setTopicId(topicId) + .setPartitionId(partitionIndex) + .setLeader(leader) + .setLeaderEpoch(leaderEpoch) + .setIsr(isr) + .setReplicas(replicas)); + updateCache(cache, records); + + Optional leaderAndIsr = cache.getLeaderAndIsr(topic, partitionIndex); + assertEquals(Optional.of(leader), leaderAndIsr.map(LeaderAndIsr::leader)); + assertEquals(Optional.of(leaderEpoch), leaderAndIsr.map(LeaderAndIsr::leaderEpoch)); + assertEquals(Optional.of(new HashSet<>(isr)), leaderAndIsr.map(lai -> new HashSet<>(lai.isr()))); + assertEquals(Optional.of(-1), leaderAndIsr.map(LeaderAndIsr::partitionEpoch)); + assertEquals(Optional.of(LeaderRecoveryState.RECOVERED), leaderAndIsr.map(LeaderAndIsr::leaderRecoveryState)); + } + + @Test + public void testGetOfflineReplicasConsidersDirAssignment() { + Map> result = offlinePartitions( + List.of( + new BrokerInfo(0, List.of(Uuid.fromString("broker1logdirjEo71BG0w"))), + new BrokerInfo(1, List.of(Uuid.fromString("broker2logdirRmQQgLxgw"))) + ), + List.of( + new PartitionInfo2(0, List.of(0, 1), + List.of(Uuid.fromString("broker1logdirjEo71BG0w"), DirectoryId.LOST)), + new PartitionInfo2(1, List.of(0, 1), + List.of(Uuid.fromString("unknownlogdirjEo71BG0w"), DirectoryId.UNASSIGNED)), + new PartitionInfo2(2, List.of(0, 1), + List.of(DirectoryId.MIGRATING, Uuid.fromString("broker2logdirRmQQgLxgw"))) + ) + ); + + Map> expected = new HashMap<>(); + expected.put(0, List.of(1)); + expected.put(1, List.of(0)); + expected.put(2, Collections.emptyList()); + assertEquals(expected, result); + } + + private static class BrokerInfo { + final int id; + final List dirs; + + BrokerInfo(int id, List dirs) { + this.id = id; + this.dirs = dirs; + } + } + + private static class PartitionInfo2 { + final int id; + final List replicas; + final List dirs; + + PartitionInfo2(int id, List replicas, List dirs) { + this.id = id; + this.replicas = replicas; + this.dirs = dirs; + } + } + + private static Map> offlinePartitions( + List brokers, + List partitions + ) { + MetadataDelta delta = new MetadataDelta.Builder().build(); + for (BrokerInfo broker : brokers) { + delta.replay(new RegisterBrokerRecord() + .setFenced(false) + .setBrokerId(broker.id) + .setLogDirs(broker.dirs) + .setEndPoints(new BrokerEndpointCollection(Collections.singleton( + new BrokerEndpoint() + .setSecurityProtocol(SecurityProtocol.PLAINTEXT.id) + .setPort((short) 9093) + .setName("PLAINTEXT") + .setHost("broker-" + broker.id) + ).iterator()))); + } + Uuid topicId = Uuid.fromString("95OVr1IPRYGrcNCLlpImCA"); + delta.replay(new TopicRecord().setTopicId(topicId).setName("foo")); + for (PartitionInfo2 partition : partitions) { + delta.replay(new PartitionRecord() + .setTopicId(topicId) + .setPartitionId(partition.id) + .setReplicas(partition.replicas) + .setDirectories(partition.dirs) + .setLeader(partition.replicas.get(0)) + .setIsr(partition.replicas)); + } + KRaftMetadataCache cache = new KRaftMetadataCache(1, () -> KRaftVersion.KRAFT_VERSION_0); + cache.setImage(delta.apply(MetadataProvenance.EMPTY)); + List topicMetadata = + cache.getTopicMetadata(Set.of("foo"), ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT), false, false); + Map> result = new HashMap<>(); + for (MetadataResponseData.MetadataResponsePartition p : topicMetadata.get(0).partitions()) { + result.put(p.partitionIndex(), p.offlineReplicas()); + } + return result; + } +}