ASG
IBM
Zystems
Cressida
Icon
Netflexity
 
  MQSeries.net
Search  Search       Tech Exchange      Education      Certifications      Library      Info Center      SupportPacs      LinkedIn  Search  Search                                                                   FAQ  FAQ   Usergroups  Usergroups
 
Register  ::  Log in Log in to check your private messages
 
RSS Feed - WebSphere MQ Support RSS Feed - Message Broker Support

MQSeries.net Forum Index » General IBM MQ Support » Multiple MQGET on a single QUEUE

Post new topic  Reply to topic
 Multiple MQGET on a single QUEUE « View previous topic :: View next topic » 
Author Message
mandy13
PostPosted: Fri Sep 20, 2013 5:08 am    Post subject: Multiple MQGET on a single QUEUE Reply with quote

Apprentice

Joined: 17 Apr 2013
Posts: 28

Hi,

We need to implement multiple MQGET's for a single queue. As queue contains lots of messages and we need to store these messages in DB (messages to remain on MQ till DB write is successful)
For this we thought of using MQCB function, since we have implemented MQCB before, after reading the following from the IBM's website.

http://pic.dhe.ibm.com/infocenter/wmqv7/v7r5/index.jsp?topic=%2Fcom.ibm.mq.ref.dev.doc%2Fq101720_.htm


Quote:
Only one callback function can be registered against each object handle. If a single queue is to be read with multiple selection criteria then the queue must be opened multiple times and a consumer function registered on each handle.


However when we tried doing a sample code the desired result was not obtained. Each message consumer function wait for other message consumer function to complete. Instead it should start its process as soon as there is message on MQ (when other message consumer function goes to sleep state).

Below is the sample code i have written.
Code:
 #include <stdio.h>
 #include <stdlib.h>
 #include <string.h>
 #include <ctype.h>
                                                /* includes for MQI  */
 #include <cmqc.h>
                                                /* Constants         */
 #define MAX_QUEUES  10
                                                /* Statics           */
/********************************************************************/
 /* FUNCTION: MessageConsumer                                        */
 /* PURPOSE : Callback function called when messages arrive          */
 /********************************************************************/
 void MessageConsumer(MQHCONN   hConn,
                      MQMD    * pMsgDesc,
                      MQGMO   * pGetMsgOpts,
                      MQBYTE  * Buffer,
                      MQCBC   * pContext)
 {
   MQLONG i,max;
   MQLONG Length;
   printf("From 0\n");

   switch(pContext->CallType)
   {
     case MQCBCT_MSG_REMOVED:
     case MQCBCT_MSG_NOT_REMOVED:
          max = pGetMsgOpts -> ReturnedLength;
          if (max > 200) max = 200;
          for (i=0; i<max; i++)
          {
            if (isprint(Buffer[i])) fputc(Buffer[i],stdout);
                               else fputc('.',stdout);
          }
          fputc('\n',stdout);

          break;
   }

   sleep(10);
 }

 void MessageConsumer1(MQHCONN   hConn,
                      MQMD    * pMsgDesc,
                      MQGMO   * pGetMsgOpts,
                      MQBYTE  * Buffer,
                      MQCBC   * pContext)
 {
   MQLONG i,max;
   MQLONG Length;
   printf("From 1\n");

   switch(pContext->CallType)
   {
     case MQCBCT_MSG_REMOVED:
     case MQCBCT_MSG_NOT_REMOVED:
          max = pGetMsgOpts -> ReturnedLength;
          if (max > 200) max = 200;
          for (i=0; i<max; i++)
          {
            if (isprint(Buffer[i])) fputc(Buffer[i],stdout);
                               else fputc('.',stdout);
          }
          fputc('\n',stdout);

          sleep(10);
          break;
   }
 }

 void MessageConsumer2(MQHCONN   hConn,
                      MQMD    * pMsgDesc,
                      MQGMO   * pGetMsgOpts,
                      MQBYTE  * Buffer,
                      MQCBC   * pContext)
 {
   MQLONG i,max;
   MQLONG Length;
   printf("From 2\n");

   switch(pContext->CallType)
   {
     case MQCBCT_MSG_REMOVED:
     case MQCBCT_MSG_NOT_REMOVED:
          max = pGetMsgOpts -> ReturnedLength;
          if (max > 200) max = 200;
          for (i=0; i<max; i++)
          {
            if (isprint(Buffer[i])) fputc(Buffer[i],stdout);
                               else fputc('.',stdout);
          }
          fputc('\n',stdout);

          sleep(10);
          break;
   }
 }


 /********************************************************************/
 /* FUNCTION: main                                                   */
 /* PURPOSE : Main program entry point                               */
 /********************************************************************/
