Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 43 additions & 17 deletions integration_tests/src/main/python/asserts.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,25 +28,34 @@
import sys
import re

def _assert_equal(cpu, gpu, float_check, path):
def _is_nan_inf_overflow_pair(cpu, gpu):
return type(gpu) is float and (
(math.isnan(cpu) and math.isinf(gpu)) or
(math.isinf(cpu) and math.isnan(gpu)))

def _assert_equal(cpu, gpu, float_check, path, nan_inf_equivalent_for_overflow=False):
Comment thread
thirtiseven marked this conversation as resolved.
Outdated
t = type(cpu)
if (t is Row):
assert len(cpu) == len(gpu), "CPU and GPU row have different lengths at {} CPU: {} GPU: {}".format(path, len(cpu), len(gpu))
if hasattr(cpu, "__fields__") and hasattr(gpu, "__fields__"):
assert cpu.__fields__ == gpu.__fields__, "CPU and GPU row have different fields at {} CPU: {} GPU: {}".format(path, cpu.__fields__, gpu.__fields__)
for field in cpu.__fields__:
_assert_equal(cpu[field], gpu[field], float_check, path + [field])
_assert_equal(cpu[field], gpu[field], float_check, path + [field],
nan_inf_equivalent_for_overflow)
else:
for index in range(len(cpu)):
_assert_equal(cpu[index], gpu[index], float_check, path + [index])
_assert_equal(cpu[index], gpu[index], float_check, path + [index],
nan_inf_equivalent_for_overflow)
elif (t is list):
assert len(cpu) == len(gpu), "CPU and GPU list have different lengths at {} CPU: {} GPU: {}".format(path, len(cpu), len(gpu))
for index in range(len(cpu)):
_assert_equal(cpu[index], gpu[index], float_check, path + [index])
_assert_equal(cpu[index], gpu[index], float_check, path + [index],
nan_inf_equivalent_for_overflow)
elif (t is tuple):
assert len(cpu) == len(gpu), "CPU and GPU list have different lengths at {} CPU: {} GPU: {}".format(path, len(cpu), len(gpu))
for index in range(len(cpu)):
_assert_equal(cpu[index], gpu[index], float_check, path + [index])
_assert_equal(cpu[index], gpu[index], float_check, path + [index],
nan_inf_equivalent_for_overflow)
elif (t is pytypes.GeneratorType):
index = 0
# generator has no zip :( so we have to do this the hard way
Expand All @@ -67,18 +76,22 @@ def _assert_equal(cpu, gpu, float_check, path):
if done:
assert sub_cpu == sub_gpu and sub_cpu == None, "CPU and GPU generators have different lengths at {}".format(path)
else:
_assert_equal(sub_cpu, sub_gpu, float_check, path + [index])
_assert_equal(sub_cpu, sub_gpu, float_check, path + [index],
nan_inf_equivalent_for_overflow)

index = index + 1
elif (t is dict):
# The order of key/values is not guaranteed in python dicts, nor are they guaranteed by Spark
# so sort the items to do our best with ignoring the order of dicts
cpu_items = list(cpu.items()).sort(key=_RowCmp)
gpu_items = list(gpu.items()).sort(key=_RowCmp)
Comment thread
wjxiz1992 marked this conversation as resolved.
_assert_equal(cpu_items, gpu_items, float_check, path + ["map"])
_assert_equal(cpu_items, gpu_items, float_check, path + ["map"],
nan_inf_equivalent_for_overflow)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 The sorted(...) fix mentioned in the PR description was not applied. list.sort() is an in-place method that returns None, so cpu_items and gpu_items are both None. When _assert_equal(None, None, ...) is called, it falls through to the cpu == None branch (elif (cpu == None): assert cpu == gpu) which always passes. Any dict comparison therefore silently succeeds regardless of content — a false green that masks real CPU/GPU divergence in map-type columns.

