前情提要:
基础知识-Epoll
最近在学习Linux网络编程,目前进展到和epoll相关的部分了
,我的学习路线是
- 第一阶段:能写阻塞式 socket 程序
目标:写出最普通的 TCP echo server / client。- 第二阶段:理解 TCP 是“字节流”,不是“消息流”
目标:写一个带协议的服务,例如:4 字节长度 + body- 当前阶段:进入 non-blocking + epoll
写一个单线程并发 TCP server。- 第四阶段:简单的 Reactor 网络库
按 muduo 的概念拆
实战
封装函数
我把创建一个listenFD放到了一个方法里,这样调起来比较方便
#include "sys/socket.h"
#include <netinet/in.h>
#include <arpa/inet.h>
#include "unistd.h"
#include <iostream>
#include <fcntl.h>
int listenFD();
// 设置fd为非阻塞
int set_nonblocking(int fd);
实现
#include "common.h"
int listenFD()
{
// 进入准备连接的状态
int listenFD = socket(AF_INET, SOCK_STREAM, 0);
if (listenFD == -1)
{
std::cerr << std::system_category().message(errno) << std::endl;
return -1;
}
sockaddr_in addr{};
addr.sin_family = AF_INET;
addr.sin_port = htons(8080);
int ret = inet_pton(AF_INET, "127.0.0.1", &addr.sin_addr);
if (ret == 0)
{
std::cerr << "src does not contain a character string\
representing a valid network address in the specified address family ";
return -1;
}
else if (ret == -1)
{
std::cerr << "af does not contain a valid address family";
return -1;
}
ret = bind(listenFD, (sockaddr *)&addr, sizeof(addr));
if (ret == -1)
{
std::cerr << std::system_category().message(errno) << std::endl;
close(listenFD);
return -1;
}
ret = listen(listenFD, 3);
if (ret == -1)
{
std::cerr << std::system_category().message(errno) << std::endl;
close(listenFD);
return -1;
}
return listenFD;
}
int set_nonblocking(int fd)
{
// 1. 获取原来的 file status flags
int flags = fcntl(fd, F_GETFL, 0);
if (flags == -1)
{
perror("fcntl F_GETFL");
return -1;
}
// 2. 在原有的 flags 基础上,按位或(|)加上 O_NONBLOCK 标志
flags |= O_NONBLOCK;
// 3. 将新的 flags 设置回去
if (fcntl(fd, F_SETFL, flags) == -1)
{
perror("fcntl F_SETFL");
return -1;
}
return 0;
}
先搭骨架
写好主线程的函数void worker_main()和void worker()
worker_main
在上一部分里我们讲到,worker的任务是
- 等待到来的连接 accept他们
- 把读数据的ReadTask放到全局队列里
开一个监听fd
既然是“等待到来的连接”,肯定需要一个监听fd,所以在worker_main进入循环之前,我们要新建一个listen fd
int epfd = -1;
void worker(){
int listener = listenFD();
// 错误处理?
}
这里设计到错误处理了,因为是socket()返回的结果,我们可以直接在linux中运行man socket或者网络搜索manpage socket,跳到其中的return value段落,对于其他函数的错误处理,我们都是如法炮制的。
这里可以看到错误时返回-1并设置errno(errno number),这里就直接判断-1并将errno转为str输出:
iostream提供 std::error
system_error 提供std::system_category().message(errno),将错误码转为str输出
int listener = listenFD();
if (listener == -1){
std::cout<< std::system_category().message(errno)<<std::endl;
// listener == -1 不是有效 fd,不需要 close
return;
}
之后将listener加入到epoll中,记得设置非阻塞(原因上一篇说了)
int epfd = -1;
void worker() {
int listener = listenFD();
if (listener == -1) {
std::cout << std::system_category().message(errno) << std::endl;
close(listener);
return;
}
set_nonblocking(listener);
epfd = epoll_create1(0); // 返回值同样用man命令查询
if (epfd == -1) {
std::cout << std::system_category().message(errno) << std::endl;
// listener == -1 不是有效 fd,不需要 close
return;
}
// 把listener加到epoll ctl的方法
// 同样通过manpage查询到
// man epoll_ctl或者网络搜索manpage epoll_ctl
// 一共就三个重要的函数,epoll_create, epoll_ctl和epoll_wait 遇到挨个查就行
// 一定不要偷懒,要学会自己查的方法
epoll_event e; // man epoll_event
e.data.fd = listener;
// type的写法在man epoll_ctl的“The available event types are:”中
e.events = EPOLLIN; // 连接到来会发出EPOLLIN
epoll_ctl(epfd, EPOLL_CTL_ADD, listener, &e);
while (true) {
/* code */
}
}
真正的循环等待
在上一部分里我们讲到,worker的任务是
- 等待到来的连接 accept他们
- 把读数据的ReadTask放到全局队列里
这里就要真正编写循环等待逻辑了,从epoll中获取函数的事件是epoll_wait, 原型是
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);
这里的epfd当然是上面得到的epoll的fd,events是一个数组,需要你提供,当事件到来时,内核会把event一个个拷到你这个数组里,maxevents表示你一次想要多少个,比如一次最多要1024个,就新建一个event[1024],然后maxevents传1024;timeout这里没特殊要求,-1即可。因为我们监听的event_type是EPOLLIN,listener fd来新连接和其他fd有数据到来都会触发,所以我们要在循环中判断
while (true) {
epoll_event event[1024];
int n = epoll_wait(epfd, event, 1024, 0);
for (int i = 0; i < n; i++) {
if (event[i].data.fd == listener){
// 监听fd来事件了 要accept新连接 并把新连接加到epoll
}
else{
// 其他fd来事件 要读数据
}
}
}
接下来写accept连接,epoll给出了连接到来的event,但并没有告诉有几个连接,所以你需要一直accept到EAGAIN\EWOULDBLOCK,剩下的按上面的listener如法炮制就可以了; 记得设置非阻塞。
while (true) {
epoll_event event[1024];
int n = epoll_wait(epfd, event, 1024, -1);
for (int i = 0; i < n; i++) {
if (event[i].data.fd == listener) {
// 有连接需要accept,但是不知道具体来了几个连接
while (true) {
// man accept
int fd =
accept(listener, nullptr, nullptr); // 这里暂时不获取peer的地址,传个空指针
if (fd == -1) {
if (errno == EWOULDBLOCK || errno==EAGAIN) {
// 全部accept完了
break;
}
if (errno=EINTR){
// 再试
continue;
}
std::cerr << "Falied to accept new connect";
std::cerr << std::system_category().message(errno) << std::endl;
// 其实可以通过errno判断出失败原因,但这里就不分更细了,只要失败了就忽略了
break;
}
// 设置非阻塞
set_nonblocking(fd);
// 加到epoll中
epoll_event e; // man epoll_event
e.data.fd = fd;
e.events = EPOLLIN | EPOLLET | EPOLLONESHOT; // oneshot别掉了
epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &e);
}
} else {
// 其他fd来事件 要读数据
ReadTask task{};
task.fd = event[i].data.fd;
task.event_type = event[i].events;
{
// 这也是一个技巧 即尽可能短的持有锁
// RAII的unique_lock在离开这个花括号作用域就会直接解锁
// 确保不会长时间持有,在有锁编程里你会经常看到用花括号控制锁的技巧
std::unique_lock l(mutex);
queue.push(task);
}
cv.notify_one(); // 唤醒一个去读
// 思考:如果资源紧张,没有可用的线程让你去唤醒了怎么办?
}
}
}
而其他读的事件,我们要新建一个ReadTask放到队列中,这里我们补一个全局的队列和对应的锁。注意条件变量要是全局的,之前就看到小白犯这样的错误:每次要用的时候新建一个条件变量,这么做是局部的,不能做到全局约束读写,只有全局共享一个才能做到全局只有一个线程允许读写。
#include <mutex>
#include <condition_variable>
struct ReadTask {
int fd;
uint32_t event_type;
};
std::queue<ReadTask> queue;
std::mutex mutex;
std::condition_variable cv;
然后我们回去编写while中的else分支(数据到来),在这里我们要生成一个ReadTask,给队列上锁,把task放进去,解锁,然后唤醒一个worker线程去读
加锁解锁我们可以通过RAII的包装来解决,即unique_lock,唤醒一个线程就是cv.notify_one(),具体操作就是这样
else {
// 其他fd来事件 要读数据
ReadTask task{};
task.fd = event[i].data.fd;
task.event_type = event[i].events;
{
// 这也是一个技巧 即尽可能短的持有锁
// RAII的unique_lock在离开这个花括号作用域就会直接解锁
// 确保不会长时间持有,在有锁编程里你会经常看到用花括号控制锁的技巧
std::unique_lock l(mutex);
queue.push(task);
}
cv.notify_one(); //唤醒一个
}
到这里我们的worker_main就写完了,完整函数如下
void worker_main() {
int listener = listenFD();
if (listener == -1) {
std::cerr << std::system_category().message(errno) << std::endl;
// listener == -1 不是有效 fd,不需要 close
return;
}
set_nonblocking(listener);
epfd = epoll_create1(0); // 返回值同样用man命令查询
if (epfd == -1) {
std::cerr << std::system_category().message(errno) << std::endl;
close(listener);
return;
}
// 把listener加到epoll ctl的方法
// 同样通过manpage查询到
// man epoll_ctl或者网络搜索manpage epoll_ctl
// 一共就三个重要的函数,epoll_create, epoll_ctl和epoll_wait 遇到挨个查就行
// 一定不要偷懒,要学会自己查的方法
epoll_event e; // man epoll_event
e.data.fd = listener;
// type的写法在man epoll_ctl的“The available event types are:”中
e.events = EPOLLIN | EPOLLET; // 连接到来会发出EPOLLIN
epoll_ctl(epfd, EPOLL_CTL_ADD, listener, &e);
while (true) {
epoll_event event[1024];
int n = epoll_wait(epfd, event, 1024, -1);
for (int i = 0; i < n; i++) {
if (event[i].data.fd == listener) {
// 有连接需要accept,但是不知道具体来了几个连接
while (true) {
// man accept
int fd =
accept(listener, nullptr, nullptr); // 这里暂时不获取peer的地址,传个空指针
if (fd == -1) {
if (errno == EWOULDBLOCK || errno==EAGAIN) {
// 全部accept完了
break;
}
if (errno=EINTR){
// 再试
continue;
}
std::cerr << "Falied to accept new connect";
std::cerr << std::system_category().message(errno) << std::endl;
// 其实可以通过errno判断出失败原因,但这里就不分更细了,只要失败了就忽略了
break;
}
// 设置非阻塞
set_nonblocking(fd);
// 加到epoll中
epoll_event e; // man epoll_event
e.data.fd = fd;
e.events = EPOLLIN | EPOLLET | EPOLLONESHOT; // oneshot别掉了
epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &e);
}
} else {
// 其他fd来事件 要读数据
ReadTask task{};
task.fd = event[i].data.fd;
task.event_type = event[i].events;
{
// 这也是一个技巧 即尽可能短的持有锁
// RAII的unique_lock在离开这个花括号作用域就会直接解锁
// 确保不会长时间持有,在有锁编程里你会经常看到用花括号控制锁的技巧
std::unique_lock l(mutex);
queue.push(task);
}
cv.notify_one(); // 唤醒一个去读
// 思考:如果资源紧张,没有可用的线程让你去唤醒了怎么办?
}
}
}
}
worker
worker的工作就相对简单,先阻塞自己然后,等待被唤醒读数据就行了
while (true){
ReadTask task;
{
// 先获取锁
std::unique_lock l(mutex);
// 这里等价于while(queue.empty()) { 解锁并阻塞 }
cv.wait(l, []() -> bool { return !queue.empty(); });
// 运行到这里时线程已被阻塞且不再持有锁
// .. 等待中 ..
// 被唤醒
// 已获取锁
task= queue.front();
queue.pop();
// 出了作用域 释放锁
}
// 可以读数据了
}
接下来就可以读数据了,ET触发必须把所有数据都读完
// 读数据,ET触发必须保证读完
// c选手直接用char*数组也可以; 反正也不会有多大的开销,我习惯用vector
std::vector<char> buf(1024); // 这是多大的缓冲区?(1kb 即1024字节)
while (true){
// 读到没数据才能停
// 恢复epoll对当前fd的监听
}
这里我们使用EAGAIN和EWOULDBLOCK作为没有数据的条件,内核在没有数据时会给出EAGAIN,而EWOULDBLOCK(再读会阻塞)也是内核在委婉地告诉你没数据了。
代码大概是这样
// 读数据,ET触发必须保证读完
std::vector<char> buf(1024); // 这是多大的缓冲区?(1kb 即1024字节)
std::string str;
while (true) {
ssize_t n = recv(task.fd, buf.data(), 1024, 0);
if (n == 0) { // 对端关闭
// recv 返回 0 没数据了,而且以后也不会再有了,因为对方关闭了连接。
close(task.fd);
break;
} else if (n == -1) {
if (errno == EWOULDBLOCK || errno == EAGAIN) {
// 读空了 现在没数据了,但以后可能还有,连接还在
// 这里必须加n,否则他会一直读到\n结尾,显然我们这里没有\n结尾
// 不指定的话会读取越界
// 需要恢复监听
epoll_event e{};
e.data.fd = task.fd;
e.events = EPOLLIN | EPOLLET | EPOLLONESHOT;
epoll_ctl(epfd, EPOLL_CTL_MOD, task.fd, &e);
// 退出但不关闭连接
break;
} else {
// 其他错误 不读了直接撤
// 退出 把连接关了
std::cerr << std::system_category().message(errno) << std::endl;
close(task.fd);
break;
}
} else {
str.append(buf.data(), n);
continue;
}
}
因为之前我们给加入epoll的fd加了oneshot标志位,这会导致我们收到信号的时候,epoll对fd的监听已被禁用(防止再产生事件被其他线程处理),我们读完数据后必须重新调用epoll_ctl把监听再打开,代码如下
epoll_event e{};
e.data.fd = task.fd;
e.events = EPOLLIN | EPOLLONESHOT;
epoll_ctl(epfd, EPOLL_CTL_MOD, task.fd, &e);
完整函数如下
void worker(int threadNum) {
std::cout << "Thread: " << threadNum << " started"; // 标识线程
while (true) {
ReadTask task;
{
// 先获取锁
std::unique_lock l(mutex);
// 这里等价于while(queue.empty()) { 解锁并阻塞 }
cv.wait(l, []() -> bool { return !queue.empty(); });
// 运行到这里时线程已被阻塞且不再持有锁
// .. 等待中 ..
// 被唤醒
// 已获取锁
task = queue.front();
queue.pop();
// 出了作用域 释放锁
}
// 读数据,ET触发必须保证读完
std::vector<char> buf(1024); // 这是多大的缓冲区?(1kb 即1024字节)
std::string str;
while (true) {
ssize_t n = recv(task.fd, buf.data(), 1024, 0);
if (n == 0) { // 对端关闭
// recv 返回 0 没数据了,而且以后也不会再有了,因为对方关闭了连接。
close(task.fd);
break;
} else if (n == -1) {
if (errno == EWOULDBLOCK || errno == EAGAIN) {
// 读空了 现在没数据了,但以后可能还有,连接还在
// 这里必须加n,否则他会一直读到\n结尾,显然我们这里没有\n结尾
// 不指定的话会读取越界
// 需要恢复监听
epoll_event e{};
e.data.fd = task.fd;
e.events = EPOLLIN | EPOLLET | EPOLLONESHOT;
epoll_ctl(epfd, EPOLL_CTL_MOD, task.fd, &e);
// 退出但不关闭连接
break;
} else {
// 其他错误 不读了直接撤
// 退出 把连接关了
std::cerr << std::system_category().message(errno) << std::endl;
close(task.fd);
break;
}
} else {
str.append(buf.data(), n);
continue;
}
}
std::cout << "Received: " << str << std::endl;
}
}
恭喜你,读到这里你已经完成了大部分的工作
接下来我们只需要在main函数里起几个线程就可以完成这个简单的程序了;
主程序
int main() {
// 先起读数据的线程,让他们自己阻塞自己然后等待
std::vector<std::thread> threads;
for (int i = 0; i < 10; i++) {
auto func = std::bind(worker, i);
threads.push_back(std::thread(func));
}
// 再起主线程,来唤醒子线程读数据
std::thread mainThread(worker_main);
mainThread.join();
for (int i = 0; i < 10; i++) {
threads[i].join();
}
}
大功告成,完整程序如下
#include "common/common.h"
#include <sys/epoll.h>
#include <condition_variable>
#include <functional>
#include <iostream>
#include <mutex>
#include <queue>
#include <system_error>
#include <thread>
#include <vector>
int epfd = -1;
struct ReadTask
{
int fd;
uint32_t event_type;
};
std::queue<ReadTask> queue;
std::mutex mutex;
std::condition_variable cv;
void worker_main()
{
int listener = listenFD();
if (listener == -1)
{
std::cerr << std::system_category().message(errno) << std::endl;
// listener == -1 不是有效 fd,不需要 close
return;
}
set_nonblocking(listener);
epfd = epoll_create1(0); // 返回值同样用man命令查询
if (epfd == -1)
{
std::cerr << std::system_category().message(errno) << std::endl;
close(listener);
return;
}
// 把listener加到epoll ctl的方法
// 同样通过manpage查询到
// man epoll_ctl或者网络搜索manpage epoll_ctl
// 一共就三个重要的函数,epoll_create, epoll_ctl和epoll_wait 遇到挨个查就行
// 一定不要偷懒,要学会自己查的方法
epoll_event e; // man epoll_event
e.data.fd = listener;
// type的写法在man epoll_ctl的“The available event types are:”中
e.events = EPOLLIN | EPOLLET; // 连接到来会发出EPOLLIN
epoll_ctl(epfd, EPOLL_CTL_ADD, listener, &e);
while (true)
{
epoll_event event[1024];
int n = epoll_wait(epfd, event, 1024, -1);
for (int i = 0; i < n; i++)
{
if (event[i].data.fd == listener)
{
// 有连接需要accept,但是不知道具体来了几个连接
while (true)
{
// man accept
int fd =
accept(listener, nullptr, nullptr); // 这里暂时不获取peer的地址,传个空指针
if (fd == -1)
{
if (errno == EWOULDBLOCK || errno == EAGAIN)
{
// 全部accept完了
break;
}
if (errno = EINTR)
{
// 再试
continue;
}
std::cerr << "Falied to accept new connect";
std::cerr << std::system_category().message(errno) << std::endl;
// 其实可以通过errno判断出失败原因,但这里就不分更细了,只要失败了就忽略了
break;
}
// 设置非阻塞
set_nonblocking(fd);
// 加到epoll中
epoll_event e; // man epoll_event
e.data.fd = fd;
e.events = EPOLLIN | EPOLLET | EPOLLONESHOT; // oneshot别掉了
epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &e);
}
}
else
{
// 其他fd来事件 要读数据
ReadTask task{};
task.fd = event[i].data.fd;
task.event_type = event[i].events;
{
// 这也是一个技巧 即尽可能短的持有锁
// RAII的unique_lock在离开这个花括号作用域就会直接解锁
// 确保不会长时间持有,在有锁编程里你会经常看到用花括号控制锁的技巧
std::unique_lock l(mutex);
queue.push(task);
}
cv.notify_one(); // 唤醒一个去读
// 思考:如果资源紧张,没有可用的线程让你去唤醒了怎么办?
}
}
}
}
void worker(int threadNum)
{
std::cout << "Thread: " << threadNum << " started"; // 标识线程
while (true)
{
ReadTask task;
{
// 先获取锁
std::unique_lock l(mutex);
// 这里等价于while(queue.empty()) { 解锁并阻塞 }
cv.wait(l, []() -> bool
{ return !queue.empty(); });
// 运行到这里时线程已被阻塞且不再持有锁
// .. 等待中 ..
// 被唤醒
// 已获取锁
task = queue.front();
queue.pop();
// 出了作用域 释放锁
}
// 读数据,ET触发必须保证读完
std::vector<char> buf(1024); // 这是多大的缓冲区?(1kb 即1024字节)
std::string str;
while (true)
{
ssize_t n = recv(task.fd, buf.data(), 1024, 0);
if (n == 0)
{ // 对端关闭
// recv 返回 0 没数据了,而且以后也不会再有了,因为对方关闭了连接。
close(task.fd);
break;
}
else if (n == -1)
{
if (errno == EWOULDBLOCK || errno == EAGAIN)
{
// 读空了 现在没数据了,但以后可能还有,连接还在
// 这里必须加n,否则他会一直读到\n结尾,显然我们这里没有\n结尾
// 不指定的话会读取越界
// 需要恢复监听
epoll_event e{};
e.data.fd = task.fd;
e.events = EPOLLIN | EPOLLET | EPOLLONESHOT;
epoll_ctl(epfd, EPOLL_CTL_MOD, task.fd, &e);
// 退出但不关闭连接
break;
}
else
{
// 其他错误 不读了直接撤
// 退出 把连接关了
std::cerr << std::system_category().message(errno) << std::endl;
close(task.fd);
break;
}
}
else
{
str.append(buf.data(), n);
continue;
}
}
std::cout << "Worker: " << threadNum << "Received: " << str << std::endl;
}
}
int main()
{
// 先起读数据的线程,让他们自己阻塞自己然后等待
std::vector<std::thread> threads;
for (int i = 0; i < 10; i++)
{
auto func = std::bind(worker, i);
threads.push_back(std::thread(func));
}
// 再起主线程,来唤醒子线程读数据
std::thread mainThread(worker_main);
mainThread.join();
for (int i = 0; i < 10; i++)
{
threads[i].join();
}
}
总结
这个程序其实还存在着很多问题,如setNoBlocking的返回值没检查,epoll_ctl的返回值没检查,当你和内核打交道时,必须谨慎处理内核返回值,因为内核可能吐出各种各样的错误,每个影响都很大。我们在这里用到了很多系统调用如accept()、recv()、epoll_ctl()、epoll_wait()、set_nonblocking()、close() ,严谨的编程应该检查每一个函数的返回值和错误。
Linux上运行
common.h
#include "sys/socket.h"
#include <netinet/in.h>
#include <arpa/inet.h>
#include "unistd.h"
#include <iostream>
#include <fcntl.h>
int listenFD();
// 这是一个非常经典的封装函数
int set_nonblocking(int fd);
common.cpp
#include "common.h"
int listenFD()
{
// 进入准备连接的状态
int listenFD = socket(AF_INET, SOCK_STREAM, 0);
if (listenFD == -1)
{
std::cerr << std::system_category().message(errno) << std::endl;
return -1;
}
sockaddr_in addr{};
addr.sin_family = AF_INET;
addr.sin_port = htons(8080);
int ret = inet_pton(AF_INET, "127.0.0.1", &addr.sin_addr);
if (ret == 0)
{
std::cerr << "src does not contain a character string\
representing a valid network address in the specified address family ";
return -1;
}
else if (ret == -1)
{
std::cerr << "af does not contain a valid address family";
return -1;
}
ret = bind(listenFD, (sockaddr *)&addr, sizeof(addr));
if (ret == -1)
{
std::cerr << std::system_category().message(errno) << std::endl;
close(listenFD);
return -1;
}
ret = listen(listenFD, 3);
if (ret == -1)
{
std::cerr << std::system_category().message(errno) << std::endl;
close(listenFD);
return -1;
}
return listenFD;
}
int set_nonblocking(int fd)
{
// 1. 获取原来的 file status flags
int flags = fcntl(fd, F_GETFL, 0);
if (flags == -1)
{
perror("fcntl F_GETFL");
return -1;
}
// 2. 在原有的 flags 基础上,按位或(|)加上 O_NONBLOCK 标志
flags |= O_NONBLOCK;
// 3. 将新的 flags 设置回去
if (fcntl(fd, F_SETFL, flags) == -1)
{
perror("fcntl F_SETFL");
return -1;
}
return 0;
}
我们这里使用一台Ubuntu24虚拟机,vscode使用ssh连接到这台机器,然后就可以开始调试了
命令
g++ -o server ./server2.cpp ./common/common.cpp -I .
g++ client.cpp -o client
然后两个终端一个起server 一个起client 就能看到server在跑数据了
2 个帖子 - 2 位参与者