Skip to content

Commit

Permalink
fix sse infinite retry onError (#729)
Browse files Browse the repository at this point in the history
  • Loading branch information
Andyz26 authored Dec 6, 2024
1 parent 880d2fc commit 0419367
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -96,15 +96,28 @@ protected void trackConnection(Channel channel) {
protected void closeConn() {
synchronized (connectionTracker) {
isClosed.set(true);
for (Channel value : this.connectionTracker) {
Channel channel = value;
log.info("Closing connection: {}. Status at close: isActive: {}, isOpen: {}, isWritable: {}",
channel.toString(), channel.isActive(), channel.isOpen(), channel.isWritable());
channel.close();
numConnectionsTracked.decrement();
}
this.connectionTracker.clear();
resetConnInternalUnsafe();
}
}

protected void resetConn() {
synchronized (connectionTracker) {
resetConnInternalUnsafe();
}
}

/**
* Not thread safe, must be called with explicit lock.
*/
private void resetConnInternalUnsafe() {
for (Channel value : this.connectionTracker) {
Channel channel = value;
log.info("Closing connection: {}. Status at close: isActive: {}, isOpen: {}, isWritable: {}",
channel.toString(), channel.isActive(), channel.isOpen(), channel.isWritable());
channel.close();
numConnectionsTracked.decrement();
}
this.connectionTracker.clear();
}

protected int connectionTrackerSize() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,11 +98,11 @@ public Integer call(Throwable t1, Integer integer) {
@Override
public Observable<?> call(Integer integer) {
if (isShutdown) {
logger.info(getName() + ": Is shutdown, stopping retries");
logger.info("{}: Is shutdown, stopping retries", getName());
return Observable.empty();
}
long delay = 2 * (integer > 10 ? 10 : integer);
logger.info(getName() + ": retrying conx after sleeping for " + delay + " secs");
logger.info("{}: retrying conx after sleeping for {} secs", getName(), delay);
return Observable.timer(delay, TimeUnit.SECONDS);
}
});
Expand Down Expand Up @@ -188,7 +188,7 @@ public synchronized void close() throws Exception {
shutdownSubject.onNext(true);
shutdownSubject.onCompleted();
isShutdown = true;
resetConnected();
closeConnected();
}

private <I, O> HttpClientBuilder<I, O> newHttpClientBuilder(String host, int port) {
Expand Down Expand Up @@ -249,18 +249,34 @@ public synchronized Observable<MantisServerSentEvent> call() {
return streamContent(response, updateDataRecvngStatus, dataRecvTimeoutSecs, delimiter);
})
.doOnError((Throwable throwable) -> {
// Only reset connection status, do not close SSE http client.
// otherwise it would cause infinite retry loop by the retryWhen below
resetConnected();
logger.warn(getName() +
"Error on getting response from SSE server: " + throwable.getMessage());
logger.warn("{}: Error on getting response from SSE server: {}",
getName(), throwable.getMessage());
connectionResetHandler.call(throwable);
})
.retryWhen(retryLogic)
.doOnCompleted(this::resetConnected);
.doOnError((Throwable throwable) -> {
closeConnected();
logger.error("{}: non-retryable error on getting response from SSE server: ",
getName(), throwable);
connectionResetHandler.call(throwable);
})
.doOnCompleted(this::closeConnected);
}

private void resetConnected() {
/***
* close SSE connection status and close the SSE http client.
*/
private void closeConnected() {
// explicitly close the connection
((MantisHttpClientImpl<?, ?>)client).closeConn();
resetConnected();
}

private void resetConnected() {
((MantisHttpClientImpl<?, ?>)client).resetConn();
if (isConnected.getAndSet(false)) {
if (updateConxStatus != null)
updateConxStatus.call(false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,19 @@ public void testMantisHttpClientUsage() throws Exception {
logger.info("Connection tracker size: {}", client.connectionTrackerSize());
assertEquals(1, client.connectionTrackerSize());

client.resetConn();

logger.info("Connection tracker size: {}", client.connectionTrackerSize());
assertEquals(0, client.connectionTrackerSize());

// Test can still add more channels after the client gets reset.
client.trackConnection(dummyChannel);
logger.info("Connection tracker size: {}", client.connectionTrackerSize());
assertEquals(1, client.connectionTrackerSize());

client.trackConnection(dummyChannel);
logger.info("Connection tracker size: {}", client.connectionTrackerSize());
assertEquals(2, client.connectionTrackerSize());
client.closeConn();

logger.info("Connection tracker size: {}", client.connectionTrackerSize());
Expand Down

0 comments on commit 0419367

Please sign in to comment.