Skip to content

Commit

Permalink
Obtain account and container names from provided path in Azure Blob S…
Browse files Browse the repository at this point in the history
…torage. (#55)

* Use Azure Blob path to obtain container and account name.

When accessing Azure Blob Storage, currently we require users to provide
the `container` and `account` names as separate parameters even though
they can be found in the provided data path.

With this change it is not require to explicitly set additional
`AZURE_ACCOUNT_NAME` and `AZURE_CONTAINER_NAME` properties, these values
are obtained from `BUCKET_PATH` value.

For example, the new import statement:

```
IMPORT INTO RETAIL.SALES_POSITIONS
FROM SCRIPT ETL.IMPORT_PATH WITH
  BUCKET_PATH          = 'wasbs://<AZURE_CONTAINER_NAME>@<AZURE_ACCOUNT_NAME>.blob.core.windows.net/data/orc/*'
  AZURE_SAS_TOKEN      = '<AZURE_SAS_TOKEN>'
  DATA_FORMAT          = 'ORC'
  PARALLELISM          = 'nproc()';
```

Previously users had to set additional two properties:

```
IMPORT INTO RETAIL.SALES_POSITIONS
FROM SCRIPT ETL.IMPORT_PATH WITH
  BUCKET_PATH          = 'wasbs://<AZURE_CONTAINER_NAME>@<AZURE_ACCOUNT_NAME>.blob.core.windows.net/data/orc/*'
  DATA_FORMAT          = 'ORC'
  AZURE_ACCOUNT_NAME   = '<AZURE_ACCOUNT_NAME>'
  AZURE_CONTAINER_NAME = '<AZURE_CONTAINER_NAME>'
  AZURE_SAS_TOKEN      = '<AZURE_SAS_TOKEN>'
  PARALLELISM          = 'nproc()';
```

Fixes #50.
  • Loading branch information
morazow authored Dec 6, 2019
1 parent d17708a commit d6b912b
Show file tree
Hide file tree
Showing 4 changed files with 125 additions and 26 deletions.
24 changes: 21 additions & 3 deletions src/main/scala/com/exasol/cloudetl/bucket/AzureBlobBucket.scala
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package com.exasol.cloudetl.bucket

import scala.util.matching.Regex

import com.exasol.cloudetl.storage.StorageProperties

import org.apache.hadoop.conf.Configuration
Expand All @@ -24,7 +26,7 @@ final case class AzureBlobBucket(path: String, params: StorageProperties)
* Returns the list of required property keys for Azure Blob Storage.
*/
override def getRequiredProperties(): Seq[String] =
Seq(AZURE_ACCOUNT_NAME)
Seq.empty[String]

/** @inheritdoc */
override def getSecureProperties(): Seq[String] =
Expand Down Expand Up @@ -61,10 +63,15 @@ final case class AzureBlobBucket(path: String, params: StorageProperties)
properties
}

val accountName = mergedProperties.getString(AZURE_ACCOUNT_NAME)
val accountAndContainer = regexParsePath(path)

val accountName =
mergedProperties.get(AZURE_ACCOUNT_NAME).getOrElse(accountAndContainer.accountName)

if (mergedProperties.containsKey(AZURE_SAS_TOKEN)) {
val sasToken = mergedProperties.getString(AZURE_SAS_TOKEN)
val containerName = mergedProperties.getString(AZURE_CONTAINER_NAME)
val containerName =
mergedProperties.get(AZURE_CONTAINER_NAME).getOrElse(accountAndContainer.containerName)
conf.set(s"fs.azure.sas.$containerName.$accountName.blob.core.windows.net", sasToken)
} else {
val secretKey = mergedProperties.getString(AZURE_SECRET_KEY)
Expand All @@ -74,4 +81,15 @@ final case class AzureBlobBucket(path: String, params: StorageProperties)
conf
}

private[this] final val AZURE_BLOB_PATH_REGEX: Regex =
"""wasbs?://(.*)@([^.]+).blob.core.windows.net/(.*)$""".r

private[this] def regexParsePath(path: String): AccountAndContainer = path match {
case AZURE_BLOB_PATH_REGEX(containerName, accountName, _) =>
AccountAndContainer(accountName, containerName)
case _ => throw new IllegalArgumentException(s"Invalid Azure blob wasb(s) path: $path!")
}

