[Improvement] Quickly delete local or HDFS data at the shuffleId level.
yl09099 committed Aug 23, 2024
1 parent 57f0f8b commit ecf9e44
Showing 9 changed files with 254 additions and 7 deletions.
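
In outline: instead of deleting a stage attempt's shuffle data in place, the server now renames the shuffle directories to a "<path>_tmp" location synchronously, and a single-thread daemon scheduler inside each storage manager removes the renamed paths in the background. A minimal, self-contained sketch of that rename-then-async-delete pattern follows; the class name QuickDeleter, the queue, and the timings are illustrative assumptions, not part of this commit:

import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Comparator;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Illustrative sketch only: the class name, queue, and timings are assumptions,
// not part of this commit. The real change lives in the storage managers and
// delete handlers shown in the diff below.
public class QuickDeleter {
  private final Queue<Path> pendingTmpPaths = new ConcurrentLinkedQueue<>();
  private final ScheduledExecutorService scheduler =
      Executors.newSingleThreadScheduledExecutor(
          runnable -> {
            Thread thread = new Thread(runnable, "quick-delete");
            thread.setDaemon(true);
            return thread;
          });

  public QuickDeleter() {
    // Background pass that actually removes the renamed "_tmp" directories.
    scheduler.scheduleAtFixedRate(this::drain, 500, 1000, TimeUnit.MILLISECONDS);
  }

  // Fast path: rename the directory so readers and writers no longer see it,
  // then queue the renamed path for asynchronous removal.
  public void quickDelete(String shuffleDir) throws Exception {
    Path source = Paths.get(shuffleDir);
    Path renamed = Paths.get(shuffleDir + "_tmp");
    Files.move(source, renamed);
    pendingTmpPaths.add(renamed);
  }

  private void drain() {
    Path renamed;
    while ((renamed = pendingTmpPaths.poll()) != null) {
      try {
        // Recursive delete of the renamed directory; failures are only logged here.
        Files.walk(renamed).sorted(Comparator.reverseOrder()).forEach(p -> p.toFile().delete());
      } catch (Exception e) {
        System.err.println("Failed to delete " + renamed + ": " + e);
      }
    }
  }
}

The rename is cheap and immediately frees the original path for the recomputed stage attempt, while the expensive recursive delete happens off the critical path.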
@@ -226,7 +226,7 @@ public void registerShuffle(
taskInfo.refreshLatestStageAttemptNumber(shuffleId, stageAttemptNumber);
try {
long start = System.currentTimeMillis();
shuffleServer.getShuffleTaskManager().removeShuffleDataSync(appId, shuffleId);
shuffleServer.getShuffleTaskManager().quickRemoveShuffleDataSync(appId, shuffleId);
LOG.info(
"Deleted the previous stage attempt data due to stage recomputing for app: {}, "
+ "shuffleId: {}. It costs {} ms",
@@ -779,6 +779,16 @@ public boolean isAppExpired(String appId) {
* @param shuffleIds
*/
public void removeResourcesByShuffleIds(String appId, List<Integer> shuffleIds) {
removeResourcesByShuffleIds(appId, shuffleIds, false);
}

/**
 * Clear up the resources of the given shuffleIds for the app.
 *
 * @param appId the application id
 * @param shuffleIds the shuffle ids whose resources should be removed
 * @param isQuick whether to remove the shuffle data in quick mode (rename first, then delete the
 *     renamed paths asynchronously)
 */
public void removeResourcesByShuffleIds(String appId, List<Integer> shuffleIds, boolean isQuick) {
Lock writeLock = getAppWriteLock(appId);
writeLock.lock();
try {
@@ -811,7 +821,7 @@ public void removeResourcesByShuffleIds(String appId, List<Integer> shuffleIds)
withTimeoutExecution(
() -> {
storageManager.removeResources(
new ShufflePurgeEvent(appId, getUserByAppId(appId), shuffleIds));
new ShufflePurgeEvent(appId, getUserByAppId(appId), shuffleIds), isQuick);
return null;
},
storageRemoveOperationTimeoutSec,
@@ -998,6 +1008,16 @@ public void removeShuffleDataSync(String appId, int shuffleId) {
removeResourcesByShuffleIds(appId, Arrays.asList(shuffleId));
}

/**
 * Delete all data under the given shuffleId using the quick delete mode: the paths are renamed
 * synchronously and the renamed paths are deleted asynchronously afterwards.
 *
 * @param appId the application id
 * @param shuffleId the shuffle id whose data should be removed
 */
public void quickRemoveShuffleDataSync(String appId, int shuffleId) {
removeResourcesByShuffleIds(appId, Arrays.asList(shuffleId), true);
}

public ShuffleDataDistributionType getDataDistributionType(String appId) {
return shuffleTaskInfos.get(appId).getDataDistType();
}
@@ -21,10 +21,14 @@
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -38,6 +42,7 @@
import org.apache.uniffle.common.storage.StorageInfo;
import org.apache.uniffle.common.util.Constants;
import org.apache.uniffle.common.util.JavaUtils;
import org.apache.uniffle.common.util.ThreadUtils;
import org.apache.uniffle.server.Checker;
import org.apache.uniffle.server.ShuffleDataFlushEvent;
import org.apache.uniffle.server.ShuffleDataReadEvent;
@@ -64,12 +69,19 @@ public class HadoopStorageManager extends SingleStorageManager {
private Map<String, HadoopStorage> appIdToStorages = JavaUtils.newConcurrentMap();
private Map<String, HadoopStorage> pathToStorages = JavaUtils.newConcurrentMap();
private final boolean isStorageAuditLogEnabled;
private final Map<String, Map<String, List<String>>> quickDeletePaths;
private ScheduledExecutorService deletePathExecutorService;

HadoopStorageManager(ShuffleServerConf conf) {
super(conf);
hadoopConf = conf.getHadoopConf();
shuffleServerId = conf.getString(ShuffleServerConf.SHUFFLE_SERVER_ID, "shuffleServerId");
isStorageAuditLogEnabled = conf.getBoolean(ShuffleServerConf.SERVER_STORAGE_AUDIT_LOG_ENABLED);
this.quickDeletePaths = JavaUtils.newConcurrentMap();
deletePathExecutorService =
ThreadUtils.getDaemonSingleThreadScheduledExecutor("deleteHadoopPathExecutor");
deletePathExecutorService.scheduleAtFixedRate(
this::clearQuickDeletePath, 1000 / 2, 1000, TimeUnit.MILLISECONDS);
}

@Override
@@ -98,6 +110,11 @@ public Storage selectStorage(ShuffleDataReadEvent event) {

@Override
public void removeResources(PurgeEvent event) {
removeResources(event, false);
}

@Override
public void removeResources(PurgeEvent event, boolean isQuick) {
String appId = event.getAppId();
HadoopStorage storage = getStorageByAppId(appId);
if (storage != null) {
@@ -148,7 +165,20 @@ public void removeResources(PurgeEvent event) {
storage.getStoragePath()));
}
}
deleteHandler.delete(deletePaths.toArray(new String[0]), appId, event.getUser());
if (isQuick) {
Map<String, Map<String, List<String>>> userAppIdNeedDeletePaths =
JavaUtils.newConcurrentMap();
Map<String, List<String>> appIdNeedDeletePaths =
userAppIdNeedDeletePaths.computeIfAbsent(
event.getUser(), k -> JavaUtils.newConcurrentMap());
List<String> needDeletePaths =
appIdNeedDeletePaths.computeIfAbsent(event.getAppId(), k -> Lists.newArrayList());
needDeletePaths.addAll(deletePaths);
deleteHandler.quickDelete(userAppIdNeedDeletePaths);
quickDeletePaths.putAll(userAppIdNeedDeletePaths);
} else {
deleteHandler.delete(deletePaths.toArray(new String[0]), appId, event.getUser());
}
removeAppStorageInfo(event);
} else {
LOG.warn("Storage gotten is null when removing resources for event: {}", event);
@@ -189,6 +219,33 @@ public Map<String, StorageInfo> getStorageInfo() {
return Maps.newHashMap();
}

private void clearQuickDeletePath() {
synchronized (quickDeletePaths) {
// delete shuffle data for application
ShuffleDeleteHandler deleteHandler =
ShuffleHandlerFactory.getInstance()
.createShuffleDeleteHandler(
new CreateShuffleDeleteHandlerRequest(
StorageType.HDFS.name(), hadoopConf));
if (!quickDeletePaths.isEmpty()) {
for (Map.Entry<String, Map<String, List<String>>> userAppIdNeedDeletePaths :
quickDeletePaths.entrySet()) {
String user = userAppIdNeedDeletePaths.getKey();
for (Map.Entry<String, List<String>> appIdNeedDeletePaths :
userAppIdNeedDeletePaths.getValue().entrySet()) {
String appId = appIdNeedDeletePaths.getKey();
List<String> needDeletePaths = appIdNeedDeletePaths.getValue();
List<String> needDeleteTmpPaths =
needDeletePaths.stream()
.map(path -> StringUtils.join(path, "_tmp"))
.collect(Collectors.toList());
deleteHandler.delete(needDeleteTmpPaths.toArray(new String[0]), appId, user);
}
}
}
}
}

public HadoopStorage getStorageByAppId(String appId) {
if (!appIdToStorages.containsKey(appId)) {
synchronized (this) {
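
Both storage managers stage the paths to purge in a nested user → appId → paths map before handing it to quickDelete(...) and recording it for the scheduled cleanup pass. A stand-alone illustration of building that structure (the user, appId, and paths below are made-up values):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Stand-alone illustration of the user -> appId -> paths structure built above;
// the user, appId, and paths are hypothetical values, not taken from the commit.
public class QuickDeletePathsExample {
  public static void main(String[] args) {
    Map<String, Map<String, List<String>>> userAppIdNeedDeletePaths = new ConcurrentHashMap<>();

    String user = "alice";
    String appId = "application_1700000000000_0001";
    List<String> deletePaths =
        Arrays.asList("/data1/rss/" + appId + "/1", "/data2/rss/" + appId + "/1");

    userAppIdNeedDeletePaths
        .computeIfAbsent(user, k -> new ConcurrentHashMap<>())
        .computeIfAbsent(appId, k -> new ArrayList<>())
        .addAll(deletePaths);

    // The delete handler then renames each path to "<path>_tmp", and the scheduled
    // clearQuickDeletePath pass removes the renamed directories.
    System.out.println(userAppIdNeedDeletePaths);
  }
}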
@@ -157,9 +157,13 @@ public Map<String, StorageInfo> getStorageInfo() {
}

public void removeResources(PurgeEvent event) {
removeResources(event, false);
}

public void removeResources(PurgeEvent event, boolean isQuick) {
LOG.info("Start to remove resource of {}", event);
warmStorageManager.removeResources(event);
coldStorageManager.removeResources(event);
warmStorageManager.removeResources(event, isQuick);
coldStorageManager.removeResources(event, isQuick);
}

public StorageManager getColdStorageManager() {
@@ -32,6 +32,8 @@
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -53,6 +55,7 @@
import org.apache.uniffle.common.storage.StorageInfo;
import org.apache.uniffle.common.storage.StorageMedia;
import org.apache.uniffle.common.storage.StorageStatus;
import org.apache.uniffle.common.util.JavaUtils;
import org.apache.uniffle.common.util.RssUtils;
import org.apache.uniffle.common.util.ThreadUtils;
import org.apache.uniffle.server.Checker;
@@ -90,6 +93,8 @@ public class LocalStorageManager extends SingleStorageManager {
private final List<StorageMediaProvider> typeProviders = Lists.newArrayList();

private final boolean isStorageAuditLogEnabled;
private final Map<String, Map<String, List<String>>> quickDeletePaths;
private ScheduledExecutorService deletePathExecutorService;

@VisibleForTesting
LocalStorageManager(ShuffleServerConf conf) {
@@ -175,6 +180,11 @@ public class LocalStorageManager extends SingleStorageManager {
localStorages.stream().map(LocalStorage::getBasePath).collect(Collectors.toList())));
this.checker = new LocalStorageChecker(conf, localStorages);
isStorageAuditLogEnabled = conf.getBoolean(ShuffleServerConf.SERVER_STORAGE_AUDIT_LOG_ENABLED);
this.quickDeletePaths = JavaUtils.newConcurrentMap();
deletePathExecutorService =
ThreadUtils.getDaemonSingleThreadScheduledExecutor("deleteLocalPathExecutor");
deletePathExecutorService.scheduleAtFixedRate(
this::clearQuickDeletePath, 1000 / 2, 1000, TimeUnit.MILLISECONDS);
}

private StorageMedia getStorageTypeForBasePath(String basePath) {
@@ -266,6 +276,11 @@ public Checker getStorageChecker() {

@Override
public void removeResources(PurgeEvent event) {
removeResources(event, false);
}

@Override
public void removeResources(PurgeEvent event, boolean isQuick) {
String appId = event.getAppId();
String user = event.getUser();
List<Integer> shuffleSet =
@@ -327,11 +342,50 @@ public void removeResources(PurgeEvent event) {
}
})
.collect(Collectors.toList());

deleteHandler.delete(deletePaths.toArray(new String[deletePaths.size()]), appId, user);
if (isQuick) {
Map<String, Map<String, List<String>>> userAppIdNeedDeletePaths =
JavaUtils.newConcurrentMap();
Map<String, List<String>> appIdNeedDeletePaths =
userAppIdNeedDeletePaths.computeIfAbsent(
event.getUser(), k -> JavaUtils.newConcurrentMap());
List<String> needDeletePaths =
appIdNeedDeletePaths.computeIfAbsent(event.getAppId(), k -> Lists.newArrayList());
needDeletePaths.addAll(deletePaths);
deleteHandler.quickDelete(userAppIdNeedDeletePaths);
quickDeletePaths.putAll(userAppIdNeedDeletePaths);
} else {
deleteHandler.delete(deletePaths.toArray(new String[deletePaths.size()]), appId, user);
}
removeAppStorageInfo(event);
}

private void clearQuickDeletePath() {
synchronized (quickDeletePaths) {
// delete shuffle data for application
ShuffleDeleteHandler deleteHandler =
ShuffleHandlerFactory.getInstance()
.createShuffleDeleteHandler(
new CreateShuffleDeleteHandlerRequest(
StorageType.LOCALFILE.name(), new Configuration()));
if (!quickDeletePaths.isEmpty()) {
for (Map.Entry<String, Map<String, List<String>>> userAppIdNeedDeletePaths :
quickDeletePaths.entrySet()) {
String user = userAppIdNeedDeletePaths.getKey();
for (Map.Entry<String, List<String>> appIdNeedDeletePaths :
userAppIdNeedDeletePaths.getValue().entrySet()) {
String appId = appIdNeedDeletePaths.getKey();
List<String> needDeletePaths = appIdNeedDeletePaths.getValue();
List<String> needDeleteTmpPaths =
needDeletePaths.stream()
.map(path -> StringUtils.join(path, "_tmp"))
.collect(Collectors.toList());
deleteHandler.delete(needDeleteTmpPaths.toArray(new String[0]), appId, user);
}
}
}
}
}

private void cleanupStorageSelectionCache(PurgeEvent event) {
Function<String, Boolean> deleteConditionFunc = null;
String prefixKey = null;
@@ -43,6 +43,8 @@ public interface StorageManager {

void removeResources(PurgeEvent event);

void removeResources(PurgeEvent event, boolean isQuick);

void start();

void stop();
@@ -17,6 +17,9 @@

package org.apache.uniffle.storage.handler.api;

import java.util.List;
import java.util.Map;

public interface ShuffleDeleteHandler {

/**
@@ -25,4 +28,11 @@ public interface ShuffleDeleteHandler {
* @param appId ApplicationId for delete
*/
void delete(String[] storageBasePaths, String appId, String user);

/**
 * Rename the given paths so that the renamed paths can be deleted asynchronously afterwards.
 *
 * @param userAppIdNeedDeletePaths the paths to remove, grouped by user and then by appId
 */
void quickDelete(Map<String, Map<String, List<String>>> userAppIdNeedDeletePaths);
}
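
For reference, a minimal sketch of what an implementation of this contract can look like on a plain local file system: rename each path to "<path>_tmp" so a later background pass can remove the renamed directories. This sketch is illustrative only and is not the committed LocalFile or Hadoop handler:

import java.io.File;
import java.util.List;
import java.util.Map;

// Minimal sketch of the quickDelete contract for a plain local file system
// (not the committed handlers): rename every path to "<path>_tmp" so the
// renamed directories can be removed asynchronously by a later pass.
public class LocalQuickDeleteSketch {
  public void quickDelete(Map<String, Map<String, List<String>>> userAppIdNeedDeletePaths) {
    for (Map.Entry<String, Map<String, List<String>>> byUser : userAppIdNeedDeletePaths.entrySet()) {
      for (Map.Entry<String, List<String>> byApp : byUser.getValue().entrySet()) {
        for (String path : byApp.getValue()) {
          File source = new File(path);
          File renamed = new File(path + "_tmp");
          if (source.exists() && !source.renameTo(renamed)) {
            // The committed handlers retry a few times and log the failure instead.
            System.err.println("Failed to rename " + path + " for appId " + byApp.getKey());
          }
        }
      }
    }
  }
}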
@@ -19,7 +19,10 @@

import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.List;
import java.util.Map;

import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.FileStatus;
@@ -118,4 +121,63 @@ private void delete(FileSystem fileSystem, Path path, String filePrefix) throws
fileSystem.delete(path, true);
}
}

@Override
public void quickDelete(Map<String, Map<String, List<String>>> userAppIdNeedDeletePaths) {
for (Map.Entry<String, Map<String, List<String>>> userAppIdNeedDeletePath :
userAppIdNeedDeletePaths.entrySet()) {
String user = userAppIdNeedDeletePath.getKey();
for (Map.Entry<String, List<String>> appIdNeedDeletePaths :
userAppIdNeedDeletePath.getValue().entrySet()) {
String appId = appIdNeedDeletePaths.getKey();
List<String> needDeletePaths = appIdNeedDeletePaths.getValue();
for (String needDeletePath : needDeletePaths) {
final Path path = new Path(needDeletePath);
final Path breakdownPathFolder = new Path(StringUtils.join(needDeletePath, "_tmp"));
boolean isSuccess = false;
int times = 0;
int retryMax = 5;
long start = System.currentTimeMillis();
LOG.info(
"Try rename shuffle data in Hadoop FS for appId[{}] of user[{}] with {}",
appId,
user,
path);
while (!isSuccess && times < retryMax) {
try {
FileSystem fileSystem =
HadoopFilesystemProvider.getFilesystem(user, path, hadoopConf);
fileSystem.rename(path, breakdownPathFolder);
isSuccess = true;
} catch (Exception e) {
if (e instanceof FileNotFoundException) {
LOG.info("[{}] doesn't exist, ignore it.", path);
return;
}
times++;
LOG.warn("Can't rename shuffle data for appId[{}] with {} times", appId, times, e);
try {
Thread.sleep(1000);
} catch (Exception ex) {
LOG.warn("Exception happened when Thread.sleep", ex);
}
}
}
if (isSuccess) {
LOG.info(
"Rename shuffle data in Hadoop FS for appId[{}] with {} successfully in {} ms",
appId,
path,
(System.currentTimeMillis() - start));
} else {
LOG.warn(
"Failed to rename shuffle data in Hadoop FS for appId[{}] with {} in {} ms",
appId,
path,
(System.currentTimeMillis() - start));
}
}
}
}
}
}