Skip to content

Commit b60728d

Browse files
committed
Implement highlevel unix socket listeners
1 parent 5ca9662 commit b60728d

3 files changed

Lines changed: 255 additions & 1 deletion

File tree

src/trio/__init__.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,11 +63,16 @@
6363
serve_tcp as serve_tcp,
6464
)
6565
from ._highlevel_open_tcp_stream import open_tcp_stream as open_tcp_stream
66+
from ._highlevel_open_unix_listeners import (
67+
open_unix_listener as open_unix_listener,
68+
serve_unix as serve_unix,
69+
)
6670
from ._highlevel_open_unix_stream import open_unix_socket as open_unix_socket
6771
from ._highlevel_serve_listeners import serve_listeners as serve_listeners
6872
from ._highlevel_socket import (
6973
SocketListener as SocketListener,
7074
SocketStream as SocketStream,
75+
UnixSocketListener as UnixSocketListener,
7176
)
7277
from ._highlevel_ssl_helpers import (
7378
open_ssl_over_tcp_listeners as open_ssl_over_tcp_listeners,
Lines changed: 173 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,173 @@
1+
from __future__ import annotations
2+
3+
import os
4+
import sys
5+
from typing import TYPE_CHECKING
6+
7+
import trio
8+
import trio.socket as tsocket
9+
from trio import TaskStatus
10+
11+
if TYPE_CHECKING:
12+
from collections.abc import Awaitable, Callable
13+
14+
15+
try:
16+
from trio.socket import AF_UNIX
17+
18+
HAS_UNIX = True
19+
except ImportError:
20+
HAS_UNIX = False
21+
22+
23+
# Default backlog size:
24+
#
25+
# Having the backlog too low can cause practical problems (a perfectly healthy
26+
# service that starts failing to accept connections if they arrive in a
27+
# burst).
28+
#
29+
# Having it too high doesn't really cause any problems. Like any buffer, you
30+
# want backlog queue to be zero usually, and it won't save you if you're
31+
# getting connection attempts faster than you can call accept() on an ongoing
32+
# basis. But unlike other buffers, this one doesn't really provide any
33+
# backpressure. If a connection gets stuck waiting in the backlog queue, then
34+
# from the peer's point of view the connection succeeded but then their
35+
# send/recv will stall until we get to it, possibly for a long time. OTOH if
36+
# there isn't room in the backlog queue, then their connect stalls, possibly
37+
# for a long time, which is pretty much the same thing.
38+
#
39+
# A large backlog can also use a bit more kernel memory, but this seems fairly
40+
# negligible these days.
41+
#
42+
# So this suggests we should make the backlog as large as possible. This also
43+
# matches what Golang does. However, they do it in a weird way, where they
44+
# have a bunch of code to sniff out the configured upper limit for backlog on
45+
# different operating systems. But on every system, passing in a too-large
46+
# backlog just causes it to be silently truncated to the configured maximum,
47+
# so this is unnecessary -- we can just pass in "infinity" and get the maximum
48+
# that way. (Verified on Windows, Linux, macOS using
49+
# https://github.com/python-trio/trio/wiki/notes-to-self#measure-listen-backlogpy
50+
def _compute_backlog(backlog: int | None) -> int:
51+
# Many systems (Linux, BSDs, ...) store the backlog in a uint16 and are
52+
# missing overflow protection, so we apply our own overflow protection.
53+
# https://github.com/golang/go/issues/5030
54+
if not isinstance(backlog, int) and backlog is not None:
55+
raise TypeError(f"backlog must be an int or None, not {backlog!r}")
56+
if backlog is None:
57+
return 0xFFFF
58+
return min(backlog, 0xFFFF)
59+
60+
61+
async def open_unix_listener(
62+
path: str | bytes | os.PathLike[str] | os.PathLike[bytes],
63+
*,
64+
mode: int | None = None, # 0o666,
65+
backlog: int | None = None,
66+
) -> trio.UnixSocketListener:
67+
"""Create :class:`SocketListener` objects to listen for connections.
68+
Opens a connection to the specified
69+
`Unix domain socket <https://en.wikipedia.org/wiki/Unix_domain_socket>`__.
70+
71+
You must have read/write permission on the specified file to connect.
72+
73+
Args:
74+
75+
path (str): Filename of UNIX socket to create and listen on.
76+
Absolute or relative paths may be used.
77+
78+
mode (int or None): The socket file permissions.
79+
UNIX permissions are usually specified in octal numbers.
80+
If you leave this as ``None``, Trio will not change the mode from
81+
the operating system's default.
82+
83+
backlog (int or None): The listen backlog to use. If you leave this as
84+
``None`` then Trio will pick a good default. (Currently: whatever
85+
your system has configured as the maximum backlog.)
86+
87+
Returns:
88+
:class:`UnixSocketListener`
89+
90+
Raises:
91+
:class:`TypeError` if invalid arguments.
92+
:class:`RuntimeError`: If AF_UNIX sockets are not supported.
93+
"""
94+
if not HAS_UNIX:
95+
raise RuntimeError("Unix sockets are not supported on this platform")
96+
97+
computed_backlog = _compute_backlog(backlog)
98+
99+
fspath = await trio.Path(os.fsdecode(path)).absolute()
100+
101+
folder = fspath.parent
102+
if not await folder.exists():
103+
raise FileNotFoundError(f"Socket folder does not exist: {folder!r}")
104+
105+
# much more simplified logic vs tcp sockets - one socket type and only one
106+
# possible location to connect to
107+
sock = tsocket.socket(AF_UNIX, tsocket.SOCK_STREAM)
108+
try:
109+
# See https://github.com/python-trio/trio/issues/39
110+
if sys.platform != "win32":
111+
sock.setsockopt(tsocket.SOL_SOCKET, tsocket.SO_REUSEADDR, 1)
112+
113+
await sock.bind(str(fspath))
114+
115+
sock.listen(computed_backlog)
116+
117+
if mode is not None:
118+
await fspath.chmod(mode)
119+
120+
return trio.UnixSocketListener(sock)
121+
except BaseException as exc:
122+
sock.close()
123+
try:
124+
os.unlink(str(fspath))
125+
except BaseException as exc_2:
126+
raise exc_2 from exc
127+
raise
128+
129+
130+
async def serve_unix(
131+
handler: Callable[[trio.SocketStream], Awaitable[object]],
132+
path: str | bytes | os.PathLike[str] | os.PathLike[bytes],
133+
*,
134+
backlog: int | None = None,
135+
handler_nursery: trio.Nursery | None = None,
136+
task_status: TaskStatus[list[trio.UnixSocketListener]] = trio.TASK_STATUS_IGNORED,
137+
) -> None:
138+
"""Listen for incoming UNIX connections, and for each one start a task
139+
running ``handler(stream)``.
140+
This is a thin convenience wrapper around :func:`open_unix_listener` and
141+
:func:`serve_listeners` – see them for full details.
142+
.. warning::
143+
If ``handler`` raises an exception, then this function doesn't do
144+
anything special to catch it – so by default the exception will
145+
propagate out and crash your server. If you don't want this, then catch
146+
exceptions inside your ``handler``, or use a ``handler_nursery`` object
147+
that responds to exceptions in some other way.
148+
When used with ``nursery.start`` you get back the newly opened listeners.
149+
Args:
150+
handler: The handler to start for each incoming connection. Passed to
151+
:func:`serve_listeners`.
152+
path: The socket file name.
153+
Passed to :func:`open_unix_listener`.
154+
backlog: The listen backlog, or None to have a good default picked.
155+
Passed to :func:`open_tcp_listener`.
156+
handler_nursery: The nursery to start handlers in, or None to use an
157+
internal nursery. Passed to :func:`serve_listeners`.
158+
task_status: This function can be used with ``nursery.start``.
159+
Returns:
160+
This function only returns when cancelled.
161+
Raises:
162+
RuntimeError: If AF_UNIX sockets are not supported.
163+
"""
164+
if not HAS_UNIX:
165+
raise RuntimeError("Unix sockets are not supported on this platform")
166+
167+
listener = await open_unix_listener(path, backlog=backlog)
168+
await trio.serve_listeners(
169+
handler,
170+
[listener],
171+
handler_nursery=handler_nursery,
172+
task_status=task_status,
173+
)

src/trio/_highlevel_socket.py

Lines changed: 77 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,8 @@
33

44
import errno
55
from contextlib import contextmanager, suppress
6-
from typing import TYPE_CHECKING, overload
6+
from os import unlink
7+
from typing import TYPE_CHECKING, Final, overload
78

89
import trio
910

@@ -31,6 +32,8 @@
3132
errno.ENOTSOCK,
3233
}
3334

