Skip to content

Commit

Permalink
Merge pull request GoogleCloudPlatform#3602 from GoogleCloudPlatform/…
Browse files Browse the repository at this point in the history
…mock_managedkafka

mockgcp: add support for ManagedKafkaCluster
  • Loading branch information
google-oss-prow[bot] authored Feb 11, 2025
2 parents 69081e7 + 8dc3718 commit b428ec0
Show file tree
Hide file tree
Showing 8 changed files with 379 additions and 543 deletions.
2 changes: 2 additions & 0 deletions config/tests/samples/create/harness.go
Original file line number Diff line number Diff line change
Expand Up @@ -824,6 +824,8 @@ func MaybeSkip(t *testing.T, name string, resources []*unstructured.Unstructured
case schema.GroupKind{Group: "logging.cnrm.cloud.google.com", Kind: "LoggingLogSink"}:
case schema.GroupKind{Group: "logging.cnrm.cloud.google.com", Kind: "LoggingLogView"}:

case schema.GroupKind{Group: "managedkafka.cnrm.cloud.google.com", Kind: "ManagedKafkaCluster"}:

case schema.GroupKind{Group: "monitoring.cnrm.cloud.google.com", Kind: "MonitoringAlertPolicy"}:
case schema.GroupKind{Group: "monitoring.cnrm.cloud.google.com", Kind: "MonitoringDashboard"}:
case schema.GroupKind{Group: "monitoring.cnrm.cloud.google.com", Kind: "MonitoringGroup"}:
Expand Down
2 changes: 2 additions & 0 deletions mockgcp/mock_http_roundtrip.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ import (
"github.com/GoogleCloudPlatform/k8s-config-connector/mockgcp/mockiam"
"github.com/GoogleCloudPlatform/k8s-config-connector/mockgcp/mockkms"
"github.com/GoogleCloudPlatform/k8s-config-connector/mockgcp/mocklogging"
"github.com/GoogleCloudPlatform/k8s-config-connector/mockgcp/mockmanagedkafka"
"github.com/GoogleCloudPlatform/k8s-config-connector/mockgcp/mockmonitoring"
"github.com/GoogleCloudPlatform/k8s-config-connector/mockgcp/mocknetworkconnectivity"
"github.com/GoogleCloudPlatform/k8s-config-connector/mockgcp/mocknetworkservices"
Expand Down Expand Up @@ -221,6 +222,7 @@ func NewMockRoundTripper(ctx context.Context, k8sClient client.Client, storage s
services = append(services, mockvpcaccess.New(env, storage))
services = append(services, mockapigee.New(env, storage))
services = append(services, mockbigqueryreservation.New(env, storage))
services = append(services, mockmanagedkafka.New(env, storage))

for _, service := range services {
service.Register(server)
Expand Down
213 changes: 213 additions & 0 deletions mockgcp/mockmanagedkafka/cluster.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,213 @@
// Copyright 2025 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// +tool:mockgcp-support
// proto.service: google.cloud.managedkafka.v1.ManagedKafka
// proto.message: google.cloud.managedkafka.v1.Cluster

package mockmanagedkafka

import (
"context"
"fmt"
"strings"
"time"

"google.golang.org/genproto/googleapis/longrunning"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/emptypb"
"google.golang.org/protobuf/types/known/timestamppb"

"github.com/GoogleCloudPlatform/k8s-config-connector/mockgcp/common/projects"
pb "github.com/GoogleCloudPlatform/k8s-config-connector/mockgcp/generated/mockgcp/cloud/managedkafka/v1"
)

type managedKafka struct {
*MockService
pb.UnimplementedManagedKafkaServer
}

func (s *managedKafka) CreateCluster(ctx context.Context, req *pb.CreateClusterRequest) (*longrunning.Operation, error) {
reqName := fmt.Sprintf("%s/clusters/%s", req.GetParent(), req.GetClusterId())
name, err := s.parseClusterName(reqName)
if err != nil {
return nil, err
}

fqn := name.String()
now := time.Now()

obj := proto.Clone(req.GetCluster()).(*pb.Cluster)
obj.Name = fqn
obj.CreateTime = timestamppb.New(now)
obj.State = pb.Cluster_CREATING
if err := s.storage.Create(ctx, fqn, obj); err != nil {
return nil, err
}

prefix := fmt.Sprintf("projects/%s/locations/%s", name.Project.ID, name.Location)
metadata := &pb.OperationMetadata{
ApiVersion: "v1",
CreateTime: timestamppb.New(now),
RequestedCancellation: false,
Target: name.String(),
Verb: "create",
}
return s.operations.StartLRO(ctx, prefix, metadata, func() (proto.Message, error) {
obj.State = pb.Cluster_ACTIVE
obj.UpdateTime = timestamppb.New(now)
if !obj.GetSatisfiesPzi() { // hack: we don't support PZI yet. This is to match the real GCP
obj.SatisfiesPzi = proto.Bool(false)
}
if !obj.GetSatisfiesPzs() { // hack: we don't support PZS yet. This is to match the real GCP
obj.SatisfiesPzs = proto.Bool(false)
}
metadata.EndTime = timestamppb.Now()
if err := s.storage.Update(ctx, fqn, obj); err != nil {
return nil, err
}
return obj, nil
})
}

func (s *managedKafka) GetCluster(ctx context.Context, req *pb.GetClusterRequest) (*pb.Cluster, error) {
name, err := s.parseClusterName(req.Name)
if err != nil {
return nil, err
}

fqn := name.String()

obj := &pb.Cluster{}
if err := s.storage.Get(ctx, fqn, obj); err != nil {
if status.Code(err) == codes.NotFound {
return nil, status.Errorf(codes.NotFound, "Resource '%v' was not found", name)
}
return nil, err
}

return obj, nil
}

func (s *managedKafka) UpdateCluster(ctx context.Context, req *pb.UpdateClusterRequest) (*longrunning.Operation, error) {
name, err := s.parseClusterName(req.GetCluster().GetName())
if err != nil {
return nil, err
}
fqn := name.String()
obj := &pb.Cluster{}
if err := s.storage.Get(ctx, fqn, obj); err != nil {
return nil, err
}
paths := req.GetUpdateMask().GetPaths()
if len(paths) == 0 {
return nil, status.Errorf(codes.InvalidArgument, "update_mask must be provided")
}
// updateMask=capacityConfig.memoryBytes%2CcapacityConfig.vcpuCount%2CgcpConfig.accessConfig.networkConfigs%2Cname%2CrebalanceConfig.mode
for _, path := range paths {
switch path {
case "capacityConfig.memoryBytes":
obj.CapacityConfig.MemoryBytes = req.GetCluster().GetCapacityConfig().GetMemoryBytes()
case "capacityConfig.vcpuCount":
obj.CapacityConfig.VcpuCount = req.GetCluster().GetCapacityConfig().GetVcpuCount()
case "gcpConfig.accessConfig.networkConfigs":
obj.GetGcpConfig().AccessConfig.NetworkConfigs = req.GetCluster().GetGcpConfig().GetAccessConfig().GetNetworkConfigs()
case "name":
obj.Name = req.GetCluster().GetName()
case "rebalanceConfig.mode":
obj.RebalanceConfig.Mode = req.GetCluster().GetRebalanceConfig().GetMode()
default:
return nil, status.Errorf(codes.InvalidArgument, "field %q is not yet handled in mock", path)
}
}

obj.UpdateTime = timestamppb.Now()

if err := s.storage.Update(ctx, fqn, obj); err != nil {
return nil, err
}

prefix := fmt.Sprintf("projects/%s/locations/%s", name.Project.ID, name.Location)
metadata := &pb.OperationMetadata{
ApiVersion: "v1",
CreateTime: timestamppb.Now(),
RequestedCancellation: false,
Target: name.String(),
Verb: "update",
}

return s.operations.StartLRO(ctx, prefix, metadata, func() (proto.Message, error) {
obj.State = pb.Cluster_ACTIVE
metadata.EndTime = timestamppb.Now()
if err := s.storage.Update(ctx, fqn, obj); err != nil {
return nil, err
}
return obj, nil
})
}

func (s *managedKafka) DeleteCluster(ctx context.Context, req *pb.DeleteClusterRequest) (*longrunning.Operation, error) {
name, err := s.parseClusterName(req.Name)
if err != nil {
return nil, err
}

fqn := name.String()

deleted := &pb.Cluster{}
if err := s.storage.Delete(ctx, fqn, deleted); err != nil {
return nil, err
}

prefix := fmt.Sprintf("projects/%s/locations/%s", name.Project.ID, name.Location)
metadata := &pb.OperationMetadata{
ApiVersion: "v1",
CreateTime: timestamppb.Now(),
EndTime: timestamppb.Now(),
RequestedCancellation: false,
Target: name.String(),
Verb: "delete",
}
return s.operations.DoneLRO(ctx, prefix, metadata, &emptypb.Empty{})
}

type clusterName struct {
Project *projects.ProjectData
Location string
Cluster string
}

func (n *clusterName) String() string {
return fmt.Sprintf("projects/%s/locations/%s/clusters/%s", n.Project.ID, n.Location, n.Cluster)
}

func (s *MockService) parseClusterName(name string) (*clusterName, error) {
tokens := strings.Split(name, "/")
if len(tokens) == 6 && tokens[0] == "projects" && tokens[2] == "locations" && tokens[4] == "clusters" {
project, err := s.Projects.GetProjectByID(tokens[1])
if err != nil {
return nil, err
}

return &clusterName{
Project: project,
Location: tokens[3],
Cluster: tokens[5],
}, nil
}

return nil, status.Errorf(codes.InvalidArgument, "invalid cluster name %q", name)
}
76 changes: 76 additions & 0 deletions mockgcp/mockmanagedkafka/service.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
// Copyright 2025 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package mockmanagedkafka

import (
"context"
"net/http"

"google.golang.org/grpc"

"github.com/GoogleCloudPlatform/k8s-config-connector/mockgcp/common"
"github.com/GoogleCloudPlatform/k8s-config-connector/mockgcp/common/httpmux"
"github.com/GoogleCloudPlatform/k8s-config-connector/mockgcp/common/operations"
pb "github.com/GoogleCloudPlatform/k8s-config-connector/mockgcp/generated/mockgcp/cloud/managedkafka/v1"
"github.com/GoogleCloudPlatform/k8s-config-connector/mockgcp/pkg/storage"
)

// MockService represents a mocked managedkafka service.
type MockService struct {
*common.MockEnvironment
storage storage.Storage

operations *operations.Operations

v1 *managedKafka
}

// New creates a MockService.
func New(env *common.MockEnvironment, storage storage.Storage) *MockService {
s := &MockService{
MockEnvironment: env,
storage: storage,
operations: operations.NewOperationsService(storage),
}
s.v1 = &managedKafka{MockService: s}
return s
}

func (s *MockService) ExpectedHosts() []string {
return []string{"managedkafka.googleapis.com"}
}

func (s *MockService) Register(grpcServer *grpc.Server) {
pb.RegisterManagedKafkaServer(grpcServer, s.v1)
}

func (s *MockService) NewHTTPMux(ctx context.Context, conn *grpc.ClientConn) (http.Handler, error) {
mux, err := httpmux.NewServeMux(ctx, conn, httpmux.Options{},
pb.RegisterManagedKafkaHandler,
s.operations.RegisterOperationsPath("/v1/{prefix=**}/operations/{name}"),
)
if err != nil {
return nil, err
}

// Returns slightly non-standard errors
mux.RewriteError = func(ctx context.Context, error *httpmux.ErrorResponse) {
if error.Code == 404 {
error.Errors = nil
}
}

return mux, nil
}
1 change: 1 addition & 0 deletions mockgcp/mockserviceusage/knownservices.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ var allServices = []string{
"anthos.googleapis.com",
"anthosconfigmanagement.googleapis.com",
"anthospolicycontroller.googleapis.com",
"managedkafka.googleapis.com",
"multiclusteringress.googleapis.com",
"multiclusterservicediscovery.googleapis.com",
"mesh.googleapis.com",
Expand Down
2 changes: 2 additions & 0 deletions mockgcp/mockserviceusage/serviceusagev1beta1.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ func (s *ServiceUsageV1Beta1) GenerateServiceIdentity(ctx context.Context, req *
case "apigee.googleapis.com":
identity.Email = "service-" + strconv.FormatInt(name.Project.Number, 10) + "@gcp-sa-apigee.iam.gserviceaccount.com"
identity.UniqueId = "123456789008"
case "managedkafka.googleapis.com":
identity.Email = "service-" + strconv.FormatInt(name.Project.Number, 10) + "@gcp-sa-managedkafka.iam.gserviceaccount.com"
default:
return nil, fmt.Errorf("generating serviceIdentity for service %q not implemented in mock", name.ServiceName)
}
Expand Down
Loading

0 comments on commit b428ec0

Please sign in to comment.