讀流程
客戶端接入后,下面一步操作就是讀取客戶端傳輸過來的數據,這一節我們就來分析下服務端讀取客戶端數據流程。從前面分析來看,channel
的事件輪詢、事件處理是在NioEventLoop
的run
方法中,從這里我們就很容易找我服務端讀流程的入口方法:processSelectedKeys()
。
(資料圖片)
從processSelectedKeys()
一直追蹤下去,可以看到OP_READ
處理邏輯分支:
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) { unsafe.read();}
可能你會比較奇怪:為什么OP_READ
和OP_ACCEPT
都會走這個分支?
OP_ACCEPT
是NioServerSocketChannel
處理的事件,而OP_READ
是NioSocketChannel
處理的事件,所以,雖然它們都走這個分支,但是channel類型確是不一樣的,即這里的unsafe
類型也不一樣,一個是:NioMessageUnsafe
,另一個是:NioSocketChannelUnsafe
。NioServerSocketChannel
負責監聽客戶端連接,當有客戶端連接進入時,對它來說就是有個讀入消息需要被處理。這里我們是處理client channle
的OP_READ
,所以,unsafe
是NioSocketChannelUnsafe
類型實例。
AbstractNioByteChannel.NioByteUnsafe#read
方法代碼如下:
public final void read() { final ChannelConfig config = config(); if (shouldBreakReadReady(config)) { clearReadPending(); return; } final ChannelPipeline pipeline = pipeline(); final ByteBufAllocator allocator = config.getAllocator(); final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle(); allocHandle.reset(config); ByteBuf byteBuf = null; boolean close = false; try { do { // 申請ByteBuf對象 byteBuf = allocHandle.allocate(allocator); //doReadBytes(byteBuf):將數據讀取到ByteBuf中 //lastBytesRead()將讀取的字節數設置到lastBytesRead allocHandle.lastBytesRead(doReadBytes(byteBuf)); if (allocHandle.lastBytesRead() <= 0) { byteBuf.release(); byteBuf = null; close = allocHandle.lastBytesRead() < 0; if (close) { readPending = false; } break; } allocHandle.incMessagesRead(1); readPending = false; //觸發pipeline channelRead事件,將讀入數據ByteBuf傳入到handler中 pipeline.fireChannelRead(byteBuf); byteBuf = null; } while (allocHandle.continueReading());//判斷是否繼續讀取 allocHandle.readComplete(); //觸發pipeline channelReadComplete pipeline.fireChannelReadComplete(); if (close) { closeOnRead(pipeline); } } catch (Throwable t) { handleReadException(pipeline, byteBuf, t, close, allocHandle); } finally { if (!readPending && !config.isAutoRead()) { removeReadOp(); } }}
這個方法刨除其它邏輯,關于客戶端數據處理邏輯主要包括3個步驟:
allocHandle.lastBytesRead(doReadBytes(byteBuf))
:調用java api
,從channel
中讀取字節數據到ByteBuf
緩存中;pipeline.fireChannelRead(byteBuf)
:觸發pipeline
的channelRead
事件,并將帶有讀入數據的ByteBuf
通過參數傳入;pipeline.fireChannelReadComplete()
:觸發pipeline
的channelReadComplete
事件;事件傳播
調用pipeline
的fireChannelRead()
就可觸發channelRead
事件在handler
之間傳播,事件傳播這塊代碼比較繞,給人感覺不停的來回調用容易繞暈,下面通過圖可以更加直觀的看出調用流程,再配合代碼就很好理解了。
關鍵點就在于HandlerContext
中提供了一個靜態方法:invokeChannelRead(final AbstractChannelHandlerContext next, Object msg)
,第一個是在哪個handler
上觸發事件,第二個參數就是數據本身,通過這個方法就可以指定在哪個handler
上觸發channelRead
事件。由于pipeline
中的handler
是被包裝成HandlerContext
放入的,所以,可以通過handler()
方法找到真正的handler
對象進行觸發。
比如pipeline
的fireChannelRead()
就是觸發head
的channelRead
事件,如果處理完成需要把事件繼續傳播給下一個handler
,就需要調用ctx.fireChannelRead(msg)
方法即可,該方法中通過next
屬性獲取到下一個節點,然后執行static invokeChannelRead(next, msg)
這個方法就可以將事件傳播到下一個節點上。
pipeline.fireChannelRead(byteBuf)
運行完成后會調用pipeline.fireChannelReadComplete()
方法,觸發channelReadComplete
事件,執行機制和channelRead
事件一樣,就不再贅述。
搞清楚上面原理,就很容易理解
ctx.fireChannelRead()
和ctx.pipeline().fireChannelRead()
之間的區別了,避免誤用。
Pipeline線程模型
上面分析的都是常規模式,沒有給handler
指定額外線程情況下channelRead
和channelReadComplete
傳播機制,大致如下圖:
先觸發channelRead
事件,按照pipeline
中順序依次觸發,當所有handler
都觸發完后,再觸發channelReadComplete
事件,按照pipeline
中的順序依次觸發。這些所有流程采用的都是同步方式,在同一個線程中執行,這個線程就是channel
注冊的NioEventLoop
。
我們來看下static void invokeChannelRead()
這個方法:
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) { final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { next.invokeChannelRead(m); } else { executor.execute(new Runnable() { @Override public void run() { next.invokeChannelRead(m); } }); }}
在執行next.invokeChannelRead(m)
方法前有個executor.inEventLoop()
判斷,判斷當前執行線程是不是就是handler
執行所需的線程。執行handler
方法是不能隨便線程都可以去執行的,必須使用handler
內部指定的executor
線程執行器中執行才行。如下圖,也就是說紅色框框中的內容必須在executor
線程執行器中執行,如果當前線程和handler
執行線程不是同一個,就需要進行線程切換:則調用封裝成一個任務,提交到executor
的任務隊列中讓其執行。
executor
線程執行器是通過next.executor()
方法獲取到的,從這個方法源碼中可以看到獲取邏輯:如果HandlerContext
中executor
有值則直接返回;否則返回channel
注冊的NioEventLoop
作為線程執行器。
在添加handler
時可以指定一個EventGroup
:pipeline.addLast( bizGroup, "handler2", new OtherTest02());
,這樣,再把handler
包裝成HandlerContext
過程中會從這個EventGroup
根據chooser
選取策略獲得一個EventLoop
賦值給executor
。
所以,從上面分析,默認情況下handler
都是在channel
注冊的NioEventLoop
線程中執行的,除非在addLast
添加handloer
時特別指定。
下面我們通過一個案例分析下pipeline
線程模型,如下,給handler02
添加一個額外的線程池:
EventLoopGroup bizGroup = new NioEventLoopGroup(10, new ThreadFactoryBuilder().setNameFormat("biz-%s").build());protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast( "handler01", new OtherTest01()); pipeline.addLast( bizGroup, "handler02", new OtherTest02()); pipeline.addLast( "handler03", new OtherTest03());}
這時,channelRead
和channelReadComplete
事件觸發流程見下圖:
channelRead
事件執行流程說明:
handler01
的channelRead
事件,本身當前線程和handler01
是同一個線程,所以,直接調用handler#channelRead()
方法;handler01#channelRead()
方法執行完成后,事件繼續向下傳播,需要調用handler02#channelRead()
方法,但是handler02
執行線程并不是默認的channel
的注冊線程,而是額外設置的biz
線程,需要將調用包裝成一個任務提交到biz
線程的任務隊列taskQueue
中,然后直接返回;biz線程執行器內部線程會一直循環從taskQueue
中獲取任務執行,這樣就完成了線程切換效果;當handler02#channelRead()
方法執行完成后,需要執行handler03#channelRead()
,它們又不在同一個線程中執行,這時有需要切換線程,所以會把handler03#channelRead()
的調用封裝成一個任務提交到register eventLoop的taskQueue
中,待其內部線程提取執行;下面再來看下channelReadComplete
事件執行流程:
a1
將任務提交給taskQueue
任務隊列后直接返回了,而不是等其執行完成再返回;a1
返回后,從源碼分析來看,會立即觸發channelReadComplete
事件,涉及到線程切換,同理b1
這里也是將handler02#channelReadComplete()
調用封裝成任務放入到biz eventLoop
的taskQueue
中的,然后也直接返回了;這樣,biz eventLoop
線程執行器taskQueue
中就有兩個任務,會按照順序依次執行:先執行channelRead()
調用,再執行channelReadComplete()
調用;執行a3、b3
時同理;總結
從上面可以看出,Pipeline
中handler
可以在不同線程間切換得到關鍵是:taskQueue
。還要一點非常重要:handler
線程池執行器默認使用的channel
注冊的NioEventLoop
這個,NioEventLoop
采用的是單線程工作模式,同時還需要處理selector.select()
事件輪詢,所以,handler
里肯定不能有耗時、特別是IO
阻塞等操作,不然卡在handler
中,selector#select()
執行不到,無法及時接收到客戶端傳送過來的數據。
關鍵詞:
責任編輯:Rex_29