Author |
Message
|
paustin_ours |
Posted: Wed Apr 18, 2018 4:47 pm Post subject: subscribe on request - retained publication |
|
|
Yatiri
Joined: 19 May 2004 Posts: 667 Location: columbus,oh
|
I used the MQ sample pub sub java code and tweaked it a little
Code: |
/*
* <copyright
* notice="lm-source-program"
* pids="5724-H72,5655-R36,5655-L82,5724-L26"
* years="2008,2014"
* crc="1158690601" >
* Licensed Materials - Property of IBM
*
* 5724-H72,5655-R36,5655-L82,5724-L26
*
* (C) Copyright IBM Corp. 2008, 2014 All Rights Reserved.
*
* US Government Users Restricted Rights - Use, duplication or
* disclosure restricted by GSA ADP Schedule Contract with
* IBM Corp.
* </copyright>
*/
import java.io.IOException;
import java.util.Hashtable;
import com.ibm.mq.MQDestination;
import com.ibm.mq.jmqi.*;
import com.ibm.mq.MQException;
import com.ibm.mq.MQGetMessageOptions;
import com.ibm.mq.MQMessage;
import com.ibm.mq.MQQueueManager;
import com.ibm.mq.constants.CMQC;
import com.ibm.msg.client.wmq.compat.jms.internal.MQPubSubServices;
/**
* Demonstrates using the publish/subscribe API introduced into WebSphere MQ v7 Use system
* properties to define the parameters :-
* <ul>
* <li>com.ibm.mq.pubSubSample.queueManagerName
* <li>com.ibm.mq.pubSubSample.topicObject (default is "myTopicObject")
* <li>com.ibm.mq.pubSubSample.subscriberCount (default is 3)
* <ul>
*/
public class TopicSub {
/** The SCCSID which is expanded when the file is extracted from CMVC */
public static final String sccsid = "@(#) MQMBID sn=p904-L171030.1 su=_fzLDAL1hEeet95U9U8zg4w pn=MQJavaSamples/wmqjava/MQPubSubApiSample.java";
/** The name of the Queue Manager to use in these tests */
private String queueManagerName;
/*
* used for Wait/Notify between subscribers and publisher.
*/
private static Object syncPoint = new Object();
private static volatile int readySubscribers = 0;
private Hashtable<String, Object> properties;
private String topicString;
private String topicObject;
private Thread[] subscribers;
private static int subscriberCount;
TopicSub() {
topicString = null;
topicObject = System.getProperty("com.ibm.mq.pubSubSample.topicObject", "mon");
queueManagerName = System.getProperty("com.ibm.mq.pubSubSample.queueManagerName");
subscriberCount = Integer.getInteger("com.ibm.mq.pubSubSample.subscriberCount", 1).intValue();
properties = new Hashtable<String, Object>();
properties.put("hostname", "server1.com");
properties.put("port", new Integer(1414));
properties.put("channel", "server1.chl");
return;
}
/**
* Main entry point
*
* @param arg command line arguments (ignored)
*/
public static void main(String[] arg) {
TopicSub sample = new TopicSub();
sample.launchSubscribers();
// Wait till the subscribers have initialised and notified us...
synchronized (syncPoint) {
while (readySubscribers < subscriberCount) {
try {
syncPoint.wait();
}
catch (InterruptedException e) {
// We'll silently ignore it
}
}
}
/*try {
sample.doPublish();
}
catch (MQException e) {
e.printStackTrace();
for (Throwable t = e.getCause(); t != null; t = t.getCause()) {
System.out.println("... Caused by ");
t.printStackTrace();
}
}
catch (IOException e) {
e.printStackTrace();
}*/
return;
}
private void launchSubscribers() {
System.out.println("Launching the subscribers");
subscribers = new Thread[subscriberCount];
for (int threadNo = 0; threadNo < subscribers.length; threadNo++) {
subscribers[threadNo] = new Subscriber("Subscriber" + threadNo);
subscribers[threadNo].start();
}
return;
}
private void doPublish() throws MQException, IOException {
int destinationType = CMQC.MQOT_TOPIC;
/* Do the publish */
MQQueueManager queueManager = new MQQueueManager(queueManagerName, properties);
MQMessage messageForPut = new MQMessage();
System.out.println("***Publishing ***");
messageForPut.writeString("Hello world!");
queueManager.put(destinationType, topicObject, messageForPut);
return;
}
class Subscriber extends Thread {
private String myName;
final int openOptionsForGet = CMQC.MQSO_CREATE | CMQC.MQSO_FAIL_IF_QUIESCING
| CMQC.MQSO_MANAGED | CMQC.MQSO_NON_DURABLE;
Subscriber(String subscriberName) {
super(subscriberName);
myName = subscriberName;
return;
}
/**
* run method for a subscriber - waits for data
*/
@Override
public void run() {
try {
System.out.println(myName + " - ***Subscribing***");
// Must use our own queue manager or we will block...
MQQueueManager queueManager = new MQQueueManager(queueManagerName, properties);
MQDestination destinationForGet = queueManager.accessTopic(topicString, topicObject,
CMQC.MQTOPIC_OPEN_AS_SUBSCRIPTION, openOptionsForGet);
// Let the controlling thread know we've started ...
synchronized (syncPoint) {
readySubscribers++;
syncPoint.notify();
}
MQGetMessageOptions mgmo = new MQGetMessageOptions();
mgmo.options = CMQC.MQGMO_WAIT;
mgmo.waitInterval = 30000;
System.out.println(myName + " - ***Retrieving***");
MQMessage messageForGet = new MQMessage();
synchronized (getClass()) {
destinationForGet.get(messageForGet, mgmo);
}
String messageDataFromGet = messageForGet.readLine();
System.out.println(myName + " - Got [" + messageDataFromGet + "]");
}
catch (Exception e) {
System.err.println(myName + " " + e);
e.printStackTrace();
for (Throwable t = e.getCause(); t != null; t = t.getCause()) {
System.out.println("... Caused by ");
t.printStackTrace();
}
return;
}
readySubscribers--;
return;
}
}
}
|
this retrieves the latest published messages but i need to get retained publish messages for this topic.
I tried using CMQC.MQSO_PUBLICATIONS_ON_REQUEST in the openOptions
but wasn't sure how to make a subscription request call in Java. |
|
Back to top |
|
 |
