#StackBounty: #c# #design-patterns Anemic class that binds functionality

Bounty: 100

I have cleanly separated the functionality of a message bus – namely the diagnostics (optional), sending, decoding and the bus driver. I now need to bind these together, and have done so with a MessageBus class.

It feels anemic though, with a lot of pass-through methods.

On the flip side, the class is necessary for coordinating StartAsync(), which links the driver and the receiver (binding them so they can call back). I saw no other way of achieving this: if I registered all the individual interfaces via an inversion of control container, the components would get out of sync (how would the driver be bound to the decoder, for instance?).

If this is the wrong place for comments on code please let me know and I will remove straight away. Any pointers much appreciated.

/// <summary>
/// Facade that wires a message bus driver to a publisher, a receiver and
/// (optionally) a diagnostics component, and guards the pass-through
/// operations so that they can only be used while the bus is running.
/// </summary>
public class MessageBus : IDisposable
{
    private readonly MessageBusReceiver receiver;
    private readonly MessageBusPublisher publisher;
    private readonly IMessageBusDriver driver;

    // Only created when the bus is constructed with isDebug = true; otherwise null.
    private readonly MessageBusDiagnostics diagnostics;

    // 0 = stopped, 1 = running. Kept as ints so they can be flipped atomically with Interlocked.
    private int isRunning;
    private int isDisposed;

    /// <summary>
    /// Creates the bus and its publisher/receiver around the supplied driver.
    /// </summary>
    /// <param name="driver">Driver used for subscribing, publishing, starting and stopping.</param>
    /// <param name="serializerRegistry">Passed through to the publisher and the receiver.</param>
    /// <param name="typeIdentifier">Passed through to the publisher and the receiver.</param>
    /// <param name="loggerFactory">Used to create loggers for the receiver and, if enabled, diagnostics.</param>
    /// <param name="isDebug">When <c>true</c>, a <see cref="MessageBusDiagnostics"/> is attached to the driver.</param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="driver"/> or <paramref name="loggerFactory"/> is <c>null</c>.
    /// </exception>
    public MessageBus(IMessageBusDriver driver, SerializerRegistry serializerRegistry, ITypeIdentifier typeIdentifier, ILoggerFactory loggerFactory, bool isDebug = false)
    {
        // Both are dereferenced by this class, so fail fast with a descriptive exception
        // instead of a NullReferenceException at some later call.
        if (driver == null)
        {
            throw new ArgumentNullException(nameof(driver));
        }

        if (loggerFactory == null)
        {
            throw new ArgumentNullException(nameof(loggerFactory));
        }

        this.driver = driver;
        this.publisher = new MessageBusPublisher(serializerRegistry, driver, typeIdentifier);
        this.receiver = new MessageBusReceiver(serializerRegistry, typeIdentifier, loggerFactory.CreateLogger<MessageBusReceiver>());

        if (isDebug)
        {
            this.diagnostics = new MessageBusDiagnostics(driver, loggerFactory.CreateLogger<MessageBusDiagnostics>());
        }
    }

    /// <summary>
    /// Throws if the bus has not been started (or has been stopped again).
    /// </summary>
    /// <exception cref="InvalidOperationException">The bus is not running.</exception>
    private void AssertIsRunning()
    {
        // The flag is written with Interlocked from other threads, so read it with matching semantics.
        if (Volatile.Read(ref this.isRunning) == 0)
        {
            throw new InvalidOperationException($"{nameof(StartAsync)} must be called before calling other methods.");
        }
    }

    /// <summary>
    /// Subscribes to <paramref name="topic"/> on the driver.
    /// </summary>
    /// <param name="topic">Topic to subscribe to.</param>
    /// <param name="qos">Quality of service requested for the subscription.</param>
    /// <param name="cancellationToken">Token forwarded to the driver.</param>
    /// <exception cref="InvalidOperationException">The bus is not running.</exception>
    public async Task SubscribeAsync(string topic, QualityOfService qos, CancellationToken cancellationToken = default)
    {
        AssertIsRunning();

        // NOTE(review): the three null arguments are optional driver parameters this facade does not expose.
        await this.driver.SubscribeAsync(topic, qos, null, null, null, cancellationToken);
    }

