1.1 ????? JMS 簡(jiǎn)介
???????JMS 的全稱是 Java Message Service ,即 Java 消息服務(wù)。它主要用于在生產(chǎn)者和消費(fèi)者之間進(jìn)行消息傳遞,生產(chǎn)者負(fù)責(zé)產(chǎn)生消息,而消費(fèi)者負(fù)責(zé)接收消息。把它應(yīng)用到實(shí)際的業(yè)務(wù)需求中的話我們可以在特定的時(shí)候利用生產(chǎn)者生成一消息,并進(jìn)行發(fā)送,對(duì)應(yīng)的消費(fèi)者在接收到對(duì)應(yīng)的消息后去完成對(duì)應(yīng)的業(yè)務(wù)邏輯。對(duì)于消息的傳遞有兩種類型,一種是點(diǎn)對(duì)點(diǎn)的,即一個(gè)生產(chǎn)者和一個(gè)消費(fèi)者一一對(duì)應(yīng);另一種是發(fā)布 / 訂閱模式,即一個(gè)生產(chǎn)者產(chǎn)生消息并進(jìn)行發(fā)送后,可以由多個(gè)消費(fèi)者進(jìn)行接收。
1.2 ????? Spring 整合 JMS
??????? 對(duì) JMS 做了一個(gè)簡(jiǎn)要介紹之后,接下來(lái)就講一下 Spring 整合 JMS 的具體過(guò)程。 JMS 只是一個(gè)標(biāo)準(zhǔn),真正在使用它的時(shí)候我們需要有它的具體實(shí)現(xiàn),這里我們就使用 Apache 的 activeMQ 來(lái)作為它的實(shí)現(xiàn)。所使用的依賴?yán)? Maven 來(lái)進(jìn)行管理,具體依賴如下:
?
- < dependencies > ??
- ???????? < dependency > ??
- ???????????? < groupId > junit </ groupId > ??
- ???????????? < artifactId > junit </ artifactId > ??
- ???????????? < version > 4.10 </ version > ??
- ???????????? < scope > test </ scope > ??
- ???????? </ dependency > ??
- ???????? < dependency > ??
- ???????????? < groupId > org.springframework </ groupId > ??
- ???????????? < artifactId > spring-context </ artifactId > ??
- ???????????? < version > ${spring-version} </ version > ??
- ???????? </ dependency > ??
- ???????? < dependency > ??
- ???????????? < groupId > org.springframework </ groupId > ??
- ???????????? < artifactId > spring-jms </ artifactId > ??
- ???????????? < version > ${spring-version} </ version > ??
- ???????? </ dependency > ??
- ???????? < dependency > ??
- ???????????? < groupId > org.springframework </ groupId > ??
- ???????????? < artifactId > spring-test </ artifactId > ??
- ???????????? < version > ${spring-version} </ version > ??
- ???????? </ dependency > ??
- ???????? < dependency > ??
- ???????????? < groupId > javax.annotation </ groupId > ??
- ???????????? < artifactId > jsr250-api </ artifactId > ??
- ???????????? < version > 1.0 </ version > ??
- ???????? </ dependency > ??
- ???????? < dependency > ??
- ???????????? < groupId > org.apache.activemq </ groupId > ??
- ???????????? < artifactId > activemq-core </ artifactId > ??
- ???????????? < version > 5.7.0 </ version > ??
- ???????? </ dependency > ??
- </ dependencies > ??
?
1.2.1??activeMQ準(zhǔn)備
??????? 既然是使用的 apache 的 activeMQ 作為 JMS 的實(shí)現(xiàn),那么首先我們應(yīng)該到 apache 官網(wǎng)上下載 activeMQ ( http://activemq.apache.org/download.html ),進(jìn)行解壓后運(yùn)行其 bin 目錄下面的 activemq.bat 文件啟動(dòng) activeMQ 。
1.2.2配置ConnectionFactory
???????ConnectionFactory 是用于產(chǎn)生到 JMS 服務(wù)器的鏈接的, Spring 為我們提供了多個(gè) ConnectionFactory ,有 SingleConnectionFactory 和 CachingConnectionFactory 。 SingleConnectionFactory 對(duì)于建立 JMS 服務(wù)器鏈接的請(qǐng)求會(huì)一直返回同一個(gè)鏈接,并且會(huì)忽略 Connection 的 close 方法調(diào)用。 CachingConnectionFactory 繼承了 SingleConnectionFactory ,所以它擁有 SingleConnectionFactory 的所有功能,同時(shí)它還新增了緩存功能,它可以緩存 Session 、 MessageProducer 和 MessageConsumer 。這里我們使用 SingleConnectionFactory 來(lái)作為示例。
- < bean ? id = "connectionFactory" ? class = "org.springframework.jms.connection.SingleConnectionFactory" /> ??
?
??????? 這樣就定義好產(chǎn)生 JMS 服務(wù)器鏈接的 ConnectionFactory 了嗎?答案是非也。 Spring 提供的 ConnectionFactory 只是 Spring 用于管理 ConnectionFactory 的,真正產(chǎn)生到 JMS 服務(wù)器鏈接的 ConnectionFactory 還得是由 JMS 服務(wù)廠商提供,并且需要把它注入到 Spring 提供的 ConnectionFactory 中。我們這里使用的是 ActiveMQ 實(shí)現(xiàn)的 JMS ,所以在我們這里真正的可以產(chǎn)生 Connection 的就應(yīng)該是由 ActiveMQ 提供的 ConnectionFactory 。所以定義一個(gè) ConnectionFactory 的完整代碼應(yīng)該如下所示:
- <!--?真正可以產(chǎn)生Connection的ConnectionFactory,由對(duì)應(yīng)的?JMS服務(wù)廠商提供--> ??
- < bean ? id = "targetConnectionFactory" ? class = "org.apache.activemq.ActiveMQConnectionFactory" > ??
- ???? < property ? name = "brokerURL" ? value = "tcp://localhost:61616" /> ??
- </ bean > ??
- ??
- <!--?Spring用于管理真正的ConnectionFactory的ConnectionFactory?--> ??
- < bean ? id = "connectionFactory" ? class = "org.springframework.jms.connection.SingleConnectionFactory" > ??
- ???? <!--?目標(biāo)ConnectionFactory對(duì)應(yīng)真實(shí)的可以產(chǎn)生JMS?Connection的ConnectionFactory?--> ??
- ???? < property ? name = "targetConnectionFactory" ? ref = "targetConnectionFactory" /> ??
- </ bean > ??
??
1.2.3配置生產(chǎn)者
配置好 ConnectionFactory 之后我們就需要配置生產(chǎn)者。生產(chǎn)者負(fù)責(zé)產(chǎn)生消息并發(fā)送到 JMS 服務(wù)器,這通常對(duì)應(yīng)的是我們的一個(gè)業(yè)務(wù)邏輯服務(wù)實(shí)現(xiàn)類。但是我們的服務(wù)實(shí)現(xiàn)類是怎么進(jìn)行消息的發(fā)送的呢?這通常是利用 Spring 為我們提供的 JmsTemplate 類來(lái)實(shí)現(xiàn)的,所以配置生產(chǎn)者其實(shí)最核心的就是配置進(jìn)行消息發(fā)送的 JmsTemplate 。對(duì)于消息發(fā)送者而言,它在發(fā)送消息的時(shí)候要知道自己該往哪里發(fā),為此,我們?cè)诙x JmsTemplate 的時(shí)候需要往里面注入一個(gè) Spring 提供的 ConnectionFactory 對(duì)象。
- <!--?Spring提供的JMS工具類,它可以進(jìn)行消息發(fā)送、接收等?--> ??
- < bean ? id = "jmsTemplate" ? class = "org.springframework.jms.core.JmsTemplate" > ??
- ???? <!--?這個(gè)connectionFactory對(duì)應(yīng)的是我們定義的Spring提供的那個(gè)ConnectionFactory對(duì)象?--> ??
- ???? < property ? name = "connectionFactory" ? ref = "connectionFactory" /> ??
- </ bean > ??
?
??????? 在真正利用 JmsTemplate 進(jìn)行消息發(fā)送的時(shí)候,我們需要知道消息發(fā)送的目的地,即 destination 。在 Jms 中有一個(gè)用來(lái)表示目的地的 Destination 接口,它里面沒(méi)有任何方法定義,只是用來(lái)做一個(gè)標(biāo)識(shí)而已。當(dāng)我們?cè)谑褂? JmsTemplate 進(jìn)行消息發(fā)送時(shí)沒(méi)有指定 destination 的時(shí)候?qū)⑹褂媚J(rèn)的 Destination 。默認(rèn) Destination 可以通過(guò)在定義 jmsTemplate bean 對(duì)象時(shí)通過(guò)屬性 defaultDestination 或 defaultDestinationName 來(lái)進(jìn)行注入, defaultDestinationName 對(duì)應(yīng)的就是一個(gè)普通字符串。在 ActiveMQ 中實(shí)現(xiàn)了兩種類型的 Destination ,一個(gè)是點(diǎn)對(duì)點(diǎn)的 ActiveMQQueue ,另一個(gè)就是支持訂閱 / 發(fā)布模式的 ActiveMQTopic 。在定義這兩種類型的 Destination 時(shí)我們都可以通過(guò)一個(gè) name 屬性來(lái)進(jìn)行構(gòu)造,如:
?
?
?
?
- <!--這個(gè)是隊(duì)列目的地,點(diǎn)對(duì)點(diǎn)的--> ??
- < bean ? id = "queueDestination" ? class = "org.apache.activemq.command.ActiveMQQueue" > ??
- ???? < constructor-arg > ??
- ???????? < value > queue </ value > ??
- ???? </ constructor-arg > ??
- </ bean > ??
- <!--這個(gè)是主題目的地,一對(duì)多的--> ??
- < bean ? id = "topicDestination" ? class = "org.apache.activemq.command.ActiveMQTopic" > ??
- ???? < constructor-arg ? value = "topic" /> ??
- </ bean > ??
?
?
?
??????? 假設(shè)我們定義了一個(gè) ProducerService ,里面有一個(gè)向 Destination 發(fā)送純文本消息的方法 sendMessage ,那么我們的代碼就大概是這個(gè)樣子:
?
?
?
?
- package ?com.tiantian.springintejms.service.impl;??
- ???
- import ?javax.annotation.Resource;??
- import ?javax.jms.Destination;??
- import ?javax.jms.JMSException;??
- import ?javax.jms.Message;??
- import ?javax.jms.Session;??
- ???
- import ?org.springframework.jms.core.JmsTemplate;??
- import ?org.springframework.jms.core.MessageCreator;??
- import ?org.springframework.stereotype.Component;??
- ???
- import ?com.tiantian.springintejms.service.ProducerService;??
- ???
- @Component ??
- public ? class ?ProducerServiceImpl? implements ?ProducerService?{??
- ???
- ???? private ?JmsTemplate?jmsTemplate;??
- ??????
- ???? public ? void ?sendMessage(Destination?destination,? final ?String?message)?{??
- ????????System.out.println( "---------------生產(chǎn)者發(fā)送消息-----------------" );??
- ????????System.out.println( "---------------生產(chǎn)者發(fā)了一個(gè)消息:" ?+?message);??
- ????????jmsTemplate.send(destination,? new ?MessageCreator()?{??
- ???????????? public ?Message?createMessage(Session?session)? throws ?JMSException?{??
- ???????????????? return ?session.createTextMessage(message);??
- ????????????}??
- ????????});??
- ????}???
- ??
- ???? public ?JmsTemplate?getJmsTemplate()?{??
- ????????returnjmsTemplate;??
- ????}???
- ??
- ???? @Resource ??
- ???? public ? void ?setJmsTemplate(JmsTemplate?jmsTemplate)?{??
- ???????? this .jmsTemplate?=?jmsTemplate;??
- ????}??
- ???
- }??
?
?
?
??????? 我們可以看到在 sendMessage 方法體里面我們是通過(guò) jmsTemplate 來(lái)發(fā)送消息到對(duì)應(yīng)的 Destination 的。到此,我們生成一個(gè)簡(jiǎn)單的文本消息并把它發(fā)送到指定目的地 Destination 的生產(chǎn)者就配置好了。
1.2.4配置消費(fèi)者
生產(chǎn)者往指定目的地 Destination 發(fā)送消息后,接下來(lái)就是消費(fèi)者對(duì)指定目的地的消息進(jìn)行消費(fèi)了。那么消費(fèi)者是如何知道有生產(chǎn)者發(fā)送消息到指定目的地 Destination 了呢?這是通過(guò) Spring 為我們封裝的消息監(jiān)聽(tīng)容器 MessageListenerContainer 實(shí)現(xiàn)的,它負(fù)責(zé)接收信息,并把接收到的信息分發(fā)給真正的 MessageListener 進(jìn)行處理。每個(gè)消費(fèi)者對(duì)應(yīng)每個(gè)目的地都需要有對(duì)應(yīng)的 MessageListenerContainer 。對(duì)于消息監(jiān)聽(tīng)容器而言,除了要知道監(jiān)聽(tīng)哪個(gè)目的地之外,還需要知道到哪里去監(jiān)聽(tīng),也就是說(shuō)它還需要知道去監(jiān)聽(tīng)哪個(gè) JMS 服務(wù)器,這是通過(guò)在配置 MessageConnectionFactory 的時(shí)候往里面注入一個(gè) ConnectionFactory 來(lái)實(shí)現(xiàn)的。所以我們 在配置一個(gè) MessageListenerContainer 的時(shí)候有三個(gè)屬性必須指定,一個(gè)是表示從哪里監(jiān)聽(tīng)的 ConnectionFactory ;一個(gè)是表示監(jiān)聽(tīng)什么的 Destination ;一個(gè)是接收到消息以后進(jìn)行消息處理的 MessageListener 。 Spring 一共為我們提供了兩種類型的 MessageListenerContainer , SimpleMessageListenerContainer 和 DefaultMessageListenerContainer 。
SimpleMessageListenerContainer 會(huì)在一開(kāi)始的時(shí)候就創(chuàng)建一個(gè)會(huì)話 session 和消費(fèi)者 Consumer ,并且會(huì)使用標(biāo)準(zhǔn)的 JMS MessageConsumer.setMessageListener() 方法注冊(cè)監(jiān)聽(tīng)器讓 JMS 提供者調(diào)用監(jiān)聽(tīng)器的回調(diào)函數(shù)。它不會(huì)動(dòng)態(tài)的適應(yīng)運(yùn)行時(shí)需要和參與外部的事務(wù)管理。兼容性方面,它非常接近于獨(dú)立的 JMS 規(guī)范,但一般不兼容 Java EE 的 JMS 限制。
大多數(shù)情況下我們還是使用的 DefaultMessageListenerContainer ,跟 SimpleMessageListenerContainer 相比, DefaultMessageListenerContainer 會(huì)動(dòng)態(tài)的適應(yīng)運(yùn)行時(shí)需要,并且能夠參與外部的事務(wù)管理。它很好的平衡了對(duì) JMS 提供者要求低、先進(jìn)功能如事務(wù)參與和兼容 Java EE 環(huán)境。
定義處理消息的 MessageListener
??????? 要定義處理消息的 MessageListener 我們只需要實(shí)現(xiàn) JMS 規(guī)范中的 MessageListener 接口就可以了。 MessageListener 接口中只有一個(gè)方法 onMessage 方法,當(dāng)接收到消息的時(shí)候會(huì)自動(dòng)調(diào)用該方法。
?
?
?
?
- package ?com.tiantian.springintejms.listener;??
- ???
- import ?javax.jms.JMSException;??
- import ?javax.jms.Message;??
- import ?javax.jms.MessageListener;??
- import ?javax.jms.TextMessage;??
- ???
- public ? class ?ConsumerMessageListener? implements ?MessageListener?{??
- ???
- ???? public ? void ?onMessage(Message?message)?{??
- ???????? //這里我們知道生產(chǎn)者發(fā)送的就是一個(gè)純文本消息,所以這里可以直接進(jìn)行強(qiáng)制轉(zhuǎn)換,或者直接把onMessage方法的參數(shù)改成Message的子類TextMessage ??
- ????????TextMessage?textMsg?=?(TextMessage)?message;??
- ????????System.out.println( "接收到一個(gè)純文本消息。" );??
- ???????? try ?{??
- ????????????System.out.println( "消息內(nèi)容是:" ?+?textMsg.getText());??
- ????????}? catch ?(JMSException?e)?{??
- ????????????e.printStackTrace();??
- ????????}??
- ????}??
- ???
- }??
?
??
?
??????? 有了 MessageListener 之后我們就可以在 Spring 的配置文件中配置一個(gè)消息監(jiān)聽(tīng)容器了。
- <!--這個(gè)是隊(duì)列目的地--> ??
- < bean ? id = "queueDestination" ? class = "org.apache.activemq.command.ActiveMQQueue" > ??
- ???? < constructor-arg > ??
- ???????? < value > queue </ value > ??
- ???? </ constructor-arg > ??
- </ bean > ??
- <!--?消息監(jiān)聽(tīng)器?--> ??
- < bean ? id = "consumerMessageListener" ? class = "com.tiantian.springintejms.listener.ConsumerMessageListener" /> ??????
- ??
- <!--?消息監(jiān)聽(tīng)容器?--> ??
- < bean ? id = "jmsContainer" ???????? class = "org.springframework.jms.listener.DefaultMessageListenerContainer" > ??
- ???? < property ? name = "connectionFactory" ? ref = "connectionFactory" ? /> ??
- ???? < property ? name = "destination" ? ref = "queueDestination" ? /> ??
- ???? < property ? name = "messageListener" ? ref = "consumerMessageListener" ? /> ??
- </ bean > ??
?
?
??????? 我們可以看到我們定義了一個(gè)名叫 queue 的 ActiveMQQueue 目的地,我們的監(jiān)聽(tīng)器就是監(jiān)聽(tīng)了發(fā)送到這個(gè)目的地的消息。
??????? 至此我們的生成者和消費(fèi)者都配置完成了,這也就意味著我們的整合已經(jīng)完成了。這個(gè)時(shí)候完整的 Spring 的配置文件應(yīng)該是這樣的:
- <? xml ? version = "1.0" ? encoding = "UTF-8" ?> ??
- < beans ? xmlns = "http://www.springframework.org/schema/beans" ??
- ???? xmlns:xsi = "http://www.w3.org/2001/XMLSchema-instance" ? xmlns:context = "http://www.springframework.org/schema/context" ??
- ???? xmlns:jms = "http://www.springframework.org/schema/jms" ??
- ???? xsi:schemaLocation ="http://www.springframework.org/schema/beans??
- ?????http://www.springframework.org/schema/beans/spring-beans-3.0.xsd??
- ?????http://www.springframework.org/schema/context??
- ?????http://www.springframework.org/schema/context/spring-context-3.0.xsd??
- ????http://www.springframework.org/schema/beans?http://www.springframework.org/schema/beans/spring-beans-3.0.xsd??
- ????http://www.springframework.org/schema/jms?http://www.springframework.org/schema/jms/spring-jms-3.0.xsd" > ??
- ???
- ???? < context:component-scan ? base-package = "com.tiantian" ? /> ??
- ???
- ???? <!--?Spring提供的JMS工具類,它可以進(jìn)行消息發(fā)送、接收等?--> ??
- ???? < bean ? id = "jmsTemplate" ? class = "org.springframework.jms.core.JmsTemplate" > ??
- ???????? <!--?這個(gè)connectionFactory對(duì)應(yīng)的是我們定義的Spring提供的那個(gè)ConnectionFactory對(duì)象?--> ??
- ???????? < property ? name = "connectionFactory" ? ref = "connectionFactory" /> ??
- ???? </ bean > ??
- ??????
- ???? <!--?真正可以產(chǎn)生Connection的ConnectionFactory,由對(duì)應(yīng)的?JMS服務(wù)廠商提供--> ??
- ???? < bean ? id = "targetConnectionFactory" ? class = "org.apache.activemq.ActiveMQConnectionFactory" > ??
- ???????? < property ? name = "brokerURL" ? value = "tcp://localhost:61616" /> ??
- ???? </ bean > ??
- ??????
- ???? <!--?Spring用于管理真正的ConnectionFactory的ConnectionFactory?--> ??
- ???? < bean ? id = "connectionFactory" ? class = "org.springframework.jms.connection.SingleConnectionFactory" > ??
- ???????? <!--?目標(biāo)ConnectionFactory對(duì)應(yīng)真實(shí)的可以產(chǎn)生JMS?Connection的ConnectionFactory?--> ??
- ???????? < property ? name = "targetConnectionFactory" ? ref = "targetConnectionFactory" /> ??
- ???? </ bean > ??
- ??????
- ???? <!--這個(gè)是隊(duì)列目的地--> ??
- ???? < bean ? id = "queueDestination" ? class = "org.apache.activemq.command.ActiveMQQueue" > ??
- ???????? < constructor-arg > ??
- ???????????? < value > queue </ value > ??
- ???????? </ constructor-arg > ??
- ???? </ bean > ??
- ???? <!--?消息監(jiān)聽(tīng)器?--> ??
- ???? < bean ? id = "consumerMessageListener" ? class = "com.tiantian.springintejms.listener.ConsumerMessageListener" /> ??
- ???? <!--?消息監(jiān)聽(tīng)容器?--> ??
- ???? < bean ? id = "jmsContainer" ??
- ???????? class = "org.springframework.jms.listener.DefaultMessageListenerContainer" > ??
- ???????? < property ? name = "connectionFactory" ? ref = "connectionFactory" ? /> ??
- ???????? < property ? name = "destination" ? ref = "queueDestination" ? /> ??
- ???????? < property ? name = "messageListener" ? ref = "consumerMessageListener" ? /> ??
- ???? </ bean > ??
- </ beans > ??
?
?
??????? 接著我們來(lái)測(cè)試一下,看看我們的整合是否真的成功了,測(cè)試代碼如下:
?
?
?
?
- package ?com.tiantian.springintejms.test;??
- ???
- import ?javax.jms.Destination;??
- ???
- import ?org.junit.Test;??
- import ?org.junit.runner.RunWith;??
- import ?org.springframework.beans.factory.annotation.Autowired;??
- import ?org.springframework.beans.factory.annotation.Qualifier;??
- import ?org.springframework.test.context.ContextConfiguration;??
- import ?org.springframework.test.context.junit4.SpringJUnit4ClassRunner;??
- import ?com.tiantian.springintejms.service.ProducerService;??
- ???
- @RunWith (SpringJUnit4ClassRunner. class )??
- @ContextConfiguration ( "/applicationContext.xml" )??
- public ? class ?ProducerConsumerTest?{??
- ???
- ???? @Autowired ??
- ???? private ?ProducerService?producerService;??
- ???? @Autowired ??
- ???? @Qualifier ( "queueDestination" )??
- ???? private ?Destination?destination;??
- ??????
- ???? @Test ??
- ???? public ? void ?testSend()?{??
- ???????? for ?( int ?i= 0 ;?i< 2 ;?i++)?{??
- ????????????producerService.sendMessage(destination,? "你好,生產(chǎn)者!這是消息:" ?+?(i+ 1 ));??
- ????????}??
- ????}??
- ??????
- }??
?
?
?
??????? 在上面的測(cè)試代碼中我們利用生產(chǎn)者發(fā)送了兩個(gè)消息,正常來(lái)說(shuō),消費(fèi)者應(yīng)該可以接收到這兩個(gè)消息。運(yùn)行測(cè)試代碼后控制臺(tái)輸出如下:
??????? 看,控制臺(tái)已經(jīng)進(jìn)行了正確的輸出,這說(shuō)明我們的整合確實(shí)是已經(jīng)成功了。
更多文章、技術(shù)交流、商務(wù)合作、聯(lián)系博主
微信掃碼或搜索:z360901061

微信掃一掃加我為好友
QQ號(hào)聯(lián)系: 360901061
您的支持是博主寫(xiě)作最大的動(dòng)力,如果您喜歡我的文章,感覺(jué)我的文章對(duì)您有幫助,請(qǐng)用微信掃描下面二維碼支持博主2元、5元、10元、20元等您想捐的金額吧,狠狠點(diǎn)擊下面給點(diǎn)支持吧,站長(zhǎng)非常感激您!手機(jī)微信長(zhǎng)按不能支付解決辦法:請(qǐng)將微信支付二維碼保存到相冊(cè),切換到微信,然后點(diǎn)擊微信右上角掃一掃功能,選擇支付二維碼完成支付。
【本文對(duì)您有幫助就好】元
