Skip to content

Commit

Permalink
refactor(nginx-cluster-controller): update logic and dependencies
Browse files Browse the repository at this point in the history
  • Loading branch information
kahirokunn committed Jun 18, 2024
1 parent 3073c12 commit 607b6d5
Show file tree
Hide file tree
Showing 6 changed files with 136 additions and 157 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
"author": "",
"license": "MIT",
"dependencies": {
"@kubekit/client": "0.2.11",
"@kubekit/client": "0.2.17",
"object-hash": "^3.0.0"
},
"devDependencies": {
Expand Down
16 changes: 14 additions & 2 deletions examples/nginx-cluster-controller-with-kubekit-sync/src/main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ async function main() {
let stop = false
while (!stop) {
const controller = new AbortController()
controller.signal.addEventListener('abort', () => {
stop = true
})
;['SIGTERM', 'SIGINT'].forEach((signalName) => {
process.once(signalName, () => {
console.log(`${signalName} received.`)
Expand All @@ -32,9 +35,9 @@ async function main() {
await becameReaderPromise
console.debug('[main] Became leader')

await nginxClusterController(state, controller.signal)
await nginxClusterController(state, controller)
} catch (e: any) {
abortErrorHandler(e)
stop = isAbortError(e)
}
}
}
Expand All @@ -46,3 +49,12 @@ function abortErrorHandler(e: any) {
}
throw e
}

function isAbortError(e: unknown): e is Error {
return (
e !== null &&
typeof e === 'object' &&
'name' in e &&
e.name === 'AbortError'
)
}
Original file line number Diff line number Diff line change
@@ -1,16 +1,7 @@
import { TaskManager } from '@kubekit/client'
import {
listCoreV1PodForAllNamespaces,
} from './core-v1'
import {
listKubekitComV1NginxClusterForAllNamespaces,
} from './kubekit-v1'
import {
type NginxCluster,
type Pods,
type ReconcileNginxClusterContext,
labelKey,
} from './type'
import { TaskManager, isKubernetesError } from '@kubekit/client'
import { listCoreV1PodForAllNamespaces } from './core-v1'
import { listKubekitComV1NginxClusterForAllNamespaces } from './kubekit-v1'
import { type NginxCluster, type Pods, podLabelKey } from './type'
import { reconcileNginxCluster } from './reconcileNginxCluster'

type NginxClusters = Map<string, NginxCluster>
Expand All @@ -31,22 +22,9 @@ export function getInitialState(): ControllerState {

export async function nginxClusterController(
state: ControllerState,
signal: AbortSignal
abortController: AbortController
) {
const controllerCtx: Record<string, ReconcileNginxClusterContext> = {}
const { nginxClusters, pods } = state
const getControllerCtx = (key: string) => {
if (!controllerCtx[key]) {
controllerCtx[key] = {
pendingCreate: 0,
}
}
return controllerCtx[key]
}
const deleteControllerCtx = (key: string) => {
delete controllerCtx[key]
}

const taskMng = new TaskManager()
taskMng.pause()

Expand All @@ -55,133 +33,118 @@ export async function nginxClusterController(
pods: false,
}

const signal = abortController.signal
signal.addEventListener('abort', () => {
taskMng.pause()
})

const isAllSynced = () => syncedState.nginxClusters && syncedState.pods

await Promise.all([
listKubekitComV1NginxClusterForAllNamespaces(
{
watch: true,
sendInitialEvents: true,
resourceVersionMatch: 'NotOlderThan',
resourceVersion: state.nginxClusterResourceVersion,
},
{
syncedHandler: async () => {
syncedState.nginxClusters = true
if (isAllSynced()) {
taskMng.resume()
}
try {
await Promise.all([
listKubekitComV1NginxClusterForAllNamespaces(
{
watch: true,
sendInitialEvents: true,
resourceVersionMatch: 'NotOlderThan',
resourceVersion: state.nginxClusterResourceVersion,
},
watchHandler: async ({ object: nginxCluster, type }) => {
const key = `${nginxCluster.metadata.namespace}/${nginxCluster.metadata.name}`
if (type === 'DELETED') {
nginxClusters.delete(key)
deleteControllerCtx(
`${nginxCluster.metadata.namespace}/${nginxCluster.metadata.name}`
)
return
}
{
syncedHandler: async () => {
syncedState.nginxClusters = true
if (isAllSynced()) {
taskMng.resume()
}
},
watchHandler: async ({ object: nginxCluster, type }) => {
const key = `${nginxCluster.metadata.namespace}/${nginxCluster.metadata.name}`
if (type === 'DELETED') {
nginxClusters.delete(key)
return
}

nginxClusters.set(key, nginxCluster)
nginxClusters.set(key, nginxCluster)

taskMng.addTask({
key: TaskManager.getKey(nginxCluster),
task: () =>
reconcileNginxCluster(
nginxCluster,
pods,
getControllerCtx(
`${nginxCluster.metadata.namespace}/${nginxCluster.metadata.name}`
)
),
})
taskMng.addTask({
key: TaskManager.getKey(nginxCluster),
task: () => reconcileNginxCluster(nginxCluster, pods),
})
},
signal,
maxRetries: Infinity,
onError: () => {
syncedState.nginxClusters = false
},
}
),
listCoreV1PodForAllNamespaces(
{
watch: true,
sendInitialEvents: true,
resourceVersionMatch: 'NotOlderThan',
resourceVersion: state.podResourceVersion,
labelSelector: podLabelKey,
},
signal,
maxRetries: Infinity,
onError: () => {
syncedState.nginxClusters = false
},
}
),
listCoreV1PodForAllNamespaces(
{
watch: true,
sendInitialEvents: true,
resourceVersionMatch: 'NotOlderThan',
resourceVersion: state.podResourceVersion,
labelSelector: labelKey,
},
{
syncedHandler: async () => {
syncedState.pods = true
if (isAllSynced()) {
taskMng.resume()
}
},
watchHandler: async ({ object: pod, type }, ctx) => {
state.podResourceVersion = ctx.resourceVersion
{
syncedHandler: async () => {
syncedState.pods = true
if (isAllSynced()) {
taskMng.resume()
}
},
watchHandler: async ({ object: pod, type }, ctx) => {
state.podResourceVersion = ctx.resourceVersion

const nginxClusterName = pod.metadata.labels?.[labelKey]
if (!nginxClusterName) {
throw Error(`${labelKey} label is missing from the pod metadata.`)
}
const nginxClusterName = pod.metadata.labels?.[podLabelKey]
if (!nginxClusterName) {
throw Error(
`${podLabelKey} label is missing from the pod metadata.`
)
}

const nginxCluster = [...nginxClusters.values()].find(
(n) => n.metadata.name === nginxClusterName
)
const nginxCluster = [...nginxClusters.values()].find(
(n) => n.metadata.name === nginxClusterName
)

const podKey = `${pod.metadata.namespace}/${pod.metadata.name}`
switch (type) {
case 'DELETED':
const podKey = `${pod.metadata.namespace}/${pod.metadata.name}`
if (type === 'DELETED') {
pods.delete(podKey)
break
case 'ADDED':
if (nginxCluster) {
const myCtx = getControllerCtx(
`${nginxCluster.metadata.namespace}/${nginxCluster.metadata.name}`
)
myCtx.pendingCreate--
if (myCtx.pendingCreate < 0) {
myCtx.pendingCreate = 0
}
}
case 'MODIFIED':
} else {
pods.set(podKey, {
metadata: pod.metadata,
})
break
}

if (!nginxCluster) {
if (syncedState.nginxClusters) {
console.warn(
`[WARN] nginxCluster with name ${nginxClusterName} does not exist.`
)
}
return
}
taskMng.addTask({
key: TaskManager.getKey(nginxCluster),
task: () =>
reconcileNginxCluster(
nginxCluster,
pods,
getControllerCtx(
`${nginxCluster.metadata.namespace}/${nginxCluster.metadata.name}`

if (!nginxCluster) {
if (syncedState.nginxClusters) {
console.warn(
`[WARN] nginxCluster with name ${nginxClusterName} does not exist.`
)
),
})
},
signal,
maxRetries: Infinity,
onError: () => {
syncedState.pods = false
},
}
return
}
taskMng.addTask({
key: TaskManager.getKey(nginxCluster),
task: () => reconcileNginxCluster(nginxCluster, pods),
})
},
signal,
maxRetries: Infinity,
onError: () => {
syncedState.pods = false
},
}
),
])
} catch (e) {
if (isKubernetesError(e)) {
if (e.reason === 'Invalid') {
return abortController.abort()
} else {
console.error('KubernetesError:', e)
}
),
])
} else {
console.error('An unknown error occurred:', e)
}
}
}
Original file line number Diff line number Diff line change
@@ -1,28 +1,28 @@
import { sleep } from '@kubekit/client'
import { createCoreV1NamespacedPod, deleteCoreV1NamespacedPod } from './core-v1'
import { patchKubekitComV1NamespacedNginxClusterStatus } from './kubekit-v1'
import {
type NginxCluster,
type Pods,
type ReconcileNginxClusterContext,
controllerName,
labelKey,
podLabelKey,
} from './type'
import hash from 'object-hash'

export async function reconcileNginxCluster(
nginxCluster: NginxCluster,
pods: Pods,
ctx: ReconcileNginxClusterContext
pods: Pods
) {
console.debug({
message: '[Reconcile] Start Reconcile',
name: nginxCluster.metadata.name,
namespace: nginxCluster.metadata.namespace,
replicas: nginxCluster.spec?.replicas,
})
const nginxClusterHash = hash(nginxCluster.spec!.resources!)
const managedPods = [...pods.values()].filter(
(pod) =>
pod.metadata.labels?.[labelKey] === nginxCluster.metadata.name &&
pod.metadata.labels?.[podLabelKey] === nginxCluster.metadata.name &&
!pod.metadata.deletionTimestamp
)
const validPods = managedPods.filter(
Expand All @@ -34,15 +34,11 @@ export async function reconcileNginxCluster(
pod.metadata.annotations?.['nginx-cluster-hash'] !== nginxClusterHash
)
let createPodNum =
nginxCluster.spec!.replicas! -
validPods.length +
ctx.pendingCreate -
invalidPods.length
nginxCluster.spec!.replicas! - validPods.length + invalidPods.length
const tasks: Promise<unknown>[] = []
const namespace = nginxCluster.metadata.namespace
if (createPodNum > 0) {
for (let i = 0; i < createPodNum; i++) {
ctx.pendingCreate++
tasks.push(
createCoreV1NamespacedPod({
namespace,
Expand All @@ -68,7 +64,7 @@ export async function reconcileNginxCluster(
},
labels: {
...nginxCluster.metadata.labels,
[labelKey]: nginxCluster.metadata.name,
[podLabelKey]: nginxCluster.metadata.name,
},
namespace,
},
Expand Down Expand Up @@ -134,5 +130,14 @@ export async function reconcileNginxCluster(
})
}

console.debug('[DEBUG] Successful Reconcile')
if (tasks.length) {
await sleep(500)
}

console.debug({
message: '[DEBUG] Successful Reconcile',
name: nginxCluster.metadata.name,
namespace: nginxCluster.metadata.namespace,
replicas: nginxCluster.spec?.replicas,
})
}
Loading

0 comments on commit 607b6d5

Please sign in to comment.