Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature: If several clusters are saving to one primary be able identify them uniquely #17

Open
wants to merge 1 commit into
base: integration
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 56 additions & 2 deletions bin/pgaudit_analyze
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ pgaudit_analyze [options] <pg-log-path>
--socket-path socket directory used by PostgreSQL (default to system default directory)
--log-file location of the log file for pgaudit_analyze (defaults to /var/log/pgaudit_analyze.log)
--user specify postgres user instead of using pgaudit_analyze invoker
--cluster-name identifier to use if logging from multiple clusters (ie several replicas to one primary)

General Options:
--help display usage and exit
Expand Down Expand Up @@ -121,14 +122,15 @@ my $iPort = 5432;
my $strSocketPath;
my $strLogOutFile = '/var/log/pgaudit_analyze.log';
my $strDbUser = getpwuid($<);

my $strClusterName;

GetOptions ('help' => \$bHelp,
'daemon' => \$bDaemon,
'port=s' => \$iPort,
'socket-path=s' => \$strSocketPath,
'log-file=s' => \$strLogOutFile,
'user=s' => \$strDbUser)
'user=s' => \$strDbUser,
'cluster-name=s' => \$strClusterName)
or pod2usage(2);

# Display version and exit if requested
Expand Down Expand Up @@ -296,9 +298,60 @@ sub databaseGet
" audit_type, class, command, object_type, object_name)\n" .
" values (?, ?, ?, ?, ?, ?, ?, ?, ?)");

$oDbHash{$strDatabaseName}{hSqlClusterExists} = $oDbHash{$strDatabaseName}{hDb}->prepare(
"select 1 FROM pg_tables WHERE schemaname = 'pgaudit' AND tablename='cluster'" );

$oDbHash{$strDatabaseName}{hSqlClusterSelect} = $oDbHash{$strDatabaseName}{hDb}->prepare(
"select cluster_id \n" .
"from pgaudit.cluster \n" .
"where cluster_name = ?");

$oDbHash{$strDatabaseName}{hSqlClusterInsert} = $oDbHash{$strDatabaseName}{hDb}->prepare(
"insert into pgaudit.cluster (cluster_name) \n" .
"values (?)");
return true;
}

####################################################################################################################################
# clusterCheck
####################################################################################################################################
my %oClusterHash;

sub clusterCheck
{
my $strSessionId = shift;
my $strDatabaseName = shift;
if ($strClusterName)
{
if (!defined $oClusterHash{$strClusterName}) {
if (!defined $oClusterHash{$strClusterName})
{
if ($oDbHash{$strDatabaseName}{hSqlClusterExists}->execute() &&
$oDbHash{$strDatabaseName}{hSqlClusterExists}->fetchrow_array())
{
$oDbHash{$strDatabaseName}{hSqlClusterSelect}->execute($strClusterName);
($oClusterHash{$strClusterName}) = $oDbHash{$strDatabaseName}{hSqlClusterSelect}->fetchrow_array();
if (!defined $oClusterHash{$strClusterName})
{
print("Insert cluster_name '$strClusterName'\n");
$oDbHash{$strDatabaseName}{hSqlClusterInsert}->execute($strClusterName);
$oDbHash{$strDatabaseName}{hSqlClusterSelect}->execute($strClusterName);
($oClusterHash{$strClusterName}) = $oDbHash{$strDatabaseName}{hSqlClusterSelect}->fetchrow_array();
}
}
else
{#for backwards compatibilty for older pgaudit schema
print("Appears to be older version of schema, using cluster name '$strClusterName' as identifier in session_id\n");
# prepend '_' so will never collide with cluster_id if schema is upgraded in future
$oClusterHash{$strClusterName} = "_$strClusterName";
}
}
}
$strSessionId .= "." . $oClusterHash{$strClusterName};
}
return $strSessionId;
}

####################################################################################################################################
# sessionGet
####################################################################################################################################
Expand Down Expand Up @@ -692,6 +745,7 @@ while(!$bDone)

