-
Notifications
You must be signed in to change notification settings - Fork 1
/
SparkCovid19_2.py
58 lines (41 loc) · 1.25 KB
/
SparkCovid19_2.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
import sys
import time
from pyspark import SparkContext, SparkConf
from operator import add
from csv import reader
# Module-level lookup table: maps the column-1 key of the population CSV to its
# integer population (column 4). Filled by the __main__ block, then broadcast.
pop_dict = dict()
def is_valid(x):
    """Turn one parsed data row into a ``(key, count)`` pair, or ``None``.

    ``x`` is the list of fields of one row. Field 1 becomes the key and
    field 2, when it is a plain non-negative integer, becomes the count.
    Rows with fewer than three fields, or whose field 2 is empty, signed or
    otherwise non-numeric (such as a header row), yield ``None`` so the
    caller can filter them out.
    """
    # Guard the length first: a blank or truncated line would otherwise raise
    # IndexError here and abort the whole Spark job.
    if len(x) < 3:
        return None
    count = x[2]
    # isdecimal() only accepts characters int() can parse; isdigit() also
    # accepts e.g. superscript digits, on which int() raises ValueError.
    if not count.isdecimal():
        return None
    return (x[1], int(count))
def divide(x, populations=None):
    """Format one ``(key, total)`` pair as ``"key , total-per-million"``.

    ``x`` is a ``(key, total)`` pair as produced by ``reduceByKey``.
    ``populations`` maps keys to populations; when omitted it defaults to the
    module-level ``broadcasted_dict.value`` created in the ``__main__`` block,
    so ``divide`` can still be passed directly to ``RDD.map``.

    Returns ``None`` (filtered out by the caller) when the key has no
    population entry or its population is zero.
    """
    if populations is None:
        populations = broadcasted_dict.value
    key, total = x[0], x[1]
    try:
        per_million = (total * 1000000.0) / populations[key]
    except (KeyError, ZeroDivisionError):
        # Skip only the expected data gaps; other errors (e.g. a NameError
        # from a missing broadcast) surface instead of silently producing an
        # empty output.
        return None
    return key + " , " + str(per_million)
if __name__ == "__main__":
    # Usage: SparkCovid19_2.py <data_csv> <population_csv> <output_dir>
    if len(sys.argv) < 4:
        sys.exit("usage: %s <data_csv> <population_csv> <output_dir>" % sys.argv[0])

    start_time = time.time()
    sc = SparkContext("local", "Spark task 3")
    try:
        # Load the population table: column 1 is the key, column 4 the
        # population. Rows that are too short (blank lines come back from
        # csv.reader as []) or whose population is not a plain integer
        # (e.g. the header) are skipped. The csv module requires newline="".
        with open(sys.argv[2], newline="") as file_ptr:
            for row in reader(file_ptr, delimiter=","):
                if len(row) > 4 and row[4].isdigit():
                    pop_dict[row[1]] = int(row[4])
        # Kept at module level on purpose: divide() reads this global.
        broadcasted_dict = sc.broadcast(pop_dict)

        # Parse data lines with the same csv reader as the population file so
        # a quoted field containing a comma does not shift the columns, as a
        # plain split(",") would.
        rows = sc.textFile(sys.argv[1]).map(lambda text: next(reader([text])))
        # Sum the column-2 counts per column-1 key, then turn each total into
        # a "key , value" line scaled by population (see divide()).
        totals = (
            rows.map(is_valid)
            .filter(lambda pair: pair is not None)
            .reduceByKey(add)
        )
        (
            totals.map(divide)
            .filter(lambda text: text is not None)
            .saveAsTextFile(sys.argv[3])
        )
        print("Time taken --- %s seconds ---" % (time.time() - start_time))
    finally:
        # Release the Spark context even if the job fails.
        sc.stop()