Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions c/src/proactor/epoll-internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ typedef struct pconnection_t {
psocket_t psocket;
pni_timer_t *timer;
const char *host, *port;
uint32_t new_events;
uint32_t new_os_events;
bool server; /* accept, not connect */
bool tick_pending;
bool queued_disconnect; /* deferred from pn_proactor_disconnect() */
Expand Down Expand Up @@ -352,7 +352,7 @@ static inline void pmutex_finalize(pthread_mutex_t *m) { pthread_mutex_destroy(m
static inline void lock(pmutex *m) { pthread_mutex_lock(m); }
static inline void unlock(pmutex *m) { pthread_mutex_unlock(m); }

static inline bool pconnection_has_event(pconnection_t *pc) {
static inline bool pconnection_has_pn_event(pconnection_t *pc) {
return pn_connection_driver_has_event(&pc->driver);
}

Expand Down
40 changes: 28 additions & 12 deletions c/src/proactor/epoll.c
Original file line number Diff line number Diff line change
Expand Up @@ -800,7 +800,7 @@ static const char *pconnection_setup(pconnection_t *pc, pn_proactor_t *p, pn_con
task_init(&pc->task, PCONNECTION, p);
psocket_init(&pc->psocket, PCONNECTION_IO);
pni_parse_addr(addr, pc->addr_buf, addrlen+1, &pc->host, &pc->port);
pc->new_events = 0;
pc->new_os_events = 0;
pc->tick_pending = false;
pc->queued_disconnect = false;
pc->disconnect_condition = NULL;
Expand Down Expand Up @@ -905,7 +905,7 @@ static void pconnection_begin_close(pconnection_t *pc) {
static void pconnection_forced_shutdown(pconnection_t *pc) {
// Called by proactor_free, no competing threads, no epoll activity.
pc->current_arm = 0;
pc->new_events = 0;
pc->new_os_events = 0;
pconnection_begin_close(pc);
// pconnection_process will never be called again. Zero everything.
pc->task.ready = 0;
Expand Down Expand Up @@ -980,12 +980,28 @@ static inline bool pconnection_wclosed(pconnection_t *pc) {
Never rearm(0 | EPOLLONESHOT), since this really means
rearm(EPOLLHUP | EPOLLERR | EPOLLONESHOT) and leaves doubt that the
EPOLL_CTL_DEL can prevent a parallel HUP/ERR error notification during
close/shutdown. Let read()/write() return 0 or -1 to trigger cleanup logic.
close/shutdown.

Normally, let send()/recv() return 0 or -1 to trigger cleanup logic.
*/
static int pconnection_rearm_check(pconnection_t *pc) {
if ((pconnection_rclosed(pc) && pconnection_wclosed(pc)) || pc->psocket.epoll_io.fd == -1) {
return 0;
} else if (pc->disconnected) {
// Transport not closed, yet IO not possible. Skip another epoll since it will return
// immediately. PROTON-2930: either send/recv not called or no error (i.e. EAGAIN).
int soerr;
socklen_t soerrlen = sizeof(soerr);
int ec = getsockopt(pc->psocket.epoll_io.fd, SOL_SOCKET, SO_ERROR, &soerr, &soerrlen);
if (ec || !soerr) {
soerr = ENOTCONN;
}
// Force transport closure.
psocket_error(&pc->psocket, soerr, pc->connected ? "disconnected" : "on connect");
schedule(&pc->task); // unassign_thread handles notify_poller requirement.
return 0;
}

uint32_t wanted_now = (pc->read_blocked && !pconnection_rclosed(pc)) ? EPOLLIN : 0;
if (!pconnection_wclosed(pc)) {
if (pc->write_blocked)
Expand All @@ -1011,7 +1027,8 @@ static inline void pconnection_rearm(pconnection_t *pc, int wanted_now) {

/* Call with task lock and having done a write_flush() to "know" the value of wbuf_remaining */
static inline bool pconnection_work_pending(pconnection_t *pc) {
if (pc->new_events || pni_task_wake_pending(&pc->task) || pc->tick_pending || pc->queued_disconnect)
if (pc->new_os_events || pni_task_wake_pending(&pc->task) || pconnection_has_pn_event(pc) ||
pc->tick_pending || pc->queued_disconnect)
return true;
if (!pc->read_blocked && !pconnection_rclosed(pc))
return true;
Expand All @@ -1029,9 +1046,8 @@ static void pconnection_done(pconnection_t *pc) {
pc->task.working = false; // So we can schedule() ourself if necessary. We remain the de facto
// working task instance while the lock is held.
pc->hog_count = 0;
bool has_event = pconnection_has_event(pc);

if (has_event || pconnection_work_pending(pc)) {
if (pconnection_work_pending(pc)) {
self_sched = true;
} else if (pn_connection_driver_finished(&pc->driver)) {
pconnection_begin_close(pc);
Expand Down Expand Up @@ -1141,7 +1157,7 @@ static pn_event_batch_t *pconnection_process(pconnection_t *pc, uint32_t events,
pc->process_args = (pc->tick_pending << 1) | sched_ready;
}
if (events) {
pc->new_events = events;
pc->new_os_events = events;
pc->current_arm = 0;
events = 0;
}
Expand Down Expand Up @@ -1185,7 +1201,7 @@ static pn_event_batch_t *pconnection_process(pconnection_t *pc, uint32_t events,
}
}

if (pconnection_has_event(pc)) {
if (pconnection_has_pn_event(pc)) {
unlock(&pc->task.mutex);
return &pc->batch;
}
Expand All @@ -1199,10 +1215,10 @@ static pn_event_batch_t *pconnection_process(pconnection_t *pc, uint32_t events,
tick_required = !closed;
}

if (pc->new_events) {
uint32_t update_events = pc->new_events;
if (pc->new_os_events) {
uint32_t update_events = pc->new_os_events;
pc->current_arm = 0;
pc->new_events = 0;
pc->new_os_events = 0;
if (!pc->task.closing) {
if ((update_events & (EPOLLHUP | EPOLLERR)) && !pconnection_rclosed(pc) && !pconnection_wclosed(pc))
pconnection_maybe_connect_lh(pc);
Expand Down Expand Up @@ -1290,7 +1306,7 @@ static pn_event_batch_t *pconnection_process(pconnection_t *pc, uint32_t events,
return NULL; // caller already owns the batch
}

if (pconnection_has_event(pc)) {
if (pconnection_has_pn_event(pc)) {
pc->output_drained = false;
return &pc->batch;
}
Expand Down
Loading