diff --git a/.github/workflows/test-harvest.yml b/.github/workflows/test-harvest.yml index c3a6ea55..264c3a7b 100644 --- a/.github/workflows/test-harvest.yml +++ b/.github/workflows/test-harvest.yml @@ -64,7 +64,7 @@ jobs: id: api-start run: cd api && supabase start | grep -w "service_role key" | cut -d ":" -f 2 | xargs | tr -d '\n' | awk '{print "service_role_key="$1}' >> "$GITHUB_OUTPUT" && cd .. - name: run the harvester - run: docker run --env PG_SERVER='0.0.0.0' --env SKIP_MAPBOX --env PG_DB --env PG_PORT --env PG_USER --env PG_PASS --env SUPABASE_URL --env SUPABASE_SERVICE_ROLE_KEY='${{ steps.api-start.outputs.service_role_key }}' --env SUPABASE_BUCKET_NAME --env MAPBOXTOKEN --env MAPBOXUSERNAME --env LOGGING --env OUTPUT --network host technologiestiftung/giessdenkiez-de-dwd-harvester:test + run: docker run --env PG_SERVER='0.0.0.0' --env SKIP_MAPBOX --env PG_DB --env PG_PORT --env PG_USER --env PG_PASS --env SUPABASE_URL --env SUPABASE_SERVICE_ROLE_KEY='${{ steps.api-start.outputs.service_role_key }}' --env LIMIT_DAYS='30' --env SURROUNDING_SHAPE_FILE='/app/assets/buffer.shp' --env SUPABASE_BUCKET_NAME --env MAPBOXTOKEN --env MAPBOXUSERNAME --env MAPBOXTILESET --env MAPBOXLAYERNAME --env LOGGING --env OUTPUT --network host technologiestiftung/giessdenkiez-de-dwd-harvester:test - name: stop the api run: cd api && supabase stop && cd .. release: diff --git a/.gitignore b/.gitignore index ab417c89..b09aaaf0 100644 --- a/.gitignore +++ b/.gitignore @@ -292,3 +292,14 @@ dist .yarn/build-state.yml .yarn/install-state.gz .pnp.* + +# Generated files +harvester/assets/berlin.shx +harvester/assets/buffer.cpg +harvester/assets/buffer.dbf +harvester/assets/buffer.prj +harvester/assets/buffer.shp +harvester/assets/buffer.dbf +harvester/assets/buffer.shx + +harvester/.vscode \ No newline at end of file diff --git a/README.md b/README.md index 18417dfd..34908407 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,9 @@ ![](https://img.shields.io/badge/Built%20with%20%E2%9D%A4%EF%B8%8F-at%20Technologiestiftung%20Berlin-blue) + + [![All Contributors](https://img.shields.io/badge/all_contributors-7-orange.svg?style=flat-square)](#contributors-) + # giessdenkiez-de-dwd-harvester @@ -47,20 +50,67 @@ The current python binding of gdal is fixed to GDAL==2.4.2. If you get another g Copy the `sample.env` file and rename to `.env` then update the parameters, most importantly the database connection parameters. +``` +PG_SERVER=localhost +PG_PORT=54322 +PG_USER=postgres +PG_PASS=postsgres +PG_DB=postgres +SUPABASE_URL=http://127.0.0.1:54321 +SUPABASE_SERVICE_ROLE=eyJh... +SUPABASE_BUCKET_NAME=data_assets +MAPBOXUSERNAME=your_mapbox_username +MAPBOXTOKEN=your_mapbox +MAPBOXTILESET=your_mapbox_tileset_id +MAPBOXLAYERNAME=your_mapbox_layer_name +SKIP_MAPBOX=False +LIMIT_DAYS=30 +SURROUNDING_SHAPE_FILE=./assets/buffer.shp +``` + ## Running -### Preparing the Buffer Shape -`harvester/prepare.py` shows how the assets/buffer.shp was created. If a bigger buffer is needed change `line 10` accordingly and re-run. +Starting from an empty database, the complete process of running the DWD harvester consists of three steps: + +1. Preparing the buffered shapefile +2. Creating the grid structure for the `radolan_geometry` table +3. Harvesting the DWD data -### Creating the Grid Structure -`harvester/grid/grid.py` can be used to populate the radolan_geometry table. This table contains vector data for the target city. The data is needed by the harvest process to find the rain data for the target city area. +### 1. 
Preparing the buffered shapefile -This tool currently works for Berlin. To make use of it for another city, just replace the `harvester/grid/buffer.shp` file with a suitable shape. (can be generated by `harvester/prepare.py` for example. See above) +Firstly, a buffered shapefile is needed, which is created with the following commands. This step is utilizing the `harvester/assets/berlin.prj` and `harvester/assets/berlin.shp` files. Make sure to set the environment variables properly before running this step. -### Running the Harvest Process -`harvester/harvester.py` is the actual file for harvesting the data. Simply run, no command line parameters, all settings are in `.env`. +- `cd harvester/prepare` +- `SHAPE_RESTORE_SHX=YES python create-buffer.py` -The code in `harvester/harvester.py` tries to clean up after running the code. But, when running this in a container, as the script is completely stand alone, its probably best to just destroy the whole thing and start from scratch next time. +### 2. Creating the grid structure for the `radolan_geometry` table + +Secondly, the `radolan_geometry` table needs to be populated. You need to have the buffered shapefile (from the previous step) created and available in `../assets`. The `radolan_geometry` table contains vector data for the target city. The data is needed by the harvest process to find the rain data for the target city area. This repository contains shape files for Berlin area. To make use of it for another city, replace the `harvester/assets/berlin.prj` and `harvester/assets/berlin.shp` files. Run the following commands to create the grid structure in the database: + +- `cd harvester/prepare` +- `python create-grid.py` + +### 3. Harvesting the DWD data + +Make sure to set the environment variables properly before running the script. Make sure that you have succesfully ran the previous steps for preparing the buffered shapefile and creating the grid structure for the `radolan_geometry` table. The file `harvester/src/run_harvester.py` contains the script for running the DWD harvester, it does the following: + +- Checks for existens of all required environment variables +- Setup database connection +- Get start end end date of current harvesting run (for incremental harvesting every day) +- Download all daily radolan files from DWD server +- Extracts the daily radolan files into hourly radolan files +- For each hourly radolan file: + - Projects the given data to Mercator, cuts out the area of interest. Using `gdalwarp` library. + - Produce a polygon feature layer. Using `gdal_polygonize.py` library. + - Extract raw radolan values from generate feature layer. + - Upload extracted radolan values to database +- Cleanup old radolan values in database (keep only last 30 days) +- Build a radolan grid holding the hourly radolan values for the last 30 days for each polygon in the grid. +- Updates `radolan_sum` and `radolan_values` columns in the database `trees` table +- Updates the Mapbox trees layer: + - Build a trees.csv file based on all trees (with updated radolan values) in the database + - Preprocess trees.csv using `tippecanoe` library. 
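The tippecanoe preprocessing referenced in the step above is spelled out later in this diff in `harvester/src/mapbox_tree_update.py`. As a condensed sketch (assuming the `tippecanoe` binary is on `PATH`, which the Dockerfile arranges by building it from source), the call amounts to:

```python
import os
import subprocess

# Columns written by generate_trees_csv() in harvester/src/mapbox_tree_update.py:
# id,lng,lat,radolan_sum,age,watering_sum,total_water_sum_liters
def preprocess_trees_csv(trees_csv_path: str, out_dir: str) -> str:
    """Convert trees.csv into an .mbtiles file that Mapbox can ingest."""
    mbtiles_path = os.path.join(out_dir, "trees-preprocessed.mbtiles")
    subprocess.call(
        [
            "tippecanoe",
            "-zg",                        # pick an appropriate max zoom automatically
            "-o", mbtiles_path,           # output tileset file
            "--force",                    # overwrite an existing output file
            "--drop-fraction-as-needed",  # drop features dynamically to keep tiles small
            trees_csv_path,
        ]
    )
    return mbtiles_path
```

The resulting `trees-preprocessed.mbtiles` is what gets uploaded to Supabase storage and to Mapbox's temporary S3 bucket before the tileset creation is triggered.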
+ - Start the creation of updated Mapbox layer ## Docker @@ -96,7 +146,6 @@ docker-compose up --build ``` - ## Contributors ✨ Thanks goes to these wonderful people ([emoji key](https://allcontributors.org/docs/en/emoji-key)): diff --git a/action.yml b/action.yml index 829efabc..b68ce0e7 100644 --- a/action.yml +++ b/action.yml @@ -50,6 +50,14 @@ inputs: description: "Set to 'True' to skip the Mapbox Tileset generation (for testing pipelines)" required: true default: "False" + LIMIT_DAYS: + description: "The number of days to harvest DWD data for" + required: true + default: "30" + SURROUNDING_SHAPE_FILE: + description: "The path to the shape file of the area of interest" + required: true + default: "assets/buffer.shp" runs: using: "docker" image: "harvester/Dockerfile" @@ -69,3 +77,5 @@ runs: LOGGING: ${{ inputs.LOGGING }} DATABASE_URL: ${{ inputs.DATABASE_URL }} SKIP_MAPBOX: ${{ inputs.SKIP_MAPBOX }} + LIMIT_DAYS: ${{ inputs.LIMIT_DAYS }} + SURROUNDING_SHAPE_FILE: ${{ inputs.SURROUNDING_SHAPE_FILE }} diff --git a/harvester/Dockerfile b/harvester/Dockerfile index 9f519cb0..cb61afa3 100644 --- a/harvester/Dockerfile +++ b/harvester/Dockerfile @@ -27,11 +27,8 @@ COPY --from=builder /install /usr/local RUN apt-get update && apt-get -y install git && apt-get -y install make RUN git clone https://github.com/mapbox/tippecanoe.git && cd tippecanoe && make -j && make install -# COPY harvester.py /app/ -# COPY prepare.py /app/ -# COPY grid/ grid/ -# COPY assets/ /app/assets COPY . /app/ +RUN cd /app/prepare && SHAPE_RESTORE_SHX=YES python create-buffer.py -CMD python /app/harvester.py && python /app/mapbox_tree_update.py +CMD python /app/src/run_harvester.py diff --git a/harvester/assets/Berlin.cpg b/harvester/assets/Berlin.cpg deleted file mode 100644 index 3ad133c0..00000000 --- a/harvester/assets/Berlin.cpg +++ /dev/null @@ -1 +0,0 @@ -UTF-8 \ No newline at end of file diff --git a/harvester/assets/Berlin.dbf b/harvester/assets/Berlin.dbf deleted file mode 100644 index 1ccd4116..00000000 Binary files a/harvester/assets/Berlin.dbf and /dev/null differ diff --git a/harvester/assets/Berlin.shx b/harvester/assets/Berlin.shx deleted file mode 100644 index 42083dc9..00000000 Binary files a/harvester/assets/Berlin.shx and /dev/null differ diff --git a/harvester/assets/Berlin.xml b/harvester/assets/Berlin.xml deleted file mode 100644 index b47714e8..00000000 --- a/harvester/assets/Berlin.xml +++ /dev/null @@ -1,384 +0,0 @@ - - -utf8 - - -dataset - - -dataset - - - - -ISO 19139 Geographic Information - Metadata - Implementation Specification - - -2007 - - - - - -Daten des amtlichen Liegenschaftskatsterinformationssystems (ALKIS) - Die Bezirksgrenzen der 12 Berliner Bezirke.Quelle: Geoportal BerlinVerarbeitungsprozesse: WFS &quot;ALKIS Bezirke&quot; wurde in ArcGIS Pro importiert, nach Web Mercator projiziert und als Web Layer in ArcGIS Online veröffentlicht. - - -ALKIS Berliner Bezirke. 
Stand: 31.12.2017 - - -Geoportal Berlin / ALKIS Berlin - Bezirke - - - - -Berlin - - -DE - - -Germany - - -Deutschland - - -Open Data - - -ALKIS - - -Grenzen - - -boundaries - - -Bezirke - - -bezirksgrenzen - - - - - - -Berlin - - -DE - - -Germany - - -Deutschland - - -Open Data - - -ALKIS - - -Grenzen - - -boundaries - - -Bezirke - - -bezirksgrenzen - - - - - - -Berlin - - -DE - - -Germany - - -Deutschland - - -Open Data - - -ALKIS - - -Grenzen - - -boundaries - - -Bezirke - - -bezirksgrenzen - - - - - - -Berlin - - -DE - - -Germany - - -Deutschland - - -Open Data - - -ALKIS - - -Grenzen - - -boundaries - - -Bezirke - - -bezirksgrenzen - - - - - - -Berlin - - -DE - - -Germany - - -Deutschland - - -Open Data - - -ALKIS - - -Grenzen - - -boundaries - - -Bezirke - - -bezirksgrenzen - - - - - - -Berlin - - -DE - - -Germany - - -Deutschland - - -Open Data - - -ALKIS - - -Grenzen - - -boundaries - - -Bezirke - - -bezirksgrenzen - - - - - - -Berlin - - -DE - - -Germany - - -Deutschland - - -Open Data - - -ALKIS - - -Grenzen - - -boundaries - - -Bezirke - - -bezirksgrenzen - - - - - - -Berlin - - -DE - - -Germany - - -Deutschland - - -Open Data - - -ALKIS - - -Grenzen - - -boundaries - - -Bezirke - - -bezirksgrenzen - - - - - - -Berlin - - -DE - - -Germany - - -Deutschland - - -Open Data - - -ALKIS - - -Grenzen - - -boundaries - - -Bezirke - - -bezirksgrenzen - - - - - - -Berlin - - -DE - - -Germany - - -Deutschland - - -Open Data - - -ALKIS - - -Grenzen - - -boundaries - - -Bezirke - - -bezirksgrenzen - - - - - - -Open Data  Nutzungsbedingungen NutzIII der Stadtentwicklung Berlin &quot;Geoportal Berlin / ALKIS Berlin - Bezirke&quot; - - - - - -utf8 - - - - diff --git a/harvester/assets/Berlin.prj b/harvester/assets/berlin.prj similarity index 100% rename from harvester/assets/Berlin.prj rename to harvester/assets/berlin.prj diff --git a/harvester/assets/Berlin.shp b/harvester/assets/berlin.shp similarity index 100% rename from harvester/assets/Berlin.shp rename to harvester/assets/berlin.shp diff --git a/harvester/assets/buffer.cpg b/harvester/assets/buffer.cpg deleted file mode 100644 index cd89cb97..00000000 --- a/harvester/assets/buffer.cpg +++ /dev/null @@ -1 +0,0 @@ -ISO-8859-1 \ No newline at end of file diff --git a/harvester/assets/buffer.dbf b/harvester/assets/buffer.dbf deleted file mode 100644 index 2c38708f..00000000 Binary files a/harvester/assets/buffer.dbf and /dev/null differ diff --git a/harvester/assets/buffer.prj b/harvester/assets/buffer.prj deleted file mode 100644 index 5c6f76df..00000000 --- a/harvester/assets/buffer.prj +++ /dev/null @@ -1 +0,0 @@ -PROJCS["WGS_1984_Web_Mercator_Auxiliary_Sphere",GEOGCS["GCS_WGS_1984",DATUM["D_WGS_1984",SPHEROID["WGS_1984",6378137.0,298.257223563]],PRIMEM["Greenwich",0.0],UNIT["Degree",0.0174532925199433]],PROJECTION["Mercator_Auxiliary_Sphere"],PARAMETER["False_Easting",0.0],PARAMETER["False_Northing",0.0],PARAMETER["Central_Meridian",0.0],PARAMETER["Standard_Parallel_1",0.0],PARAMETER["Auxiliary_Sphere_Type",0.0],UNIT["Meter",1.0]] \ No newline at end of file diff --git a/harvester/assets/buffer.shp b/harvester/assets/buffer.shp deleted file mode 100644 index bb3f18f6..00000000 Binary files a/harvester/assets/buffer.shp and /dev/null differ diff --git a/harvester/assets/buffer.shx b/harvester/assets/buffer.shx deleted file mode 100644 index 790ca451..00000000 Binary files a/harvester/assets/buffer.shx and /dev/null differ diff --git a/harvester/grid/grid-germany.asc b/harvester/assets/grid-germany.asc similarity index 100% rename from 
harvester/grid/grid-germany.asc rename to harvester/assets/grid-germany.asc diff --git a/harvester/grid/.vscode/launch.json b/harvester/grid/.vscode/launch.json deleted file mode 100644 index 441b3978..00000000 --- a/harvester/grid/.vscode/launch.json +++ /dev/null @@ -1,18 +0,0 @@ -{ - "configurations": [ - { - "name": "Python: Current File (Integrated Terminal)", - "type": "python", - "request": "launch", - "program": "${file}", - "console": "integratedTerminal" - }, - { - "name": "Python: Current File (External Terminal)", - "type": "python", - "request": "launch", - "program": "${file}", - "console": "externalTerminal" - } - ] -} \ No newline at end of file diff --git a/harvester/grid/buffer.cpg b/harvester/grid/buffer.cpg deleted file mode 100644 index cd89cb97..00000000 --- a/harvester/grid/buffer.cpg +++ /dev/null @@ -1 +0,0 @@ -ISO-8859-1 \ No newline at end of file diff --git a/harvester/grid/buffer.dbf b/harvester/grid/buffer.dbf deleted file mode 100644 index 2c38708f..00000000 Binary files a/harvester/grid/buffer.dbf and /dev/null differ diff --git a/harvester/grid/buffer.prj b/harvester/grid/buffer.prj deleted file mode 100644 index 5c6f76df..00000000 --- a/harvester/grid/buffer.prj +++ /dev/null @@ -1 +0,0 @@ -PROJCS["WGS_1984_Web_Mercator_Auxiliary_Sphere",GEOGCS["GCS_WGS_1984",DATUM["D_WGS_1984",SPHEROID["WGS_1984",6378137.0,298.257223563]],PRIMEM["Greenwich",0.0],UNIT["Degree",0.0174532925199433]],PROJECTION["Mercator_Auxiliary_Sphere"],PARAMETER["False_Easting",0.0],PARAMETER["False_Northing",0.0],PARAMETER["Central_Meridian",0.0],PARAMETER["Standard_Parallel_1",0.0],PARAMETER["Auxiliary_Sphere_Type",0.0],UNIT["Meter",1.0]] \ No newline at end of file diff --git a/harvester/grid/buffer.shp b/harvester/grid/buffer.shp deleted file mode 100644 index bb3f18f6..00000000 Binary files a/harvester/grid/buffer.shp and /dev/null differ diff --git a/harvester/grid/buffer.shx b/harvester/grid/buffer.shx deleted file mode 100644 index 790ca451..00000000 Binary files a/harvester/grid/buffer.shx and /dev/null differ diff --git a/harvester/harvester.py b/harvester/harvester.py deleted file mode 100644 index fd111e05..00000000 --- a/harvester/harvester.py +++ /dev/null @@ -1,322 +0,0 @@ -import geopandas -from shapely.wkt import dumps -import subprocess -import shutil -import gzip -import tarfile -import urllib.request -from datetime import timedelta -from datetime import datetime -import psycopg2.extras -import psycopg2 -from dotenv import load_dotenv -import logging -import os -from mapbox_tree_update import upload_file_to_supabase_storage - -# setting up logging -logging.basicConfig() -LOGGING_MODE = None -if "LOGGING" in os.environ: - LOGGING_MODE = os.getenv("LOGGING") - if LOGGING_MODE == "ERROR": - logging.root.setLevel(logging.ERROR) - elif LOGGING_MODE == "WARNING": - logging.root.setLevel(logging.WARNING) - elif LOGGING_MODE == "INFO": - logging.root.setLevel(logging.INFO) - else: - logging.root.setLevel(logging.NOTSET) -else: - logging.root.setLevel(logging.NOTSET) - -# loading the environmental variables -load_dotenv() - -# check if all required environmental variables are accessible -for env_var in ["PG_DB", "PG_PORT", "PG_USER", "PG_PASS", "SUPABASE_URL", "SUPABASE_BUCKET_NAME", "SUPABASE_SERVICE_ROLE_KEY"]: - if env_var not in os.environ: - logging.error( - "❌Environmental Variable {} does not exist".format(env_var)) - -# database connection - -pg_server = os.getenv("PG_SERVER") -pg_port = os.getenv("PG_PORT") -pg_username = os.getenv("PG_USER") -pg_password = 
os.getenv("PG_PASS") -pg_database = os.getenv("PG_DB") - -dsn = f"host='{pg_server}' port={pg_port} user='{pg_username}' password='{pg_password}' dbname='{pg_database}'" - -logging.info("🆙 Starting harvester v0.5") -# get last day of insert -last_date = None - -SUPABASE_URL = os.getenv('SUPABASE_URL') -SUPABASE_BUCKET_NAME = os.getenv('SUPABASE_BUCKET_NAME') -SUPABASE_SERVICE_ROLE_KEY = os.getenv('SUPABASE_SERVICE_ROLE_KEY') - - -try: - conn = psycopg2.connect(dsn) - logging.info("🗄 Database connection established") -except: - logging.error("❌Could not establish database connection") - conn = None - -with conn.cursor() as cur: - cur.execute("SELECT collection_date FROM radolan_harvester WHERE id = 1") - last_date = cur.fetchone()[0] - -logging.info("Last harvest {}".format(last_date)) - -# create a temporary folder to store the downloaded DWD data -path = "/temp/" -if os.path.isdir(path) != True: - os.mkdir(path) - -# now download all zips starting from last date, until yesterday (DWD usually uploads the latest data at some point during the night) - -enddate = datetime.now() + timedelta(days=-1) -date = datetime.combine(last_date, datetime.min.time()) - -while date <= enddate: - url = 'https://opendata.dwd.de/climate_environment/CDC/grids_germany/hourly/radolan/recent/asc/RW-{}.tar.gz'.format( - date.strftime("%Y%m%d")) - url_split = url.split("/") - dest_name = url_split[len(url_split) - 1] - dest = path + dest_name - - try: - urllib.request.urlretrieve(url, dest) - except: - logging.warning("❌Could not download {}".format(url)) - - date += timedelta(days=1) - logging.info("Downloading: {} / {}".format(enddate, date)) - -# unpack the data and delete the zips afterwards - -for (dirpath, dirnames, filenames) in os.walk(path): - for fileindex, filename in enumerate(filenames): - if ".tar.gz" in filename: - # first unzip - full_filename = path + filename - with gzip.open(full_filename, 'rb') as f_in: - with open(full_filename.split(".gz")[0], 'wb') as f_out: - shutil.copyfileobj(f_in, f_out) - os.remove(full_filename) - - # now untar - with tarfile.open(full_filename.split(".gz")[0], "r") as tar: - temp_path = full_filename.split(".tar")[0] - if os.path.isdir(temp_path) == False: - os.mkdir(temp_path) - tarlist = [] - for member in tar.getmembers(): - tarlist.append(member) - tar.extractall(temp_path, tarlist) - tar.close() - - os.remove(full_filename.split(".gz")[0]) - logging.info( - "Unzipping: {} / {}".format(len(filenames), fileindex+1)) - -# collecting all the files that need importing in one list -filelist = [] -for (dirpath, dirnames, filenames) in os.walk(path): - for dirname in dirnames: - dpath = path + "/" + dirname - for (ddirpath, ddirnames, ffilenames) in os.walk(dpath): - for ffilename in ffilenames: - filelist.append(dpath + "/" + ffilename) - -# Import into postgresql -# - filter data through the berlin buffer -# - polygonize that data -# - import into radolan_temp -# - join with radolan_geometry and insert into radolan_data -# - purge temporary files - -last_received = datetime.strptime("1970-01-01 01:00:00", '%Y-%m-%d %H:%M:%S') - -for counter, file in enumerate(filelist): - input_file = file - - file_split = file.split("/") - date_time_obj = datetime.strptime( - file_split[len(file_split)-1], 'RW_%Y%m%d-%H%M.asc') - if date_time_obj > last_received: - last_received = date_time_obj - logging.info("Processing: {} / {}".format(len(filelist), counter+1)) - - output_file = path + "temp.tif" - - # clean the temporary folder - for del_file in [output_file, path + "temp.shp", 
path + "temp.shx", path + "temp.prj", path + "temp.dbf"]: - if os.path.exists(del_file): - os.remove(del_file) - - # for some reason the python gdal bindings are ****. after hours of trying to get this to work in pure python, this has proven to be more reliable and efficient. sorry. - - # filter data - cmdline = ['gdalwarp', input_file, output_file, "-s_srs", "+proj=stere +lon_0=10.0 +lat_0=90.0 +lat_ts=60.0 +a=6370040 +b=6370040 +units=m", "-t_srs", - "+proj=stere +lon_0=10.0 +lat_0=90.0 +lat_ts=60.0 +a=6370040 +b=6370040 +units=m", "-r", "near", "-of", "GTiff", "-cutline", "/app/assets/buffer.shp"] - subprocess.call(cmdline) - - # polygonize data - cmdline = ['gdal_polygonize.py', output_file, "-f", - "ESRI Shapefile", path + "temp.shp", "temp", "MYFLD"] - subprocess.call(cmdline) - - cmdline = None - - df = geopandas.read_file(path + "temp.shp") - df = df.to_crs("epsg:3857") - - # if there was no rain in Berlin on that timestamp, there will be no data to insert - if df['geometry'].count() > 0: - clean = df[(df['MYFLD'] > 0) & (df['MYFLD'].notnull())] - if len(clean) > 0: - logging.info("🌧 Found some rain") - values = [] - for index, row in clean.iterrows(): - values.append( - [dumps(row.geometry, rounding_precision=5), row.MYFLD, date_time_obj]) - with conn.cursor() as cur: - # just to be sure - cur.execute("DELETE FROM radolan_temp;") - psycopg2.extras.execute_batch( - cur, - "INSERT INTO radolan_temp (geometry, value, measured_at) VALUES (ST_Multi(ST_Transform(ST_GeomFromText(%s, 3857), 4326)), %s, %s);", - values - ) - # in order to keep our database fast and small, we are not storing the original polygonized data, but instead we are using a grid and only store the grid ids and the corresponding precipitation data - cur.execute("INSERT INTO radolan_data (geom_id, value, measured_at) SELECT radolan_geometry.id, radolan_temp.value, radolan_temp.measured_at FROM radolan_geometry JOIN radolan_temp ON ST_WithIn(radolan_geometry.centroid, radolan_temp.geometry);") - cur.execute("DELETE FROM radolan_temp;") - conn.commit() - # memory management, just to be sure - values = None - clean = None - df = None - date_time_obj = None - file_split = None - os.remove(file) - FNULL = None - -# purge data older than 30 days -logging.info("cleaning up old data 🗑️") -timelimit = 30 -with conn.cursor() as cur: - cur.execute( - "DELETE FROM radolan_data WHERE measured_at < NOW() - INTERVAL '{} days'".format(timelimit)) - conn.commit() - - -# purge duplicates -logging.info("purging duplicates 🗑️") -with conn.cursor() as cur: - cur.execute("DELETE FROM radolan_data AS a USING radolan_data AS b WHERE a.id < b.id AND a.geom_id = b.geom_id AND a.measured_at = b.measured_at") - conn.commit() - -# get all grid cells, get data for last 30 days for each grid cell, generate a list for each grid cell -# as we don't store "0" events, those need to be generated, afterwards trees are updated and a geojson is being created - -# get the grid and weather data -logging.info("building grid 🌐") -grid = [] -with conn.cursor() as cur: - cur.execute("SELECT radolan_geometry.id, ST_AsGeoJSON(radolan_geometry.geometry), ARRAY_AGG(radolan_data.measured_at) AS measured_at, ARRAY_AGG(radolan_data.value) AS value FROM radolan_geometry JOIN radolan_data ON radolan_geometry.id = radolan_data.geom_id WHERE radolan_data.measured_at > NOW() - INTERVAL '{} days' GROUP BY radolan_geometry.id, radolan_geometry.geometry".format(timelimit)) - grid = cur.fetchall() - -# build clean, sorted arrays -clean = [] -for cell in grid: - enddate = 
datetime.now() + timedelta(days=-1) - enddate = enddate.replace(hour=23, minute=50, second=0, microsecond=0) - startdate = datetime.now() + timedelta(days=-timelimit) - startdate = startdate.replace(hour=0, minute=50, second=0, microsecond=0) - clean_data = [] - while startdate <= enddate: - found = False - for dateindex, date in enumerate(cell[2]): - if startdate == date: - found = True - clean_data.append(cell[3][dateindex]) - # TODO: Add the algorithm that calculates the actually absorbed amount of water (upper & lower threshold) - if found == False: - clean_data.append(0) - startdate += timedelta(hours=1) - clean.append(clean_data) - -# update statistics db -if len(filelist) > 0: - enddate = datetime.now() + timedelta(days=-1) - enddate = enddate.replace(hour=23, minute=50, second=0, microsecond=0) - startdate = datetime.now() + timedelta(days=-timelimit) - startdate = startdate.replace(hour=0, minute=50, second=0, microsecond=0) - with conn.cursor() as cur: - cur.execute("UPDATE radolan_harvester SET collection_date = %s, start_date = %s, end_date = %s WHERE id = 1", [ - last_received, startdate, enddate]) - conn.commit() - - # update the tree data - logging.info("updating trees 🌳") - values = [] - for cellindex, cell in enumerate(grid): - values.append([clean[cellindex], sum(clean[cellindex]), cell[1]]) - - with conn.cursor() as cur: - psycopg2.extras.execute_batch( - cur, - "UPDATE trees SET radolan_days = %s, radolan_sum = %s WHERE ST_CoveredBy(geom, ST_SetSRID(ST_GeomFromGeoJSON(%s), 4326));", - values - ) - conn.commit() - - # update all the trees we have missed with the first round :( - logging.info("updating sad trees 🌳") - with conn.cursor() as cur: - psycopg2.extras.execute_batch( - cur, - "UPDATE trees SET radolan_days = %s, radolan_sum = %s WHERE trees.radolan_sum IS NULL AND ST_CoveredBy(geom, ST_Buffer(ST_SetSRID(ST_GeomFromGeoJSON(%s), 4326), 0.0002));", - values - ) - conn.commit() - - values = None - - # generate geojson for map and upload to Supabase Storage - logging.info("generate geojson 🗺️") - - features = [] - features_light = [] - - for cellindex, cell in enumerate(grid): - feature_template = '{{"type":"Feature","geometry":{},"properties":{{"id":{},"data":[{}]}}}}' - features.append(feature_template.format( - cell[1], cell[0], ",".join(map(str, clean[cellindex])))) - features_light.append(feature_template.format( - cell[1], cell[0], sum(clean[cellindex]))) - - def finishGeojson(feature_list, file_name): - geojson = '{{"type":"FeatureCollection","properties":{{"start":"{}","end":"{}"}},"features":[{}]}}'.format( - startdate, enddate, ",".join(feature_list)) - - text_file = open(path + file_name, "w") - n = text_file.write(geojson) - text_file.close() - n = None - upload_file_to_supabase_storage(path + file_name, file_name) - - finishGeojson(features, "weather.geojson") - finishGeojson(features_light, "weather_light.geojson") - - # remove all temporary files - shutil.rmtree(path) - -else: - logging.info("No updates") - -conn.close() diff --git a/harvester/mapbox_tree_update.py b/harvester/mapbox_tree_update.py deleted file mode 100644 index a838dec6..00000000 --- a/harvester/mapbox_tree_update.py +++ /dev/null @@ -1,224 +0,0 @@ -import os -import requests -import json -import tempfile -import time -import shutil -import subprocess -import boto3 -from datetime import datetime -import psycopg2 -import logging -from tqdm import tqdm - -# Set the log level -logging.root.setLevel(logging.INFO) - -SKIP_MAPBOX = os.getenv("SKIP_MAPBOX") - -# Database connection 
parameters -pg_server = os.getenv("PG_SERVER") -pg_port = os.getenv("PG_PORT") -pg_username = os.getenv("PG_USER") -pg_password = os.getenv("PG_PASS") -pg_database = os.getenv("PG_DB") -dsn = f"host='{pg_server}' port={pg_port} user='{pg_username}' password='{pg_password}' dbname='{pg_database}'" - -# Supabase configuration -SUPABASE_URL = os.getenv("SUPABASE_URL") -SUPABASE_BUCKET_NAME = os.getenv("SUPABASE_BUCKET_NAME") -SUPABASE_SERVICE_ROLE_KEY = os.getenv("SUPABASE_SERVICE_ROLE_KEY") - - -# Function to check if a file exists in Supabase storage -def check_file_exists_in_supabase_storage(file_name): - url = f"{SUPABASE_URL}/storage/v1/object/info/public/{SUPABASE_BUCKET_NAME}/{file_name}" - response = requests.get(url) - return response.status_code == 200 - - -# Function to upload a file to Supabase storage -def upload_file_to_supabase_storage(file_path, file_name): - try: - with open(file_path, "rb") as file: - file_url = ( - f"{SUPABASE_URL}/storage/v1/object/{SUPABASE_BUCKET_NAME}/{file_name}" - ) - http_method = ( - requests.put - if check_file_exists_in_supabase_storage(file_name) - else requests.post - ) - response = http_method( - file_url, - files={"file": file}, - headers={ - "Authorization": f"Bearer {SUPABASE_SERVICE_ROLE_KEY}", - "ContentType": "application/geo+json", - "AcceptEncoding": "gzip, deflate, br", - }, - ) - - if response.status_code == 200: - logging.info(f"✅ Uploaded {file_name} to Supabase storage") - else: - logging.error(response.status_code) - logging.error(response.content) - logging.error(f"❌ Could not upload {file_name} to Supabase storage") - - except Exception as error: - logging.error(error) - logging.error(f"❌ Could not upload {file_name} to Supabase storage") - - -# Create a temporary directory -path = tempfile.mkdtemp() - -# Get the current year -current_year = datetime.now().year - -# Initialize the database connection -try: - conn = psycopg2.connect(dsn) - logging.info("🗄 Database connection established") -except Exception as e: - logging.error("❌ Could not establish a database connection") - conn = None - -if conn is not None: - with conn.cursor() as cur: - logging.info("Fetching trees from the database...") - # WARNING: The db is still mislabeled lat <> lng - cur.execute( - "SELECT trees.id, trees.lat, trees.lng, trees.radolan_sum, trees.pflanzjahr FROM trees WHERE ST_CONTAINS(ST_SetSRID (( SELECT ST_EXTENT (geometry) FROM radolan_geometry), 4326), trees.geom)" - ) - trees = cur.fetchall() - - header = "id,lng,lat,radolan_sum,age" - logging.info(f"Creating trees.csv file for {len(trees)} trees") - - lines = [] - for tree in tqdm(trees): - age = int(current_year) - int(tree[4]) if tree[4] != 0 else "" - line = "{},{},{},{},{}".format(tree[0], tree[1], tree[2], tree[3], age) - lines.append(line) - - trees_csv = "\n".join([header] + lines) - - trees_csv_full_path = os.path.join(path, "trees.csv") - trees_preprocessed_full_path = os.path.join(path, "trees-preprocessed.mbtiles") - - with open(trees_csv_full_path, "w") as out: - out.write(trees_csv) - - # Pre-process trees.csv with tippecanoe - logging.info("Preprocess trees.csv with tippecanoe...") - subprocess.call( - [ - "tippecanoe", - "-zg", - "-o", - trees_preprocessed_full_path, - "--force", - "--drop-fraction-as-needed", - trees_csv_full_path, - ] - ) - logging.info("Preprocess trees.csv with tippecanoe... 
Done.") - - # Upload preprocessed data to Supabase storage - upload_file_to_supabase_storage( - trees_preprocessed_full_path, "trees-preprocessed.mbtiles" - ) - - # Send the updated CSV to Mapbox - if SKIP_MAPBOX != "True": - try: - url = "https://api.mapbox.com/uploads/v1/{}/credentials?access_token={}".format( - os.getenv("MAPBOXUSERNAME"), os.getenv("MAPBOXTOKEN") - ) - response = requests.post(url) - s3_credentials = json.loads(response.content) - - # Upload the latest data to S3 - s3mapbox = boto3.client( - "s3", - aws_access_key_id=s3_credentials["accessKeyId"], - aws_secret_access_key=s3_credentials["secretAccessKey"], - aws_session_token=s3_credentials["sessionToken"], - ) - s3mapbox.upload_file( - trees_preprocessed_full_path, - s3_credentials["bucket"], - s3_credentials["key"], - ) - - # Tell Mapbox that new data has arrived - url = "https://api.mapbox.com/uploads/v1/{}?access_token={}".format( - os.getenv("MAPBOXUSERNAME"), os.getenv("MAPBOXTOKEN") - ) - payload = '{{"url":"http://{}.s3.amazonaws.com/{}","tileset":"{}.{}","name":"{}"}}'.format( - s3_credentials["bucket"], - s3_credentials["key"], - os.getenv("MAPBOXUSERNAME"), - os.getenv("MAPBOXTILESET"), - os.getenv("MAPBOXLAYERNAME"), - ) - headers = { - "content-type": "application/json", - "Accept-Charset": "UTF-8", - "Cache-Control": "no-cache", - } - response = requests.post(url, data=payload, headers=headers) - if response.status_code != 201: - logging.error("Could not generate Mapbox upload") - logging.error(response.content) - - upload_id = json.loads(response.content)["id"] - logging.info( - f"Initialized generation of Mapbox tilesets for upload={upload_id}..." - ) - - # Check for the status of Mapbox upload until completed or error - complete = False - error = None - while not complete and error is None: - url = "https://api.mapbox.com/uploads/v1/{}/{}?access_token={}".format( - os.getenv("MAPBOXUSERNAME"), upload_id, os.getenv("MAPBOXTOKEN") - ) - headers = { - "content-type": "application/json", - "Accept-Charset": "UTF-8", - "Cache-Control": "no-cache", - } - response = requests.get(url, headers=headers) - responseJson = json.loads(response.content) - complete = responseJson["complete"] - error = responseJson["error"] - progress = responseJson["progress"] - logging.info( - f"Waiting for tileset generation for upload={upload_id} progress={progress} complete={complete} error={error}" - ) - time.sleep(2) - - if error is not None: - logging.error(error) - exit(1) - - except Exception as error: - logging.error("Could not upload tree data to Mapbox for vector tiles") - logging.error(error) - exit(1) - else: - logging.info("Skipping Mapbox Tileset generation.") - - # Clean up - trees_csv = None - csv_data = None - -# Remove all temporary files -shutil.rmtree(path) - -# Close the database connection -if conn is not None: - conn.close() diff --git a/harvester/prepare.py b/harvester/prepare.py deleted file mode 100644 index 47135e2f..00000000 --- a/harvester/prepare.py +++ /dev/null @@ -1,16 +0,0 @@ -# building a buffer shape for filtering the weather data -import geopandas -from shapely.ops import cascaded_union - -berlin = geopandas.read_file("./assets/Berlin.shp") -berlin = berlin.to_crs("epsg:3857") -berlin_boundary = geopandas.GeoDataFrame(geopandas.GeoSeries(cascaded_union(berlin['geometry']))) -berlin_boundary = berlin_boundary.rename(columns={0:'geometry'}).set_geometry('geometry') - -berlin_buffer = berlin_boundary.buffer(2000) -berlin_buffer = berlin_buffer.simplify(1000) - -berlin_buffer = 
geopandas.GeoDataFrame(berlin_buffer) -berlin_buffer = berlin_buffer.rename(columns={0:'geometry'}).set_geometry('geometry') -berlin_buffer.crs = "epsg:3857" -berlin_buffer.to_file("./assets/buffer.shp") \ No newline at end of file diff --git a/harvester/prepare/create-buffer.py b/harvester/prepare/create-buffer.py new file mode 100644 index 00000000..8298ad6e --- /dev/null +++ b/harvester/prepare/create-buffer.py @@ -0,0 +1,21 @@ +# building a buffer shape for filtering the weather data +import geopandas +from shapely.ops import unary_union + +berlin = geopandas.read_file("../assets/berlin.shp") + +berlin = berlin.to_crs("epsg:3857") +berlin_boundary = geopandas.GeoDataFrame( + geopandas.GeoSeries(unary_union(berlin["geometry"])) +) +berlin_boundary = berlin_boundary.rename(columns={0: "geometry"}).set_geometry( + "geometry" +) + +berlin_buffer = berlin_boundary.buffer(2000) +berlin_buffer = berlin_buffer.simplify(1000) + +berlin_buffer = geopandas.GeoDataFrame(berlin_buffer) +berlin_buffer = berlin_buffer.rename(columns={0: "geometry"}).set_geometry("geometry") +berlin_buffer.crs = "epsg:3857" +berlin_buffer.to_file("../assets/buffer.shp") diff --git a/harvester/grid/grid.py b/harvester/prepare/create-grid.py similarity index 74% rename from harvester/grid/grid.py rename to harvester/prepare/create-grid.py index b7b32b2e..71fdc86a 100644 --- a/harvester/grid/grid.py +++ b/harvester/prepare/create-grid.py @@ -16,6 +16,7 @@ import linecache from shapely.wkt import dumps from dotenv import load_dotenv + load_dotenv() logging.basicConfig() @@ -23,8 +24,8 @@ # name of folder that contains the data temp = "temp" -# if os.path.isdir(temp) != True: -# os.mkdir(temp) +if os.path.isdir(temp) != True: + os.mkdir(temp) # # grid file # if (len(sys.argv) < 2): @@ -37,8 +38,7 @@ # check if all required environmental variables are accessible for env_var in ["PG_DB", "PG_PORT", "PG_USER", "PG_PASS", "PG_SERVER"]: if env_var not in os.environ: - logging.error( - "❌Environmental Variable {} does not exist".format(env_var)) + logging.error("❌Environmental Variable {} does not exist".format(env_var)) pg_server = os.getenv("PG_SERVER") pg_port = os.getenv("PG_PORT") @@ -58,7 +58,7 @@ conn = None # we need to give each grid cell a unique value, otherwise gdal_polygonize will combine cells with equal values -base_grid_file = "grid-germany.asc" +base_grid_file = "../assets/grid-germany.asc" asc_data = numpy.loadtxt(base_grid_file, skiprows=6) col_value = 1 for r_idx, row in enumerate(asc_data): @@ -66,32 +66,53 @@ asc_data[r_idx][c_idx] = col_value col_value += 1 -header = linecache.getline(base_grid_file, 1) + \ - linecache.getline(base_grid_file, 2) + \ - linecache.getline(base_grid_file, 3) + \ - linecache.getline(base_grid_file, 4) + \ - linecache.getline(base_grid_file, 5) + \ - linecache.getline(base_grid_file, 6) - -numpy.savetxt(temp + "/grid-transform.asc", asc_data, - header=header.rstrip(), comments='', fmt='%i') +header = ( + linecache.getline(base_grid_file, 1) + + linecache.getline(base_grid_file, 2) + + linecache.getline(base_grid_file, 3) + + linecache.getline(base_grid_file, 4) + + linecache.getline(base_grid_file, 5) + + linecache.getline(base_grid_file, 6) +) + +numpy.savetxt( + temp + "/grid-transform.asc", + asc_data, + header=header.rstrip(), + comments="", + fmt="%i", +) # use gdalwarp as commandline tool to crop the example weather dataset to the outline of our area of interest that is defined by the file buffer.shp # with the arguments -s_srs and -t-srs we set the spatial reference 
systems for the source and the target files # for our example weather data the srs is: "+proj=stere +lon_0=10.0 +lat_0=90.0 +lat_ts=60.0 +a=6370040 +b=6370040 +units=m" -cmdline = ['gdalwarp', temp + "/grid-transform.asc", temp + "/grid-buffer.asc", "-s_srs", "+proj=stere +lon_0=10.0 +lat_0=90.0 +lat_ts=60.0 +a=6370040 +b=6370040 +units=m", - "-t_srs", "+proj=stere +lon_0=10.0 +lat_0=90.0 +lat_ts=60.0 +a=6370040 +b=6370040 +units=m", "-r", "near", "-of", "GTiff", "-cutline", "buffer.shp"] +cmdline = [ + "gdalwarp", + temp + "/grid-transform.asc", + temp + "/grid-buffer.asc", + "-s_srs", + "+proj=stere +lon_0=10.0 +lat_0=90.0 +lat_ts=60.0 +a=6370040 +b=6370040 +units=m", + "-t_srs", + "+proj=stere +lon_0=10.0 +lat_0=90.0 +lat_ts=60.0 +a=6370040 +b=6370040 +units=m", + "-r", + "near", + "-of", + "GTiff", + "-cutline", + "../assets/buffer.shp", +] subprocess.call(cmdline) -# use gdal_polygonize to transform the grid into a vector. a polygon is created for each pixel in our area of interest with a unique ID (MYFLD). these polygons form our grid and are saved in a shapefile. +# use gdal_polygonize to transform the grid into a vector. a polygon is created for each pixel in our area of interest with a unique ID (MYFLD). these polygons form our grid and are saved in a shapefile. cmdline = [ "gdal_polygonize.py", temp + "/grid-buffer.asc", - "-f", "ESRI Shapefile", + "-f", + "ESRI Shapefile", temp + "/grid.shp", "temp", "MYFLD", - "-q" + "-q", ] subprocess.call(cmdline) @@ -99,8 +120,8 @@ df = geopandas.read_file(temp + "/grid.shp") df = df.to_crs("epsg:4326") -if df['geometry'].count() > 0: - clean = df[(df['MYFLD'].notnull())] # (df['MYFLD'] > 0) & +if df["geometry"].count() > 0: + clean = df[(df["MYFLD"].notnull())] # (df['MYFLD'] > 0) & if len(clean) > 0: values = [] for index, row in clean.iterrows(): @@ -111,12 +132,13 @@ psycopg2.extras.execute_batch( cur, "INSERT INTO public.radolan_geometry (geometry) VALUES (ST_GeomFromText(%s, 4326));", - values + values, ) conn.commit() cur.execute( - "UPDATE public.radolan_geometry SET centroid = ST_Centroid(geometry);") + "UPDATE public.radolan_geometry SET centroid = ST_Centroid(geometry);" + ) conn.commit() cur.close() diff --git a/harvester/sample.env b/harvester/sample.env index efc25739..676cdb80 100644 --- a/harvester/sample.env +++ b/harvester/sample.env @@ -1,33 +1,15 @@ -# PostgreSQL Server for storing the data -PG_SERVER= -PG_PORT=5432 -PG_USER= -PG_PASS= -PG_DB= - -# Supabase Storage access data to store the resulting geojson/csv files -SUPABASE_URL= -SUPABASE_SERVICE_ROLE_KEY= - -SUPABASE_BUCKET_NAME= - -# Generation of maptiles through mapbox requires api credentials and desired tileset name -MAPBOXUSERNAME= -MAPBOXTOKEN= -MAPBOXTILESET= -MAPBOXLAYERNAME= - -# OUTPUT=True -LOGGING=INFO -#ERROR, WARNING, INFO -# for your docker environment -# PG_SERVER=postgres -# PG_PORT=5432 -# PG_USER=postgres -# PG_PASS=postgres_password -# PG_DB=postgres -# SUPABASE_URL= -# SUPABASE_SERVICE_ROLE_KEY= - -# SUPABASE_BUCKET_NAME= -# OUTPUT=True +PG_SERVER=localhost +PG_PORT=54322 +PG_USER=postgres +PG_PASS=postsgres +PG_DB=postgres +SUPABASE_URL=http://127.0.0.1:54321 +SUPABASE_SERVICE_ROLE=eyJh... 
+SUPABASE_BUCKET_NAME=data_assets +MAPBOXUSERNAME=your_mapbox_username +MAPBOXTOKEN=your_mapbox +MAPBOXTILESET=your_mapbox_tileset_id +MAPBOXLAYERNAME=your_mapbox_layer_name +SKIP_MAPBOX=False +LIMIT_DAYS=30 +SURROUNDING_SHAPE_FILE=./assets/buffer.shp \ No newline at end of file diff --git a/harvester/src/build_radolan_grid.py b/harvester/src/build_radolan_grid.py new file mode 100644 index 00000000..492c2618 --- /dev/null +++ b/harvester/src/build_radolan_grid.py @@ -0,0 +1,95 @@ +from datetime import datetime +from datetime import timedelta +import logging + + +def build_radolan_grid(limit_days, db_conn): + """Builds a radolon grid based on radolon data in database + + Args: + limit_days (number): number of previous days to harvest data for + db_conn (_type_): the database connection + + + Returns: + _type_: grid of radolan data containing hourly radolan data for each hour of every polygon, structure below: + [ + [ + [radolon_hour_0, radolon_hour_1, ..., radolon_hour_x], + 25, + '{"type":"Polygon","coordinates":[coordinates_for_polygon_0]}'] + ], + + [ + [radolon_hour_0, radolon_hour_1, ..., radolon_hour_x], + 3, + '{"type":"Polygon","coordinates":[coordinates_for_polygon_1]}'] + ], + + ... + + [ + [radolon_hour_0, radolon_hour_1, ..., radolon_hour_x], + 40, + '{"type":"Polygon","coordinates":[coordinates_for_polygon_x]}'] + ] + ] + """ + logging.info(f"Building radolan grid for last {limit_days} days...") + grid = [] + with db_conn.cursor() as cur: + cur.execute( + """ + SELECT + radolan_geometry.id AS geometry_id, + ST_AsGeoJSON(radolan_geometry.geometry) AS geometry_geojson, + ARRAY_AGG(radolan_data.measured_at) AS measured_at, + ARRAY_AGG(radolan_data.value) AS value + FROM + radolan_geometry + JOIN radolan_data ON radolan_geometry.id = radolan_data.geom_id + WHERE + radolan_data.measured_at > NOW() - INTERVAL '{} days' + GROUP BY + radolan_geometry.id, + radolan_geometry.geometry; + """.format( + limit_days + ) + ) + grid = cur.fetchall() + db_conn.commit() + + end_date = datetime.now() + timedelta(days=-1) + end_date = end_date.replace(hour=23, minute=50, second=0, microsecond=0) + start_date = datetime.now() + timedelta(days=-limit_days) + start_date = start_date.replace(hour=0, minute=50, second=0, microsecond=0) + + grid_radolan_values = [] + for cell in grid: + measured_dates = cell[2] + measured_radolan_values = cell[3] + radolan_values_for_cell = [] + loop_date = start_date + while loop_date <= end_date: + found = False + for date_index, date in enumerate(measured_dates): + if loop_date == date: + found = True + radolan_values_for_cell.append(measured_radolan_values[date_index]) + if found == False: + radolan_values_for_cell.append(0) + loop_date += timedelta(hours=1) + grid_radolan_values.append(radolan_values_for_cell) + + formatted_grid_radolan_values = [] + for cell_index, cell in enumerate(grid): + formatted_grid_radolan_values.append( + [ + grid_radolan_values[cell_index], + sum(grid_radolan_values[cell_index]), + cell[1], + ] + ) + + return formatted_grid_radolan_values diff --git a/harvester/src/download_radolan_data.py b/harvester/src/download_radolan_data.py new file mode 100644 index 00000000..3ed626e2 --- /dev/null +++ b/harvester/src/download_radolan_data.py @@ -0,0 +1,70 @@ +from datetime import timedelta +import urllib.request +import logging +import os +import gzip +import tarfile +import shutil + +# We are using Radolan data from DWD +# https://www.dwd.de/DE/leistungen/radolan/radolan.html +# 
https://opendata.dwd.de/climate_environment/CDC/grids_germany/hourly/radolan/recent/asc/DESCRIPTION_gridsgermany-hourly-radolan-recent-asc_en.pdf +url = f"https://opendata.dwd.de/climate_environment/CDC/grids_germany/hourly/radolan/recent/asc" + + +def download_radolan_data(start_date, end_date, path): + """Download Radolan data from DWD + Args: + start_date (str): The first day to download Radolan data for + end_date (str): The last day to download Radolan data for + path (str): The full path where the downloaded files should be stored + Returns: + list[str]: List of file paths of the downloaded files. Each file contains zipped Radolan data files for each hour of the day. + """ + downloaded_files = [] + while start_date <= end_date: + date_str = start_date.strftime("%Y%m%d") + file_name = f"RW-{date_str}.tar.gz" + download_url = f"{url}/{file_name}" + dest_file = os.path.join(path, file_name) + urllib.request.urlretrieve(download_url, dest_file) + downloaded_files.append(dest_file) + start_date += timedelta(days=1) + logging.info(f"Downloading {download_url}...") + + return downloaded_files + + +def unzip_radolan_data(zipped_radar_files, root_path): + """Extract the previously downloaded Radolan files to get the hourly Radolan files + Args: + zipped_radar_files (list[str]): List of zipped Radolan files + root_path (str): Path where the extracted files should be stored + Returns: + list[str]: List of paths to the extracted Radolan files. Each file contains the hourly Radolan data. + """ + for _, filename in enumerate(zipped_radar_files): + if filename.endswith(".tar.gz"): + # Unzip + with gzip.open(filename, "rb") as f_in: + with open(filename[:-3], "wb") as f_out: + shutil.copyfileobj(f_in, f_out) + os.remove(filename) + + # Untar + with tarfile.open(filename[:-3], "r") as tar: + temp_path = filename[:-7] + os.makedirs(temp_path, exist_ok=True) + tar.extractall(path=temp_path) + tar.close() + + os.remove(filename[:-3]) + logging.info(f"Extracting hourly Radolan files from: {filename}...") + + unzipped_files = [ + os.path.join(root, file) + for root, _, files in os.walk(root_path) + for file in files + ] + + return unzipped_files diff --git a/harvester/src/dwd_harvest.py b/harvester/src/dwd_harvest.py new file mode 100644 index 00000000..37b95e51 --- /dev/null +++ b/harvester/src/dwd_harvest.py @@ -0,0 +1,67 @@ +from datetime import datetime +import tempfile +from download_radolan_data import download_radolan_data, unzip_radolan_data +from project_radolan_data import project_radolan_data, polygonize_data +from extract_radolan_data import extract_radolan_data_from_shapefile +from radolan_db_utils import ( + upload_radolan_data_in_db, + cleanup_radolan_entries, +) +from build_radolan_grid import build_radolan_grid + + +def harvest_dwd( + surrounding_shape_file, start_date, end_date, limit_days, database_connection +): + """Starts harvesting DWD radolan data based on start_date and end_date. + Builds a grid of radolan data containing hourly radolan data for every polygon in the grid. 
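`harvest_dwd` is driven by `harvester/src/run_harvester.py`, which this diff references but does not include. A hypothetical driver, assuming only the signatures visible in this diff (`get_start_end_harvest_dates`, `harvest_dwd`, `update_trees_in_database`) and the variables from `sample.env`, might look like:

```python
import os
import psycopg2
from dotenv import load_dotenv
from dwd_harvest import harvest_dwd
from radolan_db_utils import get_start_end_harvest_dates, update_trees_in_database

# Hypothetical sketch of a caller; the real entry point is run_harvester.py,
# which is not part of this excerpt.
load_dotenv()
conn = psycopg2.connect(
    host=os.getenv("PG_SERVER"),
    port=os.getenv("PG_PORT"),
    user=os.getenv("PG_USER"),
    password=os.getenv("PG_PASS"),
    dbname=os.getenv("PG_DB"),
)

limit_days = int(os.getenv("LIMIT_DAYS", "30"))
start_date, end_date = get_start_end_harvest_dates(conn)

# Download, project, polygonize and store the DWD radolan data,
# then build the per-polygon grid of hourly values.
radolan_grid = harvest_dwd(
    surrounding_shape_file=os.getenv("SURROUNDING_SHAPE_FILE"),
    start_date=start_date,
    end_date=end_date,
    limit_days=limit_days,
    database_connection=conn,
)

# Write the aggregated values back to the trees table.
update_trees_in_database(radolan_grid, conn)
conn.close()
```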
+ Args: + surrounding_shape_file (shapefile): shapefile for area of interest + start_date (datetime): first day of radolon data to harvest + end_date (datetime): last day of radolon data to harvest + limit_days (number): number of previous days to harvest data for + database_connection (_type_): database connection + Returns: + _type_: grid of radolan data + """ + with tempfile.TemporaryDirectory() as temp_dir: + + # Download daily Radolan files from DWD for whole Germany + daily_radolan_files = download_radolan_data(start_date, end_date, temp_dir) + + # Extract downloaded daily Radolan files into hourly Radolan data files + hourly_radolan_files = unzip_radolan_data(daily_radolan_files, temp_dir) + + # Process all hourly Radolan files + for hourly_radolan_file in hourly_radolan_files: + + filename = hourly_radolan_file.split("/")[-1] + measured_at_timestamp = datetime.strptime(filename, "RW_%Y%m%d-%H%M.asc") + + with tempfile.TemporaryDirectory() as hourly_temp_dir: + + # Generate projected GeoTIFF file containing projected data for given shape file only + projected_radolan_geotiff = project_radolan_data( + hourly_radolan_file, surrounding_shape_file, hourly_temp_dir + ) + + # Polygonize given GeoTIFF file + polygonized_radolan = polygonize_data( + projected_radolan_geotiff, hourly_temp_dir + ) + + # Extract Radolan data + extracted_radolan_values = extract_radolan_data_from_shapefile( + polygonized_radolan, measured_at_timestamp + ) + + # Update Radolan data in DB + upload_radolan_data_in_db(extracted_radolan_values, database_connection) + + # After all database inserts, cleanup db + _ = cleanup_radolan_entries(limit_days, database_connection) + + # Build radolan grid based on database values + radolan_grid = build_radolan_grid(limit_days, database_connection) + + return radolan_grid diff --git a/harvester/src/extract_radolan_data.py b/harvester/src/extract_radolan_data.py new file mode 100644 index 00000000..ae502925 --- /dev/null +++ b/harvester/src/extract_radolan_data.py @@ -0,0 +1,37 @@ +import geopandas +from shapely.wkt import dumps +import logging + + +# Resources: +# https://epsg.io/3857 +# https://doc.arcgis.com/en/arcgis-online/reference/shapefiles.htm +def extract_radolan_data_from_shapefile(polygonized_shape_file, measured_at_timestamp): + """Extract radolon values from given shapefile + + Args: + polygonized_shape_file (_type_): the shapefile data should be extracted from + measured_at_timestamp (_type_): the timestamp of the extraction + + Returns: + _type_: list of extracted radolon data for each cell in the shapefile + """ + logging.info(f"Extracting radolan data for {polygonized_shape_file}...") + radolan_field_key = "RDLFIELD" + df = geopandas.read_file(polygonized_shape_file) + df = df.to_crs("epsg:3857") + values = [] + if df["geometry"].count() > 0: + radolan_data = df[ + (df[radolan_field_key] > 0) & (df[radolan_field_key].notnull()) + ] + if len(radolan_data) > 0: + for _, row in radolan_data.iterrows(): + values.append( + [ + dumps(row.geometry, rounding_precision=5), + row[radolan_field_key], + measured_at_timestamp, + ] + ) + return values diff --git a/harvester/src/mapbox_tree_update.py b/harvester/src/mapbox_tree_update.py new file mode 100644 index 00000000..cc7502a8 --- /dev/null +++ b/harvester/src/mapbox_tree_update.py @@ -0,0 +1,156 @@ +import os +import tempfile +import subprocess +from datetime import datetime +import logging +from tqdm import tqdm +from mapbox_utils import ( + upload_to_mapbox_storage, + start_tileset_creation, + 
wait_for_tileset_creation_complete, +) +from supabase_utils import upload_file_to_supabase_storage + + +def preprocess_trees_csv(trees_csv_full_path, temp_dir): + """Preprocesses the given trees.csv file with tippecanoe to fulfill the requirements from Mapbox + to create a tileset + + Args: + trees_csv_full_path (str): the full path to the trees.csv + temp_dir (str): the full path to the directory to store the preprocessed file + + Returns: + str: full path to the preprocessed trees file + """ + logging.info("Preprocessing trees.csv with tippecanoe...") + trees_preprocessed_full_path = os.path.join(temp_dir, "trees-preprocessed.mbtiles") + subprocess.call( + [ + "tippecanoe", + "-zg", + "-o", + trees_preprocessed_full_path, + "--force", + "--drop-fraction-as-needed", + trees_csv_full_path, + ] + ) + return trees_preprocessed_full_path + + +def generate_trees_csv(temp_dir, db_conn): + """Generate a trees.csv file containing all trees currently in the databae + + Args: + temp_dir (str): the full path to the directory to store the preprocessed file + db_conn: the database connection + + Returns: + str: full path to the trees.csv file + """ + logging.info(f"Generating trees.csv...") + current_year = datetime.now().year + with db_conn.cursor() as cur: + + # Fetch all trees from database + cur.execute( + # WARNING: The coordinates in the database columns lat and lng are mislabeled! They mean the opposite. + """ + SELECT + trees.id, + trees.lat, + trees.lng, + trees.radolan_sum, + trees.pflanzjahr, + (SELECT COALESCE(SUM(w.amount), 0)::INT AS total_amount FROM trees_watered w WHERE w.timestamp >= CURRENT_DATE - INTERVAL '30 days' AND DATE_TRUNC('day', w.timestamp) < CURRENT_DATE AND w.tree_id = trees.id) as watering_sum + FROM + trees + WHERE + ST_CONTAINS(ST_SetSRID (( + SELECT + ST_EXTENT (geometry) + FROM radolan_geometry), 4326), trees.geom) + """ + ) + trees = cur.fetchall() + logging.info(f"Creating trees.csv file for {len(trees)} trees...") + + # Build CSV file with all trees in it + header = "id,lng,lat,radolan_sum,age,watering_sum,total_water_sum_liters" + lines = [] + for tree in tqdm(trees): + id = tree[0] + lat = tree[1] + lng = tree[2] + # precipitation height in 0.1 mm per square meter + # 1mm on a square meter is 1 liter + # e.g. 
value of 380 = 0.1 * 380 = 38.0 mm * 1 liter = 38 liters + radolan_sum = tree[3] + pflanzjahr = tree[4] + watering_sum = tree[5] + age = int(current_year) - int(pflanzjahr) if int(pflanzjahr) != 0 else "" + # calculated in liters to be easily usable in the frontend + total_water_sum_liters = (radolan_sum / 10) + watering_sum + line = f"{id}, {lat}, {lng}, {radolan_sum}, {age}, {watering_sum}, {total_water_sum_liters}" + lines.append(line) + trees_csv = "\n".join([header] + lines) + trees_csv_full_path = os.path.join(temp_dir, "trees.csv") + with open(trees_csv_full_path, "w") as out: + out.write(trees_csv) + + return trees_csv_full_path + + +def update_mapbox_tree_layer( + mapbox_username, + mapbox_token, + mapbox_tileset, + mapbox_layer_name, + supabase_url, + supabase_bucket_name, + supabase_service_role_key, + db_conn, +): + + with tempfile.TemporaryDirectory() as temp_dir: + # Generate trees.csv from trees in database + trees_csv_full_path = generate_trees_csv(temp_dir, db_conn) + + # Preprocess trees.csv with tippecanoe + trees_preprocessed_full_path = preprocess_trees_csv( + trees_csv_full_path, temp_dir + ) + + # Upload preprocessed trees to Supabase storage + upload_file_to_supabase_storage( + supabase_url, + supabase_bucket_name, + supabase_service_role_key, + trees_preprocessed_full_path, + "trees-preprocessed.mbtiles", + ) + + # Start the Mapbox tileset creating + mapbox_storage_credentials = upload_to_mapbox_storage( + trees_preprocessed_full_path, + mapbox_username, + mapbox_token, + ) + + tileset_generation_id = start_tileset_creation( + mapbox_storage_credentials, + mapbox_username, + mapbox_tileset, + mapbox_layer_name, + mapbox_token, + ) + + tileset_creation_error = wait_for_tileset_creation_complete( + tileset_generation_id, + mapbox_username, + mapbox_token, + ) + + if tileset_creation_error is not None: + logging.error("Could not create Mapbox tileset") diff --git a/harvester/src/mapbox_utils.py b/harvester/src/mapbox_utils.py new file mode 100644 index 00000000..d6cb8d79 --- /dev/null +++ b/harvester/src/mapbox_utils.py @@ -0,0 +1,122 @@ +import requests +import json +import time +import boto3 +import logging + + +def upload_to_mapbox_storage(path_to_file, mapbox_username, mapbox_token): + """Uploads the given file to a Mapbox storage + + Args: + path_to_file (str): the full path to the file to upload + mapbox_username (str): the Mapbox username + mapbox_token (str): the Mapbox token + + Returns: + _type_: Mabox credentials dictionary holding bucket and key information + """ + logging.info(f"Uploading data to Mapbox storage...") + url = "https://api.mapbox.com/uploads/v1/{}/credentials?access_token={}".format( + mapbox_username, mapbox_token + ) + response = requests.post(url) + mapbox_storage_credentials = json.loads(response.content) + + s3mapbox = boto3.client( + "s3", + aws_access_key_id=mapbox_storage_credentials["accessKeyId"], + aws_secret_access_key=mapbox_storage_credentials["secretAccessKey"], + aws_session_token=mapbox_storage_credentials["sessionToken"], + ) + + s3mapbox.upload_file( + path_to_file, + mapbox_storage_credentials["bucket"], + mapbox_storage_credentials["key"], + ) + + return mapbox_storage_credentials + + +def start_tileset_creation( + mapbox_storage_credentials, + mapbox_username, + mapbox_tileset, + mapbox_layer_name, + mapbox_token, +): + """Start the Mapbox tileset creation + + Args: + path_tomapbox_storage_credentials_file (dict): dictionary holding bucket and key of the data + mapbox_username (str): the Mapbox username + mapbox_tileset 
diff --git a/harvester/src/mapbox_utils.py b/harvester/src/mapbox_utils.py
new file mode 100644
index 00000000..d6cb8d79
--- /dev/null
+++ b/harvester/src/mapbox_utils.py
@@ -0,0 +1,122 @@
+import requests
+import json
+import time
+import boto3
+import logging
+
+
+def upload_to_mapbox_storage(path_to_file, mapbox_username, mapbox_token):
+    """Uploads the given file to Mapbox storage
+
+    Args:
+        path_to_file (str): the full path to the file to upload
+        mapbox_username (str): the Mapbox username
+        mapbox_token (str): the Mapbox token
+
+    Returns:
+        dict: Mapbox credentials dictionary holding bucket and key information
+    """
+    logging.info("Uploading data to Mapbox storage...")
+    url = "https://api.mapbox.com/uploads/v1/{}/credentials?access_token={}".format(
+        mapbox_username, mapbox_token
+    )
+    response = requests.post(url)
+    mapbox_storage_credentials = json.loads(response.content)
+
+    s3mapbox = boto3.client(
+        "s3",
+        aws_access_key_id=mapbox_storage_credentials["accessKeyId"],
+        aws_secret_access_key=mapbox_storage_credentials["secretAccessKey"],
+        aws_session_token=mapbox_storage_credentials["sessionToken"],
+    )
+
+    s3mapbox.upload_file(
+        path_to_file,
+        mapbox_storage_credentials["bucket"],
+        mapbox_storage_credentials["key"],
+    )
+
+    return mapbox_storage_credentials
+
+
+def start_tileset_creation(
+    mapbox_storage_credentials,
+    mapbox_username,
+    mapbox_tileset,
+    mapbox_layer_name,
+    mapbox_token,
+):
+    """Starts the Mapbox tileset creation
+
+    Args:
+        mapbox_storage_credentials (dict): dictionary holding bucket and key of the uploaded data
+        mapbox_username (str): the Mapbox username
+        mapbox_tileset (str): the Mapbox tileset name
+        mapbox_layer_name (str): the Mapbox layer name
+        mapbox_token (str): the Mapbox token
+
+    Returns:
+        str: Mapbox upload id of the started tileset creation
+    """
+    logging.info("Starting Mapbox tileset creation...")
+    url = "https://api.mapbox.com/uploads/v1/{}?access_token={}".format(
+        mapbox_username, mapbox_token
+    )
+    payload = '{{"url":"http://{}.s3.amazonaws.com/{}","tileset":"{}.{}","name":"{}"}}'.format(
+        mapbox_storage_credentials["bucket"],
+        mapbox_storage_credentials["key"],
+        mapbox_username,
+        mapbox_tileset,
+        mapbox_layer_name,
+    )
+    headers = {
+        "content-type": "application/json",
+        "Accept-Charset": "UTF-8",
+        "Cache-Control": "no-cache",
+    }
+    response = requests.post(url, data=payload, headers=headers)
+    if response.status_code != 201:
+        logging.error("Could not start Mapbox tileset creation")
+        logging.error(response.content)
+
+    upload_id = json.loads(response.content)["id"]
+
+    return upload_id
+
+
+def wait_for_tileset_creation_complete(
+    tileset_generation_id, mapbox_username, mapbox_token
+):
+    """Checks the progress of the Mapbox tileset creation process
+
+    Args:
+        tileset_generation_id (str): the ID of the started Mapbox tileset creation
+        mapbox_username (str): the Mapbox username
+        mapbox_token (str): the Mapbox token
+
+    Returns:
+        error: None, if no error occurred
+    """
+    complete = False
+    error = None
+    while not complete and error is None:
+        url = "https://api.mapbox.com/uploads/v1/{}/{}?access_token={}".format(
+            mapbox_username,
+            tileset_generation_id,
+            mapbox_token,
+        )
+        headers = {
+            "content-type": "application/json",
+            "Accept-Charset": "UTF-8",
+            "Cache-Control": "no-cache",
+        }
+        response = requests.get(url, headers=headers)
+        responseJson = json.loads(response.content)
+        complete = responseJson["complete"]
+        error = responseJson["error"]
+        progress = responseJson["progress"]
+        logging.info(
+            f"Waiting for tileset creation for upload={tileset_generation_id} progress={progress} complete={complete} error={error}"
+        )
+        time.sleep(2)
+
+    return error
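Together these helpers wrap the Mapbox Uploads API: request temporary S3 credentials, copy the `.mbtiles` file into Mapbox's staging bucket, start the tileset build and poll it until it completes. A minimal standalone sketch (the username, token and tileset/layer names below are placeholders):

```python
import logging

from mapbox_utils import (
    start_tileset_creation,
    upload_to_mapbox_storage,
    wait_for_tileset_creation_complete,
)

logging.basicConfig(level=logging.INFO)

username, token = "my-mapbox-user", "sk.placeholder-token"
credentials = upload_to_mapbox_storage("trees-preprocessed.mbtiles", username, token)
upload_id = start_tileset_creation(credentials, username, "trees", "trees-layer", token)
error = wait_for_tileset_creation_complete(upload_id, username, token)
if error is not None:
    raise RuntimeError(f"Mapbox tileset creation failed: {error}")
```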
diff --git a/harvester/src/project_radolan_data.py b/harvester/src/project_radolan_data.py
new file mode 100644
index 00000000..f89e9d9e
--- /dev/null
+++ b/harvester/src/project_radolan_data.py
@@ -0,0 +1,64 @@
+import os
+import subprocess
+import logging
+
+
+# Resources:
+# https://proj.org/en/9.2/usage/quickstart.html
+# https://brightsky.dev/docs/#/operations/getRadar
+# https://gdal.org/programs/gdalwarp.html
+# https://www.ogc.org/standard/geotiff/
+def project_radolan_data(hourly_radolan_file, shape_file, tmp_dir):
+    """Projects the given Radolan data and cuts out the area of interest using a shape file.
+
+    Args:
+        hourly_radolan_file (str): Path to the hourly radolan file
+        shape_file (str): Path to the shape file defining the area of interest
+        tmp_dir (str): Path to the directory holding the temp files
+
+    Returns:
+        str: Path to the generated GeoTIFF file
+    """
+    logging.info(f"Projecting radolan data for {hourly_radolan_file}...")
+    output_file = os.path.join(tmp_dir, hourly_radolan_file.split("/")[-1] + ".tiff")
+    cmdline = [
+        "gdalwarp",
+        hourly_radolan_file,
+        output_file,
+        "-s_srs",
+        "+proj=stere +lon_0=10.0 +lat_0=90.0 +lat_ts=60.0 +a=6370040 +b=6370040 +units=m",
+        "-t_srs",
+        "+proj=stere +lon_0=10.0 +lat_0=90.0 +lat_ts=60.0 +a=6370040 +b=6370040 +units=m",
+        "-r",
+        "near",
+        "-of",
+        "GTiff",
+        "-cutline",
+        shape_file,
+    ]
+    subprocess.call(cmdline, stdout=subprocess.DEVNULL, stderr=subprocess.STDOUT)
+    return output_file
+
+
+# Resources:
+# https://gdal.org/programs/gdal_polygonize.html
+# https://www.esri.com/content/dam/esrisites/sitecore-archive/Files/Pdfs/library/whitepapers/pdfs/shapefile.pdf
+def polygonize_data(input_raster_file, root_dir):
+    """Produces a polygon feature layer from a raster as an ESRI Shapefile
+
+    Args:
+        input_raster_file (str): Path to the input raster file
+        root_dir (str): Path to the directory holding the temp files
+
+    Returns:
+        str: Path to the generated ESRI Shapefile
+    """
+    logging.info(f"Polygonizing radolan data for {input_raster_file}...")
+    output_file = os.path.join(root_dir, input_raster_file.split("/")[-1] + ".shp")
+    cmdline = [
+        "gdal_polygonize.py",
+        input_raster_file,
+        "-f",
+        "ESRI Shapefile",
+        output_file,
+        "RDLLAYER",
+        "RDLFIELD",
+    ]
+    subprocess.call(cmdline, stdout=subprocess.DEVNULL, stderr=subprocess.STDOUT)
+    return output_file
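Both helpers shell out to GDAL, so `gdalwarp` and `gdal_polygonize.py` must be on the `PATH`. A minimal sketch of how one hourly RADOLAN file flows through them (the input file name is illustrative):

```python
import tempfile

from project_radolan_data import polygonize_data, project_radolan_data

with tempfile.TemporaryDirectory() as tmp_dir:
    # Warp the hourly RADOLAN raster and clip it to the buffered city shape ...
    tiff_file = project_radolan_data(
        "/tmp/radolan/RW-20230101-0050.asc", "./assets/buffer.shp", tmp_dir
    )
    # ... then turn the clipped raster into an ESRI Shapefile of value polygons
    shape_file = polygonize_data(tiff_file, tmp_dir)
    print(shape_file)  # <tmp_dir>/RW-20230101-0050.asc.tiff.shp
```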
diff --git a/harvester/src/radolan_db_utils.py b/harvester/src/radolan_db_utils.py
new file mode 100644
index 00000000..ef54eb25
--- /dev/null
+++ b/harvester/src/radolan_db_utils.py
@@ -0,0 +1,114 @@
+import psycopg2
+import psycopg2.extras
+from datetime import datetime
+from datetime import timedelta
+import logging
+
+
+def get_start_end_harvest_dates(db_conn):
+    """Gets the first and last day for harvesting
+
+    Args:
+        db_conn: the database connection
+
+    Returns:
+        list: [start_date, end_date]
+    """
+    logging.info("Getting first and last day for harvesting...")
+    with db_conn.cursor() as cur:
+        cur.execute("SELECT collection_date FROM radolan_harvester WHERE id = 1")
+        last_date = cur.fetchone()[0]
+        end_date = datetime.now() - timedelta(days=1)
+        start_date = datetime.combine(last_date, datetime.min.time())
+        return [start_date, end_date]
+
+
+def upload_radolan_data_in_db(extracted_radolan_values, db_conn):
+    """Uploads the extracted radolan data into the database
+
+    Args:
+        extracted_radolan_values (list): the radolan values to upload
+        db_conn: the database connection
+    """
+    logging.info("Uploading radolan data to database...")
+    with db_conn.cursor() as cur:
+        cur.execute("DELETE FROM radolan_temp;")
+        psycopg2.extras.execute_batch(
+            cur,
+            """
+            INSERT INTO radolan_temp (geometry, value, measured_at)
+            VALUES (ST_Multi(ST_Transform(ST_GeomFromText(%s, 3857), 4326)), %s, %s);
+            """,
+            extracted_radolan_values,
+        )
+        cur.execute(
+            """
+            INSERT INTO radolan_data (geom_id, value, measured_at)
+            SELECT radolan_geometry.id, radolan_temp.value, radolan_temp.measured_at
+            FROM radolan_geometry
+            JOIN radolan_temp ON ST_WithIn(radolan_geometry.centroid, radolan_temp.geometry);
+            """
+        )
+        db_conn.commit()
+
+
+def update_trees_in_database(radolan_grid, db_conn):
+    """Updates the tree radolan data in the database
+
+    Args:
+        radolan_grid (list): the radolan value grid to use for updating the trees
+        db_conn: the database connection
+    """
+    logging.info("Updating trees in database...")
+    with db_conn.cursor() as cur:
+        psycopg2.extras.execute_batch(
+            cur,
+            """
+            UPDATE trees
+            SET radolan_days = %s, radolan_sum = %s
+            WHERE ST_CoveredBy(geom, ST_SetSRID(ST_GeomFromGeoJSON(%s), 4326));
+            """,
+            radolan_grid,
+        )
+        db_conn.commit()
+
+    # Second pass with a small buffer to catch trees that were not covered by any grid polygon
+    with db_conn.cursor() as cur:
+        psycopg2.extras.execute_batch(
+            cur,
+            """
+            UPDATE trees
+            SET radolan_days = %s, radolan_sum = %s
+            WHERE trees.radolan_sum IS NULL
+            AND ST_CoveredBy(geom, ST_Buffer(ST_SetSRID(ST_GeomFromGeoJSON(%s), 4326), 0.0002));
+            """,
+            radolan_grid,
+        )
+        db_conn.commit()
+
+
+def cleanup_radolan_entries(limit_days, db_conn):
+    """Cleans up radolan data in the database (old and duplicated data)
+
+    Args:
+        limit_days (int): number of previous days to keep radolan data for
+        db_conn: the database connection
+    """
+    logging.info("Cleaning up old and duplicated data in database...")
+    with db_conn.cursor() as cur:
+        # Delete duplicated data
+        cur.execute(
+            """
+            DELETE FROM radolan_data AS a USING radolan_data AS b
+            WHERE a.id < b.id AND a.geom_id = b.geom_id
+            AND a.measured_at = b.measured_at
+            """
+        )
+        # Delete old data
+        cur.execute(
+            """
+            DELETE
+            FROM radolan_data
+            WHERE measured_at < NOW() - INTERVAL '{} days'
+            """.format(
+                limit_days
+            )
+        )
+        db_conn.commit()
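`upload_radolan_data_in_db` and `update_trees_in_database` both use `psycopg2.extras.execute_batch`, so they expect plain sequences of parameter tuples. The actual grid rows are produced by the harvest step; a plausible shape, purely for illustration, looks like this:

```python
import json

# One tuple per radolan_geometry polygon:
#   (hourly values for the retention window, their sum, the polygon as GeoJSON)
radolan_grid = [
    (
        [0, 0, 12, 3],  # radolan_days (illustrative values)
        15,             # radolan_sum
        json.dumps(
            {
                "type": "Polygon",
                "coordinates": [
                    [[13.40, 52.52], [13.41, 52.52], [13.41, 52.53], [13.40, 52.52]]
                ],
            }
        ),
    ),
]
# update_trees_in_database(radolan_grid, db_conn)
```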
diff --git a/harvester/src/run_harvester.py b/harvester/src/run_harvester.py
new file mode 100644
index 00000000..9eceb3bd
--- /dev/null
+++ b/harvester/src/run_harvester.py
@@ -0,0 +1,91 @@
+import sys
+import psycopg2.extras
+import psycopg2
+from dotenv import load_dotenv
+import logging
+import os
+from dwd_harvest import harvest_dwd
+from radolan_db_utils import (
+    get_start_end_harvest_dates,
+    update_trees_in_database,
+)
+from mapbox_tree_update import update_mapbox_tree_layer
+
+# Set up logging
+logging.basicConfig()
+logging.root.setLevel(logging.INFO)
+
+# Load the environment variables
+load_dotenv()
+
+# Check that all required environment variables are set
+for env_var in [
+    "PG_DB",
+    "PG_PORT",
+    "PG_USER",
+    "PG_PASS",
+    "SUPABASE_URL",
+    "SUPABASE_BUCKET_NAME",
+    "SUPABASE_SERVICE_ROLE_KEY",
+    "LIMIT_DAYS",
+    "MAPBOXUSERNAME",
+    "MAPBOXTOKEN",
+    "MAPBOXTILESET",
+    "MAPBOXLAYERNAME",
+    "SURROUNDING_SHAPE_FILE",
+]:
+    if env_var not in os.environ:
+        logging.error("❌ Environment variable {} does not exist".format(env_var))
+        sys.exit(1)
+
+SUPABASE_URL = os.getenv("SUPABASE_URL")
+SUPABASE_BUCKET_NAME = os.getenv("SUPABASE_BUCKET_NAME")
+SUPABASE_SERVICE_ROLE_KEY = os.getenv("SUPABASE_SERVICE_ROLE_KEY")
+LIMIT_DAYS = int(os.getenv("LIMIT_DAYS"))
+SKIP_MAPBOX = os.getenv("SKIP_MAPBOX") == "True"
+MAPBOX_USERNAME = os.getenv("MAPBOXUSERNAME")
+MAPBOX_TOKEN = os.getenv("MAPBOXTOKEN")
+MAPBOX_TILESET = os.getenv("MAPBOXTILESET")
+MAPBOX_LAYERNAME = os.getenv("MAPBOXLAYERNAME")
+PG_SERVER = os.getenv("PG_SERVER")
+PG_PORT = os.getenv("PG_PORT")
+PG_USER = os.getenv("PG_USER")
+PG_PASS = os.getenv("PG_PASS")
+PG_DB = os.getenv("PG_DB")
+SURROUNDING_SHAPE_FILE = os.getenv("SURROUNDING_SHAPE_FILE")
+
+# Establish the database connection
+try:
+    database_connection_str = f"host='{PG_SERVER}' port={PG_PORT} user='{PG_USER}' password='{PG_PASS}' dbname='{PG_DB}'"
+    database_connection = psycopg2.connect(database_connection_str)
+    logging.info("🗄 Database connection established")
+except Exception:
+    logging.error("❌ Could not establish database connection")
+    database_connection = None
+    sys.exit(1)
+
+# Start harvesting DWD data
+start_date, end_date = get_start_end_harvest_dates(database_connection)
+radolan_grid = harvest_dwd(
+    surrounding_shape_file=SURROUNDING_SHAPE_FILE,
+    start_date=start_date,
+    end_date=end_date,
+    limit_days=LIMIT_DAYS,
+    database_connection=database_connection,
+)
+
+# Update trees in database
+update_trees_in_database(radolan_grid, database_connection)
+
+# Update Mapbox layer
+if not SKIP_MAPBOX:
+    update_mapbox_tree_layer(
+        MAPBOX_USERNAME,
+        MAPBOX_TOKEN,
+        MAPBOX_TILESET,
+        MAPBOX_LAYERNAME,
+        SUPABASE_URL,
+        SUPABASE_BUCKET_NAME,
+        SUPABASE_SERVICE_ROLE_KEY,
+        database_connection,
+    )
diff --git a/harvester/src/supabase_utils.py b/harvester/src/supabase_utils.py
new file mode 100644
index 00000000..30ce0187
--- /dev/null
+++ b/harvester/src/supabase_utils.py
@@ -0,0 +1,40 @@
+import requests
+import logging
+
+
+# Function to check if a file exists in Supabase storage
+def check_file_exists_in_supabase_storage(supabaseUrl, supabaseBucketName, file_name):
+    url = (
+        f"{supabaseUrl}/storage/v1/object/info/public/{supabaseBucketName}/{file_name}"
+    )
+    response = requests.get(url)
+    return response.status_code == 200
+
+
+# Function to upload a file to Supabase storage (PUT if it already exists, POST otherwise)
+def upload_file_to_supabase_storage(
+    supabaseUrl, supabaseBucketName, supabaseServiceRoleKey, file_path, file_name
+):
+    with open(file_path, "rb") as file:
+        file_url = f"{supabaseUrl}/storage/v1/object/{supabaseBucketName}/{file_name}"
+        http_method = (
+            requests.put
+            if check_file_exists_in_supabase_storage(
+                supabaseUrl, supabaseBucketName, file_name
+            )
+            else requests.post
+        )
+        response = http_method(
+            file_url,
+            files={"file": file},
+            headers={
+                "Authorization": f"Bearer {supabaseServiceRoleKey}",
+                "ContentType": "application/geo+json",
+                "AcceptEncoding": "gzip, deflate, br",
+            },
+        )
+
+        if response.status_code == 200:
+            logging.info(f"Uploaded {file_name} to Supabase storage...")
+        else:
+            logging.error(response)
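For completeness, a minimal sketch of calling the Supabase upload helper on its own (the URL, bucket and key below are placeholders; in the harvester these values come from the environment variables checked in `run_harvester.py`):

```python
import logging

from supabase_utils import upload_file_to_supabase_storage

logging.basicConfig(level=logging.INFO)

upload_file_to_supabase_storage(
    supabaseUrl="http://127.0.0.1:54321",
    supabaseBucketName="data_assets",
    supabaseServiceRoleKey="replace-with-service-role-key",
    file_path="/tmp/trees-preprocessed.mbtiles",
    file_name="trees-preprocessed.mbtiles",
)
```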