Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Avoid sending cluster manifest to client if client already has the latest version #8728

Merged
merged 13 commits into from
Dec 15, 2023
12 changes: 4 additions & 8 deletions src/Orleans.Core.Abstractions/Manifest/ClusterManifest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,13 @@ public sealed class ClusterManifest
/// <param name="silos">
/// The silo manifests.
/// </param>
/// <param name="allGrainManifests">
/// All grain manifests.
/// </param>
public ClusterManifest(
MajorMinorVersion version,
ImmutableDictionary<SiloAddress, GrainManifest> silos,
ImmutableArray<GrainManifest> allGrainManifests)
ImmutableDictionary<SiloAddress, GrainManifest> silos)
{
this.Version = version;
this.Silos = silos;
this.AllGrainManifests = allGrainManifests;
Version = version;
Silos = silos;
AllGrainManifests = silos.Values.ToImmutableArray();
}

/// <summary>
Expand Down
1 change: 1 addition & 0 deletions src/Orleans.Core/Core/DefaultClientServices.cs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ public static void AddDefaultServices(IServiceCollection services)
services.TryAddSingleton<IAppEnvironmentStatistics, AppEnvironmentStatistics>();
services.AddLogging();
services.TryAddSingleton<GrainBindingsResolver>();
services.TryAddSingleton<LocalClientDetails>();
services.TryAddSingleton<OutsideRuntimeClient>();
services.TryAddSingleton<ClientGrainContext>();
services.AddFromExisting<IGrainContextAccessor, ClientGrainContext>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public Entry(MethodInfo implementationMethod, MethodInfo interfaceMethod)
}

/// <summary>
/// Gets the grain implmentation <see cref="MethodInfo"/>.
/// Gets the grain implementation <see cref="MethodInfo"/>.
/// </summary>
public MethodInfo ImplementationMethod { get; }

Expand Down
90 changes: 82 additions & 8 deletions src/Orleans.Core/Manifest/ClientClusterManifestProvider.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
#nullable enable
using System;
using System.Collections.Generic;
using System.Collections.Immutable;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;
Expand All @@ -23,14 +25,16 @@ internal class ClientClusterManifestProvider : IClusterManifestProvider, IAsyncD
private readonly ILogger<ClientClusterManifestProvider> _logger;
private readonly TypeManagementOptions _typeManagementOptions;
private readonly IServiceProvider _services;
private readonly LocalClientDetails _localClientDetails;
private readonly GatewayManager _gatewayManager;
private readonly AsyncEnumerable<ClusterManifest> _updates;
private readonly CancellationTokenSource _cancellation = new CancellationTokenSource();
private ClusterManifest _current;
private Task _runTask;
private Task? _runTask;

public ClientClusterManifestProvider(
IServiceProvider services,
LocalClientDetails localClientDetails,
GatewayManager gatewayManager,
ILogger<ClientClusterManifestProvider> logger,
ClientManifestProvider clientManifestProvider,
Expand All @@ -39,12 +43,18 @@ public ClientClusterManifestProvider(
_logger = logger;
_typeManagementOptions = typeManagementOptions.Value;
_services = services;
_localClientDetails = localClientDetails;
_gatewayManager = gatewayManager;
this.LocalGrainManifest = clientManifestProvider.ClientManifest;
_current = new ClusterManifest(MajorMinorVersion.MinValue, ImmutableDictionary<SiloAddress, GrainManifest>.Empty, ImmutableArray.Create(this.LocalGrainManifest));
LocalGrainManifest = clientManifestProvider.ClientManifest;

// Create a fake manifest for the very first generation, which only includes the local client's manifest.
var builder = ImmutableDictionary.CreateBuilder<SiloAddress, GrainManifest>();
builder.Add(_localClientDetails.ClientAddress, LocalGrainManifest);
_current = new ClusterManifest(MajorMinorVersion.MinValue, builder.ToImmutable());

_updates = new AsyncEnumerable<ClusterManifest>(
initialValue: _current,
updateValidator: (previous, proposed) => previous is null || proposed.Version > previous.Version,
updateValidator: (previous, proposed) => proposed.Version > previous.Version,
onPublished: update => Interlocked.Exchange(ref _current, update));
}

Expand Down Expand Up @@ -73,21 +83,64 @@ private async Task RunAsync()
{
var grainFactory = _services.GetRequiredService<IInternalGrainFactory>();
var cancellationTask = _cancellation.Token.WhenCancelled();
SiloAddress? gateway = null;
IClusterManifestSystemTarget? provider = null;
var minorVersion = 0;
var gatewayVersion = MajorMinorVersion.MinValue;
while (!_cancellation.IsCancellationRequested)
{
var gateway = _gatewayManager.GetLiveGateway();
// Select a new gateway if the current one is not available.
// This could be caused by a temporary issue or a permanent gateway failure.
if (gateway is null || !_gatewayManager.IsGatewayAvailable(gateway))
{
gateway = _gatewayManager.GetLiveGateway();
provider = grainFactory.GetGrain<IClusterManifestSystemTarget>(SystemTargetGrainId.Create(Constants.ManifestProviderType, gateway).GrainId);

// Accept any cluster manifest version from the new gateway.
// Since the minor version of the manifest is specific to each gateway, we reset it to the lowest possible value.
// This means that it is possible to receive the an older or equivalent cluster manifest when the gateway changes.
// That hiccup is addressed by resetting the expected manifest version and merging incomplete manifests until a complete
// manifest is received.
gatewayVersion = MajorMinorVersion.MinValue;
}

Debug.Assert(provider is not null);

try
{
var provider = grainFactory.GetGrain<IClusterManifestSystemTarget>(SystemTargetGrainId.Create(Constants.ManifestProviderType, gateway).GrainId);
var refreshTask = provider.GetClusterManifest().AsTask();
var refreshTask = GetClusterManifestUpdate(provider, gatewayVersion);
var task = await Task.WhenAny(cancellationTask, refreshTask).ConfigureAwait(false);

if (ReferenceEquals(task, cancellationTask))
{
return;
}

if (!_updates.TryPublish(await refreshTask))
var updateResult = await refreshTask;
gatewayVersion = updateResult.Version;

// If the manifest does not contain all active servers, merge with the existing manifest until it does.
// This prevents reversed progress at the expense of including potentially defunct silos.
ImmutableDictionary<SiloAddress, GrainManifest> siloManifests;
if (!updateResult.IncludesAllActiveServers)
{
// Merge manifests until the manifest contains all active servers.
var mergedSilos = _current.Silos.ToBuilder();
mergedSilos.Add(_localClientDetails.ClientAddress, LocalGrainManifest);
foreach (var kvp in updateResult.SiloManifests)
{
mergedSilos[kvp.Key] = kvp.Value;
}

siloManifests = mergedSilos.ToImmutable();
}
else
{
siloManifests = updateResult.SiloManifests.Add(_localClientDetails.ClientAddress, LocalGrainManifest);
}

var updatedManifest = new ClusterManifest(new MajorMinorVersion(gatewayVersion.Major, ++minorVersion), siloManifests);
if (!_updates.TryPublish(updatedManifest))
{
await Task.Delay(StandardExtensions.Min(_typeManagementOptions.TypeMapRefreshInterval, TimeSpan.FromMilliseconds(500)));
continue;
Expand All @@ -106,6 +159,9 @@ private async Task RunAsync()
{
_logger.LogWarning(exception, "Error trying to get cluster manifest from gateway {Gateway}", gateway);
await Task.Delay(StandardExtensions.Min(_typeManagementOptions.TypeMapRefreshInterval, TimeSpan.FromSeconds(5)));

// Reset the gateway so that another will be selected on the next iteration.
gateway = null;
}
}
}
Expand All @@ -120,6 +176,24 @@ private async Task RunAsync()
}
}

private async Task<ClusterManifestUpdate> GetClusterManifestUpdate(IClusterManifestSystemTarget provider, MajorMinorVersion previousVersion)
{
try
{
// First, attempt to call the new API, which provides more information.
return await provider.GetClusterManifestUpdate(previousVersion);
}
catch (Exception exception)
{
_logger.LogWarning(exception, "Failed to fetch cluster manifest update from {Provider}.", provider);

// If the provider does not support the new API, fall back to the old one.
var manifest = await provider.GetClusterManifest();
var result = new ClusterManifestUpdate(manifest.Version, manifest.Silos, includesAllActiveServers: true);
return result;
}
}

/// <inheritdoc />
public ValueTask DisposeAsync()
{
Expand Down
34 changes: 1 addition & 33 deletions src/Orleans.Core/Manifest/ClientManifestProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,12 @@ namespace Orleans.Runtime
internal class ClientManifestProvider
{
public ClientManifestProvider(
IEnumerable<IGrainPropertiesProvider> grainPropertiesProviders,
IEnumerable<IGrainInterfacePropertiesProvider> grainInterfacePropertiesProviders,
IOptions<GrainTypeOptions> grainTypeOptions,
GrainTypeResolver grainTypeResolver,
GrainInterfaceTypeResolver interfaceTypeResolver)
{
var grainProperties = CreateGrainManifest(grainPropertiesProviders, grainTypeOptions, grainTypeResolver);
var interfaces = CreateInterfaceManifest(grainInterfacePropertiesProviders, grainTypeOptions, interfaceTypeResolver);
this.ClientManifest = new GrainManifest(grainProperties, interfaces);
this.ClientManifest = new GrainManifest(ImmutableDictionary<GrainType, GrainProperties>.Empty, interfaces);
}

/// <summary>
Expand Down Expand Up @@ -57,34 +54,5 @@ private static ImmutableDictionary<GrainInterfaceType, GrainInterfaceProperties>

return builder.ToImmutable();
}

private static ImmutableDictionary<GrainType, GrainProperties> CreateGrainManifest(
IEnumerable<IGrainPropertiesProvider> grainMetadataProviders,
IOptions<GrainTypeOptions> grainTypeOptions,
GrainTypeResolver grainTypeProvider)
{
var propertiesMap = ImmutableDictionary.CreateBuilder<GrainType, GrainProperties>();
foreach (var grainClass in grainTypeOptions.Value.Classes)
{
var grainType = grainTypeProvider.GetGrainType((Type)grainClass);
var properties = new Dictionary<string, string>();
foreach (var provider in grainMetadataProviders)
{
provider.Populate((Type)grainClass, grainType, properties);
}

var result = new GrainProperties(properties.ToImmutableDictionary());
if (propertiesMap.TryGetValue(grainType, out var grainProperty))
{
throw new InvalidOperationException($"An entry with the key {grainType} is already present."
+ $"\nExisting: {grainProperty.ToDetailedString()}\nTrying to add: {result.ToDetailedString()}"
+ "\nConsider using the [GrainType(\"name\")] attribute to give these classes unique names.");
}

propertiesMap.Add(grainType, result);
}

return propertiesMap.ToImmutable();
}
}
}
11 changes: 9 additions & 2 deletions src/Orleans.Core/Manifest/GrainVersionManifest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public GrainVersionManifest(IClusterManifestProvider clusterManifestProvider)
/// <summary>
/// Gets the local version for a specified grain interface type.
/// </summary>
/// <param name="interfaceType">The grain intrerface type name.</param>
/// <param name="interfaceType">The grain interface type name.</param>
/// <returns>The version of the specified grain interface.</returns>
public ushort GetLocalVersion(GrainInterfaceType interfaceType)
{
Expand Down Expand Up @@ -151,7 +151,7 @@ public ushort GetLocalVersion(GrainInterfaceType interfaceType)
/// <param name="grainType">The grain type.</param>
/// <param name="interfaceType">The grain interface type name.</param>
/// <param name="versions">The grain interface version.</param>
/// <returns>The set of silos which support the specifed grain.</returns>
/// <returns>The set of silos which support the specified grain.</returns>
public (MajorMinorVersion Version, Dictionary<ushort, SiloAddress[]> Result) GetSupportedSilos(GrainType grainType, GrainInterfaceType interfaceType, ushort[] versions)
{
var result = new Dictionary<ushort, SiloAddress[]>();
Expand Down Expand Up @@ -238,6 +238,13 @@ private static Cache BuildCache(ClusterManifest clusterManifest)
foreach (var entry in clusterManifest.Silos)
{
var silo = entry.Key;

// Since clients are not eligible for placement, we exclude them here.
if (silo.IsClient)
{
continue;
}

var manifest = entry.Value;
foreach (var grainInterface in manifest.Interfaces)
{
Expand Down
2 changes: 1 addition & 1 deletion src/Orleans.Core/Manifest/IClusterManifestProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ public interface IClusterManifestProvider
/// Gets the current cluster manifest.
/// </summary>
ClusterManifest Current { get; }

/// <summary>
/// Gets the stream of cluster manifest updates.
/// </summary>
Expand Down
43 changes: 43 additions & 0 deletions src/Orleans.Core/Manifest/IClusterManifestSystemTarget.cs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
using System.Collections.Generic;
using System.Collections.Immutable;
using System.Threading.Tasks;
using Orleans.Metadata;

Expand All @@ -13,5 +15,46 @@ internal interface IClusterManifestSystemTarget : ISystemTarget
/// </summary>
/// <returns>The current cluster manifest.</returns>
ValueTask<ClusterManifest> GetClusterManifest();

/// <summary>
/// Gets an updated cluster manifest if newer than the provided <paramref name="previousVersion"/>.
/// </summary>
/// <returns>The current cluster manifest, or <see langword="null"/> if it is not newer than the provided version.</returns>
ValueTask<ClusterManifestUpdate> GetClusterManifestUpdate(MajorMinorVersion previousVersion);
}

/// <summary>
/// Represents an update to the cluster manifest.
/// </summary>
[GenerateSerializer, Immutable]
public class ClusterManifestUpdate
{
public ClusterManifestUpdate(
MajorMinorVersion manifestVersion,
ImmutableDictionary<SiloAddress, GrainManifest> siloManifests,
bool includesAllActiveServers)
{
Version = manifestVersion;
SiloManifests = siloManifests;
IncludesAllActiveServers = includesAllActiveServers;
}

/// <summary>
/// Gets the version of this instance.
/// </summary>
[Id(0)]
public MajorMinorVersion Version { get; }

/// <summary>
/// Gets the manifests for each silo in the cluster.
/// </summary>
[Id(1)]
public ImmutableDictionary<SiloAddress, GrainManifest> SiloManifests { get; }

/// <summary>
/// Gets a value indicating whether this update includes all active servers.
/// </summary>
[Id(2)]
public bool IncludesAllActiveServers { get; }
}
}
12 changes: 5 additions & 7 deletions src/Orleans.Core/Messaging/ClientMessageCenter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ internal class ClientMessageCenter : IMessageCenter, IDisposable
internal static readonly TimeSpan MINIMUM_INTERCONNECT_DELAY = TimeSpan.FromMilliseconds(100); // wait one tenth of a second between connect attempts
internal const int CONNECT_RETRY_COUNT = 2; // Retry twice before giving up on a gateway server

internal ClientGrainId ClientId { get; private set; }
internal ClientGrainId ClientId => _localClientDetails.ClientId;
public IRuntimeClient RuntimeClient { get; }
internal bool Running { get; private set; }

Expand All @@ -58,17 +58,16 @@ internal class ClientMessageCenter : IMessageCenter, IDisposable
// false, then a new gateway is selected using the gateway manager, and a new connection established if necessary.
private readonly WeakReference<ClientOutboundConnection>[] grainBuckets;
private readonly ILogger logger;
public SiloAddress MyAddress { get; private set; }
public SiloAddress MyAddress => _localClientDetails.ClientAddress;
private int numberOfConnectedGateways = 0;
private readonly MessageFactory messageFactory;
private readonly IClusterConnectionStatusListener connectionStatusListener;
private readonly ConnectionManager connectionManager;
private readonly LocalClientDetails _localClientDetails;

public ClientMessageCenter(
IOptions<ClientMessagingOptions> clientMessagingOptions,
IPAddress localAddress,
int gen,
ClientGrainId clientId,
LocalClientDetails localClientDetails,
IRuntimeClient runtimeClient,
MessageFactory messageFactory,
IClusterConnectionStatusListener connectionStatusListener,
Expand All @@ -77,8 +76,7 @@ public ClientMessageCenter(
GatewayManager gatewayManager)
{
this.connectionManager = connectionManager;
MyAddress = SiloAddress.New(localAddress, 0, gen);
ClientId = clientId;
_localClientDetails = localClientDetails;
this.RuntimeClient = runtimeClient;
this.messageFactory = messageFactory;
this.connectionStatusListener = connectionStatusListener;
Expand Down
Loading
Loading