Skip to content

Commit

Permalink
start of overhaul
Browse files Browse the repository at this point in the history
  • Loading branch information
trisongz committed Oct 16, 2023
1 parent fa19644 commit 9c09f7d
Show file tree
Hide file tree
Showing 135 changed files with 27,577 additions and 6,058 deletions.
10 changes: 10 additions & 0 deletions CHANGELOGS.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,15 @@
# aiokeydb changelogs

## 0.2.0rc0 (2023-10-16)

**Breaking Changes Inbound**

- Rework entire module, deprecating previous implementation into `v1`, maintaining legacy `v2` namespace, while transforming the previous `v2` into the primary module. The final release version of `0.2.0` will not contain any modules, but rather reference the `v2` namespace as the primary module.

- Rework `pydantic` dependencies to support both `v1/v2`.



## 0.1.31 (2023-03-29)
- Add new `add_fallback_function` method for Worker, which allows for a fallback function to be called when the worker fails to execute the task. (v2)

Expand Down
87 changes: 42 additions & 45 deletions aiokeydb/__init__.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,20 @@
from __future__ import absolute_import

import sys

from aiokeydb.core import KeyDB, StrictKeyDB
from aiokeydb.cluster import KeyDBCluster
from aiokeydb.core import KeyDB, AsyncKeyDB
from aiokeydb.cluster import KeyDBCluster, AsyncKeyDBCluster
from aiokeydb.connection import (
BlockingConnectionPool,
Connection,
ConnectionPool,
SSLConnection,
UnixDomainSocketConnection,
AsyncBlockingConnectionPool,
AsyncConnection,
AsyncConnectionPool,
AsyncSSLConnection,
AsyncUnixDomainSocketConnection,
)
from aiokeydb.credentials import CredentialProvider, UsernamePasswordCredentialProvider
from redis.credentials import CredentialProvider, UsernamePasswordCredentialProvider
from aiokeydb.exceptions import (
AuthenticationError,
AuthenticationWrongNumberOfArgsError,
Expand All @@ -22,66 +25,54 @@
InvalidResponse,
PubSubError,
ReadOnlyError,
KeyDBError,
ResponseError,
TimeoutError,
WatchError,
JobError,
)
from aiokeydb.sentinel import (
Sentinel,
SentinelConnectionPool,
SentinelManagedConnection,
SentinelManagedSSLConnection,
)
from aiokeydb.utils import from_url

# Handle Async

from aiokeydb.asyncio import (
AsyncKeyDB,
StrictAsyncKeyDB,
AsyncBlockingConnectionPool,
AsyncConnection,
AsyncConnectionPool,
AsyncSSLConnection,
AsyncUnixDomainSocketConnection,
AsyncSentinel,
AsyncSentinelConnectionPool,
AsyncSentinelManagedConnection,
AsyncSentinelManagedSSLConnection,
async_from_url
)

# Handle Client

from aiokeydb.client.serializers import SerializerType
from aiokeydb.client.config import KeyDBSettings
from aiokeydb.client.schemas.session import KeyDBSession
from aiokeydb.client.meta import KeyDBClient
from aiokeydb.utils.base import from_url
from aiokeydb.utils.lazy import get_keydb_settings

if sys.version_info >= (3, 8):
from importlib import metadata
else:
import importlib_metadata as metadata
# Handle Client
from aiokeydb.serializers import SerializerType
from aiokeydb.configs import KeyDBSettings, KeyDBWorkerSettings
from aiokeydb.types.session import KeyDBSession
from aiokeydb.client import KeyDBClient

# Handle Queues
from aiokeydb.types.jobs import Job, CronJob
from aiokeydb.types.task_queue import TaskQueue
from aiokeydb.types.worker import Worker

# Add KeyDB Index Types
from aiokeydb.types.indexes import (
KDBIndex,
KDBDict,
AsyncKDBDict,
)

from aiokeydb.version import VERSION as __version__

def int_or_str(value):
try:
return int(value)
except ValueError:
return value

from aiokeydb.version import VERSION as __version__


# try:
# __version__ = metadata.version("aiokeydb")
# except metadata.PackageNotFoundError:
# __version__ = "99.99.99"


VERSION = tuple(map(int_or_str, __version__.split(".")))

# Job.update_forward_refs()

