Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions docs/compatibility.md
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,14 @@ floating point aggregation then the join may fail to work properly with the plug
worked with plain Spark. This behavior is enabled by default but can be disabled with the config
[`spark.rapids.sql.variableFloatAgg.enabled`](additional-functionality/advanced_configs.md#sql.variableFloatAgg.enabled).

For standard deviation and variance aggregations (`stddev`, `stddev_pop`, `stddev_samp`,
`variance`, `var_pop`, and `var_samp`) on very large finite floating-point values, the exact
mathematical result can exceed the range of a double. In these overflow cases Spark CPU and the
RAPIDS Accelerator may report different IEEE floating-point sentinels, such as `NaN` versus
`+Infinity` or `-Infinity`, because partial aggregate state can be merged in a different order.
This is limited to overflow behavior for extreme inputs; ordinary finite inputs should still match
within normal floating-point tolerance.

### `0.0` vs `-0.0`

Floating point allows zero to be encoded as `0.0` and `-0.0`, but the IEEE standard says that they
Expand Down
57 changes: 41 additions & 16 deletions integration_tests/src/main/python/asserts.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,25 +28,34 @@
import sys
import re

def _assert_equal(cpu, gpu, float_check, path):
def _is_nan_inf_overflow_pair(cpu, gpu):
    """Return True when the CPU/GPU values differ only as NaN vs +/-Inf.

    Floating-point overflow in an aggregation can surface as NaN on one side
    and +/-Infinity on the other depending on accumulation order; callers that
    opt in treat that specific mismatch as equivalent.
    """
    if type(gpu) is not float:
        return False
    if math.isnan(cpu):
        return math.isinf(gpu)
    return math.isinf(cpu) and math.isnan(gpu)

def _assert_equal(cpu, gpu, float_check, path, nan_inf_equivalent_for_overflow=False):
Comment thread
thirtiseven marked this conversation as resolved.
Outdated
t = type(cpu)
if (t is Row):
assert len(cpu) == len(gpu), "CPU and GPU row have different lengths at {} CPU: {} GPU: {}".format(path, len(cpu), len(gpu))
if hasattr(cpu, "__fields__") and hasattr(gpu, "__fields__"):
assert cpu.__fields__ == gpu.__fields__, "CPU and GPU row have different fields at {} CPU: {} GPU: {}".format(path, cpu.__fields__, gpu.__fields__)
for field in cpu.__fields__:
_assert_equal(cpu[field], gpu[field], float_check, path + [field])
_assert_equal(cpu[field], gpu[field], float_check, path + [field],
nan_inf_equivalent_for_overflow)
else:
for index in range(len(cpu)):
_assert_equal(cpu[index], gpu[index], float_check, path + [index])
_assert_equal(cpu[index], gpu[index], float_check, path + [index],
nan_inf_equivalent_for_overflow)
elif (t is list):
assert len(cpu) == len(gpu), "CPU and GPU list have different lengths at {} CPU: {} GPU: {}".format(path, len(cpu), len(gpu))
for index in range(len(cpu)):
_assert_equal(cpu[index], gpu[index], float_check, path + [index])
_assert_equal(cpu[index], gpu[index], float_check, path + [index],
nan_inf_equivalent_for_overflow)
elif (t is tuple):
assert len(cpu) == len(gpu), "CPU and GPU list have different lengths at {} CPU: {} GPU: {}".format(path, len(cpu), len(gpu))
for index in range(len(cpu)):
_assert_equal(cpu[index], gpu[index], float_check, path + [index])
_assert_equal(cpu[index], gpu[index], float_check, path + [index],
nan_inf_equivalent_for_overflow)
elif (t is pytypes.GeneratorType):
index = 0
# generator has no zip :( so we have to do this the hard way
Expand All @@ -67,7 +76,8 @@ def _assert_equal(cpu, gpu, float_check, path):
if done:
assert sub_cpu == sub_gpu and sub_cpu == None, "CPU and GPU generators have different lengths at {}".format(path)
else:
_assert_equal(sub_cpu, sub_gpu, float_check, path + [index])
_assert_equal(sub_cpu, sub_gpu, float_check, path + [index],
nan_inf_equivalent_for_overflow)

index = index + 1
elif (t is dict):
Expand All @@ -79,6 +89,8 @@ def _assert_equal(cpu, gpu, float_check, path):
elif (t is int):
assert cpu == gpu, f"GPU ({gpu}) and CPU ({cpu}) int values are different at {path}"
elif (t is float):
if (nan_inf_equivalent_for_overflow and _is_nan_inf_overflow_pair(cpu, gpu)):
return
if (math.isnan(cpu)):
assert math.isnan(gpu), f"GPU ({gpu}) and CPU (nan) float values are different at {path}"
else:
Expand Down Expand Up @@ -106,14 +118,15 @@ def _assert_equal(cpu, gpu, float_check, path):
else:
assert False, "Found unexpected type {} at {}".format(t, path)

def assert_equal_with_local_sort(cpu, gpu):
def assert_equal_with_local_sort(cpu, gpu, nan_inf_equivalent_for_overflow=False):
    """Sort both result sets locally, then verify CPU and GPU results match."""
    _sort_locally(cpu, gpu)
    assert_equal(cpu, gpu,
                 nan_inf_equivalent_for_overflow=nan_inf_equivalent_for_overflow)

def assert_equal(cpu, gpu):
def assert_equal(cpu, gpu, nan_inf_equivalent_for_overflow=False):
"""Verify that the result from the CPU and the GPU are equal"""
try:
_assert_equal(cpu, gpu, float_check=get_float_check(), path=[])
_assert_equal(cpu, gpu, float_check=get_float_check(), path=[],
nan_inf_equivalent_for_overflow=nan_inf_equivalent_for_overflow)
except:
def to_txt(data):
try:
Expand Down Expand Up @@ -587,7 +600,8 @@ def _assert_gpu_and_cpu_are_equal(func,
mode,
conf={},
is_cpu_first=True,
result_canonicalize_func_before_compare=None):
result_canonicalize_func_before_compare=None,
nan_inf_equivalent_for_overflow=False):
(bring_back, collect_type) = _prep_func_for_compare(func, mode)
conf = _prep_incompat_conf(conf)

