使用WebSphere MQ Java和JMS API 对消息进行分组.doc

上传人:丁** 文档编号:1544269 上传时间:2019-10-25 格式:DOC 页数:10 大小:130KB
返回 下载 相关 举报
使用WebSphere MQ Java和JMS API 对消息进行分组.doc_第1页
第1页 / 共10页
使用WebSphere MQ Java和JMS API 对消息进行分组.doc_第2页
第2页 / 共10页
使用WebSphere MQ Java和JMS API 对消息进行分组.doc_第3页
第3页 / 共10页
点击查看更多>>
资源描述
转自http:/www.zuoyefeng.com/html/2006-09/212.htm本文介绍了 WebSphere MQ 中的消息组支持,以及如何利用该支持来提供逻辑消息排序和支持相关消息分组。并演示了如何使用 WebSphere MQ Java 类进行分组操作,和如何使用 JMS API 实现与此相同的功能。随后,本文还给出了一个建议解决方案,并说明了如何在 WebSphere Application Server 或其他 J2EE 应用服务器中异步接收消息组时如何应用此解决方案。消息组介绍IBM WebSphere MQ 并不能始终保证发送和接收应用程序间的消息的正确顺序。如果三条消息按照顺序 A B C 发送,可能不会按照相同的顺序到达(例如,如果中间网络将消息分布到集群中,然后再重新组合时)。但如果消息顺序对应用程序的正常工作非常重要又该如何呢?假设有这样的场景,消息 B 告知应用程序忽略前一个消息。如果消息以顺序 C B A 送达,则序列的意义将完全不同了。WebSphere MQ 通过消息分组来解决此问题。发送消息的应用程序可以指定其将消息 A、B 和 C 作为组的一部分发送。组中的每个消息都分配了一个序列号(从 1 开始)。然后,接收应用程序可以指定希望按照此逻辑顺序接收消息(与消息到达目的地的实际顺序相对)。现在,即使消息 B 或 C 首先到达,也不会将其立即传递给应用程序,因为它们的序列号不为 1。消息组还可用于另一个目的。有时候消息顺序可能并不重要,但可能要求将一个消息集合一起处理(在空间上和时间上)。例如,假定有一个应用程序在每次向在线购物车添加了物品后都会发送一条消息。购物车中的物品可能需要一起处理,或许要将其聚合到单个订单消息中。可以通过将消息放入到消息组中对此聚合进行管理。消息的接收者可以指定,在所有消息到达目的地之前,不希望接收组中的任何消息。在此场景中,在同一个位置接收所有消息也很重要。如果出于可伸缩性方面的原因,目的地有多个使用者,则务必将表示相同订单中的物品的所有消息发送到相同的使用者,而消息组就可以确保满足这一要求。消息组的概念与消息段不同,后者表示大型消息发送时被拆分为较小的消息,应在接收时将其重新组装为原始消息。消息组中的每个实体都是一个完整的消息。可以使用消息段对消息组内的消息进行拆分,但在本文中将不会考虑此选项。使用 WebSphere MQ Java API现在我们将讨论使用 WebSphere MQ Java API 发送和接收消息组的实际操作。发送消息组下面的清单 1 给出了使用 WebSphere MQ Java API 将包含五条消息的组发送到队列管理器 QM_host 上的队列 default 所需的代码:清单 1. 使用 WebSphere MQ Java API 发送消息组MQQueueManager queueManager = new MQQueueManager(QM_host);MQQueue queue = queueManager.accessQueue(default, MQC.MQOO_OUTPUT);MQPutMessageOptions pmo = new MQPutMessageOptions();pmo.options = MQC.MQPMO_LOGICAL_ORDER;for (int i = 1; i = 5; i+) MQMessage message = new MQMessage(); message.format = MQSTR; message.writeString(Message + i); if (i 5) message.messageFlags = MQC.MQMF_MSG_IN_GROUP; else message.messageFlags = MQC.MQMF_LAST_MSG_IN_GROUP; queue.put(message, pmo);queue.close();queueManager.disconnect();该示例首先连接到队列管理器,并打开用于进行输出的队列句柄。从消息组的角度而言,要注意的第一个重要方面是向 put 消息选项添加了约束 MQPMO_LOGICAL_ORDER。此值告知队列管理器,应用程序将把组中的每个消息按照序列顺序放入队列中,客户机在处理任何后续消息前会将一个组中的所有消息放置到其上。此代码随后循环五次,每次放置一个新消息。(消息格式设置为 MQSTR,因此我们可能稍后接收到 JMS 文本消息类型的消息。)对于前四条消息,设置了消息标志 MQMF_MSG_IN_GROUP,以指示相应的消息应属于当前组。第五条消息设置了消息标志 MQMF_LAST_MSG_IN_GROUP,以指示该消息是组中的最后一条消息。下一次放置具有 MQMF_MSG_IN_GROUP 标志的消息时,将自动开始一个新的组。示例最后关闭队列句柄并从队列管理器断开,从而结束。运行此代码时,通过使用 WebSphere MQ Explorer 浏览组消息,可得到图 1 所示的结果:图 1. 在 WebSphere MQ Explorer 中浏览消息组为每条消息分配了相同的 24 位组标识符,且分别具有从 1 到 5 的逻辑序列号。使用 MQPMO_LOGICAL_ORDER put 消息选项纯粹为了方便起见。应用程序可能不会使用此标志,而采用显式设置组标识符和序列号的方式。如果消息不按顺序发出,或和其他消息组穿插着发送,则有必要采取后一种方式。仍然应设置消息标志来指示消息属于某个组以及是否为组中的最后一条消息。此机制的另一个可用场景为组中的消息分散在很长的时间内进行传递。可能出现这样的情况,应用程序使用逻辑消息排序发送组中的开头的若干消息,然后系统出现故障。当应用程序重新启动时,可以继续处理该消息组,能在不进行逻辑排序的情况下发送后面的消息,只要显式地将组标识符设置为前面的消息所使用的组标识符并使用后续序列标识符即可。此时,可以仍然对后续消息使用逻辑消息排序。队列管理器将随后继续使用相同的组标识符,并在每次递增序列号。可以将消息组与事务结合使用。如果第一个消息放置在事务下,则必须将所有使用相同队列句柄的所有其他消息都放置于事务下。不过,每个消息并不一定要在相同的事务中。接收消息组我们已经以组的形式发送了消息,接下来我们希望采用相同的顺序接收这些消息。下面的清单 2 给出了如何使用 WebSphere MQ Java API 完成此任务的示例:清单 2. 使用 WebSphere MQ Java API 接收消息组MQQueueManager queueManager = new MQQueueManager(QM_host);MQQueue queue = queueManager.accessQueue(default, MQC.MQOO_INPUT_AS_Q_DEF);MQGetMessageOptions gmo = new MQGetMessageOptions();gmo.options = MQC.MQGMO_LOGICAL_ORDER | MQC.MQGMO_ALL_MSGS_AVAILABLE;gmo.matchOptions = MQC.MQMO_NONE;MQMessage message = new MQMessage();do queue.get(message, gmo); int dataLength = retrievedMessage.getDataLength(); System.out.println(message.readStringOfCharLength(dataLength); gmo.matchOptions = MQC.MQMO_MATCH_GROUP_ID; while (gmo.groupStatus != MQC.MQGS_LAST_MSG_IN_GROUP);queue.close();queueManager.disconnect();和前面一样,该代码首先连接到队列管理器,并打开队列句柄,但这次是为了使用缺省队列定义接收消息。我们指定两个 get 消息选择:MQGMO_LOGICAL_ORDER 指示我们希望按照逻辑顺序接收消息,即,应该首先接收序列号为 1 的消息,然后是序列号为 2 的消息,依此类推。第二个选项 MQGMO_ALL_MSGS_AVAILABLE 指示在组中的所有消息可用前,我们不希望接收其中的任何消息。此选项可防止在开始处理组中的消息时却发现后续消息尚未发送或尚未到达的情况。对于第一个 get,我们指定不需要任何匹配选项准备接收任何组中的第一条消息。对于后续迭代,我们均指定 MQMO_MATCH_GROUP_ID 选项,以指示我们只希望接收具有匹配组标识符的消息。我们将为每个迭代使用相同的消息对象,因此,对于第二个 get,将包含所接收到的第一条消息的组标识符。每个 get 操作都将更新 get 消息选项的组状态字段。当设置为 MQGS_LAST_MSG_IN_GROUP 时,我们就知道已经接收到了组中的所有消息。和前面的清单中一样,请确保在完成时进行清理工作,即关闭队列句柄并从队列管理器断开。使用 WebSphere MQ JMS API规范版本本文中的 JMS 示例使用来自 JMS 1.1 的统一域接口。不过,可以对其进行重新编写,以使用早期 WebSphere MQ JMS 版本中提供的点到点或发布/订阅接口。类似地,可以使用 EJB 2.0 部署描述符和 JMS 1.0.2b 接口来在 J2EE 1.3 应用服务器中使用下载部分提供的 MDB 示例。此时,您可能会问,为什么这些示例都使用 WebSphere MQ Java API 在这个标准盛行的时代,我们是不是应该使用 Java Message Service (JMS) API 不过,对于大部分标准规范,JMS 代表了消息传递系统所支持的功能的最低要求。因此,并非 WebSphere MQ 所支持的所有行为都可通过此 API 进行表示,而消息组正是其中之一。JMS 规范确实定义了两个分别名为 JMSXGroupID 和 JMSXSeqNum 的属性,并指定这两个属性分别表示消息所属的组的标识符和在该组中的序列号。不过,JMS 规范未提供任何使用这些属性的支持。不过,这并非十分绝对通过采用一些补救方法,仍然可以通过使用这些属性来复现现有行为。发送消息组首先,让我们看看发送应用程序。正如上面提到的,put 消息选项 MQPMO_LOGICAL_ORDER 仅是一个队列管理器指令,用于自动分配消息组标识符和序列号。下面的清单 3 演示了如何在 JMS API 缺少此选项的情况下显式地设置这些属性。清单 3. 使用 WebSphere MQ JMS API 发送消息组MQConnectionFactory factory = new MQConnectionFactory();factory.setQueueManager(QM_host)MQQueue destination = new MQQueue(default);destination.setTargetClient(JMSC.MQJMS_CLIENT_NONJMS_MQ);Connection connection = factory.createConnection();Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);MessageProducer producer = session.createProducer(destination);String groupId = ID: + new BigInteger(24 * 8, new Random().toString(16);for (int i = 1; i = 5; i+) TextMessage message = session.createTextMessage(); message.setStringProperty(JMSXGroupID, groupId); message.setIntProperty(JMSXGroupSeq, i); if (i = 5) message.setBooleanProperty(JMS_IBM_Last_Msg_In_Group, true); message.setText(Message + i); producer.send(message);connection.close();该示例首先采用编程的方式构造一个连接工厂和目的地。这些被管理的对象也可以从存储库(如 Java Naming and Directory InterfaceJNDI)获得。接下来,我们创建用于发送消息的常见 JMS 构件,然后采用 24 字节的 BigInteger 随机值,并将其转换为十六进制字符串,从而生成组标识符。对于消息标识符,WebSphere MQ JMS API 要求组标识符带有字符串 ID: 作为前缀然后,该代码进行迭代,以发送上述五条消息。组标识符设置为 JMSXGroupId 字符串属性,序列号设置为 JMSXGroupSeq 整数属性。API 将假定设置了组标识符,且消息属于组的一部分。因此,所剩下的部分就是如何指示组中的最后一条消息,我们通过将 Boolean 型属性 JMS_IBM_Last_Msg_In_Group 设置为 True 来进行指示。如果运行此代码,然后使用 WebSphere MQ Explorer 浏览结果,将会看到消息部署描述符属性已像前面一样进行了设置。我们现在应该可以运行最初的 WebSphere MQ Java 接收程序来拾取此消息组。为此,我们需和上面的清单 3 中一样,将目标客户机指定为 MQJMS_CLIENT_NONJMS_MQ,从而确保不会向消息正文添加 RFH2 Header(此 Header 可能会给非 JMS 客户机造成混淆)。接收消息组不过,使用 JMS 复现接收消息组的行为并不像这样简单。使用消息选择器进行逻辑排序相当简单。首先,我们使用选择器来匹配任何序列号为 1 的消息,获得此消息后,我们将确定其所属的组。然后,我们将设置第二个选择器,以执行序列号 2 和相应的组标识符。我们将继续递增序列号,直到收到一条设置了 JMS_IBM_Last_Msg_In_Group 属性的消息为止。其中难点在于如何复现 MQGMO_ALL_MSGS_AVAILABLE 选项的行为。清单 4 给出了一个可能的解决方案:清单 4. 使用 WebSphere MQ JMS API 接收消息组MQConnectionFactory factory = new MQConnectionFactory();factory.setQueueManager(QM_host)MQQueue destination = new MQQueue(default);Connection connection = factory.createConnection();connection.start();Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);MessageConsumer lastMessageConsumer = session.createConsumer(destination, JMS_IBM_Last_Msg_In_Group=TRUE);TextMessage lastMessage = (TextMessage) lastMessageConsumer.receiveNoWait();lastMessageConsumer.close();if (lastMessage != null) int groupSize = lastMessage.getIntProperty(JMSXGroupSeq); String groupId = lastMessage.getStringProperty(JMSXGroupID); boolean failed = false; for (int i = 1; (i groupSize) & !failed; i+) MessageConsumer consumer = session.createConsumer(destination, JMSXGroupID= + groupId + AND JMSXGroupSeq= + i); TextMessage message = (TextMessage)consumer.receiveNoWait(); if (message != null) System.out.println(message.getText(); else failed = true; consumer.close(); if (failed) session.rollback(); else System.out.println(lastMessage.getText(); session.commit(); connection.close();和前面一样,我们首先创建使用消息所必需的所有 JMS 资源。此时,我们必须启动连接;否则就完全不能接收到任何消息。此解决方案的关键是,我们将尝试首先接收组中的最后一条消息。为此,我们要创建一个具有 JMS_IBM_Last_Msg_In_Group=TRUE 消息选择器的使用者。如果消息传递拓扑无法保证顺序,则无法无法百分之百地确定组中所有其他消息已经达到。稍后我们将讨论如何处理消息未全部达到的情况。如果接收到了组中的最后一条消息,则可以获取组标识符,并使用其序列号来确定组的大小。有了此信息后,我们可以进行迭代,尝试从第一个序列号开始接收组中的所有其他消息。为此,我们将在每个迭代上设置一个新使用者,以根据组标识符和所需的序列号进行选择。现在,如果由于某种原因而导致消息尚未达到,我们将从 receiveNoWait 方法得到 null。此时,我们将设置 failed 标志,该标志将导致退出循环。回头看一下我们创建 JMS 会话的代码行。您将看到,与前一个示例代码不同,第一个参数设置为 true,以指示此会话上执行的发送和接收操作应作为本地事务的一部分执行。这意味着,如果由于尚未接收到某条消息而已经设置了 failed 标志,则可以回滚事务,并将组中的所有其他消息返回给队列。如果成功接收到了所有消息,则必须记得提交事务。否则,当关闭连接时,事务将会被回滚。不过,在我们重复地回滚某个不完整的组时,目的地上可能有其他完整的组可用(但尚未达到进行处理的位置)。这个问题可以通过将不完整的组临时从消息选择器中排除得到解决,也可以将该组复制到内存中或持久性存储中,以便稍后完成。下一部分将讨论如何在一个特定的环境应用服务器中处理此问题。在 J2EE 应用服务器中接收消息组上面的清单 4 演示了如何使用 WebSphere MQ JMS API 接收消息组。但在应用服务器环境中,消息接收通常是通过消息驱动 Bean(Message-driven Bean,MDB)执行的。如何对我们的方法进行调整,才能使其在此情况下也能正常工作呢?仍然可以为 MDB 配置消息选择器,但将由管理员采用静态方式定义。因此,我们将使用 JMS_IBM_Last_Msg_In_Group=TRUE 配置此选择器,以便使 MDB 始终为组中的最后一条消息。我们需要首先作为事务的一部分接收此消息,因此 MDB 也应使用事务属性 RequiresNew 配置为使用容器管理事务(Container Managed Transactions,CMT)。以下是我们的 MDB 的 onMessage 方法的代码:清单 5. 用于接收消息组的消息驱动 Beanpublic void onMessage(Message lastMessage) InitialContext context = new InitialContext(); ConnectionFactory factory = (ConnectionFactory) context.lookup(java:comp/env/jms/factory); Destination destination = (Destination) context.lookup(java:comp/env/jms/destination); Connection connection = factory.createConnection(); connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); int groupSize = lastMessage.getIntProperty(JMSXGroupSeq); String groupId = lastMessage.getStringProperty(JMSXGroupID); boolean failed = false; for (int i = 1; (i groupSize) & !failed; i+) MessageConsumer consumer = session.createConsumer(destination, JMSXGroupID= + groupId + AND JMSXGroupSeq= + i); TextMessage message = (TextMessage) consumer.receiveNoWait(); if (message != null) System.out.println(message.getText(); else failed = true; consumer.close(); if (failed) _context.setRollbackOnly(); Thread.sleep(5000); else System.out.println(TextMessage) lastMessage).getText(); connection.close();现在,示例采用适合 J2EE 环境的方式从 JNDI 获取连接工厂和目的地。创建 JMS 会话时创建的事务参数此时将被忽略所有消息都将作为容器所启动的全局事务的一部分进行接收。如果接收其中的某个消息失败,我们会标记全局事务进行回滚,然后等候 5 秒。由于在方法退出前,事务不会实际进行回滚,已接收的消息将在目的地上处于锁定状态,从而防止其他 MDB 实例尝试接收下这个不完整的组。这些其他实例可以转而尝试从目的地接收其他组。当该方法最终退出时,事务将回滚,且可再次对这些消息进行接收(此时可能整个组已经全部达到)。在此环境中,请尝试对试图接收组的次数进行限制,因为每次回滚事务,被接收的消息的消息交付计数已递增过一次。如果重复调用上述代码,此计数可能会达到目的地上配置的搁置阈值,此时这些消息就将重新进行排队,暂时不再能进行接收。如果成功接收了整个消息组,则方法将完成,容器将提交事务,从而从目的地中删除这些消息。
展开阅读全文
相关资源
正为您匹配相似的精品文档
相关搜索

最新文档


当前位置:首页 > 管理文书 > 各类标准


copyright@ 2023-2025  zhuangpeitu.com 装配图网版权所有   联系电话:18123376007

备案号:ICP2024067431-1 川公网安备51140202000466号


本站为文档C2C交易模式,即用户上传的文档直接被用户下载,本站只是中间服务平台,本站所有文档下载所得的收益归上传人(含作者)所有。装配图网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对上载内容本身不做任何修改或编辑。若文档所含内容侵犯了您的版权或隐私,请立即通知装配图网,我们立即给予删除!