@@ -80,6 +80,7 @@
from cudf_polars.experimental.explain import SerializablePlan
from cudf_polars.experimental.rapidsmpf.frontend.core import StreamingEngine
from cudf_polars.experimental.rapidsmpf.frontend.options import StreamingOptions

POLARS_VALIDATION_OPTIONS = {
"check_row_order": True,
"check_column_order": True,
@@ -544,7 +545,7 @@ def from_args(cls, args: argparse.Namespace) -> RunConfig:
duckdb_temp_dir=args.duckdb_temp_dir,
)

def serialize(self, engine: pl.GPUEngine | None) -> dict:
def serialize(self, engine: StreamingEngine | None) -> dict:
"""Serialize the run config to a dictionary."""
opts = self.streaming_options
result: dict[str, Any] = {
@@ -583,7 +584,21 @@ def serialize(self, engine: pl.GPUEngine | None) -> dict:
}
if engine is not None:
config_options = ConfigOptions.from_polars_engine(engine)
result["config_options"] = dataclasses.asdict(config_options)
# Drop non-serializable contexts.
config_options = dataclasses.replace(
config_options,
executor=dataclasses.replace(
config_options.executor,
spmd_context=None,
ray_context=None,
dask_context=None,
),
)
rapidsmpf_options = engine.rapidsmpf_options.get_strings()
result["config_options"] = {
"config_options": dataclasses.asdict(config_options),
"rapidsmpf_options": rapidsmpf_options,
}
Comment on lines +597 to +601
⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Keep serialized RapidsMPF options in sync with engine resets.

serialize() now trusts engine.rapidsmpf_options, but the frontend _reset() paths rebuild their live Context from new Options objects without updating that cached attribute. After a reset, this JSON will report the old settings instead of the ones actually running. Please either refresh self.rapidsmpf_options in each _reset() (see the sketch below) or serialize from the same freshly resolved options object used by the reset path.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In
`@python/cudf_polars/cudf_polars/experimental/benchmarks/utils_new_frontends.py`
around lines 597 - 601, The serialized RapidsMPF options are stale because
serialize() reads engine.rapidsmpf_options while the frontend _reset() path
rebuilds the live Context from new Options without updating that cached
attribute; fix by ensuring the source of truth is consistent — either update
self.rapidsmpf_options inside every frontend _reset() (the methods that rebuild
Context/Options) to reflect the newly resolved Options, or change serialize() to
read/serialize from the same freshly resolved Options object used in the reset
path (the local variable passed into Context rebuild) instead of
engine.rapidsmpf_options; reference methods: serialize(), _reset(),
engine.rapidsmpf_options, self.rapidsmpf_options, Context, Options.
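
A minimal sketch of the first option, for illustration only — refreshing the cached attribute inside a frontend _reset(). resolve_rapidsmpf_options, Context.from_options, self._comm, self._ctx, and _cleanup_ctx follow the diffs below; the _reset signature and self._mr are assumptions, not the actual frontend API:

    def _reset(self, rapidsmpf_options=None):
        # Resolve once and cache on the engine, so the rebuilt Context and
        # serialize() read the same Options object (hypothetical sketch).
        self.rapidsmpf_options = resolve_rapidsmpf_options(rapidsmpf_options)
        self._cleanup_ctx()  # shut down the previous Context first
        self._ctx = Context.from_options(
            self._comm.logger, self._mr, self.rapidsmpf_options
        )

Caching at resolve time keeps serialize() truthful without threading the freshly resolved options through every reset call site.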

return result

def summarize(self) -> None:
@@ -1060,6 +1075,7 @@ def _finalize_benchmark_run(
run_config: RunConfig,
validation_failures: list[int],
query_failures: list[tuple[int, int]],
engine: StreamingEngine | None,
) -> None:
"""Summarize, serialize, and exit after a benchmark run."""
if args.summarize:
@@ -1074,7 +1090,7 @@
)
else:
print("✅ All validated queries passed.")
args.output.write(json.dumps(run_config.serialize(engine=None)))
args.output.write(json.dumps(run_config.serialize(engine=engine)))
args.output.write("\n")
sys.exit(1 if (query_failures or validation_failures) else 0)

@@ -1133,7 +1149,9 @@ def _allgather_result(df: pl.DataFrame) -> pl.DataFrame:
run_config = _consolidate_logs(
run_config, engine=engine, gather_client_logs=False
)
_finalize_benchmark_run(args, run_config, validation_failures, query_failures)
_finalize_benchmark_run(
args, run_config, validation_failures, query_failures, engine=engine
)


def run_polars_ray(
@@ -1180,7 +1198,9 @@ def run_polars_ray(
run_config = dataclasses.replace(run_config, records=dict(records), plans=plans)
run_config = _consolidate_logs(run_config, engine=engine)

_finalize_benchmark_run(args, run_config, validation_failures, query_failures)
_finalize_benchmark_run(
args, run_config, validation_failures, query_failures, engine=engine
)
Comment on lines +1201 to +1203
⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Serialize the Ray engine before leaving the context manager.

This now runs after the with RayEngine(...) block exits, so shutdown() has already torn down the engine state that RunConfig.serialize(engine=engine) reads. As a result, the new config recording can emit empty or default data, or fail outright, instead of capturing the live runtime settings. Move _finalize_benchmark_run(...) back inside the with block.

Suggested fix
     with RayEngine(
         rapidsmpf_options=run_config.streaming_options.to_rapidsmpf_options(),
         executor_options=executor_options,
         engine_options=engine_options,
         ray_init_options=ray_init_options,
     ) as engine:
         run_config = dataclasses.replace(run_config, n_workers=engine.nranks)
         records, plans, validation_failures, query_failures = _run_query_loop(
             benchmark,
             args,
             run_config,
             engine,
             numeric_type,
             date_type,
             validation_files,
         )
         run_config = dataclasses.replace(run_config, records=dict(records), plans=plans)
         run_config = _consolidate_logs(run_config, engine=engine)
-
-    _finalize_benchmark_run(
-        args, run_config, validation_failures, query_failures, engine=engine
-    )
+        _finalize_benchmark_run(
+            args, run_config, validation_failures, query_failures, engine=engine
+        )
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In
`@python/cudf_polars/cudf_polars/experimental/benchmarks/utils_new_frontends.py`
around lines 1201 - 1203, The call to _finalize_benchmark_run(...) is happening
after exiting the with RayEngine(...) block so engine shutdown has already
occurred; move the _finalize_benchmark_run(args, run_config,
validation_failures, query_failures, engine=engine) invocation back inside the
with RayEngine(...) context so RunConfig.serialize(engine=engine) (and any
engine-dependent recording) runs while the engine is still live and before
shutdown.



def run_polars_dask(
@@ -1240,7 +1260,9 @@
finally:
if dask_client is not None:
dask_client.close()
_finalize_benchmark_run(args, run_config, validation_failures, query_failures)
_finalize_benchmark_run(
args, run_config, validation_failures, query_failures, engine=engine
)
Comment on lines +1263 to +1265
⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Finalize the Dask run while the engine is still alive.

By the time this call executes, the with DaskEngine(...) block has already exited, so the engine has been shut down before run_config.serialize(engine=engine) reads from it. That defeats the new config-capture path for Dask runs. Call _finalize_benchmark_run(...) inside the with block instead.

Suggested fix
     try:
         with DaskEngine(
             rapidsmpf_options=run_config.streaming_options.to_rapidsmpf_options(),
             executor_options=executor_options,
             engine_options=engine_options,
             dask_client=dask_client,
         ) as engine:
             run_config = dataclasses.replace(run_config, n_workers=engine.nranks)
             records, plans, validation_failures, query_failures = _run_query_loop(
                 benchmark,
                 args,
                 run_config,
                 engine,
                 numeric_type,
                 date_type,
                 validation_files,
             )
             run_config = dataclasses.replace(
                 run_config, records=dict(records), plans=plans
             )
             run_config = _consolidate_logs(run_config, engine)
+            _finalize_benchmark_run(
+                args, run_config, validation_failures, query_failures, engine=engine
+            )
     finally:
         if dask_client is not None:
             dask_client.close()
-    _finalize_benchmark_run(
-        args, run_config, validation_failures, query_failures, engine=engine
-    )
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In
`@python/cudf_polars/cudf_polars/experimental/benchmarks/utils_new_frontends.py`
around lines 1263 - 1265, The call to _finalize_benchmark_run(...) is happening
after the DaskEngine context has exited so the engine is already shut down when
run_config.serialize(engine=engine) needs it; move the
_finalize_benchmark_run(args, run_config, validation_failures, query_failures,
engine=engine) invocation so it executes inside the with DaskEngine(...) block
(i.e., before the context manager exits) so run_config.serialize can access the
live engine during Dask runs.



def setup_logging(query_id: int, iteration: int) -> None:
@@ -37,6 +37,7 @@
from collections.abc import Callable, MutableMapping
from concurrent.futures import ThreadPoolExecutor

import rapidsmpf.config
from rapidsmpf.communicator.communicator import Communicator
from rapidsmpf.memory.buffer_resource import BufferResource
from rapidsmpf.streaming.core.context import Context
@@ -150,6 +151,8 @@ class StreamingEngine(pl.GPUEngine):
when :meth:`shutdown` is called. If ``None``, an empty stack is created.
"""

rapidsmpf_options: rapidsmpf.config.Options

def __init__(
self,
*,
@@ -602,9 +602,8 @@ def __init__(
"memory_resource_config", None
)

rapidsmpf_options_as_bytes = resolve_rapidsmpf_options(
rapidsmpf_options
).serialize()
self.rapidsmpf_options = resolve_rapidsmpf_options(rapidsmpf_options)
rapidsmpf_options_as_bytes = self.rapidsmpf_options.serialize()

# Unique identifier for this cluster instance; namespaces the per-worker
# attribute so multiple DaskEngine contexts can coexist on the same workers.
@@ -527,9 +527,8 @@ def __init__(
"memory_resource_config", None
)

rapidsmpf_options_as_bytes = resolve_rapidsmpf_options(
rapidsmpf_options
).serialize()
self.rapidsmpf_options = resolve_rapidsmpf_options(rapidsmpf_options)
rapidsmpf_options_as_bytes = self.rapidsmpf_options.serialize()

exit_stack = contextlib.ExitStack()
if not ray.is_initialized():
@@ -346,7 +346,8 @@ def __init__(
)
bind_to_gpu(hw_binding)

rapidsmpf_options = resolve_rapidsmpf_options(rapidsmpf_options)
self.rapidsmpf_options = resolve_rapidsmpf_options(rapidsmpf_options)

mr_config: MemoryResourceConfig | None = engine_options.get(
"memory_resource_config", None
)
@@ -361,12 +362,12 @@
comm = bootstrap.create_ucxx_comm(
progress_thread=ProgressThread(),
type=bootstrap.BackendType.AUTO,
options=rapidsmpf_options,
options=self.rapidsmpf_options,
)
else:
comm = single_communicator(
progress_thread=ProgressThread(),
options=rapidsmpf_options,
options=self.rapidsmpf_options,
)
# else: caller-provided comm; the caller retains ownership

@@ -384,7 +385,7 @@
# exit-stack holding a stale reference. ``_cleanup_ctx`` is
# registered instead — it shuts down whatever ``self._ctx`` is
# at engine-shutdown time (i.e. the latest reset's Context).
ctx = Context.from_options(comm.logger, mr, rapidsmpf_options)
ctx = Context.from_options(comm.logger, mr, self.rapidsmpf_options)
exit_stack.callback(self._cleanup_ctx)
self._comm: Communicator | None = comm
self._ctx: Context | None = ctx