# forked from databrickslabs/mosaic
# 01. Data Prep.py — 141 lines (107 loc), 4.13 KB
# Databricks notebook source
# MAGIC %md ## We First Prep the data and download it
# COMMAND ----------
# MAGIC %pip install databricks_mosaic
# COMMAND ----------
from pyspark.sql.functions import *
import mosaic as mos
# Configure Mosaic to use the ESRI geometry API and the H3 grid index system,
# then enable it on this Spark session.
spark.conf.set("spark.databricks.labs.mosaic.geometry.api", "ESRI")
spark.conf.set("spark.databricks.labs.mosaic.index.system", "H3")
mos.enable_mosaic(spark, dbutils)
# COMMAND ----------
# MAGIC %md ##AIS Data
# COMMAND ----------
# Create the DBFS landing directory for the AIS extract
# (dbfs:/tmp/ship2ship, visible to shell cells as /dbfs/tmp/ship2ship).
dbutils.fs.mkdirs("/tmp/ship2ship")
# COMMAND ----------
# MAGIC %sh
# MAGIC # see: https://coast.noaa.gov/htdata/CMSP/AISDataHandler/2018/index.html
# MAGIC # we download data to dbfs:// mountpoint (/dbfs)
# MAGIC # Download one day of AIS data (2018-01-31) to local disk, unzip it,
# MAGIC # then move the CSV onto DBFS so Spark can read it.
# MAGIC mkdir /ship2ship/
# MAGIC cd /ship2ship/
# MAGIC wget -np -r -nH -L --cut-dirs=4 https://coast.noaa.gov/htdata/CMSP/AISDataHandler/2018/AIS_2018_01_31.zip > /dev/null 2>&1
# MAGIC unzip AIS_2018_01_31.zip
# MAGIC mv AIS_2018_01_31.csv /dbfs/tmp/ship2ship/
# COMMAND ----------
# Explicit schema for the NOAA AIS position reports (one row per vessel ping).
schema = """
MMSI int,
BaseDateTime timestamp,
LAT double,
LON double,
SOG double,
COG double,
Heading double,
VesselName string,
IMO string,
CallSign string,
VesselType int,
Status int,
Length int,
Width int,
Draft double,
Cargo int,
TranscieverClass string
"""
# Read the raw CSV from DBFS, then keep only cargo vessels (type 70)
# that report a navigation status.
raw_points = spark.read.csv("/tmp/ship2ship", header=True, schema=schema)
cargo_points = raw_points.filter("VesselType = 70")  # Only select cargos
AIS_df = cargo_points.filter("Status IS NOT NULL")
display(AIS_df)
# COMMAND ----------
# MAGIC %sql
# MAGIC CREATE DATABASE IF NOT EXISTS ship2ship
# COMMAND ----------
# Persist the filtered AIS points as a managed Delta table,
# replacing any previous run's output.
ais_writer = AIS_df.write.format("delta").mode("overwrite")
ais_writer.saveAsTable("ship2ship.AIS")
# COMMAND ----------
# MAGIC %md ## Harbours
# MAGIC
# MAGIC This data can be obtained from [here](https://data-usdot.opendata.arcgis.com/datasets/usdot::ports-major/about), and loaded with the code below.
# MAGIC
# MAGIC To avoid detecting overlap close to, or within, harbours, in Notebook `03.b Advanced Overlap Detection` we filter out events taking place close to a harbour.
# MAGIC Various approaches are possible, including filtering out events too close to shore, and they can be implemented in a similar fashion.
# MAGIC
# MAGIC In this instance we set a buffer of `10 km` around harbours to arbitrarily define an area wherein we do not expect ship-to-ship transfers to take place.
# MAGIC Since our projection is not in metres, we convert from decimal degrees, with `(0.00001 - 0.000001)` decimal degrees being approximately one metre at the equator.
# MAGIC Ref: http://wiki.gis.com/wiki/index.php/Decimal_degrees
# COMMAND ----------
# MAGIC %sh
# MAGIC # we download data to dbfs:// mountpoint (/dbfs)
# MAGIC cd /dbfs/tmp/ship2ship/
# MAGIC # Earlier Ports_Major endpoint, kept for reference:
# MAGIC # wget -np -r -nH -L -q --cut-dirs=7 -O harbours.geojson "https://geo.dot.gov/mapping/rest/services/NTAD/Ports_Major/MapServer/0/query?outFields=*&where=1%3D1&f=geojson"
# MAGIC wget -np -r -nH -L -q --cut-dirs=7 -O harbours.geojson "https://geo.dot.gov/mapping/rest/services/NTAD/Strategic_Ports/MapServer/0/query?outFields=*&where=1%3D1&f=geojson"
# COMMAND ----------
# One metre expressed in decimal degrees at the equator
# (see http://wiki.gis.com/wiki/index.php/Decimal_degrees).
one_metre = 0.00001 - 0.000001
# A 10 km exclusion buffer around each harbour, in decimal degrees.
buffer = 10_000 * one_metre
# Parse the GeoJSON FeatureCollection and explode it to one row per feature.
harbour_features = (
    spark.read.format("json")
    .option("multiline", "true")
    .load("/tmp/ship2ship/harbours.geojson")
    .select("type", explode(col("features")).alias("feature"))
)
# Keep each feature's properties and geometry, convert the geometry to WKT,
# and buffer it by ~10 km around the port.
major_ports = (
    harbour_features.select(
        "type",
        col("feature.properties").alias("properties"),
        to_json(col("feature.geometry")).alias("json_geometry"),
    )
    .withColumn("geom", mos.st_aswkt(mos.st_geomfromgeojson("json_geometry")))
    .select(col("properties.PORT_NAME").alias("name"), "geom")
    .withColumn("geom", mos.st_buffer("geom", lit(buffer)))
)
display(major_ports)
# Next cell: render the buffered harbour polygons on a Kepler.gl map.
# COMMAND ----------
# MAGIC %%mosaic_kepler
# MAGIC major_ports "geom" "geometry"
# COMMAND ----------
# Tessellate each buffered harbour polygon into H3 cells at resolution 9 and
# persist the (name, h3) pairs as a Delta table for the overlap-detection steps.
tessellated = major_ports.select(
    "name", mos.grid_tessellateexplode("geom", lit(9)).alias("mos")
)
harbour_cells = tessellated.select("name", col("mos.index_id").alias("h3"))
harbour_cells.write.mode("overwrite").format("delta").saveAsTable(
    "ship2ship.harbours_h3"
)
# COMMAND ----------
# Read the persisted table back and preview it.
harbours_h3 = spark.read.table("ship2ship.harbours_h3")
display(harbours_h3)
# Next cell: preview the harbour H3 cells on a Kepler.gl map (up to 5 000 rows).
# COMMAND ----------
# MAGIC %%mosaic_kepler
# MAGIC "harbours_h3" "h3" "h3" 5_000
# COMMAND ----------