diff --git a/.zuul.yaml b/.zuul.yaml index 898f5470..39e0e871 100644 --- a/.zuul.yaml +++ b/.zuul.yaml @@ -94,6 +94,17 @@ CLOUDKITTY_STORAGE_BACKEND: elasticsearch CLOUDKITTY_STORAGE_VERSION: 2 +- job: + name: cloudkitty-tempest-full-v2-storage-opensearch + parent: base-cloudkitty-v2-api-tempest-job + description: | + Job testing cloudkitty installation on devstack with python 3 and the + OpenSearch v2 storage driver and running tempest tests + vars: + devstack_localrc: + CLOUDKITTY_STORAGE_BACKEND: opensearch + CLOUDKITTY_STORAGE_VERSION: 2 + - job: name: cloudkitty-tox-bandit parent: openstack-tox @@ -130,6 +141,8 @@ - cloudkitty-tempest-full-v2-storage-influxdb - cloudkitty-tempest-full-v2-storage-elasticsearch: voting: false + - cloudkitty-tempest-full-v2-storage-opensearch: + voting: false - cloudkitty-tempest-full-v1-storage-sqlalchemy - cloudkitty-tempest-full-ipv6-only - cloudkitty-tox-bandit: diff --git a/cloudkitty/common/config.py b/cloudkitty/common/config.py index e3fdf4d2..35501142 100644 --- a/cloudkitty/common/config.py +++ b/cloudkitty/common/config.py @@ -32,6 +32,7 @@ import cloudkitty.storage.v1.hybrid.backends.gnocchi import cloudkitty.storage.v2.elasticsearch import cloudkitty.storage.v2.influx +import cloudkitty.storage.v2.opensearch import cloudkitty.utils __all__ = ['list_opts'] @@ -70,6 +71,8 @@ cloudkitty.storage.v2.influx.influx_storage_opts))), ('storage_elasticsearch', list(itertools.chain( cloudkitty.storage.v2.elasticsearch.elasticsearch_storage_opts))), + ('storage_opensearch', list(itertools.chain( + cloudkitty.storage.v2.opensearch.opensearch_storage_opts))), ('storage_gnocchi', list(itertools.chain( cloudkitty.storage.v1.hybrid.backends.gnocchi.gnocchi_storage_opts))), (None, list(itertools.chain( diff --git a/cloudkitty/storage/v2/opensearch/__init__.py b/cloudkitty/storage/v2/opensearch/__init__.py new file mode 100644 index 00000000..5fb89087 --- /dev/null +++ b/cloudkitty/storage/v2/opensearch/__init__.py @@ -0,0 +1,205 @@ +# Copyright 2019 Objectif Libre +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# +import datetime + +from oslo_config import cfg +from oslo_log import log + +from cloudkitty import dataframe +from cloudkitty.storage import v2 as v2_storage +from cloudkitty.storage.v2.opensearch import client as os_client +from cloudkitty.storage.v2.opensearch import exceptions +from cloudkitty.utils import tz as tzutils + +LOG = log.getLogger(__name__) + +CONF = cfg.CONF + +OPENSEARCH_STORAGE_GROUP = 'storage_opensearch' + +opensearch_storage_opts = [ + cfg.StrOpt( + 'host', + help='OpenSearch host, along with port and protocol. ' + 'Defaults to http://localhost:9200', + default='http://localhost:9200'), + cfg.StrOpt( + 'index_name', + help='OpenSearch index to use. Defaults to "cloudkitty".', + default='cloudkitty'), + cfg.BoolOpt('insecure', + help='Set to true to allow insecure HTTPS ' + 'connections to OpenSearch', + default=False), + cfg.StrOpt('cafile', + help='Path of the CA certificate to trust for ' + 'HTTPS connections.', + default=None), + cfg.IntOpt('scroll_duration', + help="Duration (in seconds) for which the OpenSearch scroll " + "contexts should be kept alive.", + advanced=True, + default=30, min=0, max=300), +] + +CONF.register_opts(opensearch_storage_opts, OPENSEARCH_STORAGE_GROUP) + +CLOUDKITTY_INDEX_MAPPING = { + "dynamic_templates": [ + { + "strings_as_keywords": { + "match_mapping_type": "string", + "mapping": { + "type": "keyword" + } + } + } + ], + "dynamic": False, + "properties": { + "start": {"type": "date"}, + "end": {"type": "date"}, + "type": {"type": "keyword"}, + "unit": {"type": "keyword"}, + "qty": {"type": "double"}, + "price": {"type": "double"}, + "groupby": {"dynamic": True, "type": "object"}, + "metadata": {"dynamic": True, "type": "object"} + }, +} + + +class OpenSearchStorage(v2_storage.BaseStorage): + + def __init__(self, *args, **kwargs): + super(OpenSearchStorage, self).__init__(*args, **kwargs) + + LOG.warning('The OpenSearch storage driver is experimental. ' + 'DO NOT USE IT IN PRODUCTION.') + + verify = not CONF.storage_opensearch.insecure + if verify and CONF.storage_opensearch.cafile: + verify = CONF.storage_opensearch.cafile + + self._conn = os_client.OpenSearchClient( + CONF.storage_opensearch.host, + CONF.storage_opensearch.index_name, + "_doc", + verify=verify) + + def init(self): + r = self._conn.get_index() + if r.status_code != 200: + raise exceptions.IndexDoesNotExist( + CONF.storage_opensearch.index_name) + LOG.info('Creating mapping "_doc" on index {}...'.format( + CONF.storage_opensearch.index_name)) + self._conn.post_mapping(CLOUDKITTY_INDEX_MAPPING) + LOG.info('Mapping created.') + + def push(self, dataframes, scope_id=None): + for frame in dataframes: + for type_, point in frame.iterpoints(): + start, end = self._local_to_utc(frame.start, frame.end) + self._conn.add_point(point, type_, start, end) + + self._conn.commit() + + @staticmethod + def _local_to_utc(*args): + return [tzutils.local_to_utc(arg) for arg in args] + + @staticmethod + def _doc_to_datapoint(doc): + return dataframe.DataPoint( + doc['unit'], + doc['qty'], + doc['price'], + doc['groupby'], + doc['metadata'], + ) + + def _build_dataframes(self, docs): + dataframes = {} + nb_points = 0 + for doc in docs: + source = doc['_source'] + start = tzutils.dt_from_iso(source['start']) + end = tzutils.dt_from_iso(source['end']) + key = (start, end) + if key not in dataframes.keys(): + dataframes[key] = dataframe.DataFrame(start=start, end=end) + dataframes[key].add_point( + self._doc_to_datapoint(source), source['type']) + nb_points += 1 + + output = list(dataframes.values()) + output.sort(key=lambda frame: (frame.start, frame.end)) + return output + + def retrieve(self, begin=None, end=None, + filters=None, + metric_types=None, + offset=0, limit=1000, paginate=True): + begin, end = self._local_to_utc(begin or tzutils.get_month_start(), + end or tzutils.get_next_month()) + total, docs = self._conn.retrieve( + begin, end, filters, metric_types, + offset=offset, limit=limit, paginate=paginate) + return { + 'total': total, + 'dataframes': self._build_dataframes(docs), + } + + def delete(self, begin=None, end=None, filters=None): + self._conn.delete_by_query(begin, end, filters) + + @staticmethod + def _normalize_time(t): + if isinstance(t, datetime.datetime): + return tzutils.utc_to_local(t) + return tzutils.dt_from_iso(t) + + def _doc_to_total_result(self, doc, start, end): + output = { + 'begin': self._normalize_time(doc.get('start', start)), + 'end': self._normalize_time(doc.get('end', end)), + 'qty': doc['sum_qty']['value'], + 'rate': doc['sum_price']['value'], + } + # Means we had a composite aggregation + if 'key' in doc.keys(): + for key, value in doc['key'].items(): + if key == 'begin' or key == 'end': + # OpenSearch returns ts in milliseconds + value = tzutils.dt_from_ts(value // 1000) + output[key] = value + return output + + def total(self, groupby=None, begin=None, end=None, metric_types=None, + filters=None, custom_fields=None, offset=0, limit=1000, + paginate=True): + begin, end = self._local_to_utc(begin or tzutils.get_month_start(), + end or tzutils.get_next_month()) + + total, docs = self._conn.total(begin, end, metric_types, filters, + groupby, custom_fields=custom_fields, + offset=offset, limit=limit, + paginate=paginate) + return { + 'total': total, + 'results': [self._doc_to_total_result(doc, begin, end) + for doc in docs], + } diff --git a/cloudkitty/storage/v2/opensearch/client.py b/cloudkitty/storage/v2/opensearch/client.py new file mode 100644 index 00000000..0b929552 --- /dev/null +++ b/cloudkitty/storage/v2/opensearch/client.py @@ -0,0 +1,412 @@ +# Copyright 2019 Objectif Libre +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# +import itertools + +from oslo_log import log +import requests + +from cloudkitty.storage.v2.opensearch import exceptions +from cloudkitty.utils import json + +LOG = log.getLogger(__name__) + + +class OpenSearchClient(object): + """Class used to ease interaction with OpenSearch. + + :param autocommit: Defaults to True. Automatically push documents to + OpenSearch once chunk_size has been reached. + :type autocommit: bool + :param chunk_size: Maximal number of documents to commit/retrieve at once. + :type chunk_size: int + :param scroll_duration: Defaults to 60. Duration, in seconds, for which + search contexts should be kept alive + :type scroll_duration: int + """ + + def __init__(self, url, index_name, mapping_name, + verify=True, + autocommit=True, + chunk_size=5000, + scroll_duration=60): + self._url = url.strip('/') + self._index_name = index_name.strip('/') + self._mapping_name = mapping_name.strip('/') + self._autocommit = autocommit + self._chunk_size = chunk_size + self._scroll_duration = str(scroll_duration) + 's' + self._scroll_params = {'scroll': self._scroll_duration} + + self._docs = [] + self._scroll_ids = set() + + self._sess = requests.Session() + self._verify = self._sess.verify = verify + self._sess.headers = {'Content-Type': 'application/json'} + + @staticmethod + def _log_query(url, query, response): + message = 'Query on {} with body "{}" took {}ms'.format( + url, query, response['took']) + if 'hits' in response.keys(): + message += ' for {} hits'.format(response['hits']['total']) + LOG.debug(message) + + @staticmethod + def _build_must(start, end, metric_types, filters): + must = [] + if start: + must.append({"range": {"start": {"gte": start.isoformat()}}}) + if end: + must.append({"range": {"end": {"lte": end.isoformat()}}}) + + if filters and 'type' in filters.keys(): + must.append({'term': {'type': filters['type']}}) + + if metric_types: + must.append({"terms": {"type": metric_types}}) + + return must + + @staticmethod + def _build_should(filters): + if not filters: + return [] + + should = [] + for k, v in filters.items(): + if k != 'type': + should += [{'term': {'groupby.' + k: v}}, + {'term': {'metadata.' + k: v}}] + return should + + def _build_composite(self, groupby): + if not groupby: + return [] + sources = [] + for elem in groupby: + if elem == 'type': + sources.append({'type': {'terms': {'field': 'type'}}}) + elif elem == 'time': + # Not doing a date_histogram aggregation because we don't know + # the period + sources.append({'begin': {'terms': {'field': 'start'}}}) + sources.append({'end': {'terms': {'field': 'end'}}}) + else: + sources.append({elem: {'terms': {'field': 'groupby.' + elem}}}) + + return {"sources": sources} + + @staticmethod + def _build_query(must, should, composite): + query = {} + + if must or should: + query["query"] = {"bool": {}} + + if must: + query["query"]["bool"]["must"] = must + + if should: + query["query"]["bool"]["should"] = should + # We want each term to match exactly once, and each term introduces + # two "term" aggregations: one for "groupby" and one for "metadata" + query["query"]["bool"]["minimum_should_match"] = len(should) // 2 + + if composite: + query["aggs"] = {"sum_and_price": { + "composite": composite, + "aggregations": { + "sum_price": {"sum": {"field": "price"}}, + "sum_qty": {"sum": {"field": "qty"}}, + } + }} + + return query + + def _req(self, method, url, data, params, deserialize=True): + r = method(url, data=data, params=params) + if r.status_code < 200 or r.status_code >= 300: + raise exceptions.InvalidStatusCode( + 200, r.status_code, r.text, data) + if not deserialize: + return r + output = r.json() + self._log_query(url, data, output) + return output + + def post_mapping(self, mapping): + """Does a POST request against OpenSearch's mapping API. + + The POST request will be done against + `//` + + :mapping: body of the request + :type mapping: dict + :rtype: requests.models.Response + """ + url = '/'.join( + (self._url, self._index_name, self._mapping_name)) + return self._req( + self._sess.post, url, json.dumps(mapping), {}, deserialize=False) + + def get_index(self): + """Does a GET request against OpenSearch's index API. + + The GET request will be done against `/` + + :rtype: requests.models.Response + """ + url = '/'.join((self._url, self._index_name)) + return self._req(self._sess.get, url, None, None, deserialize=False) + + def search(self, body, scroll=True): + """Does a GET request against OpenSearch's search API. + + The GET request will be done against `//_search` + + :param body: body of the request + :type body: dict + :rtype: dict + """ + url = '/'.join((self._url, self._index_name, '_search')) + params = self._scroll_params if scroll else None + return self._req( + self._sess.get, url, json.dumps(body), params) + + def scroll(self, body): + """Does a GET request against OpenSearch's scroll API. + + The GET request will be done against `/_search/scroll` + + :param body: body of the request + :type body: dict + :rtype: dict + """ + url = '/'.join((self._url, '_search/scroll')) + return self._req(self._sess.get, url, json.dumps(body), None) + + def close_scroll(self, body): + """Does a DELETE request against OpenSearch's scroll API. + + The DELETE request will be done against `/_search/scroll` + + :param body: body of the request + :type body: dict + :rtype: dict + """ + url = '/'.join((self._url, '_search/scroll')) + resp = self._req( + self._sess.delete, url, json.dumps(body), None, deserialize=False) + body = resp.json() + LOG.debug('Freed {} scrolls contexts'.format(body['num_freed'])) + return body + + def close_scrolls(self): + """Closes all scroll contexts opened by this client.""" + ids = list(self._scroll_ids) + LOG.debug('Closing {} scroll contexts: {}'.format(len(ids), ids)) + self.close_scroll({'scroll_id': ids}) + self._scroll_ids = set() + + def bulk_with_instruction(self, instruction, terms): + """Does a POST request against OpenSearch's bulk API + + The POST request will be done against + `//_bulk` + + The instruction will be appended before each term. For example, + bulk_with_instruction('instr', ['one', 'two']) will produce:: + + instr + one + instr + two + + :param instruction: instruction to execute for each term + :type instruction: dict + :param terms: list of terms for which instruction should be executed + :type terms: collections.abc.Iterable + :rtype: requests.models.Response + """ + instruction = json.dumps(instruction) + data = '\n'.join(itertools.chain( + *[(instruction, json.dumps(term)) for term in terms] + )) + '\n' + url = '/'.join( + (self._url, self._index_name, '_bulk')) + return self._req(self._sess.post, url, data, None, deserialize=False) + + def bulk_index(self, terms): + """Indexes each of the documents in 'terms' + + :param terms: list of documents to index + :type terms: collections.abc.Iterable + """ + LOG.debug("Indexing {} documents".format(len(terms))) + return self.bulk_with_instruction({"index": {}}, terms) + + def commit(self): + """Index all documents""" + while self._docs: + self.bulk_index(self._docs[:self._chunk_size]) + self._docs = self._docs[self._chunk_size:] + + def add_point(self, point, type_, start, end): + """Append a point to the client. + + :param point: DataPoint to append + :type point: cloudkitty.dataframe.DataPoint + :param type_: type of the DataPoint + :type type_: str + """ + self._docs.append({ + 'start': start, + 'end': end, + 'type': type_, + 'unit': point.unit, + 'qty': point.qty, + 'price': point.price, + 'groupby': point.groupby, + 'metadata': point.metadata, + }) + if self._autocommit and len(self._docs) >= self._chunk_size: + self.commit() + + def _get_chunk_size(self, offset, limit, paginate): + if paginate and offset + limit < self._chunk_size: + return offset + limit + return self._chunk_size + + def retrieve(self, begin, end, filters, metric_types, + offset=0, limit=1000, paginate=True): + """Retrieves a paginated list of documents from OpenSearch.""" + if not paginate: + offset = 0 + + query = self._build_query( + self._build_must(begin, end, metric_types, filters), + self._build_should(filters), None) + query['size'] = self._get_chunk_size(offset, limit, paginate) + + resp = self.search(query) + + scroll_id = resp['_scroll_id'] + self._scroll_ids.add(scroll_id) + total_hits = resp['hits']['total'] + + if isinstance(total_hits, dict): + LOG.debug("Total hits [%s] is a dict. Therefore, we only extract " + "the 'value' attribute as the total option.", total_hits) + total_hits = total_hits.get("value") + + total = total_hits + chunk = resp['hits']['hits'] + + output = chunk[offset:offset+limit if paginate else len(chunk)] + offset = 0 if len(chunk) > offset else offset - len(chunk) + + while (not paginate) or len(output) < limit: + resp = self.scroll({ + 'scroll_id': scroll_id, + 'scroll': self._scroll_duration, + }) + + scroll_id, chunk = resp['_scroll_id'], resp['hits']['hits'] + self._scroll_ids.add(scroll_id) + # Means we've scrolled until the end + if not chunk: + break + + output += chunk[offset:offset+limit if paginate else len(chunk)] + offset = 0 if len(chunk) > offset else offset - len(chunk) + + self.close_scrolls() + return total, output + + def delete_by_query(self, begin=None, end=None, filters=None): + """Does a POST request against ES's Delete By Query API. + + The POST request will be done against + `//_delete_by_query` + + :param filters: Optional filters for documents to delete + :type filters: list of dicts + :rtype: requests.models.Response + """ + url = '/'.join((self._url, self._index_name, '_delete_by_query')) + must = self._build_must(begin, end, None, filters) + data = (json.dumps({"query": {"bool": {"must": must}}}) + if must else None) + return self._req(self._sess.post, url, data, None) + + def total(self, begin, end, metric_types, filters, groupby, + custom_fields=None, offset=0, limit=1000, paginate=True): + + if custom_fields: + LOG.warning("'custom_fields' are not implemented yet for " + "OpenSearch. Therefore, the custom fields [%s] " + "informed by the user will be ignored.", custom_fields) + if not paginate: + offset = 0 + + must = self._build_must(begin, end, metric_types, filters) + should = self._build_should(filters) + composite = self._build_composite(groupby) if groupby else None + if composite: + composite['size'] = self._chunk_size + query = self._build_query(must, should, composite) + + if "aggs" not in query.keys(): + query["aggs"] = { + "sum_price": {"sum": {"field": "price"}}, + "sum_qty": {"sum": {"field": "qty"}}, + } + + query['size'] = 0 + + resp = self.search(query, scroll=False) + + # Means we didn't group, so length is 1 + if not composite: + return 1, [resp["aggregations"]] + + after = resp["aggregations"]["sum_and_price"].get("after_key") + chunk = resp["aggregations"]["sum_and_price"]["buckets"] + + total = len(chunk) + + output = chunk[offset:offset+limit if paginate else len(chunk)] + offset = 0 if len(chunk) > offset else offset - len(chunk) + + # FIXME(peschk_l): We have to iterate over ALL buckets in order to get + # the total length. If there is a way for composite aggregations to get + # the total amount of buckets, please fix this + while after: + composite_query = query["aggs"]["sum_and_price"]["composite"] + composite_query["size"] = self._chunk_size + composite_query["after"] = after + resp = self.search(query, scroll=False) + after = resp["aggregations"]["sum_and_price"].get("after_key") + chunk = resp["aggregations"]["sum_and_price"]["buckets"] + if not chunk: + break + output += chunk[offset:offset+limit if paginate else len(chunk)] + offset = 0 if len(chunk) > offset else offset - len(chunk) + total += len(chunk) + + if paginate: + output = output[offset:offset+limit] + return total, output diff --git a/cloudkitty/storage/v2/opensearch/exceptions.py b/cloudkitty/storage/v2/opensearch/exceptions.py new file mode 100644 index 00000000..e89c92a3 --- /dev/null +++ b/cloudkitty/storage/v2/opensearch/exceptions.py @@ -0,0 +1,32 @@ +# Copyright 2019 Objectif Libre +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + + +class BaseOpenSearchException(Exception): + """Base exception raised by the OpenSearch v2 storage driver""" + + +class InvalidStatusCode(BaseOpenSearchException): + def __init__(self, expected, actual, msg, query): + super(InvalidStatusCode, self).__init__( + "Expected {} status code, got {}: {}. Query was {}".format( + expected, actual, msg, query)) + + +class IndexDoesNotExist(BaseOpenSearchException): + def __init__(self, index_name): + super(IndexDoesNotExist, self).__init__( + "OpenSearch index {} does not exist".format(index_name) + ) diff --git a/cloudkitty/tests/storage/v2/opensearch/__init__.py b/cloudkitty/tests/storage/v2/opensearch/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/cloudkitty/tests/storage/v2/opensearch/test_client.py b/cloudkitty/tests/storage/v2/opensearch/test_client.py new file mode 100644 index 00000000..2fba72dc --- /dev/null +++ b/cloudkitty/tests/storage/v2/opensearch/test_client.py @@ -0,0 +1,482 @@ +# Copyright 2019 Objectif Libre +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# +import collections +import datetime +import unittest +from unittest import mock + +from dateutil import tz + +from cloudkitty import dataframe +from cloudkitty.storage.v2.opensearch import client +from cloudkitty.storage.v2.opensearch import exceptions + + +class TestOpenSearchClient(unittest.TestCase): + + def setUp(self): + super(TestOpenSearchClient, self).setUp() + self.client = client.OpenSearchClient( + 'http://opensearch:9200', + 'index_name', + 'test_mapping', + autocommit=False) + + def test_build_must_no_params(self): + self.assertEqual(self.client._build_must(None, None, None, None), []) + + def test_build_must_with_start_end(self): + start = datetime.datetime(2019, 8, 30, tzinfo=tz.tzutc()) + end = datetime.datetime(2019, 8, 31, tzinfo=tz.tzutc()) + self.assertEqual( + self.client._build_must(start, end, None, None), + [{'range': {'start': {'gte': '2019-08-30T00:00:00+00:00'}}}, + {'range': {'end': {'lte': '2019-08-31T00:00:00+00:00'}}}], + ) + + def test_build_must_with_filters(self): + filters = {'one': '1', 'two': '2', 'type': 'awesome'} + self.assertEqual( + self.client._build_must(None, None, None, filters), + [{'term': {'type': 'awesome'}}], + ) + + def test_build_must_with_metric_types(self): + types = ['awesome', 'amazing'] + self.assertEqual( + self.client._build_must(None, None, types, None), + [{'terms': {'type': ['awesome', 'amazing']}}], + ) + + def test_build_should_no_filters(self): + self.assertEqual( + self.client._build_should(None), + [], + ) + + def test_build_should_with_filters(self): + filters = collections.OrderedDict([ + ('one', '1'), ('two', '2'), ('type', 'awesome')]) + self.assertEqual( + self.client._build_should(filters), + [ + {'term': {'groupby.one': '1'}}, + {'term': {'metadata.one': '1'}}, + {'term': {'groupby.two': '2'}}, + {'term': {'metadata.two': '2'}}, + ], + ) + + def test_build_composite_no_groupby(self): + self.assertEqual(self.client._build_composite(None), []) + + def test_build_composite(self): + self.assertEqual( + self.client._build_composite(['one', 'type', 'two']), + {'sources': [ + {'one': {'terms': {'field': 'groupby.one'}}}, + {'type': {'terms': {'field': 'type'}}}, + {'two': {'terms': {'field': 'groupby.two'}}}, + ]}, + ) + + def test_build_query_no_args(self): + self.assertEqual(self.client._build_query(None, None, None), {}) + + def test_build_query(self): + must = [{'range': {'start': {'gte': '2019-08-30T00:00:00+00:00'}}}, + {'range': {'start': {'lt': '2019-08-31T00:00:00+00:00'}}}] + should = [ + {'term': {'groupby.one': '1'}}, + {'term': {'metadata.one': '1'}}, + {'term': {'groupby.two': '2'}}, + {'term': {'metadata.two': '2'}}, + ] + composite = {'sources': [ + {'one': {'terms': {'field': 'groupby.one'}}}, + {'type': {'terms': {'field': 'type'}}}, + {'two': {'terms': {'field': 'groupby.two'}}}, + ]} + expected = { + 'query': { + 'bool': { + 'must': must, + 'should': should, + 'minimum_should_match': 2, + }, + }, + 'aggs': { + 'sum_and_price': { + 'composite': composite, + 'aggregations': { + "sum_price": {"sum": {"field": "price"}}, + "sum_qty": {"sum": {"field": "qty"}}, + }, + }, + }, + } + self.assertEqual( + self.client._build_query(must, should, composite), expected) + + def test_log_query_no_hits(self): + url = '/endpoint' + body = {'1': 'one'} + response = {'took': 42} + expected = """Query on /endpoint with body "{'1': 'one'}" took 42ms""" + with mock.patch.object(client.LOG, 'debug') as debug_mock: + self.client._log_query(url, body, response) + debug_mock.assert_called_once_with(expected) + + def test_log_query_with_hits(self): + url = '/endpoint' + body = {'1': 'one'} + response = {'took': 42, 'hits': {'total': 1337}} + expected = """Query on /endpoint with body "{'1': 'one'}" took 42ms""" + expected += " for 1337 hits" + with mock.patch.object(client.LOG, 'debug') as debug_mock: + self.client._log_query(url, body, response) + debug_mock.assert_called_once_with(expected) + + def test_req_valid_status_code_no_deserialize(self): + resp_mock = mock.MagicMock() + resp_mock.status_code = 200 + method_mock = mock.MagicMock() + method_mock.return_value = resp_mock + req_resp = self.client._req( + method_mock, None, None, None, deserialize=False) + method_mock.assert_called_once_with(None, data=None, params=None) + self.assertEqual(req_resp, resp_mock) + + def test_req_valid_status_code_deserialize(self): + resp_mock = mock.MagicMock() + resp_mock.status_code = 200 + resp_mock.json.return_value = 'output' + method_mock = mock.MagicMock() + method_mock.return_value = resp_mock + with mock.patch.object(self.client, '_log_query') as log_mock: + req_resp = self.client._req( + method_mock, None, None, None, deserialize=True) + method_mock.assert_called_once_with(None, data=None, params=None) + self.assertEqual(req_resp, 'output') + log_mock.assert_called_once_with(None, None, 'output') + + def test_req_invalid_status_code(self): + resp_mock = mock.MagicMock() + resp_mock.status_code = 400 + method_mock = mock.MagicMock() + method_mock.return_value = resp_mock + self.assertRaises(exceptions.InvalidStatusCode, + self.client._req, + method_mock, None, None, None) + + def test_post_mapping(self): + mapping = {'a': 'b'} + with mock.patch.object(self.client, '_req') as rmock: + self.client.post_mapping(mapping) + rmock.assert_called_once_with( + self.client._sess.post, + 'http://opensearch:9200/index_name/test_mapping', + '{"a": "b"}', {}, deserialize=False) + + def test_get_index(self): + with mock.patch.object(self.client, '_req') as rmock: + self.client.get_index() + rmock.assert_called_once_with( + self.client._sess.get, + 'http://opensearch:9200/index_name', + None, None, deserialize=False) + + def test_search_without_scroll(self): + mapping = {'a': 'b'} + with mock.patch.object(self.client, '_req') as rmock: + self.client.search(mapping, scroll=False) + rmock.assert_called_once_with( + self.client._sess.get, + 'http://opensearch:9200/index_name/_search', + '{"a": "b"}', None) + + def test_search_with_scroll(self): + mapping = {'a': 'b'} + with mock.patch.object(self.client, '_req') as rmock: + self.client.search(mapping, scroll=True) + rmock.assert_called_once_with( + self.client._sess.get, + 'http://opensearch:9200/index_name/_search', + '{"a": "b"}', {'scroll': '60s'}) + + def test_scroll(self): + body = {'a': 'b'} + with mock.patch.object(self.client, '_req') as rmock: + self.client.scroll(body) + rmock.assert_called_once_with( + self.client._sess.get, + 'http://opensearch:9200/_search/scroll', + '{"a": "b"}', None) + + def test_close_scroll(self): + body = {'a': 'b'} + with mock.patch.object(self.client, '_req') as rmock: + self.client.close_scroll(body) + rmock.assert_called_once_with( + self.client._sess.delete, + 'http://opensearch:9200/_search/scroll', + '{"a": "b"}', None, deserialize=False) + + def test_close_scrolls(self): + with mock.patch.object(self.client, 'close_scroll') as func_mock: + with mock.patch.object(self.client, '_scroll_ids', + new=['a', 'b', 'c']): + self.client.close_scrolls() + func_mock.assert_called_once_with( + {'scroll_id': ['a', 'b', 'c']}) + self.assertSetEqual(set(), self.client._scroll_ids) + + def test_bulk_with_instruction(self): + instruction = {'instruction': {}} + terms = ('one', 'two', 'three') + expected_data = ''.join([ + '{"instruction": {}}\n' + '"one"\n' + '{"instruction": {}}\n' + '"two"\n' + '{"instruction": {}}\n' + '"three"\n', + ]) + + with mock.patch.object(self.client, '_req') as rmock: + self.client.bulk_with_instruction(instruction, terms) + rmock.assert_called_once_with( + self.client._sess.post, + 'http://opensearch:9200/index_name/_bulk', + expected_data, None, deserialize=False) + + def test_bulk_index(self): + terms = ('one', 'two', 'three') + with mock.patch.object(self.client, 'bulk_with_instruction') as fmock: + self.client.bulk_index(terms) + fmock.assert_called_once_with({'index': {}}, terms) + + def test_commit(self): + docs = ['one', 'two', 'three', 'four', 'five', 'six', 'seven'] + size = 3 + with mock.patch.object(self.client, 'bulk_index') as bulk_mock: + with mock.patch.object(self.client, '_docs', new=docs): + with mock.patch.object(self.client, '_chunk_size', new=size): + self.client.commit() + bulk_mock.assert_has_calls([ + mock.call(['one', 'two', 'three']), + mock.call(['four', 'five', 'six']), + mock.call(['seven']), + ]) + + def test_add_point_no_autocommit(self): + point = dataframe.DataPoint( + 'unit', '0.42', '0.1337', {}, {}) + start = datetime.datetime(2019, 1, 1) + end = datetime.datetime(2019, 1, 1, 1) + with mock.patch.object(self.client, 'commit') as func_mock: + with mock.patch.object(self.client, '_autocommit', new=False): + with mock.patch.object(self.client, '_chunk_size', new=3): + self.client._docs = [] + for _ in range(5): + self.client.add_point( + point, 'awesome_type', start, end) + + func_mock.assert_not_called() + self.assertEqual(self.client._docs, [{ + 'start': start, + 'end': end, + 'type': 'awesome_type', + 'unit': point.unit, + 'qty': point.qty, + 'price': point.price, + 'groupby': point.groupby, + 'metadata': point.metadata, + } for _ in range(5)]) + + self.client._docs = [] + + def test_add_point_with_autocommit(self): + point = dataframe.DataPoint( + 'unit', '0.42', '0.1337', {}, {}) + start = datetime.datetime(2019, 1, 1) + end = datetime.datetime(2019, 1, 1, 1) + + commit_calls = {'count': 0} + + def commit(): + # We can't re-assign nonlocal variables in python2 + commit_calls['count'] += 1 + self.client._docs = [] + + with mock.patch.object(self.client, 'commit', new=commit): + with mock.patch.object(self.client, '_autocommit', new=True): + with mock.patch.object(self.client, '_chunk_size', new=3): + self.client._docs = [] + for i in range(5): + self.client.add_point( + point, 'awesome_type', start, end) + + self.assertEqual(commit_calls['count'], 1) + self.assertEqual(self.client._docs, [{ + 'start': start, + 'end': end, + 'type': 'awesome_type', + 'unit': point.unit, + 'qty': point.qty, + 'price': point.price, + 'groupby': point.groupby, + 'metadata': point.metadata, + } for _ in range(2)]) + + # cleanup + self.client._docs = [] + + def test_delete_by_query_with_must(self): + with mock.patch.object(self.client, '_req') as rmock: + with mock.patch.object(self.client, '_build_must') as func_mock: + func_mock.return_value = {'a': 'b'} + self.client.delete_by_query() + rmock.assert_called_once_with( + self.client._sess.post, + 'http://opensearch:9200/index_name/_delete_by_query', + '{"query": {"bool": {"must": {"a": "b"}}}}', None) + + def test_delete_by_query_no_must(self): + with mock.patch.object(self.client, '_req') as rmock: + with mock.patch.object(self.client, '_build_must') as func_mock: + func_mock.return_value = {} + self.client.delete_by_query() + rmock.assert_called_once_with( + self.client._sess.post, + 'http://opensearch:9200/index_name/_delete_by_query', + None, None) + + def test_retrieve_no_pagination(self): + search_resp = { + '_scroll_id': '000', + 'hits': {'hits': ['one', 'two', 'three'], 'total': 12}, + } + scroll_resps = [{ + '_scroll_id': str(i + 1) * 3, + 'hits': {'hits': ['one', 'two', 'three']}, + } for i in range(3)] + scroll_resps.append({'_scroll_id': '444', 'hits': {'hits': []}}) + + self.client._scroll_ids = set() + + with mock.patch.object(self.client, 'search') as search_mock: + with mock.patch.object(self.client, 'scroll') as scroll_mock: + with mock.patch.object(self.client, 'close_scrolls') as close: + search_mock.return_value = search_resp + scroll_mock.side_effect = scroll_resps + + total, resp = self.client.retrieve( + None, None, None, None, paginate=False) + search_mock.assert_called_once() + scroll_mock.assert_has_calls([ + mock.call({ + 'scroll_id': str(i) * 3, + 'scroll': '60s', + }) for i in range(4) + ]) + self.assertEqual(total, 12) + self.assertEqual(resp, ['one', 'two', 'three'] * 4) + self.assertSetEqual(self.client._scroll_ids, + set(str(i) * 3 for i in range(5))) + close.assert_called_once() + + self.client._scroll_ids = set() + + def test_retrieve_with_pagination(self): + search_resp = { + '_scroll_id': '000', + 'hits': {'hits': ['one', 'two', 'three'], 'total': 12}, + } + scroll_resps = [{ + '_scroll_id': str(i + 1) * 3, + 'hits': {'hits': ['one', 'two', 'three']}, + } for i in range(3)] + scroll_resps.append({'_scroll_id': '444', 'hits': {'hits': []}}) + + self.client._scroll_ids = set() + + with mock.patch.object(self.client, 'search') as search_mock: + with mock.patch.object(self.client, 'scroll') as scroll_mock: + with mock.patch.object(self.client, 'close_scrolls') as close: + search_mock.return_value = search_resp + scroll_mock.side_effect = scroll_resps + + total, resp = self.client.retrieve( + None, None, None, None, + offset=2, limit=4, paginate=True) + search_mock.assert_called_once() + scroll_mock.assert_called_once_with({ + 'scroll_id': '000', + 'scroll': '60s', + }) + self.assertEqual(total, 12) + self.assertEqual(resp, ['three', 'one', 'two', 'three']) + self.assertSetEqual(self.client._scroll_ids, + set(str(i) * 3 for i in range(2))) + close.assert_called_once() + + self.client._scroll_ids = set() + + def _do_test_total(self, groupby, paginate): + with mock.patch.object(self.client, 'search') as search_mock: + if groupby: + search_resps = [{ + 'aggregations': { + 'sum_and_price': { + 'buckets': ['one', 'two', 'three'], + 'after_key': str(i), + } + } + } for i in range(3)] + last_resp_aggs = search_resps[2]['aggregations'] + last_resp_aggs['sum_and_price'].pop('after_key') + last_resp_aggs['sum_and_price']['buckets'] = [] + search_mock.side_effect = search_resps + else: + search_mock.return_value = { + 'aggregations': ['one', 'two', 'three'], + } + resp = self.client.total(None, None, None, None, groupby, + offset=2, limit=4, paginate=paginate) + if not groupby: + search_mock.assert_called_once() + + return resp + + def test_total_no_groupby_no_pagination(self): + total, aggs = self._do_test_total(None, False) + self.assertEqual(total, 1) + self.assertEqual(aggs, [['one', 'two', 'three']]) + + def test_total_no_groupby_with_pagination(self): + total, aggs = self._do_test_total(None, True) + self.assertEqual(total, 1) + self.assertEqual(aggs, [['one', 'two', 'three']]) + + def test_total_with_groupby_no_pagination(self): + total, aggs = self._do_test_total(['x'], False) + self.assertEqual(total, 6) + self.assertEqual(aggs, ['one', 'two', 'three'] * 2) + + def test_total_with_groupby_with_pagination(self): + total, aggs = self._do_test_total(['x'], True) + self.assertEqual(total, 6) + self.assertEqual(aggs, ['three', 'one', 'two', 'three']) diff --git a/cloudkitty/tests/storage/v2/opensearch_utils.py b/cloudkitty/tests/storage/v2/opensearch_utils.py new file mode 100644 index 00000000..ffd86963 --- /dev/null +++ b/cloudkitty/tests/storage/v2/opensearch_utils.py @@ -0,0 +1,97 @@ +# Copyright 2019 Objectif Libre +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# +import copy +import functools +import itertools + +import requests + +from cloudkitty.storage.v2.opensearch import client + + +class FakeOpenSearchClient(client.OpenSearchClient): + + def __init__(self, *args, **kwargs): + kwargs["autocommit"] = False + super(FakeOpenSearchClient, self).__init__(*args, **kwargs) + for method in ('get_index', 'post_mapping'): + setattr(self, method, self.__base_response) + + @staticmethod + def __base_response(*args, **kwargs): + r = requests.Response() + r.status_code = 200 + return r + + def commit(self): + pass + + @staticmethod + def __filter_func(begin, end, filters, mtypes, doc): + type_filter = lambda doc: ( # noqa: E731 + doc['type'] in mtypes if mtypes else True) + time_filter = lambda doc: ( # noqa: E731 + (doc['start'] >= begin if begin else True) + and (doc['start'] < end if end else True)) + + def filter_(doc): + return all((doc['groupby'].get(k) == v + or (doc['metadata'].get(k) == v) + for k, v in filters.items())) if filters else True + + return type_filter(doc) and time_filter(doc) and filter_(doc) + + def retrieve(self, begin, end, filters, metric_types, + offset=0, limit=1000, paginate=True): + filter_func = functools.partial( + self.__filter_func, begin, end, filters, metric_types) + output = list(filter(filter_func, self._docs))[offset:offset+limit] + for doc in output: + doc["start"] = doc["start"].isoformat() + doc["end"] = doc["end"].isoformat() + doc["_source"] = copy.deepcopy(doc) + return len(output), output + + def total(self, begin, end, metric_types, filters, groupby, + custom_fields=None, offset=0, limit=1000, paginate=True): + filter_func = functools.partial( + self.__filter_func, begin, end, filters, metric_types) + docs = list(filter(filter_func, self._docs)) + if not groupby: + return 1, [{ + 'sum_qty': {'value': sum(doc['qty'] for doc in docs)}, + 'sum_price': {'value': sum(doc['price'] for doc in docs)}, + 'begin': begin, + 'end': end, + }] + + output = [] + key_func = lambda d: tuple( # noqa: E731 + d['type'] if g == 'type' else d['groupby'][g] for g in groupby) + docs.sort(key=key_func) + + for groups, values in itertools.groupby(docs, key_func): + val_list = list(values) + output.append({ + 'begin': begin, + 'end': end, + 'sum_qty': {'value': sum(doc['qty'] for doc in val_list)}, + 'sum_price': {'value': sum(doc['price'] for doc in val_list)}, + 'key': dict(zip(groupby, groups)), + }) + return len(output), output[offset:offset+limit] + + def _req(self, method, url, data, params, deserialize=True): + pass diff --git a/cloudkitty/tests/storage/v2/test_storage_unit.py b/cloudkitty/tests/storage/v2/test_storage_unit.py index dbd6b1ff..8a5e90abe 100644 --- a/cloudkitty/tests/storage/v2/test_storage_unit.py +++ b/cloudkitty/tests/storage/v2/test_storage_unit.py @@ -21,6 +21,7 @@ from cloudkitty.tests import samples from cloudkitty.tests.storage.v2 import es_utils from cloudkitty.tests.storage.v2 import influx_utils +from cloudkitty.tests.storage.v2 import opensearch_utils from cloudkitty.tests import TestCase from cloudkitty.tests import utils as test_utils from cloudkitty.utils import tz as tzutils @@ -32,11 +33,16 @@ _INFLUX_CLIENT_PATH = 'cloudkitty.storage.v2.influx.InfluxClient' +_OS_CLIENT_PATH = ('cloudkitty.storage.v2.opensearch' + '.client.OpenSearchClient') + + class StorageUnitTest(TestCase): storage_scenarios = [ - ('influx', dict(storage_backend='influxdb')), - ('elastic', dict(storage_backend='elasticsearch'))] + ('influxdb', dict(storage_backend='influxdb')), + ('elasticsearch', dict(storage_backend='elasticsearch')), + ('opensearch', dict(storage_backend='opensearch'))] @classmethod def generate_scenarios(cls): @@ -48,6 +54,8 @@ def generate_scenarios(cls): new=es_utils.FakeElasticsearchClient) @mock.patch(_INFLUX_CLIENT_PATH, new=influx_utils.FakeInfluxClient) + @mock.patch(_OS_CLIENT_PATH, + new=opensearch_utils.FakeOpenSearchClient) @mock.patch('cloudkitty.utils.load_conf', new=test_utils.load_conf) def setUp(self): super(StorageUnitTest, self).setUp() diff --git a/devstack/plugin.sh b/devstack/plugin.sh index bc07e9f4..349943f7 100755 --- a/devstack/plugin.sh +++ b/devstack/plugin.sh @@ -178,6 +178,11 @@ function configure_cloudkitty { iniset $CLOUDKITTY_CONF storage_${CLOUDKITTY_STORAGE_BACKEND} index_name ${CLOUDKITTY_ELASTICSEARCH_INDEX} fi + if [ "$CLOUDKITTY_STORAGE_BACKEND" == "opensearch" ]; then + iniset $CLOUDKITTY_CONF storage_${CLOUDKITTY_STORAGE_BACKEND} host ${CLOUDKITTY_OPENSEARCH_HOST} + iniset $CLOUDKITTY_CONF storage_${CLOUDKITTY_STORAGE_BACKEND} index_name ${CLOUDKITTY_OPENSEARCH_INDEX} + fi + # collect iniset $CLOUDKITTY_CONF collect collector $CLOUDKITTY_COLLECTOR iniset $CLOUDKITTY_CONF "collector_${CLOUDKITTY_COLLECTOR}" auth_section authinfos @@ -248,6 +253,12 @@ function create_elasticsearch_index { fi } +function create_opensearch_index { + if [ "$CLOUDKITTY_STORAGE_BACKEND" == "opensearch" ]; then + curl -XPUT "${CLOUDKITTY_OPENSEARCH_HOST}/${CLOUDKITTY_OPENSEARCH_INDEX}" + fi +} + # init_cloudkitty() - Initialize CloudKitty database function init_cloudkitty { # Delete existing cache @@ -265,6 +276,7 @@ function init_cloudkitty { create_influxdb_database create_elasticsearch_index + create_opensearch_index # Migrate cloudkitty database $CLOUDKITTY_BIN_DIR/cloudkitty-dbsync upgrade @@ -301,13 +313,38 @@ function install_influx { sudo systemctl start influxdb || sudo systemctl restart influxdb } +function install_elasticsearch_ubuntu { + local opensearch_file=$(get_extra_file https://artifacts.opensearch.org/releases/bundle/opensearch/1.3.9/opensearch-1.3.9-linux-x64.deb) + sudo dpkg -i --skip-same-version ${opensearch_file} +} + +function install_elasticsearch_fedora { + local opensearch_file=$(get_extra_file https://artifacts.opensearch.org/releases/bundle/opensearch/1.3.9/opensearch-1.3.9-linux-x64.rpm) + sudo yum localinstall -y ${opensearch_file} +} + +function install_opensearch { + if is_ubuntu; then + install_opensearch_ubuntu + elif is_fedora; then + install_opensearch_fedora + else + die $LINENO "Distribution must be Debian or Fedora-based" + fi + if ! sudo grep plugins.security.disabled /etc/opensearch/opensearch.yml >/dev/null; then + echo "plugins.security.disabled: true" | sudo tee -a /etc/opensearch/opensearch.yml >/dev/null + fi + sudo systemctl enable opensearch + sudo systemctl start opensearch || sudo systemctl restart opensearch +} + function install_opensearch_ubuntu { - local opensearch_file=$(get_extra_file https://artifacts.opensearch.org/releases/bundle/opensearch/2.5.0/opensearch-2.5.0-linux-x64.deb) + local opensearch_file=$(get_extra_file https://artifacts.opensearch.org/releases/bundle/opensearch/2.11.0/opensearch-2.11.0-linux-x64.deb) sudo dpkg -i --skip-same-version ${opensearch_file} } function install_opensearch_fedora { - local opensearch_file=$(get_extra_file https://artifacts.opensearch.org/releases/bundle/opensearch/2.5.0/opensearch-2.5.0-linux-x64.rpm) + local opensearch_file=$(get_extra_file https://artifacts.opensearch.org/releases/bundle/opensearch/2.11.0/opensearch-2.11.0-linux-x64.rpm) sudo yum localinstall -y ${opensearch_file} } @@ -335,6 +372,8 @@ function install_cloudkitty { install_influx elif [ $CLOUDKITTY_STORAGE_BACKEND == 'elasticsearch' ]; then install_elasticsearch + elif [ $CLOUDKITTY_STORAGE_BACKEND == 'opensearch' ]; then + install_opensearch fi } diff --git a/devstack/settings b/devstack/settings index 5afb2ba6..106f81b4 100644 --- a/devstack/settings +++ b/devstack/settings @@ -76,3 +76,7 @@ CLOUDKITTY_INFLUXDB_DATABASE=${CLOUDKITTY_INFLUXDB_DATABASE:-"cloudkitty"} # Set elasticsearch info CLOUDKITTY_ELASTICSEARCH_HOST=${CLOUDKITTY_ELASTICSEARCH_HOST:-"http://localhost:9200"} CLOUDKITTY_ELASTICSEARCH_INDEX=${CLOUDKITTY_ELASTICSEARCH_INDEX:-"cloudkitty"} + +# Set opensearch info +CLOUDKITTY_OPENSEARCH_HOST=${CLOUDKITTY_OPENSEARCH_HOST:-"http://localhost:9200"} +CLOUDKITTY_OPENSEARCH_INDEX=${CLOUDKITTY_OPENSEARCH_INDEX:-"cloudkitty"} diff --git a/doc/source/admin/configuration/storage.rst b/doc/source/admin/configuration/storage.rst index 015d8b38..4b6d8b33 100644 --- a/doc/source/admin/configuration/storage.rst +++ b/doc/source/admin/configuration/storage.rst @@ -27,6 +27,7 @@ the configuration file. The following options are available: - ``influxdb`` - ``elasticsearch`` + - ``opensearch`` Driver-specific options ======================= @@ -75,7 +76,7 @@ Section: ``storage_influxdb``. regular exports of your data and create a custom retention policy on cloudkitty's database. -ElasticSearch (v2) +Elasticsearch (v2) ------------------ Section ``storage_elasticsearch``: @@ -90,5 +91,23 @@ Section ``storage_elasticsearch``: * ``cafile``: Path of the CA certificate to trust for HTTPS connections. -* ``scroll_duration``: Defaults to 30. Duration (in seconds) for which the ES - scroll contexts should be kept alive. +* ``scroll_duration``: Defaults to 30. Duration (in seconds) for which the + Elasticsearch scroll contexts should be kept alive. + +OpenSearch 2.x (v2) +------------------- + +Section ``storage_opensearch``: + +* ``host``: Defaults to ``http://localhost:9200``. OpenSearch 2.x host, along + with port and protocol. + +* ``index_name``: Defaults to ``cloudkitty``. OpenSearch index to use. + +* ``insecure``: Defaults to ``false``. Set to true to allow insecure HTTPS + connections to OpenSearch. + +* ``cafile``: Path of the CA certificate to trust for HTTPS connections. + +* ``scroll_duration``: Defaults to 30. Duration (in seconds) for which the + OpenSearch scroll contexts should be kept alive. diff --git a/releasenotes/notes/add-opensearch-as-v2-storage-backend-ff4080d6d32d8a2a.yaml b/releasenotes/notes/add-opensearch-as-v2-storage-backend-ff4080d6d32d8a2a.yaml new file mode 100644 index 00000000..ba311bd5 --- /dev/null +++ b/releasenotes/notes/add-opensearch-as-v2-storage-backend-ff4080d6d32d8a2a.yaml @@ -0,0 +1,7 @@ +--- +features: + - | + OpenSearch has been added as an alternative v2 storage backend. It is a + duplicate of the ElasticSearch backend, with the naming changed where + appropriate. This change is in support of the deprecation of ElasticSearch + as a backend. diff --git a/setup.cfg b/setup.cfg index 5f032069..d83166b2 100644 --- a/setup.cfg +++ b/setup.cfg @@ -71,6 +71,7 @@ cloudkitty.storage.v1.backends = cloudkitty.storage.v2.backends = influxdb = cloudkitty.storage.v2.influx:InfluxStorage elasticsearch = cloudkitty.storage.v2.elasticsearch:ElasticsearchStorage + opensearch = cloudkitty.storage.v2.opensearch:OpenSearchStorage cloudkitty.storage.hybrid.backends = gnocchi = cloudkitty.storage.v1.hybrid.backends.gnocchi:GnocchiStorage