if (defined($strUserName) && $strAuditUserName ne $strUserName &&
defined($strDatabaseName) && databaseGet($strDatabaseName) &&
($strSessionId = clusterCheck($strSessionId, $strDatabaseName)) &&
(!defined($oSessionHash{$strSessionId}) || !defined($oSessionHash{$strSessionId}{session_line_num}) ||
$lSessionLineNum > $oSessionHash{$strSessionId}{session_line_num}))
{
Expand Down
41 changes: 32 additions & 9 deletions sql/audit.sql
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ begin
end $$;

-- Create pgaudit schema
create schema pgaudit authorization pgaudit_owner;
create schema if not exists pgaudit authorization pgaudit_owner;

-- Set session authorization so all schema objects are owned by pgaudit_owner
set session authorization pgaudit_owner;
Expand All @@ -57,8 +57,27 @@ grant usage
on schema pgaudit
to public;

-- Create cluster table to potentially track multiple clusters
create table if not exists pgaudit.cluster
(
cluster_id serial,
cluster_name text,
unique(cluster_name)
);

grant select,
insert,
update
on pgaudit.cluster
to pgaudit_etl;

grant select,
update
on pgaudit.cluster_cluster_id_seq
to pgaudit_etl;

-- Create session table to track user database sessions
create table pgaudit.session
create table if not exists pgaudit.session
(
session_id text not null,
process_id int not null,
Expand All @@ -80,7 +99,7 @@ grant select,
to pgaudit_etl;

-- Create logon table to track recent user login info
create table pgaudit.logon
create table if not exists pgaudit.logon
(
user_name text not null,
last_success timestamp with time zone,
Expand Down Expand Up @@ -122,7 +141,7 @@ $$ language plpgsql security definer;
grant execute on function pgaudit.logon_info() to public;

-- Create log_event table to track all events logged to the PostgreSQL log
create table pgaudit.log_event
create table if not exists pgaudit.log_event
(
session_id text not null
constraint logevent_sessionid_fk
Expand Down Expand Up @@ -154,7 +173,7 @@ grant select,
to pgaudit_etl;

-- Create audit_statment table to track all user statements logged by the pgaudit extension
create table pgaudit.audit_statement
create table if not exists pgaudit.audit_statement
(
session_id text not null
constraint auditstatement_sessionid_fk
Expand All @@ -179,7 +198,7 @@ grant select,
to pgaudit_etl;

-- Create audit_statment table to track all user sub-statements logged by the pgaudit extension
create table pgaudit.audit_substatement
create table if not exists pgaudit.audit_substatement
(
session_id text not null,
statement_id numeric not null,
Expand All @@ -200,7 +219,7 @@ grant select,
to pgaudit_etl;

-- Create audit_statment table to track all user sub-statement detail logged by the pgaudit extension
create table pgaudit.audit_substatement_detail
create table if not exists pgaudit.audit_substatement_detail
(
session_id text not null,
statement_id numeric not null,
Expand Down Expand Up @@ -233,8 +252,10 @@ grant select,
to pgaudit_etl;

-- Create vw_audit_event view to allow easy access to the pgaudit log entries
drop view if exists pgaudit.vw_audit_event ;
create view pgaudit.vw_audit_event as
select session.session_id,
select cluster.cluster_name,
session.session_id,
log_event.session_line_num,
log_event.log_time,
session.user_name,
Expand All @@ -260,6 +281,8 @@ select session.session_id,
and audit_substatement.substatement_id = audit_substatement_detail.substatement_id
inner join pgaudit.audit_statement
on audit_statement.session_id = audit_substatement_detail.session_id
and audit_statement.statement_id = audit_substatement_detail.statement_id;
and audit_statement.statement_id = audit_substatement_detail.statement_id
left join pgaudit.cluster
on cluster.cluster_id = substring(log_event.session_id from '^(?:[0-9a-f]+\.){2}([0-9]+)$')::int;

COMMIT;