diff --git a/common/src/main/java/org/apache/uniffle/common/util/OutputUtils.java b/common/src/main/java/org/apache/uniffle/common/util/OutputUtils.java new file mode 100644 index 0000000000..b811b51cc0 --- /dev/null +++ b/common/src/main/java/org/apache/uniffle/common/util/OutputUtils.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.uniffle.common.util; + +import java.util.List; + +public class OutputUtils { + + /** + * Convert a list of number into a segment string. + * + * @param numbers the list of number + * @return the segment string + */ + public static String listToSegment(List numbers) { + return listToSegment(numbers, 1, Long.MAX_VALUE); + } + + /** + * Convert a list of number into a segment string. + * + * @param numbers the list of number + * @param threshold if the segment size is larger than this threshold, it will be converted into a + * segment string + * @return the segment string + */ + public static String listToSegment(List numbers, long threshold) { + return listToSegment(numbers, threshold, Long.MAX_VALUE); + } + + /** + * Convert a list of number into a segment string. + * + * @param numbers the list of number + * @param threshold if the segment size is larger than this threshold, it will be converted into a + * segment string + * @param limit the maximum number of elements in the segment + * @return the segment string + */ + public static String listToSegment(List numbers, long threshold, long limit) { + if (numbers == null || numbers.isEmpty()) { + return "[]"; + } + if (threshold < 1 || numbers.size() <= threshold) { + return numbers.toString(); + } + + StringBuilder result = new StringBuilder(); + int start = numbers.get(0); + int end = start; + + long rangeCount = 0; + for (int i = 1; i < numbers.size(); i++) { + if (numbers.get(i) == numbers.get(i - 1) + 1) { + end = numbers.get(i); + } else { + if (rangeCount < limit) { + appendRange(result, start, end); + } + rangeCount++; + start = numbers.get(i); + end = start; + } + } + rangeCount++; + if (rangeCount < limit) { + // Append the last range + appendRange(result, start, end); + } else { + result.append("...").append(rangeCount - limit).append(" more ranges..."); + } + + return result.toString(); + } + + private static void appendRange(StringBuilder result, int start, int end) { + if (result.length() > 0) { + result.append(", "); + } + if (start == end) { + result.append(start); + } else { + result.append("[").append(start).append("~").append(end).append("]"); + } + } +} diff --git a/common/src/test/java/org/apache/uniffle/common/util/OutputUtilsTest.java b/common/src/test/java/org/apache/uniffle/common/util/OutputUtilsTest.java new file mode 100644 index 0000000000..5545876d46 --- /dev/null +++ b/common/src/test/java/org/apache/uniffle/common/util/OutputUtilsTest.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.uniffle.common.util; + +import java.util.Arrays; +import java.util.List; + +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +public class OutputUtilsTest { + @Test + public void test() { + List numbers = + Arrays.asList( + 9527, 9528, 9529, 9530, 11375, 11376, 11377, 11378, 11379, 12000, 12001, 12002, 12003, + 12004); + assertEquals("[9527~9530], [11375~11379], [12000~12004]", OutputUtils.listToSegment(numbers)); + assertEquals( + "[9527~9530], [11375~11379], [12000~12004]", OutputUtils.listToSegment(numbers, 4)); + assertEquals( + "[9527, 9528, 9529, 9530, 11375, 11376, 11377, 11378," + + " 11379, 12000, 12001, 12002, 12003, 12004]", + OutputUtils.listToSegment(numbers, 20)); + // limit + assertEquals( + "[9527~9530], [11375~11379]...1 more ranges...", OutputUtils.listToSegment(numbers, 1, 2)); + assertEquals("[9527~9530]...2 more ranges...", OutputUtils.listToSegment(numbers, 1, 1)); + assertEquals("...3 more ranges...", OutputUtils.listToSegment(numbers, 1, 0)); + + // corner case + assertEquals("[9527]", OutputUtils.listToSegment(Arrays.asList(9527))); + assertEquals("[]", OutputUtils.listToSegment(Arrays.asList())); + assertEquals("[]", OutputUtils.listToSegment(null)); + } +} diff --git a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java index 98180d647c..6bd1f8990a 100644 --- a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java +++ b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java @@ -77,6 +77,7 @@ import org.apache.uniffle.common.exception.RssFetchFailedException; import org.apache.uniffle.common.netty.buffer.NettyManagedBuffer; import org.apache.uniffle.common.rpc.StatusCode; +import org.apache.uniffle.common.util.OutputUtils; import org.apache.uniffle.common.util.RetryUtils; import org.apache.uniffle.common.util.RssUtils; import org.apache.uniffle.proto.RssProtos; @@ -316,7 +317,7 @@ public long requirePreAllocation( "Requiring buffer for appId: {}, shuffleId: {}, partitionIds: {} with {} bytes from {}:{}", appId, shuffleId, - partitionIds, + OutputUtils.listToSegment(partitionIds, 10), requireSize, host, port); @@ -355,7 +356,7 @@ public long requirePreAllocation( "Can't require buffer for appId: {}, shuffleId: {}, partitionIds: {} with {} bytes from {}:{} due to {}, sleep and try[{}] again", appId, shuffleId, - partitionIds, + OutputUtils.listToSegment(partitionIds, 10), requireSize, host, port, diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java b/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java index 85fe7fa6e7..b46418bc8f 100644 --- a/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java +++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java @@ -59,6 +59,7 @@ import org.apache.uniffle.common.rpc.StatusCode; import org.apache.uniffle.common.util.BlockIdLayout; import org.apache.uniffle.common.util.ByteBufUtils; +import org.apache.uniffle.common.util.OutputUtils; import org.apache.uniffle.common.util.RssUtils; import org.apache.uniffle.proto.RssProtos; import org.apache.uniffle.proto.RssProtos.AppHeartBeatRequest; @@ -711,6 +712,10 @@ public void requireBuffer( if (request.getPartitionIdsList() != null) { auditArgs += ", partitionIdsSize=" + request.getPartitionIdsList().size(); } + if (request.getPartitionIdsList() != null) { + auditArgs += + ", partitionIds=" + OutputUtils.listToSegment(request.getPartitionIdsList(), 1, 10); + } auditContext.withArgs(auditArgs); StatusCode status = verifyRequest(appId); if (status != StatusCode.SUCCESS) { @@ -992,8 +997,14 @@ public void getShuffleResultForMultiPart( request.getBlockIdLayout().getTaskAttemptIdBits()); auditContext.withAppId(appId).withShuffleId(shuffleId); + String partitionIdsOutput = OutputUtils.listToSegment(partitionsList, 1, 10); auditContext.withArgs( - "partitionsListSize=" + partitionsList.size() + ", blockIdLayout=" + blockIdLayout); + "partitionsListSize=" + + partitionsList.size() + + ", partitionIds=" + + partitionIdsOutput + + ", blockIdLayout=" + + blockIdLayout); StatusCode status = verifyRequest(appId); if (status != StatusCode.SUCCESS) { @@ -1012,7 +1023,7 @@ public void getShuffleResultForMultiPart( GetShuffleResultForMultiPartResponse reply; byte[] serializedBlockIds = null; String requestInfo = - "appId[" + appId + "], shuffleId[" + shuffleId + "], partitions" + partitionsList; + "appId[" + appId + "], shuffleId[" + shuffleId + "], partitions=" + partitionIdsOutput; ByteString serializedBlockIdsBytes = ByteString.EMPTY; try { diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java index af37646a78..b80daba205 100644 --- a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java +++ b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java @@ -71,6 +71,7 @@ import org.apache.uniffle.common.util.BlockIdLayout; import org.apache.uniffle.common.util.Constants; import org.apache.uniffle.common.util.JavaUtils; +import org.apache.uniffle.common.util.OutputUtils; import org.apache.uniffle.common.util.RssUtils; import org.apache.uniffle.common.util.ThreadUtils; import org.apache.uniffle.common.util.UnitConverter; @@ -604,7 +605,10 @@ public long requireBuffer( String errorMessage = String.format( "Huge partition is limited to writing. appId: %s, shuffleId: %s, partitionIds: %s, partitionUsedDataSize: %s", - appId, shuffleId, partitionIds, partitionUsedDataSize); + appId, + shuffleId, + OutputUtils.listToSegment(partitionIds, 10), + partitionUsedDataSize); LOG.error(errorMessage); throw new NoBufferForHugePartitionException(errorMessage); }