This repository has been archived by the owner on Aug 25, 2023. It is now read-only.

direct access #169

Draft
wants to merge 9 commits into base: main
5 changes: 3 additions & 2 deletions .github/deploy/steps/25-Provision-Service-Principal.ps1
@@ -29,8 +29,9 @@ $dbSpn = Get-SpnWithSecret -spnName $dbDeploySpnName -keyVaultName $keyVaultName

$mountSpn = Get-SpnWithSecret -spnName $mountSpnName -keyVaultName $keyVaultName
$secrets.addSecret("Databricks--TenantId", $tenantId)
$secrets.addSecret("Databricks--ClientId", $mountSpn.clientId)
$secrets.addSecret("Databricks--ClientSecret", $mountSpn.secretText)
$secrets.addSecret("DatabricksOauthEndpoint", "https://login.microsoftonline.com/$tenantId/oauth2/token")
$secrets.addSecret("DatabricksClientId", $mountSpn.clientId)
$secrets.addSecret("DatabricksClientSecret", $mountSpn.secretText)

# there is a chicken-and-egg problem where we want to save the new SPN secret in the
# keyvault, but the keyvault may not exist yet. This doesn't matter since the keyvault
26 changes: 26 additions & 0 deletions .github/deploy/steps/91-Create-SparkConf.ps1
@@ -0,0 +1,26 @@

Write-Host "Write cluster configuration for Direct Access..." -ForegroundColor DarkYellow

$confDirectAccess = [ordered]@{}

$confDirectAccess["spark.databricks.cluster.profile"]= "singleNode"
$confDirectAccess["spark.databricks.delta.preview.enabled"] = $true
$confDirectAccess["spark.databricks.io.cache.enabled"] = $true
$confDirectAccess["spark.master"]= "local[*, 4]"


$url = az storage account show --name $dataLakeName --resource-group $resourceGroupName --query "primaryEndpoints.dfs" --out tsv

$storageUrl = ([System.Uri]$url).Host

Write-Host " Adds Direct Access for $storageUrl..." -ForegroundColor DarkYellow

$confDirectAccess["fs.azure.account.auth.type.$storageUrl"] = "OAuth"
$confDirectAccess["fs.azure.account.oauth.provider.type.$storageUrl"] = "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider"
$confDirectAccess["fs.azure.account.oauth2.client.id.$storageUrl"] = "{{secrets/secrets/DatabricksClientId}}"
$confDirectAccess["fs.azure.account.oauth2.client.endpoint.$storageUrl"] = "{{secrets/secrets/DatabricksOauthEndpoint}}"
$confDirectAccess["fs.azure.account.oauth2.client.secret.$storageUrl"] = "{{secrets/secrets/DatabricksClientSecret}}"

$values.addSecret("StorageAccount--Url", $storageUrl)

Set-Content $repoRoot\.github\submit\sparkconf.json ($confDirectAccess | ConvertTo-Json)
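The keys written above are the standard Hadoop ABFS OAuth settings, so the same direct access can, if needed, also be set at session level from Python rather than in the cluster spec. A minimal sketch, assuming the "secrets" scope from this PR and an illustrative storage host (the host value is a placeholder; the deploy step derives the real one from the data lake account):

from atc.functions import init_dbutils
from atc.spark import Spark

dbutils = init_dbutils()
spark = Spark.get()

# Illustrative host; in the deployment it comes from primaryEndpoints.dfs above.
host = "<storage-account>.dfs.core.windows.net"

# Same configuration keys as in 91-Create-SparkConf.ps1, resolved at runtime
# instead of via {{secrets/...}} placeholders in the cluster configuration.
spark.conf.set(f"fs.azure.account.auth.type.{host}", "OAuth")
spark.conf.set(
    f"fs.azure.account.oauth.provider.type.{host}",
    "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
)
spark.conf.set(
    f"fs.azure.account.oauth2.client.id.{host}",
    dbutils.secrets.get("secrets", "DatabricksClientId"),
)
spark.conf.set(
    f"fs.azure.account.oauth2.client.secret.{host}",
    dbutils.secrets.get("secrets", "DatabricksClientSecret"),
)
spark.conf.set(
    f"fs.azure.account.oauth2.client.endpoint.{host}",
    dbutils.secrets.get("secrets", "DatabricksOauthEndpoint"),
)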
57 changes: 31 additions & 26 deletions .github/deploy/steps/99-SetupMounts.ps1
@@ -1,28 +1,33 @@

$srcDir = "$PSScriptRoot/../../.."

Push-Location -Path $srcDir

pip install dbx

dbx configure
copy "$srcDir/.github/submit/sparklibs.json" "$srcDir/tests/cluster/mount/"

$mountsJson = (,@(
@{
storageAccountName=$resourceName
secretScope="secrets"
clientIdName="Databricks--ClientId"
clientSecretName="Databricks--ClientSecret"
tenantIdName="Databricks--TenantId"
containers = [array]$($dataLakeContainers | ForEach-Object{ $_.name })
}
))

$mountsJson | ConvertTo-Json -Depth 4 | Set-Content "$srcDir/tests/cluster/mount/mounts.json"

dbx deploy --deployment-file "$srcDir/tests/cluster/mount/setup_job.yml.j2"

dbx launch --job="Setup Mounts" --trace --kill-on-sigterm

Pop-Location
### This entire file is deprecated since we now use direct access and no longer
### mount any storage. Nevertheless, the code is kept until we remove the class
### EventHubCapture, whose code requires a mount point if it is to be tested.
#
# $srcDir = "$PSScriptRoot/../../.."
#
# Push-Location -Path $srcDir
#
# pip install dbx
#
# dbx configure
# copy "$srcDir/.github/submit/sparklibs.json" "$srcDir/tests/cluster/mount/"
#
# $mountsJson = (,@(
# @{
# storageAccountName=$resourceName
# secretScope="secrets"
# clientIdName="Databricks--ClientId"
# clientSecretName="Databricks--ClientSecret"
# tenantIdName="Databricks--TenantId"
# containers = [array]$($dataLakeContainers | ForEach-Object{ $_.name })
# }
# ))
#
# $mountsJson | ConvertTo-Json -Depth 4 | Set-Content "$srcDir/tests/cluster/mount/mounts.json"
#
# dbx deploy --deployment-file "$srcDir/tests/cluster/mount/setup_job.yml.j2"
#
# dbx launch --job="Setup Mounts" --trace --kill-on-sigterm
#
# Pop-Location
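In practice the switch from mounts to direct access means that registered table paths change from mount points to abfss:// URIs, as the test changes further down illustrate. A minimal sketch with placeholder names (the concrete account is filled in by the deploy pipeline):

# Old, mount-based path (deprecated by this PR):
old_path = "/mnt/atc/silver/testdb/testtbl"

# New, direct-access path; readable once the OAuth cluster conf from
# 91-Create-SparkConf.ps1 is in place:
new_path = "abfss://silver@<storage-account>.dfs.core.windows.net/testdb/testtbl"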
14 changes: 14 additions & 0 deletions .github/submit/sparkconf_deprecated.json
@@ -0,0 +1,14 @@
{
"spark.databricks.cluster.profile": "singleNode",
"spark.master": "local[*, 4]",
"spark.databricks.delta.preview.enabled": true,
"spark.databricks.io.cache.enabled": true,

"fs.azure.account.auth.type.$resourceName.dfs.core.windows.net": "OAuth",
"fs.azure.account.oauth.provider.type.$resourceName.dfs.core.windows.net": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
"fs.azure.account.oauth2.client.id.$resourceName.dfs.core.windows.net": "{{ secrets/secrets/DatabricksClientId }}",
"fs.azure.account.oauth2.client.secret.$resourceName.dfs.core.windows.net": "{{ secrets/secrets/DatabricksClientSecret }}",
"fs.azure.account.oauth2.client.endpoint.$resourceName.dfs.core.windows.net": "{{ secrets/secrets/DatabricksOauthEndpoint }}"
}
TODO: fix the resource name. Not possible in this JSON file.
UPDATE: See 91-Create-SparkConf.ps1.
7 changes: 1 addition & 6 deletions .github/submit/submit_test_job.ps1
@@ -82,12 +82,7 @@ $run = @{
# single node cluster is sufficient
new_cluster= @{
spark_version=$sparkVersion
spark_conf= @{
"spark.databricks.cluster.profile"= "singleNode"
"spark.master"= "local[*, 4]"
"spark.databricks.delta.preview.enabled"= $true
"spark.databricks.io.cache.enabled"= $true
}
spark_conf = Get-Content "$PSScriptRoot/sparkconf.json" | ConvertFrom-Json
azure_attributes= @{
"availability"= "ON_DEMAND_AZURE"
"first_on_demand"= 1
1 change: 0 additions & 1 deletion setup.cfg
@@ -49,7 +49,6 @@ where = src
console_scripts =
python3 = atc.alias:python3
atc-dataplatform-git-hooks = atc.formatting.git_hooks:main
atc-dataplatform-mounts = atc.mount.main:main

[flake8]
exclude = .git,__pycache__,docs,build,dist,venv
3 changes: 2 additions & 1 deletion src/atc/delta/db_handle.py
@@ -50,5 +50,6 @@ def drop_cascade(self) -> None:
def create(self) -> None:
sql = f"CREATE DATABASE IF NOT EXISTS {self._name} "
if self._location:
sql += f" LOCATION '{self._location}'"
sql += f' LOCATION "{self._location}"'
print("execute sql:", sql)
Spark.get().sql(sql)
11 changes: 9 additions & 2 deletions src/atc/delta/delta_handle.py
@@ -92,7 +92,13 @@ def append(self, df: DataFrame, mergeSchema: bool = None) -> None:
return self.write_or_append(df, "append", mergeSchema=mergeSchema)

def truncate(self) -> None:
Spark.get().sql(f"TRUNCATE TABLE {self._name};")
if self._location:
Spark.get().sql(f"TRUNCATE TABLE delta.`{self._location}`;")
else:
Spark.get().sql(f"TRUNCATE TABLE {self._name};")
# if the hive table does not exist, this will give a useful error like
# pyspark.sql.utils.AnalysisException:
# Table not found for 'TRUNCATE TABLE': TestDb.TestTbl;

def drop(self) -> None:
Spark.get().sql(f"DROP TABLE IF EXISTS {self._name};")
@@ -105,7 +111,8 @@ def drop_and_delete(self) -> None:
def create_hive_table(self) -> None:
sql = f"CREATE TABLE IF NOT EXISTS {self._name} "
if self._location:
sql += f" USING DELTA LOCATION '{self._location}'"
sql += f' USING DELTA LOCATION "{self._location}"'
print("execute sql:", sql)
Spark.get().sql(sql)

def recreate_hive_table(self):
10 changes: 10 additions & 0 deletions src/atc/mount/main.py
@@ -2,10 +2,20 @@
import json
from types import SimpleNamespace

import deprecation

import atc
from atc.atc_exceptions import AtcException
from atc.functions import init_dbutils


@deprecation.deprecated(
deprecated_in="1.0.48",
removed_in="2.0.0",
current_version=atc.__version__,
details="use direct access instead. "
"See the atc-dataplatform unittests, for example.",
)
def main():
parser = argparse.ArgumentParser(description="atc-dataplatform mountpoint setup.")
parser.add_argument(
18 changes: 18 additions & 0 deletions tests/cluster/config/__init__.py
@@ -0,0 +1,18 @@
from atc import Configurator
from tests.cluster import values
from tests.cluster.values import storageAccountUrl


def InitConfigurator(*, clear=False):
"""This example function is how you would use the Configurator in a project."""
tc = Configurator()
if clear:
tc.clear_all_configurations()

# This is how you would set yourself up for different environments
# tc.register('ENV','dev')

tc.register("resourceName", values.resourceName())
tc.register("storageAccount", f"abfss://silver@{storageAccountUrl()}")

return tc
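A test would then use this helper roughly as the changed test files below do; a sketch with illustrative table details (no {ID} suffix here, unlike the real tests):

from atc.delta import DbHandle, DeltaHandle
from tests.cluster.config import InitConfigurator

tc = InitConfigurator(clear=True)
# "{storageAccount}" resolves to the abfss:// URI registered above.
tc.register("MyDb", {"name": "TestDb", "path": "{storageAccount}/testdb"})
tc.register("MyTbl", {"name": "{MyDb}.TestTbl", "path": "{MyDb_path}/testtbl"})

DbHandle.from_tc("MyDb").create()
DeltaHandle.from_tc("MyTbl").create_hive_table()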
12 changes: 6 additions & 6 deletions tests/cluster/cosmos/test_cosmos.py
@@ -4,24 +4,24 @@

import atc.cosmos
from atc import Configurator
from atc.functions import init_dbutils
from atc.spark import Spark
from tests.cluster.config import InitConfigurator
from tests.cluster.secrets import cosmosAccountKey
from tests.cluster.values import cosmosEndpoint


class TestCosmos(atc.cosmos.CosmosDb):
def __init__(self):
dbutils = init_dbutils()
super().__init__(
endpoint=dbutils.secrets.get("values", "Cosmos--Endpoint"),
account_key=dbutils.secrets.get("secrets", "Cosmos--AccountKey"),
endpoint=cosmosEndpoint(),
account_key=cosmosAccountKey(),
database="AtcCosmosContainer",
)


class CosmosTests(unittest.TestCase):
def test_01_tables(self):
tc = Configurator()
tc.clear_all_configurations()
tc = InitConfigurator(clear=True)
tc.register(
"CmsTbl",
{
47 changes: 40 additions & 7 deletions tests/cluster/delta/test_delta_class.py
@@ -1,45 +1,50 @@
import time
import unittest

from py4j.protocol import Py4JJavaError
from pyspark.sql.utils import AnalysisException

from atc import Configurator
from atc.delta import DbHandle, DeltaHandle
from atc.etl import Orchestrator
from atc.etl.extractors import SimpleExtractor
from atc.etl.loaders import SimpleLoader
from atc.functions import init_dbutils
from atc.spark import Spark
from tests.cluster.config import InitConfigurator


class DeltaTests(unittest.TestCase):
@classmethod
def setUpClass(cls) -> None:
Configurator().clear_all_configurations()
InitConfigurator(clear=True)

def test_01_configure(self):
tc = Configurator()

tc.register(
"MyDb", {"name": "TestDb{ID}", "path": "/mnt/atc/silver/testdb{ID}"}
"MyDb", {"name": "TestDb{ID}", "path": "{storageAccount}/testdb{ID}"}
)

tc.register(
"MyTbl",
{
"name": "TestDb{ID}.TestTbl",
"path": "/mnt/atc/silver/testdb{ID}/testtbl",
"name": "{MyDb}.TestTbl",
"path": "{MyDb_path}/testtbl",
},
)

tc.register(
"MyTbl2",
{
"name": "TestDb{ID}.TestTbl2",
"name": "{MyDb}.TestTbl2",
},
)

tc.register(
"MyTbl3",
{
"path": "/mnt/atc/silver/testdb/testtbl3",
"path": "{storageAccount}/testdb/testtbl3",
},
)

@@ -67,12 +72,40 @@ def test_02_write(self):

dh.append(df, mergeSchema=True)

# @unittest.skip("Flaky test")
def test_03_create(self):
# print(Configurator().get_all_details())
# print(
# {
# k: v[:-15] + v[-12:]
# for k, v in Spark.get().sparkContext.getConf().getAll()
# if k.startswith("fs.azure.account")
# }
# )

db = DbHandle.from_tc("MyDb")
db.create()

dh = DeltaHandle.from_tc("MyTbl")
dh.create_hive_table()
tc = Configurator()
print(init_dbutils().fs.ls(tc.get("MyTbl", "path")))
print(
init_dbutils().fs.put(
tc.get("MyTbl", "path") + "/some.file.txt", "Hello, ATC!", True
)
)
print(init_dbutils().fs.ls(tc.get("MyTbl", "path")))
for i in range(10, 0, -1):
try:
dh.create_hive_table()
break
except (AnalysisException, Py4JJavaError) as e:
if i > 1:
print(e)
print("trying again in 10 seconds")
time.sleep(10)
else:
raise e

# test hive access:
df = Spark.get().table("TestDb.TestTbl")
4 changes: 2 additions & 2 deletions tests/cluster/delta/test_sparkexecutor.py
@@ -1,8 +1,8 @@
import unittest

from atc import Configurator
from atc.delta import DbHandle, DeltaHandle
from atc.spark import Spark
from tests.cluster.config import InitConfigurator
from tests.cluster.delta import extras
from tests.cluster.delta.SparkExecutor import SparkSqlExecutor

@@ -16,7 +16,7 @@ class DeliverySparkExecutorTests(unittest.TestCase):
def setUpClass(cls):

# Register the delivery table for the table configurator
cls.tc = Configurator()
cls.tc = InitConfigurator()
cls.tc.add_resource_path(extras)
cls.tc.set_debug()

4 changes: 2 additions & 2 deletions tests/cluster/eh/AtcEh.py
@@ -2,13 +2,13 @@
This file sets up the EventHub that is deployed as part of the atc integration pipeline
"""
from atc.eh import EventHubStream
from atc.functions import init_dbutils
from tests.cluster.secrets import eventHubConnection


class AtcEh(EventHubStream):
def __init__(self):
super().__init__(
connection_str=init_dbutils().secrets.get("secrets", "EventHubConnection"),
connection_str=eventHubConnection(),
entity_path="atceh",
consumer_group="$Default",
)
4 changes: 2 additions & 2 deletions tests/cluster/eh/test_eh_json_orchestrator.py
@@ -8,15 +8,15 @@
from atc.eh import EventHubCaptureExtractor
from atc.orchestrators.ehjson2delta.EhJsonToDeltaExtractor import EhJsonToDeltaExtractor
from atc.spark import Spark
from tests.cluster.config import InitConfigurator


class JsonEhOrchestratorUnitTests(unittest.TestCase):
tc: TableConfigurator

@classmethod
def setUpClass(cls) -> None:
cls.tc = TableConfigurator()
cls.tc.clear_all_configurations()
cls.tc = InitConfigurator(clear=True)
cls.tc.register("TblYMD", {"name": "TableYMD"})
cls.tc.register("TblYMDH", {"name": "TableYMDH"})
cls.tc.register("TblPdate", {"name": "TablePdate"})