From 38d30cfa2436126e244dabdaca985f2e3ce64ba0 Mon Sep 17 00:00:00 2001
From: kahirokunn <okinakahiro@gmail.com>
Date: Tue, 25 Jun 2024 13:46:30 +0900
Subject: [PATCH] feat(kubekit-client): use valibot

---
 packages/kubekit-client/package.json        |   5 +-
 packages/kubekit-client/src/client/index.ts |  39 ++++--
 packages/kubekit-client/src/lib/error.ts    | 131 +++++++++++---------
 3 files changed, 104 insertions(+), 71 deletions(-)

diff --git a/packages/kubekit-client/package.json b/packages/kubekit-client/package.json
index b3d81c92..9df7e69f 100644
--- a/packages/kubekit-client/package.json
+++ b/packages/kubekit-client/package.json
@@ -1,6 +1,6 @@
 {
   "name": "@kubekit/client",
-  "version": "0.2.23",
+  "version": "0.2.26",
   "main": "dist/index.js",
   "types": "dist/index.d.ts",
   "author": "kahirokunn",
@@ -42,7 +42,8 @@
     "openid-client": "^5.6.5",
     "request": "^2.88.2",
     "rfc4648": "^1.5.3",
-    "undici": "^5.28.3"
+    "undici": "^5.28.3",
+    "valibot": "^0.34.0"
   },
   "husky": {
     "hooks": {
diff --git a/packages/kubekit-client/src/client/index.ts b/packages/kubekit-client/src/client/index.ts
index de5c466f..d804ae0a 100644
--- a/packages/kubekit-client/src/client/index.ts
+++ b/packages/kubekit-client/src/client/index.ts
@@ -3,10 +3,16 @@ import { Agent } from 'undici';
 import { ReadableStream, TransformStream } from 'node:stream/web';
 import { type ObjectReference } from '../lib/types';
 import { KubeConfig } from '../lib/config';
-import { KubernetesError, isAlreadyExists, isKubernetesError, isTooLargeResourceVersion } from '../lib/error';
+import {
+  type KubernetesStatus,
+  ErrorWatchObjectSchema,
+  isTooLargeResourceVersionKubernetesStatus,
+  isAlreadyExistsKubernetesStatus,
+} from '../lib/error';
 import { sleep } from '../lib/sleep';
 export { sleep } from '../lib/sleep';
 export { TaskManager } from '../lib/task_manager';
+import * as v from 'valibot';
 
 type Id<T> = {
   [K in keyof T]: T[K];
@@ -45,9 +51,10 @@ function removeNullableProperties<T extends Record<string, unknown | undefined>
  * @param attempt - Current attempt
  * @param maxRetries - Maximum number of retries
  */
-async function defaultBackoff(attempt: number, maxRetries: number, error: unknown | KubernetesError) {
-  if (isKubernetesError(error) && 'retryAfterSeconds' in error.details) {
-    await sleep(error.details.retryAfterSeconds * 1000);
+async function defaultBackoff(attempt: number, maxRetries: number, error: unknown | KubernetesStatus) {
+  const errorWatchObject = v.safeParse(ErrorWatchObjectSchema, error);
+  if (errorWatchObject.success && errorWatchObject.output.object.details?.retryAfterSeconds) {
+    await sleep(errorWatchObject.output.object.details.retryAfterSeconds * 1000);
     return;
   }
   const attempts = Math.min(attempt, maxRetries);
@@ -135,7 +142,7 @@ type FinalizerEvent<T extends K8sObj> = {
 };
 type InnerWatchEvent<T> = { type: InnerWatchEventType; object: T };
 export type WatchExtraOptions<T extends K8sListResponse<K8sObj>> = {
-  onError?: (err: unknown | KubernetesError) => void | Promise<void>;
+  onError?: (err: unknown | KubernetesStatus) => void | Promise<void>;
   watchHandler: (e: WatchEvent<T['items'][number]>, ctx: Context) => MaybePromise<unknown>;
   finalizeHandler?: (e: FinalizerEvent<T['items'][number]>, ctx: Context) => MaybePromise<unknown>;
   syncedHandler?: (ctx: Context) => MaybePromise<unknown>;
@@ -153,7 +160,11 @@ export const defaultRetryCondition: RetryConditionFunction = ({ ...object }) =>
     return false;
   }
 
-  if (isKubernetesError(error) && (isTooLargeResourceVersion(error) || isAlreadyExists(error))) {
+  const errorWatchObject = v.safeParse(ErrorWatchObjectSchema, error);
+  if (
+    errorWatchObject.success &&
+    (isTooLargeResourceVersionKubernetesStatus(errorWatchObject.output.object) || isAlreadyExistsKubernetesStatus(errorWatchObject.output.object))
+  ) {
     return false;
   }
 
@@ -252,7 +263,7 @@ export async function apiClient<Response>(
         allowWatchBookmarks: true,
       };
     }
-    baseUrl += (baseUrl.includes('?') ? '&' : '?') + toSearchParameters(params);
+    baseUrl += (baseUrl.includes('?') ? '&' : '?') + searchParameters;
   }
   const url = new URL(baseUrl);
   if (httpsOptions.port) {
@@ -296,6 +307,9 @@ export async function apiClient<Response>(
       const isJsonResponse = contentType?.includes('application/json') ?? false;
 
       if (isSuccess && isJsonResponse) {
+        if (!isWatch && response.body) {
+          return (await response.json()) as Response;
+        }
         if (isWatch && response.body) {
           const {
             watchHandler,
@@ -357,7 +371,7 @@ export async function apiClient<Response>(
             for await (const k8sObj of (response.body as ReadableStream<Uint8Array>).pipeThrough(
               new JsonStream<InnerWatchEvent<K8sObj>>()
             )) {
-              if (isKubernetesError(k8sObj) || k8sObj.type === 'ERROR') {
+              if (k8sObj.type === 'ERROR') {
                 throw k8sObj;
               }
 
@@ -371,7 +385,6 @@ export async function apiClient<Response>(
                 continue;
               }
 
-              ctx.resourceVersion = k8sObj.object.metadata.resourceVersion;
               if (k8sObj.object.metadata.deletionTimestamp && k8sObj.object.metadata.finalizers?.length) {
                 await finalizeHandler(k8sObj as any, {
                   ...ctx,
@@ -386,7 +399,6 @@ export async function apiClient<Response>(
             clearInterval(intervalId);
           }
         }
-        return (await response.json()) as Response;
       }
 
       const text = await response.text();
@@ -407,11 +419,12 @@ export async function apiClient<Response>(
 
       await options.onError(error);
 
+      const errorWatchObject = v.safeParse(ErrorWatchObjectSchema, error);
       // When Invalid, it will not pass no matter how many times it is re-run, so it is terminated without retry.
-      if (isKubernetesError(error) && error.reason === 'Invalid') {
+      if (errorWatchObject.success && errorWatchObject.output.object.reason === 'Invalid') {
         if (
-          error.details.causes.some((cause) =>
-            cause.message.includes(
+          errorWatchObject.output.object.details?.causes?.some((cause) =>
+            cause?.message?.includes(
               'sendInitialEvents is forbidden for watch unless the WatchList feature gate is enabled'
             )
           )
diff --git a/packages/kubekit-client/src/lib/error.ts b/packages/kubekit-client/src/lib/error.ts
index 8c0cc8fe..c1238f30 100644
--- a/packages/kubekit-client/src/lib/error.ts
+++ b/packages/kubekit-client/src/lib/error.ts
@@ -1,5 +1,7 @@
+import * as v from 'valibot';
+
 /**
- * Sample KubernetesError object:
+ * Sample Status object:
  * {
  *   "kind": "Status",
  *   "apiVersion": "v1",
@@ -20,68 +22,85 @@
  *   },
  *   "code": 422
  * }
+ *
+ * {
+ *   kind: 'Status',
+ *   apiVersion: 'v1',
+ *   metadata: {},
+ *   status: 'Failure',
+ *   message: 'secrets "pj-01j0qhebjej5xrdcrqhwrvgbdt-repo" already exists',
+ *   reason: 'AlreadyExists',
+ *   details: { name: 'pj-01j0qhebjej5xrdcrqhwrvgbdt-repo', kind: 'secrets' },
+ *   code: 409
+ * }
  */
-export type KubernetesError = {
-  kind: 'Status';
-  apiVersion: string;
-  metadata: Record<string, unknown>;
-  status: string;
-  message: string;
-  reason: string;
-  details: {
-    causes: {
-      reason: string;
-      message: string;
-      field: string;
-    }[];
-  } & (
-    | {
-        group: string;
-        kind: string;
-      }
-    | {
-        retryAfterSeconds: number;
-      }
-  );
-  code: number;
-};
-
-export function isKubernetesError(obj: unknown): obj is KubernetesError {
-  if (typeof obj !== 'object' || obj === null) return false;
-
-  const error = obj as Record<string, unknown>;
+export const StatusSchema = v.looseObject({
+  kind: v.literal('Status'),
+  apiVersion: v.literal('v1'),
+  metadata: v.optional(
+    v.looseObject({
+      resourceVersion: v.optional(v.string()),
+      continue: v.optional(v.string()),
+    })
+  ),
+  status: v.optional(v.union([v.literal('Success'), v.literal('Failure')])),
+  message: v.optional(v.string()),
+  reason: v.optional(
+    v.union([
+      v.literal('Unknown'),
+      v.literal('Unauthorized'),
+      v.literal('Forbidden'),
+      v.literal('NotFound'),
+      v.literal('AlreadyExists'),
+      v.literal('Conflict'),
+      v.literal('Gone'),
+      v.literal('Invalid'),
+      v.literal('ServerTimeout'),
+      v.literal('NotAcceptable'),
+    ])
+  ),
+  details: v.optional(
+    v.looseObject({
+      name: v.optional(v.string()),
+      group: v.optional(v.string()),
+      kind: v.optional(v.string()),
+      uid: v.optional(v.string()),
+      causes: v.optional(
+        v.array(
+          v.looseObject({
+            type: v.optional(v.string()),
+            message: v.optional(v.string()),
+            field: v.optional(v.string()),
+          })
+        )
+      ),
+      retryAfterSeconds: v.optional(v.number()),
+    })
+  ),
+  code: v.optional(v.number()),
+});
 
-  if (error.kind !== 'Status') return false;
-  if (typeof error.apiVersion !== 'string') return false;
-  if (typeof error.metadata !== 'object' || error.metadata === null) return false;
-  if (typeof error.status !== 'string') return false;
-  if (typeof error.message !== 'string') return false;
-  if (typeof error.reason !== 'string') return false;
-  if (typeof error.details !== 'object' || error.details === null) return false;
+export const ErrorWatchObjectSchema = v.looseObject({
+  type: v.union([v.literal('ERROR')]),
+  object: StatusSchema,
+});
 
-  const details = error.details as Record<string, unknown>;
-  if (typeof details.group !== 'string') return false;
-  if (typeof details.kind !== 'string') return false;
-  if (!Array.isArray(details.causes)) return false;
+export type KubernetesStatus = v.InferInput<typeof StatusSchema>;
 
-  for (const cause of details.causes) {
-    if (typeof cause !== 'object' || cause === null) return false;
-
-    const causeObj = cause as Record<string, unknown>;
-    if (typeof causeObj.reason !== 'string') return false;
-    if (typeof causeObj.message !== 'string') return false;
-    if (typeof causeObj.field !== 'string') return false;
-  }
-
-  if (typeof error.code !== 'number') return false;
+export function isTooLargeResourceVersionKubernetesStatus(status: KubernetesStatus) {
+  return status.message?.includes('Too large resource version');
+}
 
-  return true;
+export function isAlreadyExistsKubernetesStatus(err: KubernetesStatus) {
+  return err.reason === 'AlreadyExists';
 }
 
-export function isTooLargeResourceVersion(err: KubernetesError) {
-  return err.message.includes('Too large resource version');
+export function isTooLargeResourceVersion(err: unknown) {
+  const result = v.safeParse(StatusSchema, err)
+  return result.success && isTooLargeResourceVersionKubernetesStatus(result.output)
 }
 
-export function isAlreadyExists(err: KubernetesError) {
-  return err.reason === 'AlreadyExists';
+export function isAlreadyExists(err: unknown) {
+  const result = v.safeParse(StatusSchema, err)
+  return result.success && isAlreadyExistsKubernetesStatus(result.output)
 }