| Author | 
		  Message
		 | 
		
		  | murdeep | 
		  
		    
			  
				 Posted: Wed Feb 08, 2006 9:39 am    Post subject: Event Broker: Multicast connection cannot be established | 
				     | 
			   
			 
		   | 
		
		
		   Master
 
 Joined: 03 Nov 2004 Posts: 211
  
  | 
		  
		    
			  
				Hi, I am following Redbook sg246088 and experimenting with Event Broker V6.
 
 
I am doing the multicast pub/sub example utilizing the real time transport Chapter 10. 
 
 
I have enabled the broker to support multicast, I have created the topics and ensured they support multicast. 
 
 
In JNDI I created the TCF to support multicast.
 
 
When I start the subscriber using the multicast TCF I get following error:
 
 
javax.jms.JMSException: MQJMS1102: Multicast connection cannot be established
 
        at com.ibm.mq.jms.services.ConfigEnvironment.newException(ConfigEnvironment.java:567)
 
        at com.ibm.mq.jms.MessageConsumerImpl.<init>(MessageConsumerImpl.java:369)
 
        at com.ibm.mq.jms.TopicSubscriberImpl.<init>(TopicSubscriberImpl.java:121)
 
        at com.ibm.mq.jms.TopicSessionImpl.createSubscriber(TopicSessionImpl.java:401)
 
        at com.ibm.mq.jms.TopicSessionImpl.createSubscriber(TopicSessionImpl.java:261)
 
        at Subscriber.main(Subscriber.java:143)
 
 
Anyone know if there are known issues with multicast with V6? or if there is something else I need to change/verify?
 
 
If I change the TCF to multicast(enabled) it works but I think this reverts to real-time and is not true multicast.
 
 
Thanks | 
			   
			 
		   | 
		
		
		  | Back to top | 
		  
		  	
		   | 
		
		
		    | 
		
		
		  | dirac | 
		  
		    
			  
				 Posted: Wed Feb 08, 2006 11:07 am    Post subject:  | 
				     | 
			   
			 
		   | 
		
		
		   Novice
 
 Joined: 31 Mar 2005 Posts: 23
  
  | 
		  
		    
			  
				| What platform and protocol are you using? | 
			   
			 
		   | 
		
		
		  | Back to top | 
		  
		  	
		   | 
		
		
		    | 
		
		
		  | murdeep | 
		  
		    
			  
				 Posted: Wed Feb 08, 2006 12:35 pm    Post subject:  | 
				     | 
			   
			 
		   | 
		
		
		   Master
 
 Joined: 03 Nov 2004 Posts: 211
  
  | 
		  
		    
			  
				| I am using W2K EB V6. As far as protocol is concerned I am not clear on what you are asking. The TCF is defined to use direct transport and reliable multicast. | 
			   
			 
		   | 
		
		
		  | Back to top | 
		  
		  	
		   | 
		
		
		    | 
		
		
		  | dirac | 
		  
		    
			  
				 Posted: Wed Feb 08, 2006 3:58 pm    Post subject:  | 
				     | 
			   
			 
		   | 
		
		
		   Novice
 
 Joined: 31 Mar 2005 Posts: 23
  
  | 
		  
		    
			  
				On v5 there was only one multicast protocol - "PTL". On v6 there are two more: "PGM/UDP" and "PGM/IP". PGM/IP requires admin rights...
 
I haven't tried running the redbook sample, but from what I saw looking at it, it runs using localhost, so that you are not actually sending messages out over the network... Is that right - are you subscribing to a local box? In that case you will never see true multicast functionality as such, although you can confirm that your topics are multicast enabled by using java calls.
 
It is actually possible to subscribe to a multicast topic that hasn't been created, so it is unlikely that your topic is in error.
 
You said in your first note that you actually got it working? | 
			   
			 
		   | 
		
		
		  | Back to top | 
		  
		  	
		   | 
		
		
		    | 
		
		
		  | murdeep | 
		  
		    
			  
				 Posted: Wed Feb 08, 2006 4:15 pm    Post subject:  | 
				     | 
			   
			 
		   | 
		
		
		   Master
 
 Joined: 03 Nov 2004 Posts: 211
  
  | 
		  
		    
			  
				Update:  I tried the example on a V5 broker and obtained the same error - 
 
