Skip to content

Commit

Permalink
[feature] Added support for InfluxDB UDP mode #458
Browse files Browse the repository at this point in the history
- Additional options can be specified to the TIMESERIES_DATABASE setting
  which are passed to the underlying backend library. This allows using
  UDP for writing to InfluxDB.
- "openwisp_monitoring.db.backends.influxdb.client.DatabaseClient.write"
  will use "InfluxDBClient.write_points" instead of "InfluxDBClient.write"
  because the former allows writing data using the UDP protocol.
- If device data is too big to be written with UDP, HTTP is used instead.

Closes #458
  • Loading branch information
pandafy authored Feb 9, 2023
1 parent 7f92b01 commit 476e7a3
Show file tree
Hide file tree
Showing 18 changed files with 325 additions and 83 deletions.
10 changes: 4 additions & 6 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,6 @@ jobs:
image: redis
ports:
- 6379:6379
influxdb:
image: influxdb:1.8.4-alpine
options: >-
--name "influxdb"
ports:
- 8086:8086

strategy:
fail-fast: false
Expand Down Expand Up @@ -74,6 +68,9 @@ jobs:
- name: Install npm dependencies
run: sudo npm install -g install jshint stylelint

- name: Start InfluxDB container
run: docker-compose up -d influxdb

- name: Install test dependencies
run: |
pip install -r requirements-test.txt
Expand All @@ -91,6 +88,7 @@ jobs:
run: |
SAMPLE_APP=1 coverage run --source=openwisp_monitoring runtests.py
coverage run -a --source=openwisp_monitoring runtests.py
TIMESERIES_UDP=1 coverage run -a --source=openwisp_monitoring runtests.py
- name: Upload Coverage
run: coveralls --service=github
Expand Down
92 changes: 92 additions & 0 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,13 @@ Follow the setup instructions of `openwisp-controller
'NAME': 'openwisp2',
'HOST': 'localhost',
'PORT': '8086',
'OPTIONS': {
# Specify additional options to be used while initializing
# database connection.
# Note: These options may differ based on the backend used.
'udp_writes': True,
'udp_port': 8089,
}
}
``urls.py``:
Expand Down Expand Up @@ -1397,6 +1404,87 @@ these permissions are included by default in the "Administrator" and "Operator"
Settings
--------

``TIMESERIES_DATABASE``
~~~~~~~~~~~~~~~~~~~~~~~

+--------------+-----------+
| **type**: | ``str`` |
+--------------+-----------+
| **default**: | see below |
+--------------+-----------+

.. code-block:: python
TIMESERIES_DATABASE = {
'BACKEND': 'openwisp_monitoring.db.backends.influxdb',
'USER': 'openwisp',
'PASSWORD': 'openwisp',
'NAME': 'openwisp2',
'HOST': 'localhost',
'PORT': '8086',
'OPTIONS': {
'udp_writes': False,
'udp_port': 8089,
}
}
The following table describes all keys available in ``TIMESERIES_DATABASE``
setting:

+---------------+--------------------------------------------------------------------------------------+
| **Key** | ``Description`` |
+---------------+--------------------------------------------------------------------------------------+
| ``BACKEND`` | The timeseries database backend to use. You can select one of the backends |
| | located in ``openwisp_monitoring.db.backends`` |
+---------------+--------------------------------------------------------------------------------------+
| ``USER`` | User for logging into the timeseries database |
+---------------+--------------------------------------------------------------------------------------+
| ``PASSWORD`` | Password of the timeseries database user |
+---------------+--------------------------------------------------------------------------------------+
| ``NAME`` | Name of the timeseries database |
+---------------+--------------------------------------------------------------------------------------+
| ``HOST`` | IP address/hostname of machine where the timeseries database is running |
+---------------+--------------------------------------------------------------------------------------+
| ``PORT`` | Port for connecting to the timeseries database |
+---------------+--------------------------------------------------------------------------------------+
| ``OPTIONS`` | These settings depends on the timeseries backend: |
| | |
| | +-----------------+----------------------------------------------------------------+ |
| | | ``udp_writes`` | Whether to use UDP for writing data to the timeseries database | |
| | +-----------------+----------------------------------------------------------------+ |
| | | ``udp_port`` | Timeseries database port for writing data using UDP | |
| | +-----------------+----------------------------------------------------------------+ |
+---------------+--------------------------------------------------------------------------------------+

**Note:** UDP packets can have a maximum size of 64KB. When using UDP for writing timeseries
data, if the size of the data exceeds 64KB, TCP mode will be used instead.

**Note:** If you want to use the ``openwisp_monitoring.db.backends.influxdb`` backend
with UDP writes enabled, then you need to enable two different ports for UDP
(each for different retention policy) in your InfluxDB configuration. The UDP configuration
section of your InfluxDB should look similar to the following:

.. code-block:: text
# For writing data with the "default" retention policy
[[udp]]
enabled = true
bind-address = "127.0.0.1:8089"
database = "openwisp2"
# For writing data with the "short" retention policy
[[udp]]
enabled = true
bind-address = "127.0.0.1:8090"
database = "openwisp2"
retention-policy = 'short'
If you are using `ansible-openwisp2 <https://github.com/openwisp/ansible-openwisp2>`_
for deploying OpenWISP, you can set the ``influxdb_udp_mode`` ansible variable to ``true``
in your playbook, this will make the ansible role automatically configure the InfluxDB UDP listeners.
You can refer to the `ansible-ow-influxdb's <https://github.com/openwisp/ansible-ow-influxdb#role-variables>`_
(a dependency of ansible-openwisp2) documentation to learn more.

