Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Latest luigi #13

Open
wants to merge 27 commits into
base: real-latest-luigi
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
eb1132d
Load os.environ into the config singleton
hugobast Feb 24, 2017
2af8aef
Add the ability to add metrics based on certain scheduler events
thisiscab Oct 24, 2017
7d5318b
Add additional values to the task started event
thisiscab Oct 24, 2017
c85f876
Add an event to DataDog when a Luigi task fails
thisiscab Oct 25, 2017
f58b875
Add an event to DataDog when a task is flagged as disabled
thisiscab Oct 25, 2017
99e09f3
Add simple metrics to count tasks
thisiscab Nov 6, 2017
f7ff014
Add metrics and events when a task gets completed
thisiscab Nov 6, 2017
0960396
Add execution time of a task + bug fixes
thisiscab Nov 6, 2017
b2ab760
Add namespaced metrics
thisiscab Nov 6, 2017
1947aab
Add missing RPC method for datadog
thisiscab Nov 21, 2017
d4dc6da
Add support for params
thisiscab Nov 21, 2017
6e5e5a4
Fix duplicative event logging for done tasks
thisiscab Dec 13, 2017
a78e1c9
Set the scheduler to be optional for metrics logging
thisiscab Jan 4, 2018
625c9bc
Fix wrongly named event method name
thisiscab Jan 4, 2018
a633d0d
Fix minor linter issues
thisiscab Jan 4, 2018
3e5ccf5
Change metric name for execution time
thisiscab Jan 5, 2018
0e26149
Refactor the datadog metric class
thisiscab Jan 10, 2018
e3cc88c
Refactor the namespace of the metric
thisiscab Jan 10, 2018
a69d827
Fix invalid code path
thisiscab Jan 10, 2018
d7bf274
Add support for default environment flag
thisiscab Jan 23, 2018
e1c4a76
Remove unnecessary IF clause for the environment ENV variable
thisiscab Jan 26, 2018
c895dcb
Fix the environment tag for DataDog events
thisiscab Jan 29, 2018
420c438
Luigi throws warning if param is None
thisiscab Jan 29, 2018
6d2b26b
Add the ability to configure statsd information in Luigi
thisiscab Jan 29, 2018
9bd57f3
Change the behavior of the default tags
thisiscab Feb 5, 2018
e2653e5
Change the DD tag from env to environment
thisiscab Feb 5, 2018
bb66ad3
Change statsd_port parameter type (#11)
cabouffard Apr 19, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion luigi/configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,4 +128,4 @@ def get_config():
"""
Convenience method (for backwards compatibility) for accessing config singleton.
"""
return LuigiConfigParser.instance()
return LuigiConfigParser.instance(os.environ)
117 changes: 117 additions & 0 deletions luigi/contrib/datadog.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
from luigi import parameter
from luigi.metrics import MetricsCollector
from luigi.task import Config

from datadog import initialize, api, statsd


class datadog(Config):
    # Luigi Config section for the DataDog metrics collector; the lowercase
    # class name follows Luigi's convention for config sections (presumably
    # read as "[datadog]" in the config file — confirm against luigi.task.Config).
    api_key = parameter.Parameter(default='dummy_api_key')  # DataDog API key
    app_key = parameter.Parameter(default='dummy_app_key')  # DataDog application key
    # Comma-separated tags appended to every metric and event sent.
    default_tags = parameter.Parameter(default='application:luigi')
    environment = parameter.Parameter(default='development', description='Environment of the pipeline')
    # Prefix for every statsd metric name, e.g. "luigi.task.started".
    metric_namespace = parameter.Parameter(default='luigi')
    statsd_host = parameter.Parameter(default='localhost')  # dogstatsd agent host
    statsd_port = parameter.IntParameter(default=8125)  # dogstatsd agent UDP port


class DataDogMetricsCollector(MetricsCollector):
    """Forward Luigi scheduler task transitions to DataDog.

    For each task lifecycle transition (started / failed / disabled / done)
    this collector sends a namespaced statsd counter — plus, on completion,
    a gauge with the execution time — and creates a DataDog event
    describing the transition.
    """

    def __init__(self, *args, **kwargs):
        super(DataDogMetricsCollector, self).__init__(*args, **kwargs)
        self._config = datadog(**kwargs)

        # Configure the module-level datadog client used by both the
        # HTTP API (events) and dogstatsd (metrics).
        initialize(api_key=self._config.api_key,
                   app_key=self._config.app_key,
                   statsd_host=self._config.statsd_host,
                   statsd_port=self._config.statsd_port)

    def handle_task_started(self, task):
        """Send a counter increment and an informational event for a started task."""
        title = "Luigi: A task has been started!"
        text = "A task has been started in the pipeline named: {name}".format(name=task.family)
        tags = self._task_tags(task)

        self.send_increment('task.started', tags=tags)

        event_tags = tags + ["task_state:STARTED"]
        self.send_event(title=title, text=text, tags=event_tags, alert_type='info', priority='low')

    def handle_task_failed(self, task):
        """Send a counter increment and an error event for a failed task."""
        title = "Luigi: A task has failed!"
        text = "A task has failed in the pipeline named: {name}".format(name=task.family)
        tags = self._task_tags(task)

        self.send_increment('task.failed', tags=tags)

        event_tags = tags + ["task_state:FAILED"]
        self.send_event(title=title, text=text, tags=event_tags, alert_type='error', priority='normal')

    def handle_task_disabled(self, task, config):
        """Send a counter increment and an error event when a task is disabled.

        ``config`` is the scheduler config providing ``disable_persist`` and
        ``disable_window``, which are interpolated into the event text.
        """
        title = "Luigi: A task has been disabled!"
        text = """A task has been disabled in the pipeline named: {name}.
                  The task has failed {failures} times in the last {window}
                  seconds, so it is being disabled for {persist} seconds.""".format(
            name=task.family,
            persist=config.disable_persist,
            failures=task.retry_policy.retry_count,
            window=config.disable_window
        )
        tags = self._task_tags(task)

        self.send_increment('task.disabled', tags=tags)

        event_tags = tags + ["task_state:DISABLED"]
        self.send_event(title=title, text=text, tags=event_tags, alert_type='error', priority='normal')

    def handle_task_done(self, task):
        """Send counters, an execution-time gauge and an event for a completed task.

        Tasks that were never actually running (``time_running is None``) are
        skipped so that re-announced DONE tasks do not produce duplicate events.
        """
        # The task is already done -- Let's not re-create an event
        if task.time_running is None:
            return

        title = "Luigi: A task has been completed!"
        text = "A task has completed in the pipeline named: {name}".format(name=task.family)
        tags = self._task_tags(task)

        time_elapse = task.updated - task.time_running

        self.send_increment('task.done', tags=tags)
        self.send_gauge('task.execution_time', time_elapse, tags=tags)

        event_tags = tags + ["task_state:DONE"]
        self.send_event(title=title, text=text, tags=event_tags, alert_type='info', priority='low')

    def send_event(self, title=None, text=None, tags=None, alert_type='info', priority='normal'):
        """Create a DataDog event, appending the configured default tags."""
        # tags=None rather than a mutable [] default (shared-state pitfall).
        all_tags = (tags or []) + self.default_tags()

        api.Event.create(title=title, text=text, tags=all_tags, alert_type=alert_type, priority=priority)

    def send_gauge(self, metric_name, value, tags=None):
        """Send a namespaced statsd gauge with the default tags appended."""
        all_tags = (tags or []) + self.default_tags()
        statsd.gauge(self._namespaced(metric_name), value, tags=all_tags)

    def send_increment(self, metric_name, value=1, tags=None):
        """Increment a namespaced statsd counter with the default tags appended."""
        all_tags = (tags or []) + self.default_tags()
        statsd.increment(self._namespaced(metric_name), value, tags=all_tags)

    def _namespaced(self, metric_name):
        # Prefix every metric with the configured namespace, e.g. "luigi.task.done".
        return "{namespace}.{metric_name}".format(namespace=self._config.metric_namespace,
                                                  metric_name=metric_name)

    def _task_tags(self, task):
        # Common tag set for a task: its family name plus one tag per parameter.
        return ["task_name:{name}".format(name=task.family)] + self._format_task_params_to_tags(task)

    def _format_task_params_to_tags(self, task):
        """Return the task's parameters as a list of "key:value" tag strings."""
        return ["{key}:{value}".format(key=key, value=value)
                for key, value in task.params.items()]

    def default_tags(self):
        """Tags appended to every metric/event: the environment tag plus any
        comma-separated tags from the ``default_tags`` config value."""
        tags = ["environment:{environment}".format(environment=self._config.environment)]

        if self._config.default_tags:
            tags += self._config.default_tags.split(',')

        return tags
19 changes: 19 additions & 0 deletions luigi/metrics.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
class MetricsCollector(object):
    """
    No-op ``MetricsCollector`` base class that can be replaced by a
    tool-specific implementation (e.g. the DataDog collector in
    ``luigi.contrib``). The scheduler calls the ``handle_task_*`` hooks on
    task state transitions; subclasses override the ones they care about.
    """
    def __init__(self, scheduler=None):
        # Scheduler is optional so a collector can be used stand-alone.
        self._scheduler = scheduler

    def handle_task_started(self, task):
        """Hook called when a task starts running."""
        pass

    def handle_task_failed(self, task):
        """Hook called when a task fails."""
        pass

    def handle_task_disabled(self, task, config):
        """Hook called when a task is disabled; ``config`` is the scheduler config."""
        pass

    def handle_task_done(self, task):
        """Hook called when a task completes."""
        pass
44 changes: 44 additions & 0 deletions luigi/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ class scheduler(Config):
prune_on_get_work = parameter.BoolParameter(default=False)

pause_enabled = parameter.BoolParameter(default=True)
metrics_collection = parameter.Parameter(default=None)

def _get_retry_policy(self):
    # Bundle the scheduler-wide retry settings into a single RetryPolicy;
    # used as the default for tasks that do not override these values.
    return RetryPolicy(self.retry_count, self.disable_hard_timeout, self.disable_window)
Expand Down Expand Up @@ -434,6 +435,7 @@ def __init__(self, state_path):
self._status_tasks = collections.defaultdict(dict)
self._active_workers = {} # map from id to a Worker object
self._task_batchers = {}
self._metrics_collector = None

def get_state(self):
    # Return the persistable state components as a tuple:
    # (task map, active-worker map, task batchers).
    return self._tasks, self._active_workers, self._task_batchers
Expand Down Expand Up @@ -552,9 +554,11 @@ def set_status(self, task, new_status, config=None):

if new_status == FAILED and task.status != DISABLED:
task.add_failure()
self.update_metrics_task_failed(task)
if task.has_excessive_failures():
task.scheduler_disable_time = time.time()
new_status = DISABLED
self.update_metrics_task_disabled(task, config)
if not config.batch_emails:
notifications.send_error_email(
'Luigi Scheduler: DISABLED {task} due to excessive failures'.format(task=task.id),
Expand All @@ -574,6 +578,9 @@ def set_status(self, task, new_status, config=None):
task.status = new_status
task.updated = time.time()

if new_status == DONE:
self.update_metrics_task_done(task)

if new_status == FAILED:
task.retry = time.time() + config.retry_delay
if remove_on_failure:
Expand Down Expand Up @@ -656,8 +663,21 @@ def disable_workers(self, worker_ids):
worker.disabled = True
worker.tasks.clear()

def update_metrics_task_started(self, task):
    # Forward the STARTED transition to the configured metrics collector.
    self._metrics_collector.handle_task_started(task)

def update_metrics_task_disabled(self, task, config):
    # Forward the DISABLED transition; `config` supplies the disable
    # window/persist values the collector may report.
    self._metrics_collector.handle_task_disabled(task, config)

def update_metrics_task_failed(self, task):
    # Forward the FAILED transition to the configured metrics collector.
    self._metrics_collector.handle_task_failed(task)

def update_metrics_task_done(self, task):
    # Forward the DONE transition to the configured metrics collector.
    self._metrics_collector.handle_task_done(task)


class Scheduler(object):

"""
Async scheduler that can handle multiple workers, etc.

Expand Down Expand Up @@ -689,6 +709,13 @@ def __init__(self, config=None, resources=None, task_history_impl=None, **kwargs
if self._config.batch_emails:
self._email_batcher = BatchNotifier()

if self._config.metrics_collection == 'datadog':
import luigi.contrib.datadog as datadog
self._state._metrics_collector = datadog.DataDogMetricsCollector(self)
else:
from luigi.metrics import MetricsCollector
self._state._metrics_collector = MetricsCollector(self)

def load(self):
    # Restore scheduler state (tasks, workers, batchers) via the state object.
    self._state.load()

Expand Down Expand Up @@ -1186,6 +1213,7 @@ def get_work(self, host=None, assistant=False, current_tasks=None, worker=None,
reply['batch_task_ids'] = [task.id for task in batched_tasks]

elif best_task:
self.update_metrics_task_started(best_task)
self._state.set_status(best_task, RUNNING, self._config)
best_task.worker_running = worker_id
best_task.resources_running = best_task.resources.copy()
Expand Down Expand Up @@ -1576,3 +1604,19 @@ def _update_task_history(self, task, status, host=None):
def task_history(self):
    # Used by server.py to expose the calls
    # (read-only accessor for the configured task-history backend).
    return self._task_history

@rpc_method()
def update_metrics_task_started(self, task):
    # RPC entry point: relay a task-started notification to the state's
    # metrics collector.
    self._state.update_metrics_task_started(task)

@rpc_method()
def update_metrics_task_disabled(self, task):
    # RPC entry point: relay a task-disabled notification to the state's
    # metrics collector.
    # Bug fix: the state method requires (task, config) — see the internal
    # caller in set_status, which passes config — so invoking this RPC with
    # only `task` raised a TypeError. Pass the scheduler config through.
    self._state.update_metrics_task_disabled(task, self._config)

@rpc_method()
def update_metrics_task_failed(self, task):
    # RPC entry point: relay a task-failed notification to the state's
    # metrics collector.
    self._state.update_metrics_task_failed(task)

@rpc_method()
def update_metrics_task_done(self, task):
    # RPC entry point: relay a task-done notification to the state's
    # metrics collector.
    self._state.update_metrics_task_done(task)