Skip to content

Commit

Permalink
feat: operator can be restarted (#1675)
Browse files Browse the repository at this point in the history
  • Loading branch information
csviri authored Jan 5, 2023
1 parent 2fee15d commit f6e30d4
Show file tree
Hide file tree
Showing 10 changed files with 185 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -132,11 +132,14 @@ public void stop() throws OperatorException {
log.info(
"Operator SDK {} is shutting down...", configurationService.getVersion().getSdkVersion());
controllerManager.stop();

ExecutorServiceManager.stop();
leaderElectionManager.stop();
if (configurationService.closeClientOnStop()) {
kubernetesClient.close();
}

started = false;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,12 @@
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.fabric8.kubernetes.api.model.HasMetadata;
import io.javaoperatorsdk.operator.OperatorException;
import io.javaoperatorsdk.operator.api.config.ConfigurationService;
import io.javaoperatorsdk.operator.api.config.ConfigurationServiceProvider;
import io.javaoperatorsdk.operator.api.config.ControllerConfiguration;
import io.javaoperatorsdk.operator.api.config.ExecutorServiceManager;
Expand Down Expand Up @@ -40,20 +38,18 @@ public class EventProcessor<P extends HasMetadata> implements EventHandler, Life
private final ControllerConfiguration<?> controllerConfiguration;
private final ReconciliationDispatcher<P> reconciliationDispatcher;
private final Retry retry;
private final ExecutorService executor;
private final Metrics metrics;
private final Cache<P> cache;
private final EventSourceManager<P> eventSourceManager;
private final RateLimiter<? extends RateLimitState> rateLimiter;
private final ResourceStateManager resourceStateManager = new ResourceStateManager();
private final Map<String, Object> metricsMetadata;

private ExecutorService executor;

public EventProcessor(EventSourceManager<P> eventSourceManager) {
this(
eventSourceManager.getController().getConfiguration(),
eventSourceManager.getControllerResourceEventSource(),
ExecutorServiceManager.instance().executorService(),
new ReconciliationDispatcher<>(eventSourceManager.getController()),
ConfigurationServiceProvider.instance().getMetrics(),
eventSourceManager);
Expand All @@ -68,7 +64,6 @@ public EventProcessor(EventSourceManager<P> eventSourceManager) {
this(
controllerConfiguration,
eventSourceManager.getControllerResourceEventSource(),
null,
reconciliationDispatcher,
metrics,
eventSourceManager);
Expand All @@ -78,17 +73,11 @@ public EventProcessor(EventSourceManager<P> eventSourceManager) {
private EventProcessor(
ControllerConfiguration controllerConfiguration,
Cache<P> cache,
ExecutorService executor,
ReconciliationDispatcher<P> reconciliationDispatcher,
Metrics metrics,
EventSourceManager<P> eventSourceManager) {
this.controllerConfiguration = controllerConfiguration;
this.running = false;
this.executor =
executor == null
? new ScheduledThreadPoolExecutor(
ConfigurationService.DEFAULT_RECONCILIATION_THREADS_NUMBER)
: executor;
this.reconciliationDispatcher = reconciliationDispatcher;
this.retry = controllerConfiguration.getRetry();
this.cache = cache;
Expand Down Expand Up @@ -376,6 +365,8 @@ public synchronized void stop() {

@Override
public void start() throws OperatorException {
// on restart new executor service is created and needs to be set here
executor = ExecutorServiceManager.instance().executorService();
this.running = true;
handleAlreadyMarkedEvents();
}
Expand Down Expand Up @@ -424,7 +415,8 @@ public void run() {

@Override
public String toString() {
return controllerName() + " -> " + executionScope;
return controllerName() + " -> "
+ (executionScope.getResource() != null ? executionScope : resourceID);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,6 @@ public synchronized void stop() {
eventSources.additionalNamedEventSources(),
this::stopEventSource,
getThreadNamer("stop"));
eventSources.clear();
}

@SuppressWarnings("rawtypes")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,24 +41,31 @@ public class InformerManager<T extends HasMetadata, C extends ResourceConfigurat

private final Map<String, InformerWrapper<T>> sources = new ConcurrentHashMap<>();
private Cloner cloner;
private C configuration;
private MixedOperation<T, KubernetesResourceList<T>, Resource<T>> client;
private ResourceEventHandler<T> eventHandler;
private final C configuration;
private final MixedOperation<T, KubernetesResourceList<T>, Resource<T>> client;
private final ResourceEventHandler<T> eventHandler;
private final Map<String, Function<T, List<String>>> indexers = new HashMap<>();

public InformerManager(MixedOperation<T, KubernetesResourceList<T>, Resource<T>> client,
C configuration,
ResourceEventHandler<T> eventHandler) {
this.client = client;
this.configuration = configuration;
this.eventHandler = eventHandler;
}

@Override
public void start() throws OperatorException {
initSources();
// make sure informers are all started before proceeding further
sources.values().parallelStream().forEach(InformerWrapper::start);
}

void initSources(MixedOperation<T, KubernetesResourceList<T>, Resource<T>> client,
C configuration, ResourceEventHandler<T> eventHandler) {
private void initSources() {
if (!sources.isEmpty()) {
throw new IllegalStateException("Some sources already initialized.");
}
cloner = ConfigurationServiceProvider.instance().getResourceCloner();
this.configuration = configuration;
this.client = client;
this.eventHandler = eventHandler;

final var targetNamespaces = configuration.getEffectiveNamespaces();
if (ResourceConfiguration.allNamespacesWatched(targetNamespaces)) {
var source = createEventSourceForNamespace(WATCH_ALL_NAMESPACES);
Expand Down Expand Up @@ -86,7 +93,6 @@ public void changeNamespaces(Set<String> namespaces) {
namespaces.forEach(ns -> {
if (!sources.containsKey(ns)) {
final InformerWrapper<T> source = createEventSourceForNamespace(ns);
source.addIndexers(this.indexers);
source.start();
log.debug("Registered new {} -> {} for namespace: {}", this, source,
ns);
Expand All @@ -106,6 +112,7 @@ private InformerWrapper<T> createEventSourceForNamespace(String namespace) {
client.inNamespace(namespace).withLabelSelector(configuration.getLabelSelector()),
eventHandler, namespace);
}
source.addIndexers(indexers);
return source;
}

Expand All @@ -130,6 +137,7 @@ public void stop() {
log.warn("Error stopping informer for namespace: {} -> {}", ns, source, e);
}
});
sources.clear();
}

@Override
Expand Down Expand Up @@ -177,7 +185,6 @@ private Optional<InformerWrapper<T>> getSource(String namespace) {
@Override
public void addIndexers(Map<String, Function<T, List<String>>> indexers) {
this.indexers.putAll(indexers);
sources.values().forEach(s -> s.addIndexers(indexers));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,14 @@ public abstract class ManagedInformerEventSource<R extends HasMetadata, P extend
private static final Logger log = LoggerFactory.getLogger(ManagedInformerEventSource.class);

protected TemporaryResourceCache<R> temporaryResourceCache;
protected InformerManager<R, C> cache = new InformerManager<>();
protected InformerManager<R, C> cache;
protected C configuration;

protected ManagedInformerEventSource(
MixedOperation<R, KubernetesResourceList<R>, Resource<R>> client, C configuration) {
super(configuration.getResourceClass());
temporaryResourceCache = new TemporaryResourceCache<>(this);
manager().initSources(client, configuration, this);
cache = new InformerManager<>(client, configuration, this);
this.configuration = configuration;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -20,16 +19,16 @@ public class TimerEventSource<R extends HasMetadata>
implements ResourceEventAware<R> {
private static final Logger log = LoggerFactory.getLogger(TimerEventSource.class);

private final Timer timer = new Timer(true);
private final AtomicBoolean running = new AtomicBoolean();
private Timer timer;
private final Map<ResourceID, EventProducerTimeTask> onceTasks = new ConcurrentHashMap<>();

@SuppressWarnings("unused")
public void scheduleOnce(R resource, long delay) {
scheduleOnce(ResourceID.fromResource(resource), delay);
}

public void scheduleOnce(ResourceID resourceID, long delay) {
if (!running.get()) {
if (!isRunning()) {
throw new IllegalStateException("The TimerEventSource is not running");
}

Expand All @@ -55,14 +54,19 @@ public void cancelOnceSchedule(ResourceID customResourceUid) {

@Override
public void start() {
running.set(true);
if (!isRunning()) {
super.start();
timer = new Timer(true);
}
}

@Override
public void stop() {
running.set(false);
onceTasks.keySet().forEach(this::cancelOnceSchedule);
timer.cancel();
if (isRunning()) {
onceTasks.keySet().forEach(this::cancelOnceSchedule);
timer.cancel();
super.stop();
}
}

public class EventProducerTimeTask extends TimerTask {
Expand All @@ -75,7 +79,7 @@ public EventProducerTimeTask(ResourceID customResourceUid) {

@Override
public void run() {
if (running.get()) {
if (isRunning()) {
log.debug("Producing event for custom resource id: {}", customResourceUid);
getEventHandler().handleEvent(new Event(customResourceUid));
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package io.javaoperatorsdk.operator;

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Order;
import org.junit.jupiter.api.Test;

import io.fabric8.kubernetes.api.model.ObjectMetaBuilder;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClientBuilder;
import io.javaoperatorsdk.operator.junit.LocallyRunOperatorExtension;
import io.javaoperatorsdk.operator.sample.restart.RestartTestCustomResource;
import io.javaoperatorsdk.operator.sample.restart.RestartTestReconciler;

import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;

class OperatorRestartIT {
private final static KubernetesClient client = new KubernetesClientBuilder().build();
private final static Operator operator = new Operator(o -> o.withCloseClientOnStop(false));
private final static RestartTestReconciler reconciler = new RestartTestReconciler();
private static int reconcileNumberBeforeStop = 0;

@BeforeAll
static void registerReconciler() {
LocallyRunOperatorExtension.applyCrd(RestartTestCustomResource.class, client);
operator.register(reconciler);
}

@BeforeEach
void startOperator() {
operator.start();
}

@AfterEach
void stopOperator() {
operator.stop();
}

@Test
@Order(1)
void createResource() {
client.resource(testCustomResource()).createOrReplace();
await().untilAsserted(() -> assertThat(reconciler.getNumberOfExecutions()).isGreaterThan(0));
reconcileNumberBeforeStop = reconciler.getNumberOfExecutions();
}

@Test
@Order(2)
void reconcile() {
await().untilAsserted(() -> assertThat(reconciler.getNumberOfExecutions())
.isGreaterThan(reconcileNumberBeforeStop));
}

RestartTestCustomResource testCustomResource() {
RestartTestCustomResource cr = new RestartTestCustomResource();
cr.setMetadata(new ObjectMetaBuilder()
.withName("test1")
.build());
return cr;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package io.javaoperatorsdk.operator.sample.restart;

import java.util.Map;

import io.fabric8.kubernetes.api.model.ConfigMap;
import io.fabric8.kubernetes.api.model.ConfigMapBuilder;
import io.fabric8.kubernetes.api.model.ObjectMetaBuilder;
import io.javaoperatorsdk.operator.api.reconciler.Context;
import io.javaoperatorsdk.operator.processing.dependent.kubernetes.CRUDKubernetesDependentResource;
import io.javaoperatorsdk.operator.processing.dependent.kubernetes.KubernetesDependent;

@KubernetesDependent(labelSelector = "app=restart-test")
public class ConfigMapDependentResource
extends CRUDKubernetesDependentResource<ConfigMap, RestartTestCustomResource> {

public static final String DATA_KEY = "key";

public ConfigMapDependentResource() {
super(ConfigMap.class);
}

@Override
protected ConfigMap desired(RestartTestCustomResource primary,
Context<RestartTestCustomResource> context) {
return new ConfigMapBuilder()
.withMetadata(new ObjectMetaBuilder()
.withLabels(Map.of("app", "restart-test"))
.withName(primary.getMetadata().getName())
.withNamespace(primary.getMetadata().getNamespace())
.build())
.withData(Map.of(DATA_KEY, primary.getMetadata().getName()))
.build();

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package io.javaoperatorsdk.operator.sample.restart;

import io.fabric8.kubernetes.api.model.Namespaced;
import io.fabric8.kubernetes.client.CustomResource;
import io.fabric8.kubernetes.model.annotation.Group;
import io.fabric8.kubernetes.model.annotation.ShortNames;
import io.fabric8.kubernetes.model.annotation.Version;

@Group("sample.javaoperatorsdk")
@Version("v1")
@ShortNames("rt")
public class RestartTestCustomResource
extends CustomResource<Void, Void>
implements Namespaced {
}
Loading

0 comments on commit f6e30d4

Please sign in to comment.