Move to cloud composer 2
Neha Singh committed Feb 27, 2024
1 parent 69be7ab commit ea1f49e
Showing 4 changed files with 27 additions and 27 deletions.
10 changes: 5 additions & 5 deletions airtable_scripts/utils.py
@@ -7,7 +7,7 @@
 import tempfile
 
 import requests
-from airflow.hooks.base_hook import BaseHook
+from dataloader.airflow_utils.utils import get_connection_info
 from google.cloud import storage
 from more_itertools import batched
 
@@ -110,8 +110,8 @@ def gcs_to_airtable_airflow(
         column names will be used in Airtable
     :return: None
     """
-    connection = BaseHook.get_connection("airtable")
-    token = connection.password
+    connection = get_connection_info("ETO_scout_airtable")
+    token = connection["password"]
     gcs_to_airtable(bucket_name, input_prefix, table_name, base_id, token, column_map)


@@ -191,8 +191,8 @@ def airtable_to_gcs_airflow(
         column names will be used in Airtable
     :return: None
     """
-    connection = BaseHook.get_connection("airtable")
-    token = connection.password
+    connection = get_connection_info("ETO_scout_airtable")
+    token = connection["password"]
     airtable_to_gcs(table_name, base_id, bucket_name, output_prefix, token, column_map)


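Note on the utils.py change above: both Airflow entry points now read the Airtable token through the project's own helper instead of Airflow's BaseHook. A minimal sketch of the new pattern, assuming get_connection_info returns a dict of connection fields with the API token under "password" (the helper's return shape is not shown in this diff):

    # Hedged sketch, not part of the commit: get_connection_info is assumed to
    # return a dict mirroring an Airflow Connection's fields.
    from dataloader.airflow_utils.utils import get_connection_info

    def get_airtable_token() -> str:
        # "ETO_scout_airtable" is the connection id used in both hooks above.
        connection = get_connection_info("ETO_scout_airtable")
        return connection["password"]
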
10 changes: 5 additions & 5 deletions airtable_to_bq.py
@@ -17,8 +17,8 @@
     GCSToBigQueryOperator,
 )
 from dataloader.airflow_utils.defaults import (
-    DEV_DATA_BUCKET,
-    GCP_ZONE,
+    DAGS_DIR,
+    DATA_BUCKET,
     PROJECT_ID,
     get_default_args,
     get_post_success,
@@ -28,7 +28,7 @@

DATASET = "airtable_to_bq"
STAGING_DATASET = f"staging_{DATASET}"
CONFIG_PATH = os.path.join(f"{os.environ.get('DAGS_FOLDER')}", f"{DATASET}_config")
CONFIG_PATH = os.path.join(DAGS_DIR, f"{DATASET}_config")
PARENT_CONFIG = "config.json"


@@ -40,7 +40,7 @@ def update_staging(dag: DAG, start_task, config: dict):
     :param config: Task configuration
     :return: Final task defined in this function
     """
-    bucket = DEV_DATA_BUCKET
+    bucket = DATA_BUCKET
     name = config["name"]
     sql_dir = f"sql/{DATASET}/{config.get('parent_name', name)}"
     schema_dir = f"schemas/{DATASET}/{config.get('parent_name', name)}"
@@ -192,7 +192,7 @@ def create_dag(dagname: str, config: dict, parent_dir: str = None) -> DAG:
         (presumed shared/general in this case) `config`
     :return: Dag that runs an import from airtable to bq
     """
-    default_args = get_default_args()
+    default_args = get_default_args(pocs=["Neha"])
     default_args.pop("on_failure_callback")
 
     dag = DAG(
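
Note on the CONFIG_PATH change above (mirrored in bq_to_airtable.py below): the DAGS_FOLDER environment variable is replaced by the DAGS_DIR constant from dataloader.airflow_utils.defaults. A sketch of the resulting lookup, assuming DAGS_DIR resolves to the Composer 2 environment's DAGs folder (its value is defined outside this diff):

    # Illustration only: DAGS_DIR is assumed to point at the Composer DAGs
    # folder, e.g. /home/airflow/gcs/dags on a Cloud Composer environment.
    import os

    from dataloader.airflow_utils.defaults import DAGS_DIR

    DATASET = "airtable_to_bq"
    CONFIG_PATH = os.path.join(DAGS_DIR, f"{DATASET}_config")
    # Per-dataset configs then resolve to <DAGS_DIR>/airtable_to_bq_config/config.json.
    parent_config = os.path.join(CONFIG_PATH, "config.json")
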
10 changes: 5 additions & 5 deletions bq_to_airtable.py
@@ -15,8 +15,8 @@
     BigQueryToGCSOperator,
 )
 from dataloader.airflow_utils.defaults import (
-    DEV_DATA_BUCKET,
-    GCP_ZONE,
+    DAGS_DIR,
+    DATA_BUCKET,
     PROJECT_ID,
     get_default_args,
     get_post_success,
@@ -26,7 +26,7 @@

DATASET = "bq_to_airtable"
STAGING_DATASET = f"staging_{DATASET}"
CONFIG_PATH = os.path.join(f"{os.environ.get('DAGS_FOLDER')}", f"{DATASET}_config")
CONFIG_PATH = os.path.join(DAGS_DIR, f"{DATASET}_config")
PARENT_CONFIG = "config.json"


@@ -39,7 +39,7 @@ def update_airtable(dag: DAG, start_task, end_task, config: dict):
     :param config: Task configuration
     :return: The `add_to_airtable` task, to be executed in series downstream
     """
-    bucket = DEV_DATA_BUCKET
+    bucket = DATA_BUCKET
     name = config["name"]
     sql_dir = f"sql/{DATASET}/{config.get('parent_name', name)}"
     tmp_dir = f"{DATASET}/{name if 'parent_name' not in config else config['parent_name']+'_'+name}/tmp"
@@ -99,7 +99,7 @@ def create_dag(dagname: str, config: dict, parent_dir: str) -> DAG:
         (presumed shared/general in this case) `config`
     :return: Dag that runs an import from bq to airtable
     """
-    default_args = get_default_args()
+    default_args = get_default_args(pocs=["Neha"])
     default_args.pop("on_failure_callback")
 
     dag = DAG(
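
Note on the default_args change in both create_dag factories: a point of contact is now passed to get_default_args. A hedged sketch of how the returned args are consumed; the pocs keyword is taken from its usage above and is assumed to tag owners for alerting, while the global failure callback is dropped, presumably because these DAGs post their own status via get_post_success:

    # Sketch: the helper's full signature and the returned dict's keys are
    # inferred from this diff, not from the helper's definition.
    from dataloader.airflow_utils.defaults import get_default_args

    default_args = get_default_args(pocs=["Neha"])
    # Remove the global failure callback before constructing the DAG.
    default_args.pop("on_failure_callback")
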
24 changes: 12 additions & 12 deletions push_to_airflow.sh
@@ -1,14 +1,14 @@
-gsutil rm -r gs://us-east1-dev2023-cc1-b088c7e1-bucket/dags/airtable_scripts
-gsutil -m cp -r airtable_scripts gs://us-east1-dev2023-cc1-b088c7e1-bucket/dags/
-gsutil cp bq_to_airtable.py gs://us-east1-dev2023-cc1-b088c7e1-bucket/dags/
-gsutil cp airtable_to_bq.py gs://us-east1-dev2023-cc1-b088c7e1-bucket/dags/
-gsutil cp -r examples/multi_airtable_to_bq_test gs://us-east1-dev2023-cc1-b088c7e1-bucket/dags/airtable_to_bq_config/
-gsutil cp examples/single_airtable_to_bq_test.json gs://us-east1-dev2023-cc1-b088c7e1-bucket/dags/airtable_to_bq_config/
-gsutil cp examples/single_bq_to_airtable_test.json gs://us-east1-dev2023-cc1-b088c7e1-bucket/dags/bq_to_airtable_config/
-gsutil cp -r examples/multi_bq_to_airtable_test gs://us-east1-dev2023-cc1-b088c7e1-bucket/dags/bq_to_airtable_config/
-gsutil cp examples/sql/* gs://us-east1-dev2023-cc1-b088c7e1-bucket/dags/sql/airtable_to_bq/multi_airtable_to_bq_test/
-gsutil cp examples/sql/* gs://us-east1-dev2023-cc1-b088c7e1-bucket/dags/sql/bq_to_airtable/multi_bq_to_airtable_test/
-gsutil cp examples/sql/base2_table1_input.sql gs://us-east1-dev2023-cc1-b088c7e1-bucket/dags/sql/bq_to_airtable/single_bq_to_airtable_test/
-gsutil cp examples/sql/default_merge.sql gs://us-east1-dev2023-cc1-b088c7e1-bucket/dags/sql/airtable_to_bq/single_airtable_to_bq_test/
+gsutil rm -r gs://us-east1-production-cc2-202-b42a7a54-bucket/dags/airtable_scripts
+gsutil -m cp -r airtable_scripts gs://us-east1-production-cc2-202-b42a7a54-bucket/dags/
+gsutil cp bq_to_airtable.py gs://us-east1-production-cc2-202-b42a7a54-bucket/dags/
+gsutil cp airtable_to_bq.py gs://us-east1-production-cc2-202-b42a7a54-bucket/dags/
+gsutil cp -r examples/multi_airtable_to_bq_test gs://us-east1-production-cc2-202-b42a7a54-bucket/dags/airtable_to_bq_config/
+gsutil cp examples/single_airtable_to_bq_test.json gs://us-east1-production-cc2-202-b42a7a54-bucket/dags/airtable_to_bq_config/
+gsutil cp examples/single_bq_to_airtable_test.json gs://us-east1-production-cc2-202-b42a7a54-bucket/dags/bq_to_airtable_config/
+gsutil cp -r examples/multi_bq_to_airtable_test gs://us-east1-production-cc2-202-b42a7a54-bucket/dags/bq_to_airtable_config/
+gsutil cp examples/sql/* gs://us-east1-production-cc2-202-b42a7a54-bucket/dags/sql/airtable_to_bq/multi_airtable_to_bq_test/
+gsutil cp examples/sql/* gs://us-east1-production-cc2-202-b42a7a54-bucket/dags/sql/bq_to_airtable/multi_bq_to_airtable_test/
+gsutil cp examples/sql/base2_table1_input.sql gs://us-east1-production-cc2-202-b42a7a54-bucket/dags/sql/bq_to_airtable/single_bq_to_airtable_test/
+gsutil cp examples/sql/default_merge.sql gs://us-east1-production-cc2-202-b42a7a54-bucket/dags/sql/airtable_to_bq/single_airtable_to_bq_test/
 gsutil cp examples/schemas/* gs://airflow-data-exchange-development/schemas/airtable_to_bq/multi_airtable_to_bq_test/
 gsutil cp examples/schemas/default.json gs://airflow-data-exchange-development/schemas/airtable_to_bq/single_airtable_to_bq_test/
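
push_to_airflow.sh now deploys to the production Cloud Composer 2 DAGs bucket rather than the Composer 1 dev bucket. A hedged post-deploy sanity check, not part of the commit, listing what landed under dags/ in the new bucket via the google-cloud-storage client the repo already uses:

    # Verification sketch: list deployed files in the production Composer 2
    # bucket targeted by the script above.
    from google.cloud import storage

    client = storage.Client()
    for blob in client.list_blobs(
        "us-east1-production-cc2-202-b42a7a54-bucket", prefix="dags/airtable_scripts/"
    ):
        print(blob.name)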
