ASG
IBM
Zystems
Cressida
Icon
Netflexity
 
  MQSeries.net
Search  Search       Tech Exchange      Education      Certifications      Library      Info Center      SupportPacs      LinkedIn  Search  Search                                                                   FAQ  FAQ   Usergroups  Usergroups
 
Register  ::  Log in Log in to check your private messages
 
RSS Feed - WebSphere MQ Support RSS Feed - Message Broker Support

MQSeries.net Forum Index » IBM MQ Java / JMS » Application Segmentation

Post new topic  Reply to topic
 Application Segmentation « View previous topic :: View next topic » 
Author Message
mandar.kharat
PostPosted: Wed Jun 10, 2009 8:02 am    Post subject: Application Segmentation Reply with quote

Newbie

Joined: 10 Jun 2009
Posts: 1

Hello,
I have coded the Application level segmentation in Java. The happy path works well and I am able to read all the messages once segemented and written to a queue.
The scenario when an exception occurs and the loop breaks and I have to write messages after reading the sequence from a status queue is not read correctly. I get a Reason code '2247'. I have posted my code here, after stripping the MQ configuration. Please can someone help me understand if I am missing something here.

I have pasted my code below, as I did not find any way to attach the files.
All your help appreciated. Any links to functional code would help.

Regards
Mandar.


################################################
Code:

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

import com.ibm.mq.MQEnvironment;
import com.ibm.mq.MQException;
import com.ibm.mq.MQGetMessageOptions;
import com.ibm.mq.MQMessage;
import com.ibm.mq.MQPutMessageOptions;
import com.ibm.mq.MQQueue;
import com.ibm.mq.MQQueueManager;
import com.ibm.mq.constants.MQConstants;
import com.sun.org.apache.xerces.internal.impl.dv.util.Base64;

public class MQSegmentation {
   /* Removed the MQ Configuration, please add that
    * */
   
    // Set up the options on the queue we wish to open...
   protected int openOptions  = MQConstants.MQOO_OUTPUT + MQConstants.MQOO_FAIL_IF_QUIESCING + MQConstants.MQMF_SEGMENTATION_ALLOWED;
   protected MQQueueManager qMgr;
   protected MQQueue brass_read_stat_queue;
   protected MQQueue brass_write_stat_queue;
   protected MQQueue brass_test_queue;
   protected String groupId;
   protected int seqNum;
   protected int bufferSize = 524288;
   protected int commitpoint=2;
   protected String grpConst = "groupId";
   protected String msgSeqNumConst = "messageSequenceNumber";
   protected String filePath = "C:/apps_data_01/brass/billpay/R716/translated/testxml.xml";

   @SuppressWarnings("unchecked")
   protected MQSegmentation(){
        MQEnvironment.hostname = hostName;
        MQEnvironment.channel = channel;
        MQEnvironment.port = port;
        MQEnvironment.properties.put(MQConstants.TRANSPORT_PROPERTY, MQConstants.TRANSPORT_MQSERIES);

        try {
           //Create a connection to the queue manager
           qMgr = new MQQueueManager(qmanager);
        } catch (MQException e) {
         e.printStackTrace();
        }      
   }
   
   protected void openQueues(){
      try {
         System.out.println("Opening queues...");
         brass_test_queue = qMgr.accessQueue(testQueue, openOptions);
         brass_read_stat_queue = qMgr.accessQueue(readStatusQueue, openOptions);      
         brass_write_stat_queue = qMgr.accessQueue(writeStatusQueue, openOptions);   
      } catch (MQException e) {
         System.out.println("Error occured in opening the queues");
         e.printStackTrace();
      }
      
   }
   
   protected void closeQueues(){
      try {
         System.out.println("Closing queues...");
         brass_read_stat_queue.close();
         brass_write_stat_queue.close();         
         brass_test_queue.close();
      } catch (MQException e) {
         System.out.println("Error occured in closing the queues");
         e.printStackTrace();
      }
   }
   
