Skip to content

Commit

Permalink
set up proxy in registry
Browse files Browse the repository at this point in the history
  • Loading branch information
howardchung committed Nov 19, 2024
1 parent 5f4b311 commit 7ca4f74
Show file tree
Hide file tree
Showing 12 changed files with 86 additions and 38 deletions.
3 changes: 2 additions & 1 deletion config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,9 @@ const defaults = {
DISABLE_OLD_PARSE: '', // Disable parsing for old non-league matches unlikely to have replays
API_KEY_GEN_THRESHOLD: '0', // Account ID requirement (delta from max) for generating API keys
SERVICE_REGISTRY_HOST: '', // Host for external services to register themselves at
USE_SERVICE_REGISTRY: '', // Use the service registry for determining gc and parser urls
USE_SERVICE_REGISTRY: '', // Use the service registry for determining gc, parser, and proxy urls
SCANNER_OFFSET: '0', // Delay in match seq num value to run secondary scanner (to pick up missing matches)
EXTERNAL: '', // Indicates that the service resides outside the registry and should report an external IP
};
if (process.env.NODE_ENV === 'development') {
// force PORT to null in development so we can run multiple web services without conflict
Expand Down
3 changes: 2 additions & 1 deletion store/buildStatus.ts
Original file line number Diff line number Diff line change
Expand Up @@ -86,11 +86,12 @@ export async function buildStatus() {
countDayDistinct('distinct_request'),
requests_ui_day: async () => countDay('request_ui'),
requests_api_key_last_day: async () => countDay('request_api_key'),
registry_proxy: async () => redis.zcard('registry:proxy'),
registry_retriever: async () => redis.zcard('registry:retriever'),
registry_parser: async () => redis.zcard('registry:parser'),
retriever_matches_current_hour: async () => countHour('retriever'),
retriever_matches_last_day: async () => countDay('retriever'),
retriever_players_last_day: async () => countDay('retriever_player'),
registry_parser: async () => redis.zcard('registry:parser'),
parse_jobs_last_day: async () => countDay('parser_job'),
parse_fails_last_day: async () => countDay('parser_fail'),
parse_crashes_last_day: async () => countDay('parser_crash'),
Expand Down
5 changes: 1 addition & 4 deletions store/getGcData.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import redis from './redis';
import cassandra from './cassandra';
import {
getRandomRetrieverUrl,
getRegistryUrl,
redisCount,
} from '../util/utility';
import axios from 'axios';
Expand Down Expand Up @@ -56,9 +55,7 @@ async function saveGcData(
matchId: number,
extraData: GcExtraData,
): Promise<string | null> {
const url = config.USE_SERVICE_REGISTRY
? await getRegistryUrl('retriever', `/match/${matchId}`)
: getRandomRetrieverUrl(`/match/${matchId}`);
const url = await getRandomRetrieverUrl(`/match/${matchId}`);
const { data, headers } = await axios.get<typeof retrieverMatch>(url, {
timeout: 5000,
});
Expand Down
5 changes: 1 addition & 4 deletions store/getParsedData.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import config from '../config';
import {
getRandomParserUrl,
getRegistryUrl,
redisCount,
} from '../util/utility';
import { Archive } from './archive';
Expand Down Expand Up @@ -74,9 +73,7 @@ export async function saveParseData(
// bunzip: 6716ms (bunzip2 7503212404_1277518156.dem.bz2)
// parse: 9407ms (curl -X POST --data-binary "@7503212404_1277518156.dem" odota-parser:5600 > output.log)
// process: 3278ms (node processors/createParsedDataBlob.mjs < output.log)
const parseUrl = config.USE_SERVICE_REGISTRY
? await getRegistryUrl('parser', `/blob?replay_url=${replayUrl}`)
: getRandomParserUrl(`/blob?replay_url=${replayUrl}`);
const parseUrl = await getRandomParserUrl(`/blob?replay_url=${replayUrl}`);
console.log('[PARSER]', parseUrl);
const resp = await axios.get<ParserMatch>(parseUrl, { timeout: 150000 });
if (!resp.data) {
Expand Down
10 changes: 8 additions & 2 deletions store/queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,18 @@ export async function runQueue(
queueName: QueueName,
parallelism: number,
processor: (job: any) => Promise<void>,
getCapacity?: () => Promise<number>,
) {
const executor = async () => {
const executor = async (i: number) => {
// Since this may block, we need a separate client for each parallelism!
// Otherwise the workers cannot issue redis commands since something is waiting for redis to return a job
const consumer = new Redis(config.REDIS_URL);
while (true) {
// If we have a way to measure capacity, throttle the processing speed based on capacity
if (getCapacity && i >= await getCapacity()) {
await new Promise(resolve => setTimeout(resolve, 5000));
continue;
}
const job = await consumer.blpop(queueName, '0');
if (job) {
const jobData = JSON.parse(job[1]);
Expand All @@ -37,7 +43,7 @@ export async function runQueue(
}
};
for (let i = 0; i < parallelism; i++) {
executor();
executor(i);
}
}

Expand Down
4 changes: 1 addition & 3 deletions svc/backupscanner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,7 @@ import {
invokeIntervalAsync,
eachLimitPromise,
} from '../util/utility';
const apiKeys = config.STEAM_API_KEY.split(',');
const apiHosts = config.STEAM_API_HOST.split(',');
const parallelism = Math.min(apiHosts.length, apiKeys.length);
const parallelism = 1;
const delay = 1000;

async function processMatch(matchId: number) {
Expand Down
11 changes: 5 additions & 6 deletions svc/fullhistory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,6 @@ import redis from '../store/redis';
import { runQueue } from '../store/queue';
import { getPlayerMatches } from '../store/queries';
import { insertMatch } from '../store/insert';
const apiKeys = config.STEAM_API_KEY.split(',');
const apiHosts = config.STEAM_API_HOST.split(',');
// Approximately 5 req/sec limit per apiHost
// Short fullhistory uses 1 req, long 5 req, some percentage will need to query for matches
const parallelism = Math.min(apiHosts.length * 3, apiKeys.length);

async function updatePlayer(player: FullHistoryJob) {
// done with this player, update
Expand Down Expand Up @@ -145,8 +140,12 @@ async function processFullHistory(job: FullHistoryJob) {
console.timeEnd('doFullHistory: ' + player.account_id.toString());
}

// Approximately 5 req/sec limit per apiHost
// Short fullhistory uses 1 req, long 5 req, some percentage will need to query for up to 500 matches
runQueue(
'fhQueue',
Number(config.FULLHISTORY_PARALLELISM) || parallelism,
Number(config.FULLHISTORY_PARALLELISM) || 1,
processFullHistory,
// Currently not using proxy so don't need to throttle capacity
// async () => redis.zcard('registry:proxy'),
);
3 changes: 1 addition & 2 deletions svc/mmr.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,12 @@ import {
getRetrieverCount,
redisCount,
getRandomRetrieverUrl,
getRegistryUrl,
} from '../util/utility';
import axios from 'axios';

async function processMmr(job: MmrJob) {
const accountId = job.account_id;
const url = getRandomRetrieverUrl(`/profile/${accountId}`);
const url = await getRandomRetrieverUrl(`/profile/${accountId}`);
console.log(url);
const { data } = await axios.get(url, {
timeout: 5000,
Expand Down
19 changes: 19 additions & 0 deletions svc/proxy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@
import httpProxy from 'http-proxy';
import http from 'http';
import config from '../config';
import child_process from 'child_process';
import axios from 'axios';
import os from 'os';
const { PORT, PROXY_PORT } = config;
const port = PORT || PROXY_PORT;
const proxy = httpProxy.createProxyServer({
Expand All @@ -16,3 +19,19 @@ const server = http.createServer((req, res) => {
});
server.listen(port);
console.log('listening on port %s', port);
// When a service registry is configured, announce this proxy to it on a fixed
// interval so that it keeps being listed as available.
if (config.SERVICE_REGISTRY_HOST) {
  // Default: the first address reported for the eth0 interface (may be absent).
  let ip: string | undefined = os.networkInterfaces()?.eth0?.[0]?.address;
  if (config.EXTERNAL) {
    // Ask the registry's /ip endpoint which address it sees this machine as.
    // The executable and its argument are passed separately: spawnSync does
    // not run a shell by default, so a single "curl <url>" string would be
    // treated as the name of one (nonexistent) executable and stdout would be
    // null.
    const lookup = child_process.spawnSync(
      'curl',
      [`${config.SERVICE_REGISTRY_HOST}/ip`],
      { encoding: 'utf8' },
    );
    if (lookup.error || lookup.status !== 0) {
      console.error(
        '[PROXY] external ip lookup failed:',
        lookup.error ?? lookup.stderr,
      );
      ip = undefined;
    } else {
      // Drop any surrounding whitespace so it does not end up in the URL path.
      ip = lookup.stdout.trim();
    }
  }
  if (!ip) {
    // Registering without an address would publish the literal host
    // "undefined" to consumers of the registry, so do not register at all.
    console.error(
      '[PROXY] could not determine an address to register; not registering',
    );
  } else {
    const registerUrl = `https://${
      config.SERVICE_REGISTRY_HOST
    }/register/proxy/${ip}?key=${
      config.RETRIEVER_SECRET
    }`;
    setInterval(() => {
      // Re-register ourselves as available
      // NOTE(review): this logs the registration secret as part of the URL;
      // consider redacting the key if logs are shared.
      console.log('registerUrl: %s', registerUrl);
      // Handle failures explicitly: a rejected, unhandled promise would
      // otherwise surface as an unhandled rejection, which terminates the
      // process by default on current Node versions. The next tick retries.
      axios.post(registerUrl).catch((err: unknown) => {
        console.error(
          '[PROXY] registration failed:',
          err instanceof Error ? err.message : err,
        );
      });
    }, 5000);
  }
}
15 changes: 7 additions & 8 deletions svc/scanner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,11 @@ import config from '../config';
import redis from '../store/redis';
import { insertMatch } from '../store/insert';
import type { ApiMatch } from '../store/pgroup';
import { generateJob, getSteamAPIData, redisCount } from '../util/utility';

const apiKeys = config.STEAM_API_KEY.split(',');
const apiHosts = config.STEAM_API_HOST.split(',');
const parallelism = Math.min(apiHosts.length, apiKeys.length);
import { generateJob, getApiHosts, getSteamAPIData, redisCount } from '../util/utility';
const API_KEYS = config.STEAM_API_KEY.split(',');
const PAGE_SIZE = 100;
// This endpoint is limited to something like 1 request every 5 seconds
const SCANNER_WAIT = 5000;
const SCANNER_WAIT_CATCHUP = SCANNER_WAIT / parallelism;

async function scanApi(seqNum: number) {
const offset = Number(config.SCANNER_OFFSET);
Expand All @@ -24,14 +20,17 @@ async function scanApi(seqNum: number) {
await new Promise(resolve => setTimeout(resolve, SCANNER_WAIT));
continue;
}
const apiHosts = await getApiHosts();
const parallelism = Math.min(apiHosts.length, API_KEYS.length);
const scannerWaitCatchup = SCANNER_WAIT / parallelism;
const container = generateJob('api_sequence', {
start_at_match_seq_num: nextSeqNum,
});
let data = null;
try {
data = await getSteamAPIData({
url: container.url,
proxy: true,
proxy: apiHosts,
});
} catch (err: any) {
// unretryable steam error
Expand Down Expand Up @@ -65,7 +64,7 @@ async function scanApi(seqNum: number) {
await new Promise((resolve) =>
setTimeout(
resolve,
resp.length < PAGE_SIZE ? SCANNER_WAIT : SCANNER_WAIT_CATCHUP,
resp.length < PAGE_SIZE ? SCANNER_WAIT : scannerWaitCatchup,
),
);
}
Expand Down
5 changes: 5 additions & 0 deletions svc/web.ts
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,11 @@ app.get('/healthz', (req, res) => {
res.end('ok');
});

// Lets a caller learn the address this server sees it connecting from by
// writing the request's ip back as the response body.
app.get('/ip', async (request, response, _next) => {
  const clientIp = request.ip;
  response.end(clientIp);
});

app.post('/register/:service/:host', async (req, res, cb) => {
// check secret matches
if (config.RETRIEVER_SECRET && config.RETRIEVER_SECRET !== req.query.key) {
Expand Down
41 changes: 34 additions & 7 deletions util/utility.ts
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ type GetDataOptions = {
timeout?: number;
raw?: boolean;
noRetry?: boolean;
proxy?: boolean;
proxy?: string[];
};
function getSteamAPIDataCallback(url: string | GetDataOptions, cb: ErrorCb) {
let u: string;
Expand All @@ -192,9 +192,8 @@ function getSteamAPIDataCallback(url: string | GetDataOptions, cb: ErrorCb) {
parse.query.key = apiKeys[Math.floor(Math.random() * apiKeys.length)];
parse.search = null;
if (typeof url === 'object' && url.proxy) {
// choose a steam api host
const apiHosts = config.STEAM_API_HOST.split(',');
parse.host = apiHosts[Math.floor(Math.random() * apiHosts.length)];
// choose one of the passed hosts
parse.host = url.proxy[Math.floor(Math.random() * url.proxy.length)];
}
if (parse.host === 'api.steampowered.com') {
redisCount(null, 'steam_api_call');
Expand Down Expand Up @@ -816,21 +815,31 @@ export function getRetrieverCount() {
* Return a URL to use for GC data retrieval.
* @returns
*/
export function getRandomRetrieverUrl(path: string): string {
export async function getRandomRetrieverUrl(path: string): Promise<string> {
  if (!config.USE_SERVICE_REGISTRY) {
    // Static configuration: build one keyed URL per configured retriever
    // host, then choose among them uniformly at random.
    const candidates = RETRIEVER_ARRAY.map(
      (host) => `http://${host}${path}?key=${config.RETRIEVER_SECRET}`,
    );
    const pick = Math.floor(Math.random() * candidates.length);
    return candidates[pick];
  }
  // Registry mode: delegate host selection to the service registry lookup.
  return getRegistryUrl('retriever', path);
}

export function getRandomParserUrl(path: string): string {
/**
 * Build a URL for a replay parsing request.
 *
 * With USE_SERVICE_REGISTRY set, the URL comes from the registry lookup for
 * the 'parser' service; otherwise one entry of PARSER_ARRAY is chosen at
 * random.
 * @param path Path (and query string) to append after the host
 * @returns The chosen parser URL
 */
export async function getRandomParserUrl(path: string): Promise<string> {
  if (!config.USE_SERVICE_REGISTRY) {
    // Static configuration: one URL per configured parser host, pick one.
    const candidates = PARSER_ARRAY.map((host) => `http://${host}${path}`);
    const pick = Math.floor(Math.random() * candidates.length);
    return candidates[pick];
  }
  return getRegistryUrl('parser', path);
}

export async function getRegistryUrl(service: string, path: string) {
async function getRegistryUrl(service: string, path: string) {
const redis = (await import('../store/redis.js')).redis;
// Purge values older than 10 seconds (stale heartbeat)
await redis.zremrangebyscore(
Expand All @@ -845,6 +854,24 @@ export async function getRegistryUrl(service: string, path: string) {
}`;
}

/**
 * List the hostnames that Steam API requests may be sent to.
 *
 * With USE_SERVICE_REGISTRY set, this is every member of the
 * 'registry:proxy' sorted set after entries scored more than 10 seconds in
 * the past have been removed; otherwise it is the comma-separated
 * STEAM_API_HOST setting.
 * @returns Array of hostnames
 */
export async function getApiHosts(): Promise<string[]> {
  if (!config.USE_SERVICE_REGISTRY) {
    return config.STEAM_API_HOST.split(',');
  }
  const { redis } = await import('../store/redis.js');
  const registryKey = 'registry:proxy';
  // Purge values older than 10 seconds (stale heartbeat)
  const staleBefore = Date.now() - 10000;
  await redis.zremrangebyscore(registryKey, '-inf', staleBefore);
  return redis.zrange(registryKey, 0, -1);
}

/**
* Increments an hourly Redis counter for the metric
* @param redis The Redis instance (null to dynamic import the default redis)
Expand Down

0 comments on commit 7ca4f74

Please sign in to comment.