diff --git a/mantis-network/src/main/java/io/reactivex/mantis/network/push/ObservableTrigger.java b/mantis-network/src/main/java/io/reactivex/mantis/network/push/ObservableTrigger.java index 400633cc3..9125bb510 100644 --- a/mantis-network/src/main/java/io/reactivex/mantis/network/push/ObservableTrigger.java +++ b/mantis-network/src/main/java/io/reactivex/mantis/network/push/ObservableTrigger.java @@ -22,11 +22,13 @@ import io.reactivx.mantis.operators.DisableBackPressureOperator; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import rx.Observable; +import rx.Scheduler; import rx.Subscription; import rx.functions.Action0; import rx.functions.Action1; @@ -39,6 +41,8 @@ public final class ObservableTrigger { private static final Logger logger = LoggerFactory.getLogger(ObservableTrigger.class); + private static Scheduler timeoutScheduler = Schedulers.from(Executors.newFixedThreadPool(5)); + private ObservableTrigger() {} @@ -54,46 +58,45 @@ private static PushTrigger trigger(final String name, final Observable subscriptionActive = metrics.getGauge("subscriptionActive"); - Action1> doOnStart = new Action1>() { - @Override - public void call(final MonitoredQueue queue) { - subRef.set( - o - .filter((T t1) -> t1 != null) - .doOnSubscribe(() -> { - logger.info("Subscription is ACTIVE for observable trigger with name: " + name); - subscriptionActive.set(1); - }) - .doOnUnsubscribe(() -> { - logger.info("Subscription is INACTIVE for observable trigger with name: " + name); - subscriptionActive.set(0); - }) - .subscribe((T data) -> queue.write(data) - , - (Throwable e) -> { - logger.warn("Observable used to push data errored, on server with name: " + name, e); - if (doOnError != null) { - doOnError.call(e); - } - }, - () -> { - logger.info("Observable used to push data completed, on server with name: " + name); - if (doOnComplete != null) { - doOnComplete.call(); - } - } - ) - ); + Action1> doOnStart = queue -> { + Subscription oldSub = subRef.getAndSet( + o + .filter((T t1) -> t1 != null) + .doOnSubscribe(() -> { + logger.info("Subscription is ACTIVE for observable trigger with name: " + name); + subscriptionActive.increment(); + }) + .doOnUnsubscribe(() -> { + logger.info("Subscription is INACTIVE for observable trigger with name: " + name); + subscriptionActive.decrement(); + }) + .subscribe( + (T data) -> queue.write(data), + (Throwable e) -> { + logger.warn("Observable used to push data errored, on server with name: " + name, e); + if (doOnError != null) { + doOnError.call(e); + } + }, + () -> { + logger.info("Observable used to push data completed, on server with name: " + name); + if (doOnComplete != null) { + doOnComplete.call(); + } + } + ) + ); + if (oldSub != null) { + logger.info("A new subscription is ACTIVE. " + + "Unsubscribe from previous subscription observable trigger with name: " + name); + oldSub.unsubscribe(); } }; - Action1> doOnStop = new Action1>() { - @Override - public void call(MonitoredQueue t1) { - if (subRef.get() != null) { - logger.warn("Connections from next stage has dropped to 0. Do not propagate unsubscribe"); - // subRef.get().unsubscribe(); - } + Action1> doOnStop = t1 -> { + if (subRef.get() != null) { + logger.warn("Connections from next stage has dropped to 0. Do not propagate unsubscribe"); + // subRef.get().unsubscribe(); } }; @@ -112,46 +115,38 @@ private static PushTrigger ssetrigger(final String name, final Observable subscriptionActive = metrics.getGauge("subscriptionActive"); - Action1> doOnStart = new Action1>() { - @Override - public void call(final MonitoredQueue queue) { - subRef.set( - o - .filter((T t1) -> t1 != null) - .doOnSubscribe(() -> { - logger.info("Subscription is ACTIVE for observable trigger with name: " + name); - subscriptionActive.set(1); - }) - .doOnUnsubscribe(() -> { - logger.info("Subscription is INACTIVE for observable trigger with name: " + name); - subscriptionActive.set(0); - }) - .subscribe((T data) -> queue.write(data) - , - (Throwable e) -> { - logger.warn("Observable used to push data errored, on server with name: " + name, e); - if (doOnError != null) { - doOnError.call(e); - } - }, - () -> { - logger.info("Observable used to push data completed, on server with name: " + name); - if (doOnComplete != null) { - doOnComplete.call(); - } - } - ) - ); - } - }; - - Action1> doOnStop = new Action1>() { - @Override - public void call(MonitoredQueue t1) { - if (subRef.get() != null) { - logger.warn("Connections from next stage has dropped to 0 for SSE stage. propagate unsubscribe"); - subRef.get().unsubscribe(); - } + Action1> doOnStart = queue -> subRef.set( + o + .filter((T t1) -> t1 != null) + .doOnSubscribe(() -> { + logger.info("Subscription is ACTIVE for observable trigger with name: " + name); + subscriptionActive.increment(); + }) + .doOnUnsubscribe(() -> { + logger.info("Subscription is INACTIVE for observable trigger with name: " + name); + subscriptionActive.decrement(); + }) + .subscribe( + (T data) -> queue.write(data), + (Throwable e) -> { + logger.warn("Observable used to push data errored, on server with name: " + name, e); + if (doOnError != null) { + doOnError.call(e); + } + }, + () -> { + logger.info("Observable used to push data completed, on server with name: " + name); + if (doOnComplete != null) { + doOnComplete.call(); + } + } + ) + ); + + Action1> doOnStop = t1 -> { + if (subRef.get() != null) { + logger.warn("Connections from next stage has dropped to 0 for SSE stage. propagate unsubscribe"); + subRef.get().unsubscribe(); } }; @@ -171,69 +166,68 @@ private static PushTrigger> groupTrigger(final String subscriptionActive = metrics.getGauge("subscriptionActive"); - Action1>> doOnStart = new Action1>>() { - @Override - public void call(final MonitoredQueue> queue) { - subRef.set( - o - // decouple from calling thread - .observeOn(Schedulers.computation()) - .doOnSubscribe(() -> { - logger.info("Subscription is ACTIVE for observable trigger with name: " + name); - subscriptionActive.set(1); - }) - .doOnUnsubscribe(() -> { - logger.info("Subscription is INACTIVE for observable trigger with name: " + name); - subscriptionActive.set(0); - }) - .flatMap((final GroupedObservable group) -> { - final byte[] keyBytes = keyEncoder.call(group.getKey()); - final long keyBytesHashed = hashFunction.computeHash(keyBytes); - return - group - .timeout(groupExpirySeconds, TimeUnit.SECONDS, (Observable) Observable.empty()) - .lift(new DisableBackPressureOperator()) - .buffer(250, TimeUnit.MILLISECONDS) - .filter((List t1) -> t1 != null && !t1.isEmpty()) - .map((List list) -> { - List> keyPairList = new ArrayList<>(list.size()); - for (V data : list) { - keyPairList.add(new KeyValuePair(keyBytesHashed, keyBytes, data)); - } - return keyPairList; - } - ); - } - ) - .subscribe((List> list) -> { - for (KeyValuePair data : list) { - queue.write(data); - } - }, - (Throwable e) -> { - logger.warn("Observable used to push data errored, on server with name: " + name, e); - if (doOnError != null) { - doOnError.call(e); - } - }, - () -> { - logger.info("Observable used to push data completed, on server with name: " + name); - if (doOnComplete != null) { - doOnComplete.call(); + Action1>> doOnStart = queue -> { + Subscription oldSub = subRef.getAndSet( + o + .doOnSubscribe(() -> { + logger.info("Subscription is ACTIVE for observable trigger with name: " + name); + subscriptionActive.increment(); + }) + .doOnUnsubscribe(() -> { + logger.info("Subscription is INACTIVE for observable trigger with name: " + name); + subscriptionActive.decrement(); + }) + .flatMap((final GroupedObservable group) -> { + final byte[] keyBytes = keyEncoder.call(group.getKey()); + final long keyBytesHashed = hashFunction.computeHash(keyBytes); + return + group + .timeout(groupExpirySeconds, TimeUnit.SECONDS, Observable.empty(), timeoutScheduler) + .lift(new DisableBackPressureOperator()) + .buffer(250, TimeUnit.MILLISECONDS) + .filter((List t1) -> t1 != null && !t1.isEmpty()) + .map((List list) -> { + List> keyPairList = new ArrayList<>(list.size()); + for (V data : list) { + keyPairList.add(new KeyValuePair<>(keyBytesHashed, keyBytes, data)); } + return keyPairList; } - - )); + ); + } + ) + .subscribe( + (List> list) -> { + for (KeyValuePair data : list) { + queue.write(data); + } + }, + (Throwable e) -> { + logger.warn("Observable used to push data errored, on server with name: " + name, e); + if (doOnError != null) { + doOnError.call(e); + } + }, + () -> { + logger.info("Observable used to push data completed, on server with name: " + name); + if (doOnComplete != null) { + doOnComplete.call(); + } + } + ) + ); + if (oldSub != null) { + logger.info("A new subscription is ACTIVE. " + + "Unsubscribe from previous subscription observable trigger with name: " + name); + oldSub.unsubscribe(); } }; - Action1>> doOnStop = new Action1>>() { - @Override - public void call(MonitoredQueue> t1) { - if (subRef.get() != null) { - logger.warn("Connections from next stage has dropped to 0. Do not propagate unsubscribe"); - //subRef.get().unsubscribe(); - } + Action1>> doOnStop = t1 -> { + if (subRef.get() != null) { + logger.warn("Connections from next stage has dropped to 0. " + + "Do not propagate unsubscribe until a new connection is made."); + //subRef.get().unsubscribe(); } }; @@ -253,53 +247,50 @@ private static PushTrigger> mantisGroupTrigger(final S subscriptionActive = metrics.getGauge("subscriptionActive"); - Action1>> doOnStart = new Action1>>() { - @Override - public void call(final MonitoredQueue> queue) { - subRef.set( - o - .doOnSubscribe(() -> { - logger.info("Subscription is ACTIVE for observable trigger with name: " + name); - subscriptionActive.set(1); - }) - .doOnUnsubscribe(() -> { - logger.info("Subscription is INACTIVE for observable trigger with name: " + name); - subscriptionActive.set(0); - }) - .map((MantisGroup data) -> { - final byte[] keyBytes = keyEncoder.call(data.getKeyValue()); - final long keyBytesHashed = hashFunction.computeHash(keyBytes); - return (new KeyValuePair(keyBytesHashed, keyBytes, data.getValue())); - }) - - .subscribe( - (KeyValuePair data) -> { - queue.write(data); - }, - (Throwable e) -> { - logger.warn("Observable used to push data errored, on server with name: " + name, e); - if (doOnError != null) { - doOnError.call(e); - } - } - , - () -> { - logger.info("Observable used to push data completed, on server with name: " + name); - if (doOnComplete != null) { - doOnComplete.call(); - } - } - )); + Action1>> doOnStart = queue -> { + Subscription oldSub = subRef.getAndSet( + o + .doOnSubscribe(() -> { + logger.info("Subscription is ACTIVE for observable trigger with name: " + name); + subscriptionActive.increment(); + }) + .doOnUnsubscribe(() -> { + logger.info("Subscription is INACTIVE for observable trigger with name: " + name); + subscriptionActive.decrement(); + }) + .map((MantisGroup data) -> { + final byte[] keyBytes = keyEncoder.call(data.getKeyValue()); + final long keyBytesHashed = hashFunction.computeHash(keyBytes); + return (new KeyValuePair(keyBytesHashed, keyBytes, data.getValue())); + }) + .subscribe( + (KeyValuePair data) -> queue.write(data), + (Throwable e) -> { + logger.warn("Observable used to push data errored, on server with name: " + name, e); + if (doOnError != null) { + doOnError.call(e); + } + }, + () -> { + logger.info("Observable used to push data completed, on server with name: " + name); + if (doOnComplete != null) { + doOnComplete.call(); + } + } + ) + ); + if (oldSub != null) { + logger.info("A new subscription is ACTIVE. " + + "Unsubscribe from previous subscription observable trigger with name: " + name); + oldSub.unsubscribe(); } }; - Action1>> doOnStop = new Action1>>() { - @Override - public void call(MonitoredQueue> t1) { - if (subRef.get() != null) { - logger.warn("Connections from next stage has dropped to 0. Do not propagate unsubscribe"); - // subRef.get().unsubscribe(); - } + Action1>> doOnStop = t1 -> { + if (subRef.get() != null) { + logger.warn("Connections from next stage has dropped to 0. " + + "Do not propagate unsubscribe until a new connection is made."); + // subRef.get().unsubscribe(); } };