Skip to content

Commit 8ec2f60

Browse files
committed
add TaskGroup.stop()
ISSUE: #108951
1 parent 39e60ae commit 8ec2f60

3 files changed

Lines changed: 118 additions & 47 deletions

File tree

Doc/library/asyncio-task.rst

Lines changed: 19 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -342,6 +342,25 @@ and reliable way to wait for all tasks in the group to finish.
342342

343343
Close the given coroutine if the task group is not active.
344344

345+
.. method:: stop()
346+
347+
Stop the task group
348+
349+
:meth:`~asyncio.Task.cancel` will be called on any tasks in the group that
350+
aren't yet done, as well as the parent (body) of the group. This will
351+
cause the task group context manager to exit *without* a
352+
:exc:`asyncio.CancelledError` being raised.
353+
354+
If :meth:`stop` is called before entering the task group, the group will be
355+
stopped upon entry. This is useful for patterns where one piece of
356+
code passes an unused TaskGroup instance to another in order to have
357+
the ability to stop anything run within the group.
358+
359+
:meth:`stop` is idempotent and may be called after the task group has
360+
already exited.
361+
362+
.. versionadded:: 3.14
363+
345364
Example::
346365

347366
async def main():
@@ -414,53 +433,6 @@ reported by :meth:`asyncio.Task.cancelling`.
414433
Improved handling of simultaneous internal and external cancellations
415434
and correct preservation of cancellation counts.
416435

417-
Terminating a Task Group
418-
------------------------
419-
420-
While terminating a task group is not natively supported by the standard
421-
library, termination can be achieved by adding an exception-raising task
422-
to the task group and ignoring the raised exception:
423-
424-
.. code-block:: python
425-
426-
import asyncio
427-
from asyncio import TaskGroup
428-
429-
class TerminateTaskGroup(Exception):
430-
"""Exception raised to terminate a task group."""
431-
432-
async def force_terminate_task_group():
433-
"""Used to force termination of a task group."""
434-
raise TerminateTaskGroup()
435-
436-
async def job(task_id, sleep_time):
437-
print(f'Task {task_id}: start')
438-
await asyncio.sleep(sleep_time)
439-
print(f'Task {task_id}: done')
440-
441-
async def main():
442-
try:
443-
async with TaskGroup() as group:
444-
# spawn some tasks
445-
group.create_task(job(1, 0.5))
446-
group.create_task(job(2, 1.5))
447-
# sleep for 1 second
448-
await asyncio.sleep(1)
449-
# add an exception-raising task to force the group to terminate
450-
group.create_task(force_terminate_task_group())
451-
except* TerminateTaskGroup:
452-
pass
453-
454-
asyncio.run(main())
455-
456-
Expected output:
457-
458-
.. code-block:: text
459-
460-
Task 1: start
461-
Task 2: start
462-
Task 1: done
463-
464436
Sleeping
465437
========
466438

Lib/asyncio/taskgroups.py

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ def __init__(self):
3636
self._errors = []
3737
self._base_error = None
3838
self._on_completed_fut = None
39+
self._stop_on_enter = False
3940

4041
def __repr__(self):
4142
info = ['']
@@ -62,6 +63,8 @@ async def __aenter__(self):
6263
raise RuntimeError(
6364
f'TaskGroup {self!r} cannot determine the parent task')
6465
self._entered = True
66+
if self._stop_on_enter:
67+
self.stop()
6568

6669
return self
6770

@@ -147,6 +150,10 @@ async def _aexit(self, et, exc):
147150
# If there are no pending cancellations left,
148151
# don't propagate CancelledError.
149152
propagate_cancellation_error = None
153+
# If Cancelled would actually be raised out of the TaskGroup,
154+
# suppress it-- this is significant when using stop().
155+
if not self._errors:
156+
return True
150157

