|
RSS Feed - WebSphere MQ Support
|
RSS Feed - Message Broker Support
|
 |
|
how to handle multiple requests, to get response |
« View previous topic :: View next topic » |
Author |
Message
|
aanch |
Posted: Mon Mar 31, 2008 11:25 pm Post subject: how to handle multiple requests, to get response |
|
|
Newbie
Joined: 31 Mar 2008 Posts: 2
|
how to handle multiple requests, to get response for a particular request in the queue.
I have an urgent issue in my project that is too be handled as soon as possible. Please provide me some help on the issue I am stating below in the mail.
I have requirement using MQ to get and receive messages in the queue. I have made a MQ Sender and Reciever program that will put the request in the queue and will read the response and send it back.
Now when I am sending a single request in the REQUEST QUEUE, I am able to read the message and put it back. This scenario is working fine. But in another scenario I have to test the following:
When I am sending the first request, its response time out got over and it remained in the REQUEST queue itself. Thereafter I sent another request into the queue and tried executing it before response times out, in that case I am getting an error and not able to get the response for the 2nd request.
Can you please help me in how I can handle multiple request and response? I am setting correlation id also.
I am also attaching the java files that I am using for sending and receiving a request i.e MQSender.java and MQReciever.java
Thanks is advance. Please try to respond as soon as possible  |
|
Back to top |
|
 |
Gaya3 |
Posted: Mon Mar 31, 2008 11:29 pm Post subject: Re: how to handle multiple requests, to get response |
|
|
 Jedi
Joined: 12 Sep 2006 Posts: 2493 Location: Boston, US
|
aanch wrote: |
how to handle multiple requests, to get response for a particular request in the queue.
|
Threads will help you out to attain this..
and i didnt see your attachments at all...
Regards
Gayathri _________________ Regards
Gayathri
-----------------------------------------------
Do Something Before you Die |
|
Back to top |
|
 |
