Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support reading redis config from env vars (optionally with prefix) #20

Merged
merged 1 commit into from
Feb 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -186,4 +186,7 @@ config.yml
logs/**

#scripts
run.sh
run.sh

#local files
*.local.*
6 changes: 2 additions & 4 deletions src/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from apscheduler.schedulers.background import BlockingScheduler
from pytz import utc
from yaml import Loader, load
from newrelic_logging.env import get_var, var_exists
from newrelic_logging.config import getenv
from newrelic_logging.integration import Integration
from newrelic_logging.telemetry import print_info, print_warn

Expand Down Expand Up @@ -75,10 +75,8 @@ def main():
if not run_as_service:
if 'cron_interval_minutes' in config:
cron_interval = config['cron_interval_minutes']
elif var_exists("CRON_INTERVAL_MINUTES"):
cron_interval = int(get_var("CRON_INTERVAL_MINUTES"))
else:
cron_interval = 60
cron_interval = int(getenv("CRON_INTERVAL_MINUTES", 60))

integration = Integration(config, event_mapping, cron_interval)
integration.run()
Expand Down
157 changes: 157 additions & 0 deletions src/newrelic_logging/cache.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
import redis
from datetime import timedelta

from .config import Config
from .telemetry import print_err, print_info


CONFIG_CACHE_ENABLED = 'cache_enabled'
CONFIG_REDIS_HOST = 'redis.host'
CONFIG_REDIS_PORT = 'redis.port'
CONFIG_REDIS_DB_NUMBER = 'redis.db_number'
CONFIG_REDIS_PASSWORD = 'redis.password'
CONFIG_REDIS_USE_SSL = 'redis.ssl'
CONFIG_REDIS_EXPIRE_DAYS = 'redis.expire_days'
DEFAULT_CACHE_ENABLED = False
DEFAULT_REDIS_HOST = 'localhost'
DEFAULT_REDIS_PORT = 6379
DEFAULT_REDIS_DB_NUMBER = 0
DEFAULT_REDIS_EXPIRE_DAYS = 2
DEFAULT_REDIS_SSL = False


# Local cache, to store data before sending it to Redis.
class DataCache:
redis = None
redis_expire = None
cached_events = {}
cached_logs = {}

def __init__(self, redis, redis_expire) -> None:
self.redis = redis
self.redis_expire = redis_expire

def set_redis_expire(self, key):
try:
self.redis.expire(key, timedelta(days=self.redis_expire))
except Exception as e:
print_err(f"Failed setting expire time for key {key}: {e}")
exit(1)

def persist_logs(self, record_id: str) -> bool:
if record_id in self.cached_logs:
for row_id in self.cached_logs[record_id]:
try:
self.redis.rpush(record_id, row_id)
except Exception as e:
print_err(f"Failed pushing record {record_id}: {e}")
exit(1)
# Set expire date for the whole list only once, when it find the first entry ('init')
if row_id == 'init':
self.set_redis_expire(record_id)
del self.cached_logs[record_id]
return True
else:
return False

def persist_event(self, record_id: str) -> bool:
if record_id in self.cached_events:
try:
self.redis.set(record_id, '')
except Exception as e:
print_err(f"Failed setting record {record_id}: {e}")
exit(1)
self.set_redis_expire(record_id)
del self.cached_events[record_id]
return True
else:
return False

def can_skip_downloading_record(self, record_id: str) -> bool:
try:
does_exist = self.redis.exists(record_id)
except Exception as e:
print_err(f"Failed checking record {record_id}: {e}")
exit(1)
if does_exist:
try:
return self.redis.llen(record_id) > 1
except Exception as e:
print_err(f"Failed checking len for record {record_id}: {e}")
exit(1)

return False

def retrieve_cached_message_list(self, record_id: str):
try:
cache_key_exists = self.redis.exists(record_id)
except Exception as e:
print_err(f"Failed checking record {record_id}: {e}")
exit(1)

if cache_key_exists:
try:
cached_messages = self.redis.lrange(record_id, 0, -1)
except Exception as e:
print_err(f"Failed getting list range for record {record_id}: {e}")
exit(1)
return cached_messages
else:
self.cached_logs[record_id] = ['init']

return None

# Cache event
def check_cached_id(self, record_id: str):
try:
does_exist = self.redis.exists(record_id)
except Exception as e:
print_err(f"Failed checking record {record_id}: {e}")
exit(1)

if does_exist:
return True
else:
self.cached_events[record_id] = ''
return False

# Cache log
def record_or_skip_row(self, record_id: str, row: dict, cached_messages: dict) -> bool:
row_id = row["REQUEST_ID"]

if cached_messages is not None:
row_id_b = row_id.encode('utf-8')
if row_id_b in cached_messages:
return True
self.cached_logs[record_id].append(row_id)
else:
self.cached_logs[record_id].append(row_id)

return False


def make_cache(config: Config):
if config.get_bool(CONFIG_CACHE_ENABLED, DEFAULT_CACHE_ENABLED):
host = config.get(CONFIG_REDIS_HOST, DEFAULT_REDIS_HOST)
port = config.get_int(CONFIG_REDIS_PORT, DEFAULT_REDIS_PORT)
db = config.get_int(CONFIG_REDIS_DB_NUMBER, DEFAULT_REDIS_DB_NUMBER)
password = config.get(CONFIG_REDIS_PASSWORD)
ssl = config.get_bool(CONFIG_REDIS_USE_SSL, DEFAULT_REDIS_SSL)
expire_days = config.get_int(CONFIG_REDIS_EXPIRE_DAYS)
password_display = "XXXXXX" if password != None else None

print_info(
f'cache enabled, connecting to redis instance {host}:{port}:{db}, ssl={ssl}, password={password_display}'
)

return DataCache(redis.Redis(
host=host,
port=port,
db=db,
password=password,
ssl=ssl
), expire_days)

print_info('cache disabled')

return None
90 changes: 90 additions & 0 deletions src/newrelic_logging/config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
import os
import re
from types import SimpleNamespace
from typing import Any


BOOL_TRUE_VALS = ['true', '1', 'on', 'yes']
NOT_FOUND = SimpleNamespace()


def _get_nested_helper(val: Any, arr: list[str] = [], index: int = 0) -> Any:
if index == len(arr):
return NOT_FOUND
elif type(val) is dict:
key = arr[index]
if index == len(arr) - 1:
return val[key] if key in val else NOT_FOUND
return _get_nested_helper(val[key], arr, index + 1) if key in val else NOT_FOUND
elif type(val) is list:
key = arr[index]
if type(key) is int and key >= 0:
if index == len(arr) - 1:
return val[key] if key < len(val) else NOT_FOUND
return _get_nested_helper(val[key], arr, index + 1) if key < len(val) else NOT_FOUND

return NOT_FOUND


def get_nested(d: dict, path: str) -> Any:
return _get_nested_helper(d, path.split('.'))


def getenv(var_name, default = None, prefix = ''):
return os.environ.get(prefix + var_name, default)


def tobool(s):
if s == None:
return False
elif type(s) == bool:
return s
elif type(s) == str:
if s.lower() in BOOL_TRUE_VALS:
return True
return False

return bool(s)


class Config:
def __init__(self, config: dict, prefix: str):
self.config = config
self.prefix = prefix

def __getitem__(self, key):
return self.config[key]

def __setitem__(self, key, value):
self.config[key] = value

def __len__(self):
return len(self.config)

def __contains__(self, key):
return key in self.config

def set_prefix(self, prefix: str) -> None:
self.prefix = prefix

def getenv(self, env_var_name: str, default = None) -> str:
return getenv(env_var_name, default, self.prefix)

def get(self, key: str, default = None) -> Any:
val = get_nested(self.config, key)
if not val == NOT_FOUND:
return val

env_var_name = re.sub(r'[^a-zA-Z0-9_]', '_', key.upper())
val = self.getenv(env_var_name, default)
if not val is False and not val is None:
return val

return default

def get_int(self, key: str, default = None) -> int:
val = self.get(key, default)
return int(val) if val else val

def get_bool(self, key: str, default = None) -> bool:
return tobool(self.get(key, default))
Loading
Loading