javax.jms.JMSException: MQJMS1102: Multicast connection cannot be established
 
        at com.ibm.mq.jms.services.ConfigEnvironment.newException(ConfigEnvironment.java:567)
 
        at com.ibm.mq.jms.MessageConsumerImpl.<init>(MessageConsumerImpl.java:369)
 
        at com.ibm.mq.jms.TopicSubscriberImpl.<init>(TopicSubscriberImpl.java:121)
 
        at com.ibm.mq.jms.TopicSessionImpl.createSubscriber(TopicSessionImpl.java:401)
 
        at com.ibm.mq.jms.TopicSessionImpl.createSubscriber(TopicSessionImpl.java:261)
 
        at Subscriber.main(Subscriber.java:143)
 
 
So this seems to indicate that this is not related to V6. 
 
 
When I said I got it working I changed the TCF multicast attribute to enabled from reliable. This indicates that if reliable cannot be supported by the broker it reverts to real time transport. Reliable will cause an error if the broker cannot support multicast.
 
 
Do you think that running the subscriber on the same box as the broker and/or broker may be causing the problem? I'll try running the subscriber on another box and see if that makes a difference. | 
			   
			 
		   | 
		
		
		  | Back to top | 
		  
		  	
		   | 
		
		
		    | 
		
		
		  | murdeep | 
		  
		    
			  
				 Posted: Wed Feb 08, 2006 10:13 pm    Post subject:  | 
				     | 
			   
			 
		   | 
		
		
		   Master
 
 Joined: 03 Nov 2004 Posts: 211
  
  | 
		  
		    
			  
				Ok, I redefined my TCF definitions and tried starting the publisher and subscriber apps and got past the "MQJMS1102: Multicast connection cannot be established" error. 
 
 
However, the subscriber doesn't receive the multicast messages published. Both the publisher and subscriber are referencing the same topic in the JNDI.
 
 
Any recommendation on how to debug why the published messages are being picked up by the subscriber? | 
			   
			 
		   | 
		
		
		  | Back to top | 
		  
		  	
		   | 
		
		
		    | 
		
		
		  | dirac | 
		  
		    
			  
				 Posted: Thu Feb 09, 2006 1:52 am    Post subject:  | 
				     | 
			   
			 
		   | 
		
		
		   Novice
 
 Joined: 31 Mar 2005 Posts: 23
  
  | 
		  
		    
			  
				For testing purposes it is better if you test on your local box first so that potential network issues cannot get in the way.
 
 
Enabled and reliable are two different things. You should set the TCF as enabled.
 
MQTopicConnectionFactory tcf = new MQTopicConnectionFactory();
 
tcf.setMulticast(JMSC.MQJMS_MULTICAST_ENABLED);
 
I don't think that reliable/unreliable have any meaning unless multicast is enabled. I think reliable/unreliable only applies to topics... It sounds to me like you were getting it to work correctly on your local box initially!
 
 
To debug a java subscriber, type
 
java -DMQJMS_TRACE_LEVEL=base <your stuff>
 
This will produce a file called mqjms.trc in the directory where you run the command.
 
 
If you are now trying to subscribe remotely, remember to increase your Time To Live setting on the Multicast GUI to reflect the number of network hops between your two boxes. A value of >1 is necessary and 10 should be enough. Also it is worth changing the GUI value for the IPv4 Minimum address. The default is 224.0.0.0 but the 224.0.0-1.x range values are reserved and may get bounced back by the network. Ensure your topic address is at a higher value, like 239.0.0.0 | 
			   
			 
		   | 
		
		
		  | Back to top | 
		  
		  	
		   | 
		
		
		    | 
		
		
		  | dirac | 
		  
		    
			  
				 Posted: Thu Feb 09, 2006 2:34 am    Post subject:  | 
				     | 
			   
			 
		   | 
		
		
		   Novice
 
 Joined: 31 Mar 2005 Posts: 23
  
  | 
		  
		    
			  
				I wasn't aware of the setting that you've been using, which I guess is:
 
tcf.setMulticast(JMSC.MQJMS_MULTICAST_RELIABLE);
 
This forces the JMS client to only accept connections to multicast topics which have had their Quality of Service set to Reliable (settable on GUI). If there is a mismatch between these two settings, a Java exception will be thrown (this is expected behaviour). | 
			   
			 
		   | 
		
		
		  | Back to top | 
		  
		  	
		   | 
		
		
		    | 
		
		
		  | murdeep | 
		  
		    
			  
				 Posted: Thu Feb 09, 2006 8:53 am    Post subject:  | 
				     | 
			   
			 
		   | 
		
		
		   Master
 
 Joined: 03 Nov 2004 Posts: 211
  
  | 
		  
		    
			  
				Ok, some more testing results. I did make the changes recommended by dirac, increased the IPv4 min address, and increased the Multicast TTL.
 
 
