亚洲免费在线-亚洲免费在线播放-亚洲免费在线观看-亚洲免费在线观看视频-亚洲免费在线看-亚洲免费在线视频

Spring整合JMS(一)——基于ActiveMQ實(shí)現(xiàn)

系統(tǒng) 1799 0

?

?

1.1 ???? JMS 簡(jiǎn)介

?????? JMS 的全稱是 Java Message Service ,即 Java 消息服務(wù)。它主要用于在生產(chǎn)者和消費(fèi)者之間進(jìn)行消息傳遞,生產(chǎn)者負(fù)責(zé)產(chǎn)生消息,而消費(fèi)者負(fù)責(zé)接收消息。把它應(yīng)用到實(shí)際的業(yè)務(wù)需求中的話我們可以在特定的時(shí)候利用生產(chǎn)者生成一消息,并進(jìn)行發(fā)送,對(duì)應(yīng)的消費(fèi)者在接收到對(duì)應(yīng)的消息后去完成對(duì)應(yīng)的業(yè)務(wù)邏輯。對(duì)于消息的傳遞有兩種類型,一種是點(diǎn)對(duì)點(diǎn)的,即一個(gè)生產(chǎn)者和一個(gè)消費(fèi)者一一對(duì)應(yīng);另一種是發(fā)布 / 訂閱模式,即一個(gè)生產(chǎn)者產(chǎn)生消息并進(jìn)行發(fā)送后,可以由多個(gè)消費(fèi)者進(jìn)行接收。

1.2 ???? Spring 整合 JMS

?????? 對(duì) JMS 做了一個(gè)簡(jiǎn)要介紹之后,接下來(lái)就講一下 Spring 整合 JMS 的具體過(guò)程。 JMS 只是一個(gè)標(biāo)準(zhǔn),真正在使用它的時(shí)候我們需要有它的具體實(shí)現(xiàn),這里我們就使用 Apache activeMQ 來(lái)作為它的實(shí)現(xiàn)。所使用的依賴?yán)? Maven 來(lái)進(jìn)行管理,具體依賴如下:

