MQ Publisher - Subscriber code in java
pshan81
Posted: Thu Jul 19, 2007 10:23 pm

Acolyte

Joined: 24 May 2005
Posts: 72

Hi,

I am new to the MQ publish/subscribe concept. I have gone through the publisher and subscriber code in C.

I need to develop the same using the Java MQ API (MQI).

Can anyone post a Java sample for pub/sub?


Thanks a lot.
Vitor
Posted: Thu Jul 19, 2007 11:42 pm

Grand High Poobah

Joined: 11 Nov 2005
Posts: 26093
Location: Texas, USA

You'll find Java samples in most of the places you found C samples.
_________________
Honesty is the best policy.
Insanity is the best defence.
pshan81
Posted: Tue Jul 24, 2007 5:05 am

Acolyte

Joined: 24 May 2005
Posts: 72

Hi,
After going through some C language samples, I have written a publisher/subscriber program using the pure Java MQI API. I am able to put the message on the stream queue. However, I am not getting any message in the subscriber. I have pasted the publisher and subscriber code below.

Am I missing something in the code?

Code:


import java.io.IOException;

import com.ibm.mq.MQC;
import com.ibm.mq.MQEnvironment;
import com.ibm.mq.MQException;
import com.ibm.mq.MQMessage;
import com.ibm.mq.MQQueue;
import com.ibm.mq.MQQueueManager;

public class publisher {

   // RFH (version 1) header constants
   final public static String MQFMT_RF_HEADER = "MQHRF   "; // format names are 8 characters, blank-padded
   final public static byte [] MQRFH_STRUC_ID = {'R','F','H',' '};
   final public static byte [] MQFMT_STRING_ARRAY = {'M','Q','S','T','R',' ',' ',' '};
   final public static int MQRFH_VERSION_1 = 1;
   final public static int MQRFH_STRUC_LENGTH_FIXED = 32;
   final public static int MQRFH_NONE = 0x00000000;
   final public static int MQRC_UNKNOWN_OBJECT_NAME = 2085;
   final public static int MQRC_NO_MSG_AVAILABLE = 2033;

   // Name/value tokens used to build the broker command
   final public static String MQPS_REGISTER_PUBLISHER = "RegPub ";
   final public static String MQPS_COMMAND ="MQPS_PUBLISH";
   final public static String MQPS_TOPIC = "Stock/wip";
   final public static String MQPS_TOPIC_B = " MQPSTopic ";

   public void init(String args)
   {
      String[] queues=new String[3];
      queues[0]="SYSTEM.BROKER.CONTROL.QUEUE";
      queues[1]="SAMPLE.BROKER.RESULTS.STREAM";
      queues[2]="RESULTS.SERVICE.SAMPLE.QUEUE";
      try {
         // Client connection details
         MQEnvironment.hostname="XX.XXX.XX.XXX";
         MQEnvironment.port=YYYY;
         MQEnvironment.channel="SYSTEM.DEF.SVRCONN";
         MQQueueManager qmgr=new MQQueueManager(args);
         System.out.println("Queue manager connected");

         // Open the stream queue for output
         int openOptions=MQC.MQOO_OUTPUT;
         MQQueue queue=qmgr.accessQueue("SAMPLE.BROKER.RESULTS.STREAM",openOptions,null,null,null);
         System.out.println("Queue opened");

         MQMessage msg=new MQMessage();
         msg.messageType=MQC.MQMT_REQUEST;
         msg.replyToQueueName="RESULTS.SERVICE.SAMPLE.QUEUE";

         // Build the name/value string and pad it to a multiple of 16 characters
         StringBuffer nameValues = new StringBuffer (MQPS_COMMAND + MQPS_REGISTER_PUBLISHER);
         nameValues.append (MQPS_TOPIC_B);
         nameValues.append (MQPS_TOPIC);
         nameValues.append ("TEST MSG");
         while ( (nameValues.length () % 16) != 0 )
         {
            nameValues.append (" ");
         }

         String requestStr = nameValues.toString();
         msg.feedback = MQC.MQFB_NONE;
         msg.format = MQFMT_RF_HEADER;
         msg.encoding = MQC.MQENC_NATIVE;
         msg.characterSet = MQC.MQCCSI_Q_MGR;
         try {
            // Write the RFH header fields, then the name/value string
            msg.write (MQRFH_STRUC_ID); //StructId
            msg.writeInt (MQRFH_VERSION_1); //RFH Version
            msg.writeInt (MQRFH_STRUC_LENGTH_FIXED + requestStr.length());
            msg.writeInt (MQC.MQENC_NATIVE); //Encoding
            msg.writeInt (MQC.MQCCSI_INHERIT); //qmgr CCSID
            msg.write (MQFMT_STRING_ARRAY); //Format
            msg.writeInt (MQRFH_NONE); //Flags
            msg.writeBytes (requestStr);
            queue.put(msg);
            System.out.println("Message successfully put into the queue");
         } catch (IOException e) {
            e.printStackTrace();
         }
      } catch (MQException e) {
         e.printStackTrace();
      }
   }

   public static void main(String[] args) {
      publisher obj=new publisher();
      obj.init(args[0]); // queue manager name from the command line
   }
}

import java.io.IOException;

import com.ibm.mq.MQC;
import com.ibm.mq.MQEnvironment;
import com.ibm.mq.MQException;
import com.ibm.mq.MQGetMessageOptions;
import com.ibm.mq.MQMessage;
import com.ibm.mq.MQQueue;
import com.ibm.mq.MQQueueManager;

public class subscriber {

   // RFH (version 1) header constants
   final public static String MQFMT_RF_HEADER = "MQHRF   "; // format names are 8 characters, blank-padded
   final public static byte [] MQRFH_STRUC_ID = {'R','F','H',' '};
   final public static byte [] MQFMT_STRING_ARRAY = {'M','Q','S','T','R',' ',' ',' '};
   final public static int MQRFH_VERSION_1 = 1;
   final public static int MQRFH_STRUC_LENGTH_FIXED = 32;
   final public static int MQRFH_NONE = 0x00000000;
   final public static int MQRC_UNKNOWN_OBJECT_NAME = 2085;
   final public static int MQRC_NO_MSG_AVAILABLE = 2033;

   // Name/value tokens used to build the broker command
   final public static String MQPS_COMMAND_B = "MQPSCommand ";
   final public static String MQPS_REGISTER_SUBSCRIBER = "RegSub ";
   final public static String MQPS_TOPIC = "Stock/wip ";
   final public static String MQPS_TOPIC_B = " MQPSTopic ";

   public void init(String args)
   {
      String[] queues=new String[3];
      queues[0]="SYSTEM.BROKER.CONTROL.QUEUE";
      queues[1]="SAMPLE.BROKER.RESULTS.STREAM";
      queues[2]="RESULTS.SERVICE.SAMPLE.QUEUE";
      try {
         // Client connection details
         MQEnvironment.hostname="XX.XXX.XX.XXX";
         MQEnvironment.port=YYYY;
         MQEnvironment.channel="SYSTEM.DEF.SVRCONN";

         MQQueueManager qmgr=new MQQueueManager(args);
         int openOptions=MQC.MQOO_INPUT_SHARED;
         MQQueue queue=qmgr.accessQueue("SYSTEM.BROKER.CONTROL.QUEUE",openOptions,null,null,null);
         MQMessage msg=new MQMessage();
         //msg.format=MQC.MQFMT_RF_HEADER_2;
         msg.messageType=MQC.MQMT_REQUEST;
         msg.replyToQueueName="RESULTS.SERVICE.SAMPLE.QUEUE";

         // Build the name/value string and pad it to a multiple of 16 characters
         StringBuffer nameValues = new StringBuffer (MQPS_COMMAND_B + MQPS_REGISTER_SUBSCRIBER);
         nameValues.append (MQPS_TOPIC_B);
         nameValues.append (MQPS_TOPIC);
         while ( (nameValues.length () % 16) != 0 )
         {
            nameValues.append (" ");
         }

         String requestStr = nameValues.toString();
         msg.feedback = MQC.MQFB_NONE;
         msg.format = MQFMT_RF_HEADER;
         msg.encoding = MQC.MQENC_NATIVE;
         msg.characterSet = MQC.MQCCSI_Q_MGR;
         try {
            // Write the RFH header fields, then the name/value string
            msg.write (MQRFH_STRUC_ID); //StructId
            msg.writeInt (MQRFH_VERSION_1); //RFH Version
            msg.writeInt (MQRFH_STRUC_LENGTH_FIXED + requestStr.length());
            msg.writeInt (MQC.MQENC_NATIVE); //Encoding
            msg.writeInt (MQC.MQCCSI_INHERIT); //qmgr CCSID
            msg.write (MQFMT_STRING_ARRAY); //Format
            msg.writeInt (MQRFH_NONE); //Flags
            msg.writeBytes (requestStr);

            // Wait up to 3 minutes for a message
            MQGetMessageOptions gmo=new MQGetMessageOptions();
            gmo.options=MQC.MQGMO_WAIT;
            gmo.waitInterval=180000;
            System.out.println("Subscriber started...Waiting for messages");
            queue.get(msg,gmo);
            byte[] a=new byte[msg.getMessageLength()];
            msg.readFully(a);
            System.out.println(new String(a)); // print the message body that was read
         } catch (IOException e) {
            e.printStackTrace();
         }
      } catch (MQException e) {
         e.printStackTrace();
      }
   }

   public static void main(String[] args) {
      subscriber obj=new subscriber ();
      obj.init(args[0]); // queue manager name from the command line
   }
}
 


Thanks
jefflowrey
Posted: Tue Jul 24, 2007 5:15 am

Grand Poobah

Joined: 16 Oct 2002
Posts: 19981

You won't get messages published until there is a registered subscriber to a topic.

You don't register publishers or subscribers by sending messages to the STREAM queue, you send them to the broker Control queue.

JMS makes working with Pub/Sub very easy. However, if you're not running in a J2EE app server, you might not want to use JMS.

Start with the Using Pub/Sub manual: http://publib.boulder.ibm.com/infocenter/wmqv6/v6r0/index.jsp?topic=/com.ibm.mq.amqnar.doc/amqnar10.htm
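
As an illustration of the point above, a registration is an RFH-format command message that goes to SYSTEM.BROKER.CONTROL.QUEUE. The sketch below lays out the same RFH version 1 fields the posted code writes, using only the JDK so it can be checked without a queue manager. The class and method names (Rfh1Builder, pad16, regSub, build) are hypothetical, the name/value tokens are taken from the code posted in this thread, and the values 273 and -2 passed in the usage note are assumed to correspond to MQENC_NATIVE and MQCCSI_INHERIT in the MQ Java classes.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

class Rfh1Builder {
    // Size of the fixed part of an RFH version 1 header (same value as in the posted code)
    static final int MQRFH_STRUC_LENGTH_FIXED = 32;

    // Blank-pad the name/value string to a multiple of 16 characters.
    static String pad16(String s) {
        StringBuilder sb = new StringBuilder(s);
        while (sb.length() % 16 != 0) {
            sb.append(' ');
        }
        return sb.toString();
    }

    // Build a RegSub name/value string from the tokens used in this thread.
    static String regSub(String topic, String stream, String qmgr, String queue) {
        return "MQPSCommand RegSub"
             + " MQPSTopic " + topic
             + " MQPSStreamName " + stream
             + " MQPSQMgrName " + qmgr
             + " MQPSQName " + queue + " ";
    }

    // Lay out StrucId, Version, StrucLength, Encoding, CodedCharSetId,
    // Format and Flags, followed by the padded name/value string.
    static byte[] build(String nameValues, int encoding, int ccsid, String format) {
        byte[] nv = pad16(nameValues).getBytes(StandardCharsets.US_ASCII);
        // Format names are 8 characters, blank-padded (and truncated if longer)
        byte[] fmt = String.format("%-8.8s", format).getBytes(StandardCharsets.US_ASCII);
        ByteBuffer buf = ByteBuffer.allocate(MQRFH_STRUC_LENGTH_FIXED + nv.length); // big-endian by default
        buf.put("RFH ".getBytes(StandardCharsets.US_ASCII)); // StrucId
        buf.putInt(1);                                       // Version
        buf.putInt(MQRFH_STRUC_LENGTH_FIXED + nv.length);    // StrucLength
        buf.putInt(encoding);                                // Encoding of the data that follows
        buf.putInt(ccsid);                                   // CodedCharSetId of the data that follows
        buf.put(fmt);                                        // Format of the data that follows
        buf.putInt(0);                                       // Flags
        buf.put(nv);                                         // NameValueString
        return buf.array();
    }
}
```

Something like `Rfh1Builder.build(Rfh1Builder.regSub("Stock/wip", "SAMPLE.BROKER.RESULTS.STREAM", "QMBRK1", "RESULTS.SERVICE.SUB.QUEUE"), 273, -2, "MQSTR")` would give the bytes to `write` into an MQMessage whose format is the RFH format, before putting it to the control queue rather than the stream queue.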
_________________
I am *not* the model of the modern major general.
fjb_saper
Posted: Tue Jul 24, 2007 1:19 pm

Grand High Poobah

Joined: 18 Nov 2003
Posts: 20756
Location: LI,NY

Knowing that pub/sub is always somewhat slower than P2P...
In order to write a pub/sub in Java Base you'd have to give me a real compelling reason not to use JMS.

JMS just makes everything with pub/sub so much simpler...

Now if you use the WebSphere Message Broker, there is a possibility for your clients to do P2P and leave the pub/sub to the broker only...

But at that point I would expect the clients to still be JMS just for the automatic stripping of the RFH header. I'd also expect the broker to put the pub RFH header on the message...

Enjoy
_________________
MQ & Broker admin
pshan81
Posted: Mon Jul 30, 2007 8:35 pm

Acolyte

Joined: 24 May 2005
Posts: 72

I have tried out pub/sub using JMS. It is working fine and looks very easy. I am now exploring the same using MQI. I have changed the subscriber code (which I posted earlier) and I am getting the published message on the subscriber queue.

Using the code below I get every message that arrives on the subscriber queue. What change do I have to make to get messages based on the topic?


Code:


StringBuffer nameValues = new StringBuffer (
      "MQPSCommand " + "RegSub "
      + "MQPSTopic " + "Stock/IBM/* "
      + "MQPSStreamName " + "SAMPLE.BROKER.RESULTS.STREAM "
      + "MQPSQMgrName " + "QMBRK1 "
      + "MQPSQName " + "RESULTS.SERVICE.SUB.QUEUE ");
while ( (nameValues.length () % 16) != 0 )
{
   nameValues.append (" ");
}

String requestStr = nameValues.toString();
msg.feedback = MQC.MQFB_NONE;
msg.format = MQFMT_RF_HEADER;
msg.messageType = MQC.MQMT_DATAGRAM;
msg.encoding = MQC.MQENC_NATIVE;
msg.characterSet = MQC.MQCCSI_Q_MGR;
msg.replyToQueueName = streamQueue.name;
pmo.options = MQC.MQPMO_NEW_MSG_ID;
try {
   msg.write (MQRFH_STRUC_ID); //StructId
   msg.writeInt (MQRFH_VERSION_1); //RFH Version
   msg.writeInt (MQRFH_STRUC_LENGTH_FIXED + requestStr.length());
   msg.writeInt (MQC.MQENC_NATIVE); //Encoding
   msg.writeInt (MQC.MQCCSI_INHERIT); //qmgr CCSID
   msg.write (MQFMT_STRING_ARRAY); //Format
   msg.writeInt (MQRFH_NONE); //Flags
   msg.writeBytes (requestStr);
   queue.put(msg);
   MQGetMessageOptions gmo = new MQGetMessageOptions ();
   gmo.options = MQC.MQGMO_WAIT | MQC.MQGMO_CONVERT;
   gmo.waitInterval = 10000;
   MQMessage replyMsg = new MQMessage ();
   streamQueue.get (replyMsg, gmo);
      
jefflowrey
Posted: Tue Jul 31, 2007 3:21 am

Grand Poobah

Joined: 16 Oct 2002
Posts: 19981

You can't GET messages by topic once they're on the queue.

You can either only subscribe the queue to a particular topic, or you can GET by CorrelID to ensure you only get messages for your subscription.
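
To make "subscribe the queue to a particular topic" concrete: the filtering happens in the broker when it compares each published topic with the topic string given at registration, such as the Stock/IBM/* used in the code above. The sketch below is only a rough model of that comparison, assuming that * stands for zero or more characters and ? for exactly one; the class and method names are hypothetical, and the Using Pub/Sub manual is the authority on the real matching rules.

```java
import java.util.regex.Pattern;

class TopicMatch {
    // Assumed wildcard rules: '*' = zero or more characters, '?' = exactly one character.
    static boolean matches(String subscription, String publishedTopic) {
        StringBuilder regex = new StringBuilder();
        for (char c : subscription.toCharArray()) {
            if (c == '*') {
                regex.append(".*");
            } else if (c == '?') {
                regex.append('.');
            } else {
                // Every other character must match literally
                regex.append(Pattern.quote(String.valueOf(c)));
            }
        }
        return Pattern.matches(regex.toString(), publishedTopic);
    }
}
```

Under these assumptions a queue registered for Stock/IBM/* would receive a publication on Stock/IBM/price but not one on Stock/wip, so no filtering is needed at GET time.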
_________________
I am *not* the model of the modern major general.
pshan81
Posted: Tue Jul 31, 2007 3:52 am

Acolyte

Joined: 24 May 2005
Posts: 72

Jeff,
Could you please elaborate on the above?
jefflowrey
Posted: Tue Jul 31, 2007 3:54 am

Grand Poobah

Joined: 16 Oct 2002
Posts: 19981

Use different queues for different subscribers.

Or use the CorrelId (which should be your subscriberId) to separate messages.
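
A small sketch of the CorrelId approach, assuming each subscriber derives a fixed 24-byte correlation identifier (the length of an MQ CorrelId) from a name of its own choosing. The class and method names are hypothetical. In the MQ Java classes the value would typically be assigned to the correlationId field of the registration message and of the message passed to get, with MQC.MQMO_MATCH_CORREL_ID set in the get options' matchOptions; whether the broker stamps publications with that CorrelId depends on the registration options, so check the Using Pub/Sub manual.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

class CorrelIds {
    static final int MQ_CORREL_ID_LENGTH = 24; // CorrelId is a 24-byte field

    // Derive a fixed-length CorrelId from a subscriber name:
    // truncated to 24 bytes, or padded with zero bytes if shorter.
    static byte[] forSubscriber(String subscriberName) {
        byte[] id = new byte[MQ_CORREL_ID_LENGTH];
        byte[] raw = subscriberName.getBytes(StandardCharsets.US_ASCII);
        System.arraycopy(raw, 0, id, 0, Math.min(raw.length, id.length));
        return id;
    }

    // The comparison a GET by CorrelId performs: all 24 bytes must be equal.
    static boolean sameSubscription(byte[] messageCorrelId, byte[] myCorrelId) {
        return Arrays.equals(messageCorrelId, myCorrelId);
    }
}
```

With two subscribers sharing one queue, each would only pick up messages whose CorrelId equals its own value and leave the others on the queue.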
_________________
I am *not* the model of the modern major general.
pshan81
Posted: Sun Aug 05, 2007 11:20 pm

Acolyte

Joined: 24 May 2005
Posts: 72

We have an application that does some business validations and routes messages to different customers. It uses MQ as the transport layer and we are planning to implement the publish/subscribe feature of MQ.

What are the pros and cons of achieving publish/subscribe using MQI versus MQ JMS? We are using MQ 5.3 CSD10 on AIX 5.2. We are not using any application server.
Back to top
View user's profile Send private message
fjb_saper
PostPosted: Mon Aug 06, 2007 4:23 pm    Post subject: Reply with quote

Grand High Poobah

Joined: 18 Nov 2003
Posts: 20756
Location: LI,NY

pshan81 wrote:
We have an application that does some business validations and routes messages to different customers. It uses MQ as the transport layer and we are planning to implement the publish/subscribe feature of MQ.

What are the pros and cons of achieving publish/subscribe using MQI versus MQ JMS? We are using MQ 5.3 CSD10 on AIX 5.2. We are not using any application server.

If you are using JMS you should be at CSD 11 of 5.3 at the very least.
Better still, go to 6.0.2.2, as 5.3 will be out of support soon...
_________________
MQ & Broker admin
pshan81
Posted: Tue Aug 07, 2007 12:31 am

Acolyte

Joined: 24 May 2005
Posts: 72

Saper,
Thanks. We are planning, and hope, to migrate to MQ 6 soon. What are the pros and cons of implementing pub/sub using MQI versus MQ JMS (other than the programming overhead of framing the RFH header in MQI)?
fjb_saper
Posted: Tue Aug 07, 2007 3:01 am

Grand High Poobah

Joined: 18 Nov 2003
Posts: 20756
Location: LI,NY

pshan81 wrote:
Saper,
Thanks. We are planning, and hope, to migrate to MQ 6 soon. What are the pros and cons of implementing pub/sub using MQI versus MQ JMS (other than the programming overhead of framing the RFH header in MQI)?


You are doing pub/sub with Java. If you do not have a very compelling reason not to use JMS, I would strongly suggest you do use JMS.

Enjoy
_________________
MQ & Broker admin
pshan81
Posted: Tue Aug 07, 2007 4:58 am

Acolyte

Joined: 24 May 2005
Posts: 72

I have two queue managers/brokers, qmgrA and qmgrB, running on two different Windows machines. The queue managers are connected using sender and receiver channels.

I am publishing a message on topic "wip" from qmgrA using MQ JMS. I am able to receive the message on qmgrB (topic "wip") if I specify qmgrB in the BROKERPUBQMGR attribute of the topic.

In addition, when I run a subscriber subscribing to the same topic "wip" on qmgrA, I do not get the published message. However, if I specify qmgrA in the BROKERPUBQMGR attribute of the topic, I do receive the message (but not in the subscriber running on qmgrB).

Is there anything I am missing for both subscribers (on different machines) to subscribe to the same topic?
fjb_saper
Posted: Tue Aug 07, 2007 12:43 pm

Grand High Poobah

Joined: 18 Nov 2003
Posts: 20756
Location: LI,NY

First you need to know which broker is the publishing broker.
This is the broker you subscribe to, irrespective of the qmgr you are on.
Make sure that the Topic object you use for the subscription specifies a queue local to the qmgr you are connected to, so that the messages published for your subscription can be retrieved there...

Hope this is not too confusing.
_________________
MQ & Broker admin