陳錄生 2026-01-09 09:33 采纳率: 0%
浏览 4

epoll连接不稳定,请问什么原因?

有一个客户端(192.168.31.26)没有触发epoll事件,所以没有打印出来。

#include <iostream>
#include <sys/socket.h>
#include <sys/epoll.h>
#include <string.h>
#include <unistd.h>
#include <thread>
#include <fcntl.h>

#include "sock.h"
#include "constant.h"
#include "util.h"
#include "ring_buffer.h"

using namespace std;

class Acq103{

    int epoll_fd;
    Sock sock;

    RingBuffer recv_ring_buffer;
    char recv_string[Constant::STRING_BUFFER_SIZE];

    thread main_thread;

    void setNonBlocking(int _client_socket_fd){
        int flags = fcntl(_client_socket_fd, F_GETFL, 0);
        fcntl(_client_socket_fd, F_SETFL, flags | O_NONBLOCK);
    }

    void addEvent(int _client_socket_fd){
        epoll_event event;
        event.data.fd = _client_socket_fd;        
        event.events = EPOLLIN | EPOLLOUT | EPOLLET;
        epoll_ctl(this->epoll_fd, EPOLL_CTL_ADD, _client_socket_fd, &event);

    }

    void handleNewConnection(){
        int client_socket_fd = sock.accept();
        if (client_socket_fd < 0){
            cout << "Accept error" << endl;
            return; 
        }
        cout <<  Util::get_ip_from_sock(client_socket_fd) << " is connected to the server at "<< Util::get_current_time() <<"." << endl;

        setNonBlocking(client_socket_fd);
        addEvent(client_socket_fd);       
    }

    void handleSend(int _client_socket_fd){

    }
    

    void handleReceive(int _client_socket_fd){ 
        Sock client_socket;
        client_socket.set_handle(_client_socket_fd);
        recv_ring_buffer.read_from_socket(client_socket, Constant::MAX_LENGTH);
        int m = recv_ring_buffer.read(recv_string, Constant::STRING_BUFFER_SIZE);
        recv_ring_buffer.clear();
    }

    
public:
    Acq103(int _port){
        
        sock.open(AF_INET, SOCK_STREAM, 0);
        sock.set_reuse_addr();
        sock.set_non_blocking();
        sock.bind(Addr(INADDR_ANY, _port));
        sock.listen();
        cout << "Server is listening on port " << _port << endl;

        epoll_fd = epoll_create1(0);

        addEvent(sock.get_handle());      
        
    }

    void run(){ 
        epoll_event events[Constant::MAX_EVENTS];

        while(true){
            int nfds = epoll_wait(epoll_fd, events, Constant::MAX_EVENTS, 200);
            
           for (int n = 0; n < nfds; n++) {   
                cout << "Event: " << events[n].data.fd << " " << events[n].events << endl;
                if (events[n].data.fd == sock.get_handle()) {
                    handleNewConnection();                    
                } else if (events[n].events & EPOLLIN) {
                    handleReceive(events[n].data.fd);
                } else if (events[n].events & EPOLLOUT) {
                    handleSend(events[n].data.fd);
                }
            }
        }
    }

    ~Acq103(){
        close(epoll_fd);
    }

    void start(){
        main_thread = thread(&Acq103::run, this);
    }

    void join(){
        main_thread.join();
    }


};



void thread_udp(void){
    /*
    * a thread sending UDP message to relay equipment
    */
    int socket_fd = socket(AF_INET, SOCK_DGRAM, 0);
    if (socket_fd < 0){
        cout << "Create UDP Socket Error!" << endl;
        return;
    }

    uint8_t buf[41];
    buf[0]= 0xFF;
    for(int i = 1; i < 41; i++)
    buf[i] = 0x00;
    
    
    while(true) {
       
        const int opt = -1;
        int nb = 0;
        setsockopt(socket_fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
        nb = setsockopt(socket_fd, SOL_SOCKET, SO_BROADCAST, &opt, sizeof(opt));
        if(nb == -1){
            cout << "Set Broadcast Error!" << endl;
            return;
        }

        sockaddr_in relayAddr;
        memset(&relayAddr, 0, sizeof(relayAddr));

        relayAddr.sin_family = AF_INET;
        relayAddr.sin_port = htons(1032);
        //relayAddr.sin_addr.s_addr = inet_addr("192.168.31.26");
        inet_pton(AF_INET, "192.168.31.255", &relayAddr.sin_addr);
        
        int len = sizeof(relayAddr);


        ///send message to relay equipment
        int send_num = sendto(socket_fd, buf, 41, 0, (sockaddr *)&relayAddr, len);

        if(send_num < 0){
            cerr << "Send to equipment error" << endl;
            close(socket_fd);
            return;
        }
               
        sleep(5);
    }
    
}


int main(){
    thread t1(thread_udp);

    Acq103 acq103(1048);
    acq103.start();
    
    t1.join();
    acq103.join();

    delete &acq103;
    return 0;
}

输出:
Event: 3 1
192.168.31.127 is connected to the server at 2026-01-09 03:17:30.
Event: 6 4
Event: 3 1
192.168.31.27 is connected to the server at 2026-01-09 03:17:30.

有一个客户端(192.168.31.26)没有触发epoll事件,所以没有打印出来。

  • 写回答

3条回答 默认 最新

  • 檀越@新空间 2026-01-09 09:34
    关注

    上午好☀️☀️☀️️
    本答案参考通义千问

    在使用 epoll 时,连接不稳定可能由多种原因引起。根据你提供的代码和描述,连接不稳定通常意味着 某些情况下无法正确监听或处理连接,导致客户端无法成功连接或服务器端无法响应。以下是详细的分析和解决方案。


    一、可能的原因

    1. 多线程环境下 epoll 的并发问题

    • 你的程序中使用了多线程(thread_udpAcq103::run()),但 epoll 是一个线程不安全的结构
    • 如果多个线程同时操作 epoll_fd,可能会导致不可预测的行为,例如:
      • 事件被遗漏
      • 文件描述符被错误地添加/删除
      • 程序崩溃

    重点: 在多线程环境中,必须确保所有对 epoll 的操作都在同一个线程中进行,或者通过同步机制(如互斥锁)保护。


    2. 缺失对连接状态的完整管理

    • 当客户端连接后,你只将它加入到 epoll 中一次,但没有考虑以下情况:
      • 客户端断开连接后,是否从 epoll 中移除?
      • 是否有超时机制?
      • 是否需要处理 EPOLLHUPEPOLLRDHUP 事件?

    重点: 每个客户端连接都应该维护其状态,并在连接关闭时从 epoll 中移除。


    3. 不正确的非阻塞设置

    • 你在 handleNewConnection() 中设置了 O_NONBLOCK,但没有检查是否成功。
    • 如果 fcntl 设置失败,可能导致后续的 epoll_wait 行为异常。

    重点: 检查 fcntl 返回值,确保非阻塞设置成功。


    4. 没有处理 EPOLLET 边缘触发模式的潜在问题

    • 你使用了 EPOLLET(边缘触发),这意味着事件只会被触发一次,需要在每次事件发生后立即处理
    • 如果处理不及时,可能导致事件丢失。

    重点: 使用 EPOLLET 时,必须确保每次事件都被及时处理,避免因未读取数据导致事件丢失。


    二、解决方案

    1. 避免多线程操作 epoll

    • epoll 的操作限制在一个线程内,比如将 Acq103::run() 放在主线程中运行。
    • 如果确实需要多线程,可以采用以下方式:
      • 使用队列传递事件给主处理线程
      • 用互斥锁保护对 epoll 的访问

    修改后的代码示例(简化版):

    // 主线程中启动 Acq103::run()
    int main() {
        thread t1(thread_udp);
    
        Acq103 acq103(1048);
        acq103.start(); // 这里应该直接调用 run() 而不是创建新线程
    
        t1.join();
        acq103.join(); // 等待 Acq103 执行完毕
    }
    

    2. 维护客户端连接状态

    • Acq103 类中增加一个 std::map<int, ClientInfo> 来记录每个客户端的信息。
    • 在连接关闭时,从 epoll 中移除该客户端。

    修改后的代码示例:

    class Acq103 {
        int epoll_fd;
        Sock sock;
        std::map<int, bool> client_fds; // 记录已加入 epoll 的客户端 FD
    
        void handleNewConnection() {
            int client_socket_fd = sock.accept();
            if (client_socket_fd < 0) {
                cout << "accept error" << endl;
                return;
            }
    
            // 设置非阻塞
            int flags = fcntl(client_socket_fd, F_GETFL, 0);
            if (fcntl(client_socket_fd, F_SETFL, flags | O_NONBLOCK) == -1) {
                perror("fcntl");
                close(client_socket_fd);
                return;
            }
    
            // 加入 epoll
            epoll_event event;
            event.data.fd = client_socket_fd;
            event.events = EPOLLIN | EPOLLOUT | EPOLLET;
            if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, client_socket_fd, &event) == -1) {
                perror("epoll_ctl ADD");
                close(client_socket_fd);
                return;
            }
    
            client_fds[client_socket_fd] = true;
        }
    
        void handleReceive(int socket_fd) {
            // 处理接收逻辑
            char buffer[1024];
            int bytes_read = read(socket_fd, buffer, sizeof(buffer));
            if (bytes_read <= 0) {
                // 客户端断开连接
                epoll_ctl(epoll_fd, EPOLL_CTL_DEL, socket_fd, nullptr);
                client_fds.erase(socket_fd);
                close(socket_fd);
            } else {
                // 处理接收到的数据
            }
        }
    };
    

    3. 增加错误处理和日志输出

    • 在关键函数中增加 perrorcerr 输出,便于调试。
    • epoll_ctlepoll_wait 后检查返回值。

    修改后的代码示例:

    if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, sock.get_handle(), &event) == -1) {
        cerr << "Error adding socket to epoll: " << strerror(errno) << endl;
        close(epoll_fd);
        exit(1);
    }
    

    4. 使用边缘触发时确保及时处理

    • handleReceivehandleSend 中,确保每次事件都被处理完后再继续循环。

    修改后的代码示例:

    void handleReceive(int socket_fd) {
        char buffer[1024];
        while (true) {
            int bytes_read = read(socket_fd, buffer, sizeof(buffer));
            if (bytes_read <= 0) {
                break;
            }
            // 处理接收到的数据
        }
    }
    

    三、总结

    | 问题点 | 解决方案 | |--------|----------| | 多线程操作 epoll | 确保所有 epoll 操作在单一线程中执行,或使用互斥锁保护 | | 客户端连接状态未管理 | 使用 std::map 或类似结构跟踪连接状态,连接关闭时从 epoll 移除 | | 非阻塞设置未检查 | 添加 fcntl 错误检查 | | 边缘触发模式未处理 | 确保每次事件都被完全处理,避免事件丢失 | | 错误处理不足 | 增加 perrorcerr 日志输出 |


    如果你能提供更具体的错误信息(如日志或编译错误),我可以进一步帮你定位问题。

    评论

报告相同问题?

问题事件

  • 修改了问题 1月9日
  • 修改了问题 1月9日
  • 创建了问题 1月9日