?

        <dependencies>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.10</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context</artifactId>
            <version>${spring-version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-jms</artifactId>
            <version>${spring-version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-test</artifactId>
            <version>${spring-version}</version>
        </dependency>
        <dependency>
            <groupId>javax.annotation</groupId>
            <artifactId>jsr250-api</artifactId>
            <version>1.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-core</artifactId>
            <version>5.7.0</version>
        </dependency>
</dependencies>


      

?

1.2.1 ? activeMQ準(zhǔn)備

?????? 既然是使用的 apache activeMQ 作為 JMS 的實(shí)現(xiàn),那么首先我們應(yīng)該到 apache 官網(wǎng)上下載 activeMQ http://activemq.apache.org/download.html ),進(jìn)行解壓后運(yùn)行其 bin 目錄下面的 activemq.bat 文件啟動(dòng) activeMQ

1.2.2配置ConnectionFactory

?????? ConnectionFactory 是用于產(chǎn)生到 JMS 服務(wù)器的鏈接的, Spring 為我們提供了多個(gè) ConnectionFactory ,有 SingleConnectionFactory CachingConnectionFactory SingleConnectionFactory 對(duì)于建立 JMS 服務(wù)器鏈接的請(qǐng)求會(huì)一直返回同一個(gè)鏈接,并且會(huì)忽略 Connection close 方法調(diào)用。 CachingConnectionFactory 繼承了 SingleConnectionFactory ,所以它擁有 SingleConnectionFactory 的所有功能,同時(shí)它還新增了緩存功能,它可以緩存 Session MessageProducer MessageConsumer 。這里我們使用 SingleConnectionFactory 來(lái)作為示例。

        <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory"/>
      

?

?????? 這樣就定義好產(chǎn)生 JMS 服務(wù)器鏈接的 ConnectionFactory 了嗎?答案是非也。 Spring 提供的 ConnectionFactory 只是 Spring 用于管理 ConnectionFactory 的,真正產(chǎn)生到 JMS 服務(wù)器鏈接的 ConnectionFactory 還得是由 JMS 服務(wù)廠商提供,并且需要把它注入到 Spring 提供的 ConnectionFactory 中。我們這里使用的是 ActiveMQ 實(shí)現(xiàn)的 JMS ,所以在我們這里真正的可以產(chǎn)生 Connection 的就應(yīng)該是由 ActiveMQ 提供的 ConnectionFactory 。所以定義一個(gè) ConnectionFactory 的完整代碼應(yīng)該如下所示:

            <!-- 真正可以產(chǎn)生Connection的ConnectionFactory,由對(duì)應(yīng)的 JMS服務(wù)廠商提供-->
    <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="tcp://localhost:61616"/>
    </bean>
    
    <!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->
    <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
        <!-- 目標(biāo)ConnectionFactory對(duì)應(yīng)真實(shí)的可以產(chǎn)生JMS Connection的ConnectionFactory -->
        <property name="targetConnectionFactory" ref="targetConnectionFactory"/>
    </bean>


      

??

?????? ActiveMQ 為我們提供了一個(gè) PooledConnectionFactory ,通過(guò)往里面注入一個(gè) ActiveMQConnectionFactory 可以用來(lái)將 Connection Session MessageProducer 池化,這樣可以大大的減少我們的資源消耗。當(dāng)使用 PooledConnectionFactory 時(shí),我們?cè)诙x一個(gè) ConnectionFactory 時(shí)應(yīng)該是如下定義:

?

            <!-- 真正可以產(chǎn)生Connection的ConnectionFactory,由對(duì)應(yīng)的 JMS服務(wù)廠商提供-->
    <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="tcp://localhost:61616"/>
    </bean>
    
    <bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory">
        <property name="connectionFactory" ref="targetConnectionFactory"/>
        <property name="maxConnections" value="10"/>
    </bean>
    
    <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
        <property name="targetConnectionFactory" ref="pooledConnectionFactory"/>
    </bean>
      

1.2.3配置生產(chǎn)者

配置好 ConnectionFactory 之后我們就需要配置生產(chǎn)者。生產(chǎn)者負(fù)責(zé)產(chǎn)生消息并發(fā)送到 JMS 服務(wù)器,這通常對(duì)應(yīng)的是我們的一個(gè)業(yè)務(wù)邏輯服務(wù)實(shí)現(xiàn)類。但是我們的服務(wù)實(shí)現(xiàn)類是怎么進(jìn)行消息的發(fā)送的呢?這通常是利用 Spring 為我們提供的 JmsTemplate 類來(lái)實(shí)現(xiàn)的,所以配置生產(chǎn)者其實(shí)最核心的就是配置進(jìn)行消息發(fā)送的 JmsTemplate 。對(duì)于消息發(fā)送者而言,它在發(fā)送消息的時(shí)候要知道自己該往哪里發(fā),為此,我們?cè)诙x JmsTemplate 的時(shí)候需要往里面注入一個(gè) Spring 提供的 ConnectionFactory 對(duì)象。

            <!-- Spring提供的JMS工具類,它可以進(jìn)行消息發(fā)送、接收等 -->
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <!-- 這個(gè)connectionFactory對(duì)應(yīng)的是我們定義的Spring提供的那個(gè)ConnectionFactory對(duì)象 -->
        <property name="connectionFactory" ref="connectionFactory"/>
    </bean>

      

?

?????? 在真正利用 JmsTemplate 進(jìn)行消息發(fā)送的時(shí)候,我們需要知道消息發(fā)送的目的地,即 destination 。在 Jms 中有一個(gè)用來(lái)表示目的地的 Destination 接口,它里面沒(méi)有任何方法定義,只是用來(lái)做一個(gè)標(biāo)識(shí)而已。當(dāng)我們?cè)谑褂? JmsTemplate 進(jìn)行消息發(fā)送時(shí)沒(méi)有指定 destination 的時(shí)候?qū)⑹褂媚J(rèn)的 Destination 。默認(rèn) Destination 可以通過(guò)在定義 jmsTemplate bean 對(duì)象時(shí)通過(guò)屬性 defaultDestination defaultDestinationName 來(lái)進(jìn)行注入, defaultDestinationName 對(duì)應(yīng)的就是一個(gè)普通字符串。在 ActiveMQ 中實(shí)現(xiàn)了兩種類型的 Destination ,一個(gè)是點(diǎn)對(duì)點(diǎn)的 ActiveMQQueue ,另一個(gè)就是支持訂閱 / 發(fā)布模式的 ActiveMQTopic 。在定義這兩種類型的 Destination 時(shí)我們都可以通過(guò)一個(gè) name 屬性來(lái)進(jìn)行構(gòu)造,如:

?

            <!--這個(gè)是隊(duì)列目的地,點(diǎn)對(duì)點(diǎn)的-->
    <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg>
            <value>queue</value>
        </constructor-arg>
    </bean>
    <!--這個(gè)是主題目的地,一對(duì)多的-->
    <bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic">
        <constructor-arg value="topic"/>
    </bean>

      

?????? 假設(shè)我們定義了一個(gè) ProducerService ,里面有一個(gè)向 Destination 發(fā)送純文本消息的方法 sendMessage ,那么我們的代碼就大概是這個(gè)樣子:

?

        package com.tiantian.springintejms.service.impl;
 
import javax.annotation.Resource;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
 
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Component;
 
import com.tiantian.springintejms.service.ProducerService;
 
@Component
public class ProducerServiceImpl implements ProducerService {
 
    private JmsTemplate jmsTemplate;
    
    public void sendMessage(Destination destination, final String message) {
        System.out.println("---------------生產(chǎn)者發(fā)送消息-----------------");
        System.out.println("---------------生產(chǎn)者發(fā)了一個(gè)消息:" + message);
        jmsTemplate.send(destination, new MessageCreator() {
            public Message createMessage(Session session) throws JMSException {
                return session.createTextMessage(message);
            }
        });
    } 

    public JmsTemplate getJmsTemplate() {
        returnjmsTemplate;
    } 

    @Resource
    public void setJmsTemplate(JmsTemplate jmsTemplate) {
        this.jmsTemplate = jmsTemplate;
    }
 
}

      

?????? 我們可以看到在 sendMessage 方法體里面我們是通過(guò) jmsTemplate 來(lái)發(fā)送消息到對(duì)應(yīng)的 Destination 的。到此,我們生成一個(gè)簡(jiǎn)單的文本消息并把它發(fā)送到指定目的地 Destination 的生產(chǎn)者就配置好了。

1.2.4配置消費(fèi)者

生產(chǎn)者往指定目的地 Destination 發(fā)送消息后,接下來(lái)就是消費(fèi)者對(duì)指定目的地的消息進(jìn)行消費(fèi)了。那么消費(fèi)者是如何知道有生產(chǎn)者發(fā)送消息到指定目的地 Destination 了呢?這是通過(guò) Spring 為我們封裝的消息監(jiān)聽(tīng)容器 MessageListenerContainer 實(shí)現(xiàn)的,它負(fù)責(zé)接收信息,并把接收到的信息分發(fā)給真正的 MessageListener 進(jìn)行處理。每個(gè)消費(fèi)者對(duì)應(yīng)每個(gè)目的地都需要有對(duì)應(yīng)的 MessageListenerContainer 。對(duì)于消息監(jiān)聽(tīng)容器而言,除了要知道監(jiān)聽(tīng)哪個(gè)目的地之外,還需要知道到哪里去監(jiān)聽(tīng),也就是說(shuō)它還需要知道去監(jiān)聽(tīng)哪個(gè) JMS 服務(wù)器,這是通過(guò)在配置 MessageConnectionFactory 的時(shí)候往里面注入一個(gè) ConnectionFactory 來(lái)實(shí)現(xiàn)的。所以我們 在配置一個(gè) MessageListenerContainer 的時(shí)候有三個(gè)屬性必須指定,一個(gè)是表示從哪里監(jiān)聽(tīng)的 ConnectionFactory ;一個(gè)是表示監(jiān)聽(tīng)什么的 Destination ;一個(gè)是接收到消息以后進(jìn)行消息處理的 MessageListener Spring 一共為我們提供了兩種類型的 MessageListenerContainer SimpleMessageListenerContainer DefaultMessageListenerContainer

SimpleMessageListenerContainer 會(huì)在一開(kāi)始的時(shí)候就創(chuàng)建一個(gè)會(huì)話 session 和消費(fèi)者 Consumer ,并且會(huì)使用標(biāo)準(zhǔn)的 JMS MessageConsumer.setMessageListener() 方法注冊(cè)監(jiān)聽(tīng)器讓 JMS 提供者調(diào)用監(jiān)聽(tīng)器的回調(diào)函數(shù)。它不會(huì)動(dòng)態(tài)的適應(yīng)運(yùn)行時(shí)需要和參與外部的事務(wù)管理。兼容性方面,它非常接近于獨(dú)立的 JMS 規(guī)范,但一般不兼容 Java EE JMS 限制。

大多數(shù)情況下我們還是使用的 DefaultMessageListenerContainer ,跟 SimpleMessageListenerContainer 相比, DefaultMessageListenerContainer 會(huì)動(dòng)態(tài)的適應(yīng)運(yùn)行時(shí)需要,并且能夠參與外部的事務(wù)管理。它很好的平衡了對(duì) JMS 提供者要求低、先進(jìn)功能如事務(wù)參與和兼容 Java EE 環(huán)境。

定義處理消息的 MessageListener

?????? 要定義處理消息的 MessageListener 我們只需要實(shí)現(xiàn) JMS 規(guī)范中的 MessageListener 接口就可以了。 MessageListener 接口中只有一個(gè)方法 onMessage 方法,當(dāng)接收到消息的時(shí)候會(huì)自動(dòng)調(diào)用該方法。

?

        package com.tiantian.springintejms.listener;
 
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
 
public class ConsumerMessageListener implements MessageListener {
 
    public void onMessage(Message message) {
        //這里我們知道生產(chǎn)者發(fā)送的就是一個(gè)純文本消息,所以這里可以直接進(jìn)行強(qiáng)制轉(zhuǎn)換
        TextMessage textMsg = (TextMessage) message;
        System.out.println("接收到一個(gè)純文本消息。");
        try {
            System.out.println("消息內(nèi)容是:" + textMsg.getText());
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
 
}
      

?

?????? 有了 MessageListener 之后我們就可以在 Spring 的配置文件中配置一個(gè)消息監(jiān)聽(tīng)容器了。

            <!--這個(gè)是隊(duì)列目的地-->
    <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg>
            <value>queue</value>
        </constructor-arg>
    </bean>
    <!-- 消息監(jiān)聽(tīng)器 -->
    <bean id="consumerMessageListener" class="com.tiantian.springintejms.listener.ConsumerMessageListener"/>    

    <!-- 消息監(jiān)聽(tīng)容器 -->
    <bean id="jmsContainer"        class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="connectionFactory" />
        <property name="destination" ref="queueDestination" />
        <property name="messageListener" ref="consumerMessageListener" />
    </bean>

      

??

?????? 我們可以看到我們定義了一個(gè)名叫 queue ActiveMQQueue 目的地,我們的監(jiān)聽(tīng)器就是監(jiān)聽(tīng)了發(fā)送到這個(gè)目的地的消息。

?????? 至此我們的生成者和消費(fèi)者都配置完成了,這也就意味著我們的整合已經(jīng)完成了。這個(gè)時(shí)候完整的 Spring 的配置文件應(yīng)該是這樣的:

        <?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
    xmlns:jms="http://www.springframework.org/schema/jms"
    xsi:schemaLocation="http://www.springframework.org/schema/beans
     http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
     http://www.springframework.org/schema/context
     http://www.springframework.org/schema/context/spring-context-3.0.xsd
    http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
    http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-3.0.xsd">
 
    <context:component-scan base-package="com.tiantian" />
 
    <!-- Spring提供的JMS工具類,它可以進(jìn)行消息發(fā)送、接收等 -->
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <!-- 這個(gè)connectionFactory對(duì)應(yīng)的是我們定義的Spring提供的那個(gè)ConnectionFactory對(duì)象 -->
        <property name="connectionFactory" ref="connectionFactory"/>
    </bean>
    
    <!-- 真正可以產(chǎn)生Connection的ConnectionFactory,由對(duì)應(yīng)的 JMS服務(wù)廠商提供-->
    <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="tcp://localhost:61616"/>
    </bean>
    
    <!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->
    <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
        <!-- 目標(biāo)ConnectionFactory對(duì)應(yīng)真實(shí)的可以產(chǎn)生JMS Connection的ConnectionFactory -->
        <property name="targetConnectionFactory" ref="targetConnectionFactory"/>
    </bean>
    
    <!--這個(gè)是隊(duì)列目的地-->
    <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg>
            <value>queue</value>
        </constructor-arg>
    </bean>
    <!-- 消息監(jiān)聽(tīng)器 -->
    <bean id="consumerMessageListener" class="com.tiantian.springintejms.listener.ConsumerMessageListener"/>
    <!-- 消息監(jiān)聽(tīng)容器 -->
    <bean id="jmsContainer"
        class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="connectionFactory" />
        <property name="destination" ref="queueDestination" />
        <property name="messageListener" ref="consumerMessageListener" />
    </bean>
</beans>

      

?

?

?????? 接著我們來(lái)測(cè)試一下,看看我們的整合是否真的成功了,測(cè)試代碼如下:

?

        package com.tiantian.springintejms.test;
 
import javax.jms.Destination;
 
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import com.tiantian.springintejms.service.ProducerService;
 
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration("/applicationContext.xml")
public class ProducerConsumerTest {
 
    @Autowired
    private ProducerService producerService;
    @Autowired
    @Qualifier("queueDestination")
    private Destination destination;
    
    @Test
    public void testSend() {
        for (int i=0; i<2; i++) {
            producerService.sendMessage(destination, "你好,生產(chǎn)者!這是消息:" + (i+1));
        }
    }
    
}

      

?

?????? 在上面的測(cè)試代碼中我們利用生產(chǎn)者發(fā)送了兩個(gè)消息,正常來(lái)說(shuō),消費(fèi)者應(yīng)該可以接收到這兩個(gè)消息。運(yùn)行測(cè)試代碼后控制臺(tái)輸出如下:

?
Spring整合JMS(一)——基于ActiveMQ實(shí)現(xiàn)
?

?????? 看,控制臺(tái)已經(jīng)進(jìn)行了正確的輸出,這說(shuō)明我們的整合確實(shí)是已經(jīng)成功了。

?

?附:

Spring整合JMS(一)——基于ActiveMQ實(shí)現(xiàn)


更多文章、技術(shù)交流、商務(wù)合作、聯(lián)系博主

微信掃碼或搜索:z360901061

微信掃一掃加我為好友

QQ號(hào)聯(lián)系: 360901061

您的支持是博主寫作最大的動(dòng)力,如果您喜歡我的文章,感覺(jué)我的文章對(duì)您有幫助,請(qǐng)用微信掃描下面二維碼支持博主2元、5元、10元、20元等您想捐的金額吧,狠狠點(diǎn)擊下面給點(diǎn)支持吧,站長(zhǎng)非常感激您!手機(jī)微信長(zhǎng)按不能支付解決辦法:請(qǐng)將微信支付二維碼保存到相冊(cè),切換到微信,然后點(diǎn)擊微信右上角掃一掃功能,選擇支付二維碼完成支付。

【本文對(duì)您有幫助就好】

您的支持是博主寫作最大的動(dòng)力,如果您喜歡我的文章,感覺(jué)我的文章對(duì)您有幫助,請(qǐng)用微信掃描上面二維碼支持博主2元、5元、10元、自定義金額等您想捐的金額吧,站長(zhǎng)會(huì)非常 感謝您的哦!!!

發(fā)表我的評(píng)論
最新評(píng)論 總共0條評(píng)論
主站蜘蛛池模板: 九色视频网| 欧美aaa毛片免费看 欧美aaa性bbb毛片 | 国产高清一区二区三区四区 | 久久久久久亚洲精品 | 中文线码中文高清播放中 | 伊人久久中文字幕久久cm | 综合色久七七综合七七蜜芽 | 欧美一级日韩在线观看 | 色一情一欲一爱一乱 | 黄色网毛片 | 免费一极毛片 | 视频一区中文字幕 | 国产成人高清亚洲一区91 | 久久香蕉国产线 | 午夜影院免费体验区 | 不卡一区在线观看 | 国产精品美女久久久久网 | a毛片基地 | 亚洲国产欧美在线 | 久久国产高清视频 | 性生活视频免费观看 | 护士日本xxxxx丰满hd4k | 国产成人亚洲精品77 | 四虎永久在线精品 | 四虎国产精品影库永久免费 | 午夜日韩在线 | 青青青在线观看免费视频精品 | 亚洲免费黄色网 | 亚洲精品一| 九九热在线视频观看这里只有精品 | 成人看毛片 | 久久香蕉国产线看观看乱码 | 日本中文字幕在线视频 | 久久亚洲精品成人综合 | 久久88香港三级台湾三级中文 | 亚洲免费大片 | 久久精品国产亚洲精品2020 | 视频福利在线 | 久操视频在线观看免费 | 成年女人看片免费视频频 | 国产国产成人人免费影院 |