diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
index 2b8ea62f9e0b28..2183416026cce4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
@@ -922,7 +922,15 @@ public List getSchemaByIndexId(Long indexId) {
     }
 
     public List getSchemaByIndexId(Long indexId, boolean full) {
-        List fullSchema = indexIdToMeta.get(indexId).getSchema();
+        MaterializedIndexMeta meta = indexIdToMeta.get(indexId);
+        if (meta == null) {
+            throw new RuntimeException(String.format(
+                    "No schema found for index id %d in table %s(%d). This may be caused by a"
+                            + " concurrent DDL operation (e.g., schema change). Available index ids: %s."
+                            + " Please retry the query.",
+                    indexId, name, id, indexIdToMeta.keySet()));
+        }
+        List fullSchema = meta.getSchema();
         if (full) {
             return fullSchema;
         } else {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java
index 351b7f717177be..54e974e95daa97 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java
@@ -152,6 +152,7 @@ public void plan(StatementBase queryStmt, org.apache.doris.thrift.TQueryOptions
                 if (plan instanceof PhysicalPlan) {
                     physicalPlan = (PhysicalPlan) plan;
                     distribute(physicalPlan, explainLevel);
+                    cacheThriftPlans(explainLevel);
                 }
             });
         } finally {
@@ -556,6 +557,29 @@ private void distribute(PhysicalPlan physicalPlan, ExplainLevel explainLevel) {
         }
     }
 
+    /**
+     * Pre-serializes (caches) the TPlan of each fragment while the table read lock is still held.
+     * This prevents an NPE in OlapTable.getSchemaByIndexId caused by concurrent schema changes
+     * that may remove index metadata after the table lock is released.
+     *
+     * @param explainLevel explain level of the statement being planned
+     */
+    private void cacheThriftPlans(ExplainLevel explainLevel) {
+        boolean distributedExplain = explainLevel == ExplainLevel.ALL_PLAN
+                || explainLevel == ExplainLevel.DISTRIBUTED_PLAN;
+        if (explainLevel != ExplainLevel.NONE && explainLevel.isPlanLevel && !distributedExplain) {
+            return;
+        }
+        // NOTE(review): presumably fragments is still null when distribute() bailed out early;
+        // skip caching in that case instead of failing with a NullPointerException - confirm.
+        if (fragments == null) {
+            return;
+        }
+        for (PlanFragment fragment : fragments) {
+            fragment.cacheThriftPlan();
+        }
+    }
+
     protected PhysicalPlan postProcess(PhysicalPlan physicalPlan) {
         return new PlanPostProcessors(cascadesContext).process(physicalPlan);
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java
index d6a05f9711b5cb..ff416df23f151f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java
@@ -33,6 +33,7 @@
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.thrift.TExplainLevel;
 import org.apache.doris.thrift.TPartitionType;
+import org.apache.doris.thrift.TPlan;
 import org.apache.doris.thrift.TPlanFragment;
 import org.apache.doris.thrift.TQueryCacheParam;
 import org.apache.doris.thrift.TResultSinkType;
@@ -165,6 +166,10 @@ public class PlanFragment extends TreeNode {
     public TQueryCacheParam queryCacheParam;
     private int numBackends = 0;
     private boolean forceSingleInstance = false;
+    // Cache the serialized TPlan (built under table lock) to avoid NPE from concurrent schema changes.
+    // See: OlapTable.getSchemaByIndexId is called during treeToThrift(); by caching while holding
+    // the table read lock, we ensure schema meta is still present when the plan is serialized.
+    private Supplier<TPlan> thriftPlanCache;
 
     /**
      * C'tor for fragment with specific partition; the output is by default broadcast.
@@ -178,6 +183,7 @@ public PlanFragment(PlanFragmentId id, PlanNode root, DataPartition partition) {
         this.builderRuntimeFilterIds = new HashSet<>();
         this.targetRuntimeFilterIds = new HashSet<>();
         this.hasBucketShuffleJoin = buildHasBucketShuffleJoin();
+        this.thriftPlanCache = buildTPlanCache();
         setParallelExecNumIfExists();
         setFragmentInPlanTree(planRoot);
     }
@@ -206,6 +212,33 @@ private Supplier buildHasBucketShuffleJoin() {
         });
     }
 
+    /**
+     * Builds the lazily evaluated, memoized serializer for this fragment's plan tree.
+     * A lambda is used instead of {@code planRoot::treeToThrift} because a bound method reference
+     * evaluates {@code planRoot} immediately and throws NullPointerException when it is null.
+     */
+    private Supplier<TPlan> buildTPlanCache() {
+        return Suppliers.memoize(() -> planRoot.treeToThrift());
+    }
+
+    /**
+     * Pre-serializes the plan tree to Thrift; intended to be called while the table read lock is held.
+     * This prevents an NPE in OlapTable.getSchemaByIndexId caused by concurrent schema changes
+     * that may remove index metadata after the lock is released. The result is memoized, so once
+     * the tree has been serialized later calls return the same TPlan.
+     *
+     * @return the cached TPlan, or null when this fragment has no plan root
+     */
+    public TPlan cacheThriftPlan() {
+        if (planRoot == null) {
+            return null;
+        }
+        if (thriftPlanCache == null) {
+            thriftPlanCache = buildTPlanCache();
+        }
+        return thriftPlanCache.get();
+    }
+
     /**
      * Assigns 'this' as fragment of all PlanNodes in the plan tree rooted at node.
      * Does not traverse the children of ExchangeNodes because those must belong to a
@@ -328,9 +361,9 @@ public int getParallelExecNum() {
     }
 
     public TPlanFragment toThrift() {
         TPlanFragment result = new TPlanFragment();
         if (planRoot != null) {
-            result.setPlan(planRoot.treeToThrift());
+            result.setPlan(cacheThriftPlan());
         }
         if (outputExprs != null) {
             result.setOutputExprs(Expr.treesToThrift(outputExprs));