Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

IGNITE-23936 Java thin: Support broadcast partitioned #5068

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,9 @@ public class ClientOp {
/** Cancels the execution of JDBC statement. */
public static final int JDBC_CANCEL = 68;

/** Execute partitioned compute job. */
public static final int COMPUTE_EXECUTE_PARTITIONED = 69;

/** Reserved for extensions: min. */
@SuppressWarnings("unused")
public static final int RESERVED_EXTENSION_RANGE_START = 1000;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.apache.ignite.client.handler.requests.compute.ClientComputeChangePriorityRequest;
import org.apache.ignite.client.handler.requests.compute.ClientComputeExecuteColocatedRequest;
import org.apache.ignite.client.handler.requests.compute.ClientComputeExecuteMapReduceRequest;
import org.apache.ignite.client.handler.requests.compute.ClientComputeExecutePartitionedRequest;
import org.apache.ignite.client.handler.requests.compute.ClientComputeExecuteRequest;
import org.apache.ignite.client.handler.requests.compute.ClientComputeGetStateRequest;
import org.apache.ignite.client.handler.requests.jdbc.ClientJdbcCancelRequest;
Expand Down Expand Up @@ -790,6 +791,16 @@ private void processOperation(ChannelHandlerContext ctx, ClientMessageUnpacker i
notificationSender(requestId)
);

case ClientOp.COMPUTE_EXECUTE_PARTITIONED:
return ClientComputeExecutePartitionedRequest.process(
in,
out,
compute,
igniteTables,
clusterService,
notificationSender(requestId)
);

case ClientOp.COMPUTE_EXECUTE_MAPREDUCE:
return ClientComputeExecuteMapReduceRequest.process(in, out, compute, notificationSender(requestId));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,9 @@ public static CompletableFuture<Void> process(
deploymentUnits,
jobClassName,
options,
null,
args);
args,
null
);

sendResultAndState(jobExecutionFut, notificationSender);

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.client.handler.requests.compute;

import static org.apache.ignite.client.handler.requests.compute.ClientComputeExecuteRequest.packSubmitResult;
import static org.apache.ignite.client.handler.requests.compute.ClientComputeExecuteRequest.sendResultAndState;
import static org.apache.ignite.client.handler.requests.table.ClientTableCommon.readTableAsync;
import static org.apache.ignite.internal.client.proto.ClientComputeJobUnpacker.unpackJobArgumentWithoutMarshaller;

import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.client.handler.NotificationSender;
import org.apache.ignite.compute.JobExecution;
import org.apache.ignite.compute.JobExecutionOptions;
import org.apache.ignite.deployment.DeploymentUnit;
import org.apache.ignite.internal.client.proto.ClientMessagePacker;
import org.apache.ignite.internal.client.proto.ClientMessageUnpacker;
import org.apache.ignite.internal.compute.ComputeJobDataHolder;
import org.apache.ignite.internal.compute.IgniteComputeInternal;
import org.apache.ignite.internal.network.ClusterService;
import org.apache.ignite.table.IgniteTables;

/**
 * Compute execute partitioned request: unpacks the target table, partition id and job definition
 * from the client message, submits the job for that partition, and packs the submit result back.
 */
public class ClientComputeExecutePartitionedRequest {
    /** Static request handler — not meant to be instantiated. */
    private ClientComputeExecutePartitionedRequest() {
    }

