|
RSS Feed - WebSphere MQ Support
|
RSS Feed - Message Broker Support
|
 |
|
non-JMS subscriber/JMS publisher |
« View previous topic :: View next topic » |
Author |
Message
|
StuBarrett |
Posted: Tue Jul 10, 2001 6:16 am Post subject: |
|
|
Newbie
Joined: 09 Jul 2001 Posts: 2
|
Hello,
We are using MQSeries version 5.2 on Solaris. We are trying to do
publish/subscribe from a JMS publisher to a non-JMS subscriber. So far the
non-JMS subscriber is only able to receive messages from non-JMS
publishers. When we use a JMS publisher, the non-JMS subscriber does not
receive any messages.
Has anyone out there tried/succeeded in using pub/sub between JMS and
non-JMS users? We've read somewhere that you can set the TargetClient value
to JMSC.MQJMS_CLIENT_NONJMS_MQ to indicate that no MQRFH2 field is to be
produced, but there doesn't seem to be a setTargetClient method on the
appropriate Java classes and this constant does not appear to be defined on our
VAJ system. We are not convinced this is the source of the problem anyway since
we do not seem to be receiving any messages, even if they are in the wrong
format.
Below is the code we are using to publish and subscribe. Any idea what we
may be doing wrong? Any form of clue/advice/direction would be greatly
appreciated.
Thanks!
Natalie Berger
################################################
############# PUBLISHER - Java/JMS ############
################################################
{
/*
* Initialize the Event topic publisher: resolve the
* TopicConnectionFactory through JNDI, open a topic connection and
* session, create the publisher and a text message, then start the
* connection.
*/
TopicConnectionFactory tFactory = null;
// Obtain the TopicConnectionFactory name from Settings
String tcfName =
Settings.getSettingValue(TOPIC_CONNECTION_FACTORY_NAME);
// Obtain the TopicConnectionFactory from JNDI
Object tcf = JndiUtil.getObjectFromJndi(Replicator._context,
tcfName);
// Anything that is not a TopicConnectionFactory is reported as a
// naming failure carrying the name that was looked up.
if (tcf instanceof TopicConnectionFactory)
tFactory = (TopicConnectionFactory) tcf;
else
throw new NamingException("JNDI Can't resolve: " + tcfName);
// Create a topic connection
_tConn = tFactory.createTopicConnection();
__log.info("Created topic connection");
// Create a Event session from the connection.
// There should be one session per thread.
// The session is non-transacted (false) and auto-acknowledging.
_tSessionEvent = _tConn.createTopicSession(false,
Session.AUTO_ACKNOWLEDGE);
// Obtain the EventTopic name from settings
String evName = Settings.getSettingValue(EVENT_TOPIC_SETTING_NAME);
// Create Event Topic and Publisher
Topic topicEvent = _tSessionEvent.createTopic(evName);
_tPubEvent = _tSessionEvent.createPublisher(topicEvent);
// Events are published non-persistently, with a time-to-live read
// from Settings and parsed as an integer.
_tPubEvent.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
_tPubEvent.setTimeToLive(
Integer.parseInt(Settings.getSettingValue(EVENT_TIME_TO_LIVE)));
// Create the text message object kept in _pubMessage
_pubMessage = _tSessionEvent.createTextMessage();
__log.info("Created Event Publisher");
// Don't forget to start the connection
_tConn.start();
__log.info("Event Topic initialized");
}
#########################################################################
#################### SUBSCRIBER - C #####################################
#########################################################################
########### (((this has been condensed for clarity...)))
#########################
//
// ################ Open the broker control queue
##########################
//
// Open the broker control queue for output. The object handle returned
// by MQOPEN is stored in m_Hobj.
MQLONG OpenCode;
MQLONG Reason;
MQLONG O_options = MQOO_OUTPUT // open queue for output
+ MQOO_FAIL_IF_QUIESCING; // but not if MQM stopping
MQOD od = {MQOD_DEFAULT};
// Copy at most MQ_Q_NAME_LENGTH characters of the queue name into the
// object descriptor.
strncpy(od.ObjectName, "SYSTEM.BROKER.CONTROL.QUEUE",
(size_t)MQ_Q_NAME_LENGTH);
MQOPEN(m_Hcon, // connection handle
&od, // object descriptor for queue
O_options, // open options
&m_Hobj, // object handle
&OpenCode, // completion code
&Reason); // reason code
// Any reason code other than MQRC_NONE is logged and reported as failure.
// NOTE(review): the message names m_qName, while the queue opened above
// is the literal copied into od.ObjectName - confirm they are the same.
if (Reason != MQRC_NONE)
{
char buff[256];
sprintf(buff,
"open: MQOPEN(%s) ended with reason code %ld",
m_qName.c_str(),
Reason);
CBERR("open", buff);
return false;
}
// NOTE(review): this branch only logs; unlike the reason-code branch
// above it does not return false.
if (OpenCode == MQCC_FAILED)
{
CBERR("open", "unable to open queue for output");
}
//
// ################ Open the named message queue for input
################################
//
// Open the subscriber's own queue for exclusive input. The object handle
// returned by MQOPEN is stored in m_Hobj.
MQLONG OpenCode;
MQLONG Reason;
MQLONG O_options = MQOO_INPUT_EXCLUSIVE // open queue for input
+ MQOO_FAIL_IF_QUIESCING; // but not if MQM stopping
MQOD od = {MQOD_DEFAULT};
// Copy at most MQ_Q_NAME_LENGTH characters of the queue name into the
// object descriptor.
strncpy(od.ObjectName, "SUBSCRIBER_QUEUE", (size_t)MQ_Q_NAME_LENGTH);
MQOPEN(m_Hcon, // connection handle
&od, // object descriptor for queue
O_options, // open options
&m_Hobj, // object handle
&OpenCode, // completion code
&Reason); // reason code
// Any reason code other than MQRC_NONE is logged and reported as failure.
// NOTE(review): the message names m_qName, while the queue opened above
// is the literal copied into od.ObjectName - confirm they are the same.
if (Reason != MQRC_NONE)
{
char buff[256];
sprintf(buff,
"MQOPEN(%s) ended with reason code %ld",
m_qName.c_str(),
Reason);
CBERR("open", buff);
return false;
}
// NOTE(review): this branch only logs; unlike the reason-code branch
// above it does not return false.
if (OpenCode == MQCC_FAILED)
{
CBERR("open", "unable to open queue for input");
}
//
// ############# Register as a Subscriber ###############
//
//
// Build a "register subscriber" command as an MQRFH header followed by a
// NameValueString, put it to the broker as a request message, and check
// the broker's response. The command buffer is heap-allocated and freed
// before leaving the block.
//
PMQBYTE pMessageBlock = NULL;
MQLONG messageLength;
PMQRFH pMQRFHeader;
PMQCHAR pNameValueString;
MQLONG nameValueStringLength;
MQMD md = { MQMD_DEFAULT };
MQPMO pmo = { MQPMO_DEFAULT };
MQRFH DefaultMQRFH = { MQRFH_DEFAULT };
MQLONG CompCode = 0;
MQLONG Reason = 0;
//
// Allocate a storage block for the command
//
messageLength = MAX_MSG_LENGTH;
pMessageBlock = (PMQBYTE) malloc (messageLength);
if ( pMessageBlock == NULL)
{
CBERR("sendBrokerCommand", "unable to alloc mem for msg block");
}
else
{
// init buffer to nulls
memset(pMessageBlock, 0, MAX_MSG_LENGTH);
// define MQRFH values
pMQRFHeader = (PMQRFH) (pMessageBlock);
memcpy( pMQRFHeader, &DefaultMQRFH,
(size_t)MQRFH_STRUC_LENGTH_FIXED);
// start the NameValueString after MQRFH
pNameValueString = ((MQCHAR *) pMQRFHeader) +
MQRFH_STRUC_LENGTH_FIXED;
// construct command
strcpy(pNameValueString, MQPS_COMMAND_B);
strcat(pNameValueString, MQPS_REGISTER_SUBSCRIBER);
strcat(pNameValueString, MQPS_STREAM_NAME_B);
strcat(pNameValueString, "SYSTEM.BROKER.DEFAULT.STREAM");
strcat(pNameValueString, MQPS_Q_NAME_B);
strcat(pNameValueString, "SUBSCRIBER_QUEUE");
strcat(pNameValueString, MQPS_TOPIC_B);
// NOTE(review): per the resolution posted later in this thread, the JMS
// publisher's topic reaches the broker without the "topic://" prefix,
// so a subscription that keeps the prefix never matches it.
strcat(pNameValueString, "topic://CrewSolver/Event");
// calc length of NameValueString (incl null)
nameValueStringLength = (MQLONG)(strlen(pNameValueString) + 1);
// calc end of structure and align to 16-byte boundary
messageLength = MQRFH_STRUC_LENGTH_FIXED +
((nameValueStringLength+15)/16)*16;
pMQRFHeader->StrucLength = messageLength;
// set md for MQRFH msg
// send as a request so we get a response to reply q
memcpy(md.Format, MQFMT_RF_HEADER, MQ_FORMAT_LENGTH);
md.MsgType = MQMT_REQUEST;
// ReplyToQ is a fixed-width field: bound the copy to the field length
// instead of trusting the length of m_qName.
strncpy( md.ReplyToQ, m_qName.c_str(), (size_t)MQ_Q_NAME_LENGTH);
pmo.Options |= MQPMO_NEW_MSG_ID;
MQPUT( m_Hcon,
m_broker.getObjectHandle(),
&md,
&pmo,
messageLength,
pMessageBlock,
&CompCode,
&Reason );
// check outcome and response
if (CompCode != MQCC_OK)
{
char buff[256];
sprintf(buff,
"MQPUT failed with CompCode %ld and Reason %ld",
CompCode, Reason);
CBERR("sendBrokerCommand", buff);
}
else
{
// md.MsgId now holds the id generated for the request
checkResponse(md.MsgId);
}
// clean up
free( pMessageBlock );
}
//
// ########### get message ############
//
// Set up get message options ...
//
MQGMO gmo = {MQGMO_DEFAULT};
if (waitIntervalInMilliseconds == -1)
{
gmo.Options = MQGMO_CONVERT; // convert if necessary
}
else
{
gmo.Options = MQGMO_WAIT // wait for new messages
+ MQGMO_CONVERT; // convert if necessary
gmo.WaitInterval = 10000;
}
//
// Set up message descriptor ...
//
MQMD md = {MQMD_DEFAULT};
// In order to read the messages in sequence, MsgId and
// CorrelID must have the default value. MQGET sets them
// to the values in for message it returns, so re-initialise
// them before every call
memcpy(md.MsgId, MQMI_NONE, sizeof(md.MsgId));
memcpy(md.CorrelId, MQCI_NONE, sizeof(md.CorrelId));
// MQGET sets Encoding and CodedCharSetId to the values in
// the message returned, so these fields should be reset to
// the default values before every call, as MQGMO_CONVERT is
// specified.
md.Encoding = MQENC_NATIVE;
md.CodedCharSetId = MQCCSI_Q_MGR;
//
// Get the next message ...
//
MQLONG messlen, CompCode, Reason;
MQGET(m_Hcon, // connection handle
m_Hobj, // object handle
&md, // message descriptor
&gmo, // get message options
m_buffSize - 1, // buffer length
m_buff, // message buffer
&messlen, // message length
&CompCode, // completion code
&Reason); // reason code
//
// Bail if no message is currently available ...
//
if (Reason == MQRC_NO_MSG_AVAILABLE)
{
char buff[256];
sprintf(buff, "No msg available on PS queue %s", m_qName.c_str());
CBMSG("getMsg", buff);
return NULL;
}
//
// Check for an error ...
//
if (Reason != MQRC_NONE)
{
char buff[256];
if (Reason == MQRC_TRUNCATED_MSG_FAILED)
{
sprintf(buff,
"MQGET failed - msg was truncated");
}
else
{
sprintf(buff,
"MQGET ended with reason code %ld",
Reason);
}
return NULL;
}
//
// A message was received and no error ...
//
replyToQMgr.assign(md.ReplyToQMgr, sizeof(md.ReplyToQMgr)-1);
replyToQName.assign(md.ReplyToQ, sizeof(md.ReplyToQ)-1);
//
// We are only expecting MQRFH messages...
//
PMQRFH pMQRFHeader = (PMQRFH) ( m_buff );
char * payload = (char*) (m_buff + pMQRFHeader->StrucLength);
outBuff = std::string(payload);
_________________ Stu Barrett (512) 617-2119
CALEB Technologies, Corp. (512) 345-1974 (fax)
9130 Jollyville Road, Suite 100 stu.barrett@calebtech.com
Austin, TX 78759 www.calebtech.com |
|
Back to top |
|
 |
