Skip to content

Commit 9278134

Browse files
committed
MINOR: Rewrite RaftClusterSnapshotTest from Scala to Java
1 parent 75052bb commit 9278134

2 files changed

Lines changed: 101 additions & 105 deletions

File tree

Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package kafka.server;
19+
20+
import static org.junit.jupiter.api.Assertions.assertEquals;
21+
import static org.junit.jupiter.api.Assertions.assertNotEquals;
22+
import static org.junit.jupiter.api.Assertions.assertTrue;
23+
import kafka.raft.KafkaRaftManager;
24+
import org.apache.kafka.common.test.KafkaClusterTestKit;
25+
import org.apache.kafka.common.test.TestKitNodes;
26+
import org.apache.kafka.common.utils.LogContext;
27+
import org.apache.kafka.common.utils.internals.BufferSupplier;
28+
import org.apache.kafka.metadata.MetadataRecordSerde;
29+
import org.apache.kafka.raft.MetadataLogConfig;
30+
import org.apache.kafka.server.common.ApiMessageAndVersion;
31+
import org.apache.kafka.snapshot.RecordsSnapshotReader;
32+
import org.apache.kafka.test.TestUtils;
33+
import org.junit.jupiter.api.Test;
34+
import org.junit.jupiter.api.Timeout;
35+
36+
import java.util.Map;
37+
import java.util.stream.Collectors;
38+
39+
@Timeout(120)
40+
public class RaftClusterSnapshotTest {
41+
42+
@Test
43+
public void testSnapshotsGenerated() throws Exception {
44+
int numberOfBrokers = 3;
45+
int numberOfControllers = 3;
46+
47+
try (var cluster = new KafkaClusterTestKit.Builder(
48+
new TestKitNodes.Builder()
49+
.setNumBrokerNodes(numberOfBrokers)
50+
.setNumControllerNodes(numberOfControllers)
51+
.build())
52+
.setConfigProp(MetadataLogConfig.METADATA_SNAPSHOT_MAX_NEW_RECORD_BYTES_CONFIG, "10")
53+
.setConfigProp(MetadataLogConfig.METADATA_MAX_IDLE_INTERVAL_MS_CONFIG, "0")
54+
.build()) {
55+
56+
cluster.format();
57+
cluster.startup();
58+
59+
// Check that every controller and broker has a snapshot
60+
TestUtils.waitForCondition(
61+
() -> cluster.raftManagers().values().stream()
62+
.allMatch(raftManager -> raftManager.raftLog().latestSnapshotId().isPresent()),
63+
() -> "Expected for every controller and broker to generate a snapshot: " +
64+
cluster.raftManagers().entrySet().stream()
65+
.collect(Collectors.toMap(
66+
Map.Entry::getKey,
67+
e -> e.getValue().raftLog().latestSnapshotId()
68+
))
69+
);
70+
71+
assertEquals(numberOfControllers + numberOfBrokers, cluster.raftManagers().size());
72+
73+
// For every controller and broker perform some sanity checks against the latest snapshot
74+
for (KafkaRaftManager<ApiMessageAndVersion> raftManager : cluster.raftManagers().values()) {
75+
try (var snapshot = RecordsSnapshotReader.of(
76+
raftManager.raftLog().latestSnapshot().get(),
77+
new MetadataRecordSerde(),
78+
BufferSupplier.create(),
79+
1,
80+
true,
81+
new LogContext()
82+
)) {
83+
// Check that the snapshot is non-empty
84+
assertTrue(snapshot.hasNext());
85+
86+
// Check that we can read the entire snapshot
87+
while (snapshot.hasNext()) {
88+
var batch = snapshot.next();
89+
assertTrue(batch.sizeInBytes() > 0);
90+
// A batch must have at least one control records or at least one data records, but not both
91+
assertNotEquals(
92+
batch.records().isEmpty(),
93+
batch.controlRecords().isEmpty(),
94+
"data records = " + batch.records() + "; control records = " + batch.controlRecords()
95+
);
96+
}
97+
}
98+
}
99+
}
100+
}
101+
}

core/src/test/scala/integration/kafka/server/RaftClusterSnapshotTest.scala

Lines changed: 0 additions & 105 deletions
This file was deleted.

0 commit comments

Comments
 (0)