From 43df11f671d0a5657be09392b2ced5bdd07ead10 Mon Sep 17 00:00:00 2001 From: Sebastian Ullrich Date: Tue, 26 May 2026 16:11:58 +0000 Subject: [PATCH 1/5] feat: experimental cross-process jobserver via POSIX semaphore MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This PR adds an opt-in cross-process parallelism limit to the Lean runtime. When `LEAN_JOB_SEMAPHORE=/name` points at a POSIX named semaphore, `task_manager`'s standard workers acquire a token before running a task and release it after, so the total number of concurrently running standard workers across all participating processes is bounded by the semaphore's initial value. When the env var is unset, behavior is unchanged and there is no overhead. `Task.get` releases its token while blocked and reacquires before resuming, so a worker waiting on a sub-task cannot starve the global pool. Dedicated workers (priority above `LEAN_MAX_PRIO`) and `LEAN_SYNC_PRIO` tasks bypass the worker loop and so do not consume tokens. This is intended for experimentation, not production: Linux and macOS only (Windows is a no-op), no `MAKEFLAGS` parsing, no crash-recovery for tokens leaked by killed processes, and no Lake integration — callers must create and destroy the semaphore themselves. --- src/runtime/object.cpp | 75 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 75 insertions(+) diff --git a/src/runtime/object.cpp b/src/runtime/object.cpp index 426357b607f2..b8233be7f1f1 100644 --- a/src/runtime/object.cpp +++ b/src/runtime/object.cpp @@ -28,6 +28,17 @@ Author: Leonardo de Moura #define LEAN_SUPPORTS_BACKTRACE 0 #endif +// POSIX named semaphores for the experimental cross-process jobserver +// (env var `LEAN_JOB_SEMAPHORE=/name`). Linux + macOS only. 
+#if !defined(_WIN32) && !defined(LEAN_EMSCRIPTEN) + #define LEAN_JOBSERVER_POSIX 1 + #include + #include + #include +#else + #define LEAN_JOBSERVER_POSIX 0 +#endif + #if LEAN_SUPPORTS_BACKTRACE #include #include @@ -719,6 +730,35 @@ class task_manager { condition_variable m_task_finished_cv; condition_variable m_dedicated_finished_cv; bool m_shutting_down{false}; +#if LEAN_JOBSERVER_POSIX + // Optional cross-process token pool. When non-null, standard workers must + // acquire a token before running a task and release it after, so that the + // total number of concurrently running standard workers across all + // processes sharing the semaphore does not exceed the initial count. + sem_t * m_jobserver_sem{nullptr}; +#endif + + // Block until a jobserver token is available; no-op when no semaphore is + // configured. Must be called with `lock` held; temporarily releases it. + void acquire_token(unique_lock & lock) { +#if LEAN_JOBSERVER_POSIX + if (!m_jobserver_sem) return; + lock.unlock(); + while (sem_wait(m_jobserver_sem) != 0 && errno == EINTR) {} + lock.lock(); +#else + (void)lock; +#endif + } + + // Release a previously acquired token; no-op when no semaphore is + // configured. Safe to call without holding `m_mutex`. + void release_token() { +#if LEAN_JOBSERVER_POSIX + if (!m_jobserver_sem) return; + sem_post(m_jobserver_sem); +#endif + } lean_task_object * dequeue() { lean_assert(m_queues_size != 0); @@ -808,9 +848,23 @@ class task_manager { continue; } + // Acquire a jobserver token (drops the mutex while blocked). + // After waking we must re-check conditions, because the queue + // may have been drained or shutdown may have been requested. 
+ acquire_token(lock); +#if LEAN_JOBSERVER_POSIX + if (m_jobserver_sem && + (m_queues_size == 0 || m_shutting_down || + m_std_workers.size() - m_idle_std_workers >= m_max_std_workers)) { + release_token(); + continue; + } +#endif + lean_task_object * t = dequeue(); m_idle_std_workers--; run_task(lock, t); + release_token(); m_idle_std_workers++; reset_heartbeat(); } @@ -914,6 +968,14 @@ class task_manager { public: task_manager(unsigned max_std_workers): m_max_std_workers(max_std_workers) { +#if LEAN_JOBSERVER_POSIX + if (char const * name = std::getenv("LEAN_JOB_SEMAPHORE")) { + sem_t * s = sem_open(name, 0); + if (s != SEM_FAILED) { + m_jobserver_sem = s; + } + } +#endif } ~task_manager() { @@ -931,6 +993,12 @@ class task_manager { unique_lock lock(m_mutex); m_dedicated_finished_cv.wait(lock, [&]() { return m_num_dedicated_workers == 0; }); // never seems to terminate under Emscripten +#endif +#if LEAN_JOBSERVER_POSIX + if (m_jobserver_sem) { + sem_close(m_jobserver_sem); + m_jobserver_sem = nullptr; + } #endif } @@ -986,10 +1054,17 @@ class task_manager { spawn_worker(); else m_queue_cv.notify_one(); + // Give back our jobserver token so another worker (in this or + // another process) can run while we are blocked. We reacquire it + // before resuming work below. + lock.unlock(); + release_token(); + lock.lock(); } m_task_finished_cv.wait(lock, [&]() { return t->m_value != nullptr; }); if (in_pool) { m_max_std_workers--; + acquire_token(lock); } } From fd71e6cfc54c88271e5f7621a802aeb6d9843fe9 Mon Sep 17 00:00:00 2001 From: Sebastian Ullrich Date: Tue, 26 May 2026 17:21:31 +0000 Subject: [PATCH 2/5] feat: auto-create jobserver semaphore for child processes This PR makes the experimental jobserver self-bootstrapping: when no `LEAN_JOB_SEMAPHORE` is set in the environment, `task_manager` now creates a fresh named semaphore (`/lean-jobs-<pid>`) sized to `max_std_workers`, exports the name via `LEAN_JOB_SEMAPHORE` so child processes inherit it, and `sem_unlink`s on exit. 
`LEAN_JOB_SEMAPHORE_AUTO=N` overrides the size. The creating process does not gate its own workers against the semaphore. The creator is typically an orchestrator (e.g. `lake`) whose workers block on subprocesses; gating it would consume tokens that its child `lean` processes need, deadlocking the pool. Together with the previous patch this means `lake build` participates in cross-process parallelism limiting with no command-line changes. --- src/runtime/object.cpp | 42 +++++++++++++++++++++++++++++++++++++++++- 1 file changed, 41 insertions(+), 1 deletion(-) diff --git a/src/runtime/object.cpp b/src/runtime/object.cpp index b8233be7f1f1..ec6bbd6ec990 100644 --- a/src/runtime/object.cpp +++ b/src/runtime/object.cpp @@ -29,16 +29,32 @@ Author: Leonardo de Moura #endif // POSIX named semaphores for the experimental cross-process jobserver -// (env var `LEAN_JOB_SEMAPHORE=/name`). Linux + macOS only. +// (env var `LEAN_JOB_SEMAPHORE=/name`, or `LEAN_JOB_SEMAPHORE_AUTO=N` to +// auto-create one and propagate it to children). Linux + macOS only. #if !defined(_WIN32) && !defined(LEAN_EMSCRIPTEN) #define LEAN_JOBSERVER_POSIX 1 #include #include #include + #include + #include #else #define LEAN_JOBSERVER_POSIX 0 #endif +#if LEAN_JOBSERVER_POSIX +// Name of a semaphore created by this process; unlinked on exit. +static char * g_owned_sem_name = nullptr; + +static void unlink_owned_sem() { + if (g_owned_sem_name) { + sem_unlink(g_owned_sem_name); + free(g_owned_sem_name); + g_owned_sem_name = nullptr; + } +} +#endif + #if LEAN_SUPPORTS_BACKTRACE #include #include @@ -970,10 +986,34 @@ class task_manager { m_max_std_workers(max_std_workers) { #if LEAN_JOBSERVER_POSIX if (char const * name = std::getenv("LEAN_JOB_SEMAPHORE")) { + // Attach as a participant in an existing jobserver. 
sem_t * s = sem_open(name, 0); if (s != SEM_FAILED) { m_jobserver_sem = s; } + } else { + // No jobserver set up yet; create one with `max_std_workers` + // slots (overridable via `LEAN_JOB_SEMAPHORE_AUTO=N`) and hand it + // to children via env. Do NOT gate this process: the creator is + // typically an orchestrator (e.g. `lake`) whose workers block on + // subprocesses, and gating it would deadlock the pool. + int count = (int)max_std_workers; + if (char const * auto_n = std::getenv("LEAN_JOB_SEMAPHORE_AUTO")) { + count = std::atoi(auto_n); + } + if (count > 0) { + char buf[64]; + std::snprintf(buf, sizeof buf, "/lean-jobs-%d", (int)getpid()); + sem_unlink(buf); + sem_t * s = sem_open(buf, O_CREAT | O_EXCL, 0600, (unsigned)count); + if (s != SEM_FAILED) { + sem_close(s); // the named semaphore persists until unlink + setenv("LEAN_JOB_SEMAPHORE", buf, 1); + unsetenv("LEAN_JOB_SEMAPHORE_AUTO"); + g_owned_sem_name = strdup(buf); + std::atexit(unlink_owned_sem); + } + } } #endif } From 046409efc8813ad14830829081587623d090b95c Mon Sep 17 00:00:00 2001 From: Sebastian Ullrich Date: Tue, 26 May 2026 20:21:09 +0000 Subject: [PATCH 3/5] feat: route in-process jobserver releases to blocked `wait_for` waiters This PR avoids a thread-count cascade that the previous prototype provoked under heavy parallel elaboration. The earlier design released and re-acquired tokens through the global semaphore at every `Task.get` boundary; the blocking `sem_wait` on the re-acquire side let further `Task.get` calls inflate `m_max_std_workers` and spawn additional workers, multiplying the live OS thread count and tripping "failed to create thread" under `RLIMIT_NPROC`. In the new design, when a worker calls `Task.get`, it `sem_post`s its token globally so a sibling can pick up the blocked sub-task, then waits. 
Sibling `release_token` calls in the same process check whether a `Task.get` is actively waiting (registered on `m_parked_cv` after its `m_task_finished_cv.wait` returns) and, if so, hand the freed token directly to that waiter via `m_parked_cv` instead of `sem_post`. The waiter wakes without a blocking `sem_wait`, so the cascade cannot form. Excess releases (`m_parked_tokens >= m_parked_waiters`) still flow back to the global semaphore, so tokens aren't hoarded. Counting waiters only *after* `m_task_finished_cv.wait` is essential: counting them before would route releases to a pool nobody is listening on, starving the global semaphore and deadlocking workers that are blocked in `sem_wait`. --- src/runtime/object.cpp | 56 +++++++++++++++++++++++++++++++++--------- 1 file changed, 44 insertions(+), 12 deletions(-) diff --git a/src/runtime/object.cpp b/src/runtime/object.cpp index ec6bbd6ec990..6a114cf86765 100644 --- a/src/runtime/object.cpp +++ b/src/runtime/object.cpp @@ -752,10 +752,20 @@ class task_manager { // total number of concurrently running standard workers across all // processes sharing the semaphore does not exceed the initial count. sem_t * m_jobserver_sem{nullptr}; + // In-process direct handoff for `wait_for` reclaim. When a `wait_for` + // releases its token globally before blocking, it bumps `m_parked_waiters`; + // subsequent `release_token` calls from sibling workers route the freed + // token here (`m_parked_tokens++`) instead of `sem_post`, so the + // `wait_for` wakes via `m_parked_cv` without a blocking `sem_wait`. + // Excess (parked >= waiters) flows back to the global semaphore so the + // global pool isn't hoarded. + unsigned m_parked_tokens{0}; + unsigned m_parked_waiters{0}; + condition_variable m_parked_cv; #endif - // Block until a jobserver token is available; no-op when no semaphore is - // configured. Must be called with `lock` held; temporarily releases it. + // Acquire a token before running a task. 
Blocks on the global semaphore. + // Must be called with `lock` held; temporarily releases it. void acquire_token(unique_lock & lock) { #if LEAN_JOBSERVER_POSIX if (!m_jobserver_sem) return; @@ -767,12 +777,19 @@ class task_manager { #endif } - // Release a previously acquired token; no-op when no semaphore is - // configured. Safe to call without holding `m_mutex`. + // Release the token currently held by this worker. If a `wait_for` in + // this process is currently waiting for its token back, hand it directly + // via the parked-pool cv; otherwise return it to the global semaphore. + // Must be called with `lock` held. void release_token() { #if LEAN_JOBSERVER_POSIX if (!m_jobserver_sem) return; - sem_post(m_jobserver_sem); + if (m_parked_tokens < m_parked_waiters) { + m_parked_tokens++; + m_parked_cv.notify_one(); + } else { + sem_post(m_jobserver_sem); + } #endif } @@ -1094,17 +1111,32 @@ class task_manager { spawn_worker(); else m_queue_cv.notify_one(); - // Give back our jobserver token so another worker (in this or - // another process) can run while we are blocked. We reacquire it - // before resuming work below. - lock.unlock(); - release_token(); - lock.lock(); +#if LEAN_JOBSERVER_POSIX + // Release our token globally so a sibling worker (in this or + // another process) can pick up the sub-task. + if (m_jobserver_sem) { + sem_post(m_jobserver_sem); + } +#endif } m_task_finished_cv.wait(lock, [&]() { return t->m_value != nullptr; }); if (in_pool) { m_max_std_workers--; - acquire_token(lock); +#if LEAN_JOBSERVER_POSIX + if (m_jobserver_sem) { + // Now register as a parked waiter; subsequent in-process + // `release_token` calls route freed tokens here instead of + // `sem_post`, so we can wake without a blocking `sem_wait`. 
+ // Counting waiters only after the `m_task_finished_cv` wait + // is critical: if we incremented before, releases would + // route to a pool we're not yet listening on, starving the + // global semaphore and deadlocking workers in `sem_wait`. + m_parked_waiters++; + while (m_parked_tokens == 0) m_parked_cv.wait(lock); + m_parked_tokens--; + m_parked_waiters--; + } +#endif } } From f3c3b987910134ffae7c38f9c85ea96be8d67e52 Mon Sep 17 00:00:00 2001 From: Sebastian Ullrich Date: Wed, 27 May 2026 07:08:57 +0000 Subject: [PATCH 4/5] fix: recover from `release_token`/`wait_for` reclaim race in jobserver MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This PR fixes a deadlock observed during a stage2 build of Lean and at sem=1 in nested `Task.get` smoke tests. The previous patch routed a freed token to the parked pool only when `m_parked_waiters > 0`, and counted the waiter only after `m_task_finished_cv.wait` returned. But the worker that resolves the sub-task holds the lock continuously through `resolve_core` and `release_token`, so the waiter cannot increment `m_parked_waiters` in between — the release always sees `waiters == 0` and `sem_post`s globally instead. The waiter then woke up, found `m_parked_tokens == 0`, and blocked on `m_parked_cv` forever because no further `release_token` was coming. On wake-up, the waiter now tries the parked pool first (in case another in-process release happened to route there), then attempts a non-blocking `sem_trywait` to recover a token the racing release sent to the global semaphore. Only when both fail does it register as a parked waiter and block on `m_parked_cv`. This handles the race without widening the lock scope or changing the waiter-counting policy. 
--- src/runtime/object.cpp | 28 +++++++++++++++++----------- 1 file changed, 17 insertions(+), 11 deletions(-) diff --git a/src/runtime/object.cpp b/src/runtime/object.cpp index 6a114cf86765..bf7d113646d7 100644 --- a/src/runtime/object.cpp +++ b/src/runtime/object.cpp @@ -1124,17 +1124,23 @@ class task_manager { m_max_std_workers--; #if LEAN_JOBSERVER_POSIX if (m_jobserver_sem) { - // Now register as a parked waiter; subsequent in-process - // `release_token` calls route freed tokens here instead of - // `sem_post`, so we can wake without a blocking `sem_wait`. - // Counting waiters only after the `m_task_finished_cv` wait - // is critical: if we incremented before, releases would - // route to a pool we're not yet listening on, starving the - // global semaphore and deadlocking workers in `sem_wait`. - m_parked_waiters++; - while (m_parked_tokens == 0) m_parked_cv.wait(lock); - m_parked_tokens--; - m_parked_waiters--; + // The worker that resolved our sub-task may have already + // run its `release_token` before we got here (the resolver + // holds the lock continuously through `resolve_core` and + // `release_token`, so we can't be scheduled in between). + // If that release saw `m_parked_waiters == 0`, the token + // went to the global semaphore — recover it with a + // non-blocking `sem_trywait` first. Only if neither pool + // has a token waiting do we register as a parked waiter + // and block on `m_parked_cv`. 
+ if (m_parked_tokens > 0) { + m_parked_tokens--; + } else if (sem_trywait(m_jobserver_sem) != 0) { + m_parked_waiters++; + while (m_parked_tokens == 0) m_parked_cv.wait(lock); + m_parked_tokens--; + m_parked_waiters--; + } } #endif } From fefdfc26352c8859bb5207d3851a1426a3adc37e Mon Sep 17 00:00:00 2001 From: Sebastian Ullrich Date: Wed, 27 May 2026 08:39:07 +0000 Subject: [PATCH 5/5] fix: avoid blocking `sem_wait` in `wait_for` reclaim, accept brief oversubscription This PR replaces the parked-pool + `sem_trywait`-fallback design with a simpler approach: when `wait_for` cannot reclaim a token non-blockingly, the worker continues running its task un-gated rather than blocking in `sem_wait`. A thread-local `g_holds_token` flag tracks whether the current worker actually has a token; the worker-loop's `release_token` skips its `sem_post` when the flag is false, keeping per-worker token accounting balanced. The previous design either deadlocked (when the in-process parked-pool notification couldn't reach a token that had been taken by another process) or risked re-introducing the original thread-explosion cascade (when the `sem_trywait` fallback hit blocking `sem_wait` under contention). The new design avoids both: no blocking call in `wait_for`'s reclaim, so the cascade can't form; and no in-process-only wakeup, so cross-process token freeing isn't missed. The cost is brief inter-process oversubscription: while a worker runs un-gated, the global cap is exceeded by one. This is bounded per worker by the depth of nested `Task.get` and clears as soon as the worker finishes its current task. 
--- src/runtime/object.cpp | 70 ++++++++++++++++++++---------------------- 1 file changed, 33 insertions(+), 37 deletions(-) diff --git a/src/runtime/object.cpp b/src/runtime/object.cpp index bf7d113646d7..1500c3b64884 100644 --- a/src/runtime/object.cpp +++ b/src/runtime/object.cpp @@ -53,6 +53,14 @@ static void unlink_owned_sem() { g_owned_sem_name = nullptr; } } + +// Per-worker-thread flag: does this thread currently hold a jobserver token? +// Set by `acquire_token` / `wait_for`'s reclaim, cleared by `release_token` +// or by `wait_for`'s release. When `wait_for` cannot reclaim a token +// non-blockingly, the worker continues running its task un-gated and this +// flag stays false; the worker-loop's `release_token` then skips its +// `sem_post`, keeping per-worker token accounting balanced. +static thread_local bool g_holds_token = false; #endif #if LEAN_SUPPORTS_BACKTRACE @@ -752,16 +760,6 @@ class task_manager { // total number of concurrently running standard workers across all // processes sharing the semaphore does not exceed the initial count. sem_t * m_jobserver_sem{nullptr}; - // In-process direct handoff for `wait_for` reclaim. When a `wait_for` - // releases its token globally before blocking, it bumps `m_parked_waiters`; - // subsequent `release_token` calls from sibling workers route the freed - // token here (`m_parked_tokens++`) instead of `sem_post`, so the - // `wait_for` wakes via `m_parked_cv` without a blocking `sem_wait`. - // Excess (parked >= waiters) flows back to the global semaphore so the - // global pool isn't hoarded. - unsigned m_parked_tokens{0}; - unsigned m_parked_waiters{0}; - condition_variable m_parked_cv; #endif // Acquire a token before running a task. Blocks on the global semaphore. @@ -772,23 +770,22 @@ class task_manager { lock.unlock(); while (sem_wait(m_jobserver_sem) != 0 && errno == EINTR) {} lock.lock(); + g_holds_token = true; #else (void)lock; #endif } - // Release the token currently held by this worker. 
If a `wait_for` in - // this process is currently waiting for its token back, hand it directly - // via the parked-pool cv; otherwise return it to the global semaphore. - // Must be called with `lock` held. + // Release the token currently held by this worker to the global semaphore, + // if any. A `wait_for` that couldn't reclaim its token non-blockingly + // continues un-gated and leaves `g_holds_token == false`, in which case + // we skip the `sem_post` to keep accounting balanced. void release_token() { #if LEAN_JOBSERVER_POSIX if (!m_jobserver_sem) return; - if (m_parked_tokens < m_parked_waiters) { - m_parked_tokens++; - m_parked_cv.notify_one(); - } else { + if (g_holds_token) { sem_post(m_jobserver_sem); + g_holds_token = false; } #endif } @@ -1113,9 +1110,13 @@ class task_manager { m_queue_cv.notify_one(); #if LEAN_JOBSERVER_POSIX // Release our token globally so a sibling worker (in this or - // another process) can pick up the sub-task. - if (m_jobserver_sem) { + // another process) can pick up the sub-task while we are + // blocked. If we don't currently hold a token (e.g. a previous + // un-reclaimed `wait_for` left us running un-gated), skip: + // there's no token to release. + if (m_jobserver_sem && g_holds_token) { sem_post(m_jobserver_sem); + g_holds_token = false; } #endif } @@ -1123,23 +1124,18 @@ class task_manager { if (in_pool) { m_max_std_workers--; #if LEAN_JOBSERVER_POSIX - if (m_jobserver_sem) { - // The worker that resolved our sub-task may have already - // run its `release_token` before we got here (the resolver - // holds the lock continuously through `resolve_core` and - // `release_token`, so we can't be scheduled in between). - // If that release saw `m_parked_waiters == 0`, the token - // went to the global semaphore — recover it with a - // non-blocking `sem_trywait` first. Only if neither pool - // has a token waiting do we register as a parked waiter - // and block on `m_parked_cv`. 
- if (m_parked_tokens > 0) { - m_parked_tokens--; - } else if (sem_trywait(m_jobserver_sem) != 0) { - m_parked_waiters++; - while (m_parked_tokens == 0) m_parked_cv.wait(lock); - m_parked_tokens--; - m_parked_waiters--; + // Try to reclaim a token non-blockingly. If one isn't + // immediately available, continue running un-gated rather than + // blocking in `sem_wait` — that's what previously let new + // `wait_for` calls spawn more workers that piled up in the same + // blocking call. The worker-loop's `release_token` will see + // `g_holds_token == false` and skip its `sem_post`, so the + // initial `sem_post`/`sem_wait` pair stays balanced. The cost is + // brief inter-process oversubscription, bounded by the depth of + // nested `Task.get`. + if (m_jobserver_sem && !g_holds_token) { + if (sem_trywait(m_jobserver_sem) == 0) { + g_holds_token = true; } } #endif