Enhance standard templates #75

Open · wants to merge 11 commits into base: master
Changes from 10 commits
5 changes: 5 additions & 0 deletions templates/kudu-table-ddl/Makefile
@@ -25,6 +25,11 @@ kudu-table-clean: kudu-table-drop.sql ## Drop Kudu table

tables-clean: kudu-table-clean ## Drop all tables

kudu-table-alter: kudu-table-alter.sql ## Alter Kudu table
$(impala-cmd) kudu-table-alter.sql

tables-alter: kudu-table-alter ## Alter all tables

tables: kudu-table ## Create all tables

integration-test: # Run integration-tests
7 changes: 7 additions & 0 deletions templates/kudu-table-ddl/Makefile.meta
@@ -26,6 +26,13 @@ tables-clean-{{ table.id }}:
$(MAKE) tables-clean -C {{ table.id }}
{%- endfor %}

tables-alter-all: {%- for table in tables %} tables-alter-{{ table.id }} {%- endfor %}

{%- for table in tables %}
tables-alter-{{ table.id }}:
$(MAKE) tables-alter -C {{ table.id }}
{%- endfor %}

tables-compute-stats-all: {%- for table in tables %} tables-compute-stats-{{ table.id }} {%- endfor %}

{%- for table in tables %}
2 changes: 1 addition & 1 deletion templates/kudu-table-ddl/compute-stats.sql
@@ -14,5 +14,5 @@
-#}
-- Compute table stats for optimized joins
USE {{ conf.staging_database.name }};
COMPUTE STATS {{ table.destination.name }}_kudu;
COMPUTE STATS {{ table.destination.name }}{% if conf.user_defined is defined and conf.user_defined.kudu_suffix is defined %}{{ conf.user_defined.kudu_suffix }}{% endif %};
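For illustration, a sketch of how the suffix conditional renders, assuming hypothetical values (staging database "staging_db", destination name "orders", and conf.user_defined.kudu_suffix set to "_kudu"; all placeholders, not taken from this PR):

  -- With conf.user_defined.kudu_suffix = "_kudu"
  USE staging_db;
  COMPUTE STATS orders_kudu;
  -- With no user_defined.kudu_suffix defined, the suffix is dropped entirely
  COMPUTE STATS orders;

Note that this changes the default behaviour: without the config key, the rendered name no longer carries the old hard-coded _kudu suffix.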

1 change: 1 addition & 0 deletions templates/kudu-table-ddl/imports
@@ -3,3 +3,4 @@
../shared/run-with-logging.sh
../shared/kudu-table-drop.sql
../shared/kudu-table-create.sql
../shared/kudu-table-alter.sql
26 changes: 26 additions & 0 deletions templates/shared/kudu-table-alter.sql
@@ -0,0 +1,26 @@
{#- Copyright 2017 Cargill Incorporated

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-#}
-- Alter the Kudu table in Impala
USE {{ conf.staging_database.name }};
ALTER TABLE {{ table.destination.name }}{% if conf.user_defined is defined and conf.user_defined.kudu_suffix is defined %}{{ conf.user_defined.kudu_suffix }}{% endif %}
SET TBLPROPERTIES(
{%- if table.metadata %}
{%- for key, value in table.metadata.items() %}
'{{ key }}' = '{{ value }}',
{%- endfor %}
{%- endif %}
{%- for column in table.columns -%}
"{{ cleanse_column(column.name)|lower }}" = "{{ column.comment }}"{%- if not loop.last -%},{% endif %}
{%- endfor -%})
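For reference, a sketch of the statement this new template might emit, under assumed values (table "orders", kudu_suffix "_kudu", metadata {source: erp}, and columns order_id/order_ts with comments); whitespace is approximate:

  USE staging_db;
  ALTER TABLE orders_kudu
  SET TBLPROPERTIES(
  'source' = 'erp',
  "order_id" = "Primary key","order_ts" = "Order timestamp (epoch millis)")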
19 changes: 15 additions & 4 deletions templates/shared/kudu-table-create.sql
@@ -14,14 +14,25 @@
-#}
-- Create a Kudu table in Impala
USE {{ conf.staging_database.name }};
CREATE TABLE IF NOT EXISTS {{ table.destination.name }}_kudu
CREATE TABLE IF NOT EXISTS {{ table.destination.name }}{% if conf.user_defined is defined and conf.user_defined.kudu_suffix is defined %}{{ conf.user_defined.kudu_suffix }}{% endif %}
{%- set ordered_columns = order_columns(table.primary_keys,table.columns) -%}
({%- for column in ordered_columns %}
{{ column.name }} {{ map_datatypes(column).kudu }}
`{{ cleanse_column(column.name) }}` {{ map_datatypes_v2(column).kudu }}
{%- if not loop.last -%},{% endif %}
{%- endfor %},
primary key ({{ table.primary_keys|join(', ') }}))
PARTITION BY HASH({{ table.kudu.hash_by|join(', ') }}) PARTITIONS {{ table.kudu.num_partitions }}
{%- if table.kudu.hash_by or table.kudu.range %}
PARTITION BY
{%- endif %}
{%- if table.kudu.hash_by %}
HASH({{ table.kudu.hash_by|join(', ') }}) PARTITIONS {{ table.kudu.num_partitions }} {%- if table.kudu.range %} ,{%- endif %}
{%- endif %}
{%- if table.kudu.range %}
RANGE ({{ table.kudu.range|join(', ') }})
(
{{ table.kudu.ranges|join(', ') }}
)
{%- endif %}
COMMENT '{{ table.comment }}'
STORED AS KUDU
TBLPROPERTIES(
@@ -31,5 +42,5 @@ TBLPROPERTIES(
{%- endfor %}
{%- endif %}
{%- for column in table.columns -%}
'{{ column.name|lower }}' = "{{ column.comment }}"{%- if not loop.last -%},{% endif %}
"{{ cleanse_column(column.name)|lower }}" = "{{ column.comment }}"{%- if not loop.last -%},{% endif %}
{%- endfor -%})
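A sketch of the DDL this revised template might produce, assuming hypothetical values (columns order_id/order_ts, primary key order_id, hash_by [order_id] with 4 partitions, range on [order_ts], and a single range spec "PARTITION VALUES < 1500000000000"); column types and names are placeholders:

  USE staging_db;
  CREATE TABLE IF NOT EXISTS orders_kudu
  (`order_id` bigint,
  `order_ts` bigint,
  primary key (order_id))
  PARTITION BY
  HASH(order_id) PARTITIONS 4 ,
  RANGE (order_ts)
  (
  PARTITION VALUES < 1500000000000
  )
  COMMENT 'Orders table'
  STORED AS KUDU
  TBLPROPERTIES(
  "order_id" = "Primary key","order_ts" = "Order timestamp (epoch millis)")

Hash-only and range-only tables render just their respective clause; if neither hash_by nor range is set, the PARTITION BY line is omitted entirely.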
2 changes: 1 addition & 1 deletion templates/shared/kudu-table-drop.sql
@@ -15,4 +15,4 @@

-- Drop the Kudu table
USE {{ conf.staging_database.name }};
DROP TABLE IF EXISTS {{ table.destination.name }}_kudu PURGE;
DROP TABLE IF EXISTS {{ table.destination.name }}{% if conf.user_defined is defined and conf.user_defined.kudu_suffix is defined %}{{ conf.user_defined.kudu_suffix }}{% endif %} PURGE;
4 changes: 2 additions & 2 deletions templates/shared/kudu-table-rowcount.sql
@@ -13,5 +13,5 @@
limitations under the License. #}

USE {{ conf.staging_database.name }};
INVALIDATE METADATA {{ table.destination.name }}_kudu;
SELECT COUNT(*) FROM {{ table.destination.name }}_kudu;
INVALIDATE METADATA {{ table.destination.name }}{% if conf.user_defined is defined and conf.user_defined.kudu_suffix is defined %}{{ conf.user_defined.kudu_suffix }}{% endif %};
SELECT COUNT(*) FROM {{ table.destination.name }}{% if conf.user_defined is defined and conf.user_defined.kudu_suffix is defined %}{{ conf.user_defined.kudu_suffix }}{% endif %};
2 changes: 1 addition & 1 deletion templates/shared/parquet-refresh.sql
@@ -15,4 +15,4 @@
-- Create a Parquet table in Impala
SET SYNC_DDL=1;
USE {{ conf.staging_database.name }};
REFRESH {{ table.destination.name }}_parquet;
REFRESH {{ table.destination.name }}{% if conf.user_defined is defined and conf.user_defined.parquet_suffix is defined %}{{ conf.user_defined.parquet_suffix }}{% endif %};
31 changes: 31 additions & 0 deletions templates/shared/parquet-table-alter.sql
@@ -0,0 +1,31 @@
{#- Copyright 2017 Cargill Incorporated

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. #}

-- Alter the Parquet table in Impala
set sync_ddl=1;
USE {{ conf.staging_database.name }};
{%- for column in table.columns %}
ALTER TABLE {{ table.destination.name }}{% if conf.user_defined is defined and conf.user_defined.parquet_suffix is defined %}{{ conf.user_defined.parquet_suffix }}{% endif %}
CHANGE {{ cleanse_column(column.name) }} {{ cleanse_column(column.name) }} {{ map_datatypes(column).parquet }} COMMENT "{{ column.comment }}";
{%- endfor %}

{%- if table.metadata %}
ALTER TABLE {{ table.destination.name }}{% if conf.user_defined is defined and conf.user_defined.parquet_suffix is defined %}{{ conf.user_defined.parquet_suffix }}{% endif %}
SET TBLPROPERTIES (
{%- for key, value in table.metadata.items() %}
'{{ key }}' = '{{ value }}'{%- if not loop.last -%}, {% endif %}
{%- endfor %}
)
{%- endif %}
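A sketch of what this might render to for a hypothetical two-column table "orders" with parquet_suffix "_parquet" and metadata {source: erp}: one ALTER ... CHANGE statement per column, followed by the optional TBLPROPERTIES update.

  set sync_ddl=1;
  USE staging_db;
  ALTER TABLE orders_parquet
  CHANGE order_id order_id bigint COMMENT "Primary key";
  ALTER TABLE orders_parquet
  CHANGE customer_name customer_name string COMMENT "Customer name";
  ALTER TABLE orders_parquet
  SET TBLPROPERTIES (
  'source' = 'erp'
  )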

4 changes: 2 additions & 2 deletions templates/shared/parquet-table-create.sql
@@ -15,9 +15,9 @@
-- Create a Parquet table in Impala
set sync_ddl=1;
USE {{ conf.staging_database.name }};
CREATE EXTERNAL TABLE IF NOT EXISTS {{ table.destination.name }}_parquet (
CREATE EXTERNAL TABLE IF NOT EXISTS {{ table.destination.name }}{% if conf.user_defined is defined and conf.user_defined.parquet_suffix is defined %}{{ conf.user_defined.parquet_suffix }}{% endif %} (
{%- for column in table.columns %}
{{ column.name }} {{ map_datatypes(column).parquet }} COMMENT "{{ column.comment }}"
{{ cleanse_column(column.name) }} {{ map_datatypes_v2(column).parquet }} COMMENT "{{ column.comment }}"
{%- if not loop.last -%}, {% endif %}
{%- endfor %})
COMMENT '{{ table.comment }}'
2 changes: 1 addition & 1 deletion templates/shared/parquet-table-drop.sql
@@ -14,4 +14,4 @@

-- Drop the Impala Parquet table
USE {{ conf.staging_database.name }};
DROP TABLE IF EXISTS {{ table.destination.name }}_parquet;
DROP TABLE IF EXISTS {{ table.destination.name }}{% if conf.user_defined is defined and conf.user_defined.parquet_suffix is defined %}{{ conf.user_defined.parquet_suffix }}{% endif %};
4 changes: 2 additions & 2 deletions templates/shared/parquet-table-rowcount.sql
@@ -14,6 +14,6 @@

-- Query Parquet table in Impala
USE {{ conf.staging_database.name }};
INVALIDATE METADATA {{ table.destination.name }}_parquet;
SELECT COUNT(*) FROM {{ table.destination.name }}_parquet;
INVALIDATE METADATA {{ table.destination.name }}{% if conf.user_defined is defined and conf.user_defined.parquet_suffix is defined %}{{ conf.user_defined.parquet_suffix }}{% endif %};
Contributor:
Can you give an example of how this user_defined block is used and what it's for?
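For reference, a hypothetical sketch of what such a block might look like in the pipeline configuration (key names assumed, not taken from this PR):

  user_defined:
    kudu_suffix: "_kudu"        # appended to Kudu table names when present
    parquet_suffix: "_parquet"  # appended to Parquet table names when present

With these values the rendered names match the previously hard-coded ones; leaving the block out drops the suffix entirely, which is the compatibility concern raised in the next comment.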

Contributor:
This raises a flag because it will break existing scripts that are expecting the _parquet suffix

SELECT COUNT(*) FROM {{ table.destination.name }}{% if conf.user_defined is defined and conf.user_defined.parquet_suffix is defined %}{{ conf.user_defined.parquet_suffix }}{% endif %};

4 changes: 2 additions & 2 deletions templates/sqoop-parquet-full-load/avro-table-create.sql
@@ -15,9 +15,9 @@
-- Create a Parquet table in Impala
set sync_ddl=1;
USE {{ conf.raw_database.name }};
CREATE EXTERNAL TABLE IF NOT EXISTS {{ table.destination.name }}_avro (
CREATE EXTERNAL TABLE IF NOT EXISTS `{{ table.destination.name }}_avro` (
{% for column in table.columns %}
`{{ column.name.replace('/','_') }}` {{ map_datatypes(column).avro }} COMMENT "{{ column.comment }}"
`{{ cleanse_column(column.name) }}` {{ map_datatypes_v2(column).avro }} COMMENT "{{ column.comment }}"
Contributor:
Good use of cleanse_column

{%- if not loop.last -%}, {% endif %}
{%- endfor %})
COMMENT '{{ table.comment }}'
4 changes: 2 additions & 2 deletions templates/sqoop-parquet-full-load/copy-avsc.sh
@@ -1,2 +1,2 @@
hadoop fs -mkdir {{ conf.raw_database.path }}/{{ table.destination.name }}_avro/.meta/
hadoop fs -put -f AutoGeneratedSchema.avsc {{ conf.raw_database.path }}/{{ table.destination.name }}_avro/.meta/{{ table.destination.name }}.avsc
hadoop fs -mkdir -p {{ conf.raw_database.path }}/{{ table.destination.name }}_avro/.meta/
Contributor:
I'm not sure -p is the safe thing to do here. If the user has misconfigured something the application will happily keep running. What was your thought process for adding this? Can you manually create the directories first?

hadoop fs -put -f AutoGeneratedSchema.avsc {{ conf.raw_database.path }}/{{ table.destination.name }}_avro/.meta/{{ table.destination.name }}.avsc
14 changes: 7 additions & 7 deletions templates/sqoop-parquet-full-load/insert-overwrite.sql
@@ -1,9 +1,9 @@
INSERT OVERWRITE TABLE {{ conf.raw_database.name }}.{{ table.destination.name }}_partitioned PARTITION (ingest_partition=${var:val})
INSERT OVERWRITE TABLE `{{ conf.raw_database.name }}`.`{{ table.destination.name }}_partitioned` PARTITION (ingest_partition=${var:val})
SELECT {% for column in table.columns %}
{%- if column["datatype"].lower() == "decimal" %}
cast (`{{ column.name.replace('/','_') }}` as decimal({{column.precision}}, {{column.scale}}) )
{%- else %} `{{ column.name.replace('/','_') }}`
{% endif %}
{%- if not loop.last -%}, {% endif %}
{%- if column["datatype"].lower() == "decimal" %} cast (`{{ cleanse_column(column.name) }}` as decimal({{column.precision}}, {{column.scale}}) )
{%- elif column["datatype"].lower() == "timestamp" %} cast (`{{ cleanse_column(column.name) }}`/1000 as timestamp ) AS `{{ cleanse_column(column.name) }}`
{%- else %} `{{ cleanse_column(column.name) }}`
{%- endif %}
{%- if not loop.last %}, {% endif %}
{%- endfor %}
FROM {{ conf.raw_database.name }}.{{ table.destination.name }}_avro;
FROM `{{ conf.raw_database.name }}`.`{{ table.destination.name }}_avro`;
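A sketch of the rendered statement under assumed values (raw database "raw_db", table "orders" with a decimal amount, a timestamp order_ts stored as epoch millis in Avro, and a plain order_id column), showing the new timestamp cast:

  INSERT OVERWRITE TABLE `raw_db`.`orders_partitioned` PARTITION (ingest_partition=${var:val})
  SELECT cast (`amount` as decimal(10, 2) ), cast (`order_ts`/1000 as timestamp ) AS `order_ts`, `order_id`
  FROM `raw_db`.`orders_avro`;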
14 changes: 5 additions & 9 deletions templates/sqoop-parquet-full-load/partitioned-table-create.sql
@@ -14,14 +14,10 @@

-- Create a Parquet table in Impala
set sync_ddl=1;
USE {{ conf.raw_database.name }};
CREATE EXTERNAL TABLE IF NOT EXISTS {{ table.destination.name }}_partitioned (
{% for column in table.columns %}
{%- if column["datatype"].lower() == "decimal" %}
`{{ column.name.replace('/','_') }}` {{ map_datatypes(column).parquet }}({{column.precision}},{{column.scale}}) COMMENT "{{ column.comment }}"
{%- else %} `{{ column.name.replace('/','_') }}` {{ map_datatypes(column).parquet }} COMMENT "{{ column.comment }}"
{% endif %}
{%- if not loop.last -%}, {% endif %}
USE `{{ conf.raw_database.name }}`;
CREATE EXTERNAL TABLE IF NOT EXISTS `{{ table.destination.name }}_partitioned` (
{%- for column in table.columns %}
`{{ cleanse_column(column.name) }}` {{ map_datatypes_v2(column, 'parquet') }} COMMENT "{{ column.comment }}" {%- if not loop.last -%}, {% endif %}
{%- endfor %})
PARTITIONED BY (ingest_partition int)
COMMENT '{{ table.comment }}'
@@ -33,4 +29,4 @@ TBLPROPERTIES(
'{{ key }}' = '{{ value }}'{%- if not loop.last -%}, {% endif %}
{%- endfor %}
)
{%- endif %}
{%- endif %}
14 changes: 5 additions & 9 deletions templates/sqoop-parquet-full-load/report-table-create.sql
@@ -14,14 +14,10 @@

-- Create a Parquet table in Impala
set sync_ddl=1;
USE {{ conf.staging_database.name }};
CREATE EXTERNAL TABLE IF NOT EXISTS {{ table.destination.name }} (
{% for column in table.columns %}
{%- if column["datatype"].lower() == "decimal" %}
`{{ column.name.replace('/','_') }}` {{ map_datatypes(column).parquet }}({{column.precision}},{{column.scale}}) COMMENT "{{ column.comment }}"
{%- else %} `{{ column.name.replace('/','_') }}` {{ map_datatypes(column).parquet }} COMMENT "{{ column.comment }}"
{% endif %}
{%- if not loop.last -%}, {% endif %}
USE `{{ conf.staging_database.name }}`;
CREATE EXTERNAL TABLE IF NOT EXISTS `{{ table.destination.name }}` (
{%- for column in table.columns %}
`{{ cleanse_column(column.name) }}` {{ map_datatypes_v2(column, 'parquet') }} COMMENT "{{ column.comment }}" {%- if not loop.last -%}, {% endif %}
Contributor:
You removed the decimal logic, does map_datatypes_v2 handle the decimal type?

{%- endfor %})
COMMENT '{{ table.comment }}'
STORED AS PARQUET
@@ -32,4 +28,4 @@ TBLPROPERTIES(
'{{ key }}' = '{{ value }}'{%- if not loop.last -%}, {% endif %}
{%- endfor %}
)
{%- endif %}
{%- endif %}
47 changes: 17 additions & 30 deletions templates/sqoop-parquet-full-load/sqoop-import.sh
@@ -12,50 +12,37 @@
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. #}
{# This function will put the --map-column-java col=String parameter for any clob data types.#}
{% macro map_clobs_macro(columns) -%}
{{ map_clobs(columns) }}
{%- endmacro -%}
# Create a Sqoop job
set -eu
{% set mapcolumn = [] %}
{%- for column in table.columns -%}
{%- if column["datatype"].lower() == "varbinary" or column["datatype"].lower() == "binary" or column["datatype"].lower() == "longvarbinary" -%}
{%- set mapcolumn = mapcolumn.append(column["name"]) -%}
{%- endif -%}
{%- endfor -%}
sqoop import \
-D 'mapred.job.name={{ conf.source_database.name }}.{{ table.source.name }}.{{ conf.sqoop_job_name_suffix }}' \
--connect '{{ conf.source_database.connection_string }}' \
--username '{{ conf.user_name }}' \
--password-file '{{ conf.sqoop_password_file }}' \
{%- if conf["sqoop_driver"] is defined %}
--driver {{ conf.sqoop_driver }} \
{%- endif %}
{% if mapcolumn|length > 0 -%}
--map-column-java {% for column in mapcolumn -%}
{% if loop.last -%}
{{ '"{}"'.format(column) }}=String \
{%- else -%}
{{ '"{}"'.format(column) }}=String,
{%- endif -%}
{% endfor %}
{% endif -%}
{%- set map_java_column = sqoop_map_java_column(table.columns,clean_column=True) %}
{%- if map_java_column %}
{{ map_java_column }} \
{%- endif %}
--delete-target-dir \
--target-dir {{ conf.raw_database.path }}/{{ table.destination.name }}_avro/ \
--temporary-rootdir {{ conf.raw_database.path }}/{{ table.destination.name }}_avro/ \
--as-avrodatafile \
--fetch-size {% if table.columns|length < 30 -%} 10000 {% else %} 5000 {% endif %} \
--compress \
--compression-codec snappy \
-m 1 \
{%- if conf["sqoop_driver"] is defined %}
{%- if "sqlserver" in conf["sqoop_driver"].lower() -%}
--query 'SELECT {% for column in table.columns%} {% if loop.last %} {{ '"{}"'.format(column.name) }} {% else %} {{ '"{}",'.format(column.name) }} {% endif %} {% endfor %} FROM {{ table.source.name }} WHERE $CONDITIONS'
{%- elif "sap" in conf["sqoop_driver"].lower() -%}
--query 'SELECT {% for column in table.columns%} {% if loop.last %} {{ '"{}"'.format(column.name) }} {% else %} {{ '"{}",'.format(column.name) }} {% endif %} {% endfor %} FROM {{ conf.source_database.name }}.{{ table.source.name }} WHERE $CONDITIONS'
{%- if table.num_mappers > 1 %}
--split-by {{ table.split_by_column }} \
--boundary-query 'SELECT min({{ table.split_by_column }}), max({{ table.split_by_column }}) FROM {{ conf.source_database.name }}.{{ table.source.name }}' \{%- endif %}
-m {{ table.num_mappers or 1 }} \
{% if conf["sqoop_driver"] is defined %}
{%- if "sqlserver" in conf["sqoop_driver"].lower() -%}
--query 'SELECT {% for column in table.columns%} "{{ column.name }}" AS "{{ cleanse_column(column.name) }}"{% if loop.last %} {% else %},{% endif %} {% endfor %} FROM {{ conf.source_database.name }}.{{ table.source.name }} WHERE $CONDITIONS'
{%- else -%}
--query 'SELECT {% for column in table.columns%} "{{ column.name }}" AS "{{ cleanse_column(column.name) }}"{% if loop.last %} {% else %},{% endif %} {% endfor %} FROM {{ conf.source_database.name }}.{{ table.source.name }} WHERE $CONDITIONS'
{% endif -%}
{%- else -%}
--query 'SELECT {% for column in table.columns%} {% if loop.last %} {{ column.name }} {% else %} {{ column.name }}, {% endif %} {% endfor %} FROM {{ conf.source_database.name }}.{{ table.source.name }} WHERE $CONDITIONS'
{% endif -%}
{%- else %}
--query 'SELECT {% for column in table.columns%} {% if loop.last %} {{ column.name }} {% else %} {{ column.name }}, {% endif %} {% endfor %} FROM {{ conf.source_database.name }}.{{ table.source.name }} WHERE $CONDITIONS'
{%- endif -%}
--query 'SELECT {% for column in table.columns%} "{{ column.name }}" AS "{{ cleanse_column(column.name) }}"{% if loop.last %} {% else %},{% endif %} {% endfor %} FROM {{ conf.source_database.name }}.{{ table.source.name }} WHERE $CONDITIONS'
{%- endif -%}
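An abridged sketch (placeholder names throughout) of what the revised template might render for a hypothetical SQL Server source table ORDERS with columns "ORDER_ID" and "ORDER DATE", split_by_column ORDER_ID, and num_mappers = 4, showing the new split/boundary options and the cleansed column aliases in the query:

  sqoop import \
    -D 'mapred.job.name=src_db.ORDERS.full-load' \
    --connect '...' \
    --username '...' \
    --password-file '...' \
    --driver com.microsoft.sqlserver.jdbc.SQLServerDriver \
    --delete-target-dir \
    --target-dir /data/raw_db/orders_avro/ \
    --temporary-rootdir /data/raw_db/orders_avro/ \
    --as-avrodatafile \
    --fetch-size 10000 \
    --compress \
    --compression-codec snappy \
    --split-by ORDER_ID \
    --boundary-query 'SELECT min(ORDER_ID), max(ORDER_ID) FROM src_db.ORDERS' \
    -m 4 \
    --query 'SELECT "ORDER_ID" AS "order_id", "ORDER DATE" AS "order_date" FROM src_db.ORDERS WHERE $CONDITIONS'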
6 changes: 3 additions & 3 deletions templates/sqoop-parquet-full-load/tables-drop.sql
@@ -1,3 +1,3 @@
DROP TABLE IF EXISTS {{ conf.raw_database.name }}.{{ table.destination.name }}_avro;
DROP TABLE IF EXISTS {{ conf.raw_database.name }}.{{ table.destination.name }}_partitioned;
DROP TABLE IF EXISTS {{ conf.staging_database.name }}.{{ table.destination.name }};
DROP TABLE IF EXISTS `{{ conf.raw_database.name }}`.`{{ table.destination.name }}_avro`;
DROP TABLE IF EXISTS `{{ conf.raw_database.name }}`.`{{ table.destination.name }}_partitioned`;
DROP TABLE IF EXISTS `{{ conf.staging_database.name }}`.`{{ table.destination.name }}`;
5 changes: 2 additions & 3 deletions templates/sqoop-parquet-full-load/test-rowcount.sh
@@ -16,8 +16,7 @@ set -e
# Check parquet table
AVRO=$({{ conf.impala_cmd }} avro-table-rowcount.sql -B 2> /dev/null)
PARQUET=$({{ conf.impala_cmd }} report-table-rowcount.sql -B 2> /dev/null)
SOURCE=$(cat sourceCount.txt)

SOURCE=$({{ conf.source_database.cmd }} source-table-rowcount.sql -s -r -N -B 2> /dev/null)
Contributor:
Why is stderr piped to /dev/null?

echo "avro count: $AVRO"
echo "report count: $PARQUET"
echo "source count: $SOURCE"
@@ -30,4 +29,4 @@ fi
if [ "$PARQUET" -ne "$SOURCE" ]; then
echo FINAL TABLE ROW COUNTS DO NOT MATCH
exit 1
fi
fi
4 changes: 2 additions & 2 deletions templates/sqoop-parquet-full-load/type-mapping.yml
@@ -73,7 +73,7 @@ date:
timestamp:
kudu: bigint
impala: timestamp
parquet: bigint
parquet: timestamp
avro: bigint
datetime:
kudu: bigint
@@ -171,4 +171,4 @@ boolean:
kudu: boolean
impala: boolean
parquet: boolean
avro: boolean
avro: boolean
2 changes: 2 additions & 0 deletions templates/sqoop-parquet-hdfs-impala/Makefile
@@ -52,6 +52,8 @@ hdfs-clean: hdfs-delete.sh ## Delete parquet files from HDFS

tables-clean: parquet-table-clean ## Drop all tables

tables-alter: parquet-table-alter ## Alter all tables

tables: parquet-table ## Create all tables

update: sqoop-exec ## Insert data from source db into Kudu
7 changes: 7 additions & 0 deletions templates/sqoop-parquet-hdfs-impala/Makefile.meta
@@ -48,3 +48,10 @@ integration-test-all:
{%- for table in tables %}
$(MAKE) integration-test -C {{ table.id }}
{%- endfor %}

tables-alter-all: {%- for table in tables %} tables-alter-{{ table.id }} {%- endfor %}

{%- for table in tables %}
tables-alter-{{ table.id }}:
$(MAKE) tables-alter -C {{ table.id }}
{%- endfor %}