|
前言
最近碰到很多这样的案例,即需要实现系统和MQ的互联。以前已经有一些文章论述了如何使用WebLogic Server的JMS Bridge来连接MQ,也有使用WebLogic Server内置的Foreign JMS Server来实现的,但是这需要编写很多代码如MDB来侦听消息,JMS客户端程序来发送消息等。其实BEA的SOA基础架构软件企业服务总线AquaLogic ESB能方便实现这样的功能,且无需编程。利用ESB还有一个好处是可以直接使用XML接口,而前者得直接编程面对消息解析。 文章还对如何实现传递中文XML做了详细的解释,同时也加入了如何利用XQuery技术快速实现数据格式的转换。
基于ESB的MQ连接方案
概述
BEA ESB 2.6RP1加入了Native MQ Transport的新特性,使得BEA ESB可以直接与IBM MQ进行消息交换。可以通过Business 服务向MQ发送消息,支持单向发送和发送等待返回两种模式;可以通过Proxy服务从MQ取消息,支持单向取消息和取消息消费以后再放入MQ返回队列两种模式。
使用ESB Native MQ Transport有几个好处:
- 可以读取和产生MQ Message,如果直接使用MQ JMS接口无法直接设置某些header属性
- 支持发送和收取MQ receipt message
- 内置在ESB,无需额外配置如JMS Bridge或Foreign JMS Server等。
关键特性:
- 支持Inbound和Outbound连接
- 支持IBM MQ5.3和6.0
- 可以处理所有MQMD(MQ message descriptor)头信息
- 支持Biding模式(ESB与MQ装在相同机器上)和TCP模式(ESB可与MQ装在不同机器上)
- 支持单向和双向SSL(用于TCP模式)
Native MQ Transport目前支持消息服务(Message service)和任意XML服务(Any xml serivce),根据不同业务场景可以选择合适的服务模式。任意XML服务支持将XML作为服务的调用和返回接口,对于传递数据非常方便,下面的例子即是讲解如何使用任意XML服务把IBM MQ集成到BEA AquaLogic ESB上。
场景描述

- 代理服务Send2MQ_withResponse发起请求,将xml request消息路由到业务服务Send2MQ_withReponse_Business
- Business服务Send2MQ_withReponse_Business使用Native MQ Transport将消息路由到MQ接收队列esb.mqReceiveQueue,随后即刻侦听esb.mqSendQueue队列。
- 代理服务MQMessageResponder,从esb.mqReceiveQueue取消息,进行业务处理和数据格式转换,将结果xml发送到esb.mqSendQueue
- 业务服务Send2MQ_withReponse_Business侦听的esb.mqSendQueue队列有了先前消息的返回结果,该服务接收到返回消息
- 代理服务Send2MQ_withResponse得到经过处理的返回结果
具体步骤
1、设置 MQ Server
(1)创建队列管理器QM_YourIP(如QM_jizhou01),创建Listner监听1414端口, 创建服务器连接通道(如BRIDGE.CHANNEL传输协议TCP)
(2)创建本地队列esb.mqReceiveQueue, esb.mqSendQueue, 选择“持久”。如图:

(3)编辑JMSAdmin.config,如下:
#INITIAL_CONTEXT_FACTORY=com.sun.jndi.ldap.LdapCtxFactory
INITIAL_CONTEXT_FACTORY=com.sun.jndi.fscontext.RefFSContextFactory
#INITIAL_CONTEXT_FACTORY=com.ibm.ejs.ns.jndi.CNInitialContextFactory
#INITIAL_CONTEXT_FACTORY=com.ibm.websphere.naming.WsnInitialContextFactory
…
#PROVIDER_URL=ldap://polaris/o=ibm,c=us
PROVIDER_URL=file:/C:/bea/IBM/JNDI
#PROVIDER_URL=iiop://localhost/
…
SECURITY_AUTHENTICATION=none
… 运行JMSAdmin.bat
创建binding的JNDI定义:
DEFINE XAQCF(esb.mqQCFXA)
DEFINE Q(esb.mqSendQueue) QUEUE(esb.mqSendQueue) QMGR(QM_jizhou01)
DEFINE Q(esb.mqReceiveQueue) QUEUE(esb. mqReceiveQueue) QMGR(QM_jizhou01)
DIS CTX
END
…
2、设置 BEA AquaLogic ESB:
(1) 登录ESB 控制台hhttp://localhost:7021/sbconsole
创建Project和文件夹,如图: 
(2) 在resources文件夹下创建 MQ Connection
 (注意,需要先取得编辑锁,左上角点击Edit) 然后定义MQ Connection:名称为MQResource, 如图:  我们看到,这里没有更多的属性设置,所以都是使用MQ默认设置的,如CCSID为1381。
