|
RSS Feed - WebSphere MQ Support
|
RSS Feed - Message Broker Support
|
 |
|
Proxy Registration(subscription) |
« View previous topic :: View next topic » |
Author |
Message
|
gsaluja |
Posted: Mon Feb 27, 2006 8:23 am Post subject: Proxy Registration(subscription) |
|
|
Novice
Joined: 06 Dec 2005 Posts: 21 Location: Houston
|
Need to have a unix process do the registration(part of the subscription) on behalf of desktop clients. Publisher is running as a unix process. Is this possible, and if yes how? Code -- in C++ for both unix and desktop client.
In this application, once teh registration is done by the unix process, the corresponding desktop client can start to get the messages from the subscription queue. When Desktop client logs off, the unix process will deRegister on behalf of this client. |
|
Back to top |
|
 |
jefflowrey |
Posted: Mon Feb 27, 2006 8:28 am Post subject: |
|
|
Grand Poobah
Joined: 16 Oct 2002 Posts: 19981
|
You can use the XMS support pack to create a subscription registration message.
Deregistration depends a bit more on which Broker you are running. (But the XMS support pack is still what you want to use)
The desktop client doesn't have to know anything about Pub/Sub itself (unless it needs to deregister). It can simply use the subscription queue like any other MQ application would.
But it makes a lot more sense for the desktop application to register the subscription itself at startup and de-register at shutdown. _________________ I am *not* the model of the modern major general. |
|
Back to top |
|
 |
gsaluja |
Posted: Mon Feb 27, 2006 8:41 am Post subject: |
|
|
Novice
Joined: 06 Dec 2005 Posts: 21 Location: Houston
|
Let me take a step back. Our application need is as follows: desktop clients have frontend which need to recieve data from the publisher running on Solaris. We are doing the implementation of publish/subscribe. Now when the DeskTop client(VisualC++ app) runs as a subscriber -- does registration, gets from the subscriberQ, after completion on exit does deRegistraiton. Depending on the way DeskTop client exits, it is able to do the deRegistration or not. When client app is restarted, it starts to have problem in gettign messages if the app was not closed cleanly. What we are thinking this problem cud be due to not able to deRegister properly. And if we move the registration/deregistration out of the deskop Client app, it may resolve this problem. Both unix process and desktop client will be using the same broker/qmanager. Wud you think chosing proxy registration/deRegistraion process eliminate our problem? or is there any better way you recommend. |
|
Back to top |
|
 |
jefflowrey |
Posted: Mon Feb 27, 2006 8:53 am Post subject: |
|
|
Grand Poobah
Joined: 16 Oct 2002 Posts: 19981
|
What broker are you running?
What options are being specified when the subscription is registered? _________________ I am *not* the model of the modern major general. |
|
Back to top |
|
 |
