文章 12
评论 4
浏览 20907
C++高性能服务端编程——muduo网络库源码阅读(四)TcpConnection类

C++高性能服务端编程——muduo网络库源码阅读(四)TcpConnection类

概述

前面几篇文章讲了muduo的事件驱动循环模型以及相关的实现与优化,它们是muduo网络库实现高并发和高吞吐量的基础,但这都没有涉及到网络编程相关的内容,这篇文章开始我们从muduo的TcpConnection类入手,探讨muduo网络部分的实现。

网络编程中的高性能IO实现探讨

在分析TcpConnection类的源码前,我们先来简单探讨下网络编程中如何最大化CPU资源的利用率,保证高并发场景下的吞吐率。在Linux系统中,用户对某个文件进行的IO操作是分两个阶段的,以读数据为例,用户程序调用read系统调用时,系统的行为包括:(1)等待数据准备好,数据会被拷贝到内核的缓冲区中。(2)数据准备完毕后,内核会将数据从内核缓冲区拷贝到用户空间。这个过程涉及到同步异步、阻塞非阻塞的概念

  • 同步和非同步:所谓同步IO是指当用户进程触发IO操作时会原地等待或轮询查看数据是否就绪,一旦就绪则对数据进行内核空间和用户空间之间的拷贝,用户进程同样需要等待这个过程完成,而异步IO则是进程触发IO操作后就去做其他事情,由内核完成整个IO操作并通知进程。换句话说,同步IO是一种顺序操作,用户进程需要等待IO这件事的结果才能进行下一步操作,而异步IO则无需等待。
  • 阻塞和非阻塞:同样以读数据为例,如果进程在调用read函数读时没有数据可读,阻塞情况下进程就会阻塞在这个系统调用上,直到有数据可读为止,而非阻塞情况下系统调用会直接返回一个错误。用户进程进行IO操作时,很多情况下是不能一次性完成的,例如进程向某个套接字写入大量数据,如果网络环境较差,内核的TCP发送缓冲区被数据填满等待发送,新的数据就无法写入。此时进程的做法就分为两种:阻塞情况下,进程会放弃CPU一直阻塞在系统调用上,直到数据可以发送,而在非阻塞情况下,系统调用会直接返回,并设置errno,等待用户的下一次调用。

根据某个IO为同步或异步、阻塞或非阻塞,可以将Linux下的IO操作分为阻塞IO、非阻塞IO、IO多路复用、信号驱动IO和异步IO五类,它们的特点如下:

  • 阻塞IO:这是一种同步模型,也是最简单的一种IO模型,用户程序调用系统调用进行IO时,在等待数据就绪的两个阶段,整个进程(线程)都被阻塞,无法进行其他操作。这种模型虽然简单,但效率较低。
  • 非阻塞IO:这也是一种同步模型,与阻塞IO不同之处在于进程在等待数据阶段不会被阻塞,而是通过轮询的方式查看数据是否准备好,但第二阶段仍然会被阻塞。这种方式下进程在等待数据可读或可写时可以去做其他事情,但如果轮询间隔较长,就会导致IO操作延迟变大,影响整体吞吐率。间隔过短则会导致进程需要频繁检查,造成CPU资源的浪费。
  • IO多路复用:前面的文章多次提到过IO多路复用模型,这也是muduo所使用的IO模型。IO多路复用机制依赖于Linux的select/poll/epoll函数,它的引入避免了非阻塞IO中进程频繁轮询造成的CPU资源浪费,IO操作的第一阶段不再由用户轮询,而是内核负责监视,一旦满足条件就通知进程进行数据拷贝操作,这个阶段依旧是阻塞的。这种方式下数据准备阶段进程不再直接阻塞在IO的系统调用上,而是阻塞在select/poll/epoll系统调用上。就单个文件描述符而言,IO多路复用并不具备比阻塞IO更大的优势,甚至因为它需要两次系统调用,其开销反而更大,但IO多路复用模型最大的优势在于它的“多路复用”,利用这个机制可以同时监视多个文件描述符,一旦有文件描述符就绪就可以进行IO操作,这样一个线程就可以处理多个客户端请求,无需创建大量线程,总体的系统开销小。需要强调的是,IO多路复用模型仍然是同步IO模型
  • 信号驱动IO:这种IO方式下,进程在等待数据准备时同样不陷入阻塞,内核准备好数据后会向用户进程发送一个信号,进程捕捉该信号后进入信号处理函数进行IO操作。
  • 异步IO:异步IO下进程调用相关系统调用后,无论内核是否将数据准备好都会立即返回,两个阶段的拷贝操作均由内核完成,因此进程均不会被阻塞,内核将数据拷贝到指定的用户空间缓冲区后会通知用户进程。

