Skip to content

Commit

Permalink
Ensure Compliance with AIP Rules for Resource Creation and Update (#1025
Browse files Browse the repository at this point in the history
)

Motivation:
Identified violations of two AIP (API Improvement Proposals) rules:
- CREATE: If a user tries to create a resource with an ID that would result in a duplicate resource name, the service must return an ALREADY_EXISTS error.
- UPDATE: If the method call is on a resource that already exists and all fields match, the existing resource should be returned unchanged.

Modifications:
- Implemented the aforementioned AIP rules during resource creation and update.
- Added `ControlPlaneExceptionHandlerFunction``, which converts exceptions raised within Central Dogma to appropriate gRPC status codes.

Result:
- The service now correctly returns an ALREADY_EXISTS error for duplicate resource creation attempts.
- Resource updates that do not change any fields will now correctly return the existing resource without modification.
  • Loading branch information
minwoox authored Aug 23, 2024
1 parent aaa8cee commit e78a569
Show file tree
Hide file tree
Showing 13 changed files with 145 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,8 @@ public void createCluster(CreateClusterRequest request, StreamObserver<Cluster>
// with the format of "groups/{group}/clusters/{cluster}".
// https://github.com/aip-dev/google.aip.dev/blob/master/aip/general/0133.md#user-specified-ids
final Cluster cluster = request.getCluster().toBuilder().setName(clusterName).build();
xdsResourceManager.push(responseObserver, group, CLUSTERS_DIRECTORY + clusterId + ".json",
"Create cluster: " + clusterName, cluster, currentAuthor());
xdsResourceManager.push(responseObserver, group, clusterName, CLUSTERS_DIRECTORY + clusterId + ".json",
"Create cluster: " + clusterName, cluster, currentAuthor(), true);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,8 @@ public void createEndpoint(CreateEndpointRequest request,
.toBuilder()
.setClusterName(clusterName)
.build();
xdsResourceManager.push(responseObserver, group, fileName(endpointId),
"Create endpoint: " + clusterName, endpoint, currentAuthor());
xdsResourceManager.push(responseObserver, group, clusterName, fileName(endpointId),
"Create endpoint: " + clusterName, endpoint, currentAuthor(), true);
}

private static String clusterName(String parent, String endpointId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

import com.google.protobuf.Empty;

import com.linecorp.armeria.common.util.Exceptions;
import com.linecorp.centraldogma.common.RepositoryExistsException;
import com.linecorp.centraldogma.server.command.CommandExecutor;
import com.linecorp.centraldogma.server.metadata.MetadataService;
Expand Down Expand Up @@ -63,11 +64,12 @@ public void createGroup(CreateGroupRequest request,
createRepository(commandExecutor, mds, currentAuthor(), XDS_CENTRAL_DOGMA_PROJECT, groupId)
.handle((unused, cause) -> {
if (cause != null) {
if (cause instanceof RepositoryExistsException) {
final Throwable peeled = Exceptions.peel(cause);
if (peeled instanceof RepositoryExistsException) {
responseObserver.onError(alreadyExistsException(groupId));
} else {
responseObserver.onError(
Status.INTERNAL.withCause(cause).asRuntimeException());
Status.INTERNAL.withCause(peeled).asRuntimeException());
}
return null;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Copyright 2024 LINE Corporation
*
* LINE Corporation licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package com.linecorp.centraldogma.xds.internal;

import com.linecorp.armeria.common.RequestContext;
import com.linecorp.armeria.common.annotation.Nullable;
import com.linecorp.armeria.common.grpc.GrpcExceptionHandlerFunction;
import com.linecorp.centraldogma.common.RepositoryNotFoundException;
import com.linecorp.centraldogma.common.TooManyRequestsException;
import com.linecorp.centraldogma.server.internal.storage.RequestAlreadyTimedOutException;
import com.linecorp.centraldogma.server.storage.StorageException;

import io.grpc.Metadata;
import io.grpc.Status;
import io.grpc.Status.Code;

final class ControlPlaneExceptionHandlerFunction implements GrpcExceptionHandlerFunction {

@Nullable
@Override
public Status apply(RequestContext ctx, Status status, Throwable cause, Metadata metadata) {
if (status.getCode() != Code.UNKNOWN) {
return status;
}

if (cause instanceof TooManyRequestsException) {
return Status.RESOURCE_EXHAUSTED.withCause(cause);
}

if (cause instanceof RequestAlreadyTimedOutException) {
return Status.DEADLINE_EXCEEDED.withCause(cause);
}

if (cause instanceof RepositoryNotFoundException) {
return Status.NOT_FOUND.withCause(cause);
}

if (cause instanceof StorageException) {
return Status.INTERNAL.withCause(cause);
}

return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ void start(PluginInitContext pluginInitContext) {
.addService(new XdsClusterService(xdsResourceManager))
.addService(new XdsEndpointService(xdsResourceManager))
.addService(new XdsKubernetesService(xdsResourceManager))
.exceptionHandler(new ControlPlaneExceptionHandlerFunction())
.jsonMarshallerFactory(
serviceDescriptor -> {
// Use JSON_MESSAGE_MARSHALLER not to parse Envoy extensions twice.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,12 @@
import com.google.protobuf.GeneratedMessageV3;
import com.google.protobuf.Message;

import com.linecorp.armeria.common.util.Exceptions;
import com.linecorp.centraldogma.common.Author;
import com.linecorp.centraldogma.common.Change;
import com.linecorp.centraldogma.common.ChangeConflictException;
import com.linecorp.centraldogma.common.Markup;
import com.linecorp.centraldogma.common.RedundantChangeException;
import com.linecorp.centraldogma.common.Revision;
import com.linecorp.centraldogma.server.command.Command;
import com.linecorp.centraldogma.server.command.CommandExecutor;
Expand Down Expand Up @@ -134,11 +137,16 @@ public void checkGroup(String group) {
}

public <T extends Message> void push(
StreamObserver<T> responseObserver, String group, String fileName,
String summary, T resource, Author author) {
StreamObserver<T> responseObserver, String group, String resourceName, String fileName,
String summary, T resource, Author author, boolean create) {
final Change<JsonNode> change;
try {
change = Change.ofJsonUpsert(fileName, JSON_MESSAGE_MARSHALLER.writeValueAsString(resource));
final String jsonText = JSON_MESSAGE_MARSHALLER.writeValueAsString(resource);
if (create) {
change = Change.ofJsonPatch(fileName, null, jsonText);
} else {
change = Change.ofJsonUpsert(fileName, jsonText);
}
} catch (IOException e) {
// This could happen when the message has a type that isn't registered to JSON_MESSAGE_MARSHALLER.
responseObserver.onError(Status.INTERNAL.withCause(new IllegalStateException(
Expand All @@ -149,7 +157,23 @@ public <T extends Message> void push(
summary, "", Markup.PLAINTEXT, ImmutableList.of(change)))
.handle((unused, cause) -> {
if (cause != null) {
responseObserver.onError(Status.INTERNAL.withCause(cause).asRuntimeException());
final Throwable peeled = Exceptions.peel(cause);
if (create && peeled instanceof ChangeConflictException) {
responseObserver.onError(
Status.ALREADY_EXISTS
.withCause(peeled)
.withDescription("Resource already exists: " + resourceName)
.asRuntimeException());
return null;
}
if (!create && peeled instanceof RedundantChangeException) {
// Updating with the same resource. Return the resource as is.
responseObserver.onNext(resource);
responseObserver.onCompleted();
return null;
}

responseObserver.onError(cause);
return null;
}
responseObserver.onNext(resource);
Expand All @@ -172,7 +196,8 @@ public <T extends Message> void update(StreamObserver<T> responseObserver, Strin
String resourceName, String fileName, String summary, T resource,
Author author) {
updateOrDelete(responseObserver, group, resourceName, fileName,
() -> push(responseObserver, group, fileName, summary, resource, author));
() -> push(responseObserver, group, resourceName, fileName,
summary, resource, author, false));
}

public void delete(StreamObserver<Empty> responseObserver, String group,
Expand Down Expand Up @@ -204,7 +229,7 @@ public void updateOrDelete(StreamObserver<?> responseObserver, String group, Str
final Repository repository = xdsProject.repos().get(group);
repository.find(Revision.HEAD, fileName, FIND_ONE_WITHOUT_CONTENT).handle((entries, cause) -> {
if (cause != null) {
responseObserver.onError(Status.INTERNAL.withCause(cause).asRuntimeException());
responseObserver.onError(cause);
return null;
}
if (entries.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import com.linecorp.centraldogma.common.Change;
import com.linecorp.centraldogma.common.ChangeConflictException;
import com.linecorp.centraldogma.common.Markup;
import com.linecorp.centraldogma.common.RedundantChangeException;
import com.linecorp.centraldogma.common.Revision;
import com.linecorp.centraldogma.server.command.Command;
import com.linecorp.centraldogma.server.command.CommandExecutor;
Expand Down Expand Up @@ -181,7 +182,12 @@ private void pushK8sEndpoints(KubernetesEndpointGroup kubernetesEndpointGroup, S
"Add " + watcher.getClusterName() + " with " + endpoints.size() + " endpoints.",
"", Markup.PLAINTEXT, change)).handle((unused, cause) -> {
if (cause != null) {
logger.warn("Failed to push {} to {}", change, groupName, cause);
final Throwable peeled = Exceptions.peel(cause);
if (peeled instanceof RedundantChangeException) {
// ignore
return null;
}
logger.warn("Failed to push {} to {}", change, groupName, peeled);
}
return null;
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,8 @@ public void createServiceEndpointWatcher(CreateServiceEndpointWatcherRequest req
.build();
final Author author = currentAuthor();
validateWatcherAndPush(responseObserver, watcher, () -> xdsResourceManager.push(
responseObserver, group, K8S_WATCHERS_DIRECTORY + watcherId + ".json",
"Create watcher: " + watcherName, watcher, author));
responseObserver, group, watcherName, K8S_WATCHERS_DIRECTORY + watcherId + ".json",
"Create watcher: " + watcherName, watcher, author, true));
}

private static void validateWatcherAndPush(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,9 @@ public void createListener(CreateListenerRequest request, StreamObserver<Listene
// with the format of "groups/{group}/listeners/{listener}".
// https://github.com/aip-dev/google.aip.dev/blob/master/aip/general/0133.md#user-specified-ids
final Listener listener = request.getListener().toBuilder().setName(listenerName).build();
xdsResourceManager.push(responseObserver, group, LISTENERS_DIRECTORY + listenerId + ".json",
"Create listener: " + listenerName, listener, currentAuthor());
xdsResourceManager.push(responseObserver, group, listenerName,
LISTENERS_DIRECTORY + listenerId + ".json",
"Create listener: " + listenerName, listener, currentAuthor(), true);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,8 @@ public void createRoute(CreateRouteRequest request, StreamObserver<RouteConfigur
// with the format of "groups/{group}/routes/{route}".
// https://github.com/aip-dev/google.aip.dev/blob/master/aip/general/0133.md#user-specified-ids
final RouteConfiguration route = request.getRoute().toBuilder().setName(routeName).build();
xdsResourceManager.push(responseObserver, group, ROUTES_DIRECTORY + routeId + ".json",
"Create route: " + routeName, route, currentAuthor());
xdsResourceManager.push(responseObserver, group, routeName, ROUTES_DIRECTORY + routeId + ".json",
"Create route: " + routeName, route, currentAuthor(), true);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import io.envoyproxy.envoy.service.cluster.v3.ClusterDiscoveryServiceGrpc.ClusterDiscoveryServiceStub;
import io.envoyproxy.envoy.service.discovery.v3.DiscoveryRequest;
import io.envoyproxy.envoy.service.discovery.v3.DiscoveryResponse;
import io.grpc.Status;
import io.grpc.stub.StreamObserver;

class XdsClusterServiceTest {
Expand Down Expand Up @@ -82,6 +83,12 @@ void createClusterViaHttp() throws Exception {
final String clusterName = "groups/foo/clusters/foo-cluster/1";
assertThat(actualCluster).isEqualTo(cluster.toBuilder().setName(clusterName).build());
checkResourceViaDiscoveryRequest(actualCluster, clusterName, true);

// Create the same cluster again.
response = createCluster("groups/foo", "foo-cluster/1", cluster, dogma.httpClient());
assertThat(response.status()).isSameAs(HttpStatus.CONFLICT);
assertThat(response.headers().get("grpc-status"))
.isEqualTo(Integer.toString(Status.ALREADY_EXISTS.getCode().value()));
}

private static void assertOk(AggregatedHttpResponse response) {
Expand Down Expand Up @@ -154,6 +161,10 @@ void updateClusterViaHttp() throws Exception {
final Cluster actualCluster2 = clusterBuilder2.build();
assertThat(actualCluster2).isEqualTo(updatingCluster.toBuilder().setName(clusterName).build());
checkResourceViaDiscoveryRequest(actualCluster2, clusterName, true);

// Can update with the same cluster again.
response = updateCluster("groups/foo", "foo-cluster/2", updatingCluster, dogma.httpClient());
assertOk(response);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,14 @@
package com.linecorp.centraldogma.xds.group.v1;

import static com.linecorp.centraldogma.xds.internal.XdsTestUtil.createGroup;
import static com.linecorp.centraldogma.xds.internal.XdsTestUtil.createGroupAsync;
import static com.linecorp.centraldogma.xds.internal.XdsTestUtil.deleteGroup;
import static net.javacrumbs.jsonunit.fluent.JsonFluentAssert.assertThatJson;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

import java.util.concurrent.CompletableFuture;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

Expand Down Expand Up @@ -52,18 +55,26 @@ protected void configure(CentralDogmaBuilder builder) {

@Test
void createGroupViaHttp() {
AggregatedHttpResponse response = createGroup("foo", dogma.httpClient());
assertThat(response.status()).isSameAs(HttpStatus.OK);
assertThat(response.headers().get("grpc-status")).isEqualTo("0");
final CompletableFuture<AggregatedHttpResponse> future =
createGroupAsync("foo", dogma.httpClient());
final CompletableFuture<AggregatedHttpResponse> future2 =
createGroupAsync("foo", dogma.httpClient());
AggregatedHttpResponse response = future.join();
assertOk(response);
assertThatJson(response.contentUtf8()).isEqualTo("{\"name\":\"groups/foo\"}");

// Cannot create with the same name.
response = createGroup("foo", dogma.httpClient());
response = future2.join();
assertThat(response.status()).isSameAs(HttpStatus.CONFLICT);
assertThat(response.headers().get("grpc-status"))
.isEqualTo(Integer.toString(Status.ALREADY_EXISTS.getCode().value()));
}

private static void assertOk(AggregatedHttpResponse response) {
assertThat(response.status()).isSameAs(HttpStatus.OK);
assertThat(response.headers().get("grpc-status")).isEqualTo("0");
}

@Test
void deleteGroupViaHttp() {
AggregatedHttpResponse response = deleteGroup("groups/bar", dogma.httpClient());
Expand All @@ -75,8 +86,7 @@ void deleteGroupViaHttp() {
// Add permission test.

response = deleteGroup("groups/bar", dogma.httpClient());
assertThat(response.status()).isSameAs(HttpStatus.OK);
assertThat(response.headers().get("grpc-status")).isEqualTo("0");
assertOk(response);
assertThat(response.contentUtf8()).isEqualTo("{}");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import java.io.IOException;
import java.net.URI;
import java.util.concurrent.CompletableFuture;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
Expand Down Expand Up @@ -68,12 +69,17 @@ public final class XdsTestUtil {
static final String CONFIG_SOURCE_CLUSTER_NAME = "dogma/cluster";

public static AggregatedHttpResponse createGroup(String groupId, WebClient webClient) {
return createGroupAsync(groupId, webClient).join();
}

public static CompletableFuture<AggregatedHttpResponse> createGroupAsync(
String groupId, WebClient webClient) {
final RequestHeaders headers =
RequestHeaders.builder(HttpMethod.POST, "/api/v1/xds/groups?group_id=" + groupId)
.set(HttpHeaderNames.AUTHORIZATION, "Bearer anonymous")
.contentType(MediaType.JSON_UTF_8).build();
return webClient.execute(headers, "{\"name\":\"groups/" + groupId + "\"}")
.aggregate().join();
.aggregate();
}

public static AggregatedHttpResponse deleteGroup(String groupName, WebClient webClient) {
Expand Down

0 comments on commit e78a569

Please sign in to comment.