mirror of
https://github.com/python/cpython.git
synced 2026-05-06 04:37:33 -04:00
gh-108951: add TaskGroup.cancel() (#127214)
Fixes #108951 Co-authored-by: sobolevn <mail@sobolevn.me> Co-authored-by: Andrew Svetlov <andrew.svetlov@gmail.com> Co-authored-by: Guido van Rossum <guido@python.org>
This commit is contained in:
@@ -355,6 +355,34 @@ and reliable way to wait for all tasks in the group to finish.
|
||||
|
||||
Passes on all *kwargs* to :meth:`loop.create_task`
|
||||
|
||||
.. method:: cancel()
|
||||
|
||||
Cancel the task group. This is a non-exceptional, early exit of the
|
||||
task group's lifetime -- useful once the group's goal has been met or
|
||||
its services no longer needed.
|
||||
|
||||
:meth:`~asyncio.Task.cancel` will be called on any tasks in the group that
|
||||
aren't yet done, as well as the parent (body) of the group. The task group
|
||||
context manager will exit *without* :exc:`asyncio.CancelledError` being raised.
|
||||
|
||||
If :meth:`cancel` is called before entering the task group, the group will be
|
||||
cancelled upon entry. This is useful for patterns where one piece of
|
||||
code passes an unused :class:`asyncio.TaskGroup` instance to another in order to have
|
||||
the ability to cancel anything run within the group.
|
||||
|
||||
:meth:`cancel` is idempotent and may be called after the task group has
|
||||
already exited.
|
||||
|
||||
Some ways to use :meth:`cancel`:
|
||||
|
||||
* call it from the task group body based on some condition or event
|
||||
* pass the task group instance to child tasks via :meth:`create_task`, allowing a child
|
||||
task to conditionally cancel the entire entire group
|
||||
* pass the task group instance or bound :meth:`cancel` method to some other task *before*
|
||||
opening the task group, allowing remote cancellation
|
||||
|
||||
.. versionadded:: next
|
||||
|
||||
Example::
|
||||
|
||||
async def main():
|
||||
@@ -366,7 +394,8 @@ Example::
|
||||
The ``async with`` statement will wait for all tasks in the group to finish.
|
||||
While waiting, new tasks may still be added to the group
|
||||
(for example, by passing ``tg`` into one of the coroutines
|
||||
and calling ``tg.create_task()`` in that coroutine).
|
||||
and calling ``tg.create_task()`` in that coroutine). There is also opportunity
|
||||
to short-circuit the entire task group with ``tg.cancel()``, based on some condition.
|
||||
Once the last task has finished and the ``async with`` block is exited,
|
||||
no new tasks may be added to the group.
|
||||
|
||||
@@ -427,53 +456,6 @@ reported by :meth:`asyncio.Task.cancelling`.
|
||||
Improved handling of simultaneous internal and external cancellations
|
||||
and correct preservation of cancellation counts.
|
||||
|
||||
Terminating a task group
|
||||
------------------------
|
||||
|
||||
While terminating a task group is not natively supported by the standard
|
||||
library, termination can be achieved by adding an exception-raising task
|
||||
to the task group and ignoring the raised exception:
|
||||
|
||||
.. code-block:: python
|
||||
|
||||
import asyncio
|
||||
from asyncio import TaskGroup
|
||||
|
||||
class TerminateTaskGroup(Exception):
|
||||
"""Exception raised to terminate a task group."""
|
||||
|
||||
async def force_terminate_task_group():
|
||||
"""Used to force termination of a task group."""
|
||||
raise TerminateTaskGroup()
|
||||
|
||||
async def job(task_id, sleep_time):
|
||||
print(f'Task {task_id}: start')
|
||||
await asyncio.sleep(sleep_time)
|
||||
print(f'Task {task_id}: done')
|
||||
|
||||
async def main():
|
||||
try:
|
||||
async with TaskGroup() as group:
|
||||
# spawn some tasks
|
||||
group.create_task(job(1, 0.5))
|
||||
group.create_task(job(2, 1.5))
|
||||
# sleep for 1 second
|
||||
await asyncio.sleep(1)
|
||||
# add an exception-raising task to force the group to terminate
|
||||
group.create_task(force_terminate_task_group())
|
||||
except* TerminateTaskGroup:
|
||||
pass
|
||||
|
||||
asyncio.run(main())
|
||||
|
||||
Expected output:
|
||||
|
||||
.. code-block:: text
|
||||
|
||||
Task 1: start
|
||||
Task 2: start
|
||||
Task 1: done
|
||||
|
||||
Sleeping
|
||||
========
|
||||
|
||||
|
||||
@@ -3,3 +3,5 @@
|
||||
# Remove from here in 3.16
|
||||
c-api/allocation.html: deprecated-aliases
|
||||
c-api/file.html: deprecated-api
|
||||
|
||||
library/asyncio-task.html: terminating-a-task-group
|
||||
|
||||
@@ -37,6 +37,7 @@ class TaskGroup:
|
||||
self._errors = []
|
||||
self._base_error = None
|
||||
self._on_completed_fut = None
|
||||
self._cancel_on_enter = False
|
||||
|
||||
def __repr__(self):
|
||||
info = ['']
|
||||
@@ -63,6 +64,8 @@ class TaskGroup:
|
||||
raise RuntimeError(
|
||||
f'TaskGroup {self!r} cannot determine the parent task')
|
||||
self._entered = True
|
||||
if self._cancel_on_enter:
|
||||
self.cancel()
|
||||
|
||||
return self
|
||||
|
||||
@@ -178,6 +181,9 @@ class TaskGroup:
|
||||
finally:
|
||||
exc = None
|
||||
|
||||
# Suppress any remaining exception (exceptions deserving to be raised
|
||||
# were raised above).
|
||||
return True
|
||||
|
||||
def create_task(self, coro, **kwargs):
|
||||
"""Create a new task in this group and return it.
|
||||
@@ -278,3 +284,30 @@ class TaskGroup:
|
||||
self._abort()
|
||||
self._parent_cancel_requested = True
|
||||
self._parent_task.cancel()
|
||||
|
||||
def cancel(self):
|
||||
"""Cancel the task group
|
||||
|
||||
`cancel()` will be called on any tasks in the group that aren't yet
|
||||
done, as well as the parent (body) of the group. This will cause the
|
||||
task group context manager to exit *without* `asyncio.CancelledError`
|
||||
being raised.
|
||||
|
||||
If `cancel()` is called before entering the task group, the group will be
|
||||
cancelled upon entry. This is useful for patterns where one piece of
|
||||
code passes an unused TaskGroup instance to another in order to have
|
||||
the ability to cancel anything run within the group.
|
||||
|
||||
`cancel()` is idempotent and may be called after the task group has
|
||||
already exited.
|
||||
"""
|
||||
if not self._entered:
|
||||
self._cancel_on_enter = True
|
||||
return
|
||||
if self._exiting and not self._tasks:
|
||||
return
|
||||
if not self._aborting:
|
||||
self._abort()
|
||||
if self._parent_task and not self._parent_cancel_requested:
|
||||
self._parent_cancel_requested = True
|
||||
self._parent_task.cancel()
|
||||
|
||||
@@ -1102,6 +1102,131 @@ class BaseTestTaskGroup:
|
||||
# cancellation happens here and error is more understandable
|
||||
await asyncio.sleep(0)
|
||||
|
||||
async def test_taskgroup_cancel_children(self):
|
||||
# (asserting that TimeoutError is not raised)
|
||||
async with asyncio.timeout(1):
|
||||
async with asyncio.TaskGroup() as tg:
|
||||
tg.create_task(asyncio.sleep(10))
|
||||
tg.create_task(asyncio.sleep(10))
|
||||
await asyncio.sleep(0)
|
||||
tg.cancel()
|
||||
|
||||
async def test_taskgroup_cancel_body(self):
|
||||
count = 0
|
||||
async with asyncio.TaskGroup() as tg:
|
||||
tg.cancel()
|
||||
count += 1
|
||||
await asyncio.sleep(0)
|
||||
count += 1
|
||||
self.assertEqual(count, 1)
|
||||
|
||||
async def test_taskgroup_cancel_idempotent(self):
|
||||
count = 0
|
||||
async with asyncio.TaskGroup() as tg:
|
||||
tg.cancel()
|
||||
tg.cancel()
|
||||
count += 1
|
||||
await asyncio.sleep(0)
|
||||
count += 1
|
||||
self.assertEqual(count, 1)
|
||||
|
||||
async def test_taskgroup_cancel_after_exit(self):
|
||||
async with asyncio.TaskGroup() as tg:
|
||||
await asyncio.sleep(0)
|
||||
# (asserting that exception is not raised)
|
||||
tg.cancel()
|
||||
|
||||
async def test_taskgroup_cancel_before_enter(self):
|
||||
tg = asyncio.TaskGroup()
|
||||
tg.cancel()
|
||||
count = 0
|
||||
async with tg:
|
||||
count += 1
|
||||
await asyncio.sleep(0)
|
||||
count += 1
|
||||
self.assertEqual(count, 1)
|
||||
|
||||
async def test_taskgroup_cancel_before_create_task(self):
|
||||
async with asyncio.TaskGroup() as tg:
|
||||
tg.cancel()
|
||||
# TODO: This behavior is not ideal. We'd rather have no exception
|
||||
# raised, and the child task run until the first await.
|
||||
with self.assertRaises(RuntimeError):
|
||||
tg.create_task(asyncio.sleep(1))
|
||||
|
||||
async def test_taskgroup_cancel_before_exception(self):
|
||||
async def raise_exc(parent_tg: asyncio.TaskGroup):
|
||||
parent_tg.cancel()
|
||||
raise RuntimeError
|
||||
|
||||
with self.assertRaises(ExceptionGroup):
|
||||
async with asyncio.TaskGroup() as tg:
|
||||
tg.create_task(raise_exc(tg))
|
||||
await asyncio.sleep(1)
|
||||
|
||||
async def test_taskgroup_cancel_after_exception(self):
|
||||
async def raise_exc(parent_tg: asyncio.TaskGroup):
|
||||
try:
|
||||
raise RuntimeError
|
||||
finally:
|
||||
parent_tg.cancel()
|
||||
|
||||
with self.assertRaises(ExceptionGroup):
|
||||
async with asyncio.TaskGroup() as tg:
|
||||
tg.create_task(raise_exc(tg))
|
||||
await asyncio.sleep(1)
|
||||
|
||||
async def test_taskgroup_body_cancel_before_exception(self):
|
||||
with self.assertRaises(ExceptionGroup):
|
||||
async with asyncio.TaskGroup() as tg:
|
||||
tg.cancel()
|
||||
raise RuntimeError
|
||||
|
||||
async def test_taskgroup_body_cancel_after_exception(self):
|
||||
with self.assertRaises(ExceptionGroup):
|
||||
async with asyncio.TaskGroup() as tg:
|
||||
try:
|
||||
raise RuntimeError
|
||||
finally:
|
||||
tg.cancel()
|
||||
|
||||
async def test_taskgroup_cancel_one_winner(self):
|
||||
async def race(*fns):
|
||||
outcome = None
|
||||
async def run(fn):
|
||||
nonlocal outcome
|
||||
outcome = await fn()
|
||||
tg.cancel()
|
||||
|
||||
async with asyncio.TaskGroup() as tg:
|
||||
for fn in fns:
|
||||
tg.create_task(run(fn))
|
||||
return outcome
|
||||
|
||||
event = asyncio.Event()
|
||||
record = []
|
||||
async def fn_1():
|
||||
record.append("1 started")
|
||||
await event.wait()
|
||||
record.append("1 finished")
|
||||
return 1
|
||||
|
||||
async def fn_2():
|
||||
record.append("2 started")
|
||||
await event.wait()
|
||||
record.append("2 finished")
|
||||
return 2
|
||||
|
||||
async def fn_3():
|
||||
record.append("3 started")
|
||||
event.set()
|
||||
await asyncio.sleep(10)
|
||||
record.append("3 finished")
|
||||
return 3
|
||||
|
||||
self.assertEqual(await race(fn_1, fn_2, fn_3), 1)
|
||||
self.assertListEqual(record, ["1 started", "2 started", "3 started", "1 finished"])
|
||||
|
||||
|
||||
class TestTaskGroup(BaseTestTaskGroup, unittest.IsolatedAsyncioTestCase):
|
||||
loop_factory = asyncio.EventLoop
|
||||
|
||||
@@ -0,0 +1 @@
|
||||
Add :meth:`~asyncio.TaskGroup.cancel` which cancels unfinished tasks and exits the group without error.
|
||||
Reference in New Issue
Block a user