亚洲中字慕日产2020,大陆极品少妇内射AAAAAA,无码av大香线蕉伊人久久,久久精品国产亚洲av麻豆网站

資訊專欄INFORMATION COLUMN

微信開源mars源碼分析5—底層核心mars分析(續(xù)2)

asce1885 / 1883人閱讀

摘要:執(zhí)行并根據(jù)每個連接的狀態(tài)決定后續(xù)處理,上篇已經(jīng)講過,不再累述。上面的三段處理完畢后,應(yīng)該是數(shù)組中不再有連接才對,這里的保險處理是對數(shù)組再進(jìn)行檢查。至此跳出,算是整個連接過程完畢了。這里需要逐句分析,首先是。

最近回顧之前的文章,發(fā)現(xiàn)最后一篇有些著急了,很多地方?jīng)]有敘述清楚。這里先做個銜接吧。
我們還是以長連接為例,從longlink.cc看起。首先是那個線程函數(shù)__Run:
/mars-master/mars/stn/src/longlink.cc

void LongLink::__Run() {
    ......
    // 執(zhí)行連接
    SOCKET sock = __RunConnect(conn_profile);
    
    // 無效的socket,更新描述文件,記錄失敗的時間節(jié)點,返回
    if (INVALID_SOCKET == sock) {
        conn_profile.disconn_time = ::gettickcount();
        conn_profile.disconn_signal = ::getSignal(::getNetInfo() == kWifi);
        __UpdateProfile(conn_profile);
        return;
    }
    ......
    // 執(zhí)行讀寫
    __RunReadWrite(sock, errtype, errcode, conn_profile);
}

實際上核心的就2個,連接和讀寫,我們分別看下。
/mars-master/mars/stn/src/longlink.cc

SOCKET LongLink::__RunConnect(ConnectProfile& _conn_profile) {
    std::vector ip_items;
    std::vector vecaddr;
    ......
    // 賦值填充ip_items地址端口數(shù)組
    netsource_.GetLongLinkItems(ip_items, dns_util_);
    ......
    // 根據(jù)ip_items創(chuàng)建socket_address并加入vecaddr中
    for (unsigned int i = 0; i < ip_items.size(); ++i) {
        vecaddr.push_back(socket_address(ip_items[i].str_ip.c_str(), ip_items[i].port).v4tov6_address(isnat64));
    }
    ......
    // 創(chuàng)建觀察者和ComplexConnect連接核心,然后開始執(zhí)行連接
    LongLinkConnectObserver connect_observer(*this, ip_items);
    ComplexConnect com_connect(kLonglinkConnTimeout, kLonglinkConnInteral, kLonglinkConnInteral, kLonglinkConnMax);
    SOCKET sock = com_connect.ConnectImpatient(vecaddr, connectbreak_, &connect_observer);
    
    // 返回socket
    return sock;
}

1.創(chuàng)建2個數(shù)組,地址端口item和socket_address;
2.調(diào)用netsource_.GetLongLinkItems(ip_items, dns_util_);填充IPPortItem數(shù)組;
3.根據(jù)填充好的前者數(shù)組生成socket_address填充后者數(shù)組;
4.創(chuàng)建連接觀察者;
5.開始執(zhí)行連接;
首先看看netsource_.GetLongLinkItems是如何填充的:
/mars-master/mars/stn/src/net_source.cc

bool NetSource::GetLongLinkItems(std::vector& _ipport_items, DnsUtil& _dns_util) {
    
    ScopedLock lock(sg_ip_mutex);

    if (__GetLonglinkDebugIPPort(_ipport_items)) {
        return true;
    }
    
    lock.unlock();

     std::vector longlink_hosts = NetSource::GetLongLinkHosts();
     if (longlink_hosts.empty()) {
         xerror2("longlink host empty.");
         return false;
     }

     __GetIPPortItems(_ipport_items, longlink_hosts, _dns_util, true);

    return !_ipport_items.empty();
}

可以看到debug的優(yōu)先,這里增加了調(diào)試的ip。再往下就先不貼代碼了,基本上就是之前通過SetLongLink設(shè)置進(jìn)去的sg_longlink_hosts(長連接主機(jī)列表),再往上倒騰就是在MarsServiceNative.java的onCreate中通過描述文件profile設(shè)置進(jìn)去的主機(jī)列表。也就是說之前早就設(shè)置好的主機(jī)列表已經(jīng)存在了。
下面我們?nèi)匀灰M(jìn)入到上一篇提到的ComplexConnect::ConnectImpatient這個核心函數(shù)中看看。
/mars-master/mars/comm/socket/complexconnect.cc

