Merged
Changes from 16 commits
Commits
25 commits
19ddd2e
Issue-124397: Add free-threading support for iterators.
rhettinger Apr 22, 2026
4aa242d
Add blurb
rhettinger Apr 23, 2026
e0c44be
More wordsmithing
rhettinger Apr 23, 2026
adcb718
Clarify use of the lock. Add message to the ValueError.
rhettinger Apr 23, 2026
4c2bad0
Include "threading" in the reference
rhettinger Apr 23, 2026
d9dde84
Support send(), throw(), and close() for generators.
rhettinger Apr 23, 2026
fcb9ee8
Tweak wording. Add doctest.
rhettinger Apr 23, 2026
314ec67
Merge branch 'main' into iterator_synchronization
rhettinger Apr 23, 2026
802c1e8
Adopt GPS suggestion to make the instance variables private.
rhettinger Apr 28, 2026
201bc76
Adopt GPS suggestion to add an example to the docstring
rhettinger Apr 28, 2026
2384f46
Add test for exceptions in next() calls
rhettinger Apr 28, 2026
8e291b4
Adopt suggestion for more iterator specific names
rhettinger Apr 30, 2026
adfad71
Test the code blocks
rhettinger Apr 30, 2026
b6601a1
Adopt Colesbury suggestion for a generator example
rhettinger Apr 30, 2026
7df1ef7
Use new name in class reference
rhettinger Apr 30, 2026
4efe3f5
Add whatsnew entry
rhettinger Apr 30, 2026
fa733cf
Adopt Peter's suggestion to not use a lazy import
rhettinger May 1, 2026
0e207fb
Note version added
rhettinger May 1, 2026
4802517
Harmonize the examples into parallel form for easy comparison
rhettinger May 1, 2026
06ed996
Update Lib/threading.py
rhettinger May 1, 2026
a81276a
Update Lib/threading.py
rhettinger May 1, 2026
0c0471f
Update Doc/library/threading.rst
rhettinger May 1, 2026
2563d26
Merge branch 'main' into iterator_synchronization
rhettinger May 1, 2026
ed5dd0f
Defer the import of functools
rhettinger May 1, 2026
fbae726
Adopt suggestion from Daniele to verify the worker threads actually s…
rhettinger May 1, 2026
144 changes: 144 additions & 0 deletions Doc/library/threading.rst
@@ -1436,3 +1436,147 @@ is equivalent to::
Currently, :class:`Lock`, :class:`RLock`, :class:`Condition`,
:class:`Semaphore`, and :class:`BoundedSemaphore` objects may be used as
:keyword:`with` statement context managers.


Iterator synchronization
------------------------

By default, Python iterators do not support concurrent access. Most iterators make
no guarantees when accessed simultaneously from multiple threads. Generator
iterators, for example, raise :exc:`ValueError` if one of their iterator methods
is called while the generator is already executing. The tools in this section
allow reliable concurrency support to be added to ordinary iterators and
iterator-producing callables.
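The generator re-entrancy error mentioned above can be demonstrated deterministically without threads; this hypothetical snippet re-enters a generator from within its own body:

```python
def reenter():
    # Calling next() on a generator while it is already executing
    # raises ValueError, even in a single thread.
    yield next(gen)

gen = reenter()
try:
    next(gen)
    msg = "no error"
except ValueError as exc:
    msg = str(exc)
print(msg)
```

With threads, the same error can appear intermittently whenever two threads advance an unprotected generator at the same time, which is why serialization is needed.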

The :class:`serialize_iterator` wrapper lets multiple threads share a single iterator and
take turns consuming from it. While one thread is running ``__next__()``, the
others block until the iterator becomes available. Each value produced by the
underlying iterator is delivered to exactly one caller.

The :func:`concurrent_tee` function lets multiple threads each receive the full
stream of values from one underlying iterator. It creates independent iterators
that all draw from the same source. Values are buffered until consumed by all
of the derived iterators.

.. class:: serialize_iterator(iterable)

Return an iterator wrapper that serializes concurrent calls to
:meth:`~iterator.__next__` using a lock.

If the wrapped iterator also defines :meth:`~generator.send`,
:meth:`~generator.throw`, or :meth:`~generator.close`, those calls
are serialized as well.

This makes it possible to share a single iterator, including a generator
iterator, between multiple threads. A lock ensures that calls are handled
one at a time. No values are duplicated or skipped by the wrapper itself.
Each item from the underlying iterator is given to exactly one caller.

This wrapper does not copy or buffer values. Threads that call
:func:`next` while another thread is already advancing the iterator will
block until the active call completes.

Example:

.. code-block:: python

import threading

def count():
for i in range(5):
yield i

it = threading.serialize_iterator(count())

def worker():
for item in it:
print(threading.current_thread().name, item)

threads = [threading.Thread(target=worker) for _ in range(2)]
for thread in threads:
thread.start()
for thread in threads:
thread.join()

In this example, each number is printed exactly once, but the work is shared
between the two threads.
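The locking idea can be expressed as a minimal pure-Python sketch. This is hypothetical and not the actual implementation (which also forwards ``send()``, ``throw()``, and ``close()``); the class name ``SerializedIterator`` is invented for illustration:

```python
import threading

class SerializedIterator:
    """Sketch: wrap an iterator so __next__ calls are serialized."""

    def __init__(self, iterable):
        self._iterator = iter(iterable)
        self._lock = threading.Lock()

    def __iter__(self):
        return self

    def __next__(self):
        # Only one thread may advance the underlying iterator at a time;
        # StopIteration propagates and the lock is released on exit.
        with self._lock:
            return next(self._iterator)

it = SerializedIterator(range(5))
results = []

def worker():
    for item in it:
        results.append(item)

threads = [threading.Thread(target=worker) for _ in range(2)]
for t in threads:
    t.start()
for t in threads:
    t.join()
# Each value was delivered to exactly one worker.
```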

.. function:: synchronized_iterator(func)

Wrap an iterator-producing callable so that each iterator it returns is
automatically passed through :class:`serialize_iterator`.

This is especially useful as a :term:`decorator` for generator functions,
allowing their generator-iterators to be consumed from multiple threads.

Example:

.. code-block:: python

import threading

@threading.synchronized_iterator
def counter():
i = 0
while True:
yield i
i += 1

it = counter()

def worker():
for _ in range(5):
print(next(it))

threads = [threading.Thread(target=worker) for _ in range(2)]
for thread in threads:
thread.start()
for thread in threads:
thread.join()

The returned wrapper preserves the metadata of *func*, such as its name and
wrapped function reference.
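A hypothetical sketch of how such a decorator could be built from a serializing wrapper, using :func:`functools.wraps` to preserve the metadata (the names ``SerializedIterator`` and ``synchronized`` are invented for illustration):

```python
import functools
import threading

class SerializedIterator:
    """Sketch: serialize __next__ calls with a lock."""

    def __init__(self, iterable):
        self._iterator = iter(iterable)
        self._lock = threading.Lock()

    def __iter__(self):
        return self

    def __next__(self):
        with self._lock:
            return next(self._iterator)

def synchronized(func):
    # functools.wraps copies __name__ and __doc__ onto the wrapper
    # and records the original function as __wrapped__.
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        return SerializedIterator(func(*args, **kwargs))
    return wrapper

@synchronized
def squares(n):
    for i in range(n):
        yield i * i

print(squares.__name__)        # metadata preserved by functools.wraps
print(list(squares(4)))
```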

.. function:: concurrent_tee(iterable, n=2)

Return *n* independent iterators from a single input *iterable*, with
guaranteed behavior when the derived iterators are consumed concurrently.

This function is similar to :func:`itertools.tee`, but is intended for cases
where the source iterator may feed consumers running in different threads.
Each returned iterator yields every value from the underlying iterable, in
the same order.

Internally, values are buffered until every derived iterator has consumed
them.

The returned iterators share the same underlying synchronization lock. Each
individual derived iterator is intended to be consumed by one thread at a
time. If a single derived iterator must itself be shared by multiple
threads, wrap it with :class:`serialize_iterator`.

If *n* is ``0``, return an empty tuple. If *n* is negative, raise
:exc:`ValueError`.

Example:

.. code-block:: python

import threading

source = (x**2 for x in range(5))
left, right = threading.concurrent_tee(source)

def consume(name, iterable):
for item in iterable:
print(name, item)

t1 = threading.Thread(target=consume, args=("left", left))
t2 = threading.Thread(target=consume, args=("right", right))
t1.start()
t2.start()
t1.join()
t2.join()

In this example, both consumer threads see the full sequence of squares
from a single generator expression.
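The buffering behavior described above can be sketched with per-consumer deques guarded by one shared lock. This is a hypothetical illustration of the semantics, not the real implementation; ``tee_sketch`` is an invented name:

```python
import threading
from collections import deque

def tee_sketch(iterable, n=2):
    """Sketch: n iterators over one source, buffered under a shared lock."""
    if n < 0:
        raise ValueError("n must be non-negative")
    source = iter(iterable)
    lock = threading.Lock()
    queues = [deque() for _ in range(n)]

    def consumer(queue):
        while True:
            with lock:
                if not queue:
                    # Pull one value from the source and buffer it
                    # for every derived iterator.
                    try:
                        value = next(source)
                    except StopIteration:
                        return
                    for q in queues:
                        q.append(value)
                item = queue.popleft()
            yield item

    return tuple(consumer(q) for q in queues)

left, right = tee_sketch(x * x for x in range(5))
```

Each derived iterator pops only from its own deque, so values are retained exactly until every consumer has seen them.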
10 changes: 10 additions & 0 deletions Doc/whatsnew/3.15.rst
@@ -1158,6 +1158,16 @@ tarfile
(Contributed by Christoph Walcher in :gh:`57911`.)


threading
---------

* Added :class:`~threading.serialize_iterator`,
:func:`~threading.synchronized_iterator`,
and :func:`~threading.concurrent_tee` to support concurrent access to
generators and iterators.
(Contributed by Raymond Hettinger in :gh:`124397`.)


timeit
------

221 changes: 221 additions & 0 deletions Lib/test/test_threading.py
@@ -2368,6 +2368,227 @@ class BarrierTests(lock_tests.BarrierTests):
barriertype = staticmethod(threading.Barrier)


## Test Synchronization tools for iterators ################

class ThreadingIteratorToolsTests(BaseTestCase):
def test_serialize_serializes_concurrent_iteration(self):
limit = 10_000
workers_count = 10
result = 0
result_lock = threading.Lock()
start = threading.Event()

def producer(limit):
for x in range(limit):
yield x

def consumer(iterator):
nonlocal result
start.wait()
total = 0
for x in iterator:
total += x
with result_lock:
result += total

iterator = threading.serialize_iterator(producer(limit))
workers = [
threading.Thread(target=consumer, args=(iterator,))
for _ in range(workers_count)
]
with threading_helper.wait_threads_exit():
for worker in workers:
worker.start()
start.set()
for worker in workers:
worker.join()

self.assertEqual(result, limit * (limit - 1) // 2)

def test_serialize_generator_methods(self):
# A generator that yields and receives
def echo():
try:
while True:
val = yield "ready"
yield f"received {val}"
except ValueError:
yield "caught"

it = threading.serialize_iterator(echo())

# Test __next__
self.assertEqual(next(it), "ready")

# Test send()
self.assertEqual(it.send("hello"), "received hello")
self.assertEqual(next(it), "ready")

# Test throw()
self.assertEqual(it.throw(ValueError), "caught")

# Test close()
it.close()
with self.assertRaises(StopIteration):
next(it)

def test_serialize_methods_attribute_error(self):
# A standard iterator that does not have send/throw/close
# should raise AttributeError when called.
standard_it = threading.serialize_iterator([1, 2, 3])

with self.assertRaises(AttributeError):
standard_it.send("foo")

with self.assertRaises(AttributeError):
standard_it.throw(ValueError)

with self.assertRaises(AttributeError):
standard_it.close()

def test_serialize_generator_methods_locking(self):
# Verifies that generator methods also acquire the lock.
# We can test this by checking if the lock is held during the call.

class LockCheckingGenerator:
def __init__(self, lock):
self.lock = lock
def __iter__(self):
return self
def send(self, value):
if not self.lock.locked():
raise RuntimeError("Lock not held during send()")
return value
def throw(self, *args):
if not self.lock.locked():
raise RuntimeError("Lock not held during throw()")
def close(self):
if not self.lock.locked():
raise RuntimeError("Lock not held during close()")

# Manually create the serialize object to inspect the lock
it = threading.serialize_iterator([])
mock_gen = LockCheckingGenerator(it._lock)
it._iterator = mock_gen

# These should not raise RuntimeError
it.send(1)
it.throw(ValueError)
it.close()

def test_serialize_next_exception(self):
# Verify exception pass through for calls to next()

def f():
raise RuntimeError
yield None

g = threading.serialize_iterator(f())
with self.assertRaises(RuntimeError):
next(g)

def test_synchronized_serializes_generator_instances(self):
unique = 10
repetitions = 5
limit = 100
start = threading.Event()

@threading.synchronized_iterator
def atomic_counter():
# The sleep widens the race window that would exist without
# synchronization between yielding a value and advancing state.
i = 0
while True:
yield i
time.sleep(0.0005)
i += 1

def consumer(counter):
start.wait()
for _ in range(limit):
next(counter)

unique_counters = [atomic_counter() for _ in range(unique)]
counters = unique_counters * repetitions
workers = [
threading.Thread(target=consumer, args=(counter,))
for counter in counters
]
with threading_helper.wait_threads_exit():
for worker in workers:
worker.start()
start.set()
for worker in workers:
worker.join()

self.assertEqual(
{next(counter) for counter in unique_counters},
{limit * repetitions},
)

def test_synchronized_preserves_wrapped_metadata(self):
def gen():
yield 1

wrapped = threading.synchronized_iterator(gen)

self.assertEqual(wrapped.__name__, gen.__name__)
self.assertIs(wrapped.__wrapped__, gen)
self.assertEqual(list(wrapped()), [1])

def test_concurrent_tee_supports_concurrent_consumers(self):
limit = 5_000
num_threads = 25
successes = 0
failures = []
result_lock = threading.Lock()
start = threading.Event()
expected = list(range(limit))

def producer(limit):
for x in range(limit):
yield x

def consumer(iterator):
nonlocal successes
start.wait()
items = list(iterator)
with result_lock:
if items == expected:
successes += 1
else:
failures.append(items[:20])

tees = threading.concurrent_tee(producer(limit), n=num_threads)
workers = [
threading.Thread(target=consumer, args=(iterator,))
for iterator in tees
]
with threading_helper.wait_threads_exit():
for worker in workers:
worker.start()
start.set()
for worker in workers:
worker.join()

self.assertEqual(failures, [])
self.assertEqual(successes, len(tees))

# Verify that locks are shared
self.assertEqual(len({id(t_obj.lock) for t_obj in tees}), 1)

def test_concurrent_tee_zero_iterators(self):
self.assertEqual(threading.concurrent_tee(range(10), n=0), ())

def test_concurrent_tee_negative_n(self):
with self.assertRaises(ValueError):
threading.concurrent_tee(range(10), n=-1)


#################



class MiscTestCase(unittest.TestCase):
def test__all__(self):
restore_default_excepthook(self)