Skip to content

Commit

Permalink
feat: big refactoring of @_Memo and @_AsyncMemo
Browse files Browse the repository at this point in the history
cacheErrors/cacheRejections functionality is dropped.
Previously it defaulted to true, so errors/rejections were cached and
ensured "max 1 time execution".
From now on errors are NOT cached, so, error-throwing functions will
be executed multiple times.
It's recommended to make sure they're idempotent!

This is "for simplicity's sake".

.dropCache is replaced by `_getMemo().clear()`

@_AsyncMemo no longer defaults to MapCache; a cacheFactory
must now be passed explicitly.
@_AsyncMemo now requires a cache that implements the AsyncMemoCache
interface, which supports only async execution (sync is no longer supported).

Important!
@_Memo is proven (by tests) to be NOT prone to "async swarm" issue.
(no change here)
@_AsyncMemo WAS prone to the "async swarm" issue, therefore it was
refactored with a significantly more complicated EXPERIMENTAL
implementation that returns "in-flight promises" to avoid the "async swarm".

AsyncMemoCache is now required to return the MISS symbol on cache misses.
This allows it to support both `undefined` and `null` as cached values.
Previously `undefined` was treated as a MISS, which was ambiguous.
  • Loading branch information
kirillgroshkov committed Mar 30, 2024
1 parent 38d6ffe commit 1f795e1
Show file tree
Hide file tree
Showing 9 changed files with 309 additions and 221 deletions.
46 changes: 38 additions & 8 deletions src/decorators/asyncMemo.decorator.test.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
import { _AsyncMemo } from './asyncMemo.decorator'
import { MapMemoCache } from './memo.util'
import { _range } from '../array/range'
import { pDelay } from '../promise/pDelay'
import { _AsyncMemo, _getAsyncMemo } from './asyncMemo.decorator'
import { MapAsyncMemoCache } from './memo.util'