gsaluja |
Posted: Mon Feb 27, 2006 9:24 am Post subject: |
|
|
Novice
Joined: 06 Dec 2005 Posts: 21 Location: Houston
|
$ dspmqbrk
MQSeries Publish/Subscribe broker for queue manager NYUSCSW22 running.
$ mqver
Name: WebSphere MQ
Version: 530
CMVC level: p000-L020617
BuildType: IKAP - (Production)
-------------------------------------------------------
Code that does the connection, registration.......
/*
MQManager.cpp: Implementation of the MQManager Class.
This class handles ALL MQ Client Communication for CSW GUI. It publishses an MQ Subscriber
that listens for BCS XML Messages to be broadcast to CSW GUI.
Author: Daniel Acquah
Date: 12.15.05
Revision:
*/
#include "stdafx.h"
#include "suballocgui.h"
#include "MQManager.h"
#include "MQErrorMessages.h"
#include "XMLManager.h"
#include "GUIClient.h"
#include "AVPLog.h"
#ifdef _DEBUG
#undef THIS_FILE
static char THIS_FILE[]=__FILE__;
#define new DEBUG_NEW
#endif
// MQ Includes
#define MAX_STRING_SIZE 100 /* maximum string length */
#define MAX_TOPIC_LENGTH 256 /* maximum topic length */
#define MAX_MSG_LENGTH 5000 /* maximum message size */
#define MAX_WAIT_TIME 120000 /* period of inactivity */
#define MAX_RESPONSE_TIME 6000 /* maximum response wait */
#define STREAM "SYSTEM.BROKER.DEFAULT.STREAM"
MQRFH m_rfDefaultMQRFH = { MQRFH_DEFAULT };
MQMD m_mdDefaultMQMD = { MQMD_DEFAULT };
MQGMO m_gmoMQDequeue = { MQGMO_DEFAULT };
MQMD m_mdMQDequeue = { MQMD_DEFAULT };
MQMD m_mdSubscribeMQ = { MQMD_DEFAULT };
static const char* const m_strModelQueueName = "SUBSCRIBER_QUEUE";
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// Constructor
CMQManager::CMQManager()
{
// Initialize Member Variables.
m_bConnected = FALSE;
m_bSubscriberRegistered = FALSE;
m_objConnection = MQHC_UNUSABLE_HCONN;
m_objBroker = MQHO_UNUSABLE_HOBJ;
m_objSubscriber = MQHO_UNUSABLE_HOBJ;
m_pCompCode = MQCC_OK; // MQCC_OK; Assume OK for now. Innnocent until proven guilty
m_pReasonCode = MQRC_NO_MSG_AVAILABLE;
m_btMessageID = NULL;
m_iConnectionErrorCount = 0;
// Read MQ Manager + Queue Name Values from INI File
m_strMQManager = "";
m_strSubscriber = "";
m_strTopic = "";
// Allocate Storage for Dequing BCS XML
m_lMessageLength = MAX_MSG_LENGTH;
m_pDequeueMessageBlock = (PMQBYTE)malloc(m_lMessageLength);
m_pDequeMQRFHeader = NULL;
m_bRestartCSWGUI = FALSE;
}
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// Destructor
CMQManager::~CMQManager()
{
// Disconnect MQ
Disconnect();
// Clean Up
if(m_pDequeueMessageBlock) free(m_pDequeueMessageBlock);
}
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// Connecting To MQ
BOOLEAN CMQManager::Connect()
{
// Local Variables
MQLONG lMQOptions;
MQOD odMQ = { MQOD_DEFAULT };
// Already Connected ?
if(m_bConnected) return TRUE;
// Read MQ Manager + Queue Name Values from INI File
CWinApp* m_pApp = AfxGetApp();
m_strMQManager = m_pApp->GetProfileString("PARAMETERS", "MQ_MANAGER_NAME", "");
m_strSubscriber = m_pApp->GetProfileString("PARAMETERS", "MQ_SUBSCRIBER_NAME", "");
m_strTopic = m_pApp->GetProfileString("PARAMETERS", "MQ_TOPIC_NAME", "");
// Do we have all relevant info to connect ?
if(!ValidateConnectionInfo())
{
m_iConnectionErrorCount++;
LogAndReport("WARNING !!! Please STOP CSW GUI and contact CSW Support Set MQ Variables in INI File.");
return FALSE;
}
// Connect to MQ Manager
MQCONN((char *)m_strMQManager.GetBuffer(m_strMQManager.GetLength()), &m_objConnection, &m_pCompCode, &m_pReasonCode);
if(m_pReasonCode == MQRC_ALREADY_CONNECTED) m_pCompCode = MQCC_OK;
if(m_pCompCode == MQCC_OK)
{
// Connect to Broker
strncpy(odMQ.ObjectName, "SYSTEM.BROKER.CONTROL.QUEUE", (size_t)MQ_Q_NAME_LENGTH);
lMQOptions = MQOO_OUTPUT + MQOO_FAIL_IF_QUIESCING;
MQOPEN(m_objConnection, &odMQ, lMQOptions, &m_objBroker, &m_pCompCode, &m_pReasonCode);
if(m_pCompCode != MQCC_OK)
{
m_iConnectionErrorCount++;
m_strError.Format("Connect(): MQOPEN Broker Failed: Completion Code %d | Reason %d-%s", m_pCompCode, m_pReasonCode, GetReason());
LogAndReport("WARNING !!! Unable to Connect to MQ Managaer. Please STOP CSW GUI and contact CSW Support.");
return FALSE;
}
// Get Subscriber Handle
strncpy(odMQ.ObjectName, m_strModelQueueName, (size_t)MQ_Q_NAME_LENGTH);
strcpy(odMQ.DynamicQName, m_strSubscriber);
lMQOptions = MQOO_INPUT_EXCLUSIVE + MQOO_FAIL_IF_QUIESCING;
MQOPEN(m_objConnection, &odMQ, lMQOptions, &m_objSubscriber, &m_pCompCode, &m_pReasonCode);
if(m_pCompCode != MQCC_OK )
{
m_iConnectionErrorCount++;
m_strError.Format("Connect(): MQOPEN Subscriber Failed: Completion Code %d | Reason %d-%s", m_pCompCode, m_pReasonCode, GetReason());
LogAndReport("WARNING !!! Unable to Connect to MQ Managaer. Please STOP CSW GUI and contact CSW Support.");
return FALSE;
}
}
else
{
m_iConnectionErrorCount++;
m_strError.Format("Connect(): MQCONN Failed: Completion Code %d | Reason %d-%s", m_pCompCode, m_pReasonCode, GetReason());
LogAndReport("WARNING !!! Unable to Connect to MQ Managaer. Please STOP CSW GUI and contact CSW Support.");
}
// As of Here, we are Connected.
m_bConnected = (m_pCompCode == MQCC_OK);
return m_bConnected;
}
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////
void CMQManager::Disconnect()
{
// Locals
CString strError;
// Why bother if we already tried to disconnect ?
if(!m_bConnected && !m_bSubscriberRegistered) return;
// First De-Register Susbciber.
if(!SendBrokerCommand(MQPS_DEREGISTER_SUBSCRIBER))
{
m_strError = "Disconnect(): Unable to De-register Subscriber";
LogAndReport("");
}
// Check Deregistration Repsonse
if(!CheckResponse())
{
m_strError = "Disconnect(): Unable to De-register Subscriber. No Response";
LogAndReport("");
}
// Disconnect Broker
if(m_objBroker != MQHO_UNUSABLE_HOBJ)
{
MQCLOSE(m_objConnection, &m_objBroker, MQCO_NONE, &m_pCompCode, &m_pReasonCode);
if(m_pCompCode != MQCC_OK )
{
m_strError.Format("Disconnect(): MQCLOSE Broker Failed: Completion Code %d | Reason %d-%s", m_pCompCode, m_pReasonCode, GetReason());
LogAndReport("");
}
}
// Disconnect Subscriber
if(m_objSubscriber != MQHO_UNUSABLE_HOBJ)
{
MQCLOSE(m_objConnection, &m_objSubscriber, MQCO_DELETE_PURGE, &m_pCompCode, &m_pReasonCode);
if(m_pCompCode != MQCC_OK)
{
m_strError.Format("Disconnect(): MQCLOSE Subscriber Failed: Completion Code %d | Reason %d-%s", m_pCompCode, m_pReasonCode, GetReason());
LogAndReport("");
}
}
// Disconnect MQManager
if((m_objConnection != MQHC_UNUSABLE_HCONN) && (m_pReasonCode != MQRC_ALREADY_CONNECTED))
{
MQDISC(&m_objConnection, &m_pCompCode, &m_pReasonCode);
if(m_pCompCode != MQCC_OK)
{
m_strError.Format("Disconnect(): MQDISC Failed: Completion Code %d | Reason %d-%s", m_pCompCode, m_pReasonCode, GetReason());
LogAndReport("");
}
}
m_bConnected = FALSE;
m_bSubscriberRegistered = FALSE;
}
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// Send Register Broker Command and Check Response
BOOLEAN CMQManager::RegisterSubscriber()
{
// Local Variables
BOOLEAN bRetVal = FALSE;
// Already Subscribed ?
if(m_bSubscriberRegistered) return TRUE;
// Send Register Command
if(!SendBrokerCommand(MQPS_REGISTER_SUBSCRIBER))
{
m_iConnectionErrorCount++;
LogAndReport("WARNING !!! Unable to Connect to MQ Managaer. Please STOP CSW GUI and contact CSW Support.");
return FALSE;
}
// Check Repsonse
if(!CheckResponse())
{
m_iConnectionErrorCount++;
LogAndReport("WARNING !!! Unable to Connect to MQ Managaer. Please STOP CSW GUI and contact CSW Support.");
return FALSE;
}
m_bSubscriberRegistered = TRUE;
return m_bSubscriberRegistered;
}
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////
BOOLEAN CMQManager::SendBrokerCommand(char strCommand[])
{
// Local Variables
PMQBYTE pMessageBlock = NULL;
MQLONG lMessageLength = MAX_MSG_LENGTH;
PMQRFH pMQRFHeader;
PMQCHAR pStrNameValueString;
MQLONG lNameValueStringLength;
// MQMD mdMQ = { MQMD_DEFAULT };
MQPMO pmoMQ = { MQPMO_DEFAULT };
BOOLEAN bRetVal = TRUE;
// Allocate Storage
pMessageBlock = (PMQBYTE)malloc(lMessageLength);
if(pMessageBlock == NULL)
{
m_pCompCode = MQCC_FAILED;
m_strError = "Unable to allocate storage for subscriber registration";
return FALSE;
}
memset(pMessageBlock, 0, MAX_MSG_LENGTH);
pMQRFHeader = (PMQRFH)(pMessageBlock);
memcpy(pMQRFHeader, &m_rfDefaultMQRFH, (size_t)MQRFH_STRUC_LENGTH_FIXED);
// Set Attributes
pStrNameValueString = ((MQCHAR *)pMQRFHeader) + MQRFH_STRUC_LENGTH_FIXED;
strcpy(pStrNameValueString, MQPS_COMMAND_B);
strcat(pStrNameValueString, strCommand);
strcat(pStrNameValueString, MQPS_STREAM_NAME_B);
strcat(pStrNameValueString, STREAM);
strcat(pStrNameValueString, MQPS_Q_NAME_B);
strcat(pStrNameValueString, m_strSubscriber);
strcat(pStrNameValueString, MQPS_TOPIC_B);
strcat(pStrNameValueString, m_strTopic);
lNameValueStringLength = strlen(pStrNameValueString) + 1;
lMessageLength = MQRFH_STRUC_LENGTH_FIXED + ((lNameValueStringLength + 15) / 16) * 16;
// Set Options
pMQRFHeader->StrucLength = lMessageLength;
memcpy(m_mdSubscribeMQ.Format, MQFMT_RF_HEADER, MQ_FORMAT_LENGTH);
m_mdSubscribeMQ.MsgType = MQMT_REQUEST;
strcpy(m_mdSubscribeMQ.ReplyToQ, m_strSubscriber);
m_mdSubscribeMQ.Persistence = MQPER_NOT_PERSISTENT;
pmoMQ.Options |= MQPMO_NEW_MSG_ID;
// Send Registration request via MQ
MQPUT(m_objConnection, m_objBroker, &m_mdSubscribeMQ, &pmoMQ, lMessageLength, pMessageBlock, &m_pCompCode, &m_pReasonCode);
if(m_pCompCode == MQCC_OK) bRetVal = TRUE;
else
{
m_strError.Format("SendBrokerCommand(): MQPUT Failed: Completion Code %d | Reason %d-%s", m_pCompCode, m_pReasonCode, GetReason());
bRetVal = FALSE;
}
free(pMessageBlock);
return bRetVal;
}
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////
BOOLEAN CMQManager::CheckResponse()
{
// Local Variables
MQGMO gmoMQ = { MQGMO_DEFAULT };
MQMD mdMQ = { MQMD_DEFAULT };
PMQBYTE pMessageBlock = NULL;
MQLONG lMessageLength = MAX_MSG_LENGTH;
BOOLEAN bRetVal = TRUE;
// CString strQueueName = "CSW." + m_objSubscriber;
// Allocate Memmory
pMessageBlock = (PMQBYTE)malloc(lMessageLength);
if(pMessageBlock == NULL)
{
m_pCompCode = MQCC_FAILED;
m_strError = "Unable to allocate storage for subscriber registration response";
return FALSE;
}
else if(m_mdSubscribeMQ.MsgId == NULL)
{
m_pCompCode = MQCC_FAILED;
m_strError = "Invalid response Id for subscriber registration response";
free(pMessageBlock);
return FALSE;
}
// Set Options
memcpy(mdMQ.CorrelId, m_mdSubscribeMQ.MsgId, sizeof(MQBYTE24));
gmoMQ.Version = MQGMO_VERSION_2;
gmoMQ.MatchOptions = MQMO_MATCH_CORREL_ID;
gmoMQ.Options = MQGMO_WAIT + MQGMO_CONVERT;
gmoMQ.WaitInterval = MAX_RESPONSE_TIME;
// Check for Response in MQ
MQGET(m_objConnection, m_objSubscriber, &mdMQ, &gmoMQ, lMessageLength, pMessageBlock, &lMessageLength, &m_pCompCode, &m_pReasonCode);
if(m_pCompCode == MQCC_OK) bRetVal = TRUE;
else
{
m_strError.Format("SendBrokerCommand(): MQGET Failed: Completion Code %d | Reason %d-%s", m_pCompCode, m_pReasonCode, GetReason());
bRetVal = FALSE;
}
free(pMessageBlock);
return bRetVal;
}
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////
BOOLEAN CMQManager::Connected(){return m_bConnected;}
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////
BOOLEAN CMQManager::SubscriberRegisted(){return m_bSubscriberRegistered;}
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////
LPCTSTR CMQManager::GetReason(){return MQErrMessages::GetMqErrorMessage(m_pReasonCode);}
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////
void CMQManager::LogAndReport(CString strUserInfo)
{
// Log any errors in "C:\\temp\\CSW_GUI_LOG\\cswgui_log.txt"
// Also Notify user where appropriate
CAVPLog::WriteToServerOutFile(1, m_strError + "\n");
if(m_iConnectionErrorCount == 1 && strUserInfo != "") AfxMessageBox(strUserInfo);
}
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////
BOOLEAN CMQManager::ValidateConnectionInfo()
{
// Make sure INI File has Connection variables set
if(m_strTopic == ""){m_strError = "Invalid MQ Topic. Please set MQ_TOPIC_NAME variable in INI file"; return FALSE;}
if(m_strSubscriber == ""){m_strError = "Invalid MQ Subcriber Name. Please set MQ_SUBSCRIBER_NAME variable in INI file"; return FALSE;}
if(m_strMQManager == ""){m_strError = "Invalid MQ Manager Name. Please set MQ_MANAGER_NAME variable in INI file"; return FALSE;}
return TRUE;
}
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// Must be called in a Loop
char* CMQManager::DequeueBCSXMLMessage()
{
// Local Variables
BOOLEAN bRetVal = TRUE;
char *pBCSXMLData;
// Allocate Memory
if(m_pDequeueMessageBlock == NULL) m_pDequeueMessageBlock = (PMQBYTE)malloc(m_lMessageLength);
if(m_pDequeueMessageBlock == NULL)
{
m_strError = "Unable to allocate storage for dequing BCS XML";
LogAndReport("");
return "";
}
// Set Options
m_gmoMQDequeue.Options = MQGMO_NO_WAIT + MQGMO_CONVERT;
memcpy(&m_mdMQDequeue, &m_mdDefaultMQMD, sizeof(MQMD));
m_mdMQDequeue.Persistence = MQPER_NOT_PERSISTENT;
// Dequeue a SINGLE BCS XML. Call this function in Loop
MQGET(m_objConnection, m_objSubscriber, &m_mdMQDequeue, &m_gmoMQDequeue, MAX_MSG_LENGTH, m_pDequeueMessageBlock, &m_lMessageLength, &m_pCompCode, &m_pReasonCode);
if(m_pCompCode == MQCC_OK)
{
if(memcmp(m_mdMQDequeue.Format, MQFMT_RF_HEADER, MQ_FORMAT_LENGTH) == 0)
{
m_pDequeMQRFHeader = (PMQRFH)(m_pDequeueMessageBlock);
pBCSXMLData = (char *)(m_pDequeueMessageBlock + m_pDequeMQRFHeader->StrucLength);
return pBCSXMLData;
}
}
else if(m_pReasonCode != MQRC_NO_MSG_AVAILABLE)
{
// If this fails, Disconnect MQ and ask user to re-login
m_bRestartCSWGUI = TRUE;
m_strError.Format("DequeueBCSXMLMessage(): MQGET Broker Failed: Completion Code %d | Reason %d-%s", m_pCompCode, m_pReasonCode, GetReason());
LogAndReport("WARNING !!! Unable to Connect to Receive Orders. Please STOP CSW GUI and Login Again.");
Disconnect();
}
return "";
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////////// |
|
Back to top |
|
 |
mvic |
Posted: Mon Feb 27, 2006 10:11 am Post subject: |
|
|
 Jedi
Joined: 09 Mar 2004 Posts: 2080
|
gsaluja wrote: |
$ mqver
Name: WebSphere MQ
Version: 530
CMVC level: p000-L020617
BuildType: IKAP - (Production) |
(Might be time to put the latest maintenance on there... that code is over 3.5 years old) |
|
Back to top |
|
 |
fjb_saper |
Posted: Mon Feb 27, 2006 10:47 am Post subject: |
|
|
 Grand High Poobah
Joined: 18 Nov 2003 Posts: 20756 Location: LI,NY
|
As well since CSD08 the pub/sub engine is no longer a support pack but part of the product....  _________________ MQ & Broker admin |
|
Back to top |
|
 |
|
|
 |
|
Page 1 of 1 |
|
You cannot post new topics in this forum You cannot reply to topics in this forum You cannot edit your posts in this forum You cannot delete your posts in this forum You cannot vote in polls in this forum
|
|
|
|