博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
微信协程库libco研究(三):协程的事件管理
阅读量:6914 次
发布时间:2019-06-27

本文共 6172 字,大约阅读时间需要 20 分钟。

前面的两篇文章中介绍了微信的libco库如何和,本篇文章将介绍libco库是进行事件管理的。

libco库使用一种类似时间片的技术进行轮询,使得每个注册在其上的事件都有机会执行。

1. 基础数据结构

在上一篇文章中介绍stCoRoutineEnv_t时,我们将stCoEpoll_t这个结构跳过了,现在我们来仔细分析下这个数据结构。

struct stCoEpoll_t{    int iEpollFd;    static const int _EPOLL_SIZE = 1024 * 10;    struct stTimeout_t *pTimeout; //用于保存timeout item    struct stTimeoutItemLink_t *pstTimeoutList; // 在后续的event_ loop中介绍    struct stTimeoutItemLink_t *pstActiveList;     co_epoll_res *result; };

stCoEpoll_t中主要保存了epoll监听的fd,以及注册在其中的超时事件。

stTimeoutItem_t其实是libco库实现的双向链表,有prev和next指针,同时保存了链表指针。后面在使用过程中再介绍stTimeout_t

struct stTimeoutItem_t{    enum    {        eMaxTimeout = 40 * 1000 //40s    };    stTimeoutItem_t *pPrev;    stTimeoutItem_t *pNext;    stTimeoutItemLink_t *pLink;    unsigned long long ullExpireTime;    OnPreparePfn_t pfnPrepare;    OnProcessPfn_t pfnProcess;    void *pArg; // routine     bool bTimeout;};struct stTimeoutItemLink_t{    stTimeoutItem_t *head;    stTimeoutItem_t *tail;};struct stTimeout_t{    stTimeoutItemLink_t *pItems;    int iItemSize;    unsigned long long ullStart;    long long llStartIdx;};

2. 初始化

在上篇文章中,在初始化本线程的stCoRoutineEnv_t时,在co_init_curr_thread_env的最后,会调用AllocEpoll() => AllocTimeout() 方法,我们看一下AllocTimeout中具体做了哪些事情。

stTimeout_t *AllocTimeout( int iSize ){    stTimeout_t *lp = (stTimeout_t*)calloc( 1,sizeof(stTimeout_t) );        lp->iItemSize = iSize;    lp->pItems = (stTimeoutItemLink_t*)calloc( 1,sizeof(stTimeoutItemLink_t) * lp->iItemSize );    lp->ullStart = GetTickMS();    lp->llStartIdx = 0;    return lp;}
  1. 申请了60*1000个timeoutLink链表
  2. 设置当前时间为起始时间
  3. 设置当前游标为0

3. 添加监听事件

下面以一个简单的客户端链接服务器的例子在说明在libco中是如何添加监听事件的。

fd = socket(PF_INET, SOCK_STREAM, 0);    struct sockaddr_in addr;    SetAddr(endpoint->ip, endpoint->port, addr);    ret = connect(fd,(struct sockaddr*)&addr,sizeof(addr));

由于在libco库中hook了socket和connect的函数,因此,这个逻辑会调用poll函数,最终将调用co_poll_inner。下面介绍co_poll_inner的具体逻辑。

第一步,先将epoll结构转换成poll结构(不清楚为什么一定要转换成poll类型,难道是为了兼容性吗?)

//1.struct change    stPoll_t& arg = *((stPoll_t*)malloc(sizeof(stPoll_t)));    memset( &arg,0,sizeof(arg) );    arg.iEpollFd = epfd;    arg.fds = (pollfd*)calloc(nfds, sizeof(pollfd));    arg.nfds = nfds;    stPollItem_t arr[2];    if( nfds < sizeof(arr) / sizeof(arr[0]) && !self->cIsShareStack)    {        arg.pPollItems = arr;    }        else    {        arg.pPollItems = (stPollItem_t*)malloc( nfds * sizeof( stPollItem_t ) );    }    memset( arg.pPollItems,0,nfds * sizeof(stPollItem_t) );    arg.pfnProcess = OnPollProcessEvent; //记住这个函数,后续有用    arg.pArg = GetCurrCo( co_get_curr_thread_env() );//参数为当前Env指针

第二步,将poll结构加入到epoll的监听事件中

第三步,添加timeout事件

//3.add timeout    unsigned long long now = GetTickMS();    arg.ullExpireTime = now + timeout;    int ret = AddTimeout( ctx->pTimeout,&arg,now ); // 将本事件加入到timeout的指定链表中    int iRaiseCnt = 0;    if( ret != 0 )    {        co_log_err("CO_ERR: AddTimeout ret %d now %lld timeout %d arg.ullExpireTime %lld",                ret,now,timeout,arg.ullExpireTime);        errno = EINVAL;        iRaiseCnt = -1;    }    else    {        co_yield_env( co_get_curr_thread_env() );        iRaiseCnt = arg.iRaiseCnt;    }

AllocTimeout只初始化了60*1000(即60s)的链表数组,此时在AddTimeout中,将根据本监听事件的超时时间添加到对应的数组index中的链表中,是不是比较类似于java中的HashMap的实现方式?

这里有个问题,如果超时时间超过了60s,那么超时事件都会添加到当前index的前一个游标处,相当于有可能61s,65s的超时事件都会在同一个timeout链表中,那么会不会出现,由于时间还没到,而超时事件被处理呢?

AddTail( apTimeout->pItems + ( apTimeout->llStartIdx + diff ) % apTimeout->iItemSize , apItem );

添加完超时事件后,本协程调用co_yield_env放弃执行,stRoutineEnv_t将会调用其他的协程进行处理。

4. 轮询

将事件都加入到timeout链表,以及注册到epoll fd后,main 协程将调用co_eventloop进行轮询。

void co_eventloop( stCoEpoll_t *ctx,pfn_co_eventloop_t pfn,void *arg ){    if( !ctx->result )    {        ctx->result =  co_epoll_res_alloc( stCoEpoll_t::_EPOLL_SIZE );    }    co_epoll_res *result = ctx->result;    for(;;)    {        // 1. 调用epoll_wait        int ret = co_epoll_wait( ctx->iEpollFd,result,stCoEpoll_t::_EPOLL_SIZE, 1 );        stTimeoutItemLink_t *active = (ctx->pstActiveList);        stTimeoutItemLink_t *timeout = (ctx->pstTimeoutList);        // 将timeout链表清空        memset( timeout,0,sizeof(stTimeoutItemLink_t) );        // 处理poll事件        for(int i=0;i
events[i].data.ptr; if( item->pfnPrepare ) { // 这个函数基本是 OnPollPreparePfn // 在pollPreaprePfn中,将poll_inner中添加的timeout事件删除,并添加到active list中 item->pfnPrepare( item,result->events[i],active ); } else { AddTail( active,item ); } } // 2. 将stTimeout_t中的timeout事件全部添加到timeout链表中 unsigned long long now = GetTickMS(); TakeAllTimeout( ctx->pTimeout,now,timeout ); // 设置其为timeout事件 stTimeoutItem_t *lp = timeout->head; while( lp ) { //printf("raise timeout %p\n",lp); lp->bTimeout = true; lp = lp->pNext; } // 3. 添加timeoutList 到 active list Join
( active,timeout ); // 4. 对active list进行遍历执行 lp = active->head; while( lp ) { PopHead
( active ); // 这里会对timeout事件进行判断,若时间不超时,仍然会将其加入到stTimeout_t的timeout数组队列中 if (lp->bTimeout && now < lp->ullExpireTime) { int ret = AddTimeout(ctx->pTimeout, lp, now); if (!ret) { lp->bTimeout = false; lp = active->head; continue; } } if( lp->pfnProcess ) { lp->pfnProcess( lp ); } lp = active->head; } if( pfn ) { if( -1 == pfn( arg ) ) { break; } } }}

具体步骤如下:

  1. 调用epoll_wait等待监听的事件
  2. 将stTimeout_t中的timeout链表清空
  3. 若epoll中有数据,则将对应的事件加入到stTimeout_t的active链表中;同时将timeout数组链表中删除本事件的超时事件
  4. 遍历timout数组链表,将已经超时的事件加入到timeout链表中
  5. 将timeout链表中的所有事件置为超时事件,需要后续特殊处理;同时将timeout链表合并到active链表
  6. 遍历active链表,对超时事件且当前时间未超过超时时间的,重新将其加入到timeout数组链表中,这就解决了上面超时时间超过60s的问题;对其他的事件进行处理

至此,整个libco库的事件监听的分析已经完成。

5. 总结

每个网络框架都会有一个类似eventloop的函数,用于轮询注册的io事件,libco库也不例外,轮询就是比较简单粗暴,但是又是很有效果。

libco库将socket相关的函数都进行了hook,使得调用者可以使用同步的方法进行编码,却能够异步的执行。

转载地址:http://tmicl.baihongyu.com/

你可能感兴趣的文章
session共享方法
查看>>
ASP.NET AJAX web chat application
查看>>
Codeforces Round #566 (Div. 2) B. Plus from Picture
查看>>
自己动手制作一个本地的yum仓库
查看>>
Ubuntu下用命令行快速打开各类型文件(转)
查看>>
Magento多语言设置——优化前台与后台实现方法
查看>>
leetcode121买股票
查看>>
SQL SERVER 2008中启用相应的功能
查看>>
Implementing a small Cron service in C# - CodeProject
查看>>
REST::Neo4p – PERL版本的”OGM”
查看>>
linux中service *** start与直接运行/usr/bin/***的区别
查看>>
剑指offer题目java实现
查看>>
Linux内核之于红黑树and AVL树
查看>>
LoaderManager使用详解(二)---了解LoaderManager
查看>>
EtherCAT对PHY有要求?
查看>>
ios应用内下载并安装另一个应用
查看>>
SQL GROUP BY 语句
查看>>
简单介绍一些HTML代码(字幕、音频和视频)
查看>>
Java——复选框:JCheckBox
查看>>
用android模拟器Genymotion定位元素
查看>>