|
RSS Feed - WebSphere MQ Support
|
RSS Feed - Message Broker Support
|
 |
|
asynchronous mathing of replies from mainframe - urgent |
« View previous topic :: View next topic » |
Author |
Message
|
taggz |
Posted: Wed Aug 07, 2002 5:52 am Post subject: asynchronous mathing of replies from mainframe - urgent |
|
|
Newbie
Joined: 06 Jan 2002 Posts: 2
|
Greetings,
I am currently sending messages to the mainframe and responses are found on the reply-to-queue (permanent queue). If I do one message at a time, no problems. The problem is when there is more than one thread using the reply queue. It seems to be using a first-in-first-out match instead of matching using the correlation ID or message ID. Does this behaviour seem reasonable given the following code? What changes need to be made to ensure each thread gets its own reply only?
I have tried the following combinations:
1) Moved the message Id from the request message to the correlation id of the get message and a sometime incorrect match (fifo) is always found. Used MQC.MQMO_MATCH_CORREL_ID
2) Moved the message Id from the request message to the message Id of the get message and no match is found (2033). Used MQC.MQMO_MATCH_MSG_ID.
Any suggestions are appreciated. Thanks.
package ebalance.mqseries;
import com.ibm.mq.*; // Include the MQ package
import com.ibm.mqservices.*;
import hms.file.layouts.GddainqInputLayout;
import balanceadjustment.beans.comm.objects.ValidateAccountInputObject;
import hms.file.layouts.records.*;
import resource.errors.*;
import javax.naming.*;
import javax.sql.*;
import java.sql.*;
import java.util.HashMap;
import databases.commobjects.BaladjSysCfgDBBean;
import databases.BaladjSysCfgDB;
public class GDDAINQAccountLookup implements java.io.Serializable {
/***********************************************************************/
/* set qManager to the name of your queue manager if not using default */
/***********************************************************************/
private String qManager = "";
private MQQueueManager qMgr;
private String hmsEnvironmentCode = "";
private String replyToQueueName = "";
private String requestQueueName = "";
private String channel = "";
private String hostname = "";
private String hmsUserId = "";
private MQMessage validationResponseMessage = null;
private String recvMessage = null;
/***********************************************************************/
/* Actually run the program.... */
/***********************************************************************/
public synchronized String performLookup(ValidateAccountInputObject comm) throws HmsException, EnvironmentSetupError {
Connection con = null;
try {
InitialContext ic = new InitialContext();
DataSource ds = (DataSource) ic.lookup("jdbc/BALADJ01");
con = ds.getConnection();
BaladjSysCfgDB configDB = new BaladjSysCfgDB(con);
java.util.Hashtable cfg = configDB.getCfg();
con.close();
StringBuffer bank = new StringBuffer(Integer.toString(comm.getBankNumber()));
// right justify and pad to 3 digits
while (bank.length() < 3) {
bank.insert(0,"0");
}
try {
BaladjSysCfgDBBean hmsEnvironemntCodeBean = (BaladjSysCfgDBBean)cfg.get(bank + "." + "HMS.ENV.CODE");
hmsEnvironmentCode = hmsEnvironemntCodeBean.getSys_cfg_tx();
BaladjSysCfgDBBean qManagerBean = (BaladjSysCfgDBBean)cfg.get("HMS.Q.MGR.NM");
qManager = qManagerBean.getSys_cfg_tx();
BaladjSysCfgDBBean replyToQueueNameBean = (BaladjSysCfgDBBean)cfg.get("HMS.RPLY.Q.NM");
replyToQueueName = replyToQueueNameBean.getSys_cfg_tx();
BaladjSysCfgDBBean channelBean = (BaladjSysCfgDBBean)cfg.get("QENV.CHANNEL");
channel = channelBean.getSys_cfg_tx();
BaladjSysCfgDBBean requestQueueNameBean = (BaladjSysCfgDBBean)cfg.get(bank + "." + "HMS.RQST.Q.NM");
requestQueueName = requestQueueNameBean.getSys_cfg_tx();
BaladjSysCfgDBBean hostnameBean = (BaladjSysCfgDBBean)cfg.get("QENV.HOSTNAME");
hostname = hostnameBean.getSys_cfg_tx();
BaladjSysCfgDBBean hmsUserIdBean = (BaladjSysCfgDBBean)cfg.get("HMS.USER.ID");
hmsUserId = hmsUserIdBean.getSys_cfg_tx();
} catch (NullPointerException npe) {
throw new EnvironmentSetupError("Key Not Found");
}
HmsGddainqInputRecord record = new HmsGddainqInputRecord();
String message = record.createHmsInputRecord(comm).toString();
System.out.println(message);
//This sets up the environment to create MQ messages
MQEnvironment qEnv = new MQEnvironment();
qEnv.hostname = hostname; // IP address
qEnv.channel = channel; // CHANNEL of type ServerConnection
/*****************************/
/* Create a queue manager */
/*****************************/
qMgr = new MQQueueManager(qManager);
/*****************************/
/* Open the queue for output */
/*****************************/
int requestOpenOptions = MQC.MQOO_OUTPUT | MQC.MQOO_SET_ALL_CONTEXT;
MQQueue validationRequestQueue = qMgr.accessQueue(requestQueueName,
requestOpenOptions, null, null, null);
/*****************************/
/* Create a message to put */
/*****************************/
MQMessage validationRequestMessage = new MQMessage();
validationRequestMessage.format = MQC.MQFMT_STRING;
validationRequestMessage.writeBytes(message);
MQPutMessageOptions pmo = new MQPutMessageOptions();
pmo.options = MQC.MQPMO_SET_ALL_CONTEXT;
/***************************************/
/* Set the messageId and correlationId */
/***************************************/
//validationRequestMessage.messageId = messageId.getBytes();
//validationRequestMessage.correlationId = corr.getBytes();
validationRequestMessage.userId = hmsUserId;
validationRequestMessage.replyToQueueName = replyToQueueName;
validationRequestMessage.replyToQueueManagerName = qManager;
//validationRequestMessage.expiry = 700;
/**********************************************/
/* Put the message and save off the messageId */
/**********************************************/
validationRequestQueue.put(validationRequestMessage,pmo);
byte[] messageId = validationRequestMessage.messageId;
byte[] correlationId = validationRequestMessage.correlationId;
System.out.println("written message: " + message);
/*****************************/
/* Close the request queueu */
/*****************************/
validationRequestQueue.close();
/*****************************/
/* Open the queue for output */
/*****************************/
int responseOpenOptions = MQC.MQOO_INPUT_SHARED;
// other inputOptions MQOO_INPUT_EXCLUSIVE MQOO_INPUT_AS_Q_DEF
MQQueue validationResponseQueue = qMgr.accessQueue(replyToQueueName,
responseOpenOptions, null, null, null);
/******************************************************/
/* Set up our options to browse for the first message */
/******************************************************/
MQGetMessageOptions gmo = new MQGetMessageOptions();
gmo.options = MQC.MQGMO_WAIT;
gmo.waitInterval = 40000;
gmo.matchOptions = MQC.MQMO_MATCH_MSG_ID;
//gmo.matchOptions = MQC.MQMO_MATCH_CORREL_ID;
System.out.println("requestUpdated"+ validationRequestMessage.correlationId);
/********************************************************/
/* Set the messageId so that browse will find the match */
/********************************************************/
MQMessage validationResponseMessage = new MQMessage();
validationResponseMessage.messageId = correlationId;
/****************************/
/* Get the matching message */
/****************************/
validationResponseQueue.get(validationResponseMessage, gmo);
recvMessage = validationResponseMessage.readString(validationResponseMessage.getMessageLength());
System.out.println("resp" + validationResponseMessage.correlationId);
System.out.println("gotten message: " + recvMessage);
/*****************************/
/* Close and disconnect */
/*****************************/
validationResponseQueue.close(); // this will become the replyToQueue instead of requestQueue
qMgr.disconnect();
/************************************/
/* Catch any errors and report them */
/************************************/
} catch (MQException ex) {
try {
qMgr.disconnect();
} catch (MQException ex1) {
System.out.println(ex1.getMessage());
}
throw new HmsException(ex.getMessage());
} catch (java.io.IOException ex) {
System.out.println("An error occurred writing to message buffer: " +
ex);
} catch (databases.errors.RepositoryException re) {
throw new EnvironmentSetupError(re.getMessage());
} catch (NamingException ne) {
throw new EnvironmentSetupError(ne.getMessage());
} catch (SQLException sqle) {
throw new EnvironmentSetupError(sqle.getMessage());
} finally {
if (con != null) {
try {
con.close();
} catch (Exception e) {
}
}
}
return recvMessage;
} /* end of start */
} /* end of program */
 |
|
Back to top |
|
 |
sgopal |
Posted: Wed Aug 07, 2002 1:26 pm Post subject: |
|
|
Acolyte
Joined: 30 Jul 2002 Posts: 63
|
Normally, in a request and reply message scenario (Client and server),
Client application generate the request message, puts the message on the queue and message ID is stored.
Server application which receives the messages, would process the request and generate
reply message by copying message ID of the request message into Correlation ID of the reply
message.
Client application waits for the reply message on the queue with correlation id as message of the request message.
This process of setting message ID and correlation ID should solve your problem.
Going thru ur code,
i found that you are storing message id and correlation id of the request message and waiting for the reply message with message id as correlation id of the request message.
Please note the message id is automatically generated by MQ not correlation ID. Correlation ID will be generated if you specify MQPMO_NEW_CORREL_ID option before putting the request message.
Hope this helps. |
|
Back to top |
|
 |
|
|
 |
|
Page 1 of 1 |
|
You cannot post new topics in this forum You cannot reply to topics in this forum You cannot edit your posts in this forum You cannot delete your posts in this forum You cannot vote in polls in this forum
|
|
|
|