Analyzing Ceph Network Module
This post explains how Ceph daemons and clients communicate with each other, along with the Ceph network architecture.
The official Ceph documentation provides a very high-level diagram that depicts the Ceph architecture:
However, I could not find detailed documents explaining how clients with librados actually communicate with daemons, except a few blog posts 1 2 3 4 5 6 7. Even after reading those, it was not clear to me how they work. Therefore, I visualize the execution flow of Ceph network communication and summarize the concepts here.
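To make the client side concrete, below is a minimal librados program (a sketch assuming a reachable cluster, a default ceph.conf with monitor addresses, and a client named admin with a keyring); its connect() call is what triggers the librados::RadosClient::connect() flow analyzed later in this post:
#include <rados/librados.hpp>
#include <iostream>

int main() {
  librados::Rados cluster;

  // "admin" is the client name used here for illustration; adjust to your setup.
  if (cluster.init("admin") < 0) {
    std::cerr << "init failed" << std::endl;
    return 1;
  }

  // Read monitor addresses and keyring from the default ceph.conf.
  cluster.conf_read_file(nullptr);

  // connect() triggers librados::RadosClient::connect(), the entry point of
  // the connection flow analyzed in this post.
  if (cluster.connect() < 0) {
    std::cerr << "connect failed" << std::endl;
    return 1;
  }

  std::cout << "connected" << std::endl;
  cluster.shutdown();
  return 0;
}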
1. Ceph Network Architecture Overview #
Explanations of three types of arrows used in the class diagram:
- solid line with white block arrow: indicates class inheritance.
- solid line with line arrow: indicates ownership.
- dotted line with line arrow: indicates reference.
Roughly, those classes can be grouped into four categories:
- Messenger, Connection, and Processor
- Dispatcher, DispatchQueue, and DispatchThread (DispatchThread is not illustrated in the class diagram)
- NetworkStack and Worker
- EventCenter and EventDriver
The function execution flows illustrated above are a rough illustration of the function calls that happen during connection establishment. The first one is by OSD, and the other one is by librados. It would be helpful to view these flow diagrams together with the code below.
2. Messenger, Connection, and Processor #
Messenger #
Central to the architecture is the Messenger class. Currently only AsyncMessenger is provided and used.
Messenger manages all the other class instances: connections, processors, a dispatch queue, a network stack (which manages workers, each of which owns an EventCenter, which in turn owns an EventDriver), and dispatchers.
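To keep these relationships in mind while reading on, here is a compile-only sketch of the ownership structure. These are not the real Ceph declarations; the member names and types are invented and heavily reduced:
#include <memory>
#include <set>
#include <vector>

// NOT real Ceph declarations: a compile-only map of the ownership structure
// described above, with invented member names and reduced types.
struct EventDriver {};                                    // epoll/kqueue/select/DPDK backend
struct EventCenter { std::unique_ptr<EventDriver> driver; };
struct Worker { EventCenter center; };                    // one per worker thread
struct NetworkStack { std::vector<std::unique_ptr<Worker>> workers; };

struct Connection {};                                     // one per peer session
struct Processor {};                                      // server-side bind()/accept()
struct DispatchQueue {};                                  // drained by the dispatch thread
struct Dispatcher {};                                     // OSD, RadosClient, ...

struct Messenger {
  std::shared_ptr<NetworkStack> stack;                    // posix / rdma / dpdk
  std::vector<Processor> processors;                      // hold the listen sockets
  std::set<Connection*> connections;                      // looked up by peer address
  DispatchQueue dispatch_queue;                           // feeds registered dispatchers
  std::vector<Dispatcher*> dispatchers;                   // added via add_dispatcher_tail()
};

int main() { Messenger m; (void)m; }                      // structure only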
Every time it connects to a peer, it creates an AsyncConnection instance and returns it. To create a connection, AsyncMessenger provides the following two functions:
- connect_to(): connect to peers with the given entity_addrvec_t& addrs address information.
- bindv(): bind a server socket so that it can accept connection establishment requests.
Usage example (connect_to() by a client):
librados::RadosClient::connect()
-> MonClient::authenticate()
-> MonClient::_reopen_session()
-> MonClient::_add_conn()
-> Messenger::connect_to_mon()
-> AsyncMessenger::connect_to()
src/librados/RadosClient.cc
int librados::RadosClient::connect() {
...
monclient.build_initial_monmap();
messenger = Messenger::create_client_messenger(cct, "radosclient");
...
monclient.set_messenger(messenger);
messenger->add_dispatcher_tail(this);
messenger->start();
...
monclient.init();
monclient.authenticate(conf->client_mount_timeout);
...
}
src/mon/MonClient.cc
int MonClient::authenticate(double timeout) {
...
if (!_opened())
_reopen_session();
...
auth_cond.wait(lock);
...
}
void MonClient::_reopen_session(int rank) {
active_con.reset();
pending_cons.clear();
if (rank >= 0) _add_conn(rank, global_id);
else _add_conns(global_id);
...
for (auto& c : pending_cons) {
c.second.start(monmap.get_epoch(), entity_name);
}
...
}
int MonClient::_add_conn(unsigned rank, uint64_t global_id) {
auto peer = monmap.get_addrs(rank);
// type: ConnectionRef == ceph::ref_t<Connection>
// == boost::intrusive_ptr<Connection>
auto conn = messenger->connect_to_mon(peer);
...
}
src/msg/Messenger.h
// AsyncMessenger inherits this class
class Messenger {
...
virtual ConnectionRef connect_to(
int type, const entity_addrvec_t& dest,
bool anon=false, bool not_local_dest=false) = 0;
ConnectionRef connect_to_mon(const entity_addrvec_t& dest,
bool anon=false, bool not_local_dest=false) {
return connect_to(CEPH_ENTITY_TYPE_MON, dest, anon, not_local_dest);
}
ConnectionRef connect_to_mds(const entity_addrvec_t& dest,
bool anon=false, bool not_local_dest=false) {
return connect_to(CEPH_ENTITY_TYPE_MDS, dest, anon, not_local_dest);
}
ConnectionRef connect_to_osd(const entity_addrvec_t& dest,
bool anon=false, bool not_local_dest=false) {
return connect_to(CEPH_ENTITY_TYPE_OSD, dest, anon, not_local_dest);
}
ConnectionRef connect_to_mgr(const entity_addrvec_t& dest,
bool anon=false, bool not_local_dest=false) {
return connect_to(CEPH_ENTITY_TYPE_MGR, dest, anon, not_local_dest);
}
...
}
Usage example (bindv() by an OSD server):
main() (ceph-osd.cc)
-> AsyncMessenger::bindv()
src/ceph-osd.cc
int main(int argc, const char **argv) {
...
Messenger *ms_public = Messenger::create(g_ceph_context, public_msg_type,
entity_name_t::OSD(whoami), "client", nonce);
Messenger *ms_cluster = Messenger::create(g_ceph_context, cluster_msg_type,
entity_name_t::OSD(whoami), "cluster", nonce);
Messenger *ms_hb_back_client = Messenger::create(g_ceph_context, cluster_msg_type,
entity_name_t::OSD(whoami), "hb_back_client", nonce);
Messenger *ms_hb_front_client = Messenger::create(g_ceph_context, public_msg_type,
entity_name_t::OSD(whoami), "hb_front_client", nonce);
Messenger *ms_hb_back_server = Messenger::create(g_ceph_context, cluster_msg_type,
entity_name_t::OSD(whoami), "hb_back_server", nonce);
Messenger *ms_hb_front_server = Messenger::create(g_ceph_context, public_msg_type,
entity_name_t::OSD(whoami), "hb_front_server", nonce);
Messenger *ms_objecter = Messenger::create(g_ceph_context, public_msg_type,
entity_name_t::OSD(whoami), "ms_objecter", nonce);
...
ms_public->bindv(public_addrs);
ms_cluster->bindv(cluster_addrs);
ms_hb_front_server->bindv(hb_front_addrs);
ms_hb_front_client->client_bind(hb_front_addrs.front());
ms_hb_back_server->bindv(hb_back_addrs);
ms_hb_back_client->client_bind(hb_back_addrs.front());
...
ms_public->start();
ms_hb_front_client->start();
ms_hb_back_client->start();
ms_hb_front_server->start();
ms_hb_back_server->start();
ms_cluster->start();
...
}
Connection #
AsyncConnection maintains a logical session between two endpoints; in other words, a pair of addresses maps to exactly one AsyncConnection. AsyncConnection handles network faults and read/write transactions: if the file descriptor breaks, AsyncConnection keeps the message queue and sequence and tries to reconnect to the peer endpoint.
When a client connects to a peer server, or a server accepts a connection request from a client peer, an AsyncConnection class instance is created.
It is managed by AsyncMessenger, and any read or write is first handled by the AsyncConnection.
AsyncMessenger::connect_to()
-> AsyncMessenger::create_connect()
-> ceph::make_ref<AsyncConnection>()
-> new ceph::ref_t<AsyncConnection>({new AsyncConnection(...), false})
== boost::intrusive_ptr<AsyncConnection>({new AsyncConnection(...), false})
src/msg/async/AsyncMessenger.cc
ConnectionRef AsyncMessenger::connect_to(int type,
const entity_addrvec_t& addrs,
bool anon, bool not_local_dest) {
...
AsyncConnectionRef conn = _lookup_conn(av);
if (!conn) {
conn = create_connect(av, type, false);
}
return conn;
}
AsyncConnectionRef AsyncMessenger::create_connect(
const entity_addrvec_t& addrs, int type, bool anon) {
...
Worker *w = stack->get_worker();
auto conn = ceph::make_ref<AsyncConnection>(cct, this, &dispatch_queue, w,
target.is_msgr2(), false);
conn->connect(addrs, type, target);
return conn;
}
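As the flow above shows, the returned ConnectionRef is essentially a boost::intrusive_ptr. If that smart pointer is unfamiliar, the following standalone sketch shows the ref-counting behavior it provides; FakeConnection is invented for illustration, and unlike the flow above it passes true as the second constructor argument because its counter starts at zero (Ceph's refcounted objects start with a count of one, hence the false):
#include <boost/intrusive_ptr.hpp>
#include <atomic>
#include <iostream>

// FakeConnection is invented for illustration; it only shows the intrusive
// ref-counting that boost::intrusive_ptr (and hence ConnectionRef) relies on.
class FakeConnection {
  std::atomic<int> refs{0};
  friend void intrusive_ptr_add_ref(FakeConnection* c) { ++c->refs; }
  friend void intrusive_ptr_release(FakeConnection* c) {
    if (--c->refs == 0) delete c;
  }
public:
  void send() { std::cout << "sending on shared connection\n"; }
};

using FakeConnectionRef = boost::intrusive_ptr<FakeConnection>;

int main() {
  // The second argument 'true' bumps the count on construction; this sketch's
  // counter starts at zero, unlike Ceph's refcounted objects.
  FakeConnectionRef conn(new FakeConnection(), true);
  FakeConnectionRef other = conn;   // count == 2; both point to the same object
  other->send();
  return 0;                         // both refs drop, the object is deleted
}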
A server process creates an AsyncConnection when it receives a connection establishment request, which means that fully understanding it requires discussing the asynchronous mechanism behind how the accept() callback is called.
Here, however, let's just look at the class instantiation.
Processor::C_processor_accept::do_request()
-> Processor::accept() (-> PosixServerSocketImpl::accept() -> accept_cloexec())
-> AsyncMessenger::add_accept()
-> ceph::make_ref<AsyncConnection>()
src/msg/async/AsyncMessenger.cc
class Processor::C_processor_accept : public EventCallback {
Processor *pro;
explicit C_processor_accept(Processor *p): pro(p) {}
void do_request(uint64_t id) override {
pro->accept();
}
};
void Processor::accept() {
...
for (auto& listen_socket : listen_sockets) {
...
ConnectedSocket cli_socket;
listen_socket.accept(&cli_socket, opts, &addr, w);
msgr->add_accept(
w, std::move(cli_socket),
msgr->get_myaddrs().v[listen_socket.get_addr_slot()],
addr);
continue;
}
}
// listen_socket.accept() in src/msg/async/PosixStack.cc
int PosixServerSocketImpl::accept(ConnectedSocket *sock, const SocketOptions &opt, entity_addr_t *out, Worker *w) {
int sd = accept_cloexec(_fd, (sockaddr*)&ss, &slen);
...
std::unique_ptr<PosixConnectedSocketImpl> csi(new PosixConnectedSocketImpl(handler, *out, sd, true));
*sock = ConnectedSocket(std::move(csi));
return 0;
}
// msgr->add_accept() in src/msg/async/AsyncMessenger.cc
void AsyncMessenger::add_accept(Worker *w, ConnectedSocket cli_socket,
const entity_addr_t &listen_addr,
const entity_addr_t &peer_addr) {
auto conn = ceph::make_ref<AsyncConnection>(cct, this, &dispatch_queue, w,
listen_addr.is_msgr2(), false);
conn->accept(std::move(cli_socket), listen_addr, peer_addr);
accepting_conns.insert(conn);
}
Processor #
Currently, Processor seems to be used only by servers, not by clients.
It is responsible for:
- bind(): bind and listen on a socket so that the socket can receive requests.
- accept(): handle the accept callback event.
When binding a server socket:
AsyncMessenger::bindv()
-> Processor::bind()
-> PosixWorker::listen()
-> ::bind() and ::listen() and new PosixServerSocketImpl()
src/msg/async/AsyncMessenger.cc
int AsyncMessenger::bindv(const entity_addrvec_t &bind_addrs) {
...
for (auto &&p : processors) {
p->bind(bind_addrs, avoid_ports, &bound_addrs);
}
...
}
int Processor::bind(const entity_addrvec_t &bind_addrs,
const std::set<int>& avoid_ports,
entity_addrvec_t* bound_addrs) {
for (unsigned k = 0; k < bind_addrs.v.size(); ++k) {
worker->center.submit_to(
worker->center.get_id(),
[this, k, &listen_addr, &opts, &r]() {
r = worker->listen(listen_addr, k, opts, &listen_sockets[k]);
}, false
);
}
}
Now the lambda function that contains the worker->listen() call is submitted to the EventCenter and will be executed asynchronously later, but let's first see how it looks. The asynchronous mechanism will be discussed later.
src/msg/async/PosixStack.cc
int PosixWorker::listen(entity_addr_t &sa, unsigned addr_slot,
const SocketOptions &opt, ServerSocket *sock) {
int listen_sd = net.create_socket(sa.get_family(), true);
::bind(listen_sd, sa.get_sockaddr(), sa.get_sockaddr_len());
::listen(listen_sd, cct->_conf->ms_tcp_listen_backlog);
*sock = ServerSocket(
std::unique_ptr<PosixServerSocketImpl>(
new PosixServerSocketImpl(net, listen_sd, sa, addr_slot)));
return 0;
}
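Stripped of Ceph's wrappers, the listen path above boils down to a handful of standard POSIX calls. Here is a minimal standalone equivalent; port 6800 and the backlog of 128 are arbitrary values chosen for the example, not Ceph defaults:
#include <arpa/inet.h>
#include <fcntl.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <unistd.h>
#include <cstdint>
#include <cstdio>

// Create a non-blocking TCP listen socket: essentially what PosixWorker::listen()
// does before wrapping the fd into a ServerSocket.
int make_listen_socket(uint16_t port) {
  int fd = ::socket(AF_INET, SOCK_STREAM | SOCK_CLOEXEC, 0);
  if (fd < 0) return -1;

  int on = 1;
  ::setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
  ::fcntl(fd, F_SETFL, O_NONBLOCK);   // workers poll for events, so never block

  sockaddr_in sa{};
  sa.sin_family = AF_INET;
  sa.sin_addr.s_addr = htonl(INADDR_ANY);
  sa.sin_port = htons(port);
  if (::bind(fd, reinterpret_cast<sockaddr*>(&sa), sizeof(sa)) < 0 ||
      ::listen(fd, 128) < 0) {
    ::close(fd);
    return -1;
  }
  return fd;   // Ceph would now wrap this fd into PosixServerSocketImpl
}

int main() {
  int fd = make_listen_socket(6800);
  std::printf("listen fd: %d\n", fd);
  if (fd >= 0) ::close(fd);
  return 0;
}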
When the created socket file descriptor has an event, EventCenter calls Processor::C_processor_accept::do_request() as a callback (this will also be discussed later).
This callback function actually accepts a request and creates a ConnectedSocket and an AsyncConnection.
Processor::C_processor_accept::do_request()
-> Processor::accept() (-> PosixServerSocketImpl::accept() -> accept_cloexec(), and new PosixConnectedSocketImpl())
-> AsyncMessenger::add_accept()
-> ceph::make_ref<AsyncConnection>()
This process was already covered in the Messenger section, so I will skip the duplicated explanation here.
3. Dispatcher, DispatchQueue, and DispatchThread #
Dispatcher is a virtual class that declares several callback handlers; several classes inherit the Dispatcher class and override the callback handlers to implement their functionality.
For example, OSD (src/osd/OSD.h) inherits Dispatcher and implements ms_dispatch(Message *m) to handle messages from clients.
librados::RadosClient (src/librados/RadosClient.h) also inherits Dispatcher to wait for replies from server processes.
src/osd/OSD.h
class OSD : public Dispatcher, public md_config_obs_t {
private:
bool ms_can_fast_dispatch_any() const override { return true; }
bool ms_can_fast_dispatch(const Message *m) const override {
...
}
void ms_fast_dispatch(Message *m) override;
bool ms_dispatch(Message *m) override;
void ms_handle_connect(Connection *con) override;
void ms_handle_fast_connect(Connection *con) override;
void ms_handle_fast_accept(Connection *con) override;
int ms_handle_authentication(Connection *con) override;
bool ms_handle_reset(Connection *con) override;
void ms_handle_remote_reset(Connection *con) override {}
bool ms_handle_refused(Connection *con) override;
...
}
src/librados/RadosClient.h
class librados::RadosClient : public Dispatcher, public md_config_obs_t {
private:
bool _dispatch(Message *m);
bool ms_dispatch(Message *m) override;
void ms_handle_connect(Connection *con) override;
bool ms_handle_reset(Connection *con) override;
void ms_handle_remote_reset(Connection *con) override;
bool ms_handle_refused(Connection *con) override;
...
}
The ms_handle_*() functions are called by the DispatchThread when a queue item is enqueued into the DispatchQueue.
The DispatchThread and its underlying thread are created when Messenger::add_dispatcher_head() or Messenger::add_dispatcher_tail() is called for the first time: dispatchers are required to call these functions so that the Messenger can call the registered dispatcher functions.
For the OSD daemon, an OSD class instance (which inherits Dispatcher) adds itself to an AsyncMessenger class instance by calling add_dispatcher_tail(this) during OSD initialization:
main() (ceph-osd.cc)
-> OSD::init()
-> AsyncMessenger::add_dispatcher_tail()
-> AsyncMessenger::ready()
-> DispatchQueue::start()
-> Thread::create()
-> Thread::try_create()
-> pthread_create()
-> (thread) Thread::entry_wrapper()
-> (thread) DispatchThread::entry()
-> (thread) DispatchQueue::entry()
src/msg/Messenger.h
void Messenger::add_dispatcher_tail(Dispatcher *d) {
bool first = dispatchers.empty();
dispatchers.push_back(d);
if (d->ms_can_fast_dispatch_any())
fast_dispatchers.push_back(d);
if (first)
ready();
}
src/msg/async/AsyncMessenger.cc
void AsyncMessenger::ready() {
stack->ready();
...
dispatch_queue.start(); // Here, dispatch queue is started.
}
src/msg/DispatchQueue.cc
void DispatchQueue::start() {
...
dispatch_thread.create("ms_dispatch");
}
src/common/Thread.cc
// DispatchThread inherits Thread.
void Thread::create(const char *name, size_t stacksize) {
...
try_create(stacksize);
}
int Thread::try_create(size_t stacksize) {
...
pthread_create(&thread_id, thread_attr, _entry_func, (void*)this);
...
}
void *Thread::_entry_func(void *arg) {
void *r = ((Thread*)arg)->entry_wrapper();
return r;
}
void *Thread::entry_wrapper() {
...
return entry();
}
// Note that entry() of DispatchThread is overridden:
// in src/msg/DispatchQueue.h
class DispatchThread : public Thread {
DispatchQueue *dq;
public:
explicit DispatchThread(DispatchQueue *dq) : dq(dq) {}
void *entry() override {
dq->entry();
return 0;
}
} dispatch_thread;
Note that dq->entry() calls DispatchQueue::entry(), which runs in an additional dispatch thread; it handles enqueued items by calling the Messenger::ms_deliver_handle_*() functions, which in turn call the Dispatchers' callback functions:
src/msg/DispatchQueue.cc
void DispatchQueue::entry() {
std::unique_lock l{lock};
while (true) {
while (!mqueue.empty()) {
QueueItem qitem = mqueue.dequeue();
...
if (qitem.is_code()) {
switch (qitem.get_code()) {
case D_BAD_REMOTE_RESET:
msgr->ms_deliver_handle_remote_reset(qitem.get_connection());
break;
case D_CONNECT:
msgr->ms_deliver_handle_connect(qitem.get_connection());
break;
case D_ACCEPT:
msgr->ms_deliver_handle_accept(qitem.get_connection());
break;
case D_BAD_RESET:
msgr->ms_deliver_handle_reset(qitem.get_connection());
break;
case D_CONN_REFUSED:
msgr->ms_deliver_handle_refused(qitem.get_connection());
break;
default:
ceph_abort();
}
} else {
const ref_t<Message>& m = qitem.get_message();
uint64_t msize = pre_dispatch(m);
msgr->ms_deliver_dispatch(m);
post_dispatch(m, msize);
}
l.lock();
}
if (stop)
break;
// wait for something to be put on queue
cond.wait(l);
}
}
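The loop above is a classic producer/consumer queue. The following self-contained sketch (all names invented) mimics what DispatchQueue and DispatchThread do together: one thread enqueues items, a dedicated thread drains the queue and calls back into a registered handler, and the drain thread sleeps on a condition variable when the queue is empty:
#include <chrono>
#include <condition_variable>
#include <deque>
#include <functional>
#include <iostream>
#include <mutex>
#include <string>
#include <thread>

// All names are invented; this only reproduces the enqueue/drain/callback
// pattern of DispatchQueue + DispatchThread in a self-contained form.
class MiniDispatchQueue {
  std::mutex m;
  std::condition_variable cond;
  std::deque<std::string> queue;
  bool stop = false;
  std::function<void(const std::string&)> handler;
  std::thread thread;

public:
  explicit MiniDispatchQueue(std::function<void(const std::string&)> h)
    : handler(std::move(h)), thread([this] { entry(); }) {}  // ~ DispatchThread

  void enqueue(std::string msg) {
    { std::lock_guard<std::mutex> l(m); queue.push_back(std::move(msg)); }
    cond.notify_one();
  }

  void entry() {                       // ~ DispatchQueue::entry()
    std::unique_lock<std::mutex> l(m);
    while (true) {
      while (!queue.empty()) {
        std::string item = std::move(queue.front());
        queue.pop_front();
        l.unlock();
        handler(item);                 // ~ msgr->ms_deliver_dispatch(m)
        l.lock();
      }
      if (stop) break;
      cond.wait(l);                    // wait for something to be put on queue
    }
  }

  ~MiniDispatchQueue() {
    { std::lock_guard<std::mutex> l(m); stop = true; }
    cond.notify_one();
    thread.join();
  }
};

int main() {
  MiniDispatchQueue dq([](const std::string& m) {
    std::cout << "dispatched: " << m << "\n";
  });
  dq.enqueue("osd_op");
  dq.enqueue("osd_op_reply");
  std::this_thread::sleep_for(std::chrono::milliseconds(50));
  return 0;
}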
For instance, if a message is delivered, msgr->ms_deliver_dispatch(m) is called:
src/msg/Messenger.h
void Messenger::ms_deliver_dispatch(const ceph::ref_t<Message> &m) {
m->set_dispatch_stamp(ceph_clock_now());
for (const auto &dispatcher : dispatchers) {
dispatcher->ms_dispatch2(m);
}
}
// Note that ms_dispatch2 is a tricky implementation that actually calls ms_dispatch.
// src/msg/Dispatcher.h
/* ms_dispatch2 because otherwise the child must define both */
virtual bool Dispatcher::ms_dispatch2(const MessageRef &m) {
MessageRef mr(m);
return ms_dispatch(mr.get()); // simplified; the real implementation also manages the message reference
}
Here, we know that Dispatcher class instances added themselves to the Messenger, so they are stored in the dispatchers variable: the Messenger iterates over those added dispatchers and calls the implemented ms_dispatch() function.
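The registration-and-fan-out pattern itself is simple. Here is a self-contained sketch (class names invented) of dispatchers adding themselves to a messenger that later iterates over them; in this sketch the first dispatcher that handles a message wins, which is one reasonable policy rather than a claim about Ceph's exact behavior:
#include <iostream>
#include <string>
#include <vector>

// Invented class names: a self-contained sketch of dispatchers registering
// themselves with a messenger that later fans messages out to them.
struct MiniDispatcher {
  virtual ~MiniDispatcher() = default;
  virtual bool ms_dispatch(const std::string& msg) = 0;   // return true if handled
};

class MiniMessenger {
  std::vector<MiniDispatcher*> dispatchers;
public:
  void add_dispatcher_tail(MiniDispatcher* d) { dispatchers.push_back(d); }
  void ms_deliver_dispatch(const std::string& msg) {
    for (auto* d : dispatchers)
      if (d->ms_dispatch(msg))   // in this sketch, the first handler wins
        return;
  }
};

struct MiniOsd : MiniDispatcher {
  bool ms_dispatch(const std::string& msg) override {
    std::cout << "OSD handled " << msg << "\n";
    return true;
  }
};

int main() {
  MiniMessenger msgr;
  MiniOsd osd;
  msgr.add_dispatcher_tail(&osd);     // like OSD adding itself during init
  msgr.ms_deliver_dispatch("osd_op");
  return 0;
}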
4. NetworkStack and Worker #
NetworkStack represents which network stack you want to use for communication. Currently, Ceph provides POSIX, RDMA, and DPDK stacks (PosixNetworkStack, RDMANetworkStack, and DPDKNetworkStack, respectively).
PosixNetworkStack, for example, uses POSIX library APIs for network communication (e.g. accept_cloexec(), connect(), sendmsg(), and read()).
NetworkStack spawns multiple worker threads, each represented by a Worker class instance.
These threads are independent from the dispatch thread; Worker threads are responsible for waiting for messages from peers and sending messages to them.
NetworkStack and Workers are created when AsyncMessenger is created.
// Clients use Messenger::create_client_messenger() to call Messenger::create().
// Servers directly call Messenger::create().
Messenger::create() (new AsyncMessenger() -> NetworkStack::create() -> new PosixNetworkStack() and PosixNetworkStack::create_worker())
-> PosixNetworkStack::start()
-> PosixNetworkStack::spawn_worker()
-> std::thread()
-> (thread) lambda function defined in NetworkStack::add_thread()
src/msg/Messenger.cc
and src/msg/async/AsyncMessenger.cc
// in src/msg/Messenger.cc
Messenger *Messenger::create(CephContext *cct, const std::string &type,
entity_name_t name, std::string lname, uint64_t nonce) {
return new AsyncMessenger(cct, name, type, std::move(lname), nonce);
}
// in src/msg/async/AsyncMessenger.cc
AsyncMessenger::AsyncMessenger(CephContext *cct, entity_name_t name,
const std::string &type, std::string mname, uint64_t _nonce)
: SimplePolicyMessenger(cct, name),
dispatch_queue(cct, this, mname),
nonce(_nonce) {
std::string transport_type = "posix";
// StackSingleton ensures that there is exactly one network stack in the process.
// Return type: StackSingleton&
auto single = &cct->lookup_or_create_singleton_object<StackSingleton>(
"AsyncMessenger::NetworkStack::" + transport_type, true, cct);
single->ready(transport_type);
stack = single->stack.get();
stack->start();
...
}
struct StackSingleton {
...
void ready(std::string &type) {
if (!stack) stack = NetworkStack::create(cct, type);
}
};
src/msg/async/Stack.cc
and src/msg/async/PosixStack.h
std::shared_ptr<NetworkStack> NetworkStack::create(CephContext *c,
const std::string &t) {
std::shared_ptr<NetworkStack> stack = nullptr;
if (t == "posix") stack.reset(new PosixNetworkStack(c));
...
for (unsigned worker_id = 0; worker_id < stack->num_workers; ++worker_id) {
Worker *w = stack->create_worker(c, worker_id);
...
}
return stack;
}
Worker *PosixNetworkStack::create_worker(CephContext *c, unsigned worker_id) {
return new PosixWorker(c, worker_id);
}
src/msg/async/Stack.cc
and src/msg/async/PosixStack.h
void NetworkStack::start() {
...
for (unsigned i = 0; i < num_workers; ++i) {
if (workers[i]->is_init()) continue;
std::function<void ()> thread = add_thread(i);
spawn_worker(i, std::move(thread));
}
}
void PosixNetworkStack::spawn_worker(unsigned i, std::function<void ()> &&func) {
threads.resize(i+1);
threads[i] = std::thread(func);
}
Note that func is the lambda function returned by add_thread(), which the spawned thread executes:
src/msg/async/Stack.cc
std::function<void()> NetworkStack::add_thread(unsigned worker_id) {
Worker* w = workers[worker_id];
return [this, w]() {
...
w->initialize();
w->init_done();
while (!w->done) {
int r = w->center.process_events(EventMaxWaitUs, &dur);
}
w->reset();
w->destroy();
};
}
The number of threads to be created depends on the Ceph configuration value ms_async_op_threads [^cephmsfconf]. The default value is 3, and you can override it by specifying the number in ceph.conf.
Note that there is also an ms_async_max_op_threads configuration option, 5 by default, that limits the maximum number of threads.
The created threads create their own EventCenter and wait for events in parallel, which will be discussed in the next subsection.
Socket subclasses #
Socket file descriptors are encapsulated into Ceph's classes (ServerSocket and ConnectedSocket, which hold a ServerSocketImpl and a ConnectedSocketImpl respectively), and the implementations are subclassed for each type of network stack, as sketched right after the list:
- PosixNetworkStack -> uses PosixServerSocketImpl and PosixConnectedSocketImpl
- RDMANetworkStack -> uses RDMAServerSocketImpl and RDMAConnectedSocketImpl
- DPDKNetworkStack -> uses DPDKServerSocketImpl and DPDKConnectedSocketImpl
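Here is a minimal sketch of that interface/implementation split; the names and methods are simplified inventions, showing only how the outer class stays stack-agnostic and forwards to whichever backend implementation it was constructed with:
#include <iostream>
#include <memory>

// Simplified names and invented methods: the outer class is stack-agnostic and
// forwards to whichever backend implementation it was constructed with.
struct MiniServerSocketImpl {
  virtual ~MiniServerSocketImpl() = default;
  virtual int accept() = 0;   // returns a connected fd, or -1
};

struct MiniPosixServerSocketImpl : MiniServerSocketImpl {
  int listen_fd;
  explicit MiniPosixServerSocketImpl(int fd) : listen_fd(fd) {}
  int accept() override {
    // a real implementation would call ::accept4()/accept_cloexec() here
    std::cout << "accept on listen fd " << listen_fd << "\n";
    return -1;
  }
};

class MiniServerSocket {
  std::unique_ptr<MiniServerSocketImpl> impl;
public:
  explicit MiniServerSocket(std::unique_ptr<MiniServerSocketImpl> i)
    : impl(std::move(i)) {}
  int accept() { return impl->accept(); }   // forward to the backend
};

int main() {
  MiniServerSocket sock(std::make_unique<MiniPosixServerSocketImpl>(42));
  sock.accept();
  return 0;
}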
If OSD binds a server socket:
main() (ceph-osd.cc)
-> AsyncMessenger::bindv()
-> AsyncMessenger::Processor::bind()
-> PosixWorker::listen() (-> ::bind(), ::listen(), and new PosixServerSocketImpl())
src/msg/async/AsyncMessenger.cc
and src/msg/async/PosixStack.cc
int AsyncMessenger::bindv(const entity_addrvec_t &bind_addrs) {
...
for (auto &&p : processors) {
p->bind(bind_addrs, avoid_ports, &bound_addrs);
...
}
}
int Processor::bind(const entity_addrvec_t &bind_addrs,
const std::set<int>& avoid_ports,
entity_addrvec_t* bound_addrs) {
worker->center.submit_to(
worker->center.get_id(),
[this, k, &listen_addr, &opts, &r]() {
r = worker->listen(listen_addr, k, opts, &listen_sockets[k]);
},
false
);
...
}
// in src/msg/async/PosixStack.cc
int PosixWorker::listen(entity_addr_t& sa,
unsigned addr_slot,
const SocketOptions& opt,
ServerSocket* sock) {
int listen_sd = net.create_socket(sa.get_family(), true);
net.set_nonblock(listen_sd);
net.set_socket_options(listen_sd, opt.nodelay, opt.rcbuf_size);
::bind(listen_sd, sa.get_sockaddr(), sa.get_sockaddr_len());
::listen(listen_sd, cct->_conf->ms_tcp_listen_backlog);
*sock = ServerSocket(std::unique_ptr<PosixServerSocketImpl>(
new PosixServerSocketImpl(net, listen_sd, sa, addr_slot)));
return 0;
}
Note that ServerSocket and ConnectedSocket illustrated in the Connection subsection refer to these classes.
All I/O communication is passed to and from those socket classes. This will be discussed in a bit more detail below.
Sending messages #
Before going to the next subsection, let's talk about sending messages, since the worker threads are for receiving them asynchronously.
Very simply, upper-layer class instances (e.g. OSD) use Connection::send_message() to send a message to the peer.
src/osd/OSD.cc
bool OSD::ms_dispatch(Message *m) {
...
_dispatch(m);
}
void OSD::_dispatch(Message *m) {
switch (m->get_type()) {
...
case MSG_COMMAND:
handle_command(static_cast<MCommand*>(m));
return;
}
...
}
void OSD::handle_command(MCommand *m) {
ConnectionRef con = m->get_connection();
auto session = ceph::ref_cast<Session>(con->get_priv());
if (!session) {
con->send_message(new MCommandReply(m, -EACCES));
m->put();
return;
}
...
}
Then, AsyncConnection internally uses ConnectedSocket to send data to the peer:
AsyncConnection::write()
-> AsyncConnection::_try_send()
-> PosixConnectedSocketImpl::send()
-> PosixConnectedSocketImpl::do_sendmsg()
-> ::sendmsg()
src/msg/async/AsyncConnection.cc
and src/msg/async/PosixStack.cc
ssize_t AsyncConnection::write(ceph::buffer::list &bl,
std::function<void(ssize_t)> callback,
bool more) {
...
outgoing_bl.claim_append(bl);
ssize_t r = _try_send(more);
if (r > 0) {
writeCallback = callback;
}
return r;
}
ssize_t AsyncConnection::_try_send(bool more) {
...
// type of variable cs: ConnectedSocket
cs.send(outgoing_bl, more);
...
}
// src/msg/async/PosixStack.cc
ssize_t send(ceph::buffer::list &bl, bool more) override {
...
uint64_t left_pbrs = bl.get_num_buffers();
while (left_pbrs) {
...
do_sendmsg(_fd, msg, msglen, left_pbrs || more);
...
}
}
#ifndef _WIN32
static ssize_t do_sendmsg(int fd, struct msghdr &msg, unsigned len, bool more) {
size_t sent = 0;
while (1) {
ssize_t r = ::sendmsg(fd, &msg, MSG_NOSIGNAL | (more ? MSG_MORE : 0));
sent += r;
if (len == sent) break;
...
}
return (ssize_t) sent;
}
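For reference, here is a standalone illustration of the ::sendmsg() call that do_sendmsg() wraps: it gathers two buffers into a single syscall over a socketpair, sets MSG_NOSIGNAL, and shows where the MSG_MORE hint would go when more data follows. The helper name is made up for the example:
#include <sys/socket.h>
#include <sys/uio.h>
#include <unistd.h>
#include <string>

// Gather two buffers into one ::sendmsg() call. MSG_NOSIGNAL turns a closed
// peer into an EPIPE error instead of a SIGPIPE; MSG_MORE hints that more
// data will follow (Ceph sets it while buffers remain in the list).
ssize_t send_two_buffers(int fd, const std::string& a, const std::string& b,
                         bool more) {
  iovec iov[2];
  iov[0].iov_base = const_cast<char*>(a.data());
  iov[0].iov_len  = a.size();
  iov[1].iov_base = const_cast<char*>(b.data());
  iov[1].iov_len  = b.size();

  msghdr msg{};
  msg.msg_iov    = iov;
  msg.msg_iovlen = 2;
  return ::sendmsg(fd, &msg, MSG_NOSIGNAL | (more ? MSG_MORE : 0));
}

int main() {
  int sv[2];
  if (::socketpair(AF_UNIX, SOCK_STREAM, 0, sv) < 0) return 1;

  ssize_t n = send_two_buffers(sv[0], "header:", "payload", false);

  char buf[64] = {};
  ::read(sv[1], buf, sizeof(buf) - 1);   // buf now holds "header:payload"

  ::close(sv[0]);
  ::close(sv[1]);
  return n < 0 ? 1 : 0;
}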
5. EventCenter and EventDriver #
EventCenter is responsible for passing events from remote peers to worker threads. A class instance is created and owned by each worker thread mentioned above.
The worker threads use EventCenter to receive events (e.g. a connection request on a ServerSocket, a message arriving on a ConnectedSocket, a connection reset, etc.).
Initialization #
At first, when PosixWorker instances are created, each class instance initializes EventCenter as well.
src/msg/async/PosixStack.h
and src/msg/async/Stack.h
class PosixWorker : public Worker {
ceph::NetHandle net;
void initialize() override;
public:
PosixWorker(CephContext *c, unsigned i) : Worker(c, i), net(c) {}
...
};
class Worker {
...
Worker(CephContext *c, unsigned worker_id)
: cct(c), perf_logger(NULL), id(worker_id), references(0), center(c) {
...
}
}
The Worker constructor initializes the EventCenter member variable named center.
After creating the EventCenter instances, all workers call center.process_events() in an infinite loop to process pending events asynchronously:
src/msg/async/Stack.cc
// lambda function defined in NetworkStack::add_thread
// executed by worker threads
[this, w]() {
...
w->initialize();
w->init_done();
while (!w->done) {
...
w->center.process_events(EventMaxWaitUs, &dur);
...
}
};
EventCenter::process_events() internally uses EventDriver to receive events.
EventDriver #
EventDriver, like NetworkStack, is an abstract class for the event control mechanism. On Linux, Ceph uses epoll by default, kqueue on BSD, and so on: there are four types of EventDriver: EpollDriver, SelectDriver, KqueueDriver, and DPDKDriver.
src/msg/async/Event.cc
and src/msg/async/EventEpoll.cc
int EventCenter::init(int nevent, unsigned center_id, const std::string &type) {
if (type == "dpdk") {
#ifdef HAVE_DPDK
driver = new DPDKDriver(cct);
#endif
} else {
#ifdef HAVE_EPOLL
driver = new EpollDriver(cct);
#else
#ifdef HAVE_KQUEUE
driver = new KqueueDriver(cct);
#else
driver = new SelectDriver(cct);
#endif
#endif
}
...
}
int EventCenter::process_events(unsigned timeout_microseconds, ceph::timespan *working_dur) {
...
std::vector<FiredFileEvent> fired_events;
numevents = driver->event_wait(fired_events, &tv);
for (int event_id = 0; event_id < numevents; event_id++) {
FileEvent *event = _get_file_event(fired_events[event_id].fd);
EventCallbackRef cb = event->read_cb; // simplified: readable/writable mask checks omitted
cb->do_request(fired_events[event_id].fd);
}
...
}
int EpollDriver::event_wait(std::vector<FiredFileEvent> &fired_events, struct timeval *tvp) {
int retval = epoll_wait(epfd, events, nevent,
tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1);
...
return numevents;
}
When the worker thread receives an event, it calls the registered callback function cb->do_request().
What is it?
Event Callback (receiving messages) #
It is natural that we must register a callback function to be called when an event arrives.
Ceph provides the EventCallback class and asks users to implement a child class of EventCallback and override EventCallback::do_request(), the function that is called by the worker thread when an event occurs.
For example, AsyncMessenger::Processor has the Processor::C_processor_accept class, which is used to accept a connection request in an asynchronous manner:
src/msg/async/AsyncMessenger.cc
class Processor::C_processor_accept : public EventCallback {
Processor *pro;
public:
explicit C_processor_accept(Processor *p): pro(p) {}
void do_request(uint64_t id) override {
pro->accept();
}
};
void Processor::accept() {
for (auto& listen_socket : listen_sockets) {
while (true) {
ConnectedSocket cli_socket;
listen_socket.accept(&cli_socket, opts, &addr, w);
msgr->add_accept(w, std::move(cli_socket),
msgr->get_myaddrs().v[listen_socket.get_addr_slot()],
addr);
...
}
}
}
When a Processor class instance is created, a Processor::C_processor_accept class instance is created as well, and it is passed together with the server socket file descriptor to the EventCenter when the processor starts.
src/msg/async/AsyncMessenger.cc
Processor::Processor(AsyncMessenger *r, Worker *w, CephContext *c)
: msgr(r), net(c), worker(w),
listen_handler(new C_processor_accept(this)) {}
void Processor::start() {
worker->center.submit_to(
worker->center.get_id(),
[this]() {
worker->center.create_file_event(listen_socket.fd(), EVENT_READABLE, listen_handler);
},
false
);
}
EventCenter::create_file_event() uses EventDriver again to register the passed file descriptor to the I/O multiplexer (in this post, epoll):
src/msg/async/Event.cc
and src/msg/async/EventEpoll.cc
int EventCenter::create_file_event(int fd, int mask, EventCallbackRef ctxt) {
...
driver->add_event(fd, event->mask, mask);
...
}
int EpollDriver::add_event(int fd, int cur_mask, int add_mask) {
...
epoll_ctl(epfd, op, fd, &ee);
...
}
As mentioned above, the worker threads wait to receive events from epoll while other threads add file descriptors to it; when an event occurs, a worker thread wakes up and handles it by calling the do_request() callback function.
For instance, accepting a connection request is handled as follows:
EventCenter::process_events() (-> EventDriver::event_wait() -> epoll_wait())
-> Processor::C_processor_accept::do_request()
-> Processor::accept() (-> PosixServerSocketImpl::accept() -> ::accept_cloexec() and new PosixConnectedSocketImpl())
-> AsyncMessenger::add_accept()
-> ceph::make_ref<AsyncConnection>() // AsyncConnection is created with the accepted socket fd and ConnectedSocket class instance
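To wrap up, the following compact, self-contained sketch (plain POSIX epoll, not Ceph code) reproduces the pattern described in this section end to end: register a callback for a file descriptor, wait for events, and invoke the callback when the descriptor fires, mirroring how EventCenter, EpollDriver, and C_processor_accept cooperate to accept new connections:
#include <arpa/inet.h>
#include <netinet/in.h>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <unistd.h>
#include <cstdio>
#include <functional>
#include <map>

int main() {
  // A listening socket on an ephemeral loopback port.
  int listen_fd = ::socket(AF_INET, SOCK_STREAM, 0);
  sockaddr_in sa{};
  sa.sin_family = AF_INET;
  sa.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
  sa.sin_port = 0;
  ::bind(listen_fd, reinterpret_cast<sockaddr*>(&sa), sizeof(sa));
  ::listen(listen_fd, 16);

  int epfd = ::epoll_create1(0);
  std::map<int, std::function<void(int)>> callbacks;    // ~ the file_events table

  // ~ Processor::start() registering C_processor_accept for the listen fd
  callbacks[listen_fd] = [](int fd) {
    int cli = ::accept(fd, nullptr, nullptr);            // ~ Processor::accept()
    std::printf("accepted fd %d\n", cli);
    if (cli >= 0) ::close(cli);
  };
  epoll_event ev{};
  ev.events = EPOLLIN;                                   // ~ EVENT_READABLE
  ev.data.fd = listen_fd;
  ::epoll_ctl(epfd, EPOLL_CTL_ADD, listen_fd, &ev);      // ~ EpollDriver::add_event()

  // ~ the worker loop: EventCenter::process_events()
  epoll_event fired[8];
  int n = ::epoll_wait(epfd, fired, 8, 100 /* ms */);    // ~ EpollDriver::event_wait()
  for (int i = 0; i < n; ++i)
    callbacks[fired[i].data.fd](fired[i].data.fd);       // ~ cb->do_request(fd)

  ::close(epfd);
  ::close(listen_fd);
  return 0;
}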