Skip to content

Commit

Permalink
Merge pull request #63 from Snowflake-Labs/refactor-alert-processor
Browse files Browse the repository at this point in the history
Refactor alert_processor proc to be based on the ECS correlation logic
  • Loading branch information
sfc-gh-nlele authored Apr 11, 2023
2 parents 82b2944 + c1d02b2 commit d0e9159
Show file tree
Hide file tree
Showing 3 changed files with 121 additions and 60 deletions.
31 changes: 27 additions & 4 deletions procedures.tf
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ resource "snowflake_procedure" "alerts_merge" {
]
}

resource "snowflake_procedure" "alert_processor" {
resource "snowflake_procedure" "alert_processor_with_default_correlation_period" {
provider = snowflake.security_alerting_role

database = local.snowalert_database_name
Expand All @@ -83,10 +83,33 @@ resource "snowflake_procedure" "alert_processor" {
local.results_schema,
local.alerts_table,
])
data_alerts_view = join(".", [
})

depends_on = [
module.snowalert_grants
]
}

resource "snowflake_procedure" "alert_processor_with_custom_correlation_period" {
provider = snowflake.security_alerting_role

database = local.snowalert_database_name
schema = local.results_schema
name = "ALERT_PROCESSOR"
language = "JAVASCRIPT"

arguments {
name = "correlation_period_minutes"
type = "VARCHAR"
}

return_type = "VARIANT"
execute_as = "CALLER"
statement = templatefile("${path.module}/procedures_js/alert_processor.js", {
results_alerts_table = join(".", [
local.snowalert_database_name,
local.data_schema,
snowflake_view.alerts.name,
local.results_schema,
local.alerts_table,
])
})

Expand Down
131 changes: 75 additions & 56 deletions procedures_js/alert_processor.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
// args
//args
var CORRELATION_PERIOD_MINUTES

CORRELATION_PERIOD_MINUTES = CORRELATION_PERIOD_MINUTES || 60
CORRELATION_PERIOD_MINUTES = CORRELATION_PERIOD_MINUTES || -60

var alert_correlation_result_array = []

// library
function exec(sqlText, binds = []) {
Expand All @@ -24,60 +26,77 @@ function exec(sqlText, binds = []) {
return retval
}

CORRELATE = `
MERGE INTO ${results_alerts_table} dst
USING (
SELECT
d.id alert_id_to_update,
COALESCE(
p.correlation_id,
d.correlation_id,
UUID_STRING()
) correlation_id
FROM ${data_alerts_view} d -- destination
LEFT OUTER JOIN ${data_alerts_view} p -- potential chain
ON (
d.id != p.id
AND (
d.alert_time > p.alert_time
OR (
d.alert_time = p.alert_time
AND d.id > p.id
)
)
AND p.alert_time > d.alert_time - INTERVAL '1 hour'
AND p.correlation_id IS NOT NULL
AND p.actor = d.actor
AND (
p.object = d.object
OR p.action = d.action
)
)
WHERE d.suppressed = FALSE
AND d.alert_time > CURRENT_TIMESTAMP - INTERVAL '$${CORRELATION_PERIOD_MINUTES} minutes'
QUALIFY 1=ROW_NUMBER() OVER (
PARTITION BY d.id -- one update per destination id
ORDER BY -- most recent wins
p.alert_time DESC, p.id DESC
)
) src
ON (
dst.alert:ALERT_ID = src.alert_id_to_update
AND (
dst.correlation_id IS NULL
OR dst.correlation_id != src.correlation_id
)
)
WHEN MATCHED THEN UPDATE SET
correlation_id = src.correlation_id
GET_CORRELATED_ALERT = `
SELECT correlation_id
FROM ${results_alerts_table}
WHERE alert:ACTOR = ?
AND (alert:OBJECT::STRING = ? OR alert:ACTION::STRING = ?)
AND correlation_id IS NOT NULL
AND NOT IS_NULL_VALUE(alert:ACTOR)
AND suppressed = FALSE
AND event_time > DATEADD(minutes, $${CORRELATION_PERIOD_MINUTES}, ?)
ORDER BY event_time DESC
LIMIT 1
`

function find_related_correlation_id(alert) {
if (
'ACTOR' in alert == false ||
'OBJECT' in alert == false ||
'ACTION' in alert == false ||
'EVENT_TIME' in alert == false
) {
return null
}

actor = alert['ACTOR']
object = alert['OBJECT']
action = alert['ACTION']
time = alert['EVENT_TIME']

if (object instanceof Array) {
o = object.join('","')
object = `["$${o}"]`
}
if (action instanceof Array) {
o = action.join('","')
action = `["$${o}"]`
}

match = exec(GET_CORRELATED_ALERT, [actor, object, action, time])[0] || {}

return match['CORRELATION_ID'] || null
}

GET_ALERTS_WITHOUT_CORRELATION_ID = `
SELECT *
FROM ${results_alerts_table}
WHERE correlation_id IS NULL
AND suppressed = FALSE
AND alert_time > DATEADD(hour, -2, CURRENT_TIMESTAMP())
`
var n,
results = []
do {
n = exec(CORRELATE)[0]['number of rows updated']
results.push(n)
} while (n != 0)

return {
ROWS_UPDATED: results,
UPDATE_ALERT_CORRELATION_ID = `
UPDATE ${results_alerts_table}
SET correlation_id = COALESCE(?, UUID_STRING())
WHERE alert:EVENT_TIME > DATEADD(minutes, $${CORRELATION_PERIOD_MINUTES}, ?)
AND alert:ALERT_ID = ?
`

for (const row of exec(GET_ALERTS_WITHOUT_CORRELATION_ID)) {
alert_body = row['ALERT']
correlation_id = find_related_correlation_id(alert_body)
event_time = String(alert_body['EVENT_TIME'])
alert_id = alert_body['ALERT_ID']

alert_correlation_result_array.push({
alert_id: alert_id,
alert_correlation_result: exec(UPDATE_ALERT_CORRELATION_ID, [
correlation_id,
event_time,
alert_id,
]),
})
}

return alert_correlation_result_array
19 changes: 19 additions & 0 deletions tasks.tf
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,25 @@ resource "snowflake_task" "snowalert_suppression_merge_task" {
]
}


resource "snowflake_task" "alert_processor_task" {
provider = snowflake.security_alerting_role

warehouse = local.snowalert_warehouse_name
database = local.snowalert_database_name
schema = local.results_schema
name = "ALERT_PROCESSOR"

after = [snowflake_task.snowalert_suppression_merge_task.name]
sql_statement = "CALL ${local.results_schema}.${snowflake_procedure.alert_processor_with_default_correlation_period.name}()"
enabled = true

depends_on = [
module.snowalert_grants
]
}


resource "snowflake_task" "alert_dispatcher_task" {
provider = snowflake.security_alerting_role

Expand Down

0 comments on commit d0e9159

Please sign in to comment.