diff --git a/c/src/proactor/epoll-internal.h b/c/src/proactor/epoll-internal.h index 5a6a37ac3..7ae86f136 100644 --- a/c/src/proactor/epoll-internal.h +++ b/c/src/proactor/epoll-internal.h @@ -243,7 +243,7 @@ typedef struct pconnection_t { psocket_t psocket; pni_timer_t *timer; const char *host, *port; - uint32_t new_events; + uint32_t new_os_events; bool server; /* accept, not connect */ bool tick_pending; bool queued_disconnect; /* deferred from pn_proactor_disconnect() */ @@ -352,7 +352,7 @@ static inline void pmutex_finalize(pthread_mutex_t *m) { pthread_mutex_destroy(m static inline void lock(pmutex *m) { pthread_mutex_lock(m); } static inline void unlock(pmutex *m) { pthread_mutex_unlock(m); } -static inline bool pconnection_has_event(pconnection_t *pc) { +static inline bool pconnection_has_pn_event(pconnection_t *pc) { return pn_connection_driver_has_event(&pc->driver); } diff --git a/c/src/proactor/epoll.c b/c/src/proactor/epoll.c index 6cc0d38f0..df1bde8b2 100644 --- a/c/src/proactor/epoll.c +++ b/c/src/proactor/epoll.c @@ -800,7 +800,7 @@ static const char *pconnection_setup(pconnection_t *pc, pn_proactor_t *p, pn_con task_init(&pc->task, PCONNECTION, p); psocket_init(&pc->psocket, PCONNECTION_IO); pni_parse_addr(addr, pc->addr_buf, addrlen+1, &pc->host, &pc->port); - pc->new_events = 0; + pc->new_os_events = 0; pc->tick_pending = false; pc->queued_disconnect = false; pc->disconnect_condition = NULL; @@ -905,7 +905,7 @@ static void pconnection_begin_close(pconnection_t *pc) { static void pconnection_forced_shutdown(pconnection_t *pc) { // Called by proactor_free, no competing threads, no epoll activity. pc->current_arm = 0; - pc->new_events = 0; + pc->new_os_events = 0; pconnection_begin_close(pc); // pconnection_process will never be called again. Zero everything. pc->task.ready = 0; @@ -980,12 +980,28 @@ static inline bool pconnection_wclosed(pconnection_t *pc) { Never rearm(0 | EPOLLONESHOT), since this really means rearm(EPOLLHUP | EPOLLERR | EPOLLONESHOT) and leaves doubt that the EPOLL_CTL_DEL can prevent a parallel HUP/ERR error notification during - close/shutdown. Let read()/write() return 0 or -1 to trigger cleanup logic. + close/shutdown. + + Normally, let send()/recv() return 0 or -1 to trigger cleanup logic. */ static int pconnection_rearm_check(pconnection_t *pc) { if ((pconnection_rclosed(pc) && pconnection_wclosed(pc)) || pc->psocket.epoll_io.fd == -1) { return 0; + } else if (pc->disconnected) { + // Transport not closed, yet IO not possible. Skip another epoll since it will return + // immediately. PROTON-2930: either send/recv not called or no error (i.e. EAGAIN). + int soerr; + socklen_t soerrlen = sizeof(soerr); + int ec = getsockopt(pc->psocket.epoll_io.fd, SOL_SOCKET, SO_ERROR, &soerr, &soerrlen); + if (ec || !soerr) { + soerr = ENOTCONN; + } + // Force transport closure. + psocket_error(&pc->psocket, soerr, pc->connected ? "disconnected" : "on connect"); + schedule(&pc->task); // unassign_thread handles notify_poller requirement. + return 0; } + uint32_t wanted_now = (pc->read_blocked && !pconnection_rclosed(pc)) ? EPOLLIN : 0; if (!pconnection_wclosed(pc)) { if (pc->write_blocked) @@ -1011,7 +1027,8 @@ static inline void pconnection_rearm(pconnection_t *pc, int wanted_now) { /* Call with task lock and having done a write_flush() to "know" the value of wbuf_remaining */ static inline bool pconnection_work_pending(pconnection_t *pc) { - if (pc->new_events || pni_task_wake_pending(&pc->task) || pc->tick_pending || pc->queued_disconnect) + if (pc->new_os_events || pni_task_wake_pending(&pc->task) || pconnection_has_pn_event(pc) || + pc->tick_pending || pc->queued_disconnect) return true; if (!pc->read_blocked && !pconnection_rclosed(pc)) return true; @@ -1029,9 +1046,8 @@ static void pconnection_done(pconnection_t *pc) { pc->task.working = false; // So we can schedule() ourself if necessary. We remain the de facto // working task instance while the lock is held. pc->hog_count = 0; - bool has_event = pconnection_has_event(pc); - if (has_event || pconnection_work_pending(pc)) { + if (pconnection_work_pending(pc)) { self_sched = true; } else if (pn_connection_driver_finished(&pc->driver)) { pconnection_begin_close(pc); @@ -1141,7 +1157,7 @@ static pn_event_batch_t *pconnection_process(pconnection_t *pc, uint32_t events, pc->process_args = (pc->tick_pending << 1) | sched_ready; } if (events) { - pc->new_events = events; + pc->new_os_events = events; pc->current_arm = 0; events = 0; } @@ -1185,7 +1201,7 @@ static pn_event_batch_t *pconnection_process(pconnection_t *pc, uint32_t events, } } - if (pconnection_has_event(pc)) { + if (pconnection_has_pn_event(pc)) { unlock(&pc->task.mutex); return &pc->batch; } @@ -1199,10 +1215,10 @@ static pn_event_batch_t *pconnection_process(pconnection_t *pc, uint32_t events, tick_required = !closed; } - if (pc->new_events) { - uint32_t update_events = pc->new_events; + if (pc->new_os_events) { + uint32_t update_events = pc->new_os_events; pc->current_arm = 0; - pc->new_events = 0; + pc->new_os_events = 0; if (!pc->task.closing) { if ((update_events & (EPOLLHUP | EPOLLERR)) && !pconnection_rclosed(pc) && !pconnection_wclosed(pc)) pconnection_maybe_connect_lh(pc); @@ -1290,7 +1306,7 @@ static pn_event_batch_t *pconnection_process(pconnection_t *pc, uint32_t events, return NULL; // caller already owns the batch } - if (pconnection_has_event(pc)) { + if (pconnection_has_pn_event(pc)) { pc->output_drained = false; return &pc->batch; }