From 1d31bec35affdb18b7206501584cdcd014fa4465 Mon Sep 17 00:00:00 2001 From: Jennifer Melot Date: Fri, 10 May 2024 14:11:25 -0400 Subject: [PATCH 1/2] Ensure tasks added to dag in consistent order --- airtable_to_bq.py | 3 ++- bq_to_airtable.py | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/airtable_to_bq.py b/airtable_to_bq.py index f3036e5..94d8927 100644 --- a/airtable_to_bq.py +++ b/airtable_to_bq.py @@ -58,6 +58,7 @@ def update_staging(dag: DAG, start_task, config: dict): "bucket_name": bucket, "output_prefix": f"{tmp_dir}/data", "column_map": config.get("column_map"), + "include_airtable_id": config.get("include_airtable_id"), }, python_callable=airtable_to_gcs_airflow, ) @@ -211,7 +212,7 @@ def create_dag(dagname: str, config: dict, parent_dir: str = None) -> DAG: if parent_dir: child_configs = [] - for child_config_fi in os.listdir(parent_dir): + for child_config_fi in sorted(os.listdir(parent_dir)): if child_config_fi != PARENT_CONFIG: with open(os.path.join(parent_dir, child_config_fi)) as f: child_config = json.loads(f.read()) diff --git a/bq_to_airtable.py b/bq_to_airtable.py index 1958991..f4ea45f 100644 --- a/bq_to_airtable.py +++ b/bq_to_airtable.py @@ -119,7 +119,7 @@ def create_dag(dagname: str, config: dict, parent_dir: str) -> DAG: prev_task = wait_for_export if parent_dir: - for child_config_fi in os.listdir(parent_dir): + for child_config_fi in sorted(os.listdir(parent_dir)): if child_config_fi != PARENT_CONFIG: with open(os.path.join(parent_dir, child_config_fi)) as f: child_config = json.loads(f.read()) From 1efeb8b008c1ef689f09717f9a02eedd282f8f83 Mon Sep 17 00:00:00 2001 From: Jennifer Melot Date: Fri, 10 May 2024 14:11:55 -0400 Subject: [PATCH 2/2] Add option to include airtable id in results --- airtable_scripts/utils.py | 26 ++++++++++++++++++++++---- 1 file changed, 22 insertions(+), 4 deletions(-) diff --git a/airtable_scripts/utils.py b/airtable_scripts/utils.py index 3d70dda..eaef7bf 100644 --- a/airtable_scripts/utils.py +++ b/airtable_scripts/utils.py @@ -115,12 +115,15 @@ def gcs_to_airtable_airflow( gcs_to_airtable(bucket_name, input_prefix, table_name, base_id, token, column_map) -def get_airtable_iter(table_name: str, base_id: str, token: str) -> iter: +def get_airtable_iter( + table_name: str, base_id: str, token: str, include_airtable_id: bool = False +) -> iter: """ Retrieves data from an airtable table :param table_name: Airtable table name we are retrieving data from :param base_id: Airtable base :param token: Airtable access token + :param include_airtable_id: If true, the airtable id of each row will be included in the results :return: Iterable of rows from `table_name` """ headers = {"Authorization": f"Bearer {token}"} @@ -133,7 +136,10 @@ def get_airtable_iter(table_name: str, base_id: str, token: str) -> iter: ) data = result.json() for row in data["records"]: - yield row["fields"] + content = row["fields"] + if include_airtable_id: + content["airtable_id"] = row["id"] + yield content # An offset will be provided if there is an additional page of data to retrieve if not data.get("offset"): break @@ -147,6 +153,7 @@ def airtable_to_gcs( output_prefix: str, token: str, column_map: dict, + include_airtable_id: bool = False, ) -> None: """ Retrieves data from airtable and writes it to GCS @@ -157,12 +164,13 @@ def airtable_to_gcs( :param token: Airtable access token :param column_map: A mapping from Airtable column names to BigQuery column names. If null, BigQuery column names will be used in Airtable + :param include_airtable_id: If true, the airtable id of each row will be included in the results :return: None """ gcs_client = storage.Client() bucket = gcs_client.get_bucket(bucket_name) blob = bucket.blob(output_prefix.strip("/") + "/data.jsonl") - data = get_airtable_iter(table_name, base_id, token) + data = get_airtable_iter(table_name, base_id, token, include_airtable_id) with blob.open("w") as f: for row in data: if column_map: @@ -180,6 +188,7 @@ def airtable_to_gcs_airflow( bucket_name: str, output_prefix: str, column_map: dict, + include_airtable_id: bool = False, ) -> None: """ Calls `airtable_to_gcs` from airflow, where we can grab the API key from the airtable connection @@ -189,11 +198,20 @@ def airtable_to_gcs_airflow( :param output_prefix: GCS prefix where output data should go within `bucket` :param column_map: A mapping from Airtable column names to BigQuery column names. If null, BigQuery column names will be used in Airtable + :param include_airtable_id: If true, the airtable id of each row will be included in the results :return: None """ connection = get_connection_info("ETO_scout_airtable") token = connection["password"] - airtable_to_gcs(table_name, base_id, bucket_name, output_prefix, token, column_map) + airtable_to_gcs( + table_name, + base_id, + bucket_name, + output_prefix, + token, + column_map, + include_airtable_id, + ) if __name__ == "__main__":