   @SuppressWarnings({"deprecation", "unchecked" })
   protected Map getStatusQueue(MQQueue brass_stat_queue) throws MQException{
      Map<String, Integer> getMap = new HashMap();
      MQGetMessageOptions status_gmo = new MQGetMessageOptions();
      status_gmo.options = status_gmo.options + MQConstants.MQGMO_SYNCPOINT;
       status_gmo.options = status_gmo.options + MQConstants.MQGMO_WAIT;
       status_gmo.options = status_gmo.options + MQConstants.MQGMO_FAIL_IF_QUIESCING;
       MQMessage getMsg  = new MQMessage();   
       try{
          brass_stat_queue.get(getMsg, status_gmo);
          getMap = (HashMap)getMsg.readObject();
          qMgr.commit();
       }catch(Exception ex){
          MQException mqe = (MQException)ex;
          if(mqe.getReason() == 2033){
             System.out.println("No Messages Read");
          }
       }
       System.out.println("Got status Map : "+getMap);
       return getMap;
   }
   @SuppressWarnings({"deprecation", "unchecked" })
   private void putStatusQueue(String groupId, int seqNum, MQQueue brass_stat_queue) throws IOException, MQException{
       Map<String, Integer> putMap = new HashMap();
       MQMessage putMsg  = new MQMessage();   
      MQPutMessageOptions status_pmo = new MQPutMessageOptions();
      status_pmo.options = MQConstants.MQPMO_SYNCPOINT;
      putMap.put(groupId, seqNum);
      putMsg.writeObject(putMap);
      System.out.println("Put status Map : "+putMap);
      brass_stat_queue.put(putMsg, status_pmo);
      qMgr.commit();

   }
   
   protected int getOffset(MQQueue brass_stat_queue) throws MQException{
      Map map = readStatusMap(brass_stat_queue);
      int offset=0;
      if(!map.isEmpty()){
         offset=bufferSize*seqNum;
         return offset;
      }else{
         return offset;
      }
   }
   
   protected Map readStatusMap(MQQueue brass_stat_queue) throws MQException{
      Map map = getStatusQueue(brass_stat_queue);
      Set keys = map.keySet();
      Object[] groupArray = keys.toArray();
      if(groupArray.length!=0){
         groupId = (String)groupArray[0];//convertByteToStr(((String)groupArray[0]).getBytes());
         System.out.println("groupId = "+groupId);
         seqNum = ((Integer)map.get(groupId)).intValue();
      }
      return map;
   }
   
   protected int updateUOW(int uow, MQMessage mqMsg, MQQueue brass_stat_queue) throws MQException, IOException{
      System.out.println("Setting the sequence number and group Id");
      seqNum = mqMsg.messageSequenceNumber;
      groupId = convertByteToStr(mqMsg.groupId);//new String(mqMsg.groupId);
      
      if(uow==commitpoint){
/*         if(mqMsg.messageFlags != MQConstants.MQMF_LAST_MSG_IN_GROUP){
            System.out.println("Commiting Unit Of Work");
            qMgr.commit();
            uow=1;
         }
*/         System.out.println("Commiting Unit Of Work");
         qMgr.commit();
         uow=1;
         
         //Get the Group and Seq num
/*         System.out.println("Setting the sequence number and group Id");
         seqNum = mqMsg.messageSequenceNumber;
         groupId = convertByteToStr(mqMsg.groupId);//new String(mqMsg.groupId);
*/         System.out.println("updateUOW: Sequence number :"+seqNum+" Group Id :"+groupId);
         getStatusQueue(brass_stat_queue);
         putStatusQueue(groupId, seqNum, brass_stat_queue);
         
      }else{
         uow=uow+1;
      }      
      return uow;
   }   
   
   protected String convertByteToStr(byte[] byteArr) throws NullPointerException{
/*       StringBuffer buffer = new StringBuffer();
         for(int i=0; i<byteArr.length; i++){
            buffer.append((char)byteArr[i]);
         }
      return buffer.toString();*/
      String s = Base64.encode(byteArr);
      return s;
   }
   
