This project is read-only.

Only one ReceivePipeline is allowed per Thread

Nov 3, 2010 at 3:56 PM

Hi,

I am currently using SSB in a project i am building. Basically what i have is a set of services that are seperated, and a bunch of handlers simply relaying the messages to the services and answering the response back over the queue ( these services are also used in WCF and a couple of other access ways)

Basically what i run into is the following

 

You Have:

Messagehandler ( uses by "server Bus")

service ( has it's own "client bus to talk to other services)

ChildService ( runs on different machine thus no issue)

Now what happens:

MessageHandler gets message
gives this to the service (same thread logically)
service processes and requires an answer from a different service. when doing an ask/reply using IextendedBus in Activemq, the childservice does what is required, replies and then the Requesting Service process fires the error:

 

System.InvalidOperationException: Only oneReceivePipeline is allowed per thread. A MessagePipline has already been asigned to this thread.

at SimpleServiceBus.Bus.Pipeline.MessagePipeline.SetCurrentPipeline(IMessagePipeline pipeline, PipelineDirection direction)

at SimpleServiceBus.Bus.Pipeline.MessagePipeline.Initialize(MessageEnvelope messageEnvelope, IMessagingEndpoint endpoint, PipelineDirection direction, IMessagePipelineComponent[] pipelineComponents)

at SimpleServiceBus.Bus.Pipeline.DefaultMessagePipelineFactory.CreatePipeline(MessageEnvelope messageEnvelope, PipelineDirection direction, IMessagePipelineComponent[] components)

at SimpleServiceBus.Endpoints.ActiveMQ.Transport.ActiveMQMessagePipelineFactory.CreateReceivePipelineFor(MessageEnvelope messageEnvelope)

at SimpleServiceBus.Endpoints.ActiveMQ.Transport.ActiveMQMessageBus.Ask(Object request, TimeSpan timeout)

 

Looking at the origin of this error, i have noticed that MessagePipeline has some threadstatic properties ( like CurrentReceivePipeLine and CurrentSendPipeline) meaning that you cannot have a service that is accessed from a messagehandler, do a request/reply action using a different bus ( this different bus would run in the same thread hence, it uses the same ThreadStatic pipelines..

so:

A) can i remove the threadstatic part and make this just a per instance property ?

B) If not, is there any other way to get around this ?

Nov 3, 2010 at 6:20 PM

I wanted something similar in my project using SSB (with MSMQ) and I ended up creating another EndPoint where I send messages between services. It may be ok to remove the threadstatic, but it seems risky.

Nov 3, 2010 at 7:29 PM

Hmm.. could you be more specific about that ? I actually did the same and it still failed :s

 

 

Messagehandler bus is started from the windows Service it is running in

The actual service starts it's own instance of SSB inside the service ( private)

However, offcourse both the handler as the service are running in the same thread since they both get spawned at the same time ( messagehandler get's spawned and the service is injected into it)

 

Eitherway.. the error is thrown :s

Nov 3, 2010 at 7:43 PM

It was rather messy to show the code, I used our IoC to generates the extra endpoint and differentiated them with a unique name. This is a snippet of the part that creates the new endpoint:

ioc.Register(Component
    .For<IMessageBus>()
    .ImplementedBy<MessageBus>()
    .LifeStyle.Is(Castle.Core.LifestyleType.Singleton)
    .Named("ssbMessageBus_" + queueName))
    .Register(Component
    .For<ITransport>()
    .ImplementedBy<MsmqTransport>()
    .LifeStyle.Is(Castle.Core.LifestyleType.Singleton)
    .Named("ssbTransport_" + queueName));

// Create the endpoint
var endpointBuilder = new EndpointBuilder<ServiceEndpoint, MsmqTransport>();

// Minimum configuration for endpoint operation
ServiceEndpoint endpoint = (ServiceEndpoint)endpointBuilder.CreateEndpoint(
    new ObjectBuilder(ioc, queueName));

endpoint.Initialize(endpointBuilder, queueName, workerThreads, messageLoggingLevel);

extraEndpoints.Add(endpoint);

return endpoint;

The trick to get it to work was to have a complete set up of endpoint and everything in it and use that when sending messages. I then use IoC to tell it to use this "outbound endpoint" when I need to send messages from my service:

ioc.Register(Component
    .For<IMessagingEndpoint>()
    .Instance(endpoint));

Nov 3, 2010 at 10:49 PM

I didn't realize this limitation existed, which doesn't really sound good to me! I want to reuse the same ActiveMQ connection all the time, in order to avoid spawning too many threads for nothing.

I think it deserves an investigation, and maybe it's not a big deal to fix, especially since there are unit tests...

Nov 3, 2010 at 11:22 PM

The issue is not really reusing connections, but for example in MSMQ you can only read the first message in a queue, it has no tags, priorities or anything that lets you peek ahead. Let's say you have one queue for both receiving messages that your service is handling (let's call it the logic module). And then you have another service that handles your database access, let's call it database module, with its own queue. Your client delivers a message to the logic module which needs stuff from the database to do its thing:

  1. App sends a request to Logic by delivering a message into LOGIC_QUEUE, waiting on response
  2. Logic handles the messages, but sends a request to Database by delivering a message into DB_QUEUE, waiting on the response since it needs stuff from DB
  3. Database accesses the database, sending a reply back to Logic, which is done by Database delivering the response into LOGIC_QUEUE.
  4. Logic sends a reply back to App by delivering it into APP_QUEUE.

 

