From d7301874dcaba5958eb7939dd7afaecbee3f249d Mon Sep 17 00:00:00 2001 From: fclivaz42 Date: Fri, 15 May 2026 13:27:16 +0200 Subject: [PATCH] fix: add protections against dataraces in epoll engine, cluster timer and queues --- include/dpp/queues.h | 14 ++++++++++++-- src/dpp/cluster/timer.cpp | 7 +++++-- src/dpp/queues.cpp | 11 ++++++++--- src/dpp/socketengines/epoll.cpp | 5 ++--- src/dpp/socketengines/kqueue.cpp | 1 + 5 files changed, 28 insertions(+), 10 deletions(-) diff --git a/include/dpp/queues.h b/include/dpp/queues.h index 364f41ee6c..2b6bd71ab3 100644 --- a/include/dpp/queues.h +++ b/include/dpp/queues.h @@ -229,13 +229,18 @@ class DPP_EXPORT http_request { /** * @brief True if request has been made. */ - bool completed; + std::atomic completed; /** * @brief True for requests that are not going to discord (rate limits code skipped). */ bool non_discord; + /** + * @brief Client mutex + */ + std::mutex cli_mutex; + /** * @brief HTTPS client */ @@ -460,6 +465,11 @@ class DPP_EXPORT request_concurrency_queue { */ std::shared_mutex in_mutex; + /** + * @brief Removals queue mutex thread safety. + */ + std::mutex rem_mutex; + /** * @brief Inbound queue timer. The timer is called every second, * and when it wakes up it checks for requests pending to be sent in the queue. @@ -589,7 +599,7 @@ class DPP_EXPORT request_queue { * When globally rate limited the concurrency queues associated with this request queue * will not process any requests in their timers until the global rate limit expires. */ - bool globally_ratelimited; + std::atomic globally_ratelimited; /** * @brief When we are globally rate limited until (unix epoch) diff --git a/src/dpp/cluster/timer.cpp b/src/dpp/cluster/timer.cpp index 1844fd20d5..37a248474a 100644 --- a/src/dpp/cluster/timer.cpp +++ b/src/dpp/cluster/timer.cpp @@ -57,8 +57,11 @@ bool cluster::stop_timer(timer t) { void cluster::tick_timers() { time_t now = time(nullptr); - if (next_timer.empty()) { - return; + { + std::lock_guard l(timer_guard); + if (next_timer.empty()) { + return; + } } do { timer_t cur_timer; diff --git a/src/dpp/queues.cpp b/src/dpp/queues.cpp index aa7e88d0a2..8d6cda2f0a 100644 --- a/src/dpp/queues.cpp +++ b/src/dpp/queues.cpp @@ -225,7 +225,7 @@ http_request_completion_t http_request::run(request_concurrency_queue* processor } http_connect_info hci = https_client::get_host_info(_host); try { - cli = std::make_unique( + std::unique_ptr tmp = std::make_unique( owner, hci.hostname, hci.port, @@ -286,6 +286,10 @@ http_request_completion_t http_request::run(request_concurrency_queue* processor }); } ); + { + std::lock_guard client(this->cli_mutex); + cli = std::move(tmp); + } } catch (const std::exception& e) { owner->log(ll_error, "HTTP(S) error on " + hci.scheme + " connection to " + hci.hostname + ":" + std::to_string(hci.port) + ": " + std::string(e.what())); @@ -313,7 +317,7 @@ request_concurrency_queue::request_concurrency_queue(class cluster* owner, class tick_and_deliver_requests(in_index); /* Clear pending removals in the removals queue */ if (time(nullptr) % 90 == 0) { - std::scoped_lock lock1{in_mutex}; + std::scoped_lock lock1{rem_mutex}; for (auto it = removals.cbegin(); it != removals.cend();) { if ((*it)->is_completed()) { it = removals.erase(it); @@ -404,7 +408,8 @@ void request_concurrency_queue::tick_and_deliver_requests(uint32_t index) std::unique_ptr rq; { /* Find the owned pointer in requests_in */ - std::scoped_lock lock1{in_mutex}; + std::scoped_lock requests_in_lock{in_mutex}; + std::scoped_lock removals_queue_lock{rem_mutex}; const std::string &key = request_view->endpoint; auto [begin, end] = std::equal_range(requests_in.begin(), requests_in.end(), key, compare_request{}); diff --git a/src/dpp/socketengines/epoll.cpp b/src/dpp/socketengines/epoll.cpp index b0be2e6b5b..d641d335e2 100644 --- a/src/dpp/socketengines/epoll.cpp +++ b/src/dpp/socketengines/epoll.cpp @@ -55,7 +55,7 @@ struct DPP_EXPORT socket_engine_epoll : public socket_engine_base { int epoll_handle{INVALID_SOCKET}; static constexpr size_t MAX_EVENTS = 65536; std::array events{}; - int sockets{0}; + std::mutex sockets_mutex; socket_engine_epoll(const socket_engine_epoll&) = delete; socket_engine_epoll(socket_engine_epoll&&) = delete; @@ -135,6 +135,7 @@ struct DPP_EXPORT socket_engine_epoll : public socket_engine_base { if ((eh->flags & WANT_DELETION) != 0L) { remove_socket(fd); + std::lock_guard lg(this->fds_mutex); fds.erase(fd); } } @@ -143,7 +144,6 @@ struct DPP_EXPORT socket_engine_epoll : public socket_engine_base { bool register_socket(const socket_events& e) final { bool r = socket_engine_base::register_socket(e); - sockets++; if (r) { struct epoll_event ev{}; ev.events = EPOLLET; @@ -186,7 +186,6 @@ struct DPP_EXPORT socket_engine_epoll : public socket_engine_base { bool remove_socket(dpp::socket fd) final { struct epoll_event ev{}; - sockets--; epoll_ctl(epoll_handle, EPOLL_CTL_DEL, fd, &ev); if (!owner->on_socket_close.empty()) { socket_close_t event(owner, 0, ""); diff --git a/src/dpp/socketengines/kqueue.cpp b/src/dpp/socketengines/kqueue.cpp index 498cd56123..501153f238 100644 --- a/src/dpp/socketengines/kqueue.cpp +++ b/src/dpp/socketengines/kqueue.cpp @@ -108,6 +108,7 @@ struct DPP_EXPORT socket_engine_kqueue : public socket_engine_base { if ((eh->flags & WANT_DELETION) != 0L) { remove_socket(kev.ident); + std::lock_guard lg(this->fds_mutex); fds.erase(kev.ident); } }