Skip to content

Commit

Permalink
[ResourceCluster] Add basic host routes to v1 API. (#197)
Browse files Browse the repository at this point in the history
* tmp restore

* Interface renaming + cleanup

* ResouceClusterHostRoute

* Fix style error

* Move larage test payload to resource

* Fix java style
  • Loading branch information
Andyz26 authored Apr 25, 2022
1 parent 066d211 commit 48db53a
Show file tree
Hide file tree
Showing 28 changed files with 851 additions and 67 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,15 @@
import io.mantisrx.control.plane.resource.cluster.proto.GetResourceClusterSpecRequest;
import io.mantisrx.control.plane.resource.cluster.proto.ListResourceClusterRequest;
import io.mantisrx.control.plane.resource.cluster.proto.ProvisionResourceClusterRequest;
import io.mantisrx.control.plane.resource.cluster.proto.ResourceClusterAPIProto.DeleteResourceClusterRequest;
import io.mantisrx.control.plane.resource.cluster.proto.ResourceClusterAPIProto.DeleteResourceClusterResponse;
import io.mantisrx.control.plane.resource.cluster.proto.ResourceClusterAPIProto.GetResourceClusterResponse;
import io.mantisrx.control.plane.resource.cluster.proto.ResourceClusterAPIProto.ListResourceClustersResponse;
import io.mantisrx.control.plane.resource.cluster.proto.ResourceClusterProvisionSubmissionResponse;
import io.mantisrx.control.plane.resource.cluster.proto.ScaleResourceRequest;
import io.mantisrx.control.plane.resource.cluster.resourceprovider.IResourceClusterProvider;
import io.mantisrx.control.plane.resource.cluster.resourceprovider.IResourceClusterStorageProvider;
import io.mantisrx.control.plane.resource.cluster.resourceprovider.InMemoryOnlyResourceClusterStorageProvider;
import io.mantisrx.control.plane.resource.cluster.resourceprovider.ResourceClusterProvider;
import io.mantisrx.control.plane.resource.cluster.resourceprovider.ResourceClusterStorageProvider;
import io.mantisrx.control.plane.resource.cluster.writable.ResourceClusterSpecWritable;
import io.mantisrx.master.jobcluster.proto.BaseResponse.ResponseCode;
import io.mantisrx.shaded.com.google.common.annotations.VisibleForTesting;
Expand All @@ -47,26 +49,26 @@ public class ResourceClustersHostManagerActor extends AbstractActorWithTimers {

@VisibleForTesting
static Props props(
final IResourceClusterProvider resourceClusterProvider) {
final ResourceClusterProvider resourceClusterProvider) {
return Props.create(
ResourceClustersHostManagerActor.class,
resourceClusterProvider,
new InMemoryOnlyResourceClusterStorageProvider());
}

public static Props props(
final IResourceClusterProvider resourceClusterProvider,
final IResourceClusterStorageProvider resourceStorageProvider) {
final ResourceClusterProvider resourceClusterProvider,
final ResourceClusterStorageProvider resourceStorageProvider) {
// TODO(andyz): investigate atlas metered-mailbox.
return Props.create(ResourceClustersHostManagerActor.class, resourceClusterProvider, resourceStorageProvider);
}

private final IResourceClusterProvider resourceClusterProvider;
private final IResourceClusterStorageProvider resourceClusterStorageProvider;
private final ResourceClusterProvider resourceClusterProvider;
private final ResourceClusterStorageProvider resourceClusterStorageProvider;

public ResourceClustersHostManagerActor(
final IResourceClusterProvider resourceClusterProvider,
final IResourceClusterStorageProvider resourceStorageProvider) {
final ResourceClusterProvider resourceClusterProvider,
final ResourceClusterStorageProvider resourceStorageProvider) {
this.resourceClusterProvider = resourceClusterProvider;
this.resourceClusterStorageProvider = resourceStorageProvider;
}
Expand All @@ -79,9 +81,32 @@ public Receive createReceive() {
.match(ListResourceClusterRequest.class, this::onListResourceClusterRequest)
.match(GetResourceClusterSpecRequest.class, this::onGetResourceClusterSpecRequest)
.match(ResourceClusterProvisionSubmissionResponse.class, this::onResourceClusterProvisionResponse)
.match(DeleteResourceClusterRequest.class, this::onDeleteResourceCluster)
.build();
}

private void onDeleteResourceCluster(DeleteResourceClusterRequest req) {
/**
* Proper cluster deletion requires handling various cleanups e.g.:
* * Migrate existing jobs.
* * Un-provision cluster resources (nodes, network, storage e.g.).
* * Update internal tracking state and persistent data.
* For now this API will only serve the persistence layer update.
*/

pipe(this.resourceClusterStorageProvider.deregisterCluster(req.getClusterId())
.thenApply(clustersW ->
DeleteResourceClusterResponse.builder()
.responseCode(ResponseCode.SUCCESS)
.build())
.exceptionally(err ->
DeleteResourceClusterResponse.builder()
.message(err.getMessage())
.responseCode(ResponseCode.SERVER_ERROR).build()),
getContext().dispatcher())
.to(getSender());
}

private void onResourceClusterProvisionResponse(ResourceClusterProvisionSubmissionResponse resp) {
this.resourceClusterProvider.getResponseHandler().handleProvisionResponse(resp);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,42 +16,37 @@

package io.mantisrx.control.plane.resource.cluster.proto;

import io.mantisrx.control.plane.resource.cluster.resourceprovider.ResourceClusterProvider;
import io.mantisrx.shaded.com.fasterxml.jackson.annotation.JsonCreator;
import io.mantisrx.shaded.com.fasterxml.jackson.annotation.JsonProperty;
import java.util.Map;
import java.util.Set;
import lombok.Builder;
import lombok.EqualsAndHashCode;
import lombok.NonNull;
import lombok.Singular;
import lombok.Value;

/**
* Contract class to define a Mantis resource cluster. This contract provides the abstraction to provide a generic
* definition from Mantis control perspective, and it's up to the implementations of each
* {@link io.mantisrx.control.plane.resource.cluster.resourceprovider.IResourceClusterProvider} to translate this spec
* {@link ResourceClusterProvider} to translate this spec
* to corresponding framework's cluster/node(s) definition.
*/
@Value
@Builder
public class MantisResourceClusterSpec {

@NonNull
String name;

/**
* ID fields maps to cluster name or spinnaker app name.
*/
@NonNull
String id;

@NonNull
String ownerName;

@NonNull
String ownerEmail;

@NonNull
MantisResourceClusterEnvType envType;

@Singular
Expand Down Expand Up @@ -85,14 +80,11 @@ public MantisResourceClusterSpec(
@Value
@EqualsAndHashCode(onlyExplicitlyIncluded = true)
public static class SkuTypeSpec {
@NonNull
@EqualsAndHashCode.Include
String skuId;

@NonNull
SkuCapacity capacity;

@NonNull
String imageId;

int cpuCoreCount;
Expand Down Expand Up @@ -127,10 +119,13 @@ public SkuTypeSpec(
}
}

/**
* This class defined the capacity required for the given skuId mapping to hosting framework nodes
* e.g. containers/virtual machines.
*/
@Builder
@Value
public static class SkuCapacity {
@NonNull
String skuId;

int minSize;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package io.mantisrx.control.plane.resource.cluster.proto;

import lombok.Builder;
import lombok.NonNull;
import lombok.Value;

/**
Expand All @@ -26,7 +25,6 @@
@Builder
@Value
public class ProvisionResourceClusterRequest {
@NonNull
String clusterId;

MantisResourceClusterSpec clusterSpec;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
public class ResourceClusterAPIProto {

@Value
public static final class ListResourceClustersResponse extends BaseResponse {
public static class ListResourceClustersResponse extends BaseResponse {

@Singular
List<RegisteredResourceCluster> registeredResourceClusters;
Expand All @@ -49,7 +49,7 @@ public static class RegisteredResourceCluster {
}

@Value
public static final class GetResourceClusterResponse extends BaseResponse {
public static class GetResourceClusterResponse extends BaseResponse {

MantisResourceClusterSpec clusterSpec;

Expand All @@ -63,4 +63,21 @@ public GetResourceClusterResponse(
this.clusterSpec = clusterSpec;
}
}

@Value
public static class DeleteResourceClusterResponse extends BaseResponse {
@Builder
public DeleteResourceClusterResponse(
final long requestId,
final ResponseCode responseCode,
final String message) {
super(requestId, responseCode, message);
}
}

@Builder
@Value
public static class DeleteResourceClusterRequest {
String clusterId;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,17 @@

import io.mantisrx.shaded.com.google.common.base.Joiner;
import lombok.Builder;
import lombok.NonNull;
import lombok.Value;

@Builder
@Value
public class ScaleResourceRequest {
@NonNull
String clusterId;

@NonNull
String skuId;

@NonNull
String region;

@NonNull
MantisResourceClusterEnvType envType;

int desireSize;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,16 @@

import io.mantisrx.master.jobcluster.proto.BaseResponse;
import lombok.Builder;
import lombok.NonNull;
import lombok.Value;

@Value
public class ScaleResourceResponse extends BaseResponse {
@NonNull
String clusterId;

@NonNull
String skuId;

@NonNull
String region;

@NonNull
MantisResourceClusterEnvType envType;

int desireSize;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
/**
* [Test only] Store resource storage data in memroy only for testing.
*/
public class InMemoryOnlyResourceClusterStorageProvider implements IResourceClusterStorageProvider {
public class InMemoryOnlyResourceClusterStorageProvider implements ResourceClusterStorageProvider {
Map<String, ResourceClusterSpecWritable> clusters = new HashMap<>();

@Override
Expand All @@ -35,6 +35,12 @@ public CompletionStage<ResourceClusterSpecWritable> registerAndUpdateClusterSpec
return CompletableFuture.completedFuture(spec);
}

@Override
public CompletionStage<RegisteredResourceClustersWritable> deregisterCluster(String clusterId) {
this.clusters.remove(clusterId);
return getRegisteredResourceClustersWritable();
}

@Override
public CompletionStage<RegisteredResourceClustersWritable> getRegisteredResourceClustersWritable() {
RegisteredResourceClustersWritable.RegisteredResourceClustersWritableBuilder builder =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class NoopResourceClusterResponseHandler implements IResourceClusterResponseHandler {
public class NoopResourceClusterResponseHandler implements ResourceClusterResponseHandler {
@Override
public void handleProvisionResponse(ResourceClusterProvisionSubmissionResponse resp) {
log.info(resp.toString());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
* This interface provides the API to connect resource cluster management actor to actual
* implementations of different resource cluster clients e.g. k8s.
*/
public interface IResourceClusterProvider {
public interface ResourceClusterProvider {
/**
* Provision a new resource cluster using the given spec. This operation should be idempotent.
* The returned CompletionStage instance is to indicate whether the provision has been
Expand All @@ -44,5 +44,5 @@ CompletionStage<ResourceClusterProvisionSubmissionResponse> provisionClusterIfNo
*/
CompletionStage<ScaleResourceResponse> scaleResource(ScaleResourceRequest scaleRequest);

IResourceClusterResponseHandler getResponseHandler();
ResourceClusterResponseHandler getResponseHandler();
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@
import io.mantisrx.control.plane.resource.cluster.proto.ResourceClusterProvisionSubmissionResponse;

/**
* Callback handler for {@link IResouceClusterProvider} responses.
* Callback handler for {@link ResourceClusterProvider} responses.
*/
public interface IResourceClusterResponseHandler {
public interface ResourceClusterResponseHandler {
void handleProvisionResponse(ResourceClusterProvisionSubmissionResponse resp);
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,15 @@
/**
* Interface for persisting resource cluster related data.
*/
public interface IResourceClusterStorageProvider {
public interface ResourceClusterStorageProvider {
/**
* Register and save the given cluster spec. Once the returned CompletionStage
* finishes successfully the given cluster should be available in list cluster response.
*/
CompletionStage<ResourceClusterSpecWritable> registerAndUpdateClusterSpec(ResourceClusterSpecWritable spec);

CompletionStage<RegisteredResourceClustersWritable> deregisterCluster(String clusterId);

CompletionStage<RegisteredResourceClustersWritable> getRegisteredResourceClustersWritable();

CompletionStage<ResourceClusterSpecWritable> getResourceClusterSpecWritable(String id);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import akka.stream.javadsl.Source;
import akka.util.ByteString;
import io.mantisrx.control.plane.resource.cluster.writable.RegisteredResourceClustersWritable;
import io.mantisrx.control.plane.resource.cluster.writable.RegisteredResourceClustersWritable.RegisteredResourceClustersWritableBuilder;
import io.mantisrx.control.plane.resource.cluster.writable.ResourceClusterSpecWritable;
import io.mantisrx.shaded.com.fasterxml.jackson.databind.ObjectMapper;
import io.mantisrx.shaded.com.fasterxml.jackson.databind.ObjectReader;
Expand All @@ -36,14 +37,15 @@
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Objects;
import java.util.concurrent.CompletionStage;
import lombok.extern.slf4j.Slf4j;

/**
* A simple file-based implementation for {@link IResourceClusterStorageProvider}. Not meant for production usage.
* A simple file-based implementation for {@link ResourceClusterStorageProvider}. Not meant for production usage.
*/
@Slf4j
public class SimpleFileResourceClusterStorageProvider implements IResourceClusterStorageProvider {
public class SimpleFileResourceClusterStorageProvider implements ResourceClusterStorageProvider {
public final static String SPOOL_DIR = "/tmp/MantisSpool";

private final static String CLUSTER_LIST_FILE_NAME = "mantisResourceClusterRegistrations";
Expand Down Expand Up @@ -85,6 +87,28 @@ public CompletionStage<ResourceClusterSpecWritable> registerAndUpdateClusterSpec
return fut;
}

@Override
public CompletionStage<RegisteredResourceClustersWritable> deregisterCluster(String clusterId) {
log.info("Starting deregisterCluster: {}", clusterId);
CompletionStage<RegisteredResourceClustersWritable> fut =
Source
.single(clusterId)
.mapAsync(1, clusterSpecW -> getRegisteredResourceClustersWritable().thenApplyAsync(rc -> {
RegisteredResourceClustersWritableBuilder rcBuilder = RegisteredResourceClustersWritable.builder();

rc.getClusters().entrySet().stream()
.filter(kv -> !Objects.equals(clusterId, kv.getKey()))
.forEach(kv -> rcBuilder.cluster(kv.getKey(), kv.getValue()));
return rcBuilder.build();
})
)
.mapAsync(1, rc -> updateRegisteredClusters(rc))
.mapAsync(1, notUsed -> getRegisteredResourceClustersWritable())
.runWith(Sink.last(), system);
log.info("Return future on deregisterCluster: {}", clusterId);
return fut;
}

public CompletionStage<ResourceClusterSpecWritable> updateClusterSpecImpl(ResourceClusterSpecWritable spec) {

Sink<ByteString, CompletionStage<IOResult>> fileSink = FileIO.toPath(
Expand Down
Loading

0 comments on commit 48db53a

Please sign in to comment.