
Commit

Merge pull request #29 from exasol/refactoring/after-debugging-session
Some refactorings after debugging session
morazow authored May 17, 2019
2 parents ffdb138 + b849ad4 commit 81ec798
Showing 19 changed files with 121 additions and 78 deletions.
16 changes: 9 additions & 7 deletions README.md
@@ -83,7 +83,7 @@ INTO SCRIPT ETL.EXPORT_PATH WITH
S3_ACCESS_KEY = 'MY_AWS_ACCESS_KEY'
S3_SECRET_KEY = 'MY_AWS_SECRET_KEY'
S3_ENDPOINT = 's3.MY_REGION.amazonaws.com'
PARALLELISM = 'nproc()';
PARALLELISM = 'iproc(), floor(random()*4)';
```

Please change the paths and parameters accordingly.
@@ -150,12 +150,14 @@ The following table shows currently supported features with the latest release.
The following configuration parameters should be provided when using the
cloud-storage-etl-udfs.

| Parameter | Default | Description
|:-------------------------------|:--------------|:--------------------------------------------------------------------------------------------------------|
|``BUCKET_PATH`` |*<none>* |A path to the data bucket. It should start with cloud storage system specific schema, for example `s3a`. |
|``DATA_FORMAT`` |``PARQUET`` |The data storage format in the provided path. |
|``PARALLELISM`` |``nproc()`` |The number of parallel instances to be started for loading data. |
|``storage specific parameters`` |*<none>* |These are parameters for specific cloud storage for authentication purpose. |
| Parameter | Default | Description
|:-------------------------------|:---------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------|
|``BUCKET_PATH``                 |*<none>*        |A path to the data bucket. It should start with the cloud-storage-specific URI scheme, for example `s3a`.                                                  |
|``DATA_FORMAT``                 |``PARQUET``     |The data storage format in the provided path.                                                                                                              |
|``PARALLELISM IN IMPORT``       |``nproc()``     |The number of parallel instances to be started for importing data. *Multiply this value to increase the parallelism*.                                      |
|``PARALLELISM IN EXPORT``       |``iproc()``     |The number of parallel instances per node for exporting data. *Append a random number to increase the parallelism per node*, for example, ``iproc(), floor(random()*4)``. |
|``PARQUET_COMPRESSION_CODEC``   |``uncompressed``|The compression codec to use when exporting data into Parquet files. Other options are `snappy`, `gzip` and `lzo`.                                         |
|``storage specific parameters`` |*<none>*        |Parameters specific to each cloud storage system, used for authentication.                                                                                 |

Please see [the parameters specific for each cloud storage and how to configure
them here](./docs/overview.md).
8 changes: 4 additions & 4 deletions docs/overview.md
@@ -52,7 +52,7 @@ INTO SCRIPT ETL.EXPORT_PATH WITH
S3_ACCESS_KEY = 'MY_AWS_ACCESS_KEY'
S3_SECRET_KEY = 'MY_AWS_SECRET_KEY'
S3_ENDPOINT = 's3.MY_REGION.amazonaws.com'
PARALLELISM = 'nproc()';
PARALLELISM = 'iproc(), floor(random()*4)';
```

## Google Cloud Storage
@@ -97,7 +97,7 @@ INTO SCRIPT ETL.EXPORT_PATH WITH
BUCKET_PATH = 'gs://google-storage-path/parquet/retail/sales_positions/'
GCS_PROJECT_ID = 'MY_GCS_PROJECT_ID'
GCS_KEYFILE_PATH = 'MY_BUCKETFS_PATH/project-id-service-keyfile.json'
PARALLELISM = 'nproc()';
PARALLELISM = 'iproc()';
```

## Azure Blob Storage
@@ -129,7 +129,7 @@ INTO SCRIPT ETL.EXPORT_PATH WITH
BUCKET_PATH = 'wasbs://CONTAINER@AZURE_ACCOUNT_NAME.blob.core.windows.net/parquet/sales-positions/'
AZURE_ACCOUNT_NAME = 'AZURE_ACCOUNT_NAME'
AZURE_SECRET_KEY = 'AZURE_SECRET_KEY'
PARALLELISM = 'nproc()';
PARALLELISM = 'iproc()';
```

## Azure Data Lake (Gen1) Storage
@@ -173,5 +173,5 @@ INTO SCRIPT ETL.EXPORT_PATH WITH
AZURE_CLIENT_ID = 'AZURE_CLIENT_ID'
AZURE_CLIENT_SECRET = 'AZURE_CLIENT_SECRET'
AZURE_DIRECTORY_ID = 'AZURE_DIRECTORY_ID'
PARALLELISM = 'nproc()';
PARALLELISM = 'iproc()';
```
6 changes: 6 additions & 0 deletions src/main/resources/log4j.properties
@@ -0,0 +1,6 @@
# Set everything to be logged to the console
log4j.rootCategory=INFO, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
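
For context, here is a minimal, hypothetical sketch of how a class in this code base could emit output that the new `log4j.properties` routes to the console, assuming the `scala-logging` `LazyLogging` trait (already used by `Bucket`) and an SLF4J-to-log4j binding on the classpath; the object and messages are illustrative only:

```scala
import com.typesafe.scalalogging.LazyLogging

// Hypothetical example object; any LazyLogging user in the project behaves the same way.
object LoggingExample extends LazyLogging {
  def run(): Unit = {
    // With the properties above, INFO and higher go to System.err in the
    // pattern "yy/MM/dd HH:mm:ss LEVEL LoggerName: message".
    logger.info("Starting bucket import")
    logger.debug("Below the INFO threshold, so this line is not printed")
  }
}
```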
src/main/scala/com/exasol/cloudetl/bucket/AzureAdlsBucket.scala
@@ -18,7 +18,7 @@ final case class AzureAdlsBucket(path: String, params: Map[String, String]) exte
* Additionally validates that all required parameters are available
* in order to create a configuration.
*/
override def createConfiguration(): Configuration = {
override def getConfiguration(): Configuration = {
validate()

val conf = new Configuration()
src/main/scala/com/exasol/cloudetl/bucket/AzureBlobBucket.scala
@@ -18,7 +18,7 @@ final case class AzureBlobBucket(path: String, params: Map[String, String]) exte
* Additionally validates that all required parameters are available
* in order to create a configuration.
*/
override def createConfiguration(): Configuration = {
override def getConfiguration(): Configuration = {
validate()

val conf = new Configuration()
8 changes: 4 additions & 4 deletions src/main/scala/com/exasol/cloudetl/bucket/Bucket.scala
@@ -33,14 +33,14 @@ abstract class Bucket {
* Creates a Hadoop [[org.apache.hadoop.conf.Configuration]] for this
* specific bucket type.
*/
def createConfiguration(): Configuration
def getConfiguration(): Configuration

/**
* The Hadoop [[org.apache.hadoop.fs.FileSystem]] for this specific
* bucket path.
*/
lazy val fileSystem: FileSystem =
FileSystem.get(new URI(bucketPath), createConfiguration())
final lazy val fileSystem: FileSystem =
FileSystem.get(new URI(bucketPath), getConfiguration())

/**
* Get all the paths in this bucket path.
@@ -87,7 +87,7 @@ object Bucket extends LazyLogging {

/**
* An apply method that creates different [[Bucket]] classes depending
* on the path schema.
* on the path scheme.
*
* @param params The key value parameters
* @return A [[Bucket]] class for the given path
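
To illustrate the renamed contract (`createConfiguration()` becoming `getConfiguration()`) and the now-final lazy `fileSystem`, here is a minimal, hypothetical bucket sketched against the members visible in this diff (`validate()`, `bucketPath`); the parameter key and Hadoop property are made up and not part of the project:

```scala
import java.net.URI

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem

// Hypothetical, standalone bucket type; it only shows how a concrete implementation
// plugs into the renamed getConfiguration() and the final lazy fileSystem.
final case class ExampleBucket(path: String, params: Map[String, String]) {

  val bucketPath: String = path

  // Fail early if a required parameter is missing (stand-in for Bucket.validate()).
  def validate(): Unit =
    require(params.contains("EXAMPLE_ENDPOINT"), "Required parameter EXAMPLE_ENDPOINT is missing")

  // Mirrors the renamed getConfiguration(): validate first, then build the Hadoop config.
  def getConfiguration(): Configuration = {
    validate()
    val conf = new Configuration()
    conf.set("fs.example.endpoint", params("EXAMPLE_ENDPOINT"))
    conf
  }

  // Mirrors the now-final lazy fileSystem: resolved once from the bucket path and config.
  lazy val fileSystem: FileSystem = FileSystem.get(new URI(bucketPath), getConfiguration())
}
```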
2 changes: 1 addition & 1 deletion src/main/scala/com/exasol/cloudetl/bucket/GCSBucket.scala
@@ -18,7 +18,7 @@ final case class GCSBucket(path: String, params: Map[String, String]) extends Bu
* Additionally validates that all required parameters are available
* in order to create a configuration.
*/
override def createConfiguration(): Configuration = {
override def getConfiguration(): Configuration = {
validate()

val conf = new Configuration()
src/main/scala/com/exasol/cloudetl/bucket/LocalBucket.scala
@@ -14,7 +14,7 @@ final case class LocalBucket(path: String, params: Map[String, String]) extends
override def validate(): Unit = ()

/** @inheritdoc */
override def createConfiguration(): Configuration = {
override def getConfiguration(): Configuration = {
validate()
new Configuration()
}
2 changes: 1 addition & 1 deletion src/main/scala/com/exasol/cloudetl/bucket/S3Bucket.scala
@@ -18,7 +18,7 @@ final case class S3Bucket(path: String, params: Map[String, String]) extends Buc
* Additionally validates that all required parameters are available
* in order to create a configuration.
*/
override def createConfiguration(): Configuration = {
override def getConfiguration(): Configuration = {
validate()

val conf = new Configuration()
26 changes: 25 additions & 1 deletion src/main/scala/com/exasol/cloudetl/data/Row.scala
@@ -1,3 +1,27 @@
package com.exasol.cloudetl.data

final case class Row(val values: Seq[Any])
/**
* The internal class that holds column data in an array.
*/
@SuppressWarnings(Array("org.wartremover.warts.AsInstanceOf"))
final case class Row(protected[data] val values: Seq[Any]) {

/** Checks whether the value at position {@code index} is null. */
def isNullAt(index: Int): Boolean = get(index) == null

/**
* Returns the value at position {@code index}.
*
* If the value is null, null is returned.
*/
def get(index: Int): Any = values(index)

/** Returns the value at position {@code index} cast to type {@code T}. */
@throws[ClassCastException]("When data type does not match")
def getAs[T](index: Int): T = get(index).asInstanceOf[T]

/** Returns the value array. */
def getValues(): Seq[Any] =
values

}
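
A short usage sketch of the new `Row` accessors (the values are made up), showing how `isNullAt` and `getAs` replace the raw `asInstanceOf` casts that callers such as the Parquet writer used previously; it assumes the project classes are on the classpath:

```scala
import com.exasol.cloudetl.data.Row

object RowAccessorExample extends App {
  // A row with three made-up column values: an id, a name and a missing price.
  val row = Row(Seq(1L, "screwdriver", null))

  println(row.isNullAt(2))      // true  -- the third column is null
  println(row.getAs[Long](0))   // 1     -- typed access instead of a raw asInstanceOf cast
  println(row.getAs[String](1)) // screwdriver
  println(row.getValues())      // List(1, screwdriver, null)
}
```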
src/main/scala/com/exasol/cloudetl/parquet/ParquetWriteOptions.scala
@@ -27,7 +27,7 @@ object ParquetWriteOptions {
params.get("PARQUET_BLOCK_SIZE").fold(ParquetWriter.DEFAULT_BLOCK_SIZE)(_.toInt)
val pageSize = params.get("PARQUET_PAGE_SIZE").fold(ParquetWriter.DEFAULT_PAGE_SIZE)(_.toInt)
val dictionary = params.get("PARQUET_DICTIONARY_ENCODING").fold(true)(_.toBoolean)
val validation = params.get("PARQUET_VALIDAIONT").fold(true)(_.toBoolean)
val validation = params.get("PARQUET_VALIDATION").fold(true)(_.toBoolean)

ParquetWriteOptions(blockSize, pageSize, compressionCodec, dictionary, validation)
}
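
The corrected key matters because each option is read with `Option.fold`, silently falling back to a default when the key is absent; a small standalone sketch of that pattern with made-up parameter values:

```scala
object ParquetOptionExample extends App {
  // Made-up user parameters.
  val params: Map[String, String] = Map(
    "PARQUET_COMPRESSION_CODEC" -> "snappy",
    "PARQUET_VALIDATION"        -> "false"
  )

  // Key present: the provided string wins over the default.
  val validation = params.get("PARQUET_VALIDATION").fold(true)(_.toBoolean)          // false

  // Key absent (or misspelled, as PARQUET_VALIDAIONT was): the default is used silently,
  // which is exactly why the typo fix above was needed.
  val dictionary = params.get("PARQUET_DICTIONARY_ENCODING").fold(true)(_.toBoolean) // true

  println(s"validation=$validation, dictionary=$dictionary")
}
```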
56 changes: 28 additions & 28 deletions src/main/scala/com/exasol/cloudetl/parquet/RowWriteSupport.scala
@@ -20,39 +20,37 @@ import org.apache.parquet.schema.PrimitiveType
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName

/**
* A Parquet [[org.apache.parquet.hadoop.api.WriteSupport]] implementation that writes
* [[com.exasol.cloudetl.data.Row]] as a Parquet data.
* A Parquet [[org.apache.parquet.hadoop.api.WriteSupport]]
* implementation that writes [[com.exasol.cloudetl.data.Row]] as a
* Parquet data.
*
* This is mostly adapted from Spark codebase:
* - org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport
*
*/
@SuppressWarnings(
Array(
"org.wartremover.warts.AsInstanceOf",
"org.wartremover.warts.Null",
"org.wartremover.warts.Var"
)
)
@SuppressWarnings(Array("org.wartremover.warts.Null", "org.wartremover.warts.Var"))
class RowWriteSupport(schema: MessageType) extends WriteSupport[Row] {

// The number of bytes required for the timestamp buffer in Parquet
private final val TIMESTAMP_MAX_BYTE_SIZE: Int = 12

// This is a type that is responsible for writing a value in Row values index to the
// RecordConsumer
// This is a type that is responsible for writing a value in Row
// values index to the RecordConsumer
private type RowValueWriter = (Row, Int) => Unit

// A list of `RowValueWriter`-s for each field type of Parquet `schema`
// A list of `RowValueWriter`-s for each field type of Parquet
// `schema`
private var rootFieldWriters: Array[RowValueWriter] = _

// A Parquet RecordConsumer that all values of a Row will be written
private var recordConsumer: RecordConsumer = _

// Reusable byte array used to write timestamps as Parquet INT96 values
// Reusable byte array used to write timestamps as Parquet INT96
// values
private val timestampBuffer = new Array[Byte](TIMESTAMP_MAX_BYTE_SIZE)

// Reusable byte array used to write decimal values as Parquet FIXED_LEN_BYTE_ARRAY values
// Reusable byte array used to write decimal values as Parquet
// FIXED_LEN_BYTE_ARRAY values
private val decimalBuffer =
new Array[Byte](SchemaUtil.PRECISION_TO_BYTE_SIZE(SchemaUtil.DECIMAL_MAX_PRECISION - 1))

@@ -83,7 +81,7 @@ class RowWriteSupport(schema: MessageType) extends WriteSupport[Row] {
while (idx < schema.getFieldCount) {
val fieldType = schema.getType(idx)
val fieldName = fieldType.getName()
if (row.values(idx) != null) {
if (!row.isNullAt(idx)) {
consumeField(fieldName, idx) {
writers(idx).apply(row, idx)
}
@@ -111,33 +109,33 @@ class RowWriteSupport(schema: MessageType) extends WriteSupport[Row] {
typeName match {
case PrimitiveTypeName.BOOLEAN =>
(row: Row, index: Int) =>
recordConsumer.addBoolean(row.values(index).asInstanceOf[Boolean])
recordConsumer.addBoolean(row.getAs[Boolean](index))

case PrimitiveTypeName.INT32 =>
originalType match {
case OriginalType.DATE =>
makeDateWriter()
case _ =>
(row: Row, index: Int) =>
recordConsumer.addInteger(row.values(index).asInstanceOf[Integer])
recordConsumer.addInteger(row.getAs[Integer](index))
}

case PrimitiveTypeName.INT64 =>
(row: Row, index: Int) =>
recordConsumer.addLong(row.values(index).asInstanceOf[Long])
recordConsumer.addLong(row.getAs[Long](index))

case PrimitiveTypeName.FLOAT =>
(row: Row, index: Int) =>
recordConsumer.addFloat(row.values(index).asInstanceOf[Double].floatValue)
recordConsumer.addFloat(row.getAs[Double](index).floatValue)

case PrimitiveTypeName.DOUBLE =>
(row: Row, index: Int) =>
recordConsumer.addDouble(row.values(index).asInstanceOf[Double])
recordConsumer.addDouble(row.getAs[Double](index))

case PrimitiveTypeName.BINARY =>
(row: Row, index: Int) =>
recordConsumer.addBinary(
Binary.fromReusedByteArray(row.values(index).asInstanceOf[String].getBytes)
Binary.fromReusedByteArray(row.getAs[String](index).getBytes)
)

case PrimitiveTypeName.INT96 =>
@@ -153,14 +151,14 @@ class RowWriteSupport(schema: MessageType) extends WriteSupport[Row] {

private def makeDateWriter(): RowValueWriter = (row: Row, index: Int) => {
// Write the number of days since unix epoch as integer
val date = row.values(index).asInstanceOf[java.sql.Date]
val date = row.getAs[java.sql.Date](index)
val days = DateTimeUtil.daysSinceEpoch(date)

recordConsumer.addInteger(days.toInt)
}

private def makeTimestampWriter(): RowValueWriter = (row: Row, index: Int) => {
val timestamp = row.values(index).asInstanceOf[java.sql.Timestamp]
val timestamp = row.getAs[java.sql.Timestamp](index)
val micros = DateTimeUtil.getMicrosFromTimestamp(timestamp)
val (days, nanos) = DateTimeUtil.getJulianDayAndNanos(micros)

@@ -186,17 +184,19 @@ class RowWriteSupport(schema: MessageType) extends WriteSupport[Row] {
val numBytes = SchemaUtil.PRECISION_TO_BYTE_SIZE(precision - 1)

val bytesWriter = (row: Row, index: Int) => {
val decimal = row.values(index).asInstanceOf[java.math.BigDecimal]
val decimal = row.getAs[java.math.BigDecimal](index)
val unscaled = decimal.unscaledValue()
val bytes = unscaled.toByteArray
val fixedLenBytesArray =
if (bytes.length == numBytes) {
// If the length of the underlying byte array of the unscaled `BigDecimal` happens to be
// `numBytes`, just reuse it, so that we don't bother copying it to `decimalBuffer`.
// If the length of the underlying byte array of the unscaled
// `BigDecimal` happens to be `numBytes`, just reuse it, so
// that we don't bother copying it to `decimalBuffer`.
bytes
} else if (bytes.length < numBytes) {
// Otherwise, the length must be less than `numBytes`. In this case we copy contents of
// the underlying bytes with padding sign bytes to `decimalBuffer` to form the result
// Otherwise, the length must be less than `numBytes`. In
// this case we copy contents of the underlying bytes with
// padding sign bytes to `decimalBuffer` to form the result
// fixed-length byte array.

// For negatives all high bits need to be 1 hence -1 used
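
The comments above describe padding the unscaled `BigDecimal` bytes into a fixed-length array; here is a standalone sketch of that sign-extension step, where `numBytes` is chosen arbitrarily (the real class sizes its buffer from `SchemaUtil.PRECISION_TO_BYTE_SIZE`):

```scala
object DecimalPaddingExample extends App {
  // Pads (sign-extends) a two's-complement, big-endian byte array to a fixed width,
  // as Parquet FIXED_LEN_BYTE_ARRAY decimal values require.
  def toFixedLenBytes(decimal: java.math.BigDecimal, numBytes: Int): Array[Byte] = {
    val bytes = decimal.unscaledValue().toByteArray
    if (bytes.length == numBytes) {
      bytes // already the right width, reuse as-is
    } else {
      require(bytes.length < numBytes, s"Decimal does not fit into $numBytes bytes")
      val buffer = new Array[Byte](numBytes)
      // For negatives every leading pad byte must be 0xFF (-1), for positives 0x00.
      val signByte: Byte = if (bytes.head < 0) -1 else 0
      java.util.Arrays.fill(buffer, 0, numBytes - bytes.length, signByte)
      System.arraycopy(bytes, 0, buffer, numBytes - bytes.length, bytes.length)
      buffer
    }
  }

  // 16 bytes is an arbitrary width for this illustration.
  val padded = toFixedLenBytes(new java.math.BigDecimal("-123.45"), 16)
  println(padded.map(b => f"${b & 0xff}%02x").mkString(" "))
}
```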
src/main/scala/com/exasol/cloudetl/scriptclasses/ExportPath.scala
@@ -15,7 +15,7 @@ object ExportPath {
bucket.validate()

val bucketPath = bucket.bucketPath
val parallelism = Bucket.optionalParameter(params, "PARALLELISM", "nproc()")
val parallelism = Bucket.optionalParameter(params, "PARALLELISM", "iproc()")
val rest = Bucket.keyValueMapToString(params)

val scriptSchema = exaMeta.getScriptSchema
@@ -34,15 +34,18 @@
|""".stripMargin
}

/** Returns source column names with quotes removed. */
private[this] def getSourceColumns(spec: ExaExportSpecification): Seq[String] =
spec.getSourceColumnNames.asScala
.map {
case value =>
// Remove quotes if present
getColumnName(value).replaceAll("\"", "")
}

/** Given a table name dot column name syntax (myTable.colInt), return the column name. */
/**
* Given a table name dot column name syntax (myTable.colInt), return
* the column name.
*/
private[this] def getColumnName(str: String): String = str.split("\\.") match {
case Array(colName) => colName
case Array(tblName @ _, colName) => colName
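
A quick usage sketch of the column-name handling in `ExportPath` (inputs are made up), combining the `table.column` split shown above with the quote removal done in `getSourceColumns`; the catch-all case is added here only to keep the copy self-contained:

```scala
object ColumnNameExample extends App {
  // Standalone copy of the splitting logic above, for illustration only.
  def getColumnName(str: String): String = str.split("\\.") match {
    case Array(colName)    => colName
    case Array(_, colName) => colName
    case _                 => throw new IllegalArgumentException(s"Unexpected column reference: $str")
  }

  println(getColumnName("myTable.colInt"))                  // colInt
  println(getColumnName("\"colStr\"").replaceAll("\"", "")) // colStr
}
```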