__all__ = [
"AuthenticationError",
"AuthenticationWrongNumberOfArgsError",
Expand All @@ -98,22 +89,24 @@ def int_or_str(value):
"ReadOnlyError",
"KeyDB",
"KeyDBCluster",
"KeyDBError",
# "KeyDBError",
"ResponseError",
"Sentinel",
"SentinelConnectionPool",
"SentinelManagedConnection",
"SentinelManagedSSLConnection",
"SSLConnection",
"StrictKeyDB",
# "StrictKeyDB",
"TimeoutError",
"UnixDomainSocketConnection",
"WatchError",
"JobError",
"CredentialProvider",
"UsernamePasswordCredentialProvider",
# Async
"AsyncKeyDB",
"StrictAsyncKeyDB",
"AsyncKeyDBCluster",
# "StrictAsyncKeyDB",
"AsyncBlockingConnectionPool",
"AsyncConnection",
"AsyncConnectionPool",
Expand All @@ -122,12 +115,16 @@ def int_or_str(value):
"AsyncSentinel",
"AsyncSentinelConnectionPool",
"AsyncSentinelManagedConnection",
"AsyncSentinelManagedSSLConnection",
"async_from_url",
# "AsyncSentinelManagedSSLConnection",

# Client
"SerializerType",
"KeyDBSettings",
"KeyDBWorkerSettings",
"KeyDBSession",
"KeyDBClient",

# Queues
"TaskQueue",
"Worker",
]
119 changes: 14 additions & 105 deletions aiokeydb/backoff.py
Original file line number Diff line number Diff line change
@@ -1,105 +1,14 @@
import random
from abc import ABC, abstractmethod


class AbstractBackoff(ABC):
"""Backoff interface"""

def reset(self):
"""
Reset internal state before an operation.
`reset` is called once at the beginning of
every call to `Retry.call_with_retry`
"""
pass

@abstractmethod
def compute(self, failures):
"""Compute backoff in seconds upon failure"""
pass


class ConstantBackoff(AbstractBackoff):
"""Constant backoff upon failure"""

def __init__(self, backoff):
"""`backoff`: backoff time in seconds"""
self._backoff = backoff

def compute(self, failures):
return self._backoff


class NoBackoff(ConstantBackoff):
"""No backoff upon failure"""

def __init__(self):
super().__init__(0)


class ExponentialBackoff(AbstractBackoff):
"""Exponential backoff upon failure"""

def __init__(self, cap, base):
"""
`cap`: maximum backoff time in seconds
`base`: base backoff time in seconds
"""
self._cap = cap
self._base = base

def compute(self, failures):
return min(self._cap, self._base * 2**failures)


class FullJitterBackoff(AbstractBackoff):
"""Full jitter backoff upon failure"""

def __init__(self, cap, base):
"""
`cap`: maximum backoff time in seconds
`base`: base backoff time in seconds
"""
self._cap = cap
self._base = base

def compute(self, failures):
return random.uniform(0, min(self._cap, self._base * 2**failures))


class EqualJitterBackoff(AbstractBackoff):
"""Equal jitter backoff upon failure"""

def __init__(self, cap, base):
"""
`cap`: maximum backoff time in seconds
`base`: base backoff time in seconds
"""
self._cap = cap
self._base = base

def compute(self, failures):
temp = min(self._cap, self._base * 2**failures) / 2
return temp + random.uniform(0, temp)


class DecorrelatedJitterBackoff(AbstractBackoff):
"""Decorrelated jitter backoff upon failure"""

def __init__(self, cap, base):
"""
`cap`: maximum backoff time in seconds
`base`: base backoff time in seconds
"""
self._cap = cap
self._base = base
self._previous_backoff = 0

def reset(self):
self._previous_backoff = 0

def compute(self, failures):
max_backoff = max(self._base, self._previous_backoff * 3)
temp = random.uniform(self._base, max_backoff)
self._previous_backoff = min(self._cap, temp)
return self._previous_backoff
import typing

from redis.backoff import (
AbstractBackoff,
ConstantBackoff,
ExponentialBackoff,
FullJitterBackoff,
NoBackoff,
EqualJitterBackoff,
DecorrelatedJitterBackoff,
)

def default_backoff() -> typing.Type[AbstractBackoff]:
return EqualJitterBackoff()
Loading

0 comments on commit 9c09f7d

Please sign in to comment.