From f6e30d47ef6f11f0e8659d02b3a9a944049ee021 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Attila=20M=C3=A9sz=C3=A1ros?= Date: Thu, 5 Jan 2023 16:12:43 +0100 Subject: [PATCH] feat: operator can be restarted (#1675) --- .../io/javaoperatorsdk/operator/Operator.java | 3 + .../processing/event/EventProcessor.java | 18 ++---- .../processing/event/EventSourceManager.java | 1 - .../source/informer/InformerManager.java | 29 +++++---- .../informer/ManagedInformerEventSource.java | 4 +- .../event/source/timer/TimerEventSource.java | 22 ++++--- .../operator/OperatorRestartIT.java | 63 +++++++++++++++++++ .../restart/ConfigMapDependentResource.java | 35 +++++++++++ .../restart/RestartTestCustomResource.java | 15 +++++ .../sample/restart/RestartTestReconciler.java | 31 +++++++++ 10 files changed, 185 insertions(+), 36 deletions(-) create mode 100644 operator-framework/src/test/java/io/javaoperatorsdk/operator/OperatorRestartIT.java create mode 100644 operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/restart/ConfigMapDependentResource.java create mode 100644 operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/restart/RestartTestCustomResource.java create mode 100644 operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/restart/RestartTestReconciler.java diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/Operator.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/Operator.java index 12437155e9..b7c2f5e108 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/Operator.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/Operator.java @@ -132,11 +132,14 @@ public void stop() throws OperatorException { log.info( "Operator SDK {} is shutting down...", configurationService.getVersion().getSdkVersion()); controllerManager.stop(); + ExecutorServiceManager.stop(); leaderElectionManager.stop(); if (configurationService.closeClientOnStop()) { kubernetesClient.close(); } + + started = false; } /** diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventProcessor.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventProcessor.java index 59962e39f1..a75f51f981 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventProcessor.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventProcessor.java @@ -5,14 +5,12 @@ import java.util.Map; import java.util.Optional; import java.util.concurrent.ExecutorService; -import java.util.concurrent.ScheduledThreadPoolExecutor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import io.fabric8.kubernetes.api.model.HasMetadata; import io.javaoperatorsdk.operator.OperatorException; -import io.javaoperatorsdk.operator.api.config.ConfigurationService; import io.javaoperatorsdk.operator.api.config.ConfigurationServiceProvider; import io.javaoperatorsdk.operator.api.config.ControllerConfiguration; import io.javaoperatorsdk.operator.api.config.ExecutorServiceManager; @@ -40,20 +38,18 @@ public class EventProcessor

implements EventHandler, Life private final ControllerConfiguration controllerConfiguration; private final ReconciliationDispatcher

reconciliationDispatcher; private final Retry retry; - private final ExecutorService executor; private final Metrics metrics; private final Cache

cache; private final EventSourceManager

eventSourceManager; private final RateLimiter rateLimiter; private final ResourceStateManager resourceStateManager = new ResourceStateManager(); private final Map metricsMetadata; - + private ExecutorService executor; public EventProcessor(EventSourceManager

eventSourceManager) { this( eventSourceManager.getController().getConfiguration(), eventSourceManager.getControllerResourceEventSource(), - ExecutorServiceManager.instance().executorService(), new ReconciliationDispatcher<>(eventSourceManager.getController()), ConfigurationServiceProvider.instance().getMetrics(), eventSourceManager); @@ -68,7 +64,6 @@ public EventProcessor(EventSourceManager

eventSourceManager) { this( controllerConfiguration, eventSourceManager.getControllerResourceEventSource(), - null, reconciliationDispatcher, metrics, eventSourceManager); @@ -78,17 +73,11 @@ public EventProcessor(EventSourceManager

eventSourceManager) { private EventProcessor( ControllerConfiguration controllerConfiguration, Cache

cache, - ExecutorService executor, ReconciliationDispatcher

reconciliationDispatcher, Metrics metrics, EventSourceManager

eventSourceManager) { this.controllerConfiguration = controllerConfiguration; this.running = false; - this.executor = - executor == null - ? new ScheduledThreadPoolExecutor( - ConfigurationService.DEFAULT_RECONCILIATION_THREADS_NUMBER) - : executor; this.reconciliationDispatcher = reconciliationDispatcher; this.retry = controllerConfiguration.getRetry(); this.cache = cache; @@ -376,6 +365,8 @@ public synchronized void stop() { @Override public void start() throws OperatorException { + // on restart new executor service is created and needs to be set here + executor = ExecutorServiceManager.instance().executorService(); this.running = true; handleAlreadyMarkedEvents(); } @@ -424,7 +415,8 @@ public void run() { @Override public String toString() { - return controllerName() + " -> " + executionScope; + return controllerName() + " -> " + + (executionScope.getResource() != null ? executionScope : resourceID); } } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceManager.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceManager.java index 97ff150126..5ba58154d1 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceManager.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceManager.java @@ -97,7 +97,6 @@ public synchronized void stop() { eventSources.additionalNamedEventSources(), this::stopEventSource, getThreadNamer("stop")); - eventSources.clear(); } @SuppressWarnings("rawtypes") diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java index c6636a4a5d..50ad1d6567 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java @@ -41,24 +41,31 @@ public class InformerManager> sources = new ConcurrentHashMap<>(); private Cloner cloner; - private C configuration; - private MixedOperation, Resource> client; - private ResourceEventHandler eventHandler; + private final C configuration; + private final MixedOperation, Resource> client; + private final ResourceEventHandler eventHandler; private final Map>> indexers = new HashMap<>(); + public InformerManager(MixedOperation, Resource> client, + C configuration, + ResourceEventHandler eventHandler) { + this.client = client; + this.configuration = configuration; + this.eventHandler = eventHandler; + } + @Override public void start() throws OperatorException { + initSources(); // make sure informers are all started before proceeding further sources.values().parallelStream().forEach(InformerWrapper::start); } - void initSources(MixedOperation, Resource> client, - C configuration, ResourceEventHandler eventHandler) { + private void initSources() { + if (!sources.isEmpty()) { + throw new IllegalStateException("Some sources already initialized."); + } cloner = ConfigurationServiceProvider.instance().getResourceCloner(); - this.configuration = configuration; - this.client = client; - this.eventHandler = eventHandler; - final var targetNamespaces = configuration.getEffectiveNamespaces(); if (ResourceConfiguration.allNamespacesWatched(targetNamespaces)) { var source = createEventSourceForNamespace(WATCH_ALL_NAMESPACES); @@ -86,7 +93,6 @@ public void changeNamespaces(Set namespaces) { namespaces.forEach(ns -> { if (!sources.containsKey(ns)) { final InformerWrapper source = createEventSourceForNamespace(ns); - source.addIndexers(this.indexers); source.start(); log.debug("Registered new {} -> {} for namespace: {}", this, source, ns); @@ -106,6 +112,7 @@ private InformerWrapper createEventSourceForNamespace(String namespace) { client.inNamespace(namespace).withLabelSelector(configuration.getLabelSelector()), eventHandler, namespace); } + source.addIndexers(indexers); return source; } @@ -130,6 +137,7 @@ public void stop() { log.warn("Error stopping informer for namespace: {} -> {}", ns, source, e); } }); + sources.clear(); } @Override @@ -177,7 +185,6 @@ private Optional> getSource(String namespace) { @Override public void addIndexers(Map>> indexers) { this.indexers.putAll(indexers); - sources.values().forEach(s -> s.addIndexers(indexers)); } @Override diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java index 3e7400b90f..d4fed3b816 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java @@ -37,14 +37,14 @@ public abstract class ManagedInformerEventSource temporaryResourceCache; - protected InformerManager cache = new InformerManager<>(); + protected InformerManager cache; protected C configuration; protected ManagedInformerEventSource( MixedOperation, Resource> client, C configuration) { super(configuration.getResourceClass()); temporaryResourceCache = new TemporaryResourceCache<>(this); - manager().initSources(client, configuration, this); + cache = new InformerManager<>(client, configuration, this); this.configuration = configuration; } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/timer/TimerEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/timer/TimerEventSource.java index f22400453a..fe641e0b0b 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/timer/TimerEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/timer/TimerEventSource.java @@ -4,7 +4,6 @@ import java.util.Timer; import java.util.TimerTask; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicBoolean; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -20,16 +19,16 @@ public class TimerEventSource implements ResourceEventAware { private static final Logger log = LoggerFactory.getLogger(TimerEventSource.class); - private final Timer timer = new Timer(true); - private final AtomicBoolean running = new AtomicBoolean(); + private Timer timer; private final Map onceTasks = new ConcurrentHashMap<>(); + @SuppressWarnings("unused") public void scheduleOnce(R resource, long delay) { scheduleOnce(ResourceID.fromResource(resource), delay); } public void scheduleOnce(ResourceID resourceID, long delay) { - if (!running.get()) { + if (!isRunning()) { throw new IllegalStateException("The TimerEventSource is not running"); } @@ -55,14 +54,19 @@ public void cancelOnceSchedule(ResourceID customResourceUid) { @Override public void start() { - running.set(true); + if (!isRunning()) { + super.start(); + timer = new Timer(true); + } } @Override public void stop() { - running.set(false); - onceTasks.keySet().forEach(this::cancelOnceSchedule); - timer.cancel(); + if (isRunning()) { + onceTasks.keySet().forEach(this::cancelOnceSchedule); + timer.cancel(); + super.stop(); + } } public class EventProducerTimeTask extends TimerTask { @@ -75,7 +79,7 @@ public EventProducerTimeTask(ResourceID customResourceUid) { @Override public void run() { - if (running.get()) { + if (isRunning()) { log.debug("Producing event for custom resource id: {}", customResourceUid); getEventHandler().handleEvent(new Event(customResourceUid)); } diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/OperatorRestartIT.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/OperatorRestartIT.java new file mode 100644 index 0000000000..45b88a126b --- /dev/null +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/OperatorRestartIT.java @@ -0,0 +1,63 @@ +package io.javaoperatorsdk.operator; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Order; +import org.junit.jupiter.api.Test; + +import io.fabric8.kubernetes.api.model.ObjectMetaBuilder; +import io.fabric8.kubernetes.client.KubernetesClient; +import io.fabric8.kubernetes.client.KubernetesClientBuilder; +import io.javaoperatorsdk.operator.junit.LocallyRunOperatorExtension; +import io.javaoperatorsdk.operator.sample.restart.RestartTestCustomResource; +import io.javaoperatorsdk.operator.sample.restart.RestartTestReconciler; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; + +class OperatorRestartIT { + private final static KubernetesClient client = new KubernetesClientBuilder().build(); + private final static Operator operator = new Operator(o -> o.withCloseClientOnStop(false)); + private final static RestartTestReconciler reconciler = new RestartTestReconciler(); + private static int reconcileNumberBeforeStop = 0; + + @BeforeAll + static void registerReconciler() { + LocallyRunOperatorExtension.applyCrd(RestartTestCustomResource.class, client); + operator.register(reconciler); + } + + @BeforeEach + void startOperator() { + operator.start(); + } + + @AfterEach + void stopOperator() { + operator.stop(); + } + + @Test + @Order(1) + void createResource() { + client.resource(testCustomResource()).createOrReplace(); + await().untilAsserted(() -> assertThat(reconciler.getNumberOfExecutions()).isGreaterThan(0)); + reconcileNumberBeforeStop = reconciler.getNumberOfExecutions(); + } + + @Test + @Order(2) + void reconcile() { + await().untilAsserted(() -> assertThat(reconciler.getNumberOfExecutions()) + .isGreaterThan(reconcileNumberBeforeStop)); + } + + RestartTestCustomResource testCustomResource() { + RestartTestCustomResource cr = new RestartTestCustomResource(); + cr.setMetadata(new ObjectMetaBuilder() + .withName("test1") + .build()); + return cr; + } +} diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/restart/ConfigMapDependentResource.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/restart/ConfigMapDependentResource.java new file mode 100644 index 0000000000..edb4e2baff --- /dev/null +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/restart/ConfigMapDependentResource.java @@ -0,0 +1,35 @@ +package io.javaoperatorsdk.operator.sample.restart; + +import java.util.Map; + +import io.fabric8.kubernetes.api.model.ConfigMap; +import io.fabric8.kubernetes.api.model.ConfigMapBuilder; +import io.fabric8.kubernetes.api.model.ObjectMetaBuilder; +import io.javaoperatorsdk.operator.api.reconciler.Context; +import io.javaoperatorsdk.operator.processing.dependent.kubernetes.CRUDKubernetesDependentResource; +import io.javaoperatorsdk.operator.processing.dependent.kubernetes.KubernetesDependent; + +@KubernetesDependent(labelSelector = "app=restart-test") +public class ConfigMapDependentResource + extends CRUDKubernetesDependentResource { + + public static final String DATA_KEY = "key"; + + public ConfigMapDependentResource() { + super(ConfigMap.class); + } + + @Override + protected ConfigMap desired(RestartTestCustomResource primary, + Context context) { + return new ConfigMapBuilder() + .withMetadata(new ObjectMetaBuilder() + .withLabels(Map.of("app", "restart-test")) + .withName(primary.getMetadata().getName()) + .withNamespace(primary.getMetadata().getNamespace()) + .build()) + .withData(Map.of(DATA_KEY, primary.getMetadata().getName())) + .build(); + + } +} diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/restart/RestartTestCustomResource.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/restart/RestartTestCustomResource.java new file mode 100644 index 0000000000..a3bcd31053 --- /dev/null +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/restart/RestartTestCustomResource.java @@ -0,0 +1,15 @@ +package io.javaoperatorsdk.operator.sample.restart; + +import io.fabric8.kubernetes.api.model.Namespaced; +import io.fabric8.kubernetes.client.CustomResource; +import io.fabric8.kubernetes.model.annotation.Group; +import io.fabric8.kubernetes.model.annotation.ShortNames; +import io.fabric8.kubernetes.model.annotation.Version; + +@Group("sample.javaoperatorsdk") +@Version("v1") +@ShortNames("rt") +public class RestartTestCustomResource + extends CustomResource + implements Namespaced { +} diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/restart/RestartTestReconciler.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/restart/RestartTestReconciler.java new file mode 100644 index 0000000000..decd9b597b --- /dev/null +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/restart/RestartTestReconciler.java @@ -0,0 +1,31 @@ +package io.javaoperatorsdk.operator.sample.restart; + +import java.util.concurrent.atomic.AtomicInteger; + +import io.javaoperatorsdk.operator.api.reconciler.Context; +import io.javaoperatorsdk.operator.api.reconciler.ControllerConfiguration; +import io.javaoperatorsdk.operator.api.reconciler.Reconciler; +import io.javaoperatorsdk.operator.api.reconciler.UpdateControl; +import io.javaoperatorsdk.operator.api.reconciler.dependent.Dependent; +import io.javaoperatorsdk.operator.support.TestExecutionInfoProvider; + +@ControllerConfiguration( + dependents = @Dependent(type = ConfigMapDependentResource.class)) +public class RestartTestReconciler + implements Reconciler, TestExecutionInfoProvider { + + private final AtomicInteger numberOfExecutions = new AtomicInteger(0); + + @Override + public UpdateControl reconcile( + RestartTestCustomResource resource, + Context context) { + numberOfExecutions.addAndGet(1); + return UpdateControl.noUpdate(); + } + + public int getNumberOfExecutions() { + return numberOfExecutions.get(); + } + +}