Network-packets buffering in kernel qdiscs module

671 views Asked by At

I want to buffer output packets originating from a container's network interface. This netlink library file named sch_plug.c https://code.woboq.org/linux/linux/net/sched/sch_plug.c.html looks like it can solve the problem but i'm finding it hard to use it? How should i call these functions to actually make it work? How to get hold of the parameters like struct netlink_ext_ack *extack, struct sk_buff *skb, etc passed to those functons defined in the source code?

1

There are 1 answers

17
f9c69e9781fa194211448473495534 On BEST ANSWER

Command line

The qdiscs can be controlled with the commands nl-qdisc-add, nl-qdisc-delete, nl-qdisc-list (part of libnl). The --help flag can be used to show some usage example (link):

  • Create the plug qdisc with a buffer of size 32KB for the network interface eth0:

    # nl-qdisc-add --dev=eth0 --parent=root plug --limit=32768
    
  • By default, the plug qdisc will be in buffered mode (meaning it holds back all outgoing traffic). You can switch between buffered and release mode with the following commands:

    • Switch to release mode:

      # nl-qdisc-add --dev=eth0 --parent=root --update plug --release-indefinite
      
    • Switch back to buffered mode:

      # nl-qdisc-add --dev=eth0 --parent=root --update plug --buffer
      
  • You can inspect the active qdiscs with:

      # nl-qdisc-list --kind=plug --details --stats
    

    This will also tell you the id of each qdisc.

  • Based on the id, you can remove a qdisc again:

     # nl-qdisc-delete --id <id>
    

From code

The code of the tools used above can be inspected to write a custom implementation (link):

#include <linux/netlink.h>

#include <netlink/netlink.h>
#include <netlink/route/qdisc.h>
#include <netlink/route/qdisc/plug.h>
#include <netlink/socket.h>

#include <atomic>
#include <csignal>
#include <iostream>
#include <stdexcept>

/**
 * Netlink route socket.
 */
struct Socket {
  Socket() : handle{nl_socket_alloc()} {

    if (handle == nullptr) {
      throw std::runtime_error{"Failed to allocate socket!"};
    }

    if (int err = nl_connect(handle, NETLINK_ROUTE); err < 0) {
      throw std::runtime_error{"Unable to connect netlink socket: " +
                               std::string{nl_geterror(err)}};
    }
  }

  Socket(const Socket &) = delete;
  Socket &operator=(const Socket &) = delete;
  Socket(Socket &&) = delete;
  Socket &operator=(Socket &&) = delete;

  ~Socket() { nl_socket_free(handle); }

  struct nl_sock *handle;
};

/**
 * Read all links from netlink socket.
 */
struct LinkCache {
  explicit LinkCache(Socket *socket) : handle{nullptr} {
    if (int err = rtnl_link_alloc_cache(socket->handle, AF_UNSPEC, &handle);
        err < 0) {
      throw std::runtime_error{"Unable to allocate link cache: " +
                               std::string{nl_geterror(err)}};
    }
  }

  LinkCache(const LinkCache &) = delete;
  LinkCache &operator=(const LinkCache &) = delete;
  LinkCache(LinkCache &&) = delete;
  LinkCache &operator=(LinkCache &&) = delete;

  ~LinkCache() { nl_cache_free(handle); }

  struct nl_cache *handle;
};

/**
 * Link (such as "eth0" or "wlan0").
 */
struct Link {
  Link(LinkCache *link_cache, const std::string &iface)
      : handle{rtnl_link_get_by_name(link_cache->handle, iface.c_str())} {

    if (handle == nullptr) {
      throw std::runtime_error{"Link does not exist:" + iface};
    }
  }

  Link(const Link &) = delete;
  Link &operator=(const Link &) = delete;
  Link(Link &&) = delete;
  Link &operator=(Link &&) = delete;

  ~Link() { rtnl_link_put(handle); }

  struct rtnl_link *handle;
};

