Patch summary (free text before the first "diff --git" header is skipped by git apply and patch):

  * FollowerState: adds remainingUpdateVoterSetTimeMs(long), which updates
    updateVoterSetPeriodTimer to the given time and returns its remaining milliseconds.
  * KafkaRaftClient: handleFetchResponse now calls the new maybeSwitchObserverFetchToLeader
    before maybeHandleCommonResponse. pollFollowerAsObserver now calls transitionToUnattached
    with the follower state's epoch and leader id, and returns 0, once the fetch timeout has
    expired.
  * QuorumState: transitionToUnattached no longer rejects a same-epoch transition when
    isObserver() is true.
  * Tests: adds testObserverFetchesBetweenLeaderAndBootstrapServers; the fetch-timeout check in
    KafkaRaftClientTest now asserts the replica is not Prospective; advanceTimeAndCompleteFetch
    in RaftClientTestContext sleeps based on the remaining fetch and update-voter-set timers.

Review notes (to be confirmed by the author):

  * The Javadoc of maybeSwitchObserverFetchToLeader describes observers only, but the condition
    checks quorum.isUnattached() without an observer check; confirm whether unattached voters
    are meant to take this path as well.
  * testObserverFetchesBetweenLeaderAndBootstrapServers asserts a destination id of -2 as a
    bare literal; presumably this is the id assigned to the first bootstrap server - verify and
    consider naming it.

diff --git a/raft/src/main/java/org/apache/kafka/raft/FollowerState.java b/raft/src/main/java/org/apache/kafka/raft/FollowerState.java
index 2564a683ed92e..d2da8fc0bd475 100644
--- a/raft/src/main/java/org/apache/kafka/raft/FollowerState.java
+++ b/raft/src/main/java/org/apache/kafka/raft/FollowerState.java
@@ -106,6 +106,11 @@ public long remainingFetchTimeMs(long currentTimeMs) {
         return fetchTimer.remainingMs();
     }
 
+    public long remainingUpdateVoterSetTimeMs(long currentTimeMs) {
+        updateVoterSetPeriodTimer.update(currentTimeMs);
+        return updateVoterSetPeriodTimer.remainingMs();
+    }
+
     public int leaderId() {
         return leaderId;
     }
diff --git a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
index 02ae182901f5a..4a457df381e5b 100644
--- a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
+++ b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
@@ -1725,6 +1725,13 @@ private boolean handleFetchResponse(
             leaderEndpoints = Endpoints.empty();
         }
 
+        maybeSwitchObserverFetchToLeader(
+            responseEpoch,
+            responseLeaderId,
+            leaderEndpoints,
+            currentTimeMs
+        );
+
         Optional<Boolean> handled = maybeHandleCommonResponse(
             error,
             responseLeaderId,
@@ -2608,6 +2615,32 @@ private boolean hasConsistentLeader(int epoch, OptionalInt leaderId) {
         }
     }
 
