Processing packets in parallel through port reuse

The implementation of the Influx reader so far has been pretty straightforward: a single process that reads data from a single port and inserts it into the database. Databases are, however, designed to handle multiple inserts at the same time, so what is preventing us from using multiple processes to ingest data? Actually, nothing, but to make this a little more manageable and practical, it is good to add a few more tweaks to the current implementation. As always, the source code is available in the repository timescale/pg_influx on GitHub.

The current implementation allows you to spawn multiple workers using worker_launch, which takes a schema name and a service name or port number. An obvious question is then: what happens if you call the function multiple times with the same port number? The answer is that the second process won't start, because the first process has already bound the port, and by default no other process is allowed to bind to the same port.
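
To see why, here is a minimal standalone sketch (not part of pg_influx, and the port number is just an example) showing that a second bind(2) to the same address and port fails with EADDRINUSE:

#include <stdio.h>
#include <string.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <sys/socket.h>

int main(void) {
  struct sockaddr_in addr;
  memset(&addr, 0, sizeof(addr));
  addr.sin_family = AF_INET;
  addr.sin_addr.s_addr = htonl(INADDR_ANY);
  addr.sin_port = htons(8089); /* hypothetical port, just for the demo */

  int first = socket(AF_INET, SOCK_DGRAM, 0);
  int second = socket(AF_INET, SOCK_DGRAM, 0);

  /* The first bind succeeds and claims the port. */
  if (bind(first, (struct sockaddr *)&addr, sizeof(addr)) < 0)
    perror("first bind");

  /* The second bind fails with EADDRINUSE: the port is already taken. */
  if (bind(second, (struct sockaddr *)&addr, sizeof(addr)) < 0)
    perror("second bind");

  return 0;
}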

The conventional method for handling this situation is to have a single listener process that either accepts connections (for TCP) or reads packets from the port (for UDP) and then distributes the connections or packets to other processes that do the actual processing, roughly as in the sketch below. Unfortunately, this approach does not scale well, since at high load the listening process becomes a bottleneck.
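
For illustration only, here is a simplified sketch of that traditional design (this is not the pg_influx code): a single listener receives every datagram and hands it round-robin to pre-forked workers over datagram socketpairs.

#include <stdlib.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <sys/socket.h>

#define NWORKERS 4
#define PORT 8089 /* hypothetical port, for illustration only */

int main(void) {
  int workers[NWORKERS];

  /* Fork the workers up front. A datagram socketpair (unlike a pipe)
     preserves the message boundaries between listener and worker. */
  for (int i = 0; i < NWORKERS; i++) {
    int fds[2];
    if (socketpair(AF_UNIX, SOCK_DGRAM, 0, fds) < 0)
      exit(EXIT_FAILURE);
    if (fork() == 0) { /* worker */
      char buf[65536];
      close(fds[1]);
      for (;;) {
        ssize_t len = read(fds[0], buf, sizeof(buf));
        if (len <= 0)
          exit(EXIT_SUCCESS);
        /* ... parse the packet and insert the rows here ... */
      }
    }
    close(fds[0]); /* the listener keeps the writing end */
    workers[i] = fds[1];
  }

  /* The single listener: every packet passes through this loop, which
     is why it becomes the bottleneck at high load. */
  int fd = socket(AF_INET, SOCK_DGRAM, 0);
  struct sockaddr_in addr = {.sin_family = AF_INET,
                             .sin_addr.s_addr = htonl(INADDR_ANY),
                             .sin_port = htons(PORT)};
  bind(fd, (struct sockaddr *)&addr, sizeof(addr));

  char buf[65536];
  for (int next = 0;; next = (next + 1) % NWORKERS) {
    ssize_t len = recv(fd, buf, sizeof(buf), 0);
    if (len > 0)
      write(workers[next], buf, len);
  }
}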

Now, looking in the manuals, we see that all receive operations are atomic, so it should not be a problem to have multiple processes listening on the same port. Right?

All receive operations return only one packet. When the packet is smaller than the passed buffer, only that much data is returned; when it is bigger, the packet is truncated and the MSG_TRUNC flag is set.

manual page for udp(7)
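
That is, each call picks up exactly one datagram and concurrent readers never see interleaved data, but a too-small buffer silently drops the tail of the packet. As a small sketch (not from the pg_influx code), a reader can detect this truncation with recvmsg(2):

#include <stdio.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/uio.h>

/* Receive one datagram from fd and warn if it was truncated.
   Returns the number of bytes stored in buf, or -1 on error. */
static ssize_t RecvOneDatagram(int fd, char *buf, size_t buflen) {
  struct iovec iov = {.iov_base = buf, .iov_len = buflen};
  struct msghdr msg = {.msg_iov = &iov, .msg_iovlen = 1};
  ssize_t len = recvmsg(fd, &msg, 0);
  if (len >= 0 && (msg.msg_flags & MSG_TRUNC))
    fprintf(stderr, "datagram truncated; only %zd bytes received\n", len);
  return len;
}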

To allow this, the notion of port reuse was introduced. It is mostly used with TCP connections but works just as well with UDP, and you can enable it for a socket by setting the SO_REUSEPORT option. (There is another option with the name SO_REUSEADDR that serves a similar purpose, but the history behind these two options is complicated. Mecki has an excellent answer on Stack Overflow with more background on this.)

To make this work, we simply replace the bind(2) function in our SocketMethod for the UDP socket with one that sets the SO_REUSEPORT option using setsockopt(2) and then binds the port. Note that it is necessary to set the socket option before we actually bind the socket. (The full patch is available as a pull request on GitHub, if you want to have a look.)

/* Set SO_REUSEPORT before binding, so that several workers can bind
   the same address and port and share the incoming packets. */
static int SetupUdpRecvSocket(int fd, const struct sockaddr* addr,
                              socklen_t addrlen) {
  int optval = 1;
  if (setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &optval, sizeof(optval)) < 0) {
    ereport(LOG, (errmsg("%s(%s) failed: %m", "setsockopt", "SO_REUSEPORT")));
    return STATUS_ERROR;
  }

  return bind(fd, addr, addrlen);
}

struct SocketMethod UdpRecvSocket = {
    .setup = SetupUdpRecvSocket,
    .config = ConfigUdpRecvSocket,
    .name = "bind",
    .socktype = SOCK_DGRAM,
    .flags = AI_PASSIVE,
};

Looking at the socket(7) manual, it says:

For UDP sockets, the use of this option can provide better distribution of incoming datagrams to multiple processes (or threads) as compared to the traditional technique of having multiple processes compete to receive datagrams on the same socket.

So this should give us a good distribution of packets when we spawn multiple workers. However, a quick test using influx-faker and bpftrace(8) suggests that this is not the case.

sudo bpftrace -e 'tracepoint:syscalls:sys_enter_recvfrom { @[pid] = count(); }'
Attaching 1 probe...
^C

@[352799]: 1
   .
   .
   .
@[256210]: 143
@[344351]: 433
@[352802]: 894577

It looks like all the traffic goes to the same process and that the other processes get almost no packets at all. Does this mean that using SO_REUSEPORT is pointless? Actually, no, but the answer requires looking through the implementation and history of SO_REUSEPORT. Nothing requires the kernel to distribute packets evenly over all the processes bound to the port, only that it is allowed to, and for efficiency reasons the process for each packet is selected based on a hash of the 5-tuple for the connection, consisting of the source address, source port, destination address, destination port, and transport protocol (see the function lookup_reuseport in net/ipv4/udp.c of the Linux kernel).

Since influx-faker opens a single socket when it starts, all its packets carry the same source address and source port, so they all hash to the same process. The workaround to distribute packets evenly is to create a new socket for each packet: each new socket is assigned a new ephemeral source port, so the hash is different for every packet and the packets spread over all the workers.
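
On the sender side, the workaround looks something like this sketch (influx-faker is a separate tool; this is just to illustrate the idea, assuming the destination address is already set up):

#include <string.h>
#include <unistd.h>
#include <netinet/in.h>
#include <sys/socket.h>

/* Send one line of data to the server from a brand-new socket. Each
   socket is assigned a fresh ephemeral source port, so the 5-tuple
   hash, and hence the receiving worker, varies from packet to packet. */
static void SendFromFreshSocket(const struct sockaddr_in *dest,
                                const char *line) {
  int fd = socket(AF_INET, SOCK_DGRAM, 0);
  if (fd < 0)
    return;
  sendto(fd, line, strlen(line), 0, (const struct sockaddr *)dest,
         sizeof(*dest));
  close(fd); /* releases the port; the next call gets a new one */
}

Rerunning the test with one socket per packet, the traffic is spread evenly over the workers: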

mats@fury:~/proj/pg_influx$ sudo bpftrace -e 'tracepoint:syscalls:sys_enter_recvfrom { @[pid] = count(); }'
Attaching 1 probe...
^C
    .
    .
    .
@[427666]: 253643
@[427663]: 253939
@[427669]: 255294
@[427660]: 258104

Now, this looks pretty good, but starting worker processes manually after the server has started seems more complicated than necessary. Can't we just start them automatically when we start the server? It turns out we can, which is the topic of the following posts.

Mats

dbmsdrops.kindahl.net

Long-time developer with a keen interest in databases, distributed systems, and programming languages. Currently working as Database Architect at Timescale.