kolban |
Posted: Tue Jul 10, 2001 6:35 am Post subject: |
|
|
 Grand Master
Joined: 22 May 2001 Posts: 1072 Location: Fort Worth, TX, USA
|
Wow ... lots of code ... With both the JMS Publisher and JMS Subscriber active, use the amqspsd sample program supplied with MA0C (assuming that is what you are using) and inquire on what is being published and what is being actively subscribed. Just on a hunch, have a look at the topic names that the MQSeries Pub/Sub broker is subscribing on for the applications. I am not 100% certain that the broker knows a topic as "topic://value"; it may simply be "value". |
|
Back to top |
|
 |
StuBarrett |
Posted: Tue Jul 10, 2001 9:51 am Post subject: |
|
|
Newbie
Joined: 09 Jul 2001 Posts: 2
|
Thanks for the tip. The name was the key. It seems that JMS strips off the topic:// portion (it's a feature!), and since the C++ program was using the entire string, there was never a match.
You were very helpful. This was my first use of this information source, and it worked very well. |
|
Back to top |
|
 |
|
|
 |
|
Page 1 of 1 |
|
You cannot post new topics in this forum You cannot reply to topics in this forum You cannot edit your posts in this forum You cannot delete your posts in this forum You cannot vote in polls in this forum
|
|
|
|