
High level Ceph architecture. [src]
In the previous posts 1 2, I illustrated how to establish a connection between a server and a client using AsyncMessenger functions.
However, in the high-level Ceph architecture, there are more abstract APIs that serve specific purposes, e.g., accessing ceph-mon or ceph-osd.
This post illustrates how MonClient
is implemented.
MonClient
MonClient, as its name suggests, is a class used to connect to a Ceph monitor daemon.
Whichever client library a user picks to access Ceph, every library uses MonClient to get the cluster map of the Ceph cluster from a Ceph monitor daemon.
- librados: librados::RadosClient has a MonClient class instance to access the daemon.
- librgw: RGWRados uses librados::IoCtx and librados::Rados, which use librados::RadosClient to connect to RADOS.
- librbd: librbd::ImageCtx uses librados::IoCtx to connect to RADOS.
- libcephfs: Client uses MonClient directly to connect to RADOS.

MonClient internal execution flow. It internally uses AsyncMessenger for communication, which was illustrated in the previous post. Note the bold function calls Messenger::connect_to() and AsyncConnection::send_message().
1. Connecting to a monitor
A client needs to provide information about a target monitor daemon. There are several ways to provide it:
- use the --mon-host-override <ip> command-line argument or the mon_host_override config option.
- use a file located at the path specified by the monmap config option.
- use the fsid config option (which I don't know much about).
- use the mon_host config option.
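The lookup order can be pictured as a first-match search. The following is a minimal sketch, assuming the sources are tried in the order listed above and that the first non-empty one wins. Config, get_val, and pick_monitor_source are hypothetical names made up for illustration; they are not Ceph APIs.

```cpp
#include <cassert>
#include <map>
#include <string>

// Hypothetical stand-in for the Ceph config: option name -> value.
using Config = std::map<std::string, std::string>;

// Returns the value of `key`, or an empty string when it is not set.
inline std::string get_val(const Config& conf, const std::string& key) {
  assert(!key.empty());  // option names are never empty in this sketch
  auto it = conf.find(key);
  return it == conf.end() ? std::string{} : it->second;
}

// Returns the name of the first non-empty option, checked in the order
// of the list above, or "none" when nothing is configured.
// Assumption: first match wins.
inline std::string pick_monitor_source(const Config& conf) {
  for (const char* key : {"mon_host_override", "monmap", "fsid", "mon_host"}) {
    if (!get_val(conf, key).empty()) {
      return key;
    }
  }
  return "none";
}
```

With only mon_host set, the sketch reports mon_host; once mon_host_override is also set, the override takes precedence.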
src/mon/MonMap.cc
int MonMap::build_initial(CephContext *cct, bool for_mkfs, ostream& errout) {
  const auto& conf = cct->_conf;
  // mon_host_override?
  auto mon_host_override = conf.get_val<std::string>("mon_host_override");
  if (!mon_host_override.empty()) {
    init_with_ips(mon_host_override, for_mkfs, "noname-");
    // or
    init_with_hosts(mon_host_override, for_mkfs, "noname-");
  }
  // or, cct?
  auto addrs = cct->get_mon_addrs();
  init_with_addrs(*addrs, for_mkfs, "noname-");
  // or, file?
  const auto monmap = conf.get_val<std::string>("monmap");
  init_with_monmap(monmap, errout);
  // fsid from conf (omitted)
  // -m foo?
  const auto mon_host = conf.get_val<std::string>("mon_host");
  init_with_ips(mon_host, for_mkfs, "noname-");
  // or
  init_with_hosts(mon_host, for_mkfs, "noname-");
  ...
}
void MonMap::init_with_addrs(const std::vector<entity_addrvec_t>& addrs, bool for_mkfs, std::string_view prefix) {
  for (auto& addr : addrs) {
    // name is derived from prefix (derivation omitted in this excerpt)
    add(name, addr, 0);
  }
}
// In src/mon/MonMap.h
void add(const mon_info_t& m) {
  ...
  mon_info[m.name] = m;
  ranks.push_back(m.name);
  calc_addr_mons();
}
void calc_addr_mons() {
  // rebuild the reverse index: address -> monitor name
  addr_mons.clear();
  for (auto& p : mon_info) {
    for (auto& a : p.second.public_addrs.v) {
      addr_mons[a] = p.first; // type of addr_mons: std::map<entity_addr_t, std::string>
    }
  }
}
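To make the bookkeeping in add() and calc_addr_mons() concrete, here is a toy version that can be compiled on its own. It is only a sketch: ToyMonInfo and ToyMonMap are hypothetical names, addresses are plain strings instead of entity_addr_t, and a monitor's rank is assumed to be its insertion order.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <string>
#include <vector>

// Simplified stand-in for mon_info_t; addresses are plain strings here.
struct ToyMonInfo {
  std::string name;
  std::vector<std::string> public_addrs;
};

class ToyMonMap {
public:
  // Registers a monitor, appends it to the rank list, and rebuilds the
  // address index, mirroring the add() excerpt above.
  void add(const ToyMonInfo& m) {
    assert(mon_info.count(m.name) == 0);  // sketch assumption: unique names
    mon_info[m.name] = m;
    ranks.push_back(m.name);
    calc_addr_mons();
  }

  // Addresses of the monitor at `rank` (insertion order in this sketch).
  const std::vector<std::string>& get_addrs(std::size_t rank) const {
    return mon_info.at(ranks.at(rank)).public_addrs;
  }

  // Reverse lookup: which monitor owns this address? Empty if unknown.
  std::string get_name(const std::string& addr) const {
    auto it = addr_mons.find(addr);
    return it == addr_mons.end() ? std::string{} : it->second;
  }

private:
  // Rebuilds the address -> monitor name index from scratch.
  void calc_addr_mons() {
    addr_mons.clear();
    for (const auto& p : mon_info) {
      for (const auto& a : p.second.public_addrs) {
        addr_mons[a] = p.first;
      }
    }
  }

  std::map<std::string, ToyMonInfo> mon_info;
  std::vector<std::string> ranks;
  std::map<std::string, std::string> addr_mons;
};
```

Rebuilding the whole index on every add() is simple but quadratic over many insertions; for a handful of monitors that cost is negligible.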
RadosClient::connect() just creates an AsyncMessenger class instance by calling Messenger::create_client_messenger().
Interestingly, no actual connection is established in RadosClient::connect(). A connection seems to be established whenever a client sends a command.
So whenever you call RadosClient::mon_command() or RadosClient::mon_command_async(), it connects to the monitor that has the closest rank by calling AsyncMessenger::connect_to_mon(monmap.get_addrs(...)).
src/mon/MonClient.h, src/mon/MonClient.cc
class MonClient : public Dispatcher, public AuthClient, public AuthServer {
public:
  template<typename CompletionToken>
  auto start_mon_command(const std::vector<std::string>& cmd, const ceph::buffer::list& inbl, CompletionToken&& token) {
    boost::asio::async_completion<CompletionToken, CommandSig> init(token);
    auto h = CommandCompletion::create(service.get_executor(), std::move(init.completion_handler));
    // register the command under a fresh transaction id, then send it
    auto r = new MonCommand(*this, ++last_mon_command_tid, std::move(h));
    r->cmd = cmd;
    r->inbl = inbl;
    mon_commands.emplace(r->tid, r);
    _send_command(r);
    return init.result.get();
  }
  ...
};
// In src/mon/MonClient.cc
void MonClient::_send_command(MonCommand *r) {
  ...
  // the connection to the target monitor is opened here, not in connect()
  r->target_con = messenger->connect_to_mon(monmap.get_addrs(r->target_rank), true /* anon */);
  r->target_session.reset(new MonConnection(cct, r->target_con, 0, *auth_registry));
  r->target_session->start(monmap.get_epoch(), entity_name);
  MCommand *m = new MCommand(monmap.fsid);
  m->set_tid(r->tid);
  m->cmd = r->cmd;
  m->set_data(r->inbl);
  r->target_session->queue_command(m);
}
MonConnection::queue_command() does not actually use a queue; instead, it stores the message in a single member variable, pending_tell_command:
src/mon/MonClient.h
class MonConnection {
  ...
  void queue_command(Message *m) {
    pending_tell_command = m;
  }
};
The stored pending command will be executed after authentication is done.
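The store-now, send-after-authentication pattern can be sketched in a few lines. This is a hedged illustration, not the Ceph implementation: ToyMonConnection, on_auth_done, has_pending, and sent_commands are hypothetical names, commands are plain strings, and a vector stands in for the wire.

```cpp
#include <cassert>
#include <memory>
#include <string>
#include <utility>
#include <vector>

// Hypothetical miniature of the pattern: a single pending slot that is
// flushed once authentication completes.
class ToyMonConnection {
public:
  // Stores the command. There is one slot, not a queue, so a second call
  // before authentication would overwrite the first command.
  void queue_command(std::string cmd) {
    pending_tell_command = std::make_unique<std::string>(std::move(cmd));
  }

  // Called when authentication finishes; sends the stored command, if any.
  void on_auth_done() {
    assert(!authenticated);  // sketch assumption: auth completes only once
    authenticated = true;
    if (pending_tell_command) {
      sent.push_back(std::move(*pending_tell_command));
      pending_tell_command.reset();
    }
  }

  bool has_pending() const { return pending_tell_command != nullptr; }
  const std::vector<std::string>& sent_commands() const { return sent; }

private:
  bool authenticated = false;
  std::unique_ptr<std::string> pending_tell_command;
  std::vector<std::string> sent;  // stands in for the network
};
```

A single slot is enough here because each command in the excerpt above gets its own MonConnection session.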
This part is tricky, and I do not fully understand it yet. Ceph seems to use the Protocol class for authentication.

Authentication handling function execution flow. The Protocol class is implemented with so many preprocessor macros that I could not follow the execution well.
The diagram above explains how the stored pending_tell_command is sent to the server. After authentication is done, several nested callback handlers are executed, and MonConnection finally calls AsyncConnection::send_message2() to send the stored command to the server.
2. Sending a monitor command
I found out that AsyncConnection::send_message2()
is used to send a command to the server. Let’s dig into this function.

Sending message flow.
AsyncConnection::send_message2() is inherited from its parent class Connection (src/msg/Connection.h). It calls the pure virtual function send_message(), so in this case AsyncConnection::send_message() is called.
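The relationship between the two functions is a non-virtual wrapper that forwards to a pure virtual function. Below is a minimal sketch of that shape, assuming the wrapper only hands over ownership of the message. ToyMessage, ToyConnection, and ToyAsyncConnection are hypothetical names, and std::unique_ptr is used here as a stand-in for Ceph's own reference-counted message pointer.

```cpp
#include <cassert>
#include <memory>
#include <string>
#include <vector>

struct ToyMessage {
  std::string body;
};

class ToyConnection {
public:
  virtual ~ToyConnection() = default;

  // Pure virtual: each transport supplies its own implementation and
  // takes ownership of the raw pointer.
  virtual int send_message(ToyMessage* m) = 0;

  // Non-virtual convenience wrapper: releases ownership from the smart
  // pointer and forwards to the transport-specific send_message().
  int send_message2(std::unique_ptr<ToyMessage> m) {
    assert(m != nullptr);
    return send_message(m.release());
  }
};

class ToyAsyncConnection final : public ToyConnection {
public:
  int send_message(ToyMessage* m) override {
    std::unique_ptr<ToyMessage> owned(m);  // reclaim ownership
    sent.push_back(owned->body);           // stands in for real transmission
    return 0;
  }

  std::vector<std::string> sent;
};
```

Calling send_message2() through a ToyConnection reference ends up in ToyAsyncConnection::send_message(), which is the dispatch described above.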
src/msg/async/AsyncConnection.cc
int AsyncConnection::send_message(Message *m) {
  ...
  // fall back to the messenger's default priority when none is set
  if (!m->get_priority()) {
    m->set_priority(async_msgr->get_default_send_priority());
  }
  m->get_header().src = async_msgr->get_myname();
  m->set_connection(this);
  ...
  protocol->send_message(m);
  return 0;
}
It again uses a Protocol class instance to send the message.
src/msg/async/ProtocolV2.cc, src/msg/async/AsyncConnection.cc
// in src/msg/async/ProtocolV2.cc
void ProtocolV2::send_message(Message *m) {
  ...
  m->queue_start = ceph::mono_clock::now();
  m->trace.event("async enqueueing message");
  out_queue[m->get_priority()].emplace_back(out_queue_entry_t{is_prepared, m});
  // schedule a write event unless one is already pending
  if (((!replacing && can_write) || state == STANDBY) && !write_in_progress) {
    write_in_progress = true;
    connection->center->dispatch_event_external(connection->write_handler);
  }
}
// in src/msg/async/AsyncConnection.cc
AsyncConnection::AsyncConnection(...) : ... {
  ...
  write_handler = new C_handle_write(this); // This callback is used by ProtocolV2::send_message().
  ...
}
class C_handle_write : public EventCallback {
  AsyncConnectionRef conn;
public:
  explicit C_handle_write(AsyncConnectionRef c): conn(c) {}
  void do_request(uint64_t fd) override {
    conn->handle_write();
  }
};
void AsyncConnection::handle_write() {
  protocol->write_event();
}
// in src/msg/async/ProtocolV2.cc
void ProtocolV2::write_event() {
  ...
  do {
    // deduced type: ProtocolV2::out_queue_entry_t
    const auto out_entry = _get_next_outgoing();
    ...
    // send_message or requeue messages may not encode message
    if (!out_entry.is_prepared) {
      prepare_send_message(connection->get_features(), out_entry.m);
    }
    r = write_message(out_entry.m, more);
  } while (can_write);
  write_in_progress = false;
  ...
}
ssize_t ProtocolV2::write_message(Message *m, bool more) {
  ...
  ceph_msg_header &header = m->get_header();
  ceph_msg_footer &footer = m->get_footer();
  ceph_msg_header2 header2 {header.seq, header.tid,
                            header.type, header.priority,
                            header.version,
                            init_le32(0), header.data_off,
                            init_le64(ack_seq),
                            footer.flags, header.compat_version,
                            header.reserved};
  // deduced type: MessageFrame
  auto message = MessageFrame::Encode(header2, m->get_payload(), m->get_middle(), m->get_data());
  append_frame(message); // The message is appended to connection->outgoing_bl here.
  ssize_t rc = connection->_try_send(more);
  ...
  return rc;
}
template <class F>
bool ProtocolV2::append_frame(F& frame) {
  ceph::bufferlist bl;
  bl = frame.get_buffer(tx_frame_asm);
  connection->outgoing_bl.append(bl);
  return true;
}
// in src/msg/async/AsyncConnection.cc
ssize_t AsyncConnection::_try_send(bool more) {
  ...
  ssize_t r = cs.send(outgoing_bl, more);
  ...
}
Again, ProtocolV2::send_message() uses the EventCenter::dispatch_event_external() function to pass the job to a separate worker thread. Through the write handler of AsyncConnection, ProtocolV2::write_message() is responsible for building a message and sending it.
ProtocolV2::write_message() builds an encoded message using the MessageFrame::Encode() static function, appends it to outgoing_bl (a buffer member of the AsyncConnection class) by calling ProtocolV2::append_frame(), and sends it by calling AsyncConnection::_try_send().
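The enqueue-then-dispatch flow can be modelled without threads. The sketch below is an illustration under stated assumptions, not the ProtocolV2 code: ToyProtocol and its members are hypothetical, a deque of callbacks stands in for the EventCenter and its worker thread, a vector of strings stands in for outgoing_bl, and messages are assumed to be drained highest priority first.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <functional>
#include <iterator>
#include <list>
#include <map>
#include <string>
#include <utility>
#include <vector>

class ToyProtocol {
public:
  // Enqueues under the message priority and schedules one write event,
  // unless a write event is already pending.
  void send_message(int priority, std::string msg) {
    out_queue[priority].push_back(std::move(msg));
    if (can_write && !write_in_progress) {
      write_in_progress = true;
      events.push_back([this] { write_event(); });
    }
  }

  // Stands in for the worker thread's event loop: runs queued callbacks.
  void run_pending_events() {
    while (!events.empty()) {
      auto ev = std::move(events.front());
      events.pop_front();
      ev();
    }
  }

  std::size_t scheduled_events() const { return events.size(); }

  std::vector<std::string> outgoing;  // stands in for outgoing_bl

private:
  // Drains the queue, highest priority first (assumption of this sketch).
  void write_event() {
    assert(write_in_progress);
    while (!out_queue.empty()) {
      auto highest = std::prev(out_queue.end());
      outgoing.push_back(std::move(highest->second.front()));
      highest->second.pop_front();
      if (highest->second.empty()) {
        out_queue.erase(highest);
      }
    }
    write_in_progress = false;
  }

  std::map<int, std::list<std::string>> out_queue;
  std::deque<std::function<void()>> events;
  bool can_write = true;
  bool write_in_progress = false;
};
```

The write_in_progress flag is what keeps three back-to-back send_message() calls from scheduling three write events: the first call schedules one, and that single event drains everything queued in the meantime.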
In AsyncConnection::_try_send(), the class instance uses a ConnectedSocket (variable name: cs) to send the message to the remote peer. If the network stack is configured as POSIX, PosixConnectedSocketImpl (defined in src/msg/async/PosixStack.cc) provides the internal implementation:
src/msg/async/Stack.h
class ConnectedSocket {
  std::unique_ptr<ConnectedSocketImpl> _csi;
public:
  ssize_t send(ceph::buffer::list &bl, bool more) {
    return _csi->send(bl, more);
  }
  ...
};
src/msg/async/PosixStack.cc
int PosixWorker::connect(const entity_addr_t &addr, const SocketOptions &opts, ConnectedSocket *socket) {
  ...
  *socket = ConnectedSocket(
    std::unique_ptr<PosixConnectedSocketImpl>(new PosixConnectedSocketImpl(net, addr, sd, !opts.nonblock)));
  return 0;
}
So, calling cs.send() eventually calls PosixConnectedSocketImpl::send(), which calls the ::sendmsg() system call repeatedly until all bytes are transferred or the socket reports EAGAIN.
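The wrapper-plus-implementation split between ConnectedSocket and ConnectedSocketImpl can be reproduced with an in-memory backend. This is only a sketch: ToySocketImpl, ToyMemorySocketImpl, and ToyConnectedSocket are hypothetical names, std::string replaces ceph::buffer::list, and the backend is assumed to consume everything it is given.

```cpp
#include <cassert>
#include <memory>
#include <string>
#include <sys/types.h>  // ssize_t
#include <utility>

// Interface that every network stack backend implements.
class ToySocketImpl {
public:
  virtual ~ToySocketImpl() = default;
  virtual ssize_t send(std::string& buf, bool more) = 0;
};

// In-memory backend; it plays the role that PosixConnectedSocketImpl
// plays for the POSIX stack.
class ToyMemorySocketImpl final : public ToySocketImpl {
public:
  explicit ToyMemorySocketImpl(std::string* sink) : sink_(sink) {
    assert(sink != nullptr);
  }

  ssize_t send(std::string& buf, bool /*more*/) override {
    const ssize_t n = static_cast<ssize_t>(buf.size());
    sink_->append(buf);
    buf.clear();  // sketch assumption: sent bytes leave the caller's buffer
    return n;
  }

private:
  std::string* sink_;
};

// Thin wrapper that owns a backend and forwards send() to it.
class ToyConnectedSocket {
public:
  explicit ToyConnectedSocket(std::unique_ptr<ToySocketImpl> impl)
      : csi_(std::move(impl)) {}

  ssize_t send(std::string& buf, bool more) { return csi_->send(buf, more); }

private:
  std::unique_ptr<ToySocketImpl> csi_;
};
```

Because callers only see the wrapper, swapping the backend does not change any calling code.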
src/msg/async/PosixStack.cc
class PosixConnectedSocketImpl final : public ConnectedSocketImpl {
  ...
public:
  ...
  ssize_t send(ceph::buffer::list &bl, bool more) override {
    ...
    ssize_t r = do_sendmsg(_fd, msg, msglen, left_pbrs || more);
    ...
  }
#ifndef _WIN32
  static ssize_t do_sendmsg(int fd, struct msghdr &msg, unsigned len, bool more) {
    size_t sent = 0;
    while (1) {
      MSGR_SIGPIPE_STOPPER;
      ssize_t r = ::sendmsg(fd, &msg, MSG_NOSIGNAL | (more ? MSG_MORE : 0));
      if (r < 0) {
        int err = ceph_sock_errno();
        if (err == EINTR) {
          continue;
        } else if (err == EAGAIN) {
          break;
        }
        return -err;
      }
      sent += r;
      if (len == sent) break;
      ...
    }
    return (ssize_t)sent;
  }
#endif
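The retry loop around ::sendmsg() can be tried out on a local socket pair. The following is a simplified sketch for Linux, not the Ceph code: send_all and round_trip are hypothetical helpers, the data is a single contiguous buffer, and the iovec is rebuilt on each iteration instead of adjusting the msghdr after a partial send.

```cpp
#include <cassert>
#include <cerrno>
#include <cstddef>
#include <string>
#include <sys/socket.h>
#include <sys/types.h>
#include <sys/uio.h>
#include <unistd.h>

// Sends all of `data` on `fd`, retrying on EINTR and stopping early on
// EAGAIN. Returns the number of bytes sent, or -errno on other errors.
inline ssize_t send_all(int fd, const std::string& data) {
  std::size_t sent = 0;
  while (sent < data.size()) {
    struct iovec iov;
    iov.iov_base = const_cast<char*>(data.data()) + sent;
    iov.iov_len = data.size() - sent;
    struct msghdr msg = {};
    msg.msg_iov = &iov;
    msg.msg_iovlen = 1;
    ssize_t r = ::sendmsg(fd, &msg, MSG_NOSIGNAL);
    if (r < 0) {
      if (errno == EINTR) continue;  // interrupted: retry
      if (errno == EAGAIN) break;    // socket would block: give up for now
      return -errno;
    }
    sent += static_cast<std::size_t>(r);
  }
  return static_cast<ssize_t>(sent);
}

// Sends `data` through a local socket pair and returns what the peer
// received. Keep `data` small: nobody reads while the sender is writing.
inline std::string round_trip(const std::string& data) {
  int fds[2];
  int rc = ::socketpair(AF_UNIX, SOCK_STREAM, 0, fds);
  assert(rc == 0);
  (void)rc;
  ssize_t n = send_all(fds[0], data);
  std::string received;
  if (n > 0) {
    received.resize(static_cast<std::size_t>(n));
    std::size_t got = 0;
    while (got < received.size()) {
      ssize_t r = ::read(fds[1], &received[got], received.size() - got);
      if (r <= 0) break;
      got += static_cast<std::size_t>(r);
    }
    received.resize(got);
  }
  ::close(fds[0]);
  ::close(fds[1]);
  return received;
}
```

MSG_NOSIGNAL keeps a closed peer from raising SIGPIPE, so the error comes back through the return value instead.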
Note that all transmission is done by a separate worker thread, so sending is asynchronous from the viewpoint of the main thread.