亚洲线精品久久一区二区三区,成人看片在线观看,草草视频手机在线观看视频,亚洲六月丁香色婷婷综合久久

首頁 > 生活 >

【Netty源碼分析】04 服務端讀流程

讀流程

客戶端接入后,下面一步操作就是讀取客戶端傳輸過來的數據,這一節我們就來分析下服務端讀取客戶端數據流程。從前面分析來看,channel的事件輪詢、事件處理是在NioEventLooprun方法中,從這里我們就很容易找我服務端讀流程的入口方法:processSelectedKeys()


(資料圖片)

processSelectedKeys()一直追蹤下去,可以看到OP_READ處理邏輯分支:

if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) { unsafe.read();}

可能你會比較奇怪:為什么OP_READOP_ACCEPT都會走這個分支?

OP_ACCEPTNioServerSocketChannel處理的事件,而OP_READNioSocketChannel處理的事件,所以,雖然它們都走這個分支,但是channel類型確是不一樣的,即這里的unsafe類型也不一樣,一個是:NioMessageUnsafe,另一個是:NioSocketChannelUnsafeNioServerSocketChannel負責監聽客戶端連接,當有客戶端連接進入時,對它來說就是有個讀入消息需要被處理。

這里我們是處理client channleOP_READ,所以,unsafeNioSocketChannelUnsafe類型實例。

AbstractNioByteChannel.NioByteUnsafe#read方法代碼如下:

public final void read() { final ChannelConfig config = config();    if (shouldBreakReadReady(config)) {     clearReadPending();         return;    }    final ChannelPipeline pipeline = pipeline();    final ByteBufAllocator allocator = config.getAllocator();    final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();    allocHandle.reset(config);    ByteBuf byteBuf = null;    boolean close = false;    try {        do {            // 申請ByteBuf對象            byteBuf = allocHandle.allocate(allocator);            //doReadBytes(byteBuf):將數據讀取到ByteBuf中            //lastBytesRead()將讀取的字節數設置到lastBytesRead            allocHandle.lastBytesRead(doReadBytes(byteBuf));            if (allocHandle.lastBytesRead() <= 0) {                byteBuf.release();                byteBuf = null;                close = allocHandle.lastBytesRead() < 0;                if (close) {                    readPending = false;                }                break;            }            allocHandle.incMessagesRead(1);            readPending = false;            //觸發pipeline channelRead事件,將讀入數據ByteBuf傳入到handler中            pipeline.fireChannelRead(byteBuf);            byteBuf = null;        } while (allocHandle.continueReading());//判斷是否繼續讀取          allocHandle.readComplete();        //觸發pipeline channelReadComplete        pipeline.fireChannelReadComplete();        if (close) {            closeOnRead(pipeline);        }    } catch (Throwable t) {        handleReadException(pipeline, byteBuf, t, close, allocHandle);    } finally {        if (!readPending && !config.isAutoRead()) {            removeReadOp();        }    }}

這個方法刨除其它邏輯,關于客戶端數據處理邏輯主要包括3個步驟:

allocHandle.lastBytesRead(doReadBytes(byteBuf)):調用java api,從channel中讀取字節數據到ByteBuf緩存中;pipeline.fireChannelRead(byteBuf):觸發pipelinechannelRead事件,并將帶有讀入數據的ByteBuf通過參數傳入;pipeline.fireChannelReadComplete():觸發pipelinechannelReadComplete事件;

事件傳播

調用pipelinefireChannelRead()就可觸發channelRead事件在handler之間傳播,事件傳播這塊代碼比較繞,給人感覺不停的來回調用容易繞暈,下面通過圖可以更加直觀的看出調用流程,再配合代碼就很好理解了。

關鍵點就在于HandlerContext中提供了一個靜態方法:invokeChannelRead(final AbstractChannelHandlerContext next, Object msg),第一個是在哪個handler上觸發事件,第二個參數就是數據本身,通過這個方法就可以指定在哪個handler上觸發channelRead事件。由于pipeline中的handler是被包裝成HandlerContext放入的,所以,可以通過handler()方法找到真正的handler對象進行觸發。

比如pipelinefireChannelRead()就是觸發headchannelRead事件,如果處理完成需要把事件繼續傳播給下一個handler,就需要調用ctx.fireChannelRead(msg)方法即可,該方法中通過next屬性獲取到下一個節點,然后執行static invokeChannelRead(next, msg)這個方法就可以將事件傳播到下一個節點上。

pipeline.fireChannelRead(byteBuf)運行完成后會調用pipeline.fireChannelReadComplete()方法,觸發channelReadComplete事件,執行機制和channelRead事件一樣,就不再贅述。

搞清楚上面原理,就很容易理解ctx.fireChannelRead()ctx.pipeline().fireChannelRead()之間的區別了,避免誤用。

Pipeline線程模型

上面分析的都是常規模式,沒有給handler指定額外線程情況下channelReadchannelReadComplete傳播機制,大致如下圖:

先觸發channelRead事件,按照pipeline中順序依次觸發,當所有handler都觸發完后,再觸發channelReadComplete事件,按照pipeline中的順序依次觸發。這些所有流程采用的都是同步方式,在同一個線程中執行,這個線程就是channel注冊的NioEventLoop

我們來看下static void invokeChannelRead()這個方法:

static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {    final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);    EventExecutor executor = next.executor();    if (executor.inEventLoop()) {        next.invokeChannelRead(m);    } else {        executor.execute(new Runnable() {            @Override            public void run() {                next.invokeChannelRead(m);            }        });    }}

在執行next.invokeChannelRead(m)方法前有個executor.inEventLoop()判斷,判斷當前執行線程是不是就是handler執行所需的線程。執行handler方法是不能隨便線程都可以去執行的,必須使用handler內部指定的executor線程執行器中執行才行。如下圖,也就是說紅色框框中的內容必須在executor線程執行器中執行,如果當前線程和handler執行線程不是同一個,就需要進行線程切換:則調用封裝成一個任務,提交到executor的任務隊列中讓其執行。

executor線程執行器是通過next.executor()方法獲取到的,從這個方法源碼中可以看到獲取邏輯:如果HandlerContextexecutor有值則直接返回;否則返回channel注冊的NioEventLoop作為線程執行器。

在添加handler時可以指定一個EventGrouppipeline.addLast( bizGroup, "handler2", new OtherTest02());,這樣,再把handler包裝成HandlerContext過程中會從這個EventGroup根據chooser選取策略獲得一個EventLoop賦值給executor

所以,從上面分析,默認情況下handler都是在channel注冊的NioEventLoop線程中執行的,除非在addLast添加handloer時特別指定。

下面我們通過一個案例分析下pipeline線程模型,如下,給handler02添加一個額外的線程池:

EventLoopGroup bizGroup = new NioEventLoopGroup(10, new ThreadFactoryBuilder().setNameFormat("biz-%s").build());protected void initChannel(SocketChannel ch) throws Exception {    ChannelPipeline pipeline = ch.pipeline();    pipeline.addLast( "handler01", new OtherTest01());    pipeline.addLast( bizGroup, "handler02", new OtherTest02());    pipeline.addLast( "handler03", new OtherTest03());}

這時,channelReadchannelReadComplete事件觸發流程見下圖:

channelRead事件執行流程說明:

上下兩部分代表兩個線程,上面是channel注冊的eventLoop,下面是添加handler02指定的eventLoop;首先觸發handler01channelRead事件,本身當前線程和handler01是同一個線程,所以,直接調用handler#channelRead()方法;handler01#channelRead()方法執行完成后,事件繼續向下傳播,需要調用handler02#channelRead()方法,但是handler02執行線程并不是默認的channel的注冊線程,而是額外設置的biz線程,需要將調用包裝成一個任務提交到biz線程的任務隊列taskQueue中,然后直接返回;biz線程執行器內部線程會一直循環從taskQueue中獲取任務執行,這樣就完成了線程切換效果;當handler02#channelRead()方法執行完成后,需要執行handler03#channelRead(),它們又不在同一個線程中執行,這時有需要切換線程,所以會把handler03#channelRead()的調用封裝成一個任務提交到register eventLoop的taskQueue中,待其內部線程提取執行;

下面再來看下channelReadComplete事件執行流程:

上圖a1將任務提交給taskQueue任務隊列后直接返回了,而不是等其執行完成再返回;a1返回后,從源碼分析來看,會立即觸發channelReadComplete事件,涉及到線程切換,同理b1這里也是將handler02#channelReadComplete()調用封裝成任務放入到biz eventLooptaskQueue中的,然后也直接返回了;這樣,biz eventLoop線程執行器taskQueue中就有兩個任務,會按照順序依次執行:先執行channelRead()調用,再執行channelReadComplete()調用;執行a3、b3時同理;

總結

從上面可以看出,Pipelinehandler可以在不同線程間切換得到關鍵是:taskQueue。還要一點非常重要:handler線程池執行器默認使用的channel注冊的NioEventLoop這個,NioEventLoop采用的是單線程工作模式,同時還需要處理selector.select()事件輪詢,所以,handler里肯定不能有耗時、特別是IO阻塞等操作,不然卡在handler中,selector#select()執行不到,無法及時接收到客戶端傳送過來的數據。

關鍵詞:

責任編輯:Rex_29

推薦閱讀

當前速看:強國必先強農

· 2023-03-28 19:34:52

關于我們 聯系我們 商務合作 誠聘英才 網站地圖

Copyright @ 2008-2020 www.wnchengjie.com Corporation,All Rights Reserved

熱訊新聞網 版權所有 備案號:豫ICP備20005723號-6
文章投訴郵箱:2 9 5 9 1 1 5 7 8@qq.com 違法信息舉報郵箱:jubao@123777.net.cn

營業執照公示信息

亚洲线精品久久一区二区三区,成人看片在线观看,草草视频手机在线观看视频,亚洲六月丁香色婷婷综合久久
  • <acronym id="qmqcg"><cite id="qmqcg"></cite></acronym>
    <td id="qmqcg"><em id="qmqcg"></em></td>
    • 主站蜘蛛池模板: 国产福利精品视频| 国产精品视频最多的网站| 97av在线视频| 欧美极品欧美精品欧美视频| 蜜臀久久99精品久久久久久宅男| 日本欧美一级片| 欧美成人手机在线| 久久艳片www.17c.com| www.欧美视频| 色先锋资源久久综合5566| 一区二区成人精品| 一区二区福利视频| 久久综合久久八八| 操人视频在线观看欧美| 久久成年人视频| 97色在线视频| 久热精品视频在线观看| 欧美成年人视频网站| 国产精品日韩专区| 国产午夜精品久久久 | 成人激情在线播放| 亚洲天堂影视av| 欧美日韩一区免费| 日韩高清免费在线| 欧美日韩国产精品一区二区不卡中文 | 久久久久五月天| 亚洲国产精品免费| 欧美日韩精品在线| 亚洲综合精品伊人久久| 欧美理论在线观看| 亚洲高清不卡av| 欧美色另类天堂2015| 91九色国产社区在线观看| 久久影视电视剧免费网站| 日韩av免费在线播放| 精品久久久av| 成人激情综合网| 成人日韩在线电影| 久久久久久中文字幕| 日韩少妇与小伙激情| 成人午夜两性视频|