diff --git a/xds/src/main/java/com/linecorp/centraldogma/xds/cluster/v1/XdsClusterService.java b/xds/src/main/java/com/linecorp/centraldogma/xds/cluster/v1/XdsClusterService.java index 8cd27cd304..29ef80b114 100644 --- a/xds/src/main/java/com/linecorp/centraldogma/xds/cluster/v1/XdsClusterService.java +++ b/xds/src/main/java/com/linecorp/centraldogma/xds/cluster/v1/XdsClusterService.java @@ -68,8 +68,8 @@ public void createCluster(CreateClusterRequest request, StreamObserver // with the format of "groups/{group}/clusters/{cluster}". // https://github.com/aip-dev/google.aip.dev/blob/master/aip/general/0133.md#user-specified-ids final Cluster cluster = request.getCluster().toBuilder().setName(clusterName).build(); - xdsResourceManager.push(responseObserver, group, CLUSTERS_DIRECTORY + clusterId + ".json", - "Create cluster: " + clusterName, cluster, currentAuthor()); + xdsResourceManager.push(responseObserver, group, clusterName, CLUSTERS_DIRECTORY + clusterId + ".json", + "Create cluster: " + clusterName, cluster, currentAuthor(), true); } @Override diff --git a/xds/src/main/java/com/linecorp/centraldogma/xds/endpoint/v1/XdsEndpointService.java b/xds/src/main/java/com/linecorp/centraldogma/xds/endpoint/v1/XdsEndpointService.java index b95177bfd1..5c202ffb92 100644 --- a/xds/src/main/java/com/linecorp/centraldogma/xds/endpoint/v1/XdsEndpointService.java +++ b/xds/src/main/java/com/linecorp/centraldogma/xds/endpoint/v1/XdsEndpointService.java @@ -73,8 +73,8 @@ public void createEndpoint(CreateEndpointRequest request, .toBuilder() .setClusterName(clusterName) .build(); - xdsResourceManager.push(responseObserver, group, fileName(endpointId), - "Create endpoint: " + clusterName, endpoint, currentAuthor()); + xdsResourceManager.push(responseObserver, group, clusterName, fileName(endpointId), + "Create endpoint: " + clusterName, endpoint, currentAuthor(), true); } private static String clusterName(String parent, String endpointId) { diff --git a/xds/src/main/java/com/linecorp/centraldogma/xds/group/v1/XdsGroupService.java b/xds/src/main/java/com/linecorp/centraldogma/xds/group/v1/XdsGroupService.java index 42e25c1b1b..49191bc808 100644 --- a/xds/src/main/java/com/linecorp/centraldogma/xds/group/v1/XdsGroupService.java +++ b/xds/src/main/java/com/linecorp/centraldogma/xds/group/v1/XdsGroupService.java @@ -24,6 +24,7 @@ import com.google.protobuf.Empty; +import com.linecorp.armeria.common.util.Exceptions; import com.linecorp.centraldogma.common.RepositoryExistsException; import com.linecorp.centraldogma.server.command.CommandExecutor; import com.linecorp.centraldogma.server.metadata.MetadataService; @@ -63,11 +64,12 @@ public void createGroup(CreateGroupRequest request, createRepository(commandExecutor, mds, currentAuthor(), XDS_CENTRAL_DOGMA_PROJECT, groupId) .handle((unused, cause) -> { if (cause != null) { - if (cause instanceof RepositoryExistsException) { + final Throwable peeled = Exceptions.peel(cause); + if (peeled instanceof RepositoryExistsException) { responseObserver.onError(alreadyExistsException(groupId)); } else { responseObserver.onError( - Status.INTERNAL.withCause(cause).asRuntimeException()); + Status.INTERNAL.withCause(peeled).asRuntimeException()); } return null; } diff --git a/xds/src/main/java/com/linecorp/centraldogma/xds/internal/ControlPlaneExceptionHandlerFunction.java b/xds/src/main/java/com/linecorp/centraldogma/xds/internal/ControlPlaneExceptionHandlerFunction.java new file mode 100644 index 0000000000..9ac2c0e173 --- /dev/null +++ b/xds/src/main/java/com/linecorp/centraldogma/xds/internal/ControlPlaneExceptionHandlerFunction.java @@ -0,0 +1,57 @@ +/* + * Copyright 2024 LINE Corporation + * + * LINE Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package com.linecorp.centraldogma.xds.internal; + +import com.linecorp.armeria.common.RequestContext; +import com.linecorp.armeria.common.annotation.Nullable; +import com.linecorp.armeria.common.grpc.GrpcExceptionHandlerFunction; +import com.linecorp.centraldogma.common.RepositoryNotFoundException; +import com.linecorp.centraldogma.common.TooManyRequestsException; +import com.linecorp.centraldogma.server.internal.storage.RequestAlreadyTimedOutException; +import com.linecorp.centraldogma.server.storage.StorageException; + +import io.grpc.Metadata; +import io.grpc.Status; +import io.grpc.Status.Code; + +final class ControlPlaneExceptionHandlerFunction implements GrpcExceptionHandlerFunction { + + @Nullable + @Override + public Status apply(RequestContext ctx, Status status, Throwable cause, Metadata metadata) { + if (status.getCode() != Code.UNKNOWN) { + return status; + } + + if (cause instanceof TooManyRequestsException) { + return Status.RESOURCE_EXHAUSTED.withCause(cause); + } + + if (cause instanceof RequestAlreadyTimedOutException) { + return Status.DEADLINE_EXCEEDED.withCause(cause); + } + + if (cause instanceof RepositoryNotFoundException) { + return Status.NOT_FOUND.withCause(cause); + } + + if (cause instanceof StorageException) { + return Status.INTERNAL.withCause(cause); + } + + return null; + } +} diff --git a/xds/src/main/java/com/linecorp/centraldogma/xds/internal/ControlPlaneService.java b/xds/src/main/java/com/linecorp/centraldogma/xds/internal/ControlPlaneService.java index 23fa71c4d6..bec27f8468 100644 --- a/xds/src/main/java/com/linecorp/centraldogma/xds/internal/ControlPlaneService.java +++ b/xds/src/main/java/com/linecorp/centraldogma/xds/internal/ControlPlaneService.java @@ -117,6 +117,7 @@ void start(PluginInitContext pluginInitContext) { .addService(new XdsClusterService(xdsResourceManager)) .addService(new XdsEndpointService(xdsResourceManager)) .addService(new XdsKubernetesService(xdsResourceManager)) + .exceptionHandler(new ControlPlaneExceptionHandlerFunction()) .jsonMarshallerFactory( serviceDescriptor -> { // Use JSON_MESSAGE_MARSHALLER not to parse Envoy extensions twice. diff --git a/xds/src/main/java/com/linecorp/centraldogma/xds/internal/XdsResourceManager.java b/xds/src/main/java/com/linecorp/centraldogma/xds/internal/XdsResourceManager.java index dfbad120ae..64bf98f64d 100644 --- a/xds/src/main/java/com/linecorp/centraldogma/xds/internal/XdsResourceManager.java +++ b/xds/src/main/java/com/linecorp/centraldogma/xds/internal/XdsResourceManager.java @@ -35,9 +35,12 @@ import com.google.protobuf.GeneratedMessageV3; import com.google.protobuf.Message; +import com.linecorp.armeria.common.util.Exceptions; import com.linecorp.centraldogma.common.Author; import com.linecorp.centraldogma.common.Change; +import com.linecorp.centraldogma.common.ChangeConflictException; import com.linecorp.centraldogma.common.Markup; +import com.linecorp.centraldogma.common.RedundantChangeException; import com.linecorp.centraldogma.common.Revision; import com.linecorp.centraldogma.server.command.Command; import com.linecorp.centraldogma.server.command.CommandExecutor; @@ -134,11 +137,16 @@ public void checkGroup(String group) { } public void push( - StreamObserver responseObserver, String group, String fileName, - String summary, T resource, Author author) { + StreamObserver responseObserver, String group, String resourceName, String fileName, + String summary, T resource, Author author, boolean create) { final Change change; try { - change = Change.ofJsonUpsert(fileName, JSON_MESSAGE_MARSHALLER.writeValueAsString(resource)); + final String jsonText = JSON_MESSAGE_MARSHALLER.writeValueAsString(resource); + if (create) { + change = Change.ofJsonPatch(fileName, null, jsonText); + } else { + change = Change.ofJsonUpsert(fileName, jsonText); + } } catch (IOException e) { // This could happen when the message has a type that isn't registered to JSON_MESSAGE_MARSHALLER. responseObserver.onError(Status.INTERNAL.withCause(new IllegalStateException( @@ -149,7 +157,23 @@ public void push( summary, "", Markup.PLAINTEXT, ImmutableList.of(change))) .handle((unused, cause) -> { if (cause != null) { - responseObserver.onError(Status.INTERNAL.withCause(cause).asRuntimeException()); + final Throwable peeled = Exceptions.peel(cause); + if (create && peeled instanceof ChangeConflictException) { + responseObserver.onError( + Status.ALREADY_EXISTS + .withCause(peeled) + .withDescription("Resource already exists: " + resourceName) + .asRuntimeException()); + return null; + } + if (!create && peeled instanceof RedundantChangeException) { + // Updating with the same resource. Return the resource as is. + responseObserver.onNext(resource); + responseObserver.onCompleted(); + return null; + } + + responseObserver.onError(cause); return null; } responseObserver.onNext(resource); @@ -172,7 +196,8 @@ public void update(StreamObserver responseObserver, Strin String resourceName, String fileName, String summary, T resource, Author author) { updateOrDelete(responseObserver, group, resourceName, fileName, - () -> push(responseObserver, group, fileName, summary, resource, author)); + () -> push(responseObserver, group, resourceName, fileName, + summary, resource, author, false)); } public void delete(StreamObserver responseObserver, String group, @@ -204,7 +229,7 @@ public void updateOrDelete(StreamObserver responseObserver, String group, Str final Repository repository = xdsProject.repos().get(group); repository.find(Revision.HEAD, fileName, FIND_ONE_WITHOUT_CONTENT).handle((entries, cause) -> { if (cause != null) { - responseObserver.onError(Status.INTERNAL.withCause(cause).asRuntimeException()); + responseObserver.onError(cause); return null; } if (entries.isEmpty()) { diff --git a/xds/src/main/java/com/linecorp/centraldogma/xds/k8s/v1/XdsKubernetesEndpointFetchingService.java b/xds/src/main/java/com/linecorp/centraldogma/xds/k8s/v1/XdsKubernetesEndpointFetchingService.java index b80267aadf..7f1efc4426 100644 --- a/xds/src/main/java/com/linecorp/centraldogma/xds/k8s/v1/XdsKubernetesEndpointFetchingService.java +++ b/xds/src/main/java/com/linecorp/centraldogma/xds/k8s/v1/XdsKubernetesEndpointFetchingService.java @@ -43,6 +43,7 @@ import com.linecorp.centraldogma.common.Change; import com.linecorp.centraldogma.common.ChangeConflictException; import com.linecorp.centraldogma.common.Markup; +import com.linecorp.centraldogma.common.RedundantChangeException; import com.linecorp.centraldogma.common.Revision; import com.linecorp.centraldogma.server.command.Command; import com.linecorp.centraldogma.server.command.CommandExecutor; @@ -181,7 +182,12 @@ private void pushK8sEndpoints(KubernetesEndpointGroup kubernetesEndpointGroup, S "Add " + watcher.getClusterName() + " with " + endpoints.size() + " endpoints.", "", Markup.PLAINTEXT, change)).handle((unused, cause) -> { if (cause != null) { - logger.warn("Failed to push {} to {}", change, groupName, cause); + final Throwable peeled = Exceptions.peel(cause); + if (peeled instanceof RedundantChangeException) { + // ignore + return null; + } + logger.warn("Failed to push {} to {}", change, groupName, peeled); } return null; }); diff --git a/xds/src/main/java/com/linecorp/centraldogma/xds/k8s/v1/XdsKubernetesService.java b/xds/src/main/java/com/linecorp/centraldogma/xds/k8s/v1/XdsKubernetesService.java index aa2d6cc238..07ed87e980 100644 --- a/xds/src/main/java/com/linecorp/centraldogma/xds/k8s/v1/XdsKubernetesService.java +++ b/xds/src/main/java/com/linecorp/centraldogma/xds/k8s/v1/XdsKubernetesService.java @@ -92,8 +92,8 @@ public void createServiceEndpointWatcher(CreateServiceEndpointWatcherRequest req .build(); final Author author = currentAuthor(); validateWatcherAndPush(responseObserver, watcher, () -> xdsResourceManager.push( - responseObserver, group, K8S_WATCHERS_DIRECTORY + watcherId + ".json", - "Create watcher: " + watcherName, watcher, author)); + responseObserver, group, watcherName, K8S_WATCHERS_DIRECTORY + watcherId + ".json", + "Create watcher: " + watcherName, watcher, author, true)); } private static void validateWatcherAndPush( diff --git a/xds/src/main/java/com/linecorp/centraldogma/xds/listener/v1/XdsListenerService.java b/xds/src/main/java/com/linecorp/centraldogma/xds/listener/v1/XdsListenerService.java index 77e9ebd612..1d1c51f442 100644 --- a/xds/src/main/java/com/linecorp/centraldogma/xds/listener/v1/XdsListenerService.java +++ b/xds/src/main/java/com/linecorp/centraldogma/xds/listener/v1/XdsListenerService.java @@ -68,8 +68,9 @@ public void createListener(CreateListenerRequest request, StreamObserver future = + createGroupAsync("foo", dogma.httpClient()); + final CompletableFuture future2 = + createGroupAsync("foo", dogma.httpClient()); + AggregatedHttpResponse response = future.join(); + assertOk(response); assertThatJson(response.contentUtf8()).isEqualTo("{\"name\":\"groups/foo\"}"); // Cannot create with the same name. - response = createGroup("foo", dogma.httpClient()); + response = future2.join(); assertThat(response.status()).isSameAs(HttpStatus.CONFLICT); assertThat(response.headers().get("grpc-status")) .isEqualTo(Integer.toString(Status.ALREADY_EXISTS.getCode().value())); } + private static void assertOk(AggregatedHttpResponse response) { + assertThat(response.status()).isSameAs(HttpStatus.OK); + assertThat(response.headers().get("grpc-status")).isEqualTo("0"); + } + @Test void deleteGroupViaHttp() { AggregatedHttpResponse response = deleteGroup("groups/bar", dogma.httpClient()); @@ -75,8 +86,7 @@ void deleteGroupViaHttp() { // Add permission test. response = deleteGroup("groups/bar", dogma.httpClient()); - assertThat(response.status()).isSameAs(HttpStatus.OK); - assertThat(response.headers().get("grpc-status")).isEqualTo("0"); + assertOk(response); assertThat(response.contentUtf8()).isEqualTo("{}"); } diff --git a/xds/src/test/java/com/linecorp/centraldogma/xds/internal/XdsTestUtil.java b/xds/src/test/java/com/linecorp/centraldogma/xds/internal/XdsTestUtil.java index 9a35d09df1..90c15ea71c 100644 --- a/xds/src/test/java/com/linecorp/centraldogma/xds/internal/XdsTestUtil.java +++ b/xds/src/test/java/com/linecorp/centraldogma/xds/internal/XdsTestUtil.java @@ -19,6 +19,7 @@ import java.io.IOException; import java.net.URI; +import java.util.concurrent.CompletableFuture; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; @@ -68,12 +69,17 @@ public final class XdsTestUtil { static final String CONFIG_SOURCE_CLUSTER_NAME = "dogma/cluster"; public static AggregatedHttpResponse createGroup(String groupId, WebClient webClient) { + return createGroupAsync(groupId, webClient).join(); + } + + public static CompletableFuture createGroupAsync( + String groupId, WebClient webClient) { final RequestHeaders headers = RequestHeaders.builder(HttpMethod.POST, "/api/v1/xds/groups?group_id=" + groupId) .set(HttpHeaderNames.AUTHORIZATION, "Bearer anonymous") .contentType(MediaType.JSON_UTF_8).build(); return webClient.execute(headers, "{\"name\":\"groups/" + groupId + "\"}") - .aggregate().join(); + .aggregate(); } public static AggregatedHttpResponse deleteGroup(String groupName, WebClient webClient) {