Skip to content

Commit

Permalink
[UT] [BugFix] Fix mv unstable test cases (#55998)
Browse files Browse the repository at this point in the history
Signed-off-by: shuming.li <[email protected]>
(cherry picked from commit 107727b)

# Conflicts:
#	fe/fe-core/src/main/java/com/starrocks/sql/optimizer/MvPlanContextBuilder.java
#	fe/fe-core/src/test/java/com/starrocks/sql/optimizer/rule/transformation/materialization/MvTimeSeriesRewriteWithOlapTest.java
  • Loading branch information
LiShuMing authored and mergify[bot] committed Feb 20, 2025
1 parent 322d89d commit c78a90c
Show file tree
Hide file tree
Showing 7 changed files with 562 additions and 74 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import com.starrocks.analysis.IntLiteral;
import com.starrocks.analysis.StringLiteral;
import com.starrocks.analysis.TableName;
import com.starrocks.catalog.Column;
import com.starrocks.catalog.Database;
import com.starrocks.catalog.MaterializedView;
import com.starrocks.catalog.MvId;
Expand Down Expand Up @@ -466,6 +465,31 @@ private void updateTaskDefinition(MaterializedView materializedView) {
}
}

/**
* Inactive the materialized view and its related materialized views.
*/
private static void doInactiveMaterializedView(MaterializedView mv, String reason) {
if (mv == null) {
return;
}
LOG.warn("Inactive MV {}/{} because {}", mv.getName(), mv.getId(), reason);
// inactive mv by reason
if (mv.isActive()) {
// log edit log
String status = AlterMaterializedViewStatusClause.INACTIVE;
GlobalStateMgr.getCurrentState().getAlterJobMgr().
alterMaterializedViewStatus(mv, status, reason, false);
AlterMaterializedViewStatusLog log = new AlterMaterializedViewStatusLog(mv.getDbId(),
mv.getId(), status, MANUAL_INACTIVE_MV_REASON);
GlobalStateMgr.getCurrentState().getEditLog().logAlterMvStatus(log);
} else {
mv.setInactiveAndReason(reason);
}
// recursive inactive
inactiveRelatedMaterializedView(mv,
MaterializedViewExceptions.inactiveReasonForBaseTableActive(mv.getName()), false);
}

/**
* Inactive related materialized views because of base table/view is changed or dropped in the leader background.
*/
Expand All @@ -486,32 +510,15 @@ public static void inactiveRelatedMaterializedView(Table olapTable, String reaso
Database db = GlobalStateMgr.getCurrentState().getLocalMetastore().getDb(mvId.getDbId());
if (db == null) {
LOG.warn("Table {} inactive MaterializedView, viewId {} ,db {} not found",
olapTable.getName(),
mvId.getId(),
mvId.getDbId());
olapTable.getName(), mvId.getId(), mvId.getDbId());
continue;
}
MaterializedView mv = (MaterializedView) db.getTable(mvId.getId());
if (mv != null) {
LOG.warn("Inactive MV {}/{} because {}", mv.getName(), mv.getId(), reason);
// inactive mv by reason
if (mv.isActive()) {
// log edit log
String status = AlterMaterializedViewStatusClause.INACTIVE;
GlobalStateMgr.getCurrentState().getAlterJobMgr().
alterMaterializedViewStatus(mv, status, reason, false);
AlterMaterializedViewStatusLog log = new AlterMaterializedViewStatusLog(mv.getDbId(),
mv.getId(), status, MANUAL_INACTIVE_MV_REASON);
GlobalStateMgr.getCurrentState().getEditLog().logAlterMvStatus(log);
} else {
mv.setInactiveAndReason(reason);
}
// recursive inactive
inactiveRelatedMaterializedView(mv,
MaterializedViewExceptions.inactiveReasonForBaseTableActive(mv.getName()), false);
} else {
if (mv == null) {
LOG.info("Ignore materialized view {} does not exists", mvId);
continue;
}
doInactiveMaterializedView(mv, reason);
}
}

