diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanBucketOlapTableJob.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanBucketOlapTableJob.java index 6a2315966bcdd5..b6a450c93a1b1a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanBucketOlapTableJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanBucketOlapTableJob.java @@ -523,6 +523,13 @@ protected int degreeOfParallelism(int maxParallel, boolean useLocalShuffleToAddP } } + // The tablet-based parallelism strategy only applies when the fragment has no exchange nodes + // (e.g. pure colocate scan). When exchange nodes are present (e.g. bucket shuffle join), + // fall back to the base class behavior to avoid over-parallelizing the join fragment. + if (!exchangeToChildJob.isEmpty()) { + return super.degreeOfParallelism(maxParallel, useLocalShuffleToAddParallel); + } + long tabletNum = 0; for (ScanNode scanNode : scanNodes) { if (scanNode instanceof OlapScanNode) { diff --git a/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanBucketOlapTableJobTest.java b/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanBucketOlapTableJobTest.java new file mode 100644 index 00000000000000..db7f55fb2fd240 --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanBucketOlapTableJobTest.java @@ -0,0 +1,117 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.trees.plans.distribute.worker.job; + +import org.apache.doris.nereids.StatementContext; +import org.apache.doris.nereids.trees.plans.distribute.worker.ScanWorkerSelector; +import org.apache.doris.planner.DataPartition; +import org.apache.doris.planner.ExchangeNode; +import org.apache.doris.planner.OlapScanNode; +import org.apache.doris.planner.PlanFragment; +import org.apache.doris.qe.ConnectContext; +import org.apache.doris.qe.OriginStatement; +import org.apache.doris.thrift.TUniqueId; + +import com.google.common.collect.ArrayListMultimap; +import com.google.common.collect.ImmutableList; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.mockito.Mockito; + +import java.util.BitSet; + +public class UnassignedScanBucketOlapTableJobTest { + + @Test + public void testDegreeOfParallelismWithExchangeNodes() { + ConnectContext connectContext = new ConnectContext(); + connectContext.setThreadLocalInfo(); + connectContext.setQueryId(new TUniqueId(1, 1)); + connectContext.getSessionVariable().parallelPipelineTaskNum = 1; + connectContext.getSessionVariable().colocateMaxParallelNum = 128; + StatementContext statementContext = new StatementContext( + connectContext, new OriginStatement("select * from t", 0)); + connectContext.setStatementContext(statementContext); + + OlapScanNode olapScanNode = Mockito.mock(OlapScanNode.class); + Mockito.when(olapScanNode.getTotalTabletsNum()).thenReturn(100L); + + PlanFragment fragment = Mockito.mock(PlanFragment.class); + Mockito.when(fragment.getDataPartition()).thenReturn(DataPartition.RANDOM); + Mockito.when(fragment.getParallelExecNum()).thenReturn(5); + + ScanWorkerSelector scanWorkerSelector = Mockito.mock(ScanWorkerSelector.class); + + // Non-empty exchangeToChildJob simulates bucket shuffle join fragment. + ExchangeNode exchangeNode = Mockito.mock(ExchangeNode.class); + UnassignedJob mockChild = Mockito.mock(UnassignedJob.class); + Mockito.when(mockChild.getAllChildrenTypes()).thenReturn(new BitSet()); + ArrayListMultimap exchangeToChildJob = ArrayListMultimap.create(); + exchangeToChildJob.put(exchangeNode, mockChild); + + UnassignedScanBucketOlapTableJob unassignedJob = new UnassignedScanBucketOlapTableJob( + statementContext, + fragment, + ImmutableList.of(olapScanNode), + exchangeToChildJob, + scanWorkerSelector + ); + + // maxParallel = 3 (buckets), parallelExecNum = 5 + // Base class: min(maxParallel, max(parallelExecNum, 1)) = min(3, 5) = 3 + int result = unassignedJob.degreeOfParallelism(3, false); + Assertions.assertEquals(3, result); + } + + @Test + public void testDegreeOfParallelismWithoutExchangeNodes() { + ConnectContext connectContext = new ConnectContext(); + connectContext.setThreadLocalInfo(); + connectContext.setQueryId(new TUniqueId(2, 2)); + connectContext.getSessionVariable().parallelPipelineTaskNum = 1; + connectContext.getSessionVariable().colocateMaxParallelNum = 128; + StatementContext statementContext = new StatementContext( + connectContext, new OriginStatement("select * from t", 0)); + connectContext.setStatementContext(statementContext); + + OlapScanNode olapScanNode = Mockito.mock(OlapScanNode.class); + Mockito.when(olapScanNode.getTotalTabletsNum()).thenReturn(100L); + Mockito.when(olapScanNode.shouldUseOneInstance(Mockito.any())).thenReturn(false); + + PlanFragment fragment = Mockito.mock(PlanFragment.class); + Mockito.when(fragment.getDataPartition()).thenReturn(DataPartition.RANDOM); + Mockito.when(fragment.getParallelExecNum()).thenReturn(5); + + ScanWorkerSelector scanWorkerSelector = Mockito.mock(ScanWorkerSelector.class); + + // Empty exchangeToChildJob simulates pure colocate scan (no exchange nodes). + ArrayListMultimap exchangeToChildJob = ArrayListMultimap.create(); + + UnassignedScanBucketOlapTableJob unassignedJob = new UnassignedScanBucketOlapTableJob( + statementContext, + fragment, + ImmutableList.of(olapScanNode), + exchangeToChildJob, + scanWorkerSelector + ); + + // Tablet strategy: min(max(tabletNum=100, parallelExecNum=5), colocateMaxParallelNum=128) = 100 + int result = unassignedJob.degreeOfParallelism(3, false); + Assertions.assertEquals(100, result); + } +}