Skip to content

Commit

Permalink
remove webhook blocking etcd scaling
Browse files Browse the repository at this point in the history
  • Loading branch information
ahreehong committed Jan 12, 2024
1 parent 4d25b12 commit ae3b0c8
Show file tree
Hide file tree
Showing 10 changed files with 556 additions and 15 deletions.
11 changes: 0 additions & 11 deletions api/v1beta1/etcdadmcluster_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@ limitations under the License.
package v1beta1

import (
"fmt"

apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/validation/field"
Expand Down Expand Up @@ -73,15 +71,6 @@ func (r *EtcdadmCluster) ValidateCreate() error {
func (r *EtcdadmCluster) ValidateUpdate(old runtime.Object) error {
etcdadmclusterlog.Info("validate update", "name", r.Name)

oldEtcdadmCluster, ok := old.(*EtcdadmCluster)
if !ok {
return apierrors.NewBadRequest(fmt.Sprintf("expected an EtcdadmCluster object but got a %T", old))
}

if *oldEtcdadmCluster.Spec.Replicas != *r.Spec.Replicas {
return field.Invalid(field.NewPath("spec", "replicas"), r.Spec.Replicas, "field is immutable")
}

allErrs := r.validateCommon()
if len(allErrs) > 0 {
return apierrors.NewInvalid(GroupVersion.WithKind("EtcdadmCluster").GroupKind(), r.Name, allErrs)
Expand Down
156 changes: 156 additions & 0 deletions api/v1beta1/etcdadmcluster_webhook_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
/*
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package v1beta1

import (
"testing"

. "github.com/onsi/gomega"
corev1 "k8s.io/api/core/v1"
"k8s.io/utils/pointer"
)

// TestValidateCreate runs ValidateCreate over a table of EtcdadmCluster
// objects and checks that the returned error is either nil or contains the
// expected message fragment.
func TestValidateCreate(t *testing.T) {
	// withReplicas builds a cluster whose spec sets nothing but the replica count.
	withReplicas := func(n int32) *EtcdadmCluster {
		return &EtcdadmCluster{
			Spec: EtcdadmClusterSpec{Replicas: pointer.Int32(n)},
		}
	}

	testCases := []struct {
		name    string
		cluster *EtcdadmCluster
		// wantErr is a substring of the expected error; empty means no error.
		wantErr string
	}{
		{
			name:    "valid etcdadm cluster",
			cluster: withReplicas(3),
		},
		{
			name:    "no replicas field",
			cluster: &EtcdadmCluster{},
			wantErr: "spec.replicas: Required value: is required",
		},
		{
			name:    "zero replicas",
			cluster: withReplicas(0),
			wantErr: "cannot be less than or equal to 0",
		},
		{
			name:    "even replicas",
			cluster: withReplicas(2),
			wantErr: "Forbidden: etcd cluster cannot have an even number of nodes",
		},
		{
			// The infrastructure template namespace differs from the
			// (empty) namespace of the cluster object itself.
			name: "mismatched namespace",
			cluster: &EtcdadmCluster{
				Spec: EtcdadmClusterSpec{
					Replicas:               pointer.Int32(3),
					InfrastructureTemplate: corev1.ObjectReference{Namespace: "fail"},
				},
			},
			wantErr: "Invalid value: \"fail\": must match metadata.namespace",
		},
	}

	for _, tc := range testCases {
		t.Run(tc.name, func(t *testing.T) {
			g := NewWithT(t)

			err := tc.cluster.ValidateCreate()
			if tc.wantErr != "" {
				g.Expect(err).To(MatchError(ContainSubstring(tc.wantErr)))
				return
			}
			g.Expect(err).To(BeNil())
		})
	}
}
// TestValidateUpdate checks ValidateUpdate on pairs of old and new
// EtcdadmCluster objects. Changing the replica count between old and new
// (scale up or scale down) is expected to be accepted, while a new spec that
// is invalid on its own (zero replicas) is expected to be rejected.
func TestValidateUpdate(t *testing.T) {
	cases := map[string]struct {
		oldConf *EtcdadmCluster
		newConf *EtcdadmCluster
		// expectErr is a substring of the expected error; empty means no error.
		expectErr string
	}{
		"valid scale up": {
			oldConf: &EtcdadmCluster{
				Spec: EtcdadmClusterSpec{
					Replicas: pointer.Int32(3),
				},
				Status: EtcdadmClusterStatus{},
			},
			newConf: &EtcdadmCluster{
				Spec: EtcdadmClusterSpec{
					Replicas: pointer.Int32(5),
				},
				Status: EtcdadmClusterStatus{},
			},
			expectErr: "",
		},
		"valid scale down": {
			oldConf: &EtcdadmCluster{
				Spec: EtcdadmClusterSpec{
					Replicas: pointer.Int32(3),
				},
				Status: EtcdadmClusterStatus{},
			},
			newConf: &EtcdadmCluster{
				Spec: EtcdadmClusterSpec{
					Replicas: pointer.Int32(1),
				},
				Status: EtcdadmClusterStatus{},
			},
			expectErr: "",
		},
		"zero replicas": {
			oldConf: &EtcdadmCluster{
				Spec: EtcdadmClusterSpec{
					Replicas: pointer.Int32(3),
				},
				Status: EtcdadmClusterStatus{},
			},
			newConf: &EtcdadmCluster{
				Spec: EtcdadmClusterSpec{
					Replicas: pointer.Int32(0),
				},
				Status: EtcdadmClusterStatus{},
			},
			expectErr: "cannot be less than or equal to 0",
		},
	}
	for name, tt := range cases {
		t.Run(name, func(t *testing.T) {
			g := NewWithT(t)

			// Invoke the webhook exactly once and assert on that single
			// result; MatchError already fails on a nil error, so no separate
			// "did it fail" check is needed.
			err := tt.newConf.ValidateUpdate(tt.oldConf)
			if tt.expectErr == "" {
				g.Expect(err).To(BeNil())
				return
			}
			g.Expect(err).To(MatchError(ContainSubstring(tt.expectErr)))
		})
	}
}
19 changes: 17 additions & 2 deletions controllers/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,11 +278,18 @@ func (r *EtcdadmClusterReconciler) reconcile(ctx context.Context, etcdCluster *e
// Etcd machines rollout due to configuration changes (e.g. upgrades) takes precedence over other operations.
needRollout := ep.MachinesNeedingRollout()
numNeedRollout := len(needRollout)

ep2, err := NewEtcdPlane(ctx, r.Client, cluster, etcdCluster, etcdMachines)
if err != nil {
return ctrl.Result{}, errors.Wrap(err, "Error initializing internal object EtcdPlane")
}
numOutOfDateMachines := len(ep2.OutOfDateMachines())

switch {
case len(needRollout) > 0:
log.Info("Etcd cluster needs a rollout", "totalMachines", numAllEtcdMachines, "needRollout", numNeedRollout)
// NOTE: we need to check that numAllEtcdMachines is not more than 2X replicas, as this will create new replicas to infinity
if numAllEtcdMachines > 2*desiredReplicas {
// NOTE: There have been issues with etcd rolling out new machines indefinitely. Add an upper limit as a fail-safe against this situation.
if numAllEtcdMachines > numOutOfDateMachines+desiredReplicas {
log.Info("Cluster has reached the max number of machines, won't create new machines until at least one is deleted", "totalMachines", numAllEtcdMachines)
conditions.MarkFalse(ep.EC, etcdv1.EtcdMachinesSpecUpToDateCondition, etcdv1.MaxNumberOfEtcdMachinesReachedReason, clusterv1.ConditionSeverityWarning, "Etcd cluster has %d total machines, maximum number of machines is %d", numAllEtcdMachines, 2*desiredReplicas)
return ctrl.Result{}, nil
Expand Down Expand Up @@ -364,6 +371,14 @@ func (r *EtcdadmClusterReconciler) reconcile(ctx context.Context, etcdCluster *e
log.Info("Scaling down etcd cluster", "Desired", desiredReplicas, "Existing", numCurrentMachines)
// The last parameter corresponds to Machines that need to be rolled out, eg during upgrade, should always be empty here.
return r.scaleDownEtcdCluster(ctx, etcdCluster, cluster, ep, collections.Machines{})
// In the case that we do a scale operation on etcd clusters, remove upgradeInProgressAnnotation once scale is complete and there
// are no more out of date machines
case numCurrentMachines == desiredReplicas && numNeedRollout == 0:
_, hasUpgradeAnnotation := etcdCluster.Annotations[etcdv1.UpgradeInProgressAnnotation]
if hasUpgradeAnnotation {
log.Info("Removing update in progress annotation", "upgrading", hasUpgradeAnnotation)
delete(etcdCluster.Annotations, etcdv1.UpgradeInProgressAnnotation)
}
}

return ctrl.Result{}, nil
Expand Down
149 changes: 149 additions & 0 deletions controllers/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package controllers

import (
"context"
"net/http"
"testing"
"time"

Expand All @@ -27,6 +28,8 @@ import (

etcdbootstrapv1 "github.com/aws/etcdadm-bootstrap-provider/api/v1beta1"
etcdv1 "github.com/aws/etcdadm-controller/api/v1beta1"
"github.com/aws/etcdadm-controller/controllers/mocks"
"github.com/golang/mock/gomock"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
corev1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -458,6 +461,110 @@ func TestReconcileNeedsRollOutEtcdCluster(t *testing.T) {
g.Expect(len(machineList.Items)).To(Equal(2))
}

// TestReconcileScaleEtcdClusterUpgradeDone reconciles an initialized etcd
// cluster that carries the upgrade-in-progress annotation and already has as
// many machines (5) as desired replicas. It expects the reconcile to leave the
// machine count unchanged and the annotation to be gone afterwards.
func TestReconcileScaleEtcdClusterUpgradeDone(t *testing.T) {
	g := NewWithT(t)

	capiCluster := newClusterWithExternalEtcd()
	etcdCluster := newEtcdadmCluster(capiCluster)

	// Mimic the state after the CAPI machine controller has flagged the
	// cluster as initialized, with an upgrade still marked as in progress.
	etcdCluster.Status.Initialized = true
	etcdCluster.Annotations = map[string]string{
		etcdv1.UpgradeInProgressAnnotation: "upgrading",
	}

	// The etcdadm controller has observed status.Initialized and set the
	// matching condition.
	conditions.MarkTrue(etcdCluster, etcdv1.InitializedCondition)

	// Three machines are created against the initial spec, then two more
	// after the desired replica count is raised to five.
	var etcdMachines []client.Object
	for i := 0; i < 3; i++ {
		etcdMachines = append(etcdMachines, newEtcdMachine(etcdCluster, capiCluster))
	}
	etcdCluster.Spec.Replicas = pointer.Int32(5)
	for i := 0; i < 2; i++ {
		etcdMachines = append(etcdMachines, newEtcdMachine(etcdCluster, capiCluster))
	}

	seed := []client.Object{
		capiCluster,
		etcdCluster,
		infraTemplate.DeepCopy(),
	}
	seed = append(seed, etcdMachines...)

	kubeClient := fake.NewClientBuilder().
		WithScheme(setupScheme()).
		WithObjects(seed...).
		Build()

	reconciler := &EtcdadmClusterReconciler{
		Client:         kubeClient,
		uncachedClient: kubeClient,
		Log:            log.Log,
	}

	request := ctrl.Request{NamespacedName: util.ObjectKey(etcdCluster)}
	_, err := reconciler.Reconcile(ctx, request)
	g.Expect(err).NotTo(HaveOccurred())

	// No machines should have been added or removed.
	machines := &clusterv1.MachineList{}
	g.Expect(kubeClient.List(context.Background(), machines, client.InNamespace("test"))).To(Succeed())
	g.Expect(len(machines.Items)).To(Equal(5))

	// The upgrade annotation should no longer be set on the stored object.
	stored := &etcdv1.EtcdadmCluster{}
	g.Expect(kubeClient.Get(ctx, util.ObjectKey(etcdCluster), stored)).To(Succeed())
	g.Expect(stored.Annotations[etcdv1.UpgradeInProgressAnnotation]).To(BeEmpty())
}

// TestReconcileScaleDownEtcdCluster exercises scaling an etcd cluster down
// from 3 replicas to 1.
//
// The first reconcile runs with the desired replica count equal to the number
// of existing machines and is expected to keep all 3 machines and clear the
// upgrade-in-progress annotation. The desired replica count is then lowered to
// 1; the second reconcile is expected to remove one etcd member, delete one
// machine, and leave the annotation unset.
func TestReconcileScaleDownEtcdCluster(t *testing.T) {
	g := NewWithT(t)

	etcdTest, fakeKubernetesClient, mockEtcd, mockHttpClient := setupEtcdScalingTest(t)
	// Always hand the reconciler the mocked etcd client.
	getEtcdClient := func(ctx context.Context, cluster *clusterv1.Cluster, endpoints string) (EtcdClient, error) {
		return mockEtcd, nil
	}

	r := &EtcdadmClusterReconciler{
		Client:         fakeKubernetesClient,
		uncachedClient: fakeKubernetesClient,
		Log:            log.Log,
		GetEtcdClient:  getEtcdClient,
	}

	// Register the stub HTTP client under the cluster UID and replace the
	// port probe with the test mock.
	r.etcdHealthCheckConfig.clusterToHttpClient.Store(etcdTest.cluster.UID, mockHttpClient)
	r.SetIsPortOpen(isPortOpenMock)

	mockEtcd.EXPECT().MemberList(gomock.Any()).Return(etcdTest.getMemberListResponse(), nil)
	mockEtcd.EXPECT().Close()

	_, err := r.Reconcile(ctx, ctrl.Request{NamespacedName: util.ObjectKey(etcdTest.etcdadmCluster)})
	g.Expect(err).NotTo(HaveOccurred())

	machineList := &clusterv1.MachineList{}
	g.Expect(fakeKubernetesClient.List(context.Background(), machineList, client.InNamespace("test"))).To(Succeed())
	g.Expect(machineList.Items).To(HaveLen(3))

	g.Expect(fakeKubernetesClient.Get(ctx, util.ObjectKey(etcdTest.etcdadmCluster), etcdTest.etcdadmCluster)).To(Succeed())
	_, upgradeInProgress := etcdTest.etcdadmCluster.Annotations[etcdv1.UpgradeInProgressAnnotation]
	g.Expect(upgradeInProgress).To(BeFalse())

	// update desired replicas to be 1
	etcdTest.etcdadmCluster.Spec.Replicas = pointer.Int32(1)
	g.Expect(fakeKubernetesClient.Update(ctx, etcdTest.etcdadmCluster)).To(Succeed())

	// Refresh the local copy after the update; a failure here would leave the
	// rest of the test working with stale data, so it must not be ignored.
	g.Expect(fakeKubernetesClient.Get(ctx, util.ObjectKey(etcdTest.etcdadmCluster), etcdTest.etcdadmCluster)).To(Succeed())

	mockEtcd.EXPECT().MemberList(gomock.Any()).Return(etcdTest.getMemberListResponse(), nil)
	mockEtcd.EXPECT().MemberRemove(gomock.Any(), gomock.Any()).Return(etcdTest.getMemberRemoveResponse(), nil)
	mockEtcd.EXPECT().Close()

	_, err = r.Reconcile(ctx, ctrl.Request{NamespacedName: util.ObjectKey(etcdTest.etcdadmCluster)})
	g.Expect(err).NotTo(HaveOccurred())

	g.Expect(etcdTest.getDeletedMachines(fakeKubernetesClient)).To(HaveLen(1))

	updatedEtcdadmCluster := &etcdv1.EtcdadmCluster{}
	g.Expect(fakeKubernetesClient.Get(ctx, util.ObjectKey(etcdTest.etcdadmCluster), updatedEtcdadmCluster)).To(Succeed())
	g.Expect(updatedEtcdadmCluster.Annotations[etcdv1.UpgradeInProgressAnnotation]).To(BeEmpty())
}

// newClusterWithExternalEtcd returns a CAPI cluster object with a managed external etcd ref
func newClusterWithExternalEtcd() *clusterv1.Cluster {
return &clusterv1.Cluster{
Expand Down Expand Up @@ -542,3 +649,45 @@ func newEtcdMachine(etcdadmCluster *etcdv1.EtcdadmCluster, cluster *clusterv1.Cl
},
}
}

// setupEtcdScalingTest prepares everything a scaling test needs: a
// three-machine etcdadmClusterTest fixture marked as created and initialized
// (with the upgrade-in-progress annotation set), a fake Kubernetes client
// seeded with the fixture's objects, a gomock etcd client, and an HTTP client
// whose transport always answers with getHealthyEtcdResponse().
func setupEtcdScalingTest(t *testing.T) (*etcdadmClusterTest, client.WithWatch, *mocks.MockEtcdClient, *http.Client) {
	etcdMock := mocks.NewMockEtcdClient(gomock.NewController(t))

	fixture := newEtcdadmClusterTest(3)
	fixture.buildClusterWithExternalEtcd()
	fixture.etcdadmCluster.Status.CreationComplete = true

	// Mimic the state after the CAPI machine controller has flagged the
	// cluster as initialized, with an upgrade still marked as in progress.
	fixture.etcdadmCluster.Status.Initialized = true
	fixture.etcdadmCluster.Annotations = map[string]string{
		etcdv1.UpgradeInProgressAnnotation: "upgrading",
	}
	// The init address is taken from the first machine's first address and
	// set before the init secret is built.
	fixture.etcdadmCluster.Status.InitMachineAddress = fixture.machines[0].Status.Addresses[0].Address
	fixture.newInitSecret()

	// The etcdadm controller has observed status.Initialized and set the
	// matching condition.
	conditions.MarkTrue(fixture.etcdadmCluster, etcdv1.InitializedCondition)

	seed := make([]client.Object, 0, 4+len(fixture.machines))
	seed = append(seed,
		infraTemplate.DeepCopy(),
		fixture.cluster,
		fixture.etcdadmCluster,
		fixture.initSecret,
	)
	for _, m := range fixture.machines {
		seed = append(seed, m)
	}

	kubeClient := fake.NewClientBuilder().
		WithScheme(setupScheme()).
		WithObjects(seed...).
		Build()

	// Every request through this client gets the canned healthy response.
	healthyTransport := RoundTripperFunc(func(req *http.Request) (*http.Response, error) {
		return getHealthyEtcdResponse(), nil
	})
	httpClient := &http.Client{Transport: healthyTransport}

	return fixture, kubeClient, etcdMock, httpClient
}
Loading

0 comments on commit ae3b0c8

Please sign in to comment.