Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[UT] [BugFix] Fix possible NPE querying task_runs schema table #55968

Merged
merged 2 commits into from
Feb 18, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1390,7 +1390,7 @@ private Map<String, MaterializedView.BasePartitionInfo> getRefreshedPartitionInf
partition.getDefaultPhysicalPartition().getVisibleVersionTime());
partitionInfos.put(partition.getName(), basePartitionInfo);
}
LOG.info("Collect olap base table {}'s refreshed partition infos: {}", baseTable.getName(), partitionInfos);
LOG.debug("Collect olap base table {}'s refreshed partition infos: {}", baseTable.getName(), partitionInfos);
return partitionInfos;
} else if (ConnectorPartitionTraits.isSupportPCTRefresh(baseTable.getType())) {
return getSelectedPartitionInfos(baseTable, Lists.newArrayList(refreshedPartitionNames), baseTableInfo);
Expand Down
61 changes: 40 additions & 21 deletions fe/fe-core/src/main/java/com/starrocks/scheduler/TaskRun.java
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ public class TaskRun implements Comparable<TaskRun> {
public static final Set<String> TASK_RUN_PROPERTIES = ImmutableSet.of(
MV_ID, PARTITION_START, PARTITION_END, FORCE, START_TASK_RUN_ID, PARTITION_VALUES);

public static final int INVALID_TASK_PROGRESS = -1;

// Only used in FE's UT
public static final String IS_TEST = "__IS_TEST__";
private boolean isKilled = false;
Expand Down Expand Up @@ -314,31 +316,48 @@ public TaskRunStatus getStatus() {
if (status == null) {
return null;
}
switch (status.getState()) {
case RUNNING:
if (runCtx != null) {
StmtExecutor executor = runCtx.getExecutor();
if (executor != null && executor.getCoordinator() != null) {
long jobId = executor.getCoordinator().getLoadJobId();
if (jobId != -1) {
InsertLoadJob job = (InsertLoadJob) GlobalStateMgr.getCurrentState()
.getLoadMgr().getLoadJob(jobId);
int progress = job.getProgress();
if (progress == 100) {
progress = 99;
}
status.setProgress(progress);
}
}
}
break;
case SUCCESS:
status.setProgress(100);
break;
final int progress = getProgress();
if (progress != INVALID_TASK_PROGRESS) {
status.setProgress(progress);
}
return status;
}

private int getProgress() {
if (status == null) {
return INVALID_TASK_PROGRESS;
}

if (status.getState().isSuccessState()) {
return 100;
} else if (status.getState().isFinishState()) {
return INVALID_TASK_PROGRESS;
} else {
if (runCtx == null) {
return INVALID_TASK_PROGRESS;
}
final StmtExecutor executor = runCtx.getExecutor();
if (executor == null || executor.getCoordinator() == null) {
return INVALID_TASK_PROGRESS;
}
final long jobId = executor.getCoordinator().getLoadJobId();
if (jobId == -1) {
return INVALID_TASK_PROGRESS;
}
final InsertLoadJob job = (InsertLoadJob) GlobalStateMgr.getCurrentState()
.getLoadMgr().getLoadJob(jobId);
if (job == null) {
return INVALID_TASK_PROGRESS;
}
int progress = job.getProgress();
// if the progress is 100, we should return 99 to avoid the task run is marked as success
if (progress == 100) {
progress = 99;
}
return progress;
}
}

public TaskRunStatus initStatus(String queryId, Long createTime) {
TaskRunStatus status = new TaskRunStatus();
long created = createTime == null ? System.currentTimeMillis() : createTime;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.starrocks.common.FeConstants;
import com.starrocks.persist.gson.GsonUtils;
import com.starrocks.qe.ConnectContext;
import com.starrocks.scheduler.persist.TaskRunStatus;
import com.starrocks.server.GlobalStateMgr;
import com.starrocks.server.WarehouseManager;
import com.starrocks.utframe.UtFrameUtils;
Expand Down Expand Up @@ -358,4 +359,32 @@ public void testGetAllRunnableTaskCount() {
Assert.assertEquals(Long.valueOf(3), result.get(1L));
Assert.assertEquals(Long.valueOf(3), result.get(2L));
}

@Test
public void testTaskRunProgress() {
ConnectContext context1 = new ConnectContext();
context1.setGlobalStateMgr(connectContext.getGlobalStateMgr());
Task task = new Task("test");
TaskRun run = makeTaskRun(1, task, makeExecuteOption(true, false), System.currentTimeMillis());
Assert.assertTrue(run.getStatus() != null);
Assert.assertEquals(0, run.getStatus().getProgress());

TaskRunStatus status = run.getStatus();
status.setState(Constants.TaskRunState.SUCCESS);
Assert.assertEquals(100, run.getStatus().getProgress());
status.setState(Constants.TaskRunState.MERGED);
Assert.assertEquals(100, run.getStatus().getProgress());

// keep original progress
status.setState(Constants.TaskRunState.FAILED);
Assert.assertEquals(100, run.getStatus().getProgress());

status.setState(Constants.TaskRunState.RUNNING);
status.setProgress(10);
// should not throw exception
Assert.assertEquals(10, run.getStatus().getProgress());

status.setProgress(100);
Assert.assertEquals(100, run.getStatus().getProgress());
}
}
Loading