    /**
     * Processes the request.
     *
     * @param in Unpacker.
     * @param out Packer.
     * @param compute Compute.
     * @param tables Tables.
     * @param cluster Cluster service. Not referenced by this handler's body; kept in the signature
     *         to match the other compute request handlers.
     * @param notificationSender Sender used to deliver the job result and state asynchronously.
     * @return Future that completes when the submit result has been packed into {@code out}.
     */
    public static CompletableFuture<Void> process(
            ClientMessageUnpacker in,
            ClientMessagePacker out,
            IgniteComputeInternal compute,
            IgniteTables tables,
            ClusterService cluster,
            NotificationSender notificationSender
    ) {
        return readTableAsync(in, tables).thenCompose(table -> {
            out.packInt(table.schemaView().lastKnownSchemaVersion());

            int partitionId = in.unpackInt();

            // Unpack the job: deployment units, class name, execution options, then the argument.
            List<DeploymentUnit> deploymentUnits = in.unpackDeploymentUnits();
            String jobClassName = in.unpackString();
            JobExecutionOptions options = JobExecutionOptions.builder().priority(in.unpackInt()).maxRetries(in.unpackInt()).build();
            ComputeJobDataHolder args = unpackJobArgumentWithoutMarshaller(in);

            CompletableFuture<JobExecution<ComputeJobDataHolder>> jobExecutionFut = compute.submitPartitionedInternal(
                    table,
                    partitionId,
                    deploymentUnits,
                    jobClassName,
                    options,
                    args,
                    null
            );

            // The job result and state updates are pushed to the client via notifications,
            // independently of the submit response packed below.
            sendResultAndState(jobExecutionFut, notificationSender);

            //noinspection DataFlowIssue
            return jobExecutionFut.thenCompose(execution ->
                    execution.idAsync().thenAccept(jobId -> packSubmitResult(out, jobId, execution.node()))
            );
        });
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public static CompletableFuture<Void> process(
ComputeJobDataHolder arg = unpackJobArgumentWithoutMarshaller(in);

JobExecution<ComputeJobDataHolder> execution = compute.executeAsyncWithFailover(
candidates, deploymentUnits, jobClassName, options, null, arg
candidates, deploymentUnits, jobClassName, options, arg, null
);
// TODO https://issues.apache.org/jira/browse/IGNITE-24184
sendResultAndState(completedFuture(execution), notificationSender);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ public static ClientOperationType opCodeToClientOperationType(int opCode) {

case ClientOp.COMPUTE_EXECUTE:
case ClientOp.COMPUTE_EXECUTE_COLOCATED:
case ClientOp.COMPUTE_EXECUTE_PARTITIONED:
return ClientOperationType.COMPUTE_EXECUTE;

case ClientOp.COMPUTE_EXECUTE_MAPREDUCE:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.apache.ignite.compute.JobDescriptor;
import org.apache.ignite.compute.JobExecution;
import org.apache.ignite.compute.JobTarget;
import org.apache.ignite.compute.TableJobTarget;
import org.apache.ignite.compute.TaskDescriptor;
import org.apache.ignite.compute.task.TaskExecution;
import org.apache.ignite.internal.client.PayloadInputChannel;
Expand All @@ -65,6 +66,7 @@
import org.apache.ignite.internal.compute.BroadcastJobExecutionImpl;
import org.apache.ignite.internal.compute.FailedExecution;
import org.apache.ignite.internal.sql.SqlCommon;
import org.apache.ignite.internal.table.partition.HashPartition;
import org.apache.ignite.internal.util.ExceptionUtils;
import org.apache.ignite.internal.util.ViewUtils;
import org.apache.ignite.lang.CancelHandleHelper;
Expand All @@ -74,6 +76,7 @@
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.table.Tuple;
import org.apache.ignite.table.mapper.Mapper;
import org.apache.ignite.table.partition.Partition;
import org.jetbrains.annotations.Nullable;

/**
Expand Down Expand Up @@ -140,22 +143,43 @@ public <T, R> CompletableFuture<BroadcastExecution<R>> submitAsync(
AllNodesBroadcastJobTarget allNodesBroadcastTarget = (AllNodesBroadcastJobTarget) target;
Set<ClusterNode> nodes = allNodesBroadcastTarget.nodes();

//noinspection unchecked
CompletableFuture<SubmitResult>[] futures = nodes.stream()
.map(node -> executeOnAnyNodeAsync(Set.of(node), descriptor, arg))
.toArray(CompletableFuture[]::new);

// Wait for all the futures but don't fail resulting future, keep individual futures in executions.
return allOf(futures).handle((unused, throwable) -> new BroadcastJobExecutionImpl<>(
Arrays.stream(futures)
.map(fut -> mapSubmitResult(descriptor, cancellationToken, fut))
.collect(Collectors.toList())
));
return mapSubmitFutures(futures, descriptor, cancellationToken);
} else if (target instanceof TableJobTarget) {
TableJobTarget tableJobTarget = (TableJobTarget) target;
String tableName = tableJobTarget.tableName();
return getTable(tableName)
.thenCompose(table -> table.partitionManager().primaryReplicasAsync())
.thenCompose(replicas -> {
//noinspection unchecked
CompletableFuture<SubmitResult>[] futures = replicas.keySet().stream()
.map(partition -> doExecutePartitionedAsync(tableName, partition, descriptor, arg))
.toArray(CompletableFuture[]::new);

return mapSubmitFutures(futures, descriptor, cancellationToken);
});
}
// TODO https://issues.apache.org/jira/browse/IGNITE-23936

throw new IllegalArgumentException("Unsupported job target: " + target);
}

private <T, R> CompletableFuture<BroadcastExecution<R>> mapSubmitFutures(
        CompletableFuture<SubmitResult>[] futures,
        JobDescriptor<T, R> descriptor,
        @Nullable CancellationToken cancellationToken
) {
    // Wait for every submit future to settle, but never fail the resulting future:
    // per-submit failures remain observable through the individual executions.
    return allOf(futures).handle((ignoredRes, ignoredErr) -> {
        var executions = Arrays.stream(futures)
                .map(submitFut -> mapSubmitResult(descriptor, cancellationToken, submitFut))
                .collect(Collectors.toList());

        return new BroadcastJobExecutionImpl<>(executions);
    });
}

private <T, R> JobExecution<R> mapSubmitResult(
JobDescriptor<T, R> descriptor,
@Nullable CancellationToken cancellationToken,
Expand Down Expand Up @@ -383,6 +407,47 @@ private static <T, R> CompletableFuture<SubmitResult> executeColocatedInternal(
null);
}

private <T, R> CompletableFuture<SubmitResult> doExecutePartitionedAsync(
        String tableName,
        Partition partition,
        JobDescriptor<T, R> descriptor,
        @Nullable T arg
) {
    // Resolve the (possibly cached) table, then submit. If the table turns out to be missing,
    // handleMissingTable supplies a retry of this whole call with a freshly resolved table.
    return getTable(tableName).thenCompose(table -> {
        CompletableFuture<CompletableFuture<SubmitResult>> withRetry =
                executePartitioned(table, partition, descriptor, arg)
                        .handle((res, err) -> handleMissingTable(
                                tableName,
                                res,
                                err,
                                () -> doExecutePartitionedAsync(tableName, partition, descriptor, arg)));

        // Flatten the nested future produced by handle().
        return withRetry.thenCompose(fut -> fut);
    });
}

private static <T, R> CompletableFuture<SubmitResult> executePartitioned(
        ClientTable table,
        Partition partition,
        JobDescriptor<T, R> descriptor,
        @Nullable T arg
) {
    // NOTE(review): assumes every Partition passed here is a HashPartition — confirm this
    // holds for all partition sources feeding this method.
    int partitionId = ((HashPartition) partition).partitionId();

    return table.doSchemaOutOpAsync(
            ClientOp.COMPUTE_EXECUTE_PARTITIONED,
            (schema, channel) -> {
                ClientMessagePacker packer = channel.out();

                // Wire order: table id, partition id, then the job descriptor and argument.
                packer.packInt(table.tableId());
                packer.packInt(partitionId);
                packJob(packer, descriptor, arg);
            },
            ClientCompute::unpackSubmitResult,
            // Partition awareness lets the request go to the partition's node when the mapping is known.
            PartitionAwarenessProvider.of(partitionId),
            true,
            null
    );
}

private CompletableFuture<ClientTable> getTable(String tableName) {
// Cache tables by name to avoid extra network call on every executeColocated.
var cached = tableCache.get(tableName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,8 @@ public JobExecution<ComputeJobDataHolder> executeAsyncWithFailover(
List<DeploymentUnit> units,
String jobClassName,
JobExecutionOptions options,
@Nullable CancellationToken cancellationToken,
@Nullable ComputeJobDataHolder arg) {
@Nullable ComputeJobDataHolder arg,
@Nullable CancellationToken cancellationToken) {
if (Objects.equals(jobClassName, GET_UNITS)) {
String unitString = units.stream().map(DeploymentUnit::render).collect(Collectors.joining(","));
return completedExecution(unitString);
Expand Down Expand Up @@ -146,14 +146,27 @@ public CompletableFuture<JobExecution<ComputeJobDataHolder>> submitColocatedInte
List<DeploymentUnit> units,
String jobClassName,
JobExecutionOptions options,
@Nullable CancellationToken cancellationToken,
ComputeJobDataHolder args
ComputeJobDataHolder args,
@Nullable CancellationToken cancellationToken
) {
return completedFuture(jobExecution(future != null
? future
: completedFuture(SharedComputeUtils.marshalArgOrResult(nodeName, null))));
}

/**
 * {@inheritDoc}
 *
 * <p>Stub implementation: all parameters are ignored and the result of
 * {@code nullCompletedFuture()} is returned immediately.
 */
@Override
public CompletableFuture<JobExecution<ComputeJobDataHolder>> submitPartitionedInternal(
        TableViewInternal table,
        int partitionId,
        List<DeploymentUnit> units,
        String jobClassName,
        JobExecutionOptions options,
        @Nullable ComputeJobDataHolder arg,
        @Nullable CancellationToken cancellationToken
) {
    // No real compute here — completes immediately without submitting anything.
    return nullCompletedFuture();
}

@Override
public <T, R> CompletableFuture<JobExecution<R>> submitAsync(
JobTarget target,
Expand All @@ -169,8 +182,8 @@ public <T, R> CompletableFuture<JobExecution<R>> submitAsync(
descriptor.units(),
descriptor.jobClassName(),
descriptor.options(),
cancellationToken,
SharedComputeUtils.marshalArgOrResult(arg, null)
SharedComputeUtils.marshalArgOrResult(arg, null),
cancellationToken
);

return completedFuture(new JobExecution<>() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -759,15 +759,13 @@ private static Stream<Arguments> tupleCollections() {
void partitionedBroadcast() {
createTestTableWithOneRow();

Ignite entryNode = node(0);

Map<Partition, ClusterNode> replicas = entryNode.tables().table("test").partitionManager().primaryReplicasAsync().join();
Map<Partition, ClusterNode> replicas = node(0).tables().table("test").partitionManager().primaryReplicasAsync().join();
Map<Integer, ClusterNode> partitionIdToNode = replicas.entrySet().stream()
.collect(toMap(entry -> ((HashPartition) entry.getKey()).partitionId(), Entry::getValue));

// When run job that will return its partition id
JobDescriptor<Void, Integer> job = JobDescriptor.builder(GetPartitionJob.class).units(units()).build();
CompletableFuture<BroadcastExecution<Integer>> future = entryNode.compute()
CompletableFuture<BroadcastExecution<Integer>> future = compute()
.submitAsync(BroadcastJobTarget.table("test"), job, null);

// Then the jobs are submitted
Expand All @@ -783,7 +781,7 @@ void partitionedBroadcast() {
assertThat(executions, hasSize(partitionIdToNode.size()));
executions.forEach(execution -> {
Integer partitionId = execution.resultAsync().join(); // safe to join since resultsAsync is already complete
assertThat(execution.node(), is(partitionIdToNode.get(partitionId)));
assertThat(execution.node().name(), is(partitionIdToNode.get(partitionId).name()));
});
}

Expand Down
Loading