|
RSS Feed - WebSphere MQ Support
|
RSS Feed - Message Broker Support
|
Multiple MQ Consumers accessing same queue on IBM MQ Series |
« View previous topic :: View next topic » |
Author |
Message
|
androidgalaxyman |
Posted: Thu Feb 06, 2014 12:12 am Post subject: |
|
|
Novice
Joined: 02 Jan 2014 Posts: 13
|
exerk wrote: |
androidgalaxyman wrote: |
...Are u saying the when under SYNCPOINT and don't commit - when either the app or when MQ backs out the transaction - then it will re-appear on the queue. ? |
When you get a message under SYNCPOINT the message is not available for other consumers to GET and unless you COMMIT the GET, i.e. when you have finished processing the message content, the message will not be deleted from the queue. If your application backs out the message, either explicitly or implicitly (e.g. broken connection) only then will it be available to another consumer to GET.
The above assumes your application doesn't do anything silly such as process the message content, act on it, and commit the content but not the message, e.g. GET, PROCESS, UPDATE DB, ROLL-BACK MESSAGE, and nowhere in your code do I see a COMMIT, explicit or otherwise. |
Hi Exerk, Thanks for your opinion. i didn't completly posted the code. Here that ..
Code: |
public void run() {
boolean bFileUsed = false;
boolean bSuccess = false;
/*
* For tracing purposes
*/
String sCurrentRequestID = "";
String sCurrentResponseID = "";
/*
* Options to get the messages
*/
MQGetMessageOptions gmo = new MQGetMessageOptions();
gmo.options = MQC.MQGMO_SYNCPOINT | MQC.MQGMO_WAIT;
gmo.waitInterval = this._listener.getWaitInterval();
/*
* Let's twist !
*/
while (!this._bStopping) { // && !bReadyToLeave){
MQMessage retrievedMessage = new MQMessage();
retrievedMessage.characterSet = MQC.MQCCSI_Q_MGR;
/*
* try{ _mqm.begin(); }catch(Exception e){ e.printStackTrace(); }
*/
/*
* Get the Message from the queue
*/
boolean bDone = false;
while (!this._bStopping && !bDone) {
try {
retrievedMessage.clearMessage();
} catch (IOException e1) {
this._listener.error("Error while clearing message.", e1);
}
long l = System.currentTimeMillis();
try {
this._listener.debug(new StringBuilder(" Retrieving message ...").append(_listener.getName()).toString());
this._inputQueue.get(retrievedMessage, gmo);
this._listener.debug(new StringBuilder(" Retrieving message ...").append(_listener.getName()).toString());
bDone = true;
} catch (MQException mqe) {
/*
* This is due to the connection getting suddenly down !
* This mean we have to raise it higher !
*/
if ((System.currentTimeMillis() - l) < 50) {
this._listener
.error("Error while reading message. Reason : "
+ mqe.reasonCode);
closeAll();
/*
* Retry every minutes.
*/
Initialize(60000);
}
/* Normal Error when no message - timeout */
}
bFileUsed = false;
if (bDone) {
bSuccess = true;
try {
/*
* Authenticate the message (Raw or Propertybag) The
* processing is Blocking .... (locked)
*/
sCurrentRequestID = Util
.Byte2String(retrievedMessage.messageId);
sCurrentResponseID = "";
this._listener.debug("Retrieved Message ID : "
+ sCurrentRequestID);
//logMQMD of request message
//call this method only on debug level
Level level = this._listener.getLogger().getLevel();
if(Level.DEBUG_INT == level.toInt()) {
String mqmdValues = new MQMessageWrapper().logMQMD(retrievedMessage);
this._listener.debug(mqmdValues);
}
//response message object
MQMessage respMsg = null;
//flag for specialResponseMsg. Need not publish in case of ack messages.
boolean spRespMsg = false;
/*
* Check for the backout count before doing anything.
*/
if (retrievedMessage.backoutCount > _nBackoutThreshold
&& _nBackoutThreshold != 0) {
/*
*
*/
retrievedMessage.encoding = MQC.MQENC_NATIVE;
retrievedMessage.format = " ";
this._listener.info("Backout count ("
+ retrievedMessage.backoutCount
+ ") exceed Backout Threshold ("
+ _nBackoutThreshold + ")");
if (_backoutQueue != null) {
bSuccess = publish_raw(this._backoutQueue,
retrievedMessage);
if (!bSuccess) {
if (accessBackoutQueue(true)) {
bSuccess = publish_raw(this._backoutQueue,
retrievedMessage);
}
}
} else {
this._listener
.warn("No Backout Queue defined.");
bSuccess = false;
}
if (!bSuccess
&& this._listener.getDeadLetterQueue() != null) {
this._listener
.warn("Publishing message to deadletterqueue '"
+ _deadLetterQueue.name.trim()
+ "'");
bSuccess = publish_raw(this._deadLetterQueue,
retrievedMessage);
if (!bSuccess) {
if (accessDeadLetterQueue(true)) {
bSuccess = publish_raw(
this._deadLetterQueue,
retrievedMessage);
}
}
}
if (!bSuccess) {
saveToFile("Rq_" + sCurrentRequestID,
new MQMessageWrapper(retrievedMessage,
this._listener)
.getMessageBody());
bSuccess = true;
bFileUsed = true;
}
} else {
MQMessageWrapper rqMsg = new MQMessageWrapper(
retrievedMessage, this._listener);
/*
* procStatus will contain the evt error code from
* the adapter.
*/
ProcessingStatus procStatus = new ProcessingStatus();
/*
* Main Processing of the request
*/
byte[] bMessageBody = rqMsg.getMessageBody();
if (bMessageBody.length > 4) {
this._listener.debug("Received MessageBody : "
+ (int) bMessageBody[0] + ", "
+ (int) bMessageBody[1] + ","
+ (int) bMessageBody[2] + ", "
+ (int) bMessageBody[3]);
}
byte[] bResponse = this._listener.process(
bMessageBody, null,
AbstractListener.LISTENER_TCC_MESSAGING,
procStatus);
//listener.debug("response message = " + new String(bResponse));
if (bResponse.length > 4) {
this._listener.debug("Sending MessageBody : "
+ (int) bResponse[0] + ", "
+ (int) bResponse[1] + ","
+ (int) bResponse[2] + ", "
+ (int) bResponse[3]);
}
/*
**************************************************** */
if (this._forwardQueue != null) {
this._listener
.debug("Attempting to froward original message ...");
/*
* We need to look at the response, and
* depending how it looks like, we will post the
* request in the forward queue.
*/
/*
* Every response starting with the value define
* in "<FORWARDPATTERN>" will imply a forward of
* the original request in the forward queue if
* defined.
*/
if (bResponse != null
&& bResponse.length > 0
&& this._listener.getForwardPattern() != null
&& new String(bResponse)
.startsWith(this._listener
.getForwardPattern())) {
retrievedMessage.expiry = -1; // Infinite
this._listener
.debug("Forwarding request in forward queue.");
publish_raw(this._forwardQueue,
retrievedMessage);
} else {
this._listener
.debug("Not forwarding because response doesn't match pattern ("
+ this._listener
.getForwardPattern()
+ ")");
}
}
if (this._listener.getRollBackOnFailure()
&& procStatus.getStatus() != Constants.STATUS_OK) {
/*
* we will roll back the transaction !
*/
if (procStatus.getStatus() == Constants.STATUS_TIMEDOUT) {
this._listener
.error("Session time out. Rolling back message.");
} else if (procStatus.getStatus() == Constants.STATUS_ERROR) {
this._listener
.error("Session error. Rolling back message.");
}
// System.out.println("procStatus = " +
// procStatus.getStatus());
/*
* This will genereate a rollback
*/
bSuccess = false;
} else {
/*
* In case of special messages (like an "ack"),
* we are not generating any response back. The
* response is a byte[]{0}.
*/
if (bResponse != null && bResponse.length == 1
&& bResponse[0] == 0) {
_listener.info("In case of special messages like an 'ack' we are not generating any response back. The response is a byte[]{0}.");
spRespMsg = true;
bSuccess = true;
} else {
/*
* Create a response MQMessage
*/
respMsg = new MQMessage();
respMsg.correlationId = rqMsg
.getCorrelationID();
rqMsg.appendMQMD(respMsg);
if (rqMsg.hasRFHHeader()) {
try {
rqMsg.appendRFHHeader(respMsg);
} catch (Exception e) {
this._listener
.error(" Error appending RFH header: "
+ e.getMessage());
bSuccess = false;
}
}
this._listener
.debug("Response MQMD QueueManager : "
+ respMsg.replyToQueueManagerName);
this._listener
.debug("Response MQMD Queue : "
+ respMsg.replyToQueueName);
// if
// (this._listener.getReplyToQueueManagerName()
// !=
// null){
// respMsg.replyToQueueManagerName =
// this._listener.getReplyToQueueManagerName();
// }
// if (this._listener.getReplyToQueueName()
// != null){
// respMsg.replyToQueueName =
// this._listener.getReplyToQueueName();
// }
if (bSuccess) {
try {
respMsg.write(bResponse);
} catch (Exception e) {
saveToFile("Resp_"
+ sCurrentRequestID,
bResponse);
bSuccess = true;
bFileUsed = true;
}
}
if (bSuccess && !bFileUsed) {
/*
* Try to publish to the
* replyqueue/queuemanager add
* IGNOREREPLYQUEUE
*/
if (!this._listener
.getIgnoreReplyQueue()) {
bSuccess = publishToReplyQueue(
rqMsg, respMsg);
} else {
bSuccess = false; // To force
// entering the
// publish to
// CONSUMER
// .....
}
if (!bSuccess) {
if (this._listener
.getConsumerDefined()) {
/*
* If it failed, and the replpy
* queue is not the default one,
* then try on the default one.
*/
if (_outputQueue != null) {
this._listener
.warn("Publishing message to default output queue '"
+ _outputQueue.name
.trim()
+ "'");
// if
// (rqMsg.getReplyToQueueName()
// == null ||
// !rqMsg.getReplyToQueueName().equalsIgnoreCase(this._outputQueue.name.trim()))
// {
bSuccess = publish(
this._outputQueue,
respMsg);
if (!bSuccess) {
if (accessOutputQueue(true)) {
bSuccess = publish(
this._outputQueue,
respMsg);
}
}
// }
} else {
this._listener
.warn("No Consumer defined for publishing response.");
bSuccess = false;
}
/*
* If it failed, then try to
* publish to the
* deadletterqueue.
*/
if (!bSuccess) {
if (this._listener
.getDeadLetterQueue() != null
&& _deadLetterQueue != null) {
this._listener
.warn("Publishing message to deadletterqueue '"
+ _deadLetterQueue.name
.trim()
+ "'");
bSuccess = publish(
this._deadLetterQueue,
respMsg);
if (!bSuccess) {
if (accessDeadLetterQueue(true)) {
bSuccess = publish(
this._deadLetterQueue,
respMsg);
}
}
}
/*
* No way to publis the
* response. We will try to
* persist it in a file.
*/
if (!bSuccess) {
saveToFile(
"Resp_"
+ sCurrentRequestID,
bResponse);
bSuccess = true;
bFileUsed = true;
}
}
} else {
/*
* There was no replyToQueue.
*/
if (!bSuccess) {
if (this._listener
.getIgnoreReplyQueue()) {
this._listener
.warn("No Consumer defined. Reply Queue Ignored due to Configuration. Commiting without response publishing");
} else {
this._listener
.warn("No Consumer defined. Commiting without response publishing");
}
bSuccess = true;
}
}
}
sCurrentResponseID = Util
.Byte2String(respMsg.messageId);
}
}
}
}
// bSuccess = false;
if (bSuccess) {
if (bFileUsed) {
this._listener
.info("Response has not been published to a queue, but written in a file. Commiting anywaying (RqID : "
+ sCurrentRequestID + ")");
try {
this._mqm.commit();
} catch (Exception e) {
this._listener
.error(" Error while commiting : "
+ e.getMessage());
}
} else {
if(spRespMsg == false && respMsg!=null){
if(Level.DEBUG_INT == level.toInt()) {
String mqmdValues = new MQMessageWrapper().logMQMD(respMsg);
this._listener.debug(mqmdValues);
}
}
this._listener
.info("Committing Response ID : "
+ sCurrentResponseID
+ ", (RqID : "
+ sCurrentRequestID + ")");
[b]this._mqm.commit();[/b]
}
} else {
this._listener.warn("Rolling Back Response ID : "
+ sCurrentResponseID + ", (RqID : "
+ sCurrentRequestID + ")");
this._mqm.backout();
}
} catch (Exception e) {
this._listener.error("Unexpected error.", e);
try {
this._mqm.backout();
} catch (MQException e2) {
this._listener
.error("Unexpected error while backout. Reason : "
+ e2.reasonCode);
}
}
}
}
}
closeAll();
_bIsStopped = true;
} |
|
|
Back to top |
|
 |
