Multiple MQGET on a single QUEUE |
Author |
Message
|
mandy13 |
Posted: Fri Sep 20, 2013 5:08 am Post subject: Multiple MQGET on a single QUEUE |
|
|
Apprentice
Joined: 17 Apr 2013 Posts: 28
|
Hi,
We need to implement multiple MQGETs against a single queue. The queue contains a lot of messages and we need to store them in a DB (each message must remain on MQ until its DB write is successful).
For this we thought of using the MQCB function, since we have implemented MQCB before, after reading the following on IBM's website:
http://pic.dhe.ibm.com/infocenter/wmqv7/v7r5/index.jsp?topic=%2Fcom.ibm.mq.ref.dev.doc%2Fq101720_.htm
Quote: |
Only one callback function can be registered against each object handle. If a single queue is to be read with multiple selection criteria then the queue must be opened multiple times and a consumer function registered on each handle. |
However, when we tried this in sample code we did not get the desired result. Each message consumer function waits for the other message consumer functions to complete. Instead, each one should start processing as soon as there is a message on the queue (while another message consumer function is sleeping).
Below is the sample code I have written.
Code: |
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <ctype.h>
#include <unistd.h> /* for sleep() */
/* includes for MQI */
#include <cmqc.h>
/* Constants */
#define MAX_QUEUES 10
/* Statics */
/********************************************************************/
/* FUNCTION: MessageConsumer */
/* PURPOSE : Callback function called when messages arrive */
/********************************************************************/
void MessageConsumer(MQHCONN hConn,
                     MQMD * pMsgDesc,
                     MQGMO * pGetMsgOpts,
                     MQBYTE * Buffer,
                     MQCBC * pContext)
{
    MQLONG i,max;
    MQLONG Length;
    printf("From 0\n");
    switch(pContext->CallType)
    {
        case MQCBCT_MSG_REMOVED:
        case MQCBCT_MSG_NOT_REMOVED:
            max = pGetMsgOpts -> ReturnedLength;
            if (max > 200) max = 200;
            for (i=0; i<max; i++)
            {
                if (isprint(Buffer[i])) fputc(Buffer[i],stdout);
                else fputc('.',stdout);
            }
            fputc('\n',stdout);
            break;
    }
    sleep(10);
}
void MessageConsumer1(MQHCONN hConn,
                      MQMD * pMsgDesc,
                      MQGMO * pGetMsgOpts,
                      MQBYTE * Buffer,
                      MQCBC * pContext)
{
    MQLONG i,max;
    MQLONG Length;
    printf("From 1\n");
    switch(pContext->CallType)
    {
        case MQCBCT_MSG_REMOVED:
        case MQCBCT_MSG_NOT_REMOVED:
            max = pGetMsgOpts -> ReturnedLength;
            if (max > 200) max = 200;
            for (i=0; i<max; i++)
            {
                if (isprint(Buffer[i])) fputc(Buffer[i],stdout);
                else fputc('.',stdout);
            }
            fputc('\n',stdout);
            sleep(10);
            break;
    }
}
void MessageConsumer2(MQHCONN hConn,
                      MQMD * pMsgDesc,
                      MQGMO * pGetMsgOpts,
                      MQBYTE * Buffer,
                      MQCBC * pContext)
{
    MQLONG i,max;
    MQLONG Length;
    printf("From 2\n");
    switch(pContext->CallType)
    {
        case MQCBCT_MSG_REMOVED:
        case MQCBCT_MSG_NOT_REMOVED:
            max = pGetMsgOpts -> ReturnedLength;
            if (max > 200) max = 200;
            for (i=0; i<max; i++)
            {
                if (isprint(Buffer[i])) fputc(Buffer[i],stdout);
                else fputc('.',stdout);
            }
            fputc('\n',stdout);
            sleep(10);
            break;
    }
}
/********************************************************************/
/* FUNCTION: main */
/* PURPOSE : Main program entry point */
/********************************************************************/
int main(int argc, char **argv)
{
    /* Declare MQI structures needed */
    MQCNO cno = {MQCNO_DEFAULT}; /* Connect Options */
    MQOD od = {MQOD_DEFAULT}; /* Object Descriptor */
    MQOD od1 = {MQOD_DEFAULT}; /* Object Descriptor */
    MQOD od2 = {MQOD_DEFAULT}; /* Object Descriptor */
    MQMD md = {MQMD_DEFAULT}; /* Message Descriptor */
    MQMD md1 = {MQMD_DEFAULT}; /* Message Descriptor */
    MQMD md2 = {MQMD_DEFAULT}; /* Message Descriptor */
    MQGMO gmo = {MQGMO_DEFAULT}; /* get message options */
    MQCBD cbd = {MQCBD_DEFAULT}; /* Callback Descriptor */
    MQCBD cbd1 = {MQCBD_DEFAULT}; /* Callback Descriptor */
    MQCBD cbd2 = {MQCBD_DEFAULT}; /* Callback Descriptor */
    MQCTLO ctlo = {MQCTLO_DEFAULT}; /* Control Options */
    /** note, sample uses defaults where it can **/
    MQHCONN Hcon = MQHC_UNUSABLE_HCONN; /* connection handle */
    MQHOBJ Hobj = MQHO_UNUSABLE_HOBJ; /* object handle */
    MQHOBJ Hobj1 = MQHO_UNUSABLE_HOBJ; /* object handle */
    MQHOBJ Hobj2 = MQHO_UNUSABLE_HOBJ; /* object handle */
    MQLONG O_options; /* MQOPEN options */
    MQLONG CompCode; /* completion code */
    MQLONG OpenCode; /* MQOPEN completion code */
    MQLONG Reason = 999; /* reason code */
    MQLONG CReason; /* reason code for MQCONN */
    char QMName[50] = ""; /* queue manager name */
    MQCHAR48 Queues[MAX_QUEUES]; /* queue array */
    int Qindex = 0;
    char *pFlag,* pParm;
    char *qname="Q1";
    int setEnv = putenv("MQSERVER=MSHINDE.DEF.SVRCONN/TCP/clsdev1(30006)");
    O_options = MQOO_INPUT_AS_Q_DEF /* open queue for input */
              | MQOO_FAIL_IF_QUIESCING; /* but not if MQM stopping */
    //| MQOO_INPUT_SHARED;
    strcpy(QMName,"mshinde.MQM");
    /****************************************************************/
    /* */
    /* Connect to the queue manager */
    /* */
    /****************************************************************/
    MQCONN(QMName, &Hcon, &CompCode, &CReason);
    if (CompCode == MQCC_FAILED)
    {
        /* nothing else can work without a connection */
        printf("MQCONN ended with reason code %d\n", CReason);
        Reason = CReason;
        goto MOD_EXIT;
    }
    strcpy(od.ObjectName, "Q1");
    strcpy(od1.ObjectName, "Q1");
    strcpy(od2.ObjectName, "Q1");
    /****************************************************************/
    /* */
    /* Open the queue (three object handles, one connection handle) */
    /* */
    /****************************************************************/
    MQOPEN(Hcon, &od, O_options, &Hobj, &OpenCode, &Reason);
    if (OpenCode == MQCC_FAILED)
    {
        printf("MQOPEN of '%.48s' ended with reason code %d for object handle 0\n",
               qname,Reason);
        goto MOD_EXIT;
    }
    MQOPEN(Hcon, &od1, O_options, &Hobj1, &OpenCode, &Reason);
    if (OpenCode == MQCC_FAILED)
    {
        printf("MQOPEN of '%.48s' ended with reason code %d for object handle 1\n",
               qname,Reason);
        goto MOD_EXIT;
    }
    MQOPEN(Hcon, &od2, O_options, &Hobj2, &OpenCode, &Reason);
    if (OpenCode == MQCC_FAILED)
    {
        printf("MQOPEN of '%.48s' ended with reason code %d for object handle 2\n",
               qname,Reason);
        goto MOD_EXIT;
    }
    /****************************************************************/
    /* */
    /* Register a consumer on each object handle */
    /* */
    /****************************************************************/
    cbd.CallbackFunction = MessageConsumer;
    gmo.Options = MQGMO_NO_SYNCPOINT;
    MQCB(Hcon, MQOP_REGISTER, &cbd, Hobj, &md, &gmo, &CompCode, &Reason);
    if (CompCode == MQCC_FAILED)
    {
        printf("MQCB ended with reason code %d for object handle 0\n", Reason);
        goto MOD_EXIT;
    }
    cbd1.CallbackFunction = MessageConsumer1;
    MQCB(Hcon, MQOP_REGISTER, &cbd1, Hobj1, &md1, &gmo, &CompCode, &Reason);
    if (CompCode == MQCC_FAILED)
    {
        printf("MQCB ended with reason code %d for object handle 1\n", Reason);
        goto MOD_EXIT;
    }
    cbd2.CallbackFunction = MessageConsumer2;
    MQCB(Hcon, MQOP_REGISTER, &cbd2, Hobj2, &md2, &gmo, &CompCode, &Reason);
    if (CompCode == MQCC_FAILED)
    {
        printf("MQCB ended with reason code %d for object handle 2\n", Reason);
        goto MOD_EXIT;
    }
    /******************************************************************/
    /* */
    /* Start consumption of messages */
    /* */
    /******************************************************************/
    MQCTL(Hcon, MQOP_START, &ctlo, &CompCode, &Reason);
    if (CompCode == MQCC_FAILED)
    {
        printf("MQCTL ended with reason code %d\n", Reason);
        goto MOD_EXIT;
    }
    /******************************************************************/
    /* */
    /* Wait for the user to press enter */
    /* */
    /******************************************************************/
    {
        char Buffer[10];
        printf("Press enter to end\n");
        fgets(Buffer,sizeof(Buffer),stdin);
    }
    /******************************************************************/
    /* */
    /* Stop consumption of messages */
    /* */
    /******************************************************************/
    MQCTL(Hcon, MQOP_STOP, &ctlo, &CompCode, &Reason);
    if (CompCode == MQCC_FAILED)
    {
        printf("MQCTL ended with reason code %d\n", Reason);
        goto MOD_EXIT;
    }
MOD_EXIT:
    /******************************************************************/
    /* */
    /* Disconnect from MQM if not already connected */
    /* */
    /******************************************************************/
    if (Hcon != MQHC_UNUSABLE_HCONN)
    {
        if (CReason != MQRC_ALREADY_CONNECTED )
        {
            MQDISC(&Hcon, &CompCode, &Reason);
            /* report reason, if any */
            if (Reason != MQRC_NONE)
            {
                printf("MQDISC ended with reason code %d\n", Reason);
            }
        }
    }
    return((int)Reason);
}
|
PS. Is there any other way of having multiple MQGETs on a single queue, other than MQCB?
Thanks,
Mandar |
Vitor |
Posted: Fri Sep 20, 2013 5:15 am Post subject: Re: Multiple MQGET on a single QUEUE |
|
|
 Grand High Poobah
Joined: 11 Nov 2005 Posts: 26093 Location: Texas, USA
|
mandy13 wrote: |
PS. Is there any other way of having multiple MQGET for single Queue other than MQCB? |
Yes - have multiple applications issue an MQGET. Really. If you start 100 applications, and each issues an MQOPEN and an MQGET (with a wait interval), then each will get a message, all other things being equal.
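A minimal sketch of one such getter application, of which several copies could be started against the same queue. It assumes the queue Q1 from the original post, the default queue manager, a 15-second wait interval and a 4096-byte buffer; store_in_db() is a hypothetical stand-in for the real database write. Getting under syncpoint is one way to meet the requirement that a message stays on the queue until its DB write has succeeded, because the get only becomes permanent at MQCMIT.

```c
#include <stdio.h>
#include <string.h>
#include <cmqc.h>

/* Hypothetical stand-in for the real database write; returns 0 on success. */
static int store_in_db(const char *data, MQLONG len)
{
    (void)data;
    printf("storing %d bytes\n", (int)len);
    return 0;
}

int main(void)
{
    MQOD    od    = {MQOD_DEFAULT};
    MQMD    md    = {MQMD_DEFAULT};
    MQGMO   gmo   = {MQGMO_DEFAULT};
    MQHCONN hConn = MQHC_UNUSABLE_HCONN;
    MQHOBJ  hObj  = MQHO_UNUSABLE_HOBJ;
    MQLONG  cc, rc, dataLen;
    char    qmName[MQ_Q_MGR_NAME_LENGTH + 1] = ""; /* blank: default queue manager (assumption) */
    char    buffer[4096];                          /* assumed maximum message size */

    MQCONN(qmName, &hConn, &cc, &rc);
    if (cc == MQCC_FAILED) {
        printf("MQCONN ended with reason code %d\n", (int)rc);
        return 1;
    }

    /* Shared input, so that other copies of this program can open Q1 too. */
    strncpy(od.ObjectName, "Q1", MQ_Q_NAME_LENGTH);
    MQOPEN(hConn, &od, MQOO_INPUT_SHARED | MQOO_FAIL_IF_QUIESCING,
           &hObj, &cc, &rc);
    if (cc == MQCC_FAILED) {
        printf("MQOPEN ended with reason code %d\n", (int)rc);
        MQDISC(&hConn, &cc, &rc);
        return 1;
    }

    gmo.Options      = MQGMO_WAIT | MQGMO_SYNCPOINT | MQGMO_FAIL_IF_QUIESCING;
    gmo.WaitInterval = 15000; /* milliseconds */

    for (;;) {
        /* Reset the selectors so that the next available message is returned. */
        memcpy(md.MsgId, MQMI_NONE, sizeof(md.MsgId));
        memcpy(md.CorrelId, MQCI_NONE, sizeof(md.CorrelId));

        MQGET(hConn, hObj, &md, &gmo, sizeof(buffer), buffer,
              &dataLen, &cc, &rc);
        if (cc != MQCC_OK) {
            if (rc != MQRC_NO_MSG_AVAILABLE)
                printf("MQGET ended with reason code %d\n", (int)rc);
            break;
        }

        if (store_in_db(buffer, dataLen) == 0) {
            MQCMIT(hConn, &cc, &rc); /* DB write worked: remove the message */
        } else {
            MQBACK(hConn, &cc, &rc); /* DB write failed: message goes back */
            break;                   /* stop rather than re-get it in a loop */
        }
    }

    MQCLOSE(hConn, &hObj, MQCO_NONE, &cc, &rc);
    MQDISC(&hConn, &cc, &rc);
    return 0;
}
```

Each running copy has its own connection, so a slow database write in one copy does not hold up the others. A production version would probably also look at md.BackoutCount before reprocessing a message that has already been backed out.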
There are any number of discussions in this forum about this, as it's a very common pattern. Possible enhancements include, but are not limited to, using WMQ triggering to start the process when the first message arrives, using triggering to scale the number of applications processing the queue based on message traffic, etc. _________________ Honesty is the best policy.
Insanity is the best defence. |
hughson |
Posted: Fri Sep 27, 2013 7:08 am Post subject: hConn |
|
|
 Padawan
Joined: 09 May 2013 Posts: 1959 Location: Bay of Plenty, New Zealand
|
The important thing to note here is that you have only got one single hConn. The hConn that was returned to you on your MQCONN is donated for use by the call back functions when you issue MQCTL(MQOP_START). This is why you can't make any other MQ API call successfully with that hConn until you have issued MQCTL(MQOP_STOP).
Your hConn is passed into the Call back function as the first parameter so that the call back function can also make MQ calls. Thus it is not available to be used by another call back until you return from the call back function to indicate you are finished with it.
If you want to access MQ in different threads at the same time, each thread is going to need its own hConn. You can do this with MQGET or with MQCB; that's up to you. It's the hConns and threads that are important at this point.
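The effect can be pictured with a small model that makes no MQ calls at all. This is only a sketch of the behaviour described above, not of how MQ is implemented: ModelConn, consumer() and peak_concurrency() are hypothetical names, a mutex stands in for "the hConn is in use by a callback", and sleep(1) stands in for the sleep(10) in the posted consumers.

```c
#include <assert.h>
#include <pthread.h>
#include <unistd.h>

#define MAX_CONSUMERS 8

/* Toy stand-in for an hConn: only one callback may use it at a time. */
typedef struct {
    pthread_mutex_t in_use;
} ModelConn;

static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;
static int active; /* callbacks running right now */
static int peak;   /* highest value 'active' has reached */

/* One consumer: borrow the connection, do slow work, hand it back. */
static void *consumer(void *arg)
{
    ModelConn *conn = arg;

    pthread_mutex_lock(&conn->in_use);   /* connection given to this callback */

    pthread_mutex_lock(&stats_lock);
    if (++active > peak)
        peak = active;
    pthread_mutex_unlock(&stats_lock);

    sleep(1);                            /* slow work, e.g. a database write */

    pthread_mutex_lock(&stats_lock);
    --active;
    pthread_mutex_unlock(&stats_lock);

    pthread_mutex_unlock(&conn->in_use); /* callback has returned */
    return NULL;
}

/* Run n_consumers callbacks spread over n_conns connections and return
   how many of them were ever running at the same time. */
int peak_concurrency(int n_consumers, int n_conns)
{
    ModelConn conns[MAX_CONSUMERS];
    pthread_t threads[MAX_CONSUMERS];
    int i;

    assert(n_consumers >= 1 && n_consumers <= MAX_CONSUMERS);
    assert(n_conns >= 1 && n_conns <= n_consumers);

    active = peak = 0;
    for (i = 0; i < n_conns; i++)
        pthread_mutex_init(&conns[i].in_use, NULL);
    for (i = 0; i < n_consumers; i++)
        pthread_create(&threads[i], NULL, consumer, &conns[i % n_conns]);
    for (i = 0; i < n_consumers; i++)
        pthread_join(threads[i], NULL);
    for (i = 0; i < n_conns; i++)
        pthread_mutex_destroy(&conns[i].in_use);
    return peak;
}
```

Calling peak_concurrency(3, 1) models the posted program, three consumers sharing one hConn, and returns 1 because the callbacks run strictly one after another. With one connection per consumer, peak_concurrency(3, 3), the three sleeps overlap.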
Cheers
Morag _________________ Morag Hughson @MoragHughson
IBM MQ Technical Education Specialist
Get your IBM MQ training here!
MQGem Software |