Skip to content

Commit

Permalink
[#2003][FOLLOWUP] fix(client): Replace rss.rpc.client.type.grpc.timeout with rss.client.rpc.timeout.ms in RssShuffleManagerBase (#2004)
Browse files Browse the repository at this point in the history

### What changes were proposed in this pull request?

1. Configure `rpcTimeout` in ShuffleManagerClient using `rss.client.rpc.timeout.ms`.  
2. Remove the `rss.rpc.client.type.grpc.timeout` configuration.

### Why are the changes needed?

Fix: #2003 

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Covered by unit tests (UT).
  • Loading branch information
xumanbu authored Aug 2, 2024
1 parent c3e1f60 commit f5261d3
Show file tree
Hide file tree
Showing 5 changed files with 6 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,6 @@
import org.apache.uniffle.common.ShuffleDataDistributionType;
import org.apache.uniffle.common.ShuffleServerInfo;
import org.apache.uniffle.common.config.ConfigOption;
import org.apache.uniffle.common.config.RssBaseConf;
import org.apache.uniffle.common.config.RssClientConf;
import org.apache.uniffle.common.config.RssConf;
import org.apache.uniffle.common.exception.RssException;
Expand Down Expand Up @@ -624,7 +623,7 @@ protected synchronized Supplier<ShuffleManagerClient> getOrCreateShuffleManagerC
RssConf rssConf = RssSparkConfig.toRssConf(sparkConf);
String driver = rssConf.getString("driver.host", "");
int port = rssConf.get(RssClientConf.SHUFFLE_MANAGER_GRPC_PORT);
long rpcTimeout = rssConf.getLong(RssBaseConf.RSS_CLIENT_TYPE_GRPC_TIMEOUT_MS);
long rpcTimeout = rssConf.getLong(RssClientConf.RPC_TIMEOUT_MS);
this.managerClientSupplier =
ExpiringCloseableSupplier.of(
() ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,14 +157,6 @@ public class RssBaseConf extends RssConf {
.defaultValue(ClientType.GRPC)
.withDescription("client type for rss");

public static final ConfigOption<Long> RSS_CLIENT_TYPE_GRPC_TIMEOUT_MS =
ConfigOptions.key("rss.rpc.client.type.grpc.timeout")
.longType()
.checkValue(
ConfigUtils.POSITIVE_LONG_VALIDATOR, "The grpc timeout must be positive integer")
.defaultValue(60 * 1000L)
.withDescription("Remote shuffle service client type grpc timeout (ms)");

public static final ConfigOption<StorageType> RSS_STORAGE_TYPE =
ConfigOptions.key("rss.storage.type")
.enumType(StorageType.class)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,8 @@ public class RssClientConf {
ConfigOptions.key("rss.client.rpc.timeout.ms")
.longType()
.defaultValue(60 * 1000L)
.withDescription("Timeout in milliseconds for RPC calls.");
.withDescription(
"The timeout value in milliseconds for gRPC and Netty Type RPC Clients, including ShuffleServerClient and ShuffleManagerClient.");

public static final ConfigOption<Integer> RPC_MAX_ATTEMPTS =
ConfigOptions.key("rss.client.rpc.maxAttempts")
Expand Down
2 changes: 1 addition & 1 deletion docs/client_guide/client_guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ The important configuration of client is listed as following. These configuratio
| <client_type>.rss.estimate.server.assignment.enabled | false | Support mr and spark, whether to enable estimation of the number of ShuffleServers that need to be allocated based on the number of concurrent tasks. |
| <client_type>.rss.estimate.task.concurrency.per.server | 80 | It takes effect when rss.estimate.server.assignment.enabled=true, how many tasks are concurrently assigned to a ShuffleServer. |
| <client_type>.rss.client.max.concurrency.of.per-partition.write | - | The maximum number of files that can be written concurrently to a single partition. This value is only respected by the remote shuffle server if it is greater than 0. |
| <client_type>.rss.client.rpc.timeout.ms | 60000 | Timeout in milliseconds for RPC calls. |
| <client_type>.rss.client.rpc.timeout.ms | 60000 | The timeout value in milliseconds for gRPC and Netty Type RPC Clients, including ShuffleServerClient and ShuffleManagerClient. |
| <client_type>.rss.client.rpc.maxAttempts | 3 | When we fail to send RPC calls, we will retry for maxAttempts times. |
| <client_type>.rss.client.rpc.netty.pageSize | 4096 | The value of pageSize for PooledByteBufAllocator when using gRPC internal Netty on the client-side. This configuration will only take effect when rss.rpc.server.type is set to GRPC_NETTY. |
| <client_type>.rss.client.rpc.netty.maxOrder | 3 | The value of maxOrder for PooledByteBufAllocator when using gRPC internal Netty on the client-side. This configuration will only take effect when rss.rpc.server.type is set to GRPC_NETTY. |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import org.apache.uniffle.client.factory.ShuffleManagerClientFactory;
import org.apache.uniffle.client.impl.grpc.ShuffleManagerGrpcClient;
import org.apache.uniffle.common.ClientType;
import org.apache.uniffle.common.config.RssBaseConf;
import org.apache.uniffle.common.config.RssClientConf;
import org.apache.uniffle.common.config.RssConf;
import org.apache.uniffle.common.rpc.GrpcServer;
import org.apache.uniffle.shuffle.manager.DummyRssShuffleManager;
Expand Down Expand Up @@ -63,7 +63,7 @@ public void createServerAndClient() throws Exception {
shuffleManagerServer = createShuffleManagerServer();
shuffleManagerServer.start();
int port = shuffleManagerServer.getPort();
long rpcTimeout = rssConf.getLong(RssBaseConf.RSS_CLIENT_TYPE_GRPC_TIMEOUT_MS);
long rpcTimeout = rssConf.getLong(RssClientConf.RPC_TIMEOUT_MS);
client = factory.createShuffleManagerClient(ClientType.GRPC, LOCALHOST, port, rpcTimeout);
}

Expand Down

0 comments on commit f5261d3

Please sign in to comment.