I managed to solve the MassTransit dependency problem in a fairly simple way without overly complex generics or runtime emits.
In the example below we are using the following NuGet packages: MassTransit, MassTransit.RabbitMQ, and MassTransit.Ninject.
Example
Our Domain's Simple Consumer (Assembly: OurDomain.Core)
Our Core assembly has no reference to or dependency on MassTransit. Furthermore, any other assembly that wants to implement a consumer does not need a reference to MassTransit either.
/// <summary>
/// Contract for a domain-level message consumer. It is declared without any
/// MassTransit types, so implementers only depend on the domain's own message base type.
/// </summary>
/// <typeparam name="T">The message type handled by the consumer; must derive from MessageBase.</typeparam>
public interface IConsume<T>
    where T : MessageBase
{
    /// <summary>
    /// Retry threshold for this consumer. The MassTransitConsumer wrapper compares the
    /// context's retry count against this value to decide between retrying and rethrowing.
    /// </summary>
    int RetryAllotment { get; }

    /// <summary>Handles a single message.</summary>
    /// <param name="message">The message to process.</param>
    void Consume(T message);
}
// Example domain consumer: handles ExampleMessage through the domain's IConsume<T>
// contract and therefore needs no reference to MassTransit.
public class ExampleConsumer : IConsume<ExampleMessage>
{
// Retry threshold reported by this consumer: always 3.
public int RetryAllotment { get { return 3; } }
// Message-handling logic; the body is elided in this example.
public void Consume(ExampleMessage message){...}
}
Internal MassTransitConsumer (Assembly: OurDomain.Messaging)
The Messaging assembly is the only assembly with a reference to MassTransit. It also has a reference to the Core assembly.
The purpose of this class is simply to provide a wrapper over our domain's consumers that can be used to hook up a MassTransit subscription.
/// <summary>
/// Adapter that exposes a domain consumer as a MassTransit context consumer.
/// It forwards each incoming message to the wrapped domain consumer and applies
/// that consumer's retry allotment when handling fails.
/// </summary>
/// <typeparam name="TMessage">The message type being consumed.</typeparam>
/// <typeparam name="TConsumer">The domain consumer type that handles <typeparamref name="TMessage"/>.</typeparam>
internal class MassTransitConsumer<TMessage, TConsumer> : Consumes<TMessage>.Context
    where TMessage : MessageBase
    where TConsumer : IConsume<TMessage>
{
    private readonly TConsumer _domainConsumer;

    /// <summary>Wraps the given domain consumer.</summary>
    /// <param name="consumer">The domain consumer that receives the messages.</param>
    public MassTransitConsumer(TConsumer consumer)
    {
        _domainConsumer = consumer;
    }

    /// <summary>
    /// Passes the context's message to the domain consumer. If the consumer throws,
    /// the message is scheduled for a retry while the context's retry count is below
    /// the consumer's allotment; otherwise the original exception is rethrown.
    /// </summary>
    /// <param name="ctx">The MassTransit consume context carrying the message.</param>
    public void Consume(IConsumeContext<TMessage> ctx)
    {
        try
        {
            _domainConsumer.Consume(ctx.Message);
        }
        catch
        {
            var retriesRemain = ctx.RetryCount < _domainConsumer.RetryAllotment;
            if (!retriesRemain)
            {
                // Allotment used up: rethrow, preserving the original stack trace.
                throw;
            }

            ctx.RetryLater();
        }
    }
}
Configurator (Assembly: OurDomain.Messaging)
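Pay special attention to the GetMassTransitConsumer method: it uses reflection to close the generic MassTransitConsumer<,> wrapper over each domain consumer's message type and consumer type.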
/// <summary>
/// Builds a MassTransit service bus over RabbitMQ, subscribes the domain's consumers
/// (wrapped in <c>MassTransitConsumer&lt;,&gt;</c>) when configured for the Worker queue,
/// and binds the resulting bus in the Ninject kernel as <c>OurDomain.IServiceBus</c>.
/// </summary>
public class MassTransitConfigurator
{
    private readonly string _rabbitMqConnection;
    private readonly Queue _queue;
    private readonly IKernel _kernel;
    private readonly IEnumerable<Type> _consumers;
    private readonly IEnumerable<Type> _massTransitConsumers;

    /// <summary>Queues this configurator can receive from; the name is appended to the connection URI.</summary>
    public enum Queue
    {
        Worker,
        WorkerProducer,
    }

    /// <summary>
    /// Captures the configuration and, for the Worker queue, resolves the MassTransit
    /// wrapper type of every domain consumer up front.
    /// </summary>
    /// <param name="rabbitMqConnection">Base RabbitMQ URI; "/" and the queue name are appended to it.</param>
    /// <param name="queue">The queue to receive from. Consumers are only subscribed for <see cref="Queue.Worker"/>.</param>
    /// <param name="kernel">Ninject kernel used to bind and resolve consumers and to publish the bus.</param>
    /// <param name="consumers">Domain consumer types; each must implement <c>IConsume&lt;&gt;</c>. Only read for the Worker queue.</param>
    /// <exception cref="ArgumentNullException">The queue is Worker and <paramref name="consumers"/> (or one of its entries) is null.</exception>
    /// <exception cref="ArgumentException">The queue is Worker and a consumer type does not implement <c>IConsume&lt;&gt;</c>.</exception>
    public MassTransitConfigurator(string rabbitMqConnection, Queue queue, IKernel kernel,
        IEnumerable<Type> consumers)
    {
        _rabbitMqConnection = rabbitMqConnection;
        _queue = queue;
        _kernel = kernel;

        if (_queue == Queue.Worker)
        {
            if (consumers == null)
                throw new ArgumentNullException("consumers");

            // Snapshot both sequences once. A bare Select is lazy: validation would not run
            // until Configure(), and the reflection work would repeat on every enumeration.
            var domainConsumers = consumers.ToArray();
            _consumers = domainConsumers;
            _massTransitConsumers = domainConsumers.Select(GetMassTransitConsumer).ToArray();
        }
        else
        {
            // Consumers are never read for non-Worker queues; keep the reference as given.
            _consumers = consumers;
            _massTransitConsumers = new Type[0];
        }
    }

    /// <summary>
    /// Maps a domain consumer type to the closed <c>MassTransitConsumer&lt;TMessage, TConsumer&gt;</c>
    /// type that wraps it.
    /// </summary>
    /// <param name="domainConsumer">A type implementing <c>IConsume&lt;TMessage&gt;</c>.</param>
    /// <returns>The closed wrapper type for the consumer's message type and the consumer itself.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="domainConsumer"/> is null.</exception>
    /// <exception cref="ArgumentException">The type does not implement <c>IConsume&lt;&gt;</c>.</exception>
    private static Type GetMassTransitConsumer(Type domainConsumer)
    {
        if (domainConsumer == null)
            throw new ArgumentNullException("domainConsumer");

        // NOTE(review): the lookup is by interface name, so a type implementing IConsume<> for
        // several message types is expected to fail here with an ambiguous match - confirm
        // one-message-per-consumer is the intended contract.
        var interfaceType = domainConsumer.GetInterface(typeof (IConsume<>).Name);
        if (interfaceType == null)
            throw new ArgumentException("All consumers must implement IConsume<>.", "domainConsumer");

        var messageType = interfaceType.GetGenericArguments().First();
        return typeof (MassTransitConsumer<,>).MakeGenericType(messageType, domainConsumer);
    }

    /// <summary>
    /// For the Worker queue, binds every domain consumer and its wrapper to itself in the kernel;
    /// then creates the MassTransit bus and binds it as <c>OurDomain.IServiceBus</c>.
    /// </summary>
    public void Configure()
    {
        if (_queue == Queue.Worker)
        {
            foreach (var consumer in _consumers)
                _kernel.Bind(consumer).ToSelf();
            foreach (var massTransitConsumer in _massTransitConsumers)
                _kernel.Bind(massTransitConsumer).ToSelf();
        }

        var massTransitServiceBus = ServiceBusFactory.New(ConfigureMassTransit);
        var ourServiceBus = new MassTransitServiceBus(massTransitServiceBus);
        _kernel.Bind<OurDomain.IServiceBus>().ToConstant(ourServiceBus);
    }

    /// <summary>
    /// Bus configuration callback: selects RabbitMQ, sets the receive address to
    /// "&lt;connection&gt;/&lt;queue&gt;" and, for the Worker queue, subscribes each wrapper type
    /// with the kernel as its factory.
    /// </summary>
    /// <param name="config">The MassTransit configurator supplied by <c>ServiceBusFactory.New</c>.</param>
    private void ConfigureMassTransit(ServiceBusConfigurator config)
    {
        config.UseRabbitMq();
        var queueConnection = _rabbitMqConnection + "/" + _queue;
        config.ReceiveFrom(queueConnection);

        if (_queue == Queue.Worker)
        {
            foreach (var massTransitConsumer in _massTransitConsumers)
            {
                // Per-iteration copy so the lambda below captures this iteration's type.
                var consumerType = massTransitConsumer;
                config.Subscribe(s => s.Consumer(consumerType, t => _kernel.Get(t)));
            }
        }
    }
}
Example Program for Subscribing (Assembly: OurDomain.Services.Worker)
The Worker assembly has a reference to the Core assembly and to whatever assemblies our IConsume implementations happen to be defined in.
However, it DOES NOT have a reference to MassTransit. Instead, it references our Messaging assembly, which internally hooks up MassTransit.
/// <summary>
/// Example worker host: builds a Ninject kernel and uses MassTransitConfigurator
/// to subscribe ExampleConsumer on the Worker queue.
/// </summary>
internal class Program
{
    /// <summary>Entry point; the command-line arguments are not used.</summary>
    private static void Main(string[] args)
    {
        var container = new StandardKernel();
        var consumerTypes = new[] { typeof(ExampleConsumer) };

        // Construct the configurator and run it in a single expression.
        new MassTransitConfigurator(
            "rabbitmq://localhost",
            MassTransitConfigurator.Queue.Worker,
            container,
            consumerTypes).Configure();
    }
}