+    /**
+     * If the local replica is an observer currently routing fetches to bootstrap servers,
+     * and a non-leader source's fetch response advertises the leader's endpoints, switch
+     * the observer back to fetching from the leader.
+     */
+    private void maybeSwitchObserverFetchToLeader(
+        int responseEpoch,
+        OptionalInt responseLeaderId,
+        Endpoints leaderEndpoints,
+        long currentTimeMs
+    ) {
+        if (!hasConsistentLeader(responseEpoch, responseLeaderId)) {
+            throw new IllegalStateException("Received request or response with leader " + responseLeaderId +
+                " and epoch " + responseEpoch + " which is inconsistent with current leader " +
+                quorum.leaderId() + " and epoch " + quorum.epoch());
+        } else if (responseEpoch == quorum.epoch() && quorum.isUnattached() &&
+            responseLeaderId.isPresent() && !leaderEndpoints.isEmpty()) {
+            transitionToFollower(
+                responseEpoch,
+                responseLeaderId.getAsInt(),
+                leaderEndpoints,
+                currentTimeMs
+            );
+        }
+    }
+
     /**
      * Handle response errors that are common across request types.
      *
@@ -3390,6 +3423,9 @@ private long pollFollowerAsObserver(FollowerState state, long currentTimeMs) {
                 state.resetUpdateVoterSetPeriod(currentTimeMs);
             }
             return sendResult.timeToWaitMs();
+        } else if (state.hasFetchTimeoutExpired(currentTimeMs)) {
+            transitionToUnattached(state.epoch(), OptionalInt.of(state.leaderId()));
+            return 0L;
         } else {
             return maybeSendFetchToBestNode(state, currentTimeMs);
         }
diff --git a/raft/src/main/java/org/apache/kafka/raft/QuorumState.java b/raft/src/main/java/org/apache/kafka/raft/QuorumState.java
index 4ae24c84eaf99..bc5701466e790 100644
--- a/raft/src/main/java/org/apache/kafka/raft/QuorumState.java
+++ b/raft/src/main/java/org/apache/kafka/raft/QuorumState.java
@@ -379,7 +379,7 @@ public void transitionToResigned(List<ReplicaKey> preferredSuccessors) {
      */
     public void transitionToUnattached(int epoch, OptionalInt leaderId) {
         int currentEpoch = state.epoch();
-        if (epoch < currentEpoch || (epoch == currentEpoch && !isProspective())) {
+        if (epoch < currentEpoch || (epoch == currentEpoch && !isProspective() && !isObserver())) {
             throw new IllegalStateException(
                 String.format(
                     "Cannot transition to Unattached with epoch %d from current state %s",
diff --git a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientFetchTest.java b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientFetchTest.java
index 74e4afd362ac2..33d4c9ecc78c2 100644
--- a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientFetchTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientFetchTest.java
@@ -38,6 +38,7 @@
 
 import java.nio.ByteBuffer;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 import java.util.OptionalInt;
 import java.util.OptionalLong;
@@ -765,4 +766,75 @@ void testUpdatedHighWatermarkCompleted() throws Exception {
             assertEquals(localLogEndOffset, partitionResponse.highWatermark());
         }
     }
+
+    @Test
+    void testObserverFetchesBetweenLeaderAndBootstrapServers() throws Exception {
+        final var epoch = 2;
+        final var local = KafkaRaftClientTest.replicaKey(
+            KafkaRaftClientTest.randomReplicaId(),
+            true
+        );
+        final var leader = KafkaRaftClientTest.replicaKey(local.id() + 1, true);
+        final var otherVoter = KafkaRaftClientTest.replicaKey(local.id() + 2, true);
+
+        final var voters = VoterSet.fromMap(
+            Map.of(
+                leader.id(), VoterSetTest.voterNode(leader),
+                otherVoter.id(), VoterSetTest.voterNode(otherVoter)
+            )
+        );
+
+        final var context = new RaftClientTestContext.Builder(
+            local.id(),
+            local.directoryId().get()
+        )
+            .withStaticVoters(voters)
+            .withBootstrapServers(Optional.of(List.of(RaftClientTestContext.mockAddress(otherVoter.id()))))
+            .withRaftProtocol(RaftClientTestContext.RaftProtocol.KIP_1166_PROTOCOL)
+            .build();
+
+        for (int i = 0; i < 10; ++i) {
+            // The observer initially fetches from the bootstrap servers, where it will discover the leader's endpoints.
+            context.pollUntilRequest();
+            final var bootstrapFetch = context.assertSentFetchRequest();
+            assertEquals(-2, bootstrapFetch.destination().id());
+            assertEquals(RaftClientTestContext.mockAddress(otherVoter.id()).getHostName(), bootstrapFetch.destination().host());
+            assertEquals(RaftClientTestContext.mockAddress(otherVoter.id()).getPort(), bootstrapFetch.destination().port());
+
+            context.deliverResponse(
+                bootstrapFetch.correlationId(),
+                bootstrapFetch.destination(),
+                context.fetchResponse(
+                    epoch,
+                    leader.id(),
+                    MemoryRecords.EMPTY,
+                    0L,
+                    Errors.NOT_LEADER_OR_FOLLOWER
+                )
+            );
+
+            // Subsequent fetch from the observer is sent to the leader
+            context.pollUntilRequest();
+            final var leaderFetch = context.assertSentFetchRequest();
+            assertEquals(leader.id(), leaderFetch.destination().id());
+            assertEquals(RaftClientTestContext.mockAddress(leader.id()).getHostName(), leaderFetch.destination().host());
+            assertEquals(RaftClientTestContext.mockAddress(leader.id()).getPort(), leaderFetch.destination().port());
+
+            // Return a BROKER_NOT_AVAILABLE error, and then advance time past the fetch timeout,
+            // which should cause the observer to fetch from the bootstrap servers on the next fetch.
+
+            // The fetch timeout is much greater than the request manager's configured backoff, so the
+            // current unreachable connection will no longer be backing off when the next fetch is sent.
+            context.deliverResponse(
+                leaderFetch.correlationId(),
+                leaderFetch.destination(),
+                RaftUtil.errorResponse(
+                    ApiKeys.FETCH,
+                    Errors.BROKER_NOT_AVAILABLE
+                )
+            );
+            context.client.poll();
+            context.time.sleep(context.fetchTimeoutMs + 1);
+        }
+    }
 }
diff --git a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
index e4d879c64edf3..897235a4a4dfb 100644
--- a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
@@ -2011,7 +2011,7 @@ public void testFollowerAsObserverDoesNotBecomeProspectiveAfterFetchTimeout(bool
         context.time.sleep(context.fetchTimeoutMs);
 
         context.pollUntilRequest();
-        assertTrue(context.client.quorum().isFollower());
+        assertFalse(context.client.quorum().isProspective());
 
         // transitions to unattached
         context.deliverRequest(context.voteRequest(epoch + 1, replicaKey(otherNodeId, withKip853Rpc), epoch, 1));
diff --git a/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java b/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
index 2b01c15325471..39a8874649cd0 100644
--- a/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
+++ b/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
@@ -998,8 +998,13 @@ void advanceTimeAndCompleteFetch(
         int leaderId,
         boolean expireUpdateVoterSetTimer
     ) throws Exception {
+        final var state = client.quorum().followerStateOrThrow();
         for (int i = 0; i < NUMBER_FETCH_TIMEOUTS_IN_UPDATE_VOTER_SET_PERIOD; i++) {
-            time.sleep(fetchTimeoutMs - 1);
+            long sleepMs = Math.min(
+                state.remainingFetchTimeMs(time.milliseconds()) - 1,
+                state.remainingUpdateVoterSetTimeMs(time.milliseconds()) - 1
+            );
+            time.sleep(Math.max(0, sleepMs));
             pollUntilRequest();
             final var fetchRequest = assertSentFetchRequest();
             assertFetchRequestData(
@@ -1025,7 +1030,8 @@ void advanceTimeAndCompleteFetch(
             client.poll();
         }
         if (expireUpdateVoterSetTimer) {
-            time.sleep(fetchTimeoutMs - 1);
+            long remaining = state.remainingUpdateVoterSetTimeMs(time.milliseconds());
+            time.sleep(remaining + 1);
         }
     }
 