Skip to content

Commit

Permalink
[Improvement] Quickly delete local or HDFS data at the shuffleId level.
Browse files Browse the repository at this point in the history
  • Loading branch information
yl09099 committed Sep 12, 2024
1 parent 3593513 commit bb11961
Show file tree
Hide file tree
Showing 4 changed files with 15 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -779,6 +779,11 @@ public boolean isAppExpired(String appId) {
* @param shuffleIds
*/
public void removeResourcesByShuffleIds(String appId, List<Integer> shuffleIds) {
removeResourcesByShuffleIds(appId, shuffleIds, false);
}

public void removeResourcesByShuffleIds(
String appId, List<Integer> shuffleIds, boolean isTwoPhases) {
Lock writeLock = getAppWriteLock(appId);
writeLock.lock();
try {
Expand Down Expand Up @@ -811,7 +816,7 @@ public void removeResourcesByShuffleIds(String appId, List<Integer> shuffleIds)
withTimeoutExecution(
() -> {
storageManager.removeResources(
new ShufflePurgeEvent(appId, getUserByAppId(appId), shuffleIds));
new ShufflePurgeEvent(appId, getUserByAppId(appId), shuffleIds, isTwoPhases));
return null;
},
storageRemoveOperationTimeoutSec,
Expand Down Expand Up @@ -995,7 +1000,7 @@ public void removeShuffleDataAsync(String appId) {

@VisibleForTesting
public void removeShuffleDataSync(String appId, int shuffleId) {
removeResourcesByShuffleIds(appId, Arrays.asList(shuffleId));
removeResourcesByShuffleIds(appId, Arrays.asList(shuffleId),true);
}

public ShuffleDataDistributionType getDataDistributionType(String appId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@
public class ShufflePurgeEvent extends PurgeEvent {

public ShufflePurgeEvent(String appId, String user, List<Integer> shuffleIds) {
super(appId, user, shuffleIds);
this(appId, user, shuffleIds, false);
}

public ShufflePurgeEvent(
String appId, String user, List<Integer> shuffleIds, boolean isTwoPhases) {
super(appId, user, shuffleIds, isTwoPhases);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ void deleteShuffleData(List<String> deletePaths, Storage storage, PurgeEvent eve
if (!isSucess) {
ShuffleServerMetrics.counterHadoopTwoPhasesDeletionFailed.inc();
LOG.warn(
"Remove the case where the clearNeedDeleteHadoopPathThread queue is full and cannot accept elements.");
"Remove the case where the twoPhasesDeletionEventQueue queue is full and cannot accept elements.");
}
} else {
deleteHandler.delete(deletePaths.toArray(new String[0]), appId, event.getUser());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ void deleteShuffleData(List<String> deletePaths, Storage storage, PurgeEvent eve
boolean isSucess = twoPhasesDeletionEventQueue.offer(asynchronousDeleteEvent);
if (!isSucess) {
LOG.warn(
"Remove the case where the clearNeedDeleteHadoopPathThread queue is full and cannot accept elements.");
"Remove the case where the twoPhasesDeletionEventQueue queue is full and cannot accept elements.");
ShuffleServerMetrics.counterLocalTwoPhasesDeletionFaileTd.inc();
}
} else {
Expand Down

0 comments on commit bb11961

Please sign in to comment.