Merge pull request #37 from motok/pgsql
Feature Addition: PostgreSQL database as an output.
dmachard authored Jul 24, 2021
2 parents 1c505b6 + 689f8f5 commit ce145ba
Showing 8 changed files with 277 additions and 1 deletion.
42 changes: 42 additions & 0 deletions README.md
@@ -27,6 +27,7 @@ in JSON, YAML or one line text format and more.
* [Metrics](#metrics)
* [Dnstap](#dnstap)
* [Kafka](#kafka)
* [PostgreSQL](#postgresql)
* [More options](#more-options)
* [External config file](#external-config-file)
* [Verbose mode](#verbose-mode)
@@ -459,6 +460,47 @@ Configuration
topic: null
```
### PostgreSQL
This output enables sending dnstap messages to a PostgreSQL server.
See [output_pgsql_userfunc.py](./dnstap_receiver/outputs/output_pgsql_userfunc.py) to customize the table definition and the data insertion.
Install the extra Python library for PostgreSQL (asyncpg):
```bash
pip install dnstap_receiver[pgsql]
```

Configuration

```yaml
# forward to postgresql server
pgsql:
  # enable or disable
  enable: false
  # retry interval in seconds to connect
  retry: 1
  # dsn := postgres://user@host:port/database
  # Writing the password directly in the dsn is possible but not recommended;
  # use passfile below instead.
  dsn: postgres://postgres@localhost:5432/postgres
  # passfile := /path/to/.pgpass
  # https://www.postgresql.org/docs/12/libpq-connect.html#LIBPQ-CONNECT-PASSFILE
  passfile: ~/.pgpass
  # min_size: minimum number of connections in the pool
  min_size: 5
  # max_size: maximum number of connections in the pool
  max_size: 10
  # busy_wait: seconds to sleep between queue polls when there is nothing to write to PostgreSQL
  busy_wait: 1.0
  # timeout: seconds to wait before re-creating the connection pool to PostgreSQL after a failure
  timeout: 60
  # filename containing user-defined functions
  userfuncfile: null
```
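
Before enabling the output, it can be useful to confirm that the configured `dsn` and `passfile` actually reach the server. Below is a minimal, standalone connectivity check with asyncpg; it is only a sketch, and the `check` helper and the credentials shown are illustrative, not part of dnstap_receiver:

```python
import asyncio
from os.path import expanduser

import asyncpg

async def check(dsn, passfile):
    # open a single connection with the same credentials the pgsql output will use
    conn = await asyncpg.connect(dsn=dsn, passfile=expanduser(passfile))
    try:
        # trivial round-trip to prove the server answers
        print(await conn.fetchval("SELECT version()"))
    finally:
        await conn.close()

asyncio.run(check("postgres://postgres@localhost:5432/postgres", "~/.pgpass"))
```

If this prints the server version string, the same `dsn`/`passfile` pair should work for the output.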
## More options
### External config file
24 changes: 24 additions & 0 deletions dnstap_receiver/dnstap.conf
@@ -216,3 +216,27 @@ output:
"sasl.password": null
# Kafka topic to forward messages to
topic: null

  # forward to postgresql server
  pgsql:
    # enable or disable
    enable: false
    # retry interval in seconds to connect
    retry: 1
    # dsn := postgres://user@host:port/database
    # Writing the password directly in the dsn is possible but not recommended;
    # use passfile below instead.
    dsn: postgres://postgres@localhost:5432/postgres
    # passfile := /path/to/.pgpass
    # https://www.postgresql.org/docs/12/libpq-connect.html#LIBPQ-CONNECT-PASSFILE
    passfile: ~/.pgpass
    # min_size: minimum number of connections in the pool
    min_size: 5
    # max_size: maximum number of connections in the pool
    max_size: 10
    # busy_wait: seconds to sleep between queue polls when there is nothing to write to PostgreSQL
    busy_wait: 1.0
    # timeout: seconds to wait before re-creating the connection pool to PostgreSQL after a failure
    timeout: 60
    # filename containing user-defined functions
    userfuncfile: null
136 changes: 136 additions & 0 deletions dnstap_receiver/outputs/output_pgsql.py
@@ -0,0 +1,136 @@
import asyncio
import logging

from os.path import abspath,realpath,expanduser,expandvars
from importlib.util import spec_from_file_location, module_from_spec

try:
    import asyncpg
    has_pgsql = True
except ImportError:
    has_pgsql = False

clogger = logging.getLogger("dnstap_receiver.console")

from dnstap_receiver.outputs import transform


def checking_conf(cfg):
"""validate the config"""
clogger.debug("Output handler: pgsql")

valid_conf = True

if not has_pgsql:
valid_conf = False
clogger.error("Output handler: pgsql: asyncpg dependency is missing")

if cfg["dsn"] is None:
valid_conf = False
clogger.error("Output handler: no dsn provided")

return valid_conf

async def plaintext_pgclient(output_cfg, queue, start_shutdown):
    dsn = output_cfg["dsn"]
    clogger.debug("Output handler: connection to %s" % (dsn,))

    passfile = output_cfg["passfile"]
    min_size = output_cfg["min_size"]
    max_size = output_cfg["max_size"]
    busy_wait = float(output_cfg["busy_wait"])
    userfuncfile = output_cfg["userfuncfile"]

    # importing functions to handle PostgreSQL.
    # pgsql_init is executed once, just after the connection pool
    # to PostgreSQL is created. Usually it contains "CREATE TABLE IF NOT
    # EXISTS ..."
    # pgsql_main is executed for every DNS query received.
    # Usually it is an "INSERT INTO ..."
    # dnstap_receiver has default functions to fall back to, or the
    # user can define their own functions in the 'userfuncfile'.
    # For example,
    #   $ cp output_pgsql_userfunc.py output_pgsql_myfunc.py
    #   $ vi output_pgsql_myfunc.py
    # and set 'userfuncfile: /path/to/output_pgsql_myfunc.py' in dnstap.conf
    if userfuncfile is None:
        clogger.debug("Output handler: pgsql: loading default userfuncfile.")
        from .output_pgsql_userfunc import pgsql_init, pgsql_main
    else:
        try:
            userfuncfile = abspath(realpath(expandvars(expanduser(userfuncfile))))
            # Should check process euid == file owner ?

            spec = spec_from_file_location('userfunc', userfuncfile)
            userfunc = module_from_spec(spec)
            spec.loader.exec_module(userfunc)
            pgsql_init = userfunc.pgsql_init
            pgsql_main = userfunc.pgsql_main
            clogger.debug(f"Output handler: pgsql: loaded userfunc in {userfuncfile}.")
        except Exception:
            clogger.info("Output handler: pgsql failed to load userfunc, falling back to default.")
            from .output_pgsql_userfunc import pgsql_init, pgsql_main

    # create connection pool to PostgreSQL server.
    async with asyncpg.create_pool(dsn=dsn, passfile=passfile, min_size=min_size, max_size=max_size, timeout=15) as pool:
        clogger.debug("Output handler: pgsql connected")

        # acquire a connection and execute pgsql_init()
        # such as "CREATE TABLE IF NOT EXISTS..."
        async with pool.acquire() as conn:
            async with conn.transaction():
                await pgsql_init(conn)

        # consume queue
        while not start_shutdown.is_set():
            #clogger.debug('Output handler: pgsql receiving tapmsg from queue.')
            # 'tapmsg = await queue.get()' would block start_shutdown_task
            # from gracefully shutting down dnstap_receiver itself.
            # 'queue.get_nowait()' does not block, but introduces a
            # busy-wait loop instead.
            try:
                tapmsg = queue.get_nowait()
            except asyncio.QueueEmpty:
                if start_shutdown.is_set():
                    clogger.debug('Output handler: pgsql shutting down.')
                    break
                else:
                    await asyncio.sleep(busy_wait)
                    continue
            else:
                clogger.debug(f'Output handler: pgsql received tapmsg: {tapmsg}.')

                # acquire a connection and send 'INSERT...' to PostgreSQL server.
                async with pool.acquire() as conn:
                    async with conn.transaction():
                        await pgsql_main(tapmsg, conn)
                        clogger.debug('Output handler: pgsql INSERT dispatched.')

                # done, continue to next item
                queue.task_done()

    clogger.debug('Output handler: pgsql closing pool.')

    # leaving the loop without a shutdown request means the connection was lost
    if not start_shutdown.is_set():
        clogger.error("Output handler: pgclient connection lost")

async def handle(output_cfg, queue, metrics, start_shutdown):
"""pgsql reconnect"""
loop = asyncio.get_event_loop() # do we need this?

clogger.debug("Output handler: PostgreSQL enabled")

while not start_shutdown.is_set():
try:
await plaintext_pgclient(output_cfg, queue, start_shutdown)
except ConnectionRefusedError:
clogger.error('Output handler: connection to pgsql server failed!')
except asyncio.TimeoutError:
clogger.error('Output handler: connection to pgsql server timed out!')
else:
clogger.error('Output handler: connection to pgsql is closed.')

if not start_shutdown.is_set():
clogger.debug("'Output handler: retry to connect every %ss" % output_cfg["retry"])
await asyncio.sleep(output_cfg["retry"])
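
The `userfuncfile` option relies on loading an arbitrary Python file by path with `importlib`, the same `spec_from_file_location` / `module_from_spec` sequence used in `plaintext_pgclient()` above. Below is a self-contained sketch of just that loading mechanism; the throwaway module written to a temporary file is purely illustrative:

```python
import tempfile
from importlib.util import spec_from_file_location, module_from_spec

# write a stand-in for a user's userfuncfile to disk
with tempfile.NamedTemporaryFile("w", suffix=".py", delete=False) as f:
    f.write(
        "async def pgsql_init(conn):\n"
        "    pass\n"
        "async def pgsql_main(tapmsg, conn):\n"
        "    pass\n"
    )
    path = f.name

# load the module by path, as output_pgsql.py does for userfuncfile
spec = spec_from_file_location("userfunc", path)
userfunc = module_from_spec(spec)
spec.loader.exec_module(userfunc)

print(callable(userfunc.pgsql_init), callable(userfunc.pgsql_main))  # True True
```

If loading fails for any reason, the output falls back to the bundled default functions, as the try/except block in `plaintext_pgclient()` shows.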
52 changes: 52 additions & 0 deletions dnstap_receiver/outputs/output_pgsql_userfunc.py
@@ -0,0 +1,52 @@
'''
This file contains the default pgsql_init and pgsql_main functions.
If you want to replace them,
1) copy this file and edit it as you like (but keep the function names),
2) then point the `userfuncfile` option in the configuration at it.
(See the output: pgsql: section in ../dnstap.conf)
'''

import logging
clogger = logging.getLogger("dnstap_receiver.console")

async def pgsql_init(conn):
    '''
    pgsql_init is executed once, just after the asyncpg connection pool
    is created (roughly, every time dnstap_receiver is started).
    It is expected to do something like "CREATE TABLE IF NOT EXISTS ..." here.
    `conn` is a connection to the PostgreSQL server acquired from the pool.
    '''
    clogger.info("pgsql_init: creating table if not exists.")
    return await conn.execute("""
        CREATE TABLE IF NOT EXISTS dnstap_receiver (
            message TEXT          -- "AUTH_QUERY"
            ,type TEXT            -- "query"
            ,timestamp TIMESTAMPTZ -- "1625636652.113565"
            ,query_ip TEXT        -- "192.0.2.100"
            ,response_ip TEXT     -- "203.0.113.200"
            ,qname TEXT           -- "www.example.com."
            ,rrtype TEXT          -- "A"
            ,rcode TEXT           -- "NOERROR"
        )
    """)

async def pgsql_main(tapmsg, conn):
    '''
    pgsql_main is executed for each dnstap message delivered.
    It is expected to do something like "INSERT INTO ..." here.
    `conn` is a connection to the PostgreSQL server acquired from the pool.
    `tapmsg` is a dict that contains the delivered dnstap data.
    '''
    clogger.info("pgsql_main: inserting data.")
    return await conn.execute("""
        INSERT INTO dnstap_receiver VALUES
        ($1, $2, to_timestamp($3), $4, $5, $6, $7, $8)
        """,
        tapmsg['message'], tapmsg['type']
        ,tapmsg['timestamp'], tapmsg['query-ip']
        ,tapmsg['response-ip'], tapmsg['qname']
        ,tapmsg['rrtype'], tapmsg['rcode']
    )
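
A custom `userfuncfile` keeps the two function names and signatures but is free to change the schema or filter what gets stored. The following is a hedged sketch; the extra index and the query-only filter are illustrative choices, not defaults shipped with dnstap_receiver:

```python
# save as e.g. output_pgsql_myfunc.py and set 'userfuncfile: /path/to/output_pgsql_myfunc.py'
import logging
clogger = logging.getLogger("dnstap_receiver.console")

async def pgsql_init(conn):
    # same table as the default, plus an index on qname (illustrative)
    await conn.execute("""
        CREATE TABLE IF NOT EXISTS dnstap_receiver (
            message TEXT
            ,type TEXT
            ,timestamp TIMESTAMPTZ
            ,query_ip TEXT
            ,response_ip TEXT
            ,qname TEXT
            ,rrtype TEXT
            ,rcode TEXT
        )
    """)
    await conn.execute(
        "CREATE INDEX IF NOT EXISTS dnstap_receiver_qname_idx ON dnstap_receiver (qname)")

async def pgsql_main(tapmsg, conn):
    # illustrative filter: only store messages tagged as queries
    if tapmsg.get('type') != 'query':
        return
    await conn.execute("""
        INSERT INTO dnstap_receiver VALUES
        ($1, $2, to_timestamp($3), $4, $5, $6, $7, $8)
        """,
        tapmsg['message'], tapmsg['type'], tapmsg['timestamp'],
        tapmsg['query-ip'], tapmsg['response-ip'],
        tapmsg['qname'], tapmsg['rrtype'], tapmsg['rcode'])
```

The names `pgsql_init` and `pgsql_main` must be kept, since output_pgsql.py looks them up by those names after loading the module.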
7 changes: 7 additions & 0 deletions dnstap_receiver/receiver.py
@@ -28,6 +28,7 @@
from dnstap_receiver.outputs import output_metrics
from dnstap_receiver.outputs import output_dnstap
from dnstap_receiver.outputs import output_kafka
from dnstap_receiver.outputs import output_pgsql

from dnstap_receiver import api_server
from dnstap_receiver import statistics
@@ -169,6 +170,12 @@ def setup_outputs(cfg, stats, start_shutdown):
        queues_list.append(queue_kafka)
        loop.create_task(output_kafka.handle(conf["kafka"], queue_kafka, stats, start_shutdown))

    if conf["pgsql"]["enable"]:
        if not output_pgsql.checking_conf(cfg=conf["pgsql"]): return
        queue_pgsql = asyncio.Queue()
        queues_list.append(queue_pgsql)
        loop.create_task(output_pgsql.handle(conf["pgsql"], queue_pgsql, stats, start_shutdown))

    return queues_list

def setup_inputs(cfg, queues_outputs, stats, geoip_reader, start_shutdown):
1 change: 1 addition & 0 deletions setup.j2
@@ -38,5 +38,6 @@ setuptools.setup(
    ],
    extras_require={
        "kafka": ["confluent-kafka"],
        "pgsql": ["asyncpg"],
    }
)
7 changes: 7 additions & 0 deletions tests/dnstap_pgsql.conf
@@ -0,0 +1,7 @@
trace:
  # enable verbose mode
  verbose: true

output:
  pgsql:
    enable: true
9 changes: 8 additions & 1 deletion tests/test_external_config.py
@@ -47,4 +47,11 @@ def test4_output_tcp_enable(self):
        cmd = 'python3 -c "from dnstap_receiver.receiver import start_receiver; start_receiver()" -c ./tests/dnstap_tcp.conf'
        o = execute_dnstap(cmd)

        self.assertRegex(o, b"Output handler: tcp")

    def test5_output_pgsql_enable(self):
        """test to enable pgsql output\nwill take about a minute."""
        cmd = 'python3 -c "from dnstap_receiver.receiver import start_receiver; start_receiver()" -c ./tests/dnstap_pgsql.conf'
        o = execute_dnstap(cmd)

        self.assertRegex(o, b"Output handler: pgsql")
