Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[UT] [BugFix] Fix mv unstable test cases #55998

Merged
merged 3 commits into from
Feb 20, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import com.starrocks.analysis.IntLiteral;
import com.starrocks.analysis.StringLiteral;
import com.starrocks.analysis.TableName;
import com.starrocks.catalog.Column;
import com.starrocks.catalog.Database;
import com.starrocks.catalog.MaterializedView;
import com.starrocks.catalog.MvId;
Expand Down Expand Up @@ -491,6 +490,31 @@ private void updateTaskDefinition(MaterializedView materializedView) {
}
}

/**
* Inactive the materialized view and its related materialized views.
*/
private static void doInactiveMaterializedView(MaterializedView mv, String reason) {
if (mv == null) {
return;
}
LOG.warn("Inactive MV {}/{} because {}", mv.getName(), mv.getId(), reason);
// inactive mv by reason
if (mv.isActive()) {
// log edit log
String status = AlterMaterializedViewStatusClause.INACTIVE;
GlobalStateMgr.getCurrentState().getAlterJobMgr().
alterMaterializedViewStatus(mv, status, reason, false);
AlterMaterializedViewStatusLog log = new AlterMaterializedViewStatusLog(mv.getDbId(),
mv.getId(), status, MANUAL_INACTIVE_MV_REASON);
GlobalStateMgr.getCurrentState().getEditLog().logAlterMvStatus(log);
} else {
mv.setInactiveAndReason(reason);
}
// recursive inactive
inactiveRelatedMaterializedView(mv,
MaterializedViewExceptions.inactiveReasonForBaseTableActive(mv.getName()), false);
}

/**
* Inactive related materialized views because of base table/view is changed or dropped in the leader background.
*/
Expand All @@ -511,32 +535,15 @@ public static void inactiveRelatedMaterializedView(Table olapTable, String reaso
Database db = GlobalStateMgr.getCurrentState().getLocalMetastore().getDb(mvId.getDbId());
if (db == null) {
LOG.warn("Table {} inactive MaterializedView, viewId {} ,db {} not found",
olapTable.getName(),
mvId.getId(),
mvId.getDbId());
olapTable.getName(), mvId.getId(), mvId.getDbId());
continue;
}
MaterializedView mv = (MaterializedView) db.getTable(mvId.getId());
if (mv != null) {
LOG.warn("Inactive MV {}/{} because {}", mv.getName(), mv.getId(), reason);
// inactive mv by reason
if (mv.isActive()) {
// log edit log
String status = AlterMaterializedViewStatusClause.INACTIVE;
GlobalStateMgr.getCurrentState().getAlterJobMgr().
alterMaterializedViewStatus(mv, status, reason, false);
AlterMaterializedViewStatusLog log = new AlterMaterializedViewStatusLog(mv.getDbId(),
mv.getId(), status, MANUAL_INACTIVE_MV_REASON);
GlobalStateMgr.getCurrentState().getEditLog().logAlterMvStatus(log);
} else {
mv.setInactiveAndReason(reason);
}
// recursive inactive
inactiveRelatedMaterializedView(mv,
MaterializedViewExceptions.inactiveReasonForBaseTableActive(mv.getName()), false);
} else {
if (mv == null) {
LOG.info("Ignore materialized view {} does not exists", mvId);
continue;
}
doInactiveMaterializedView(mv, reason);
}
}

