Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Enhancement] Make lake publish operation async in alter job to avoid stuck other job #55857

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/alter/AlterJobV2.java
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,9 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

/*
* Version 2 of AlterJob, for replacing the old version of AlterJob.
Expand Down Expand Up @@ -120,6 +123,8 @@ public enum JobType {

protected Span span;

protected Future<Boolean> publishVersionFuture = null;

public AlterJobV2(long jobId, JobType jobType, long dbId, long tableId, String tableName, long timeoutMs) {
this.jobId = jobId;
this.type = jobType;
Expand Down Expand Up @@ -333,6 +338,31 @@ protected boolean checkTableStable(Database db) throws AlterCancelException {

public abstract void replay(AlterJobV2 replayedJob);

/**
 * Hook that performs the actual publish step for a job.
 *
 * <p>{@link #publishVersion()} wraps a call to this method in a task and submits it to the
 * executor returned by {@code GlobalStateMgr.getLakeAlterPublishExecutor()}, so it is not
 * invoked directly by the caller of {@code publishVersion()}.
 *
 * <p>This base implementation does no work and reports success.
 * NOTE(review): subclasses are presumably expected to override this with the real publish
 * logic - confirm against the concrete job classes.
 *
 * @return {@code true} if the publish succeeded; this base implementation always returns
 *         {@code true}
 */
protected boolean lakePublishVersion() {
return true;
}

/**
 * Drives the asynchronous publish of this job without blocking the calling thread.
 *
 * <p>The method is meant to be polled:
 * <ul>
 *   <li>If no attempt is in flight, {@link #lakePublishVersion()} is submitted to the lake
 *       alter publish executor and {@code false} is returned immediately.</li>
 *   <li>If the submitted attempt has not finished yet, {@code false} is returned.</li>
 *   <li>If the attempt finished successfully, {@code true} is returned (and keeps being
 *       returned on later calls, since the completed future is retained).</li>
 *   <li>If the attempt finished with {@code false} or with an exception, the attempt is
 *       discarded so that the next call submits a fresh one, and {@code false} is
 *       returned.</li>
 * </ul>
 *
 * @return {@code true} only once a submitted publish attempt has completed successfully;
 *         {@code false} while an attempt is pending or after a failed attempt
 */
protected boolean publishVersion() {
    if (publishVersionFuture == null) {
        Callable<Boolean> task = this::lakePublishVersion;
        publishVersionFuture = GlobalStateMgr.getCurrentState().getLakeAlterPublishExecutor().submit(task);
        return false;
    }

    if (!publishVersionFuture.isDone()) {
        // Still running on the executor; the caller polls again later.
        return false;
    }

    boolean published = false;
    try {
        published = Boolean.TRUE.equals(publishVersionFuture.get());
    } catch (InterruptedException e) {
        // Preserve the interrupt status for whoever owns this thread.
        Thread.currentThread().interrupt();
        LOG.error("Error retrieving task result", e);
    } catch (ExecutionException e) {
        LOG.error("Error retrieving task result", e);
    }

    if (!published) {
        // Without clearing the finished future, every later call would observe the same
        // failed result and the publish would never be attempted again.
        publishVersionFuture = null;
        LOG.info("publish version failed, will retry later. jobId={}", jobId);
    }
    return published;
}

/**
 * Reports this job's warehouse to the idle checker by calling
 * {@code WarehouseIdleChecker.updateJobLastFinishTime} with this job's {@code warehouseId}.
 */
