Analyzing Ceph RESTful Module and Boost ASIO

Nov 30, 2020

This post explains how the Ceph RESTful module communicates with the other Ceph daemons.

1. Ceph RESTful Module

The Ceph RESTful module is a ceph-mgr dashboard module through which clients can access the Ceph cluster over an SSL-secured connection 1. The Ceph API service is available at the same URL as the regular Ceph Dashboard, under the /api base path: http(s)://<server_addr>:<server_port>/api. I wonder how ceph-mgr establishes connections with the other daemons and handles RESTful API requests 2.

2. API Handler

I first start from the API handler functions. ceph-mgr uses Python code in src/pybind/mgr/dashboard/controllers to handle requests coming through the RESTful API, and this code is bound to the main ceph-mgr C++ implementation through pybind11 3.

  • auth (/api/auth): src/pybind/mgr/dashboard/controllers/auth.py
  • cephfs (/api/cephfs): src/pybind/mgr/dashboard/controllers/cephfs.py
  • cluster configuration (/api/cluster_conf): src/pybind/mgr/dashboard/controllers/cluster_configuration.py
  • crush rule (/api/crush_rule): src/pybind/mgr/dashboard/controllers/crush_rule.py
  • grafana (/api/grafana): src/pybind/mgr/dashboard/controllers/grafana.py
  • health (/api/health): src/pybind/mgr/dashboard/controllers/health.py
  • host (/api/host): src/pybind/mgr/dashboard/controllers/host.py

In each Python code, there is a class that handles its own RESTful API group: for instance, auth module has three endpoints: POST /api/auth, POST /api/auth/check, and POST /api/auth/logout:

src/pybind/mgr/dashboard/controllers/auth.py

@ApiController('/auth', secure=False)
class Auth(RESTController):
  def create(self, username, password):   # Seems to handle POST /api/auth
    ...                                   # No decorator needed: RESTController maps create() to POST on the collection by convention

  @RESTController.Collection('POST')
  def logout(self):                       # Seems to handle POST /api/auth/logout
    ...

  @RESTController.Collection('POST', query_params=['token'])
  def check(self, token):                 # Seems to handle POST /api/auth/check
    ...

Similarly, the following OSD APIs are provided:

  • GET /api/osd
  • POST /api/osd
  • GET /api/osd/flags: Display OSD flags
  • PUT /api/osd/flags: Sets OSD flags for the entire cluster.
  • GET /api/osd/{svc_id}: Returns collected data about an OSD.
  • PUT /api/osd/{svc_id}
  • PUT /api/osd/{svc_id}/mark: Mark OSD flags (out, in, down, lost, …)
  • POST /api/osd/{svc_id}/destroy: Mark OSD as being destroyed.

These APIs are handled by functions in src/pybind/mgr/dashboard/controllers/osd.py:

src/pybind/mgr/dashboard/controllers/osd.py

@ApiController('/osd', Scope.OSD)
class Osd(RESTController):
  def list(self):                   # handle GET /api/osd
    ...

  @handle_send_command_error('osd')
  def get(self, svc_id):            # handle GET /api/osd/{svc_id}
    ...

  def set(self, svc_id, device_class):  # handle PUT /api/osd/{svc_id}
    old_device_class = CephService.send_command('mon', 'osd crush get-device-class',
                                                ids=[svc_id])
    old_device_class = old_device_class[0]['device_class']
    if old_device_class != device_class:
        CephService.send_command('mon', 'osd crush rm-device-class',
                                  ids=[svc_id])
        if device_class:
            CephService.send_command('mon', 'osd crush set-device-class', **{
                'class': device_class,
                'ids': [svc_id]
            })

  @RESTController.Resource('PUT')
  def mark(self, svc_id, action):   # handle PUT /api/osd/{svc_id}/mark
    ...

  @RESTController.Resource('POST')
  def destroy(self, svc_id):        # handle POST /api/osd/{svc_id}/destroy
    CephService.send_command(
        'mon', 'osd destroy-actual', id=int(svc_id), yes_i_really_mean_it=True)

  ...

@ApiController('/osd/flags', Scope.OSD)
class OsdFlagsController(RESTController):
  ...

3. Communicating with Other Ceph Daemons

CephService.send_command

You can easily see that the API handler functions use CephService.send_command to read data from or write data to the other daemons. As Ceph monitors manage the cluster map, the OSD-related APIs appear to send commands to the monitor to update the map.

CephService.send_command calls mgr.send_command, which in turn calls self._ceph_send_command:

src/pybind/mgr/mgr_module.py

class MgrModule(ceph_module.BaseMgrModule, MgrModuleLoggingMixin):
  ...

  def send_command(self, *args, **kwargs):
    """
    Called by the plugin to send a command to the mon
    cluster.
    """
    self._ceph_send_command(*args, **kwargs)

and _ceph_send_command is bound to the C++ function ceph_send_command through the Python bindings in src/mgr/BaseMgrModule.cc:

src/mgr/BaseMgrModule.cc

PyMethodDef BaseMgrModule_methods[] = {
  ...

  {"_ceph_send_command", (PyCFunction)ceph_send_command, METH_VARARGS,
   "Send a mon command"},
};

static PyObject*
ceph_send_command(BaseMgrModule *self, PyObject *args) {
  PyObject *completion;
  char *type, *name, *cmd_json, *tag;
  PyArg_ParseTuple(args, "Ossss:ceph_send_command",
    &completion, &type, &name, &cmd_json, &tag);
  if (std::string(type) == "mon") {
    ...
    self->py_modules->get_monc().start_mon_command(...);
  } else if (std::string(type) == "osd") {
    ...
    self->py_modules->get_objecter().osd_command(...);
  } else if (std::string(type) == "mds") {
    ...
    self->py_modules->get_client().mds_command(...);
  } else if (std::string(type) == "pg") {
    ...
    self->py_modules->get_objecter().pg_command(...);
  }
}
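
For reference, here is a minimal, self-contained sketch of this binding mechanism, that is, exposing a C++ function to Python through a method table in the style of BaseMgrModule_methods above. All names are hypothetical; this is not Ceph code:

// example_module.cc - hypothetical sketch of a CPython extension method,
// mirroring the PyMethodDef / PyArg_ParseTuple pattern shown above.
#include <Python.h>
#include <string>

static PyObject*
example_send_command(PyObject *self, PyObject *args) {
  const char *type = nullptr, *name = nullptr, *cmd_json = nullptr;
  // Parse three Python str arguments into C strings ("sss").
  if (!PyArg_ParseTuple(args, "sss:example_send_command",
                        &type, &name, &cmd_json)) {
    return nullptr;  // TypeError has already been set
  }
  if (std::string(type) == "mon") {
    // ... a real module would dispatch the command to a monitor client here ...
  }
  Py_RETURN_NONE;
}

static PyMethodDef example_methods[] = {
  {"_send_command", example_send_command, METH_VARARGS,
   "Send a command (sketch)"},
  {nullptr, nullptr, 0, nullptr}  // sentinel entry
};

static struct PyModuleDef example_module = {
  PyModuleDef_HEAD_INIT, "example", nullptr, -1, example_methods
};

PyMODINIT_FUNC PyInit_example(void) {
  return PyModule_Create(&example_module);
}

After building this as a Python extension, calling example._send_command('mon', 'mon.a', '{}') from Python lands in example_send_command, in the same way mgr_module.py's self._ceph_send_command(...) lands in ceph_send_command.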

ActivePyModules

The type of self->py_modules is ActivePyModules (src/mgr/ActivePyModules.h), which is initialized by PyModuleRegistry:

src/mgr/PyModuleRegistry.cc

void PyModuleRegistry::active_start(
            DaemonStateIndex &ds, ClusterState &cs,
            const std::map<std::string, std::string> &kv_store,
            MonClient &mc, LogChannelRef clog_, LogChannelRef audit_clog_,
            Objecter &objecter_, Client &client_, Finisher &f,
            DaemonServer &server) {
  active_modules.reset(new ActivePyModules(
        module_config, kv_store, ds, cs, mc,
        clog_, audit_clog_, objecter_, client_, f, server,
        *this));
  
  for (const auto &i : modules) {
    active_modules->start_one(i.second);
  }
}

The PyModuleRegistry::active_start() function is called when initializing Mgr:

src/mgr/Mgr.cc

Mgr::Mgr(MonClient *monc_, const MgrMap& mgrmap,
         PyModuleRegistry *py_module_registry_,
	 Messenger *clientm_, Objecter *objecter_,
	 Client* client_, LogChannelRef clog_, LogChannelRef audit_clog_) :
  monc(monc_),
  objecter(objecter_),
  client(client_),
  client_messenger(clientm_),
  finisher(g_ceph_context, "Mgr", "mgr-fin"),
  digest_received(false),
  py_module_registry(py_module_registry_),
  cluster_state(monc, nullptr, mgrmap),
  server(monc, finisher, daemon_state, cluster_state, *py_module_registry,
         clog_, audit_clog_),
  clog(clog_),
  audit_clog(audit_clog_),
  initialized(false),
  initializing(false) { ... }

void Mgr::init() {
  ...

  py_module_registry->active_start(daemon_state, cluster_state,
      kv_store, *monc, clog, audit_clog, *objecter, *client,
      finisher, server);
  ...
}

MgrStandby

So the objecter, monc, and client used for communication are passed in during Mgr initialization. The actual Mgr creation code is in MgrStandby, and MgrStandby is also the one that creates objecter, monc, and client:

src/mgr/MgrStandby.cc

MgrStandby::MgrStandby(int argc, const char** argv)
    : Dispatcher(g_ceph_context),
      monc{g_ceph_context, poolctx},
      client_messenger(
          Messenger::create(g_ceph_context,
                            cct->_conf.get_val<std::string>("ms_type"),
                            entity_name_t::MGR(),
                            "mgr",
                            Messenger::get_pid_nonce())),
      objecter{g_ceph_context, client_messenger.get(), &monc, poolctx},
      client{client_messenger.get(), &monc, &objecter},
      mgrc(g_ceph_context, client_messenger.get(), &monc.monmap),
      log_client(g_ceph_context,
                 client_messenger.get(),
                 &monc.monmap,
                 LogClient::NO_FLAGS),
      clog(log_client.create_channel(CLOG_CHANNEL_CLUSTER)),
      audit_clog(log_client.create_channel(CLOG_CHANNEL_AUDIT)),
      finisher(g_ceph_context, "MgrStandby", "mgrsb-fin"),
      timer(g_ceph_context, lock),
      py_module_registry(clog),
      active_mgr(nullptr),
      orig_argc(argc),
      orig_argv(argv),
      available_in_map(false) {}

void MgrStandby::handle_mgr_map(ref_t<MMgrMap> mmap) {
  auto& map = mmap->get_map();
  dout(4) << "received map epoch " << map.get_epoch() << dendl;
  const bool active_in_map = map.active_gid == monc.get_global_id();
  dout(4) << "active in map: " << active_in_map << " active is "
          << map.active_gid << dendl;
  ...

  if (active_in_map) {
    if (!active_mgr) {
      dout(1) << "Activating!" << dendl;
      active_mgr.reset(new Mgr(&monc, map, &py_module_registry,
                              client_messenger.get(), &objecter, &client, clog,
                              audit_clog));
      ...
    }
  }

  ...
}

bool MgrStandby::ms_dispatch2(const ref_t<Message>& m) {
  if (m->get_type() == MSG_MGR_MAP) {
    handle_mgr_map(ref_cast<MMgrMap>(m));
  }
  ...
}

MgrStandby does not create a Mgr instance when it is constructed. Until an MgrMap message is dispatched (via ms_dispatch2, a function inherited from the Dispatcher class) that marks this manager as active, active_mgr remains nullptr; it is only initialized inside the ms_dispatch2 handler. Still, MgrStandby is responsible for creating objecter, monc, client, and eventually Mgr.
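
As a side note, this lazy-activation pattern can be sketched in a few lines of standalone C++ (hypothetical StandbyManager/ActiveManager classes, not Ceph code):

#include <iostream>
#include <memory>

// Hypothetical stand-ins for Mgr and MgrStandby: the active object is
// created only once the daemon learns from the map that it is active.
struct ActiveManager {
  void init() { std::cout << "active manager initialized\n"; }
};

class StandbyManager {
  std::shared_ptr<ActiveManager> active_mgr;  // stays nullptr until activated
public:
  // Called from the message-dispatch path whenever a new map arrives,
  // similar in spirit to MgrStandby::handle_mgr_map().
  void handle_map(bool active_in_map) {
    if (active_in_map && !active_mgr) {
      active_mgr = std::make_shared<ActiveManager>();
      active_mgr->init();
    }
  }
};

int main() {
  StandbyManager standby;
  standby.handle_map(false);  // still standby: nothing happens
  standby.handle_map(true);   // marked active: ActiveManager created here
}

Back to the real code, the members MgrStandby owns are declared in its header: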

src/mgr/MgrStandby.h

class MgrStandby : public Dispatcher, public md_config_obs_t {
  ...
  
protected:
  ceph::async::io_context_pool poolctx;
  MonClient monc;
  std::unique_ptr<Messenger> client_messenger;
  Objecter objecter;
  Client client;

  MgrClient mgrc;

  PyModuleRegistry py_module_registry;
  std::shared_ptr<Mgr> active_mgr;

  ...
};

The other clients are probably initialized in a similar way, so let's see how MonClient monc is initialized.

Initializing MonClient

src/mon/MonClient.h

class MonClient : public Dispatcher,
                  public AuthClient,
                  public AuthServer /* for mgr, osd, mds */ {
private:
  Messenger *messenger;

  std::unique_ptr<MonConnection> active_con;
  std::map<entity_addrvec_t, MonConnection> pending_cons;

  boost::asio::io_context& service;
  boost::asio::io_context::strand finish_strand{service};

  ...
public:
  MonClient(CephContext *cct_, boost::asio::io_context& service);
};

Note that monc is initialized as monc{g_ceph_context, poolctx}, where poolctx has the type ceph::async::io_context_pool, while the second parameter of the MonClient constructor is of type boost::asio::io_context&. ceph::async::io_context_pool defines a conversion operator so that it automatically returns a reference to its internal boost::asio::io_context ioctx:

src/common/async/context_pool.h

namespace ceph::async {
class io_context_pool {
  ...

  operator boost::asio::io_context&() {
    return ioctx;
  }
};
}
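
This conversion-operator trick is easy to reproduce in isolation. Below is a minimal standalone sketch (a hypothetical tiny_pool class, not the actual ceph::async::io_context_pool): a pool that owns an io_context plus worker threads and converts implicitly to boost::asio::io_context&, so it can be passed wherever a plain reference is expected, just like poolctx is passed to the MonClient constructor.

#include <boost/asio.hpp>
#include <iostream>
#include <thread>
#include <vector>

// Hypothetical miniature of an io_context pool: owns an io_context plus
// worker threads, and converts implicitly to io_context& so it can be
// handed to code that expects a plain io_context reference.
class tiny_pool {
  boost::asio::io_context ioctx;
  boost::asio::executor_work_guard<boost::asio::io_context::executor_type>
      guard{ioctx.get_executor()};
  std::vector<std::thread> workers;
public:
  explicit tiny_pool(int n) {
    for (int i = 0; i < n; ++i)
      workers.emplace_back([this] { ioctx.run(); });
  }
  ~tiny_pool() {
    guard.reset();                        // let run() return once work is done
    for (auto& t : workers) t.join();
  }
  operator boost::asio::io_context&() { return ioctx; }  // the conversion
};

// Mimics MonClient's constructor parameter: takes an io_context by reference.
void use_service(boost::asio::io_context& service) {
  boost::asio::post(service, [] { std::cout << "posted to the pool\n"; });
}

int main() {
  tiny_pool pool{2};
  use_service(pool);  // the pool converts to io_context& implicitly
}

This is presumably why monc{g_ceph_context, poolctx} compiles even though the constructor parameter is declared as boost::asio::io_context&.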

start_mon_command Internal

So it seems that ceph-mgr and the Ceph monitors connect to each other using Boost.Asio. MonClient::start_mon_command uses this boost::asio::io_context service member to communicate with the monitors asynchronously:

src/mon/MonClient.h

class MonClient {
public:
  template<typename CompletionToken>
  auto start_mon_command(const std::string& mon_name,
                         const std::vector<std::string>& cmd,
                         const ceph::buffer::list& inbl,
                         CompletionToken&& token) {
    boost::asio::async_completion<CompletionToken, CommandSig> init(token);
    auto h = CommandCompletion::create(service.get_executor(), std::move(init.completion_handler));
    if (!initialized || stopping) {
      ceph::async::post(std::move(h), monc_errc::shutting_down, std::string{}, bufferlist{});
    } else {
      auto r = new MonCommand(*this, ++last_mon_command_tid, std::move(h));
      mon_commands.emplace(r->tid, r);
      _send_command(r);
    }
    
    return init.result.get();
  }
};

Looking at _send_command:

src/mon/MonClient.cc

void MonClient::_send_command(MonCommand *r) {
  ...

  // Implementation varies across Ceph releases. The following code is for Octopus and later.
  r->target_con = messenger->connect_to_mon(monmap.get_addrs(r->target_name), true /* anon */);
  r->target_session.reset(new MonConnection(cct, r->target_con, 0, &auth_registry));
  r->target_session->start(monmap.get_epoch(), entity_name);

  MCommand *m = new MCommand(monmap.fsid);
  m->set_tid(r->tid);
  m->cmd = r->cmd;
  m->set_data(r->inbl);
  r->target_session->queue_command(m);
  return;
}

It creates a new MonConnection and enqueues the given command. In MonConnection::start(), it sends several initial messages, including a keepalive packet, authentication data, and so on.

An interesting point is MonConnection::queue_command: it just stores the given command pointer into the pending command pointer:

class MonConnection {
  ...

  void queue_command(Message *m) {
    pending_tell_command = m;
  }
};

It is not really a queue, but anyway: it temporarily stores a message and sends it only after authentication is done.

src/mon/MonClient.cc

int MonClient::handle_auth_done(...) {
  if (con->get_peer_type() == CEPH_ENTITY_TYPE_MON) {
    if (con->is_anon()) {
      for (auto& i : mon_commands) {
        if (i.second->target_con == con) {
          return i.second->target_session->handle_auth_done(...);
        }
      }
    }
  }
}

int MonConnection::handle_auth_done(...) {
  ...
  int auth_err = auth->handle_response(...);
  if (auth_err >= 0) {
    state = State::HAVE_SESSION;
  }
  con->set_last_keepalive_ack(auth_start);

  if (pending_tell_command) {
    con->send_message2(std::move(pending_tell_command));
  }
  return auth_err;
}

Connection::send_message2 is defined in src/msg/Connection.h and calls the pure virtual function Connection::send_message. Here, the derived class AsyncConnection provides the implementation:

src/msg/async/AsyncConnection.cc

int AsyncConnection::send_message(Message *m) {
  ...

  protocol->send_message(m);
}

So what is protocol? The messenger->connect_to_mon() function also uses protocol in its call stack. Protocol is defined in src/msg/async/Protocol.h and has virtual methods; currently ProtocolV1 (legacy) and ProtocolV2 (default) inherit from the Protocol class and are used 4.

This means that all actual wire communication is managed by a Protocol instance.
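
To make that concrete, here is a minimal standalone sketch of the pattern, an abstract protocol interface with two versioned implementations behind a connection object (hypothetical names, not the actual Protocol/AsyncConnection API):

#include <iostream>
#include <memory>
#include <string>

// Hypothetical stand-ins: an abstract wire protocol with two versions,
// and a connection that only ever talks to the abstract interface.
struct Message { std::string payload; };

class WireProtocol {
public:
  virtual ~WireProtocol() = default;
  virtual void send_message(const Message& m) = 0;
};

class WireProtocolV1 : public WireProtocol {
public:
  void send_message(const Message& m) override {
    std::cout << "[v1] " << m.payload << "\n";
  }
};

class WireProtocolV2 : public WireProtocol {
public:
  void send_message(const Message& m) override {
    std::cout << "[v2] " << m.payload << "\n";
  }
};

class FakeConnection {
  std::unique_ptr<WireProtocol> protocol;
public:
  explicit FakeConnection(bool v2)
    : protocol(v2 ? std::unique_ptr<WireProtocol>(new WireProtocolV2{})
                  : std::unique_ptr<WireProtocol>(new WireProtocolV1{})) {}
  // Delegate, just like AsyncConnection::send_message above.
  void send_message(const Message& m) { protocol->send_message(m); }
};

int main() {
  FakeConnection conn{/*v2=*/true};
  conn.send_message({"osd crush set-device-class"});
}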

Boost Executor and Sending Message

We have slightly deviated from the main road: MonClient::start_mon_command uses boost::asio::async_completion<CompletionToken, CommandSig> and MonCommand.

src/mon/MonClient.h

template <typename CompletionToken>
auto start_mon_command(const std::string& mon_name,
                       const std::vector<std::string>& cmd,
                       const ceph::buffer::list& inbl,
                       CompletionToken&& token) {
  boost::asio::async_completion<CompletionToken, CommandSig> init(token);

  auto h = CommandCompletion::create(service.get_executor(),
                                     std::move(init.completion_handler));
  auto r = new MonCommand(*this, ++last_mon_command_tid, std::move(h));
  _send_command(r);
  return init.result.get();
}
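
As a quick detour, the async_completion pattern itself can be shown with a small self-contained example (a hypothetical async_fake_command, not Ceph code): the caller passes an arbitrary completion token, a concrete handler is created from it, and invoking that handler, here simply posted onto the io_context, completes the operation in whatever way the token dictates.

#include <boost/asio.hpp>
#include <future>
#include <iostream>

// Hypothetical async operation following the same shape as start_mon_command:
// the CompletionToken the caller passes (plain callback, use_future, ...)
// decides both the handler type and the return type of the initiating function.
template <typename CompletionToken>
auto async_fake_command(boost::asio::io_context& service, int value,
                        CompletionToken&& token) {
  using Sig = void(int);  // completion signature, like CommandSig above
  boost::asio::async_completion<CompletionToken, Sig> init(token);
  // A real implementation would start network I/O here; this sketch just
  // posts the completion handler onto the io_context.
  boost::asio::post(service,
      [h = std::move(init.completion_handler), value]() mutable { h(value * 2); });
  return init.result.get();  // void for a callback token, std::future<int> for use_future
}

int main() {
  boost::asio::io_context service;

  // 1) A plain lambda as the completion token.
  async_fake_command(service, 21,
                     [](int r) { std::cout << "callback result: " << r << "\n"; });

  // 2) use_future as the completion token: the initiating call returns a future.
  std::future<int> f = async_fake_command(service, 100, boost::asio::use_future);

  service.run();
  std::cout << "future result: " << f.get() << "\n";
}

The same initiating function therefore works with a plain callback, with boost::asio::use_future, or with any other completion token, which is presumably why start_mon_command is written as a template over CompletionToken.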

How are these two classes used together?

TBD


  1. RESTful module ↩︎

  2. Ceph RESTful API ↩︎

  3. GitHub repository: pybind11 ↩︎

  4. Ceph Messenger V2 ↩︎