zpat |
Posted: Thu Feb 06, 2014 12:53 am Post subject: |
|
|
 Jedi Council
Joined: 19 May 2001 Posts: 5866 Location: UK
|
After a cursory look, you seem to have logic that reprocesses backed out messages until they exceed the backout threshold.
What value do you set in the backout threshold? If it's more than zero you should expect to process the same message again in certain situations.
Why not display the MQMD.Back value when you get the message, you will see it increment when re-processing the same message.
Or set the backout threshold to zero and see what happens then.
Or turn off syncpointing to check that this is the "problem".
MQ won't re-present the same message to several consumers - other than in the syncpoint or browse scenarios we have described.
Other MQGMO options you should consider are MQGMO_CONVERT, and MQGMO_FAIL_IF_QUIESCING.
These are both good practice to use (although won't affect backouts) - in particular the convert option will make your progam able to handle messages that arrive in any codepage. _________________ Well, I don't think there is any question about it. It can only be attributable to human error. This sort of thing has cropped up before, and it has always been due to human error. |
|
Back to top |
|
 |
androidgalaxyman |
Posted: Thu Feb 06, 2014 1:07 am Post subject: |
|
|
Novice
Joined: 02 Jan 2014 Posts: 13
|
zpat wrote: |
After a cursory look, you seem to have logic that reprocesses backed out messages until they exceed the backout threshold.
What value do you set in the backout threshold? If it's more than zero you should expect to process the same message again in certain situations.
Why not display the MQMD.Back value when you get the message, you will see it increment when re-processing the same message.
Or set the backout threshold to zero and see what happens then.
Or turn off syncpointing to check that this is the "problem".
MQ won't re-present the same message to several consumers - other than in the syncpoint or browse scenarios we have described.
Other MQGMO options you should consider are MQGMO_CONVERT, and MQGMO_FAIL_IF_QUIESCING.
These are both good practice to use (although won't affect backouts) - in particular the convert option will make your progam able to handle messages that arrive in any codepage. |
Dear zpat, thanks for your valuable inputs. But Listening MQ handler thread doesn't set any values. in default if there is no value specified, the application will set '0'
Code: |
String sTemp = getParam("BACKOUTTHRESHOLD");
if (sTemp != null) {
try {
_backoutThreshold = Integer.parseInt(sTemp);
} catch (Exception e) {
super
.error("BACKOUTTHRESHOLD must be numeric. Defaulting to '2'");
_retryCount = 2;
}
} else {
super
.warn("No Default BACKOUTTHRESHOLD specified. Make sure the backoutCount is specified in your Queue definition");
_backoutThreshold = 0;
}
|
|
|
Back to top |
|
 |
exerk |
Posted: Thu Feb 06, 2014 1:11 am Post subject: |
|
|
 Jedi Council
Joined: 02 Nov 2006 Posts: 6339
|
And don't back out messages to a Dead-Letter Queue, ensure you have an appropriate queue defined and back out any messages to that, and like zpat, I have only cursorily glanced at your code and don't see where you inquire on the queue to obtain the BOTHRESH and BOQNAME values. _________________ It's puzzling, I don't think I've ever seen anything quite like this before...and it's hard to soar like an eagle when you're surrounded by turkeys. |
|
Back to top |
|
 |
androidgalaxyman |
Posted: Thu Feb 06, 2014 1:36 am Post subject: |
|
|
Novice
Joined: 02 Jan 2014 Posts: 13
|
exerk wrote: |
And don't back out messages to a Dead-Letter Queue, ensure you have an appropriate queue defined and back out any messages to that, and like zpat, I have only cursorily glanced at your code and don't see where you inquire on the queue to obtain the BOTHRESH and BOQNAME values. |
Dear exerk, before starting Listener Thread, it will be read from some properties file. For my case these values not been supplied. so that the listener thread starts with default values._backoutThreshold = 0 . |
|
Back to top |
|
 |
exerk |
Posted: Thu Feb 06, 2014 2:19 am Post subject: |
|
|
 Jedi Council
Joined: 02 Nov 2006 Posts: 6339
|
androidgalaxyman wrote: |
Dear exerk, before starting Listener Thread, it will be read from some properties file. For my case these values not been supplied. so that the listener thread starts with default values._backoutThreshold = 0 . |
Save the effort of doing so; that's why QLOCALs have those attributes, so you don't have to externalize those parameters, and reduces complexity and problems due to human error. _________________ It's puzzling, I don't think I've ever seen anything quite like this before...and it's hard to soar like an eagle when you're surrounded by turkeys. |
|
Back to top |
|
 |
zpat |
Posted: Thu Feb 06, 2014 2:47 am Post subject: |
|
|
 Jedi Council
Joined: 19 May 2001 Posts: 5866 Location: UK
|
Like I said, print the message backout value (from the message) after each MQGET and see what is going on there.
If you have MQ 7.1 or 7.5 you could try using the activity trace feature to monitor the application's use of MQI calls.. _________________ Well, I don't think there is any question about it. It can only be attributable to human error. This sort of thing has cropped up before, and it has always been due to human error. |
|
Back to top |
|
 |
androidgalaxyman |
Posted: Thu Feb 06, 2014 8:57 pm Post subject: |
|
|
Novice
Joined: 02 Jan 2014 Posts: 13
|
zpat wrote: |
Like I said, print the message backout value (from the message) after each MQGET and see what is going on there.
If you have MQ 7.1 or 7.5 you could try using the activity trace feature to monitor the application's use of MQI calls.. |
Hi Zpat,
Please find the configuration. i have tried these configuration, but i haven't received Backout Cout= 0 more than that. so that there is never requeue takes place. but we do have some clients where they are experiencing the issue for below scenario.
Messages to be held up in a Queue listened by a Listener that is getting processed with one thread each in all the 4 servers.
While processing the message, the same message has been processed through two different sessions at the same point of time in two servers and hence, duplicate were played into the system.
WAITINTERVAL=1000
BACKOUTTHRESHOLD=3 |
|
Back to top |
|
 |
zpat |
Posted: Fri Feb 07, 2014 12:13 am Post subject: |
|
|
 Jedi Council
Joined: 19 May 2001 Posts: 5866 Location: UK
|
So you do have a backout threshold > 0.
Change it to zero if you don't want messages to be re-processed.
But you need to determine why these backouts are taking place.
I repeat, MQ will not present messages to the application more than once, unless browsed or backed out. If this was not the case, the world's financial systems would be in chaos.
We are not the IBM support center. If you still don't believe that your app is backing out messages (failing to commit properly) then run a MQ trace and ask IBM to look at it (open a PMR).
Make sure you have a backout queue set, and check the dead letter queue in case backed out messages are being placed there.
Just out of interest, what is your exact version of MQ on the QM and on the MQ clients? _________________ Well, I don't think there is any question about it. It can only be attributable to human error. This sort of thing has cropped up before, and it has always been due to human error. |
|
Back to top |
|
 |
exerk |
Posted: Fri Feb 07, 2014 1:06 am Post subject: |
|
|
 Jedi Council
Joined: 02 Nov 2006 Posts: 6339
|
androidgalaxyman, I've deleted your duplicate post. In the future please edit your posts rather than duplicate them just to change some formatting.
If you believe you have backed-out messages (again, I have not reviewed your code in depth) and the back-out count is zero, then open a PMR with IBM because there is definitely a problem. If, on the other hand, you are achieving the 'back out' by putting the message payload back on the queue as a new message, then the back-out count will be zero because it will be a new message, not the one you originally got. _________________ It's puzzling, I don't think I've ever seen anything quite like this before...and it's hard to soar like an eagle when you're surrounded by turkeys. |
|
Back to top |
|
 |
|
|
|
You cannot post new topics in this forum You cannot reply to topics in this forum You cannot edit your posts in this forum You cannot delete your posts in this forum You cannot vote in polls in this forum
|
|
|
|