|
RSS Feed - WebSphere MQ Support
|
RSS Feed - Message Broker Support
|
 |
|
ImqQueueManager shared between threads |
« View previous topic :: View next topic » |
Author |
Message
|
__guardian__ |
Posted: Thu Jan 02, 2014 5:05 am Post subject: |
|
|
Newbie
Joined: 02 Jan 2014 Posts: 1
|
I am trying to use an MQ connection that is shared between threads.
I've read the doc about MQCONNX.
However the code I inherited is not using MQCONN to connect but an "imqqueuemanager" is there a way to modify the following so that I can call the send function from another thread that the one performing the connect ?
My code currently looks like this:
MQConnection.h
Code: |
#ifndef __MEMFEEDER_MQCONNECTION_H_INCLUDED__
#define __MEMFEEDER_MQCONNECTION_H_INCLUDED__
#include <imqi.hpp>
#include <string>
#include <iostream>
#include <vector>
// --------------------------------------------------------------------------
enum MQAccessType
{
MQ_read,
MQ_write,
MQ_readWrite
};
struct MQConfiguration
{
std::string qmHostname;
std::string qmPort;
std::string qmName;
std::string qName;
std::string qChannel;
};
/**
* @class MQConnection
* @brief Allow a simple connection to MQ to send execution
*/
class MQConnection
{
private:
// Queue Manager Name
std::string qmHostname_;
// Queue Manager Port
std::string qmPort_;
// Queue Manager Name
std::string qmName_;
// Queue Name
std::string qName_;
// Queue Channel
std::string qChannel_;
MQConnection(const MQConnection&);
MQConnection& operator = (const MQConnection&);
public:
// The Tiret Feeder Queue
ImqQueue queue_;
MQConnection(const MQConfiguration& config);
virtual ~MQConnection() {}
bool send(const std::string&);
bool read(ImqMessage* msg,char* buffer );
bool read( std::string& str );
bool connect(const MQAccessType accessType, std::string& error );
void disconnect();
virtual bool put(ImqMessage& msg);
};
#endif
|
MQConnection.cpp
Code: |
#include "MQConnection.h"
#include <memory>
#include <sstream>
using namespace std;
// ---------------------------------------------------------------------------
MQConnection::MQConnection(const MQConfiguration& config)
{
// Connexion on the queue manager
this->qmHostname_ = config.qmHostname;
this->qmPort_ = config.qmPort;
this->qmName_ = config.qmName;
this->qName_ = config.qName;
this->qChannel_ = config.qChannel;
string connnectionName = this->qmHostname_ + "(" + this->qmPort_ + ")";
}
// ---------------------------------------------------------------------------
bool MQConnection::connect(const MQAccessType accessType, std::string& error)
{
// Le Queue Manager
ImqQueueManager* mgr =new ImqQueueManager(this->qmName_.c_str());
// Définition pour le channel
ImqChannel* channel = new ImqChannel();
channel->setHeartBeatInterval(1);
channel->setTransportType(MQXPT_TCP);
channel->setChannelName(this->qChannel_.c_str());
channel->setConnectionName((this->qmHostname_ + "(" + this->qmPort_ + ")").c_str());
// Mise en place du channel
mgr->setChannelReference(channel);
if (!mgr->connect())
{
/* stop if it failed */
printf( "ImqQueueManager::connect ended with reason code %d\n",
(int)mgr->reasonCode( ) );
std::stringstream ss;
ss << "ImqQueueManager::connect ended with reason code " << (int)mgr->reasonCode();
error = ss.str();
mgr->setChannelReference();
delete channel;
return false;
}
// Connection à la queue
this->queue_.setConnectionReference(mgr);
this->queue_.setName(this->qName_.c_str());
if(accessType==MQ_read)
{
this->queue_.setOpenOptions(MQOO_FAIL_IF_QUIESCING +MQOO_INPUT_SHARED );
}
else if( accessType==MQ_write )
{
this->queue_.setOpenOptions(MQOO_OUTPUT + MQOO_FAIL_IF_QUIESCING );
}
else if( accessType==MQ_readWrite )
{
this->queue_.setOpenOptions(MQOO_OUTPUT + MQOO_FAIL_IF_QUIESCING +MQOO_INPUT_SHARED );
}
else
{
std::cout << "no accesstype setup " << std::endl;
}
this->queue_.open();
if (this->queue_.reasonCode())
{
std::stringstream ss;
ss << "queue::open ended with reason code " << (int)this->queue_.reasonCode();
error = ss.str();
printf( "queue::open ended with reason code %d\n",
(int)this->queue_.reasonCode() );
return false;
}
if (this->queue_.completionCode() == MQCC_FAILED)
{
return false;
}
return true;
}
void MQConnection::disconnect()
{
delete this->queue_.connectionReference()->channelReference();
delete this->queue_.connectionReference();
}
// ---------------------------------------------------------------------------
bool MQConnection::send(const std::string& message)
{
ImqMessage msg;
msg.useEmptyBuffer(message.c_str(), message.size());
msg.setFormat(MQFMT_STRING);
msg.setMessageLength(message.size());
if (!put(msg))
{
if (this->queue_.reasonCode())
{
// std::stringstream ss;
// ss << "queue::send ended with reason code " << (int)this->queue_.reasonCode();
// error = ss.str();
printf( "queue::send ended with reason code %d\n",
(int)this->queue_.reasonCode() );
return false;
}
return false;
}
return true;
}
// ---------------------------------------------------------------------------
bool MQConnection::put(ImqMessage& msg)
{
return this->queue_.put(msg);
}
bool MQConnection::read(ImqMessage* msg, char* buffer )
{
memset(buffer,0,65537);
msg->useEmptyBuffer(buffer,65537 );
//msg->setFormat(MQFMT_RF_HEADER_2);
msg->setFormat(MQFMT_STRING);
bool ok = this->queue_.get(*msg);
if(!ok)
{
std::cout << this->queue_.reasonCode() << std::endl;
}
else
{
buffer[ msg->dataLength( ) ] = 0 ; // add terminator
}
return ok;
}
bool MQConnection::read( std::string& str )
{
ImqMessage msg;
char buffer[65536];
buffer[0]='\0';
msg.useEmptyBuffer(buffer,sizeof( buffer ) - 1 );
msg.setFormat(MQFMT_STRING);
bool ok = this->queue_.get(msg);
if(!ok)
{
std::cout << this->queue_.reasonCode() << std::endl;
}
else
{
buffer[ msg.dataLength( ) ] = 0 ; // add terminator
str = buffer;
}
return ok;
}
|
|
|
Back to top |
|
 |
PaulClarke |
Posted: Thu Jan 02, 2014 7:07 am Post subject: |
|
|
 Grand Master
Joined: 17 Nov 2005 Posts: 1002 Location: New Zealand
|
Frankly I have no idea but I would have thought your best bet was to issue something like
mgr->setConnectOptions( MQCNO_HANDLE_SHARE_BLOCK );
before you issue the connect().
The bottom line is that you need a shared connection handle if you are going to use an HCONN on multiple threads. Whether C++ Supports this or exactly how you get it to work I don't know but in C you would use the option above on an MQCONNX verb.
Hope this helps,
Paul. _________________ Paul Clarke
MQGem Software
www.mqgem.com |
|
Back to top |
|
 |
|
|
 |
|
Page 1 of 1 |
|
You cannot post new topics in this forum You cannot reply to topics in this forum You cannot edit your posts in this forum You cannot delete your posts in this forum You cannot vote in polls in this forum
|
|
|
|