Skip to content

Commit

Permalink
feat(@kubekit/client): support finalizeHandler and initializedHandler
Browse files Browse the repository at this point in the history
  • Loading branch information
kahirokunn committed May 17, 2024
1 parent 299953f commit 1f3d1a4
Show file tree
Hide file tree
Showing 5 changed files with 195 additions and 94 deletions.
2 changes: 1 addition & 1 deletion packages/kubekit-client/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@kubekit/client",
"version": "0.1.1",
"version": "0.1.19",
"main": "lib/index.js",
"types": "lib/index.d.ts",
"author": "kahirokunn",
Expand Down
43 changes: 34 additions & 9 deletions packages/kubekit-client/src/client/concurrent_task_runner.ts
Original file line number Diff line number Diff line change
@@ -1,28 +1,53 @@
import { sleep } from "./sleep";

type PromiseFunction = () => Promise<unknown>;

export class ConcurrentTaskRunner {
private concurrency: number;
private currentRunning = 0;
private taskQueue: PromiseFunction[] = [];

private taskId = 0;
private queueTaskIds: number[] = [];

constructor(concurrency: number) {
this.concurrency = concurrency;
}

add(promiseFunction: PromiseFunction): void {
this.taskQueue.push(promiseFunction);
this.#runNextTask();
add(promiseFunction: PromiseFunction): Promise<void> {
return new Promise((res) => {
this.taskId++
const taskId = this.taskId
this.queueTaskIds.push(taskId)
this.taskQueue.push(() => promiseFunction().finally(() => {
this.queueTaskIds = this.queueTaskIds.filter(id => id !== taskId)
this.currentRunning--;
this.#runNextTask();
res()
}));
this.#runNextTask();
})
}

waitFinishedCurrentQueuedTasks(): Promise<void> {
return new Promise(async (res) => {
const latestTaskId = this.taskId
while (true) {
await sleep(1)
if (this.queueTaskIds.filter(id => id <= latestTaskId).length === 0) {
break
}
}
res()
})
}

#runNextTask(): void {
if (this.currentRunning < this.concurrency && this.taskQueue.length) {
const task = this.taskQueue.shift()!;
const task = this.taskQueue.shift();
if (task) {
this.currentRunning++;
task().finally(() => {
this.currentRunning--;
this.#runNextTask();
});
this.currentRunning++
task()
}
}
}
Expand Down
45 changes: 45 additions & 0 deletions packages/kubekit-client/src/client/debounce.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
import { sleep } from './sleep';
import { type ObjectReference } from './types';

type CacheValue = Record<string, number>;

export class Debounce {
#cache = new Map<string, CacheValue>();
#waitMilliSeconds: number;
#maxWaitMilliSeconds: number;
#latestVersion: string;

constructor(waitMilliSeconds: number, maxWaitMilliSeconds: number) {
this.#waitMilliSeconds = waitMilliSeconds;
this.#maxWaitMilliSeconds = maxWaitMilliSeconds;
this.#latestVersion = '';
}

public static getCacheKey({ namespace, name }: ObjectReference) {
return `${namespace || ''}/${name}`;
}

pushLatestItem(cacheKey: string, version: string) {
this.#latestVersion = version;
this.#cache.set(cacheKey, {
...(this.#cache.get(cacheKey) || {}),
[version]: Number(new Date()),
});
}

async skipOrExec(cacheKey: string, version: string, func: () => unknown | Promise<unknown>) {
const now = Number(new Date());
const getCurrentPushedAt = this.#cache.get(cacheKey)?.[version] || now;
const currentElapsedTime = now - getCurrentPushedAt;
const getOldestPushedAt = Object.values(this.#cache.get(cacheKey) || {})[0] || now;
const oldestElapsedTime = now - getOldestPushedAt;

if (this.#maxWaitMilliSeconds > oldestElapsedTime && this.#waitMilliSeconds > currentElapsedTime) {
await sleep(this.#waitMilliSeconds - currentElapsedTime);
}
if (version === this.#latestVersion) {
this.#cache.delete(cacheKey);
await func();
}
}
}
191 changes: 107 additions & 84 deletions packages/kubekit-client/src/client/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,28 +2,29 @@ import * as k8s from '@kubernetes/client-node';
import type * as https from 'node:https';
import { Agent } from 'undici';
import { ConcurrentTaskRunner } from './concurrent_task_runner';
import { sleep } from './sleep';
import { ReadableStream, TransformStream } from 'node:stream/web';
import { Debounce } from './debounce';
import { type ObjectReference } from './types';
export { sleep } from './sleep'

type IoK8SApiCoreV1ObjectReference = {
apiVersion: string | undefined;
fieldPath?: string | undefined;
kind: string | undefined;
name: string | undefined;
namespace?: string | undefined;
resourceVersion: string;
uid?: string | undefined;
export { ConcurrentTaskRunner, Debounce };

type Id<T> = {
[K in keyof T]: T[K];
} & {};
type RequiredAndDefined<T> = {
[P in keyof T]-?: Exclude<T[P], null | undefined>;
};
type PartialRequired<T, K extends keyof T> = Id<RequiredAndDefined<Pick<T, K>> & Omit<T, K>>;

type K8sObj = {
metadata: IoK8SApiCoreV1ObjectReference;
metadata: ObjectReference;
};

type RemoveUndefined<T> = {
[K in keyof T]: Exclude<T[K], undefined | null>;
};

type CacheValue = Record<string, number>;
function removeNullableProperties<T extends Record<string, unknown | undefined> | undefined>(
object: T
): RemoveUndefined<T> {
Expand Down Expand Up @@ -114,13 +115,29 @@ type K8sListResponse<T> = {
items: T[];
};

export type WatchEventType = 'ADDED' | 'Modified' | 'Deleted' | 'BOOKMARK';
export type Context = {
isInitialized: boolean;
resourceVersion: string;
};
export type WatchEventType = 'ADDED' | 'MODIFIED' | 'DELETED';
export type InnerWatchEventType = WatchEventType | 'BOOKMARK';
type WatchEvent<T> = { type: WatchEventType; object: T };
export type WatchExtraOptions<T extends K8sListResponse<unknown>> = {
type FinalizerEvent<T extends K8sObj> = {
type: 'ADDED' | 'MODIFIED';
object: Id<
Omit<T, 'metadata'> & {
metadata: PartialRequired<T['metadata'], 'finalizers'>;
}
>;
};
type InnerWatchEvent<T> = { type: InnerWatchEventType; object: T };
export type WatchExtraOptions<T extends K8sListResponse<K8sObj>> = {
maxWait?: number;
wait?: number;
concurrency?: number;
watchHandler: (e: WatchEvent<T['items'][number]>) => MaybePromise<unknown>;
watchHandler: (e: WatchEvent<T['items'][number]>, ctx: Context) => MaybePromise<unknown>;
finalizeHandler?: (e: FinalizerEvent<T['items'][number]>, ctx: Context) => MaybePromise<unknown>;
initializedHandler?: (ctx: Context) => MaybePromise<unknown>;
syncPeriod?: number; // default 10h
};
export type Options = RetryOptions & HttpOptions;
Expand Down Expand Up @@ -174,6 +191,7 @@ export async function apiClient<Response>(
};

let { path, method, params = {}, body, contentType } = { ...arguments_ };
const isWatch = 'watch' in params && Boolean(params.watch);

let httpsOptions: https.RequestOptions = {
path,
Expand Down Expand Up @@ -219,6 +237,13 @@ export async function apiClient<Response>(
// TODO: defaultのfeature flagがtrueになったら、watchの場合は自動で以下の2つのparamsを追加してあげたい
// sendInitialEvents: true,
// resourceVersionMatch: "NotOlderThan"

if (isWatch) {
params = {
...params,
allowWatchBookmarks: true,
};
}
baseUrl += (baseUrl.includes('?') ? '&' : '?') + toSearchParameters(params);
}
const url = new URL(baseUrl);
Expand Down Expand Up @@ -263,17 +288,25 @@ export async function apiClient<Response>(
const isJsonResponse = contentType?.includes('application/json') ?? false;

if (isSuccess && isJsonResponse) {
if ('watch' in params && params.watch && response.body && 'watchHandler' in extraOptions) {
if (isWatch && response.body && 'watchHandler' in extraOptions) {
const {
watchHandler,
wait = 200,
maxWait = 1000,
concurrency = 4,
syncPeriod = 3_6000_000,
finalizeHandler,
initializedHandler,
} = {
finalizeHandler: () => {},
initializedHandler: () => {},
...extraOptions,
};

const ctx: Context = {
isInitialized: false,
resourceVersion: '',
};
const taskRunner = new ConcurrentTaskRunner(concurrency);
const debounce = new Debounce(wait, maxWait);

Expand All @@ -285,32 +318,81 @@ export async function apiClient<Response>(
};
delete listArgs.params.watch;
const { syncPeriod: __, ...listExtraOptions } = { ...extraOptions };

const intervalId = setInterval(async () => {
const result = (await apiClient<K8sListResponse<K8sObj>>(
listArgs,
listExtraOptions
)) as K8sListResponse<K8sObj>;

result.items.forEach((k8sObj) => {
result.items.forEach((k8sObj, i) => {
taskRunner.add(async () => {
const objectReference = k8sObj.metadata;

await debounce.skipOrExec(Debounce.getCacheKey(objectReference), objectReference.resourceVersion, () =>
watchHandler(k8sObj as any)
await debounce.skipOrExec(
Debounce.getCacheKey(objectReference),
objectReference.resourceVersion,
async () => {
if (k8sObj.metadata.deletionTimestamp && k8sObj.metadata.finalizers?.length) {
await finalizeHandler(
{
type: 'MODIFIED',
object: k8sObj as any,
},
{
...ctx,
}
);
} else {
await watchHandler(
{
type: 'MODIFIED',
object: k8sObj,
},
{
...ctx,
}
);
}
}
);
});
});
}, syncPeriod);
try {
for await (const k8sObj of (response.body as ReadableStream<Uint8Array>).pipeThrough(
new JsonStream<WatchEvent<K8sObj>>()
new JsonStream<InnerWatchEvent<K8sObj>>()
)) {
const { resourceVersion } = k8sObj.object.metadata;
const cacheKey = Debounce.getCacheKey(k8sObj.object.metadata);
debounce.push(cacheKey, resourceVersion);
taskRunner.add(async () => {
await debounce.skipOrExec(cacheKey, resourceVersion, () => watchHandler(k8sObj as any));
});
if (k8sObj.type === 'BOOKMARK') {
ctx.resourceVersion = k8sObj.object.metadata.resourceVersion;
if (k8sObj.object.metadata.annotations?.['k8s.io/initial-events-end'] === 'true') {
await taskRunner.waitFinishedCurrentQueuedTasks()
ctx.isInitialized = true;
await taskRunner.add(async () => {
await initializedHandler({
...ctx
});
});
}
} else {
const { resourceVersion } = k8sObj.object.metadata;
const cacheKey = Debounce.getCacheKey(k8sObj.object.metadata);
debounce.pushLatestItem(cacheKey, resourceVersion);
taskRunner.add(async () => {
ctx.resourceVersion = k8sObj.object.metadata.resourceVersion;
await debounce.skipOrExec(cacheKey, resourceVersion, () => {
if (k8sObj.object.metadata.deletionTimestamp && k8sObj.object.metadata.finalizers?.length) {
return finalizeHandler(k8sObj as any, {
...ctx,
});
} else {
return watchHandler(k8sObj as any, {
...ctx,
});
}
});
});
}
}
} finally {
clearInterval(intervalId);
Expand Down Expand Up @@ -353,65 +435,6 @@ const toSearchParameters = (parameters: Record<string, string>) => {
return new URLSearchParams(removeNullableProperties(parameters));
};

class Debounce {
#cache = new Map<string, CacheValue>();
#waitMilliSeconds: number;
#maxWaitMilliSeconds: number;

constructor(waitMilliSeconds: number, maxWaitMilliSeconds: number) {
this.#waitMilliSeconds = waitMilliSeconds;
this.#maxWaitMilliSeconds = maxWaitMilliSeconds;
}

public static getCacheKey({ namespace, name }: IoK8SApiCoreV1ObjectReference) {
return `${namespace || ''}/${name}`;
}

push(cacheKey: string, resourceVersion: string) {
this.#cache.set(cacheKey, {
...(this.#cache.get(cacheKey) || {}),
[resourceVersion]: Number(new Date()),
});
}

async skipOrExec(cacheKey: string, resourceVersion: string, func: () => unknown | Promise<unknown>) {
const getLatestResourceVersion = () => {
const resourceVersions = Object.keys(this.#cache.get(cacheKey) || {});

if (resourceVersions.length === 0) {
return BigInt(-1);
}

const sorted = resourceVersions.map(BigInt).sort();

const max = sorted[sorted.length - 1];
const min = sorted[0];

// resourceVersionはint64を超えたら0にwrapされる仕様です.
// minとmaxでNumber.MAX_VALUEよりもresourceVersionが離れていたら、離れすぎなので、wrapされたと判定する事にします
if (max - BigInt(Number.MAX_VALUE) > min) {
return min;
}

return max;
};

const now = Number(new Date());
const getCurrentPushedAt = this.#cache.get(cacheKey)?.[resourceVersion] || now;
const currentElapsedTime = now - getCurrentPushedAt;
const getOldestPushedAt = Object.values(this.#cache.get(cacheKey) || {})[0] || now;
const oldestElapsedTime = now - getOldestPushedAt;

if (this.#maxWaitMilliSeconds > oldestElapsedTime && this.#waitMilliSeconds > currentElapsedTime) {
await sleep(this.#waitMilliSeconds - currentElapsedTime);
}
if (BigInt(resourceVersion) === getLatestResourceVersion()) {
this.#cache.delete(cacheKey);
await func();
}
}
}

class JsonStream<T> extends TransformStream<Uint8Array, T> {
constructor() {
let buffer = '';
Expand Down
Loading

0 comments on commit 1f3d1a4

Please sign in to comment.