private void finishHook() {
WarehouseIdleChecker.updateJobLastFinishTime(warehouseId);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -400,7 +400,6 @@ protected void runFinishedRewritingJob() throws AlterCancelException {
}

if (!publishVersion()) {
LOG.info("publish version failed, will retry later. jobId={}", jobId);
return;
}

Expand Down Expand Up @@ -654,7 +653,7 @@ boolean readyToPublishVersion() throws AlterCancelException {
return true;
}

private boolean publishVersion() {
protected boolean lakePublishVersion() {
try (ReadLockedDatabase db = getReadLockedDatabase(dbId)) {
LakeTable table = getTableOrThrow(db, tableId);
for (long partitionId : physicalPartitionIdToRollupIndex.keySet()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,6 @@ protected void runFinishedRewritingJob() throws AlterCancelException {
}

if (!publishVersion()) {
LOG.info("publish version failed, will retry later. jobId={}", jobId);
return;
}

Expand Down Expand Up @@ -255,7 +254,7 @@ boolean readyToPublishVersion() throws AlterCancelException {
return true;
}

boolean publishVersion() {
protected boolean lakePublishVersion() {
try {
TxnInfoPB txnInfo = new TxnInfoPB();
txnInfo.txnId = watershedTxnId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -711,7 +711,6 @@ protected void runFinishedRewritingJob() throws AlterCancelException {
}

if (!publishVersion()) {
LOG.info("publish version failed, will retry later. jobId={}", jobId);
return;
}

Expand Down Expand Up @@ -767,7 +766,7 @@ boolean readyToPublishVersion() throws AlterCancelException {
return true;
}

boolean publishVersion() {
protected boolean lakePublishVersion() {
try {
TxnInfoPB txnInfo = new TxnInfoPB();
txnInfo.txnId = watershedTxnId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,7 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
Expand Down Expand Up @@ -303,6 +304,7 @@ public class GlobalStateMgr {
* Alter Job Manager
*/
private final AlterJobMgr alterJobMgr;
private final ThreadPoolExecutor lakeAlterPublishExecutor;

private final PortConnectivityChecker portConnectivityChecker;

Expand Down Expand Up @@ -649,6 +651,8 @@ private GlobalStateMgr(boolean isCkptGlobalState, NodeMgr nodeMgr) {
new SchemaChangeHandler(),
new MaterializedViewHandler(),
new SystemHandler());
this.lakeAlterPublishExecutor = ThreadPoolManager.newDaemonCacheThreadPool(
Config.lake_publish_version_max_threads, "alter-publish", false);

this.load = new Load();
this.streamLoadMgr = new StreamLoadMgr();
Expand Down Expand Up @@ -2171,6 +2175,10 @@ public AlterJobMgr getAlterJobMgr() {
return alterJobMgr;
}

/**
 * Returns the thread pool held in {@code lakeAlterPublishExecutor}.
 *
 * <p>The pool is created in the constructor through
 * {@code ThreadPoolManager.newDaemonCacheThreadPool} with
 * {@code Config.lake_publish_version_max_threads} and the name {@code "alter-publish"}.
 *
 * @return the executor assigned to {@code lakeAlterPublishExecutor}
 */
public ThreadPoolExecutor getLakeAlterPublishExecutor() {
return lakeAlterPublishExecutor;
}

/**
 * Returns the schema change handler obtained from the alter job manager.
 *
 * @return the result of {@code alterJobMgr.getSchemaChangeHandler()}
 */
public SchemaChangeHandler getSchemaChangeHandler() {
return this.alterJobMgr.getSchemaChangeHandler();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,10 @@ public void sendAgentTask(AgentBatchTask batchTask) {
lakeRollupJob.runRunningJob();
Assert.assertEquals(AlterJobV2.JobState.FINISHED_REWRITING, lakeRollupJob.getJobState());

lakeRollupJob.runFinishedRewritingJob();
while (lakeRollupJob.getJobState() != AlterJobV2.JobState.FINISHED) {
lakeRollupJob.runFinishedRewritingJob();
Thread.sleep(100);
}
Assert.assertEquals(AlterJobV2.JobState.FINISHED, lakeRollupJob.getJobState());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,10 @@ public void testJobState() throws Exception {
Assert.assertNotEquals(-1L, job.getTransactionId().orElse(-1L).longValue());
job.runRunningJob();
Assert.assertEquals(AlterJobV2.JobState.FINISHED_REWRITING, job.getJobState());
job.runFinishedRewritingJob();
while (job.getJobState() != AlterJobV2.JobState.FINISHED) {
job.runFinishedRewritingJob();
Thread.sleep(100);
}
Assert.assertEquals(AlterJobV2.JobState.FINISHED, job.getJobState());

Assert.assertTrue(table.enablePersistentIndex());
Expand All @@ -125,7 +128,10 @@ public void testSetEnablePersistentWithoutType() throws Exception {
Assert.assertNotEquals(-1L, job.getTransactionId().orElse(-1L).longValue());
job.runRunningJob();
Assert.assertEquals(AlterJobV2.JobState.FINISHED_REWRITING, job.getJobState());
job.runFinishedRewritingJob();
while (job.getJobState() != AlterJobV2.JobState.FINISHED) {
job.runFinishedRewritingJob();
Thread.sleep(100);
}
Assert.assertEquals(AlterJobV2.JobState.FINISHED, job.getJobState());
Assert.assertTrue(table.enablePersistentIndex());
// check persistent index type been set
Expand All @@ -144,7 +150,10 @@ public void testSetEnablePersistentWithLocalindex() throws Exception {
Assert.assertNotEquals(-1L, job2.getTransactionId().orElse(-1L).longValue());
job2.runRunningJob();
Assert.assertEquals(AlterJobV2.JobState.FINISHED_REWRITING, job2.getJobState());
job2.runFinishedRewritingJob();
while (job2.getJobState() != AlterJobV2.JobState.FINISHED) {
job2.runFinishedRewritingJob();
Thread.sleep(100);
}
Assert.assertEquals(AlterJobV2.JobState.FINISHED, job2.getJobState());
Assert.assertTrue(table.enablePersistentIndex());
// check persistent index type been set
Expand All @@ -167,7 +176,10 @@ public void testSetDisblePersistentIndex() throws Exception {
Assert.assertNotEquals(-1L, job2.getTransactionId().orElse(-1L).longValue());
job2.runRunningJob();
Assert.assertEquals(AlterJobV2.JobState.FINISHED_REWRITING, job2.getJobState());
job2.runFinishedRewritingJob();
while (job2.getJobState() != AlterJobV2.JobState.FINISHED) {
job2.runFinishedRewritingJob();
Thread.sleep(100);
}
Assert.assertEquals(AlterJobV2.JobState.FINISHED, job2.getJobState());
Assert.assertFalse(table2.enablePersistentIndex());

Expand Down Expand Up @@ -258,8 +270,12 @@ public void testDropDb03() throws DdlException, MetaNotFoundException {
}

@Test
public void testReplay() {
public void testReplay() throws Exception {
job.run();
while (job.getJobState() != AlterJobV2.JobState.FINISHED) {
job.run();
Thread.sleep(100);
}
Assert.assertEquals(AlterJobV2.JobState.FINISHED, job.getJobState());

LakeTableAlterMetaJob replayAlterMetaJob = new LakeTableAlterMetaJob(job.jobId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,14 @@
import com.starrocks.catalog.OlapTable;
import com.starrocks.catalog.Partition;
import com.starrocks.catalog.Table;
import com.starrocks.catalog.Tablet;
import com.starrocks.catalog.TabletInvertedIndex;
import com.starrocks.common.AnalysisException;
import com.starrocks.common.Config;
import com.starrocks.common.util.concurrent.MarkedCountDownLatch;
import com.starrocks.lake.LakeTable;
import com.starrocks.lake.LakeTablet;
import com.starrocks.lake.Utils;
import com.starrocks.proto.TxnInfoPB;
import com.starrocks.qe.ConnectContext;
import com.starrocks.rpc.RpcException;
import com.starrocks.server.GlobalStateMgr;
import com.starrocks.server.RunMode;
import com.starrocks.server.WarehouseManager;
Expand All @@ -52,7 +49,6 @@
import java.util.Collection;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.validation.constraints.NotNull;

public class LakeTableSchemaChangeJobTest {
private static final int NUM_BUCKETS = 4;
Expand Down Expand Up @@ -426,17 +422,7 @@ public void sendAgentTask(AgentBatchTask batchTask) {
}

@Test
public void testPublishVersion() throws AlterCancelException {
new MockUp<Utils>() {
@Mock
public void publishVersion(@NotNull List<Tablet> tablets, TxnInfoPB txnInfo, long baseVersion,
long newVersion, long warehouseId)
throws
RpcException {
throw new RpcException("publish version failed", "127.0.0.1");
}
};

public void testPublishVersion() throws AlterCancelException, InterruptedException {
new MockUp<LakeTableSchemaChangeJob>() {
@Mock
public void sendAgentTask(AgentBatchTask batchTask) {
Expand Down Expand Up @@ -486,21 +472,18 @@ public void sendAgentTask(AgentBatchTask batchTask) {
// Add table back to database
db.registerTableUnlocked(table);

// We've mocked Utils.publishVersion to throw RpcException, so this runFinishedRewritingJob call will fail
// but should not throw any exception.
schemaChangeJob.runFinishedRewritingJob();
Assert.assertEquals(AlterJobV2.JobState.FINISHED_REWRITING, schemaChangeJob.getJobState());

// Make publish version success
new MockUp<Utils>() {
new MockUp<AlterJobV2>() {
@Mock
public void publishVersion(@NotNull List<Tablet> tablets, TxnInfoPB txnInfo, long baseVersion,
long newVersion, long warehouseId) {
// nothing to do
public boolean publishVersion() {
return true;
}
};

schemaChangeJob.runFinishedRewritingJob();
while (schemaChangeJob.getJobState() != AlterJobV2.JobState.FINISHED) {
schemaChangeJob.runFinishedRewritingJob();
Thread.sleep(100);
}
Assert.assertEquals(AlterJobV2.JobState.FINISHED, schemaChangeJob.getJobState());
Assert.assertTrue(schemaChangeJob.getFinishedTimeMs() > System.currentTimeMillis() - 10_000L);

Expand Down
Loading