Skip to content

Commit 8f817f3

Browse files
committed
MINOR: Rewrite RaftClusterSnapshotTest from Scala to Java
1 parent 75052bb commit 8f817f3

2 files changed

Lines changed: 104 additions & 105 deletions

File tree

Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package kafka.server;
19+
20+
import kafka.raft.KafkaRaftManager;
21+
22+
import org.apache.kafka.common.test.KafkaClusterTestKit;
23+
import org.apache.kafka.common.test.TestKitNodes;
24+
import org.apache.kafka.common.utils.LogContext;
25+
import org.apache.kafka.common.utils.internals.BufferSupplier;
26+
import org.apache.kafka.metadata.MetadataRecordSerde;
27+
import org.apache.kafka.raft.MetadataLogConfig;
28+
import org.apache.kafka.server.common.ApiMessageAndVersion;
29+
import org.apache.kafka.snapshot.RecordsSnapshotReader;
30+
import org.apache.kafka.test.TestUtils;
31+
32+
import org.junit.jupiter.api.Test;
33+
import org.junit.jupiter.api.Timeout;
34+
35+
import java.util.Map;
36+
import java.util.stream.Collectors;
37+
38+
import static org.junit.jupiter.api.Assertions.assertEquals;
39+
import static org.junit.jupiter.api.Assertions.assertNotEquals;
40+
import static org.junit.jupiter.api.Assertions.assertTrue;
41+
42+
@Timeout(120)
43+
public class RaftClusterSnapshotTest {
44+
45+
@Test
46+
public void testSnapshotsGenerated() throws Exception {
47+
int numberOfBrokers = 3;
48+
int numberOfControllers = 3;
49+
50+
try (var cluster = new KafkaClusterTestKit.Builder(
51+
new TestKitNodes.Builder()
52+
.setNumBrokerNodes(numberOfBrokers)
53+
.setNumControllerNodes(numberOfControllers)
54+
.build())
55+
.setConfigProp(MetadataLogConfig.METADATA_SNAPSHOT_MAX_NEW_RECORD_BYTES_CONFIG, "10")
56+
.setConfigProp(MetadataLogConfig.METADATA_MAX_IDLE_INTERVAL_MS_CONFIG, "0")
57+
.build()) {
58+
59+
cluster.format();
60+
cluster.startup();
61+
62+
// Check that every controller and broker has a snapshot
63+
TestUtils.waitForCondition(
64+
() -> cluster.raftManagers().values().stream()
65+
.allMatch(raftManager -> raftManager.raftLog().latestSnapshotId().isPresent()),
66+
() -> "Expected for every controller and broker to generate a snapshot: " +
67+
cluster.raftManagers().entrySet().stream()
68+
.collect(Collectors.toMap(
69+
Map.Entry::getKey,
70+
e -> e.getValue().raftLog().latestSnapshotId()
71+
))
72+
);
73+
74+
assertEquals(numberOfControllers + numberOfBrokers, cluster.raftManagers().size());
75+
76+
// For every controller and broker perform some sanity checks against the latest snapshot
77+
for (KafkaRaftManager<ApiMessageAndVersion> raftManager : cluster.raftManagers().values()) {
78+
try (var snapshot = RecordsSnapshotReader.of(
79+
raftManager.raftLog().latestSnapshot().get(),
80+
new MetadataRecordSerde(),
81+
BufferSupplier.create(),
82+
1,
83+
true,
84+
new LogContext()
85+
)) {
86+
// Check that the snapshot is non-empty
87+
assertTrue(snapshot.hasNext());
88+
89+
// Check that we can read the entire snapshot
90+
while (snapshot.hasNext()) {
91+
var batch = snapshot.next();
92+
assertTrue(batch.sizeInBytes() > 0);
93+
// A batch must have at least one control records or at least one data records, but not both
94+
assertNotEquals(
95+
batch.records().isEmpty(),
96+
batch.controlRecords().isEmpty(),
97+
"data records = " + batch.records() + "; control records = " + batch.controlRecords()
98+
);
99+
}
100+
}
101+
}
102+
}
103+
}
104+
}

core/src/test/scala/integration/kafka/server/RaftClusterSnapshotTest.scala

Lines changed: 0 additions & 105 deletions
This file was deleted.

0 commit comments

Comments
 (0)