SOCKET ComplexConnect::ConnectImpatient(const std::vector& _vecaddr, SocketSelectBreaker& _breaker, MComplexConnect* _observer) {
    ......
    // 根據(jù)地址列表,生成ConnectCheckFSM連接列表
    std::vector vecsocketfsm;

    for (unsigned int i = 0; i < _vecaddr.size(); ++i) {
        xinfo2(TSF"complex.conn %_", _vecaddr[i].url());

        ConnectCheckFSM* ic = new ConnectCheckFSM(_vecaddr[i], timeout_, i, _observer);
        vecsocketfsm.push_back(ic);
    }
    // 下面就是對這個連接列表的各種操作了
    do {
        ......
        // 生成socketselect的封裝對象,并執(zhí)行PreSelect前期準(zhǔn)備工作
        SocketSelect sel(_breaker);
        sel.PreSelect();
        
        ......
        // 執(zhí)行連接
        for (unsigned int i = 0; i < index; ++i) {
            if (NULL == vecsocketfsm[i]) continue;

            xgroup2_define(group);
            vecsocketfsm[i]->PreSelect(sel, group);
            xgroup2_if(!group.Empty(), TSF"index:%_, @%_, ", i, this) << group;
            timeout = std::min(timeout, vecsocketfsm[i]->Timeout());
        }
        
        ......
        
        for (unsigned int i = 0; i < index; ++i) {
            if (NULL == vecsocketfsm[i]) continue;

            xgroup2_define(group);
            vecsocketfsm[i]->AfterSelect(sel, group);
            xgroup2_if(!group.Empty(), TSF"index:%_, @%_, ", i, this) << group;

            if (TcpClientFSM::EEnd == vecsocketfsm[i]->Status()) {
                if (_observer) _observer->OnFinished(i, socket_address(&vecsocketfsm[i]->Address()), vecsocketfsm[i]->Socket(), vecsocketfsm[i]->Error(),
                                                         vecsocketfsm[i]->Rtt(), vecsocketfsm[i]->TotalRtt(), (int)(gettickcount() - starttime));

                vecsocketfsm[i]->Close();
                delete vecsocketfsm[i];
                vecsocketfsm[i] = NULL;
                lasterror = -1;
                continue;
            }

            if (TcpClientFSM::EReadWrite == vecsocketfsm[i]->Status() && ConnectCheckFSM::ECheckFail == vecsocketfsm[i]->CheckStatus()) {
                if (_observer) _observer->OnFinished(i, socket_address(&vecsocketfsm[i]->Address()), vecsocketfsm[i]->Socket(), vecsocketfsm[i]->Error(),
                                                         vecsocketfsm[i]->Rtt(), vecsocketfsm[i]->TotalRtt(), (int)(gettickcount() - starttime));

                vecsocketfsm[i]->Close();
                delete vecsocketfsm[i];
                vecsocketfsm[i] = NULL;
                lasterror = -1;
                continue;
            }

            if (TcpClientFSM::EReadWrite == vecsocketfsm[i]->Status() && ConnectCheckFSM::ECheckOK == vecsocketfsm[i]->CheckStatus()) {
                if (_observer) _observer->OnFinished(i, socket_address(&vecsocketfsm[i]->Address()), vecsocketfsm[i]->Socket(), vecsocketfsm[i]->Error(),
                                                         vecsocketfsm[i]->Rtt(), vecsocketfsm[i]->TotalRtt(), (int)(gettickcount() - starttime));

                xinfo2(TSF"index:%_, sock:%_, suc ConnectImpatient:%_:%_, RTT:(%_, %_), @%_", i, vecsocketfsm[i]->Socket(),
                       vecsocketfsm[i]->IP(), vecsocketfsm[i]->Port(), vecsocketfsm[i]->Rtt(), vecsocketfsm[i]->TotalRtt(), this);
                retsocket = vecsocketfsm[i]->Socket();
                index_ = i;
                index_conn_rtt_ = vecsocketfsm[i]->Rtt();
                index_conn_totalcost_ = vecsocketfsm[i]->TotalRtt();
                vecsocketfsm[i]->Socket(INVALID_SOCKET);
                delete vecsocketfsm[i];
                vecsocketfsm[i] = NULL;
                break;
            }
        }
        
    } while (true);
}

1.數(shù)組中的每個長連接地址依次執(zhí)行連接;
2.數(shù)組中的每個連接分別做后續(xù)處理(一個for循環(huán)中的三段處理);

我們首先看看vecsocketfsm[i]->PreSelect(sel, group);這句話,是由ConnectCheckFSM的父類TcpClientFSM實現(xiàn)的:
/mars-master/mars/comm/socket/tcpclient_fsm.cc

void TcpClientFSM::PreSelect(SocketSelect& _sel, XLogger& _log) {
    
    switch(status_) {
        case EStart: {
            PreConnectSelect(_sel, _log);
            break;
        }
        case EConnecting: {
            _sel.Write_FD_SET(sock_);
            _sel.Exception_FD_SET(sock_);
            break;
        }
        case EReadWrite: {
            PreReadWriteSelect(_sel, _log);
            break;
        }
        default:
            xassert2(false, "preselect status error");
    }
}

這里是根據(jù)這個連接的當(dāng)前狀態(tài)決定前置操作的行為(開始連接、讀寫、連接中)。再往下看就是進(jìn)行socket的connect。以PreConnectSelect為例,這里生產(chǎn)了socket,并執(zhí)行了connect,最后將成功連接的socket執(zhí)行_sel.Write_FD_SET(sock_);保存在了SocketSelect中。
我們來看下代碼:
/mars-master/mars/comm/socket/tcpclient_fsm.cc

