I/O 多路复用之 epoll
背景
前文总结了IO复用中select的相关知识 ,其有着性能上的缺点。Linux内核在2.6中支持的epoll在大量fd场景下有着更优秀的性能。
例如Redis在使用哪个IO复用的系统调用时,会查看当前系统运行环境,优先匹配epoll。
下面开始对epoll做个介绍,并给出一个运行demo加深理解。
基本介绍和用法
epoll API的核心概念是一个内核的数据结构叫epoll实例。从用户态视角来看,可认为它是一个提供了两个链表的容器:
-
需要监控要关注的队列 也叫epoll集合,就是我们需要监控哪些fd的集合
-
就绪的队列 发生事件的fd集合,是上边的子集,这个队列当然每次wait后会变化
图片描述:
epoll 相比 select,拆分了更多个函数调用。有以下3个系统调用来创建和管理epoll实例。
- 调用1:epoll_create
int epoll_create(int size);
第一步骤,创建一个新epoll实例,返回fd指向这个epoll实例。Linux 2.6.8后,size会被忽略,只需要大于0即可。 epoll早期版本需要size,用户传递期望fd数量大小。
- 调用2:epoll_ctl
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
第二步骤,把要关注的fd添加到epoll实例中。
各参数含义:
-
epfd
第一步的epoll实例fd
-
op
参数 | 含义 |
---|---|
EPOLL_CTL_ADD | 增加一个要关注的fd,和指定的event(下面介绍这个结构) |
EPOLL_CTL_MOD | 修改这个fd 对应的event,比如监听可读 改成监听可写 |
EPOLL_CTL_DEL | 删除指定的要关注的fd |
从这个op参数能看出来,epoll的易用性要比select高,不再需要每次重新塞入全部fd,只需要增删改即可。
-
fd
要关注的fd,epll_ctl一次只能放一个
-
epoll_event 结构体
epoll_event 包含一个epoll events(要关注事件类型)和 user data(可来回传递的值,如要关注的fd)
typedef union epoll_data {
void *ptr;
int fd;
uint32_t u32;
uint64_t u64;
} epoll_data_t;
struct epoll_event {
uint32_t events; /* Epoll events */
epoll_data_t data; /* User data variable */
};
a). epoll events
epoll事件,可读可写等
参数 | 含义 |
---|---|
EPOLLET | edge-triggered notification 边缘触发,默认水平触发(下边讲到) |
EPOLLIN | 可读事件 |
EPOLLOUT | 可写事件 |
… | 其他不列举更多,可以参考手册 |
b). user data
从epoll_data 联合体能看到,可以放入要关注的fd。epoll_ctl放入的user data,会在epoll_wait中原样返回,方便用来传递信息。
- 调用3:epoll_wait
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);
第三步骤,epoll_wait()系统调用用来等待epoll实例(epoll fdset)上事件发生。
第二个参数events值得说下,和epoll_ctl()的参数一样,函数返回时,这里的值就是就绪队列下的fd集合。并且epoll_event.data下的用户数据和上一次调用的epoll_ctl里的data一样,这就能做到了fd的关联。
触发方式
epoll支持两种通知机制,Level-triggered and edge-triggered(水平触发,边缘触发)。默认是水平触发。
这两种机制是什么意思?
手册上举了个例子,5个操作顺序:
- 一个管道fd注册到epoll实例
- 管道writer写了2kb数据
- 调用了epoll_wait(),1的fd可读就绪
- 读了1kb数据
- 继续调用epoll_wait()
区分水平和边缘触发主要就是看第5步。
如果是边缘触发,那么步骤5会hang住,虽然缓冲区还有1kb数据未读。这就是边缘触发模式的机制,它只有当fd发生变化时返回。而水平触发在第5步还会返回。
因此边缘触发编程有个建议: fd用非阻塞IO,这样可以不被阻塞而持续从缓冲区读数据,直到返回EAGAIN。
疑问和理解
-
为什么用红黑树,而不用链表、哈希表结构?
对于select的fd集合,每次都是全集一次塞入,使用的线性数据结构数组bitmap。对于epoll,给用户提供了增删改fd的操作调用,因此内核要有一个空间来存放它一直维护着。
增删改查的操作,什么数据结构比较稳定呢?红黑树作为一种平衡二叉查找树其在增删改查的性能上比较靠谱。如果是链表,当fd集合比较大时,查找是耗时的;如果是哈希表,查找是快了,但增删可能存在扩缩容的问题,不够稳定。
-
API使用问题
可参考手册
epoll 使用demo,简易服务端
#define IPADDRESS "127.0.0.1"
#define PORT 9000
#define MAXSIZE 87380
#define LISTENQ 5
#define FDSIZE 1000
#define EPOLLEVENTS 100
int main(int argc,char *argv[]) {
int listenfd;
listenfd = socket_bind(IPADDRESS,PORT);
listen(listenfd,LISTENQ);
do_epoll(listenfd);
return 0;
}
static int socket_bind(const char* ip,int port) {
int listenfd;
struct sockaddr_in servaddr;
listenfd = socket(AF_INET,SOCK_STREAM,0);
if (listenfd == -1) {
perror("socket error:");
exit(1);
}
bzero(&servaddr, sizeof(servaddr));
servaddr.sin_family = AF_INET;
inet_pton(AF_INET, ip, &servaddr.sin_addr);
servaddr.sin_port = htons(port);
if (bind(listenfd, (struct sockaddr*)&servaddr, sizeof(servaddr)) == -1) {
perror("bind error: ");
exit(1);
}
return listenfd;
}
static void do_epoll(int listenfd) {
int epollfd;
struct epoll_event events[EPOLLEVENTS];
int ready_cnt;
char buf[MAXSIZE];
memset(buf, 0, MAXSIZE);
// 创建epoll实例
epollfd = epoll_create(FDSIZE);
// 加入监听描述符,等待可读事件
add_event(epollfd, listenfd, EPOLLIN);
for ( ; ; ) {
// epoll_wait
ready_cnt = epoll_wait(epollfd, events, EPOLLEVENTS, -1);
// 用户事件处理,wait返回的events包含传入的fd 用来匹配事件
handle_events(epollfd, events, ready_cnt, listenfd, buf);
}
close(epollfd);
}
static void
handle_events(int epollfd, struct epoll_event *events, int num,
int listenfd, char *buf) {
int i;
int fd;
for (i = 0; i < num; i++) {
// 从epoll_wati返回的events里的userdata,和ctl写进去的一样
fd = events[i].data.fd;
// 监听描述符处理 accept新的fd
if ((fd == listenfd) && (events[i].events & EPOLLIN))
handle_accpet(epollfd, listenfd);
// 处理读事件
else if (events[i].events & EPOLLIN)
do_read(epollfd, fd, buf);
else if (events[i].events & EPOLLOUT)
do_write(epollfd, fd, buf);
}
}
static void handle_accpet(int epollfd,int listenfd) {
int clifd;
struct sockaddr_in cliaddr;
socklen_t cliaddrlen;
// accept后生成新的连接描述符
clifd = accept(listenfd, (struct sockaddr*) &cliaddr, &cliaddrlen);
if (clifd == -1)
perror("Accpet error:");
else {
printf("Accept a new client: %s:%d\n",
inet_ntoa(cliaddr.sin_addr), cliaddr.sin_port);
// 把新的连接描述符放到epoll,监听可读事件
add_event(epollfd, clifd, EPOLLIN);
}
}
static void do_read(int epollfd, int fd, char *buf) {
int nread;
nread = read(fd, buf, MAXSIZE);
if (nread == -1) {
perror("Read error:");
delete_event(epollfd, fd, EPOLLIN);
close(fd);
}
else if (nread == 0) {
fprintf(stderr, "Client closed.\n");
delete_event(epollfd, fd, EPOLLIN);
close(fd);
}
else {
//printf("Read message is : %s", buf);
// 读取完客户端信息后,改成对该fd监听可写事件
modify_event(epollfd, fd, EPOLLOUT);
}
}
static void do_write(int epollfd, int fd, char *buf) {
int nwrite;
nwrite = write(fd, buf, strlen(buf));
if (nwrite == -1) {
perror("Write error:");
delete_event(epollfd, fd, EPOLLOUT);
close(fd);
}
else
// 写给完客户端信息后,又再把fd改成监听可读事件
modify_event(epollfd, fd, EPOLLIN);
memset(buf, 0, MAXSIZE);
}
static void add_event(int epollfd, int fd, int state) {
struct epoll_event ev;
ev.events = state;
ev.data.fd = fd;
// epoll_ctl 注册新增描述符监听
if (epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &ev) < 0) {
printf("Add event failed!\n");
}
}
static void delete_event(int epollfd,int fd,int state) {
struct epoll_event ev;
ev.events = state;
ev.data.fd = fd;
if (epoll_ctl(epollfd, EPOLL_CTL_DEL, fd, &ev) < 0) {
printf("Delete event failed!\n");
}
}
static void modify_event(int epollfd,int fd,int state) {
// epoll_event:1. 事件类型 2. userdata.fd
struct epoll_event ev;
ev.events = state;
ev.data.fd = fd;
// epoll_ctl 修改描述符监听事件类型
if (epoll_ctl(epollfd, EPOLL_CTL_MOD, fd, &ev) < 0) {
printf("Modify event failed!\n");
}
}
以上就是对epoll的理解和使用介绍。