Expand Down Expand Up @@ -625,7 +639,7 @@ def run_on_gpu():
if should_sort_locally():
_sort_locally(from_cpu, from_gpu)

assert_equal(from_cpu, from_gpu)
assert_equal(from_cpu, from_gpu, nan_inf_equivalent_for_overflow)

def run_with_cpu(func,
mode,
Expand Down Expand Up @@ -687,7 +701,9 @@ def run_on_gpu():

return (from_cpu, from_gpu)

def assert_gpu_and_cpu_are_equal_collect(func, conf={}, is_cpu_first=True, result_canonicalize_func_before_compare=None):
def assert_gpu_and_cpu_are_equal_collect(func, conf={}, is_cpu_first=True,
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: unnecessary change?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reverted in f402c5f. The assert_gpu_and_cpu_are_equal_collect signature is back to a single line; this PR no longer changes that line.

result_canonicalize_func_before_compare=None,
nan_inf_equivalent_for_overflow=False):
"""
Assert when running func on both the CPU and the GPU that the results are equal.
In this case the data is collected back to the driver and compared here, so be
Expand All @@ -702,8 +718,12 @@ def assert_gpu_and_cpu_are_equal_collect(func, conf={}, is_cpu_first=True, resul
+Row(a=Row(first=-341142443, second=3.333994866005594e-37))
Use this func to canonicalize the results.
Usage of this func is: (cpu, gpu) = result_canonicalize_func_before_compare(original_cpu_result, original_gpu_result)
:param nan_inf_equivalent_for_overflow: Treat NaN vs +/-Inf as equivalent for
documented floating-point overflow tests.
"""
_assert_gpu_and_cpu_are_equal(func, 'COLLECT', conf=conf, is_cpu_first=is_cpu_first, result_canonicalize_func_before_compare=result_canonicalize_func_before_compare)
_assert_gpu_and_cpu_are_equal(func, 'COLLECT', conf=conf, is_cpu_first=is_cpu_first,
result_canonicalize_func_before_compare=result_canonicalize_func_before_compare,
nan_inf_equivalent_for_overflow=nan_inf_equivalent_for_overflow)

def assert_gpu_and_cpu_are_equal_iterator(func, conf={}, is_cpu_first=True):
"""
Expand All @@ -721,7 +741,9 @@ def assert_gpu_and_cpu_row_counts_equal(func, conf={}, is_cpu_first=True):
"""
_assert_gpu_and_cpu_are_equal(func, 'COUNT', conf=conf, is_cpu_first=is_cpu_first)

def assert_gpu_and_cpu_are_equal_sql(df_fun, table_name, sql, conf=None, debug=False, is_cpu_first=True, validate_execs_in_gpu_plan=[]):
def assert_gpu_and_cpu_are_equal_sql(df_fun, table_name, sql, conf=None, debug=False,
is_cpu_first=True, validate_execs_in_gpu_plan=[],
nan_inf_equivalent_for_overflow=False):
"""
Assert that the specified SQL query produces equal results on CPU and GPU.
:param df_fun: a function that will create the dataframe
Expand All @@ -731,6 +753,8 @@ def assert_gpu_and_cpu_are_equal_sql(df_fun, table_name, sql, conf=None, debug=F
:param debug: Boolean to indicate if the SQL output should be printed
:param is_cpu_first: Boolean to indicate if the CPU should be run first or not
:param validate_execs_in_gpu_plan: String list of expressions to be validated in the GPU plan.
:param nan_inf_equivalent_for_overflow: Treat NaN vs +/-Inf as equivalent for
documented floating-point overflow tests.
:return: Assertion failure, if results from CPU and GPU do not match.
"""
if conf is None:
Expand All @@ -745,7 +769,8 @@ def do_it_all(spark):
return data_gen.debug_df(spark.sql(sql))
else:
return spark.sql(sql)
assert_gpu_and_cpu_are_equal_collect(do_it_all, conf, is_cpu_first=is_cpu_first)
assert_gpu_and_cpu_are_equal_collect(do_it_all, conf, is_cpu_first=is_cpu_first,
nan_inf_equivalent_for_overflow=nan_inf_equivalent_for_overflow)


def check_exception(actual_error, error_message):
Expand Down
67 changes: 46 additions & 21 deletions integration_tests/src/main/python/hash_aggregate_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -194,15 +194,24 @@
_init_list_with_decimals = _init_list + [
_decimals_with_nulls, _decimals_with_no_nulls]

_std_variance_issue_14681_gen = [
('a', RepeatSeqGen(IntegerGen(), length=20)), ('b', DoubleGen()), ('c', DoubleGen())]

# Grouped FP gens using bare DoubleGen()/FloatGen() on the aggregated columns.
# Their default special_cases inject NaN, -0.0, +-Inf, and max-fraction values,
# which exercise the corner-case paths for FP aggregate functions.
_init_list_with_decimals_and_floats = _init_list_with_decimals + [
_std_variance_issue_14681_gen,
[('a', RepeatSeqGen(IntegerGen(), length=20)), ('b', FloatGen()), ('c', FloatGen())]]
# Gens for std/variance tests on "common" finite values: the double columns
# bound the exponent to [-200, 200] and exclude NaN, keeping the exact
# variance within double range so CPU and GPU agree under the normal
# approximate-float tolerance.
_std_variance_common_fp_gens = [
    [('a', RepeatSeqGen(IntegerGen(), length=20)),
     ('b', DoubleGen(min_exp=-200, max_exp=200, no_nans=True)),
     ('c', DoubleGen(min_exp=-200, max_exp=200, no_nans=True))],
    [('a', RepeatSeqGen(IntegerGen(), length=20)),
     ('b', FloatGen(no_nans=True)),
     ('c', FloatGen(no_nans=True))]]

_init_list_with_decimals_and_common_floats = (
    _init_list_with_decimals + _std_variance_common_fp_gens)

# Full-range doubles/floats without NaN: extreme magnitudes can overflow the
# true variance, so tests using these gens compare NaN vs +/-Inf loosely
# (see test_std_variance_extreme_floating_point).
_std_variance_extreme_fp_gens = [
    [('a', RepeatSeqGen(IntegerGen(), length=20)),
     ('b', DoubleGen(no_nans=True)),
     ('c', DoubleGen(no_nans=True))],
    [('a', RepeatSeqGen(IntegerGen(), length=20)),
     ('b', FloatGen(no_nans=True)),
     ('c', FloatGen(no_nans=True))]]

# Used to test ANSI-mode fallback
_no_overflow_ansi_gens = [
Expand Down Expand Up @@ -2346,16 +2355,7 @@ def do_it(spark):
assert_gpu_and_cpu_are_equal_collect(do_it,
conf={'spark.sql.ansi.enabled': 'true'})

# Tests for standard deviation and variance aggregations.
@ignore_order(local=True)
@approximate_float
@incompat
@pytest.mark.parametrize('data_gen', _init_list_with_decimals_and_floats, ids=idfn)
@pytest.mark.parametrize('conf', get_params(_confs, params_markers_for_confs), ids=idfn)
def test_std_variance(data_gen, conf):
if data_gen is _std_variance_issue_14681_gen:
pytest.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/14681')

def _assert_std_variance(data_gen, conf, nan_inf_equivalent_for_overflow=False):
local_conf = copy_and_update(conf, {
'spark.rapids.sql.castDecimalToFloat.enabled': 'true'})
assert_gpu_and_cpu_are_equal_sql(
Expand All @@ -2369,15 +2369,40 @@ def test_std_variance(data_gen, conf):
'var_pop(b),' +
'var_samp(b)' +
' from data_table group by a',
conf=local_conf)
conf=local_conf,
nan_inf_equivalent_for_overflow=nan_inf_equivalent_for_overflow)
assert_gpu_and_cpu_are_equal_sql(
lambda spark : gen_df(spark, data_gen, length=1000),
"data_table",
'select ' +
'stddev(b),' +
'stddev_samp(b)'
' from data_table',
conf=local_conf)
conf=local_conf,
nan_inf_equivalent_for_overflow=nan_inf_equivalent_for_overflow)


# Tests for standard deviation and variance aggregations on common finite values.
# The FP gens here bound the double exponent and exclude NaN, so results should
# match CPU within the normal approximate-float tolerance.
@ignore_order(local=True)
@approximate_float
@incompat
@pytest.mark.parametrize('data_gen', _init_list_with_decimals_and_common_floats, ids=idfn)
@pytest.mark.parametrize('conf', get_params(_confs, params_markers_for_confs), ids=idfn)
def test_std_variance(data_gen, conf):
    # Strict comparison: NaN vs +/-Inf overflow equivalence is NOT enabled here.
    _assert_std_variance(data_gen, conf)


# Extremely large FP inputs can make the true variance overflow double precision.
# Spark CPU and GPU may surface that overflow as NaN or +/-Inf depending only on
# accumulation order. Keep this corner coverage, but compare those overflow
# sentinels loosely. See https://github.com/NVIDIA/spark-rapids/issues/14681.
@ignore_order(local=True)
@approximate_float
@incompat
@pytest.mark.parametrize('data_gen', _std_variance_extreme_fp_gens, ids=idfn)
@pytest.mark.parametrize('conf', get_params(_confs, params_markers_for_confs), ids=idfn)
def test_std_variance_extreme_floating_point(data_gen, conf):
    # Loose comparison: a NaN on one side paired with +/-Inf on the other is
    # accepted as an equivalent overflow outcome.
    _assert_std_variance(data_gen, conf, nan_inf_equivalent_for_overflow=True)


@ignore_order(local=True)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -250,11 +250,18 @@ class CudfMergeM2 extends CudfAggregate {
if (n > 0) {
val mean = partialMean.getDouble(i)
val m2 = partialM2.getDouble(i)
val delta = mean - mergeMean
val newN = n + mergeN
mergeM2 += m2 + delta * delta * n * mergeN / newN
mergeMean = (mergeMean * mergeN + mean * n) / newN
mergeN = newN
if (mergeN == 0.0) {
mergeN = n
mergeMean = mean
mergeM2 = m2
} else {
val delta = mean - mergeMean
val newN = mergeN + n
val deltaN = delta / newN
mergeM2 += m2 + delta * deltaN * mergeN * n
mergeMean += deltaN * n
mergeN = newN
}
}
}

Expand Down