In JNDI I have two TCFs, one named MC has multicast(reliable), the other named IP has multicast(disabled). 
 
 
I start the publisher using the MC TCF and it connects no problem. 
 
 
I then start the subscriber with MC TCF and it connects no problem. 
 
 
I publish a message and the subscriber doesn't see it. 
 
 
I then stop the subscriber and restart it with IP TCF. 
 
 
I then publish a message and the subscriber gets it. Note the publisher is still using the MC TCF. 
 
 
So, this leads me to believe that the publisher and subscriber apps are working and the fact that when the subscriber uses the MC TCF it doesn't get the messages has something to do with the network or the broker and how the multicast is configured. 
 
 
I traced the eg and saw nothing erroneous, the messages seem to indicate that the publisher was working and the broker did in fact publish. So I think this is a network issue. I'm not much of a network expert so I don't know what to look for in that area. Any ideas?
 
 
Thanks | 
			   
			 
		   | 
		
		
		  | Back to top | 
		  
		  	
		   | 
		
		
		    | 
		
		
		  | dirac | 
		  
		    
			  
				 Posted: Thu Feb 09, 2006 9:09 am    Post subject:  | 
				     | 
			   
			 
		   | 
		
		
		   Novice
 
 Joined: 31 Mar 2005 Posts: 23
  
  | 
		  
		    
			  
				| Try starting the subscriber before the publisher | 
			   
			 
		   | 
		
		
		  | Back to top | 
		  
		  	
		   | 
		
		
		    | 
		
		
		  | murdeep | 
		  
		    
			  
				 Posted: Thu Feb 09, 2006 9:33 am    Post subject:  | 
				     | 
			   
			 
		   | 
		
		
		   Master
 
 Joined: 03 Nov 2004 Posts: 211
  
  | 
		  
		    
			  
				| dirac, I have started the subscriber before and after the publisher and it makes no difference. When I use the TCF with multicast(disabled) I can  start the publisher or subscriber in any order and the subscriber gets the published messages. If the subscriber starts after the publisher it just doesn't get the messages published before it starts. | 
			   
			 
		   | 
		
		
		  | Back to top | 
		  
		  	
		   | 
		
		
		    | 
		
		
		  | dirac | 
		  
		    
			  
				 Posted: Thu Feb 09, 2006 10:30 am    Post subject:  | 
				     | 
			   
			 
		   | 
		
		
		   Novice
 
 Joined: 31 Mar 2005 Posts: 23
  
  | 
		  
		    
			  
				| I haven't tried mixing TCF's like that myself. I'll try it on my setup tomorrow and let you know how I get on... | 
			   
			 
		   | 
		
		
		  | Back to top | 
		  
		  	
		   | 
		
		
		    | 
		
		
		  | dirac | 
		  
		    
			  
				 Posted: Fri Feb 10, 2006 3:58 am    Post subject:  | 
				     | 
			   
			 
		   | 
		
		
		   Novice
 
 Joined: 31 Mar 2005 Posts: 23
  
  | 
		  
		    
			  
				Hi there - I can't reproduce your error. 
 
 
Let me explain my setup as I'm not totally sure what you have on your box.
 
On the GUI I have made my broker multicast enabled. I have 2 topics defined. Topic MC is set as multicast enabled and reliable. Topic IP is set as mulitcast disabled. (the root topic is multicast enabled and reliable)
 
In my Java code I create two tcf's
 
 
   
	| Code: | 
   
  
	MQTopicConnectionFactory tcf = new MQTopicConnectionFactory();
 
tcf.setTransportType(JMSC.MQJMS_TP_DIRECT_TCPIP);
 
tcf.setHostName(brokerName);
 
tcf.setPort(brokerPort);
 
MQTopicConnectionFactory tcf2 = new MQTopicConnectionFactory();
 
tcf2.setTransportType(JMSC.MQJMS_TP_DIRECT_TCPIP);
 
tcf2.setHostName(brokerName);
 
tcf2.setPort(brokerPort);
 
// Set multicast 
 
tcf.setMulticast(JMSC.MQJMS_MULTICAST_ENABLED);
 
tcf.setMulticast(JMSC.MQJMS_MULTICAST_RELIABLE);
 
tcf2.setMulticast(JMSC.MQJMS_MULTICAST_DISABLED);
 
// Create topic connection
 
TopicConnection tc = tcf.createTopicConnection();
 
TopicConnection tc2 = tcf2.createTopicConnection();
 
