亚洲免费在线-亚洲免费在线播放-亚洲免费在线观看-亚洲免费在线观看视频-亚洲免费在线看-亚洲免费在线视频

python使用pika操作rabbitmq編程入門(二)

系統(tǒng) 1769 0

pika生產(chǎn)者程序大致步驟:

            
              1. 建立連接connection , 需要認(rèn)證的調(diào)用認(rèn)證參數(shù)
2. 創(chuàng)建通道channel  當(dāng)然 channel可以池化,這樣可以重復(fù)使用
3. 聲明隊(duì)列   指定隊(duì)列屬性, 一旦指定屬性不能修改, 例如是否持久化,名稱
4. 聲明交換機(jī)  交換機(jī)類型,名稱等, 也可以不用聲明,直接使用 “” 空字符串,默認(rèn)交換機(jī)也可以
5. 將隊(duì)列與交換機(jī)綁定   queue_bind 
6. basic_publish 發(fā)送到交換機(jī) 指定路由鍵

            
          

pika消費(fèi)者程序大致步驟:

            
              1. 建立連接connection , 需要認(rèn)證的調(diào)用認(rèn)證參數(shù)
2. 創(chuàng)建通道channel  當(dāng)然 channel可以池化,這樣可以重復(fù)使用
3. 聲明隊(duì)列   指定隊(duì)列屬性, 一旦指定屬性不能修改, 例如是否持久化,名稱
4. 聲明交換機(jī)  交換機(jī)類型,名稱等, 也可以不用聲明,直接使用 “” 空字符串,默認(rèn)交換機(jī)也可以
5. 將隊(duì)列與交換機(jī)綁定   queue_bind 
6. basic_consume 消費(fèi)消息

            
          

1. 輪詢接收消息

使用消息隊(duì)列的一個(gè)好處就是, 可以將任務(wù)消息發(fā)送到隊(duì)列中,由消費(fèi)者異步進(jìn)行處理, 同時(shí)對(duì)于后端消費(fèi)者可以很容易地增加減少,只需要運(yùn)行多個(gè)進(jìn)程即可, 方便擴(kuò)展, 之前的示例中消費(fèi)端程序就可以開啟多個(gè),然后可以看到消費(fèi)被輪詢得分配給每個(gè)消費(fèi)者

將之前的消費(fèi)者略作更改, 加入客戶端編號(hào),啟動(dòng)三個(gè)消費(fèi)者, 通過(guò)生產(chǎn)者發(fā)送4個(gè)消息, 依次收到消息, 即是 輪詢(round-robin):

            
               [*] Waiting for messages. To exit press CTRL+C
1. [x] Received 'Hello World!'
1. [x] Received 'Hello World!'

            
          
            
               [*] Waiting for messages. To exit press CTRL+C
2. [x] Received 'Hello World!'

            
          
            
               [*] Waiting for messages. To exit press CTRL+C
3. [x] Received 'Hello World!'

            
          

2. 消息確認(rèn):

為了防止消息丟失,RabbitMQ提供了消息響應(yīng)(acknowledgments)。消費(fèi)者會(huì)通過(guò)一個(gè)ack(響應(yīng)),告訴RabbitMQ已經(jīng)收到并處理了某條消息,然后RabbitMQ就會(huì)釋放并刪除這條消息。 如果消費(fèi)者(consumer)掛掉了,沒(méi)有發(fā)送響應(yīng),RabbitMQ就會(huì)認(rèn)為消息沒(méi)有被完全處理,然后重新發(fā)送給其他消費(fèi)者(consumer)。這樣,及時(shí)工作者(workers)偶爾的掛掉,也不會(huì)丟失消息。

消息響應(yīng)默認(rèn)auto_ack=False, 不自動(dòng)確認(rèn)消息, 即是需要我們處理并確認(rèn)消息的

確認(rèn)需要發(fā)送確認(rèn)消息:

在回調(diào)callback中加入basic_ack

            
              channel
              
                .
              
              basic_ack
              
                (
              
              delivery_tag 
              
                =
              
               method
              
                .
              
              delivery_tag
              
                )
              
            
          

如果auto_ack設(shè)置為True,而忘記basic_ack消息確認(rèn),消息在程序退出之后就會(huì)重新發(fā)送,如果不及時(shí)釋放沒(méi)響應(yīng)的消息,RabbitMQ就會(huì)占用越來(lái)越多的內(nèi)存。

為了排除這種錯(cuò)誤,可以使用rabbitmqctl命令,輸出messages_unacknowledged字段:

            
              # rabbitmqctl list_queues name messages_ready messages_unacknowledged
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
name    messages_ready  messages_unacknowledged
hello   0       0
TEST01  0       0

            
          

