From 0ab111ac4e3610ee2a7780ce86dafa7a194cee60 Mon Sep 17 00:00:00 2001
From: Muhammet Orazov
Date: Fri, 3 May 2019 17:40:14 +0200
Subject: [PATCH 01/10] Add extra logging information for import and export.

Add the number of files per node and vm, total number of files, etc.
---
 .../exasol/cloudetl/scriptclasses/ExportTable.scala | 11 ++++++++---
 .../exasol/cloudetl/scriptclasses/ImportFiles.scala |  5 +++--
 .../cloudetl/scriptclasses/ImportMetadata.scala     |  4 ++--
 3 files changed, 13 insertions(+), 7 deletions(-)

diff --git a/src/main/scala/com/exasol/cloudetl/scriptclasses/ExportTable.scala b/src/main/scala/com/exasol/cloudetl/scriptclasses/ExportTable.scala
index 0d9bf518..2e4d4612 100644
--- a/src/main/scala/com/exasol/cloudetl/scriptclasses/ExportTable.scala
+++ b/src/main/scala/com/exasol/cloudetl/scriptclasses/ExportTable.scala
@@ -16,6 +16,7 @@ import com.exasol.cloudetl.util.SchemaUtil
 import com.typesafe.scalalogging.LazyLogging
 import org.apache.hadoop.fs.Path
 