aanch |
Posted: Tue Apr 01, 2008 12:52 am Post subject: RE: |
|
|
Newbie
Joined: 31 Mar 2008 Posts: 2
|
Hey sorry i forgot paste my Java Files.
Here is MQReciever.java
[size=12][size=18]package com.wipro.mq;
import java.io.CharArrayReader;
import java.io.File;
import java.io.FileOutputStream;
import java.io.FileWriter;
import java.io.IOException;
import java.io.Reader;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.transform.Result;
import javax.xml.transform.Source;
import javax.xml.transform.Transformer;
import javax.xml.transform.TransformerFactory;
import javax.xml.transform.dom.DOMSource;
import javax.xml.transform.stream.StreamResult;
import org.w3c.dom.Document;
import org.w3c.dom.Node;
import com.ibm.mq.MQC;
import com.ibm.mq.MQCcsidTable;
import com.ibm.mq.MQEnvironment;
import com.ibm.mq.MQException;
import com.ibm.mq.MQGetMessageOptions;
import com.ibm.mq.MQMessage;
import com.ibm.mq.MQQueue;
import com.ibm.mq.MQQueueManager;
/**
*
* @author Venkat
*
* This class will be used to receive messages from
* Remote Q and will persist in a file system.
*/
public class MQReceiver {
MQQueueManager _queueManager = null;
int port = 1414;
String hostname = "10.177.150.192";
//String hostname = "davap0279.foremost.com";
String channel = "SYSTEM.DEF.SVRCONN";
String qManager = "WMQ1QM";
//String qManager = "ficdqma0";
String inputQName = "MQS.OIV.TEST.REQUEST.LOCAL";
private byte[] correlationId;
private byte[] messageId;
private String filename = null;
public MQReceiver() {
System.out.println("MQReceiverWITS connected");
}
private void browse() throws MQException {
int openOptions = MQC.MQOO_INQUIRE + MQC.MQOO_BROWSE
+ MQC.MQOO_FAIL_IF_QUIESCING + MQC.MQOO_INPUT_SHARED;
MQQueue queue = _queueManager.accessQueue(inputQName, openOptions,
null, null, null);
System.out.println("MQBrowse v1.0 connected.\n");
int depth = queue.getCurrentDepth();
System.out.println("Current depth: " + depth + "\n");
if (depth == 0) {
return;
}
MQGetMessageOptions getOptions = new MQGetMessageOptions();
// below is the original one
getOptions.options = MQC.MQGMO_BROWSE_NEXT + MQC.MQGMO_NO_WAIT
+ MQC.MQGMO_FAIL_IF_QUIESCING + MQC.MQGMO_CONVERT;
int i = 1;
while (i < 510) {
MQMessage message = new MQMessage();
try {
getOptions.options = MQC.MQGMO_BROWSE_NEXT + MQC.MQGMO_NO_WAIT
+ MQC.MQGMO_FAIL_IF_QUIESCING + MQC.MQGMO_CONVERT;
queue.get(message, getOptions);
byte[] b = new byte[message.getMessageLength()];
message.readFully(b);
String strMQ = (String) new String(b);
String strHT = MQCcsidTable.lookup("819");
System.out.println(strMQ);
write(strMQ);
message.clearMessage();
getOptions.options = MQC.MQGMO_MSG_UNDER_CURSOR
+ MQC.MQGMO_SYNCPOINT;
queue.get(message, getOptions);
} catch (IOException e) {
System.out.println("IOException during GET: " + e.getMessage());
break;
} catch (MQException e) {
if (e.completionCode == 2
&& e.reasonCode == MQException.MQRC_NO_MSG_AVAILABLE) {
if (depth > 0) {
System.out.println("All messages read.");
}
} else {
System.out.println("GET Exception: " + e);
}
break;
}
++i;
}
queue.close();
_queueManager.disconnect();
}
// public String browseToString() throws MQException {
public void browseToString() throws MQException {
int openOptions = MQC.MQOO_INQUIRE + MQC.MQOO_BROWSE
+ MQC.MQOO_FAIL_IF_QUIESCING + MQC.MQOO_INPUT_SHARED;
MQQueue queue = _queueManager.accessQueue(inputQName, openOptions,
null, null, null);
System.out.println("MQBrowse v1.0 connected.");
int depth = queue.getCurrentDepth();
System.out.println("Current depth: " + queue.getCurrentDepth());
/*
* if (depth == 0) { return ""; }
*/
MQGetMessageOptions getOptions = new MQGetMessageOptions();
// below is the original one
getOptions.options = MQC.MQGMO_BROWSE_NEXT + MQC.MQGMO_NO_WAIT
+ MQC.MQGMO_FAIL_IF_QUIESCING + MQC.MQGMO_CONVERT;
String strMQ = "";
while (true) {
MQMessage message = new MQMessage();
try {
getOptions.options = MQC.MQGMO_BROWSE_NEXT + MQC.MQGMO_NO_WAIT
+ MQC.MQGMO_FAIL_IF_QUIESCING + MQC.MQGMO_CONVERT;
queue.get(message, getOptions);
byte[] b = new byte[message.getMessageLength()];
correlationId = message.correlationId;
messageId = message.messageId;
message.readFully(b);
strMQ = strMQ + (String) new String(b);
// String strHT = MQCcsidTable.lookup("819");
// stringToXml(strMQ);
message.clearMessage();
getOptions.options = MQC.MQGMO_MSG_UNDER_CURSOR
+ MQC.MQGMO_SYNCPOINT;
queue.get(message, getOptions);
} catch (IOException e) {
System.out.println("IOException during GET: " + e.getMessage());
break;
} catch (MQException e) {
if (e.completionCode == 2
&& e.reasonCode == MQException.MQRC_NO_MSG_AVAILABLE) {
if (depth > 0) {
System.out.println("All messages read.");
}
} else {
System.out.println("GET Exception: " + e);
}
break;
}
// stringToXml(strMQ.trim());
}
/*
* String string = "<?xml";
*
* String strMQStripped =
* strMQ.substring(strMQ.indexOf(string),strMQ.length());
*
* String filename = write(strMQStripped.trim());
*/
// String filename = write(strMQ);
write(strMQ);
queue.close();
_queueManager.disconnect();
// return filename;
}
public int getDepth() throws MQException {
int openOptions = MQC.MQOO_INQUIRE + MQC.MQOO_BROWSE
+ MQC.MQOO_FAIL_IF_QUIESCING + MQC.MQOO_INPUT_SHARED;
MQQueue queue = _queueManager.accessQueue(inputQName, openOptions,
null, null, null);
System.out.println("MQBrowse v1.0 connected.");
int depth = queue.getCurrentDepth();
System.out.println("Current depth: " + depth);
return depth;
}
public void init() {
// Set up MQ environment
MQEnvironment.hostname = hostname;
MQEnvironment.channel = channel;
MQEnvironment.port = port;
}
/*
* public static void main(String[] args) {
*
* MQReceiver browse = new MQReceiver();
*
* try {
*
* browse.init(); browse.selectQMgr(); browse.browseToString();
* } catch (Exception e) { System.out.println(e);
* } }
*/
public void selectQMgr() throws MQException {
_queueManager = new MQQueueManager(qManager);
}
// added for writing to the text file
private void write(String strToBeWritten) {
FileWriter fileWr = null;
// String filename = null;
try {
java.util.Date timestamp = new java.util.Date();
filename = "Request_" + timestamp.getTime() + ".xml";
fileWr = new FileWriter(filename, true);
fileWr.write(strToBeWritten);
// fileWr.write("\n");
} catch (Exception e) {
System.out.println(e);
} finally {
try {
fileWr.close();
} catch (Exception e) {
}
}
// return filename;
}
/**
* Method for converting a String to an XML document.
*
* @param xmlText
* @author tatpta
*/
private void stringToXml(String xmlText) {
try {
// Create a factory
DocumentBuilderFactory factory = DocumentBuilderFactory
.newInstance();
// Use document builder factory
DocumentBuilder builder = factory.newDocumentBuilder();
// Parse the document
Reader reader = new CharArrayReader(xmlText.toCharArray());
Document doc = builder.parse(new org.xml.sax.InputSource(reader));
TransformerFactory tranFact = TransformerFactory.newInstance();
Transformer transfor = tranFact.newTransformer();
Node node = doc.getDocumentElement();
Source src = new DOMSource(node);
Result dest = new StreamResult(new FileOutputStream(new File(
"MQ_CoverageRequest_IAA.xml")));
transfor.transform(src, dest);
} catch (Exception e) {
System.err.println(e);
}
}
public String getFileName() {
return filename;
}
public byte[] getCorrelationId() {
return correlationId;
}
public byte[] getMessageId() {
return messageId;
}
public void setMessageId(byte[] messageId) {
this.messageId = messageId;
}
}
Here is MQSender.java
package com.wipro.mq;
import java.io.DataInputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.ArrayList;
import com.ibm.mq.MQC;
import com.ibm.mq.MQEnvironment;
import com.ibm.mq.MQException;
import com.ibm.mq.MQMessage;
import com.ibm.mq.MQPutMessageOptions;
import com.ibm.mq.MQQueue;
import com.ibm.mq.MQQueueManager;
/**
* @author Venkat
*
* This class read messages from a file system and writes messages to Remote queues
*/
public class MQSender {
/** Holds QManager Name */
private String strQMgr = null;
/** Holds HostName */
private String strHostName = null;
/** Holds Port Number */
private int iPort = 0;
/** Holds QName */
private String strQName = null;
/** Holds Channel name */
private String strChannel = null;
/** Holds Options that are needed while sending message */
private final static int OPENOPTIONS = MQC.MQOO_INPUT_AS_Q_DEF
| MQC.MQOO_OUTPUT;
/** Holds the MQQueueManager */
private MQQueueManager qMgr;
/** Holds MQQueue */
private MQQueue queue;
/** Correaltion Id */
private byte[] correlationId = null;
/** Message Id */
private byte[] messageId = null;
/**
* Method for assigning the values of Queue manager, Queue name, port,
* HostName and Channel name.
*
* @param strQMgr
* Queue Manager
* @param strHostName
* Host Name
* @param iPort
* Port Number
* @param strQName
* Queue Name
* @param strChannel
* Channel Name
*/
public MQSender(String strQMgr, String strHostName, int iPort,
String strQName, String strChannel) {
this.strQMgr = strQMgr;
this.strHostName = strHostName;
this.iPort = iPort;
this.strQName = strQName;
this.strChannel = strChannel;
}
/**
* This method initializes the Queue and Queue Manager.
*
* @param
* @return
* @throws
*/
private void initialize() {
MQEnvironment.hostname = strHostName;
MQEnvironment.channel = strChannel;
MQEnvironment.port = iPort;
MQEnvironment.properties.put(MQC.TRANSPORT_PROPERTY,
MQC.TRANSPORT_MQSERIES);
try {
qMgr = new MQQueueManager(strQMgr);
queue = qMgr.accessQueue(strQName, OPENOPTIONS);
} catch (MQException mqEx) {
}
}
/**
* This method send the Data to Queue by reading from an input data file.
*
* @param message
* String
*/
public void send(String messageFilePath) {
initialize();
try {
int i = 0;
String orderXML = null;
FileInputStream in = new FileInputStream(messageFilePath);
DataInputStream ind = new DataInputStream(in);
MQMessage putMessage = new MQMessage();
String longXML = "";
while ((orderXML = ind.readLine()) != null) {
longXML += orderXML;
}
//System.out.println(longXML);
putMessage.correlationId = correlationId;
putMessage.writeString(longXML);
putMessage.format = MQC.MQFMT_STRING;
MQPutMessageOptions pmo = new MQPutMessageOptions();
queue.put(putMessage, pmo);
qMgr.commit();
System.out.println("Message with (" + correlationId + ","+ messageId + "despatched to queue.");
} catch (Exception e) {
e.printStackTrace();
} finally {
closeAll();
}
}
/**
* This method send the message to Queue
*
* @param message
* String
*/
public void postMessage(String msg) {
synchronized ("wait") {
initialize();
MQMessage putMessage = new MQMessage();
try {
putMessage.writeString(msg);
} catch (IOException e) {
e.printStackTrace();
}
putMessage.format = MQC.MQFMT_STRING;
MQPutMessageOptions pmo = new MQPutMessageOptions();
//added for CorrelationId and MessageId
pmo.options = MQC.MQPMO_NEW_MSG_ID | MQC.MQPMO_NEW_CORREL_ID;
try {
queue.put(putMessage, pmo);
qMgr.commit();
} catch (MQException e) {
e.printStackTrace();
}
finally {
closeAll();
}
}
}
/**
* This method send the message to Queue
*
* @param message
* String
*/
public void postMessage(ArrayList<String> msgs) {
synchronized ("wait") {
initialize();
//MQMessage putMessage = new MQMessage();
try
{
int i = 0;
while(i<50000)
{
try {
MQMessage putMessage = new MQMessage();
putMessage.writeString(msgs.get(i));
putMessage.format = MQC.MQFMT_STRING;
MQPutMessageOptions pmo = new MQPutMessageOptions();
queue.put(putMessage, pmo);
qMgr.commit();
i++;
} catch (MQException mqEx) {
}
}
}
catch(Exception e)
{
e.printStackTrace();
}
finally {
closeAll();
}
}
}
/**
* This method closes the QueueManager Connection and Queue Connection
*
* @param
* @return
* @throws
*/
public void closeAll() {
try {
queue.close();
qMgr.close();
} catch (MQException mqEx) {
System.out.println("Error Occurred while Closing the connections");
}
}
public byte[] getCorrelationId(){
return correlationId;
}
public void setCorrelationId(byte[] correlationId){
this.correlationId = correlationId;
}
public byte[] getMessageId() {
return messageId;
}
public void setMessageId(byte[] messageId) {
this.messageId = messageId;
}
/*
public static void main(String[] args) {
//10.158.3.147
//192.168.56.10
MQSender sender = new MQSender("WMQ1QM", "10.177.150.192",
1414, "MQS.OIV.TEST.REQUEST.LOCAL", "SYSTEM.DEF.SVRCONN");
sender.send("NOProlog_CoverageRequest_IAA.xml");
}
*/
}
I am going through the links.In meanwhile if you all could help me
Thanks |
|
Back to top |
|
 |
|
|
 |
|
Page 1 of 1 |
|
You cannot post new topics in this forum You cannot reply to topics in this forum You cannot edit your posts in this forum You cannot delete your posts in this forum You cannot vote in polls in this forum
|
|
|
|