3. 消息持久化

如果沒(méi)有向rabbitmq指定消息持久化, 則退出或者崩潰的時(shí)候,將會(huì)丟失所有隊(duì)列和消息, 消息持久化必須把“隊(duì)列”和“消息”設(shè)為持久化

  1. 隊(duì)列聲明持久化:
            
              channel.queue_declare(queue='hello', durable=True)

            
          

注意一個(gè)消息隊(duì)列被聲明過(guò)一次后,rabbitmq不允許使用不同的參數(shù)重新定義隊(duì)列, 因此如果存在hello隊(duì)列,上面會(huì)提示錯(cuò)誤

  1. 消息聲明持久化

將publish生產(chǎn)者發(fā)送消息時(shí)候消息屬性, delivery_mode的屬性設(shè)為2

            
               properties=pika.BasicProperties(delivery_mode=2)

            
          

生產(chǎn)者代碼:

            
              channel.queue_declare(queue='TEST02', durable=True)

channel.basic_publish(exchange='',
                      routing_key='TEST02',
                      body='Hello World!',
                      properties=pika.BasicProperties(delivery_mode=2)
                      )

            
          

客戶端代碼:

            
              channel.queue_declare(queue='TEST02', durable=True)

def callback(ch, method, properties, body):
    print(". [x] Received %r" % body)
    ch.basic_ack(delivery_tag=method.delivery_tag)


channel.basic_consume(on_message_callback=callback,
                      queue='TEST02',
                      auto_ack=False,
                      )

            
          

4. 設(shè)置客戶端QOS

開啟客戶端最大的未處理消息隊(duì)列大小:

            
              channel.basic_qos(prefetch_count=1)

            
          

代碼示例:

            
              channel.queue_declare(queue='TEST02', durable=True)
channel.basic_qos(prefetch_count=1)

def callback(ch, method, properties, body):
    print(". [x] Received %r" % body)
    ch.basic_ack(delivery_tag=method.delivery_tag)


channel.basic_consume(on_message_callback=callback,
                      queue='TEST02',
                      auto_ack=False,
                      )

            
          

5. 發(fā)布訂閱模式:

rabbitmq在之前介紹的時(shí)候可以看到,消息是被依次發(fā)送給消費(fèi)者,即是消息只會(huì)被發(fā)送給一個(gè)消費(fèi)者,除非開啟確認(rèn)機(jī)制時(shí)處理失敗了, 一個(gè)消息發(fā)送給多個(gè)消費(fèi)者, 這個(gè)是rabbitmq提供的發(fā)布訂閱模式

發(fā)布者(producer)只需要把消息發(fā)送給一個(gè)交換機(jī)(exchange)。交換機(jī)非常簡(jiǎn)單,它一邊從發(fā)布者方接收消息,一邊把消息推送到隊(duì)列。交換機(jī)必須知道如何處理它接收到的消息,是應(yīng)該推送到指定的隊(duì)列還是是多個(gè)隊(duì)列,或者是直接忽略消息。這些規(guī)則是通過(guò)交換機(jī)類型(exchange type)來(lái)定義的

交換機(jī)類型:直連交換機(jī)(direct)-- 一對(duì)一, 之前使用的就是這個(gè);主題交換機(jī)(topic)-- 模糊匹配,需要符合匹配規(guī)則; headers(頭交換機(jī))和 扇型交換機(jī)(fanout)-- 進(jìn)行消息廣播

fanout會(huì)發(fā)送消息到交換機(jī)所有的消息隊(duì)列

消息將會(huì)根據(jù)指定的routing_key分發(fā)到指定的隊(duì)列

rabbitmq擁有一個(gè)默認(rèn)交換機(jī) 即是 空字符串(""),

            
              channel.basic_publish(exchange='',
                      routing_key='hello',
                      body=message)

            
          

在pika編程中,可以不用指定隊(duì)列名稱,系統(tǒng)會(huì)隨機(jī)生成一個(gè)名稱, 在重啟都該隊(duì)列丟失

只需在聲明時(shí)不提供參數(shù)就可以了

            
              result = channel.queue_declare()

            
          

隊(duì)列需要綁定到交換, 才能通過(guò)交換機(jī)發(fā)送消息到該隊(duì)列

            
              channel.queue_bind(exchange='logs',
                   queue=result.method.queue)

            
          

代碼:

生產(chǎn)者:

            
              channel = connection.channel()

