From eb1132d5c2a0345ba646bf6d532db900cb63fea3 Mon Sep 17 00:00:00 2001 From: Hugo Bastien Date: Thu, 23 Feb 2017 22:29:06 -0500 Subject: [PATCH 01/27] Load os.environ into the config singleton I think it would be neat if luigi.cfg could interpolate some environment variables. I think this is the minimum required amount of work to make this happen but I'm looking for guidance. --- luigi/configuration.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/luigi/configuration.py b/luigi/configuration.py index 95e8a4c7c4..64244616ed 100644 --- a/luigi/configuration.py +++ b/luigi/configuration.py @@ -128,4 +128,4 @@ def get_config(): """ Convenience method (for backwards compatibility) for accessing config singleton. """ - return LuigiConfigParser.instance() + return LuigiConfigParser.instance(os.environ) From 2af8aef75c41ef6f817316e2e66282de617f2391 Mon Sep 17 00:00:00 2001 From: Charles-Andre Bouffard Date: Tue, 24 Oct 2017 12:58:42 -0400 Subject: [PATCH 02/27] Add the ability to add metrics based on certain scheduler events We're currently integration our solutions with DataDog. We wanted to integrate that ability to send metrics to that service from our Pipeline. Doing such, will allow us to monitor the status of our Pipeline by looking at statistics based on metrics sent by the pipeline. At this moment, there is only one event that's supported but as the feature progress forward, it's easy to see that we could support a bunch more. I had an implementation that was fairly basic at first but after navigating on the existing PR against the official Luigi repo I've discovered that there an ongoing implementation of exactly what we were trying to achieve but with similar service. Thanks to chrispalmer, I've been able to re-use his original work to implement ours. https://github.com/spotify/luigi/pull/2044 --- luigi/contrib/datadog.py | 26 ++++++++++++++++++++++++++ luigi/metrics.py | 10 ++++++++++ luigi/scheduler.py | 19 +++++++++++++++++++ 3 files changed, 55 insertions(+) create mode 100644 luigi/contrib/datadog.py create mode 100644 luigi/metrics.py diff --git a/luigi/contrib/datadog.py b/luigi/contrib/datadog.py new file mode 100644 index 0000000000..6287d5ede9 --- /dev/null +++ b/luigi/contrib/datadog.py @@ -0,0 +1,26 @@ +from luigi import parameter +from luigi.metrics import MetricsCollector +from luigi.task import Config + +from datadog import initialize, api + + +class datadog(Config): + api_key = parameter.Parameter(default='dummy_api_key') + app_key = parameter.Parameter(default='dummy_app_key') + + +class DataDogMetricsCollector(MetricsCollector): + def __init__(self, *args, **kwargs): + super(DataDogMetricsCollector, self).__init__(*args, **kwargs) + self._config = datadog(**kwargs) + initialize(api_key=self._config.api_key, app_key=self._config.app_key) + + def handle_task_started(self, task): + self._add_event(task.family) + + def _add_event(self, task_family): + title = "Did this work? -- I hope so!" + text = 'And let me tell you all about it here!' + tags = ['dw-test', 'version:1', 'application:web'] + api.Event.create(title=title, text=text, tags=tags) diff --git a/luigi/metrics.py b/luigi/metrics.py new file mode 100644 index 0000000000..d975e1b79f --- /dev/null +++ b/luigi/metrics.py @@ -0,0 +1,10 @@ +class MetricsCollector(object): + """ + Dummy MetricsCollecter base class that can be replace by tool specific + implementation + """ + def __init__(self, scheduler): + self._scheduler = scheduler + + def handle_task_started(self, task): + pass diff --git a/luigi/scheduler.py b/luigi/scheduler.py index 405507028b..755e1de8a2 100644 --- a/luigi/scheduler.py +++ b/luigi/scheduler.py @@ -148,6 +148,7 @@ class scheduler(Config): prune_on_get_work = parameter.BoolParameter(default=False) pause_enabled = parameter.BoolParameter(default=True) + metrics_collection = parameter.Parameter(default=None) def _get_retry_policy(self): return RetryPolicy(self.retry_count, self.disable_hard_timeout, self.disable_window) @@ -434,6 +435,7 @@ def __init__(self, state_path): self._status_tasks = collections.defaultdict(dict) self._active_workers = {} # map from id to a Worker object self._task_batchers = {} + self._metrics_collector = None def get_state(self): return self._tasks, self._active_workers, self._task_batchers @@ -569,6 +571,7 @@ def set_status(self, task, new_status, config=None): task.scheduler_disable_time = None if new_status != task.status: + # cab - maybe here self._status_tasks[task.status].pop(task.id) self._status_tasks[new_status][task.id] = task task.status = new_status @@ -656,8 +659,12 @@ def disable_workers(self, worker_ids): worker.disabled = True worker.tasks.clear() + def update_metrics_task_started(self, task): + self._metrics_collector.handle_task_started(task) + class Scheduler(object): + """ Async scheduler that can handle multiple workers, etc. @@ -689,6 +696,13 @@ def __init__(self, config=None, resources=None, task_history_impl=None, **kwargs if self._config.batch_emails: self._email_batcher = BatchNotifier() + if self._config.metrics_collection == 'datadog': + import luigi.contrib.datadog as datadog + self._state._metrics_collector = datadog.DataDogMetricsCollector(self) + else: + from luigi.metrics import MetricsCollector + self._state._metrics_collector = MetricsCollector(self) + def load(self): self._state.load() @@ -1186,6 +1200,7 @@ def get_work(self, host=None, assistant=False, current_tasks=None, worker=None, reply['batch_task_ids'] = [task.id for task in batched_tasks] elif best_task: + self.update_metrics_task_started(best_task) self._state.set_status(best_task, RUNNING, self._config) best_task.worker_running = worker_id best_task.resources_running = best_task.resources.copy() @@ -1576,3 +1591,7 @@ def _update_task_history(self, task, status, host=None): def task_history(self): # Used by server.py to expose the calls return self._task_history + + @rpc_method() + def update_metrics_task_started(self, task): + self._state.update_metrics_task_started(task) From 7d5318b1c94ef712c6f21aca0ead79c6b3e55d1f Mon Sep 17 00:00:00 2001 From: Charles-Andre Bouffard Date: Tue, 24 Oct 2017 16:14:34 -0400 Subject: [PATCH 03/27] Add additional values to the task started event Added more significant name and values to the event data. --- luigi/contrib/datadog.py | 30 ++++++++++++++++++++++++------ 1 file changed, 24 insertions(+), 6 deletions(-) diff --git a/luigi/contrib/datadog.py b/luigi/contrib/datadog.py index 6287d5ede9..9c141bc899 100644 --- a/luigi/contrib/datadog.py +++ b/luigi/contrib/datadog.py @@ -8,6 +8,7 @@ class datadog(Config): api_key = parameter.Parameter(default='dummy_api_key') app_key = parameter.Parameter(default='dummy_app_key') + default_event_tags = parameter.Parameter(default=None) class DataDogMetricsCollector(MetricsCollector): @@ -17,10 +18,27 @@ def __init__(self, *args, **kwargs): initialize(api_key=self._config.api_key, app_key=self._config.app_key) def handle_task_started(self, task): - self._add_event(task.family) + title = "Luigi: A task has been started!" + text = "A task has been started in the Pipeline named: {name}".format(name=task.family) + tags = ["task_state:STARTED", + "task_name:{name}".format(name=task.family)] - def _add_event(self, task_family): - title = "Did this work? -- I hope so!" - text = 'And let me tell you all about it here!' - tags = ['dw-test', 'version:1', 'application:web'] - api.Event.create(title=title, text=text, tags=tags) + self._add_event(title=title, text=text, + tags=tags, alert_type='info', + priority='low') + + def _add_event(self, + title=None, text=None, + tags=[], alert_type='info', + priority='normal'): + + all_tags = tags + self.default_event_tags() + api.Event.create(title=title, text=text, + tags=all_tags, alert_type=alert_type, + priority=priority) + + def default_event_tags(self): + if not self._config.default_event_tags: + return [] + + return str.split(self._config.default_event_tags, ',') From c85f876dc07859d49f8a44d5c84cc4abf861baf5 Mon Sep 17 00:00:00 2001 From: Charles-Andre Bouffard Date: Wed, 25 Oct 2017 10:50:48 -0400 Subject: [PATCH 04/27] Add an event to DataDog when a Luigi task fails This will add an event of type "error" in DataDog telling us that a task has failed running. --- luigi/contrib/datadog.py | 12 +++++++++++- luigi/metrics.py | 3 +++ luigi/scheduler.py | 5 ++++- 3 files changed, 18 insertions(+), 2 deletions(-) diff --git a/luigi/contrib/datadog.py b/luigi/contrib/datadog.py index 9c141bc899..785b42e086 100644 --- a/luigi/contrib/datadog.py +++ b/luigi/contrib/datadog.py @@ -19,7 +19,7 @@ def __init__(self, *args, **kwargs): def handle_task_started(self, task): title = "Luigi: A task has been started!" - text = "A task has been started in the Pipeline named: {name}".format(name=task.family) + text = "A task has been started in the pipeline named: {name}".format(name=task.family) tags = ["task_state:STARTED", "task_name:{name}".format(name=task.family)] @@ -27,6 +27,16 @@ def handle_task_started(self, task): tags=tags, alert_type='info', priority='low') + def handle_task_failed(self, task): + title = "Luigi: A task has failed!" + text = "A task has failed in the pipeline named: {name}".format(name=task.family) + tags = ["task_state:FAILED", + "task_name:{name}".format(name=task.family)] + + self._add_event(title=title, text=text, + tags=tags, alert_type='error', + priority='normal') + def _add_event(self, title=None, text=None, tags=[], alert_type='info', diff --git a/luigi/metrics.py b/luigi/metrics.py index d975e1b79f..420084f1f1 100644 --- a/luigi/metrics.py +++ b/luigi/metrics.py @@ -8,3 +8,6 @@ def __init__(self, scheduler): def handle_task_started(self, task): pass + + def handle_task_failed(self, task): + pass diff --git a/luigi/scheduler.py b/luigi/scheduler.py index 755e1de8a2..fef7ee212d 100644 --- a/luigi/scheduler.py +++ b/luigi/scheduler.py @@ -554,6 +554,7 @@ def set_status(self, task, new_status, config=None): if new_status == FAILED and task.status != DISABLED: task.add_failure() + self.update_metrics_task_failed(task) if task.has_excessive_failures(): task.scheduler_disable_time = time.time() new_status = DISABLED @@ -571,7 +572,6 @@ def set_status(self, task, new_status, config=None): task.scheduler_disable_time = None if new_status != task.status: - # cab - maybe here self._status_tasks[task.status].pop(task.id) self._status_tasks[new_status][task.id] = task task.status = new_status @@ -662,6 +662,9 @@ def disable_workers(self, worker_ids): def update_metrics_task_started(self, task): self._metrics_collector.handle_task_started(task) + def update_metrics_task_failed(self, task): + self._metrics_collector.handle_task_failed(task) + class Scheduler(object): From f58b8750800678968363c785bd4f969d1109dd1b Mon Sep 17 00:00:00 2001 From: Charles-Andre Bouffard Date: Wed, 25 Oct 2017 11:12:36 -0400 Subject: [PATCH 05/27] Add an event to DataDog when a task is flagged as disabled When a task is disabled, it means that it has failed multiple times given a certain window. That type of event is interesting to know about so implementing that into DataDog will allow us to permanently log that information. --- luigi/contrib/datadog.py | 18 ++++++++++++++++++ luigi/metrics.py | 3 +++ luigi/scheduler.py | 1 + 3 files changed, 22 insertions(+) diff --git a/luigi/contrib/datadog.py b/luigi/contrib/datadog.py index 785b42e086..49e3c70c09 100644 --- a/luigi/contrib/datadog.py +++ b/luigi/contrib/datadog.py @@ -37,6 +37,24 @@ def handle_task_failed(self, task): tags=tags, alert_type='error', priority='normal') + def handle_task_disabled(self, task, config): + title = "Luigi: A task has been disabled!" + text = """A task has been disabled in the pipeline named: {name}. + The task has failed {failures} times in the last {window} + seconds, so it is being disabled for {persist} seconds.""".format( + name=task.family, + persist=config.disable_persist, + failures=task.retry_policy.retry_count, + window=config.disable_window + ) + + tags = ["task_state:DISABLED", + "task_name:{name}".format(name=task.family)] + + self._add_event(title=title, text=text, + tags=tags, alert_type='error', + priority='normal') + def _add_event(self, title=None, text=None, tags=[], alert_type='info', diff --git a/luigi/metrics.py b/luigi/metrics.py index 420084f1f1..46eff143f6 100644 --- a/luigi/metrics.py +++ b/luigi/metrics.py @@ -11,3 +11,6 @@ def handle_task_started(self, task): def handle_task_failed(self, task): pass + + def handle_task_disabled(self, task): + pass diff --git a/luigi/scheduler.py b/luigi/scheduler.py index fef7ee212d..1810a30a02 100644 --- a/luigi/scheduler.py +++ b/luigi/scheduler.py @@ -558,6 +558,7 @@ def set_status(self, task, new_status, config=None): if task.has_excessive_failures(): task.scheduler_disable_time = time.time() new_status = DISABLED + self.update_metrics_task_disabled(task, config) if not config.batch_emails: notifications.send_error_email( 'Luigi Scheduler: DISABLED {task} due to excessive failures'.format(task=task.id), From 99e09f37e5ab0784fb72e180d949b558d9d0a8ea Mon Sep 17 00:00:00 2001 From: Charles-Andre Bouffard Date: Mon, 6 Nov 2017 12:46:49 -0500 Subject: [PATCH 06/27] Add simple metrics to count tasks This will let us keep track on how many tasks has been started, failed, disabled so that we can see a graphic and alert if things goes wrong. --- luigi/contrib/datadog.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/luigi/contrib/datadog.py b/luigi/contrib/datadog.py index 49e3c70c09..5c080c16fd 100644 --- a/luigi/contrib/datadog.py +++ b/luigi/contrib/datadog.py @@ -2,7 +2,7 @@ from luigi.metrics import MetricsCollector from luigi.task import Config -from datadog import initialize, api +from datadog import initialize, statsd class datadog(Config): @@ -23,6 +23,7 @@ def handle_task_started(self, task): tags = ["task_state:STARTED", "task_name:{name}".format(name=task.family)] + statsd.increment('thwomper.task.started') self._add_event(title=title, text=text, tags=tags, alert_type='info', priority='low') @@ -33,6 +34,7 @@ def handle_task_failed(self, task): tags = ["task_state:FAILED", "task_name:{name}".format(name=task.family)] + statsd.increment('thwomper.task.failed') self._add_event(title=title, text=text, tags=tags, alert_type='error', priority='normal') @@ -51,6 +53,7 @@ def handle_task_disabled(self, task, config): tags = ["task_state:DISABLED", "task_name:{name}".format(name=task.family)] + statsd.increment('thwomper.task.disabled') self._add_event(title=title, text=text, tags=tags, alert_type='error', priority='normal') From f7ff014c0d87866a32f077b8c57001a17a1f295e Mon Sep 17 00:00:00 2001 From: Charles-Andre Bouffard Date: Mon, 6 Nov 2017 13:03:34 -0500 Subject: [PATCH 07/27] Add metrics and events when a task gets completed --- luigi/contrib/datadog.py | 13 ++++++++++++- luigi/metrics.py | 3 +++ luigi/scheduler.py | 6 ++++++ 3 files changed, 21 insertions(+), 1 deletion(-) diff --git a/luigi/contrib/datadog.py b/luigi/contrib/datadog.py index 5c080c16fd..bce5191eaa 100644 --- a/luigi/contrib/datadog.py +++ b/luigi/contrib/datadog.py @@ -2,7 +2,7 @@ from luigi.metrics import MetricsCollector from luigi.task import Config -from datadog import initialize, statsd +from datadog import initialize, api, statsd class datadog(Config): @@ -58,6 +58,17 @@ def handle_task_disabled(self, task, config): tags=tags, alert_type='error', priority='normal') + def handle_task_done(self, task): + title = "Luigi: A task has been completed!" + text = "A task has completed in the pipeline named: {name}".format(name=task.family) + tags = ["task_state:DONE", + "task_name:{name}".format(name=task.family)] + + statsd.increment('thwomper.task.done') + self._add_event(title=title, text=text, + tags=tags, alert_type='info', + priority='low') + def _add_event(self, title=None, text=None, tags=[], alert_type='info', diff --git a/luigi/metrics.py b/luigi/metrics.py index 46eff143f6..85b183ebb3 100644 --- a/luigi/metrics.py +++ b/luigi/metrics.py @@ -14,3 +14,6 @@ def handle_task_failed(self, task): def handle_task_disabled(self, task): pass + + def handle_task_done(self, task): + pass diff --git a/luigi/scheduler.py b/luigi/scheduler.py index 1810a30a02..540c31d08c 100644 --- a/luigi/scheduler.py +++ b/luigi/scheduler.py @@ -576,6 +576,9 @@ def set_status(self, task, new_status, config=None): self._status_tasks[task.status].pop(task.id) self._status_tasks[new_status][task.id] = task task.status = new_status + if new_status == DONE: + self.update_metrics_task_done(task, config) + task.updated = time.time() if new_status == FAILED: @@ -666,6 +669,9 @@ def update_metrics_task_started(self, task): def update_metrics_task_failed(self, task): self._metrics_collector.handle_task_failed(task) + def update_metrics_task_done(self, task): + self._metrics_collector.handle_task_done(task) + class Scheduler(object): From 096039607e7bc0fa6fd8ca7a1a63a376e481204b Mon Sep 17 00:00:00 2001 From: Charles-Andre Bouffard Date: Mon, 6 Nov 2017 15:03:03 -0500 Subject: [PATCH 08/27] Add execution time of a task + bug fixes --- luigi/contrib/datadog.py | 2 ++ luigi/metrics.py | 2 +- luigi/scheduler.py | 8 ++++++-- 3 files changed, 9 insertions(+), 3 deletions(-) diff --git a/luigi/contrib/datadog.py b/luigi/contrib/datadog.py index bce5191eaa..9a861ff7d7 100644 --- a/luigi/contrib/datadog.py +++ b/luigi/contrib/datadog.py @@ -65,6 +65,8 @@ def handle_task_done(self, task): "task_name:{name}".format(name=task.family)] statsd.increment('thwomper.task.done') + time_elapse = task.updated - task.time_running + statsd.gauge('thwomper.{name}.execution_time'.format(name=task.family), time_elapse) self._add_event(title=title, text=text, tags=tags, alert_type='info', priority='low') diff --git a/luigi/metrics.py b/luigi/metrics.py index 85b183ebb3..ea1ca96253 100644 --- a/luigi/metrics.py +++ b/luigi/metrics.py @@ -12,7 +12,7 @@ def handle_task_started(self, task): def handle_task_failed(self, task): pass - def handle_task_disabled(self, task): + def handle_task_disabled(self, task, config): pass def handle_task_done(self, task): diff --git a/luigi/scheduler.py b/luigi/scheduler.py index 540c31d08c..8d1ec7b0c8 100644 --- a/luigi/scheduler.py +++ b/luigi/scheduler.py @@ -576,10 +576,11 @@ def set_status(self, task, new_status, config=None): self._status_tasks[task.status].pop(task.id) self._status_tasks[new_status][task.id] = task task.status = new_status + task.updated = time.time() + if new_status == DONE: - self.update_metrics_task_done(task, config) + self.update_metrics_task_done(task) - task.updated = time.time() if new_status == FAILED: task.retry = time.time() + config.retry_delay @@ -666,6 +667,9 @@ def disable_workers(self, worker_ids): def update_metrics_task_started(self, task): self._metrics_collector.handle_task_started(task) + def update_metrics_task_disabled(self, task, config): + self._metrics_collector.handle_task_started(task, config) + def update_metrics_task_failed(self, task): self._metrics_collector.handle_task_failed(task) From b2ab7600b1e2e78e35602f34d4e0f6b4a7d0cb1d Mon Sep 17 00:00:00 2001 From: Charles-Andre Bouffard Date: Mon, 6 Nov 2017 15:20:02 -0500 Subject: [PATCH 09/27] Add namespaced metrics Some users may want to namespace their metrics differently. We're allowing that to be configured in the Luigi configuration file. --- luigi/contrib/datadog.py | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/luigi/contrib/datadog.py b/luigi/contrib/datadog.py index 9a861ff7d7..0ebfd6a7a1 100644 --- a/luigi/contrib/datadog.py +++ b/luigi/contrib/datadog.py @@ -9,6 +9,7 @@ class datadog(Config): api_key = parameter.Parameter(default='dummy_api_key') app_key = parameter.Parameter(default='dummy_app_key') default_event_tags = parameter.Parameter(default=None) + metric_namespace = parameter.Parameter(default='luigi') class DataDogMetricsCollector(MetricsCollector): @@ -23,7 +24,8 @@ def handle_task_started(self, task): tags = ["task_state:STARTED", "task_name:{name}".format(name=task.family)] - statsd.increment('thwomper.task.started') + import pdb; pdb.set_trace() # XXX - debug + statsd.increment('{namespace}.task.started'.format(namespace=self._config.metric_namespace)) self._add_event(title=title, text=text, tags=tags, alert_type='info', priority='low') @@ -34,7 +36,7 @@ def handle_task_failed(self, task): tags = ["task_state:FAILED", "task_name:{name}".format(name=task.family)] - statsd.increment('thwomper.task.failed') + statsd.increment('{namespace}.task.failed'.format(namespace=self._config.metric_namespace)) self._add_event(title=title, text=text, tags=tags, alert_type='error', priority='normal') @@ -53,7 +55,7 @@ def handle_task_disabled(self, task, config): tags = ["task_state:DISABLED", "task_name:{name}".format(name=task.family)] - statsd.increment('thwomper.task.disabled') + statsd.increment('{namespace}.task.disabled'.format(namespace=self._config.metric_namespace)) self._add_event(title=title, text=text, tags=tags, alert_type='error', priority='normal') @@ -64,9 +66,12 @@ def handle_task_done(self, task): tags = ["task_state:DONE", "task_name:{name}".format(name=task.family)] - statsd.increment('thwomper.task.done') time_elapse = task.updated - task.time_running - statsd.gauge('thwomper.{name}.execution_time'.format(name=task.family), time_elapse) + + statsd.increment('{namespace}.task.done'.format(namespace=self._config.metric_namespace)) + statsd.gauge('{namespace}.{name}.execution_time'.format( + namespace=self._config.metric_namespace, + name=task.family), time_elapse) self._add_event(title=title, text=text, tags=tags, alert_type='info', priority='low') From 1947aab6f9fd1e0d30ff8df4f2cfba31ae7f9a81 Mon Sep 17 00:00:00 2001 From: Charles-Andre Bouffard Date: Tue, 21 Nov 2017 10:00:30 -0500 Subject: [PATCH 10/27] Add missing RPC method for datadog --- luigi/scheduler.py | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/luigi/scheduler.py b/luigi/scheduler.py index 8d1ec7b0c8..f36bc08bf7 100644 --- a/luigi/scheduler.py +++ b/luigi/scheduler.py @@ -1609,3 +1609,15 @@ def task_history(self): @rpc_method() def update_metrics_task_started(self, task): self._state.update_metrics_task_started(task) + + @rpc_method() + def update_metrics_task_disabled(self, task): + self._state.update_metrics_task_disabled(task) + + @rpc_method() + def update_metrics_task_failed(self, task): + self._state.update_metrics_task_failed(task) + + @rpc_method() + def update_metrics_task_done(self, task): + self._state.update_metrics_task_done(task) From d4dc6da4a0c57a04b3252da2655437a1c4db5aac Mon Sep 17 00:00:00 2001 From: Charles-Andre Bouffard Date: Tue, 21 Nov 2017 10:24:47 -0500 Subject: [PATCH 11/27] Add support for params Task parameters will now be displayed as tags in DataDog. --- luigi/contrib/datadog.py | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/luigi/contrib/datadog.py b/luigi/contrib/datadog.py index 0ebfd6a7a1..9b37cb182f 100644 --- a/luigi/contrib/datadog.py +++ b/luigi/contrib/datadog.py @@ -23,8 +23,9 @@ def handle_task_started(self, task): text = "A task has been started in the pipeline named: {name}".format(name=task.family) tags = ["task_state:STARTED", "task_name:{name}".format(name=task.family)] + tags = tags + self._format_task_params_to_tags(task) + - import pdb; pdb.set_trace() # XXX - debug statsd.increment('{namespace}.task.started'.format(namespace=self._config.metric_namespace)) self._add_event(title=title, text=text, tags=tags, alert_type='info', @@ -35,6 +36,7 @@ def handle_task_failed(self, task): text = "A task has failed in the pipeline named: {name}".format(name=task.family) tags = ["task_state:FAILED", "task_name:{name}".format(name=task.family)] + tags = tags + self._format_task_params_to_tags(task) statsd.increment('{namespace}.task.failed'.format(namespace=self._config.metric_namespace)) self._add_event(title=title, text=text, @@ -54,6 +56,7 @@ def handle_task_disabled(self, task, config): tags = ["task_state:DISABLED", "task_name:{name}".format(name=task.family)] + tags = tags + self._format_task_params_to_tags(task) statsd.increment('{namespace}.task.disabled'.format(namespace=self._config.metric_namespace)) self._add_event(title=title, text=text, @@ -65,6 +68,7 @@ def handle_task_done(self, task): text = "A task has completed in the pipeline named: {name}".format(name=task.family) tags = ["task_state:DONE", "task_name:{name}".format(name=task.family)] + tags = tags + self._format_task_params_to_tags(task) time_elapse = task.updated - task.time_running @@ -86,6 +90,13 @@ def _add_event(self, tags=all_tags, alert_type=alert_type, priority=priority) + def _format_task_params_to_tags(self, task): + params = [] + for key, value in task.params.items(): + params.append("{key}:{value}".format(key=key, value=value)) + + return params + def default_event_tags(self): if not self._config.default_event_tags: return [] From 6e5e5a4602179f82b5b819bcb452d6454ae35aa3 Mon Sep 17 00:00:00 2001 From: Charles-Andre Bouffard Date: Wed, 13 Dec 2017 15:50:47 -0500 Subject: [PATCH 12/27] Fix duplicative event logging for done tasks Fix issue were already completed task would trigger another event. We're hooking ourselves at a point where if the task was completed in previous time, it would still call our DataDog tracking event. When that case happen, we don't want to log an event again because we've already logged it. --- luigi/contrib/datadog.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/luigi/contrib/datadog.py b/luigi/contrib/datadog.py index 9b37cb182f..26b9bd3392 100644 --- a/luigi/contrib/datadog.py +++ b/luigi/contrib/datadog.py @@ -70,6 +70,10 @@ def handle_task_done(self, task): "task_name:{name}".format(name=task.family)] tags = tags + self._format_task_params_to_tags(task) + # The task is already done -- Let's not re-create an event + if task.time_running is None: + return + time_elapse = task.updated - task.time_running statsd.increment('{namespace}.task.done'.format(namespace=self._config.metric_namespace)) From a78e1c9ff700a4209a2915a8e341663eee2cf16a Mon Sep 17 00:00:00 2001 From: Charles-Andre Bouffard Date: Thu, 4 Jan 2018 15:31:23 -0500 Subject: [PATCH 13/27] Set the scheduler to be optional for metrics logging --- luigi/metrics.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/luigi/metrics.py b/luigi/metrics.py index ea1ca96253..ea5cb00fa3 100644 --- a/luigi/metrics.py +++ b/luigi/metrics.py @@ -3,7 +3,7 @@ class MetricsCollector(object): Dummy MetricsCollecter base class that can be replace by tool specific implementation """ - def __init__(self, scheduler): + def __init__(self, scheduler=None): self._scheduler = scheduler def handle_task_started(self, task): From 625c9bcc4f28a107954f65bf0abc3f50f4842f8d Mon Sep 17 00:00:00 2001 From: Charles-Andre Bouffard Date: Thu, 4 Jan 2018 15:32:32 -0500 Subject: [PATCH 14/27] Fix wrongly named event method name We were calling the wrong event for disabled tasks event. This would cause problems since the method signature is different and thus crashing the task. --- luigi/scheduler.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/luigi/scheduler.py b/luigi/scheduler.py index f36bc08bf7..a491a8b891 100644 --- a/luigi/scheduler.py +++ b/luigi/scheduler.py @@ -668,7 +668,7 @@ def update_metrics_task_started(self, task): self._metrics_collector.handle_task_started(task) def update_metrics_task_disabled(self, task, config): - self._metrics_collector.handle_task_started(task, config) + self._metrics_collector.handle_task_disabled(task, config) def update_metrics_task_failed(self, task): self._metrics_collector.handle_task_failed(task) From a633d0dcb96637d1eae581334f6c48cca7d69805 Mon Sep 17 00:00:00 2001 From: Charles-Andre Bouffard Date: Thu, 4 Jan 2018 15:38:00 -0500 Subject: [PATCH 15/27] Fix minor linter issues Double empty lines --- luigi/contrib/datadog.py | 1 - luigi/scheduler.py | 1 - 2 files changed, 2 deletions(-) diff --git a/luigi/contrib/datadog.py b/luigi/contrib/datadog.py index 26b9bd3392..04b1a04358 100644 --- a/luigi/contrib/datadog.py +++ b/luigi/contrib/datadog.py @@ -25,7 +25,6 @@ def handle_task_started(self, task): "task_name:{name}".format(name=task.family)] tags = tags + self._format_task_params_to_tags(task) - statsd.increment('{namespace}.task.started'.format(namespace=self._config.metric_namespace)) self._add_event(title=title, text=text, tags=tags, alert_type='info', diff --git a/luigi/scheduler.py b/luigi/scheduler.py index a491a8b891..846161f542 100644 --- a/luigi/scheduler.py +++ b/luigi/scheduler.py @@ -581,7 +581,6 @@ def set_status(self, task, new_status, config=None): if new_status == DONE: self.update_metrics_task_done(task) - if new_status == FAILED: task.retry = time.time() + config.retry_delay if remove_on_failure: From 3e5ccf59385ffd03083b72439477b5f4fca67cfd Mon Sep 17 00:00:00 2001 From: Charles-Andre Bouffard Date: Fri, 5 Jan 2018 12:26:13 -0500 Subject: [PATCH 16/27] Change metric name for execution time The name of the task shouldn't have a dimension into it, it should be put into the tags section. --- luigi/contrib/datadog.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/luigi/contrib/datadog.py b/luigi/contrib/datadog.py index 04b1a04358..8d465e0a2f 100644 --- a/luigi/contrib/datadog.py +++ b/luigi/contrib/datadog.py @@ -76,7 +76,7 @@ def handle_task_done(self, task): time_elapse = task.updated - task.time_running statsd.increment('{namespace}.task.done'.format(namespace=self._config.metric_namespace)) - statsd.gauge('{namespace}.{name}.execution_time'.format( + statsd.gauge('{namespace}.task.execution_time'.format( namespace=self._config.metric_namespace, name=task.family), time_elapse) self._add_event(title=title, text=text, From 0e261490c78aa768863b39c68f417e937d08f2c6 Mon Sep 17 00:00:00 2001 From: Charles-Andre Bouffard Date: Wed, 10 Jan 2018 15:47:57 -0500 Subject: [PATCH 17/27] Refactor the datadog metric class We can now improve the implementation quite a bit by being aware that metrics can also have tags. [ch8202] [ch8215] --- luigi/contrib/datadog.py | 77 +++++++++++++++++++--------------------- 1 file changed, 36 insertions(+), 41 deletions(-) diff --git a/luigi/contrib/datadog.py b/luigi/contrib/datadog.py index 8d465e0a2f..d96bcac3a5 100644 --- a/luigi/contrib/datadog.py +++ b/luigi/contrib/datadog.py @@ -21,26 +21,22 @@ def __init__(self, *args, **kwargs): def handle_task_started(self, task): title = "Luigi: A task has been started!" text = "A task has been started in the pipeline named: {name}".format(name=task.family) - tags = ["task_state:STARTED", - "task_name:{name}".format(name=task.family)] - tags = tags + self._format_task_params_to_tags(task) + tags = ["task_name:{name}".format(name=task.family)] + self._format_task_params_to_tags(task) - statsd.increment('{namespace}.task.started'.format(namespace=self._config.metric_namespace)) - self._add_event(title=title, text=text, - tags=tags, alert_type='info', - priority='low') + self.send_increment('{namespace}.task.started'.format(namespace=self._config.metric_namespace), tags=tags) + + event_tags = tags + ["task_state:STARTED"] + self.send_event(title=title, text=text, tags=event_tags, alert_type='info', priority='low') def handle_task_failed(self, task): title = "Luigi: A task has failed!" text = "A task has failed in the pipeline named: {name}".format(name=task.family) - tags = ["task_state:FAILED", - "task_name:{name}".format(name=task.family)] - tags = tags + self._format_task_params_to_tags(task) + tags = ["task_name:{name}".format(name=task.family)] + self._format_task_params_to_tags(task) + + self.send_increment('{namespace}.task.failed'.format(namespace=self._config.metric_namespace), tags=tags) - statsd.increment('{namespace}.task.failed'.format(namespace=self._config.metric_namespace)) - self._add_event(title=title, text=text, - tags=tags, alert_type='error', - priority='normal') + event_tags = tags + ["task_state:FAILED"] + self.send_event(title=title, text=text, tags=event_tags, alert_type='error', priority='normal') def handle_task_disabled(self, task, config): title = "Luigi: A task has been disabled!" @@ -52,46 +48,45 @@ def handle_task_disabled(self, task, config): failures=task.retry_policy.retry_count, window=config.disable_window ) + tags = ["task_name:{name}".format(name=task.family)] + self._format_task_params_to_tags(task) - tags = ["task_state:DISABLED", - "task_name:{name}".format(name=task.family)] - tags = tags + self._format_task_params_to_tags(task) + self.send_increment('{namespace}.task.disabled'.format(namespace=self._config.metric_namespace), tags=tags) - statsd.increment('{namespace}.task.disabled'.format(namespace=self._config.metric_namespace)) - self._add_event(title=title, text=text, - tags=tags, alert_type='error', - priority='normal') + event_tags = tags + ["task_state:DISABLED"] + self.send_event(title=title, text=text, tags=event_tags, alert_type='error', priority='normal') def handle_task_done(self, task): - title = "Luigi: A task has been completed!" - text = "A task has completed in the pipeline named: {name}".format(name=task.family) - tags = ["task_state:DONE", - "task_name:{name}".format(name=task.family)] - tags = tags + self._format_task_params_to_tags(task) - # The task is already done -- Let's not re-create an event if task.time_running is None: return + title = "Luigi: A task has been completed!" + text = "A task has completed in the pipeline named: {name}".format(name=task.family) + tags = ["task_name:{name}".format(name=task.family)] + self._format_task_params_to_tags(task) + time_elapse = task.updated - task.time_running - statsd.increment('{namespace}.task.done'.format(namespace=self._config.metric_namespace)) - statsd.gauge('{namespace}.task.execution_time'.format( - namespace=self._config.metric_namespace, - name=task.family), time_elapse) - self._add_event(title=title, text=text, - tags=tags, alert_type='info', - priority='low') + self.send_increment('{namespace}.task.done'.format(namespace=self._config.metric_namespace), tags=tags) + self.send_gauge('{namespace}.task.execution_time'.format(namespace=self._config.metric_namespace), time_elapse, tags=tags) + + event_tags = tags + ["task_state:DONE"] + self.send_event(title=title, text=text, tags=event_tags, alert_type='info', priority='low') - def _add_event(self, - title=None, text=None, - tags=[], alert_type='info', - priority='normal'): + def send_event(self, title=None, text=None, tags=[], alert_type='info', priority='normal'): + basic_tags = + self.default_event_tags() + all_tags = tags + basic_tags + api.Event.create(title=title, text=text, tags=all_tags, alert_type=alert_type, priority=priority) + + def send_gauge(self, metric_name, value, tags=[]): all_tags = tags + self.default_event_tags() - api.Event.create(title=title, text=text, - tags=all_tags, alert_type=alert_type, - priority=priority) + + statsd.gauge(metric_name, value, tags=all_tags) + + def send_increment(self, metric_name, value=1, tags=[]): + all_tags = tags + self.default_event_tags() + + statsd.increment(metric_name, value, tags=all_tags) def _format_task_params_to_tags(self, task): params = [] From e3cc88c73feb647cdb9dffb6383623d239870d6f Mon Sep 17 00:00:00 2001 From: Charles-Andre Bouffard Date: Wed, 10 Jan 2018 16:03:02 -0500 Subject: [PATCH 18/27] Refactor the namespace of the metric This removes code duplication and reads much better. --- luigi/contrib/datadog.py | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/luigi/contrib/datadog.py b/luigi/contrib/datadog.py index d96bcac3a5..43e541ee21 100644 --- a/luigi/contrib/datadog.py +++ b/luigi/contrib/datadog.py @@ -23,7 +23,7 @@ def handle_task_started(self, task): text = "A task has been started in the pipeline named: {name}".format(name=task.family) tags = ["task_name:{name}".format(name=task.family)] + self._format_task_params_to_tags(task) - self.send_increment('{namespace}.task.started'.format(namespace=self._config.metric_namespace), tags=tags) + self.send_increment('task.started', tags=tags) event_tags = tags + ["task_state:STARTED"] self.send_event(title=title, text=text, tags=event_tags, alert_type='info', priority='low') @@ -33,7 +33,7 @@ def handle_task_failed(self, task): text = "A task has failed in the pipeline named: {name}".format(name=task.family) tags = ["task_name:{name}".format(name=task.family)] + self._format_task_params_to_tags(task) - self.send_increment('{namespace}.task.failed'.format(namespace=self._config.metric_namespace), tags=tags) + self.send_increment('task.failed', tags=tags) event_tags = tags + ["task_state:FAILED"] self.send_event(title=title, text=text, tags=event_tags, alert_type='error', priority='normal') @@ -50,7 +50,7 @@ def handle_task_disabled(self, task, config): ) tags = ["task_name:{name}".format(name=task.family)] + self._format_task_params_to_tags(task) - self.send_increment('{namespace}.task.disabled'.format(namespace=self._config.metric_namespace), tags=tags) + self.send_increment('task.disabled', tags=tags) event_tags = tags + ["task_state:DISABLED"] self.send_event(title=title, text=text, tags=event_tags, alert_type='error', priority='normal') @@ -66,8 +66,8 @@ def handle_task_done(self, task): time_elapse = task.updated - task.time_running - self.send_increment('{namespace}.task.done'.format(namespace=self._config.metric_namespace), tags=tags) - self.send_gauge('{namespace}.task.execution_time'.format(namespace=self._config.metric_namespace), time_elapse, tags=tags) + self.send_increment('task.done', tags=tags) + self.send_gauge('task.execution_time', time_elapse, tags=tags) event_tags = tags + ["task_state:DONE"] self.send_event(title=title, text=text, tags=event_tags, alert_type='info', priority='low') @@ -81,12 +81,16 @@ def send_event(self, title=None, text=None, tags=[], alert_type='info', priority def send_gauge(self, metric_name, value, tags=[]): all_tags = tags + self.default_event_tags() - statsd.gauge(metric_name, value, tags=all_tags) + namespaced_metric = "{namespace}.{metric_name}".format(namespace=self._config.metric_namespace, + metric_name=metric_name) + statsd.gauge(namespaced_metric, value, tags=all_tags) def send_increment(self, metric_name, value=1, tags=[]): all_tags = tags + self.default_event_tags() - statsd.increment(metric_name, value, tags=all_tags) + namespaced_metric = "{namespace}.{metric_name}".format(namespace=self._config.metric_namespace, + metric_name=metric_name) + statsd.increment(namespaced_metric, value, tags=all_tags) def _format_task_params_to_tags(self, task): params = [] From a69d82783239535d1d20448800942d30d68d25b9 Mon Sep 17 00:00:00 2001 From: Charles-Andre Bouffard Date: Wed, 10 Jan 2018 16:04:52 -0500 Subject: [PATCH 19/27] Fix invalid code path --- luigi/contrib/datadog.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/luigi/contrib/datadog.py b/luigi/contrib/datadog.py index 43e541ee21..bbd010b813 100644 --- a/luigi/contrib/datadog.py +++ b/luigi/contrib/datadog.py @@ -73,8 +73,7 @@ def handle_task_done(self, task): self.send_event(title=title, text=text, tags=event_tags, alert_type='info', priority='low') def send_event(self, title=None, text=None, tags=[], alert_type='info', priority='normal'): - basic_tags = + self.default_event_tags() - all_tags = tags + basic_tags + all_tags = tags + self.default_event_tags() api.Event.create(title=title, text=text, tags=all_tags, alert_type=alert_type, priority=priority) From d7bf27498b9d76ceb2cb7461f343a12fa20b5390 Mon Sep 17 00:00:00 2001 From: Charles-Andre Bouffard Date: Tue, 23 Jan 2018 13:13:14 -0500 Subject: [PATCH 20/27] Add support for default environment flag Before adding this feature, we would have the metric namespaced with different values. For instance we would have `luigi_production` for production metrics and `luigi_staging` for staging metrics. This caused a whole lot of problems. Having this new feature will allow us to set up monitoring on multiple different environment in parallel thus effectively reducing the number of metrics we have to manage. --- luigi/contrib/datadog.py | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/luigi/contrib/datadog.py b/luigi/contrib/datadog.py index bbd010b813..aa74bc8890 100644 --- a/luigi/contrib/datadog.py +++ b/luigi/contrib/datadog.py @@ -9,6 +9,7 @@ class datadog(Config): api_key = parameter.Parameter(default='dummy_api_key') app_key = parameter.Parameter(default='dummy_app_key') default_event_tags = parameter.Parameter(default=None) + environment = parameter.Parameter(default='development', description='Environment of the pipeline') metric_namespace = parameter.Parameter(default='luigi') @@ -99,7 +100,13 @@ def _format_task_params_to_tags(self, task): return params def default_event_tags(self): - if not self._config.default_event_tags: - return [] + default_tags = [] - return str.split(self._config.default_event_tags, ',') + if self._config.default_event_tags: + default_tags = default_tags + str.split(self._config.default_event_tags, ',') + + if self._config.environment: + env_tag = "env={environment}".format(environment=self._config.environment) + default_tags.append(env_tag) + + return default_tags From e1c4a766220e2bc33dd69398e47e3d4066c60bc9 Mon Sep 17 00:00:00 2001 From: Charles-Andre Bouffard Date: Fri, 26 Jan 2018 11:14:22 -0500 Subject: [PATCH 21/27] Remove unecessary IF clause for the environment ENV variable This is not necessary as we have a default value if set to nothing. --- luigi/contrib/datadog.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/luigi/contrib/datadog.py b/luigi/contrib/datadog.py index aa74bc8890..3fa42fde0a 100644 --- a/luigi/contrib/datadog.py +++ b/luigi/contrib/datadog.py @@ -102,11 +102,10 @@ def _format_task_params_to_tags(self, task): def default_event_tags(self): default_tags = [] + env_tag = "env={environment}".format(environment=self._config.environment) + default_tags.append(env_tag) + if self._config.default_event_tags: default_tags = default_tags + str.split(self._config.default_event_tags, ',') - if self._config.environment: - env_tag = "env={environment}".format(environment=self._config.environment) - default_tags.append(env_tag) - return default_tags From c895dcb64d35829087cc7580ea9fc9c6a663bec9 Mon Sep 17 00:00:00 2001 From: Charles-Andre Bouffard Date: Mon, 29 Jan 2018 11:59:39 -0500 Subject: [PATCH 22/27] Fix the environment tag for DataDog events DataDog requires the use of `:` instead of `=`. --- luigi/contrib/datadog.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/luigi/contrib/datadog.py b/luigi/contrib/datadog.py index 3fa42fde0a..6690dabec2 100644 --- a/luigi/contrib/datadog.py +++ b/luigi/contrib/datadog.py @@ -102,7 +102,7 @@ def _format_task_params_to_tags(self, task): def default_event_tags(self): default_tags = [] - env_tag = "env={environment}".format(environment=self._config.environment) + env_tag = "env:{environment}".format(environment=self._config.environment) default_tags.append(env_tag) if self._config.default_event_tags: From 420c438ed9a59af07435cd625357a9380be11ac6 Mon Sep 17 00:00:00 2001 From: Charles-Andre Bouffard Date: Mon, 29 Jan 2018 12:02:39 -0500 Subject: [PATCH 23/27] Luigi throws warning if param is None Instead of using the None value, we use an empty try. The check that we do before using the value will also return False if we detect an empty string. --- luigi/contrib/datadog.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/luigi/contrib/datadog.py b/luigi/contrib/datadog.py index 6690dabec2..5c98a1803d 100644 --- a/luigi/contrib/datadog.py +++ b/luigi/contrib/datadog.py @@ -8,7 +8,7 @@ class datadog(Config): api_key = parameter.Parameter(default='dummy_api_key') app_key = parameter.Parameter(default='dummy_app_key') - default_event_tags = parameter.Parameter(default=None) + default_event_tags = parameter.Parameter(default='') environment = parameter.Parameter(default='development', description='Environment of the pipeline') metric_namespace = parameter.Parameter(default='luigi') From 6d2b26bf74fc98dc8fd3b027f99ee1c8021516d5 Mon Sep 17 00:00:00 2001 From: Charles-Andre Bouffard Date: Mon, 29 Jan 2018 17:55:20 -0500 Subject: [PATCH 24/27] Add the ability to configure statsd information in Luigi Unfortunately, the datadog python library doesn't follow their own convention of being configurable from a `datadog.conf` file. We have to manually set those objects as configuration in Luigi and set the relevant properties when initializing the `statsd` specialized class of DataDog. --- luigi/contrib/datadog.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/luigi/contrib/datadog.py b/luigi/contrib/datadog.py index 5c98a1803d..18c66ffa88 100644 --- a/luigi/contrib/datadog.py +++ b/luigi/contrib/datadog.py @@ -11,13 +11,19 @@ class datadog(Config): default_event_tags = parameter.Parameter(default='') environment = parameter.Parameter(default='development', description='Environment of the pipeline') metric_namespace = parameter.Parameter(default='luigi') + statsd_host = parameter.Parameter(default='localhost') + statsd_port = parameter.Parameter(default=8125) class DataDogMetricsCollector(MetricsCollector): def __init__(self, *args, **kwargs): super(DataDogMetricsCollector, self).__init__(*args, **kwargs) self._config = datadog(**kwargs) - initialize(api_key=self._config.api_key, app_key=self._config.app_key) + + initialize(api_key=self._config.api_key, + app_key=self._config.app_key, + statsd_host=self._config.statsd_host, + statsd_port=self._config.statsd_port) def handle_task_started(self, task): title = "Luigi: A task has been started!" From 9bd57f31c2db74268d82d2fcd53654754ff1f122 Mon Sep 17 00:00:00 2001 From: Charles-Andre Bouffard Date: Mon, 5 Feb 2018 11:35:54 -0500 Subject: [PATCH 25/27] Change the behavior of the default tags Previously, we had name that variable `default_event_tags` but it was confusing because we were sending those tags for both events and metrics. Renaming that variables + adding a sane default value will clarify the goal of that variable. [ch9820] --- luigi/contrib/datadog.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/luigi/contrib/datadog.py b/luigi/contrib/datadog.py index 18c66ffa88..ae83e6b52f 100644 --- a/luigi/contrib/datadog.py +++ b/luigi/contrib/datadog.py @@ -8,7 +8,7 @@ class datadog(Config): api_key = parameter.Parameter(default='dummy_api_key') app_key = parameter.Parameter(default='dummy_app_key') - default_event_tags = parameter.Parameter(default='') + default_tags = parameter.Parameter(default='application:luigi') environment = parameter.Parameter(default='development', description='Environment of the pipeline') metric_namespace = parameter.Parameter(default='luigi') statsd_host = parameter.Parameter(default='localhost') @@ -80,19 +80,19 @@ def handle_task_done(self, task): self.send_event(title=title, text=text, tags=event_tags, alert_type='info', priority='low') def send_event(self, title=None, text=None, tags=[], alert_type='info', priority='normal'): - all_tags = tags + self.default_event_tags() + all_tags = tags + self.default_tags() api.Event.create(title=title, text=text, tags=all_tags, alert_type=alert_type, priority=priority) def send_gauge(self, metric_name, value, tags=[]): - all_tags = tags + self.default_event_tags() + all_tags = tags + self.default_tags() namespaced_metric = "{namespace}.{metric_name}".format(namespace=self._config.metric_namespace, metric_name=metric_name) statsd.gauge(namespaced_metric, value, tags=all_tags) def send_increment(self, metric_name, value=1, tags=[]): - all_tags = tags + self.default_event_tags() + all_tags = tags + self.default_tags() namespaced_metric = "{namespace}.{metric_name}".format(namespace=self._config.metric_namespace, metric_name=metric_name) @@ -105,13 +105,13 @@ def _format_task_params_to_tags(self, task): return params - def default_event_tags(self): + def default_tags(self): default_tags = [] env_tag = "env:{environment}".format(environment=self._config.environment) default_tags.append(env_tag) - if self._config.default_event_tags: - default_tags = default_tags + str.split(self._config.default_event_tags, ',') + if self._config.default_tags: + default_tags = default_tags + str.split(self._config.default_tags, ',') return default_tags From e2653e546ec7f50e868f8c6ca5d0266b72199881 Mon Sep 17 00:00:00 2001 From: Charles-Andre Bouffard Date: Mon, 5 Feb 2018 11:26:58 -0500 Subject: [PATCH 26/27] Change the DD tag from env to environment In our current implementation of this pipeline, we are already sending the `environment` parameter in all of our task. In our DD contrib, we log all the parameters that are passed to a task as a tag. That said, the environment tag is already set in our case. If we were to have `env` then we would have both tags sent to DD for every metrics / events which duplicated the numbers of that that we trully want. DD is clever enough that if a tag already exists, then it won't duplicate it, so by default we want to log the `environment` so that if it doesn't exists, it will create it, else it will use the one that we pass to the task itself. [ch9704] --- luigi/contrib/datadog.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/luigi/contrib/datadog.py b/luigi/contrib/datadog.py index ae83e6b52f..70b3b96575 100644 --- a/luigi/contrib/datadog.py +++ b/luigi/contrib/datadog.py @@ -108,7 +108,7 @@ def _format_task_params_to_tags(self, task): def default_tags(self): default_tags = [] - env_tag = "env:{environment}".format(environment=self._config.environment) + env_tag = "environment:{environment}".format(environment=self._config.environment) default_tags.append(env_tag) if self._config.default_tags: From bb66ad3bc2772df89ce0be1a50bae3c584732164 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Charles-Andr=C3=A9=20Bouffard?= Date: Thu, 19 Apr 2018 13:30:21 -0400 Subject: [PATCH 27/27] Change statsd_port parameter type (#11) This would yield an annoying warning telling us that we're passing an INT when it was expecting a STRING. This wasn't affecting anything, but today is the day that I'm removing this warning. --- luigi/contrib/datadog.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/luigi/contrib/datadog.py b/luigi/contrib/datadog.py index 70b3b96575..69494d8772 100644 --- a/luigi/contrib/datadog.py +++ b/luigi/contrib/datadog.py @@ -12,7 +12,7 @@ class datadog(Config): environment = parameter.Parameter(default='development', description='Environment of the pipeline') metric_namespace = parameter.Parameter(default='luigi') statsd_host = parameter.Parameter(default='localhost') - statsd_port = parameter.Parameter(default=8125) + statsd_port = parameter.IntParameter(default=8125) class DataDogMetricsCollector(MetricsCollector):