我們接下來就看和業(yè)務(wù)息息相關(guān)的解碼器,首先我們來看FrameDecoder,這個東西應(yīng)該是所有的解碼器都會實現(xiàn)這個,所以我們來重點看一下。
? ? ? ??FrameDecoder產(chǎn)生的根源就是TCP/IP數(shù)據(jù)包的傳輸方式?jīng)Q定的,包在傳輸?shù)倪^程中會分片和重組,
正如javadoc里面所說的:
? ? 客戶端在發(fā)送的時候的序列如下:
+-----+-----+-----+
| ABC | DEF | GHI |
+-----+-----+-----+
服務(wù)器端在接受到后可能會變成下面的序列:
+----+-------+---+---+
| AB | CDEFG | H | I |
+----+-------+---+---+
FrameDecoder幫助我們將接受到的數(shù)據(jù)包整理成有意義的數(shù)據(jù)幀,例如,可以幫助我們將數(shù)據(jù)包整理成
下面的數(shù)據(jù)格式:
+-----+-----+-----+
| ABC | DEF | GHI |
+-----+-----+-----+
我們接下來就看看FrameDecoder的實現(xiàn)吧:
FrameDecoder是繼承SimpleChannelUpstreamHandler的,我們首先來看一下messageReceived的實現(xiàn):
? ? ? ??
- @Override??
- ??? public? void?messageReceived(??
- ???????????ChannelHandlerContext?ctx,?MessageEvent?e)? throws?Exception?{??
- ??
- ???????Object?m?=?e.getMessage();??
- ??????? if?(!(m? instanceof?ChannelBuffer))?{??
- ???????????ctx.sendUpstream(e);??
- ??????????? return;??
- ???????}??
- ??
- ???????ChannelBuffer?input?=?(ChannelBuffer)?m;??
- ??????? if?(!input.readable())?{??
- ??????????? return;??
- ???????}??
- ??
- ???????ChannelBuffer?cumulation?=?cumulation(ctx);??
- ??????? if?(cumulation.readable())?{??
- ???????????cumulation.discardReadBytes();??
- ???????????cumulation.writeBytes(input);??
- ???????????callDecode(ctx,?e.getChannel(),?cumulation,?e.getRemoteAddress());??
- ???????}? else?{??
- ???????????callDecode(ctx,?e.getChannel(),?input,?e.getRemoteAddress());??
- ??????????? if?(input.readable())?{??
- ???????????????cumulation.writeBytes(input);??
- ???????????}??
- ???????}??
- ???}??
? ? ? ??這個里面首先會檢查input的可讀性,這個比較好理解,關(guān)鍵是cumulation,
我們首先來看一下cumulation的實現(xiàn)吧:
- private?ChannelBuffer?cumulation(ChannelHandlerContext?ctx)?{??
- ????????ChannelBuffer?c?=?cumulation;??
- ???????? if?(c?==? null)?{??
- ????????????c?=?ChannelBuffers.dynamicBuffer(??
- ????????????????????ctx.getChannel().getConfig().getBufferFactory());??
- ????????????cumulation?=?c;??
- ????????}??
- ???????? return?c;??
- ????}??
? ? ? ??這個函數(shù)很簡單,就是如果cumulation為空的時候初始化一下,如果不為空,就返回。我們得思考一下什么時候cumulation為空,什么時候不為空。我們再回過頭來看一下上面的實現(xiàn)吧。如果cumulation可讀,cumulation.discardReadBytes函數(shù)的作用是將0到readIndex之間的空間釋放掉,將readIndex和writeIndex都重新標(biāo)記一下。然后將讀到的數(shù)據(jù)寫到buffer里面。如果cumulation不可讀,在調(diào)callDecode,如果發(fā)現(xiàn)從不可讀狀態(tài)到可讀狀態(tài),則將讀到的數(shù)據(jù)寫到緩存區(qū)里面。
? ? ? ??我們再來看callDecode的實現(xiàn):
- private? void?callDecode(??
- ????????????ChannelHandlerContext?context,?Channel?channel,??
- ????????????ChannelBuffer?cumulation,?SocketAddress?remoteAddress)? throws?Exception?{??
- ??
- ???????? while?(cumulation.readable())?{??
- ???????????? int?oldReaderIndex?=?cumulation.readerIndex();??
- ????????????Object?frame?=?decode(context,?channel,?cumulation);??
- ???????????? if?(frame?==? null)?{??
- ???????????????? if?(oldReaderIndex?==?cumulation.readerIndex())?{??
- ???????????????????? //?Seems?like?more?data?is?required.??
- ???????????????????? //?Let?us?wait?for?the?next?notification.??
- ???????????????????? break;??
- ????????????????}? else?{??
- ???????????????????? //?Previous?data?has?been?discarded.??
- ???????????????????? //?Probably?it?is?reading?on.??
- ???????????????????? continue;??
- ????????????????}??
- ????????????}? else? if?(oldReaderIndex?==?cumulation.readerIndex())?{??
- ???????????????? throw? new?IllegalStateException(??
- ???????????????????????? "decode()?method?must?read?at?least?one?byte?"?+??
- ???????????????????????? "if?it?returned?a?frame?(caused?by:?"?+?getClass()?+? ")");??
- ????????????}??
- ??
- ????????????unfoldAndFireMessageReceived(context,?remoteAddress,?frame);??
- ????????}??
- ??
- ???????? if?(!cumulation.readable())?{??
- ?????????? this.cumulation?=? null;??
- ????????}??
- ????}??
- ??????
?
?
? ? ? ?這個里面上面是一個循環(huán),首先將讀指針備份一下,decode方法是交個子類實現(xiàn)的一個抽象方這個用來實現(xiàn)具體數(shù)據(jù)分幀的算法,從這個里面看到如果子類沒有讀到一幀數(shù)據(jù),則返回null所以下面有一個判斷,是一點數(shù)據(jù)沒有讀呢,還是讀了一點,如果一點都沒有讀,就不需要再檢測了等下一次messageRecieved進(jìn)行通知,如果發(fā)現(xiàn)讀了一點數(shù)據(jù),就調(diào)用下一次分幀。如果讀了一幀數(shù)據(jù)就發(fā)送一個通知,unfold是針對讀到的循環(huán)數(shù)據(jù)要不要打開的意思。到最后如果發(fā)現(xiàn)不是可讀狀態(tài),
cumulation將會被設(shè)置成null。
?
最后來看一下cleanup的實現(xiàn)
- private? void?cleanup(ChannelHandlerContext?ctx,?ChannelStateEvent?e)??
- ???????????? throws?Exception?{??
- ???????? try?{??
- ????????????ChannelBuffer?cumulation?=? this.cumulation;??
- ???????????? if?(cumulation?==? null)?{??
- ???????????????? return;??
- ????????????}? else?{??
- ???????????????? this.cumulation?=? null;??
- ????????????}??
- ??
- ???????????? if?(cumulation.readable())?{??
- ???????????????? //?Make?sure?all?frames?are?read?before?notifying?a?closed?channel.??
- ????????????????callDecode(ctx,?ctx.getChannel(),?cumulation,? null);??
- ????????????}??
- ??
- ???????????? //?Call?decodeLast()?finally.??Please?note?that?decodeLast()?is??
- ???????????? //?called?even?if?there's?nothing?more?to?read?from?the?buffer?to??
- ???????????? //?notify?a?user?that?the?connection?was?closed?explicitly.??
- ????????????Object?partialFrame?=?decodeLast(ctx,?ctx.getChannel(),?cumulation);??
- ???????????? if?(partialFrame?!=? null)?{??
- ????????????????unfoldAndFireMessageReceived(ctx,? null,?partialFrame);??
- ????????????}??
- ????????}? finally?{??
- ????????????ctx.sendUpstream(e);??
- ????????}??
- ????}??
? ??在這個里面一般來說是在socket斷開的時候調(diào)用,這個時候如果發(fā)現(xiàn)buffer還是可讀狀態(tài),還會努力的確保所有的數(shù)據(jù)已經(jīng)被分幀,然后調(diào)用decodeLast
?
===========================================================================================
轉(zhuǎn)自http://blog.csdn.net/xiaolang85/article/details/12621663
更多文章、技術(shù)交流、商務(wù)合作、聯(lián)系博主
微信掃碼或搜索:z360901061

微信掃一掃加我為好友
QQ號聯(lián)系: 360901061
您的支持是博主寫作最大的動力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描下面二維碼支持博主2元、5元、10元、20元等您想捐的金額吧,狠狠點擊下面給點支持吧,站長非常感激您!手機微信長按不能支付解決辦法:請將微信支付二維碼保存到相冊,切換到微信,然后點擊微信右上角掃一掃功能,選擇支付二維碼完成支付。
【本文對您有幫助就好】元
