diff --git a/README.md b/README.md
index a7d98882..9ad9e09f 100644
--- a/README.md
+++ b/README.md
@@ -109,39 +109,21 @@ The following table shows currently supported features with the latest release.
Amazon S3 |
- :heavy_check_mark: |
- :heavy_check_mark: |
- :heavy_check_mark: |
- :heavy_multiplication_x: |
- :heavy_check_mark: |
- :heavy_multiplication_x: |
+ ✔ |
+ ✔ |
+ ✔ |
+ ✕ |
+ ✔ |
+ ✕ |
Google Cloud Storage |
- :heavy_check_mark: |
- :heavy_check_mark: |
- :heavy_check_mark: |
- :heavy_multiplication_x: |
- :heavy_check_mark: |
- :heavy_multiplication_x: |
Azure Blob Storage |
- :heavy_check_mark: |
- :heavy_check_mark: |
- :heavy_check_mark: |
- :heavy_multiplication_x: |
- :heavy_check_mark: |
- :heavy_multiplication_x: |
Azure Data Lake (Gen1) Storage |
- :heavy_check_mark: |
- :heavy_check_mark: |
- :heavy_check_mark: |
- :heavy_multiplication_x: |
- :heavy_check_mark: |
- :heavy_multiplication_x: |
@@ -157,6 +139,7 @@ cloud-storage-etl-udfs.
|``PARALLELISM IN IMPORT`` |``nproc()`` |The number of parallel instances to be started for importing data. *Please multiply this to increase the parallelism*. |
|``PARALLELISM IN EXPORT`` |``iproc()`` |The parallel instances for exporting data. *Add another random number to increase the parallelism per node*. For example, ``iproc(), floor(random()*4)``. |
|``PARQUET_COMPRESSION_CODEC`` |``uncompressed``|The compression codec to use when exporting the data into parquet files. Other options are: `snappy`, `gzip` and `lzo`. |
+|``EXPORT_BATCH_SIZE`` |``100000`` |The number of records per file exported from each vm. For example, if a single vm gets `1M` records, it will export ten files with the default 100000 records each. |
|``storage specific parameters`` |** |These are parameters for specific cloud storage for authentication purpose. |
Please see [the parameters specific for each cloud storage and how to configure
diff --git a/project/Compilation.scala b/project/Compilation.scala
index 53d8909e..4de11ec9 100644
--- a/project/Compilation.scala
+++ b/project/Compilation.scala
@@ -110,7 +110,7 @@ object Compilation {
ExtraWart.GenTraversableLikeOps,
ExtraWart.GenTraversableOnceOps,
ExtraWart.ScalaGlobalExecutionContext,
- ExtraWart.StringOpsPartial,
+ // ExtraWart.StringOpsPartial,
ExtraWart.ThrowablePartial,
ExtraWart.TraversableOnceOps,
ExtraWart.UnsafeContains
diff --git a/project/plugins.sbt b/project/plugins.sbt
index 2330af03..28e6e856 100644
--- a/project/plugins.sbt
+++ b/project/plugins.sbt
@@ -4,7 +4,7 @@ addSbtPlugin("io.get-coursier" % "sbt-coursier" % "1.0.3")
// Adds a `wartremover` a flexible Scala code linting tool
// http://github.com/puffnfresh/wartremover
-addSbtPlugin("org.wartremover" % "sbt-wartremover" % "2.4.1")
+addSbtPlugin("org.wartremover" % "sbt-wartremover" % "2.4.2")
// Adds Contrib Warts
// http://github.com/wartremover/wartremover-contrib/
diff --git a/src/main/scala/com/exasol/cloudetl/bucket/AzureAdlsBucket.scala b/src/main/scala/com/exasol/cloudetl/bucket/AzureAdlsBucket.scala
index 39d14cc0..a07a5d70 100644
--- a/src/main/scala/com/exasol/cloudetl/bucket/AzureAdlsBucket.scala
+++ b/src/main/scala/com/exasol/cloudetl/bucket/AzureAdlsBucket.scala
@@ -8,9 +8,12 @@ final case class AzureAdlsBucket(path: String, params: Map[String, String]) exte
/** @inheritdoc */
override val bucketPath: String = path
+ /** @inheritdoc */
+ override val properties: Map[String, String] = params
+
/** @inheritdoc */
override def validate(): Unit =
- Bucket.validate(params, Bucket.AZURE_ADLS_PARAMETERS)
+ Bucket.validate(properties, Bucket.AZURE_ADLS_PARAMETERS)
/**
* @inheritdoc
diff --git a/src/main/scala/com/exasol/cloudetl/bucket/AzureBlobBucket.scala b/src/main/scala/com/exasol/cloudetl/bucket/AzureBlobBucket.scala
index 4cde6695..62a90987 100644
--- a/src/main/scala/com/exasol/cloudetl/bucket/AzureBlobBucket.scala
+++ b/src/main/scala/com/exasol/cloudetl/bucket/AzureBlobBucket.scala
@@ -8,9 +8,12 @@ final case class AzureBlobBucket(path: String, params: Map[String, String]) exte
/** @inheritdoc */
override val bucketPath: String = path
+ /** @inheritdoc */
+ override val properties: Map[String, String] = params
+
/** @inheritdoc */
override def validate(): Unit =
- Bucket.validate(params, Bucket.AZURE_BLOB_PARAMETERS)
+ Bucket.validate(properties, Bucket.AZURE_BLOB_PARAMETERS)
/**
* @inheritdoc
diff --git a/src/main/scala/com/exasol/cloudetl/bucket/Bucket.scala b/src/main/scala/com/exasol/cloudetl/bucket/Bucket.scala
index 1d735815..d1a0272e 100644
--- a/src/main/scala/com/exasol/cloudetl/bucket/Bucket.scala
+++ b/src/main/scala/com/exasol/cloudetl/bucket/Bucket.scala
@@ -26,6 +26,9 @@ abstract class Bucket {
/** The path string of the bucket. */
val bucketPath: String
+ /** The user provided key value pair properties. */
+ val properties: Map[String, String]
+
/** Validates that all required parameter key values are available. */
def validate(): Unit
diff --git a/src/main/scala/com/exasol/cloudetl/bucket/GCSBucket.scala b/src/main/scala/com/exasol/cloudetl/bucket/GCSBucket.scala
index d2d7b058..f341cef8 100644
--- a/src/main/scala/com/exasol/cloudetl/bucket/GCSBucket.scala
+++ b/src/main/scala/com/exasol/cloudetl/bucket/GCSBucket.scala
@@ -8,9 +8,12 @@ final case class GCSBucket(path: String, params: Map[String, String]) extends Bu
/** @inheritdoc */
override val bucketPath: String = path
+ /** @inheritdoc */
+ override val properties: Map[String, String] = params
+
/** @inheritdoc */
override def validate(): Unit =
- Bucket.validate(params, Bucket.GCS_PARAMETERS)
+ Bucket.validate(properties, Bucket.GCS_PARAMETERS)
/**
* @inheritdoc
diff --git a/src/main/scala/com/exasol/cloudetl/bucket/LocalBucket.scala b/src/main/scala/com/exasol/cloudetl/bucket/LocalBucket.scala
index 5607cc29..8ff00b69 100644
--- a/src/main/scala/com/exasol/cloudetl/bucket/LocalBucket.scala
+++ b/src/main/scala/com/exasol/cloudetl/bucket/LocalBucket.scala
@@ -10,6 +10,9 @@ final case class LocalBucket(path: String, params: Map[String, String]) extends
/** @inheritdoc */
override val bucketPath: String = path
+ /** @inheritdoc */
+ override val properties: Map[String, String] = params
+
/** @inheritdoc */
override def validate(): Unit = ()
diff --git a/src/main/scala/com/exasol/cloudetl/bucket/S3Bucket.scala b/src/main/scala/com/exasol/cloudetl/bucket/S3Bucket.scala
index 797249a9..8396fa6f 100644
--- a/src/main/scala/com/exasol/cloudetl/bucket/S3Bucket.scala
+++ b/src/main/scala/com/exasol/cloudetl/bucket/S3Bucket.scala
@@ -8,9 +8,12 @@ final case class S3Bucket(path: String, params: Map[String, String]) extends Buc
/** @inheritdoc */
override val bucketPath: String = path
+ /** @inheritdoc */
+ override val properties: Map[String, String] = params
+
/** @inheritdoc */
override def validate(): Unit =
- Bucket.validate(params, Bucket.S3_PARAMETERS)
+ Bucket.validate(properties, Bucket.S3_PARAMETERS)
/**
* @inheritdoc
diff --git a/src/main/scala/com/exasol/cloudetl/parquet/ParquetWriteOptions.scala b/src/main/scala/com/exasol/cloudetl/parquet/ParquetWriteOptions.scala
index 5bcf71bc..0d618605 100644
--- a/src/main/scala/com/exasol/cloudetl/parquet/ParquetWriteOptions.scala
+++ b/src/main/scala/com/exasol/cloudetl/parquet/ParquetWriteOptions.scala
@@ -13,9 +13,7 @@ final case class ParquetWriteOptions(
object ParquetWriteOptions {
- @SuppressWarnings(
- Array("org.wartremover.warts.Overloading", "org.danielnixon.extrawarts.StringOpsPartial")
- )
+ @SuppressWarnings(Array("org.wartremover.warts.Overloading"))
def apply(params: Map[String, String]): ParquetWriteOptions = {
val compressionCodec = params.getOrElse("PARQUET_COMPRESSION_CODEC", "").toUpperCase() match {
case "SNAPPY" => CompressionCodecName.SNAPPY
diff --git a/src/main/scala/com/exasol/cloudetl/scriptclasses/ExportTable.scala b/src/main/scala/com/exasol/cloudetl/scriptclasses/ExportTable.scala
index b1449ed7..800258c2 100644
--- a/src/main/scala/com/exasol/cloudetl/scriptclasses/ExportTable.scala
+++ b/src/main/scala/com/exasol/cloudetl/scriptclasses/ExportTable.scala
@@ -1,7 +1,5 @@
package com.exasol.cloudetl.scriptclasses
-import java.util.UUID
-
import scala.collection.mutable.ListBuffer
import com.exasol.ExaIterator
@@ -9,52 +7,37 @@ import com.exasol.ExaMetadata
import com.exasol.cloudetl.bucket.Bucket
import com.exasol.cloudetl.data.ExaColumnInfo
import com.exasol.cloudetl.data.Row
-import com.exasol.cloudetl.parquet.ParquetRowWriter
-import com.exasol.cloudetl.parquet.ParquetWriteOptions
+import com.exasol.cloudetl.sink.BatchSizedSink
import com.exasol.cloudetl.util.SchemaUtil
import com.typesafe.scalalogging.LazyLogging
-import org.apache.hadoop.fs.Path
@SuppressWarnings(Array("org.wartremover.warts.Var"))
object ExportTable extends LazyLogging {
def run(meta: ExaMetadata, iter: ExaIterator): Unit = {
- val bucketPath = iter.getString(0)
val params = Bucket.keyValueStringToMap(iter.getString(1))
val bucket = Bucket(params)
val srcColumnNames = iter.getString(2).split("\\.")
val firstColumnIdx = 3
- val columns = getColumns(meta, srcColumnNames, firstColumnIdx)
-
- val parquetFilename = generateParquetFilename(meta)
- val path = new Path(bucketPath, parquetFilename)
- val messageType = SchemaUtil.createParquetMessageType(columns, "exasol_export_schema")
- val options = ParquetWriteOptions(params)
- val writer = ParquetRowWriter(path, bucket.getConfiguration(), messageType, options)
val nodeId = meta.getNodeId
val vmId = meta.getVmId
+ val columns = getColumns(meta, srcColumnNames, firstColumnIdx)
+
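+    // The sink splits the exported rows into batch-sized Parquet files under the bucket path.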
+ val sink = new BatchSizedSink(nodeId, vmId, iter.size(), columns, bucket)
+
logger.info(s"Starting export from node: $nodeId, vm: $vmId.")
- var count = 0;
do {
val row = getRow(iter, firstColumnIdx, columns)
- writer.write(row)
- count = count + 1
+ sink.write(row)
} while (iter.next())
- writer.close()
-
- logger.info(s"Exported '$count' records from node: $nodeId, vm: $vmId.")
- }
+ sink.close()
- private[this] def generateParquetFilename(meta: ExaMetadata): String = {
- val nodeId = meta.getNodeId
- val vmId = meta.getVmId
- val uuidStr = UUID.randomUUID.toString.replaceAll("-", "")
- s"exa_export_${nodeId}_${vmId}_$uuidStr.parquet"
+ logger.info(s"Exported '${sink.getTotalRecords()}' records from node: $nodeId, vm: $vmId.")
}
private[this] def getRow(iter: ExaIterator, startIdx: Int, columns: Seq[ExaColumnInfo]): Row = {
diff --git a/src/main/scala/com/exasol/cloudetl/sink/BatchSizedSink.scala b/src/main/scala/com/exasol/cloudetl/sink/BatchSizedSink.scala
new file mode 100644
index 00000000..0d85f36f
--- /dev/null
+++ b/src/main/scala/com/exasol/cloudetl/sink/BatchSizedSink.scala
@@ -0,0 +1,114 @@
+package com.exasol.cloudetl.sink
+
+import java.util.UUID
+
+import com.exasol.cloudetl.bucket.Bucket
+import com.exasol.cloudetl.data.ExaColumnInfo
+import com.exasol.cloudetl.data.Row
+import com.exasol.cloudetl.parquet.ParquetRowWriter
+import com.exasol.cloudetl.parquet.ParquetWriteOptions
+import com.exasol.cloudetl.util.SchemaUtil
+
+import com.typesafe.scalalogging.LazyLogging
+import org.apache.hadoop.fs.Path
+
+/**
+ * A specific [[Sink]] implementation that writes a requested number
+ * of records per file.
+ *
+ * Given the requested number of records per file and the total
+ * number of records, the exported file sizes are balanced so that no
+ * small leftover file is created for the last few records.
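+ *
+ * For example, exporting 7 records with an `EXPORT_BATCH_SIZE` of 3
+ * creates three files that contain 3, 2 and 2 records respectively.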
+ */
+@SuppressWarnings(Array("org.wartremover.warts.Null", "org.wartremover.warts.Var"))
+final class BatchSizedSink(
+ nodeId: Long,
+ vmId: String,
+ numOfRecords: Long,
+ columns: Seq[ExaColumnInfo],
+ override val bucket: Bucket
+) extends Sink[Row]
+ with LazyLogging {
+
+ // scalastyle:off null
+
+ final val DEFAULT_BATCH_SIZE: Int = 100000
+
+ private val requestedBatchSize: Int =
+ bucket.properties.get("EXPORT_BATCH_SIZE").fold(DEFAULT_BATCH_SIZE)(_.toInt)
+
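+  // The records are distributed evenly across `numOfBuckets` files: each file
+  // receives `batchSize` records and the first `leftOvers` files take one extra.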
+ private val numOfBuckets: Long = math.ceil(numOfRecords / requestedBatchSize.toDouble).toLong
+ private val batchSize: Long = math.floor(numOfRecords / numOfBuckets.toDouble).toLong
+ private var leftOvers: Long = numOfRecords % numOfBuckets
+
+ private var writer: Writer[Row] = null
+ private var recordsCount: Long = 0
+ private var totalRecords: Long = 0
+
+ /** Returns the total number of records written so far. */
+ def getTotalRecords(): Long = totalRecords
+
+ /** @inheritdoc */
+ override def createWriter(path: String): Writer[Row] = new Writer[Row] {
+ val newPath = new Path(bucket.bucketPath, path)
+ val messageType = SchemaUtil.createParquetMessageType(columns, "exasol_export_schema")
+ val options = ParquetWriteOptions(bucket.properties)
+ val writer = ParquetRowWriter(newPath, bucket.getConfiguration(), messageType, options)
+
+ override def write(value: Row): Unit =
+ writer.write(value)
+
+ override def close(): Unit =
+ writer.close()
+ }
+
+ /**
+ * @inheritdoc
+ *
+   * If the number of records written so far exceeds the current batch
+   * size, the current writer is closed and a new one is created with a
+   * different file path.
+ */
+ override def write(value: Row): Unit = {
+ if (shouldRoll()) {
+ openNewFile()
+ }
+ recordsCount += 1
+ writer.write(value)
+ }
+
+ /** @inheritdoc */
+ override def close(): Unit = {
+ totalRecords += recordsCount
+ recordsCount = 0
+ if (writer != null) {
+ writer.close()
+ }
+ }
+
+ @SuppressWarnings(Array("org.wartremover.warts.Return"))
+ private def shouldRoll(): Boolean = {
+ if (writer == null) {
+ return true // scalastyle:ignore return
+ }
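+    // The first `leftOvers` files take one extra record so the counts add up to `numOfRecords`.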
+ if (leftOvers > 0 && recordsCount >= batchSize + 1) {
+ leftOvers -= 1
+ true
+ } else if (leftOvers == 0 && recordsCount >= batchSize) {
+ true
+ } else {
+ false
+ }
+ }
+
+ private def openNewFile(): Unit = {
+ close()
+ writer = createWriter(getNewPath())
+ }
+
+ private def getNewPath(): String = {
+ val uuidStr = UUID.randomUUID.toString.replaceAll("-", "")
+ s"exa_export_${nodeId}_${vmId}_$uuidStr.parquet"
+ }
+
+ // scalastyle:on
+}
diff --git a/src/main/scala/com/exasol/cloudetl/sink/Sink.scala b/src/main/scala/com/exasol/cloudetl/sink/Sink.scala
new file mode 100644
index 00000000..4ded5507
--- /dev/null
+++ b/src/main/scala/com/exasol/cloudetl/sink/Sink.scala
@@ -0,0 +1,52 @@
+package com.exasol.cloudetl.sink
+
+import com.exasol.cloudetl.bucket.Bucket
+
+/**
+ * An abstract sink representation.
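+ *
+ * Typically, an implementation creates format-specific [[Writer]]s via
+ * `createWriter`, forwards `write` calls to the currently open writer
+ * and releases resources in `close`; see [[BatchSizedSink]] for a
+ * concrete example.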
+ */
+abstract class Sink[T] {
+
+ /**
+ * The specific [[com.exasol.cloudetl.bucket.Bucket]] where the files
+ * will be exported.
+ */
+ val bucket: Bucket
+
+ /**
+   * Creates a format-specific (Parquet, Avro, etc.) writer.
+   *
+   * @param path The file path this writer is going to write to
+ */
+ def createWriter(path: String): Writer[T]
+
+ /**
+ * Writes the provided value.
+ *
+ * @param value The specific value to write
+ */
+ def write(value: T): Unit
+
+ /**
+   * Closes the resources used by this sink.
+ */
+ def close(): Unit
+
+}
+
+/**
+ * An interface for data writers.
+ */
+trait Writer[T] {
+
+ /**
+ * Writes the provided value to the path.
+ *
+ * @param value The value to write
+ */
+ def write(value: T): Unit
+
+ /** Closes the writer. */
+ def close(): Unit
+
+}
diff --git a/src/test/scala/com/exasol/cloudetl/TestUtils.scala b/src/test/scala/com/exasol/cloudetl/TestUtils.scala
new file mode 100644
index 00000000..ac842cf6
--- /dev/null
+++ b/src/test/scala/com/exasol/cloudetl/TestUtils.scala
@@ -0,0 +1,46 @@
+package com.exasol.cloudetl
+
+import java.io.IOException
+import java.math.BigDecimal
+import java.nio.file._
+import java.nio.file.attribute.BasicFileAttributes
+import java.sql.Date
+import java.sql.Timestamp
+
+trait TestUtils {
+
+ val BIG_DECIMAL_VALUE1: BigDecimal = new BigDecimal("5555555555555555555555555555555.55555")
+ val BIG_DECIMAL_VALUE2: BigDecimal = new BigDecimal("5555555555555555555555555555555.55555")
+ val DATE_VALUE1: Date = new Date(System.currentTimeMillis())
+ val DATE_VALUE2: Date = new Date(System.currentTimeMillis())
+ val TIMESTAMP_VALUE1: Timestamp = new Timestamp(System.currentTimeMillis())
+ val TIMESTAMP_VALUE2: Timestamp = new Timestamp(System.currentTimeMillis())
+
+ val rawRecords: Seq[Seq[Object]] = Seq(
+ Seq(1, 3L, BIG_DECIMAL_VALUE1, 3.14d, "xyz", true, DATE_VALUE1, TIMESTAMP_VALUE1),
+ Seq(2, 4L, BIG_DECIMAL_VALUE2, 0.13d, "abc", false, DATE_VALUE2, TIMESTAMP_VALUE2)
+ ).map { seq =>
+ seq.map(_.asInstanceOf[AnyRef])
+ }
+
+ final def createTemporaryFolder(name: String): Path =
+ Files.createTempDirectory(name)
+
+ final def deleteFiles(dir: Path): Unit = {
+ Files.walkFileTree(
+ dir,
+ new SimpleFileVisitor[Path] {
+ override def visitFile(file: Path, attrs: BasicFileAttributes): FileVisitResult = {
+ Files.delete(file)
+ FileVisitResult.CONTINUE
+ }
+ override def postVisitDirectory(dir: Path, exc: IOException): FileVisitResult = {
+ Files.delete(dir)
+ FileVisitResult.CONTINUE
+ }
+ }
+ )
+ ()
+ }
+
+}
diff --git a/src/test/scala/com/exasol/cloudetl/scriptclasses/ExportTableSuite.scala b/src/test/scala/com/exasol/cloudetl/scriptclasses/ExportTableSuite.scala
index 6a50a175..76e46f21 100644
--- a/src/test/scala/com/exasol/cloudetl/scriptclasses/ExportTableSuite.scala
+++ b/src/test/scala/com/exasol/cloudetl/scriptclasses/ExportTableSuite.scala
@@ -1,20 +1,23 @@
package com.exasol.cloudetl.scriptclasses
-import java.io.IOException
-import java.nio.file._
-import java.nio.file.attribute.BasicFileAttributes
+import java.nio.file.Path
import com.exasol.ExaIterator
import com.exasol.ExaMetadata
+import com.exasol.cloudetl.TestUtils
import org.mockito.ArgumentMatchers.any
import org.mockito.ExtraMockito
import org.mockito.Mockito._
+import org.scalatest.BeforeAndAfterEach
-@SuppressWarnings(Array("org.wartremover.warts.JavaSerializable"))
-class ExportTableSuite extends BaseSuite {
+@SuppressWarnings(Array("org.wartremover.warts.JavaSerializable", "org.wartremover.warts.Var"))
+class ExportTableSuite extends BaseSuite with BeforeAndAfterEach with TestUtils {
- val srcColumns: Seq[String] = Seq(
+ private var outputPath: Path = _
+ private var exasolMetadata: ExaMetadata = _
+ private var exasolIterator: ExaIterator = _
+ private val srcColumns: Seq[String] = Seq(
"c_int",
"c_long",
"c_decimal",
@@ -25,40 +28,27 @@ class ExportTableSuite extends BaseSuite {
"c_timestamp"
)
- private val bd1 = new java.math.BigDecimal("5555555555555555555555555555555.55555")
- private val bd2 = new java.math.BigDecimal("5555555555555555555555555555555.55555")
- private val dt1 = new java.sql.Date(System.currentTimeMillis())
- private val dt2 = new java.sql.Date(System.currentTimeMillis())
- private val ts1 = new java.sql.Timestamp(System.currentTimeMillis())
- private val ts2 = new java.sql.Timestamp(System.currentTimeMillis())
-
- val records: Seq[Seq[Object]] = Seq(
- Seq(1, 3L, bd1, 3.14d, "xyz", true, dt1, ts1),
- Seq(2, 4L, bd2, 0.13d, "abc", false, dt2, ts2)
- ).map { seq =>
- seq.map(_.asInstanceOf[AnyRef])
+ final def createMockedIterator(resourceDir: String): ExaIterator = {
+ val mockedIterator = commonExaIterator(resourceDir)
+ when(mockedIterator.getString(2)).thenReturn(srcColumns.mkString("."))
+ when(mockedIterator.next()).thenReturn(true, false)
+ when(mockedIterator.size()).thenReturn(2L)
+
+ when(mockedIterator.getInteger(3)).thenReturn(1, 2)
+ when(mockedIterator.getLong(4)).thenReturn(3L, 4L)
+ when(mockedIterator.getBigDecimal(5)).thenReturn(BIG_DECIMAL_VALUE1, BIG_DECIMAL_VALUE2)
+ when(mockedIterator.getDouble(6)).thenReturn(3.14, 0.13)
+ when(mockedIterator.getString(7)).thenReturn("xyz", "abc")
+ when(mockedIterator.getBoolean(8)).thenReturn(true, false)
+ when(mockedIterator.getDate(9)).thenReturn(DATE_VALUE1, DATE_VALUE2)
+ when(mockedIterator.getTimestamp(10)).thenReturn(TIMESTAMP_VALUE1, TIMESTAMP_VALUE2)
+
+ mockedIterator
}
- final def createMockedIter(resourceDir: String): ExaIterator = {
- val mockedIter = commonExaIterator(resourceDir)
- when(mockedIter.getString(2)).thenReturn(srcColumns.mkString("."))
- when(mockedIter.next()).thenReturn(true, false)
-
- when(mockedIter.getInteger(3)).thenReturn(1, 2)
- when(mockedIter.getLong(4)).thenReturn(3L, 4L)
- when(mockedIter.getBigDecimal(5)).thenReturn(bd1, bd2)
- when(mockedIter.getDouble(6)).thenReturn(3.14, 0.13)
- when(mockedIter.getString(7)).thenReturn("xyz", "abc")
- when(mockedIter.getBoolean(8)).thenReturn(true, false)
- when(mockedIter.getDate(9)).thenReturn(dt1, dt2)
- when(mockedIter.getTimestamp(10)).thenReturn(ts1, ts2)
-
- mockedIter
- }
-
- final def createMockedMeta(): ExaMetadata = {
- val mockedMeta = mock[ExaMetadata]
- when(mockedMeta.getInputColumnCount()).thenReturn(11L)
+ final def createMockedMetadata(): ExaMetadata = {
+ val mockedMetadata = mock[ExaMetadata]
+ when(mockedMetadata.getInputColumnCount()).thenReturn(11L)
val returns = Seq(
(3, classOf[java.lang.Integer], 0L, 0L, 0L),
(4, classOf[java.lang.Long], 0L, 0L, 0L),
@@ -71,79 +61,59 @@ class ExportTableSuite extends BaseSuite {
)
returns.foreach {
case (idx, cls, prec, scale, len) =>
- ExtraMockito.doReturn(cls).when(mockedMeta).getInputColumnType(idx)
- when(mockedMeta.getInputColumnPrecision(idx)).thenReturn(prec)
- when(mockedMeta.getInputColumnScale(idx)).thenReturn(scale)
- when(mockedMeta.getInputColumnLength(idx)).thenReturn(len)
+ ExtraMockito.doReturn(cls).when(mockedMetadata).getInputColumnType(idx)
+ when(mockedMetadata.getInputColumnPrecision(idx)).thenReturn(prec)
+ when(mockedMetadata.getInputColumnScale(idx)).thenReturn(scale)
+ when(mockedMetadata.getInputColumnLength(idx)).thenReturn(len)
}
- mockedMeta
+ mockedMetadata
}
- test("`run` should export the Exasol rows from ExaIterator") {
- val tempDir = Files.createTempDirectory("exportTableTest")
+ override final def beforeEach(): Unit = {
+ outputPath = createTemporaryFolder("exportTableTest")
+ exasolMetadata = createMockedMetadata()
+ exasolIterator = createMockedIterator(outputPath.toUri.toString)
+ ()
+ }
- val meta = createMockedMeta()
- val iter = createMockedIter(tempDir.toUri.toString)
+ override final def afterEach(): Unit = {
+ deleteFiles(outputPath)
+ ()
+ }
- ExportTable.run(meta, iter)
+ test("`run` should export the Exasol rows from ExaIterator") {
+ ExportTable.run(exasolMetadata, exasolIterator)
- verify(meta, times(1)).getInputColumnCount
+ verify(exasolMetadata, times(1)).getInputColumnCount
for { idx <- 3 to 10 } {
- verify(meta, times(1)).getInputColumnType(idx)
- verify(meta, times(1)).getInputColumnPrecision(idx)
- verify(meta, times(1)).getInputColumnScale(idx)
- verify(meta, times(1)).getInputColumnLength(idx)
+ verify(exasolMetadata, times(1)).getInputColumnType(idx)
+ verify(exasolMetadata, times(1)).getInputColumnPrecision(idx)
+ verify(exasolMetadata, times(1)).getInputColumnScale(idx)
+ verify(exasolMetadata, times(1)).getInputColumnLength(idx)
}
- verify(iter, times(2)).getInteger(3)
- verify(iter, times(2)).getLong(4)
- verify(iter, times(2)).getBigDecimal(5)
- verify(iter, times(2)).getDouble(6)
- verify(iter, times(2)).getString(7)
- verify(iter, times(2)).getBoolean(8)
- verify(iter, times(2)).getDate(9)
- verify(iter, times(2)).getTimestamp(10)
-
- deleteFiles(tempDir)
+ verify(exasolIterator, times(2)).getInteger(3)
+ verify(exasolIterator, times(2)).getLong(4)
+ verify(exasolIterator, times(2)).getBigDecimal(5)
+ verify(exasolIterator, times(2)).getDouble(6)
+ verify(exasolIterator, times(2)).getString(7)
+ verify(exasolIterator, times(2)).getBoolean(8)
+ verify(exasolIterator, times(2)).getDate(9)
+ verify(exasolIterator, times(2)).getTimestamp(10)
}
test("import exported rows from a file") {
- val tempDir = Files.createTempDirectory("importExportTableTest")
- val meta = createMockedMeta()
- val iter = createMockedIter(tempDir.toUri.toString)
-
- ExportTable.run(meta, iter)
+ ExportTable.run(exasolMetadata, exasolIterator)
val importIter = commonExaIterator(resourceImportBucket)
when(importIter.next()).thenReturn(false)
- when(importIter.getString(2)).thenReturn(tempDir.toUri.toString)
+ when(importIter.getString(2)).thenReturn(outputPath.toUri.toString)
ImportFiles.run(mock[ExaMetadata], importIter)
val totalRecords = 2
verify(importIter, times(totalRecords)).emit(Seq(any[Object]): _*)
-
- // TODO: verify each emitted row
-
- deleteFiles(tempDir)
- }
-
- final def deleteFiles(dir: Path): Unit = {
- Files.walkFileTree(
- dir,
- new SimpleFileVisitor[Path] {
- override def visitFile(file: Path, attrs: BasicFileAttributes): FileVisitResult = {
- Files.delete(file)
- FileVisitResult.CONTINUE
- }
- override def postVisitDirectory(dir: Path, exc: IOException): FileVisitResult = {
- Files.delete(dir)
- FileVisitResult.CONTINUE
- }
- }
- )
- ()
}
}
diff --git a/src/test/scala/com/exasol/cloudetl/sink/BatchSizedSinkSuite.scala b/src/test/scala/com/exasol/cloudetl/sink/BatchSizedSinkSuite.scala
new file mode 100644
index 00000000..6613b00b
--- /dev/null
+++ b/src/test/scala/com/exasol/cloudetl/sink/BatchSizedSinkSuite.scala
@@ -0,0 +1,62 @@
+package com.exasol.cloudetl.sink
+
+import java.nio.file.Path
+
+import com.exasol.cloudetl.TestUtils
+import com.exasol.cloudetl.bucket.LocalBucket
+import com.exasol.cloudetl.data.ExaColumnInfo
+import com.exasol.cloudetl.data.Row
+
+import org.scalatest.BeforeAndAfterEach
+import org.scalatest.FunSuite
+
+@SuppressWarnings(Array("org.wartremover.warts.Var"))
+class BatchSizedSinkSuite extends FunSuite with BeforeAndAfterEach with TestUtils {
+
+ private var outputPath: Path = _
+
+ private val columnMetadata: Seq[ExaColumnInfo] = Seq(
+ ExaColumnInfo("c_int", classOf[java.lang.Integer], 0, 0, 0),
+ ExaColumnInfo("c_long", classOf[java.lang.Long], 0, 0, 0),
+ ExaColumnInfo("c_decimal", classOf[java.math.BigDecimal], 36, 5, 0),
+ ExaColumnInfo("c_double", classOf[java.lang.Double], 0, 0, 0),
+ ExaColumnInfo("c_string", classOf[java.lang.String], 0, 0, 3),
+ ExaColumnInfo("c_boolean", classOf[java.lang.Boolean], 0, 0, 0),
+ ExaColumnInfo("c_date", classOf[java.sql.Date], 0, 0, 0),
+ ExaColumnInfo("c_timestamp", classOf[java.sql.Timestamp], 0, 0, 0)
+ )
+
+ private val rows: Seq[Row] = rawRecords.map(Row(_))
+
+ override final def beforeEach(): Unit = {
+ outputPath = createTemporaryFolder("batchSizedSinkTest")
+ ()
+ }
+
+ override final def afterEach(): Unit = {
+ deleteFiles(outputPath)
+ ()
+ }
+
+ test("export single file with default batch size") {
+ val bucket = LocalBucket(outputPath.toUri.toString, Map("EXPORT_BATCH_SIZE" -> "4"))
+ val sink = new BatchSizedSink(1L, "vm1", 2, columnMetadata, bucket)
+ rows.foreach { row =>
+ sink.write(row)
+ }
+ sink.close()
+ assert(sink.getTotalRecords() === 2)
+ }
+
+ test("export several files with batch size smaller than total records") {
+ val bucket = LocalBucket(outputPath.toUri.toString, Map("EXPORT_BATCH_SIZE" -> "3"))
+ val sink = new BatchSizedSink(1L, "vm1", 7, columnMetadata, bucket)
+ val newRows = rows ++ rows ++ rows ++ rows.take(1)
+ newRows.foreach { row =>
+ sink.write(row)
+ }
+ sink.close()
+ assert(sink.getTotalRecords() === 7)
+ }
+
+}