关于这五种模型更具体的介绍可以查看《Unix网络编程:卷一》6.2节,muduo的IO模型正是IO多路复用模型,前面花了很大篇幅介绍的EventLoop就是这种IO方式的核心,在具体的网络IO中,为了达到更高的资源利用率和吞吐率,还涉及到一些其他的细节。

非阻塞IO+IO多路复用

通常情况下,IO多路复用模型总是和非阻塞IO同时使用的。对于非阻塞IO而言,我们不应该使用轮询的方式检查数据是否准备好,这样太浪费CPU资源,在IO多路复用模型中,我们则不应使线程在read、write等系统调用上阻塞,因为这时线程就不能去处理其他事情,依旧会造成浪费。

前面介绍过,muduo是一个事件驱动的网络编程库,所谓的“事件驱动”简单来说就是“需要我做”并且“允许我做”的时候才进行IO操作,否则就阻塞等待事件发生,而是否需要以及是否可以IO就是一个个具体的事件。muduo中所有的文件描述符都是以非阻塞的方式打开的,这意味这muduo中的工作线程不会阻塞在read、write等系统调用上等待数据准备(但第二阶段真正的数据拷贝操作仍然是阻塞的,因为这是一个同步IO模型),只会阻塞在poll/epoll系统调用上,这也符合我们的直觉,因为如果线程阻塞在poll/epoll系统调用上,说明该线程所负责的所有连接都没有客户端消息或请求,或者有IO的需求但网络环境不允许,这些情况的改善显然不是运行在应用层的网络库负责的,线程只能阻塞等待。而其他情况下线程都会有需要处理的事件,因此不应当被阻塞。

应用层缓冲区

《muduo》书中专门强调了应用层缓冲区在实现高性能IO中的重要作用,如前所述,我们引入非阻塞IO的目的就是不让线程阻塞在IO的系统调用上,最大程度复用线程资源。对于每个TCP套接字,网络库应当分别为它们配备输入和输出缓冲区,《muduo》书中详细论述了这一措施的必要性,这里做一个总结

  • 输出缓冲区:依照作者陈硕的观点,应用程序只负责生成数据并调用网络库的接口发送,至于数据如何发送、分几次发送都不应当是应用程序需要考虑的,网络库将接管这些数据并负责将其完全发送出去,这样一来输出缓冲区就是必须的。对于网络库而言,它必须保证数据的完整性和有序性,程序通过write系统调用向套接字写入数据时,受TCP拥塞控制协议影响,很多时候是不能一次性发送完毕的,那么剩下未发送的数据就存入输出缓冲区中,由网络库负责在适当的时候将其发送,这就是所谓的“完整性”。此外,如果缓冲区中尚有未发送数据,此时应用程序又有发送数据的请求,那么就应该将新的数据放在缓冲区数据之后,并择机一起发送,这里的“有序性”并不关心不同批次数据的顺序,而是保证每次的数据是连续发送的,不会混入其他批次的数据。
  • 输入缓冲区:引入输入缓冲区的逻辑与输出缓冲区类似,同样是需要保证数据的完整和有序,TCP是一个无边界的字节流协议,网络库每次读取数据时必须一次性读完,这同样需要缓冲区的帮助。

muduo中的缓冲区Buffer类内部是用一个 std::vector来存储数据的,对外是一片连续的内存,更多的设计细节可以参阅《muduo》书中的讲解,本文不关注Buffer类的代码实现。

最后,再探讨下我对muduo IO模型的理解,前面介绍多线程模型时提到过muduo的“one loop per thread”模型,这种模式下每个线程会开启一个EventLoop处理各类事件,每当有新的连接到达时,会通过round-robin的方式将新的Channel注册到某个线程的循环里。这种方式很简单,也很符合我们的直觉,并且在大多数情况下是非常高效的,之所以用“大多数”这个词是因为现实场景中往往会遇到一些我们无法预知的情况。

如果我们设置线程池的大小和CPU核心数一致,那么这些线程在物理上是可以做到并行执行的,利用round-robin的方式使每个线程处理的客户端连接数相近,理想情况下我们希望每个线程的负载是均衡的,即计算及IO的工作量接近,但实际上可能出现某个或某几个线程上的客户端请求特别频繁,这些线程一直忙于处理各种计算和IO事件,但某些线程的客户端并不活跃,这些线程大多数时间阻塞在Poller上,这就造成了线程资源的浪费,使吞吐率受限。另一种可能出现的情况是,某个TCP连接数据量或计算量较大,长时间占用线程资源,会导致这个线程上其他客户端的响应延迟变大,影响整体吞吐率。对于类似的问题,我们可以从两个层面进行考虑。首先,从网络库层面考虑,round-robin的方式并不能保证所有情况下都能有非常高的效率,可以考虑对TCP连接的Channel进行动态分配,如果出现前面提到的第一种情况,可以将忙碌线程上的连接与闲置线程的不活跃连接交换,实现动态的负载均衡,当然这种方式实现起来十分困难,我也不知道该怎么实现......🤥。另一个就是从业务层面考虑,可以在业务设计时将数据处理和计算任务拆分到其他线程上。

值得一提的是,线程数的设置也是十分关键的,前面的讨论中是将线程数设置为与物理核心数一致,这种物理上并行的线程在前面讨论的场景中不见得能有非常高的效率,我们应当根据业务特点合理设置线程数量。对于计算密集型的业务场景,线程数不宜过多,否则线程之间频繁切换的开销反而会拖慢整体计算。对于IO密集型业务,可以考虑将线程数设置得大一些(超过物理核心数),这样某个线程因为IO而被阻塞时(这里需要强调一下,虽然我们前面提到muduo使用的是非阻塞IO,但这仅仅指IO的第一个阶段不会被阻塞,由于muduo的模型仍属于同步IO模型,因此同样是需要等一个IO操作完全完成后才能进行下一步操作,用户线程在第二阶段拷贝数据时依旧是处于阻塞状态的),就可以让出CPU执行其他线程的计算需求,甚至可以让传输数据量大的连接独占一个线程,该线程大部分时间处于IO当中不会占用太多CPU资源,并且程序也无需等待这个连接的IO完成才能去处理其他连接的事件。

TcpConnection类

上一节对muduo的IO模型做了一个总体的介绍,但很少涉及到一个具体的TCP连接如何处理,如何保证数据的完整接收与发送,以及如何处理连接的建立与断开,这个小节我们就通过TcpConnection类的源码来分析如何实现这些需求。TcpConnection类是muduo中最复杂的一个类,正因为网络的IO不确定因素较多,因此需要网络库能考虑到各种情况的处理。

TcpConnection类的作用比较复杂,我们从构造函数开始分析它的源码

TcpConnection::TcpConnection(EventLoop* loop,
                             const string& nameArg,
                             int sockfd,
                             const InetAddress& localAddr,
                             const InetAddress& peerAddr)
  : loop_(CHECK_NOTNULL(loop)),
    name_(nameArg),
    state_(kConnecting),
    reading_(true),
    socket_(new Socket(sockfd)),
    channel_(new Channel(loop, sockfd)),
    localAddr_(localAddr),
    peerAddr_(peerAddr),
    highWaterMark_(64*1024*1024)
{
  channel_->setReadCallback(
      std::bind(&TcpConnection::handleRead, this, _1));
  channel_->setWriteCallback(
      std::bind(&TcpConnection::handleWrite, this));
  channel_->setCloseCallback(
      std::bind(&TcpConnection::handleClose, this));
  channel_->setErrorCallback(
      std::bind(&TcpConnection::handleError, this));
  LOG_DEBUG << "TcpConnection::ctor[" <<  name_ << "] at " << this
            << " fd=" << sockfd;
  socket_->setKeepAlive(true);
}

其中Socket类是muduo中一个封装了套接字的类,使用了类似智能指针的RAII机制控制套接字的关闭。此外,每个TcpConnection对象都会配备一个输入缓冲区和输出缓冲区。

Buffer inputBuffer_;
Buffer outputBuffer_;

前面已经分析过Channel类的作用,一个TCP套接字就是一个文件描述符,需要为它设置一个Channel类并注册回调函数,TCP连接中遇到的情况十分复杂,因此需要在Channel中注册多个事件的回调函数,包括可读事件回调 TcpConnection::handleRead,可写事件回调 TcpConnection::handleWrite,客户端关闭事件回调 TcpConnection::handleClose,错误事件回调 TcpConnection::handleError。除了这些关键的回调以外,TcpConnection类还可以设置一些与业务相关的回调函数,定义如下(为了更直观,这里将回调函数的详细类型展示出来)

std::function<void (const TcpConnectionPtr&)> connectionCallback_;    // 连接建立和销毁时的回调函数
std::function<void (const TcpConnectionPtr&,
                            Buffer*,
                            Timestamp)> messageCallback_;  // 成功接收到消息时的回调函数
std::function<void (const TcpConnectionPtr&)> writeCompleteCallback_;    // 发送完一次完整数据的回调函数
std::function<void (const TcpConnectionPtr&, size_t)> highWaterMarkCallback_;    // 缓冲区数据高水位回调函数
std::function<void (const TcpConnectionPtr&)> closeCallback_;            // 连接关闭的回调函数

先来看读事件回调函数

void TcpConnection::handleRead(Timestamp receiveTime)
{
  loop_->assertInLoopThread();
  int savedErrno = 0;
  ssize_t n = inputBuffer_.readFd(channel_->fd(), &savedErrno);
  if (n > 0)
  {
    messageCallback_(shared_from_this(), &inputBuffer_, receiveTime);
  }
  else if (n == 0)
  {
    handleClose();
  }
  else
  {
    errno = savedErrno;
    LOG_SYSERR << "TcpConnection::handleRead";
    handleError();
  }
}

当有客户端消息达到时,调用Buffer类的 Buffer::readFd函数从文件中读取数据,调用这个函数时会在输入缓冲区的基础上新增一个64KB大小的缓冲区,并调用Linux的 readv函数来将数据依次读入两个缓冲区,这是为了每次尽可能将所有数据一次性读完,n表示读取数据的字节数,如果n为0则说明客户端断开了连接,调用 TcpConnection::handleClose关闭连接,其他情况则表示出错, 调用 TcpConnection::handleError处理,错误回调函数的行为很简单,只是在日志中记录错误信息

void TcpConnection::handleError()
{
  int err = sockets::getSocketError(channel_->fd());
  LOG_ERROR << "TcpConnection::handleError [" << name_
            << "] - SO_ERROR = " << err << " " << strerror_tl(err);
}

连接关闭事件的回调函数如下

void TcpConnection::handleClose()
{
  loop_->assertInLoopThread();
  LOG_TRACE << "fd = " << channel_->fd() << " state = " << stateToString();
  assert(state_ == kConnected || state_ == kDisconnecting);
  // we don't close fd, leave it to dtor, so we can find leaks easily.
  setState(kDisconnected);
  channel_->disableAll();

  TcpConnectionPtr guardThis(shared_from_this());
  connectionCallback_(guardThis);
  // must be the last line
  closeCallback_(guardThis);
}

一旦客户端断开连接,则由该函数负责“清理残局”,设置连接状态,并依次调用 connectionCallback_closeCallback_处理连接关闭时的业务。

顺便提一下这里的连接状态,这是一个枚举类型,它们的含义如下

enum StateE { kDisconnected,  // 尚未建立连接
              kConnecting,    // 处于建立连接过程,处理连接建立时的相关操作
              kConnected,     // 连接建立
              kDisconnecting }; // 关闭连接过程,等待剩余未发送的数据发送完毕后关闭连接

TcpConnection类有一个非常重要的函数 send,调用它可以向对方发送数据,并且不必关心 send函数如何将数据发送,这个过程对调用者透明,它有若干个重载的版本

void send(const void* message, int len);
void send(const StringPiece& message);
void send(Buffer* message);  // this one will swap data

这个函数是线程安全的,它保证不同线程的数据完整连续地发送出去,但并不保证不同线程数据间的发送顺序,实现线程安全的方式也很简单,我们来看一个典型的重载版本的代码

void TcpConnection::send(Buffer* buf)
{
  if (state_ == kConnected)
  {
    if (loop_->isInLoopThread())
    {
      sendInLoop(buf->peek(), buf->readableBytes());
      buf->retrieveAll();
    }
    else
    {
      void (TcpConnection::*fp)(const StringPiece& message) = &TcpConnection::sendInLoop;
      loop_->runInLoop(
          std::bind(fp,
                    this,     // FIXME
                    buf->retrieveAllAsString()));
                    //std::forward<string>(message)));
    }
  }
}

如果是这个连接所属的loop线程调用该函数,那么就直接利用 sendInLoop将数据发送出去,否则将所有数据拷贝一份由loop线程来发送,也就是说虽然 所有线程都可以调用 send函数发送数据,但实际上所有数据是由这个连接所属于的loop线程统一发送的,这其中数据的拷贝会造成一定开销,但为了实现线程安全花费这样的开销是值得的。这样一来,所有数据实际上都是由 sendInLoop函数负责发送的,它的实现代码如下

void TcpConnection::sendInLoop(const void* data, size_t len)
{
  loop_->assertInLoopThread();
  ssize_t nwrote = 0;
  size_t remaining = len;
  bool faultError = false;
  if (state_ == kDisconnected)
  {
    LOG_WARN << "disconnected, give up writing";
    return;
  }
  // if no thing in output queue, try writing directly
  if (!channel_->isWriting() && outputBuffer_.readableBytes() == 0)
  {
    nwrote = sockets::write(channel_->fd(), data, len);
    if (nwrote >= 0)
    {
      remaining = len - nwrote;
      if (remaining == 0 && writeCompleteCallback_)
      {
        loop_->queueInLoop(std::bind(writeCompleteCallback_, shared_from_this()));
      }
    }
    else // nwrote < 0
    {
      nwrote = 0;
      if (errno != EWOULDBLOCK)
      {
        LOG_SYSERR << "TcpConnection::sendInLoop";
        if (errno == EPIPE || errno == ECONNRESET) // FIXME: any others?
        {
          faultError = true;
        }
      }
    }
  }

  assert(remaining <= len);
  if (!faultError && remaining > 0)
  {
    size_t oldLen = outputBuffer_.readableBytes();
    if (oldLen + remaining >= highWaterMark_
        && oldLen < highWaterMark_
        && highWaterMarkCallback_)
    {
      loop_->queueInLoop(std::bind(highWaterMarkCallback_, shared_from_this(), oldLen + remaining));
    }
    outputBuffer_.append(static_cast<const char*>(data)+nwrote, remaining);
    if (!channel_->isWriting())
    {
      channel_->enableWriting();
    }
  }
}

