S3Hook no longer accepts the s3_conn_id parameter and uses aws_conn_id instead #8

Open · wants to merge 2 commits into master
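Context, as I understand it: when S3Hook was rebased onto AwsHook, its constructor argument was renamed from s3_conn_id to aws_conn_id, so code passing the old keyword now fails. A minimal sketch of the rename this PR applies at each call site ("aws_default" is a placeholder connection id):

```python
from airflow.hooks.S3_hook import S3Hook  # Airflow 1.10.x import path

# Old form: S3Hook(s3_conn_id="aws_default") now raises a TypeError,
# since AwsHook.__init__ only knows aws_conn_id.
s3_conn = S3Hook(aws_conn_id="aws_default")
```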
14 changes: 10 additions & 4 deletions __init__.py
@@ -1,14 +1,20 @@
 from airflow.plugins_manager import AirflowPlugin
 from google_analytics_plugin.hooks.google_analytics_hook import GoogleAnalyticsHook
-from google_analytics_plugin.operators.google_analytics_reporting_to_s3_operator import GoogleAnalyticsReportingToS3Operator
-from google_analytics_plugin.operators.google_analytics_account_summaries_to_s3_operator import GoogleAnalyticsAccountSummariesToS3Operator
+from google_analytics_plugin.operators.google_analytics_reporting_to_s3_operator import (
+    GoogleAnalyticsReportingToS3Operator,
+)
+from google_analytics_plugin.operators.google_analytics_account_summaries_to_s3_operator import (
+    GoogleAnalyticsAccountSummariesToS3Operator,
+)
 
 
 class GoogleAnalyticsPlugin(AirflowPlugin):
     name = "google_analytics_plugin"
     hooks = [GoogleAnalyticsHook]
-    operators = [GoogleAnalyticsReportingToS3Operator,
-                 GoogleAnalyticsAccountSummariesToS3Operator]
+    operators = [
+        GoogleAnalyticsReportingToS3Operator,
+        GoogleAnalyticsAccountSummariesToS3Operator,
+    ]
     executors = []
     macros = []
     admin_views = []
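As a side note, since these operators are registered through AirflowPlugin, DAGs on Airflow 1.x would typically import them via the plugin namespace; a sketch (module path inferred from the plugin's name attribute, not verified against this repo's docs):

```python
# Airflow 1.x exposes plugin operators under airflow.operators.<plugin name>;
# this mechanism was removed in Airflow 2.0 in favor of regular imports.
from airflow.operators.google_analytics_plugin import (
    GoogleAnalyticsAccountSummariesToS3Operator,
    GoogleAnalyticsReportingToS3Operator,
)
```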
52 changes: 27 additions & 25 deletions operators/google_analytics_account_summaries_to_s3_operator.py
@@ -9,54 +9,56 @@
 
 
 class GoogleAnalyticsAccountSummariesToS3Operator(BaseOperator):
-    template_fields = ('s3_key',)
-
-    def __init__(self,
-                 google_analytics_conn_id,
-                 s3_conn_id,
-                 s3_bucket,
-                 s3_key,
-                 brand,
-                 space,
-                 *args,
-                 **kwargs):
+    template_fields = ("s3_key",)
+
+    def __init__(
+        self,
+        google_analytics_conn_id,
+        aws_conn_id,
+        s3_bucket,
+        s3_key,
+        brand,
+        space,
+        *args,
+        **kwargs
+    ):
         super().__init__(*args, **kwargs)
 
         self.google_analytics_conn_id = google_analytics_conn_id
-        self.s3_conn_id = s3_conn_id
+        self.aws_conn_id = aws_conn_id
         self.s3_bucket = s3_bucket
         self.s3_key = s3_key
         self.brand = brand
         self.space = space
 
     def execute(self, context):
         ga_conn = GoogleAnalyticsHook(self.google_analytics_conn_id)
-        s3_conn = S3Hook(self.s3_conn_id)
+        s3_conn = S3Hook(self.aws_conn_id)
 
         account_summaries = ga_conn.get_account_summaries()
 
-        file_name = '/tmp/{key}.jsonl'.format(key=self.s3_key)
-        with open(file_name, 'w') as ga_file:
+        file_name = "/tmp/{key}.jsonl".format(key=self.s3_key)
+        with open(file_name, "w") as ga_file:
             data = []
-            for item in account_summaries.get('items', []):
+            for item in account_summaries.get("items", []):
                 root_data_obj = {
-                    'account_id': item['id'],
-                    'brand': self.brand,
-                    'space': self.space
+                    "account_id": item["id"],
+                    "brand": self.brand,
+                    "space": self.space,
                 }
 
-                for web_property in item.get('webProperties', []):
+                for web_property in item.get("webProperties", []):
                     data_obj = {}
                     data_obj.update(root_data_obj)
 
-                    data_obj['property_id'] = web_property['id']
+                    data_obj["property_id"] = web_property["id"]
 
-                    for profile in web_property.get('profiles', []):
-                        data_obj['profile_id'] = profile['id']
-                        data_obj['profile_name'] = profile['name']
+                    for profile in web_property.get("profiles", []):
+                        data_obj["profile_id"] = profile["id"]
+                        data_obj["profile_name"] = profile["name"]
                         data.append(data_obj)
 
-            json_data = '\n'.join([json.dumps(d) for d in data])
+            json_data = "\n".join([json.dumps(d) for d in data])
             ga_file.write(json_data)
 
         s3_conn.load_file(file_name, self.s3_key, self.s3_bucket, True)
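A usage sketch for the renamed parameter in a DAG; the connection ids, bucket, and key below are placeholders, not values taken from this repo:

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.google_analytics_plugin import (
    GoogleAnalyticsAccountSummariesToS3Operator,
)

dag = DAG(
    "ga_account_summaries",
    start_date=datetime(2019, 1, 1),
    schedule_interval="@daily",
)

# s3_key is a template field, so it can carry the execution date.
account_summaries_to_s3 = GoogleAnalyticsAccountSummariesToS3Operator(
    task_id="ga_account_summaries_to_s3",
    google_analytics_conn_id="google_analytics_default",
    aws_conn_id="aws_default",  # was s3_conn_id before this change
    s3_bucket="my-ga-bucket",
    s3_key="ga/account_summaries/{{ ds }}",
    brand="my_brand",
    space="my_space",
    dag=dag,
)
```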
131 changes: 69 additions & 62 deletions operators/google_analytics_reporting_to_s3_operator.py
@@ -28,8 +28,8 @@ class GoogleAnalyticsReportingToS3Operator(BaseOperator):
                   but in either case it will be
                   passed to GA as '%Y-%m-%d'.
     :type until: string
-    :param s3_conn_id: The s3 connection id.
-    :type s3_conn_id: string
+    :param aws_conn_id: The AWS connection id.
+    :type aws_conn_id: string
     :param s3_bucket: The S3 bucket to be used to store
                       the Google Analytics data.
     :type s3_bucket: string
@@ -38,25 +38,25 @@ class GoogleAnalyticsReportingToS3Operator(BaseOperator):
     :type s3_key: string
     """
 
-    template_fields = ('s3_key',
-                       'since',
-                       'until')
-
-    def __init__(self,
-                 google_analytics_conn_id,
-                 view_id,
-                 since,
-                 until,
-                 dimensions,
-                 metrics,
-                 s3_conn_id,
-                 s3_bucket,
-                 s3_key,
-                 page_size=1000,
-                 include_empty_rows=True,
-                 sampling_level=None,
-                 *args,
-                 **kwargs):
+    template_fields = ("s3_key", "since", "until")
+
+    def __init__(
+        self,
+        google_analytics_conn_id,
+        view_id,
+        since,
+        until,
+        dimensions,
+        metrics,
+        aws_conn_id,
+        s3_bucket,
+        s3_key,
+        page_size=1000,
+        include_empty_rows=True,
+        sampling_level=None,
+        *args,
+        **kwargs
+    ):
         super().__init__(*args, **kwargs)
 
         self.google_analytics_conn_id = google_analytics_conn_id
@@ -68,86 +68,93 @@ def __init__(self,
         self.metrics = metrics
         self.page_size = page_size
         self.include_empty_rows = include_empty_rows
-        self.s3_conn_id = s3_conn_id
+        self.aws_conn_id = aws_conn_id
         self.s3_bucket = s3_bucket
         self.s3_key = s3_key
 
         self.metricMap = {
-            'METRIC_TYPE_UNSPECIFIED': 'varchar(255)',
-            'CURRENCY': 'decimal(20,5)',
-            'INTEGER': 'int(11)',
-            'FLOAT': 'decimal(20,5)',
-            'PERCENT': 'decimal(20,5)',
-            'TIME': 'time'
+            "METRIC_TYPE_UNSPECIFIED": "varchar(255)",
+            "CURRENCY": "decimal(20,5)",
+            "INTEGER": "int(11)",
+            "FLOAT": "decimal(20,5)",
+            "PERCENT": "decimal(20,5)",
+            "TIME": "time",
         }
 
         if self.page_size > 10000:
-            raise Exception('Please specify a page size equal to or lower than 10000.')
+            raise Exception("Please specify a page size equal to or lower than 10000.")
 
         if not isinstance(self.include_empty_rows, bool):
             raise Exception('Please specify "include_empty_rows" as a boolean.')
 
     def execute(self, context):
         ga_conn = GoogleAnalyticsHook(self.google_analytics_conn_id)
-        s3_conn = S3Hook(self.s3_conn_id)
+        s3_conn = S3Hook(self.aws_conn_id)
         try:
-            since_formatted = datetime.strptime(self.since, '%Y-%m-%d %H:%M:%S').strftime('%Y-%m-%d')
+            since_formatted = datetime.strptime(
+                self.since, "%Y-%m-%d %H:%M:%S"
+            ).strftime("%Y-%m-%d")
         except:
             since_formatted = str(self.since)
         try:
-            until_formatted = datetime.strptime(self.until, '%Y-%m-%d %H:%M:%S').strftime('%Y-%m-%d')
+            until_formatted = datetime.strptime(
+                self.until, "%Y-%m-%d %H:%M:%S"
+            ).strftime("%Y-%m-%d")
         except:
             until_formatted = str(self.until)
-        report = ga_conn.get_analytics_report(self.view_id,
-                                              since_formatted,
-                                              until_formatted,
-                                              self.sampling_level,
-                                              self.dimensions,
-                                              self.metrics,
-                                              self.page_size,
-                                              self.include_empty_rows)
-
-        columnHeader = report.get('columnHeader', {})
+        report = ga_conn.get_analytics_report(
+            self.view_id,
+            since_formatted,
+            until_formatted,
+            self.sampling_level,
+            self.dimensions,
+            self.metrics,
+            self.page_size,
+            self.include_empty_rows,
+        )
+
+        columnHeader = report.get("columnHeader", {})
         # Right now all dimensions are hardcoded to varchar(255), will need a map if any non-varchar dimensions are used in the future
         # Unfortunately the API does not send back types for Dimensions like it does for Metrics (yet..)
         dimensionHeaders = [
-            {'name': header.replace('ga:', ''), 'type': 'varchar(255)'}
-            for header
-            in columnHeader.get('dimensions', [])
+            {"name": header.replace("ga:", ""), "type": "varchar(255)"}
+            for header in columnHeader.get("dimensions", [])
         ]
         metricHeaders = [
-            {'name': entry.get('name').replace('ga:', ''),
-             'type': self.metricMap.get(entry.get('type'), 'varchar(255)')}
-            for entry
-            in columnHeader.get('metricHeader', {}).get('metricHeaderEntries', [])
+            {
+                "name": entry.get("name").replace("ga:", ""),
+                "type": self.metricMap.get(entry.get("type"), "varchar(255)"),
+            }
+            for entry in columnHeader.get("metricHeader", {}).get(
+                "metricHeaderEntries", []
+            )
         ]
 
         with NamedTemporaryFile("w") as ga_file:
-            rows = report.get('data', {}).get('rows', [])
+            rows = report.get("data", {}).get("rows", [])
 
             for row_counter, row in enumerate(rows):
                 root_data_obj = {}
-                dimensions = row.get('dimensions', [])
-                metrics = row.get('metrics', [])
+                dimensions = row.get("dimensions", [])
+                metrics = row.get("metrics", [])
 
                 for index, dimension in enumerate(dimensions):
-                    header = dimensionHeaders[index].get('name').lower()
+                    header = dimensionHeaders[index].get("name").lower()
                     root_data_obj[header] = dimension
 
                 for metric in metrics:
                     data = {}
                     data.update(root_data_obj)
 
-                    for index, value in enumerate(metric.get('values', [])):
-                        header = metricHeaders[index].get('name').lower()
+                    for index, value in enumerate(metric.get("values", [])):
+                        header = metricHeaders[index].get("name").lower()
                         data[header] = value
 
-                    data['viewid'] = self.view_id
-                    data['timestamp'] = self.since
+                    data["viewid"] = self.view_id
+                    data["timestamp"] = self.since
 
-                    ga_file.write(json.dumps(data) + ('' if row_counter == len(rows) else '\n'))
+                    ga_file.write(
+                        json.dumps(data) + ("" if row_counter == len(rows) else "\n")
+                    )
 
-            s3_conn.load_file(ga_file.name,
-                              self.s3_key,
-                              self.s3_bucket,
-                              True)
+            s3_conn.load_file(ga_file.name, self.s3_key, self.s3_bucket, True)
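And the reporting operator under the same caveats: since and until are template fields that execute() normalizes to %Y-%m-%d, and the dimensions/metrics shape below is assumed to follow the GA Reporting API v4 request format (check GoogleAnalyticsHook.get_analytics_report for the exact contract):

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.google_analytics_plugin import (
    GoogleAnalyticsReportingToS3Operator,
)

dag = DAG(
    "ga_reporting",
    start_date=datetime(2019, 1, 1),
    schedule_interval="@daily",
)

ga_report_to_s3 = GoogleAnalyticsReportingToS3Operator(
    task_id="ga_report_to_s3",
    google_analytics_conn_id="google_analytics_default",
    view_id="12345678",  # placeholder GA view id
    since="{{ ds }}",  # rendered per run, then normalized to %Y-%m-%d
    until="{{ tomorrow_ds }}",
    dimensions=[{"name": "ga:date"}, {"name": "ga:source"}],  # assumed v4 shape
    metrics=[{"expression": "ga:sessions"}],  # assumed v4 shape
    aws_conn_id="aws_default",  # was s3_conn_id before this change
    s3_bucket="my-ga-bucket",
    s3_key="ga/report/{{ ds }}.jsonl",
    dag=dag,
)
```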