diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala index d5e9014905222..a4afc33211f1b 100644 --- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala @@ -4177,11 +4177,11 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest { } def removeAllClientAcls(): Unit = { - val authorizerForWrite = TestUtils.pickAuthorizerForWrite(brokers, controllerServers) + val authorizerForWrite = pickAuthorizerForWrite(brokers, controllerServers) val aclEntryFilter = new AccessControlEntryFilter(clientPrincipalString, null, AclOperation.ANY, AclPermissionType.ANY) val aclFilter = new AclBindingFilter(ResourcePatternFilter.ANY, aclEntryFilter) - authorizerForWrite.deleteAcls(TestUtils.anonymousAuthorizableContext, java.util.List.of(aclFilter)).asScala. + authorizerForWrite.deleteAcls(anonymousAuthorizableContext, java.util.List.of(aclFilter)).asScala. map(_.toCompletableFuture.get).flatMap { deletion => deletion.aclBindingDeleteResults().asScala.map(_.aclBinding.pattern).toSet }.foreach { resource => diff --git a/core/src/test/scala/integration/kafka/api/BaseAdminIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/BaseAdminIntegrationTest.scala index 16dec9dc00800..5760042bbd514 100644 --- a/core/src/test/scala/integration/kafka/api/BaseAdminIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/BaseAdminIntegrationTest.scala @@ -58,7 +58,10 @@ abstract class BaseAdminIntegrationTest extends IntegrationTestHarness with Logg override def setUp(testInfo: TestInfo): Unit = { this.testInfo = testInfo super.setUp(testInfo) - waitUntilBrokerMetadataIsPropagated(brokers) + val expectedBrokerIds = brokers.map(_.config.brokerId).toSet + waitUntilTrue(() => brokers.forall(server => + expectedBrokerIds.forall(server.dataPlaneRequestProcessor.metadataCache.hasAliveBroker(_)) + ), "Timed out waiting for broker metadata to propagate to all servers", 15000) } @AfterEach diff --git a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala index 97adc98cee47c..2427b2f11e148 100644 --- a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala @@ -70,7 +70,7 @@ import scala.collection.Seq import scala.concurrent.duration.Duration import scala.concurrent.{Await, Future} import scala.jdk.CollectionConverters._ -import scala.util.{Random, Using} +import scala.util.{Failure, Random, Success, Try, Using} /** * An integration test of the KafkaAdminClient. 
@@ -1552,7 +1552,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { val result1 = client.deleteRecords(util.Map.of(topicPartition, RecordsToDelete.beforeOffset(117L))) result1.all().get() restartDeadBrokers() - TestUtils.waitForBrokersInIsr(client, topicPartition, Set(followerIndex)) + waitForBrokersInIsr(client, topicPartition, Set(followerIndex)) waitForFollowerLog(expectedStartOffset=117L, expectedEndOffset=200L) } @@ -1675,7 +1675,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { // delete records in corrupt segment (the first segment) client.deleteRecords(util.Map.of(topicPartition, RecordsToDelete.beforeOffset(firstSegmentRecordsSize))).all.get // verify reassignment is finished after delete records - TestUtils.waitForBrokersInIsr(client, topicPartition, Set(partitionLeaderId, partitionFollowerId)) + waitForBrokersInIsr(client, topicPartition, Set(partitionLeaderId, partitionFollowerId)) // seek to beginning and make sure we can consume all records consumer.seekToBeginning(util.List.of(topicPartition)) assertEquals(19, TestUtils.consumeRecords(consumer, 20 - firstSegmentRecordsSize).last.offset()) @@ -3152,25 +3152,25 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { s"Expected preferred leader to become $preferred, but is ${preferredLeader(partition1)} and ${preferredLeader(partition2)}", 10000) // Check the leader hasn't moved - TestUtils.assertLeader(client, partition1, prior1) - TestUtils.assertLeader(client, partition2, prior2) + assertLeader(client, partition1, prior1) + assertLeader(client, partition2, prior2) } // Check current leaders are 0 - TestUtils.assertLeader(client, partition1, 0) - TestUtils.assertLeader(client, partition2, 0) + assertLeader(client, partition1, 0) + assertLeader(client, partition2, 0) // Noop election var electResult = client.electLeaders(ElectionType.PREFERRED, util.Set.of(partition1)) val exception = electResult.partitions.get.get(partition1).get assertEquals(classOf[ElectionNotNeededException], exception.getClass) - TestUtils.assertLeader(client, partition1, 0) + assertLeader(client, partition1, 0) // Noop election with null partitions electResult = client.electLeaders(ElectionType.PREFERRED, null) assertTrue(electResult.partitions.get.isEmpty) - TestUtils.assertLeader(client, partition1, 0) - TestUtils.assertLeader(client, partition2, 0) + assertLeader(client, partition1, 0) + assertLeader(client, partition2, 0) // Now change the preferred leader to 1 waitForBrokerMetadataPropagation(partition1) @@ -3182,18 +3182,18 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { assertEquals(util.Set.of(partition1), electResult.partitions.get.keySet) electResult.partitions.get.get(partition1) .ifPresent(t => fail(s"Unexpected exception during leader election: $t for partition $partition1")) - TestUtils.assertLeader(client, partition1, 1) + assertLeader(client, partition1, 1) // topic 2 unchanged assertFalse(electResult.partitions.get.containsKey(partition2)) - TestUtils.assertLeader(client, partition2, 0) + assertLeader(client, partition2, 0) // meaningful election with null partitions electResult = client.electLeaders(ElectionType.PREFERRED, null) assertEquals(Set(partition2), electResult.partitions.get.keySet.asScala) electResult.partitions.get.get(partition2) .ifPresent(t => fail(s"Unexpected exception during leader election: $t for partition $partition2")) - TestUtils.assertLeader(client, partition2, 1) + assertLeader(client, partition2, 1) def 
assertUnknownTopicOrPartition( topicPartition: TopicPartition, @@ -3209,8 +3209,8 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { electResult = client.electLeaders(ElectionType.PREFERRED, util.Set.of(unknownPartition)) assertEquals(util.Set.of(unknownPartition), electResult.partitions.get.keySet) assertUnknownTopicOrPartition(unknownPartition, electResult) - TestUtils.assertLeader(client, partition1, 1) - TestUtils.assertLeader(client, partition2, 1) + assertLeader(client, partition1, 1) + assertLeader(client, partition2, 1) // Now change the preferred leader to 2 waitForBrokerMetadataPropagation(partition1) @@ -3220,15 +3220,15 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { // mixed results electResult = client.electLeaders(ElectionType.PREFERRED, util.Set.of(unknownPartition, partition1)) assertEquals(util.Set.of(unknownPartition, partition1), electResult.partitions.get.keySet) - TestUtils.assertLeader(client, partition1, 2) - TestUtils.assertLeader(client, partition2, 1) + assertLeader(client, partition1, 2) + assertLeader(client, partition2, 1) assertUnknownTopicOrPartition(unknownPartition, electResult) // elect preferred leader for partition 2 electResult = client.electLeaders(ElectionType.PREFERRED, util.Set.of(partition2)) assertEquals(util.Set.of(partition2), electResult.partitions.get.keySet) assertFalse(electResult.partitions.get.get(partition2).isPresent) - TestUtils.assertLeader(client, partition2, 2) + assertLeader(client, partition2, 2) // Now change the preferred leader to 1 waitForBrokerMetadataPropagation(partition1) @@ -3238,7 +3238,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { killBroker(1) waitForBrokerMetadataPropagation(partition1) waitForBrokerMetadataPropagation(partition2) - TestUtils.waitForBrokersOutOfIsr(client, Set(partition1, partition2), Set(1)) + waitForBrokersOutOfIsr(client, Set(partition1, partition2), Set(1)) def assertPreferredLeaderNotAvailable( topicPartition: TopicPartition, @@ -3257,17 +3257,17 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { assertEquals(util.Set.of(partition1), electResult.partitions.get.keySet) assertPreferredLeaderNotAvailable(partition1, electResult) - TestUtils.assertLeader(client, partition1, 2) + assertLeader(client, partition1, 2) // preferred leader unavailable with null argument electResult = client.electLeaders(ElectionType.PREFERRED, null, shortTimeout) assertTrue(Set(partition1, partition2).subsetOf(electResult.partitions.get.keySet.asScala)) assertPreferredLeaderNotAvailable(partition1, electResult) - TestUtils.assertLeader(client, partition1, 2) + assertLeader(client, partition1, 2) assertPreferredLeaderNotAvailable(partition2, electResult) - TestUtils.assertLeader(client, partition2, 2) + assertLeader(client, partition2, 2) } @Test @@ -3283,19 +3283,19 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { val partition1 = new TopicPartition("unclean-test-topic-1", 0) createTopicWithAssignment(partition1.topic, Map[Int, Seq[Int]](partition1.partition -> assignment1)) - TestUtils.assertLeader(client, partition1, broker1) + assertLeader(client, partition1, broker1) killBroker(broker2) - TestUtils.waitForBrokersOutOfIsr(client, Set(partition1), Set(broker2)) + waitForBrokersOutOfIsr(client, Set(partition1), Set(broker2)) killBroker(broker1) - TestUtils.assertNoLeader(client, partition1) + assertNoLeader(client, partition1) brokers(broker2).startup() - TestUtils.waitForOnlineBroker(client, 
broker2) + waitForOnlineBroker(client, broker2) val electResult = client.electLeaders(ElectionType.UNCLEAN, util.Set.of(partition1)) electResult.partitions.get.get(partition1) .ifPresent(t => fail(s"Unexpected exception during leader election: $t for partition $partition1")) - TestUtils.assertLeader(client, partition1, broker2) + assertLeader(client, partition1, broker2) } @Test @@ -3318,24 +3318,24 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { Map(partition1.partition -> assignment1, partition2.partition -> assignment2) ) - TestUtils.assertLeader(client, partition1, broker1) - TestUtils.assertLeader(client, partition2, broker1) + assertLeader(client, partition1, broker1) + assertLeader(client, partition2, broker1) killBroker(broker2) - TestUtils.waitForBrokersOutOfIsr(client, Set(partition1, partition2), Set(broker2)) + waitForBrokersOutOfIsr(client, Set(partition1, partition2), Set(broker2)) killBroker(broker1) - TestUtils.assertNoLeader(client, partition1) - TestUtils.assertNoLeader(client, partition2) + assertNoLeader(client, partition1) + assertNoLeader(client, partition2) brokers(broker2).startup() - TestUtils.waitForOnlineBroker(client, broker2) + waitForOnlineBroker(client, broker2) val electResult = client.electLeaders(ElectionType.UNCLEAN, util.Set.of(partition1, partition2)) electResult.partitions.get.get(partition1) .ifPresent(t => fail(s"Unexpected exception during leader election: $t for partition $partition1")) electResult.partitions.get.get(partition2) .ifPresent(t => fail(s"Unexpected exception during leader election: $t for partition $partition2")) - TestUtils.assertLeader(client, partition1, broker2) - TestUtils.assertLeader(client, partition2, broker2) + assertLeader(client, partition1, broker2) + assertLeader(client, partition2, broker2) } @Test @@ -3359,23 +3359,23 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { Map(partition1.partition -> assignment1, partition2.partition -> assignment2) ) - TestUtils.assertLeader(client, partition1, broker1) - TestUtils.assertLeader(client, partition2, broker1) + assertLeader(client, partition1, broker1) + assertLeader(client, partition2, broker1) killBroker(broker2) - TestUtils.waitForBrokersOutOfIsr(client, Set(partition1), Set(broker2)) + waitForBrokersOutOfIsr(client, Set(partition1), Set(broker2)) killBroker(broker1) - TestUtils.assertNoLeader(client, partition1) - TestUtils.assertLeader(client, partition2, broker3) + assertNoLeader(client, partition1) + assertLeader(client, partition2, broker3) brokers(broker2).startup() - TestUtils.waitForOnlineBroker(client, broker2) + waitForOnlineBroker(client, broker2) val electResult = client.electLeaders(ElectionType.UNCLEAN, null) electResult.partitions.get.get(partition1) .ifPresent(t => fail(s"Unexpected exception during leader election: $t for partition $partition1")) assertFalse(electResult.partitions.get.containsKey(partition2)) - TestUtils.assertLeader(client, partition1, broker2) - TestUtils.assertLeader(client, partition2, broker3) + assertLeader(client, partition1, broker2) + assertLeader(client, partition2, broker3) } @Test @@ -3397,7 +3397,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { Map(0 -> assignment1) ) - TestUtils.assertLeader(client, new TopicPartition(topic, 0), broker1) + assertLeader(client, new TopicPartition(topic, 0), broker1) val electResult = client.electLeaders(ElectionType.UNCLEAN, util.Set.of(unknownPartition, unknownTopic)) 
assertTrue(electResult.partitions.get.get(unknownPartition).get.isInstanceOf[UnknownTopicOrPartitionException]) @@ -3422,12 +3422,12 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { Map(partition1.partition -> assignment1) ) - TestUtils.assertLeader(client, partition1, broker1) + assertLeader(client, partition1, broker1) killBroker(broker2) - TestUtils.waitForBrokersOutOfIsr(client, Set(partition1), Set(broker2)) + waitForBrokersOutOfIsr(client, Set(partition1), Set(broker2)) killBroker(broker1) - TestUtils.assertNoLeader(client, partition1) + assertNoLeader(client, partition1) val electResult = client.electLeaders(ElectionType.UNCLEAN, util.Set.of(partition1)) assertTrue(electResult.partitions.get.get(partition1).get.isInstanceOf[EligibleLeadersNotAvailableException]) @@ -3451,10 +3451,10 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { Map(partition1.partition -> assignment1) ) - TestUtils.assertLeader(client, partition1, broker1) + assertLeader(client, partition1, broker1) killBroker(broker1) - TestUtils.assertLeader(client, partition1, broker2) + assertLeader(client, partition1, broker2) brokers(broker1).startup() val electResult = client.electLeaders(ElectionType.UNCLEAN, util.Set.of(partition1)) @@ -3482,23 +3482,23 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { Map(partition1.partition -> assignment1, partition2.partition -> assignment2) ) - TestUtils.assertLeader(client, partition1, broker1) - TestUtils.assertLeader(client, partition2, broker1) + assertLeader(client, partition1, broker1) + assertLeader(client, partition2, broker1) killBroker(broker2) - TestUtils.waitForBrokersOutOfIsr(client, Set(partition1), Set(broker2)) + waitForBrokersOutOfIsr(client, Set(partition1), Set(broker2)) killBroker(broker1) - TestUtils.assertNoLeader(client, partition1) - TestUtils.assertLeader(client, partition2, broker3) + assertNoLeader(client, partition1) + assertLeader(client, partition2, broker3) brokers(broker2).startup() - TestUtils.waitForOnlineBroker(client, broker2) + waitForOnlineBroker(client, broker2) val electResult = client.electLeaders(ElectionType.UNCLEAN, util.Set.of(partition1, partition2)) electResult.partitions.get.get(partition1) .ifPresent(t => fail(s"Unexpected exception during leader election: $t for partition $partition1")) assertTrue(electResult.partitions.get.get(partition2).get.isInstanceOf[ElectionNotNeededException]) - TestUtils.assertLeader(client, partition1, broker2) - TestUtils.assertLeader(client, partition2, broker3) + assertLeader(client, partition1, broker2) + assertLeader(client, partition2, broker3) } @Test @@ -5061,4 +5061,78 @@ object PlaintextAdminIntegrationTest { assertEquals(ServerLogConfigs.COMPRESSION_TYPE_DEFAULT, configs.get(brokerResource).get(ServerConfigs.COMPRESSION_TYPE_CONFIG).value) } + + private def waitForBrokersInIsr(client: Admin, partition: TopicPartition, brokerIds: Set[Int]): Unit = { + waitUntilTrue( + () => { + val isr = client.describeTopics(util.Set.of(partition.topic)) + .allTopicNames + .get + .asScala + .values + .flatMap(_.partitions.asScala.flatMap(_.isr.asScala)) + .map(_.id) + .toSet + brokerIds.subsetOf(isr) + }, + s"Expected brokers $brokerIds to be in the ISR for $partition" + ) + } + + private def waitForBrokersOutOfIsr(client: Admin, partition: Set[TopicPartition], brokerIds: Set[Int]): Unit = { + waitUntilTrue( + () => { + val description = client.describeTopics(partition.map(_.topic).asJava).allTopicNames.get.asScala + val isr = description + 
.values + .flatMap(_.partitions.asScala.flatMap(_.isr.asScala)) + .map(_.id) + .toSet + + brokerIds.intersect(isr).isEmpty + }, + s"Expected brokers $brokerIds to no longer be in the ISR for $partition" + ) + } + + private def waitForOnlineBroker(client: Admin, brokerId: Int): Unit = { + waitUntilTrue(() => { + val nodes = client.describeCluster().nodes().get() + nodes.asScala.exists(_.id == brokerId) + }, s"Timed out waiting for brokerId $brokerId to come online") + } + + private def assertLeader(client: Admin, topicPartition: TopicPartition, expectedLeader: Int): Unit = { + waitForLeaderToBecome(client, topicPartition, Some(expectedLeader)) + } + + private def assertNoLeader(client: Admin, topicPartition: TopicPartition): Unit = { + waitForLeaderToBecome(client, topicPartition, None) + } + + private def waitForLeaderToBecome( + client: Admin, + topicPartition: TopicPartition, + expectedLeaderOpt: Option[Int] + ): Unit = { + val topic = topicPartition.topic + val partitionId = topicPartition.partition + + def currentLeader: Try[Option[Int]] = Try { + val topicDescription = client.describeTopics(util.List.of(topic)).allTopicNames.get.get(topic) + topicDescription.partitions.asScala + .find(_.partition == partitionId) + .flatMap(partitionState => Option(partitionState.leader)) + .map(_.id) + } + + val (lastLeaderCheck, isLeaderElected) = computeUntilTrue(currentLeader) { + case Success(leaderOpt) => leaderOpt == expectedLeaderOpt + case Failure(e: ExecutionException) if e.getCause.isInstanceOf[UnknownTopicOrPartitionException] => false + case Failure(e) => throw e + } + + assertTrue(isLeaderElected, s"Timed out waiting for leader to become $expectedLeaderOpt. " + + s"Last metadata lookup returned leader = ${lastLeaderCheck.getOrElse("unknown")}") + } } diff --git a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala index 7f37eeb25a15d..4b6690a4fdc45 100755 --- a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala +++ b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala @@ -24,14 +24,16 @@ import org.apache.kafka.common.acl.{AccessControlEntry, AccessControlEntryFilter import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.quota.{ClientQuotaAlteration, ClientQuotaEntity} import org.apache.kafka.common.resource.ResourcePattern -import org.apache.kafka.common.security.auth.SecurityProtocol +import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol} import org.apache.kafka.common.security.scram.ScramCredential import org.apache.kafka.common.utils.Time import org.apache.kafka.common.{KafkaException, Uuid} import org.apache.kafka.controller.ControllerRequestContextUtil.ANONYMOUS_CONTEXT +import org.apache.kafka.server.authorizer.{AuthorizableRequestContext, Authorizer => JAuthorizer} import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo} import java.io.File +import java.net.InetAddress import java.time.Duration import java.util import java.util.Properties @@ -214,6 +216,17 @@ abstract class KafkaServerTestHarness extends QuorumTestHarness { } } + val anonymousAuthorizableContext = new AuthorizableRequestContext() { + override def listenerName(): String = "" + override def securityProtocol(): SecurityProtocol = SecurityProtocol.PLAINTEXT + override def principal(): KafkaPrincipal = KafkaPrincipal.ANONYMOUS + override def clientAddress(): InetAddress = null + override def requestType(): Int = 0 + override def 
requestVersion(): Int = 0 + override def clientId(): String = "" + override def correlationId(): Int = 0 + } + def addAndVerifyAcls(acls: Set[AccessControlEntry], resource: ResourcePattern): Unit = { val authorizerForWrite = pickAuthorizerForWrite(brokers, controllerServers) val aclBindings = acls.map { acl => new AclBinding(resource, acl) } @@ -371,4 +384,20 @@ abstract class KafkaServerTestHarness extends QuorumTestHarness { } } } + + /** + * Find an Authorizer that we can call createAcls or deleteAcls on. + */ + def pickAuthorizerForWrite[B <: KafkaBroker](brokers: Seq[B], controllers: Seq[ControllerServer]): JAuthorizer = { + if (controllers.isEmpty) { + brokers.head.authorizerPlugin.get.get + } else { + var result: JAuthorizer = null + TestUtils.retry(120000) { + val active = controllers.filter(_.controller.isActive).head + result = active.authorizerPlugin.get.get + } + result + } + } } diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala index c51cba6a66fb4..0d689e7f6cb41 100644 --- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala @@ -1229,7 +1229,7 @@ class LogCleanerTest extends Logging { // the last (active) segment has just one message - def distinctValuesBySegment = log.logSegments.asScala.map(s => s.log.records.asScala.map(record => TestUtils.readString(record.value)).toSet.size).toSeq + def distinctValuesBySegment = log.logSegments.asScala.map(s => s.log.records.asScala.map(record => StandardCharsets.UTF_8.decode(record.value()).toString).toSet.size).toSeq val distinctValuesBySegmentBeforeClean = distinctValuesBySegment assertTrue(distinctValuesBySegment.reverse.tail.forall(_ > N), @@ -1927,7 +1927,7 @@ class LogCleanerTest extends Logging { for (segment <- log.logSegments.asScala; batch <- segment.log.batches.asScala; record <- batch.asScala) { assertTrue(record.hasMagic(batch.magic)) - val value = TestUtils.readString(record.value).toLong + val value = StandardCharsets.UTF_8.decode(record.value()).toString.toLong assertEquals(record.offset, value) } } @@ -1947,7 +1947,7 @@ class LogCleanerTest extends Logging { for (logEntry <- records.records.asScala) { val offset = logEntry.offset - val value = TestUtils.readString(logEntry.value).toLong + val value = StandardCharsets.UTF_8.decode(logEntry.value()).toString.toLong assertEquals(offset, value) } } diff --git a/core/src/test/scala/unit/kafka/log/LogTestUtils.scala b/core/src/test/scala/unit/kafka/log/LogTestUtils.scala index 9875a3b2ce441..cb46f74bdc4ca 100644 --- a/core/src/test/scala/unit/kafka/log/LogTestUtils.scala +++ b/core/src/test/scala/unit/kafka/log/LogTestUtils.scala @@ -19,7 +19,6 @@ package kafka.log import java.io.File import java.util.Properties -import kafka.utils.TestUtils import org.apache.kafka.common.Uuid import org.apache.kafka.common.compress.Compression import org.apache.kafka.common.record.internal.{ControlRecordType, EndTransactionMarker, FileRecords, MemoryRecords, RecordBatch, SimpleRecord} @@ -38,9 +37,10 @@ import org.apache.kafka.server.storage.log.FetchIsolation import org.apache.kafka.server.util.Scheduler import org.apache.kafka.storage.internals.log.LogConfig.{DEFAULT_REMOTE_LOG_COPY_DISABLE_CONFIG, DEFAULT_REMOTE_LOG_DELETE_ON_DISABLE_CONFIG} import org.apache.kafka.common.message.AbortedTxn -import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchDataInfo, LazyIndex, LogAppendInfo, LogConfig, LogDirFailureChannel, LogFileUtils, 
LogOffsetsListener, LogSegment, ProducerStateManager, ProducerStateManagerConfig, TransactionIndex, VerificationGuard, UnifiedLog} +import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchDataInfo, LazyIndex, LogAppendInfo, LogConfig, LogDirFailureChannel, LogFileUtils, LogOffsetsListener, LogSegment, ProducerStateManager, ProducerStateManagerConfig, TransactionIndex, UnifiedLog, VerificationGuard} import org.apache.kafka.storage.log.metrics.BrokerTopicStats +import java.nio.charset.StandardCharsets import scala.jdk.CollectionConverters._ import scala.jdk.OptionConverters.RichOption @@ -202,7 +202,7 @@ object LogTestUtils { for (logSegment <- log.logSegments.asScala; batch <- logSegment.log.batches.asScala if !batch.isControlBatch; record <- batch.asScala if record.hasValue && record.hasKey) - yield TestUtils.readString(record.key).toLong + yield StandardCharsets.UTF_8.decode(record.key()).toString.toLong } def recoverAndCheck(logDir: File, config: LogConfig, expectedKeys: Iterable[Long], brokerTopicStats: BrokerTopicStats, time: Time, scheduler: Scheduler): UnifiedLog = { @@ -308,4 +308,5 @@ object LogTestUtils { sequence += numRecords } } + } diff --git a/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala b/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala index 770ec43211855..e30531779c20e 100644 --- a/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala +++ b/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala @@ -36,11 +36,14 @@ import org.apache.kafka.common.metrics.JmxReporter import org.apache.kafka.common.utils.Time import org.apache.kafka.server.config.ServerLogConfigs import org.apache.kafka.server.metrics.{KafkaMetricsGroup, KafkaYammerMetrics, LinuxIoMetricsCollector} +import org.apache.kafka.storage.internals.checkpoint.OffsetCheckpointFile +import org.apache.kafka.storage.internals.log.UnifiedLog import org.apache.kafka.storage.log.metrics.BrokerTopicMetrics import org.junit.jupiter.api.{Test, Timeout} import org.junit.jupiter.params.ParameterizedTest import org.junit.jupiter.params.provider.MethodSource +import java.io.File import scala.jdk.OptionConverters.RichOptional @Timeout(120) @@ -64,7 +67,7 @@ class MetricsTest extends KafkaServerTestHarness with Logging { val topic = "test-topic-metric" createTopic(topic) deleteTopic(topic) - TestUtils.verifyTopicDeletion(topic, 1, brokers) + verifyTopicDeletion(topic, brokers) assertEquals(Set.empty, topicMetricGroups(topic), "Topic metrics exists after deleteTopic") } @@ -78,7 +81,7 @@ class MetricsTest extends KafkaServerTestHarness with Logging { assertTrue(topicMetricGroups(topic).nonEmpty, "Topic metrics don't exist") brokers.foreach(b => assertNotNull(b.brokerTopicStats.topicStats(topic))) deleteTopic(topic) - TestUtils.verifyTopicDeletion(topic, 1, brokers) + verifyTopicDeletion(topic, brokers) assertEquals(Set.empty, topicMetricGroups(topic), "Topic metrics exists after deleteTopic") } @@ -244,4 +247,41 @@ class MetricsTest extends KafkaServerTestHarness with Logging { val pattern = (".*BrokerTopicMetrics.*" + topic.map(t => s"($t)$$").getOrElse("")).r.pattern metrics.filter(pattern.matcher(_).matches()) } + + private def verifyTopicDeletion[B <: KafkaBroker](topic: String, brokers: Seq[B]): Unit = { + val topicPartitions = (0 until 1).map(new TopicPartition(topic, _)) + // ensure that the topic-partition has been deleted from all brokers' replica managers + TestUtils.waitUntilTrue(() => + brokers.forall(broker => topicPartitions.forall(tp => 
broker.replicaManager.onlinePartition(tp).isEmpty)), + "Replica manager's should have deleted all of this topic's partitions") + // ensure that logs from all replicas are deleted + TestUtils.waitUntilTrue(() => brokers.forall(broker => topicPartitions.forall(tp => broker.logManager.getLog(tp).isEmpty)), + "Replica logs not deleted after delete topic is complete") + // ensure that topic is removed from all cleaner offsets + TestUtils.waitUntilTrue(() => brokers.forall(broker => topicPartitions.forall { tp => + val checkpoints = broker.logManager.liveLogDirs.asScala.map { logDir => + new OffsetCheckpointFile(new File(logDir, "cleaner-offset-checkpoint"), null).read() + } + checkpoints.forall(checkpointsPerLogDir => !checkpointsPerLogDir.containsKey(tp)) + }), "Cleaner offset for deleted partition should have been removed") + TestUtils.waitUntilTrue(() => brokers.forall(broker => + broker.config.logDirs.stream().allMatch { logDir => + topicPartitions.forall { tp => + !new File(logDir, tp.topic + "-" + tp.partition).exists() + } + } + ), "Failed to soft-delete the data to a delete directory") + TestUtils.waitUntilTrue(() => brokers.forall(broker => + broker.config.logDirs.stream().allMatch { logDir => + topicPartitions.forall { tp => + !util.List.of(new File(logDir).list()).asScala.exists { partitionDirectoryNames => + partitionDirectoryNames.exists { directoryName => + directoryName.startsWith(tp.topic + "-" + tp.partition) && + directoryName.endsWith(UnifiedLog.DELETE_DIR_SUFFIX) + } + } + } + } + ), "Failed to hard-delete the delete directory") + } } diff --git a/core/src/test/scala/unit/kafka/network/RequestChannelTest.scala b/core/src/test/scala/unit/kafka/network/RequestChannelTest.scala index df10430ad7246..d96359342a973 100644 --- a/core/src/test/scala/unit/kafka/network/RequestChannelTest.scala +++ b/core/src/test/scala/unit/kafka/network/RequestChannelTest.scala @@ -20,7 +20,6 @@ package kafka.network import com.fasterxml.jackson.databind.ObjectMapper import kafka.network import kafka.server.EnvelopeUtils -import kafka.utils.TestUtils import org.apache.kafka.clients.admin.AlterConfigOp.OpType import org.apache.kafka.common.config.types.Password import org.apache.kafka.common.config.{ConfigResource, SaslConfigs, SslConfigs, TopicConfig} @@ -29,7 +28,7 @@ import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData._ import org.apache.kafka.common.message.{CreateTopicsRequestData, CreateTopicsResponseData, IncrementalAlterConfigsRequestData} import org.apache.kafka.common.network.{ClientInformation, ListenerName} -import org.apache.kafka.common.protocol.Errors +import org.apache.kafka.common.protocol.{ApiKeys, Errors} import org.apache.kafka.common.requests.AlterConfigsRequest._ import org.apache.kafka.common.requests._ import org.apache.kafka.common.security.auth.{KafkaPrincipal, KafkaPrincipalSerde, SecurityProtocol} @@ -47,6 +46,7 @@ import java.io.IOException import java.net.InetAddress import java.nio.ByteBuffer import java.util +import java.util.Optional import java.util.concurrent.atomic.AtomicReference import scala.collection.Map import scala.jdk.CollectionConverters._ @@ -259,7 +259,7 @@ class RequestChannelTest { } private def buildUnwrappedEnvelopeRequest(request: AbstractRequest): RequestChannel.Request = { - val wrappedRequest = TestUtils.buildEnvelopeRequest( + val wrappedRequest = buildEnvelopeRequest( request, principalSerde, requestChannelMetrics, @@ -277,6 +277,46 @@ class 
RequestChannelTest { unwrappedRequest.get() } + def buildEnvelopeRequest( + request: AbstractRequest, + principalSerde: KafkaPrincipalSerde, + requestChannelMetrics: RequestChannelMetrics, + startTimeNanos: Long, + dequeueTimeNanos: Long = -1, + fromPrivilegedListener: Boolean = true + ): RequestChannel.Request = { + val clientId = "id" + val listenerName = ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT) + + val requestHeader = new RequestHeader(request.apiKey, request.version, clientId, 0) + val requestBuffer = request.serializeWithHeader(requestHeader) + + val envelopeHeader = new RequestHeader(ApiKeys.ENVELOPE, ApiKeys.ENVELOPE.latestVersion(), clientId, 0) + val envelopeBuffer = new EnvelopeRequest.Builder( + requestBuffer, + principalSerde.serialize(KafkaPrincipal.ANONYMOUS), + InetAddress.getLocalHost.getAddress + ).build().serializeWithHeader(envelopeHeader) + + RequestHeader.parse(envelopeBuffer) + + val envelopeContext = new RequestContext(envelopeHeader, "1", InetAddress.getLocalHost, Optional.empty(), + KafkaPrincipal.ANONYMOUS, listenerName, SecurityProtocol.PLAINTEXT, ClientInformation.EMPTY, + fromPrivilegedListener, Optional.of(principalSerde)) + + val envelopRequest = new RequestChannel.Request( + processor = 1, + context = envelopeContext, + startTimeNanos = startTimeNanos, + memoryPool = MemoryPool.NONE, + buffer = envelopeBuffer, + metrics = requestChannelMetrics, + envelope = None + ) + envelopRequest.requestDequeueTimeNanos = dequeueTimeNanos + envelopRequest + } + private def isValidJson(str: String): Boolean = { try { val mapper = new ObjectMapper diff --git a/core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala b/core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala index 3b2ac9ce3ee9b..68312a2c89cbe 100644 --- a/core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala @@ -33,6 +33,7 @@ import org.apache.kafka.common.serialization.StringSerializer import org.apache.kafka.common.test.ClusterInstance import org.apache.kafka.common.utils.ProducerIdAndEpoch import org.apache.kafka.controller.ControllerRequestContextUtil.ANONYMOUS_CONTEXT +import org.apache.kafka.coordinator.transaction.TransactionLogConfig import org.apache.kafka.server.IntegrationTestUtils import org.junit.jupiter.api.Assertions.{assertEquals, fail} @@ -70,10 +71,14 @@ class GroupCoordinatorBaseRequestTest(cluster: ClusterInstance) { protected def createTransactionStateTopic(): Unit = { val admin = cluster.admin() try { - TestUtils.createTransactionStateTopicWithAdmin( + TestUtils.createTopicWithAdmin( admin = admin, + topic = Topic.TRANSACTION_STATE_TOPIC_NAME, + numPartitions = brokers().head.config.getInt(TransactionLogConfig.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG), + replicationFactor = brokers().head.config.getShort(TransactionLogConfig.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG).toInt, brokers = brokers(), - controllers = controllerServers() + controllers = controllerServers(), + topicConfig = new Properties() ) } finally { admin.close() diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala index 3fdcdaf3e4966..0cfd4a3f02397 100644 --- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala @@ -17,6 +17,7 @@ package kafka.server +import com.yammer.metrics.core.{Histogram, Meter} import 
kafka.cluster.Partition import kafka.coordinator.transaction.{InitProducerIdResult, TransactionCoordinator} import kafka.network.RequestChannel @@ -95,7 +96,7 @@ import org.apache.kafka.server.authorizer.{Action, AuthorizationResult, Authoriz import org.apache.kafka.server.common.{FeatureVersion, FinalizedFeatures, GroupVersion, KRaftVersion, MetadataVersion, RequestLocal, ShareVersion, StreamsVersion, TransactionVersion} import org.apache.kafka.server.config.{ReplicationConfigs, ServerConfigs, ServerLogConfigs} import org.apache.kafka.server.logger.LoggingController -import org.apache.kafka.server.metrics.ClientMetricsTestUtils +import org.apache.kafka.server.metrics.{ClientMetricsTestUtils, KafkaYammerMetrics} import org.apache.kafka.server.share.{CachedSharePartition, ErroneousAndValidPartitionData, SharePartitionKey} import org.apache.kafka.server.quota.{ClientQuotaManager, ControllerMutationQuota, ControllerMutationQuotaManager, ReplicaQuota, ReplicationQuotaManager, ThrottleCallback} import org.apache.kafka.server.share.acknowledge.ShareAcknowledgementBatch @@ -2260,12 +2261,22 @@ class KafkaApisTest extends Logging { assertEquals(expectedError, error) val metricName = if (version < 4) ApiKeys.ADD_PARTITIONS_TO_TXN.name else RequestMetrics.VERIFY_PARTITIONS_IN_TXN_METRIC_NAME - assertEquals(8, TestUtils.metersCount(metricName)) + assertEquals(8, metersCount(metricName)) } finally { requestMetrics.close() } } + private def metersCount(metricName: String): Long = { + KafkaYammerMetrics.defaultRegistry.allMetrics.asScala + .filter { case (k, _) => k.getMBeanName.endsWith(metricName) } + .values.map { + case histogram: Histogram => histogram.count() + case meter: Meter => meter.count() + case _ => 0 + }.sum + } + @ParameterizedTest @ApiKeyVersionsSource(apiKey = ApiKeys.ADD_PARTITIONS_TO_TXN) def testAddPartitionsToTxnOperationNotAttempted(version: Short): Unit = { diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index fc1f10fbbd73d..b55661b95ef5b 100755 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -16,8 +16,7 @@ */ package kafka.utils -import com.yammer.metrics.core.{Histogram, Meter} -import kafka.network.RequestChannel +import com.yammer.metrics.core.Meter import kafka.security.JaasTestUtils import kafka.server._ import kafka.utils.Implicits._ @@ -32,15 +31,12 @@ import org.apache.kafka.common.config.{ConfigException, ConfigResource} import org.apache.kafka.common.errors.{TopicExistsException, UnknownTopicOrPartitionException} import org.apache.kafka.common.header.Header import org.apache.kafka.common.internals.{Plugin, Topic} -import org.apache.kafka.common.memory.MemoryPool -import org.apache.kafka.common.metrics.Metrics -import org.apache.kafka.common.network.{ClientInformation, ConnectionMode, ListenerName} -import org.apache.kafka.common.protocol.ApiKeys -import org.apache.kafka.common.record.internal._ +import org.apache.kafka.common.network.{ConnectionMode, ListenerName} import org.apache.kafka.common.record.TimestampType +import org.apache.kafka.common.record.internal._ import org.apache.kafka.common.requests._ import org.apache.kafka.common.resource.ResourcePattern -import org.apache.kafka.common.security.auth.{KafkaPrincipal, KafkaPrincipalSerde, SecurityProtocol} +import org.apache.kafka.common.security.auth.SecurityProtocol import org.apache.kafka.common.serialization._ import org.apache.kafka.common.utils.Utils import 
org.apache.kafka.common.utils.Utils.formatAddress @@ -48,14 +44,12 @@ import org.apache.kafka.coordinator.group.GroupCoordinatorConfig import org.apache.kafka.coordinator.transaction.TransactionLogConfig import org.apache.kafka.metadata.{ConfigRepository, LeaderAndIsr, MockConfigRepository} import org.apache.kafka.network.SocketServerConfigs -import org.apache.kafka.network.metrics.RequestChannelMetrics import org.apache.kafka.raft.{KRaftConfigs, QuorumConfig} -import org.apache.kafka.server.authorizer.{AuthorizableRequestContext, Authorizer => JAuthorizer} +import org.apache.kafka.server.authorizer.{Authorizer => JAuthorizer} import org.apache.kafka.server.config.{DelegationTokenManagerConfigs, ReplicationConfigs, ServerConfigs, ServerLogConfigs} import org.apache.kafka.server.metrics.KafkaYammerMetrics import org.apache.kafka.server.util.MockTime -import org.apache.kafka.storage.internals.checkpoint.OffsetCheckpointFile -import org.apache.kafka.storage.internals.log.{CleanerConfig, LogCleaner, LogConfig, LogDirFailureChannel, LogManager, ProducerStateManagerConfig, UnifiedLog} +import org.apache.kafka.storage.internals.log._ import org.apache.kafka.storage.log.metrics.BrokerTopicStats import org.apache.kafka.test.{TestUtils => JTestUtils} import org.junit.jupiter.api.Assertions._ @@ -63,9 +57,8 @@ import org.mockito.ArgumentMatchers.{any, anyBoolean} import org.mockito.Mockito import java.io._ -import java.net.InetAddress import java.nio._ -import java.nio.charset.{Charset, StandardCharsets} +import java.nio.charset.StandardCharsets import java.nio.file.{Files, StandardOpenOption} import java.time.Duration import java.util @@ -78,7 +71,6 @@ import scala.concurrent.{Await, ExecutionContext, Future} import scala.jdk.CollectionConverters._ import scala.jdk.OptionConverters.RichOption import scala.jdk.javaapi.OptionConverters -import scala.util.{Failure, Success, Try} /** * Utility functions to help with testing @@ -99,7 +91,6 @@ object TestUtils extends Logging { private val transactionStatusKey = "transactionStatus" private val committedValue : Array[Byte] = "committed".getBytes(StandardCharsets.UTF_8) - private val abortedValue : Array[Byte] = "aborted".getBytes(StandardCharsets.UTF_8) sealed trait LogDirFailureType case object Roll extends LogDirFailureType @@ -131,11 +122,6 @@ object TestUtils extends Logging { */ def tempFile(prefix: String, suffix: String): File = JTestUtils.tempFile(prefix, suffix) - def tempPropertiesFile(properties: Map[String, String]): File = { - val content = properties.map{case (k, v) => k + "=" + v}.mkString(System.lineSeparator()) - JTestUtils.tempFile(content) - } - /** * Create a test config for the provided parameters. 
* @@ -440,23 +426,6 @@ object TestUtils extends Logging { ) } - def createTransactionStateTopicWithAdmin[B <: KafkaBroker]( - admin: Admin, - brokers: Seq[B], - controllers: Seq[ControllerServer] - ): Map[Int, Int] = { - val broker = brokers.head - createTopicWithAdmin( - admin = admin, - topic = Topic.TRANSACTION_STATE_TOPIC_NAME, - numPartitions = broker.config.getInt(TransactionLogConfig.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG), - replicationFactor = broker.config.getShort(TransactionLogConfig.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG).toInt, - brokers = brokers, - controllers = controllers, - topicConfig = new Properties(), - ) - } - def deleteTopicWithAdmin[B <: KafkaBroker]( admin: Admin, topic: String, @@ -576,7 +545,7 @@ object TestUtils extends Logging { * * @return The new leader (note that negative values are used to indicate conditions like NoLeader and * LeaderDuringDelete). - * @throws AssertionError if the expected condition is not true within the timeout. + * @throws java.lang.AssertionError if the expected condition is not true within the timeout. */ def waitUntilLeaderIsElectedOrChangedWithAdmin( admin: Admin, @@ -597,17 +566,6 @@ object TestUtils extends Logging { } } } - doWaitUntilLeaderIsElectedOrChanged(getPartitionLeader, topic, partition, timeoutMs, oldLeaderOpt, newLeaderOpt) - } - - private def doWaitUntilLeaderIsElectedOrChanged( - getPartitionLeader: (String, Int) => Option[Int], - topic: String, - partition: Int, - timeoutMs: Long, - oldLeaderOpt: Option[Int], - newLeaderOpt: Option[Int] - ): Int = { require(!(oldLeaderOpt.isDefined && newLeaderOpt.isDefined), "Can't define both the old and the new leader") val startTime = System.currentTimeMillis() val topicPartition = new TopicPartition(topic, partition) @@ -804,21 +762,6 @@ object TestUtils extends Logging { .getOrElse(throw new AssertionError(s"Unable to locate follower for $topicPartition")) } - /** - * Wait until all brokers know about each other. - * - * @param brokers The Kafka brokers. - * @param timeout The amount of time waiting on this condition before assert to fail - */ - def waitUntilBrokerMetadataIsPropagated[B <: KafkaBroker]( - brokers: Seq[B], - timeout: Long = JTestUtils.DEFAULT_MAX_WAIT_MS): Unit = { - val expectedBrokerIds = brokers.map(_.config.brokerId).toSet - waitUntilTrue(() => brokers.forall(server => - expectedBrokerIds.forall(server.dataPlaneRequestProcessor.metadataCache.hasAliveBroker(_)) - ), "Timed out waiting for broker metadata to propagate to all servers", timeout) - } - /** * Wait until the expected number of partitions is in the metadata cache in each broker. 
* @@ -1041,58 +984,6 @@ object TestUtils extends Logging { } } - def verifyTopicDeletion[B <: KafkaBroker]( - topic: String, - numPartitions: Int, - brokers: Seq[B]): Unit = { - val topicPartitions = (0 until numPartitions).map(new TopicPartition(topic, _)) - // ensure that the topic-partition has been deleted from all brokers' replica managers - waitUntilTrue(() => - brokers.forall(broker => topicPartitions.forall(tp => broker.replicaManager.onlinePartition(tp).isEmpty)), - "Replica manager's should have deleted all of this topic's partitions") - // ensure that logs from all replicas are deleted - waitUntilTrue(() => brokers.forall(broker => topicPartitions.forall(tp => broker.logManager.getLog(tp).isEmpty)), - "Replica logs not deleted after delete topic is complete") - // ensure that topic is removed from all cleaner offsets - waitUntilTrue(() => brokers.forall(broker => topicPartitions.forall { tp => - val checkpoints = broker.logManager.liveLogDirs.asScala.map { logDir => - new OffsetCheckpointFile(new File(logDir, "cleaner-offset-checkpoint"), null).read() - } - checkpoints.forall(checkpointsPerLogDir => !checkpointsPerLogDir.containsKey(tp)) - }), "Cleaner offset for deleted partition should have been removed") - waitUntilTrue(() => brokers.forall(broker => - broker.config.logDirs.stream().allMatch { logDir => - topicPartitions.forall { tp => - !new File(logDir, tp.topic + "-" + tp.partition).exists() - } - } - ), "Failed to soft-delete the data to a delete directory") - waitUntilTrue(() => brokers.forall(broker => - broker.config.logDirs.stream().allMatch { logDir => - topicPartitions.forall { tp => - !util.List.of(new File(logDir).list()).asScala.exists { partitionDirectoryNames => - partitionDirectoryNames.exists { directoryName => - directoryName.startsWith(tp.topic + "-" + tp.partition) && - directoryName.endsWith(UnifiedLog.DELETE_DIR_SUFFIX) - } - } - } - } - ), "Failed to hard-delete the delete directory") - } - - /** - * Translate the given buffer into a string - * - * @param buffer The buffer to translate - * @param encoding The encoding to use in translating bytes to characters - */ - def readString(buffer: ByteBuffer, encoding: String = Charset.defaultCharset.toString): String = { - val bytes = new Array[Byte](buffer.remaining) - buffer.get(bytes) - new String(bytes, encoding) - } - def waitAndVerifyAcls(expected: Set[AccessControlEntry], authorizerPlugin: Plugin[JAuthorizer], resource: ResourcePattern, @@ -1112,7 +1003,7 @@ object TestUtils extends Logging { 45000) } - def consumeTopicRecords[K, V, B <: KafkaBroker]( + def consumeTopicRecords[B <: KafkaBroker]( brokers: Seq[B], topic: String, numMessages: Int, @@ -1223,7 +1114,7 @@ object TestUtils extends Logging { override def value() = if (willBeCommitted) committedValue else - abortedValue + "aborted".getBytes(StandardCharsets.UTF_8) } new ProducerRecord[Array[Byte], Array[Byte]](topic, partition, key, value, util.Set.of(header)) } @@ -1270,86 +1161,6 @@ object TestUtils extends Logging { adminClient.incrementalAlterConfigs(configs) } - def assertLeader(client: Admin, topicPartition: TopicPartition, expectedLeader: Int): Unit = { - waitForLeaderToBecome(client, topicPartition, Some(expectedLeader)) - } - - def assertNoLeader(client: Admin, topicPartition: TopicPartition): Unit = { - waitForLeaderToBecome(client, topicPartition, None) - } - - def waitForOnlineBroker(client: Admin, brokerId: Int): Unit = { - waitUntilTrue(() => { - val nodes = client.describeCluster().nodes().get() - nodes.asScala.exists(_.id == 
brokerId) - }, s"Timed out waiting for brokerId $brokerId to come online") - } - - def waitForLeaderToBecome( - client: Admin, - topicPartition: TopicPartition, - expectedLeaderOpt: Option[Int] - ): Unit = { - val topic = topicPartition.topic - val partitionId = topicPartition.partition - - def currentLeader: Try[Option[Int]] = Try { - val topicDescription = client.describeTopics(util.List.of(topic)).allTopicNames.get.get(topic) - topicDescription.partitions.asScala - .find(_.partition == partitionId) - .flatMap(partitionState => Option(partitionState.leader)) - .map(_.id) - } - - val (lastLeaderCheck, isLeaderElected) = computeUntilTrue(currentLeader) { - case Success(leaderOpt) => leaderOpt == expectedLeaderOpt - case Failure(e: ExecutionException) if e.getCause.isInstanceOf[UnknownTopicOrPartitionException] => false - case Failure(e) => throw e - } - - assertTrue(isLeaderElected, s"Timed out waiting for leader to become $expectedLeaderOpt. " + - s"Last metadata lookup returned leader = ${lastLeaderCheck.getOrElse("unknown")}") - } - - def waitForBrokersOutOfIsr(client: Admin, partition: Set[TopicPartition], brokerIds: Set[Int]): Unit = { - waitUntilTrue( - () => { - val description = client.describeTopics(partition.map(_.topic).asJava).allTopicNames.get.asScala - val isr = description - .values - .flatMap(_.partitions.asScala.flatMap(_.isr.asScala)) - .map(_.id) - .toSet - - brokerIds.intersect(isr).isEmpty - }, - s"Expected brokers $brokerIds to no longer be in the ISR for $partition" - ) - } - - def currentIsr(admin: Admin, partition: TopicPartition): Set[Int] = { - val description = admin.describeTopics(util.Set.of(partition.topic)) - .allTopicNames - .get - .asScala - - description - .values - .flatMap(_.partitions.asScala.flatMap(_.isr.asScala)) - .map(_.id) - .toSet - } - - def waitForBrokersInIsr(client: Admin, partition: TopicPartition, brokerIds: Set[Int]): Unit = { - waitUntilTrue( - () => { - val isr = currentIsr(client, partition) - brokerIds.subsetOf(isr) - }, - s"Expected brokers $brokerIds to be in the ISR for $partition" - ) - } - def assertBadConfigContainingMessage(props: Properties, expectedExceptionContainsText: String): Unit = { try { KafkaConfig.fromProps(props) @@ -1364,105 +1175,18 @@ object TestUtils extends Logging { } def totalMetricValue(broker: KafkaBroker, metricName: String): Long = { - totalMetricValue(broker.metrics, metricName) - } - - def totalMetricValue(metrics: Metrics, metricName: String): Long = { - val allMetrics = metrics.metrics + val allMetrics = broker.metrics.metrics val total = allMetrics.values().asScala.filter(_.metricName().name() == metricName) .foldLeft(0.0)((total, metric) => total + metric.metricValue.asInstanceOf[Double]) total.toLong } def meterCount(metricName: String): Long = { - meterCountOpt(metricName).getOrElse(fail(s"Unable to find metric $metricName")) - } - - def meterCountOpt(metricName: String): Option[Long] = { KafkaYammerMetrics.defaultRegistry.allMetrics.asScala .filter { case (k, _) => k.getMBeanName.endsWith(metricName) } .values .headOption .map(_.asInstanceOf[Meter].count) - } - - def metersCount(metricName: String): Long = { - KafkaYammerMetrics.defaultRegistry.allMetrics.asScala - .filter { case (k, _) => k.getMBeanName.endsWith(metricName) } - .values.map { - case histogram: Histogram => histogram.count() - case meter: Meter => meter.count() - case _ => 0 - }.sum - } - - /** - * Find an Authorizer that we can call createAcls or deleteAcls on. 
-   */
-  def pickAuthorizerForWrite[B <: KafkaBroker](
-    brokers: Seq[B],
-    controllers: Seq[ControllerServer],
-  ): JAuthorizer = {
-    if (controllers.isEmpty) {
-      brokers.head.authorizerPlugin.get.get
-    } else {
-      var result: JAuthorizer = null
-      TestUtils.retry(120000) {
-        val active = controllers.filter(_.controller.isActive).head
-        result = active.authorizerPlugin.get.get
-      }
-      result
-    }
-  }
-
-  val anonymousAuthorizableContext = new AuthorizableRequestContext() {
-    override def listenerName(): String = ""
-    override def securityProtocol(): SecurityProtocol = SecurityProtocol.PLAINTEXT
-    override def principal(): KafkaPrincipal = KafkaPrincipal.ANONYMOUS
-    override def clientAddress(): InetAddress = null
-    override def requestType(): Int = 0
-    override def requestVersion(): Int = 0
-    override def clientId(): String = ""
-    override def correlationId(): Int = 0
-  }
-
-  def buildEnvelopeRequest(
-    request: AbstractRequest,
-    principalSerde: KafkaPrincipalSerde,
-    requestChannelMetrics: RequestChannelMetrics,
-    startTimeNanos: Long,
-    dequeueTimeNanos: Long = -1,
-    fromPrivilegedListener: Boolean = true
-  ): RequestChannel.Request = {
-    val clientId = "id"
-    val listenerName = ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT)
-
-    val requestHeader = new RequestHeader(request.apiKey, request.version, clientId, 0)
-    val requestBuffer = request.serializeWithHeader(requestHeader)
-
-    val envelopeHeader = new RequestHeader(ApiKeys.ENVELOPE, ApiKeys.ENVELOPE.latestVersion(), clientId, 0)
-    val envelopeBuffer = new EnvelopeRequest.Builder(
-      requestBuffer,
-      principalSerde.serialize(KafkaPrincipal.ANONYMOUS),
-      InetAddress.getLocalHost.getAddress
-    ).build().serializeWithHeader(envelopeHeader)
-
-    RequestHeader.parse(envelopeBuffer)
-
-    val envelopeContext = new RequestContext(envelopeHeader, "1", InetAddress.getLocalHost, Optional.empty(),
-      KafkaPrincipal.ANONYMOUS, listenerName, SecurityProtocol.PLAINTEXT, ClientInformation.EMPTY,
-      fromPrivilegedListener, Optional.of(principalSerde))
-
-    val envelopRequest = new RequestChannel.Request(
-      processor = 1,
-      context = envelopeContext,
-      startTimeNanos = startTimeNanos,
-      memoryPool = MemoryPool.NONE,
-      buffer = envelopeBuffer,
-      metrics = requestChannelMetrics,
-      envelope = None
-    )
-    envelopRequest.requestDequeueTimeNanos = dequeueTimeNanos
-    envelopRequest
+      .getOrElse(fail(s"Unable to find metric $metricName"))
   }
 }
diff --git a/tools/src/test/java/org/apache/kafka/tools/ConfigCommandTest.java b/tools/src/test/java/org/apache/kafka/tools/ConfigCommandTest.java
index 5e68bea182b2a..6591cac58d01e 100644
--- a/tools/src/test/java/org/apache/kafka/tools/ConfigCommandTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/ConfigCommandTest.java
@@ -72,8 +72,6 @@
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
-import scala.jdk.javaapi.CollectionConverters;
-
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -791,7 +789,7 @@ public void shouldAlterTopicConfig(boolean file) throws Exception {
         addedConfigs.put("delete.retention.ms", "1000000");
         addedConfigs.put("min.insync.replicas", "2");
         if (file) {
-            File f = kafka.utils.TestUtils.tempPropertiesFile(CollectionConverters.asScala(addedConfigs));
+            File f = ToolsTestUtils.tempPropertiesFile(addedConfigs);
             filePath = f.getPath();
         }
@@ -1519,7 +1517,6 @@ public static Map concat(Map... maps) {
         return res;
     }
 
-
     static class DummyAdminClient extends MockAdminClient {
         public DummyAdminClient(Node node) {
             super(List.of(node), node);
diff --git a/tools/src/test/java/org/apache/kafka/tools/LeaderElectionCommandTest.java b/tools/src/test/java/org/apache/kafka/tools/LeaderElectionCommandTest.java
index 2f5f92a6ee7dc..d78236fa29275 100644
--- a/tools/src/test/java/org/apache/kafka/tools/LeaderElectionCommandTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/LeaderElectionCommandTest.java
@@ -16,12 +16,12 @@
  */
 package org.apache.kafka.tools;
 
-import kafka.utils.TestUtils;
-
 import org.apache.kafka.clients.admin.Admin;
 import org.apache.kafka.clients.admin.AdminClientConfig;
 import org.apache.kafka.clients.admin.CreateTopicsResult;
 import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.admin.TopicDescription;
+import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
 import org.apache.kafka.common.test.ClusterInstance;
@@ -29,6 +29,7 @@
 import org.apache.kafka.common.test.api.ClusterTest;
 import org.apache.kafka.common.test.api.ClusterTestDefaults;
 import org.apache.kafka.common.utils.internals.Exit;
+import org.apache.kafka.test.TestUtils;
 
 import org.mockito.ArgumentCaptor;
 import org.mockito.MockedStatic;
@@ -38,16 +39,18 @@
 import java.nio.file.Path;
 import java.time.Duration;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
-
-import scala.jdk.javaapi.CollectionConverters;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertInstanceOf;
@@ -83,16 +86,16 @@ public void testAllTopicPartition() throws InterruptedException, ExecutionExcept
             TopicPartition topicPartition = new TopicPartition(topic, partition);
 
-            TestUtils.assertLeader(client, topicPartition, broker2);
+            assertLeader(client, topicPartition, broker2);
 
             cluster.shutdownBroker(broker3);
-            TestUtils.waitForBrokersOutOfIsr(client,
-                CollectionConverters.asScala(List.of(topicPartition)).toSet(),
-                CollectionConverters.asScala(List.of(broker3)).toSet()
+            waitForBrokersOutOfIsr(client,
+                Set.of(topicPartition),
+                Set.of(broker3)
             );
             cluster.shutdownBroker(broker2);
-            TestUtils.assertNoLeader(client, topicPartition);
+            assertNoLeader(client, topicPartition);
             cluster.startBroker(broker3);
-            TestUtils.waitForOnlineBroker(client, broker3);
+            waitForOnlineBroker(client, broker3);
 
             assertEquals(0, LeaderElectionCommand.mainNoExit(
                 "--bootstrap-server", cluster.bootstrapServers(),
@@ -100,7 +103,7 @@ public void testAllTopicPartition() throws InterruptedException, ExecutionExcept
                 "--all-topic-partitions"
             ));
 
-            TestUtils.assertLeader(client, topicPartition, broker3);
+            assertLeader(client, topicPartition, broker3);
         }
     }
@@ -189,17 +192,17 @@ public void testTopicPartition() throws InterruptedException, ExecutionException
             TopicPartition topicPartition = new TopicPartition(topic, partition);
 
-            TestUtils.assertLeader(client, topicPartition, broker2);
+            assertLeader(client, topicPartition, broker2);
 
             cluster.shutdownBroker(broker3);
-            TestUtils.waitForBrokersOutOfIsr(client,
-                CollectionConverters.asScala(List.of(topicPartition)).toSet(),
-                CollectionConverters.asScala(List.of(broker3)).toSet()
+            waitForBrokersOutOfIsr(client,
+                Set.of(topicPartition),
+                Set.of(broker3)
            );
            cluster.shutdownBroker(broker2);
-           TestUtils.assertNoLeader(client, topicPartition);
+           assertNoLeader(client, topicPartition);
            cluster.startBroker(broker3);
-           TestUtils.waitForOnlineBroker(client, broker3);
+           waitForOnlineBroker(client, broker3);
 
            assertEquals(0, LeaderElectionCommand.mainNoExit(
                "--bootstrap-server", cluster.bootstrapServers(),
@@ -208,7 +211,7 @@ public void testTopicPartition() throws InterruptedException, ExecutionException
                "--partition", Integer.toString(partition)
            ));
 
-           TestUtils.assertLeader(client, topicPartition, broker3);
+           assertLeader(client, topicPartition, broker3);
        }
    }
@@ -227,17 +230,17 @@ public void testPathToJsonFile() throws Exception {
            TopicPartition topicPartition = new TopicPartition(topic, partition);
 
-           TestUtils.assertLeader(client, topicPartition, broker2);
+           assertLeader(client, topicPartition, broker2);
 
            cluster.shutdownBroker(broker3);
-           TestUtils.waitForBrokersOutOfIsr(client,
-               CollectionConverters.asScala(List.of(topicPartition)).toSet(),
-               CollectionConverters.asScala(List.of(broker3)).toSet()
+           waitForBrokersOutOfIsr(client,
+               Set.of(topicPartition),
+               Set.of(broker3)
            );
            cluster.shutdownBroker(broker2);
-           TestUtils.assertNoLeader(client, topicPartition);
+           assertNoLeader(client, topicPartition);
            cluster.startBroker(broker3);
-           TestUtils.waitForOnlineBroker(client, broker3);
+           waitForOnlineBroker(client, broker3);
 
            Path topicPartitionPath = tempTopicPartitionFile(List.of(topicPartition));
@@ -247,7 +250,7 @@ public void testPathToJsonFile() throws Exception {
                "--path-to-json-file", topicPartitionPath.toString()
            ));
 
-           TestUtils.assertLeader(client, topicPartition, broker3);
+           assertLeader(client, topicPartition, broker3);
        }
    }
@@ -266,14 +269,12 @@ public void testPreferredReplicaElection() throws InterruptedException, Executio
            TopicPartition topicPartition = new TopicPartition(topic, partition);
 
-           TestUtils.assertLeader(client, topicPartition, broker2);
+           assertLeader(client, topicPartition, broker2);
 
            cluster.shutdownBroker(broker2);
-           TestUtils.assertLeader(client, topicPartition, broker3);
+           assertLeader(client, topicPartition, broker3);
            cluster.startBroker(broker2);
-           TestUtils.waitForBrokersInIsr(client, topicPartition,
-               CollectionConverters.asScala(List.of(broker2)).toSet()
-           );
+           waitForBrokersInIsr(client, topicPartition, Set.of(broker2));
 
            assertEquals(0, LeaderElectionCommand.mainNoExit(
                "--bootstrap-server", cluster.bootstrapServers(),
@@ -282,7 +283,7 @@ public void testPreferredReplicaElection() throws InterruptedException, Executio
                "--partition", Integer.toString(partition)
            ));
 
-           TestUtils.assertLeader(client, topicPartition, broker2);
+           assertLeader(client, topicPartition, broker2);
        }
    }
@@ -319,18 +320,14 @@ public void testElectionResultOutput() throws Exception {
            topicPartition0 = new TopicPartition(topic, partition0);
            topicPartition1 = new TopicPartition(topic, partition1);
 
-           TestUtils.assertLeader(client, topicPartition0, broker2);
-           TestUtils.assertLeader(client, topicPartition1, broker3);
+           assertLeader(client, topicPartition0, broker2);
+           assertLeader(client, topicPartition1, broker3);
 
            cluster.shutdownBroker(broker2);
-           TestUtils.assertLeader(client, topicPartition0, broker3);
+           assertLeader(client, topicPartition0, broker3);
            cluster.startBroker(broker2);
-           TestUtils.waitForBrokersInIsr(client, topicPartition0,
-               CollectionConverters.asScala(List.of(broker2)).toSet()
-           );
-           TestUtils.waitForBrokersInIsr(client, topicPartition1,
-               CollectionConverters.asScala(List.of(broker2)).toSet()
-           );
+           waitForBrokersInIsr(client, topicPartition0, Set.of(broker2));
+           waitForBrokersInIsr(client, topicPartition1, Set.of(broker2));
        }
 
        Path topicPartitionPath = tempTopicPartitionFile(List.of(topicPartition0, topicPartition1));
@@ -396,4 +393,95 @@ private String stringifyTopicPartitions(Set<TopicPartition> topicPartitions) {
        sb.append("]}");
        return sb.toString();
    }
+
+   private void waitForBrokersOutOfIsr(
+       Admin client,
+       Set<TopicPartition> partitions,
+       Set<Integer> brokerIds
+   ) throws InterruptedException {
+       TestUtils.waitForCondition(
+           () -> {
+               Set<String> topics = partitions.stream()
+                   .map(TopicPartition::topic)
+                   .collect(Collectors.toSet());
+
+               Map<String, TopicDescription> description = client.describeTopics(topics).allTopicNames().get();
+
+               Set<Integer> isr = description.values().stream()
+                   .flatMap(desc -> desc.partitions().stream())
+                   .flatMap(info -> info.isr().stream())
+                   .map(Node::id)
+                   .collect(Collectors.toSet());
+
+               return Collections.disjoint(brokerIds, isr);
+           },
+           "Expected brokers " + brokerIds + " to no longer be in the ISR for " + partitions
+       );
+   }
+
+   private void waitForBrokersInIsr(Admin client, TopicPartition partition, Set<Integer> brokerIds) throws InterruptedException {
+       TestUtils.waitForCondition(
+           () -> {
+               Set<Integer> isr = client.describeTopics(Set.of(partition.topic()))
+                   .allTopicNames()
+                   .get()
+                   .values().stream()
+                   .flatMap(desc -> desc.partitions().stream())
+                   .flatMap(info -> info.isr().stream())
+                   .map(Node::id)
+                   .collect(Collectors.toSet());
+
+               return isr.containsAll(brokerIds);
+           },
+           "Expected brokers " + brokerIds + " to be in the ISR for " + partition
+       );
+   }
+
+   private void waitForOnlineBroker(Admin client, int brokerId) throws InterruptedException {
+       TestUtils.waitForCondition(
+           () -> client.describeCluster().nodes().get().stream()
+               .anyMatch(node -> node.id() == brokerId),
+           "Timed out waiting for brokerId " + brokerId + " to come online"
+       );
+   }
+
+   private void assertLeader(Admin client, TopicPartition topicPartition, int expectedLeader) throws InterruptedException {
+       waitForLeaderToBecome(client, topicPartition, Optional.of(expectedLeader));
+   }
+
+   private void assertNoLeader(Admin client, TopicPartition topicPartition) throws InterruptedException {
+       waitForLeaderToBecome(client, topicPartition, Optional.empty());
+   }
+
+   private void waitForLeaderToBecome(
+       Admin client,
+       TopicPartition topicPartition,
+       Optional<Integer> expectedLeaderOpt
+   ) throws InterruptedException {
+       String topic = topicPartition.topic();
+       int partitionId = topicPartition.partition();
+       AtomicReference<String> lastLeaderRef = new AtomicReference<>("unknown");
+
+       TestUtils.waitForCondition(
+           () -> {
+               Set<String> existingTopics = client.listTopics().names().get();
+               if (!existingTopics.contains(topic)) {
+                   return false;
+               }
+
+               TopicDescription topicDescription = client.describeTopics(List.of(topic))
+                   .allTopicNames().get().get(topic);
+               Optional<Integer> currentLeader = topicDescription.partitions().stream()
+                   .filter(p -> p.partition() == partitionId)
+                   .findFirst()
+                   .flatMap(p -> Optional.ofNullable(p.leader()))
+                   .map(Node::id);
+
+               lastLeaderRef.set(currentLeader.map(String::valueOf).orElse("none"));
+               return currentLeader.equals(expectedLeaderOpt);
+           },
+           "Timed out waiting for leader to become " + expectedLeaderOpt.map(String::valueOf).orElse("none") +
+               ". Last metadata lookup returned leader = " + lastLeaderRef.get()
+       );
+   }
 }
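
Note on the new LeaderElectionCommandTest helpers: they all reduce to the same polling pattern, namely querying cluster state through the Admin client inside org.apache.kafka.test.TestUtils.waitForCondition and retrying until the expected state is observed, which removes these files' compile-time dependency on the Scala kafka.utils.TestUtils while keeping the retry semantics. The sketch below is a minimal, self-contained illustration of that pattern for a leader check; the class name LeaderWaitSketch, the method name waitForLeader, and the explicit 30-second timeout are illustrative assumptions and are not part of this patch.

import java.util.List;
import java.util.Optional;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.test.TestUtils;

// Minimal sketch (not part of the patch): poll the Admin client until the
// partition reports the expected leader id, using the same waitForCondition
// utility the patch adopts. Names and the 30s timeout are assumptions.
final class LeaderWaitSketch {

    static void waitForLeader(Admin client, TopicPartition tp, int expectedLeader) throws InterruptedException {
        TestUtils.waitForCondition(
            () -> {
                // Describe the topic and extract the current leader id, if any.
                Optional<Integer> leader = client.describeTopics(List.of(tp.topic()))
                    .allTopicNames().get()
                    .get(tp.topic())
                    .partitions().stream()
                    .filter(p -> p.partition() == tp.partition())
                    .findFirst()
                    .flatMap(p -> Optional.ofNullable(p.leader()))
                    .map(Node::id);
                return leader.equals(Optional.of(expectedLeader));
            },
            30_000L,
            "Leader of " + tp + " did not become " + expectedLeader
        );
    }
}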