|
零售行业中,数据传输非常重要。本文介绍了一种基于IBM WebSphere MQ 和WebSphere MQ Everyplace的有效可靠的零售行业数据传输解决方案,不仅将开发人员从后台的复杂数据传输中解脱出来,还大量简化项目实施过程中的配置工作。
引言
在零售行业的解决方案中,在门店,总店之间进行数据传输是非常普遍和重要的。为了保证数据传输的实时性,可靠性,开发人员往往花大量的时间和精力在考虑后台网络传输的各种发生情况,比如传输过程中的网络故障等,而且需要找到一种有效可靠的数据自动重传的方式。
这篇文章介绍了一种基于IBM WebSphere MQ 和WebSphere MQ Everyplace的有效的可靠的数据传输的解决方案。在零售行业中,总店需要和各个门店进行通信,数据量相对较大,因此,我们在总店部署MQ和MQSeries Everyplace Server。而由于门店的通信量比较少,而且同时考虑到门店的数目比较多,对应的店内POS机的硬件设施的条件有限,所以我们在门店部署MQSeries Everyplace client。通过本文介绍的这个解决方案,开发人员可以从后台的复杂数据传输中解脱出来,此外,由于所有数据传输需要的MQ Everyplace的构件和通道都会由解决方案中的Java应用自动创立,可以大量简化项目实施过程中的配置工作。
该解决方案来自于零售行业数据传输的经验总结,已经被使用在多个零售项目上,并且获得了客户的好评。解决方案分为两部分,第一部分即本文提供了对该解决方案的大致介绍,以及介绍如何通过Java应用创建从数据发送源到目的地之间所需的所有构件和通道。第二部分介绍了门店与总店之间的数据传输,以及该解决方案的特性。
1 介绍
该数据传输的解决方案分为两部分,第一部分提供了对该解决方案的大致介绍,以及介绍如何通过Java应用创建从数据发送源到目的地之间所需的所有构件和通道。本文是第二部分,除了详细介绍数据传输,还会介绍可选的数据传输的多种模式,如何对大量的MQ Everyplace客户端进行统一便捷的管理,如何设置消息的过期时间,取出消息的多种方式,事务管理等。
2 准备
在进行第二部分之间,我推荐你先读该解决方案的第一部分: “零售行业的数据传输解决方案 第一部分:创建构件和通道”。在第一部分中会介绍在总店和门店中如何建立构件和通道的。
3 特性介绍
3.1 门店、总店之间的消息传输
3.1.1 从总店传输消息到门店
下面是从总店的MQ发送消息到门店的MQ Everyplace
1.打开总店的MQ中的远程队列
请参看下面的Tag11标明处的示例代码
2.创建MQMessage 消息对象,把要发送的消息写入其中
请参看下面的Tag12标明处的示例代码
3.发送消息
请参看下面的Tag13标明处的示例代码
示例代码:
……
- private static void sendMessage(String hostname,int port,String channelName,String qMName,int CCSID,String message,String qName) throws MQException
- {try{
- //Tag 11 : 打开总店的MQ中的远程队列
- MQEnvironment.properties.put(MQC.TRANSPORT_PROPERTY,MQC.TRANSPORT_MQSERIES);
- MQEnvironment.hostname=hostname;
- MQEnvironment.port= port;
- MQEnvironment.channel=channelName;
- MQEnvironment.CCSID=CCSID;
- qMqr=new MQQueueManager(qMName);
- int openOptions=MQC.MQOO_OUTPUT;
- system_default_remote_queue=qMqr.accessQueue(qName,openOptions);
- //Tag 12 : 创建MQMessage 消息对象,把要发送的消息写入其中。
- MQMessage mqMessage = new MQMessage();
- try { mqMessage.writeUTF(message);}
- catch (IOException e) {
- e.printStackTrace();}
- //Tag 13 : 发送消息
- MQPutMessageOptions pmo = new MQPutMessageOptions();
- system_default_remote_queue.put(mqMessage,pmo);
- }catch(MQException mqe)
- {throw mqe;}}}
3.1.2从门店发送消息到总店
下面是从门店的MQ Everyplace发送消息到总店的MQ
1. 创建 MQeMsgObject 对象, 把要发送的消息写入其中
请参看下面的Tag14标明处的示例代码
2. 发送消息, 调用 MQeQueueManager 的API : putMessage(),其中定义发送目的地的queue manager的名字,以及门店中Asynchronous Queue 的名字
请参看下面的Tag15标明处的示例代码
示例代码:
- private static void putMessage(MQeQueueManager client,String message, String qMName, String qName) {
- MQeMsgObject msg;
- try {
- /*Tag 14 : 创建 MQeMsgObject 对象, 把要发送的消息写入其中*/
- msg=new MQeMQMsgObject();
- msg.putAscii("Message", message);
- /*Tag 15 : 发送消息, 调用 MQeQueueManager 的API : putMessage(),其中定义发送目的地的queue manager的名字,以及门店中Asynchronous Queue 的名字*/
- client.putMessage(qMName, qName, msg, null, 0);
- Thread.sleep(1000);
- client.triggerTransmission();
- } catch (Exception e) {
- …… } }
3.2 门店、总店之间的数据对象传输
3.2.1 门店和总店之间发送数据对象
在客户的应用中,他们常常从数据库中取出数据对象,做相应的数据处理后,希望可以直接把数据对象发送给目的地,这样,应用无需把数据对象转化为文件的格式,这样可以提高整个系统的性能。当然,如果客户希望以文件的模式进行传输,该解决方案也是支持的。
3.2.1.1 从总店向门店直接发送数据对象
下面是从总店的MQ发送数据对象到门店的MQ Everyplace
1.打开总店的MQ中的远程队列
请参看下面的Tag16标明处的示例代码
2.把传输的对象转化为bytes的格式.
请参看下面的Tag17标明处的示例代码
3. 发送消息
请参看下面的Tag18标明处的示例代码
示例代码:
- ……
- int openOptions = MQC.MQOO_OUTPUT;
- system_default_remote_queue =
- //Tag 16 : 打开总店的MQ中的远程队列.
- qMqr.accessQueue("FREDAPP.Q", openOptions, null, null, null);
- //Tag 17 : 把传输的对象转化为bytes的格式.
- MQMessage retrievedMessage = new MQMessage();
- TranObj trnObj = new TranObj();
- ……
- byte[] data = objectToBytes(trnObj);
- //Tag 18: 发送消息
- retrievedMessage.write(data);
- MQPutMessageOptions pmo = new MQPutMessageOptions();
- system_default_remote_queue.put(retrievedMessage, pmo);
- system_default_remote_queue.close();
- …}
- private static byte[] objectToBytes(TranObj obj) throws IOException {
- //Tag 17
- java.io.ObjectOutputStream out;
- java.io.ByteArrayOutputStream bs;
- bs = new java.io.ByteArrayOutputStream();
- out = new java.io.ObjectOutputStream(bs);
- out.writeObject(obj);
- out.close();
- return bs.toByteArray();
- }
3.2.1.2 从门店向总店直接发送数据对象
1. 在门店创建MQeMQMsgObject 对象,用以传输数据对象
请参看下面的Tag19标明处的示例代码
2. 把传输的对象转化为bytes的格式.
请参看下面的Tag20标明处的示例代码
3.从门店的MQ Everyplace的 AsynQ 发送数据对象
请参看下面的Tag21标明处的示例代码
- ……
- /*Tag 19: 通过使用MQeMQMsgObject类,从MQ Everyplace发送数据对象给MQ */
- MQeMQMsgObject msg=new MQeMQMsgObject();
- //得到需要传输的数据对象
- TranObj obj = new TranObj();
- ……
- //Tag 20 把对象转化为Bytes;
- byte[] data = objectToBytes(obj);
- msg.setData(data);
- //Tag 21: 在门店的MQ Everyplace,从AsynQ发送数据
- client.putMessage("BIGMQ", "FRED.Q", msg, null, 0);
- client.triggerTransmission();
- …}
- private static byte[] objectToBytes(TranObj obj) throws IOException {
- //Tag 20
- java.io.ObjectOutputStream out;
- java.io.ByteArrayOutputStream bs;
- bs = new java.io.ByteArrayOutputStream ();
- out = new java.io.ObjectOutputStream (bs);
- out.writeObject (obj);
- out.close ();
- return bs.toByteArray ();
- }
3.2.2 接受数据对象
3.2.2.1 从总店的MQ的队列接受数据对象
1.在总店创建MQMessage对象用来接收数据对象。
请参看下面的Tag22标明处的示例代码
2.从总店的本地队列中取出消息
请参看下面的Tag23标明处的示例代码
3.把消息转化为对象,在接收方的客户应用中直接使用
请参看下面的Tag24标明处的示例代码
示例代码:
- /*Tag 22: 在总店创建MQMessage对象用来接收数据对象.*/
- MQMessage retrievedMessage=new MQMessage();
- MQGetMessageOptions gmo = new MQGetMessageOptions();
- //Tag 23: 从总店的本地队列中取出消息
- system_default_local_queue.get(retrievedMessage,gmo);
- /*Tag 24: 把消息转化为对象,在接收方的客户应用中直接使用*/
- byte[] data = new byte[retrievedMessage.getDataLength()];
- retrievedMessage.readFully(data);
- TranObj temp = bytesToTranObj(data);
- private static TranObj bytesToTranObj(byte[] bytes) throws IOException, ClassNotFoundException {
- TranObj obj = null;
- java.io.ObjectInputStream is;
- java.io.ByteArrayInputStream bi;
- bi = new ByteArrayInputStream(bytes);
- is = new java.io.ObjectInputStream(bi);
- obj = (TranObj) is.readObject();
- is.close ();
- bi.close ();
- return obj; }
3.2.2.2 从门店的MQ Everyplace接收数据对象
1.打开queue manager: AsyncClientToMQ1的本地队列:SYSTEM.DEFAULT.LOCAL.QUEUE。
请参看下面的Tag25标明处的示例代码
2. 把消息转化为对象,在客户应用里直接使用。
请参看下面的Tag26标明处的示例代码
示例代码:
- MQeMQMsgObject msg;
- try {
- /* Tag 25 : 打开queue manager: AsyncClientToMQ1的本地队列:SYSTEM.DEFAULT.LOCAL.QUEUE */
- msg = (MQeMQMsgObject) client.getMessage("AsyncClientToMQ1", "SYSTEM.DEFAULT.LOCAL.QUEUE", null, null, 0);
- //Tag 31
- byte[] data = msg.getData();//ArrayOfByte("Message");
- //Tag 26: 把消息转化为对象
- mqe.util.TranObj obj = bytesToTranObj(data);//msg.getObject("Message");
- …
- }
- private static mqe.util.TranObj bytesToTranObj(byte[] bytes) throws IOException, ClassNotFoundException {
- ……}
3.3 配置 WebSphere MQ 以大规模部署 WebSphere MQ Everyplace 设备
当门店的数目及其之多时,比如上千个门店。如果每个门店都在总店的MQ上对应有相应的远程队列,那么管理总店MQ上的远程队列会是一件很复杂的事情。下面将告诉你如何解决这一问题:通过在总店只建立唯一的远程队列,该远程队列指向gateway上的本地队列,所有发给门店的消息会先到达gateway.而真实的消息目的地会置于消息体内.当消息到达gateway后,gateway会根据消息体中指定的消息目的地把消息发到对应的queue manager.
通过该解决方法,使得避免了为每个门店的queue manager建立远程队列。
此解决方法包括:
- 将消息发送到网关队列管理器,而不是目标队列管理器
- 对消息中的目标队列管理器和队列进行编码
- 拦截 MQ 传输队列和网关队列管理器之间的消息
- 替换 MQ 传输标头中的目标队列管理器和队列,以使网关不知道已发生过替换。替换是使用通道接收退出(MQ 客户端的一个功能)完成的。
图 1 解决方案体系结构
对于进一步的具体配置,请参看Dave Locke 和Lakshman Yatawara的文章:Configuring WebSphere MQ for large-scale deployment of WebSphere MQ Everyplace devices
3.4 取出消息的两种方法
1) 取出消息并删除它
1. 从Queue Manager 中取出消息并删除它
请参看下面的Tag27标明处的示例代码
2. 把传输的bytes转化成数据对象
请参看下面的Tag28标明处的示例代码
- private static void getObject(MQeQueueManager client) {
- MQeMQMsgObject msg;
- try {
- msg = (MQeMQMsgObject) client.getMessage("AsyncClientToMQ1", "SYSTEM.DEFAULT.LOCAL.QUEUE", null, null, 0); //Tag 27 : 从Queue Manager 中取出消息并删除它
- byte[] data = msg.getData();
- //Tag 28: 把传输的bytes转化成数据对象
- mqe.util.TranObj obj = bytesToTranObj(newData);
- …}
- private static mqe.util.TranObj bytesToTranObj(byte[] bytes) throws IOException, ClassNotFoundException {
- //Tag 28: 把传输的bytes转化成数据对象
- mqe.util.TranObj obj = null;
- java.io.ObjectInputStream is;
- java.io.ByteArrayInputStream bi;
- bi = new ByteArrayInputStream(bytes);
- is = new java.io.ObjectInputStream(bi);
- obj = (mqe.util.TranObj) is.readObject();
- is.close ();
- bi.close ();
- return obj;
- }
2) 仅仅读出消息,但并不删除它。你可以自行控制删除消息的时间
1. 读出消息集合
请参看下面的Tag29标明处的示例代码
2 . 从集合中取出数据对象
请参看下面的Tag30标明处的示例代码
- private static void getMessageByBrowser(MQeQueueManager client) {
- MQeMsgObject msg;
- try {
- //Tag 29: 读出消息集合
- MQeEnumeration msgEnum = null;
- msgEnum =client.browseMessages("AsyncClientToMQ","FREDAPP.Q",null, null,false);
- //Tag 30: 从集合中取出数据对象
- while (msgEnum.hasMoreElements()) {
- msg = (MQeMsgObject) msgEnum.nextElement();
- ……
- }
3.5 自动清除无用的消息
消息可以设置一个过期时间间隔,如果消息在队列中保持的时间超过定义的这个时间间隔,那么消息会被queue manager自动删除掉。
1. 设置MQ 消息的过期时间
- //该MQ消息将在120秒后过期
- retrievedMessage.expiry=120;
2. 为MQ Everyplace 消息设置过期时间
1)设置当某一段时间间隔后,消息过期
- //该MQ Everyplace消息将在20秒后过期
- msg.putInt( MQe.Msg_ExpireTime, 20000 );
2) 设置在某个具体的时间点,消息过期
- //该消息将在2007/08/15日的15:25分过期
- /* 创建一个新的消息 */
- MQeMsgObject msg = new MQeMsgObject();
- …
- Calendar calendar = Calendar.getInstance();
- calendar.set( 2007, 08, 15, 15, 25 );
- Date expiryTime = calendar.getTime();
- /* 给消息设置过期时间 */
- msg.putLong( MQe.Msg_ExpireTime, expiryTime.getTime() );
- /* 将消息放入队列 */
- client.putMessage("BIGMQ", "FRED.Q", msg, null, 0);
4. 解决方案特性总结
- 发送一次且仅发送一次
- 数据传输的准确性和精确性
- 如何对大量的MQ Everyplace客户端进行统一便捷的管理
- 可选的多种数据传输模式
- 取出消息的多种方式
- 事务管理
- 可增加规则来处理错误信息
- 自动清除无效消息
- 负载均衡以及解决单点故障
4.1 发送一次且仅发送一次
MQ和MQ Everyplace都有保证数据传输一次且仅有一次的特性。
1) 当你发送一个消息,你需要确保这个消息能完整安全的到达目的地。即便发送过程中网络出现了故障,数据也会在网络恢复后被MQ或者MQ Everyplace自动重传,而且保证数据发送一次且仅发送一次.
2) 当你发送完消息时,即便目标Server down机了,也就是说目标Server不处于侦听状态了,数据也会在目标Server恢复后被MQ或者MQ Everyplace自动重传,而且保证数据发送一次且仅发送一次..
4.2 数据的准确性和精确性
传输的数据的准确性和精确性是由MQ和MQ Everyplace的特性决定的,如果发送的数据包在传输的过程中发生了数据包丢失的问题,那么MQ或者MQ Everyplace会认为传输失败,会自动再重传消息的。
4.3 如何对大量的MQ Everyplace客户端进行统一便捷的管理
如果门店的数目很多,如果每个门店都在总店的MQ上对应有相应的远程队列,那么管理总店MQ上的远程队列会是一件很麻烦的事情。该解决方案只需要在总店建立唯一的远程队列,该远程队列指向gateway上的本地队列,所有发给门店的消息会先到达gateway.而真实的消息目的地会置于消息体内.当消息到达gateway后,gateway会根据消息体中指定的消息目的地把消息发到对应的queue manager.
通过该解决方法,使得避免了为每个门店的queue manager建立远程队列,以方便总店的管理。
4.4 传输文件或者直接传输数据对象
可选择的多种传输模式,你可以选择文件传输,也可以选择直接数据对象的传输。在客户的应用中,他们常常从数据库中取出数据对象,做相应的数据处理后,希望可以直接把数据对象发送给目的地,这样,应用无需把数据对象转化为文件的格式,这样可以提高整个系统的性能。
4.5 取出消息的两种方法
1) 取出消息并删除它
2)读出消息,但并不删除它。你可以自行控制删除消息的时间
4.6 事务管理
MQ的事务处理功能是指:它不仅能够作为资源管理者管理自身的资源,同时能够作为资源协调者,同其他资源管理者如数据库等协同工作,即在一个全局工作单元中,保持MQ自身资源和数据库资源的数据一致性。
MQ的事物处理功能分为两类:
1) 本地工作单元(Local unit of work),它是指参与事务处理的参与者只有MQ本身,例如,我们可以将两个put操作和一个get操作作为一个事物,通过本地工作单元支持,做到这三个动作要么全部成功要么全部回滚。
2) 全局工作单元(Global unit of work),它是指参与事务处理的参与者除了MQ之外,还有其他的资源管理者,如关系型数据库等,例如,我们可以将一个队列的get操作和数据库的写操作作为一个事物,通过本地工作单元支持,做到这两个动作要么全部成功要么全部回滚。
MQ Everyplace也可提供本地工作单元的事务处理能力,虽然它本身不提供全局事务处理,但是通过利用自行控制删除MQ Everyplace消息的时间,可以通过开发代码来实现它的全局事务处理能力。
4.7 自动清除无用的消息
消息可以设置一个过期时间间隔,如果消息在队列中保持的时间超过定义的这个时间间隔,那么消息会被queue manager自动删除掉
5 总结
本文介绍了一种有效的实时数据传输的解决方案,通过该解决方案,即便网络环境不佳的情况下,仍能保证门店和总店之间,门店和门店之间进行可靠而稳定数据传输。在数据传输的过程中,发送方无需关注接收方是否处于侦听状态(比如在线)或者网络是否顺畅,解决方案会保证数据发送一次且仅发送一次。
- 解决方案的特性
- 发送一次且仅发送一次
- 数据传输的准确性和精确性
- 如何对大量的MQ Everyplace客户端进行统一便捷的管理
- 可选的多种数据传输模式
- 可选择的对部分目的地进行传输
- 取出消息的多种方式
- 事务管理
- 可增加规则来处理错误信息
- 自动清除无效消息
- 负载均衡以及解决单点故障
优势
该解决方案,可帮助开发人员从后台的复杂数据传输中解脱出来,此外,由于所有数据传输需要的MQ和MQ Everyplace的构件和通道都会由解决方案中的Java应用自动创立,可以大量简化项目实施过程中的配置工作。
参考资料
-
Configuring WebSphere MQ for large-scale deployment of WebSphere MQ Everyplace devices介绍当涉及大量 MQ Everyplace 设备时如何简化 WebSphere MQ 的配置。
-
配置 WebSphere MQ Everyplace Gateway V2 以获取最佳吞吐量提供在 WebSphere MQ 与 WebSphere MQ Everyplace 之间建立网关的指导和范例代码。
-
Websphere MQ Everyplace 和 Websphere MQ集成实践——描述一个用MQe与WMQ集成移动设备和企业内部系统。
- WebSphere MQ 信息中心:更多产品信息。
- WebSphere MQ Everyplace信息中心:更多产品信息。
|