Skip to content

Commit

Permalink
DB Performance: Updates and bugfixes and new features (project-koku#3784
Browse files Browse the repository at this point in the history
)

* Updates and bugfixes and new features

* include public and template0 in sizes page

* Fix test args
  • Loading branch information
Red-HAP authored Sep 28, 2022
1 parent f0fcf4c commit cb25cb0
Show file tree
Hide file tree
Showing 11 changed files with 645 additions and 211 deletions.
12 changes: 12 additions & 0 deletions .github/postgres/docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,15 @@ services:
- autovacuum_vacuum_cost_delay=10
- -c
- max_locks_per_transaction=72
- -c
- shared_preload_libraries=pg_stat_statements
- -c
- pg_stat_statements.max=2000
- -c
- pg_stat_statements.save=off
- -c
- pg_stat_statements.track_utility=off
- -c
- track_activity_query_size=2048
- -c
- track_functions=pl
225 changes: 120 additions & 105 deletions koku/masu/api/db_performance/db_performance.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,13 @@


class DBPerformanceStats:
def __init__(self, username, configurator, application_name="database_performance_stats", database_ranking=[]):
def __init__(self, username, configurator, application_name="database_performance_stats"):
self.conn = None
self.username = username
self.config = configurator
self.app_db_name = configurator.get_database_name()
self.application_name = application_name
self.read_only = True
self.database_ranking = database_ranking
self._connect()

def __enter__(self):
Expand Down Expand Up @@ -90,24 +90,17 @@ def _execute(self, sql, params=None):
def _prep_log_message(self, message):
return f"USER:{self.username} {message}"

def _case_db_ordering_clause(self, database_name_col):
if not self.database_ranking:
return ("", {})

params = {}
case = [f"case {database_name_col}"]
for ix, dbname in enumerate(self.database_ranking):
ix_str = str(ix)
dbval_key = f"db_val_{ix_str}"
dbrank_key = f"db_rank_{ix_str}"
params[dbval_key] = dbname
params[dbrank_key] = ix_str
case.append(f" when %({dbval_key})s then %({dbrank_key})s")
case.append(" else %(def_case_val)s")
params["def_case_val"] = str(len(self.database_ranking))
case.append(f"end::text || {database_name_col}")

return (" ".join(case), params)
def get_databases(self):
sql = """
select oid,
datname
from pg_database
where datname !~ '^template'
order
by datname
;
"""
return self._execute(sql).fetchall()

def get_pg_settings(self, setting_names=None):
params = {}
Expand Down Expand Up @@ -201,7 +194,7 @@ def _validate_pg_stat_statements(self):
ext_version = extn["extversion"] if ext_exists else None
return (ext_exists, ext_version)

def get_statement_stats(self, limit=500, offset=None, records_per_db=100):
def get_statement_stats(self, dbname, limit=100, offset=None):
params = {}

has_pss, pss_ver = self._validate_pg_stat_statements()
Expand All @@ -211,65 +204,46 @@ def get_statement_stats(self, limit=500, offset=None, records_per_db=100):
limit_clause = self._handle_limit(limit, params)
offset_clause = self._handle_offset(offset, params)
col_name_sep = "_" if Decimal(pss_ver) < Decimal("1.8") else "_exec_"
rank_case, rank_params = self._case_db_ordering_clause("d.datname")
partition = f"partition by {rank_case}" if rank_case else ""
params.update(rank_params)
params["records_per_db"] = records_per_db
params["dbname"] = dbname
sql = f"""
-- STATEMENT STATISTICS
select "dbname",
"user",
calls,
rows,
min_exec_time,
mean_exec_time,
max_exec_time,
shared_blks_hit,
shared_blks_read,
local_blks_hit,
local_blks_read,
temp_blks_read,
temp_blks_written,
query
from (
select row_number() over ({partition}) as "rec_by_db",
d.datname as "dbname",
r.rolname as "user",
s.calls,
s.rows,
s.min{col_name_sep}time as min_exec_time,
s.mean{col_name_sep}time as mean_exec_time,
s.max{col_name_sep}time as max_exec_time,
s.shared_blks_hit,
s.shared_blks_read,
s.local_blks_hit,
s.local_blks_read,
s.temp_blks_read,
s.temp_blks_written,
s.query
from public.pg_stat_statements s
join pg_database d
on d.oid = s.dbid
join pg_roles r
on r.oid = s.userid
where s.dbid is not null
and s.userid is not null
) enumerated_query_stats
where "rec_by_db" <= %(records_per_db)s
select d.datname as "dbname",
r.rolname as "user",
s.calls,
s.rows,
s.min{col_name_sep}time as min_exec_time,
s.mean{col_name_sep}time as mean_exec_time,
s.max{col_name_sep}time as max_exec_time,
s.shared_blks_hit,
s.shared_blks_read,
s.local_blks_hit,
s.local_blks_read,
s.temp_blks_read,
s.temp_blks_written,
s.query
from public.pg_stat_statements s
join pg_database d
on d.oid = s.dbid
join pg_roles r
on r.oid = s.userid
where d.datname = %(dbname)s
and s.userid is not null
{limit_clause}
{offset_clause}
;
"""
LOG.info(self._prep_log_message("requesting data from pg_stat_statements"))
return self._execute(sql, params).fetchall()

