ASG
IBM
Zystems
Cressida
Icon
Netflexity
 
  MQSeries.net
Search  Search       Tech Exchange      Education      Certifications      Library      Info Center      SupportPacs      LinkedIn  Search  Search                                                                   FAQ  FAQ   Usergroups  Usergroups
 
Register  ::  Log in Log in to check your private messages
 
RSS Feed - WebSphere MQ Support RSS Feed - Message Broker Support

MQSeries.net Forum Index » IBM MQ API Support » ImqQueueManager shared between threads

Post new topic  Reply to topic
 ImqQueueManager shared between threads « View previous topic :: View next topic » 
Author Message
__guardian__
PostPosted: Thu Jan 02, 2014 5:05 am    Post subject: Reply with quote

Newbie

Joined: 02 Jan 2014
Posts: 1

I am trying to use an MQ connection that is shared between threads.
I've read the doc about MQCONNX.

However, the code I inherited does not call MQCONN to connect; it uses an ImqQueueManager object instead. Is there a way to modify the following code so that I can call the send function from a thread other than the one that performed the connect?

My code currently looks like this:

MQConnection.h
Code:

#ifndef __MEMFEEDER_MQCONNECTION_H_INCLUDED__
#define __MEMFEEDER_MQCONNECTION_H_INCLUDED__

#include <imqi.hpp>

#include <string>
#include <iostream>
#include <vector>
// --------------------------------------------------------------------------

// Access mode requested when MQConnection::connect() opens the queue.
enum MQAccessType
{
    MQ_read      = 0,   // open for input only (MQOO_INPUT_SHARED)
    MQ_write     = 1,   // open for output only (MQOO_OUTPUT)
    MQ_readWrite = 2    // open for both input and output
};

// Connection parameters consumed by the MQConnection constructor.
// All values are plain strings; MQConnection copies them verbatim.
struct MQConfiguration
{
    // Host of the queue manager; combined with qmPort as "host(port)"
    // to form the channel connection name.
    std::string qmHostname;
    // Listener port, kept as text because it is only ever concatenated
    // into the "host(port)" connection name.
    std::string qmPort;
    // Name passed to the ImqQueueManager constructor.
    std::string qmName;
    // Name of the queue that MQConnection opens.
    std::string qName;
    // Name of the channel used for the connection.
    std::string qChannel;
};

/**
* @class MQConnection
* @brief Thin wrapper around a single IBM MQ queue: connects to a queue
*        manager over a TCP channel, opens one queue, and sends or reads
*        string messages on it.
*
* The destructor is empty and does not call disconnect(); callers that
* connected successfully must call disconnect() themselves.
*/
class  MQConnection
{
private:
   // Queue manager host name
   std::string qmHostname_;
   // Queue manager port (as text)
   std::string qmPort_;
   // Queue manager name
   std::string qmName_;
   // Queue name
   std::string qName_;
   // Channel name
   std::string qChannel_;

   // Declared private; no definition appears in the accompanying .cpp,
   // so instances are presumably meant to be non-copyable.
   MQConnection(const MQConnection&);
   MQConnection& operator = (const MQConnection&);

public:
   // The queue opened by connect(). It also carries the connection
   // reference to the queue manager created in connect().
   ImqQueue queue_;

   // Stores the configuration values; does not contact the queue manager.
   MQConnection(const MQConfiguration& config);
   virtual ~MQConnection() {}

   // Sends the string as one MQFMT_STRING message through put().
   // Returns false when the put fails.
   bool send(const std::string&);
   // Reads one message into the caller-supplied buffer and NUL-terminates
   // it. The implementation clears 65537 bytes of buffer, so the buffer
   // must be at least that large.
   bool read(ImqMessage* msg,char* buffer );
   // Reads one message through an internal 65536-byte buffer and assigns
   // it to str. str is left untouched when the read fails.
   bool read( std::string& str );

   // Connects to the queue manager and opens the queue with the open
   // options matching accessType. Returns false on failure; error receives
   // a description when MQ reports a reason code.
   bool connect(const MQAccessType accessType, std::string& error );
   // Releases the channel and queue manager objects reachable through queue_.
   void disconnect();
   // Puts msg on queue_. Virtual, presumably so a subclass can intercept
   // or replace the actual put.
   virtual bool put(ImqMessage& msg);
};

#endif



MQConnection.cpp
Code:
 
#include "MQConnection.h"
#include <memory>
#include <sstream>

using namespace std;

// ---------------------------------------------------------------------------
/**
 * Stores the connection parameters for later use by connect().
 *
 * No MQ call is made here; the queue manager is only contacted by connect().
 *
 * @param config host, port, queue manager, queue and channel names to copy.
 */
MQConnection::MQConnection(const MQConfiguration& config)
   : qmHostname_(config.qmHostname)
   , qmPort_(config.qmPort)
   , qmName_(config.qmName)
   , qName_(config.qName)
   , qChannel_(config.qChannel)
{
   // The "host(port)" connection name is built in connect(), where it is
   // actually needed, so nothing is precomputed here.
}