void TcpClientFSM::PreConnectSelect(SocketSelect& _sel, XLogger& _log) {
    xassert2(EStart == status_, "%d", status_);
    // 執(zhí)行虛函數(shù),由子類繼承實現(xiàn)
    _OnCreate();

    xinfo2(TSF"addr:(%_:%_), ", addr_.ip(), addr_.port()) >> _log;

    // 生成socket
    sock_ = socket(addr_.address().sa_family, SOCK_STREAM, IPPROTO_TCP);

    if (sock_ == INVALID_SOCKET) {
        error_ = socket_errno;
        last_status_ = status_;
        status_ = EEnd;
        _OnClose(last_status_, error_, false);
        xerror2(TSF"close socket err:(%_, %_)", error_, socket_strerror(error_)) >> _log;
        return;
    }

    if (::getNetInfo() == kWifi && socket_fix_tcp_mss(sock_) < 0) {
#ifdef ANDROID
        xinfo2(TSF"wifi set tcp mss error:%0", strerror(socket_errno));
#endif
    }
    if (0 != socket_ipv6only(sock_, 0)){
        xwarn2(TSF"set ipv6only failed. error %_",strerror(socket_errno));
    }
    
    if (0 != socket_set_nobio(sock_)) {
        error_ = socket_errno;
        xerror2(TSF"close socket_set_nobio:(%_, %_)", error_, socket_strerror(error_)) >> _log;
    } else {
        xinfo2(TSF"socket:%_, ", sock_) >> _log;
    }

    if (0 != error_) {
        last_status_ = status_;
        status_ = EEnd;
        return;
    }

    start_connecttime_ = gettickcount();

    // 執(zhí)行連接
    int ret = connect(sock_, &(addr_.address()), addr_.address_length());

    if (0 != ret && !IS_NOBLOCK_CONNECT_ERRNO(socket_errno)) {
        end_connecttime_ = ::gettickcount();

        error_ = socket_errno;
        xwarn2(TSF"close connect err:(%_, %_), localip:%_", error_, socket_strerror(error_), socket_address::getsockname(sock_).ip()) >> _log;
    } else {
        xinfo2("connect") >> _log;
        // 記錄socket到SocketSelect中
        _sel.Write_FD_SET(sock_);
        _sel.Exception_FD_SET(sock_);
    }

    last_status_ = status_;

    if (0 != error_)
        status_ = EEnd;
    else
        status_ = EConnecting;

    if (0 == error_) _OnConnect();
}

需要注意的是_OnCreate的調(diào)用,實際上是子類實現(xiàn)的,這里也就是ConnectCheckFSM實現(xiàn)的:

virtual void _OnCreate() { if (m_observer) m_observer->OnCreated(m_index, addr_, sock_);}

這里將觀察者與連接對象的生命周期綁在了一起,執(zhí)行了觀察者的OnCreated。那么觀察者是誰呢?往上看,在LongLink::__RunConnect中生成的LongLinkConnectObserver。當(dāng)然生命周期的回調(diào)并不止OnCreated一個。

回到__RunConnect中,看后續(xù)處理(for循環(huán)的三段處理)。執(zhí)行AfterSelect并根據(jù)每個連接的狀態(tài)決定后續(xù)處理,上篇已經(jīng)講過,不再累述。

那么何時終止這個do while循環(huán)呢?當(dāng)for循環(huán)的三段處理完畢后,所有的連接過程都已經(jīng)處理完畢了:

        // end of loop
        bool all_invalid = true;

        for (unsigned int i = 0; i < vecsocketfsm.size(); ++i) {
            if (NULL != vecsocketfsm[i]) {
                all_invalid = false;
                break;
            }
        }

        if (all_invalid || INVALID_SOCKET != retsocket) break;

最后枚舉一遍連接數(shù)組,每個元素檢查是否非空,如果還有非空的,就將all_invalid置為false,那么會繼續(xù)走一次do while。上面的三段處理完畢后,應(yīng)該是數(shù)組中不再有連接才對,這里的保險處理是對數(shù)組再進(jìn)行檢查。至此跳出do while,算是整個連接過程完畢了。

可以看到,經(jīng)過了三段處理后,連接數(shù)組中只會命中一個檢測成功的連接,其他的失敗和完成的都會置為null。這里從全局看就是一個地址池的淘汰篩選機(jī)制。在三段處理的for循環(huán)中清除不合格的連接,挑出第一個找到的合格的連接,然后跳出三段后,立刻檢查整個數(shù)組是否已經(jīng)就剩這一個可用了,如果不是繼續(xù)執(zhí)行do while,又會去執(zhí)行數(shù)組中的每個item的連接過程,再回到三段處理。也就是說所有的數(shù)組中的item都會連接一次,然后根據(jù)返回的狀態(tài)決定是否命中最終的一個socket。這是干嘛呢這么繞?我之前的理解恐怕還不透徹,現(xiàn)在感覺是在找一個穩(wěn)定的可以讀寫狀態(tài)的連接。
第一次進(jìn)入do while已經(jīng)連接所有池中的item了,那么在經(jīng)過了三段處理后淘汰掉不合適的和失敗的,然后再進(jìn)入do while再次執(zhí)行vecsocketfsm[i]->PreSelect(sel, group);的時候,已經(jīng)更新了狀態(tài)并執(zhí)行了不同的調(diào)用了,再經(jīng)過三段處理在新的狀態(tài)下再淘汰一批,最后經(jīng)過整個運轉(zhuǎn),留下來的只能是最持久的(穩(wěn)定的)唯一的一個連接,返回這個。
不得不說,這里確實巧妙,如果我寫并不會比這要好。

