Commit

Make Parquet benchmark more granular
clairemcginty committed Sep 18, 2024
1 parent da8fb91 commit 23bcbd9
Showing 3 changed files with 127 additions and 24 deletions.
2 changes: 1 addition & 1 deletion build.sbt
@@ -708,7 +708,7 @@ lazy val jmh: Project = project
       cats % Test,
       datastore % Test,
       guava % Test,
-      parquet % "test->test",
+      parquet % Test,
       protobuf % "test->test",
       scalacheck % Test,
       tensorflow % Test,
72 changes: 49 additions & 23 deletions jmh/src/test/scala/magnolify/jmh/MagnolifyBench.scala
@@ -16,13 +16,13 @@

 package magnolify.jmh

-import magnolify.parquet.{MagnolifyParquetProperties, ParquetType, TestInputFile, TestOutputFile}
+import magnolify.parquet.ParquetType.WriteSupport
+import magnolify.parquet.{MagnolifyParquetProperties, ParquetType}

 import java.util.concurrent.TimeUnit
 import magnolify.scalacheck.auto._
 import magnolify.test.Simple._
 import org.apache.hadoop.conf.Configuration
-import org.apache.parquet.hadoop.{ParquetReader, ParquetWriter}
 import org.scalacheck._
 import org.openjdk.jmh.annotations._

@@ -94,39 +94,65 @@ class AvroBench {

 @State(Scope.Benchmark)
 class ParquetReadState(pt: ParquetType[Nested]) {
-  var out: TestOutputFile = null
-  var reader: ParquetReader[Nested] = null
+  import org.apache.parquet.io._
+  import org.apache.parquet.column.impl.ColumnWriteStoreV1
+  import org.apache.parquet.column.ParquetProperties
+  import org.apache.parquet.hadoop.api.InitContext
+
+  var reader: RecordReader[Nested] = null

   @Setup(Level.Invocation)
   def setup(): Unit = {
-    out = new TestOutputFile
-    val writer = pt.writeBuilder(out).build()
-    writer.write(MagnolifyBench.nested)
-    writer.close()
-
-    val in = new TestInputFile(out.getBytes)
-    reader = pt.readBuilder(in).build()
-  }
-
-  @TearDown(Level.Invocation)
-  def tearDown(): Unit = {
-    reader.close()
+    // Write page
+    val columnIO = new ColumnIOFactory(true).getColumnIO(pt.schema)
+    val memPageStore = new ParquetInMemoryPageStore(1)
+    val columns = new ColumnWriteStoreV1(
+      pt.schema,
+      memPageStore,
+      ParquetProperties.builder.withPageSize(800).withDictionaryEncoding(false).build
+    )
+    val writeSupport = pt.writeSupport
+    val recordWriter = columnIO.getRecordWriter(columns)
+    writeSupport.prepareForWrite(recordWriter)
+    writeSupport.write(MagnolifyBench.nested)
+    recordWriter.flush()
+    columns.flush()
+
+    // Read and convert page
+    val conf = new Configuration()
+    val readSupport = pt.readSupport
+    reader = columnIO.getRecordReader(
+      memPageStore,
+      readSupport.prepareForRead(
+        conf,
+        new java.util.HashMap,
+        pt.schema,
+        readSupport.init(new InitContext(conf, new java.util.HashMap, pt.schema)))
+    )
   }
 }

 @State(Scope.Benchmark)
 class ParquetWriteState(pt: ParquetType[Nested]) {
-  var writer: ParquetWriter[Nested] = null
+  import org.apache.parquet.io._
+  import org.apache.parquet.column.impl.ColumnWriteStoreV1
+  import org.apache.parquet.column.ParquetProperties
+
+  var writer: WriteSupport[Nested] = null

   @Setup(Level.Invocation)
   def setup(): Unit = {
-    val out = new TestOutputFile
-    writer = pt.writeBuilder(out).build()
-  }
-
-  @TearDown(Level.Invocation)
-  def tearDown(): Unit = {
-    writer.close()
+    val columnIO = new ColumnIOFactory(true).getColumnIO(pt.schema)
+    val memPageStore = new ParquetInMemoryPageStore(1)
+    val columns = new ColumnWriteStoreV1(
+      pt.schema,
+      memPageStore,
+      ParquetProperties.builder.withPageSize(800).withDictionaryEncoding(false).build
+    )
+    val writeSupport = pt.writeSupport
+    val recordWriter = columnIO.getRecordWriter(columns)
+    writeSupport.prepareForWrite(recordWriter)
+    this.writer = writeSupport
  }
 }
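
Note: the @Benchmark methods that consume these states sit outside the changed hunks and are not shown above. A minimal sketch of how they would plug in, assuming class names and a MagnolifyBench.parquetType helper that are not part of this commit: with the states above, each invocation times only record conversion through WriteSupport/RecordReader against the in-memory page store, rather than a full Parquet file round trip.

// Hypothetical sketch, not part of this diff: how a @Benchmark method would drive the
// new states. MagnolifyBench.parquetType is an assumed helper supplying ParquetType[Nested];
// JMH needs a concrete, no-arg @State class, so subclasses supply the ParquetType.
@State(Scope.Benchmark)
class NestedParquetReadState extends ParquetReadState(MagnolifyBench.parquetType)
@State(Scope.Benchmark)
class NestedParquetWriteState extends ParquetWriteState(MagnolifyBench.parquetType)

class ParquetBench {
  // Measures WriteSupport conversion only; no Parquet file is written.
  @Benchmark def parquetWrite(state: NestedParquetWriteState): Unit =
    state.writer.write(MagnolifyBench.nested)

  // Measures RecordReader materialization from in-memory pages; no file is read.
  @Benchmark def parquetRead(state: NestedParquetReadState): Nested =
    state.reader.read()
}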

77 changes: 77 additions & 0 deletions jmh/src/test/scala/magnolify/jmh/ParquetInMemoryPageStore.scala
@@ -0,0 +1,77 @@
package magnolify.jmh

import org.apache.parquet.bytes.{ByteBufferReleaser, BytesInput, HeapByteBufferAllocator}
import org.apache.parquet.column.{ColumnDescriptor, Encoding}
import org.apache.parquet.column.page._
import org.apache.parquet.column.statistics._

import scala.collection.mutable

/**
 * An in-memory Parquet page store modeled after parquet-java's MemPageStore, used to benchmark
 * ParquetType conversion between Parquet Groups and Scala case classes
 */
class ParquetInMemoryPageStore(rowCount: Long) extends PageReadStore with PageWriteStore {
  lazy val writers = new mutable.HashMap[ColumnDescriptor, ParquetInMemoryWriter]()
  lazy val readers = new mutable.HashMap[ColumnDescriptor, ParquetInMemoryReader]()

  override def getPageReader(path: ColumnDescriptor): PageReader =
    readers.getOrElseUpdate(path, {
      val writer = writers(path)
      new ParquetInMemoryReader(writer.numValues, writer.pages.toList, writer.dictionaryPage)
    })

  override def getPageWriter(path: ColumnDescriptor): PageWriter =
    writers.getOrElseUpdate(path, new ParquetInMemoryWriter())

  override def getRowCount: Long = rowCount
}

class ParquetInMemoryReader(valueCount: Long, pages: List[DataPage], dictionaryPage: DictionaryPage) extends PageReader {
  lazy val pagesIt = pages.iterator
  override def readDictionaryPage(): DictionaryPage = dictionaryPage
  override def getTotalValueCount: Long = valueCount
  override def readPage(): DataPage = pagesIt.next()
}

class ParquetInMemoryWriter extends PageWriter {
  var numRows = 0
  var numValues: Long = 0
  var memSize: Long = 0
  val pages = new mutable.ListBuffer[DataPage]()
  var dictionaryPage: DictionaryPage = null

  override def writePage(bytesInput: BytesInput, valueCount: Int, statistics: Statistics[_], rlEncoding: Encoding, dlEncoding: Encoding, valuesEncoding: Encoding): Unit = {
    writePage(bytesInput, valueCount, 1, statistics, rlEncoding, dlEncoding, valuesEncoding)
  }

  override def writePage(bytesInput: BytesInput, valueCount: Int, rowCount: Int, statistics: Statistics[_], sizeStatistics: SizeStatistics, rlEncoding: Encoding, dlEncoding: Encoding, valuesEncoding: Encoding): Unit = {
    writePage(bytesInput, valueCount, rowCount, statistics, rlEncoding, dlEncoding, valuesEncoding)
  }

  override def writePage(bytesInput: BytesInput, valueCount: Int, rowCount: Int, statistics: Statistics[_], rlEncoding: Encoding, dlEncoding: Encoding, valuesEncoding: Encoding): Unit = {
    pages.addOne(new DataPageV1(
      bytesInput.copy(new ByteBufferReleaser(new HeapByteBufferAllocator)),
      valueCount,
      bytesInput.size().toInt,
      statistics,
      rlEncoding,
      dlEncoding,
      valuesEncoding))
    memSize += bytesInput.size()
    numRows += rowCount
    numValues += valueCount
  }

  override def writePageV2(rowCount: Int, nullCount: Int, valueCount: Int, repetitionLevels: BytesInput, definitionLevels: BytesInput, dataEncoding: Encoding, data: BytesInput, statistics: Statistics[_]): Unit = ???

  override def getMemSize: Long = memSize

  override def allocatedSize(): Long = memSize

  override def writeDictionaryPage(dictionaryPage: DictionaryPage): Unit = {
    this.dictionaryPage = dictionaryPage
  }

  override def memUsageString(prefix: String): String = s"$prefix $memSize bytes"
}
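
For reference, a self-contained sketch of the page store's write/read contract, independent of ParquetType. The schema, the raw page bytes, and the PLAIN encodings below are illustrative assumptions, not part of this commit.

// Hypothetical round trip through ParquetInMemoryPageStore, outside the JMH harness.
import magnolify.jmh.ParquetInMemoryPageStore
import org.apache.parquet.bytes.BytesInput
import org.apache.parquet.column.Encoding
import org.apache.parquet.column.statistics.Statistics
import org.apache.parquet.schema.MessageTypeParser

object ParquetInMemoryPageStoreExample {
  def main(args: Array[String]): Unit = {
    val schema = MessageTypeParser.parseMessageType("message example { required int32 id; }")
    val column = schema.getColumns.get(0)

    val store = new ParquetInMemoryPageStore(rowCount = 1)

    // Write a single page of raw bytes for the column.
    val pageWriter = store.getPageWriter(column)
    pageWriter.writePage(
      BytesInput.from(Array[Byte](0, 0, 0, 1)), // raw page bytes (illustrative)
      1,                                        // valueCount
      1,                                        // rowCount
      Statistics.createStats(column.getPrimitiveType),
      Encoding.PLAIN, Encoding.PLAIN, Encoding.PLAIN)

    // Read it back through the PageReadStore side of the same store.
    val pageReader = store.getPageReader(column)
    println(pageReader.getTotalValueCount) // prints 1
  }
}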
