
Commit 2b7b94b
using wildcards with dataframe source
rao-abdul-mannan committed Jun 10, 2018
1 parent e204e49 commit 2b7b94b
Showing 2 changed files with 4 additions and 18 deletions.
14 changes: 3 additions & 11 deletions edx/analytics/tasks/common/spark.py
@@ -259,17 +259,9 @@ def get_event_log_dataframe(self, spark, *args, **kwargs):
         schema = self.get_log_schema()
         if self.direct_eventlogs_processing:
             self.log.warn("\nPYSPARK => Processing event log source directly\n")
-            event_log_source = self.get_config_from_args('event_log_source', *args, default_value=None)
-            if event_log_source is not None:
-                event_log_source = json.loads(event_log_source)
-            self.log.warn("\nPYSPARK => Event log source : {}\n".format(event_log_source))
-            dataframe = spark.read.format('json').load(event_log_source[0], schema=self.get_log_schema())
-            source_list_count = len(event_log_source)
-            if source_list_count > 1:
-                for k in range(1, source_list_count):
-                    dataframe = dataframe.union(
-                        spark.read.format('json').load(event_log_source[k], schema=self.get_log_schema())
-                    )
+            source = [src.encode('utf-8') for src in self.source]
+            self.log.warn("\nPYSPARK => Event log source : {}\n".format(source))
+            dataframe = spark.read.format('json').load(source, schema=self.get_log_schema())
         else:
             self.log.warn("\nPYSPARK => Processing path selection output\n")
             input_source = self.get_input_source(*args)
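Note: the new code relies on PySpark's DataFrameReader.load() accepting a list of paths, where each path may contain glob wildcards, so one call replaces the old per-path union loop. A minimal, self-contained sketch of that pattern (the paths and app name below are hypothetical, not from this repository):

    # Sketch only: load several JSON event-log locations in a single call.
    # The wildcard paths here are made-up examples.
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName('wildcard-load-sketch').getOrCreate()

    paths = [
        's3://example-bucket/eventlogs/2018-06-*/*.gz',  # glob over daily dirs
        'hdfs:///data/eventlogs/extra/*.log',
    ]
    # One load() call reads every matching file into a single DataFrame;
    # a schema can also be passed, as the commit does with get_log_schema().
    dataframe = spark.read.format('json').load(paths)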
8 changes: 1 addition & 7 deletions edx/analytics/tasks/insights/location_per_course.py
@@ -216,12 +216,6 @@ def run(self):
             target.remove()
         super(LastDailyIpAddressOfUserTaskSpark, self).run()

-    def get_luigi_configuration(self):
-        options = {}
-        config = luigi.configuration.get_config()
-        options['event_log_source'] = config.get('event-logs', 'source', '')
-        return options
-
     def spark_job(self, *args):
         from edx.analytics.tasks.util.spark_util import get_event_predicate_labels, get_course_id, get_event_time_string
         from pyspark.sql.functions import udf
@@ -246,7 +240,7 @@ def spark_job(self, *args):
             WHERE rank = 1
         """
         result = self._spark.sql(query)
-        result.coalesce(2).write.partitionBy('dt').csv(self.output_dir().path, mode='append', sep='\t')
+        result.coalesce(4).write.partitionBy('dt').csv(self.output_dir().path, mode='append', sep='\t')


class LastCountryOfUserDownstreamMixin(
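Note on the write change: coalesce(n) merges the DataFrame into at most n partitions without a full shuffle, and partitionBy('dt') writes one sub-directory per distinct dt value, so moving from coalesce(2) to coalesce(4) allows up to four output files per date and more parallel write tasks. A minimal sketch of the same pattern (output path and rows are hypothetical):

    # Sketch only: partitioned, tab-separated CSV output, capped at 4 files.
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName('partitioned-write-sketch').getOrCreate()
    df = spark.createDataFrame(
        [('2018-06-01', 'alice', 'US'), ('2018-06-02', 'bob', 'PK')],
        ['dt', 'username', 'country'],
    )
    (df.coalesce(4)                # at most 4 partitions, so at most 4 files per dt
       .write.partitionBy('dt')    # one sub-directory per distinct dt value
       .csv('/tmp/partitioned_write_sketch', mode='append', sep='\t'))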
