diff --git a/python/cudf_polars/cudf_polars/experimental/benchmarks/pdsds_queries/q14.py b/python/cudf_polars/cudf_polars/experimental/benchmarks/pdsds_queries/q14.py index 57946e3044d..d353c19102c 100644 --- a/python/cudf_polars/cudf_polars/experimental/benchmarks/pdsds_queries/q14.py +++ b/python/cudf_polars/cudf_polars/experimental/benchmarks/pdsds_queries/q14.py @@ -335,65 +335,66 @@ def polars_impl(run_config: RunConfig) -> QueryResult: item = get_data(run_config.dataset_path, "item", run_config.suffix) date_dim = get_data(run_config.dataset_path, "date_dim", run_config.suffix) + cross_items = build_cross_items( + store_sales, catalog_sales, web_sales, item, date_dim, year=year + ) + average_sales = build_average_sales( + store_sales, catalog_sales, web_sales, date_dim, year=year + ) + + # week_dates is ≤7 rows (one calendar week), computed once as a 1-partition frame. + # Push the week filter into each channel before the UNION via a semi-join so that + # ~99% of rows (everything outside the target week) are dropped before the + # expensive cross_items join and groupby. + target_week = ( + date_dim.filter( + (pl.col("d_year") == year + 1) + & (pl.col("d_moy") == 12) + & (pl.col("d_dom") == day) + ) + .select("d_week_seq") + .unique() + ) + week_dates = date_dim.join(target_week, on="d_week_seq").select("d_date_sk") + all_sales = pl.concat( [ - store_sales.select( + store_sales.join( + week_dates, left_on="ss_sold_date_sk", right_on="d_date_sk", how="semi" + ).select( [ pl.lit("store").alias("channel"), pl.col("ss_item_sk").alias("item_sk"), pl.col("ss_quantity").alias("quantity"), pl.col("ss_list_price").alias("list_price"), - pl.col("ss_sold_date_sk").alias("date_sk"), ] ), - catalog_sales.select( + catalog_sales.join( + week_dates, left_on="cs_sold_date_sk", right_on="d_date_sk", how="semi" + ).select( [ pl.lit("catalog").alias("channel"), pl.col("cs_item_sk").alias("item_sk"), pl.col("cs_quantity").alias("quantity"), pl.col("cs_list_price").alias("list_price"), - pl.col("cs_sold_date_sk").alias("date_sk"), ] ), - web_sales.select( + web_sales.join( + week_dates, left_on="ws_sold_date_sk", right_on="d_date_sk", how="semi" + ).select( [ pl.lit("web").alias("channel"), pl.col("ws_item_sk").alias("item_sk"), pl.col("ws_quantity").alias("quantity"), pl.col("ws_list_price").alias("list_price"), - pl.col("ws_sold_date_sk").alias("date_sk"), ] ), ] ) - cross_items = build_cross_items( - store_sales, catalog_sales, web_sales, item, date_dim, year=year - ) - average_sales = build_average_sales( - store_sales, catalog_sales, web_sales, date_dim, year=year - ) - - # d_week_seq target is the same for all 3 channels; compute it once. - target_week = ( - date_dim.filter( - (pl.col("d_year") == year + 1) - & (pl.col("d_moy") == 12) - & (pl.col("d_dom") == day) - ) - .select("d_week_seq") - .unique() - ) - week_dates = date_dim.join(target_week, on="d_week_seq").select("d_date_sk") - - # Build y: all 3 channels in a single pipeline. - # cross_items and average_sales each appear once — no CSE needed. - # After group_by the frame is tiny, so the cross join with the 1-row - # average_sales frame is negligible even if Polars fuses it into an IEJoin. 
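# A minimal sketch of the per-channel semi-join described in the q14 comments
# above, on invented toy frames: the tiny week_dates frame is semi-joined into
# each channel before pl.concat, so the union never materializes out-of-week
# rows. Illustrative only, not part of this patch; all names and values are
# made up.
import polars as pl

week_dates = pl.LazyFrame({"d_date_sk": [10, 11]})
store_sales = pl.LazyFrame({"ss_sold_date_sk": [10, 99], "ss_item_sk": [1, 2]})
web_sales = pl.LazyFrame({"ws_sold_date_sk": [11, 98], "ws_item_sk": [3, 4]})

all_sales = pl.concat(
    [
        store_sales.join(
            week_dates, left_on="ss_sold_date_sk", right_on="d_date_sk", how="semi"
        ).select(pl.lit("store").alias("channel"), pl.col("ss_item_sk").alias("item_sk")),
        web_sales.join(
            week_dates, left_on="ws_sold_date_sk", right_on="d_date_sk", how="semi"
        ).select(pl.lit("web").alias("channel"), pl.col("ws_item_sk").alias("item_sk")),
    ]
)
print(all_sales.collect())  # 2 rows survive: ("store", 1) and ("web", 3)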
y = ( all_sales.join(cross_items, left_on="item_sk", right_on="ss_item_sk") .join(item, left_on="item_sk", right_on="i_item_sk") - .join(week_dates, left_on="date_sk", right_on="d_date_sk") .group_by(["channel", "i_brand_id", "i_class_id", "i_category_id"]) .agg( [ diff --git a/python/cudf_polars/cudf_polars/experimental/benchmarks/pdsds_queries/q17.py b/python/cudf_polars/cudf_polars/experimental/benchmarks/pdsds_queries/q17.py index 997a1546d47..b26e23984e8 100644 --- a/python/cudf_polars/cudf_polars/experimental/benchmarks/pdsds_queries/q17.py +++ b/python/cudf_polars/cudf_polars/experimental/benchmarks/pdsds_queries/q17.py @@ -110,38 +110,103 @@ def polars_impl(run_config: RunConfig) -> QueryResult: sort_by = {"i_item_id": False, "i_item_desc": False, "s_state": False} limit = 100 - store_sales_base = ( + q1 = f"{year}Q1" + q1_q3 = [f"{year}Q1", f"{year}Q2", f"{year}Q3"] + + # Pre-filter date_dim to only qualifying d_date_sk values. + d1_dates = date_dim.filter(pl.col("d_quarter_name") == q1).select("d_date_sk") + d_q3_dates = date_dim.filter(pl.col("d_quarter_name").is_in(q1_q3)).select( + "d_date_sk" + ) + + # store_returns has [6] partitions — at the broadcast limit. Filter it to Q1-Q3 dates + # first, then use the (customer, item) pairs it contains to pre-filter both store_sales + # and catalog_sales before those larger tables enter the expensive shuffle joins. + store_returns_filtered = store_returns.join( + d_q3_dates, left_on="sr_returned_date_sk", right_on="d_date_sk", how="semi" + ).select(["sr_customer_sk", "sr_item_sk", "sr_ticket_number", "sr_return_quantity"]) + + # (customer, item) pairs present in any qualifying store return; stays at [6] partitions + # so broadcast is free. Polars will CACHE this shared subplan. + sr_customer_item = store_returns_filtered.select(["sr_customer_sk", "sr_item_sk"]) + + store_sales_filtered = ( store_sales.join( - date_dim, left_on="ss_sold_date_sk", right_on="d_date_sk", suffix="_d1" + d1_dates, left_on="ss_sold_date_sk", right_on="d_date_sk", how="semi" + ) + .join( + sr_customer_item, + left_on=["ss_customer_sk", "ss_item_sk"], + right_on=["sr_customer_sk", "sr_item_sk"], + how="semi", + ) + .select( + [ + "ss_customer_sk", + "ss_item_sk", + "ss_store_sk", + "ss_ticket_number", + "ss_quantity", + ] + ) + .join( + item.select(["i_item_sk", "i_item_id", "i_item_desc"]), + left_on="ss_item_sk", + right_on="i_item_sk", + ) + .join( + store.select(["s_store_sk", "s_state"]), + left_on="ss_store_sk", + right_on="s_store_sk", + ) + .select( + [ + "ss_customer_sk", + "ss_item_sk", + "ss_ticket_number", + "ss_quantity", + "i_item_id", + "i_item_desc", + "s_state", + ] ) - .join(item, left_on="ss_item_sk", right_on="i_item_sk") - .join(store, left_on="ss_store_sk", right_on="s_store_sk") - .filter(pl.col("d_quarter_name") == f"{year}Q1") ) - store_returns_base = store_returns.join( - date_dim, left_on="sr_returned_date_sk", right_on="d_date_sk", suffix="_d2" - ).filter(pl.col("d_quarter_name").is_in([f"{year}Q1", f"{year}Q2", f"{year}Q3"])) - - catalog_sales_base = catalog_sales.join( - date_dim, left_on="cs_sold_date_sk", right_on="d_date_sk", suffix="_d3" - ).filter(pl.col("d_quarter_name").is_in([f"{year}Q1", f"{year}Q2", f"{year}Q3"])) + catalog_sales_filtered = ( + catalog_sales.join( + d_q3_dates, left_on="cs_sold_date_sk", right_on="d_date_sk", how="semi" + ) + .join( + sr_customer_item, + left_on=["cs_bill_customer_sk", "cs_item_sk"], + right_on=["sr_customer_sk", "sr_item_sk"], + how="semi", + ) + .select(["cs_bill_customer_sk", 
"cs_item_sk", "cs_quantity"]) + ) return QueryResult( frame=( - store_sales_base.join( - store_returns_base, + store_sales_filtered.join( + store_returns_filtered, left_on=["ss_customer_sk", "ss_item_sk", "ss_ticket_number"], right_on=["sr_customer_sk", "sr_item_sk", "sr_ticket_number"], - how="inner", - suffix="_sr", + ) + .select( + [ + "ss_customer_sk", + "ss_item_sk", + "ss_quantity", + "sr_return_quantity", + "i_item_id", + "i_item_desc", + "s_state", + ] ) .join( - catalog_sales_base, + catalog_sales_filtered, left_on=["ss_customer_sk", "ss_item_sk"], right_on=["cs_bill_customer_sk", "cs_item_sk"], - how="inner", - suffix="_cs", ) .group_by(["i_item_id", "i_item_desc", "s_state"]) .agg( diff --git a/python/cudf_polars/cudf_polars/experimental/benchmarks/pdsds_queries/q18.py b/python/cudf_polars/cudf_polars/experimental/benchmarks/pdsds_queries/q18.py index 9c2b9f227ef..5ef6017150e 100644 --- a/python/cudf_polars/cudf_polars/experimental/benchmarks/pdsds_queries/q18.py +++ b/python/cudf_polars/cudf_polars/experimental/benchmarks/pdsds_queries/q18.py @@ -121,10 +121,7 @@ def polars_impl(run_config: RunConfig) -> QueryResult: catalog_sales = get_data( run_config.dataset_path, "catalog_sales", run_config.suffix ) - customer_demographics_1 = get_data( - run_config.dataset_path, "customer_demographics", run_config.suffix - ) - customer_demographics_2 = get_data( + customer_demographics = get_data( run_config.dataset_path, "customer_demographics", run_config.suffix ) customer = get_data(run_config.dataset_path, "customer", run_config.suffix) @@ -134,30 +131,52 @@ def polars_impl(run_config: RunConfig) -> QueryResult: date_dim = get_data(run_config.dataset_path, "date_dim", run_config.suffix) item = get_data(run_config.dataset_path, "item", run_config.suffix) + # Pre-filter each dimension table before joining against catalog_sales [45 partitions]. + # d_year not in GROUP BY — semi-join keeps only the date key in the pipeline. 
+ filtered_dates = date_dim.filter(pl.col("d_year") == year).select("d_date_sk") + filtered_cd1 = customer_demographics.filter( + (pl.col("cd_gender") == gen) & (pl.col("cd_education_status") == es) + ).select(["cd_demo_sk", "cd_dep_count"]) + filtered_customer = customer.filter(pl.col("c_birth_month").is_in(month)).select( + ["c_customer_sk", "c_current_cdemo_sk", "c_current_addr_sk", "c_birth_year"] + ) + filtered_addr = customer_address.filter(pl.col("ca_state").is_in(state)).select( + ["ca_address_sk", "ca_county", "ca_state", "ca_country"] + ) + base_query = ( - catalog_sales.join(date_dim, left_on="cs_sold_date_sk", right_on="d_date_sk") - .join(item, left_on="cs_item_sk", right_on="i_item_sk") + catalog_sales.select( + [ + "cs_sold_date_sk", + "cs_item_sk", + "cs_bill_cdemo_sk", + "cs_bill_customer_sk", + "cs_quantity", + "cs_list_price", + "cs_coupon_amt", + "cs_sales_price", + "cs_net_profit", + ] + ) .join( - customer_demographics_1, - left_on="cs_bill_cdemo_sk", - right_on="cd_demo_sk", - suffix="_cd1", + filtered_dates, left_on="cs_sold_date_sk", right_on="d_date_sk", how="semi" + ) + .join( + item.select(["i_item_sk", "i_item_id"]), + left_on="cs_item_sk", + right_on="i_item_sk", ) - .join(customer, left_on="cs_bill_customer_sk", right_on="c_customer_sk") + .join(filtered_cd1, left_on="cs_bill_cdemo_sk", right_on="cd_demo_sk") .join( - customer_demographics_2, + filtered_customer, left_on="cs_bill_customer_sk", right_on="c_customer_sk" + ) + .join( + customer_demographics.select("cd_demo_sk"), left_on="c_current_cdemo_sk", right_on="cd_demo_sk", - suffix="_cd2", - ) - .join(customer_address, left_on="c_current_addr_sk", right_on="ca_address_sk") - .filter( - (pl.col("cd_gender") == gen) - & (pl.col("cd_education_status") == es) - & pl.col("c_birth_month").is_in(month) - & (pl.col("d_year") == year) - & pl.col("ca_state").is_in(state) + how="semi", ) + .join(filtered_addr, left_on="c_current_addr_sk", right_on="ca_address_sk") ) agg_exprs = [ diff --git a/python/cudf_polars/cudf_polars/experimental/benchmarks/pdsds_queries/q2.py b/python/cudf_polars/cudf_polars/experimental/benchmarks/pdsds_queries/q2.py index 998cccb6017..edf285e0d9a 100644 --- a/python/cudf_polars/cudf_polars/experimental/benchmarks/pdsds_queries/q2.py +++ b/python/cudf_polars/cudf_polars/experimental/benchmarks/pdsds_queries/q2.py @@ -136,10 +136,6 @@ def polars_impl(run_config: RunConfig) -> QueryResult: ), ] ) - # Step 2: Create wswscs CTE equivalent (aggregate by week and day of week) - # First join with date_dim to get day names - wscs_with_dates = wscs.join(date_dim, left_on="sold_date_sk", right_on="d_date_sk") - # Create separate aggregations for each day to better control null handling days = ( "Sunday", "Monday", @@ -158,35 +154,26 @@ def polars_impl(run_config: RunConfig) -> QueryResult: "fri_sales", "sat_sales", ) - # Start with all week sequences - all_weeks = wscs_with_dates.select("d_week_seq").unique() - wswscs = all_weeks - + # Pre-filter date_dim to 4 years ([year-1, year, year+1, year+2]) to capture + # boundary weeks that span year transitions (e.g. from Dec 28 to Jan 3). Filtering to + # only [year, year+1] incorrectly excludes Dec days whose d_week_seq also + # appears in year's date_dim, producing null sales for those boundary weeks. 
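# A worked toy example of the boundary-week issue described above: a week that
# runs Dec 28 .. Jan 3 has a single d_week_seq but two d_year values, so
# filtering date_dim to a single year keeps only part of the week.
# Illustrative only, not part of this patch; d_week_seq=520 is invented.
import polars as pl
from datetime import date

date_dim = pl.LazyFrame(
    {
        "d_date": [date(1998, 12, 28), date(1998, 12, 31), date(1999, 1, 1), date(1999, 1, 3)],
        "d_year": [1998, 1998, 1999, 1999],
        "d_week_seq": [520, 520, 520, 520],
    }
)
partial = date_dim.filter(pl.col("d_year") == 1999).collect()
assert partial.height == 2  # the December half of week 520 is gone
full = date_dim.filter(pl.col("d_year").is_in([1998, 1999])).collect()
assert full.height == 4  # widening the year window keeps the whole week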
+ date_dim_prefilter = date_dim.filter( + pl.col("d_year").is_in([year - 1, year, year + 1, year + 2]) + ).select(["d_date_sk", "d_week_seq", "d_day_name"]) wswscs = ( - wscs_with_dates.with_columns( + wscs.join(date_dim_prefilter, left_on="sold_date_sk", right_on="d_date_sk") + .group_by("d_week_seq") + .agg( [ pl.when(pl.col("d_day_name") == day) .then(pl.col("sales_price")) .otherwise(None) + .sum() .alias(name) for day, name in zip(days, day_cols, strict=True) ] ) - .group_by("d_week_seq") - .agg( - *(pl.col(name).sum().alias(name) for name in day_cols), - *(pl.col(name).count().alias(f"{name}_count") for name in day_cols), - ) - .with_columns( - [ - pl.when(pl.col(f"{name}_count") > 0) - .then(pl.col(name)) - .otherwise(None) - .alias(name) - for name in day_cols - ] - ) - .select(["d_week_seq", *day_cols]) ) # Step 3: Create year data (y subquery equivalent) diff --git a/python/cudf_polars/cudf_polars/experimental/benchmarks/pdsds_queries/q23.py b/python/cudf_polars/cudf_polars/experimental/benchmarks/pdsds_queries/q23.py index debe1b512b3..dfdcef33744 100644 --- a/python/cudf_polars/cudf_polars/experimental/benchmarks/pdsds_queries/q23.py +++ b/python/cudf_polars/cudf_polars/experimental/benchmarks/pdsds_queries/q23.py @@ -105,10 +105,16 @@ def polars_impl(run_config: RunConfig) -> QueryResult: ) web_sales = get_data(run_config.dataset_path, "web_sales", run_config.suffix) + # Pre-filter date_dim to the 4-year window so the inner join with store_sales + # naturally excludes out-of-window records before the expensive item join. + year_set = [year, year + 1, year + 2, year + 3] + date_dim_years = date_dim.filter(pl.col("d_year").is_in(year_set)) + frequent_ss_items = ( - store_sales.join(date_dim, left_on="ss_sold_date_sk", right_on="d_date_sk") + store_sales.join( + date_dim_years, left_on="ss_sold_date_sk", right_on="d_date_sk" + ) .join(item, left_on="ss_item_sk", right_on="i_item_sk") - .filter(pl.col("d_year").is_in([year, year + 1, year + 2, year + 3])) .with_columns(pl.col("i_item_desc").str.slice(0, 30).alias("itemdesc")) .group_by(["itemdesc", "ss_item_sk", "d_date"]) .agg(pl.len().alias("cnt")) @@ -121,8 +127,11 @@ def polars_impl(run_config: RunConfig) -> QueryResult: # only valid because we know that the TPC-DS includes a foreign key here, so all # customers in store_sales _must_ be entries that exist somewhere in customer. store_sales.filter(pl.col("ss_customer_sk").is_not_null()) - .join(date_dim, left_on="ss_sold_date_sk", right_on="d_date_sk") - .filter(pl.col("d_year").is_in([year, year + 1, year + 2, year + 3])) + .join( + date_dim_years.select("d_date_sk"), + left_on="ss_sold_date_sk", + right_on="d_date_sk", + ) .group_by("ss_customer_sk") .agg((pl.col("ss_quantity") * pl.col("ss_sales_price")).sum().alias("csales")) ) @@ -146,13 +155,12 @@ def polars_impl(run_config: RunConfig) -> QueryResult: (pl.col("d_year") == year) & (pl.col("d_moy") == month) ).select("d_date_sk") + # Join order: most selective filters first (date_target ~1.2%, frequent_ss_items, + # best_customers semi ~5%), then customer last — it's a non-filtering name lookup + # that only adds c_last_name/c_first_name, so running it on the already-reduced + # row set avoids the full catalog_sales/web_sales scan width. 
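# A minimal sketch of the join ordering described above: the filtering joins
# (date window, frequent items, best-customer semi) run first, and the
# customer name lookup runs last on the already-reduced rows. Illustrative
# only, not part of this patch; frames and values are toy stand-ins, not
# measurements.
import polars as pl

catalog_sales = pl.LazyFrame(
    {"cs_sold_date_sk": [1, 2, 9], "cs_item_sk": [7, 7, 8], "cs_bill_customer_sk": [100, 200, 100]}
)
date_target = pl.LazyFrame({"d_date_sk": [1, 2]})
frequent_items = pl.LazyFrame({"ss_item_sk": [7]})
best_customers = pl.LazyFrame({"ss_customer_sk": [100]})
customer = pl.LazyFrame({"c_customer_sk": [100, 200], "c_last_name": ["A", "B"]})

reduced = (
    catalog_sales.join(date_target, left_on="cs_sold_date_sk", right_on="d_date_sk")
    .join(frequent_items, left_on="cs_item_sk", right_on="ss_item_sk")
    .join(best_customers, left_on="cs_bill_customer_sk", right_on="ss_customer_sk", how="semi")
    .join(customer, left_on="cs_bill_customer_sk", right_on="c_customer_sk")  # enrich last
)
print(reduced.collect())  # 1 row reaches the name lookup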
catalog_part = ( - catalog_sales.join( - customer.select(["c_customer_sk", "c_last_name", "c_first_name"]), - left_on="cs_bill_customer_sk", - right_on="c_customer_sk", - ) - .join(date_target, left_on="cs_sold_date_sk", right_on="d_date_sk") + catalog_sales.join(date_target, left_on="cs_sold_date_sk", right_on="d_date_sk") .join(frequent_ss_items, left_on="cs_item_sk", right_on="ss_item_sk") .join( best_customers, @@ -160,17 +168,17 @@ def polars_impl(run_config: RunConfig) -> QueryResult: right_on="ss_customer_sk", how="semi", ) + .join( + customer.select(["c_customer_sk", "c_last_name", "c_first_name"]), + left_on="cs_bill_customer_sk", + right_on="c_customer_sk", + ) .group_by(["c_last_name", "c_first_name"]) .agg((pl.col("cs_quantity") * pl.col("cs_list_price")).sum().alias("sales")) ) web_part = ( - web_sales.join( - customer.select(["c_customer_sk", "c_last_name", "c_first_name"]), - left_on="ws_bill_customer_sk", - right_on="c_customer_sk", - ) - .join(date_target, left_on="ws_sold_date_sk", right_on="d_date_sk") + web_sales.join(date_target, left_on="ws_sold_date_sk", right_on="d_date_sk") .join(frequent_ss_items, left_on="ws_item_sk", right_on="ss_item_sk") .join( best_customers, @@ -178,6 +186,11 @@ def polars_impl(run_config: RunConfig) -> QueryResult: right_on="ss_customer_sk", how="semi", ) + .join( + customer.select(["c_customer_sk", "c_last_name", "c_first_name"]), + left_on="ws_bill_customer_sk", + right_on="c_customer_sk", + ) .group_by(["c_last_name", "c_first_name"]) .agg((pl.col("ws_quantity") * pl.col("ws_list_price")).sum().alias("sales")) ) diff --git a/python/cudf_polars/cudf_polars/experimental/benchmarks/pdsds_queries/q25.py b/python/cudf_polars/cudf_polars/experimental/benchmarks/pdsds_queries/q25.py index 8006585a170..d431f5507b8 100644 --- a/python/cudf_polars/cudf_polars/experimental/benchmarks/pdsds_queries/q25.py +++ b/python/cudf_polars/cudf_polars/experimental/benchmarks/pdsds_queries/q25.py @@ -96,17 +96,6 @@ def polars_impl(run_config: RunConfig) -> QueryResult: store = get_data(run_config.dataset_path, "store", run_config.suffix) item = get_data(run_config.dataset_path, "item", run_config.suffix) - d1, d2, d3 = [ - date_dim.clone().select( - [ - pl.col("d_date_sk").alias(f"{p}_date_sk"), - pl.col("d_moy").alias(f"{p}_moy"), - pl.col("d_year").alias(f"{p}_year"), - ] - ) - for p in ("d1", "d2", "d3") - ] - sort_by = { "i_item_id": False, "i_item_desc": False, @@ -114,31 +103,103 @@ def polars_impl(run_config: RunConfig) -> QueryResult: "s_store_name": False, } limit = 100 + + # d1: only April of the target year — very selective (~1/60 of date_dim rows). + # d2/d3: from April to October of the target year: same condition, one pre-filtered frame. + d1_dates = date_dim.filter( + (pl.col("d_moy") == 4) & (pl.col("d_year") == year) + ).select("d_date_sk") + d2_d3_dates = date_dim.filter( + pl.col("d_moy").is_between(4, 10) & (pl.col("d_year") == year) + ).select("d_date_sk") + + # store_returns [6] ≤ broadcast limit: filter to qualifying return dates first, + # then extract (customer, item) pairs to pre-filter ss and cs before shuffle joins. 
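# A minimal sketch of the (customer, item) pair pre-filter described above:
# the pairs observed in returns are semi-joined against the fact table on the
# composite key, so sales rows that can never match a return are dropped
# early. Illustrative only, not part of this patch; all values are invented.
import polars as pl

sr_customer_item = pl.LazyFrame({"sr_customer_sk": [1, 2], "sr_item_sk": [10, 20]})
store_sales = pl.LazyFrame(
    {"ss_customer_sk": [1, 1, 3], "ss_item_sk": [10, 99, 10], "ss_net_profit": [5.0, 6.0, 7.0]}
)

filtered = store_sales.join(
    sr_customer_item,
    left_on=["ss_customer_sk", "ss_item_sk"],
    right_on=["sr_customer_sk", "sr_item_sk"],
    how="semi",
)
# Only (1, 10) matches a returned pair; (1, 99) and (3, 10) are dropped.
print(filtered.collect())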
+ store_returns_filtered = store_returns.join( + d2_d3_dates, left_on="sr_returned_date_sk", right_on="d_date_sk", how="semi" + ).select(["sr_customer_sk", "sr_item_sk", "sr_ticket_number", "sr_net_loss"]) + sr_customer_item = store_returns_filtered.select(["sr_customer_sk", "sr_item_sk"]) + + store_sales_filtered = ( + store_sales.join( + d1_dates, left_on="ss_sold_date_sk", right_on="d_date_sk", how="semi" + ) + .join( + sr_customer_item, + left_on=["ss_customer_sk", "ss_item_sk"], + right_on=["sr_customer_sk", "sr_item_sk"], + how="semi", + ) + .select( + [ + "ss_customer_sk", + "ss_item_sk", + "ss_store_sk", + "ss_ticket_number", + "ss_net_profit", + ] + ) + .join( + item.select(["i_item_sk", "i_item_id", "i_item_desc"]), + left_on="ss_item_sk", + right_on="i_item_sk", + ) + .join( + store.select(["s_store_sk", "s_store_id", "s_store_name"]), + left_on="ss_store_sk", + right_on="s_store_sk", + ) + .select( + [ + "ss_customer_sk", + "ss_item_sk", + "ss_ticket_number", + "ss_net_profit", + "i_item_id", + "i_item_desc", + "s_store_id", + "s_store_name", + ] + ) + ) + + catalog_sales_filtered = ( + catalog_sales.join( + d2_d3_dates, left_on="cs_sold_date_sk", right_on="d_date_sk", how="semi" + ) + .join( + sr_customer_item, + left_on=["cs_bill_customer_sk", "cs_item_sk"], + right_on=["sr_customer_sk", "sr_item_sk"], + how="semi", + ) + .select(["cs_bill_customer_sk", "cs_item_sk", "cs_net_profit"]) + ) + return QueryResult( frame=( - store_sales.join(d1, left_on="ss_sold_date_sk", right_on="d1_date_sk") - .join(item, left_on="ss_item_sk", right_on="i_item_sk") - .join(store, left_on="ss_store_sk", right_on="s_store_sk") - .join( - store_returns, + store_sales_filtered.join( + store_returns_filtered, left_on=["ss_customer_sk", "ss_item_sk", "ss_ticket_number"], right_on=["sr_customer_sk", "sr_item_sk", "sr_ticket_number"], ) - .join(d2, left_on="sr_returned_date_sk", right_on="d2_date_sk") + .select( + [ + "ss_customer_sk", + "ss_item_sk", + "ss_net_profit", + "sr_net_loss", + "i_item_id", + "i_item_desc", + "s_store_id", + "s_store_name", + ] + ) .join( - catalog_sales, + catalog_sales_filtered, left_on=["ss_customer_sk", "ss_item_sk"], right_on=["cs_bill_customer_sk", "cs_item_sk"], ) - .join(d3, left_on="cs_sold_date_sk", right_on="d3_date_sk") - .filter( - (pl.col("d1_moy") == 4) - & (pl.col("d1_year") == year) - & (pl.col("d2_moy").is_between(4, 10)) - & (pl.col("d2_year") == year) - & (pl.col("d3_moy").is_between(4, 10)) - & (pl.col("d3_year") == year) - ) .group_by(["i_item_id", "i_item_desc", "s_store_id", "s_store_name"]) .agg( [ diff --git a/python/cudf_polars/cudf_polars/experimental/benchmarks/pdsds_queries/q29.py b/python/cudf_polars/cudf_polars/experimental/benchmarks/pdsds_queries/q29.py index a39954745bd..6bdb7593929 100644 --- a/python/cudf_polars/cudf_polars/experimental/benchmarks/pdsds_queries/q29.py +++ b/python/cudf_polars/cudf_polars/experimental/benchmarks/pdsds_queries/q29.py @@ -97,23 +97,6 @@ def polars_impl(run_config: RunConfig) -> QueryResult: store = get_data(run_config.dataset_path, "store", run_config.suffix) item = get_data(run_config.dataset_path, "item", run_config.suffix) - d1, d2 = [ - date_dim.clone().select( - [ - pl.col("d_date_sk").alias(f"{p}_date_sk"), - pl.col("d_moy").alias(f"{p}_moy"), - pl.col("d_year").alias(f"{p}_year"), - ] - ) - for p in ("d1", "d2") - ] - d3 = date_dim.clone().select( - [ - pl.col("d_date_sk").alias("d3_date_sk"), - pl.col("d_year").alias("d3_year"), - ] - ) - sort_by = { "i_item_id": False, "i_item_desc": False, @@ 
-121,30 +104,107 @@ def polars_impl(run_config: RunConfig) -> QueryResult: "s_store_name": False, } limit = 100 + + # d1: one specific month of the target year — most selective filter. + # d2: 4-month window of the target year. + # d3: 3-year window — less selective but still worth pushing before the cs shuffle join. + d1_dates = date_dim.filter( + (pl.col("d_moy") == month) & (pl.col("d_year") == year) + ).select("d_date_sk") + d2_dates = date_dim.filter( + pl.col("d_moy").is_between(month, month + 3) & (pl.col("d_year") == year) + ).select("d_date_sk") + d3_dates = date_dim.filter( + pl.col("d_year").is_in([year, year + 1, year + 2]) + ).select("d_date_sk") + + # store_returns [6] ≤ broadcast limit: apply d2 date filter, then use + # (customer, item) pairs to pre-filter ss and cs before shuffle joins. + store_returns_filtered = store_returns.join( + d2_dates, left_on="sr_returned_date_sk", right_on="d_date_sk", how="semi" + ).select(["sr_customer_sk", "sr_item_sk", "sr_ticket_number", "sr_return_quantity"]) + sr_customer_item = store_returns_filtered.select(["sr_customer_sk", "sr_item_sk"]) + + store_sales_filtered = ( + store_sales.join( + d1_dates, left_on="ss_sold_date_sk", right_on="d_date_sk", how="semi" + ) + .join( + sr_customer_item, + left_on=["ss_customer_sk", "ss_item_sk"], + right_on=["sr_customer_sk", "sr_item_sk"], + how="semi", + ) + .select( + [ + "ss_customer_sk", + "ss_item_sk", + "ss_store_sk", + "ss_ticket_number", + "ss_quantity", + ] + ) + .join( + item.select(["i_item_sk", "i_item_id", "i_item_desc"]), + left_on="ss_item_sk", + right_on="i_item_sk", + ) + .join( + store.select(["s_store_sk", "s_store_id", "s_store_name"]), + left_on="ss_store_sk", + right_on="s_store_sk", + ) + .select( + [ + "ss_customer_sk", + "ss_item_sk", + "ss_ticket_number", + "ss_quantity", + "i_item_id", + "i_item_desc", + "s_store_id", + "s_store_name", + ] + ) + ) + + catalog_sales_filtered = ( + catalog_sales.join( + d3_dates, left_on="cs_sold_date_sk", right_on="d_date_sk", how="semi" + ) + .join( + sr_customer_item, + left_on=["cs_bill_customer_sk", "cs_item_sk"], + right_on=["sr_customer_sk", "sr_item_sk"], + how="semi", + ) + .select(["cs_bill_customer_sk", "cs_item_sk", "cs_quantity"]) + ) + return QueryResult( frame=( - store_sales.join(d1, left_on="ss_sold_date_sk", right_on="d1_date_sk") - .join(item, left_on="ss_item_sk", right_on="i_item_sk") - .join(store, left_on="ss_store_sk", right_on="s_store_sk") - .join( - store_returns, + store_sales_filtered.join( + store_returns_filtered, left_on=["ss_customer_sk", "ss_item_sk", "ss_ticket_number"], right_on=["sr_customer_sk", "sr_item_sk", "sr_ticket_number"], ) - .join(d2, left_on="sr_returned_date_sk", right_on="d2_date_sk") + .select( + [ + "ss_customer_sk", + "ss_item_sk", + "ss_quantity", + "sr_return_quantity", + "i_item_id", + "i_item_desc", + "s_store_id", + "s_store_name", + ] + ) .join( - catalog_sales, + catalog_sales_filtered, left_on=["ss_customer_sk", "ss_item_sk"], right_on=["cs_bill_customer_sk", "cs_item_sk"], ) - .join(d3, left_on="cs_sold_date_sk", right_on="d3_date_sk") - .filter( - (pl.col("d1_moy") == month) - & (pl.col("d1_year") == year) - & (pl.col("d2_moy").is_between(month, month + 3)) - & (pl.col("d2_year") == year) - & (pl.col("d3_year").is_in([year, year + 1, year + 2])) - ) .group_by(["i_item_id", "i_item_desc", "s_store_id", "s_store_name"]) .agg( [ diff --git a/python/cudf_polars/cudf_polars/experimental/benchmarks/pdsds_queries/q43.py 
b/python/cudf_polars/cudf_polars/experimental/benchmarks/pdsds_queries/q43.py index b0c1023d655..cc885a05fe8 100644 --- a/python/cudf_polars/cudf_polars/experimental/benchmarks/pdsds_queries/q43.py +++ b/python/cudf_polars/cudf_polars/experimental/benchmarks/pdsds_queries/q43.py @@ -91,10 +91,20 @@ def polars_impl(run_config: RunConfig) -> QueryResult: year = params["year"] gmt = params["gmt"] - # Load tables date_dim = get_data(run_config.dataset_path, "date_dim", run_config.suffix) store_sales = get_data(run_config.dataset_path, "store_sales", run_config.suffix) store = get_data(run_config.dataset_path, "store", run_config.suffix) + + # Pre-filter lookup tables before joining against store_sales [58 partitions]. + # d_year not needed after filter; d_day_name drives the conditional agg columns. + filtered_dates = date_dim.filter(pl.col("d_year") == year).select( + ["d_date_sk", "d_day_name"] + ) + # s_gmt_offset not needed after filter; keep group-by output columns. + filtered_store = store.filter(pl.col("s_gmt_offset") == gmt).select( + ["s_store_sk", "s_store_name", "s_store_id"] + ) + sort_by = { "s_store_name": False, "s_store_id": False, @@ -107,15 +117,13 @@ def polars_impl(run_config: RunConfig) -> QueryResult: "sat_sales": False, } limit = 100 - # Main query with joins and conditional aggregations return QueryResult( frame=( - store_sales.join(date_dim, left_on="ss_sold_date_sk", right_on="d_date_sk") - .join(store, left_on="ss_store_sk", right_on="s_store_sk") - .filter((pl.col("s_gmt_offset") == gmt) & (pl.col("d_year") == year)) + store_sales.select(["ss_sold_date_sk", "ss_store_sk", "ss_sales_price"]) + .join(filtered_dates, left_on="ss_sold_date_sk", right_on="d_date_sk") + .join(filtered_store, left_on="ss_store_sk", right_on="s_store_sk") .with_columns( [ - # Pre-compute conditional sales amounts for each day pl.when(pl.col("d_day_name") == "Sunday") .then(pl.col("ss_sales_price")) .otherwise(0) diff --git a/python/cudf_polars/cudf_polars/experimental/benchmarks/pdsds_queries/q44.py b/python/cudf_polars/cudf_polars/experimental/benchmarks/pdsds_queries/q44.py index 204c9c90a76..bdff35af0f5 100644 --- a/python/cudf_polars/cudf_polars/experimental/benchmarks/pdsds_queries/q44.py +++ b/python/cudf_polars/cudf_polars/experimental/benchmarks/pdsds_queries/q44.py @@ -90,87 +90,57 @@ def polars_impl(run_config: RunConfig) -> QueryResult: store_sk = params["store_sk"] - # Load tables store_sales = get_data(run_config.dataset_path, "store_sales", run_config.suffix) item = get_data(run_config.dataset_path, "item", run_config.suffix) - # Step 1: Calculate benchmark (average profit for store with null demographics) + # Benchmark: global mean profit for the store with null demographics — single row. + # Use a constant-key equi-join instead of how="cross" so the streaming executor + # treats it as a broadcast join (1 row ≤ broadcast_join_limit) rather than a + # ConditionalJoin that falls back from multi-partition mode. 
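# A minimal sketch of the constant-key trick described above: both sides get a
# literal _key column, so the 1-row benchmark attaches via a plain equi-join
# instead of how="cross". Illustrative only, not part of this patch; the toy
# numbers are invented.
import polars as pl

benchmark = pl.LazyFrame({"benchmark_profit": [10.0]}).with_columns(
    pl.lit(1, dtype=pl.Int32).alias("_key")
)
item_profits = pl.LazyFrame(
    {"ss_item_sk": [1, 2], "avg_profit": [12.0, 3.0]}
).with_columns(pl.lit(1, dtype=pl.Int32).alias("_key"))

kept = (
    item_profits.join(benchmark, on="_key")
    .filter(pl.col("avg_profit") > 0.9 * pl.col("benchmark_profit"))
    .select(["ss_item_sk", "avg_profit"])
)
print(kept.collect())  # only item 1 beats 90% of the benchmark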
benchmark = ( store_sales.filter( - (pl.col("ss_store_sk") == store_sk) & (pl.col("ss_cdemo_sk").is_null()) + (pl.col("ss_store_sk") == store_sk) & pl.col("ss_cdemo_sk").is_null() ) - .group_by("ss_store_sk") - .agg( - [ - pl.col("ss_net_profit").mean().alias("profit_mean"), - pl.col("ss_net_profit").count().alias("profit_count"), - ] - ) - .with_columns( - [ - pl.when(pl.col("profit_count") > 0) - .then(pl.col("profit_mean")) - .otherwise(None) - .alias("benchmark_profit") - ] - ) - .select("benchmark_profit") + .select(pl.col("ss_net_profit").mean().alias("benchmark_profit")) + .with_columns(pl.lit(1, dtype=pl.Int32).alias("_key")) ) - # Step 2: Calculate item-level average profits for store + # Item-level average profits, broadcast-joined with the 1-row benchmark. item_profits = ( store_sales.filter(pl.col("ss_store_sk") == store_sk) .group_by("ss_item_sk") - .agg( - [ - pl.col("ss_net_profit").mean().alias("profit_mean"), - pl.col("ss_net_profit").count().alias("profit_count"), - ] - ) - .with_columns( - [ - pl.when(pl.col("profit_count") > 0) - .then(pl.col("profit_mean")) - .otherwise(None) - .alias("avg(ss_net_profit)") - ] - ) - .drop(["profit_mean", "profit_count"]) - .join(benchmark, how="cross") - .filter(pl.col("avg(ss_net_profit)") > (0.9 * pl.col("benchmark_profit"))) + .agg(pl.col("ss_net_profit").mean().alias("avg_profit")) + .with_columns(pl.lit(1, dtype=pl.Int32).alias("_key")) + .join(benchmark, on="_key") + .filter(pl.col("avg_profit") > 0.9 * pl.col("benchmark_profit")) + .select(["ss_item_sk", "avg_profit"]) ) - # Step 3: Create ascending ranking (worst to best) ascending_rank = ( item_profits.with_columns( - [pl.col("avg(ss_net_profit)").rank(method="ordinal").alias("rnk")] + pl.col("avg_profit").rank(method="ordinal").alias("rnk") ) .filter(pl.col("rnk") < 11) .select(["ss_item_sk", "rnk"]) ) - # Step 4: Create descending ranking (best to worst) descending_rank = ( item_profits.with_columns( - [ - pl.col("avg(ss_net_profit)") - .rank(method="ordinal", descending=True) - .alias("rnk") - ] + pl.col("avg_profit").rank(method="ordinal", descending=True).alias("rnk") ) .filter(pl.col("rnk") < 11) .select(["ss_item_sk", "rnk"]) ) + item_cols = item.select(["i_item_sk", "i_product_name"]) sort_by = {"rnk": False} limit = 100 - # Step 5: Join rankings and get product names return QueryResult( frame=( ascending_rank.join(descending_rank, on="rnk", how="inner", suffix="_desc") - .join(item, left_on="ss_item_sk", right_on="i_item_sk", how="inner") + .join(item_cols, left_on="ss_item_sk", right_on="i_item_sk", how="inner") .join( - item, + item_cols, left_on="ss_item_sk_desc", right_on="i_item_sk", how="inner", diff --git a/python/cudf_polars/cudf_polars/experimental/benchmarks/pdsds_queries/q52.py b/python/cudf_polars/cudf_polars/experimental/benchmarks/pdsds_queries/q52.py index 2a8f74151b3..b391e49cda9 100644 --- a/python/cudf_polars/cudf_polars/experimental/benchmarks/pdsds_queries/q52.py +++ b/python/cudf_polars/cudf_polars/experimental/benchmarks/pdsds_queries/q52.py @@ -69,15 +69,21 @@ def polars_impl(run_config: RunConfig) -> QueryResult: sort_by = {"d_year": False, "ext_price": True, "brand_id": False} limit = 100 + + # Pre-filter both lookup tables before joining against store_sales [87 partitions]. + # date_dim keeps d_year because it appears in the GROUP BY. 
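# A minimal sketch of the rule stated above: when a dimension column (here
# d_year) feeds the GROUP BY, the filtered dimension is inner-joined carrying
# just the key plus that column, rather than semi-joined. Illustrative only,
# not part of this patch; toy values are invented.
import polars as pl

store_sales = pl.LazyFrame(
    {"ss_sold_date_sk": [1, 2, 3], "ss_ext_sales_price": [1.0, 2.0, 4.0]}
)
filtered_dates = pl.LazyFrame({"d_date_sk": [1, 2], "d_year": [2000, 2000]})

out = (
    store_sales.join(filtered_dates, left_on="ss_sold_date_sk", right_on="d_date_sk")
    .group_by("d_year")
    .agg(pl.col("ss_ext_sales_price").sum().alias("ext_price"))
)
print(out.collect())  # one row: (2000, 3.0)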
+ filtered_dates = date_dim.filter( + (pl.col("d_moy") == month) & (pl.col("d_year") == year) + ).select(["d_date_sk", "d_year"]) + filtered_item = item.filter(pl.col("i_manager_id") == manager_id).select( + ["i_item_sk", "i_brand", "i_brand_id"] + ) + return QueryResult( frame=( - store_sales.join(date_dim, left_on="ss_sold_date_sk", right_on="d_date_sk") - .join(item, left_on="ss_item_sk", right_on="i_item_sk") - .filter( - (pl.col("i_manager_id") == manager_id) - & (pl.col("d_moy") == month) - & (pl.col("d_year") == year) - ) + store_sales.select(["ss_sold_date_sk", "ss_item_sk", "ss_ext_sales_price"]) + .join(filtered_dates, left_on="ss_sold_date_sk", right_on="d_date_sk") + .join(filtered_item, left_on="ss_item_sk", right_on="i_item_sk") .group_by(["d_year", "i_brand", "i_brand_id"]) .agg(pl.col("ss_ext_sales_price").sum().alias("ext_price")) .select( diff --git a/python/cudf_polars/cudf_polars/experimental/benchmarks/pdsds_queries/q53.py b/python/cudf_polars/cudf_polars/experimental/benchmarks/pdsds_queries/q53.py index 88a9e1992ee..dcc897de935 100644 --- a/python/cudf_polars/cudf_polars/experimental/benchmarks/pdsds_queries/q53.py +++ b/python/cudf_polars/cudf_polars/experimental/benchmarks/pdsds_queries/q53.py @@ -101,25 +101,37 @@ def polars_impl(run_config: RunConfig) -> QueryResult: date_dim = get_data(run_config.dataset_path, "date_dim", run_config.suffix) store = get_data(run_config.dataset_path, "store", run_config.suffix) month_seq_list = list(range(dms, dms + 12)) + + # Pre-filter lookup tables before joining against store_sales [87 partitions]. + # date_dim: keep d_qoy because it appears in the GROUP BY. + filtered_dates = date_dim.filter( + pl.col("d_month_seq").is_in(month_seq_list) + ).select(["d_date_sk", "d_qoy"]) + # item: apply both OR'd rule groups up front; only i_manufact_id needed after. 
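# A minimal sketch of the OR-of-AND pre-filter described above: each rule
# group is a conjunction over (category, class, brand), and the item table is
# filtered by their disjunction once, before any join. Illustrative only, not
# part of this patch; the lists below are invented examples.
import polars as pl

item = pl.LazyFrame(
    {
        "i_item_sk": [1, 2, 3],
        "i_category": ["Books", "Music", "Home"],
        "i_class": ["reference", "classical", "kitchen"],
        "i_brand": ["brandA", "brandB", "brandC"],
        "i_manufact_id": [11, 22, 33],
    }
)
group1 = (
    pl.col("i_category").is_in(["Books"])
    & pl.col("i_class").is_in(["reference"])
    & pl.col("i_brand").is_in(["brandA"])
)
group2 = (
    pl.col("i_category").is_in(["Music"])
    & pl.col("i_class").is_in(["classical"])
    & pl.col("i_brand").is_in(["brandB"])
)
filtered_item = item.filter(group1 | group2).select(["i_item_sk", "i_manufact_id"])
print(filtered_item.collect())  # items 1 and 2 qualify; item 3 does not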
+ filtered_item = item.filter( + ( + pl.col("i_category").is_in(categories1) + & pl.col("i_class").is_in(classes1) + & pl.col("i_brand").is_in(brands1) + ) + | ( + pl.col("i_category").is_in(categories2) + & pl.col("i_class").is_in(classes2) + & pl.col("i_brand").is_in(brands2) + ) + ).select(["i_item_sk", "i_manufact_id"]) + grouped_data = ( - store_sales.join(item, left_on="ss_item_sk", right_on="i_item_sk") - .join(date_dim, left_on="ss_sold_date_sk", right_on="d_date_sk") - .join(store, left_on="ss_store_sk", right_on="s_store_sk") - .filter(pl.col("d_month_seq").is_in(month_seq_list)) - .filter( - # First rule group - ( - (pl.col("i_category").is_in(categories1)) - & (pl.col("i_class").is_in(classes1)) - & (pl.col("i_brand").is_in(brands1)) - ) - | - # Second rule group - ( - (pl.col("i_category").is_in(categories2)) - & (pl.col("i_class").is_in(classes2)) - & (pl.col("i_brand").is_in(brands2)) - ) + store_sales.select( + ["ss_sold_date_sk", "ss_item_sk", "ss_store_sk", "ss_sales_price"] + ) + .join(filtered_item, left_on="ss_item_sk", right_on="i_item_sk") + .join(filtered_dates, left_on="ss_sold_date_sk", right_on="d_date_sk") + .join( + store.select("s_store_sk"), + left_on="ss_store_sk", + right_on="s_store_sk", + how="semi", ) .group_by(["i_manufact_id", "d_qoy"]) .agg([pl.col("ss_sales_price").sum().alias("sum_sales_raw")]) diff --git a/python/cudf_polars/cudf_polars/experimental/benchmarks/pdsds_queries/q55.py b/python/cudf_polars/cudf_polars/experimental/benchmarks/pdsds_queries/q55.py index e6cfbfef9e6..eb3d1eb019e 100644 --- a/python/cudf_polars/cudf_polars/experimental/benchmarks/pdsds_queries/q55.py +++ b/python/cudf_polars/cudf_polars/experimental/benchmarks/pdsds_queries/q55.py @@ -65,15 +65,25 @@ def polars_impl(run_config: RunConfig) -> QueryResult: item = get_data(run_config.dataset_path, "item", run_config.suffix) sort_by = {"ext_price": True, "brand_id": False} limit = 100 + + # d_year not in GROUP BY so date filter can be a semi-join (no date columns needed). 
+ filtered_dates = date_dim.filter( + (pl.col("d_moy") == month) & (pl.col("d_year") == year) + ).select("d_date_sk") + filtered_item = item.filter(pl.col("i_manager_id") == manager_id).select( + ["i_item_sk", "i_brand", "i_brand_id"] + ) + return QueryResult( frame=( - store_sales.join(date_dim, left_on="ss_sold_date_sk", right_on="d_date_sk") - .join(item, left_on="ss_item_sk", right_on="i_item_sk") - .filter( - (pl.col("i_manager_id") == manager_id) - & (pl.col("d_moy") == month) - & (pl.col("d_year") == year) + store_sales.select(["ss_sold_date_sk", "ss_item_sk", "ss_ext_sales_price"]) + .join( + filtered_dates, + left_on="ss_sold_date_sk", + right_on="d_date_sk", + how="semi", ) + .join(filtered_item, left_on="ss_item_sk", right_on="i_item_sk") .group_by(["i_brand", "i_brand_id"]) .agg(pl.col("ss_ext_sales_price").sum().alias("ext_price")) .select( diff --git a/python/cudf_polars/cudf_polars/experimental/benchmarks/pdsds_queries/q63.py b/python/cudf_polars/cudf_polars/experimental/benchmarks/pdsds_queries/q63.py index 3e9b06cb553..5bcd47a8ddc 100644 --- a/python/cudf_polars/cudf_polars/experimental/benchmarks/pdsds_queries/q63.py +++ b/python/cudf_polars/cudf_polars/experimental/benchmarks/pdsds_queries/q63.py @@ -89,42 +89,54 @@ def polars_impl(run_config: RunConfig) -> QueryResult: date_dim = get_data(run_config.dataset_path, "date_dim", run_config.suffix) store = get_data(run_config.dataset_path, "store", run_config.suffix) - inner_query = ( - store_sales.join(item, left_on="ss_item_sk", right_on="i_item_sk") - .join(date_dim, left_on="ss_sold_date_sk", right_on="d_date_sk") - .join(store, left_on="ss_store_sk", right_on="s_store_sk") - .filter( - pl.col("d_month_seq").is_in([dms + i for i in range(12)]) - & ( - ( - pl.col("i_category").is_in(["Books", "Children", "Electronics"]) - & pl.col("i_class").is_in( - ["personal", "portable", "reference", "self-help"] - ) - & pl.col("i_brand").is_in( - [ - "scholaramalgamalg #14", - "scholaramalgamalg #7", - "exportiunivamalg #9", - "scholaramalgamalg #9", - ] - ) - ) - | ( - pl.col("i_category").is_in(["Women", "Music", "Men"]) - & pl.col("i_class").is_in( - ["accessories", "classical", "fragrances", "pants"] - ) - & pl.col("i_brand").is_in( - [ - "amalgimporto #1", - "edu packscholar #1", - "exportiimporto #1", - "importoamalg #1", - ] - ) - ) + # Pre-filter both lookup tables before joining against store_sales [58 partitions]. + # item: apply both OR'd rule groups up front; only i_manager_id needed after. + # date_dim: keep d_moy because it appears in the GROUP BY. 
+ filtered_item = item.filter( + ( + pl.col("i_category").is_in(["Books", "Children", "Electronics"]) + & pl.col("i_class").is_in( + ["personal", "portable", "reference", "self-help"] + ) + & pl.col("i_brand").is_in( + [ + "scholaramalgamalg #14", + "scholaramalgamalg #7", + "exportiunivamalg #9", + "scholaramalgamalg #9", + ] + ) + ) + | ( + pl.col("i_category").is_in(["Women", "Music", "Men"]) + & pl.col("i_class").is_in( + ["accessories", "classical", "fragrances", "pants"] ) + & pl.col("i_brand").is_in( + [ + "amalgimporto #1", + "edu packscholar #1", + "exportiimporto #1", + "importoamalg #1", + ] + ) + ) + ).select(["i_item_sk", "i_manager_id"]) + filtered_dates = date_dim.filter( + pl.col("d_month_seq").is_in([dms + i for i in range(12)]) + ).select(["d_date_sk", "d_moy"]) + + inner_query = ( + store_sales.select( + ["ss_sold_date_sk", "ss_item_sk", "ss_store_sk", "ss_sales_price"] + ) + .join(filtered_item, left_on="ss_item_sk", right_on="i_item_sk") + .join(filtered_dates, left_on="ss_sold_date_sk", right_on="d_date_sk") + .join( + store.select("s_store_sk"), + left_on="ss_store_sk", + right_on="s_store_sk", + how="semi", ) .group_by(["i_manager_id", "d_moy"]) .agg([pl.col("ss_sales_price").sum().alias("sum_sales")]) diff --git a/python/cudf_polars/cudf_polars/experimental/benchmarks/pdsds_queries/q67.py b/python/cudf_polars/cudf_polars/experimental/benchmarks/pdsds_queries/q67.py index 0c6859ca55a..855c98a1b60 100644 --- a/python/cudf_polars/cudf_polars/experimental/benchmarks/pdsds_queries/q67.py +++ b/python/cudf_polars/cudf_polars/experimental/benchmarks/pdsds_queries/q67.py @@ -214,11 +214,35 @@ def polars_impl(run_config: RunConfig) -> QueryResult: store = get_data(run_config.dataset_path, "store", run_config.suffix) item = get_data(run_config.dataset_path, "item", run_config.suffix) + # Pre-filter date_dim to the 12-month window before joining against store_sales [58]. + # d_month_seq not needed after filter; keep group-by output columns. 
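# A minimal sketch of the projection rule stated above: after filtering on
# d_month_seq, only the join key and the columns the query still reads (the
# group-by dates) survive the select, and collect_schema() verifies the plan's
# schema without executing anything. Illustrative only, not part of this
# patch; dms below is an invented value.
import polars as pl

date_dim = pl.LazyFrame(
    schema={
        "d_date_sk": pl.Int64,
        "d_month_seq": pl.Int64,
        "d_year": pl.Int32,
        "d_qoy": pl.Int32,
        "d_moy": pl.Int32,
    }
)
dms = 1200
filtered_dates = date_dim.filter(
    pl.col("d_month_seq").is_between(dms, dms + 11)
).select(["d_date_sk", "d_year", "d_qoy", "d_moy"])
assert filtered_dates.collect_schema().names() == ["d_date_sk", "d_year", "d_qoy", "d_moy"]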
+ filtered_dates = date_dim.filter( + pl.col("d_month_seq").is_between(dms, dms + 11) + ).select(["d_date_sk", "d_year", "d_qoy", "d_moy"]) + base_data = ( - store_sales.join(date_dim, left_on="ss_sold_date_sk", right_on="d_date_sk") - .join(store, left_on="ss_store_sk", right_on="s_store_sk") - .join(item, left_on="ss_item_sk", right_on="i_item_sk") - .filter(pl.col("d_month_seq").is_between(dms, dms + 11)) + store_sales.select( + [ + "ss_sold_date_sk", + "ss_item_sk", + "ss_store_sk", + "ss_sales_price", + "ss_quantity", + ] + ) + .join(filtered_dates, left_on="ss_sold_date_sk", right_on="d_date_sk") + .join( + store.select(["s_store_sk", "s_store_id"]), + left_on="ss_store_sk", + right_on="s_store_sk", + ) + .join( + item.select( + ["i_item_sk", "i_category", "i_class", "i_brand", "i_product_name"] + ), + left_on="ss_item_sk", + right_on="i_item_sk", + ) .with_columns( (pl.col("ss_sales_price") * pl.col("ss_quantity")) .fill_null(0) diff --git a/python/cudf_polars/cudf_polars/experimental/benchmarks/pdsds_queries/q76.py b/python/cudf_polars/cudf_polars/experimental/benchmarks/pdsds_queries/q76.py index e05d9b28f65..8bcaf67e9b8 100644 --- a/python/cudf_polars/cudf_polars/experimental/benchmarks/pdsds_queries/q76.py +++ b/python/cudf_polars/cudf_polars/experimental/benchmarks/pdsds_queries/q76.py @@ -106,10 +106,16 @@ def polars_impl(run_config: RunConfig) -> QueryResult: ) item = get_data(run_config.dataset_path, "item", run_config.suffix) date_dim = get_data(run_config.dataset_path, "date_dim", run_config.suffix) + + # Project lookup tables to only the columns needed in each component. + date_cols = date_dim.select(["d_date_sk", "d_year", "d_qoy"]) + item_cols = item.select(["i_item_sk", "i_category"]) + store_component = ( store_sales.filter(pl.col(nullcol_ss).is_null()) - .join(date_dim, left_on="ss_sold_date_sk", right_on="d_date_sk") - .join(item, left_on="ss_item_sk", right_on="i_item_sk") + .select(["ss_sold_date_sk", "ss_item_sk", "ss_ext_sales_price"]) + .join(date_cols, left_on="ss_sold_date_sk", right_on="d_date_sk") + .join(item_cols, left_on="ss_item_sk", right_on="i_item_sk") .select( [ pl.lit("store").alias("channel"), @@ -123,8 +129,9 @@ def polars_impl(run_config: RunConfig) -> QueryResult: ) web_component = ( web_sales.filter(pl.col(nullcol_ws).is_null()) - .join(date_dim, left_on="ws_sold_date_sk", right_on="d_date_sk") - .join(item, left_on="ws_item_sk", right_on="i_item_sk") + .select(["ws_sold_date_sk", "ws_item_sk", "ws_ext_sales_price"]) + .join(date_cols, left_on="ws_sold_date_sk", right_on="d_date_sk") + .join(item_cols, left_on="ws_item_sk", right_on="i_item_sk") .select( [ pl.lit("web").alias("channel"), @@ -138,8 +145,9 @@ def polars_impl(run_config: RunConfig) -> QueryResult: ) catalog_component = ( catalog_sales.filter(pl.col(nullcol_cs).is_null()) - .join(date_dim, left_on="cs_sold_date_sk", right_on="d_date_sk") - .join(item, left_on="cs_item_sk", right_on="i_item_sk") + .select(["cs_sold_date_sk", "cs_item_sk", "cs_ext_sales_price"]) + .join(date_cols, left_on="cs_sold_date_sk", right_on="d_date_sk") + .join(item_cols, left_on="cs_item_sk", right_on="i_item_sk") .select( [ pl.lit("catalog").alias("channel"), @@ -166,10 +174,7 @@ def polars_impl(run_config: RunConfig) -> QueryResult: .agg( [ pl.len().cast(pl.Int64).alias("sales_cnt"), - pl.when(pl.col("ext_sales_price").count() > 0) - .then(pl.col("ext_sales_price").sum()) - .otherwise(None) - .alias("sales_amt"), + pl.col("ext_sales_price").sum().alias("sales_amt"), ] ) .select( diff --git 
a/python/cudf_polars/cudf_polars/experimental/benchmarks/pdsds_queries/q8.py b/python/cudf_polars/cudf_polars/experimental/benchmarks/pdsds_queries/q8.py index 2dad75b9ef6..3c2916e29a4 100644 --- a/python/cudf_polars/cudf_polars/experimental/benchmarks/pdsds_queries/q8.py +++ b/python/cudf_polars/cudf_polars/experimental/benchmarks/pdsds_queries/q8.py @@ -100,18 +100,30 @@ def polars_impl(run_config: RunConfig) -> QueryResult: preferred_customer_zips, on="ca_zip", how="inner" ).select("ca_zip") - # Main query: join store_sales with date_dim, store, and filter by zip codes + # Pre-filter date_dim; d_year/d_qoy not needed after filter — semi-join. + filtered_dates = date_dim.filter( + (pl.col("d_year") == year) & (pl.col("d_qoy") == qoy) + ).select("d_date_sk") + return QueryResult( frame=( - store_sales.join(date_dim, left_on="ss_sold_date_sk", right_on="d_date_sk") - .join(store, left_on="ss_store_sk", right_on="s_store_sk") + store_sales.select(["ss_sold_date_sk", "ss_store_sk", "ss_net_profit"]) + .join( + filtered_dates, + left_on="ss_sold_date_sk", + right_on="d_date_sk", + how="semi", + ) + .join( + store.select(["s_store_sk", "s_store_name", "s_zip"]), + left_on="ss_store_sk", + right_on="s_store_sk", + ) .join( intersect_zips, left_on=pl.col("s_zip").str.slice(0, 2), right_on=pl.col("ca_zip").str.slice(0, 2), ) - .filter(pl.col("d_qoy") == qoy) - .filter(pl.col("d_year") == year) .group_by("s_store_name") .agg(pl.col("ss_net_profit").sum().alias("sum")) .sort("s_store_name", nulls_last=True) diff --git a/python/cudf_polars/cudf_polars/experimental/benchmarks/pdsds_queries/q88.py b/python/cudf_polars/cudf_polars/experimental/benchmarks/pdsds_queries/q88.py index a086bb609cf..304e9404117 100644 --- a/python/cudf_polars/cudf_polars/experimental/benchmarks/pdsds_queries/q88.py +++ b/python/cudf_polars/cudf_polars/experimental/benchmarks/pdsds_queries/q88.py @@ -142,81 +142,76 @@ def polars_impl(run_config: RunConfig) -> QueryResult: ) time_dim = get_data(run_config.dataset_path, "time_dim", run_config.suffix) store = get_data(run_config.dataset_path, "store", run_config.suffix) - hd_filter = ( + + # Pre-filter each small table before joining against store_sales [58 partitions]. + filtered_hdemo = household_demographics.filter( ((pl.col("hd_dep_count") == hd1) & (pl.col("hd_vehicle_count") <= hd1 + 2)) | ((pl.col("hd_dep_count") == hd2) & (pl.col("hd_vehicle_count") <= hd2 + 2)) | ((pl.col("hd_dep_count") == hd3) & (pl.col("hd_vehicle_count") <= hd3 + 2)) + ).select("hd_demo_sk") + filtered_store = store.filter(pl.col("s_store_name") == s_store_name).select( + "s_store_sk" ) - base_query = ( - store_sales.join( - time_dim, left_on="ss_sold_time_sk", right_on="t_time_sk", how="inner" - ) - .join( - household_demographics, - left_on="ss_hdemo_sk", - right_on="hd_demo_sk", - how="inner", - ) - .join(store, left_on="ss_store_sk", right_on="s_store_sk", how="inner") - .filter( - hd_filter - & ( - pl.col("s_store_name").is_not_null() - & (pl.col("s_store_name") == s_store_name) - ) + # Restrict time_dim to the union of all 8 slot conditions; every surviving row maps + # to exactly one bucket, so the downstream pl.when chain is exhaustive. 
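# A minimal sketch of the collapse-then-widen shape this time filter feeds,
# with 2 buckets instead of 8: every surviving row maps to exactly one bucket
# id, one group_by counts them, and the tiny counts table is widened into
# named columns. Illustrative only, not part of this patch; data is invented.
import polars as pl

rows = pl.LazyFrame({"t_hour": [8, 8, 9], "t_minute": [45, 50, 10]})
counts = (
    rows.select(
        pl.when((pl.col("t_hour") == 8) & (pl.col("t_minute") >= 30))
        .then(pl.lit(0))
        .when((pl.col("t_hour") == 9) & (pl.col("t_minute") < 30))
        .then(pl.lit(1))
        .alias("bucket")
    )
    .group_by("bucket")
    .agg(pl.len().cast(pl.Int64).alias("cnt"))
)
wide = counts.select(
    pl.when(pl.col("bucket") == i)
    .then(pl.col("cnt"))
    .otherwise(pl.lit(0).cast(pl.Int64))
    .sum()
    .alias(name)
    for i, name in enumerate(["h8_30_to_9", "h9_to_9_30"])
)
print(wide.collect())  # one row: h8_30_to_9=2, h9_to_9_30=1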
+ filtered_time = time_dim.filter( + ((pl.col("t_hour") == 8) & (pl.col("t_minute") >= 30)) + | pl.col("t_hour").is_in([9, 10, 11]) + | ((pl.col("t_hour") == 12) & (pl.col("t_minute") < 30)) + ).select(["t_time_sk", "t_hour", "t_minute"]) + + bucket_names = [ + "h8_30_to_9", + "h9_to_9_30", + "h9_30_to_10", + "h10_to_10_30", + "h10_30_to_11", + "h11_to_11_30", + "h11_30_to_12", + "h12_to_12_30", + ] + + # Collapse the 58-partition store_sales pipeline to an 8-row bucket-count table first. + # The 8 conditional sums in the final select then operate on [1] partition, so even if + # the streaming executor creates separate sub-plans for each sum, each reads only the + # tiny CACHE'd group_by output rather than re-scanning store_sales. + counts_lf = ( + store_sales.select(["ss_sold_time_sk", "ss_hdemo_sk", "ss_store_sk"]) + .join(filtered_time, left_on="ss_sold_time_sk", right_on="t_time_sk") + .join(filtered_hdemo, left_on="ss_hdemo_sk", right_on="hd_demo_sk", how="semi") + .join(filtered_store, left_on="ss_store_sk", right_on="s_store_sk", how="semi") + .select( + pl.when((pl.col("t_hour") == 8) & (pl.col("t_minute") >= 30)) + .then(pl.lit(0)) + .when((pl.col("t_hour") == 9) & (pl.col("t_minute") < 30)) + .then(pl.lit(1)) + .when((pl.col("t_hour") == 9) & (pl.col("t_minute") >= 30)) + .then(pl.lit(2)) + .when((pl.col("t_hour") == 10) & (pl.col("t_minute") < 30)) + .then(pl.lit(3)) + .when((pl.col("t_hour") == 10) & (pl.col("t_minute") >= 30)) + .then(pl.lit(4)) + .when((pl.col("t_hour") == 11) & (pl.col("t_minute") < 30)) + .then(pl.lit(5)) + .when((pl.col("t_hour") == 11) & (pl.col("t_minute") >= 30)) + .then(pl.lit(6)) + .when((pl.col("t_hour") == 12) & (pl.col("t_minute") < 30)) + .then(pl.lit(7)) + .alias("bucket") ) + .group_by("bucket") + .agg(pl.len().cast(pl.Int64).alias("cnt")) ) + return QueryResult( - frame=base_query.select( + frame=counts_lf.select( [ - pl.when((pl.col("t_hour") == 8) & (pl.col("t_minute") >= 30)) - .then(1) - .otherwise(0) - .sum() - .cast(pl.Int64) - .alias("h8_30_to_9"), - pl.when((pl.col("t_hour") == 9) & (pl.col("t_minute") < 30)) - .then(1) - .otherwise(0) - .sum() - .cast(pl.Int64) - .alias("h9_to_9_30"), - pl.when((pl.col("t_hour") == 9) & (pl.col("t_minute") >= 30)) - .then(1) - .otherwise(0) - .sum() - .cast(pl.Int64) - .alias("h9_30_to_10"), - pl.when((pl.col("t_hour") == 10) & (pl.col("t_minute") < 30)) - .then(1) - .otherwise(0) - .sum() - .cast(pl.Int64) - .alias("h10_to_10_30"), - pl.when((pl.col("t_hour") == 10) & (pl.col("t_minute") >= 30)) - .then(1) - .otherwise(0) - .sum() - .cast(pl.Int64) - .alias("h10_30_to_11"), - pl.when((pl.col("t_hour") == 11) & (pl.col("t_minute") < 30)) - .then(1) - .otherwise(0) - .sum() - .cast(pl.Int64) - .alias("h11_to_11_30"), - pl.when((pl.col("t_hour") == 11) & (pl.col("t_minute") >= 30)) - .then(1) - .otherwise(0) - .sum() - .cast(pl.Int64) - .alias("h11_30_to_12"), - pl.when((pl.col("t_hour") == 12) & (pl.col("t_minute") < 30)) - .then(1) - .otherwise(0) + pl.when(pl.col("bucket") == i) + .then(pl.col("cnt")) + .otherwise(pl.lit(0).cast(pl.Int64)) .sum() - .cast(pl.Int64) - .alias("h12_to_12_30"), + .alias(name) + for i, name in enumerate(bucket_names) ] ), sort_by=[], diff --git a/python/cudf_polars/cudf_polars/experimental/benchmarks/pdsds_queries/q9.py b/python/cudf_polars/cudf_polars/experimental/benchmarks/pdsds_queries/q9.py index d42218179ea..142c94c3477 100644 --- a/python/cudf_polars/cudf_polars/experimental/benchmarks/pdsds_queries/q9.py +++ 
b/python/cudf_polars/cudf_polars/experimental/benchmarks/pdsds_queries/q9.py @@ -102,57 +102,56 @@ def polars_impl(run_config: RunConfig) -> QueryResult: aggcelse = params["aggcelse"] rc = params["rc"] - # Load required tables store_sales = get_data(run_config.dataset_path, "store_sales", run_config.suffix) reason = get_data(run_config.dataset_path, "reason", run_config.suffix) - # Define bucket configurations: (min_qty, max_qty, count_threshold) - buckets = [ - (1, 20, rc[0]), - (21, 40, rc[1]), - (41, 60, rc[2]), - (61, 80, rc[3]), - (81, 100, rc[4]), - ] - - bucket_expressions = [] - for i, (min_qty, max_qty, _) in enumerate(buckets, 1): - condition = pl.col("ss_quantity").is_between(min_qty, max_qty, closed="both") - bucket_expressions.extend( - [ - condition.sum().alias(f"count_{i}"), - pl.when(condition) - .then(pl.col(aggcthen)) - .otherwise(None) - .mean() - .alias(f"avg_then_{i}"), - pl.when(condition) - .then(pl.col(aggcelse)) - .otherwise(None) - .mean() - .alias(f"avg_else_{i}"), - ] + thresholds = pl.LazyFrame({"bucket": [1, 2, 3, 4, 5], "threshold": list(rc)}) + + # Single scan: the 5 ss_quantity ranges are non-overlapping, so a group_by + # computes all counts and averages in one pass over store_sales. + stats = ( + store_sales.with_columns( + pl.when(pl.col("ss_quantity").is_between(1, 20)) + .then(pl.lit(1)) + .when(pl.col("ss_quantity").is_between(21, 40)) + .then(pl.lit(2)) + .when(pl.col("ss_quantity").is_between(41, 60)) + .then(pl.lit(3)) + .when(pl.col("ss_quantity").is_between(61, 80)) + .then(pl.lit(4)) + .when(pl.col("ss_quantity").is_between(81, 100)) + .then(pl.lit(5)) + .alias("bucket") ) - - combined_stats = store_sales.select(bucket_expressions) - - # Select appropriate value per bucket based on count threshold - bucket_values = [] - for i, (_min_qty, _max_qty, threshold) in enumerate(buckets, 1): - bucket = ( - pl.when(pl.col(f"count_{i}") > threshold) - .then(pl.col(f"avg_then_{i}")) - .otherwise(pl.col(f"avg_else_{i}")) - .alias(f"bucket{i}") + .filter(pl.col("bucket").is_not_null()) + .group_by("bucket") + .agg( + pl.len().alias("count"), + pl.col(aggcthen).mean().alias("avg_then"), + pl.col(aggcelse).mean().alias("avg_else"), + ) + .join(thresholds, on="bucket") + .select( + pl.col("bucket"), + pl.when(pl.col("count") > pl.col("threshold")) + .then(pl.col("avg_then")) + .otherwise(pl.col("avg_else")) + .alias("value"), ) - bucket_values.append(bucket) + .sort("bucket") + ) + + # Pivot 5 rows → 1 row with 5 named columns (operates on 5 rows, trivially fast) + wide = stats.select( + pl.col("value").filter(pl.col("bucket") == i).first().alias(f"bucket{i}") + for i in range(1, 6) + ) - # Create result DataFrame with one row (using reason table as in SQL) return QueryResult( frame=( reason.filter(pl.col("r_reason_sk") == 1) - .join(combined_stats, how="cross") - .select(bucket_values) + .join(wide, how="cross") + .select([f"bucket{i}" for i in range(1, 6)]) .limit(1) ), sort_by=[], diff --git a/python/cudf_polars/cudf_polars/experimental/benchmarks/pdsds_queries/q98.py b/python/cudf_polars/cudf_polars/experimental/benchmarks/pdsds_queries/q98.py index a70bf71269b..24442425911 100644 --- a/python/cudf_polars/cudf_polars/experimental/benchmarks/pdsds_queries/q98.py +++ b/python/cudf_polars/cudf_polars/experimental/benchmarks/pdsds_queries/q98.py @@ -76,17 +76,34 @@ def polars_impl(run_config: RunConfig) -> QueryResult: end_date_py = start_date_py + timedelta(days=30) start_date = pl.date(start_date_py.year, start_date_py.month, start_date_py.day) end_date = 
pl.date(end_date_py.year, end_date_py.month, end_date_py.day) + + # Pre-filter item to matching categories before joining against store_sales [58 partitions]. + filtered_item = item.filter( + pl.col("i_category").is_in(params["categories"]) + ).select( + [ + "i_item_sk", + "i_item_id", + "i_item_desc", + "i_current_price", + "i_class", + "i_category", + ] + ) + # Pre-filter date_dim to the 30-day window; d_date not needed after filter — semi-join. + filtered_dates = date_dim.filter( + pl.col("d_date").is_between(start_date, end_date, closed="both") + ).select("d_date_sk") + return QueryResult( frame=( - store_sales.join( - item, left_on="ss_item_sk", right_on="i_item_sk", how="inner" - ) + store_sales.select(["ss_sold_date_sk", "ss_item_sk", "ss_ext_sales_price"]) + .join(filtered_item, left_on="ss_item_sk", right_on="i_item_sk") .join( - date_dim, left_on="ss_sold_date_sk", right_on="d_date_sk", how="inner" - ) - .filter( - pl.col("i_category").is_in(params["categories"]) - & pl.col("d_date").is_between(start_date, end_date, closed="both") + filtered_dates, + left_on="ss_sold_date_sk", + right_on="d_date_sk", + how="semi", ) .group_by( [