亚洲免费在线-亚洲免费在线播放-亚洲免费在线观看-亚洲免费在线观看视频-亚洲免费在线看-亚洲免费在线视频

Python 操作 Rabbit MQ 發布/訂閱 (五)

系統 2233 0

Python 操作 Rabbit MQ 發布/訂閱 (五)

一、發布、訂閱:

我們將一個消息分發給 多個消費者 ,這種模式被稱為 發布/訂閱

為了更好的理解這個模式,我們將構建一個日志系統,它包括兩個程序:

  • 第一個程序,負責發送日志消息;
  • 第二個程序,負責獲取消息并輸出內容;

在日志系統中,所有正在運行的接收方程序都會接收消息;

  • 一個接受者,把日志寫入硬盤中;
  • 另一個接受者,把日志輸出到屏幕上;

最終,日志消息被廣播給所有的接受者。

二、交換機(Exchanges):

概念 :應用程序發送消息時,先把消息給交換機,由交換機投遞給隊列,而不是直接給隊列。交換機可以由多個 消息通道(Channel) ,用于投遞消息。

簡單概括下之前的知識

  • 發布者(Producer):是發布消息的應用程序。
  • 隊列(Queue):用于消息存儲的緩沖。
  • 消費者(Consumer):是接收消息的應用程序。

圖解大體流程
Python 操作 Rabbit MQ 發布/訂閱 (五)_第1張圖片

  • P:代表是發布者;
  • X:是交換機;

詳解圖意 :發布者(P )→交換機(X)→隊列(Q)→消費者(C );

  • 交換機一邊從發布者方接收消息,一邊把消息推送到隊列(Q)。 交換機必須知道如何處理它接收到的消息,是推送到指定的隊列、還是多個隊列,或者是忽略消息 。這些都是通過 交換機類型(Exchange Type) 來定義的。

交換機類型

1.直連交換機(Direct);

2.主題交換機(Topic);

3.頭交換機(Headers);

4.扇形交換機(Fanout);

  • 主要說明—扇形交換,它把消息發送給它所知道的所有隊列。

                    
                      channel
                      
                        .
                      
                      
                        exchange_declare
                      
                      
                        (
                      
                      exchange
                      
                        =
                      
                      
                        'fanout_logs'
                      
                      
                        ,
                      
                      
                             exchange_type
                      
                        =
                      
                      
                        'fanout'
                      
                      
                        )
                      
                    
                  

參數講解

  • exchange:就是交換機的名稱, 空字符串代表默認或者匿名交換機;

                    
                      channel
                      
                        .
                      
                      
                        basic_publish
                      
                      
                        (
                      
                      exchange
                      
                        =
                      
                      
                        ''
                      
                      
                        )
                      
                    
                  
  • exchange_type:就是交換機的類型;

  • routing_key:分發到指定的隊列;

  • body:發送的內容;

  • properties:使消息持久化;

查看交換器列表

命令: rabbitmqctl list_exchanges

            
              Listing exchanges 
              
                ...
              
              
amq
              
                .
              
              rabbitmq
              
                .
              
              log	topic
amq
              
                .
              
              direct	direct
amq
              
                .
              
              topic	topic
amq
              
                .
              
              headers	headers
	direct
amq
              
                .
              
              fanout	fanout
amq
              
                .
              
              rabbitmq
              
                .
              
              trace	topic
amq
              
                .
              
              match	headers

            
          

列表中以amq.*的開頭的交換器,都是默認創建的,目前不需要管它們。

三、臨時隊列:

我們連接上Rabbit MQ的時候,需要一個 全新的、空的隊列 (也就是說不使用之前提到的,routing_key參數指定的隊列名),我們可以 手動創建一個隨機的隊列名 ,或者讓 服務器為我們選擇一個隨機的隊列名(推薦) 。我們僅需要在 調用queue_declare方法時,不提供queue參數 即可:

            
              # 在管道里
              
                ,
              
               不聲明隊列名稱
result 
              
                =
              
               channel
              
                .
              
              
                queue_declare
              
              
                (
              
              
                )
              
            
          

可通過 result.method.queue 獲取已經生成的隨機隊列名,大概的樣子如下所示:

            
              amq
              
                .
              
              gen
              
                -
              
              DIAODS2sDSAKJKS
              
                ==
              
            
          

與消費者斷開連接時,這個隊列應被立即刪除:

            
              # 需要一個空的隊列  exclusive
              
                =
              
              True 表示與消費者斷開時
              
                ,
              
               隊列立即刪除
result 
              
                =
              
               channel
              
                .
              
              
                queue_declare
              
              
                (
              
              exclusive
              
                =
              
              True
              
                )
              
            
          

四、綁定:

img

目前已經創建一個扇形交換機和一個隊列。現在需要告訴交換機如果發送消息給隊列。

交換機和隊列之間的聯系我們稱為綁定(binding)

            
              # 將fanount_logs交換機將會把消息添加到我們的隊列中
              
                ,
              
               隊列名服務器隨機生成
