emit(Listtuple,ObjectmessageId){returnemit(Utils.DEFAULT_STREAM_ID,tuple,messageId);}這里的t" />

亚洲免费在线-亚洲免费在线播放-亚洲免费在线观看-亚洲免费在线观看视频-亚洲免费在线看-亚洲免费在线视频

理解Storm可靠性消息

系統(tǒng) 2390 0

看過一些別人寫的, 感覺有些東西沒太說清楚,個(gè)人主要以源代碼跟蹤,參考個(gè)人理解講述,有錯(cuò)誤請指正。

1基本名詞

1.1 Tuple: 消息傳遞的基本單位。很多文章中介紹都是這么說的, 個(gè)人覺得應(yīng)該更詳細(xì)一點(diǎn)。

?在spout發(fā)送的時(shí)候,函數(shù)原型

?public List<Integer> emit(List<Object> tuple, Object messageId) {
??????? return emit(Utils.DEFAULT_STREAM_ID, tuple, messageId);
??? }

這里的tuple, 實(shí)際上是List<Object> 對象,返回的是 List<Integer> 是要發(fā)送的tast的IdsList

在bolt接收的時(shí)候, 函數(shù)原型

public void execute(Tuple tuple)

變成了一個(gè)Tuple對象,? 結(jié)構(gòu)應(yīng)該也是一個(gè)list, List<Field1, value1, Field2, value2..>這樣的一個(gè)結(jié)構(gòu), FieldList ValueList, 我們根據(jù)對應(yīng)的fieldname就可以取出對應(yīng)的getIntegerByField方法

回到spout對象中來, 在spout有一個(gè)定義的輸出字段

??? public void declareOutputFields(OutputFieldsDeclarer declarer) {
?? ??? ?declarer.declare(new Fields("word"));
?? ?}

這里定義的一個(gè)字段,所以我們在emit的時(shí)候就只能發(fā)送一個(gè)包含一個(gè)value的tuple(spout部分), storm會(huì)將field, 和 發(fā)送的value下標(biāo)對應(yīng), 變成一個(gè)Tuple對象,? 也就是上面說的

List<Field1, value1, Field2, value2..>這樣的一個(gè)結(jié)構(gòu),? 在bolt 之間傳遞tuple, 發(fā)送又是List<Object> tuple, 根據(jù)組裝bolt定義的fiels, 再組合成Tuple對象給下一個(gè)Bolt處理

在發(fā)射的最后 還有一個(gè) void emitDirect(int taskId, String streamId, List<Object> tuple, Object messageId);? 因?yàn)樯厦鎒mit的時(shí)候已經(jīng)返回List<taskid>, 所以它就知道要發(fā)送給哪些taskid處理,然后將taskid 和 tuple放入隊(duì)列 LinkedBlockingQueue , 代碼如下

; worker.clj

( defn mk-transfer- fn [ transfer-queue ]

( fn [ task ^Tuple tuple ]

(.put ^LinkedBlockingQueue

transfer-queue [ task tuple ] )

))
然后單獨(dú)會(huì)開啟一個(gè)叫async-loop的線程,取出每條記錄(taskid, tuple), 然后worker會(huì)從當(dāng)前task建立一個(gè)到目標(biāo)task的zeromq連接, 通過zeromq將tuple發(fā)送給目標(biāo)task
?

總結(jié): 每次emit都是根據(jù)List<Object>和定義的輸出Fields組合成一個(gè)Tuple對象,,每個(gè)接受對象接收的是Tuple對象,如果處理完再發(fā)送又再組合字段, 在emit的時(shí)候返回LIst<taskids>,所以就知道發(fā)送給哪些Task, 然后拿這些taskid和tuple再組合成一個(gè)任務(wù)隊(duì)列,通過zeromq發(fā)送到目標(biāo)task,目標(biāo)task接收到tuple進(jìn)程處理至于并發(fā)度控制, 參考

      http://www.cnblogs.com/chengxin1982/p/4001275.html


    

