Publisher confirms with RabbitMQ and C#

RabbitMQ lets you detect and handle messages that the broker did not accept successfully, without resorting to full-on transactions. It provides this capability in the form of publisher confirms. Using publisher confirms requires just a couple of extra lines of C#.

If you’re publishing messages, you probably have a method that contains something like this:

using (var connection = FACTORY.CreateConnection())
{
    var channel = connection.CreateModel();
    channel.ExchangeDeclare(QUEUE_NAME, ExchangeType.Fanout, true);
    channel.QueueDeclare(QUEUE_NAME, true, false, false, null);
    channel.QueueBind(QUEUE_NAME, QUEUE_NAME, String.Empty, new Dictionary<string, object>());
 
    for (var i = 0; i < numberOfMessages; i++)
    {
        var message = String.Format("{0}\thello world", i);
        var payload = Encoding.Unicode.GetBytes(message);
        channel.BasicPublish(QUEUE_NAME, String.Empty, null, payload);
    }
}

 

But you’re out of luck if you want:

  1. A guarantee that your message was safely preserved in the event that the broker goes down (i.e. written to disk)
  2. Acknowledgement from the broker that your message was received, and written to disk

For many use cases, you want these guarantees. Fortunately, getting them is relatively straightforward:

//Set the message to persist in the event of a broker shutdown
//(newer client versions expose this as the Persistent property instead)
var messageProperties = channel.CreateBasicProperties();
messageProperties.SetPersistent(true);

//Subscribe to the broker's acks and nacks, then put the channel into confirm mode
channel.BasicAcks += channel_BasicAcks;
channel.BasicNacks += channel_BasicNacks;
channel.ConfirmSelect();
 
//...
 
//Begin loop
channel.BasicPublish(QUEUE_NAME, QUEUE_NAME, messageProperties, payload);
channel.WaitForConfirmsOrDie();
//End loop

 

(You’ll have to implement event handlers for acks and nacks.)
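As a rough illustration, the handlers could look something like the sketch below. It assumes a client version (3.5 or later) in which BasicAcks and BasicNacks are EventHandler<T> events; older clients pass an IModel as the first argument instead of object. The class name ConfirmHandlers is hypothetical, and the handler names simply match the ones wired up in the snippet above.

```csharp
using System;
using RabbitMQ.Client.Events;

// Hypothetical container class; in practice these methods would live
// next to the publishing code so that "channel_BasicAcks" resolves directly.
static class ConfirmHandlers
{
    // Raised when the broker confirms one or more messages.
    // If Multiple is true, every message up to and including DeliveryTag is confirmed.
    public static void channel_BasicAcks(object sender, BasicAckEventArgs e)
    {
        Console.WriteLine("Ack received: tag={0}, multiple={1}", e.DeliveryTag, e.Multiple);
    }

    // Raised when the broker could not take responsibility for one or more messages.
    // A real publisher would typically log this and republish or alert.
    public static void channel_BasicNacks(object sender, BasicNackEventArgs e)
    {
        Console.WriteLine("Nack received: tag={0}, multiple={1}", e.DeliveryTag, e.Multiple);
    }
}
```

The delivery tag is the channel's publish sequence number, so a handler that needs to know which message was confirmed has to keep its own map from sequence number to message.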

The difference between WaitForConfirms and WaitForConfirmsOrDie is not immediately obvious, but after digging through the Javadocs, it seems that WaitForConfirmsOrDie will throw an IOException if a message is nack'd, whereas WaitForConfirms won't: it returns false instead.
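To make that difference concrete, here is a minimal sketch of both styles. It assumes a client version that still exposes IModel with the synchronous WaitForConfirms methods; the class name ConfirmWaiting and both method names are made up for illustration.

```csharp
using System;
using System.IO;
using RabbitMQ.Client;

// Hypothetical helper class contrasting the two waiting styles.
// Both methods assume ConfirmSelect() has already been called on the channel.
static class ConfirmWaiting
{
    // WaitForConfirms reports a nack through its return value.
    public static bool PublishAndCheck(IModel channel, string exchange, byte[] payload)
    {
        channel.BasicPublish(exchange, String.Empty, null, payload);
        bool allAcked = channel.WaitForConfirms();
        if (!allAcked)
        {
            Console.WriteLine("Broker nack'd the message; the caller decides what to do next.");
        }
        return allAcked;
    }

    // WaitForConfirmsOrDie reports a nack by throwing an IOException.
    public static void PublishOrThrow(IModel channel, string exchange, byte[] payload)
    {
        channel.BasicPublish(exchange, String.Empty, null, payload);
        try
        {
            channel.WaitForConfirmsOrDie();
        }
        catch (IOException ex)
        {
            Console.WriteLine("Publish was not confirmed: " + ex.Message);
            throw; // let the caller handle the failure
        }
    }
}
```

The first style suits code that wants to retry quietly, while the second fits code that treats an unconfirmed message as a hard failure.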

You’ll get an InvalidOperationException (the .NET counterpart of the IllegalStateException described in the Javadocs) if you try to use either variation of WaitForConfirms without first putting the channel into confirm mode with ConfirmSelect.

Here’s the complete code for getting an acknowledgement from the RabbitMQ broker, only after the broker has persisted the message to disk:

using (var connection = FACTORY.CreateConnection())
{
    var channel = connection.CreateModel();
    channel.ExchangeDeclare(QUEUE_NAME, ExchangeType.Fanout, true);
    channel.QueueDeclare(QUEUE_NAME, true, false, false, null);
    channel.QueueBind(QUEUE_NAME, QUEUE_NAME, String.Empty, new Dictionary<string, object>());
    channel.BasicAcks += channel_BasicAcks;
    channel.BasicNacks += channel_BasicNacks;
    channel.ConfirmSelect();
 
    for (var i = 1; i <= numberOfMessages; i++)
    {
        var messageProperties = channel.CreateBasicProperties();
        messageProperties.SetPersistent(true);
 
        var message = String.Format("{0}\thello world", i);
        var payload = Encoding.Unicode.GetBytes(message);
        Console.WriteLine("Sending message: " + message);
        channel.BasicPublish(QUEUE_NAME, QUEUE_NAME, messageProperties, payload);
        channel.WaitForConfirmsOrDie();
    }
}
