Analyzing Ceph Network Module 2

Dec 10, 2020
[Figure: Ceph network architecture.]

In the previous post, I illustrated the overall Ceph network architecture and the code execution flow of initialization, specifically for servers.

This post illustrates what happens during connection establishment, on both servers and clients.

1. Ceph Server Accepting Connection

[Figure: Ceph network function execution flow for accepting a client connection.]

In the previous post, I only showed that EventCenter calls the overridden do_request() function of the Processor::C_processor_accept class as a callback. This callback only handles accepting connections, and other types of communication need their own callbacks. Processor::C_processor_accept::do_request() calls Processor::accept(), which executes the following code to call the POSIX accept() function (or accept4(), where available).

src/msg/async/AsyncMessenger.cc, src/msg/async/PosixStack.cc, src/common/compat.cc

// In src/msg/async/AsyncMessenger.cc
void Processor::accept() {
  ...
  while (true) {
    entity_addr_t addr;
    ConnectedSocket cli_socket;
    Worker *w = worker;
    // listen_socket is a ServerSocket. With the POSIX stack it wraps a
    // PosixServerSocketImpl, which performs the actual operations.
    int r = listen_socket.accept(&cli_socket, opts, &addr, w);
    if (r == 0) {
      ...
      msgr->add_accept(w, std::move(cli_socket), msgr->get_myaddrs().v[listen_socket.get_addr_slot()], addr);
      continue;
    }
    if (r == -EINTR)
      continue;
    if (r == -EAGAIN)
      break;  // no more pending connections
    ...
  }
}

// In src/msg/async/PosixStack.cc
int PosixServerSocketImpl::accept(ConnectedSocket *sock, const SocketOptions &opt, entity_addr_t *out, Worker *w) {
  sockaddr_storage ss;
  socklen_t slen = sizeof(ss);
  int sd = accept_cloexec(_fd, (sockaddr*)&ss, &slen);
  if (sd < 0)
    return -errno;
  ...
  std::unique_ptr<PosixConnectedSocketImpl> csi(new PosixConnectedSocketImpl(handler, *out, sd, true));
  *sock = ConnectedSocket(std::move(csi));
  return 0;
}

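// In src/common/compat.cc
int accept_cloexec(int sockfd, struct sockaddr* addr, socklen_t* addrlen) {
#ifdef HAVE_ACCEPT4
  return accept4(sockfd, addr, addrlen, SOCK_CLOEXEC);
#else
  int fd = accept(sockfd, addr, addrlen);
  if (fd == -1)
    return -1;

  #ifndef _WIN32
  if (fcntl(fd, F_SETFD, FD_CLOEXEC) < 0)
    goto fail;
  #endif

  return fd;

fail:
  // Close the new fd, but report the fcntl() error to the caller.
  int save_errno = errno;
  close(fd);
  errno = save_errno;
  return -1;
#endif
}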
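Without Ceph's wrappers, the server side of this flow is a familiar POSIX pattern. A nonblocking listening socket is drained in a loop until the kernel reports EAGAIN, and every accepted fd is created close-on-exec. The Linux-only sketch below assumes that pattern. make_listener() and accept_all() are hypothetical helpers, not Ceph functions. The sketch calls accept4() directly, which corresponds to the HAVE_ACCEPT4 branch of accept_cloexec(). It also passes SOCK_NONBLOCK to accept4(), which simplifies how Ceph makes the fd nonblocking.

```cpp
#include <arpa/inet.h>
#include <cassert>
#include <cerrno>
#include <cstdint>
#include <fcntl.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <unistd.h>
#include <vector>

// Hypothetical helper: a nonblocking, close-on-exec listener on 127.0.0.1
// bound to an ephemeral port. Returns the fd and stores the port in *port.
int make_listener(uint16_t *port) {
  int fd = socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC, 0);
  if (fd < 0) return -1;
  sockaddr_in addr{};
  addr.sin_family = AF_INET;
  addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
  addr.sin_port = 0;  // let the kernel pick a free port
  socklen_t len = sizeof(addr);
  if (bind(fd, (sockaddr *)&addr, sizeof(addr)) < 0 || listen(fd, 16) < 0 ||
      getsockname(fd, (sockaddr *)&addr, &len) < 0) {
    close(fd);
    return -1;
  }
  *port = ntohs(addr.sin_port);
  return fd;
}

// Accept every pending connection: retry on EINTR, stop on EAGAIN
// (queue empty) or on any other error.
std::vector<int> accept_all(int listen_fd) {
  std::vector<int> accepted;
  while (true) {
    sockaddr_storage ss;
    socklen_t slen = sizeof(ss);
    int sd = accept4(listen_fd, (sockaddr *)&ss, &slen, SOCK_CLOEXEC | SOCK_NONBLOCK);
    if (sd >= 0) {
      accepted.push_back(sd);  // Ceph would hand the socket to add_accept() here
      continue;
    }
    if (errno == EINTR) continue;
    break;
  }
  return accepted;
}
```

In Ceph, each fd obtained this way is wrapped in a ConnectedSocket and passed to AsyncMessenger::add_accept() instead of being collected in a vector.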

AsyncConnection

AsyncConnection maintains a logic session between two endpoints. In other word, a pair of addresses can find the only AsyncConnection. AsyncConnection will handle with network fault or read/write transactions. If one file descriptor broken, AsyncConnection will maintain the message queue and sequence, try to reconnect peer endpoint.

ceph/src/msg/async/AsyncConnection.h

AsyncConnection represents a connection and is created and managed by AsyncMessenger. It tracks the connection's progress with a state variable and works like a finite-state machine.

src/msg/async/AsyncMessenger.cc

void AsyncMessenger::add_accept(Worker *w, ConnectedSocket cli_socket, const entity_addr_t &listen_addr, const entity_addr_t &peer_addr) {
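  // ceph::make_ref<T>(...) is equivalent to ceph::ref_t<T>{new T(...), false}.
  // ceph::ref_t<T> is boost::intrusive_ptr<T>: the reference count lives inside the
  // object, and `false` adopts the object's initial reference instead of adding one.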
  auto conn = ceph::make_ref<AsyncConnection>(cct, this, &dispatch_queue, w, listen_addr.is_msgr2(), false);
  conn->accept(std::move(cli_socket), listen_addr, peer_addr);
  accepting_conns.insert(conn);
}
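The sketch below shows what the `false` argument in make_ref does, using a self-contained intrusive reference count. RefCounted, Ref, and Conn are hypothetical names. RefCounted only imitates the idea that an object's count starts at 1, and Ref is a pared-down stand-in for boost::intrusive_ptr. Neither is Ceph's code.

```cpp
#include <atomic>
#include <cassert>
#include <utility>

// Hypothetical intrusively counted base. The count starts at 1, so the
// code that calls `new` already holds one reference.
class RefCounted {
 public:
  virtual ~RefCounted() = default;
  void get() const { nref_.fetch_add(1); }
  void put() const {
    if (nref_.fetch_sub(1) == 1) delete this;  // last reference released
  }
  long count() const { return nref_.load(); }

 private:
  mutable std::atomic<long> nref_{1};
};

// Pared-down intrusive smart pointer in the spirit of boost::intrusive_ptr.
// add_ref == false adopts an existing reference instead of taking a new one.
template <typename T>
class Ref {
 public:
  Ref(T *p, bool add_ref) : p_(p) { if (p_ && add_ref) p_->get(); }
  Ref(const Ref &o) : p_(o.p_) { if (p_) p_->get(); }
  Ref &operator=(const Ref &) = delete;  // omitted to keep the sketch short
  ~Ref() { if (p_) p_->put(); }
  T *operator->() const { return p_; }

 private:
  T *p_;
};

// Sketch of make_ref(): allocate, then adopt the initial reference.
template <typename T, typename... Args>
Ref<T> make_ref(Args &&...args) {
  return Ref<T>(new T(std::forward<Args>(args)...), false);
}

// Toy payload used only to observe the count.
struct Conn : RefCounted {
  int id;
  explicit Conn(int i) : id(i) {}
};
```

If make_ref passed `true` here instead, a new object would start with a count of 2 while only one Ref owned it, so the object would never be freed.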

2. Ceph Client Establishing Connection

[Figure: Ceph network function execution flow for requesting connection establishment. The illustration does not include the callbacks for read/write operations after the connection is established.]

A client that wants to establish a connection to a listening server can use the AsyncMessenger::connect_to() function. The server creates its AsyncConnection only after a socket has been accepted. The client instead creates an AsyncConnection instance first, and then calls AsyncConnection::connect() to establish a connection to the given server address.

src/msg/async/AsyncMessenger.cc

ConnectionRef AsyncMessenger::connect_to(int type, const entity_addrvec_t& addrs, bool anon, bool not_local_dest) {
  ...
  auto av = _filter_addrs(addrs);
  return create_connect(av, type, anon);
}

AsyncConnectionRef AsyncMessenger::create_connect(const entity_addrvec_t& addrs, int type, bool anon) {
  ...
  Worker *w = stack->get_worker();
  auto conn = ceph::make_ref<AsyncConnection>(cct, this, &dispatch_queue, w, target.is_msgr2(), false);
  conn->anon = anon;
  conn->connect(addrs, type, target);
  ...

  return conn;
}

Here, AsyncConnection::connect() calls _connect(), which calls EventCenter::dispatch_event_external(read_handler) to hand the job of connection establishment to a worker thread. A worker thread is created via PosixNetworkStack::add_thread() and PosixNetworkStack::spawn_worker(). It blocks in process_events(), which calls epoll_wait(). When an event arrives, the worker wakes up, handles the event, and then handles any externally dispatched events.

src/msg/async/AsyncConnection.cc, src/msg/async/Event.cc

void AsyncConnection::connect(const entity_addrvec_t &addrs, int type, entity_addr_t &target) {
  ...
  _connect();
}

void AsyncConnection::_connect() {
  state = STATE_CONNECTING;
  center->dispatch_event_external(read_handler);
}

// Code from src/msg/async/Event.cc
// Executed by separate worker threads
int EventCenter::process_events(unsigned timeout_microseconds, ceph::timespan *working_dur) {
  struct timeval tv;
  int numevents;
  ...
  // Handle fired events
  std::vector<FiredFileEvent> fired_events;
  numevents = driver->event_wait(fired_events, &tv);
  for (int event_id = 0; event_id < numevents; event_id++) {
    ...
  }

  // If external events exist, take the whole queue under the lock, then handle them
  if (external_num_events.load()) {
    external_lock.lock();
    std::deque<EventCallbackRef> cur_process;
    cur_process.swap(external_events);
    external_num_events.store(0);
    external_lock.unlock();
    ...
    while (!cur_process.empty()) {
      EventCallbackRef e = cur_process.front();
      e->do_request(0);
      cur_process.pop_front();
    }
  }
  ...
}

Here, EventCallbackRef e refers to AsyncConnection's read_handler, which is defined as follows. Calling read_handler->do_request() invokes AsyncConnection::process().

src/msg/async/AsyncConnection.cc

class C_handle_read : public EventCallback {
  AsyncConnectionRef conn;

 public:
  explicit C_handle_read(AsyncConnectionRef c): conn(c) {}
  // Invoked by the EventCenter; simply runs the connection's state machine.
  void do_request(uint64_t fd_or_id) override {
    conn->process();
  }
};

AsyncConnection::AsyncConnection(...) : ... {
  read_handler = new C_handle_read(this);
  ...
}
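The hand-off from the caller's thread to a worker can be sketched as a toy event loop. Callers push an EventCallback onto a locked queue. The worker returns from epoll_wait(), swaps the whole queue out under the lock, and runs each callback without holding the lock. This is a simplified, Linux-only sketch, and MiniCenter, Conn, and run_worker() are hypothetical names. As far as I can tell, Ceph wakes a sleeping worker in a similar way: EventCenter::wakeup() writes to an internal notification pipe. The sketch assumes a plain pipe for that job.

```cpp
#include <atomic>
#include <cassert>
#include <chrono>
#include <cstdint>
#include <deque>
#include <mutex>
#include <sys/epoll.h>
#include <thread>
#include <unistd.h>

// Hypothetical minimal counterpart of Ceph's EventCallback interface.
struct EventCallback {
  virtual ~EventCallback() = default;
  virtual void do_request(uint64_t fd_or_id) = 0;
};

// Toy event loop: an epoll set holding one wakeup pipe, plus a locked
// queue of "external" callbacks submitted from other threads.
class MiniCenter {
 public:
  MiniCenter() {
    int p[2];
    if (pipe(p) == 0) { notify_rd_ = p[0]; notify_wr_ = p[1]; }
    epfd_ = epoll_create1(EPOLL_CLOEXEC);
    epoll_event ev{};
    ev.events = EPOLLIN;
    ev.data.fd = notify_rd_;
    epoll_ctl(epfd_, EPOLL_CTL_ADD, notify_rd_, &ev);
  }
  ~MiniCenter() { close(notify_rd_); close(notify_wr_); close(epfd_); }

  // Safe to call from any thread.
  void dispatch_event_external(EventCallback *e) {
    size_t pending;
    {
      std::lock_guard<std::mutex> l(lock_);
      external_.push_back(e);
      pending = ++external_num_;
    }
    if (pending == 1) wakeup();  // first queued event: make epoll_wait() return
  }

  void wakeup() { char c = 'w'; [[maybe_unused]] ssize_t r = write(notify_wr_, &c, 1); }

  // One loop iteration, run on the worker thread.
  void process_events(int timeout_ms) {
    epoll_event evs[16];
    int n = epoll_wait(epfd_, evs, 16, timeout_ms);
    for (int i = 0; i < n; ++i) {
      if (evs[i].data.fd == notify_rd_) {
        char buf[64];
        [[maybe_unused]] ssize_t r = read(notify_rd_, buf, sizeof(buf));  // drain wakeups
      }
      // A real loop would call the handler registered for this fd here.
    }
    if (external_num_.load()) {
      std::deque<EventCallback *> cur;
      {
        std::lock_guard<std::mutex> l(lock_);
        cur.swap(external_);  // take the whole batch under the lock
        external_num_ = 0;
      }
      while (!cur.empty()) {  // run callbacks without holding the lock
        cur.front()->do_request(0);
        cur.pop_front();
      }
    }
  }

 private:
  int epfd_ = -1, notify_rd_ = -1, notify_wr_ = -1;
  std::mutex lock_;
  std::deque<EventCallback *> external_;
  std::atomic<size_t> external_num_{0};
};

// Toy connection: the real process() runs the state machine; this one counts calls.
struct Conn {
  std::atomic<int> processed{0};
  void process() { processed++; }
};

// Like C_handle_read: the callback only forwards to Conn::process().
class C_handle_read : public EventCallback {
  Conn *conn_;
 public:
  explicit C_handle_read(Conn *c) : conn_(c) {}
  void do_request(uint64_t) override { conn_->process(); }
};

// Hypothetical worker loop, standing in for the thread body from add_thread().
void run_worker(MiniCenter &center, std::atomic<bool> &stop) {
  while (!stop.load()) center.process_events(100);
}
```

Swapping the queue out in one step keeps the critical section short. It also lets a callback dispatch new external events without deadlocking on the queue lock.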

Note that AsyncConnection::_connect() set the connection's state to STATE_CONNECTING. AsyncConnection::process() works as a finite-state machine, and in the STATE_CONNECTING case it connects to the server. This roundabout path hands connection establishment to the worker thread that owns the connection, so the caller's thread does not do the socket work itself.

src/msg/async/AsyncConnection.cc

void AsyncConnection::process() {
  ...
  switch(state) {
    ...
    case STATE_CONNECTING: {
      ...
      ssize_t r = worker->connect(target_addr, opts, &cs);
      if (r < 0) {
        protocol->fault();
        return;
      }
      center->create_file_event(cs.fd(), EVENT_READABLE, read_handler);
      state = STATE_CONNECTING_RE;
      // No break: fall through and check the connection state right away.
    }
    case STATE_CONNECTING_RE: {
      // While the nonblocking connect is still in progress, is_connected() returns 0.
      // If the network stack needs a writable event to detect completion, register one.
      ssize_t r = cs.is_connected();
      if (r < 0) {
        protocol->fault();  // e.g. the connection was refused
        return;
      }
      if (r == 0) {
        if (async_msgr->get_stack()->nonblock_connect_need_writable_event()) {
          center->create_file_event(cs.fd(), EVENT_WRITABLE, read_handler);
        }
        return;
      }

      // Reaching here means the socket is connected.
      // Writable interest is registered only when needed, so remove it now.
      center->delete_file_event(cs.fd(), EVENT_WRITABLE);
      state = STATE_CONNECTION_ESTABLISHED;
      break;
    }

  }
}
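The split between STATE_CONNECTING and STATE_CONNECTING_RE follows the standard nonblocking connect() sequence. The code starts the connect, gets EINPROGRESS, and checks later whether the handshake finished. Below is a minimal sketch of that state machine using plain POSIX calls. ToyConn and its states are hypothetical simplifications, and STATE_FAILED stands in for protocol->fault(). Its is_connected() uses the common poll() plus SO_ERROR check, which is not necessarily how Ceph's POSIX stack implements ConnectedSocket::is_connected().

```cpp
#include <arpa/inet.h>
#include <cassert>
#include <cerrno>
#include <netinet/in.h>
#include <poll.h>
#include <sys/socket.h>
#include <unistd.h>

// Simplified states named after those in AsyncConnection::process();
// STATE_FAILED is a hypothetical stand-in for calling protocol->fault().
enum State { STATE_CONNECTING, STATE_CONNECTING_RE, STATE_CONNECTION_ESTABLISHED, STATE_FAILED };

// Toy client connection, driven by repeated process() calls as an event loop would.
struct ToyConn {
  State state = STATE_CONNECTING;
  int fd = -1;
  sockaddr_in target{};

  ~ToyConn() { if (fd >= 0) close(fd); }

  // 1 = connected, 0 = still in progress, negative errno = failed.
  int is_connected() {
    pollfd p{fd, POLLOUT, 0};
    if (poll(&p, 1, 0) == 0) return 0;  // not writable yet: handshake pending
    int err = 0;
    socklen_t len = sizeof(err);
    if (getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &len) < 0) return -errno;
    return err ? -err : 1;
  }

  void process() {
    switch (state) {
      case STATE_CONNECTING: {
        fd = socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC, 0);
        int r = connect(fd, (sockaddr *)&target, sizeof(target));
        if (r < 0 && errno != EINPROGRESS) {  // immediate failure
          state = STATE_FAILED;
          return;
        }
        state = STATE_CONNECTING_RE;
      }
        [[fallthrough]];  // check progress right away, as process() does
      case STATE_CONNECTING_RE: {
        int r = is_connected();
        if (r < 0) { state = STATE_FAILED; return; }
        if (r == 0) return;  // wait to be called again on the next event
        state = STATE_CONNECTION_ESTABLISHED;
        break;
      }
      default:
        break;  // established or failed: nothing to do in this sketch
    }
  }
};
```

In an event-driven program, the event loop calls process() again from the readable or writable callback. The sketch does not poll in a loop itself.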

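Interestingly, Ceph does not keep the socket fd registered for writable events all the time. It registers writable interest only while it has data waiting to be sent. AsyncConnection::_try_send() adds or removes the EVENT_WRITABLE registration of the socket fd every time it sends data. The likely reason is that a connected socket with free buffer space is almost always writable. A permanent writable registration would therefore wake the event loop for no reason.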

src/msg/async/AsyncConnection.cc

ssize_t AsyncConnection::_try_send(bool more) {
  ...
  ssize_t r = cs.send(outgoing_bl, more);
  if (r < 0)
    return r;  // send error

  // Data is still queued: ask to be woken up when the socket becomes writable.
  if (!open_write && is_queued()) {
    center->create_file_event(cs.fd(), EVENT_WRITABLE, write_handler);
    open_write = true;
  }

  // Everything has been sent: stop watching for writability.
  if (open_write && !is_queued()) {
    center->delete_file_event(cs.fd(), EVENT_WRITABLE);
    open_write = false;
    if (writeCallback) {
      center->dispatch_event_external(write_callback_handler);
    }
  }

  return outgoing_bl.length();
}
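The open_write bookkeeping above can be reproduced with raw epoll. This sketch simplifies under several assumptions:

- ToyWriter is a hypothetical class.
- A std::string stands in for the outgoing bufferlist.
- EPOLL_CTL_MOD toggles EPOLLOUT where Ceph calls create_file_event() and delete_file_event().
- The write-callback dispatch is left out.

```cpp
#include <cassert>
#include <cerrno>
#include <cstdint>
#include <string>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <unistd.h>

// Toy writer that, like _try_send(), keeps EPOLLOUT registered only while
// unsent bytes remain, and drops it again once everything has been sent.
class ToyWriter {
 public:
  ToyWriter(int epfd, int fd) : epfd_(epfd), fd_(fd) {
    epoll_event ev{};
    ev.events = EPOLLIN;  // reads are always of interest
    ev.data.fd = fd_;
    epoll_ctl(epfd_, EPOLL_CTL_ADD, fd_, &ev);
  }

  void queue(const std::string &data) { outgoing_ += data; }
  bool open_write() const { return open_write_; }

  // Sends as much as the socket accepts without blocking.
  // Returns the number of bytes still queued, or -errno on a hard error.
  ssize_t try_send() {
    while (!outgoing_.empty()) {
      ssize_t r = send(fd_, outgoing_.data(), outgoing_.size(), MSG_DONTWAIT | MSG_NOSIGNAL);
      if (r < 0) {
        if (errno == EINTR) continue;
        if (errno == EAGAIN || errno == EWOULDBLOCK) break;  // socket buffer is full
        return -errno;
      }
      outgoing_.erase(0, static_cast<size_t>(r));
    }
    if (!open_write_ && !outgoing_.empty()) {
      set_events(EPOLLIN | EPOLLOUT);  // get woken up once the socket drains
      open_write_ = true;
    }
    if (open_write_ && outgoing_.empty()) {
      set_events(EPOLLIN);  // nothing left to send: stop writable wakeups
      open_write_ = false;
    }
    return static_cast<ssize_t>(outgoing_.size());
  }

 private:
  void set_events(uint32_t events) {
    epoll_event ev{};
    ev.events = events;
    ev.data.fd = fd_;
    epoll_ctl(epfd_, EPOLL_CTL_MOD, fd_, &ev);
  }

  int epfd_, fd_;
  std::string outgoing_;
  bool open_write_ = false;
};
```

A short message leaves open_write false because everything fits in the socket buffer. A payload larger than the buffer turns EPOLLOUT on until the peer has read enough for the rest to be sent.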

Now AsyncConnection has the state STATE_CONNECTION_ESTABLISHED, and the client can communicate with the peer process.