channel.exchange_declare(exchange='logs',
                         exchange_type='fanout')

message = ' '.join(sys.argv[1:]) or "info: Hello World!"
channel.basic_publish(exchange='logs',
                      routing_key='',
                      body=message)
connection.close()

            
          

消費(fèi)者:

            
              channel = connection.channel()

channel.exchange_declare(exchange='logs',
                         exchange_type='fanout')

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

channel.queue_bind(exchange='logs',
                   queue=queue_name)

print ' [*] Waiting for logs. To exit press CTRL+C'

def callback(ch, method, properties, body):
    print " [x] %r" % (body,)

channel.basic_consume(callback,
                      queue=queue_name,
                      auto_ack=True)

channel.start_consuming()

            
          

6. 幾個(gè)重要概念的程序?qū)崿F(xiàn)

  1. 路由 routing

路由鍵在發(fā)送消息的時(shí)候進(jìn)行指定

            
              channel.basic_publish(exchange='logs',
                      routing_key='',
                      body=message)

            
          
  1. 隊(duì)列綁定 binding
            
              channel.queue_bind(exchange=exchange_name,
                   queue=queue_name)

            
          
  1. 交換機(jī)類型

交換機(jī)聲明的時(shí)候進(jìn)行指定, 一般常用direct, fanout, topic三種類型

            
              channel.exchange_declare(exchange='logs',
                         exchange_type='fanout')

            
          
  1. 路由鍵

一般路由鍵不等于綁定鍵, 但是我們通常在direct的時(shí)候可以近似的認(rèn)為這兩個(gè)等價(jià)的

在隊(duì)列綁定的時(shí)候,通過(guò)指定routing_key 指定

            
              channel.queue_bind(exchange=exchange_name,
                   queue=queue_name,
                   routing_key='tologs')

            
          
  1. 一個(gè)路由鍵多個(gè)隊(duì)列

也就是多個(gè)隊(duì)列使用相同的綁定鍵, 這個(gè)是合法 的, 這樣就可以將消息發(fā)生到不同的隊(duì)列中

例如:

            
              channel.queue_bind(exchange="logs",queue="info",routing_key='tologs')
channel.queue_bind(exchange="logs",queue="warn",routing_key='tologs')
channel.queue_bind(exchange="logs",queue="debug",routing_key='tologs')

            
          
  1. 排他隊(duì)列:

一個(gè)只有自己可見的隊(duì)列,即不允許其它用戶訪問(wèn),RabbitMQ允許你將一個(gè)Queue聲明成為排他性的

            
              channel.queue_declare(exclusive=True)

            
          

示例代碼:

生產(chǎn)者:

            
              channel = connection.channel()

channel.exchange_declare(exchange='direct_logs',
                         type='direct')

severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='direct_logs',
                      routing_key=severity,
                      body=message)
print " [x] Sent %r:%r" % (severity, message)
connection.close()

            
          

消費(fèi)者:

            
              channel = connection.channel()

channel.exchange_declare(exchange='direct_logs',
                         type='direct')

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

severities = sys.argv[1:]
if not severities:
    print >> sys.stderr, "Usage: %s [info] [warning] [error]" % \
                         (sys.argv[0],)
    sys.exit(1)

for severity in severities:
    channel.queue_bind(exchange='direct_logs',
                       queue=queue_name,
                       routing_key=severity)

print ' [*] Waiting for logs. To exit press CTRL+C'

def callback(ch, method, properties, body):
    print " [x] %r:%r" % (method.routing_key, body,)

channel.basic_consume(callback,
                      queue=queue_name,
                      auto_ack=True)

channel.start_consuming()

            
          

7. 主題交換機(jī) (topic)

發(fā)送到主題交換機(jī)(topic exchange)的消息不可以攜帶隨意什么樣子的路由鍵(routing_key),它的路由鍵必須是一個(gè)由 . 分隔開的詞語(yǔ)列表, 例如: “stock.usd.nyse”, “nyse.vmw”, “quick.orange.rabbit”

綁定鍵也必須擁有同樣的格式。主題交換機(jī)背后的邏輯跟直連交換機(jī)很相似, 攜帶著特定路由鍵的消息會(huì)被主題交換機(jī)投遞給綁定鍵與之想匹配的隊(duì)列

它的綁定鍵和路由鍵有兩個(gè)特殊應(yīng)用方式, 即是支持模糊匹配:

  • * (星號(hào)) 用來(lái)表示一個(gè)單詞.
  • # (井號(hào)) 用來(lái)表示任意數(shù)量(零個(gè)或多個(gè))單詞。

