From 86fbf884b7b7cbb524d20b1483a4dae49d82365c Mon Sep 17 00:00:00 2001 From: Junfan Zhang Date: Mon, 22 Jun 2026 16:47:12 +0800 Subject: [PATCH] [#2753] fix(spark): ignore removed failed blocks --- .../uniffle/shuffle/ReassignExecutor.java | 6 ++++ .../uniffle/shuffle/ReassignExecutorTest.java | 14 +++++++++ .../client/impl/FailedBlockSendTracker.java | 5 ++-- .../impl/FailedBlockSendTrackerTest.java | 29 +++++++++++++++++++ 4 files changed, 52 insertions(+), 2 deletions(-) diff --git a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/ReassignExecutor.java b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/ReassignExecutor.java index 57bd256414..6b9e040ab8 100644 --- a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/ReassignExecutor.java +++ b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/ReassignExecutor.java @@ -133,6 +133,9 @@ private void reassignAndResendForFailedBlocks(FailedBlockSendTracker failedBlock for (Long blockId : failedBlockIds) { List failedBlockStatus = failedBlockSendTracker.getFailedBlockStatus(blockId); + if (CollectionUtils.isEmpty(failedBlockStatus)) { + continue; + } synchronized (failedBlockStatus) { int retryCnt = failedBlockStatus.stream() @@ -180,6 +183,9 @@ private void reassignAndResendForFailedBlocks(FailedBlockSendTracker failedBlock resendBlocks.addAll(failedBlockStatus); } } + if (resendBlocks.isEmpty()) { + return; + } reassignAndResendBlocks(resendBlocks); } diff --git a/client-spark/common/src/test/java/org/apache/uniffle/shuffle/ReassignExecutorTest.java b/client-spark/common/src/test/java/org/apache/uniffle/shuffle/ReassignExecutorTest.java index fd25e0a2c3..bc8501da20 100644 --- a/client-spark/common/src/test/java/org/apache/uniffle/shuffle/ReassignExecutorTest.java +++ b/client-spark/common/src/test/java/org/apache/uniffle/shuffle/ReassignExecutorTest.java @@ -18,6 +18,7 @@ package org.apache.uniffle.shuffle; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -44,6 +45,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoInteractions; import static org.mockito.Mockito.when; public class ReassignExecutorTest { @@ -104,6 +106,18 @@ void testRetryExceededShouldFailAndReleaseResources() { verify(blockInfo).executeCompletionCallback(true); } + @Test + void testRemovedFailureStatusShouldBeIgnored() { + long blockId = 100L; + when(failedBlockSendTracker.getFailedBlockIds()) + .thenReturn(new HashSet<>(Arrays.asList(blockId))); + when(failedBlockSendTracker.getFailedBlockStatus(blockId)).thenReturn(Collections.emptyList()); + + executor.reassign(); + + verifyNoInteractions(removeBlockStatsFunction, resendBlocksFunction, shuffleManagerClient); + } + @Test public void testMixedSameAndDifferent() throws Exception { Map, List>> map = new HashMap<>(); diff --git a/client/src/main/java/org/apache/uniffle/client/impl/FailedBlockSendTracker.java b/client/src/main/java/org/apache/uniffle/client/impl/FailedBlockSendTracker.java index 22c6d21fba..b1a36a025d 100644 --- a/client/src/main/java/org/apache/uniffle/client/impl/FailedBlockSendTracker.java +++ b/client/src/main/java/org/apache/uniffle/client/impl/FailedBlockSendTracker.java @@ -84,11 +84,12 @@ public void clearAndReleaseBlockResources() { } public Set getFailedBlockIds() { - return trackingBlockStatusMap.keySet(); + return Sets.newHashSet(trackingBlockStatusMap.keySet()); } public List getFailedBlockStatus(Long blockId) { - return trackingBlockStatusMap.get(blockId); + List statuses = trackingBlockStatusMap.get(blockId); + return statuses == null ? Collections.emptyList() : statuses; } public Set getFaultyShuffleServers() { diff --git a/client/src/test/java/org/apache/uniffle/client/impl/FailedBlockSendTrackerTest.java b/client/src/test/java/org/apache/uniffle/client/impl/FailedBlockSendTrackerTest.java index ec5b0ab164..a5660c4a24 100644 --- a/client/src/test/java/org/apache/uniffle/client/impl/FailedBlockSendTrackerTest.java +++ b/client/src/test/java/org/apache/uniffle/client/impl/FailedBlockSendTrackerTest.java @@ -18,6 +18,7 @@ package org.apache.uniffle.client.impl; import java.util.List; +import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @@ -30,6 +31,8 @@ import org.apache.uniffle.common.rpc.StatusCode; import static org.awaitility.Awaitility.await; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; public class FailedBlockSendTrackerTest { @Test @@ -72,4 +75,30 @@ public void test() throws Exception { return CollectionUtils.isEqualCollection(expected, actual); }); } + + @Test + public void getFailedBlockIdsShouldReturnSnapshot() { + FailedBlockSendTracker tracker = new FailedBlockSendTracker(); + ShuffleServerInfo shuffleServerInfo = new ShuffleServerInfo("host1", 19999); + ShuffleBlockInfo shuffleBlockInfo = + new ShuffleBlockInfo( + 0, + 0, + 1L, + 0, + 0L, + new byte[] {}, + Lists.newArrayList(shuffleServerInfo), + 0, + 0L, + 0L); + + tracker.add(shuffleBlockInfo, shuffleServerInfo, StatusCode.INTERNAL_ERROR); + + Set failedBlockIds = tracker.getFailedBlockIds(); + tracker.remove(shuffleBlockInfo.getBlockId()); + + assertEquals(1, failedBlockIds.size()); + assertTrue(tracker.getFailedBlockStatus(shuffleBlockInfo.getBlockId()).isEmpty()); + } }