From 9bced95f819529463368c969b506fc973ba060c7 Mon Sep 17 00:00:00 2001
From: bwebs
Date: Fri, 2 Aug 2024 00:00:56 -0700
Subject: [PATCH] adding chunking and error output to users:bulk

---
 source/bulk.ts                  |   8 ++
 source/cli.tsx                  |   5 +
 source/commands/TenantsBulk.tsx |   5 +-
 source/commands/UsersBulk.tsx   | 235 +++++++++++++++++++++++++-------
 source/components/Version.tsx   |   1 -
 source/mappings.tsx             |   4 +
 6 files changed, 204 insertions(+), 54 deletions(-)

diff --git a/source/bulk.ts b/source/bulk.ts
index 0852127..07de479 100644
--- a/source/bulk.ts
+++ b/source/bulk.ts
@@ -50,4 +50,12 @@ const getDb = (filename: string) => {
 	}
 }
 
+export const getChunk = (data: duckdb.TableData, chunk_size: number = 1) => {
+	let rows = data.splice(0, chunk_size)
+	return {
+		rows,
+		data
+	}
+}
+
 export default getDb;
\ No newline at end of file
diff --git a/source/cli.tsx b/source/cli.tsx
index be2975c..249aa5f 100644
--- a/source/cli.tsx
+++ b/source/cli.tsx
@@ -6,6 +6,11 @@ import args from './lib/args.js';
 import loadEnv from './lib/load-env.js';
 import Version from './components/Version.js';
 
+// @ts-ignore
+BigInt.prototype.toJSON = function () {
+	return this.toString();
+};
+
 const CLI = async () => {
 	process.removeAllListeners('warning');
 	await loadEnv();
diff --git a/source/commands/TenantsBulk.tsx b/source/commands/TenantsBulk.tsx
index 8cbfe70..3983478 100644
--- a/source/commands/TenantsBulk.tsx
+++ b/source/commands/TenantsBulk.tsx
@@ -170,9 +170,8 @@ export default () => {
 				}
 			}
 		}
-		const add = modifiedToTenant(next as TTenantSubscriptionModified);
-		console.log(JSON.stringify(add, null, 2));
-		await delay(20000);
+
+		await delay(2000);
 		await courier.tenants.createOrReplace(
 			tenant_id,
 			modifiedToTenant(next as TTenantSubscriptionModified),
diff --git a/source/commands/UsersBulk.tsx b/source/commands/UsersBulk.tsx
index 2c9cc75..b97c636 100644
--- a/source/commands/UsersBulk.tsx
+++ b/source/commands/UsersBulk.tsx
@@ -1,26 +1,52 @@
 import {ProgressBar} from '@inkjs/ui';
+import {
+	MergeProfileResponse,
+	ReplaceProfileResponse,
+	SubscribeToListsResponse,
+} from '@trycourier/courier/api/index.js';
 import duckdb from 'duckdb';
 import {Box, Text} from 'ink';
 import _ from 'lodash';
+import fs from 'fs/promises';
 import React, {useEffect, useState} from 'react';
 import {useBoolean, useCounter} from 'usehooks-ts';
-import getDb from '../bulk.js';
+import getDb, {getChunk} from '../bulk.js';
 import {useCliContext} from '../components/Context.js';
 import Spinner from '../components/Spinner.js';
 import UhOh from '../components/UhOh.js';
 import delay from '../lib/delay.js';
 
+const DEFAULT_DELAY = 5000;
+const DEFAULT_CHUNK_SIZE = 500;
+const DEFAULT_TIMEOUT = 10;
+const DEFAULT_ERROR_FILENAME = 'errors.json';
+
+interface RowResponse {
+	userId: string;
+	success: Boolean;
+	error?: string;
+	index: number;
+}
+
 export default () => {
 	const {parsedParams, courier} = useCliContext();
 	const [error, setError] = useState();
-	const processing = useBoolean(false);
+	const processing = useBoolean(true);
+	const running = useBoolean(true);
 	const [data, setData] = useState();
 	const [data_errors, setDataErrors] = useState([]);
 	const counter = useCounter(0);
+	const [row_errors, setRowErrors] = useState([]);
 	const filename = String(_.get(parsedParams, ['_', 0], ''));
 	const {db, filetype, sql} = getDb(filename);
+	const delay_between_chunks = Number(parsedParams['delay']) ?? DEFAULT_DELAY;
+	const chunk_size = parsedParams['chunk_size']
+		? Number(parsedParams['chunk_size'])
+		: DEFAULT_CHUNK_SIZE;
+
+	const log_errors = Boolean(parsedParams['errors']);
 	const keep_flat = Boolean(parsedParams['keep-flat']);
 	const remove_nulls = Boolean(parsedParams['remove-nulls']);
 	const replace = Boolean(parsedParams['replace']);
@@ -47,9 +73,13 @@ export default () => {
 		}
 	}, [data]);
 
