Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 83 additions & 0 deletions integration_tests/test_graceful_shutdown.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
import os
import socket
import subprocess
import sys
import textwrap
import time

import pytest


pytestmark = pytest.mark.skipif(sys.platform.startswith("win32"), reason="SIGTERM graceful shutdown test is POSIX-only")


def _get_free_port() -> int:
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
sock.bind(("127.0.0.1", 0))
return sock.getsockname()[1]


def _wait_for_server(port: int, process: subprocess.Popen[bytes]) -> None:
deadline = time.monotonic() + 15
while time.monotonic() < deadline:
if process.poll() is not None:
stdout, stderr = process.communicate()
raise AssertionError(f"Robyn server exited early with {process.returncode}\nstdout: {stdout!r}\nstderr: {stderr!r}")

try:
with socket.create_connection(("127.0.0.1", port), timeout=1):
return
except OSError:
time.sleep(0.1)

process.kill()
stdout, stderr = process.communicate()
raise AssertionError(f"Robyn server did not start on port {port}\nstdout: {stdout!r}\nstderr: {stderr!r}")


def test_sigterm_runs_shutdown_handler(tmp_path):
# Multi-process SIGTERM coverage is left as follow-up because this regression
# targets the default single-process fixture path from issue #1324.
port = _get_free_port()
sentinel_file = tmp_path / "shutdown.txt"
app_file = tmp_path / "graceful_shutdown_app.py"
app_file.write_text(
textwrap.dedent(
"""
import os
from pathlib import Path

from robyn import Robyn

app = Robyn(__file__)

@app.get("/")
def index():
return "Hello World!"

@app.shutdown_handler
def shutdown_handler():
Path(os.environ["ROBYN_SHUTDOWN_SENTINEL"]).write_text("shutdown")

if __name__ == "__main__":
app.start(host="127.0.0.1", port=int(os.environ["ROBYN_PORT"]))
"""
)
)

env = os.environ.copy()
env["ROBYN_PORT"] = str(port)
env["ROBYN_SHUTDOWN_SENTINEL"] = str(sentinel_file)

process = subprocess.Popen([sys.executable, str(app_file)], env=env, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
try:
_wait_for_server(port, process)
process.terminate()
process.wait(timeout=15)

assert process.returncode == 0
assert sentinel_file.read_text() == "shutdown"
finally:
if process.poll() is None:
process.kill()
process.communicate()
1 change: 1 addition & 0 deletions robyn/_runtime.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
GRACEFUL_SHUTDOWN_TIMEOUT = 10
37 changes: 35 additions & 2 deletions robyn/processpool.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,50 @@
import asyncio
import signal
import sys
import time
import webbrowser

from multiprocess import Process # type: ignore

from robyn._runtime import GRACEFUL_SHUTDOWN_TIMEOUT
from robyn.events import Events
from robyn.logger import logger
from robyn.robyn import FunctionInfo, Headers, Server, SocketHeld
from robyn.router import GlobalMiddleware, Route, RouteMiddleware
from robyn.types import Directory


def _raise_keyboard_interrupt(_sig, _frame):
raise KeyboardInterrupt


def _register_graceful_shutdown_handler() -> None:
if sys.platform.startswith("win32"):
return

signal.signal(signal.SIGTERM, _raise_keyboard_interrupt)


def _terminate_process_pool(process_pool: list[Process]) -> None:
for process in process_pool:
try:
process.terminate()
except ProcessLookupError:
pass

deadline = time.monotonic() + GRACEFUL_SHUTDOWN_TIMEOUT
for process in process_pool:
remaining_timeout = max(deadline - time.monotonic(), 0)
process.join(timeout=remaining_timeout)
if process.is_alive():
logger.warn("Worker process %s did not exit gracefully, force killing it.", process.pid)
try:
process.kill()
except ProcessLookupError:
pass
process.join()


def run_processes(
url: str,
port: int,
Expand Down Expand Up @@ -51,8 +84,7 @@ def run_processes(

def terminating_signal_handler(_sig, _frame):
logger.info("Terminating server!!", bold=True)
for process in process_pool:
process.kill()
_terminate_process_pool(process_pool)

signal.signal(signal.SIGINT, terminating_signal_handler)
signal.signal(signal.SIGTERM, terminating_signal_handler)
Expand Down Expand Up @@ -177,6 +209,7 @@ def spawn_process(
"""

loop = initialize_event_loop()
_register_graceful_shutdown_handler()

server = Server()

Expand Down
28 changes: 23 additions & 5 deletions robyn/reloader.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from watchdog.events import FileSystemEventHandler
from watchdog.observers import Observer

from robyn._runtime import GRACEFUL_SHUTDOWN_TIMEOUT
from robyn.logger import Colors, logger


Expand Down Expand Up @@ -92,6 +93,7 @@ def setup_reloader(directory_path: str, file_path: str) -> None:

def terminating_signal_handler(_sig, _frame):
event_handler.stop_server()
event_handler.wait_for_server_shutdown()
logger.info("Terminating reloader", bold=True)
observer.stop()
observer.join()
Expand All @@ -109,7 +111,7 @@ def terminating_signal_handler(_sig, _frame):
finally:
observer.stop()
observer.join()
event_handler.process.wait()
event_handler.wait_for_server_shutdown()


class EventHandler(FileSystemEventHandler):
Expand All @@ -123,7 +125,24 @@ def __init__(self, file_path: str, directory_path: str) -> None:

def stop_server(self) -> None:
if self.process:
os.kill(self.process.pid, signal.SIGTERM) # Stop the subprocess using os.kill()
try:
self.process.terminate()
except ProcessLookupError:
pass

def wait_for_server_shutdown(self) -> None:
if not self.process:
return

try:
self.process.wait(timeout=GRACEFUL_SHUTDOWN_TIMEOUT)
except subprocess.TimeoutExpired:
logger.warn("Server process %s did not exit gracefully, force killing it.", self.process.pid)
try:
self.process.kill()
except ProcessLookupError:
pass
self.process.wait()

def reload(self) -> None:
self.stop_server()
Expand All @@ -138,9 +157,8 @@ def reload(self) -> None:
clean_rust_binaries(self.built_rust_binaries)
self.built_rust_binaries = compile_rust_files(self.directory_path)

prev_process = self.process
if prev_process:
prev_process.kill()
if self.process:
self.wait_for_server_shutdown()

self.process = subprocess.Popen(
[sys.executable, *arguments],
Expand Down