``OPENWISP_MONITORING_DEFAULT_RETENTION_POLICY``
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Expand Down Expand Up @@ -1736,6 +1824,10 @@ For more information regarding these settings, consult the `celery documentation
regarding automatic retries for known errors
<https://docs.celeryproject.org/en/stable/userguide/tasks.html#automatic-retry-for-known-exceptions>`_.

**Note:** The retry mechanism does not work when using ``UDP`` for writing
data to the timeseries database. It is due to the nature of ``UDP`` protocol
which does not acknowledge receipt of data packets.

``OPENWISP_MONITORING_TIMESERIES_RETRY_OPTIONS``
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Expand Down
5 changes: 5 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,13 @@ services:
image: influxdb:1.8-alpine
volumes:
- influxdb-data:/var/lib/influxdb
- ./tests/influxdb.conf:/etc/influxdb/influxdb.conf
ports:
- "8086:8086"
- "8089:8089/udp"
- "8090:8090/udp"
- "8091:8091/udp"
- "8092:8092/udp"
environment:
INFLUXDB_DB: openwisp2
INFLUXDB_USER: openwisp
Expand Down
4 changes: 3 additions & 1 deletion openwisp_monitoring/check/tests/test_iperf3.py
Original file line number Diff line number Diff line change
Expand Up @@ -491,7 +491,9 @@ def test_iperf3_check(self, mock_warn, mock_exec_command):
iperf3_metric = Metric.objects.get(key='iperf3')
self.assertEqual(Metric.objects.count(), 3)
self.assertEqual(iperf3_metric.content_object, self.device)
points = iperf3_metric.read(limit=None, extra_fields=list(result.keys()))
points = self._read_metric(
iperf3_metric, limit=None, extra_fields=list(result.keys())
)
self.assertEqual(len(points), 1)
self.assertEqual(points[0]['iperf3_result'], result['iperf3_result'])
self.assertEqual(points[0]['sent_bps_tcp'], result['sent_bps_tcp'])
Expand Down
11 changes: 5 additions & 6 deletions openwisp_monitoring/check/tests/test_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,12 +155,12 @@ def test_config_modified_device_problem(self):
d = Device.objects.first()
d.monitoring.update_status('ok')
self.assertEqual(Check.objects.count(), 3)
self.assertEqual(Metric.objects.count(), 0)
self.assertEqual(Metric.objects.filter(object_id=d.id).count(), 0)
self.assertEqual(AlertSettings.objects.count(), 0)
check = Check.objects.filter(check_type=self._CONFIG_APPLIED).first()
with freeze_time(now() - timedelta(minutes=10)):
check.perform_check()
self.assertEqual(Metric.objects.count(), 1)
self.assertEqual(Metric.objects.filter(object_id=d.id).count(), 1)
self.assertEqual(AlertSettings.objects.count(), 1)
# Check needs to be run again without mocking time for threshold crossed
check.perform_check()
Expand All @@ -170,7 +170,6 @@ def test_config_modified_device_problem(self):
dm = d.monitoring
dm.refresh_from_db()
self.assertEqual(m.is_healthy, False)
self.assertEqual(m.is_healthy_tolerant, False)
self.assertEqual(dm.status, 'problem')
self.assertEqual(Notification.objects.count(), 1)