TopicSession ts =tc.createTopicSession(false,Session.AUTO_ACKNOWLEDGE);
 
TopicSession ts2 =
 
tc2.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
 
// Create topic
 
Topic topic = ts.createTopic("TC");
 
((MQTopic) topic).setBrokerVersion(JMSC.MQJMS_BROKER_V2);
 
Topic topic2 = ts.createTopic("IP");
 
((MQTopic) topic2).setBrokerVersion(JMSC.MQJMS_BROKER_V2);
 
// Create subscriber
 
TopicSubscriber tSub = ts.createSubscriber(topic);
 
TopicSubscriber tSub2 = ts2.createSubscriber(topic2);
 
// Start the connection
 
tc.start();
 
tc2.start();  | 
   
 
 
 
I don't have any trouble pubsubbing to either topic.
 
 
If you have the same setup, my suspicion is that you may have made an error... can you check that after creating your TCF's, your createPublisher and createSubscriber calls for "MC" are actually pointing at "MC", and not "IP" by mistake, as this might explain the behaviour you are seeing. | 
			   
			 
		   | 
		
		
		  | Back to top | 
		  
		  	
		   | 
		
		
		    | 
		
		
		  | murdeep | 
		  
		    
			  
				 Posted: Fri Feb 10, 2006 8:30 am    Post subject:  | 
				     | 
			   
			 
		   | 
		
		
		   Master
 
 Joined: 03 Nov 2004 Posts: 211
  
  | 
		  
		    
			  
				dirac, your config is slightly different than mine since I am using JNDI to obtain the topic and topic connection factory definitions.
 
 
In jmsadmin I have the following:
 
 
InitCtx> dis t(multicast)
 
 
    BROKERPUBQMGR()
 
    FAILIFQUIESCE(YES)
 
    BROKERDURSUBQ(SYSTEM.JMS.D.SUBSCRIBER.QUEUE)
 
    TOPIC(messages/multicast)
 
    BROKERPUBQ()
 
    BROKERVER(V2)
 
    PERSISTENCE(APP)
 
    CCSID(1208)
 
    TARGCLIENT(JMS)
 
    ENCODING(NATIVE)
 
    BROKERCCDURSUBQ(SYSTEM.JMS.D.CC.SUBSCRIBER.QUEUE)
 
    MULTICAST(ASCF)
 
    PRIORITY(APP)
 
    EXPIRY(APP)
 
    VERSION(6)
 
 
InitCtx> dis tcf(IP_2004)
 
 
    MULTICAST(DISABLED)
 
    CCSID(819)
 
    BROKERSUBQ(SYSTEM.JMS.ND.SUBSCRIBER.QUEUE)
 
    PORT(2004)
 
    SYNCPOINTALLGETS(NO)
 
    DIRECTAUTH(BASIC)
 
    PUBACKINT(25)
 
    POLLINGINT(5000)
 
    RECEIVEISOLATION(COMMITTED)
 
    MSGSELECTION(BROKER)
 
    OUTCOMENOTIFICATION(YES)
 
    CHANNEL(SYSTEM.DEF.SVRCONN)
 
    OPTIMISTICPUBLICATION(NO)
 
    BROKERCONQ(SYSTEM.BROKER.CONTROL.QUEUE)
 
    USECONNPOOLING(YES)
 
    HOSTNAME(127.0.0.1)
 
    CLONESUPP(DISABLED)
 
    LOCALADDRESS()
 
    SPARSESUBS(NO)
 
    VERSION(6)
 
    BROKERCCSUBQ(SYSTEM.JMS.ND.CC.SUBSCRIBER.QUEUE)
 
    PROCESSDURATION(UNKNOWN)
 
    SSLRESETCOUNT(0)
 
    CONNOPT(STANDARD)
 
    MAXBUFFSIZE(1000)
 
    BROKERQMGR()
 
    SSLFIPSREQUIRED(NO)
 
    FAILIFQUIESCE(YES)
 
    CLEANUP(SAFE)
 
    RESCANINT(5000)
 
    MSGBATCHSZ(10)
 
    BROKERPUBQ(SYSTEM.BROKER.DEFAULT.STREAM)
 
    QMANAGER()
 
    PROXYPORT(443)
 
    BROKERVER(V2)
 
    CLEANUPINT(3600000)
 
    STATREFRESHINT(60000)
 
    SUBSTORE(MIGRATE)
 
    TRANSPORT(DIRECT)
 
    TARGCLIENTMATCHING(YES)
 
 
