Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 2 additions & 45 deletions python/cudf_polars/cudf_polars/testing/asserts.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,11 @@
"assert_sink_result_equal",
]

# Will be overridden by `conftest.py` with the value from the `--executor`
# command-line argument.
DEFAULT_EXECUTOR = "in-memory"


def assert_gpu_result_equal(
lazydf: pl.LazyFrame,
*,
engine: GPUEngine | None = None,
engine: GPUEngine,
collect_kwargs: CollectKwargs | None = None,
polars_collect_kwargs: CollectKwargs | None = None,
cudf_collect_kwargs: CollectKwargs | None = None,
Expand All @@ -45,7 +41,6 @@ def assert_gpu_result_equal(
rtol: float = 1e-05,
atol: float = 1e-08,
categorical_as_str: bool = False,
executor: str | None = None,
) -> None:
"""
Assert that collection of a lazyframe on GPU produces correct results.
Expand Down Expand Up @@ -83,9 +78,6 @@ def assert_gpu_result_equal(
Absolute tolerance for float comparisons
categorical_as_str
Decat categoricals to strings before comparing
executor
The executor configuration to pass to `GPUEngine`. If not specified,
uses the module-level `DEFAULT_EXECUTOR` attribute.

Raises
------
Expand All @@ -94,7 +86,6 @@ def assert_gpu_result_equal(
NotImplementedError
If GPU collection failed in some way.
"""
engine = engine or get_default_engine(executor)
final_polars_collect_kwargs, final_cudf_collect_kwargs = _process_kwargs(
collect_kwargs, polars_collect_kwargs, cudf_collect_kwargs
)
Expand Down Expand Up @@ -167,35 +158,6 @@ def assert_ir_translation_raises(q: pl.LazyFrame, *exceptions: type[Exception])
raise AssertionError(f"Translation DID NOT RAISE {exceptions}")


def get_default_engine(
    executor: str | None = None,
) -> GPUEngine:
    """
    Build the GPU engine that the test helpers use by default.

    Parameters
    ----------
    executor
        Executor configuration forwarded to `GPUEngine`. When omitted (or
        otherwise falsy), the module-level `DEFAULT_EXECUTOR` is used.

    Returns
    -------
    engine
        A polars GPUEngine created with ``raise_on_fail=True`` and the
        selected executor.

    See Also
    --------
    assert_gpu_result_equal
    assert_sink_result_equal
    """
    # Any falsy value (``None`` or an empty string) selects the default.
    selected = executor if executor else DEFAULT_EXECUTOR
    return GPUEngine(raise_on_fail=True, executor=selected)


def _process_kwargs(
collect_kwargs: CollectKwargs | None,
polars_collect_kwargs: CollectKwargs | None,
Expand Down Expand Up @@ -311,10 +273,9 @@ def assert_sink_result_equal(
lazydf: pl.LazyFrame,
path: str | Path,
*,
engine: str | GPUEngine | None = None,
engine: GPUEngine,
read_kwargs: dict | None = None,
write_kwargs: dict | None = None,
executor: str | None = None,
) -> None:
"""
Assert that writing a LazyFrame via sink produces the same output.
Expand All @@ -332,9 +293,6 @@ def assert_sink_result_equal(
Optional keyword arguments to pass to the corresponding `pl.read_*` function.
write_kwargs
Optional keyword arguments to pass to the corresponding `sink_*` function.
executor
The executor configuration to pass to `GPUEngine`. If not specified,
uses the module-level `DEFAULT_EXECUTOR` attribute.

Raises
------
Expand All @@ -343,7 +301,6 @@ def assert_sink_result_equal(
ValueError
If the file extension is not one of the supported formats.
"""
engine = engine or get_default_engine(executor)
path = Path(path)
read_kwargs = read_kwargs or {}
write_kwargs = write_kwargs or {}
Expand Down
81 changes: 37 additions & 44 deletions python/cudf_polars/cudf_polars/testing/engine_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,9 @@

import importlib.util
from dataclasses import dataclass
from typing import TYPE_CHECKING, Any, Literal
from typing import TYPE_CHECKING, Any, Literal, TypeVar

if TYPE_CHECKING:
from collections.abc import Mapping
from contextlib import AbstractContextManager

import polars as pl
Expand Down Expand Up @@ -103,7 +102,6 @@ def warns_on_spmd( # pragma: no cover; rapidsmpf-only path

def create_streaming_options(
blocksize_mode: Literal["medium", "small"],
overrides: StreamingOptions | None = None,
) -> StreamingOptions:
"""
Create :class:`StreamingOptions` for a block size mode.
Expand All @@ -114,13 +112,10 @@ def create_streaming_options(
Block size configuration. ``"medium"`` uses moderate partition sizes,
while ``"small"`` uses very small partitions and sets
``fallback_mode=SILENT`` to avoid excessive warnings from CPU fallback.
overrides
Optional options to merge on top of the selected baseline. Fields in
``overrides`` take precedence over the baseline.

Returns
-------
The merged streaming options.
The streaming options for the given block size.
"""
from cudf_polars.experimental.rapidsmpf.frontend.options import StreamingOptions
from cudf_polars.utils.config import StreamingFallbackMode
Expand All @@ -130,15 +125,15 @@ def create_streaming_options(
# the UUID-collision guard on every ``_reset(...)``.
match blocksize_mode:
case "medium":
baseline = StreamingOptions(
return StreamingOptions(
max_rows_per_partition=50,
dynamic_planning={},
target_partition_size=1_000_000,
raise_on_fail=True,
allow_gpu_sharing=True,
)
case "small":
baseline = StreamingOptions(
return StreamingOptions(
max_rows_per_partition=4,
dynamic_planning={},
target_partition_size=10,
Expand All @@ -148,54 +143,52 @@ def create_streaming_options(
)
case _: # pragma: no cover
raise ValueError(f"Unknown blocksize_mode: {blocksize_mode!r}")
if overrides is None:
return baseline
return StreamingOptions(**{**baseline.to_dict(), **overrides.to_dict()})


def build_streaming_engine(
param: EngineFixtureParam,
engines: Mapping[str, StreamingEngine],
options: StreamingOptions | None = None,
) -> StreamingEngine:
def merge_streaming_options(
base: StreamingOptions, overrides: StreamingOptions
) -> StreamingOptions:
"""
Return ``engines``'s entry for ``param``, ``_reset``-ed.
Merge override options into the base streaming options.

``engines`` must already contain a slot for ``param.engine_name`` —
seeded by the ``streaming_engines`` session-scoped fixture. The
fixture owns mutation; this function only reads and ``_reset``-s.
Parameters
----------
base
The base streaming options.
overrides
Any additional streaming options.

Returns
-------
The merged streaming options with overrides overriding any base options.
"""
from cudf_polars.experimental.rapidsmpf.frontend.options import StreamingOptions

return StreamingOptions(**{**base.to_dict(), **overrides.to_dict()})


EngineT = TypeVar("EngineT", bound="StreamingEngine")


def configure_streaming_engine(engine: EngineT, options: StreamingOptions) -> EngineT:
"""
Configure an engine with a set of options.

Parameters
----------
param
Decoded engine fixture parameter describing the backend and block size mode.
engines
Streaming-engine collection keyed by backend name. Provided by
the ``streaming_engines`` test fixture.
engine
Streaming engine to configure. The caller owns its lifecycle.
options
Optional streaming options to merge on top of the baseline selected by
``param.blocksize_mode``.
Configuration options to apply to the engine.

Returns
-------
The shared :class:`StreamingEngine`, ``_reset`` to the requested options.

Raises
------
RuntimeError
If ``engines`` has no slot for ``param.engine_name``.
``engine``, reset to the requested options.
"""
streaming_options = create_streaming_options(param.blocksize_mode, options)
engine = engines.get(param.engine_name)
if engine is None: # pragma: no cover
raise RuntimeError(
f"No streaming engine for {param.engine_name!r}. The corresponding "
"session-scoped fixture must populate the collection before tests run."
)
engine._reset(
rapidsmpf_options=streaming_options.to_rapidsmpf_options(),
executor_options=streaming_options.to_executor_options(),
engine_options=streaming_options.to_engine_options(),
rapidsmpf_options=options.to_rapidsmpf_options(),
executor_options=options.to_executor_options(),
engine_options=options.to_engine_options(),
)
return engine

Expand Down
Loading
Loading