
Commit 19ddd2e

Issue-124397: Add free-threading support for iterators.
1 parent b168865 commit 19ddd2e

3 files changed

Lines changed: 381 additions & 0 deletions


Doc/library/threading.rst

Lines changed: 135 additions & 0 deletions
@@ -1436,3 +1436,138 @@ is equivalent to::

Currently, :class:`Lock`, :class:`RLock`, :class:`Condition`,
:class:`Semaphore`, and :class:`BoundedSemaphore` objects may be used as
:keyword:`with` statement context managers.


Iterator synchronization
------------------------

By default, Python iterators do not support concurrent use. Most iterators make
no guarantees when accessed simultaneously from multiple threads. Generator
iterators, for example, raise :exc:`ValueError` if one of their iterator methods
is called while the generator is already executing. The tools in this section
allow reliable concurrency support to be added to ordinary iterators and
iterator-producing callables.
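The generator restriction mentioned above can be observed even without threads: a re-entrant ``__next__()`` call is rejected. A minimal single-threaded sketch (not from the patch itself):

```python
def gen():
    # Re-entrantly advancing the same generator while it is
    # executing is rejected by CPython with ValueError.
    yield next(it)

it = gen()
try:
    next(it)
except ValueError as exc:
    print(exc)  # generator already executing
```

With threads, the same error surfaces nondeterministically whenever two threads race to advance one unprotected generator, which is what :class:`serialize` prevents.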
Use :class:`serialize` when multiple threads should share a single iterator and
take turns consuming from it. While one thread is running ``__next__()``, the
others block until the iterator becomes available. Each value produced by the
underlying iterator is delivered to exactly one caller.

Use :func:`concurrent_tee` when multiple threads should each receive the full
stream of values from one underlying iterator. It creates independent iterators
that all draw from the same source. Values are buffered until every derived
iterator has received them.
.. class:: serialize(iterable)

   Return an iterator wrapper that serializes concurrent calls to
   :meth:`~iterator.__next__` using a lock.

   This makes it possible to share a single iterator, including a generator
   iterator, between multiple threads. Calls are handled one at a time, in an
   arrival order determined by lock acquisition. No values are duplicated or
   skipped by the wrapper itself; each item produced by the underlying iterator
   is returned to exactly one caller.

   This wrapper does not copy or buffer values. Threads that call
   :func:`next` while another thread is already advancing the iterator will
   block until the active call completes.

   Example::

      import threading

      def count():
          for i in range(5):
              yield i

      it = threading.serialize(count())

      def worker():
          for item in it:
              print(threading.current_thread().name, item)

      threads = [threading.Thread(target=worker) for _ in range(2)]
      for thread in threads:
          thread.start()
      for thread in threads:
          thread.join()

   In this example, each number is printed exactly once, but the work is shared
   between the two threads.

.. function:: synchronized(func)

   Wrap an iterator-producing callable so that each iterator it returns is
   automatically passed through :class:`serialize`.

   This is especially useful as a decorator for generator functions that may be
   consumed from multiple threads.

   Example::

      import threading

      @threading.synchronized
      def counter():
          i = 0
          while True:
              yield i
              i += 1

      it = counter()

      def worker():
          for _ in range(3):
              print(next(it))

      threads = [threading.Thread(target=worker) for _ in range(2)]
      for thread in threads:
          thread.start()
      for thread in threads:
          thread.join()

   The returned wrapper preserves the metadata of *func*, such as its name and
   wrapped function reference.
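The metadata-preserving behavior described above matches what :func:`functools.wraps` provides. A hypothetical re-implementation sketch (``synchronized_sketch`` and ``_Serialized`` are illustrative names, not the patch's actual code; the lock-guarded proxy stands in for :class:`serialize`):

```python
import functools
import threading

def synchronized_sketch(func):
    # Illustrative proxy: guards each __next__ call with a lock,
    # standing in for the real serialize() wrapper.
    class _Serialized:
        def __init__(self, it):
            self._it = it
            self._lock = threading.Lock()
        def __iter__(self):
            return self
        def __next__(self):
            with self._lock:
                return next(self._it)

    @functools.wraps(func)  # copies __name__, sets __wrapped__
    def wrapper(*args, **kwargs):
        return _Serialized(func(*args, **kwargs))
    return wrapper

@synchronized_sketch
def gen():
    yield 1

print(gen.__name__)             # gen
print(gen.__wrapped__.__name__) # gen -- the undecorated function
```

This is why ``wrapped.__name__`` and ``wrapped.__wrapped__`` checks in the accompanying tests pass: :func:`functools.wraps` installs both attributes on the wrapper.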

.. function:: concurrent_tee(iterable, n=2)

   Return *n* independent iterators from a single input *iterable*, with
   guaranteed behavior when the derived iterators are consumed concurrently.

   This function is similar to :func:`itertools.tee`, but is intended for cases
   where the source iterator may feed consumers running in different threads.
   Each returned iterator yields every value from the underlying iterable, in
   the same order.

   Internally, values are buffered until every derived iterator has consumed
   them. As a result, if one consumer falls far behind the others, the buffer
   may grow without bound.
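The same unbounded-buffer behavior can already be seen with :func:`itertools.tee`, which this function is modeled on (the assumption here is that ``concurrent_tee`` buffers analogously):

```python
import itertools

a, b = itertools.tee(range(100))

# Drain `a` completely while `b` is untouched: tee must buffer
# all 100 values internally so that `b` can still yield them.
consumed = list(a)
print(len(consumed))  # 100
print(next(b))        # 0 -- b still starts at the beginning
```

Keeping the derived iterators roughly in step avoids this buildup.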
   The returned iterators share the same underlying synchronization lock. Each
   individual derived iterator is intended to be consumed by one thread at a
   time. If a single derived iterator must itself be shared by multiple
   threads, wrap it with :class:`serialize`.

   If *n* is ``0``, return an empty tuple. If *n* is negative, raise
   :exc:`ValueError`.

   Example::

      import threading

      source = range(5)
      left, right = threading.concurrent_tee(source)

      def consume(name, iterable):
          for item in iterable:
              print(name, item)

      t1 = threading.Thread(target=consume, args=("left", left))
      t2 = threading.Thread(target=consume, args=("right", right))
      t1.start()
      t2.start()
      t1.join()
      t2.join()

   Here, both threads see the full sequence ``0`` through ``4``.

Lib/test/test_threading.py

Lines changed: 139 additions & 0 deletions
@@ -2368,6 +2368,145 @@ class BarrierTests(lock_tests.BarrierTests):
    barriertype = staticmethod(threading.Barrier)


## Test Synchronization tools for iterators ################

class ThreadingIteratorToolsTests(BaseTestCase):
    def test_serialize_serializes_concurrent_iteration(self):
        limit = 10_000
        workers_count = 10
        result = 0
        result_lock = threading.Lock()
        start = threading.Event()

        def producer(limit):
            for x in range(limit):
                yield x

        def consumer(iterator):
            nonlocal result
            start.wait()
            total = 0
            for x in iterator:
                total += x
            with result_lock:
                result += total

        iterator = threading.serialize(producer(limit))
        workers = [
            threading.Thread(target=consumer, args=(iterator,))
            for _ in range(workers_count)
        ]
        with threading_helper.wait_threads_exit():
            for worker in workers:
                worker.start()
            start.set()
            for worker in workers:
                worker.join()

        self.assertEqual(result, limit * (limit - 1) // 2)

    def test_synchronized_serializes_generator_instances(self):
        unique = 10
        repetitions = 5
        limit = 100
        start = threading.Event()

        @threading.synchronized
        def atomic_counter():
            # The sleep widens the race window that would exist without
            # synchronization between yielding a value and advancing state.
            i = 0
            while True:
                yield i
                time.sleep(0.0005)
                i += 1

        def consumer(counter):
            start.wait()
            for _ in range(limit):
                next(counter)

        unique_counters = [atomic_counter() for _ in range(unique)]
        counters = unique_counters * repetitions
        workers = [
            threading.Thread(target=consumer, args=(counter,))
            for counter in counters
        ]
        with threading_helper.wait_threads_exit():
            for worker in workers:
                worker.start()
            start.set()
            for worker in workers:
                worker.join()

        self.assertEqual(
            {next(counter) for counter in unique_counters},
            {limit * repetitions},
        )

    def test_synchronized_preserves_wrapped_metadata(self):
        def gen():
            yield 1

        wrapped = threading.synchronized(gen)

        self.assertEqual(wrapped.__name__, gen.__name__)
        self.assertIs(wrapped.__wrapped__, gen)
        self.assertEqual(list(wrapped()), [1])

    def test_concurrent_tee_supports_concurrent_consumers(self):
        limit = 5_000
        num_threads = 25
        successes = 0
        failures = []
        result_lock = threading.Lock()
        start = threading.Event()
        expected = list(range(limit))

        def producer(limit):
            for x in range(limit):
                yield x

        def consumer(iterator):
            nonlocal successes
            start.wait()
            items = list(iterator)
            with result_lock:
                if items == expected:
                    successes += 1
                else:
                    failures.append(items[:20])

        tees = threading.concurrent_tee(producer(limit), n=num_threads)
        workers = [
            threading.Thread(target=consumer, args=(iterator,))
            for iterator in tees
        ]
        with threading_helper.wait_threads_exit():
            for worker in workers:
                worker.start()
            start.set()
            for worker in workers:
                worker.join()

        self.assertEqual(failures, [])
        self.assertEqual(successes, len(tees))

        # Verify that locks are shared
        self.assertEqual(len({id(t_obj.lock) for t_obj in tees}), 1)

    def test_concurrent_tee_zero_iterators(self):
        self.assertEqual(threading.concurrent_tee(range(10), n=0), ())

    def test_concurrent_tee_negative_n(self):
        with self.assertRaises(ValueError):
            threading.concurrent_tee(range(10), n=-1)


#################


class MiscTestCase(unittest.TestCase):
    def test__all__(self):
        restore_default_excepthook(self)