InitCtx> dis tcf(MC_2004)
 
 
    MULTICAST(RELIABLE)
 
    CCSID(819)
 
    BROKERSUBQ(SYSTEM.JMS.ND.SUBSCRIBER.QUEUE)
 
    PORT(2004)
 
    SYNCPOINTALLGETS(NO)
 
    DIRECTAUTH(BASIC)
 
    PUBACKINT(25)
 
    POLLINGINT(5000)
 
    RECEIVEISOLATION(COMMITTED)
 
    MSGSELECTION(BROKER)
 
    OUTCOMENOTIFICATION(YES)
 
    CHANNEL(SYSTEM.DEF.SVRCONN)
 
    OPTIMISTICPUBLICATION(NO)
 
    BROKERCONQ(SYSTEM.BROKER.CONTROL.QUEUE)
 
    USECONNPOOLING(YES)
 
    HOSTNAME(127.0.0.1)
 
    CLONESUPP(DISABLED)
 
    LOCALADDRESS()
 
    SPARSESUBS(NO)
 
    VERSION(6)
 
    BROKERCCSUBQ(SYSTEM.JMS.ND.CC.SUBSCRIBER.QUEUE)
 
    PROCESSDURATION(UNKNOWN)
 
    SSLRESETCOUNT(0)
 
    CONNOPT(STANDARD)
 
    MAXBUFFSIZE(1000)
 
    BROKERQMGR()
 
    SSLFIPSREQUIRED(NO)
 
    FAILIFQUIESCE(YES)
 
    CLEANUP(SAFE)
 
    RESCANINT(5000)
 
    MSGBATCHSZ(10)
 
    BROKERPUBQ(SYSTEM.BROKER.DEFAULT.STREAM)
 
    QMANAGER()
 
    PROXYPORT(443)
 
    BROKERVER(V2)
 
    CLEANUPINT(3600000)
 
    STATREFRESHINT(60000)
 
    SUBSTORE(MIGRATE)
 
    TRANSPORT(DIRECT)
 
    TARGCLIENTMATCHING(YES)
 
 
In the WBI-EB V6 GUI I have created the messages/multicast topic and it is set to multicast(enabled) QoS(Reliable). The broker is also multicast enabled.
 
 
The publisher and subscriber code is vanilla redbook code except I have changed where the JNDI file lives.
 
 
Publisher code:
 
 
 
 
   
	| Code: | 
   
  
	import javax.jms.*;
 
import java.io.*;
 
import java.util.*;
 
import javax.naming.*;
 
import javax.naming.directory.*;
 
 
// A publisher class
 
// Takes in a TCF and topic and looks them up in JNDI
 
// Creates JMS objects in order to publish messages
 
// Waits for command line input for message data
 
// Loops until it receives "quit"
 
// Closes JMS objects and terminates
 
 
public class Publisher
 
{
 
    public static void main(String args[])
 
