High-level Ceph architecture. [src]
In the previous posts (1, 2), I illustrated how a connection is established between a server and a client using AsyncMessenger functions.
However, the high-level Ceph architecture also provides more abstract APIs for specific purposes, e.g., accessing ceph-mon or ceph-osd.
This post illustrates how one of them, MonClient, is implemented.
MonClient
MonClient, as its name suggests, is a class that can be used to connect to a Ceph monitor daemon.
Whichever client library a user chooses to access Ceph, all of them use MonClient to get the cluster map from a Ceph monitor daemon.
- librados: librados::RadosClient has a MonClient class instance to access the daemon.
- librgw: RGWRados uses librados::IoCtx and librados::Rados, which use librados::RadosClient to connect to RADOS.
- librbd: librbd::ImageCtx uses librados::IoCtx to connect to RADOS.
- libcephfs: Client uses MonClient directly to connect to RADOS.
MonClient internal execution flow. It internally uses AsyncMessenger for communication, which has been illustrated in the previous post. Note the bold function calls Messenger::connect_to() and AsyncConnection::send_message().
1. Connecting to a monitor
A client needs to provide information about a target monitor daemon. There are several ways to do so:
- use the --mon-host-override <ip> command-line argument or the mon_host_override config option.
- use a file located at the path specified by the monmap config option.
- use the fsid config option (which I don't know much about).
- use the mon_host config option.
src/mon/MonMap.cc, src/mon/MonMap.h
int MonMap::build_initial(CephContext *cct, bool for_mkfs, ostream& errout) {
  const auto& conf = cct->_conf;

  // mon_host_override?
  auto mon_host_override = conf.get_val<std::string>("mon_host_override");
  if (!mon_host_override.empty()) {
    init_with_ips(mon_host_override, for_mkfs, "noname-");
    // or
    init_with_hosts(mon_host_override, for_mkfs, "noname-");
  }

  // or, cct?
  auto addrs = cct->get_mon_addrs();
  init_with_addrs(*addrs, for_mkfs, "noname-");

  // or, file?
  const auto monmap = conf.get_val<std::string>("monmap");
  init_with_monmap(monmap, errout);

  // fsid from conf (omitted)

  // -m foo?
  const auto mon_host = conf.get_val<std::string>("mon_host");
  init_with_ips(mon_host, for_mkfs, "noname-");
  // or
  init_with_hosts(mon_host, for_mkfs, "noname-");
  ...
}
void MonMap::init_with_addrs(const std::vector<entity_addrvec_t>& addrs, bool for_mkfs, std::string_view prefix) {
  for (auto& addr : addrs) {
    // name: a per-monitor name generated from prefix (generation omitted here)
    ...
    add(name, addr, 0);
  }
}
// In src/mon/MonMap.h
void add(const mon_info_t& m) {
  ...
  mon_info[m.name] = m;
  ranks.push_back(m.name);
  calc_addr_mons();
}

void calc_addr_mons() {
  // Rebuild the reverse index: address -> monitor name.
  addr_mons.clear();
  for (auto& p : mon_info) {
    for (auto& a : p.second.public_addrs.v) {
      addr_mons[a] = p.first; // type of addr_mons: std::map<entity_addr_t, std::string>
    }
  }
}
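To make the bookkeeping in add() and calc_addr_mons() concrete, here is a minimal, self-contained sketch. TinyMonMap and TinyMonInfo are hypothetical stand-ins for MonMap and mon_info_t, and plain std::string values stand in for entity_addr_t; this is not Ceph code, only an illustration of the reverse index from address to monitor name.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <string>
#include <vector>

// Hypothetical stand-in for mon_info_t: a monitor name plus its public addresses.
struct TinyMonInfo {
  std::string name;
  std::vector<std::string> public_addrs;
};

// Hypothetical stand-in for MonMap, keeping only the three containers
// that appear in the excerpt above.
class TinyMonMap {
public:
  void add(const TinyMonInfo& m) {
    assert(!m.name.empty());
    mon_info[m.name] = m;
    ranks.push_back(m.name);
    calc_addr_mons();
  }

  // Returns the name of the monitor that owns addr, or "" if unknown.
  std::string name_of(const std::string& addr) const {
    auto it = addr_mons.find(addr);
    return it == addr_mons.end() ? std::string{} : it->second;
  }

  std::size_t size() const { return mon_info.size(); }

private:
  // Rebuild the reverse index (address -> monitor name) from scratch.
  void calc_addr_mons() {
    addr_mons.clear();
    for (const auto& p : mon_info) {
      for (const auto& a : p.second.public_addrs) {
        addr_mons[a] = p.first;
      }
    }
  }

  std::map<std::string, TinyMonInfo> mon_info;
  std::vector<std::string> ranks;
  std::map<std::string, std::string> addr_mons;
};
```

Rebuilding the whole index on every add() is simple and cheap when the map only holds a handful of monitors, which is the assumption this sketch makes.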
RadosClient::connect() just creates an AsyncMessenger class instance by calling Messenger::create_client_messenger().
Interestingly, an actual connection is not established in RadosClient::connect(). A connection seems to be established whenever a client sends a command.
So whenever you call RadosClient::mon_command() or RadosClient::mon_command_async(), it connects to the monitor that has the closest rank by calling AsyncMessenger::connect_to_mon(monmap.get_addrs(...)).
src/mon/MonClient.h, src/mon/MonClient.cc
class MonClient : public Dispatcher, public AuthClient, public AuthServer {
public:
  template<typename CompletionToken>
  auto start_mon_command(const std::vector<std::string>& cmd, const ceph::buffer::list& inbl, CompletionToken&& token) {
    boost::asio::async_completion<CompletionToken, CommandSig> init(token);
    auto h = CommandCompletion::create(service.get_executor(), std::move(init.completion_handler));
    auto r = new MonCommand(*this, ++last_mon_command_tid, std::move(h));
    r->cmd = cmd;
    r->inbl = inbl;
    mon_commands.emplace(r->tid, r);
    _send_command(r);
    return init.result.get();
  }
  ...
};

// In src/mon/MonClient.cc
void MonClient::_send_command(MonCommand *r) {
  ...
  // Open (or reuse) a connection to the target monitor and start a session on it.
  r->target_con = messenger->connect_to_mon(monmap.get_addrs(r->target_rank), true /* anon */);
  r->target_session.reset(new MonConnection(cct, r->target_con, 0, *auth_registry));
  r->target_session->start(monmap.get_epoch(), entity_name);

  // Build the command message and hand it to the session.
  MCommand *m = new MCommand(monmap.fsid);
  m->set_tid(r->tid);
  m->cmd = r->cmd;
  m->set_data(r->inbl);
  r->target_session->queue_command(m);
}
MonConnection::queue_command() actually does not use a queue; instead, it stores the message into the pending variable pending_tell_command:
src/mon/MonClient.h
class MonConnection {
  ...
  void queue_command(Message *m) {
    pending_tell_command = m;
  }
};
The stored pending command will be executed after authentication is done.
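The "store now, send after authentication" idea can be sketched in a few lines. TinySession, on_auth_done(), and the SendFn callback are hypothetical names of my own, and std::unique_ptr<std::string> stands in for the stored Message pointer; the real MonConnection goes through the Protocol class and several callbacks, which this sketch does not model.

```cpp
#include <cassert>
#include <functional>
#include <memory>
#include <string>
#include <utility>

// Hypothetical stand-in for a monitor session: it holds at most one
// command and only sends it once authentication has completed.
class TinySession {
public:
  using SendFn = std::function<void(const std::string&)>;

  explicit TinySession(SendFn send) : send_(std::move(send)) {
    assert(send_ != nullptr);
  }

  // Despite the name, this is a single slot, not a queue:
  // a second call overwrites the first pending command.
  void queue_command(std::string cmd) {
    pending_ = std::make_unique<std::string>(std::move(cmd));
  }

  // Called by the (hypothetical) authentication layer on success.
  void on_auth_done() {
    if (!pending_) return;
    send_(*pending_);
    pending_.reset();
  }

  bool has_pending() const { return pending_ != nullptr; }

private:
  SendFn send_;
  std::unique_ptr<std::string> pending_;
};
```

A caller would construct the session with a callback that writes to the wire, call queue_command() right away, and rely on on_auth_done() to flush the stored command later.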
This part is a bit tricky, and I do not fully understand it yet. Ceph seems to use the Protocol class for authentication.
Authentication handling function execution flow. The Protocol class is implemented with so many preprocessor macros that I could not follow the execution well.
The diagram above explains how the stored pending_tell_command is sent to the server. After authentication is done, several nested callback handlers are executed, and MonConnection finally calls AsyncConnection::send_message2() to send the stored command to the server.
2. Sending a monitor command
I found out that AsyncConnection::send_message2() is used to send a command to the server. Let’s dig into this function.
Sending message flow.
AsyncConnection::send_message2() is inherited from its parent class Connection (src/msg/Connection.h), where it calls the pure virtual function send_message(). In this case, AsyncConnection::send_message() will be called.
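The relationship between the inherited wrapper and the pure virtual function can be illustrated with a small sketch. TinyConnection, TinyAsyncConnection, and TinyMessage are hypothetical names, and std::unique_ptr is my substitution for whatever pointer type the real Connection uses; only the shape (a non-virtual wrapper forwarding to a virtual function that the subclass overrides) follows the description above.

```cpp
#include <cassert>
#include <memory>
#include <string>
#include <vector>

struct TinyMessage { std::string payload; };

// Hypothetical stand-in for Connection.
class TinyConnection {
public:
  virtual ~TinyConnection() = default;

  // Pure virtual: each transport supplies its own implementation and
  // takes ownership of the raw pointer.
  virtual int send_message(TinyMessage* m) = 0;

  // Non-virtual convenience wrapper: accepts a smart pointer, releases
  // ownership, and forwards to the virtual function.
  int send_message2(std::unique_ptr<TinyMessage> m) {
    assert(m != nullptr);
    return send_message(m.release());
  }
};

// Hypothetical stand-in for AsyncConnection.
class TinyAsyncConnection : public TinyConnection {
public:
  int send_message(TinyMessage* m) override {
    std::unique_ptr<TinyMessage> owned(m);  // take ownership back
    sent.push_back(owned->payload);         // pretend this hits the wire
    return 0;
  }

  std::vector<std::string> sent;
};
```

Calling send_message2() through a TinyConnection reference dispatches to TinyAsyncConnection::send_message(), which mirrors how the call ends up in AsyncConnection::send_message().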
src/msg/async/AsyncConnection.cc
int AsyncConnection::send_message(Message *m) {
  ...
  // Fall back to the messenger's default priority if none was set.
  if (!m->get_priority()) {
    m->set_priority(async_msgr->get_default_send_priority());
  }
  m->get_header().src = async_msgr->get_myname();
  m->set_connection(this);
  ...
  protocol->send_message(m);
  return 0;
}
Again, it uses a Protocol class instance to send the message.
src/msg/async/ProtocolV2.cc, src/msg/async/AsyncConnection.cc
// in src/msg/async/ProtocolV2.cc
void ProtocolV2::send_message(Message *m) {
  ...
  m->queue_start = ceph::mono_clock::now();
  m->trace.event("async enqueueing message");
  out_queue[m->get_priority()].emplace_back(out_queue_entry_t{is_prepared, m});
  // Schedule a write event only if one is not already in flight.
  if (((!replacing && can_write) || state == STANDBY) && !write_in_progress) {
    write_in_progress = true;
    connection->center->dispatch_event_external(connection->write_handler);
  }
}

// in src/msg/async/AsyncConnection.cc
AsyncConnection::AsyncConnection(...) : ... {
  ...
  write_handler = new C_handle_write(this); // This callback is used by ProtocolV2::send_message().
  ...
}

class C_handle_write : public EventCallback {
  AsyncConnectionRef conn;
public:
  explicit C_handle_write(AsyncConnectionRef c): conn(c) {}
  void do_request(uint64_t fd) override {
    conn->handle_write();
  }
};

void AsyncConnection::handle_write() {
  protocol->write_event();
}

// in src/msg/async/ProtocolV2.cc
void ProtocolV2::write_event() {
  ...
  do {
    // deduced type: ProtocolV2::out_queue_entry_t
    const auto out_entry = _get_next_outgoing();
    ...
    // send message or requeue messages may not encode message
    if (!out_entry.is_prepared) {
      prepare_send_message(connection->get_features(), out_entry.m);
    }
    r = write_message(out_entry.m, more);
  } while (can_write);
  write_in_progress = false;
  ...
}
ssize_t ProtocolV2::write_message(Message *m, bool more) {
  ...
  ceph_msg_header &header = m->get_header();
  ceph_msg_footer &footer = m->get_footer();
  ceph_msg_header2 header2 {header.seq, header.tid,
                            header.type, header.priority,
                            header.version,
                            init_le32(0), header.data_off,
                            init_le64(ack_seq),
                            footer.flags, header.compat_version,
                            header.reserved};
  // deduced type: MessageFrame
  auto message = MessageFrame::Encode(header2, m->get_payload(), m->get_middle(), m->get_data());
  append_frame(message); // The message is appended to connection->outgoing_bl here.
  ssize_t rc = connection->_try_send(more);
  ...
  return rc;
}

template <class F>
bool ProtocolV2::append_frame(F& frame) {
  ceph::bufferlist bl;
  bl = frame.get_buffer(tx_frame_asm);
  connection->outgoing_bl.append(bl);
  return true;
}

// in src/msg/async/AsyncConnection.cc
ssize_t AsyncConnection::_try_send(bool more) {
  ...
  ssize_t r = cs.send(outgoing_bl, more);
  ...
}
Again, ProtocolV2::send_message() uses the EventCenter::dispatch_event_external() function to pass the job to a separate worker thread. Through the write handler of AsyncConnection, ProtocolV2::write_message() is responsible for building a message and sending it.
ProtocolV2::write_message() builds an encoded message using the static function MessageFrame::Encode(), appends it to outgoing_bl (a buffer member of the AsyncConnection class) by calling ProtocolV2::append_frame(), and sends it by calling AsyncConnection::_try_send().
In AsyncConnection::_try_send(), the class instance uses a ConnectedSocket (variable name: cs) to send the message to the remote peer. If the POSIX network stack is selected, PosixConnectedSocketImpl (defined in src/msg/async/PosixStack.cc) is used as its internal implementation:
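The enqueue-then-schedule pattern can be sketched without any Ceph types. In the sketch below, TinyProtocol is a hypothetical class, the "event center" is just a deque of callbacks pumped by hand instead of a real worker thread, and I assume that a numerically higher priority is written first. It shows that several send_message() calls schedule only one write event, which then drains the whole queue.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <functional>
#include <iterator>
#include <map>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for the protocol's send path.
class TinyProtocol {
public:
  // Enqueue by priority; schedule a write event only if none is pending.
  void send_message(int priority, std::string msg) {
    out_queue[priority].push_back(std::move(msg));
    if (!write_in_progress) {
      write_in_progress = true;
      events.push_back([this] { write_event(); });
    }
  }

  // Run every scheduled event, as a worker thread's loop would.
  void pump() {
    while (!events.empty()) {
      auto ev = std::move(events.front());
      events.pop_front();
      ev();
    }
  }

  std::size_t scheduled() const { return events.size(); }

  std::vector<std::string> wire;  // what was "sent", in order

private:
  // Drain the queue, highest priority first, FIFO within a priority.
  void write_event() {
    while (!out_queue.empty()) {
      auto it = std::prev(out_queue.end());  // entry with the largest key
      assert(!it->second.empty());
      wire.push_back(std::move(it->second.front()));
      it->second.pop_front();
      if (it->second.empty()) out_queue.erase(it);
    }
    write_in_progress = false;
  }

  std::map<int, std::deque<std::string>> out_queue;
  std::deque<std::function<void()>> events;
  bool write_in_progress = false;
};
```

The write_in_progress flag is what keeps the caller cheap: sending a message is only a queue insertion plus, at most, one event registration.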
src/msg/async/Stack.h
class ConnectedSocket {
  std::unique_ptr<ConnectedSocketImpl> _csi;
public:
  ssize_t send(ceph::buffer::list &bl, bool more) {
    return _csi->send(bl, more);
  }
  ...
};
src/msg/async/PosixStack.cc
int PosixWorker::connect(const entity_addr_t &addr, const SocketOptions &opts, ConnectedSocket *socket) {
  ...
  *socket = ConnectedSocket(
    std::unique_ptr<PosixConnectedSocketImpl>(new PosixConnectedSocketImpl(net, addr, sd, !opts.nonblock)));
  return 0;
}
So, calling cs.send() eventually calls PosixConnectedSocketImpl::send(), which calls the ::sendmsg() system call repeatedly until all bytes are transferred or the socket would block.
src/msg/async/PosixStack.cc
class PosixConnectedSocketImpl final : public ConnectedSocketImpl {
  ...
public:
  ...
  ssize_t send(ceph::buffer::list &bl, bool more) override {
    ...
    ssize_t r = do_sendmsg(_fd, msg, msglen, left_pbrs || more);
    ...
  }

#ifndef _WIN32
  static ssize_t do_sendmsg(int fd, struct msghdr &msg, unsigned len, bool more) {
    size_t sent = 0;
    while (1) {
      MSGR_SIGPIPE_STOPPER;
      ssize_t r = ::sendmsg(fd, &msg, MSG_NOSIGNAL | (more ? MSG_MORE : 0));
      if (r < 0) {
        int err = ceph_sock_errno();
        if (err == EINTR) {
          continue;        // interrupted by a signal: retry
        } else if (err == EAGAIN) {
          break;           // socket buffer is full: report what was sent so far
        }
        return -err;
      }
      sent += r;
      if (len == sent) break;
      ...
    }
    return (ssize_t)sent;
  }
#endif
  ...
};
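For readers who want to try the retry loop outside Ceph, here is a minimal sketch using only POSIX/Linux calls. send_some() is a hypothetical helper of my own, it sends from a single flat buffer rather than a ceph::buffer::list, and it assumes Linux because of MSG_NOSIGNAL. Unlike the excerpt above, it rebuilds the iovec on every iteration instead of advancing msg_iov in place.

```cpp
#include <cassert>
#include <cerrno>
#include <cstddef>
#include <sys/socket.h>
#include <sys/types.h>
#include <sys/uio.h>

// Send up to len bytes from buf with sendmsg(). Returns the number of bytes
// sent (possibly fewer than len if the socket would block) or -errno.
ssize_t send_some(int fd, const char* buf, size_t len) {
  assert(buf != nullptr || len == 0);
  size_t sent = 0;
  while (sent < len) {
    struct iovec iov;
    iov.iov_base = const_cast<char*>(buf + sent);  // skip what already went out
    iov.iov_len = len - sent;

    struct msghdr msg = {};
    msg.msg_iov = &iov;
    msg.msg_iovlen = 1;

    ssize_t r = ::sendmsg(fd, &msg, MSG_NOSIGNAL);
    if (r < 0) {
      if (errno == EINTR) continue;                        // interrupted: retry
      if (errno == EAGAIN || errno == EWOULDBLOCK) break;  // try again later
      return -errno;
    }
    sent += static_cast<size_t>(r);
  }
  return static_cast<ssize_t>(sent);
}
```

A quick way to exercise it is a socketpair(AF_UNIX, SOCK_STREAM, 0, sv): write with send_some(sv[0], ...) and read the same bytes back with recv(sv[1], ...).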
Note that all transmission is done by a separate worker thread, so sending is asynchronous from the viewpoint of the main thread.