Author |
Message
|
steffi2 |
Posted: Fri Sep 11, 2009 12:22 pm Post subject: Can I attach a message listener to a temp queue's receiver? |
|
|
Apprentice
Joined: 04 Sep 2009 Posts: 31
|
If I create a temporary queue am I allowed to async consume messages from that queue?
so I want to create a receiver for my temporary queue and attach a message listener to it.
secondly are the message listeners consumers from a temporary queue running in their own async delivery thread?
I want to be able to pull off the main queue and then dispatch to temporary queues which themselves can be asynchronously consumed independently of the main async delivery thread.
Is that possible?
Maybe it's because currently I have only one session where as perhaps if I had two sessions and create the temporary queues of the second session and not the first I won't get the error I'm seeing. I'll try that now.
Well I got this working but I also tried to create a separate session for each temporaryQueue that I wanted to create. Anyway for some reason I'm now getting the following error
javax.jms.JMSException: MQJMS1013: operation invalid whilst session is using asynchronous delivery
at com.ibm.mq.jms.services.ConfigEnvironment.newException(ConfigEnvironment.java:540)
at com.ibm.mq.jms.MQQueueSender.send(MQQueueSender.java:234)
at com.ibm.mq.jms.MQQueueSender.send(MQQueueSender.java:272)
need to
Looks like I need to create an additional session.
If I have temporaryQueues created from a session and I want each temporaryQueue to be consumed concurrently is it necessary for each queue to be assigned it's own session as opposed to belonging to the same session? |
|
Back to top |
|
 |
atheek |
Posted: Fri Sep 11, 2009 5:31 pm Post subject: |
|
|
 Partisan
Joined: 01 Jun 2006 Posts: 327 Location: Sydney
|
As per MQJMS, if your session is set for asynchronous delivery, then all synchronous operations [ Send, receive etc] are illegal using the same session.
From using Java Manual
Quote: |
Note: Using asynchronous delivery with a message consumer marks the entire session as using asynchronous delivery. An application cannot call the receive() methods of a message consumer if the message consumer is associated with a session that is using asynchronous delivery. |
I think it makes sense. Since sessions are single threaded, it may be in use by the MQJMS thread delivering messages asynchronously to your Message Listener. At that time you don't need your application thread to use the same session for synchronous operations.. |
|
Back to top |
|
 |
fjb_saper |
Posted: Sat Sep 12, 2009 10:46 am Post subject: |
|
|
 Grand High Poobah
Joined: 18 Nov 2003 Posts: 20756 Location: LI,NY
|
steffi2 wrote: |
If I have temporaryQueues created from a session and I want each temporaryQueue to be consumed concurrently is it necessary for each queue to be assigned it's own session as opposed to belonging to the same session? |
Now I am getting upset. I already described the model. Each consumer should not only have a separate session but also a separate connection.
Remember that everything is single threaded through the connection.
You want scalability and speed right? And don't forget to correctly disconnect once the resource isn't used anymore...  _________________ MQ & Broker admin |
|
Back to top |
|
 |
mqjeff |
Posted: Sat Sep 12, 2009 1:28 pm Post subject: |
|
|
Grand Master
Joined: 25 Jun 2008 Posts: 17447
|
fjb_saper wrote: |
Now I am getting upset. I already described the model. |
We are all Vox clamantis in deserto even if some of us went to good schools. |
|
Back to top |
|
 |
steffi2 |
Posted: Mon Sep 14, 2009 10:09 am Post subject: |
|
|
Apprentice
Joined: 04 Sep 2009 Posts: 31
|
Oops sorry I had not seen this until now so you're saying that if I want concurrent listeners they each need their own connection. So whenever I have mutliple sessions derived from the same connection that's serially delivered traffic to each of those sessions.
I have changed my code to create a separate connection for each of the bucket queues listeners and I even have a different session for the listener vs the session that it use to dispatch to that queue. However I still see
asyncDelivery0]14 Sep 2009 13:02:32.060 | com.calyonfinancial.ttlistener.watcher.TTDispatchedDataWatcher | IMPORTANT | - getQueueId=3
[asyncDelivery0]14 Sep 2009 13:02:32.072 | com.calyonfinancial.ttlistener.watcher.TTDispatchedDataWatcher | IMPORTANT | - CRUCIAL_JMS_SUPPORT : E_CANNOT_SEND_MESSAGE : message: ID:414d512054454f4c4d455851412e514d7f0e484ad431962f
javax.jms.ResourceAllocationException: MQJMS2008: failed to open MQ queue
at com.ibm.mq.jms.MQQueueSession.getQueueOpenException(MQQueueSession.java:942)
at com.ibm.mq.jms.MQQueueSession.getOutputQueue(MQQueueSession.java:876)
at com.ibm.mq.jms.MQQueueSession.createSender(MQQueueSession.java:205)
occasionally but not always.
Is there a reason for that? In this case there was 10 temporary queues in use. |
|
Back to top |
|
 |
