@@ -4177,11 +4177,11 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest {
}

def removeAllClientAcls(): Unit = {
-    val authorizerForWrite = TestUtils.pickAuthorizerForWrite(brokers, controllerServers)
+    val authorizerForWrite = pickAuthorizerForWrite(brokers, controllerServers)
val aclEntryFilter = new AccessControlEntryFilter(clientPrincipalString, null, AclOperation.ANY, AclPermissionType.ANY)
val aclFilter = new AclBindingFilter(ResourcePatternFilter.ANY, aclEntryFilter)

-    authorizerForWrite.deleteAcls(TestUtils.anonymousAuthorizableContext, java.util.List.of(aclFilter)).asScala.
+    authorizerForWrite.deleteAcls(anonymousAuthorizableContext, java.util.List.of(aclFilter)).asScala.
map(_.toCompletableFuture.get).flatMap { deletion =>
deletion.aclBindingDeleteResults().asScala.map(_.aclBinding.pattern).toSet
}.foreach { resource =>
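For context, removeAllClientAcls above matches every ACL owned by the client principal and blocks on the returned futures before the assertions run. A minimal sketch of the same pattern, assuming an in-scope org.apache.kafka.server.authorizer.Authorizer named authorizer and a principal string principal (both hypothetical names, not part of this change):

    import org.apache.kafka.common.acl.{AccessControlEntryFilter, AclBindingFilter, AclOperation, AclPermissionType}
    import org.apache.kafka.common.resource.ResourcePatternFilter
    import scala.jdk.CollectionConverters._

    // Match every ACL for this principal, on any resource, for any operation
    val entryFilter = new AccessControlEntryFilter(principal, null, AclOperation.ANY, AclPermissionType.ANY)
    val bindingFilter = new AclBindingFilter(ResourcePatternFilter.ANY, entryFilter)
    // deleteAcls returns one CompletionStage per filter; block on each and collect the deleted patterns
    val deletedPatterns = authorizer.deleteAcls(anonymousAuthorizableContext, java.util.List.of(bindingFilter))
      .asScala.map(_.toCompletableFuture.get)
      .flatMap(_.aclBindingDeleteResults().asScala.map(_.aclBinding.pattern))
      .toSet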
@@ -58,7 +58,10 @@ abstract class BaseAdminIntegrationTest extends IntegrationTestHarness with Logg
override def setUp(testInfo: TestInfo): Unit = {
this.testInfo = testInfo
super.setUp(testInfo)
-    waitUntilBrokerMetadataIsPropagated(brokers)
+    val expectedBrokerIds = brokers.map(_.config.brokerId).toSet
+    waitUntilTrue(() => brokers.forall(server =>
+      expectedBrokerIds.forall(server.dataPlaneRequestProcessor.metadataCache.hasAliveBroker(_))
+    ), "Timed out waiting for broker metadata to propagate to all servers", 15000)
}

@AfterEach
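The inlined wait above polls each broker's metadata cache until every broker id in the cluster is visible, replacing the removed TestUtils.waitUntilBrokerMetadataIsPropagated helper. A hedged alternative sketch of the same propagation check through the Admin client (admin and expectedBrokers are assumptions, not part of this change):

    import org.apache.kafka.clients.admin.Admin
    import scala.jdk.CollectionConverters._

    // True once the cluster metadata served to clients lists all expected brokers
    def allBrokersVisible(admin: Admin, expectedBrokers: Int): Boolean =
      admin.describeCluster().nodes().get().asScala.size == expectedBrokers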

Large diffs are not rendered by default.

@@ -24,14 +24,16 @@ import org.apache.kafka.common.acl.{AccessControlEntry, AccessControlEntryFilter
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.quota.{ClientQuotaAlteration, ClientQuotaEntity}
import org.apache.kafka.common.resource.ResourcePattern
-import org.apache.kafka.common.security.auth.SecurityProtocol
+import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
import org.apache.kafka.common.security.scram.ScramCredential
import org.apache.kafka.common.utils.Time
import org.apache.kafka.common.{KafkaException, Uuid}
import org.apache.kafka.controller.ControllerRequestContextUtil.ANONYMOUS_CONTEXT
+import org.apache.kafka.server.authorizer.{AuthorizableRequestContext, Authorizer => JAuthorizer}
import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo}

import java.io.File
+import java.net.InetAddress
import java.time.Duration
import java.util
import java.util.Properties
@@ -214,6 +216,17 @@ abstract class KafkaServerTestHarness extends QuorumTestHarness {
}
}

+  val anonymousAuthorizableContext = new AuthorizableRequestContext() {
+    override def listenerName(): String = ""
+    override def securityProtocol(): SecurityProtocol = SecurityProtocol.PLAINTEXT
+    override def principal(): KafkaPrincipal = KafkaPrincipal.ANONYMOUS
+    override def clientAddress(): InetAddress = null
+    override def requestType(): Int = 0
+    override def requestVersion(): Int = 0
+    override def clientId(): String = ""
+    override def correlationId(): Int = 0
+  }

def addAndVerifyAcls(acls: Set[AccessControlEntry], resource: ResourcePattern): Unit = {
val authorizerForWrite = pickAuthorizerForWrite(brokers, controllerServers)
val aclBindings = acls.map { acl => new AclBinding(resource, acl) }
@@ -371,4 +384,20 @@ abstract class KafkaServerTestHarness extends QuorumTestHarness {
}
}
}

+  /**
+   * Find an Authorizer that we can call createAcls or deleteAcls on.
+   */
+  def pickAuthorizerForWrite[B <: KafkaBroker](brokers: Seq[B], controllers: Seq[ControllerServer]): JAuthorizer = {
+    if (controllers.isEmpty) {
+      brokers.head.authorizerPlugin.get.get
+    } else {
+      var result: JAuthorizer = null
+      TestUtils.retry(120000) {
+        val active = controllers.filter(_.controller.isActive).head
+        result = active.authorizerPlugin.get.get
+      }
+      result
+    }
+  }
}
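A hedged usage sketch tying together the two members added above; the principal, host, and topic names are illustrative assumptions, not part of this change:

    import org.apache.kafka.common.acl.{AccessControlEntry, AclBinding, AclOperation, AclPermissionType}
    import org.apache.kafka.common.resource.{PatternType, ResourcePattern, ResourceType}
    import scala.jdk.CollectionConverters._

    val authorizer = pickAuthorizerForWrite(brokers, controllerServers)
    val entry = new AccessControlEntry("User:alice", "*", AclOperation.READ, AclPermissionType.ALLOW)
    val binding = new AclBinding(new ResourcePattern(ResourceType.TOPIC, "test-topic", PatternType.LITERAL), entry)
    // createAcls accepts the anonymous context as well; block until each ACL is persisted
    authorizer.createAcls(anonymousAuthorizableContext, java.util.List.of(binding))
      .asScala.foreach(_.toCompletableFuture.get)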
6 changes: 3 additions & 3 deletions core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
@@ -1229,7 +1229,7 @@ class LogCleanerTest extends Logging {

// the last (active) segment has just one message

-    def distinctValuesBySegment = log.logSegments.asScala.map(s => s.log.records.asScala.map(record => TestUtils.readString(record.value)).toSet.size).toSeq
+    def distinctValuesBySegment = log.logSegments.asScala.map(s => s.log.records.asScala.map(record => StandardCharsets.UTF_8.decode(record.value()).toString).toSet.size).toSeq

val distinctValuesBySegmentBeforeClean = distinctValuesBySegment
assertTrue(distinctValuesBySegment.reverse.tail.forall(_ > N),
@@ -1927,7 +1927,7 @@

for (segment <- log.logSegments.asScala; batch <- segment.log.batches.asScala; record <- batch.asScala) {
assertTrue(record.hasMagic(batch.magic))
-      val value = TestUtils.readString(record.value).toLong
+      val value = StandardCharsets.UTF_8.decode(record.value()).toString.toLong
assertEquals(record.offset, value)
}
}
@@ -1947,7 +1947,7 @@

for (logEntry <- records.records.asScala) {
val offset = logEntry.offset
-      val value = TestUtils.readString(logEntry.value).toLong
+      val value = StandardCharsets.UTF_8.decode(logEntry.value()).toString.toLong
assertEquals(offset, value)
}
}
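The recurring replacement for TestUtils.readString in this PR is the plain NIO decoding idiom. A self-contained sketch; note that Charset.decode reads from the buffer's current position to its limit and advances the position as a side effect:

    import java.nio.ByteBuffer
    import java.nio.charset.StandardCharsets

    val buf = ByteBuffer.wrap("hello".getBytes(StandardCharsets.UTF_8))
    // Decodes the remaining bytes as UTF-8; afterwards buf.position() == buf.limit()
    val s = StandardCharsets.UTF_8.decode(buf).toString  // "hello"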
7 changes: 4 additions & 3 deletions core/src/test/scala/unit/kafka/log/LogTestUtils.scala
@@ -19,7 +19,6 @@ package kafka.log

import java.io.File
import java.util.Properties
-import kafka.utils.TestUtils
import org.apache.kafka.common.Uuid
import org.apache.kafka.common.compress.Compression
import org.apache.kafka.common.record.internal.{ControlRecordType, EndTransactionMarker, FileRecords, MemoryRecords, RecordBatch, SimpleRecord}
@@ -38,9 +37,10 @@ import org.apache.kafka.server.storage.log.FetchIsolation
import org.apache.kafka.server.util.Scheduler
import org.apache.kafka.storage.internals.log.LogConfig.{DEFAULT_REMOTE_LOG_COPY_DISABLE_CONFIG, DEFAULT_REMOTE_LOG_DELETE_ON_DISABLE_CONFIG}
import org.apache.kafka.common.message.AbortedTxn
-import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchDataInfo, LazyIndex, LogAppendInfo, LogConfig, LogDirFailureChannel, LogFileUtils, LogOffsetsListener, LogSegment, ProducerStateManager, ProducerStateManagerConfig, TransactionIndex, VerificationGuard, UnifiedLog}
+import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchDataInfo, LazyIndex, LogAppendInfo, LogConfig, LogDirFailureChannel, LogFileUtils, LogOffsetsListener, LogSegment, ProducerStateManager, ProducerStateManagerConfig, TransactionIndex, UnifiedLog, VerificationGuard}
import org.apache.kafka.storage.log.metrics.BrokerTopicStats

+import java.nio.charset.StandardCharsets
import scala.jdk.CollectionConverters._
import scala.jdk.OptionConverters.RichOption

@@ -202,7 +202,7 @@ object LogTestUtils {
for (logSegment <- log.logSegments.asScala;
batch <- logSegment.log.batches.asScala if !batch.isControlBatch;
record <- batch.asScala if record.hasValue && record.hasKey)
-      yield TestUtils.readString(record.key).toLong
+      yield StandardCharsets.UTF_8.decode(record.key()).toString.toLong
}

def recoverAndCheck(logDir: File, config: LogConfig, expectedKeys: Iterable[Long], brokerTopicStats: BrokerTopicStats, time: Time, scheduler: Scheduler): UnifiedLog = {
@@ -308,4 +308,5 @@ object LogTestUtils {
sequence += numRecords
}
}

}
44 changes: 42 additions & 2 deletions core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
@@ -36,11 +36,14 @@ import org.apache.kafka.common.metrics.JmxReporter
import org.apache.kafka.common.utils.Time
import org.apache.kafka.server.config.ServerLogConfigs
import org.apache.kafka.server.metrics.{KafkaMetricsGroup, KafkaYammerMetrics, LinuxIoMetricsCollector}
+import org.apache.kafka.storage.internals.checkpoint.OffsetCheckpointFile
+import org.apache.kafka.storage.internals.log.UnifiedLog
import org.apache.kafka.storage.log.metrics.BrokerTopicMetrics
import org.junit.jupiter.api.{Test, Timeout}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.MethodSource

+import java.io.File
import scala.jdk.OptionConverters.RichOptional

@Timeout(120)
Expand All @@ -64,7 +67,7 @@ class MetricsTest extends KafkaServerTestHarness with Logging {
val topic = "test-topic-metric"
createTopic(topic)
deleteTopic(topic)
-    TestUtils.verifyTopicDeletion(topic, 1, brokers)
+    verifyTopicDeletion(topic, brokers)
assertEquals(Set.empty, topicMetricGroups(topic), "Topic metrics exists after deleteTopic")
}

@@ -78,7 +81,7 @@
assertTrue(topicMetricGroups(topic).nonEmpty, "Topic metrics don't exist")
brokers.foreach(b => assertNotNull(b.brokerTopicStats.topicStats(topic)))
deleteTopic(topic)
-    TestUtils.verifyTopicDeletion(topic, 1, brokers)
+    verifyTopicDeletion(topic, brokers)
assertEquals(Set.empty, topicMetricGroups(topic), "Topic metrics exists after deleteTopic")
}

Expand Down Expand Up @@ -244,4 +247,41 @@ class MetricsTest extends KafkaServerTestHarness with Logging {
val pattern = (".*BrokerTopicMetrics.*" + topic.map(t => s"($t)$$").getOrElse("")).r.pattern
metrics.filter(pattern.matcher(_).matches())
}

+  private def verifyTopicDeletion[B <: KafkaBroker](topic: String, brokers: Seq[B]): Unit = {
+    val topicPartitions = (0 until 1).map(new TopicPartition(topic, _))
+    // ensure that the topic-partition has been deleted from all brokers' replica managers
+    TestUtils.waitUntilTrue(() =>
+      brokers.forall(broker => topicPartitions.forall(tp => broker.replicaManager.onlinePartition(tp).isEmpty)),
+      "Replica managers should have deleted all of this topic's partitions")
+    // ensure that logs from all replicas are deleted
+    TestUtils.waitUntilTrue(() => brokers.forall(broker => topicPartitions.forall(tp => broker.logManager.getLog(tp).isEmpty)),
+      "Replica logs not deleted after delete topic is complete")
+    // ensure that the topic is removed from all cleaner offsets
+    TestUtils.waitUntilTrue(() => brokers.forall(broker => topicPartitions.forall { tp =>
+      val checkpoints = broker.logManager.liveLogDirs.asScala.map { logDir =>
+        new OffsetCheckpointFile(new File(logDir, "cleaner-offset-checkpoint"), null).read()
+      }
+      checkpoints.forall(checkpointsPerLogDir => !checkpointsPerLogDir.containsKey(tp))
+    }), "Cleaner offset for deleted partition should have been removed")
+    TestUtils.waitUntilTrue(() => brokers.forall(broker =>
+      broker.config.logDirs.stream().allMatch { logDir =>
+        topicPartitions.forall { tp =>
+          !new File(logDir, tp.topic + "-" + tp.partition).exists()
+        }
+      }
+    ), "Failed to soft-delete the data to a delete directory")
+    TestUtils.waitUntilTrue(() => brokers.forall(broker =>
+      broker.config.logDirs.stream().allMatch { logDir =>
+        topicPartitions.forall { tp =>
+          !util.List.of(new File(logDir).list()).asScala.exists { partitionDirectoryNames =>
+            partitionDirectoryNames.exists { directoryName =>
+              directoryName.startsWith(tp.topic + "-" + tp.partition) &&
+                directoryName.endsWith(UnifiedLog.DELETE_DIR_SUFFIX)
+            }
+          }
+        }
+      }
+    ), "Failed to hard-delete the delete directory")
+  }
}
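The last two waits above track Kafka's two-phase deletion: a partition directory is first renamed with a delete suffix (soft delete) and only later removed from disk (hard delete). A minimal sketch of the naming check for a single log dir, with logDir and tp as assumed inputs:

    import java.io.File
    import org.apache.kafka.common.TopicPartition
    import org.apache.kafka.storage.internals.log.UnifiedLog

    def softDeleteDirExists(logDir: String, tp: TopicPartition): Boolean =
      // A renamed directory looks like "<topic>-<partition>.<uuid>-delete"
      Option(new File(logDir).list()).getOrElse(Array.empty[String]).exists { name =>
        name.startsWith(s"${tp.topic}-${tp.partition}") && name.endsWith(UnifiedLog.DELETE_DIR_SUFFIX)
      }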
46 changes: 43 additions & 3 deletions core/src/test/scala/unit/kafka/network/RequestChannelTest.scala
@@ -20,7 +20,6 @@ package kafka.network
import com.fasterxml.jackson.databind.ObjectMapper
import kafka.network
import kafka.server.EnvelopeUtils
-import kafka.utils.TestUtils
import org.apache.kafka.clients.admin.AlterConfigOp.OpType
import org.apache.kafka.common.config.types.Password
import org.apache.kafka.common.config.{ConfigResource, SaslConfigs, SslConfigs, TopicConfig}
@@ -29,7 +28,7 @@ import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic
import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData._
import org.apache.kafka.common.message.{CreateTopicsRequestData, CreateTopicsResponseData, IncrementalAlterConfigsRequestData}
import org.apache.kafka.common.network.{ClientInformation, ListenerName}
-import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.requests.AlterConfigsRequest._
import org.apache.kafka.common.requests._
import org.apache.kafka.common.security.auth.{KafkaPrincipal, KafkaPrincipalSerde, SecurityProtocol}
@@ -47,6 +46,7 @@ import java.io.IOException
import java.net.InetAddress
import java.nio.ByteBuffer
import java.util
+import java.util.Optional
import java.util.concurrent.atomic.AtomicReference
import scala.collection.Map
import scala.jdk.CollectionConverters._
@@ -259,7 +259,7 @@
}

private def buildUnwrappedEnvelopeRequest(request: AbstractRequest): RequestChannel.Request = {
-    val wrappedRequest = TestUtils.buildEnvelopeRequest(
+    val wrappedRequest = buildEnvelopeRequest(
request,
principalSerde,
requestChannelMetrics,
@@ -277,6 +277,46 @@
unwrappedRequest.get()
}

+  def buildEnvelopeRequest(
+    request: AbstractRequest,
+    principalSerde: KafkaPrincipalSerde,
+    requestChannelMetrics: RequestChannelMetrics,
+    startTimeNanos: Long,
+    dequeueTimeNanos: Long = -1,
+    fromPrivilegedListener: Boolean = true
+  ): RequestChannel.Request = {
+    val clientId = "id"
+    val listenerName = ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT)
+
+    val requestHeader = new RequestHeader(request.apiKey, request.version, clientId, 0)
+    val requestBuffer = request.serializeWithHeader(requestHeader)
+
+    val envelopeHeader = new RequestHeader(ApiKeys.ENVELOPE, ApiKeys.ENVELOPE.latestVersion(), clientId, 0)
+    val envelopeBuffer = new EnvelopeRequest.Builder(
+      requestBuffer,
+      principalSerde.serialize(KafkaPrincipal.ANONYMOUS),
+      InetAddress.getLocalHost.getAddress
+    ).build().serializeWithHeader(envelopeHeader)
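+    // RequestHeader.parse below is called for its side effect: it reads the header bytes and advances
+    // envelopeBuffer's position to the start of the request body, where RequestChannel.Request expects it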

+    RequestHeader.parse(envelopeBuffer)
+
+    val envelopeContext = new RequestContext(envelopeHeader, "1", InetAddress.getLocalHost, Optional.empty(),
+      KafkaPrincipal.ANONYMOUS, listenerName, SecurityProtocol.PLAINTEXT, ClientInformation.EMPTY,
+      fromPrivilegedListener, Optional.of(principalSerde))
+
+    val envelopeRequest = new RequestChannel.Request(
+      processor = 1,
+      context = envelopeContext,
+      startTimeNanos = startTimeNanos,
+      memoryPool = MemoryPool.NONE,
+      buffer = envelopeBuffer,
+      metrics = requestChannelMetrics,
+      envelope = None
+    )
+    envelopeRequest.requestDequeueTimeNanos = dequeueTimeNanos
+    envelopeRequest
+  }

private def isValidJson(str: String): Boolean = {
try {
val mapper = new ObjectMapper
Expand Down
@@ -33,6 +33,7 @@ import org.apache.kafka.common.serialization.StringSerializer
import org.apache.kafka.common.test.ClusterInstance
import org.apache.kafka.common.utils.ProducerIdAndEpoch
import org.apache.kafka.controller.ControllerRequestContextUtil.ANONYMOUS_CONTEXT
+import org.apache.kafka.coordinator.transaction.TransactionLogConfig
import org.apache.kafka.server.IntegrationTestUtils
import org.junit.jupiter.api.Assertions.{assertEquals, fail}

@@ -70,10 +71,14 @@
protected def createTransactionStateTopic(): Unit = {
val admin = cluster.admin()
try {
-      TestUtils.createTransactionStateTopicWithAdmin(
+      TestUtils.createTopicWithAdmin(
         admin = admin,
+        topic = Topic.TRANSACTION_STATE_TOPIC_NAME,
+        numPartitions = brokers().head.config.getInt(TransactionLogConfig.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG),
+        replicationFactor = brokers().head.config.getShort(TransactionLogConfig.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG).toInt,
         brokers = brokers(),
-        controllers = controllerServers()
+        controllers = controllerServers(),
+        topicConfig = new Properties()
)
} finally {
admin.close()
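With the dedicated TestUtils helper gone, the partition count and replication factor are read from the broker's own config, so the test topic matches what the broker would auto-create. For reference, the two keys resolve (assuming Kafka's defaults) to:

    TransactionLogConfig.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG          // "transaction.state.log.num.partitions", default 50
    TransactionLogConfig.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG  // "transaction.state.log.replication.factor", default 3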
15 changes: 13 additions & 2 deletions core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -17,6 +17,7 @@

package kafka.server

+import com.yammer.metrics.core.{Histogram, Meter}
import kafka.cluster.Partition
import kafka.coordinator.transaction.{InitProducerIdResult, TransactionCoordinator}
import kafka.network.RequestChannel
@@ -95,7 +96,7 @@ import org.apache.kafka.server.authorizer.{Action, AuthorizationResult, Authoriz
import org.apache.kafka.server.common.{FeatureVersion, FinalizedFeatures, GroupVersion, KRaftVersion, MetadataVersion, RequestLocal, ShareVersion, StreamsVersion, TransactionVersion}
import org.apache.kafka.server.config.{ReplicationConfigs, ServerConfigs, ServerLogConfigs}
import org.apache.kafka.server.logger.LoggingController
-import org.apache.kafka.server.metrics.ClientMetricsTestUtils
+import org.apache.kafka.server.metrics.{ClientMetricsTestUtils, KafkaYammerMetrics}
import org.apache.kafka.server.share.{CachedSharePartition, ErroneousAndValidPartitionData, SharePartitionKey}
import org.apache.kafka.server.quota.{ClientQuotaManager, ControllerMutationQuota, ControllerMutationQuotaManager, ReplicaQuota, ReplicationQuotaManager, ThrottleCallback}
import org.apache.kafka.server.share.acknowledge.ShareAcknowledgementBatch
@@ -2260,12 +2261,22 @@ class KafkaApisTest extends Logging {
assertEquals(expectedError, error)

val metricName = if (version < 4) ApiKeys.ADD_PARTITIONS_TO_TXN.name else RequestMetrics.VERIFY_PARTITIONS_IN_TXN_METRIC_NAME
-      assertEquals(8, TestUtils.metersCount(metricName))
+      assertEquals(8, metersCount(metricName))
} finally {
requestMetrics.close()
}
}

+  private def metersCount(metricName: String): Long = {
+    KafkaYammerMetrics.defaultRegistry.allMetrics.asScala
+      .filter { case (k, _) => k.getMBeanName.endsWith(metricName) }
+      .values.map {
+        case histogram: Histogram => histogram.count()
+        case meter: Meter => meter.count()
+        case _ => 0
+      }.sum
+  }
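The inlined metersCount sums event counts over every registered Yammer metric whose MBean name ends with the given suffix, treating Meters and Histograms uniformly through count(). A hedged sketch of the same registry scan, here just listing matching metric names (illustrative only):

    import org.apache.kafka.server.metrics.KafkaYammerMetrics
    import scala.jdk.CollectionConverters._

    // Collect the MBean names of all metrics matching a request-metric suffix, e.g. "AddPartitionsToTxn"
    def matchingMetricNames(suffix: String): Iterable[String] =
      KafkaYammerMetrics.defaultRegistry.allMetrics.asScala.keys
        .map(_.getMBeanName)
        .filter(_.endsWith(suffix))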

@ParameterizedTest
@ApiKeyVersionsSource(apiKey = ApiKeys.ADD_PARTITIONS_TO_TXN)
def testAddPartitionsToTxnOperationNotAttempted(version: Short): Unit = {