Azure Service Bus Queue – Sessions

In scenarios where there are multiple Azure Service Bus clients and you want to route particular messages to particular consumers, sessions are an ideal feature of the Azure Service Bus Queue. This post builds upon the sample project available on MSDN: Getting Started: Messaging with Queues.

Setup

Install either the MSDN sample project or create a new project using the SDK Template shown below:
GettingStartedTemplate
This gives us a great starting point to look at sessions.

Scenario

To illustrate sessions, three messages will be posted to the service bus. Two will have the same session id and the third will have a different session id. We will then use two clients to consume the messages. There are many scenarios where this technique could be useful. Here are three examples:

  • Priority – A low priority queue and a high priority queue could be created where the high priority items are worked before the low priority queue items.
  • Partitioning – The distribution of the queue items across multiple message brokers.
  • Aggregation – Routing messages to specific message brokers

Code

The first step is to set the queue to require sessions when being created:

private static void CreateQueue()
{
	NamespaceManager namespaceManager = NamespaceManager.Create();
 
	Console.WriteLine("\nCreating Queue '{0}'...", QueueName);
 
	// Delete if exists
	if (namespaceManager.QueueExists(QueueName))
	{
		namespaceManager.DeleteQueue(QueueName);
	}
 
	var description = new QueueDescription(QueueName)
	{
		RequiresSession = true
	};
 
	namespaceManager.CreateQueue(description);
}

When messages are sent to the service bus, a specific session is required to be specified.

private static BrokeredMessage CreateSampleMessage(string messageId, string messageBody, string sessionId)
{
	BrokeredMessage message = new BrokeredMessage(messageBody);
	message.MessageId = messageId;
	message.SessionId = sessionId;
	return message;
}

This post illustrates two ways to consume the messages. The first creates a queue client per client:

private static void ReceiveMessages()
{
	Console.WriteLine("\nReceiving message from Queue...");
	BrokeredMessage message = null;
 
	var sessions = queueClient.GetMessageSessions();
 
	foreach (var browser in sessions)
	{
		Console.WriteLine(string.Format("Session discovered: Id = {0}", browser.SessionId));
		var session = queueClient.AcceptMessageSession(browser.SessionId);                
 
		while (true)
		{
			try
			{
				message = session.Receive(TimeSpan.FromSeconds(5));
 
				if (message != null)
				{
					Console.WriteLine(string.Format("Message received: Id = {0}, Body = {1}", message.MessageId, message.GetBody<string>()));
 
					// Further custom message processing could go here...
					message.Complete();
				}
				else
				{
					//no more messages in the queue
					break;
				}
			}
			catch (MessagingException e)
			{
				if (!e.IsTransient)
				{
					Console.WriteLine(e.Message);
					throw;
				}
				else
				{
					HandleTransientErrors(e);
				}
			}
		}
	}
	queueClient.Close();
}

The second uses an implementation of IMessageSessionHandler:

public class MyMessageSessionHandler : IMessageSessionHandler
{
	private string WhoAmI = Guid.NewGuid().ToString().Substring(0, 4);
 
	public void OnCloseSession(MessageSession session)
	{
		Console.WriteLine(string.Format("MyMessageSessionHandler {1} close session Id = {0}", session.SessionId, WhoAmI));
	}
 
	public void OnMessage(MessageSession session, BrokeredMessage message)
	{
		Console.WriteLine(string.Format("MyMessageSessionHandler {3} received messaged on session Id = {0}, Id = {1}, Body = {2}", session.SessionId, message.MessageId, message.GetBody<string>(), WhoAmI));
 
		message.Complete();
	}
 
	public void OnSessionLost(Exception exception)
	{
		Console.WriteLine(string.Format("MyMessageSessionHandler {1} OnSessionLost {0}", exception.Message, WhoAmI));
	}
}

To register the message handler register using the queueclient:

queueClient.RegisterSessionHandler(typeof(MyMessageSessionHandler), new SessionHandlerOptions { AutoComplete = false });

The completed project has been uploaded onto MSDN here.

This entry was posted in Azure, C#, Service Bus and tagged , , , . Bookmark the permalink.

Leave a Reply

Your email address will not be published. Required fields are marked *

You may use these HTML tags and attributes: <a href="" title=""> <abbr title=""> <acronym title=""> <b> <blockquote cite=""> <cite> <code> <del datetime=""> <em> <i> <q cite=""> <strike> <strong>