    {
 
        // Strings for default ICF and URL for JNDI connection
 
        String icf = "com.sun.jndi.fscontext.RefFSContextFactory";
 
        String url = "file://C:/JNDI-Directory";
 
 
        // Variables to track modes
 
        boolean trans = true;
 
        boolean pers = true;
 
 
        // Required JMS object declarations
 
        Context ctx = null;
 
        TopicConnectionFactory tcf = null;
 
        Topic topic = null;
 
        TopicConnection conn = null;
 
        TopicSession session = null;
 
        TopicPublisher pub = null;
 
        BufferedReader in = null;
 
 
        output("Publisher Application");
 
 
        // Check our input parameters are correct
 
        if(args.length != 4)
 
        {
 
            usage();
 
            System.exit(0);
 
        }
 
 
        // Verify our transactionality from input parameter
 
        if(args[2].equals("tran")) trans = true;
 
        else if(args[2].equals("notran")) trans = false;
 
        else
 
        {
 
            usage();
 
            System.exit(0);
 
        }
 
 
        // Verify our message persistence from input parameter
 
        if(args[3].equals("pers")) pers = true;
 
        else if(args[3].equals("nonpers")) pers = false;
 
        else
 
        {
 
            usage();
 
            System.exit(0);
 
        }
 
 
        if(trans) output("Using transactional control");
 
        else output("Not using transactional control");
 
 
        if(pers) output("Messages are persistent");
 
        else output("Message are not persistent");
 
 
        try
 
        {
 
            // Create a Hashtable for JNDI required parameters
 
            Hashtable env = new Hashtable();
 
            env.put(Context.INITIAL_CONTEXT_FACTORY, icf);
 
            env.put(Context.PROVIDER_URL, url);
 
            env.put(Context.REFERRAL, "throw");
 
 
            // Initialise the JNDI Context
 
            ctx = new InitialDirContext(env);
 
        }
 
        catch(Exception e)
 
        {
 
            output("Unable to create JNDI context");
 
            e.printStackTrace();
 
            System.exit(0);
 
        }
 
 
        // Obtain the TCF from JNDI context
 
        try
 
        {
 
            tcf = (TopicConnectionFactory)ctx.lookup(args[0]);
 
        }
 
        catch(Exception e)
 
        {
 
            usage();
 
            e.printStackTrace();
 
            try{ctx.close();}catch(Exception ce){}
 
            System.exit(0);
 
        }
 
 
        // Obtain the Topic from JNDI context
 
        try
 
        {
 
            topic = (Topic)ctx.lookup(args[1]);
 
            output("Using Topic: " + topic.toString());
 
        }
 
        catch(Exception e)
 
        {
 
            usage();
 
            e.printStackTrace();
 
            try{ctx.close();}catch(Exception ce){}
 
            System.exit(0);
 
        }
 
 
        // Close our JNDI context
 
        try{ctx.close();}catch(Exception ce){}
 
 
        // Create a reader from stdin to take text for published messages
 
        try
 
        {
 
            in = new BufferedReader(new InputStreamReader(System.in));
 
        }
 
        catch(Exception e)
 
        {
 
            output("Unable to create BufferedReader from System.in");
 
            e.printStackTrace();
 
            System.exit(0);
 
        }
 
 
        // Create our JMS objects
 
        try
 
        {
 
            // Create TopicConnection
 
            conn = tcf.createTopicConnection();
 
            output("Connection created");
 
 
            // Create TopicSession with required transactionality
 
            session = conn.createTopicSession(trans,
 
                                              Session.AUTO_ACKNOWLEDGE);
 
            if(trans) output("Transacted Session created");
 
            else output("Non-transacted Session created");
 
 
            // Create TopicPublisher
 
            pub = session.createPublisher(topic);
 
 
            // Set correct delivery mode (persistence) on publisher
 
            if(pers) pub.setDeliveryMode(DeliveryMode.PERSISTENT);
 
            else pub.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
 
            if(pers) output("Publisher for persistent messages created");
 
            else output("Publisher for non-persistent messages created");
 
        }
 
        catch(JMSException e)
 
        {
 
            output("Unable to create JMS objects");
 
            e.printStackTrace();
 
            Exception le = e.getLinkedException();
 
            if(le != null) le.printStackTrace();
 
 
            // Close the reader
 
            if(in != null)
 
                try{in.close();}catch(Exception ie){}
 
 
            // Close our JMS objects
 
            if(pub != null)
 
                try{pub.close();}catch(JMSException je){}
 
            if(session != null)
 
                try{session.close();}catch(JMSException je){}
 
            if(conn != null)
 
                try{conn.close();}catch(JMSException je){}
 
 
            System.exit(0);
 
        }
 
 
        // Go into a publishing loop
 
        String input = "";
 
        boolean run = true;
 
        while((!(input.equals("quit"))) && run)
 
        {
 
            output("Waiting for input...");
 
            try
 
            {
 
                // Wait for input from command line
 
                input = in.readLine();
 
 
                // Received input, create and publish message from it
 
                TextMessage msg = session.createTextMessage();
 
                msg.setText(input);
 
                pub.publish(msg);
 
                output("Message published: '"+input+"'");
 
 
                // Commit the send if we are transacted
 
                if(trans) session.commit();
 
 
                // NOTE even "quit" messages are published
 
                // This stops the subscribers too
 
            }
 
            catch(JMSException e)
 
            {
 
                output("Error while publishing messages");
 
                e.printStackTrace();
 
                Exception le = e.getLinkedException();
 
                if(le != null) le.printStackTrace();
 
                run = false;
 
            }
 
            catch(IOException ie)
 
            {
 
                output("Error while publishing messages");
 
                ie.printStackTrace();
 
                run = false;
 
            }
 
        }
 
 
        output("Terminating application");
 
        // Close the reader
 
        if(in != null)
 
            try{in.close();}catch(Exception e){}
 
 
        // Close our JMS objects
 
        if(pub != null)
 
            try{pub.close();}catch(JMSException je){}
 
        if(session != null)
 
            try{session.close();}catch(JMSException je){}
 
        if(conn != null)
 
            try{conn.close();}catch(JMSException je){}
 
        output("JMS objects closed");
 
        output("Application terminated");
 
    }
 
 
    private static void usage()
 