Expand Down Expand Up @@ -541,8 +548,9 @@ public static void inactiveRelatedMaterializedViews(Database db,

}
// TODO: support more types for base table's schema change.
String reason = MaterializedViewExceptions.inactiveReasonForColumnChanged(modifiedColumns);
try {
List<MvPlanContext> mvPlanContexts = MvPlanContextBuilder.getPlanContext(mv);
List<MvPlanContext> mvPlanContexts = MvPlanContextBuilder.getPlanContext(mv, true);
for (MvPlanContext mvPlanContext : mvPlanContexts) {
if (mvPlanContext != null) {
OptExpression mvPlan = mvPlanContext.getLogicalPlan();
Expand All @@ -556,14 +564,8 @@ public static void inactiveRelatedMaterializedViews(Database db,
Set<String> usedColNames = usedColRefs.stream()
.map(x -> x.getName())
.collect(Collectors.toCollection(() -> new TreeSet<>(String.CASE_INSENSITIVE_ORDER)));
for (String modifiedColumn : modifiedColumns) {
if (usedColNames.contains(modifiedColumn)) {
LOG.warn("Setting the materialized view {}({}) to invalid because " +
"the column {} of the table {} was modified.", mv.getName(), mv.getId(),
modifiedColumn, olapTable.getName());
mv.setInactiveAndReason(
MaterializedViewExceptions.inactiveReasonForColumnChanged(modifiedColumns));
}
if (modifiedColumns.stream().anyMatch(usedColNames::contains)) {
doInactiveMaterializedView(mv, reason);
}
}
}
Expand All @@ -572,19 +574,12 @@ public static void inactiveRelatedMaterializedViews(Database db,
LOG.warn("Setting the materialized view {}({}) to invalid because " +
"the columns of the table {} was modified.", mv.getName(), mv.getId(),
olapTable.getName());
mv.setInactiveAndReason(MaterializedViewExceptions.inactiveReasonForColumnChanged(modifiedColumns));
doInactiveMaterializedView(mv, reason);
} catch (Exception e) {
LOG.warn("Get related materialized view {} failed:", mv.getName(), e);
// basic check: may lose some situations
for (Column mvColumn : mv.getColumns()) {
if (modifiedColumns.contains(mvColumn.getName())) {
LOG.warn("Setting the materialized view {}({}) to invalid because " +
"the column {} of the table {} was modified.", mv.getName(), mv.getId(),
mvColumn.getName(), olapTable.getName());
mv.setInactiveAndReason(
MaterializedViewExceptions.inactiveReasonForColumnChanged(modifiedColumns));
break;
}
if (mv.getColumns().stream().anyMatch(x -> modifiedColumns.contains(x.getName()))) {
doInactiveMaterializedView(mv, reason);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ public List<MvPlanContext> getPlanContextIfPresent(SessionVariable sessionVariab
*/
private static List<MvPlanContext> loadMvPlanContext(MaterializedView mv) {
try {
return MvPlanContextBuilder.getPlanContext(mv);
return MvPlanContextBuilder.getPlanContext(mv, false);
} catch (Throwable e) {
LOG.warn("load mv plan cache failed: {}", mv.getName(), e);
return Lists.newArrayList();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,25 @@
import com.starrocks.catalog.MvPlanContext;
import com.starrocks.catalog.Table.TableType;
import com.starrocks.qe.ConnectContext;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.List;
import java.util.Optional;
import java.util.function.Supplier;

public class MvPlanContextBuilder {
public static List<MvPlanContext> getPlanContext(MaterializedView mv) {
private static final Logger LOG = LogManager.getLogger(MvPlanContextBuilder.class);

/**
* Get plan context for the given materialized view.
* @param mv
* @param isThrowException: whether to throw exception when failed to build plan context:
* - when in altering table, we want to throw exception to fail the alter table operation
* - when in generating mv plan, we want to ignore the exception and continue the query
*/
public static List<MvPlanContext> getPlanContext(MaterializedView mv,
boolean isThrowException) {
// build mv query logical plan
MaterializedViewOptimizer mvOptimizer = new MaterializedViewOptimizer();

Expand All @@ -33,15 +47,35 @@ public static List<MvPlanContext> getPlanContext(MaterializedView mv) {

List<MvPlanContext> results = Lists.newArrayList();
try (var guard = connectContext.bindScope()) {
MvPlanContext contextWithoutView = mvOptimizer.optimize(mv, connectContext);
results.add(contextWithoutView);
Optional.ofNullable(doGetOptimizePlan(() -> mvOptimizer.optimize(mv, connectContext), isThrowException))
.map(results::add);

// TODO: Only add context with view when view rewrite is set on.
if (mv.getBaseTableTypes().stream().anyMatch(type -> type == TableType.VIEW)) {
<<<<<<< HEAD
MvPlanContext contextWithView = mvOptimizer.optimize(mv, connectContext, false);
results.add(contextWithView);
=======
Optional.ofNullable(doGetOptimizePlan(() -> mvOptimizer.optimize(mv, connectContext, false, true),
isThrowException))
.map(results::add);
>>>>>>> 107727bb68 ([UT] [BugFix] Fix mv unstable test cases (#55998))
}
}
return results;
}

private static MvPlanContext doGetOptimizePlan(Supplier<MvPlanContext> supplier,
boolean isThrowException) {
try {
return supplier.get();
} catch (Exception e) {
// ignore
LOG.warn("Failed to build mv plan context", e);
if (isThrowException) {
throw e;
}
}
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,8 @@ private OptExpression getMvDefinedQueryPlan(OptExpression mvPlan,
MaterializationContext mvContext,
List<ColumnRefOperator> originalOutputColumns) {
OptExpressionDuplicator duplicator = new OptExpressionDuplicator(mvContext);
OptExpression newMvQueryPlan = duplicator.duplicate(mvPlan, true, true);
OptExpression newMvQueryPlan = duplicator.duplicate(mvPlan, mvContext.getMvColumnRefFactory(),
true, true);

List<ColumnRefOperator> orgMvQueryOutputColumnRefs = mvContext.getMvOutputColumnRefs();
List<ColumnRefOperator> newQueryOutputColumns = duplicator.getMappedColumns(orgMvQueryOutputColumnRefs);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ private static Pair<OptExpression, List<ColumnRefOperator>> getMVCompensationPla
return null;
}
OptExpressionDuplicator duplicator = new OptExpressionDuplicator(mvContext);
OptExpression newMvQueryPlan = duplicator.duplicate(compensateMvQueryPlan);
OptExpression newMvQueryPlan = duplicator.duplicate(compensateMvQueryPlan, mvContext.getMvColumnRefFactory());
if (newMvQueryPlan == null) {
logMVRewrite(mvContext, "Duplicate compensate query plan failed");
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,19 +100,32 @@ public OptExpressionDuplicator(ColumnRefFactory columnRefFactory, OptimizerConte
}

public OptExpression duplicate(OptExpression source) {
return duplicate(source, false, false);
return duplicate(source, columnRefFactory, false, false);
}

public OptExpression duplicate(OptExpression source,
boolean isResetSelectedPartitions) {
return duplicate(source, isResetSelectedPartitions, false);
return duplicate(source, columnRefFactory, isResetSelectedPartitions, false);
}

public OptExpression duplicate(OptExpression source, ColumnRefFactory prevColumnRefFactory) {
return duplicate(source, prevColumnRefFactory, false, false);
}

/**
* Duplicate the OptExpression tree.
* @param source: source OptExpression
* @param prevColumnRefFactory: column ref factory where source OptExpression is from
* @param isResetSelectedPartitions: whether to reset selected partitions
* @param isRefreshExternalTable: whether to refresh external table
* @return
*/
public OptExpression duplicate(OptExpression source,
ColumnRefFactory prevColumnRefFactory,
boolean isResetSelectedPartitions,
boolean isRefreshExternalTable) {
OptExpressionDuplicatorVisitor visitor = new OptExpressionDuplicatorVisitor(optimizerContext, columnMapping, rewriter,
isResetSelectedPartitions, isRefreshExternalTable);
prevColumnRefFactory, isResetSelectedPartitions, isRefreshExternalTable);
return source.getOp().accept(visitor, source, null);
}

Expand Down Expand Up @@ -142,15 +155,18 @@ class OptExpressionDuplicatorVisitor extends OptExpressionVisitor<OptExpression,
private final ReplaceColumnRefRewriter rewriter;
private final boolean isResetSelectedPartitions;
private final boolean isRefreshExternalTable;
private final ColumnRefFactory prevColumnRefFactory;

OptExpressionDuplicatorVisitor(OptimizerContext optimizerContext,
Map<ColumnRefOperator, ColumnRefOperator> columnMapping,
ReplaceColumnRefRewriter rewriter,
ColumnRefFactory prevColumnRefFactory,
boolean isResetSelectedPartitions,
boolean isRefreshExternalTable) {
this.optimizerContext = optimizerContext;
this.columnMapping = columnMapping;
this.rewriter = rewriter;
this.prevColumnRefFactory = prevColumnRefFactory;
this.isResetSelectedPartitions = isResetSelectedPartitions;
this.isRefreshExternalTable = isRefreshExternalTable;
}
Expand Down Expand Up @@ -206,8 +222,7 @@ public OptExpression visitLogicalTableScan(OptExpression optExpression, Void con
LogicalOlapScanOperator.Builder olapScanBuilder = (LogicalOlapScanOperator.Builder) scanBuilder;
if (olapScan.getDistributionSpec() instanceof HashDistributionSpec) {
HashDistributionSpec newHashDistributionSpec =
processHashDistributionSpec((HashDistributionSpec) olapScan.getDistributionSpec(),
columnRefFactory, columnMapping);
processHashDistributionSpec((HashDistributionSpec) olapScan.getDistributionSpec());
olapScanBuilder.setDistributionSpec(newHashDistributionSpec);
}

Expand Down Expand Up @@ -650,51 +665,50 @@ public OptExpression visit(OptExpression optExpression, Void context) {

// rewrite shuffle columns and joinEquivalentColumns in HashDistributionSpec
// because the columns ids have changed
private HashDistributionSpec processHashDistributionSpec(
HashDistributionSpec originSpec,
ColumnRefFactory columnRefFactory,
Map<ColumnRefOperator, ColumnRefOperator> columnMapping) {

private HashDistributionSpec processHashDistributionSpec(HashDistributionSpec originSpec) {
// HashDistributionDesc
List<DistributionCol> newColumns = Lists.newArrayList();
for (DistributionCol column : originSpec.getShuffleColumns()) {
ColumnRefOperator oldRefOperator = columnRefFactory.getColumnRef(column.getColId());
ColumnRefOperator newRefOperator = columnMapping.get(oldRefOperator);
final List<DistributionCol> newColumns = Lists.newArrayList();
for (DistributionCol distributionCol : originSpec.getShuffleColumns()) {
final ColumnRefOperator newRefOperator = getNewDistributionColRef(distributionCol);
Preconditions.checkNotNull(newRefOperator);
newColumns.add(new DistributionCol(newRefOperator.getId(), column.isNullStrict()));
newColumns.add(new DistributionCol(newRefOperator.getId(), distributionCol.isNullStrict()));
}
Preconditions.checkState(newColumns.size() == originSpec.getShuffleColumns().size());
HashDistributionDesc hashDistributionDesc =
final HashDistributionDesc hashDistributionDesc =
new HashDistributionDesc(newColumns, originSpec.getHashDistributionDesc().getSourceType());

EquivalentDescriptor equivDesc = originSpec.getEquivDesc();

EquivalentDescriptor newEquivDesc = new EquivalentDescriptor(equivDesc.getTableId(), equivDesc.getPartitionIds());
final EquivalentDescriptor equivDesc = originSpec.getEquivDesc();
final EquivalentDescriptor newEquivDesc = new EquivalentDescriptor(equivDesc.getTableId(),
equivDesc.getPartitionIds());
updateDistributionUnionFind(newEquivDesc.getNullRelaxUnionFind(), equivDesc.getNullStrictUnionFind());
updateDistributionUnionFind(newEquivDesc.getNullStrictUnionFind(), equivDesc.getNullRelaxUnionFind());

return new HashDistributionSpec(hashDistributionDesc, newEquivDesc);
}

private void updateDistributionUnionFind(UnionFind<DistributionCol> newUnionFind,
UnionFind<DistributionCol> oldUnionFind) {

UnionFind<DistributionCol> oldUnionFind) {
for (Set<DistributionCol> distributionColSet : oldUnionFind.getAllGroups()) {
DistributionCol first = null;
for (DistributionCol next : distributionColSet) {
if (first == null) {
first = next;
}
ColumnRefOperator firstCol = columnRefFactory.getColumnRef(first.getColId());
ColumnRefOperator newFirstCol = columnMapping.get(firstCol).cast();

ColumnRefOperator nextCol = columnRefFactory.getColumnRef(next.getColId());
ColumnRefOperator newNextCol = columnMapping.get(nextCol).cast();
final ColumnRefOperator newFirstCol = getNewDistributionColRef(first);
final ColumnRefOperator newNextCol = getNewDistributionColRef(next);
newUnionFind.union(first.updateColId(newFirstCol.getId()), next.updateColId(newNextCol.getId()));
}
}
}

private ColumnRefOperator getNewDistributionColRef(DistributionCol col) {
int colId = col.getColId();
final ColumnRefOperator oldRefOperator = prevColumnRefFactory.getColumnRef(colId);
Preconditions.checkArgument(oldRefOperator != null);
// use column mapping to find the new ColumnRefOperator
ColumnRefOperator newRefOperator = columnMapping.get(oldRefOperator);
return newRefOperator;
}

private void processCommon(Operator.Builder opBuilder) {
// first process predicate, then projection
ScalarOperator predicate = opBuilder.getPredicate();
Expand Down
Loading

0 comments on commit c78a90c

Please sign in to comment.