Commit 5c8de8b

Merge pull request #62 from supriyopaul/forwarder

README.md for new changes

supriyopaul authored Mar 10, 2018
2 parents 5ea523f + e9a7127

Showing 12 changed files with 589 additions and 382 deletions.
4 changes: 2 additions & 2 deletions .travis.yml
@@ -11,8 +11,8 @@ deploy:
   skip_cleanup: true
   api-key:
     secure: Rxl45qbTHWIbOhst3PS60ETfW5wDByxp0xv4ZbtgRGe4SPvHtOLHRNGiajsQX37pgUFF9ALcCseY2cTk46jNEA1jOzFx4DDSKyH+Wu4H5F4M8JDBBlIsvsgezumLsYMqOL18caZA8J84N9UyuzgdPBDb0B0mMclRa9xRaxWncrUZgXwW9r3N2zU1LvGtd0Su4zLXXP6HC6mKHdOOaNSDONqaesx1njYTGr5fbWy7IXrjSg75wWCtHW1dKDPXmyyWZomwpmhURYfYXn/o9lRaXSDpLWx4xTsbJQdG9EiSPm5fLjfv9tZTxIF7jB0tTrOB63gGAgrLu0zC5Z5MJ1Y0+sbotI8eySI4w0GTffhi4WQjTTyO02vgPuSCm9JV5aW+YeNJtSncEgaVgsuUmZUiWdqMsvPG+bqOjh/i0eIkHr/v7cyf3HndFieZH9H3XdlEDtyr4SRExQSjG+be6mcGOJMWMrXervcW6kGP3pcX7EWgrFxnkz9lSgx/0meNMP4JDo8pZWg50b0xpni3zUcweTgCIeYUBd5aIKUvPaCqSHC1BAyZI5z3Cvdlq0tjCS726drQcV4OJNjrnmb301/K6MBbXhAsyhbkB1NpUZ0k0ZwmGxQ7iE4N1pod2BQbTPxjNUL1KNQJXFvjr9Clrw9Arqo6X9S9t//GP2DDl5Ke5KQ=
-  name: logagg-0.2.5
-  tag_name: 0.2.5
+  name: logagg-0.2.6
+  tag_name: 0.2.6
   on:
     branch: master
     repo: deep-compute/logagg
2 changes: 1 addition & 1 deletion Dockerfile
@@ -7,7 +7,7 @@ WORKDIR /logagg
 ADD . /logagg

 RUN apt-get install python-pip -y
-
+RUN pip install --upgrade pip
 RUN pip install .

 RUN easy_install https://github.com/deep-compute/pygtail/tarball/master/#egg=pygtail-0.6.1
587 changes: 329 additions & 258 deletions README.md

Large diffs are not rendered by default.

Empty file added _
2 changes: 2 additions & 0 deletions logagg/__init__.py
@@ -1,6 +1,8 @@
 import util
 from collector import LogCollector
 from nsqsender import NSQSender
 from forwarder import LogForwarder
 from command import main
+import formatters
+import forwarders
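
For context, a minimal usage sketch of the names the package exports (Python 2, like the codebase). The LogCollector signature and the fpaths format (path:formatter) come from the collector.py diff below; the NSQSender arguments and the final run call are assumptions, not taken from this commit:

# --- illustrative sketch, not part of the commit ---
from logagg import LogCollector, NSQSender, util

# assumed NSQSender signature: nsqd HTTP address, topic, max depth
nsq_sender = NSQSender('localhost:4151', 'test_topic', 10000, util.DUMMY_LOGGER)

collector = LogCollector('/var/log/nginx/access.log:logagg.formatters.nginx_access',
                         nsq_sender, heartbeat_interval=30)
collector.collect()  # assumed entry point; the real method name may differ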

84 changes: 42 additions & 42 deletions logagg/collector.py
@@ -10,30 +10,17 @@
 from operator import attrgetter
 import traceback

-from logagg import util
-from logagg.formatters import RawLog
 from deeputil import AttrDict, keeprunning
 from pygtail import Pygtail
+from logagg import util
+from logagg.formatters import RawLog

 # TODO
 '''
 After a downtime of collector, pygtail is missing logs from rotational files
 '''

 class LogCollector(object):
-    '''
-    Instantiate LogCollector class with an object
-    >>> from logagg import LogCollector
-    >>> lc = LogCollector('~/Desktop/log_samples/access_new.log:logagg.formatters.nginx_access', \
-                          'test_topic', 'localhost:4151', 10000, 30)
-    >>> lc.log
-    <deeputil.misc.Dummy object at 0x7fc902bb7c10>
-    >>> lc.nsqd_http_address
-    'localhost:4151'
-    >>> lc.nsq_max_depth
-    10000
-    '''
     DESC = 'Collects the log information and sends to NSQTopic'

     QUEUE_MAX_SIZE = 2000 # Maximum number of messages in in-mem queue
@@ -46,7 +33,21 @@ class LogCollector(object):
     HOST = socket.gethostname()
     HEARTBEAT_RESTART_INTERVAL = 30 # Wait time if heartbeat sending stops
     #TODO check for structure in validate_log_format
-    STRUCT = ('id', 'timestamp', 'file', 'host', 'formatter', 'raw', 'type', 'level')
+
+    LOG_STRUCTURE = {
+        'id': basestring,
+        'timestamp': basestring,
+        'file' : basestring,
+        'host': basestring,
+        'formatter' : basestring,
+        'raw' : basestring,
+        'type' : basestring,
+        'level' : basestring,
+        'event' : basestring,
+        'data' : dict,
+        'error' : basestring,
+        'error_tb' : basestring,
+    }

     def __init__(self, fpaths, nsq_sender,
                  heartbeat_interval, log=util.DUMMY_LOGGER):
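
The new LOG_STRUCTURE replaces the old STRUCT tuple with a key-to-type map, so validation can reject unknown keys and enforce value types in one pass. A standalone sketch of that check (Python 2, hence basestring; the sample records are invented):

# --- illustrative sketch, not part of the commit ---
LOG_STRUCTURE = {
    'id': basestring, 'timestamp': basestring, 'file': basestring,
    'host': basestring, 'formatter': basestring, 'raw': basestring,
    'type': basestring, 'level': basestring, 'event': basestring,
    'data': dict, 'error': basestring, 'error_tb': basestring,
}

def validate_log_format(log):
    for key in log:
        assert key in LOG_STRUCTURE                      # reject unknown keys
        assert isinstance(log[key], LOG_STRUCTURE[key])  # enforce declared types

# passes: every key is known and every value has the declared type
validate_log_format({'id': 'a1', 'level': 'info', 'data': {'status': 200}})
# would raise AssertionError, since 'data' must be a dict:
# validate_log_format({'data': 'oops'})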
@@ -61,11 +62,22 @@ def __init__(self, fpaths, nsq_sender,
         self.formatters = {}
         self.queue = Queue.Queue(maxsize=self.QUEUE_MAX_SIZE)

+    def _remove_redundancy(self, log):
+        for key in log:
+            if key in log and key in log['data']:
+                log[key] = log['data'].pop(key)
+        return log
+
+    def validate_log_format(self, log):
+        for key in log:
+            assert (key in self.LOG_STRUCTURE)
+            assert isinstance(log[key], self.LOG_STRUCTURE[key])
+
     @keeprunning(LOG_FILE_POLL_INTERVAL, on_error=util.log_exception)
     def collect_log_lines(self, log_file):
         L = log_file
         fpath = L['fpath']
-        self.log.debug('Tracking log file for log lines', fpath=fpath)
+        self.log.debug('tracking_file_for_log_lines', fpath=fpath)

         freader = Pygtail(fpath)
         for line_info in freader:
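
The new _remove_redundancy hook lifts any key that appears both at the top level and inside log['data'] out of 'data', so each field is stored exactly once, with the formatter's copy winning. A small demonstration with an invented record:

# --- illustrative sketch, not part of the commit ---
def _remove_redundancy(log):
    # same logic as the method above, minus self
    for key in log:
        if key in log and key in log['data']:
            log[key] = log['data'].pop(key)
    return log

log = {'level': 'info', 'event': 'request', 'data': {'level': 'error', 'status': 500}}
print _remove_redundancy(log)
# data's copy wins: {'level': 'error', 'event': 'request', 'data': {'status': 500}}
# (dict key order may vary in Python 2)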
@@ -75,6 +87,7 @@ def collect_log_lines(self, log_file):
                       file=fpath,
                       host=self.HOST,
                       formatter=L['formatter'],
+                      event='event',
                       raw=line,
                       timestamp=datetime.datetime.utcnow().isoformat(),
                       type='log',
@@ -90,6 +103,7 @@ def collect_log_lines(self, log_file):
                     _log = util.load_object(formatter)(raw_log)

                     log.update(_log)
+                    log = self._remove_redundancy(log)
                     self.validate_log_format(log)
                 except (SystemExit, KeyboardInterrupt) as e: raise
                 except:
@@ -99,27 +113,13 @@

             self.queue.put(dict(log=json.dumps(log),
                                 freader=freader, line_info=line_info))
-            self.log.debug("TALLY: PUT into self.queue")
+            self.log.debug("TALLY:PUT_into_self.queue")

         while not freader.is_fully_acknowledged():
             t = self.PYGTAIL_ACK_WAIT_TIME
-            self.log.debug('Waiting for pygtail to fully ack', wait_time=t)
+            self.log.debug('waiting_for_pygtail_to_fully_ack', wait_time=t)
             time.sleep(t)

-    def validate_log_format(self, log):
-        # FIXME: test level, type also
-        # FIXME: test value of level, type, timestamp
-        assert isinstance(log, dict)
-        assert isinstance(log['id'], basestring)
-        assert isinstance(log['data'], dict)
-        assert isinstance(log['timestamp'], basestring)
-        assert isinstance(log['file'], str)
-        assert isinstance(log['host'], basestring)
-        assert isinstance(log['formatter'], basestring)
-        assert isinstance(log['raw'], basestring)
-        assert isinstance(log['type'], basestring)
-        assert isinstance(log['level'], basestring)
-
     def _get_msgs_from_queue(self, msgs, msgs_nbytes, timeout):
         read_from_q = False
         ts = time.time()
@@ -128,27 +128,27 @@ def _get_msgs_from_queue(self, msgs, msgs_nbytes, timeout):
             try:
                 msg = self.queue.get(block=True, timeout=self.QUEUE_READ_TIMEOUT)
                 read_from_q = True
-                self.log.debug("TALLY: GET from self.queue")
+                self.log.debug("TALLY:GET_from_self.queue")

                 msgs.append(msg)
                 msgs_nbytes += len(msg['log'])

                 if msgs_nbytes > self.NBYTES_TO_SEND: # FIXME: class level const
-                    self.log.debug('Msg bytes read from in-queue got exceeded')
+                    self.log.debug('msg_bytes_read_inQueue_exceeded')
                     break
                 #FIXME condition never met
                 if time.time() - ts >= timeout and msgs:
-                    self.log.debug('Msg reading timeout from in-queue got exceeded')
+                    self.log.debug('msg_reading_timeout_from_inQueue_got_exceeded')
                     break
                 # TODO: What if a single log message itself is bigger than max bytes limit?
             except Queue.Empty:
-                self.log.debug('QUEUE empty')
+                self.log.debug('queue_empty')
                 time.sleep(self.QUEUE_READ_TIMEOUT)
                 if not msgs:
                     continue
                 else:
                     return msgs, msgs_nbytes, read_from_q
-        self.log.debug('Got msgs from in-queue')
+        self.log.debug('got_msgs_from_inQueue')
         return msgs, msgs_nbytes, read_from_q
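
For context, _get_msgs_from_queue drains the in-memory queue into a batch until either a byte budget or a time budget is hit, then hands the batch to the NSQ sender. A compact standalone sketch of the same pattern (constants and message shape invented):

# --- illustrative sketch, not part of the commit ---
import time
import Queue

NBYTES_TO_SEND = 4 * 1024 * 1024   # invented byte budget
QUEUE_READ_TIMEOUT = 1             # seconds to block on an empty queue

def drain_batch(q, timeout):
    msgs, nbytes, ts = [], 0, time.time()
    while True:
        try:
            msg = q.get(block=True, timeout=QUEUE_READ_TIMEOUT)
            msgs.append(msg)
            nbytes += len(msg['log'])
            if nbytes > NBYTES_TO_SEND:                 # size bound hit
                return msgs, nbytes
            if time.time() - ts >= timeout and msgs:    # time bound hit
                return msgs, nbytes
        except Queue.Empty:
            if msgs:                                    # flush what we have
                return msgs, nbytes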


@@ -171,13 +171,13 @@ def send_to_nsq(self, state):
         is_max_time_elapsed = time_since_last_push >= self.MAX_SECONDS_TO_PUSH

         should_push = len(msgs) > 0 and (is_max_time_elapsed or have_enough_msgs)
-        self.log.debug('desciding wheather to push', should_push=should_push)
+        self.log.debug('desciding_to_push', should_push=should_push)

         try:
-            self.log.debug('trying to push to nsq', msgs_length=len(msgs))
+            self.log.debug('trying_to_push_to_nsq', msgs_length=len(msgs))
             self.nsq_sender.handle_logs(msgs)
             self.confirm_success(msgs)
-            self.log.debug('pushed to nsq', msgs_length=len(msgs))
+            self.log.debug('pushed_to_nsq', msgs_length=len(msgs))
             msgs = []
             state.last_push_ts = time.time()
         except (SystemExit, KeyboardInterrupt): raise
@@ -220,7 +220,7 @@ def _scan_fpatterns(self, state):
                 if log_key not in self.log_reader_threads:
                     # There is no existing thread tracking this log file. Start one
                     self.log_reader_threads[log_key] = util.start_daemon_thread(self.collect_log_lines, (log_f,))
-                    self.log.info('Started collect_log_lines thread ', log_key=log_key)
+                    self.log.info('started_collect_log_lines_thread', log_key=log_key)
                     state.files_tracked.append(fpath)
         time.sleep(self.SCAN_FPATTERNS_INTERVAL)
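
_scan_fpatterns keeps one daemon thread per discovered log file, keyed so a file is never tracked twice. The pattern in isolation (the glob pattern, rescan interval, and worker body are invented):

# --- illustrative sketch, not part of the commit ---
import glob
import time
import threading

def start_daemon_thread(target, args):
    # same idea as util.start_daemon_thread: a fire-and-forget worker
    t = threading.Thread(target=target, args=args)
    t.daemon = True
    t.start()
    return t

def collect_log_lines(fpath):
    # stand-in for the real collector loop
    while True:
        time.sleep(1)

threads = {}
while True:
    for fpath in glob.glob('/var/log/nginx/*.log'):    # invented pattern
        if fpath not in threads:                       # never track a file twice
            threads[fpath] = start_daemon_thread(collect_log_lines, (fpath,))
    time.sleep(30)                                     # rescan interval (invented)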
1 change: 1 addition & 0 deletions logagg/command.py
@@ -24,6 +24,7 @@ def _parse_forwarder_target_arg(self, t):
         path, args = t.split(':', 1)
         path = path.split('=')[1]
         args = dict(a.split('=', 1) for a in args.split(':'))
+        args['log'] = self.log
         return path, args

     def forward(self):
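
The one-line addition threads the CLI's logger into every forwarder's constructor arguments. For reference, a worked example of what _parse_forwarder_target_arg does to a target string (the string, forwarder path, and options are invented; see the README for the real format):

# --- illustrative sketch, not part of the commit ---
t = 'target=logagg.forwarders.MongoDBForwarder:host=localhost:port=27017'

path, args = t.split(':', 1)   # 'target=logagg.forwarders.MongoDBForwarder', 'host=localhost:port=27017'
path = path.split('=')[1]      # 'logagg.forwarders.MongoDBForwarder'
args = dict(a.split('=', 1) for a in args.split(':'))
# args == {'host': 'localhost', 'port': '27017'}
# the added line then injects the logger: args['log'] = self.log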
