Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions c/src/proactor/epoll-internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,7 @@ typedef struct pconnection_t {
pmutex rearm_mutex; /* protects pconnection_rearm from out of order arming*/
bool io_doublecheck; /* callbacks made and new IO may have arrived */
uint64_t expected_timeout;
bool name_lookup_pending;
char addr_buf[1];
} pconnection_t;

Expand Down
30 changes: 24 additions & 6 deletions c/src/proactor/epoll.c
Original file line number Diff line number Diff line change
Expand Up @@ -836,7 +836,7 @@ static const char *pconnection_setup(pconnection_t *pc, pn_proactor_t *p, pn_con
// Call with lock held and closing == true (i.e. pn_connection_driver_finished() == true), no pending timer.
// Return true when all possible outstanding epoll events associated with this pconnection have been processed.
static inline bool pconnection_is_final(pconnection_t *pc) {
return !pc->current_arm && !pc->task.ready && !pc->tick_pending;
return !pc->current_arm && !pc->task.ready && !pc->tick_pending && !pc->name_lookup_pending;
}

static void pconnection_final_free(pconnection_t *pc) {
Expand Down Expand Up @@ -1400,6 +1400,7 @@ static void pconnection_start(pconnection_t *pc, int fd) {
}

/* Called on initial connect, and if connection fails to try another address */
/* May be called within the pconnection task or from an external name_lookup task */
static void pconnection_maybe_connect_lh(pconnection_t *pc) {
errno = 0;
if (!pc->connected) { /* Not yet connected */
Expand Down Expand Up @@ -1445,8 +1446,7 @@ bool schedule_if_inactive(pn_proactor_t *p) {

/* Called when connection name lookup completes (from name_lookup done_cb). Call with task lock held. */
static void connection_lookup_done_lh(pconnection_t *pc, struct addrinfo *ai, int gai_error) {
pn_proactor_t *p = pc->task.proactor;
bool notify = false;
pc->name_lookup_pending = false;
if (gai_error) {
psocket_gai_error(&pc->psocket, gai_error, "connect to ");
} else if (ai) {
Expand All @@ -1457,8 +1457,8 @@ static void connection_lookup_done_lh(pconnection_t *pc, struct addrinfo *ai, in
return;
}
}
notify = schedule(&pc->task);
if (notify) notify_poller(p);
bool notify = schedule(&pc->task);
if (notify) notify_poller(pc->task.proactor);
}

static void connection_done_cb(void *user_data, struct addrinfo *ai, int gai_error) {
Expand All @@ -1472,10 +1472,28 @@ static void connection_done_cb(void *user_data, struct addrinfo *ai, int gai_err
// Return true if the socket is connecting and there are no Proton events to deliver.
static bool pconnection_first_connect_lh(pconnection_t *pc) {
pn_proactor_t *p = pc->task.proactor;
pn_transport_t *tp = pc->driver.transport;
pc->name_lookup_pending = true;

unlock(&pc->task.mutex);
bool rc = pni_name_lookup_start(&p->name_lookup, pc->host, pc->port, pc, connection_done_cb);
lock(&pc->task.mutex);
return rc;

if (!rc) {
// Either the callback was synchronous or no callback was possible
if (pc->name_lookup_pending) {
// Clean up since there will be no callback.
pc->name_lookup_pending = false;
psocket_error(&pc->psocket, EAI_FAIL, "internal error on connect");
}
return false;
}
// Name lookup started. Callback may have already completed and failed.
if (!pc->name_lookup_pending) {
if (pn_condition_is_set(pn_transport_condition(tp)))
return false;
}
return true;
}

void pn_proactor_connect2(pn_proactor_t *p, pn_connection_t *c, pn_transport_t *t, const char *addr) {
Expand Down
72 changes: 55 additions & 17 deletions c/src/proactor/epoll_raw_connection.c
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ struct praw_connection_t {
bool hup_detected;
bool read_check;
bool first_schedule;
bool name_lookup_pending;
char *taddr;
};

Expand Down Expand Up @@ -110,8 +111,14 @@ static void praw_connection_start(praw_connection_t *prc, int fd) {
}

/* Called on initial connect, and if connection fails to try another address */
/* May be called within the praw_connection task or from an external name_lookup task */
static void praw_connection_maybe_connect_lh(praw_connection_t *prc) {
if (prc->task.closing) {
return;
}
int err = 0; /* Initialized in case while loop has zero iterations */
while (prc->ai) { /* Have an address */
err = 0;
struct addrinfo *ai = prc->ai;
prc->ai = prc->ai->ai_next; /* Move to next address in case this fails */
int fd = socket(ai->ai_family, SOCK_STREAM, 0);
Expand All @@ -125,14 +132,19 @@ static void praw_connection_maybe_connect_lh(praw_connection_t *prc) {
praw_connection_start(prc, fd);
return; /* Async connection started */
} else {
err = errno;
close(fd);
}
} else {
err = errno;
}
/* connect failed immediately, go round the loop to try the next addr */
}
int err;
socklen_t errlen = sizeof(err);
getsockopt(prc->psocket.epoll_io.fd, SOL_SOCKET, SO_ERROR, (void *)&err, &errlen);

if (err == 0 && prc->psocket.epoll_io.fd >= 0) {
socklen_t errlen = sizeof(err);
getsockopt(prc->psocket.epoll_io.fd, SOL_SOCKET, SO_ERROR, (void *)&err, &errlen);
}
psocket_error(prc, err, "on connect");

freeaddrinfo(prc->addrinfo);
Expand All @@ -144,6 +156,8 @@ static void praw_connection_maybe_connect_lh(praw_connection_t *prc) {
static void raw_connection_lookup_done_lh(praw_connection_t *prc, struct addrinfo *ai, int gai_error) {
pn_proactor_t *p = prc->task.proactor;
bool notify = false;

prc->name_lookup_pending = false;
if (gai_error) {
psocket_gai_error(prc, gai_error, "connect to ", prc->taddr);
} else if (ai) {
Expand Down Expand Up @@ -224,21 +238,29 @@ pn_raw_connection_t *pn_raw_connection(void) {
return &conn->raw_connection;
}

// Call from pconnection_process with task lock held.
// Return true if the socket is connecting and there are no Proton events to deliver.
static bool praw_connection_first_connect_lh(praw_connection_t *prc) {
const char *host;
const char *port;
// Call from pconnection_process with no locks.
// Callback may complete before pni_name_lookup_start returns.
static void praw_connection_first_connect(praw_connection_t *prc) {
pn_proactor_t *p = prc->task.proactor;

unlock(&prc->task.mutex);
size_t addrlen = strlen(prc->taddr);
char *addr_buf = (char*) alloca(addrlen+1);
const char *host;
const char *port;
pni_parse_addr(prc->taddr, addr_buf, addrlen+1, &host, &port);
bool rc = pni_name_lookup_start(&p->name_lookup, host, port, prc, raw_connection_done_cb);
lock(&prc->task.mutex);

return rc;
if (!rc) {
// Either the callback was synchronous or no callback was possible
bool notify = false;
lock(&prc->task.mutex);
if (prc->name_lookup_pending) {
// Clean up since there will be no callback.
prc->name_lookup_pending = false;
psocket_error(prc, EAI_FAIL, "internal error on connect");
notify = schedule(&prc->task);
}
unlock(&prc->task.mutex);
if (notify) notify_poller(p);
}
}

void pn_proactor_raw_connect(pn_proactor_t *p, pn_raw_connection_t *rc, const char *addr) {
Expand Down Expand Up @@ -438,22 +460,37 @@ pn_event_batch_t *pni_raw_connection_process(task_t *t, uint32_t io_events, bool
rc->armed = false;
rc->current_arm = 0;
}
if (pni_raw_finished(&rc->raw_connection)) {
if (pni_raw_finished(&rc->raw_connection) && !rc->name_lookup_pending) {
t->working = false;
unlock(&rc->task.mutex);
praw_initiate_cleanup(rc);
return NULL;
}
if (rc->task.closing) {
// rclosed and wclosed. Allow final events to be processed.
unlock(&rc->task.mutex);
return &rc->batch;
}
int events = io_events;
int fd = rc->psocket.epoll_io.fd;

if (rc->first_schedule) {
rc->first_schedule = false;
assert(!events); // No socket yet.
assert(!rc->connected);
if (praw_connection_first_connect_lh(rc)) {
bool wake_event = pni_task_wake_pending(&rc->task);

t->working = false;
rc->name_lookup_pending = true;
unlock(&rc->task.mutex);
praw_connection_first_connect(rc);
if (wake_event) {
lock(&rc->task.mutex);
t->working = true;
unlock(&rc->task.mutex);
return NULL;
return &rc->batch;
Comment thread
astitcher marked this conversation as resolved.
}
return NULL;
}
if (!rc->connected) {
if (events & (EPOLLHUP | EPOLLERR)) {
Expand Down Expand Up @@ -525,9 +562,10 @@ void pni_raw_connection_done(praw_connection_t *rc) {
// wakes outstanding because we dealt with it already in pni_raw_batch_next()
notify = (wake_pending || have_event) && schedule(&rc->task);
ready = rc->task.ready; // No need to poll. Already scheduled.
bool praw_finished = pni_raw_finished(&rc->raw_connection) && !rc->name_lookup_pending;
unlock(&rc->task.mutex);

if (pni_raw_finished(raw) && !ready) {
if (praw_finished && !ready) {
// If raw connection has no more work to do and safe to free resources, do so.
praw_initiate_cleanup(rc);
} else if (ready) {
Expand Down
Loading