151158
# Propagate CancelledError if there is one, except if there
152159
# are other errors -- those have priority.
@@ -273,3 +280,30 @@ def _on_task_done(self, task):
273280
self._abort()
274281
self._parent_cancel_requested = True
275282
self._parent_task.cancel()
283+
284+
def stop(self):
285+
"""Stop the task group
286+
287+
`cancel()` will be called on any tasks in the group that aren't yet
288+
done, as well as the parent (body) of the group. This will cause the
289+
task group context manager to exit *without* a Cancelled exception
290+
being raised.
291+
292+
If `stop()` is called before entering the task group, the group will be
293+
stopped upon entry. This is useful for patterns where one piece of
294+
code passes an unused TaskGroup instance to another in order to have
295+
the ability to stop anything run within the group.
296+
297+
`stop()` is idempotent and may be called after the task group has
298+
already exited.
299+
"""
300+
if not self._entered:
301+
self._stop_on_enter = True
302+
return
303+
if self._exiting and not self._tasks:
304+
return
305+
if not self._aborting:
306+
self._abort()
307+
if self._parent_task and not self._parent_cancel_requested:
308+
self._parent_cancel_requested = True
309+
self._parent_task.cancel()

Lib/test/test_asyncio/test_taskgroups.py

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,12 @@
33

44
import sys
55
import gc
6+
67
import asyncio
78
import contextvars
89
import contextlib
910
from asyncio import taskgroups
11+
import math
1012
import unittest
1113
import warnings
1214

@@ -997,6 +999,69 @@ class MyKeyboardInterrupt(KeyboardInterrupt):
997999
self.assertIsNotNone(exc)
9981000
self.assertListEqual(gc.get_referrers(exc), no_other_refs())
9991001

1002+
async def test_taskgroup_stop_children(self):
1003+
async with asyncio.TaskGroup() as tg:
1004+
tg.create_task(asyncio.sleep(math.inf))
1005+
tg.create_task(asyncio.sleep(math.inf))
1006+
await asyncio.sleep(0)
1007+
tg.stop()
1008+
1009+
async def test_taskgroup_stop_body(self):
1010+
count = 0
1011+
async with asyncio.TaskGroup() as tg:
1012+
tg.stop()
1013+
count += 1
1014+
await asyncio.sleep(0)
1015+
count += 1
1016+
self.assertEqual(count, 1)
1017+
1018+
async def test_taskgroup_stop_idempotent(self):
1019+
count = 0
1020+
async with asyncio.TaskGroup() as tg:
1021+
tg.stop()
1022+
tg.stop()
1023+
count += 1
1024+
await asyncio.sleep(0)
1025+
count += 1
1026+
self.assertEqual(count, 1)
1027+
1028+
async def test_taskgroup_stop_after_exit(self):
1029+
async with asyncio.TaskGroup() as tg:
1030+
await asyncio.sleep(0)
1031+
tg.stop()
1032+
1033+
async def test_taskgroup_stop_before_enter(self):
1034+
tg = asyncio.TaskGroup()
1035+
tg.stop()
1036+
count = 0
1037+
async with tg:
1038+
count += 1
1039+
await asyncio.sleep(0)
1040+
count += 1
1041+
self.assertEqual(count, 1)
1042+
1043+
async def test_taskgroup_stop_before_exception(self):
1044+
async def raise_exc(parent_tg: asyncio.TaskGroup):
1045+
parent_tg.stop()
1046+
raise RuntimeError
1047+
1048+
with self.assertRaises(ExceptionGroup):
1049+
async with asyncio.TaskGroup() as tg:
1050+
tg.create_task(raise_exc(tg))
1051+
await asyncio.sleep(1)
1052+
1053+
async def test_taskgroup_stop_after_exception(self):
1054+
async def raise_exc(parent_tg: asyncio.TaskGroup):
1055+
try:
1056+
raise RuntimeError
1057+
finally:
1058+
parent_tg.stop()
1059+
1060+
with self.assertRaises(ExceptionGroup):
1061+
async with asyncio.TaskGroup() as tg:
1062+
tg.create_task(raise_exc(tg))
1063+
await asyncio.sleep(1)
1064+
10001065

10011066
if __name__ == "__main__":
10021067
unittest.main()

0 commit comments

Comments
 (0)