diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java index b482584236..4083791803 100644 --- a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java +++ b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java @@ -779,6 +779,11 @@ public boolean isAppExpired(String appId) { * @param shuffleIds */ public void removeResourcesByShuffleIds(String appId, List shuffleIds) { + removeResourcesByShuffleIds(appId, shuffleIds, false); + } + + public void removeResourcesByShuffleIds( + String appId, List shuffleIds, boolean isTwoPhases) { Lock writeLock = getAppWriteLock(appId); writeLock.lock(); try { @@ -811,7 +816,7 @@ public void removeResourcesByShuffleIds(String appId, List shuffleIds) withTimeoutExecution( () -> { storageManager.removeResources( - new ShufflePurgeEvent(appId, getUserByAppId(appId), shuffleIds)); + new ShufflePurgeEvent(appId, getUserByAppId(appId), shuffleIds, isTwoPhases)); return null; }, storageRemoveOperationTimeoutSec, @@ -995,7 +1000,7 @@ public void removeShuffleDataAsync(String appId) { @VisibleForTesting public void removeShuffleDataSync(String appId, int shuffleId) { - removeResourcesByShuffleIds(appId, Arrays.asList(shuffleId)); + removeResourcesByShuffleIds(appId, Arrays.asList(shuffleId),true); } public ShuffleDataDistributionType getDataDistributionType(String appId) { diff --git a/server/src/main/java/org/apache/uniffle/server/event/ShufflePurgeEvent.java b/server/src/main/java/org/apache/uniffle/server/event/ShufflePurgeEvent.java index cbc39aab84..32750990b0 100644 --- a/server/src/main/java/org/apache/uniffle/server/event/ShufflePurgeEvent.java +++ b/server/src/main/java/org/apache/uniffle/server/event/ShufflePurgeEvent.java @@ -22,6 +22,11 @@ public class ShufflePurgeEvent extends PurgeEvent { public ShufflePurgeEvent(String appId, String user, List shuffleIds) { - super(appId, user, shuffleIds); + this(appId, user, shuffleIds, false); + } + + public ShufflePurgeEvent( + String appId, String user, List shuffleIds, boolean isTwoPhases) { + super(appId, user, shuffleIds, isTwoPhases); } } diff --git a/server/src/main/java/org/apache/uniffle/server/storage/HadoopDeletionStrategy.java b/server/src/main/java/org/apache/uniffle/server/storage/HadoopDeletionStrategy.java index 9c8daa58d3..270214dc26 100644 --- a/server/src/main/java/org/apache/uniffle/server/storage/HadoopDeletionStrategy.java +++ b/server/src/main/java/org/apache/uniffle/server/storage/HadoopDeletionStrategy.java @@ -102,7 +102,7 @@ void deleteShuffleData(List deletePaths, Storage storage, PurgeEvent eve if (!isSucess) { ShuffleServerMetrics.counterHadoopTwoPhasesDeletionFailed.inc(); LOG.warn( - "Remove the case where the clearNeedDeleteHadoopPathThread queue is full and cannot accept elements."); + "Remove the case where the twoPhasesDeletionEventQueue queue is full and cannot accept elements."); } } else { deleteHandler.delete(deletePaths.toArray(new String[0]), appId, event.getUser()); diff --git a/server/src/main/java/org/apache/uniffle/server/storage/LocalDeletionStrategy.java b/server/src/main/java/org/apache/uniffle/server/storage/LocalDeletionStrategy.java index 5ba61b5567..9dcc894a1d 100644 --- a/server/src/main/java/org/apache/uniffle/server/storage/LocalDeletionStrategy.java +++ b/server/src/main/java/org/apache/uniffle/server/storage/LocalDeletionStrategy.java @@ -85,7 +85,7 @@ void deleteShuffleData(List deletePaths, Storage storage, PurgeEvent eve boolean isSucess = twoPhasesDeletionEventQueue.offer(asynchronousDeleteEvent); if (!isSucess) { LOG.warn( - "Remove the case where the clearNeedDeleteHadoopPathThread queue is full and cannot accept elements."); + "Remove the case where the twoPhasesDeletionEventQueue queue is full and cannot accept elements."); ShuffleServerMetrics.counterLocalTwoPhasesDeletionFaileTd.inc(); } } else {