Suggested change
cpu_items = list(cpu.items()).sort(key=_RowCmp)
gpu_items = list(gpu.items()).sort(key=_RowCmp)
_assert_equal(cpu_items, gpu_items, float_check, path + ["map"])
_assert_equal(cpu_items, gpu_items, float_check, path + ["map"],
nan_inf_equivalent_for_overflow)
cpu_items = sorted(list(cpu.items()), key=_RowCmp)
gpu_items = sorted(list(gpu.items()), key=_RowCmp)
_assert_equal(cpu_items, gpu_items, float_check, path + ["map"],
nan_inf_equivalent_for_overflow)

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed in e964fed: this PR no longer changes the dict/map comparison branch. The list.sort() behavior is a pre-existing issue, but applying the suggested sorted(...) cleanup in this PR exposed unrelated existing map/JSON/ORC differences in Blossom #13006, so that cleanup should be handled separately instead of being bundled with the std/variance fix.

Local validation after removing this PR's dict-branch change passed:
hash_aggregate_test.py::test_std_variance_extreme_floating_point plus json_matrix_test.py::test_from_json_map_string_string[int_formatted.json]: 11 passed, 3 warnings in 116.69s.

elif (t is int):
assert cpu == gpu, f"GPU ({gpu}) and CPU ({cpu}) int values are different at {path}"
elif (t is float):
if (nan_inf_equivalent_for_overflow and _is_nan_inf_overflow_pair(cpu, gpu)):
return
if (math.isnan(cpu)):
assert math.isnan(gpu), f"GPU ({gpu}) and CPU (nan) float values are different at {path}"
else:
Expand Down Expand Up @@ -106,14 +119,15 @@ def _assert_equal(cpu, gpu, float_check, path):
else:
assert False, "Found unexpected type {} at {}".format(t, path)

def assert_equal_with_local_sort(cpu, gpu):
def assert_equal_with_local_sort(cpu, gpu, nan_inf_equivalent_for_overflow=False):
    """Sort both result sets locally, then verify the CPU and GPU results match.

    :param cpu: results collected from the CPU run
    :param gpu: results collected from the GPU run
    :param nan_inf_equivalent_for_overflow: when True, a NaN vs +/-Inf pair of
        float values is treated as equal (documented FP overflow behavior).
    """
    _sort_locally(cpu, gpu)
    assert_equal(cpu, gpu,
        nan_inf_equivalent_for_overflow=nan_inf_equivalent_for_overflow)

def assert_equal(cpu, gpu):
def assert_equal(cpu, gpu, nan_inf_equivalent_for_overflow=False):
"""Verify that the result from the CPU and the GPU are equal"""
try:
_assert_equal(cpu, gpu, float_check=get_float_check(), path=[])
_assert_equal(cpu, gpu, float_check=get_float_check(), path=[],
nan_inf_equivalent_for_overflow=nan_inf_equivalent_for_overflow)
except:
def to_txt(data):
try:
Expand Down Expand Up @@ -587,7 +601,8 @@ def _assert_gpu_and_cpu_are_equal(func,
mode,
conf={},
is_cpu_first=True,
result_canonicalize_func_before_compare=None):
result_canonicalize_func_before_compare=None,
nan_inf_equivalent_for_overflow=False):
(bring_back, collect_type) = _prep_func_for_compare(func, mode)
conf = _prep_incompat_conf(conf)

Expand Down Expand Up @@ -625,7 +640,7 @@ def run_on_gpu():
if should_sort_locally():
_sort_locally(from_cpu, from_gpu)

assert_equal(from_cpu, from_gpu)
assert_equal(from_cpu, from_gpu, nan_inf_equivalent_for_overflow)

