#StackBounty: #debian #systemd #rabbitmq How to make RabbitMQ start and stop faster

Bounty: 50

On Debian 10, service rabbitmq-server start takes 10 seconds; service rabbitmq-server stop takes 5 seconds.

This is for a Docker development environment where load is extremely low and start/stop speed is more important than RabbitMQ performance or reliability. Stopping and re-starting the container takes 15 seconds because of this.

How can I make it faster? I added log.default.level = debug to rabbitmq.conf in case it would show me what it’s doing during startup, but it didn’t give me anything useful.


Get this bounty!!!

#StackBounty: #javascript #node.js #rabbitmq #message-queue #amqp How to remove particular messages in rabbitmq before publishing new m…

Bounty: 50

I have a subscriber which pushes data into queues. The messages look like this:

{
  "Content": {
    "_id": "5ceya67bbsbag3",
    "dataset": {
      "upper": {},
      "lower": {}
    }
  }
}

A new message can be pushed with the same content id but different data. In that case I want to delete the old message with the same id, or replace it, so that only the latest message for that id is retained.

I have not found a direct solution for this in RabbitMQ. How can this be done?

I have already gone through some posts.

Post 1

Post 2
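
Since no direct broker-side solution was found, one possible workaround is to coalesce messages by Content._id on the publishing side before they reach the queue. The following is only a minimal sketch under that assumption, not a RabbitMQ feature: the LatestById class and the publish callback are hypothetical names, and publish stands in for whatever call actually sends the message to the broker.

```javascript
// Hypothetical helper: keeps only the newest message per Content._id
// until flush() hands the surviving messages to a publish callback.
class LatestById {
  constructor() {
    this.pending = new Map(); // _id -> latest message seen so far
  }

  // Store a message, replacing any earlier one with the same _id.
  add(message) {
    const id = message.Content._id;
    // Delete first so the re-inserted key moves to the end of the Map order.
    this.pending.delete(id);
    this.pending.set(id, message);
  }

  // Pass each retained message to publish(), then clear the buffer.
  flush(publish) {
    const batch = [...this.pending.values()];
    this.pending.clear();
    batch.forEach((m) => publish(m));
    return batch.length;
  }
}

// Usage: two messages share an _id, so only the later one survives.
const buffer = new LatestById();
buffer.add({ Content: { _id: "5ceya67bbsbag3", dataset: { upper: { v: 1 }, lower: {} } } });
buffer.add({ Content: { _id: "other", dataset: { upper: {}, lower: {} } } });
buffer.add({ Content: { _id: "5ceya67bbsbag3", dataset: { upper: { v: 2 }, lower: {} } } });

const published = [];
const count = buffer.flush((m) => published.push(m));
```

This only helps for messages that have not been published yet. Anything already sitting in the queue is untouched, so a consumer would still need its own way to ignore stale versions.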



#StackBounty: #events #asp.net-core #dependency-injection #rabbitmq #multi-tenant Get Tenant Id from RabbitMq Message for Db Connection

Bounty: 50

I have a microservice architecture with ASP.NET Core applications and RabbitMQ as the event bus between the microservices.
I also want to support multi-tenancy.
So I have the following dependency injection service defined in Startup.cs to open a connection to the database on every request, based on the user’s tenant id.

services.AddScoped<IDocumentSession>(ds =>
{
    var store = ds.GetRequiredService<IDocumentStore>();
    var httpContextAccessor = ds.GetRequiredService<IHttpContextAccessor>();
    var tenant = httpContextAccessor?.HttpContext?.User?.Claims.FirstOrDefault(c => c.Type == "tid")?.Value;
    return tenant != null ? store.OpenSession(tenant) : store.OpenSession();
});

The problem is when the service processes an event bus message (like UserUpdatedEvent).
In that case when it tries to open the Db connection, it obviously does not have the user information from the http context.

How do I send/access the tenant id of the respective user when injecting the scoped service and processing an event with RabbitMQ?



#StackBounty: #c# #queue #asp.net-web-api #rabbitmq Using RabbitMQ in asp.net web api

Bounty: 50

Use case:

I have an Order Web API which creates an order and publishes it to a message queue. A Notification service (Web API 2) consumes this message by subscribing to the message queue and notifies the user.

I’m new to message queues.
I’m using CloudAMQP as the queue service.

Order Service where I publish the event

public class MessageQueueConfig
{
    public static IModel Channel { get; private set; }
    public static IConnection Connection { get; private set; }
    const string ExchangeName = "Notifications";
    public const string CreatedRoutingKey = "WorkOrderCreated";
    public const string UpdatedRoutingKey = "WorkOrderUpdated";

    public static void Register()
    {
        var factory = new ConnectionFactory()
        {
            Uri = new Uri(@"amqp://myurl"),
        };
        factory.RequestedHeartbeat = 5;
        Connection = factory.CreateConnection();
        Channel = Connection.CreateModel();
        Channel.ExchangeDeclare(exchange: ExchangeName, type: ExchangeType.Topic, durable: true);
    }

    public static void Publish(string routingKey, object body)
    {
        var byteBody = MemorySerializer.ObjectToByteArray(body);
        if (Connection.IsOpen)
        {    
            Channel.BasicPublish(exchange: ExchangeName,
                    routingKey: routingKey, basicProperties: null,
                    body: byteBody);
        }
    }
}

Order Service Global.asax.cs

protected void Application_Start()
{
  FilterConfig.RegisterGlobalFilters(GlobalFilters.Filters);
  // Web API config
  MessageQueueConfig.Register(); // where I start the queue
}

Order Service Controller

[HttpPost]
public async Task<IHttpActionResult> InsertOrder(Order Parms)
{
    try
    {
        var jobNumber = await _WorkOrderB.InsertOrder(Parms, passCode);
        ResponseObj responseobj = new ResponseObj();
        var msg = new MyQueueClass { /* some data */ };
        MessageQueueConfig.Publish(MessageQueueConfig.CreatedRoutingKey, msg);
        responseobj.Result = GeneralDTO.SuccessStatus;
        responseobj.ResponseData = jobNumber;
        return Ok(responseobj);
    }
    catch (Exception exception)
    {
        return InternalServerError(exception);
    }
}

Notification Service where I consume the events

public class MessageQueueConfig
{
    public static IModel Channel { get; private set; }
    public static IConnection Connection { get; private set; }
    const string ExchangeName = "Notifications";
    const string CreatedRoutingKey = "WorkOrderCreated";
    const string UpdatedRoutingKey = "WorkOrderUpdated";
    static List<string> RoutingKeys = new List<string>
    {
        "WorkOrderCreated",
        "WorkOrderUpdated"
    };

    public static void RegisterAndSubscribe()
    {
        var factory = new ConnectionFactory()
        {
            Uri = new Uri(@"amqp://myurl"),
        };
        factory.RequestedHeartbeat = 5;
        Connection = factory.CreateConnection();
        Channel = Connection.CreateModel();
        Channel.ExchangeDeclare(exchange: ExchangeName, type: ExchangeType.Topic, durable: true);

        // Server-named queue declared with default arguments.
        var queueName = Channel.QueueDeclare().QueueName;

        foreach (var routingKey in RoutingKeys)
        {
            Channel.QueueBind(queue: queueName,
                              exchange: ExchangeName,
                              routingKey: routingKey);
        }

        var consumer = new EventingBasicConsumer(Channel);
        consumer.Received += Consumer_Received;
        Channel.BasicConsume(queue: queueName,
                             autoAck: true,
                             consumer: consumer);
    }

    static async void Consumer_Received(object sender, BasicDeliverEventArgs e)
    {
        // The routing key of the delivered message selects the handler.
        switch (e.RoutingKey)
        {
            case CreatedRoutingKey:
                await ProcessWorkOrderCreated(e.Body);
                break;
            case UpdatedRoutingKey:
                await ProcessWorkOrderUpdated(e.Body);
                break;
            default:
                break;
        }
    }
}

Notification Service Global.asax.cs

protected void Application_Start()
{
    // Web API config
    BundleConfig.RegisterBundles(BundleTable.Bundles);
    MessageQueueConfig.RegisterAndSubscribe();
}

Questions

  1. Are there any potential issues with this architecture?
  2. What is the ideal value for factory.RequestedHeartbeat (currently 5)?
  3. What happens if my notification service (consumer) goes down?
    I have set durable = true for my queue, assuming it would still be available when my consumer comes back online.
  4. I haven’t disposed of the connection and channel. Is public override void Dispose() in Global.asax.cs the right place to close the RabbitMQ connection?

Global.asax.cs

public override void Dispose()
  {
      MessageQueueConfig.Dispose();
      base.Dispose();
  }

Note: This code is running in a dev environment; I’m planning to release it to production next week.

