The “analyzing ceph network module” series explains how Ceph daemons communicate with each other. This post gives an overview of the network architecture.
A Ceph storage cluster consists of four main types of daemons: OSD Daemons (osd), Ceph Monitors (mon), Ceph Managers (mgr), and Ceph Metadata Servers (mds).
The official Ceph documentation provides a very high-level diagram that depicts the Ceph architecture:
But how exactly are those libraries (librbd, librgw, libcephfs, librados) implemented to communicate with the underlying Ceph daemons in RADOS?
How do the daemons communicate with each other to manage the Ceph cluster?
This post summarizes several posts regarding these questions into one [1] [2] [3] [4] [5] [6] [7].
1. Ceph Client Network Architecture Overview
Ceph clients use the Messenger class (currently the only Messenger implementation in use is AsyncMessenger) to talk to the Ceph cluster.
Ceph users can access the Ceph cluster through either librgw, librbd, cephfs (either the ceph kernel client or ceph-fuse; I will explain only the latter), or librados. All libraries except cephfs use librados underneath, which in turn uses AsyncMessenger for communication.
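For example, a user program that links only against librados already exercises this whole path: the Rados handle internally builds a MonClient, an MgrClient, and an AsyncMessenger and talks to the cluster through them. Below is a minimal sketch using the librados C++ API; the user name, configuration path, pool, and object names are placeholders.

#include <rados/librados.hpp>
#include <iostream>

int main() {
  librados::Rados cluster;

  // Initialize the handle as client.admin (placeholder user name).
  if (cluster.init("admin") < 0)
    return 1;

  // Read monitor addresses and keys from a ceph.conf (placeholder path).
  cluster.conf_read_file("/etc/ceph/ceph.conf");

  // connect() builds the client-side machinery (MonClient, MgrClient, ...)
  // on top of an AsyncMessenger.
  if (cluster.connect() < 0) {
    std::cerr << "cannot connect to the cluster" << std::endl;
    return 1;
  }

  // Object I/O then reaches the OSDs through the same messenger layer.
  librados::IoCtx io_ctx;
  cluster.ioctx_create("mypool", io_ctx);   // placeholder pool name

  librados::bufferlist bl;
  bl.append("hello");
  io_ctx.write_full("greeting", bl);        // placeholder object name

  cluster.shutdown();
  return 0;
}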
As you can see, MonClient and MgrClient, which are used to connect to ceph-mon or ceph-mgr, also use AsyncMessenger.
In other words, all daemons also use the Messenger class to communicate with each other. The detailed communication flow will be explained later.
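As a hedged illustration of how that looks in code: client-side components obtain their messenger from the Messenger factory, which currently hands back an AsyncMessenger. The fragment below follows that pattern; it is only a sketch that would compile inside the Ceph tree, the helper name make_client_messenger is hypothetical, and option names and defaults can differ between releases.

// Sketch, not a standalone program: uses Ceph-internal headers and types.
#include "msg/Messenger.h"

Messenger *make_client_messenger(CephContext *cct)   // hypothetical helper
{
  // The factory picks the implementation from the ms_type option
  // ("async+posix" by default), so in practice this is an AsyncMessenger.
  Messenger *msgr = Messenger::create_client_messenger(cct, "client");
  msgr->start();   // start the messenger before handing it to MonClient etc.
  return msgr;
}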
2. Ceph Network Module Internal Architecture
To understand how clients and daemons communicate, we first need to know the internal network architecture used in Ceph. The figure above is a roughly drawn class diagram of the Ceph network architecture. Note that it is not a pure UML class diagram; the arrow types mean:
- solid line with white block arrow: indicates class inheritance.
- solid line with line arrow: indicates ownership.
- dotted line with line arrow: indicates reference.
The classes that I will focus on are 1. AsyncMessenger, 2. NetworkStack and Worker, 3. Processor, 4. EventCenter, and 5. AsyncConnection.
Those classes are entangled and interact with each other to implement an asynchronous network mechanism.
1. AsyncMessenger
As the class comment in ceph/src/msg/async/AsyncMessenger.h describes, AsyncMessenger maintains a set of asynchronous connections; it may own a bind address, and accepted connections are managed by AsyncMessenger.
AsyncMessenger keeps track of the connections in ceph::unordered_map<entity_addrvec_t, AsyncConnectionRef> conns after a connection is established via the Processor class. In its constructor, it prepares the lower network control plane by initializing the NetworkStack and Processor classes. AsyncMessenger owns these classes and is responsible for managing them.
src/msg/async/AsyncMessenger.cc
AsyncMessenger::AsyncMessenger(CephContext *cct, entity_name_t name,
                               const std::string &type, std::string mname, uint64_t _nonce) {
  ...
  auto single = &cct->lookup_or_create_singleton_object<StackSingleton>(
      "AsyncMessenger::NetworkStack::" + transport_type, true, cct);
  // NetworkStack::create() is called in StackSingleton::ready().
  single->ready(transport_type);
  stack = single->stack.get();
  stack->start();
  ...
  unsigned processor_num = 1;
  if (stack->support_local_listen_table())
    processor_num = stack->get_num_worker();
  for (unsigned i = 0; i < processor_num; ++i)
    processors.push_back(new Processor(this, stack->get_worker(i), cct));
}
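The conns map is what lets AsyncMessenger reuse an already-established connection instead of dialing the peer again. Below is a simplified, self-contained model of that lookup-or-create behavior; ConnectionTable, Connection, and the string key are stand-ins for AsyncMessenger, AsyncConnection, and entity_addrvec_t, and the real lookup (e.g. AsyncMessenger::_lookup_conn()) is more involved.

#include <memory>
#include <mutex>
#include <string>
#include <unordered_map>

// Stand-ins for AsyncConnection / AsyncConnectionRef.
struct Connection { std::string peer_addr; };
using ConnectionRef = std::shared_ptr<Connection>;

class ConnectionTable {                       // plays the role of AsyncMessenger
  std::mutex lock;
  std::unordered_map<std::string, ConnectionRef> conns;   // addr -> connection

 public:
  // Return the cached connection for a peer if one exists,
  // otherwise create and register a new one.
  ConnectionRef get_or_create(const std::string& addr) {
    std::lock_guard<std::mutex> l(lock);
    auto it = conns.find(addr);
    if (it != conns.end())
      return it->second;                      // reuse the established connection
    auto conn = std::make_shared<Connection>(Connection{addr});
    conns.emplace(addr, conn);                // created/accepted conns are tracked here
    return conn;
  }
};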
2. NetworkStack and Worker
NetworkStack is an abstract class representing the network stack currently in use. Ceph provides posix, RDMA, and DPDK network stacks. This post only explains the posix network stack, which uses the POSIX socket API (bind(), listen(), send(), recv(), etc.).
The NetworkStack constructor internally creates Worker class instances that run in separate threads. Worker threads free the main thread from blocking wait mechanisms (such as epoll_wait()). NetworkStack owns all corresponding Worker instances and is responsible for managing them.
src/msg/async/Stack.h
class NetworkStack {
  std::string type;

  // Overridden by PosixStack, RDMAStack, and DPDKStack.
  virtual Worker* create_worker(CephContext *c, unsigned i) = 0;

 protected:
  std::vector<Worker*> workers;

  explicit NetworkStack(CephContext *c, const std::string &t);

 public:
  static std::shared_ptr<NetworkStack> create(CephContext *c, const std::string &type);
};
// Static function. A stack can only be instantiated through this function.
std::shared_ptr<NetworkStack> NetworkStack::create(CephContext *c, const std::string &t) {
  std::shared_ptr<NetworkStack> stack = nullptr;

  // Create a proper stack regarding the type t.
  if (t == "posix")
    stack.reset(new PosixNetworkStack(c, t));
  ...

  const int InitEventNumber = 5000;
  for (unsigned worker_id = 0; worker_id < stack->num_workers; ++worker_id) {
    // Calls PosixNetworkStack->create_worker(), which creates a new PosixWorker.
    Worker *w = stack->create_worker(c, worker_id);
    w->center.init(InitEventNumber, worker_id, t);
    stack->workers.push_back(w);
  }
  return stack;
}
Threads are not created in the NetworkStack constructor, but in NetworkStack::start(). AsyncMessenger calls this function after creating a NetworkStack instance.
src/msg/async/Stack.cc, src/msg/async/PosixStack.h
void NetworkStack::start() {
  ...
  for (unsigned i = 0; i < num_workers; ++i) {
    if (workers[i]->is_init())
      continue;
    std::function<void ()> thread = add_thread(i);
    spawn_worker(i, std::move(thread));
  }
}

std::function<void ()> NetworkStack::add_thread(unsigned worker_id) {
  Worker *w = workers[worker_id];
  return [this, w]() {
    ...
    while (!w->done) {
      // Calls epoll_wait() via EpollDriver.
      w->center.process_events(EventMaxWaitUs, &dur);
    }
    w->reset();
    w->destroy();
  };
}

void PosixNetworkStack::spawn_worker(unsigned i, std::function<void ()> &&func) {
  threads.resize(i + 1);
  threads[i] = std::thread(func);
}
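To make the effect of spawn_worker() and add_thread() concrete, here is a self-contained POSIX sketch of the same pattern (not Ceph code): each worker owns an epoll instance and blocks in epoll_wait() on its own thread, so the thread that created it stays free.

#include <sys/epoll.h>
#include <unistd.h>
#include <atomic>
#include <cstdio>
#include <thread>
#include <vector>

// One "worker": owns an epoll fd and loops in epoll_wait() on its own thread,
// roughly what the lambda returned by NetworkStack::add_thread() does.
struct Worker {
  int epfd = -1;
  std::atomic<bool> done{false};
  std::thread thr;

  void start() {
    epfd = epoll_create1(0);
    thr = std::thread([this] {
      std::vector<epoll_event> events(128);
      while (!done) {
        // Blocks here; the creating thread keeps running.
        int n = epoll_wait(epfd, events.data(), (int)events.size(), 100 /* ms */);
        for (int i = 0; i < n; ++i)
          std::printf("worker: fd %d is ready\n", events[i].data.fd);
      }
    });
  }

  void stop() {
    done = true;   // the loop notices this within one timeout period
    thr.join();
    close(epfd);
  }
};

int main() {
  Worker w;
  w.start();       // like NetworkStack::start() -> spawn_worker()
  // ... the main thread would now register fds and do other work ...
  w.stop();
  return 0;
}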
I will explain this later, but this post assumes epoll is used as the event handling mechanism. w->center.process_events() internally calls epoll_wait(), which blocks and wakes up when it receives an event. When an event arrives, it calls the do_request() function of the registered callback, which in this case is the listen_handler member variable of the Processor class instance.
src/msg/async/AsyncMessenger.cc
class Processor::C_processor_accept : public EventCallback {
  Processor *pro;

 public:
  explicit C_processor_accept(Processor *p): pro(p) {}

  void do_request(uint64_t id) override {
    pro->accept();
  }
};

Processor::Processor(AsyncMessenger *r, Worker *w, CephContext *c)
  : msgr(r), net(c), worker(w), listen_handler(new C_processor_accept(this)) {}
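The dispatch itself is a plain callback-object pattern: the event loop only knows an abstract interface with do_request(), and Processor registers a small adapter (C_processor_accept) that forwards to accept(). Here is a self-contained model of that step; all names except do_request() are stand-ins.

#include <cstdint>
#include <cstdio>
#include <memory>
#include <unordered_map>

// Minimal stand-in for Ceph's EventCallback interface.
struct EventCallback {
  virtual void do_request(std::uint64_t fd_or_id) = 0;
  virtual ~EventCallback() = default;
};

// Plays the role of Processor: owns the accept logic and registers an adapter.
struct Acceptor {
  void accept() { std::printf("accepting a new connection\n"); }
};

struct C_accept : EventCallback {            // like C_processor_accept
  Acceptor* pro;
  explicit C_accept(Acceptor* p) : pro(p) {}
  void do_request(std::uint64_t) override { pro->accept(); }
};

int main() {
  // Plays the role of EventCenter's file event table: fd -> registered callback.
  std::unordered_map<int, std::shared_ptr<EventCallback>> file_events;

  Acceptor acceptor;
  int listen_fd = 42;                                        // pretend fd
  file_events[listen_fd] = std::make_shared<C_accept>(&acceptor);

  // When epoll_wait() reports listen_fd readable, process_events() would do:
  file_events[listen_fd]->do_request(listen_fd);
  return 0;
}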
3. Processor
Processor is managed by AsyncMessenger and is responsible for:
- binding and listening on a socket,
- creating and registering a callback instance for asynchronous events,
- and accepting a request for connection establishment.
src/msg/async/AsyncMessenger.cc
int Processor::bind(const entity_addrvec_t &bind_addrs,
                    const std::set<int>& avoid_ports,
                    entity_addrvec_t* bound_addrs) {
  ...
  for (unsigned k = 0; k < bind_addrs.v.size(); ++k) {
    auto& listen_addr = bound_addrs->v[k];
    worker->center.submit_to(worker->center.get_id(),
      [this, k, &listen_addr, &opts, &r]() {
        // In this case, it calls PosixWorker::listen().
        worker->listen(listen_addr, k, opts, &listen_sockets[k]);
      },
      false);
  }
  ...
}
void Processor::start() {
  ...
  worker->center.submit_to(worker->center.get_id(),
    [this]() {
      for (auto& listen_socket : listen_sockets) {
        worker->center.create_file_event(listen_socket.fd(), EVENT_READABLE, listen_handler);
      }
    },
    false);
}
Processor::accept() is called via the listen_handler member variable of the Processor class instance when a connection request arrives.
void Processor::accept() {
  ...
  ConnectedSocket cli_socket;
  Worker *w = worker;
  listen_socket.accept(&cli_socket, opts, &addr, w);
  msgr->add_accept(w, std::move(cli_socket),
                   msgr->get_myaddrs().v[listen_socket.get_addr_slot()], addr);
  ...
}
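Under the posix stack, worker->listen() and listen_socket.accept() eventually boil down to ordinary non-blocking socket calls. The following self-contained sketch (not Ceph code; IPv4 only, port 6800 chosen arbitrarily, error handling trimmed) shows roughly what happens at the socket level.

#include <arpa/inet.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <unistd.h>
#include <cstdio>

int main() {
  // What PosixWorker::listen() roughly does: socket + bind + listen,
  // with the fd switched to non-blocking so epoll can drive it.
  int listen_fd = socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0);

  sockaddr_in addr{};
  addr.sin_family = AF_INET;
  addr.sin_addr.s_addr = htonl(INADDR_ANY);
  addr.sin_port = htons(6800);              // arbitrary example port
  ::bind(listen_fd, reinterpret_cast<sockaddr*>(&addr), sizeof(addr));
  listen(listen_fd, 128);

  // What Processor::accept() / listen_socket.accept() roughly do once epoll
  // reports the listening fd readable: accept the peer, keep it non-blocking.
  sockaddr_in peer{};
  socklen_t len = sizeof(peer);
  int cli_fd = accept4(listen_fd, reinterpret_cast<sockaddr*>(&peer), &len,
                       SOCK_NONBLOCK);
  if (cli_fd >= 0) {
    char buf[INET_ADDRSTRLEN];
    inet_ntop(AF_INET, &peer.sin_addr, buf, sizeof(buf));
    std::printf("accepted connection from %s\n", buf);
    close(cli_fd);
  }

  close(listen_fd);
  return 0;
}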
4. EventCenter
As you saw earlier, Processor uses EventCenter to register its callback handler for asynchronous events. EventCenter manages the asynchronous event mechanism by using EventDriver, which is inherited by EpollDriver, DPDKDriver, KqueueDriver, and SelectDriver. On Linux, EpollDriver is used by default.
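Conceptually, EventDriver is a thin polymorphic wrapper over the platform's readiness API, so EventCenter never has to call epoll directly. The following is a reduced, self-contained model of that split (not Ceph code): the method names mimic the real interface, but the mask constants, signatures, and bodies are simplified.

#include <sys/epoll.h>
#include <unistd.h>
#include <vector>

// Stand-ins for Ceph's EVENT_NONE / EVENT_READABLE / EVENT_WRITABLE masks.
constexpr int EVT_NONE = 0;
constexpr int EVT_READABLE = 1;
constexpr int EVT_WRITABLE = 2;

// Reduced model of the EventDriver interface: EventCenter only sees this.
struct EventDriver {
  virtual int add_event(int fd, int cur_mask, int add_mask) = 0;
  virtual int event_wait(std::vector<int>& ready_fds, int timeout_ms) = 0;
  virtual ~EventDriver() = default;
};

// Reduced model of EpollDriver, the Linux implementation of the interface.
class EpollDriver : public EventDriver {
  int epfd;

 public:
  EpollDriver() : epfd(epoll_create1(0)) {}
  ~EpollDriver() override { close(epfd); }

  int add_event(int fd, int cur_mask, int add_mask) override {
    epoll_event ee{};
    if (add_mask & EVT_READABLE) ee.events |= EPOLLIN;
    if (add_mask & EVT_WRITABLE) ee.events |= EPOLLOUT;
    ee.data.fd = fd;
    int op = (cur_mask == EVT_NONE) ? EPOLL_CTL_ADD : EPOLL_CTL_MOD;
    return epoll_ctl(epfd, op, fd, &ee);
  }

  int event_wait(std::vector<int>& ready_fds, int timeout_ms) override {
    epoll_event events[64];
    int n = epoll_wait(epfd, events, 64, timeout_ms);
    for (int i = 0; i < n; ++i)
      ready_fds.push_back(events[i].data.fd);
    return n;
  }
};

int main() {
  EpollDriver driver;   // an EventCenter would hold this through an EventDriver*
  // driver.add_event(fd, EVT_NONE, EVT_READABLE); then loop on event_wait().
  return 0;
}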
When the Processor class is started by AsyncMessenger, it calls create_file_event() of the EventCenter class. This internally calls the driver's add_event(), which in EpollDriver registers the file descriptor with epoll_ctl().
src/msg/async/AsyncMessenger.cc, src/msg/async/Event.cc, src/msg/async/EventEpoll.cc
void Processor::start() {
  worker->center.submit_to(worker->center.get_id(), [this]() {
    ...
    worker->center.create_file_event(listen_socket.fd(), EVENT_READABLE, listen_handler);
  }, false);
}

int EventCenter::create_file_event(int fd, int mask, EventCallbackRef ctxt) {
  ...
  driver->add_event(fd, event->mask, mask);
  ...
}

int EpollDriver::add_event(int fd, int cur_mask, int add_mask) {
  struct epoll_event ee;
  int op = cur_mask == EVENT_NONE ? EPOLL_CTL_ADD : EPOLL_CTL_MOD;

  ee.events = EPOLLET;
  if (add_mask & EVENT_READABLE) ee.events |= EPOLLIN;
  if (add_mask & EVENT_WRITABLE) ee.events |= EPOLLOUT;
  ee.data.u64 = 0;
  ee.data.fd = fd;
  epoll_ctl(epfd, op, fd, &ee);
  ...
}
Note that calling epoll_ctl() while another thread is already blocked in epoll_wait() causes no problem. The epoll_wait(2) man page notes: “While one thread is blocked in a call to epoll_pwait(), it is possible for another thread to add a file descriptor to the waited-upon epoll instance.” Therefore, while worker threads are already waiting for epoll_wait() to return, we can add a file descriptor to the epoll instance.
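The following self-contained demo (not Ceph code) shows exactly that guarantee: one thread blocks in epoll_wait() on an initially empty epoll instance, and the main thread then registers a pipe fd with epoll_ctl() and makes it readable, waking the waiter. This mirrors what Processor::start() does via create_file_event() while Worker threads already sit in epoll_wait().

#include <sys/epoll.h>
#include <unistd.h>
#include <chrono>
#include <cstdio>
#include <thread>

int main() {
  int epfd = epoll_create1(0);

  int pipefd[2];
  pipe(pipefd);

  // Thread A: blocks in epoll_wait() before any fd is registered.
  std::thread waiter([epfd] {
    epoll_event ev{};
    int n = epoll_wait(epfd, &ev, 1, 5000);   // wakes once the pipe fd fires
    std::printf("epoll_wait returned %d (fd %d readable)\n", n, ev.data.fd);
  });

  // Give the waiter a moment to block (for the demo only).
  std::this_thread::sleep_for(std::chrono::milliseconds(100));

  // Main thread: add a fd to the epoll instance that is already being waited
  // on, then make it readable.
  epoll_event ee{};
  ee.events = EPOLLIN;
  ee.data.fd = pipefd[0];
  epoll_ctl(epfd, EPOLL_CTL_ADD, pipefd[0], &ee);
  write(pipefd[1], "x", 1);

  waiter.join();
  close(pipefd[0]);
  close(pipefd[1]);
  close(epfd);
  return 0;
}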
5. AsyncConnection
This class is not explained in this post. After a connection is established, Processor::accept() creates a ConnectedSocket and an AsyncConnection class instance. It will be discussed in the next post.