Skip to content

Commit

Permalink
remove webhook blocking etcd scaling
Browse files Browse the repository at this point in the history
  • Loading branch information
ahreehong committed Dec 27, 2023
1 parent 4d25b12 commit fec236f
Show file tree
Hide file tree
Showing 8 changed files with 501 additions and 14 deletions.
11 changes: 0 additions & 11 deletions api/v1beta1/etcdadmcluster_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@ limitations under the License.
package v1beta1

import (
"fmt"

apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/validation/field"
Expand Down Expand Up @@ -73,15 +71,6 @@ func (r *EtcdadmCluster) ValidateCreate() error {
func (r *EtcdadmCluster) ValidateUpdate(old runtime.Object) error {
etcdadmclusterlog.Info("validate update", "name", r.Name)

oldEtcdadmCluster, ok := old.(*EtcdadmCluster)
if !ok {
return apierrors.NewBadRequest(fmt.Sprintf("expected an EtcdadmCluster object but got a %T", old))
}

if *oldEtcdadmCluster.Spec.Replicas != *r.Spec.Replicas {
return field.Invalid(field.NewPath("spec", "replicas"), r.Spec.Replicas, "field is immutable")
}

allErrs := r.validateCommon()
if len(allErrs) > 0 {
return apierrors.NewInvalid(GroupVersion.WithKind("EtcdadmCluster").GroupKind(), r.Name, allErrs)
Expand Down
155 changes: 155 additions & 0 deletions api/v1beta1/etcdadmcluster_webhook_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
/*
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package v1beta1

import (
"testing"

. "github.com/onsi/gomega"
corev1 "k8s.io/api/core/v1"
"k8s.io/utils/pointer"
)

// TestValidateCommon runs a table of EtcdadmCluster specs through ValidateCreate
// and validateCommon. For every case it checks that ValidateCreate returns either
// nil or an error containing the expected substring, and that validateCommon
// returns an empty or non-empty error list accordingly.
func TestValidateCommon(t *testing.T) {
	cases := map[string]struct {
		in        *EtcdadmCluster
		expectErr string
	}{
		"valid etcdadm cluster": {
			in: &EtcdadmCluster{
				Spec: EtcdadmClusterSpec{
					Replicas: pointer.Int32(3),
				},
				Status: EtcdadmClusterStatus{},
			},
			expectErr: "",
		},
		"no replicas field": {
			in: &EtcdadmCluster{
				Spec:   EtcdadmClusterSpec{},
				Status: EtcdadmClusterStatus{},
			},
			expectErr: "spec.replicas: Required value: is required",
		},
		"zero replicas": {
			in: &EtcdadmCluster{
				Spec: EtcdadmClusterSpec{
					Replicas: pointer.Int32(0),
				},
				Status: EtcdadmClusterStatus{},
			},
			expectErr: "cannot be less than or equal to 0",
		},
		"even replicas": {
			in: &EtcdadmCluster{
				Spec: EtcdadmClusterSpec{
					Replicas: pointer.Int32(2),
				},
				Status: EtcdadmClusterStatus{},
			},
			expectErr: "Forbidden: etcd cluster cannot have an even number of nodes",
		},
		"mismatched namespace": {
			in: &EtcdadmCluster{
				Spec: EtcdadmClusterSpec{
					Replicas: pointer.Int32(3),
					InfrastructureTemplate: corev1.ObjectReference{
						Namespace: "fail",
					},
				},
				Status: EtcdadmClusterStatus{},
			},
			expectErr: "Invalid value: \"fail\": must match metadata.namespace",
		},
	}
	for name, tt := range cases {
		t.Run(name, func(t *testing.T) {
			g := NewWithT(t)
			err := tt.in.ValidateCreate()

			// validateCommon yields a list of field errors rather than a single
			// error value, so it is asserted with BeEmpty instead of Succeed
			// (Succeed is intended for plain error values).
			if tt.expectErr == "" {
				g.Expect(err).To(BeNil())
				g.Expect(tt.in.validateCommon()).To(BeEmpty())
				return
			}
			g.Expect(err).To(MatchError(ContainSubstring(tt.expectErr)))
			g.Expect(tt.in.validateCommon()).NotTo(BeEmpty())
		})
	}
}
// TestValidateUpdate checks ValidateUpdate against pairs of old and new
// EtcdadmCluster objects: changing the replica count from 3 to 5 and from 3 to 1
// is expected to pass, while changing it to 0 is expected to be rejected.
func TestValidateUpdate(t *testing.T) {
	// withReplicas builds a minimal EtcdadmCluster that only sets the replica count.
	withReplicas := func(count int32) *EtcdadmCluster {
		return &EtcdadmCluster{
			Spec: EtcdadmClusterSpec{
				Replicas: pointer.Int32(count),
			},
			Status: EtcdadmClusterStatus{},
		}
	}

	type updateCase struct {
		oldConf   *EtcdadmCluster
		newConf   *EtcdadmCluster
		expectErr bool
	}
	scenarios := map[string]updateCase{
		"valid scale up": {
			oldConf:   withReplicas(3),
			newConf:   withReplicas(5),
			expectErr: false,
		},
		"valid scale down": {
			oldConf:   withReplicas(3),
			newConf:   withReplicas(1),
			expectErr: false,
		},
		"zero replicas": {
			oldConf:   withReplicas(3),
			newConf:   withReplicas(0),
			expectErr: true,
		},
	}

	for label, sc := range scenarios {
		t.Run(label, func(t *testing.T) {
			g := NewWithT(t)
			err := sc.newConf.ValidateUpdate(sc.oldConf)
			if sc.expectErr {
				g.Expect(err).NotTo(Succeed())
				return
			}
			g.Expect(err).To(Succeed())
		})
	}
}
19 changes: 17 additions & 2 deletions controllers/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,11 +278,18 @@ func (r *EtcdadmClusterReconciler) reconcile(ctx context.Context, etcdCluster *e
// Etcd machines rollout due to configuration changes (e.g. upgrades) takes precedence over other operations.
needRollout := ep.MachinesNeedingRollout()
numNeedRollout := len(needRollout)

ep2, err := NewEtcdPlane(ctx, r.Client, cluster, etcdCluster, etcdMachines)
if err != nil {
return ctrl.Result{}, errors.Wrap(err, "Error initializing internal object EtcdPlane")
}
numOutOfDateMachines := len(ep2.OutOfDateMachines())

switch {
case len(needRollout) > 0:
log.Info("Etcd cluster needs a rollout", "totalMachines", numAllEtcdMachines, "needRollout", numNeedRollout)
// NOTE: we need to check that numAllEtcdMachines is not more than 2X replicas, as this will create new replicas to infinity
if numAllEtcdMachines > 2*desiredReplicas {
// NOTE: There have been issues with etcd rolling out new machines indefinitely. Add an upper limit as a fail-safe against this situation.
if numAllEtcdMachines > numOutOfDateMachines+desiredReplicas {
log.Info("Cluster has reached the max number of machines, won't create new machines until at least one is deleted", "totalMachines", numAllEtcdMachines)
conditions.MarkFalse(ep.EC, etcdv1.EtcdMachinesSpecUpToDateCondition, etcdv1.MaxNumberOfEtcdMachinesReachedReason, clusterv1.ConditionSeverityWarning, "Etcd cluster has %d total machines, maximum number of machines is %d", numAllEtcdMachines, 2*desiredReplicas)
return ctrl.Result{}, nil
Expand Down Expand Up @@ -364,6 +371,14 @@ func (r *EtcdadmClusterReconciler) reconcile(ctx context.Context, etcdCluster *e
log.Info("Scaling down etcd cluster", "Desired", desiredReplicas, "Existing", numCurrentMachines)
// The last parameter corresponds to Machines that need to be rolled out, eg during upgrade, should always be empty here.
return r.scaleDownEtcdCluster(ctx, etcdCluster, cluster, ep, collections.Machines{})
// In the case that we do a scale operation on etcd clusters, remove upgradeInProgressAnnotation once scale is complete and there
// are no more out of date machines
case numCurrentMachines == desiredReplicas && numNeedRollout == 0:
_, hasUpgradeAnnotation := etcdCluster.Annotations[etcdv1.UpgradeInProgressAnnotation]
if hasUpgradeAnnotation {
log.Info("Removing update in progress annotation", "upgrading", hasUpgradeAnnotation)
delete(etcdCluster.Annotations, etcdv1.UpgradeInProgressAnnotation)
}
}

return ctrl.Result{}, nil
Expand Down
126 changes: 126 additions & 0 deletions controllers/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -458,6 +458,132 @@ func TestReconcileNeedsRollOutEtcdCluster(t *testing.T) {
g.Expect(len(machineList.Items)).To(Equal(2))
}

// TestReconcileScaleEtcdClusterUpgradeDone seeds a fake client with an initialized
// EtcdadmCluster that carries the upgrade-in-progress annotation, wants 5 replicas
// and already has 5 etcd machines. After one Reconcile call it asserts that 5
// machines are still listed and that the annotation is no longer present on the
// stored EtcdadmCluster.
func TestReconcileScaleEtcdClusterUpgradeDone(t *testing.T) {
	g := NewWithT(t)

	cluster := newClusterWithExternalEtcd()
	etcdadmCluster := newEtcdadmCluster(cluster)

	// Simulate the CAPI machine controller having flagged the cluster as initialized
	// (after the first etcd Machine and the init-address Secret exist), with an
	// upgrade still marked as in progress.
	etcdadmCluster.Status.Initialized = true
	etcdadmCluster.Annotations = map[string]string{
		etcdv1.UpgradeInProgressAnnotation: "upgrading",
	}

	// The etcdadm controller mirrors status.Initialized into InitializedCondition.
	conditions.MarkTrue(etcdadmCluster, etcdv1.InitializedCondition)

	// Three machines are created first, then the desired replica count is raised
	// to 5 and two further machines are created.
	var machines []client.Object
	for i := 0; i < 3; i++ {
		machines = append(machines, newEtcdMachine(etcdadmCluster, cluster))
	}
	etcdadmCluster.Spec.Replicas = pointer.Int32(5)
	for i := 0; i < 2; i++ {
		machines = append(machines, newEtcdMachine(etcdadmCluster, cluster))
	}

	seed := []client.Object{
		cluster,
		etcdadmCluster,
		infraTemplate.DeepCopy(),
	}
	seed = append(seed, machines...)

	fakeClient := fake.NewClientBuilder().WithScheme(setupScheme()).WithObjects(seed...).Build()

	reconciler := &EtcdadmClusterReconciler{
		Client:         fakeClient,
		uncachedClient: fakeClient,
		Log:            log.Log,
	}
	_, reconcileErr := reconciler.Reconcile(ctx, ctrl.Request{NamespacedName: util.ObjectKey(etcdadmCluster)})
	g.Expect(reconcileErr).NotTo(HaveOccurred())

	// The number of machines must be unchanged.
	listed := &clusterv1.MachineList{}
	g.Expect(fakeClient.List(context.Background(), listed, client.InNamespace("test"))).To(Succeed())
	g.Expect(len(listed.Items)).To(Equal(5))

	// The upgrade annotation must have been removed from the stored object.
	stored := &etcdv1.EtcdadmCluster{}
	g.Expect(fakeClient.Get(ctx, util.ObjectKey(etcdadmCluster), stored)).To(Succeed())
	_, stillUpgrading := stored.Annotations[etcdv1.UpgradeInProgressAnnotation]
	g.Expect(stillUpgrading).To(BeFalse())
}

// TestReconcileScaleDownEtcdCluster runs Reconcile in two phases. Phase one seeds
// an initialized EtcdadmCluster carrying the upgrade-in-progress annotation plus
// three etcd machines, and asserts that three machines remain and the annotation
// is gone. Phase two sets the replica count of the stored cluster to 1, reconciles
// against a freshly seeded fake client, and asserts that one machine is listed and
// the annotation is still absent.
func TestReconcileScaleDownEtcdCluster(t *testing.T) {
	g := NewWithT(t)

	cluster := newClusterWithExternalEtcd()
	etcdadmCluster := newEtcdadmCluster(cluster)

	// Simulate the CAPI machine controller having flagged the cluster as initialized
	// (after the first etcd Machine and the init-address Secret exist), with an
	// upgrade still marked as in progress.
	etcdadmCluster.Status.Initialized = true
	etcdadmCluster.Annotations = map[string]string{
		etcdv1.UpgradeInProgressAnnotation: "upgrading",
	}

	// The etcdadm controller mirrors status.Initialized into InitializedCondition.
	conditions.MarkTrue(etcdadmCluster, etcdv1.InitializedCondition)

	var machines []client.Object
	for i := 0; i < 3; i++ {
		machines = append(machines, newEtcdMachine(etcdadmCluster, cluster))
	}

	// Phase one: three machines are present.
	firstSeed := []client.Object{
		cluster,
		etcdadmCluster,
		infraTemplate.DeepCopy(),
	}
	firstSeed = append(firstSeed, machines...)

	firstClient := fake.NewClientBuilder().WithScheme(setupScheme()).WithObjects(firstSeed...).Build()

	firstReconciler := &EtcdadmClusterReconciler{
		Client:         firstClient,
		uncachedClient: firstClient,
		Log:            log.Log,
	}
	_, reconcileErr := firstReconciler.Reconcile(ctx, ctrl.Request{NamespacedName: util.ObjectKey(etcdadmCluster)})
	g.Expect(reconcileErr).NotTo(HaveOccurred())

	listed := &clusterv1.MachineList{}
	g.Expect(firstClient.List(context.Background(), listed, client.InNamespace("test"))).To(Succeed())
	g.Expect(len(listed.Items)).To(Equal(3))

	stored := &etcdv1.EtcdadmCluster{}
	g.Expect(firstClient.Get(ctx, util.ObjectKey(etcdadmCluster), stored)).To(Succeed())
	_, stillUpgrading := stored.Annotations[etcdv1.UpgradeInProgressAnnotation]
	g.Expect(stillUpgrading).To(BeFalse())

	// Phase two: lower the desired replica count on the object read back from phase one.
	stored.Spec.Replicas = pointer.Int32(1)

	// NOTE(review): this second client is seeded without any Machine objects, so the
	// single machine asserted below is presumably created by this reconcile rather
	// than being what is left after removing machines — confirm this exercises the
	// intended scale-down path.
	secondSeed := []client.Object{
		cluster,
		stored,
		infraTemplate.DeepCopy(),
	}
	secondClient := fake.NewClientBuilder().WithScheme(setupScheme()).WithObjects(secondSeed...).Build()

	secondReconciler := &EtcdadmClusterReconciler{
		Client:         secondClient,
		uncachedClient: secondClient,
		Log:            log.Log,
	}
	_, reconcileErr = secondReconciler.Reconcile(ctx, ctrl.Request{NamespacedName: util.ObjectKey(stored)})
	g.Expect(reconcileErr).NotTo(HaveOccurred())

	relisted := &clusterv1.MachineList{}
	g.Expect(secondClient.List(context.Background(), relisted, client.InNamespace("test"))).To(Succeed())
	g.Expect(len(relisted.Items)).To(Equal(1))

	final := &etcdv1.EtcdadmCluster{}
	g.Expect(secondClient.Get(ctx, util.ObjectKey(etcdadmCluster), final)).To(Succeed())
	_, stillUpgrading = final.Annotations[etcdv1.UpgradeInProgressAnnotation]
	g.Expect(stillUpgrading).To(BeFalse())
}

// newClusterWithExternalEtcd returns a CAPI cluster object with a managed external etcd ref.
func newClusterWithExternalEtcd() *clusterv1.Cluster {
return &clusterv1.Cluster{
Expand Down
9 changes: 9 additions & 0 deletions controllers/etcd_plane.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,15 @@ func (ep *EtcdPlane) MachinesNeedingRollout() collections.Machines {
)
}

// OutOfDateMachines returns the machines of this EtcdPlane that do not match the
// current EtcdadmCluster configuration.
func (ep *EtcdPlane) OutOfDateMachines() collections.Machines {
	// A machine is out of date when the configuration-match filter rejects it.
	configMismatch := collections.Not(MatchesEtcdadmClusterConfiguration(ep.infraResources, ep.etcdadmConfigs, ep.EC))
	return ep.Machines.AnyFilter(configMismatch)
}

// MatchesEtcdadmClusterConfiguration returns a filter to find all machines that match the EtcdadmCluster config and do not require any rollout.
// Etcd version and extra params, and infrastructure template need to be equivalent.
func MatchesEtcdadmClusterConfiguration(infraConfigs map[string]*unstructured.Unstructured, machineConfigs map[string]*etcdbootstrapv1.EtcdadmConfig, ec *etcdv1.EtcdadmCluster) func(machine *clusterv1.Machine) bool {
Expand Down
Loading

0 comments on commit fec236f

Please sign in to comment.