|
RSS Feed - WebSphere MQ Support
|
RSS Feed - Message Broker Support
|
 |
|
Application Segmentation |
« View previous topic :: View next topic » |
Author |
Message
|
mandar.kharat |
Posted: Wed Jun 10, 2009 8:02 am Post subject: Application Segmentation |
|
|
Newbie
Joined: 10 Jun 2009 Posts: 1
|
Hello,
I have coded the Application level segmentation in Java. The happy path works well and I am able to read all the messages once segemented and written to a queue.
The scenario when an exception occurs and the loop breaks and I have to write messages after reading the sequence from a status queue is not read correctly. I get a Reason code '2247'. I have posted my code here, after stripping the MQ configuration. Please can someone help me understand if I am missing something here.
I have pasted my code below, as I did not find any way to attach the files.
All your help appreciated. Any links to functional code would help.
Regards
Mandar.
################################################
Code: |
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import com.ibm.mq.MQEnvironment;
import com.ibm.mq.MQException;
import com.ibm.mq.MQGetMessageOptions;
import com.ibm.mq.MQMessage;
import com.ibm.mq.MQPutMessageOptions;
import com.ibm.mq.MQQueue;
import com.ibm.mq.MQQueueManager;
import com.ibm.mq.constants.MQConstants;
import com.sun.org.apache.xerces.internal.impl.dv.util.Base64;
public class MQSegmentation {
/* Removed the MQ Configuration, please add that
* */
// Set up the options on the queue we wish to open...
protected int openOptions = MQConstants.MQOO_OUTPUT + MQConstants.MQOO_FAIL_IF_QUIESCING + MQConstants.MQMF_SEGMENTATION_ALLOWED;
protected MQQueueManager qMgr;
protected MQQueue brass_read_stat_queue;
protected MQQueue brass_write_stat_queue;
protected MQQueue brass_test_queue;
protected String groupId;
protected int seqNum;
protected int bufferSize = 524288;
protected int commitpoint=2;
protected String grpConst = "groupId";
protected String msgSeqNumConst = "messageSequenceNumber";
protected String filePath = "C:/apps_data_01/brass/billpay/R716/translated/testxml.xml";
@SuppressWarnings("unchecked")
protected MQSegmentation(){
MQEnvironment.hostname = hostName;
MQEnvironment.channel = channel;
MQEnvironment.port = port;
MQEnvironment.properties.put(MQConstants.TRANSPORT_PROPERTY, MQConstants.TRANSPORT_MQSERIES);
try {
//Create a connection to the queue manager
qMgr = new MQQueueManager(qmanager);
} catch (MQException e) {
e.printStackTrace();
}
}
protected void openQueues(){
try {
System.out.println("Opening queues...");
brass_test_queue = qMgr.accessQueue(testQueue, openOptions);
brass_read_stat_queue = qMgr.accessQueue(readStatusQueue, openOptions);
brass_write_stat_queue = qMgr.accessQueue(writeStatusQueue, openOptions);
} catch (MQException e) {
System.out.println("Error occured in opening the queues");
e.printStackTrace();
}
}
protected void closeQueues(){
try {
System.out.println("Closing queues...");
brass_read_stat_queue.close();
brass_write_stat_queue.close();
brass_test_queue.close();
} catch (MQException e) {
System.out.println("Error occured in closing the queues");
e.printStackTrace();
}
}
@SuppressWarnings({"deprecation", "unchecked" })
protected Map getStatusQueue(MQQueue brass_stat_queue) throws MQException{
Map<String, Integer> getMap = new HashMap();
MQGetMessageOptions status_gmo = new MQGetMessageOptions();
status_gmo.options = status_gmo.options + MQConstants.MQGMO_SYNCPOINT;
status_gmo.options = status_gmo.options + MQConstants.MQGMO_WAIT;
status_gmo.options = status_gmo.options + MQConstants.MQGMO_FAIL_IF_QUIESCING;
MQMessage getMsg = new MQMessage();
try{
brass_stat_queue.get(getMsg, status_gmo);
getMap = (HashMap)getMsg.readObject();
qMgr.commit();
}catch(Exception ex){
MQException mqe = (MQException)ex;
if(mqe.getReason() == 2033){
System.out.println("No Messages Read");
}
}
System.out.println("Got status Map : "+getMap);
return getMap;
}
@SuppressWarnings({"deprecation", "unchecked" })
private void putStatusQueue(String groupId, int seqNum, MQQueue brass_stat_queue) throws IOException, MQException{
Map<String, Integer> putMap = new HashMap();
MQMessage putMsg = new MQMessage();
MQPutMessageOptions status_pmo = new MQPutMessageOptions();
status_pmo.options = MQConstants.MQPMO_SYNCPOINT;
putMap.put(groupId, seqNum);
putMsg.writeObject(putMap);
System.out.println("Put status Map : "+putMap);
brass_stat_queue.put(putMsg, status_pmo);
qMgr.commit();
}
protected int getOffset(MQQueue brass_stat_queue) throws MQException{
Map map = readStatusMap(brass_stat_queue);
int offset=0;
if(!map.isEmpty()){
offset=bufferSize*seqNum;
return offset;
}else{
return offset;
}
}
protected Map readStatusMap(MQQueue brass_stat_queue) throws MQException{
Map map = getStatusQueue(brass_stat_queue);
Set keys = map.keySet();
Object[] groupArray = keys.toArray();
if(groupArray.length!=0){
groupId = (String)groupArray[0];//convertByteToStr(((String)groupArray[0]).getBytes());
System.out.println("groupId = "+groupId);
seqNum = ((Integer)map.get(groupId)).intValue();
}
return map;
}
protected int updateUOW(int uow, MQMessage mqMsg, MQQueue brass_stat_queue) throws MQException, IOException{
System.out.println("Setting the sequence number and group Id");
seqNum = mqMsg.messageSequenceNumber;
groupId = convertByteToStr(mqMsg.groupId);//new String(mqMsg.groupId);
if(uow==commitpoint){
/* if(mqMsg.messageFlags != MQConstants.MQMF_LAST_MSG_IN_GROUP){
System.out.println("Commiting Unit Of Work");
qMgr.commit();
uow=1;
}
*/ System.out.println("Commiting Unit Of Work");
qMgr.commit();
uow=1;
//Get the Group and Seq num
/* System.out.println("Setting the sequence number and group Id");
seqNum = mqMsg.messageSequenceNumber;
groupId = convertByteToStr(mqMsg.groupId);//new String(mqMsg.groupId);
*/ System.out.println("updateUOW: Sequence number :"+seqNum+" Group Id :"+groupId);
getStatusQueue(brass_stat_queue);
putStatusQueue(groupId, seqNum, brass_stat_queue);
}else{
uow=uow+1;
}
return uow;
}
protected String convertByteToStr(byte[] byteArr) throws NullPointerException{
/* StringBuffer buffer = new StringBuffer();
for(int i=0; i<byteArr.length; i++){
buffer.append((char)byteArr[i]);
}
return buffer.toString();*/
String s = Base64.encode(byteArr);
return s;
}
protected byte[] convertStrTByte(String s) throws NullPointerException, IOException{
return Base64.decode(s);
}
}
################################################
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.FileInputStream;
import com.ibm.mq.MQException;
import com.ibm.mq.MQMessage;
import com.ibm.mq.MQPutMessageOptions;
import com.ibm.mq.constants.MQConstants;
public class MQSegmentationWrite extends MQSegmentation{
@SuppressWarnings({ "deprecation", "unchecked" })
public MQSegmentationWrite(){
super();
}
public static void main(String[] args) {
MQSegmentationWrite testMqSegmentWrite = new MQSegmentationWrite();
testMqSegmentWrite.testWriteSegmentation();
}
@SuppressWarnings({ "unchecked", "deprecation" })
private void testWriteSegmentation() {
try{
FileInputStream reader = new FileInputStream(new File(filePath));
byte[] inputFileByteArr = new byte[reader.available()];
reader.read(inputFileByteArr);
// Now specify the queue that we wish to open, and the open options...
openQueues();
MQMessage mqMsg = new MQMessage();
MQPutMessageOptions pmo = new MQPutMessageOptions();
int ii=0;
try {
long offset=0;
int ioStreamOffset = getOffset(brass_write_stat_queue);
if(ioStreamOffset!=0){
//mqMsg.setBytesProperty(grpConst, convertStrTByte(groupId));
//mqMsg.setIntProperty(msgSeqNumConst, seqNum);
mqMsg.groupId = convertStrTByte(groupId);
mqMsg.messageSequenceNumber = seqNum+1;
}
System.out.println("ioStreamOffset= "+ioStreamOffset);
ByteArrayInputStream inputStream = new ByteArrayInputStream(inputFileByteArr, ioStreamOffset, inputFileByteArr.length);
System.out.println("Start of message write.....");
int uow=1;
do{
byte[] segmentedByteArr = null;
System.out.println("available="+inputStream.available());
if(!(bufferSize>=inputStream.available())){
segmentedByteArr = new byte[bufferSize];
inputStream.read(segmentedByteArr, 0, bufferSize);
if(offset==0 && ioStreamOffset!=0){
// Set the MQPMO_LOGICAL_ORDER for the subsequent messages; following IBM documentation
pmo.options = pmo.options + MQConstants.MQPMO_SYNCPOINT;
}else{
pmo.options = MQConstants.MQPMO_LOGICAL_ORDER;
pmo.options = pmo.options + MQConstants.MQPMO_SYNCPOINT;
}
mqMsg.messageFlags = MQConstants.MQMF_MSG_IN_GROUP;
//mqMsg.messageFlags = mqMsg.messageFlags + MQConstants.MQMF_SEGMENT;
// Write message to queue
System.out.println("Writing message to queue");
mqMsg.format = MQConstants.MQFMT_STRING;
mqMsg.write(segmentedByteArr);
brass_test_queue.put(mqMsg, pmo);
uow = updateUOW(uow, mqMsg, brass_write_stat_queue);
offset=offset+bufferSize;
ii++;
/* if(ii==6){
System.out.println("Explicitly breaking the loop ....... :"+ii);
break;
}*/
}else{
mqMsg.messageFlags = MQConstants.MQMF_LAST_MSG_IN_GROUP;
//mqMsg.messageFlags = mqMsg.messageFlags + MQConstants.MQMF_LAST_SEGMENT;
pmo.options = MQConstants.MQPMO_LOGICAL_ORDER;
pmo.options = pmo.options + MQConstants.MQPMO_SYNCPOINT;
segmentedByteArr = new byte[inputStream.available()];
inputStream.read(segmentedByteArr, 0, inputStream.available());
// Write message to queue
System.out.println("Writing last message to queue");
mqMsg.write(segmentedByteArr);
brass_test_queue.put(mqMsg, pmo);
System.out.println("Commiting Last Unit Of Work");
// Updating the sequence number
updateUOW(commitpoint, mqMsg, brass_write_stat_queue);
// Remove the latest status message from the queue
// as all the messages are committed
System.out.println("Clearing the status queue");
getStatusQueue(brass_write_stat_queue);
// Uncomment the below line to set a status
//putStatusQueue(groupId, 5);
}
}while(inputStream.available()!=0);
System.out.println("End of writing");
} catch (MQException e) {
e.printStackTrace();
if (e instanceof MQException) {
MQException mqe = (MQException) e;
if (mqe.completionCode == 2 && mqe.reasonCode == 2033) {
// no message to read
System.out.println("No message read");
}
}
}
// Close the queue and qmanager...
closeQueues();
qMgr.disconnect();
}catch (Exception ex){
System.out.println("An error occurred while reading the message " + ex.getMessage());
ex.printStackTrace();
}
}
}
################################################
import java.util.Map;
import com.ibm.mq.MQException;
import com.ibm.mq.MQGetMessageOptions;
import com.ibm.mq.MQMessage;
import com.ibm.mq.constants.MQConstants;
public class MQSegmentationRead extends MQSegmentation {
@SuppressWarnings({ "deprecation", "unchecked" })
public MQSegmentationRead(){
super();
}
public static void main(String[] args) {
MQSegmentationRead testMqSegmentRead = new MQSegmentationRead();
testMqSegmentRead.testReadSegmentation();
}
@SuppressWarnings("deprecation")
private void testReadSegmentation(){
try{
// Declare the gmo here to avoid the compile errors
MQGetMessageOptions gmo = new MQGetMessageOptions();
MQMessage mqMsg = new MQMessage();
openQueues();
Map statusQueue = readStatusMap(brass_read_stat_queue);
int uow=1;
int ii = 0;
boolean initCheck=false;
boolean firstRead=false;
boolean checkGmo=false;
//do{
do {
try{
System.out.println("statusQueue.isEmpty() = "+statusQueue.isEmpty());
if(!statusQueue.isEmpty() && !initCheck){
gmo = new MQGetMessageOptions();
gmo.options = MQConstants.MQGMO_COMPLETE_MSG;
gmo.options = gmo.options + MQConstants.MQGMO_SYNCPOINT;
gmo.options = gmo.options + MQConstants.MQGMO_WAIT;
//gmo.options = gmo.options + MQConstants.MQGMO_ALL_SEGMENTS_AVAILABLE;
gmo.matchOptions = gmo.matchOptions + MQConstants.MQMO_MATCH_GROUP_ID;
gmo.matchOptions = gmo.matchOptions + MQConstants.MQMO_MATCH_MSG_SEQ_NUMBER;
//readStatusMap();
System.out.println("Message Read sequence number :"+seqNum+" Group Id :"+groupId);
System.out.println("Bytes $$$$$$ "+new String(convertStrTByte(groupId)));
//mqMsg.setBytesProperty(grpConst, convertStrTByte(groupId));
mqMsg.groupId = convertStrTByte(groupId);
//mqMsg.setIntProperty(msgSeqNumConst, seqNum);
if(firstRead==false){
seqNum=0;
firstRead=true;
}
mqMsg.messageSequenceNumber = seqNum+1;
initCheck=true;
}else{
readStatusMap(brass_read_stat_queue);
//if(checkGmo==false){
gmo = new MQGetMessageOptions();
gmo.options = MQConstants.MQGMO_LOGICAL_ORDER;
gmo.options = gmo.options + MQConstants.MQGMO_WAIT;
gmo.options = gmo.options + MQConstants.MQGMO_COMPLETE_MSG;
gmo.options = gmo.options + MQConstants.MQGMO_SYNCPOINT;
//gmo.options = gmo.options + MQConstants.MQGMO_ALL_SEGMENTS_AVAILABLE;
if(ii!=0){
gmo.matchOptions = gmo.matchOptions + MQConstants.MQMO_MATCH_GROUP_ID;
}
//}else{
//checkGmo=false;
//}
//gmo.options = gmo.options + MQConstants.MQMF_MSG_IN_GROUP;
//gmo.options = gmo.options + MQConstants.MQGMO_MSG_UNDER_CURSOR;
/* if(ii==0){
gmo.matchOptions = gmo.matchOptions + MQConstants.MQMO_NONE;
}else{
gmo.matchOptions = gmo.matchOptions + MQConstants.MQMO_MATCH_GROUP_ID;
//gmo.matchOptions = gmo.matchOptions + MQConstants.MQMO_MATCH_MSG_SEQ_NUMBER;
}*/
mqMsg.groupId = convertStrTByte(groupId);
/* if(mqMsg.groupId!=null)
System.out.println("mqMsg.groupId="+new String(mqMsg.groupId));*/
mqMsg.messageSequenceNumber = seqNum;
System.out.println("Message Read sequence number :"+seqNum+" Group Id :"+groupId);
//System.out.println("Bytes $$$$$$ "+new String(convertStrTByte(groupId)));
}
System.out.println("Reading test queue...");
brass_test_queue.get(mqMsg, gmo);
uow = updateUOW(uow, mqMsg, brass_read_stat_queue);
// Testing the resumption of Group
ii++;
/* if(ii==4){
System.out.println("Explicitly breaking the loop .......");
break;
}*/
System.out.println("Completed reading test queue...");
System.out.println("gmo.groupStatus="+gmo.groupStatus);
if(gmo.groupStatus == MQConstants.MQGS_LAST_MSG_IN_GROUP)
getStatusQueue(brass_read_stat_queue);
} catch (MQException e) {
//e.printStackTrace();
if (e instanceof MQException) {
MQException mqe = (MQException) e;
if (mqe.completionCode == 2 && mqe.reasonCode == 2033) {
// no message to read
System.out.println("No message read");
}
if (mqe.completionCode == 2 && mqe.reasonCode == 2247) {
System.out.println("Setting GMO..");
gmo = new MQGetMessageOptions();
//gmo.options = gmo.options + MQConstants.MQGMO_SYNCPOINT;
//gmo.matchOptions = gmo.matchOptions + MQConstants.MQMO_MATCH_GROUP_ID;
checkGmo=true;
}
}
}
} while (gmo.groupStatus != MQConstants.MQGS_LAST_MSG_IN_GROUP);
// Close the queue and qmanager...
closeQueues();
qMgr.disconnect();
//}while(true);
}catch (Exception ex){
System.out.println("An error occurred while reading the message " + ex.getMessage());
ex.printStackTrace();
}
}
}
|
|
|
Back to top |
|
 |
|
|
 |
|
Page 1 of 1 |
|
You cannot post new topics in this forum You cannot reply to topics in this forum You cannot edit your posts in this forum You cannot delete your posts in this forum You cannot vote in polls in this forum
|
|
|
|