RogerLacroix |
Posted: Thu Apr 19, 2018 3:32 pm Post subject: |
|
|
 Jedi Knight
Joined: 15 May 2001 Posts: 3264 Location: London, ON Canada
|
|
Back to top |
|
 |
mvic |
Posted: Thu Apr 19, 2018 3:54 pm Post subject: Re: subscribe on request - retained publication |
|
|
 Jedi
Joined: 09 Mar 2004 Posts: 2080
|
paustin_ours wrote: |
but wasn't sure how to make a subscription request call in Java. |
You're asking the forum an unbounded question like that?
I don't think you will get an answer.
Try a more precise question. |
|
Back to top |
|
 |
paustin_ours |
Posted: Thu Apr 19, 2018 6:06 pm Post subject: |
|
|
Yatiri
Joined: 19 May 2004 Posts: 667 Location: columbus,oh
|
|
Back to top |
|
 |
paustin_ours |
Posted: Fri Apr 20, 2018 4:39 am Post subject: |
|
|
Yatiri
Joined: 19 May 2004 Posts: 667 Location: columbus,oh
|
Roger,
This code again only gets the latest subscription, don't look like it is getting the retained publication. I am subscribing to topic object "test" with the below topic string.
SYSTEM.FTE/monitors/# topic
when i create a subscription on the queue manager
dis sub('testsub') ALL
5 : dis sub('testsub') ALL
AMQ8096: WebSphere MQ subscription inquired.
SUBID(414D512044495245504F514D312044495245504F514D3120)
SUB() TOPICSTR(SYSTEM.FTE/monitors/#)
TOPICOBJ(test) DISTYPE(RESOLVED)
DEST(TEST.LOCAL) DESTQMGR(QM1)
PUBAPPID( ) SELECTOR( )
SELTYPE(NONE) USERDATA( )
PUBACCT(0334353000000000000000000000000000000000000000000000000000000006)
DESTCORL(414D512044495245504F514D31202020A7DEE659C977642E)
DESTCLAS(PROVIDED) DURABLE(YES)
EXPIRY(UNLIMITED) PSPROP(MSGPROP)
PUBPRTY(ASPUB) REQONLY(NO)
SUBSCOPE(ALL) SUBLEVEL(1)
SUBTYPE(ADMIN) VARUSER(ANY)
WSCHEMA(TOPIC) SUBUSER(mqm)
CRDATE(2017-11-30) CRTIME(10:59:51)
ALTDATE(2017-11-30) ALTTIME(10:59:51)
this drops 175 msgs on the TEST.LOCAL queue and it matches the 175 monitor jobs on the queue manager
I am trying to get the same thing via the code but code only gets one message for 1 monitor. how do i get all? |
|
Back to top |
|
 |
RogerLacroix |
Posted: Fri Apr 20, 2018 2:00 pm Post subject: |
|
|
 Jedi Knight
Joined: 15 May 2001 Posts: 3264 Location: London, ON Canada
|
paustin_ours wrote: |
This code again only gets the latest subscription, don't look like it is getting the retained publication. I am subscribing to topic object "test" with the below topic string. |
From the 3rd paragraph of: https://www.ibm.com/support/knowledgecenter/en/SSFKSJ_9.0.0/com.ibm.mq.pro.doc/q004940_.htm
Quote: |
The queue manager can retain only one publication for each topic, so the existing retained publication of a topic is deleted when a new retained publication arrives at the queue manager. |
paustin_ours wrote: |
this drops 175 msgs on the TEST.LOCAL queue and it matches the 175 monitor jobs on the queue manager |
You will need 175 topics and publish one retained message to each topic. You may be able to subscribe to the parent topic with a wildcard, but I don't know if MQ will see the subscription as a single subscription or a subscription to 175 topics.
Regards,
Roger Lacroix
Capitalware Inc. _________________ Capitalware: Transforming tomorrow into today.
Connected to MQ!
Twitter |
|
Back to top |
|
 |
fjb_saper |
Posted: Fri Apr 20, 2018 8:21 pm Post subject: |
|
|
 Grand High Poobah
Joined: 18 Nov 2003 Posts: 20756 Location: LI,NY
|
In addition to the very pertinent remarks from my esteemed colleague, there is a little fact I'd like to remind you of:
There is no subscription capable of reading previously published items if the publication was not done with the retain option. And only the last published message on the topic is retained (ever) IF the publication was set up to retain that message...
So if you're not getting the message, did you verify the publication settings?  _________________ MQ & Broker admin |
|
Back to top |
|
 |
paustin_ours |
Posted: Sun Apr 22, 2018 6:54 pm Post subject: |
|
|
Yatiri
Joined: 19 May 2004 Posts: 667 Location: columbus,oh
|
yes it is a retained publication, i think you missed that when i created a subscription the queue manager, i was getting 175 messages. those are all retained publication messages. |
|
Back to top |
|
 |
fjb_saper |
Posted: Sun Apr 22, 2018 9:54 pm Post subject: |
|
|
 Grand High Poobah
Joined: 18 Nov 2003 Posts: 20756 Location: LI,NY
|
Look at the subscription options. I'm sure there is one that says send me all the retained messages...  _________________ MQ & Broker admin |
|
Back to top |
|
 |
paustin_ours |
Posted: Mon Apr 23, 2018 2:35 pm Post subject: |
|
|
Yatiri
Joined: 19 May 2004 Posts: 667 Location: columbus,oh
|
This gets some of the subscriptions but not all
int openOptionsForGet = CMQC.MQSO_CREATE | CMQC.MQSO_FAIL_IF_QUIESCING | CMQC.MQSO_NON_DURABLE |CMQC.MQSO_SCOPE_QMGR; |
|
Back to top |
|
 |
fjb_saper |
Posted: Mon Apr 23, 2018 4:01 pm Post subject: |
|
|
 Grand High Poobah
Joined: 18 Nov 2003 Posts: 20756 Location: LI,NY
|
|
Back to top |
|
 |
paustin_ours |
Posted: Mon Apr 23, 2018 4:24 pm Post subject: |
|
|
Yatiri
Joined: 19 May 2004 Posts: 667 Location: columbus,oh
|
yes. I posted the same link earlier. I tried using MQSO_PUBLICATIONS_ON_REQUEST with a non-durable subscriber. I dont get anything back.
If you see my post earlier, i am not sure how to make a MQSUBRQ request from within my Java code as it says i need to do in the link.
https://www.ibm.com/support/knowledgecenter/en/SSFKSJ_8.0.0/com.ibm.mq.ref.dev.doc/q101940_.htm
the above link shows how to do this for C and COBOL but I am not following as to how to do this via Java.
the sample code above |
|
Back to top |
|
 |
fjb_saper |
Posted: Wed Apr 25, 2018 2:34 am Post subject: |
|
|
 Grand High Poobah
Joined: 18 Nov 2003 Posts: 20756 Location: LI,NY
|
|
Back to top |
|
 |
|