-
Notifications
You must be signed in to change notification settings - Fork 447
[CELEBORN-2362] Fix Flaky CI/CD #3737
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
4f0d470
0a98f5f
7620017
d8be0ef
afeb8ce
fb95ba1
bdc236a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -21,6 +21,7 @@ | |
| import java.util.ArrayList; | ||
| import java.util.Collections; | ||
| import java.util.Map; | ||
| import java.util.Objects; | ||
| import java.util.Optional; | ||
| import java.util.concurrent.ConcurrentHashMap; | ||
| import java.util.concurrent.atomic.LongAdder; | ||
|
|
@@ -57,6 +58,7 @@ public abstract class ShuffleClient { | |
| private static Logger logger = LoggerFactory.getLogger(ShuffleClient.class); | ||
| private static volatile ShuffleClient _instance; | ||
| private static volatile boolean initialized = false; | ||
| private static volatile String _appUniqueId; | ||
| private static volatile Map<StorageInfo.Type, FileSystem> hadoopFs; | ||
| private static LongAdder totalReadCounter = new LongAdder(); | ||
| private static LongAdder localShuffleReadCounter = new LongAdder(); | ||
|
|
@@ -69,6 +71,7 @@ public abstract class ShuffleClient { | |
| public static void reset() { | ||
| _instance = null; | ||
| initialized = false; | ||
| _appUniqueId = null; | ||
| hadoopFs = null; | ||
| } | ||
|
|
||
|
|
@@ -90,7 +93,7 @@ public static ShuffleClient get( | |
| CelebornConf conf, | ||
| UserIdentifier userIdentifier, | ||
| byte[] extension) { | ||
| if (null == _instance || !initialized) { | ||
| if (null == _instance || !initialized || !Objects.equals(appUniqueId, _appUniqueId)) { | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
The in-lock
B (the live app) then drives its shuffle against A's Fix: return the instance captured under the lock (each branch assigns a local |
||
| synchronized (ShuffleClient.class) { | ||
| if (null == _instance) { | ||
| // During the execution of Spark tasks, each task may be interrupted due to speculative | ||
|
|
@@ -102,14 +105,33 @@ public static ShuffleClient get( | |
| _instance = new ShuffleClientImpl(appUniqueId, conf, userIdentifier); | ||
| _instance.setupLifecycleManagerRef(driverHost, port); | ||
| _instance.setExtension(extension); | ||
| _appUniqueId = appUniqueId; | ||
| initialized = true; | ||
| } else if (!initialized) { | ||
| _instance.shutdown(); | ||
| _instance = new ShuffleClientImpl(appUniqueId, conf, userIdentifier); | ||
| _instance.setupLifecycleManagerRef(driverHost, port); | ||
| _instance.setExtension(extension); | ||
| _appUniqueId = appUniqueId; | ||
| initialized = true; | ||
| } else if (!Objects.equals(appUniqueId, _appUniqueId)) { | ||
| // Do NOT shutdown() the old _instance. Callers cache the reference returned by get(), | ||
| // and shutdown() is an immediate teardown that would terminate the RpcEnv/pools still in | ||
| // use, causing RejectedExecutionException. Teardown is owned by stop()->shutdown(). The | ||
| // orphan is bounded (one per appUniqueId) and unreachable in normal single-app JVMs. | ||
| // The spark-it suite runs multiple apps in one reused JVM with overlapping lifecycles, so | ||
| // a shutdown() here tears down an instance still in use by the previous app and fails. | ||
| ShuffleClientImpl newInstance = new ShuffleClientImpl(appUniqueId, conf, userIdentifier); | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. branch-3 replaces The comment calls the orphan "bounded (one per appUniqueId)", but under two apps alternately calling |
||
| newInstance.setupLifecycleManagerRef(driverHost, port); | ||
| newInstance.setExtension(extension); | ||
| // Publish _instance before _appUniqueId. The outer guard reads both volatiles without | ||
| // holding the lock, so writing _appUniqueId first would let another thread observe the | ||
| // new id while _instance is still stale and return the old instance. | ||
| _instance = newInstance; | ||
| _appUniqueId = appUniqueId; | ||
| initialized = true; | ||
| } | ||
| return _instance; | ||
| } | ||
| } | ||
| return _instance; | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Three behavior changes in this rewrite worth a second look:
rm -f "${local_tarball}"at the top of every attempt drops the old "reuse an already-downloaded tarball" path. For an offline/air-gapped build that pre-stages the tarball (but not the extracted dir), this turns a working build into a hard failure. (The extracted-binary short-circuit at the top ofinstall_appstill works.)elifwget means when curl is present, wget is never tried across all 3 attempts. The old script fell through to wget when curl produced no file; a curl-specific failure (proxy/TLS/CA) that wget would survive now fails outright.install_mvn, sinceAPACHE_MIRRORnow defaults toarchive.apache.org, the "fall back to archive" block (124) is dead code for the default config (the%/compare is equal → straight toexit 2). Minor:sleep 3(82) also runs after the final failed attempt before giving up.