我們回來到longlink.cc的線程函數(shù)__Run中,當(dāng)連接處理完畢后,下面繼續(xù)執(zhí)行的是__RunReadWrite。我們來看看:

void LongLink::__RunReadWrite(SOCKET _sock, ErrCmdType& _errtype, int& _errcode, ConnectProfile& _profile) {
    // Alarm消息觸發(fā)處理綁定在__OnAlarm上
    Alarm alarmnoopinterval(boost::bind(&LongLink::__OnAlarm, this), false);
    Alarm alarmnooptimeout(boost::bind(&LongLink::__OnAlarm, this), false);
}

首先是2個Alarm,這里要理解就需要看看這個Alarm是個什么東西:
/mars-master/mars/comm/alarm.h

    template
    explicit Alarm(const T& _op, bool _inthread = true)
        : target_(detail::transform(_op))
        , reg_async_(MessageQueue::InstallAsyncHandler(MessageQueue::GetDefMessageQueue()))
        , runthread_(boost::bind(&Alarm::__Run, this), "alarm")
        , inthread_(_inthread)
        , seq_(0), status_(kInit)
        , after_(0) , starttime_(0) , endtime_(0)
        , reg_(MessageQueue::InstallMessageHandler(boost::bind(&Alarm::OnAlarm, this, _1, _2), true))
#ifdef ANDROID
        , wakelock_(NULL)
#endif
    {}

構(gòu)造函數(shù)。這里需要逐句分析,首先是target_(detail::transform(_op))。簡單看是個賦值語句,后面的參數(shù)需要看這個:
/mars-master/mars/comm/thread/runnable.h

// base template for no argument functor
template 
struct TransformImplement {
    static Runnable* transform(const T& t) {
        return new RunnableFunctor(t);
    }
};

template 
inline Runnable* transform(const T& t) {
    return TransformImplement::transform(t);
}

1.這里使用的是c++魔板,直接new了一個RunnableFunctor對象,這個對象是個runnable,其實就是將這個傳遞進(jìn)來的參數(shù)t包裝成了一個runnable,在適當(dāng)?shù)臅r候調(diào)用他的run方法的時候就會調(diào)用這個t了。那么帶入到具體的內(nèi)容中,這個t是_op,就是boost::bind(&LongLink::__OnAlarm, this)。這里又使用了c++的boost庫,做了bind操作,綁定了參數(shù)this也就是LongLink與函數(shù)體LongLink::__OnAlarm。好了,現(xiàn)在target_是個包裝好的runnable了,在適當(dāng)?shù)臅r候可以執(zhí)行LongLink::__OnAlarm。

2.reg_async_(MessageQueue::InstallAsyncHandler(MessageQueue::GetDefMessageQueue()))。首先看MessageQueue::InstallAsyncHandler:
/mars-master/mars/comm/messagequeue/message_queue.cc

MessageHandler_t InstallAsyncHandler(const MessageQueue_t& id) {
    ASSERT(0 != id);
    return InstallMessageHandler(__AsyncInvokeHandler, false, id);
}

MessageHandler_t InstallMessageHandler(const MessageHandler& _handler, bool _recvbroadcast, const MessageQueue_t& _messagequeueid) {
    ASSERT(bool(_handler));

    ScopedLock lock(sg_messagequeue_map_mutex);
    const MessageQueue_t& id = _messagequeueid;

    if (sg_messagequeue_map.end() == sg_messagequeue_map.find(id)) {
        ASSERT2(false, "%" PRIu64, id);
        return KNullHandler;
    }

    HandlerWrapper* handler = new HandlerWrapper(_handler, _recvbroadcast, _messagequeueid, __MakeSeq());
    sg_messagequeue_map[id].lst_handler.push_back(handler);
    return handler->reg;
}

struct HandlerWrapper {
    HandlerWrapper(const MessageHandler& _handler, bool _recvbroadcast, const MessageQueue_t& _messagequeueid, unsigned int _seq)
        : handler(_handler), recvbroadcast(_recvbroadcast) {
        reg.seq = _seq;
        reg.queue = _messagequeueid;
    }

    MessageHandler_t reg;
    MessageHandler handler;
    bool recvbroadcast;
};

生成了一個HandlerWrapper,并將其保留在了一個map中,隨后返回了MessageHandler_t,其中保存了_seq與_messagequeueid。這里我的感覺是這個handler就是個類似句柄的東西,保存MessageHandler的一個關(guān)聯(lián)關(guān)系,即消息隊列與seq碼(這里是個自增的靜態(tài)變量)。實際上調(diào)用者只要有這個MessageHandler_t就可以了。最后將這個MessageHandler_t賦值給了reg_async_。這里又有一個對象ScopeRegister是個MessageHandler_t的包裝對象,里面統(tǒng)一封裝了方法來操作MessageHandler_t。

