Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions be/src/agent/task_worker_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@
#include "storage/task/engine_storage_migration_task.h"
#include "storage/txn/txn_manager.h"
#include "storage/utils.h"
#include "udf/python/python_server.h"
#include "util/brpc_client_cache.h"
#include "util/debug_points.h"
#include "util/jni-util.h"
Expand Down Expand Up @@ -2596,6 +2597,7 @@ void clean_udf_cache_callback(const TAgentTaskRequest& req) {

if (clean_req.__isset.function_id && clean_req.function_id > 0) {
UserFunctionCache::instance()->drop_function_cache(clean_req.function_id);
PythonServerManager::instance().clear_udaf_state_cache(clean_req.function_id);
}

LOG(INFO) << "clean udf cache finish: function_signature=" << clean_req.function_signature;
Expand Down
24 changes: 18 additions & 6 deletions be/src/udf/python/python_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@

#include "arrow/flight/client.h"
#include "common/config.h"
#include "common/status.h"
#include "udf/python/python_udaf_client.h"
#include "udf/python/python_udf_client.h"
#include "udf/python/python_udtf_client.h"
Expand Down Expand Up @@ -413,7 +414,19 @@ Status PythonServerManager::clear_module_cache(const std::string& location) {
}

std::string body = fmt::format(R"({{"location": "{}"}})", location);
return _broadcast_action_to_processes("clear_module_cache", body,
fmt::format("location={}", location));
}

void PythonServerManager::clear_udaf_state_cache(int64_t function_id) {
std::string body = fmt::format(R"({{"function_id": {}}})", function_id);
THROW_IF_ERROR(_broadcast_action_to_processes("clear_udaf_state_cache", body,
fmt::format("function_id={}", function_id)));
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can crash the BE from a DROP FUNCTION cleanup task. _broadcast_action_to_processes() returns an error whenever an active Python process fails DoAction/Next, and THROW_IF_ERROR converts that into a doris::Exception. The caller is clean_udf_cache_callback(), which is run by TaskWorkerPool via _callback(task) without any catch boundary, so an uncaught exception terminates the worker thread/process instead of reporting a best-effort cache cleanup failure. The existing module-cache cleanup path logs and continues; this should return Status and be handled with a warning (or use the existing cleanup-style WARN_IF_ERROR) rather than throwing out of the task callback.

}

