Masstransit Filter with IoC injection/Database

1.7k views Asked by At

I have to validate incoming message before passing it to my consumer. To do it, I need to request some data from the database.

Following the tutorials, I created extension method to apply my specification + filter to the consumer pipe. Something like this:

public static void UseArticlesImportValidation(this IConsumePipeConfigurator configurator){}

public class ArticlesImportValidationSpecification : IPipeSpecification<ConsumeContext<ArticlesImportNotification>>

and the Filter

public class ArticlesImportValidationFilter : IFilter<ConsumeContext<ArticlesImportNotification>>

Everything looks good, but I want to injection some business services in my Filter to reuse some functionality + DAL services. This works completely fine for my Consumer using Autofac extension method builder.RegisterConsumers(Assembly.GetExecutingAssembly());.

Should I use middleware for this at all? Any suggestions?

2

There are 2 answers

5
Alexey Zimarev On BEST ANSWER

You need to have the built container in your specification (it is easy to pass as a parameter when you call AddPipeSpecification and then in the specification:

public void Apply(IPipeBuilder<T> builder)
{
    builder.AddFilter(new ArticlesImportValidationFilter(container.Resolve<IDataAccessSomethingUseful>()));
}

But I would validate in the consumer or, if you want to keep them separate, have one consumer to validate and send the next message to do the actual job.

0
Vospitannikov On

You should use Scoped Filters in this situation

Let's say you have a filter with dependency

public class MyConsumeFilter<T> :
    IFilter<ConsumeContext<T>>
    where T : class
{
    public MyConsumeFilter(IMyDependency dependency) { }
      
    public async Task Send(ConsumeContext<T> context, IPipe<ConsumeContext<T>> next) { }
      
    public void Probe(ProbeContext context) { }
}

you should register this dependency in your DI container

services.AddScoped<IMyDependency, MyDependency>(); //register dependency

And now you are ready to add this filter to the consumer pipe by calling UseConsumeFilter method:

services.AddConsumer<MyConsumer>();
services.AddMassTransit(x =>
        {
            x.UsingRabbitMq((context, cfg) =>
            {
                cfg.ReceiveEndpoint("input-queue", e =>
                {
                    e.UseConsumeFilter(typeof(MyConsumeFilter<>), context); //generic filter

                    e.ConfigureConsumer<MyConsumer>();
                });
            });
        });