int main(int argc, char **argv)
{

    /*   Declare MQI structures needed                                */
    MQCNO   cno = {MQCNO_DEFAULT};   /* Connect Options               */
    MQOD     od = {MQOD_DEFAULT};    /* Object Descriptor             */
    MQOD    od1 = {MQOD_DEFAULT};    /* Object Descriptor             */
    MQOD    od2 = {MQOD_DEFAULT};    /* Object Descriptor             */
    MQMD     md = {MQMD_DEFAULT};    /* Message Descriptor            */
    MQMD    md1 = {MQMD_DEFAULT};    /* Message Descriptor            */
    MQMD    md2 = {MQMD_DEFAULT};    /* Message Descriptor            */
    MQGMO   gmo = {MQGMO_DEFAULT};   /* get message options           */
    MQCBD   cbd = {MQCBD_DEFAULT};   /* Callback Descriptor           */
    MQCBD  cbd1 = {MQCBD_DEFAULT};   /* Callback Descriptor           */
    MQCBD  cbd2 = {MQCBD_DEFAULT};   /* Callback Descriptor           */
    MQCTLO ctlo = {MQCTLO_DEFAULT};  /* Control Options               */
    /** note, sample uses defaults where it can **/

    MQHCONN  Hcon = MQHC_UNUSABLE_HCONN;       /* connection handle   */
    MQHOBJ   Hobj = MQHO_UNUSABLE_HOBJ;        /* object handle       */
    MQHOBJ   Hobj1 = MQHO_UNUSABLE_HOBJ;        /* object handle       */
    MQHOBJ   Hobj2 = MQHO_UNUSABLE_HOBJ;        /* object handle       */
    MQLONG   O_options;              /* MQOPEN options                */
    MQLONG   CompCode;               /* completion code               */
    MQLONG   OpenCode;               /* MQOPEN completion code        */
    MQLONG   Reason = 999;           /* reason code                   */
    MQLONG   CReason;                /* reason code for MQCONN        */
    char     QMName[50] = "";        /* queue manager name            */
    MQCHAR48 Queues[MAX_QUEUES];     /* queue array                   */
    int      Qindex = 0;
    char    *pFlag,* pParm;
    char    *qname="Q1";
    int setEnv = putenv("MQSERVER=MSHINDE.DEF.SVRCONN/TCP/clsdev1(30006)");


    O_options = MQOO_INPUT_AS_Q_DEF      /* open queue for input      */
        | MQOO_FAIL_IF_QUIESCING;  /* but not if MQM stopping   */
        //| MQOO_INPUT_SHARED;

    strcpy(QMName,"mshinde.MQM");
    /****************************************************************/
    /*                                                              */
    /*   Open the queue  Manager                                    */
    /*                                                              */
    /****************************************************************/

    MQCONN(QMName, &Hcon, &CompCode, &CReason);


    strcpy(od.ObjectName, "Q1");
    strcpy(od1.ObjectName, "Q1");
    strcpy(od2.ObjectName, "Q1");
    /****************************************************************/
    /*                                                              */
    /*   Open the queue                                             */
    /*                                                              */
    /****************************************************************/
    MQOPEN(Hcon, &od, O_options, &Hobj, &OpenCode, &Reason);

    if (OpenCode == MQCC_FAILED)
    {
        printf("MQOPEN of '%.48s' ended with reason code %d for connection handle 0\n",
                qname,Reason);
        goto MOD_EXIT;
    }

    MQOPEN(Hcon, &od1, O_options, &Hobj1, &OpenCode, &Reason);

    if (OpenCode == MQCC_FAILED)
    {
        printf("MQOPEN of '%.48s' ended with reason code %d for connection handle 1\n",
                qname,Reason);
        goto MOD_EXIT;
    }

    MQOPEN(Hcon, &od2, O_options, &Hobj2, &OpenCode, &Reason);

    if (OpenCode == MQCC_FAILED)
    {
        printf("MQOPEN of '%.48s' ended with reason code %d for connection handle 2\n",
                qname,Reason);
        goto MOD_EXIT;
    }
    /****************************************************************/
    /*                                                              */
    /*   Register a consumer                                        */
    /*                                                              */
    /****************************************************************/
    cbd.CallbackFunction = MessageConsumer;
    gmo.Options = MQGMO_NO_SYNCPOINT;

    MQCB(Hcon, MQOP_REGISTER, &cbd, Hobj, &md, &gmo, &CompCode, &Reason);
    if (CompCode == MQCC_FAILED)
    {
        printf("MQCB ended with reason code %d for connection handle 0\n", Reason);
        goto MOD_EXIT;
    }

    cbd1.CallbackFunction = MessageConsumer1;

    MQCB(Hcon, MQOP_REGISTER, &cbd1, Hobj1, &md1, &gmo, &CompCode, &Reason);
    if (CompCode == MQCC_FAILED)
    {
        printf("MQCB ended with reason code %d for connection handle 1\n", Reason);
        goto MOD_EXIT;
    }

    cbd2.CallbackFunction = MessageConsumer2;

    MQCB(Hcon, MQOP_REGISTER, &cbd2, Hobj2, &md2, &gmo, &CompCode, &Reason);
    if (CompCode == MQCC_FAILED)
    {
        printf("MQCB ended with reason code %d for connection handle 2\n", Reason);
        goto MOD_EXIT;
    }
    /******************************************************************/
    /*                                                                */
    /*  Start consumption of messages                                 */
    /*                                                                */
    /******************************************************************/
    MQCTL(Hcon, MQOP_START, &ctlo, &CompCode, &Reason);
    if (CompCode == MQCC_FAILED)
    {
        printf("MQCTL ended with reason code %d\n", Reason);
        goto MOD_EXIT;
    }
    /******************************************************************/
    /*                                                                */
    /*  Wait for the user to press enter                              */
    /*                                                                */
    /******************************************************************/
    {
        char Buffer[10];
        printf("Press enter to end\n");
        fgets(Buffer,sizeof(Buffer),stdin);
    }
    /******************************************************************/
    /*                                                                */
    /*  Stop consumption of messages                                  */
    /*                                                                */
    /******************************************************************/
    MQCTL(Hcon, MQOP_STOP, &ctlo, &CompCode, &Reason);
    if (CompCode == MQCC_FAILED)
    {
        printf("MQCTL ended with reason code %d\n", Reason);
        goto MOD_EXIT;
    }

MOD_EXIT:
    /******************************************************************/
    /*                                                                */
    /*   Disconnect from MQM if not already connected                 */
    /*                                                                */
    /******************************************************************/
    if (Hcon != MQHC_UNUSABLE_HCONN)
    {
        if (CReason != MQRC_ALREADY_CONNECTED )
        {
            MQDISC(&Hcon, &CompCode, &Reason);

            /* report reason, if any     */
            if (Reason != MQRC_NONE)
            {
                printf("MQDISC ended with reason code %d\n", Reason);
            }
        }
    }
    return((int)Reason);
}


