As discussed in the previous post, it would be nice to start the background workers automatically instead of having to do that manually each time the server starts, so that is what this post covers. As always, the code is available in the timescale/pg_influx repository on GitHub. To start background workers automatically, a few problems need to be solved:
- The extension has to be loaded when the server starts. PostgreSQL supports preloading shared libraries, including extensions, which is covered below.
- Since the extension is preloaded, there needs to be a way to set configuration options such as the number of workers and what port to listen on.
- When extensions are preloaded, they behave slightly differently, and we need to take that into account when starting the background workers.
We will actually cover these in the opposite order, since that is more convenient from an implementation perspective.
When an extension is preloaded, the special function _PG_init will be called, giving you a chance to do whatever is necessary when starting the server. The function is also called when the extension is loaded dynamically, so you can use it to perform any other actions that are useful when the extension is loaded, but you need to be careful about what you do, since the system behaves differently depending on whether the extension is preloaded or dynamically loaded. (There was a function _PG_fini as well, which in theory should be called when an extension is unloaded, but it was removed since shared libraries are currently never unloaded by PostgreSQL.) The function has to be present somewhere in the shared library, but it does not matter where. For convenience, we add it to the influx.c file, which contains the other code to start background workers. The function usually looks something like this:
void _PG_init(void) {
  /* things to do for preload and normal load */
  if (!process_shared_preload_libraries_in_progress)
    return;
  /* things to do for preload only */
}
The variable process_shared_preload_libraries_in_progress is set to true when _PG_init is called as part of a preload, so you can use it to skip work that only makes sense when preloading. Typically, whatever you do for a normal load you also want to do when preloading, so the structure above is what you usually see in most extensions. For the InfluxDB listener, this function does two things: defines the configuration parameters and starts the background workers.
PostgreSQL calls the subsystem that handles options the GUC (short for Grand Unified Configuration), and you define configuration parameters using DefineCustomIntVariable, DefineCustomStringVariable, DefineCustomBoolVariable, etc. They have slightly different signatures, so only DefineCustomIntVariable and DefineCustomStringVariable will be covered here, since these are the ones used in the extension. Given the ability to spawn multiple workers that was added in the previous post, a good first option to add as an example is the number of workers to spawn at startup. Since this is an integer, you add a definition using the DefineCustomIntVariable function.
DefineCustomIntVariable("influx.workers",      /* option name */
                        "Number of workers.",  /* short description */
                        "Number of workers to spawn when starting up"
                        " the server.",        /* long description */
                        &InfluxWorkersCount,   /* value address */
                        4,                     /* boot value */
                        0,                     /* min value */
                        20,                    /* max value */
                        PGC_SIGHUP,            /* option context */
                        0,                     /* option flags */
                        NULL,                  /* check hook */
                        NULL,                  /* assign hook */
                        NULL);                 /* show hook */
Some of the arguments are the same for all functions that define custom configuration parameters.
The option name is the name used with the SET command and in the configuration file. Nothing forces you to follow any particular naming convention, but it is customary to prefix each configuration variable with the extension name, so influx.workers is used as the name of the option here.
The short description and the long description describe the option and provide useful information to the user. The short description is shown in the SHOW ALL display, and the long description is printed when you pass the --describe-config option to postgres.
The value address is simply the address of the variable where the value will be stored. The underlying type differs depending on the type of the option, but it is always the address of a variable. Normally, this variable is either static or global, but nothing prevents you from allocating the memory in other ways, as long as it lives for the lifetime of the server.
The boot value is the value that the variable starts with. You can view this as the default value for the parameter.
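For reference, the backing variables for the options used in this post could be declared like this. This is a sketch: the names match the calls shown in this post (the string options are described further down), but the exact declarations in pg_influx may differ.

/* Backing variables for the custom options. The GUC machinery writes the
   current value of each option into these addresses; the effective default
   comes from the boot value given to the Define call, not from any C
   initializer. (Sketch only; pg_influx may declare these differently.) */
static int InfluxWorkersCount;    /* influx.workers */
static char *InfluxServiceName;   /* influx.service */
static char *InfluxDatabaseName;  /* influx.database */
static char *InfluxRoleName;      /* influx.role */
static char *InfluxSchemaName;    /* influx.schema */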
The option context is present for all variable types as well and tells in what situations the option can be set. There are several different values: PGC_SIGHUP means that the variable can be set in the configuration file and will be re-read on a hangup signal (SIGHUP). Another interesting value is PGC_POSTMASTER, which means that the variable can only be set in the configuration file (or on the command line) and is read at startup, so a restart of the server is necessary for a changed value to take effect. One important thing to be aware of is that it is not possible to define custom options with the PGC_POSTMASTER context when not preloading (that is, when process_shared_preload_libraries_in_progress is false), which means that any such options have to go in the "things to do for preload only" section above.
Most of the custom option functions have means to limit the range of values, but it works differently for different types. For the number of workers, it is reasonable to allow between 0 and 20 workers (0 makes it possible to preload the extension without starting any workers, and 20 was just arbitrarily picked).
The last three parameters are hooks that allow you to customize how values are checked, assigned, and shown, but they are not covered in detail here. They might be covered in a future post.
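To give a flavor of what a hook looks like, here is a sketch of a check hook for a string option. The hook name and the empty-string rule are made up for illustration and are not part of pg_influx; it assumes the usual headers are available.

#include "postgres.h"
#include "utils/guc.h"

/* Hypothetical check hook that rejects an empty schema name. It would be
   installed by passing it as the check hook argument when defining
   influx.schema. */
static bool CheckInfluxSchema(char **newval, void **extra, GucSource source) {
  if (*newval != NULL && strlen(*newval) == 0) {
    GUC_check_errmsg("influx.schema cannot be an empty string");
    return false;
  }
  return true;
}

The hook runs before a new value is installed; returning false rejects the value, and GUC_check_errmsg supplies the error message shown to the user.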
The other options defined are:
- The influx.service option is used to define the service or port that the extension listens on. It defaults to 8089, which is the port that InfluxDB uses for UDP traffic.
- The influx.database option is the name of the database to connect to. When using worker_launch the database is implicit since the function is called from a backend, but when spawning background workers during startup, the database name has to be given explicitly.
- The influx.schema option is the schema where the metric tables are located. For worker_launch the schema is given as a parameter, but here it is an option.
- The influx.role option is the role that the worker will connect as. In the previous post, no role was used, which means that the worker connects as a superuser. Since this is not entirely secure (like, not at all), it is good to add the role as an option.
These variables are set up in the second part of _PG_init. Remember that since they can only be set at postmaster startup, they need to be defined after the check of process_shared_preload_libraries_in_progress. As the last step of _PG_init, we start the background workers using StartBackgroundWorkers.
void _PG_init(void) {
  .
  .
  .
  if (!process_shared_preload_libraries_in_progress)
    return;

  DefineCustomStringVariable(
      "influx.service", "Service name.",
      "Service name to listen on, or port number. If it is a service name, it"
      " will be looked up. If it is a port, it will be used as it is.",
      &InfluxServiceName, "8089", PGC_POSTMASTER, 0, NULL, NULL, NULL);
  DefineCustomStringVariable(
      "influx.database", "Database name.", "Database name to connect to.",
      &InfluxDatabaseName, NULL, PGC_POSTMASTER, 0, NULL, NULL, NULL);
  DefineCustomStringVariable(
      "influx.role", "Role name.",
      "Role name to use when connecting to the database. Default is to"
      " connect as superuser.",
      &InfluxRoleName, NULL, PGC_POSTMASTER, 0, NULL, NULL, NULL);
  DefineCustomStringVariable(
      "influx.schema", "Schema name.",
      "Schema name to use for the workers. This is where the measurement"
      " tables should be placed.",
      &InfluxSchemaName, NULL, PGC_POSTMASTER, 0, NULL, NULL, NULL);

  StartBackgroundWorkers(InfluxDatabaseName, InfluxSchemaName, InfluxRoleName,
                         InfluxServiceName, InfluxWorkersCount);
}
This function accepts the database name and the role name to use when connecting, so the code has been rewritten to move the string parameters into the WorkerArgs structure, and StartBackgroundWorkers now looks like this.
void StartBackgroundWorkers(const char *database_name,
                            const char *schema_name,
                            const char *role_name,
                            const char *service_name, int worker_count) {
  MemoryContext oldcontext = MemoryContextSwitchTo(TopMemoryContext);
  int i;
  BackgroundWorker worker;
  WorkerArgs args = {0};

  /* Copy the string parameters into the argument block. The strings are
     copied by value since the worker entry is copied when registered. */
  if (schema_name)
    strncpy(args.namespace, schema_name, sizeof(args.namespace));
  if (database_name)
    strncpy(args.database, database_name, sizeof(args.database));
  if (role_name)
    strncpy(args.role, role_name, sizeof(args.role));
  if (service_name)
    strncpy(args.service, service_name, sizeof(args.service));

  elog(LOG, "starting influx workers");
  InfluxWorkerInit(&worker, &args);
  for (i = 1; i <= worker_count; i++)
    RegisterBackgroundWorker(&worker);
  elog(LOG, "background workers started");
  MemoryContextSwitchTo(oldcontext);
}
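The WorkerArgs structure referenced above stores the strings inline in fixed-size arrays rather than as pointers. A sketch of what it might look like is shown below; the field sizes are illustrative and the exact definition in pg_influx may differ. As explained further down, the whole structure is copied into the bgw_extra field of the worker entry, which is BGW_EXTRALEN (128) bytes, so it has to fit within that.

/* Sketch of the argument block copied into the worker entry. Field sizes
   are illustrative; the hard requirement is that the structure fits in
   bgw_extra, which is why the strings are stored inline rather than as
   pointers to process-local memory. */
typedef struct WorkerArgs {
  char namespace[32]; /* schema holding the measurement tables */
  char database[32];  /* database to connect to */
  char role[32];      /* role to connect as */
  char service[32];   /* service name or port number to listen on */
} WorkerArgs;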
In contrast to worker_launch, which spawns workers using RegisterDynamicBackgroundWorker, the function RegisterBackgroundWorker is used here; it is the function to use when registering background workers during preload. Note that the memory for the worker is allocated in TopMemoryContext since it should stay around for the lifetime of the process. Since the WorkerArgs structure has been modified so that OIDs are no longer passed down, the worker main function InfluxWorkerMain needs some changes as well.
Note that the worker arguments have to be passed by copying them into the structure; we cannot store the pointers that were passed to the function. The reason is that the worker structure handed to RegisterBackgroundWorker and RegisterDynamicBackgroundWorker is copied using memcpy into worker slots managed by the postmaster, so any data that is allocated locally by the registering process and only referenced through a pointer will not be available in the worker process.
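To make the copying concrete, here is a sketch of what InfluxWorkerInit might look like, using the WorkerArgs structure sketched above. The actual implementation is in the pg_influx repository and may differ in details such as the worker name, flags, and restart policy; the essential part is that the arguments are copied by value into bgw_extra.

#include "postgres.h"
#include "postmaster/bgworker.h"

static void InfluxWorkerInit(BackgroundWorker *worker, WorkerArgs *args) {
  /* The arguments travel inside the worker entry itself, so the whole
     structure must fit in the fixed-size bgw_extra area. */
  StaticAssertStmt(sizeof(WorkerArgs) <= BGW_EXTRALEN,
                   "WorkerArgs must fit in bgw_extra");
  memset(worker, 0, sizeof(*worker));
  snprintf(worker->bgw_name, sizeof(worker->bgw_name), "Influx listener");
  snprintf(worker->bgw_type, sizeof(worker->bgw_type),
           "Influx line protocol listener");
  worker->bgw_flags =
      BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION;
  worker->bgw_start_time = BgWorkerStart_RecoveryFinished;
  worker->bgw_restart_time = BGW_NEVER_RESTART;
  /* The library and the function to call are both given by name. */
  snprintf(worker->bgw_library_name, sizeof(worker->bgw_library_name),
           "influx");
  snprintf(worker->bgw_function_name, sizeof(worker->bgw_function_name),
           "InfluxWorkerMain");
  /* Copy the strings by value; pointers to process-local memory would be
     meaningless in the worker process. */
  memcpy(worker->bgw_extra, args, sizeof(*args));
}

With the arguments stored inline in bgw_extra, the worker main function can read them directly from its worker entry: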
void InfluxWorkerMain(Datum arg) {
  int sfd;
  char buffer[MTU];
  WorkerArgs *args = (WorkerArgs *)&MyBgworkerEntry->bgw_extra;
  Oid namespace_id;
  .
  .
  .
  /* Connect using the database and role names; OIDs are not available
     when the worker is registered during preload. */
  BackgroundWorkerInitializeConnection(args->database, args->role, 0);
  pgstat_report_activity(STATE_RUNNING, "initializing worker");
  .
  .
  .
  /* Catalog lookups require an open transaction. */
  StartTransactionCommand();
  namespace_id = get_namespace_oid(args->namespace, false);
  pgstat_report_activity(STATE_RUNNING, "reading events");
  .
  .
  .
}
In the previous post, BackgroundWorkerInitializeConnectionByOid was used to connect to the database, but since it expects OIDs it cannot be used for workers registered during preload: at that point the system is still starting, so the OIDs are not easily available. Instead, the database name and the role name are passed to the start function and BackgroundWorkerInitializeConnection is used. It handles all the work needed to resolve the correct OIDs for the database and the role.
Since the schema, or namespace, is passed by name, it has to be looked up as well. Doing that requires an open transaction, so the lookup is delayed until after the StartTransactionCommand call. If you call get_namespace_oid—or any other function that searches the catalog tables—before you have started a transaction, you will get very weird errors. If that happens to you, check that you have started a transaction.
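As a self-contained illustration of the pattern, here is a hypothetical helper (not part of pg_influx) that brackets the lookup in a transaction:

#include "postgres.h"
#include "access/xact.h"
#include "catalog/namespace.h"

/* Hypothetical helper: resolve a schema name to an OID from a background
   worker. The transaction bracket is what makes the catalog lookup safe. */
static Oid LookupSchemaOid(const char *schema_name) {
  Oid namespace_id;
  StartTransactionCommand();
  namespace_id = get_namespace_oid(schema_name, false);
  CommitTransactionCommand();
  return namespace_id;
}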
To load an extension (or any other shared library) on startup, you use the option shared_preload_libraries, which you set in the postgresql.conf file in the configuration directory of the server. For example, on my machine the configuration file is /etc/postgresql/13/main/postgresql.conf. If you open the file in your favorite editor, you will see that the line with this parameter is commented out, so remove the comment and add the name of the extension you want to load. Last, add the extension options with suitable values.
# default configuration for text search
default_text_search_config = 'pg_catalog.english'
# - Shared Library Preloading -
shared_preload_libraries = 'influx' # (change requires restart)
#local_preload_libraries = ''
#session_preload_libraries = ''
#jit_provider = 'llvmjit' # JIT library to use
.
.
.
# Add settings for extensions here
influx.database = 'mats'
influx.role = 'mats'
influx.schema = 'magic'
influx.workers = 4
The configuration file uses the convention of showing the default value for an option in the commented-out line. This makes it easy to see the default when you edit the file, instead of having to look it up in the manual. Just a reminder: remember to uncomment the line. I always forget it.
After you have updated and saved the file, you need to restart the server, and voilà, you have background workers running automatically after startup.
mats@abzu:~$ service postgresql restart
mats@abzu:~$ psql
psql (13.10 (Ubuntu 13.10-1.pgdg22.04+1))
Type "help" for help.
mats=# select pid, state, query, backend_type from pg_stat_activity;
pid | state | query | backend_type
--------+--------+---------------------------------------------------------------+-------------------------------
522683 | | | autovacuum launcher
522685 | idle | processing incoming packets | Influx line protocol listener
522686 | idle | processing incoming packets | Influx line protocol listener
522687 | idle | processing incoming packets | Influx line protocol listener
522688 | idle | processing incoming packets | Influx line protocol listener
522689 | | | logical replication launcher
522707 | active | select pid, state, query, backend_type from pg_stat_activity; | client backend
522681 | | | background writer
522680 | | | checkpointer
522682 | | | walwriter
(10 rows)
This is starting to look pretty good as far as features for the extension go, but there is one shortcoming: if a line arrives for a metric that does not have a table, that line is thrown away. This is convenient for performance, since it makes it easy to simply ignore measurements that are not interesting, but in some situations it would be more convenient to have the table created automatically. In the next post you will see how to call functions and procedures from the extension and how that can be used to create tables automatically, or not, depending on your needs.
Update: There was a bug in the code above where the WorkerArgs structure stored pointers to process-local data, which does not work. The post has been updated to explain the situation.