3.runthread_(boost::bind(&Alarm::__Run, this), "alarm")。一個線程對象,線程函數(shù)是Alarm::__Run。沒事什么好解釋的。

4.inthread_(_inthread), seq_(0), status_(kInit), after_(0) , starttime_(0) , endtime_(0)。都是簡單賦值,暫時不去管它。

5.reg_(MessageQueue::InstallMessageHandler(boost::bind(&Alarm::OnAlarm, this, _1, _2), true))。類似2。

好了,這個Alarm可以看做是個消息處理,在有消息觸發(fā)的情況下會調(diào)用到具體的函數(shù)中,一般是__OnAlarm。

回到__RunReadWrite,往下看。首先是個while的死循環(huán),我們多帶帶摘錄如下:

    while (true) {
        ......
        if (!alarmnoopinterval.IsWaiting()) {
            ......
            if (__NoopReq(noop_xlog, alarmnooptimeout, has_late_toomuch)) {
                is_noop = true;
                __NotifySmartHeartbeatHeartReq(_profile, last_noop_interval, last_noop_actual_interval);
            }
            ......
        }
        
        ......
        // socket處理
        SocketSelect sel(readwritebreak_, true);
        sel.PreSelect();
        sel.Read_FD_SET(_sock);
        sel.Exception_FD_SET(_sock);
        
        ScopedLock lock(mutex_);
        
        if (!lstsenddata_.empty()) sel.Write_FD_SET(_sock);
        
        lock.unlock();
        
        int retsel = sel.Select(10 * 60 * 1000);
        ......
        // 處理發(fā)送(寫入)
        if (sel.Write_FD_ISSET(_sock) && !lstsenddata_.empty()) {
            ......
            ssize_t writelen = ::send(_sock, lstsenddata_.begin()->data.PosPtr(), lstsenddata_.begin()->data.PosLength(), 0);
            ......
            while (it != lstsenddata_.end() && 0 < writelen) {
                if (0 == it->data.Pos()) OnSend(it->taskid);
                
                if ((size_t)writelen >= it->data.PosLength()) {
                    xinfo2(TSF"sub send taskid:%_, cmdid:%_, %_, len(S:%_, %_/%_), ", it->taskid, it->cmdid, it->task_info, it->data.PosLength(), it->data.PosLength(), it->data.Length()) >> xlog_group;
                    writelen -= it->data.PosLength();
                    if (!it->task_info.empty()) sent_taskids[it->taskid] = it->task_info;
                    LongLinkNWriteData nwrite(it->taskid, it->data.PosLength(), it->cmdid, it->task_info);
                    nsent_datas.push_back(nwrite);
                    
                    it = lstsenddata_.erase(it);
                } else {
                    xinfo2(TSF"sub send taskid:%_, cmdid:%_, %_, len(S:%_, %_/%_), ", it->taskid, it->cmdid, it->task_info, writelen, it->data.PosLength(), it->data.Length()) >> xlog_group;
                    it->data.Seek(writelen, AutoBuffer::ESeekCur);
                    writelen = 0;
                }
            }
            
        }
        
        ......
        // 處理接收(讀取)
        if (sel.Read_FD_ISSET(_sock)) {
            bufrecv.AllocWrite(64 * 1024, false);
            ssize_t recvlen = recv(_sock, bufrecv.PosPtr(), 64 * 1024, 0);
            ......
            while (0 < bufrecv.Length()) {
                uint32_t cmdid = 0;
                uint32_t taskid = Task::kInvalidTaskID;
                size_t packlen = 0;
                AutoBuffer body;
                
                int unpackret = longlink_unpack(bufrecv, cmdid, taskid, packlen, body);
                
                if (LONGLINK_UNPACK_FALSE == unpackret) {
                    xerror2(TSF"task socket recv sock:%0, unpack error dump:%1", _sock, xdump(bufrecv.Ptr(), bufrecv.Length()));
                    _errtype = kEctNetMsgXP;
                    _errcode = kEctNetMsgXPHandleBufferErr;
                    goto End;
                }
                
                xinfo2(TSF"task socket recv sock:%_, pack recv %_ taskid:%_, cmdid:%_, %_, packlen:(%_/%_)", _sock, LONGLINK_UNPACK_CONTINUE == unpackret ? "continue" : "finish", taskid, cmdid, sent_taskids[taskid], LONGLINK_UNPACK_CONTINUE == unpackret ? bufrecv.Length() : packlen, packlen);
                lastrecvtime_.gettickcount();
                
                if (LONGLINK_UNPACK_CONTINUE == unpackret) {
                    OnRecv(taskid, bufrecv.Length(), packlen);
                    break;
                } else {
                    
                    sent_taskids.erase(taskid);
                    
                    bufrecv.Move(-(int)(packlen));
                    
                    if (__NoopResp(cmdid, taskid, body, alarmnooptimeout, _profile)) {
                        xdebug2(TSF"noopresp span:%0", alarmnooptimeout.ElapseTime());
                        is_noop = false;
                    } else {
                        OnResponse(kEctOK, 0, cmdid, taskid, body, _profile);
                    }
                }
            }
        }
    }