Status PythonServerManager::_broadcast_action_to_processes(const std::string& action_type,
const std::string& body,
const std::string& log_name) {
int success_count = 0;
int fail_count = 0;
bool has_active_process = false;
Expand Down Expand Up @@ -441,7 +454,7 @@ Status PythonServerManager::clear_module_cache(const std::string& location) {
auto client = std::move(*client_result);

arrow::flight::Action action;
action.type = "clear_module_cache";
action.type = action_type;
action.body = arrow::Buffer::FromString(body);

auto result_stream = client->DoAction(action);
Expand All @@ -467,13 +480,12 @@ Status PythonServerManager::clear_module_cache(const std::string& location) {
return Status::OK();
}

LOG(INFO) << "clear_module_cache completed for location=" << location
<< ", success=" << success_count << ", failed=" << fail_count;
LOG(INFO) << action_type << " completed for " << log_name << ", success=" << success_count
<< ", failed=" << fail_count;

if (fail_count > 0) {
return Status::InternalError(
"clear_module_cache failed for location={}, success={}, failed={}", location,
success_count, fail_count);
return Status::InternalError("{} failed for {}, success={}, failed={}", action_type,
log_name, success_count, fail_count);
}

return Status::OK();
Expand Down
5 changes: 5 additions & 0 deletions be/src/udf/python/python_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ class PythonServerManager {
// Clear Python module cache for a specific UDF location across all processes
Status clear_module_cache(const std::string& location);

// Clear Python UDAF runtime state after DROP FUNCTION
void clear_udaf_state_cache(int64_t function_id);

void shutdown();

#ifdef BE_TEST
Expand Down Expand Up @@ -108,6 +111,8 @@ class PythonServerManager {
std::shared_ptr<VersionedProcessPool> _get_or_create_process_pool(const PythonVersion& version);
std::vector<std::pair<PythonVersion, std::shared_ptr<VersionedProcessPool>>>
_snapshot_process_pools();
Status _broadcast_action_to_processes(const std::string& action_type, const std::string& body,
const std::string& log_name);

std::unordered_map<PythonVersion, std::shared_ptr<VersionedProcessPool>> _process_pools;
// Protects the version -> pool handle map only. Per-version process operations are guarded
Expand Down
70 changes: 65 additions & 5 deletions be/src/udf/python/python_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -455,6 +455,7 @@ class PythonUDFMeta:

def __init__(
self,
function_id: int,
name: str,
symbol: str,
location: str,
Expand All @@ -470,6 +471,7 @@ def __init__(
Initialize Python UDF metadata.

Args:
function_id: FE catalog function id
name: UDF function name
symbol: Symbol to load (function name or module.function)
location: File path or directory containing the UDF
Expand All @@ -481,6 +483,7 @@ def __init__(
output_type: PyArrow data type for return value
client_type: 0 for UDF, 1 for UDAF, 2 for UDTF
"""
self.id = function_id
self.name = name
self.symbol = symbol
self.location = location
Expand Down Expand Up @@ -508,7 +511,7 @@ def __str__(self) -> str:
"""Returns a string representation of the UDF metadata."""
udf_load_type_str = "INLINE" if self.udf_load_type == 0 else "MODULE"
return (
f"PythonUDFMeta(name={self.name}, symbol={self.symbol}, "
f"PythonUDFMeta(id={self.id}, name={self.name}, symbol={self.symbol}, "
f"location={self.location}, udf_load_type={udf_load_type_str}, runtime_version={self.runtime_version}, "
f"always_nullable={self.always_nullable}, client_type={self.client_type.name}, "
f"input_types={self.input_types}, output_type={self.output_type})"
Expand Down Expand Up @@ -1575,8 +1578,9 @@ def __init__(self, location: str):
location: Unix socket path for the server
"""
super().__init__(location)
# Use a dictionary to maintain separate state managers for each UDAF function
# Key: function signature (name + input_types), Value: UDAFStateManager instance
# Use a dictionary to maintain separate state managers for each UDAF function.
# Key includes function_id so DROP/CREATE with the same name and signature
# cannot reuse a class loaded from old inline code.
self.udaf_state_managers: Dict[str, UDAFStateManager] = {}
self.udaf_managers_lock = threading.Lock()

Expand All @@ -1593,9 +1597,10 @@ def _get_udaf_state_manager(
Returns:
UDAFStateManager instance for this specific UDAF
"""
# Create a unique key based on function name and argument types
type_names = [str(field.type) for field in python_udaf_meta.input_types]
func_key = f"{python_udaf_meta.name}({','.join(type_names)})"
func_key = (
f"{python_udaf_meta.id}:{python_udaf_meta.name}({','.join(type_names)})"
)

with self.udaf_managers_lock:
if func_key not in self.udaf_state_managers:
Expand All @@ -1607,6 +1612,31 @@ def _get_udaf_state_manager(

return self.udaf_state_managers[func_key]

def _clear_udaf_state_cache_by_function_id(self, function_id: int) -> int:
"""
Clear UDAF managers for a dropped function id.

DROP FUNCTION cache cleanup is asynchronous. The runtime key still includes
function_id for correctness, while this action releases old states and class
objects after the drop task reaches this Python process.
"""
prefix = f"{function_id}:"
cleared = 0

with self.udaf_managers_lock:
keys_to_remove = [
key for key in self.udaf_state_managers if key.startswith(prefix)
]
for key in keys_to_remove:
manager = self.udaf_state_managers.pop(key)
manager.states.clear()
cleared += 1
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Clearing manager.states can invalidate in-flight UDAF queries. The DROP cleanup task is submitted asynchronously after FE removes the function, while an already-started query can still have a Flight exchange using this same UDAFStateManager for the old function id. If this action runs between that query's CREATE/ACCUMULATE and later SERIALIZE/FINALIZE/DESTROY calls, those operations will find their place_id entries removed and fail with KeyError/failed UDAF results. Since adding function_id to the key already prevents a recreated function from reusing the old class, cleanup should detach the manager from udaf_state_managers without clearing a manager that active exchanges may still reference, or add explicit lifecycle/ref-count coordination. Also consider returning the manager from _get_udaf_state_manager() while still under the lock so a concurrent pop cannot occur between lookup and return.


if cleared:
gc.collect()

return cleared

@staticmethod
def parse_python_udf_meta(
descriptor: flight.FlightDescriptor,
Expand All @@ -1623,6 +1653,7 @@ def parse_python_udf_meta(
return None

cmd_json = json.loads(descriptor.command)
function_id = cmd_json["id"]
name = cmd_json["name"]
symbol = cmd_json["symbol"]
location = cmd_json["location"]
Expand All @@ -1648,6 +1679,7 @@ def parse_python_udf_meta(
output_type = output_schema.field(0).type

python_udf_meta = PythonUDFMeta(
function_id=function_id,
name=name,
symbol=symbol,
location=location,
Expand Down Expand Up @@ -2513,14 +2545,42 @@ def do_action(
Supported actions:
- "clear_module_cache": Clear Python module cache for a specific location
Body: JSON with "location" field (the UDF cache directory path)
- "clear_udaf_state_cache": Clear UDAF runtime state for a dropped function id
Body: JSON with "function_id" field
"""
action_type = action.type

if action_type == "clear_module_cache":
yield from self._handle_clear_module_cache(action.body.to_pybytes())
elif action_type == "clear_udaf_state_cache":
yield from self._handle_clear_udaf_state_cache(action.body.to_pybytes())
else:
raise flight.FlightUnavailableError(f"Unknown action: {action_type}")

def _handle_clear_udaf_state_cache(self, body: bytes):
    """
    Serve the "clear_udaf_state_cache" action.

    Decodes ``body`` as UTF-8 JSON, reads its "function_id" field, and asks
    ``_clear_udaf_state_cache_by_function_id`` to drop the matching managers.

    Yields:
        A single ``flight.Result`` holding a JSON document: on success
        ``success``/``cleared_managers``/``function_id``; on any exception
        ``success`` set to False plus the ``error`` text.
    """
    try:
        request = json.loads(body.decode("utf-8"))
        function_id = int(request["function_id"])

        removed = self._clear_udaf_state_cache_by_function_id(function_id)

        payload = json.dumps(
            {
                "success": True,
                "cleared_managers": removed,
                "function_id": function_id,
            }
        )
        yield flight.Result(payload.encode("utf-8"))

    except Exception as exc:
        # Report the failure to the caller instead of raising out of the action.
        logging.error("clear_udaf_state_cache failed: %s", exc)
        failure = json.dumps({"success": False, "error": str(exc)})
        yield flight.Result(failure.encode("utf-8"))

def _handle_clear_module_cache(self, body: bytes):
"""
Clear Python module cache for a specific UDF location.
Expand Down
2 changes: 2 additions & 0 deletions be/src/udf/python/python_udf_meta.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ Status PythonUDFMeta::serialize_arrow_schema(const std::shared_ptr<arrow::Schema
json format:
{
"name": "xxx",
"id": 123,
"symbol": "xxx",
"location": "xxx",
"udf_load_type": 0 or 1,
Expand All @@ -72,6 +73,7 @@ Status PythonUDFMeta::serialize_to_json(std::string* json_str) const {
doc.SetObject();
auto& allocator = doc.GetAllocator();
doc.AddMember("name", rapidjson::Value().SetString(name.c_str(), allocator), allocator);
doc.AddMember("id", rapidjson::Value().SetInt64(id), allocator);
doc.AddMember("symbol", rapidjson::Value().SetString(symbol.c_str(), allocator), allocator);
doc.AddMember("location", rapidjson::Value().SetString(location.c_str(), allocator), allocator);
doc.AddMember("udf_load_type", rapidjson::Value().SetInt(static_cast<int>(type)), allocator);
Expand Down
13 changes: 13 additions & 0 deletions be/test/udf/python/python_server_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,19 @@ TEST_F(PythonServerTest, ShutdownAfterFailedInitializationDoesNotCrash) {
EXPECT_NO_THROW(mgr.shutdown());
}

// Clearing UDAF state on a freshly constructed manager must complete without throwing.
TEST_F(PythonServerTest, ClearUdafStateCacheWithoutProcessesIsNoOp) {
    PythonServerManager manager;

    EXPECT_NO_THROW(manager.clear_udaf_state_cache(12345));
}

// Clearing the module cache on a freshly constructed manager is expected to return OK.
TEST_F(PythonServerTest, ClearModuleCacheWithoutProcessesIsNoOp) {
    PythonServerManager manager;

    auto result = manager.clear_module_cache("/tmp/python_udf_cache");
    EXPECT_TRUE(result.ok()) << result.to_string();
}

// ============================================================================
// PythonServerManager::get_client() - client retrieval test
// ============================================================================
Expand Down
3 changes: 3 additions & 0 deletions be/test/udf/python/python_udf_meta_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,9 @@ TEST_F(PythonUDFMetaTest, SerializeToJsonBasic) {
doc.Parse(json_str.c_str());
EXPECT_FALSE(doc.HasParseError());

EXPECT_TRUE(doc.HasMember("id"));
EXPECT_EQ(doc["id"].GetInt64(), 1);

EXPECT_TRUE(doc.HasMember("name"));
EXPECT_STREQ(doc["name"].GetString(), "test_udf");

Expand Down
63 changes: 63 additions & 0 deletions regression-test/suites/pythonudaf_p0/test_pythonudaf_drop.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -124,10 +124,73 @@ suite('test_pythonudaf_drop', "nonConcurrent") {

qt_py_udaf_drop_5 '''SELECT py_drop_sum_reconnect(v) FROM py_udaf_drop_tbl;'''
try_sql('DROP FUNCTION IF EXISTS py_drop_sum_reconnect(INT);')

// Case 4: inline UDAF drop/recreate must not reuse the old Python class.
// The Python server caches UDAF state managers, so this verifies the cache key
// and drop cleanup both use the FE function id, not just name + argument types.
sql '''DROP FUNCTION IF EXISTS py_drop_inline_recreate(INT)'''
sql """
CREATE AGGREGATE FUNCTION py_drop_inline_recreate(INT)
RETURNS BIGINT
PROPERTIES (
"type" = "PYTHON_UDF",
"symbol" = "InlineDropRecreateUdaf",
"runtime_version" = "${runtime_version}",
"always_nullable" = "true"
)
AS \$\$
class InlineDropRecreateUdaf:
def __init__(self):
self.total = 0
@property
def aggregate_state(self):
return self.total
def accumulate(self, val):
if val is not None:
self.total += val
def merge(self, other):
self.total += other
def finish(self):
return self.total * 10
\$\$
"""
def inlineOldResult = sql '''SELECT py_drop_inline_recreate(v) FROM py_udaf_drop_tbl;'''
assert inlineOldResult[0][0].toString() == '60'

sql '''DROP FUNCTION IF EXISTS py_drop_inline_recreate(INT)'''
sql """
CREATE AGGREGATE FUNCTION py_drop_inline_recreate(INT)
RETURNS BIGINT
PROPERTIES (
"type" = "PYTHON_UDF",
"symbol" = "InlineDropRecreateUdaf",
"runtime_version" = "${runtime_version}",
"always_nullable" = "true"
)
AS \$\$
class InlineDropRecreateUdaf:
def __init__(self):
self.total = 0
@property
def aggregate_state(self):
return self.total
def accumulate(self, val):
if val is not None:
self.total += val
def merge(self, other):
self.total += other
def finish(self):
return self.total * 100
\$\$
"""
def inlineNewResult = sql '''SELECT py_drop_inline_recreate(v) FROM py_udaf_drop_tbl;'''
assert inlineNewResult[0][0].toString() == '600'
sql '''DROP FUNCTION IF EXISTS py_drop_inline_recreate(INT)'''
} finally {
try_sql('DROP FUNCTION IF EXISTS py_drop_sum_once(INT);')
try_sql('DROP FUNCTION IF EXISTS py_drop_sum_a(INT);')
try_sql('DROP FUNCTION IF EXISTS py_drop_sum_b(INT);')
try_sql('DROP FUNCTION IF EXISTS py_drop_sum_reconnect(INT);')
try_sql('DROP FUNCTION IF EXISTS py_drop_inline_recreate(INT);')
}
}
Loading