小白学Epoll网络编程2 实战 我们这里使用一台Ubuntu24虚拟机,vscode使用ssh连接到这台机器,然后就可以开始调试了 命令 g++ -o server ./server2.cpp ./common/common.cpp -I . g++ client.cpp -o client 经历了1和2后我们现在已经有了能够运行的代码,那么如何在linux上使用gdb调试他呢?不能一直用print然后看输出来调试吧, 我们要有单步 堆栈和变量!本节就是总结在linux上的调试方法(针对单文件 项目的我还没学) 我们使用g++在linux上进行调试,首先我们要先编译出一个debug版本,这里从基础说起 通用知识 如何编译一个程序 我们的目录结构是 ├── common │ ├── common.cpp │ └── common.h └── server.cpp 其中的结构是server.cpp include了common.h,common.cpp又是common.h的实现 g++ A.cpp B.cpp -o output -I . 这里涉及到一些编译原理的内容,因为编译器实际上并不会单独处理头文件(头文件不是编译单元),他只处理.cpp,头文件只是一种便利的声明,在被include到文件后会被展开,如 // header.h void functionA(); void functionB(); // impletion.cpp #include "header.h" void functionA(){ 实现 } void functionB(){ 实现 } // main.cpp #include "header.h" int main(){ } 实际上主main和impletion会被展开成 // impletion.cpp void functionA(); void functionB(); void functionA(){ 实现 } void functionB(){ 实现 } // main.cpp void functionA(); void functionB(); int main(){ } 如果你自己每个文件都手敲声明的话比较麻烦,所以c++允许你写到一个单独的文件里然后include他们。但是对编译器来说,在头文件被展开后就不单独看了;到了编译这一步头文件已经被展开到各个实际的文件里了。所以回到刚才的话题,我们要传给g++的参数是server.cpp和common.cpp,即头文件的实现文件和主文件。 -o表示你要输出的二进制的文件名,如output,-I表示搜索头文件的根目录,编译器在展开头文件的阶段会找include提到的文件名;我们的头文件放在当前文件夹的common下,就从当前文件夹开始搜索,写common也行,但是以后可能还有别的文件夹,直接从当前文件夹开始方便以后扩展。 如何编译出debug版本的程序 主要是两点 添加调试信息 -g 关闭大部分优化 -O0 g++ A.cpp B.cpp -o output -I . -g -O0 -O(大写)后面跟的数字是优化等级,可以是0 1 2 3 s等等,代表不优化/基础/通用/激进/体积 我们一般的release版本优化的等级就是-O2, debug版本自然要比2低,这里直接选0, 防止优化过度导致调试的时候断点乱跳/缺失信息 额外的警告 为了开启所有警告还要加上 -Wall 所有常见警告(不是所有警告 是所有常见) -Wextra 额外警告 -Wpedantic 迂腐的 严格的检查是否符合指定的c++标准,无扩展无非标准写法,有的话警告 g++ -std=c++17 A.cpp B.cpp -o output -I . -g -O0 -Wall -Wextra -Wpedantic 最终的命令变成了这样 kisaragi@ubuntu24:~/Documents/Epoll$ g++ -std=c++17 server2.cpp common/common.cpp -g -O0 -Wextra -Wall -Wpedantic -o testDebug 编译成功 我们得到了testDebug二进制文件 古法 直接使用GDB 打开你的终端,使用gdb 二进制文件 命令来调试 可以通过file 二进制文件 来查看二进制的信息 正常的应该是with debug_info, not stripped,表示有调试信息,符号未被剥离,这里我们自己编的,应该不会错 启动命令,正式进入到gdb的界面中 但是还没显示代码,按ctrl+x后按a键打开代码显示,效果如下,看起来就清晰多了 gdb里的操作都是通过在(gdb)后面打命令实现的,如next, step,break等等; gdb有很多命令,对于习惯windows调试的我们来说,首先要找出怎么打断点,怎么继续,怎么进入函数, 怎么查看堆栈 流程基础操作 F5继续-->continue F10下一行-->next F11进入-->step 我们尝试在main上打一个断点,打断点的命令是 break 函数名或者行号 ,我们直接berak main(),就在main函数上打了个断点, 代码旁边也会显示一个小小的标志,现在可以直接将程序运行起来, 命令是run,等待一会,程序就会断在main第一行了 B+旁边的>角标就是当前行的指示器,我们输入两个next,让他运行到190行,然后看看threads的内容,查看变量的内容是 也可以直接通过start来启动,他的作用和在main上打个断点然后run是一样的 查看变量 print 变量 可以看到目前还没有内容,这是对的,因为刚初始化,监视变量也很简单,使用watch 变量 就可以进行监视, watch threads ,当threads被改变时就会断住。 想要删除断点/监视也很简单,使用 info breakpoints 查看所有断点 这里可以看到有我们在main里打的,还有刚刚watch的变量,然后我们就可以通过disable 编号 或者 delete 编号来编辑了,如这里删除3号断点,使用delete 3,再查看就没有了 断点操作 break 行号/函数名() 打断点 info breakpoints 查看断点 delete 断点 现在我们随便进一个函数看看,比如threads.push_back()的push_back方法,先运行到这一行,然后输入step就可以进入了 进出函数 step 相当于f11进入 finish 相当于shift+11出函数 成功进入,不想看了就输入finish,就可以进入上一级,相当于shift+f11, 如果你已经step进入很深了,需要多几次才能出去。也可以直接在外面打断点然后continue,和我们在windows上使用都是一样的 调用堆栈 bt或backtrace 常用的就这些命令,玩够了,打quit直接退出 使用vscode 直接使用vscode的remote ssh插件连接到你的服务器,打开文件夹(关于这个插件的使用可以网上搜教程) 终端里tree一下(tree .),复制文件夹结构 ├── common │ ├── common.cpp │ └── common.h └── server.cpp 在你的源码目录下新建一个.vscode文件夹,里面新建个launch.json,然后打开ai: 我想要用gdb调试程序,需要/不需要编译,给我写一个vscode里的launch.json,操作系统是xxx, 文件夹结构如下: ├── common │ ├── common.cpp │ └── common.h └── server.cpp ai直接拿捏,这里因为我说需要编译, ai把tasks.json也补上了,都拷贝进来就好了 然后打开你的源代码,在vscode里按F5,选择使用g++或类似的选项(使用gdb?可能),ai的配置写的没错就可以顺利启动了 这时直接就可以看到堆栈,断点,监视,变量等信息,比gdb要现代化的多,但他的信息其实也是从gdb里取的,只是换了种展示形式;现在你可以像在windwos里一样调试程序了 本节完 3 个帖子 - 2 位参与者 阅读完整话题
前情提要: 小白学Epoll网络编程1 基础概念的理解 基础知识-Epoll 最近在学习Linux网络编程,目前进展到和epoll相关的部分了 ,我的学习路线是 第一阶段:能写阻塞式 socket 程序 目标:写出最普通的 TCP echo server / client。 第二阶段:理解 TCP 是“字节流”,不是“消息流” 目标:写一个带协议的服务,例如:4 字节长度 + body 当前阶段 :进入 non-blocking + epoll 写一个单线程并发 TCP server。 第四阶段:简单的 Reactor 网络库 按 muduo 的概念拆 实战 封装函数 我把创建一个listenFD放到了一个方法里,这样调起来比较方便 #include "sys/socket.h" #include <netinet/in.h> #include <arpa/inet.h> #include "unistd.h" #include <iostream> #include <fcntl.h> int listenFD(); // 设置fd为非阻塞 int set_nonblocking(int fd); 实现 #include "common.h" int listenFD() { // 进入准备连接的状态 int listenFD = socket(AF_INET, SOCK_STREAM, 0); if (listenFD == -1) { std::cerr << std::system_category().message(errno) << std::endl; return -1; } sockaddr_in addr{}; addr.sin_family = AF_INET; addr.sin_port = htons(8080); int ret = inet_pton(AF_INET, "127.0.0.1", &addr.sin_addr); if (ret == 0) { std::cerr << "src does not contain a character string\ representing a valid network address in the specified address family "; return -1; } else if (ret == -1) { std::cerr << "af does not contain a valid address family"; return -1; } ret = bind(listenFD, (sockaddr *)&addr, sizeof(addr)); if (ret == -1) { std::cerr << std::system_category().message(errno) << std::endl; close(listenFD); return -1; } ret = listen(listenFD, 3); if (ret == -1) { std::cerr << std::system_category().message(errno) << std::endl; close(listenFD); return -1; } return listenFD; } int set_nonblocking(int fd) { // 1. 获取原来的 file status flags int flags = fcntl(fd, F_GETFL, 0); if (flags == -1) { perror("fcntl F_GETFL"); return -1; } // 2. 在原有的 flags 基础上,按位或(|)加上 O_NONBLOCK 标志 flags |= O_NONBLOCK; // 3. 将新的 flags 设置回去 if (fcntl(fd, F_SETFL, flags) == -1) { perror("fcntl F_SETFL"); return -1; } return 0; } 先搭骨架 写好主线程的函数void worker_main()和void worker() worker_main 在上一部分里我们讲到,worker的任务是 等待到来的连接 accept他们 把读数据的ReadTask放到全局队列里 开一个监听fd 既然是“等待到来的连接”,肯定需要一个监听fd,所以在worker_main进入循环之前,我们要新建一个listen fd int epfd = -1; void worker(){ int listener = listenFD(); // 错误处理? } 这里设计到错误处理了,因为是socket()返回的结果,我们可以直接在linux中运行 man socket 或者网络搜索 manpage socket ,跳到其中的 return value 段落,对于其他函数的错误处理,我们都是如法炮制的。 这里可以看到错误时返回-1并设置errno(errno number),这里就直接判断-1并将errno转为str输出: iostream提供 std::error system_error 提供std::system_category().message(errno),将错误码转为str输出 int listener = listenFD(); if (listener == -1){ std::cout<< std::system_category().message(errno)<<std::endl; // listener == -1 不是有效 fd,不需要 close return; } 之后将listener加入到epoll中,记得设置非阻塞(原因上一篇说了) int epfd = -1; void worker() { int listener = listenFD(); if (listener == -1) { std::cout << std::system_category().message(errno) << std::endl; close(listener); return; } set_nonblocking(listener); epfd = epoll_create1(0); // 返回值同样用man命令查询 if (epfd == -1) { std::cout << std::system_category().message(errno) << std::endl; // listener == -1 不是有效 fd,不需要 close return; } // 把listener加到epoll ctl的方法 // 同样通过manpage查询到 // man epoll_ctl或者网络搜索manpage epoll_ctl // 一共就三个重要的函数,epoll_create, epoll_ctl和epoll_wait 遇到挨个查就行 // 一定不要偷懒,要学会自己查的方法 epoll_event e; // man epoll_event e.data.fd = listener; // type的写法在man epoll_ctl的“The available event types are:”中 e.events = EPOLLIN; // 连接到来会发出EPOLLIN epoll_ctl(epfd, EPOLL_CTL_ADD, listener, &e); while (true) { /* code */ } } 真正的循环等待 在上一部分里我们讲到,worker的任务是 等待到来的连接 accept他们 把读数据的ReadTask放到全局队列里 这里就要真正编写循环等待逻辑了,从epoll中获取函数的事件是epoll_wait, 原型是 int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout); 这里的epfd当然是上面得到的epoll的fd,events是一个数组,需要你提供,当事件到来时,内核会把event一个个拷到你这个数组里,maxevents表示你一次想要多少个,比如一次最多要1024个,就新建一个event[1024],然后maxevents传1024;timeout这里没特殊要求,-1即可。因为我们监听的event_type是EPOLLIN,listener fd来新连接和其他fd有数据到来都会触发,所以我们要在循环中判断 while (true) { epoll_event event[1024]; int n = epoll_wait(epfd, event, 1024, 0); for (int i = 0; i < n; i++) { if (event[i].data.fd == listener){ // 监听fd来事件了 要accept新连接 并把新连接加到epoll } else{ // 其他fd来事件 要读数据 } } } 接下来写accept连接,epoll给出了连接到来的event,但并没有告诉有几个连接,所以你需要一直accept到EAGAIN\EWOULDBLOCK,剩下的按上面的listener如法炮制就可以了; 记得设置非阻塞。 while (true) { epoll_event event[1024]; int n = epoll_wait(epfd, event, 1024, -1); for (int i = 0; i < n; i++) { if (event[i].data.fd == listener) { // 有连接需要accept,但是不知道具体来了几个连接 while (true) { // man accept int fd = accept(listener, nullptr, nullptr); // 这里暂时不获取peer的地址,传个空指针 if (fd == -1) { if (errno == EWOULDBLOCK || errno==EAGAIN) { // 全部accept完了 break; } if (errno=EINTR){ // 再试 continue; } std::cerr << "Falied to accept new connect"; std::cerr << std::system_category().message(errno) << std::endl; // 其实可以通过errno判断出失败原因,但这里就不分更细了,只要失败了就忽略了 break; } // 设置非阻塞 set_nonblocking(fd); // 加到epoll中 epoll_event e; // man epoll_event e.data.fd = fd; e.events = EPOLLIN | EPOLLET | EPOLLONESHOT; // oneshot别掉了 epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &e); } } else { // 其他fd来事件 要读数据 ReadTask task{}; task.fd = event[i].data.fd; task.event_type = event[i].events; { // 这也是一个技巧 即尽可能短的持有锁 // RAII的unique_lock在离开这个花括号作用域就会直接解锁 // 确保不会长时间持有,在有锁编程里你会经常看到用花括号控制锁的技巧 std::unique_lock l(mutex); queue.push(task); } cv.notify_one(); // 唤醒一个去读 // 思考:如果资源紧张,没有可用的线程让你去唤醒了怎么办? } } } 而其他读的事件,我们要新建一个ReadTask放到队列中,这里我们补一个全局的队列和对应的锁。注意条件变量要是全局的,之前就看到小白犯这样的错误:每次要用的时候新建一个条件变量,这么做是局部的,不能做到全局约束读写,只有全局共享一个才能做到全局只有一个线程允许读写。 #include <mutex> #include <condition_variable> struct ReadTask { int fd; uint32_t event_type; }; std::queue<ReadTask> queue; std::mutex mutex; std::condition_variable cv; 然后我们回去编写while中的else分支(数据到来),在这里我们要生成一个ReadTask,给队列上锁,把task放进去,解锁,然后唤醒一个worker线程去读 加锁解锁我们可以通过RAII的包装来解决,即unique_lock,唤醒一个线程就是cv.notify_one(),具体操作就是这样 else { // 其他fd来事件 要读数据 ReadTask task{}; task.fd = event[i].data.fd; task.event_type = event[i].events; { // 这也是一个技巧 即尽可能短的持有锁 // RAII的unique_lock在离开这个花括号作用域就会直接解锁 // 确保不会长时间持有,在有锁编程里你会经常看到用花括号控制锁的技巧 std::unique_lock l(mutex); queue.push(task); } cv.notify_one(); //唤醒一个 } 到这里我们的 worker_main 就写完了,完整函数如下 void worker_main() { int listener = listenFD(); if (listener == -1) { std::cerr << std::system_category().message(errno) << std::endl; // listener == -1 不是有效 fd,不需要 close return; } set_nonblocking(listener); epfd = epoll_create1(0); // 返回值同样用man命令查询 if (epfd == -1) { std::cerr << std::system_category().message(errno) << std::endl; close(listener); return; } // 把listener加到epoll ctl的方法 // 同样通过manpage查询到 // man epoll_ctl或者网络搜索manpage epoll_ctl // 一共就三个重要的函数,epoll_create, epoll_ctl和epoll_wait 遇到挨个查就行 // 一定不要偷懒,要学会自己查的方法 epoll_event e; // man epoll_event e.data.fd = listener; // type的写法在man epoll_ctl的“The available event types are:”中 e.events = EPOLLIN | EPOLLET; // 连接到来会发出EPOLLIN epoll_ctl(epfd, EPOLL_CTL_ADD, listener, &e); while (true) { epoll_event event[1024]; int n = epoll_wait(epfd, event, 1024, -1); for (int i = 0; i < n; i++) { if (event[i].data.fd == listener) { // 有连接需要accept,但是不知道具体来了几个连接 while (true) { // man accept int fd = accept(listener, nullptr, nullptr); // 这里暂时不获取peer的地址,传个空指针 if (fd == -1) { if (errno == EWOULDBLOCK || errno==EAGAIN) { // 全部accept完了 break; } if (errno=EINTR){ // 再试 continue; } std::cerr << "Falied to accept new connect"; std::cerr << std::system_category().message(errno) << std::endl; // 其实可以通过errno判断出失败原因,但这里就不分更细了,只要失败了就忽略了 break; } // 设置非阻塞 set_nonblocking(fd); // 加到epoll中 epoll_event e; // man epoll_event e.data.fd = fd; e.events = EPOLLIN | EPOLLET | EPOLLONESHOT; // oneshot别掉了 epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &e); } } else { // 其他fd来事件 要读数据 ReadTask task{}; task.fd = event[i].data.fd; task.event_type = event[i].events; { // 这也是一个技巧 即尽可能短的持有锁 // RAII的unique_lock在离开这个花括号作用域就会直接解锁 // 确保不会长时间持有,在有锁编程里你会经常看到用花括号控制锁的技巧 std::unique_lock l(mutex); queue.push(task); } cv.notify_one(); // 唤醒一个去读 // 思考:如果资源紧张,没有可用的线程让你去唤醒了怎么办? } } } } worker worker的工作就相对简单,先阻塞自己然后,等待被唤醒读数据就行了 while (true){ ReadTask task; { // 先获取锁 std::unique_lock l(mutex); // 这里等价于while(queue.empty()) { 解锁并阻塞 } cv.wait(l, []() -> bool { return !queue.empty(); }); // 运行到这里时线程已被阻塞且不再持有锁 // .. 等待中 .. // 被唤醒 // 已获取锁 task= queue.front(); queue.pop(); // 出了作用域 释放锁 } // 可以读数据了 } 接下来就可以读数据了,ET触发必须把所有数据都读完 // 读数据,ET触发必须保证读完 // c选手直接用char*数组也可以; 反正也不会有多大的开销,我习惯用vector std::vector<char> buf(1024); // 这是多大的缓冲区?(1kb 即1024字节) while (true){ // 读到没数据才能停 // 恢复epoll对当前fd的监听 } 这里我们使用EAGAIN和EWOULDBLOCK作为没有数据的条件,内核在没有数据时会给出EAGAIN,而EWOULDBLOCK(再读会阻塞)也是内核在委婉地告诉你没数据了。 代码大概是这样 // 读数据,ET触发必须保证读完 std::vector<char> buf(1024); // 这是多大的缓冲区?(1kb 即1024字节) std::string str; while (true) { ssize_t n = recv(task.fd, buf.data(), 1024, 0); if (n == 0) { // 对端关闭 // recv 返回 0 没数据了,而且以后也不会再有了,因为对方关闭了连接。 close(task.fd); break; } else if (n == -1) { if (errno == EWOULDBLOCK || errno == EAGAIN) { // 读空了 现在没数据了,但以后可能还有,连接还在 // 这里必须加n,否则他会一直读到\n结尾,显然我们这里没有\n结尾 // 不指定的话会读取越界 // 需要恢复监听 epoll_event e{}; e.data.fd = task.fd; e.events = EPOLLIN | EPOLLET | EPOLLONESHOT; epoll_ctl(epfd, EPOLL_CTL_MOD, task.fd, &e); // 退出但不关闭连接 break; } else { // 其他错误 不读了直接撤 // 退出 把连接关了 std::cerr << std::system_category().message(errno) << std::endl; close(task.fd); break; } } else { str.append(buf.data(), n); continue; } } 因为之前我们给加入epoll的fd加了oneshot标志位,这会导致我们收到信号的时候,epoll对fd的监听已被禁用(防止再产生事件被其他线程处理),我们读完数据后必须重新调用epoll_ctl把监听再打开,代码如下 epoll_event e{}; e.data.fd = task.fd; e.events = EPOLLIN | EPOLLONESHOT; epoll_ctl(epfd, EPOLL_CTL_MOD, task.fd, &e); 完整函数如下 void worker(int threadNum) { std::cout << "Thread: " << threadNum << " started"; // 标识线程 while (true) { ReadTask task; { // 先获取锁 std::unique_lock l(mutex); // 这里等价于while(queue.empty()) { 解锁并阻塞 } cv.wait(l, []() -> bool { return !queue.empty(); }); // 运行到这里时线程已被阻塞且不再持有锁 // .. 等待中 .. // 被唤醒 // 已获取锁 task = queue.front(); queue.pop(); // 出了作用域 释放锁 } // 读数据,ET触发必须保证读完 std::vector<char> buf(1024); // 这是多大的缓冲区?(1kb 即1024字节) std::string str; while (true) { ssize_t n = recv(task.fd, buf.data(), 1024, 0); if (n == 0) { // 对端关闭 // recv 返回 0 没数据了,而且以后也不会再有了,因为对方关闭了连接。 close(task.fd); break; } else if (n == -1) { if (errno == EWOULDBLOCK || errno == EAGAIN) { // 读空了 现在没数据了,但以后可能还有,连接还在 // 这里必须加n,否则他会一直读到\n结尾,显然我们这里没有\n结尾 // 不指定的话会读取越界 // 需要恢复监听 epoll_event e{}; e.data.fd = task.fd; e.events = EPOLLIN | EPOLLET | EPOLLONESHOT; epoll_ctl(epfd, EPOLL_CTL_MOD, task.fd, &e); // 退出但不关闭连接 break; } else { // 其他错误 不读了直接撤 // 退出 把连接关了 std::cerr << std::system_category().message(errno) << std::endl; close(task.fd); break; } } else { str.append(buf.data(), n); continue; } } std::cout << "Received: " << str << std::endl; } } 恭喜你,读到这里你已经完成了大部分的工作 接下来我们只需要在main函数里起几个线程就可以完成这个简单的程序了; 主程序 int main() { // 先起读数据的线程,让他们自己阻塞自己然后等待 std::vector<std::thread> threads; for (int i = 0; i < 10; i++) { auto func = std::bind(worker, i); threads.push_back(std::thread(func)); } // 再起主线程,来唤醒子线程读数据 std::thread mainThread(worker_main); mainThread.join(); for (int i = 0; i < 10; i++) { threads[i].join(); } } 大功告成,完整程序如下 #include "common/common.h" #include <sys/epoll.h> #include <condition_variable> #include <functional> #include <iostream> #include <mutex> #include <queue> #include <system_error> #include <thread> #include <vector> int epfd = -1; struct ReadTask { int fd; uint32_t event_type; }; std::queue<ReadTask> queue; std::mutex mutex; std::condition_variable cv; void worker_main() { int listener = listenFD(); if (listener == -1) { std::cerr << std::system_category().message(errno) << std::endl; // listener == -1 不是有效 fd,不需要 close return; } set_nonblocking(listener); epfd = epoll_create1(0); // 返回值同样用man命令查询 if (epfd == -1) { std::cerr << std::system_category().message(errno) << std::endl; close(listener); return; } // 把listener加到epoll ctl的方法 // 同样通过manpage查询到 // man epoll_ctl或者网络搜索manpage epoll_ctl // 一共就三个重要的函数,epoll_create, epoll_ctl和epoll_wait 遇到挨个查就行 // 一定不要偷懒,要学会自己查的方法 epoll_event e; // man epoll_event e.data.fd = listener; // type的写法在man epoll_ctl的“The available event types are:”中 e.events = EPOLLIN | EPOLLET; // 连接到来会发出EPOLLIN epoll_ctl(epfd, EPOLL_CTL_ADD, listener, &e); while (true) { epoll_event event[1024]; int n = epoll_wait(epfd, event, 1024, -1); for (int i = 0; i < n; i++) { if (event[i].data.fd == listener) { // 有连接需要accept,但是不知道具体来了几个连接 while (true) { // man accept int fd = accept(listener, nullptr, nullptr); // 这里暂时不获取peer的地址,传个空指针 if (fd == -1) { if (errno == EWOULDBLOCK || errno == EAGAIN) { // 全部accept完了 break; } if (errno = EINTR) { // 再试 continue; } std::cerr << "Falied to accept new connect"; std::cerr << std::system_category().message(errno) << std::endl; // 其实可以通过errno判断出失败原因,但这里就不分更细了,只要失败了就忽略了 break; } // 设置非阻塞 set_nonblocking(fd); // 加到epoll中 epoll_event e; // man epoll_event e.data.fd = fd; e.events = EPOLLIN | EPOLLET | EPOLLONESHOT; // oneshot别掉了 epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &e); } } else { // 其他fd来事件 要读数据 ReadTask task{}; task.fd = event[i].data.fd; task.event_type = event[i].events; { // 这也是一个技巧 即尽可能短的持有锁 // RAII的unique_lock在离开这个花括号作用域就会直接解锁 // 确保不会长时间持有,在有锁编程里你会经常看到用花括号控制锁的技巧 std::unique_lock l(mutex); queue.push(task); } cv.notify_one(); // 唤醒一个去读 // 思考:如果资源紧张,没有可用的线程让你去唤醒了怎么办? } } } } void worker(int threadNum) { std::cout << "Thread: " << threadNum << " started"; // 标识线程 while (true) { ReadTask task; { // 先获取锁 std::unique_lock l(mutex); // 这里等价于while(queue.empty()) { 解锁并阻塞 } cv.wait(l, []() -> bool { return !queue.empty(); }); // 运行到这里时线程已被阻塞且不再持有锁 // .. 等待中 .. // 被唤醒 // 已获取锁 task = queue.front(); queue.pop(); // 出了作用域 释放锁 } // 读数据,ET触发必须保证读完 std::vector<char> buf(1024); // 这是多大的缓冲区?(1kb 即1024字节) std::string str; while (true) { ssize_t n = recv(task.fd, buf.data(), 1024, 0); if (n == 0) { // 对端关闭 // recv 返回 0 没数据了,而且以后也不会再有了,因为对方关闭了连接。 close(task.fd); break; } else if (n == -1) { if (errno == EWOULDBLOCK || errno == EAGAIN) { // 读空了 现在没数据了,但以后可能还有,连接还在 // 这里必须加n,否则他会一直读到\n结尾,显然我们这里没有\n结尾 // 不指定的话会读取越界 // 需要恢复监听 epoll_event e{}; e.data.fd = task.fd; e.events = EPOLLIN | EPOLLET | EPOLLONESHOT; epoll_ctl(epfd, EPOLL_CTL_MOD, task.fd, &e); // 退出但不关闭连接 break; } else { // 其他错误 不读了直接撤 // 退出 把连接关了 std::cerr << std::system_category().message(errno) << std::endl; close(task.fd); break; } } else { str.append(buf.data(), n); continue; } } std::cout << "Worker: " << threadNum << "Received: " << str << std::endl; } } int main() { // 先起读数据的线程,让他们自己阻塞自己然后等待 std::vector<std::thread> threads; for (int i = 0; i < 10; i++) { auto func = std::bind(worker, i); threads.push_back(std::thread(func)); } // 再起主线程,来唤醒子线程读数据 std::thread mainThread(worker_main); mainThread.join(); for (int i = 0; i < 10; i++) { threads[i].join(); } } 总结 这个程序其实还存在着很多问题,如setNoBlocking的返回值没检查,epoll_ctl的返回值没检查,当你和内核打交道时,必须谨慎处理内核返回值,因为内核可能吐出各种各样的错误,每个影响都很大。我们在这里用到了很多系统调用如 accept() 、 recv() 、 epoll_ctl() 、 epoll_wait() 、 set_nonblocking() 、 close() ,严谨的编程应该检查每一个函数的返回值和错误。 Linux上运行 common.h #include "sys/socket.h" #include <netinet/in.h> #include <arpa/inet.h> #include "unistd.h" #include <iostream> #include <fcntl.h> int listenFD(); // 这是一个非常经典的封装函数 int set_nonblocking(int fd); common.cpp #include "common.h" int listenFD() { // 进入准备连接的状态 int listenFD = socket(AF_INET, SOCK_STREAM, 0); if (listenFD == -1) { std::cerr << std::system_category().message(errno) << std::endl; return -1; } sockaddr_in addr{}; addr.sin_family = AF_INET; addr.sin_port = htons(8080); int ret = inet_pton(AF_INET, "127.0.0.1", &addr.sin_addr); if (ret == 0) { std::cerr << "src does not contain a character string\ representing a valid network address in the specified address family "; return -1; } else if (ret == -1) { std::cerr << "af does not contain a valid address family"; return -1; } ret = bind(listenFD, (sockaddr *)&addr, sizeof(addr)); if (ret == -1) { std::cerr << std::system_category().message(errno) << std::endl; close(listenFD); return -1; } ret = listen(listenFD, 3); if (ret == -1) { std::cerr << std::system_category().message(errno) << std::endl; close(listenFD); return -1; } return listenFD; } int set_nonblocking(int fd) { // 1. 获取原来的 file status flags int flags = fcntl(fd, F_GETFL, 0); if (flags == -1) { perror("fcntl F_GETFL"); return -1; } // 2. 在原有的 flags 基础上,按位或(|)加上 O_NONBLOCK 标志 flags |= O_NONBLOCK; // 3. 将新的 flags 设置回去 if (fcntl(fd, F_SETFL, flags) == -1) { perror("fcntl F_SETFL"); return -1; } return 0; } 我们这里使用一台Ubuntu24虚拟机,vscode使用ssh连接到这台机器,然后就可以开始调试了 命令 g++ -o server ./server2.cpp ./common/common.cpp -I . g++ client.cpp -o client 然后两个终端一个起server 一个起client 就能看到server在跑数据了 2 个帖子 - 2 位参与者 阅读完整话题
基础知识-Epoll 最近在学习Linux网络编程,目前进展到和epoll相关的部分了 ,我的学习路线是 第一阶段:能写阻塞式 socket 程序 目标:写出最普通的 TCP echo server / client。 第二阶段:理解 TCP 是“字节流”,不是“消息流” 目标:写一个带协议的服务,例如:4 字节长度 + body 当前阶段 :进入 non-blocking + epoll 写一个单线程并发 TCP server。 第四阶段:简单的 Reactor 网络库 按 muduo 的概念拆 按照之前的计划,用输出倒逼自己深入学习,目前的打算学一点就写一点 编程的部分已经写好了,不过我不懂linux下的调试,还要学习一段时间 正好做个拆分吧 其实感觉每一部分都比较长了 应该没人看完吧 Epoll是什么 Epoll是linux中的一种通知机制,即让内核通知你某些文件描述符上你感兴趣的事件,如你对几个fd上的读写事件感兴趣,你就可以用epoll_create新建一个epoll,用epoll_ctl将自己感兴趣的fd和想监听的事件类型传进去,同时给一个epoll_event类型的数组,数组的大小就是你让内核一次最多通知你的数量。linux内核会帮你监控这组fd上的io事件,如果你感兴趣的事件到来,把数据填到这个epoll_event数组里。 #include <sys/epoll.h> struct epoll_event { uint32_t events; // 事件类型 epoll_data_t data; // 用户数据,常用 data.fd 存 fd }; epoll_create1(int flag); // flag一般传0 epoll_ctl(int epoll_fd, int op, int fd, epoll_event* event); // event里写你感兴趣的事件 int epoll_wait( int epfd, struct epoll_event *events, //返回的事件 int maxevents, int timeout ); // 会阻塞 等待事件到来 这里epoll_ctl的event是你告诉内核,你想要监控什么fd上的什么事件,而epoll_wait在等待到事件后会返给你一个event,里面有fd和实际发生的事件。 非阻塞 IO有几种模型,其中两种就是阻塞式和非阻塞式,他们的区别就一句话: 当你尝试读数据而数据未就绪的时候,是否会立刻返回 在尝试使用recv从一个fd上读数据,而这个fd上没有数据到来时: 阻塞IO: 阻塞在这等待数据到来 非阻塞IO:马上返回 显然想要最大利用资源,非阻塞IO是必须用的。记得不要只设置被accept的fd为非阻塞,监听fd自身也要设置为非阻塞,否则如果队列里没有要accept的连接而你调了accept,listener fd就会被阻塞,直接前功尽弃; 设置一个fd为非阻塞的方法是使用 int fcntl(int fd, int op, ...); 中的 F_GETFL 和 F_SETFL , 先获取到flag然后|上```O_NONBLOCK ``,一般封装成一个bool setNoBlock(int fd)函数 int set_nonblocking(int fd) { // 1. 获取原来的 file status flags int flags = fcntl(fd, F_GETFL, 0); if (flags == -1) { perror("fcntl F_GETFL"); return -1; } // 2. 在原有的 flags 基础上,按位或(|)加上 O_NONBLOCK 标志 flags |= O_NONBLOCK; // 3. 将新的 flags 设置回去 if (fcntl(fd, F_SETFL, flags) == -1) { perror("fcntl F_SETFL"); return -1; } return 0; } Epoll的水平触发和边沿触发 假设你关注的是数据到来的事件 水平触发LT:只要你没读完,缓冲区还有剩数据,就会一直收到event的通知 边沿触发ET:只有数据到来时会发一下,即无数据->有数据会触发,后面不发,你没读完也不发,所以你必须保证在一次ET中读完所有数据 Epoll+非阻塞 单线程模型 只有一个主线程,同时负责accept到来的连接和接收已经连接上的fd发出的事件。如果事件很多会导致主线程忙,不能即时accept到来的连接。 // 主线程 void work_main(){ ... epoll_event events[MAX_EVENTS]; // event是接受到来的事件的数组 int n = epoll_wait(epfd, events, MAX_EVENTS, timeout_ms); for (int i = 0; i < n; ++i) { int fd = events[i].data.fd; uint32_t ev = events[i].events; if(fd == listenerFD){ // 是监听FD,需要accept连接 } else{ // 数据来了 需要读数据 } } } Epoll+非阻塞+线程池 开一个主线程和线程池,主线程只负责accept到来的连接,然后将fd添加到epoll。当事件到来时,就唤醒线程池的一个线程去读(使用条件变量机制),大大提升效率。 重要处理,边缘/水平触发都要小心一个fd被多个worker处理的情况 水平触发就不用讲了,只要A一次没读完,他就会继续给出event,就可能被错误地分给B线程读。但出乎一些人的意料,边缘触发也会产生这种情况: fd=10 第一次可读 epoll_wait 返回 fd=10 worker A 正在处理 fd=10 这时第二批数据又到来 epoll 可能再次返回 fd=10 主线程又投递给 worker B 所以我们要使用EPOLLONESHOT阻止这种情况(具体代码放下面了),给epoll里的fd设置这个flag后,当epoll产生了数据到来的事件后会直接禁用epoll对这个fd的监听,但fd没被禁用,数据依然可以到来并进入内核缓冲区,不过epoll对他不再感兴趣,需要你重新将fd加入到epoll监听;这样就可以避免多次通知导致几个线程同时操作一个fd。 t1: fd=10 注册 EPOLLIN | EPOLLET | EPOLLONESHOT t2: 第一批数据到来 t3: epoll_wait 返回 fd=10 t4: fd=10 在 epoll 中被自动 disabled t5: 主线程把 fd=10 投递给 worker A t6: worker A 正在读 / 解析 t7: 第二批数据又到来 t8: 不会再因为 fd=10 产生新的 epoll 通知给主线程 简而言之,最好使用EPOLLONESHOT,worker 持有 fd 期间要一直读到 EAGAIN 或者 EWOULDBLOCK (内核另一种委婉地告诉你没数据了的方法,即: 你再读会阻塞),再重启监听。 重启后如果fd就绪可读且还有没读完的数据,就又会收到一个event,然后周而复始。也有一种可能是新来的数据已经被第一个worker读完了,虽然他不知道新来数据了,但他的目的是读到EAGAIN,可能碰巧完成了任务。 被禁用后启用监听的方法,其实和添加监听一模一样: void add_fd(int epfd, int fd) { epoll_event ev{}; ev.data.fd = fd; ev.events = EPOLLIN | EPOLLET | EPOLLONESHOT; // 被禁用后重新启动就改成EPOLL_CTL_MOD,其他一样 epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &ev); } 基础知识-多线程 毕竟我们要使用多线程开发,一些多线程知识还是必不可少 Linux下现代C++开启一个线程的方法 #include <thread> void function(){ while(true) print("Hello world"); } std::thread t = thread(&function); // 此时已经在运行function,也就是在疯狂打印hello world t.join() //等待线程执行完成,必须调用join(某些情况下detach),否则thread在析构时会直接terminate导致程序退出 // 但是因为function里写的是死循环,所以其实永远也等不到t主动关闭 // 这么写是为了规范 经典的生产者消费者模型 使用epoll+多线程我们用的是生产者-消费者模型 我们会有1个主线程,专门负责accept到来的连接和接收数据到来的事件,注意他只是接收事件,他自己不读,它让其他线程读 很多的worker线程,专门负责读数据,不自己处理连接和数据到来事件 那么如何让这些线程联动呢?答案就是生产者-消费者模型中的队列;我们会放个全局的std::queue队列,然后让全部的worker线程都去等待这个队列,他们会全部阻塞;然后再启主线程,不断地将到需要读数据的fd的相关信息放到queue中,放一个唤醒一个worker线程,worker线程会自己取走queue中的信息,然后自己开始读; 这种设计能大大提高效率, 锁的使用 我们用锁的方式来保护queue队列,不管是主线程还是子线程,操作queue时必须先获得锁,为了方便,我们直接使用 std::unique_lock 来获得锁,他是一个RAII的锁,也就是初始化时上锁,析构时解锁 void func(){ { std::unique_lock(mutex); //已上锁 queue.pop(); } // unique_lock析构了,此时已解锁 } 那么初始化时如何让所有的worker线程都因等待queue而阻塞呢?这样我们才能在数据到来时一个个唤醒。答案是使用 std::conditional_variable 条件变量,使用方式是这样的 // 全局的 std::mutex mutex; std::condition_variable cv; std::queue<event> queue; void workder(){ // 假设这是子线程的锁 std::unique_lock l(mutex); // 条件变量必须和锁配合使用,且必须是已经上锁的变量 cv.wait(l, [](){ return !event.isEmpty();}); // !event.isEmpty()是一个pred谓词,这里等价于 // while(!pred){ 继续等待 } 即如果队列为空就等待,而wait会解锁传给他的lock,等他被唤醒时再尝试重新上锁 // 导致的结果是,所有worker经历了:获取到了锁,再主动解锁然后阻塞 // 然后我们就可以启主线程,获取锁,往队列里放东西,解锁,唤醒一个线程让他去获取锁并拿走数据 } 参考资料 epoll(7) - Linux manual page accept(2) - Linux manual page fcntl(2) - Linux manual page https://en.cppreference.com/w/cpp/thread/condition_variable/wait https://en.cppreference.com/w/cpp/thread/thread/~thread https://en.cppreference.com/w/cpp/thread/unique_lock 8 个帖子 - 6 位参与者 阅读完整话题