除了一些必要的检查以外,这个函数实际上主要做两件事情:(1)首先,如果语句 !channel_->isWriting() && outputBuffer_.readableBytes() == 0的条件成立,说明现在没有要等待发送的数据,可以直接调用 write函数向套接字中写入数据,否则说明此前有未发送的数据,那么新的数据就不能直接发送。这里设定了几个重要的变量,remaining表示剩余的数据字节数,初始值为需要发送的数据总字节数 len,调用 write会返回一个值 nwrote表示已经发送的字节数,如果这个值小于0则说明发生错误,需做相关的判断, remaining减去这个值就是剩余的字节数,若 remaining为0,说明发送完毕。(2)如果发送前发现缓冲区中存在待发送的数据,或者是没有一次将所有数据发送完,那么就将剩余的数据插入缓冲区末尾,执行 channel_->enableWriting()设置epoll监听可写事件,等待下一次可以向套接字写入数据的时机,届时将缓冲区内数据按顺序发送出去。这样,利用一个应用层的输出缓冲区就可以保证 send函数将每次的数据完整有序地发送出去。

设置完可写事件监听后,一旦套接字可以写入数据,那么就会调用 TcpConnection::handleWrite函数将缓冲区内未发送的数据发送出去,可写事件回调函数实现如下

void TcpConnection::handleWrite()
{
  loop_->assertInLoopThread();
  if (channel_->isWriting())
  {
    ssize_t n = sockets::write(channel_->fd(),
                               outputBuffer_.peek(),
                               outputBuffer_.readableBytes());
    if (n > 0)
    {
      outputBuffer_.retrieve(n);
      if (outputBuffer_.readableBytes() == 0)
      {
        channel_->disableWriting();
        if (writeCompleteCallback_)
        {
          loop_->queueInLoop(std::bind(writeCompleteCallback_, shared_from_this()));
        }
        if (state_ == kDisconnecting)
        {
          shutdownInLoop();
        }
      }
    }
    else
    {
      LOG_SYSERR << "TcpConnection::handleWrite";
      // if (state_ == kDisconnecting)
      // {
      //   shutdownInLoop();
      // }
    }
  }
  else
  {
    LOG_TRACE << "Connection fd = " << channel_->fd()
              << " is down, no more writing";
  }
}

首先直接调用 write函数向套接字内写入数据,如果能发送完缓冲区内数据,那么就执行 channel_->disableWriting停止监听可写事件,同时向loop内插入 writeCompleteCallback_回调函数执行相关业务。如果不能发送完成,那么就继续监听可写事件,重复这个过程直到所有数据发送完毕。值得一提的是,这个函数在数据发送完毕后有这样一个判断语句

if (state_ == kDisconnecting)
{
    shutdownInLoop();
}

如前所述,kDisconnecting表示一个处于关闭过程的状态,TcpConnection类提供了一些接口让服务端主动关闭连接,对于应用程序而言,只需调用这些接口来主动关闭连接即可,无需关心是否还有数据没有发送。关闭连接时,首先将状态设置为 kDisconnecting,如果发现缓冲区还有未发送的数据(即 channel_->isWriting()条件成立),那么程序不会立即关闭这个连接,而是等待数据发送完后再关闭,整个过程的代码如下

void TcpConnection::shutdown()
{
  // FIXME: use compare and swap
  if (state_ == kConnected)
  {
    setState(kDisconnecting);
    loop_->runInLoop(std::bind(&TcpConnection::shutdownInLoop, this));
  }
}

void TcpConnection::shutdownInLoop()
{
  loop_->assertInLoopThread();
  if (!channel_->isWriting())
  {
    // we are not writing
    socket_->shutdownWrite();
  }
}

理解了这些,我们就能总结出 send函数的总体逻辑

  • 首先判断是否为loop线程调用,如果是则可以发送数据,否则拷贝一个数据的副本,调用runInLoop函数等待loop线程来发送。
  • 发送数据时,如果缓冲区内有数据,就将数据插入缓冲区中数据后面,等待套接字可写时一起被发送。如果缓冲区内无数据,那么就直接调用write函数将数据写入套接字,如果写入完成就执行writeCompleteCallback_回调,否则将剩余数据存入缓冲区,并设置监听可写事件,等待套接字下次可写时将剩余数据发送出去。

标题:C++高性能服务端编程——muduo网络库源码阅读(四)TcpConnection类
作者:coollwd
地址:http://coollwd.top/articles/2020/11/28/1606552249169.html

Everything that kills me makes me feel alive

取消