
spark 11.3 testing #222

Open · wants to merge 3 commits into main
17 changes: 17 additions & 0 deletions .github/submit/sparklibs113.json
@@ -0,0 +1,17 @@
+[
+  {
+    "maven" : {
+      "coordinates" : "com.microsoft.azure:azure-eventhubs-spark_2.12:2.3.22"
+    }
+  },
+  {
+    "maven" : {
+      "coordinates" : "com.microsoft.azure:spark-mssql-connector_2.12:1.2.0"
+    }
+  },
+  {
+    "maven" : {
+      "coordinates" : "com.azure.cosmos.spark:azure-cosmos-spark_3-3_2-12:4.15.0"
+    }
+  }
+]
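
These entries use the Maven-coordinate format that the Databricks Jobs API accepts in a task's libraries list, so the submit script can presumably pass the file contents through unchanged. A minimal Python sketch of that idea, with placeholder host, token, node type, and notebook path that are not taken from this PR:

import json
import os

import requests

# Sketch only: read the library list added above and attach it to a one-time
# Databricks run submission. Everything marked "placeholder" is assumed.
host = os.environ["DATABRICKS_HOST"]
token = os.environ["DATABRICKS_TOKEN"]

with open(".github/submit/sparklibs113.json") as fp:
    libraries = json.load(fp)

payload = {
    "run_name": "spark-11.3-integration-test",  # placeholder name
    "tasks": [
        {
            "task_key": "integration_test",
            "new_cluster": {
                "spark_version": "11.3.x-scala2.12",
                "node_type_id": "Standard_DS3_v2",  # placeholder node type
                "num_workers": 1,
            },
            "notebook_task": {"notebook_path": "/tests/main"},  # placeholder
            "libraries": libraries,  # the Maven coordinates defined above
        }
    ],
}

resp = requests.post(
    f"{host}/api/2.1/jobs/runs/submit",
    headers={"Authorization": f"Bearer {token}"},
    json=payload,
)
resp.raise_for_status()
print("run_id:", resp.json()["run_id"])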
12 changes: 12 additions & 0 deletions .github/workflows/pre-integration.yml
@@ -94,6 +94,14 @@ jobs:
             -testJobDetails job104.json `
             -sparkLibs sparklibs104.json

+      - name: Launch integration tests 11.3
+        shell: pwsh
+        run: |
+          .github/submit/submit_test_job.ps1 `
+            -sparkVersion "11.3.x-scala2.12" `
+            -testJobDetails job113.json `
+            -sparkLibs sparklibs113.json
+
       - name: Wait 2 min for things to settle
         shell: pwsh
         run: Start-Sleep -s 120
@@ -106,6 +114,10 @@
         shell: pwsh
         run: .github/submit/fetch_test_job.ps1 -testJobDetails job104.json

+      - name: Fetch integration tests 11.3
+        shell: pwsh
+        run: .github/submit/fetch_test_job.ps1 -testJobDetails job113.json
+
       - name: Delete Deployment
         if: always() # this step runs even if the pipeline is manually cancelled
         shell: pwsh
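
The fetch steps mirror the launch steps for the new 11.3 job; presumably fetch_test_job.ps1 waits for the submitted run to finish and reports its outcome. A rough Python equivalent of such a polling loop against the standard Jobs API, where the function name and polling interval are illustrative rather than taken from the script:

import time

import requests

def wait_for_run(host: str, token: str, run_id: int, poll_seconds: int = 60) -> str:
    """Poll a Databricks run until it reaches a terminal state; return the result state."""
    headers = {"Authorization": f"Bearer {token}"}
    while True:
        resp = requests.get(
            f"{host}/api/2.1/jobs/runs/get",
            headers=headers,
            params={"run_id": run_id},
        )
        resp.raise_for_status()
        state = resp.json()["state"]
        if state["life_cycle_state"] in ("TERMINATED", "SKIPPED", "INTERNAL_ERROR"):
            return state.get("result_state", "UNKNOWN")
        time.sleep(poll_seconds)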
16 changes: 8 additions & 8 deletions tests/cluster/cache/test_cache.py
@@ -18,16 +18,16 @@ class ChildCacher(CachedLoader):

     def write_operation(self, df: DataFrame):
         self.to_be_written = df
-        self.written = df.filter(df["b"].isin([1, 2]))
+        self.written = df.filter(df["beta"].isin([1, 2]))
         DeltaHandle.from_tc("CachedTestTarget").append(self.written)
         return self.written.withColumn("myId", f.lit(12345))

     def delete_operation(self, df: DataFrame) -> DataFrame:
         target_name = Configurator().table_name("CachedTestTarget")

         self.to_be_deleted = df
-        Spark.get().sql(f"DELETE FROM {target_name} WHERE b = 8")
-        self.deleted = df.filter(df["b"] == 8)
+        Spark.get().sql(f"DELETE FROM {target_name} WHERE beta = 8")
+        self.deleted = df.filter(df["beta"] == 8)
         return self.deleted


@@ -113,8 +113,8 @@ def setUpClass(cls) -> None:
"""
CREATE TABLE IF NOT EXISTS {CachedTest_name}
(
a STRING,
b INTEGER,
alpha STRING,
beta INTEGER,
rowHash INTEGER,
loadedTime TIMESTAMP,
deletedTime TIMESTAMP,
@@ -132,8 +132,8 @@ def setUpClass(cls) -> None:
"""
CREATE TABLE IF NOT EXISTS {CachedTestTarget_name}
(
a STRING,
b INTEGER,
alpha STRING,
beta INTEGER,
payload STRING
)
USING DELTA
@@ -146,7 +146,7 @@ def setUpClass(cls) -> None:

         cls.params = CachedLoaderParameters(
             cache_table_name=tc.table_name("CachedTest"),
-            key_cols=["a", "b"],
+            key_cols=["alpha", "beta"],
             cache_id_cols=["myId"],
         )

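The renamed key columns carry through to both table definitions and the loader parameters. As a rough illustration of the caching idea the test exercises, and not the library's actual CachedLoader internals, keying on alpha and beta together with a per-row hash lets a loader skip rows whose content is already cached:

from pyspark.sql import DataFrame, functions as f

def rows_needing_write(incoming: DataFrame, cache: DataFrame) -> DataFrame:
    # Illustration only: hash every incoming row, then keep the rows whose
    # (alpha, beta, rowHash) combination is not yet present in the cache,
    # i.e. new rows or rows whose content changed since the last load.
    hashed = incoming.withColumn("rowHash", f.hash(*incoming.columns))
    return hashed.join(
        cache.select("alpha", "beta", "rowHash"),
        on=["alpha", "beta", "rowHash"],
        how="left_anti",
    )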