diff --git a/README.md b/README.md index 7b87780..634ee36 100644 --- a/README.md +++ b/README.md @@ -27,6 +27,7 @@ in JSON, YAML or one line text format and more. * [Metrics](#metrics) * [Dnstap](#dnstap) * [Kafka](#kafka) + * [PostgreSQL](#postgresql) * [More options](#more-options) * [External config file](#external-config-file) * [Verbose mode](#verbose-mode) @@ -459,6 +460,47 @@ Configuration topic: null ``` +### PostgreSQL + +This output enables sending dnstap messages to a PostgreSQL server. + +Install the extra Python library for PostgreSQL (asyncpg). + +See [output_pgsql_userfunc.py](./dnstap_receiver/outputs/output_pgsql_userfunc.py) to replace table definition and data insertion. + +```bash +pip install dnstap_receiver[pgsql] +``` + +Configuration + +```yaml + # forward to postgresql server + pgsql: + # enable or disable + enable: false + # retry interval in seconds to connect + retry: 1 + # dsn := postgres://user@host:port/database + # To explicitly write passwd in dsn is not recommended though possible. + # Instead use passfile below. + dsn: postgres://postgres@localhost:5432/postgres + # passfile := /path/to/.pgpass + # https://www.postgresql.org/docs/12/libpq-connect.html#LIBPQ-CONNECT-PASSFILE + passfile: ~/.pgpass + # min_size: minimum number of connections in the pool + min_size: 5 + # max_size: maximum number of connections in the pool + max_size: 10 + # busy_wait: wait this amount of seconds in the busy loop to write to PostgreSQL. + busy_wait: 1.0 + # timeout: wait this amount of seconds to re-create the connection pool to PostgreSQL after it failed. 
+ timeout: 60 + # filename including user defined functions + userfuncfile: null +``` + + ## More options ### External config file diff --git a/dnstap_receiver/dnstap.conf b/dnstap_receiver/dnstap.conf index d38731c..afc7960 100644 --- a/dnstap_receiver/dnstap.conf +++ b/dnstap_receiver/dnstap.conf @@ -216,3 +216,27 @@ output: "sasl.password": null # Kafka topic to forward messages to topic: null + + # forward to postgresql server + pgsql: + # enable or disable + enable: false + # retry interval in seconds to connect + retry: 1 + # dsn := postgres://user@host:port/database + # To explicitly write passwd in dsn is not recommended though possible. + # Instead use passfile below. + dsn: postgres://postgres@localhost:5432/postgres + # passfile := /path/to/.pgpass + # https://www.postgresql.org/docs/12/libpq-connect.html#LIBPQ-CONNECT-PASSFILE + passfile: ~/.pgpass + # min_size: minimum number of connections in the pool + min_size: 5 + # max_size: maximum number of connections in the pool + max_size: 10 + # busy_wait: wait this amount of seconds in the busy loop to write to PostgreSQL. + busy_wait: 1.0 + # timeout: wait this amount of seconds to re-create the connection pool to PostgreSQL after it failed. 
+ timeout: 60 + # filename including user defined functions + userfuncfile: null diff --git a/dnstap_receiver/outputs/output_pgsql.py b/dnstap_receiver/outputs/output_pgsql.py new file mode 100644 index 0000000..057e288 --- /dev/null +++ b/dnstap_receiver/outputs/output_pgsql.py @@ -0,0 +1,136 @@ +import asyncio +import logging + +from os.path import abspath,realpath,expanduser,expandvars +from importlib.util import spec_from_file_location, module_from_spec + +try: + import asyncpg + has_pgsql = True +except: + has_pgsql = False + +clogger = logging.getLogger("dnstap_receiver.console") + +from dnstap_receiver.outputs import transform + + +def checking_conf(cfg): + """validate the config""" + clogger.debug("Output handler: pgsql") + + valid_conf = True + + if not has_pgsql: + valid_conf = False + clogger.error("Output handler: pgsql: asyncpg dependency is missing") + + if cfg["dsn"] is None: + valid_conf = False + clogger.error("Output handler: no dsn provided") + + return valid_conf + +async def plaintext_pgclient(output_cfg, queue, start_shutdown): + dsn = output_cfg["dsn"] + clogger.debug("Output handler: connection to %s" % (dsn,)) + + passfile = output_cfg["passfile"] + min_size = output_cfg["min_size"] + max_size = output_cfg["max_size"] + busy_wait = float(output_cfg["busy_wait"]) + userfuncfile = output_cfg["userfuncfile"] + + # importing functions to handle PostgreSQL. + # pgsql_init shall be executed once just after connection pool + # to PostgreSQL. Ususally it should contain "CREATE TABLE IF NOT + # EXISTS..." + # pgsql_main shall be executed on receiving every DNS queries. + # Usually it should be "INSERT INTO..." + # dnstap_receiver has default functions to fall back to, or + # user can define his/her own function in the 'userfuncfile'. 
+ # For example, + # $ cp output_pgsql_userfunc.py output_pgsql_myfunc.py + # $ vi output_pgsql_myfunc.py + # and make 'userfuncfile: /path/to/output_pgsql_myfunc.py' in dnstap.conf + if userfuncfile is None: + clogger.debug(f"Output handler: pgsql: loading default userfuncfile.") + from .output_pgsql_userfunc import pgsql_init, pgsql_main + else: + try: + userfuncfile = abspath(realpath(expandvars(expanduser(userfuncfile)))) + # Should check process euid == file owner ? + + spec = spec_from_file_location('userfunc', userfuncfile) + userfunc = module_from_spec(spec) + spec.loader.exec_module(userfunc) + pgsql_init = userfunc.pgsql_init + pgsql_main = userfunc.pgsql_main + clogger.debug(f"Output handler: pgsql: loaded userfunc in {userfuncfile}.") + except: + clogger.info("Output handler: pgsql faild to load userfunc. fallback to default.") + from .output_pgsql_userfunc import pgsql_init, pgsql_main + + # create connection pool to PostgreSQL server. + async with asyncpg.create_pool(dsn=dsn, passfile=passfile, min_size=min_size, max_size=max_size, timeout=15) as pool: + clogger.debug("Output handler: pgsql connected") + + # acquire a connection and execute pgsql_init() + # such as "CREATE TABLE IF NOT EXISTS..." + async with pool.acquire() as conn: + async with conn.transaction(): + await pgsql_init(conn) + + # consume queue + while not start_shutdown.is_set(): + #clogger.debug(f'Output handler: pgsql receiving tapmsg from queue.') + # 'tapmsg = await queue.get()' will block start_shutdown_task + # to gracefully shutdown dnstap_receiver itself. + # 'queue.get_nowait()' won't block but introduces + # busy-wait-loop instead. which do yo like? + try: + tapmsg = queue.get_nowait() + except asyncio.QueueEmpty as e: + if start_shutdown.is_set(): + clogger.debug('Output handler: pgsql shutting down. 
') + break + else: + await asyncio.sleep(busy_wait) + continue + else: + clogger.debug(f'Output handler: pgsql received tapmsg: {tapmsg}.') + + # acquire a connection and send 'INSERT...' to PostgreSQL server. + async with pool.acquire() as conn: + async with conn.transaction(): + await pgsql_main(tapmsg, conn) + clogger.debug('Output handler: pgsql INSERT dispached.') + + # done continue to next item + queue.task_done() + + clogger.debug(f'Output handler: pgsql closing pool.') + + # something + if not start_shutdown.is_set(): + clogger.error("Output handler: pgclient connection lost") + +async def handle(output_cfg, queue, metrics, start_shutdown): + """pgsql reconnect""" + loop = asyncio.get_event_loop() # do we need this? + + clogger.debug("Output handler: PostgreSQL enabled") + + while not start_shutdown.is_set(): + try: + await plaintext_pgclient(output_cfg, queue, start_shutdown) + except ConnectionRefusedError: + clogger.error('Output handler: connection to pgsql server failed!') + except asyncio.TimeoutError: + clogger.error('Output handler: connection to pgsql server timed out!') + else: + clogger.error('Output handler: connection to pgsql is closed.') + + if not start_shutdown.is_set(): + clogger.debug("'Output handler: retry to connect every %ss" % output_cfg["retry"]) + await asyncio.sleep(output_cfg["retry"]) diff --git a/dnstap_receiver/outputs/output_pgsql_userfunc.py b/dnstap_receiver/outputs/output_pgsql_userfunc.py new file mode 100644 index 0000000..2d9a6a0 --- /dev/null +++ b/dnstap_receiver/outputs/output_pgsql_userfunc.py @@ -0,0 +1,52 @@ +''' +This file contains the default functions of pgsql_init and pgsql_main. +If you would like to replace them, + 1) copy this file and edit it as you like (but keep function names), + 2) then designate it in the `userfuncfile` in the configuration. 
+ (See outputs: pgsql: section in ../dnstap.conf) +''' + +import logging +clogger = logging.getLogger("dnstap_receiver.console") + +async def pgsql_init(conn): + ''' + pgsql_init is a function which is executed once just after + creation of asyncpg connection pool (nearly equals to every time + when the dnstap_receiver being started). + It is expected to do something like "CREATE TABLE IF NOT EXISTS..." here. + + `conn` is a connection to PostgreSQL server acquired from pool. + ''' + clogger.info("pgsql_init: createing table if not exists.") + return await conn.execute(""" + CREATE TABLE IF NOT EXISTS dnstap_receiver ( + message TEXT -- "AUTH_QUERY" + ,type TEXT -- "query" + ,timestamp TIMESTAMPTZ -- "1625636652.113565" + ,query_ip TEXT -- "192.0.2.100" + ,response_ip TEXT -- "203.0.113.200" + ,qname TEXT -- "www.example.com." + ,rrtype TEXT -- "A" + ,rcode TEXT -- "NOERROR" + ) + """) + +async def pgsql_main(tapmsg, conn): + ''' + pgsql_main is a function which is executed on each dnstap data delivered. + It is expected to do something like "INSERT INTO..." here. + + `conn` is a connection to PostgreSQL server acquired from pool. + `tapmsg` is a dict that contains dnstap data delivered. 
+ ''' + clogger.info("pgsql_main: inserting data.") + return await conn.execute(""" + INSERT INTO dnstap_receiver VALUES + ($1, $2, to_timestamp($3), $4, $5, $6, $7, $8) + """, + tapmsg['message'], tapmsg['type'] + ,tapmsg['timestamp'], tapmsg['query-ip'] + ,tapmsg['response-ip'], tapmsg['qname'] + ,tapmsg['rrtype'], tapmsg['rcode'] + ) diff --git a/dnstap_receiver/receiver.py b/dnstap_receiver/receiver.py index 70ae6f0..1df29f7 100644 --- a/dnstap_receiver/receiver.py +++ b/dnstap_receiver/receiver.py @@ -28,6 +28,7 @@ from dnstap_receiver.outputs import output_metrics from dnstap_receiver.outputs import output_dnstap from dnstap_receiver.outputs import output_kafka +from dnstap_receiver.outputs import output_pgsql from dnstap_receiver import api_server from dnstap_receiver import statistics @@ -169,6 +170,12 @@ def setup_outputs(cfg, stats, start_shutdown): queues_list.append(queue_kafka) loop.create_task(output_kafka.handle(conf["kafka"], queue_kafka, stats, start_shutdown)) + if conf["pgsql"]["enable"]: + if not output_pgsql.checking_conf(cfg=conf["pgsql"]): return + queue_pgsql = asyncio.Queue() + queues_list.append(queue_pgsql) + loop.create_task(output_pgsql.handle(conf["pgsql"], queue_pgsql, stats, start_shutdown)) + return queues_list def setup_inputs(cfg, queues_outputs, stats, geoip_reader, start_shutdown): diff --git a/setup.j2 b/setup.j2 index 144fafd..4163596 100644 --- a/setup.j2 +++ b/setup.j2 @@ -38,5 +38,6 @@ setuptools.setup( ], extras_require={ "kafka": ["confluent-kafka"], + "pgsql": ["asyncpg"], } ) diff --git a/tests/dnstap_pgsql.conf b/tests/dnstap_pgsql.conf new file mode 100644 index 0000000..2fe41be --- /dev/null +++ b/tests/dnstap_pgsql.conf @@ -0,0 +1,7 @@ +trace: + # enable verbose mode + verbose: true + +output: + pgsql: + enable: true diff --git a/tests/test_external_config.py b/tests/test_external_config.py index 28e0106..c4f1b45 100644 --- a/tests/test_external_config.py +++ b/tests/test_external_config.py @@ -47,4 +47,11 @@ def 
test4_output_tcp_enable(self): cmd = 'python3 -c "from dnstap_receiver.receiver import start_receiver; start_receiver()" -c ./tests/dnstap_tcp.conf' o = execute_dnstap(cmd) - self.assertRegex(o, b"Output handler: tcp") \ No newline at end of file + self.assertRegex(o, b"Output handler: tcp") + + def test5_output_pgsql_enable(self): + """Test to enable pgsql output; will take 1 minute or so.""" + cmd = 'python3 -c "from dnstap_receiver.receiver import start_receiver; start_receiver()" -c ./tests/dnstap_pgsql.conf' + o = execute_dnstap(cmd) + + self.assertRegex(o, b"Output handler: pgsql")