fjb_saper |
Posted: Mon Sep 14, 2009 1:11 pm Post subject: |
|
|
 Grand High Poobah
Joined: 18 Nov 2003 Posts: 20756 Location: LI,NY
|
steffi2 wrote: |
However I still see
Code: |
asyncDelivery0]14 Sep 2009 13:02:32.060 | com.calyonfinancial.ttlistener.watcher.TTDispatchedDataWatcher | IMPORTANT | - getQueueId=3
[asyncDelivery0]14 Sep 2009 13:02:32.072 | com.calyonfinancial.ttlistener.watcher.TTDispatchedDataWatcher | IMPORTANT | - CRUCIAL_JMS_SUPPORT : E_CANNOT_SEND_MESSAGE : message: ID:414d512054454f4c4d455851412e514d7f0e484ad431962f
javax.jms.ResourceAllocationException: MQJMS2008: failed to open MQ queue
at com.ibm.mq.jms.MQQueueSession.getQueueOpenException(MQQueueSession.java:942)
at com.ibm.mq.jms.MQQueueSession.getOutputQueue(MQQueueSession.java:876)
at com.ibm.mq.jms.MQQueueSession.createSender(MQQueueSession.java:205)
|
occasionally but not always.
Is there a reason for that? In this case there was 10 temporary queues in use. |
OK you need to change your code.
A JMSException is useless per se. What you want is the provider exception.
That's contained in the LinkedException (jmse.getLinkedException(); )
This will give you the completion and reason code.
By the way if your queues are tempdyn they will not accept persistent messages...  _________________ MQ & Broker admin |
|
Back to top |
|
 |
steffi2 |
Posted: Mon Sep 14, 2009 1:25 pm Post subject: |
|
|
Apprentice
Joined: 04 Sep 2009 Posts: 31
|
Here's what that says
com.ibm.mq.MQException: MQJE001: Completion Code 2, Reason 2017
googling now  |
|
Back to top |
|
 |
fjb_saper |
Posted: Mon Sep 14, 2009 1:27 pm Post subject: |
|
|
 Grand High Poobah
Joined: 18 Nov 2003 Posts: 20756 Location: LI,NY
|
Seems you have a scope problem for your connection handle
Quote: |
=>mqrc 2017
2017 0x000007e1 MQRC_HANDLE_NOT_AVAILABLE |
 _________________ MQ & Broker admin |
|
Back to top |
|
 |
steffi2 |
Posted: Mon Sep 14, 2009 1:37 pm Post subject: |
|
|
Apprentice
Joined: 04 Sep 2009 Posts: 31
|
I suppose more detail is needed
When I dispatch from my dispatch listener I've got a map that contains the other sessions that are needed for each of these "child/bucket" queues. When I create those queues I use a separate connection and a separate send and receive session for each queue. ie. the dispatch thread looks up a map for the session to use to send a message to the queue.
setup code looks like this
/* this pertains to the dispatch consumer */
connection = createQueueConnection(queueName);
session = createQueueSession(connection, queueName);
queue = createQueue(session, queueName);
receiver = createReceiver(session, queue);
queueCount = Integer.parseInt(idList.get(0));
/* these are the "child" queues that are dispatched to */
for (int i = 0; i<queueCount; i++) {
Integer anIntId = new Integer(i);
QueueConnection childConnection = createQueueConnection(queueName);
queueConnectionMap.put(anIntId, childConnection);
QueueSession childReceiveSession = createQueueSession(childConnection, queueName);
QueueSession childSendSession = createQueueSession(childConnection, queueName, true);
Queue tempQueue = childReceiveSession.createTemporaryQueue();
// attach message listener to the queue
MessageListener messageListener = new TTDataProcessor(controller, new Integer(i+1), idList);
QueueReceiver rec = createReceiver(childReceiveSession, tempQueue, false);
rec.setMessageListener(messageListener);
queueMap.put(anIntId, tempQueue);
queueSessionMap.put(anIntId, childSendSession);
childConnection.start();
}
connection.start();
the dispatch thread in it's on message will write a message to the apprpriate queue and it does so by looking up the ID to use and then grabs the session needed to write to that queue out of the map that was constructed here. I'm not sure if this is the best approach but I figured I need to remember the sessions I'm using to setup the receivers of each of these queues in order to later close them when the time comes.
Q1. is it correct to use the session like this to write a message to these queues or should I be creating that lazily each time I do the write rather than assuming the one in the map is still valid. If you use temporary queues you pretty much have to retain the session and reference to the queue object to do anything with it anyway right?
Q2. is it necessary to have a separate session for both async receive and sync send for each child queue?
Q3. When do you benefit from using an anonymous queue sender?
I suspect I may be creating more resources than I need so I'd like to know what's the minumum needed.
I will shortly be moving to static queues instead of temporary queues but the idea is that with temporary queues I'm retaining the queue in a map that's used later when I want to write to the queue. When I move to static queues I could easily create the session needed to write to the queue at the time of the write but I doubt that that's optimal.
fjb_saper wrote: |
Seems you have a scope problem for your connection handle
Quote: |
=>mqrc 2017
2017 0x000007e1 MQRC_HANDLE_NOT_AVAILABLE |
 |
