
Messages and requests

Jul 25, 2009 at 5:07 AM

 

 

Hi,

Sorry to answer your question late but I've been busy.

Here are my thoughts on how a message bus should work, from a developer's perspective:

 

1) From a sender's point of view, there is no difference between sending a message and publishing it.

  A message is just a class containing data that can be sent to a bus.

  The bus must be preconfigured to be able to resolve the destination of the message, which can be routed to either a regular queue or a topic (publish/subscribe). Note that this is where ActiveMQ shines, as it handles both natively.

2) I prefer to configure the bus using a fluent interface instead of relying on XML, and to benefit from strongly typed message mappings.

3) A bus should be able to send a list of messages as a transaction, which either succeeds or fails as a whole. Again, ActiveMQ supports this natively, both when sending and when receiving messages.

4) the request/response pattern is so common that it must be properly supported by the framework.  

 

I will now detail each of the previous points.

 

1) Sending messages.

Here's how I've abstracted the notion of send in my framework, as I'm building an extra layer to avoid taking a strong dependency on any message bus framework.

I have only 2 methods: one for sending a list of messages in fire-and-forget mode, and one for sending a request and expecting a response within a specific time frame.

Note that I named the send request method 'Ask' to make it obvious that we expect a response.

Note also that I didn't introduce any generic signature in this interface to make it as simple as possible.

Instead I've added overloads as extension methods (see following class) in the same namespace to make them available more easily.

 

 

    /// <summary>
    /// Service that allows one to send a message to a message bus.
    /// </summary>
    /// <remarks>
    /// Messages are simple classes that must be serializable.
    /// The destination of a message is not known by the message, instead it will be configured within the message bus where we'll map 
    /// a message type to specific destination which can be a queue or a topic (publish/subscribe).
    /// </remarks>
    public interface IMessagingService {
        /// <summary>
        /// Sends a request and waits for a response during a specific time interval.
        /// </summary>
        /// <param name="request">the request to send</param>
        /// <param name="timeout">the period to wait for the response</param>
        /// <returns>the response of the submitted <paramref name="request"/> or null if the <paramref name="timeout"/> expired</returns>
        Object Ask(Object request, TimeSpan timeout);

        /// <summary>
        /// Sends a list of messages
        /// </summary>
        /// <param name="messages">the messages to send</param>
        void Send(IEnumerable<Object> messages);
    }

public static class MessagingServiceExtensions {

        /// <summary>
        /// Sends a request and waits for a response during a specific time interval.
        /// </summary>
        /// <typeparam name="TResponse">the type of the expected response</typeparam>
        /// <param name="messaging">the messaging service</param>
        /// <param name="request">the request to send</param>
        /// <param name="timeout">the period to wait for the response</param>
        /// <returns>the response of the submitted <paramref name="request"/> or null if the <paramref name="timeout"/> expired</returns>
        /// <exception cref="ArgumentNullException">messaging is null</exception>
        public static TResponse Ask<TResponse>(this IMessagingService messaging, IMessageRequest<TResponse> request, TimeSpan timeout) {
            if(messaging == null)
                throw new ArgumentNullException("messaging");
            return (TResponse)messaging.Ask((Object)request, timeout);
        }

        /// <summary>
        /// Sends a list of messages
        /// </summary>
        /// <param name="messaging">the messaging service</param>
        /// <param name="messages">the messages to send</param>
        /// <exception cref="ArgumentNullException">messaging is null</exception>
        public static void Send(this IMessagingService messaging, params Object[] messages) {
            if(messaging == null)
                throw new ArgumentNullException("messaging");
            messaging.Send(messages);
        }
}

 

 

2) For a fluent interface, here's an example of how I configure the bus:

 

	var configurator = new ActiveMQConfigurator("ENDPOINT_ID", "EndpointMainQueueName")
			.ConnectTo("tcp://localhost:61616")
			.UseCredentials("myUserName", "my Password")
			.EnableIgnoreConnectionErrorsOnStartup()
			.SetupDefaultConsumerPool(x => {
				x.WorkerCount = 5;
				x.ConsumeQueue("SomeQueue"); // a second queue, besides "EndpointMainQueueName"
			})
			.SetupConsumerPool("pool#2", x => {
				x.WorkerCount = 3;
				x.ConsumeQueue("AnotherQueue1");
				x.ConsumeQueue("AnotherQueue2");
				x.Subscribe<MyTopicMessage>();
				x.Subscribe<MyTopicMessage>("My selector"); // in the future I want to use Linq to generate the selector
				x.Subscribe("MyTopic.MySubTopic1");
				x.Subscribe("MyTopic.MySubTopic2", "My Selector");
			})
			.SetupMessageHandlers(x => {
				x.Register(GetType().Assembly);
				x.Register<TestPublicationMessageHandler>();
				x.Register(typeof(TestPublicationMessageHandler));
			})
			.SetupMessageDestinations(x => {
				x.Send<TestPublicationMessage>("ManagedClient");
				x.Send(typeof(TestPublicationMessage), "ManagedClient");
				x.Publish<TestPublicationMessage>(); // will build a topic name by walking the class hierarchy and detecting optional custom attributes.
				x.Publish(typeof(TestPublicationMessage)); // will build a topic name by walking the class hierarchy and detecting optional custom attributes.
				x.Publish<TestPublicationMessage>("MyMainTopic.MySubTopic");
				x.Publish(typeof(TestPublicationMessage), "MyMainTopic.MySubTopic");
			})
			;

	endpoint = configurator.CreateEndpoint();

So, by doing this configuration I'm using a single connection to ActiveMQ and WorkerCount sessions per ConsumerPool. Each consumer pool can listen to any number of queues or topics. The received messages will be dispatched among the available WorkerCount threads that are created for each ActiveMQ session.

So, if you only have a DefaultConsumerPool with a single worker, you'll be using only 2 threads, one for receiving data from the ActiveMQ broker and another one for the single session used by the pool.

Then you'll have one MessageConsumer per queue/topic.

Also, subscribing to or publishing a message should avoid using hardcoded strings when possible.

Instead I'll walk the class hierarchy of the message and use the class name as a topic element, and check for a custom attribute that would decorate the class in order to use any other string instead of the class name.
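
To make the naming rule concrete, here is a minimal sketch of how a topic name could be derived from a message type. The TopicElementAttribute and TopicNameBuilder names, and the choice to join the elements with dots from base class to derived class, are assumptions made for illustration and not part of any existing framework.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical attribute: overrides the topic element derived from a class name.
[AttributeUsage(AttributeTargets.Class, Inherited = false)]
public sealed class TopicElementAttribute : Attribute {
    public TopicElementAttribute(string name) { Name = name; }
    public string Name { get; private set; }
}

public static class TopicNameBuilder {
    // Walks from the message type up to (but excluding) Object and emits one
    // element per class, base class first, e.g. "Base.Derived".
    public static string Build(Type messageType) {
        if (messageType == null)
            throw new ArgumentNullException("messageType");
        var elements = new List<string>();
        for (var t = messageType; t != null && t != typeof(object); t = t.BaseType) {
            var attr = (TopicElementAttribute)Attribute.GetCustomAttribute(
                t, typeof(TopicElementAttribute), false);
            // Use the attribute value when present, otherwise the class name.
            elements.Add(attr != null ? attr.Name : t.Name);
        }
        elements.Reverse();
        return string.Join(".", elements);
    }
}

// Example message hierarchy (hypothetical).
[TopicElement("Stocks")]
public class StockMessage { }

public class StockAlert : StockMessage { }
```

With these definitions, TopicNameBuilder.Build(typeof(StockAlert)) yields "Stocks.StockAlert".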

I also want to support message selectors, which allow ActiveMQ to filter within the broker the messages that your consumer will receive, based either on message attributes or on an XPath expression if your message contains XML. What would be really cool is to use a LINQ expression in order to build such an expression in a strongly typed and easy way:

 

  x.Subscribe<StockAlert>(m => m.Company == "Microsoft");
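
A rough sketch of the idea: a tiny translator that turns a lambda into a selector string. It only handles comparisons between a property of the message and a constant, combined with && and ||. SelectorBuilder and the Quantity property are hypothetical, and the sketch assumes that each property used in the predicate is also published as a message property of the same name, since SQL-style selectors are evaluated against message headers and properties.

```csharp
using System;
using System.Globalization;
using System.Linq.Expressions;

public class StockAlert {
    public string Company { get; set; }
    public int Quantity { get; set; }
}

public static class SelectorBuilder {
    // Translates a small subset of predicates into a SQL-92 style selector.
    // Captured variables and method calls are not supported in this sketch.
    public static string ToSelector<T>(Expression<Func<T, bool>> predicate) {
        if (predicate == null)
            throw new ArgumentNullException("predicate");
        return Visit(predicate.Body);
    }

    private static string Visit(Expression e) {
        switch (e.NodeType) {
            case ExpressionType.AndAlso: return Binary(e, "AND");
            case ExpressionType.OrElse: return Binary(e, "OR");
            case ExpressionType.Equal: return Binary(e, "=");
            case ExpressionType.NotEqual: return Binary(e, "<>");
            case ExpressionType.GreaterThan: return Binary(e, ">");
            case ExpressionType.LessThan: return Binary(e, "<");
            case ExpressionType.MemberAccess:
                var member = (MemberExpression)e;
                // Only direct property access on the lambda parameter (m.Company).
                if (member.Expression is ParameterExpression)
                    return member.Member.Name;
                break;
            case ExpressionType.Constant:
                return Literal(((ConstantExpression)e).Value);
        }
        throw new NotSupportedException("Unsupported expression: " + e);
    }

    private static string Binary(Expression e, string op) {
        var b = (BinaryExpression)e;
        var text = Visit(b.Left) + " " + op + " " + Visit(b.Right);
        // Parenthesize logical operators so nesting stays unambiguous.
        return (op == "AND" || op == "OR") ? "(" + text + ")" : text;
    }

    private static string Literal(object value) {
        if (value is string)
            return "'" + ((string)value).Replace("'", "''") + "'";
        if (value is bool)
            return (bool)value ? "TRUE" : "FALSE";
        if (value is int || value is long || value is double)
            return Convert.ToString(value, CultureInfo.InvariantCulture);
        throw new NotSupportedException("Unsupported constant: " + value);
    }
}
```

For example, SelectorBuilder.ToSelector<StockAlert>(m => m.Company == "Microsoft") produces the string Company = 'Microsoft'.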

You can create as many consumer pools as you want. This allows you to dedicate a specific pool of worker threads to handle a list of queues/topics, using a dedicated acknowledgment (user ack or transacted).

 

3) To send multiple messages, I need at least a signature that takes an enumeration of messages instead of a single one.

Another option would be to create a messaging scope like this:

 

 

interface IMessagingScope: IDisposable {
  void Send(Object message); 
  void Commit();
  void Rollback();
}

The Send method would resolve the destination of the message immediately, in order to throw an exception at the place where the message is being sent and make it clear who is sending it.

Also, the message should be accumulated within the scope.

Rollback would flag the scope as rolled back and would override any further commit.

Commit would actually send the accumulated messages using the precalculated destinations.

The send should happen through the message pipeline.

Dispose could throw an exception if you didn't call Commit or Rollback, to avoid forgetting it.
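
The rules above can be sketched as follows. This is only an illustration under stated assumptions: the resolveDestination and transmit delegates are hypothetical hooks standing in for the bus configuration and the message pipeline, and the sketch does not by itself make the send atomic (that would still rely on a transacted session in the transport).

```csharp
using System;
using System.Collections.Generic;

public interface IMessagingScope : IDisposable {
    void Send(object message);
    void Commit();
    void Rollback();
}

public sealed class MessagingScope : IMessagingScope {
    private readonly Func<Type, string> resolveDestination; // hypothetical hook
    private readonly Action<string, object> transmit;       // hypothetical hook
    private readonly List<KeyValuePair<string, object>> pending =
        new List<KeyValuePair<string, object>>();
    private bool rolledBack;
    private bool completed;

    public MessagingScope(Func<Type, string> resolveDestination,
                          Action<string, object> transmit) {
        if (resolveDestination == null) throw new ArgumentNullException("resolveDestination");
        if (transmit == null) throw new ArgumentNullException("transmit");
        this.resolveDestination = resolveDestination;
        this.transmit = transmit;
    }

    public void Send(object message) {
        if (message == null) throw new ArgumentNullException("message");
        // Resolve now, so an unmapped message type fails at the call site.
        var destination = resolveDestination(message.GetType());
        if (destination == null)
            throw new InvalidOperationException(
                "No destination configured for " + message.GetType().Name);
        pending.Add(new KeyValuePair<string, object>(destination, message));
    }

    public void Commit() {
        completed = true;
        if (rolledBack) return; // a rollback overrides any later commit
        foreach (var entry in pending)
            transmit(entry.Key, entry.Value);
        pending.Clear();
    }

    public void Rollback() {
        completed = true;
        rolledBack = true;
        pending.Clear();
    }

    public void Dispose() {
        if (!completed)
            throw new InvalidOperationException(
                "Scope disposed without calling Commit or Rollback.");
    }
}
```

One design caveat: an exception thrown from Dispose inside a using block can hide an earlier exception from the block body, so logging or asserting instead may be a safer variant.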

 

4) Supporting requests as first-class citizens.

 

I want to strongly type my request messages and enforce the type of the response message like this:

 

 

    /// <summary>
    /// marker for any message class that should send a response back.
    /// </summary>
    public interface IMessageRequest {
    }

    /// <summary>
    /// Strongly typed marker for any message class that should send a response back.
    /// </summary>
    /// <typeparam name="TResponse">the type of the expected response</typeparam>
    public interface IMessageRequest<TResponse> : IMessageRequest {
    }

    public class EchoRequest : IMessageRequest<EchoResponse> {
      public String Text { get; set; }
    }
    public class EchoResponse  {
      public String Text { get; set; }
    }

This allows me to write:

 

	var request = new EchoRequest { Text = "hello world" };
	var response = MessagingService.Ask(request, TimeSpan.FromSeconds(10)); // inferred as EchoResponse (see extension method for Ask)
                   // public static TResponse Ask<TResponse>(this IMessagingService messaging, IMessageRequest<TResponse> request, TimeSpan timeout) {
	Console.WriteLine(response.Text);

Now that we have a strongly typed request, it is easy, and very compelling, to provide a strongly typed request handler that enforces the return type of the response and spares the developer from having to know how to reply to a message (through context.Endpoint.MessageBus.Reply).

Also, I will throw an exception if the handler returns null, or I can catch the exception and return a specific error message that the send request implementation will intercept and rethrow as a local exception.

Btw - I don't really like sending a specific message containing an Error code and checking it at the other side. That's why we introduced exceptions many years ago ;-)

 

    /// <summary>
    /// A handler that knows how to respond to a submitted request.
    /// </summary>
    /// <typeparam name="TRequest">the type of the request to process</typeparam>
    /// <typeparam name="TResponse">the type of the response to send after processing of the request</typeparam>
    public interface IMessageRequestHandler<TRequest, TResponse>
        where TRequest: IMessageRequest<TResponse> {

        /// <summary>
        /// Processes the submitted <paramref name="request"/> and returns a response of type <typeparamref name="TResponse"/>
        /// </summary>
        /// <param name="request">the request to process</param>
        /// <param name="context">the context of the <paramref name="request"/></param>
        /// <returns>an instance of a <typeparamref name="TResponse"/></returns>
        TResponse Handle(TRequest request, IMessageContext context);
    }

   So, the echo handler would be:

 

	class EchoRequestHandler : IMessageRequestHandler<EchoRequest, EchoResponse> {
	   public EchoResponse Handle(EchoRequest request, IMessageContext context) { // must be named Handle to implement the interface
	     return new EchoResponse { Text = request.Text };
	   }
	}

    So far, I managed to create a message handler adapter to support this scenario without altering your framework.
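
To give an idea of what such an adapter could look like, here is a minimal sketch. It is not the actual adapter: IMessageContext is an empty stand-in for the framework's context type, and RequestFault, RemoteRequestException and RequestHandlerAdapter are hypothetical names. The adapter wraps a typed handler into an untyped delegate whose return value would be sent as the reply, and turns a null response or a thrown exception into a fault message that the caller rethrows locally.

```csharp
using System;

public interface IMessageContext { } // stand-in for the framework's context type
public interface IMessageRequest { }
public interface IMessageRequest<TResponse> : IMessageRequest { }

public interface IMessageRequestHandler<TRequest, TResponse>
    where TRequest : IMessageRequest<TResponse> {
    TResponse Handle(TRequest request, IMessageContext context);
}

// Hypothetical error message sent back instead of a response.
public sealed class RequestFault {
    public string Message { get; set; }
}

public sealed class RemoteRequestException : Exception {
    public RemoteRequestException(string message) : base(message) { }
}

public static class RequestHandlerAdapter {
    // Handler side: the returned object is what gets sent as the reply.
    public static Func<object, IMessageContext, object> Adapt<TRequest, TResponse>(
        IMessageRequestHandler<TRequest, TResponse> handler)
        where TRequest : IMessageRequest<TResponse> {
        if (handler == null) throw new ArgumentNullException("handler");
        return (message, context) => {
            try {
                object response = handler.Handle((TRequest)message, context);
                if (response == null)
                    throw new InvalidOperationException("The handler returned null.");
                return response;
            } catch (Exception ex) {
                // Report the failure to the caller instead of losing it.
                return new RequestFault { Message = ex.Message };
            }
        };
    }

    // Caller side: turns a fault reply back into a local exception.
    public static TResponse Unwrap<TResponse>(object reply) {
        var fault = reply as RequestFault;
        if (fault != null) throw new RemoteRequestException(fault.Message);
        return (TResponse)reply;
    }
}

public class EchoRequest : IMessageRequest<EchoResponse> {
    public string Text { get; set; }
}

public class EchoResponse {
    public string Text { get; set; }
}

public class EchoRequestHandler : IMessageRequestHandler<EchoRequest, EchoResponse> {
    public EchoResponse Handle(EchoRequest request, IMessageContext context) {
        return new EchoResponse { Text = request.Text };
    }
}
```

The delegate returned by Adapt is what would be registered with the bus, while Unwrap would be called inside the Ask implementation before the response is returned to the caller.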

A final note regarding requests: for me a request should provide a response within a short amount of time (the timeout) and block the code until it either comes back or times out.

We should not be able to register a callback as it might introduce memory leaks if the response never comes back or keep resources too long in memory. If the response is expected to take a long time then the developer should change his strategy and simply consume a message queue, and thus make it asynchronous and reuse the messaging infrastructure.

Also, requests should not be persistent, and they should use the expiration feature so that they expire slightly before the request timeout, as requests are just a disguised RPC feature.
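
As a small illustration of the expiration rule, the time-to-live of a request could be derived from the caller's timeout like this. The RequestExpiration helper and its safety margin parameter are assumptions for this sketch. How the value is then applied to the outgoing message depends on the transport and is not shown.

```csharp
using System;

public static class RequestExpiration {
    // Returns a time-to-live slightly shorter than the caller's timeout,
    // so the broker drops the request before the caller stops waiting.
    public static TimeSpan TimeToLiveFor(TimeSpan timeout, TimeSpan margin) {
        if (timeout <= TimeSpan.Zero)
            throw new ArgumentOutOfRangeException("timeout");
        if (margin < TimeSpan.Zero)
            throw new ArgumentOutOfRangeException("margin");
        var ttl = timeout - margin;
        // Keep the result strictly positive: in JMS-style APIs a
        // time-to-live of zero means the message never expires.
        var minimum = TimeSpan.FromMilliseconds(1);
        return ttl < minimum ? minimum : ttl;
    }
}
```

For a 10 second timeout and a 500 millisecond margin this gives a time-to-live of 9.5 seconds.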

 

This is my 2 cents.

I'm just starting to use a message bus and ActiveMQ, and thus I could be completely wrong!

Morgan

 

 

Jul 25, 2009 at 5:23 AM

I forgot to say that when the endpoint implements sending a request, it should first check whether the transport supports it natively before falling back to the correlation service.

It could be done by checking for a specific interface like this:

 

interface IRequestSender {
     Object Send(Object request, TimeSpan timeout);
}
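
A minimal sketch of that detection, under the assumption that the endpoint holds a reference to its transport. RequestingEndpoint, the correlatedAsk delegate (standing in for the correlation service) and LoopbackTransport are hypothetical names used only for illustration.

```csharp
using System;

// Implemented by transports that support request/response natively.
public interface IRequestSender {
    object Send(object request, TimeSpan timeout);
}

// Hypothetical endpoint: prefers the transport's native request support
// and otherwise falls back to a correlation-based implementation.
public sealed class RequestingEndpoint {
    private readonly object transport;
    private readonly Func<object, TimeSpan, object> correlatedAsk;

    public RequestingEndpoint(object transport,
                              Func<object, TimeSpan, object> correlatedAsk) {
        if (transport == null) throw new ArgumentNullException("transport");
        if (correlatedAsk == null) throw new ArgumentNullException("correlatedAsk");
        this.transport = transport;
        this.correlatedAsk = correlatedAsk;
    }

    public object Ask(object request, TimeSpan timeout) {
        var native = transport as IRequestSender;
        return native != null
            ? native.Send(request, timeout)
            : correlatedAsk(request, timeout);
    }
}

// Toy transport used only to exercise the native path.
public sealed class LoopbackTransport : IRequestSender {
    public object Send(object request, TimeSpan timeout) {
        return request; // echoes the request back as its own response
    }
}
```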

 

 

Jul 25, 2009 at 10:44 PM

I forgot to include a destination parameter to the Send method in IRequestSender!

But now I'm wondering whether it would make sense to have multiple destinations for a request. Even with a single one, if it is a topic, then several subscribers could reply to the same request. This would be the case when asking several suppliers for a price in order to return the best quote within a specific time frame.

Then should we have a signature that allows an enumeration of responses? If yes, it means that we would have to wait until the timeout expires in order to collect as many responses as possible.

So, we would need 2 signatures: a simple one that returns a single response as fast as possible (RPC) and another one that would allow several responses.

 

interface IRequestSender {
     Object Ask(Object request, String destination, TimeSpan timeout);
     IEnumerable<Object> AskMany(Object request, String[] destinations, TimeSpan timeout);
}
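
The difference between the two signatures can be sketched on the receiving side. This assumes, purely for illustration, that some consumer pushes the replies for a given request into a BlockingCollection. ReplyCollector is a hypothetical helper: TakeFirst returns as soon as one reply is available, while TakeAll keeps collecting until the timeout expires.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;

public static class ReplyCollector {
    // Single response (RPC style): returns the first reply,
    // or null when the timeout expires first.
    public static object TakeFirst(BlockingCollection<object> replies, TimeSpan timeout) {
        object reply;
        return replies.TryTake(out reply, timeout) ? reply : null;
    }

    // Several responses: collects every reply that arrives before the
    // timeout expires (or until the collection is marked as complete).
    public static IList<object> TakeAll(BlockingCollection<object> replies, TimeSpan timeout) {
        var results = new List<object>();
        var clock = Stopwatch.StartNew();
        while (true) {
            var remaining = timeout - clock.Elapsed;
            if (remaining <= TimeSpan.Zero) break;
            object reply;
            if (!replies.TryTake(out reply, remaining)) break;
            results.Add(reply);
        }
        return results;
    }
}
```

Note that TakeAll always waits for the full timeout unless the collection is completed earlier, which matches the trade-off described above.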

Am I going crazy???