Skip to content

Commit

Permalink
Merge pull request #3 from guzba/ryan
Browse files Browse the repository at this point in the history
do not depend on waterpark
  • Loading branch information
guzba authored Aug 29, 2023
2 parents e94d69d + e7ec616 commit 7ec39bb
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 19 deletions.
1 change: 0 additions & 1 deletion ready.nimble
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,3 @@ license = "MIT"
srcDir = "src"

requires "nim >= 1.6.10"
requires "waterpark >= 0.1.4"
43 changes: 25 additions & 18 deletions src/ready/pools.nim
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
import connections, std/locks, std/sequtils, std/tables, std/times, waterpark
import connections, std/locks, std/random, std/tables, std/times

type
RedisPoolObj = object
address: string
port: Port
pool: Pool[RedisConn]
lastReturnedLock: Lock
conns: seq[RedisConn]
lock: Lock
cond: Cond
r: Rand
lastReturned: Table[RedisConn, float]
onConnect: proc(conn: RedisConn) {.gcsafe.}
onBorrow: proc(conn: RedisConn, lastReturned: float) {.gcsafe.}
Expand All @@ -15,11 +17,9 @@ type
proc close*(pool: RedisPool) =
## Closes the database connections in the pool then deallocates the pool.
## All connections should be returned to the pool before it is closed.
let entries = toSeq(pool.pool.items)
for entry in entries:
entry.close()
pool.pool.delete(entry)
pool.pool.close()
withLock pool.lock:
for conn in pool.conns:
conn.close()
`=destroy`(pool[])
deallocShared(pool)

Expand All @@ -29,9 +29,11 @@ proc openNewConnection(pool: RedisPool): RedisConn =
pool.onConnect(result)

proc recycle*(pool: RedisPool, conn: RedisConn) {.raises: [], gcsafe.} =
withLock pool.lastReturnedLock:
withLock pool.lock:
pool.conns.add(conn)
pool.r.shuffle(pool.conns)
pool.lastReturned[conn] = epochTime()
pool.pool.recycle(conn)
signal(pool.cond)

proc newRedisPool*(
size: int,
Expand All @@ -46,8 +48,9 @@ proc newRedisPool*(
result = cast[RedisPool](allocShared0(sizeof(RedisPoolObj)))
result.port = port
result.address = address
initLock(result.lastReturnedLock)
result.pool = newPool[RedisConn]()
initLock(result.lock)
initCond(result.cond)
result.r = initRand(2023)
result.onConnect = onConnect
result.onBorrow = onBorrow
try:
Expand All @@ -61,22 +64,26 @@ proc newRedisPool*(
raise getCurrentException()

proc borrow*(pool: RedisPool): RedisConn {.gcsafe.} =
result = pool.pool.borrow()
acquire(pool.lock)
while pool.conns.len == 0:
wait(pool.cond, pool.lock)
result = pool.conns.pop()
release(pool.lock)

if pool.onBorrow != nil:
try:
var lastReturned: float
withLock pool.lastReturnedLock:
withLock pool.lock:
lastReturned = pool.lastReturned[result]
pool.onBorrow(result, lastReturned)
except:
# Close this connection and open a new one
withLock pool.lock:
pool.lastReturned.del(result)
result.close()
result = pool.openNewConnection()
let lastReturned = epochTime()
withLock pool.lastReturnedLock:
pool.lastReturned[result] = lastReturned
if pool.onBorrow != nil:
pool.onBorrow(result, lastReturned)
pool.onBorrow(result, epochTime())

template withConnnection*(pool: RedisPool, conn, body) =
block:
Expand Down

0 comments on commit 7ec39bb

Please sign in to comment.