diff --git a/.github/workflows/unittest.yml b/.github/workflows/unittest.yml index fbc38026..14692a64 100644 --- a/.github/workflows/unittest.yml +++ b/.github/workflows/unittest.yml @@ -22,6 +22,17 @@ jobs: unittest: # The type of runner that the job will run on runs-on: ubuntu-latest + strategy: + matrix: + config: [latest] + include: + - config: latest + PYARROW_VERSION: "6.0.1" + NUMPY_VERSION: "1.21.5" + TF_VERSION: "2.8.0" + PYSPARK_VERSION: "3.0.0" + ARROW_PRE_0_15_IPC_FORMAT: "0" + PY: "3.9" # Steps represent a sequence of tasks that will be executed as part of the job steps: @@ -40,15 +51,15 @@ jobs: - name: build and run unit tests run: | sleep 30 - export PYARROW_VERSION="6.0.1" - export NUMPY_VERSION="1.21.5" - export TF_VERSION="2.8.0" - export PY="3.9" - export PYSPARK_VERSION="3.0.0" - export ARROW_PRE_0_15_IPC_FORMAT="0" + export PYARROW_VERSION=${{matrix.PYARROW_VERSION}} + export NUMPY_VERSION=${{matrix.NUMPY_VERSION}} + export TF_VERSION=${{matrix.TF_VERSION}} + export PY=${{matrix.PY}} + export PYSPARK_VERSION=${{matrix.PYSPARK_VERSION}} + export ARROW_PRE_0_15_IPC_FORMAT=${{matrix.ARROW_PRE_0_15_IPC_FORMAT}} export RUN="docker exec -e ARROW_PRE_0_15_IPC_FORMAT=$ARROW_PRE_0_15_IPC_FORMAT petastorm_ci bash /run_in_venv.sh ${PY}" export PYTEST="pytest --timeout=360 -v --color=yes --cov=./ --cov-report xml:coverage.xml" - $RUN pip install -U pip "setuptools<70" + $RUN pip install -U pip "setuptools<70" $RUN pip install -e /petastorm/[test,tf,torch,docs,opencv] $RUN pip install --upgrade numpy==$NUMPY_VERSION $RUN pip install -U pyarrow==${PYARROW_VERSION} tensorflow==${TF_VERSION} pyspark==${PYSPARK_VERSION} @@ -77,6 +88,12 @@ jobs: petastorm/tests/test_pytorch_utils.py $RUN $PYTEST -Y --cov-append petastorm/tests/test_tf_autograph.py + - name: codecov + uses: codecov/codecov-action@v5 + with: + token: ${{ secrets.CODECOV_TOKEN }} + required: true + draft_release: needs: unittest # Only come with a tag @@ -129,4 +146,4 @@ jobs: uses: pypa/gh-action-pypi-publish@release/v1 with: user: __token__ - password: ${{ secrets.PYPI_API_TOKEN }} + password: ${{ secrets.PYPI_API_TOKEN }} \ No newline at end of file diff --git a/petastorm/arrow_reader_worker.py b/petastorm/arrow_reader_worker.py index fedad01d..7959d9a0 100644 --- a/petastorm/arrow_reader_worker.py +++ b/petastorm/arrow_reader_worker.py @@ -101,7 +101,10 @@ def __init__(self, worker_id, publish_func, args): self._transformed_schema = args[7] self._arrow_filters = args[8] self._shuffle_rows = args[9] - self._random_state = np.random.RandomState(seed=args[10]) + self._random_seed = args[10] + + # Initialize random number generator + self._rng = np.random.default_rng(self._random_seed) if self._ngram: raise NotImplementedError('ngrams are not supported by ArrowReaderWorker') @@ -289,9 +292,19 @@ def _read_with_shuffle_row_drop(self, piece, pq_file, column_names, shuffle_row_ # pyarrow would fail if we request a column names that the dataset is partitioned by table = piece.read(columns=column_names - partition_names, partitions=self._dataset.partitions) + + # Handle row shuffling based on shuffle_rows setting if self._shuffle_rows: - indices = self._random_state.permutation(table.num_rows) - table = table.take(indices) + if self._random_seed is not None and self._random_seed != 0: + # Deterministic randomization: use provided seed + indices = self._rng.permutation(table.num_rows) + else: + # Non-deterministic randomization: use np.random directly + indices = np.random.permutation(table.num_rows) + else: + # Deterministic natural order: shuffle_rows=False + indices = np.arange(table.num_rows) + table = table.take(indices) # Drop columns we did not explicitly request. This may happen when a table is partitioned. Besides columns # requested, pyarrow will also return partition values. Having these unexpected fields will break some diff --git a/petastorm/reader.py b/petastorm/reader.py index 8fa69935..22f591eb 100644 --- a/petastorm/reader.py +++ b/petastorm/reader.py @@ -42,7 +42,7 @@ # Ventilator guarantees that no more than workers + _VENTILATE_EXTRA_ROWGROUPS are processed at a moment by a # worker pool. This guarantees that we don't run out of memory if data consumer is slower than the Reader. -_VENTILATE_EXTRA_ROWGROUPS = 2 +_VENTILATE_EXTRA_ROWGROUPS = 3 LOCAL_DISK_CACHE = 'local-disk' NULL_CACHE = 'null' @@ -475,7 +475,7 @@ def __init__(self, pyarrow_filesystem, dataset_path, schema_fields=None, self.ventilator = self._create_ventilator(filtered_row_group_indexes, shuffle_row_groups, normalized_shuffle_row_drop_partitions, self.num_epochs, worker_predicate, - self._workers_pool.workers_count + _VENTILATE_EXTRA_ROWGROUPS, + self._workers_pool.workers_count * (1 + _VENTILATE_EXTRA_ROWGROUPS), seed) # 5. Start workers pool diff --git a/petastorm/workers_pool/tests/test_workers_pool.py b/petastorm/workers_pool/tests/test_workers_pool.py index 142c608a..a3029476 100644 --- a/petastorm/workers_pool/tests/test_workers_pool.py +++ b/petastorm/workers_pool/tests/test_workers_pool.py @@ -141,15 +141,17 @@ def test_stop_when_result_queue_is_full(self): SLEEP_DELTA = 0.01 TIMEOUT = 20 QUEUE_SIZE = 2 + WORKERS_COUNT = 10 - pool = ThreadPool(10, results_queue_size=QUEUE_SIZE) + pool = ThreadPool(WORKERS_COUNT, results_queue_size=QUEUE_SIZE) pool.start(WorkerIdGeneratingWorker) - for _ in range(100): + for _ in range(1000): pool.ventilate() + expected_queue_size = WORKERS_COUNT * max(5, QUEUE_SIZE // WORKERS_COUNT) cumulative_wait = 0 - while pool.results_qsize() != QUEUE_SIZE: + while pool.results_qsize() != expected_queue_size: time.sleep(SLEEP_DELTA) cumulative_wait += SLEEP_DELTA # Make sure we wait no longer than the timeout. Otherwise, something is very wrong diff --git a/petastorm/workers_pool/thread_pool.py b/petastorm/workers_pool/thread_pool.py index 649aa77f..c7e6a46b 100644 --- a/petastorm/workers_pool/thread_pool.py +++ b/petastorm/workers_pool/thread_pool.py @@ -90,16 +90,21 @@ def __init__(self, workers_count, results_queue_size=50, profiling_enabled=False """ self._seed = random.randint(0, 100000) self._workers = [] - self._ventilator_queue = None + self._ventilator_queues = [] self.workers_count = workers_count self._results_queue_size = results_queue_size # Worker threads will watch this event and gracefully shutdown when the event is set self._stop_event = Event() self._profiling_enabled = profiling_enabled + # Count of items ventilated by the pool self._ventilated_items = 0 - self._ventilated_items_processed = 0 + # Count of items ventilated by each worker + self._ventilated_items_by_worker = [0 for _ in range(self.workers_count)] + # Count of items processed by each worker + self._ventilated_items_processed_by_worker = [0 for _ in range(self.workers_count)] self._ventilator = None + self._get_results_worker_id = 0 def start(self, worker_class, worker_args=None, ventilator=None): """Starts worker threads. @@ -115,14 +120,19 @@ class must implement :class:`.WorkerBase` protocol raise RuntimeError('ThreadPool({}) cannot be reused! stop_event set? {}' .format(len(self._workers), self._stop_event.is_set())) - # Set up a channel to send work - self._ventilator_queue = queue.Queue() - self._results_queue = queue.Queue(self._results_queue_size) + # Set up a channel for each worker to send work + self._ventilator_queues = [queue.Queue() for _ in range(self.workers_count)] + # Set up a channel for each worker to send results + self._results_queues = [queue.Queue(5) for _ in range(self.workers_count)] self._workers = [] for worker_id in range(self.workers_count): - worker_impl = worker_class(worker_id, self._stop_aware_put, worker_args) - new_thread = WorkerThread(worker_impl, self._stop_event, self._ventilator_queue, - self._results_queue, self._profiling_enabled) + # Create a closure that captures the worker_id for this specific worker + def make_publish_func(worker_id): + return lambda data: self._stop_aware_put(worker_id, data) + + worker_impl = worker_class(worker_id, make_publish_func(worker_id), worker_args) + new_thread = WorkerThread(worker_impl, self._stop_event, self._ventilator_queues[worker_id], + self._results_queues[worker_id], self._profiling_enabled) # Make the thread daemonic. Since it only reads it's ok to abort while running - no resource corruption # will occur. new_thread.daemon = True @@ -139,8 +149,22 @@ class must implement :class:`.WorkerBase` protocol def ventilate(self, *args, **kargs): """Sends a work item to a worker process. Will result in ``worker.process(...)`` call with arbitrary arguments. """ + current_worker_id = self._ventilated_items % self.workers_count self._ventilated_items += 1 - self._ventilator_queue.put((args, kargs)) + self._ventilated_items_by_worker[current_worker_id] += 1 + self._ventilator_queues[current_worker_id].put((args, kargs)) + + def current_worker_done(self, worker_id): + # Check if the current worker has processed all the items it was assigned and if the results queue is empty + return (self._ventilated_items_processed_by_worker[worker_id] == self._ventilated_items_by_worker[worker_id] + and self._results_queues[worker_id].empty()) + + def all_workers_done(self): + # Check if all workers have processed all the items they were assigned and if the results queues are empty + for i in range(self.workers_count): + if not self.current_worker_done(i): + return False + return True def get_results(self): """Returns results from worker pool or re-raise worker's exception if any happen in worker thread. @@ -151,20 +175,28 @@ def get_results(self): :return: arguments passed to ``publish_func(...)`` by a worker. If no more results are anticipated, :class:`.EmptyResultError`. """ - while True: # If there is no more work to do, raise an EmptyResultError - if self._results_queue.empty() and self._ventilated_items == self._ventilated_items_processed: + if self.all_workers_done(): # We also need to check if we are using a ventilator and if it is completed if not self._ventilator or self._ventilator.completed(): raise EmptyResultError() + # If the current worker is done, we need to get the result from the next worker + if self.current_worker_done(self._get_results_worker_id): + self._get_results_worker_id = (self._get_results_worker_id + 1) % self.workers_count + continue + try: - result = self._results_queue.get(timeout=_VERIFY_END_OF_VENTILATION_PERIOD) + result = self._results_queues[self._get_results_worker_id].get( + block=True, timeout=_VERIFY_END_OF_VENTILATION_PERIOD + ) if isinstance(result, VentilatedItemProcessedMessage): - self._ventilated_items_processed += 1 + self._ventilated_items_processed_by_worker[self._get_results_worker_id] += 1 if self._ventilator: self._ventilator.processed_item() + # Move to the next worker + self._get_results_worker_id = (self._get_results_worker_id + 1) % self.workers_count continue elif isinstance(result, Exception): self.stop() @@ -197,7 +229,7 @@ def join(self): stats = pstats.Stats(w.prof) stats.sort_stats('cumulative').print_stats() - def _stop_aware_put(self, data): + def _stop_aware_put(self, worker_id, data): """This method is called to write the results to the results queue. We use ``put`` in a non-blocking way so we can gracefully terminate the worker thread without being stuck on :func:`Queue.put`. @@ -205,7 +237,7 @@ def _stop_aware_put(self, data): :func:`WorkerThread.run` which will gracefully terminate main worker loop.""" while True: try: - self._results_queue.put(data, block=True, timeout=IO_TIMEOUT_INTERVAL_S) + self._results_queues[worker_id].put(data, block=True, timeout=IO_TIMEOUT_INTERVAL_S) return except queue.Full: pass @@ -214,7 +246,7 @@ def _stop_aware_put(self, data): raise WorkerTerminationRequested() def results_qsize(self): - return self._results_queue.qsize() + return sum(queue.qsize() for queue in self._results_queues) @property def diagnostics(self): diff --git a/petastorm/workers_pool/ventilator.py b/petastorm/workers_pool/ventilator.py index 0f26bec1..9f6bfb4f 100644 --- a/petastorm/workers_pool/ventilator.py +++ b/petastorm/workers_pool/ventilator.py @@ -98,7 +98,8 @@ def __init__(self, self._items_to_ventilate = items_to_ventilate self._iterations_remaining = iterations self._randomize_item_order = randomize_item_order - self._random_state = np.random.RandomState(seed=random_seed) + self._random_seed = random_seed + self._rng = np.random.default_rng(self._random_seed) self._iterations = iterations # For the default max ventilation queue size we will use the size of the items to ventilate @@ -136,15 +137,20 @@ def reset(self): self.start() def _ventilate(self): + # Randomize the item order before starting the ventilation if randomize_item_order is set + if self._randomize_item_order: + if self._random_seed is not None and self._random_seed != 0: + # Deterministic randomization: use provided seed + self._items_to_ventilate = list(self._rng.permutation(self._items_to_ventilate)) + else: + # Non-deterministic randomization: use np.random + self._items_to_ventilate = list(np.random.permutation(self._items_to_ventilate)) + while True: # Stop condition is when no iterations are remaining or there are no items to ventilate if self.completed(): break - # If we are ventilating the first item, we check if we would like to randomize the item order - if self._current_item_to_ventilate == 0 and self._randomize_item_order: - self._random_state.shuffle(self._items_to_ventilate) - # Block until queue has room, but use continue to allow for checking if stop has been called if self._ventilated_items_count - self._processed_items_count >= self._max_ventilation_queue_size: sleep(self._ventilation_interval) diff --git a/setup.py b/setup.py index 0d06ab20..ab69bbf8 100644 --- a/setup.py +++ b/setup.py @@ -39,7 +39,7 @@ 'psutil>=4.0.0', 'pyspark>=2.1.0', 'pyzmq>=14.0.0', - 'pyarrow>=0.17.1', + 'pyarrow>=6.0.1', 'six>=1.5.0', 'fsspec', 'setuptools<70', # Prevent compatibility issues with newer setuptools