   protected byte[] convertStrTByte(String s) throws NullPointerException, IOException{
      return Base64.decode(s);
   }
}



################################################



import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.FileInputStream;

import com.ibm.mq.MQException;
import com.ibm.mq.MQMessage;
import com.ibm.mq.MQPutMessageOptions;
import com.ibm.mq.constants.MQConstants;

public class MQSegmentationWrite extends MQSegmentation{

   
   @SuppressWarnings({ "deprecation", "unchecked" })
   public MQSegmentationWrite(){
      super();
   }
   
   public static void main(String[] args) {
      MQSegmentationWrite testMqSegmentWrite = new MQSegmentationWrite();
      testMqSegmentWrite.testWriteSegmentation();
   }
   
   @SuppressWarnings({ "unchecked", "deprecation" })
   private void testWriteSegmentation() {
      try{      
           FileInputStream reader = new FileInputStream(new File(filePath));
           byte[] inputFileByteArr = new byte[reader.available()];
           reader.read(inputFileByteArr);
            // Now specify the queue that we wish to open, and the open options...
            openQueues();
            MQMessage mqMsg  = new MQMessage();   
            MQPutMessageOptions pmo = new MQPutMessageOptions();
            int ii=0;
            try {
                  long offset=0;
                  
                  int ioStreamOffset = getOffset(brass_write_stat_queue);
                  if(ioStreamOffset!=0){
                     //mqMsg.setBytesProperty(grpConst, convertStrTByte(groupId));
                     //mqMsg.setIntProperty(msgSeqNumConst, seqNum);
                     mqMsg.groupId = convertStrTByte(groupId);
                     mqMsg.messageSequenceNumber = seqNum+1;                     
                  }
                  System.out.println("ioStreamOffset= "+ioStreamOffset);
                  ByteArrayInputStream inputStream = new ByteArrayInputStream(inputFileByteArr, ioStreamOffset, inputFileByteArr.length);
                  System.out.println("Start of message write.....");
                  int uow=1;
                  do{
                     byte[] segmentedByteArr = null;
                     System.out.println("available="+inputStream.available());
                     
                     if(!(bufferSize>=inputStream.available())){
                        segmentedByteArr = new byte[bufferSize];
                        inputStream.read(segmentedByteArr, 0, bufferSize);
                        if(offset==0 && ioStreamOffset!=0){
                           // Set the MQPMO_LOGICAL_ORDER for the subsequent messages; following IBM documentation
                           pmo.options = pmo.options + MQConstants.MQPMO_SYNCPOINT;
                        }else{
                           pmo.options = MQConstants.MQPMO_LOGICAL_ORDER;
                           pmo.options = pmo.options + MQConstants.MQPMO_SYNCPOINT;                           
                        }
                        
                        mqMsg.messageFlags = MQConstants.MQMF_MSG_IN_GROUP;
                        //mqMsg.messageFlags = mqMsg.messageFlags + MQConstants.MQMF_SEGMENT;

                        // Write message to queue
                        System.out.println("Writing message to queue");
                        mqMsg.format = MQConstants.MQFMT_STRING;
                        mqMsg.write(segmentedByteArr);
                     
                        brass_test_queue.put(mqMsg, pmo);
                        uow = updateUOW(uow, mqMsg, brass_write_stat_queue);
                        offset=offset+bufferSize;
                        ii++;
/*                        if(ii==6){
                           System.out.println("Explicitly breaking the loop ....... :"+ii);
                           break;
                        }*/
                     }else{
                        mqMsg.messageFlags = MQConstants.MQMF_LAST_MSG_IN_GROUP;
                        //mqMsg.messageFlags = mqMsg.messageFlags + MQConstants.MQMF_LAST_SEGMENT;
                        pmo.options = MQConstants.MQPMO_LOGICAL_ORDER;
                        pmo.options = pmo.options + MQConstants.MQPMO_SYNCPOINT;                           
                        
                        segmentedByteArr = new byte[inputStream.available()];
                        inputStream.read(segmentedByteArr, 0, inputStream.available());
                        // Write message to queue
                        System.out.println("Writing last message to queue");
                        mqMsg.write(segmentedByteArr);
                        brass_test_queue.put(mqMsg, pmo);
                        System.out.println("Commiting Last Unit Of Work");
//                         Updating the sequence number
                        updateUOW(commitpoint, mqMsg, brass_write_stat_queue);
                        // Remove the latest status message from the queue
                        // as all the messages are committed
                        System.out.println("Clearing the status queue");
                        getStatusQueue(brass_write_stat_queue);
                        // Uncomment the below line to set a status
                        //putStatusQueue(groupId, 5);
                     }
                  }while(inputStream.available()!=0);
                  System.out.println("End of writing");
               } catch (MQException e) {
                  e.printStackTrace();
                  if (e instanceof MQException) {
                     MQException mqe = (MQException) e;
                     if (mqe.completionCode == 2 && mqe.reasonCode == 2033) {
                        // no message to read
                        System.out.println("No message read");
                     }
                  }
               }
            // Close the queue and qmanager...
           closeQueues();
            qMgr.disconnect();
          }catch (Exception ex){
            System.out.println("An error occurred while reading the message " + ex.getMessage());
            ex.printStackTrace();
          }
   }

}


