diff --git a/src/Orleans.Core.Abstractions/IDs/GrainTypePrefix.cs b/src/Orleans.Core.Abstractions/IDs/GrainTypePrefix.cs index c684f9236a..118c7c24a5 100644 --- a/src/Orleans.Core.Abstractions/IDs/GrainTypePrefix.cs +++ b/src/Orleans.Core.Abstractions/IDs/GrainTypePrefix.cs @@ -58,6 +58,16 @@ public static class GrainTypePrefix /// public static readonly ReadOnlyMemory LegacyGrainPrefixBytes = Encoding.UTF8.GetBytes(LegacyGrainPrefix); + /// + /// The prefix for stub grain (grain without definitive type). + /// + public const string StubGrainPrefix = SystemPrefix + "grain.stub."; + + /// + /// A span representation of + /// + public static readonly ReadOnlyMemory StubGrainPrefixBytes = Encoding.UTF8.GetBytes(StubGrainPrefix); + /// /// Returns if the type is a client, if not. /// @@ -99,5 +109,36 @@ public static class GrainTypePrefix /// The grain id. /// if the type is a system target, if not. public static bool IsSystemTarget(this in GrainId id) => id.Type.IsSystemTarget(); + + /// + /// Returns if the grain is a stub grain. + /// + /// The grain type. + /// if the type is a stub type , if not. + public static bool IsStubGrain(this in GrainType type) => type.AsSpan().StartsWith(StubGrainPrefixBytes.Span); + + /// + /// Create a stub grain type. + /// + /// The prefix of the class to be used. + /// The grain type. + public static GrainType CreateStubGrainType(string grainClassPrefix) + { + return string.IsNullOrWhiteSpace(grainClassPrefix) + ? new GrainType(StubGrainPrefixBytes.ToArray()) + : GrainType.Create(string.Concat(StubGrainPrefix, grainClassPrefix)); + } + + public static bool TryGetCrainClassPrefix(GrainType grainType, out string grainClassPrefix) + { + if (!IsStubGrain(grainType)) + { + grainClassPrefix = default; + return false; + } + + grainClassPrefix = Encoding.UTF8.GetString(grainType.AsSpan().Slice(StubGrainPrefixBytes.Length)); + return true; + } } } diff --git a/src/Orleans.Core.Abstractions/Runtime/GrainReference.cs b/src/Orleans.Core.Abstractions/Runtime/GrainReference.cs index b585d68cbc..004559f8ab 100644 --- a/src/Orleans.Core.Abstractions/Runtime/GrainReference.cs +++ b/src/Orleans.Core.Abstractions/Runtime/GrainReference.cs @@ -256,7 +256,7 @@ public class GrainReference : IAddressable, IEquatable, ISpanFor /// The grain reference functionality which is shared by all grain references of a given type. /// [NonSerialized] - private readonly GrainReferenceShared _shared; + private GrainReferenceShared _shared; /// /// The underlying grain id key. @@ -267,7 +267,11 @@ public class GrainReference : IAddressable, IEquatable, ISpanFor /// /// Gets the grain reference functionality which is shared by all grain references of a given type. /// - internal GrainReferenceShared Shared => _shared ?? throw new GrainReferenceNotBoundException(this); + internal GrainReferenceShared Shared + { + get => _shared ?? throw new GrainReferenceNotBoundException(this); + set => _shared = value; + } /// /// Gets the grain reference runtime. diff --git a/src/Orleans.Core/Core/DefaultClientServices.cs b/src/Orleans.Core/Core/DefaultClientServices.cs index 32d9b6039b..7edd02e112 100644 --- a/src/Orleans.Core/Core/DefaultClientServices.cs +++ b/src/Orleans.Core/Core/DefaultClientServices.cs @@ -19,6 +19,7 @@ using Orleans.Serialization.Cloning; using Microsoft.Extensions.Hosting; using System.Runtime.InteropServices; +using System; namespace Orleans { @@ -42,6 +43,8 @@ public static void AddDefaultServices(IServiceCollection services) services.Add(ServiceDescriptor); + services.TryAddSingleton(TimeProvider.System); + // Options logging services.TryAddSingleton(typeof(IOptionFormatter<>), typeof(DefaultOptionsFormatter<>)); services.TryAddSingleton(typeof(IOptionFormatterResolver<>), typeof(DefaultOptionsFormatterResolver<>)); @@ -69,6 +72,7 @@ public static void AddDefaultServices(IServiceCollection services) services.TryAddSingleton(); services.TryAddSingleton(); services.TryAddSingleton(); + services.AddSingleton(); services.AddSingleton(); services.AddSingleton(); services.TryAddSingleton(); diff --git a/src/Orleans.Core/Core/GrainFactory.cs b/src/Orleans.Core/Core/GrainFactory.cs index 4fb1e8587f..75aaa77e65 100644 --- a/src/Orleans.Core/Core/GrainFactory.cs +++ b/src/Orleans.Core/Core/GrainFactory.cs @@ -1,5 +1,8 @@ using System; using System.Collections.Generic; +using System.Diagnostics; +using System.Threading; +using System.Threading.Tasks; using Orleans.GrainReferences; using Orleans.Metadata; using Orleans.Runtime; @@ -203,16 +206,24 @@ private IAddressable GetGrain(Type interfaceType, IdSpan grainKey, string grainC { var grainInterfaceType = this.interfaceTypeResolver.GetGrainInterfaceType(interfaceType); - GrainType grainType; - if (!string.IsNullOrWhiteSpace(grainClassNamePrefix)) + bool success; + GrainType grainType = default; + try { - grainType = this.interfaceTypeToGrainTypeResolver.GetGrainType(grainInterfaceType, grainClassNamePrefix); + success = interfaceTypeToGrainTypeResolver.TryGetGrainType(grainInterfaceType, grainClassNamePrefix, out grainType); } - else + catch (ArgumentException) { - grainType = this.interfaceTypeToGrainTypeResolver.GetGrainType(grainInterfaceType); + success = false; } + if (!success) + { + // Not found in the type map. Maybe it's not available yet ? (heterogeneous case) + grainType = GrainTypePrefix.CreateStubGrainType(grainClassNamePrefix); + } + + Debug.Assert(!grainType.IsDefault); var grainId = GrainId.Create(grainType, grainKey); var grain = this.referenceActivator.CreateReference(grainId, grainInterfaceType); return grain; diff --git a/src/Orleans.Core/Core/GrainInterfaceTypeToGrainTypeResolver.cs b/src/Orleans.Core/Core/GrainInterfaceTypeToGrainTypeResolver.cs index 121749be45..88528c0d34 100644 --- a/src/Orleans.Core/Core/GrainInterfaceTypeToGrainTypeResolver.cs +++ b/src/Orleans.Core/Core/GrainInterfaceTypeToGrainTypeResolver.cs @@ -35,13 +35,34 @@ public GrainInterfaceTypeToGrainTypeResolver(IClusterManifestProvider clusterMan /// public GrainType GetGrainType(GrainInterfaceType interfaceType, string prefix) { - if (string.IsNullOrWhiteSpace(prefix)) + if (string.IsNullOrEmpty(prefix)) { return GetGrainType(interfaceType); } - GrainType result = default; + if (!TryGetGrainType(interfaceType, prefix, out var grainType)) + { + throw new ArgumentException($"Could not find an implementation matching prefix \"{prefix}\" for interface {interfaceType}"); + } + return grainType; + } + + /// + /// Resolves a which implements the provided , returning if an implementation was found; otherwise . + /// + /// The grain interface type. + /// A prefix of the grain implementation class name to search for. + /// The resolved grain type. + /// if an implementation was found; otherwise . + public bool TryGetGrainType(GrainInterfaceType interfaceType, string prefix, out GrainType result) + { + if (string.IsNullOrEmpty(prefix)) + { + return TryGetGrainType(interfaceType, out result); + } + + result = default; GrainInterfaceType lookupType; if (GenericGrainInterfaceType.TryParse(interfaceType, out var genericInterface)) { @@ -79,17 +100,12 @@ public GrainType GetGrainType(GrainInterfaceType interfaceType, string prefix) } } - if (result.IsDefault) - { - throw new ArgumentException($"Could not find an implementation matching prefix \"{prefix}\" for interface {interfaceType}"); - } - - if (GenericGrainType.TryParse(result, out var genericGrainType) && !genericGrainType.IsConstructed) + if (!result.IsDefault && GenericGrainType.TryParse(result, out var genericGrainType) && !genericGrainType.IsConstructed) { result = genericGrainType.GetConstructed(genericInterface); } - return result; + return !result.IsDefault; } /// @@ -108,6 +124,8 @@ public GrainType GetGrainType(GrainInterfaceType interfaceType) /// /// Resolves a which implements the provided , returning if an implementation was found; otherwise . /// + /// The grain interface type. + /// The resolved grain type. /// if an implementation was found; otherwise . public bool TryGetGrainType(GrainInterfaceType interfaceType, out GrainType result) { @@ -115,26 +133,11 @@ public bool TryGetGrainType(GrainInterfaceType interfaceType, out GrainType resu var cache = GetCache(); if (cache.Map.TryGetValue(interfaceType, out var entry)) { - if (!entry.PrimaryImplementation.IsDefault) - { - result = entry.PrimaryImplementation; - } - else if (entry.Implementations.Count == 1) - { - result = entry.Implementations[0].GrainType; - } - else if (entry.Implementations.Count > 1) - { - var candidates = string.Join(", ", entry.Implementations.Select(i => $"{i.GrainType} ({i.Prefix})")); - throw new ArgumentException($"Unable to identify a single appropriate grain type for interface {interfaceType}. Candidates: {candidates}"); - } - else - { - // No implementations - } + TryFind(interfaceType, entry, out result); } else if (_genericMapping.TryGetValue(interfaceType, out result)) { + // Nothing needed here. } else if (GenericGrainInterfaceType.TryParse(interfaceType, out var genericInterface) && genericInterface.IsConstructed) { @@ -164,6 +167,30 @@ public bool TryGetGrainType(GrainInterfaceType interfaceType, out GrainType resu return !result.IsDefault; } + private bool TryFind(GrainInterfaceType interfaceType, CacheEntry entry, out GrainType result) + { + if (!entry.PrimaryImplementation.IsDefault) + { + result = entry.PrimaryImplementation; + } + else if (entry.Implementations.Count == 1) + { + result = entry.Implementations[0].GrainType; + } + else if (entry.Implementations.Count > 1) + { + var candidates = string.Join(", ", entry.Implementations.Select(i => $"{i.GrainType} ({i.Prefix})")); + throw new ArgumentException($"Unable to identify a single appropriate grain type for interface {interfaceType}. Candidates: {candidates}"); + } + else + { + // No implementations + result = default; + } + + return !result.IsDefault; + } + /// /// Returns the cache, rebuilding it if it is out of date. /// diff --git a/src/Orleans.Core/Core/InterfaceToImplementationMappingCache.cs b/src/Orleans.Core/Core/InterfaceToImplementationMappingCache.cs index d5e74b9b41..5878b767bd 100644 --- a/src/Orleans.Core/Core/InterfaceToImplementationMappingCache.cs +++ b/src/Orleans.Core/Core/InterfaceToImplementationMappingCache.cs @@ -28,7 +28,7 @@ public Entry(MethodInfo implementationMethod, MethodInfo interfaceMethod) } /// - /// Gets the grain implmentation . + /// Gets the grain implementation . /// public MethodInfo ImplementationMethod { get; } diff --git a/src/Orleans.Core/GrainReferences/GrainReferenceActivator.cs b/src/Orleans.Core/GrainReferences/GrainReferenceActivator.cs index f37a342443..f324a2a6b6 100644 --- a/src/Orleans.Core/GrainReferences/GrainReferenceActivator.cs +++ b/src/Orleans.Core/GrainReferences/GrainReferenceActivator.cs @@ -1,18 +1,23 @@ using System; using System.Collections.Generic; +using System.Diagnostics; using System.Diagnostics.CodeAnalysis; using System.Linq; using System.Reflection; using System.Reflection.Emit; +using System.Threading; +using System.Threading.Tasks; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Options; using Orleans.CodeGeneration; using Orleans.Metadata; using Orleans.Runtime; +using Orleans.Runtime.Internal; using Orleans.Runtime.Versions; using Orleans.Serialization; using Orleans.Serialization.Cloning; using Orleans.Serialization.Configuration; +using Orleans.Serialization.Invocation; using Orleans.Serialization.Serializers; using Orleans.Serialization.TypeSystem; @@ -94,6 +99,150 @@ private IGrainReferenceActivator CreateActivator(GrainType grainType, GrainInter } } + /// + /// Creates grain references that do not have their definitive type. + /// + internal class StubGrainReferenceActivatorProvider : IGrainReferenceActivatorProvider + { + private readonly CopyContextPool _copyContextPool; + private readonly CodecProvider _codecProvider; + private readonly GrainVersionManifest _versionManifest; + private readonly RpcProvider _rpcProvider; + private readonly IServiceProvider _serviceProvider; + private readonly StubGrainReferenceRuntime _stubRuntime; + + public StubGrainReferenceActivatorProvider( + GrainVersionManifest manifest, + IRuntimeClient runtimeClient, + RpcProvider rpcProvider, + CodecProvider codecProvider, + CopyContextPool copyContextPool, + IServiceProvider serviceProvider, + GrainInterfaceTypeToGrainTypeResolver grainTypeResolver, + IClusterManifestProvider clusterManifestProvider, + TimeProvider timeProvider) + { + _versionManifest = manifest; + _rpcProvider = rpcProvider; + _codecProvider = codecProvider; + _copyContextPool = copyContextPool; + _serviceProvider = serviceProvider; + _stubRuntime = new StubGrainReferenceRuntime(runtimeClient, grainTypeResolver, clusterManifestProvider, timeProvider); + } + + public bool TryGet(GrainType grainType, GrainInterfaceType interfaceType, out IGrainReferenceActivator activator) + { + if (!grainType.IsStubGrain()) + { + activator = default; + return false; + } + + _rpcProvider.TryGet(interfaceType, out var proxyType); + + var interfaceVersion = _versionManifest.GetLocalVersion(interfaceType); + var shared = new GrainReferenceShared( + grainType, + interfaceType, + interfaceVersion, + _stubRuntime, + InvokeMethodOptions.None, + _codecProvider, + _copyContextPool, + _serviceProvider); + activator = new GrainReferenceActivator(proxyType, shared); + return true; + } + + /// + /// Creates grain references for a given grain type and grain interface type. + /// + private sealed class GrainReferenceActivator : IGrainReferenceActivator + { + private readonly GrainReferenceShared _shared; + private readonly Func _create; + + /// + /// Initializes a new instance of the class. + /// + /// The generated proxy object type. + /// The functionality shared between all grain references for a specified grain type and grain interface type. + public GrainReferenceActivator(Type referenceType, GrainReferenceShared shared) + { + _shared = shared; + + var ctor = referenceType.GetConstructor(BindingFlags.Instance | BindingFlags.Public | BindingFlags.NonPublic, new[] { typeof(GrainReferenceShared), typeof(IdSpan) }) + ?? throw new SerializerException("Invalid proxy type: " + referenceType); + + var method = new DynamicMethod(referenceType.Name, typeof(GrainReference), [typeof(object), typeof(GrainReferenceShared), typeof(IdSpan)]); + var il = method.GetILGenerator(); + // arg0 is unused for better delegate performance (avoids argument shuffling thunk) + il.Emit(OpCodes.Ldarg_1); + il.Emit(OpCodes.Ldarg_2); + il.Emit(OpCodes.Newobj, ctor); + il.Emit(OpCodes.Ret); + _create = method.CreateDelegate>(); + } + + public GrainReference CreateReference(GrainId grainId) => _create(_shared, grainId.Key); + } + + private sealed class StubGrainReferenceRuntime( + IRuntimeClient runtimeClient, + GrainInterfaceTypeToGrainTypeResolver interfaceTypeToGrainTypeResolver, + IClusterManifestProvider clusterManifestProvider, + TimeProvider timeProvider) : IGrainReferenceRuntime + { + private readonly IRuntimeClient _runtimeClient = runtimeClient; + private readonly IGrainFactory _grainFactory = runtimeClient.InternalGrainFactory; + private readonly GrainInterfaceTypeToGrainTypeResolver _interfaceTypeToGrainTypeResolver = interfaceTypeToGrainTypeResolver; + private readonly IClusterManifestProvider _clusterManifestProvider = clusterManifestProvider; + private readonly TimeProvider _timeProvider = timeProvider; + + public object Cast(IAddressable grain, Type grainInterface) => _runtimeClient.GrainReferenceRuntime.Cast(grain, grainInterface); + + public void InvokeMethod(GrainReference reference, IInvokable request, InvokeMethodOptions options) => InvokeMethodAsync(reference, request, options).AsTask().Ignore(); + + public async ValueTask InvokeMethodAsync(GrainReference reference, IInvokable request, InvokeMethodOptions options) + { + await ResolveGrainTypeAsync(reference, request); + return await reference.Shared.Runtime.InvokeMethodAsync(reference, request, options); + } + + public async ValueTask InvokeMethodAsync(GrainReference reference, IInvokable request, InvokeMethodOptions options) + { + await ResolveGrainTypeAsync(reference, request); + await reference.Shared.Runtime.InvokeMethodAsync(reference, request, options); + } + + private async Task ResolveGrainTypeAsync(GrainReference reference, IInvokable request) + { + _ = GrainTypePrefix.TryGetCrainClassPrefix(reference.GrainId.Type, out var grainClassPrefix); + var timeout = request.GetDefaultResponseTimeout() ?? _runtimeClient.GetResponseTimeout(); + using var cancellation = new CancellationTokenSource(timeout, _timeProvider); + await using var clusterManifestUpdates = _clusterManifestProvider.Updates.WithCancellation(cancellation.Token).GetAsyncEnumerator(); + GrainType grainType; + while (!_interfaceTypeToGrainTypeResolver.TryGetGrainType(reference.InterfaceType, grainClassPrefix, out grainType)) + { + // Wait for the next cluster manifest update. + // The cluster manifest is updated when silos join or leave the cluster. + // Each time the cluster manifest is updated, the grain type resolution cache is invalidated, and will be + // recomputed on the subsequent call to TryGetGrainType. + if (!await clusterManifestUpdates.MoveNextAsync()) + { + // If execution has reached this point, the host is either shutting down or the response timeout has elapsed. + throw new OperationCanceledException(cancellation.Token); + } + } + + // Use the newly resolved grain type to replace the grain reference's runtime with the runtime corresponding to the resolved grain reference. + var grainId = GrainId.Create(grainType, reference.GrainId.Key); + var grain = (GrainReference)_grainFactory.GetGrain(grainId, reference.InterfaceType); + reference.Shared = grain.Shared; + } + } + } + /// /// Creates grain references which do not have any specified grain interface, only a target grain id. /// @@ -316,7 +465,7 @@ public GrainReferenceActivatorProvider( /// public bool TryGet(GrainType grainType, GrainInterfaceType interfaceType, out IGrainReferenceActivator activator) { - if (!_rpcProvider.TryGet(interfaceType, out var proxyType)) + if (grainType.IsStubGrain() || !_rpcProvider.TryGet(interfaceType, out var proxyType)) { activator = default; return false; diff --git a/src/Orleans.Runtime/Hosting/DefaultSiloServices.cs b/src/Orleans.Runtime/Hosting/DefaultSiloServices.cs index b1ff01c077..a5889944df 100644 --- a/src/Orleans.Runtime/Hosting/DefaultSiloServices.cs +++ b/src/Orleans.Runtime/Hosting/DefaultSiloServices.cs @@ -56,6 +56,7 @@ internal static void AddDefaultServices(IServiceCollection services) services.AddOptions(); + services.TryAddSingleton(TimeProvider.System); services.TryAddSingleton(typeof(IOptionFormatter<>), typeof(DefaultOptionsFormatter<>)); services.TryAddSingleton(typeof(IOptionFormatterResolver<>), typeof(DefaultOptionsFormatterResolver<>)); diff --git a/test/Tester/HeterogeneousSilosTests/HeterogeneousTests.cs b/test/Tester/HeterogeneousSilosTests/HeterogeneousTests.cs index c09320a788..118ab124ff 100644 --- a/test/Tester/HeterogeneousSilosTests/HeterogeneousTests.cs +++ b/test/Tester/HeterogeneousSilosTests/HeterogeneousTests.cs @@ -83,8 +83,13 @@ public void GrainExcludedTest() SetupAndDeployCluster(typeof(RandomPlacement), typeof(TestGrain)); // Should fail - var exception = Assert.Throws(() => this.cluster.GrainFactory.GetGrain(0)); - Assert.Contains("Could not find an implementation for interface", exception.Message); + //var exception = Assert.Throws(() => this.cluster.GrainFactory.GetGrain(0)); + //Assert.Contains("Could not find an implementation for interface", exception.Message); + + var grain = this.cluster.GrainFactory.GetGrain(0); + Assert.True(grain.GetGrainId().Type.IsStubGrain()); + + //await grain.GetKey(); // Should not fail this.cluster.GrainFactory.GetGrain(0);