Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions robyn/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -678,6 +678,7 @@ def start(self, host: str = "127.0.0.1", port: int = 8080, _check_port: bool = T
open_browser,
client_timeout,
keep_alive_timeout,
self.config.max_requests,
)


Expand Down
9 changes: 9 additions & 0 deletions robyn/argument_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,14 @@ def __init__(self) -> None:
default=False,
help="Fast mode. It sets the optimal values for processes, workers and log level. However, you can override them.",
)
parser.add_argument(
"--max-requests",
dest="max_requests",
type=int,
default=None,
required=False,
help="Recycle worker processes after this many requests. Helps contain memory leaks. [Default: None (disabled)]",
)
Comment thread
coderabbitai[bot] marked this conversation as resolved.

args, unknown_args = parser.parse_known_args()
self.fast = args.fast
Expand All @@ -99,6 +107,7 @@ def __init__(self) -> None:
self.file_path = None
self.disable_openapi = args.disable_openapi
self.log_level = args.log_level
self.max_requests = args.max_requests

if self.fast:
# doing this here before every other check
Expand Down
76 changes: 69 additions & 7 deletions robyn/processpool.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

from robyn.events import Events
from robyn.logger import logger
from robyn.robyn import FunctionInfo, Headers, Server, SocketHeld
from robyn.robyn import FunctionInfo, Headers, Server, SocketHeld, get_request_count
from robyn.router import GlobalMiddleware, Route, RouteMiddleware
from robyn.types import Directory

Expand All @@ -30,7 +30,10 @@ def run_processes(
open_browser: bool,
client_timeout: int = 30,
keep_alive_timeout: int = 20,
max_requests: Optional[int] = None,
) -> List[Process]:
import time

socket = SocketHeld(url, port)

process_pool = init_processpool(
Expand All @@ -48,12 +51,24 @@ def run_processes(
excluded_response_headers_paths,
client_timeout,
keep_alive_timeout,
max_requests,
)

shutting_down = False

def terminating_signal_handler(_sig, _frame):
logger.info("Terminating server!!", bold=True)
nonlocal shutting_down
shutting_down = True
logger.info("Gracefully shutting down server...", bold=True)
for process in process_pool:
process.terminate()
for process in process_pool:
process.kill()
process.join(timeout=30)
if process.is_alive():
logger.warning("Process %s did not shut down in time, forcing kill.", process.pid)
process.kill()
process.join(timeout=5)
sys.exit(0)

signal.signal(signal.SIGINT, terminating_signal_handler)
signal.signal(signal.SIGTERM, terminating_signal_handler)
Expand All @@ -63,8 +78,38 @@ def terminating_signal_handler(_sig, _frame):
webbrowser.open_new_tab(f"http://{url}:{port}/")

logger.info("Press Ctrl + C to stop \n")
for process in process_pool:
process.join()

if max_requests and max_requests > 0 and len(process_pool) > 0:
while not shutting_down:
for i, process in enumerate(process_pool):
if not process.is_alive() and not shutting_down:
logger.info("Worker process exited (recycling), spawning replacement.")
copied_socket = socket.try_clone()
new_process = Process(
target=spawn_process,
args=(
directories,
request_headers,
routes,
global_middlewares,
route_middlewares,
web_sockets,
event_handlers,
copied_socket,
workers,
response_headers,
excluded_response_headers_paths,
client_timeout,
keep_alive_timeout,
max_requests,
),
)
new_process.start()
process_pool[i] = new_process
time.sleep(5)
Comment thread
coderabbitai[bot] marked this conversation as resolved.
Comment on lines +82 to +110

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Normalize max_requests before these truthiness branches.

Line 82 and Line 136 use truthiness, but src/server.rs enables recycling for any non-None value. 0 therefore disables supervision here while still enabling immediate shutdown in Rust. Line 136 also forces all Windows runs through the inline path, so there is no supervisor to replace that exit. Both cases can turn recycling into a permanent shutdown.

💡 Minimal guardrail
 def run_processes(
@@
     keep_alive_timeout: int = 20,
     max_requests: Optional[int] = None,
 ) -> List[Process]:
+    if max_requests is not None and max_requests <= 0:
+        max_requests = None
@@
-    if max_requests and max_requests > 0 and len(process_pool) > 0:
+    if max_requests is not None and len(process_pool) > 0:
         while not shutting_down:
             ...
 def init_processpool(
@@
     keep_alive_timeout: int = 20,
     max_requests: Optional[int] = None,
 ) -> List[Process]:
+    if max_requests is not None and max_requests <= 0:
+        max_requests = None
+    if sys.platform.startswith("win32") and max_requests is not None:
+        raise RuntimeError("--max-requests is not supported on Windows yet")
@@
-    if sys.platform.startswith("win32") or (processes == 1 and not max_requests):
+    if sys.platform.startswith("win32") or (processes == 1 and max_requests is None):
         spawn_process(...)

Also applies to: 133-152

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@robyn/processpool.py` around lines 82 - 110, The truthy checks on
max_requests are wrong: normalize by testing for None explicitly so a value of 0
does not disable supervision; replace conditions like "if max_requests and
max_requests > 0 and len(process_pool) > 0:" with "if max_requests is not None
and len(process_pool) > 0:" (and similarly update the other branch around the
Windows/inline path at the block referencing process_pool and spawn_process) so
recycling supervision is enabled for any non-None max_requests value.

⚠️ Potential issue | 🟠 Major

Worker replacement starts late enough to cause outages.

The parent only notices exited workers on the next 5-second sleep boundary at Line 110. In processes=1 mode that is a full outage on every recycle; if several workers hit the limit together, it can briefly drop the whole server. Please wait on process sentinels, or at least poll much more frequently, so replacements start immediately.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@robyn/processpool.py` around lines 82 - 110, The parent loop currently sleeps
for 5 seconds which delays detection of exited workers and causes outages;
change the recycling loop so exited processes are reaped and replaced
immediately by calling process.join() as soon as process.is_alive() is False (or
use process.join(timeout=0) to non-blockingly reap), spawn the replacement
Process (using Process(target=spawn_process, ...), socket.try_clone(), etc.)
right away, and reduce or remove the coarse time.sleep(5) (replace with a short
poll interval like 0.1s if needed) so worker replacement happens immediately
instead of waiting on the 5s boundary.

else:
for process in process_pool:
process.join()

return process_pool

Expand All @@ -84,6 +129,7 @@ def init_processpool(
excluded_response_headers_paths: Optional[List[str]],
client_timeout: int = 30,
keep_alive_timeout: int = 20,
max_requests: Optional[int] = None,
) -> List[Process]:
process_pool: List = []
if sys.platform.startswith("win32") or processes == 1:
Expand All @@ -101,6 +147,7 @@ def init_processpool(
excluded_response_headers_paths,
client_timeout,
keep_alive_timeout,
max_requests,
)

return process_pool
Expand All @@ -123,6 +170,7 @@ def init_processpool(
excluded_response_headers_paths,
client_timeout,
keep_alive_timeout,
max_requests,
),
)
process.start()
Expand Down Expand Up @@ -161,6 +209,7 @@ def spawn_process(
excluded_response_headers_paths: Optional[List[str]],
client_timeout: int = 30,
keep_alive_timeout: int = 20,
max_requests: Optional[int] = None,
):
"""
This function is called by the main process handler to create a server runtime.
Expand All @@ -175,14 +224,13 @@ def spawn_process(
:param socket SocketHeld: This is the main tcp socket, which is being shared across multiple processes.
:param process_name string: This is the name given to the process to identify the process
:param workers int: This is the name given to the process to identify the process
:param max_requests Optional[int]: Recycle this worker after N requests
"""

