|
RSS Feed - WebSphere MQ Support
|
RSS Feed - Message Broker Support
|
 |
|
Reading a Topic Multiple Times in same Thread |
« View previous topic :: View next topic » |
Author |
Message
|
sbuster |
Posted: Wed Oct 22, 2008 4:04 am Post subject: Reading a Topic Multiple Times in same Thread |
|
|
Apprentice
Joined: 07 Oct 2008 Posts: 25
|
What options do I need to pass to the MQQueueManager.accessTopic method in order to read the same Topic multiple times. I've slightly altered the example shipped with MQ 7 and added it below. When I run this example, it gets the first message, but just sits on the destinationForGet.get() line after than. Can someone help?
package com.ibm.mq.v7;
/*
* <N_OCO_COPYRIGHT>
* Licensed Materials - Property of IBM
*
* 5724-H72, 5655-R36, 5724-L26, 5655-L82
*
* (c) Copyright IBM Corp. 2008 All Rights Reserved.
*
* US Government Users Restricted Rights - Use, duplication or
* disclosure restricted by GSA ADP Schedule Contract with
* IBM Corp.
* <NOC_COPYRIGHT>
*/
import java.io.IOException;
import java.util.Hashtable;
import com.ibm.mq.MQDestination;
import com.ibm.mq.MQException;
import com.ibm.mq.MQGetMessageOptions;
import com.ibm.mq.MQMessage;
import com.ibm.mq.MQQueueManager;
import com.ibm.mq.constants.CMQC;
/**
* Demonstrates using the publish/subscribe API introduced into WebSphere MQ v7 Use system
* properties to define the parameters :-
* <ul>
* <li> com.ibm.mq.pubSubSample.queueManagerName
* <li> com.ibm.mq.pubSubSample.topicObject (default is "myTopicObject")
* <li> com.ibm.mq.pubSubSample.subscriberCount (default is 3)
* <ul>
*/
public class AliasExample {
// @COPYRIGHT_START@
/** Comment for copyright_notice */
static final String copyright_notice = "Licensed Materials - Property of IBM "
+ "5724-H72, 5655-R36, 5724-L26, 5655-L82 "
+ "(c) Copyright IBM Corp. 2008 All Rights Reserved. "
+ "US Government Users Restricted Rights - Use, duplication or "
+ "disclosure restricted by GSA ADP Schedule Contract with " + "IBM Corp.";
// @COPYRIGHT_END@
/** The SCCSID which is expanded when the file is extracted from CMVC */
public static final String sccsid = "@(#) samples/wmqjava/MQPubSubApiSample.java, jmscc.samples, k000, k000-L080311.1 1.5 08/03/10 15:04:13";
/** The name of the Queue Manager to use in these tests */
private String queueManagerName;
/*
* used for Wait/Notify between subscribers and publisher.
*/
private static Object syncPoint = new Object();
private static volatile int readySubscribers = 0;
private Hashtable properties;
private String topicString;
private String topicObject;
private Thread[] subscribers;
private static int subscriberCount;
AliasExample() {
topicString = null;
topicObject = System.getProperty("com.ibm.mq.pubSubSample.topicObject", "Football");
queueManagerName = "red_prairie_test.queue.manager";
subscriberCount = Integer.getInteger("com.ibm.mq.pubSubSample.subscriberCount", 1).intValue();
properties = new Hashtable();
properties.put("hostname", x
properties.put("port", new Integer(1420));
properties.put("channel", "SYSTEM.DEF.SVRCONN");
}
/**
* Main entry point
*
* @param arg command line arguments (ignored)
*/
public static void main(String[] arg) {
AliasExample sample = new AliasExample();
sample.launchSubscribers();
// Wait till the subscribers have initialised and notified us...
synchronized (syncPoint) {
while (readySubscribers < subscriberCount) {
try {
syncPoint.wait();
}
catch (InterruptedException e) {
// We'll silently ignore it
}
}
}
/*
try {
sample.doPublish();
}
catch (MQException e) {
e.printStackTrace();
}
catch (IOException e) {
e.printStackTrace();
}
*/
}
private void launchSubscribers() {
System.out.println("Launching the subscribers");
subscribers = new Thread[subscriberCount];
for (int threadNo = 0; threadNo < subscribers.length; threadNo++) {
subscribers[threadNo] = new Subscriber("Subscriber" + threadNo);
subscribers[threadNo].start();
}
}
private void doPublish() throws MQException, IOException {
int destinationType = CMQC.MQOT_TOPIC;
/* Do the publish */
MQQueueManager queueManager = new MQQueueManager(queueManagerName, properties);
MQMessage messageForPut = new MQMessage();
System.out.println("***Publishing ***");
messageForPut.writeString("Hello world!");
queueManager.put(destinationType, topicObject, messageForPut);
}
class Subscriber extends Thread {
private String myName;
final int openOptionsForGet = CMQC.MQSO_CREATE | CMQC.MQSO_FAIL_IF_QUIESCING
| CMQC.MQSO_MANAGED | CMQC.MQSO_NON_DURABLE;
Subscriber(String subscriberName) {
super(subscriberName);
myName = subscriberName;
}
/**
* run method for a subscriber - waits for data
*/
public void run() {
try {
System.out.println(myName + " - ***Subscribing***");
// Must use our own queue manager or we will block...
MQQueueManager queueManager = new MQQueueManager(queueManagerName, properties);
MQDestination destinationForGet = queueManager.accessTopic(topicString, topicObject,
CMQC.MQTOPIC_OPEN_AS_SUBSCRIPTION, openOptionsForGet);
// Let the controlling thread know we've started ...
synchronized (syncPoint) {
readySubscribers++;
syncPoint.notify();
}
MQGetMessageOptions mgmo = new MQGetMessageOptions();
mgmo.options = CMQC.MQGMO_WAIT;
mgmo.waitInterval = 30000;
System.out.println(myName + " - ***Retrieving***");
MQMessage messageForGet = new MQMessage();
synchronized (getClass()) {
int i=0;
while(i==0){
destinationForGet.get(messageForGet, mgmo);
String messageDataFromGet = messageForGet.readLine();
System.out.println(myName + " - Got [" + messageDataFromGet + "]");
messageForGet.clearMessage();
}
}
}
catch (Exception e) {
System.err.println(myName + " " + e);
e.printStackTrace();
return;
}
readySubscribers--;
}
}
} |
|
Back to top |
|
 |
|
|
 |
|
Page 1 of 1 |
|
You cannot post new topics in this forum You cannot reply to topics in this forum You cannot edit your posts in this forum You cannot delete your posts in this forum You cannot vote in polls in this forum
|
|
|
|