Parsing InfluxDB Line Protocol

In the previous post you saw how to create a background worker that receives data over a socket and how to spawn new background workers. In this post you will see how to write a simple parser for the InfluxDB Line Protocol, get an introduction to PostgreSQL memory contexts and the Set-Returning Function (SRF) interface, and learn how to write a function that returns multiple tuples. Here the interface is used to write a function for testing the parser, but it is useful to understand in its own right and is also used by the built-in functions. As in the previous posts, the code is available in the pg_influx repository on GitHub.

Writing a Recursive Descent Parser for the InfluxDB Line Protocol

Writing a parser does not really involve database internals, but since the extension needs one, it is worth covering the basics of how to write a simple one. The InfluxDB Line Protocol is very simple, both in structure and in how easy it is to parse, which makes it well suited for telemetry data from IoT devices: even the simplest devices can send data in this format. The data comes in the form of lines, and the format of each line is:

<measurement>[,<tag_key>=<tag_value>[,<tag_key>=<tag_value>]] <field_key>=<field_value>[,<field_key>=<field_value>] [<timestamp>]

It is possible to write a parser using tools like Bison and Flex, but since the format is so simple, it is easier (and also more fun) to write a recursive descent parser. The format of the lines is suitable for a predictive parser, which is a kind of recursive descent parser that does not require backtracking and hence is both simple to implement and very efficient. The trick is to write the grammar so that it is always possible to decide what to do by looking at the next k tokens of the input; such a grammar is called an LL(k) grammar. Fortunately, the InfluxDB Line Protocol is designed so that it is suitable for a predictive parser, and the full grammar in Extended Backus-Naur Form (EBNF) is given as:

Line = Ident, {",", Item}, " ", Item, {",", Item};
Item = Ident, "=", Value;
Ident = LETTER, {LETTER | DIGIT | "_" | "-"};
Value = QString | BString;
BString = LETTER, {"\", ANY | VALUE_CHAR};
QString = '"', {"\", ANY | STRING_CHAR}, '"';
VALUE_CHAR = ? any character except space, comma, or backslash ?;
STRING_CHAR = ? any character except quote or backslash ?;
LETTER = ? any letter ?;
DIGIT = ? any digit ?;
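
To see how a grammar rule maps to code, here is a minimal standalone sketch of the Ident rule in plain C. The function name IdentLength is made up for this illustration and is not part of pg_influx; it only reports how many characters at the start of a string match the rule, so you can experiment with it outside PostgreSQL.

```c
#include <assert.h>
#include <ctype.h>
#include <stddef.h>

/*
 * Hypothetical helper (not from pg_influx): return the length of the longest
 * prefix of "input" that matches
 *
 *   Ident = LETTER, {LETTER | DIGIT | "_" | "-"};
 *
 * or 0 if the input does not start with a letter.
 */
size_t IdentLength(const char *input) {
  const char *ptr = input;

  assert(input != NULL);

  /* The rule starts with a mandatory LETTER. */
  if (!isalpha((unsigned char)*ptr))
    return 0;
  ++ptr;

  /* The braces in EBNF mean "zero or more", which becomes a loop. */
  while (isalnum((unsigned char)*ptr) || *ptr == '_' || *ptr == '-')
    ++ptr;

  return (size_t)(ptr - input);
}
```

For example, IdentLength("host=fury") is 4, since the identifier stops at the equals sign, while IdentLength("9lives") is 0 because an identifier has to start with a letter.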

The Parser State

Figure 1. Parser state and buffer content

All parsers have a parse state and this parser is no exception. The parsing itself can be handled with just the first two fields, start and current, which are pointers to the beginning of the input buffer and the next character of the input, respectively. The remaining fields in the parse state are the result of the parse; they are read when inserting data into the database and copied where necessary. Since the packets received from the network are stored in a memory buffer and the buffer is not needed after the parse, it is sufficient to keep pointers into the buffer for all tokens and store null characters in the buffer to terminate the strings, as shown in Figure 1.

typedef struct ParseState {
  char *start;
  char *current;
  const char *metric;
  const char *timestamp;
  List *tags;
  List *fields;
} ParseState;

bool ReadNextLine(ParseState *state);
void ParseStateInit(ParseState *state, char *line);
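
The idea of terminating tokens in place can be tried out without PostgreSQL. The following is a small sketch, assuming a stripped-down state with only the two buffer pointers; MiniState and TakeUntil are names invented for this example and do not exist in the extension.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical, stripped-down parse state: only the two buffer pointers. */
typedef struct MiniState {
  char *start;   /* beginning of the input buffer */
  char *current; /* next unread character */
} MiniState;

/*
 * Return a pointer to the token that starts at the current position and
 * terminate it in place by overwriting the delimiter with a null character.
 * No memory is allocated or copied: the token lives inside the buffer.
 */
char *TakeUntil(MiniState *state, char delim) {
  char *token;

  assert(state != NULL && state->current != NULL);
  token = state->current;

  while (*state->current != '\0' && *state->current != delim)
    state->current++;

  /* If we stopped on the delimiter (not the end of the buffer), replace it
   * with a terminator and step past it. */
  if (*state->current != '\0')
    *state->current++ = '\0';

  return token;
}
```

Given a writable buffer containing "system,host=fury", three calls with the delimiters ',', '=', and ' ' return pointers to "system", "host", and "fury", all pointing into the same buffer.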

The parser needs just two functions: one to initialize the state with a buffer and one to parse and read the next line. When writing a parser, a few convenience functions for working with the parser state are useful. These are defined as static functions inside the parser module, which gives them internal linkage, so they are not visible outside the module and are not mentioned in the parser’s header file.

  • The function ExpectNextChar takes a character (in addition to the parse state) and raises an error if the next character is not the expected one. Otherwise, it writes a null character at that position, which terminates the previous token, and advances. This function is used when the next character is required by the grammar and it is an error if it is not present.
  • The function CheckNextChar takes a character and, if the next character matches, terminates the previous token and returns true; otherwise it does nothing and returns false. This function is used when the next character is optional, for example when parsing a list.

static void ExpectNextChar(ParseState *state, char ch) {
  if (*state->current != ch)
    ereport(ERROR,
            (errcode(ERRCODE_SYNTAX_ERROR), errmsg("unexpected character"),
             errdetail("expected '%c' at position %u, saw '%c'", ch,
                       (unsigned int)(state->current - state->start),
                       *state->current)));
  *state->current++ = '\0';
}

static bool CheckNextChar(ParseState *state, char ch) {
  if (*state->current != ch)
    return false;
  *state->current++ = '\0';
  return true;
}

Writing the Parser

To write a predictive parser you first define a grammar and then, based on that grammar, write one function for each rule. For a grammar to be implementable as a predictive parser it has to be LL(k) for some integer k. Transforming an arbitrary grammar into an LL(k) grammar requires some knowledge of formal language theory, which will not be covered here. In case you’re interested in learning more about parsing and grammars, there are a few introductions on how to rewrite grammars to be LL(k). For example, GeeksForGeeks has a post, Construction of LL(1) Parsing Table, as part of a tutorial on writing compilers, which includes a description of how to rewrite a grammar as an LL(k) grammar.

The full grammar is available in the repository, but just to illustrate how rules are written, consider a fragment of the grammar for the InfluxDB Line Protocol in EBNF form (this is for parsing the tags and fields). This fragment is LL(1) and hence suitable for implementation using a predictive parser.

ItemList = Item, {",", Item};
Item = Ident, "=", Value;

To write a parser for this grammar, one function is written for each rule. The functions above help you to write the rules in a straightforward manner and, as you can see, the functions that implement the grammar look very similar to the grammar rules themselves.

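/* Forward declaration: ReadItemList calls ReadItem before it is defined. */
static ParseItem *ReadItem(ParseState *state);

static List *ReadItemList(ParseState *state) {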
  List *items = NIL;
  items = lappend(items, ReadItem(state));
  while (CheckNextChar(state, ','))
    items = lappend(items, ReadItem(state));
  return items;
}

static ParseItem *ReadItem(ParseState *state) {
  ParseItem *item = palloc(sizeof(ParseItem));
  item->key = ReadIdent(state);
  ExpectNextChar(state, '=');
  item->value = ReadValue(state);
  return item;
}
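
If you want to experiment with these two rules outside PostgreSQL, the following is a self-contained sketch of the same structure in plain C. Everything prefixed with Toy is invented for this example: it uses a fixed-size array instead of a PostgreSQL List, sets a failed flag instead of calling ereport, and reads values in a simplified way (no quoting or escapes), so it should not be taken as the actual ReadIdent and ReadValue from the repository.

```c
#include <assert.h>
#include <ctype.h>
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

typedef struct ToyItem {
  const char *key;
  const char *value;
} ToyItem;

typedef struct ToyParser {
  char *current; /* next unread character in the buffer */
  bool failed;   /* set on syntax errors instead of raising an error */
} ToyParser;

/* Same idea as CheckNextChar: consume and terminate if the character matches. */
bool ToyCheck(ToyParser *parser, char ch) {
  if (*parser->current != ch)
    return false;
  *parser->current++ = '\0';
  return true;
}

/* Ident = LETTER, {LETTER | DIGIT | "_" | "-"}; */
const char *ToyReadIdent(ToyParser *parser) {
  const char *begin = parser->current;
  if (!isalpha((unsigned char)*parser->current)) {
    parser->failed = true;
    return begin;
  }
  while (isalnum((unsigned char)*parser->current) ||
         *parser->current == '_' || *parser->current == '-')
    parser->current++;
  return begin;
}

/* Simplified value: everything up to a comma, a space, or the end of input. */
const char *ToyReadValue(ToyParser *parser) {
  const char *begin = parser->current;
  while (*parser->current != '\0' && *parser->current != ',' &&
         *parser->current != ' ')
    parser->current++;
  if (parser->current == begin)
    parser->failed = true; /* empty values are rejected */
  return begin;
}

/* Item = Ident, "=", Value; */
void ToyReadItem(ToyParser *parser, ToyItem *item) {
  item->key = ToyReadIdent(parser);
  if (!ToyCheck(parser, '='))
    parser->failed = true;
  item->value = ToyReadValue(parser);
}

/* ItemList = Item, {",", Item}; returns the number of items stored. */
size_t ToyReadItemList(ToyParser *parser, ToyItem *items, size_t capacity) {
  size_t count = 0;
  assert(parser != NULL && items != NULL);
  do {
    if (count == capacity) {
      parser->failed = true;
      break;
    }
    ToyReadItem(parser, &items[count++]);
  } while (!parser->failed && ToyCheck(parser, ','));
  return count;
}
```

Note that the last value in a list is not terminated until the caller consumes the character that follows it, for example the space that separates the tags from the fields. This is the same division of labor as in the real parser, where the function for the enclosing rule is responsible for the delimiter.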

A Brief Look at Memory Contexts

Before delving into the set-returning functions, it is useful to take a quick look at PostgreSQL memory contexts since they play an important role in how the set-returning functions are executed.

All memory in PostgreSQL is allocated in a memory context. The memory context controls the lifetime of the allocated memory and also allows PostgreSQL to release memory efficiently, since larger slabs of memory can be allocated and released at the same time. Allocating memory using memory contexts also plays well with the error reporting functions (which are based on the standard C functions setjmp and longjmp), since function implementations are not forced to have elaborate logic for releasing allocated memory in error cases. You can see an example in Figure 2 (XACT is the transaction execution module), where the memory allocation is completely handled in the transaction module and the called function can allocate memory without having to deal with capturing the error and freeing the memory.
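
To make the interaction between error handling and memory contexts more concrete, here is a toy model in plain C. It is only a sketch under simplifying assumptions and not how PostgreSQL implements it: Arena, Worker, and RunInTransaction are invented names, the arena is a simple linked list of malloc blocks, and a bare longjmp stands in for ereport. The point is that the worker never frees anything; the caller that installed the error handler releases everything in one place, whether or not the worker failed.

```c
#include <assert.h>
#include <setjmp.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdlib.h>

/* Header placed in front of each allocation so the arena can find it later. */
typedef union ArenaChunk {
  union ArenaChunk *next;
  max_align_t align; /* keeps the payload after the header suitably aligned */
} ArenaChunk;

typedef struct Arena {
  ArenaChunk *chunks;
} Arena;

/* Toy stand-ins for the transaction memory context and the error handler. */
static Arena transaction_arena = {NULL};
static jmp_buf error_handler;

void *ArenaAlloc(Arena *arena, size_t size) {
  ArenaChunk *chunk;
  assert(arena != NULL);
  chunk = malloc(sizeof(ArenaChunk) + size);
  if (chunk == NULL)
    abort();
  chunk->next = arena->chunks;
  arena->chunks = chunk;
  return chunk + 1; /* the payload starts right after the header */
}

/* Free everything allocated in the arena; returns the number of chunks. */
size_t ArenaReset(Arena *arena) {
  size_t count = 0;
  while (arena->chunks != NULL) {
    ArenaChunk *next = arena->chunks->next;
    free(arena->chunks);
    arena->chunks = next;
    count++;
  }
  return count;
}

/* Stand-in for a function like parse_influx: allocates, never frees. */
void Worker(Arena *arena, bool fail) {
  ArenaAlloc(arena, 64);
  ArenaAlloc(arena, 128);
  if (fail)
    longjmp(error_handler, 1); /* "raise an error" */
}

/* Stand-in for the transaction module: cleans up on success and on error. */
bool RunInTransaction(bool fail, size_t *released) {
  if (setjmp(error_handler) == 0) {
    Worker(&transaction_arena, fail);
    *released = ArenaReset(&transaction_arena);
    return true;
  }
  /* Control arrives here after the longjmp in Worker. */
  *released = ArenaReset(&transaction_arena);
  return false;
}
```

In both the successful and the failing case, the two allocations made by Worker are released by RunInTransaction, and Worker itself contains no cleanup code at all.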

All memory contexts have a name, which is used for debugging purposes, and are organized as a tree with parent and child contexts. The lifetime of a child context never exceeds that of its parent context, but the child context can live for a shorter time. If you destroy a memory context, it and all its children are destroyed at the same time. There are a few standard memory contexts that are used in PostgreSQL, but you can create your own memory contexts as well.

  • Top memory context (name “TopMemoryContext”) is the root of all memory contexts. It is allocated once and never deleted (unless the process dies, of course).
  • Top transaction memory context (name “TopTransactionContext”) is a context that is allocated at the start of a transaction and destroyed when the transaction commits or aborts.
  • Executor state memory context (name “ExecutorState”) is allocated at the start of execution of a query. This is also called per-query memory context in some parts of the code.
  • Expression memory context (name “ExprContext”) is allocated for each expression evaluated as part of executing a query. This context is also called the per-tuple memory context in some places in the code.
  • Multi-call memory context is a transient context allocated for the execution of a multi-call function and is covered below.
  • Current memory context is a pseudo-memory context and points to a context where allocations by default will be done. It is possible to switch current memory context using MemoryContextSwitchTo and it is heavily used in the code.
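
The tree structure and the notion of a current context can be modelled in a few lines of plain C. The sketch below is a toy under stated assumptions, not PostgreSQL’s implementation: all the Toy names are invented, each allocation is a separate malloc block, and ToyContextSwitchTo only mimics the calling convention of MemoryContextSwitchTo, which returns the previous context so that you can switch back.

```c
#include <assert.h>
#include <stddef.h>
#include <stdlib.h>

typedef union ToyChunk {
  union ToyChunk *next;
  max_align_t align; /* keeps the payload suitably aligned */
} ToyChunk;

typedef struct ToyContext {
  const char *name; /* for debugging only */
  struct ToyContext *parent;
  struct ToyContext *first_child;
  struct ToyContext *next_sibling;
  ToyChunk *chunks; /* allocations owned by this context */
} ToyContext;

static ToyContext *current_context = NULL;
static int live_contexts = 0;

ToyContext *ToyContextCreate(ToyContext *parent, const char *name) {
  ToyContext *ctx = calloc(1, sizeof(ToyContext));
  if (ctx == NULL)
    abort();
  ctx->name = name;
  ctx->parent = parent;
  if (parent != NULL) {
    ctx->next_sibling = parent->first_child;
    parent->first_child = ctx;
  }
  live_contexts++;
  return ctx;
}

/* Make ctx the current context and return the previous one. */
ToyContext *ToyContextSwitchTo(ToyContext *ctx) {
  ToyContext *old = current_context;
  current_context = ctx;
  return old;
}

/* Allocate in whatever context is current, like palloc does. */
void *ToyAlloc(size_t size) {
  ToyChunk *chunk;
  assert(current_context != NULL);
  chunk = malloc(sizeof(ToyChunk) + size);
  if (chunk == NULL)
    abort();
  chunk->next = current_context->chunks;
  current_context->chunks = chunk;
  return chunk + 1;
}

/* Delete a context, all its allocations, and all its children. */
void ToyContextDelete(ToyContext *ctx) {
  assert(ctx != NULL);
  while (ctx->first_child != NULL)
    ToyContextDelete(ctx->first_child); /* each call unlinks the child */
  while (ctx->chunks != NULL) {
    ToyChunk *next = ctx->chunks->next;
    free(ctx->chunks);
    ctx->chunks = next;
  }
  if (ctx->parent != NULL) {
    ToyContext **link = &ctx->parent->first_child;
    while (*link != ctx)
      link = &(*link)->next_sibling;
    *link = ctx->next_sibling;
  }
  if (current_context == ctx)
    current_context = NULL;
  live_contexts--;
  free(ctx);
}
```

Deleting a context in the middle of the tree takes its children with it but leaves the parent intact, which mirrors how a per-tuple context can never outlive the per-query context it belongs to.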

If you look at the diagram in Figure 2 you can see an example of error handling where the transaction handler deletes all allocations in the transaction memory context. Note that the parse_influx function can allocate memory as needed but does not have to keep track of it, since the memory is automatically released once the transaction is done with it, regardless of how the transaction ends.

PostgreSQL Functions

As you might recall from the first post, you had to use the macro PG_FUNCTION_ARGS when writing a PostgreSQL function. This is only a convenience macro expanding to FunctionCallInfo fcinfo, which declares a single parameter to the function. This parameter is used inside the function to figure out information about parameters (and other data) passed to the function call: number of parameters, the context of the call, and information about the return type of the called function. In addition to the simple functions that just return one value, there are set-returning functions (SRF) that can return multiple rows.

Set-Returning Functions

Returning more than one row from a function is useful when you want to return either multiple values or a table-like structure. This example implements a set-returning function that parses an InfluxDB Line Protocol packet and returns a table with one row for each line in the packet.

CREATE FUNCTION parse_influx(text)
RETURNS TABLE (_metric text, _time text, _tags jsonb, _fields jsonb)
LANGUAGE C AS 'MODULE_PATHNAME';

Since set-returning functions are called multiple times, once for each row, they require some special logic. In this case the function returns a pre-defined tuple consisting of four columns, but it is also possible to have a more flexible scheme that allows you to return different data depending on the expected return type. This is not something that will be covered here.

There are actually two different interfaces for returning sets. The one covered here is the value-per-call mode and the other method, which will not be covered here, is called materialize mode.

Since set-returning functions are called several times, it is necessary to remember the state of the execution between the calls, which is the purpose of the FuncCallContext structure. The structure contains a number of fields that can be used to track the state of execution, as well as to save critical information between the calls, but the most important of these fields is the multi-call memory context, multi_call_memory_ctx. This memory context is used to allocate the execution state in memory that persists across all the calls needed to return the rows. The function to parse the InfluxDB Line Protocol is a typical example of a set-returning function.

Datum parse_influx(PG_FUNCTION_ARGS) {
  HeapTuple tuple;
  FuncCallContext *funcctx;
  ParseState *state;

  if (SRF_IS_FIRSTCALL()) {
    /* Initialize the multi-call state */

    TupleDesc tupdesc;
    MemoryContext oldcontext;

    funcctx = SRF_FIRSTCALL_INIT();
    oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);

    if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
      ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                      errmsg("function returning record called in context "
                             "that cannot accept type record")));
    funcctx->tuple_desc = tupdesc;
    funcctx->user_fctx =
        ParseInfluxSetup(text_to_cstring(PG_GETARG_TEXT_PP(0)));

    MemoryContextSwitchTo(oldcontext);
  }

  funcctx = SRF_PERCALL_SETUP();
  state = funcctx->user_fctx;

  tuple = ParseInfluxNextTuple(state, funcctx->tuple_desc);
  if (tuple == NULL)
    SRF_RETURN_DONE(funcctx);

  /* HeapTupleGetDatum already produces a Datum, so no further conversion is
   * needed. */
  SRF_RETURN_NEXT(funcctx, HeapTupleGetDatum(tuple));
}

Initializing the multi-call state. Before using the FuncCallContext, you need to use the macro SRF_FIRSTCALL_INIT to create the multi-call memory context and initialize the fields that you need in the remaining calls. The multi-call memory context is allocated as a child of the executor state memory context and therefore persists until the query is done executing. This should only be done once, and subsequent calls will generate an error. (If it were possible to call it multiple times, you would get a new multi-call memory context each time, lose track of the previous one, and not be able to see state changes made in earlier calls.) To make sure that the memory is allocated once, and only once, this initial setup is done only when SRF_IS_FIRSTCALL returns true.

The multi-call memory context created by SRF_FIRSTCALL_INIT lives for the duration of the query, but the current memory context when executing functions is the per-tuple memory context. Since the per-tuple memory context is reset for each call to the function, you need to switch memory context when you set up fields that need to persist between calls to the set-returning function. By switching to the multi-call memory context, the calls to get_call_result_type, text_to_cstring, and ParseInfluxSetup allocate memory in the multi-call memory context.

Most of the fields are optional and only used by the set-returning function itself. In this case only two fields are initialized: tuple_desc and user_fctx. The field tuple_desc is used to return composite types and holds a pointer to the tuple descriptor for the composite type. The user_fctx field is free to use for anything by the set-returning function implementation, so here it is set to the parser state.

Return next row in set. To construct one row to return, you first need to do a per-call setup using the SRF_PERCALL_SETUP macro. If you have just initialized funcctx in the first-call block above, this is actually not necessary, but it does not cause any problems (the macro just retrieves the pointer from the FunctionCallInfo structure) and the code is more straightforward if it’s always done. If there are no more rows to return, you use SRF_RETURN_DONE, which takes only the function call context. If you want to return a value, you use SRF_RETURN_NEXT. This macro takes a Datum to return, so you need to convert whatever you want to return into a Datum. In this case, there is a heap tuple allocated in the per-tuple context, so the function converts it to a Datum using HeapTupleGetDatum and returns that.
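
The calling protocol can be easier to follow in a toy model without any PostgreSQL headers. The sketch below is an illustration only: ToyCallInfo, ToyCallState, and NextLine are invented names, a plain malloc stands in for the multi-call memory context, and a boolean return value stands in for SRF_RETURN_NEXT and SRF_RETURN_DONE. It follows the same three steps as parse_influx: set up state on the first call, fetch the state on every call, and then either return one row or signal that the set is complete.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdlib.h>
#include <string.h>

/* Toy stand-in for the per-function slot that survives between calls. */
typedef struct ToyCallInfo {
  void *fn_extra; /* NULL until the first call has set up the state */
} ToyCallInfo;

/* Toy stand-in for the state kept in user_fctx. */
typedef struct ToyCallState {
  const char *next; /* start of the next unread line in the packet */
} ToyCallState;

/*
 * Return one line of "packet" per call. Sets *line and *length and returns
 * true while there are lines left; returns false when the set is complete.
 */
bool NextLine(ToyCallInfo *info, const char *packet, const char **line,
              size_t *length) {
  ToyCallState *state;
  const char *end;

  assert(info != NULL && packet != NULL && line != NULL && length != NULL);

  if (info->fn_extra == NULL) {
    /* First call: allocate state that must survive between calls. */
    state = malloc(sizeof(ToyCallState));
    if (state == NULL)
      abort();
    state->next = packet;
    info->fn_extra = state;
  }

  /* Per-call setup: fetch the state saved by the first call. */
  state = info->fn_extra;

  if (*state->next == '\0') {
    /* Done: release the multi-call state so a new scan starts fresh. */
    free(state);
    info->fn_extra = NULL;
    return false;
  }

  /* Return the next row and advance past the line terminator, if any. */
  end = strchr(state->next, '\n');
  *line = state->next;
  *length = (end != NULL) ? (size_t)(end - state->next) : strlen(state->next);
  state->next += *length + (end != NULL ? 1 : 0);
  return true;
}
```

A caller plays the role of the executor by calling NextLine in a loop until it returns false. For a packet with two lines, that means three calls: two that produce a row and a final one that reports completion and cleans up.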

Building Tuples

The ParseInfluxNextTuple function constructs a new tuple to return. Since this is a comparatively complex object and not just a simple value, it needs to allocate memory. This memory is allocated in the per-tuple context, so it will be released after the tuple has been processed.

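void ParseInfluxCollect(ParseState *state, TupleDesc tupdesc, Datum *values,
                        bool *nulls) {
  /* The timestamp is optional in the line format. This assumes the parser
   * leaves state->timestamp as NULL when it is absent. */
  if (state->timestamp == NULL)
    nulls[1] = true;
  else
    values[1] = CStringGetTextDatum(state->timestamp);
  values[2] = JsonbPGetDatum(BuildJsonObject(state->tags));
  values[3] = JsonbPGetDatum(BuildJsonObject(state->fields));
}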

static HeapTuple ParseInfluxNextTuple(ParseState *state, TupleDesc tupdesc) {
  Datum *values;
  bool *nulls;

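  if (!ReadNextLine(state))
    return NULL;

  /* One entry per column: nulls starts out all false (palloc0), while the
   * entries in values are filled in below. */
  nulls = (bool *)palloc0(tupdesc->natts * sizeof(bool));
  values = (Datum *)palloc(tupdesc->natts * sizeof(Datum));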

  values[0] = CStringGetTextDatum(state->metric);
  ParseInfluxCollect(state, tupdesc, values, nulls);

  return heap_form_tuple(tupdesc, values, nulls);
}

Creating a tuple is quite straightforward: you build a values array and a nulls array, fill them in, and then convert them to a heap tuple. The values array contains Datum objects converted from other types. PostgreSQL can then handle them correctly thanks to the tuple descriptor, which contains the actual type definition of each column in the tuple. Since a value can (usually) be null, this information needs to be kept in a separate array, which is the purpose of the nulls array. In this case, the code for filling in everything but the metric name is split out since it will be reused in the post where we insert the row into an actual table.

That’s it! Now you have a parsing function for the InfluxDB Line Protocol and you can test it to make sure that it works as expected.

mats@abzu:~/proj/pg_influx$ bear make && sudo env PATH=$PATH make install
...
mats@abzu:~/proj/pg_influx$ psql
psql (13.5)
Type "help" for help.

mats=# create extension influx;
CREATE EXTENSION

mats=# select * from parse_influx('system,host=fury uptime=607641i 1574753954000000000');
 _metric |        _time        |      _tags       |        _fields        
---------+---------------------+------------------+-----------------------
 system  | 1574753954000000000 | {"host": "fury"} | {"uptime": "607641i"}
(1 row)

As you might have noted, I just zoomed by the functions to build JSON data, so in the next post I will do a brief digression and describe how this works, but then it is time to start going over the Server Programming Interface (SPI), which is used to execute commands and interface with the actual tables stored in the database.

Mats

dbmsdrops.kindahl.net

Long time developer with a keen interest in databases, distributed systems, and programming languages. Currently working as Database Architect at Timescale.
