经典的I/O服务设计(BIO模式)
经典的I/O服务设计中每连接对应一个线程(进程)的同步阻塞I/O模式。服务器端的Server是一个线程(进程),线程(进程)中执行一个死循环来阻塞的监听客户端的连接请求和通信。当客户端向服务器端发送一个连接请求后,服务器端的Server会接受客户端的请求,ServerSocket.accept()从阻塞中返回,得到一个与客户端连接相对于的Socket。构建一个handler,将Socket传入该handler。创建一个线程(进程)并启动该线程(进程),在线程(进程)中执行handler,这样与客户端的所有的通信以及数据处理都在该线程(进程)中执行。当该客户端和服务器端完成通信关闭连接后,线程(进程)就会被销毁。然后Server继续执行accept()操作等待新的连接请求。(这也是postgresql数据库的服务方式)
该模式的本质问题在于严重依赖线程(进程),但线程(进程)是非常宝贵的资源。随着客户端并发访问量的急剧增加,线程(进程)数量的不断膨胀将服务器端的性能将急剧下降。该模式有如下缺点:
- 线程(进程)生命周期的开销非常高。在Linux这样的操作系统中,线程本质上就是一个进程,创建和销毁都是重量级的系统函数。
- 资源消耗。内存:大量空闲的线程(进程)会占用许多内存。
- 稳定性。在可创建线程(进程)的数量上存在一个限制。
- 线程(进程)的切换成本是很高的。操作系统发生线程切换的时候,需要保留线程的上下文,然后执行系统调用。如果线程数过高,不仅会带来许多无用的上下文切换,还可能导致执行线程切换的时间甚至会大于线程执行的时间,这时候带来的表现往往是系统负载偏高、CPU sy(系统CPU)使用率特别高,导致系统几乎陷入不可用的状态。
- 若是长连接的情况下并且客户端与服务器端交互并不频繁的,那么客户端和服务器端的连接会一直保留着,对应的线程(进程)也就一直存在,但因为不频繁的通信,导致大量线程(进程)在大量时间内都处于空置状态。
如果你有少量的连接使用非常高的带宽,一次发送大量的数据,也许典型的IO服务器实现可能非常契合。
Reactor模式
先看Wikipedia上的描述:“The reactor design pattern is an event handling pattern for handling service requests delivered concurrently by one or more inputs. The service handler then demultiplexes the incoming requests and dispatches them synchronously to associated request handlers.”。从这个描述中,我们知道Reactor模式首先是事件驱动的,有一个或多个并发输入源,有一个Service Handler,有多个Request Handlers;这个Service Handler会同步的将输入的请求(Event)多路复用的分发给相应的Request Handler。如果用图来表达:
Reactor模式要求主线程(I/O处理单元)只负责监听文件描述符上是否有事件发生,有的话就立即将事件通知工作线程(处理单元)。除此之外,主线程不做任何其他实质性的工作。读写数据,接受新的连接,以及处理客户端请求均在工作线程中完成。使用同步I/O模式(以epoll_wait为例)实现的Reactor模式的工作流程是:
- 主线程往epoll内核事件表中注册socket上的读就绪事件
- 主线程调用epoll_wait等待socket上有数据可读
- 当socket上有数据可读时,epoll_wait通知主线程。主线程则将socket可读事件放入请求队列
- 睡眠在请求队列上的某个工作线程被唤醒,它从socket读取数据,并处理客户请求,然后往epoll内核事件表中注册该socket上的写就绪事件
- 主线程调用epoll_wait等待socket可写
- 当socket可写时,epoll_wait通知主线程。主线程将socket可写事件放入请求队列
- 睡眠在请求队列上的某个工作线程被唤醒,它往socket上写入服务器处理客户请求的结果
在Linux上多路复用方案有select、poll、epoll,其中epoll的性能表现是最优秀的,能支持的并发量也是最大的。由于Redis对不同的Linux分支也支持其他多路复用函数,但是原理一致,所以我们以epoll进行介绍,这里先引入一个简单的示例:
int main() {
listen(lfd, ...); // 监听lfd描述符
cfd1 = accept(...);
cfd2 = accept(...);
efd = epoll_create(...); // 创建一个epoll对象
epoll_ctl(efd, EPOLL_CTL_ADD, cfd1, ...); // 向epoll对象中添加要管理的连接
epoll_ctl(efd, EPOLL_CTL_ADD, cfd2, ...);
epoll_wait(efd, ...) // 等待其管理的连接上的IO事件
// 处理触发的事件
}
单线程Reactor模式
Redis数据库网络层的主要逻辑如下所示(Redis早期版本都是单线程处理请求的),主要就是三个函数initServer、aeMain和aeDeleteEventLoop。
initServer
initServer用于监听服务器端口、初始化aeEventLoop结构体、为监听服务端网络描述符初始化aeFileEvent槽。首先我们知道服务器上可以有多个网卡,因此可以监听多个网络地址,这里redisServer中的ipfd成员的socketFds类型就是用于存放和管理这些网络地址创建的监听描述符;而这里的sofd成员是为UNIX套接字创建的监听描述符。
main src/server.c
|-- initServer src/server.c
| -- server.el = aeCreateEventLoop(server.maxclients+REDIS_EVENTLOOP_FDSET_INCR)
| -- aeApiCreate(eventLoop)
| -- Open listening socket
| -- aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL)
| -- for(j = 0; j < server.ipfd_count; j++)
| -- aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE, acceptTcpHandler, NULL)
| -- aeCreateFileEvent(server.el, server.sofd, AE_READABLE, acceptUnixHandler, NULL)
| -- aeSetBeforeSleepProc(server.el, beforeSleep)
aeApiCreate函数用于创建一个epoll对象,aeApiState结构体用于保存epoll_create创建的句柄epfd和aeFileEevent events一一对应的事件结构体epoll_event。
typedef struct aeApiState {
int epfd;
struct epoll_event *events;
} aeApiState;
static int aeApiCreate(aeEventLoop *eventLoop) {
aeApiState *state = zmalloc(sizeof(aeApiState));
if (!state) return -1;
state->events = zmalloc(sizeof(struct epoll_event)*eventLoop->setsize);
if (!state->events) { zfree(state); return -1; }
state->epfd = epoll_create(1024); /* 1024 is just a hint for the kernel */
if (state->epfd == -1) { zfree(state->events); zfree(state); return -1;}
eventLoop->apidata = state;
return 0;
}
aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE, acceptTcpHandler,NULL) == AE_ERR)
aeCreateFileEvent函数为监听的网络套接字文件描述符初始化FileEvent槽(使用该文件描述符作为句柄索引该槽),调用aeApiAddEvent函数(select()机制中提供一fd_set的数据结构,实际上是一long类型的数组,每一个数组元素都能与一打开的文件句柄(不管是socket句柄,还是其他文件或命名管道或设备句柄)建立联系,建立联系的工作由程序员完成,当调用select()时,由内核根据IO状态修改fd_set的内容,由此来通知执行了select()的进程哪一socket或文件发生了可读或可写事件。我们主要关注epoll,select api的特殊使用我们无需关注)。将掩码mask设置到FileEvent的mask中,针对读/写事件设置相应的回调函数,首先这里的clientData设置为NULL(因为监听描述符没有客户端数据与之关联)。
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask, aeFileProc *proc, void *clientData) {
if (fd >= eventLoop->setsize) { errno = ERANGE; return AE_ERR;
}
aeFileEvent *fe = &eventLoop->events[fd];
if (aeApiAddEvent(eventLoop, fd, mask) == -1) return AE_ERR;
fe->mask |= mask;
if (mask & AE_READABLE) fe->rfileProc = proc;
if (mask & AE_WRITABLE) fe->wfileProc = proc;
fe->clientData = clientData;
if (fd > eventLoop->maxfd) eventLoop->maxfd = fd;
return AE_OK;
}
aeApiAddEvent函数首先确定如果 fd 已经被监视某些事件,我们需要一个 MOD 操作,否则我们仅需要一个 ADD 操作。然后调用epoll_ctl函数向epoll对象中添加要管理的连接和监视的事件。
static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) { // src/ae_epoll.c
aeApiState *state = eventLoop->apidata;
struct epoll_event ee; ee.events = 0;
/* If the fd was already monitored for some event, we need a MOD operation. Otherwise we need an ADD operation. */
int op = eventLoop->events[fd].mask == AE_NONE ? EPOLL_CTL_ADD : EPOLL_CTL_MOD;
mask |= eventLoop->events[fd].mask; /* Merge old events */
if (mask & AE_READABLE) ee.events |= EPOLLIN;
if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;
ee.data.u64 = 0; /* avoid valgrind warning */
ee.data.fd = fd;
if (epoll_ctl(state->epfd,op,fd,&ee) == -1) return -1;
return 0;
}
由于是设置的是监听描述符的读事件,肯定是accept请求,所以需要注册的事件回调函数也是acceptTcpHandler。acceptCommonHandler函数将创建redisClient结构体,并将客户端连接句柄cfd设置到fd成员中,调用aeCreateFileEvent(server.el,fd,AE_READABLE, readQueryFromClient, c)
为客户端连接句柄fd也获取一个FileEvent槽,设置其读事件回调函数为readQueryFromClient,将redisClient结构体与FileEvent槽关联,最后将redisClient结构体加入到客户端链表中(如果设置了 maxclient 并且再加入一个客户端会超过maxclient,直接关闭该连接。 请注意,我们创建客户端来代替之前检查这种情况,因为现在套接字已经设置为非阻塞模式,我们可以使用内核 I/O 发送错误)。
void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) { // src/networking.c
int cport, cfd, max = MAX_ACCEPTS_PER_CALL; char cip[REDIS_IP_STR_LEN];
while(max--) { // 调用一次acceptTcpHandler可以accept多个请求
cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport); // anetGenericAccept -> accept
if (cfd == ANET_ERR) {
if (errno != EWOULDBLOCK) redisLog(REDIS_WARNING,"Accepting client connection: %s", server.neterr);
return;
}
redisLog(REDIS_VERBOSE,"Accepted %s:%d", cip, cport);
acceptCommonHandler(cfd,0);
}
}
static void acceptCommonHandler(int fd, int flags) {
redisClient *c;
if ((c = createClient(fd)) == NULL) { redisLog(REDIS_WARNING,"Error registering fd event for the new client: %s (fd=%d)",strerror(errno),fd); close(fd); /* May be already closed, just ignore errors */ return; }
/* If maxclient directive is set and this is one client more... close the connection. Note that we create the client instead to check before for this condition, since now the socket is already set in non-blocking mode and we can send an error for free using the Kernel I/O */
if (listLength(server.clients) > server.maxclients) {
char *err = "-ERR max number of clients reached\r\n";
/* That's a best effort error message, don't check write errors */
if (write(c->fd,err,strlen(err)) == -1) { /* Nothing to do, Just to avoid the warning... */ }
server.stat_rejected_conn++;
freeClient(c);
return;
}
server.stat_numconnections++;
c->flags |= flags;
}
createClient函数用于创建redisClient结构体,如果我们传入 -1 作为 fd ,可以创建未连接的客户端( 这很有用,因为所有 Redis 命令都需要在客户端的上下文中执行。 当命令在其他上下文[例如 Lua 脚本]中执行时,我们需要一个未连接的客户端)。如果传入的是正常的fd,则需要进行设置套接字为非阻塞模式等工作,最后也是最重要的就是需要调用aeCreateFileEvent为接受的新连接的网络套接字初始化aeFileEvent槽,设置其回调函数为readQueryFromClient,关联新创建的redisClient到aeFileEvent槽中。
redisClient *createClient(int fd) {
redisClient *c = zmalloc(sizeof(redisClient));
/* passing -1 as fd it is possible to create a non connected client. This is useful since all the Redis commands needs to be executed in the context of a client. When commands are executed in other contexts (for instance a Lua script) we need a non connected client. */
if (fd != -1) {
anetNonBlock(NULL,fd);
anetEnableTcpNoDelay(NULL,fd);
if (server.tcpkeepalive)
anetKeepAlive(NULL,fd,server.tcpkeepalive);
if (aeCreateFileEvent(server.el,fd,AE_READABLE, readQueryFromClient, c) == AE_ERR) {
close(fd); zfree(c); return NULL;
}
}
... // 初始化其他成员
if (fd != -1) listAddNodeTail(server.clients,c); // 正常client结构,加入server.clients列表
initClientMultiState(c);
return c;
}
aeMain
aeMain函数其实就是循环监听事件发生,并调用相应的回调函数进行处理,其中最重要的就是aeApiPoll等待监听函数和回调函数的执行。
|-- aeMain(server.el)
| -- eventLoop->stop = 0;
| -- while (!eventLoop->stop)
| -- if(eventLoop->beforesleep != NULL) eventLoop->beforesleep(eventLoop)
| -- aeProcessEvents(eventLoop, AE_ALL_EVENTS); src/ae.c
| -- if (eventLoop->maxfd != -1 || ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT)))
| -- numevents = aeApiPoll(eventLoop, tvp);
| -- for (j = 0; j < numevents; j++)
| -- aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd]
| -- if (fe->mask & mask & AE_READABLE)
| -- fe->rfileProc(eventLoop,fd,fe->clientData,mask);
| -- if (fe->mask & mask & AE_WRITABLE)
| -- fe->wfileProc(eventLoop,fd,fe->clientData,mask);
| -- if (flags & AE_TIME_EVENTS)
| -- processed += processTimeEvents(eventLoop);
|-- aeDeleteEventLoop(server.el)
aeApiPoll等待监听函数将从eventLoop中取出aeApiState(由epoll_wait函数监听的所有事件数组)和超时时间传入epoll_wait函数。如果epoll_wait函数的返回值大于零,说明有监听事件发生。将有事件发生的fd、事件类型设置到fired event槽中。用于后续遍历fired event槽取出fd,并找到相应的aeFileEvent槽,执行相应的回调函数。
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
aeApiState *state = eventLoop->apidata;
int retval, numevents = 0;
retval = epoll_wait(state->epfd,state->events,eventLoop->setsize, tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1);
if (retval > 0) {
int j; numevents = retval;
for (j = 0; j < numevents; j++) {
int mask = 0;
struct epoll_event *e = state->events+j;
if (e->events & EPOLLIN) mask |= AE_READABLE;
if (e->events & EPOLLOUT) mask |= AE_WRITABLE;
if (e->events & EPOLLERR) mask |= AE_WRITABLE;
if (e->events & EPOLLHUP) mask |= AE_WRITABLE;
eventLoop->fired[j].fd = e->data.fd;
eventLoop->fired[j].mask = mask;
}
}
return numevents;
}
回调函数有如下两类:readQueryFromClient可读事件回调和sendReplyToClient可写事件回调(prepareClientToWrite --> aeCreateFileEvent(server.el, c->fd, AE_WRITABLE, sendReplyToClient, c)
函数设置可写事件回调函数)
aeDeleteFileEvent --> aeApiDelEvent
static void aeApiDelEvent(aeEventLoop *eventLoop, int fd, int delmask) {
aeApiState *state = eventLoop->apidata;
struct epoll_event ee;
int mask = eventLoop->events[fd].mask & (~delmask);
ee.events = 0;
if (mask & AE_READABLE) ee.events |= EPOLLIN;
if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;
ee.data.u64 = 0; /* avoid valgrind warning */
ee.data.fd = fd;
if (mask != AE_NONE) {
epoll_ctl(state->epfd,EPOLL_CTL_MOD,fd,&ee);
} else {
/* Note, Kernel < 2.6.9 requires a non null event pointer even for
* EPOLL_CTL_DEL. */
epoll_ctl(state->epfd,EPOLL_CTL_DEL,fd,&ee);
}
}
参考资料:
http://www.blogjava.net/DLevin/archive/2015/09/02/427045.html