refactor blob store config
howardchung committed Nov 20, 2024
1 parent 3d3b292 commit edc473d
Showing 10 changed files with 287 additions and 281 deletions.
1 change: 1 addition & 0 deletions global.d.ts
@@ -417,6 +417,7 @@ type MetricName =
| 'fullhistory_short'
| 'match_archive_read'
| 'match_archive_write'
| 'blob_archive_read'
| 'auto_parse'
| 'added_match'
| 'distinct_match_player'
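For context, a minimal sketch of how the widened MetricName union is consumed; the redisCount call and import paths are taken from store/getApiData.ts later in this diff, while the wrapper function is hypothetical:

```ts
// Hypothetical wrapper around the call site shown in store/getApiData.ts.
// 'blob_archive_read' type-checks only because it is now part of MetricName.
import { redisCount } from '../util/utility';
import redis from './redis';

function recordBlobArchiveRead(): void {
  redisCount(redis, 'blob_archive_read');
}
```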
2 changes: 1 addition & 1 deletion store/buildMatch.ts
@@ -109,7 +109,7 @@ async function buildMatch(
let [match, odData]: [
Match | ParsedMatch | null,
GetMatchDataMetadata | null,
] = await getMatchDataFromBlobWithMetadata(matchId, true);
] = await getMatchDataFromBlobWithMetadata(matchId);
if (!match) {
return null;
}
68 changes: 39 additions & 29 deletions store/buildStatus.ts
@@ -72,57 +72,71 @@ export async function buildStatus() {
'+inf',
),
tracked_players: async () => redis.zcard('tracked'),

registry_proxy: async () => redis.zcard('registry:proxy'),
registry_retriever: async () => redis.zcard('registry:retriever'),
registry_parser: async () => redis.zcard('registry:parser'),

matches_last_day: async () => countDay('added_match'),
matches_prev_hour: async () => countLastHour('added_match'),
distinct_match_players_last_day: async () =>
countDayDistinct('distinct_match_player'),
distinct_match_players_user_last_day: async () =>
countDayDistinct('distinct_match_player_user'),
distinct_match_players_recent_user_last_day: async () =>
countDayDistinct('distinct_match_player_recent_user'),
matches_prev_hour: async () => countLastHour('added_match'),
auto_parse_last_day: async () => countDay('auto_parse'),
requests_last_day: async () => countDay('request'),
distinct_requests_last_day: async () =>
countDayDistinct('distinct_request'),
requests_ui_day: async () => countDay('request_ui'),
requests_api_key_last_day: async () => countDay('request_api_key'),
registry_proxy: async () => redis.zcard('registry:proxy'),
registry_retriever: async () => redis.zcard('registry:retriever'),
registry_parser: async () => redis.zcard('registry:parser'),

retriever_matches_current_hour: async () => countHour('retriever'),
retriever_matches_last_day: async () => countDay('retriever'),
retriever_players_last_day: async () => countDay('retriever_player'),
auto_parse_last_day: async () => countDay('auto_parse'),
parse_jobs_last_day: async () => countDay('parser_job'),
parse_fails_last_day: async () => countDay('parser_fail'),
parse_crashes_last_day: async () => countDay('parser_crash'),
parse_skips_last_day: async () => countDay('parser_skip'),
parsed_matches_last_day: async () => countDay('parser'),
reparse_early_last_day: async () => countDay('reparse_early'),
reapi_last_day: async () => countDay('reapi'),
regcdata_last_day: async () => countDay('regcdata'),
reparse_last_day: async () => countDay('reparse'),
oldparse_last_day: async () => countDay('oldparse'),
meta_parsed_last_day: async () => countDay('meta_parse'),

requests_last_day: async () => countDay('request'),
distinct_requests_last_day: async () =>
countDayDistinct('distinct_request'),
requests_ui_day: async () => countDay('request_ui'),
requests_api_key_last_day: async () => countDay('request_api_key'),
fullhistory_last_day: async () => countDay('fullhistory'),
fullhistory_short_last_day: async () => countDay('fullhistory_short'),
fullhistory_ops_last_day: async () => countDay('fullhistory_op'),
fullhistory_skips_last_day: async () => countDay('fullhistory_skip'),
// gen_api_key_invalid_last_day: async () => getRedisCountDay('gen_api_key_invalid'),
steam_api_calls_last_day: async () => countDay('steam_api_call'),
steam_proxy_calls_last_day: async () => countDay('steam_proxy_call'),
steam_429_last_day: async () => countDay('steam_429'),
steam_403_last_day: async () => countDay('steam_403'),
steam_api_backfill_last_day: async () => countDay('steam_api_backfill'),
steam_api_notfound_last_day: async () => countDay('steam_api_notfound'),
steam_gc_backfill_last_day: async () => countDay('steam_gc_backfill'),
meta_parsed_last_day: async () => countDay('meta_parse'),

// reapi_last_day: async () => countDay('reapi'),
regcdata_last_day: async () => countDay('regcdata'),
reparse_last_day: async () => countDay('reparse'),
reparse_early_last_day: async () => countDay('reparse_early'),
// oldparse_last_day: async () => countDay('oldparse'),

blob_archive_read_last_day: async () => countDay('blob_archive_read'),
match_archive_read_last_day: async () => countDay('match_archive_read'),
match_archive_write_last_day: async () => countDay('match_archive_write'),
incomplete_archive_last_day: async () => countDay('incomplete_archive'),

api_hits_last_day: async () => countDay('api_hits'),
api_hits_ui_last_day: async () => countDay('api_hits_ui'),
build_match_last_day: async () => countDay('build_match'),
get_player_matches_last_day: async () => countDay('player_matches'),
self_player_matches_last_day: async () => countDay('self_profile_view'),
// self_player_matches_last_day: async () => countDay('self_profile_view'),
// gen_api_key_invalid_last_day: async () => getRedisCountDay('gen_api_key_invalid'),
error_last_day: async () => countDay('500_error'),
web_crash_last_day: async () => countDay('web_crash'),
skip_seq_num_last_day: async () => countDay('skip_seq_num'),
secondary_scanner_last_day: async () => countDay('secondary_scanner'),

steam_api_calls_last_day: async () => countDay('steam_api_call'),
steam_proxy_calls_last_day: async () => countDay('steam_proxy_call'),
steam_429_last_day: async () => countDay('steam_429'),
steam_403_last_day: async () => countDay('steam_403'),
steam_api_backfill_last_day: async () => countDay('steam_api_backfill'),
// steam_api_notfound_last_day: async () => countDay('steam_api_notfound'),
// steam_gc_backfill_last_day: async () => countDay('steam_gc_backfill'),

match_cache_hit_last_day: async () => countDay('match_cache_hit'),
player_cache_hit_last_day: async () => countDay('player_cache_hit'),
player_cache_miss_last_day: async () => countDay('player_cache_miss'),
Expand All @@ -137,10 +151,6 @@ export async function buildStatus() {
auto_player_cache_last_day: async () => countDay('auto_player_cache'),
distinct_auto_player_cache_last_day: async () =>
countDayDistinct('distinct_auto_player_cache'),
error_last_day: async () => countDay('500_error'),
web_crash_last_day: async () => countDay('web_crash'),
skip_seq_num_last_day: async () => countDay('skip_seq_num'),
secondary_scanner_last_day: async () => countDay('secondary_scanner'),
api_paths: async () => {
const results = await redis.zrangebyscore(
'api_paths',
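The countDay, countLastHour, and countDayDistinct helpers referenced above are not part of this diff. A hypothetical sketch of the shape one of them could take, assuming each metric is stored as hourly Redis counter keys (the real helper and key scheme may differ):

```ts
import redis from './redis';

// Hypothetical: sum the last 24 hourly buckets for a metric.
async function countDay(metric: MetricName): Promise<number> {
  const hour = Math.floor(Date.now() / (1000 * 60 * 60));
  const keys = Array.from({ length: 24 }, (_, i) => `${metric}:${hour - i}`);
  const values = await redis.mget(...keys);
  return values.reduce((sum, v) => sum + (Number(v) || 0), 0);
}
```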
7 changes: 5 additions & 2 deletions store/getApiData.ts
@@ -12,16 +12,19 @@ const blobArchive = config.ENABLE_BLOB_ARCHIVE ? new Archive('blob') : null;
* @param matchId
* @returns
*/
export async function readApiData(matchId: number): Promise<ApiMatch | null> {
export async function readApiData(matchId: number, noBlobStore?: boolean): Promise<ApiMatch | null> {
const result = await cassandra.execute(
'SELECT api FROM match_blobs WHERE match_id = ?',
[matchId],
{ prepare: true, fetchSize: 1, autoPage: true },
);
const row = result.rows[0];
let data = row?.api ? (JSON.parse(row.api) as ApiMatch) : undefined;
if (!data && blobArchive) {
if (!data && blobArchive && !noBlobStore) {
const archive = await blobArchive.archiveGet(`${matchId}_api`);
if (archive) {
redisCount(redis, 'blob_archive_read');
}
data = archive ? JSON.parse(archive.toString()) as ApiMatch : undefined;
}
if (!data) {
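A sketch of how a caller might use the new optional noBlobStore flag; readApiData comes from this diff, while the surrounding function and import path are assumptions:

```ts
import { readApiData } from './getApiData';

// Hypothetical caller in the same store/ directory.
async function loadApiBlob(matchId: number) {
  // Default behavior: fall back to the blob archive if Cassandra has no row.
  const withArchiveFallback = await readApiData(matchId);
  // Pass true to read Cassandra only and skip the blob store entirely.
  const cassandraOnly = await readApiData(matchId, true);
  return { withArchiveFallback, cassandraOnly };
}
```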
227 changes: 8 additions & 219 deletions store/getArchivedData.ts
@@ -1,229 +1,18 @@
import config from '../config';
import { redisCount } from '../util/utility';
import { Archive } from './archive';
import {
getFullPlayerMatchesWithMetadata,
getMatchDataFromBlobWithMetadata,
} from './queries';
import db from './db';
import redis from './redis';
import cassandra from './cassandra';
import type { PutObjectCommandOutput } from '@aws-sdk/client-s3';
import { isDataComplete, redisCount } from '../util/utility';
import QueryStream from 'pg-query-stream';
import { Client } from 'pg';
import crypto from 'crypto';

const matchArchive = new Archive('match');
const playerArchive = new Archive('player');
const matchArchive = config.ENABLE_MATCH_ARCHIVE ? new Archive('match') : null;
const playerArchive = config.ENABLE_PLAYER_ARCHIVE ? new Archive('player') : null;

export async function doArchivePlayerMatches(
accountId: number,
): Promise<PutObjectCommandOutput | { message: string} | null> {
if (!config.ENABLE_PLAYER_ARCHIVE) {
return null;
}
// Fetch our combined list of archive and current, selecting all fields
const full = await getFullPlayerMatchesWithMetadata(accountId);
const toArchive = full[0];
console.log(full[1]);
toArchive.forEach((m, i) => {
Object.keys(m).forEach((key) => {
if (m[key as keyof ParsedPlayerMatch] === null) {
// Remove any null values from the matches for storage
delete m[key as keyof ParsedPlayerMatch];
}
});
});
// TODO (howard) Make sure the new list is longer than the old list
// Make sure we're archiving at least 1 match
if (!toArchive.length) {
return null;
}
// Put the blob
return playerArchive.archivePut(
accountId.toString(),
Buffer.from(JSON.stringify(toArchive)),
);
// TODO (howard) delete the archived values from player_caches
// TODO (howard) keep the 20 highest match IDs for recentMatches
// TODO (howard) mark the user archived so we don't need to query archive on every request
// TODO (howard) add redis counts
}

async function doArchiveFromBlob(matchId: number) {
if (!config.ENABLE_MATCH_ARCHIVE) {
return;
}
// Don't backfill when determining whether to archive
const [match, metadata] = await getMatchDataFromBlobWithMetadata(
matchId,
false,
);
if (!match) {
// Invalid/not found, skip
return;
}
if (metadata?.has_api && !metadata?.has_gcdata && !metadata?.has_parsed) {
// if it only contains API data, delete?
// If the match is old we might not be able to get back ability builds, HD/TD/HH
// We might also drop gcdata, identity, and ranks here
// await deleteMatch(matchId);
// console.log('DELETE match %s, apionly', matchId);
return;
}
if (metadata?.has_parsed) {
const isArchived = Boolean(
(
await db.raw(
'select match_id from parsed_matches where match_id = ? and is_archived IS TRUE',
[matchId],
)
).rows[0],
);
if (isArchived) {
console.log('ALREADY ARCHIVED match %s', matchId);
await deleteParsed(matchId);
return;
}
// check data completeness with isDataComplete
if (!isDataComplete(match as ParsedMatch)) {
redisCount(redis, 'incomplete_archive');
console.log('INCOMPLETE match %s', matchId);
return;
}
redisCount(redis, 'match_archive_write');
// console.log('SIMULATE ARCHIVE match %s', matchId);
// TODO (howard) don't actually archive until verification of data format
return;
// Archive the data since it's parsed. This might also contain api and gcdata
const blob = Buffer.from(JSON.stringify(match));
const result = await matchArchive.archivePut(matchId.toString(), blob);
if (result) {
// Mark the match archived
await db.raw(
`UPDATE parsed_matches SET is_archived = TRUE WHERE match_id = ?`,
[matchId],
);
// Delete the parsed data (this keeps the api and gcdata around in Cassandra since it doesn't take a ton of space)
await deleteParsed(matchId);
console.log('ARCHIVE match %s, parsed', matchId);
}
return result;
}
// if it's something else, e.g. contains api and gcdata only, leave it for now
// console.log('SKIP match %s, unparsed', matchId);
return;
}

async function deleteParsed(matchId: number) {
await cassandra.execute(
'DELETE parsed from match_blobs WHERE match_id = ?',
[matchId],
{
prepare: true,
},
);
}

export async function archivePostgresStream() {
// Archive parsed matches that aren't archived from postgres records
const max = await getCurrentMaxArchiveID();
const query = new QueryStream(
`
SELECT match_id
from parsed_matches
WHERE is_archived IS NULL
and match_id < ?
ORDER BY match_id asc`,
[max],
);
const pg = new Client(config.POSTGRES_URL);
await pg.connect();
const stream = pg.query(query);
let i = 0;
stream.on('readable', async () => {
let row;
while ((row = stream.read())) {
i += 1;
console.log(i);
try {
await doArchiveFromBlob(row.match_id);
} catch (e) {
console.error(e);
}
}
});
stream.on('end', async () => {
await pg.end();
});
}

async function archiveSequential(start: number, max: number) {
// Archive sequentially starting at a given ID (not all IDs may be valid)
for (let i = start; i < max; i++) {
console.log(i);
try {
await doArchiveFromBlob(i);
} catch (e) {
console.error(e);
}
}
}

async function archiveRandom(max: number) {
const rand = randomInt(0, max);
// Bruteforce 1000 IDs starting at a random value (not all IDs may be valid)
const page = [];
for (let i = 0; i < 1000; i++) {
page.push(rand + i);
}
console.log(page[0]);
await Promise.allSettled(page.map((i) => doArchiveFromBlob(i)));
}

export async function archiveToken(max: number) {
// Archive random matches from Cassandra using token range (not all may be parsed)
let page = await getTokenRange(1000);
page = page.filter((id) => id < max);
console.log(page[0]);
await Promise.allSettled(page.map((i) => doArchiveFromBlob(i)));
}

function randomBigInt(byteCount: number) {
return BigInt(`0x${crypto.randomBytes(byteCount).toString('hex')}`);
}

function randomInt(min: number, max: number) {
return Math.floor(Math.random() * (max - min) + min);
}

export async function getCurrentMaxArchiveID() {
// Get the current max_match_id from postgres, subtract 200000000
const max = (await db.raw('select max(match_id) from public_matches'))
?.rows?.[0]?.max;
const limit = max - 100000000;
return limit;
}

async function getTokenRange(size: number) {
// Convert to signed 64-bit integer
const signedBigInt = BigInt.asIntN(64, randomBigInt(8));
// Get a page of matches (effectively random, but guaranteed sequential read on one node)
const result = await cassandra.execute(
'select match_id, token(match_id) from match_blobs where token(match_id) >= ? limit ? ALLOW FILTERING;',
[signedBigInt.toString(), size],
{
prepare: true,
fetchSize: size,
autoPage: true,
},
);
return result.rows.map((row) => Number(row.match_id));
}

export async function readArchivedPlayerMatches(
export async function tryReadArchivedPlayerMatches(
accountId: number,
): Promise<ParsedPlayerMatch[]> {
if (!playerArchive) {
return [];
}
console.time('archive:' + accountId);
const blob = await playerArchive.archiveGet(accountId.toString());
const arr = blob ? JSON.parse(blob.toString()) : [];
@@ -240,7 +29,7 @@ export async function tryReadArchivedMatch(
matchId: number,
): Promise<ParsedMatch | null> {
try {
if (!config.ENABLE_MATCH_ARCHIVE) {
if (!matchArchive) {
return null;
}
// Check if the parsed data is archived
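The recurring pattern behind this refactor, condensed into a standalone sketch: construct each Archive only when its config flag is enabled, then guard call sites on the nullable instance instead of re-checking config. Archive, config, and archiveGet all appear in this diff; the readParsedFromArchive function is hypothetical:

```ts
import config from '../config';
import { Archive } from './archive';

const matchArchive = config.ENABLE_MATCH_ARCHIVE ? new Archive('match') : null;

// Hypothetical caller: a disabled archive means a null instance, so callers
// null-check rather than consulting config.ENABLE_MATCH_ARCHIVE again.
async function readParsedFromArchive(matchId: number): Promise<ParsedMatch | null> {
  if (!matchArchive) {
    return null;
  }
  const blob = await matchArchive.archiveGet(matchId.toString());
  return blob ? (JSON.parse(blob.toString()) as ParsedMatch) : null;
}
```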