It’s all in the Background

In contrast to MySQL—which is a multi-thread database system—PostgreSQL is a multi-process database system. MySQL is a typical multi-threaded system: there is a single process running and threads are spawned to handle connecting clients or handle other sub-tasks. In multi-process systems you instead have a process tree with several processes that interact to share the work, but only a single thread for each process. Multi-process and multi-thread systems both have advantages and disadvantages, but here are a few that are relevant to a database system.

Managing connections. Each time a client connects to the database, a new process or thread is created that will handle the connection and process requests. Since threads are significantly easier to create than processes, performance will be better for a multi-thread system if your connections are short-lived. In contrast, creating a new process is expensive and leads to worse performance when connections are short-lived. Short-lived connections are typical for web applications where the web servers create short-lived connections for fetching data to render a page.

Access to shared data. A multi-thread system allows you to easily share data between threads, but you then have to use mutexes and condition variables to ensure that two threads do not access the same data at the same time, which in general makes the code more complicated and can lead to subtle bugs. In contrast, code for a multi-process system is usually straightforward to write and there are fewer caveats about how you handle memory within a process. The price is that sharing data between processes is more complicated and requires extra effort to implement.

Libraries. A library has to be written with multi-threading in mind to be usable in a multi-thread system. You have to consider such things as controlling access to all variables with external or internal linkage and making sure that functions are re-entrant. In contrast, multi-process systems can work with almost any library, even with libraries that are not designed with concurrent execution in mind.

Managing catastrophic failures. All software contains bugs, and database systems are no exception. If there is a bug and one thread crashes, it brings the entire process down. Multi-process systems, on the other hand, can avoid bringing the rest of the system down or do it in a controlled manner.
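To make the crash-isolation argument concrete, here is a minimal sketch in plain POSIX C rather than PostgreSQL code. The function name RunCrashingChild is made up for this illustration: it forks a child that deliberately calls abort(3) to simulate a bug, while the parent survives and can inspect how the child died.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <signal.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

/*
 * Fork a child that crashes and report how it ended (hypothetical helper).
 * Returns the number of the signal that killed the child, 0 if the child
 * exited normally, and -1 if fork or waitpid failed.
 */
int RunCrashingChild(void) {
  int status;
  pid_t pid = fork();

  if (pid < 0)
    return -1;
  if (pid == 0)
    abort(); /* simulated bug: the child dies from SIGABRT */

  /* Only the parent gets here, and it is unaffected by the crash */
  assert(pid > 0);
  if (waitpid(pid, &status, 0) != pid)
    return -1;
  return WIFSIGNALED(status) ? WTERMSIG(status) : 0;
}
```

A parent process in this position can decide what to do next, for example restart the child or shut everything down in an orderly way. In a multi-thread design, the same abort() would have taken down every thread in the process.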

These are only a few of the considerations for multi-threaded and multi-process systems and by no means cover all aspects. Since PostgreSQL is a multi-process system, the focus will be on PostgreSQL and multi-process systems. One of the disadvantages of PostgreSQL is that it contains design decisions that make it hard to use multi-threading in an extension. I might cover this in a future post since it is quite interesting to understand what a good strategy for using multi-threading in PostgreSQL would look like.

This post assumes that you’re familiar with C programming and also familiar with programming for Linux. In particular, you need to know about processes, signals, and sockets in Linux: what they are, how they work, and what they are used for. A good coverage of signals and processes is Linux Processes and Signals and The Linux Signals Handling Model. If you’re interested in material on socket programming you can take a look at Socket programming in C on Linux.

High-Level Architecture of PostgreSQL

The high-level architecture of a running PostgreSQL server is very simple: it consists of a shared memory region and a set of processes that access shared memory. The PostgreSQL processes fall into one of four categories:

  • The postmaster is the central hub of the database and handles requests to, for example, start up and shut down. It is also responsible for spawning new processes, either to handle connecting clients or to do other work, and for reaping terminated processes. It is the parent process of all the other processes.
  • The client process (or just client) is the process that connects to the database to perform some work. You can create an interactive client using psql and send explicit queries to the database, but you can also create your own clients using the libpq library. Note that a client does not have access to the shared memory of the system and does not necessarily have to be on the same physical machine as the other database processes. It communicates with the PostgreSQL system over the network, which means that security is a critical part of the client-server interaction.
  • The backend process (or just backend) is spawned when a client connects to the PostgreSQL server. The backend will handle all requests arriving at the database from that client, process them, and send back a response to the client. Note that the backend process is on the same machine as the database server and hence has access to the shared memory mentioned above.
  • The background worker processes (or just background workers) are independent processes that handle different tasks for the database. Note that in contrast to the backend processes, the background workers have no connected terminal and act independently. You can write your own background workers, which is what you are going to do later in this post, but there are a few background workers already built into PostgreSQL. They are not covered in detail here, but it is good to understand that they exist and roughly what they do.
    • The checkpointer and background writer (or just writer) are responsible for writing dirty shared buffers to disk.
    • The autovacuum launcher is responsible for spawning autovacuum workers that will vacuum tables and remove data marked as deleted from the tables.
    • The WAL writer writes the write-ahead log (WAL) to disk regularly. The WAL is necessary to ensure that the database can recover on a crash.
    • The stats collector processes statistics messages and collects statistics on the tables in the database. This is something that the query planner can use to optimize queries, but the statistics are also available through a set of views. You can find more information on the statistics collector in The Statistics Collector in the PostgreSQL manual.

This post focuses on building a background worker that receives metrics over a socket and writes them to the database. As the protocol for the metrics, the InfluxDB Line Protocol will be used, which is a very simple protocol and also supports sending metrics over UDP. The protocol consists of lines like the one below, each consisting of a metric (or measurement) name, a list of zero or more tags, a list of one or more fields, and a timestamp.

cpu,cpu=cpu0,host=fury usage_idle=95.9,usage_system=2.0,usage_user=2.0 1574753954000000000
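To make the layout of such a line concrete, the following is a minimal sketch in plain C that splits a line into its four parts. It is not the parser used by the extension: the LineParts structure and the SplitLine function are names invented for this illustration, and the sketch assumes that the timestamp is present and that no escaping or quoting is used in the line.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical container for the four parts of one line. */
typedef struct LineParts {
  char measurement[64];
  char tags[256];   /* raw "key=value,..." text, possibly empty */
  char fields[256]; /* raw "key=value,..." text */
  long long timestamp;
} LineParts;

/* Copy len bytes into dst as a C string; fail if it does not fit. */
static bool CopySpan(char *dst, size_t dstlen, const char *src, size_t len) {
  if (len >= dstlen)
    return false;
  memcpy(dst, src, len);
  dst[len] = '\0';
  return true;
}

/* Split "name[,tags] fields timestamp"; returns false on malformed input. */
bool SplitLine(const char *line, LineParts *out) {
  const char *pos, *tags, *fields;
  size_t name_len, tags_len = 0, fields_len;
  char *end;

  assert(line != NULL && out != NULL);

  /* The measurement name ends at the first comma or space */
  name_len = strcspn(line, ", ");
  if (name_len == 0 ||
      !CopySpan(out->measurement, sizeof(out->measurement), line, name_len))
    return false;
  pos = line + name_len;

  /* Tags are optional and follow a comma directly after the name */
  tags = pos;
  if (*pos == ',') {
    tags = pos + 1;
    tags_len = strcspn(tags, " ");
    pos = tags + tags_len;
  }
  if (!CopySpan(out->tags, sizeof(out->tags), tags, tags_len))
    return false;

  /* At least one field is required after a single space */
  if (*pos != ' ')
    return false;
  fields = pos + 1;
  fields_len = strcspn(fields, " ");
  if (fields_len == 0 ||
      !CopySpan(out->fields, sizeof(out->fields), fields, fields_len))
    return false;
  pos = fields + fields_len;

  /* This sketch requires the trailing timestamp */
  if (*pos != ' ')
    return false;
  out->timestamp = strtoll(pos + 1, &end, 10);
  return end != pos + 1;
}
```

For the example line above, this would give the measurement cpu, the tags cpu=cpu0,host=fury, the three usage fields as one string, and the timestamp as a 64-bit integer.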

For the implementation, the metric name will be used as the table name and the tags and fields will be stored in the table either in JSON format or as separate columns. To avoid conflicts with any other tables, a dedicated schema—also called namespace in PostgreSQL—is provided where tables are defined. Parsing the lines and inserting the data into the tables is the topic of the following posts. For this post, it is sufficient to read the packets and log them just to see how to set up a background worker. As in the previous post, you will find the code in the pg_influx repository.

Spawning a Background Worker

Background workers can be spawned dynamically or statically, but regardless of how they are spawned, it is the postmaster that does it. This means that every background worker has the postmaster as parent and that spawning a new background worker is done by asking the postmaster to do it. A good starting point is a function that can spawn background workers dynamically. In this case, it is written as a normal PostgreSQL function so that it can be called from a client. This simplifies testing significantly since you can spawn a worker in a test script and then check that the data sent to it is processed.

CREATE FUNCTION worker_launch(ns regnamespace, service text) RETURNS integer
LANGUAGE C AS 'MODULE_PATHNAME';

This function accepts a namespace (that is, a schema name or OID) and a service name. A service name here can be either a port number in text format, or a service name that will be looked up in /etc/services. It returns the PID of the background worker as an integer so that you can terminate the worker later using pg_terminate_backend.

Datum worker_launch(PG_FUNCTION_ARGS) {
  Oid nspid = PG_GETARG_OID(0);
  char *service = text_to_cstring(PG_GETARG_TEXT_P(1));
  BackgroundWorker worker;
  BackgroundWorkerHandle *handle;
  BgwHandleStatus status;
  pid_t pid;
  WorkerArgs args;

  /* Check that we have a valid namespace id */
  if (get_namespace_name(nspid) == NULL)
    ereport(ERROR, (errcode(ERRCODE_UNDEFINED_SCHEMA),
                    errmsg("schema with OID %u does not exist", nspid)));

    .
    .
    .
  PG_RETURN_INT32(pid);
}
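The service argument described above can be either a port number in text form or a name from /etc/services. As a rough illustration of how one string can cover both cases, here is a sketch in plain POSIX C that uses getaddrinfo(3) to turn a service string into a UDP port number. The function name ResolveUdpPort is made up for this example, and the sketch is restricted to IPv4 to keep it short.

```c
#define _POSIX_C_SOURCE 200809L
#include <arpa/inet.h>
#include <assert.h>
#include <netdb.h>
#include <netinet/in.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/types.h>

/*
 * Resolve a service string ("4713" or a name from the services database)
 * to a UDP port number. Returns -1 if the service cannot be resolved.
 */
int ResolveUdpPort(const char *service) {
  struct addrinfo hints, *addrs = NULL;
  int port = -1;

  assert(service != NULL);

  memset(&hints, 0, sizeof(hints));
  hints.ai_family = AF_INET;      /* keep the sketch to IPv4 */
  hints.ai_socktype = SOCK_DGRAM; /* UDP */
  hints.ai_flags = AI_PASSIVE;    /* address suitable for bind(2) */

  if (getaddrinfo(NULL, service, &hints, &addrs) != 0)
    return -1;

  if (addrs != NULL && addrs->ai_addr != NULL) {
    const struct sockaddr_in *sin = (const struct sockaddr_in *)addrs->ai_addr;
    port = ntohs(sin->sin_port);
  }
  freeaddrinfo(addrs);
  return port;
}
```

Since the lookup handles both numeric and symbolic services, the worker code never needs to distinguish between the two and can pass the string along unchanged.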

You might have noticed that the worker launch function uses snake case (worker_launch) while the other C functions use camel case. The convention in the PostgreSQL source code is camel case, so we stick to that for ordinary C functions. The convention for SQL-level functions, however, is snake case, and by default CREATE FUNCTION looks for a C function with the same name as the SQL function. Instead of overriding this when defining the function, we keep the default and give the C function the same name as the SQL function.

Passing Arguments

To pass down parameters to the background worker process you can use the two fields bgw_main_arg and bgw_extra. The bgw_main_arg field can pass down a Datum to the background worker which is sufficient for most uses, but if you need to pass down more arguments, you can use the field bgw_extra. In contrast to bgw_main_arg, the bgw_extra is a string buffer of size BGW_EXTRALEN and you can either pass down some text, or define a structure and copy it into the array. In this case, it is easiest to define a new structure WorkerArgs to contain the arguments and pass it to the worker process by copying the bytes into the bgw_extra.

typedef struct WorkerArgs {
  Oid namespace_id;
  char service[NI_MAXSERV];
} WorkerArgs;

static char c1[BGW_EXTRALEN - sizeof(WorkerArgs)] pg_attribute_unused();

Since the bgw_extra field has a fixed size, it is prudent to add a compile-time check that the structure is not too large, which is what the last line of the listing above does. Compilation fails if the structure is larger than BGW_EXTRALEN, since the array size is then no longer valid. The database id of the current database is passed in as the bgw_main_arg argument, which is something that will be used in the following posts when inserting data into the database.

It is quite common to add compile-time checks, if possible, rather than run-time checks. There are mainly two advantages to this: the compiler does not need to generate code for this check at runtime, which would be a waste of CPU cycles, and you would catch this error when building the system rather than after you have deployed the software.
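As an alternative way to express the same compile-time check, here is a small stand-alone sketch that uses the C11 _Static_assert declaration, which produces a readable error message when the check fails. It is not the code from the extension: EXTRA_LEN and SERVICE_LEN are hypothetical stand-ins for BGW_EXTRALEN and NI_MAXSERV, an unsigned int stands in for Oid, and PackArgs and UnpackArgs are names invented for this illustration. The PostgreSQL headers also provide static assertion macros such as StaticAssertStmt, which you would probably prefer inside an extension.

```c
#include <assert.h>
#include <string.h>

#define EXTRA_LEN 128  /* hypothetical stand-in for BGW_EXTRALEN */
#define SERVICE_LEN 32 /* hypothetical stand-in for NI_MAXSERV */

typedef struct WorkerArgs {
  unsigned int namespace_id; /* stand-in for Oid */
  char service[SERVICE_LEN];
} WorkerArgs;

/* The build fails with this message if the structure grows too large */
_Static_assert(sizeof(WorkerArgs) <= EXTRA_LEN,
               "WorkerArgs does not fit in the extra buffer");

/* Copy the arguments into a fixed-size byte buffer */
void PackArgs(char extra[EXTRA_LEN], unsigned int namespace_id,
              const char *service) {
  WorkerArgs args;

  assert(service != NULL);

  memset(&args, 0, sizeof(args));
  args.namespace_id = namespace_id;
  /* Copy at most size - 1 bytes so the zeroed buffer stays terminated */
  strncpy(args.service, service, sizeof(args.service) - 1);

  memset(extra, 0, EXTRA_LEN);
  memcpy(extra, &args, sizeof(args));
}

/* Read the arguments back on the receiving side */
void UnpackArgs(const char extra[EXTRA_LEN], WorkerArgs *args) {
  memcpy(args, extra, sizeof(*args));
}
```

Copying the bytes with memcpy, rather than casting the buffer to a structure pointer, avoids any assumptions about the alignment of the byte buffer.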

BackgroundWorker worker;
WorkerArgs args;
    .
    .
    .
/* Set up arguments to worker */
memset(&args, 0, sizeof(args));
/* Copy at most size - 1 bytes so the zeroed buffer stays NUL-terminated */
strncpy(args.service, service, sizeof(args.service) - 1);
args.namespace_id = nspid;

/* Clear the structure itself, so pass its address */
memset(&worker, 0, sizeof(worker));
worker.bgw_start_time = BgWorkerStart_RecoveryFinished;
worker.bgw_restart_time = BGW_NEVER_RESTART;
sprintf(worker.bgw_library_name, "influx");
sprintf(worker.bgw_function_name, "WorkerMain");
snprintf(worker.bgw_name, BGW_MAXLEN, "Influx listener for schema %s",
         get_namespace_name(args.namespace_id));
snprintf(worker.bgw_type, BGW_MAXLEN, "Influx line protocol listener");
/* bgw_main_arg is a Datum, so convert the OID explicitly */
worker.bgw_main_arg = ObjectIdGetDatum(MyDatabaseId);
memcpy(worker.bgw_extra, &args, sizeof(args));

When creating a new background worker, there are a few fields that you need to fill in.

  • The bgw_start_time field decides when the postmaster should start the background worker. In this case the background worker should start once the database has finished recovery.
  • The bgw_restart_time field decides how many seconds should elapse before the background worker is restarted, if it is shut down. In this case, the background worker is not restarted.
  • The bgw_name and bgw_type are used to provide some useful information about the background worker. The bgw_type field is typically the same for all workers of the same kind, while the bgw_name provides some useful information about the specific process. In this case, the schema name is added to the type name.
  • The bgw_library_name is the name of the shared library where the main function is defined and bgw_function_name is the name of the function to call.

Registering the Background Worker

The next step is to spawn the background worker by registering it with the postmaster using RegisterDynamicBackgroundWorker. To be able to wait for the background worker to start, it is necessary to set the PID of the waiting process in the bgw_notify_pid field of the background worker structure. In this case that is the PID of the calling process, which is available in MyProcPid.

BackgroundWorkerHandle *handle;
BgwHandleStatus status;
    .
    .
    .
worker.bgw_notify_pid = MyProcPid;

if (!RegisterDynamicBackgroundWorker(&worker, &handle))
  PG_RETURN_NULL();

status = WaitForBackgroundWorkerStartup(handle, &pid);

A Quick Peek at Latches

Before delving into writing the background worker, it is useful to take a brief look at how PostgreSQL handles synchronization between processes.

For database systems, you distinguish between two types of methods for synchronization: locks and latches. Locks are used to ensure the logical consistency of the data in the database, while latches are used to ensure the integrity of data structures internally.

  • Use a latch to ensure that critical internal variables are not accessed by multiple threads or processes at the same time. This will make sure the critical region is accessed in a serial manner. Latches are usually short-lived.
  • Use a lock to protect a table while it is being changed. The table might still be visible to other users, but the lock prevents them from making conflicting changes. Locks are long-lasting and are typically held for a full transaction.

Unfortunately, what in database jargon is usually referred to as latches are called locks in many programming contexts (see for example C++ std::lock), so there is potential for some confusion. In this post, we will stick to using the term latch.

There are three operations that are of interest in this post: setting a latch, resetting a latch, and waiting on a latch.

void SetLatch(Latch *latch);
void ResetLatch(Latch *latch);
int WaitLatchOrSocket(Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 info);

Event                  Description
WL_LATCH_SET           Wait for the latch to be set
WL_POSTMASTER_DEATH    Wait for Postmaster to die
WL_SOCKET_READABLE     Wait for socket to have readable data
Table 1. WaitLatch Events

Setting a latch means waking up any process that waits for it and is done by calling SetLatch. This can either be done from another process, or from a signal handler in the same process. After a latch is set, it will remain set until it is explicitly reset using ResetLatch. Note that setting the latch just signals that something has happened, but not what has happened. To pass information about what has happened, you need to use either global variables or shared memory depending on whether the latch was set in a signal handler or by a different process.

To wait for a latch, there are several useful functions, but in the code below you will use WaitLatchOrSocket. As you might have seen above, the WaitLatchOrSocket function accepts a wakeEvents parameter and an info parameter. The info parameter is used to indicate what is being waited for. For an extension, you should use PG_WAIT_EXTENSION, but there are other wait events for the built-in processes. The wakeEvents parameter is a bitmap of events to wait for and allows you to wait not only for the latch to be set, but also for other events that you have to take care of. One event that is particularly important when writing a background worker is the death of the postmaster, since the background worker needs to terminate if the postmaster goes away. The three events that we care about here are the ones in Table 1. For example, to wait for a latch to be set or the postmaster to die using the simpler WaitLatch function, you would use

wait_result = WaitLatch(MyLatch, WL_LATCH_SET | WL_POSTMASTER_DEATH, 0, PG_WAIT_EXTENSION);

Every PostgreSQL process has one latch automatically set up: MyLatch. This latch is used in signal handlers to signal that the process needs to handle some event. (The latch is a local latch and cannot be used to signal between processes.) This allows the main thread of the process to wait for some event, but also to handle the reception of signals. For example, the signal handlers to handle SIGTERM and SIGHUP can look like this:

static volatile sig_atomic_t ReloadConfig = false;
static volatile sig_atomic_t ShutdownWorker = false;

static void WorkerSigterm(SIGNAL_ARGS) {
  int save_errno = errno;
  ShutdownWorker = true;
  SetLatch(MyLatch);
  errno = save_errno;
}

static void WorkerSighup(SIGNAL_ARGS) {
  int save_errno = errno;
  ReloadConfig = true;
  SetLatch(MyLatch);
  errno = save_errno;
}
Figure 1. Race condition between SetLatch and ResetLatch

Here the signal handlers set a global variable and then set the latch to signal the main process that some action needs to be taken. The waiting process can now check these global variables when the latch is set and handle it accordingly.

When using global variables this way it is important to call the ResetLatch and SetLatch in the right order:

  • In the signal handler, you need to set the condition before calling SetLatch.
  • In the process, you need to call ResetLatch before checking the condition and waiting.

If you reset the latch after checking the condition, you have a race condition as in Figure 1 and as a result WaitLatch will block even though the process should start to shut down. This does not only apply when you have global variables and signal handlers: it applies in any situation where you want to trigger a process to act on some condition.
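The ordering rules can be tried out without PostgreSQL. The sketch below builds a toy latch on top of a pipe in plain POSIX C: setting the latch writes a byte, resetting drains the pipe, and waiting polls the read end. This only illustrates the ordering and is not how PostgreSQL implements latches; all names (ToySetLatch, ToyResetLatch, ToyWaitLatch, ToyLatchInit, ToyWorkerLoop) are invented for the example.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <poll.h>
#include <signal.h>
#include <stdbool.h>
#include <string.h>
#include <unistd.h>

static int toy_latch[2] = {-1, -1}; /* [0] read end, [1] write end */
static volatile sig_atomic_t shutdown_requested = 0;

static bool SetNonBlockingFd(int fd) {
  int flags = fcntl(fd, F_GETFL);
  return flags != -1 && fcntl(fd, F_SETFL, flags | O_NONBLOCK) != -1;
}

/* write(2) is async-signal-safe, so a signal handler may call this */
static void ToySetLatch(void) {
  const char byte = 1;
  ssize_t rc = write(toy_latch[1], &byte, 1);
  (void)rc; /* a full pipe just means the latch is already set */
}

static void ToyResetLatch(void) {
  char sink[16];
  while (read(toy_latch[0], sink, sizeof(sink)) > 0)
    ; /* drain until the pipe is empty */
}

static void ToyWaitLatch(void) {
  struct pollfd pfd = {.fd = toy_latch[0], .events = POLLIN};
  while (poll(&pfd, 1, -1) < 0 && errno == EINTR)
    ; /* retry if a signal interrupted the wait */
}

static void HandleSigterm(int signo) {
  int save_errno = errno;
  (void)signo;
  shutdown_requested = 1; /* 1. record the condition ... */
  ToySetLatch();          /* 2. ... then wake up the waiter */
  errno = save_errno;
}

bool ToyLatchInit(void) {
  struct sigaction sa;

  if (pipe(toy_latch) != 0)
    return false;
  if (!SetNonBlockingFd(toy_latch[0]) || !SetNonBlockingFd(toy_latch[1]))
    return false;
  memset(&sa, 0, sizeof(sa));
  sa.sa_handler = HandleSigterm;
  sigemptyset(&sa.sa_mask);
  return sigaction(SIGTERM, &sa, NULL) == 0;
}

/* Returns the number of times the loop had to wait before shutting down */
int ToyWorkerLoop(void) {
  int wakeups = 0;

  assert(toy_latch[0] >= 0); /* ToyLatchInit must have been called */
  for (;;) {
    ToyResetLatch();        /* reset first ... */
    if (shutdown_requested) /* ... then check the condition ... */
      break;
    ToyWaitLatch();         /* ... and only then wait */
    ++wakeups;
  }
  return wakeups;
}
```

If the loop instead checked shutdown_requested first and reset the latch afterwards, a signal arriving between the two steps would have its wake-up erased by the reset, and the following wait would block until something else set the latch.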

Writing the Background Worker

After that brief digression, you can now start writing the background worker function.

Writing the background worker function is straightforward: it accepts a single argument of type Datum—which is set to whatever was stored in bgw_main_arg—and the entire worker structure set up by the spawning process is available in the global variable MyBgworkerEntry. This includes bgw_extra so you can extract the parameters from that field.

void WorkerMain(Datum arg) {
  const Oid database_id = DatumGetObjectId(arg);
  int sfd;
  char buffer[1500];
  WorkerArgs *args = (WorkerArgs *)&MyBgworkerEntry->bgw_extra;
    .
    .
    . 
  proc_exit(0);
}

Initializing the Background Worker

The background worker is started with the signals blocked and you need to set up signal handlers before unblocking the signals. In this case you register signal handlers for two signals: SIGTERM and SIGHUP. On SIGTERM, the background worker should shut down, and on SIGHUP the background worker should reload the configuration file. Each signal handler temporarily saves errno and restores it at the end of the handler to make sure that it does not interfere with any system calls that the main process is doing at the time of receiving the signal.

void WorkerMain(Datum arg) {
    .
    .
    .
  pqsignal(SIGTERM, WorkerSigterm);
  pqsignal(SIGHUP, WorkerSighup);
  BackgroundWorkerUnblockSignals();
    .
    .
    .
}
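Why the handlers are installed before the signals are unblocked can be seen in a small stand-alone sketch using plain POSIX signal functions instead of pqsignal and BackgroundWorkerUnblockSignals. A signal that arrives while it is blocked is kept pending and is delivered as soon as it is unblocked, so if the handler is already in place at that point, nothing is lost. The function names below are invented for this illustration.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <signal.h>
#include <string.h>

static volatile sig_atomic_t sighup_seen = 0;

static void OnSighup(int signo) {
  (void)signo;
  sighup_seen = 1;
}

/* Add or remove a single signal from the signal mask of the process */
static int ChangeMask(int how, int signo) {
  sigset_t set;
  sigemptyset(&set);
  sigaddset(&set, signo);
  return sigprocmask(how, &set, NULL);
}

int BlockSignal(int signo) { return ChangeMask(SIG_BLOCK, signo); }
int UnblockSignal(int signo) { return ChangeMask(SIG_UNBLOCK, signo); }

int InstallHandler(int signo, void (*handler)(int)) {
  struct sigaction sa;

  assert(handler != NULL);
  memset(&sa, 0, sizeof(sa));
  sa.sa_handler = handler;
  sigemptyset(&sa.sa_mask);
  return sigaction(signo, &sa, NULL);
}

int InstallSighupHandler(void) { return InstallHandler(SIGHUP, OnSighup); }
int SighupSeen(void) { return sighup_seen; }
```

A typical sequence is to call BlockSignal(SIGHUP), then InstallSighupHandler(), and finally UnblockSignal(SIGHUP). A SIGHUP sent between the first and the last call is delivered to OnSighup when the signal is unblocked.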

After the signal handlers are set and the signals unblocked, you can initialize the background worker by setting up the port to receive packets on. The CreateSocket function that is used to create the socket is not part of PostgreSQL, but you can see the full code at the end of the post with a brief explanation of how it works.

void WorkerMain(Datum arg) {
    .
    .
    .
  sfd = CreateSocket(NULL, args->service, &UdpRecvSocket, NULL, 0);
  if (sfd == -1) {
    ereport(LOG, (errcode_for_socket_access(),
                  errmsg("could not create socket for service '%s': %m",
                         args->service)));
    proc_exit(1);
  }
    .
    .
    .
}

One function worth noting here is errcode_for_socket_access, which is called to translate a Linux error code stored in errno to a PostgreSQL error code that can be used with ereport. The actual error message corresponding to the errno value can be printed in the error message using the %m modifier in the format string of ereport. Also note that you should call proc_exit to exit from the background worker and not exit(3). The reason for this is that proc_exit performs additional PostgreSQL cleanup actions before it calls exit(3).

Reading the Packets

After the worker is initialized, the main loop to read packets from the socket is added. It consists of two nested loops: an inner loop that reads packets without blocking and an outer loop that waits for more data to arrive on the socket or for the process to be signaled to shut down.

while (true) {
  int wait_result;

  ResetLatch(MyLatch);
  if (ShutdownWorker)
    break;

  while (!ShutdownWorker) {
    int bytes;

    if (ReloadConfig) {
      ReloadConfig = false;
      ProcessConfigFile(PGC_SIGHUP);
      elog(LOG, "configuration file reloaded");
    }

    /* Leave room for the terminating NUL that ProcessPacket adds */
    bytes = recv(sfd, buffer, sizeof(buffer) - 1, 0);
    if (bytes < 0) {
      if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR)
        break;
      ereport(ERROR, (errcode_for_socket_access(),
                      errmsg("could not read lines: %m")));
    }

    ProcessPacket(buffer, bytes, args->namespace_id);
  }

  wait_result = WaitLatchOrSocket(
      MyLatch, WL_LATCH_SET | WL_POSTMASTER_DEATH | WL_SOCKET_READABLE, sfd,
      -1L, PG_WAIT_EXTENSION);
  if (wait_result & WL_POSTMASTER_DEATH)
    break; /* Abort the worker */
}

This idea is from the statistics collector code in PostgreSQL and the intention is to improve performance by not calling ResetLatch and WaitLatchOrSocket for each packet received. Doing so can be quite expensive since it involves additional system calls, and system calls are comparatively expensive. The drawback is that events reported only through WaitLatchOrSocket, such as the postmaster dying, are not noticed while the worker is busy processing packets, which is also why the inner loop checks ShutdownWorker and ReloadConfig itself.

When the postmaster receives a SIGHUP, or when pg_reload_conf is called, it signals all background workers to reload the configuration file by sending them a SIGHUP. You already added a signal handler earlier which sets the ReloadConfig variable, so here you only need to check whether you should reload the configuration. The function ProcessConfigFile is used to re-read the configuration file. The parameter passed is the context in which the function is called (that is, why it was called) and can be either postmaster startup or the reception of a SIGHUP. Since the reload is because of a SIGHUP, the value PGC_SIGHUP is passed to the function.
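The inner "read until empty" part of this pattern can be sketched in isolation with plain POSIX calls. The function DrainDatagrams below is a made-up name for this illustration and only counts what it reads, whereas the worker would process each packet. It assumes that the socket has been put in non-blocking mode, here with the equally hypothetical helper MakeNonBlocking.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <stdbool.h>
#include <stddef.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>

/* Put a file descriptor in non-blocking mode */
bool MakeNonBlocking(int fd) {
  int flags = fcntl(fd, F_GETFL);
  return flags != -1 && fcntl(fd, F_SETFL, flags | O_NONBLOCK) != -1;
}

/*
 * Read datagrams until the non-blocking socket has no more data.
 * Returns the number of datagrams read, or -1 on a real error. The
 * total number of bytes read is stored in *total_bytes.
 */
int DrainDatagrams(int fd, size_t *total_bytes) {
  char buffer[1500];
  int packets = 0;

  assert(total_bytes != NULL);
  *total_bytes = 0;

  for (;;) {
    ssize_t bytes = recv(fd, buffer, sizeof(buffer), 0);

    if (bytes < 0) {
      if (errno == EAGAIN || errno == EWOULDBLOCK)
        return packets; /* nothing more to read: go back to waiting */
      if (errno == EINTR)
        continue; /* interrupted by a signal: try again */
      return -1;
    }
    *total_bytes += (size_t)bytes;
    ++packets;
  }
}
```

Only when this function returns does the caller need to go back to a blocking wait, so a burst of packets costs one wait instead of one wait per packet.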

For the ProcessPacket function, the received packet is passed and it will just be printed in the log for now so that you can see that it works. It will be extended with code to process the packet in the following posts.

static void ProcessPacket(char *buffer, size_t bytes, Oid nspid) {
  /* The caller must leave room for the terminator after the data */
  buffer[bytes] = '\0';
  ereport(LOG, (errmsg("received %zu bytes", bytes),
                errdetail("packet: %s", buffer)));
}

You can now compile the code, install it, and see what happens if you try to send data to the socket.

$ make && sudo env PATH=$PATH make install
gcc -Wall -Wmissing-prototypes -Wpointer-arith -Wdeclaration-after-statement -Werror=vla -Wendif-labels -Wmissing-format-attribute -Wimplicit-fallthrough=3 -Wformat-security -fno-strict-aliasing -fwrapv -fexcess-precision=standard -Wno-format-truncation -Wno-stringop-truncation -g -O2 -fPIC -I. -I./ -I/usr/local/pg/13.5/include/postgresql/server -I/usr/local/pg/13.5/include/postgresql/internal  -D_GNU_SOURCE   -c -o influx.o influx.c
gcc -Wall -Wmissing-prototypes -Wpointer-arith -Wdeclaration-after-statement -Werror=vla -Wendif-labels -Wmissing-format-attribute -Wimplicit-fallthrough=3 -Wformat-security -fno-strict-aliasing -fwrapv -fexcess-precision=standard -Wno-format-truncation -Wno-stringop-truncation -g -O2 -fPIC -I. -I./ -I/usr/local/pg/13.5/include/postgresql/server -I/usr/local/pg/13.5/include/postgresql/internal  -D_GNU_SOURCE   -c -o worker.o worker.c
gcc -Wall -Wmissing-prototypes -Wpointer-arith -Wdeclaration-after-statement -Werror=vla -Wendif-labels -Wmissing-format-attribute -Wimplicit-fallthrough=3 -Wformat-security -fno-strict-aliasing -fwrapv -fexcess-precision=standard -Wno-format-truncation -Wno-stringop-truncation -g -O2 -fPIC -I. -I./ -I/usr/local/pg/13.5/include/postgresql/server -I/usr/local/pg/13.5/include/postgresql/internal  -D_GNU_SOURCE   -c -o network.o network.c
gcc -Wall -Wmissing-prototypes -Wpointer-arith -Wdeclaration-after-statement -Werror=vla -Wendif-labels -Wmissing-format-attribute -Wimplicit-fallthrough=3 -Wformat-security -fno-strict-aliasing -fwrapv -fexcess-precision=standard -Wno-format-truncation -Wno-stringop-truncation -g -O2 -fPIC -shared -o influx.so influx.o worker.o network.o -L/usr/local/pg/13.5/lib    -Wl,--as-needed -Wl,-rpath,'/usr/local/pg/13.5/lib',--enable-new-dtags  
/usr/bin/mkdir -p '/usr/local/pg/13.5/lib/postgresql'
/usr/bin/mkdir -p '/usr/local/pg/13.5/share/postgresql/extension'
/usr/bin/mkdir -p '/usr/local/pg/13.5/share/postgresql/extension'
/usr/bin/install -c -m 755  influx.so '/usr/local/pg/13.5/lib/postgresql/influx.so'
/usr/bin/install -c -m 644 .//influx.control '/usr/local/pg/13.5/share/postgresql/extension/'
/usr/bin/install -c -m 644 .//influx--0.1.sql  '/usr/local/pg/13.5/share/postgresql/extension/'

$ psql
psql (13.5)
Type "help" for help.

mats=# SELECT worker_launch('db_worker', 4713::text);                     
NOTICE:  background worker started
DETAIL:  pid=222194
 worker_launch 
---------------
        222194
(1 row)

mats=# \q

$ echo "cpu,cpu=cpu0,host=fury usage_idle=95.9,usage_system=2.0,usage_user=2.0 1574753954000000000" | nc -Nu -w1 localhost 4713
$ tail -20 logfile
2022-02-20 21:18:42.847 CET [222183] LOG:  database system is ready to accept connections
2022-02-20 21:18:57.124 CET [222194] LOG:  worker initialized
2022-02-20 21:18:57.124 CET [222194] DETAIL:  service=4713, database_id=16384, namespace_id=115528
2022-02-20 21:21:53.836 CET [222194] LOG:  received 91 bytes
2022-02-20 21:21:53.836 CET [222194] DETAIL:  packet: cpu,cpu=cpu0,host=fury usage_idle=95.9,usage_system=2.0,usage_user=2.0 1574753954000000000

In the next posts, we’ll start to do something interesting with the packets rather than just logging them.

Appendix. Socket Creation

In the code above, the function CreateSocket was used to listen on a port to receive packets over the network. Creating sockets for listening or sending data over the network is quite straightforward:

  1. Find the list of candidates given the domain address and service name.
  2. Go over the list of candidates and see which one can be used:
    • Create the socket.
    • If you want to listen on the address, call bind(2) to see if it can be used.
    • If you want to send to the address, call connect(2) to see if it can be used.
  3. Set any additional options you want to use.

To avoid repeating the same code all over the place, we define the CreateSocket function to handle this, relying on the SocketMethod structure to allow configuring the setup of the socket.

struct SocketMethod {
  int (*setup)(int sockfd, const struct sockaddr* addr, socklen_t addrlen);
  int (*config)(int sockfd, const struct sockaddr* addr, socklen_t addrlen);
  const char* name;
  int socktype;
  int flags;
};

extern struct SocketMethod UdpRecvSocket;
extern struct SocketMethod UdpSendSocket;

The fields of SocketMethod are:

  • The setup field is either connect(2) or bind(2) depending on whether you want to create a listening or sending socket.
  • The config field is a function that can do post-setup configuration once the socket is set up. It can be NULL, in which case nothing is done after the socket is set up.
  • The name field is just the name of the method and is used in log printouts.
  • The socktype field is the socket type—typically SOCK_DGRAM or SOCK_STREAM, but this is not required and it can be any type listed in socket(2).
  • The flags field is just additional flags to pass as hints to the address lookup function.

With a structure like this, a user can create her own structures as long as the protocol is honored. Here two pre-defined variables are used to create a listening UDP socket that does not block if there is no data available and a UDP sending socket.

static int SetNonblocking(int fd, const struct sockaddr* addr,
                          socklen_t addrlen) {
  /* Like bind(2) and connect(2): return 0 on success and -1 on failure */
  if (!pg_set_noblock(fd)) {
    ereport(LOG, (errcode_for_socket_access(),
                  errmsg("could not set socket to nonblocking mode: %m")));
    return -1;
  }
  return 0;
}

struct SocketMethod UdpRecvSocket = {
    .setup = bind,
    .config = SetNonblocking,
    .name = "bind",
    .socktype = SOCK_DGRAM,
    .flags = AI_PASSIVE,
};

struct SocketMethod UdpSendSocket = {
    .setup = connect,
    .name = "connect",
    .socktype = SOCK_DGRAM,
    .flags = 0,
};

The function that uses this structure to create a socket for either sending or receiving data is straightforward.

The first part of the function uses pg_getaddrinfo_all to get a list of candidate addresses to use given a domain address and a service name. The function pg_getaddrinfo_all is the PostgreSQL version of getaddrinfo(3). If a port number is passed in (as a string) as a service name, it will translate to that port number rather than trying to look up the service in the service database, which works well with the functions defined above since they always use a string.

The loop then iterates over all the candidate addresses, creates a socket of the right socket type, and tries to either bind or connect to it. If that fails, the socket is closed and the next one in the list is tried. If there are no more candidates to try, the function logs a message and returns -1.

The last part runs the post-setup configuration (if any is defined), copies the address that was actually used to the caller (if requested), and returns the file descriptor of the newly constructed socket. The address has to be copied before the candidate list is released, since the list owns that memory.

int CreateSocket(const char* hostname, const char* service,
                 const struct SocketMethod* method, struct sockaddr* paddr,
                 socklen_t addrlen) {
  int err, fd = -1;
  struct addrinfo hints, *addrs, *addr;

  memset(&hints, 0, sizeof(struct addrinfo));
  hints.ai_family = AF_UNSPEC; /* Allow IPv4 or IPv6 */
  hints.ai_flags = method->flags;
  hints.ai_socktype = method->socktype;
  hints.ai_protocol = 0; /* Any protocol */
  hints.ai_canonname = NULL;
  hints.ai_addr = NULL;
  hints.ai_next = NULL;

  err = pg_getaddrinfo_all(hostname, service, &hints, &addrs);
  if (err) {
    /* The hostname is NULL when listening on any address */
    ereport(LOG, (errmsg("could not resolve %s: %s",
                         hostname ? hostname : "(any address)",
                         gai_strerror(err))));
    return -1; /* callers test for -1, not for getaddrinfo error codes */
  }

  for (addr = addrs; addr; addr = addr->ai_next) {
    fd = socket(addr->ai_family, method->socktype, addr->ai_protocol);
    if (fd == -1)
      continue;

    if (method->setup == NULL ||
        (*method->setup)(fd, addr->ai_addr, addr->ai_addrlen) != -1)
      break;

    close(fd);
  }

  if (!addr) {
    ereport(LOG, (errcode_for_socket_access(),
                  errmsg("could not %s to any address: %m", method->name)));
    pg_freeaddrinfo_all(hints.ai_family, addrs);
    return -1;
  }

  /* Run the post-setup configuration, if any; it returns -1 on failure */
  if (method->config != NULL &&
      (*method->config)(fd, addr->ai_addr, addr->ai_addrlen) == -1) {
    close(fd);
    pg_freeaddrinfo_all(hints.ai_family, addrs);
    return -1;
  }

  /* Copy the address before freeing the list that owns it */
  if (paddr) {
    Assert(addr->ai_addrlen <= addrlen);
    memcpy(paddr, addr->ai_addr, addr->ai_addrlen);
  }
  pg_freeaddrinfo_all(hints.ai_family, addrs);
  return fd;
}

The function was used above when creating a listening UDP socket, but it is easy to create a PostgreSQL function to send packets over UDP as well. The definition of the function allows the hostname to be omitted, but requires a data packet and a service name or port number.

CREATE PROCEDURE send_packet(packet text, service text, hostname text = 'localhost')
LANGUAGE C AS 'MODULE_PATHNAME';

Using the CreateSocket function above, the packet sending function can be trivially implemented as follows. Worth noting here is that we use struct sockaddr_storage to make sure that we have a socket address structure big enough to hold anything that can be returned. It is a common error to use a smaller structure (for example struct sockaddr) but this structure is not big enough to hold, for example, an instance of struct sockaddr_in6, which is too large and will lead to a buffer overflow.

PG_FUNCTION_INFO_V1(send_packet);
Datum send_packet(PG_FUNCTION_ARGS) {
  struct sockaddr_storage serveraddr;
  const char* hostname = text_to_cstring(PG_GETARG_TEXT_PP(2));
  const char* service = text_to_cstring(PG_GETARG_TEXT_PP(1));
  const char* packet = text_to_cstring(PG_GETARG_TEXT_PP(0));
  int count;
  const int sockfd =
      CreateSocket(hostname, service, &UdpSendSocket,
                   (struct sockaddr*)&serveraddr, sizeof(serveraddr));

  /* CreateSocket has already logged the reason for the failure */
  if (sockfd < 0)
    ereport(ERROR, (errmsg("could not create socket for %s:%s", hostname,
                           service)));

  /* send the message to the server */
  count = sendto(sockfd, packet, strlen(packet), 0,
                 (struct sockaddr*)&serveraddr, sizeof(serveraddr));
  if (count < 0) {
    /* Close the socket first, since ereport(ERROR) does not return */
    int save_errno = errno;
    close(sockfd);
    errno = save_errno;
    ereport(ERROR,
            (errcode_for_socket_access(), errmsg("failed to send packet: %m")));
  }
  close(sockfd);
  PG_RETURN_VOID();
}
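The point about struct sockaddr_storage can be checked with a few lines of plain C. The sketch below uses a made-up helper, StoreLoopback6, that refuses to write an IPv6 socket address into a buffer that is too small for it. With a buffer the size of struct sockaddr it fails, while a struct sockaddr_storage is large enough.

```c
#define _POSIX_C_SOURCE 200809L
#include <arpa/inet.h>
#include <assert.h>
#include <netinet/in.h>
#include <stdbool.h>
#include <stddef.h>
#include <string.h>
#include <sys/socket.h>

/*
 * Store an IPv6 loopback address with the given port in the buffer.
 * Returns false, without writing anything, if the buffer is too small.
 */
bool StoreLoopback6(void *buffer, size_t buffer_size, unsigned short port) {
  struct sockaddr_in6 addr;

  assert(buffer != NULL);

  if (sizeof(addr) > buffer_size)
    return false; /* copying would overflow the buffer */

  memset(&addr, 0, sizeof(addr));
  addr.sin6_family = AF_INET6;
  addr.sin6_port = htons(port);
  addr.sin6_addr = in6addr_loopback;
  memcpy(buffer, &addr, sizeof(addr));
  return true;
}
```

Code that copies an address without a size check like this one, as a plain memcpy into a struct sockaddr would, is exactly where the overflow mentioned above comes from.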

Mats

dbmsdrops.kindahl.net

Long time developer with a keen interest in databases, distributed systems, and programming languages. Currently working as Database Architect at Timescale.
