Analyzing Ceph Network Module 3

Dec 11, 2020

High level Ceph architecture. [src]

In the previous posts [1][2], I illustrated how a connection between a server and a client is established using AsyncMessenger functions.

However, the high-level Ceph architecture also provides more abstract APIs for specific purposes, e.g., accessing ceph-mon or ceph-osd. This post illustrates how MonClient is implemented.

MonClient

MonClient, as its name suggests, is a class used to connect to a Ceph monitor daemon.

Regardless of which client library a user chooses, every library uses MonClient to obtain the cluster map of the Ceph cluster from a Ceph monitor daemon.

  • librados: librados::RadosClient has a MonClient class instance to access the daemon.
  • librgw: RGWRados uses librados::IoCtx and librados::Rados, which uses librados::RadosClient to connect to RADOS.
  • librbd: librbd::ImageCtx uses librados::IoCtx to connect to RADOS.
  • libcephfs: Client uses MonClient directly to connect to RADOS.

MonClient internal execution flow. It uses AsyncMessenger internally for communication, as illustrated in the previous posts. Note the bold function calls Messenger::connect_to() and AsyncConnection::send_message().

1. Connecting to a monitor

A client needs to provide information about the target monitor daemons. There are several ways to provide it:

  • use the --mon-host-override <ip> command-line argument or the mon_host_override config option.
  • use a file located at the path specified by the monmap config option.
  • use the fsid config option (which I am not familiar with).
  • use the mon_host config option.
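These sources are consulted in a fixed order. Judging from the "// or" comments in the MonMap::build_initial() excerpt that follows, the first configured source wins. The block below is a minimal sketch of that priority order under that assumption, not Ceph code: MonConfig, MonSource and pick_mon_source() are hypothetical names, the cct->get_mon_addrs() step is modeled as a plain vector, and the fsid step is omitted as in the excerpt.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Hypothetical, simplified view of the values MonMap::build_initial() reads.
// Field names follow the config options mentioned in the post.
struct MonConfig {
  std::string mon_host_override;           // mon_host_override option
  std::vector<std::string> cct_mon_addrs;  // stands in for cct->get_mon_addrs()
  std::string monmap_path;                 // monmap option (path to a file)
  std::string mon_host;                    // mon_host option (-m)
};

enum class MonSource { Override, ContextAddrs, MonmapFile, MonHost, None };

// Returns the first non-empty source, in the order of the excerpt:
// override, cct addrs, monmap file, mon_host (fsid step omitted).
MonSource pick_mon_source(const MonConfig& c) {
  if (!c.mon_host_override.empty()) return MonSource::Override;
  if (!c.cct_mon_addrs.empty()) return MonSource::ContextAddrs;
  if (!c.monmap_path.empty()) return MonSource::MonmapFile;
  if (!c.mon_host.empty()) return MonSource::MonHost;
  return MonSource::None;
}
```

For example, a configuration that sets both mon_host and mon_host_override resolves to MonSource::Override, so the override always takes precedence.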

src/mon/MonMap.cc, src/mon/MonMap.h

int MonMap::build_initial(CephContext *cct, bool for_mkfs, ostream& errout) {
  const auto& conf = cct->_conf;

  // mon_host_override?
  auto mon_host_override = conf.get_val<std::string>("mon_host_override");
  if (!mon_host_override.empty()) {
    init_with_ips(mon_host_override, for_mkfs, "noname-");
    // or
    init_with_hosts(mon_host_override, for_mkfs, "noname-");
  }

  // or, cct?
  auto addrs = cct->get_mon_addrs();
  init_with_addrs(*addrs, for_mkfs, "noname-");

  // or, file?
  const auto monmap = conf.get_val<std::string>("monmap");
  init_with_monmap(monmap, errout);

  // fsid from conf (omitted)

  // -m foo?
  const auto mon_host = conf.get_val<std::string>("mon_host");
  init_with_ips(mon_host, for_mkfs, "noname-");
  // or
  init_with_hosts(mon_host, for_mkfs, "noname-");
  ...
}

void MonMap::init_with_addrs(const std::vector<entity_addrvec_t>& addrs, bool for_mkfs, std::string_view prefix) {
  char id = 'a';
  for (auto& addr: addrs) {
    std::string name{prefix};
    name += id++;  // generated names: noname-a, noname-b, ...
    add(name, addr, 0);
  }
}

// In src/mon/MonMap.h
void add (const mon_info_t& m) {
  ...
  mon_info[m.name] = m;
  ranks.push_back(m.name);
  calc_addr_mons();
}

void calc_addr_mons() {
  addr_mons.clear();
  for (auto& p : mon_info) {
    for (auto& a : p.second.public_addrs.v) {
      addr_mons[a] = p.first; // type of addr_mons: std::map<entity_addr_t, std::string>
    }
  }
}
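To make this bookkeeping concrete, here is a small self-contained model of the excerpt: mon_info keyed by name, ranks as an ordered list of names, and addr_mons as a reverse index from address to name that is rebuilt on every add(). SimpleMonMap, MonInfo and name_of() are hypothetical; addresses are plain "ip:port" strings instead of entity_addr_t, and ranks simply follow insertion order as in the excerpt (the real MonMap may order ranks differently).

```cpp
#include <cassert>
#include <map>
#include <string>
#include <string_view>
#include <vector>

using Addr = std::string;  // hypothetical stand-in for entity_addr_t

struct MonInfo {
  std::string name;
  std::vector<Addr> public_addrs;
};

class SimpleMonMap {
 public:
  // Like MonMap::add(): store the info, append the name to ranks,
  // then rebuild the address -> name index.
  void add(const MonInfo& m) {
    mon_info[m.name] = m;
    ranks.push_back(m.name);
    calc_addr_mons();
  }

  // Like init_with_addrs(): names are "<prefix>a", "<prefix>b", ...
  void init_with_addrs(const std::vector<std::vector<Addr>>& addrs, std::string_view prefix) {
    char id = 'a';
    for (const auto& a : addrs) {
      std::string name{prefix};
      name += id++;
      add(MonInfo{name, a});
    }
  }

  // Reverse lookup; returns nullptr for unknown addresses.
  const std::string* name_of(const Addr& a) const {
    auto it = addr_mons.find(a);
    return it == addr_mons.end() ? nullptr : &it->second;
  }

  std::vector<std::string> ranks;  // rank i -> monitor name

 private:
  void calc_addr_mons() {
    addr_mons.clear();
    for (const auto& [name, info] : mon_info) {
      for (const auto& a : info.public_addrs) addr_mons[a] = name;
    }
  }

  std::map<std::string, MonInfo> mon_info;
  std::map<Addr, std::string> addr_mons;
};
```

Rebuilding addr_mons from scratch on every add() is simple and cheap for the handful of monitors a cluster usually has.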

RadosClient::connect() creates an AsyncMessenger instance by calling Messenger::create_client_messenger(). Interestingly, RadosClient::connect() does not establish the actual connection. A connection seems to be established whenever a client sends a command: whenever you call RadosClient::mon_command() or RadosClient::mon_command_async(), MonClient connects to the monitor that has the closest rank by calling AsyncMessenger::connect_to_mon(monmap.get_addrs(...)).

src/mon/MonClient.h, src/mon/MonClient.cc

class MonClient : public Dispatcher, public AuthClient, public AuthServer {
public:
  template<typename CompletionToken>
  auto start_mon_command(const std::vector<std::string>& cmd, const ceph::buffer::list& inbl, CompletionToken&& token) {
    boost::asio::async_completion<CompletionToken, CommandSig> init(token);
    auto h = CommandCompletion::create(service.get_executor(), std::move(init.completion_handler));
    auto r = new MonCommand(*this, ++last_mon_command_tid, std::move(h));
    r->cmd = cmd;
    r->inbl = inbl;
    mon_commands.emplace(r->tid, r);  // registered under its tid
    _send_command(r);
    return init.result.get();
  }
  ...
};

// In src/mon/MonClient.cc
void MonClient::_send_command(MonCommand *r) {
  ...
  r->target_con = messenger->connect_to_mon(monmap.get_addrs(r->target_rank), true /* anon */);
  r->target_session.reset(new MonConnection(cct, r->target_con, 0, *auth_registry));
  r->target_session->start(monmap.get_epoch(), entity_name);

  MCommand *m = new MCommand(monmap.fsid);
  m->set_tid(r->tid);
  m->cmd = r->cmd;
  m->set_data(r->inbl);
  r->target_session->queue_command(m);
}
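start_mon_command() gives every command a fresh tid and parks it in mon_commands before calling _send_command(); presumably the reply is later matched back to the command by that tid. A minimal sketch of such tid-keyed bookkeeping follows. CommandTracker, start() and handle_reply() are hypothetical names, and a std::function callback stands in for the boost::asio completion handler that the real MonClient uses.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <map>
#include <string>
#include <utility>
#include <vector>

// Hypothetical model of tid-based command tracking.
class CommandTracker {
 public:
  using Callback = std::function<void(int rc, const std::string& out)>;

  // Assign the next tid and remember the command until its reply arrives.
  std::uint64_t start(std::vector<std::string> cmd, Callback cb) {
    std::uint64_t tid = ++last_tid_;
    pending_.emplace(tid, Pending{std::move(cmd), std::move(cb)});
    return tid;  // a real client would now build and send the message
  }

  // Called when a reply arrives; returns false for unknown or repeated tids.
  bool handle_reply(std::uint64_t tid, int rc, const std::string& out) {
    auto it = pending_.find(tid);
    if (it == pending_.end()) return false;
    Callback cb = std::move(it->second.cb);
    pending_.erase(it);  // erase first so the callback may start new commands
    cb(rc, out);
    return true;
  }

  std::size_t in_flight() const { return pending_.size(); }

 private:
  struct Pending {
    std::vector<std::string> cmd;
    Callback cb;
  };
  std::uint64_t last_tid_ = 0;
  std::map<std::uint64_t, Pending> pending_;
};
```

Because the tid is the map key, a late or duplicated reply for an already-completed command is simply ignored.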

MonConnection::queue_command() does not actually use a queue; it simply stores the message in the member variable pending_tell_command:

src/mon/MonClient.h

class MonConnection {
  ...
  void queue_command(Message *m) {
    pending_tell_command = m;
  }
};

The stored pending command is sent after authentication completes. This part is tricky, and I have not fully understood it yet; Ceph seems to use the Protocol class for authentication.


Authentication handling function execution flow. The Protocol class is implemented with many preprocessor macros, which made its execution hard to follow.

The diagram above shows how the stored pending_tell_command is sent to the server. After authentication completes, several nested callback handlers run, and MonConnection finally calls AsyncConnection::send_message2() to send the stored command to the server.
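The "store now, send after authentication" behavior can be modeled with a single pending slot that is flushed when the authentication callback chain finishes. This is a hypothetical sketch, not the real MonConnection: FakeMessage, PendingCommandConnection and on_auth_done() are invented names, and a std::function stands in for the call to AsyncConnection::send_message2(). A single slot is enough here because, in the _send_command() excerpt, every tell command gets its own MonConnection.

```cpp
#include <cassert>
#include <functional>
#include <memory>
#include <string>
#include <utility>

struct FakeMessage { std::string cmd; };

// Hypothetical model of MonConnection's single-slot "queue".
class PendingCommandConnection {
 public:
  using SendFn = std::function<void(std::unique_ptr<FakeMessage>)>;
  explicit PendingCommandConnection(SendFn send) : send_(std::move(send)) {}

  // Like queue_command(): nothing is sent yet, the message is only stored.
  void queue_command(std::unique_ptr<FakeMessage> m) { pending_ = std::move(m); }

  // Invoked at the end of the authentication callbacks: flush the slot once.
  void on_auth_done() {
    if (pending_) send_(std::move(pending_));  // moved-from unique_ptr is null
  }

 private:
  SendFn send_;
  std::unique_ptr<FakeMessage> pending_;
};
```

Calling on_auth_done() a second time sends nothing, since the slot was emptied by the first flush.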

2. Sending a monitor command

As the diagram shows, AsyncConnection::send_message2() is used to send the command to the server. Let's dig into this function.


Sending message flow.

AsyncConnection::send_message2() is inherited from its parent class Connection (src/msg/Connection.h) and calls the pure virtual function send_message(). In this case, that call resolves to AsyncConnection::send_message().
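The split is the classic "non-virtual wrapper in the base class, virtual hook in the subclass" pattern. The sketch below is a hypothetical reduction (Msg and FakeAsyncConnection are invented); in Ceph the argument of send_message2() is a reference-counted message handle, for which std::unique_ptr stands in here.

```cpp
#include <cassert>
#include <memory>
#include <string>
#include <vector>

struct Msg { std::string payload; };

// Hypothetical reduction of Connection: a convenience wrapper that
// forwards to a pure virtual hook implemented by the transport.
class Connection {
 public:
  virtual ~Connection() = default;
  int send_message2(std::unique_ptr<Msg> m) { return send_message(m.release()); }
  virtual int send_message(Msg* m) = 0;  // takes ownership of m
};

// Stand-in for AsyncConnection: records messages instead of queueing them.
class FakeAsyncConnection : public Connection {
 public:
  int send_message(Msg* m) override {
    sent.emplace_back(m);  // adopt the raw pointer into a unique_ptr
    return 0;
  }
  std::vector<std::unique_ptr<Msg>> sent;
};
```

Callers holding only a Connection& end up in the subclass implementation through ordinary virtual dispatch.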

src/msg/async/AsyncConnection.cc

int AsyncConnection::send_message(Message *m) {
  ...
  if (!m->get_priority()) {
    m->set_priority(async_msgr->get_default_send_priority());
  }
  m->get_header().src = async_msgr->get_myname();
  m->set_connection(this);
  ...

  protocol->send_message(m);
  return 0;
}

Again, it delegates the actual sending to its Protocol instance.

src/msg/async/ProtocolV2.cc, src/msg/async/AsyncConnection.cc

// in src/msg/async/ProtocolV2.cc
void ProtocolV2::send_message(Message *m) {
  ...
  m->queue_start = ceph::mono_clock::now();
  m->trace.event("async enqueueing message");
  out_queue[m->get_priority()].emplace_back(out_queue_entry_t{is_prepared, m});
  if (((!replacing && can_write) || state == STANDBY) && !write_in_progress) {
    write_in_progress = true;
    connection->center->dispatch_event_external(connection->write_handler);
  }
}

// in src/msg/async/AsyncConnection.cc
AsyncConnection::AsyncConnection(...) : ... {
  ...
  write_handler = new C_handle_write(this); // This callback is used by ProtocolV2::send_message().
  ...
}

class C_handle_write : public EventCallback {
  AsyncConnectionRef conn;

 public:
  explicit C_handle_write(AsyncConnectionRef c): conn(c) {}
  void do_request(uint64_t fd) override {
    conn->handle_write();
  }
};

void AsyncConnection::handle_write() {
  protocol->write_event();
}

// in src/msg/async/ProtocolV2.cc
void ProtocolV2::write_event() {
  ...
  do {
    // deduced type: ProtocolV2::out_queue_entry_t
    const auto out_entry = _get_next_outgoing();
    ...

    // send message or requeue messages may not encode message
    if (!out_entry.is_prepared) {
      prepare_send_message(connection->get_features(), out_entry.m);
    }

    r = write_message(out_entry.m, more);
  } while (can_write);
  write_in_progress = false;
  ...
}

ssize_t ProtocolV2::write_message(Message *m, bool more) {
  ...
  ceph_msg_header &header = m->get_header();
  ceph_msg_footer &footer = m->get_footer();
  ceph_msg_header2 header2 {header.seq,      header.tid,
                            header.type,     header.priority,
                            header.version,
                            init_le32(0),    header.data_off,
                            init_le64(ack_seq),
                            footer.flags,    header.compat_version,
                            header.reserved};

  // deduced type: MessageFrame
  auto message = MessageFrame::Encode(header2, m->get_payload(), m->get_middle(), m->get_data());
  append_frame(message); // The message is appended to connection->outgoing_bl here.
  ssize_t rc = connection->_try_send(more);
  ...
  return rc;
}

template <class F>
bool ProtocolV2::append_frame(F& frame) {
  ceph::bufferlist bl;
  bl = frame.get_buffer(tx_frame_asm);
  connection->outgoing_bl.append(bl);
  return true;
}

// in src/msg/async/AsyncConnection.cc
ssize_t AsyncConnection::_try_send(bool more) {
  ...
  ssize_t r = cs.send(outgoing_bl, more);
  ...
}

Again, ProtocolV2::send_message() uses EventCenter::dispatch_event_external() to pass the job to a separate worker thread. Through the write handler of AsyncConnection (C_handle_write, which calls AsyncConnection::handle_write() and then ProtocolV2::write_event()), ProtocolV2::write_message() is responsible for building the message and sending it.
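The write_in_progress flag means that many send_message() calls collapse into a single write event, which then drains the whole priority queue. Below is a single-threaded sketch of that idea, assuming (as I read _get_next_outgoing()) that the highest priority value is dequeued first. FakeEventCenter, FakeProtocol and run_pending() are hypothetical; the real EventCenter runs events on its worker thread, whereas here the caller drains it explicitly so the behavior is deterministic. Error handling and the can_write checks are omitted.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <functional>
#include <iterator>
#include <list>
#include <map>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for EventCenter: external events are queued and
// later executed by whoever owns the center (here: run_pending()).
class FakeEventCenter {
 public:
  void dispatch_event_external(std::function<void()> ev) { events_.push_back(std::move(ev)); }
  std::size_t run_pending() {
    std::size_t n = 0;
    while (!events_.empty()) {
      auto ev = std::move(events_.front());
      events_.pop_front();
      ev();
      ++n;
    }
    return n;
  }
 private:
  std::deque<std::function<void()>> events_;
};

// Hypothetical reduction of the ProtocolV2 send path.
class FakeProtocol {
 public:
  explicit FakeProtocol(FakeEventCenter& center) : center_(center) {}

  void send_message(std::string m, int priority) {
    out_queue_[priority].push_back(std::move(m));
    if (!write_in_progress_) {  // schedule at most one write event
      write_in_progress_ = true;
      center_.dispatch_event_external([this] { write_event(); });
    }
  }

  std::vector<std::string> wire;  // messages in the order they were "sent"

 private:
  void write_event() {
    while (!out_queue_.empty()) {
      auto it = std::prev(out_queue_.end());  // highest priority bucket
      wire.push_back(std::move(it->second.front()));
      it->second.pop_front();
      if (it->second.empty()) out_queue_.erase(it);
    }
    write_in_progress_ = false;
  }

  FakeEventCenter& center_;
  std::map<int, std::list<std::string>> out_queue_;
  bool write_in_progress_ = false;
};
```

Within one priority level, messages keep their FIFO order; across levels, a later high-priority message overtakes earlier low-priority ones.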

ProtocolV2::write_message() builds an encoded message frame with the static function MessageFrame::Encode(), appends it to the outgoing_bl buffer of its AsyncConnection by calling ProtocolV2::append_frame(), and sends it by calling AsyncConnection::_try_send().
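Because frames are appended to one shared buffer and then flushed, a send attempt only needs to remove the bytes the socket actually accepted. The sketch below models that, assuming (as the shared outgoing_bl suggests) that unsent bytes stay in the buffer for the next attempt; I have not traced that code path in detail. OutgoingBuffer, append_frame() and try_send() are hypothetical names, and a callback that accepts a limited number of bytes stands in for a non-blocking socket.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <functional>
#include <string>

// Hypothetical model of outgoing_bl plus _try_send().
class OutgoingBuffer {
 public:
  // Returns how many bytes were accepted, like a non-blocking send.
  using Sink = std::function<std::size_t(const char* data, std::size_t len)>;

  void append_frame(const std::string& frame) { bl_ += frame; }

  // Offer everything buffered; drop only what was accepted.
  // Returns the number of bytes still waiting to be sent.
  std::size_t try_send(const Sink& sink) {
    if (!bl_.empty()) {
      std::size_t n = sink(bl_.data(), bl_.size());
      bl_.erase(0, std::min(n, bl_.size()));
    }
    return bl_.size();
  }

 private:
  std::string bl_;
};
```

With a sink that accepts at most 4 bytes per call, two appended frames "HEAD" and "BODY!" drain over three try_send() calls, leaving 5, 1 and then 0 bytes.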

In AsyncConnection::_try_send(), the connection uses its ConnectedSocket (member variable cs) to send the data to the remote peer. If the POSIX network stack is used, PosixConnectedSocketImpl (defined in src/msg/async/PosixStack.cc) provides the underlying implementation:

src/msg/async/Stack.h

class ConnectedSocket {
  std::unique_ptr<ConnectedSocketImpl> _csi;

 public:
  ssize_t send(ceph::buffer::list &bl, bool more) {
    return _csi->send(bl, more);
  }
  ...
};

src/msg/async/PosixStack.cc

int PosixWorker::connect(const entity_addr_t &addr, const SocketOptions &opts, ConnectedSocket *socket) {
  ...
  *socket = ConnectedSocket(
      std::unique_ptr<PosixConnectedSocketImpl>(new PosixConnectedSocketImpl(net, addr, sd, !opts.nonblock)));
  return 0;
}
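ConnectedSocket is a thin front object that owns a stack-specific implementation through a std::unique_ptr, and the worker's connect() fills in the caller's socket. A hypothetical reduction of that pattern follows; Socket, SocketImpl, RecordingImpl and fake_connect() are invented names, and a std::string replaces ceph::buffer::list. The recording implementation appends to a string instead of writing to a file descriptor, so the example runs without a network.

```cpp
#include <cassert>
#include <memory>
#include <string>
#include <sys/types.h>  // ssize_t
#include <utility>

// Interface each network stack implements (cf. ConnectedSocketImpl).
class SocketImpl {
 public:
  virtual ~SocketImpl() = default;
  virtual ssize_t send(const std::string& bl, bool more) = 0;
};

// Front object that forwards to the owned implementation (cf. ConnectedSocket).
class Socket {
 public:
  Socket() = default;
  explicit Socket(std::unique_ptr<SocketImpl> impl) : impl_(std::move(impl)) {}
  ssize_t send(const std::string& bl, bool more) { return impl_->send(bl, more); }

 private:
  std::unique_ptr<SocketImpl> impl_;
};

// Fake stack implementation that records bytes instead of using a socket.
class RecordingImpl final : public SocketImpl {
 public:
  explicit RecordingImpl(std::string* log) : log_(log) {}
  ssize_t send(const std::string& bl, bool /*more*/) override {
    *log_ += bl;
    return static_cast<ssize_t>(bl.size());
  }

 private:
  std::string* log_;
};

// Like PosixWorker::connect(): build the impl and hand it to the caller.
int fake_connect(std::string* log, Socket* out) {
  *out = Socket(std::make_unique<RecordingImpl>(log));
  return 0;
}
```

Swapping the network stack then only changes which SocketImpl subclass connect() creates; callers of Socket::send() are unaffected.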

So calling cs.send() eventually calls PosixConnectedSocketImpl::send(), which calls the ::sendmsg() system call in a loop until all bytes are sent, retrying on EINTR and stopping early on EAGAIN (i.e., when the socket buffer is full).

src/msg/async/PosixStack.cc

class PosixConnectedSocketImpl final : public ConnectedSocketImpl {
  ...
 public:
  ...
  ssize_t send(ceph::buffer::list &bl, bool more) override {
    ...
    ssize_t r = do_sendmsg(_fd, msg, msglen, left_pbrs || more);
    ...
  }

  #ifndef _WIN32
  static ssize_t do_sendmsg(int fd, struct msghdr &msg, unsigned len, bool more) {
    size_t sent = 0;
    while (1) {
      MSGR_SIGPIPE_STOPPER;
      ssize_t r = ::sendmsg(fd, &msg, MSG_NOSIGNAL | (more ? MSG_MORE : 0));
      if (r < 0) {
        int err = ceph_sock_errno();
        if (err == EINTR) {
          continue;
        } else if (err == EAGAIN) {
          break;
        }
        return -err;
      }

      sent += r;
      if (len == sent) break;
      ...
    }
    return (ssize_t)sent;
  }
  #endif
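The retry logic of do_sendmsg() can be tried outside Ceph with a socketpair. The sketch below is a simplified, Linux-only version for a single contiguous buffer (the real function sends an iovec array built from a bufferlist, and its buffer-advancing code is elided in the excerpt above); send_all() is a hypothetical name, and plain errno replaces ceph_sock_errno().

```cpp
#include <cassert>
#include <cerrno>
#include <cstddef>
#include <string>
#include <sys/socket.h>
#include <sys/types.h>
#include <sys/uio.h>
#include <unistd.h>

// Sends data[0..len) with sendmsg(), retrying on EINTR and stopping on
// EAGAIN. Returns the bytes sent (possibly short) or -errno on failure.
ssize_t send_all(int fd, const char* data, std::size_t len, bool more) {
  std::size_t sent = 0;
  while (sent < len) {
    iovec iov{const_cast<char*>(data + sent), len - sent};  // remaining bytes
    msghdr msg{};
    msg.msg_iov = &iov;
    msg.msg_iovlen = 1;
    // MSG_NOSIGNAL: get EPIPE instead of SIGPIPE; MSG_MORE: more data follows.
    ssize_t r = ::sendmsg(fd, &msg, MSG_NOSIGNAL | (more ? MSG_MORE : 0));
    if (r < 0) {
      if (errno == EINTR) continue;  // interrupted by a signal: retry
      if (errno == EAGAIN) break;    // socket buffer full: report progress
      return -errno;
    }
    sent += static_cast<std::size_t>(r);
  }
  return static_cast<ssize_t>(sent);
}
```

Returning a short count on EAGAIN, instead of blocking, lets the caller keep the remainder and retry once the socket becomes writable again.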

Note that all transmission is performed by a separate worker thread, so from the main thread's point of view, sending is asynchronous.


  1. Analyzing Ceph Network Module 1

  2. Analyzing Ceph Network Module 2
