Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 67 additions & 29 deletions build/mvn
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ install_app() {
local remote_tarball="$1/$2$4"
local local_tarball="${_DIR}/$2"
local binary="${_DIR}/$3"
local max_attempts=3

# setup `curl` and `wget` silent options if we're running on Jenkins
local curl_opts="-L"
Expand All @@ -46,23 +47,44 @@ install_app() {
wget_opts="--progress=bar:force ${wget_opts}"

if [ -z "$3" -o ! -f "$binary" ]; then
# check if we already have the tarball
# check if we have curl installed
# download application
[ ! -f "${local_tarball}" ] && [ $(command -v curl) ] && \
echo "exec: curl ${curl_opts} ${remote_tarball}" 1>&2 && \
curl ${curl_opts} "${remote_tarball}" > "${local_tarball}"
# if the file still doesn't exist, lets try `wget` and cross our fingers
[ ! -f "${local_tarball}" ] && [ $(command -v wget) ] && \
echo "exec: wget ${wget_opts} ${remote_tarball}" 1>&2 && \
wget ${wget_opts} -O "${local_tarball}" "${remote_tarball}"
# if both were unsuccessful, exit
[ ! -f "${local_tarball}" ] && \
echo -n "ERROR: Cannot download $2 with cURL or wget; " && \
echo "please install manually and try again." && \
exit 2
cd "${_DIR}" && tar -xzf "$2"
rm -rf "$local_tarball"
local attempt=1
while [ "${attempt}" -le "${max_attempts}" ]; do
# remove any partial/corrupt download left over from a previous attempt
rm -f "${local_tarball}"

# download application with `curl`, falling back to `wget`
if command -v curl >/dev/null 2>&1; then
echo "exec: curl ${curl_opts} ${remote_tarball}" 1>&2
curl ${curl_opts} "${remote_tarball}" > "${local_tarball}"
elif command -v wget >/dev/null 2>&1; then

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Three behavior changes in this rewrite worth a second look:

  • rm -f "${local_tarball}" at the top of every attempt drops the old "reuse an already-downloaded tarball" path. For an offline/air-gapped build that pre-stages the tarball (but not the extracted dir), this turns a working build into a hard failure. (The extracted-binary short-circuit at the top of install_app still works.)
  • curl elif wget means when curl is present, wget is never tried across all 3 attempts. The old script fell through to wget when curl produced no file; a curl-specific failure (proxy/TLS/CA) that wget would survive now fails outright.
  • In install_mvn, since APACHE_MIRROR now defaults to archive.apache.org, the "fall back to archive" block (124) is dead code for the default config (the %/ compare is equal → straight to exit 2). Minor: sleep 3 (82) also runs after the final failed attempt before giving up.

echo "exec: wget ${wget_opts} ${remote_tarball}" 1>&2
wget ${wget_opts} -O "${local_tarball}" "${remote_tarball}"
else
echo "ERROR: Cannot download $2: neither cURL nor wget is installed." 1>&2
exit 2
fi

# Validate the download before trusting it. A flaky Apache mirror can
# return an HTML page (mirror chooser / error) with HTTP 200, which is
# not a gzip tarball; extracting it later would fail with a confusing
# exit code. `tar -tzf` lists the archive without extracting and
# exits non-zero on a non-tarball body.
if [ -f "${local_tarball}" ] && tar -tzf "${local_tarball}" >/dev/null 2>&1; then
if cd "${_DIR}" && tar -xzf "$2"; then
rm -rf "${local_tarball}"
return 0
fi
fi

echo "WARN: Download of $2 from $1 was not a valid tarball" \
"(attempt ${attempt}/${max_attempts}); retrying..." 1>&2
attempt=$((attempt + 1))
sleep 3
done

rm -f "${local_tarball}"
echo "WARN: Failed to download a valid $2 from $1 after ${max_attempts} attempts." 1>&2
return 1
fi
}

Expand All @@ -77,25 +99,41 @@ install_mvn() {
# See simple version normalization: http://stackoverflow.com/questions/16989598/bash-comparing-version-numbers
function version { echo "$@" | awk -F. '{ printf("%03d%03d%03d\n", $1,$2,$3); }'; }
if [ $(version $MVN_DETECTED_VERSION) -ne $(version $MVN_VERSION) ]; then
local APACHE_MIRROR=${APACHE_MIRROR:-'https://www.apache.org/dyn/closer.lua'}
local MIRROR_URL_QUERY="?action=download"
# Default to archive.apache.org: it serves the exact tarball
# deterministically, avoiding the closer.lua mirror redirector which
# intermittently routes to a mirror that returns an HTML page instead of
# the binary. Override with APACHE_MIRROR to use a closer mirror.
local APACHE_MIRROR=${APACHE_MIRROR:-'https://archive.apache.org/dist'}
local MVN_TARBALL="apache-maven-${MVN_VERSION}-bin.tar.gz"
local FILE_PATH="maven/maven-3/${MVN_VERSION}/binaries"

if [ $(command -v curl) ]; then
if ! curl -L --output /dev/null --silent --head --fail "${APACHE_MIRROR}/${FILE_PATH}/${MVN_TARBALL}${MIRROR_URL_QUERY}" ; then
# Fall back to archive.apache.org for older Maven
echo "Falling back to archive.apache.org to download Maven"
APACHE_MIRROR="https://archive.apache.org/dist"
MIRROR_URL_QUERY=""
fi
fi
# closer.lua needs the ?action=download query to redirect to a mirror;
# archive.apache.org and most plain mirrors serve the file directly.
local MIRROR_URL_QUERY=""
case "${APACHE_MIRROR}" in
*closer.lua*) MIRROR_URL_QUERY="?action=download" ;;
esac

install_app \
if ! install_app \
"${APACHE_MIRROR}/${FILE_PATH}" \
"${MVN_TARBALL}" \
"apache-maven-${MVN_VERSION}/bin/mvn" \
"${MIRROR_URL_QUERY}"
"${MIRROR_URL_QUERY}"; then
# Last resort: fall back to archive.apache.org, which serves the exact
# tarball deterministically. Skip if it was already the chosen mirror.
local ARCHIVE_MIRROR="https://archive.apache.org/dist"
if [ "${APACHE_MIRROR%/}" != "${ARCHIVE_MIRROR}" ]; then
echo "WARN: falling back to ${ARCHIVE_MIRROR} to download Maven" 1>&2
install_app \
"${ARCHIVE_MIRROR}/${FILE_PATH}" \
"${MVN_TARBALL}" \
"apache-maven-${MVN_VERSION}/bin/mvn" \
"" || { echo "ERROR: Failed to download Maven; please install manually." 1>&2; exit 2; }
else
echo "ERROR: Failed to download Maven; please install manually." 1>&2
exit 2
fi
fi

MVN_BIN="${_DIR}/apache-maven-${MVN_VERSION}/bin/mvn"
fi
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;
Expand Down Expand Up @@ -57,6 +58,7 @@ public abstract class ShuffleClient {
private static Logger logger = LoggerFactory.getLogger(ShuffleClient.class);
private static volatile ShuffleClient _instance;
private static volatile boolean initialized = false;
private static volatile String _appUniqueId;
private static volatile Map<StorageInfo.Type, FileSystem> hadoopFs;
private static LongAdder totalReadCounter = new LongAdder();
private static LongAdder localShuffleReadCounter = new LongAdder();
Expand All @@ -69,6 +71,7 @@ public abstract class ShuffleClient {
public static void reset() {
_instance = null;
initialized = false;
_appUniqueId = null;
hadoopFs = null;
}

Expand All @@ -90,7 +93,7 @@ public static ShuffleClient get(
CelebornConf conf,
UserIdentifier userIdentifier,
byte[] extension) {
if (null == _instance || !initialized) {
if (null == _instance || !initialized || !Objects.equals(appUniqueId, _appUniqueId)) {

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

get() is now app-aware, but the lock-free fast path still has a TOCTOU that can hand back the wrong app's client.

The in-lock return _instance (134) is safe, but the fast-path return _instance (137) reads the static volatile without the lock. The publish-order comment (127-129) only closes the new-id + stale-instance direction; writing _instance (130) before _appUniqueId (131) opens the reverse window:

  • Thread A is in branch-3 for app A: it has run _instance = A (130) but not yet _appUniqueId = A (131), so _appUniqueId is still B.
  • Concurrent thread B calls get(B): the guard here sees _appUniqueId == BObjects.equals(B,B) true → guard false → B skips the lock and return _instance (137) returns A's client.

B (the live app) then drives its shuffle against A's LifecycleManager → ArrayIndexOutOfBounds / CommitMetadata CRC mismatch, i.e. the cross-app corruption this PR is fixing. Only reachable in the multi-app spark-it JVM (prod uses one appUniqueId per JVM, so branch-3 never fires there) — but that is exactly the scenario branch-3 was added for.

Fix: return the instance captured under the lock (each branch assigns a local result, then return result;), or hold (appId, client) in a single AtomicReference so id and instance can't be read torn.

synchronized (ShuffleClient.class) {
if (null == _instance) {
// During the execution of Spark tasks, each task may be interrupted due to speculative
Expand All @@ -102,14 +105,33 @@ public static ShuffleClient get(
_instance = new ShuffleClientImpl(appUniqueId, conf, userIdentifier);
_instance.setupLifecycleManagerRef(driverHost, port);
_instance.setExtension(extension);
_appUniqueId = appUniqueId;
initialized = true;
} else if (!initialized) {
_instance.shutdown();
_instance = new ShuffleClientImpl(appUniqueId, conf, userIdentifier);
_instance.setupLifecycleManagerRef(driverHost, port);
_instance.setExtension(extension);
_appUniqueId = appUniqueId;
initialized = true;
} else if (!Objects.equals(appUniqueId, _appUniqueId)) {
// Do NOT shutdown() the old _instance. Callers cache the reference returned by get(),
// and shutdown() is an immediate teardown that would terminate the RpcEnv/pools still in
// use, causing RejectedExecutionException. Teardown is owned by stop()->shutdown(). The
// orphan is bounded (one per appUniqueId) and unreachable in normal single-app JVMs.
// The spark-it suite runs multiple apps in one reused JVM with overlapping lifecycles, so
// a shutdown() here tears down an instance still in use by the previous app and fails.
ShuffleClientImpl newInstance = new ShuffleClientImpl(appUniqueId, conf, userIdentifier);

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

branch-3 replaces _instance without shutdown()-ing the old one, so its RpcEnv (bound port + dispatcher threads), Netty dataClientFactory, pushDataRetryPool and reviveManager leak until JVM exit.

The comment calls the orphan "bounded (one per appUniqueId)", but under two apps alternately calling get(), every call observes a mismatch and rebuilds — so it's bounded by the number of A↔B switches, not by app count. reset() only nulls the reference (no shutdown()), so these are never reclaimed there either. In the long-lived serial spark-it JVM this accumulates threads/FDs/ports, feeding the same contention this PR lowers worker counts to fight. If teardown of an in-use instance is the concern, that's a signal the static singleton shouldn't be shared across apps at all (per-app map / injected client).

newInstance.setupLifecycleManagerRef(driverHost, port);
newInstance.setExtension(extension);
// Publish _instance before _appUniqueId. The outer guard reads both volatiles without
// holding the lock, so writing _appUniqueId first would let another thread observe the
// new id while _instance is still stale and return the old instance.
_instance = newInstance;
_appUniqueId = appUniqueId;
initialized = true;
}
return _instance;
}
}
return _instance;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -921,8 +921,10 @@ Map<Integer, Integer> reviveBatch(
StatusCode statusCode = entry.getValue()._1();
if (entry.getValue()._2() != null) {
PartitionLocation oldLoc = oldLocMap.get(partitionId);
// Currently, revive only check if main location available, here won't remove peer loc.
pushExcludedWorkers.remove(oldLoc.hostAndPushPort());
if (oldLoc != null) {
// Currently, revive only check if main location available, here won't remove peer loc.
pushExcludedWorkers.remove(oldLoc.hostAndPushPort());
}
}

if (StatusCode.SUCCESS == statusCode) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ trait WithShuffleClientSuite extends CelebornFunSuite {

protected val celebornConf: CelebornConf = new CelebornConf()

protected val APP = "app-1"
protected var APP: String = _
protected val userIdentifier: UserIdentifier = UserIdentifier("mock", "mock")
private val numMappers = 8
private val mapId = 1
Expand All @@ -49,6 +49,11 @@ trait WithShuffleClientSuite extends CelebornFunSuite {
_shuffleId
}

override protected def beforeEach(): Unit = {
super.beforeEach()
APP = s"app-${java.util.UUID.randomUUID()}"
}

override protected def afterEach() {
if (lifecycleManager != null) {
lifecycleManager.stop()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,10 @@ object Utils extends Logging {
ScalaRandom.nextInt(until - 1 - from) + from
}

val MAX_SELECTABLE_PORT = 32768

def selectRandomPort(): Int = selectRandomInt(1024, MAX_SELECTABLE_PORT)

def startServiceOnPort[T](
startPort: Int,
startService: Int => (T, Int),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,19 @@ public static void resetRaftServer(

while (!serversStarted) {
try {
// Re-point each server to a fresh storage directory on retry. Ratis releases the storage
// directory lock asynchronously on close(), so a failed attempt (e.g. a random ratis port
// collision) can leave the previous directory locked. Reusing the same directory on retry
// then fails with "directory is already locked"; allocating a clean directory each time
// avoids contending for a lock that has not been released yet. Skip this on the first
// attempt: callers already configure a fresh directory when building conf1/2/3, so
// reconfiguring here would orphan that just-created (empty) directory.
if (retryCount > 0) {
configureServerConf(conf1, 1);
configureServerConf(conf2, 2);
configureServerConf(conf3, 3);
}

STATUSSYSTEM1 = new HAMasterMetaManager(mockRpcEnv, conf1);
STATUSSYSTEM2 = new HAMasterMetaManager(mockRpcEnv, conf2);
STATUSSYSTEM3 = new HAMasterMetaManager(mockRpcEnv, conf3);
Expand All @@ -131,7 +144,8 @@ public static void resetRaftServer(
String id2 = UUID.randomUUID().toString();
String id3 = UUID.randomUUID().toString();

int ratisPort1 = Utils$.MODULE$.selectRandomInt(1024, 65535);
int ratisPort1 =
Utils$.MODULE$.selectRandomInt(1024, Utils$.MODULE$.MAX_SELECTABLE_PORT() - 2);
int ratisPort2 = ratisPort1 + 1;
int ratisPort3 = ratisPort2 + 1;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ trait MasterClusterFeature extends Logging {
}
}
def selectRandomPort(): Int = synchronized {
val port = Utils.selectRandomInt(1024, 65535)
val port = Utils.selectRandomPort()
val portUsed = usedPorts.contains(port) || portBounded(port)
usedPorts.add(port)
if (portUsed) {
Expand Down
18 changes: 18 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -1315,6 +1315,24 @@
<goals>
<goal>prepare-agent</goal>
</goals>
<configuration>
<!--
Netty 4.2 fires JFR events (e.g. io.netty.buffer.FreeBufferEvent,
FreeChunkEvent) by lazily loading event classes that extend
jdk.jfr.Event. Those classes are already bytecode-enhanced, so
JaCoCo's on-the-fly agent rejects them with
"Cannot process instrumented class ..." and throws an
IllegalClassFormatException from defineClass. When this happens on
a Netty I/O thread mid-write (e.g. during buffer deallocation while
the worker streams shuffle data to the Flink read client), the
channel is torn down and the client sees "Client is lost", flaking
the Flink integration tests. Netty is a third-party dependency we do
not measure coverage for, so exclude it from instrumentation.
-->
<excludes>
<exclude>io/netty/**</exclude>
</excludes>
</configuration>
</execution>
<execution>
<id>report</id>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@ import org.apache.flink.runtime.jobgraph.JobType
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator
import org.scalatest.BeforeAndAfterAll
import org.scalatest.concurrent.Eventually._
import org.scalatest.funsuite.AnyFunSuite
import org.scalatest.time.SpanSugar._

import org.apache.celeborn.common.CelebornConf
import org.apache.celeborn.common.internal.Logging
Expand Down Expand Up @@ -186,12 +188,18 @@ class HybridShuffleWordCountTest extends AnyFunSuite with Logging with MiniClust
}

private def checkFlushingFileLength(): Unit = {
workers.map(worker => {
worker.storageManager.workingDirWriters.values().asScala.map(writers => {
writers.forEach((fileName, fileWriter) => {
assert(new File(fileName).length() == fileWriter.getDiskFileInfo.getFileLength)
// getDiskFileInfo.getFileLength is the logical byte count accounted as data is written, while
// the physical file is grown asynchronously by the LocalFlusher. Right after the job finishes
// the flusher may not have drained the last buffers yet, so the on-disk length can lag (briefly
// even 0). Wait for the flush to catch up before asserting equality instead of reading mid-flush.
eventually(timeout(30.seconds), interval(500.milliseconds)) {
workers.map(worker => {
worker.storageManager.workingDirWriters.values().asScala.map(writers => {
writers.forEach((fileName, fileWriter) => {
assert(new File(fileName).length() == fileWriter.getDiskFileInfo.getFileLength)
})
})
})
})
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ class ChangePartitionManagerUpdateWorkersSuite extends WithShuffleClientSuite
}

override def beforeEach(): Unit = {
super.beforeEach()
val testConf = Map(
s"${CelebornConf.CLIENT_PUSH_MAX_REVIVE_TIMES.key}" -> "3")
val (master, _) = setupMiniClusterWithRandomPorts(testConf, testConf, workerNum = 1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,13 @@ class LifecycleManagerUnregisterShuffleSuite extends WithShuffleClientSuite
celebornConf
.set(CelebornConf.CLIENT_PUSH_REPLICATE_ENABLED.key, "true")
.set(CelebornConf.CLIENT_PUSH_BUFFER_MAX_SIZE.key, "256K")
// The default expired-check interval is 60s. removeExpiredShuffle only
// unregisters a shuffle once `unregisterTime < now - checkInterval` and runs
// on a fixed-rate timer at that interval, so with 60s the master side cannot
// be cleared until the second tick (~120s) -- exactly the eventually() window
// below, leaving no margin and no retry if an RPC briefly fails under load.
// Use a short interval so the unregister runs promptly with ample retries.
.set(CelebornConf.SHUFFLE_EXPIRED_CHECK_INTERVAL.key, "5s")

override def beforeAll(): Unit = {
super.beforeAll()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import org.apache.spark.sql.SparkSession
import org.scalatest.concurrent.Eventually._
import org.scalatest.time.SpanSugar.convertIntToGrainOfTime

import org.apache.celeborn.client.ShuffleClient
import org.apache.celeborn.common.CelebornConf
import org.apache.celeborn.common.protocol.ShuffleMode
import org.apache.celeborn.service.deploy.worker.Worker
Expand All @@ -43,10 +42,11 @@ class CelebornHashCheckDiskSuite extends SparkTestBase {
}

override def beforeEach(): Unit = {
ShuffleClient.reset()
stopActiveSparkSessions()
}

override def afterEach(): Unit = {
stopActiveSparkSessions()
System.gc()
}

Expand All @@ -59,7 +59,7 @@ class CelebornHashCheckDiskSuite extends SparkTestBase {
val combineResult = combine(sparkSession)
val groupByResult = groupBy(sparkSession)
val repartitionResult = repartition(sparkSession)
sparkSession.stop()
stopActiveSparkSessions()

val sparkSessionEnableCeleborn = SparkSession.builder()
.config(updateSparkConf(sparkConf, ShuffleMode.HASH))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ class ShuffleFallbackSuite extends AnyFunSuite
}

test(s"celeborn spark integration test - fallback") {
setupMiniClusterWithRandomPorts(workerNum = 5)
setupMiniClusterWithRandomPorts(workerNum = 3)
val sparkConf = new SparkConf().setAppName("celeborn-demo")
.setMaster("local[2]")
.set(s"spark.${CelebornConf.SPARK_SHUFFLE_FORCE_FALLBACK_ENABLED.key}", "true")
Expand Down
Loading
Loading