diff --git a/src/runtime/object.cpp b/src/runtime/object.cpp index 426357b607f2..1500c3b64884 100644 --- a/src/runtime/object.cpp +++ b/src/runtime/object.cpp @@ -28,6 +28,41 @@ Author: Leonardo de Moura #define LEAN_SUPPORTS_BACKTRACE 0 #endif +// POSIX named semaphores for the experimental cross-process jobserver +// (env var `LEAN_JOB_SEMAPHORE=/name`, or `LEAN_JOB_SEMAPHORE_AUTO=N` to +// auto-create one and propagate it to children). Linux + macOS only. +#if !defined(_WIN32) && !defined(LEAN_EMSCRIPTEN) + #define LEAN_JOBSERVER_POSIX 1 + #include + #include + #include + #include + #include +#else + #define LEAN_JOBSERVER_POSIX 0 +#endif + +#if LEAN_JOBSERVER_POSIX +// Name of a semaphore created by this process; unlinked on exit. +static char * g_owned_sem_name = nullptr; + +static void unlink_owned_sem() { + if (g_owned_sem_name) { + sem_unlink(g_owned_sem_name); + free(g_owned_sem_name); + g_owned_sem_name = nullptr; + } +} + +// Per-worker-thread flag: does this thread currently hold a jobserver token? +// Set by `acquire_token` / `wait_for`'s reclaim, cleared by `release_token` +// or by `wait_for`'s release. When `wait_for` cannot reclaim a token +// non-blockingly, the worker continues running its task un-gated and this +// flag stays false; the worker-loop's `release_token` then skips its +// `sem_post`, keeping per-worker token accounting balanced. +static thread_local bool g_holds_token = false; +#endif + #if LEAN_SUPPORTS_BACKTRACE #include #include @@ -719,6 +754,41 @@ class task_manager { condition_variable m_task_finished_cv; condition_variable m_dedicated_finished_cv; bool m_shutting_down{false}; +#if LEAN_JOBSERVER_POSIX + // Optional cross-process token pool. When non-null, standard workers must + // acquire a token before running a task and release it after, so that the + // total number of concurrently running standard workers across all + // processes sharing the semaphore does not exceed the initial count. + sem_t * m_jobserver_sem{nullptr}; +#endif + + // Acquire a token before running a task. Blocks on the global semaphore. + // Must be called with `lock` held; temporarily releases it. + void acquire_token(unique_lock & lock) { +#if LEAN_JOBSERVER_POSIX + if (!m_jobserver_sem) return; + lock.unlock(); + while (sem_wait(m_jobserver_sem) != 0 && errno == EINTR) {} + lock.lock(); + g_holds_token = true; +#else + (void)lock; +#endif + } + + // Release the token currently held by this worker to the global semaphore, + // if any. A `wait_for` that couldn't reclaim its token non-blockingly + // continues un-gated and leaves `g_holds_token == false`, in which case + // we skip the `sem_post` to keep accounting balanced. + void release_token() { +#if LEAN_JOBSERVER_POSIX + if (!m_jobserver_sem) return; + if (g_holds_token) { + sem_post(m_jobserver_sem); + g_holds_token = false; + } +#endif + } lean_task_object * dequeue() { lean_assert(m_queues_size != 0); @@ -808,9 +878,23 @@ class task_manager { continue; } + // Acquire a jobserver token (drops the mutex while blocked). + // After waking we must re-check conditions, because the queue + // may have been drained or shutdown may have been requested. + acquire_token(lock); +#if LEAN_JOBSERVER_POSIX + if (m_jobserver_sem && + (m_queues_size == 0 || m_shutting_down || + m_std_workers.size() - m_idle_std_workers >= m_max_std_workers)) { + release_token(); + continue; + } +#endif + lean_task_object * t = dequeue(); m_idle_std_workers--; run_task(lock, t); + release_token(); m_idle_std_workers++; reset_heartbeat(); } @@ -914,6 +998,38 @@ class task_manager { public: task_manager(unsigned max_std_workers): m_max_std_workers(max_std_workers) { +#if LEAN_JOBSERVER_POSIX + if (char const * name = std::getenv("LEAN_JOB_SEMAPHORE")) { + // Attach as a participant in an existing jobserver. + sem_t * s = sem_open(name, 0); + if (s != SEM_FAILED) { + m_jobserver_sem = s; + } + } else { + // No jobserver set up yet; create one with `max_std_workers` + // slots (overridable via `LEAN_JOB_SEMAPHORE_AUTO=N`) and hand it + // to children via env. Do NOT gate this process: the creator is + // typically an orchestrator (e.g. `lake`) whose workers block on + // subprocesses, and gating it would deadlock the pool. + int count = (int)max_std_workers; + if (char const * auto_n = std::getenv("LEAN_JOB_SEMAPHORE_AUTO")) { + count = std::atoi(auto_n); + } + if (count > 0) { + char buf[64]; + std::snprintf(buf, sizeof buf, "/lean-jobs-%d", (int)getpid()); + sem_unlink(buf); + sem_t * s = sem_open(buf, O_CREAT | O_EXCL, 0600, (unsigned)count); + if (s != SEM_FAILED) { + sem_close(s); // the named semaphore persists until unlink + setenv("LEAN_JOB_SEMAPHORE", buf, 1); + unsetenv("LEAN_JOB_SEMAPHORE_AUTO"); + g_owned_sem_name = strdup(buf); + std::atexit(unlink_owned_sem); + } + } + } +#endif } ~task_manager() { @@ -931,6 +1047,12 @@ class task_manager { unique_lock lock(m_mutex); m_dedicated_finished_cv.wait(lock, [&]() { return m_num_dedicated_workers == 0; }); // never seems to terminate under Emscripten +#endif +#if LEAN_JOBSERVER_POSIX + if (m_jobserver_sem) { + sem_close(m_jobserver_sem); + m_jobserver_sem = nullptr; + } #endif } @@ -986,10 +1108,37 @@ class task_manager { spawn_worker(); else m_queue_cv.notify_one(); +#if LEAN_JOBSERVER_POSIX + // Release our token globally so a sibling worker (in this or + // another process) can pick up the sub-task while we are + // blocked. If we don't currently hold a token (e.g. a previous + // un-reclaimed `wait_for` left us running un-gated), skip: + // there's no token to release. + if (m_jobserver_sem && g_holds_token) { + sem_post(m_jobserver_sem); + g_holds_token = false; + } +#endif } m_task_finished_cv.wait(lock, [&]() { return t->m_value != nullptr; }); if (in_pool) { m_max_std_workers--; +#if LEAN_JOBSERVER_POSIX + // Try to reclaim a token non-blockingly. If one isn't + // immediately available, continue running un-gated rather than + // blocking in `sem_wait` — that's what previously let new + // `wait_for` calls spawn more workers that piled up in the same + // blocking call. The worker-loop's `release_token` will see + // `g_holds_token == false` and skip its `sem_post`, so the + // initial `sem_post`/`sem_wait` pair stays balanced. The cost is + // brief inter-process oversubscription, bounded by the depth of + // nested `Task.get`. + if (m_jobserver_sem && !g_holds_token) { + if (sem_trywait(m_jobserver_sem) == 0) { + g_holds_token = true; + } + } +#endif } }