Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,7 @@ object MetricsUtil extends Logging {
}
ju.updateJoinMetrics(operatorMetrics, metrics.getSingleMetrics, joinParams)
case u: UnionMetricsUpdater =>
// JoinRel outputs two suites of metrics respectively for hash build and hash probe.
// Union outputs two suites of metrics respectively.
// Therefore, fetch one more suite of metrics here.
operatorMetrics.add(metrics.getOperatorMetrics(curMetricsIdx))
curMetricsIdx -= 1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@
package org.apache.gluten.test;

import org.apache.gluten.config.GlutenConfig;
import org.apache.gluten.config.VeloxConfig$;
import org.apache.gluten.config.GlutenCoreConfig;
import org.apache.gluten.config.VeloxConfig;

import com.codahale.metrics.MetricRegistry;
import org.apache.spark.SparkConf;
Expand Down Expand Up @@ -72,7 +73,8 @@ public Object ask(Object message) throws Exception {
private static SparkConf newSparkConf() {
final SparkConf conf = new SparkConf();
conf.set(GlutenConfig.SPARK_OFFHEAP_SIZE_KEY(), "1g");
conf.set(VeloxConfig$.MODULE$.COLUMNAR_VELOX_CONNECTOR_IO_THREADS().key(), "0");
conf.set(GlutenCoreConfig.NUM_TASK_SLOTS_PER_EXECUTOR().key(), "1");

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it seems this value is set to -1 on scala side

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's finally set here:

conf.set(GlutenCoreConfig.NUM_TASK_SLOTS_PER_EXECUTOR, taskSlots)

conf.set(VeloxConfig.COLUMNAR_VELOX_CONNECTOR_IO_THREADS().key(), "0");
return conf;
}
}
2 changes: 1 addition & 1 deletion cpp-ch/local-engine/Parser/CHColumnToSparkRow.h
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ class BackingDataLengthCalculator
explicit BackingDataLengthCalculator(const DB::DataTypePtr & type_);
virtual ~BackingDataLengthCalculator() = default;

/// Return length is guranteed to round up to 8
/// Return length is guaranteed to round up to 8
virtual int64_t calculate(const DB::Field & field) const;

