Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -922,7 +922,15 @@ public List<Column> getSchemaByIndexId(Long indexId) {
}

public List<Column> getSchemaByIndexId(Long indexId, boolean full) {
List<Column> fullSchema = indexIdToMeta.get(indexId).getSchema();
MaterializedIndexMeta meta = indexIdToMeta.get(indexId);
if (meta == null) {
throw new RuntimeException(String.format(
"No schema found for index id %d in table %s(%d). This may be caused by a"
+ " concurrent DDL operation (e.g., schema change). Available index ids: %s."
+ " Please retry the query.",
indexId, name, id, indexIdToMeta.keySet()));
}
List<Column> fullSchema = meta.getSchema();
if (full) {
return fullSchema;
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ public void plan(StatementBase queryStmt, org.apache.doris.thrift.TQueryOptions
if (plan instanceof PhysicalPlan) {
physicalPlan = (PhysicalPlan) plan;
distribute(physicalPlan, explainLevel);
cacheThriftPlans(explainLevel);
}
});
} finally {
Expand Down Expand Up @@ -556,6 +557,21 @@ private void distribute(PhysicalPlan physicalPlan, ExplainLevel explainLevel) {
}
}

/**
* Pre-serialize (cache) the TPlan for each fragment while still holding the table read lock.
* This prevents NPE in OlapTable.getSchemaByIndexId caused by concurrent schema changes
* that may remove index metadata after the table lock is released.
*/
private void cacheThriftPlans(ExplainLevel explainLevel) {
if (explainLevel != ExplainLevel.NONE && explainLevel.isPlanLevel
&& (explainLevel != ExplainLevel.ALL_PLAN && explainLevel != ExplainLevel.DISTRIBUTED_PLAN)) {
return;
}
for (PlanFragment fragment : fragments) {
fragment.cacheThriftPlan();
}
}

protected PhysicalPlan postProcess(PhysicalPlan physicalPlan) {
return new PlanPostProcessors(cascadesContext).process(physicalPlan);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.thrift.TExplainLevel;
import org.apache.doris.thrift.TPartitionType;
import org.apache.doris.thrift.TPlan;
import org.apache.doris.thrift.TPlanFragment;
import org.apache.doris.thrift.TQueryCacheParam;
import org.apache.doris.thrift.TResultSinkType;
Expand Down Expand Up @@ -165,6 +166,10 @@ public class PlanFragment extends TreeNode<PlanFragment> {
public TQueryCacheParam queryCacheParam;
private int numBackends = 0;
private boolean forceSingleInstance = false;
// Cache the serialized TPlan (built under table lock) to avoid NPE from concurrent schema changes.
// See: OlapTable.getSchemaByIndexId is called during treeToThrift(); by caching while holding
// the table read lock, we ensure schema meta is still present when the plan is serialized.
private Supplier<TPlan> thriftPlanCache;

/**
* C'tor for fragment with specific partition; the output is by default broadcast.
Expand All @@ -178,6 +183,7 @@ public PlanFragment(PlanFragmentId id, PlanNode root, DataPartition partition) {
this.builderRuntimeFilterIds = new HashSet<>();
this.targetRuntimeFilterIds = new HashSet<>();
this.hasBucketShuffleJoin = buildHasBucketShuffleJoin();
this.thriftPlanCache = buildTPlanCache();
setParallelExecNumIfExists();
setFragmentInPlanTree(planRoot);
}
Expand Down Expand Up @@ -206,6 +212,22 @@ private Supplier<Boolean> buildHasBucketShuffleJoin() {
});
}

private Supplier<TPlan> buildTPlanCache() {
return Suppliers.memoize(planRoot::treeToThrift);
}

/**
* Pre-serialize the plan tree to Thrift while still holding the table read lock.
* This prevents NPE in OlapTable.getSchemaByIndexId due to concurrent schema changes
* that may remove index metadata after the lock is released.
*/
public TPlan cacheThriftPlan() {
if (thriftPlanCache == null) {
thriftPlanCache = buildTPlanCache();
}
return thriftPlanCache.get();
}

/**
* Assigns 'this' as fragment of all PlanNodes in the plan tree rooted at node.
* Does not traverse the children of ExchangeNodes because those must belong to a
Expand Down Expand Up @@ -328,9 +350,10 @@ public int getParallelExecNum() {
}

public TPlanFragment toThrift() {
cacheThriftPlan();
TPlanFragment result = new TPlanFragment();
if (planRoot != null) {
result.setPlan(planRoot.treeToThrift());
result.setPlan(thriftPlanCache.get());
}
if (outputExprs != null) {
result.setOutputExprs(Expr.treesToThrift(outputExprs));
Expand Down
Loading