Skip to content

Commit

Permalink
initial HCS report finalization (project-koku#3648)
Browse files Browse the repository at this point in the history
* HCS report finalization
  • Loading branch information
ddonahue007 authored May 24, 2022
1 parent 5219983 commit 8a245a1
Show file tree
Hide file tree
Showing 16 changed files with 313 additions and 43 deletions.
16 changes: 9 additions & 7 deletions dev/scripts/load_test_customer_data.sh
Original file line number Diff line number Diff line change
Expand Up @@ -333,32 +333,34 @@ build_all(){
}

# ---execute---
# Normalize the provider argument once so each case label needs a single
# uppercase spelling ("aws", "Aws", "AWS" all match "AWS").
# NOTE: '[:lower:]'/'[:upper:]' are quoted POSIX classes — the unquoted
# [a-z]/[A-Z] forms are subject to shell glob expansion and locale quirks.
provider_arg=$(echo "${1}" | tr '[:lower:]' '[:upper:]')

case "${provider_arg}" in
    "AWS")
        check-api-status "Koku" "${KOKU_URL_PREFIX}/v1/status/"
        check-api-status "Masu" "${MASU_URL_PREFIX}/v1/status/"
        build_aws_data
        enable_ocp_tags ;;
    "AZURE")
        check-api-status "Koku" "${KOKU_URL_PREFIX}/v1/status/"
        check-api-status "Masu" "${MASU_URL_PREFIX}/v1/status/"
        build_azure_data
        enable_ocp_tags ;;
    "GCP")
        check-api-status "Koku" "${KOKU_URL_PREFIX}/v1/status/"
        check-api-status "Masu" "${MASU_URL_PREFIX}/v1/status/"
        build_gcp_data
        enable_ocp_tags ;;
    "ONPREM")
        check-api-status "Koku" "${KOKU_URL_PREFIX}/v1/status/"
        check-api-status "Masu" "${MASU_URL_PREFIX}/v1/status/"
        build_onprem_data
        enable_ocp_tags ;;
    "ALL")
        check-api-status "Koku" "${KOKU_URL_PREFIX}/v1/status/"
        check-api-status "Masu" "${MASU_URL_PREFIX}/v1/status/"
        build_all
        enable_ocp_tags ;;
    "HELP") usage;;
    *) usage;;
esac
7 changes: 4 additions & 3 deletions koku/hcs/csv_file_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import pandas as pd

from api.common import log_json
from masu.util.aws.common import copy_local_report_file_to_s3_bucket
from masu.util.aws.common import copy_local_hcs_report_file_to_s3_bucket

LOG = logging.getLogger(__name__)

Expand All @@ -18,12 +18,13 @@ def __init__(self, schema_name, provider, provider_uuid):
self._provider = provider
self._provider_uuid = provider_uuid