################################################


import java.util.Map;

import com.ibm.mq.MQException;
import com.ibm.mq.MQGetMessageOptions;
import com.ibm.mq.MQMessage;
import com.ibm.mq.constants.MQConstants;

public class MQSegmentationRead extends MQSegmentation {
   
   @SuppressWarnings({ "deprecation", "unchecked" })
   public MQSegmentationRead(){
      super();
   }
   
   public static void main(String[] args) {
      MQSegmentationRead testMqSegmentRead = new MQSegmentationRead();
      testMqSegmentRead.testReadSegmentation();
   }   
   
   @SuppressWarnings("deprecation")
   private void testReadSegmentation(){
      try{
         // Declare the gmo here to avoid the compile errors
         MQGetMessageOptions gmo = new MQGetMessageOptions();
         MQMessage mqMsg = new MQMessage();
         openQueues();
         Map statusQueue = readStatusMap(brass_read_stat_queue);
         int uow=1;
         int ii = 0;
         boolean initCheck=false;
         boolean firstRead=false;
         boolean checkGmo=false;
      //do{
         do {
            
            try{
               System.out.println("statusQueue.isEmpty() = "+statusQueue.isEmpty());
               if(!statusQueue.isEmpty() && !initCheck){
                  gmo = new MQGetMessageOptions();
                  gmo.options =  MQConstants.MQGMO_COMPLETE_MSG;
                  gmo.options =  gmo.options + MQConstants.MQGMO_SYNCPOINT; 
                  gmo.options =  gmo.options + MQConstants.MQGMO_WAIT;
                  //gmo.options =  gmo.options + MQConstants.MQGMO_ALL_SEGMENTS_AVAILABLE;
                  gmo.matchOptions = gmo.matchOptions + MQConstants.MQMO_MATCH_GROUP_ID;
                  gmo.matchOptions = gmo.matchOptions + MQConstants.MQMO_MATCH_MSG_SEQ_NUMBER;
                  
                  //readStatusMap();
                  System.out.println("Message Read sequence number :"+seqNum+" Group Id :"+groupId);
                  System.out.println("Bytes $$$$$$ "+new String(convertStrTByte(groupId)));
                  //mqMsg.setBytesProperty(grpConst, convertStrTByte(groupId));
                  mqMsg.groupId = convertStrTByte(groupId);
                  //mqMsg.setIntProperty(msgSeqNumConst, seqNum);
                  if(firstRead==false){
                     seqNum=0;
                     firstRead=true;
                  }
                  mqMsg.messageSequenceNumber = seqNum+1;
                  initCheck=true;
               }else{
                  readStatusMap(brass_read_stat_queue);
                  //if(checkGmo==false){
                     gmo = new MQGetMessageOptions();
                     gmo.options =  MQConstants.MQGMO_LOGICAL_ORDER;
                     gmo.options =  gmo.options + MQConstants.MQGMO_WAIT;
                     gmo.options =  gmo.options + MQConstants.MQGMO_COMPLETE_MSG;
                     gmo.options =  gmo.options + MQConstants.MQGMO_SYNCPOINT;
                     //gmo.options =  gmo.options + MQConstants.MQGMO_ALL_SEGMENTS_AVAILABLE;
                     if(ii!=0){
                        gmo.matchOptions = gmo.matchOptions + MQConstants.MQMO_MATCH_GROUP_ID;
                     }                     
                  //}else{
                     //checkGmo=false;
                  //}
                     

                     //gmo.options =  gmo.options + MQConstants.MQMF_MSG_IN_GROUP;
                     //gmo.options =  gmo.options + MQConstants.MQGMO_MSG_UNDER_CURSOR;
/*                     if(ii==0){
                        gmo.matchOptions = gmo.matchOptions + MQConstants.MQMO_NONE;
                     }else{
                        gmo.matchOptions = gmo.matchOptions + MQConstants.MQMO_MATCH_GROUP_ID;
                        //gmo.matchOptions = gmo.matchOptions + MQConstants.MQMO_MATCH_MSG_SEQ_NUMBER;
                     }*/
                     mqMsg.groupId = convertStrTByte(groupId);
/*                     if(mqMsg.groupId!=null)
                        System.out.println("mqMsg.groupId="+new String(mqMsg.groupId));*/
                     mqMsg.messageSequenceNumber = seqNum;
                  System.out.println("Message Read sequence number :"+seqNum+" Group Id :"+groupId);
                  //System.out.println("Bytes $$$$$$ "+new String(convertStrTByte(groupId)));
               }
               System.out.println("Reading test queue...");
               brass_test_queue.get(mqMsg, gmo);
               uow = updateUOW(uow, mqMsg, brass_read_stat_queue);
               // Testing the resumption of Group
               ii++;
/*               if(ii==4){
                  System.out.println("Explicitly breaking the loop .......");
                  break;
               }*/               
               System.out.println("Completed reading test queue...");

               System.out.println("gmo.groupStatus="+gmo.groupStatus);
               if(gmo.groupStatus == MQConstants.MQGS_LAST_MSG_IN_GROUP)
                  getStatusQueue(brass_read_stat_queue);
            } catch (MQException e) {
               //e.printStackTrace();
               if (e instanceof MQException) {
                  MQException mqe = (MQException) e;
                     if (mqe.completionCode == 2 && mqe.reasonCode == 2033) {
                     // no message to read
                     System.out.println("No message read");
                     }
                     if (mqe.completionCode == 2 && mqe.reasonCode == 2247) {
                        System.out.println("Setting GMO..");
                        gmo = new MQGetMessageOptions();
                        //gmo.options =  gmo.options + MQConstants.MQGMO_SYNCPOINT;
                        //gmo.matchOptions = gmo.matchOptions + MQConstants.MQMO_MATCH_GROUP_ID;
                        checkGmo=true;
                     }                     
               }
            }
         } while (gmo.groupStatus != MQConstants.MQGS_LAST_MSG_IN_GROUP);
            // Close the queue and qmanager...
           closeQueues();
            qMgr.disconnect();
      //}while(true);

      }catch (Exception ex){
         System.out.println("An error occurred while reading the message " + ex.getMessage());
         ex.printStackTrace();
       }

   }

}

Back to top
View user's profile Send private message
Display posts from previous:   
Post new topic  Reply to topic Page 1 of 1

MQSeries.net Forum Index » IBM MQ Java / JMS » Application Segmentation
Jump to:  



You cannot post new topics in this forum
You cannot reply to topics in this forum
You cannot edit your posts in this forum
You cannot delete your posts in this forum
You cannot vote in polls in this forum
Protected by Anti-Spam ACP
 
 


Theme by Dustin Baccetti
Powered by phpBB © 2001, 2002 phpBB Group

Copyright © MQSeries.net. All rights reserved.