Analyzing Ceph Network Module 1

Dec 5, 2020

The “Analyzing Ceph Network Module” series explains how Ceph daemons communicate with each other. This post gives an overview of the network architecture.

A Ceph storage cluster mainly consists of four types of daemons: OSD Daemons (osd), Ceph Monitors (mon), Ceph Managers (mgr), and Ceph Metadata Servers (mds).

The official Ceph documentation provides a very high-level diagram that depicts the Ceph architecture:

[Figure: High level Ceph architecture. [src]]

But how exactly are those libraries (librbd, librgw, libcephfs, librados) implemented to communicate with the underlying Ceph daemons in RADOS? How do the daemons communicate with each other to manage the Ceph cluster? This post consolidates several posts on these questions into one 1 2 3 4 5 6 7.

1. Ceph Client Network Architecture Overview

[Figure: High level Ceph architecture. [src]]

Ceph clients use the Messenger class (currently the only Messenger implementation in use is AsyncMessenger) to talk to the Ceph cluster. Users can access the cluster through librgw, librbd, CephFS (either the kernel client or ceph-fuse; only the latter will be covered), or librados. All of these except CephFS use librados underneath, which in turn uses AsyncMessenger for communication.
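
A user program that talks to RADOS directly therefore only needs librados; MonClient and AsyncMessenger are hidden behind it. Below is a minimal librados sketch (it assumes a reachable cluster, a client.admin keyring readable through the default ceph.conf, and an existing pool named rbd):

#include <rados/librados.hpp>
#include <iostream>

int main() {
  librados::Rados cluster;
  cluster.init("admin");                  // connect as client.admin
  cluster.conf_read_file(nullptr);        // read the default ceph.conf
  if (cluster.connect() < 0) {            // the monitor session is established here
    std::cerr << "cannot connect to cluster" << std::endl;
    return 1;
  }

  librados::IoCtx ioctx;
  cluster.ioctx_create("rbd", ioctx);     // assumes a pool named "rbd" exists

  librados::bufferlist bl;
  bl.append("hello");
  ioctx.write_full("greeting", bl);       // the OSD write goes out through AsyncMessenger

  ioctx.close();
  cluster.shutdown();
  return 0;
}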

As you can see, MonClient and MgrClient, which are used to connect to ceph-mon and ceph-mgr, also use AsyncMessenger. In other words, all daemons use the Messenger class to communicate with each other as well.

Detailed communication flow will be explained later.

2. Ceph Network Module Internal Architecture

[Figure: Ceph network architecture.]

To understand how clients and daemons communicate, we first need to know the internal network architecture used in Ceph. The figure above is a rough class diagram of the Ceph network architecture. Note that it is not a pure UML class diagram; the arrow types mean:

  1. solid line with a white (hollow) block arrowhead: indicates class inheritance.
  2. solid line with a plain line arrowhead: indicates ownership.
  3. dotted line with a plain line arrowhead: indicates reference.

[Figure: Ceph network function execution flow. The flow includes asynchronous paths too, so the y-axis should not be read as a timeline.]

The classes that I will focus on are 1. AsyncMessenger, 2. NetworkStack and Worker, 3. Processor, 4. EventCenter, and 5. AsyncConnection. These classes are tightly entangled and interact with each other to implement the asynchronous network mechanism.
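
To keep these relations in mind while reading on, here is a heavily simplified ownership sketch. It is not the real class definitions: smart pointers are used only to make ownership explicit, while the actual code mixes raw pointers, refs, singletons, and a map keyed by address.

#include <memory>
#include <vector>

struct EventCenter {};                        // per-Worker event loop (epoll on Linux)
struct Worker { EventCenter center; };        // each Worker runs in its own thread

struct NetworkStack {                         // owns every Worker
  std::vector<std::unique_ptr<Worker>> workers;
};

struct AsyncConnection { Worker *worker; };   // references the Worker it is bound to

struct Processor {                            // owned by AsyncMessenger
  Worker *worker;                             // references one Worker of the stack
};

struct AsyncMessenger {                       // top-level owner
  std::shared_ptr<NetworkStack> stack;
  std::vector<std::unique_ptr<Processor>> processors;
  std::vector<std::shared_ptr<AsyncConnection>> conns;  // an unordered_map keyed by address in the real code
};

int main() { AsyncMessenger m; (void)m; return 0; }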

1. AsyncMessenger

AsyncMessenger is responsible for maintaining a set of asynchronous connections; it may own a bind address, and the accepted connections are managed by AsyncMessenger.

ceph/src/msg/async/AsyncMessenger.h

AsyncMessenger keeps the established connections in ceph::unordered_map<entity_addrvec_t, AsyncConnectionRef> conns; connections are established via the Processor class.

In its constructor, it prepares the lower network control plane by initializing the NetworkStack and Processor classes. AsyncMessenger is the owner of these classes and is responsible for managing them.

AsyncMessenger::AsyncMessenger(CephContext *cct, entity_name_t name,
                               const std::string &type, std::string mname, uint64_t _nonce) {
  ...
  auto single = &cct->lookup_or_create_singleton_object<StackSingleton>(
    "AsyncMessenger::NetworkStack::" + transport_type, true, cct);
  // NetworkStack::create() is called in StackSingleton::ready().
  single->ready(transport_type);
  stack = single->stack.get();
  stack->start();
  ...
  unsigned processor_num = 1;
  if (stack->support_local_listen_table())
    processor_num = stack->get_num_worker();
  for (unsigned i = 0; i < processor_num; ++i)
    processors.push_back(new Processor(this, stack->get_worker(i), cct));
}

2. NetworkStack and Worker

NetworkStack is an abstraction of the network stack currently in use. Ceph provides posix, RDMA, and DPDK network stacks. This post only covers the posix network stack, which uses the POSIX socket API (bind(), listen(), send(), recv(), etc.).
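
As a reminder of what the posix stack wraps, the sketch below shows roughly the raw POSIX calls behind PosixWorker::listen() and the later accept(). It is a plain illustration, not Ceph code; port 6800 is just an example from the OSD port range.

#include <arpa/inet.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <unistd.h>
#include <cstdio>

int main() {
  // Non-blocking TCP listening socket, as the posix stack sets up.
  int fd = socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0);

  int on = 1;
  setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));

  sockaddr_in addr{};
  addr.sin_family = AF_INET;
  addr.sin_addr.s_addr = htonl(INADDR_ANY);
  addr.sin_port = htons(6800);

  if (bind(fd, (sockaddr *)&addr, sizeof(addr)) < 0) { perror("bind"); return 1; }
  if (listen(fd, 128) < 0)                           { perror("listen"); return 1; }

  // A real server registers fd with epoll and calls accept() once it becomes
  // readable; that is what EventCenter/EpollDriver and Processor::accept() do.
  int cfd = accept(fd, nullptr, nullptr);  // returns -1 (EAGAIN) here: non-blocking, no client yet
  if (cfd >= 0) close(cfd);
  close(fd);
  return 0;
}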

When a NetworkStack is created, Worker class instances are created along with it, each running in a separate thread. Worker threads free the main thread from blocking waits (such as epoll_wait()). NetworkStack is the owner of all corresponding Worker instances and is responsible for managing them.

src/msg/async/Stack.h, src/msg/async/Stack.cc

class NetworkStack {
  std::string type;
  // Overridden by PosixStack, RDMAStack, and DPDKStack.
  virtual Worker* create_worker(CephContext *c, unsigned i) = 0;
  
 protected:
  std::vector<Worker*> workers;
  explicit NetworkStack(CephContext *c, const std::string &t);

 public:
  static std::shared_ptr<NetworkStack> create(CephContext *c, const std::string &type);
};

// Static function. Stack can only be instantiated through this function.
std::shared_ptr<NetworkStack> NetworkStack::create(CephContext *c, const std::string &t) {
  std::shared_ptr<NetworkStack> stack = nullptr;
  // Create a proper stack regarding the type t
  if (t == "posix")
    stack.reset(new PosixNetworkStack(c, t));
  ...

  const int InitEventNumber = 5000;
  for (unsigned worker_id = 0; worker_id < stack->num_workers; ++worker_id) {
    // Calls PosixNetworkStack->create_worker(), which creates a new PosixWorker.
    Worker *w = stack->create_worker(c, worker_id);
    w->center.init(InitEventNumber, worker_id, t);
    stack->workers.push_back(w);
  }

  return stack;
}

Threads are not created in the NetworkStack constructor, but in NetworkStack::start(). AsyncMessenger calls this function after creating the NetworkStack instance.

src/msg/async/Stack.cc, src/msg/async/PosixStack.h

void NetworkStack::start() {
  ...
  for (unsigned i = 0; i < num_workers; ++i) {
    if (workers[i]->is_init())
      continue;
    std::function<void ()> thread = add_thread(i);
    spawn_worker(i, std::move(thread));
  }
}

std::function<void ()> NetworkStack::add_thread(unsigned worker_id) {
  Worker *w = workers[worker_id];
  return [this, w]() {
    ...
    while (!w->done) {
      // Calls epoll_wait with EpollDriver.
      w->center.process_events(EventMaxWaitUs, &dur);
    }
    w->reset();
    w->destroy();
  };
}

// spawn_worker() is declared with 'override' inside class PosixNetworkStack in PosixStack.h.
void PosixNetworkStack::spawn_worker(unsigned i, std::function<void ()> &&func) {
  threads.resize(i+1);
  threads[i] = std::thread(func);
}

I will explain this later, but this post assumes that epoll is used as the event handling mechanism. w->center.process_events() internally calls epoll_wait(), which blocks and wakes up when an event arrives.
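
The pattern is easier to see outside of Ceph. The sketch below is a minimal, self-contained version of what a Worker thread does: register a callback for a file descriptor, block in epoll_wait(), then run the callback for the ready descriptor. EventCenter and EpollDriver implement the same idea with more bookkeeping; an eventfd stands in for a listening socket here.

#include <sys/epoll.h>
#include <sys/eventfd.h>
#include <unistd.h>
#include <cstdint>
#include <cstdio>
#include <functional>
#include <map>

int main() {
  int epfd = epoll_create1(0);

  // Callbacks keyed by fd -- the role of EventCenter's registered file events.
  std::map<int, std::function<void()>> handlers;

  int efd = eventfd(0, EFD_NONBLOCK);
  handlers[efd] = [efd]() { printf("fd %d is readable, run its callback\n", efd); };

  epoll_event ee{};
  ee.events = EPOLLIN;
  ee.data.fd = efd;
  epoll_ctl(epfd, EPOLL_CTL_ADD, efd, &ee);   // what EpollDriver::add_event() boils down to

  uint64_t v = 1;
  write(efd, &v, sizeof(v));                  // make the fd readable

  // One iteration of the Worker loop: wait, then dispatch.
  epoll_event ready[16];
  int n = epoll_wait(epfd, ready, 16, 1000);  // blocks, like process_events()
  for (int i = 0; i < n; ++i)
    handlers[ready[i].data.fd]();             // analogous to EventCallback::do_request()

  close(efd);
  close(epfd);
  return 0;
}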

When an event arrives, it calls the do_request() function of the registered callback, which in this case is the listen_handler member variable of the Processor class instance.

src/msg/async/AsyncMessenger.cc

class Processor::C_processor_accept : public EventCallback {
  Processor *pro;

 public:
  explicit C_processor_accept(Processor *p): pro(p) {}
  void do_request(uint64_t id) override {
    pro->accept();
  }
};

Processor::Processor(AsyncMessenger *r, Worker *w, CephContext *c)
  : msgr(r), net(c), worker(w), listen_handler(new C_processor_accept(this)) {}

3. Processor

Processor is managed by AsyncMessenger and is responsible for:

  1. binding and listening on a socket,
  2. registering a callback (listen_handler) that is invoked when an asynchronous event arrives,
  3. and accepting connection establishment requests.

src/msg/async/AsyncMessenger.cc

int Processor::bind(const entity_addrvec_t &bind_addrs,
                    const std::set<int>& avoid_ports,
                    entity_addrvec_t* bound_addrs) {
  ...
  for (unsigned k = 0; k < bind_addrs.v.size(); ++k) {
    auto& listen_addr = bound_addrs->v[k];

    worker->center.submit_to(worker->center.get_id(),
        [this, k, &listen_addr, &opts, &r]() {
          // In this case, it calls PosixWorker::listen().
          r = worker->listen(listen_addr, k, opts, &listen_sockets[k]);
        },
        false
    );
  }
  ...
}

int Processor::start() {
  ...
  worker->center.submit_to(worker->center.get_id(),
      [this]() {
        for (auto& listen_socket : listen_sockets) {
          worker->center.create_file_event(listen_socket.fd(), EVENT_READABLE, listen_handler);
        }
      },
      false
  );
}

Processor::accept() is called by the listen_handler member of the Processor class instance when a connection request arrives.

void Processor::accept() {
  ...
  ConnectedSocket cli_socket;
  Worker *w = worker;
  listen_socket.accept(&cli_socket, opts, &addr, w);
  msgr->add_accept(w, std::move(cli_socket), msgr->get_myaddrs().v[listen_socket.get_addr_slot()], addr);
  ...
}

4. EventCenter

As you saw earlier, Processor uses EventCenter to register its callback handler for asynchronous events. EventCenter manages the asynchronous event mechanism through EventDriver, which is subclassed by EpollDriver, DPDKDriver, KqueueDriver, and SelectDriver. On Linux, EpollDriver is used by default.

When the Processor is started by AsyncMessenger, it calls create_file_event() of the EventCenter class, which internally calls the driver's add_event(); in EpollDriver this ends up calling epoll_ctl().

src/msg/async/AsyncMessenger.cc, src/msg/async/Event.cc, src/msg/async/EventEpoll.cc

int Processor::start() {
  worker->center.submit_to(worker->center.get_id(), [this]() {
    ...
    worker->center.create_file_event(listen_socket.fd(), EVENT_READABLE, listen_handler);
  }, false);
}

int EventCenter::create_file_event(int fd, int mask, EventCallbackRef ctxt) {
  ...
  driver->add_event(fd, event->mask, mask);
  ...
}


int EpollDriver::add_event(int fd, int cur_mask, int add_mask) {
  struct epoll_event ee;
  int op = cur_mask == EVENT_NONE ? EPOLL_CTL_ADD : EPOLL_CTL_MOD;

  ee.events = EPOLLET;
  if (add_mask & EVENT_READABLE) ee.events |= EPOLLIN;
  if (add_mask & EVENT_WRITABLE) ee.events |= EPOLLOUT;
  ee.data.u64 = 0;
  ee.data.fd = fd;
  epoll_ctl(epfd, op, fd, &ee);
  ...
}

Note that calling epoll_ctl() while another thread is already blocked in epoll_wait() is not a problem. The epoll man page states:

While one thread is blocked in a call to epoll_pwait(), it is possible for another thread to add a file descriptor to the waited-upon epoll instance.

Therefore, even while worker threads are already blocked waiting for epoll_wait() to return, we can add a file descriptor to the epoll instance.
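
A tiny experiment (a sketch, not Ceph code) makes this concrete: one thread blocks in epoll_wait() with an empty interest list, then another thread adds an already-readable file descriptor with epoll_ctl(), and the first thread wakes up. This mirrors the situation when Processor::start() registers the listening socket on a Worker whose loop is already running.

#include <sys/epoll.h>
#include <sys/eventfd.h>
#include <unistd.h>
#include <cstdio>
#include <thread>

int main() {
  int epfd = epoll_create1(0);

  // The "Worker": blocks in epoll_wait() with nothing registered yet.
  std::thread waiter([epfd]() {
    epoll_event ev{};
    int n = epoll_wait(epfd, &ev, 1, 5000);   // wakes once the other thread adds a ready fd
    printf("epoll_wait returned %d\n", n);
  });

  // The "Processor::start()" side: adds a readable fd afterwards.
  int efd = eventfd(1, 0);                    // initial counter 1, so it is readable right away
  epoll_event ee{};
  ee.events = EPOLLIN;
  ee.data.fd = efd;
  epoll_ctl(epfd, EPOLL_CTL_ADD, efd, &ee);

  waiter.join();
  close(efd);
  close(epfd);
  return 0;
}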

5. AsyncConnection

This class is not explained in this post. After a connection is established, Processor::accept() creates a ConnectedSocket and an AsyncConnection instance.

It will be discussed in the next post.


  1. Ceph ASyncMessenger 简析 I (A Brief Analysis of Ceph ASyncMessenger I)

  2. Ceph Async Messenger

  3. Ceph ASync Messenger

  4. Ceph async network communication source code analysis 1

  5. Ceph AsyncMessenger source analysis (part 2)

  6. Ceph network module - AsyncMessenger data structure analysis

  7. Ceph network module - AsyncMessenger code flow analysis
