From 14c18efe198382d5649f7258413276775b87c83b Mon Sep 17 00:00:00 2001 From: Andy Postnikov Date: Wed, 3 Jun 2026 18:33:16 +0200 Subject: [PATCH] test(port): add IPC fault-injection coverage MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds a fault-injection harness under src/test/nxt_port_fail_test.c covering the audit-fixed IPC paths from freeunitorg/freeunit#56. Sub-tests: * socket_write — forces nxt_port_msg_alloc() failure with fd != -1 and a non-sync buf; asserts NXT_ERROR, fd stays open, completion not invoked, /proc/self/fd balanced (Linux only). * rpc_register — forces nxt_port_rpc_register_handler_ex alloc and lvlhsh-insert failures; asserts use_count == 1, rpc_streams empty. * error_handler — queues a send_msg with a closeable fd + non-sync buffer, runs nxt_port_error_handler() via an NXT_TESTS trampoline, drains the fast work queue, asserts completion enqueued (not called synchronously) and runs exactly once. * mp_baseline — mirrors the audit-fixed sender shape, forces a pre-queue failure, asserts mp->retain is unchanged. NXT_TESTS-gated trampolines for the harness: nxt_port_test_msg_alloc_ failures()/nxt_port_test_run_error_handler() (nxt_port_socket.c), nxt_port_rpc_test_alloc_failures()/_insert_failures() (nxt_port_rpc.c), nxt_mp_test_retain_count() (nxt_mp.c). Test buffers are heap-owned from a transient mp, matching production callers (nxt_cert_store_get allocates from temp_conf mp). The injected stack engine documents its invariant: the exercised error path must touch nothing beyond fast_work_queue. Also resets file.fd to -1 after nxt_fd_close on the send-fail path in nxt_cert.c and nxt_script.c — defense in depth against an accidental double-close if code is later added below the close. No behavior change. Validation: ./configure --tests && make tests -j && build/tests — all four sub-tests green. Co-Authored-By: Claude Opus 4.8 (1M context) --- auto/sources | 1 + src/nxt_cert.c | 1 + src/nxt_mp.c | 11 + src/nxt_mp.h | 10 + src/nxt_port.h | 5 + src/nxt_port_rpc.c | 55 +++ src/nxt_port_rpc.h | 6 +- src/nxt_port_socket.c | 36 ++ src/nxt_script.c | 1 + src/test/nxt_port_fail_test.c | 659 ++++++++++++++++++++++++++++++++++ src/test/nxt_tests.c | 4 + src/test/nxt_tests.h | 1 + 12 files changed, 789 insertions(+), 1 deletion(-) create mode 100644 src/test/nxt_port_fail_test.c diff --git a/auto/sources b/auto/sources index 28cdc8344..336eb15b5 100644 --- a/auto/sources +++ b/auto/sources @@ -172,6 +172,7 @@ NXT_TEST_SRCS=" \ src/test/nxt_http_parse_test.c \ src/test/nxt_strverscmp_test.c \ src/test/nxt_base64_test.c \ + src/test/nxt_port_fail_test.c \ " diff --git a/src/nxt_cert.c b/src/nxt_cert.c index f14c29343..16dc811f9 100644 --- a/src/nxt_cert.c +++ b/src/nxt_cert.c @@ -1241,6 +1241,7 @@ nxt_cert_store_get_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) * dereference it through "%FN" on a close-failure log path. */ nxt_fd_close(file.fd); + file.fd = -1; } } diff --git a/src/nxt_mp.c b/src/nxt_mp.c index 2bd8cdee4..2554e5344 100644 --- a/src/nxt_mp.c +++ b/src/nxt_mp.c @@ -305,6 +305,17 @@ nxt_mp_release(nxt_mp_t *mp) } +#if (NXT_TESTS) + +uint32_t +nxt_mp_test_retain_count(nxt_mp_t *mp) +{ + return mp->retain; +} + +#endif + + void nxt_mp_destroy(nxt_mp_t *mp) { diff --git a/src/nxt_mp.h b/src/nxt_mp.h index a5aaabd19..10e228978 100644 --- a/src/nxt_mp.h +++ b/src/nxt_mp.h @@ -52,6 +52,16 @@ NXT_EXPORT void nxt_mp_retain(nxt_mp_t *mp); */ NXT_EXPORT void nxt_mp_release(nxt_mp_t *mp); +#if (NXT_TESTS) +/* + * Returns the current retention count. Used by + * src/test/nxt_port_fail_test.c to verify the "retain after + * successful send" pattern leaves the pool's retain at its + * baseline (1) when the send fails. + */ +NXT_EXPORT uint32_t nxt_mp_test_retain_count(nxt_mp_t *mp); +#endif + /* nxt_mp_test_sizes() tests validity of memory pool parameters. */ NXT_EXPORT nxt_bool_t nxt_mp_test_sizes(size_t cluster_size, size_t page_alignment, size_t page_size, size_t min_chunk_size); diff --git a/src/nxt_port.h b/src/nxt_port.h index df61a407b..61b32e3e8 100644 --- a/src/nxt_port.h +++ b/src/nxt_port.h @@ -349,6 +349,11 @@ nxt_int_t nxt_port_socket_write2(nxt_task_t *task, nxt_port_t *port, nxt_uint_t type, nxt_fd_t fd, nxt_fd_t fd2, uint32_t stream, nxt_port_id_t reply_port, nxt_buf_t *b); +#if (NXT_TESTS) +void nxt_port_test_msg_alloc_failures(nxt_uint_t failures); +void nxt_port_test_run_error_handler(nxt_task_t *task, nxt_port_t *port); +#endif + nxt_inline nxt_int_t nxt_port_socket_write(nxt_task_t *task, nxt_port_t *port, nxt_uint_t type, nxt_fd_t fd, uint32_t stream, nxt_port_id_t reply_port, diff --git a/src/nxt_port_rpc.c b/src/nxt_port_rpc.c index 285909335..87815faae 100644 --- a/src/nxt_port_rpc.c +++ b/src/nxt_port_rpc.c @@ -29,6 +29,39 @@ static void nxt_port_rpc_remove_from_peers(nxt_task_t *task, nxt_port_t *port, nxt_port_rpc_reg_t *reg); +#if (NXT_TESTS) +static nxt_uint_t nxt_port_rpc_test_alloc_failure_count; +static nxt_uint_t nxt_port_rpc_test_insert_failure_count; + + +void +nxt_port_rpc_test_alloc_failures(nxt_uint_t failures) +{ + nxt_port_rpc_test_alloc_failure_count = failures; +} + + +void +nxt_port_rpc_test_insert_failures(nxt_uint_t failures) +{ + nxt_port_rpc_test_insert_failure_count = failures; +} + + +static nxt_bool_t +nxt_port_rpc_test_should_fail(nxt_uint_t *failures) +{ + if (*failures == 0) { + return 0; + } + + (*failures)--; + + return 1; +} + +#endif + nxt_int_t nxt_port_rpc_init(void) @@ -130,6 +163,16 @@ nxt_port_rpc_register_handler_ex(nxt_task_t *task, nxt_port_t *port, stream = nxt_atomic_fetch_add(nxt_stream_ident, 1); +#if (NXT_TESTS) + if (nxt_slow_path(nxt_port_rpc_test_should_fail( + &nxt_port_rpc_test_alloc_failure_count))) + { + nxt_debug(task, "rpc: stream #%uD failed to allocate reg", stream); + + return NULL; + } +#endif + reg = nxt_mp_zalloc(port->mem_pool, sizeof(nxt_port_rpc_reg_t) + ex_size); if (nxt_slow_path(reg == NULL)) { @@ -149,6 +192,18 @@ nxt_port_rpc_register_handler_ex(nxt_task_t *task, nxt_port_t *port, lhq.value = reg; lhq.pool = port->mem_pool; +#if (NXT_TESTS) + if (nxt_slow_path(nxt_port_rpc_test_should_fail( + &nxt_port_rpc_test_insert_failure_count))) + { + nxt_debug(task, "rpc: stream #%uD failed to add reg", stream); + + nxt_mp_free(port->mem_pool, reg); + + return NULL; + } +#endif + switch (nxt_lvlhsh_insert(&port->rpc_streams, &lhq)) { case NXT_OK: diff --git a/src/nxt_port_rpc.h b/src/nxt_port_rpc.h index c07683fb0..8218c4e3a 100644 --- a/src/nxt_port_rpc.h +++ b/src/nxt_port_rpc.h @@ -20,6 +20,11 @@ void *nxt_port_rpc_register_handler_ex(nxt_task_t *task, nxt_port_t *port, nxt_port_rpc_handler_t ready_handler, nxt_port_rpc_handler_t error_handler, size_t ex_size); +#if (NXT_TESTS) +void nxt_port_rpc_test_alloc_failures(nxt_uint_t failures); +void nxt_port_rpc_test_insert_failures(nxt_uint_t failures); +#endif + uint32_t nxt_port_rpc_ex_stream(void *ex); void nxt_port_rpc_ex_set_peer(nxt_task_t *task, nxt_port_t *port, void *ex, nxt_pid_t peer); @@ -32,4 +37,3 @@ void nxt_port_rpc_close(nxt_task_t *task, nxt_port_t *port); #endif /* _NXT_PORT_RPC_H_INCLUDED_ */ - diff --git a/src/nxt_port_socket.c b/src/nxt_port_socket.c index 5752d5ab0..0da4bc54f 100644 --- a/src/nxt_port_socket.c +++ b/src/nxt_port_socket.c @@ -37,6 +37,35 @@ static nxt_buf_t *nxt_port_buf_alloc(nxt_port_t *port); static void nxt_port_buf_free(nxt_port_t *port, nxt_buf_t *b); static void nxt_port_error_handler(nxt_task_t *task, void *obj, void *data); +#if (NXT_TESTS) +static nxt_uint_t nxt_port_test_msg_alloc_failure_count; + + +void +nxt_port_test_msg_alloc_failures(nxt_uint_t failures) +{ + nxt_port_test_msg_alloc_failure_count = failures; +} + + +/* + * Public wrapper that lets src/test/nxt_port_fail_test.c invoke the + * static nxt_port_error_handler() directly with a synthesised port + * and queued message — used to verify that the "queued, then write + * failed" cleanup matches the ordering the cert/script/socket reply + * paths now mirror after the audit fix (close fd first, queue buffer + * completion second). Pass NULL for `data` so use_delta does not + * decrement for the "obj == data" case — the test owns the port + * reference and releases it explicitly. + */ +void +nxt_port_test_run_error_handler(nxt_task_t *task, nxt_port_t *port) +{ + nxt_port_error_handler(task, &port->socket, NULL); +} + +#endif + nxt_int_t nxt_port_socket_init(nxt_task_t *task, nxt_port_t *port, size_t max_size) @@ -336,6 +365,13 @@ nxt_port_msg_alloc(const nxt_port_send_msg_t *m) { nxt_port_send_msg_t *msg; +#if (NXT_TESTS) + if (nxt_slow_path(nxt_port_test_msg_alloc_failure_count != 0)) { + nxt_port_test_msg_alloc_failure_count--; + return NULL; + } +#endif + msg = nxt_malloc(sizeof(nxt_port_send_msg_t)); if (nxt_slow_path(msg == NULL)) { return NULL; diff --git a/src/nxt_script.c b/src/nxt_script.c index 41615d0a6..4df011d35 100644 --- a/src/nxt_script.c +++ b/src/nxt_script.c @@ -606,6 +606,7 @@ nxt_script_store_get_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) * log path. */ nxt_fd_close(file.fd); + file.fd = -1; } } diff --git a/src/test/nxt_port_fail_test.c b/src/test/nxt_port_fail_test.c new file mode 100644 index 000000000..1e1722ba3 --- /dev/null +++ b/src/test/nxt_port_fail_test.c @@ -0,0 +1,659 @@ +/* + * Copyright (C) NGINX, Inc. + */ + +#include +#include +#include +#include +#include "nxt_tests.h" + +#if (NXT_LINUX) +#include +#endif + + +static nxt_port_t *nxt_port_fail_test_port(nxt_task_t *task); +static nxt_int_t nxt_port_fail_test_socket_write(nxt_thread_t *thr); +static nxt_int_t nxt_port_fail_test_rpc_register(nxt_thread_t *thr); +static nxt_int_t nxt_port_fail_test_error_handler(nxt_thread_t *thr); +static nxt_int_t nxt_port_fail_test_mp_baseline(nxt_thread_t *thr); +static nxt_int_t nxt_port_fail_test_sender_pattern(nxt_task_t *task, + nxt_port_t *port, nxt_mp_t *mp); +static void nxt_port_fail_test_mp_completion(nxt_task_t *task, void *obj, + void *data); +static void nxt_port_fail_test_completion(nxt_task_t *task, void *obj, + void *data); +static void nxt_port_fail_test_drain_wq(nxt_work_queue_t *wq); +static nxt_bool_t nxt_port_fail_test_fd_is_open(nxt_fd_t fd); +static nxt_int_t nxt_port_fail_test_fd_count(void); + + +static nxt_uint_t nxt_port_fail_test_completions; + + +nxt_int_t +nxt_port_fail_test(nxt_thread_t *thr) +{ + nxt_thread_time_update(thr); + nxt_log_error(NXT_LOG_NOTICE, thr->log, "port failure test started"); + + if (nxt_port_fail_test_socket_write(thr) != NXT_OK) { + return NXT_ERROR; + } + + if (nxt_port_fail_test_rpc_register(thr) != NXT_OK) { + return NXT_ERROR; + } + + if (nxt_port_fail_test_error_handler(thr) != NXT_OK) { + return NXT_ERROR; + } + + if (nxt_port_fail_test_mp_baseline(thr) != NXT_OK) { + return NXT_ERROR; + } + + nxt_thread_time_update(thr); + nxt_log_error(NXT_LOG_NOTICE, thr->log, "port failure test passed"); + + return NXT_OK; +} + + +static nxt_port_t * +nxt_port_fail_test_port(nxt_task_t *task) +{ + nxt_port_t *port; + + port = nxt_port_new(task, 1, nxt_pid, NXT_PROCESS_MAIN); + + if (nxt_slow_path(port == NULL)) { + return NULL; + } + + port->pair[0] = -1; + port->pair[1] = -1; + port->socket.fd = -1; + + return port; +} + + +static nxt_int_t +nxt_port_fail_test_socket_write(nxt_thread_t *thr) +{ + nxt_mp_t *mp; + nxt_fd_t fd; + nxt_buf_t *buf; + nxt_task_t *task; + nxt_port_t *port; + nxt_int_t before, after_open, after_fail, after_close; + + task = thr->task; + task->thread = thr; + + /* + * Per Gemini's PR #57 review feedback: allocate the test buffer + * from a transient mp rather than the stack — keeps the buf + * lifetime tied to a heap object, so a future change that lets + * the port layer access it asynchronously cannot UAF the stack + * frame. The mp is destroyed below on every exit path. + */ + mp = nxt_mp_create(1024, 128, 256, 32); + if (nxt_slow_path(mp == NULL)) { + return NXT_ERROR; + } + + port = nxt_port_fail_test_port(task); + if (nxt_slow_path(port == NULL)) { + nxt_mp_destroy(mp); + return NXT_ERROR; + } + + fd = -1; + + before = nxt_port_fail_test_fd_count(); + + fd = open("/dev/null", O_RDONLY); + if (fd == -1) { + nxt_log_error(NXT_LOG_NOTICE, thr->log, + "port failure test failed to open /dev/null"); + goto fail; + } + + after_open = nxt_port_fail_test_fd_count(); + + buf = nxt_buf_mem_alloc(mp, 1, 0); + if (nxt_slow_path(buf == NULL)) { + nxt_log_error(NXT_LOG_NOTICE, thr->log, + "port failure test failed to allocate buf"); + goto fail_close_fd; + } + + buf->completion_handler = nxt_port_fail_test_completion; + + nxt_port_fail_test_completions = 0; + nxt_port_test_msg_alloc_failures(1); + + if (nxt_port_socket_write(task, port, NXT_PORT_MSG_DATA + | NXT_PORT_MSG_CLOSE_FD, fd, 1, 0, buf) + != NXT_ERROR) + { + nxt_log_error(NXT_LOG_NOTICE, thr->log, + "port failure test expected socket write failure"); + goto fail_close_fd; + } + + nxt_port_test_msg_alloc_failures(0); + + if (!nxt_port_fail_test_fd_is_open(fd)) { + nxt_log_error(NXT_LOG_NOTICE, thr->log, + "port failure test closed fd before ownership transfer"); + goto fail_close_port; + } + + if (nxt_port_fail_test_completions != 0) { + nxt_log_error(NXT_LOG_NOTICE, thr->log, + "port failure test completed unsent buffer"); + goto fail_close_fd; + } + + after_fail = nxt_port_fail_test_fd_count(); + + nxt_fd_close(fd); + fd = -1; + + after_close = nxt_port_fail_test_fd_count(); + + if (before >= 0 + && (after_open != before + 1 || after_fail != after_open + || after_close != before)) + { + nxt_log_error(NXT_LOG_NOTICE, thr->log, + "port failure test fd count mismatch: %d %d %d %d", + before, after_open, after_fail, after_close); + goto fail_close_port; + } + + nxt_port_use(task, port, -1); + nxt_mp_destroy(mp); + + return NXT_OK; + +fail_close_fd: + + if (fd != -1 && nxt_port_fail_test_fd_is_open(fd)) { + nxt_fd_close(fd); + } + +fail_close_port: + + nxt_port_test_msg_alloc_failures(0); + +fail: + + nxt_port_use(task, port, -1); + nxt_mp_destroy(mp); + + return NXT_ERROR; +} + + +static nxt_int_t +nxt_port_fail_test_rpc_register(nxt_thread_t *thr) +{ + void *ex; + nxt_task_t *task; + nxt_port_t *port; + + task = thr->task; + task->thread = thr; + + if (nxt_port_rpc_init() != NXT_OK) { + return NXT_ERROR; + } + + port = nxt_port_fail_test_port(task); + if (nxt_slow_path(port == NULL)) { + return NXT_ERROR; + } + + port->pair[0] = 0; + + nxt_port_rpc_test_alloc_failures(1); + + ex = nxt_port_rpc_register_handler_ex(task, port, NULL, NULL, 0); + if (ex != NULL) { + nxt_log_error(NXT_LOG_NOTICE, thr->log, + "port failure test expected rpc alloc failure"); + goto fail; + } + + if (port->use_count != 1 || !nxt_lvlhsh_is_empty(&port->rpc_streams)) { + nxt_log_error(NXT_LOG_NOTICE, thr->log, + "port failure test leaked failed rpc alloc"); + goto fail; + } + + nxt_port_rpc_test_insert_failures(1); + + ex = nxt_port_rpc_register_handler_ex(task, port, NULL, NULL, 0); + if (ex != NULL) { + nxt_log_error(NXT_LOG_NOTICE, thr->log, + "port failure test expected rpc insert failure"); + goto fail; + } + + if (port->use_count != 1 || !nxt_lvlhsh_is_empty(&port->rpc_streams)) { + nxt_log_error(NXT_LOG_NOTICE, thr->log, + "port failure test leaked failed rpc insert"); + goto fail; + } + + port->pair[0] = -1; + nxt_port_use(task, port, -1); + + return NXT_OK; + +fail: + + nxt_port_rpc_test_alloc_failures(0); + nxt_port_rpc_test_insert_failures(0); + + port->pair[0] = -1; + nxt_port_use(task, port, -1); + + return NXT_ERROR; +} + + +/* + * Verify the "queued, then write failed" cleanup path inside + * nxt_port_error_handler() — the reference behaviour that the + * cert/script/socket/access-log reply paths now mirror after the + * audit fix (close fd first, queue buffer completion second). A + * synthesised port + send_msg + buf are pushed into port->messages, + * then nxt_port_test_run_error_handler() is invoked and the + * resulting fast_work_queue is drained manually so the completion + * handler runs. Asserts: + * + * - the queued message is removed from port->messages, + * - the close_fd-marked fd is actually closed, + * - the buffer completion runs exactly once. + */ +static nxt_int_t +nxt_port_fail_test_error_handler(nxt_thread_t *thr) +{ + nxt_mp_t *mp; + nxt_fd_t fd; + nxt_buf_t *buf; + nxt_task_t *task; + nxt_port_t *port; + nxt_event_engine_t engine; + nxt_port_send_msg_t *msg; + + task = thr->task; + task->thread = thr; + + /* + * The minimal engine the test injects so nxt_port_error_handler + * can deref task->thread->engine->fast_work_queue. Only the + * fast_work_queue + its cache need to be initialised. + * + * Invariant: the error path exercised here must touch nothing on + * the engine beyond fast_work_queue (and the port-owned + * write_mutex). Every other field is left zeroed, so any future + * code that dereferences another engine member would read garbage + * here -- extend this initialisation (or use a real engine) before + * relying on it. + */ + nxt_memzero(&engine, sizeof(engine)); + nxt_work_queue_cache_create(&engine.work_queue_cache, 1024); + engine.fast_work_queue.cache = &engine.work_queue_cache; + nxt_work_queue_name(&engine.fast_work_queue, "fast"); + + thr->engine = &engine; + + /* + * Test mp owns the buf so that the buffer outlives the test's + * stack frame even though the completion handler runs + * asynchronously via the work queue (see Gemini PR #57 review). + * Destroyed on every exit path below. + */ + mp = nxt_mp_create(1024, 128, 256, 32); + if (nxt_slow_path(mp == NULL)) { + goto fail_engine; + } + + port = nxt_port_fail_test_port(task); + if (nxt_slow_path(port == NULL)) { + goto fail_mp; + } + + /* + * Bump use_count so the per-msg use_delta-- inside + * nxt_port_error_handler() does not drive the port to zero + * before we have inspected its state. Released explicitly + * below. + */ + port->use_count = 2; + + fd = -1; + msg = NULL; + + fd = open("/dev/null", O_RDONLY); + if (fd == -1) { + nxt_log_error(NXT_LOG_NOTICE, thr->log, + "port failure test failed to open /dev/null"); + goto fail_port; + } + + buf = nxt_buf_mem_alloc(mp, 1, 0); + if (nxt_slow_path(buf == NULL)) { + nxt_log_error(NXT_LOG_NOTICE, thr->log, + "port failure test failed to allocate buf"); + goto fail_port; + } + + buf->completion_handler = nxt_port_fail_test_completion; + + /* + * nxt_port_release_send_msg() free()s msg only when ->allocated + * is set, so we mirror what nxt_port_msg_alloc() would do for a + * heap-allocated message: nxt_malloc + ->allocated = 1. + */ + msg = nxt_malloc(sizeof(nxt_port_send_msg_t)); + if (msg == NULL) { + goto fail_port; + } + + nxt_memzero(msg, sizeof(*msg)); + msg->allocated = 1; + msg->close_fd = 1; + msg->fd[0] = fd; + msg->fd[1] = -1; + msg->buf = buf; + + nxt_queue_insert_tail(&port->messages, &msg->link); + + nxt_port_fail_test_completions = 0; + + nxt_port_test_run_error_handler(task, port); + + /* + * msg was freed inside nxt_port_release_send_msg(); take the + * ownership pointer off so the failure label does not free it + * again. + */ + msg = NULL; + + if (!nxt_queue_is_empty(&port->messages)) { + nxt_log_error(NXT_LOG_NOTICE, thr->log, + "port failure test left a queued message after " + "error_handler"); + goto fail_port; + } + + if (nxt_port_fail_test_fd_is_open(fd)) { + nxt_log_error(NXT_LOG_NOTICE, thr->log, + "port failure test did not close the queued fd"); + goto fail_port; + } + + /* fd is closed; clear so the failure label does not re-close. */ + fd = -1; + + /* Buffer completion was enqueued, not invoked synchronously. */ + if (nxt_port_fail_test_completions != 0) { + nxt_log_error(NXT_LOG_NOTICE, thr->log, + "port failure test invoked completion synchronously"); + goto fail_port; + } + + nxt_port_fail_test_drain_wq(&engine.fast_work_queue); + + if (nxt_port_fail_test_completions != 1) { + nxt_log_error(NXT_LOG_NOTICE, thr->log, + "port failure test completion count: %d (want 1)", + (int) nxt_port_fail_test_completions); + goto fail_port; + } + + nxt_port_use(task, port, -1); + nxt_mp_destroy(mp); + + nxt_work_queue_cache_destroy(&engine.work_queue_cache); + thr->engine = NULL; + + return NXT_OK; + +fail_port: + + if (msg != NULL) { + nxt_queue_remove(&msg->link); + nxt_free(msg); + } + if (fd != -1 && nxt_port_fail_test_fd_is_open(fd)) { + nxt_fd_close(fd); + } + nxt_port_use(task, port, -1); + +fail_mp: + + nxt_mp_destroy(mp); + +fail_engine: + + nxt_work_queue_cache_destroy(&engine.work_queue_cache); + thr->engine = NULL; + + return NXT_ERROR; +} + + +/* + * Verify the mp-refcount invariant from phpclub's #56 review: when + * nxt_port_socket_write() fails before the buffer is handed off to + * the port machinery, the temp config mp's retain count stays at + * its baseline (1). This regression-checks the audit fix in + * src/nxt_cert.c / src/nxt_script.c, which moved `nxt_mp_retain(mp)` + * to AFTER the successful socket_write. + * + * The helper nxt_port_fail_test_sender_pattern() is a deliberate + * copy of the fixed sender shape: allocate the buf in `mp`, send, + * retain only on success. If a future change moves the retain back + * above socket_write (the bug), an injected msg_alloc failure + * leaves retain at 2 and this test fails. + */ +static nxt_int_t +nxt_port_fail_test_mp_baseline(nxt_thread_t *thr) +{ + nxt_mp_t *mp; + nxt_int_t res; + nxt_task_t *task; + nxt_port_t *port; + uint32_t retain_before, retain_after; + + task = thr->task; + task->thread = thr; + + mp = nxt_mp_create(1024, 128, 256, 32); + if (nxt_slow_path(mp == NULL)) { + return NXT_ERROR; + } + + retain_before = nxt_mp_test_retain_count(mp); + + port = nxt_port_fail_test_port(task); + if (nxt_slow_path(port == NULL)) { + nxt_mp_destroy(mp); + return NXT_ERROR; + } + + nxt_port_fail_test_completions = 0; + nxt_port_test_msg_alloc_failures(1); + + res = nxt_port_fail_test_sender_pattern(task, port, mp); + + nxt_port_test_msg_alloc_failures(0); + + if (res != NXT_ERROR) { + nxt_log_error(NXT_LOG_NOTICE, thr->log, + "mp baseline test: sender expected failure return"); + goto fail; + } + + retain_after = nxt_mp_test_retain_count(mp); + + if (retain_after != retain_before) { + nxt_log_error(NXT_LOG_NOTICE, thr->log, + "mp baseline test: retain before %uD, after %uD " + "(retain leaked across failed send)", + retain_before, retain_after); + goto fail; + } + + /* + * Buf-completion handler must not have run: the buf never + * entered the port queue. + */ + if (nxt_port_fail_test_completions != 0) { + nxt_log_error(NXT_LOG_NOTICE, thr->log, + "mp baseline test: completion ran for unsent buf"); + goto fail; + } + + nxt_port_use(task, port, -1); + nxt_mp_destroy(mp); + + return NXT_OK; + +fail: + + nxt_port_use(task, port, -1); + nxt_mp_destroy(mp); + + return NXT_ERROR; +} + + +/* + * Mirror of the post-audit nxt_cert_store_get sender shape: + * - allocate buf in the caller-supplied temp_conf mp; + * - attempt socket_write; + * - retain mp ONLY after the write succeeded. + * + * The completion handler released by the buf — when it eventually + * runs — would call nxt_mp_release(mp); we do not exercise that + * branch here since the test forces a pre-queue failure. See + * nxt_port_fail_test_mp_baseline(). + */ +static nxt_int_t +nxt_port_fail_test_sender_pattern(nxt_task_t *task, nxt_port_t *port, + nxt_mp_t *mp) +{ + nxt_buf_t *b; + nxt_int_t res; + + b = nxt_buf_mem_alloc(mp, 16, 0); + if (b == NULL) { + return NXT_ERROR; + } + + b->completion_handler = nxt_port_fail_test_mp_completion; + b->parent = mp; + + res = nxt_port_socket_write(task, port, NXT_PORT_MSG_DATA, -1, 1, 0, b); + if (res != NXT_OK) { + return NXT_ERROR; + } + + /* + * Retain only after the buffer has been handed off to the port + * machinery — matches the fix in nxt_cert_store_get / + * nxt_script_store_get. + */ + nxt_mp_retain(mp); + + return NXT_OK; +} + + +static void +nxt_port_fail_test_mp_completion(nxt_task_t *task, void *obj, void *data) +{ + nxt_mp_t *mp = data; + + nxt_port_fail_test_completions++; + + if (mp != NULL) { + nxt_mp_release(mp); + } +} + + +static void +nxt_port_fail_test_completion(nxt_task_t *task, void *obj, void *data) +{ + nxt_port_fail_test_completions++; +} + + +static void +nxt_port_fail_test_drain_wq(nxt_work_queue_t *wq) +{ + void *obj, *data; + nxt_task_t *t; + nxt_work_handler_t handler; + + while (wq->head != NULL) { + handler = nxt_work_queue_pop(wq, &t, &obj, &data); + handler(t, obj, data); + } +} + + +static nxt_bool_t +nxt_port_fail_test_fd_is_open(nxt_fd_t fd) +{ + return fcntl(fd, F_GETFD) != -1; +} + + +static nxt_int_t +nxt_port_fail_test_fd_count(void) +{ +#if (NXT_LINUX) + nxt_int_t count; + DIR *dir; + struct dirent *de; + + dir = opendir("/proc/self/fd"); + if (dir == NULL) { + return -1; + } + + count = 0; + + for ( ;; ) { + de = readdir(dir); + + if (de == NULL) { + break; + } + + if (nxt_strcmp(de->d_name, ".") != 0 + && nxt_strcmp(de->d_name, "..") != 0) + { + count++; + } + } + + (void) closedir(dir); + + return count; +#else + return -1; +#endif +} diff --git a/src/test/nxt_tests.c b/src/test/nxt_tests.c index 03a2a1df4..636363ad1 100644 --- a/src/test/nxt_tests.c +++ b/src/test/nxt_tests.c @@ -166,6 +166,10 @@ main(int argc, char **argv) return 1; } + if (nxt_port_fail_test(thr) != NXT_OK) { + return 1; + } + #if (NXT_HAVE_CLONE_NEWUSER) if (nxt_clone_creds_test(thr) != NXT_OK) { return 1; diff --git a/src/test/nxt_tests.h b/src/test/nxt_tests.h index 463dc8519..55b46357f 100644 --- a/src/test/nxt_tests.h +++ b/src/test/nxt_tests.h @@ -65,6 +65,7 @@ nxt_int_t nxt_utf8_test(nxt_thread_t *thr); nxt_int_t nxt_http_parse_test(nxt_thread_t *thr); nxt_int_t nxt_strverscmp_test(nxt_thread_t *thr); nxt_int_t nxt_base64_test(nxt_thread_t *thr); +nxt_int_t nxt_port_fail_test(nxt_thread_t *thr); nxt_int_t nxt_clone_creds_test(nxt_thread_t *thr);