channel
              
                .
              
              
                queue_bind
              
              
                (
              
              exchange
              
                =
              
              
                'fanout_logs'
              
              
                ,
              
               queue
              
                =
              
              result
              
                .
              
              method
              
                .
              
              queue
              
                )
              
            
          

查看綁定列表

列出所有現存的綁定命令: rabbitmqctl list_bindings

五、整理本節最終代碼:

圖解最終流程

Python 操作 Rabbit MQ 發布/訂閱 (五)_第2張圖片

發布日志與之前的區別

1.我們把消息發送給fanout_logs交換機而不是匿名的交換機;

2.發送的時候需要提供routing_key參數,但它的值會被扇形交換機忽略;

以下是 send.py

            
              #
              
                !
              
              
                /
              
              usr
              
                /
              
              bin
              
                /
              
              python
# 
              
                -
              
              
                *
              
              
                -
              
               coding
              
                :
              
               utf
              
                -
              
              
                8
              
              
                -
              
              
                *
              
              
                -
              
              
                import
              
               pika

              
                import
              
               sys

message 
              
                =
              
              
                ' '
              
              
                .
              
              
                join
              
              
                (
              
              sys
              
                .
              
              argv
              
                [
              
              
                1
              
              
                :
              
              
                ]
              
              
                )
              
               or 
              
                "Hello World!"
              
              

# 創建一個實例  本地訪問
              
                IP
              
              地址可以為 localhost 
              
                后面5672是端口地址
              
              
                (
              
              可以不用指
# 定
              
                ,
              
               因為默認就是
              
                5672
              
              
                )
              
              
connection 
              
                =
              
               pika
              
                .
              
              
                BlockingConnection
              
              
                (
              
              pika
              
                .
              
              
                ConnectionParameters
              
              
                (
              
              
                'localhost'
              
              
                ,
              
              
                5672
              
              
                )
              
              
                )
              
              

# 聲明一個管道
              
                ,
              
               在管道里發送消息
channel 
              
                =
              
               connection
              
                .
              
              
                channel
              
              
                (
              
              
                )
              
              

# 指定交換機的類型為fanout
              
                ,
              
               執行交換機名
              
                :
              
              fanout_logs
channel
              
                .
              
              
                exchange_declare
              
              
                (
              
              exchange
              
                =
              
              
                'fanout_logs'
              
              
                ,
              
               exchange_type
              
                =
              
              
                'fanout'
              
              
                )
              
              

# 投遞消息 exchange
              
                =
              
              
                'fancout_logs'
              
              交換機的名命
              
                ;
              
               type
              
                =
              
              
                'fanout'
              
              
                :
              
              扇形交換機
channel
              
                .
              
              
                basic_publish
              
              
                (
              
              exchange
              
                =
              
              
                'fanout_logs'
              
              
                ,
              
              
                      routing_key
              
                =
              
              
                ''
              
              
                ,
              
              
                      body
              
                =
              
              message
                      
              
                )
              
              

print 
              
                "[x] sent {}"
              
              
                .
              
              
                format
              
              
                (
              
              message
              
                ,
              
              
                )
              
              
# 隊列關閉
connection
              
                .
              
              
                close
              
              
                (
              
              
                )
              
            
          

若沒有綁定隊列的交換器,消息將會丟失。以下是 receive.py

            
              #
              
                !
              
              
                /
              
              usr
              
                /
              
              bin
              
                /
              
              python
# 
              
                -
              
              
                *
              
              
                -
              
               coding
              
                :
              
               utf
              
                -
              
              
                8
              
              
                -
              
              
                *
              
              
                -
              
              
                import
              
               pika

# 創建實例
connection 
              
                =
              
               pika
              
                .
              
              
                BlockingConnection
              
              
                (
              
              pika
              
                .
              
              
                ConnectionParameters
              
              
                (
              
              
                'localhost'
              
              
                )
              
              
                )
              
              

# 聲明管道
channel 
              
                =
              
               connection
              
                .
              
              
                channel
              
              
                (
              
              
                )
              
              

# 指定交換機名為 fanout_logs 類型為扇形
channel
              
                .
              
              
                exchange_declare
              
              
                (
              
              exchange
              
                =
              
              
                'fanout_logs'
              
              
                ,
              
               exchange_type
              
                =
              
              
                'fanout'
              
              
                )
              
              

# 表示與消費者斷開連接
              
                ,
              
               隊列立即刪除
result 
              
                =
              
               channel
              
                .
              
              
                queue_declare
              
              
                (
              
              queue
              
                =
              
              
                ''
              
              
                ,
              
               exclusive
              
                =
              
              True
              
                )
              
              

# 生成隊列的名字
queue_name 
              
                =
              
               result
              
                .
              
              method
              
                .
              
              queue

# 綁定交換機和隊列
channel
              
                .
              
              
                queue_bind
              
              
                (
              
              exchange
              
                =
              
              
                'fanout_logs'
              
              
                ,
              
               queue
              
                =
              
              queue_name
              
                )
              
              