Expand All @@ -187,15 +186,15 @@ def test_config_error(self):
dm = Device.objects.first().monitoring
dm.update_status('ok')
self.assertEqual(Check.objects.count(), 3)
self.assertEqual(Metric.objects.count(), 0)
self.assertEqual(Metric.objects.filter(object_id=dm.id).count(), 0)
self.assertEqual(AlertSettings.objects.count(), 0)
check = Check.objects.filter(check_type=self._CONFIG_APPLIED).first()
with freeze_time(now() - timedelta(minutes=10)):
check.perform_check()
# Check needs to be run again without mocking time for threshold crossed
self.assertEqual(check.perform_check(), 0)
self.assertEqual(Metric.objects.count(), 1)
m = Metric.objects.first()
self.assertEqual(Metric.objects.filter(object_id=dm.device_id).count(), 1)
m = Metric.objects.filter(object_id=dm.device_id).first()
self.assertEqual(AlertSettings.objects.count(), 1)
dm.refresh_from_db()
self.assertEqual(dm.status, 'problem')
Expand Down
2 changes: 1 addition & 1 deletion openwisp_monitoring/check/tests/test_ping.py
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ def test_store_result(self, mocked_method):
m = Metric.objects.first()
self.assertEqual(m.content_object, device)
self.assertEqual(m.key, 'ping')
points = m.read(limit=None, extra_fields=list(result.keys()))
points = self._read_metric(m, limit=None, extra_fields=list(result.keys()))
self.assertEqual(len(points), 1)
self.assertEqual(points[0]['reachable'], result['reachable'])
self.assertEqual(points[0]['loss'], result['loss'])
Expand Down
80 changes: 59 additions & 21 deletions openwisp_monitoring/db/backends/influxdb/client.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import logging
import operator
import re
import sys
from collections import OrderedDict
from datetime import datetime

Expand All @@ -11,6 +12,7 @@
from django.utils.translation import gettext_lazy as _
from influxdb import InfluxDBClient
from influxdb.exceptions import InfluxDBClientError
from influxdb.line_protocol import make_lines

from openwisp_monitoring.utils import retry

Expand Down Expand Up @@ -75,13 +77,45 @@ def drop_database(self):
@cached_property
def db(self):
"""Returns an ``InfluxDBClient`` instance"""
return InfluxDBClient(
TIMESERIES_DB['HOST'],
TIMESERIES_DB['PORT'],
TIMESERIES_DB['USER'],
TIMESERIES_DB['PASSWORD'],
self.db_name,
)
return self.dbs['default']