例如路由鍵: *.*.rabbit lazy.#

lazy.pink.rabbit 會(huì)匹配 *.*.rabbit lazy.#

lazy.x 匹配 lazy.#

注特殊情況:

當(dāng)一個(gè)隊(duì)列的綁定鍵為 “#”(井號(hào)) 的時(shí)候,這個(gè)隊(duì)列將會(huì)無(wú)視消息的路由鍵,接收所有的消息。

當(dāng) * (星號(hào)) 和 # (井號(hào)) 這兩個(gè)特殊字符都未在綁定鍵中出現(xiàn)的時(shí)候,此時(shí)主題交換機(jī)就擁有的直連交換機(jī)的行為。

示例代碼:

生產(chǎn)者:

            
              channel = connection.channel()

channel.exchange_declare(exchange='topic_logs',
                         type='topic')

routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='topic_logs',
                      routing_key=routing_key,
                      body=message)
print " [x] Sent %r:%r" % (routing_key, message)
connection.close()

            
          

消費(fèi)者:

            
              channel = connection.channel()

channel.exchange_declare(exchange='topic_logs',
                         type='topic')

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

binding_keys = sys.argv[1:]
if not binding_keys:
    print >> sys.stderr, "Usage: %s [binding_key]..." % (sys.argv[0],)
    sys.exit(1)

for binding_key in binding_keys:
    channel.queue_bind(exchange='topic_logs',
                       queue=queue_name,
                       routing_key=binding_key)

print ' [*] Waiting for logs. To exit press CTRL+C'

def callback(ch, method, properties, body):
    print " [x] %r:%r" % (method.routing_key, body,)

channel.basic_consume(callback,
                      queue=queue_name,
                      auto_ack=True)

channel.start_consuming()

            
          

更多文章、技術(shù)交流、商務(wù)合作、聯(lián)系博主

微信掃碼或搜索:z360901061

微信掃一掃加我為好友

QQ號(hào)聯(lián)系: 360901061

您的支持是博主寫作最大的動(dòng)力,如果您喜歡我的文章,感覺(jué)我的文章對(duì)您有幫助,請(qǐng)用微信掃描下面二維碼支持博主2元、5元、10元、20元等您想捐的金額吧,狠狠點(diǎn)擊下面給點(diǎn)支持吧,站長(zhǎng)非常感激您!手機(jī)微信長(zhǎng)按不能支付解決辦法:請(qǐng)將微信支付二維碼保存到相冊(cè),切換到微信,然后點(diǎn)擊微信右上角掃一掃功能,選擇支付二維碼完成支付。

【本文對(duì)您有幫助就好】

您的支持是博主寫作最大的動(dòng)力,如果您喜歡我的文章,感覺(jué)我的文章對(duì)您有幫助,請(qǐng)用微信掃描上面二維碼支持博主2元、5元、10元、自定義金額等您想捐的金額吧,站長(zhǎng)會(huì)非常 感謝您的哦!!!

發(fā)表我的評(píng)論
最新評(píng)論 總共0條評(píng)論
主站蜘蛛池模板: 天天艹日日干 | 久久久久综合网久久 | 色老头久久久久久久久久 | 九九爱国产 | 日本欧美强乱视频在线 | 99精品热视频 | 国产一区曰韩二区欧美三区 | 亚洲国产欧美精品一区二区三区 | 免费激情网址 | 国产精品国产精品国产专区不卡 | 国产你懂的 | 91正在播放极品白嫩在线观看 | 一区二区三区在线免费视频 | 亚洲精品综合欧美一区二区三区 | 亚洲综合日韩精品欧美综合区 | 91久久亚洲最新一本 | 日韩中文字幕不卡 | 99久久精品国产一区二区三区 | 亚洲精品久久久久中文字小说 | 伊人久久亚洲综合 | 国产精品久久久久久久免费大片 | 亚洲综合色站 | 久久免费视频6 | 色偷偷要色偷偷网站视频在线 | 国产精品久久亚洲一区二区 | 九九色视频在线观看 | 欧美久久一区二区三区 | 欧美日韩成人高清色视频 | 中文字幕在线观看国产 | 久草视频福利在线观看 | 国产乳摇福利视频在线观看 | 国产专区在线播放 | 欧美综合一区 | 欧美日韩一二三区 | 亚洲人妖女同在线播放 | 日韩在线天堂 | 日本在线播放一区 | 羞羞的视频在线观看 | 日日夜人人澡人人澡人人看免 | 亚洲高清一区二区三区四区 | 一级毛片真人不卡免费播 |