(3) 在Business Services目录下创建Send2MQ_withResponse_Business,
选择Any XML Service, 选择mq作为连接协议,如图:  End Point URL为 mq://esb.mqReceiveQueue?conn=MQTransport/resources/MQResource
具体配置如图: 
注意返回选择Bytes作为消息类型,MQCorrelationID为返回模式,返回等待的结果队列为esb.mqSendQueue。
(4) 在Proxy Services文件夹下创建Proxy Servcie,名称为:Send2MQwithResponseProxy,创建该代理的目的是为封装发送消息的业务服务,这样本来是直接调用MQ协议的服务变为基于http的服务。具体配置如图:
 为Proxy Service 添加Route节点:  编辑该节点,添加Action  路由服务选择刚才创建的Business服务Send2MQ_withResponse_Business, 
到这里,本来应该可以了;但是我们在测试中可以发现,在传输中文XML时发现到MQ队列里变成了乱码。经过分析,原来MQ服务器端的字符集设置一般为默认设置1381,而使用同样的设置发送中文就出现乱码了。UTF-8在MQ的字符集中的编号为1208,因此我们在把中文XML消息发送到MQ队列之前,要进行CCSID设置。方法就是在路由节点的Request动作里,即发送前设置。  然后选择Outbound Request, 记住调用Business服务是outbound对象。 设置header信息,如图:  我们可以看到BEA ESB在这方面是非常强大的,可以设置每一个header信息而无需编码。
测试: 先确认能否发送XML消息到MQ队列。点击测试Proxy服务的按钮,使用测试消息:
<?xml version="1.0" encoding="UTF-8"?>
<ns0:Request xmlns:ns0="hhttp://www.example.org/wlsRequestSchema">
<Item>
<companyID>001</companyID>
<companyName>公司</companyName>
<EmployeeCount>10</EmployeeCount>
<Total>199</Total>
</Item>
</ns0:Request>  点击执行,该测试程序会一直等待结果(因为你已经设置等待返回结果了)。我们转到MQ服务器端,看看消息是否已经到达。  可以看到,消息队列esb.mqReceiveQueue已经有了消息,且消息体正确反应出了中文信息。 至此,从ESB发送消息到MQ已经完成。下面,我们要设计如何从MQ队列中取消息,并如何消费消息。
(5) 在Proxy Services文件夹下创建Proxy 服务MQMessageResponder,创建该服务的目的是从MQ队列中取消息,然后进行处理,再把经过处理的消息放回指定队列中。
创建Any XML Service,mq为传输协议,具体如图: 
此时,该服务只是从esb.mqReceiveQueue中取出消息,没有做如何处理又放入了esb.mqSendQueue队列。 我们可以做一个简单的格式转换处理,即将源xml映射到结果xml中,并增添一个字段MQProcessResult,值为“MQ处理完毕”  实际上,这个代理服务是模拟了在MQ服务器端对消息的处理,也可以理解为BEA ESB可以直接消费MQ队列的消息。 用BEA提供的Workshp工具,新建一个XQueryProject  创建两个xml schema。一个是源schema:
<?xml version="1.0" encoding="UTF-8"?>
<xs:schema targetNamespace="hhttp://www.example.org/wlsRequestSchema" xmlns:xs="hhttp://www.w3.org/2001/XMLSchema">
<xs:element name="Request">
<xs:complexType >
<xs:sequence>
<xs:element name="Item" minOccurs="0" maxOccurs="unbounded">
<xs:complexType >
<xs:sequence>
<xs:element name="companyID" type="xs:string" minOccurs="1" maxOccurs="1"/>
<xs:element name="companyName" type="xs:string" minOccurs="1" maxOccurs="1"/>
<xs:element name="EmployeeCount" type="xs:string" minOccurs="1" maxOccurs="1"/>
<xs:element name="Total" type="xs:string" minOccurs="0"/>
</xs:sequence>
</xs:complexType>
</xs:element>
</xs:sequence>
</xs:complexType>
</xs:element>
</xs:schema>
另外是目标schema:
<?xml version="1.0" encoding="UTF-8"?>
<xs:schema targetNamespace="hhttp://www.example.org/wlsResponseSchema" xmlns:xs="hhttp://www.w3.org/2001/XMLSchema">
<xs:element name="Response">
<xs:complexType >
<xs:sequence>
<xs:element name="Item" minOccurs="0" maxOccurs="unbounded">
<xs:complexType >
<xs:sequence>
<xs:element name="companyID" type="xs:string" minOccurs="1" maxOccurs="1"/>
<xs:element name="companyName" type="xs:string" minOccurs="1" maxOccurs="1"/>
<xs:element name="EmployeeCount" type="xs:string" minOccurs="1" maxOccurs="1"/>
<xs:element name="Total" type="xs:string" minOccurs="0" maxOccurs="1"/>
<xs:element name="MQProcessResult" type="xs:string" minOccurs="0"/>
</xs:sequence>
</xs:complexType>
</xs:element>
</xs:sequence>
</xs:complexType>
</xs:element>
</xs:schema>
再新建一个XQuery转换myXQuery.xq,具体如图  设置MQProcessResult为字符串常量:“MQ处理完毕”。 myXQuery.xq的源码为:
declare namespace xf = "hhttp://tempuri.org/MyXQueryProject/xquery/myXQuery/";
declare namespace ns-1 = "hhttp://www.example.org/wlsRequestSchema";
declare namespace ns-2 = "hhttp://www.example.org/wlsResponseSchema"; declare function xf:myXQuery($request1 as element(ns-1:Request))
as element(ns-2:Response) {
<ns-2:Response>
{
for $Item in $request1/Item
return
<Item>
<companyID>{ data($Item/companyID) }</companyID>
<companyName>{ data($Item/companyName) }</companyName>
<EmployeeCount>{ data($Item/EmployeeCount) }</EmployeeCount>
{
for $Total in $Item/Total
return
<Total>{ data($Total) }</Total>
}
<MQProcessResult>MQ处理完毕</MQProcessResult>
</Item>
}
</ns-2:Response>
}; declare variable $request1 as element(ns-1:Request) external; xf:myXQuery($request1)
在ESB控制台中,引入这两个schema和xquery。如图导入到文件夹 Transformation ,如图: 
现在我们可以在MQMessageResponder中进行数据格式转换了。 如图,添加pipeline和请求节点  编辑state1,将$body/wls:Request内容赋值到变量request中,然后对request为参数调用myXQuery, 将返回的转换结果赋值到response变量中,再用response的内容代替$body/wls:Request。这样即起到了转换效果。如图:  其中第二个Assign 的XQuery调用的参数设置为: 
测试: 直接测试MQMessageResponder,如果能得到正确结果,则再测试Send2MQwithResponseProxy。 
总结
到这里,我们已经验证BEA ESB在处理与MQ的交互时是非常方便好用的,无需任何编码,且能方便设置消息的各种属性。BEA ESB的特性其实还远不止如此,一但服务封装完毕,就可以很多其他的传输接口来封装,比如我们可以设置File为传输协议的Proxy服务,然后在其内部配置路由,讲消息转发到往MQ发送消息的Business服务,这样我们就可以很容易地定制出这样的逻辑,即侦听某个文件夹,只要符合格式的xml文件到达,即可自动调用发送逻辑,将消息处理后发送到MQ,同时可以指定返回队列并异步侦听。一但MQ服务器端消息处理完毕,将消息转到指定队列后,该侦听逻辑即可获得处理过的消息。
|