• <acronym id="qmqcg"><cite id="qmqcg"></cite></acronym>
    <td id="qmqcg"><em id="qmqcg"></em></td>
    • 首頁(yè) > 生活 >

      【Netty源碼分析】04 服務(wù)端讀流程

      讀流程

      客戶端接入后,下面一步操作就是讀取客戶端傳輸過來(lái)的數(shù)據(jù),這一節(jié)我們就來(lái)分析下服務(wù)端讀取客戶端數(shù)據(jù)流程。從前面分析來(lái)看,channel的事件輪詢、事件處理是在NioEventLooprun方法中,從這里我們就很容易找我服務(wù)端讀流程的入口方法:processSelectedKeys()


      (資料圖片)

      processSelectedKeys()一直追蹤下去,可以看到OP_READ處理邏輯分支:

      if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) { unsafe.read();}

      可能你會(huì)比較奇怪:為什么OP_READOP_ACCEPT都會(huì)走這個(gè)分支?

      OP_ACCEPTNioServerSocketChannel處理的事件,而OP_READNioSocketChannel處理的事件,所以,雖然它們都走這個(gè)分支,但是channel類型確是不一樣的,即這里的unsafe類型也不一樣,一個(gè)是:NioMessageUnsafe,另一個(gè)是:NioSocketChannelUnsafeNioServerSocketChannel負(fù)責(zé)監(jiān)聽客戶端連接,當(dāng)有客戶端連接進(jìn)入時(shí),對(duì)它來(lái)說就是有個(gè)讀入消息需要被處理。

      這里我們是處理client channleOP_READ,所以,unsafeNioSocketChannelUnsafe類型實(shí)例。

      AbstractNioByteChannel.NioByteUnsafe#read方法代碼如下:

      public final void read() { final ChannelConfig config = config();    if (shouldBreakReadReady(config)) {     clearReadPending();         return;    }    final ChannelPipeline pipeline = pipeline();    final ByteBufAllocator allocator = config.getAllocator();    final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();    allocHandle.reset(config);    ByteBuf byteBuf = null;    boolean close = false;    try {        do {            // 申請(qǐng)ByteBuf對(duì)象            byteBuf = allocHandle.allocate(allocator);            //doReadBytes(byteBuf):將數(shù)據(jù)讀取到ByteBuf中            //lastBytesRead()將讀取的字節(jié)數(shù)設(shè)置到lastBytesRead            allocHandle.lastBytesRead(doReadBytes(byteBuf));            if (allocHandle.lastBytesRead() <= 0) {                byteBuf.release();                byteBuf = null;                close = allocHandle.lastBytesRead() < 0;                if (close) {                    readPending = false;                }                break;            }            allocHandle.incMessagesRead(1);            readPending = false;            //觸發(fā)pipeline channelRead事件,將讀入數(shù)據(jù)ByteBuf傳入到handler中            pipeline.fireChannelRead(byteBuf);            byteBuf = null;        } while (allocHandle.continueReading());//判斷是否繼續(xù)讀取          allocHandle.readComplete();        //觸發(fā)pipeline channelReadComplete        pipeline.fireChannelReadComplete();        if (close) {            closeOnRead(pipeline);        }    } catch (Throwable t) {        handleReadException(pipeline, byteBuf, t, close, allocHandle);    } finally {        if (!readPending && !config.isAutoRead()) {            removeReadOp();        }    }}

      這個(gè)方法刨除其它邏輯,關(guān)于客戶端數(shù)據(jù)處理邏輯主要包括3個(gè)步驟:

      allocHandle.lastBytesRead(doReadBytes(byteBuf)):調(diào)用java api,從channel中讀取字節(jié)數(shù)據(jù)到ByteBuf緩存中;pipeline.fireChannelRead(byteBuf):觸發(fā)pipelinechannelRead事件,并將帶有讀入數(shù)據(jù)的ByteBuf通過參數(shù)傳入;pipeline.fireChannelReadComplete():觸發(fā)pipelinechannelReadComplete事件;

      事件傳播

      調(diào)用pipelinefireChannelRead()就可觸發(fā)channelRead事件在handler之間傳播,事件傳播這塊代碼比較繞,給人感覺不停的來(lái)回調(diào)用容易繞暈,下面通過圖可以更加直觀的看出調(diào)用流程,再配合代碼就很好理解了。

      關(guān)鍵點(diǎn)就在于HandlerContext中提供了一個(gè)靜態(tài)方法:invokeChannelRead(final AbstractChannelHandlerContext next, Object msg),第一個(gè)是在哪個(gè)handler上觸發(fā)事件,第二個(gè)參數(shù)就是數(shù)據(jù)本身,通過這個(gè)方法就可以指定在哪個(gè)handler上觸發(fā)channelRead事件。由于pipeline中的handler是被包裝成HandlerContext放入的,所以,可以通過handler()方法找到真正的handler對(duì)象進(jìn)行觸發(fā)。

      比如pipelinefireChannelRead()就是觸發(fā)headchannelRead事件,如果處理完成需要把事件繼續(xù)傳播給下一個(gè)handler,就需要調(diào)用ctx.fireChannelRead(msg)方法即可,該方法中通過next屬性獲取到下一個(gè)節(jié)點(diǎn),然后執(zhí)行static invokeChannelRead(next, msg)這個(gè)方法就可以將事件傳播到下一個(gè)節(jié)點(diǎn)上。

      pipeline.fireChannelRead(byteBuf)運(yùn)行完成后會(huì)調(diào)用pipeline.fireChannelReadComplete()方法,觸發(fā)channelReadComplete事件,執(zhí)行機(jī)制和channelRead事件一樣,就不再贅述。

      搞清楚上面原理,就很容易理解ctx.fireChannelRead()ctx.pipeline().fireChannelRead()之間的區(qū)別了,避免誤用。

      Pipeline線程模型

      上面分析的都是常規(guī)模式,沒有給handler指定額外線程情況下channelReadchannelReadComplete傳播機(jī)制,大致如下圖:

      先觸發(fā)channelRead事件,按照pipeline中順序依次觸發(fā),當(dāng)所有handler都觸發(fā)完后,再觸發(fā)channelReadComplete事件,按照pipeline中的順序依次觸發(fā)。這些所有流程采用的都是同步方式,在同一個(gè)線程中執(zhí)行,這個(gè)線程就是channel注冊(cè)的NioEventLoop

      我們來(lái)看下static void invokeChannelRead()這個(gè)方法:

      static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {    final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);    EventExecutor executor = next.executor();    if (executor.inEventLoop()) {        next.invokeChannelRead(m);    } else {        executor.execute(new Runnable() {            @Override            public void run() {                next.invokeChannelRead(m);            }        });    }}

      在執(zhí)行next.invokeChannelRead(m)方法前有個(gè)executor.inEventLoop()判斷,判斷當(dāng)前執(zhí)行線程是不是就是handler執(zhí)行所需的線程。執(zhí)行handler方法是不能隨便線程都可以去執(zhí)行的,必須使用handler內(nèi)部指定的executor線程執(zhí)行器中執(zhí)行才行。如下圖,也就是說紅色框框中的內(nèi)容必須在executor線程執(zhí)行器中執(zhí)行,如果當(dāng)前線程和handler執(zhí)行線程不是同一個(gè),就需要進(jìn)行線程切換:則調(diào)用封裝成一個(gè)任務(wù),提交到executor的任務(wù)隊(duì)列中讓其執(zhí)行。

      executor線程執(zhí)行器是通過next.executor()方法獲取到的,從這個(gè)方法源碼中可以看到獲取邏輯:如果HandlerContextexecutor有值則直接返回;否則返回channel注冊(cè)的NioEventLoop作為線程執(zhí)行器。

      在添加handler時(shí)可以指定一個(gè)EventGrouppipeline.addLast( bizGroup, "handler2", new OtherTest02());,這樣,再把handler包裝成HandlerContext過程中會(huì)從這個(gè)EventGroup根據(jù)chooser選取策略獲得一個(gè)EventLoop賦值給executor

      所以,從上面分析,默認(rèn)情況下handler都是在channel注冊(cè)的NioEventLoop線程中執(zhí)行的,除非在addLast添加handloer時(shí)特別指定。

      下面我們通過一個(gè)案例分析下pipeline線程模型,如下,給handler02添加一個(gè)額外的線程池:

      EventLoopGroup bizGroup = new NioEventLoopGroup(10, new ThreadFactoryBuilder().setNameFormat("biz-%s").build());protected void initChannel(SocketChannel ch) throws Exception {    ChannelPipeline pipeline = ch.pipeline();    pipeline.addLast( "handler01", new OtherTest01());    pipeline.addLast( bizGroup, "handler02", new OtherTest02());    pipeline.addLast( "handler03", new OtherTest03());}

      這時(shí),channelReadchannelReadComplete事件觸發(fā)流程見下圖:

      channelRead事件執(zhí)行流程說明:

      上下兩部分代表兩個(gè)線程,上面是channel注冊(cè)的eventLoop,下面是添加handler02指定的eventLoop;首先觸發(fā)handler01channelRead事件,本身當(dāng)前線程和handler01是同一個(gè)線程,所以,直接調(diào)用handler#channelRead()方法;handler01#channelRead()方法執(zhí)行完成后,事件繼續(xù)向下傳播,需要調(diào)用handler02#channelRead()方法,但是handler02執(zhí)行線程并不是默認(rèn)的channel的注冊(cè)線程,而是額外設(shè)置的biz線程,需要將調(diào)用包裝成一個(gè)任務(wù)提交到biz線程的任務(wù)隊(duì)列taskQueue中,然后直接返回;biz線程執(zhí)行器內(nèi)部線程會(huì)一直循環(huán)從taskQueue中獲取任務(wù)執(zhí)行,這樣就完成了線程切換效果;當(dāng)handler02#channelRead()方法執(zhí)行完成后,需要執(zhí)行handler03#channelRead(),它們又不在同一個(gè)線程中執(zhí)行,這時(shí)有需要切換線程,所以會(huì)把handler03#channelRead()的調(diào)用封裝成一個(gè)任務(wù)提交到register eventLoop的taskQueue中,待其內(nèi)部線程提取執(zhí)行;

      下面再來(lái)看下channelReadComplete事件執(zhí)行流程:

      上圖a1將任務(wù)提交給taskQueue任務(wù)隊(duì)列后直接返回了,而不是等其執(zhí)行完成再返回;a1返回后,從源碼分析來(lái)看,會(huì)立即觸發(fā)channelReadComplete事件,涉及到線程切換,同理b1這里也是將handler02#channelReadComplete()調(diào)用封裝成任務(wù)放入到biz eventLooptaskQueue中的,然后也直接返回了;這樣,biz eventLoop線程執(zhí)行器taskQueue中就有兩個(gè)任務(wù),會(huì)按照順序依次執(zhí)行:先執(zhí)行channelRead()調(diào)用,再執(zhí)行channelReadComplete()調(diào)用;執(zhí)行a3、b3時(shí)同理;

      總結(jié)

      從上面可以看出,Pipelinehandler可以在不同線程間切換得到關(guān)鍵是:taskQueue。還要一點(diǎn)非常重要:handler線程池執(zhí)行器默認(rèn)使用的channel注冊(cè)的NioEventLoop這個(gè),NioEventLoop采用的是單線程工作模式,同時(shí)還需要處理selector.select()事件輪詢,所以,handler里肯定不能有耗時(shí)、特別是IO阻塞等操作,不然卡在handler中,selector#select()執(zhí)行不到,無(wú)法及時(shí)接收到客戶端傳送過來(lái)的數(shù)據(jù)。

      關(guān)鍵詞:

      責(zé)任編輯:Rex_29

      推薦閱讀
      亚洲线精品久久一区二区三区,成人看片在线观看,草草视频手机在线观看视频,亚洲六月丁香色婷婷综合久久
    • <acronym id="qmqcg"><cite id="qmqcg"></cite></acronym>
      <td id="qmqcg"><em id="qmqcg"></em></td>
      • 主站蜘蛛池模板: 精品乱子伦一区二区三区| 亚洲国产精品久久丫| 一本色道久久88加勒比—综合| 色与欲影视天天看综合网| 日本在线视频www色| 国产美女极度色诱视频www| 喷出巨量精子系列在线观看| 久久99国产精品成人欧美| 超碰97人人做人人爱少妇| 欧美日韩国产在线观看一区二区三区 | 无遮挡很污很爽很黄的网站 | 国产99视频精品免视看7| 久久91精品国产91久久户| 老熟女高潮一区二区三区| 成人精品国产亚洲欧洲| 午夜dj在线观看免费高清在线| 一级做a爰片久久毛片| 男女抽搐一进一出无遮挡| 天堂网在线观看| 亚洲欧美综合国产精品一区| 88av在线播放| 欧洲卡一卡二卡在线| 国产强伦姧在线观看| 久久久久久久国产精品电影 | 欧美xxxx做受欧美| 国产日韩AV免费无码一区二区| 久久天天躁狠狠躁夜夜免费观看| 高清国产一级精品毛片基地| 无码综合天天久久综合网| 午夜理论影院第九电影院| Aⅴ精品无码无卡在线观看| 欧美激情一级二级三级在线视频 | 偷炮少妇宾馆半推半就激情| 99re热在线观看| 欧美亚洲另类综合| 国产人妖乱国产精品人妖| 三级理论中文字幕在线播放| 狠狠色欧美亚洲综合色黑a| 国产精品无圣光一区二区| 久久精品国产亚洲AV麻豆王友容 | 婷婷丁香五月中文字幕|