    {
 
        output("java Publisher TCF Topic tran|notran pers|nonpers");
 
    }
 
 
    private static void output(String s)
 
    {
 
        System.out.println(s);
 
    }
 
}
 
 | 
   
 
 
 
Subscriber code:
 
 
   
	| Code: | 
   
  
	import javax.jms.*;
 
import java.io.*;
 
import java.util.*;
 
import javax.naming.*;
 
import javax.naming.directory.*;
 
 
// A subscriber class
 
// Takes in a TCF and topic and looks them up in JNDI
 
// Creates JMS objects in order to receive messages over pub/sub
 
// Loops receiving messages until it receives "quit"
 
// Closes JMS objects and terminates
 
 
public class Subscriber
 
{
 
    public static void main(String args[])
 
    {
 
        // Strings for default ICF and URL for JNDI connection
 
        String icf = "com.sun.jndi.fscontext.RefFSContextFactory";
 
        String url = "file://C:/JNDI-Directory";
 
 
        // Variables to track modes
 
        String subid = "";
 
        boolean trans = true;
 
        boolean dur = true;
 
 
        // Required JMS object declarations
 
        Context ctx = null;
 
        TopicConnectionFactory tcf = null;
 
        Topic topic = null;
 
        TopicConnection conn = null;
 
        TopicSession session = null;
 
        TopicSubscriber sub = null;
 
        BufferedReader in = null;
 
 
        output("Subscriber Application");
 
 
        // Check our input parameters are correct
 
        if((args.length != 4) && (args.length != 5))
 
        {
 
            usage();
 
            System.exit(0);
 
        }
 
 
        // Verify our transactionality from input parameter
 
        if(args[2].equals("tran")) trans = true;
 
        else if(args[2].equals("notran")) trans = false;
 
        else
 
        {
 
            usage();
 
            System.exit(0);
 
        }
 
 
        // Verify our subscriber durability from input parameter
 
        if(args[3].equals("dur")) dur = true;
 
        else if(args[3].equals("nondur")) dur = false;
 
        else
 
        {
 
            usage();
 
            System.exit(0);
 
        }
 
 
        // Detect and use subscriber ID if we are durable
 
        if(dur) subid = args[4];
 
 
        if(trans) output("Using transactional control");
 
        else output("Not using transactional control");
 
 
        if(dur) output("Using a durable subscription, subscriber ID: " +
 
                       subid);
 
        else output("Using a non-durable subscription");
 
 
        try
 
        {
 
            // Create a Hashtable for JNDI required parameters
 
            Hashtable env = new Hashtable();
 
            env.put(Context.INITIAL_CONTEXT_FACTORY, icf);
 
            env.put(Context.PROVIDER_URL, url);
 
            env.put(Context.REFERRAL, "throw");
 
 
            // Initialise the JNDI Context
 
            ctx = new InitialDirContext(env);
 
        }
 
        catch(Exception e)
 
        {
 
            output("Unable to create JNDI context");
 
            e.printStackTrace();
 
            System.exit(0);
 
        }
 
 
        // Obtain the TCF from JNDI context
 
        try
 
        {
 
            tcf = (TopicConnectionFactory)ctx.lookup(args[0]);
 
        }
 
        catch(Exception e)
 
        {
 
            usage();
 
            e.printStackTrace();
 
            try{ctx.close();}catch(Exception ce){}
 
            System.exit(0);
 
        }
 
 
        // Obtain the Topic from JNDI context
 
        try
 
        {
 
            topic = (Topic)ctx.lookup(args[1]);
 
            output("Using Topic: " + topic.toString());
 
        }
 
        catch(Exception e)
 
        {
 
            usage();
 
            e.printStackTrace();
 
            try{ctx.close();}catch(Exception ce){}
 
            System.exit(0);
 
        }
 
 
        // Close our JNDI context
 
        try{ctx.close();}catch(Exception ce){}
 
 
        // Create our JMS objects
 
        try
 
        {
 
            // Create TopicConnection
 
            conn = tcf.createTopicConnection();
 
            output("Connection created");
 
 
            // Create TopicSession with required transactionality
 
            session = conn.createTopicSession(trans,
 
                                              Session.AUTO_ACKNOWLEDGE);
 
            if(trans) output("Transacted Session created");
 
            else output("Non-transacted Session created");
 
 
            // Create TopicSubscriber
 
            if(dur)
 
            {
 
                // Create durable subscription
 
                sub = session.createDurableSubscriber(topic, subid);
 
                output("Durable subscriber created");
 
            }
 
            else
 
            {
 
                // Create ordinary subscription
 
                sub = session.createSubscriber(topic);
 
                output("Non-durable subscriber created");
 
            }
 
            conn.start();
 
        }
 
        catch(JMSException e)
 
        {
 
            output("Unable to create JMS objects");
 
            e.printStackTrace();
 
            Exception le = e.getLinkedException();
 
            if(le != null) le.printStackTrace();
 
 
            // Close our JMS objects
 
            if(sub != null)
 
                try{sub.close();}catch(JMSException je){}
 
            if(session != null)
 
                try{session.close();}catch(JMSException je){}
 
            if(conn != null)
 
                try{conn.close();}catch(JMSException je){}
 
 
            System.exit(0);
 
        }
 
 
        // Go into a subscribing loop
 
        boolean run = true;
 
        while(run)
 
        {
 
            try
 
            {
 
                // Wait for 5 seconds to receive a message
 
                TextMessage msg = (TextMessage)sub.receive(5000);
 
 
                if(msg != null)
 
                {
 
                    // We got a message so process it
 
                    String data = msg.getText();
 
                    output("Received message: '"+data+"'");
 
 
                    // If "quit" then stop looping
 
                    if(data.equals("quit")) run = false;
 
 
                    // Commit the receive if we are transacted
 
                    if(trans) session.commit();
 
                }
 
                else
 
                {
 
                    // We didn't get a message, so loop round again
 
                    output("No message received");
 
                }
 
            }
 
            catch(JMSException e)
 
            {
 
                output("Error while receiving messages");
 
                e.printStackTrace();
 
                Exception le = e.getLinkedException();
 
                if(le != null) le.printStackTrace();
 
                run = false;
 
            }
 
        }
 
 
        output("Terminating application");
 
 
        // Close our JMS objects
 
        if(sub != null)
 
            try{sub.close();}catch(JMSException je){}
 
 
        // Check to see if we are to delete the durable subscription
 
        if(dur)
 
        {
 
            try
 
            {
 
                // Create a reader from stdin
 
                in = new BufferedReader(new InputStreamReader(System.in));
 
                System.out.print("'y' deletes the durable subscription: ");
 
                String input = in.readLine();
 
                if(input.equals("y"))
 
                {
 
                    // Unsubscribe the durable subscription
 
                    session.unsubscribe(subid);
 
                    output("Durable subscription deleted");
 
                }
 
            }
 
            catch(JMSException e)
 
            {
 
                output("Error unsubscribing durable subscription");
 
                e.printStackTrace();
 
                Exception le = e.getLinkedException();
 
                if(le != null) le.printStackTrace();
 
            }
 
            catch(Exception e)
 
            {
 
                output("Error unsubscribing durable subscription");
 
                e.printStackTrace();
 
            }
 
 
        }
 
 
        // Close the rest of the JMS objects
 
        if(session != null)
 
            try{session.close();}catch(JMSException je){}
 
        if(conn != null)
 
            try{conn.close();}catch(JMSException je){}
 
        output("JMS objects closed");
 
        output("Application terminated");
 
    }
 
 
    private static void usage()
 
