Merge pull request #85 from stackhpc/upstream/yoga-2024-05-27
Synchronise yoga with upstream
Alex-Welsh authored Jun 26, 2024
2 parents f45727b + 1e49dd0 · commit 8f56ce9
Showing 14 changed files with 1,329 additions and 7 deletions.
13 changes: 13 additions & 0 deletions .zuul.yaml
@@ -94,6 +94,17 @@
CLOUDKITTY_STORAGE_BACKEND: elasticsearch
CLOUDKITTY_STORAGE_VERSION: 2

- job:
name: cloudkitty-tempest-full-v2-storage-opensearch
parent: base-cloudkitty-v2-api-tempest-job
description: |
      Job testing a CloudKitty installation on devstack with Python 3 and the
      OpenSearch v2 storage driver, and running tempest tests
vars:
devstack_localrc:
CLOUDKITTY_STORAGE_BACKEND: opensearch
CLOUDKITTY_STORAGE_VERSION: 2

- job:
name: cloudkitty-tox-bandit
parent: openstack-tox
@@ -130,6 +141,8 @@
- cloudkitty-tempest-full-v2-storage-influxdb
- cloudkitty-tempest-full-v2-storage-elasticsearch:
voting: false
- cloudkitty-tempest-full-v2-storage-opensearch:
voting: false
- cloudkitty-tempest-full-v1-storage-sqlalchemy
- cloudkitty-tempest-full-ipv6-only
- cloudkitty-tox-bandit:
3 changes: 3 additions & 0 deletions cloudkitty/common/config.py
@@ -32,6 +32,7 @@
import cloudkitty.storage.v1.hybrid.backends.gnocchi
import cloudkitty.storage.v2.elasticsearch
import cloudkitty.storage.v2.influx
import cloudkitty.storage.v2.opensearch
import cloudkitty.utils

__all__ = ['list_opts']
@@ -70,6 +71,8 @@
cloudkitty.storage.v2.influx.influx_storage_opts))),
('storage_elasticsearch', list(itertools.chain(
cloudkitty.storage.v2.elasticsearch.elasticsearch_storage_opts))),
('storage_opensearch', list(itertools.chain(
cloudkitty.storage.v2.opensearch.opensearch_storage_opts))),
('storage_gnocchi', list(itertools.chain(
cloudkitty.storage.v1.hybrid.backends.gnocchi.gnocchi_storage_opts))),
(None, list(itertools.chain(
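
The list_opts entry above is what registers the new [storage_opensearch] options with oslo-config-generator, so they appear in generated sample configuration files. A sketch of regenerating the sample config (the generator config file path is an assumption based on the usual OpenStack layout):

    oslo-config-generator --config-file etc/oslo-config-generator/cloudkitty.conf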
205 changes: 205 additions & 0 deletions cloudkitty/storage/v2/opensearch/__init__.py
@@ -0,0 +1,205 @@
# Copyright 2019 Objectif Libre
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
import datetime

from oslo_config import cfg
from oslo_log import log

from cloudkitty import dataframe
from cloudkitty.storage import v2 as v2_storage
from cloudkitty.storage.v2.opensearch import client as os_client
from cloudkitty.storage.v2.opensearch import exceptions
from cloudkitty.utils import tz as tzutils

LOG = log.getLogger(__name__)

CONF = cfg.CONF

OPENSEARCH_STORAGE_GROUP = 'storage_opensearch'

opensearch_storage_opts = [
cfg.StrOpt(
'host',
help='OpenSearch host, along with port and protocol. '
'Defaults to http://localhost:9200',
default='http://localhost:9200'),
cfg.StrOpt(
'index_name',
help='OpenSearch index to use. Defaults to "cloudkitty".',
default='cloudkitty'),
cfg.BoolOpt('insecure',
help='Set to true to allow insecure HTTPS '
'connections to OpenSearch',
default=False),
cfg.StrOpt('cafile',
help='Path of the CA certificate to trust for '
'HTTPS connections.',
default=None),
cfg.IntOpt('scroll_duration',
help="Duration (in seconds) for which the OpenSearch scroll "
"contexts should be kept alive.",
advanced=True,
default=30, min=0, max=300),
]

CONF.register_opts(opensearch_storage_opts, OPENSEARCH_STORAGE_GROUP)
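
# For reference, the options above land in a [storage_opensearch] section of
# cloudkitty.conf. An illustrative sketch (host and CA path are hypothetical,
# not part of this change):
#
#     [storage_opensearch]
#     host = https://opensearch.example.com:9200
#     index_name = cloudkitty
#     insecure = false
#     cafile = /etc/ssl/certs/opensearch-ca.pem
#     scroll_duration = 30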

CLOUDKITTY_INDEX_MAPPING = {
"dynamic_templates": [
{
"strings_as_keywords": {
"match_mapping_type": "string",
"mapping": {
"type": "keyword"
}
}
}
],
"dynamic": False,
"properties": {
"start": {"type": "date"},
"end": {"type": "date"},
"type": {"type": "keyword"},
"unit": {"type": "keyword"},
"qty": {"type": "double"},
"price": {"type": "double"},
"groupby": {"dynamic": True, "type": "object"},
"metadata": {"dynamic": True, "type": "object"}
},
}
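
# As a sketch of what this mapping stores, an indexed rating document looks
# roughly like the following (illustrative values only):
#
#     {
#         "start": "2024-01-01T00:00:00+00:00",
#         "end": "2024-01-01T01:00:00+00:00",
#         "type": "instance",
#         "unit": "instance",
#         "qty": 1.0,
#         "price": 0.42,
#         "groupby": {"project_id": "some-project-id"},
#         "metadata": {"flavor_name": "m1.small"}
#     }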


class OpenSearchStorage(v2_storage.BaseStorage):

def __init__(self, *args, **kwargs):
super(OpenSearchStorage, self).__init__(*args, **kwargs)

LOG.warning('The OpenSearch storage driver is experimental. '
'DO NOT USE IT IN PRODUCTION.')

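        # "verify" follows requests-style semantics (an assumption about the
        # underlying HTTP client): True/False toggles TLS certificate
        # verification, while a string is treated as a CA bundle path.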
verify = not CONF.storage_opensearch.insecure
if verify and CONF.storage_opensearch.cafile:
verify = CONF.storage_opensearch.cafile

self._conn = os_client.OpenSearchClient(
CONF.storage_opensearch.host,
CONF.storage_opensearch.index_name,
"_doc",
verify=verify)

def init(self):
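        # The index itself must already exist; only the "_doc" mapping is
        # created here (presumably as part of CloudKitty's storage
        # initialisation step rather than at service startup).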
r = self._conn.get_index()
if r.status_code != 200:
raise exceptions.IndexDoesNotExist(
CONF.storage_opensearch.index_name)
LOG.info('Creating mapping "_doc" on index {}...'.format(
CONF.storage_opensearch.index_name))
self._conn.post_mapping(CLOUDKITTY_INDEX_MAPPING)
LOG.info('Mapping created.')

def push(self, dataframes, scope_id=None):
for frame in dataframes:
for type_, point in frame.iterpoints():
start, end = self._local_to_utc(frame.start, frame.end)
self._conn.add_point(point, type_, start, end)

self._conn.commit()

@staticmethod
def _local_to_utc(*args):
return [tzutils.local_to_utc(arg) for arg in args]

@staticmethod
def _doc_to_datapoint(doc):
return dataframe.DataPoint(
doc['unit'],
doc['qty'],
doc['price'],
doc['groupby'],
doc['metadata'],
)

def _build_dataframes(self, docs):
dataframes = {}
nb_points = 0
for doc in docs:
source = doc['_source']
start = tzutils.dt_from_iso(source['start'])
end = tzutils.dt_from_iso(source['end'])
key = (start, end)
if key not in dataframes.keys():
dataframes[key] = dataframe.DataFrame(start=start, end=end)
dataframes[key].add_point(
self._doc_to_datapoint(source), source['type'])
nb_points += 1

output = list(dataframes.values())
output.sort(key=lambda frame: (frame.start, frame.end))
return output

def retrieve(self, begin=None, end=None,
filters=None,
metric_types=None,
offset=0, limit=1000, paginate=True):
begin, end = self._local_to_utc(begin or tzutils.get_month_start(),
end or tzutils.get_next_month())
total, docs = self._conn.retrieve(
begin, end, filters, metric_types,
offset=offset, limit=limit, paginate=paginate)
return {
'total': total,
'dataframes': self._build_dataframes(docs),
}

def delete(self, begin=None, end=None, filters=None):
self._conn.delete_by_query(begin, end, filters)

@staticmethod
def _normalize_time(t):
if isinstance(t, datetime.datetime):
return tzutils.utc_to_local(t)
return tzutils.dt_from_iso(t)

def _doc_to_total_result(self, doc, start, end):
output = {
'begin': self._normalize_time(doc.get('start', start)),
'end': self._normalize_time(doc.get('end', end)),
'qty': doc['sum_qty']['value'],
'rate': doc['sum_price']['value'],
}
# Means we had a composite aggregation
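        # (composite buckets carry their group-by values in "key", e.g.
        # {"key": {"project_id": "abc123", "begin": 1704067200000}})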
if 'key' in doc.keys():
for key, value in doc['key'].items():
if key == 'begin' or key == 'end':
# OpenSearch returns ts in milliseconds
value = tzutils.dt_from_ts(value // 1000)
output[key] = value
return output

def total(self, groupby=None, begin=None, end=None, metric_types=None,
filters=None, custom_fields=None, offset=0, limit=1000,
paginate=True):
begin, end = self._local_to_utc(begin or tzutils.get_month_start(),
end or tzutils.get_next_month())

total, docs = self._conn.total(begin, end, metric_types, filters,
groupby, custom_fields=custom_fields,
offset=offset, limit=limit,
paginate=paginate)
return {
'total': total,
'results': [self._doc_to_total_result(doc, begin, end)
for doc in docs],
}
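
For context, switching CloudKitty's v2 storage to the new driver mirrors the CLOUDKITTY_STORAGE_BACKEND and CLOUDKITTY_STORAGE_VERSION variables set in the Zuul job above. A minimal cloudkitty.conf sketch, assuming CloudKitty's standard [storage] backend/version options:

    [storage]
    backend = opensearch
    version = 2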