steffi2
Posted: Fri Sep 04, 2009 4:03 pm    Post subject: When to acknowledge when using multiple threads?
On a project I'm working on, I read from a queue and dispatch to a thread pool of worker threads. The general idea is that I never want to acknowledge a message until I know for sure that it has been processed; otherwise it risks being lost.
There are two approaches:
1. Acknowledge the message immediately after its details have been written to a working table. In the event that the server restarts, you pick up messages from this table before processing any from the queue. As each message is processed you update its state or delete its row from the table to reflect that it is no longer in progress.
2. Queue up acknowledgements in the main message listener thread: as each worker thread completes its job, it enqueues the message to be acknowledged by the main delivery thread, i.e. the same thread that is picking the messages up. So after pulling a message from the queue and dispatching it, you then work through, at the end of the cycle, the messages that the worker threads have queued up for acknowledgement.
I believe option 2 means less work, since no round trips to the database are needed to maintain a working table.
WebSphere MQ's JMS implementation requires that you acknowledge on the delivery thread, so unless you do it immediately, or after processing each message, you have to maintain a working area if you want to guarantee that any message pulled off the queue is actually processed and not lost along the way.
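A rough sketch of what option 2 might look like, assuming a session in CLIENT_ACKNOWLEDGE mode (the class name, pool size and the process() placeholder are mine, just for illustration). Note the caveat that comes up later in this thread: acknowledge() applies to every message consumed on the session so far, not only the one you call it on.
Code:
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;

public class AckQueueListener implements MessageListener {

    // Worker pool that does the real processing off the delivery thread.
    private final ExecutorService workers = Executors.newFixedThreadPool(4);

    // Messages whose processing has finished and that are waiting to be
    // acknowledged by the delivery thread (the only thread using the Session).
    private final BlockingQueue<Message> pendingAcks = new LinkedBlockingQueue<Message>();

    public void onMessage(final Message msg) {
        workers.submit(new Runnable() {      // asynchronous dispatch; onMessage does not block
            public void run() {
                process(msg);                // business logic runs in the worker thread
                pendingAcks.add(msg);        // hand the message back for acknowledgement
            }
        });

        // Drain whatever earlier workers have finished and acknowledge those
        // messages here, on the delivery thread that received them.
        Message done;
        while ((done = pendingAcks.poll()) != null) {
            try {
                done.acknowledge();          // with CLIENT_ACKNOWLEDGE this also acknowledges
            } catch (JMSException e) {       // any earlier unacknowledged message on the session
                // error handling / logging omitted in this sketch
            }
        }
    }

    private void process(Message msg) {
        // placeholder for the real work
    }
}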
fjb_saper
Posted: Sat Sep 05, 2009 7:38 am
Steffi2, if it were as you describe, it would indeed be a very poor implementation of the JMS standard.
- You should use the JMS standard, i.e. MDBs. They automatically handle a number of things for you, such as scalability and transactionality...
- You should make use of the transactionality of the session (requires a new transaction, or participates in the global transaction).
- Use Session.AUTO_ACKNOWLEDGE. You won't have to bother with acknowledgement; all you have to think about is your session and the transactional borders.
And remember never to rely on the default behavior of MQ, as it is not the same across OS platforms. Always explicitly set your commits and rollbacks, even if it is only by marking the transactionality on your EJB/MDB method descriptor.
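For reference, a minimal EJB 3 MDB sketch with container-managed transactions (the class name and the jms/OrderQueue destination are illustrative, and this of course assumes an application server):
Code:
import javax.ejb.ActivationConfigProperty;
import javax.ejb.MessageDriven;
import javax.ejb.TransactionAttribute;
import javax.ejb.TransactionAttributeType;
import javax.jms.Message;
import javax.jms.MessageListener;

@MessageDriven(activationConfig = {
    @ActivationConfigProperty(propertyName = "destinationType",
                              propertyValue = "javax.jms.Queue"),
    @ActivationConfigProperty(propertyName = "destination",
                              propertyValue = "jms/OrderQueue")
})
@TransactionAttribute(TransactionAttributeType.REQUIRED)   // state the transactionality explicitly
public class OrderMDB implements MessageListener {

    public void onMessage(Message msg) {
        // The container delivers the message inside a transaction. If this method
        // returns normally the receive is committed; if it throws, the transaction
        // is rolled back and the message is redelivered.
        // ... business logic here ...
    }
}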
Have fun
steffi2
Posted: Wed Sep 09, 2009 7:15 pm
As we are not using an app server, we cannot use MDBs.
So that leaves me with a message listener solution only.
Right now I suspect I'm going to need to use multiple sessions across processes and split the incoming traffic across multiple queues.
If I use AUTO_ACKNOWLEDGE the message is gone at the end of the transaction, but I'm guessing that transaction cannot span multiple threads. Hence the approach I'm taking, where I queue up acknowledgements to be performed by the delivery thread. I'm guessing the last place I can commit is at the end of my onMessage, not anywhere in a child thread. Transactions only help if I want to operate on multiple records atomically, i.e. acknowledge them as one, process them as one set, and I don't need that behaviour.
fjb_saper wrote:
Steffi2, if it were as you describe, it would indeed be a very poor implementation of the JMS standard.
- You should use the JMS standard, i.e. MDBs. They automatically handle a number of things for you, such as scalability and transactionality...
- You should make use of the transactionality of the session (requires a new transaction, or participates in the global transaction).
- Use Session.AUTO_ACKNOWLEDGE. You won't have to bother with acknowledgement; all you have to think about is your session and the transactional borders.
And remember never to rely on the default behavior of MQ, as it is not the same across OS platforms. Always explicitly set your commits and rollbacks, even if it is only by marking the transactionality on your EJB/MDB method descriptor.
Have fun
fjb_saper
Posted: Wed Sep 09, 2009 7:30 pm
OK, so you're running a stand-alone JMS app.
Remember that the transactional borders are manifested by your MQ session.
Code:
connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
How you use the session is up to you.
You can create multiple sessions.
But remember that you are single-threading everything through the connection, so you may need to create multiple connections/sessions.
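A sketch of that wiring, one connection/transacted session/listener per consumer (the ListenerFactory callback and the names are mine, just to show each listener getting a reference to its own session):
Code:
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;

public final class MultiSessionStarter {

    /** Hypothetical callback so each listener can keep a reference to its own session. */
    public interface ListenerFactory {
        MessageListener create(Session session);
    }

    // Each consumer gets its own Connection and transacted Session, so the provider
    // drives n independent delivery threads instead of serialising on one Connection.
    public static void start(ConnectionFactory factory, String queueName,
                             ListenerFactory listeners, int n) throws JMSException {
        for (int i = 0; i < n; i++) {
            Connection connection = factory.createConnection();
            Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
            MessageConsumer consumer = session.createConsumer(session.createQueue(queueName));
            consumer.setMessageListener(listeners.create(session)); // listener commits/rolls back
            connection.start();
        }
    }
}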
Enjoy
steffi2
Posted: Thu Sep 10, 2009 6:45 am
Yes, I am doing that already, i.e. multiple async delivery threads through multiple sessions/listeners.
Q. Is there any extension in MQ that allows me, with JMS, to acknowledge _only_ that message and _NOT_ all previously unacknowledged messages, which is what the JMS spec defines?
atheek
Posted: Thu Sep 10, 2009 11:52 am
Operate the session in transacted mode as FJ has suggested and call session.commit() as the last step of the onMessage() method. That way you acknowledge only the message processed within the current onMessage context.
One query: I couldn't understand what benefit you get here from separating the work into delivery threads and worker threads. The delivery thread has to wait within the onMessage() method until the worker thread completes, so that you can acknowledge the message to the JMS provider. So why can't the main work be done in the delivery thread itself, with the delivery thread pool scaled up (by running multiple sessions/consumers) for scalability? What is actually driving you to spread the work across multiple threads?
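A minimal sketch of the commit-at-the-end-of-onMessage approach described above (class name is illustrative; it assumes the listener is handed the transacted Session its consumer was created from):
Code:
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Session;

public class CommittingListener implements MessageListener {

    // The same transacted session the consumer was created from.
    private final Session session;

    public CommittingListener(Session session) {
        this.session = session;
    }

    public void onMessage(Message msg) {
        try {
            // ... do all the work here, on the delivery thread ...
            session.commit();       // commits the receive: only this unit of work is acknowledged
        } catch (Exception e) {
            try {
                session.rollback(); // the message stays on the queue and will be redelivered
            } catch (JMSException ignored) {
            }
        }
    }
}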
steffi2
Posted: Thu Sep 10, 2009 4:10 pm
The general idea is that the granularity of concurrency is finer when I dispatch to my thread pool. This could be changed by enriching the message properties further, so I could use selectors to consume with the same level of granularity, but there's one thing I'm confused about in what you said:
"the delivery thread has to wait within onMessage till the worker thread completes?"
This isn't the case, because the submit to the thread pool is an async operation. It's only acknowledging the messages I have collected (from those worker threads that have completed) that keeps onMessage busy at the end of its method, because it iterates over those messages and acknowledges them using the same Session that received them. I don't have to wait for the worker thread I just dispatched to on this go-round to finish before onMessage completes; another message can come off the queue, and in the meantime that previous worker thread finishes and adds its older message to the list of messages that need to be acknowledged (a LinkedBlockingQueue).
The idea was that I would only acknowledge a message once I knew it had been processed, but the fact that CLIENT_ACKNOWLEDGE acknowledges all prior unacknowledged messages means that is no longer the case, and it leaves open the possibility that messages are acknowledged before having been processed, which would mean lost messages after a restart.
So it sounds like I'm essentially forced to do all the work in the delivery thread and abandon any use of a thread pool, because I don't have a way to acknowledge this and only this message. Each worker thread essentially has its own LinkedBlockingQueue that it reads from, and the delivery thread submits to the LinkedBlockingQueue of a given worker thread, which guarantees order of processing. It's a little more complicated than this, but I figure that if I can take messages off the queue, queue them in a worker thread's queue and then work on them, that will give faster throughput than simply doing all the work in onMessage with a distinct session->connection->receiver->listener per thread, resulting in multiple async delivery threads.
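For what it's worth, the per-order-id dispatch described here might look roughly like this (the orderId property name and the bucket count are illustrative; a single-threaded executor plays the role of the per-worker LinkedBlockingQueue):
Code:
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;

public class OrderIdDispatcher implements MessageListener {

    // One single-threaded executor (backed internally by an unbounded queue) per bucket:
    // messages with the same order id always land on the same worker, preserving their order.
    private final ExecutorService[] workers;

    public OrderIdDispatcher(int buckets) {
        workers = new ExecutorService[buckets];
        for (int i = 0; i < buckets; i++) {
            workers[i] = Executors.newSingleThreadExecutor();
        }
    }

    public void onMessage(final Message msg) {
        try {
            String orderId = msg.getStringProperty("orderId");  // assumes the property is always set
            int bucket = (orderId.hashCode() & 0x7fffffff) % workers.length;
            workers[bucket].submit(new Runnable() {
                public void run() {
                    // process msg here; note the acknowledgement problem discussed above -
                    // this worker thread cannot safely acknowledge msg on the JMS session
                }
            });
        } catch (JMSException e) {
            // error handling omitted
        }
    }
}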
fjb_saper
Posted: Thu Sep 10, 2009 4:25 pm
steffi2 wrote:
The general idea is that the granularity of concurrency is finer when I dispatch to my thread pool. This could be changed by enriching the message properties further, so I could use selectors to consume with the same level of granularity, but there's one thing I'm confused about in what you said:
"the delivery thread has to wait within onMessage till the worker thread completes?"
This isn't the case, because the submit to the thread pool is an async operation. It's only acknowledging the messages I have collected (from those worker threads that have completed) that keeps onMessage busy at the end of its method, because it iterates over those messages and acknowledges them using the same Session that received them. I don't have to wait for the worker thread I just dispatched to on this go-round to finish before onMessage completes; another message can come off the queue, and in the meantime that previous worker thread finishes and adds its older message to the list of messages that need to be acknowledged (a LinkedBlockingQueue).
The idea was that I would only acknowledge a message once I knew it had been processed, but the fact that CLIENT_ACKNOWLEDGE acknowledges all prior unacknowledged messages means that is no longer the case, and it leaves open the possibility that messages are acknowledged before having been processed, which would mean lost messages after a restart.
So it sounds like I'm essentially forced to do all the work in the delivery thread and abandon any use of a thread pool, because I don't have a way to acknowledge this and only this message. Each worker thread essentially has its own LinkedBlockingQueue that it reads from, and the delivery thread submits to the LinkedBlockingQueue of a given worker thread, which guarantees order of processing. It's a little more complicated than this, but I figure that if I can take messages off the queue, queue them in a worker thread's queue and then work on them, that will give faster throughput than simply doing all the work in onMessage with a distinct session->connection->receiver->listener per thread, resulting in multiple async delivery threads.
OK here we are (finally) getting to the meat of the matter.
You are working an anti-pattern. If you are trying to distribute work to x worker threads according to some criterion, and there is message affinity, you should create x queues and have a dispatcher thread: it receives the message, inspects the criterion and puts it on the appropriate processor queue. Work done, session acknowledged.
Now you can have an MDB on each topical/processor queue that picks up the messages there, processes them and acknowledges them against its session.
Overall it can be solved with a dispatcher pattern, but it is a bad design altogether because of the message affinity. Find out what mandates the message affinity and get rid of it; that way you will have much better scalability. The dispatcher pattern is still valid.
Have fun
steffi2
Posted: Thu Sep 10, 2009 7:00 pm
Can you describe what you mean by "message affinity" in this context?
So the dispatcher is a bad idea, and the only recommended pattern is one where you have n async delivery threads, each with its own session, where a selector qualifies what each receiver/listener should grab off the queue (this also guarantees order of processing, since I have to process messages that meet the same criteria in chronological order), and in addition I must complete all work in onMessage, i.e. in the async delivery thread, and acknowledge at the end of onMessage. You're saying that is more performant than abandoning the idea of queueing up acknowledgements and instead maintaining a working table: as each message is pulled off the queue its content is inserted into the table (which is read on restart to ensure all messages that were acknowledged get processed), and as each worker thread completes its work it deletes the row or marks it deleted. In that scenario, as soon as I have inserted my row into the working table I immediately acknowledge the message, and the last thing I do in onMessage is dispatch to the worker thread pool.
Given those two scenarios, is it the n async delivery threads you prefer, where each one is a message listener attached to a receiver on a separate session that uses a selector to qualify the subset appropriate for that receiver? Even despite the fact that the selector is evaluated on the client and not the server, which I assume means a delivery thread has to inspect each message before it knows whether it has to process it?
fjb_saper
Posted: Thu Sep 10, 2009 7:30 pm
steffi2 wrote:
Can you describe what you mean by "message affinity" in this context?
So the dispatcher is a bad idea, and the only recommended pattern is one where you have n async delivery threads, each with its own session, where a selector qualifies what each receiver/listener should grab off the queue (this also guarantees order of processing, since I have to process messages that meet the same criteria in chronological order), and in addition I must complete all work in onMessage, i.e. in the async delivery thread, and acknowledge at the end of onMessage. You're saying that is more performant than abandoning the idea of queueing up acknowledgements and instead maintaining a working table: as each message is pulled off the queue its content is inserted into the table (which is read on restart to ensure all messages that were acknowledged get processed), and as each worker thread completes its work it deletes the row or marks it deleted. In that scenario, as soon as I have inserted my row into the working table I immediately acknowledge the message, and the last thing I do in onMessage is dispatch to the worker thread pool.
Given those two scenarios, is it the n async delivery threads you prefer, where each one is a message listener attached to a receiver on a separate session that uses a selector to qualify the subset appropriate for that receiver? Even despite the fact that the selector is evaluated on the client and not the server, which I assume means a delivery thread has to inspect each message before it knows whether it has to process it?
Looks to me like you have target locking and are not getting what I'm trying to tell you.
- Say you receive messages with a characteristic that takes a value.
- Say you want to discriminate between blue, white and red.
- Say each color needs different processing.
So:
- Create an MDB with no selector to read the input queue.
- Create 3 queues: one blue, one white, one red.
- Have your MDB dispatch each message it reads to the blue, white or red queue according to its characteristic.
- Have the blue processing MDB pick up the messages from the blue queue.
- Have the white processing MDB pick up the messages from the white queue.
- Have the red processing MDB pick up the messages from the red queue.
All nice and asynchronous, and scalable.
No selector is involved and no slowdown, just a dispatcher pattern.
As said previously, this does not solve your message affinity problem (i.e. messages have to be processed in order on red), which will prevent you from scaling the red processing.
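Since MDBs are not available in your setup, a plain-JMS version of the dispatcher step might look something like this (the queue names and the Color property are illustrative; it assumes a transacted session, so the get from the input queue and the put to the color queue commit together and no message is lost or duplicated):
Code:
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;

public class ColorDispatcher implements MessageListener {

    private final Session session;           // transacted session the message was delivered on
    private final MessageProducer producer;  // unidentified producer: destination passed per send

    public ColorDispatcher(Session session) throws JMSException {
        this.session = session;
        this.producer = session.createProducer(null);
    }

    public void onMessage(Message msg) {
        try {
            String color = msg.getStringProperty("Color");          // e.g. BLUE, WHITE, RED
            Queue target = session.createQueue("PROCESS." + color); // queue names illustrative
            producer.send(target, msg);
            session.commit();   // get + put are one unit of work
        } catch (Exception e) {
            try {
                session.rollback();   // message goes back to the input queue for redelivery
            } catch (JMSException ignored) {
            }
        }
    }
}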
Enjoy
steffi2
Posted: Thu Sep 10, 2009 8:02 pm
In our example we have order ids that have to be processed in sequence, so records with the same order id need to be processed by the same thread. That's how the current dispatch works: it dispatches by order id, and then I get concurrency by order id, but the range of order ids is unlimited, so I cannot have N MDBs waiting to process them. Also, our current technology set doesn't allow for MDBs, only J2SE JMS.
Your approach sounds interesting but I think I've still got a problem.
For instance, let's say I've got a main input queue, I hash the order id into n buckets, and I create n queues with listeners for the messages directed to them.
In that approach, which seems elegant, I still have the same problem: in my main delivery thread I'm removing a message from the queue, applying the hashing algorithm and then writing the message to the nth bucket's queue. Then a listener attached to that bucket queue processes the message. If the client goes down, it needs to remember what messages were in the bucket queues, so these queues need to be static and persistent.
Now this model is essentially what I already have with my dispatcher, and the problem is simply that I don't know what's left in the bucket queues when my server goes down; the message has already been acknowledged, so it's no longer on the main queue.
So for this to work I need a queue that can survive a restart, and having that be an MQ queue is a lot better than an in-memory queue.
I think it might be possible to test how this works, but the downside is that I'm still limited to N static queues, as a temporary queue will not survive a restart, i.e. it won't remember where things left off prior to the restart.
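One way the hash-to-bucket step might look (the queue naming convention is mine; as above, the get from the main queue and the put to the bucket queue would be committed in one transacted session, so nothing is lost if the client dies between the two):
Code:
public final class BucketNames {

    // Map an unbounded order-id space onto a fixed, pre-defined set of bucket queues.
    // The queues must be created up front and be persistent (not temporary), so that
    // anything routed but not yet processed survives a restart.
    public static String queueFor(String orderId, int buckets) {
        int bucket = (orderId.hashCode() & 0x7fffffff) % buckets;
        return "ORDER.BUCKET." + bucket;      // naming convention is illustrative
    }
}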
atheek
Posted: Sat Sep 12, 2009 12:55 am
Weblogic JMS has a Unit-of-Order feature (which does exactly what you are trying to achieve) that allows ordered processing within a group of messages. How the BEA guys implemented it is not known; under the hood they might be using temporary queues and delayed acknowledgement on the original queue, which is very similar to your original idea. I believe it requires some support in the core messaging kernel of the JMS provider. WebSphere MQ JMS doesn't offer any such support currently, and building a wrapper around a provider that doesn't support it in order to mock it up doesn't really work, I think. So you may be restricted to the dispatcher pattern with a predefined set of queues.
This is my guess at how the BEA guys might have done it; it may be naive.
Have a property-setter component which sets a custom property on the message indicating the bucket it belongs to, and route the message to another queue.
For example, assume red, blue and green are the different buckets and there is a JMS property called Color which carries this information. Have 3 session objects in 3 threads, operated in CLIENT_ACKNOWLEDGE mode. Create three MessageConsumers from these 3 sessions and have them synchronously poll the queue at the same time. Use message selectors on the consumer objects, one for each color. Whenever a message is matched, it is processed and then acknowledged.
[Note: Unit of Order is a custom JMS header, and I think Weblogic JMS supports indexing of all headers and properties, which improves the performance of message selectors. It is up to the JMS client to set this header, very much like the property setter in our case. However, a client application can use a single consumer and have messages belonging to multiple groups in sequence.]
No need for temporary queues or separate worker threads, and the objective of processing N messages concurrently (where N is the number of buckets) is achieved.
How about performance? That depends very much on the JMS provider's support for message selection. MQ v6.0, as I understand it, doesn't perform well with message selection on custom properties.
It also depends heavily on how interleaved the messages are on the queue:
say my queue is BGRBGRBGR - I get great performance, even without the JMS provider indexing custom properties;
say my queue is BBBBBBBBGGGRRRRRRRR - the performance suffers heavily.
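A sketch of the selector-per-color idea, with one of these running on its own thread per color (the Color property, queue name and receive timeout are illustrative, and, as noted earlier in the thread, MQ v6 evaluates the selector on the client side):
Code:
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;

public class ColorSelectorConsumer implements Runnable {

    private final ConnectionFactory factory;
    private final String queueName;
    private final String color;    // e.g. "RED"

    public ColorSelectorConsumer(ConnectionFactory factory, String queueName, String color) {
        this.factory = factory;
        this.queueName = queueName;
        this.color = color;
    }

    public void run() {
        try {
            Connection connection = factory.createConnection();
            Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
            MessageConsumer consumer = session.createConsumer(
                    session.createQueue(queueName), "Color = '" + color + "'");
            connection.start();
            while (!Thread.currentThread().isInterrupted()) {
                Message msg = consumer.receive(1000);   // poll; only this color's messages arrive
                if (msg != null) {
                    // process in arrival order for this color ...
                    msg.acknowledge();   // this session has only consumed messages of this color
                }
            }
            connection.close();
        } catch (JMSException e) {
            // error handling omitted
        }
    }
}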
fjb_saper
Posted: Sat Sep 12, 2009 11:13 am
Quote:
say my queue is BGRBGRBGR - I get great performance, even without the JMS provider indexing custom properties
Not necessarily. Your assumption (which you did not state) is that it takes the same amount of time to process each of the colors (G, B, R).
Now if the messages are not sparse enough, and it takes significantly longer to process R than G or B, but the volume is low enough that the R processor does not need to scale, then you quickly get performance degradation for B and G, as the beginning of the queue will look like RRRRRBGRBGRBGR until the backlog of R is worked off.
Indexing and server-side selection will be a great help there.
fjb_saper
Posted: Sat Sep 12, 2009 11:22 am
steffi2 wrote:
In that approach, which seems elegant, I still have the same problem: in my main delivery thread I'm removing a message from the queue, applying the hashing algorithm and then writing the message to the nth bucket's queue. Then a listener attached to that bucket queue processes the message. If the client goes down, it needs to remember what messages were in the bucket queues, so these queues need to be static and persistent.
Now this model is essentially what I already have with my dispatcher, and the problem is simply that I don't know what's left in the bucket queues when my server goes down; the message has already been acknowledged, so it's no longer on the main queue.
So for this to work I need a queue that can survive a restart, and having that be an MQ queue is a lot better than an in-memory queue.
If you have a DB available you can also run the following setup; it only requires 2 queues, plus multi-phase commit or good juggling with commits:
- Upon receiving a message from queue 1, move it to queue 2 and record the following in the DB: original msgid, correlid, characteristic, processed status, current msgid (Q2).
- Have a process that reads the DB (index it) by processed status and characteristic. It retrieves the current msgid and builds a selector to use on Q2. It needs to lock the current record during processing and update the status when done. Depending on how you lock your DB / characteristics you can even allow concurrent processing of different characteristics...
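A very rough sketch of that second process, assuming a transacted JMS session, a JDBC connection with auto-commit off, and made-up table/column/status names; the commit ordering at the end is the "juggling" mentioned above unless you use a global (XA) transaction, and the row-locking SQL syntax varies by database:
Code:
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Queue;
import javax.jms.Session;

public class TrackedProcessor {

    // Pick an unprocessed row, pull exactly that message from Q2 by message id,
    // then mark the row done.
    public void processOne(java.sql.Connection db, Session session, Queue q2)
            throws SQLException, JMSException {
        PreparedStatement pick = db.prepareStatement(
                "SELECT current_msgid FROM work_items WHERE status = 'PENDING' FOR UPDATE");
        ResultSet rs = pick.executeQuery();
        if (!rs.next()) {
            return;                       // nothing waiting
        }
        String msgId = rs.getString(1);

        MessageConsumer consumer =
                session.createConsumer(q2, "JMSMessageID = '" + msgId + "'");
        Message msg = consumer.receive(5000);
        if (msg != null) {
            // ... business processing ...
            PreparedStatement done = db.prepareStatement(
                    "UPDATE work_items SET status = 'DONE' WHERE current_msgid = ?");
            done.setString(1, msgId);
            done.executeUpdate();
            db.commit();                  // DB commit first, then the JMS commit: you must
            session.commit();             // handle the window between the two yourself
        }
        consumer.close();
    }
}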
Have fun
atheek
Posted: Sat Sep 12, 2009 12:01 pm
Quote:
Not necessarily. Your assumption (which you did not state) is that it takes the same amount of time to process each of the colors (G, B, R).
OK, I didn't state it, but the unit-of-order concept is generally used for messages of a similar type that require ordered processing within a group, so the processing time for each type is roughly the same. Steffi's message affinity requirement was not for different types of orders, if I understand correctly; it was for orders of the same type, where the ordered processing is required for messages with the same orderID. We had a similar requirement for flight schedule update messages: we used Unit-of-Order in Weblogic JMS to guarantee ordered processing of messages for a particular flight on a particular day.