Topic Subscriber message listener problem |
Author |
Message
|
jarun111 |
Posted: Mon Apr 19, 2004 10:35 am Post subject: Topic Subscriber message listener problem |
|
|
Acolyte
Joined: 19 Apr 2004 Posts: 70
|
Hi,
I am facing a strange problem while using the JMS pub/sub API. My class creates a connection and then a subscriber, and registers a message listener on the subscriber. An ExceptionListener is registered on the connection. The application runs as a daemon process, waiting for messages through the message listener.
When the queue manager is brought down, the exception listener catches the exception. The application then tries to create a new connection to the queue manager, recreate the session and subscriber, and set the message listener and exception listener again on the new connection. When the queue manager is brought back up, the new handles are created and the message listener and exception listener are set again. However, when messages are published after this first queue manager restart, the message listener receives only a single message and then nothing more. After subsequent queue manager restarts, the message listener works fine.
I am not sure what the problem is. My program looks clean. Please see the code below and check whether it works with your broker. If you have any input, do let me know. This is very important. Thanks in advance.
package com.citigroup.zeus.jms.pubsub.client;

import java.util.*;
import javax.jms.*;
import javax.naming.*;
import javax.naming.directory.*;
import java.net.InetAddress;
import com.ibm.mq.jms.JMSC;
import com.ibm.mq.jms.MQTopicConnectionFactory;

public class TestListener
{
    com.ibm.mq.jms.MQTopic Topic = null;
    messageListenerMListener2 listener1 = null;
    MQTopicConnectionFactory citiFactory = null;
    TopicConnection conn = null;
    // Set to true by messageExceptionListener2 when the connection fails.
    static boolean requireNewConnection = false;

    public static void main(String[] args) throws Exception
    {
        try {
            TestListener ts = new TestListener();
            ts.startExecuting();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public void startExecuting() {
        // Configure the connection factory and the topic.
        try {
            citiFactory = new MQTopicConnectionFactory();
            citiFactory.setStatusRefreshInterval(1000);
            citiFactory.setQueueManager("ZEUS.MQAPI_QM2.DV");
            citiFactory.setHostName("eqznydb2i");
            citiFactory.setChannel("ZEUS_CHL");
            citiFactory.setPort(1415);
            citiFactory.setTransportType(JMSC.MQJMS_TP_CLIENT_MQ_TCPIP);
            Topic = new com.ibm.mq.jms.MQTopic("Topic1");
        }
        catch (Exception nx) {
            System.out.println("ERROR: " + nx);
        }
        try {
            // Initial connection, session, subscriber and listeners.
            conn = citiFactory.createTopicConnection();
            conn.setExceptionListener(new messageExceptionListener2());
            System.out.println("connection created");
            TopicSession sess = conn.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
            System.out.println("session created ");
            TopicSubscriber sub = sess.createSubscriber(Topic);
            System.out.println("Subscriber created ");
            messageListenerMListener2 listener1 = new messageListenerMListener2();
            sub.setMessageListener(listener1);
            System.out.println("starting connection");
            conn.start();
            // Daemon loop: wake every 10 seconds and reconnect if requested.
            while (true) {
                try {
                    Thread.sleep(10000);
                    System.out.println("I am alive");
                    if (requireNewConnection == true) {
                        // First pass: create connection, session, subscriber
                        // and listener, then close them all again.
                        TopicConnection conn1 = citiFactory.createTopicConnection();
                        conn1.start();
                        System.out.println("connection created");
                        TopicSession sess1 = conn1.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
                        System.out.println("session created ");
                        Topic = new com.ibm.mq.jms.MQTopic("Topic1");
                        TopicSubscriber sub1 = sess1.createSubscriber(Topic);
                        messageListenerMListener2 listener11 = new messageListenerMListener2();
                        sub1.setMessageListener(listener11);
                        sub1.close();
                        sess1.close();
                        conn1.close();
                        conn1 = null; sess1 = null; sub1 = null; listener11 = null; Topic = null;
                        // Second pass: create everything again and keep it.
                        conn1 = citiFactory.createTopicConnection();
                        conn1.start();
                        System.out.println("connection created");
                        sess1 = conn1.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
                        System.out.println("session created ");
                        Topic = new com.ibm.mq.jms.MQTopic("Topic1");
                        sub1 = sess1.createSubscriber(Topic);
                        listener11 = new messageListenerMListener2();
                        sub1.setMessageListener(listener11);
                        conn1.setExceptionListener(new messageExceptionListener2());
                        requireNewConnection = false;
                        System.out.println("requireNewConnection is now set to false");
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
} // end of TestListener class

// Prints the body of every TextMessage delivered to the subscriber.
class messageListenerMListener2 implements MessageListener
{
    public void onMessage(Message message)
    {
        String body;
        try {
            if (message instanceof TextMessage) {
                body = ((TextMessage) message).getText();
                if (body != null)
                    System.out.println("message contained '" + body + "'");
            }
        }
        catch (JMSException je) {
            System.out.println("onMessage caught " + je);
        }
        catch (Exception e) {
            System.out.println("onMessage caught " + e);
        }
    }
}

// Flags the main loop to rebuild the connection when JMS reports a failure.
class messageExceptionListener2 implements ExceptionListener
{
    public void onException(JMSException exc)
    {
        System.err.println("Exception detected");
        TestListener.requireNewConnection = true;
        System.err.println("requiredNewConn made to true");
    }
}
Regards,
Arun |
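The recovery flow described in the post (the exception listener raises a flag, a supervising loop rebuilds the connection and re-registers the listener) can be sketched without any MQ dependency. This is only a minimal sketch under stated assumptions, not a diagnosis of the problem: `ReconnectSketch`, `Link`, `onConnectionLost` and `pollOnce` are hypothetical names, and `Link` is a stand-in for a TopicConnection with its session and subscriber, not a real JMS type. It illustrates three choices that differ from the posted code: the flag is an `AtomicBoolean` so that a write from the exception-listener thread is visible to the loop, the old link is closed before a new one is built, and the listener is registered before delivery is started.

```java
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in types; nothing here belongs to javax.jms or com.ibm.mq.jms.
class ReconnectSketch {

    /** Stand-in for a TopicConnection together with its session and subscriber. */
    interface Link {
        void setListener(Consumer<String> listener); // plays the role of setMessageListener
        void start();                                // plays the role of TopicConnection.start()
        void close();                                // plays the role of TopicConnection.close()
    }

    private final Supplier<Link> factory;            // stand-in for the connection factory
    private final Consumer<String> listener;
    // Written by the exception-listener thread, read and cleared by the loop thread.
    private final AtomicBoolean reconnectRequested = new AtomicBoolean(false);
    private Link current;                            // touched only by the loop thread

    ReconnectSketch(Supplier<Link> factory, Consumer<String> listener) {
        this.factory = factory;
        this.listener = listener;
    }

    /** Called from the exception listener; it only records that recovery is needed. */
    void onConnectionLost() {
        reconnectRequested.set(true);
    }

    /** Builds a link: the listener is registered first, then delivery is started. */
    void connect() {
        Link link = factory.get();
        link.setListener(listener);
        link.start();
        current = link;
    }

    /**
     * One pass of the supervising loop. Returns true if a new link was built.
     * If building the new link fails, the flag is raised again so that the
     * next pass retries.
     */
    boolean pollOnce() {
        if (!reconnectRequested.compareAndSet(true, false)) {
            return false;                            // nothing to do on this pass
        }
        if (current != null) {
            try {
                current.close();                     // release the broken link first
            } catch (RuntimeException ignored) {
                // the link is already unusable; continue with the rebuild
            }
            current = null;
        }
        try {
            connect();
            return true;
        } catch (RuntimeException e) {
            reconnectRequested.set(true);            // retry on the next pass
            return false;
        }
    }
}
```

In a daemon like the one above, `pollOnce()` would be called from the `while (true)` loop after each sleep, and `onConnectionLost()` from the `onException` callback. Whether this ordering affects the behaviour seen on WMQ 5.3 CSD05 is not established in this thread.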
bower5932 |
Posted: Mon Apr 19, 2004 12:27 pm Post subject: |
|
|
 Jedi Knight
Joined: 27 Aug 2001 Posts: 3023 Location: Dallas, TX, USA
|
I didn't look at your code. I do know that there were some problems with asynchronous message delivery that were fixed in some of the WMQ 5.3 CSDs. What version of the code are you running? You can tell by using mqver. |
jarun111 |
Posted: Mon Apr 19, 2004 12:32 pm Post subject: |
|
|
Acolyte
Joined: 19 Apr 2004 Posts: 70
|
Hi,
Version info is as follows:
bash-2.03$ mqver
Name: WebSphere MQ
Version: 530.5 CSD05
CMVC level: p530-05-L030926
BuildType: IKAP - (Production)
Regards,
Arun |