+@SuppressWarnings(Array("org.wartremover.warts.Var"))
 object ExportTable extends LazyLogging {
 
   def run(meta: ExaMetadata, iter: ExaIterator): Unit = {
@@ -33,16 +34,20 @@ object ExportTable extends LazyLogging {
     val options = ParquetWriteOptions(params)
     val writer = ParquetRowWriter(path, bucket.createConfiguration(), messageType, options)
 
-    logger.info(s"Starting export from node = '${meta.getNodeId}' and vm = '${meta.getVmId}'")
+    val nodeId = meta.getNodeId
+    val vmId = meta.getVmId
+    logger.info(s"Starting export from node: $nodeId, vm: $vmId.")
 
+    var count = 0;
     do {
       val row = getRow(iter, firstColumnIdx, columns)
-      logger.debug(s"Writing row '$row'")
       writer.write(row)
+      count = count + 1
     } while (iter.next())
 
     writer.close()
-    logger.info(s"Finished exporting from node = '${meta.getNodeId}' and vm = '${meta.getVmId}'")
+
+    logger.info(s"Exported '$count' records from node: $nodeId, vm: $vmId.")
   }
 
   private[this] def generateParquetFilename(meta: ExaMetadata): String = {
diff --git a/src/main/scala/com/exasol/cloudetl/scriptclasses/ImportFiles.scala b/src/main/scala/com/exasol/cloudetl/scriptclasses/ImportFiles.scala
index a88c9d7f..7c92e085 100644
--- a/src/main/scala/com/exasol/cloudetl/scriptclasses/ImportFiles.scala
+++ b/src/main/scala/com/exasol/cloudetl/scriptclasses/ImportFiles.scala
@@ -13,14 +13,15 @@ import org.apache.hadoop.fs.Path
 object ImportFiles extends LazyLogging {
 
   def run(meta: ExaMetadata, iter: ExaIterator): Unit = {
-    val bucketPath = iter.getString(0)
     val rest = iter.getString(1)
     val params = Bucket.keyValueStringToMap(rest)
     val format = Bucket.optionalParameter(params, "DATA_FORMAT", "PARQUET")
     val bucket = Bucket(params)
 
     val files = groupFiles(iter, 2)
-    logger.info(s"Reading file = ${files.take(5).mkString(",")} from bucket = $bucketPath")
+    val nodeId = meta.getNodeId
+    val vmId = meta.getVmId
+    logger.info(s"The total number of files for node: $nodeId, vm: $vmId is '${files.size}'.")
 
     val source = createSource(format, files, bucket)
     readAndEmit(source, iter)
diff --git a/src/main/scala/com/exasol/cloudetl/scriptclasses/ImportMetadata.scala b/src/main/scala/com/exasol/cloudetl/scriptclasses/ImportMetadata.scala
index 26b8c9f3..c09382e9 100644
--- a/src/main/scala/com/exasol/cloudetl/scriptclasses/ImportMetadata.scala
+++ b/src/main/scala/com/exasol/cloudetl/scriptclasses/ImportMetadata.scala
@@ -13,15 +13,15 @@ object ImportMetadata extends LazyLogging {
     val parallelism = iter.getInteger(2)
 
     logger.info(
-      s"Reading metadata from bucket path = $bucketPath with parallelism = ${parallelism.toString}"
+      s"Reading metadata from bucket path: $bucketPath with parallelism: ${parallelism.toString}"
     )
 
     val rest = iter.getString(1)
     val params = Bucket.keyValueStringToMap(rest)
-
     val bucket = Bucket(params)
 
     val paths = bucket.getPaths()
+    logger.info(s"Total number of files: ${paths.size} in bucket path: $bucketPath")
 
     paths.zipWithIndex.foreach {
       case (filename, idx) =>

From c92685fb4a6cf738747c136fd01dd51621eccd29 Mon Sep 17 00:00:00 2001
From: Muhammet Orazov
Date: Fri, 3 May 2019 17:41:14 +0200
Subject: [PATCH 02/10] Add log4j properties file to resources.

Without this we cannot see the redirected logs on the debug address.
---
 src/main/resources/log4j.properties | 6 ++++++
 1 file changed, 6 insertions(+)
 create mode 100644 src/main/resources/log4j.properties

diff --git a/src/main/resources/log4j.properties b/src/main/resources/log4j.properties
new file mode 100644
index 00000000..dc36dad0
--- /dev/null
+++ b/src/main/resources/log4j.properties
@@ -0,0 +1,6 @@
+# Set everything to be logged to the console
+log4j.rootCategory=INFO, console
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.target=System.err
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n

From 98911b8d2435a304fc2337651b262a1e0735fc61 Mon Sep 17 00:00:00 2001
From: Muhammet Orazov
Date: Fri, 3 May 2019 17:59:01 +0200
Subject: [PATCH 03/10] Add `getAs[T]` and `isNullAt` methods to Row class.

---
 .../scala/com/exasol/cloudetl/data/Row.scala  | 26 ++++++++++++++++-
 .../cloudetl/parquet/RowWriteSupport.scala    | 28 ++++++++-----------
 .../cloudetl/scriptclasses/ImportFiles.scala  |  2 +-
 3 files changed, 37 insertions(+), 19 deletions(-)

diff --git a/src/main/scala/com/exasol/cloudetl/data/Row.scala b/src/main/scala/com/exasol/cloudetl/data/Row.scala
index aafe5ec1..254456da 100644
--- a/src/main/scala/com/exasol/cloudetl/data/Row.scala
+++ b/src/main/scala/com/exasol/cloudetl/data/Row.scala
@@ -1,3 +1,27 @@
 package com.exasol.cloudetl.data
 
-final case class Row(val values: Seq[Any])
+/**
+ * The internal class that holds column data in an array.
+ */
+@SuppressWarnings(Array("org.wartremover.warts.AsInstanceOf"))
+final case class Row(protected[data] val values: Seq[Any]) {
+
+  /** Checks whether the value at position {@code index} is null. */
+  def isNullAt(index: Int): Boolean = get(index) == null
+
+  /**
+   * Returns the value at position {@code index}.
+   *
+   * If the value is null, null is returned.
+   */
+  def get(index: Int): Any = values(index)
+
+  /** Returns the value at position {@code index} casted to the type. */
+  @throws[ClassCastException]("When data type does not match")
+  def getAs[T](index: Int): T = get(index).asInstanceOf[T]
+
+  /** Returns the value array.
*/ + def getValues(): Seq[Any] = + values + +} diff --git a/src/main/scala/com/exasol/cloudetl/parquet/RowWriteSupport.scala b/src/main/scala/com/exasol/cloudetl/parquet/RowWriteSupport.scala index 6d3e8109..1c106541 100644 --- a/src/main/scala/com/exasol/cloudetl/parquet/RowWriteSupport.scala +++ b/src/main/scala/com/exasol/cloudetl/parquet/RowWriteSupport.scala @@ -27,13 +27,7 @@ import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName * - org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport * */ -@SuppressWarnings( - Array( - "org.wartremover.warts.AsInstanceOf", - "org.wartremover.warts.Null", - "org.wartremover.warts.Var" - ) -) +@SuppressWarnings(Array("org.wartremover.warts.Null", "org.wartremover.warts.Var")) class RowWriteSupport(schema: MessageType) extends WriteSupport[Row] { // The number bytes required for timestamp buffer in Parquet @@ -83,7 +77,7 @@ class RowWriteSupport(schema: MessageType) extends WriteSupport[Row] { while (idx < schema.getFieldCount) { val fieldType = schema.getType(idx) val fieldName = fieldType.getName() - if (row.values(idx) != null) { + if (!row.isNullAt(idx)) { consumeField(fieldName, idx) { writers(idx).apply(row, idx) } @@ -111,7 +105,7 @@ class RowWriteSupport(schema: MessageType) extends WriteSupport[Row] { typeName match { case PrimitiveTypeName.BOOLEAN => (row: Row, index: Int) => - recordConsumer.addBoolean(row.values(index).asInstanceOf[Boolean]) + recordConsumer.addBoolean(row.getAs[Boolean](index)) case PrimitiveTypeName.INT32 => originalType match { @@ -119,25 +113,25 @@ class RowWriteSupport(schema: MessageType) extends WriteSupport[Row] { makeDateWriter() case _ => (row: Row, index: Int) => - recordConsumer.addInteger(row.values(index).asInstanceOf[Integer]) + recordConsumer.addInteger(row.getAs[Integer](index)) } case PrimitiveTypeName.INT64 => (row: Row, index: Int) => - recordConsumer.addLong(row.values(index).asInstanceOf[Long]) + recordConsumer.addLong(row.getAs[Long](index)) case PrimitiveTypeName.FLOAT => (row: Row, index: Int) => - recordConsumer.addFloat(row.values(index).asInstanceOf[Double].floatValue) + recordConsumer.addFloat(row.getAs[Double](index).floatValue) case PrimitiveTypeName.DOUBLE => (row: Row, index: Int) => - recordConsumer.addDouble(row.values(index).asInstanceOf[Double]) + recordConsumer.addDouble(row.getAs[Double](index)) case PrimitiveTypeName.BINARY => (row: Row, index: Int) => recordConsumer.addBinary( - Binary.fromReusedByteArray(row.values(index).asInstanceOf[String].getBytes) + Binary.fromReusedByteArray(row.getAs[String](index).getBytes) ) case PrimitiveTypeName.INT96 => @@ -153,14 +147,14 @@ class RowWriteSupport(schema: MessageType) extends WriteSupport[Row] { private def makeDateWriter(): RowValueWriter = (row: Row, index: Int) => { // Write the number of days since unix epoch as integer - val date = row.values(index).asInstanceOf[java.sql.Date] + val date = row.getAs[java.sql.Date](index) val days = DateTimeUtil.daysSinceEpoch(date) recordConsumer.addInteger(days.toInt) } private def makeTimestampWriter(): RowValueWriter = (row: Row, index: Int) => { - val timestamp = row.values(index).asInstanceOf[java.sql.Timestamp] + val timestamp = row.getAs[java.sql.Timestamp](index) val micros = DateTimeUtil.getMicrosFromTimestamp(timestamp) val (days, nanos) = DateTimeUtil.getJulianDayAndNanos(micros) @@ -186,7 +180,7 @@ class RowWriteSupport(schema: MessageType) extends WriteSupport[Row] { val numBytes = SchemaUtil.PRECISION_TO_BYTE_SIZE(precision - 1) val bytesWriter = (row: Row, 
index: Int) => { - val decimal = row.values(index).asInstanceOf[java.math.BigDecimal] + val decimal = row.getAs[java.math.BigDecimal](index) val unscaled = decimal.unscaledValue() val bytes = unscaled.toByteArray val fixedLenBytesArray = diff --git a/src/main/scala/com/exasol/cloudetl/scriptclasses/ImportFiles.scala b/src/main/scala/com/exasol/cloudetl/scriptclasses/ImportFiles.scala index 7c92e085..15d61327 100644 --- a/src/main/scala/com/exasol/cloudetl/scriptclasses/ImportFiles.scala +++ b/src/main/scala/com/exasol/cloudetl/scriptclasses/ImportFiles.scala @@ -52,7 +52,7 @@ object ImportFiles extends LazyLogging { private[this] def readAndEmit(src: Source, ctx: ExaIterator): Unit = src.stream.foreach { iter => iter.foreach { row => - val columns: Seq[Object] = row.values.map(_.asInstanceOf[AnyRef]) + val columns: Seq[Object] = row.getValues().map(_.asInstanceOf[AnyRef]) ctx.emit(columns: _*) } } From 1bb393381fa5a3bdf1bd9e0fb0a2d2fec4472ca3 Mon Sep 17 00:00:00 2001 From: Muhammet Orazov Date: Mon, 6 May 2019 16:34:51 +0200 Subject: [PATCH 04/10] Fix parallelism in export. Change default value to `iproc()`. --- README.md | 15 ++++++++------- docs/overview.md | 8 ++++---- .../cloudetl/scriptclasses/ExportPath.scala | 2 +- 3 files changed, 13 insertions(+), 12 deletions(-) diff --git a/README.md b/README.md index 09cbdb2f..e53acc05 100644 --- a/README.md +++ b/README.md @@ -83,7 +83,7 @@ INTO SCRIPT ETL.EXPORT_PATH WITH S3_ACCESS_KEY = 'MY_AWS_ACCESS_KEY' S3_SECRET_KEY = 'MY_AWS_SECRET_KEY' S3_ENDPOINT = 's3.MY_REGION.amazonaws.com' - PARALLELISM = 'nproc()'; + PARALLELISM = 'iproc(), floor(random()*4)'; ``` Please change the paths and parameters accordingly. @@ -150,12 +150,13 @@ The following table shows currently supported features with the latest realese. The following configuration parameters should be provided when using the cloud-storage-etl-udfs. -| Parameter | Default | Description -|:-------------------------------|:--------------|:--------------------------------------------------------------------------------------------------------| -|``BUCKET_PATH`` |** |A path to the data bucket. It should start with cloud storage system specific schema, for example `s3a`. | -|``DATA_FORMAT`` |``PARQUET`` |The data storage format in the provided path. | -|``PARALLELISM`` |``nproc()`` |The number of parallel instances to be started for loading data. | -|``storage specific parameters`` |** |These are parameters for specific cloud storage for authentication purpose. | +| Parameter | Default | Description +|:-------------------------------|:---------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------| +|``BUCKET_PATH`` |** |A path to the data bucket. It should start with cloud storage system specific schema, for example `s3a`. | +|``DATA_FORMAT`` |``PARQUET`` |The data storage format in the provided path. | +|``PARALLELISM IN IMPORT`` |``nproc()`` |The number of parallel instances to be started for importing data. *Please multiply this to increase the parallelism*. | +|``PARALLELISM IN EXPORT`` |``iproc()`` |The parallel instances for exporting data. *Add another random number to increase the parallelism per node*. For example, ``iproc(), floor(random()*4)``. | +|``storage specific parameters`` |** |These are parameters for specific cloud storage for authentication purpose. | Please see [the parameters specific for each cloud storage and how to configure them here](./docs/overview.md). 
diff --git a/docs/overview.md b/docs/overview.md index d7ba1982..a4042378 100644 --- a/docs/overview.md +++ b/docs/overview.md @@ -52,7 +52,7 @@ INTO SCRIPT ETL.EXPORT_PATH WITH S3_ACCESS_KEY = 'MY_AWS_ACCESS_KEY' S3_SECRET_KEY = 'MY_AWS_SECRET_KEY' S3_ENDPOINT = 's3.MY_REGION.amazonaws.com' - PARALLELISM = 'nproc()'; + PARALLELISM = 'iproc(), floor(random()*4)'; ``` ## Google Cloud Storage @@ -97,7 +97,7 @@ INTO SCRIPT ETL.EXPORT_PATH WITH BUCKET_PATH = 'gs://google-storage-path/parquet/retail/sales_positions/' GCS_PROJECT_ID = 'MY_GCS_PORJECT_ID' GCS_KEYFILE_PATH = 'MY_BUCKETFS_PATH/project-id-service-keyfile.json' - PARALLELISM = 'nproc()'; + PARALLELISM = 'iproc()'; ``` ## Azure Blob Storage @@ -129,7 +129,7 @@ INTO SCRIPT ETL.EXPORT_PATH WITH BUCKET_PATH = 'wasbs://CONTAINER@AZURE_ACCOUNT_NAME.blob.core.windows.net/parquet/sales-positions/' AZURE_ACCOUNT_NAME = 'AZURE_ACCOUNT_NAME' AZURE_SECRET_KEY = 'AZURE_SECRET_KEY' - PARALLELISM = 'nproc()'; + PARALLELISM = 'iproc()'; ``` ## Azure Data Lake (Gen1) Storage @@ -173,5 +173,5 @@ INTO SCRIPT ETL.EXPORT_PATH WITH AZURE_CLIENT_ID = 'AZURE_CLIENT_ID' AZURE_CLIENT_SECRET = 'AZURE_CLIENT_SECRET' AZURE_DIRECTORY_ID = 'AZURE_DIRECTORY_ID' - PARALLELISM = 'nproc()'; + PARALLELISM = 'iproc()'; ``` diff --git a/src/main/scala/com/exasol/cloudetl/scriptclasses/ExportPath.scala b/src/main/scala/com/exasol/cloudetl/scriptclasses/ExportPath.scala index 9effec62..227de4f8 100644 --- a/src/main/scala/com/exasol/cloudetl/scriptclasses/ExportPath.scala +++ b/src/main/scala/com/exasol/cloudetl/scriptclasses/ExportPath.scala @@ -15,7 +15,7 @@ object ExportPath { bucket.validate() val bucketPath = bucket.bucketPath - val parallelism = Bucket.optionalParameter(params, "PARALLELISM", "nproc()") + val parallelism = Bucket.optionalParameter(params, "PARALLELISM", "iproc()") val rest = Bucket.keyValueMapToString(params) val scriptSchema = exaMeta.getScriptSchema From 6784d0361a10bfe0219b00a65dfa6c8452f4d25b Mon Sep 17 00:00:00 2001 From: Muhammet Orazov Date: Mon, 6 May 2019 16:35:34 +0200 Subject: [PATCH 05/10] Fix typo in parquet writer validation parameter. --- .../scala/com/exasol/cloudetl/parquet/ParquetWriteOptions.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/scala/com/exasol/cloudetl/parquet/ParquetWriteOptions.scala b/src/main/scala/com/exasol/cloudetl/parquet/ParquetWriteOptions.scala index 9ad6db03..5bcf71bc 100644 --- a/src/main/scala/com/exasol/cloudetl/parquet/ParquetWriteOptions.scala +++ b/src/main/scala/com/exasol/cloudetl/parquet/ParquetWriteOptions.scala @@ -27,7 +27,7 @@ object ParquetWriteOptions { params.get("PARQUET_BLOCK_SIZE").fold(ParquetWriter.DEFAULT_BLOCK_SIZE)(_.toInt) val pageSize = params.get("PARQUET_PAGE_SIZE").fold(ParquetWriter.DEFAULT_PAGE_SIZE)(_.toInt) val dictionary = params.get("PARQUET_DICTIONARY_ENCODING").fold(true)(_.toBoolean) - val validation = params.get("PARQUET_VALIDAIONT").fold(true)(_.toBoolean) + val validation = params.get("PARQUET_VALIDATION").fold(true)(_.toBoolean) ParquetWriteOptions(blockSize, pageSize, compressionCodec, dictionary, validation) } From 765957c2a9a012c4efb16c4e79cc850856e2bcc5 Mon Sep 17 00:00:00 2001 From: Muhammet Orazov Date: Mon, 6 May 2019 16:55:58 +0200 Subject: [PATCH 06/10] Fix tests in export. Expected 'iproc()' instead of 'nproc()'. 
--- .../com/exasol/cloudetl/scriptclasses/ExportPathSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/test/scala/com/exasol/cloudetl/scriptclasses/ExportPathSuite.scala b/src/test/scala/com/exasol/cloudetl/scriptclasses/ExportPathSuite.scala index 823af5c0..6f61730c 100644 --- a/src/test/scala/com/exasol/cloudetl/scriptclasses/ExportPathSuite.scala +++ b/src/test/scala/com/exasol/cloudetl/scriptclasses/ExportPathSuite.scala @@ -27,7 +27,7 @@ class ExportPathSuite extends BaseSuite { |FROM | DUAL |GROUP BY - | nproc(); + | iproc(); |""".stripMargin assert(ExportPath.generateSqlForExportSpec(exaMeta, exaSpec) === sqlExpected) From 3a4202a2d6cc99aa1c7096a666168a1f0a418faf Mon Sep 17 00:00:00 2001 From: Muhammet Orazov Date: Thu, 16 May 2019 09:57:22 +0200 Subject: [PATCH 07/10] Rename `createConfiguration` to `getConfiguration` method. --- .../com/exasol/cloudetl/bucket/AzureAdlsBucket.scala | 2 +- .../com/exasol/cloudetl/bucket/AzureBlobBucket.scala | 2 +- src/main/scala/com/exasol/cloudetl/bucket/Bucket.scala | 4 ++-- src/main/scala/com/exasol/cloudetl/bucket/GCSBucket.scala | 2 +- .../scala/com/exasol/cloudetl/bucket/LocalBucket.scala | 2 +- src/main/scala/com/exasol/cloudetl/bucket/S3Bucket.scala | 2 +- .../com/exasol/cloudetl/scriptclasses/ExportTable.scala | 2 +- .../com/exasol/cloudetl/scriptclasses/ImportFiles.scala | 6 +++--- .../scala/com/exasol/cloudetl/bucket/BucketSuite.scala | 8 ++++---- 9 files changed, 15 insertions(+), 15 deletions(-) diff --git a/src/main/scala/com/exasol/cloudetl/bucket/AzureAdlsBucket.scala b/src/main/scala/com/exasol/cloudetl/bucket/AzureAdlsBucket.scala index ea87a4ad..39d14cc0 100644 --- a/src/main/scala/com/exasol/cloudetl/bucket/AzureAdlsBucket.scala +++ b/src/main/scala/com/exasol/cloudetl/bucket/AzureAdlsBucket.scala @@ -18,7 +18,7 @@ final case class AzureAdlsBucket(path: String, params: Map[String, String]) exte * Additionally validates that all required parameters are available * in order to create a configuration. */ - override def createConfiguration(): Configuration = { + override def getConfiguration(): Configuration = { validate() val conf = new Configuration() diff --git a/src/main/scala/com/exasol/cloudetl/bucket/AzureBlobBucket.scala b/src/main/scala/com/exasol/cloudetl/bucket/AzureBlobBucket.scala index dd47fdae..4cde6695 100644 --- a/src/main/scala/com/exasol/cloudetl/bucket/AzureBlobBucket.scala +++ b/src/main/scala/com/exasol/cloudetl/bucket/AzureBlobBucket.scala @@ -18,7 +18,7 @@ final case class AzureBlobBucket(path: String, params: Map[String, String]) exte * Additionally validates that all required parameters are available * in order to create a configuration. */ - override def createConfiguration(): Configuration = { + override def getConfiguration(): Configuration = { validate() val conf = new Configuration() diff --git a/src/main/scala/com/exasol/cloudetl/bucket/Bucket.scala b/src/main/scala/com/exasol/cloudetl/bucket/Bucket.scala index 978ca52e..cbb7231f 100644 --- a/src/main/scala/com/exasol/cloudetl/bucket/Bucket.scala +++ b/src/main/scala/com/exasol/cloudetl/bucket/Bucket.scala @@ -33,14 +33,14 @@ abstract class Bucket { * Creates a Hadoop [[org.apache.hadoop.conf.Configuration]] for this * specific bucket type. */ - def createConfiguration(): Configuration + def getConfiguration(): Configuration /** * The Hadoop [[org.apache.hadoop.fs.FileSystem]] for this specific * bucket path. 
*/ lazy val fileSystem: FileSystem = - FileSystem.get(new URI(bucketPath), createConfiguration()) + FileSystem.get(new URI(bucketPath), getConfiguration()) /** * Get the all the paths in this bucket path. diff --git a/src/main/scala/com/exasol/cloudetl/bucket/GCSBucket.scala b/src/main/scala/com/exasol/cloudetl/bucket/GCSBucket.scala index 76ad8c9a..d2d7b058 100644 --- a/src/main/scala/com/exasol/cloudetl/bucket/GCSBucket.scala +++ b/src/main/scala/com/exasol/cloudetl/bucket/GCSBucket.scala @@ -18,7 +18,7 @@ final case class GCSBucket(path: String, params: Map[String, String]) extends Bu * Additionally validates that all required parameters are available * in order to create a configuration. */ - override def createConfiguration(): Configuration = { + override def getConfiguration(): Configuration = { validate() val conf = new Configuration() diff --git a/src/main/scala/com/exasol/cloudetl/bucket/LocalBucket.scala b/src/main/scala/com/exasol/cloudetl/bucket/LocalBucket.scala index 13bd4d53..5607cc29 100644 --- a/src/main/scala/com/exasol/cloudetl/bucket/LocalBucket.scala +++ b/src/main/scala/com/exasol/cloudetl/bucket/LocalBucket.scala @@ -14,7 +14,7 @@ final case class LocalBucket(path: String, params: Map[String, String]) extends override def validate(): Unit = () /** @inheritdoc */ - override def createConfiguration(): Configuration = { + override def getConfiguration(): Configuration = { validate() new Configuration() } diff --git a/src/main/scala/com/exasol/cloudetl/bucket/S3Bucket.scala b/src/main/scala/com/exasol/cloudetl/bucket/S3Bucket.scala index 247c9296..797249a9 100644 --- a/src/main/scala/com/exasol/cloudetl/bucket/S3Bucket.scala +++ b/src/main/scala/com/exasol/cloudetl/bucket/S3Bucket.scala @@ -18,7 +18,7 @@ final case class S3Bucket(path: String, params: Map[String, String]) extends Buc * Additionally validates that all required parameters are available * in order to create a configuration. 
*/ - override def createConfiguration(): Configuration = { + override def getConfiguration(): Configuration = { validate() val conf = new Configuration() diff --git a/src/main/scala/com/exasol/cloudetl/scriptclasses/ExportTable.scala b/src/main/scala/com/exasol/cloudetl/scriptclasses/ExportTable.scala index 2e4d4612..100a64bc 100644 --- a/src/main/scala/com/exasol/cloudetl/scriptclasses/ExportTable.scala +++ b/src/main/scala/com/exasol/cloudetl/scriptclasses/ExportTable.scala @@ -32,7 +32,7 @@ object ExportTable extends LazyLogging { val path = new Path(bucketPath, parquetFilename) val messageType = SchemaUtil.createParquetMessageType(columns, "exasol_export_schema") val options = ParquetWriteOptions(params) - val writer = ParquetRowWriter(path, bucket.createConfiguration(), messageType, options) + val writer = ParquetRowWriter(path, bucket.getConfiguration(), messageType, options) val nodeId = meta.getNodeId val vmId = meta.getVmId diff --git a/src/main/scala/com/exasol/cloudetl/scriptclasses/ImportFiles.scala b/src/main/scala/com/exasol/cloudetl/scriptclasses/ImportFiles.scala index 15d61327..d7ce75c5 100644 --- a/src/main/scala/com/exasol/cloudetl/scriptclasses/ImportFiles.scala +++ b/src/main/scala/com/exasol/cloudetl/scriptclasses/ImportFiles.scala @@ -41,9 +41,9 @@ object ImportFiles extends LazyLogging { private[this] def createSource(format: String, files: Seq[String], bucket: Bucket): Source = { val paths = files.map(f => new Path(f)) format.toLowerCase match { - case "avro" => AvroSource(paths, bucket.fileSystem, bucket.createConfiguration()) - case "orc" => OrcSource(paths, bucket.fileSystem, bucket.createConfiguration()) - case "parquet" => ParquetSource(paths, bucket.fileSystem, bucket.createConfiguration()) + case "avro" => AvroSource(paths, bucket.fileSystem, bucket.getConfiguration()) + case "orc" => OrcSource(paths, bucket.fileSystem, bucket.getConfiguration()) + case "parquet" => ParquetSource(paths, bucket.fileSystem, bucket.getConfiguration()) case _ => throw new IllegalArgumentException(s"Unsupported storage format: '$format'") } diff --git a/src/test/scala/com/exasol/cloudetl/bucket/BucketSuite.scala b/src/test/scala/com/exasol/cloudetl/bucket/BucketSuite.scala index d53762b0..9c1b52e7 100644 --- a/src/test/scala/com/exasol/cloudetl/bucket/BucketSuite.scala +++ b/src/test/scala/com/exasol/cloudetl/bucket/BucketSuite.scala @@ -33,7 +33,7 @@ class BucketSuite extends FunSuite with Matchers { ) val bucket = Bucket(s3params) - val conf = bucket.createConfiguration() + val conf = bucket.getConfiguration() assert(bucket.isInstanceOf[S3Bucket]) assert(conf.get("fs.s3a.impl") === classOf[S3AFileSystem].getName) @@ -51,7 +51,7 @@ class BucketSuite extends FunSuite with Matchers { ) val bucket = Bucket(gcsParams) - val conf = bucket.createConfiguration() + val conf = bucket.getConfiguration() assert(bucket.isInstanceOf[GCSBucket]) assert(conf.get("fs.gs.impl") === classOf[GoogleHadoopFileSystem].getName) @@ -67,7 +67,7 @@ class BucketSuite extends FunSuite with Matchers { ) val bucket = Bucket(azureBlobParams) - val conf = bucket.createConfiguration() + val conf = bucket.getConfiguration() assert(bucket.isInstanceOf[AzureBlobBucket]) assert(conf.get("fs.azure.account.key.account1.blob.core.windows.net") === "secret") @@ -95,7 +95,7 @@ class BucketSuite extends FunSuite with Matchers { val bucket = Bucket(params) assert(bucket.isInstanceOf[AzureAdlsBucket]) - val conf = bucket.createConfiguration() + val conf = bucket.getConfiguration() val expectedSettings = Map( "fs.adl.impl" 
-> classOf[org.apache.hadoop.fs.adl.AdlFileSystem].getName, "fs.AbstractFileSystem.adl.impl" -> classOf[org.apache.hadoop.fs.adl.Adl].getName, From 19323d18c9915f41460103cfe301cec6b4ab1f26 Mon Sep 17 00:00:00 2001 From: Muhammet Orazov Date: Thu, 16 May 2019 10:08:03 +0200 Subject: [PATCH 08/10] [skip ci] Refactor scaladoc comments. --- .../cloudetl/parquet/RowWriteSupport.scala | 28 +++++++++++-------- .../cloudetl/scriptclasses/ExportPath.scala | 7 +++-- .../cloudetl/scriptclasses/ExportTable.scala | 12 ++++---- .../cloudetl/source/ParquetSource.scala | 6 ++-- 4 files changed, 32 insertions(+), 21 deletions(-) diff --git a/src/main/scala/com/exasol/cloudetl/parquet/RowWriteSupport.scala b/src/main/scala/com/exasol/cloudetl/parquet/RowWriteSupport.scala index 1c106541..55dd0da7 100644 --- a/src/main/scala/com/exasol/cloudetl/parquet/RowWriteSupport.scala +++ b/src/main/scala/com/exasol/cloudetl/parquet/RowWriteSupport.scala @@ -20,8 +20,9 @@ import org.apache.parquet.schema.PrimitiveType import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName /** - * A Parquet [[org.apache.parquet.hadoop.api.WriteSupport]] implementation that writes - * [[com.exasol.cloudetl.data.Row]] as a Parquet data. + * A Parquet [[org.apache.parquet.hadoop.api.WriteSupport]] + * implementation that writes [[com.exasol.cloudetl.data.Row]] as a + * Parquet data. * * This is mostly adapted from Spark codebase: * - org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport @@ -33,20 +34,23 @@ class RowWriteSupport(schema: MessageType) extends WriteSupport[Row] { // The number bytes required for timestamp buffer in Parquet private final val TIMESTAMP_MAX_BYTE_SIZE: Int = 12 - // This is a type that is responsible for writing a value in Row values index to the - // RecordConsumer + // This is a type that is responsible for writing a value in Row + // values index to the RecordConsumer private type RowValueWriter = (Row, Int) => Unit - // A list of `RowValueWriter`-s for each field type of Parquet `schema` + // A list of `RowValueWriter`-s for each field type of Parquet + // `schema` private var rootFieldWriters: Array[RowValueWriter] = _ // A Parquet RecordConsumer that all values of a Row will be written private var recordConsumer: RecordConsumer = _ - // Reusable byte array used to write timestamps as Parquet INT96 values + // Reusable byte array used to write timestamps as Parquet INT96 + // values private val timestampBuffer = new Array[Byte](TIMESTAMP_MAX_BYTE_SIZE) - // Reusable byte array used to write decimal values as Parquet FIXED_LEN_BYTE_ARRAY values + // Reusable byte array used to write decimal values as Parquet + // FIXED_LEN_BYTE_ARRAY values private val decimalBuffer = new Array[Byte](SchemaUtil.PRECISION_TO_BYTE_SIZE(SchemaUtil.DECIMAL_MAX_PRECISION - 1)) @@ -185,12 +189,14 @@ class RowWriteSupport(schema: MessageType) extends WriteSupport[Row] { val bytes = unscaled.toByteArray val fixedLenBytesArray = if (bytes.length == numBytes) { - // If the length of the underlying byte array of the unscaled `BigDecimal` happens to be - // `numBytes`, just reuse it, so that we don't bother copying it to `decimalBuffer`. + // If the length of the underlying byte array of the unscaled + // `BigDecimal` happens to be `numBytes`, just reuse it, so + // that we don't bother copying it to `decimalBuffer`. bytes } else if (bytes.length < numBytes) { - // Otherwise, the length must be less than `numBytes`. 
In this case we copy contents of - // the underlying bytes with padding sign bytes to `decimalBuffer` to form the result + // Otherwise, the length must be less than `numBytes`. In + // this case we copy contents of the underlying bytes with + // padding sign bytes to `decimalBuffer` to form the result // fixed-length byte array. // For negatives all high bits need to be 1 hence -1 used diff --git a/src/main/scala/com/exasol/cloudetl/scriptclasses/ExportPath.scala b/src/main/scala/com/exasol/cloudetl/scriptclasses/ExportPath.scala index 227de4f8..5f12a3ca 100644 --- a/src/main/scala/com/exasol/cloudetl/scriptclasses/ExportPath.scala +++ b/src/main/scala/com/exasol/cloudetl/scriptclasses/ExportPath.scala @@ -34,15 +34,18 @@ object ExportPath { |""".stripMargin } + /** Returns source column names with quotes removed. */ private[this] def getSourceColumns(spec: ExaExportSpecification): Seq[String] = spec.getSourceColumnNames.asScala .map { case value => - // Remove quotes if present getColumnName(value).replaceAll("\"", "") } - /** Given a table name dot column name syntax (myTable.colInt), return the column name. */ + /** + * Given a table name dot column name syntax (myTable.colInt), return + * the column name. + */ private[this] def getColumnName(str: String): String = str.split("\\.") match { case Array(colName) => colName case Array(tblName @ _, colName) => colName diff --git a/src/main/scala/com/exasol/cloudetl/scriptclasses/ExportTable.scala b/src/main/scala/com/exasol/cloudetl/scriptclasses/ExportTable.scala index 100a64bc..b1449ed7 100644 --- a/src/main/scala/com/exasol/cloudetl/scriptclasses/ExportTable.scala +++ b/src/main/scala/com/exasol/cloudetl/scriptclasses/ExportTable.scala @@ -66,14 +66,16 @@ object ExportTable extends LazyLogging { } /** - * Creates a sequence of [[ExaColumnInfo]] columns using an Exasol [[ExaMetadata]] input column - * methods. + * Creates a sequence of [[ExaColumnInfo]] columns using an Exasol + * [[ExaMetadata]] input column methods. * - * Set the name of the column using `srcColumnNames` parameter. Additionally, set the precision, - * scale and length using corresponding functions on Exasol metadata for input columns. + * Set the name of the column using `srcColumnNames` parameter. + * Additionally, set the precision, scale and length using + * corresponding functions on Exasol metadata for input columns. 
* * @param meta An Exasol [[ExaMetadata]] metadata - * @param srcColumnNames A sequence of column names per each input column in metadata + * @param srcColumnNames A sequence of column names per each input + * column in metadata * @param startIdx A starting integer index to reference input column * @return A sequence of [[ExaColumnInfo]] columns */ diff --git a/src/main/scala/com/exasol/cloudetl/source/ParquetSource.scala b/src/main/scala/com/exasol/cloudetl/source/ParquetSource.scala index 8cd00142..510668e3 100644 --- a/src/main/scala/com/exasol/cloudetl/source/ParquetSource.scala +++ b/src/main/scala/com/exasol/cloudetl/source/ParquetSource.scala @@ -50,9 +50,9 @@ final case class ParquetSource( Iterator.continually(parquetReader.read).takeWhile(_ != null) } } catch { - case NonFatal(ex) => - logger.error(s"Could not create parquet reader for path: $path", ex); - throw ex + case NonFatal(exception) => + logger.error(s"Could not create parquet reader for path: $path", exception); + throw exception } } From dd60f7912b9b46462f16ca36a7000676feea552a Mon Sep 17 00:00:00 2001 From: Muhammet Orazov Date: Thu, 16 May 2019 10:09:06 +0200 Subject: [PATCH 09/10] [skip ci] Add parquet compression codec parameter to readme. --- README.md | 1 + 1 file changed, 1 insertion(+) diff --git a/README.md b/README.md index e53acc05..a7d98882 100644 --- a/README.md +++ b/README.md @@ -156,6 +156,7 @@ cloud-storage-etl-udfs. |``DATA_FORMAT`` |``PARQUET`` |The data storage format in the provided path. | |``PARALLELISM IN IMPORT`` |``nproc()`` |The number of parallel instances to be started for importing data. *Please multiply this to increase the parallelism*. | |``PARALLELISM IN EXPORT`` |``iproc()`` |The parallel instances for exporting data. *Add another random number to increase the parallelism per node*. For example, ``iproc(), floor(random()*4)``. | +|``PARQUET_COMPRESSION_CODEC`` |``uncompressed``|The compression codec to use when exporting the data into parquet files. Other options are: `snappy`, `gzip` and `lzo`. | |``storage specific parameters`` |** |These are parameters for specific cloud storage for authentication purpose. | Please see [the parameters specific for each cloud storage and how to configure From b849ad43abd3fdf3df31787437189c0e23656b4a Mon Sep 17 00:00:00 2001 From: Muhammet Orazov Date: Thu, 16 May 2019 10:09:52 +0200 Subject: [PATCH 10/10] Make bucket fileSystem field final. --- src/main/scala/com/exasol/cloudetl/bucket/Bucket.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/main/scala/com/exasol/cloudetl/bucket/Bucket.scala b/src/main/scala/com/exasol/cloudetl/bucket/Bucket.scala index cbb7231f..1d735815 100644 --- a/src/main/scala/com/exasol/cloudetl/bucket/Bucket.scala +++ b/src/main/scala/com/exasol/cloudetl/bucket/Bucket.scala @@ -39,7 +39,7 @@ abstract class Bucket { * The Hadoop [[org.apache.hadoop.fs.FileSystem]] for this specific * bucket path. */ - lazy val fileSystem: FileSystem = + final lazy val fileSystem: FileSystem = FileSystem.get(new URI(bucketPath), getConfiguration()) /** @@ -87,7 +87,7 @@ object Bucket extends LazyLogging { /** * An apply method that creates different [[Bucket]] classes depending - * on the path schema. + * on the path scheme. * * @param params The key value parameters * @return A [[Bucket]] class for the given path