JMS API Publish/Subscribe help needed
envee_10
Posted: Fri Apr 19, 2013 2:48 am    Post subject: JMS API Publish/Subscribe help needed

Newbie

Joined: 19 Apr 2013
Posts: 3

Hi,

My objective is to publish text messages to a Topic and then have this message broadcast to some queues which are subscribing to this Topic.

I tried a modified version of the sample application that ships with the MQ installation for Solaris, JmsJndiProducer.java. My only change is to create a TextMessage from a SOAPMessage that I read from a file, hardcoded as create_subscription.xml.

On running this sample application it says that it was able to successfully publish the message, but when I try to browse the queues, there is no message in there.

If I publish the message using the "Test Publication" option in WebSphere MQ Explorer, I can see the published message in the subscribers' queues.

What could be wrong with what I am doing in the JMS sample application?

This is what my source code looks like (please scroll below to see details of my MQ objects and environment) :

Code:

// SCCSID "@(#) samples/jms/JmsJndiProducer.java, jmscc.samples, k700, k700-002-090421  1.4.1.1 08/06/01 09:32:43"
/*
 * <N_OCO_COPYRIGHT>
 * Licensed Materials - Property of IBM
 *
 * 5724-H72, 5655-R36, 5724-L26, 5655-L82
 *
 * (c) Copyright IBM Corp. 2008 All Rights Reserved.
 *
 * US Government Users Restricted Rights - Use, duplication or
 * disclosure restricted by GSA ADP Schedule Contract with
 * IBM Corp.
 * <NOC_COPYRIGHT>
 */

import javax.xml.soap.*;
import java.io.*;
import javax.xml.transform.stream.StreamSource;

import java.util.Hashtable;

import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.naming.Context;
import javax.naming.NamingException;
import javax.naming.directory.InitialDirContext;

import com.ibm.msg.client.jms.JmsConnectionFactory;
import com.ibm.msg.client.jms.JmsDestination;

/**
 * A JMS producer (sender or publisher) application that sends a simple message to the named
 * destination (queue or topic) by looking up the connection factory instance and the destination
 * instance in an initial context (This sample supports file system context only).
 *
 * Notes:
 *
 * API type: IBM JMS API (v1.1, unified domain)
 *
 * Messaging domain: Point-to-point or Publish-Subscribe
 *
 * Provider type: WebSphere MQ
 *
 * Connection mode: Client connection or bindings connection
 *
 * JNDI in use: Yes
 *
 * Usage:
 *
 * JmsJndiProducer -i initialContext -c connectionFactory -d destination
 *
 * for example:
 *
 * JmsJndiProducer -i file:/C:/JNDI-Directory -c myQCF -d myQueue
 */
public class JmsJndiProducer {

  private static String initialContextUrl = null;
  private static String connectionFactoryFromJndi = null;
  private static String destinationFromJndi = null;

  // System exit status value (assume unset value to be 1)
  private static int status = 1;

  /**
   * Main method
   *
   * @param args
   */
  public static void main(String[] args) {
    // Parse the arguments
    parseArgs(args);

    // Variables
    Connection connection = null;
    Session session = null;
    Destination destination = null;
    MessageProducer producer = null;

    try {
      // Instantiate the initial context
      String contextFactory = "com.ibm.mq.jms.context.WMQInitialContextFactory";
      Hashtable environment = new Hashtable();
      environment.put(Context.INITIAL_CONTEXT_FACTORY, contextFactory);
      environment.put(Context.PROVIDER_URL, initialContextUrl);
      Context context = new InitialDirContext(environment);
      System.out.println("Initial context found!");

      // Lookup the connection factory
      JmsConnectionFactory cf = (JmsConnectionFactory) context.lookup(connectionFactoryFromJndi);
      System.out.println("Connection Factory found!");

      // Lookup the destination
      destination = (JmsDestination) context.lookup(destinationFromJndi);
      System.out.println("Topic found!");

      // Create JMS objects
      connection = cf.createConnection();
      session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
      producer = session.createProducer(destination);
      System.out.println("Producer created !");

      // SOAP Connection Factory
      SOAPConnectionFactory soapFactory = SOAPConnectionFactory.newInstance();
      SOAPConnection soapConnection = soapFactory.createConnection();
      MessageFactory msgFactory = MessageFactory.newInstance();
      SOAPMessage soapMessage = msgFactory.createMessage();
      SOAPPart soapPart = soapMessage.getSOAPPart();
      SOAPEnvelope soapEnvelope = soapPart.getEnvelope();
      SOAPBody soapBody = soapEnvelope.getBody();

      StreamSource soapMsgSource = new StreamSource(new FileInputStream("create_subscription.xml"));
      soapPart.setContent(soapMsgSource);
      soapMessage.saveChanges();

      // Convert the SOAP Message to String format
      ByteArrayOutputStream byteArray = new ByteArrayOutputStream();
      soapMessage.writeTo(byteArray);

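      // Decode explicitly rather than with the platform default charset
      // (assumes the SOAP message is written in the default UTF-8 encoding)
      String strSoap = new String(byteArray.toByteArray(), "UTF-8");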
      System.out.println(strSoap);

      TextMessage message = session.createTextMessage(strSoap);

      // Start the connection
      connection.start();
      System.out.println("Connection started");

      // And, send the message
      producer.send(message);
      System.out.println("Sent message:\n" + message);

      soapConnection.close();

      recordSuccess();
    }
    catch (JMSException jmsex) {
      recordFailure(jmsex);
    }
    catch (NamingException ne) {
      System.out.println("The initial context could not be instantiated, or the lookup failed.");
      recordFailure(ne);
    }
    catch (Exception se) {
      // Report the underlying error instead of silently swallowing it
      System.out.println("SOAP Exception");
      recordFailure(se);
    }
    finally {
      if (producer != null) {
        try {
          producer.close();
        }
        catch (JMSException jmsex) {
          System.out.println("Producer could not be closed.");
          recordFailure(jmsex);
        }
      }

      if (session != null) {
        try {
          session.close();
        }
        catch (JMSException jmsex) {
          System.out.println("Session could not be closed.");
          recordFailure(jmsex);
        }
      }

      if (connection != null) {
        try {
          connection.close();
        }
        catch (JMSException jmsex) {
          System.out.println("Connection could not be closed.");
          recordFailure(jmsex);
        }
      }
    }
    System.exit(status);
  } // end main()

  /**
   * Process a JMSException and any associated inner exceptions.
   *
   * @param jmsex
   */
  private static void processJMSException(JMSException jmsex) {
    System.out.println(jmsex);
    Throwable innerException = jmsex.getLinkedException();
    if (innerException != null) {
      System.out.println("Inner exception(s):");
    }
    while (innerException != null) {
      System.out.println(innerException);
      innerException = innerException.getCause();
    }
  }

  /**
   * Record this run as successful.
   */
  private static void recordSuccess() {
    System.out.println("SUCCESS");
    status = 0;
  }

  /**
   * Record this run as failure.
   *
   * @param ex
   */
  private static void recordFailure(Exception ex) {
    if (ex != null) {
      if (ex instanceof JMSException) {
        processJMSException((JMSException) ex);
      }
      else {
        System.out.println(ex);
      }
    }
    System.out.println("FAILURE");
    status = -1;
  }

  /**
   * Parse user supplied arguments.
   *
   * @param args
   */
  private static void parseArgs(String[] args) {
    try {
      int length = args.length;
      if (length == 0) {
        throw new IllegalArgumentException("No arguments! Mandatory arguments must be specified.");
      }
      if ((length % 2) != 0) {
        throw new IllegalArgumentException("Incorrect number of arguments!");
      }

      int i = 0;

      while (i < length) {
        if ((args[i]).charAt(0) != '-') {
          throw new IllegalArgumentException("Expected a '-' character next: " + args[i]);
        }

        char opt = (args[i]).toLowerCase().charAt(1);

        switch (opt) {
          case 'i' :
            initialContextUrl = args[++i];
            break;
          case 'c' :
            connectionFactoryFromJndi = args[++i];
            break;
          case 'd' :
            destinationFromJndi = args[++i];
            break;
          default :
            throw new IllegalArgumentException("Unknown argument: " + opt);
        }

        ++i;
      }

      if (initialContextUrl == null) {
        throw new IllegalArgumentException("An initial context must be specified.");
      }

      if (connectionFactoryFromJndi == null) {
        throw new IllegalArgumentException(
            "A connection factory to lookup in the initial context must be specified.");
      }

      if (destinationFromJndi == null) {
        throw new IllegalArgumentException(
            "A destination to lookup in the initial context must be specified.");
      }
    }
    catch (Exception e) {
      System.out.println(e.getMessage());
      printUsage();
      System.exit(-1);
    }
  }

  /**
   * Display usage help.
   */
  private static void printUsage() {
    System.out.println("\nUsage:");
    System.out.println("JmsJndiProducer -i initialContext -c connectionFactory -d destination");
  }

} // end class





Here is my configuration :
Websphere MQ v7, running on Solaris 10 64-bit

The details of my MQ objects are as follows

Code:

dis topic(ONDB.WME.TOPIC.1)
     2 : dis topic(ONDB.WME.TOPIC.1)
AMQ8633: Display topic details.
   TOPIC(ONDB.WME.TOPIC.1)                 TYPE(LOCAL)
   TOPICSTR(ONDB.WME.TOPIC)                DESCR(WME Topic)
   CLUSTER( )                              DURSUB(YES)
   PUB(ASPARENT)                           SUB(ASPARENT)
   DEFPSIST(YES)                           DEFPRTY(ASPARENT)
   DEFPRESP(ASPARENT)                      ALTDATE(2013-04-19)
   ALTTIME(19.17.32)                       PMSGDLV(ASPARENT)
   NPMSGDLV(ASPARENT)                      PUBSCOPE(ASPARENT)
   SUBSCOPE(ASPARENT)                      PROXYSUB(FIRSTUSE)
   WILDCARD(PASSTHRU)                      MDURMDL( )
   MNDURMDL( )



Subscription Details

Code:


dis sub(ESS.WME.SUBSCRIPTION.1)
     4 : dis sub(ESS.WME.SUBSCRIPTION.1)
AMQ8096: WebSphere MQ subscription inquired.
   SUBID(414D51204D51535341505030393633445149EB8B2002F721)
   SUB(ESS.WME.SUBSCRIPTION.1)             TOPICSTR(ONDB.WME.TOPIC)
   TOPICOBJ(ONDB.WME.TOPIC.1)              DEST(ESS.WME.SUBSCR.QUEUE.1)
   DESTQMGR(MQSSAPP0963D01)                PUBAPPID( )
   SELECTOR( )                             USERDATA( )
   PUBACCT(0000000000000000000000000000000000000000000000000000000000000000)
   DESTCORL(414D51204D51535341505030393633445149EB8B2002F721)
   DESTCLAS(PROVIDED)                      DURABLE(YES)
   EXPIRY(UNLIMITED)                       PSPROP(MSGPROP)
   PUBPRTY(ASPUB)                          REQONLY(NO)
   SUBSCOPE(ALL)                           SUBLEVEL(1)
   SUBTYPE(ADMIN)                          VARUSER(ANY)
   WSCHEMA(TOPIC)                          SUBUSER(mqm)
   CRDATE(2013-04-04)                      CRTIME(18:06:26)
   ALTDATE( )                              ALTTIME( )


dis sub(ONDB.WME.SUBSCRIPTION.1)
     5 : dis sub(ONDB.WME.SUBSCRIPTION.1)
AMQ8096: WebSphere MQ subscription inquired.
   SUBID(414D51204D51535341505030393633445149EB8B2002F257)
   SUB(ONDB.WME.SUBSCRIPTION.1)            TOPICSTR(ONDB.WME.TOPIC)
   TOPICOBJ(ONDB.WME.TOPIC.1)              DEST(ONDB.WME.SUBSCR.QUEUE.1)
   DESTQMGR(MQSSAPP0963D01)                PUBAPPID( )
   SELECTOR( )                             USERDATA( )
   PUBACCT(0000000000000000000000000000000000000000000000000000000000000000)
   DESTCORL(414D51204D51535341505030393633445149EB8B2002F257)
   DESTCLAS(PROVIDED)                      DURABLE(YES)
   EXPIRY(UNLIMITED)                       PSPROP(MSGPROP)
   PUBPRTY(ASPUB)                          REQONLY(NO)
   SUBSCOPE(ALL)                           SUBLEVEL(1)
   SUBTYPE(ADMIN)                          VARUSER(ANY)
   WSCHEMA(TOPIC)                          SUBUSER(mqm)
   CRDATE(2013-04-04)                      CRTIME(18:01:56)
   ALTDATE( )                              ALTTIME( )



Queue Details

Code:

dis queue(ESS.WME.SUBSCR.QUEUE.1)
     6 : dis queue(ESS.WME.SUBSCR.QUEUE.1)
AMQ8409: Display Queue details.
   QUEUE(ESS.WME.SUBSCR.QUEUE.1)           TYPE(QLOCAL)
   ACCTQ(QMGR)                             ALTDATE(2013-04-19)
   ALTTIME(19.16.34)                       BOQNAME( )
   BOTHRESH(0)                             CLUSNL( )
   CLUSTER( )                              CLWLPRTY(0)
   CLWLRANK(0)                             CLWLUSEQ(QMGR)
   CRDATE(2013-04-04)                      CRTIME(17.38.26)
   CURDEPTH(1)                             DEFBIND(OPEN)
   DEFPRTY(0)                              DEFPSIST(YES)
   DEFPRESP(SYNC)                          DEFREADA(NO)
   DEFSOPT(SHARED)                         DEFTYPE(PREDEFINED)
   DESCR(ESS_Subscriber_Queue)             DISTL(NO)
   GET(ENABLED)                            HARDENBO
   INITQ( )                                IPPROCS(0)
   MAXDEPTH(5000)                          MAXMSGL(4194304)
   MONQ(QMGR)                              MSGDLVSQ(PRIORITY)
   NOTRIGGER                               NPMCLASS(NORMAL)
   OPPROCS(0)                              PROCESS( )
   PUT(ENABLED)                            PROPCTL(COMPAT)
   QDEPTHHI(80)                            QDEPTHLO(20)
   QDPHIEV(DISABLED)                       QDPLOEV(DISABLED)
   QDPMAXEV(ENABLED)                       QSVCIEV(NONE)
   QSVCINT(999999999)                      RETINTVL(999999999)
   SCOPE(QMGR)                             SHARE
   STATQ(QMGR)                             TRIGDATA( )
   TRIGDPTH(1)                             TRIGMPRI(0)
   TRIGTYPE(FIRST)                         USAGE(NORMAL)


dis queue(ONDB.WME.SUBSCR.QUEUE.1)
     7 : dis queue(ONDB.WME.SUBSCR.QUEUE.1)
AMQ8409: Display Queue details.
   QUEUE(ONDB.WME.SUBSCR.QUEUE.1)          TYPE(QLOCAL)
   ACCTQ(QMGR)                             ALTDATE(2013-04-19)
   ALTTIME(19.16.10)                       BOQNAME( )
   BOTHRESH(0)                             CLUSNL( )
   CLUSTER( )                              CLWLPRTY(0)
   CLWLRANK(0)                             CLWLUSEQ(QMGR)
   CRDATE(2013-04-04)                      CRTIME(17.37.19)
   CURDEPTH(1)                             DEFBIND(OPEN)
   DEFPRTY(0)                              DEFPSIST(YES)
   DEFPRESP(SYNC)                          DEFREADA(NO)
   DEFSOPT(SHARED)                         DEFTYPE(PREDEFINED)
   DESCR(ONDB_Subscriber_Queue)            DISTL(NO)
   GET(ENABLED)                            HARDENBO
   INITQ( )                                IPPROCS(0)
   MAXDEPTH(5000)                          MAXMSGL(4194304)
   MONQ(QMGR)                              MSGDLVSQ(PRIORITY)
   NOTRIGGER                               NPMCLASS(NORMAL)
   OPPROCS(0)                              PROCESS( )
   PUT(ENABLED)                            PROPCTL(COMPAT)
   QDEPTHHI(80)                            QDEPTHLO(20)
   QDPHIEV(DISABLED)                       QDPLOEV(DISABLED)
   QDPMAXEV(ENABLED)                       QSVCIEV(NONE)
   QSVCINT(999999999)                      RETINTVL(999999999)
   SCOPE(QMGR)                             SHARE
   STATQ(QMGR)                             TRIGDATA( )
   TRIGDPTH(1)                             TRIGMPRI(0)
   TRIGTYPE(FIRST)                         USAGE(NORMAL)

hughson
Posted: Tue May 14, 2013 5:26 am

Padawan

Joined: 09 May 2013
Posts: 1959
Location: Bay of Plenty, New Zealand

Create a subscription on the topic string '#' and then see which topic string the publication actually arrives on.

Code:
DEFINE SUB(CATCH-ALL) TOPICSTR('#') DEST(CATCH-ALL-Q)
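
To illustrate why a '#' subscription catches a publication that the existing subscriptions miss, here is a rough sketch of topic-style wildcard matching ('/' separates levels, '#' spans any number of levels, '+' spans exactly one). It is a simplified illustration, not the queue manager's own matching code, and the class and method names are made up for this example. Note that the dots in ONDB.WME.TOPIC are not level separators, so that string is a single level and only an exact match (or a wildcard) will select it.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper: simplified topic-style wildcard matching.
class TopicWildcardSketch {

    // Returns true if the published topic string is selected by the subscription pattern.
    static boolean matches(String pattern, String topic) {
        return matches(Arrays.asList(pattern.split("/", -1)), 0,
                       Arrays.asList(topic.split("/", -1)), 0);
    }

    private static boolean matches(List<String> p, int pi, List<String> t, int ti) {
        if (pi == p.size()) {
            return ti == t.size();          // pattern used up: topic must be used up too
        }
        String level = p.get(pi);
        if (level.equals("#")) {
            // '#' may absorb zero or more topic levels
            for (int k = ti; k <= t.size(); k++) {
                if (matches(p, pi + 1, t, k)) {
                    return true;
                }
            }
            return false;
        }
        if (ti == t.size()) {
            return false;                   // pattern still has levels, topic does not
        }
        if (level.equals("+") || level.equals(t.get(ti))) {
            return matches(p, pi + 1, t, ti + 1);
        }
        return false;
    }

    public static void main(String[] args) {
        // The catch-all subscription selects whatever was published
        System.out.println(matches("#", "ONDB.WME.TOPIC"));
        // An exact subscription misses a publication on a different string
        // ("ONDB.WME.TOPIC.1" is only a hypothetical example of such a string)
        System.out.println(matches("ONDB.WME.TOPIC", "ONDB.WME.TOPIC.1"));
    }
}
```

If the publication only turns up on the catch-all queue, the application is probably publishing on a topic string other than the one the two subscriptions use.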


Use something to browse the queue that will show you message properties, for example the sample amqsbcg with its third parameter set to 1.

Code:
amqsbcg CATCH-ALL-Q qmgr-name 1


and see which topic string you actually published on:

Code:
****Message properties****

  MQTopicString : 'Sports/Football/Hursley'
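
If you want to script the comparison, something along these lines could pull the topic string out of the browse output and check it against the TOPICSTR of the subscriptions. This is only a sketch: it assumes the property line looks like the sample above (the exact layout may differ between versions), and the class and method names are invented for the example.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: compare the published topic string with a subscription's TOPICSTR.
class PublishedTopicCheck {

    // Assumes a line of the form:  MQTopicString : 'some/topic/string'
    private static final Pattern TOPIC_LINE =
            Pattern.compile("MQTopicString\\s*:\\s*'([^']*)'");

    // Returns the topic string found in the browse output, or null if none is present.
    static String extractTopicString(String browseOutput) {
        Matcher m = TOPIC_LINE.matcher(browseOutput);
        return m.find() ? m.group(1) : null;
    }

    public static void main(String[] args) {
        String sample = "****Message properties****\n\n"
                + "  MQTopicString : 'Sports/Football/Hursley'\n";
        String published = extractTopicString(sample);
        String subscribed = "ONDB.WME.TOPIC";   // TOPICSTR from the DISPLAY SUB output
        System.out.println("Published on: " + published);
        System.out.println("Same as subscription TOPICSTR: " + subscribed.equals(published));
    }
}
```

A mismatch here would explain why the publish call succeeds while the subscriber queues stay empty.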


Cheers
Morag
_________________
Morag Hughson @MoragHughson
IBM MQ Technical Education Specialist
fjb_saper
Posted: Wed May 15, 2013 5:52 am

Grand High Poobah

Joined: 18 Nov 2003
Posts: 20756
Location: LI,NY

You may also want to set a JMSReplyTo destination on your published message.
The reply to that destination will contain the reason code of the publish.

Have fun
_________________
MQ & Broker admin