loop = initialize_event_loop()

server = Server()

# TODO: if we remove the dot access
# the startup time will improve in the server
for directory in directories:
server.add_directory(*directory.as_list())

Expand Down Expand Up @@ -220,6 +268,20 @@ def spawn_process(
try:
server.start(socket, workers)
loop = asyncio.get_event_loop()

if max_requests and max_requests > 0:

def _check_max_requests():
if get_request_count() >= max_requests:
logger.info("Max requests (%d) reached, worker shutting down for recycling.", max_requests)
loop.stop()
else:
loop.call_later(5, _check_max_requests)

loop.call_later(5, _check_max_requests)

loop.run_forever()
Comment thread
coderabbitai[bot] marked this conversation as resolved.
except KeyboardInterrupt:
pass
finally:
loop.close()
4 changes: 4 additions & 0 deletions robyn/robyn.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ from typing import Callable, Optional, Union
def get_version() -> str:
pass

def get_request_count() -> int:
"""Returns the total number of HTTP requests handled by this worker process."""
pass

class SocketHeld:
def __init__(self, url: str, port: int):
pass
Expand Down
6 changes: 6 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,16 @@ fn get_version() -> String {
env!("CARGO_PKG_VERSION").into()
}

#[pyfunction]
fn get_request_count() -> u64 {
server::get_request_count()
}

#[pymodule]
pub fn robyn(_py: Python, m: &Bound<'_, PyModule>) -> PyResult<()> {
// the pymodule class/function to make the rustPyFunctions available
m.add_function(wrap_pyfunction!(get_version, m)?)?;
m.add_function(wrap_pyfunction!(get_request_count, m)?)?;

m.add_class::<Server>()?;
m.add_class::<Headers>()?;
Expand Down
7 changes: 7 additions & 0 deletions src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,11 @@ const MAX_PAYLOAD_SIZE: &str = "ROBYN_MAX_PAYLOAD_SIZE";
const DEFAULT_MAX_PAYLOAD_SIZE: usize = 1_000_000; // 1Mb

static STARTED: AtomicBool = AtomicBool::new(false);
static REQUEST_COUNT: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(0);

pub fn get_request_count() -> u64 {
REQUEST_COUNT.load(Relaxed)
}

#[derive(Clone)]
struct Directory {
Expand Down Expand Up @@ -482,6 +487,8 @@ async fn index(
excluded_response_headers_paths: web::Data<Option<Vec<String>>>,
req: HttpRequest,
) -> ResponseType {
REQUEST_COUNT.fetch_add(1, Relaxed);

Comment thread
coderabbitai[bot] marked this conversation as resolved.
Outdated
if !HttpMethod::is_supported(req.method()) {
return ResponseType::Standard(Response::method_not_allowed(None));
}
Expand Down
Loading