// 收尾,整個looper退出
End:
    

從while中的代碼能夠看出,基本上就是上面摘錄的幾塊,如下所示:
1.__NoopReq調(diào)用,無數(shù)據(jù)狀態(tài)處理;
2.socket的select處理;
3.處理發(fā)送寫入部分;
4.處理接收讀取部分;

這里需要逐個分析了:
1.__NoopReq:
先看代碼,并不長:

bool LongLink::__NoopReq(XLogger& _log, Alarm& _alarm, bool need_active_timeout) {
    AutoBuffer buffer;
    uint32_t req_cmdid = 0;
    bool suc = false;
    
    if (identifychecker_.GetIdentifyBuffer(buffer, req_cmdid)) {
        suc = Send((const unsigned char*)buffer.Ptr(), (int)buffer.Length(), req_cmdid, Task::kLongLinkIdentifyCheckerTaskID);
        identifychecker_.SetSeq(Task::kLongLinkIdentifyCheckerTaskID);
        xinfo2(TSF"start noop synccheck taskid:%0, cmdid:%1, ", Task::kLongLinkIdentifyCheckerTaskID, req_cmdid) >> _log;
    } else {
        AutoBuffer body;
        longlink_noop_req_body(body);
        suc = SendWhenNoData((const unsigned char*) body.Ptr(), body.Length(), longlink_noop_cmdid(), Task::kNoopTaskID);
        xinfo2(TSF"start noop taskid:%0, cmdid:%1, ", Task::kNoopTaskID, longlink_noop_cmdid()) >> _log;
    }
    
    if (suc) {
        _alarm.Cancel();
        _alarm.Start(need_active_timeout ? (2* 1000) : (10 * 1000));
    } else {
        xerror2("send noop fail");
    }
    
    return suc;
}

說實話,這里看的不是很清晰 ,因為之前肯定有忽略的部分,我的猜測是在走了一個發(fā)送信令的校驗后,根據(jù)返回的值的不同決定是執(zhí)行send發(fā)送數(shù)據(jù)(使用校驗填充好的buffer),還是走SendWhenNoData發(fā)送(自行填充請求體)沒有數(shù)據(jù)的情況。暫時先往下看一步,看看Send:

bool LongLink::__Send(const unsigned char* _pbuf, size_t _len, uint32_t _cmdid, uint32_t _taskid, const std::string& _task_info) {
    lstsenddata_.push_back(LongLinkSendData());

    lstsenddata_.back().cmdid = _cmdid;
    lstsenddata_.back().taskid = _taskid;
    longlink_pack(_cmdid, _taskid, _pbuf, _len, lstsenddata_.back().data);
    lstsenddata_.back().data.Seek(0, AutoBuffer::ESeekStart);
    lstsenddata_.back().task_info = _task_info;

    readwritebreak_.Break();
    return true;
}

這里能夠清晰的看到,在使用lstsenddata_這個隊列,來進(jìn)行發(fā)送的請求,實際上就是向隊列中增加一項。那么現(xiàn)在的問題就在于這個發(fā)送的數(shù)據(jù)時怎么來的了。這就需要我們弄懂LongLinkIdentifyChecker這個玩意兒。
/mars-master/mars/stn/src/longlink_identify_checker.cc

bool LongLinkIdentifyChecker::GetIdentifyBuffer(AutoBuffer &_buffer, uint32_t &_cmdid)
{
    if (has_checked_) return false;
    
    hash_code_buffer_.Reset();
    _buffer.Reset();

    IdentifyMode mode = (IdentifyMode)GetLonglinkIdentifyCheckBuffer(_buffer, hash_code_buffer_, (int&)_cmdid);

    switch (mode)
    {
    case kCheckNever:
        {
            has_checked_ = true;
        }
        break;
    case kCheckNext:
        {
            has_checked_ = false;
        }
        break;
    case kCheckNow:
        {
            cmd_id_ = _cmdid;
            return true;
        }
        break;
    default:
        xassert2(false);
    }
    
    return false;
}

調(diào)用了GetLonglinkIdentifyCheckBuffer,我們追溯到stn_logic.cc中:

    int  GetLonglinkIdentifyCheckBuffer(AutoBuffer& identify_buffer, AutoBuffer& buffer_hash, int32_t& cmdid) {
        xassert2(sg_callback != NULL);
        return sg_callback->GetLonglinkIdentifyCheckBuffer(identify_buffer, buffer_hash, cmdid);
    }

實際上是對sg_callback這個回調(diào)的調(diào)用。最終我找到的線索是在MarsServiceNative.java上層的onCreate中設(shè)置了回調(diào):

        // set callback
        AppLogic.setCallBack(stub);
        StnLogic.setCallBack(stub);
        SdtLogic.setCallBack(stub);