?

TupleID Tuple對應(yīng)的ID,? 在創(chuàng)建的時(shí)候賦予一個(gè)64位的id,主要用來跟蹤消息

MsgID? 官方解釋 Emits a new tuple to the default output stream with the given message ID. 如果不指定,acker不會(huì)跟蹤。主要作用 , 在spout收到fail時(shí)候, 能夠定位到是哪條消息出錯(cuò),能夠決定重發(fā). 使用實(shí)例? _collector.emit(new Values(sentence),? new Integer(num));

acker 消息跟蹤者. acker 存儲(chǔ)一個(gè)Map<taskid, ack val> ,? taskid為祖宗tuple創(chuàng)建者的taskid ,? ack_val 為消息傳遞過程中的 tupleid的xor值,如果為0則知道是哪個(gè)spout或者bolt已經(jīng)處理完了, 為什么會(huì)有bolt, 因?yàn)閎olt在發(fā)射的時(shí)候,如果非錨定,就是不帶tuple發(fā)射,它會(huì)被認(rèn)為是祖宗tuple, 上一個(gè)tuple會(huì)認(rèn)為已經(jīng)結(jié)束.
至于分配發(fā)射源分配到acker, storm采用一致性hash 祖宗tupleid來分配,因?yàn)樵谒械膖uple中都能知道祖宗tupleid,所以在子孫tuple處理時(shí), 知道該發(fā)送給哪個(gè)acker跟蹤

?

理解Storm可靠性消息


更多文章、技術(shù)交流、商務(wù)合作、聯(lián)系博主

微信掃碼或搜索:z360901061

微信掃一掃加我為好友

QQ號(hào)聯(lián)系: 360901061

您的支持是博主寫作最大的動(dòng)力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描下面二維碼支持博主2元、5元、10元、20元等您想捐的金額吧,狠狠點(diǎn)擊下面給點(diǎn)支持吧,站長非常感激您!手機(jī)微信長按不能支付解決辦法:請將微信支付二維碼保存到相冊,切換到微信,然后點(diǎn)擊微信右上角掃一掃功能,選擇支付二維碼完成支付。

【本文對您有幫助就好】

您的支持是博主寫作最大的動(dòng)力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描上面二維碼支持博主2元、5元、10元、自定義金額等您想捐的金額吧,站長會(huì)非常 感謝您的哦!!!

發(fā)表我的評論
最新評論 總共0條評論
主站蜘蛛池模板: 久久天天躁狠狠躁夜夜不卡 | 一级特黄特交牲大片 | 国产看片网站 | 欧美综合区| 国产精品资源站 | 久久精品免费全国观看国产 | 99福利视频| 青青热久久国产久精品 | 热久久国产精品 | 国产福利在线免费观看 | 2021精品综合久久久久 | 亚洲精品乱码一区二区在线观看 | 亚洲精品乱码久久久久久中文字幕 | 性xxx欧美 | 免费播放欧美毛片欧美a | 欧美日韩国产一区二区三区播放 | 欧美高清一区二区三区欧美 | 国内自拍 在线播放 网红 | 国产福利在线观看 | 四虎在线最新永久免费 | 国产精品视频第一区二区 | 国产在线成人a | 亚洲图片一区 | 成人日韩在线 | 久久久精品久久久久特色影视 | 狠狠色噜噜狠狠狠狠97 | 日本特级视频 | 中文字幕视频在线观看 | 玖玖精品视频在线 | 毛片大全 | 亚洲人体视频 | 天天操天天操天天操天天操 | 日本高清在线精品一区二区三区 | 四虎精品国产一区二区三区 | 免费在线观看黄色毛片 | 国产免费久久精品丫丫 | 香蕉尹人综合精品 | 国产美女做爰免费视 | 日本特黄在线观看免费 | 加勒比一本大道在线 | 国产精品1区2区3区在线播放 |