class A {
func(n: number): void {
console.log(`func ${n}`)
}

@_AsyncMemo()
@_AsyncMemo({ cacheFactory: () => new MapAsyncMemoCache() })
async a(a1: number, a2: number): Promise<number> {
const n = a1 * a2
this.func(n)
Expand Down Expand Up @@ -34,7 +36,7 @@ test('memo a', async () => {
expect(a.func).toMatchSnapshot()

// cleanup for the next tests
await (a.a as any).dropCache()
await _getAsyncMemo(a.a).clear()
})

test('MEMO_DROP_CACHE', async () => {
Expand All @@ -45,21 +47,22 @@ test('MEMO_DROP_CACHE', async () => {
await a.a(2, 3)

// drop cache
await (a.a as any).dropCache()
await _getAsyncMemo(a.a).clear()
expect(_getAsyncMemo(a.a).getInstanceCache().size).toBe(0)

// second call
await a.a(2, 3)

expect(a.func).toBeCalledTimes(2)
expect(a.func).toHaveBeenCalledTimes(2)
})

class B {
cacheMisses = 0

@_AsyncMemo() // testing 2 layers of AsyncMemo!
@_AsyncMemo({ cacheFactory: () => new MapAsyncMemoCache() }) // testing 2 layers of AsyncMemo!
@_AsyncMemo({
// testing to provide specific cacheFactory, should be no difference in this test
cacheFactory: () => new MapMemoCache(),
cacheFactory: () => new MapAsyncMemoCache(),
})
async a(a1 = 'def'): Promise<number> {
console.log(`a called with a1=${a1}`)
Expand All @@ -85,3 +88,30 @@ test('should work with default arg values', async () => {

expect(b.cacheMisses).toBe(3)
})

class C {
calls = 0

@_AsyncMemo({
cacheFactory: () => new MapAsyncMemoCache(10),
})
async fn(): Promise<void> {
this.calls++
await pDelay(100)
}
}

test('swarm test of async function', async () => {
// Swarm is when one "slow" async function is called multiple times in parallel
// Expectation is that it should return the same Promise and `calls` should equal to 1
// Because it **synchronously** returns a Promise (despite the fact that it's async)
const c = new C()

await Promise.all(_range(10).map(() => c.fn()))

expect(c.calls).toBe(1)

await pDelay(150)
await Promise.all(_range(3).map(() => c.fn()))
expect(c.calls).toBe(1)
})
186 changes: 106 additions & 80 deletions src/decorators/asyncMemo.decorator.ts
Original file line number Diff line number Diff line change
@@ -1,51 +1,49 @@
import { _assert } from '../error/assert'
import type { CommonLogger } from '../log/commonLogger'
import type { AnyObject } from '../types'
import { _objectAssign, AnyAsyncFunction, AnyFunction, AnyObject, MISS } from '../types'
import { _getTargetMethodSignature } from './decorator.util'
import type { AsyncMemoCache } from './memo.util'
import { jsonMemoSerializer, MapMemoCache } from './memo.util'
import { AsyncMemoCache, jsonMemoSerializer } from './memo.util'

export interface AsyncMemoOptions {
/**
* Provide a custom implementation of MemoCache.
* Function that creates an instance of `MemoCache`.
* e.g LRUMemoCache from `@naturalcycles/nodejs-lib`.
* Provide a custom implementation of AsyncMemoCache.
* Function that creates an instance of `AsyncMemoCache`.
*/
cacheFactory?: () => AsyncMemoCache
cacheFactory: () => AsyncMemoCache

/**
* Provide a custom implementation of CacheKey function.
*/
cacheKeyFn?: (args: any[]) => any

/**
* Default true.
*
* Set to `false` to skip caching rejected promises (errors).
*
* True will ensure "max 1 execution", but will "remember" rejection.
* False will allow >1 execution in case of errors.
* Default to `console`
*/
cacheRejections?: boolean
logger?: CommonLogger
}

export interface AsyncMemoInstance {
/**
* Default to `console`
* Clears the cache.
*/
logger?: CommonLogger
clear: () => Promise<void>

getInstanceCache: () => Map<AnyObject, AsyncMemoCache>

getCache: (instance: AnyAsyncFunction) => AsyncMemoCache | undefined
}

/**
* Like @_Memo, but allowing async MemoCache implementation.
*
* Important: it awaits the method to return the result before caching it.
*
* todo: test for "swarm requests", it should return "the same promise" and not cause a swarm origin hit
* Implementation is more complex than @_Memo, because it needs to handle "in-flight" Promises
* while waiting for cache to resolve, to prevent "async swarm" issue.
*
* Method CANNOT return `undefined`, as undefined will always be treated as cache MISS and retried.
* Return `null` instead (it'll be cached).
* @experimental consider normal @_Memo for most of the cases, it's stable and predictable
*/
// eslint-disable-next-line @typescript-eslint/naming-convention
export const _AsyncMemo =
(opt: AsyncMemoOptions = {}): MethodDecorator =>
(opt: AsyncMemoOptions): MethodDecorator =>
(target, key, descriptor) => {
if (typeof descriptor.value !== 'function') {
throw new TypeError('Memoization can be applied only to methods')
Expand All @@ -54,89 +52,117 @@ export const _AsyncMemo =
const originalFn = descriptor.value

// Map from "instance" of the Class where @_AsyncMemo is applied to AsyncMemoCache instance.
const cache = new Map<AnyObject, AsyncMemoCache>()
const instanceCache = new Map<AnyObject, AsyncMemoCache>()

// Cache from Instance to Map<key, Promise>
// This cache is temporary, with only one purpose - to prevent "async swarm"
// It only holds values that are "in-flight", until Promise is resolved
// After it's resolved - it's evicted from the cache and moved to the "proper" `instanceCache`
const instancePromiseCache = new Map<AnyObject, Map<any, Promise<any>>>()

const {
logger = console,
cacheFactory = () => new MapMemoCache(),
cacheKeyFn = jsonMemoSerializer,
cacheRejections = true,
} = opt
const { logger = console, cacheFactory, cacheKeyFn = jsonMemoSerializer } = opt

const keyStr = String(key)
const methodSignature = _getTargetMethodSignature(target, keyStr)

descriptor.value = async function (this: typeof target, ...args: any[]): Promise<any> {
// eslint-disable-next-line @typescript-eslint/promise-function-async
descriptor.value = function (this: typeof target, ...args: any[]): Promise<any> {
const ctx = this

const cacheKey = cacheKeyFn(args)

if (!cache.has(ctx)) {
cache.set(ctx, cacheFactory())
let cache = instanceCache.get(ctx)
let promiseCache = instancePromiseCache.get(ctx)

if (!cache) {
cache = cacheFactory()
instanceCache.set(ctx, cache)
// here, no need to check the cache. It's definitely a miss, because the cacheLayers is just created
// UPD: no! AsyncMemo supports "persistent caches" (e.g Database-backed cache)
}

let value: any

try {
value = await cache.get(ctx)!.get(cacheKey)
} catch (err) {
// log error, but don't throw, treat it as a "miss"
logger.error(err)
if (!promiseCache) {
promiseCache = new Map()
instancePromiseCache.set(ctx, promiseCache)
}

if (value !== undefined) {
// hit!
if (value instanceof Error) {
throw value
}

return value
let promise = promiseCache.get(cacheKey)
// If there's already "in-flight" cache request - return that, to avoid "async swarm"
if (promise) {
// console.log('return promise', promiseCache.size)
return promise
}

// Here we know it's a MISS, let's execute the real method
try {
value = await originalFn.apply(ctx, args)

// Save the value in the Cache, without awaiting it
// This is to support both sync and async functions
void (async () => {
try {
await cache.get(ctx)!.set(cacheKey, value)
} catch (err) {
// log and ignore the error
logger.error(err)
promise = cache.get(cacheKey).then(
async value => {
if (value !== MISS) {
// console.log('hit', promiseCache.size)
promiseCache.delete(cacheKey)
return value
}
})()

return value
} catch (err) {
if (cacheRejections) {
// We put it to cache as raw Error, not Promise.reject(err)
// This is to support both sync and async functions
// Miss
// console.log('miss', promiseCache.size)
return await onMiss()
},
async err => {
// Log the cache error and proceed "as cache Miss"
logger.error(err)
return await onMiss()
},
)

promiseCache.set(cacheKey, promise)
return promise

//

async function onMiss(): Promise<any> {
try {
const value = await originalFn.apply(ctx, args)

// Save the value in the Cache, in parallel,
// not to slow down the main function execution
// and not to fail on possible cache issues
void (async () => {
try {
await cache.get(ctx)!.set(cacheKey, err)
await cache!.set(cacheKey, value)
} catch (err) {
// log and ignore the error
logger.error(err)
logger.error(err) // log and ignore the error
} finally {
// Clear the "in-flight" promise cache entry, as we now have a "permanent" cache entry
promiseCache!.delete(cacheKey)
// console.log('cache set and cleared', promiseCache!.size)
}
})()
}

throw err
return value
} catch (err) {
promiseCache!.delete(cacheKey)
throw err
}
}
} as any
;(descriptor.value as any).dropCache = async () => {
logger.log(`${methodSignature} @_AsyncMemo.dropCache()`)
try {
await Promise.all([...cache.values()].map(c => c.clear()))
cache.clear()
} catch (err) {
logger.error(err)
}
}

_objectAssign(descriptor.value as AsyncMemoInstance, {
clear: async () => {
logger.log(`${methodSignature} @_AsyncMemo.clear()`)
await Promise.all([...instanceCache.values()].map(c => c.clear()))
instanceCache.clear()
},
getInstanceCache: () => instanceCache,
getCache: instance => instanceCache.get(instance),
})

return descriptor
}

/**
Call it on a method that is decorated with `@_AsyncMemo` to get access to additional functions,
e.g `clear` to clear the cache, or get its underlying data.
*/
export function _getAsyncMemo(method: AnyFunction): AsyncMemoInstance {
_assert(
typeof (method as any)?.getInstanceCache === 'function',
'method is not an AsyncMemo instance',
)
return method as any
}
34 changes: 30 additions & 4 deletions src/decorators/memo.decorator.test.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import { _Memo } from './memo.decorator'
import { _range } from '../array/range'
import { pDelay } from '../promise/pDelay'
import { _getMemo, _Memo } from './memo.decorator'

class A {
func(n: number): void {
Expand Down Expand Up @@ -32,8 +34,12 @@ test('memo a', () => {
// to be called once per set of arguments (2)
expect(a.func).toMatchSnapshot()

expect(_getMemo(a.a).getInstanceCache().size).toBe(1)

// cleanup for the next tests
;(a.a as any).dropCache()
_getMemo(a.a).clear()

expect(_getMemo(a.a).getInstanceCache().size).toBe(0)
})

test('MEMO_DROP_CACHE', () => {
Expand All @@ -43,8 +49,7 @@ test('MEMO_DROP_CACHE', () => {
// first call
a.a(2, 3)

// drop cache
;(a.a as any).dropCache()
_getMemo(a.a).clear()

// second call
a.a(2, 3)
Expand Down Expand Up @@ -84,3 +89,24 @@ test('should work with default arg values', () => {

expect(b.cacheMisses).toBe(3)
})

class C {
calls = 0

@_Memo()
async fn(): Promise<void> {
this.calls++
await pDelay(100)
}
}

test('swarm test of async function', async () => {
// Swarm is when one "slow" async function is called multiple times in parallel
// Expectation is that it should return the same Promise and `calls` should equal to 1
// Because it **synchronously** returns a Promise (despite the fact that it's async)
const c = new C()

await Promise.all(_range(10).map(() => c.fn()))

expect(c.calls).toBe(1)
})
Loading

0 comments on commit 1f795e1

Please sign in to comment.