文章 12
评论 4
浏览 32307
C++高性能服务端编程——muduo网络库源码阅读(三)多线程模型

C++高性能服务端编程——muduo网络库源码阅读(三)多线程模型

概述

前面两篇文章简要分析了muduo中文件描述符的管理以及事件的注册和响应机制,通过这两篇文章的介绍可以大致勾勒出一个基本的事件循环模型,利用muduo编写的网络程序都基于这一模型驱动。当然,高性能网络库必然少不了对多线程的支持,本文就来探讨一下muduo中的多线程模型。

One loop per thread模型

关于并发网络服务程序的设计方案有多种,并且绝大部分都有它们的应用场景,《muduo》书中有一节专门介绍了常见的11种并发网络程序设计方案,并且比较了这几种方案的特点和优劣。muduo使用的多线程模型是“one loop per thread”,关于这种模式的特点,这里摘抄一段陈硕博客中的介绍

此种模型下,程序里的每个 IO 线程有一个 event loop (或者叫 Reactor),用于处理读写和定时事件(无论周期性的还是单次的),代码框架跟第 2 节一样。

这种方式的好处是:

  • 线程数目基本固定,可以在程序启动的时候设置,不会频繁创建与销毁。
  • 可以很方便地在线程间调配负载。

event loop 代表了线程的主循环,需要让哪个线程干活,就把 timer 或 IO channel (TCP connection) 注册到那个线程的 loop 里即可。对实时性有要求的 connection 可以单独用一个线程;数据量大的 connection 可以独占一个线程,并把数据处理任务分摊到另几个线程中;其他次要的辅助性 connections 可以共享一个线程。

对于 non-trivial 的服务端程序,一般会采用 non-blocking IO + IO multiplexing,每个 connection/acceptor 都会注册到某个 Reactor 上,程序里有多个 Reactor,每个线程至多有一个 Reactor。

muduo中实现这一模型的类主要是EventLoopThread和EventLoopThreadPool两个类,其中EventLoopThreadPool为线程池的实现,我们在下一节探讨,本节主要分析EventLoopThread类的作用。

EventLoopThread

EventLoopThread类顾名思义就是一个封装了EvetnLoop对象的线程类,它的主要功能就是启动一个新的线程,然后在这个子线程中创建一个EventLoop对象并调用其loop方法开启事件循环,我们先来看看这个类的声明

class EventLoopThread : noncopyable
{
 public:
  typedef std::function<void(EventLoop*)> ThreadInitCallback;

  EventLoopThread(const ThreadInitCallback& cb = ThreadInitCallback(),
                  const string& name = string());
  ~EventLoopThread();
  EventLoop* startLoop();

 private:
  void threadFunc();

  EventLoop* loop_ GUARDED_BY(mutex_);
  bool exiting_;
  Thread thread_;
  MutexLock mutex_;
  Condition cond_ GUARDED_BY(mutex_);
  ThreadInitCallback callback_;
};

其中 ThreadMutexLockCondition分别为muduo实现的线程类、互斥锁类和条件变量类,具体的实现可以参照muduo/base目录下的代码,这里我们并不关心他们的具体实现,只需要知道它们的大致功能即可。EventLoopThread::threadFunc函数是 thread_对象所绑定的线程函数,EventLoopThread::startLoop方法则用于开启这个线程的事件循环。一般来说,我们会在主线程中调用这个方法,而EventLoop对象是在子线程中创建的,这样就会带来一个问题:我们希望主线程能够获取到子线程EventLoop对象的指针以便交互和管理,而该对象是是在子线程中异步创建的,如何在对象创建完毕时向主线程传递对象的指针?muduo使用了条件变量来实现这一过程,我们来看下面的代码

EventLoop* EventLoopThread::startLoop()
{
  assert(!thread_.started());
  thread_.start();

  EventLoop* loop = NULL;
  {
    MutexLockGuard lock(mutex_);
    while (loop_ == NULL)
    {
      cond_.wait();
    }
    loop = loop_;
  }

  return loop;
}

void EventLoopThread::threadFunc()
{
  EventLoop loop;

  if (callback_)
  {
    callback_(&loop);
  }

  {
    MutexLockGuard lock(mutex_);
    loop_ = &loop;
    cond_.notify();
  }

  loop.loop();
  //assert(exiting_);
  MutexLockGuard lock(mutex_);
  loop_ = NULL;
}