def write_csv_to_s3(self, date, data, cols, tracing_id=None):
def write_csv_to_s3(self, date, data, cols, finalize=False, tracing_id=None):
"""
Generates an HCS CSV from the specified schema and provider.
:param date
:param data
:param cols
:param finalize
:param tracing_id
:return none
Expand All @@ -38,5 +39,5 @@ def write_csv_to_s3(self, date, data, cols, tracing_id=None):

LOG.info(log_json(tracing_id, "preparing to write file to object storage"))
my_df.to_csv(filename, header=cols, index=False)
copy_local_report_file_to_s3_bucket(tracing_id, s3_csv_path, filename, filename, "", date)
copy_local_hcs_report_file_to_s3_bucket(tracing_id, s3_csv_path, filename, filename, finalize, date)
os.remove(filename)
9 changes: 6 additions & 3 deletions koku/hcs/daily_report.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,18 @@ def __init__(self, schema_name, provider, provider_uuid, tracing_id):
self._date_accessor = DateAccessor()
self._tracing_id = tracing_id

def generate_report(self, start_date, end_date, finalize=False):
    """Generate the HCS daily report for each day in the date range.

    Iterates day-by-day from start_date to end_date and asks the HCS report
    DB accessor to summarize each day using the provider-specific SQL file.

    :param start_date: (str) The date to start populating the table
    :param end_date: (str) The date to end on
    :param finalize: (bool) Set to True when the report is final (default=False)
    :returns None
    """
    # SQL file is chosen by provider type, e.g. sql/reporting_aws_hcs_daily_summary.sql
    sql_file = f"sql/reporting_{self._provider.lower()}_hcs_daily_summary.sql"

    with HCSReportDBAccessor(self._schema_name) as accessor:
        for date in date_range(start_date, end_date, step=1):
            accessor.get_hcs_daily_summary(
                date, self._provider, self._provider_uuid, sql_file, self._tracing_id, finalize
            )
20 changes: 16 additions & 4 deletions koku/hcs/database/report_db_accessor.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,14 @@ def __init__(self, schema):
self.date_accessor = DateAccessor()
self.jinja_sql = JinjaSql()

def get_hcs_daily_summary(self, date, provider, provider_uuid, sql_summary_file, tracing_id):
def get_hcs_daily_summary(self, date, provider, provider_uuid, sql_summary_file, tracing_id, finalize=False):
"""Build HCS daily report.
:param date (datetime.date) The date to process
:param provider (str) The provider name
:param provider_uuid (uuid) ID for cost source
:param sql_summary_file (str) The sql file used for processing
:param tracing_id (id) Logging identifier
:param finalize (bool) Set True when report is finalized(default=False)
:returns (None)
"""
Expand All @@ -53,6 +54,11 @@ def get_hcs_daily_summary(self, date, provider, provider_uuid, sql_summary_file,
try:
sql = pkgutil.get_data("hcs.database", sql_summary_file)
sql = sql.decode("utf-8")
table = HCS_TABLE_MAP.get(provider.strip("-local"))

if not self.table_exists_trino(table):
LOG.info(log_json(tracing_id, f"{table} does not exist, skipping..."))
return {}

sql_params = {
"provider_uuid": provider_uuid,
Expand All @@ -61,8 +67,9 @@ def get_hcs_daily_summary(self, date, provider, provider_uuid, sql_summary_file,
"date": date,
"schema": self.schema,
"ebs_acct_num": self._ebs_acct_num,
"table": HCS_TABLE_MAP.get(provider),
"table": table,
}

LOG.debug(log_json(tracing_id, f"SQL params: {sql_params}"))

sql, sql_params = self.jinja_sql.prepare_query(sql, sql_params)
Expand All @@ -77,9 +84,14 @@ def get_hcs_daily_summary(self, date, provider, provider_uuid, sql_summary_file,
if len(data) > 0:
LOG.info(log_json(tracing_id, f"data found for date: {date}"))
csv_handler = CSVFileHandler(self.schema, provider, provider_uuid)
csv_handler.write_csv_to_s3(date, data, cols, tracing_id)
csv_handler.write_csv_to_s3(date, data, cols, finalize, tracing_id)
else:
LOG.info(log_json(tracing_id, f"no data found for date: {date}"))
LOG.info(
log_json(
tracing_id,
f"no data found for date: {date}, " f"provider: {provider}, provider_uuid: {provider_uuid}",
)
)

except FileNotFoundError:
LOG.error(log_json(tracing_id, f"unable to locate SQL file: {sql_summary_file}"))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
-- HCS daily summary query for an AWS source (rendered via JinjaSql, run on Trino/Hive).
-- Selects every line item for the given source and single day that was billed
-- through 'AWS Marketplace' with a Red Hat legal entity, appending the EBS
-- account id as an extra column.
-- NOTE(review): all parameters are marked |sqlsafe, i.e. interpolated verbatim
-- with no escaping — safe only because values are internally supplied; confirm
-- none of them can ever carry user-controlled input.
SELECT *, '{{ebs_acct_num | sqlsafe}}' as ebs_account_id
FROM hive.{{schema | sqlsafe}}.{{table | sqlsafe}}
WHERE source = '{{provider_uuid | sqlsafe}}'
AND year = '{{year | sqlsafe}}'
AND month = '{{month | sqlsafe}}'
AND bill_billingentity = 'AWS Marketplace'
AND lineitem_legalentity like '%Red Hat%'
-- Half-open one-day window: [date, date + 1 day)
AND lineitem_usagestartdate >= TIMESTAMP '{{date | sqlsafe}}'
AND lineitem_usagestartdate < date_add('day', 1, TIMESTAMP '{{date | sqlsafe}}')
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
-- HCS daily summary query for an Azure source (rendered via JinjaSql, run on Trino/Hive).
-- Selects every row for the given source and single day whose publisher type is
-- 'Marketplace' with a Red Hat publisher name, appending the EBS account id as
-- an extra column.
-- coalesce(date, usagedatetime): newer Azure exports use "date", older ones
-- "usagedatetime" — whichever column is populated drives the day filter.
-- NOTE(review): all parameters are |sqlsafe (verbatim interpolation, no
-- escaping) — acceptable only for trusted, internally supplied values.
SELECT *, '{{ebs_acct_num | sqlsafe}}' as ebs_account_id
FROM hive.{{schema | sqlsafe}}.{{table | sqlsafe}}
WHERE source = '{{provider_uuid | sqlsafe}}'
AND year = '{{year | sqlsafe}}'
AND month = '{{month | sqlsafe}}'
AND publishertype = 'Marketplace'
AND publishername like '%Red Hat%'
-- Half-open one-day window: [date, date + 1 day)
AND coalesce(date, usagedatetime) >= TIMESTAMP '{{date | sqlsafe}}'
AND coalesce(date, usagedatetime) < date_add('day', 1, TIMESTAMP '{{date | sqlsafe}}')
78 changes: 60 additions & 18 deletions koku/hcs/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,12 @@
LOG = logging.getLogger(__name__)

# Dedicated Celery queue for HCS (Hybrid Committed Spend) tasks.
HCS_QUEUE = "hcs"
# Provider types handled by HCS processing — the real AWS/Azure types plus
# their "-local" development variants.
# NOTE(review): "EXCEPTED" reads as "excluded" but these are the providers
# that ARE processed (see the membership check in collect_hcs_report_data);
# "accepted"/"supported" appears to be the intent — confirm before renaming.
HCS_EXCEPTED_PROVIDERS = (
    Provider.PROVIDER_AWS,
    Provider.PROVIDER_AWS_LOCAL,
    Provider.PROVIDER_AZURE,
    Provider.PROVIDER_AZURE_LOCAL,
)

# any additional queues should be added to this list
QUEUE_LIST = [HCS_QUEUE]
Expand All @@ -44,7 +50,6 @@ def collect_hcs_report_data_from_manifest(reports_to_hcs_summarize):
Returns:
None
"""
reports = [report for report in reports_to_hcs_summarize if report]
reports_deduplicated = [dict(t) for t in {tuple(d.items()) for d in reports}]
Expand All @@ -53,7 +58,7 @@ def collect_hcs_report_data_from_manifest(reports_to_hcs_summarize):
start_date = None
end_date = None
if report.get("start") and report.get("end"):
LOG.info("using start and end dates from the manifest")
LOG.info("using start and end dates from the manifest for HCS processing")
start_date = parser.parse(report.get("start")).strftime("%Y-%m-%d")
end_date = parser.parse(report.get("end")).strftime("%Y-%m-%d")

