|
RSS Feed - WebSphere MQ Support
|
RSS Feed - Message Broker Support
|
 |
|
MQ Pub/Sub using MQSI Broker |
« View previous topic :: View next topic » |
Author |
Message
|
Maneesh |
Posted: Tue Jul 10, 2001 5:12 am Post subject: |
|
|
Newbie
Joined: 09 Jul 2001 Posts: 2
|
I am facing a strange problem when trying to run a Publish Subscribe
application. Basically
this is an application which publishes messages based on a given topic
(Command line arguments).
The publisher and subscriber programs works well when i use the MA0C
support pac broker. Publisher publishes the messages successfully and the
subscriber receives the messages successfully.
I wanted to try these same applications on an MQSI broker environment.
I created a simple message flow where there are two Nodes one is the Input
node which reads the messages from SYSTEM.BROKER.DEFAULT.STREAM and the
second node is the publication node. I didn't change any values for this
node. All the components are running viz., Queue manager and the broker.
But when i run the publisher application it throws an error "Broker is not
running, please start with the command 'strmqbrk'...". I am sure my queue
manager has all the required queues. It throws this error message exactly
at the line...
tPub.publish(msg,DeliveryMode.NON_PERSISTENT,priority,liveTime);
which means it's unable to publish. Where am i going wrong? Why is it working in MAOC environment but not in MQSI
environment? Any pointers would be helpful
Thanks in adavance...
|
|
Back to top |
|
 |
kolban |
Posted: Tue Jul 10, 2001 5:21 am Post subject: |
|
|
 Grand Master
Joined: 22 May 2001 Posts: 1072 Location: Fort Worth, TX, USA
|
Maneesh, I think I am confused by your message flow. In MQSI V2, you should get the message that you want to publish from a "regular" MQSeries queue (for example "IN"). The MQInput node should be configured to get messages from that queue. When the flow gets the message, it should then send it to a Publish node which will then publish the message based on the topic. I really don't see why you have a message flow reading directly from the SYSTEM.BROKER.DEFAULT.STREAM. I think that is asking for trouble.
With MA0C, the way you publish is to put a message directly on that queue, with MQSI V2, that is not how it is done.
[ This Message was edited by: kolban on 2001-07-10 06:22 ] |
|
Back to top |
|
 |
