diff --git a/packages/kubekit-client/package.json b/packages/kubekit-client/package.json index 9df7e69f..3b8d6916 100644 --- a/packages/kubekit-client/package.json +++ b/packages/kubekit-client/package.json @@ -1,6 +1,6 @@ { "name": "@kubekit/client", - "version": "0.2.26", + "version": "0.2.29", "main": "dist/index.js", "types": "dist/index.d.ts", "author": "kahirokunn", @@ -23,7 +23,8 @@ "test:coverage": "vitest run --coverage" }, "files": [ - "dist" + "dist", + "package.json" ], "devDependencies": { "@types/js-yaml": "^4.0.9", diff --git a/packages/kubekit-client/src/client/index.ts b/packages/kubekit-client/src/client/index.ts index d804ae0a..aa7ba72d 100644 --- a/packages/kubekit-client/src/client/index.ts +++ b/packages/kubekit-client/src/client/index.ts @@ -13,6 +13,7 @@ import { sleep } from '../lib/sleep'; export { sleep } from '../lib/sleep'; export { TaskManager } from '../lib/task_manager'; import * as v from 'valibot'; +import { verboseLog } from '../lib/env'; type Id = { [K in keyof T]: T[K]; @@ -163,7 +164,8 @@ export const defaultRetryCondition: RetryConditionFunction = ({ ...object }) => const errorWatchObject = v.safeParse(ErrorWatchObjectSchema, error); if ( errorWatchObject.success && - (isTooLargeResourceVersionKubernetesStatus(errorWatchObject.output.object) || isAlreadyExistsKubernetesStatus(errorWatchObject.output.object)) + (isTooLargeResourceVersionKubernetesStatus(errorWatchObject.output.object) || + isAlreadyExistsKubernetesStatus(errorWatchObject.output.object)) ) { return false; } @@ -286,6 +288,14 @@ export async function apiClient( headers['Content-Type'] = 'application/json'; } + verboseLog({ + message: 'Fetch Parameters', + body: { searchParameters }, + type: 'debug', + level: 8, + path: url.pathname, + }); + let retry = 0; while (true) { try { @@ -307,97 +317,143 @@ export async function apiClient( const isJsonResponse = contentType?.includes('application/json') ?? false; if (isSuccess && isJsonResponse) { - if (!isWatch && response.body) { + if (!response.body) { + throw new Error('Response body is missing.'); + } + if (!isWatch) { + verboseLog({ + message: 'Return Response Immediately', + body: { + options, + headers: { + ...headers, + Authorization: 'Bearer xxxxxx', + }, + searchParameters, + }, + path: url.pathname, + type: 'debug', + level: 8, + }); return (await response.json()) as Response; } - if (isWatch && response.body) { - const { - watchHandler, - syncPeriod = 3_6000_000, - finalizeHandler, - syncedHandler, - } = { - finalizeHandler: () => {}, - syncedHandler: () => {}, - watchHandler: () => {}, - ...extraOptions, - }; - - const ctx: Context = { - isInitialized: false, - resourceVersion: '', - }; - - const listArgs = { - ...arguments_, - params: { - ...arguments_.params, + verboseLog({ + message: 'Start Watch', + body: { + options, + headers: { + ...headers, + // mask + ...(headers.Authorization && { + Authorization: 'Bearer xxxxxx', + }), }, - }; - delete listArgs.params.watch; - const { syncPeriod: __ = 0, ...listExtraOptions } = { ...extraOptions }; - - const intervalId = setInterval(async () => { - const result = (await apiClient>( - listArgs, - listExtraOptions - )) as K8sListResponse; - - result.items.forEach(async (k8sObj) => { - if (k8sObj.metadata.deletionTimestamp && k8sObj.metadata.finalizers?.length) { - await finalizeHandler( - { - type: 'MODIFIED', - object: k8sObj as any, - }, - { - ...ctx, - } - ); - } else { - await watchHandler( - { - type: 'MODIFIED', - object: k8sObj, - }, - { - ...ctx, - } - ); - } - }); - }, syncPeriod); - try { - for await (const k8sObj of (response.body as ReadableStream).pipeThrough( - new JsonStream>() - )) { - if (k8sObj.type === 'ERROR') { - throw k8sObj; - } - - if (k8sObj.type === 'BOOKMARK') { - ctx.resourceVersion = k8sObj.object.metadata.resourceVersion; - if (k8sObj.object.metadata.annotations?.['k8s.io/initial-events-end'] === 'true') { - await syncedHandler({ - ...ctx, - }); + searchParameters, + }, + path: url.pathname, + type: 'debug', + level: 8, + }); + const { + watchHandler, + syncPeriod = 3_6000_000, + finalizeHandler, + syncedHandler, + } = { + finalizeHandler: () => {}, + syncedHandler: () => {}, + watchHandler: () => {}, + ...extraOptions, + }; + + const ctx: Context = { + isInitialized: false, + resourceVersion: '', + }; + verboseLog({ + message: 'Print Current Context', + body: { ctx }, + type: 'debug', + path: url.pathname, + level: 8, + }); + + const listArgs = { + ...arguments_, + params: { + ...arguments_.params, + }, + }; + delete listArgs.params.watch; + const { syncPeriod: __ = 0, ...listExtraOptions } = { ...extraOptions }; + + const intervalId = setInterval(async () => { + const result = (await apiClient>( + listArgs, + listExtraOptions + )) as K8sListResponse; + + result.items.forEach(async (k8sObj) => { + if (k8sObj.metadata.deletionTimestamp && k8sObj.metadata.finalizers?.length) { + await finalizeHandler( + { + type: 'MODIFIED', + object: k8sObj as any, + }, + { + ...ctx, } - continue; - } - - if (k8sObj.object.metadata.deletionTimestamp && k8sObj.object.metadata.finalizers?.length) { - await finalizeHandler(k8sObj as any, { + ); + } else { + await watchHandler( + { + type: 'MODIFIED', + object: k8sObj, + }, + { ...ctx, - }); - } else { - await watchHandler(k8sObj as any, { + } + ); + } + }); + }, syncPeriod); + try { + for await (const k8sObj of (response.body as ReadableStream).pipeThrough( + new JsonStream>() + )) { + if (k8sObj.type === 'ERROR') { + verboseLog({ + message: 'throw error', + body: k8sObj, + type: 'debug', + path: url.pathname, + level: 5, + }); + throw k8sObj; + } + + if (k8sObj.type === 'BOOKMARK') { + ctx.resourceVersion = k8sObj.object.metadata.resourceVersion; + if (k8sObj.object.metadata.annotations?.['k8s.io/initial-events-end'] === 'true') { + await syncedHandler({ ...ctx, }); } + continue; + } + + if (k8sObj.object.metadata.deletionTimestamp && k8sObj.object.metadata.finalizers?.length) { + await finalizeHandler(k8sObj as any, { + ...ctx, + }); + } else { + await watchHandler(k8sObj as any, { + ...ctx, + }); } - } finally { - clearInterval(intervalId); } + } finally { + clearInterval(intervalId); } } diff --git a/packages/kubekit-client/src/index.ts b/packages/kubekit-client/src/index.ts index 6c98c0b0..524ca1aa 100644 --- a/packages/kubekit-client/src/index.ts +++ b/packages/kubekit-client/src/index.ts @@ -1,3 +1,4 @@ export * from './client'; export * from './lib/wait'; export * from './lib/error'; +export { verboseLog } from './lib/env'; diff --git a/packages/kubekit-client/src/lib/env.ts b/packages/kubekit-client/src/lib/env.ts new file mode 100644 index 00000000..9b46a7ab --- /dev/null +++ b/packages/kubekit-client/src/lib/env.ts @@ -0,0 +1,37 @@ +import * as v from 'valibot'; +import packageJson from '../../package.json'; + +export const env = { + verboseLevel: v.parse( + v.pipe( + v.optional(v.string(), '0'), + v.regex(/[0-9]/, 'must contain a number.'), + v.transform((strNum) => Number(strNum)) + ), + process?.env?.VERBOSE + ), +} as const; + +type Type = 'debug' | 'info' | 'warn'; +type verboseLogParameters = { + message: string; + body: unknown; + type: Type; + path: string; + level: 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | 9; +}; +export const verboseLog = ({ message, body, type, path, level }: verboseLogParameters) => { + if (env.verboseLevel >= level) { + console[type]( + JSON.stringify({ + package: packageJson.name, + version: packageJson.version, + timestamp: new Date(), + type: type.toUpperCase(), + message, + body, + path, + }) + ); + } +}; diff --git a/packages/kubekit-client/src/lib/error.ts b/packages/kubekit-client/src/lib/error.ts index c1238f30..46d71428 100644 --- a/packages/kubekit-client/src/lib/error.ts +++ b/packages/kubekit-client/src/lib/error.ts @@ -96,11 +96,11 @@ export function isAlreadyExistsKubernetesStatus(err: KubernetesStatus) { } export function isTooLargeResourceVersion(err: unknown) { - const result = v.safeParse(StatusSchema, err) - return result.success && isTooLargeResourceVersionKubernetesStatus(result.output) + const result = v.safeParse(StatusSchema, err); + return result.success && isTooLargeResourceVersionKubernetesStatus(result.output); } export function isAlreadyExists(err: unknown) { - const result = v.safeParse(StatusSchema, err) - return result.success && isAlreadyExistsKubernetesStatus(result.output) + const result = v.safeParse(StatusSchema, err); + return result.success && isAlreadyExistsKubernetesStatus(result.output); }