JMS API Publish/Subscribe help needed |
Author |
Message
|
envee_10 |
Posted: Fri Apr 19, 2013 2:48 am Post subject: JMS API Publish/Subscribe help needed |
|
|
Newbie
Joined: 19 Apr 2013 Posts: 3
|
Hi,
My objective is to publish text messages to a topic and have each message delivered to the queues that subscribe to that topic.
I tried a modified version of the sample application that ships with the MQ installation for Solaris, JmsJndiProducer.java. The only change I made is to create a TextMessage from a SOAPMessage, which I read from a file with the hardcoded name create_subscription.xml.
When I run the sample application, it reports that the message was published successfully, but when I browse the queues, no message is there.
If I publish the message using the "Test Publication" option in WebSphere MQ Explorer, the message does appear in the subscriber queues.
What could be wrong with what I am doing in the JMS sample application?
This is what my source code looks like (please scroll down for details of my MQ objects and environment):
Code: |
// SCCSID "@(#) samples/jms/JmsJndiProducer.java, jmscc.samples, k700, k700-002-090421 1.4.1.1 08/06/01 09:32:43"
/*
* <N_OCO_COPYRIGHT>
* Licensed Materials - Property of IBM
*
* 5724-H72, 5655-R36, 5724-L26, 5655-L82
*
* (c) Copyright IBM Corp. 2008 All Rights Reserved.
*
* US Government Users Restricted Rights - Use, duplication or
* disclosure restricted by GSA ADP Schedule Contract with
* IBM Corp.
* <NOC_COPYRIGHT>
*/
import javax.xml.soap.*;
import java.io.*;
import javax.xml.transform.stream.StreamSource;
import java.util.Hashtable;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.naming.Context;
import javax.naming.NamingException;
import javax.naming.directory.InitialDirContext;
import com.ibm.msg.client.jms.JmsConnectionFactory;
import com.ibm.msg.client.jms.JmsDestination;
/**
* A JMS producer (sender or publisher) application that sends a simple message to the named
* destination (queue or topic) by looking up the connection factory instance and the destination
* instance in an initial context (This sample supports file system context only).
*
* Notes:
*
* API type: IBM JMS API (v1.1, unified domain)
*
* Messaging domain: Point-to-point or Publish-Subscribe
*
* Provider type: WebSphere MQ
*
* Connection mode: Client connection or bindings connection
*
* JNDI in use: Yes
*
* Usage:
*
* JmsJndiProducer -i initialContext -c connectionFactory -d destination
*
* for example:
*
* JmsJndiProducer -i file:/C:/JNDI-Directory -c myQCF -d myQueue
*/
public class JmsJndiProducer {
private static String initialContextUrl = null;
private static String connectionFactoryFromJndi = null;
private static String destinationFromJndi = null;
// System exit status value (assume unset value to be 1)
private static int status = 1;
/**
* Main method
*
* @param args
*/
public static void main(String[] args) {
// Parse the arguments
parseArgs(args);
// Variables
Connection connection = null;
Session session = null;
Destination destination = null;
MessageProducer producer = null;
try {
// Instantiate the initial context
String contextFactory = "com.ibm.mq.jms.context.WMQInitialContextFactory";
Hashtable environment = new Hashtable();
environment.put(Context.INITIAL_CONTEXT_FACTORY, contextFactory);
environment.put(Context.PROVIDER_URL, initialContextUrl);
Context context = new InitialDirContext(environment);
System.out.println("Initial context found!");
// Lookup the connection factory
JmsConnectionFactory cf = (JmsConnectionFactory) context.lookup(connectionFactoryFromJndi);
System.out.println("Connection Factory found!");
// Lookup the destination
destination = (JmsDestination) context.lookup(destinationFromJndi);
System.out.println("Topic found!");
// Create JMS objects
connection = cf.createConnection();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
producer = session.createProducer(destination);
System.out.println("Producer created !");
// SOAP Connection Factory
SOAPConnectionFactory soapFactory = SOAPConnectionFactory.newInstance();
SOAPConnection soapConnection = soapFactory.createConnection();
MessageFactory msgFactory = MessageFactory.newInstance();
SOAPMessage soapMessage = msgFactory.createMessage();
SOAPPart soapPart = soapMessage.getSOAPPart();
SOAPEnvelope soapEnvelope = soapPart.getEnvelope();
SOAPBody soapBody = soapEnvelope.getBody();
StreamSource soapMsgSource = new StreamSource(new FileInputStream("create_subscription.xml"));
soapPart.setContent(soapMsgSource);
soapMessage.saveChanges();
// Convert the SOAP Message to String format
ByteArrayOutputStream byteArray = new ByteArrayOutputStream();
soapMessage.writeTo(byteArray);
String strSoap = new String(byteArray.toByteArray());
System.out.println(strSoap);
long uniqueNumber = System.currentTimeMillis() % 1000;
TextMessage message = session
.createTextMessage(strSoap);
// Start the connection
connection.start();
System.out.println("Connection started");
// And, send the message
producer.send(message);
System.out.println("Sent message:\n" + message);
soapConnection.close();
recordSuccess();
}
catch (JMSException jmsex) {
recordFailure(jmsex);
}
catch (NamingException ne) {
System.out.println("The initial context could not be instantiated, or the lookup failed.");
recordFailure(ne);
}
catch (Exception se) {
// Report the underlying cause (SOAP, I/O, ...) instead of hiding it
System.out.println("SOAP Exception");
recordFailure(se);
}
finally {
if (producer != null) {
try {
producer.close();
}
catch (JMSException jmsex) {
System.out.println("Producer could not be closed.");
recordFailure(jmsex);
}
}
if (session != null) {
try {
session.close();
}
catch (JMSException jmsex) {
System.out.println("Session could not be closed.");
recordFailure(jmsex);
}
}
if (connection != null) {
try {
connection.close();
}
catch (JMSException jmsex) {
System.out.println("Connection could not be closed.");
recordFailure(jmsex);
}
}
}
System.exit(status);
} // end main()
/**
* Process a JMSException and any associated inner exceptions.
*
* @param jmsex
*/
private static void processJMSException(JMSException jmsex) {
System.out.println(jmsex);
Throwable innerException = jmsex.getLinkedException();
if (innerException != null) {
System.out.println("Inner exception(s):");
}
while (innerException != null) {
System.out.println(innerException);
innerException = innerException.getCause();
}
}
/**
* Record this run as successful.
*/
private static void recordSuccess() {
System.out.println("SUCCESS");
status = 0;
}
/**
* Record this run as failure.
*
* @param ex
*/
private static void recordFailure(Exception ex) {
if (ex != null) {
if (ex instanceof JMSException) {
processJMSException((JMSException) ex);
}
else {
System.out.println(ex);
}
}
System.out.println("FAILURE");
status = -1;
}
/**
* Parse user supplied arguments.
*
* @param args
*/
private static void parseArgs(String[] args) {
try {
int length = args.length;
if (length == 0) {
throw new IllegalArgumentException("No arguments! Mandatory arguments must be specified.");
}
if ((length % 2) != 0) {
throw new IllegalArgumentException("Incorrect number of arguments!");
}
int i = 0;
while (i < length) {
if ((args[i]).charAt(0) != '-') {
throw new IllegalArgumentException("Expected a '-' character next: " + args[i]);
}
char opt = (args[i]).toLowerCase().charAt(1);
switch (opt) {
case 'i' :
initialContextUrl = args[++i];
break;
case 'c' :
connectionFactoryFromJndi = args[++i];
break;
case 'd' :
destinationFromJndi = args[++i];
break;
default :
throw new IllegalArgumentException("Unknown argument: " + opt);
}
++i;
}
if (initialContextUrl == null) {
throw new IllegalArgumentException("An initial context must be specified.");
}
if (connectionFactoryFromJndi == null) {
throw new IllegalArgumentException(
"A connection factory to lookup in the initial context must be specified.");
}
if (destinationFromJndi == null) {
throw new IllegalArgumentException(
"A destination to lookup in the initial context must be specified.");
}
}
catch (Exception e) {
System.out.println(e.getMessage());
printUsage();
System.exit(-1);
}
}
/**
* Display usage help.
*/
private static void printUsage() {
System.out.println("\nUsage:");
System.out.println("JmsJndiProducer -i initialContext -c connectionFactory -d destination");
}
} // end class
|
Here is my configuration:
WebSphere MQ v7, running on Solaris 10 64-bit.
The details of my MQ objects are as follows:
Code: |
dis topic(ONDB.WME.TOPIC.1)
2 : dis topic(ONDB.WME.TOPIC.1)
AMQ8633: Display topic details.
TOPIC(ONDB.WME.TOPIC.1) TYPE(LOCAL)
TOPICSTR(ONDB.WME.TOPIC) DESCR(WME Topic)
CLUSTER( ) DURSUB(YES)
PUB(ASPARENT) SUB(ASPARENT)
DEFPSIST(YES) DEFPRTY(ASPARENT)
DEFPRESP(ASPARENT) ALTDATE(2013-04-19)
ALTTIME(19.17.32) PMSGDLV(ASPARENT)
NPMSGDLV(ASPARENT) PUBSCOPE(ASPARENT)
SUBSCOPE(ASPARENT) PROXYSUB(FIRSTUSE)
WILDCARD(PASSTHRU) MDURMDL( )
MNDURMDL( )
|
Subscription Details
Code: |
dis sub(ESS.WME.SUBSCRIPTION.1)
4 : dis sub(ESS.WME.SUBSCRIPTION.1)
AMQ8096: WebSphere MQ subscription inquired.
SUBID(414D51204D51535341505030393633445149EB8B2002F721)
SUB(ESS.WME.SUBSCRIPTION.1) TOPICSTR(ONDB.WME.TOPIC)
TOPICOBJ(ONDB.WME.TOPIC.1) DEST(ESS.WME.SUBSCR.QUEUE.1)
DESTQMGR(MQSSAPP0963D01) PUBAPPID( )
SELECTOR( ) USERDATA( )
PUBACCT(0000000000000000000000000000000000000000000000000000000000000000)
DESTCORL(414D51204D51535341505030393633445149EB8B2002F721)
DESTCLAS(PROVIDED) DURABLE(YES)
EXPIRY(UNLIMITED) PSPROP(MSGPROP)
PUBPRTY(ASPUB) REQONLY(NO)
SUBSCOPE(ALL) SUBLEVEL(1)
SUBTYPE(ADMIN) VARUSER(ANY)
WSCHEMA(TOPIC) SUBUSER(mqm)
CRDATE(2013-04-04) CRTIME(18:06:26)
ALTDATE( ) ALTTIME( )
dis sub(ONDB.WME.SUBSCRIPTION.1)
5 : dis sub(ONDB.WME.SUBSCRIPTION.1)
AMQ8096: WebSphere MQ subscription inquired.
SUBID(414D51204D51535341505030393633445149EB8B2002F257)
SUB(ONDB.WME.SUBSCRIPTION.1) TOPICSTR(ONDB.WME.TOPIC)
TOPICOBJ(ONDB.WME.TOPIC.1) DEST(ONDB.WME.SUBSCR.QUEUE.1)
DESTQMGR(MQSSAPP0963D01) PUBAPPID( )
SELECTOR( ) USERDATA( )
PUBACCT(0000000000000000000000000000000000000000000000000000000000000000)
DESTCORL(414D51204D51535341505030393633445149EB8B2002F257)
DESTCLAS(PROVIDED) DURABLE(YES)
EXPIRY(UNLIMITED) PSPROP(MSGPROP)
PUBPRTY(ASPUB) REQONLY(NO)
SUBSCOPE(ALL) SUBLEVEL(1)
SUBTYPE(ADMIN) VARUSER(ANY)
WSCHEMA(TOPIC) SUBUSER(mqm)
CRDATE(2013-04-04) CRTIME(18:01:56)
ALTDATE( ) ALTTIME( )
|
Queue Details
Code: |
dis queue(ESS.WME.SUBSCR.QUEUE.1)
6 : dis queue(ESS.WME.SUBSCR.QUEUE.1)
AMQ8409: Display Queue details.
QUEUE(ESS.WME.SUBSCR.QUEUE.1) TYPE(QLOCAL)
ACCTQ(QMGR) ALTDATE(2013-04-19)
ALTTIME(19.16.34) BOQNAME( )
BOTHRESH(0) CLUSNL( )
CLUSTER( ) CLWLPRTY(0)
CLWLRANK(0) CLWLUSEQ(QMGR)
CRDATE(2013-04-04) CRTIME(17.38.26)
CURDEPTH(1) DEFBIND(OPEN)
DEFPRTY(0) DEFPSIST(YES)
DEFPRESP(SYNC) DEFREADA(NO)
DEFSOPT(SHARED) DEFTYPE(PREDEFINED)
DESCR(ESS_Subscriber_Queue) DISTL(NO)
GET(ENABLED) HARDENBO
INITQ( ) IPPROCS(0)
MAXDEPTH(5000) MAXMSGL(4194304)
MONQ(QMGR) MSGDLVSQ(PRIORITY)
NOTRIGGER NPMCLASS(NORMAL)
OPPROCS(0) PROCESS( )
PUT(ENABLED) PROPCTL(COMPAT)
QDEPTHHI(80) QDEPTHLO(20)
QDPHIEV(DISABLED) QDPLOEV(DISABLED)
QDPMAXEV(ENABLED) QSVCIEV(NONE)
QSVCINT(999999999) RETINTVL(999999999)
SCOPE(QMGR) SHARE
STATQ(QMGR) TRIGDATA( )
TRIGDPTH(1) TRIGMPRI(0)
TRIGTYPE(FIRST) USAGE(NORMAL)
dis queue(ONDB.WME.SUBSCR.QUEUE.1)
7 : dis queue(ONDB.WME.SUBSCR.QUEUE.1)
AMQ8409: Display Queue details.
QUEUE(ONDB.WME.SUBSCR.QUEUE.1) TYPE(QLOCAL)
ACCTQ(QMGR) ALTDATE(2013-04-19)
ALTTIME(19.16.10) BOQNAME( )
BOTHRESH(0) CLUSNL( )
CLUSTER( ) CLWLPRTY(0)
CLWLRANK(0) CLWLUSEQ(QMGR)
CRDATE(2013-04-04) CRTIME(17.37.19)
CURDEPTH(1) DEFBIND(OPEN)
DEFPRTY(0) DEFPSIST(YES)
DEFPRESP(SYNC) DEFREADA(NO)
DEFSOPT(SHARED) DEFTYPE(PREDEFINED)
DESCR(ONDB_Subscriber_Queue) DISTL(NO)
GET(ENABLED) HARDENBO
INITQ( ) IPPROCS(0)
MAXDEPTH(5000) MAXMSGL(4194304)
MONQ(QMGR) MSGDLVSQ(PRIORITY)
NOTRIGGER NPMCLASS(NORMAL)
OPPROCS(0) PROCESS( )
PUT(ENABLED) PROPCTL(COMPAT)
QDEPTHHI(80) QDEPTHLO(20)
QDPHIEV(DISABLED) QDPLOEV(DISABLED)
QDPMAXEV(ENABLED) QSVCIEV(NONE)
QSVCINT(999999999) RETINTVL(999999999)
SCOPE(QMGR) SHARE
STATQ(QMGR) TRIGDATA( )
TRIGDPTH(1) TRIGMPRI(0)
TRIGTYPE(FIRST) USAGE(NORMAL)
|
|
|
hughson |
Posted: Tue May 14, 2013 5:26 am Post subject: |
|
|
 Padawan
Joined: 09 May 2013 Posts: 1959 Location: Bay of Plenty, New Zealand
|
Create a subscription on the topic string '#' and then see which topic string the publication you receive was actually published on.
Code: |
DEFINE SUB(CATCH-ALL) TOPICSTR('#') DEST(CATCH-ALL-Q)
|
Use something to browse the queue that will show you message properties, for example the sample amqsbcg with its third parameter set to 1.
Code: |
amqsbcg CATCH-ALL-Q qmgr-name 1
|
and see what you actually published on:
Code: |
****Message properties****
MQTopicString : 'Sports/Football/Hursley'
|
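To illustrate why the catch-all subscription is a useful diagnostic, here is a minimal Java sketch of topic-string matching. It assumes the topic-style wildcard scheme (WSCHEMA(TOPIC)), where '/' separates levels, '#' matches zero or more levels and '+' matches exactly one level. The class and method names are hypothetical, and this is a simplified illustration, not how the queue manager implements matching. It ignores the topic tree and TOPICOBJ resolution.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, for illustration only: a simplified model of
// topic-style wildcard matching ('/' separator, '#' and '+' wildcards).
final class TopicMatchSketch {

    // True when the publication topic string is matched by the subscription topic string.
    static boolean matches(String subscription, String publication) {
        return matchLevels(split(subscription), 0, split(publication), 0);
    }

    // Split on '/' only; a '.' is an ordinary character inside a level.
    private static List<String> split(String topic) {
        return Arrays.asList(topic.split("/", -1));
    }

    private static boolean matchLevels(List<String> sub, int i, List<String> pub, int j) {
        if (i == sub.size()) {
            return j == pub.size();
        }
        String level = sub.get(i);
        if (level.equals("#")) {
            // '#' may absorb zero or more publication levels
            for (int k = j; k <= pub.size(); k++) {
                if (matchLevels(sub, i + 1, pub, k)) {
                    return true;
                }
            }
            return false;
        }
        if (j == pub.size()) {
            return false;
        }
        if (level.equals("+") || level.equals(pub.get(j))) {
            return matchLevels(sub, i + 1, pub, j + 1);
        }
        return false;
    }

    public static void main(String[] args) {
        // The catch-all subscription sees every publication.
        System.out.println(matches("#", "Sports/Football/Hursley"));
        // An exact subscription only sees the identical topic string.
        System.out.println(matches("ONDB.WME.TOPIC", "ONDB.WME.TOPIC"));
        System.out.println(matches("ONDB.WME.TOPIC", "ONDB/WME/TOPIC"));
    }
}
```

Under these assumptions, a subscription on ONDB.WME.TOPIC receives nothing if the JMS destination resolves to any other topic string, while the '#' subscription still receives the publication and shows which string was used.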
Cheers
Morag Hughson
|
fjb_saper |
Posted: Wed May 15, 2013 5:52 am Post subject: |
|
|
 Grand High Poobah
Joined: 18 Nov 2003 Posts: 20756 Location: LI,NY
|
You may also want to set a JMSReplyTo destination on your published message.
The reply sent to that destination will contain the reason code of the publish.
Have fun
|