diff --git a/.github/postgres/docker-compose.yaml b/.github/postgres/docker-compose.yaml index ed6dcb2631..cc08edbe4d 100644 --- a/.github/postgres/docker-compose.yaml +++ b/.github/postgres/docker-compose.yaml @@ -21,3 +21,15 @@ services: - autovacuum_vacuum_cost_delay=10 - -c - max_locks_per_transaction=72 + - -c + - shared_preload_libraries=pg_stat_statements + - -c + - pg_stat_statements.max=2000 + - -c + - pg_stat_statements.save=off + - -c + - pg_stat_statements.track_utility=off + - -c + - track_activity_query_size=2048 + - -c + - track_functions=pl diff --git a/koku/masu/api/db_performance/db_performance.py b/koku/masu/api/db_performance/db_performance.py index f68da2f6c9..cc644596b5 100644 --- a/koku/masu/api/db_performance/db_performance.py +++ b/koku/masu/api/db_performance/db_performance.py @@ -27,13 +27,13 @@ class DBPerformanceStats: - def __init__(self, username, configurator, application_name="database_performance_stats", database_ranking=[]): + def __init__(self, username, configurator, application_name="database_performance_stats"): self.conn = None self.username = username self.config = configurator + self.app_db_name = configurator.get_database_name() self.application_name = application_name self.read_only = True - self.database_ranking = database_ranking self._connect() def __enter__(self): @@ -90,24 +90,17 @@ def _execute(self, sql, params=None): def _prep_log_message(self, message): return f"USER:{self.username} {message}" - def _case_db_ordering_clause(self, database_name_col): - if not self.database_ranking: - return ("", {}) - - params = {} - case = [f"case {database_name_col}"] - for ix, dbname in enumerate(self.database_ranking): - ix_str = str(ix) - dbval_key = f"db_val_{ix_str}" - dbrank_key = f"db_rank_{ix_str}" - params[dbval_key] = dbname - params[dbrank_key] = ix_str - case.append(f" when %({dbval_key})s then %({dbrank_key})s") - case.append(" else %(def_case_val)s") - params["def_case_val"] = str(len(self.database_ranking)) - case.append(f"end::text || {database_name_col}") - - return (" ".join(case), params) + def get_databases(self): + sql = """ +select oid, + datname + from pg_database + where datname !~ '^template' + order + by datname +; +""" + return self._execute(sql).fetchall() def get_pg_settings(self, setting_names=None): params = {} @@ -201,7 +194,7 @@ def _validate_pg_stat_statements(self): ext_version = extn["extversion"] if ext_exists else None return (ext_exists, ext_version) - def get_statement_stats(self, limit=500, offset=None, records_per_db=100): + def get_statement_stats(self, dbname, limit=100, offset=None): params = {} has_pss, pss_ver = self._validate_pg_stat_statements() @@ -211,51 +204,30 @@ def get_statement_stats(self, limit=500, offset=None, records_per_db=100): limit_clause = self._handle_limit(limit, params) offset_clause = self._handle_offset(offset, params) col_name_sep = "_" if Decimal(pss_ver) < Decimal("1.8") else "_exec_" - rank_case, rank_params = self._case_db_ordering_clause("d.datname") - partition = f"partition by {rank_case}" if rank_case else "" - params.update(rank_params) - params["records_per_db"] = records_per_db + params["dbname"] = dbname sql = f""" -- STATEMENT STATISTICS -select "dbname", - "user", - calls, - rows, - min_exec_time, - mean_exec_time, - max_exec_time, - shared_blks_hit, - shared_blks_read, - local_blks_hit, - local_blks_read, - temp_blks_read, - temp_blks_written, - query - from ( - select row_number() over ({partition}) as "rec_by_db", - d.datname as "dbname", - r.rolname as "user", - s.calls, - s.rows, - s.min{col_name_sep}time as min_exec_time, - s.mean{col_name_sep}time as mean_exec_time, - s.max{col_name_sep}time as max_exec_time, - s.shared_blks_hit, - s.shared_blks_read, - s.local_blks_hit, - s.local_blks_read, - s.temp_blks_read, - s.temp_blks_written, - s.query - from public.pg_stat_statements s - join pg_database d - on d.oid = s.dbid - join pg_roles r - on r.oid = s.userid - where s.dbid is not null - and s.userid is not null - ) enumerated_query_stats -where "rec_by_db" <= %(records_per_db)s +select d.datname as "dbname", + r.rolname as "user", + s.calls, + s.rows, + s.min{col_name_sep}time as min_exec_time, + s.mean{col_name_sep}time as mean_exec_time, + s.max{col_name_sep}time as max_exec_time, + s.shared_blks_hit, + s.shared_blks_read, + s.local_blks_hit, + s.local_blks_read, + s.temp_blks_read, + s.temp_blks_written, + s.query + from public.pg_stat_statements s + join pg_database d + on d.oid = s.dbid + join pg_roles r + on r.oid = s.userid + where d.datname = %(dbname)s + and s.userid is not null {limit_clause} {offset_clause} ; @@ -263,13 +235,15 @@ def get_statement_stats(self, limit=500, offset=None, records_per_db=100): LOG.info(self._prep_log_message("requesting data from pg_stat_statements")) return self._execute(sql, params).fetchall() - def get_lock_info(self, limit=500, offset=None): + def get_lock_info(self, dbname, limit=25, offset=None): params = {} limit_clause = self._handle_limit(limit, params) offset_clause = self._handle_offset(offset, params) + params["dbname"] = dbname sql = f""" -- LOCK INFO QUERY -SELECT blocking_locks.pid::int AS blocking_pid, +SELECT count(blocking_locks.pid) over (partition by blocking_locks.pid) as blocking_pid_weight, + blocking_locks.pid::int AS blocking_pid, blocking_activity.usename::text AS blocking_user, blocked_locks.pid::int AS blocked_pid, blocked_activity.usename::text AS blocked_user, @@ -293,6 +267,10 @@ def get_lock_info(self, limit=500, offset=None): JOIN pg_catalog.pg_stat_activity blocking_activity ON blocking_activity.pid = blocking_locks.pid WHERE NOT blocked_locks.granted + AND blocking_activity.datname = %(dbname)s + ORDER + BY "blocking_pid_weight" desc, + blocking_locks.pid {limit_clause} {offset_clause} ; @@ -302,63 +280,41 @@ def get_lock_info(self, limit=500, offset=None): return res - def get_activity(self, pid=[], state=[], include_self=False, limit=500, offset=None, records_per_db=100): + def get_activity(self, dbname, pid=[], state=[], limit=100, offset=None): params = {} - conditions = ["datname is not null", "usename is not null"] + params["dbname"] = dbname + conditions = ["datname = %(dbname)s", "usename is not null"] if pid: - include_self = True + if not isinstance(pid, list): + pid = [pid] conditions.append("pid = any(%(pid)s::oid[]) ") params["pid"] = pid if state: + if not isinstance(state, list): + state = [state] conditions.append("state = any(%(state)s::text[]) ") params["state"] = state - if not include_self: - conditions.append("pid != %(mypid)s ") - params["mypid"] = self.conn.get_backend_pid() - where_clause = f" where {f'{os.linesep} and '.join(conditions)}" + where_clause = f" where {f'{os.linesep} and '.join(conditions)}" limit_clause = self._handle_limit(limit, params) offset_clause = self._handle_offset(offset, params) - rank_case, rank_params = self._case_db_ordering_clause("datname") - rank_sep = "," if rank_case else "" - params.update(rank_params) - params["records_per_db"] = records_per_db sql = f""" -- CONNECTION ACTIVITY QUERY -select "dbname", - "user", - "backend_pid", - "app_name", - "client_ip", +select datname as "dbname", + usename as "user", + pid as "backend_pid", + application_name as "app_name", + client_addr as "client_ip", backend_start, xact_start, query_start, state_change, - "wait_type_event", + '('::text || wait_event_type || ') '::text || wait_event as "wait_type_event", state, - "active_time", + case when state = 'active' then now() - query_start else null end::text as "active_time", query - from ( - select row_number() over (partition by case state when 'active' then '0' when 'idle in transaction' then '1' else '2' end::text || state{rank_sep} - {rank_case} - order by coalesce(extract(epoch from now() - query_start), 0) desc) as "rec_by_db", - datname as "dbname", - usename as "user", - pid as "backend_pid", - application_name as "app_name", - client_addr as "client_ip", - backend_start, - xact_start, - query_start, - state_change, - '('::text || wait_event_type || ') '::text || wait_event as "wait_type_event", - state, - case when state = 'active' then now() - query_start else null end::text as "active_time", - query - from pg_stat_activity - {where_clause} - ) enumerated_stats - where rec_by_db <= %(records_per_db)s + from pg_stat_activity +{where_clause} {limit_clause} {offset_clause} ; @@ -392,6 +348,65 @@ def explain_sql(self, raw_sql): return res + def get_schema_sizes(self, top=0, limit=100, offset=None): + # This will ONLY run against the primary app database at this time. + params = {"db_gb": Decimal(1024**3), "top": top} + limit_clause = self._handle_limit(limit, params) + offset_clause = self._handle_offset(offset, params) + if top > 0: + LOG.debug(f"Getting schema size and top {top} tables") + sql = f""" +select schema_name, + table_name, + round(table_size::numeric / %(db_gb)s::numeric, 10) as "table_size_gb", + round(schema_size::numeric / %(db_gb)s::numeric, 10) as "schema_size_gb" + from ( + select row_number() over (partition by n.oid order by pg_total_relation_size(t.oid) desc) as rownum, + n.nspname as "schema_name", + t.relname as "table_name", + pg_total_relation_size(t.oid) as "table_size", + sum(pg_total_relation_size(t.oid)) over (partition by n.nspname) as "schema_size" + from pg_namespace n + join pg_class t + on t.relnamespace = n.oid + and t.relkind in ('r', 'p') + where n.nspname not in ('pg_catalog','information_schema') + ) as raw_object_sizes + where "rownum" <= %(top)s + order + by "schema_size_gb" desc, + schema_name, + "table_size_gb" desc, + table_name +{limit_clause} +{offset_clause} +; +""" + else: + LOG.debug("Getting schema size only") + sql = f""" +select schema_name, + round(schema_size::numeric / %(db_gb)s::numeric, 10) as "schema_size_gb" + from ( + select n.nspname as "schema_name", + sum(pg_total_relation_size(t.oid)) as schema_size + from pg_namespace n + join pg_class t + on t.relnamespace = n.oid + and t.relkind in ('r', 'p') + where n.nspname not in ('pg_catalog','information_schema') + group + by n.nspname + ) as raw_object_sizes + order + by "schema_size_gb" desc, + schema_name +{limit_clause} +{offset_clause} +; +""" + return self._execute(sql, params).fetchall() + # def terminate_cancel_backends(self, backends=[], action_type=None): # if not backends: # return None diff --git a/koku/masu/api/db_performance/dbp_views.py b/koku/masu/api/db_performance/dbp_views.py index 7b45cdad9b..37e209194a 100644 --- a/koku/masu/api/db_performance/dbp_views.py +++ b/koku/masu/api/db_performance/dbp_views.py @@ -5,7 +5,9 @@ import json import logging import os +import threading from decimal import Decimal +from urllib.parse import urlencode from django.http import HttpResponse from django.shortcuts import redirect @@ -29,17 +31,30 @@ MY_PATH = os.path.dirname(os.path.abspath(__file__)) TEMPLATE_PATH = os.path.join(MY_PATH, "templates") -DATABASE_RANKING = [CONFIGURATOR.get_database_name()] +DBNAME_LOCK = threading.Lock() +APPLICATION_DBNAME = CONFIGURATOR.get_database_name() +ALL_DATABASES = [] # CANCEL_URL = 0 # TERMINATE_URL = 1 +def get_database_list(dbps): + with DBNAME_LOCK: + if not ALL_DATABASES: + ALL_DATABASES.append(APPLICATION_DBNAME) + for rec in dbps.get_databases(): + if rec["datname"] != APPLICATION_DBNAME: + ALL_DATABASES.append(rec["datname"]) + + return ALL_DATABASES + + def get_limit_offset(request): try: - limit = int(request.query_params.get("limit", "500")) + limit = int(request.query_params.get("limit", "100")) except (TypeError, ValueError): - limit = 500 + limit = 100 try: offset = int(request.query_params.get("offset")) @@ -49,6 +64,10 @@ def get_limit_offset(request): return limit, offset +def get_selected_db(request): + return request.query_params.get("dbname", CONFIGURATOR.get_database_name()) + + def get_parameter_list(request, param_name, default=None, sep=","): qp = request.query_params @@ -87,6 +106,7 @@ def get_menu(curr_url_name): ("conn_activity", "Connection Activity"), ("stmt_stats", "Statement Statistics"), ("lock_info", "Lock Information"), + ("schema_sizes", "Schema Sizes"), ("explain_query", "Explain Query"), ) menu_elements = ['