diff --git a/ready.nimble b/ready.nimble index a1cbc02..45d1ffb 100644 --- a/ready.nimble +++ b/ready.nimble @@ -6,4 +6,3 @@ license = "MIT" srcDir = "src" requires "nim >= 1.6.10" -requires "waterpark >= 0.1.4" diff --git a/src/ready/pools.nim b/src/ready/pools.nim index 8a8ba2c..74eb2a1 100644 --- a/src/ready/pools.nim +++ b/src/ready/pools.nim @@ -1,11 +1,13 @@ -import connections, std/locks, std/sequtils, std/tables, std/times, waterpark +import connections, std/locks, std/random, std/tables, std/times type RedisPoolObj = object address: string port: Port - pool: Pool[RedisConn] - lastReturnedLock: Lock + conns: seq[RedisConn] + lock: Lock + cond: Cond + r: Rand lastReturned: Table[RedisConn, float] onConnect: proc(conn: RedisConn) {.gcsafe.} onBorrow: proc(conn: RedisConn, lastReturned: float) {.gcsafe.} @@ -15,11 +17,9 @@ type proc close*(pool: RedisPool) = ## Closes the database connections in the pool then deallocates the pool. ## All connections should be returned to the pool before it is closed. - let entries = toSeq(pool.pool.items) - for entry in entries: - entry.close() - pool.pool.delete(entry) - pool.pool.close() + withLock pool.lock: + for conn in pool.conns: + conn.close() `=destroy`(pool[]) deallocShared(pool) @@ -29,9 +29,11 @@ proc openNewConnection(pool: RedisPool): RedisConn = pool.onConnect(result) proc recycle*(pool: RedisPool, conn: RedisConn) {.raises: [], gcsafe.} = - withLock pool.lastReturnedLock: + withLock pool.lock: + pool.conns.add(conn) + pool.r.shuffle(pool.conns) pool.lastReturned[conn] = epochTime() - pool.pool.recycle(conn) + signal(pool.cond) proc newRedisPool*( size: int, @@ -46,8 +48,9 @@ proc newRedisPool*( result = cast[RedisPool](allocShared0(sizeof(RedisPoolObj))) result.port = port result.address = address - initLock(result.lastReturnedLock) - result.pool = newPool[RedisConn]() + initLock(result.lock) + initCond(result.cond) + result.r = initRand(2023) result.onConnect = onConnect result.onBorrow = onBorrow try: @@ -61,22 +64,26 @@ proc newRedisPool*( raise getCurrentException() proc borrow*(pool: RedisPool): RedisConn {.gcsafe.} = - result = pool.pool.borrow() + acquire(pool.lock) + while pool.conns.len == 0: + wait(pool.cond, pool.lock) + result = pool.conns.pop() + release(pool.lock) + if pool.onBorrow != nil: try: var lastReturned: float - withLock pool.lastReturnedLock: + withLock pool.lock: lastReturned = pool.lastReturned[result] pool.onBorrow(result, lastReturned) except: # Close this connection and open a new one + withLock pool.lock: + pool.lastReturned.del(result) result.close() result = pool.openNewConnection() - let lastReturned = epochTime() - withLock pool.lastReturnedLock: - pool.lastReturned[result] = lastReturned if pool.onBorrow != nil: - pool.onBorrow(result, lastReturned) + pool.onBorrow(result, epochTime()) template withConnnection*(pool: RedisPool, conn, body) = block: