From 57e8ed317685a11aa4de18482581829893234a29 Mon Sep 17 00:00:00 2001 From: Benjamin Petit Date: Tue, 12 Dec 2023 19:32:28 +0100 Subject: [PATCH 1/7] WIP --- .../IDs/GrainTypePrefix.cs | 41 ++++++ .../Runtime/GrainReference.cs | 8 +- .../Core/DefaultClientServices.cs | 1 + src/Orleans.Core/Core/GrainFactory.cs | 64 ++++++++- .../GrainReferenceActivator.cs | 127 +++++++++++++++++- .../HeterogeneousTests.cs | 9 +- 6 files changed, 240 insertions(+), 10 deletions(-) diff --git a/src/Orleans.Core.Abstractions/IDs/GrainTypePrefix.cs b/src/Orleans.Core.Abstractions/IDs/GrainTypePrefix.cs index c684f9236a..118c7c24a5 100644 --- a/src/Orleans.Core.Abstractions/IDs/GrainTypePrefix.cs +++ b/src/Orleans.Core.Abstractions/IDs/GrainTypePrefix.cs @@ -58,6 +58,16 @@ public static class GrainTypePrefix /// public static readonly ReadOnlyMemory LegacyGrainPrefixBytes = Encoding.UTF8.GetBytes(LegacyGrainPrefix); + /// + /// The prefix for stub grain (grain without definitive type). + /// + public const string StubGrainPrefix = SystemPrefix + "grain.stub."; + + /// + /// A span representation of + /// + public static readonly ReadOnlyMemory StubGrainPrefixBytes = Encoding.UTF8.GetBytes(StubGrainPrefix); + /// /// Returns if the type is a client, if not. /// @@ -99,5 +109,36 @@ public static class GrainTypePrefix /// The grain id. /// if the type is a system target, if not. public static bool IsSystemTarget(this in GrainId id) => id.Type.IsSystemTarget(); + + /// + /// Returns if the grain is a stub grain. + /// + /// The grain type. + /// if the type is a stub type , if not. + public static bool IsStubGrain(this in GrainType type) => type.AsSpan().StartsWith(StubGrainPrefixBytes.Span); + + /// + /// Create a stub grain type. + /// + /// The prefix of the class to be used. + /// The grain type. + public static GrainType CreateStubGrainType(string grainClassPrefix) + { + return string.IsNullOrWhiteSpace(grainClassPrefix) + ? new GrainType(StubGrainPrefixBytes.ToArray()) + : GrainType.Create(string.Concat(StubGrainPrefix, grainClassPrefix)); + } + + public static bool TryGetCrainClassPrefix(GrainType grainType, out string grainClassPrefix) + { + if (!IsStubGrain(grainType)) + { + grainClassPrefix = default; + return false; + } + + grainClassPrefix = Encoding.UTF8.GetString(grainType.AsSpan().Slice(StubGrainPrefixBytes.Length)); + return true; + } } } diff --git a/src/Orleans.Core.Abstractions/Runtime/GrainReference.cs b/src/Orleans.Core.Abstractions/Runtime/GrainReference.cs index b585d68cbc..004559f8ab 100644 --- a/src/Orleans.Core.Abstractions/Runtime/GrainReference.cs +++ b/src/Orleans.Core.Abstractions/Runtime/GrainReference.cs @@ -256,7 +256,7 @@ public class GrainReference : IAddressable, IEquatable, ISpanFor /// The grain reference functionality which is shared by all grain references of a given type. /// [NonSerialized] - private readonly GrainReferenceShared _shared; + private GrainReferenceShared _shared; /// /// The underlying grain id key. @@ -267,7 +267,11 @@ public class GrainReference : IAddressable, IEquatable, ISpanFor /// /// Gets the grain reference functionality which is shared by all grain references of a given type. /// - internal GrainReferenceShared Shared => _shared ?? throw new GrainReferenceNotBoundException(this); + internal GrainReferenceShared Shared + { + get => _shared ?? throw new GrainReferenceNotBoundException(this); + set => _shared = value; + } /// /// Gets the grain reference runtime. diff --git a/src/Orleans.Core/Core/DefaultClientServices.cs b/src/Orleans.Core/Core/DefaultClientServices.cs index 32d9b6039b..55b3bb8d12 100644 --- a/src/Orleans.Core/Core/DefaultClientServices.cs +++ b/src/Orleans.Core/Core/DefaultClientServices.cs @@ -69,6 +69,7 @@ public static void AddDefaultServices(IServiceCollection services) services.TryAddSingleton(); services.TryAddSingleton(); services.TryAddSingleton(); + services.AddSingleton(); services.AddSingleton(); services.AddSingleton(); services.TryAddSingleton(); diff --git a/src/Orleans.Core/Core/GrainFactory.cs b/src/Orleans.Core/Core/GrainFactory.cs index 4fb1e8587f..4bf72de422 100644 --- a/src/Orleans.Core/Core/GrainFactory.cs +++ b/src/Orleans.Core/Core/GrainFactory.cs @@ -1,5 +1,8 @@ using System; using System.Collections.Generic; +using System.Diagnostics; +using System.Threading; +using System.Threading.Tasks; using Orleans.GrainReferences; using Orleans.Metadata; using Orleans.Runtime; @@ -204,13 +207,66 @@ private IAddressable GetGrain(Type interfaceType, IdSpan grainKey, string grainC var grainInterfaceType = this.interfaceTypeResolver.GetGrainInterfaceType(interfaceType); GrainType grainType; - if (!string.IsNullOrWhiteSpace(grainClassNamePrefix)) + try { - grainType = this.interfaceTypeToGrainTypeResolver.GetGrainType(grainInterfaceType, grainClassNamePrefix); + if (!string.IsNullOrWhiteSpace(grainClassNamePrefix)) + { + grainType = this.interfaceTypeToGrainTypeResolver.GetGrainType(grainInterfaceType, grainClassNamePrefix); + } + else + { + grainType = this.interfaceTypeToGrainTypeResolver.GetGrainType(grainInterfaceType); + } } - else + catch (ArgumentException) + { + // Not found in the type map. Maybe it's not available yet ? (heterogeneous case) + grainType = GrainTypePrefix.CreateStubGrainType(grainClassNamePrefix); + } + + var grainId = GrainId.Create(grainType, grainKey); + var grain = this.referenceActivator.CreateReference(grainId, grainInterfaceType); + return grain; + } + + internal async ValueTask GetGrainAsync(GrainInterfaceType grainInterfaceType, IdSpan grainKey, string grainClassNamePrefix, CancellationToken ct) + { + GrainType grainType; + Exception lastException = null; + do + { + try + { + if (!string.IsNullOrWhiteSpace(grainClassNamePrefix)) + { + grainType = this.interfaceTypeToGrainTypeResolver.GetGrainType(grainInterfaceType, grainClassNamePrefix); + } + else + { + grainType = this.interfaceTypeToGrainTypeResolver.GetGrainType(grainInterfaceType); + } + } + catch (ArgumentException ex) + { + // Not found in the type map. Maybe it's not available yet ? (heterogeneous case) + grainType = GrainTypePrefix.CreateStubGrainType(grainClassNamePrefix); + lastException = ex; + } + try + { + await Task.Delay(1_000, ct); + } + catch (TaskCanceledException) + { + break; + } + } + while (grainType.IsStubGrain() || ct.IsCancellationRequested); + + if (grainType.IsStubGrain()) { - grainType = this.interfaceTypeToGrainTypeResolver.GetGrainType(grainInterfaceType); + Debug.Assert(lastException != null); + throw lastException; } var grainId = GrainId.Create(grainType, grainKey); diff --git a/src/Orleans.Core/GrainReferences/GrainReferenceActivator.cs b/src/Orleans.Core/GrainReferences/GrainReferenceActivator.cs index f37a342443..9fbe53d98e 100644 --- a/src/Orleans.Core/GrainReferences/GrainReferenceActivator.cs +++ b/src/Orleans.Core/GrainReferences/GrainReferenceActivator.cs @@ -4,6 +4,8 @@ using System.Linq; using System.Reflection; using System.Reflection.Emit; +using System.Threading; +using System.Threading.Tasks; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Options; using Orleans.CodeGeneration; @@ -13,6 +15,7 @@ using Orleans.Serialization; using Orleans.Serialization.Cloning; using Orleans.Serialization.Configuration; +using Orleans.Serialization.Invocation; using Orleans.Serialization.Serializers; using Orleans.Serialization.TypeSystem; @@ -94,6 +97,128 @@ private IGrainReferenceActivator CreateActivator(GrainType grainType, GrainInter } } + /// + /// Creates grain references that do not have their definitive type. + /// + internal class StubGrainReferenceActivatorProvider : IGrainReferenceActivatorProvider + { + private readonly CopyContextPool _copyContextPool; + private readonly CodecProvider _codecProvider; + private readonly GrainVersionManifest _versionManifest; + private readonly RpcProvider _rpcProvider; + private readonly IServiceProvider _serviceProvider; + + public StubGrainReferenceActivatorProvider( + GrainVersionManifest manifest, + RpcProvider rpcProvider, + CodecProvider codecProvider, + CopyContextPool copyContextPool, + IServiceProvider serviceProvider) + { + _versionManifest = manifest; + _rpcProvider = rpcProvider; + _codecProvider = codecProvider; + _copyContextPool = copyContextPool; + _serviceProvider = serviceProvider; + } + + public bool TryGet(GrainType grainType, GrainInterfaceType interfaceType, out IGrainReferenceActivator activator) + { + if (!grainType.IsStubGrain()) + { + activator = default; + return false; + } + + _rpcProvider.TryGet(interfaceType, out var proxyType); + + var interfaceVersion = _versionManifest.GetLocalVersion(interfaceType); + var shared = new GrainReferenceShared( + grainType, + interfaceType, + interfaceVersion, + new StubGrainReferenceRuntime(proxyType), + InvokeMethodOptions.None, + _codecProvider, + _copyContextPool, + _serviceProvider); + activator = new GrainReferenceActivator(proxyType, shared); + return true; + } + + /// + /// Creates grain references for a given grain type and grain interface type. + /// + private sealed class GrainReferenceActivator : IGrainReferenceActivator + { + private readonly GrainReferenceShared _shared; + private readonly Func _create; + + /// + /// Initializes a new instance of the class. + /// + /// The generated proxy object type. + /// The functionality shared between all grain references for a specified grain type and grain interface type. + public GrainReferenceActivator(Type referenceType, GrainReferenceShared shared) + { + _shared = shared; + + var ctor = referenceType.GetConstructor(BindingFlags.Instance | BindingFlags.Public | BindingFlags.NonPublic, new[] { typeof(GrainReferenceShared), typeof(IdSpan) }) + ?? throw new SerializerException("Invalid proxy type: " + referenceType); + + var method = new DynamicMethod(referenceType.Name, typeof(GrainReference), new[] { typeof(object), typeof(GrainReferenceShared), typeof(IdSpan) }); + var il = method.GetILGenerator(); + // arg0 is unused for better delegate performance (avoids argument shuffling thunk) + il.Emit(OpCodes.Ldarg_1); + il.Emit(OpCodes.Ldarg_2); + il.Emit(OpCodes.Newobj, ctor); + il.Emit(OpCodes.Ret); + _create = method.CreateDelegate>(); + } + + public GrainReference CreateReference(GrainId grainId) => _create(_shared, grainId.Key); + } + + private sealed class StubGrainReferenceRuntime : IGrainReferenceRuntime + { + private Type _proxyType; + + public StubGrainReferenceRuntime(Type proxyType) + { + _proxyType = proxyType; + } + + public object Cast(IAddressable grain, Type grainInterface) + { + if (grain is GrainReference && grainInterface.IsAssignableFrom(grain.GetType())) + { + return grain; + } + throw new NotImplementedException(); + } + + public void InvokeMethod(GrainReference reference, IInvokable request, InvokeMethodOptions options) => throw new NotImplementedException(); + + public async ValueTask InvokeMethodAsync(GrainReference reference, IInvokable request, InvokeMethodOptions options) + { + var factory = reference.Shared.ServiceProvider.GetRequiredService(); + _ = GrainTypePrefix.TryGetCrainClassPrefix(reference.GrainId.Type, out var grainClassPrefix); + var grain = (GrainReference) await factory.GetGrainAsync(reference.InterfaceType, reference.GrainId.Key, grainClassPrefix, new CancellationTokenSource(TimeSpan.FromSeconds(30)).Token); // TODO track request timeout? + reference.Shared = grain.Shared; + return await reference.Shared.Runtime.InvokeMethodAsync(reference, request, options); + } + + public async ValueTask InvokeMethodAsync(GrainReference reference, IInvokable request, InvokeMethodOptions options) + { + var factory = reference.Shared.ServiceProvider.GetRequiredService(); + _ = GrainTypePrefix.TryGetCrainClassPrefix(reference.GrainId.Type, out var grainClassPrefix); + var grain = (GrainReference) await factory.GetGrainAsync(reference.InterfaceType, reference.GrainId.Key, grainClassPrefix, new CancellationTokenSource(TimeSpan.FromSeconds(30)).Token); // TODO track request timeout? + reference.Shared = grain.Shared; + await reference.Shared.Runtime.InvokeMethodAsync(reference, request, options); + } + } + } + /// /// Creates grain references which do not have any specified grain interface, only a target grain id. /// @@ -316,7 +441,7 @@ public GrainReferenceActivatorProvider( /// public bool TryGet(GrainType grainType, GrainInterfaceType interfaceType, out IGrainReferenceActivator activator) { - if (!_rpcProvider.TryGet(interfaceType, out var proxyType)) + if (grainType.IsStubGrain() || !_rpcProvider.TryGet(interfaceType, out var proxyType)) { activator = default; return false; diff --git a/test/Tester/HeterogeneousSilosTests/HeterogeneousTests.cs b/test/Tester/HeterogeneousSilosTests/HeterogeneousTests.cs index c09320a788..e7bc9a4e86 100644 --- a/test/Tester/HeterogeneousSilosTests/HeterogeneousTests.cs +++ b/test/Tester/HeterogeneousSilosTests/HeterogeneousTests.cs @@ -78,13 +78,16 @@ public void Dispose() } [Fact] - public void GrainExcludedTest() + public async void GrainExcludedTest() { SetupAndDeployCluster(typeof(RandomPlacement), typeof(TestGrain)); // Should fail - var exception = Assert.Throws(() => this.cluster.GrainFactory.GetGrain(0)); - Assert.Contains("Could not find an implementation for interface", exception.Message); + //var exception = Assert.Throws(() => this.cluster.GrainFactory.GetGrain(0)); + //Assert.Contains("Could not find an implementation for interface", exception.Message); + + var grain = this.cluster.GrainFactory.GetGrain(0); + await grain.GetKey(); // Should not fail this.cluster.GrainFactory.GetGrain(0); From 2290524c2a1a02b75c8dd55702ca0b57772c27e9 Mon Sep 17 00:00:00 2001 From: Reuben Bond Date: Wed, 13 Dec 2023 15:25:09 -0800 Subject: [PATCH 2/7] PTAL - rebond minor change --- src/Orleans.Core/Core/GrainFactory.cs | 20 ++--- .../GrainInterfaceTypeToGrainTypeResolver.cs | 79 +++++++++++++------ 2 files changed, 63 insertions(+), 36 deletions(-) diff --git a/src/Orleans.Core/Core/GrainFactory.cs b/src/Orleans.Core/Core/GrainFactory.cs index 4bf72de422..9d8d8e2f5f 100644 --- a/src/Orleans.Core/Core/GrainFactory.cs +++ b/src/Orleans.Core/Core/GrainFactory.cs @@ -206,24 +206,24 @@ private IAddressable GetGrain(Type interfaceType, IdSpan grainKey, string grainC { var grainInterfaceType = this.interfaceTypeResolver.GetGrainInterfaceType(interfaceType); - GrainType grainType; + bool success; + GrainType grainType = default; try { - if (!string.IsNullOrWhiteSpace(grainClassNamePrefix)) - { - grainType = this.interfaceTypeToGrainTypeResolver.GetGrainType(grainInterfaceType, grainClassNamePrefix); - } - else - { - grainType = this.interfaceTypeToGrainTypeResolver.GetGrainType(grainInterfaceType); - } + success = interfaceTypeToGrainTypeResolver.TryGetGrainType(grainInterfaceType, grainClassNamePrefix, out grainType); } catch (ArgumentException) + { + success = false; + } + + if (!success) { // Not found in the type map. Maybe it's not available yet ? (heterogeneous case) - grainType = GrainTypePrefix.CreateStubGrainType(grainClassNamePrefix); + grainType = GrainTypePrefix.CreateStubGrainType(grainClassNamePrefix); } + Debug.Assert(!grainType.IsDefault); var grainId = GrainId.Create(grainType, grainKey); var grain = this.referenceActivator.CreateReference(grainId, grainInterfaceType); return grain; diff --git a/src/Orleans.Core/Core/GrainInterfaceTypeToGrainTypeResolver.cs b/src/Orleans.Core/Core/GrainInterfaceTypeToGrainTypeResolver.cs index 121749be45..88528c0d34 100644 --- a/src/Orleans.Core/Core/GrainInterfaceTypeToGrainTypeResolver.cs +++ b/src/Orleans.Core/Core/GrainInterfaceTypeToGrainTypeResolver.cs @@ -35,13 +35,34 @@ public GrainInterfaceTypeToGrainTypeResolver(IClusterManifestProvider clusterMan /// public GrainType GetGrainType(GrainInterfaceType interfaceType, string prefix) { - if (string.IsNullOrWhiteSpace(prefix)) + if (string.IsNullOrEmpty(prefix)) { return GetGrainType(interfaceType); } - GrainType result = default; + if (!TryGetGrainType(interfaceType, prefix, out var grainType)) + { + throw new ArgumentException($"Could not find an implementation matching prefix \"{prefix}\" for interface {interfaceType}"); + } + return grainType; + } + + /// + /// Resolves a which implements the provided , returning if an implementation was found; otherwise . + /// + /// The grain interface type. + /// A prefix of the grain implementation class name to search for. + /// The resolved grain type. + /// if an implementation was found; otherwise . + public bool TryGetGrainType(GrainInterfaceType interfaceType, string prefix, out GrainType result) + { + if (string.IsNullOrEmpty(prefix)) + { + return TryGetGrainType(interfaceType, out result); + } + + result = default; GrainInterfaceType lookupType; if (GenericGrainInterfaceType.TryParse(interfaceType, out var genericInterface)) { @@ -79,17 +100,12 @@ public GrainType GetGrainType(GrainInterfaceType interfaceType, string prefix) } } - if (result.IsDefault) - { - throw new ArgumentException($"Could not find an implementation matching prefix \"{prefix}\" for interface {interfaceType}"); - } - - if (GenericGrainType.TryParse(result, out var genericGrainType) && !genericGrainType.IsConstructed) + if (!result.IsDefault && GenericGrainType.TryParse(result, out var genericGrainType) && !genericGrainType.IsConstructed) { result = genericGrainType.GetConstructed(genericInterface); } - return result; + return !result.IsDefault; } /// @@ -108,6 +124,8 @@ public GrainType GetGrainType(GrainInterfaceType interfaceType) /// /// Resolves a which implements the provided , returning if an implementation was found; otherwise . /// + /// The grain interface type. + /// The resolved grain type. /// if an implementation was found; otherwise . public bool TryGetGrainType(GrainInterfaceType interfaceType, out GrainType result) { @@ -115,26 +133,11 @@ public bool TryGetGrainType(GrainInterfaceType interfaceType, out GrainType resu var cache = GetCache(); if (cache.Map.TryGetValue(interfaceType, out var entry)) { - if (!entry.PrimaryImplementation.IsDefault) - { - result = entry.PrimaryImplementation; - } - else if (entry.Implementations.Count == 1) - { - result = entry.Implementations[0].GrainType; - } - else if (entry.Implementations.Count > 1) - { - var candidates = string.Join(", ", entry.Implementations.Select(i => $"{i.GrainType} ({i.Prefix})")); - throw new ArgumentException($"Unable to identify a single appropriate grain type for interface {interfaceType}. Candidates: {candidates}"); - } - else - { - // No implementations - } + TryFind(interfaceType, entry, out result); } else if (_genericMapping.TryGetValue(interfaceType, out result)) { + // Nothing needed here. } else if (GenericGrainInterfaceType.TryParse(interfaceType, out var genericInterface) && genericInterface.IsConstructed) { @@ -164,6 +167,30 @@ public bool TryGetGrainType(GrainInterfaceType interfaceType, out GrainType resu return !result.IsDefault; } + private bool TryFind(GrainInterfaceType interfaceType, CacheEntry entry, out GrainType result) + { + if (!entry.PrimaryImplementation.IsDefault) + { + result = entry.PrimaryImplementation; + } + else if (entry.Implementations.Count == 1) + { + result = entry.Implementations[0].GrainType; + } + else if (entry.Implementations.Count > 1) + { + var candidates = string.Join(", ", entry.Implementations.Select(i => $"{i.GrainType} ({i.Prefix})")); + throw new ArgumentException($"Unable to identify a single appropriate grain type for interface {interfaceType}. Candidates: {candidates}"); + } + else + { + // No implementations + result = default; + } + + return !result.IsDefault; + } + /// /// Returns the cache, rebuilding it if it is out of date. /// From a2c55387912ecd31bf2b946cab434cd046d7a757 Mon Sep 17 00:00:00 2001 From: Reuben Bond Date: Thu, 14 Dec 2023 09:00:30 -0800 Subject: [PATCH 3/7] Respect grain call timeouts, dispose CancellationTokenSource, implement Cast & InvokeMethod. --- .../Core/DefaultClientServices.cs | 3 + .../GrainReferenceActivator.cs | 57 ++++++++++++------- .../Hosting/DefaultSiloServices.cs | 1 + .../HeterogeneousTests.cs | 4 +- 4 files changed, 42 insertions(+), 23 deletions(-) diff --git a/src/Orleans.Core/Core/DefaultClientServices.cs b/src/Orleans.Core/Core/DefaultClientServices.cs index 55b3bb8d12..7edd02e112 100644 --- a/src/Orleans.Core/Core/DefaultClientServices.cs +++ b/src/Orleans.Core/Core/DefaultClientServices.cs @@ -19,6 +19,7 @@ using Orleans.Serialization.Cloning; using Microsoft.Extensions.Hosting; using System.Runtime.InteropServices; +using System; namespace Orleans { @@ -42,6 +43,8 @@ public static void AddDefaultServices(IServiceCollection services) services.Add(ServiceDescriptor); + services.TryAddSingleton(TimeProvider.System); + // Options logging services.TryAddSingleton(typeof(IOptionFormatter<>), typeof(DefaultOptionsFormatter<>)); services.TryAddSingleton(typeof(IOptionFormatterResolver<>), typeof(DefaultOptionsFormatterResolver<>)); diff --git a/src/Orleans.Core/GrainReferences/GrainReferenceActivator.cs b/src/Orleans.Core/GrainReferences/GrainReferenceActivator.cs index 9fbe53d98e..86659df363 100644 --- a/src/Orleans.Core/GrainReferences/GrainReferenceActivator.cs +++ b/src/Orleans.Core/GrainReferences/GrainReferenceActivator.cs @@ -107,19 +107,23 @@ internal class StubGrainReferenceActivatorProvider : IGrainReferenceActivatorPro private readonly GrainVersionManifest _versionManifest; private readonly RpcProvider _rpcProvider; private readonly IServiceProvider _serviceProvider; + private readonly StubGrainReferenceRuntime _stubRuntime; public StubGrainReferenceActivatorProvider( GrainVersionManifest manifest, + IRuntimeClient runtimeClient, RpcProvider rpcProvider, CodecProvider codecProvider, CopyContextPool copyContextPool, - IServiceProvider serviceProvider) + IServiceProvider serviceProvider, + TimeProvider timeProvider) { _versionManifest = manifest; _rpcProvider = rpcProvider; _codecProvider = codecProvider; _copyContextPool = copyContextPool; _serviceProvider = serviceProvider; + _stubRuntime = new StubGrainReferenceRuntime(runtimeClient, timeProvider); } public bool TryGet(GrainType grainType, GrainInterfaceType interfaceType, out IGrainReferenceActivator activator) @@ -137,7 +141,7 @@ public bool TryGet(GrainType grainType, GrainInterfaceType interfaceType, out IG grainType, interfaceType, interfaceVersion, - new StubGrainReferenceRuntime(proxyType), + _stubRuntime, InvokeMethodOptions.None, _codecProvider, _copyContextPool, @@ -166,7 +170,7 @@ public GrainReferenceActivator(Type referenceType, GrainReferenceShared shared) var ctor = referenceType.GetConstructor(BindingFlags.Instance | BindingFlags.Public | BindingFlags.NonPublic, new[] { typeof(GrainReferenceShared), typeof(IdSpan) }) ?? throw new SerializerException("Invalid proxy type: " + referenceType); - var method = new DynamicMethod(referenceType.Name, typeof(GrainReference), new[] { typeof(object), typeof(GrainReferenceShared), typeof(IdSpan) }); + var method = new DynamicMethod(referenceType.Name, typeof(GrainReference), [typeof(object), typeof(GrainReferenceShared), typeof(IdSpan)]); var il = method.GetILGenerator(); // arg0 is unused for better delegate performance (avoids argument shuffling thunk) il.Emit(OpCodes.Ldarg_1); @@ -179,31 +183,31 @@ public GrainReferenceActivator(Type referenceType, GrainReferenceShared shared) public GrainReference CreateReference(GrainId grainId) => _create(_shared, grainId.Key); } - private sealed class StubGrainReferenceRuntime : IGrainReferenceRuntime + private sealed class StubGrainReferenceRuntime(IRuntimeClient runtimeClient, TimeProvider timeProvider) : IGrainReferenceRuntime { - private Type _proxyType; - - public StubGrainReferenceRuntime(Type proxyType) - { - _proxyType = proxyType; - } + private readonly IRuntimeClient _runtimeClient = runtimeClient; + private readonly TimeProvider _timeProvider = timeProvider; - public object Cast(IAddressable grain, Type grainInterface) - { - if (grain is GrainReference && grainInterface.IsAssignableFrom(grain.GetType())) - { - return grain; - } - throw new NotImplementedException(); - } + public object Cast(IAddressable grain, Type grainInterface) => _runtimeClient.GrainReferenceRuntime.Cast(grain, grainInterface); - public void InvokeMethod(GrainReference reference, IInvokable request, InvokeMethodOptions options) => throw new NotImplementedException(); + public void InvokeMethod(GrainReference reference, IInvokable request, InvokeMethodOptions options) => InvokeMethodAsync(reference, request, options).AsTask().Ignore(); public async ValueTask InvokeMethodAsync(GrainReference reference, IInvokable request, InvokeMethodOptions options) { var factory = reference.Shared.ServiceProvider.GetRequiredService(); _ = GrainTypePrefix.TryGetCrainClassPrefix(reference.GrainId.Type, out var grainClassPrefix); - var grain = (GrainReference) await factory.GetGrainAsync(reference.InterfaceType, reference.GrainId.Key, grainClassPrefix, new CancellationTokenSource(TimeSpan.FromSeconds(30)).Token); // TODO track request timeout? + var timeout = request.GetDefaultResponseTimeout() ?? _runtimeClient.GetResponseTimeout(); + var cancellation = new CancellationTokenSource(timeout, _timeProvider); + GrainReference grain; + try + { + grain = (GrainReference)await factory.GetGrainAsync(reference.InterfaceType, reference.GrainId.Key, grainClassPrefix, cancellation.Token); + } + finally + { + cancellation.Dispose(); + } + reference.Shared = grain.Shared; return await reference.Shared.Runtime.InvokeMethodAsync(reference, request, options); } @@ -212,7 +216,18 @@ public async ValueTask InvokeMethodAsync(GrainReference reference, IInvokable re { var factory = reference.Shared.ServiceProvider.GetRequiredService(); _ = GrainTypePrefix.TryGetCrainClassPrefix(reference.GrainId.Type, out var grainClassPrefix); - var grain = (GrainReference) await factory.GetGrainAsync(reference.InterfaceType, reference.GrainId.Key, grainClassPrefix, new CancellationTokenSource(TimeSpan.FromSeconds(30)).Token); // TODO track request timeout? + var timeout = request.GetDefaultResponseTimeout() ?? _runtimeClient.GetResponseTimeout(); + var cancellation = new CancellationTokenSource(timeout, _timeProvider); + GrainReference grain; + try + { + grain = (GrainReference)await factory.GetGrainAsync(reference.InterfaceType, reference.GrainId.Key, grainClassPrefix, cancellation.Token); + } + finally + { + cancellation.Dispose(); + } + reference.Shared = grain.Shared; await reference.Shared.Runtime.InvokeMethodAsync(reference, request, options); } diff --git a/src/Orleans.Runtime/Hosting/DefaultSiloServices.cs b/src/Orleans.Runtime/Hosting/DefaultSiloServices.cs index b1ff01c077..a5889944df 100644 --- a/src/Orleans.Runtime/Hosting/DefaultSiloServices.cs +++ b/src/Orleans.Runtime/Hosting/DefaultSiloServices.cs @@ -56,6 +56,7 @@ internal static void AddDefaultServices(IServiceCollection services) services.AddOptions(); + services.TryAddSingleton(TimeProvider.System); services.TryAddSingleton(typeof(IOptionFormatter<>), typeof(DefaultOptionsFormatter<>)); services.TryAddSingleton(typeof(IOptionFormatterResolver<>), typeof(DefaultOptionsFormatterResolver<>)); diff --git a/test/Tester/HeterogeneousSilosTests/HeterogeneousTests.cs b/test/Tester/HeterogeneousSilosTests/HeterogeneousTests.cs index e7bc9a4e86..1ebf14182d 100644 --- a/test/Tester/HeterogeneousSilosTests/HeterogeneousTests.cs +++ b/test/Tester/HeterogeneousSilosTests/HeterogeneousTests.cs @@ -78,7 +78,7 @@ public void Dispose() } [Fact] - public async void GrainExcludedTest() + public void GrainExcludedTest() { SetupAndDeployCluster(typeof(RandomPlacement), typeof(TestGrain)); @@ -87,7 +87,7 @@ public async void GrainExcludedTest() //Assert.Contains("Could not find an implementation for interface", exception.Message); var grain = this.cluster.GrainFactory.GetGrain(0); - await grain.GetKey(); + //await grain.GetKey(); // Should not fail this.cluster.GrainFactory.GetGrain(0); From e44ee6f9daaeef2b54a0eef8fc6d1529f7708a5c Mon Sep 17 00:00:00 2001 From: Reuben Bond Date: Thu, 14 Dec 2023 09:21:07 -0800 Subject: [PATCH 4/7] Refactor --- src/Orleans.Core/Core/GrainFactory.cs | 6 ++-- .../GrainReferenceActivator.cs | 32 +++++++------------ 2 files changed, 15 insertions(+), 23 deletions(-) diff --git a/src/Orleans.Core/Core/GrainFactory.cs b/src/Orleans.Core/Core/GrainFactory.cs index 9d8d8e2f5f..94f658f043 100644 --- a/src/Orleans.Core/Core/GrainFactory.cs +++ b/src/Orleans.Core/Core/GrainFactory.cs @@ -229,7 +229,7 @@ private IAddressable GetGrain(Type interfaceType, IdSpan grainKey, string grainC return grain; } - internal async ValueTask GetGrainAsync(GrainInterfaceType grainInterfaceType, IdSpan grainKey, string grainClassNamePrefix, CancellationToken ct) + internal async ValueTask GetGrainAsync(GrainInterfaceType grainInterfaceType, IdSpan grainKey, string grainClassNamePrefix, CancellationToken cancellationToken) { GrainType grainType; Exception lastException = null; @@ -254,14 +254,14 @@ internal async ValueTask GetGrainAsync(GrainInterfaceType grainInt } try { - await Task.Delay(1_000, ct); + await Task.Delay(1_000, cancellationToken); } catch (TaskCanceledException) { break; } } - while (grainType.IsStubGrain() || ct.IsCancellationRequested); + while (grainType.IsStubGrain() || cancellationToken.IsCancellationRequested); if (grainType.IsStubGrain()) { diff --git a/src/Orleans.Core/GrainReferences/GrainReferenceActivator.cs b/src/Orleans.Core/GrainReferences/GrainReferenceActivator.cs index 86659df363..f566118935 100644 --- a/src/Orleans.Core/GrainReferences/GrainReferenceActivator.cs +++ b/src/Orleans.Core/GrainReferences/GrainReferenceActivator.cs @@ -116,6 +116,7 @@ public StubGrainReferenceActivatorProvider( CodecProvider codecProvider, CopyContextPool copyContextPool, IServiceProvider serviceProvider, + GrainFactory grainFactory, TimeProvider timeProvider) { _versionManifest = manifest; @@ -123,7 +124,7 @@ public StubGrainReferenceActivatorProvider( _codecProvider = codecProvider; _copyContextPool = copyContextPool; _serviceProvider = serviceProvider; - _stubRuntime = new StubGrainReferenceRuntime(runtimeClient, timeProvider); + _stubRuntime = new StubGrainReferenceRuntime(runtimeClient, grainFactory, timeProvider); } public bool TryGet(GrainType grainType, GrainInterfaceType interfaceType, out IGrainReferenceActivator activator) @@ -183,9 +184,10 @@ public GrainReferenceActivator(Type referenceType, GrainReferenceShared shared) public GrainReference CreateReference(GrainId grainId) => _create(_shared, grainId.Key); } - private sealed class StubGrainReferenceRuntime(IRuntimeClient runtimeClient, TimeProvider timeProvider) : IGrainReferenceRuntime + private sealed class StubGrainReferenceRuntime(IRuntimeClient runtimeClient, GrainFactory grainFactory, TimeProvider timeProvider) : IGrainReferenceRuntime { private readonly IRuntimeClient _runtimeClient = runtimeClient; + private readonly GrainFactory _grainFactory = grainFactory; private readonly TimeProvider _timeProvider = timeProvider; public object Cast(IAddressable grain, Type grainInterface) => _runtimeClient.GrainReferenceRuntime.Cast(grain, grainInterface); @@ -194,34 +196,25 @@ private sealed class StubGrainReferenceRuntime(IRuntimeClient runtimeClient, Tim public async ValueTask InvokeMethodAsync(GrainReference reference, IInvokable request, InvokeMethodOptions options) { - var factory = reference.Shared.ServiceProvider.GetRequiredService(); - _ = GrainTypePrefix.TryGetCrainClassPrefix(reference.GrainId.Type, out var grainClassPrefix); - var timeout = request.GetDefaultResponseTimeout() ?? _runtimeClient.GetResponseTimeout(); - var cancellation = new CancellationTokenSource(timeout, _timeProvider); - GrainReference grain; - try - { - grain = (GrainReference)await factory.GetGrainAsync(reference.InterfaceType, reference.GrainId.Key, grainClassPrefix, cancellation.Token); - } - finally - { - cancellation.Dispose(); - } - - reference.Shared = grain.Shared; + await ResolveGrainType(reference, request); return await reference.Shared.Runtime.InvokeMethodAsync(reference, request, options); } public async ValueTask InvokeMethodAsync(GrainReference reference, IInvokable request, InvokeMethodOptions options) { - var factory = reference.Shared.ServiceProvider.GetRequiredService(); + await ResolveGrainType(reference, request); + await reference.Shared.Runtime.InvokeMethodAsync(reference, request, options); + } + + private async Task ResolveGrainType(GrainReference reference, IInvokable request) + { _ = GrainTypePrefix.TryGetCrainClassPrefix(reference.GrainId.Type, out var grainClassPrefix); var timeout = request.GetDefaultResponseTimeout() ?? _runtimeClient.GetResponseTimeout(); var cancellation = new CancellationTokenSource(timeout, _timeProvider); GrainReference grain; try { - grain = (GrainReference)await factory.GetGrainAsync(reference.InterfaceType, reference.GrainId.Key, grainClassPrefix, cancellation.Token); + grain = (GrainReference)await _grainFactory.GetGrainAsync(reference.InterfaceType, reference.GrainId.Key, grainClassPrefix, cancellation.Token); } finally { @@ -229,7 +222,6 @@ public async ValueTask InvokeMethodAsync(GrainReference reference, IInvokable re } reference.Shared = grain.Shared; - await reference.Shared.Runtime.InvokeMethodAsync(reference, request, options); } } } From 8104fc4f5ee88c60e9caf91b1ea04f6b8e8ed271 Mon Sep 17 00:00:00 2001 From: Reuben Bond Date: Thu, 14 Dec 2023 09:56:11 -0800 Subject: [PATCH 5/7] Refactor more, add comments --- .../GrainReferenceActivator.cs | 41 +++++++++++++------ 1 file changed, 29 insertions(+), 12 deletions(-) diff --git a/src/Orleans.Core/GrainReferences/GrainReferenceActivator.cs b/src/Orleans.Core/GrainReferences/GrainReferenceActivator.cs index f566118935..e7f28e2db6 100644 --- a/src/Orleans.Core/GrainReferences/GrainReferenceActivator.cs +++ b/src/Orleans.Core/GrainReferences/GrainReferenceActivator.cs @@ -1,5 +1,6 @@ using System; using System.Collections.Generic; +using System.Diagnostics; using System.Diagnostics.CodeAnalysis; using System.Linq; using System.Reflection; @@ -11,6 +12,7 @@ using Orleans.CodeGeneration; using Orleans.Metadata; using Orleans.Runtime; +using Orleans.Runtime.Internal; using Orleans.Runtime.Versions; using Orleans.Serialization; using Orleans.Serialization.Cloning; @@ -184,10 +186,17 @@ public GrainReferenceActivator(Type referenceType, GrainReferenceShared shared) public GrainReference CreateReference(GrainId grainId) => _create(_shared, grainId.Key); } - private sealed class StubGrainReferenceRuntime(IRuntimeClient runtimeClient, GrainFactory grainFactory, TimeProvider timeProvider) : IGrainReferenceRuntime + private sealed class StubGrainReferenceRuntime( + IRuntimeClient runtimeClient, + GrainFactory grainFactory, + GrainInterfaceTypeToGrainTypeResolver interfaceTypeToGrainTypeResolver, + IClusterManifestProvider clusterManifestProvider, + TimeProvider timeProvider) : IGrainReferenceRuntime { private readonly IRuntimeClient _runtimeClient = runtimeClient; private readonly GrainFactory _grainFactory = grainFactory; + private readonly GrainInterfaceTypeToGrainTypeResolver _interfaceTypeToGrainTypeResolver = interfaceTypeToGrainTypeResolver; + private readonly IClusterManifestProvider _clusterManifestProvider = clusterManifestProvider; private readonly TimeProvider _timeProvider = timeProvider; public object Cast(IAddressable grain, Type grainInterface) => _runtimeClient.GrainReferenceRuntime.Cast(grain, grainInterface); @@ -196,31 +205,39 @@ private sealed class StubGrainReferenceRuntime(IRuntimeClient runtimeClient, Gra public async ValueTask InvokeMethodAsync(GrainReference reference, IInvokable request, InvokeMethodOptions options) { - await ResolveGrainType(reference, request); + await ResolveGrainTypeAsync(reference, request); return await reference.Shared.Runtime.InvokeMethodAsync(reference, request, options); } public async ValueTask InvokeMethodAsync(GrainReference reference, IInvokable request, InvokeMethodOptions options) { - await ResolveGrainType(reference, request); + await ResolveGrainTypeAsync(reference, request); await reference.Shared.Runtime.InvokeMethodAsync(reference, request, options); } - private async Task ResolveGrainType(GrainReference reference, IInvokable request) + private async Task ResolveGrainTypeAsync(GrainReference reference, IInvokable request) { _ = GrainTypePrefix.TryGetCrainClassPrefix(reference.GrainId.Type, out var grainClassPrefix); var timeout = request.GetDefaultResponseTimeout() ?? _runtimeClient.GetResponseTimeout(); - var cancellation = new CancellationTokenSource(timeout, _timeProvider); - GrainReference grain; - try - { - grain = (GrainReference)await _grainFactory.GetGrainAsync(reference.InterfaceType, reference.GrainId.Key, grainClassPrefix, cancellation.Token); - } - finally + using var cancellation = new CancellationTokenSource(timeout, _timeProvider); + var clusterManifestUpdates = _clusterManifestProvider.Updates.WithCancellation(cancellation.Token).GetAsyncEnumerator(); + GrainType grainType; + while (!_interfaceTypeToGrainTypeResolver.TryGetGrainType(reference.InterfaceType, grainClassPrefix, out grainType)) { - cancellation.Dispose(); + // Wait for the next cluster manifest update. + // The cluster manifest is updated when silos join or leave the cluster. + // Each time the cluster manifest is updated, the grain type resolution cache is invalidated, and will be + // recomputed on the subsequent call to TryGetGrainType. + if (!await clusterManifestUpdates.MoveNextAsync()) + { + // If execution has reached this point, the host is either shutting down or the response timeout has elapsed. + throw new OperationCanceledException(cancellation.Token); + } } + // Use the newly resolved grain type to replace the grain reference's runtime with the runtime corresponding to the resolved grain reference. + var grainId = GrainId.Create(grainType, reference.GrainId.Key); + var grain = (GrainReference)_grainFactory.GetGrain(grainId, reference.InterfaceType); reference.Shared = grain.Shared; } } From c9ceba98f25cc8e6153dc70a30b28e5acb9c33e1 Mon Sep 17 00:00:00 2001 From: Reuben Bond Date: Thu, 14 Dec 2023 09:58:13 -0800 Subject: [PATCH 6/7] Correctly dispose enumerator, remove unused GetGrainAsync --- src/Orleans.Core/Core/GrainFactory.cs | 45 ------------------- .../GrainReferenceActivator.cs | 2 +- 2 files changed, 1 insertion(+), 46 deletions(-) diff --git a/src/Orleans.Core/Core/GrainFactory.cs b/src/Orleans.Core/Core/GrainFactory.cs index 94f658f043..75aaa77e65 100644 --- a/src/Orleans.Core/Core/GrainFactory.cs +++ b/src/Orleans.Core/Core/GrainFactory.cs @@ -229,51 +229,6 @@ private IAddressable GetGrain(Type interfaceType, IdSpan grainKey, string grainC return grain; } - internal async ValueTask GetGrainAsync(GrainInterfaceType grainInterfaceType, IdSpan grainKey, string grainClassNamePrefix, CancellationToken cancellationToken) - { - GrainType grainType; - Exception lastException = null; - do - { - try - { - if (!string.IsNullOrWhiteSpace(grainClassNamePrefix)) - { - grainType = this.interfaceTypeToGrainTypeResolver.GetGrainType(grainInterfaceType, grainClassNamePrefix); - } - else - { - grainType = this.interfaceTypeToGrainTypeResolver.GetGrainType(grainInterfaceType); - } - } - catch (ArgumentException ex) - { - // Not found in the type map. Maybe it's not available yet ? (heterogeneous case) - grainType = GrainTypePrefix.CreateStubGrainType(grainClassNamePrefix); - lastException = ex; - } - try - { - await Task.Delay(1_000, cancellationToken); - } - catch (TaskCanceledException) - { - break; - } - } - while (grainType.IsStubGrain() || cancellationToken.IsCancellationRequested); - - if (grainType.IsStubGrain()) - { - Debug.Assert(lastException != null); - throw lastException; - } - - var grainId = GrainId.Create(grainType, grainKey); - var grain = this.referenceActivator.CreateReference(grainId, grainInterfaceType); - return grain; - } - /// /// Creates a grain reference. /// diff --git a/src/Orleans.Core/GrainReferences/GrainReferenceActivator.cs b/src/Orleans.Core/GrainReferences/GrainReferenceActivator.cs index e7f28e2db6..b48f9bbe08 100644 --- a/src/Orleans.Core/GrainReferences/GrainReferenceActivator.cs +++ b/src/Orleans.Core/GrainReferences/GrainReferenceActivator.cs @@ -220,7 +220,7 @@ private async Task ResolveGrainTypeAsync(GrainReference reference, IInvokable re _ = GrainTypePrefix.TryGetCrainClassPrefix(reference.GrainId.Type, out var grainClassPrefix); var timeout = request.GetDefaultResponseTimeout() ?? _runtimeClient.GetResponseTimeout(); using var cancellation = new CancellationTokenSource(timeout, _timeProvider); - var clusterManifestUpdates = _clusterManifestProvider.Updates.WithCancellation(cancellation.Token).GetAsyncEnumerator(); + await using var clusterManifestUpdates = _clusterManifestProvider.Updates.WithCancellation(cancellation.Token).GetAsyncEnumerator(); GrainType grainType; while (!_interfaceTypeToGrainTypeResolver.TryGetGrainType(reference.InterfaceType, grainClassPrefix, out grainType)) { From c7a0b8f9e048fe868f88cb152c420d0f237f252d Mon Sep 17 00:00:00 2001 From: Reuben Bond Date: Thu, 14 Dec 2023 10:03:09 -0800 Subject: [PATCH 7/7] Fix build --- .../Core/InterfaceToImplementationMappingCache.cs | 2 +- .../GrainReferences/GrainReferenceActivator.cs | 8 ++++---- test/Tester/HeterogeneousSilosTests/HeterogeneousTests.cs | 2 ++ 3 files changed, 7 insertions(+), 5 deletions(-) diff --git a/src/Orleans.Core/Core/InterfaceToImplementationMappingCache.cs b/src/Orleans.Core/Core/InterfaceToImplementationMappingCache.cs index d5e74b9b41..5878b767bd 100644 --- a/src/Orleans.Core/Core/InterfaceToImplementationMappingCache.cs +++ b/src/Orleans.Core/Core/InterfaceToImplementationMappingCache.cs @@ -28,7 +28,7 @@ public Entry(MethodInfo implementationMethod, MethodInfo interfaceMethod) } /// - /// Gets the grain implmentation . + /// Gets the grain implementation . /// public MethodInfo ImplementationMethod { get; } diff --git a/src/Orleans.Core/GrainReferences/GrainReferenceActivator.cs b/src/Orleans.Core/GrainReferences/GrainReferenceActivator.cs index b48f9bbe08..f324a2a6b6 100644 --- a/src/Orleans.Core/GrainReferences/GrainReferenceActivator.cs +++ b/src/Orleans.Core/GrainReferences/GrainReferenceActivator.cs @@ -118,7 +118,8 @@ public StubGrainReferenceActivatorProvider( CodecProvider codecProvider, CopyContextPool copyContextPool, IServiceProvider serviceProvider, - GrainFactory grainFactory, + GrainInterfaceTypeToGrainTypeResolver grainTypeResolver, + IClusterManifestProvider clusterManifestProvider, TimeProvider timeProvider) { _versionManifest = manifest; @@ -126,7 +127,7 @@ public StubGrainReferenceActivatorProvider( _codecProvider = codecProvider; _copyContextPool = copyContextPool; _serviceProvider = serviceProvider; - _stubRuntime = new StubGrainReferenceRuntime(runtimeClient, grainFactory, timeProvider); + _stubRuntime = new StubGrainReferenceRuntime(runtimeClient, grainTypeResolver, clusterManifestProvider, timeProvider); } public bool TryGet(GrainType grainType, GrainInterfaceType interfaceType, out IGrainReferenceActivator activator) @@ -188,13 +189,12 @@ public GrainReferenceActivator(Type referenceType, GrainReferenceShared shared) private sealed class StubGrainReferenceRuntime( IRuntimeClient runtimeClient, - GrainFactory grainFactory, GrainInterfaceTypeToGrainTypeResolver interfaceTypeToGrainTypeResolver, IClusterManifestProvider clusterManifestProvider, TimeProvider timeProvider) : IGrainReferenceRuntime { private readonly IRuntimeClient _runtimeClient = runtimeClient; - private readonly GrainFactory _grainFactory = grainFactory; + private readonly IGrainFactory _grainFactory = runtimeClient.InternalGrainFactory; private readonly GrainInterfaceTypeToGrainTypeResolver _interfaceTypeToGrainTypeResolver = interfaceTypeToGrainTypeResolver; private readonly IClusterManifestProvider _clusterManifestProvider = clusterManifestProvider; private readonly TimeProvider _timeProvider = timeProvider; diff --git a/test/Tester/HeterogeneousSilosTests/HeterogeneousTests.cs b/test/Tester/HeterogeneousSilosTests/HeterogeneousTests.cs index 1ebf14182d..118ab124ff 100644 --- a/test/Tester/HeterogeneousSilosTests/HeterogeneousTests.cs +++ b/test/Tester/HeterogeneousSilosTests/HeterogeneousTests.cs @@ -87,6 +87,8 @@ public void GrainExcludedTest() //Assert.Contains("Could not find an implementation for interface", exception.Message); var grain = this.cluster.GrainFactory.GetGrain(0); + Assert.True(grain.GetGrainId().Type.IsStubGrain()); + //await grain.GetKey(); // Should not fail