Reactor模式
服务器基础 三种fd
认识三个fd
listenfd | connfd | clientfd |
---|---|---|
服务器监听连接请求 | accept 返回connfd,后续和客户端的通信fd | 客户端进行connect连接请求的 clientfd |
基本服务器模型 单线程阻塞
int listenfd; // 监听套接字
while(true) {
int connfd;
// 主线程阻塞在 accept 上直到返回已连接套接字
if ((connfd=accept(listenfd)) >= 0) {
// 如果返回 大于0,代表有新连接产生
dothing(connfd); // 处理请求
close(connfd); // 关闭连接
}
}
这是最基础的服务器模型,一个线程来监听、处理请求。这在请求处理耗时小,并发不高的场景下,还是适用的。
一旦并发高起来,很多客户端都要处于等待状态,等着服务器一个一个处理,性能比较差。
阻塞体现在哪了? 我们可以认为有两个地方
- 一个业务 dothing()函数处理完了,才能 accept()下一个连接,这里认为是一种比较广义的阻塞
- 另一个阻塞主要体现在,dothing()函数中对connfd这个fd的读写,使用的阻塞函数如read() write()
基本模型上优化 利用多线程
如上的单线程模型性能较差,我们可以把处理业务请求分给每个线程来处理(这里池化了线程),利用多线程技术进行优化。
这里的优化实际是对上述阻塞1的优化,业务处理交给其他线程,让主线程赶紧接着处理 accept()
int listenfd; // 监听套接字
poolExec threadPoolExec; // 模拟一个线程池
while(true) {
int connfd;
// 主线程阻塞在 accept 上直到返回已连接套接字
if ((connfd=accept(listenfd)) >= 0) {
// 如果返回 大于0,代表有新连接产生
// 提交给线程池处理
threadPoolExec.submit(connfd)
}
}
每一个处理线程还是会阻塞在 connfd 的IO读写上,性能也提不上去。
尤其这样的场景:有两个请求,线程池里的某个线程得到了这两个connfd,第一个请求fd还在等待数据的时候,虽然第二个fd可能实际已经有了,但没办法第一个fd还在等,这个线程也就只能等。如下:
// 该线程执行顺序
void run() {
// 先处理 connfd1
read(connfd1);
dothing(connfd1);
wirte(connfd1);
// 再处理connfd2
read(connfd1);
dothing(connfd1);
wirte(connfd1);
}
IO复用模型
上述的场景,众所周知引出IO复用模型了。IO复用虽然在select() 或者 epoll_wait()上也是阻塞的,但他的强大优点在于有多fd时,能同时监听到多fd的读写事件。(后边为了行文方便,IO复用就用epoll代替)。
epoll 使用demo,简易服务端
epoll使用可以参考之前的文章 I/O 多路复用之 epoll
#define IPADDRESS "127.0.0.1"
#define PORT 9000
#define MAXSIZE 87380
#define LISTENQ 5
#define FDSIZE 1000
#define EPOLLEVENTS 100
int main(int argc,char *argv[]) {
int listenfd;
listenfd = socket_bind(IPADDRESS,PORT);
listen(listenfd,LISTENQ);
do_epoll(listenfd);
return 0;
}
static int socket_bind(const char* ip,int port) {
int listenfd;
struct sockaddr_in servaddr;
listenfd = socket(AF_INET,SOCK_STREAM,0);
if (listenfd == -1) {
perror("socket error:");
exit(1);
}
bzero(&servaddr, sizeof(servaddr));
servaddr.sin_family = AF_INET;
inet_pton(AF_INET, ip, &servaddr.sin_addr);
servaddr.sin_port = htons(port);
if (bind(listenfd, (struct sockaddr*)&servaddr, sizeof(servaddr)) == -1) {
perror("bind error: ");
exit(1);
}
return listenfd;
}
static void do_epoll(int listenfd) {
int epollfd;
struct epoll_event events[EPOLLEVENTS];
int ready_cnt;
char buf[MAXSIZE];
memset(buf, 0, MAXSIZE);
// 创建epoll实例
epollfd = epoll_create(FDSIZE);
// 加入监听描述符,等待可读事件
add_event(epollfd, listenfd, EPOLLIN);
for ( ; ; ) {
// epoll_wait
ready_cnt = epoll_wait(epollfd, events, EPOLLEVENTS, -1);
// 用户事件处理,wait返回的events包含传入的fd 用来匹配事件
handle_events(epollfd, events, ready_cnt, listenfd, buf);
}
close(epollfd);
}
static void
handle_events(int epollfd, struct epoll_event *events, int num,
int listenfd, char *buf) {
int i;
int fd;
for (i = 0; i < num; i++) {
// 从epoll_wati返回的events里的userdata,和ctl写进去的一样
fd = events[i].data.fd;
// 监听描述符处理 accept新的fd
if ((fd == listenfd) && (events[i].events & EPOLLIN))
handle_accpet(epollfd, listenfd);
// 处理读事件
else if (events[i].events & EPOLLIN)
do_read(epollfd, fd, buf);
else if (events[i].events & EPOLLOUT)
do_write(epollfd, fd, buf);
}
}
static void handle_accpet(int epollfd,int listenfd) {
int clifd;
struct sockaddr_in cliaddr;
socklen_t cliaddrlen;
// accept后生成新的连接描述符
clifd = accept(listenfd, (struct sockaddr*) &cliaddr, &cliaddrlen);
if (clifd == -1)
perror("Accpet error:");
else {
printf("Accept a new client: %s:%d\n",
inet_ntoa(cliaddr.sin_addr), cliaddr.sin_port);
// 把新的连接描述符放到epoll,监听可读事件
add_event(epollfd, clifd, EPOLLIN);
}
}
static void do_read(int epollfd, int fd, char *buf) {
int nread;
nread = read(fd, buf, MAXSIZE);
if (nread == -1) {
perror("Read error:");
delete_event(epollfd, fd, EPOLLIN);
close(fd);
}
else if (nread == 0) {
fprintf(stderr, "Client closed.\n");
delete_event(epollfd, fd, EPOLLIN);
close(fd);
}
else {
//printf("Read message is : %s", buf);
// 读取完客户端信息后,改成对该fd监听可写事件
modify_event(epollfd, fd, EPOLLOUT);
}
}
static void do_write(int epollfd, int fd, char *buf) {
int nwrite;
nwrite = write(fd, buf, strlen(buf));
if (nwrite == -1) {
perror("Write error:");
delete_event(epollfd, fd, EPOLLOUT);
close(fd);
}
else
// 写给完客户端信息后,又再把fd改成监听可读事件
modify_event(epollfd, fd, EPOLLIN);
memset(buf, 0, MAXSIZE);
}
static void add_event(int epollfd, int fd, int state) {
struct epoll_event ev;
ev.events = state;
ev.data.fd = fd;
// epoll_ctl 注册新增描述符监听
if (epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &ev) < 0) {
printf("Add event failed!\n");
}
}
static void delete_event(int epollfd,int fd,int state) {
struct epoll_event ev;
ev.events = state;
ev.data.fd = fd;
if (epoll_ctl(epollfd, EPOLL_CTL_DEL, fd, &ev) < 0) {
printf("Delete event failed!\n");
}
}
static void modify_event(int epollfd,int fd,int state) {
// epoll_event:1. 事件类型 2. userdata.fd
struct epoll_event ev;
ev.events = state;
ev.data.fd = fd;
// epoll_ctl 修改描述符监听事件类型
if (epoll_ctl(epollfd, EPOLL_CTL_MOD, fd, &ev) < 0) {
printf("Modify event failed!\n");
}
}
Reactor模式
Reactor模式是事件驱动模型,它正是利用了IO多路复用模型,给 fd先注册绑定上回调函数,当epoll 检测到该fd可读可写了,那么就去执行对应的这个函数。
这里又分两种模式:
- 单Reactor
简单说就是所有fd(包括 listenfd和各个 connfd)都由一个epoll注册。
监听这些 fd的一个epoll就对应了一个线程,但处理 connfd业务的Handler可以多个线程做。
- 多Reactor
利用多线程优势,简单说就是把fd 拆给不同的epoll来处理。通常把 listenfd给一个epoll专门处理,accept()后生成的 connfd交给其他epoll(指其他线程的epoll了)。
参考
- Scalable IO in Java
- 《Redis源码剖析与实战》