diff --git a/be/src/agent/task_worker_pool.cpp b/be/src/agent/task_worker_pool.cpp index 97125ffe6a888f..ecbd01aac15c43 100644 --- a/be/src/agent/task_worker_pool.cpp +++ b/be/src/agent/task_worker_pool.cpp @@ -92,6 +92,7 @@ #include "storage/task/engine_storage_migration_task.h" #include "storage/txn/txn_manager.h" #include "storage/utils.h" +#include "udf/python/python_server.h" #include "util/brpc_client_cache.h" #include "util/debug_points.h" #include "util/jni-util.h" @@ -2596,6 +2597,7 @@ void clean_udf_cache_callback(const TAgentTaskRequest& req) { if (clean_req.__isset.function_id && clean_req.function_id > 0) { UserFunctionCache::instance()->drop_function_cache(clean_req.function_id); + PythonServerManager::instance().clear_udaf_state_cache(clean_req.function_id); } LOG(INFO) << "clean udf cache finish: function_signature=" << clean_req.function_signature; diff --git a/be/src/udf/python/python_server.cpp b/be/src/udf/python/python_server.cpp index 228cab8d905b90..f78603e0bb96d9 100644 --- a/be/src/udf/python/python_server.cpp +++ b/be/src/udf/python/python_server.cpp @@ -31,6 +31,7 @@ #include "arrow/flight/client.h" #include "common/config.h" +#include "common/status.h" #include "udf/python/python_udaf_client.h" #include "udf/python/python_udf_client.h" #include "udf/python/python_udtf_client.h" @@ -413,7 +414,20 @@ Status PythonServerManager::clear_module_cache(const std::string& location) { } std::string body = fmt::format(R"({{"location": "{}"}})", location); + return _broadcast_action_to_processes("clear_module_cache", body, + fmt::format("location={}", location)); +} + +void PythonServerManager::clear_udaf_state_cache(int64_t function_id) { + std::string body = fmt::format(R"({{"function_id": {}}})", function_id); + WARN_IF_ERROR(_broadcast_action_to_processes("clear_udaf_state_cache", body, + fmt::format("function_id={}", function_id)), + "failed to clear Python UDAF state cache"); +} +Status PythonServerManager::_broadcast_action_to_processes(const std::string& action_type, + const std::string& body, + const std::string& log_name) { int success_count = 0; int fail_count = 0; bool has_active_process = false; @@ -441,7 +455,7 @@ Status PythonServerManager::clear_module_cache(const std::string& location) { auto client = std::move(*client_result); arrow::flight::Action action; - action.type = "clear_module_cache"; + action.type = action_type; action.body = arrow::Buffer::FromString(body); auto result_stream = client->DoAction(action); @@ -467,13 +481,12 @@ Status PythonServerManager::clear_module_cache(const std::string& location) { return Status::OK(); } - LOG(INFO) << "clear_module_cache completed for location=" << location - << ", success=" << success_count << ", failed=" << fail_count; + LOG(INFO) << action_type << " completed for " << log_name << ", success=" << success_count + << ", failed=" << fail_count; if (fail_count > 0) { - return Status::InternalError( - "clear_module_cache failed for location={}, success={}, failed={}", location, - success_count, fail_count); + return Status::InternalError("{} failed for {}, success={}, failed={}", action_type, + log_name, success_count, fail_count); } return Status::OK(); diff --git a/be/src/udf/python/python_server.h b/be/src/udf/python/python_server.h index 7aa452740c7b66..06ae5ca92a957a 100644 --- a/be/src/udf/python/python_server.h +++ b/be/src/udf/python/python_server.h @@ -52,6 +52,9 @@ class PythonServerManager { // Clear Python module cache for a specific UDF location across all processes Status clear_module_cache(const std::string& location); + // Clear Python UDAF runtime state after DROP FUNCTION + void clear_udaf_state_cache(int64_t function_id); + void shutdown(); #ifdef BE_TEST @@ -108,6 +111,8 @@ class PythonServerManager { std::shared_ptr _get_or_create_process_pool(const PythonVersion& version); std::vector>> _snapshot_process_pools(); + Status _broadcast_action_to_processes(const std::string& action_type, const std::string& body, + const std::string& log_name); std::unordered_map> _process_pools; // Protects the version -> pool handle map only. Per-version process operations are guarded diff --git a/be/src/udf/python/python_server.py b/be/src/udf/python/python_server.py index d16fc352178942..3290acc6bc6e24 100644 --- a/be/src/udf/python/python_server.py +++ b/be/src/udf/python/python_server.py @@ -455,6 +455,7 @@ class PythonUDFMeta: def __init__( self, + function_id: int, name: str, symbol: str, location: str, @@ -470,6 +471,7 @@ def __init__( Initialize Python UDF metadata. Args: + function_id: FE catalog function id name: UDF function name symbol: Symbol to load (function name or module.function) location: File path or directory containing the UDF @@ -481,6 +483,7 @@ def __init__( output_type: PyArrow data type for return value client_type: 0 for UDF, 1 for UDAF, 2 for UDTF """ + self.id = function_id self.name = name self.symbol = symbol self.location = location @@ -508,7 +511,7 @@ def __str__(self) -> str: """Returns a string representation of the UDF metadata.""" udf_load_type_str = "INLINE" if self.udf_load_type == 0 else "MODULE" return ( - f"PythonUDFMeta(name={self.name}, symbol={self.symbol}, " + f"PythonUDFMeta(id={self.id}, name={self.name}, symbol={self.symbol}, " f"location={self.location}, udf_load_type={udf_load_type_str}, runtime_version={self.runtime_version}, " f"always_nullable={self.always_nullable}, client_type={self.client_type.name}, " f"input_types={self.input_types}, output_type={self.output_type})" @@ -1575,8 +1578,9 @@ def __init__(self, location: str): location: Unix socket path for the server """ super().__init__(location) - # Use a dictionary to maintain separate state managers for each UDAF function - # Key: function signature (name + input_types), Value: UDAFStateManager instance + # Use a dictionary to maintain separate state managers for each UDAF function. + # Key includes function_id so DROP/CREATE with the same name and signature + # cannot reuse a class loaded from old inline code. self.udaf_state_managers: Dict[str, UDAFStateManager] = {} self.udaf_managers_lock = threading.Lock() @@ -1593,19 +1597,50 @@ def _get_udaf_state_manager( Returns: UDAFStateManager instance for this specific UDAF """ - # Create a unique key based on function name and argument types type_names = [str(field.type) for field in python_udaf_meta.input_types] - func_key = f"{python_udaf_meta.name}({','.join(type_names)})" + func_key = ( + f"{python_udaf_meta.id}:{python_udaf_meta.name}({','.join(type_names)})" + ) with self.udaf_managers_lock: - if func_key not in self.udaf_state_managers: + manager = self.udaf_state_managers.get(func_key) + if manager is None: manager = UDAFStateManager() # Load and set the UDAF class for this manager using UDAFClassLoader udaf_class = UDAFClassLoader.load_udaf_class(python_udaf_meta) manager.set_udaf_class(udaf_class) self.udaf_state_managers[func_key] = manager - return self.udaf_state_managers[func_key] + # Return the manager while holding the lock so a concurrent DROP cleanup + # cannot pop the key between lookup and return. + return manager + + def _clear_udaf_state_cache_by_function_id(self, function_id: int) -> int: + """ + Clear UDAF managers for a dropped function id. + + DROP FUNCTION cache cleanup is asynchronous. The runtime key still includes + function_id for correctness, while this action detaches dropped functions + from the manager registry so new exchanges cannot reuse the old UDAF class. + """ + prefix = f"{function_id}:" + cleared = 0 + + with self.udaf_managers_lock: + keys_to_remove = [ + key for key in self.udaf_state_managers if key.startswith(prefix) + ] + for key in keys_to_remove: + # Do not clear manager.states here. An already-started Flight + # exchange may still hold this manager and continue with later + # SERIALIZE/FINALIZE/DESTROY calls for its place_ids. + self.udaf_state_managers.pop(key, None) + cleared += 1 + + if cleared: + gc.collect() + + return cleared @staticmethod def parse_python_udf_meta( @@ -1623,6 +1658,7 @@ def parse_python_udf_meta( return None cmd_json = json.loads(descriptor.command) + function_id = cmd_json["id"] name = cmd_json["name"] symbol = cmd_json["symbol"] location = cmd_json["location"] @@ -1648,6 +1684,7 @@ def parse_python_udf_meta( output_type = output_schema.field(0).type python_udf_meta = PythonUDFMeta( + function_id=function_id, name=name, symbol=symbol, location=location, @@ -2513,14 +2550,42 @@ def do_action( Supported actions: - "clear_module_cache": Clear Python module cache for a specific location Body: JSON with "location" field (the UDF cache directory path) + - "clear_udaf_state_cache": Clear UDAF runtime state for a dropped function id + Body: JSON with "function_id" field """ action_type = action.type if action_type == "clear_module_cache": yield from self._handle_clear_module_cache(action.body.to_pybytes()) + elif action_type == "clear_udaf_state_cache": + yield from self._handle_clear_udaf_state_cache(action.body.to_pybytes()) else: raise flight.FlightUnavailableError(f"Unknown action: {action_type}") + def _handle_clear_udaf_state_cache(self, body: bytes): + """ + Clear cached UDAF state managers for a dropped function id. + """ + try: + params = json.loads(body.decode("utf-8")) + function_id = int(params["function_id"]) + + cleared_managers = self._clear_udaf_state_cache_by_function_id(function_id) + + result = { + "success": True, + "cleared_managers": cleared_managers, + "function_id": function_id, + } + yield flight.Result(json.dumps(result).encode("utf-8")) + + except Exception as e: + logging.error("clear_udaf_state_cache failed: %s", e) + yield flight.Result(json.dumps({ + "success": False, + "error": str(e) + }).encode("utf-8")) + def _handle_clear_module_cache(self, body: bytes): """ Clear Python module cache for a specific UDF location. diff --git a/be/src/udf/python/python_udf_meta.cpp b/be/src/udf/python/python_udf_meta.cpp index 88af0c9ff64128..446999895b8f67 100644 --- a/be/src/udf/python/python_udf_meta.cpp +++ b/be/src/udf/python/python_udf_meta.cpp @@ -56,6 +56,7 @@ Status PythonUDFMeta::serialize_arrow_schema(const std::shared_ptr(type)), allocator); diff --git a/be/test/udf/python/python_server_test.cpp b/be/test/udf/python/python_server_test.cpp index 557815f750623d..4375631a5ce26a 100644 --- a/be/test/udf/python/python_server_test.cpp +++ b/be/test/udf/python/python_server_test.cpp @@ -21,21 +21,22 @@ #include #include +#include #include #include -#include #include +#include #include "common/config.h" #include "common/status.h" #include "udf/python/python_env.h" #include "udf/python/python_udf_client.h" #include "udf/python/python_udf_meta.h" -#include "udf/python/python_udf_runtime.h" namespace doris { namespace fs = std::filesystem; +namespace bp = boost::process; class PythonServerTest : public ::testing::Test { protected: @@ -136,6 +137,13 @@ class PythonServerTest : public ::testing::Test { ofs << "# fake server\n"; ofs.close(); } + + ProcessPtr create_sleep_process() { + bp::ipstream output_stream; + std::string sleep_path = fs::exists("/bin/sleep") ? "/bin/sleep" : "/usr/bin/sleep"; + bp::child child(sleep_path, "60", bp::std_out > output_stream, bp::std_err > bp::null); + return std::make_shared(std::move(child), std::move(output_stream)); + } }; // ============================================================================ @@ -304,6 +312,39 @@ TEST_F(PythonServerTest, ShutdownAfterFailedInitializationDoesNotCrash) { EXPECT_NO_THROW(mgr.shutdown()); } +TEST_F(PythonServerTest, ClearUdafStateCacheWithoutProcessesIsNoOp) { + PythonServerManager mgr; + + EXPECT_NO_THROW(mgr.clear_udaf_state_cache(12345)); +} + +TEST_F(PythonServerTest, ClearModuleCacheWithoutProcessesIsNoOp) { + PythonServerManager mgr; + + auto status = mgr.clear_module_cache("/tmp/python_udf_cache"); + EXPECT_TRUE(status.ok()) << status.to_string(); +} + +TEST_F(PythonServerTest, BroadcastActionWithInvalidProcessUriReturnsError) { + PythonServerManager mgr; + PythonVersion version("3.9.16", test_dir_, test_dir_ + "/bin/python3"); + ProcessPtr process = create_sleep_process(); + ASSERT_NE(process, nullptr); + ASSERT_TRUE(process->is_alive()); + process->_uri = "invalid-python-flight-uri"; + + mgr.set_process_pool_for_test(version, {process}); + auto status = mgr._broadcast_action_to_processes( + "clear_udaf_state_cache", R"({"function_id": 12345})", "function_id=12345"); + + EXPECT_FALSE(status.ok()); + EXPECT_NE(status.to_string().find("clear_udaf_state_cache failed for function_id=12345"), + std::string::npos); + EXPECT_NE(status.to_string().find("success=0, failed=1"), std::string::npos); + + mgr.shutdown(); +} + // ============================================================================ // PythonServerManager::get_client() - client retrieval test // ============================================================================ diff --git a/be/test/udf/python/python_udf_meta_test.cpp b/be/test/udf/python/python_udf_meta_test.cpp index b913f49d19b5f1..06f3d95efe4146 100644 --- a/be/test/udf/python/python_udf_meta_test.cpp +++ b/be/test/udf/python/python_udf_meta_test.cpp @@ -323,6 +323,9 @@ TEST_F(PythonUDFMetaTest, SerializeToJsonBasic) { doc.Parse(json_str.c_str()); EXPECT_FALSE(doc.HasParseError()); + EXPECT_TRUE(doc.HasMember("id")); + EXPECT_EQ(doc["id"].GetInt64(), 1); + EXPECT_TRUE(doc.HasMember("name")); EXPECT_STREQ(doc["name"].GetString(), "test_udf"); diff --git a/regression-test/suites/pythonudaf_p0/test_pythonudaf_drop.groovy b/regression-test/suites/pythonudaf_p0/test_pythonudaf_drop.groovy index 4b64921676fd0b..e0b0ed8c4668e9 100644 --- a/regression-test/suites/pythonudaf_p0/test_pythonudaf_drop.groovy +++ b/regression-test/suites/pythonudaf_p0/test_pythonudaf_drop.groovy @@ -124,10 +124,73 @@ suite('test_pythonudaf_drop', "nonConcurrent") { qt_py_udaf_drop_5 '''SELECT py_drop_sum_reconnect(v) FROM py_udaf_drop_tbl;''' try_sql('DROP FUNCTION IF EXISTS py_drop_sum_reconnect(INT);') + + // Case 4: inline UDAF drop/recreate must not reuse the old Python class. + // The Python server caches UDAF state managers, so this verifies the cache key + // and drop cleanup both use the FE function id, not just name + argument types. + sql '''DROP FUNCTION IF EXISTS py_drop_inline_recreate(INT)''' + sql """ + CREATE AGGREGATE FUNCTION py_drop_inline_recreate(INT) + RETURNS BIGINT + PROPERTIES ( + "type" = "PYTHON_UDF", + "symbol" = "InlineDropRecreateUdaf", + "runtime_version" = "${runtime_version}", + "always_nullable" = "true" + ) + AS \$\$ +class InlineDropRecreateUdaf: + def __init__(self): + self.total = 0 + @property + def aggregate_state(self): + return self.total + def accumulate(self, val): + if val is not None: + self.total += val + def merge(self, other): + self.total += other + def finish(self): + return self.total * 10 +\$\$ + """ + def inlineOldResult = sql '''SELECT py_drop_inline_recreate(v) FROM py_udaf_drop_tbl;''' + assert inlineOldResult[0][0].toString() == '60' + + sql '''DROP FUNCTION IF EXISTS py_drop_inline_recreate(INT)''' + sql """ + CREATE AGGREGATE FUNCTION py_drop_inline_recreate(INT) + RETURNS BIGINT + PROPERTIES ( + "type" = "PYTHON_UDF", + "symbol" = "InlineDropRecreateUdaf", + "runtime_version" = "${runtime_version}", + "always_nullable" = "true" + ) + AS \$\$ +class InlineDropRecreateUdaf: + def __init__(self): + self.total = 0 + @property + def aggregate_state(self): + return self.total + def accumulate(self, val): + if val is not None: + self.total += val + def merge(self, other): + self.total += other + def finish(self): + return self.total * 100 +\$\$ + """ + def inlineNewResult = sql '''SELECT py_drop_inline_recreate(v) FROM py_udaf_drop_tbl;''' + assert inlineNewResult[0][0].toString() == '600' + sql '''DROP FUNCTION IF EXISTS py_drop_inline_recreate(INT)''' } finally { try_sql('DROP FUNCTION IF EXISTS py_drop_sum_once(INT);') try_sql('DROP FUNCTION IF EXISTS py_drop_sum_a(INT);') try_sql('DROP FUNCTION IF EXISTS py_drop_sum_b(INT);') try_sql('DROP FUNCTION IF EXISTS py_drop_sum_reconnect(INT);') + try_sql('DROP FUNCTION IF EXISTS py_drop_inline_recreate(INT);') } }