前面的两篇文章中介绍了微信的libco库如何和,本篇文章将介绍libco库是进行事件管理的。
libco库使用一种类似时间片的技术进行轮询,使得每个注册在其上的事件都有机会执行。
1. 基础数据结构
在上一篇文章中介绍stCoRoutineEnv_t
时,我们将stCoEpoll_t
这个结构跳过了,现在我们来仔细分析下这个数据结构。
struct stCoEpoll_t{ int iEpollFd; static const int _EPOLL_SIZE = 1024 * 10; struct stTimeout_t *pTimeout; //用于保存timeout item struct stTimeoutItemLink_t *pstTimeoutList; // 在后续的event_ loop中介绍 struct stTimeoutItemLink_t *pstActiveList; co_epoll_res *result; };
stCoEpoll_t
中主要保存了epoll监听的fd,以及注册在其中的超时事件。
stTimeoutItem_t
其实是libco库实现的双向链表,有prev和next指针,同时保存了链表指针。后面在使用过程中再介绍stTimeout_t
。 struct stTimeoutItem_t{ enum { eMaxTimeout = 40 * 1000 //40s }; stTimeoutItem_t *pPrev; stTimeoutItem_t *pNext; stTimeoutItemLink_t *pLink; unsigned long long ullExpireTime; OnPreparePfn_t pfnPrepare; OnProcessPfn_t pfnProcess; void *pArg; // routine bool bTimeout;};struct stTimeoutItemLink_t{ stTimeoutItem_t *head; stTimeoutItem_t *tail;};struct stTimeout_t{ stTimeoutItemLink_t *pItems; int iItemSize; unsigned long long ullStart; long long llStartIdx;};
2. 初始化
在上篇文章中,在初始化本线程的stCoRoutineEnv_t
时,在co_init_curr_thread_env
的最后,会调用AllocEpoll() => AllocTimeout()
方法,我们看一下AllocTimeout中具体做了哪些事情。
stTimeout_t *AllocTimeout( int iSize ){ stTimeout_t *lp = (stTimeout_t*)calloc( 1,sizeof(stTimeout_t) ); lp->iItemSize = iSize; lp->pItems = (stTimeoutItemLink_t*)calloc( 1,sizeof(stTimeoutItemLink_t) * lp->iItemSize ); lp->ullStart = GetTickMS(); lp->llStartIdx = 0; return lp;}
- 申请了60*1000个timeoutLink链表
- 设置当前时间为起始时间
- 设置当前游标为0
3. 添加监听事件
下面以一个简单的客户端链接服务器的例子在说明在libco中是如何添加监听事件的。
fd = socket(PF_INET, SOCK_STREAM, 0); struct sockaddr_in addr; SetAddr(endpoint->ip, endpoint->port, addr); ret = connect(fd,(struct sockaddr*)&addr,sizeof(addr));
由于在libco库中hook了socket和connect的函数,因此,这个逻辑会调用poll
函数,最终将调用co_poll_inner
。下面介绍co_poll_inner
的具体逻辑。
//1.struct change stPoll_t& arg = *((stPoll_t*)malloc(sizeof(stPoll_t))); memset( &arg,0,sizeof(arg) ); arg.iEpollFd = epfd; arg.fds = (pollfd*)calloc(nfds, sizeof(pollfd)); arg.nfds = nfds; stPollItem_t arr[2]; if( nfds < sizeof(arr) / sizeof(arr[0]) && !self->cIsShareStack) { arg.pPollItems = arr; } else { arg.pPollItems = (stPollItem_t*)malloc( nfds * sizeof( stPollItem_t ) ); } memset( arg.pPollItems,0,nfds * sizeof(stPollItem_t) ); arg.pfnProcess = OnPollProcessEvent; //记住这个函数,后续有用 arg.pArg = GetCurrCo( co_get_curr_thread_env() );//参数为当前Env指针
第二步,将poll结构加入到epoll的监听事件中
第三步,添加timeout事件//3.add timeout unsigned long long now = GetTickMS(); arg.ullExpireTime = now + timeout; int ret = AddTimeout( ctx->pTimeout,&arg,now ); // 将本事件加入到timeout的指定链表中 int iRaiseCnt = 0; if( ret != 0 ) { co_log_err("CO_ERR: AddTimeout ret %d now %lld timeout %d arg.ullExpireTime %lld", ret,now,timeout,arg.ullExpireTime); errno = EINVAL; iRaiseCnt = -1; } else { co_yield_env( co_get_curr_thread_env() ); iRaiseCnt = arg.iRaiseCnt; }
在AllocTimeout
只初始化了60*1000(即60s)的链表数组,此时在AddTimeout
中,将根据本监听事件的超时时间添加到对应的数组index中的链表中,是不是比较类似于java
中的HashMap
的实现方式?
AddTail( apTimeout->pItems + ( apTimeout->llStartIdx + diff ) % apTimeout->iItemSize , apItem );
添加完超时事件后,本协程调用co_yield_env
放弃执行,stRoutineEnv_t将会调用其他的协程进行处理。
4. 轮询
将事件都加入到timeout链表,以及注册到epoll fd后,main 协程将调用co_eventloop
进行轮询。
void co_eventloop( stCoEpoll_t *ctx,pfn_co_eventloop_t pfn,void *arg ){ if( !ctx->result ) { ctx->result = co_epoll_res_alloc( stCoEpoll_t::_EPOLL_SIZE ); } co_epoll_res *result = ctx->result; for(;;) { // 1. 调用epoll_wait int ret = co_epoll_wait( ctx->iEpollFd,result,stCoEpoll_t::_EPOLL_SIZE, 1 ); stTimeoutItemLink_t *active = (ctx->pstActiveList); stTimeoutItemLink_t *timeout = (ctx->pstTimeoutList); // 将timeout链表清空 memset( timeout,0,sizeof(stTimeoutItemLink_t) ); // 处理poll事件 for(int i=0;ievents[i].data.ptr; if( item->pfnPrepare ) { // 这个函数基本是 OnPollPreparePfn // 在pollPreaprePfn中,将poll_inner中添加的timeout事件删除,并添加到active list中 item->pfnPrepare( item,result->events[i],active ); } else { AddTail( active,item ); } } // 2. 将stTimeout_t中的timeout事件全部添加到timeout链表中 unsigned long long now = GetTickMS(); TakeAllTimeout( ctx->pTimeout,now,timeout ); // 设置其为timeout事件 stTimeoutItem_t *lp = timeout->head; while( lp ) { //printf("raise timeout %p\n",lp); lp->bTimeout = true; lp = lp->pNext; } // 3. 添加timeoutList 到 active list Join ( active,timeout ); // 4. 对active list进行遍历执行 lp = active->head; while( lp ) { PopHead ( active ); // 这里会对timeout事件进行判断,若时间不超时,仍然会将其加入到stTimeout_t的timeout数组队列中 if (lp->bTimeout && now < lp->ullExpireTime) { int ret = AddTimeout(ctx->pTimeout, lp, now); if (!ret) { lp->bTimeout = false; lp = active->head; continue; } } if( lp->pfnProcess ) { lp->pfnProcess( lp ); } lp = active->head; } if( pfn ) { if( -1 == pfn( arg ) ) { break; } } }}
具体步骤如下:
- 调用epoll_wait等待监听的事件
- 将stTimeout_t中的timeout链表清空
- 若epoll中有数据,则将对应的事件加入到stTimeout_t的active链表中;同时将timeout数组链表中删除本事件的超时事件
- 遍历timout数组链表,将已经超时的事件加入到timeout链表中
- 将timeout链表中的所有事件置为超时事件,需要后续特殊处理;同时将timeout链表合并到active链表
- 遍历active链表,对超时事件且当前时间未超过超时时间的,重新将其加入到timeout数组链表中,这就解决了上面超时时间超过60s的问题;对其他的事件进行处理
至此,整个libco库的事件监听的分析已经完成。
5. 总结
每个网络框架都会有一个类似eventloop的函数,用于轮询注册的io事件,libco库也不例外,轮询就是比较简单粗暴,但是又是很有效果。
libco库将socket相关的函数都进行了hook,使得调用者可以使用同步的方法进行编码,却能够异步的执行。