Skip to content

Commit

Permalink
Add a buffering functionality to the SseWorkerConnection (#721)
Browse files Browse the repository at this point in the history
* Add a buffering functionality to the SsseWorkerConnection
  • Loading branch information
crioux-stripe authored Nov 1, 2024
1 parent 7f996f8 commit f9d3a5c
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,7 @@ protected Observable<MantisServerSentEvent> streamContent(HttpClientResponse<Ser
}
return response.getContent()
.lift(new DropOperator<ServerSentEvent>(metricGroupId))
.rebatchRequests(this.bufferSize <= 0 ? 1 : this.bufferSize)
.flatMap((ServerSentEvent t1) -> {
lastDataReceived.set(System.currentTimeMillis());
if (isConnected.get() && isReceivingData.compareAndSet(false, true))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,4 +148,51 @@ public void testMantisHttpClientUsage() throws Exception {
logger.info("Connection tracker size: {}", client.connectionTrackerSize());
assertEquals(0, client.connectionTrackerSize());
}

@Test
public void testStreamContentBuffersBeforeDrop() throws Exception {
int bufferSize = 20;
int totalEvents = 100;
SpectatorRegistryFactory.setRegistry(new DefaultRegistry());
String metricGroupString = "testmetric_buffer";
MetricGroupId metricGroupId = new MetricGroupId(metricGroupString);
SseWorkerConnection workerConnection = new SseWorkerConnection("connection_type",
"hostname",
80,
b -> {},
b -> {},
t -> {},
600,
false,
new CopyOnWriteArraySet<>(),
bufferSize,
null,
true,
metricGroupId);
HttpClientResponse<ServerSentEvent> response = mock(HttpClientResponse.class);
TestScheduler testScheduler = Schedulers.test();

// Events are just "0", "1", "2", ...
Observable<ServerSentEvent> contentObs = Observable.interval(1, TimeUnit.SECONDS, testScheduler)
.map(t -> new ServerSentEvent(Unpooled.copiedBuffer(Long.toString(t), Charset.defaultCharset())));

when(response.getContent()).thenReturn(contentObs);

TestSubscriber<MantisServerSentEvent> subscriber = new TestSubscriber<>(1);

workerConnection.streamContent(response, b -> {}, 600, "delimiter").subscribeOn(testScheduler).subscribe(subscriber);

testScheduler.advanceTimeBy(totalEvents, TimeUnit.SECONDS);
subscriber.assertValueCount(1);
List<MantisServerSentEvent> events = subscriber.getOnNextEvents();
assertEquals("0", events.get(0).getEventAsString());

Metrics metrics = MetricsRegistry.getInstance().getMetric(metricGroupId);
Counter onNextCounter = metrics.getCounter(DropOperator.Counters.onNext.toString());
Counter droppedCounter = metrics.getCounter(DropOperator.Counters.dropped.toString());
logger.info("next: {}", onNextCounter.value());
logger.info("drop: {}", droppedCounter.value());
assertTrue(onNextCounter.value() >= bufferSize); // Should request at least the buffer even though we requested 1.
assertTrue(droppedCounter.value() <= totalEvents - bufferSize ); // We should not drop any of the buffer.
}
}

0 comments on commit f9d3a5c

Please sign in to comment.