Expand All @@ -63,37 +68,35 @@ def collect_hcs_report_data_from_manifest(reports_to_hcs_summarize):
tracing_id = report.get("tracing_id", report.get("manifest_uuid", str(uuid.uuid4())))

stmt = (
f"[collect_hcs_report_data_from_manifest] schema_name: {schema_name},"
f"[collect_hcs_report_data_from_manifest]:"
f" schema_name: {schema_name},"
f"provider_type: {provider_type},"
f"provider_uuid: {provider_uuid},"
f"start: {start_date},"
f"end: {end_date}"
)
LOG.debug(log_json(tracing_id, stmt))
LOG.info(log_json(tracing_id, stmt))

collect_hcs_report_data.s(
schema_name, provider_type, provider_uuid, start_date, end_date, tracing_id
).apply_async()


@celery_app.task(name="hcs.tasks.collect_hcs_report_data", queue=HCS_QUEUE)
def collect_hcs_report_data(schema_name, provider, provider_uuid, start_date=None, end_date=None, tracing_id=None):
def collect_hcs_report_data(
schema_name, provider, provider_uuid, start_date=None, end_date=None, tracing_id=None, finalize=False
):
"""Update Hybrid Committed Spend report.
:param provider: (str) The provider type
:param provider_uuid: (str) The provider type
:param provider_uuid: (str) The provider unique identification number
:param start_date: The date to start populating the table (default: (Today - 2 days))
:param end_date: The date to end on (default: Today)
:param schema_name: (Str) db schema name
:param tracing_id: (uuid) for log tracing
:param finalize: (boolean) If True run report finalization process for previous month(default: False)
:returns None
"""