/**
 * Queuing discipline.
 */
struct QDisc {
  QDisc(const std::string &iface, const std::string &kind)
      : handle{rtnl_qdisc_alloc()} {
    if (handle == nullptr) {
      throw std::runtime_error{"Failed to allocate qdisc!"};
    }

    struct rtnl_tc *tc = TC_CAST(handle);

    // Set link
    LinkCache link_cache{&socket};
    Link link{&link_cache, iface};
    rtnl_tc_set_link(tc, link.handle);

    // Set parent qdisc
    uint32_t parent = 0;

    if (int err = rtnl_tc_str2handle("root", &parent); err < 0) {
      throw std::runtime_error{"Unable to parse handle: " +
                               std::string{nl_geterror(err)}};
    }

    rtnl_tc_set_parent(tc, parent);

    // Set kind (e.g. "plug")
    if (int err = rtnl_tc_set_kind(tc, kind.c_str()); err < 0) {
      throw std::runtime_error{"Unable to set kind: " +
                               std::string{nl_geterror(err)}};
    }
  }

  QDisc(const QDisc &) = delete;
  QDisc &operator=(const QDisc &) = delete;
  QDisc(QDisc &&) = delete;
  QDisc &operator=(QDisc &&) = delete;

  ~QDisc() {
    if (int err = rtnl_qdisc_delete(socket.handle, handle); err < 0) {
      std::cerr << "Unable to delete qdisc: " << nl_geterror(err) << std::endl;
    }

    rtnl_qdisc_put(handle);
  }

  void send_msg() {
    int flags = NLM_F_CREATE;

    if (int err = rtnl_qdisc_add(socket.handle, handle, flags); err < 0) {
      throw std::runtime_error{"Unable to add qdisc: " +
                               std::string{nl_geterror(err)}};
    }
  }

  Socket socket;
  struct rtnl_qdisc *handle;
};

/**
 * Queuing discipline for plugging traffic.
 */
class Plug {
public:
  Plug(const std::string &iface, uint32_t limit, bool enabled)
      : qdisc_{iface, "plug"}, enabled_{enabled} {

    rtnl_qdisc_plug_set_limit(qdisc_.handle, limit);
    qdisc_.send_msg();

    set_enabled(enabled_);
  }

  void set_enabled(bool enabled) {
    if (enabled) {
      rtnl_qdisc_plug_buffer(qdisc_.handle);
    } else {
      rtnl_qdisc_plug_release_indefinite(qdisc_.handle);
    }

    qdisc_.send_msg();
    enabled_ = enabled;
  }

  bool is_enabled() const { return enabled_; }

private:
  QDisc qdisc_;

  bool enabled_;
};

std::atomic<bool> quit{false};

void exit_handler(int /*signal*/) { quit = true; }

int main() {
  std::string iface{"eth0"};
  constexpr uint32_t buffer_size = 32768;
  bool enabled = true;

  Plug plug{iface, buffer_size, enabled};

  /**
   * Set custom exit handler to ensure destructor runs to delete qdisc.
   */
  struct sigaction sa {};
  sa.sa_handler = exit_handler;
  sigfillset(&sa.sa_mask);
  sigaction(SIGINT, &sa, nullptr);

  while (!quit) {
    std::cout << "Plug set to " << plug.is_enabled() << std::endl;
    std::cout << "Press <Enter> to continue.";
    std::cin.get();

    plug.set_enabled(!plug.is_enabled());
  }

  return EXIT_SUCCESS;
}

Set the network interface you want to use in the main function (e.g. eth0 or wlan0). The program can then be used with:

# g++ -std=c++17 -Wall -Wextra -pedantic netbuf.cpp $( pkg-config --cflags --libs libnl-3.0 libnl-route-3.0 )
# ./a.out 
Plug set to 1
Press <Enter> to continue.
Plug set to 0
Press <Enter> to continue.
Plug set to 1
Press <Enter> to continue.

(Exit with Ctrl+c.)