@cached_property
def dbs(self):
dbs = {
'default': InfluxDBClient(
TIMESERIES_DB['HOST'],
TIMESERIES_DB['PORT'],
TIMESERIES_DB['USER'],
TIMESERIES_DB['PASSWORD'],
self.db_name,
use_udp=TIMESERIES_DB.get('OPTIONS', {}).get('udp_writes', False),
udp_port=TIMESERIES_DB.get('OPTIONS', {}).get('udp_port', 8089),
),
}
if TIMESERIES_DB.get('OPTIONS', {}).get('udp_writes', False):
# When using UDP, InfluxDB allows only using one retention policy
# per port. Therefore, we need to have different instances of
# InfluxDBClient.
dbs['short'] = InfluxDBClient(
TIMESERIES_DB['HOST'],
TIMESERIES_DB['PORT'],
TIMESERIES_DB['USER'],
TIMESERIES_DB['PASSWORD'],
self.db_name,
use_udp=TIMESERIES_DB.get('OPTIONS', {}).get('udp_writes', False),
udp_port=TIMESERIES_DB.get('OPTIONS', {}).get('udp_port', 8089) + 1,
)
dbs['__all__'] = InfluxDBClient(
TIMESERIES_DB['HOST'],
TIMESERIES_DB['PORT'],
TIMESERIES_DB['USER'],
TIMESERIES_DB['PASSWORD'],
self.db_name,
)
else:
dbs['short'] = dbs['default']
dbs['__all__'] = dbs['default']
return dbs

@retry
def create_or_alter_retention_policy(self, name, duration):
Expand Down Expand Up @@ -110,11 +144,19 @@ def query(self, query, precision=None, **kwargs):
database=database,
)

def _write(self, data, params):
def _write(self, points, database, retention_policy):
db = self.dbs['short'] if retention_policy else self.dbs['default']
# If the size of data exceeds the limit of the UDP packet, then
# fallback to use TCP connection for writing data.
lines = make_lines({'points': points})
if sys.getsizeof(lines) > 65000:
db = self.dbs['__all__']
try:
self.db.write(
data=data,
params=params,
db.write_points(
points=lines.split('\n')[:-1],
database=database,
retention_policy=retention_policy,
protocol='line',
)
except Exception as exception:
logger.warning(f'got exception while writing to tsdb: {exception}')
Expand Down Expand Up @@ -143,11 +185,9 @@ def write(self, name, values, **kwargs):
'time': timestamp,
}
self._write(
data={'points': [point]},
params={
'db': kwargs.get('database') or self.db_name,
'rp': kwargs.get('retention_policy'),
},
points=[point],
database=kwargs.get('database') or self.db_name,
retention_policy=kwargs.get('retention_policy'),
)

def batch_write(self, metric_data):
Expand All @@ -171,11 +211,9 @@ def batch_write(self, metric_data):
for database in data_points.keys():
for rp in data_points[database].keys():
self._write(
data={'points': data_points[database][rp]},
params={
'db': database,
'rp': rp,
},
points=data_points[database][rp],
database=database,
retention_policy=rp,
)

def read(self, key, fields, tags, **kwargs):
Expand Down
20 changes: 19 additions & 1 deletion openwisp_monitoring/db/backends/influxdb/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

from celery.exceptions import Retry
from django.core.exceptions import ValidationError
from django.test import TestCase
from django.test import TestCase, tag
from django.utils.timezone import now
from freezegun import freeze_time
from influxdb import InfluxDBClient
Expand Down Expand Up @@ -32,6 +32,7 @@
Notification = load_model('openwisp_notifications', 'Notification')


@tag('timeseries_client')
class TestDatabaseClient(TestMonitoringMixin, TestCase):
def test_forbidden_queries(self):
queries = [
Expand Down Expand Up @@ -353,3 +354,20 @@ def test_retry_mechanism(self, mock_query):
'Error while executing method "query":\nServer error\nAttempt '
f'{max_retries} out of {max_retries}.\n'
)


class TestDatabaseClientUdp(TestMonitoringMixin, TestCase):
def test_exceed_udp_packet_limit(self):
# When using UDP to write data to InfluxDB, writing
# huge data that exceeds UDP packet limit should not raise
# an error. Instead, the client should fallback to the
# TCP connection.
timeseries_db.write(
'test_udp_write', dict(value='O' * 66000), database=self.TEST_DB
)
measurement = list(
timeseries_db.query(
'select * from test_udp_write', database=self.TEST_DB
).get_points()
)
self.assertEqual(len(measurement), 1)
Loading

0 comments on commit 476e7a3

Please sign in to comment.