Maneesh |
Posted: Tue Jul 10, 2001 5:41 am Post subject: |
|
|
Newbie
Joined: 09 Jul 2001 Posts: 2
|
Thanks kolban ..
u r right if i'm using msg flows..
but right now am using java applications
to publish and subscribe..
I'm sending u the code tooo for publish and subscribe....
This code works fine with MA0C.. but creates problem when used with MQSI broker...
package Jms.Pubsub;
import javax.jms.*;
import javax.naming.*;
import java.io.*;
import java.io.InputStreamReader;
import java.util.Hashtable;
import com.ibm.mq.jms.*;
public class Publisher {
private TopicSession pubSession;
private TopicPublisher publisher;
private TopicConnection connection;
private String userName;
public Publisher(String topicName, String username, String password) throws Exception {
TopicConnectionFactory conFactory = new MQTopicConnectionFactory();
((MQTopicConnectionFactory)conFactory).setChannel("CH1");
((MQTopicConnectionFactory)conFactory).setHostName("192.168.168.61");
((MQTopicConnectionFactory)conFactory).setPort(5010);
((MQTopicConnectionFactory)conFactory).setQueueManager("FMCQM");
System.out.println("setters done...");
TopicConnection connection = conFactory.createTopicConnection();
System.out.println("got TopicConnection");
TopicSession pubSession = connection.createTopicSession(false,Session.AUTO_ACKNOWLEDGE);
Topic chatTopic = new MQTopic();
((MQTopic)chatTopic).setBaseTopicName(topicName);
System.out.println("topic set to " + topicName);
TopicPublisher publisher = pubSession.createPublisher(chatTopic);
// Intialize the Chat application
set(connection, pubSession, publisher, username);
// Start the JMS connection; allows messages to be delivered
connection.start();
}
/* Initializes the instance variables */
public void set(TopicConnection con, TopicSession pubSess, TopicPublisher pub, String username) {
this.connection = con;
this.pubSession = pubSess;
this.publisher = pub;
this.userName = username;
}
/* Receive message from topic subscriber */
/*
public void onMessage(Message message) {
try {
TextMessage textMessage = (TextMessage) message;
String text = textMessage.getText();
System.out.println(text);
} catch(JMSException jmse){ jmse.printStackTrace(); }
}
*/
/* Create and send message using topic publisher */
protected void writeMessage(String text)throws JMSException {
TextMessage message = pubSession.createTextMessage();
message.setText(userName+" : "+text);
publisher.publish(message);
}
public void close(){
try{
if (pubSession != null){
System.out.println("closing session");
pubSession.close();
}
if (connection != null){
System.out.println("----closing connection----");
connection.close();
}
}
catch(JMSException jmse){
System.out.println(jmse);
}
}
public static void main(String [] args){
try{
if(args.length!=3){
System.out.println("usage : Publisher topicName user password");
System.exit(0);
}
/*
* args[0]=topicName;
* args[1]=username;
* args[2]=password
*/
System.out.println(args[0]);
Publisher chat = new Publisher(args[0],args[1],args[2]);
BufferedReader commandLine = new java.io.BufferedReader(new InputStreamReader(System.in));
while(true){
String s = commandLine.readLine();
if(s.equalsIgnoreCase("exit")){
chat.close(); // close down connection
System.exit(0);// exit program
}else
chat.writeMessage(s);
}
}catch(Exception e){ e.printStackTrace(); }
}
}
package Jms.Pubsub;
import javax.jms.*;
import javax.naming.*;
import java.io.*;
import java.io.InputStreamReader;
import java.util.Hashtable;
import com.ibm.mq.jms.*;
public class Subscriber implements javax.jms.MessageListener{
private TopicSession pubSession;
private TopicSession subSession;
private TopicConnection connection;
private String userName;
/* Constructor. Establish JMS publisher and subscriber */
public Subscriber(String topicName){
try{
TopicConnectionFactory conFactory = new MQTopicConnectionFactory();
((MQTopicConnectionFactory)conFactory).setHostName("192.168.168.72");
((MQTopicConnectionFactory)conFactory).setPort(1414);
((MQTopicConnectionFactory)conFactory).setQueueManager("QM_maneesh");
System.out.println("setters done...");
TopicConnection connection = conFactory.createTopicConnection();
System.out.println("got TopicConnection");
TopicSession subSession = connection.createTopicSession(false,Session.AUTO_ACKNOWLEDGE);
Topic chatTopic = new MQTopic();
((MQTopic)chatTopic).setBaseTopicName(topicName);
System.out.println("Base topic set to :" + ((MQTopic)chatTopic).getBaseTopicName());
TopicSubscriber subscriber = subSession.createSubscriber(chatTopic);
// Set a JMS message listener
subscriber.setMessageListener(this);
// Intialize the Chat application
set(connection, subSession);
// Start the JMS connection; allows messages to be delivered
connection.start();
while(true){}
}catch(JMSException jmse){
System.out.println(jmse);
}
finally{
if (subSession != null){
try{
System.out.println("closing session");
subSession.close();
}
catch (JMSException e){
System.err.println("caught "+e);
}
}
if (connection != null){
try{
System.out.println("----closing connection----");
connection.close();
}
catch (JMSException e){
System.err.println("connection.close() threw "+e);
}
}
}
}
/* Initializes the instance variables */
public void set(TopicConnection con, TopicSession subSess) {
this.connection = con;
this.subSession = subSess;
}
/* Receive message from topic subscriber */
public void onMessage(Message message) {
try {
TextMessage textMessage = (TextMessage) message;
String text = textMessage.getText();
System.out.println(text);
} catch(JMSException jmse){ jmse.printStackTrace(); }
}
/* Close the JMS connection */
public void close() throws JMSException {
connection.close();
}
/* Run the Chat client */
public static void main(String [] args){
if(args.length!=1){
System.out.println("usage :Subscriber topic name");
System.exit(0);
}
System.out.println(args[0]);
Subscriber chat = new Subscriber(args[0]);
/*while(true){
}*/
}
}
|
|
Back to top |
|
 |
|
|
 |
|
Page 1 of 1 |
|
You cannot post new topics in this forum You cannot reply to topics in this forum You cannot edit your posts in this forum You cannot delete your posts in this forum You cannot vote in polls in this forum
|
|
|
|