Skip to content

Commit

Permalink
[UT] [BugFix] Fix possible NPE querying task_runs schema table (#55968)
Browse files Browse the repository at this point in the history
Signed-off-by: shuming.li <[email protected]>
(cherry picked from commit 2552759)
  • Loading branch information
LiShuMing authored and mergify[bot] committed Feb 18, 2025
1 parent 3db43ba commit 20db774
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1389,7 +1389,7 @@ private Map<String, MaterializedView.BasePartitionInfo> getRefreshedPartitionInf
partition.getDefaultPhysicalPartition().getVisibleVersionTime());
partitionInfos.put(partition.getName(), basePartitionInfo);
}
LOG.info("Collect olap base table {}'s refreshed partition infos: {}", baseTable.getName(), partitionInfos);
LOG.debug("Collect olap base table {}'s refreshed partition infos: {}", baseTable.getName(), partitionInfos);
return partitionInfos;
} else if (ConnectorPartitionTraits.isSupportPCTRefresh(baseTable.getType())) {
return getSelectedPartitionInfos(baseTable, Lists.newArrayList(refreshedPartitionNames), baseTableInfo);
Expand Down
61 changes: 40 additions & 21 deletions fe/fe-core/src/main/java/com/starrocks/scheduler/TaskRun.java
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ public class TaskRun implements Comparable<TaskRun> {
public static final Set<String> TASK_RUN_PROPERTIES = ImmutableSet.of(
MV_ID, PARTITION_START, PARTITION_END, FORCE, START_TASK_RUN_ID, PARTITION_VALUES);

public static final int INVALID_TASK_PROGRESS = -1;

// Only used in FE's UT
public static final String IS_TEST = "__IS_TEST__";
private boolean isKilled = false;
Expand Down Expand Up @@ -305,31 +307,48 @@ public TaskRunStatus getStatus() {
if (status == null) {
return null;
}
switch (status.getState()) {
case RUNNING:
if (runCtx != null) {
StmtExecutor executor = runCtx.getExecutor();
if (executor != null && executor.getCoordinator() != null) {
long jobId = executor.getCoordinator().getLoadJobId();
if (jobId != -1) {
InsertLoadJob job = (InsertLoadJob) GlobalStateMgr.getCurrentState()
.getLoadMgr().getLoadJob(jobId);
int progress = job.getProgress();
if (progress == 100) {
progress = 99;
}
status.setProgress(progress);
}
}
}
break;
case SUCCESS:
status.setProgress(100);
break;
final int progress = getProgress();
if (progress != INVALID_TASK_PROGRESS) {
status.setProgress(progress);
}
return status;
}

private int getProgress() {
if (status == null) {
return INVALID_TASK_PROGRESS;
}

if (status.getState().isSuccessState()) {
return 100;
} else if (status.getState().isFinishState()) {
return INVALID_TASK_PROGRESS;
} else {
if (runCtx == null) {
return INVALID_TASK_PROGRESS;
}
final StmtExecutor executor = runCtx.getExecutor();
if (executor == null || executor.getCoordinator() == null) {
return INVALID_TASK_PROGRESS;
}
final long jobId = executor.getCoordinator().getLoadJobId();
if (jobId == -1) {
return INVALID_TASK_PROGRESS;
}
final InsertLoadJob job = (InsertLoadJob) GlobalStateMgr.getCurrentState()
.getLoadMgr().getLoadJob(jobId);
if (job == null) {
return INVALID_TASK_PROGRESS;
}
int progress = job.getProgress();
// if the progress is 100, we should return 99 to avoid the task run is marked as success
if (progress == 100) {
progress = 99;
}
return progress;
}
}

public TaskRunStatus initStatus(String queryId, Long createTime) {
TaskRunStatus status = new TaskRunStatus();
long created = createTime == null ? System.currentTimeMillis() : createTime;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.starrocks.common.FeConstants;
import com.starrocks.persist.gson.GsonUtils;
import com.starrocks.qe.ConnectContext;
import com.starrocks.scheduler.persist.TaskRunStatus;
import com.starrocks.server.GlobalStateMgr;
import com.starrocks.server.WarehouseManager;
import com.starrocks.utframe.UtFrameUtils;
Expand Down Expand Up @@ -358,4 +359,32 @@ public void testGetAllRunnableTaskCount() {
Assert.assertEquals(Long.valueOf(3), result.get(1L));
Assert.assertEquals(Long.valueOf(3), result.get(2L));
}

@Test
public void testTaskRunProgress() {
ConnectContext context1 = new ConnectContext();
context1.setGlobalStateMgr(connectContext.getGlobalStateMgr());
Task task = new Task("test");
TaskRun run = makeTaskRun(1, task, makeExecuteOption(true, false), System.currentTimeMillis());
Assert.assertTrue(run.getStatus() != null);
Assert.assertEquals(0, run.getStatus().getProgress());

TaskRunStatus status = run.getStatus();
status.setState(Constants.TaskRunState.SUCCESS);
Assert.assertEquals(100, run.getStatus().getProgress());
status.setState(Constants.TaskRunState.MERGED);
Assert.assertEquals(100, run.getStatus().getProgress());

// keep original progress
status.setState(Constants.TaskRunState.FAILED);
Assert.assertEquals(100, run.getStatus().getProgress());

status.setState(Constants.TaskRunState.RUNNING);
status.setProgress(10);
// should not throw exception
Assert.assertEquals(10, run.getStatus().getProgress());

status.setProgress(100);
Assert.assertEquals(100, run.getStatus().getProgress());
}
}

0 comments on commit 20db774

Please sign in to comment.