再接著找到了MarsServiceStub.java中的getLongLinkIdentifyCheckBuffer:

    @Override
    public int getLongLinkIdentifyCheckBuffer(ByteArrayOutputStream identifyReqBuf, ByteArrayOutputStream hashCodeBuffer, int[] reqRespCmdID) {
        // Send identify request buf to server
        // identifyReqBuf.write();

        return ECHECK_NEVER;
    }

返回的是ECHECK_NEVER,沒有填充buffer。也即是說has_checked_ = true,然后返回false。其實看到這一刻我是崩潰的,真心不知道是想干嘛。我們只能這么理解,只要進(jìn)入__NoopReq其實都是在走SendWhenNoData發(fā)送無數(shù)據(jù)狀態(tài)。好吧,我們重新回到__RunReadWrite中看一下。每次在while循環(huán)中一上來只要不是alarmnoopinterval正在等待的狀態(tài),那么就走一個發(fā)送無數(shù)據(jù)狀態(tài)??纯窗l(fā)送無數(shù)據(jù)的代碼:

bool LongLink::SendWhenNoData(const unsigned char* _pbuf, size_t _len, uint32_t _cmdid, uint32_t _taskid) {
    ScopedLock lock(mutex_);

    if (kConnected != connectstatus_) return false;
    if (!lstsenddata_.empty()) return false;

    return __Send(_pbuf, _len, _cmdid, _taskid, "");
}

其實是檢查lstsenddata_是否有內(nèi)容,如果沒有才發(fā)送。那么好吧,這里整體理解就是每次whie循環(huán)開始都會檢查如果發(fā)送隊列中沒有數(shù)據(jù)的時候,發(fā)送一個特定的無數(shù)據(jù)狀態(tài)來確認(rèn)連接。但是這里寫的比較復(fù)雜,還需要回調(diào)回上層java的代碼中,讓其來控制狀態(tài),從而根據(jù)狀態(tài)控制流程,只能說考慮的很周全,任何情況在任何節(jié)點都可以有處理。吐槽下如果我們自己寫來規(guī)劃這部分的時候大多數(shù)時候都是最對無數(shù)據(jù)檢測放在下層,然后直接就發(fā)送了,不會讓上層這里進(jìn)行什么干涉吧。其實這里還有些點沒有詳細(xì)的分析很清楚,原諒文章有限,畢竟不能偏離主線太多。

2.socket的select操作。
這里倒沒什么可說的,前面的設(shè)置,為后面的sel.Select(10 60 1000)做準(zhǔn)備,內(nèi)部采用poll來運作。

3.發(fā)送過程。
先是判斷如果發(fā)送隊列里面有內(nèi)容,執(zhí)行下面的::send(_sock, lstsenddata_.begin()->data.PosPtr(), lstsenddata_.begin()->data.PosLength(), 0)。這里注意,參數(shù)給定的是隊列的第一個的data,也就是說這里是取出第一個執(zhí)行發(fā)送。
下面就是一個while循環(huán),將發(fā)送隊列過了一遍。如果剛才發(fā)送的數(shù)據(jù)大小與待發(fā)送的實際數(shù)據(jù)長度相等,那么認(rèn)為是發(fā)送完了這一個,從隊列中移除這一個,然后下一次while會自動取下一個了。如果沒有;認(rèn)為是沒發(fā)完,位移數(shù)據(jù),下次while仍然獲取到這個item,但是數(shù)據(jù)位移已經(jīng)變了,因此繼續(xù)發(fā)送下面的數(shù)據(jù)。經(jīng)過這個while之后,所有的發(fā)送隊列中的數(shù)據(jù)都應(yīng)當(dāng)發(fā)送完畢了。

4.接收過程。
前面沒什么好說的,無非是開辟buffer空間,然后執(zhí)行recv調(diào)用。之后進(jìn)入一個while循環(huán),條件是讀取的buffer有數(shù)據(jù)。
首先走一個解包調(diào)用,內(nèi)部走的是__unpack_test,具體內(nèi)容就不貼了,我簡單看了下,基本上就是解開頭部,頭部的信息標(biāo)識了本次傳遞的基本信息,包括了版本號等內(nèi)容,一個結(jié)構(gòu)體,還是比較標(biāo)準(zhǔn)的。這里是嘗試解包,如果本次接收到的大小連頭部都不夠,那肯定返回錯誤,需要繼續(xù)接收了。那么從這個能夠看出,每次傳遞的數(shù)據(jù)都是帶有一個頭部的__STNetMsgXpHeader。這東西里面塞入的內(nèi)容可以和客戶端的版本,當(dāng)前這個信令的id等關(guān)聯(lián)起來。
再下去看到的就是對解包返回值的判斷了,如果一切順利,就走到sent_taskids.erase(taskid);這里需要注意,這個sent_taskids是個發(fā)送的taskid的map,這里可以推測發(fā)送和接受其實是關(guān)聯(lián)的,這里接收完畢移除這個保留項。然后走的__NoopResp這個調(diào)用。如果返回false表示不是空的信令返回,那么就走OnResponse。這個函數(shù)實際上是在LongLinkTaskManager中綁定了longlink_->OnResponse = boost::bind(&LongLinkTaskManager::__OnResponse, this, _1, _2, _3, _4, _5, _6);綁定在了LongLinkTaskManager::__OnResponse這里。