Expand Down Expand Up @@ -566,8 +573,9 @@ public static void inactiveRelatedMaterializedViews(Database db,

}
// TODO: support more types for base table's schema change.
String reason = MaterializedViewExceptions.inactiveReasonForColumnChanged(modifiedColumns);
try {
List<MvPlanContext> mvPlanContexts = MvPlanContextBuilder.getPlanContext(mv);
List<MvPlanContext> mvPlanContexts = MvPlanContextBuilder.getPlanContext(mv, true);
for (MvPlanContext mvPlanContext : mvPlanContexts) {
if (mvPlanContext != null) {
OptExpression mvPlan = mvPlanContext.getLogicalPlan();
Expand All @@ -581,14 +589,8 @@ public static void inactiveRelatedMaterializedViews(Database db,
Set<String> usedColNames = usedColRefs.stream()
.map(x -> x.getName())
.collect(Collectors.toCollection(() -> new TreeSet<>(String.CASE_INSENSITIVE_ORDER)));
for (String modifiedColumn : modifiedColumns) {
if (usedColNames.contains(modifiedColumn)) {
LOG.warn("Setting the materialized view {}({}) to invalid because " +
"the column {} of the table {} was modified.", mv.getName(), mv.getId(),
modifiedColumn, olapTable.getName());
mv.setInactiveAndReason(
MaterializedViewExceptions.inactiveReasonForColumnChanged(modifiedColumns));
}
if (modifiedColumns.stream().anyMatch(usedColNames::contains)) {
doInactiveMaterializedView(mv, reason);
}
}
}
Expand All @@ -597,19 +599,12 @@ public static void inactiveRelatedMaterializedViews(Database db,
LOG.warn("Setting the materialized view {}({}) to invalid because " +
"the columns of the table {} was modified.", mv.getName(), mv.getId(),
olapTable.getName());
mv.setInactiveAndReason(MaterializedViewExceptions.inactiveReasonForColumnChanged(modifiedColumns));
doInactiveMaterializedView(mv, reason);
} catch (Exception e) {
LOG.warn("Get related materialized view {} failed:", mv.getName(), e);
// basic check: may lose some situations
for (Column mvColumn : mv.getColumns()) {
if (modifiedColumns.contains(mvColumn.getName())) {
LOG.warn("Setting the materialized view {}({}) to invalid because " +
"the column {} of the table {} was modified.", mv.getName(), mv.getId(),
mvColumn.getName(), olapTable.getName());
mv.setInactiveAndReason(
MaterializedViewExceptions.inactiveReasonForColumnChanged(modifiedColumns));
break;
}
if (mv.getColumns().stream().anyMatch(x -> modifiedColumns.contains(x.getName()))) {
doInactiveMaterializedView(mv, reason);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ public List<MvPlanContext> getPlanContextIfPresent(SessionVariable sessionVariab
*/
private static List<MvPlanContext> loadMvPlanContext(MaterializedView mv) {
try {
return MvPlanContextBuilder.getPlanContext(mv);
return MvPlanContextBuilder.getPlanContext(mv, false);
} catch (Throwable e) {
LOG.warn("load mv plan cache failed: {}", mv.getName(), e);
return Lists.newArrayList();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,25 @@
import com.starrocks.catalog.MvPlanContext;
import com.starrocks.catalog.Table.TableType;
import com.starrocks.qe.ConnectContext;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.List;
import java.util.Optional;
import java.util.function.Supplier;

public class MvPlanContextBuilder {
public static List<MvPlanContext> getPlanContext(MaterializedView mv) {
private static final Logger LOG = LogManager.getLogger(MvPlanContextBuilder.class);

/**
* Get plan context for the given materialized view.
* @param mv
* @param isThrowException: whether to throw exception when failed to build plan context:
* - when in altering table, we want to throw exception to fail the alter table operation
* - when in generating mv plan, we want to ignore the exception and continue the query
*/
public static List<MvPlanContext> getPlanContext(MaterializedView mv,
boolean isThrowException) {
// build mv query logical plan
MaterializedViewOptimizer mvOptimizer = new MaterializedViewOptimizer();

Expand All @@ -33,15 +47,30 @@ public static List<MvPlanContext> getPlanContext(MaterializedView mv) {

List<MvPlanContext> results = Lists.newArrayList();
try (var guard = connectContext.bindScope()) {
MvPlanContext contextWithoutView = mvOptimizer.optimize(mv, connectContext);
results.add(contextWithoutView);
Optional.ofNullable(doGetOptimizePlan(() -> mvOptimizer.optimize(mv, connectContext), isThrowException))
.map(results::add);

// TODO: Only add context with view when view rewrite is set on.
if (mv.getBaseTableTypes().stream().anyMatch(type -> type == TableType.VIEW)) {
MvPlanContext contextWithView = mvOptimizer.optimize(mv, connectContext, false, true);
results.add(contextWithView);
Optional.ofNullable(doGetOptimizePlan(() -> mvOptimizer.optimize(mv, connectContext, false, true),
isThrowException))
.map(results::add);
}
}
return results;
}

private static MvPlanContext doGetOptimizePlan(Supplier<MvPlanContext> supplier,
boolean isThrowException) {
try {
return supplier.get();
} catch (Exception e) {
// ignore
LOG.warn("Failed to build mv plan context", e);
if (isThrowException) {
throw e;
}
}
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,8 @@ private OptExpression getMvDefinedQueryPlan(OptExpression mvPlan,
MaterializationContext mvContext,
List<ColumnRefOperator> originalOutputColumns) {
OptExpressionDuplicator duplicator = new OptExpressionDuplicator(mvContext);
OptExpression newMvQueryPlan = duplicator.duplicate(mvPlan, true, true);
OptExpression newMvQueryPlan = duplicator.duplicate(mvPlan, mvContext.getMvColumnRefFactory(),
true, true);

List<ColumnRefOperator> orgMvQueryOutputColumnRefs = mvContext.getMvOutputColumnRefs();
List<ColumnRefOperator> newQueryOutputColumns = duplicator.getMappedColumns(orgMvQueryOutputColumnRefs);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ private static Pair<OptExpression, List<ColumnRefOperator>> getMVCompensationPla
return null;
}
OptExpressionDuplicator duplicator = new OptExpressionDuplicator(mvContext);
OptExpression newMvQueryPlan = duplicator.duplicate(compensateMvQueryPlan);
OptExpression newMvQueryPlan = duplicator.duplicate(compensateMvQueryPlan, mvContext.getMvColumnRefFactory());
if (newMvQueryPlan == null) {
logMVRewrite(mvContext, "Duplicate compensate query plan failed");
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,19 +103,32 @@ public OptExpressionDuplicator(ColumnRefFactory columnRefFactory, OptimizerConte
}

public OptExpression duplicate(OptExpression source) {
return duplicate(source, false, false);
return duplicate(source, columnRefFactory, false, false);
}

public OptExpression duplicate(OptExpression source,
boolean isResetSelectedPartitions) {
return duplicate(source, isResetSelectedPartitions, false);
return duplicate(source, columnRefFactory, isResetSelectedPartitions, false);
}

public OptExpression duplicate(OptExpression source, ColumnRefFactory prevColumnRefFactory) {
return duplicate(source, prevColumnRefFactory, false, false);
}

/**
* Duplicate the OptExpression tree.
* @param source: source OptExpression
* @param prevColumnRefFactory: column ref factory where source OptExpression is from
* @param isResetSelectedPartitions: whether to reset selected partitions
* @param isRefreshExternalTable: whether to refresh external table
* @return
*/
public OptExpression duplicate(OptExpression source,
ColumnRefFactory prevColumnRefFactory,
boolean isResetSelectedPartitions,
boolean isRefreshExternalTable) {
OptExpressionDuplicatorVisitor visitor = new OptExpressionDuplicatorVisitor(optimizerContext, columnMapping, rewriter,
isResetSelectedPartitions, isRefreshExternalTable);
prevColumnRefFactory, isResetSelectedPartitions, isRefreshExternalTable);
return source.getOp().accept(visitor, source, null);
}

Expand Down Expand Up @@ -145,15 +158,18 @@ class OptExpressionDuplicatorVisitor extends OptExpressionVisitor<OptExpression,
private final ReplaceColumnRefRewriter rewriter;
private final boolean isResetSelectedPartitions;
private final boolean isRefreshExternalTable;
private final ColumnRefFactory prevColumnRefFactory;

OptExpressionDuplicatorVisitor(OptimizerContext optimizerContext,
Map<ColumnRefOperator, ColumnRefOperator> columnMapping,
ReplaceColumnRefRewriter rewriter,
ColumnRefFactory prevColumnRefFactory,
boolean isResetSelectedPartitions,
boolean isRefreshExternalTable) {
this.optimizerContext = optimizerContext;
this.columnMapping = columnMapping;
this.rewriter = rewriter;
this.prevColumnRefFactory = prevColumnRefFactory;
this.isResetSelectedPartitions = isResetSelectedPartitions;
this.isRefreshExternalTable = isRefreshExternalTable;
}
Expand Down Expand Up @@ -209,8 +225,7 @@ public OptExpression visitLogicalTableScan(OptExpression optExpression, Void con
LogicalOlapScanOperator.Builder olapScanBuilder = (LogicalOlapScanOperator.Builder) scanBuilder;
if (olapScan.getDistributionSpec() instanceof HashDistributionSpec) {
HashDistributionSpec newHashDistributionSpec =
processHashDistributionSpec((HashDistributionSpec) olapScan.getDistributionSpec(),
columnRefFactory, columnMapping);
processHashDistributionSpec((HashDistributionSpec) olapScan.getDistributionSpec());
olapScanBuilder.setDistributionSpec(newHashDistributionSpec);
}

Expand Down Expand Up @@ -659,51 +674,50 @@ public OptExpression visit(OptExpression optExpression, Void context) {

// rewrite shuffle columns and joinEquivalentColumns in HashDistributionSpec
// because the columns ids have changed
private HashDistributionSpec processHashDistributionSpec(
HashDistributionSpec originSpec,
ColumnRefFactory columnRefFactory,
Map<ColumnRefOperator, ColumnRefOperator> columnMapping) {

private HashDistributionSpec processHashDistributionSpec(HashDistributionSpec originSpec) {
// HashDistributionDesc
List<DistributionCol> newColumns = Lists.newArrayList();
for (DistributionCol column : originSpec.getShuffleColumns()) {
ColumnRefOperator oldRefOperator = columnRefFactory.getColumnRef(column.getColId());
ColumnRefOperator newRefOperator = columnMapping.get(oldRefOperator);
final List<DistributionCol> newColumns = Lists.newArrayList();
for (DistributionCol distributionCol : originSpec.getShuffleColumns()) {
final ColumnRefOperator newRefOperator = getNewDistributionColRef(distributionCol);
Preconditions.checkNotNull(newRefOperator);
newColumns.add(new DistributionCol(newRefOperator.getId(), column.isNullStrict()));
newColumns.add(new DistributionCol(newRefOperator.getId(), distributionCol.isNullStrict()));
}
Preconditions.checkState(newColumns.size() == originSpec.getShuffleColumns().size());
HashDistributionDesc hashDistributionDesc =
final HashDistributionDesc hashDistributionDesc =
new HashDistributionDesc(newColumns, originSpec.getHashDistributionDesc().getSourceType());

EquivalentDescriptor equivDesc = originSpec.getEquivDesc();

EquivalentDescriptor newEquivDesc = new EquivalentDescriptor(equivDesc.getTableId(), equivDesc.getPartitionIds());
final EquivalentDescriptor equivDesc = originSpec.getEquivDesc();
final EquivalentDescriptor newEquivDesc = new EquivalentDescriptor(equivDesc.getTableId(),
equivDesc.getPartitionIds());
updateDistributionUnionFind(newEquivDesc.getNullRelaxUnionFind(), equivDesc.getNullStrictUnionFind());
updateDistributionUnionFind(newEquivDesc.getNullStrictUnionFind(), equivDesc.getNullRelaxUnionFind());

return new HashDistributionSpec(hashDistributionDesc, newEquivDesc);
}

private void updateDistributionUnionFind(UnionFind<DistributionCol> newUnionFind,
UnionFind<DistributionCol> oldUnionFind) {

UnionFind<DistributionCol> oldUnionFind) {
for (Set<DistributionCol> distributionColSet : oldUnionFind.getAllGroups()) {
DistributionCol first = null;
for (DistributionCol next : distributionColSet) {
if (first == null) {
first = next;
}
ColumnRefOperator firstCol = columnRefFactory.getColumnRef(first.getColId());
ColumnRefOperator newFirstCol = columnMapping.get(firstCol).cast();

ColumnRefOperator nextCol = columnRefFactory.getColumnRef(next.getColId());
ColumnRefOperator newNextCol = columnMapping.get(nextCol).cast();
final ColumnRefOperator newFirstCol = getNewDistributionColRef(first);
final ColumnRefOperator newNextCol = getNewDistributionColRef(next);
newUnionFind.union(first.updateColId(newFirstCol.getId()), next.updateColId(newNextCol.getId()));
}
}
}

private ColumnRefOperator getNewDistributionColRef(DistributionCol col) {
int colId = col.getColId();
final ColumnRefOperator oldRefOperator = prevColumnRefFactory.getColumnRef(colId);
Preconditions.checkArgument(oldRefOperator != null);
// use column mapping to find the new ColumnRefOperator
ColumnRefOperator newRefOperator = columnMapping.get(oldRefOperator);
return newRefOperator;
}

private void processCommon(Operator.Builder opBuilder) {
// first process predicate, then projection
ScalarOperator predicate = opBuilder.getPredicate();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ public static void beforeClass() throws Exception {
MVTestBase.beforeClass();

connectContext.getSessionVariable().setMaterializedViewRewriteMode("force");
connectContext.getSessionVariable().setEnableLowCardinalityOptimize(false);
starRocksAssert.withTable("CREATE TABLE t0(\n" +
" k1 datetime,\n" +
" v1 INT,\n" +
Expand Down
Loading