
Analyzing Ceph Network Module

This post explains how Ceph daemons and clients communicate with each other, walking through the Ceph network architecture.

The official Ceph documentation provides a very high-level diagram that depicts the Ceph architecture:

[Figure: High-level Ceph architecture. [src]]

However, I could not find detailed documents explaining how clients using librados actually communicate with daemons, except for a few blog posts 1 2 3 4 5 6 7. Even after reading those, it was not clear to me how they work. Therefore, I visualized the execution flow of Ceph's network communication and summarize the concepts here.

1. Ceph Network Architecture Overview #

[Figure: Ceph network class diagram.]

The three types of arrows used in the class diagram:

  1. solid line with a white block arrow: class inheritance.
  2. solid line with a line arrow: ownership.
  3. dotted line with a line arrow: reference.

Roughly those classes can be grouped into four categories:

  • Messenger, Connection, and Processor
  • Dispatcher, DispatchQueue, and DispatchThread (DispatchThread is not illustrated in the class diagram)
  • NetworkStack and Worker
  • EventCenter and EventDriver
[Figure: Function execution flow when an OSD accepts a connection and communicates.]
[Figure: Function execution flow when a client connects to a peer server and communicates using librados.]

The function execution flows illustrated above are rough illustrations of the function calls that happen during connection establishment: the first is by an OSD, and the other is by a librados client. It is helpful to read these flow diagrams together with the code below.

2. Messenger, Connection, and Processor #

Messenger #

Central to the architecture is the Messenger class. Currently only AsyncMessenger is provided and used. Messenger manages all the other class instances: connections, processors, a dispatch queue, a network stack (which manages workers, each of which owns an EventCenter, which in turn owns an EventDriver), and dispatchers.

Every time it connects to a peer, it creates an AsyncConnection instance and returns it. To create a connection, AsyncMessenger provides the following two functions:

  • connect_to(): connect to peers with the given entity_addrvec_t& addrs address information.
  • bindv(): bind a server socket so that it can accept a connection establishment request.

Usage example (connect_to() by client):

librados::RadosClient::connect()
-> MonClient::authenticate()
-> MonClient::_reopen_session()
-> MonClient::_add_conn()
-> Messenger::connect_to_mon()
-> AsyncMessenger::connect_to()

src/librados/RadosClient.cc

int librados::RadosClient::connect() {
  ...
  monclient.build_initial_monmap();
  messenger = Messenger::create_client_messenger(cct, "radosclient");
  ...
  monclient.set_messenger(messenger);
  messenger->add_dispatcher_tail(this);
  messenger->start();
  ...
  monclient.init();
  monclient.authenticate(conf->client_mount_timeout);
  ...
}

src/mon/MonClient.cc

int MonClient::authenticate(double timeout) {
  ...
  if (!_opened())
    _reopen_session();

  ...
  auth_cond.wait(lock);
  ...
}

void MonClient::_reopen_session(int rank) {
  active_con.reset();
  pending_cons.clear();

  if (rank >= 0) _add_conn(rank, global_id);
  else _add_conns(global_id);
  ...

  for (auto& c : pending_cons) {
    c.second.start(monmap.get_epoch(), entity_name);
  }
  ...
}

int MonClient::_add_conn(unsigned rank, uint64_t global_id) {
  auto peer = monmap.get_addrs(rank);
  // type: ConnectionRef == ceph::ref_t<Connection>
  //                     == boost::intrusive_ptr<Connection>
  auto conn = messenger->connect_to_mon(peer);
  ...
}

src/msg/Messenger.h

// AsyncMessenger inherits this class
class Messenger {
  ...
  virtual ConnectionRef connect_to(
    int type, const entity_addrvec_t& dest,
    bool anon=false, bool not_local_dest=false) = 0;
  ConnectionRef connect_to_mon(const entity_addrvec_t& dest,
        bool anon=false, bool not_local_dest=false) {
	  return connect_to(CEPH_ENTITY_TYPE_MON, dest, anon, not_local_dest);
  }
  ConnectionRef connect_to_mds(const entity_addrvec_t& dest,
        bool anon=false, bool not_local_dest=false) {
	  return connect_to(CEPH_ENTITY_TYPE_MDS, dest, anon, not_local_dest);
  }
  ConnectionRef connect_to_osd(const entity_addrvec_t& dest,
        bool anon=false, bool not_local_dest=false) {
	  return connect_to(CEPH_ENTITY_TYPE_OSD, dest, anon, not_local_dest);
  }
  ConnectionRef connect_to_mgr(const entity_addrvec_t& dest,
        bool anon=false, bool not_local_dest=false) {
	  return connect_to(CEPH_ENTITY_TYPE_MGR, dest, anon, not_local_dest);
  }
  ...
}

Usage example (bindv() by an OSD server):

main() (ceph-osd.cc)
-> AsyncMessenger::bindv()

src/ceph-osd.cc

int main(int argc, const char **argv) {
  ...
  Messenger *ms_public = Messenger::create(g_ceph_context, public_msg_type,
					   entity_name_t::OSD(whoami), "client", nonce);
  Messenger *ms_cluster = Messenger::create(g_ceph_context, cluster_msg_type,
					    entity_name_t::OSD(whoami), "cluster", nonce);
  Messenger *ms_hb_back_client = Messenger::create(g_ceph_context, cluster_msg_type,
					     entity_name_t::OSD(whoami), "hb_back_client", nonce);
  Messenger *ms_hb_front_client = Messenger::create(g_ceph_context, public_msg_type,
					     entity_name_t::OSD(whoami), "hb_front_client", nonce);
  Messenger *ms_hb_back_server = Messenger::create(g_ceph_context, cluster_msg_type,
						   entity_name_t::OSD(whoami), "hb_back_server", nonce);
  Messenger *ms_hb_front_server = Messenger::create(g_ceph_context, public_msg_type,
						    entity_name_t::OSD(whoami), "hb_front_server", nonce);
  Messenger *ms_objecter = Messenger::create(g_ceph_context, public_msg_type,
					     entity_name_t::OSD(whoami), "ms_objecter", nonce);
  ...

  ms_public->bindv(public_addrs);
  ms_cluster->bindv(cluster_addrs);
  ms_hb_front_server->bindv(hb_front_addrs);
  ms_hb_front_client->client_bind(hb_front_addrs.front());
  ms_hb_back_server->bindv(hb_back_addrs);
  ms_hb_back_client->client_bind(hb_back_addrs.front());
  ...

  ms_public->start();
  ms_hb_front_client->start();
  ms_hb_back_client->start();
  ms_hb_front_server->start();
  ms_hb_back_server->start();
  ms_cluster->start();
  ...
}

Connection #

AsyncConnection maintains a logical session between two endpoints; in other words, a pair of addresses maps to exactly one AsyncConnection. AsyncConnection handles network faults and read/write transactions. If the underlying file descriptor breaks, AsyncConnection keeps the message queue and sequence numbers and tries to reconnect to the peer endpoint.

When a client connects to a peer server or a server accepts a connection request from a client peer, an AsyncConnection class instance is created. It is managed by AsyncMessenger, and any read or write is at first handled by the AsyncConnection.
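
From the caller's perspective this looks roughly as follows. This is a hypothetical usage sketch (not code from the Ceph tree), assuming a started AsyncMessenger and an OSD address vector; MPing is just an example message type.

// Hypothetical sketch: connect_to_osd() either finds the existing AsyncConnection
// for these addresses or creates a new one.
ConnectionRef con = messenger->connect_to_osd(osd_addrs);
con->send_message(new MPing());   // queued on the connection, written asynchronously
// If the underlying socket breaks later, the AsyncConnection keeps its outgoing
// queue and sequence state and reconnects to the same peer.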

AsyncMessenger::connect_to()
-> AsyncMessenger::create_connect()
-> ceph::make_ref<AsyncConnection>()
-> ceph::ref_t<AsyncConnection>({new AsyncConnection(...), false})
    == boost::intrusive_ptr<AsyncConnection>({new AsyncConnection(...), false})

src/msg/async/AsyncMessenger.cc

ConnectionRef AsyncMessenger::connect_to(int type,
					const entity_addrvec_t& addrs,
					bool anon, bool not_local_dest) {
  ...
  AsyncConnectionRef conn = _lookup_conn(av);
  if (!conn) {
    conn = create_connect(av, type, false);
  }
  return conn;
}

AsyncConnectionRef AsyncMessenger::create_connect(
          const entity_addrvec_t& addrs, int type, bool anon) {
  ...
  Worker *w = stack->get_worker();
  auto conn = ceph::make_ref<AsyncConnection>(cct, this, &dispatch_queue, w,
						target.is_msgr2(), false);
  conn->connect(addrs, type, target);
  return conn;
}

Server processes create an AsyncConnection when they receive a connection establishment request, so a full understanding requires discussing the asynchronous mechanism behind how the accept() callback is invoked. Here, however, let's just look at the class instantiation.

Processor::C_processor_accept::do_request()
-> Processor::accept() (-> PosixServerSocketImpl::accept() -> accept_cloexec())
-> AsyncMessenger::add_accept()
-> ceph::make_ref<AsyncConnection>()

src/msg/async/AsyncMessenger.cc

class Processor::C_processor_accept : public EventCallback {
  Processor *pro;
  explicit C_processor_accept(Processor *p): pro(p) {}
  void do_request(uint64_t id) override {
    pro->accept();
  }
};

void Processor::accept() {
  ...
  for (auto& listen_socket : listen_sockets) {
    ...
    ConnectedSocket cli_socket;
    listen_socket.accept(&cli_socket, opts, &addr, w);
    msgr->add_accept(
            w, std::move(cli_socket),
            msgr->get_myaddrs().v[listen_socket.get_addr_slot()],
            addr);
    continue;
  }
}

// listen_socket.accept() in src/msg/async/PosixStack.cc
int PosixServerSocketImpl::accept(ConnectedSocket *sock, const SocketOptions &opt, entity_addr_t *out, Worker *w) {
  int sd = accept_cloexec(_fd, (sockaddr*)&ss, &slen);
  ...

  std::unique_ptr<PosixConnectedSocketImpl> csi(new PosixConnectedSocketImpl(handler, *out, sd, true));
  *sock = ConnectedSocket(std::move(csi));
  return 0;
}

// msgr->add_accept() in src/msg/async/AsyncMessenger.cc
void AsyncMessenger::add_accept(Worker *w, ConnectedSocket cli_socket,
				const entity_addr_t &listen_addr,
				const entity_addr_t &peer_addr) {
  auto conn = ceph::make_ref<AsyncConnection>(cct, this, &dispatch_queue, w,
						listen_addr.is_msgr2(), false);
  conn->accept(std::move(cli_socket), listen_addr, peer_addr);
  accepting_conns.insert(conn);
}

Processor #

Currently, Processor seems to be used only by servers, not by clients. It is responsible for:

  • bind(): bind and listen to a socket so that the socket can receive a request.
  • accept(): handle accept callback event.

When binding a server socket:

AsyncMessenger::bindv()
-> Processor::bind()
-> PosixWorker::listen()
-> ::bind() and ::listen() and new PosixServerSocketImpl()

src/msg/async/AsyncMessenger.cc

int AsyncMessenger::bindv(const entity_addrvec_t &bind_addrs) {
  ...
  for (auto &&p : processors) {
    p->bind(bind_addrs, avoid_ports, &bound_addrs);
  }
  ...
}

int Processor::bind(const entity_addrvec_t &bind_addrs,
		    const std::set<int>& avoid_ports,
		    entity_addrvec_t* bound_addrs) {
  for (unsigned k = 0; k < bind_addrs.v.size(); ++k) {
    worker->center.submit_to(
      worker->center.get_id(),
      [this, k, &listen_addr, &opts, &r]() {
        r = worker->listen(listen_addr, k, opts, &listen_sockets[k]);
      }, false
    );
  }
}

Now the lambda function containing the worker->listen() call is submitted to the EventCenter and will be executed asynchronously later; for now, let's just see what it looks like. The asynchronous mechanism is discussed later.

src/msg/async/PosixStack.cc

int PosixWorker::listen(entity_addr_t &sa, unsigned addr_slot,
			const SocketOptions &opt, ServerSocket *sock) {
  int listen_sd = net.create_socket(sa.get_family(), true);
  ::bind(listen_sd, sa.get_sockaddr(), sa.get_sockaddr_len());
  ::listen(listen_sd, cct->_conf->ms_tcp_listen_backlog);

  *sock = ServerSocket(
      std::unique_ptr<PosixServerSocketImpl>(
          new PosixServerSocketImpl(net, listen_sd, sa, addr_slot)));
  return 0;
}

When the created socket file descriptor has an event, EventCenter calls Processor::C_processor_accept::do_request() as a callback (also discussed later). This callback function actually accepts the request and creates a ConnectedSocket and an AsyncConnection.

Processor::C_processor_accept::do_request()
-> Processor::accept() (-> PosixServerSocketImpl::accept() -> accept_cloexec(), and new PosixConnectedSocketImpl())
-> AsyncMessenger::add_accept()
-> ceph::make_ref<AsyncConnection>()

This process was already covered in the Messenger section, so I skip the duplicated explanation here.

3. Dispatcher, DispatchQueue, and DispatchThread #

Dispatcher is a virtual class that declares several callback handlers; several classes inherit Dispatcher and override the callback handlers to implement their functionality. For example, OSD (src/osd/OSD.h) inherits Dispatcher and implements ms_dispatch(Message *m) to handle messages from clients. librados::RadosClient (src/librados/RadosClient.h) also inherits Dispatcher to wait for replies from server processes.

src/osd/OSD.h

class OSD : public Dispatcher, public md_config_obs_t {

private:
  bool ms_can_fast_dispatch_any() const override { return true; }
  bool ms_can_fast_dispatch(const Message *m) const override {
    ...
  }

  void ms_fast_dispatch(Message *m) override;
  bool ms_dispatch(Message *m) override;
  void ms_handle_connect(Connection *con) override;
  void ms_handle_fast_connect(Connection *con) override;
  void ms_handle_fast_accept(Connection *con) override;
  int ms_handle_authentication(Connection *con) override;
  bool ms_handle_reset(Connection *con) override;
  void ms_handle_remote_reset(Connection *con) override {}
  bool ms_handle_refused(Connection *con) override;
  ...
}

src/librados/RadosClient.h

class librados::RadosClient : public Dispatcher, public md_config_obs_t {
private:
  bool _dispatch(Message *m);
  bool ms_dispatch(Message *m) override;

  void ms_handle_connect(Connection *con) override;
  bool ms_handle_reset(Connection *con) override;
  void ms_handle_remote_reset(Connection *con) override;
  bool ms_handle_refused(Connection *con) override;
  ...
}
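
Putting the interface together, a minimal dispatcher might look like the following. This is a hypothetical sketch for illustration, not code from the Ceph tree; only the overridden handler signatures mirror the real Dispatcher interface.

#include <iostream>
#include "msg/Dispatcher.h"
#include "msg/Message.h"

// Hypothetical dispatcher: consumes every message and implements the
// connection fault handlers as no-ops.
class MyDispatcher : public Dispatcher {
 public:
  explicit MyDispatcher(CephContext *cct) : Dispatcher(cct) {}

  bool ms_dispatch(Message *m) override {
    std::cout << "received message type " << m->get_type() << std::endl;
    m->put();      // drop our reference once the message is handled
    return true;   // true: this dispatcher consumed the message
  }
  bool ms_handle_reset(Connection *con) override { return false; }
  void ms_handle_remote_reset(Connection *con) override {}
  bool ms_handle_refused(Connection *con) override { return false; }
};

// Registration (discussed below): messenger->add_dispatcher_tail(&my_dispatcher);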

The ms_handle_*() functions are called by the DispatchThread when a queue item has been enqueued into the DispatchQueue. A DispatchThread and its underlying thread are created when you call Messenger::add_dispatcher_head() or Messenger::add_dispatcher_tail() for the first time: dispatchers are required to call these functions so that the Messenger can invoke the registered dispatcher callbacks.

For OSD daemon, an OSD class instance (which inherited Dispatcher) adds itself to an AsyncMessenger class instance by calling add_dispatcher_tail(this) during OSD initialization:

main() (ceph-osd.cc)
-> OSD::init()
-> AsyncMessenger::add_dispatcher_tail()
-> AsyncMessenger::ready()
-> DispatchQueue::start()
-> Thread::create()
-> Thread::try_create()
-> pthread_create()
-> (thread) Thread::entry_wrapper()
-> (thread) DispatchThread::entry()
-> (thread) DispatchQueue::entry()

src/msg/Messenger.h

void Messenger::add_dispatcher_tail(Dispatcher *d) {
  bool first = dispatchers.empty();
  dispatchers.push_back(d);
  if (d->ms_can_fast_dispatch_any())
    fast_dispatchers.push_back(d);
  if (first)
    ready();
}

src/msg/async/AsyncMessenger.cc

void AsyncMessenger::ready() {
  stack->ready();
  ...
  
  dispatch_queue.start(); // Here, dispatch queue is started.
}

src/msg/DispatchQueue.cc

void DispatchQueue::start() {
  ...
  dispatch_thread.create("ms_dispatch");
}

src/common/Thread.cc

// DispatchThread inherits Thread.
void Thread::create(const char *name, size_t stacksize) {
  ...
  try_create(stacksize);
}

int Thread::try_create(size_t stacksize) {
  ...
  pthread_create(&thread_id, thread_attr, _entry_func, (void*)this);
  ...
}

void *Thread::_entry_func(void *arg) {
  void *r = ((Thread*)arg)->entry_wrapper();
  return r;
}

void *Thread::entry_wrapper() {
  ...
  return entry();
}

// Note that entry() of DispatchThread is overridden:
// in src/msg/DispatchQueue.h
class DispatchThread : public Thread {
  DispatchQueue *dq;

public:
  explicit DispatchThread(DispatchQueue *dq) : dq(dq) {}
  void *entry() override {
    dq->entry();
    return 0;
  }
} dispatch_thread;

Note that dq->entry() calls DispatchQueue::entry(), which runs in an additional dispatch thread and handles enqueued items by calling the Messenger::ms_deliver_handle_*() functions, which in turn call the Dispatchers' callback functions:

src/msg/DispatchQueue.cc

void DispatchQueue::entry() {
  while (true) {
    while (!mqueue.empty()) {
      QueueItem qitem = mqueue.dequeue();
      ...
      if (qitem.is_code()) {
        switch (qitem.get_code()) {
          case D_BAD_REMOTE_RESET:
            msgr->ms_deliver_handle_remote_reset(qitem.get_connection());
            break;
          case D_CONNECT:
            msgr->ms_deliver_handle_connect(qitem.get_connection());
            break;
          case D_ACCEPT:
            msgr->ms_deliver_handle_accept(qitem.get_connection());
            break;
          case D_BAD_RESET:
            msgr->ms_deliver_handle_reset(qitem.get_connection());
            break;
          case D_CONN_REFUSED:
            msgr->ms_deliver_handle_refused(qitem.get_connection());
            break;
          default:
            ceph_abort();
        }
      } else {
        const ref_t<Message>& m = qitem.get_message();
        uint64_t msize = pre_dispatch(m);
        msgr->ms_deliver_dispatch(m);
        post_dispatch(m, msize);
      }

      l.lock();
    }

    if (stop)
      break;

    // wait for something to be put on queue
    cond.wait(l);
  }
}

For instance, if a message is delivered, msgr->ms_deliver_dispatch(m) is called:

src/msg/Messenger.h

void Messenger::ms_deliver_dispatch(const ceph::ref_t<Message> &m) {
  m->set_dispatch_stamp(ceph_clock_now());
  for (const auto &dispatcher : dispatchers) {
    dispatcher->ms_dispatch2(m);
  }
}

// Note that ms_dispatch2 is a tricky implementation that actually calls ms_dispatch.
// src/msg/Dispatcher.h
/* ms_dispatch2 because otherwise the child must define both */
virtual bool ms_dispatch2(const MessageRef &m) {
  MessageRef mr(m);
  return ms_dispatch(mr.get());
}

Here we can see that Dispatcher instances added themselves to the Messenger and are stored in the dispatchers variable: the Messenger iterates over those registered dispatchers and calls their implemented ms_dispatch() function.

4. NetworkStack and Worker #

NetworkStack represents which network stack you want to use for communication. Currently Ceph provides POSIX, RDMA, and DPDK (PosixNetworkStack, RDMANetworkStack, and DPDKNetworkStack, respectively). PosixNetworkStack, for example, uses POSIX library APIs for network communication (e.g. accept_cloexec(), connect(), sendmsg(), and read()).

NetworkStack spawns multiple worker threads, each associated with a Worker class instance. These threads are independent of the dispatch thread; worker threads are responsible for waiting for messages from peers and sending messages to them.

NetworkStack and Workers are created when AsyncMessenger is created.

// Clients use Messenger::create_client_messenger(), which calls Messenger::create().
// Servers call Messenger::create() directly.
Messenger::create() (new AsyncMessenger() -> NetworkStack::create() -> new PosixNetworkStack() and PosixNetworkStack::create_worker())
-> NetworkStack::start()
-> PosixNetworkStack::spawn_worker()
-> std::thread()
-> (thread) lambda function defined in NetworkStack::add_thread()

src/msg/Messenger.cc and src/msg/async/AsyncMessenger.cc

// in src/msg/Messenger.cc
Messenger *Messenger::create(CephContext *cct, const std::string &type,
			     entity_name_t name, std::string lname, uint64_t nonce) {
  return new AsyncMessenger(cct, name, type, std::move(lname), nonce);
}

// in src/msg/async/AsyncMessenger.cc
AsyncMessenger::AsyncMessenger(CephContext *cct, entity_name_t name,
                               const std::string &type, std::string mname, uint64_t _nonce)
    : SimplePolicyMessenger(cct, name),
      dispatch_queue(cct, this, mname),
      nonce(_nonce) {
  std::string transport_type = "posix";
  // StackSingleton ensures that there is exactly one network stack in the process.
  // Return type: StackSingleton&
  auto single = &cct->lookup_or_create_singleton_object<StackSingleton>(
    "AsyncMessenger::NetworkStack::" + transport_type, true, cct);
  single->ready(transport_type);
  stack = single->stack.get();
  stack->start();
  ...
}

struct StackSingleton {
  ...
  void ready(std::string &type) {
    if (!stack) stack = NetworkStack::create(cct, type);
  }
};

src/msg/async/Stack.cc and src/msg/async/PosixStack.h

std::shared_ptr<NetworkStack> NetworkStack::create(CephContext *c,
						   const std::string &t) {
  std::shared_ptr<NetworkStack> stack = nullptr;
  if (t == "posix") stack.reset(new PosixNetworkStack(c));
  ...

  for (unsigned worker_id = 0; worker_id < stack->num_workers; ++worker_id) {
    Worker *w = stack->create_worker(c, worker_id);
    ...
  }
  return stack;
}

Worker *PosixNetworkStack::create_worker(CephContext *c, unsigned worker_id) {
  return new PosixWorker(c, worker_id);
}

src/msg/async/Stack.cc and src/msg/async/PosixStack.h

void NetworkStack::start() {
  ...
  for (unsigned i = 0; i < num_workers; ++i) {
    if (workers[i]->is_init()) continue;
    std::function<void ()> thread = add_thread(i);
    spawn_worker(i, std::move(thread));
  }
}

void PosixNetworkStack::spawn_worker(unsigned i, std::function<void ()> &&func) {
  threads.resize(i+1);
  threads[i] = std::thread(func);
}

Note that func is provided by the add_thread() function, which returns a lambda for the thread to execute:

src/msg/async/Stack.cc

std::function<void()> NetworkStack::add_thread(unsigned worker_id) {
  Worker* w = workers[worker_id];
  return [this, w]() {
    ...
    w->initialize();
    w->init_done();
    while (!w->done) {
      int r = w->center.process_events(EventMaxWaitUs, &dur);
    }
    w->reset();
    w->destroy();
  };
}

The number of threads to be created depends on the Ceph configuration value ms_async_op_threads. The default value is 3, and you can override it in ceph.conf. Note that there is also an ms_async_max_op_threads option, 5 by default, which limits the maximum number of threads.

Created threads create their own EventCenter and wait for events in parallel, which will be discussed in Section 5.

Socket subclasses #

Socket file descriptors are encapsulated into Ceph's classes (ServerSocket and ConnectedSocket, which hold a ServerSocketImpl and a ConnectedSocketImpl respectively), and each type of network stack subclasses them, as sketched after the list below:

  • PosixNetworkStack -> use PosixServerSocketImpl and PosixConnectedSocketImpl
  • RDMANetworkStack -> use RDMAServerSocketImpl and RDMAConnectedSocketImpl
  • DPDKNetworkStack -> use DPDKServerSocketImpl and DPDKConnectedSocketImpl
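
Conceptually, the split between the handle classes and the per-stack implementations looks like the following simplified sketch. These are not the actual declarations in src/msg/async/Stack.h, which carry more methods (connection state, shutdown, fd, and so on); the sketch only shows the delegation pattern.

#include <memory>
#include "include/buffer.h"   // ceph::buffer::list

// Implemented once per network stack (POSIX, RDMA, DPDK).
struct ConnectedSocketImpl {
  virtual ssize_t read(char *buf, size_t len) = 0;
  virtual ssize_t send(ceph::buffer::list &bl, bool more) = 0;
  virtual ~ConnectedSocketImpl() = default;
};

// Handle used by AsyncConnection; it simply forwards to the chosen implementation.
class ConnectedSocket {
  std::unique_ptr<ConnectedSocketImpl> _csi;    // Posix/RDMA/DPDK implementation
 public:
  ConnectedSocket() = default;
  explicit ConnectedSocket(std::unique_ptr<ConnectedSocketImpl> csi)
    : _csi(std::move(csi)) {}
  ssize_t read(char *buf, size_t len) { return _csi->read(buf, len); }
  ssize_t send(ceph::buffer::list &bl, bool more) { return _csi->send(bl, more); }
};

The POSIX implementation, for instance, backs send() with ::sendmsg(), as shown in the Sending messages subsection below.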

When an OSD binds a server socket:

main() (ceph-osd.cc)
-> AsyncMessenger::bindv()
-> AsyncMessenger::Processor::bind()
-> PosixWorker::listen() (-> ::bind(), ::listen(), and new PosixServerSocketImpl())

src/msg/async/AsyncMessenger.cc and src/msg/async/PosixStack.cc

int AsyncMessenger::bindv(const entity_addrvec_t &bind_addrs) {
  ...
  for (auto &&p : processors) {
    p->bind(bind_addrs, avoid_ports, &bound_addrs);
    ...
  }
}

int Processor::bind(const entity_addrvec_t &bind_addrs,
		    const std::set<int>& avoid_ports,
		    entity_addrvec_t* bound_addrs) {
  worker->center.submit_to(
    worker->center.get_id(),
    [this, k, &listen_addr, &opts, &r]() {
      r = worker->listen(listen_addr, k, opts, &listen_sockets[k]);
    },
    false
  );
  ...
}

// in src/msg/async/PosixStack.cc
int PosixWorker::listen(entity_addr_t& sa,
                        unsigned addr_slot,
                        const SocketOptions& opt,
                        ServerSocket* sock) {
  int listen_sd = net.create_socket(sa.get_family(), true);
  net.set_nonblock(listen_sd);
  net.set_socket_options(listen_sd, opt.nodelay, opt.rcbuf_size);

  ::bind(listen_sd, sa.get_sockaddr(), sa.get_sockaddr_len());
  ::listen(listen_sd, cct->_conf->ms_tcp_listen_backlog);
  
  *sock = ServerSocket(std::unique_ptr<PosixServerSocketImpl>(
      new PosixServerSocketImpl(net, listen_sd, sa, addr_slot)));
  return 0;
}

Note that the ServerSocket and ConnectedSocket illustrated in the Connection subsection refer to these classes.

All I/O passes through these socket classes; this is discussed in a little more detail below.

Sending messages #

Before moving on to the next section, let's talk about sending messages, since the worker threads introduced above are for receiving them asynchronously.

Very simply, upper class instances (e.g. OSD) use Connection::send_message() to send a message to the peer.

src/osd/OSD.cc

bool OSD::ms_dispatch(Message *m) {
  ...
  _dispatch(m);
}

void OSD::_dispatch(Message *m) {
  switch (m->get_type()) {
  ...
  case MSG_COMMAND:
    handle_command(static_cast<MCommand*>(m));
    return;
  }
  ...
}

void OSD::handle_command(MCommand *m) {
  ConnectionRef con = m->get_connection();
  auto session = ceph::ref_cast<Session>(con->get_priv());
  if (!session) {
    con->send_message(new MCommandReply(m, -EACCES));
    m->put();
    return;
  }
  ...
}

Then, AsyncConnection internally uses ConnectedSocket to send data to the peer:

AsyncConnection::write()
-> AsyncConnection::_try_send()
-> PosixConnectedSocketImpl::send()
-> PosixConnectedSocketImpl::do_sendmsg()
-> ::sendmsg()

src/msg/async/AsyncConnection.cc and src/msg/async/PosixStack.cc

ssize_t AsyncConnection::write(ceph::buffer::list &bl,
                               std::function<void(ssize_t)> callback,
                               bool more) {
  ...
  outgoing_bl.claim_append(bl);
  ssize_t r = _try_send(more);
  if (r > 0) {
    writeCallback = callback;
  }
  return r;
}

ssize_t AsyncConnection::_try_send(bool more) {
  ...
  // type of variable cs: ConnectedSocket
  cs.send(outgoing_bl, more);
  ...
}

// src/msg/async/PosixStack.cc
ssize_t send(ceph::buffer::list &bl, bool more) override {
  ...
  uint64_t left_pbrs = bl.get_num_buffers();
  while (left_pbrs) {
    ...
    do_sendmsg(_fd, msg, msglen, left_pbrs || more);
    ...
  }
}

#ifndef _WIN32
static ssize_t do_sendmsg(int fd, struct msghdr &msg, unsigned len, bool more) {
  size_t sent = 0;
  while (1) {
    ssize_t r = ::sendmsg(fd, &msg, MSG_NOSIGNAL | (more ? MSG_MORE : 0));
    sent += r;
    if (len == sent) break;
    ...
  }
  return (ssize_t) sent;
}

5. EventCenter and EventDriver #

EventCenter is responsible for passing events from remote peers to worker threads. An instance is created and owned by each worker thread mentioned above. The worker threads use EventCenter to receive events (e.g. a connection request on a ServerSocket, a message arriving on a ConnectedSocket, a connection reset, etc.).

Initialization #

First, when PosixWorker instances are created, each instance initializes its EventCenter as well.

src/msg/async/PosixStack.h and src/msg/async/Stack.h

class PosixWorker : public Worker {
  ceph::NetHandle net;
  void initialize() override;
 public:
  PosixWorker(CephContext *c, unsigned i) : Worker(c, i), net(c) {}
  ...
};

class Worker {
  ...
  Worker(CephContext *c, unsigned worker_id)
    : cct(c), perf_logger(NULL), id(worker_id), references(0), center(c) {
    ...
  }
}

The Worker constructor initializes the EventCenter member variable named center.

After creating EventCenter instances, all workers call center.process_events() in an infinite loop to process pending events asynchronously:

src/msg/async/Stack.cc

// lambda function defined in NetworkStack::add_thread
// executed by worker threads
[this, w]() {
  ...
  w->initialize();
  w->init_done();
  while (!w->done) {
    ...
    w->center.process_events(EventMaxWaitUs, &dur);
    ...
  }
};

EventCenter::process_events() internally uses EventDriver to receive events.

EventDriver #

EventDriver, like NetworkStack, is an abstract class for the event polling mechanism. On Linux, Ceph uses epoll by default, kqueue on BSD, and so on: it provides four types of EventDriver: EpollDriver, SelectDriver, KqueueDriver, and DPDKDriver.

src/msg/async/Event.cc and src/msg/async/EventEpoll.cc

int EventCenter::init(int nevent, unsigned center_id, const std::string &type) {
  if (type == "dpdk") {
#ifdef HAVE_DPDK
    driver = new DPDKDriver(cct);
#endif
  } else {
#ifdef HAVE_EPOLL
  driver = new EpollDriver(cct);
#else
#ifdef HAVE_KQUEUE
  driver = new KqueueDriver(cct);
#else
  driver = new SelectDriver(cct);
#endif
#endif
  }
  ...
}

int EventCenter::process_events(unsigned timeout_microseconds,  ceph::timespan *working_dur) {
  ...
  std::vector<FiredFileEvent> fired_events;
  numevents = driver->event_wait(fired_events, &tv);
  for (int event_id = 0; event_id < numevents; event_id++) {
    FileEvent *event = _get_file_event(fired_events[event_id].fd);
    EventCallbackRef cb = event->read_cb;  // or event->write_cb, depending on the fired mask
    cb->do_request(fired_events[event_id].fd);
  }

  ...
}

int EpollDriver::event_wait(std::vector<FiredFileEvent> &fired_events, struct timeval *tvp) {
  int retval = epoll_wait(epfd, events, nevent,
                          tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1);
  ...
  return numevents;
}

When the worker thread receives an event, it calls the registered callback via cb->do_request(). What is that callback?

Event Callback (receiving messages) #

Naturally, we must register a callback function to be called when an event arrives. Ceph provides the EventCallback class: users implement a child class of EventCallback and override EventCallback::do_request(), the function that the worker thread calls when an event occurs.
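
As a minimal hypothetical sketch (the class name and the registration comment are made up; only EventCallback, do_request(), EVENT_READABLE, and create_file_event() come from the real API):

#include <iostream>
#include "msg/async/Event.h"

// Hypothetical callback: invoked by a worker thread's event loop whenever the
// registered fd becomes readable.
class C_print_readable : public EventCallback {
 public:
  void do_request(uint64_t fd_or_id) override {
    std::cout << "fd " << fd_or_id << " is readable" << std::endl;
  }
};

// Registering it with a worker's EventCenter (epoll under the hood on Linux):
//   center.create_file_event(fd, EVENT_READABLE, new C_print_readable());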

For example, AsyncMessenger::Processor has Processor::C_processor_accept class, which is used to accept a connection request in an asynchronous manner:

src/msg/async/AsyncMessenger.cc

class Processor::C_processor_accept : public EventCallback {
  Processor *pro;

 public:
  explicit C_processor_accept(Processor *p): pro(p) {}
  void do_request(uint64_t id) override {
    pro->accept();
  }
};

void Processor::accept() {
  for (auto& listen_socket : listen_sockets) {
    while (true) {
      ConnectedSocket cli_socket;
      listen_socket.accept(&cli_socket, opts, &addr, w);
      msgr->add_accept(w, std::move(cli_socket),
                       msgr->get_myaddrs().v[listen_socket.get_addr_slot()],
                       addr);
      ...
    }
  }
}

When a Processor instance is created, a Processor::C_processor_accept instance is created as well, and it is passed, together with the server socket file descriptor, to the EventCenter when the processor starts.

src/msg/async/AsyncMessenger.cc

Processor::Processor(AsyncMessenger *r, Worker *w, CephContext *c)
  : msgr(r), net(c), worker(w),
    listen_handler(new C_processor_accept(this)) {}

void Processor::start() {
  worker->center.submit_to(
      worker->center.get_id(),
      [this]() {
        // (simplified: the real code loops over listen_sockets)
        worker->center.create_file_event(listen_socket.fd(), EVENT_READABLE, listen_handler);
      },
      false);
}

EventCenter::create_file_event() uses EventDriver again to register the given file descriptor with the I/O multiplexer (epoll in this post):

src/msg/async/Event.cc and src/msg/async/EventEpoll.cc

int EventCenter::create_file_event(int fd, int mask, EventCallbackRef ctxt) {
  ...
  driver->add_event(fd, event->mask, mask);
  ...
}

int EpollDriver::add_event(int fd, int cur_mask, int add_mask) {
  ...
  epoll_ctl(epfd, op, fd, &ee);
  ...
}

As mentioned above, the worker threads wait to receive events from epoll, while other threads add file descriptors to the epoll instance; when an event occurs, a worker thread wakes up and handles it by calling the registered do_request() callback function.

For instance, accepting a connection request is handled as follows:

EventCenter::process_events() (-> EventDriver::event_wait() -> epoll_wait())
-> Processor::C_processor_accept::do_request()
-> Processor::accept() (-> PosixServerSocketImpl::accept() -> ::accept_cloexec() and new PosixConnectedSocketImpl())
-> AsyncMessenger::add_accept()
-> ceph::make_ref<AsyncConnection>() // AsyncConnection is created with the accepted socket fd and ConnectedSocket class instance

  1. Ceph ASyncMessenger: A Brief Analysis, Part I

  2. Ceph Async Messenger

  3. Ceph ASync Messenger

  4. Ceph async network communication source code analysis 1

  5. Ceph AsyncMessenger source analysis (below)

  6. Ceph network module - AsyncMessenger data structure analysis

  7. Ceph network module - AsyncMessenger code flow analysis