void LongLinkTaskManager::__OnResponse(ErrCmdType _error_type, int _error_code, uint32_t _cmdid, uint32_t _taskid, AutoBuffer& _body, const ConnectProfile& _connect_profile) {
    copy_wrapper body(_body);
    RETURN_LONKLINK_SYNC2ASYNC_FUNC(boost::bind(&LongLinkTaskManager::__OnResponse, this, _error_type, _error_code, _cmdid, _taskid, body, _connect_profile));

    ......
    
    int err_code = 0;
    int handle_type = Buf2Resp(it->task.taskid, it->task.user_context, body, err_code, Task::kChannelLong);
    
    switch(handle_type){
        case kTaskFailHandleNoError:
        {
            dynamic_timeout_.CgiTaskStatistic(it->task.cgi, (unsigned int)it->transfer_profile.send_data_size + (unsigned int)body->Length(), ::gettickcount() - it->transfer_profile.start_send_time);
            __SingleRespHandle(it, kEctOK, err_code, handle_type, _connect_profile);
            xassert2(fun_notify_network_err_);
            fun_notify_network_err_(__LINE__, kEctOK, err_code, _connect_profile.ip, _connect_profile.port);
        }
            break;
        ......
    }

}

其實就2件事,通過Buf2Resp底層回包返回給上層解析。如果沒有錯誤kTaskFailHandleNoError,會執(zhí)行__SingleRespHandle:

bool LongLinkTaskManager::__SingleRespHandle(std::list::iterator _it, ErrCmdType _err_type, int _err_code, int _fail_handle, const ConnectProfile& _connect_profile) {
    ......
    int cgi_retcode = fun_callback_(_err_type, _err_code, _fail_handle, _it->task, (unsigned int)(curtime - _it->start_task_time));
    ......
}

這里的關(guān)鍵點就這一個,調(diào)用回調(diào),回調(diào)的綁定在net_core.cc中的NetCore構(gòu)造里,longlink_task_manager_->fun_callback_ = boost::bind(&NetCore::__CallBack, this, (int)kCallFromLong, _1, _2, _3, _4, _5);,最終執(zhí)行的是NetCore::__CallBack:

int NetCore::__CallBack(int _from, ErrCmdType _err_type, int _err_code, int _fail_handle, const Task& _task, unsigned int _taskcosttime) {

    if (task_callback_hook_ && 0 == task_callback_hook_(_from, _err_type, _err_code, _fail_handle, _task)) {
        xwarn2(TSF"task_callback_hook let task return. taskid:%_, cgi%_.", _task.taskid, _task.cgi);
        return 0;
    }

    if (kEctOK == _err_type || kTaskFailHandleTaskEnd == _fail_handle)
        return OnTaskEnd(_task.taskid, _task.user_context, _err_type, _err_code);

    if (kCallFromZombie == _from) return OnTaskEnd(_task.taskid, _task.user_context, _err_type, _err_code);

#ifdef USE_LONG_LINK
    if (!zombie_task_manager_->SaveTask(_task, _taskcosttime))
#endif
        return OnTaskEnd(_task.taskid, _task.user_context, _err_type, _err_code);

    return 0;
}

看到了吧,走了OnTaskEnd,任務(wù)結(jié)束。

此文從中間部分開始粗糙了,前面鋪墊的東西后面沒有講到,心不靜的時候分析東西效果確實不大好??偠灾热粓猿謱懲炅耍@里還是留個記錄吧,日后有機(jī)會的時候會回顧把這部分完善好。

文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請注明本文地址:http://www.ezyhdfw.cn/yun/70720.html

相關(guān)文章

  • 微信開源mars源碼分析2—上層samples分析續(xù)

    摘要:本來是想直接深入到的核心層去看的,但是發(fā)現(xiàn)其實上面的部分還有好些沒有分析到,因此回來繼續(xù)分析。另外一個,是專用于統(tǒng)計的,我們暫時不去關(guān)注。具體的內(nèi)容我會在后面的核心層分析的時候指出。準(zhǔn)備下一篇進(jìn)行的核心層分析吧。 本來是想直接深入到mars的核心層去看的,但是發(fā)現(xiàn)其實上面的samples部分還有好些沒有分析到,因此回來繼續(xù)分析。ConversationActivity這個類中實際上還做...

    MyFaith 評論0 收藏0
  • 微信開源mars源碼分析1—上層samples分析

    摘要:微信已經(jīng)開源了,但是市面上相關(guān)的文章較少,即使有也是多在于使用等這些,那么這次我希望能夠從這個直接用于底層通訊的部分進(jìn)行個分析。首先明確下,微信用了的開源協(xié)議庫,來代替和。核心的部分我們先放下,下一篇再深入分析。 微信已經(jīng)開源了mars,但是市面上相關(guān)的文章較少,即使有也是多在于使用xlog等這些,那么這次我希望能夠從stn這個直接用于im底層通訊的部分進(jìn)行個分析。為了能分析的全面些,...

    caiyongji 評論0 收藏0

發(fā)表評論

0條評論

最新活動
閱讀需要支付1元查看
<