|
RSS Feed - WebSphere MQ Support
|
RSS Feed - Message Broker Support
|
 |
|
WebSphere MQ – asynchronous message reading problem (.Net) |
« View previous topic :: View next topic » |
Author |
Message
|
AKomlev |
Posted: Tue Feb 18, 2014 5:42 am Post subject: WebSphere MQ – asynchronous message reading problem (.Net) |
|
|
Newbie
Joined: 18 Feb 2014 Posts: 1
|
Hello!
I have problem with async message reading from the IBM MQ in the several threads
Configuration
IBM MQ 7.1 on Windows, .Net 3.5
Problem:
I successful create connection to the MQ, after that - i can add some connectors to the queue, using several object «MQconnector » other one Connection object (IConnection).
In result a get the follow situation:
In the MQ Explorer I see 4 opened connection to the queue. Length of the queue is 0.
Using MQ Explorer I will add 6 test messages to the queue. ( messages body – 1, after 2,3,4,5,6 ) 4 messages will be removed from the queue (by count of opened connections) and processed long time (20 sec, see OnMessage(IMessage msg) handler). 2 Messages waiting unoccupied connections.
That`s Ok
Problem : Processing working just in 2 threads, not in 4, how could I expect. After 20 sec – started processing of the next 2 messages and etc.
In the first thread – messages 1,2,3
In the second thread – messages 4,5,6
It looks like that messages reserved for thread, by 3 per thread.
If I`ll create 7 connectors – messages will be processed by 3 threads, 10 connectors- by 4 threads.
What parameters define this behavior ?
How to do something for processing every message in the separate thread, 1 by open session to the queue ?
PS Sorry for my English.
Program code you can see below:
Code: |
using System;
using IBM.XMS;
namespace MQConsumer_Console
{
internal class Program
{
private static void Main(string[] args)
{
Console.WriteLine("Start");
var conn = new MqConn();
IConnection c = conn.Connect();
var con1 = new MQconnector(); con1.AddListener(c);
var con2 = new MQconnector(); con2.AddListener(c);
var con3 = new MQconnector(); con3.AddListener(c);
var con4 = new MQconnector(); con4.AddListener(c);
Console.WriteLine("END");
Console.ReadLine();
}
}
public class MqConn
{
public IConnection Connect()
{
Console.WriteLine("before XMSFactoryFactory");
XMSFactoryFactory factoryFactory = XMSFactoryFactory.GetInstance(XMSC.CT_WMQ);
Console.WriteLine("before IConnectionFactory");
IConnectionFactory cf = factoryFactory.CreateConnectionFactory();
// Set the properties
cf.SetStringProperty(XMSC.WMQ_HOST_NAME, "10.21.249.75");
cf.SetIntProperty(XMSC.WMQ_PORT, 1491);
cf.SetStringProperty(XMSC.WMQ_CHANNEL, "SC.M00");
cf.SetIntProperty(XMSC.WMQ_CONNECTION_MODE, XMSC.WMQ_CM_CLIENT_UNMANAGED);
cf.SetStringProperty(XMSC.WMQ_QUEUE_MANAGER, "M00.DMZ");
cf.SetStringProperty(XMSC.WMQ_SSL_CIPHER_SPEC, "DES_SHA_EXPORT");
cf.SetStringProperty(XMSC.WMQ_SSL_KEY_REPOSITORY, @"C:\SMKeys\MQ_dmz\key");
cf.SetStringProperty(XMSC.WMQ_SSL_PEER_NAME,
@"CN=QM_M00,OU=Test,O=Org,C=RU");
/**********************************************************************************/
Console.WriteLine("before connect");
IConnection connection = cf.CreateConnection();
Console.WriteLine("connect");
/**********************************************************************************/
return connection;
}
}
public class MQconnector
{
public void AddListener(IConnection connection)
{
ISession session = connection.CreateSession(false, AcknowledgeMode.AutoAcknowledge);
Console.WriteLine("Sesssion opened");
/**********************************************************************************/
IDestination destination = session.CreateQueue("Q.DMZ2");
Console.WriteLine("Destination Created");
destination.SetIntProperty(XMSC.DELIVERY_MODE, XMSC.DELIVERY_PERSISTENT);
/**********************************************************************************/
IMessageConsumer consumer = session.CreateConsumer(destination);
Console.WriteLine("Consumer");
/**********************************************************************************/
Console.WriteLine("Connection started");
consumer.MessageListener = new MessageListener(OnMessage);
connection.Start();
}
private void OnMessage(IMessage msg)
{
if (msg.GetType().FullName == "IBM.XMS.Client.Impl.XmsTextMessageImpl")
{
Console.WriteLine(DateTime.Now.ToString() + " >>>>>>> " + ((ITextMessage)msg).Text);
System.Threading.Thread.Sleep(20 * 1000);
}
}
}
}
|
|
|
Back to top |
|
 |
|
|
 |
|
Page 1 of 1 |
|
You cannot post new topics in this forum You cannot reply to topics in this forum You cannot edit your posts in this forum You cannot delete your posts in this forum You cannot vote in polls in this forum
|
|
|
|