# drop "-local" from provider name when in development environment
if "-local" in provider:
LOG.debug(log_json(tracing_id, "dropping '-local' from provider name"))
provider = provider.strip("-local")

if schema_name and not schema_name.startswith("acct"):
schema_name = f"acct{schema_name}"

Expand All @@ -106,24 +109,63 @@ def collect_hcs_report_data(schema_name, provider, provider_uuid, start_date=Non
if tracing_id is None:
tracing_id = str(uuid.uuid4())

if enable_hcs_processing(schema_name) and provider in (Provider.PROVIDER_AWS, Provider.PROVIDER_AZURE):
if enable_hcs_processing(schema_name) and provider in HCS_EXCEPTED_PROVIDERS:
stmt = (
f"Running HCS data collection: "
f"[collect_hcs_report_data]: "
f"schema_name: {schema_name}, "
f"provider_uuid: {provider_uuid}, "
f"provider: {provider}, "
f"provider_type: {provider}, "
f"dates {start_date} - {end_date}"
)
LOG.info(log_json(tracing_id, stmt))
reporter = ReportHCS(schema_name, provider, provider_uuid, tracing_id)
reporter.generate_report(start_date, end_date)
reporter.generate_report(start_date, end_date, finalize)

else:
stmt = (
f"[SKIPPED] HCS report generation: "
f"Schema-name: {schema_name}, "
f"provider: {provider}, "
f"Schema_name: {schema_name}, "
f"provider_type: {provider}, "
f"provider_uuid: {provider_uuid}, "
f"dates {start_date} - {end_date}"
)
LOG.info(log_json(tracing_id, stmt))


@celery_app.task(name="hcs.tasks.collect_hcs_report_finalization", queue=HCS_QUEUE)
def collect_hcs_report_finalization(tracing_id=None):
    """Queue HCS report finalization for the previous calendar month.

    For every provider whose type is in HCS_EXCEPTED_PROVIDERS, schedule a
    collect_hcs_report_data task spanning the whole previous month with
    finalize=True.

    :param tracing_id: (uuid) identifier for log tracing; generated when omitted
    :returns None
    """
    if tracing_id is None:
        tracing_id = str(uuid.uuid4())

    today = DateAccessor().today()
    # The finalization window (the previous calendar month) is the same for
    # every provider, so compute it once instead of inside the nested loops.
    end_date_prev_month = today.replace(day=1) - datetime.timedelta(days=1)
    start_date_prev_month = end_date_prev_month.replace(day=1)

    for excepted_provider in HCS_EXCEPTED_PROVIDERS:
        LOG.debug(log_json(tracing_id, f"excepted_provider: {excepted_provider}"))

        for provider in Provider.objects.filter(type=excepted_provider).all():
            schema_name = provider.customer.schema_name
            provider_uuid = provider.uuid
            provider_type = provider.type

            stmt = (
                f"[collect_hcs_report_finalization]: "
                f"schema_name: {schema_name}, "
                f"provider_type: {provider_type}, "
                f"provider_uuid: {provider_uuid}, "
                f"dates: {start_date_prev_month} - {end_date_prev_month}"
            )
            LOG.info(log_json(tracing_id, stmt))

            collect_hcs_report_data.s(
                schema_name,
                provider_type,
                provider_uuid,
                start_date_prev_month,
                end_date_prev_month,
                tracing_id,
                True,
            ).apply_async()
28 changes: 22 additions & 6 deletions koku/hcs/test/test_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ def test_get_report_dates(self, mock_report):
end_date = self.today
collect_hcs_report_data(self.schema, self.provider, self.provider_uuid, start_date, end_date)

self.assertIn("Running HCS data collection", _logs.output[0])
self.assertIn("[collect_hcs_report_data]", _logs.output[0])

def test_get_report_no_start_date(self, mock_report):
"""Test no start or end dates provided"""
Expand All @@ -52,7 +52,7 @@ def test_get_report_no_start_date(self, mock_report):
with self.assertLogs("hcs.tasks", "INFO") as _logs:
collect_hcs_report_data(self.schema, self.provider, self.provider_uuid)

self.assertIn("Running HCS data collection", _logs.output[0])
self.assertIn("[collect_hcs_report_data]", _logs.output[0])

def test_get_report_no_end_date(self, mock_report):
"""Test no start end provided"""
Expand All @@ -62,7 +62,7 @@ def test_get_report_no_end_date(self, mock_report):
start_date = self.yesterday
collect_hcs_report_data(self.schema, self.provider, self.provider_uuid, start_date)

self.assertIn("Running HCS data collection", _logs.output[0])
self.assertIn("[collect_hcs_report_data]", _logs.output[0])

def test_get_report_invalid_provider(self, mock_report):
"""Test invalid provider"""
Expand Down Expand Up @@ -98,7 +98,7 @@ def test_get_report_with_manifest(self, mock_report, rd):
}
]

