Author |
Message
|
pshan81 |
Posted: Thu Jul 19, 2007 10:23 pm Post subject: MQ Publisher - Subscriber code in java |
|
|
Acolyte
Joined: 24 May 2005 Posts: 72
|
Hi,
I am new to the MQ publish/subscribe concept. I have gone through the publisher and subscriber code in C.
I need to develop the same using the Java MQ API (MQI).
Can anyone post a Java sample for pub/sub?
Thanks a lot. |
Vitor |
Posted: Thu Jul 19, 2007 11:42 pm Post subject: |
|
|
 Grand High Poobah
Joined: 11 Nov 2005 Posts: 26093 Location: Texas, USA
|
You'll find Java samples in most of the places you found C samples. _________________ Honesty is the best policy.
Insanity is the best defence. |
pshan81 |
Posted: Tue Jul 24, 2007 5:05 am Post subject: |
|
|
Acolyte
Joined: 24 May 2005 Posts: 72
|
Hi,
After going through some C language samples, I have written a publisher/subscriber program using the pure Java MQI API. I am able to put the message on the stream queue. However, I am not getting any message in the subscriber. I have pasted the publisher and subscriber code below.
Am I missing something in the code?
Code: |
import java.io.IOException;
import com.ibm.mq.*;

public class publisher {
    final public static String MQFMT_RF_HEADER = "MQHRF ";
    final public static byte [] MQRFH_STRUC_ID = {'R','F','H',' '};
    final public static byte [] MQFMT_STRING_ARRAY = {'M','Q','S','T','R',' ',' ',' '};
    final public static int MQRFH_VERSION_1 = 1;
    final public static int MQRFH_STRUC_LENGTH_FIXED = 32;
    final public static int MQRFH_NONE = 0x00000000;
    final public static int MQRC_UNKNOWN_OBJECT_NAME = 2085;
    final public static int MQRC_NO_MSG_AVAILABLE = 2033;
    final public static String MQPS_REGISTER_PUBLISHER = "RegPub ";
    final public static String MQPS_COMMAND ="MQPS_PUBLISH";
    final public static String MQPS_TOPIC = "Stock/wip";
    final public static String MQPS_TOPIC_B = " MQPSTopic ";

    public void init(String args)
    {
        String[] queues=new String[3];
        queues[0]="SYSTEM.BROKER.CONTROL.QUEUE";
        queues[1]="SAMPLE.BROKER.RESULTS.STREAM";
        queues[2]="RESULTS.SERVICE.SAMPLE.QUEUE";
        try {
            // Client connection details (host and port masked by the poster)
            MQEnvironment.hostname="XX.XXX.XX.XXX";
            MQEnvironment.port=YYYY;
            MQEnvironment.channel="SYSTEM.DEF.SVRCONN";
            MQQueueManager qmgr=new MQQueueManager(args);
            System.out.println("Queue manager connected");
            int openOptions=MQC.MQOO_OUTPUT;
            MQQueue queue=qmgr.accessQueue("SAMPLE.BROKER.RESULTS.STREAM",openOptions,null,null,null);
            System.out.println("Queue opened");
            MQMessage msg=new MQMessage();
            msg.messageType=MQC.MQMT_REQUEST;
            msg.replyToQueueName="RESULTS.SERVICE.SAMPLE.QUEUE";
            // Build the name/value string that follows the RFH header
            StringBuffer nameValues = new StringBuffer (MQPS_COMMAND + MQPS_REGISTER_PUBLISHER);
            nameValues.append (MQPS_TOPIC_B);
            nameValues.append (MQPS_TOPIC);
            nameValues.append ("TEST MSG");
            // Pad the string to a multiple of 16 characters
            while ( (nameValues.length () % 16) != 0 )
            {
                nameValues.append (" ");
            }
            String requestStr = nameValues.toString();
            msg.feedback = MQC.MQFB_NONE;
            msg.format = MQFMT_RF_HEADER;
            msg.encoding = MQC.MQENC_NATIVE;
            msg.characterSet = MQC.MQCCSI_Q_MGR;
            try {
                msg.write (MQRFH_STRUC_ID);                                     //StrucId
                msg.writeInt (MQRFH_VERSION_1);                                 //RFH Version
                msg.writeInt (MQRFH_STRUC_LENGTH_FIXED + requestStr.length());  //StrucLength
                msg.writeInt (MQC.MQENC_NATIVE);                                //Encoding
                msg.writeInt (MQC.MQCCSI_INHERIT);                              //qmgr CCSID
                msg.write (MQFMT_STRING_ARRAY);                                 //Format
                msg.writeInt (MQRFH_NONE);                                      //Flags
                msg.writeBytes (requestStr);
                queue.put(msg);
                System.out.println("Message successfully put into the queue");
            } catch (IOException e) {
                e.printStackTrace();
            }
        } catch (MQException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        publisher obj=new publisher();
        obj.init(qmgrname);
    }
}
import java.io.IOException;
import com.ibm.mq.*;

public class subscriber {
    final public static String MQFMT_RF_HEADER = "MQHRF ";
    final public static byte [] MQRFH_STRUC_ID = {'R','F','H',' '};
    final public static byte [] MQFMT_STRING_ARRAY = {'M','Q','S','T','R',' ',' ',' '};
    final public static int MQRFH_VERSION_1 = 1;
    final public static int MQRFH_STRUC_LENGTH_FIXED = 32;
    final public static int MQRFH_NONE = 0x00000000;
    final public static int MQRC_UNKNOWN_OBJECT_NAME = 2085;
    final public static int MQRC_NO_MSG_AVAILABLE = 2033;
    final public static String MQPS_COMMAND_B = "MQPSCommand ";
    final public static String MQPS_REGISTER_SUBSCRIBER = "RegSub ";
    final public static String MQPS_TOPIC = "Stock/wip ";
    final public static String MQPS_TOPIC_B = " MQPSTopic ";

    public void init(String args)
    {
        String[] queues=new String[3];
        queues[0]="SYSTEM.BROKER.CONTROL.QUEUE";
        queues[1]="SAMPLE.BROKER.RESULTS.STREAM";
        queues[2]="RESULTS.SERVICE.SAMPLE.QUEUE";
        try {
            // Client connection details (host and port masked by the poster)
            MQEnvironment.hostname="XX.XXX.XX.XXX";
            MQEnvironment.port=YYYY;
            MQEnvironment.channel="SYSTEM.DEF.SVRCONN";
            MQQueueManager qmgr=new MQQueueManager(args);
            int openOptions=MQC.MQOO_INPUT_SHARED;
            MQQueue queue=qmgr.accessQueue("SYSTEM.BROKER.CONTROL.QUEUE",openOptions,null,null,null);
            MQMessage msg=new MQMessage();
            //msg.format=MQC.MQFMT_RF_HEADER_2;
            msg.messageType=MQC.MQMT_REQUEST;
            msg.replyToQueueName="RESULTS.SERVICE.SAMPLE.QUEUE";
            // Build the name/value string that follows the RFH header
            StringBuffer nameValues = new StringBuffer (MQPS_COMMAND_B + MQPS_REGISTER_SUBSCRIBER);
            nameValues.append (MQPS_TOPIC_B);
            nameValues.append (MQPS_TOPIC);
            // Pad the string to a multiple of 16 characters
            while ( (nameValues.length () % 16) != 0 )
            {
                nameValues.append (" ");
            }
            String requestStr = nameValues.toString();
            msg.feedback = MQC.MQFB_NONE;
            msg.format = MQFMT_RF_HEADER;
            msg.encoding = MQC.MQENC_NATIVE;
            msg.characterSet = MQC.MQCCSI_Q_MGR;
            try {
                msg.write (MQRFH_STRUC_ID);                                     //StrucId
                msg.writeInt (MQRFH_VERSION_1);                                 //RFH Version
                msg.writeInt (MQRFH_STRUC_LENGTH_FIXED + requestStr.length());  //StrucLength
                msg.writeInt (MQC.MQENC_NATIVE);                                //Encoding
                msg.writeInt (MQC.MQCCSI_INHERIT);                              //qmgr CCSID
                msg.write (MQFMT_STRING_ARRAY);                                 //Format
                msg.writeInt (MQRFH_NONE);                                      //Flags
                msg.writeBytes (requestStr);
                // Wait up to 180 seconds for a message on the opened queue
                MQGetMessageOptions gmo=new MQGetMessageOptions();
                gmo.options=MQC.MQGMO_WAIT;
                gmo.waitInterval=180000;
                System.out.println("Subscriber started...Waiting for messages");
                queue.get(msg,gmo);
                byte[] a=new byte[msg.getMessageLength()];
                msg.readFully(a);
                System.out.println(msg);
            } catch (IOException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        } catch (MQException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        subscriber obj=new subscriber ();
        obj.init(qmgrname);
    }
}
|
Thanks |
jefflowrey |
Posted: Tue Jul 24, 2007 5:15 am Post subject: |
|
|
Grand Poobah
Joined: 16 Oct 2002 Posts: 19981
|
You won't get messages published until there is a registered subscriber to a topic.
You don't register publishers or subscribers by sending messages to the STREAM queue, you send them to the broker Control queue.
JMS makes working with Pub/Sub very easy. However, if you're not running in a J2EE app server, you might not want to use JMS.
Start with the Using Pub/Sub manual: http://publib.boulder.ibm.com/infocenter/wmqv6/v6r0/index.jsp?topic=/com.ibm.mq.amqnar.doc/amqnar10.htm _________________ I am *not* the model of the modern major general. |
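For readers following along, here is a minimal sketch of what a RegSub command message body could look like before it is put to the broker control queue (SYSTEM.BROKER.CONTROL.QUEUE). It only lays out the bytes, so it runs without the MQ classes. The class and method names are hypothetical, the numeric values for MQENC_NATIVE and MQCCSI_INHERIT are assumptions to check against your MQ installation, and the name/value tags are the ones that appear in this thread.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** Sketch only: lays out an MQRFH (version 1) header followed by a RegSub name/value string. */
final class RfhRegSubSketch {
    static final int MQRFH_STRUC_LENGTH_FIXED = 32; // fixed part of the header, as in the posted code
    static final int MQRFH_VERSION_1 = 1;
    static final int MQRFH_NONE = 0;
    static final int MQENC_NATIVE = 273;   // assumed value of MQC.MQENC_NATIVE in Java
    static final int MQCCSI_INHERIT = -2;  // assumed value of MQC.MQCCSI_INHERIT

    /** Joins the name/value pairs with single spaces and pads to a multiple of 16 characters. */
    static String buildRegSub(String topic, String stream, String qmgr, String queue) {
        StringBuilder sb = new StringBuilder();
        sb.append("MQPSCommand RegSub")
          .append(" MQPSTopic ").append(topic)
          .append(" MQPSStreamName ").append(stream)
          .append(" MQPSQMgrName ").append(qmgr)
          .append(" MQPSQName ").append(queue);
        while (sb.length() % 16 != 0) {
            sb.append(' ');
        }
        return sb.toString();
    }

    /** Returns the 32-byte fixed header followed by the name/value string (big-endian integers). */
    static byte[] buildRfh1(String nameValues) {
        byte[] nv = nameValues.getBytes(StandardCharsets.US_ASCII);
        ByteBuffer buf = ByteBuffer.allocate(MQRFH_STRUC_LENGTH_FIXED + nv.length);
        buf.put("RFH ".getBytes(StandardCharsets.US_ASCII));     // StrucId, 4 bytes
        buf.putInt(MQRFH_VERSION_1);                             // Version
        buf.putInt(MQRFH_STRUC_LENGTH_FIXED + nv.length);        // StrucLength
        buf.putInt(MQENC_NATIVE);                                // Encoding of the data that follows
        buf.putInt(MQCCSI_INHERIT);                              // CodedCharSetId
        buf.put("MQSTR   ".getBytes(StandardCharsets.US_ASCII)); // Format, 8 bytes
        buf.putInt(MQRFH_NONE);                                  // Flags
        buf.put(nv);                                             // NameValueString
        return buf.array();
    }
}
```

With the MQ classes for Java, the resulting bytes would be written into an MQMessage whose format is the RFH header format and then put to the control queue; the subscriber afterwards reads publications from the queue it named in MQPSQName, not from the control queue.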
fjb_saper |
Posted: Tue Jul 24, 2007 1:19 pm Post subject: |
|
|
 Grand High Poobah
Joined: 18 Nov 2003 Posts: 20756 Location: LI,NY
|
Knowing that pub/sub is always somewhat slower than P2P...
In order to write a pub/sub in Java Base you'd have to give me a real compelling reason not to use JMS.
JMS just makes everything with pub/sub so much simpler...
Now if you use the WebSphere Message Broker, there is a possibility for your clients to do P2P and leave the pub/sub to the broker only...
But at that point I would expect the clients to still be JMS just for the automatic stripping of the RFH header. I'd also expect the broker to put the pub RFH header on the message...
Enjoy  _________________ MQ & Broker admin |
pshan81 |
Posted: Mon Jul 30, 2007 8:35 pm Post subject: |
|
|
Acolyte
Joined: 24 May 2005 Posts: 72
|
I have tried out pub/sub using JMS. It works fine and looks very easy. I am now exploring the same using MQI. I have changed the subscriber code (which I posted earlier) and I am getting the published message on the subscriber queue.
Using the code below I get every message that arrives on the subscriber queue. What do I have to change to get messages based on the topic?
Code: |
StringBuffer nameValues = new StringBuffer (
    "MQPSCommand "+"RegSub "
    +"MQPSTopic "+"Stock/IBM/* "
    +"MQPSStreamName "+"SAMPLE.BROKER.RESULTS.STREAM "
    +"MQPSQMgrName "+"QMBRK1 "
    +"MQPSQName "+"RESULTS.SERVICE.SUB.QUEUE ");
while ( (nameValues.length () % 16) != 0 )
{
    nameValues.append (" ");
}
String requestStr = nameValues.toString();
msg.feedback = MQC.MQFB_NONE;
msg.format = MQFMT_RF_HEADER;
msg.messageType=MQC.MQMT_DATAGRAM;
msg.encoding = MQC.MQENC_NATIVE;
msg.characterSet = MQC.MQCCSI_Q_MGR;
msg.replyToQueueName=streamQueue.name;
pmo.options=MQC.MQPMO_NEW_MSG_ID;
try {
    msg.write (MQRFH_STRUC_ID);
    msg.writeInt (MQRFH_VERSION_1); //RFH Version
    msg.writeInt (MQRFH_STRUC_LENGTH_FIXED + requestStr.length());
    msg.writeInt (MQC.MQENC_NATIVE); //Encoding
    msg.writeInt (MQC.MQCCSI_INHERIT); //qmgr CCSID
    msg.write (MQFMT_STRING_ARRAY); //Format
    msg.writeInt (MQRFH_NONE); //Flags
    msg.writeBytes (requestStr);
    queue.put(msg);
    MQGetMessageOptions gmo = new MQGetMessageOptions ();
    gmo.options = MQC.MQGMO_WAIT | MQC.MQGMO_CONVERT;
    gmo.waitInterval=10000;
    MQMessage replyMsg = new MQMessage ();
    streamQueue.get (replyMsg, gmo);
|
jefflowrey |
Posted: Tue Jul 31, 2007 3:21 am Post subject: |
|
|
Grand Poobah
Joined: 16 Oct 2002 Posts: 19981
|
You can't GET messages by topic once they're on the queue.
You can either only subscribe the queue to a particular topic, or you can GET by CorrelID to ensure you only get messages for your subscription. _________________ I am *not* the model of the modern major general. |
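To illustrate the first option: the topic filter is applied when the broker routes a publication to a subscription, not when the application reads the queue. The sketch below is a small local model of that matching, not MQ API. It assumes the broker's wildcard rules are '*' for any run of characters and '?' for exactly one character (please verify this against the Pub/Sub manual), and the class name is hypothetical.

```java
import java.util.regex.Pattern;

/** Sketch only: a local model of matching a published topic against a subscription pattern. */
final class TopicWildcardSketch {
    /** Translates a topic pattern into a regex: '*' = any run of characters, '?' = exactly one. */
    static Pattern toRegex(String topicPattern) {
        StringBuilder re = new StringBuilder();
        for (char c : topicPattern.toCharArray()) {
            if (c == '*') {
                re.append(".*");
            } else if (c == '?') {
                re.append('.');
            } else {
                // Quote everything else so characters such as '.' are taken literally
                re.append(Pattern.quote(String.valueOf(c)));
            }
        }
        return Pattern.compile(re.toString());
    }

    /** True if a publication on 'topic' would be delivered to a subscription on 'topicPattern'. */
    static boolean matches(String topicPattern, String topic) {
        return toRegex(topicPattern).matcher(topic).matches();
    }
}
```

Under these assumptions a queue subscribed only to "Stock/IBM/*" receives publications on "Stock/IBM/price" but not on "Stock/wip", so a plain GET on that queue already returns only the topics of interest.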
pshan81 |
Posted: Tue Jul 31, 2007 3:52 am Post subject: |
|
|
Acolyte
Joined: 24 May 2005 Posts: 72
|
Jeff,
Could you please elaborate on the above? |
jefflowrey |
Posted: Tue Jul 31, 2007 3:54 am Post subject: |
|
|
Grand Poobah
Joined: 16 Oct 2002 Posts: 19981
|
Use different queues for different subscribers.
Or use the CorrelId (which should be your subscriberId) to separate messages. _________________ I am *not* the model of the modern major general. |
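A minimal sketch of the second option, for when several subscribers share one queue. It models the queue as an in-memory list so that it runs without the MQ classes; the class names and the zero-padding of the identity are assumptions for illustration. In a real program the selection would be done by the queue manager: as far as I know, with the MQ classes for Java you set `msg.correlationId` and `gmo.matchOptions = MQC.MQMO_MATCH_CORREL_ID` before calling `queue.get`, and the registration has to ask the broker to use the CorrelId as part of the subscriber identity (check the Pub/Sub manual for the exact registration option).

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Sketch only: separating messages on a shared queue by a per-subscriber CorrelId. */
final class CorrelIdSketch {
    static final int MQ_CORREL_ID_LENGTH = 24; // CorrelId is a 24-byte field in the message descriptor

    /** Turns a subscriber identity into a 24-byte CorrelId, padded with zero bytes (an assumption). */
    static byte[] toCorrelId(String subscriberId) {
        byte[] raw = subscriberId.getBytes(StandardCharsets.US_ASCII);
        if (raw.length > MQ_CORREL_ID_LENGTH) {
            throw new IllegalArgumentException("identity longer than 24 bytes");
        }
        return Arrays.copyOf(raw, MQ_CORREL_ID_LENGTH);
    }

    /** Stand-in for a message sitting on the shared subscriber queue. */
    static final class QueuedMessage {
        final byte[] correlId;
        final String body;

        QueuedMessage(byte[] correlId, String body) {
            this.correlId = correlId;
            this.body = body;
        }
    }

    /** Models a GET that matches on CorrelId: returns only the bodies carrying the wanted id. */
    static List<String> bodiesFor(byte[] wanted, List<QueuedMessage> queue) {
        List<String> out = new ArrayList<>();
        for (QueuedMessage m : queue) {
            if (Arrays.equals(m.correlId, wanted)) {
                out.add(m.body);
            }
        }
        return out;
    }
}
```

Using one queue per subscriber avoids this bookkeeping entirely, which is why it is listed first above.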
pshan81 |
Posted: Sun Aug 05, 2007 11:20 pm Post subject: |
|
|
Acolyte
Joined: 24 May 2005 Posts: 72
|
We have an application that does some business validations and routes messages to different customers. It uses MQ as the transport layer, and we are planning to implement the publish/subscribe feature of MQ.
What are the pros and cons of achieving publish/subscribe using MQI versus MQ JMS? We are using MQ 5.3 CSD10 on AIX 5.2. We are not using any application server. |
fjb_saper |
Posted: Mon Aug 06, 2007 4:23 pm Post subject: |
|
|
 Grand High Poobah
Joined: 18 Nov 2003 Posts: 20756 Location: LI,NY
|
pshan81 wrote: |
We have an application that does some business validations and routes messages to different customers. It uses MQ as the transport layer, and we are planning to implement the publish/subscribe feature of MQ.
What are the pros and cons of achieving publish/subscribe using MQI versus MQ JMS? We are using MQ 5.3 CSD10 on AIX 5.2. We are not using any application server. |
If you are using JMS you should at least be @ CSD 11 of 5.3.
Better go to 6.0.2.2 as 5.3 will be out of support soon...  _________________ MQ & Broker admin |
pshan81 |
Posted: Tue Aug 07, 2007 12:31 am Post subject: |
|
|
Acolyte
Joined: 24 May 2005 Posts: 72
|
Saper,
Thanks. We are planning to migrate to MQ 6 and hope to do so soon. What are the pros and cons of implementing pub/sub using MQI versus MQ JMS (other than the programming overhead of framing the RFH header in MQI)? |
fjb_saper |
Posted: Tue Aug 07, 2007 3:01 am Post subject: |
|
|
 Grand High Poobah
Joined: 18 Nov 2003 Posts: 20756 Location: LI,NY
|
pshan81 wrote: |
Saper,
Thanks. We are planning to migrate to MQ 6 and hope to do so soon. What are the pros and cons of implementing pub/sub using MQI versus MQ JMS (other than the programming overhead of framing the RFH header in MQI)? |
You are doing pub/sub with Java. If you do not have a very compelling reason not to use JMS, I would strongly suggest you do use JMS.
Enjoy  _________________ MQ & Broker admin |
pshan81 |
Posted: Tue Aug 07, 2007 4:58 am Post subject: |
|
|
Acolyte
Joined: 24 May 2005 Posts: 72
|
I have two queue managers/brokers, qmgrA and qmgrB, running on two different Windows machines. The queue managers are connected using sender and receiver channels.
I am publishing a message on topic "wip" from qmgrA using MQ JMS. I am able to receive the message on qmgrB (topic "wip") if I specify qmgrB in the BROKERPUBQMGR attribute of the topic.
In addition, when I run a subscriber to the same topic "wip" on qmgrA, I do not get the published message. However, if I specify qmgrA in the BROKERPUBQMGR attribute of the topic, I do get the message there (but not in the subscriber running on qmgrB).
Is there anything I am missing for both subscribers (on different machines) to subscribe to the same topic? |
fjb_saper |
Posted: Tue Aug 07, 2007 12:43 pm Post subject: |
|
|
 Grand High Poobah
Joined: 18 Nov 2003 Posts: 20756 Location: LI,NY
|
First you need to know which broker is the publishing broker.
This is the broker you subscribe to, irrespective of the qmgr you are on.
Make sure that the Topic object you use for the subscription and retrieval of your message specifies a queue local to the qmgr you are connected to for the retrieval of the published messages from your subscription...
Hope this is not too confusing.  _________________ MQ & Broker admin |