Skip to content

Commit

Permalink
Fix skipping events on resume handshake (#9336)
Browse files Browse the repository at this point in the history
* Fix skipping events on resume handshake

* Update test/Tester/StreamingTests/StreamingResumeTests.cs

* Update test/Tester/StreamingTests/StreamingResumeTests.cs

Co-authored-by: Günther Foidl <[email protected]>

* fix test

---------

Co-authored-by: Reuben Bond <[email protected]>
Co-authored-by: Günther Foidl <[email protected]>
Co-authored-by: Reuben Bond <[email protected]>
  • Loading branch information
4 people authored Feb 10, 2025
1 parent b9827a4 commit d2af146
Show file tree
Hide file tree
Showing 5 changed files with 57 additions and 3 deletions.
15 changes: 15 additions & 0 deletions src/Orleans.Streaming/Internal/StreamSubscriptionHandleImpl.cs
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,14 @@ public async Task<StreamHandshakeToken> DeliverBatch(IBatchContainer batch, Stre
{
if (!this.expectedToken.Equals(handshakeToken))
return this.expectedToken;

// Check if this even already has been delivered
if (IsRewindable)
{
var currentToken = StreamHandshakeToken.CreateDeliveyToken(batch.SequenceToken);
if (this.expectedToken.Equals(currentToken))
return this.expectedToken;
}
}

if (batch is IBatchContainerBatch)
Expand Down Expand Up @@ -142,6 +150,13 @@ public async Task<StreamHandshakeToken> DeliverItem(object item, StreamSequenceT
{
if (!this.expectedToken.Equals(handshakeToken))
return this.expectedToken;

// Check if this even already has been delivered
if (IsRewindable)
{
if (this.expectedToken.Equals(currentToken))
return this.expectedToken;
}
}

T typedItem;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -287,10 +287,7 @@ private async Task<bool> DoHandshakeWithConsumer(
if (requestedHandshakeToken != null)
{
consumerData.SafeDisposeCursor(logger);
// The handshake token points to an already processed event, we need to advance the cursor to
// the next event.
consumerData.Cursor = queueCache.GetCacheCursor(consumerData.StreamId, requestedHandshakeToken.Token);
consumerData.Cursor.MoveNext(); //
}
else
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,12 @@ public void Configure(ISiloBuilder hostBuilder)
{
options.StreamInactivityPeriod = StreamInactivityPeriod;
}));
b.ConfigureCacheEviction(ob => ob.Configure(options =>
{
options.MetadataMinTimeInCache = MetadataMinTimeInCache;
options.DataMaxAgeInCache = DataMaxAgeInCache;
options.DataMinTimeInCache = DataMinTimeInCache;
}));
b.ConfigureEventHub(ob => ob.Configure(options =>
{
options.ConfigureTestDefaults(EHPath, EHConsumerGroup);
Expand Down
6 changes: 6 additions & 0 deletions test/Tester/StreamingTests/MemoryStreamResumeTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,12 @@ public void Configure(ISiloBuilder hostBuilder)
{
options.StreamInactivityPeriod = StreamInactivityPeriod;
}));
b.ConfigureCacheEviction(ob => ob.Configure(options =>
{
options.MetadataMinTimeInCache = MetadataMinTimeInCache;
options.DataMaxAgeInCache = DataMaxAgeInCache;
options.DataMinTimeInCache = DataMinTimeInCache;
}));
});
}
}
Expand Down
30 changes: 30 additions & 0 deletions test/Tester/StreamingTests/StreamingResumeTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,25 @@ namespace Tester.StreamingTests
public abstract class StreamingResumeTests : TestClusterPerTest
{
protected static readonly TimeSpan StreamInactivityPeriod = TimeSpan.FromSeconds(5);
protected static readonly TimeSpan MetadataMinTimeInCache = StreamInactivityPeriod * 100;
protected static readonly TimeSpan DataMaxAgeInCache = StreamInactivityPeriod * 5;
protected static readonly TimeSpan DataMinTimeInCache = StreamInactivityPeriod * 4;

protected const string StreamProviderName = "StreamingCacheMissTests";

[SkippableFact]
public virtual async Task ResumeAfterInactivity()
{
await ResumeAfterInactivityImpl(false);
}

[SkippableFact]
public virtual async Task ResumeAfterInactivityNotInCache()
{
await ResumeAfterInactivityImpl(true);
}

protected virtual async Task ResumeAfterInactivityImpl(bool waitForCacheToFlush)
{
var streamProvider = this.Client.GetStreamProvider(StreamProviderName);

Expand All @@ -31,6 +45,22 @@ public virtual async Task ResumeAfterInactivity()
// Wait for the stream to become inactive
await Task.Delay(StreamInactivityPeriod.Multiply(3));

if (waitForCacheToFlush)
{
for (var i = 0; i < 5; i++)
{
var otherStream = streamProvider.GetStream<byte[]>(nameof(IImplicitSubscriptionCounterGrain), Guid.NewGuid());
await otherStream.OnNextAsync(interestingData);
}
// Wait a bit more for the cache to flush some events
await Task.Delay(StreamInactivityPeriod.Multiply(3));
for (var i = 0; i < 5; i++)
{
var otherStream = streamProvider.GetStream<byte[]>(nameof(IImplicitSubscriptionCounterGrain), Guid.NewGuid());
await otherStream.OnNextAsync(interestingData);
}
}

await stream.OnNextAsync(interestingData);

await Task.Delay(2_000);
Expand Down

0 comments on commit d2af146

Please sign in to comment.