def get_lock_info(self, limit=500, offset=None):
def get_lock_info(self, dbname, limit=25, offset=None):
params = {}
limit_clause = self._handle_limit(limit, params)
offset_clause = self._handle_offset(offset, params)
params["dbname"] = dbname
sql = f"""
-- LOCK INFO QUERY
SELECT blocking_locks.pid::int AS blocking_pid,
SELECT count(blocking_locks.pid) over (partition by blocking_locks.pid) as blocking_pid_weight,
blocking_locks.pid::int AS blocking_pid,
blocking_activity.usename::text AS blocking_user,
blocked_locks.pid::int AS blocked_pid,
blocked_activity.usename::text AS blocked_user,
Expand All @@ -293,6 +267,10 @@ def get_lock_info(self, limit=500, offset=None):
JOIN pg_catalog.pg_stat_activity blocking_activity
ON blocking_activity.pid = blocking_locks.pid
WHERE NOT blocked_locks.granted
AND blocking_activity.datname = %(dbname)s
ORDER
BY "blocking_pid_weight" desc,
blocking_locks.pid
{limit_clause}
{offset_clause}
;
Expand All @@ -302,63 +280,41 @@ def get_lock_info(self, limit=500, offset=None):

return res

def get_activity(self, pid=[], state=[], include_self=False, limit=500, offset=None, records_per_db=100):
def get_activity(self, dbname, pid=[], state=[], limit=100, offset=None):
params = {}

conditions = ["datname is not null", "usename is not null"]
params["dbname"] = dbname
conditions = ["datname = %(dbname)s", "usename is not null"]
if pid:
include_self = True
if not isinstance(pid, list):
pid = [pid]
conditions.append("pid = any(%(pid)s::oid[]) ")
params["pid"] = pid
if state:
if not isinstance(state, list):
state = [state]
conditions.append("state = any(%(state)s::text[]) ")
params["state"] = state
if not include_self:
conditions.append("pid != %(mypid)s ")
params["mypid"] = self.conn.get_backend_pid()
where_clause = f" where {f'{os.linesep} and '.join(conditions)}"
where_clause = f" where {f'{os.linesep} and '.join(conditions)}"
limit_clause = self._handle_limit(limit, params)
offset_clause = self._handle_offset(offset, params)
rank_case, rank_params = self._case_db_ordering_clause("datname")
rank_sep = "," if rank_case else ""
params.update(rank_params)
params["records_per_db"] = records_per_db
sql = f"""
-- CONNECTION ACTIVITY QUERY
select "dbname",
"user",
"backend_pid",
"app_name",
"client_ip",
select datname as "dbname",
usename as "user",
pid as "backend_pid",
application_name as "app_name",
client_addr as "client_ip",
backend_start,
xact_start,
query_start,
state_change,
"wait_type_event",
'('::text || wait_event_type || ') '::text || wait_event as "wait_type_event",
state,
"active_time",
case when state = 'active' then now() - query_start else null end::text as "active_time",
query
from (
select row_number() over (partition by case state when 'active' then '0' when 'idle in transaction' then '1' else '2' end::text || state{rank_sep}
{rank_case}
order by coalesce(extract(epoch from now() - query_start), 0) desc) as "rec_by_db",
datname as "dbname",
usename as "user",
pid as "backend_pid",
application_name as "app_name",
client_addr as "client_ip",
backend_start,
xact_start,
query_start,
state_change,
'('::text || wait_event_type || ') '::text || wait_event as "wait_type_event",
state,
case when state = 'active' then now() - query_start else null end::text as "active_time",
query
from pg_stat_activity
{where_clause}
) enumerated_stats
where rec_by_db <= %(records_per_db)s
from pg_stat_activity
{where_clause}
{limit_clause}
{offset_clause}
;
Expand Down Expand Up @@ -392,6 +348,65 @@ def explain_sql(self, raw_sql):

return res

def get_schema_sizes(self, top=0, limit=100, offset=None):
# This will ONLY run against the primary app database at this time.
params = {"db_gb": Decimal(1024**3), "top": top}
limit_clause = self._handle_limit(limit, params)
offset_clause = self._handle_offset(offset, params)
if top > 0:
LOG.debug(f"Getting schema size and top {top} tables")
sql = f"""
select schema_name,
table_name,
round(table_size::numeric / %(db_gb)s::numeric, 10) as "table_size_gb",
round(schema_size::numeric / %(db_gb)s::numeric, 10) as "schema_size_gb"
from (
select row_number() over (partition by n.oid order by pg_total_relation_size(t.oid) desc) as rownum,
n.nspname as "schema_name",
t.relname as "table_name",
pg_total_relation_size(t.oid) as "table_size",
sum(pg_total_relation_size(t.oid)) over (partition by n.nspname) as "schema_size"
from pg_namespace n
join pg_class t
on t.relnamespace = n.oid
and t.relkind in ('r', 'p')
where n.nspname not in ('pg_catalog','information_schema')
) as raw_object_sizes
where "rownum" <= %(top)s
order
by "schema_size_gb" desc,
schema_name,
"table_size_gb" desc,
table_name
{limit_clause}
{offset_clause}
;
"""
else:
LOG.debug("Getting schema size only")
sql = f"""
select schema_name,
round(schema_size::numeric / %(db_gb)s::numeric, 10) as "schema_size_gb"
from (
select n.nspname as "schema_name",
sum(pg_total_relation_size(t.oid)) as schema_size
from pg_namespace n
join pg_class t
on t.relnamespace = n.oid
and t.relkind in ('r', 'p')
where n.nspname not in ('pg_catalog','information_schema')
group
by n.nspname
) as raw_object_sizes
order
by "schema_size_gb" desc,
schema_name
{limit_clause}
{offset_clause}
;
"""
return self._execute(sql, params).fetchall()

# def terminate_cancel_backends(self, backends=[], action_type=None):
# if not backends:
# return None
Expand Down
Loading

0 comments on commit cb25cb0

Please sign in to comment.