Skip to content

Commit

Permalink
control-plane: delete old log_lines
Browse files Browse the repository at this point in the history
Upgrades the `delete_old_drafts` function to `delete_old_rows`, because
it's now our one-stop-shop for daily cleanup. This adds deletion of old
`internal.log_lines` and `catalog_stats_hourly`, which would otherwise
accumulate indefinitely.

This also reduces the drafts retention window to 10 days, since that is
the window we have effectively been using when cleaning them up manually.
  • Loading branch information
psFried committed Sep 26, 2023
1 parent 6c4f463 commit dd0b0b0
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 31 deletions.
28 changes: 0 additions & 28 deletions supabase/migrations/26_delete_old_drafts.sql

This file was deleted.

49 changes: 49 additions & 0 deletions supabase/migrations/26_delete_old_rows.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
begin;
-- Daily cleanup of old rows: drafts (along with draft_specs, discovers, and
-- evolutions, which are removed via cascading deletes), internal.log_lines,
-- and catalog_stats_hourly. These accumulate over time and there is no need
-- to retain them for long.

-- Backfill the foreign key on evolutions, which was missing originally.
-- Orphaned rows must be deleted first, or adding the constraint would fail.
delete from evolutions e where not exists (select d.id from drafts d where d.id = e.draft_id);
alter table evolutions add foreign key (draft_id) references drafts(id) on delete cascade;

-- Deletes expired rows from each table and returns a jsonb object reporting
-- the number of rows deleted per table, e.g.:
--   {"drafts": 3, "log_lines": 1200, "catalog_stats_hourly": 48}
create or replace function internal.delete_old_rows()
returns jsonb as $$
declare
    n_drafts integer;
    n_logs integer;
    n_hourly_stats integer;
begin
    -- Drafts older than 10 days. draft_specs, discovers, and evolutions are
    -- cleaned up automatically via cascading deletes from drafts.
    -- `returning 1` is enough to count deletions; no need to return full rows.
    with d as (
        delete from public.drafts where updated_at < (now() - '10 days'::interval) returning 1
    )
    select into n_drafts count(*) from d;

    -- log_lines gets a lot of volume, so we use a much shorter retention period with them.
    with l as (
        delete from internal.log_lines where logged_at < (now() - '2 days'::interval) returning 1
    )
    select into n_logs count(*) from l;

    -- Hourly stats older than 30 days. The grain predicate guards against
    -- deleting coarser-grained rows if the table ever contains them.
    with s as (
        delete from catalog_stats_hourly where grain = 'hourly' and ts < (now() - '30 days'::interval) returning 1
    )
    select into n_hourly_stats count(*) from s;

    -- jsonb_build_object matches the declared `returns jsonb` type directly,
    -- instead of building json and relying on an implicit json -> jsonb cast.
    -- The coalesces are defensive; count(*) itself never yields null.
    return jsonb_build_object(
        'drafts', coalesce(n_drafts, 0),
        'log_lines', coalesce(n_logs, 0),
        'catalog_stats_hourly', coalesce(n_hourly_stats, 0)
    );
end;
$$ language plpgsql security definer;

create extension if not exists pg_cron with schema extensions;
-- NOTE(review): the job name 'delete-drafts' is presumably kept from the prior
-- delete_old_drafts migration so cron.schedule replaces that job in place
-- rather than leaving a stale duplicate — confirm this matches the existing
-- job name before renaming it to something like 'delete-old-rows'.
select cron.schedule(
    'delete-drafts',  -- name of the cron job
    '0 05 * * *',     -- Every day at 05:00Z
    $$ select internal.delete_old_rows() $$
);

commit;
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@


create function tests.test_draft_deletion()
create function tests.test_delete_old_rows()
returns setof text as $$
declare
test_user_id uuid = '5ec31342-bfda-43d7-ac62-107e7e2f7f96';
Expand All @@ -16,7 +16,7 @@ begin
insert into connector_tags (id, connector_id, image_tag) values
(test_connector_tag_id, '33:33:33:33:33:33:33:97', ':v0');

insert into drafts (user_id, updated_at) values (test_user_id, now() - '31 days'::interval) returning id into old_draft_id;
insert into drafts (user_id, updated_at) values (test_user_id, now() - '11 days'::interval) returning id into old_draft_id;
insert into drafts (user_id) values (test_user_id) returning id into new_draft_id;

insert into draft_specs (draft_id, catalog_name, spec, spec_type) values
Expand All @@ -32,7 +32,24 @@ begin
(old_draft_id, test_connector_tag_id, 'should delete', 'a/b/c', '{"type": "discoverFailed"}', '{}'),
(new_draft_id, test_connector_tag_id, 'should retain', 'a/b/d', '{"type": "discoverFailed"}', '{}');

return query select ok(internal.delete_old_drafts() = 1, 'one draft should have been deleted');

insert into internal.log_lines (log_line, stream, logged_at, token) values
('should delete line', 'foo', now() - '90 days'::interval, gen_random_uuid()),
('should delete line too', 'foo', now() - '3 days'::interval, gen_random_uuid()),
('should keep line', 'foo', now() - '1 days'::interval, gen_random_uuid()),
('should keep line too', 'foo', now() - '5 minutes'::interval, gen_random_uuid());

insert into catalog_stats_hourly (ts, grain, catalog_name, flow_document) values
(now() - '90 days'::interval, 'hourly', 'a/b/c', '{"should":"delete1"}'::json),
(now() - '31 days'::interval, 'hourly', 'a/b/c', '{"should":"delete2"}'::json),
(now() - '29 days'::interval, 'hourly', 'a/b/c', '{"should":"keep1"}'::json),
(now() - '5 minutes'::interval, 'hourly', 'a/b/c', '{"should":"keep2"}'::json);

--return query select ok(internal.delete_old_drafts() = 1, 'one draft should have been deleted');
return query select results_eq(
$i$ select internal.delete_old_rows() $i$,
$i$ values ('{"drafts": 1, "log_lines": 2, "catalog_stats_hourly": 2}'::jsonb) $i$
);

return query select results_eq(
$i$ select ds.catalog_name::text
Expand Down Expand Up @@ -61,6 +78,20 @@ begin
$i$,
$i$ values ('should retain') $i$
);

return query select results_eq(
$i$ select flow_document->>'should'
from catalog_stats_hourly
where flow_document->>'should' is not null
order by flow_document->>'should'
$i$,
$i$ values ('keep1'), ('keep2') $i$
);

return query select results_eq(
$i$ select log_line from internal.log_lines where log_line like 'should %' order by log_line $i$,
$i$ values ('should keep line'), ('should keep line too') $i$
);
end;
$$ language plpgsql;

0 comments on commit dd0b0b0

Please sign in to comment.