Skip to content

Commit

Permalink
feat(kubekit-client): use valibot
Browse files Browse the repository at this point in the history
  • Loading branch information
kahirokunn committed Jun 27, 2024
1 parent 8b84e65 commit 38d30cf
Show file tree
Hide file tree
Showing 3 changed files with 104 additions and 71 deletions.
5 changes: 3 additions & 2 deletions packages/kubekit-client/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@kubekit/client",
"version": "0.2.23",
"version": "0.2.26",
"main": "dist/index.js",
"types": "dist/index.d.ts",
"author": "kahirokunn",
Expand Down Expand Up @@ -42,7 +42,8 @@
"openid-client": "^5.6.5",
"request": "^2.88.2",
"rfc4648": "^1.5.3",
"undici": "^5.28.3"
"undici": "^5.28.3",
"valibot": "^0.34.0"
},
"husky": {
"hooks": {
Expand Down
39 changes: 26 additions & 13 deletions packages/kubekit-client/src/client/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,16 @@ import { Agent } from 'undici';
import { ReadableStream, TransformStream } from 'node:stream/web';
import { type ObjectReference } from '../lib/types';
import { KubeConfig } from '../lib/config';
import { KubernetesError, isAlreadyExists, isKubernetesError, isTooLargeResourceVersion } from '../lib/error';
import {
type KubernetesStatus,
ErrorWatchObjectSchema,
isTooLargeResourceVersionKubernetesStatus,
isAlreadyExistsKubernetesStatus,
} from '../lib/error';
import { sleep } from '../lib/sleep';
export { sleep } from '../lib/sleep';
export { TaskManager } from '../lib/task_manager';
import * as v from 'valibot';

type Id<T> = {
[K in keyof T]: T[K];
Expand Down Expand Up @@ -45,9 +51,10 @@ function removeNullableProperties<T extends Record<string, unknown | undefined>
* @param attempt - Current attempt
* @param maxRetries - Maximum number of retries
*/
async function defaultBackoff(attempt: number, maxRetries: number, error: unknown | KubernetesError) {
if (isKubernetesError(error) && 'retryAfterSeconds' in error.details) {
await sleep(error.details.retryAfterSeconds * 1000);
async function defaultBackoff(attempt: number, maxRetries: number, error: unknown | KubernetesStatus) {
const errorWatchObject = v.safeParse(ErrorWatchObjectSchema, error);
if (errorWatchObject.success && errorWatchObject.output.object.details?.retryAfterSeconds) {
await sleep(errorWatchObject.output.object.details.retryAfterSeconds * 1000);
return;
}
const attempts = Math.min(attempt, maxRetries);
Expand Down Expand Up @@ -135,7 +142,7 @@ type FinalizerEvent<T extends K8sObj> = {
};
type InnerWatchEvent<T> = { type: InnerWatchEventType; object: T };
export type WatchExtraOptions<T extends K8sListResponse<K8sObj>> = {
onError?: (err: unknown | KubernetesError) => void | Promise<void>;
onError?: (err: unknown | KubernetesStatus) => void | Promise<void>;
watchHandler: (e: WatchEvent<T['items'][number]>, ctx: Context) => MaybePromise<unknown>;
finalizeHandler?: (e: FinalizerEvent<T['items'][number]>, ctx: Context) => MaybePromise<unknown>;
syncedHandler?: (ctx: Context) => MaybePromise<unknown>;
Expand All @@ -153,7 +160,11 @@ export const defaultRetryCondition: RetryConditionFunction = ({ ...object }) =>
return false;
}

if (isKubernetesError(error) && (isTooLargeResourceVersion(error) || isAlreadyExists(error))) {
const errorWatchObject = v.safeParse(ErrorWatchObjectSchema, error);
if (
errorWatchObject.success &&
(isTooLargeResourceVersionKubernetesStatus(errorWatchObject.output.object) || isAlreadyExistsKubernetesStatus(errorWatchObject.output.object))
) {
return false;
}

Expand Down Expand Up @@ -252,7 +263,7 @@ export async function apiClient<Response>(
allowWatchBookmarks: true,
};
}
baseUrl += (baseUrl.includes('?') ? '&' : '?') + toSearchParameters(params);
baseUrl += (baseUrl.includes('?') ? '&' : '?') + searchParameters;
}
const url = new URL(baseUrl);
if (httpsOptions.port) {
Expand Down Expand Up @@ -296,6 +307,9 @@ export async function apiClient<Response>(
const isJsonResponse = contentType?.includes('application/json') ?? false;

if (isSuccess && isJsonResponse) {
if (!isWatch && response.body) {
return (await response.json()) as Response;
}
if (isWatch && response.body) {
const {
watchHandler,
Expand Down Expand Up @@ -357,7 +371,7 @@ export async function apiClient<Response>(
for await (const k8sObj of (response.body as ReadableStream<Uint8Array>).pipeThrough(
new JsonStream<InnerWatchEvent<K8sObj>>()
)) {
if (isKubernetesError(k8sObj) || k8sObj.type === 'ERROR') {
if (k8sObj.type === 'ERROR') {
throw k8sObj;
}

Expand All @@ -371,7 +385,6 @@ export async function apiClient<Response>(
continue;
}

ctx.resourceVersion = k8sObj.object.metadata.resourceVersion;
if (k8sObj.object.metadata.deletionTimestamp && k8sObj.object.metadata.finalizers?.length) {
await finalizeHandler(k8sObj as any, {
...ctx,
Expand All @@ -386,7 +399,6 @@ export async function apiClient<Response>(
clearInterval(intervalId);
}
}
return (await response.json()) as Response;
}

const text = await response.text();
Expand All @@ -407,11 +419,12 @@ export async function apiClient<Response>(

await options.onError(error);

const errorWatchObject = v.safeParse(ErrorWatchObjectSchema, error);
// When Invalid, it will not pass no matter how many times it is re-run, so it is terminated without retry.
if (isKubernetesError(error) && error.reason === 'Invalid') {
if (errorWatchObject.success && errorWatchObject.output.object.reason === 'Invalid') {
if (
error.details.causes.some((cause) =>
cause.message.includes(
errorWatchObject.output.object.details?.causes?.some((cause) =>
cause?.message?.includes(
'sendInitialEvents is forbidden for watch unless the WatchList feature gate is enabled'
)
)
Expand Down
131 changes: 75 additions & 56 deletions packages/kubekit-client/src/lib/error.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import * as v from 'valibot';

/**
* Sample KubernetesError object:
* Sample Status object:
* {
* "kind": "Status",
* "apiVersion": "v1",
Expand All @@ -20,68 +22,85 @@
* },
* "code": 422
* }
*
* {
* kind: 'Status',
* apiVersion: 'v1',
* metadata: {},
* status: 'Failure',
* message: 'secrets "pj-01j0qhebjej5xrdcrqhwrvgbdt-repo" already exists',
* reason: 'AlreadyExists',
* details: { name: 'pj-01j0qhebjej5xrdcrqhwrvgbdt-repo', kind: 'secrets' },
* code: 409
* }
*/
export type KubernetesError = {
kind: 'Status';
apiVersion: string;
metadata: Record<string, unknown>;
status: string;
message: string;
reason: string;
details: {
causes: {
reason: string;
message: string;
field: string;
}[];
} & (
| {
group: string;
kind: string;
}
| {
retryAfterSeconds: number;
}
);
code: number;
};

export function isKubernetesError(obj: unknown): obj is KubernetesError {
if (typeof obj !== 'object' || obj === null) return false;

const error = obj as Record<string, unknown>;
export const StatusSchema = v.looseObject({
kind: v.literal('Status'),
apiVersion: v.literal('v1'),
metadata: v.optional(
v.looseObject({
resourceVersion: v.optional(v.string()),
continue: v.optional(v.string()),
})
),
status: v.optional(v.union([v.literal('Success'), v.literal('Failure')])),
message: v.optional(v.string()),
reason: v.optional(
v.union([
v.literal('Unknown'),
v.literal('Unauthorized'),
v.literal('Forbidden'),
v.literal('NotFound'),
v.literal('AlreadyExists'),
v.literal('Conflict'),
v.literal('Gone'),
v.literal('Invalid'),
v.literal('ServerTimeout'),
v.literal('NotAcceptable'),
])
),
details: v.optional(
v.looseObject({
name: v.optional(v.string()),
group: v.optional(v.string()),
kind: v.optional(v.string()),
uid: v.optional(v.string()),
causes: v.optional(
v.array(
v.looseObject({
type: v.optional(v.string()),
message: v.optional(v.string()),
field: v.optional(v.string()),
})
)
),
retryAfterSeconds: v.optional(v.number()),
})
),
code: v.optional(v.number()),
});

if (error.kind !== 'Status') return false;
if (typeof error.apiVersion !== 'string') return false;
if (typeof error.metadata !== 'object' || error.metadata === null) return false;
if (typeof error.status !== 'string') return false;
if (typeof error.message !== 'string') return false;
if (typeof error.reason !== 'string') return false;
if (typeof error.details !== 'object' || error.details === null) return false;
export const ErrorWatchObjectSchema = v.looseObject({
type: v.union([v.literal('ERROR')]),
object: StatusSchema,
});

const details = error.details as Record<string, unknown>;
if (typeof details.group !== 'string') return false;
if (typeof details.kind !== 'string') return false;
if (!Array.isArray(details.causes)) return false;
export type KubernetesStatus = v.InferInput<typeof StatusSchema>;

for (const cause of details.causes) {
if (typeof cause !== 'object' || cause === null) return false;

const causeObj = cause as Record<string, unknown>;
if (typeof causeObj.reason !== 'string') return false;
if (typeof causeObj.message !== 'string') return false;
if (typeof causeObj.field !== 'string') return false;
}

if (typeof error.code !== 'number') return false;
export function isTooLargeResourceVersionKubernetesStatus(status: KubernetesStatus) {
return status.message?.includes('Too large resource version');
}

return true;
export function isAlreadyExistsKubernetesStatus(err: KubernetesStatus) {
return err.reason === 'AlreadyExists';
}

export function isTooLargeResourceVersion(err: KubernetesError) {
return err.message.includes('Too large resource version');
export function isTooLargeResourceVersion(err: unknown) {
const result = v.safeParse(StatusSchema, err)
return result.success && isTooLargeResourceVersionKubernetesStatus(result.output)
}

export function isAlreadyExists(err: KubernetesError) {
return err.reason === 'AlreadyExists';
export function isAlreadyExists(err: unknown) {
const result = v.safeParse(StatusSchema, err)
return result.success && isAlreadyExistsKubernetesStatus(result.output)
}

0 comments on commit 38d30cf

Please sign in to comment.