diff --git a/docs/compatibility.md b/docs/compatibility.md index d478ba9f894..4dc5420701d 100644 --- a/docs/compatibility.md +++ b/docs/compatibility.md @@ -68,6 +68,15 @@ floating point aggregation then the join may fail to work properly with the plug worked with plain Spark. This is behavior is enabled by default but can be disabled with the config [`spark.rapids.sql.variableFloatAgg.enabled`](additional-functionality/advanced_configs.md#sql.variableFloatAgg.enabled). +For standard deviation and variance aggregations (`stddev`, `stddev_pop`, `stddev_samp`, +`variance`, `var_pop`, and `var_samp`) on very large finite floating-point values, the exact +mathematical result can exceed the range of a double. In these overflow cases Spark CPU and the +RAPIDS Accelerator may report different IEEE floating-point sentinels, such as `NaN` versus +`+Infinity`, because partial aggregate state can be merged in a different order. `-Infinity` is +not an expected outcome for stddev/variance over finite inputs and is not treated as an accepted +overflow sentinel. This is limited to overflow behavior for extreme inputs; ordinary finite +inputs should still match within normal floating-point tolerance. + ### `0.0` vs `-0.0` Floating point allows zero to be encoded as `0.0` and `-0.0`, but the IEEE standard says that they diff --git a/integration_tests/src/main/python/asserts.py b/integration_tests/src/main/python/asserts.py index cc9013cd845..a83b6a8014a 100644 --- a/integration_tests/src/main/python/asserts.py +++ b/integration_tests/src/main/python/asserts.py @@ -721,7 +721,9 @@ def assert_gpu_and_cpu_row_counts_equal(func, conf={}, is_cpu_first=True): """ _assert_gpu_and_cpu_are_equal(func, 'COUNT', conf=conf, is_cpu_first=is_cpu_first) -def assert_gpu_and_cpu_are_equal_sql(df_fun, table_name, sql, conf=None, debug=False, is_cpu_first=True, validate_execs_in_gpu_plan=[]): +def assert_gpu_and_cpu_are_equal_sql(df_fun, table_name, sql, conf=None, debug=False, + is_cpu_first=True, validate_execs_in_gpu_plan=[], + result_canonicalize_func_before_compare=None): """ Assert that the specified SQL query produces equal results on CPU and GPU. :param df_fun: a function that will create the dataframe @@ -731,6 +733,8 @@ def assert_gpu_and_cpu_are_equal_sql(df_fun, table_name, sql, conf=None, debug=F :param debug: Boolean to indicate if the SQL output should be printed :param is_cpu_first: Boolean to indicate if the CPU should be run first or not :param validate_execs_in_gpu_plan: String list of expressions to be validated in the GPU plan. + :param result_canonicalize_func_before_compare: Function to canonicalize the CPU and GPU + results before comparison. :return: Assertion failure, if results from CPU and GPU do not match. """ if conf is None: @@ -745,7 +749,8 @@ def do_it_all(spark): return data_gen.debug_df(spark.sql(sql)) else: return spark.sql(sql) - assert_gpu_and_cpu_are_equal_collect(do_it_all, conf, is_cpu_first=is_cpu_first) + assert_gpu_and_cpu_are_equal_collect(do_it_all, conf, is_cpu_first=is_cpu_first, + result_canonicalize_func_before_compare=result_canonicalize_func_before_compare) def check_exception(actual_error, error_message): diff --git a/integration_tests/src/main/python/hash_aggregate_test.py b/integration_tests/src/main/python/hash_aggregate_test.py index 5d7bece84e3..3393848bdeb 100644 --- a/integration_tests/src/main/python/hash_aggregate_test.py +++ b/integration_tests/src/main/python/hash_aggregate_test.py @@ -21,6 +21,7 @@ from conftest import is_not_utc from data_gen import * from functools import reduce +from pyspark.sql import Row from pyspark.sql.dataframe import DataFrame from pyspark.sql.types import * from marks import * @@ -194,15 +195,24 @@ _init_list_with_decimals = _init_list + [ _decimals_with_nulls, _decimals_with_no_nulls] -_std_variance_issue_14681_gen = [ - ('a', RepeatSeqGen(IntegerGen(), length=20)), ('b', DoubleGen()), ('c', DoubleGen())] - -# Grouped FP gens using bare DoubleGen()/FloatGen() on the aggregated columns. -# Their default special_cases inject NaN, -0.0, +-Inf, and max-fraction values, -# which exercise the corner-case paths for FP aggregate functions. -_init_list_with_decimals_and_floats = _init_list_with_decimals + [ - _std_variance_issue_14681_gen, - [('a', RepeatSeqGen(IntegerGen(), length=20)), ('b', FloatGen()), ('c', FloatGen())]] +_std_variance_common_fp_gens = [ + [('a', RepeatSeqGen(IntegerGen(), length=20)), + ('b', DoubleGen(min_exp=-200, max_exp=200, no_nans=True)), + ('c', DoubleGen(min_exp=-200, max_exp=200, no_nans=True))], + [('a', RepeatSeqGen(IntegerGen(), length=20)), + ('b', FloatGen(no_nans=True)), + ('c', FloatGen(no_nans=True))]] + +_init_list_with_decimals_and_common_floats = ( + _init_list_with_decimals + _std_variance_common_fp_gens) + +_std_variance_extreme_fp_gens = [ + [('a', RepeatSeqGen(IntegerGen(), length=20)), + ('b', DoubleGen(no_nans=True)), + ('c', DoubleGen(no_nans=True))], + [('a', RepeatSeqGen(IntegerGen(), length=20)), + ('b', FloatGen(no_nans=True)), + ('c', FloatGen(no_nans=True))]] # Used to test ANSI-mode fallback _no_overflow_ansi_gens = [ @@ -2346,16 +2356,37 @@ def do_it(spark): assert_gpu_and_cpu_are_equal_collect(do_it, conf={'spark.sql.ansi.enabled': 'true'}) -# Tests for standard deviation and variance aggregations. -@ignore_order(local=True) -@approximate_float -@incompat -@pytest.mark.parametrize('data_gen', _init_list_with_decimals_and_floats, ids=idfn) -@pytest.mark.parametrize('conf', get_params(_confs, params_markers_for_confs), ids=idfn) -def test_std_variance(data_gen, conf): - if data_gen is _std_variance_issue_14681_gen: - pytest.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/14681') - +_STD_VARIANCE_OVERFLOW_SENTINEL = '__STD_VARIANCE_OVERFLOW__' +_STD_VARIANCE_FIELDS = { + 'stddev(b)', 'stddev_pop(b)', 'stddev_samp(b)', + 'variance(b)', 'var_pop(b)', 'var_samp(b)' +} + +def _canonicalize_std_variance_overflow_value(value): + # Only NaN and +Infinity are accepted overflow sentinels for std/variance + # over finite inputs: they can be explained by overflow plus partial merge + # order. -Infinity would indicate a different issue (e.g. negative M2 / + # sign bug) and must not be canonicalized away. + if isinstance(value, float) and (math.isnan(value) or value == math.inf): + return _STD_VARIANCE_OVERFLOW_SENTINEL + return value + +def _canonicalize_std_variance_overflow_rows(rows): + return [ + Row(**{ + field: _canonicalize_std_variance_overflow_value(row[field]) + if field in _STD_VARIANCE_FIELDS else row[field] + for field in row.__fields__ + }) if isinstance(row, Row) and hasattr(row, "__fields__") else row + for row in rows + ] + +def _canonicalize_std_variance_overflow_results(cpu, gpu): + return ( + _canonicalize_std_variance_overflow_rows(cpu), + _canonicalize_std_variance_overflow_rows(gpu)) + +def _assert_std_variance(data_gen, conf, result_canonicalize_func_before_compare=None): local_conf = copy_and_update(conf, { 'spark.rapids.sql.castDecimalToFloat.enabled': 'true'}) assert_gpu_and_cpu_are_equal_sql( @@ -2369,7 +2400,8 @@ def test_std_variance(data_gen, conf): 'var_pop(b),' + 'var_samp(b)' + ' from data_table group by a', - conf=local_conf) + conf=local_conf, + result_canonicalize_func_before_compare=result_canonicalize_func_before_compare) assert_gpu_and_cpu_are_equal_sql( lambda spark : gen_df(spark, data_gen, length=1000), "data_table", @@ -2377,7 +2409,36 @@ def test_std_variance(data_gen, conf): 'stddev(b),' + 'stddev_samp(b)' ' from data_table', - conf=local_conf) + conf=local_conf, + result_canonicalize_func_before_compare=result_canonicalize_func_before_compare) + + +# Tests for standard deviation and variance aggregations on common finite values. +@ignore_order(local=True) +@approximate_float +@incompat +@pytest.mark.parametrize('data_gen', _init_list_with_decimals_and_common_floats, ids=idfn) +@pytest.mark.parametrize('conf', get_params(_confs, params_markers_for_confs), ids=idfn) +def test_std_variance(data_gen, conf): + _assert_std_variance(data_gen, conf) + + +# Extremely large FP inputs can make the true variance overflow double precision. +# Spark CPU and GPU may surface that overflow as NaN or +Infinity depending only on +# accumulation order. Keep this corner coverage, but compare those overflow +# sentinels loosely. -Infinity is not an accepted overflow sentinel because +# stddev/variance over finite inputs cannot reach it via overflow alone. +# See https://github.com/NVIDIA/spark-rapids/issues/14681. +@ignore_order(local=True) +@approximate_float +@incompat +@pytest.mark.parametrize('data_gen', _std_variance_extreme_fp_gens, ids=idfn) +@pytest.mark.parametrize('conf', get_params(_confs, params_markers_for_confs), ids=idfn) +def test_std_variance_extreme_floating_point(data_gen, conf): + _assert_std_variance( + data_gen, + conf, + result_canonicalize_func_before_compare=_canonicalize_std_variance_overflow_results) @ignore_order(local=True) diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/aggregate/aggregateFunctions.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/aggregate/aggregateFunctions.scala index 6f006e602b3..47df3f404a0 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/aggregate/aggregateFunctions.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/aggregate/aggregateFunctions.scala @@ -250,11 +250,18 @@ class CudfMergeM2 extends CudfAggregate { if (n > 0) { val mean = partialMean.getDouble(i) val m2 = partialM2.getDouble(i) - val delta = mean - mergeMean - val newN = n + mergeN - mergeM2 += m2 + delta * delta * n * mergeN / newN - mergeMean = (mergeMean * mergeN + mean * n) / newN - mergeN = newN + if (mergeN == 0.0) { + mergeN = n + mergeMean = mean + mergeM2 = m2 + } else { + val delta = mean - mergeMean + val newN = mergeN + n + val deltaN = delta / newN + mergeM2 += m2 + delta * deltaN * mergeN * n + mergeMean += deltaN * n + mergeN = newN + } } }