Skip to content

Commit

Permalink
Merge branch 'main' into main
Browse files Browse the repository at this point in the history
  • Loading branch information
willg1983 authored Feb 11, 2025
2 parents 562204d + d2af146 commit 9a40aa8
Show file tree
Hide file tree
Showing 56 changed files with 1,575 additions and 1,137 deletions.
184 changes: 149 additions & 35 deletions .github/workflows/ci.yml

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions Directory.Build.props
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@
<!-- Versioning properties -->
<PropertyGroup>
<AssemblyVersion>9.0.0.0</AssemblyVersion>
<VersionPrefix Condition=" '$(VersionPrefix)'=='' ">9.0.0</VersionPrefix>
<VersionPrefix Condition=" '$(VersionPrefix)'=='' ">9.1.0</VersionPrefix>
</PropertyGroup>

<!-- For Debug builds generated a date/time dependent version suffix -->
Expand Down Expand Up @@ -103,4 +103,4 @@
</PropertyGroup>
</Otherwise>
</Choose>
</Project>
</Project>
2 changes: 2 additions & 0 deletions Directory.Packages.props
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
</PropertyGroup>
<ItemGroup>
<!-- System packages -->
<PackageVersion Include="Aspire.Azure.Storage.Queues" Version="9.0.0" />
<PackageVersion Include="System.Diagnostics.PerformanceCounter" Version="8.0.1" />
<PackageVersion Include="System.IO.Hashing" Version="8.0.0" NoWarn="NU5104" />
<PackageVersion Include="System.IO.Pipelines" Version="8.0.0" />
Expand Down Expand Up @@ -113,6 +114,7 @@
<PackageVersion Include="Hyperion" Version="0.12.2" />
<PackageVersion Include="Grpc.Tools" Version="2.67.0" />
<PackageVersion Include="Testcontainers" Version="3.8.0" />
<PackageVersion Include="GitHubActionsTestLogger" Version="2.4.1" />
<!-- Tooling related packages -->
<PackageVersion Include="Microsoft.SourceLink.AzureRepos.Git" Version="8.0.0" />
<PackageVersion Include="Microsoft.SourceLink.GitHub" Version="8.0.0" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,22 @@ private static Action<OptionsBuilder<AzureQueueOptions>> GetQueueOptionBuilder(I

if (!string.IsNullOrEmpty(connectionString))
{
options.QueueServiceClient = Uri.TryCreate(connectionString, UriKind.Absolute, out var uri)
? new QueueServiceClient(uri)
: new QueueServiceClient(connectionString);
if (Uri.TryCreate(connectionString, UriKind.Absolute, out var uri))
{
if (!string.IsNullOrEmpty(uri.Query))
{
// SAS URI
options.QueueServiceClient = new QueueServiceClient(uri);
}
else
{
options.QueueServiceClient = new QueueServiceClient(uri, credential: new Azure.Identity.DefaultAzureCredential());
}
}
else
{
options.QueueServiceClient = new QueueServiceClient(connectionString);
}
}
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
<ItemGroup>
<ProjectReference Include="$(SourceRoot)src\Orleans.Streaming\Orleans.Streaming.csproj" />
<PackageReference Include="Azure.Core" />
<PackageReference Include="Azure.Identity" />
<PackageReference Include="Azure.Storage.Blobs" />
<PackageReference Include="Azure.Storage.Queues" />
<PackageReference Include="Azure.Data.Tables" />
Expand Down
24 changes: 15 additions & 9 deletions src/Orleans.Core/Async/AsyncExecutorWithRetries.cs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,8 @@ public static Task<T> ExecuteWithRetries<T>(
int maxNumErrorTries,
Func<Exception, int, bool> retryExceptionFilter,
TimeSpan maxExecutionTime,
IBackoffProvider onErrorBackOff)
IBackoffProvider onErrorBackOff,
CancellationToken cancellationToken = default)
{
return ExecuteWithRetries<T>(
function,
Expand All @@ -94,7 +95,8 @@ public static Task<T> ExecuteWithRetries<T>(
retryExceptionFilter,
maxExecutionTime,
null,
onErrorBackOff);
onErrorBackOff,
cancellationToken);
}

