Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion conda/environments/all_cuda-129_arch-aarch64.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ dependencies:
- packaging
- pandas>=3.0.0,<3.1.0
- pandoc
- polars>=1.35,<1.39
- polars>=1.35,<1.40
- pre-commit
- pyarrow>=19.0.0,<24
- pydata-sphinx-theme>=0.15.4
Expand Down
2 changes: 1 addition & 1 deletion conda/environments/all_cuda-129_arch-x86_64.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ dependencies:
- packaging
- pandas>=3.0.0,<3.1.0
- pandoc
- polars>=1.35,<1.39
- polars>=1.35,<1.40
- pre-commit
- pyarrow>=19.0.0,<24
- pydata-sphinx-theme>=0.15.4
Expand Down
2 changes: 1 addition & 1 deletion conda/environments/all_cuda-132_arch-aarch64.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ dependencies:
- packaging
- pandas>=3.0.0,<3.1.0
- pandoc
- polars>=1.35,<1.39
- polars>=1.35,<1.40
- pre-commit
- pyarrow>=19.0.0,<24
- pydata-sphinx-theme>=0.15.4
Expand Down
2 changes: 1 addition & 1 deletion conda/environments/all_cuda-132_arch-x86_64.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ dependencies:
- packaging
- pandas>=3.0.0,<3.1.0
- pandoc
- polars>=1.35,<1.39
- polars>=1.35,<1.40
- pre-commit
- pyarrow>=19.0.0,<24
- pydata-sphinx-theme>=0.15.4
Expand Down
2 changes: 1 addition & 1 deletion conda/recipes/cudf-polars/recipe.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ requirements:
- python
- pylibcudf =${{ version }}
- rapidsmpf =${{ minor_version }}
- polars>=1.35,<1.39
- polars>=1.35,<1.40
- packaging
- ${{ pin_compatible("cuda-version", upper_bound="x", lower_bound="x") }}
- if: cuda_major == "12"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
diff --git a/cpp/cmake_modules/FindRapidJSONAlt.cmake b/cpp/cmake_modules/FindRapidJSONAlt.cmake
index babb450e20..148dd93a78 100644
--- a/cpp/cmake_modules/FindRapidJSONAlt.cmake
+++ b/cpp/cmake_modules/FindRapidJSONAlt.cmake
@@ -26,7 +26,10 @@ endif()
if(RapidJSONAlt_FIND_QUIETLY)
list(APPEND find_package_args QUIET)
endif()
+set(_CMAKE_POLICY_VERSION_MINIMUM_OLD ${CMAKE_POLICY_VERSION_MINIMUM})
+set(CMAKE_POLICY_VERSION_MINIMUM 3.5)
find_package(RapidJSON ${find_package_args})
+set(CMAKE_POLICY_VERSION_MINIMUM ${_CMAKE_POLICY_VERSION_MINIMUM_OLD})
if(RapidJSON_FOUND)
set(RapidJSONAlt_FOUND TRUE)
if(NOT TARGET RapidJSON)
17 changes: 17 additions & 0 deletions cpp/cmake/thirdparty/patches/override.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
{
"packages": {
"arrow": {
"version": "21.0.0",
"git_url": "https://github.com/apache/arrow.git",
"git_tag": "apache-arrow-${version}",
"git_shallow": false,
"patches": [
{
"file": "${current_json_dir}/arrow_rapidjson_cmake_policy_version_minimum.diff",
"issue": "https://github.com/apache/arrow/pull/49993"
}
],
"source_subdir": "cpp"
}
}
}
6 changes: 3 additions & 3 deletions cpp/src/io/json/json_tree.cu
Original file line number Diff line number Diff line change
Expand Up @@ -130,14 +130,14 @@ struct is_nested_end {
};

