Skip to content

Commit

Permalink
refactor(bigquery): add table name to filter logadmin query (#357)
Browse files Browse the repository at this point in the history
* refactor(bigquery): add table name to filter logadmin query and do log collection per table instead

* feat(bigquery): increase coverage
  • Loading branch information
mabdh authored Jun 7, 2022
1 parent 4dba2f7 commit f373bb2
Show file tree
Hide file tree
Showing 6 changed files with 157 additions and 88 deletions.
40 changes: 25 additions & 15 deletions plugins/extractors/bigquery/auditlog/auditlog.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ type Config struct {
const advancedFilterTemplate = `protoPayload.methodName="jobservice.jobcompleted" AND ` +
`resource.type="bigquery_resource" AND NOT ` +
`protoPayload.serviceData.jobCompletedEvent.job.jobConfiguration.query.query:(INFORMATION_SCHEMA OR __TABLES__) AND ` +
`timestamp >= "%s" AND timestamp < "%s"`
`timestamp >= "%s" AND timestamp < "%s" AND %s`

type AuditLog struct {
logger log.Logger
Expand All @@ -40,17 +40,22 @@ func New(logger log.Logger) *AuditLog {
}
}

func (l *AuditLog) Init(ctx context.Context, cfg Config) (err error) {
if len(cfg.UsageProjectIDs) == 0 {
cfg.UsageProjectIDs = []string{cfg.ProjectID}
func (l *AuditLog) Init(ctx context.Context, opts ...InitOption) (err error) {
for _, opt := range opts {
opt(l)
}
l.config = cfg
l.client, err = l.createClient(ctx)
if err != nil {
err = errors.Wrap(err, "failed to create logadmin client")
return

if len(l.config.UsageProjectIDs) == 0 {
l.config.UsageProjectIDs = []string{l.config.ProjectID}
}

if l.client == nil {
l.client, err = l.createClient(ctx)
if err != nil {
err = errors.Wrap(err, "failed to create logadmin client")
return
}
}
return
}

Expand All @@ -68,10 +73,15 @@ func (l *AuditLog) createClient(ctx context.Context) (client *logadmin.Client, e
return
}

func (l *AuditLog) Collect(ctx context.Context) (tableStats *TableStats, err error) {
func (l *AuditLog) Collect(ctx context.Context, tableID string) (tableStats *TableStats, err error) {
if l.client == nil {
err = errors.New("auditlog client is nil")
return
}

tableStats = NewTableStats()

filter := l.buildFilter()
filter := l.buildFilter(tableID)
it := l.client.Entries(ctx,
logadmin.ProjectIDs(l.config.UsageProjectIDs),
logadmin.Filter(filter))
Expand All @@ -91,7 +101,7 @@ func (l *AuditLog) Collect(ctx context.Context) (tableStats *TableStats, err err

logData, errF := parsePayload(entry.Payload)
if errF != nil {
l.logger.Warn("error parsing LogEntry payload", "err", errF, "payload", entry.Payload)
l.logger.Warn("error parsing LogEntry payload", "err", errF)
continue
}

Expand All @@ -103,7 +113,7 @@ func (l *AuditLog) Collect(ctx context.Context) (tableStats *TableStats, err err
return
}

func (l *AuditLog) buildFilter() string {
func (l *AuditLog) buildFilter(tableID string) string {

timeNow := time.Now().UTC()
dayDuration := time.Duration(24*l.config.UsagePeriodInDay) * time.Hour
Expand All @@ -112,7 +122,7 @@ func (l *AuditLog) buildFilter() string {
timeNowFormatted := timeNow.Format(time.RFC3339)
timeFromFormatted := timeFrom.Format(time.RFC3339)

return fmt.Sprintf(advancedFilterTemplate, timeFromFormatted, timeNowFormatted)
return fmt.Sprintf(advancedFilterTemplate, timeFromFormatted, timeNowFormatted, tableID)
}

func parsePayload(payload interface{}) (ld *LogData, err error) {
Expand All @@ -125,7 +135,7 @@ func parsePayload(payload interface{}) (ld *LogData, err error) {
}

if errPB := getAuditData(pl, ad); errPB != nil {
err = errors.Wrap(err, "failed to get audit data from metadata")
err = errors.Wrap(errPB, "failed to get audit data from metadata")
return
}

Expand Down
78 changes: 56 additions & 22 deletions plugins/extractors/bigquery/auditlog/auditlog_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"testing"

"cloud.google.com/go/logging/logadmin"
"github.com/odpf/meteor/plugins"
"github.com/odpf/meteor/test/utils"
"github.com/stretchr/testify/assert"
Expand All @@ -15,29 +16,51 @@ import (
)

func TestInit(t *testing.T) {
t.Run("should return error if failed to init client", func(t *testing.T) {
t.Run("should return error if config is wrong to init client", func(t *testing.T) {
la := New(utils.Logger)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
err := la.Init(ctx, Config{
ProjectID: "---",
ServiceAccountJSON: "---",
})
err := la.Init(ctx,
InitWithConfig(Config{
ProjectID: "---",
ServiceAccountJSON: "---",
}),
)

assert.EqualError(t, err, "failed to create logadmin client: client is nil, failed initiating client")
})

t.Run("should not return error if init client is success", func(t *testing.T) {
t.Run("should not return error invalid config if config is not wrong", func(t *testing.T) {
la := New(utils.Logger)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
err := la.Init(ctx, Config{})
err := la.Init(ctx)

assert.NotEqual(t, plugins.InvalidConfigError{}, err)
})

t.Run("should return no error init succeed", func(t *testing.T) {
la := New(utils.Logger)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
err := la.Init(ctx, InitWithClient(&logadmin.Client{}))

assert.Nil(t, err)
})
}

func TestBuildFilter(t *testing.T) {
var (
la = &AuditLog{}
tableID = "table-id"
)

filterQuery := la.buildFilter(tableID)

assert.Contains(t, filterQuery, tableID)
}

func TestGetAuditData(t *testing.T) {
func TestParsePayload(t *testing.T) {
t.Run("should parse with service data if service data exists", func(t *testing.T) {
loggingData, err := anypb.New(&loggingpb.AuditData{
JobCompletedEvent: &loggingpb.JobCompletedEvent{
Expand All @@ -62,37 +85,48 @@ func TestGetAuditData(t *testing.T) {
ServiceData: loggingData,
}

auditData := &loggingpb.AuditData{}
err = getAuditData(auditLog, auditData)
ld, err := parsePayload(auditLog)

assert.Nil(t, err)
assert.NotNil(t, auditData)
assert.NotEmpty(t, auditData)
assert.NotNil(t, ld.AuditData)
assert.NotEmpty(t, ld.AuditData)
})

t.Run("should parse with metadata if service data not exists and metadata exist", func(t *testing.T) {
loggingData, err := structpb.NewStruct(map[string]interface{}{
"jobCompletedEvent": nil,
"jobCompletedEvent": map[string]interface{}{
"event_name": "name",
"job": map[string]interface{}{
"job_statistics": map[string]interface{}{
"referenced_tables": []interface{}{map[string]interface{}{
"project_id": "project_id",
}},
},
"job_status": map[string]interface{}{
"state": "DONE",
},
},
},
})

require.Nil(t, err)

auditLog := &auditpb.AuditLog{
Metadata: loggingData,
}

auditData := &loggingpb.AuditData{}
err = getAuditData(auditLog, auditData)
ld, err := parsePayload(auditLog)

assert.Nil(t, err)
assert.NotNil(t, auditData)
assert.NotEmpty(t, auditData)
assert.NotNil(t, ld.AuditData)
assert.NotEmpty(t, ld.AuditData)
})

t.Run("should return error if neither service data nor metadata field exist", func(t *testing.T) {
auditLog := &auditpb.AuditLog{}

auditData := &loggingpb.AuditData{}
err := getAuditData(auditLog, auditData)
assert.EqualError(t, err, "metadata field is nil")
assert.Empty(t, auditData)
ld, err := parsePayload(auditLog)

assert.EqualError(t, err, "failed to get audit data from metadata: metadata field is nil")
assert.Nil(t, ld)
})
}
17 changes: 17 additions & 0 deletions plugins/extractors/bigquery/auditlog/options.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package auditlog

import "cloud.google.com/go/logging/logadmin"

type InitOption func(*AuditLog)

func InitWithClient(c *logadmin.Client) func(*AuditLog) {
return func(al *AuditLog) {
al.client = c
}
}

func InitWithConfig(cfg Config) func(*AuditLog) {
return func(al *AuditLog) {
al.config = cfg
}
}
45 changes: 24 additions & 21 deletions plugins/extractors/bigquery/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,11 +61,10 @@ usage_period_in_day: 7`

// Extractor manages the communication with the bigquery service
type Extractor struct {
logger log.Logger
client *bigquery.Client
config Config
galClient *auditlog.AuditLog
tableStats *auditlog.TableStats
logger log.Logger
client *bigquery.Client
config Config
galClient *auditlog.AuditLog
}

func New(logger log.Logger) *Extractor {
Expand Down Expand Up @@ -104,13 +103,15 @@ func (e *Extractor) Init(ctx context.Context, configMap map[string]interface{})
}

if e.config.IsCollectTableUsage {
errL := e.galClient.Init(ctx, auditlog.Config{
ProjectID: e.config.ProjectID,
ServiceAccountJSON: e.config.ServiceAccountJSON,
IsCollectTableUsage: e.config.IsCollectTableUsage,
UsagePeriodInDay: e.config.UsagePeriodInDay,
UsageProjectIDs: e.config.UsageProjectIDs,
})
errL := e.galClient.Init(ctx,
auditlog.InitWithConfig(auditlog.Config{
ProjectID: e.config.ProjectID,
ServiceAccountJSON: e.config.ServiceAccountJSON,
IsCollectTableUsage: e.config.IsCollectTableUsage,
UsagePeriodInDay: e.config.UsagePeriodInDay,
UsageProjectIDs: e.config.UsageProjectIDs,
}),
)
if errL != nil {
e.logger.Error("failed to create google audit log client", "err", errL)
}
Expand All @@ -121,14 +122,6 @@ func (e *Extractor) Init(ctx context.Context, configMap map[string]interface{})

// Extract checks if the table is valid and extracts the table schema
func (e *Extractor) Extract(ctx context.Context, emit plugins.Emit) (err error) {
if e.config.IsCollectTableUsage {
// Fetch and extract logs first to build a map
ts, errL := e.galClient.Collect(ctx)
e.tableStats = ts
if errL != nil {
e.logger.Warn("error populating table stats usage", "error", errL)
}
}

// Fetch and iterate over datasets
it := e.client.Datasets(ctx)
Expand Down Expand Up @@ -181,10 +174,20 @@ func (e *Extractor) extractTable(ctx context.Context, ds *bigquery.Dataset, emit

// Build the bigquery table metadata
func (e *Extractor) buildTable(ctx context.Context, t *bigquery.Table, md *bigquery.TableMetadata) *assetsv1beta1.Table {
var tableStats *auditlog.TableStats
if e.config.IsCollectTableUsage {
// Fetch and extract logs first to build a map
var errL error
tableStats, errL = e.galClient.Collect(ctx, t.TableID)
if errL != nil {
e.logger.Warn("error populating table stats usage", "error", errL)
}
}

tableFQN := t.FullyQualifiedName()
tableURN := models.TableURN("bigquery", t.ProjectID, t.DatasetID, t.TableID)

tableProfile := e.buildTableProfile(tableURN)
tableProfile := e.buildTableProfile(tableURN, tableStats)

var partitionField string
if md.TimePartitioning != nil {
Expand Down
15 changes: 9 additions & 6 deletions plugins/extractors/bigquery/profile.go
Original file line number Diff line number Diff line change
@@ -1,18 +1,21 @@
package bigquery

import assetsv1beta1 "github.com/odpf/meteor/models/odpf/assets/v1beta1"
import (
assetsv1beta1 "github.com/odpf/meteor/models/odpf/assets/v1beta1"
"github.com/odpf/meteor/plugins/extractors/bigquery/auditlog"
)

func (e *Extractor) buildTableProfile(tableURN string) (tp *assetsv1beta1.TableProfile) {
func (e *Extractor) buildTableProfile(tableURN string, tableStats *auditlog.TableStats) (tp *assetsv1beta1.TableProfile) {
var tableUsage int64
var commonJoins []*assetsv1beta1.Join
var filterConditions []string

if e.config.IsCollectTableUsage && e.tableStats != nil {
if e.config.IsCollectTableUsage && tableStats != nil {
// table usage
tableUsage = e.tableStats.TableUsage[tableURN]
tableUsage = tableStats.TableUsage[tableURN]

// common join
if jdMapping, exist := e.tableStats.JoinDetail[tableURN]; exist {
if jdMapping, exist := tableStats.JoinDetail[tableURN]; exist {
for joinedTableURN, jd := range jdMapping {
var joinConditions []string
for jc := range jd.Conditions {
Expand All @@ -27,7 +30,7 @@ func (e *Extractor) buildTableProfile(tableURN string) (tp *assetsv1beta1.TableP
}

// filter conditions
if filterMapping, exist := e.tableStats.FilterConditions[tableURN]; exist {
if filterMapping, exist := tableStats.FilterConditions[tableURN]; exist {
for filterExpression := range filterMapping {
filterConditions = append(filterConditions, filterExpression)
}
Expand Down
Loading

0 comments on commit f373bb2

Please sign in to comment.