def 
              
                callback
              
              
                (
              
              ch
              
                ,
              
               method
              
                ,
              
               properties
              
                ,
              
               body
              
                )
              
              
                :
              
              
    print 
              
                '[X] Received{}'
              
              
                .
              
              
                format
              
              
                (
              
              body
              
                ,
              
              
                )
              
              


# 消費消息
channel
              
                .
              
              
                basic_consume
              
              
                (
              
              queue
              
                =
              
              queue_name
              
                ,
              
                # 從指定的消息隊列中接收消息
                      on_message_callback
              
                =
              
              callback
              
                ,
              
                # 如果收到消息
              
                ,
              
               就調用callback函數來處理
                      
              
                )
              
              
                print
              
              
                (
              
              
                '=======正在等待消息========'
              
              
                )
              
              
channel
              
                .
              
              
                start_consuming
              
              
                (
              
              
                )
              
                # 開始消費消息

            
          

3.如果想把日志保存到文件中,打開控制臺輸入:

            
              python receive
              
                .
              
              py 
              
                >
              
               logs_from_rabbit
              
                .
              
              log 

            
          

4.在屏幕中查看日志,在打開一個新的終端運行:

            
              python receive
              
                .
              
              py 

              
                ===
              
              
                ===
              
              
                =
              
              正在等待消息
              
                ===
              
              
                ===
              
              
                ==
              
            
          

5.發送消息:

            
              python send
              
                .
              
              py 發送第一條消息

            
          

6.可以看到消費者接收到了消息,并且日志中也記錄了這條消息。

            
              cat logs_from_rabbit
              
                .
              
              log 

              
                ===
              
              
                ===
              
              
                =
              
              正在等待消息
              
                ===
              
              
                ===
              
              
                ==
              
              
                [
              
              
                X
              
              
                ]
              
               Received發送第一條消息

            
          

7.確認已經創建的隊列綁定:

            
              rabbitmqctl list_bindings
Listing bindings 
              
                ...
              
              
	exchange	amq
              
                .
              
              gen
              
                -
              
              Di2rIkS1kQWcMODPxF5KuA	queue	amq
              
                .
              
              gen
              
                -
              
              Di2rIkS1kQWcMODPxF5KuA	
              
                [
              
              
                ]
              
              
	exchange	hello	queue	hello	
              
                [
              
              
                ]
              
              
	exchange	task_queue	queue	task_queue	
              
                [
              
              
                ]
              
              
fanout_logs	exchange	amq
              
                .
              
              gen
              
                -
              
              Di2rIkS1kQWcMODPxF5KuA	queue	amq
              
                .
              
              gen
              
                -
              
              Di2rIkS1kQWcMODPxF5KuA	
              
                [
              
              
                ]
              
            
          

交換器fanout_logs把數據發送給兩個系統名命的隊列


更多文章、技術交流、商務合作、聯系博主

微信掃碼或搜索:z360901061

微信掃一掃加我為好友

QQ號聯系: 360901061

您的支持是博主寫作最大的動力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描下面二維碼支持博主2元、5元、10元、20元等您想捐的金額吧,狠狠點擊下面給點支持吧,站長非常感激您!手機微信長按不能支付解決辦法:請將微信支付二維碼保存到相冊,切換到微信,然后點擊微信右上角掃一掃功能,選擇支付二維碼完成支付。

【本文對您有幫助就好】

您的支持是博主寫作最大的動力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描上面二維碼支持博主2元、5元、10元、自定義金額等您想捐的金額吧,站長會非常 感謝您的哦!!!

發表我的評論
最新評論 總共0條評論
主站蜘蛛池模板: 91嫩草国产线免费观看 | 手机看一级片 | 99精品国产福利在线观看 | 国产成人aa免费视频 | 在线欧美v日韩v国产精品v | 色狠狠成人综合色 | 国产高清精品一级毛片 | 国产成人精品一区二区免费 | 深夜免费福利 | 中文字幕免费在线看线人动作大片 | 目韩一区二区三区系列片丶 | 鲁一鲁射一射 | 日本一区视频在线 | 91视频国产高清 | 91精品国产免费自在线观看 | 欧美国产亚洲一区 | 亚洲国产一区二区三区四区五区 | 精品999久久久久久中文字幕 | 精品久久中文网址 | 69欧美另类xxxxx高清 | 日本欧美一区二区三区不卡视频 | 久久久久久久国产精品视频 | 欧美日韩中文字幕在线观看 | 久久久久国产精品 | 精彩视频一区二区 | 4虎永免费最新永久免费地址 | 成年女人18毛片毛片免费 | 国产精品国色综合久久 | 国产福利不卡视频在免费播放 | 一本大道加勒比久久综合 | 另类尿喷潮videofree | 91成人免费在线视频 | 日产国语一区二区三区在线看 | 国产成人不卡亚洲精品91 | 久久人人爽人人爽 | 色偷偷久久| 福利在线视频观看 | 4399一级成人毛片 | 99精品国产高清自在线看超 | 99视频在线观看免费 | 免费观看四虎精品成人 |