• <acronym id="qmqcg"><cite id="qmqcg"></cite></acronym>
    <td id="qmqcg"><em id="qmqcg"></em></td>
    • 首頁 > 生活 >

      【Netty源碼分析】04 服務端讀流程

      讀流程

      客戶端接入后,下面一步操作就是讀取客戶端傳輸過來的數據,這一節我們就來分析下服務端讀取客戶端數據流程。從前面分析來看,channel的事件輪詢、事件處理是在NioEventLooprun方法中,從這里我們就很容易找我服務端讀流程的入口方法:processSelectedKeys()


      (資料圖片)

      processSelectedKeys()一直追蹤下去,可以看到OP_READ處理邏輯分支:

      if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) { unsafe.read();}

      可能你會比較奇怪:為什么OP_READOP_ACCEPT都會走這個分支?

      OP_ACCEPTNioServerSocketChannel處理的事件,而OP_READNioSocketChannel處理的事件,所以,雖然它們都走這個分支,但是channel類型確是不一樣的,即這里的unsafe類型也不一樣,一個是:NioMessageUnsafe,另一個是:NioSocketChannelUnsafeNioServerSocketChannel負責監聽客戶端連接,當有客戶端連接進入時,對它來說就是有個讀入消息需要被處理。

      這里我們是處理client channleOP_READ,所以,unsafeNioSocketChannelUnsafe類型實例。

      AbstractNioByteChannel.NioByteUnsafe#read方法代碼如下:

      public final void read() { final ChannelConfig config = config();    if (shouldBreakReadReady(config)) {     clearReadPending();         return;    }    final ChannelPipeline pipeline = pipeline();    final ByteBufAllocator allocator = config.getAllocator();    final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();    allocHandle.reset(config);    ByteBuf byteBuf = null;    boolean close = false;    try {        do {            // 申請ByteBuf對象            byteBuf = allocHandle.allocate(allocator);            //doReadBytes(byteBuf):將數據讀取到ByteBuf中            //lastBytesRead()將讀取的字節數設置到lastBytesRead            allocHandle.lastBytesRead(doReadBytes(byteBuf));            if (allocHandle.lastBytesRead() <= 0) {                byteBuf.release();                byteBuf = null;                close = allocHandle.lastBytesRead() < 0;                if (close) {                    readPending = false;                }                break;            }            allocHandle.incMessagesRead(1);            readPending = false;            //觸發pipeline channelRead事件,將讀入數據ByteBuf傳入到handler中            pipeline.fireChannelRead(byteBuf);            byteBuf = null;        } while (allocHandle.continueReading());//判斷是否繼續讀取          allocHandle.readComplete();        //觸發pipeline channelReadComplete        pipeline.fireChannelReadComplete();        if (close) {            closeOnRead(pipeline);        }    } catch (Throwable t) {        handleReadException(pipeline, byteBuf, t, close, allocHandle);    } finally {        if (!readPending && !config.isAutoRead()) {            removeReadOp();        }    }}

      這個方法刨除其它邏輯,關于客戶端數據處理邏輯主要包括3個步驟:

      allocHandle.lastBytesRead(doReadBytes(byteBuf)):調用java api,從channel中讀取字節數據到ByteBuf緩存中;pipeline.fireChannelRead(byteBuf):觸發pipelinechannelRead事件,并將帶有讀入數據的ByteBuf通過參數傳入;pipeline.fireChannelReadComplete():觸發pipelinechannelReadComplete事件;

      事件傳播

      調用pipelinefireChannelRead()就可觸發channelRead事件在handler之間傳播,事件傳播這塊代碼比較繞,給人感覺不停的來回調用容易繞暈,下面通過圖可以更加直觀的看出調用流程,再配合代碼就很好理解了。

      關鍵點就在于HandlerContext中提供了一個靜態方法:invokeChannelRead(final AbstractChannelHandlerContext next, Object msg),第一個是在哪個handler上觸發事件,第二個參數就是數據本身,通過這個方法就可以指定在哪個handler上觸發channelRead事件。由于pipeline中的handler是被包裝成HandlerContext放入的,所以,可以通過handler()方法找到真正的handler對象進行觸發。

      比如pipelinefireChannelRead()就是觸發headchannelRead事件,如果處理完成需要把事件繼續傳播給下一個handler,就需要調用ctx.fireChannelRead(msg)方法即可,該方法中通過next屬性獲取到下一個節點,然后執行static invokeChannelRead(next, msg)這個方法就可以將事件傳播到下一個節點上。

      pipeline.fireChannelRead(byteBuf)運行完成后會調用pipeline.fireChannelReadComplete()方法,觸發channelReadComplete事件,執行機制和channelRead事件一樣,就不再贅述。

      搞清楚上面原理,就很容易理解ctx.fireChannelRead()ctx.pipeline().fireChannelRead()之間的區別了,避免誤用。

      Pipeline線程模型

      上面分析的都是常規模式,沒有給handler指定額外線程情況下channelReadchannelReadComplete傳播機制,大致如下圖:

      先觸發channelRead事件,按照pipeline中順序依次觸發,當所有handler都觸發完后,再觸發channelReadComplete事件,按照pipeline中的順序依次觸發。這些所有流程采用的都是同步方式,在同一個線程中執行,這個線程就是channel注冊的NioEventLoop

      我們來看下static void invokeChannelRead()這個方法:

      static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {    final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);    EventExecutor executor = next.executor();    if (executor.inEventLoop()) {        next.invokeChannelRead(m);    } else {        executor.execute(new Runnable() {            @Override            public void run() {                next.invokeChannelRead(m);            }        });    }}

      在執行next.invokeChannelRead(m)方法前有個executor.inEventLoop()判斷,判斷當前執行線程是不是就是handler執行所需的線程。執行handler方法是不能隨便線程都可以去執行的,必須使用handler內部指定的executor線程執行器中執行才行。如下圖,也就是說紅色框框中的內容必須在executor線程執行器中執行,如果當前線程和handler執行線程不是同一個,就需要進行線程切換:則調用封裝成一個任務,提交到executor的任務隊列中讓其執行。

      executor線程執行器是通過next.executor()方法獲取到的,從這個方法源碼中可以看到獲取邏輯:如果HandlerContextexecutor有值則直接返回;否則返回channel注冊的NioEventLoop作為線程執行器。

      在添加handler時可以指定一個EventGrouppipeline.addLast( bizGroup, "handler2", new OtherTest02());,這樣,再把handler包裝成HandlerContext過程中會從這個EventGroup根據chooser選取策略獲得一個EventLoop賦值給executor

      所以,從上面分析,默認情況下handler都是在channel注冊的NioEventLoop線程中執行的,除非在addLast添加handloer時特別指定。

      下面我們通過一個案例分析下pipeline線程模型,如下,給handler02添加一個額外的線程池:

      EventLoopGroup bizGroup = new NioEventLoopGroup(10, new ThreadFactoryBuilder().setNameFormat("biz-%s").build());protected void initChannel(SocketChannel ch) throws Exception {    ChannelPipeline pipeline = ch.pipeline();    pipeline.addLast( "handler01", new OtherTest01());    pipeline.addLast( bizGroup, "handler02", new OtherTest02());    pipeline.addLast( "handler03", new OtherTest03());}

      這時,channelReadchannelReadComplete事件觸發流程見下圖:

      channelRead事件執行流程說明:

      上下兩部分代表兩個線程,上面是channel注冊的eventLoop,下面是添加handler02指定的eventLoop;首先觸發handler01channelRead事件,本身當前線程和handler01是同一個線程,所以,直接調用handler#channelRead()方法;handler01#channelRead()方法執行完成后,事件繼續向下傳播,需要調用handler02#channelRead()方法,但是handler02執行線程并不是默認的channel的注冊線程,而是額外設置的biz線程,需要將調用包裝成一個任務提交到biz線程的任務隊列taskQueue中,然后直接返回;biz線程執行器內部線程會一直循環從taskQueue中獲取任務執行,這樣就完成了線程切換效果;當handler02#channelRead()方法執行完成后,需要執行handler03#channelRead(),它們又不在同一個線程中執行,這時有需要切換線程,所以會把handler03#channelRead()的調用封裝成一個任務提交到register eventLoop的taskQueue中,待其內部線程提取執行;

      下面再來看下channelReadComplete事件執行流程:

      上圖a1將任務提交給taskQueue任務隊列后直接返回了,而不是等其執行完成再返回;a1返回后,從源碼分析來看,會立即觸發channelReadComplete事件,涉及到線程切換,同理b1這里也是將handler02#channelReadComplete()調用封裝成任務放入到biz eventLooptaskQueue中的,然后也直接返回了;這樣,biz eventLoop線程執行器taskQueue中就有兩個任務,會按照順序依次執行:先執行channelRead()調用,再執行channelReadComplete()調用;執行a3、b3時同理;

      總結

      從上面可以看出,Pipelinehandler可以在不同線程間切換得到關鍵是:taskQueue。還要一點非常重要:handler線程池執行器默認使用的channel注冊的NioEventLoop這個,NioEventLoop采用的是單線程工作模式,同時還需要處理selector.select()事件輪詢,所以,handler里肯定不能有耗時、特別是IO阻塞等操作,不然卡在handler中,selector#select()執行不到,無法及時接收到客戶端傳送過來的數據。

      關鍵詞:

      責任編輯:Rex_29

      推薦閱讀

      當前速看:強國必先強農

      · 2023-03-28 19:34:52

      關于我們 聯系我們 商務合作 誠聘英才 網站地圖

      Copyright @ 2008-2020 www.wnchengjie.com Corporation,All Rights Reserved

      熱訊新聞網 版權所有 備案號:豫ICP備20005723號-6
      文章投訴郵箱:2 9 5 9 1 1 5 7 8@qq.com 違法信息舉報郵箱:jubao@123777.net.cn

      營業執照公示信息

      亚洲线精品久久一区二区三区,成人看片在线观看,草草视频手机在线观看视频,亚洲六月丁香色婷婷综合久久
    • <acronym id="qmqcg"><cite id="qmqcg"></cite></acronym>
      <td id="qmqcg"><em id="qmqcg"></em></td>
      • 主站蜘蛛池模板: 国产精品夜色一区二区三区| 日本日本熟妇中文在线视频| 国产高清av在线播放| 国产精品久久久亚洲| 亚洲欧洲国产经精品香蕉网| 中文字幕在线观看第二页| 青青热久久久久综合精品| 日韩精品一区二区三区在线观看l 日韩精品一区二区三区毛片 | 欧洲高清一区二区三区试看| 国产精品福利尤物youwu| 再深点灬舒服灬太大了添a| 两个人看的WWW在线观看| 精品黑人一区二区三区| 尤物在线观看精品国产福利片| 国产精品区一区二区三在线播放| 亚洲日本在线电影| WWW夜片内射视频在观看视频| 老子影院午夜伦手机在线看| 成人无码av一区二区| 免费一级成人毛片| 99ee6热久久免费精品6| 男人j进女人p免费动态图| 在厨房里挺进美妇雪臀| 亚洲最大成人网色| 好吊色青青青国产在线播放| 日韩在线视频一区| 国产日韩精品中文字无码| 久久综合香蕉国产蜜臀av| 1000部啪啪未满十八勿入免费| 波多野结衣未删减在线| 国产精品先锋资源站先锋影院| 九九久久精品国产AV片国产| 草莓视频成人app下载| 嫩草影院一二三| 吃奶呻吟打开双腿做受在线视频 | 国产一级做美女做受视频| 一二三四在线观看高清| 永久免费无内鬼放心开车| 大胸美女洗澡扒奶衣挤奶| 亚洲国产视频一区| 麻豆国产剧果冻传媒视频|