Let's say that Logic takes a long time to process each request and there are multiple requests to the database module, and Logic also can only run 2 threads at the same time (or whatever the number). Then add the fact that there are multiple Apps sending requests into LOGIC_QUEUE to be processed. Since all messages are equal and MSMQ can only return the first in the list, you can get a deadlock situation. Pretend this is what's in the LOGIC_QUEUE:

  1. Top: Request A to Logic, removed from queue once Logic starts working on it
  2. Request B to Logic
  3. Request C to Logic
  4. Request D to Logic
  5. Response from Database that is needed for Request A

Since Logic only has 2 worker threads and MSMQ won't let you access message #5 before #2-4 are taken out. Since you're out of worker threads that won't happen. So you'll get timeout in the handler for Request A.

This is not really SSB's fault, it's a limitation of MSMQ in my opinion. I think ActiveMQ has ways around this (priority and categories?). But when I started to use SSB we decided that MSMQ was fine, so we needed a way to work around this issue. What I did was create another queue for each module that needs to send messages, so in the example above I also have LOGIC_QUEUE_OUTBOUND which Database is using to send replies to. That way in Logic I have one queue getting requests from my client apps, and a separate queue that Logic use to get messages that it's waiting on. Works perfectly. But it was tricky to implement this advanced scenario with multiple queues (ended up being Endpoints) in SSB.

Hope this explanation helps,
/Hakan

 

 

Nov 4, 2010 at 2:49 AM

Ok. I understand the limitations of MSMQ and that's why I used ActiveMQ temporary queues for returning the response in a request/response scenario, to avoid such timeouts.

So, in the case of SSB with an ActiveMQ transport, the pipeline should not block other calls from the same thread, especially since it would accept such calls from other threads anyway.

I don't know the work to make this optional and secure, but anyway in the short time we could easily fix this problem by modifying the Ask method in ActiveMQMessageBus in order to detect if there is already a receive pipeline active and then wrap itself in an async threaded call, using either a QueueUserWorkItem or the new Task library of the .Net 4 framework.

Here's a fix that uses QueueUserWorkItem in order to compile with .Net 2.0:

    public class ActiveMQMessageBus: MessageBus, IExtendedMessageBus {

        private class AsyncDispatch {
            public MessageEnvelope Envelope { get; set; }
            public Object Result { get; set; }
            public AutoResetEvent Event { get; set; }
        }

        public object Ask(object request, TimeSpan timeout) {
            CheckRunning("Ask request");
            var _transport = (ActiveMQTransport)this.Transport;
            var _responseEnvelope = InternalSend(request, (envelope, destination) => _transport.Ask(envelope, destination, timeout));
            if(_responseEnvelope != null) {
                try {
                    if(MessagePipeline.CurrentReceivePipeline != null) {
                        var _dispatch = new AsyncDispatch();
                        _dispatch.Envelope = _responseEnvelope;
                        _dispatch.Event = new AutoResetEvent(false);
                        ThreadPool.QueueUserWorkItem(state => {
                            var _params = (AsyncDispatch)state;
                            _params.Result = DispatchRequestResponse(_params.Envelope);
                            _params.Event.Set();
                        }, _dispatch);
                        _dispatch.Event.WaitOne();
                        return _dispatch.Result;
                    }
                    else {
                        return DispatchRequestResponse(_responseEnvelope);
                    }
                }
                catch(Exception ex) {
                    fLog.Error("Unhandled error processing message", ex);
                    throw;
                }
            }
            return null;
        }

        private Object DispatchRequestResponse(MessageEnvelope responseEnvelope) {
            var _factory = new ActiveMQMessagePipelineFactory();
            _factory.Endpoint = this.Endpoint;
            _factory.IgnoreMessageDispatch = true;
            var _pipeline = _factory.CreateReceivePipelineFor(responseEnvelope);

            OnBeginReceiveMessage(responseEnvelope, _pipeline);

            _pipeline.PipelineEvents.PipelineError += Events_PipelineError;
            _pipeline.ProcessMessage();

            OnMessageReceived(responseEnvelope);

            return responseEnvelope.Body;
        }
}

