Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 12 additions & 2 deletions include/dpp/queues.h
Original file line number Diff line number Diff line change
Expand Up @@ -229,13 +229,18 @@ class DPP_EXPORT http_request {
/**
* @brief True if request has been made.
*/
bool completed;
std::atomic<bool> completed;

/**
* @brief True for requests that are not going to discord (rate limits code skipped).
*/
bool non_discord;

/**
* @brief Client mutex
*/
std::mutex cli_mutex;

/**
* @brief HTTPS client
*/
Expand Down Expand Up @@ -460,6 +465,11 @@ class DPP_EXPORT request_concurrency_queue {
*/
std::shared_mutex in_mutex;

/**
* @brief Removals queue mutex thread safety.
*/
std::mutex rem_mutex;

/**
* @brief Inbound queue timer. The timer is called every second,
* and when it wakes up it checks for requests pending to be sent in the queue.
Expand Down Expand Up @@ -589,7 +599,7 @@ class DPP_EXPORT request_queue {
* When globally rate limited the concurrency queues associated with this request queue
* will not process any requests in their timers until the global rate limit expires.
*/
bool globally_ratelimited;
std::atomic<bool> globally_ratelimited;

/**
* @brief When we are globally rate limited until (unix epoch)
Expand Down
7 changes: 5 additions & 2 deletions src/dpp/cluster/timer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,11 @@ bool cluster::stop_timer(timer t) {
void cluster::tick_timers() {
time_t now = time(nullptr);

if (next_timer.empty()) {
return;
{
std::lock_guard<std::mutex> l(timer_guard);
if (next_timer.empty()) {
return;
}
}
do {
timer_t cur_timer;
Expand Down
11 changes: 8 additions & 3 deletions src/dpp/queues.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ http_request_completion_t http_request::run(request_concurrency_queue* processor
}
http_connect_info hci = https_client::get_host_info(_host);
try {
cli = std::make_unique<https_client>(
std::unique_ptr<https_client> tmp = std::make_unique<https_client>(
owner,
hci.hostname,
hci.port,
Expand Down Expand Up @@ -286,6 +286,10 @@ http_request_completion_t http_request::run(request_concurrency_queue* processor
});
}
);
{
std::lock_guard<std::mutex> client(this->cli_mutex);
cli = std::move(tmp);
}
}
catch (const std::exception& e) {
owner->log(ll_error, "HTTP(S) error on " + hci.scheme + " connection to " + hci.hostname + ":" + std::to_string(hci.port) + ": " + std::string(e.what()));
Expand Down Expand Up @@ -313,7 +317,7 @@ request_concurrency_queue::request_concurrency_queue(class cluster* owner, class
tick_and_deliver_requests(in_index);
/* Clear pending removals in the removals queue */
if (time(nullptr) % 90 == 0) {
std::scoped_lock lock1{in_mutex};
std::scoped_lock lock1{rem_mutex};
for (auto it = removals.cbegin(); it != removals.cend();) {
if ((*it)->is_completed()) {
it = removals.erase(it);
Expand Down Expand Up @@ -404,7 +408,8 @@ void request_concurrency_queue::tick_and_deliver_requests(uint32_t index)
std::unique_ptr<http_request> rq;
{
/* Find the owned pointer in requests_in */
std::scoped_lock lock1{in_mutex};
std::scoped_lock requests_in_lock{in_mutex};
std::scoped_lock removals_queue_lock{rem_mutex};

const std::string &key = request_view->endpoint;
auto [begin, end] = std::equal_range(requests_in.begin(), requests_in.end(), key, compare_request{});
Expand Down
5 changes: 2 additions & 3 deletions src/dpp/socketengines/epoll.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ struct DPP_EXPORT socket_engine_epoll : public socket_engine_base {
int epoll_handle{INVALID_SOCKET};
static constexpr size_t MAX_EVENTS = 65536;
std::array<struct epoll_event, MAX_EVENTS> events{};
int sockets{0};
std::mutex sockets_mutex;

socket_engine_epoll(const socket_engine_epoll&) = delete;
socket_engine_epoll(socket_engine_epoll&&) = delete;
Expand Down Expand Up @@ -135,6 +135,7 @@ struct DPP_EXPORT socket_engine_epoll : public socket_engine_base {

if ((eh->flags & WANT_DELETION) != 0L) {
remove_socket(fd);
std::lock_guard<std::shared_mutex> lg(this->fds_mutex);
fds.erase(fd);
}
}
Expand All @@ -143,7 +144,6 @@ struct DPP_EXPORT socket_engine_epoll : public socket_engine_base {

bool register_socket(const socket_events& e) final {
bool r = socket_engine_base::register_socket(e);
sockets++;
if (r) {
struct epoll_event ev{};
ev.events = EPOLLET;
Expand Down Expand Up @@ -186,7 +186,6 @@ struct DPP_EXPORT socket_engine_epoll : public socket_engine_base {

bool remove_socket(dpp::socket fd) final {
struct epoll_event ev{};
sockets--;
epoll_ctl(epoll_handle, EPOLL_CTL_DEL, fd, &ev);
if (!owner->on_socket_close.empty()) {
socket_close_t event(owner, 0, "");
Expand Down
1 change: 1 addition & 0 deletions src/dpp/socketengines/kqueue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ struct DPP_EXPORT socket_engine_kqueue : public socket_engine_base {

if ((eh->flags & WANT_DELETION) != 0L) {
remove_socket(kev.ident);
std::lock_guard<std::shared_mutex> lg(this->fds_mutex);
fds.erase(kev.ident);
}
}
Expand Down
Loading