RabbitMQ lets you handle messages that didn’t send successfully, without resorting to full-on transactions. It provides this capability in the form of publisher confirms. Using publisher confirms requires just a couple of extra lines of C#.
If you’re publishing messages, you probably have a method that contains something like this:
    using (var connection = FACTORY.CreateConnection())
    {
        var channel = connection.CreateModel();
        channel.ExchangeDeclare(QUEUE_NAME, ExchangeType.Fanout, true);
        channel.QueueDeclare(QUEUE_NAME, true, false, false, null);
        channel.QueueBind(QUEUE_NAME, QUEUE_NAME, String.Empty, new Dictionary<string, object>());
        for (var i = 0; i < numberOfMessages; i++)
        {
            var message = String.Format("{0}\thello world", i);
            var payload = Encoding.Unicode.GetBytes(message);
            channel.BasicPublish(QUEUE_NAME, String.Empty, null, payload);
        }
    }
But you’re out of luck if you want:
- A guarantee that your message is safely preserved (i.e. written to disk) in the event that the broker goes down
- Acknowledgement from the broker that your message was received and written to disk
For many use cases, you want these guarantees. Fortunately, getting them is relatively straightforward:
    // Set the message to persist in the event of a broker shutdown
    var messageProperties = channel.CreateBasicProperties();
    messageProperties.SetPersistent(true);

    // Send an acknowledgement that the message was persisted to disk
    channel.BasicAcks += channel_BasicAcks;
    channel.ConfirmSelect();
    //...
    // Begin loop
    channel.BasicPublish(QUEUE_NAME, QUEUE_NAME, messageProperties, payload);
    channel.WaitForConfirmsOrDie();
    // End loop
(You’ll have to implement event handlers for acks and nacks.)
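As a rough idea of what those handlers might do, here is a minimal sketch of the bookkeeping behind them. ConfirmTracker and its members are hypothetical names and not part of RabbitMQ.Client; the sketch assumes you record the channel's NextPublishSeqNo just before each BasicPublish, and that your handlers forward the DeliveryTag and Multiple values from the ack or nack event arguments.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper (not part of RabbitMQ.Client): remembers which
// published messages are still waiting for a confirm from the broker.
public class ConfirmTracker
{
    private readonly object _lock = new object();
    private readonly SortedDictionary<ulong, string> _pending =
        new SortedDictionary<ulong, string>();
    private readonly List<string> _rejected = new List<string>();

    // Call just before BasicPublish, passing the channel's NextPublishSeqNo.
    public void Track(ulong sequenceNumber, string message)
    {
        lock (_lock) { _pending[sequenceNumber] = message; }
    }

    // Call from the ack handler with the event's DeliveryTag and Multiple.
    public void Ack(ulong deliveryTag, bool multiple)
    {
        lock (_lock) { Settle(deliveryTag, multiple, rejected: false); }
    }

    // Call from the nack handler with the event's DeliveryTag and Multiple.
    public void Nack(ulong deliveryTag, bool multiple)
    {
        lock (_lock) { Settle(deliveryTag, multiple, rejected: true); }
    }

    // Number of messages that have been neither ack'd nor nack'd yet.
    public int PendingCount
    {
        get { lock (_lock) { return _pending.Count; } }
    }

    // Snapshot of the messages the broker nack'd, in the order they were settled.
    public IList<string> Rejected
    {
        get { lock (_lock) { return _rejected.ToList(); } }
    }

    private void Settle(ulong deliveryTag, bool multiple, bool rejected)
    {
        // When multiple is true, the confirm covers every outstanding
        // sequence number up to and including deliveryTag.
        var tags = multiple
            ? _pending.Keys.Where(tag => tag <= deliveryTag).ToList()
            : _pending.Keys.Where(tag => tag == deliveryTag).ToList();

        foreach (var tag in tags)
        {
            if (rejected) _rejected.Add(_pending[tag]);
            _pending.Remove(tag);
        }
    }
}
```

A handler such as channel_BasicAcks would then boil down to a single call like tracker.Ack(args.DeliveryTag, args.Multiple). The exact handler signature depends on the version of the client library you are using, so check it against your own build. The lock is there on the assumption that the handlers are raised on a different thread from the one that publishes.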
The difference between WaitForConfirms and WaitForConfirmsOrDie is not immediately obvious, but after digging through the Javadocs, it seems that WaitForConfirmsOrDie will give you an IOException if a message is nack’d, whereas WaitForConfirms won’t. In the .NET client, WaitForConfirms instead returns a bool, which is false if any message was nack’d.
You’ll get an IllegalStateException (InvalidOperationException in the .NET client) if you call either variation of WaitForConfirms without first putting the channel into confirm mode with ConfirmSelect.
Here’s the complete code for getting an acknowledgement from the RabbitMQ broker, only after the broker has persisted the message to disk:
    using (var connection = FACTORY.CreateConnection())
    {
        var channel = connection.CreateModel();
        channel.ExchangeDeclare(QUEUE_NAME, ExchangeType.Fanout, true);
        channel.QueueDeclare(QUEUE_NAME, true, false, false, null);
        channel.QueueBind(QUEUE_NAME, QUEUE_NAME, String.Empty, new Dictionary<string, object>());
        channel.BasicAcks += channel_BasicAcks;
        channel.ConfirmSelect();
        for (var i = 1; i <= numberOfMessages; i++)
        {
            var messageProperties = channel.CreateBasicProperties();
            messageProperties.SetPersistent(true);
            var message = String.Format("{0}\thello world", i);
            var payload = Encoding.Unicode.GetBytes(message);
            Console.WriteLine("Sending message: " + message);
            channel.BasicPublish(QUEUE_NAME, QUEUE_NAME, messageProperties, payload);
            channel.WaitForConfirmsOrDie();
        }
    }