with self.assertLogs("hcs.tasks", "DEBUG") as _logs:
with self.assertLogs("hcs.tasks", "INFO") as _logs:
collect_hcs_report_data_from_manifest(manifests)

self.assertIn("[collect_hcs_report_data_from_manifest]", _logs.output[0])
Expand All @@ -110,7 +110,7 @@ def test_get_report_with_manifest(self, mock_report, rd):

@patch("hcs.tasks.collect_hcs_report_data")
def test_get_report_with_manifest_and_dates(self, mock_report, rd):
"""Test invalid provider"""
"""Test HCS reports using manifest"""
from hcs.tasks import collect_hcs_report_data_from_manifest

manifests = [
Expand All @@ -125,5 +125,21 @@ def test_get_report_with_manifest_and_dates(self, mock_report, rd):

with self.assertLogs("hcs.tasks", "INFO") as _logs:
collect_hcs_report_data_from_manifest(manifests)
self.assertIn("using start and end dates from the manifest for HCS processing", _logs.output[0])

@patch("hcs.tasks.collect_hcs_report_data")
@patch("api.provider.models")
def test_get_collect_hcs_report_finalization(self, mock_report, rd, provider):
"""Test hcs finalization"""
from hcs.tasks import collect_hcs_report_finalization

provider.customer.schema_name.return_value = provider(side_effect=Provider.objects.filter(type="AWS"))

with self.assertLogs("hcs.tasks", "INFO") as _logs:
collect_hcs_report_finalization()

self.assertIn("using start and end dates from the manifest", _logs.output[0])
self.assertIn("[collect_hcs_report_finalization]:", _logs.output[0])
self.assertIn("schema_name:", _logs.output[0])
self.assertIn("provider_type:", _logs.output[0])
self.assertIn("provider_uuid:", _logs.output[0])
self.assertIn("dates:", _logs.output[0])
7 changes: 6 additions & 1 deletion koku/koku/celery.py
Original file line number Diff line number Diff line change
Expand Up @@ -215,8 +215,13 @@ def readiness_check(self):
"schedule": crontab(hour=0, minute=0),
}

# Beat used for HCS report finalization
# crontab's positional args are (minute, hour): fire at 00:00 on the 15th of
# every month (in the Celery app's configured timezone — not visible here,
# confirm whether that is UTC).
app.conf.beat_schedule["finalize_hcs_reports"] = {
    "task": "hcs.tasks.collect_hcs_report_finalization",
    "schedule": crontab(0, 0, day_of_month="15"),
}

# Celery timeout if broker is unavaiable to avoid blocking indefintely
# Celery timeout if broker is unavailable to avoid blocking indefinitely
app.conf.broker_transport_options = {"max_retries": 4, "interval_start": 0, "interval_step": 0.5, "interval_max": 3}

app.autodiscover_tasks()
Expand Down
Loading

0 comments on commit 8a245a1

Please sign in to comment.