This project practices the ETL process with a PlayerUnknown's Battlegrounds (PUBG) dataset. Datasets below 10,000 rows can often be handled easily on a single server; however, once a dataset grows past 1,000,000 rows, or even into the billions, distributed computation becomes necessary. This project uses three PUBG-related datasets from Kaggle (two of them with 13,000,000 rows). The ETL process loads the datasets into an AWS S3 bucket, creates an AWS EMR cluster, loads and transforms the data on the EMR cluster with pySpark, writes the transformed data back to AWS S3 in CSV format, and finally loads the data from S3 into the final Fact and Dimension tables in AWS Redshift. All of these steps are orchestrated by Airflow. The structure of the Fact/Dimension tables is designed around the analytical queries to be run later.
- PUBG - Aggregate - match-related dataset
- PUBG - Death - kill/death-related dataset
- PUBG - Weapon - weapons-related dataset
- Python
- Airflow
- Docker
- AWS S3
- AWS EMR
- AWS Redshift
- You need to have your AWS CLI configuration ready (AWS credentials + EMR credentials)
- You need 🐳 Docker & docker-compose
- Run the following command in the terminal where you git cloned the repository:
docker-compose -f docker-compose-LocalExecutor.yml up -d
- Add your "redshift' account info in the AirFlow Web UI (localhost:8080/admin -> Admin -> Connections)
- Assign your S3 bucket name to the "BUCKET_NAME" variable in "/dags/spark_submit_airflow.py"
- Assign your S3 bucket name to the "BUCKET_NAME" variable in "/dags/scripts/spark/spark-scipt.py"
- Create the S3 bucket with the name you specified for "BUCKET_NAME" (see the sketch below)
- Run the dag named "spark_submit_airflow"
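
For reference, below is a minimal sketch of creating the bucket with boto3; the bucket name and region are placeholders, so replace them with your own values.

```python
import boto3

# Placeholder values - use the same name you assigned to BUCKET_NAME
BUCKET_NAME = "my-pubg-etl-bucket"
REGION = "us-west-2"

s3 = boto3.client("s3", region_name=REGION)

# For us-east-1, omit CreateBucketConfiguration entirely
s3.create_bucket(
    Bucket=BUCKET_NAME,
    CreateBucketConfiguration={"LocationConstraint": REGION},
)
```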
The above is the overall ETL process used in this project. All workflows are orchestrated by Airflow. The raw dataset is stored in an AWS S3 bucket, and all of the data wrangling is handled by an AWS EMR cluster (mostly Spark work). The final Fact and Dimension tables are then created in AWS Redshift, which supports fast query speed and computation thanks to its columnar storage.
- start_data_pipeline: DummyOperator to indicate the start of the DAG run
- <script_to_s3, data_to_s3>: Load raw data and spark script to S3
- create_emr_cluster: Create AWS EMR cluster for spark job
- add_steps: Submit the list of steps the EMR cluster needs to run
- watch_step: Check if the EMR cluster and the steps are successfully done
- terminate_emr_cluster: Terminate the created EMR cluster after the job finishes
- create_tables: Create Fact/Dimension tables in AWS Redshift
- load_XXX_table: Load the output CSV files written by the EMR cluster from S3 into Redshift
- check_data_quality: Check if the data is successfully stored in Redshift table
- end_data_pipeline: DummyOperator to indicate the successful end of the DAG
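
To make the ordering above concrete, here is a minimal sketch of how the task dependencies could be wired in Airflow 1.x style. The operator classes are simplified to DummyOperator placeholders; the actual DAG uses the EMR/Redshift operators described above.

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator

with DAG("spark_submit_airflow", start_date=datetime(2021, 1, 1),
         schedule_interval=None, catchup=False) as dag:
    # Placeholders standing in for the real operators listed above
    start = DummyOperator(task_id="start_data_pipeline")
    script_to_s3 = DummyOperator(task_id="script_to_s3")
    data_to_s3 = DummyOperator(task_id="data_to_s3")
    create_emr_cluster = DummyOperator(task_id="create_emr_cluster")
    add_steps = DummyOperator(task_id="add_steps")
    watch_step = DummyOperator(task_id="watch_step")
    terminate_emr_cluster = DummyOperator(task_id="terminate_emr_cluster")
    create_tables = DummyOperator(task_id="create_tables")
    load_kill_log_table = DummyOperator(task_id="load_kill_log_table")  # one per table
    check_data_quality = DummyOperator(task_id="check_data_quality")
    end = DummyOperator(task_id="end_data_pipeline")

    # Dependency chain matching the task list above
    start >> [script_to_s3, data_to_s3] >> create_emr_cluster >> add_steps >> watch_step
    watch_step >> terminate_emr_cluster >> create_tables >> load_kill_log_table
    load_kill_log_table >> check_data_quality >> end
```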
The kill_log table acts as the FACT table. Each record represents one kill event during a match, and the details of that kill and the relevant player info are stored in the DIMENSION tables.
- Detailed information about the match itself (map, game_size, etc.) can be found by JOINing the fact table with the "match" table on "match_id".
- "killer_id" and "victim_id" are unique identifiers for a player in a specific match. They can be used as JOIN keys against the "player_id" column of the "player" table.
- Detailed "timestamp" information can be retrieved by JOINing the "kill_log" table with the "time" table on "timestamp".
- Specific information about the weapon used in the kill can be found in the "weapon" dimension table, retrieved by JOINing the fact table with the "weapon" table.
SELECT m.map AS Map,
kl.weapon AS Weapon,
COUNT(*) AS Num
FROM pubg.kill_log AS kl
LEFT JOIN pubg.match AS m ON kl.match_id = m.match_id
LEFT JOIN pubg.time AS t ON kl.timestamp = t.timestamp
LEFT JOIN pubg.weapon AS w ON kl.weapon = w.weapon
WHERE m.map in ('ERANGEL', 'MIRAMAR')
GROUP BY m.map, kl.weapon
ORDER BY m.map, Num DESC
By JOINing the Fact and Dimension tables, one can get results such as the most used weapon per map. The result of the query above would be as follows.
- Data engineering work often requires a seamless workflow from Extract to Transform to Load. These three steps can be treated as one data engineering job, and Airflow is one of the best tools to orchestrate them.
- Airflow was chosen for orchestration because I am accustomed to working with Python, and Airflow is one of the most popular open-source pipeline frameworks on GitHub. This huge community enables quick troubleshooting.
- Since AWS services share the same data centers, moving data between AWS services guarantees high speed and stability. Thus, AWS S3 was chosen for storage.
- For data wrangling, Spark was used instead of Hadoop since Spark is faster thanks to keeping intermediate data in memory (instead of writing it to HDFS). AWS EMR was used for the Spark job because it can be created and terminated easily from Airflow, supports Spark, and makes data transfer to and from AWS S3 easy.
- Lastly, AWS Redshift was used to store the final Fact/Dimension tables because it supports fast data loading from AWS S3 via the COPY command. Although Redshift is a columnar store, it is also PostgreSQL-compatible, so it offers both easy access and fast query speed.
- First, I wanted to build the data pipeline entirely with cloud services. AWS, Google Cloud, and Azure were all options, but, as with Airflow, AWS seemed to offer the most help in terms of adoption and community size, so AWS was chosen.
- For the workflow management platform there were options such as Airflow, Oozie, and Luigi. Airflow was chosen because it uses Python, its UI is more intuitive, and dependencies between tasks are easy to inspect in the Tree and DAG views. Its community is also the largest among these tools, which makes troubleshooting easier for a beginner.
- AWS S3 is used both to store the raw data and to store the result tables of the Spark data transformation run on AWS EMR. HDFS could also have been used to store the EMR output, but AWS S3 was chosen because:
  - I did not want to increase the HDFS storage cost on the EMR cluster
  - AWS services share the same data centers, so using S3 in place of HDFS adds little network overhead and stores the data at a low price
- For the data transformation, the data wrangling could have been done with SQL directly on the Airflow server, but AWS EMR was used instead because:
  - the raw data is large (about 2 GB)
  - Airflow operators support automatic creation and termination of an AWS EMR cluster, and moving data between AWS services (S3 -> EMR -> S3 -> Redshift) is simple and fast
  - using AWS EMR only for the short duration of the transformation does not cost much
  - EMR supports Spark, so fast data transformation with pySpark is possible
  - Spark was used instead of Hadoop because it keeps intermediate output in memory rather than on disk
- Other services such as AWS Aurora or RDS could have served as the data warehouse, but AWS Redshift was chosen because:
  - column-based Redshift shows faster performance than the other databases
  - it supports PostgreSQL, so data can be loaded quickly from S3 with the COPY command
  - Redshift performs better for OLAP-style analytical queries
- If S3 and Redshift are in the same region, data transfer is faster (they sit in the same data center).
- When writing a COPY command, it matters a great deal whether the file being loaded (JSON, CSV, Parquet) has a header. If the columns have already been created in Redshift, the IGNOREHEADER 1 option must be included.
  - Because IGNOREHEADER 1 is used, Redshift reads each record as plain values in order, without column-name information, so column order matters when designing the Redshift columns.
- When writing the COPY command, the option TIMEFORMAT 'YYYY-MM-DD HH:MI:SS' should be added (as sketched below).
  - Without this option, timestamps are stored in Redshift's default "yyyy-MM-dd HH:mm:ss.0" format.
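
As a hedged sketch, the COPY statement for one of the CSV outputs might look like the following. The schema/table name comes from the data model above, while the bucket, IAM role, and region are placeholders.

```python
# Sketch of a Redshift COPY statement with IGNOREHEADER and TIMEFORMAT.
# Bucket, IAM role ARN, and region below are placeholders.
copy_kill_log = """
    COPY pubg.kill_log
    FROM 's3://{bucket}/output/kill_log/'
    IAM_ROLE '{iam_role}'
    FORMAT AS CSV
    IGNOREHEADER 1
    TIMEFORMAT 'YYYY-MM-DD HH:MI:SS'
    REGION '{region}';
""".format(
    bucket="my-pubg-etl-bucket",
    iam_role="arn:aws:iam::123456789012:role/my-redshift-role",
    region="us-west-2",
)

# It could then be executed against the "redshift" connection, e.g. with
# Airflow's PostgresHook:
# from airflow.hooks.postgres_hook import PostgresHook
# PostgresHook(postgres_conn_id="redshift").run(copy_kill_log)
```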
- Be careful: the usage of custom operator libraries differs between Airflow versions (v1 vs v2). This project is based on v1.1.
- Airflow occasionally hangs; when that happens, reboot the web server.
- Timestamp() type handling
  - to_timestamp() can convert a 10- (or 13-) digit Unix timestamp, or a default timestamp string in various forms, into (yyyy-MM-dd HH:mm:ss) format
df = kg_df.withColumn("ts", to_timestamp(kg_df.unix_ts / 1000))
df = kg_df.withColumn("ts", to_timestamp(kg_df.default_ts, "yyyy-MM-dd'T'HH:mm:ssZ"))
- Arithmetic between timestamps is easier after converting them to Unix timestamp form
df = df.withColumn("added_timestamp", to_timestamp(unix_timestamp(df.ts) + 1232))
- to_timestamp(): converts [string / 10-digit long → timestamp type]
- unix_timestamp(): converts [default timestamp type → Unix timestamp (long)]
- Unix-timestamp-related functions: see the combined sketch below
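
The fragments above can be put together into a small, self-contained pySpark example; the column names and sample values here are made up for illustration.

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import to_timestamp, unix_timestamp

spark = SparkSession.builder.appName("timestamp-demo").getOrCreate()

# Illustrative columns: a 13-digit (millisecond) Unix timestamp and an ISO-8601 string
df = spark.createDataFrame(
    [(1542161629000, "2018-11-14T02:13:49+0000")],
    ["unix_ts_ms", "default_ts"],
)

df = (
    df
    # 13-digit Unix timestamp (ms) -> divide by 1000 -> timestamp type
    .withColumn("ts_from_unix", to_timestamp(df.unix_ts_ms / 1000))
    # default timestamp string -> timestamp type, with an explicit pattern
    .withColumn("ts_from_str", to_timestamp(df.default_ts, "yyyy-MM-dd'T'HH:mm:ssZ"))
)

# timestamp type -> 10-digit Unix timestamp (seconds), convenient for arithmetic
df = df.withColumn("ts_plus_20min", to_timestamp(unix_timestamp(df.ts_from_unix) + 1200))
df.show(truncate=False)
```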
- Adding an index-number column
- pySpark does not provide the basic Pandas index-reset function:
df.reset_index(drop=False, inplace=True)
- Therefore an index column (0, 1, 2, ..., n-1) can be added with a combination of row_number() and a window function:
window = Window.orderBy(kill_log_df.killer_id)
kill_log_df = kill_log_df.withColumn('kill_log_id', row_number().over(window))
- Window.orderBy() creates a windowSpec over the given column
- row_number() assigns sequential index numbers over that windowSpec
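
Below is a self-contained sketch of the same pattern; the DataFrame contents are illustrative.

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import row_number
from pyspark.sql.window import Window

spark = SparkSession.builder.appName("index-demo").getOrCreate()

# Illustrative data standing in for the real kill_log DataFrame
kill_log_df = spark.createDataFrame(
    [("p1", "p2"), ("p3", "p4"), ("p5", "p6")],
    ["killer_id", "victim_id"],
)

window = Window.orderBy(kill_log_df.killer_id)
# row_number() starts at 1; subtract 1 if a 0-based index (0, 1, ..., n-1) is wanted
kill_log_df = kill_log_df.withColumn("kill_log_id", row_number().over(window) - 1)
kill_log_df.show()
```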
- When writing from pySpark to S3 with df.write.csv("s3a://xxxxx", timestampFormat="....") (see the sketch after these notes)
  - If the timestampFormat argument is not specified, the output is written in the default format, so if a specific format is desired it must be set explicitly, e.g. timestampFormat="yyyy-MM-dd HH:mm:ss"
- Be careful when handling 13-digit Unix timestamps (integers in milliseconds)
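
A hedged sketch combining the two notes above (the bucket path is a placeholder):

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import to_timestamp

spark = SparkSession.builder.appName("write-demo").getOrCreate()

df = spark.createDataFrame([(1542161629000,)], ["event_ts_ms"])

# Convert the 13-digit (millisecond) Unix timestamp into a real timestamp column
# first; otherwise the raw long is written out as-is.
df = df.withColumn("event_ts", to_timestamp(df.event_ts_ms / 1000))

(
    df.write
      .mode("overwrite")
      .option("header", "true")
      .csv("s3a://my-pubg-etl-bucket/output/demo/",
           timestampFormat="yyyy-MM-dd HH:mm:ss")
)
```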
- When auto-terminating the EMR cluster with Airflow's EmrTerminateJobFlowOperator, the EMR release version must be 5.34.0 or higher
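
For reference, a minimal sketch of that terminate task, assuming the Airflow 1.x contrib import path and the task ids used in this DAG:

```python
from airflow.contrib.operators.emr_terminate_job_flow_operator import (
    EmrTerminateJobFlowOperator,
)

# Pulls the cluster id created earlier by the create_emr_cluster task via XCom
terminate_emr_cluster = EmrTerminateJobFlowOperator(
    task_id="terminate_emr_cluster",
    job_flow_id="{{ task_instance.xcom_pull(task_ids='create_emr_cluster', key='return_value') }}",
    aws_conn_id="aws_default",
    dag=dag,
)
```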
- The NUMERIC type takes NUMERIC(precision, scale) arguments: precision is the total number of digits (including the fractional part) and scale is the number of digits after the decimal point. NUMERIC(5,2) therefore covers -999.99 to 999.99. Since the default scale is 0, omitting it means fractional values cannot be stored!
- Try adding a distkey and sortkey
  - Add a distribution style and sort key to the Redshift tables and verify the query performance
  - dist/sort keys are more effective on a cluster with two or more nodes
  - why? The purpose of a distkey is to distribute data evenly across nodes and reduce shuffling overhead; with a single node, JOINs happen on one node anyway, so there is no benefit
- Connect the Redshift tables to a BI tool and run some analytics
- Instead of a full refresh (re-running the whole ETL on every DAG run), try backfilling based on the execution date