[VL] Use Velox TaskCursor infrastructure#12302
Conversation
Replace Task-based execution with TaskCursor in WholeStageResultIterator to enable future parallel execution support. Add ThreadInitializer and ThreadManager abstractions to manage native thread lifecycle, plumbed through the Runtime layer. This is a pure refactoring with no behavioral change — serial execution is still the default. Includes: - New ThreadInitializer / ThreadManager classes in cpp/core/threads/ - SparkThreadInitializer for JNI thread callbacks - VeloxThreadManager backend implementation - TaskCursor-based execution replacing direct Task management - NumTaskSlotsPerExecutor config wired for IO threads default - HookedExecutor plumbed with ThreadInitializer Co-Authored-By: Claude <noreply@anthropic.com>
|
Run Gluten Clickhouse CI on x86 |
|
Run Gluten Clickhouse CI on x86 |
|
Run Gluten Clickhouse CI on x86 |
|
Run Gluten Clickhouse CI on x86 |
|
Run Gluten Clickhouse CI on x86 |
There was a problem hiding this comment.
Pull request overview
Note
Copilot was unable to run its full agentic suite in this review.
Refactors Gluten’s Velox backend runtime and execution to use Velox TaskCursor and introduces ThreadInitializer/ThreadManager abstractions to manage native thread lifecycle (JNI/Spark task context propagation), preparing for future parallel execution capabilities.
Changes:
- Replaces direct Velox Task driving with TaskCursor in
WholeStageResultIterator. - Introduces cross-language thread lifecycle plumbing (Scala/Java JNI wrappers + C++ ThreadInitializer/ThreadManager + Velox implementation).
- Wires
spark.gluten.numTaskSlotsPerExecutoras the default for Velox IO thread configuration.
Reviewed changes
Copilot reviewed 32 out of 32 changed files in this pull request and generated 8 comments.
Show a summary per file
| File | Description |
|---|---|
| gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala | Adjusts default config mapping toward numTaskSlotsPerExecutor driving IO threads. |
| gluten-core/src/main/scala/org/apache/spark/task/TaskResources.scala | Improves error context when not in a Spark task. |
| gluten-arrow/src/main/scala/org/apache/gluten/threads/NativeThreadManager.scala | Adds per-task Scala wrapper around native ThreadManager handle with TaskResources integration. |
| gluten-arrow/src/main/scala/org/apache/gluten/runtime/Runtime.scala | Plumbs ThreadManager handle into runtime creation and registers it as a task resource. |
| gluten-arrow/src/main/java/org/apache/gluten/threads/TaskChildThreadInitializer.java | Adds Java-side Spark TaskContext propagation for native worker threads. |
| gluten-arrow/src/main/java/org/apache/gluten/threads/NativeThreadManagerJniWrapper.java | Adds JNI bridge for creating/releasing native ThreadManager. |
| gluten-arrow/src/main/java/org/apache/gluten/threads/NativeThreadInitializer.java | Defines Java callback interface for native thread lifecycle events. |
| gluten-arrow/src/main/java/org/apache/gluten/runtime/RuntimeJniWrapper.java | Extends runtime JNI factory to accept ThreadManager handle. |
| cpp/velox/tests/Substrait2VeloxPlanConversionTest.cc | Includes Velox backend header for updated backend/runtime APIs. |
| cpp/velox/tests/RuntimeTest.cc | Updates tests for Runtime factory signature and ThreadManager lifecycle. |
| cpp/velox/operators/plannodes/CudfVectorStream.h | Forces single-thread execution for a node to avoid non-thread-safe iterator use. |
| cpp/velox/jni/VeloxJniWrapper.cc | Switches parallel join build executor selection to IO executor. |
| cpp/velox/config/VeloxConfig.h | Adds config key for numTaskSlotsPerExecutor (used as IO thread default). |
| cpp/velox/compute/WholeStageResultIterator.h | Stores TaskCursor and task raw pointer; updates cleanup logic. |
| cpp/velox/compute/WholeStageResultIterator.cc | Implements TaskCursor-based execution and updates split/metrics traversal logic. |
| cpp/velox/compute/VeloxRuntime.h | Extends constructor signature to accept ThreadManager. |
| cpp/velox/compute/VeloxRuntime.cc | Hooks executors with ThreadInitializer initialize/destroy callbacks. |
| cpp/velox/compute/VeloxBackend.cc | Registers ThreadManager factory; uses numTaskSlotsPerExecutor as IO thread default. |
| cpp/velox/benchmarks/ParquetWriteBenchmark.cc | Creates/releases ThreadManager for benchmark runtime creation. |
| cpp/velox/benchmarks/GenericBenchmark.cc | Updates benchmark runtime factory and ensures ThreadManager is released. |
| cpp/core/threads/ThreadManager.h | Introduces ThreadManager interface and registry plumbing. |
| cpp/core/threads/ThreadManager.cc | Implements ThreadManager registry-based factory/releaser wiring. |
| cpp/core/threads/ThreadInitializer.h | Introduces ThreadInitializer interface (noop factory). |
| cpp/core/threads/ThreadInitializer.cc | Implements noop ThreadInitializer. |
| cpp/core/jni/JniWrapper.cc | Plumbs ThreadManager through runtime JNI create; adds ThreadManager JNI create/release. |
| cpp/core/jni/JniCommon.h | Adds SparkThreadInitializer (C++ -> Java NativeThreadInitializer bridge). |
| cpp/core/jni/JniCommon.cc | Includes thread name header and minor cleanup. |
| cpp/core/compute/Runtime.h | Extends Runtime to store ThreadManager and updates factory signature. |
| cpp/core/compute/Runtime.cc | Updates Runtime::create to forward ThreadManager. |
| cpp/core/CMakeLists.txt | Adds new threads sources to build. |
| backends-velox/src/test/java/org/apache/gluten/test/MockVeloxBackend.java | Updates test config wiring to include numTaskSlotsPerExecutor and new VeloxConfig reference. |
| backends-velox/src/main/scala/org/apache/gluten/metrics/MetricsUtil.scala | Fixes misleading comment about union metrics. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
|
Run Gluten Clickhouse CI on x86 |
There was a problem hiding this comment.
I was trying to understand more on the logic for lifecycle management on the threadmanage. hookedexecutor instance that is required to be released before the threadmanager, how can web ensure that?
Cc: @rui-mo on the io thread related change
| protected: | ||
| std::string kind_; | ||
| MemoryManager* memoryManager_; | ||
| ThreadManager* threadManager_; |
There was a problem hiding this comment.
was trying to understand the lifetime for this threadManager_: initially we run into some leakage from VeloxRuntime and we need to careful on destructor maybe we could change this to unique_ptr so it will be released when runtime destruct?
There was a problem hiding this comment.
Good point. The destruction is guaranteed to be right earlier than destruction of memory manager, by its priority settings:
initially we run into some leakage from VeloxRuntime and we need to careful on destructor
I was aware of this, and this kind of leakage was fundamentally fixed at once by #11882, specifically, by the introduction of HookedExecutor. HookedExecutor can reuse OS thread resources, while it guarantees no background threads are running before we end the Velox task. Precondition is, Velox should keep avoiding running threads off the thread pools we pass to it, and this rule is unlikely to be broken by Velox shortly.
| final SparkConf conf = new SparkConf(); | ||
| conf.set(GlutenConfig.SPARK_OFFHEAP_SIZE_KEY(), "1g"); | ||
| conf.set(VeloxConfig$.MODULE$.COLUMNAR_VELOX_CONNECTOR_IO_THREADS().key(), "0"); | ||
| conf.set(GlutenCoreConfig.NUM_TASK_SLOTS_PER_EXECUTOR().key(), "1"); |
There was a problem hiding this comment.
it seems this value is set to -1 on scala side
There was a problem hiding this comment.
It's finally set here:
|
Run Gluten Clickhouse CI on x86 |
Replace Gluten's task execution code with Velox's TaskCursor in WholeStageResultIterator, in preparation for future parallel execution support. Add ThreadInitializer and ThreadManager abstractions to manage native thread lifecycle, plumbed through the Runtime layer. This is a pure refactoring with no behavioral change.
Includes:
PR authored by me and split from #11852 by AI