Duplicate messages read off queue |
akuszel |
Posted: Mon Nov 03, 2003 8:30 am Post subject: Duplicate messages read off queue |
|
|
Newbie
Joined: 28 Oct 2003 Posts: 3
|
Hi all,
I am running a multi-threaded application using the MQI "C" client libraries (MQSeries V5.2). The application uses POSIX threads and is compiled with Sun Forte C 6 on Solaris 8. The application uses a parent thread to do a non-destructive read of a message off a queue, obtain the MsgId from the MQMD header, and then launch a worker thread (in a detached state), passing it the MsgId, before getting the next message. Once the parent thread reaches the end of the queue, it waits 30 seconds for any new messages to arrive before closing and reopening the queue to begin the process over again.
The worker thread, using a different connection handle, opens the queue and does a non-destructive read of the message using the MsgId passed to it from the parent thread. The message is processed and, if successful, the queue is closed and reopened, the message is re-read (this time destructively, under syncpoint), then committed, and the worker thread exits. If the message is not successfully processed, it is left on the queue and the thread exits. In any case, the worker thread always closes the queue, disconnects from the queue manager, and frees any allocated memory prior to exiting.
The issue that I am facing is that every so often (not all of the time) the parent will launch two worker threads, one right after the other, using the same MsgId. I have decreased the number of times that this happens by inserting a 1 second sleep in the parent thread between launching a worker thread and getting the next message on the queue.
The application has a requirement to process several messages concurrently, so I do not like the idea of having the application sleep between MQGETs.
I am fairly new to MQSeries and multi-threaded applications and I am not sure if this is a timing problem with MQSeries or a problem with the application. Any assistance would be greatly appreciated. Please find below some pseudo logic and MQI options for both the parent and worker threads.
Thank You
Alan Kuszel
Parent Thread:
MQGMO GetMsgOpts = { MQGMO_DEFAULT };
OpenOptions = MQOO_BROWSE+MQOO_FAIL_IF_QUIESCING;
GetMsgOpts.Version = MQGMO_VERSION_2;
GetMsgOpts.MatchOptions = MQMO_NONE;
GetMsgOpts.Options = MQGMO_WAIT+MQGMO_ACCEPT_TRUNCATED_MSG+MQGMO_BROWSE_NEXT;
GetMsgOpts.WaitInterval = 30000;
connect to queue manager
open queue
while() {
get message from queue and obtain MsgId
launch worker thread passing MsgId
get next message on queue
no more messages ?
wait for 30 secs
close and re-open queue
}
Worker Thread:
MQGMO bs_GetMsgOpts = { MQGMO_DEFAULT };
bs_MsgDesc.Version = MQMD_VERSION_2;
bs_OpenOptions = MQOO_BROWSE+MQOO_FAIL_IF_QUIESCING;
bs_GetMsgOpts.Version = MQGMO_VERSION_2;
bs_GetMsgOpts.Options = MQGMO_NO_WAIT+MQGMO_BROWSE_FIRST;
bs_GetMsgOpts.MatchOptions = MQMO_MATCH_MSG_ID;
connect to queue manager
open queue
get message from queue based on passed MsgId
process message
if fail {
close queue
disconnect from manager
exit
}
close queue
bs_OpenOptions = MQOO_INPUT_AS_Q_DEF+MQOO_FAIL_IF_QUIESCING;
bs_GetMsgOpts.Version = MQGMO_VERSION_2;
bs_GetMsgOpts.MatchOptions = MQMO_MATCH_MSG_ID;
bs_GetMsgOpts.Options = MQGMO_NO_WAIT+MQGMO_SYNCPOINT+MQGMO_ACCEPT_TRUNCATED_MSG;
open queue
get message from queue based on passed MsgId
commit message
close queue
disconnect from queue manager
exit |
jefflowrey |
Posted: Mon Nov 03, 2003 8:49 am Post subject: |
|
|
Grand Poobah
Joined: 16 Oct 2002 Posts: 19981
|
This looks like a lot of extra work to me.
Why not just get the message in syncpoint, and pass it to the worker thread?
If the worker thread fails, then backout the UOW. Otherwise, commit.
Also your main loop in your main thread is very unusual (at least to me). You have an extra 'get next message' in there that doesn't seem very helpful. It seems like you'll skip a lot of messages - because you're getting the next message both before your loop ends one iteration and again when it starts the next one. I would code it a lot more like
Code: |
while () {
    get message
    if no more messages (reason code 2033) {
        wait 30 seconds
        close and reopen queue
        continue
    }
    launch worker thread with msgId
}
Also, let's think about what you're doing here. You open a browse cursor on the queue in one thread. Then you change the queue in a different thread. How do you know that the browse cursor in your main thread will stay "synced" with the queue? I think you're going to be doing a lot of extra work to process the same set of messages, because you keep shuffling unprocessed messages to positions before the current browse cursor (so Browse Next returns 'no more messages on queue').
_________________
I am *not* the model of the modern major general.
akuszel |
Posted: Mon Nov 03, 2003 11:26 am Post subject: |
|
|
Newbie
Joined: 28 Oct 2003 Posts: 3
|
Hi Jeff,
Thank you for the response.
My parent thread mostly operates like the code example you provided; I apologise for not explaining that very well.
I don't use syncpointing and UOW commits and backouts in the parent thread, as it can create multiple workers that run concurrently and the parent cannot know which worker thread completes and when.
For example, this is my understanding: if 10 threads are started for 10 different messages, Hconn will point (for lack of a better term) to the last message on which an MQGET was done. If thread number 2 (handling the 2nd message of the 10 read off the queue) fails, then a backout would restore the 10th message and not the second.
In the worker threads I use a different connection handle so that the parent handle does not get shuffled around, or so I am hoping. I realise that in the worker thread I could probably just do the one MQGET under syncpoint control, but when I was originally developing the application I did not want to lose the message if the thread failed or crashed.
Thanks....
Al K |
jefflowrey |
Posted: Mon Nov 03, 2003 11:53 am Post subject: |
|
|
Grand Poobah
Joined: 16 Oct 2002 Posts: 19981
|
akuszel wrote: |
when I was originally developing the application I did not want to lose the message if the thread failed or crashed. |
That's the whole point of syncpoint. If your thread contains the HCONN and the unit of work, then if it fails or crashes the message will be rolled back, not lost.
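To make that concrete, here is a toy model in C of how a unit of work behaves. It is not MQI code: `toy_queue`, `toy_get_syncpoint`, `toy_commit` and `toy_backout` are made-up names that only mimic what a destructive MQGET with MQGMO_SYNCPOINT, MQCMIT and MQBACK do on a single connection handle. The sketch assumes one connection and one unit of work; a message that was got but not committed is put back by a backout, together with every other message got in the same unit of work.

```c
#include <stddef.h>

#define TOY_MAX_MSGS 16

/* Toy stand-in for a queue plus ONE connection's unit of work (not the MQI). */
typedef enum { TOY_ON_QUEUE, TOY_IN_UOW, TOY_REMOVED } toy_state;

typedef struct {
    toy_state state[TOY_MAX_MSGS];
    size_t    count;
} toy_queue;

/* Fill the queue with count messages (capped at TOY_MAX_MSGS). */
void toy_init(toy_queue *q, size_t count)
{
    size_t i;
    q->count = count > TOY_MAX_MSGS ? TOY_MAX_MSGS : count;
    for (i = 0; i < q->count; i++)
        q->state[i] = TOY_ON_QUEUE;
}

/* Mimics a destructive get under syncpoint: the first available message
 * becomes invisible but is not yet gone. Returns its index, or -1 if none. */
int toy_get_syncpoint(toy_queue *q)
{
    size_t i;
    for (i = 0; i < q->count; i++) {
        if (q->state[i] == TOY_ON_QUEUE) {
            q->state[i] = TOY_IN_UOW;
            return (int)i;
        }
    }
    return -1;
}

/* Move every message held by the unit of work to new_state. */
static void toy_end_uow(toy_queue *q, toy_state new_state)
{
    size_t i;
    for (i = 0; i < q->count; i++)
        if (q->state[i] == TOY_IN_UOW)
            q->state[i] = new_state;
}

/* Mimics a commit: messages got in the unit of work are gone for good. */
void toy_commit(toy_queue *q)  { toy_end_uow(q, TOY_REMOVED); }

/* Mimics a backout: messages got in the unit of work reappear on the queue. */
void toy_backout(toy_queue *q) { toy_end_uow(q, TOY_ON_QUEUE); }

/* Number of messages currently available to get. */
size_t toy_depth(const toy_queue *q)
{
    size_t i, n = 0;
    for (i = 0; i < q->count; i++)
        if (q->state[i] == TOY_ON_QUEUE)
            n++;
    return n;
}
```

In this model, getting two messages and then calling `toy_backout` leaves the depth where it started. That is the behaviour you would be relying on if each worker did a single get under syncpoint on its own connection: a worker that fails backs out only its own message.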
I'm still worried about the browse cursor being out of sync with the queue. But I haven't really done any multi-threaded apps like you are building. So keep that in mind, because I might be wrong about that and other things.
I have two suggestions to think about.
The first is to change your main loop to only do Inquiry against queue depth, instead of browsing the queue. If the queue depth > 1, then spawn a copy of the worker thread. The worker thread will then see if the queue depth is > 2 and spawn another worker thread if so. It will also get the first message, in syncpoint, and process it. You could put in additional logic to keep track of how many worker threads are active, so that you don't spawn a million of them if your queue gets really big.
The second is to change the looping of your main thread. Have it run a loop to gather message IDs into an array. Then run another loop through that array to launch your worker threads as you are now.
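The second suggestion could look roughly like the sketch below. It only covers the second loop (launching workers from an array of MsgIds that the browse loop has already filled) and contains no MQI calls. `worker_arg`, `worker_main` and `launch_workers` are hypothetical names, the worker body is a placeholder, and the threads are joined rather than detached so the caller can tell when all of them have finished. Each worker is handed its own copy of the 24-byte MsgId, so a later MQGET in the parent cannot overwrite the value a worker is about to use.

```c
#include <pthread.h>
#include <stdlib.h>
#include <string.h>

#define MSG_ID_LEN 24   /* an MQI MsgId is 24 bytes long */

/* Hypothetical per-worker argument: a private copy of the MsgId. */
typedef struct {
    unsigned char msg_id[MSG_ID_LEN];
    int           processed;   /* placeholder result set by the worker */
} worker_arg;

/* Placeholder worker. A real one would connect to the queue manager,
 * get the message matching arg->msg_id, process it and commit. */
static void *worker_main(void *p)
{
    worker_arg *arg = p;
    arg->processed = 1;
    return NULL;
}

/* Launch one worker per collected MsgId, then wait for all of them.
 * ids   : count * MSG_ID_LEN bytes gathered by the first (browse) loop
 * args  : caller-provided array of count worker_arg slots
 * Returns the number of workers started, or -1 if memory ran out. */
int launch_workers(const unsigned char *ids, size_t count, worker_arg *args)
{
    pthread_t *tids;
    size_t started = 0, i;

    if (count == 0)
        return 0;
    tids = malloc(count * sizeof *tids);
    if (tids == NULL)
        return -1;

    for (i = 0; i < count; i++) {
        /* Copy the MsgId so the worker never shares a buffer with the parent. */
        memcpy(args[i].msg_id, ids + i * MSG_ID_LEN, MSG_ID_LEN);
        args[i].processed = 0;
        if (pthread_create(&tids[started], NULL, worker_main, &args[i]) != 0)
            break;              /* stop launching on the first failure */
        started++;
    }

    for (i = 0; i < started; i++)
        pthread_join(tids[i], NULL);

    free(tids);
    return (int)started;
}
```

If you keep the workers detached instead, the `worker_arg` for each one would have to be allocated on the heap and freed by the worker itself, since the parent would no longer know when it is safe to reuse the slot.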