// ---------------------------------------------------------------------------
bool MQConnection::connect(const MQAccessType accessType, std::string& error)
{

   // Le Queue Manager
   ImqQueueManager* mgr =new ImqQueueManager(this->qmName_.c_str());

   // Définition pour le channel
   ImqChannel* channel = new ImqChannel();
   channel->setHeartBeatInterval(1);
   channel->setTransportType(MQXPT_TCP);
   channel->setChannelName(this->qChannel_.c_str());
   channel->setConnectionName((this->qmHostname_ + "(" + this->qmPort_ + ")").c_str());

   // Mise en place du channel
   mgr->setChannelReference(channel);
   if (!mgr->connect())
   {
       /* stop if it failed */
       printf( "ImqQueueManager::connect ended with reason code %d\n",
               (int)mgr->reasonCode( ) );

       std::stringstream ss;
       ss << "ImqQueueManager::connect ended with reason code " << (int)mgr->reasonCode();
       error = ss.str();

       mgr->setChannelReference();
       delete channel;
       return false;
   }

   // Connection à la queue
   this->queue_.setConnectionReference(mgr);
   this->queue_.setName(this->qName_.c_str());
   if(accessType==MQ_read)
   {
       this->queue_.setOpenOptions(MQOO_FAIL_IF_QUIESCING +MQOO_INPUT_SHARED );
   }
   else if( accessType==MQ_write )
   {
       this->queue_.setOpenOptions(MQOO_OUTPUT + MQOO_FAIL_IF_QUIESCING );
   }
   else if( accessType==MQ_readWrite )
   {
       this->queue_.setOpenOptions(MQOO_OUTPUT + MQOO_FAIL_IF_QUIESCING +MQOO_INPUT_SHARED );
   }
   else
   {
       std::cout << "no accesstype setup " << std::endl;
   }
   this->queue_.open();
   if (this->queue_.reasonCode())
   {
       std::stringstream ss;
       ss << "queue::open ended with reason code " << (int)this->queue_.reasonCode();
       error = ss.str();

       printf( "queue::open ended with reason code %d\n",
               (int)this->queue_.reasonCode() );
       return false;
   }
   if (this->queue_.completionCode() == MQCC_FAILED)
   {
      return false;
   }

   return true;
}

void MQConnection::disconnect()
{
    delete this->queue_.connectionReference()->channelReference();
    delete this->queue_.connectionReference();
}

// ---------------------------------------------------------------------------
bool MQConnection::send(const std::string& message)
{
   ImqMessage msg;
   msg.useEmptyBuffer(message.c_str(), message.size());
   msg.setFormat(MQFMT_STRING);

   msg.setMessageLength(message.size());

   if (!put(msg))
   {
       if (this->queue_.reasonCode())
       {
//           std::stringstream ss;
//           ss << "queue::send ended with reason code " << (int)this->queue_.reasonCode();
//           error = ss.str();

           printf( "queue::send ended with reason code %d\n",
                   (int)this->queue_.reasonCode() );
           return false;
       }
      return false;
   }
   return true;
}

// ---------------------------------------------------------------------------
/**
 * Puts msg on queue_ and reports whether the put succeeded.
 */
bool MQConnection::put(ImqMessage& msg)
{
   const bool accepted = this->queue_.put(msg);
   return accepted;
}
bool MQConnection::read(ImqMessage*  msg, char* buffer  )
{
    memset(buffer,0,65537);
    msg->useEmptyBuffer(buffer,65537 );
    //msg->setFormat(MQFMT_RF_HEADER_2);
msg->setFormat(MQFMT_STRING);
    bool ok = this->queue_.get(*msg);
    if(!ok)
    {
        std::cout << this->queue_.reasonCode() << std::endl;
    }
    else
    {
        buffer[ msg->dataLength( ) ] = 0 ;  // add terminator
    }

    return ok;
}
bool MQConnection::read( std::string& str )
{
    ImqMessage msg;
    char buffer[65536];
    buffer[0]='\0';

    msg.useEmptyBuffer(buffer,sizeof( buffer ) - 1 );
    msg.setFormat(MQFMT_STRING);

    bool ok = this->queue_.get(msg);
    if(!ok)
    {
        std::cout << this->queue_.reasonCode() << std::endl;
    }
    else
    {
        buffer[ msg.dataLength( ) ] = 0 ;  // add terminator
        str = buffer;
    }

    return ok;
}

Back to top
View user's profile Send private message
PaulClarke
PostPosted: Thu Jan 02, 2014 7:07 am    Post subject: Reply with quote

Grand Master

Joined: 17 Nov 2005
Posts: 1002
Location: New Zealand

Frankly I have no idea but I would have thought your best bet was to issue something like

mgr->setConnectOptions( MQCNO_HANDLE_SHARE_BLOCK );

before you issue the connect().

The bottom line is that you need a shared connection handle if you are going to use an HCONN on multiple threads. Whether the C++ classes support this, or exactly how you get it to work, I don't know, but in C you would use the option above on an MQCONNX verb.

Hope this helps,

Paul.
_________________
Paul Clarke
MQGem Software
www.mqgem.com
Back to top
View user's profile Send private message Visit poster's website
Display posts from previous:   
Post new topic  Reply to topic Page 1 of 1

MQSeries.net Forum Index » IBM MQ API Support » ImqQueueManager shared between threads
Jump to:  



You cannot post new topics in this forum
You cannot reply to topics in this forum
You cannot edit your posts in this forum
You cannot delete your posts in this forum
You cannot vote in polls in this forum
Protected by Anti-Spam ACP
 
 


Theme by Dustin Baccetti
Powered by phpBB © 2001, 2002 phpBB Group

Copyright © MQSeries.net. All rights reserved.