Merge pull request #18 from trycourier/bryan/bulk-profiles-chunking
adding chunking and error output to users:bulk
bwebs authored Aug 2, 2024
2 parents b7b90cb + 9bced95 commit b6b7c62
Showing 6 changed files with 204 additions and 54 deletions.
8 changes: 8 additions & 0 deletions source/bulk.ts
@@ -50,4 +50,12 @@ const getDb = (filename: string) => {
}
}

export const getChunk = (data: duckdb.TableData, chunk_size: number = 1) => {
let rows = data.splice(0, chunk_size)
return {
rows,
data
}
}

export default getDb;
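
For reference, getChunk consumes its input in place: splice removes the first chunk_size rows from the array and returns them, so repeated calls walk through the remaining data. A minimal usage sketch (the sample rows are hypothetical, the cast loosens the duckdb typing for illustration, and the import path assumes a sibling module as in the repo layout):

import duckdb from 'duckdb';
import {getChunk} from './bulk.js';

// getChunk mutates its input: each call peels off the next chunk_size rows.
const sample = [{user_id: 'a'}, {user_id: 'b'}, {user_id: 'c'}] as unknown as duckdb.TableData;
const {rows, data} = getChunk(sample, 2);
// rows -> the rows for 'a' and 'b'
// data -> the same (mutated) array, now holding only the row for 'c'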
5 changes: 5 additions & 0 deletions source/cli.tsx
@@ -6,6 +6,11 @@ import args from './lib/args.js';
import loadEnv from './lib/load-env.js';
import Version from './components/Version.js';

// @ts-ignore
BigInt.prototype.toJSON = function () {
return this.toString();
};

const CLI = async () => {
process.removeAllListeners('warning');
await loadEnv();
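
The prototype patch above matters because duckdb can surface BIGINT columns as JavaScript BigInt values, which JSON.stringify rejects by default. A minimal sketch of the before/after behavior (assuming an ES2020+ target for the 1n literal):

// Without the patch, JSON.stringify({count: 1n}) throws
// "TypeError: Do not know how to serialize a BigInt".
// @ts-ignore — toJSON is not declared on the BigInt type
BigInt.prototype.toJSON = function () {
	return this.toString();
};
console.log(JSON.stringify({count: 1n})); // {"count":"1"}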
5 changes: 2 additions & 3 deletions source/commands/TenantsBulk.tsx
@@ -170,9 +170,8 @@ export default () => {
}
}
}
const add = modifiedToTenant(next as TTenantSubscriptionModified);
console.log(JSON.stringify(add, null, 2));
await delay(20000);

await delay(2000);
await courier.tenants.createOrReplace(
tenant_id,
modifiedToTenant(next as TTenantSubscriptionModified),
235 changes: 185 additions & 50 deletions source/commands/UsersBulk.tsx
@@ -1,26 +1,52 @@
import {ProgressBar} from '@inkjs/ui';
import {
MergeProfileResponse,
ReplaceProfileResponse,
SubscribeToListsResponse,
} from '@trycourier/courier/api/index.js';
import duckdb from 'duckdb';
import {Box, Text} from 'ink';
import _ from 'lodash';
import fs from 'fs/promises';
import React, {useEffect, useState} from 'react';
import {useBoolean, useCounter} from 'usehooks-ts';
import getDb from '../bulk.js';
import getDb, {getChunk} from '../bulk.js';
import {useCliContext} from '../components/Context.js';
import Spinner from '../components/Spinner.js';
import UhOh from '../components/UhOh.js';
import delay from '../lib/delay.js';

const DEFAULT_DELAY = 5000;
const DEFAULT_CHUNK_SIZE = 500;
const DEFAULT_TIMEOUT = 10;
const DEFAULT_ERROR_FILENAME = 'errors.json';

interface RowResponse {
userId: string;
success: Boolean;
error?: string;
index: number;
}

export default () => {
const {parsedParams, courier} = useCliContext();
const [error, setError] = useState<string | undefined>();
const processing = useBoolean(false);
const processing = useBoolean(true);
const running = useBoolean(true);
const [data, setData] = useState<duckdb.TableData | undefined>();
const [data_errors, setDataErrors] = useState<string[]>([]);
const counter = useCounter(0);
const [row_errors, setRowErrors] = useState<duckdb.RowData[]>([]);

const filename = String(_.get(parsedParams, ['_', 0], ''));
const {db, filetype, sql} = getDb(filename);

const delay_between_chunks = Number(parsedParams['delay']) ?? DEFAULT_DELAY;
const chunk_size = parsedParams['chunk_size']
? Number(parsedParams['chunk_size'])
: DEFAULT_CHUNK_SIZE;

const log_errors = Boolean(parsedParams['errors']);
const keep_flat = Boolean(parsedParams['keep-flat']);
const remove_nulls = Boolean(parsedParams['remove-nulls']);
const replace = Boolean(parsedParams['replace']);
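
One caveat in the flag parsing above: Number(parsedParams['delay']) ?? DEFAULT_DELAY can never fall back to the default, because Number(undefined) is NaN, which is not nullish, so ?? does not catch it. A sketch of a guard that would restore the fallback (parseWithDefault is a hypothetical helper, not part of this commit):

const DEFAULT_DELAY = 5000;

// NaN is not nullish, so `Number(raw) ?? fallback` never takes the fallback;
// Number.isFinite catches both missing and malformed flag values.
const parseWithDefault = (raw: unknown, fallback: number): number => {
	const n = Number(raw);
	return Number.isFinite(n) ? n : fallback;
};

console.log(parseWithDefault(undefined, DEFAULT_DELAY)); // 5000
console.log(parseWithDefault('250', DEFAULT_DELAY)); // 250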
@@ -47,9 +73,13 @@ export default () => {
}
}, [data]);

const getData = () => {
processing.setTrue();
useEffect(() => {
if (!processing.value) {
handleErrors();
}
}, [processing.value]);

const getData = () => {
db.all(sql, (err, result) => {
if (err) {
setError(err.message);
@@ -59,66 +89,166 @@ export default () => {
});
};

const processData = async () => {
if (data?.length) {
for (let i = 0; i < data.length; i++) {
let {user_id, ...profile} = data[i] || {};
let userId = user_id ? String(user_id) : undefined;
if (!userId) {
setDataErrors(p => [...p, `user_id not found in index ${i}`]);
const processChunkRows = (data: duckdb.RowData[], start_index: number) => {
return data.map((row, i) => {
const curr_index = start_index + i;
let {user_id, ...profile} = row || {};
if (!user_id) {
return Promise.resolve({
success: false,
userId: '__unknown__',
error: `user_id not found in index ${curr_index}`,
index: curr_index,
} as RowResponse);
} else {
Object.entries(profile).forEach(([key, value]) => {
if (filetype === 'csv' && !keep_flat) {
_.unset(profile, key);
_.set(profile, key, value);
}
if (value === null && remove_nulls) {
_.unset(profile, key);
}
});
return processRow(user_id, profile, curr_index);
}
});
};

const processRow: (
userId: string,
profile: any,
index: number,
) => Promise<RowResponse> = async (userId, profile, index) => {
return new Promise(async resolve => {
let promises: Promise<
| SubscribeToListsResponse
| MergeProfileResponse
| ReplaceProfileResponse
| void
>[] = [];

try {
if (replace) {
promises.push(
courier.profiles.replace(
userId,
{profile},
{maxRetries: 5, timeoutInSeconds: DEFAULT_TIMEOUT},
),
);
} else {
try {
Object.entries(profile).forEach(([key, value]) => {
if (filetype === 'csv' && !keep_flat) {
_.unset(profile, key);
_.set(profile, key, value);
}
if (value === null && remove_nulls) {
_.unset(profile, key);
}
});
if (replace) {
await courier.profiles.replace(userId, {profile});
} else {
await courier.profiles.create(userId, {profile});
}

if (lists.length) {
await courier.profiles.subscribeToLists(userId, {
promises.push(
courier.profiles.create(
userId,
{profile},
{maxRetries: 5, timeoutInSeconds: DEFAULT_TIMEOUT},
),
);
}

if (lists.length) {
promises.push(
courier.profiles.subscribeToLists(
userId,
{
lists: lists.map(l => ({listId: l})),
});
}
if (tenants.length) {
await courier.users.tenants.addMultple(userId, {
},
{maxRetries: 5, timeoutInSeconds: DEFAULT_TIMEOUT},
),
);
}

if (tenants.length) {
promises.push(
courier.users.tenants.addMultple(
userId,
{
tenants: tenants.map(t => ({tenant_id: t})),
});
}
counter.increment();
delay(10000);
} catch (err) {
setDataErrors(p => [
...p,
`user_id (${user_id}) failed to update in index ${i}: ${String(
err,
)}`,
]);
}
},
{maxRetries: 5, timeoutInSeconds: DEFAULT_TIMEOUT},
),
);
}
await Promise.all(promises);
counter.increment();
return resolve({userId, success: true, index});
} catch (error: any) {
counter.increment();
return resolve({
userId,
success: false,
index,
error:
(String(error) ??
error?.message ??
error.message ??
'Unknown Error') + `+ ${userId}`,
});
}
});
};

const processData = async () => {
if (data?.length) {
let data_copy = [...data];
let counter = 0;
let {rows, data: rest} = getChunk(data_copy, chunk_size);
while (rows?.length) {
const chunk = processChunkRows(rows, counter);
const processed_chunks = await Promise.all(chunk);
const errors = processed_chunks.filter(r => !r.success);
if (errors.length) {
setDataErrors(p => [
...p,
...errors.map(r => {
return `user_id (${r.userId}) failed to update in index ${
r.index
}: ${String(r.error)}`;
}),
]);
setRowErrors(r => [
...r,
...errors.map(e => data[e.index]! as duckdb.RowData),
]);
}
if (rest.length > 0) {
await delay(delay_between_chunks);
counter += rows.length;
const next = getChunk(rest, chunk_size);
rows = next.rows;
rest = next.data;
} else {
processing.setFalse();
break;
}
}
}
};

const handleErrors = async () => {
if (row_errors.length && log_errors) {
await fs.writeFile(
DEFAULT_ERROR_FILENAME,
JSON.stringify(row_errors, null, 2),
{
encoding: 'utf-8',
},
);
running.setFalse();
} else {
running.setFalse();
}
processing.setFalse();
};

if (!filename?.length) {
return <UhOh text="You must specify a filename." />;
} else if (error?.length) {
return <UhOh text={error} />;
} else if (data && processing.value) {
} else if (data && running.value) {
return (
<>
<ProgressBar
value={Math.floor((counter.count + 1 / data.length) * 100)}
/>
<ProgressBar value={Math.floor((counter.count / data.length) * 100)} />
<Spinner text={`Completed Rows: ${counter.count} / ${data.length}`} />
</>
);
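
The heart of the change is the loop in processData: take a chunk of rows, run the per-row Courier calls concurrently, collect failures, then sleep before moving to the next chunk. A condensed, self-contained sketch of that pattern — note it uses Promise.allSettled and slice for brevity, where the commit itself resolves every row to a RowResponse and walks the data with the splice-based getChunk; processOne is a hypothetical stand-in for the profile, list, and tenant calls:

const delay = (ms: number) => new Promise(res => setTimeout(res, ms));

const processOne = async (row: {user_id?: string}) => {
	// stand-in for courier.profiles.create/replace plus list and tenant calls
	if (!row.user_id) throw new Error('user_id missing');
};

const processAll = async (rows: {user_id?: string}[], chunkSize = 500, pause = 5000) => {
	const failures: {user_id?: string}[] = [];
	for (let i = 0; i < rows.length; i += chunkSize) {
		const chunk = rows.slice(i, i + chunkSize);
		const results = await Promise.allSettled(chunk.map(processOne));
		results.forEach((r, j) => {
			if (r.status === 'rejected') failures.push(chunk[j]!);
		});
		// pause between chunks, but not after the last one
		if (i + chunkSize < rows.length) await delay(pause);
	}
	return failures;
};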
@@ -131,6 +261,11 @@ export default () => {
{data_errors.map((err, i) => {
return <UhOh key={i} text={err} />;
})}
{log_errors && data_errors.length ? (
<Text>Errors output to {DEFAULT_ERROR_FILENAME}</Text>
) : (
<></>
)}
</Box>
);
}
1 change: 0 additions & 1 deletion source/components/Version.tsx
@@ -12,7 +12,6 @@ const Version = () => {
}, []);

const getVersion = async () => {
console.log('getVersion');
try {
const exc = await execa(
'npm',
4 changes: 4 additions & 0 deletions source/mappings.tsx
@@ -286,6 +286,10 @@ mappings.set('users:bulk', {
value:
'Add all users to the specified tenant. Accepts comma-separated list. Note this will not automatically create the tenant, but the tenant memberships will exist and sending to this tenant_id will still succeed. ',
},
{
option: '--errors',
value: 'Output errors',
},
],
example: [
`courier users:bulk examples/users.csv --replace`,
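
A hypothetical invocation exercising the new flag, mirroring the existing examples:

courier users:bulk examples/users.csv --errors

When rows fail, the offending source rows are written to errors.json (the DEFAULT_ERROR_FILENAME in UsersBulk.tsx), in addition to the per-row messages printed on screen.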