    {
 
        output("java Subscriber TCF Topic tran|notran dur|nondur [subID]");
 
    }
 
 
    private static void output(String s)
 
    {
 
        System.out.println(s);
 
    }
 
}
 
 
 | 
   
 
 
 
I start the publisher using:
 
 
java Publisher MC_2004 multicast notran nonpers
 
 
I start the subscriber using:
 
 
java Subscriber MC_2004 multicast notran nondur
 
 
Publish a message and the subscriber doesn't see it.
 
 
I restart the subscriber using:
 
 
java Subscriber IP_2004 multicast notran nondur
 
 
Publish a message and the subscriber receives it.
 
 
 
So I think I have everything set up correctly in the broker and jndi... | 
			   
			 
		   | 
		
		
		  | Back to top | 
		  
		  	
		   | 
		
		
		    | 
		
		
		  | murdeep | 
		  
		    
			  
				 Posted: Fri Feb 10, 2006 8:41 am    Post subject:  | 
				     | 
			   
			 
		   | 
		
		
		   Master
 
 Joined: 03 Nov 2004 Posts: 211
  
  | 
		  
		    
			  
				| dira, one additional note. On the broker I have changed IPv4 min address to 239.0.0.0 and TTL to 10. | 
			   
			 
		   | 
		
		
		  | Back to top | 
		  
		  	
		   | 
		
		
		    | 
		
		
		  | 
		    
		   |