Support for MassTransit #853
Comments
Hi @Bru456 I’m not an expert on MassTransit, but I think you will want to look at the It is barebones though, so take a look at the aspnetcore middleware if you want to see how the edit: the |
I have made some filters that are used in production with MassTransit and FB.MultiTenant and it works pretty well: https://gist.github.com/fbjerggaard/12c58f47c20cd4fd44e1ccae320110c8 It is sorta "hacky" in that I don't get Finbuckle to handle the strategy but do it manually, but it works pretty well. It would be nice having it easily accessible in a package, however I am adding other things as well (that I removed from the samples) so not that useful to me. |
Excellent, thank you @AndrewTriesToCode and @fbjerggaard, I'll have a play around with these and see what I can get working. I'm currently interested in only the identifier, as each service should maintain the list of tenants from a central configuration store. I feel a package for this would be very handy to have. I don't have in-depth knowledge of how Finbuckle.MultiTenant works under the hood, but I am more than happy to look into a |
I looked a bit into it, and I don't really see how we can utilize I can't really see where the scope is created in MassTransit and how to hook into it. The closest I got currently is the following filter for the MassTransit middleware:
However, I don't know how to register this since it has 2 generic arguments where one of them ( |
I think it will still require the developer to need to do some configuration. I am seeing two possible ways to achieve this. I am unsure if this is correct. The first way (Could be wrong on so many levels):
The other possibly easier way:
I have already created the ability to determine the tenant via a property in the message by taking the identifier and looking it up in the tenant store. Using @fbjerggaard's pipeline filter looks like a more streamlined approach. Don't suppose you can share a code snippet of how you can access the TenantInfo within a Consumer? |
Right now I am just accessing it via the regular Writing an extension on the I am unsure how to access the context during the strategy portion of FB.MT - During a quick test I did it didn't seem to try any of the strategies whenever a message was processed by MT. I think the easiest solution would be to create a series of filters for setting and getting the It just feels a bit hacky to completely go around the |
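For readers asking the same question, here is a minimal sketch of a consumer reading the current tenant. It assumes Finbuckle.MultiTenant 7.x or later (where `IMultiTenantContextAccessor` lives in `Finbuckle.MultiTenant.Abstractions`) and that a consume filter earlier in the pipeline has already set the tenant context. The `GettingStarted` message type is hypothetical, and this is not necessarily how either participant wired it up.

```csharp
using System.Threading.Tasks;
using Finbuckle.MultiTenant.Abstractions;
using MassTransit;
using Microsoft.Extensions.Logging;

// Hypothetical message contract, used only for this illustration.
public record GettingStarted(string Value);

public class GettingStartedConsumer : IConsumer<GettingStarted>
{
    private readonly IMultiTenantContextAccessor _accessor;
    private readonly ILogger<GettingStartedConsumer> _logger;

    public GettingStartedConsumer(
        IMultiTenantContextAccessor accessor,
        ILogger<GettingStartedConsumer> logger)
    {
        _accessor = accessor;
        _logger = logger;
    }

    public Task Consume(ConsumeContext<GettingStarted> context)
    {
        // TenantInfo is null when no tenant was resolved for this message.
        var tenant = _accessor.MultiTenantContext?.TenantInfo;

        _logger.LogInformation(
            "Handling {Value} for tenant {Tenant}",
            context.Message.Value,
            tenant?.Identifier ?? "(none)");

        return Task.CompletedTask;
    }
}
```

The consumer only reads the context; resolving and setting it stays in the filter, which keeps consumers free of header handling.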
@fbjerggaard thanks, let me see what I can do. I might be completely wrong but I'm doing all the dev work here: https://github.com/Bru456/Finbuckle.MultiTenant branch will be called MassTransit just not pushed it up yet. I plan on building samples to play with too. Will be two Console applications for simplicity. |
@AndrewTriesToCode I am assuming that Finbuckle only supports ASP.NET? I'm not seeing a middleware for console apps. Does anyone here use console apps for their background services, or do you always front them with ASP.NET Web APIs? |
@fbjerggaard There's sadly no easy way to get the tenant via extending the It's always an option though :) |
No it works fine outside of ASP.NET Core. Works best with anything using the generic host which encapsulates DI behavior among other things—and can work just fine in console apps. For example: the Finbuckle configuration will work on the services in that example. When you need to determine a tenant get |
@Bru456 I think the flow should be something like this:
So something like this:
public class TenantConsumeFilter<T>(
    IMultiTenantContextSetter mtcs,
    IEnumerable<IMultiTenantStore<PlatformTenant>> stores,
    ILoggerFactory loggerFactory,
    IOptionsMonitor<MultiTenantOptions> options)
    : IFilter<ConsumeContext<T>>
    where T : class
{
    public void Probe(ProbeContext context)
    {
        context.CreateFilterScope("tenantConsumeFilter");
    }

    public async Task Send(ConsumeContext<T> context, IPipe<ConsumeContext<T>> next)
    {
        var mtc = new MultiTenantContext<PlatformTenant>();
        var identifier = context.Headers.Get<string>("tenantIdentifier");
        if (options.CurrentValue.IgnoredIdentifiers.Contains(identifier, StringComparer.OrdinalIgnoreCase))
        {
            (loggerFactory?.CreateLogger(GetType()) ?? NullLogger.Instance).LogInformation(
                "Ignored identifier: {Identifier}", identifier);
            identifier = null;
        }

        if (identifier is null)
        {
            await next.Send(context);
            return;
        }

        foreach (var store in stores)
        {
            var wrappedStore = new MultiTenantStoreWrapper<PlatformTenant>(store,
                loggerFactory?.CreateLogger(store.GetType()) ?? NullLogger.Instance);
            var tenantInfo = await wrappedStore.TryGetByIdentifierAsync(identifier);
            if (tenantInfo is null)
            {
                continue;
            }

            await options.CurrentValue.Events.OnTenantResolved(new TenantResolvedContext
            {
                Context = context,
                TenantInfo = tenantInfo,
                StrategyType = null,
                StoreType = store.GetType()
            });
            mtc.TenantInfo = tenantInfo;
            mtc.StrategyInfo = null; // TODO: Set when implemented in FB.MT package
            mtc.StoreInfo = null; // TODO: Set when implemented in FB.MT package
            mtcs.MultiTenantContext = mtc;
            await next.Send(context);
            return;
        }

        await options.CurrentValue.Events.OnTenantNotResolved(new TenantNotResolvedContext
            { Context = context, Identifier = identifier });
        await next.Send(context);
    }
}
These filters can then be registered by creating an extension method on the
public static void AddTenantFilters(this IBusFactoryConfigurator configurator, IRegistrationContext context)
{
    configurator.UseConsumeFilter(typeof(TenantConsumeFilter<>), context);
    configurator.UsePublishFilter(typeof(TenantPublishFilter<>), context);
    configurator.UseExecuteActivityFilter(typeof(TenantExecuteFilter<>), context);
    configurator.UseCompensateActivityFilter(typeof(TenantCompensateFilter<>), context);
    configurator.UseSendFilter(typeof(TenantSendFilter<>), context);
}
I only have 1 problem with this right now - It is currently hardcoded to a specific |
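The `TenantPublishFilter<>` registered above is not shown in the thread. As a rough sketch of what the outgoing side could look like (not the author's implementation), the filter below copies the current tenant identifier into the same `tenantIdentifier` header that the consume filter reads. It assumes Finbuckle.MultiTenant 7.x or later for the `Finbuckle.MultiTenant.Abstractions` namespace and that the non-generic `IMultiTenantContextAccessor` is resolvable from the MassTransit scope.

```csharp
using System.Threading.Tasks;
using Finbuckle.MultiTenant.Abstractions;
using MassTransit;

public class TenantPublishFilter<T> : IFilter<PublishContext<T>>
    where T : class
{
    // Same header key the consume filter above reads.
    private const string HeaderKey = "tenantIdentifier";

    private readonly IMultiTenantContextAccessor _accessor;

    public TenantPublishFilter(IMultiTenantContextAccessor accessor)
    {
        _accessor = accessor;
    }

    public void Probe(ProbeContext context)
    {
        context.CreateFilterScope("tenantPublishFilter");
    }

    public Task Send(PublishContext<T> context, IPipe<PublishContext<T>> next)
    {
        // Only stamp the header when a tenant is actually in scope.
        var identifier = _accessor.MultiTenantContext?.TenantInfo?.Identifier;
        if (!string.IsNullOrEmpty(identifier))
        {
            context.Headers.Set(HeaderKey, identifier);
        }

        return next.Send(context);
    }
}
```

A send filter would presumably be identical apart from implementing `IFilter<SendContext<T>>`. Messages published outside any tenant scope pass through without the header.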
@Bru456 I actually think I have something usable now. It became quite convoluted, but it seems to do its job. It basically involves duplicating how MassTransit registers filters and customizing it to allow for a second generic type argument. This ends up in the following snippet:
public static class MassTransitRegistrationExtensions
{
    public static void UseFinbuckleFilters<TTenantInfo>(this IBusFactoryConfigurator configurator,
        IRegistrationContext context)
        where TTenantInfo : class, ITenantInfo, new()
    {
        configurator.UseFinbuckleMultiTenantConsumeFilter<TTenantInfo>(context);
        configurator.UsePublishFilter(typeof(TenantPublishFilter<>), context);
        configurator.UseSendFilter(typeof(TenantSendFilter<>), context);
    }

    private static void UseFinbuckleMultiTenantConsumeFilter<TTenantInfo>(this IConsumePipeConfigurator configurator,
        IRegistrationContext context)
        where TTenantInfo : class, ITenantInfo, new()
    {
        var filterType = typeof(TenantConsumeFilter<,>);
        var messageTypeFilterConfigurator = new MessageTypeFilterConfigurator();
        var observer =
            new CustomScopedConsumePipeSpecificationObserver<TTenantInfo>(filterType, context,
                messageTypeFilterConfigurator.Filter);
        configurator.ConnectConsumerConfigurationObserver(observer);
        configurator.ConnectSagaConfigurationObserver(observer);
    }
}
I still need to create the PipeSpecificationObservers for Execute and Compensate. Which is then configured like this:
mt.UsingRabbitMq((ctx, cfg) =>
{
    [...]
    cfg.UseFinbuckleFilters<PlatformTenant>(ctx);
    [...]
});
The I still need to clean it up a bit - which will probably happen tomorrow. Then I can make a gist with all the required files so you/others can test it out and see if it works as intended. It still skips all of the Finbuckle strategy stuff, but I just had a thought that it shouldn't be that hard to make a strategy that works for this - I just don't think it's needed since it is quite implicit the way it's configured. |
I wasn't even looking at the MassTransit side. I've been attempting to do strategies. I have this:
public class MassTransitHeaderStrategy : IMultiTenantStrategy
{
    private readonly string _headerKey;

    public MassTransitHeaderStrategy(string headerKey)
    {
        _headerKey = headerKey;
    }

    public Task<string?> GetIdentifierAsync(object context)
    {
        string? header = null;
        if (!(context is ConsumeContext consumeContext))
            throw new MultiTenantException(null,
                new ArgumentException($"\"{nameof(context)}\" type must be of type ConsumeContext", nameof(context)));
        if (consumeContext.Headers.TryGetHeader(_headerKey, out var tenantId))
        {
            header = tenantId as string;
        }
        return Task.FromResult(header);
    }
}
I have been trying to figure out how to whack it into MassTransit and use the inbuilt IResolver based on the strategy resolution so everything just falls into place. I have been struggling to get the tenant set up within a console app; even using the static strategy it still isn't picking it up.... The current Program looks like this:
public class Program
{
    public static async Task Main(string[] args)
    {
        await CreateHostBuilder(args).Build().RunAsync();
        //Console.WriteLine("Hello, World!");
        Console.ReadLine();
    }

    public static IHostBuilder CreateHostBuilder(string[] args)
        => Host.CreateDefaultBuilder(args)
            .ConfigureServices((hostContext, services) =>
            {
                services.AddMultiTenant<TenantInfo>()
                    .WithConfigurationStore()
                    //.WithInMemoryStore()
                    .WithStaticStrategy("tenant1")
                    .WithMassTransitHeaderStrategy("tenantIdentifier");
                services.AddMassTransit(x =>
                {
                    x.AddConsumer<GettingStartedConsumer>();
                    x.UsingInMemory((context, cfg) =>
                    {
                        //cfg.UseConsumeFilter(typeof(TenantConsumeFilter<>), context);
                        cfg.UsePublishFilter(typeof(TenantPublishFilter<>), context);
                        cfg.UseSendFilter(typeof(TenantSendFilter<>), context);
                        cfg.ConfigureEndpoints(context);
                    });
                });
                services.AddHostedService<Worker>();
            });
}
Think I'll move to a web API as I know how to easily get that working. |
Okay, I am playing with the following strategy:
namespace Finbuckle.MultiTenant.MassTransit.Strategies
{
    public class MassTransitHeaderStrategy : IMultiTenantStrategy
    {
        private readonly string _headerKey;

        public MassTransitHeaderStrategy(string headerKey)
        {
            _headerKey = headerKey;
        }

        public Task<string?> GetIdentifierAsync(object context)
        {
            string? header = null;
            if (!(context is ConsumeContext consumeContext))
                throw new MultiTenantException(null,
                    new ArgumentException($"\"{nameof(context)}\" type must be of type ConsumeContext", nameof(context)));
            if (consumeContext.Headers.TryGetHeader(_headerKey, out var tenantId))
            {
                header = tenantId as string;
            }
            return Task.FromResult(header);
        }
    }
}
However I have noticed that the case in point BasePath:
namespace Finbuckle.MultiTenant.AspNetCore.Strategies;
public class BasePathStrategy : IMultiTenantStrategy
{
    public Task<string?> GetIdentifierAsync(object context)
    {
        if (!(context is HttpContext httpContext))
            throw new MultiTenantException(null,
                new ArgumentException($"\"{nameof(context)}\" type must be of type HttpContext", nameof(context)));
        // removed for brevity
    }
}
@AndrewTriesToCode I can see the following way to address this: Attempt to gracefully handle errors within
public async Task<IMultiTenantContext<TTenantInfo>> ResolveAsync(object context)
{
    var mtc = new MultiTenantContext<TTenantInfo>();
    string? identifier = null;
    foreach (var strategy in Strategies)
    {
        // shortened for readability
        try
        {
            identifier = await wrappedStrategy.GetIdentifierAsync(context);
        }
        catch
        {
            continue;
        }
        // shortened for readability
    }
}
However, this will no longer error out if it's the wrong context type. I can still get it to error in the event they all error out. Thoughts??? This means the entire consumer filter becomes:
namespace Finbuckle.MultiTenant.MassTransit.MassTransitFilters
{
    public class TenantConsumeFilter<T>(
        ITenantResolver tenantResolver,
        IMultiTenantContextSetter mtcSetter)
        : IFilter<ConsumeContext<T>>
        where T : class
    {
        public void Probe(ProbeContext context)
        {
            context.CreateFilterScope("tenantConsumeFilter");
        }

        public async Task Send(ConsumeContext<T> context, IPipe<ConsumeContext<T>> next)
        {
            IMultiTenantContext? multiTenantContext = await tenantResolver.ResolveAsync(context);
            mtcSetter.MultiTenantContext = multiTenantContext;
            await next.Send(context);
        }
    }
}
This would allow users to set the header with: |
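The "still error if they all error out" idea from this comment can be sketched without any library types. In the snippet below each delegate stands in for one strategy's `GetIdentifierAsync`; the `TolerantResolver` name and the choice of `AggregateException` are assumptions made for illustration, not Finbuckle's behaviour.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class TolerantResolver
{
    // Tries each strategy in order. A throwing strategy is skipped, but if
    // every strategy throws, the collected failures are surfaced together.
    public static async Task<string?> ResolveIdentifierAsync(
        IReadOnlyList<Func<object, Task<string?>>> strategies,
        object context)
    {
        var failures = new List<Exception>();

        foreach (var strategy in strategies)
        {
            try
            {
                var identifier = await strategy(context);
                if (identifier is not null)
                {
                    return identifier; // first strategy with an answer wins
                }
            }
            catch (Exception ex)
            {
                failures.Add(ex); // remember the failure and try the next one
            }
        }

        if (strategies.Count > 0 && failures.Count == strategies.Count)
        {
            throw new AggregateException("No strategy accepted the context.", failures);
        }

        return null; // at least one strategy ran cleanly but found no tenant
    }
}
```

With one throwing strategy and one that returns `"tenant1"`, the call yields `"tenant1"`; with only throwing strategies it raises an `AggregateException` that carries each inner failure.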
Maybe throwing an exception is not the right way to handle the wrong context type? Otherwise I really like your idea @Bru456 - Much simpler than what I cooked up. It should also be possible to write an extension method on the In regards to the header key, would it be a crazy idea to either:
Personally I think the latter is the cleanest of those two. |
I was thinking that too about rewriting the strategies; it feels better and less hacky. I doubt anyone does anything with them outside of Finbuckle. I'm just not sure if it would have any unintended consequences. But hey, that's what tests are for ;) Why thank you @fbjerggaard, my goal was to keep it relying on already-built functionality rather than trying to do anything "new" or duplicate it. So I was trying to replicate the ASP.NET middleware in MassTransit. Spent a lot of time trying to figure out how to get it via a Funnily enough, I was thinking about the next task, to see if it's possible to inject the strategy. I might need to add a method to return the header, but it keeps it clean :) |
It made sense when Finbuckle only supported Another approach could be to define a (list?) of supported context types for each strategy and then filtering the strategies based on what the current context is before running them - That would maybe make for slightly more performant code since fewer checks for the same needs to happen, and then we could keep the throwing - although it would be redundant. -- The only reason why I went the route I did was because it didn't seem like any strategies were actually tried when processing messages through MassTransit. But maybe something in my test was flawed, or I never really tried it :) |
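The "supported context types" alternative described above could look roughly like this. Every type here is a hypothetical stand-in written for the illustration (none exist in Finbuckle.MultiTenant or MassTransit): each strategy advertises the context types it can read, and the resolver only runs strategies that match the runtime type of the context.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical contexts standing in for HttpContext and a MassTransit context.
public sealed class FakeHttpContext { public string Path { get; set; } = "/"; }
public sealed class FakeMessageContext
{
    public Dictionary<string, object> Headers { get; } = new();
}

// Hypothetical interface: a strategy that declares what it can handle.
public interface IContextAwareStrategy
{
    IReadOnlyCollection<Type> SupportedContextTypes { get; }
    Task<string?> GetIdentifierAsync(object context);
}

public sealed class FakeBasePathStrategy : IContextAwareStrategy
{
    public IReadOnlyCollection<Type> SupportedContextTypes { get; } = new[] { typeof(FakeHttpContext) };

    public Task<string?> GetIdentifierAsync(object context)
    {
        // First path segment, e.g. "/tenant2/orders" gives "tenant2".
        var segment = ((FakeHttpContext)context).Path
            .Split('/', StringSplitOptions.RemoveEmptyEntries)
            .FirstOrDefault();
        return Task.FromResult<string?>(segment);
    }
}

public sealed class FakeHeaderStrategy : IContextAwareStrategy
{
    private readonly string _headerKey;
    public FakeHeaderStrategy(string headerKey) => _headerKey = headerKey;

    public IReadOnlyCollection<Type> SupportedContextTypes { get; } = new[] { typeof(FakeMessageContext) };

    public Task<string?> GetIdentifierAsync(object context)
    {
        var headers = ((FakeMessageContext)context).Headers;
        var found = headers.TryGetValue(_headerKey, out var value) ? value as string : null;
        return Task.FromResult<string?>(found);
    }
}

public static class StrategyFilter
{
    public static async Task<string?> ResolveIdentifierAsync(
        IEnumerable<IContextAwareStrategy> strategies, object context)
    {
        var contextType = context.GetType();

        // Skip strategies that do not declare support for this context type,
        // so the casts inside each strategy are always safe.
        var applicable = strategies.Where(s =>
            s.SupportedContextTypes.Any(t => t.IsAssignableFrom(contextType)));

        foreach (var strategy in applicable)
        {
            var identifier = await strategy.GetIdentifierAsync(context);
            if (identifier is not null)
            {
                return identifier;
            }
        }

        return null;
    }
}
```

Because unsupported strategies never run, each strategy can keep throwing on a wrong context type as a guard without that exception ever surfacing in normal use.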
Also - @AndrewTriesToCode - Is this something you would want to have in the main repo here, or should the "interop" package rather live in its own repository? It should definitely be its own project no matter what to avoid pulling in the MassTransit dependency for all users of this package, but whether it should be in this repository or its own is entirely up to you |
Could replace the But I don't really wanna modify the main one in any meaningful way without @AndrewTriesToCode's say-so. I'm close to pushing what I have:
Still to do:
I should be committing my stuff tonight, as it's in a somewhat usable state, to here: https://github.com/Bru456/Finbuckle.MultiTenant |
Successful test complete:
|
Pushed into here: https://github.com/Bru456/Finbuckle.MultiTenant/tree/MassTransit Please check it out.
|
Still to be done: To be Complete:
|
Looking good so far! Some quick notes:
|
Thanks @fbjerggaard
Thank you, should make it easy.
Possibly. Is there a use case to have this blanket on? I would need to test to ensure that a blanket on won't have an impact if there is no tenant. Right now it's configurable down to individual topics/queues. There might be a slight impact on performance with a blanket on.
I think, this is just a remnant of me stumbling around trying to figure it out :D
Any ideas on how to achieve this? |
For example, in my use case I am not doing anything that is not related to a tenant, so I need the tenantIdentifier set on absolutely everything that goes through MassTransit.
Not really, sorry. The only thing I could come up with is checking in the filters, but that seems like a heavy operation to do on each message. |
@fbjerggaard Do you actively use the Execute and Compensate contexts? https://masstransit.io/documentation/concepts/messages#message-headers this says:
|
Looking at I think we can safely change the thrown error to simply return
Thus, if no configured strategy finds the tenant, it simply carries on as normal. More importantly, this requires no change to the |
@AndrewTriesToCode I can see from the git history that the strategies have always been set to throw an error. Would this have any unintended issues if it was changed to return |
Yes - when using Routing slips the context differs and is then either a Execute or Compensate context. So actually, the strategy needs to be able to handle multiple different context types now that I think about it. Unless they can be mapped to a |
@fbjerggaard 5 steps ahead of you ;)
public Task<string?> GetIdentifierAsync(object context)
{
    string? header = null;
    if (!(context is ConsumeContext || context is CompensateContext || context is ExecuteContext))
        return Task.FromResult<string?>(null);
    if (context is MessageContext messageContext)
    {
        if (messageContext.Headers.TryGetHeader(_config.TenantIdentifierHeaderKey, out var tenantId))
        {
            header = tenantId as string;
        }
    }
    //if (consumeContext.Headers.TryGetHeader(_config.TenantIdentifierHeaderKey, out var tenantId))
    //    {
    //        header = tenantId as string;
    //    }
    return Task.FromResult(header);
}
|
@fbjerggaard any idea how to unit test the filters? Only thing I can think of is to:
Don't get me wrong, this would defo ensure it works :D Just wondering if you have done any testing of your filters in isolation? |
I haven't done any unit testing on my filters in isolation (Or at all, actually.. 😄 ) But it seems like a reasonable approach to do it, and would probably be the way I would do it too. |
@Bru456 The strategies in the Also I'm thrilled to see the engagement on this issue with you both--I should learn MassTransit! Regarding a pull request -- I want to keep the official repo as universal as possible. Something more specialized like this would be better in its own repo. If you wanted to create one I'd be happy to link to it from the readme and the website. |
Thanks @AndrewTriesToCode, it's currently built to be slotted next to your stuff. I.e. I some of the internals of
public Task<string?> GetIdentifierAsync(object context)
{
    if (!(context is HttpContext httpContext))
        return Task.FromResult<string?>(null);
    //throw new MultiTenantException(null,
    //    new ArgumentException($"\"{nameof(context)}\" type must be of type HttpContext", nameof(context)));
    return Task.FromResult(httpContext?.Request.Headers[_headerKey].FirstOrDefault());
}
So far all this has done is break the unit tests that expect an error instead of null. Everything else is working fine, as far as I can tell. Currently fighting with unit tests trying to test the filters 😄 All work is being done here: https://github.com/Bru456/Finbuckle.MultiTenant/tree/MassTransit. Once I get the unit tests working I'll do another push. |
Pushed latest with unit tests: https://github.com/Bru456/Finbuckle.MultiTenant/tree/MassTransit. @fbjerggaard can you provide examples of Execute and Compensate please? I have not used them, so I have no idea what they look like. Unit tests for Publish and Send (Consume is baked into them) are working and passing. I have also changed all the exceptions for not being I have also modified (commented out and recreated) those unit tests to check for nulls. All tests passing. I have not added .NET 6 or .NET 7 support. It should only be package details, so nothing too major. Inside of I then configure the test harness to use a consumer (created in a class just below in the same file), use the in-memory bus, and configure all of the filters on the bus. I then use this in the tests to create the test environment and manually set the I have not created the auto injector thing that @fbjerggaard wants, as, well, I am kinda scared to learn the internals of MassTransit.... But feel free to give it a bash :D |
@fbjerggaard added the Tested it, works fine.
// This is a single add command that can be used to apply all Finbuckle.MultiTenant filters to the MassTransit pipeline.
// Using in memory for simplicity. Please replace with your preferred transport method.
x.UsingInMemory((IBusRegistrationContext context, IInMemoryBusFactoryConfigurator cfg) =>
{
    // Required if wanting to have a MassTransit Consumer and maintain tenant context.
    // To use this filter, .WithMassTransitHeaderStrategy() must be called in the MultiTenantBuilder.
    cfg.AddTenantFilters(context);
    cfg.ConfigureEndpoints(context);
});
Unit tests added to test both individual filter commands and adding all via Just have docs left to go, and the Execute and Compensate.
@AndrewTriesToCode Regarding:
I would like to keep it under Finbuckle if at all possible (mostly for the name and so people know it's associated). I have also seen a fair bit of chatter over in MassTransit about getting this to work as well. Some figured it out, some implemented it in their solutions. I feel it is a good thing to keep them reasonably close. |
Yep, i'll cook something up on monday when i'm back in the office with easier access to the codebase using it :) |
@fbjerggaard don't suppose you had a chance? |
Also, does anyone think there is scope for a I think yes, but from looking at it, it seems possible only with a It might be easier and more foolproof to just write docs explaining how to find the tenant from the store based on the identifier, as I can't modify the body in the filters. |
Sorry, got hung up on actual work yesterday. Tried forking your repository but couldn't because I already have a fork, so here's a link to a gist with the unit test for execute/compensate: https://gist.github.com/fbjerggaard/3507af0fb09e8fe4aea40f87b78198e5 |
@fbjerggaard thank you! Added :) Just have user docs to create now. @AndrewTriesToCode have you had a chance to see the below?
|
Okay I have encountered an issue after attempting to integrate into my own project.... I have a custom tenantinfo lets call it I get the following error back from the Bus and it is in faulted state. Message = "Unable to resolve service for type 'Finbuckle.MultiTenant.Abstractions.IMultiTenantContext' while attempting to activate 'Finbuckle.MultiTenant.MassTransit.MassTransitFilters.TenantPublishFilter`1[SharedModels.Contracts.IGetDataRepoIntegration]'." This is because I am using If I use This is because I am using in the filter with DI Any ideas around this guys? I have looked at adding |
@Bru456 What if you just access the plain |
................. I both hate you and love you right now :D unit tests are passing, and the message is being sent. Swear I tried that.... then again it was nearly midnight and I was at it from 9 am.... |
pushed it up. At a reasonably good state. just need to figure out where we are putting it before I do the docs. |
Thanks again gentlemen. I think I will make another repo under the org for Finbuckle.MultiTenant.MassTransit My only request would be that you provide enough detail in the readme that it be reasonably usable by others and provide a license and disclaimer that no support is provided. Also I don’t plan on publishing a nuget for it for now. Does that work for you? |
@AndrewTriesToCode sure, will need to make some changes so it sits outside of it. Main changes that will need to be made:
I'll host the docs within the repo and readme. If we decide to take it further and add it to NuGet, we can put the docs on the Finbuckle website :) |
Added the readme: Once the repo is created I'll fork it and move it over there :) |
I would love to use Finbuckle with MassTransit https://masstransit.io/.
The use case would be to offload tenant data processing to other microservices. To achieve this, the tenant context would need to be passed and retrieved in background jobs and services through a message broker, not via standard HTTP requests.
The following would be classed as acceptance criteria:
The way I am thinking this might be possible to achieve is through:
Something similar to the below might suffice for the middleware.
However, I am unsure of the best way to implement a new strategy within Finbuckle, as it's not a web request and there is no authentication from an end user.
I plan on attempting to figure this out in the next few months.