0%

分析基于epoll的C++高性能webServer代码(三)

分析基于epoll的C++高性能webServer代码(三)

epoll用法参考

Socket类

  • Socket类基于C语言的tcp通信库实现,将C语言提供的Socket相关操作函数利用C++的面向对象进行了包装

  • 成员数据对象只有一个int serverfd_;

构造函数Socket(/* args */)

  • 创建socket对象
Socket::Socket(/* args */)
{
serverfd_ = socket(AF_INET, SOCK_STREAM, 0);
if(-1 == serverfd_)
{
perror("socket create fail!");
exit(-1);
}
std::cout << "server create socket" << serverfd_ << std::endl;
}

析构函数

  • 关闭socket文件描述符
Socket::~Socket()
{
close(serverfd_);
std::cout << "server close..." << std::endl;
}

SetReuseAddr()

  • 设置地址
void Socket::SetReuseAddr()
{
int on = 1;
setsockopt(serverfd_, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
}

Setnonblocking()

  • 设置为非阻塞模式
void Socket::Setnonblocking()
{
int opts = fcntl(serverfd_, F_GETFL);
if (opts<0)
{
perror("fcntl(serverfd_,GETFL)");
exit(1);
}
if (fcntl(serverfd_, F_SETFL, opts | O_NONBLOCK) < 0)
{
perror("fcntl(serverfd_,SETFL,opts)");
exit(1);
}
std::cout << "server setnonblocking..." << std::endl;
}

BindAddress(int serverport)

  • 将主机对应的端口号与socket进行绑定
bool Socket::BindAddress(int serverport)
{
struct sockaddr_in serveraddr;
memset(&serveraddr, 0, sizeof(serveraddr));
serveraddr.sin_family = AF_INET;
serveraddr.sin_addr.s_addr = htonl(INADDR_ANY);//inet_addr(_ServerIP.c_str());
serveraddr.sin_port = htons(serverport);
int resval = bind(serverfd_, (struct sockaddr*)&serveraddr, sizeof(serveraddr));
if (resval == -1)
{
close(serverfd_);
perror("error bind");
exit(1);
}
std::cout << "server bindaddress..." << std::endl;
return true;
}

Listen()

  • 就是调用C提供的Listen函数
bool Socket::Listen()
{
if (listen(serverfd_, 2048) < 0)
{
perror("error listen");
close(serverfd_);
exit(1);
}
std::cout << "server listenning..." << std::endl;
return true;
}

Accept(struct sockaddr_in &clientaddr)

  • 调用C提供的Accept()函数
  • 返回用户socket的文件描述符
int Socket::Accept(struct sockaddr_in &clientaddr)
{
socklen_t lengthofsockaddr = sizeof(clientaddr);
int clientfd = accept(serverfd_, (struct sockaddr*)&clientaddr, &lengthofsockaddr);
if (clientfd < 0)
{
//perror("error accept");
//if(errno == EAGAIN)
//return 0;
//std::cout << "error accept:there is no new connection accept..." << std::endl;
return clientfd;
}
//std::cout << "server accept,clientfd: " << clientfd << std::endl;
return clientfd;
}

Close()

  • 关闭对应的文件描述符
bool Socket::Close()
{
close(serverfd_);
std::cout << "server close..." << std::endl;
return true;
}

Epoll介绍

等待队列

  • 当进程A执行到创建socket的语句时,操作系统会创建一个由文件系统管理的socket对象(如下图)。这个socket对象包含了发送缓冲区、接收缓冲区、等待队列等成员。等待队列是个非常重要的结构,它指向所有需要等待该socket事件的进程

img

  • 当程序执行到recv时,操作系统会将进程A从工作队列移动到该socket的等待队列中(如下图)。由于工作队列只剩下了进程B和C,依据进程调度,cpu会轮流执行这两个进程的程序,不会执行进程A的程序。所以进程A被阻塞,不会往下执行代码,也不会占用cpu资源

img

  • ps:操作系统添加等待队列只是添加了对这个“等待中”进程的引用,以便在接收到数据时获取进程对象、将其唤醒,而非直接将进程管理纳入自己之下。上图为了方便说明,直接将进程挂到等待队列之下。
  • 当socket接收到数据后,操作系统将该socket等待队列上的进程重新放回到工作队列,该进程变成运行状态,继续执行代码。也由于socket的接收缓冲区已经有了数据,recv可以返回接收到的数据

内核接收网络数据全过程

  • 如下图所示,进程在recv阻塞期间,计算机收到了对端传送的数据(步骤①)。数据经由网卡传送到内存(步骤②),然后网卡通过中断信号通知cpu有数据到达,cpu执行中断程序(步骤③)。此处的中断程序主要有两项功能,先将网络数据写入到对应socket的接收缓冲区里面(步骤④),再唤醒进程A(步骤⑤),重新将进程A放入工作队列中。

img

  • 一个socket对应着一个端口号,而网络数据包中包含了ip和端口的信息,内核可以通过端口号找到对应的socket。当然,为了提高处理速度,操作系统会维护端口号到socket的索引结构,以快速读取。

同时监视多个socket的简单方法

  • 服务端需要管理多个客户端连接,而recv只能监视单个socket,这种矛盾下,人们开始寻找监视多个socket的方法。epoll的要义是高效的监视多个socket。从历史发展角度看,必然先出现一种不太高效的方法,人们再加以改进。只有先理解了不太高效的方法,才能够理解epoll的本质。
  • 假如能够预先传入一个socket列表,如果列表中的socket都没有数据,挂起进程,直到有一个socket收到数据,唤醒进程。这种方法很直接,也是select的设计思想。
  • 为方便理解,我们先复习select的用法。在如下的代码中,先准备一个数组(下面代码中的fds),让fds存放着所有需要监视的socket。然后调用select,如果fds中的所有socket都没有数据,select会阻塞,直到有一个socket接收到数据,select返回,唤醒进程。用户可以遍历fds,通过FD_ISSET判断具体哪个socket收到数据,然后做出处理。
int s = socket(AF_INET, SOCK_STREAM, 0);  
bind(s, ...)
listen(s, ...)

int fds[] = 存放需要监听的socket

while(1){
int n = select(..., fds, ...)
for(int i=0; i < fds.count; i++){
if(FD_ISSET(fds[i], ...)){
//fds[i]的数据处理
}
}
}

select的流程

  • select的实现思路很直接。假如程序同时监视如下图的sock1、sock2和sock3三个socket,那么在调用select之后,操作系统把进程A分别加入这三个socket的等待队列中。

img

  • 当任何一个socket收到数据后,中断程序将唤起进程。下图展示了sock2接收到了数据的处理流程。

img

  • 所谓唤起进程,就是将进程从所有的等待队列中移除,加入到工作队列里面。如下图所示。

img

  • 经由这些步骤,当进程A被唤醒后,它知道至少有一个socket接收了数据。程序只需遍历一遍socket列表,就可以得到就绪的socket。

  • 这种简单方式行之有效,在几乎所有操作系统都有对应的实现。

select的缺点

  • 其一,每次调用select都需要将进程加入到所有监视socket的等待队列,每次唤醒都需要从每个队列中移除。这里涉及了两次遍历,而且每次都要将整个fds列表传递给内核,有一定的开销。正是因为遍历操作开销大,出于效率的考量,才会规定select的最大监视数量,默认只能监视1024个socket。
  • 其二,进程被唤醒后,程序并不知道哪些socket收到数据,还需要遍历一次。

epoll的设计思路

  • epoll是在select出现N多年后才被发明的,是select和poll的增强版本。epoll通过以下一些措施来改进效率。
  • select低效的原因之一是将“维护等待队列”和“阻塞进程”两个步骤合二为一。如下图所示,每次调用select都需要这两步操作,然而大多数应用场景中,需要监视的socket相对固定,并不需要每次都修改。epoll将这两个操作分开,先用epoll_ctl维护等待队列,再调用epoll_wait阻塞进程。显而易见的,效率就能得到提升。

图 1

  • 为方便理解后续的内容,我们先复习下epoll的用法。如下的代码中,先用epoll_create创建一个epoll对象epfd,再通过epoll_ctl将需要监视的socket添加到epfd中,最后调用epoll_wait等待数据。
    int s = socket(AF_INET, SOCK_STREAM, 0);   
    bind(s, ...)
    listen(s, ...)

    int epfd = epoll_create(...);
    epoll_ctl(epfd, ...); //将所有需要监听的socket添加到epfd中

    while(1){
    int n = epoll_wait(...)
    for(接收到数据的socket){
    //处理
    }
    }
  • select低效的另一个原因在于程序不知道哪些socket收到数据,只能一个个遍历。如果内核维护一个“就绪列表”,引用收到数据的socket,就能避免遍历。如下图所示,计算机共有三个socket,收到数据的sock2和sock3被rdlist(就绪列表)所引用。当进程被唤醒后,只要获取rdlist的内容,就能够知道哪些socket收到数据。

图 2

epoll的原理和流程

  • 创建epoll对象
    • 如下图所示,当某个进程调用epoll_create方法时,内核会创建一个eventpoll对象(也就是程序中epfd所代表的对象)。eventpoll对象也是文件系统中的一员,和socket一样,它也会有等待队列。
    • 图 3
    • 创建一个代表该epoll的eventpoll对象是必须的,因为内核要维护“就绪列表”等数据,“就绪列表”可以作为eventpoll的成员。
  • 维护监视列表
    • 创建epoll对象后,可以用epoll_ctl添加或删除所要监听的socket。以添加socket为例,如下图,如果通过epoll_ctl添加sock1、sock2和sock3的监视,内核会将eventpoll添加到这三个socket的等待队列中。
    • 图 4
    • 当socket收到数据后,中断程序会操作eventpoll对象,而不是直接操作进程。
  • 接收数据
    • 当socket收到数据后,中断程序会给eventpoll的“就绪列表”添加socket引用。如下图展示的是sock2和sock3收到数据后,中断程序让rdlist引用这两个socket。
    • 图 5
    • eventpoll对象相当于是socket和进程之间的中介,socket的数据接收并不直接影响进程,而是通过改变eventpoll的就绪列表来改变进程状态。
    • 当程序执行到epoll_wait时,如果rdlist已经引用了socket,那么epoll_wait直接返回,如果rdlist为空,阻塞进程。

Channel类

  • 该类主要用于处理服务器用到的epoll的各种事件以及该做出反应
  • 主要内容包括设置各种回调函数的操作函数
  • void SetReadHandle(Callback cb), void SetWriteHandle(Callback cb), void SetErrorHandle(Callback cb), void SetCloseHandle(Callback cb)
  • 另一个是epoll事件的处理函数,负责检测epoll读、写、对方关闭或者连接错误等情况
    void Channel::HandleEvent()
    {
    if(events_ & EPOLLRDHUP)//对方异常关闭事件
    {
    std::cout << "Event EPOLLRDHUP" << std::endl;
    closehandler_();
    }
    else if(events_ & (EPOLLIN | EPOLLPRI))//读事件,对端有数据或者正常关闭
    {
    //std::cout << "Event EPOLLIN" << std::endl;
    readhandler_();
    }
    else if(events_ & EPOLLOUT)//写事件
    {
    std::cout << "Event EPOLLOUT" << std::endl;
    writehandler_();
    }
    else
    {
    std::cout << "Event error" << std::endl;
    errorhandler_();//连接错误
    }
    }

Poller

  • 这个类使用面向对象的方式封装了C传统提供的epoll相关的方法

    构造函数 Poller(/* args */)

  • 使用初始化列表初始化了一个内容为epoll_eventvector,其中epoll_event的结构体定义为

    struct epoll_event
    {
    uint32_t events; /* Epoll events */
    epoll_data_t data; /* User data variable */
    } __EPOLL_PACKED;
  • 初始化列表创建了一个映射关系为<int, Channel*>的map对象

  • 然后调用epoll_create函数创建epoll对象, epoll_create(int)函数的参数含义是监听socket的数量

    poll(ChannelList &activechannellist)

  • 等待IO事件的函数

  • 开始使用epoll_wait函数等待IO事件

    int nfds = epoll_wait(pollfd_, &*eventlist_.begin(), (int)eventlist_.capacity(), timeout);
  • 然后就是从等待得到的事件列表中逐个遍历处理,包括从事件数组中取出数据、寻找数据对应的文件描述符,找到之后通过Channel对象的SetEvents设置事件类型,同时将将其加入activechannellist。如果没找到的话就显示未找到的提示

  • 同时还包括假如事件数组的大小不足的时候进行扩容的部分

    void Poller::poll(ChannelList &activechannellist)
    {
    int timeout = TIMEOUT;
    //std::cout << "epoll_wait..." << std::endl;(int)eventlist_.capacity()
    int nfds = epoll_wait(pollfd_, &*eventlist_.begin(), (int)eventlist_.capacity(), timeout);
    //int nfds = epoll_wait(pollfd_, &*eventlist_.begin(), (int)channelmap_.size()*0.7+1, timeout);
    if(nfds == -1)
    {
    printf("error code is:%d", errno);
    perror("epoll wait error");
    //exit(1);
    }
    //printf("event num:%d\n", nfds);
    //std::cout << "event num:" << nfds << "\n";// << std::endl;
    for(int i = 0; i < nfds; ++i)
    {
    int events = eventlist_[i].events;
    //int fd = eventlist_[i].data.fd;
    Channel *pchannel = (Channel*)eventlist_[i].data.ptr;
    int fd = pchannel->GetFd();
    if(channelmap_.find(fd) != channelmap_.end())
    {
    pchannel->SetEvents(events);
    activechannellist.push_back(pchannel);
    }
    else
    {
    std::cout << "not find channel!" << std::endl;
    }
    }
    if(nfds == (int)eventlist_.capacity())
    {
    std::cout << "resize:" << nfds << std::endl;
    eventlist_.resize(nfds * 2);
    }
    //eventlist_.clear();

    }

    AddChannel(Channel *pchannel)、RemoveChannel(Channel *pchannel)、UpdateChannel(Channel *pchannel)

  • 三个函数大体的代码逻辑相同,都是利用传入的Channel参数获取文件描述符fd,然后将前文提到的映射的map中相应的对象做出修改。然后使用epoll_ctl修改epoll对其的监视。

    //添加事件
    void Poller::AddChannel(Channel *pchannel)
    {
    int fd = pchannel->GetFd();
    struct epoll_event ev;
    ev.events = pchannel->GetEvents();
    //data是联合体
    //ev.data.fd = fd;
    ev.data.ptr = pchannel;
    channelmap_[fd] = pchannel;

    if(epoll_ctl(pollfd_, EPOLL_CTL_ADD, fd, &ev) == -1)
    {
    perror("epoll add error");
    exit(1);
    }
    //std::cout << "addchannel!" << std::endl;
    }

    //删除事件
    void Poller::RemoveChannel(Channel *pchannel)
    {
    int fd = pchannel->GetFd();
    struct epoll_event ev;
    ev.events = pchannel->GetEvents();
    ///ev.data.fd = fd;
    ev.data.ptr = pchannel;
    channelmap_.erase(fd);

    if(epoll_ctl(pollfd_, EPOLL_CTL_DEL, fd, &ev) == -1)
    {
    perror("epoll del error");
    exit(1);
    }
    //std::cout << "removechannel!" << std::endl;
    }

    //更新事件
    void Poller::UpdateChannel(Channel *pchannel)
    {
    int fd = pchannel->GetFd();
    struct epoll_event ev;
    ev.events = pchannel->GetEvents();
    //ev.data.fd = fd;
    ev.data.ptr = pchannel;

    if(epoll_ctl(pollfd_, EPOLL_CTL_MOD, fd, &ev) == -1)
    {
    perror("epoll update error");
    exit(1);
    }
    //std::cout << "updatechannel!" << std::endl;
    }

    epoll结构体的定义

    typedef union epoll_data {
    void *ptr;
    int fd;
    __uint32_t u32;
    __uint64_t u64;
    } epoll_data_t;

    struct epoll_event {
    __uint32_t events; /* Epoll events */
    epoll_data_t data; /* User data variable */
    };
  • events的定义

事件宏 定义
EPOLLIN 表示对应的文件描述符可以读(包括对端SOCKET正常关闭)
EPOLLOUT 表示对应的文件描述符可以写
EPOLLPRI 表示对应的文件描述符有紧急的数据可读(这里应该表示有带外数据到来)
EPOLLERR 表示对应的文件描述符发生错误
EPOLLHUP 表示对应的文件描述符被挂断
EPOLLET 将EPOLL设为边缘触发(Edge Triggered)模式,这是相对于水平触发(Level Triggered)来说的。
EPOLLONESHOT 只监听一次事件,当监听完这次事件之后,如果还需要继续监听这个socket的话,需要再次把这个socket加入到EPOLL队列里

EventLoop

  • 这个类的主要作用是维护一个处理网络事件的循环,将Channelactivechannellist中取出,然后处理事件。

    构造函数 EventLoop(/* args */)

  • 构造vector<Functor> functorlist_,其中functorfunction<void()> Functor,同时还构造两个vector<Channel*> ChannelList的数组对象,再构造一个Poller对象,然后将自身的bool标记置为true。

    EventLoop::EventLoop(/* args */)
    : functorlist_(),
    channellist_(),
    activechannellist_(),
    poller_(),
    quit(true)
    {

    }

    循环函数 EventLoop::loop()

  • 主要作用是循环调用前面的epoll类的poll函数,然后从activechannellist中遍历处理每个的事件。然后清空activechannellist,并且执行函数列表里的函数完成任务。

    void EventLoop::loop()
    {
    quit = false;
    while(!quit)
    {
    poller_.poll(activechannellist_);
    //std::cout << "server HandleEvent" << std::endl;
    for(Channel *pchannel : activechannellist_)
    {
    pchannel->HandleEvent();//处理事件
    }
    activechannellist_.clear();
    ExecuteTask();
    }
    }

TcpServer

构造函数 TcpServer(EventLoop* loop, int port)

  • 利用初始化列表初始化SocketServersocket对象,初始化循环体loop对象,初始化Channel类型的serverchannel对象,初始化链接计数conncount对象
  • serversocket分别进行SetReuseAddr()BindAddress(port), Listen(), Setnonblocking()
  • serverchannel_初始化并且设置ReadHandleErrorHandle
    TcpServer::TcpServer(EventLoop* loop, int port)
    : serversocket_(),
    loop_(loop),
    serverchannel_(),
    conncount_(0)
    {
    //serversocket_.SetSocketOption();
    serversocket_.SetReuseAddr();
    serversocket_.BindAddress(port);
    serversocket_.Listen();
    serversocket_.Setnonblocking();

    serverchannel_.SetFd(serversocket_.fd());
    serverchannel_.SetReadHandle(std::bind(&TcpServer::OnNewConnection, this));
    serverchannel_.SetErrorHandle(std::bind(&TcpServer::OnConnectionError, this));

    }

    Start()函数

  • 设置channel的events,同时将channel添加到循环loop中
    void TcpServer::Start()
    {
    serverchannel_.SetEvents(EPOLLIN | EPOLLET);
    loop_->AddChannelToPoller(&serverchannel_);
    }

建立新连接的处理函数

  • 主要功能是接受客户端的连接请求,然后利用接收到的文件描述符创建新的控制链接的成员对象,注册业务函数并且完成调用。
    //新TCP连接处理,核心功能,业务功能注册,任务分发
    void TcpServer::OnNewConnection()
    {
    //循环调用accept,获取所有的建立好连接的客户端fd
    struct sockaddr_in clientaddr;
    int clientfd;
    while( (clientfd = serversocket_.Accept(clientaddr)) > 0)
    {
    //std::cout << "New client from IP:" << inet_ntoa(clientaddr.sin_addr)
    // << ":" << ntohs(clientaddr.sin_port) << std::endl;

    if(++conncount_ >= MAXCONNECTION)
    {
    close(clientfd);
    continue;
    }
    Setnonblocking(clientfd);

    //创建连接,注册业务函数
    TcpConnection *ptcpconnection = new TcpConnection(loop_, clientfd, clientaddr);
    ptcpconnection->SetMessaeCallback(messagecallback_);
    ptcpconnection->SetSendCompleteCallback(sendcompletecallback_);
    ptcpconnection->SetCloseCallback(closecallback_);
    ptcpconnection->SetErrorCallback(errorcallback_);
    ptcpconnection->SetConnectionCleanUp(std::bind(&TcpServer::RemoveConnection, this, ptcpconnection));
    tcpconnlist_[clientfd] = ptcpconnection;

    newconnectioncallback_(ptcpconnection);
    }
    }

RemoveConnection(TcpConnection *ptcpconnection)

  • 连接断开的操作
    //连接清理
    void TcpServer::RemoveConnection(TcpConnection *ptcpconnection)
    {
    --conncount_;
    //std::cout << "clean up connection, conncount is" << conncount_ << std::endl;
    tcpconnlist_.erase(ptcpconnection->fd());
    delete ptcpconnection;
    }

OnConnectionError()

void TcpServer::OnConnectionError()
{
std::cout << "UNKNOWN EVENT" << std::endl;
serversocket_.Close();
}

Setnonblocking(int fd)

void Setnonblocking(int fd)
{
int opts = fcntl(fd, F_GETFL);
if (opts < 0)
{
perror("fcntl(fd,GETFL)");
exit(1);
}
if (fcntl(fd, F_SETFL, opts | O_NONBLOCK) < 0)
{
perror("fcntl(fd,SETFL,opts)");
exit(1);
}
}