static int64_t getArrayElementSize(const DB::DataTypePtr & nested_type);
Expand Down
2 changes: 1 addition & 1 deletion cpp-ch/local-engine/Parser/SerializedPlanParser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ QueryPlanPtr SerializedPlanParser::parseOp(const substrait::Rel & rel, std::list

if (read_rel_parser->isReadRelFromJavaIter(read))
{
/// If read from java iter, local_files is guranteed to be set in read rel.
/// If read from java iter, local_files is guaranteed to be set in read rel.
auto iter = read.local_files().items().at(0).uri_file();
auto pos = iter.find(':');
auto iter_index = std::stoi(iter.substr(pos + 1, iter.size()));
Expand Down
2 changes: 2 additions & 0 deletions cpp/core/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,8 @@ set(SPARK_COLUMNAR_PLUGIN_SRCS
memory/MemoryManager.cc
memory/ArrowMemoryPool.cc
memory/ColumnarBatch.cc
threads/ThreadInitializer.cc
threads/ThreadManager.cc
shuffle/Dictionary.cc
shuffle/FallbackRangePartitioner.cc
shuffle/HashPartitioner.cc
Expand Down
3 changes: 2 additions & 1 deletion cpp/core/compute/Runtime.cc
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,10 @@ void Runtime::registerFactory(const std::string& kind, Runtime::Factory factory,
Runtime* Runtime::create(
const std::string& kind,
MemoryManager* memoryManager,
ThreadManager* threadManager,
const std::unordered_map<std::string, std::string>& sessionConf) {
auto& factory = runtimeFactories().get(kind);
return factory(kind, std::move(memoryManager), sessionConf);
return factory(kind, memoryManager, threadManager, sessionConf);
}

void Runtime::release(Runtime* runtime) {
Expand Down
11 changes: 10 additions & 1 deletion cpp/core/compute/Runtime.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include "shuffle/ShuffleReader.h"
#include "shuffle/ShuffleWriter.h"
#include "substrait/plan.pb.h"
#include "threads/ThreadManager.h"
#include "utils/ObjectStore.h"
#include "utils/WholeStageDumper.h"

Expand Down Expand Up @@ -66,12 +67,14 @@ class Runtime : public std::enable_shared_from_this<Runtime> {
using Factory = std::function<Runtime*(
const std::string& kind,
MemoryManager* memoryManager,
ThreadManager* threadManager,
const std::unordered_map<std::string, std::string>& sessionConf)>;
using Releaser = std::function<void(Runtime*)>;
static void registerFactory(const std::string& kind, Factory factory, Releaser releaser);
static Runtime* create(
const std::string& kind,
MemoryManager* memoryManager,
ThreadManager* threadManager,
const std::unordered_map<std::string, std::string>& sessionConf = {});
static void release(Runtime*);
static std::optional<std::string>* localWriteFilesTempPath();
Expand All @@ -80,8 +83,9 @@ class Runtime : public std::enable_shared_from_this<Runtime> {
Runtime(
const std::string& kind,
MemoryManager* memoryManager,
ThreadManager* threadManager,
const std::unordered_map<std::string, std::string>& confMap)
: kind_(kind), memoryManager_(memoryManager), confMap_(confMap) {}
: kind_(kind), memoryManager_(memoryManager), threadManager_(threadManager), confMap_(confMap) {}

virtual ~Runtime() = default;

Expand Down Expand Up @@ -131,6 +135,10 @@ class Runtime : public std::enable_shared_from_this<Runtime> {
return memoryManager_;
};

virtual ThreadManager* threadManager() {
return threadManager_;
};

/// This function is used to create certain converter from the format used by
/// the backend to Spark unsafe row.
virtual std::shared_ptr<ColumnarToRowConverter> createColumnar2RowConverter(int64_t column2RowMemThreshold) {
Expand Down Expand Up @@ -189,6 +197,7 @@ class Runtime : public std::enable_shared_from_this<Runtime> {
protected:
std::string kind_;
MemoryManager* memoryManager_;
ThreadManager* threadManager_;

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

was trying to understand the lifetime for this threadManager_: initially we run into some leakage from VeloxRuntime and we need to careful on destructor maybe we could change this to unique_ptr so it will be released when runtime destruct?

@zhztheplayer zhztheplayer Jun 17, 2026

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. The destruction is guaranteed to be right earlier than destruction of memory manager, by its priority settings:

https://github.com/zhztheplayer/gluten/blob/312f45bf71dcb68d601e39812df30ddca4a90c8f/gluten-arrow/src/main/scala/org/apache/gluten/threads/NativeThreadManager.scala#L57

initially we run into some leakage from VeloxRuntime and we need to careful on destructor

I was aware of this, and this kind of leakage was fundamentally fixed at once by #11882, specifically, by the introduction of HookedExecutor. HookedExecutor can reuse OS thread resources, while it guarantees no background threads are running before we end the Velox task. Precondition is, Velox should keep avoiding running threads off the thread pools we pass to it, and this rule is unlikely to be broken by Velox shortly.

std::unique_ptr<ObjectStore> objStore_ = ObjectStore::create();
std::unordered_map<std::string, std::string> confMap_; // Session conf map

Expand Down
2 changes: 1 addition & 1 deletion cpp/core/jni/JniCommon.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/

#include "JniCommon.h"
#include <folly/system/ThreadName.h>

void gluten::JniCommonState::ensureInitialized(JNIEnv* env) {
std::lock_guard<std::mutex> lockGuard(mtx_);
Expand Down Expand Up @@ -120,7 +121,6 @@ std::shared_ptr<gluten::ColumnarBatch> gluten::JniColumnarBatchIterator::next()
std::shared_ptr<gluten::ColumnarBatch> gluten::JniColumnarBatchIterator::nextInternal() const {
JNIEnv* env = nullptr;
attachCurrentThreadAsDaemonOrThrow(vm_, &env);

if (!env->CallBooleanMethod(jColumnarBatchItr_, serializedColumnarBatchIteratorHasNext_)) {
checkException(env);
return nullptr; // stream ended
Expand Down
84 changes: 84 additions & 0 deletions cpp/core/jni/JniCommon.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include "compute/Runtime.h"
#include "memory/AllocationListener.h"
#include "shuffle/rss/RssClient.h"
#include "threads/ThreadInitializer.h"
#include "utils/Compression.h"
#include "utils/Exception.h"
#include "utils/ResourceMap.h"
Expand Down Expand Up @@ -549,3 +550,86 @@ class JavaRssClient : public RssClient {
jmethodID javaPushPartitionData_;
jbyteArray array_;
};

/// Bridges gluten::ThreadInitializer callbacks to a Java-side
/// NativeThreadInitializer instance via JNI.
///
/// On initialize(), attaches the current native thread to the JVM and calls
/// into the Java initializer so it can install Spark TaskContext or other
/// task-local state. On destroy(), calls the Java destroy() method but
/// does NOT detach the thread — the underlying JVM thread may be reused by
/// the pool, and detaching prematurely could allow the Java Thread object
/// to be garbage-collected.
class SparkThreadInitializer final : public gluten::ThreadInitializer {
public:
/// @param vm The JavaVM pointer from JNI_OnLoad.
/// @param jInitializerLocalRef A local reference to a Java object
/// implementing org.apache.gluten.threads.NativeThreadInitializer.
/// A global reference is created internally.
SparkThreadInitializer(JavaVM* vm, jobject jInitializerLocalRef) : vm_(vm) {
JNIEnv* env;
attachCurrentThreadAsDaemonOrThrow(vm_, &env);
jInitializerGlobalRef_ = env->NewGlobalRef(jInitializerLocalRef);
GLUTEN_CHECK(jInitializerGlobalRef_ != nullptr, "Failed to create global reference for native thread initializer.");
(void)initializeMethod(env);
}

SparkThreadInitializer(const SparkThreadInitializer&) = delete;
SparkThreadInitializer(SparkThreadInitializer&&) = delete;
SparkThreadInitializer& operator=(const SparkThreadInitializer&) = delete;
SparkThreadInitializer& operator=(SparkThreadInitializer&&) = delete;

~SparkThreadInitializer() override {
JNIEnv* env;
if (vm_->GetEnv(reinterpret_cast<void**>(&env), jniVersion) != JNI_OK) {
LOG(WARNING) << "SparkThreadInitializer#~SparkThreadInitializer(): "
<< "JNIEnv was not attached to current thread";
return;
}
env->DeleteGlobalRef(jInitializerGlobalRef_);
}
Comment thread
zhztheplayer marked this conversation as resolved.

void initialize(const std::string& threadName) override {
JNIEnv* env;
attachCurrentThreadAsDaemonOrThrow(vm_, &env);
jstring jThreadName = env->NewStringUTF(threadName.c_str());
env->CallVoidMethod(jInitializerGlobalRef_, initializeMethod(env), jThreadName);
env->DeleteLocalRef(jThreadName);
checkException(env);
}

void destroy(const std::string& threadName) override {
// IMPORTANT: Do not call vm_.DetachCurrentThread here, otherwise Java side thread
// object might be dereferenced and garbage-collected, to break the reuse of thread
// resources.
JNIEnv* env;
attachCurrentThreadAsDaemonOrThrow(vm_, &env);
jstring jThreadName = env->NewStringUTF(threadName.c_str());
env->CallVoidMethod(jInitializerGlobalRef_, destroyMethod(env), jThreadName);
env->DeleteLocalRef(jThreadName);
checkException(env);
}

private:
jmethodID initializeMethod(JNIEnv* env) {
static jmethodID initializeMethod =
getMethodIdOrError(env, nativeThreadInitializerClass(env), "initialize", "(Ljava/lang/String;)V");
return initializeMethod;
}

jmethodID destroyMethod(JNIEnv* env) {
static jmethodID destroyMethod =
getMethodIdOrError(env, nativeThreadInitializerClass(env), "destroy", "(Ljava/lang/String;)V");
return destroyMethod;
}

jclass nativeThreadInitializerClass(JNIEnv* env) {
static jclass javaInitializerClass =
createGlobalClassReferenceOrError(env, "Lorg/apache/gluten/threads/NativeThreadInitializer;");
return javaInitializerClass;
}
Comment thread
zhztheplayer marked this conversation as resolved.

private:
JavaVM* vm_;
jobject jInitializerGlobalRef_;
};
59 changes: 56 additions & 3 deletions cpp/core/jni/JniWrapper.cc
Original file line number Diff line number Diff line change
Expand Up @@ -182,8 +182,9 @@ class InternalRuntime : public Runtime {
InternalRuntime(
const std::string& kind,
MemoryManager* memoryManager,
ThreadManager* threadManager,
const std::unordered_map<std::string, std::string>& confMap)
: Runtime(kind, memoryManager, confMap) {}
: Runtime(kind, memoryManager, threadManager, confMap) {}
};

MemoryManager* internalMemoryManagerFactory(const std::string& kind, std::unique_ptr<AllocationListener> listener) {
Expand All @@ -194,11 +195,33 @@ void internalMemoryManagerReleaser(MemoryManager* memoryManager) {
delete memoryManager;
}

class InternalThreadManager : public ThreadManager {
public:
InternalThreadManager(const std::string& kind, std::unique_ptr<ThreadInitializer> initializer)
: ThreadManager(kind), initializer_(std::shared_ptr<ThreadInitializer>(std::move(initializer))) {}

ThreadInitializer* getThreadInitializer() override {
return initializer_.get();
}

private:
std::shared_ptr<ThreadInitializer> initializer_;
};

ThreadManager* internalThreadManagerFactory(const std::string& kind, std::unique_ptr<ThreadInitializer> initializer) {
return new InternalThreadManager(kind, std::move(initializer));
}

void internalThreadManagerReleaser(ThreadManager* threadManager) {
delete threadManager;
}

Runtime* internalRuntimeFactory(
const std::string& kind,
MemoryManager* memoryManager,
ThreadManager* threadManager,
const std::unordered_map<std::string, std::string>& sessionConf) {
return new InternalRuntime(kind, memoryManager, sessionConf);
return new InternalRuntime(kind, memoryManager, threadManager, sessionConf);
}

void internalRuntimeReleaser(Runtime* runtime) {
Expand Down Expand Up @@ -253,6 +276,7 @@ jint JNI_OnLoad(JavaVM* vm, void* reserved) {
getJniErrorState()->ensureInitialized(env);

MemoryManager::registerFactory(kInternalBackendKind, internalMemoryManagerFactory, internalMemoryManagerReleaser);
ThreadManager::registerFactory(kInternalBackendKind, internalThreadManagerFactory, internalThreadManagerReleaser);
Runtime::registerFactory(kInternalBackendKind, internalRuntimeFactory, internalRuntimeReleaser);

byteArrayClass = createGlobalClassReferenceOrError(env, "[B");
Expand Down Expand Up @@ -320,14 +344,16 @@ JNIEXPORT jlong JNICALL Java_org_apache_gluten_runtime_RuntimeJniWrapper_createR
jclass,
jstring jBackendType,
jlong nmmHandle,
jlong ntmHandle,
jbyteArray sessionConf) {
JNI_METHOD_START
MemoryManager* memoryManager = jniCastOrThrow<MemoryManager>(nmmHandle);
ThreadManager* threadManager = jniCastOrThrow<ThreadManager>(ntmHandle);
auto safeArray = getByteArrayElementsSafe(env, sessionConf);
auto sparkConf = parseConfMap(env, safeArray.elems(), safeArray.length());
auto backendType = jStringToCString(env, jBackendType);

auto runtime = Runtime::create(backendType, memoryManager, sparkConf);
auto runtime = Runtime::create(backendType, memoryManager, threadManager, sparkConf);
return reinterpret_cast<jlong>(runtime);
JNI_METHOD_END(kInvalidObjectHandle)
}
Expand Down Expand Up @@ -371,6 +397,33 @@ JNIEXPORT jlong JNICALL Java_org_apache_gluten_memory_NativeMemoryManagerJniWrap
JNI_METHOD_END(-1L)
}

JNIEXPORT jlong JNICALL Java_org_apache_gluten_threads_NativeThreadManagerJniWrapper_create( // NOLINT
JNIEnv* env,
jclass,
jstring jBackendType,
jobject jInitializer) {
JNI_METHOD_START
JavaVM* vm;
if (env->GetJavaVM(&vm) != JNI_OK) {
throw GlutenException("Unable to get JavaVM instance");
}
auto backendType = jStringToCString(env, jBackendType);
std::unique_ptr<ThreadInitializer> initializer = std::make_unique<SparkThreadInitializer>(vm, jInitializer);
ThreadManager* tm = ThreadManager::create(backendType, std::move(initializer));
return reinterpret_cast<jlong>(tm);
JNI_METHOD_END(-1L)
}

JNIEXPORT void JNICALL Java_org_apache_gluten_threads_NativeThreadManagerJniWrapper_release( // NOLINT
JNIEnv* env,
jclass,
jlong ntmHandle) {
JNI_METHOD_START
auto* threadManager = jniCastOrThrow<ThreadManager>(ntmHandle);
ThreadManager::release(threadManager);
JNI_METHOD_END()
}

JNIEXPORT jbyteArray JNICALL Java_org_apache_gluten_memory_NativeMemoryManagerJniWrapper_collectUsage( // NOLINT
JNIEnv* env,
jclass,
Expand Down
37 changes: 37 additions & 0 deletions cpp/core/threads/ThreadInitializer.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "ThreadInitializer.h"

namespace gluten {
namespace {

/// A ThreadInitializer whose initialize() and destroy() are no-ops.
/// Used when no JVM or Spark task context is available (e.g., benchmarks).
class NoopThreadInitializer final : public ThreadInitializer {
public:
void initialize(const std::string& threadName) override {}
void destroy(const std::string& threadName) override{};
};

} // namespace

std::unique_ptr<ThreadInitializer> ThreadInitializer::noop() {
return std::make_unique<NoopThreadInitializer>();
}

} // namespace gluten
Loading
Loading