Skip to content

Commit

Permalink
feat: add verbose log
Browse files Browse the repository at this point in the history
  • Loading branch information
kahirokunn committed Jun 27, 2024
1 parent 38d30cf commit 1479182
Show file tree
Hide file tree
Showing 5 changed files with 166 additions and 89 deletions.
5 changes: 3 additions & 2 deletions packages/kubekit-client/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@kubekit/client",
"version": "0.2.26",
"version": "0.2.28",
"main": "dist/index.js",
"types": "dist/index.d.ts",
"author": "kahirokunn",
Expand All @@ -23,7 +23,8 @@
"test:coverage": "vitest run --coverage"
},
"files": [
"dist"
"dist",
"package.json"
],
"devDependencies": {
"@types/js-yaml": "^4.0.9",
Expand Down
204 changes: 121 additions & 83 deletions packages/kubekit-client/src/client/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import { sleep } from '../lib/sleep';
export { sleep } from '../lib/sleep';
export { TaskManager } from '../lib/task_manager';
import * as v from 'valibot';
import { verboseLog } from '../lib/env';

type Id<T> = {
[K in keyof T]: T[K];
Expand Down Expand Up @@ -163,7 +164,8 @@ export const defaultRetryCondition: RetryConditionFunction = ({ ...object }) =>
const errorWatchObject = v.safeParse(ErrorWatchObjectSchema, error);
if (
errorWatchObject.success &&
(isTooLargeResourceVersionKubernetesStatus(errorWatchObject.output.object) || isAlreadyExistsKubernetesStatus(errorWatchObject.output.object))
(isTooLargeResourceVersionKubernetesStatus(errorWatchObject.output.object) ||
isAlreadyExistsKubernetesStatus(errorWatchObject.output.object))
) {
return false;
}
Expand Down Expand Up @@ -286,6 +288,14 @@ export async function apiClient<Response>(
headers['Content-Type'] = 'application/json';
}

verboseLog({
message: 'Fetch Parameters',
body: { searchParameters },
type: 'debug',
level: 8,
path: url.pathname,
});

let retry = 0;
while (true) {
try {
Expand All @@ -307,97 +317,125 @@ export async function apiClient<Response>(
const isJsonResponse = contentType?.includes('application/json') ?? false;

if (isSuccess && isJsonResponse) {
if (!isWatch && response.body) {
return (await response.json()) as Response;
if (!response.body) {
throw new Error('Response body is missing.');
}
if (isWatch && response.body) {
const {
watchHandler,
syncPeriod = 3_6000_000,
finalizeHandler,
syncedHandler,
} = {
finalizeHandler: () => {},
syncedHandler: () => {},
watchHandler: () => {},
...extraOptions,
};

const ctx: Context = {
isInitialized: false,
resourceVersion: '',
};

const listArgs = {
...arguments_,
params: {
...arguments_.params,
if (!isWatch) {
verboseLog({
message: 'Return Response',
body: {
options,
headers: {
...headers,
Authorization: "Bearer xxxxxx"
},
},
};
delete listArgs.params.watch;
const { syncPeriod: __ = 0, ...listExtraOptions } = { ...extraOptions };

const intervalId = setInterval(async () => {
const result = (await apiClient<K8sListResponse<K8sObj>>(
listArgs,
listExtraOptions
)) as K8sListResponse<K8sObj>;

result.items.forEach(async (k8sObj) => {
if (k8sObj.metadata.deletionTimestamp && k8sObj.metadata.finalizers?.length) {
await finalizeHandler(
{
type: 'MODIFIED',
object: k8sObj as any,
},
{
...ctx,
}
);
} else {
await watchHandler(
{
type: 'MODIFIED',
object: k8sObj,
},
{
...ctx,
}
);
}
});
}, syncPeriod);
try {
for await (const k8sObj of (response.body as ReadableStream<Uint8Array>).pipeThrough(
new JsonStream<InnerWatchEvent<K8sObj>>()
)) {
if (k8sObj.type === 'ERROR') {
throw k8sObj;
}

if (k8sObj.type === 'BOOKMARK') {
ctx.resourceVersion = k8sObj.object.metadata.resourceVersion;
if (k8sObj.object.metadata.annotations?.['k8s.io/initial-events-end'] === 'true') {
await syncedHandler({
...ctx,
});
path: url.pathname,
type: 'debug',
level: 8,
});
return (await response.json()) as Response;
}
const {
watchHandler,
syncPeriod = 3_6000_000,
finalizeHandler,
syncedHandler,
} = {
finalizeHandler: () => {},
syncedHandler: () => {},
watchHandler: () => {},
...extraOptions,
};

const ctx: Context = {
isInitialized: false,
resourceVersion: '',
};
verboseLog({
message: 'Print Current Context',
body: { ctx },
type: 'debug',
path: url.pathname,
level: 8,
});

const listArgs = {
...arguments_,
params: {
...arguments_.params,
},
};
delete listArgs.params.watch;
const { syncPeriod: __ = 0, ...listExtraOptions } = { ...extraOptions };

const intervalId = setInterval(async () => {
const result = (await apiClient<K8sListResponse<K8sObj>>(
listArgs,
listExtraOptions
)) as K8sListResponse<K8sObj>;

result.items.forEach(async (k8sObj) => {
if (k8sObj.metadata.deletionTimestamp && k8sObj.metadata.finalizers?.length) {
await finalizeHandler(
{
type: 'MODIFIED',
object: k8sObj as any,
},
{
...ctx,
}
continue;
}

if (k8sObj.object.metadata.deletionTimestamp && k8sObj.object.metadata.finalizers?.length) {
await finalizeHandler(k8sObj as any, {
);
} else {
await watchHandler(
{
type: 'MODIFIED',
object: k8sObj,
},
{
...ctx,
});
} else {
await watchHandler(k8sObj as any, {
}
);
}
});
}, syncPeriod);
try {
for await (const k8sObj of (response.body as ReadableStream<Uint8Array>).pipeThrough(
new JsonStream<InnerWatchEvent<K8sObj>>()
)) {
if (k8sObj.type === 'ERROR') {
verboseLog({
message: 'throw error',
body: k8sObj,
type: 'debug',
path: url.pathname,
level: 5,
});
throw k8sObj;
}

if (k8sObj.type === 'BOOKMARK') {
ctx.resourceVersion = k8sObj.object.metadata.resourceVersion;
if (k8sObj.object.metadata.annotations?.['k8s.io/initial-events-end'] === 'true') {
await syncedHandler({
...ctx,
});
}
continue;
}

if (k8sObj.object.metadata.deletionTimestamp && k8sObj.object.metadata.finalizers?.length) {
await finalizeHandler(k8sObj as any, {
...ctx,
});
} else {
await watchHandler(k8sObj as any, {
...ctx,
});
}
} finally {
clearInterval(intervalId);
}
} finally {
clearInterval(intervalId);
}
}

Expand Down
1 change: 1 addition & 0 deletions packages/kubekit-client/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
export * from './client';
export * from './lib/wait';
export * from './lib/error';
export { verboseLog } from './lib/env';
37 changes: 37 additions & 0 deletions packages/kubekit-client/src/lib/env.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
import * as v from 'valibot';
import packageJson from '../../package.json';

export const env = {
verboseLevel: v.parse(
v.pipe(
v.optional(v.string(), '0'),
v.regex(/[0-9]/, 'must contain a number.'),
v.transform((strNum) => Number(strNum))
),
process?.env?.VERBOSE
),
} as const;

type Type = 'debug' | 'info' | 'warn';
type verboseLogParameters = {
message: string;
body: unknown;
type: Type;
path: string;
level: 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | 9;
};
export const verboseLog = ({ message, body, type, path, level }: verboseLogParameters) => {
if (env.verboseLevel >= level) {
console[type](
JSON.stringify({
package: packageJson.name,
version: packageJson.version,
timestamp: new Date(),
type: type.toUpperCase(),
message,
body,
path,
})
);
}
};
8 changes: 4 additions & 4 deletions packages/kubekit-client/src/lib/error.ts
Original file line number Diff line number Diff line change
Expand Up @@ -96,11 +96,11 @@ export function isAlreadyExistsKubernetesStatus(err: KubernetesStatus) {
}

export function isTooLargeResourceVersion(err: unknown) {
const result = v.safeParse(StatusSchema, err)
return result.success && isTooLargeResourceVersionKubernetesStatus(result.output)
const result = v.safeParse(StatusSchema, err);
return result.success && isTooLargeResourceVersionKubernetesStatus(result.output);
}

export function isAlreadyExists(err: unknown) {
const result = v.safeParse(StatusSchema, err)
return result.success && isAlreadyExistsKubernetesStatus(result.output)
const result = v.safeParse(StatusSchema, err);
return result.success && isAlreadyExistsKubernetesStatus(result.output);
}

0 comments on commit 1479182

Please sign in to comment.