From f683f7f2227d57e4fb2adcbb47fc1032295bdaa7 Mon Sep 17 00:00:00 2001 From: Eduardo Bizarro Date: Sat, 9 Mar 2019 23:55:52 -0300 Subject: [PATCH 1/2] rename s3_conn_id to aws_conn_id --- __init__.py | 14 +- ...lytics_account_summaries_to_s3_operator.py | 52 +++---- ...ogle_analytics_reporting_to_s3_operator.py | 131 +++++++++--------- 3 files changed, 106 insertions(+), 91 deletions(-) diff --git a/__init__.py b/__init__.py index 75c617e..433d0e7 100644 --- a/__init__.py +++ b/__init__.py @@ -1,14 +1,20 @@ from airflow.plugins_manager import AirflowPlugin from google_analytics_plugin.hooks.google_analytics_hook import GoogleAnalyticsHook -from google_analytics_plugin.operators.google_analytics_reporting_to_s3_operator import GoogleAnalyticsReportingToS3Operator -from google_analytics_plugin.operators.google_analytics_account_summaries_to_s3_operator import GoogleAnalyticsAccountSummariesToS3Operator +from google_analytics_plugin.operators.google_analytics_reporting_to_s3_operator import ( + GoogleAnalyticsReportingToS3Operator, +) +from google_analytics_plugin.operators.google_analytics_account_summaries_to_s3_operator import ( + GoogleAnalyticsAccountSummariesToS3Operator, +) class GoogleAnalyticsPlugin(AirflowPlugin): name = "google_analytics_plugin" hooks = [GoogleAnalyticsHook] - operators = [GoogleAnalyticsReportingToS3Operator, - GoogleAnalyticsAccountSummariesToS3Operator] + operators = [ + GoogleAnalyticsReportingToS3Operator, + GoogleAnalyticsAccountSummariesToS3Operator, + ] executors = [] macros = [] admin_views = [] diff --git a/operators/google_analytics_account_summaries_to_s3_operator.py b/operators/google_analytics_account_summaries_to_s3_operator.py index a642ea3..ee543ee 100644 --- a/operators/google_analytics_account_summaries_to_s3_operator.py +++ b/operators/google_analytics_account_summaries_to_s3_operator.py @@ -9,21 +9,23 @@ class GoogleAnalyticsAccountSummariesToS3Operator(BaseOperator): - template_fields = ('s3_key',) - - def __init__(self, - google_analytics_conn_id, - s3_conn_id, - s3_bucket, - s3_key, - brand, - space, - *args, - **kwargs): + template_fields = ("s3_key",) + + def __init__( + self, + google_analytics_conn_id, + aws_conn_id, + s3_bucket, + s3_key, + brand, + space, + *args, + **kwargs + ): super().__init__(*args, **kwargs) self.google_analytics_conn_id = google_analytics_conn_id - self.s3_conn_id = s3_conn_id + self.aws_conn_id = aws_conn_id self.s3_bucket = s3_bucket self.s3_key = s3_key self.brand = brand @@ -31,32 +33,32 @@ def __init__(self, def execute(self, context): ga_conn = GoogleAnalyticsHook(self.google_analytics_conn_id) - s3_conn = S3Hook(self.s3_conn_id) + s3_conn = S3Hook(self.aws_conn_id) account_summaries = ga_conn.get_account_summaries() - file_name = '/tmp/{key}.jsonl'.format(key=self.s3_key) - with open(file_name, 'w') as ga_file: + file_name = "/tmp/{key}.jsonl".format(key=self.s3_key) + with open(file_name, "w") as ga_file: data = [] - for item in account_summaries.get('items', []): + for item in account_summaries.get("items", []): root_data_obj = { - 'account_id': item['id'], - 'brand': self.brand, - 'space': self.space + "account_id": item["id"], + "brand": self.brand, + "space": self.space, } - for web_property in item.get('webProperties', []): + for web_property in item.get("webProperties", []): data_obj = {} data_obj.update(root_data_obj) - data_obj['property_id'] = web_property['id'] + data_obj["property_id"] = web_property["id"] - for profile in web_property.get('profiles', []): - data_obj['profile_id'] = profile['id'] - data_obj['profile_name'] = profile['name'] + for profile in web_property.get("profiles", []): + data_obj["profile_id"] = profile["id"] + data_obj["profile_name"] = profile["name"] data.append(data_obj) - json_data = '\n'.join([json.dumps(d) for d in data]) + json_data = "\n".join([json.dumps(d) for d in data]) ga_file.write(json_data) s3_conn.load_file(file_name, self.s3_key, self.s3_bucket, True) diff --git a/operators/google_analytics_reporting_to_s3_operator.py b/operators/google_analytics_reporting_to_s3_operator.py index 0740066..76487a5 100644 --- a/operators/google_analytics_reporting_to_s3_operator.py +++ b/operators/google_analytics_reporting_to_s3_operator.py @@ -28,8 +28,8 @@ class GoogleAnalyticsReportingToS3Operator(BaseOperator): but in either case it will be passed to GA as '%Y-%m-%d'. :type until: string - :param s3_conn_id: The s3 connection id. - :type s3_conn_id: string + :param aws_conn_id: The s3 connection id. + :type aws_conn_id: string :param s3_bucket: The S3 bucket to be used to store the Google Analytics data. :type s3_bucket: string @@ -38,25 +38,25 @@ class GoogleAnalyticsReportingToS3Operator(BaseOperator): :type s3_key: string """ - template_fields = ('s3_key', - 'since', - 'until') - - def __init__(self, - google_analytics_conn_id, - view_id, - since, - until, - dimensions, - metrics, - s3_conn_id, - s3_bucket, - s3_key, - page_size=1000, - include_empty_rows=True, - sampling_level=None, - *args, - **kwargs): + template_fields = ("s3_key", "since", "until") + + def __init__( + self, + google_analytics_conn_id, + view_id, + since, + until, + dimensions, + metrics, + aws_conn_id, + s3_bucket, + s3_key, + page_size=1000, + include_empty_rows=True, + sampling_level=None, + *args, + **kwargs + ): super().__init__(*args, **kwargs) self.google_analytics_conn_id = google_analytics_conn_id @@ -68,86 +68,93 @@ def __init__(self, self.metrics = metrics self.page_size = page_size self.include_empty_rows = include_empty_rows - self.s3_conn_id = s3_conn_id + self.aws_conn_id = aws_conn_id self.s3_bucket = s3_bucket self.s3_key = s3_key self.metricMap = { - 'METRIC_TYPE_UNSPECIFIED': 'varchar(255)', - 'CURRENCY': 'decimal(20,5)', - 'INTEGER': 'int(11)', - 'FLOAT': 'decimal(20,5)', - 'PERCENT': 'decimal(20,5)', - 'TIME': 'time' + "METRIC_TYPE_UNSPECIFIED": "varchar(255)", + "CURRENCY": "decimal(20,5)", + "INTEGER": "int(11)", + "FLOAT": "decimal(20,5)", + "PERCENT": "decimal(20,5)", + "TIME": "time", } if self.page_size > 10000: - raise Exception('Please specify a page size equal to or lower than 10000.') + raise Exception("Please specify a page size equal to or lower than 10000.") if not isinstance(self.include_empty_rows, bool): raise Exception('Please specificy "include_empty_rows" as a boolean.') def execute(self, context): ga_conn = GoogleAnalyticsHook(self.google_analytics_conn_id) - s3_conn = S3Hook(self.s3_conn_id) + s3_conn = S3Hook(self.aws_conn_id) try: - since_formatted = datetime.strptime(self.since, '%Y-%m-%d %H:%M:%S').strftime('%Y-%m-%d') + since_formatted = datetime.strptime( + self.since, "%Y-%m-%d %H:%M:%S" + ).strftime("%Y-%m-%d") except: since_formatted = str(self.since) try: - until_formatted = datetime.strptime(self.until, '%Y-%m-%d %H:%M:%S').strftime('%Y-%m-%d') + until_formatted = datetime.strptime( + self.until, "%Y-%m-%d %H:%M:%S" + ).strftime("%Y-%m-%d") except: until_formatted = str(self.until) - report = ga_conn.get_analytics_report(self.view_id, - since_formatted, - until_formatted, - self.sampling_level, - self.dimensions, - self.metrics, - self.page_size, - self.include_empty_rows) - - columnHeader = report.get('columnHeader', {}) + report = ga_conn.get_analytics_report( + self.view_id, + since_formatted, + until_formatted, + self.sampling_level, + self.dimensions, + self.metrics, + self.page_size, + self.include_empty_rows, + ) + + columnHeader = report.get("columnHeader", {}) # Right now all dimensions are hardcoded to varchar(255), will need a map if any non-varchar dimensions are used in the future # Unfortunately the API does not send back types for Dimensions like it does for Metrics (yet..) dimensionHeaders = [ - {'name': header.replace('ga:', ''), 'type': 'varchar(255)'} - for header - in columnHeader.get('dimensions', []) + {"name": header.replace("ga:", ""), "type": "varchar(255)"} + for header in columnHeader.get("dimensions", []) ] metricHeaders = [ - {'name': entry.get('name').replace('ga:', ''), - 'type': self.metricMap.get(entry.get('type'), 'varchar(255)')} - for entry - in columnHeader.get('metricHeader', {}).get('metricHeaderEntries', []) + { + "name": entry.get("name").replace("ga:", ""), + "type": self.metricMap.get(entry.get("type"), "varchar(255)"), + } + for entry in columnHeader.get("metricHeader", {}).get( + "metricHeaderEntries", [] + ) ] with NamedTemporaryFile("w") as ga_file: - rows = report.get('data', {}).get('rows', []) + rows = report.get("data", {}).get("rows", []) for row_counter, row in enumerate(rows): root_data_obj = {} - dimensions = row.get('dimensions', []) - metrics = row.get('metrics', []) + dimensions = row.get("dimensions", []) + metrics = row.get("metrics", []) for index, dimension in enumerate(dimensions): - header = dimensionHeaders[index].get('name').lower() + header = dimensionHeaders[index].get("name").lower() root_data_obj[header] = dimension for metric in metrics: data = {} data.update(root_data_obj) - for index, value in enumerate(metric.get('values', [])): - header = metricHeaders[index].get('name').lower() + for index, value in enumerate(metric.get("values", [])): + header = metricHeaders[index].get("name").lower() data[header] = value - data['viewid'] = self.view_id - data['timestamp'] = self.since + data["viewid"] = self.view_id + data["timestamp"] = self.since - ga_file.write(json.dumps(data) + ('' if row_counter == len(rows) else '\n')) + ga_file.write( + json.dumps(data) + ("" if row_counter == len(rows) else "\n") + ) - s3_conn.load_file(ga_file.name, - self.s3_key, - self.s3_bucket, - True) + s3_conn.load_file(ga_file.name, self.s3_key, self.s3_bucket, True) From 0f4688a7802335851924bc39321845bf8c18549a Mon Sep 17 00:00:00 2001 From: Eduardo Bizarro Date: Sun, 10 Mar 2019 00:05:12 -0300 Subject: [PATCH 2/2] remove unused comma --- operators/google_analytics_account_summaries_to_s3_operator.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/operators/google_analytics_account_summaries_to_s3_operator.py b/operators/google_analytics_account_summaries_to_s3_operator.py index ee543ee..f96f39b 100644 --- a/operators/google_analytics_account_summaries_to_s3_operator.py +++ b/operators/google_analytics_account_summaries_to_s3_operator.py @@ -9,7 +9,7 @@ class GoogleAnalyticsAccountSummariesToS3Operator(BaseOperator): - template_fields = ("s3_key",) + template_fields = "s3_key" def __init__( self,