diff --git a/edx/analytics/tasks/common/spark.py b/edx/analytics/tasks/common/spark.py
index df645eb8dd..640c1b6aff 100644
--- a/edx/analytics/tasks/common/spark.py
+++ b/edx/analytics/tasks/common/spark.py
@@ -259,17 +259,9 @@ def get_event_log_dataframe(self, spark, *args, **kwargs):
         schema = self.get_log_schema()
         if self.direct_eventlogs_processing:
             self.log.warn("\nPYSPARK => Processing event log source directly\n")
-            event_log_source = self.get_config_from_args('event_log_source', *args, default_value=None)
-            if event_log_source is not None:
-                event_log_source = json.loads(event_log_source)
-            self.log.warn("\nPYSPARK => Event log source : {}\n".format(event_log_source))
-            dataframe = spark.read.format('json').load(event_log_source[0], schema=self.get_log_schema())
-            source_list_count = len(event_log_source)
-            if source_list_count > 1:
-                for k in range(1, source_list_count):
-                    dataframe = dataframe.union(
-                        spark.read.format('json').load(event_log_source[k], schema=self.get_log_schema())
-                    )
+            source = [src.encode('utf-8') for src in self.source]
+            self.log.warn("\nPYSPARK => Event log source : {}\n".format(source))
+            dataframe = spark.read.format('json').load(source, schema=self.get_log_schema())
         else:
             self.log.warn("\nPYSPARK => Processing path selection output\n")
             input_source = self.get_input_source(*args)
diff --git a/edx/analytics/tasks/insights/location_per_course.py b/edx/analytics/tasks/insights/location_per_course.py
index 2ad1241335..5f767256cf 100644
--- a/edx/analytics/tasks/insights/location_per_course.py
+++ b/edx/analytics/tasks/insights/location_per_course.py
@@ -216,12 +216,6 @@ def run(self):
                 target.remove()
         super(LastDailyIpAddressOfUserTaskSpark, self).run()
 
-    def get_luigi_configuration(self):
-        options = {}
-        config = luigi.configuration.get_config()
-        options['event_log_source'] = config.get('event-logs', 'source', '')
-        return options
-
     def spark_job(self, *args):
         from edx.analytics.tasks.util.spark_util import get_event_predicate_labels, get_course_id, get_event_time_string
         from pyspark.sql.functions import udf
@@ -246,7 +240,7 @@ def spark_job(self, *args):
             WHERE rank = 1
         """
         result = self._spark.sql(query)
-        result.coalesce(2).write.partitionBy('dt').csv(self.output_dir().path, mode='append', sep='\t')
+        result.coalesce(4).write.partitionBy('dt').csv(self.output_dir().path, mode='append', sep='\t')
 
 
 class LastCountryOfUserDownstreamMixin(