Hope this helps,
Morgan

 

Nov 4, 2010 at 4:21 AM

I forgot to catch any exception while dispatching to a sub-thread, which would block the calling thread forever! I also renamed a few things.

So, here's the take 2:

    public class ActiveMQMessageBus: MessageBus, IExtendedMessageBus {

        private class AsyncResponse {
            public MessageEnvelope Envelope { get; set; }
            public Object Result { get; set; }
            public AutoResetEvent Event { get; set; }
            public Exception Error { get; set; }
        }

        public object Ask(object request, TimeSpan timeout) {
            CheckRunning("Ask request");
            var _transport = (ActiveMQTransport)this.Transport;
            var _responseEnvelope = InternalSend(request, (envelope, destination) => _transport.Ask(envelope, destination, timeout));
            if(_responseEnvelope != null) {
                try {
                    if(MessagePipeline.CurrentReceivePipeline != null) {
                        var _asyncResponse = new AsyncResponse();
                        _asyncResponse.Envelope = _responseEnvelope;
                        _asyncResponse.Event = new AutoResetEvent(false);
                        ThreadPool.QueueUserWorkItem(state => {
                            var _args = (AsyncResponse)state;
                            try {
                                _args.Result = ProcessRequestResponse(_args.Envelope);
                            }
                            catch(Exception _error) {
                                _args.Error = _error;
                            }
                            _args.Event.Set();
                        }, _asyncResponse);
                        _asyncResponse.Event.WaitOne();
                        if(_asyncResponse.Error != null) 
                            throw _asyncResponse.Error;
                        else
                            return _asyncResponse.Result;
                    }
                    else {
                        return ProcessRequestResponse(_responseEnvelope);
                    }
                }
                catch(Exception ex) {
                    fLog.Error("Unhandled error processing message", ex);
                    throw;
                }
            }
            return null;
        }

        private Object ProcessRequestResponse(MessageEnvelope responseEnvelope) {
            var _factory = new ActiveMQMessagePipelineFactory();
            _factory.Endpoint = this.Endpoint;
            _factory.IgnoreMessageDispatch = true;
            var _pipeline = _factory.CreateReceivePipelineFor(responseEnvelope);

            OnBeginReceiveMessage(responseEnvelope, _pipeline);

            _pipeline.PipelineEvents.PipelineError += Events_PipelineError;
            _pipeline.ProcessMessage();

            OnMessageReceived(responseEnvelope);

            return responseEnvelope.Body;
        }

    }


Nov 4, 2010 at 10:52 AM

Ok, it was night time here so i didn' t follow up. Actually livetocode's answer is more related to my situation. I do understand the limitations of msmq ( hence using ActiveMQ ;-) )

 

But indeed, i am not quite sure why you would have the messagepipeline be threadstatic since this blocks your whole thread (independant of the amount of Busses you fire).

Actually in my case (i work for a company handeling a lot of audio /video) the idea is like this:

 

A) Client says to "publish" a media ( meaning encrypt it and get it ready for transport to a customer) to the "publish service"
B) Publish service receives this request, get's the item from database and checks some stuff. At this point, it finds out the media needs to be transcoded to a different audio/video format, it sends a request to the "transcode service"
C) Transcode service receives this, sees it is a WAV file that needs to be transcoded to an MP3, it does this and after this is done, sends a "tagging request" to the tagging service
D) Tagging service receives and processes, then answers back
E) Transcode service answers to publish that transcode is done
F) Publish goes further, sends "store request" to storage service
G) ... etc at infinitum ;-)

 

