分析基于epoll的C++高性能webServer代码(三)
Socket类
Socket类基于C语言的tcp通信库实现,将C语言提供的Socket相关操作函数利用C++的面向对象进行了包装
成员数据对象只有一个
int serverfd_;
构造函数Socket(/* args */)
- 创建
socket对象
Socket::Socket(/* args */) |
析构函数
- 关闭
socket文件描述符
Socket::~Socket() |
SetReuseAddr()
- 设置地址
void Socket::SetReuseAddr() |
Setnonblocking()
- 设置为非阻塞模式
void Socket::Setnonblocking() |
BindAddress(int serverport)
- 将主机对应的端口号与
socket进行绑定
bool Socket::BindAddress(int serverport) |
Listen()
- 就是调用C提供的
Listen函数
bool Socket::Listen() |
Accept(struct sockaddr_in &clientaddr)
- 调用C提供的
Accept()函数 - 返回用户
socket的文件描述符
int Socket::Accept(struct sockaddr_in &clientaddr) |
Close()
- 关闭对应的文件描述符
bool Socket::Close() |
Epoll介绍
等待队列
- 当进程A执行到创建socket的语句时,操作系统会创建一个由文件系统管理的socket对象(如下图)。这个socket对象包含了发送缓冲区、接收缓冲区、等待队列等成员。等待队列是个非常重要的结构,它指向所有需要等待该socket事件的进程。

- 当程序执行到recv时,操作系统会将进程A从工作队列移动到该socket的等待队列中(如下图)。由于工作队列只剩下了进程B和C,依据进程调度,cpu会轮流执行这两个进程的程序,不会执行进程A的程序。所以进程A被阻塞,不会往下执行代码,也不会占用cpu资源。

- ps:操作系统添加等待队列只是添加了对这个“等待中”进程的引用,以便在接收到数据时获取进程对象、将其唤醒,而非直接将进程管理纳入自己之下。上图为了方便说明,直接将进程挂到等待队列之下。
- 当socket接收到数据后,操作系统将该socket等待队列上的进程重新放回到工作队列,该进程变成运行状态,继续执行代码。也由于socket的接收缓冲区已经有了数据,recv可以返回接收到的数据。
内核接收网络数据全过程
- 如下图所示,进程在recv阻塞期间,计算机收到了对端传送的数据(步骤①)。数据经由网卡传送到内存(步骤②),然后网卡通过中断信号通知cpu有数据到达,cpu执行中断程序(步骤③)。此处的中断程序主要有两项功能,先将网络数据写入到对应socket的接收缓冲区里面(步骤④),再唤醒进程A(步骤⑤),重新将进程A放入工作队列中。

- 一个socket对应着一个端口号,而网络数据包中包含了ip和端口的信息,内核可以通过端口号找到对应的socket。当然,为了提高处理速度,操作系统会维护端口号到socket的索引结构,以快速读取。
同时监视多个socket的简单方法
- 服务端需要管理多个客户端连接,而recv只能监视单个socket,这种矛盾下,人们开始寻找监视多个socket的方法。epoll的要义是高效的监视多个socket。从历史发展角度看,必然先出现一种不太高效的方法,人们再加以改进。只有先理解了不太高效的方法,才能够理解epoll的本质。
- 假如能够预先传入一个socket列表,如果列表中的socket都没有数据,挂起进程,直到有一个socket收到数据,唤醒进程。这种方法很直接,也是select的设计思想。
- 为方便理解,我们先复习select的用法。在如下的代码中,先准备一个数组(下面代码中的fds),让fds存放着所有需要监视的socket。然后调用select,如果fds中的所有socket都没有数据,select会阻塞,直到有一个socket接收到数据,select返回,唤醒进程。用户可以遍历fds,通过FD_ISSET判断具体哪个socket收到数据,然后做出处理。
int s = socket(AF_INET, SOCK_STREAM, 0); |
select的流程
- select的实现思路很直接。假如程序同时监视如下图的sock1、sock2和sock3三个socket,那么在调用select之后,操作系统把进程A分别加入这三个socket的等待队列中。

- 当任何一个socket收到数据后,中断程序将唤起进程。下图展示了sock2接收到了数据的处理流程。

- 所谓唤起进程,就是将进程从所有的等待队列中移除,加入到工作队列里面。如下图所示。

