讀流程
客戶端接入后,下面一步操作就是讀取客戶端傳輸過來(lái)的數(shù)據(jù),這一節(jié)我們就來(lái)分析下服務(wù)端讀取客戶端數(shù)據(jù)流程。從前面分析來(lái)看,channel
的事件輪詢、事件處理是在NioEventLoop
的run
方法中,從這里我們就很容易找我服務(wù)端讀流程的入口方法:processSelectedKeys()
。
(資料圖片)
從processSelectedKeys()
一直追蹤下去,可以看到OP_READ
處理邏輯分支:
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) { unsafe.read();}
可能你會(huì)比較奇怪:為什么OP_READ
和OP_ACCEPT
都會(huì)走這個(gè)分支?
OP_ACCEPT
是NioServerSocketChannel
處理的事件,而OP_READ
是NioSocketChannel
處理的事件,所以,雖然它們都走這個(gè)分支,但是channel類型確是不一樣的,即這里的unsafe
類型也不一樣,一個(gè)是:NioMessageUnsafe
,另一個(gè)是:NioSocketChannelUnsafe
。NioServerSocketChannel
負(fù)責(zé)監(jiān)聽客戶端連接,當(dāng)有客戶端連接進(jìn)入時(shí),對(duì)它來(lái)說就是有個(gè)讀入消息需要被處理。這里我們是處理client channle
的OP_READ
,所以,unsafe
是NioSocketChannelUnsafe
類型實(shí)例。
AbstractNioByteChannel.NioByteUnsafe#read
方法代碼如下:
public final void read() { final ChannelConfig config = config(); if (shouldBreakReadReady(config)) { clearReadPending(); return; } final ChannelPipeline pipeline = pipeline(); final ByteBufAllocator allocator = config.getAllocator(); final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle(); allocHandle.reset(config); ByteBuf byteBuf = null; boolean close = false; try { do { // 申請(qǐng)ByteBuf對(duì)象 byteBuf = allocHandle.allocate(allocator); //doReadBytes(byteBuf):將數(shù)據(jù)讀取到ByteBuf中 //lastBytesRead()將讀取的字節(jié)數(shù)設(shè)置到lastBytesRead allocHandle.lastBytesRead(doReadBytes(byteBuf)); if (allocHandle.lastBytesRead() <= 0) { byteBuf.release(); byteBuf = null; close = allocHandle.lastBytesRead() < 0; if (close) { readPending = false; } break; } allocHandle.incMessagesRead(1); readPending = false; //觸發(fā)pipeline channelRead事件,將讀入數(shù)據(jù)ByteBuf傳入到handler中 pipeline.fireChannelRead(byteBuf); byteBuf = null; } while (allocHandle.continueReading());//判斷是否繼續(xù)讀取 allocHandle.readComplete(); //觸發(fā)pipeline channelReadComplete pipeline.fireChannelReadComplete(); if (close) { closeOnRead(pipeline); } } catch (Throwable t) { handleReadException(pipeline, byteBuf, t, close, allocHandle); } finally { if (!readPending && !config.isAutoRead()) { removeReadOp(); } }}
這個(gè)方法刨除其它邏輯,關(guān)于客戶端數(shù)據(jù)處理邏輯主要包括3個(gè)步驟:
allocHandle.lastBytesRead(doReadBytes(byteBuf))
:調(diào)用java api
,從channel
中讀取字節(jié)數(shù)據(jù)到ByteBuf
緩存中;pipeline.fireChannelRead(byteBuf)
:觸發(fā)pipeline
的channelRead
事件,并將帶有讀入數(shù)據(jù)的ByteBuf
通過參數(shù)傳入;pipeline.fireChannelReadComplete()
:觸發(fā)pipeline
的channelReadComplete
事件;事件傳播
調(diào)用pipeline
的fireChannelRead()
就可觸發(fā)channelRead
事件在handler
之間傳播,事件傳播這塊代碼比較繞,給人感覺不停的來(lái)回調(diào)用容易繞暈,下面通過圖可以更加直觀的看出調(diào)用流程,再配合代碼就很好理解了。
關(guān)鍵點(diǎn)就在于HandlerContext
中提供了一個(gè)靜態(tài)方法:invokeChannelRead(final AbstractChannelHandlerContext next, Object msg)
,第一個(gè)是在哪個(gè)handler
上觸發(fā)事件,第二個(gè)參數(shù)就是數(shù)據(jù)本身,通過這個(gè)方法就可以指定在哪個(gè)handler
上觸發(fā)channelRead
事件。由于pipeline
中的handler
是被包裝成HandlerContext
放入的,所以,可以通過handler()
方法找到真正的handler
對(duì)象進(jìn)行觸發(fā)。
比如pipeline
的fireChannelRead()
就是觸發(fā)head
的channelRead
事件,如果處理完成需要把事件繼續(xù)傳播給下一個(gè)handler
,就需要調(diào)用ctx.fireChannelRead(msg)
方法即可,該方法中通過next
屬性獲取到下一個(gè)節(jié)點(diǎn),然后執(zhí)行static invokeChannelRead(next, msg)
這個(gè)方法就可以將事件傳播到下一個(gè)節(jié)點(diǎn)上。
pipeline.fireChannelRead(byteBuf)
運(yùn)行完成后會(huì)調(diào)用pipeline.fireChannelReadComplete()
方法,觸發(fā)channelReadComplete
事件,執(zhí)行機(jī)制和channelRead
事件一樣,就不再贅述。
搞清楚上面原理,就很容易理解
ctx.fireChannelRead()
和ctx.pipeline().fireChannelRead()
之間的區(qū)別了,避免誤用。
Pipeline線程模型
上面分析的都是常規(guī)模式,沒有給handler
指定額外線程情況下channelRead
和channelReadComplete
傳播機(jī)制,大致如下圖:
先觸發(fā)channelRead
事件,按照pipeline
中順序依次觸發(fā),當(dāng)所有handler
都觸發(fā)完后,再觸發(fā)channelReadComplete
事件,按照pipeline
中的順序依次觸發(fā)。這些所有流程采用的都是同步方式,在同一個(gè)線程中執(zhí)行,這個(gè)線程就是channel
注冊(cè)的NioEventLoop
。
我們來(lái)看下static void invokeChannelRead()
這個(gè)方法:
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) { final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { next.invokeChannelRead(m); } else { executor.execute(new Runnable() { @Override public void run() { next.invokeChannelRead(m); } }); }}
在執(zhí)行next.invokeChannelRead(m)
方法前有個(gè)executor.inEventLoop()
判斷,判斷當(dāng)前執(zhí)行線程是不是就是handler
執(zhí)行所需的線程。執(zhí)行handler
方法是不能隨便線程都可以去執(zhí)行的,必須使用handler
內(nèi)部指定的executor
線程執(zhí)行器中執(zhí)行才行。如下圖,也就是說紅色框框中的內(nèi)容必須在executor
線程執(zhí)行器中執(zhí)行,如果當(dāng)前線程和handler
執(zhí)行線程不是同一個(gè),就需要進(jìn)行線程切換:則調(diào)用封裝成一個(gè)任務(wù),提交到executor
的任務(wù)隊(duì)列中讓其執(zhí)行。
executor
線程執(zhí)行器是通過next.executor()
方法獲取到的,從這個(gè)方法源碼中可以看到獲取邏輯:如果HandlerContext
中executor
有值則直接返回;否則返回channel
注冊(cè)的NioEventLoop
作為線程執(zhí)行器。
在添加handler
時(shí)可以指定一個(gè)EventGroup
:pipeline.addLast( bizGroup, "handler2", new OtherTest02());
,這樣,再把handler
包裝成HandlerContext
過程中會(huì)從這個(gè)EventGroup
根據(jù)chooser
選取策略獲得一個(gè)EventLoop
賦值給executor
。
所以,從上面分析,默認(rèn)情況下handler
都是在channel
注冊(cè)的NioEventLoop
線程中執(zhí)行的,除非在addLast
添加handloer
時(shí)特別指定。
下面我們通過一個(gè)案例分析下pipeline
線程模型,如下,給handler02
添加一個(gè)額外的線程池:
EventLoopGroup bizGroup = new NioEventLoopGroup(10, new ThreadFactoryBuilder().setNameFormat("biz-%s").build());protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast( "handler01", new OtherTest01()); pipeline.addLast( bizGroup, "handler02", new OtherTest02()); pipeline.addLast( "handler03", new OtherTest03());}
這時(shí),channelRead
和channelReadComplete
事件觸發(fā)流程見下圖:
channelRead
事件執(zhí)行流程說明:
handler01
的channelRead
事件,本身當(dāng)前線程和handler01
是同一個(gè)線程,所以,直接調(diào)用handler#channelRead()
方法;handler01#channelRead()
方法執(zhí)行完成后,事件繼續(xù)向下傳播,需要調(diào)用handler02#channelRead()
方法,但是handler02
執(zhí)行線程并不是默認(rèn)的channel
的注冊(cè)線程,而是額外設(shè)置的biz
線程,需要將調(diào)用包裝成一個(gè)任務(wù)提交到biz
線程的任務(wù)隊(duì)列taskQueue
中,然后直接返回;biz線程執(zhí)行器內(nèi)部線程會(huì)一直循環(huán)從taskQueue
中獲取任務(wù)執(zhí)行,這樣就完成了線程切換效果;當(dāng)handler02#channelRead()
方法執(zhí)行完成后,需要執(zhí)行handler03#channelRead()
,它們又不在同一個(gè)線程中執(zhí)行,這時(shí)有需要切換線程,所以會(huì)把handler03#channelRead()
的調(diào)用封裝成一個(gè)任務(wù)提交到register eventLoop的taskQueue
中,待其內(nèi)部線程提取執(zhí)行;下面再來(lái)看下channelReadComplete
事件執(zhí)行流程:
a1
將任務(wù)提交給taskQueue
任務(wù)隊(duì)列后直接返回了,而不是等其執(zhí)行完成再返回;a1
返回后,從源碼分析來(lái)看,會(huì)立即觸發(fā)channelReadComplete
事件,涉及到線程切換,同理b1
這里也是將handler02#channelReadComplete()
調(diào)用封裝成任務(wù)放入到biz eventLoop
的taskQueue
中的,然后也直接返回了;這樣,biz eventLoop
線程執(zhí)行器taskQueue
中就有兩個(gè)任務(wù),會(huì)按照順序依次執(zhí)行:先執(zhí)行channelRead()
調(diào)用,再執(zhí)行channelReadComplete()
調(diào)用;執(zhí)行a3、b3
時(shí)同理;總結(jié)
從上面可以看出,Pipeline
中handler
可以在不同線程間切換得到關(guān)鍵是:taskQueue
。還要一點(diǎn)非常重要:handler
線程池執(zhí)行器默認(rèn)使用的channel
注冊(cè)的NioEventLoop
這個(gè),NioEventLoop
采用的是單線程工作模式,同時(shí)還需要處理selector.select()
事件輪詢,所以,handler
里肯定不能有耗時(shí)、特別是IO
阻塞等操作,不然卡在handler
中,selector#select()
執(zhí)行不到,無(wú)法及時(shí)接收到客戶端傳送過來(lái)的數(shù)據(jù)。
關(guān)鍵詞:
責(zé)任編輯:Rex_29