Skip to content

Commit

Permalink
Fix to support partition split case for ShuffleManagerGrpcService
Browse files Browse the repository at this point in the history
  • Loading branch information
maobaolong committed Nov 6, 2024
1 parent 06c5f57 commit 7ff6446
Show file tree
Hide file tree
Showing 9 changed files with 154 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,14 +53,18 @@ public class MutableShuffleHandleInfo extends ShuffleHandleInfoBase {
private Map<Integer, Map<Integer, List<ShuffleServerInfo>>> partitionReplicaAssignedServers;

private Map<String, Set<ShuffleServerInfo>> excludedServerToReplacements;
/**
* partitionId -> excluded server -> replacement servers. The replacement servers for exclude
* server of specific partition.
*/
private Map<Integer, Map<String, Set<ShuffleServerInfo>>>
excludedServerForPartitionToReplacements;

public MutableShuffleHandleInfo(
int shuffleId,
Map<Integer, List<ShuffleServerInfo>> partitionToServers,
RemoteStorageInfo storageInfo) {
super(shuffleId, storageInfo);
this.excludedServerToReplacements = new HashMap<>();
this.partitionReplicaAssignedServers = toPartitionReplicaMapping(partitionToServers);
this(shuffleId, storageInfo, toPartitionReplicaMapping(partitionToServers));
}

@VisibleForTesting
Expand All @@ -70,14 +74,15 @@ protected MutableShuffleHandleInfo(
Map<Integer, Map<Integer, List<ShuffleServerInfo>>> partitionReplicaAssignedServers) {
super(shuffleId, storageInfo);
this.excludedServerToReplacements = new HashMap<>();
this.excludedServerForPartitionToReplacements = new HashMap<>();
this.partitionReplicaAssignedServers = partitionReplicaAssignedServers;
}

public MutableShuffleHandleInfo(int shuffleId, RemoteStorageInfo storageInfo) {
super(shuffleId, storageInfo);
}

private Map<Integer, Map<Integer, List<ShuffleServerInfo>>> toPartitionReplicaMapping(
private static Map<Integer, Map<Integer, List<ShuffleServerInfo>>> toPartitionReplicaMapping(
Map<Integer, List<ShuffleServerInfo>> partitionToServers) {
Map<Integer, Map<Integer, List<ShuffleServerInfo>>> partitionReplicaAssignedServers =
new HashMap<>();
Expand All @@ -102,13 +107,33 @@ public Set<ShuffleServerInfo> getReplacements(String faultyServerId) {
return excludedServerToReplacements.get(faultyServerId);
}

public Set<ShuffleServerInfo> getReplacementsForPartition(
int partitionId, String excludedServerId) {
return excludedServerForPartitionToReplacements
.getOrDefault(partitionId, Collections.emptyMap())
.getOrDefault(excludedServerId, Collections.emptySet());
}

/**
* Update the assignment for the receiving failure server of the given partition.
*
* @param partitionId the partition id
* @param receivingFailureServerId the id of the receiving failure server
* @param replacements the new assigned servers for replacing the receiving failure server
* @return the updated server list for receiving data
*/
public Set<ShuffleServerInfo> updateAssignment(
int partitionId, String receivingFailureServerId, Set<ShuffleServerInfo> replacements) {
if (replacements == null || StringUtils.isEmpty(receivingFailureServerId)) {
return Collections.emptySet();
}
excludedServerToReplacements.put(receivingFailureServerId, replacements);

return updateAssignmentInternal(partitionId, receivingFailureServerId, replacements);
}

private Set<ShuffleServerInfo> updateAssignmentInternal(
int partitionId, String receivingFailureServerId, Set<ShuffleServerInfo> replacements) {
Set<ShuffleServerInfo> updatedServers = new HashSet<>();
Map<Integer, List<ShuffleServerInfo>> replicaServers =
partitionReplicaAssignedServers.get(partitionId);
Expand All @@ -131,6 +156,26 @@ public Set<ShuffleServerInfo> updateAssignment(
return updatedServers;
}

/**
* Update the assignment for the receiving failure server of the need split partition.
*
* @param partitionId the partition id
* @param receivingFailureServerId the id of the receiving failure server
* @param replacements the new assigned servers for replacing the receiving failure server
* @return the updated server list for receiving data
*/
public Set<ShuffleServerInfo> updateAssignmentOnPartitionSplit(
int partitionId, String receivingFailureServerId, Set<ShuffleServerInfo> replacements) {
if (replacements == null || StringUtils.isEmpty(receivingFailureServerId)) {
return Collections.emptySet();
}
excludedServerForPartitionToReplacements
.computeIfAbsent(partitionId, x -> new HashMap<>())
.put(receivingFailureServerId, replacements);

return updateAssignmentInternal(partitionId, receivingFailureServerId, replacements);
}

@Override
public Set<ShuffleServerInfo> getServers() {
return partitionReplicaAssignedServers.values().stream()
Expand All @@ -149,6 +194,7 @@ public Map<Integer, List<ShuffleServerInfo>> getAvailablePartitionServersForWrit
replicaServers.entrySet()) {
ShuffleServerInfo candidate;
int candidateSize = replicaServerEntry.getValue().size();
// Use the last one for each replica writing
candidate = replicaServerEntry.getValue().get(candidateSize - 1);
assignment.computeIfAbsent(partitionId, x -> new ArrayList<>()).add(candidate);
}
Expand Down Expand Up @@ -266,4 +312,10 @@ public static MutableShuffleHandleInfo fromProto(RssProtos.MutableShuffleHandleI
handle.partitionReplicaAssignedServers = partitionToServers;
return handle;
}

public Set<String> listExcludedServersForPartition(int partitionId) {
return excludedServerForPartitionToReplacements
.getOrDefault(partitionId, Collections.emptyMap())
.keySet();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ public interface ShuffleHandleInfo {

/**
* Get the assignment of available servers for writer to write partitioned blocks to corresponding
* shuffleServers. Implementations might return dynamic, up-to-date information here.
* shuffleServers. Implementations might return dynamic, up-to-date information here. Returns
* partitionId -> [replica1, replica2, ...]
*/
Map<Integer, List<ShuffleServerInfo>> getAvailablePartitionServersForWriter();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -737,7 +737,8 @@ public MutableShuffleHandleInfo reassignOnBlockSendFailure(
int stageId,
int stageAttemptNumber,
int shuffleId,
Map<Integer, List<ReceivingFailureServer>> partitionToFailureServers) {
Map<Integer, List<ReceivingFailureServer>> partitionToFailureServers,
boolean partitionSplit) {
long startTime = System.currentTimeMillis();
ShuffleHandleInfo handleInfo = shuffleHandleInfoManager.get(shuffleId);
MutableShuffleHandleInfo internalHandle = null;
Expand All @@ -754,8 +755,11 @@ public MutableShuffleHandleInfo reassignOnBlockSendFailure(
synchronized (internalHandle) {
// If the reassignment servers for one partition exceeds the max reassign server num,
// it should fast fail.
internalHandle.checkPartitionReassignServerNum(
partitionToFailureServers.keySet(), partitionReassignMaxServerNum);
if (!partitionSplit) {
// Do not check the partition reassign server num for partition split case
internalHandle.checkPartitionReassignServerNum(
partitionToFailureServers.keySet(), partitionReassignMaxServerNum);
}

Map<ShuffleServerInfo, List<PartitionRange>> newServerToPartitions = new HashMap<>();
// receivingFailureServer -> partitionId -> replacementServerIds. For logging
Expand All @@ -769,27 +773,44 @@ public MutableShuffleHandleInfo reassignOnBlockSendFailure(
String serverId = receivingFailureServer.getServerId();

boolean serverHasReplaced = false;
Set<ShuffleServerInfo> replacements = internalHandle.getReplacements(serverId);
if (CollectionUtils.isEmpty(replacements)) {
final int requiredServerNum = 1;
Set<String> excludedServers = new HashSet<>(internalHandle.listExcludedServers());
excludedServers.add(serverId);
replacements =
reassignServerForTask(
stageId,
stageAttemptNumber,
shuffleId,
Sets.newHashSet(partitionId),
excludedServers,
requiredServerNum,
true);

Set<ShuffleServerInfo> updatedReassignServers;
if (!partitionSplit) {
Set<ShuffleServerInfo> replacements = internalHandle.getReplacements(serverId);
if (CollectionUtils.isEmpty(replacements)) {
replacements =
requestReassignServer(
stageId,
stageAttemptNumber,
shuffleId,
internalHandle,
partitionId,
serverId);
} else {
serverHasReplaced = true;
}
updatedReassignServers =
internalHandle.updateAssignment(partitionId, serverId, replacements);
} else {
serverHasReplaced = true;
Set<ShuffleServerInfo> replacements =
internalHandle.getReplacementsForPartition(partitionId, serverId);
if (CollectionUtils.isEmpty(replacements)) {
replacements =
requestReassignServer(
stageId,
stageAttemptNumber,
shuffleId,
internalHandle,
partitionId,
serverId);
} else {
serverHasReplaced = true;
}
updatedReassignServers =
internalHandle.updateAssignmentOnPartitionSplit(
partitionId, serverId, replacements);
}

Set<ShuffleServerInfo> updatedReassignServers =
internalHandle.updateAssignment(partitionId, serverId, replacements);

if (!updatedReassignServers.isEmpty()) {
reassignResult
.computeIfAbsent(serverId, x -> new HashMap<>())
Expand Down Expand Up @@ -825,6 +846,31 @@ public MutableShuffleHandleInfo reassignOnBlockSendFailure(
}
}

private Set<ShuffleServerInfo> requestReassignServer(
int stageId,
int stageAttemptNumber,
int shuffleId,
MutableShuffleHandleInfo internalHandle,
int partitionId,
String serverId) {
Set<ShuffleServerInfo> replacements;
final int requiredServerNum = 1;
Set<String> excludedServers = new HashSet<>(internalHandle.listExcludedServers());
// Exclude the servers that has already been replaced for partition split case.
excludedServers.addAll(internalHandle.listExcludedServersForPartition(partitionId));
excludedServers.add(serverId);
replacements =
reassignServerForTask(
stageId,
stageAttemptNumber,
shuffleId,
Sets.newHashSet(partitionId),
excludedServers,
requiredServerNum,
true);
return replacements;
}

@Override
public void stop() {
if (managerClientSupplier != null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,5 +86,6 @@ MutableShuffleHandleInfo reassignOnBlockSendFailure(
int stageId,
int stageAttemptNumber,
int shuffleId,
Map<Integer, List<ReceivingFailureServer>> partitionToFailureServers);
Map<Integer, List<ReceivingFailureServer>> partitionToFailureServers,
boolean partitionSplit);
}
Original file line number Diff line number Diff line change
Expand Up @@ -267,12 +267,13 @@ public void reassignOnBlockSendFailure(
RssProtos.ReassignOnBlockSendFailureResponse reply;
try {
LOG.info(
"Accepted reassign request on block sent failure for shuffleId: {}, stageId: {}, stageAttemptNumber: {} from taskAttemptId: {} on executorId: {}",
"Accepted reassign request on block sent failure for shuffleId: {}, stageId: {}, stageAttemptNumber: {} from taskAttemptId: {} on executorId: {} while partition split:{}",
request.getShuffleId(),
request.getStageId(),
request.getStageAttemptNumber(),
request.getTaskAttemptId(),
request.getExecutorId());
request.getExecutorId(),
request.getPartitionSplit());
MutableShuffleHandleInfo handle =
shuffleManager.reassignOnBlockSendFailure(
request.getStageId(),
Expand All @@ -281,7 +282,8 @@ public void reassignOnBlockSendFailure(
request.getFailurePartitionToServerIdsMap().entrySet().stream()
.collect(
Collectors.toMap(
Map.Entry::getKey, x -> ReceivingFailureServer.fromProto(x.getValue()))));
Map.Entry::getKey, x -> ReceivingFailureServer.fromProto(x.getValue()))),
request.getPartitionSplit());
code = RssProtos.StatusCode.SUCCESS;
reply =
RssProtos.ReassignOnBlockSendFailureResponse.newBuilder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,8 @@ public MutableShuffleHandleInfo reassignOnBlockSendFailure(
int stageId,
int stageAttemptNumber,
int shuffleId,
Map<Integer, List<ReceivingFailureServer>> partitionToFailureServers) {
Map<Integer, List<ReceivingFailureServer>> partitionToFailureServers,
boolean partitionSplit) {
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -634,12 +634,13 @@ private void reassignOnPartitionNeedSplit(FailedBlockSendTracker failedTracker)
new ReceivingFailureServer(
partitionStatus.getShuffleServerInfo().getId(), StatusCode.SUCCESS)));
if (!failurePartitionToServers.isEmpty()) {
doReassignOnBlockSendFailure(failurePartitionToServers);
doReassignOnBlockSendFailure(failurePartitionToServers, true);
}
}

private void doReassignOnBlockSendFailure(
Map<Integer, List<ReceivingFailureServer>> failurePartitionToServers) {
Map<Integer, List<ReceivingFailureServer>> failurePartitionToServers,
boolean partitionSplit) {
LOG.info(
"Initiate reassignOnBlockSendFailure. failure partition servers: {}",
failurePartitionToServers);
Expand All @@ -655,7 +656,8 @@ private void doReassignOnBlockSendFailure(
executorId,
taskAttemptId,
stageId,
stageAttemptNum);
stageAttemptNum,
partitionSplit);
RssReassignOnBlockSendFailureResponse response =
managerClientSupplier.get().reassignOnBlockSendFailure(request);
if (response.getStatusCode() != StatusCode.SUCCESS) {
Expand Down Expand Up @@ -700,7 +702,12 @@ private void reassignAndResendBlocks(Set<TrackingBlockStatus> blocks) {
serverBlocks.entrySet()) {
String serverId = blockStatusEntry.getKey().getId();
// avoid duplicate reassign for the same failure server.
String latestServerId = getPartitionAssignedServers(partitionId).get(0).getId();
// todo: getting the replacement should support multi replica.
List<ShuffleServerInfo> servers = getPartitionAssignedServers(partitionId);
// Gets the first replica for this partition for now.
// It can not work if we want to use multiple replicas.
ShuffleServerInfo replacement = servers.get(0);
String latestServerId = replacement.getId();
if (!serverId.equals(latestServerId)) {
continue;
}
Expand All @@ -712,13 +719,15 @@ private void reassignAndResendBlocks(Set<TrackingBlockStatus> blocks) {
}

if (!failurePartitionToServers.isEmpty()) {
doReassignOnBlockSendFailure(failurePartitionToServers);
doReassignOnBlockSendFailure(failurePartitionToServers, false);
}

for (TrackingBlockStatus blockStatus : blocks) {
ShuffleBlockInfo block = blockStatus.getShuffleBlockInfo();
// todo: getting the replacement should support multi replica.
List<ShuffleServerInfo> servers = getPartitionAssignedServers(block.getPartitionId());
// Gets the first replica for this partition for now.
// It can not work if we want to use multiple replicas.
ShuffleServerInfo replacement = servers.get(0);
if (blockStatus.getShuffleServerInfo().getId().equals(replacement.getId())) {
throw new RssException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.uniffle.proto.RssProtos;

public class RssReassignOnBlockSendFailureRequest {
private final boolean partitionSplit;
private int shuffleId;
private Map<Integer, List<ReceivingFailureServer>> failurePartitionToServers;
private String executorId;
Expand All @@ -38,13 +39,15 @@ public RssReassignOnBlockSendFailureRequest(
String executorId,
long taskAttemptId,
int stageId,
int stageAttemptNum) {
int stageAttemptNum,
boolean partitionSplit) {
this.shuffleId = shuffleId;
this.failurePartitionToServers = failurePartitionToServers;
this.executorId = executorId;
this.taskAttemptId = taskAttemptId;
this.stageId = stageId;
this.stageAttemptNumber = stageAttemptNum;
this.partitionSplit = partitionSplit;
}

public static RssProtos.RssReassignOnBlockSendFailureRequest toProto(
Expand All @@ -60,6 +63,7 @@ public static RssProtos.RssReassignOnBlockSendFailureRequest toProto(
.setStageId(request.stageId)
.setStageAttemptNumber(request.stageAttemptNumber)
.setTaskAttemptId(request.taskAttemptId)
.setPartitionSplit(request.partitionSplit)
.build();
}
}
1 change: 1 addition & 0 deletions proto/src/main/proto/Rss.proto
Original file line number Diff line number Diff line change
Expand Up @@ -671,6 +671,7 @@ message RssReassignOnBlockSendFailureRequest {
int32 stageId = 4;
int32 stageAttemptNumber = 5;
string executorId = 6;
optional bool partitionSplit = 7;
}

message ReceivingFailureServers {
Expand Down

0 comments on commit 7ff6446

Please sign in to comment.