def run_with_cpu(func,
mode,
Expand Down Expand Up @@ -687,7 +702,9 @@ def run_on_gpu():

return (from_cpu, from_gpu)

def assert_gpu_and_cpu_are_equal_collect(func, conf={}, is_cpu_first=True, result_canonicalize_func_before_compare=None):
def assert_gpu_and_cpu_are_equal_collect(func, conf={}, is_cpu_first=True,
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: unnecessary change?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reverted in f402c5f. The assert_gpu_and_cpu_are_equal_collect signature is back to a single line; this PR no longer changes that line.

result_canonicalize_func_before_compare=None,
nan_inf_equivalent_for_overflow=False):
"""
Assert when running func on both the CPU and the GPU that the results are equal.
In this case the data is collected back to the driver and compared here, so be
Expand All @@ -702,8 +719,12 @@ def assert_gpu_and_cpu_are_equal_collect(func, conf={}, is_cpu_first=True, resul
+Row(a=Row(first=-341142443, second=3.333994866005594e-37))
Use this func to canonicalize the results.
Usage of this func is: (cpu, gpu) = result_canonicalize_func_before_compare(original_cpu_result, original_gpu_result)
:param nan_inf_equivalent_for_overflow: Treat NaN vs +/-Inf as equivalent for
documented floating-point overflow tests.
"""
_assert_gpu_and_cpu_are_equal(func, 'COLLECT', conf=conf, is_cpu_first=is_cpu_first, result_canonicalize_func_before_compare=result_canonicalize_func_before_compare)
_assert_gpu_and_cpu_are_equal(func, 'COLLECT', conf=conf, is_cpu_first=is_cpu_first,
result_canonicalize_func_before_compare=result_canonicalize_func_before_compare,
nan_inf_equivalent_for_overflow=nan_inf_equivalent_for_overflow)

def assert_gpu_and_cpu_are_equal_iterator(func, conf={}, is_cpu_first=True):
"""
Expand All @@ -721,7 +742,9 @@ def assert_gpu_and_cpu_row_counts_equal(func, conf={}, is_cpu_first=True):
"""
_assert_gpu_and_cpu_are_equal(func, 'COUNT', conf=conf, is_cpu_first=is_cpu_first)

def assert_gpu_and_cpu_are_equal_sql(df_fun, table_name, sql, conf=None, debug=False, is_cpu_first=True, validate_execs_in_gpu_plan=[]):
def assert_gpu_and_cpu_are_equal_sql(df_fun, table_name, sql, conf=None, debug=False,
is_cpu_first=True, validate_execs_in_gpu_plan=[],
nan_inf_equivalent_for_overflow=False):
"""
Assert that the specified SQL query produces equal results on CPU and GPU.
:param df_fun: a function that will create the dataframe
Expand All @@ -731,6 +754,8 @@ def assert_gpu_and_cpu_are_equal_sql(df_fun, table_name, sql, conf=None, debug=F
:param debug: Boolean to indicate if the SQL output should be printed
:param is_cpu_first: Boolean to indicate if the CPU should be run first or not
:param validate_execs_in_gpu_plan: String list of expressions to be validated in the GPU plan.
:param nan_inf_equivalent_for_overflow: Treat NaN vs +/-Inf as equivalent for
documented floating-point overflow tests.
:return: Assertion failure, if results from CPU and GPU do not match.
"""
if conf is None:
Expand All @@ -745,7 +770,8 @@ def do_it_all(spark):
return data_gen.debug_df(spark.sql(sql))
else:
return spark.sql(sql)
assert_gpu_and_cpu_are_equal_collect(do_it_all, conf, is_cpu_first=is_cpu_first)
assert_gpu_and_cpu_are_equal_collect(do_it_all, conf, is_cpu_first=is_cpu_first,
nan_inf_equivalent_for_overflow=nan_inf_equivalent_for_overflow)


def check_exception(actual_error, error_message):
Expand Down
67 changes: 46 additions & 21 deletions integration_tests/src/main/python/hash_aggregate_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -194,15 +194,24 @@
_init_list_with_decimals = _init_list + [
_decimals_with_nulls, _decimals_with_no_nulls]

_std_variance_issue_14681_gen = [
('a', RepeatSeqGen(IntegerGen(), length=20)), ('b', DoubleGen()), ('c', DoubleGen())]

# Grouped FP gens using bare DoubleGen()/FloatGen() on the aggregated columns.
# Their default special_cases inject NaN, -0.0, +-Inf, and max-fraction values,
# which exercise the corner-case paths for FP aggregate functions.
_init_list_with_decimals_and_floats = _init_list_with_decimals + [
_std_variance_issue_14681_gen,
[('a', RepeatSeqGen(IntegerGen(), length=20)), ('b', FloatGen()), ('c', FloatGen())]]
_std_variance_common_fp_gens = [
[('a', RepeatSeqGen(IntegerGen(), length=20)),
('b', DoubleGen(min_exp=-200, max_exp=200, no_nans=True)),
('c', DoubleGen(min_exp=-200, max_exp=200, no_nans=True))],
[('a', RepeatSeqGen(IntegerGen(), length=20)),
('b', FloatGen(no_nans=True)),
('c', FloatGen(no_nans=True))]]

_init_list_with_decimals_and_common_floats = (
_init_list_with_decimals + _std_variance_common_fp_gens)

_std_variance_extreme_fp_gens = [
[('a', RepeatSeqGen(IntegerGen(), length=20)),
('b', DoubleGen(no_nans=True)),
('c', DoubleGen(no_nans=True))],
[('a', RepeatSeqGen(IntegerGen(), length=20)),
('b', FloatGen(no_nans=True)),
('c', FloatGen(no_nans=True))]]

# Used to test ANSI-mode fallback
_no_overflow_ansi_gens = [
Expand Down Expand Up @@ -2346,16 +2355,7 @@ def do_it(spark):
assert_gpu_and_cpu_are_equal_collect(do_it,
conf={'spark.sql.ansi.enabled': 'true'})

# Tests for standard deviation and variance aggregations.
@ignore_order(local=True)
@approximate_float
@incompat
@pytest.mark.parametrize('data_gen', _init_list_with_decimals_and_floats, ids=idfn)
@pytest.mark.parametrize('conf', get_params(_confs, params_markers_for_confs), ids=idfn)
def test_std_variance(data_gen, conf):
if data_gen is _std_variance_issue_14681_gen:
pytest.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/14681')

def _assert_std_variance(data_gen, conf, nan_inf_equivalent_for_overflow=False):
local_conf = copy_and_update(conf, {
'spark.rapids.sql.castDecimalToFloat.enabled': 'true'})
assert_gpu_and_cpu_are_equal_sql(
Expand All @@ -2369,15 +2369,40 @@ def test_std_variance(data_gen, conf):
'var_pop(b),' +
'var_samp(b)' +
' from data_table group by a',
conf=local_conf)
conf=local_conf,
nan_inf_equivalent_for_overflow=nan_inf_equivalent_for_overflow)
assert_gpu_and_cpu_are_equal_sql(
lambda spark : gen_df(spark, data_gen, length=1000),
"data_table",
'select ' +
'stddev(b),' +
'stddev_samp(b)'
' from data_table',
conf=local_conf)
conf=local_conf,
nan_inf_equivalent_for_overflow=nan_inf_equivalent_for_overflow)


