From 8da4438e88396212b12e2d61e92ad1eca2573ae1 Mon Sep 17 00:00:00 2001
From: Simon Heisterkamp
Date: Thu, 10 Nov 2022 12:13:26 +0000
Subject: [PATCH] fix flaky test

---
 src/atc/delta/db_handle.py              |  3 ++-
 src/atc/delta/delta_handle.py           | 11 +++++++++--
 tests/cluster/delta/test_delta_class.py | 17 +++++++++--------
 3 files changed, 20 insertions(+), 11 deletions(-)

diff --git a/src/atc/delta/db_handle.py b/src/atc/delta/db_handle.py
index aeb2ef99..801f4faf 100644
--- a/src/atc/delta/db_handle.py
+++ b/src/atc/delta/db_handle.py
@@ -50,5 +50,6 @@ def drop_cascade(self) -> None:
     def create(self) -> None:
         sql = f"CREATE DATABASE IF NOT EXISTS {self._name} "
         if self._location:
-            sql += f" LOCATION '{self._location}'"
+            sql += f' LOCATION "{self._location}"'
+        print("execute sql:", sql)
         Spark.get().sql(sql)
diff --git a/src/atc/delta/delta_handle.py b/src/atc/delta/delta_handle.py
index 92a962fb..2aa8f8bb 100644
--- a/src/atc/delta/delta_handle.py
+++ b/src/atc/delta/delta_handle.py
@@ -92,7 +92,13 @@ def append(self, df: DataFrame, mergeSchema: bool = None) -> None:
         return self.write_or_append(df, "append", mergeSchema=mergeSchema)
 
     def truncate(self) -> None:
-        Spark.get().sql(f"TRUNCATE TABLE {self._name};")
+        if self._path:
+            Spark.get().sql(f"TRUNCATE TABLE delta.`{self._path}`;")
+        else:
+            Spark.get().sql(f"TRUNCATE TABLE {self._name};")
+            # if the name also does not exist, this will give a useful error like
+            # pyspark.sql.utils.AnalysisException:
+            # Table not found for 'TRUNCATE TABLE': TestDb.TestTbl;
 
     def drop(self) -> None:
         Spark.get().sql(f"DROP TABLE IF EXISTS {self._name};")
@@ -105,7 +111,8 @@ def drop_and_delete(self) -> None:
     def create_hive_table(self) -> None:
         sql = f"CREATE TABLE IF NOT EXISTS {self._name} "
         if self._location:
-            sql += f" USING DELTA LOCATION '{self._location}'"
+            sql += f' USING DELTA LOCATION "{self._location}"'
+        print("execute sql:", sql)
         Spark.get().sql(sql)
 
     def recreate_hive_table(self):
diff --git a/tests/cluster/delta/test_delta_class.py b/tests/cluster/delta/test_delta_class.py
index de5b6f4f..89b2e5aa 100644
--- a/tests/cluster/delta/test_delta_class.py
+++ b/tests/cluster/delta/test_delta_class.py
@@ -69,15 +69,16 @@ def test_02_write(self):
         dh.append(df, mergeSchema=True)
 
+    # @unittest.skip("Flaky test")
     def test_03_create(self):
-        print(Configurator().get_all_details())
-        print(
-            {
-                k: v[:-15] + v[-12:]
-                for k, v in Spark.get().sparkContext.getConf().getAll()
-                if k.startswith("fs.azure.account")
-            }
-        )
+        # print(Configurator().get_all_details())
+        # print(
+        #     {
+        #         k: v[:-15] + v[-12:]
+        #         for k, v in Spark.get().sparkContext.getConf().getAll()
+        #         if k.startswith("fs.azure.account")
+        #     }
+        # )
         db = DbHandle.from_tc("MyDb")
         db.create()
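
Illustrative sketch of the truncate change above, outside the patch itself: the new
code prefers addressing the Delta table by its storage path and only falls back to
the registered table name. The standalone function below mirrors that dispatch under
stated assumptions; `spark`, `path`, and `name` are hypothetical stand-ins for the
handle's Spark session and its _path/_name attributes, not part of the atc API.

    from pyspark.sql import SparkSession

    def truncate_delta(spark: SparkSession, path: str = None, name: str = None) -> None:
        if path:
            # delta.`<path>` addresses the table by storage location, so the
            # statement works even when no metastore name is registered yet.
            spark.sql(f"TRUNCATE TABLE delta.`{path}`;")
        else:
            # Fall back to the registered name; if that is also missing, Spark
            # raises pyspark.sql.utils.AnalysisException ("Table not found ...").
            spark.sql(f"TRUNCATE TABLE {name};")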