基础知识-Epoll
最近在学习Linux网络编程,目前进展到和epoll相关的部分了
,我的学习路线是
- 第一阶段:能写阻塞式 socket 程序
目标:写出最普通的 TCP echo server / client。 - 第二阶段:理解 TCP 是“字节流”,不是“消息流”
目标:写一个带协议的服务,例如:4 字节长度 + body - 当前阶段:进入 non-blocking + epoll
写一个单线程并发 TCP server。 - 第四阶段:简单的 Reactor 网络库
按 muduo 的概念拆
按照之前的计划,用输出倒逼自己深入学习,目前的打算学一点就写一点 ![]()
编程的部分已经写好了,不过我不懂linux下的调试,还要学习一段时间 正好做个拆分吧 其实感觉每一部分都比较长了 应该没人看完吧![]()
Epoll是什么
Epoll是linux中的一种通知机制,即让内核通知你某些文件描述符上你感兴趣的事件,如你对几个fd上的读写事件感兴趣,你就可以用epoll_create新建一个epoll,用epoll_ctl将自己感兴趣的fd和想监听的事件类型传进去,同时给一个epoll_event类型的数组,数组的大小就是你让内核一次最多通知你的数量。linux内核会帮你监控这组fd上的io事件,如果你感兴趣的事件到来,把数据填到这个epoll_event数组里。
#include <sys/epoll.h>
struct epoll_event {
uint32_t events; // 事件类型
epoll_data_t data; // 用户数据,常用 data.fd 存 fd
};
epoll_create1(int flag); // flag一般传0
epoll_ctl(int epoll_fd, int op, int fd, epoll_event* event); // event里写你感兴趣的事件
int epoll_wait(
int epfd,
struct epoll_event *events, //返回的事件
int maxevents,
int timeout
); // 会阻塞 等待事件到来
这里epoll_ctl的event是你告诉内核,你想要监控什么fd上的什么事件,而epoll_wait在等待到事件后会返给你一个event,里面有fd和实际发生的事件。
非阻塞
IO有几种模型,其中两种就是阻塞式和非阻塞式,他们的区别就一句话:
当你尝试读数据而数据未就绪的时候,是否会立刻返回
在尝试使用recv从一个fd上读数据,而这个fd上没有数据到来时:
阻塞IO: 阻塞在这等待数据到来
非阻塞IO:马上返回
显然想要最大利用资源,非阻塞IO是必须用的。记得不要只设置被accept的fd为非阻塞,监听fd自身也要设置为非阻塞,否则如果队列里没有要accept的连接而你调了accept,listener fd就会被阻塞,直接前功尽弃;
设置一个fd为非阻塞的方法是使用int fcntl(int fd, int op, ...);中的F_GETFL和F_SETFL, 先获取到flag然后|上```O_NONBLOCK ``,一般封装成一个bool setNoBlock(int fd)函数
int set_nonblocking(int fd)
{
// 1. 获取原来的 file status flags
int flags = fcntl(fd, F_GETFL, 0);
if (flags == -1)
{
perror("fcntl F_GETFL");
return -1;
}
// 2. 在原有的 flags 基础上,按位或(|)加上 O_NONBLOCK 标志
flags |= O_NONBLOCK;
// 3. 将新的 flags 设置回去
if (fcntl(fd, F_SETFL, flags) == -1)
{
perror("fcntl F_SETFL");
return -1;
}
return 0;
}
Epoll的水平触发和边沿触发
假设你关注的是数据到来的事件
水平触发LT:只要你没读完,缓冲区还有剩数据,就会一直收到event的通知
边沿触发ET:只有数据到来时会发一下,即无数据->有数据会触发,后面不发,你没读完也不发,所以你必须保证在一次ET中读完所有数据
Epoll+非阻塞 单线程模型
只有一个主线程,同时负责accept到来的连接和接收已经连接上的fd发出的事件。如果事件很多会导致主线程忙,不能即时accept到来的连接。
// 主线程
void work_main(){
...
epoll_event events[MAX_EVENTS];
// event是接受到来的事件的数组
int n = epoll_wait(epfd, events, MAX_EVENTS, timeout_ms);
for (int i = 0; i < n; ++i) {
int fd = events[i].data.fd;
uint32_t ev = events[i].events;
if(fd == listenerFD){
// 是监听FD,需要accept连接
}
else{
// 数据来了 需要读数据
}
}
}
Epoll+非阻塞+线程池
开一个主线程和线程池,主线程只负责accept到来的连接,然后将fd添加到epoll。当事件到来时,就唤醒线程池的一个线程去读(使用条件变量机制),大大提升效率。
重要处理,边缘/水平触发都要小心一个fd被多个worker处理的情况
水平触发就不用讲了,只要A一次没读完,他就会继续给出event,就可能被错误地分给B线程读。但出乎一些人的意料,边缘触发也会产生这种情况:
fd=10 第一次可读
epoll_wait 返回 fd=10
worker A 正在处理 fd=10
这时第二批数据又到来
epoll 可能再次返回 fd=10
主线程又投递给 worker B
所以我们要使用EPOLLONESHOT阻止这种情况(具体代码放下面了),给epoll里的fd设置这个flag后,当epoll产生了数据到来的事件后会直接禁用epoll对这个fd的监听,但fd没被禁用,数据依然可以到来并进入内核缓冲区,不过epoll对他不再感兴趣,需要你重新将fd加入到epoll监听;这样就可以避免多次通知导致几个线程同时操作一个fd。
t1: fd=10 注册 EPOLLIN | EPOLLET | EPOLLONESHOT
t2: 第一批数据到来
t3: epoll_wait 返回 fd=10
t4: fd=10 在 epoll 中被自动 disabled
t5: 主线程把 fd=10 投递给 worker A
t6: worker A 正在读 / 解析
t7: 第二批数据又到来
t8: 不会再因为 fd=10 产生新的 epoll 通知给主线程
简而言之,最好使用EPOLLONESHOT,worker 持有 fd 期间要一直读到 EAGAIN或者EWOULDBLOCK(内核另一种委婉地告诉你没数据了的方法,即: 你再读会阻塞),再重启监听。
重启后如果fd就绪可读且还有没读完的数据,就又会收到一个event,然后周而复始。也有一种可能是新来的数据已经被第一个worker读完了,虽然他不知道新来数据了,但他的目的是读到EAGAIN,可能碰巧完成了任务。
被禁用后启用监听的方法,其实和添加监听一模一样:
void add_fd(int epfd, int fd) {
epoll_event ev{};
ev.data.fd = fd;
ev.events = EPOLLIN | EPOLLET | EPOLLONESHOT; // 被禁用后重新启动就改成EPOLL_CTL_MOD,其他一样
epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &ev);
}
基础知识-多线程
毕竟我们要使用多线程开发,一些多线程知识还是必不可少
Linux下现代C++开启一个线程的方法
#include <thread>
void function(){
while(true)
print("Hello world");
}
std::thread t = thread(&function); // 此时已经在运行function,也就是在疯狂打印hello world
t.join() //等待线程执行完成,必须调用join(某些情况下detach),否则thread在析构时会直接terminate导致程序退出
// 但是因为function里写的是死循环,所以其实永远也等不到t主动关闭
// 这么写是为了规范
经典的生产者消费者模型
使用epoll+多线程我们用的是生产者-消费者模型
- 我们会有1个主线程,专门负责accept到来的连接和接收数据到来的事件,注意他只是接收事件,他自己不读,它让其他线程读
- 很多的worker线程,专门负责读数据,不自己处理连接和数据到来事件
那么如何让这些线程联动呢?答案就是生产者-消费者模型中的队列;我们会放个全局的std::queue队列,然后让全部的worker线程都去等待这个队列,他们会全部阻塞;然后再启主线程,不断地将到需要读数据的fd的相关信息放到queue中,放一个唤醒一个worker线程,worker线程会自己取走queue中的信息,然后自己开始读;
这种设计能大大提高效率,
锁的使用
我们用锁的方式来保护queue队列,不管是主线程还是子线程,操作queue时必须先获得锁,为了方便,我们直接使用std::unique_lock来获得锁,他是一个RAII的锁,也就是初始化时上锁,析构时解锁
void func(){
{
std::unique_lock(mutex); //已上锁
queue.pop();
}
// unique_lock析构了,此时已解锁
}
那么初始化时如何让所有的worker线程都因等待queue而阻塞呢?这样我们才能在数据到来时一个个唤醒。答案是使用std::conditional_variable条件变量,使用方式是这样的
// 全局的
std::mutex mutex;
std::condition_variable cv;
std::queue<event> queue;
void workder(){ // 假设这是子线程的锁
std::unique_lock l(mutex); // 条件变量必须和锁配合使用,且必须是已经上锁的变量
cv.wait(l, [](){ return !event.isEmpty();}); // !event.isEmpty()是一个pred谓词,这里等价于
// while(!pred){ 继续等待 } 即如果队列为空就等待,而wait会解锁传给他的lock,等他被唤醒时再尝试重新上锁
// 导致的结果是,所有worker经历了:获取到了锁,再主动解锁然后阻塞
// 然后我们就可以启主线程,获取锁,往队列里放东西,解锁,唤醒一个线程让他去获取锁并拿走数据
}
参考资料
epoll(7) - Linux manual page
accept(2) - Linux manual page
fcntl(2) - Linux manual page
https://en.cppreference.com/w/cpp/thread/condition_variable/wait
https://en.cppreference.com/w/cpp/thread/thread/~thread
https://en.cppreference.com/w/cpp/thread/unique_lock
8 个帖子 - 6 位参与者