Skip to content

Commit

Permalink
Pub sub fix for delivery to stopped subscribers + message renames (#1667
Browse files Browse the repository at this point in the history
)

* some more test diagnostics

* unsubscribe not reachable subscribers

* cleanup pub sub message names

* split out pub sub tests

* test CI

* fixed test project name

* add missing test logger

* revert CI change
  • Loading branch information
marcinbudny authored Jun 10, 2022
1 parent 3552340 commit 2fc538d
Show file tree
Hide file tree
Showing 19 changed files with 389 additions and 166 deletions.
1 change: 1 addition & 0 deletions .github/workflows/build-dev.yml
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ jobs:
# - "tests/Proto.Cluster.MongoIdentity.Tests/*.csproj"
- "tests/Proto.Cluster.RedisIdentity.Tests/*.csproj"
- "tests/Proto.Cluster.CodeGen.Tests/*.csproj"
- "tests/Proto.Cluster.PubSub.Tests/*.csproj"

steps:
- uses: actions/checkout@v1
Expand Down
1 change: 1 addition & 0 deletions .github/workflows/pull-request.yml
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ jobs:
# - "tests/Proto.Cluster.MongoIdentity.Tests/*.csproj"
- "tests/Proto.Cluster.RedisIdentity.Tests/*.csproj"
- "tests/Proto.Cluster.CodeGen.Tests/*.csproj"
- "tests/Proto.Cluster.PubSub.Tests/*.csproj"

steps:
- uses: actions/checkout@v1
Expand Down
15 changes: 15 additions & 0 deletions ProtoActor.sln
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,8 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "ClusterPubSubBatchingProduc
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "ClusterPubSubBatchingProducer", "examples\ClusterPubSubBatchingProducer\ClusterPubSubBatchingProducer.csproj", "{8D481FB0-EEDC-4049-B18E-DEACA5BD8FF4}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Proto.Cluster.PubSub.Tests", "tests\Proto.Cluster.PubSub.Tests\Proto.Cluster.PubSub.Tests.csproj", "{5B3A62B8-4C75-4551-9DDE-4107EE96FD03}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -1245,6 +1247,18 @@ Global
{8D481FB0-EEDC-4049-B18E-DEACA5BD8FF4}.Release|x64.Build.0 = Release|Any CPU
{8D481FB0-EEDC-4049-B18E-DEACA5BD8FF4}.Release|x86.ActiveCfg = Release|Any CPU
{8D481FB0-EEDC-4049-B18E-DEACA5BD8FF4}.Release|x86.Build.0 = Release|Any CPU
{5B3A62B8-4C75-4551-9DDE-4107EE96FD03}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{5B3A62B8-4C75-4551-9DDE-4107EE96FD03}.Debug|Any CPU.Build.0 = Debug|Any CPU
{5B3A62B8-4C75-4551-9DDE-4107EE96FD03}.Debug|x64.ActiveCfg = Debug|Any CPU
{5B3A62B8-4C75-4551-9DDE-4107EE96FD03}.Debug|x64.Build.0 = Debug|Any CPU
{5B3A62B8-4C75-4551-9DDE-4107EE96FD03}.Debug|x86.ActiveCfg = Debug|Any CPU
{5B3A62B8-4C75-4551-9DDE-4107EE96FD03}.Debug|x86.Build.0 = Debug|Any CPU
{5B3A62B8-4C75-4551-9DDE-4107EE96FD03}.Release|Any CPU.ActiveCfg = Release|Any CPU
{5B3A62B8-4C75-4551-9DDE-4107EE96FD03}.Release|Any CPU.Build.0 = Release|Any CPU
{5B3A62B8-4C75-4551-9DDE-4107EE96FD03}.Release|x64.ActiveCfg = Release|Any CPU
{5B3A62B8-4C75-4551-9DDE-4107EE96FD03}.Release|x64.Build.0 = Release|Any CPU
{5B3A62B8-4C75-4551-9DDE-4107EE96FD03}.Release|x86.ActiveCfg = Release|Any CPU
{5B3A62B8-4C75-4551-9DDE-4107EE96FD03}.Release|x86.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand Down Expand Up @@ -1360,6 +1374,7 @@ Global
{31A0025C-008E-4961-A82A-842C806567F0} = {0F3AB331-C042-4371-A2F0-0AFDFA13DC9F}
{4A162D2F-C82A-4919-962B-6B2C53316263} = {59DCCC96-DDAF-469F-9E8E-9BC733285082}
{8D481FB0-EEDC-4049-B18E-DEACA5BD8FF4} = {4A162D2F-C82A-4919-962B-6B2C53316263}
{5B3A62B8-4C75-4551-9DDE-4107EE96FD03} = {9AA2BCF0-19AB-4DD9-8D91-7D188E463806}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {CD0D1E44-8118-4682-8793-6B20ABFA824C}
Expand Down
26 changes: 17 additions & 9 deletions src/Proto.Actor/Mailbox/Mailbox.cs
Original file line number Diff line number Diff line change
Expand Up @@ -79,29 +79,36 @@ params IMailboxStatistics[] stats
_invoker = NoopInvoker.Instance;
}

public int Status => (int)Interlocked.Read(ref _status);
public int Status => (int) Interlocked.Read(ref _status);

public int UserMessageCount => _userMailbox.Length;

public void PostUserMessage(object msg)
{
// if the message is a batch message, we unpack the content as individual messages in the mailbox
// feature Aka: Samkuvertering in Swedish...
if (msg is IMessageBatch || msg is MessageEnvelope e && e.Message is IMessageBatch)
if (msg is IMessageBatch || msg is MessageEnvelope e && e.Message is IMessageBatch)
{
var batch = (IMessageBatch)MessageEnvelope.UnwrapMessage(msg)!;
var batch = (IMessageBatch) MessageEnvelope.UnwrapMessage(msg)!;
var messages = batch.GetMessages();

foreach (var message in messages)
{
_userMailbox.Push(message);

foreach (var t in _stats)
{
t.MessagePosted(message);
}
}

_userMailbox.Push(msg);

if (batch is IAutoRespond)
{
// push the batch itself as well, so that it can autorespond to sender after processing all contained messages
// used by pub sub
_userMailbox.Push(msg);
}

foreach (var t in _stats)
{
t.MessagePosted(msg);
Expand Down Expand Up @@ -179,7 +186,7 @@ private static Task RunAsync(DefaultMailbox mailbox)
static async Task Await(DefaultMailbox self, ValueTask task)
{
await task;

Interlocked.Exchange(ref self._status, MailboxStatus.Idle);

if (self._systemMessages.HasMessages || !self._suspended && self._userMailbox.HasMessages)
Expand Down Expand Up @@ -257,6 +264,7 @@ private ValueTask ProcessMessages()
e.CheckFailFast();
_invoker.EscalateFailure(e, msg);
}

return default;

static async ValueTask Await(object msg, ValueTask task, DefaultMailbox self)
Expand Down Expand Up @@ -285,11 +293,11 @@ private void Schedule()
#if NET5_0_OR_GREATER
ThreadPool.UnsafeQueueUserWorkItem(RunWrapper, this ,false);
#else
ThreadPool.UnsafeQueueUserWorkItem(RunWrapper, this);
ThreadPool.UnsafeQueueUserWorkItem(RunWrapper, this);
#endif
}
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
private static void RunWrapper(object state)
{
Expand Down Expand Up @@ -322,4 +330,4 @@ public interface IMailboxStatistics
/// This method is invoked when all messages in the mailbox have been received.
/// </summary>
void MailboxEmpty();
}
}
18 changes: 9 additions & 9 deletions src/Proto.Cluster/PubSub/BatchingProducer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ private async Task PublisherLoop(CancellationToken cancel)
{
Logger.LogDebug("Producer is starting the publisher loop for topic {Topic}", _topic);

var batch = new PublisherBatchMessage();
var batch = new PubSubBatch();

try
{
Expand All @@ -80,14 +80,14 @@ private async Task PublisherLoop(CancellationToken cancel)
if (batch.Envelopes.Count < _config.BatchSize) continue;

await PublishBatch(batch);
batch = new PublisherBatchMessage();
batch = new PubSubBatch();
}
else
{
if (batch.Envelopes.Count > 0)
{
await PublishBatch(batch);
batch = new PublisherBatchMessage();
batch = new PubSubBatch();
}

await _publisherChannel.Reader.WaitToReadAsync(cancel);
Expand Down Expand Up @@ -135,14 +135,14 @@ private async Task CancelPendingMessages()
}
}

private void ClearBatch(PublisherBatchMessage batch)
private void ClearBatch(PubSubBatch batch)
{
batch.Envelopes.Clear();
batch.DeliveryReports.Clear();
batch.CancelTokens.Clear();
}

private void FailBatch(PublisherBatchMessage batch, Exception ex)
private void FailBatch(PubSubBatch batch, Exception ex)
{
foreach (var deliveryReport in batch.DeliveryReports)
{
Expand All @@ -152,7 +152,7 @@ private void FailBatch(PublisherBatchMessage batch, Exception ex)
ClearBatch(batch);
}

private void CancelBatch(PublisherBatchMessage batch)
private void CancelBatch(PubSubBatch batch)
{
foreach (var deliveryReport in batch.DeliveryReports)
{
Expand All @@ -162,7 +162,7 @@ private void CancelBatch(PublisherBatchMessage batch)
ClearBatch(batch);
}

private void CompleteBatch(PublisherBatchMessage batch)
private void CompleteBatch(PubSubBatch batch)
{
foreach (var deliveryReport in batch.DeliveryReports)
{
Expand All @@ -172,7 +172,7 @@ private void CompleteBatch(PublisherBatchMessage batch)
ClearBatch(batch);
}

private void RemoveCancelledFromBatch(PublisherBatchMessage batch)
private void RemoveCancelledFromBatch(PubSubBatch batch)
{
var cancelTokensCopy = batch.CancelTokens.ToArray();

Expand All @@ -195,7 +195,7 @@ private void StopAcceptingNewMessages()
_publisherChannel.Writer.Complete();
}

private async Task PublishBatch(PublisherBatchMessage batch)
private async Task PublishBatch(PubSubBatch batch)
{
var retries = 0;
var retry = true;
Expand Down
2 changes: 1 addition & 1 deletion src/Proto.Cluster/PubSub/BatchingProducerConfig.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ namespace Proto.Cluster.PubSub;
/// <param name="retries">Number of retries (1 after initial try)</param>
/// <param name="e">Exception that was thrown</param>
/// <param name="batch">Current batch</param>
public delegate Task<PublishingErrorDecision> PublishingErrorHandler(int retries, Exception e, PublisherBatchMessage batch);
public delegate Task<PublishingErrorDecision> PublishingErrorHandler(int retries, Exception e, PubSubBatch batch);

public record BatchingProducerConfig
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,17 @@

namespace Proto.Cluster.PubSub;

public record DeliveryBatchMessage(Subscribers Subscribers, PublisherBatchMessage PublisherBatch) : IRootSerializable
public record DeliverBatchRequest(Subscribers Subscribers, PubSubBatch PubSubBatch) : IRootSerializable
{
public IRootSerialized Serialize(ActorSystem system) => new DeliveryBatch
public IRootSerialized Serialize(ActorSystem system) => new DeliverBatchRequestTransport
{
Subscribers = Subscribers,
Batch = (ProducerBatch)PublisherBatch.Serialize(system),
Batch = (PubSubBatchTransport) PubSubBatch.Serialize(system)
};
}

public partial class DeliveryBatch: IRootSerialized
public partial class DeliverBatchRequestTransport : IRootSerialized
{
public IRootSerializable Deserialize(ActorSystem system) =>
new DeliveryBatchMessage(Subscribers, (PublisherBatchMessage) Batch.Deserialize(system));
public IRootSerializable Deserialize(ActorSystem system) =>
new DeliverBatchRequest(Subscribers, (PubSubBatch) Batch.Deserialize(system));
}
2 changes: 1 addition & 1 deletion src/Proto.Cluster/PubSub/IPublisher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ public interface IPublisher
/// <returns></returns>
public Task<PublishResponse?> PublishBatch(
string topic,
PublisherBatchMessage batch,
PubSubBatch batch,
CancellationToken ct = default
);
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,20 +9,21 @@

namespace Proto.Cluster.PubSub;

public record TopicBatchMessage(IReadOnlyCollection<object> Envelopes) : IRootSerializable , IMessageBatch, IAutoRespond
// message posted to subscriber's mailbox, that is then unrolled to single messages, and has ability to auto respond
public record PubSubAutoRespondBatch(IReadOnlyCollection<object> Envelopes) : IRootSerializable, IMessageBatch, IAutoRespond
{
public object GetAutoResponse(IContext context) => new PublishResponse();

public IReadOnlyCollection<object> GetMessages() => Envelopes;

public IRootSerialized Serialize(ActorSystem system)
{
var s = system.Serialization();

var batch = new TopicBatchRequest();
var batch = new PubSubAutoRespondBatchTransport();

foreach (var message in Envelopes)
{

var (messageData, typeName, serializerId) = s.Serialize(message);
var typeIndex = batch.TypeNames.IndexOf(typeName);

Expand All @@ -32,31 +33,32 @@ public IRootSerialized Serialize(ActorSystem system)
typeIndex = batch.TypeNames.Count - 1;
}

var topicEnvelope = new TopicEnvelope
var envelope = new PubSubEnvelope
{
MessageData = messageData,
TypeId = typeIndex,
SerializerId = serializerId,
};
batch.Envelopes.Add(topicEnvelope);

batch.Envelopes.Add(envelope);
}

return batch;
}
}

public partial class TopicBatchRequest : IRootSerialized
public partial class PubSubAutoRespondBatchTransport : IRootSerialized
{
public IRootSerializable Deserialize(ActorSystem system)
{
var ser = system.Serialization();
//deserialize messages in the envelope
var messages = Envelopes
.Select(e => ser
.Deserialize(TypeNames[e.TypeId], e.MessageData, e.SerializerId))
.Deserialize(TypeNames[e.TypeId], e.MessageData, e.SerializerId)
)
.ToList();

return new TopicBatchMessage(messages);
return new PubSubAutoRespondBatch(messages);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@

namespace Proto.Cluster.PubSub;

public class PublisherBatchMessage : IRootSerializable
public class PubSubBatch : IRootSerializable
{
public List<object> Envelopes { get; } = new ();
public List<object> Envelopes { get; } = new();

internal List<TaskCompletionSource<bool>> DeliveryReports { get; } = new();

Expand All @@ -23,7 +23,8 @@ public IRootSerialized Serialize(ActorSystem system)
{
var s = system.Serialization();

var batch = new ProducerBatch();
var batch = new PubSubBatchTransport();

foreach (var message in Envelopes)
{
var (messageData, typeName, serializerId) = s.Serialize(message);
Expand All @@ -35,14 +36,14 @@ public IRootSerialized Serialize(ActorSystem system)
typeIndex = batch.TypeNames.Count - 1;
}

var producerMessage = new ProducerEnvelope
var envelope = new PubSubEnvelope
{
MessageData = messageData,
TypeId = typeIndex,
SerializerId = serializerId
};
batch.Envelopes.Add(producerMessage);

batch.Envelopes.Add(envelope);
}

return batch;
Expand All @@ -51,18 +52,19 @@ public IRootSerialized Serialize(ActorSystem system)
public bool IsEmpty() => Envelopes.Count == 0;
}

public partial class ProducerBatch : IRootSerialized
public partial class PubSubBatchTransport : IRootSerialized
{
public IRootSerializable Deserialize(ActorSystem system)
{
var ser = system.Serialization();
//deserialize messages in the envelope
var messages = Envelopes
.Select(e => ser
.Deserialize(TypeNames[e.TypeId], e.MessageData, e.SerializerId))
.Deserialize(TypeNames[e.TypeId], e.MessageData, e.SerializerId)
)
.ToList();

var res = new PublisherBatchMessage();
var res = new PubSubBatch();
res.Envelopes.AddRange(messages);
return res;
}
Expand Down
Loading

0 comments on commit 2fc538d

Please sign in to comment.