@@ -1078,6 +1078,7 @@ def _finalize_benchmark_run(
     run_config: RunConfig,
     validation_failures: list[int],
     query_failures: list[tuple[int, int]],
+    engine: StreamingEngine | None,
 ) -> None:
     """Summarize, serialize, and exit after a benchmark run."""
     if args.summarize:
@@ -1092,7 +1093,7 @@
         )
     else:
         print("✅ All validated queries passed.")
-    args.output.write(json.dumps(run_config.serialize(engine=None)))
+    args.output.write(json.dumps(run_config.serialize(engine=engine)))
     args.output.write("\n")
     sys.exit(1 if (query_failures or validation_failures) else 0)

@@ -1152,7 +1153,9 @@ def _allgather_result(df: pl.DataFrame) -> pl.DataFrame:
     run_config = _consolidate_logs(
         run_config, engine=engine, gather_client_logs=False
    )
-    _finalize_benchmark_run(args, run_config, validation_failures, query_failures)
+    _finalize_benchmark_run(
+        args, run_config, validation_failures, query_failures, engine=engine
+    )


 def run_polars_ray(
@@ -1200,7 +1203,9 @@ def run_polars_ray(
     run_config = dataclasses.replace(run_config, records=dict(records), plans=plans)
     run_config = _consolidate_logs(run_config, engine=engine)

-    _finalize_benchmark_run(args, run_config, validation_failures, query_failures)
+    _finalize_benchmark_run(
+        args, run_config, validation_failures, query_failures, engine=engine
+    )
Comment on lines +1201 to +1203
⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Serialize the Ray engine before leaving the context manager.

This now runs after `with RayEngine(...)` exits, so `shutdown()` has already torn down the engine state that `RunConfig.serialize(engine=engine)` reads. As a result, the new config-capture path can emit empty or default data, or fail outright, instead of recording the live runtime settings. Move `_finalize_benchmark_run(...)` back inside the `with` block.

Suggested fix
     with RayEngine(
         rapidsmpf_options=run_config.streaming_options.to_rapidsmpf_options(),
         executor_options=executor_options,
         engine_options=engine_options,
         ray_init_options=ray_init_options,
     ) as engine:
         run_config = dataclasses.replace(run_config, n_workers=engine.nranks)
         records, plans, validation_failures, query_failures = _run_query_loop(
             benchmark,
             args,
             run_config,
             engine,
             numeric_type,
             date_type,
             validation_files,
         )
         run_config = dataclasses.replace(run_config, records=dict(records), plans=plans)
         run_config = _consolidate_logs(run_config, engine=engine)
-
-    _finalize_benchmark_run(
-        args, run_config, validation_failures, query_failures, engine=engine
-    )
+        _finalize_benchmark_run(
+            args, run_config, validation_failures, query_failures, engine=engine
+        )
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In
`@python/cudf_polars/cudf_polars/experimental/benchmarks/utils_new_frontends.py`
around lines 1201-1203: the call to `_finalize_benchmark_run(...)` runs after
the `with RayEngine(...)` block exits, so engine shutdown has already occurred.
Move the `_finalize_benchmark_run(args, run_config, validation_failures,
query_failures, engine=engine)` invocation back inside the `with RayEngine(...)`
context so that `RunConfig.serialize(engine=engine)` (and any engine-dependent
recording) runs while the engine is still live, before shutdown.
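Both findings come down to the same Python pitfall: a context manager's `__exit__` runs the moment the `with` suite ends, so any state it tears down is already gone when later code inspects the object. A minimal sketch of the failure mode, using a hypothetical `ToyEngine` and `serialize` helper rather than the real `RayEngine`/`RunConfig` API:

```python
class ToyEngine:
    """Stand-in for an engine whose shutdown clears runtime state."""

    def __enter__(self):
        self.nranks = 4  # populated while the engine is live
        return self

    def __exit__(self, exc_type, exc, tb):
        # Plays the role a real shutdown() might: drop live state.
        del self.nranks


def serialize(engine):
    # Reads live engine state, loosely like RunConfig.serialize(engine=engine).
    return {"n_workers": getattr(engine, "nranks", None)}


with ToyEngine() as engine:
    inside = serialize(engine)  # {'n_workers': 4}

outside = serialize(engine)  # {'n_workers': None}: state gone after __exit__

print(inside, outside)
```

The `engine` name still resolves after the block (Python's `with` does not scope it), which is exactly why the bug is easy to miss: the call succeeds but reads torn-down state.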



 def run_polars_dask(
@@ -1261,7 +1266,9 @@ def run_polars_dask(
     finally:
         if dask_client is not None:
             dask_client.close()
-    _finalize_benchmark_run(args, run_config, validation_failures, query_failures)
+    _finalize_benchmark_run(
+        args, run_config, validation_failures, query_failures, engine=engine
+    )
Comment on lines +1263 to +1265
⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Finalize the Dask run while the engine is still alive.

By the time this call executes, the `with DaskEngine(...)` block has already exited, so the engine has been shut down before `run_config.serialize(engine=engine)` reads from it. That defeats the new config-capture path for Dask runs. Call `_finalize_benchmark_run(...)` inside the `with` block instead.

Suggested fix
     try:
         with DaskEngine(
             rapidsmpf_options=run_config.streaming_options.to_rapidsmpf_options(),
             executor_options=executor_options,
             engine_options=engine_options,
             dask_client=dask_client,
         ) as engine:
             run_config = dataclasses.replace(run_config, n_workers=engine.nranks)
             records, plans, validation_failures, query_failures = _run_query_loop(
                 benchmark,
                 args,
                 run_config,
                 engine,
                 numeric_type,
                 date_type,
                 validation_files,
             )
             run_config = dataclasses.replace(
                 run_config, records=dict(records), plans=plans
             )
             run_config = _consolidate_logs(run_config, engine)
+            _finalize_benchmark_run(
+                args, run_config, validation_failures, query_failures, engine=engine
+            )
     finally:
         if dask_client is not None:
             dask_client.close()
-    _finalize_benchmark_run(
-        args, run_config, validation_failures, query_failures, engine=engine
-    )
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In
`@python/cudf_polars/cudf_polars/experimental/benchmarks/utils_new_frontends.py`
around lines 1263-1265: the call to `_finalize_benchmark_run(...)` happens after
the `DaskEngine` context has exited, so the engine is already shut down when
`run_config.serialize(engine=engine)` needs it. Move the
`_finalize_benchmark_run(args, run_config, validation_failures, query_failures,
engine=engine)` invocation so it executes inside the `with DaskEngine(...)`
block (i.e., before the context manager exits) so `run_config.serialize` can
access the live engine during Dask runs.
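The suggested fix works because of how `with`, `try`, and `finally` nest: the body of the `with` suite still sees the live engine, `__exit__` fires when the suite ends, and the enclosing `finally` runs last, so the client is closed only after finalization and shutdown. A sketch of that ordering with a hypothetical `toy_engine` context manager (not the real `DaskEngine` API):

```python
from contextlib import contextmanager


@contextmanager
def toy_engine():
    # Hypothetical stand-in for DaskEngine(...); the finally clause here
    # plays the role of shutdown() in __exit__.
    print("engine: start")
    try:
        yield "engine"
    finally:
        print("engine: shutdown")


try:
    with toy_engine() as eng:
        # Correct spot to finalize: the engine is still live here.
        print(f"finalize ({eng} live)")
finally:
    # Unconditional cleanup, like dask_client.close(); runs last.
    print("client.close()")

# Prints, in order:
#   engine: start
#   finalize (engine live)
#   engine: shutdown
#   client.close()
```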



 def setup_logging(query_id: int, iteration: int) -> None: