Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions docs/compatibility.md
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,15 @@ floating point aggregation then the join may fail to work properly with the plug
worked with plain Spark. This is behavior is enabled by default but can be disabled with the config
[`spark.rapids.sql.variableFloatAgg.enabled`](additional-functionality/advanced_configs.md#sql.variableFloatAgg.enabled).

For standard deviation and variance aggregations (`stddev`, `stddev_pop`, `stddev_samp`,
`variance`, `var_pop`, and `var_samp`) on very large finite floating-point values, the exact
mathematical result can exceed the range of a double. In these overflow cases Spark CPU and the
RAPIDS Accelerator may report different IEEE floating-point sentinels, such as `NaN` versus
`+Infinity`, because partial aggregate state can be merged in a different order. `-Infinity` is
not an expected outcome for stddev/variance over finite inputs and is not treated as an accepted
overflow sentinel. This is limited to overflow behavior for extreme inputs; ordinary finite
inputs should still match within normal floating-point tolerance.

### `0.0` vs `-0.0`

Floating point allows zero to be encoded as `0.0` and `-0.0`, but the IEEE standard says that they
Expand Down
9 changes: 7 additions & 2 deletions integration_tests/src/main/python/asserts.py
Original file line number Diff line number Diff line change
Expand Up @@ -721,7 +721,9 @@ def assert_gpu_and_cpu_row_counts_equal(func, conf={}, is_cpu_first=True):
"""
_assert_gpu_and_cpu_are_equal(func, 'COUNT', conf=conf, is_cpu_first=is_cpu_first)

def assert_gpu_and_cpu_are_equal_sql(df_fun, table_name, sql, conf=None, debug=False, is_cpu_first=True, validate_execs_in_gpu_plan=[]):
def assert_gpu_and_cpu_are_equal_sql(df_fun, table_name, sql, conf=None, debug=False,
is_cpu_first=True, validate_execs_in_gpu_plan=[],
result_canonicalize_func_before_compare=None):
"""
Assert that the specified SQL query produces equal results on CPU and GPU.
:param df_fun: a function that will create the dataframe
Expand All @@ -731,6 +733,8 @@ def assert_gpu_and_cpu_are_equal_sql(df_fun, table_name, sql, conf=None, debug=F
:param debug: Boolean to indicate if the SQL output should be printed
:param is_cpu_first: Boolean to indicate if the CPU should be run first or not
:param validate_execs_in_gpu_plan: String list of expressions to be validated in the GPU plan.
:param result_canonicalize_func_before_compare: Function to canonicalize the CPU and GPU
results before comparison.
:return: Assertion failure, if results from CPU and GPU do not match.
"""
if conf is None:
Expand All @@ -745,7 +749,8 @@ def do_it_all(spark):
return data_gen.debug_df(spark.sql(sql))
else:
return spark.sql(sql)
assert_gpu_and_cpu_are_equal_collect(do_it_all, conf, is_cpu_first=is_cpu_first)
assert_gpu_and_cpu_are_equal_collect(do_it_all, conf, is_cpu_first=is_cpu_first,
result_canonicalize_func_before_compare=result_canonicalize_func_before_compare)


def check_exception(actual_error, error_message):
Expand Down
103 changes: 82 additions & 21 deletions integration_tests/src/main/python/hash_aggregate_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from conftest import is_not_utc
from data_gen import *
from functools import reduce
from pyspark.sql import Row
from pyspark.sql.dataframe import DataFrame
from pyspark.sql.types import *
from marks import *
Expand Down Expand Up @@ -194,15 +195,24 @@
_init_list_with_decimals = _init_list + [
_decimals_with_nulls, _decimals_with_no_nulls]

_std_variance_issue_14681_gen = [
('a', RepeatSeqGen(IntegerGen(), length=20)), ('b', DoubleGen()), ('c', DoubleGen())]

# Grouped FP gens using bare DoubleGen()/FloatGen() on the aggregated columns.
# Their default special_cases inject NaN, -0.0, +-Inf, and max-fraction values,
# which exercise the corner-case paths for FP aggregate functions.
_init_list_with_decimals_and_floats = _init_list_with_decimals + [
_std_variance_issue_14681_gen,
[('a', RepeatSeqGen(IntegerGen(), length=20)), ('b', FloatGen()), ('c', FloatGen())]]
_std_variance_common_fp_gens = [
[('a', RepeatSeqGen(IntegerGen(), length=20)),
('b', DoubleGen(min_exp=-200, max_exp=200, no_nans=True)),
('c', DoubleGen(min_exp=-200, max_exp=200, no_nans=True))],
[('a', RepeatSeqGen(IntegerGen(), length=20)),
('b', FloatGen(no_nans=True)),
('c', FloatGen(no_nans=True))]]

_init_list_with_decimals_and_common_floats = (
_init_list_with_decimals + _std_variance_common_fp_gens)

_std_variance_extreme_fp_gens = [
[('a', RepeatSeqGen(IntegerGen(), length=20)),
('b', DoubleGen(no_nans=True)),
('c', DoubleGen(no_nans=True))],
[('a', RepeatSeqGen(IntegerGen(), length=20)),
('b', FloatGen(no_nans=True)),
('c', FloatGen(no_nans=True))]]

# Used to test ANSI-mode fallback
_no_overflow_ansi_gens = [
Expand Down Expand Up @@ -2346,16 +2356,37 @@ def do_it(spark):
assert_gpu_and_cpu_are_equal_collect(do_it,
conf={'spark.sql.ansi.enabled': 'true'})

# Tests for standard deviation and variance aggregations.
@ignore_order(local=True)
@approximate_float
@incompat
@pytest.mark.parametrize('data_gen', _init_list_with_decimals_and_floats, ids=idfn)
@pytest.mark.parametrize('conf', get_params(_confs, params_markers_for_confs), ids=idfn)
def test_std_variance(data_gen, conf):
if data_gen is _std_variance_issue_14681_gen:
pytest.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/14681')

_STD_VARIANCE_OVERFLOW_SENTINEL = '__STD_VARIANCE_OVERFLOW__'
_STD_VARIANCE_FIELDS = {
'stddev(b)', 'stddev_pop(b)', 'stddev_samp(b)',
'variance(b)', 'var_pop(b)', 'var_samp(b)'
}

def _canonicalize_std_variance_overflow_value(value):
# Only NaN and +Infinity are accepted overflow sentinels for std/variance
# over finite inputs: they can be explained by overflow plus partial merge
# order. -Infinity would indicate a different issue (e.g. negative M2 /
# sign bug) and must not be canonicalized away.
if isinstance(value, float) and (math.isnan(value) or value == math.inf):
return _STD_VARIANCE_OVERFLOW_SENTINEL
return value

def _canonicalize_std_variance_overflow_rows(rows):
return [
Row(**{
field: _canonicalize_std_variance_overflow_value(row[field])
if field in _STD_VARIANCE_FIELDS else row[field]
for field in row.__fields__
}) if isinstance(row, Row) and hasattr(row, "__fields__") else row
for row in rows
]

def _canonicalize_std_variance_overflow_results(cpu, gpu):
return (
_canonicalize_std_variance_overflow_rows(cpu),
_canonicalize_std_variance_overflow_rows(gpu))

def _assert_std_variance(data_gen, conf, result_canonicalize_func_before_compare=None):
local_conf = copy_and_update(conf, {
'spark.rapids.sql.castDecimalToFloat.enabled': 'true'})
assert_gpu_and_cpu_are_equal_sql(
Expand All @@ -2369,15 +2400,45 @@ def test_std_variance(data_gen, conf):
'var_pop(b),' +
'var_samp(b)' +
' from data_table group by a',
conf=local_conf)
conf=local_conf,
result_canonicalize_func_before_compare=result_canonicalize_func_before_compare)
assert_gpu_and_cpu_are_equal_sql(
lambda spark : gen_df(spark, data_gen, length=1000),
"data_table",
'select ' +
'stddev(b),' +
'stddev_samp(b)'
' from data_table',
conf=local_conf)
conf=local_conf,
result_canonicalize_func_before_compare=result_canonicalize_func_before_compare)


# Tests for standard deviation and variance aggregations on common finite values.
@ignore_order(local=True)
@approximate_float
@incompat
@pytest.mark.parametrize('data_gen', _init_list_with_decimals_and_common_floats, ids=idfn)
@pytest.mark.parametrize('conf', get_params(_confs, params_markers_for_confs), ids=idfn)
def test_std_variance(data_gen, conf):
_assert_std_variance(data_gen, conf)


# Extremely large FP inputs can make the true variance overflow double precision.
# Spark CPU and GPU may surface that overflow as NaN or +Infinity depending only on
# accumulation order. Keep this corner coverage, but compare those overflow
# sentinels loosely. -Infinity is not an accepted overflow sentinel because
# stddev/variance over finite inputs cannot reach it via overflow alone.
# See https://github.com/NVIDIA/spark-rapids/issues/14681.
@ignore_order(local=True)
@approximate_float
@incompat
@pytest.mark.parametrize('data_gen', _std_variance_extreme_fp_gens, ids=idfn)
@pytest.mark.parametrize('conf', get_params(_confs, params_markers_for_confs), ids=idfn)
def test_std_variance_extreme_floating_point(data_gen, conf):
_assert_std_variance(
data_gen,
conf,
result_canonicalize_func_before_compare=_canonicalize_std_variance_overflow_results)


@ignore_order(local=True)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -250,11 +250,18 @@ class CudfMergeM2 extends CudfAggregate {
if (n > 0) {
val mean = partialMean.getDouble(i)
val m2 = partialM2.getDouble(i)
val delta = mean - mergeMean
val newN = n + mergeN
mergeM2 += m2 + delta * delta * n * mergeN / newN
mergeMean = (mergeMean * mergeN + mean * n) / newN
mergeN = newN
if (mergeN == 0.0) {
mergeN = n
mergeMean = mean
mergeM2 = m2
} else {
val delta = mean - mergeMean
val newN = mergeN + n
val deltaN = delta / newN
mergeM2 += m2 + delta * deltaN * mergeN * n
mergeMean += deltaN * n
mergeN = newN
}
}
}

Expand Down