diff --git a/packages/cubejs-schema-compiler/src/adapter/BaseQuery.js b/packages/cubejs-schema-compiler/src/adapter/BaseQuery.js index 8951a39bc4a09..e29216edc639e 100644 --- a/packages/cubejs-schema-compiler/src/adapter/BaseQuery.js +++ b/packages/cubejs-schema-compiler/src/adapter/BaseQuery.js @@ -4604,6 +4604,8 @@ export class BaseQuery { join_types: { inner: 'INNER', left: 'LEFT', + right: 'RIGHT', + full: 'FULL', }, window_frame_types: { rows: 'ROWS', diff --git a/packages/cubejs-schema-compiler/src/adapter/CubeStoreQuery.ts b/packages/cubejs-schema-compiler/src/adapter/CubeStoreQuery.ts index f79710722915a..9cb0d58968b0a 100644 --- a/packages/cubejs-schema-compiler/src/adapter/CubeStoreQuery.ts +++ b/packages/cubejs-schema-compiler/src/adapter/CubeStoreQuery.ts @@ -358,6 +358,11 @@ export class CubeStoreQuery extends BaseQuery { templates.operators.is_not_distinct_from = 'IS NOT DISTINCT FROM'; templates.expressions.wrap_segment_select = 'IF({{ expr }}, 1, 0)'; templates.expressions.wrap_segment_filter = '{{ expr }} = 1'; + // CubeStore has no native FULL OUTER JOIN (it is emulated via LEFT JOIN chains), and its + // distributed join executor assumes the left-most table is the split root, so RIGHT/FULL + // across partitioned tables is unsafe. Don't push those join types down to CubeStore. + delete templates.join_types.full; + delete templates.join_types.right; return templates; } } diff --git a/packages/cubejs-schema-compiler/src/adapter/MysqlQuery.ts b/packages/cubejs-schema-compiler/src/adapter/MysqlQuery.ts index 04274ae4fffb1..246881435ada5 100644 --- a/packages/cubejs-schema-compiler/src/adapter/MysqlQuery.ts +++ b/packages/cubejs-schema-compiler/src/adapter/MysqlQuery.ts @@ -195,6 +195,8 @@ export class MysqlQuery extends BaseQuery { templates.types.timestamp = 'DATETIME'; delete templates.types.interval; templates.types.binary = 'BLOB'; + // MySQL has no FULL OUTER JOIN + delete templates.join_types.full; templates.expressions.concat_strings = 'CONCAT({{ strings | join(\',\' ) }})'; diff --git a/packages/cubejs-schema-compiler/src/adapter/SnowflakeQuery.ts b/packages/cubejs-schema-compiler/src/adapter/SnowflakeQuery.ts index 4267eaa6888b8..1b09ebce27e3f 100644 --- a/packages/cubejs-schema-compiler/src/adapter/SnowflakeQuery.ts +++ b/packages/cubejs-schema-compiler/src/adapter/SnowflakeQuery.ts @@ -119,7 +119,6 @@ export class SnowflakeQuery extends BaseQuery { templates.expressions.like = '{{ expr }} {% if negated %}NOT {% endif %}LIKE {{ pattern }}{% if default_escape %} ESCAPE \'\\\\\'{% endif %}'; templates.expressions.ilike = '{{ expr }} {% if negated %}NOT {% endif %}ILIKE {{ pattern }}{% if default_escape %} ESCAPE \'\\\\\'{% endif %}'; templates.operators.is_not_distinct_from = 'IS NOT DISTINCT FROM'; - templates.join_types.full = 'FULL'; delete templates.types.interval; return templates; } diff --git a/rust/cubesql/cubesql/src/compile/engine/df/wrapper.rs b/rust/cubesql/cubesql/src/compile/engine/df/wrapper.rs index 37dd75f5d5a68..2b1d20cfe5cbd 100644 --- a/rust/cubesql/cubesql/src/compile/engine/df/wrapper.rs +++ b/rust/cubesql/cubesql/src/compile/engine/df/wrapper.rs @@ -3459,6 +3459,7 @@ impl WrappedSelectNode { } match join_type { + // Right/Full are only generated on the non-push-to-Cube path JoinType::Inner | JoinType::Left => { // Do nothing } @@ -3596,6 +3597,7 @@ impl WrappedSelectNode { }; let join_type = match join_type { + // Right/Full are only generated on the non-push-to-Cube path JoinType::Left => generator.get_sql_templates().left_join()?, JoinType::Inner => generator.get_sql_templates().inner_join()?, _ => { @@ -3868,6 +3870,8 @@ impl WrappedSelectNode { let join_type_sql = match join_type { JoinType::Left => generator.get_sql_templates().left_join()?, JoinType::Inner => generator.get_sql_templates().inner_join()?, + JoinType::Right => generator.get_sql_templates().right_join()?, + JoinType::Full => generator.get_sql_templates().full_join()?, _ => { return Err(CubeError::internal(format!( "Unsupported join type for join subquery: {join_type:?}" diff --git a/rust/cubesql/cubesql/src/compile/rewrite/rules/wrapper/join.rs b/rust/cubesql/cubesql/src/compile/rewrite/rules/wrapper/join.rs index 71bdbda8ac473..b1d7acc547906 100644 --- a/rust/cubesql/cubesql/src/compile/rewrite/rules/wrapper/join.rs +++ b/rust/cubesql/cubesql/src/compile/rewrite/rules/wrapper/join.rs @@ -13,6 +13,7 @@ use crate::{ WrappedSelectJoinJoinType, WrappedSelectPushToCube, WrapperReplacerContextAliasToCube, WrapperReplacerContextGroupedSubqueries, }, + transport::MetaContext, var, var_iter, var_list_iter, }; @@ -21,7 +22,7 @@ use datafusion::{ logical_plan::Column, prelude::JoinType, }; -use egg::{Id, Subst}; +use egg::{Id, Subst, Var}; use itertools::Itertools; impl WrapperRules { @@ -263,6 +264,7 @@ impl WrapperRules { "?left_on", "?right_on", "?in_join_type", + "?input_data_source", "?out_join_expr", "?out_join_type", "?out_grouped_subqueries", @@ -481,6 +483,7 @@ impl WrapperRules { "?left_push_to_cube", "?right_on", "?in_join_type", + "?input_data_source", "?out_join_expr", "?out_join_type", "?out_grouped_subqueries", @@ -982,12 +985,42 @@ impl WrapperRules { result_expr } + /// Whether a join subquery with `join_type` can be pushed down to `data_source_var`. + /// + /// Inner/Left are always supported. Right/Full are only supported on the non-push-to-Cube + /// path (`push_to_cube == false`), i.e. when both sides become standalone subqueries joined + /// together — there the outer-join semantics map directly to SQL. On the push-to-Cube path + /// the join is folded inside the Cube query alongside its grouping/measures, where NULL-extended + /// outer rows are not validated, so Right/Full are refused there. + /// Other join types (semi/anti) are never supported as join subqueries. + fn is_subquery_join_type_supported( + egraph: &CubeEGraph, + subst: &mut Subst, + meta: &MetaContext, + data_source_var: Var, + join_type: &JoinType, + push_to_cube: bool, + ) -> bool { + let template = match join_type { + JoinType::Inner => "join_types/inner", + JoinType::Left => "join_types/left", + JoinType::Right if !push_to_cube => "join_types/right", + JoinType::Full if !push_to_cube => "join_types/full", + _ => return false, + }; + let Ok(data_source) = Self::get_data_source(egraph, subst, data_source_var) else { + return false; + }; + Self::can_rewrite_template(&data_source, meta, template) + } + fn transform_ungrouped_join_grouped( &self, left_members_var: &'static str, left_on_var: &'static str, right_on_var: &'static str, in_join_type_var: &'static str, + input_data_source_var: &'static str, out_join_expr_var: &'static str, out_join_type_var: &'static str, out_grouped_subqueries_var: &'static str, @@ -998,11 +1031,14 @@ impl WrapperRules { let right_on_var = var!(right_on_var); let in_join_type_var = var!(in_join_type_var); + let input_data_source_var = var!(input_data_source_var); let out_join_expr_var = var!(out_join_expr_var); let out_join_type_var = var!(out_join_type_var); let out_grouped_subqueries_var = var!(out_grouped_subqueries_var); + let meta = self.meta_context.clone(); + // Only left is allowed to be ungrouped query, so right would be a subquery join for left ungrouped CubeScan // It means we don't care about just a "single cube" in LHS, and there's essentially no cubes by this moment in RHS @@ -1020,6 +1056,19 @@ impl WrapperRules { for in_join_type in var_list_iter!(egraph[subst[in_join_type_var]], JoinJoinType).cloned() { + // Left is an ungrouped CubeScan pushed to Cube, so this is always the + // push-to-Cube path: Right/Full are not supported here. + if !Self::is_subquery_join_type_supported( + egraph, + subst, + &meta, + input_data_source_var, + &in_join_type.0, + true, + ) { + return false; + } + if !Self::are_join_members_supported( egraph, subst[left_members_var], @@ -1217,6 +1266,7 @@ impl WrapperRules { left_push_to_cube_var: &'static str, right_on_var: &'static str, in_join_type_var: &'static str, + input_data_source_var: &'static str, out_join_expr_var: &'static str, out_join_type_var: &'static str, out_grouped_subqueries_var: &'static str, @@ -1228,12 +1278,15 @@ impl WrapperRules { let right_on_var = var!(right_on_var); let in_join_type_var = var!(in_join_type_var); + let input_data_source_var = var!(input_data_source_var); let out_join_expr_var = var!(out_join_expr_var); let out_join_type_var = var!(out_join_type_var); let out_grouped_subqueries_var = var!(out_grouped_subqueries_var); let out_push_to_cube_var = var!(out_push_to_cube_var); + let meta = self.meta_context.clone(); + move |egraph, subst| { // We are going to generate join with grouped subquery // TODO Do we have to check stuff like `transform_check_subquery_allowed` is checking: @@ -1254,6 +1307,20 @@ impl WrapperRules { ) .cloned() { + // Right/Full are only supported on the non-push-to-Cube variant. + // `continue` rather than `return false` so the non-push variant of + // this eclass still gets a chance to match. + if !Self::is_subquery_join_type_supported( + egraph, + subst, + &meta, + input_data_source_var, + &in_join_type.0, + left_push_to_cube.0, + ) { + continue; + } + // TODO what's a proper way to find table expression alias? let Some(right_join_alias) = right_join_on .iter() diff --git a/rust/cubesql/cubesql/src/compile/test/mod.rs b/rust/cubesql/cubesql/src/compile/test/mod.rs index 20e63e584b5f8..7026910699116 100644 --- a/rust/cubesql/cubesql/src/compile/test/mod.rs +++ b/rust/cubesql/cubesql/src/compile/test/mod.rs @@ -754,6 +754,8 @@ OFFSET {{ offset }}{% endif %}"#.to_string(), ("expressions/between".to_string(), "{{ expr }} {% if negated %}NOT {% endif %}BETWEEN {{ low }} AND {{ high }}".to_string()), ("join_types/inner".to_string(), "INNER".to_string()), ("join_types/left".to_string(), "LEFT".to_string()), + ("join_types/right".to_string(), "RIGHT".to_string()), + ("join_types/full".to_string(), "FULL".to_string()), ("quotes/identifiers".to_string(), "\"".to_string()), ("quotes/escape".to_string(), "\"\"".to_string()), ("params/param".to_string(), "${{ param_index + 1 }}".to_string()), @@ -778,6 +780,11 @@ OFFSET {{ offset }}{% endif %}"#.to_string(), ("types/binary".to_string(), "BINARY".to_string()), ] .into_iter().chain(custom_templates) + .collect::>() + .into_iter() + // Custom template with an empty value removes the base template, + // allowing tests to check behavior of data sources without it + .filter(|(_, value)| !value.is_empty()) .collect(), false, ) diff --git a/rust/cubesql/cubesql/src/compile/test/test_wrapper.rs b/rust/cubesql/cubesql/src/compile/test/test_wrapper.rs index 23e02b0daad29..d029973ecb4f0 100644 --- a/rust/cubesql/cubesql/src/compile/test/test_wrapper.rs +++ b/rust/cubesql/cubesql/src/compile/test/test_wrapper.rs @@ -1,5 +1,9 @@ use cubeclient::models::{V1LoadRequestQuery, V1LoadRequestQueryTimeDimension}; -use datafusion::{physical_plan::displayable, scalar::ScalarValue}; +use datafusion::{ + logical_plan::{JoinType, LogicalPlan, PlanVisitor}, + physical_plan::displayable, + scalar::ScalarValue, +}; use pretty_assertions::assert_eq; use regex::Regex; use serde_json::json; @@ -17,6 +21,7 @@ use crate::{ }, config::ConfigObjImpl, transport::TransportLoadRequestQuery, + CubeError, }; #[tokio::test] @@ -1193,6 +1198,175 @@ WHERE assert!(literal_re.is_match(&sql)); } +/// Regression test for grouped-grouped joins with non-Inner/Left join types under SQL +/// push down. Both sides are grouped subqueries joined with FULL OUTER JOIN (the shape +/// produced by period-over-period queries). Before `join_types/full` was wired up, this +/// failed with `Unsupported join type for join subquery: Full`. +#[tokio::test] +async fn test_grouped_join_wrapper_full_outer() { + if !Rewriter::sql_push_down_enabled() { + return; + } + init_testing_logger(); + + let query_plan = convert_select_to_query_plan( + // language=PostgreSQL + r#" + WITH first_period AS ( + SELECT + customer_gender, + sum(sumPrice) AS first_price + FROM KibanaSampleDataEcommerce + GROUP BY 1 + ), + last_period AS ( + SELECT + customer_gender, + sum(sumPrice) AS last_price + FROM KibanaSampleDataEcommerce + GROUP BY 1 + ) + SELECT + COALESCE(f.customer_gender, l.customer_gender) AS customer_gender, + COALESCE(f.first_price, 0) AS first_price, + COALESCE(l.last_price, 0) AS last_price + FROM first_period f + FULL OUTER JOIN last_period l ON f.customer_gender = l.customer_gender + "# + .to_string(), + DatabaseProtocol::PostgreSQL, + ) + .await; + + // Whole query must be pushed down to a single wrapped SQL with a FULL JOIN + let _physical_plan = query_plan.as_physical_plan().await.unwrap(); + + let logical_plan = query_plan.as_logical_plan(); + let sql = logical_plan.find_cube_scan_wrapped_sql().wrapped_sql.sql; + assert!( + sql.contains("FULL JOIN"), + "wrapped SQL is missing FULL JOIN:\n{}", + sql + ); +} + +/// Same as [`test_grouped_join_wrapper_full_outer`], but for RIGHT JOIN. +#[tokio::test] +async fn test_grouped_join_wrapper_right() { + if !Rewriter::sql_push_down_enabled() { + return; + } + init_testing_logger(); + + let query_plan = convert_select_to_query_plan( + // language=PostgreSQL + r#" + WITH first_period AS ( + SELECT + customer_gender, + sum(sumPrice) AS first_price + FROM KibanaSampleDataEcommerce + GROUP BY 1 + ), + last_period AS ( + SELECT + customer_gender, + sum(sumPrice) AS last_price + FROM KibanaSampleDataEcommerce + GROUP BY 1 + ) + SELECT + COALESCE(f.customer_gender, l.customer_gender) AS customer_gender, + COALESCE(f.first_price, 0) AS first_price, + COALESCE(l.last_price, 0) AS last_price + FROM first_period f + RIGHT JOIN last_period l ON f.customer_gender = l.customer_gender + "# + .to_string(), + DatabaseProtocol::PostgreSQL, + ) + .await; + + // Whole query must be pushed down to a single wrapped SQL with a RIGHT JOIN + let _physical_plan = query_plan.as_physical_plan().await.unwrap(); + + let logical_plan = query_plan.as_logical_plan(); + let sql = logical_plan.find_cube_scan_wrapped_sql().wrapped_sql.sql; + assert!( + sql.contains("RIGHT JOIN"), + "wrapped SQL is missing RIGHT JOIN:\n{}", + sql + ); +} + +/// When the data source has no `join_types/full` template, a grouped-grouped FULL JOIN +/// must not be pushed down: the join stays in the DataFusion plan and the query is still +/// plannable instead of failing with `Unsupported join type for join subquery`. +#[tokio::test] +async fn test_grouped_join_wrapper_full_outer_without_template() { + if !Rewriter::sql_push_down_enabled() { + return; + } + init_testing_logger(); + + let query_plan = convert_select_to_query_plan_customized( + // language=PostgreSQL + r#" + WITH first_period AS ( + SELECT + customer_gender, + sum(sumPrice) AS first_price + FROM KibanaSampleDataEcommerce + GROUP BY 1 + ), + last_period AS ( + SELECT + customer_gender, + sum(sumPrice) AS last_price + FROM KibanaSampleDataEcommerce + GROUP BY 1 + ) + SELECT + COALESCE(f.customer_gender, l.customer_gender) AS customer_gender, + COALESCE(f.first_price, 0) AS first_price, + COALESCE(l.last_price, 0) AS last_price + FROM first_period f + FULL OUTER JOIN last_period l ON f.customer_gender = l.customer_gender + "# + .to_string(), + DatabaseProtocol::PostgreSQL, + // Emulate a data source without FULL JOIN support: empty value removes the template + vec![("join_types/full".to_string(), "".to_string())], + ) + .await; + + // Query must still be plannable, with the join executed by DataFusion + let _physical_plan = query_plan.as_physical_plan().await.unwrap(); + + struct FindJoinVisitor(Option); + + impl PlanVisitor for FindJoinVisitor { + type Error = CubeError; + + fn pre_visit(&mut self, plan: &LogicalPlan) -> Result { + if let LogicalPlan::Join(join) = plan { + self.0 = Some(join.join_type); + } + Ok(true) + } + } + + let logical_plan = query_plan.as_logical_plan(); + let mut visitor = FindJoinVisitor(None); + logical_plan.accept(&mut visitor).unwrap(); + assert_eq!( + visitor.0, + Some(JoinType::Full), + "expected FULL JOIN to stay in the DataFusion plan, got:\n{}", + logical_plan.display_indent(), + ); +} + /// Regression test for smoke test "select __user and literal grouped under wrapper". /// Inner CTE has unaliased DATE_TRUNC expressions; outer query references those columns /// through the SubqueryAlias qualifier (`cube_scan_subq`). The CTE-level Remapper must diff --git a/rust/cubesql/cubesql/src/transport/service.rs b/rust/cubesql/cubesql/src/transport/service.rs index 23ef71ce3e7b2..b2b4b75bf6246 100644 --- a/rust/cubesql/cubesql/src/transport/service.rs +++ b/rust/cubesql/cubesql/src/transport/service.rs @@ -995,6 +995,14 @@ impl SqlTemplates { self.render_template("join_types/inner", context! {}) } + pub fn full_join(&self) -> Result { + self.render_template("join_types/full", context! {}) + } + + pub fn right_join(&self) -> Result { + self.render_template("join_types/right", context! {}) + } + pub fn query_aliased(&self, query: &str, alias: &str) -> Result { let bracketed_query = format!("({})", query); let quoted_alias = self.quote_identifier(alias)?;