Eitherway, the error fires directly when a service processing a message needs to do a request/reply to another service ( for example, when transcode get's the "tagging respone", it fires an error since there already is a pipeline in order

 

I'm going to do a test with livetocode's hack but i think it would be more interessting to look at the core problem ( being, why is a messagepipeline related to a thread instead of an instance ?)

Nov 4, 2010 at 11:06 AM

Ok.. first checks and seems to be working. I'll implement like this and see if this causes any issues. Thanks for the quick reaction !

Nov 4, 2010 at 1:22 PM

Great, but this morning I came up with a much simpler/better solution, without involving subthreads! I simply cleared the current pipeline before invoking a new one, and restored it after.

    public class ActiveMQMessageBus: MessageBus, IExtendedMessageBus {

        public object Ask(object request, TimeSpan timeout) {
            CheckRunning("Ask request");
            var _transport = (ActiveMQTransport)this.Transport;
            var _responseEnvelope = InternalSend(request, (envelope, destination) => _transport.Ask(envelope, destination, timeout));
            if(_responseEnvelope != null) {
                try {
                    var _currentReceivePipeline = MessagePipeline.CurrentReceivePipeline;
                    if(_currentReceivePipeline != null)
                        MessagePipeline.ClearCurrentPipeline(PipelineDirection.Receive);
                    try {
                        return ProcessRequestResponse(_responseEnvelope);
                    }
                    finally {
                        if(_currentReceivePipeline != null)
                            MessagePipeline.SetCurrentPipeline(_currentReceivePipeline, PipelineDirection.Receive);
                    }
                }
                catch(Exception ex) {
                    fLog.Error("Unhandled error processing message", ex);
                    throw;
                }
            }
            return null;
        }

        private Object ProcessRequestResponse(MessageEnvelope responseEnvelope) {
            var _factory = new ActiveMQMessagePipelineFactory();
            _factory.Endpoint = this.Endpoint;
            _factory.IgnoreMessageDispatch = true;
            var _pipeline = _factory.CreateReceivePipelineFor(responseEnvelope);

            OnBeginReceiveMessage(responseEnvelope, _pipeline);

            _pipeline.PipelineEvents.PipelineError += Events_PipelineError;
            _pipeline.ProcessMessage();

            OnMessageReceived(responseEnvelope);

            return responseEnvelope.Body;
        }
    }
}

Nov 4, 2010 at 1:27 PM

Btw - I didn't question your request usage, but I must warn you that this might be tricky as your main handler has to block until it receives a response, preventing processing of other messages, and also it must specify a timeout for its request, which is not easy to guess, especially if your operations can take a long time, like converting an audio file. Even if it usually works without any load, then as soon as you'll get load in your systems, your queues might become overloaded and every requests will simply timeout!

So, if you can break down your workflow in simple messages which forward states to continue the operations, then it will scale as much as you want, but it's more difficult to code...

Nov 4, 2010 at 3:31 PM

I'll have to take a look at this but for now the solution will do.

 

About the request response. For us, even though these actions involve multiple services and messaging between them,  this is one transaction and i want the blocking to occur on the main handler since this makes it easier for us to rollback ( when transactions fail, a lot of corresponding actions on both the database, as on files stored locally and on public services like Amazon S3 have to be undone.

 

Considering the RequestTimout: true.. however: we give sufficient timeout for a load 4 times our normal. We already have a similar system inproduction using ActiveMQ. We just started seeing we were actually building (a very simple) messagebus, and started wondering if there was a library we could use rather then having to code everything from scratch. Eithery way: we do have experience with the trancoding and publishing process but are just taking a slighty different approach for more robustness. The succesfull publish of a file takes priority over the time it will take to do so.

We are not in a live or even near to live scenario (batches are run every so many hours) so increased waiting times on some of our services are less important and could easily be resolved by adding some extra publisher servers. I am however VERY interessted in what is called Saga's in MT and NServicebus. This could allow us to do distributed transactions without needing to keep track of all state in our own system, we would like this to be seperate and maybe only put a "concluded" state back in our own system (like "processing" , "error", "active", user does not require full knowledge of the backend process and in what step of the transaction it is)

Nov 5, 2010 at 1:36 AM

Ok, I fully understand your point regarding your main message which must run within a transaction, and in fact as ActiveMQ will retry the message several times if it fails, it should handle the rare cases where your requests would expire.

However, in our systems we rely heavily on ActiveMQ DLQ (Dead letter queue) to make sure we don't miss a message. So, if a publish action would fail, it would move it to the DLQ. Then we have Hyperic that will send alerts as soon as a message enters any DLQ, forcing a support guy to investigate the problem and reprocess the message.

Sagas look interesting too, but did you have a look at Microsoft Windows Server App Fabric? You could benefit from the persisting workflows and just bridge them with SSB... 

Also, did you have a look at Camel for ActiveMQ? It looks very powerful regarding enterprise messaging patterns...