I/O多路复用之epoll

简介

I/O多路复用(I/O multiplexing)使得程序可以同时监测多个文件描述符,查看它们是否就绪(ready),即能否进行I/O操作。能够进行I/O操作,具体来说就是,在描述符上执行I/O相关的系统调用时不会阻塞,函数要么成功返回,要么返回错误,进程不会进入睡眠状态。

在Linux系统中,多路复用由selectpollepoll三个函数支持。它们的作用相同,特别地,pollepoll的接口非常相似。在处理大量描述符时,epoll的性能优于另外两个函数。epoll同时支持水平触发(level-triggered)和边沿触发(edge-triggered)模式,而selectpoll只支持水平触发模式。本文结合一个回射服务器程序来介绍epoll的基本用法。

触发模式

水平触发和边沿触发模式的区别如下:

  • 水平触发:如果对文件描述符执行I/O操作不会阻塞,则该文件描述符状态为就绪,内核会通知进程
  • 边沿触发:当文件描述符上有新的I/O事件到来时,内核才会通知进程

如果使用水平触发模式,当内核通知文件描述符可读写时,接下来可以继续去检测它的状态,看它是否依然可读或可写。所以在收到通知后,没必要一次执行尽可能多的读写操作。

如果使用边沿触发模式,I/O事件发生时才会有通知,只有另一个I/O事件到来时才会收到新的通知。由于我们不知道到底能读写多少数据,所以在收到通知后应尽可能地读写数据,以免错失读写的机会。如果使用循环从文件描述符读写数据,且文件描述符是阻塞的,那么没有数据可读写时,进程会阻塞在读写函数那里。所以边沿触发模式一般和非阻塞I/O搭配使用,程序会一直执行I/O操作,直到系统调用(如readwrite)返回错误,错误类型为EAGAINEWOULDBLOCK。关于这点,详情请看下面代码中的do_echo函数。

echo服务器

我们通过一个回射服务器程序来展示epoll的标准用法。先简要介绍epoll的接口。主要由三个函数组成:

  • epoll_create创建新的epoll实例,一般使用最新的epoll_create1调用
  • epoll_ctl管理感兴趣的文件描述符和相应事件
  • epoll_wait返回就绪的文件描述符,然后对它们进行I/O操作

程序的主要流程如下:

  1. 创建一个监听套接字lfd,调用set_nonblocking函数将其设置为非阻塞
  2. 调用epoll_create1函数创建一个epoll实例,对应的文件描述符为epfd
  3. 将监听套接字lfd加入到epfd的事件列表event中,监听的事件为EPOLLIN
  4. 进入死循环,epoll_wait函数一直阻塞,直到有事件发生。事件信息保存在evlist
  5. 检查文件描述符及其事件:
    • 如果是发生在监听套接字lfd上的事件,则收到了一个客户请求,将返回的连接套接字cfd设置为边沿触发(EPOLLET)模式,加入到事件列表event
    • 如果是连接套接字cfd上发生的事件,对EPOLLIN事件,调用do_echo函数执行回射操作;对EPOLLERREPOLLHUP事件,则关闭描述符

全部代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <fcntl.h>
#include <errno.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/epoll.h>
#include <arpa/inet.h>

#define PORT_NUM 40713
#define MAX_EVENTS 1024

void error(char *msg) {
perror(msg);
exit(EXIT_FAILURE);
}

void set_nonblocking(int fd)
{
int flags;
if ((flags = fcntl(fd, F_GETFL, NULL)) < 0) {
error("ERROR fcntl F_GETFL");
}
if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1) {
error("ERROR fcntl F_SETFL");
}
}

void do_echo(int fd) {
char c, buf[100];
int cnt = 0;
for (;;) {
int n = read(fd, &c, 1);
if (n < 0) {
if (errno == EAGAIN) {
break;
}
error("ERROR read from client");
} else if (n == 0) {
break;
}
buf[cnt++] = c;
if (c == '\n') {
if (write(fd, buf, cnt) < 0) {
error("ERROR write to client");
}
cnt = 0;
continue;
}
}
}

int main(int argc, char **argv) {
int lfd;
struct sockaddr_in server_addr;
struct sockaddr_in client_addr;

int epfd;
struct epoll_event event;
struct epoll_event *evlist;

server_addr.sin_family = AF_INET;
server_addr.sin_addr.s_addr = htonl(INADDR_ANY);
server_addr.sin_port = htons(PORT_NUM);

if ((lfd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
error("ERROR socket");
}
set_nonblocking(lfd);
if (bind(lfd, (struct sockaddr *)&server_addr, sizeof(server_addr)) < 0) {
error("ERROR bind");
}
if (listen(lfd, 16) < 0) {
error("ERROR listen");
}

epfd = epoll_create1(0);
if (epfd < 0) {
perror("ERROR epoll_create1");
}
event.events = EPOLLIN;
event.data.fd = lfd;
if (epoll_ctl(epfd, EPOLL_CTL_ADD, lfd, &event) < 0) {
error("ERROR epoll_ctl");
}
evlist = malloc(sizeof(struct epoll_event)*MAX_EVENTS);

for (;;) {
int cfd;
socklen_t len = sizeof(client_addr);
int ready = epoll_wait(epfd, evlist, MAX_EVENTS, -1);
if (ready == -1) {
error("ERROR epoll_wait");
}

printf("Ready: %d\n", ready);
for (int i = 0; i < ready; i++) {
printf(" fd = %d; events: %s%s%s\n", evlist[i].data.fd,
(evlist[i].events & EPOLLIN) ? "EPOLLIN " : "",
(evlist[i].events & EPOLLHUP) ? "EPOLLHUP " : "",
(evlist[i].events & EPOLLERR) ? "EPOLLERR " : "");
}

for (int i = 0; i < ready; i++) {
if (evlist[i].data.fd == lfd) {
cfd = accept(lfd, (struct sockaddr *)&client_addr, &len);
if (cfd < 0) {
perror("ERROR accept");
continue;
}
printf(" accepting fd %d\n", cfd);
set_nonblocking(cfd);
event.events = EPOLLIN | EPOLLET | EPOLLRDHUP;
event.data.fd = cfd;
epoll_ctl(epfd, EPOLL_CTL_ADD, cfd, &event);
} else if (evlist[i].events & EPOLLIN) {
cfd = evlist[i].data.fd;
printf(" reading from fd %d\n", cfd);
do_echo(cfd);
} else if (evlist[i].events & (EPOLLHUP | EPOLLERR)) {
printf(" closing fd %d\n", evlist[i].data.fd);
if (close(evlist[i].data.fd) == -1) {
error("ERROR close");
}
}
}
}

close(lfd);
exit(EXIT_SUCCESS);
}

参考

  1. The Linux Programming Interface
  2. epoll(7) - Linux manual page