loop_是EventLoopThread类的一个成员变量,构造函数中将它赋值为 NULL,主线程在调用 EventLoopThread::startLoop首先对 mutex_加锁,然后判断 loop_是否为 NULL,如果是就执行 cond_.wait()阻塞在条件变量上(此时锁会被自动释放,这个过程与条件变量的标准用法一致)。子线程在开启时会先创建一个EventLoop对象,然后对 mutex_加锁,并将对象的指针值赋给 loop_,最后调用 cond_.notify()唤醒主线程。这表明,当主线程创建一个EventLoopThread对象并启动时,它会阻塞等待子线程创建EventLoop对象,直到对象创建完毕获得指针后才会继续执行。

这里用到的线程同步工具都是muduo库实现的,事实上这些完全可以使用C++11标准库里面提供的工具替换,并且代码可以简洁很多,下面是笔者的实现代码(为简单起见,类的声明和实现放在了一起)

class EventLoopThread : noncopyable {
public:
    typedef std::function<void (EventLoop*)> ThreadInitCallback;
    EventLoopThread(const ThreadInitCallback& cb=ThreadInitCallback(), 
                    const std::string& name=std::string())
       _loop(nullptr),
         _exiting(false),
         _name(name),
         _threadInitCallback(cb)
    {}
  
    ~EventLoopThread() {
        _exiting = true;

        if (_loop != nullptr)
            _loop->quit();

        if (_thread)
            _thread->join();
    }

    EventLoop* start() {
        std::promise<EventLoop*> ptr;
        _thread.reset(new std::thread(
            &EventLoopThread::threadFunc, this, std::ref(ptr)));

        auto loop = ptr.get_future();
        return loop.get();
    }

    const std::string& name() const { return _name; }

private:
    void threadFunc(std::promise<EventLoop*>& ptr) {
        EventLoop loop;
  
        _loop = &loop;
        ptr.set_value(&loop);

        if (_threadInitCallback)
            _threadInitCallback(&loop);

        // Start loop
        loop.loop();

        // After quit loop, reset _loop as nullptr
        _loop = nullptr;
    }
    EventLoop* _loop;
    std::unique_ptr<std::thread> _thread;
    bool _exiting;
    std::string _name;
    ThreadInitCallback _threadInitCallback;
};

上面的代码需要包含 <thread><future>两个头文件,与muduo代码的不同之处在于这里利用 std::futurestd::promise两个工具类来异步获取子线程中EventLoop对象的指针而不需要直接使用条件变量,代码实现上更为简洁。

线程池

这里介绍的muduo线程池不同于一般的线程池(一个简单而优雅的C++11风格线程池实现可以看这里),muduo的线程池是EventLoop线程的“池”,线程池在启动时会开启若干个EventLoopThread对象,并且在创建时就开启事件循环监听。主线程也拥有一个EventLoop对象,线程池类会维护一个EventLoopThread对象指针的容器和各子线程EventLoop对象的指针容器,便于主线程与子线程的交互

std::vector<std::unique_ptr<EventLoopThread>> threads_;
std::vector<EventLoop*> loops_;

调用EventLoopThreadPool对象的start方法启动线程池,按照预先设定的子线程数量构造EventLoopThread对象,并且在创建后就开启它们的事件循环

EventLoopThread* t = new EventLoopThread(cb, buf);
threads_.push_back(std::unique_ptr<EventLoopThread>(t));
loops_.push_back(t->startLoop());

当服务端接收到新的连接请求时,会按照round-robin的方式将新连接的IO任务分配到子线程上,体现在下面的代码里

EventLoop* EventLoopThreadPool::getNextLoop()
{
  baseLoop_->assertInLoopThread();
  assert(started_);
  EventLoop* loop = baseLoop_;

  if (!loops_.empty())
  {
    // round-robin
    loop = loops_[next_];
    ++next_;
    if (implicit_cast<size_t>(next_) >= loops_.size())
    {
      next_ = 0;
    }
  }
  return loop;
}

可以看到,在子线程数量大于0的情况下, next_可以看作指向当前“轮到”的子线程指针,每调用一次 EventLoopThreadPool::getNextLoop方法,指针就向后移动一位(这里是一个环形的队列)。这样做的目的是为了使IO任务尽量均匀地分配到各个工作线程上,防止某些线程负载过大或某些线程过于空闲,提高CPU资源的利用率。

总结

这篇文章介绍了muduo的多线程模型,内容相对比较简单,但也是理解muduo结构必不可少的一环,后续分析muduo服务器模型的实现时,将会介绍更多的使用细节。


标题:C++高性能服务端编程——muduo网络库源码阅读(三)多线程模型
作者:coollwd
地址:http://coollwd.top/articles/2020/11/07/1604744639176.html

Everything that kills me makes me feel alive

取消