/// <summary>
Expand Down Expand Up @@ -140,7 +142,8 @@ public static Task<T> ExecuteWithRetries<T>(
Func<Exception, int, bool> retryExceptionFilter,
TimeSpan maxExecutionTime = default,
IBackoffProvider onSuccessBackOff = null,
IBackoffProvider onErrorBackOff = null)
IBackoffProvider onErrorBackOff = null,
CancellationToken cancellationToken = default)
{
return ExecuteWithRetriesHelper<T>(
function,
Expand All @@ -151,7 +154,8 @@ public static Task<T> ExecuteWithRetries<T>(
retryValueFilter,
retryExceptionFilter,
onSuccessBackOff,
onErrorBackOff);
onErrorBackOff,
cancellationToken);
}

/// <summary>
Expand Down Expand Up @@ -201,7 +205,8 @@ private static async Task<T> ExecuteWithRetriesHelper<T>(
Func<T, int, bool> retryValueFilter = null,
Func<Exception, int, bool> retryExceptionFilter = null,
IBackoffProvider onSuccessBackOff = null,
IBackoffProvider onErrorBackOff = null)
IBackoffProvider onErrorBackOff = null,
CancellationToken cancellationToken = default)
{
T result = default;
ExceptionDispatchInfo lastExceptionInfo = null;
Expand All @@ -211,6 +216,7 @@ private static async Task<T> ExecuteWithRetriesHelper<T>(
do
{
retry = false;
cancellationToken.ThrowIfCancellationRequested();

if (maxExecutionTime != Timeout.InfiniteTimeSpan && maxExecutionTime != default)
{
Expand Down Expand Up @@ -241,13 +247,13 @@ private static async Task<T> ExecuteWithRetriesHelper<T>(
retry = retryValueFilter(result, counter);
}

if (retry)
if (retry && !cancellationToken.IsCancellationRequested)
{
TimeSpan? delay = onSuccessBackOff?.Next(counter);

if (delay.HasValue)
{
await Task.Delay(delay.Value);
await Task.Delay(delay.Value, cancellationToken);
}
}
}
Expand All @@ -261,7 +267,7 @@ private static async Task<T> ExecuteWithRetriesHelper<T>(
retry = retryExceptionFilter(exc, counter);
}

if (!retry)
if (!retry || cancellationToken.IsCancellationRequested)
{
throw;
}
Expand All @@ -272,7 +278,7 @@ private static async Task<T> ExecuteWithRetriesHelper<T>(

if (delay.HasValue)
{
await Task.Delay(delay.Value);
await Task.Delay(delay.Value, cancellationToken);
}
}
} while (retry);
Expand Down
53 changes: 28 additions & 25 deletions src/Orleans.Core/Manifest/ClientClusterManifestProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ internal class ClientClusterManifestProvider : IClusterManifestProvider, IAsyncD
private readonly LocalClientDetails _localClientDetails;
private readonly GatewayManager _gatewayManager;
private readonly AsyncEnumerable<ClusterManifest> _updates;
private readonly CancellationTokenSource _cancellation = new CancellationTokenSource();
private readonly CancellationTokenSource _shutdownCts = new CancellationTokenSource();
private ClusterManifest _current;
private Task? _runTask;

Expand Down Expand Up @@ -81,14 +81,14 @@ public async Task StopAsync(CancellationToken cancellationToken)
{
try
{
_cancellation.Cancel();
_shutdownCts.Cancel();

if (_runTask is { IsCompleted: false } _task)
if (_runTask is { } task)
{
await _task.WaitAsync(cancellationToken);
await task.WaitAsync(cancellationToken);
}
}
catch (OperationCanceledException)
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
{
_logger.LogInformation("Graceful shutdown of cluster manifest provider was canceled.");
}
Expand All @@ -103,12 +103,11 @@ private async Task RunAsync()
try
{
var grainFactory = _services.GetRequiredService<IInternalGrainFactory>();
var cancellationTask = _cancellation.Token.WhenCancelled();
SiloAddress? gateway = null;
IClusterManifestSystemTarget? provider = null;
var minorVersion = 0;
var gatewayVersion = MajorMinorVersion.MinValue;
while (!_cancellation.IsCancellationRequested)
while (!_shutdownCts.IsCancellationRequested)
{
// Select a new gateway if the current one is not available.
// This could be caused by a temporary issue or a permanent gateway failure.
Expand All @@ -117,7 +116,7 @@ private async Task RunAsync()
gateway = _gatewayManager.GetLiveGateway();
if (gateway is null)
{
await Task.Delay(StandardExtensions.Min(_typeManagementOptions.TypeMapRefreshInterval, TimeSpan.FromMilliseconds(500)));
await Task.Delay(StandardExtensions.Min(_typeManagementOptions.TypeMapRefreshInterval, TimeSpan.FromMilliseconds(500)), _shutdownCts.Token);
continue;
}

Expand All @@ -135,19 +134,11 @@ private async Task RunAsync()

try
{
var refreshTask = GetClusterManifestUpdate(provider, gatewayVersion);
var task = await Task.WhenAny(cancellationTask, refreshTask).ConfigureAwait(false);

if (ReferenceEquals(task, cancellationTask))
{
return;
}

var updateResult = await refreshTask;
var updateResult = await GetClusterManifestUpdate(provider, gatewayVersion).WaitAsync(_shutdownCts.Token);
if (updateResult is null)
{
// There was no newer cluster manifest, so wait for the next refresh interval and try again.
await Task.WhenAny(cancellationTask, Task.Delay(_typeManagementOptions.TypeMapRefreshInterval));
await Task.Delay(_typeManagementOptions.TypeMapRefreshInterval, _shutdownCts.Token);
continue;
}

Expand Down Expand Up @@ -176,7 +167,7 @@ private async Task RunAsync()
var updatedManifest = new ClusterManifest(new MajorMinorVersion(gatewayVersion.Major, ++minorVersion), siloManifests);
if (!_updates.TryPublish(updatedManifest))
{
await Task.Delay(StandardExtensions.Min(_typeManagementOptions.TypeMapRefreshInterval, TimeSpan.FromMilliseconds(500)));
await Task.Delay(StandardExtensions.Min(_typeManagementOptions.TypeMapRefreshInterval, TimeSpan.FromMilliseconds(500)), _shutdownCts.Token);
continue;
}

Expand All @@ -187,12 +178,16 @@ private async Task RunAsync()
_logger.LogDebug("Refreshed cluster manifest");
}

await Task.WhenAny(cancellationTask, Task.Delay(_typeManagementOptions.TypeMapRefreshInterval));
await Task.Delay(_typeManagementOptions.TypeMapRefreshInterval, _shutdownCts.Token);
}
catch (OperationCanceledException) when (_shutdownCts.IsCancellationRequested)
{
// Ignore during shutdown.
}
catch (Exception exception)
{
_logger.LogWarning(exception, "Error trying to get cluster manifest from gateway {Gateway}", gateway);
await Task.Delay(StandardExtensions.Min(_typeManagementOptions.TypeMapRefreshInterval, TimeSpan.FromSeconds(5)));
await Task.Delay(StandardExtensions.Min(_typeManagementOptions.TypeMapRefreshInterval, TimeSpan.FromSeconds(5)), _shutdownCts.Token).SuppressThrowing();

// Reset the gateway so that another will be selected on the next iteration.
gateway = null;
Expand Down Expand Up @@ -230,16 +225,24 @@ private async Task RunAsync()
}

/// <inheritdoc />
public ValueTask DisposeAsync()
public async ValueTask DisposeAsync()
{
_cancellation.Cancel();
return _runTask is Task task ? new ValueTask(task) : default;
if (_shutdownCts.IsCancellationRequested)
{
return;
}

_shutdownCts.Cancel();
if (_runTask is Task task)
{
await task.SuppressThrowing();
}
}

/// <inheritdoc />
public void Dispose()
{
_cancellation.Cancel();
_shutdownCts.Cancel();
}
}
}
19 changes: 9 additions & 10 deletions src/Orleans.Core/Messaging/ClientMessageCenter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -99,42 +99,41 @@ public async Task StartAsync(CancellationToken cancellationToken)

private async Task EstablishInitialConnection(CancellationToken cancellationToken)
{
var cancellationTask = cancellationToken.WhenCancelled();
var liveGateways = gatewayManager.GetLiveGateways();

if (liveGateways.Count == 0)
{
throw new ConnectionFailedException("There are no available gateways");
throw new ConnectionFailedException("There are no available gateways.");
}

var pendingTasks = new List<Task>(liveGateways.Count + 1);
pendingTasks.Add(cancellationTask);
var pendingTasks = new List<Task>(liveGateways.Count);
foreach (var gateway in liveGateways)
{
pendingTasks.Add(connectionManager.GetConnection(gateway).AsTask());
}

try
{
// There will always be one task to represent cancellation.
while (pendingTasks.Count > 1)
while (pendingTasks.Count > 0)
{
var completedTask = await Task.WhenAny(pendingTasks);
var completedTask = await Task.WhenAny(pendingTasks).WaitAsync(cancellationToken);
pendingTasks.Remove(completedTask);

cancellationToken.ThrowIfCancellationRequested();

// If at least one gateway connection has been established, break out of the loop and continue startup.
if (completedTask.IsCompletedSuccessfully)
{
break;
}

// If there are no more gateways, observe the most recent exception and bail out.
if (pendingTasks.Count == 1)
if (pendingTasks.Count == 0)
{
await completedTask;
}
else
{
completedTask.Ignore();
}
}
}
catch (Exception exception)
Expand Down
2 changes: 1 addition & 1 deletion src/Orleans.Core/Networking/ConnectionManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,7 @@ public async Task Close(CancellationToken ct)

if (closeTasks.Count > 0)
{
await Task.WhenAny(Task.WhenAll(closeTasks), ct.WhenCancelled());
await Task.WhenAll(closeTasks).WaitAsync(ct).SuppressThrowing();
if (ct.IsCancellationRequested) break;
}
else if (!pendingConnections) break;
Expand Down
4 changes: 2 additions & 2 deletions src/Orleans.Core/Runtime/AsyncEnumerableGrainExtension.cs
Original file line number Diff line number Diff line change
Expand Up @@ -215,8 +215,8 @@ private async ValueTask RemoveExpiredAsync(CancellationToken cancellationToken)
using var cancellation = new CancellationTokenSource(_messagingOptions.ResponseTimeout / 2);

// Wait for either the MoveNextAsync task to complete or the cancellation token to be cancelled.
var completedTask = await Task.WhenAny(moveNextTask, cancellation.Token.WhenCancelled());
if (completedTask == moveNextTask)
await moveNextTask.WaitAsync(cancellation.Token).SuppressThrowing();
if (moveNextTask is {IsCompletedSuccessfully: true })
{
OnMoveNext(requestId);
var hasValue = moveNextTask.GetAwaiter().GetResult();
Expand Down
12 changes: 10 additions & 2 deletions src/Orleans.Runtime/Catalog/ActivationData.cs
Original file line number Diff line number Diff line change
Expand Up @@ -701,7 +701,7 @@ public void AnalyzeWorkload(DateTime now, IMessageCenter messageCenter, MessageF
}

var executionTime = _busyDuration.Elapsed;
if (executionTime >= slowRunningRequestDuration)
if (executionTime >= slowRunningRequestDuration && !message.IsLocalOnly)
{
GetStatusList(ref diagnostics);
if (timeSinceQueued.HasValue)
Expand All @@ -722,7 +722,10 @@ public void AnalyzeWorkload(DateTime now, IMessageCenter messageCenter, MessageF
{
var message = running.Key;
var runDuration = running.Value;
if (ReferenceEquals(message, _blockingRequest)) continue;
if (ReferenceEquals(message, _blockingRequest) || message.IsLocalOnly)
{
continue;
}

// Check how long they've been executing.
var executionTime = runDuration.Elapsed;
Expand All @@ -744,6 +747,11 @@ public void AnalyzeWorkload(DateTime now, IMessageCenter messageCenter, MessageF
foreach (var pair in _waitingRequests)
{
var message = pair.Message;
if (message.IsLocalOnly)
{
continue;
}

var queuedTime = pair.QueuedTime.Elapsed;
if (queuedTime >= longQueueTimeDuration)
{
Expand Down
Loading

0 comments on commit 9a40aa8

Please sign in to comment.