Skip to content

Commit

Permalink
[Improvement] Quickly delete local or HDFS data at the shuffleId level.
Browse files Browse the repository at this point in the history
  • Loading branch information
yl09099 committed Nov 4, 2024
1 parent 85b1f01 commit 93be639
Show file tree
Hide file tree
Showing 16 changed files with 496 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,9 @@ public void registerShuffle(
taskInfo.refreshLatestStageAttemptNumber(shuffleId, stageAttemptNumber);
try {
long start = System.currentTimeMillis();
shuffleServer.getShuffleTaskManager().removeShuffleDataSync(appId, shuffleId);
shuffleServer
.getShuffleTaskManager()
.removeShuffleDataSyncTwoPhases(appId, shuffleId);
LOG.info(
"Deleted the previous stage attempt data due to stage recomputing for app: {}, "
+ "shuffleId: {}. It costs {} ms",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,11 @@ public class ShuffleServerMetrics {
public static final String REPORTED_BLOCK_COUNT = "reported_block_count";
public static final String CACHED_BLOCK_COUNT = "cached_block_count";

private static final String TOTAL_HADOOP_TWO_PHASES_DELETION_FAILED =
"total_hadoop_two_phases_deletion_failed";
private static final String TOTAL_LOCAL_TWO_PHASES_DELETION_FAILED =
"total_local_two_phases_deletion_failed";

public static Counter.Child counterTotalAppNum;
public static Counter.Child counterTotalAppWithHugePartitionNum;
public static Counter.Child counterTotalPartitionNum;
Expand Down Expand Up @@ -245,6 +250,8 @@ public class ShuffleServerMetrics {
public static Gauge.Child gaugeReadLocalDataFileBufferSize;
public static Gauge.Child gaugeReadLocalIndexFileBufferSize;
public static Gauge.Child gaugeReadMemoryDataBufferSize;
public static Counter.Child counterLocalTwoPhasesDeletionFaileTd;
public static Counter.Child counterHadoopTwoPhasesDeletionFailed;

public static Gauge gaugeTotalDataSizeUsage;
public static Gauge gaugeInMemoryDataSizeUsage;
Expand Down Expand Up @@ -440,6 +447,10 @@ private static void setUpMetrics(ShuffleServerConf serverConf) {
counterTotalHugePartitionNum = metricsManager.addLabeledCounter(TOTAL_HUGE_PARTITION_NUM);
counterTotalHugePartitionExceedHardLimitNum =
metricsManager.addLabeledCounter(TOTAL_HUGE_PARTITION_EXCEED_HARD_LIMIT_NUM);
counterLocalTwoPhasesDeletionFaileTd =
metricsManager.addLabeledCounter(TOTAL_LOCAL_TWO_PHASES_DELETION_FAILED);
counterHadoopTwoPhasesDeletionFailed =
metricsManager.addLabeledCounter(TOTAL_HADOOP_TWO_PHASES_DELETION_FAILED);

gaugeLocalStorageIsWritable =
metricsManager.addGauge(LOCAL_STORAGE_IS_WRITABLE, LOCAL_DISK_PATH_LABEL);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -814,6 +814,11 @@ public boolean isAppExpired(String appId) {
* @param shuffleIds
*/
public void removeResourcesByShuffleIds(String appId, List<Integer> shuffleIds) {
removeResourcesByShuffleIds(appId, shuffleIds, false);
}

public void removeResourcesByShuffleIds(
String appId, List<Integer> shuffleIds, boolean isTwoPhases) {
Lock writeLock = getAppWriteLock(appId);
writeLock.lock();
try {
Expand Down Expand Up @@ -846,7 +851,7 @@ public void removeResourcesByShuffleIds(String appId, List<Integer> shuffleIds)
withTimeoutExecution(
() -> {
storageManager.removeResources(
new ShufflePurgeEvent(appId, getUserByAppId(appId), shuffleIds));
new ShufflePurgeEvent(appId, getUserByAppId(appId), shuffleIds, isTwoPhases));
return null;
},
storageRemoveOperationTimeoutSec,
Expand Down Expand Up @@ -1054,6 +1059,10 @@ public void removeShuffleDataSync(String appId, int shuffleId) {
removeResourcesByShuffleIds(appId, Arrays.asList(shuffleId));
}

public void removeShuffleDataSyncTwoPhases(String appId, int shuffleId) {
removeResourcesByShuffleIds(appId, Arrays.asList(shuffleId), true);
}

public ShuffleDataDistributionType getDataDistributionType(String appId) {
return shuffleTaskInfos.get(appId).getDataDistType();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,19 @@ public abstract class PurgeEvent {
private String appId;
private String user;
private List<Integer> shuffleIds;
// Quick Delete or not.
private boolean isTwoPhasesDeletion;

public PurgeEvent(String appId, String user, List<Integer> shuffleIds) {
this(appId, user, shuffleIds, false);
}

public PurgeEvent(
String appId, String user, List<Integer> shuffleIds, boolean isTwoPhasesDeletion) {
this.appId = appId;
this.user = user;
this.shuffleIds = shuffleIds;
this.isTwoPhasesDeletion = isTwoPhasesDeletion;
}

public String getAppId() {
Expand All @@ -44,6 +52,10 @@ public List<Integer> getShuffleIds() {
return shuffleIds;
}

public boolean isTwoPhasesDeletion() {
return isTwoPhasesDeletion;
}

@Override
public String toString() {
return this.getClass().getSimpleName()
Expand All @@ -56,6 +68,8 @@ public String toString() {
+ '\''
+ ", shuffleIds="
+ shuffleIds
+ ", isTwoPhasesDeletion="
+ isTwoPhasesDeletion
+ '}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@
public class ShufflePurgeEvent extends PurgeEvent {

public ShufflePurgeEvent(String appId, String user, List<Integer> shuffleIds) {
super(appId, user, shuffleIds);
this(appId, user, shuffleIds, false);
}

public ShufflePurgeEvent(
String appId, String user, List<Integer> shuffleIds, boolean isTwoPhases) {
super(appId, user, shuffleIds, isTwoPhases);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,8 @@ public void removeResources(PurgeEvent event) {
new CreateShuffleDeleteHandlerRequest(
StorageType.HDFS.name(),
storage.getConf(),
purgeForExpired ? shuffleServerId : null));
purgeForExpired ? shuffleServerId : null,
event.isTwoPhasesDeletion()));

String basicPath =
ShuffleStorageUtils.getFullShuffleDataFolder(storage.getStoragePath(), appId);
Expand Down Expand Up @@ -149,7 +150,11 @@ public void removeResources(PurgeEvent event) {
storage.getStoragePath()));
}
}
deleteHandler.delete(deletePaths.toArray(new String[0]), appId, event.getUser());
boolean isSuccess =
deleteHandler.delete(deletePaths.toArray(new String[0]), appId, event.getUser());
if (!isSuccess && event.isTwoPhasesDeletion()) {
ShuffleServerMetrics.counterLocalTwoPhasesDeletionFaileTd.inc();
}
removeAppStorageInfo(event);
} else {
LOG.warn("Storage gotten is null when removing resources for event: {}", event);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,9 @@ public void removeResources(PurgeEvent event) {
ShuffleHandlerFactory.getInstance()
.createShuffleDeleteHandler(
new CreateShuffleDeleteHandlerRequest(
StorageType.LOCALFILE.name(), new Configuration()));
StorageType.LOCALFILE.name(),
new Configuration(),
event.isTwoPhasesDeletion()));

List<String> deletePaths =
localStorages.stream()
Expand Down Expand Up @@ -352,7 +354,11 @@ public void removeResources(PurgeEvent event) {
})
.collect(Collectors.toList());

deleteHandler.delete(deletePaths.toArray(new String[deletePaths.size()]), appId, user);
boolean isSuccess =
deleteHandler.delete(deletePaths.toArray(new String[deletePaths.size()]), appId, user);
if (!isSuccess && event.isTwoPhasesDeletion()) {
ShuffleServerMetrics.counterHadoopTwoPhasesDeletionFailed.inc();
}
removeAppStorageInfo(event);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,12 @@
import org.apache.uniffle.common.util.RssUtils;
import org.apache.uniffle.storage.handler.api.ClientReadHandler;
import org.apache.uniffle.storage.handler.api.ShuffleDeleteHandler;
import org.apache.uniffle.storage.handler.impl.AsynDeletionEventManager;
import org.apache.uniffle.storage.handler.impl.ComposedClientReadHandler;
import org.apache.uniffle.storage.handler.impl.HadoopClientReadHandler;
import org.apache.uniffle.storage.handler.impl.HadoopShuffleAsyncDeleteHandler;
import org.apache.uniffle.storage.handler.impl.HadoopShuffleDeleteHandler;
import org.apache.uniffle.storage.handler.impl.LocalFileAsyncDeleteHandler;
import org.apache.uniffle.storage.handler.impl.LocalFileClientReadHandler;
import org.apache.uniffle.storage.handler.impl.LocalFileDeleteHandler;
import org.apache.uniffle.storage.handler.impl.MemoryClientReadHandler;
Expand Down Expand Up @@ -184,9 +187,15 @@ private ClientReadHandler getHadoopClientReadHandler(

public ShuffleDeleteHandler createShuffleDeleteHandler(
CreateShuffleDeleteHandlerRequest request) {
if (StorageType.HDFS.name().equals(request.getStorageType())) {
if (StorageType.HDFS.name().equals(request.getStorageType()) && request.isAsync()) {
return new HadoopShuffleAsyncDeleteHandler(
request.getConf(), request.getShuffleServerId(), AsynDeletionEventManager.getInstance());
} else if (StorageType.HDFS.name().equals(request.getStorageType()) && !request.isAsync()) {
return new HadoopShuffleDeleteHandler(request.getConf(), request.getShuffleServerId());
} else if (StorageType.LOCALFILE.name().equals(request.getStorageType())) {
} else if (StorageType.LOCALFILE.name().equals(request.getStorageType()) && request.isAsync()) {
return new LocalFileAsyncDeleteHandler(AsynDeletionEventManager.getInstance());
} else if (StorageType.LOCALFILE.name().equals(request.getStorageType())
&& !request.isAsync()) {
return new LocalFileDeleteHandler();
} else {
throw new UnsupportedOperationException(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.uniffle.storage.handler;

import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;

public class AsynDeletionEvent {
private static final String TEMPORARYSUFFIX = "_tmp";
private String appId;
private String user;
private String shuffleServerId;
private Configuration conf;
/** Records the mapping between the path to be deleted and the path to be renamed. */
private Map<String, String> needDeletePathAndRenamePath;

private String storageType;

public AsynDeletionEvent(
String appId,
String user,
Configuration conf,
String shuffleServerId,
List<String> needDeletePath,
String storageType) {
this.appId = appId;
this.user = user;
this.shuffleServerId = shuffleServerId;
this.conf = conf;
this.needDeletePathAndRenamePath =
needDeletePath.stream()
.collect(
Collectors.toMap(Function.identity(), s -> StringUtils.join(s, TEMPORARYSUFFIX)));
this.storageType = storageType;
}

public String getAppId() {
return appId;
}

public String getUser() {
return user;
}

public Configuration getConf() {
return conf;
}

public Map<String, String> getNeedDeletePathAndRenamePath() {
return needDeletePathAndRenamePath;
}

public String[] getNeedDeleteRenamePaths() {
return needDeletePathAndRenamePath.values().stream().toArray(String[]::new);
}

public String getShuffleServerId() {
return shuffleServerId;
}

public String getStorageType() {
return storageType;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,5 +24,5 @@ public interface ShuffleDeleteHandler {
*
* @param appId ApplicationId for delete
*/
void delete(String[] storageBasePaths, String appId, String user);
boolean delete(String[] storageBasePaths, String appId, String user);
}
Loading

0 comments on commit 93be639

Please sign in to comment.