#280: Improve log messages for import (#278)
kaklakariada authored Oct 10, 2023
1 parent f508227 commit f97397e
Showing 21 changed files with 582 additions and 144 deletions.
2 changes: 1 addition & 1 deletion dependencies.md

1 change: 1 addition & 0 deletions doc/changes/changelog.md

27 changes: 27 additions & 0 deletions doc/changes/changes_2.7.5.md
@@ -0,0 +1,27 @@
# Cloud Storage Extension 2.7.5, released 2023-10-10

Code name: Improved log messages

## Summary

This release adds log messages to help debug issues during import.
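
For illustration, the per-file and summary messages added by this change take the following shape (Parquet variant; the file name and counts below are hypothetical):

```
Imported file s3a://my-bucket/data/part-0001.parquet with 125000 rows and 4 intervals
Imported 8 files with 32 intervals and 1000000 rows in total
```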

## Features

* #280: Improved log messages for import

## Dependency Updates

### Cloud Storage Extension

#### Compile Dependency Updates

* Updated `com.google.protobuf:protobuf-java:3.24.3` to `3.24.4`

#### Test Dependency Updates

* Updated `org.mockito:mockito-core:5.5.0` to `5.6.0`
* Updated `org.testcontainers:localstack:1.19.0` to `1.19.1`

#### Plugin Dependency Updates

* Updated `com.diffplug.spotless:spotless-maven-plugin:2.39.0` to `2.40.0`
20 changes: 10 additions & 10 deletions doc/user_guide/user_guide.md
@@ -150,7 +150,7 @@ downloaded jar file is the same as the checksum provided in the releases.
To check the SHA256 result of the local jar, run the command:

```sh
-sha256sum exasol-cloud-storage-extension-2.7.4.jar
+sha256sum exasol-cloud-storage-extension-2.7.5.jar
```

### Building From Source
@@ -180,7 +180,7 @@ mvn clean package -DskipTests=true
```

The assembled jar file should be located at
-`target/exasol-cloud-storage-extension-2.7.4.jar`.
+`target/exasol-cloud-storage-extension-2.7.5.jar`.

### Create an Exasol Bucket

@@ -202,7 +202,7 @@ for the HTTP protocol.
Upload the jar file using curl command:

```sh
-curl -X PUT -T exasol-cloud-storage-extension-2.7.4.jar \
+curl -X PUT -T exasol-cloud-storage-extension-2.7.5.jar \
http://w:<WRITE_PASSWORD>@exasol.datanode.domain.com:2580/<BUCKET>/
```

@@ -234,7 +234,7 @@ OPEN SCHEMA CLOUD_STORAGE_EXTENSION;

CREATE OR REPLACE JAVA SET SCRIPT IMPORT_PATH(...) EMITS (...) AS
%scriptclass com.exasol.cloudetl.scriptclasses.FilesImportQueryGenerator;
-%jar /buckets/bfsdefault/<BUCKET>/exasol-cloud-storage-extension-2.7.4.jar;
+%jar /buckets/bfsdefault/<BUCKET>/exasol-cloud-storage-extension-2.7.5.jar;
/

CREATE OR REPLACE JAVA SCALAR SCRIPT IMPORT_METADATA(...) EMITS (
@@ -244,12 +244,12 @@ CREATE OR REPLACE JAVA SCALAR SCRIPT IMPORT_METADATA(...) EMITS (
end_index DECIMAL(36, 0)
) AS
%scriptclass com.exasol.cloudetl.scriptclasses.FilesMetadataReader;
-%jar /buckets/bfsdefault/<BUCKET>/exasol-cloud-storage-extension-2.7.4.jar;
+%jar /buckets/bfsdefault/<BUCKET>/exasol-cloud-storage-extension-2.7.5.jar;
/

CREATE OR REPLACE JAVA SET SCRIPT IMPORT_FILES(...) EMITS (...) AS
%scriptclass com.exasol.cloudetl.scriptclasses.FilesDataImporter;
-%jar /buckets/bfsdefault/<BUCKET>/exasol-cloud-storage-extension-2.7.4.jar;
+%jar /buckets/bfsdefault/<BUCKET>/exasol-cloud-storage-extension-2.7.5.jar;
/
```

@@ -268,12 +268,12 @@ OPEN SCHEMA CLOUD_STORAGE_EXTENSION;

CREATE OR REPLACE JAVA SET SCRIPT EXPORT_PATH(...) EMITS (...) AS
%scriptclass com.exasol.cloudetl.scriptclasses.TableExportQueryGenerator;
-%jar /buckets/bfsdefault/<BUCKET>/exasol-cloud-storage-extension-2.7.4.jar;
+%jar /buckets/bfsdefault/<BUCKET>/exasol-cloud-storage-extension-2.7.5.jar;
/

CREATE OR REPLACE JAVA SET SCRIPT EXPORT_TABLE(...) EMITS (ROWS_AFFECTED INT) AS
%scriptclass com.exasol.cloudetl.scriptclasses.TableDataExporter;
-%jar /buckets/bfsdefault/<BUCKET>/exasol-cloud-storage-extension-2.7.4.jar;
+%jar /buckets/bfsdefault/<BUCKET>/exasol-cloud-storage-extension-2.7.5.jar;
/
```

@@ -407,13 +407,13 @@ CREATE OR REPLACE JAVA SCALAR SCRIPT IMPORT_METADATA(...) EMITS (
) AS
%jvmoption -DHTTPS_PROXY=http://username:password@10.10.1.10:1180
%scriptclass com.exasol.cloudetl.scriptclasses.FilesMetadataReader;
-%jar /buckets/bfsdefault/<BUCKET>/exasol-cloud-storage-extension-2.7.4.jar;
+%jar /buckets/bfsdefault/<BUCKET>/exasol-cloud-storage-extension-2.7.5.jar;
/

CREATE OR REPLACE JAVA SET SCRIPT IMPORT_FILES(...) EMITS (...) AS
%jvmoption -DHTTPS_PROXY=http://username:password@10.10.1.10:1180
%scriptclass com.exasol.cloudetl.scriptclasses.FilesDataImporter;
-%jar /buckets/bfsdefault/<BUCKET>/exasol-cloud-storage-extension-2.7.4.jar;
+%jar /buckets/bfsdefault/<BUCKET>/exasol-cloud-storage-extension-2.7.5.jar;
/
```

2 changes: 1 addition & 1 deletion pk_generated_parent.pom

20 changes: 8 additions & 12 deletions pom.xml
@@ -3,14 +3,14 @@
<modelVersion>4.0.0</modelVersion>
<groupId>com.exasol</groupId>
<artifactId>cloud-storage-extension</artifactId>
-<version>2.7.4</version>
+<version>2.7.5</version>
<name>Cloud Storage Extension</name>
<description>Exasol Cloud Storage Import And Export Extension</description>
<url>https://github.com/exasol/cloud-storage-extension/</url>
<parent>
<artifactId>cloud-storage-extension-generated-parent</artifactId>
<groupId>com.exasol</groupId>
-<version>2.7.4</version>
+<version>2.7.5</version>
<relativePath>pk_generated_parent.pom</relativePath>
</parent>
<properties>
@@ -369,7 +369,7 @@
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
-<version>3.24.3</version>
+<version>3.24.4</version>
</dependency>
<dependency>
<groupId>com.google.cloud.bigdataoss</groupId>
@@ -424,7 +424,7 @@
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.compat.version}</artifactId>
-<version>3.4.1</version>
+<version>3.4.1</version> <!-- Upgrading will make unit tests fail -->
<exclusions>
<exclusion>
<groupId>org.spark-project.spark</groupId>
@@ -529,7 +529,7 @@
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
-<version>5.5.0</version>
+<version>5.6.0</version>
<scope>test</scope>
</dependency>
<dependency>
@@ -547,7 +547,7 @@
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>localstack</artifactId>
-<version>1.19.0</version>
+<version>1.19.1</version>
<scope>test</scope>
</dependency>
<dependency>
@@ -746,10 +746,6 @@
<!-- From transitive org.codehaus.janino:janino:3.1.9 dependency -->
<!-- CWE-787: Out-of-bounds Write (5.5); https://ossindex.sonatype.org/vulnerability/CVE-2023-33546 -->
<exclude>CVE-2023-33546</exclude>
-<!-- Vulnerability in org.alluxio:alluxio-core-common:jar:300 -->
-<!-- CWE-79: Improper Neutralization of Input During Web Page Generation ('Cross-site Scripting') (6.1); -->
-<!-- https://ossindex.sonatype.org/vulnerability/CVE-2020-21485 -->
-<exclude>CVE-2020-21485</exclude>
<!-- Vulnerabilities in fr.turri:aXMLRPC:1.13.0:test -->
<!-- CWE-611: Improper Restriction of XML External Entity Reference ('XXE') (9.8); -->
<!-- Used in tests only. -->
@@ -838,7 +834,7 @@
<plugin>
<groupId>com.diffplug.spotless</groupId>
<artifactId>spotless-maven-plugin</artifactId>
-<version>2.39.0</version>
+<version>2.40.0</version>
<configuration>
<scala>
<scalafmt>
@@ -862,7 +858,7 @@
<dependency>
<groupId>com.geirsson</groupId>
<artifactId>metaconfig-pprint_${scala.compat.version}</artifactId>
-<version>0.11.1</version>
+<version>0.12.0</version>
</dependency>
<dependency>
<groupId>com.github.liancheng</groupId>
22 changes: 20 additions & 2 deletions src/main/scala/com/exasol/cloudetl/emitter/FilesDataEmitter.scala
@@ -57,31 +57,49 @@ final case class FilesDataEmitter(properties: StorageProperties, files: Map[Stri
case _ => emitRegularData(context)
}

-  private[this] def emitRegularData(context: ExaIterator): Unit =
+  private[this] def emitRegularData(context: ExaIterator): Unit = {
+    var totalRowCount = 0
     files.foreach { case (filename, _) =>
       Using(Source(fileFormat, new Path(filename), bucket.getConfiguration(), bucket.fileSystem)) { source =>
+        var rowCount = 0
         source.stream().foreach { row =>
           val values = defaultTransformation.transform(transformRegularRowValues(row))
           context.emit(values: _*)
+          rowCount += 1
         }
+        totalRowCount += rowCount
+        logger.info(s"Imported file $filename with $rowCount rows")
       }
     }
+    logger.info(s"Imported ${files.size} files with $totalRowCount rows in total")
+  }
 
   private[this] def transformRegularRowValues(row: RegularRow): Array[Object] =
     row.getValues().map(_.asInstanceOf[Object]).toArray
 
-  private[this] def emitParquetData(context: ExaIterator): Unit =
+  private[this] def emitParquetData(context: ExaIterator): Unit = {
+    var totalRowCount = 0
+    var totalIntervalCount = 0
     files.foreach { case (filename, intervals) =>
       val inputFile = getInputFile(filename)
       val converter = ParquetValueConverter(RowParquetReader.getSchema(inputFile))
       val source = new RowParquetChunkReader(inputFile, intervals)
+      var rowCount = 0
       source.read(new Consumer[Row] {
         override def accept(row: Row): Unit = {
           val values = defaultTransformation.transform(converter.convert(row))
           context.emit(values: _*)
+          rowCount += 1
         }
       })
+      totalRowCount += rowCount
+      totalIntervalCount += intervals.size()
+      logger.info(
+        s"Imported file $inputFile with $rowCount rows and ${intervals.size()} intervals"
+      )
     }
+    logger.info(s"Imported ${files.size} files with $totalIntervalCount intervals and $totalRowCount rows in total")
+  }
 
   private[this] def getInputFile(filename: String): InputFile =
     try {
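
Abstracted from the diff above, the added bookkeeping follows one pattern: count rows while emitting, log a per-file message, accumulate, and log a grand total at the end. A minimal self-contained Scala sketch of that pattern; `readRows` and `emit` are hypothetical stand-ins for the extension's `Source` and `ExaIterator`, and `println` stands in for the logger:

```scala
// Minimal sketch of the per-file / total logging pattern added in this commit.
// readRows and emit are hypothetical stand-ins, not the extension's real API.
object ImportLoggingSketch {

  // Pretend each file yields three rows; a real source would parse the file.
  def readRows(filename: String): Iterator[Seq[AnyRef]] =
    Iterator.tabulate(3)(i => Seq[AnyRef](filename, Integer.valueOf(i)))

  def emitAll(files: Seq[String])(emit: Seq[AnyRef] => Unit): Unit = {
    var totalRowCount = 0
    files.foreach { filename =>
      var rowCount = 0
      readRows(filename).foreach { row =>
        emit(row)     // hand the row over to the database
        rowCount += 1 // count rows per file ...
      }
      totalRowCount += rowCount // ... and accumulate the grand total
      println(s"Imported file $filename with $rowCount rows")
    }
    println(s"Imported ${files.size} files with $totalRowCount rows in total")
  }

  def main(args: Array[String]): Unit =
    emitAll(Seq("part-0001.csv", "part-0002.csv"))(_ => ())
}
```

Run as-is, this prints one message per file plus one summary line, mirroring the shape of the messages added to `emitRegularData` and `emitParquetData`.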
src/main/scala/com/exasol/cloudetl/scriptclasses/FilesDataImporter.scala
@@ -33,13 +33,18 @@ object FilesDataImporter extends LazyLogging {
     val files = collectFiles(iterator)
     val nodeId = metadata.getNodeId()
     val vmId = metadata.getVmId()
+    var intervalCount = 0
     files.foreach { case (filename, intervals) =>
-      logger.info(s"Intervals '${getIntervalString(intervals)}' for file $filename on node '$nodeId' and vm '$vmId'.")
+      logger.info(
+        s"Importing intervals '${getIntervalString(intervals)}' for file $filename on node '$nodeId' and vm '$vmId'."
+      )
+      intervalCount += intervals.size()
     }
+    logger.info(s"Importing ${files.size} files with $intervalCount intervals")
     FilesDataEmitter(storageProperties, files).emit(iterator)
   }
 
-  private[this] def collectFiles(iterator: ExaIterator): Map[String, List[ChunkInterval]] = {
+  def collectFiles(iterator: ExaIterator): Map[String, List[ChunkInterval]] = {
     val files = new HashMap[String, List[ChunkInterval]]()
     do {
       val filename = iterator.getString(FILENAME_STARTING_INDEX)

@@ -60,11 +65,14 @@ private[this] def getIntervalString(intervals: List[ChunkInterval]): String = {
   private[this] def getIntervalString(intervals: List[ChunkInterval]): String = {
     val sb = new StringBuilder()
     for { i <- 0 until intervals.size() } {
+      if (i > 0) {
+        sb.append(", ")
+      }
       sb.append("[")
         .append(intervals.get(i).getStartPosition())
         .append(",")
         .append(intervals.get(i).getEndPosition())
-        .append("), ")
+        .append(")")
     }
     sb.toString()
   }
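
A side note on the separator fix above: the loop now inserts ", " only between entries, so the last interval no longer carries a trailing separator. The same result could be had more idiomatically with a map-and-join. The sketch below is illustrative only and uses a stub `ChunkInterval` trait, since the real type comes from the extension's Parquet reader dependency; the commit itself keeps the explicit `StringBuilder` loop:

```scala
import scala.jdk.CollectionConverters._

object IntervalFormatting {

  // Stub mirroring the two accessors used in the diff; the real ChunkInterval
  // comes from the extension's Parquet reader dependency.
  trait ChunkInterval {
    def getStartPosition(): Long
    def getEndPosition(): Long
  }

  // Render intervals as "[start,end)" joined with ", ", e.g. "[0,100), [100,200)".
  def getIntervalString(intervals: java.util.List[ChunkInterval]): String =
    intervals.asScala
      .map(i => s"[${i.getStartPosition()},${i.getEndPosition()})")
      .mkString(", ")
}
```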