-	const getData = () => {
-		processing.setTrue();
+	useEffect(() => {
+		if (!processing.value) {
+			handleErrors();
+		}
+	}, [processing.value]);
+
+	const getData = () => {
 		db.all(sql, (err, result) => {
 			if (err) {
 				setError(err.message);
@@ -59,66 +89,166 @@ export default () => {
 		});
 	};
 
-	const processData = async () => {
-		if (data?.length) {
-			for (let i = 0; i < data.length; i++) {
-				let {user_id, ...profile} = data[i] || {};
-				let userId = user_id ? String(user_id) : undefined;
-				if (!userId) {
-					setDataErrors(p => [...p, `user_id not found in index ${i}`]);
+	const processChunkRows = (data: duckdb.RowData[], start_index: number) => {
+		return data.map((row, i) => {
+			const curr_index = start_index + i;
+			let {user_id, ...profile} = row || {};
+			if (!user_id) {
+				return Promise.resolve({
+					success: false,
+					userId: '__unknown__',
+					error: `user_id not found in index ${curr_index}`,
+					index: curr_index,
+				} as RowResponse);
+			} else {
+				Object.entries(profile).forEach(([key, value]) => {
+					if (filetype === 'csv' && !keep_flat) {
+						_.unset(profile, key);
+						_.set(profile, key, value);
+					}
+					if (value === null && remove_nulls) {
+						_.unset(profile, key);
+					}
+				});
+				return processRow(user_id, profile, curr_index);
+			}
+		});
+	};
+
+	const processRow: (
+		userId: string,
+		profile: any,
+		index: number,
+	) => Promise = async (userId, profile, index) => {
+		return new Promise(async resolve => {
+			let promises: Promise<
+				| SubscribeToListsResponse
+				| MergeProfileResponse
+				| ReplaceProfileResponse
+				| void
+			>[] = [];
+
+			try {
+				if (replace) {
+					promises.push(
+						courier.profiles.replace(
+							userId,
+							{profile},
+							{maxRetries: 5, timeoutInSeconds: DEFAULT_TIMEOUT},
+						),
+					);
 				} else {
-					try {
-						Object.entries(profile).forEach(([key, value]) => {
-							if (filetype === 'csv' && !keep_flat) {
-								_.unset(profile, key);
-								_.set(profile, key, value);
-							}
-							if (value === null && remove_nulls) {
-								_.unset(profile, key);
-							}
-						});
-						if (replace) {
-							await courier.profiles.replace(userId, {profile});
-						} else {
-							await courier.profiles.create(userId, {profile});
-						}
-
-						if (lists.length) {
-							await courier.profiles.subscribeToLists(userId, {
+					promises.push(
+						courier.profiles.create(
+							userId,
+							{profile},
+							{maxRetries: 5, timeoutInSeconds: DEFAULT_TIMEOUT},
+						),
+					);
+				}
+
+				if (lists.length) {
+					promises.push(
+						courier.profiles.subscribeToLists(
+							userId,
+							{
 								lists: lists.map(l => ({listId: l})),
-							});
-						}
-						if (tenants.length) {
-							await courier.users.tenants.addMultple(userId, {
+							},
+							{maxRetries: 5, timeoutInSeconds: DEFAULT_TIMEOUT},
+						),
+					);
+				}
+
+				if (tenants.length) {
+					promises.push(
+						courier.users.tenants.addMultple(
+							userId,
+							{
 								tenants: tenants.map(t => ({tenant_id: t})),
-							});
-						}
-						counter.increment();
-						delay(10000);
-					} catch (err) {
-						setDataErrors(p => [
-							...p,
-							`user_id (${user_id}) failed to update in index ${i}: ${String(
-								err,
-							)}`,
-						]);
-					}
+							},
+							{maxRetries: 5, timeoutInSeconds: DEFAULT_TIMEOUT},
+						),
+					);
 				}
+				await Promise.all(promises);
+				counter.increment();
+				return resolve({userId, success: true, index});
+			} catch (error: any) {
+				counter.increment();
+				return resolve({
+					userId,
+					success: false,
+					index,
+					error:
+						(String(error) ??
+							error?.message ??
+							error.message ??
+							'Unknown Error') + `+ ${userId}`,
+				});
 			}
+		});
+	};
+
+	const processData = async () => {
+		if (data?.length) {
+			let data_copy = [...data];
+			let counter = 0;
+			let {rows, data: rest} = getChunk(data_copy, chunk_size);
+			while (rows?.length) {
+				const chunk = processChunkRows(rows, counter);
+				const processed_chunks = await Promise.all(chunk);
+				const errors = processed_chunks.filter(r => !r.success);
+				if (errors.length) {
+					setDataErrors(p => [
+						...p,
+						...errors.map(r => {
+							return `user_id (${r.userId}) failed to update in index ${
+								r.index
+							}: ${String(r.error)}`;
+						}),
+					]);
+					setRowErrors(r => [
+						...r,
+						...errors.map(e => data[e.index]! as duckdb.RowData),
+					]);
+				}
+				if (rest.length > 0) {
+					await delay(delay_between_chunks);
+					counter += rows.length;
+					const next = getChunk(rest, chunk_size);
+					rows = next.rows;
+					rest = next.data;
+				} else {
+					processing.setFalse();
+					break;
+				}
+			}
+		}
+	};
+
+	const handleErrors = async () => {
+		if (row_errors.length && log_errors) {
+			await fs.writeFile(
+				DEFAULT_ERROR_FILENAME,
+				JSON.stringify(row_errors, null, 2),
+				{
+					encoding: 'utf-8',
+				},
+			);
+			running.setFalse();
+		} else {
+			running.setFalse();
 		}
-		processing.setFalse();
 	};
 
 	if (!filename?.length) {
 		return ;
 	} else if (error?.length) {
 		return ;
-	} else if (data && processing.value) {
+	} else if (data && running.value) {
 		return (
 			<>
-
+
 		);
@@ -131,6 +261,11 @@ export default () => {
 			{data_errors.map((err, i) => {
 				return ;
 			})}
+			{log_errors && data_errors.length ? (
+				Errors output to {DEFAULT_ERROR_FILENAME}
+			) : (
+				<>
+			)}
 		);
 	}
diff --git a/source/components/Version.tsx b/source/components/Version.tsx
index 3db8773..ae9018f 100644
--- a/source/components/Version.tsx
+++ b/source/components/Version.tsx
@@ -12,7 +12,6 @@ const Version = () => {
 	}, []);
 
 	const getVersion = async () => {
-		console.log('getVersion');
 		try {
 			const exc = await execa(
 				'npm',
diff --git a/source/mappings.tsx b/source/mappings.tsx
index 96759b5..e11ba64 100644
--- a/source/mappings.tsx
+++ b/source/mappings.tsx
@@ -286,6 +286,10 @@ mappings.set('users:bulk', {
 			value:
 				'Add all users to the specified tenant. Accepts comma-separated list. Note this will not automatically create the tenant, but the tenant memberships will exist and sending to this tenant_id will still succeed. ',
 		},
+		{
+			option: '--errors',
+			value: 'Output errors',
+		},
 	],
 	example: [
 		`courier users:bulk examples/users.csv --replace`,