In the previous posts (1, 2), I illustrated how we establish a connection between a server and a client using AsyncMessenger functions.
However, the higher levels of the Ceph architecture provide more abstract APIs for specific purposes, e.g., accessing ceph-mon or ceph-osd.
This post illustrates how MonClient is implemented.
MonClient
As its name suggests, MonClient is a class used to connect to a Ceph monitor daemon.
Whichever client library a user chooses to access Ceph, every one of them uses MonClient to get the cluster map of the Ceph cluster from a Ceph monitor daemon.
- librados: librados::RadosClient has a MonClient class instance to access the daemon.
- librgw: RGWRados uses librados::IoCtx and librados::Rados, which use librados::RadosClient to connect to RADOS.
- librbd: librbd::ImageCtx uses librados::IoCtx to connect to RADOS.
- libcephfs: Client uses MonClient directly to connect to RADOS.
1. Connecting to a monitor
A client needs to provide information about the target monitor daemon. There are several ways to do so:
- use the --mon-host-override <ip> command-line argument or the mon_host_override config option.
- use a file located at the path specified by the monmap config option.
- use the fsid config option (which I don’t know much about).
- use the mon_host config option.
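To make the precedence easier to follow, here is a small standalone sketch (not Ceph code) that only decides which source would be used, checking them in the same order as the build_initial() excerpt, including the addresses already attached to the CephContext (cct->get_mon_addrs()). It assumes, as the "// or" comments in the excerpt suggest, that the first configured source wins. MonSources and pick_mon_source() are hypothetical names, and the fsid step is left out just as it is in the excerpt.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Hypothetical stand-in for the inputs MonMap::build_initial() consults.
// These are not Ceph's real types; plain strings replace entity_addrvec_t.
struct MonSources {
  std::string mon_host_override;           // --mon-host-override / mon_host_override
  std::vector<std::string> context_addrs;  // addresses already set on the CephContext
  std::string monmap_path;                 // path from the monmap option
  std::string mon_host;                    // mon_host option (-m)
};

// Returns which source would be used, checking them in the same order as
// the build_initial() excerpt. Assumption: the first non-empty source wins
// and the later ones are not consulted.
std::string pick_mon_source(const MonSources& s) {
  if (!s.mon_host_override.empty()) return "mon_host_override";
  if (!s.context_addrs.empty()) return "context_addrs";
  if (!s.monmap_path.empty()) return "monmap";
  if (!s.mon_host.empty()) return "mon_host";
  return "none";  // nothing configured in this simplified model
}
```

For example, a client started with both a monmap file and mon_host set would, under this assumption, use the monmap file.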
src/mon/MonMap.cc
int MonMap::build_initial(CephContext *cct, bool for_mkfs, ostream& errout) {
const auto& conf = cct->_conf;
// mon_host_override?
auto mon_host_override = conf.get_val<std::string>("mon_host_override");
if (!mon_host_override.empty()) {
init_with_ips(mon_host_override, for_mkfs, "noname-");
// or
init_with_hosts(mon_host_override, for_mkfs, "noname-");
}
// or, cct?
auto addrs = cct->get_mon_addrs();
init_with_addrs(*addrs, for_mkfs, "noname-");
// or, file?
const auto monmap = conf.get_val<std::string>("monmap");
init_with_monmap(monmap, errout);
// fsid from conf (omitted)
// -m foo?
const auto mon_host = conf.get_val<std::string>("mon_host");
init_with_ips(mon_host, for_mkfs, "noname-");
// or
init_with_hosts(mon_host, for_mkfs, "noname-");
...
}
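void MonMap::init_with_addrs(const std::vector<entity_addrvec_t>& addrs, bool for_mkfs, std::string_view prefix) {
char id = 'a';
for (auto& addr: addrs) {
std::string name{prefix};
name += id++; // monitors are named prefix + 'a', 'b', ... (e.g., "noname-a")
...
add(name, addr, 0);
}
}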
// In src/mon/MonMap.h
void add (const mon_info_t& m) {
...
mon_info[m.name] = m;
ranks.push_back(m.name);
calc_addr_mons();
}
void calc_addr_mons() {
addr_mons.clear();
for (auto& p : mon_info) {
for (auto& a : p.second.public_addrs.v) {
addr_mons[a] = p.first; // type of addr_mons: std::map<entity_addr_t, std::string>
}
}
}
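The bookkeeping above boils down to two maps and a vector. The following standalone model mirrors add() and calc_addr_mons(): mon_info maps a name to its info, ranks keeps names in insertion order, and addr_mons is a reverse index from address to name that is rebuilt from scratch on every add(). MiniMonInfo and MiniMonMap are hypothetical names, plain strings replace entity_addr_t, and the naming scheme in init_with_addrs() (prefix plus a letter) is an assumption.

```cpp
#include <cassert>
#include <map>
#include <string>
#include <string_view>
#include <vector>

// Hypothetical, simplified model of MonMap's bookkeeping.
struct MiniMonInfo {
  std::string name;
  std::vector<std::string> public_addrs;  // stands in for entity_addrvec_t
};

class MiniMonMap {
 public:
  std::map<std::string, MiniMonInfo> mon_info;   // name -> info
  std::vector<std::string> ranks;                // rank -> name
  std::map<std::string, std::string> addr_mons;  // addr -> name (reverse index)

  void add(const MiniMonInfo& m) {
    mon_info[m.name] = m;
    ranks.push_back(m.name);
    calc_addr_mons();
  }

  // Rebuild the reverse index from scratch, as calc_addr_mons() does.
  void calc_addr_mons() {
    addr_mons.clear();
    for (const auto& [name, info] : mon_info) {
      for (const auto& a : info.public_addrs) {
        addr_mons[a] = name;
      }
    }
  }

  // Assumption: names are generated as prefix + 'a', 'b', ...
  void init_with_addrs(const std::vector<std::vector<std::string>>& addrs,
                       std::string_view prefix) {
    char id = 'a';
    for (const auto& addr : addrs) {
      std::string name{prefix};
      name += id++;
      add(MiniMonInfo{name, addr});
    }
  }
};
```

Rebuilding addr_mons on every add() is quadratic in the number of monitors, which is harmless here because a cluster typically has only a handful of them.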
RadosClient::connect() just creates an AsyncMessenger class instance by calling Messenger::create_client_messenger().
Interestingly, an actual connection is not established in RadosClient::connect(); a connection seems to be established whenever a client sends a command.
So whenever you call RadosClient::mon_command() or RadosClient::mon_command_async(), it connects to the target monitor (chosen by its rank) by calling AsyncMessenger::connect_to_mon(monmap.get_addrs(...)).
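The lazy behavior described above can be modeled in a few lines. In this hypothetical LazyMonClient, connect() only prepares the client (think: creating the messenger), and every command opens its own connection to the monitor chosen by rank, matching what _send_command() does with connect_to_mon(). No real networking happens; the opened connections are just recorded.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <utility>
#include <vector>

// Hypothetical model: connect() does no network I/O, and each command
// "opens" a connection to the monitor selected by rank.
class LazyMonClient {
 public:
  explicit LazyMonClient(std::vector<std::string> mon_addrs)
      : mon_addrs_(std::move(mon_addrs)) {}

  void connect() { messenger_ready_ = true; }  // no connection yet

  // Returns the address "connected" to, or an empty string on error.
  std::string mon_command(const std::string& cmd, std::size_t rank = 0) {
    if (!messenger_ready_ || rank >= mon_addrs_.size()) return {};
    opened_.push_back(mon_addrs_[rank]);  // stands in for connect_to_mon()
    last_cmd_ = cmd;                      // the command would be queued here
    return mon_addrs_[rank];
  }

  std::size_t connections_opened() const { return opened_.size(); }
  const std::string& last_command() const { return last_cmd_; }

 private:
  std::vector<std::string> mon_addrs_;
  std::vector<std::string> opened_;
  std::string last_cmd_;
  bool messenger_ready_ = false;
};
```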
src/mon/MonClient.h, src/mon/MonClient.cc
class MonClient : public Dispatcher, public AuthClient, public AuthServer {
public:
template<typename CompletionToken>
auto start_mon_command(const std::vector<std::string>& cmd, const ceph::buffer::list& inbl, CompletionToken&& token) {
boost::asio::async_completion<CompletionToken, CommandSig> init(token);
{
// (locking and error handling elided)
auto h = CommandCompletion::create(service.get_executor(), std::move(init.completion_handler));
auto r = new MonCommand(*this, ++last_mon_command_tid, std::move(h));
r->cmd = cmd;
r->inbl = inbl;
mon_commands.emplace(r->tid, r);
_send_command(r);
}
return init.result.get();
}
...
};
// In src/mon/MonClient.cc
void MonClient::_send_command(MonCommand *r) {
...
r->target_con = messenger->connect_to_mon(monmap.get_addrs(r->target_rank), true /* anon */);
r->target_session.reset(new MonConnection(cct, r->target_con, 0, *auth_registry));
r->target_session->start(monmap.get_epoch(), entity_name);
MCommand *m = new MCommand(monmap.fsid);
m->set_tid(r->tid);
m->cmd = r->cmd;
m->set_data(r->inbl);
r->target_session->queue_command(m);
}
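start_mon_command() follows a familiar request/response pattern: each command gets a fresh transaction id (tid) from ++last_mon_command_tid and is stored in mon_commands under that tid. The sketch below reproduces only that bookkeeping, plus my assumption about the reply side, namely that a reply carrying the same tid removes the entry and runs its completion handler. CommandTable and PendingCommand are hypothetical names, and a std::function replaces the Boost.Asio completion token.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <map>
#include <memory>
#include <string>
#include <utility>
#include <vector>

using Completion = std::function<void(int, const std::string&)>;

// Hypothetical stand-in for MonCommand.
struct PendingCommand {
  std::uint64_t tid;
  std::vector<std::string> cmd;
  Completion on_finish;
};

class CommandTable {
 public:
  // Assign a new tid and remember the command until its reply arrives.
  std::uint64_t start(std::vector<std::string> cmd, Completion on_finish) {
    auto r = std::make_unique<PendingCommand>(
        PendingCommand{++last_tid_, std::move(cmd), std::move(on_finish)});
    const std::uint64_t tid = r->tid;
    pending_.emplace(tid, std::move(r));
    return tid;  // a real client would now send the command
  }

  // Match a reply to its command by tid; unknown tids are ignored.
  bool handle_reply(std::uint64_t tid, int result, const std::string& outs) {
    auto it = pending_.find(tid);
    if (it == pending_.end()) return false;
    auto r = std::move(it->second);
    pending_.erase(it);
    r->on_finish(result, outs);
    return true;
  }

  std::size_t in_flight() const { return pending_.size(); }

 private:
  std::uint64_t last_tid_ = 0;
  std::map<std::uint64_t, std::unique_ptr<PendingCommand>> pending_;
};
```

Keying by tid is what lets replies arrive in any order: the handler that runs is always the one registered for that particular command.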
MonConnection::queue_command() actually does not use a queue; instead, it stores the message in the member variable pending_tell_command:
src/mon/MonClient.h
class MonConnection {
...
void queue_command(Message *m) {
pending_tell_command = m;
}
};
The stored pending command is sent after authentication is done.
This part is tricky, and I could not fully understand it yet. Ceph seems to use the Protocol class for authentication.
The diagram above explains how the stored pending_tell_command is sent to the server. After authentication is done, several nested callback handlers are executed, and MonConnection finally calls AsyncConnection::send_message2() to send the stored command to the server.
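The single-slot behavior of queue_command() can be sketched as follows. PendingSession and on_authenticated() are hypothetical: the latter stands in for the last of the nested authentication callbacks, which finally sends the stored message. In this model, as with the plain assignment to pending_tell_command, a second queue_command() before authentication would replace the first message.

```cpp
#include <cassert>
#include <functional>
#include <memory>
#include <string>
#include <utility>
#include <vector>

struct Msg {
  std::string cmd;
};

// Hypothetical model of MonConnection's single pending slot.
class PendingSession {
 public:
  explicit PendingSession(std::function<void(std::unique_ptr<Msg>)> send)
      : send_(std::move(send)) {}

  // Like queue_command(): only remember the message, do not send it yet.
  void queue_command(std::unique_ptr<Msg> m) { pending_ = std::move(m); }

  // Stands in for the final callback after authentication succeeds.
  void on_authenticated() {
    if (pending_) send_(std::move(pending_));  // leaves pending_ empty
  }

  bool has_pending() const { return pending_ != nullptr; }

 private:
  std::function<void(std::unique_ptr<Msg>)> send_;
  std::unique_ptr<Msg> pending_;
};
```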
2. Sending a monitor command
I found out that AsyncConnection::send_message2() is used to send a command to the server. Let’s dig into this function.
AsyncConnection::send_message2() is inherited from its parent class Connection (src/msg/Connection.h) and calls the pure virtual function send_message(). In this case, AsyncConnection::send_message() is called.
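This is the classic non-virtual-wrapper pattern. A minimal sketch, assuming std::unique_ptr as a stand-in for Ceph's reference-counted MessageRef: the base class gives up ownership and forwards the raw pointer to the pure virtual send_message(), which each transport overrides. RecordingConnection is a hypothetical transport that just records payloads.

```cpp
#include <cassert>
#include <memory>
#include <string>
#include <vector>

struct Message {
  std::string payload;
};

class Connection {
 public:
  virtual ~Connection() = default;
  // Non-virtual entry point: hand ownership to the transport.
  int send_message2(std::unique_ptr<Message> m) {
    return send_message(m.release());
  }
  // Each transport implements the actual sending.
  virtual int send_message(Message* m) = 0;
};

class RecordingConnection : public Connection {
 public:
  std::vector<std::string> sent;
  int send_message(Message* m) override {
    std::unique_ptr<Message> owned(m);  // take ownership back
    sent.push_back(owned->payload);
    return 0;
  }
};
```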
src/msg/async/AsyncConnection.cc
int AsyncConnection::send_message(Message *m) {
...
if (!m->get_priority()) {
m->set_priority(async_msgr->get_default_send_priority());
}
m->get_header().src = async_msgr->get_myname();
m->set_connection(this);
...
protocol->send_message(m);
return 0;
}
It again delegates to a Protocol class instance to send the message.
src/msg/async/ProtocolV2.cc, src/msg/async/AsyncConnection.cc
// in src/msg/async/ProtocolV2.cc
void ProtocolV2::send_message(Message *m) {
...
m->queue_start = ceph::mono_clock::now();
m->trace.event("async enqueueing message");
out_queue[m->get_priority()].emplace_back(out_queue_entry_t{is_prepared, m});
if (((!replacing && can_write) || state == STANDBY) && !write_in_progress) {
write_in_progress = true;
connection->center->dispatch_event_external(connection->write_handler);
}
}
// in src/msg/async/AsyncConnection.cc
AsyncConnection::AsyncConnection(...) : ... {
...
write_handler = new C_handle_write(this); // This callback is used by ProtocolV2::send_message().
...
}
class C_handle_write : public EventCallback {
AsyncConnectionRef conn;
public:
explicit C_handle_write(AsyncConnectionRef c): conn(c) {}
void do_request(uint64_t fd) override {
conn->handle_write();
}
};
void AsyncConnection::handle_write() {
protocol->write_event();
}
// in src/msg/async/ProtocolV2.cc
void ProtocolV2::write_event() {
...
do {
// deduced type: ProtocolV2::out_queue_entry_t
const auto out_entry = _get_next_outgoing();
...
// send message or requeue messages may not encode message
if (!out_entry.is_prepared) {
prepare_send_message(connection->get_features(), out_entry.m);
}
r = write_message(out_entry.m, more);
} while (can_write);
write_in_progress = false;
...
}
ssize_t ProtocolV2::write_message(Message *m, bool more) {
...
ceph_msg_header &header = m->get_header();
ceph_msg_footer &footer = m->get_footer();
ceph_msg_header2 header2 {header.seq, header.tid,
header.type, header.priority,
header.version,
init_le32(0), header.data_off,
init_le64(ack_seq),
footer.flags, header.compat_version,
header.reserved};
// deduced type: MessageFrame
auto message = MessageFrame::Encode(header2, m->get_payload(), m->get_middle(), m->get_data());
append_frame(message); // The message is appended to connection->outgoing_bl here.
ssize_t rc = connection->_try_send(more);
...
return rc;
}
template <class F>
bool ProtocolV2::append_frame(F& frame) {
ceph::bufferlist bl;
bl = frame.get_buffer(tx_frame_asm);
connection->outgoing_bl.append(bl);
return true;
}
// in src/msg/async/AsyncConnection.cc
ssize_t AsyncConnection::_try_send(bool more) {
...
ssize_t r = cs.send(outgoing_bl, more);
...
}
Again, ProtocolV2::send_message() uses the EventCenter::dispatch_event_external() function to pass the job to a separate worker thread. The write handler of AsyncConnection then runs ProtocolV2::write_event(), which calls ProtocolV2::write_message() to build and send each message.
ProtocolV2::write_message() builds an encoded message with the static function MessageFrame::Encode(), appends it to the outgoing_bl buffer of the AsyncConnection instance by calling ProtocolV2::append_frame(), and sends it by calling AsyncConnection::_try_send().
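The path from send_message() to write_event() can be condensed into a small model. In the sketch below, Worker is a hypothetical stand-in for EventCenter::dispatch_event_external() running callbacks on its own thread, and QueuedConnection mimics the out_queue keyed by priority plus the write_in_progress flag. It assumes that a larger number means a higher priority and drains the highest priority first; everything else about ProtocolV2 (framing, sequence numbers, socket I/O) is left out.

```cpp
#include <cassert>
#include <condition_variable>
#include <deque>
#include <functional>
#include <iterator>
#include <list>
#include <map>
#include <mutex>
#include <string>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical event center: callbacks run in FIFO order on one thread.
class Worker {
 public:
  Worker() : th_([this] { run(); }) {}
  ~Worker() {
    { std::lock_guard<std::mutex> l(mu_); stop_ = true; }
    cv_.notify_one();
    th_.join();  // run() drains remaining events before returning
  }
  void dispatch_external(std::function<void()> fn) {
    { std::lock_guard<std::mutex> l(mu_); events_.push_back(std::move(fn)); }
    cv_.notify_one();
  }

 private:
  void run() {
    std::unique_lock<std::mutex> l(mu_);
    for (;;) {
      cv_.wait(l, [this] { return stop_ || !events_.empty(); });
      if (events_.empty()) return;  // stop requested and nothing left
      auto fn = std::move(events_.front());
      events_.pop_front();
      l.unlock();  // run the callback without holding the queue lock
      fn();
      l.lock();
    }
  }
  std::mutex mu_;
  std::condition_variable cv_;
  std::deque<std::function<void()>> events_;
  bool stop_ = false;
  std::thread th_;  // last member: the fields above exist before run() starts
};

// Hypothetical model of the send side: per-priority queue + one write event.
class QueuedConnection {
 public:
  explicit QueuedConnection(Worker& w) : worker_(w) {}
  void send_message(int priority, std::string m) {
    std::lock_guard<std::mutex> l(mu_);
    out_queue_[priority].push_back(std::move(m));
    if (!write_in_progress_) {  // schedule at most one write event
      write_in_progress_ = true;
      worker_.dispatch_external([this] { write_event(); });
    }
  }
  void wait_idle() {  // block until the scheduled write event has finished
    std::unique_lock<std::mutex> l(mu_);
    idle_.wait(l, [this] { return !write_in_progress_; });
  }
  std::vector<std::string> written() {
    std::lock_guard<std::mutex> l(mu_);
    return wire_;
  }

 private:
  void write_event() {
    std::lock_guard<std::mutex> l(mu_);
    while (!out_queue_.empty()) {
      auto it = std::prev(out_queue_.end());  // highest priority first
      wire_.push_back(std::move(it->second.front()));
      it->second.pop_front();
      if (it->second.empty()) out_queue_.erase(it);
    }
    write_in_progress_ = false;
    idle_.notify_all();
  }
  Worker& worker_;
  std::mutex mu_;
  std::condition_variable idle_;
  std::map<int, std::list<std::string>> out_queue_;
  std::vector<std::string> wire_;
  bool write_in_progress_ = false;
};
```

The write_in_progress check keeps back-to-back send_message() calls from scheduling one write event each: the first call schedules an event, and later calls only add to the queue that event will drain.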
In AsyncConnection::_try_send(), the connection uses a ConnectedSocket (variable name: cs) to send the message to the remote peer. If the network stack is configured as POSIX, PosixConnectedSocketImpl (defined in src/msg/async/PosixStack.cc) serves as its internal implementation:
src/msg/async/Stack.h
class ConnectedSocket {
std::unique_ptr<ConnectedSocketImpl> _csi;
public:
ssize_t send(ceph::buffer::list &bl, bool more) {
return _csi->send(bl, more);
}
...
};
src/msg/async/PosixStack.cc
int PosixWorker::connect(const entity_addr_t &addr, const SocketOptions &opts, ConnectedSocket *socket) {
...
*socket = ConnectedSocket(
std::unique_ptr<PosixConnectedSocketImpl>(new PosixConnectedSocketImpl(net, addr, sd, !opts.nonblock)));
return 0;
}
So calling cs.send() eventually calls PosixConnectedSocketImpl::send(), which repeatedly calls the ::sendmsg() system call until all bytes are transferred or the socket would block (EAGAIN).
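ConnectedSocket is a thin handle that owns a ConnectedSocketImpl through std::unique_ptr and forwards send() to it, so the messenger code does not need to know which network stack sits underneath. The sketch below shows that handle/implementation split with hypothetical names (Socket, SocketImpl, MemorySocketImpl); a std::string replaces ceph::buffer::list, and an in-memory backend replaces PosixConnectedSocketImpl so the example runs without a network.

```cpp
#include <cassert>
#include <memory>
#include <string>
#include <sys/types.h>
#include <utility>

// Abstract backend, like ConnectedSocketImpl.
class SocketImpl {
 public:
  virtual ~SocketImpl() = default;
  virtual ssize_t send(std::string& bl, bool more) = 0;
};

// Front-end handle, like ConnectedSocket: owns the backend, forwards calls.
class Socket {
 public:
  Socket() = default;
  explicit Socket(std::unique_ptr<SocketImpl> impl) : impl_(std::move(impl)) {}
  ssize_t send(std::string& bl, bool more) { return impl_->send(bl, more); }

 private:
  std::unique_ptr<SocketImpl> impl_;
};

// Fake backend: "transmits" by appending to an in-memory wire and removes
// the sent bytes from the caller's buffer.
class MemorySocketImpl : public SocketImpl {
 public:
  explicit MemorySocketImpl(std::string* wire) : wire_(wire) {}
  ssize_t send(std::string& bl, bool /*more*/) override {
    const auto n = static_cast<ssize_t>(bl.size());
    wire_->append(bl);
    bl.clear();
    return n;
  }

 private:
  std::string* wire_;
};
```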
src/msg/async/PosixStack.cc
class PosixConnectedSocketImpl final : public ConnectedSocketImpl {
...
public:
...
ssize_t send(ceph::buffer::list &bl, bool more) override {
...
ssize_t r = do_sendmsg(_fd, msg, msglen, left_pbrs || more);
...
}
#ifndef _WIN32
static ssize_t do_sendmsg(int fd, struct msghdr &msg, unsigned len, bool more) {
size_t sent = 0;
while (1) {
MSGR_SIGPIPE_STOPPER;
ssize_t r = ::sendmsg(fd, &msg, MSG_NOSIGNAL | (more ? MSG_MORE : 0));
if (r < 0) {
int err = ceph_sock_errno();
if (err == EINTR) {
continue;
} else if (err == EAGAIN) {
break;
}
return -err;
}
sent += r;
if (len == sent) break;
...
}
return (ssize_t)sent;
}
#endif
Note that all transmission is done by a separate worker thread, so from the viewpoint of the main thread it happens asynchronously.
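The retry logic of do_sendmsg() can be reproduced with plain POSIX calls. This is a minimal sketch, not Ceph's implementation: send_all_or_would_block() is a hypothetical name, it uses a single iovec instead of one per buffer segment, and it advances that iovec itself after a partial write (presumably what the elided part of the excerpt handles). MSG_NOSIGNAL and MSG_MORE are not available on every platform, so they are guarded with #ifdef.

```cpp
#include <cassert>
#include <cerrno>
#include <cstddef>
#include <string>
#include <sys/socket.h>
#include <sys/types.h>
#include <sys/uio.h>
#include <unistd.h>

// Retry on EINTR, stop on EAGAIN, otherwise loop until len bytes are sent.
// Returns the number of bytes sent, or -errno on a hard error.
ssize_t send_all_or_would_block(int fd, const char* data, std::size_t len, bool more) {
  int flags = 0;
#ifdef MSG_NOSIGNAL
  flags |= MSG_NOSIGNAL;  // report EPIPE instead of raising SIGPIPE
#endif
#ifdef MSG_MORE
  if (more) flags |= MSG_MORE;  // hint that more data follows
#else
  (void)more;
#endif
  std::size_t sent = 0;
  while (sent < len) {
    iovec iov{};
    iov.iov_base = const_cast<char*>(data + sent);  // skip what was already sent
    iov.iov_len = len - sent;
    msghdr msg{};
    msg.msg_iov = &iov;
    msg.msg_iovlen = 1;
    ssize_t r = ::sendmsg(fd, &msg, flags);
    if (r < 0) {
      if (errno == EINTR) continue;                        // interrupted: retry
      if (errno == EAGAIN || errno == EWOULDBLOCK) break;  // would block: stop
      return -errno;
    }
    sent += static_cast<std::size_t>(r);
  }
  return static_cast<ssize_t>(sent);
}
```

On a non-blocking socket the function can return fewer bytes than requested; a caller such as _try_send() then presumably keeps the remainder in its buffer and retries once the socket becomes writable.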