PS. Is there any other way of having multiple MQGET for single Queue other than MQCB?

Thanks,
Mandar
Back to top
View user's profile Send private message
Vitor
PostPosted: Fri Sep 20, 2013 5:15 am    Post subject: Re: Multiple MQGET on a single QUEUE Reply with quote

Grand High Poobah

Joined: 11 Nov 2005
Posts: 26093
Location: Texas, USA

mandy13 wrote:
PS. Is there any other way of having multiple MQGET for single Queue other than MQCB?


Yes - have multiple applications issue an MQGet. Really. If you start 100 applications, and each issue an MQOPEN and an MQGET (with a wait interval), then each will get a message all other things being equal.

There are any number of discussions in this forum about this as it's a very common pattern. Possible enhacements include but are not limited to using WMQ triggering to start the process when the first message arrives, using triggering to scale the number of applications processing the queue based on message traffic, etc, etc, etc.
_________________
Honesty is the best policy.
Insanity is the best defence.
Back to top
View user's profile Send private message
hughson
PostPosted: Fri Sep 27, 2013 7:08 am    Post subject: hConn Reply with quote

Padawan

Joined: 09 May 2013
Posts: 1959
Location: Bay of Plenty, New Zealand

The important thing to note here is that you have only got one single hConn. The hConn that was returned to you on your MQCONN is donated for use by the call back functions when you issue MQCTL(MQOP_START). This is why you can't make any other MQ API call successfully with that hConn until you have issued MQCTL(MQOP_STOP).

Your hConn is passed into the Call back function as the first parameter so that the call back function can also make MQ calls. Thus it is not available to be used by another call back until you return from the call back function to indicate you are finished with it.

If you want to be accessing MQ in different threads at the same time, each thread is going to need an hConn. You can do this with MQGET or with MQCB, up to you, it's the hConns and threads that are important at this point.

Cheers
Morag
_________________
Morag Hughson @MoragHughson
IBM MQ Technical Education Specialist
Get your IBM MQ training here!
MQGem Software
Back to top
View user's profile Send private message Visit poster's website
Display posts from previous:   
Post new topic  Reply to topic Page 1 of 1

MQSeries.net Forum Index » General IBM MQ Support » Multiple MQGET on a single QUEUE
Jump to:  



You cannot post new topics in this forum
You cannot reply to topics in this forum
You cannot edit your posts in this forum
You cannot delete your posts in this forum
You cannot vote in polls in this forum
Protected by Anti-Spam ACP
 
 


Theme by Dustin Baccetti
Powered by phpBB © 2001, 2002 phpBB Group

Copyright © MQSeries.net. All rights reserved.