# Tests for standard deviation and variance aggregations on common finite values.
@ignore_order(local=True)
@approximate_float
@incompat
@pytest.mark.parametrize('data_gen', _init_list_with_decimals_and_common_floats, ids=idfn)
@pytest.mark.parametrize('conf', get_params(_confs, params_markers_for_confs), ids=idfn)
def test_std_variance(data_gen, conf):
    """Verify stddev/variance aggregates match between CPU and GPU on common finite inputs."""
    _assert_std_variance(data_gen, conf, nan_inf_equivalent_for_overflow=False)


# Extremely large FP inputs can make the true variance overflow double precision.
# Spark CPU and GPU may surface that overflow as NaN or +/-Inf depending only on
# accumulation order. Keep this corner coverage, but compare those overflow
# sentinels loosely. See https://github.com/NVIDIA/spark-rapids/issues/14681.
@ignore_order(local=True)
@approximate_float
@incompat
@pytest.mark.parametrize('data_gen', _std_variance_extreme_fp_gens, ids=idfn)
@pytest.mark.parametrize('conf', get_params(_confs, params_markers_for_confs), ids=idfn)
def test_std_variance_extreme_floating_point(data_gen, conf):
    """Verify stddev/variance on extreme FP inputs, allowing NaN vs +/-Inf
    overflow results to compare as equal (nan_inf_equivalent_for_overflow)."""
    _assert_std_variance(data_gen, conf, nan_inf_equivalent_for_overflow=True)


@ignore_order(local=True)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -250,11 +250,18 @@ class CudfMergeM2 extends CudfAggregate {
if (n > 0) {
val mean = partialMean.getDouble(i)
val m2 = partialM2.getDouble(i)
val delta = mean - mergeMean
val newN = n + mergeN
mergeM2 += m2 + delta * delta * n * mergeN / newN
mergeMean = (mergeMean * mergeN + mean * n) / newN
mergeN = newN
if (mergeN == 0.0) {
mergeN = n
mergeMean = mean
mergeM2 = m2
} else {
val delta = mean - mergeMean
val newN = mergeN + n
val deltaN = delta / newN
mergeM2 += m2 + delta * deltaN * mergeN * n
mergeMean += deltaN * n
mergeN = newN
}
}
}

Expand Down
Loading