经由这些步骤,当进程A被唤醒后,它知道至少有一个socket接收了数据。程序只需遍历一遍socket列表,就可以得到就绪的socket。
这种简单方式行之有效,在几乎所有操作系统都有对应的实现。
select的缺点
- 其一,每次调用select都需要将进程加入到所有监视socket的等待队列,每次唤醒都需要从每个队列中移除。这里涉及了两次遍历,而且每次都要将整个fds列表传递给内核,有一定的开销。正是因为遍历操作开销大,出于效率的考量,才会规定select的最大监视数量,默认只能监视1024个socket。
- 其二,进程被唤醒后,程序并不知道哪些socket收到数据,还需要遍历一次。
epoll的设计思路
- epoll是在select出现N多年后才被发明的,是select和poll的增强版本。epoll通过以下一些措施来改进效率。
- select低效的原因之一是将“维护等待队列”和“阻塞进程”两个步骤合二为一。如下图所示,每次调用select都需要这两步操作,然而大多数应用场景中,需要监视的socket相对固定,并不需要每次都修改。epoll将这两个操作分开,先用epoll_ctl维护等待队列,再调用epoll_wait阻塞进程。显而易见的,效率就能得到提升。
- 为方便理解后续的内容,我们先复习下epoll的用法。如下的代码中,先用epoll_create创建一个epoll对象epfd,再通过epoll_ctl将需要监视的socket添加到epfd中,最后调用epoll_wait等待数据。
int s = socket(AF_INET, SOCK_STREAM, 0);
bind(s, ...)
listen(s, ...)
int epfd = epoll_create(...);
epoll_ctl(epfd, ...); //将所有需要监听的socket添加到epfd中
while(1){
int n = epoll_wait(...)
for(接收到数据的socket){
//处理
}
} - select低效的另一个原因在于程序不知道哪些socket收到数据,只能一个个遍历。如果内核维护一个“就绪列表”,引用收到数据的socket,就能避免遍历。如下图所示,计算机共有三个socket,收到数据的sock2和sock3被rdlist(就绪列表)所引用。当进程被唤醒后,只要获取rdlist的内容,就能够知道哪些socket收到数据。
epoll的原理和流程
- 创建epoll对象
- 如下图所示,当某个进程调用epoll_create方法时,内核会创建一个eventpoll对象(也就是程序中epfd所代表的对象)。eventpoll对象也是文件系统中的一员,和socket一样,它也会有等待队列。
- 创建一个代表该epoll的eventpoll对象是必须的,因为内核要维护“就绪列表”等数据,“就绪列表”可以作为eventpoll的成员。
- 维护监视列表
- 创建epoll对象后,可以用epoll_ctl添加或删除所要监听的socket。以添加socket为例,如下图,如果通过epoll_ctl添加sock1、sock2和sock3的监视,内核会将eventpoll添加到这三个socket的等待队列中。
- 当socket收到数据后,中断程序会操作eventpoll对象,而不是直接操作进程。
- 接收数据
- 当socket收到数据后,中断程序会给eventpoll的“就绪列表”添加socket引用。如下图展示的是sock2和sock3收到数据后,中断程序让rdlist引用这两个socket。
- eventpoll对象相当于是socket和进程之间的中介,socket的数据接收并不直接影响进程,而是通过改变eventpoll的就绪列表来改变进程状态。
- 当程序执行到epoll_wait时,如果rdlist已经引用了socket,那么epoll_wait直接返回,如果rdlist为空,阻塞进程。
Channel类
- 该类主要用于处理服务器用到的
epoll的各种事件以及该做出反应 - 主要内容包括设置各种回调函数的操作函数
void SetReadHandle(Callback cb),void SetWriteHandle(Callback cb),void SetErrorHandle(Callback cb),void SetCloseHandle(Callback cb)等- 另一个是
epoll事件的处理函数,负责检测epoll的读、写、对方关闭或者连接错误等情况void Channel::HandleEvent()
{
if(events_ & EPOLLRDHUP)//对方异常关闭事件
{
std::cout << "Event EPOLLRDHUP" << std::endl;
closehandler_();
}
else if(events_ & (EPOLLIN | EPOLLPRI))//读事件,对端有数据或者正常关闭
{
//std::cout << "Event EPOLLIN" << std::endl;
readhandler_();
}
else if(events_ & EPOLLOUT)//写事件
{
std::cout << "Event EPOLLOUT" << std::endl;
writehandler_();
}
else
{
std::cout << "Event error" << std::endl;
errorhandler_();//连接错误
}
}
Poller
这个类使用面向对象的方式封装了C传统提供的
epoll相关的方法构造函数 Poller(/* args */)
使用初始化列表初始化了一个内容为
epoll_event的vector,其中epoll_event的结构体定义为struct epoll_event
{
uint32_t events; /* Epoll events */
epoll_data_t data; /* User data variable */
} __EPOLL_PACKED;初始化列表创建了一个映射关系为
<int, Channel*>的map对象然后调用
epoll_create函数创建epoll对象,epoll_create(int)函数的参数含义是监听socket的数量poll(ChannelList &activechannellist)
等待IO事件的函数
开始使用
epoll_wait函数等待IO事件int nfds = epoll_wait(pollfd_, &*eventlist_.begin(), (int)eventlist_.capacity(), timeout);
然后就是从等待得到的事件列表中逐个遍历处理,包括从事件数组中取出数据、寻找数据对应的文件描述符,找到之后通过
Channel对象的SetEvents设置事件类型,同时将将其加入activechannellist。如果没找到的话就显示未找到的提示同时还包括假如事件数组的大小不足的时候进行扩容的部分
void Poller::poll(ChannelList &activechannellist)
{
int timeout = TIMEOUT;
//std::cout << "epoll_wait..." << std::endl;(int)eventlist_.capacity()
int nfds = epoll_wait(pollfd_, &*eventlist_.begin(), (int)eventlist_.capacity(), timeout);
//int nfds = epoll_wait(pollfd_, &*eventlist_.begin(), (int)channelmap_.size()*0.7+1, timeout);
if(nfds == -1)
{
printf("error code is:%d", errno);
perror("epoll wait error");
//exit(1);
}
//printf("event num:%d\n", nfds);
//std::cout << "event num:" << nfds << "\n";// << std::endl;
for(int i = 0; i < nfds; ++i)
{
int events = eventlist_[i].events;
//int fd = eventlist_[i].data.fd;
Channel *pchannel = (Channel*)eventlist_[i].data.ptr;
int fd = pchannel->GetFd();
if(channelmap_.find(fd) != channelmap_.end())
{
pchannel->SetEvents(events);
activechannellist.push_back(pchannel);
}
else
{
std::cout << "not find channel!" << std::endl;
}
}
if(nfds == (int)eventlist_.capacity())
{
std::cout << "resize:" << nfds << std::endl;
eventlist_.resize(nfds * 2);
}
//eventlist_.clear();
}AddChannel(Channel *pchannel)、RemoveChannel(Channel *pchannel)、UpdateChannel(Channel *pchannel)
三个函数大体的代码逻辑相同,都是利用传入的
Channel参数获取文件描述符fd,然后将前文提到的映射的map中相应的对象做出修改。然后使用epoll_ctl修改epoll对其的监视。//添加事件
void Poller::AddChannel(Channel *pchannel)
{
int fd = pchannel->GetFd();
struct epoll_event ev;
ev.events = pchannel->GetEvents();
//data是联合体
//ev.data.fd = fd;
ev.data.ptr = pchannel;
channelmap_[fd] = pchannel;
if(epoll_ctl(pollfd_, EPOLL_CTL_ADD, fd, &ev) == -1)
{
perror("epoll add error");
exit(1);
}
//std::cout << "addchannel!" << std::endl;
}
//删除事件
void Poller::RemoveChannel(Channel *pchannel)
{
int fd = pchannel->GetFd();
struct epoll_event ev;
ev.events = pchannel->GetEvents();
///ev.data.fd = fd;
ev.data.ptr = pchannel;
channelmap_.erase(fd);
if(epoll_ctl(pollfd_, EPOLL_CTL_DEL, fd, &ev) == -1)
{
perror("epoll del error");
exit(1);
}
//std::cout << "removechannel!" << std::endl;
}
//更新事件
void Poller::UpdateChannel(Channel *pchannel)
{
int fd = pchannel->GetFd();
struct epoll_event ev;
ev.events = pchannel->GetEvents();
//ev.data.fd = fd;
ev.data.ptr = pchannel;
if(epoll_ctl(pollfd_, EPOLL_CTL_MOD, fd, &ev) == -1)
{
perror("epoll update error");
exit(1);
}
//std::cout << "updatechannel!" << std::endl;
}epoll结构体的定义
typedef union epoll_data {
void *ptr;
int fd;
__uint32_t u32;
__uint64_t u64;
} epoll_data_t;
struct epoll_event {
__uint32_t events; /* Epoll events */
epoll_data_t data; /* User data variable */
};events的定义
| 事件宏 | 定义 |
|---|---|
| EPOLLIN | 表示对应的文件描述符可以读(包括对端SOCKET正常关闭) |
| EPOLLOUT | 表示对应的文件描述符可以写 |
| EPOLLPRI | 表示对应的文件描述符有紧急的数据可读(这里应该表示有带外数据到来) |
| EPOLLERR | 表示对应的文件描述符发生错误 |
| EPOLLHUP | 表示对应的文件描述符被挂断 |
| EPOLLET | 将EPOLL设为边缘触发(Edge Triggered)模式,这是相对于水平触发(Level Triggered)来说的。 |
| EPOLLONESHOT | 只监听一次事件,当监听完这次事件之后,如果还需要继续监听这个socket的话,需要再次把这个socket加入到EPOLL队列里 |
EventLoop
这个类的主要作用是维护一个处理网络事件的循环,将
Channel从activechannellist中取出,然后处理事件。构造函数 EventLoop(/* args */)
构造
vector<Functor> functorlist_,其中functor是function<void()> Functor,同时还构造两个vector<Channel*> ChannelList的数组对象,再构造一个Poller对象,然后将自身的bool标记置为true。EventLoop::EventLoop(/* args */)
: functorlist_(),
channellist_(),
activechannellist_(),
poller_(),
quit(true)
{
}循环函数 EventLoop::loop()
主要作用是循环调用前面的
epoll类的poll函数,然后从activechannellist中遍历处理每个的事件。然后清空activechannellist,并且执行函数列表里的函数完成任务。void EventLoop::loop()
{
quit = false;
while(!quit)
{
poller_.poll(activechannellist_);
//std::cout << "server HandleEvent" << std::endl;
for(Channel *pchannel : activechannellist_)
{
pchannel->HandleEvent();//处理事件
}
activechannellist_.clear();
ExecuteTask();
}
}
TcpServer
注册业务函数函数指针的部分在此处略去
构造函数 TcpServer(EventLoop* loop, int port)
- 利用初始化列表初始化
Socket的Serversocket对象,初始化循环体loop对象,初始化Channel类型的serverchannel对象,初始化链接计数conncount对象 serversocket分别进行SetReuseAddr()、BindAddress(port),Listen(),Setnonblocking()serverchannel_初始化并且设置ReadHandle和ErrorHandleTcpServer::TcpServer(EventLoop* loop, int port)
: serversocket_(),
loop_(loop),
serverchannel_(),
conncount_(0)
{
//serversocket_.SetSocketOption();
serversocket_.SetReuseAddr();
serversocket_.BindAddress(port);
serversocket_.Listen();
serversocket_.Setnonblocking();
serverchannel_.SetFd(serversocket_.fd());
serverchannel_.SetReadHandle(std::bind(&TcpServer::OnNewConnection, this));
serverchannel_.SetErrorHandle(std::bind(&TcpServer::OnConnectionError, this));
}Start()函数
- 设置channel的events,同时将channel添加到循环loop中
void TcpServer::Start()
{
serverchannel_.SetEvents(EPOLLIN | EPOLLET);
loop_->AddChannelToPoller(&serverchannel_);
}
建立新连接的处理函数
- 主要功能是接受客户端的连接请求,然后利用接收到的文件描述符创建新的控制链接的成员对象,注册业务函数并且完成调用。
//新TCP连接处理,核心功能,业务功能注册,任务分发
void TcpServer::OnNewConnection()
{
//循环调用accept,获取所有的建立好连接的客户端fd
struct sockaddr_in clientaddr;
int clientfd;
while( (clientfd = serversocket_.Accept(clientaddr)) > 0)
{
//std::cout << "New client from IP:" << inet_ntoa(clientaddr.sin_addr)
// << ":" << ntohs(clientaddr.sin_port) << std::endl;
if(++conncount_ >= MAXCONNECTION)
{
close(clientfd);
continue;
}
Setnonblocking(clientfd);
//创建连接,注册业务函数
TcpConnection *ptcpconnection = new TcpConnection(loop_, clientfd, clientaddr);
ptcpconnection->SetMessaeCallback(messagecallback_);
ptcpconnection->SetSendCompleteCallback(sendcompletecallback_);
ptcpconnection->SetCloseCallback(closecallback_);
ptcpconnection->SetErrorCallback(errorcallback_);
ptcpconnection->SetConnectionCleanUp(std::bind(&TcpServer::RemoveConnection, this, ptcpconnection));
tcpconnlist_[clientfd] = ptcpconnection;
newconnectioncallback_(ptcpconnection);
}
}
RemoveConnection(TcpConnection *ptcpconnection)
- 连接断开的操作
//连接清理
void TcpServer::RemoveConnection(TcpConnection *ptcpconnection)
{
--conncount_;
//std::cout << "clean up connection, conncount is" << conncount_ << std::endl;
tcpconnlist_.erase(ptcpconnection->fd());
delete ptcpconnection;
}
OnConnectionError()
void TcpServer::OnConnectionError() |
Setnonblocking(int fd)
void Setnonblocking(int fd) |