
MQSeries.net Forum Index » General IBM MQ Support » Concurrent XMS listeners don't pick up messages

zbyszanna
PostPosted: Tue Apr 21, 2015 5:57 am    Post subject: Concurrent XMS listeners don't pick up messages

Novice

Joined: 03 Feb 2012
Posts: 23

I've created a simple test that aims to run 10 concurrent XMS listeners processing 10 messages at once.
I create 10 connections and one session for each connection. Then on each session I register a listener and start the connection.

The program picks up 10 messages from the queue but starts processing only 4 of them; after those are processed, another 4 messages are taken from the queue, and so on. During this time the uncommitted count is 10. It looks like only 4 listeners are processing messages at any one time.

I also tried creating all the infrastructure and starting the listeners in new threads, but to no avail.

What am I doing wrong?

Program.cs
Code:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;

namespace MultiTestXMS
{
    class Program
    {
        static void Main(string[] args)
        {
            debug("Starting application...");

            for (int i = 0; i < 10; ++i)
            {
                StartListener();
                //new Thread(new ThreadStart(StartListener)).Start();
            }

            Thread.Sleep(600 * 1000);
        }

        public static void StartListener()
        {
            Listener listener = new Listener();
        }

        protected static void debug(string text, params object[] args)
        {
            Console.WriteLine("Thread [" + Thread.CurrentThread.ManagedThreadId + "]: " + string.Format(text, args));
        }

    }
}


Listener.cs
Code:

using System;
using IBM.XMS;
using System.Threading;

namespace MultiTestXMS
{
    public class Listener
    {
        private ISession xmsSession;
        private int number;

        public Listener() :this(0)
        {
        }

        public Listener(int number)
        {
            this.number = number;

            Log("Creating XMS Listener");

            try
            {
                Log("XMS Listener creating factory");
                XMSFactoryFactory factory = XMSFactoryFactory.GetInstance(XMSC.CT_WMQ);
                Log("XMS Listener creating connection factory");
                IConnectionFactory xmsConnectionFactory = factory.CreateConnectionFactory();
                xmsConnectionFactory.SetStringProperty(XMSC.WMQ_HOST_NAME, "10.200.0.1");
                // WMQ_QUEUE_MANAGER, not WMQ_QUEUE_NAME -- the value is a queue manager name
                xmsConnectionFactory.SetStringProperty(XMSC.WMQ_QUEUE_MANAGER, "QMGR1");
                xmsConnectionFactory.SetStringProperty(XMSC.WMQ_CHANNEL, "CHL1");
                xmsConnectionFactory.SetIntProperty(XMSC.WMQ_PORT, 1414);
                xmsConnectionFactory.SetIntProperty(XMSC.WMQ_CONNECTION_MODE, XMSC.WMQ_CM_CLIENT_UNMANAGED);

                Log("XMS Listener creating connection");
                IConnection xmsConnection = xmsConnectionFactory.CreateConnection();

                Log("XMS Listener creating session");
                xmsSession = xmsConnection.CreateSession(true, AcknowledgeMode.AutoAcknowledge);
                IDestination xmsQueueIn = xmsSession.CreateQueue("QUEUE1");
                Log("XMS Listener creating consumer");
                IMessageConsumer xmsMessageConsumerIn = xmsSession.CreateConsumer(xmsQueueIn);
                xmsMessageConsumerIn.MessageListener = OnMessage;
                Log("XMS Listener created");

                Log("XMS Listener starting connection");
                xmsConnection.Start();
            }
            catch (Exception e)
            {
                Log("XMS Listener creation failed: " + e.Message);
            }
            }
        }

        private void OnMessage(IMessage message)
        {
            Log("Message: " + message.JMSMessageID);
            System.Threading.Thread.Sleep(60 * 1000);
            xmsSession.Commit();
        }

        private void Log(String msg)
        {
            System.Console.WriteLine("WRK [" + number + "," + Thread.CurrentThread.ManagedThreadId + "]: " + msg);
        }
    }
}




The queue:
Code:

AMQ8450: Display queue status details.
   QUEUE(QUEUE1)             TYPE(QUEUE)
   CURDEPTH(404)                           IPPROCS(0)
   LGETDATE( )                             LGETTIME( )
   LPUTDATE( )                             LPUTTIME( )
   MEDIALOG(S0159774.LOG)                  MONQ(OFF)
   MSGAGE( )                               OPPROCS(1)
   QTIME( , )                              UNCOM(NO)



During work:
Code:

AMQ8450: Display queue status details.
   QUEUE(QUEUE1)             TYPE(QUEUE)
   CURDEPTH(394)                           IPPROCS(10)
   LGETDATE( )                             LGETTIME( )
   LPUTDATE( )                             LPUTTIME( )
   MEDIALOG(S0159774.LOG)                  MONQ(OFF)
   MSGAGE( )                               OPPROCS(1)
   QTIME( , )                              UNCOM(10)



Client:
Windows 7 64bit
mq: 7.5.0.4
xms: 2.5.0.4

Server:
AIX 6.1
mq: 7.0.1.12

I also tried a matching MQ client version on the client machine, with the same result.

Thank you in advance.
fjb_saper
PostPosted: Tue Apr 21, 2015 12:41 pm

Grand High Poobah

Joined: 18 Nov 2003
Posts: 20700
Location: LI,NY

Ok... so

Why not have a single factory spawn a single connection per listener thread... ?

You did not specify what the sharecnv value of the channel is. Please set it to 1.
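For reference, checking and changing that on the queue manager would look something like this in runmqsc (channel name is an example; adjust to yours):

Code:

DIS CHANNEL(CHANNEL1) SHARECNV
ALTER CHANNEL(CHANNEL1) CHLTYPE(SVRCONN) SHARECNV(1)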

You're not logging anything once inside the onMessage... perhaps an oversight?
_________________
MQ & Broker admin
zbyszanna
PostPosted: Wed Apr 22, 2015 12:52 am

Novice

Joined: 03 Feb 2012
Posts: 23

Thank you for your answer.

fjb_saper wrote:

Why not have a single factory spawn a single connection per listener thread... ?


I just wanted to avoid any possible bottleneck; it's only for this example. Could it be a problem here?

I changed it and there is no difference (with SHARECNV = 1).

fjb_saper wrote:

You did not specify what the sharecnv value of the channel is. Please set it to 1.


It was originally set to 10. I changed it to 1 and restarted the application. Unfortunately I can see no difference in behavior: 10 messages are taken from the queue, 10 are in an uncommitted state, 4 are being processed, and when they finish, another 4 (and so on).

Code:

CHANNEL(CHANNEL1)                    CHLTYPE(SVRCONN)
   ALTDATE(2015-04-22)                     ALTTIME(10.39.31)
   COMPHDR(NONE)                           COMPMSG(NONE)
   DESCR( )                                HBINT(300)
   KAINT(AUTO)                             MAXINST(999999999)
   MAXINSTC(999999999)                     MAXMSGL(10000000)
   MCAUSER(mquser)                        MONCHL(QMGR)
   RCVDATA( )                              RCVEXIT( )
   SCYDATA( )                              SCYEXIT( )
   SENDDATA( )                             SENDEXIT( )
   SHARECNV(1)                             SSLCAUTH(REQUIRED)
   SSLCIPH( )                              SSLPEER( )
   TRPTYPE(TCP)                         



fjb_saper wrote:

You're not logging anything once inside the onMessage... perhaps an oversight?


I'm not? There is Log(...) in the first line, and the message IDs do appear in the logs. Could you elaborate?
zbyszanna
PostPosted: Wed Apr 22, 2015 1:21 am

Novice

Joined: 03 Feb 2012
Posts: 23

dis chs(...) now gives 20 instances of the channel.

Example channel status:

Code:

   CHANNEL(CHANNEL1)                    CHLTYPE(SVRCONN)
   BUFSRCVD(8)                             BUFSSENT(7)
   BYTSRCVD(1512)                          BYTSSENT(1644)
   CHSTADA(2015-04-22)                     CHSTATI(11.14.59)
   COMPHDR(NONE,NONE)                      COMPMSG(NONE,NONE)
   COMPRATE(0,0)                           COMPTIME(0,0)
   CONNAME(10.200.0.2)                  CURRENT
   EXITTIME(0,0)                           HBINT(300)
   JOBNAME(017F026C000000D7)               LOCLADDR(::ffff:10.200.0.1(1414))
   LSTMSGDA(2015-04-22)                    LSTMSGTI(11.15.00)
   MCASTAT(RUNNING)                        MCAUSER(user1)
   MONCHL(OFF)                             MSGS(5)
   RAPPLTAG(bin\Release\MutliTestXMS.exe)
   SSLCERTI( )                             SSLKEYDA( )
   SSLKEYTI( )                             SSLPEER( )
   SSLRKEYS(0)                             STATUS(RUNNING)
   STOPREQ(NO)                             SUBSTATE(RECEIVE)
   CURSHCNV(1)                             MAXSHCNV(1)
fjb_saper
PostPosted: Wed Apr 22, 2015 4:46 am

Grand High Poobah

Joined: 18 Nov 2003
Posts: 20700
Location: LI,NY

Sorry, I missed the log line.
20 connections are expected: one to browse and one to process, per listener.
You might get down to 11 if you had one connection and 10 sessions.
At a minimum each thread needs its own session; as you are not using a J2EE MDB with multiple instances, that is what I would have expected.
Are you still seeing the blocking after 4 reads / gets?
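A rough sketch of the one-connection, ten-sessions variant (untested; assumes a hypothetical Listener constructor that takes a number and the session instead of creating its own, and exposes OnMessage):

Code:

IConnection xmsConnection = xmsConnectionFactory.CreateConnection();
for (int i = 0; i < 10; ++i)
{
    // Sessions are the unit of transactional work; each gets its own consumer.
    ISession session = xmsConnection.CreateSession(true, AcknowledgeMode.AutoAcknowledge);
    IMessageConsumer consumer = session.CreateConsumer(session.CreateQueue("QUEUE1"));
    consumer.MessageListener = new Listener(i, session).OnMessage;
}
// Start once; delivery begins on all sessions.
xmsConnection.Start();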
_________________
MQ & Broker admin
mqjeff
PostPosted: Wed Apr 22, 2015 4:48 am

Grand Master

Joined: 25 Jun 2008
Posts: 17447

What happens if you move the sleep in the OnMessage to after the commit?

What happens if you remove the sleep entirely?
fjb_saper
PostPosted: Wed Apr 22, 2015 5:10 am

Grand High Poobah

Joined: 18 Nov 2003
Posts: 20700
Location: LI,NY

I thought the sleep was in there to exaggerate and mimic the behavior of the actual app processing the message, and that what the OP was showing us was a stub...

This notwithstanding (and assuming that the sharecnv was indeed set to either 0 or 1), the app should still consume messages 10 at a time. This might be somewhat delayed depending on the delay between the threads, but assuming that delay is negligible against the 1-minute delay inside the thread, there should be 10 messages being consumed at a time, provided 10 or more messages are available on the input queue.

Let's also not forget the polling interval, but then again 5 seconds (the default polling interval) is puny against a full minute...

The OP did not specify the version of MQ. There might also be an APAR concerning this type of behavior... if not on the latest release...
_________________
MQ & Broker admin
fjb_saper
PostPosted: Wed Apr 22, 2015 5:18 am

Grand High Poobah

Joined: 18 Nov 2003
Posts: 20700
Location: LI,NY

Quote:
It was originally set to 10. I changed it to 1 and restarted the application. Unfortunately I can see no difference in behavior: 10 messages are taken from the queue, 10 are in an uncommitted state, 4 are being processed, and when they finish, another 4 (and so on).

So if 10 messages are being taken from the queue and are in an uncommitted state, are you saying that only 4 of the 10 were delivered to the onMessage method?
Please add a log line after the commit so that you can track how many messages were actually being processed when taking a snapshot... And could you please also log the thread ID, not just the message ID?

Due to thread drift you could see consumption of 4 messages at a time while 6 other threads were still busy processing...
Being able to determine whether each thread was in fact processing should help clear this up. Also make sure to catch the ThreadInterruptedException (or equivalent) around the sleep call.

Also, do you see a change in behavior if you set the sharecnv to 0?


_________________
MQ & Broker admin
zbyszanna
PostPosted: Wed Apr 22, 2015 6:37 am

Novice

Joined: 03 Feb 2012
Posts: 23

We have this problem in a more complex scenario and I created this sample for the sake of finding the problem.

I posted the MQ and XMS versions in the first message. For your convenience my first tests were performed on:
client:
mq: 7.5.0.4
xms: 2.5.0.4
Windows 7 64bit

server:
7.0.1.12
AIX 6.1

I currently run tests on:
mq: 7.0.1.12
xms: 2.0.0.12

I'm already logging the thread ID together with the message ID. Now I've also added the HashCode of the current Listener. It looks like the listener objects are different, but only four threads are working on them.

Code:

using System;
using IBM.XMS;
using System.Threading;

namespace MultiTestXMS
{
    public class Listener
    {
        private ISession xmsSession;
        private int number;

        private static XMSFactoryFactory factory = XMSFactoryFactory.GetInstance(XMSC.CT_WMQ);
        private static IConnectionFactory xmsConnectionFactory = factory.CreateConnectionFactory();

        public Listener() :this(0)
        {
            number = this.GetHashCode();
        }

        public Listener(int number)
        {
            this.number = number;

            Log("Creating XMS Listener");

            try
            {
                xmsConnectionFactory.SetStringProperty(XMSC.WMQ_HOST_NAME, "10.200.0.1");
                // WMQ_QUEUE_MANAGER, not WMQ_QUEUE_NAME -- the value is a queue manager name
                xmsConnectionFactory.SetStringProperty(XMSC.WMQ_QUEUE_MANAGER, "QMGR1");
                xmsConnectionFactory.SetStringProperty(XMSC.WMQ_CHANNEL, "CHANNEL1");
                xmsConnectionFactory.SetIntProperty(XMSC.WMQ_PORT, 1414);
                xmsConnectionFactory.SetIntProperty(XMSC.WMQ_CONNECTION_MODE, XMSC.WMQ_CM_CLIENT_UNMANAGED);

                Log("XMS Listener creating connection");
                IConnection xmsConnection = xmsConnectionFactory.CreateConnection();

                Log("XMS Listener creating session");
                xmsSession = xmsConnection.CreateSession(true, AcknowledgeMode.AutoAcknowledge);
                IDestination xmsQueueIn = xmsSession.CreateQueue("QUEUE1");
                Log("XMS Listener creating consumer");
                IMessageConsumer xmsMessageConsumerIn = xmsSession.CreateConsumer(xmsQueueIn);
                xmsMessageConsumerIn.MessageListener = OnMessage;
                Log("XMS Listener created");

                Log("XMS Listener starting connection");
                xmsConnection.Start();
            }
            catch (Exception e)
            {
                Log("XMS Listener creation failed: " + e.Message);
            }
        }

        private void OnMessage(IMessage message)
        {
            Log("OnMessage: " + message.JMSMessageID);
            //System.Threading.Thread.Sleep(60 * 1000);
            xmsSession.Commit();
            Log("OnMessage: " + message.JMSMessageID + ", finished.");
        }

        private void Log(String msg)
        {
            System.Console.WriteLine("WRK [" + number + "," + Thread.CurrentThread.ManagedThreadId + "]: " + msg);
        }
    }
}


When I remove the sleep, in both single- and multi-threaded mode, 4 threads process the messages immediately:
Code:

WRK [7425030,10]: OnMessage: ID:414d5120444d5142524b31202020202055352507200cff46
WRK [58903342,11]: OnMessage: ID:414d5120444d5142524b31202020202055352507200cff45
WRK [7425030,10]: OnMessage: ID:414d5120444d5142524b31202020202055352507200cff46, finished.
WRK [58903342,11]: OnMessage: ID:414d5120444d5142524b31202020202055352507200cff45, finished.
WRK [2425794,10]: OnMessage: ID:414d5120444d5142524b31202020202055352507200cff47
WRK [2425794,10]: OnMessage: ID:414d5120444d5142524b31202020202055352507200cff47, finished.
WRK [31864982,17]: OnMessage: ID:414d5120444d5142524b31202020202055352507200cff48
WRK [31864982,17]: OnMessage: ID:414d5120444d5142524b31202020202055352507200cff48, finished.
WRK [12068108,17]: OnMessage: ID:414d5120444d5142524b31202020202055352507200cff49
WRK [12068108,17]: OnMessage: ID:414d5120444d5142524b31202020202055352507200cff49, finished.
WRK [33439030,12]: OnMessage: ID:414d5120444d5142524b31202020202055352507200cff4a
WRK [33439030,12]: OnMessage: ID:414d5120444d5142524b31202020202055352507200cff4a, finished.
WRK [45013120,12]: OnMessage: ID:414d5120444d5142524b31202020202055352507200cff4b
WRK [45013120,12]: OnMessage: ID:414d5120444d5142524b31202020202055352507200cff4b, finished.
WRK [56431129,12]: OnMessage: ID:414d5120444d5142524b31202020202055352507200cff4c
WRK [56431129,12]: OnMessage: ID:414d5120444d5142524b31202020202055352507200cff4c, finished.
WRK [65192075,11]: OnMessage: ID:414d5120444d5142524b31202020202055352507200cff4d
WRK [65192075,11]: OnMessage: ID:414d5120444d5142524b31202020202055352507200cff4d, finished.
WRK [10579059,11]: OnMessage: ID:414d5120444d5142524b31202020202055352507200cff4e
WRK [10579059,11]: OnMessage: ID:414d5120444d5142524b31202020202055352507200cff4e, finished.
WRK [58903342,11]: OnMessage: ID:414d5120444d5142524b31202020202055352507200cff4f
...
WRK [56431129,12]: OnMessage: ID:414d5120444d5142524b31202020202055352507200cff88, finished.
WRK [65192075,11]: OnMessage: ID:414d5120444d5142524b31202020202055352507200cff89
WRK [65192075,11]: OnMessage: ID:414d5120444d5142524b31202020202055352507200cff89, finished.
WRK [10579059,11]: OnMessage: ID:414d5120444d5142524b31202020202055352507200cff8a
WRK [10579059,11]: OnMessage: ID:414d5120444d5142524b31202020202055352507200cff8a, finished.
WRK [58903342,11]: OnMessage: ID:414d5120444d5142524b31202020202055352507200cff8b
WRK [58903342,11]: OnMessage: ID:414d5120444d5142524b31202020202055352507200cff8b, finished.
WRK [7425030,10]: OnMessage: ID:414d5120444d5142524b31202020202055352507200cff8c
WRK [7425030,10]: OnMessage: ID:414d5120444d5142524b31202020202055352507200cff8c, finished.
WRK [2425794,10]: OnMessage: ID:414d5120444d5142524b31202020202055352507200cff8d
WRK [2425794,10]: OnMessage: ID:414d5120444d5142524b31202020202055352507200cff8d, finished.
WRK [31864982,17]: OnMessage: ID:414d5120444d5142524b31202020202055352507200cff8e
WRK [31864982,17]: OnMessage: ID:414d5120444d5142524b31202020202055352507200cff8e, finished.
WRK [31864982,17]: OnMessage: ID:414d5120444d5142524b31202020202055352507200cff8f
WRK [31864982,17]: OnMessage: ID:414d5120444d5142524b31202020202055352507200cff8f, finished.
WRK [31864982,17]: OnMessage: ID:414d5120444d5142524b31202020202055352507200cff90
WRK [31864982,17]: OnMessage: ID:414d5120444d5142524b31202020202055352507200cff90, finished.
WRK [45013120,12]: OnMessage: ID:414d5120444d5142524b31202020202055352507200cff91
WRK [45013120,12]: OnMessage: ID:414d5120444d5142524b31202020202055352507200cff91, finished.
WRK [56431129,12]: OnMessage: ID:414d5120444d5142524b31202020202055352507200cff92
WRK [56431129,12]: OnMessage: ID:414d5120444d5142524b31202020202055352507200cff92, finished.
WRK [56431129,12]: OnMessage: ID:414d5120444d5142524b31202020202055352507200cff93
WRK [56431129,12]: OnMessage: ID:414d5120444d5142524b31202020202055352507200cff93, finished.
WRK [10579059,11]: OnMessage: ID:414d5120444d5142524b31202020202055352507200cff94
WRK [10579059,11]: OnMessage: ID:414d5120444d5142524b31202020202055352507200cff94, finished.
WRK [58903342,11]: OnMessage: ID:414d5120444d5142524b31202020202055352507200cff95
WRK [58903342,11]: OnMessage: ID:414d5120444d5142524b31202020202055352507200cff95, finished.
WRK [7425030,10]: OnMessage: ID:414d5120444d5142524b31202020202055352507200cff96
WRK [7425030,10]: OnMessage: ID:414d5120444d5142524b31202020202055352507200cff96, finished.
WRK [2425794,10]: OnMessage: ID:414d5120444d5142524b31202020202055352507200cff97
WRK [2425794,10]: OnMessage: ID:414d5120444d5142524b31202020202055352507200cff97, finished.
WRK [31864982,17]: OnMessage: ID:414d5120444d5142524b31202020202055352507200cff98
WRK [31864982,17]: OnMessage:



After setting SHARECNV to 0, 10 messages are processed at the same time (with the 60-second sleep), so it works as desired. Any comment on that?
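The change, for reference:

Code:

ALTER CHANNEL(CHANNEL1) CHLTYPE(SVRCONN) SHARECNV(0)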
fjb_saper
PostPosted: Wed Apr 22, 2015 8:36 am

Grand High Poobah

Joined: 18 Nov 2003
Posts: 20700
Location: LI,NY

There was something in one of the technotes about setting the sharecnv back to the V6 behavior (0) for better performance on MDBs.

It seems that with your version mix you hit that case...
Feel free to google the sharecnv term; you might find the official documentation for the case.

Whenever you get to V8 you should try again with SHARECNV(1).

Glad it resolves the problem for you.
_________________
MQ & Broker admin
PaulClarke
PostPosted: Wed Apr 22, 2015 9:27 am

Grand Master

Joined: 17 Nov 2005
Posts: 1002
Location: New Zealand

Have you considered read-ahead? If there are only a 'few' messages then read-ahead can have the effect of sending all the messages down one connection. Try disabling read-ahead and running the test again.
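If it helps, read-ahead can be switched off at the queue level with something like the following (queue name taken from the posts above; note that read-ahead normally only applies to non-transacted consumers, so it may or may not be the culprit here):

Code:

ALTER QLOCAL(QUEUE1) DEFREADA(DISABLED)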

Cheers,
Paul.
_________________
Paul Clarke
MQGem Software
www.mqgem.com