# Day 4 Practice
# Aggregation commands
# total number of rows in the dataframe
df.count()
# count of non-null values in a single column
# import all the aggregate functions
from pyspark.sql.functions import *
df.select(count('salary')).show()
# Column alias
df.select(count('salary').alias('Total salary')).show()
# salary sum
df.select(sum('salary').alias('Total salary')).show()
# avg salary
df.select(avg('salary').alias('Average salary')).show()
# max salary
df.select(max('salary').alias('Max salary')).show()
# min salary
df.select(min('salary').alias('Min salary')).show()
# Error: selecting a non-aggregated column together with an aggregate
# employee_id would have to be part of a group by clause
df.select('employee_id', min(col('salary')).alias('Min salary')).show()
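# For reference, a rough sketch of the corrected query: grouping by employee_id
# (rather than selecting it alongside the aggregate) makes the statement valid.
df.groupBy('employee_id').agg(min('salary').alias('Min salary')).show()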
# Ordering the dataframe by a column value in ascending/descending order
df.orderBy(col('salary')).select('employee_id','first_name','salary','department_id').show()
# ordering by two columns
df.orderBy(col('salary'), col('department_id')).select('employee_id','first_name','salary','department_id').show()
# ascending and descending
df.orderBy(col('salary').asc(), col('department_id').desc()).select('employee_id','first_name','salary','department_id').show()
# Grouping
# 1st method
# Grouping on one column
df.groupBy(col('department_id')).sum('salary').show()
df.groupBy(col('department_id')).min('salary').show()
# Grouping on two columns
df.groupBy(col('department_id'), col('job_id')).sum('salary').show()
# Sum operation on more than 1 column
df.groupBy(col('department_id'),col('job_id')).sum('salary','employee_id').show()
# 2nd method
# agg() gives more flexibility: different aggregation functions can be used in the same transformation
df.groupBy('department_id').agg(sum('salary').alias('salary sum'), max('salary').alias('Max salary')).show()
# having clause
# Max salary greater than 10000
df.groupBy('department_id').agg(sum('salary').alias('salary sum'), max('salary').alias('Max salary')).where(col('Max salary') > 10000).show()
# Max salary equal to 11000
df.groupBy('department_id').agg(sum('salary').alias('salary sum'), max('salary').alias('Max salary')).where(col('Max salary') == 11000).show()
# when - otherwise statement in Pyspark
# behaves the same as a CASE WHEN statement in SQL
df1 = df.withColumn('EMP_GRADE', when(col('salary') > 15000, 'A').when((col('salary') >= 10000) & (col('salary') < 15000), 'B').otherwise('C'))
df1.show()
----------------------------------------------------------------------------
# Spark SQL
# exposing the dataframe as a SQL temp view
df.createOrReplaceTempView('Employee')
# a temporary view named 'Employee' is created; it is queried below as 'employee' (the name is case-insensitive)
# querying the view
# returns a dataframe as the output
spark.sql('select * from employee limit 5').show()
df4 = spark.sql('select department_id as dept_id, sum(salary) from employee group by department_id')
df4.show()
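# For comparison with the when/otherwise example above, a rough sketch of the same grading
# logic written as a SQL CASE WHEN against the temp view (thresholds copied from that example):
spark.sql("""
    select employee_id, salary,
           case when salary > 15000 then 'A'
                when salary >= 10000 and salary < 15000 then 'B'
                else 'C'
           end as emp_grade
    from employee
""").show()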
----------------------------------------------------------------------------------------------------------
#Joins
# Inner Join
df1.join(df2, df1.department_id==df2.department_id, 'inner').show()
df.join(df2, df.DEPARTMENT_ID==df2.DEPARTMENT_ID, 'inner').select(df['department_id'], df['first_name'],df2['department_name']).show()
# left join
df.join(df2, df.DEPARTMENT_ID==df2.DEPARTMENT_ID, 'left').select(df['employee_id'], df['department_id'],df2['department_name']).show()
# Right join
df.join(df2, df.DEPARTMENT_ID==df2.DEPARTMENT_ID, 'right').select(df['employee_id'], df['department_id'],df2['department_name']).show()
# Full outer join
df.join(df2, df['department_id']==df2['department_id'], 'fullouter').select(df['employee_id'], df['department_id'],df2['department_name']).show(100)
# self join
df.alias('emp1').join(df.alias('emp2') , col('emp1.manager_id') == col('emp2.employee_id') , 'inner').show()
# Selecting only a few columns, with aliases
df.alias('emp1').join(df.alias('emp2'), col('emp1.manager_id') == col('emp2.employee_id'), 'inner') \
    .select(col('emp1.employee_id'), col('emp1.first_name').alias('emp name'), col('emp1.manager_id'), col('emp2.first_name').alias('manager_name')).show()
# Multiple conditions for a join
df.join(df2, (df['department_id']==df2['department_id']) & (df2['location_id']==1700), 'fullouter').select(df['employee_id'], df['department_id'],df2['department_name']).show(100)
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
location_data = [(1800, 'India'), (1900, 'USA')]
schema = StructType([StructField('location_id', IntegerType(), True), StructField('Location', StringType(), True)])
# assign to a separate variable so the employee dataframe df is not overwritten
locdf = spark.createDataFrame(data=location_data, schema=schema)
# multi-level join
# two inner joins: first employees with departments, then the joined result with the location dataframe
df.join(df2, df['department_id'] == df2['department_id'], 'inner').join(locdf, locdf['location_id'] == df2['location_id'], 'inner').show()
# User Defined Function
def uppercase(in_str):
    out_str = in_str.upper()
    return out_str
from pyspark.sql.functions import udf
# Registering our function as a UDF
uppercase_udf = udf(lambda x: uppercase(x), StringType())
# using our function on the location dataframe
locdf.select(uppercase_udf(locdf['Location'])).show()
# 2nd method of registering our function: the udf decorator
@udf(returnType=StringType())
def uppercase_2(in_str):
    out_str = in_str.upper()
    return out_str
# Window function
from pyspark.sql.window import Window
WindowSpec = Window.partitionBy('department_id').orderBy('salary')
df.withColumn('salary_rank', rank().over(WindowSpec)).show(100)
# Broadcast join
from pyspark.sql.functions import *
# threshold set to 100 MB (104857600 bytes)
spark.conf.set('spark.sql.autoBroadcastJoinThreshold', 104857600)
df.join(broadcast(df1), df.department_id == df1.department_id, 'inner').show(100)
# Failure Scenarios
# Driver goes out of memory when an action pulls back more data than the driver can hold (e.g. collecting a huge result)
# Executor goes out of memory when a partition of a large dataset does not fit in the executor's memory
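# A rough sketch of the driver-side risk described above (the 20-row limit is just illustrative):
# rows = df.collect()                    # risky: pulls every row back to the driver and can OOM it
sample = df.limit(20).collect()          # bounded: only 20 rows ever reach the driver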
# writing dataframe as csv
df.write.option('header',True).csv("/input")
# overwriting the directory
df.write.mode('overwrite').format('csv').save('/output')
# appending to the directory
df.write.mode('append').format('csv').save('/output')
# Number of partitions
df.rdd.getNumPartitions()