A quick look on MSMQ and transactions

A quick look on MSMQ and transactions
september 4, 2015 Marcus Parkkinen

Microsoft Message Queuing, or simply MSMQ, is a tool that I’ve been using as part of my current assignment in developing an integration solution for a customer. In short, MSMQ provides failsafe message based communication between or within applications through the usage of queues. An example where MSMQ can be useful is when multiple different applications running at different times need to communicate asynchronously. In such scenarios MSMQ may be used to facilitate error handling through built-in support for automatic retries, poison message handling, limited message life time and much more. In this blog post, I’ll show an example on how to set up MSMQ queues programmatically as well as how to read and write messages from and to these within transactions. We’ll create a very simple application primarily consisting of two different classes: a Sender and a Receiver. An instance of the Sender class will place messages on three different MSMQ queues, and these messages will in turn be read from the queues by an instance of the Receiver class.

Let’s look at how we could implement the Sender class:

public class Sender
{
    public void Send(MessageQueue messageQueue, string message,
        MessageQueueTransaction transaction)
    {
        try
        {
            var msg = new Message
            {
                UseAuthentication = false,
                Recoverable = true,
                Body = message
            };

            messageQueue.Send(msg, transaction);
        }
        finally
        {
            messageQueue.Close();
        }
    }
}

To keep things short, the Sender class contains a single method Send that takes a queue, a message as well as a transaction as arguments. The classes MessageQueue and MessageQueueTransaction are part of the System.Messaging assembly. As shown in line 15, we simply forward the transaction argument to the messageQueue.Send method. The result of this is that the message will not be committed to the queue before we explicitly commit the transaction, something that is done outside the scope of our Send method. This way, we may call Send multiple times and not have any messages committed to queues until we explicitly say so by calling the Commit method on the transaction object.

The Receiver class might in turn look something like this:

public class Receiver
{
    public void StartReading(List<MessageQueue> messageQueues)
    {
        foreach (MessageQueue messageQueue in messageQueues)
        {
            messageQueue.PeekCompleted += MessageAvailable;
            messageQueue.BeginPeek();
        }
    }

    private void MessageAvailable(object source,
        PeekCompletedEventArgs args)
    {
        var messageQueue = (MessageQueue)source;

        try
        {
            ReadMessage(messageQueue);
        }
        finally
        {
            messageQueue.Close();
        }

        // Continue reading from the queue
        messageQueue.BeginPeek();
    }

    private void ReadMessage(MessageQueue messageQueue)
    {
        // We interpret the queue messages as strings
        messageQueue.Formatter =
            new XmlMessageFormatter(new[] { typeof(string) });

        using (var transaction = new MessageQueueTransaction())
        {
            transaction.Begin();

            var message = messageQueue.Receive(transaction);
            var messageContents = message == null ?
                string.Empty : message.Body.ToString();

            Console.WriteLine(
                "Received message \"{0}\" from queue \"{1}\"",
                messageContents,
                messageQueue.QueueName);

            transaction.Commit();
        }
    }
}

As shown in the implementation of the StartReading method, we use event listeners to subscribe to any messages being placed on the supplied MSMQ queues. Whenever a message is sent to any of the provided queues in the messageQueues parameter, a call will be made to the MessageAvailable method. Each message is read and processed in the ReadMessage method that uses an instance of the MessageQueueTransaction class for transactional reads from the queue. As shown in line 40, we forward this transaction object to the messageQueue.Receive method call. This way we achieve a transactional read from the MSMQ queue which is not committed until we invoke transaction.Commit on line 49. Failing to commit our transaction, which for example could happen in case of an exception being raised before line 49, would result in the message being placed back on the queue again. This behaviour helps us ensure that our message is successfully processed before we actually consume it from the queue.

The final thing that remains after we’ve created our Sender and Receiver classes is to wrap everything up:

public class Program
{
    private static List<MessageQueue> CreateQueues(
        params string[] queuePaths)
    {
        return
            queuePaths.Select(
                queuePath =>
                    MessageQueue.Exists(queuePath)
                        ? new MessageQueue(queuePath)
                        : MessageQueue.Create(queuePath, true)
            ).ToList();
    }

    private static void Main(string[] args)
    {
        const string message = "Testing MSMQ";

        List<MessageQueue> queues = CreateQueues(
            ".\\private$\\FirstQueue",
            ".\\private$\\SecondQueue",
            ".\\private$\\ThirdQueue");

        var sender = new Sender();
        var receiver = new Receiver();

        /*
         * Send to all the queues within a transaction
         */
        using (var transaction = new MessageQueueTransaction())
        {
            transaction.Begin();

            foreach (MessageQueue queue in queues)
            {
                sender.Send(queue, message, transaction);
            }

            transaction.Commit();
        }

        /*
         * Read from all the queues
         */
        receiver.StartReading(queues);

        Console.ReadKey();
    }
}

We start by creating our MSMQ queues in the method CreateQueues. We identify each queue by a path string, which in all three cases above starts with ”.\\private$\\” as we only want to publish them locally on our computer. Each queue is created conditionally as we first invoke MessageQueue.Exists to check whether or not a queue with the provided path string already exists on the computer. In such a case we simply use the MessageQueue constructor to acquire a reference to the already existing queue. Otherwise we use MessageQueue.Create to create the queue, indicating that it should be transactional using the second parameter. Note that an exception will be thrown in this method in case the MSMQ Windows feature is not installed on the executing client. A guide on how to install this feature can be found here.

Next, we send a simple test message to all our queues. As our implementation uses transactional sends, we use an instance of the MessageQueueTransaction class for all calls to Sender.Send. Remember that no message will be submitted to any of the queues until we explicitly call the Commit method on the transaction object, something that we do on line 39. Omitting this line would result in no messages being sent to any of the queues.

That’s all we need to do in order to be able to run our example app. The screenshot below illustrates the produced output.

MSMQ_Output

All source code is uploaded and available on GitHub.

0 Kommentarer

Lämna ett svar

E-postadressen publiceras inte. Obligatoriska fält är märkta *

*

Denna webbplats använder Akismet för att förhindra skräppost. Läs mer om hur dina kommentarsuppgifter behandlas.