struct checked_token_level_output {
bool* depth_out_of_range;
int32_t* depth_out_of_range;

__device__ TreeDepthT operator()(size_type level) const
{
static_assert(sizeof(TreeDepthT) < sizeof(size_type));
if (level < static_cast<size_type>(cuda::std::numeric_limits<TreeDepthT>::min()) ||
level > static_cast<size_type>(cuda::std::numeric_limits<TreeDepthT>::max())) {
cuda::atomic_ref<bool, cuda::thread_scope_device> flag{*depth_out_of_range};
cuda::atomic_ref<int32_t, cuda::thread_scope_device> flag{*depth_out_of_range};
if (!flag.load(cuda::std::memory_order_relaxed)) {
flag.store(true, cuda::std::memory_order_relaxed);
}
Expand Down Expand Up @@ -302,7 +302,7 @@ tree_meta_t get_tree_representation(device_span<PdaTokenT const> tokens,
return does_push(token) - does_pop(token);
}));
auto depth_out_of_range =
cudf::detail::device_scalar<bool>(false, stream, cudf::get_current_device_resource_ref());
cudf::detail::device_scalar<int32_t>(0, stream, cudf::get_current_device_resource_ref());
auto const token_level_output_it = thrust::make_transform_output_iterator(
token_levels.begin(), checked_token_level_output{depth_out_of_range.data()});
thrust::exclusive_scan(rmm::exec_policy_nosync(stream, cudf::get_current_device_resource_ref()),
Expand Down
80 changes: 51 additions & 29 deletions cpp/src/io/parquet/io_utils/parquet_io_utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include "io/comp/common.hpp"
#include "io/parquet/parquet_common.hpp"

#include <cudf/detail/utilities/cuda_memcpy.hpp>
#include <cudf/detail/utilities/host_worker_pool.hpp>
#include <cudf/detail/utilities/integer_utils.hpp>
#include <cudf/io/datasource.hpp>
Expand All @@ -21,6 +22,7 @@
#include <cuda/std/tuple>

#include <numeric>
#include <tuple>

/**
* @file parquet_io_utils.cpp
Expand Down Expand Up @@ -89,6 +91,9 @@ fetch_byte_ranges_to_device_async(
std::vector<size_t> io_offsets;
std::vector<size_t> io_sizes;
std::vector<uint8_t*> destinations;
io_offsets.reserve(byte_ranges.size());
io_sizes.reserve(byte_ranges.size());
destinations.reserve(byte_ranges.size());

for (size_t chunk = 0; chunk < byte_ranges.size();) {
auto const io_offset = static_cast<size_t>(byte_ranges[chunk].offset());
Expand All @@ -110,12 +115,24 @@ fetch_byte_ranges_to_device_async(
CUDF_EXPECTS(io_offsets.size() == io_sizes.size() and io_sizes.size() == destinations.size(),
"Unexpected number of IO offsets, sizes, or destinations");

using host_read_buffer = std::unique_ptr<cudf::io::datasource::buffer>;

// Vectors to hold futures from datasource
std::vector<std::future<size_t>> device_read_tasks{};
std::vector<std::future<size_t>> host_read_tasks{};
device_read_tasks.reserve(byte_ranges.size());
host_read_tasks.reserve(byte_ranges.size());
std::vector<std::future<host_read_buffer>> host_read_tasks{};
device_read_tasks.reserve(io_offsets.size());
host_read_tasks.reserve(io_offsets.size());

// Vectors to store intermediate host read buffers and relevant pointers
std::vector<void*> copy_dsts{};
std::vector<size_t> copy_sizes{};
copy_dsts.reserve(io_offsets.size());
copy_sizes.reserve(io_offsets.size());

// Vector to store intermediate host buffers
std::vector<host_read_buffer> host_buffers{};

// device_read_async is not guaranteed to follow stream-ordering (see datasource API docs).
// `device_read_async` is not guaranteed to follow stream-ordering (see datasource API docs)
stream.synchronize();

{
Expand All @@ -133,35 +150,40 @@ fetch_byte_ranges_to_device_async(
device_read_tasks.emplace_back(
datasource.device_read_async(io_offset, io_size, dest, stream));
} else {
// Read the column chunk data to the host buffer and copy it to the device buffer
host_read_tasks.emplace_back(cudf::detail::host_worker_pool().submit_task(
[&datasource, io_offset, io_size, dest, stream]() {
auto host_buffer = datasource.host_read(io_offset, io_size);
cudf::detail::cuda_memcpy_async(
cudf::device_span<uint8_t>{dest, io_size},
cudf::host_span<uint8_t const>{host_buffer->data(), io_size},
stream);
return io_size;
}));
// Asynchronously read column chunk data to a host buffer
host_read_tasks.emplace_back(datasource.host_read_async(io_offset, io_size));
copy_dsts.push_back(static_cast<void*>(dest));
copy_sizes.push_back(io_size);
}
});
}

auto sync_function = [](decltype(host_read_tasks) host_read_tasks,
decltype(device_read_tasks) device_read_tasks) {
for (auto& task : host_read_tasks) {
task.get();
}
for (auto& task : device_read_tasks) {
task.get();
// If there are host reads, schedule a batched memcpy to device
if (not host_read_tasks.empty()) {
std::vector<void const*> copy_srcs{};
copy_srcs.reserve(host_read_tasks.size());
host_buffers.reserve(host_read_tasks.size());

for (auto& task : host_read_tasks) {
host_buffers.emplace_back(task.get());
copy_srcs.push_back(host_buffers.back().get()->data());
}
CUDF_CUDA_TRY(cudf::detail::memcpy_batch_async(
copy_dsts.data(), copy_srcs.data(), copy_sizes.data(), copy_dsts.size(), stream));
}
};
return {std::move(column_chunk_buffers),
std::move(column_chunk_data),
std::async(std::launch::deferred,
sync_function,
std::move(host_read_tasks),
std::move(device_read_tasks))};

// If `memcpy_batch_async` was scheduled, synchronize the stream so that the host buffers it
// reads from can be safely discarded
if (not host_buffers.empty()) { stream.synchronize(); }

auto sync_function = [](decltype(device_read_tasks) device_read_tasks) {
for (auto& task : device_read_tasks) {
task.get();
}
};
return {std::move(column_chunk_buffers),
std::move(column_chunk_data),
std::async(std::launch::deferred, sync_function, std::move(device_read_tasks))};
}
}

} // namespace cudf::io::parquet
2 changes: 1 addition & 1 deletion dependencies.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -838,7 +838,7 @@ dependencies:
# 'nvidia-ml-py' provides the 'pynvml' module
- &nvidia_ml_py nvidia-ml-py>=12
- packaging
- polars>=1.35,<1.39
- polars>=1.35,<1.40
- typing_extensions>=4.0.0
run_cudf_polars_dask:
common:
Expand Down
4 changes: 3 additions & 1 deletion java/src/main/native/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@ include(rapids-cmake)
include(rapids-cuda)
include(rapids-find)
include(rapids-cpm)
rapids_cpm_init()
rapids_cpm_init(
OVERRIDE "${CMAKE_CURRENT_SOURCE_DIR}/../../../../cpp/cmake/thirdparty/patches/override.json"
)

rapids_cuda_init_architectures(CUDF_JNI)

Expand Down
2 changes: 1 addition & 1 deletion python/cudf_polars/cudf_polars/callback.py
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,7 @@ def _callback(
else:
return df, timer.timings
elif config_options.executor.name == "streaming":
from cudf_polars.experimental.parallel import evaluate_streaming
from cudf_polars.streaming.parallel import evaluate_streaming

if timer is not None:
msg = textwrap.dedent("""\
Expand Down
2 changes: 1 addition & 1 deletion python/cudf_polars/cudf_polars/containers/column.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ def serialize(

To enable dask support, dask serializers must be registered

>>> from cudf_polars.experimental.dask_serialize import register
>>> from cudf_polars.streaming.dask_serialize import register
>>> register()

Returns
Expand Down
2 changes: 1 addition & 1 deletion python/cudf_polars/cudf_polars/containers/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,7 @@ def serialize(

To enable dask support, dask serializers must be registered

>>> from cudf_polars.experimental.dask_serialize import register
>>> from cudf_polars.streaming.dask_serialize import register
>>> register()

Parameters
Expand Down
18 changes: 9 additions & 9 deletions python/cudf_polars/cudf_polars/dsl/expressions/rolling.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
offsets_to_windows,
range_window_bounds,
)
from cudf_polars.utils.versions import POLARS_VERSION_LT_136
from cudf_polars.utils.versions import POLARS_VERSION_LT_136, POLARS_VERSION_LT_139

if TYPE_CHECKING:
from collections.abc import Sequence
Expand Down Expand Up @@ -79,14 +79,14 @@ def to_request(
elif isinstance(value, expr.Agg):
child = value.children[0]
col = child.evaluate(df, context=ExecutionContext.ROLLING)
if POLARS_VERSION_LT_136 and value.name == "var": # pragma: no cover
if (POLARS_VERSION_LT_136 or not POLARS_VERSION_LT_139) and value.name == "var":
# Polars variance produces null if nvalues <= ddof
# libcudf produces NaN. However, we can get the polars
# behaviour by setting the minimum window size to ddof +
# 1.
#
# We still need this check, polars is not hitting it because
# of https://github.com/pola-rs/polars/pull/25117
# In polars 1.36-1.38 this code path is not hit because
# rolling goes through the IR path (not RollingWindow).
# See https://github.com/pola-rs/polars/pull/25117
min_periods = value.options + 1
else:
col = value.evaluate(
Expand Down Expand Up @@ -146,12 +146,12 @@ def __init__(
raise NotImplementedError(
"Incorrect handling of empty groups for list collection"
)
if POLARS_VERSION_LT_136 and not plc.rolling.is_valid_rolling_aggregation(
if (
POLARS_VERSION_LT_136 or not POLARS_VERSION_LT_139
) and not plc.rolling.is_valid_rolling_aggregation(
agg.dtype.plc_type, agg.agg_request
):
raise NotImplementedError(
f"Unsupported rolling aggregation {agg}"
) # pragma: no cover; polars may raise ahead of time
raise NotImplementedError(f"Unsupported rolling aggregation {agg}")

def do_evaluate( # noqa: D102
self, df: DataFrame, *, context: ExecutionContext = ExecutionContext.FRAME
Expand Down
13 changes: 6 additions & 7 deletions python/cudf_polars/cudf_polars/dsl/expressions/unary.py
Original file line number Diff line number Diff line change
Expand Up @@ -240,15 +240,14 @@ def do_evaluate(
if maintain_order:
column = column.sorted_like(values)
return column
elif self.name == "set_sorted": # pragma: no cover
# TODO: LazyFrame.set_sorted is a proper IR concept (i.e. FunctionIR::Hint)
# and is currently not implemented. We should reimplement it as a MapFunction.
elif self.name == "set_sorted":
(column,) = (child.evaluate(df, context=context) for child in self.children)
(asc,) = self.options
if isinstance(self.options[0], str):
descending = self.options[0] == "descending" # pragma: no cover
else:
descending, _ = self.options
Comment on lines +245 to +248
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Fix set_sorted option unpacking to avoid runtime crashes.

At line 248, `descending, _ = self.options` can raise when `self.options` has a single non-string payload (e.g., a tuple at index 0). Parse from `self.options[0]` consistently.

🔧 Proposed fix
-            if isinstance(self.options[0], str):
-                descending = self.options[0] == "descending"  # pragma: no cover
-            else:
-                descending, _ = self.options
+            option0 = self.options[0]
+            if isinstance(option0, str):
+                descending = option0 == "descending"  # pragma: no cover
+            elif isinstance(option0, tuple):
+                descending = bool(option0[0])
+            else:
+                descending = bool(option0)
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
if isinstance(self.options[0], str):
descending = self.options[0] == "descending" # pragma: no cover
else:
descending, _ = self.options
option0 = self.options[0]
if isinstance(option0, str):
descending = option0 == "descending" # pragma: no cover
elif isinstance(option0, tuple):
descending = bool(option0[0])
else:
descending = bool(option0)
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@python/cudf_polars/cudf_polars/dsl/expressions/unary.py` around lines 245 -
248, The unpacking of set_sorted options can crash because "descending, _ =
self.options" assumes two items; change the logic to always read the flag from
self.options[0] instead of unpacking the whole sequence: when self.options[0] is
a str keep the current string check, otherwise set descending =
bool(self.options[0]) (or simply descending = self.options[0]) so the code reads
the descending flag consistently from self.options[0]; update the block around
the existing isinstance(self.options[0], str) check in unary.py to use
self.options[0] for both branches (referencing self.options and self.options[0]
in that scope).

order = (
plc.types.Order.ASCENDING
if asc == "ascending"
else plc.types.Order.DESCENDING
plc.types.Order.DESCENDING if descending else plc.types.Order.ASCENDING
)
null_order = plc.types.NullOrder.BEFORE
if column.null_count > 0 and (n := column.size) > 1:
Expand Down
18 changes: 14 additions & 4 deletions python/cudf_polars/cudf_polars/dsl/ir.py
Original file line number Diff line number Diff line change
Expand Up @@ -3245,19 +3245,21 @@ def do_evaluate(
class HConcat(IR):
"""Concatenate dataframes horizontally."""

__slots__ = ("should_broadcast",)
_non_child = ("schema", "should_broadcast")
_n_non_child_args = 2
__slots__ = ("should_broadcast", "strict")
_non_child = ("schema", "should_broadcast", "strict")
_n_non_child_args = 3

def __init__(
self,
schema: Schema,
should_broadcast: bool, # noqa: FBT001
strict: bool, # noqa: FBT001
*children: IR,
):
self.schema = schema
self.should_broadcast = should_broadcast
self._non_child_args = (schema, should_broadcast)
self.strict = strict
self._non_child_args = (schema, should_broadcast, strict)
self.children = children

@staticmethod
Expand Down Expand Up @@ -3300,6 +3302,7 @@ def do_evaluate(
cls,
schema: Schema,
should_broadcast: bool, # noqa: FBT001
strict: bool, # noqa: FBT001
*dfs: DataFrame,
context: IRExecutionContext,
) -> DataFrame:
Expand All @@ -3317,6 +3320,13 @@ def do_evaluate(
result = DataFrame(ordered, stream=stream)
else:
max_rows = max(df.num_rows for df in dfs)
if strict and any(df.num_rows != max_rows for df in dfs):
heights = [df.num_rows for df in dfs]
msg = (
f"cannot concat DataFrames horizontally"
f" with strict=True: height mismatch {heights}"
)
raise pl.exceptions.ShapeError(msg)
# Horizontal concatenation extends shorter tables with nulls
result = DataFrame(
itertools.chain.from_iterable(
Expand Down
Loading
Loading