|
|
Back to top |
|
 |
fjb_saper |
Posted: Mon Sep 14, 2009 1:56 pm Post subject: |
|
|
 Grand High Poobah
Joined: 18 Nov 2003 Posts: 20756 Location: LI,NY
|
steffi2 wrote: |
When I dispatch from my dispatch listener I've got a map that contains the other sessions that are needed for each of these "child/bucket" queues. When I create those queues I use a separate connection and a separate send and receive session for each queue. ie. the dispatch thread looks up a map for the session to use to send a message to the queue.
.....
the dispatch thread in it's on message will write a message to the apprpriate queue and it does so by looking up the ID to use and then grabs the session needed to write to that queue out of the map that was constructed here. I'm not sure if this is the best approach but I figured I need to remember the sessions I'm using to setup the receivers of each of these queues in order to later close them when the time comes.
|
Wrong pattern. The dispatcher should only lookup/determine which queue the message will be put on.
The receive and send operation should happen on the same thread and in the same session. You can use an anonymous sender and only fill in the destination when calling the send method on the dispatcher ...
Your receive and dispatch represents then an atomic operation that you can commit. (Use session commit for each message).
The work of the child sessions is to allow for the message to be picked up from the child queue and be processed via the onMessage method. The end of the onMessage method would then be to call a commit on its own session.
You should not use these child sessions for putting the message to the queue.
Have fun  _________________ MQ & Broker admin |
|
Back to top |
|
 |
steffi2 |
Posted: Mon Sep 14, 2009 2:44 pm Post subject: |
|
|
Apprentice
Joined: 04 Sep 2009 Posts: 31
|
Oh that all makes sense now that you talk about it. The only confusion I have then with that is how does that all relat to client acknowledge?
so I still use client acknowledge with a transacted session?
so my commit on the session at the end of onMessage is after I've already acknowledged right? |
|
Back to top |
|
 |
fjb_saper |
Posted: Mon Sep 14, 2009 6:48 pm Post subject: |
|
|
 Grand High Poobah
Joined: 18 Nov 2003 Posts: 20756 Location: LI,NY
|
You use connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
This will give you a transacted session.
Remember to always explicitly handle your transactions with a session.commit() or a session.rollback(). All messages handled by the session since the last commit()/ rollback will be part of the UOW.
Have fun  _________________ MQ & Broker admin |
|
Back to top |
|
 |
steffi2 |
Posted: Tue Sep 15, 2009 6:33 am Post subject: |
|
|
Apprentice
Joined: 04 Sep 2009 Posts: 31
|
so i'm using the same Session for both async receive from the dispatch queue and sync send to the child queue but isn't that the reason why I currently get the following error?
Are you sure that I can receive and send using the same session?
[asyncDelivery0]15 Sep 2009 09:26:41.736 | IMPORTANT | - CRUCIAL_JMS_SUPPORT : E_CANNOT_SEND_MESSAGE : message: ID:414d512054454f4c4d455851412e514d7f0e484af8bb8d2f
javax.jms.JMSException: MQJMS1013: operation invalid whilst session is using asynchronous delivery
at com.ibm.mq.jms.services.ConfigEnvironment.newException(ConfigEnvironment.java:540)
at com.ibm.mq.jms.MQQueueSender.send(MQQueueSender.java:342)
at com.ibm.mq.jms.MQQueueSender.send(MQQueueSender.java:454)
at
fjb_saper wrote: |
steffi2 wrote: |
When I dispatch from my dispatch listener I've got a map that contains the other sessions that are needed for each of these "child/bucket" queues. When I create those queues I use a separate connection and a separate send and receive session for each queue. ie. the dispatch thread looks up a map for the session to use to send a message to the queue.
.....
the dispatch thread in it's on message will write a message to the apprpriate queue and it does so by looking up the ID to use and then grabs the session needed to write to that queue out of the map that was constructed here. I'm not sure if this is the best approach but I figured I need to remember the sessions I'm using to setup the receivers of each of these queues in order to later close them when the time comes.
|
Wrong pattern. The dispatcher should only lookup/determine which queue the message will be put on.
The receive and send operation should happen on the same thread and in the same session. You can use an anonymous sender and only fill in the destination when calling the send method on the dispatcher ...
Your receive and dispatch represents then an atomic operation that you can commit. (Use session commit for each message).
The work of the child sessions is to allow for the message to be picked up from the child queue and be processed via the onMessage method. The end of the onMessage method would then be to call a commit on its own session.
You should not use these child sessions for putting the message to the queue.
Have fun  |
|
|
Back to top |
|
 |
fjb_saper |
Posted: Tue Sep 15, 2009 12:19 pm Post subject: |
|
|
 Grand High Poobah
Joined: 18 Nov 2003 Posts: 20756 Location: LI,NY
|
|
Back to top |
|
 |
|