35+
HAS_UNIX: Final = hasattr(tsocket, "AF_UNIX")
36+
3437

3538
@contextmanager
3639
def _translate_socket_errors_to_stream_errors() -> Generator[None, None, None]:
@@ -412,3 +415,76 @@ async def aclose(self) -> None:
412415
"""Close this listener and its underlying socket."""
413416
self.socket.close()
414417
await trio.lowlevel.checkpoint()
418+
419+
420+
@final
421+
class UnixSocketListener(Listener[SocketStream]):
422+
"""A :class:`~trio.abc.Listener` that uses a listening socket to accept
423+
incoming connections as :class:`SocketStream` objects.
424+
425+
Args:
426+
socket: The Trio socket object to wrap. Must have type ``SOCK_STREAM``,
427+
and be listening.
428+
429+
Note that the :class:`UnixSocketListener` "takes ownership" of the given
430+
socket; closing the :class:`UnixSocketListener` will also close the socket
431+
and unlink its associated file.
432+
433+
.. attribute:: socket
434+
435+
The Trio socket object that this stream wraps.
436+
437+
"""
438+
439+
def __init__(self, socket: SocketType) -> None:
440+
if not HAS_UNIX:
441+
raise RuntimeError("Unix sockets are not supported on this platform")
442+
if not isinstance(socket, tsocket.SocketType):
443+
raise TypeError("SocketListener requires a Trio socket object")
444+
if socket.type != tsocket.SOCK_STREAM:
445+
raise ValueError("SocketListener requires a SOCK_STREAM socket")
446+
try:
447+
listening = socket.getsockopt(tsocket.SOL_SOCKET, tsocket.SO_ACCEPTCONN)
448+
except OSError:
449+
# SO_ACCEPTCONN fails on macOS; we just have to trust the user.
450+
pass
451+
else:
452+
if not listening:
453+
raise ValueError("SocketListener requires a listening socket")
454+
455+
self.socket = socket
456+
457+
async def accept(self) -> SocketStream:
458+
"""Accept an incoming connection.
459+
460+
Returns:
461+
:class:`SocketStream`
462+
463+
Raises:
464+
OSError: if the underlying call to ``accept`` raises an unexpected
465+
error.
466+
ClosedResourceError: if you already closed the socket.
467+
468+
This method handles routine errors like ``ECONNABORTED``, but passes
469+
other errors on to its caller. In particular, it does *not* make any
470+
special effort to handle resource exhaustion errors like ``EMFILE``,
471+
``ENFILE``, ``ENOBUFS``, ``ENOMEM``.
472+
473+
"""
474+
while True:
475+
try:
476+
sock, _ = await self.socket.accept()
477+
except OSError as exc:
478+
if exc.errno in _closed_stream_errnos:
479+
raise trio.ClosedResourceError from None
480+
if exc.errno not in _ignorable_accept_errnos:
481+
raise
482+
else:
483+
return SocketStream(sock)
484+
485+
async def aclose(self) -> None:
486+
"""Close this listener, its underlying socket, and unlink its associated file."""
487+
path = self.socket.getsockname()
488+
self.socket.close()
489+
unlink(path)
490+
await trio.lowlevel.checkpoint()

0 commit comments

Comments
 (0)