private[this] case class AccountAndContainer(accountName: String, containerName: String)

}
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,11 @@ class StorageProperties(
.split(";")
.map { str =>
val idx = str.indexOf('=')
if (idx < 0) {
throw new IllegalArgumentException(
"Connection object password does not contain key=value pairs!"
)
}
str.substring(0, idx) -> str.substring(idx + 1)
}
.toMap
Expand Down
62 changes: 45 additions & 17 deletions src/test/scala/com/exasol/cloudetl/bucket/AzureBlobBucketTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import org.apache.hadoop.fs.azure.Wasbs
class AzureBlobBucketTest extends AbstractBucketTest {

private[this] val defaultProperties = Map(
PATH -> "wasbs://container@account1.windows.net/orc-data/",
PATH -> "wasbs://container1@account1.blob.core.windows.net/orc-data/",
FORMAT -> "ORC"
)

Expand All @@ -31,16 +31,17 @@ class AzureBlobBucketTest extends AbstractBucketTest {
}
}

test("apply throws if account name is not provided") {
properties = defaultProperties
test("apply throws if Azure Blob path is not valid") {
val path = "wasb://[email protected]/data/"
properties = defaultProperties ++ Map(PATH -> path, "AZURE_SECRET_KEY" -> "secret")
val thrown = intercept[IllegalArgumentException] {
assertAzureBlobBucket(getBucket(properties), Map.empty[String, String])
}
assert(thrown.getMessage === "Please provide a value for the AZURE_ACCOUNT_NAME property!")
assert(thrown.getMessage === s"Invalid Azure blob wasb(s) path: $path!")
}

test("apply throws if no connection name or credential (secret key or sas token) is provided") {
properties = defaultProperties ++ Map("AZURE_ACCOUNT_NAME" -> "account1")
properties = defaultProperties
val thrown = intercept[IllegalArgumentException] {
assertAzureBlobBucket(getBucket(properties), Map.empty[String, String])
}
Expand All @@ -49,7 +50,7 @@ class AzureBlobBucketTest extends AbstractBucketTest {
assert(thrown.getMessage === expected)
}

test("apply returns AzureBlobBucket with secret key") {
test("apply returns AzureBlobBucket with account name and secret key") {
properties = defaultProperties ++ Map(
"AZURE_ACCOUNT_NAME" -> "account1",
"AZURE_SECRET_KEY" -> "secret"
Expand All @@ -61,18 +62,16 @@ class AzureBlobBucketTest extends AbstractBucketTest {
)
}

test("apply throws if container name is not provided when using with sas token") {
properties = defaultProperties ++ Map(
"AZURE_ACCOUNT_NAME" -> "account1",
"AZURE_SAS_TOKEN" -> "token"
test("apply returns AzureBlobBucket with secret key") {
properties = defaultProperties ++ Map("AZURE_SECRET_KEY" -> "secret")
val bucket = getBucket(properties)
assertAzureBlobBucket(
bucket,
Map("fs.azure.account.key.account1.blob.core.windows.net" -> "secret")
)
val thrown = intercept[IllegalArgumentException] {
assertAzureBlobBucket(getBucket(properties), Map.empty[String, String])
}
assert(thrown.getMessage === "Please provide a value for the AZURE_CONTAINER_NAME property!")
}

test("apply returns AzureBlobBucket with sas token") {
test("apply returns AzureBlobBucket with account, container name and sas token") {
properties = defaultProperties ++ Map(
"AZURE_ACCOUNT_NAME" -> "account1",
"AZURE_SAS_TOKEN" -> "token",
Expand All @@ -85,7 +84,16 @@ class AzureBlobBucketTest extends AbstractBucketTest {
)
}

test("apply returns secret from password of connection object") {
test("apply returns AzureBlobBucket with sas token") {
properties = defaultProperties ++ Map("AZURE_SAS_TOKEN" -> "token")
val bucket = getBucket(properties)
assertAzureBlobBucket(
bucket,
Map("fs.azure.sas.container1.account1.blob.core.windows.net" -> "token")
)
}

test("apply returns secret from password of connection object with account name") {
properties = defaultProperties ++ Map(
"AZURE_ACCOUNT_NAME" -> "account1",
"CONNECTION_NAME" -> "connection_info"
Expand All @@ -98,7 +106,17 @@ class AzureBlobBucketTest extends AbstractBucketTest {
)
}

test("apply returns sas token from password of connection object") {
test("apply returns secret from password of connection object") {
properties = defaultProperties ++ Map("CONNECTION_NAME" -> "connection_info")
val exaMetadata = mockConnectionInfo("", "AZURE_SECRET_KEY=secret")
val bucket = getBucket(properties, exaMetadata)
assertAzureBlobBucket(
bucket,
Map("fs.azure.account.key.account1.blob.core.windows.net" -> "secret")
)
}

test("apply returns sas token from password of connection with account, container name") {
properties = defaultProperties ++ Map(
"AZURE_ACCOUNT_NAME" -> "account1",
"AZURE_CONTAINER_NAME" -> "container1",
Expand All @@ -112,6 +130,16 @@ class AzureBlobBucketTest extends AbstractBucketTest {
)
}

test("apply returns sas token from password of connection object") {
properties = defaultProperties ++ Map("CONNECTION_NAME" -> "connection_info")
val exaMetadata = mockConnectionInfo("", "AZURE_SAS_TOKEN=token")
val bucket = getBucket(properties, exaMetadata)
assertAzureBlobBucket(
bucket,
Map("fs.azure.sas.container1.account1.blob.core.windows.net" -> "token")
)
}

test("apply returns sas from connection object if both sas and secret are provided") {
properties = defaultProperties ++ Map(
"AZURE_ACCOUNT_NAME" -> "account1",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,17 +80,22 @@ class StoragePropertiesTest extends FunSuite with BeforeAndAfterEach with Mockit
assert(thrown.getMessage === "Exasol metadata is None!")
}

test("getConnectionInformation returns storage connection information") {
properties = Map(StorageProperties.CONNECTION_NAME -> "connection_info")
val metadata = mock[ExaMetadata]
val connectionInfo: ExaConnectionInformation = new ExaConnectionInformation() {
final def newConnectionInformation(
username: String,
password: String
): ExaConnectionInformation =
new ExaConnectionInformation() {
override def getType(): ExaConnectionInformation.ConnectionType =
ExaConnectionInformation.ConnectionType.PASSWORD
override def getAddress(): String = ""
override def getUser(): String = "user"
override def getPassword(): String = "secret"
override def getUser(): String = username
override def getPassword(): String = password
}

test("getConnectionInformation returns storage connection information") {
properties = Map(StorageProperties.CONNECTION_NAME -> "connection_info")
val metadata = mock[ExaMetadata]
val connectionInfo = newConnectionInformation("user", "secret")
when(metadata.getConnection("connection_info")).thenReturn(connectionInfo)
assert(StorageProperties(properties, metadata).getConnectionInformation() === connectionInfo)
verify(metadata, times(1)).getConnection("connection_info")
Expand All @@ -105,6 +110,49 @@ class StoragePropertiesTest extends FunSuite with BeforeAndAfterEach with Mockit
assert(BaseProperties(properties).hasNamedConnection() === true)
}

test("merge returns StorageProperties with new properties") {
properties = Map(StorageProperties.CONNECTION_NAME -> "connection_info")
val metadata = mock[ExaMetadata]
val connectionInfo = newConnectionInformation("", "KEY1=secret1==;KEY2=sec=ret2;KEY3=secret")
when(metadata.getConnection("connection_info")).thenReturn(connectionInfo)
val storageProperties = StorageProperties(properties, metadata).merge("")
assert(storageProperties.getString("KEY1") === "secret1==")
assert(storageProperties.getString("KEY2") === "sec=ret2")
assert(storageProperties.getString("KEY3") === "secret")
}

test("merge returns with keyForUsername mapped to connection username") {
properties = Map(StorageProperties.CONNECTION_NAME -> "connection_info")
val metadata = mock[ExaMetadata]
val connectionInfo = newConnectionInformation("usernameValue", "KEY1=secret1")
when(metadata.getConnection("connection_info")).thenReturn(connectionInfo)
val storageProperties = StorageProperties(properties, metadata).merge("usernameKey")
assert(storageProperties.getString("usernameKey") === "usernameValue")
assert(storageProperties.getString("KEY1") === "secret1")
}

test("merge returns with keyForUsername -> connection username overwritted") {
properties = Map(StorageProperties.CONNECTION_NAME -> "connection_info")
val metadata = mock[ExaMetadata]
val connectionInfo =
newConnectionInformation("usernameValue", "KEY1=secret1;usernameKey=newUsername")
when(metadata.getConnection("connection_info")).thenReturn(connectionInfo)
val storageProperties = StorageProperties(properties, metadata).merge("usernameKey")
assert(storageProperties.getString("usernameKey") === "newUsername")
assert(storageProperties.getString("KEY1") === "secret1")
}

test("merge throws if it cannot find key=value pairs in connection passoword") {
properties = Map(StorageProperties.CONNECTION_NAME -> "connection_info")
val metadata = mock[ExaMetadata]
val connectionInfo = newConnectionInformation("", "secret1;key=value")
when(metadata.getConnection("connection_info")).thenReturn(connectionInfo)
val thrown = intercept[IllegalArgumentException] {
StorageProperties(properties, metadata).merge("")
}
assert(thrown.getMessage === "Connection object password does not contain key=value pairs!")
}

test("mkString returns empty string by default") {
val str = BaseProperties(properties).mkString()
assert(str.isEmpty === true)
Expand Down

0 comments on commit d6b912b

Please sign in to comment.