diff --git a/examples/nginx-cluster-controller-with-kubekit-sync/package.json b/examples/nginx-cluster-controller-with-kubekit-sync/package.json index c89e1e9e..d4265174 100644 --- a/examples/nginx-cluster-controller-with-kubekit-sync/package.json +++ b/examples/nginx-cluster-controller-with-kubekit-sync/package.json @@ -18,7 +18,7 @@ "author": "", "license": "MIT", "dependencies": { - "@kubekit/client": "0.2.11", + "@kubekit/client": "0.2.17", "object-hash": "^3.0.0" }, "devDependencies": { diff --git a/examples/nginx-cluster-controller-with-kubekit-sync/src/main.ts b/examples/nginx-cluster-controller-with-kubekit-sync/src/main.ts index 81331c2f..e762bfdd 100644 --- a/examples/nginx-cluster-controller-with-kubekit-sync/src/main.ts +++ b/examples/nginx-cluster-controller-with-kubekit-sync/src/main.ts @@ -9,6 +9,9 @@ async function main() { let stop = false while (!stop) { const controller = new AbortController() + controller.signal.addEventListener('abort', () => { + stop = true + }) ;['SIGTERM', 'SIGINT'].forEach((signalName) => { process.once(signalName, () => { console.log(`${signalName} received.`) @@ -32,9 +35,9 @@ async function main() { await becameReaderPromise console.debug('[main] Became leader') - await nginxClusterController(state, controller.signal) + await nginxClusterController(state, controller) } catch (e: any) { - abortErrorHandler(e) + stop = isAbortError(e) } } } @@ -46,3 +49,12 @@ function abortErrorHandler(e: any) { } throw e } + +function isAbortError(e: unknown): e is Error { + return ( + e !== null && + typeof e === 'object' && + 'name' in e && + e.name === 'AbortError' + ) +} diff --git a/examples/nginx-cluster-controller-with-kubekit-sync/src/nginxClusterController.ts b/examples/nginx-cluster-controller-with-kubekit-sync/src/nginxClusterController.ts index ffaeabe0..2f3ef28a 100644 --- a/examples/nginx-cluster-controller-with-kubekit-sync/src/nginxClusterController.ts +++ b/examples/nginx-cluster-controller-with-kubekit-sync/src/nginxClusterController.ts @@ -1,16 +1,7 @@ -import { TaskManager } from '@kubekit/client' -import { - listCoreV1PodForAllNamespaces, -} from './core-v1' -import { - listKubekitComV1NginxClusterForAllNamespaces, -} from './kubekit-v1' -import { - type NginxCluster, - type Pods, - type ReconcileNginxClusterContext, - labelKey, -} from './type' +import { TaskManager, isKubernetesError } from '@kubekit/client' +import { listCoreV1PodForAllNamespaces } from './core-v1' +import { listKubekitComV1NginxClusterForAllNamespaces } from './kubekit-v1' +import { type NginxCluster, type Pods, podLabelKey } from './type' import { reconcileNginxCluster } from './reconcileNginxCluster' type NginxClusters = Map @@ -31,22 +22,9 @@ export function getInitialState(): ControllerState { export async function nginxClusterController( state: ControllerState, - signal: AbortSignal + abortController: AbortController ) { - const controllerCtx: Record = {} const { nginxClusters, pods } = state - const getControllerCtx = (key: string) => { - if (!controllerCtx[key]) { - controllerCtx[key] = { - pendingCreate: 0, - } - } - return controllerCtx[key] - } - const deleteControllerCtx = (key: string) => { - delete controllerCtx[key] - } - const taskMng = new TaskManager() taskMng.pause() @@ -55,133 +33,118 @@ export async function nginxClusterController( pods: false, } + const signal = abortController.signal signal.addEventListener('abort', () => { taskMng.pause() }) const isAllSynced = () => syncedState.nginxClusters && syncedState.pods - await Promise.all([ - listKubekitComV1NginxClusterForAllNamespaces( - { - watch: true, - sendInitialEvents: true, - resourceVersionMatch: 'NotOlderThan', - resourceVersion: state.nginxClusterResourceVersion, - }, - { - syncedHandler: async () => { - syncedState.nginxClusters = true - if (isAllSynced()) { - taskMng.resume() - } + try { + await Promise.all([ + listKubekitComV1NginxClusterForAllNamespaces( + { + watch: true, + sendInitialEvents: true, + resourceVersionMatch: 'NotOlderThan', + resourceVersion: state.nginxClusterResourceVersion, }, - watchHandler: async ({ object: nginxCluster, type }) => { - const key = `${nginxCluster.metadata.namespace}/${nginxCluster.metadata.name}` - if (type === 'DELETED') { - nginxClusters.delete(key) - deleteControllerCtx( - `${nginxCluster.metadata.namespace}/${nginxCluster.metadata.name}` - ) - return - } + { + syncedHandler: async () => { + syncedState.nginxClusters = true + if (isAllSynced()) { + taskMng.resume() + } + }, + watchHandler: async ({ object: nginxCluster, type }) => { + const key = `${nginxCluster.metadata.namespace}/${nginxCluster.metadata.name}` + if (type === 'DELETED') { + nginxClusters.delete(key) + return + } - nginxClusters.set(key, nginxCluster) + nginxClusters.set(key, nginxCluster) - taskMng.addTask({ - key: TaskManager.getKey(nginxCluster), - task: () => - reconcileNginxCluster( - nginxCluster, - pods, - getControllerCtx( - `${nginxCluster.metadata.namespace}/${nginxCluster.metadata.name}` - ) - ), - }) + taskMng.addTask({ + key: TaskManager.getKey(nginxCluster), + task: () => reconcileNginxCluster(nginxCluster, pods), + }) + }, + signal, + maxRetries: Infinity, + onError: () => { + syncedState.nginxClusters = false + }, + } + ), + listCoreV1PodForAllNamespaces( + { + watch: true, + sendInitialEvents: true, + resourceVersionMatch: 'NotOlderThan', + resourceVersion: state.podResourceVersion, + labelSelector: podLabelKey, }, - signal, - maxRetries: Infinity, - onError: () => { - syncedState.nginxClusters = false - }, - } - ), - listCoreV1PodForAllNamespaces( - { - watch: true, - sendInitialEvents: true, - resourceVersionMatch: 'NotOlderThan', - resourceVersion: state.podResourceVersion, - labelSelector: labelKey, - }, - { - syncedHandler: async () => { - syncedState.pods = true - if (isAllSynced()) { - taskMng.resume() - } - }, - watchHandler: async ({ object: pod, type }, ctx) => { - state.podResourceVersion = ctx.resourceVersion + { + syncedHandler: async () => { + syncedState.pods = true + if (isAllSynced()) { + taskMng.resume() + } + }, + watchHandler: async ({ object: pod, type }, ctx) => { + state.podResourceVersion = ctx.resourceVersion - const nginxClusterName = pod.metadata.labels?.[labelKey] - if (!nginxClusterName) { - throw Error(`${labelKey} label is missing from the pod metadata.`) - } + const nginxClusterName = pod.metadata.labels?.[podLabelKey] + if (!nginxClusterName) { + throw Error( + `${podLabelKey} label is missing from the pod metadata.` + ) + } - const nginxCluster = [...nginxClusters.values()].find( - (n) => n.metadata.name === nginxClusterName - ) + const nginxCluster = [...nginxClusters.values()].find( + (n) => n.metadata.name === nginxClusterName + ) - const podKey = `${pod.metadata.namespace}/${pod.metadata.name}` - switch (type) { - case 'DELETED': + const podKey = `${pod.metadata.namespace}/${pod.metadata.name}` + if (type === 'DELETED') { pods.delete(podKey) - break - case 'ADDED': - if (nginxCluster) { - const myCtx = getControllerCtx( - `${nginxCluster.metadata.namespace}/${nginxCluster.metadata.name}` - ) - myCtx.pendingCreate-- - if (myCtx.pendingCreate < 0) { - myCtx.pendingCreate = 0 - } - } - case 'MODIFIED': + } else { pods.set(podKey, { metadata: pod.metadata, }) - break - } - - if (!nginxCluster) { - if (syncedState.nginxClusters) { - console.warn( - `[WARN] nginxCluster with name ${nginxClusterName} does not exist.` - ) } - return - } - taskMng.addTask({ - key: TaskManager.getKey(nginxCluster), - task: () => - reconcileNginxCluster( - nginxCluster, - pods, - getControllerCtx( - `${nginxCluster.metadata.namespace}/${nginxCluster.metadata.name}` + + if (!nginxCluster) { + if (syncedState.nginxClusters) { + console.warn( + `[WARN] nginxCluster with name ${nginxClusterName} does not exist.` ) - ), - }) - }, - signal, - maxRetries: Infinity, - onError: () => { - syncedState.pods = false - }, + } + return + } + taskMng.addTask({ + key: TaskManager.getKey(nginxCluster), + task: () => reconcileNginxCluster(nginxCluster, pods), + }) + }, + signal, + maxRetries: Infinity, + onError: () => { + syncedState.pods = false + }, + } + ), + ]) + } catch (e) { + if (isKubernetesError(e)) { + if (e.reason === 'Invalid') { + return abortController.abort() + } else { + console.error('KubernetesError:', e) } - ), - ]) + } else { + console.error('An unknown error occurred:', e) + } + } } diff --git a/examples/nginx-cluster-controller-with-kubekit-sync/src/reconcileNginxCluster.ts b/examples/nginx-cluster-controller-with-kubekit-sync/src/reconcileNginxCluster.ts index edd8c25d..b0b580cd 100644 --- a/examples/nginx-cluster-controller-with-kubekit-sync/src/reconcileNginxCluster.ts +++ b/examples/nginx-cluster-controller-with-kubekit-sync/src/reconcileNginxCluster.ts @@ -1,28 +1,28 @@ +import { sleep } from '@kubekit/client' import { createCoreV1NamespacedPod, deleteCoreV1NamespacedPod } from './core-v1' import { patchKubekitComV1NamespacedNginxClusterStatus } from './kubekit-v1' import { type NginxCluster, type Pods, - type ReconcileNginxClusterContext, controllerName, - labelKey, + podLabelKey, } from './type' import hash from 'object-hash' export async function reconcileNginxCluster( nginxCluster: NginxCluster, - pods: Pods, - ctx: ReconcileNginxClusterContext + pods: Pods ) { console.debug({ message: '[Reconcile] Start Reconcile', name: nginxCluster.metadata.name, namespace: nginxCluster.metadata.namespace, + replicas: nginxCluster.spec?.replicas, }) const nginxClusterHash = hash(nginxCluster.spec!.resources!) const managedPods = [...pods.values()].filter( (pod) => - pod.metadata.labels?.[labelKey] === nginxCluster.metadata.name && + pod.metadata.labels?.[podLabelKey] === nginxCluster.metadata.name && !pod.metadata.deletionTimestamp ) const validPods = managedPods.filter( @@ -34,15 +34,11 @@ export async function reconcileNginxCluster( pod.metadata.annotations?.['nginx-cluster-hash'] !== nginxClusterHash ) let createPodNum = - nginxCluster.spec!.replicas! - - validPods.length + - ctx.pendingCreate - - invalidPods.length + nginxCluster.spec!.replicas! - validPods.length + invalidPods.length const tasks: Promise[] = [] const namespace = nginxCluster.metadata.namespace if (createPodNum > 0) { for (let i = 0; i < createPodNum; i++) { - ctx.pendingCreate++ tasks.push( createCoreV1NamespacedPod({ namespace, @@ -68,7 +64,7 @@ export async function reconcileNginxCluster( }, labels: { ...nginxCluster.metadata.labels, - [labelKey]: nginxCluster.metadata.name, + [podLabelKey]: nginxCluster.metadata.name, }, namespace, }, @@ -134,5 +130,14 @@ export async function reconcileNginxCluster( }) } - console.debug('[DEBUG] Successful Reconcile') + if (tasks.length) { + await sleep(500) + } + + console.debug({ + message: '[DEBUG] Successful Reconcile', + name: nginxCluster.metadata.name, + namespace: nginxCluster.metadata.namespace, + replicas: nginxCluster.spec?.replicas, + }) } diff --git a/examples/nginx-cluster-controller-with-kubekit-sync/src/type.ts b/examples/nginx-cluster-controller-with-kubekit-sync/src/type.ts index f2c1428f..da35b136 100644 --- a/examples/nginx-cluster-controller-with-kubekit-sync/src/type.ts +++ b/examples/nginx-cluster-controller-with-kubekit-sync/src/type.ts @@ -8,7 +8,7 @@ export const leaderName = export const holderIdentity = process.env['POD_NAME'] || crypto.randomUUID() export const controllerNamespace = process.env['POD_NAMESPACE'] || 'default' -export const labelKey = 'nginx-cluster-name' +export const podLabelKey = 'nginx-cluster-name' export type Pods = Map< string, { @@ -16,4 +16,3 @@ export type Pods = Map< } > export type NginxCluster = Strict -export type ReconcileNginxClusterContext = { pendingCreate: number } diff --git a/examples/nginx-cluster-controller-with-kubekit-sync/yarn.lock b/examples/nginx-cluster-controller-with-kubekit-sync/yarn.lock index 22719cec..f64e4cbe 100644 --- a/examples/nginx-cluster-controller-with-kubekit-sync/yarn.lock +++ b/examples/nginx-cluster-controller-with-kubekit-sync/yarn.lock @@ -198,10 +198,10 @@ semver "^7.3.5" swagger2openapi "^7.0.4" -"@kubekit/client@0.2.11": - version "0.2.11" - resolved "https://registry.yarnpkg.com/@kubekit/client/-/client-0.2.11.tgz#b0993e99ba81b5168dfc703733e9b6b5e4d48078" - integrity sha512-RYmOpQRH30WPqNu+iqk/U4f3c4+MTs5/wrfezjv/pONjGCQg/hrBidh90BUfx8YZEAB4R5X8AR4Sqb4aSpKolA== +"@kubekit/client@0.2.17": + version "0.2.17" + resolved "https://registry.yarnpkg.com/@kubekit/client/-/client-0.2.17.tgz#fc0ae8e76de506c3b4609bfaf547891c02093e8c" + integrity sha512-7oZ537MdHcCSayKeSKQay1n/wwnZhIEpab5UAtkm+pVIJm+nQKEs2oy4fNf6v3B93S25S9altXx122HrQsVJTA== dependencies: js-yaml "^4.1.0" jsonpath-plus "^9.0.0"