    /// <summary>
    /// Removes the subscription for <paramref name="topic"/> on the driver.
    /// </summary>
    /// <param name="topic">Topic to unsubscribe from.</param>
    /// <param name="cancellationToken">Token forwarded to the driver.</param>
    /// <exception cref="InvalidOperationException">The bus is not running.</exception>
    public async Task UnsubscribeAsync(string topic, CancellationToken cancellationToken = default)
    {
        AssertIsRunning();
        await this.driver.UnsubscribeAsync(topic, cancellationToken);
    }

    /// <summary>
    /// Binds the receiver to the driver and starts the driver.
    /// </summary>
    /// <param name="receiveApplicationMessage">
    /// Callback assigned to the receiver's <c>OnMessageDecoded</c>.
    /// </param>
    /// <exception cref="InvalidOperationException">The bus is already running.</exception>
    public async Task StartAsync(Func<object, Task> receiveApplicationMessage)
    {
        if (Interlocked.CompareExchange(ref this.isRunning, 1, 0) == 1)
        {
            throw new InvalidOperationException("Already running.");
        }

        try
        {
            this.receiver.OnMessageDecoded = receiveApplicationMessage;
            await this.driver.StartAsync(this.receiver.ReceiveMessageFromMessageBusDriver);
        }
        catch
        {
            // The driver never started: roll the flag back so the bus does not report
            // itself as running and so the caller is able to retry StartAsync.
            Interlocked.Exchange(ref this.isRunning, 0);
            throw;
        }
    }

    /// <summary>
    /// Marks the bus as stopped and stops the driver.
    /// </summary>
    /// <exception cref="InvalidOperationException">The bus is not running.</exception>
    public async Task StopAsync()
    {
        if (Interlocked.CompareExchange(ref this.isRunning, 0, 1) == 0)
        {
            throw new InvalidOperationException("Not running.");
        }

        await this.driver.StopAsync();
    }

    /// <summary>
    /// Publishes <paramref name="notification"/> to an explicit bus path.
    /// </summary>
    /// <param name="notification">Message object handed to the publisher.</param>
    /// <param name="busPath">Path the message is published to.</param>
    /// <param name="qos">Quality of service for the publish.</param>
    /// <param name="cancellationToken">Token forwarded to the publisher.</param>
    /// <exception cref="InvalidOperationException">The bus is not running.</exception>
    public async Task PublishAsync(object notification, string busPath, QualityOfService qos = QualityOfService.AtMostOnce, CancellationToken cancellationToken = default)
    {
        AssertIsRunning();
        await this.publisher.PublishAsync(notification, busPath, qos, cancellationToken);
    }

    /// <summary>
    /// Publishes <paramref name="notification"/>, letting the publisher work out the path
    /// from the optional <paramref name="pathTokens"/>.
    /// </summary>
    /// <param name="notification">Message object handed to the publisher.</param>
    /// <param name="pathTokens">Optional tokens forwarded to the publisher; may be <c>null</c>.</param>
    /// <param name="qos">Quality of service for the publish.</param>
    /// <param name="cancellationToken">Token forwarded to the publisher.</param>
    /// <exception cref="InvalidOperationException">The bus is not running.</exception>
    public async Task PublishAsync(object notification, Dictionary<string, string[]> pathTokens = null, QualityOfService qos = QualityOfService.AtMostOnce, CancellationToken cancellationToken = default)
    {
        AssertIsRunning();
        await this.publisher.PublishAsync(notification, pathTokens, qos, cancellationToken);
    }

    /// <summary>
    /// Disposes the diagnostics component (if any) and the driver. Safe to call more than once.
    /// </summary>
    public void Dispose()
    {
        // Dispose should be repeatable; only the first call does any work.
        if (Interlocked.Exchange(ref this.isDisposed, 1) == 1)
        {
            return;
        }

        try
        {
            // Diagnostics was constructed on top of the driver, so tear it down first
            // (reverse order of construction).
            this.diagnostics?.Dispose();
        }
        finally
        {
            // Always release the driver, even if disposing diagnostics failed.
            this.driver.Dispose();
        }
    }
}


Get this bounty!!!

Leave a Reply

This site uses Akismet to reduce spam. Learn how your comment data is processed.