
Merge pull request #32 from exasol/feature/#28-export-batchsize
Support user provided batch size in export
morazow authored May 27, 2019
2 parents 81ec798 + 68b9eda commit 12e6880
Showing 16 changed files with 374 additions and 148 deletions.
31 changes: 7 additions & 24 deletions README.md
@@ -109,39 +109,21 @@ The following table shows currently supported features with the latest release.
</tr>
<tr>
<td>Amazon S3</td>
<td align="center">:heavy_check_mark:</td>
<td align="center">:heavy_check_mark:</td>
<td align="center">:heavy_check_mark:</td>
<td align="center">:heavy_multiplication_x:</td>
<td align="center">:heavy_check_mark:</td>
<td align="center">:heavy_multiplication_x:</td>
<td rowspan="4" align="center">&#10004;</td>
<td rowspan="4" align="center">&#10004;</td>
<td rowspan="4" align="center">&#10004;</td>
<td rowspan="4" align="center">&#10005;</td>
<td rowspan="4" align="center">&#10004;</td>
<td rowspan="4" align="center">&#10005;</td>
</tr>
<tr>
<td>Google Cloud Storage</td>
<td align="center">:heavy_check_mark:</td>
<td align="center">:heavy_check_mark:</td>
<td align="center">:heavy_check_mark:</td>
<td align="center">:heavy_multiplication_x:</td>
<td align="center">:heavy_check_mark:</td>
<td align="center">:heavy_multiplication_x:</td>
</tr>
<tr>
<td>Azure Blob Storage</td>
<td align="center">:heavy_check_mark:</td>
<td align="center">:heavy_check_mark:</td>
<td align="center">:heavy_check_mark:</td>
<td align="center">:heavy_multiplication_x:</td>
<td align="center">:heavy_check_mark:</td>
<td align="center">:heavy_multiplication_x:</td>
</tr>
<tr>
<td>Azure Data Lake (Gen1) Storage</td>
<td align="center">:heavy_check_mark:</td>
<td align="center">:heavy_check_mark:</td>
<td align="center">:heavy_check_mark:</td>
<td align="center">:heavy_multiplication_x:</td>
<td align="center">:heavy_check_mark:</td>
<td align="center">:heavy_multiplication_x:</td>
</tr>
</table>

@@ -157,6 +139,7 @@ cloud-storage-etl-udfs.
|``PARALLELISM IN IMPORT`` |``nproc()`` |The number of parallel instances to be started for importing data. *Please multiply this to increase the parallelism*. |
|``PARALLELISM IN EXPORT`` |``iproc()`` |The parallel instances for exporting data. *Add another random number to increase the parallelism per node*. For example, ``iproc(), floor(random()*4)``. |
|``PARQUET_COMPRESSION_CODEC`` |``uncompressed``|The compression codec to use when exporting the data into parquet files. Other options are: `snappy`, `gzip` and `lzo`. |
|``EXPORT_BATCH_SIZE`` |``100000`` |The number of records exported per file from each VM. For example, if a single VM gets `1M` records, it will export ten files with the default 100000 records each (see the sketch right after this table). |
|``storage specific parameters`` |*<none>* |These are parameters for specific cloud storage for authentication purpose. |
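
As a quick illustration of the new ``EXPORT_BATCH_SIZE`` parameter, the sketch below shows the property lookup and the resulting file count; it mirrors the ``BatchSizedSink`` code shown later in this diff, but the property map and record counts are made-up example values (run it in a Scala REPL or worksheet).

```scala
// Minimal sketch (hypothetical values) of the EXPORT_BATCH_SIZE lookup and the
// number of files a single VM would produce; mirrors BatchSizedSink below.
val properties: Map[String, String] = Map("EXPORT_BATCH_SIZE" -> "100000")

val DEFAULT_BATCH_SIZE: Int = 100000
val batchSize: Int = properties.get("EXPORT_BATCH_SIZE").fold(DEFAULT_BATCH_SIZE)(_.toInt)

val recordsOnThisVm: Long = 1000000L // e.g. a single VM receives 1M records
val files: Long = math.ceil(recordsOnThisVm / batchSize.toDouble).toLong

println(s"batchSize=$batchSize, files=$files") // batchSize=100000, files=10
```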

Please see [the parameters specific for each cloud storage and how to configure
2 changes: 1 addition & 1 deletion project/Compilation.scala
@@ -110,7 +110,7 @@ object Compilation {
ExtraWart.GenTraversableLikeOps,
ExtraWart.GenTraversableOnceOps,
ExtraWart.ScalaGlobalExecutionContext,
ExtraWart.StringOpsPartial,
// ExtraWart.StringOpsPartial,
ExtraWart.ThrowablePartial,
ExtraWart.TraversableOnceOps,
ExtraWart.UnsafeContains
2 changes: 1 addition & 1 deletion project/plugins.sbt
@@ -4,7 +4,7 @@ addSbtPlugin("io.get-coursier" % "sbt-coursier" % "1.0.3")

// Adds a `wartremover` a flexible Scala code linting tool
// http://github.com/puffnfresh/wartremover
addSbtPlugin("org.wartremover" % "sbt-wartremover" % "2.4.1")
addSbtPlugin("org.wartremover" % "sbt-wartremover" % "2.4.2")

// Adds Contrib Warts
// http://github.com/wartremover/wartremover-contrib/
@@ -8,9 +8,12 @@ final case class AzureAdlsBucket(path: String, params: Map[String, String]) exte
/** @inheritdoc */
override val bucketPath: String = path

/** @inheritdoc */
override val properties: Map[String, String] = params

/** @inheritdoc */
override def validate(): Unit =
Bucket.validate(params, Bucket.AZURE_ADLS_PARAMETERS)
Bucket.validate(properties, Bucket.AZURE_ADLS_PARAMETERS)

/**
* @inheritdoc
@@ -8,9 +8,12 @@ final case class AzureBlobBucket(path: String, params: Map[String, String]) exte
/** @inheritdoc */
override val bucketPath: String = path

/** @inheritdoc */
override val properties: Map[String, String] = params

/** @inheritdoc */
override def validate(): Unit =
Bucket.validate(params, Bucket.AZURE_BLOB_PARAMETERS)
Bucket.validate(properties, Bucket.AZURE_BLOB_PARAMETERS)

/**
* @inheritdoc
3 changes: 3 additions & 0 deletions src/main/scala/com/exasol/cloudetl/bucket/Bucket.scala
@@ -26,6 +26,9 @@ abstract class Bucket {
/** The path string of the bucket. */
val bucketPath: String

/** The user provided key value pair properties. */
val properties: Map[String, String]

/** Validates that all required parameter key values are available. */
def validate(): Unit

5 changes: 4 additions & 1 deletion src/main/scala/com/exasol/cloudetl/bucket/GCSBucket.scala
@@ -8,9 +8,12 @@ final case class GCSBucket(path: String, params: Map[String, String]) extends Bu
/** @inheritdoc */
override val bucketPath: String = path

/** @inheritdoc */
override val properties: Map[String, String] = params

/** @inheritdoc */
override def validate(): Unit =
Bucket.validate(params, Bucket.GCS_PARAMETERS)
Bucket.validate(properties, Bucket.GCS_PARAMETERS)

/**
* @inheritdoc
3 changes: 3 additions & 0 deletions src/main/scala/com/exasol/cloudetl/bucket/LocalBucket.scala
@@ -10,6 +10,9 @@ final case class LocalBucket(path: String, params: Map[String, String]) extends
/** @inheritdoc */
override val bucketPath: String = path

/** @inheritdoc */
override val properties: Map[String, String] = params

/** @inheritdoc */
override def validate(): Unit = ()

5 changes: 4 additions & 1 deletion src/main/scala/com/exasol/cloudetl/bucket/S3Bucket.scala
@@ -8,9 +8,12 @@ final case class S3Bucket(path: String, params: Map[String, String]) extends Buc
/** @inheritdoc */
override val bucketPath: String = path

/** @inheritdoc */
override val properties: Map[String, String] = params

/** @inheritdoc */
override def validate(): Unit =
Bucket.validate(params, Bucket.S3_PARAMETERS)
Bucket.validate(properties, Bucket.S3_PARAMETERS)

/**
* @inheritdoc
@@ -13,9 +13,7 @@ final case class ParquetWriteOptions(

object ParquetWriteOptions {

@SuppressWarnings(
Array("org.wartremover.warts.Overloading", "org.danielnixon.extrawarts.StringOpsPartial")
)
@SuppressWarnings(Array("org.wartremover.warts.Overloading"))
def apply(params: Map[String, String]): ParquetWriteOptions = {
val compressionCodec = params.getOrElse("PARQUET_COMPRESSION_CODEC", "").toUpperCase() match {
case "SNAPPY" => CompressionCodecName.SNAPPY
33 changes: 8 additions & 25 deletions src/main/scala/com/exasol/cloudetl/scriptclasses/ExportTable.scala
@@ -1,60 +1,43 @@
package com.exasol.cloudetl.scriptclasses

import java.util.UUID

import scala.collection.mutable.ListBuffer

import com.exasol.ExaIterator
import com.exasol.ExaMetadata
import com.exasol.cloudetl.bucket.Bucket
import com.exasol.cloudetl.data.ExaColumnInfo
import com.exasol.cloudetl.data.Row
import com.exasol.cloudetl.parquet.ParquetRowWriter
import com.exasol.cloudetl.parquet.ParquetWriteOptions
import com.exasol.cloudetl.sink.BatchSizedSink
import com.exasol.cloudetl.util.SchemaUtil

import com.typesafe.scalalogging.LazyLogging
import org.apache.hadoop.fs.Path

@SuppressWarnings(Array("org.wartremover.warts.Var"))
object ExportTable extends LazyLogging {

def run(meta: ExaMetadata, iter: ExaIterator): Unit = {
val bucketPath = iter.getString(0)
val params = Bucket.keyValueStringToMap(iter.getString(1))
val bucket = Bucket(params)

val srcColumnNames = iter.getString(2).split("\\.")
val firstColumnIdx = 3
val columns = getColumns(meta, srcColumnNames, firstColumnIdx)

val parquetFilename = generateParquetFilename(meta)
val path = new Path(bucketPath, parquetFilename)
val messageType = SchemaUtil.createParquetMessageType(columns, "exasol_export_schema")
val options = ParquetWriteOptions(params)
val writer = ParquetRowWriter(path, bucket.getConfiguration(), messageType, options)

val nodeId = meta.getNodeId
val vmId = meta.getVmId
val columns = getColumns(meta, srcColumnNames, firstColumnIdx)

val sink = new BatchSizedSink(nodeId, vmId, iter.size(), columns, bucket)

logger.info(s"Starting export from node: $nodeId, vm: $vmId.")

var count = 0;
do {
val row = getRow(iter, firstColumnIdx, columns)
writer.write(row)
count = count + 1
sink.write(row)
} while (iter.next())

writer.close()

logger.info(s"Exported '$count' records from node: $nodeId, vm: $vmId.")
}
sink.close()

private[this] def generateParquetFilename(meta: ExaMetadata): String = {
val nodeId = meta.getNodeId
val vmId = meta.getVmId
val uuidStr = UUID.randomUUID.toString.replaceAll("-", "")
s"exa_export_${nodeId}_${vmId}_$uuidStr.parquet"
logger.info(s"Exported '${sink.getTotalRecords()}' records from node: $nodeId, vm: $vmId.")
}

private[this] def getRow(iter: ExaIterator, startIdx: Int, columns: Seq[ExaColumnInfo]): Row = {
114 changes: 114 additions & 0 deletions src/main/scala/com/exasol/cloudetl/sink/BatchSizedSink.scala
@@ -0,0 +1,114 @@
package com.exasol.cloudetl.sink

import java.util.UUID

import com.exasol.cloudetl.bucket.Bucket
import com.exasol.cloudetl.data.ExaColumnInfo
import com.exasol.cloudetl.data.Row
import com.exasol.cloudetl.parquet.ParquetRowWriter
import com.exasol.cloudetl.parquet.ParquetWriteOptions
import com.exasol.cloudetl.util.SchemaUtil

import com.typesafe.scalalogging.LazyLogging
import org.apache.hadoop.fs.Path

/**
 * A [[Sink]] implementation that writes a requested number of records
 * per file.
 *
 * Given the number of records per file and the total number of
 * records, the exported file sizes can be balanced so that no tiny
 * leftover file is created for the last few records.
*/
@SuppressWarnings(Array("org.wartremover.warts.Null", "org.wartremover.warts.Var"))
final class BatchSizedSink(
nodeId: Long,
vmId: String,
numOfRecords: Long,
columns: Seq[ExaColumnInfo],
override val bucket: Bucket
) extends Sink[Row]
with LazyLogging {

// scalastyle:off null

final val DEFAULT_BATCH_SIZE: Int = 100000

private val requestedBatchSize: Int =
bucket.properties.get("EXPORT_BATCH_SIZE").fold(DEFAULT_BATCH_SIZE)(_.toInt)

private val numOfBuckets: Long = math.ceil(numOfRecords / requestedBatchSize.toDouble).toLong
private val batchSize: Long = math.floor(numOfRecords / numOfBuckets.toDouble).toLong
private var leftOvers: Long = numOfRecords % numOfBuckets

private var writer: Writer[Row] = null
private var recordsCount: Long = 0
private var totalRecords: Long = 0

/** Returns the total number of records written so far. */
def getTotalRecords(): Long = totalRecords

/** @inheritdoc */
override def createWriter(path: String): Writer[Row] = new Writer[Row] {
val newPath = new Path(bucket.bucketPath, path)
val messageType = SchemaUtil.createParquetMessageType(columns, "exasol_export_schema")
val options = ParquetWriteOptions(bucket.properties)
val writer = ParquetRowWriter(newPath, bucket.getConfiguration(), messageType, options)

override def write(value: Row): Unit =
writer.write(value)

override def close(): Unit =
writer.close()
}

/**
* @inheritdoc
*
 * Checks whether the number of records written so far has reached the
 * current batch size; if so, closes the current writer and creates a
 * new one with a different file path.
*/
override def write(value: Row): Unit = {
if (shouldRoll()) {
openNewFile()
}
recordsCount += 1
writer.write(value)
}

/** @inheritdoc */
override def close(): Unit = {
totalRecords += recordsCount
recordsCount = 0
if (writer != null) {
writer.close()
}
}

@SuppressWarnings(Array("org.wartremover.warts.Return"))
private def shouldRoll(): Boolean = {
if (writer == null) {
return true // scalastyle:ignore return
}
if (leftOvers > 0 && recordsCount >= batchSize + 1) {
leftOvers -= 1
true
} else if (leftOvers == 0 && recordsCount >= batchSize) {
true
} else {
false
}
}

private def openNewFile(): Unit = {
close()
writer = createWriter(getNewPath())
}

private def getNewPath(): String = {
val uuidStr = UUID.randomUUID.toString.replaceAll("-", "")
s"exa_export_${nodeId}_${vmId}_$uuidStr.parquet"
}

// scalastyle:on
}
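
The division above (`numOfBuckets`, `batchSize`, `leftOvers`) is what keeps the last file from being tiny when the record count does not divide evenly. A small sketch with invented numbers (1,000,003 records, batch size 100,000), not taken from a real run:

```scala
// Worked example of the balancing arithmetic in BatchSizedSink; inputs invented.
val numOfRecords: Long = 1000003L
val requestedBatchSize: Int = 100000

val numOfBuckets: Long = math.ceil(numOfRecords / requestedBatchSize.toDouble).toLong // 11
val batchSize: Long = math.floor(numOfRecords / numOfBuckets.toDouble).toLong         // 90909
val leftOvers: Long = numOfRecords % numOfBuckets                                      // 4

// shouldRoll() lets the first `leftOvers` files hold one extra record, so the
// export writes 4 files of 90910 records and 7 files of 90909 records instead
// of 10 full files plus a 3-record leftover file.
assert(leftOvers * (batchSize + 1) + (numOfBuckets - leftOvers) * batchSize == numOfRecords)
```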
52 changes: 52 additions & 0 deletions src/main/scala/com/exasol/cloudetl/sink/Sink.scala
@@ -0,0 +1,52 @@
package com.exasol.cloudetl.sink

import com.exasol.cloudetl.bucket.Bucket

/**
* An abstract sink representation.
*/
abstract class Sink[T] {

/**
* The specific [[com.exasol.cloudetl.bucket.Bucket]] where the files
* will be exported.
*/
val bucket: Bucket

/**
 * Creates a format-specific (Parquet, Avro, etc.) writer.
 *
 * @param path The file path this writer is going to write to
*/
def createWriter(path: String): Writer[T]

/**
* Writes the provided value.
*
* @param value The specific value to write
*/
def write(value: T): Unit

/**
 * Finally, closes the resources used by this sink.
*/
def close(): Unit

}

/**
* An interface for data writers.
*/
trait Writer[T] {

/**
* Writes the provided value to the path.
*
* @param value The value to write
*/
def write(value: T): Unit

/** Closes the writer. */
def close(): Unit

}
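
To show how the `Sink`/`Writer` pair above fits together (create a writer, roll to a new one when the batch is full, close at the end), here is a self-contained toy with locally re-declared types; `ToyWriter`, `ConsoleSink`, and the `part-N` paths are invented for illustration and are not part of this commit. `ExportTable.run` in this diff consumes `BatchSizedSink` through the same `write`/`close` calls. Run the sketch in a Scala REPL or worksheet.

```scala
// Toy mirroring the Sink/Writer shape: roll to a new "file" every batchSize records.
trait ToyWriter[T] { def write(value: T): Unit; def close(): Unit }

final class ConsoleSink[T](batchSize: Int) {
  private var writer: ToyWriter[T] = null
  private var recordsCount = 0
  private var fileIndex = 0

  private def createWriter(path: String): ToyWriter[T] = new ToyWriter[T] {
    override def write(value: T): Unit = println(s"$path <- $value")
    override def close(): Unit = println(s"closed $path")
  }

  def write(value: T): Unit = {
    if (writer == null || recordsCount >= batchSize) { // time to roll to the next file
      if (writer != null) writer.close()
      fileIndex += 1
      recordsCount = 0
      writer = createWriter(s"part-$fileIndex")
    }
    recordsCount += 1
    writer.write(value)
  }

  def close(): Unit = if (writer != null) writer.close()
}

val sink = new ConsoleSink[String](batchSize = 3)
(1 to 7).foreach(i => sink.write(s"record-$i"))
sink.close() // part-1 and part-2 hold 3 records each, part-3 holds 1
```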