Cleanup
clairemcginty committed Sep 19, 2024
1 parent 23bcbd9 commit 26c2bd3
Showing 3 changed files with 232 additions and 135 deletions.
jmh/src/test/scala/magnolify/jmh/MagnolifyBench.scala (246 changes: 144 additions & 102 deletions)
@@ -16,16 +16,13 @@

package magnolify.jmh

import magnolify.parquet.ParquetType.WriteSupport
import magnolify.parquet.{MagnolifyParquetProperties, ParquetType}

import java.util.concurrent.TimeUnit
import magnolify.scalacheck.auto._
import magnolify.test.Simple._
import org.apache.hadoop.conf.Configuration
import org.scalacheck._
import org.openjdk.jmh.annotations._

import scala.annotation.nowarn
import scala.jdk.CollectionConverters._

object MagnolifyBench {
@@ -92,103 +89,6 @@ class AvroBench {
@Benchmark def avroSchema: Schema = AvroType[Nested].schema
}

@State(Scope.Benchmark)
class ParquetReadState(pt: ParquetType[Nested]) {
import org.apache.parquet.io._
import org.apache.parquet.column.impl.ColumnWriteStoreV1
import org.apache.parquet.column.ParquetProperties
import org.apache.parquet.hadoop.api.InitContext

var reader: RecordReader[Nested] = null

@Setup(Level.Invocation)
def setup(): Unit = {
// Write page
val columnIO = new ColumnIOFactory(true).getColumnIO(pt.schema)
val memPageStore = new ParquetInMemoryPageStore(1)
val columns = new ColumnWriteStoreV1(
pt.schema,
memPageStore,
ParquetProperties.builder.withPageSize(800).withDictionaryEncoding(false).build
)
val writeSupport = pt.writeSupport
val recordWriter = columnIO.getRecordWriter(columns)
writeSupport.prepareForWrite(recordWriter)
writeSupport.write(MagnolifyBench.nested)
recordWriter.flush()
columns.flush()

// Read and convert page
val conf = new Configuration()
val readSupport = pt.readSupport
reader = columnIO.getRecordReader(
memPageStore,
readSupport.prepareForRead(
conf,
new java.util.HashMap,
pt.schema,
readSupport.init(new InitContext(conf, new java.util.HashMap, pt.schema)))
)
}
}

@State(Scope.Benchmark)
class ParquetWriteState(pt: ParquetType[Nested]) {
import org.apache.parquet.io._
import org.apache.parquet.column.impl.ColumnWriteStoreV1
import org.apache.parquet.column.ParquetProperties

var writer: WriteSupport[Nested] = null

@Setup(Level.Invocation)
def setup(): Unit = {
val columnIO = new ColumnIOFactory(true).getColumnIO(pt.schema)
val memPageStore = new ParquetInMemoryPageStore(1)
val columns = new ColumnWriteStoreV1(
pt.schema,
memPageStore,
ParquetProperties.builder.withPageSize(800).withDictionaryEncoding(false).build
)
val writeSupport = pt.writeSupport
val recordWriter = columnIO.getRecordWriter(columns)
writeSupport.prepareForWrite(recordWriter)
this.writer = writeSupport
}
}

object ParquetStates {
def confWithGroupedArraysProp(propValue: Boolean): Configuration = {
val conf = new Configuration()
conf.setBoolean(MagnolifyParquetProperties.WriteGroupedArrays, propValue)
conf
}
class DefaultParquetReadState extends ParquetReadState(ParquetType[Nested](confWithGroupedArraysProp(false)))
class DefaultParquetWriteState extends ParquetWriteState(ParquetType[Nested](confWithGroupedArraysProp(false)))

class ParquetAvroCompatReadState extends ParquetReadState(ParquetType[Nested](confWithGroupedArraysProp(true)))
class ParquetAvroCompatWriteState extends ParquetWriteState(ParquetType[Nested](confWithGroupedArraysProp(true)))
}

@BenchmarkMode(Array(Mode.AverageTime))
@OutputTimeUnit(TimeUnit.NANOSECONDS)
@State(Scope.Thread)
class ParquetBench {
import MagnolifyBench._

@Benchmark def parquetWrite(state: ParquetStates.DefaultParquetWriteState): Unit = state.writer.write(nested)
@Benchmark def parquetRead(state: ParquetStates.DefaultParquetReadState): Nested = state.reader.read()
}

@BenchmarkMode(Array(Mode.AverageTime))
@OutputTimeUnit(TimeUnit.NANOSECONDS)
@State(Scope.Thread)
class ParquetAvroCompatBench {
import MagnolifyBench._

@Benchmark def parquetWrite(state: ParquetStates.ParquetAvroCompatWriteState): Unit = state.writer.write(nested)
@Benchmark def parquetRead(state: ParquetStates.ParquetAvroCompatReadState): Nested = state.reader.read()
}

@BenchmarkMode(Array(Mode.AverageTime))
@OutputTimeUnit(TimeUnit.NANOSECONDS)
@State(Scope.Thread)
@@ -259,7 +159,149 @@ class ExampleBench {
private val exampleNested = implicitly[Arbitrary[ExampleNested]].arbitrary(prms, seed).get
private val example = exampleType.to(exampleNested).build()
@Benchmark def exampleTo: Example.Builder = exampleType.to(exampleNested)
@Benchmark def exampleFrom: ExampleNested = exampleType.from(example.getFeatures.getFeatureMap.asScala.toMap)
@Benchmark def exampleFrom: ExampleNested =
exampleType.from(example.getFeatures.getFeatureMap.asScala.toMap)
}

@BenchmarkMode(Array(Mode.AverageTime))
@OutputTimeUnit(TimeUnit.NANOSECONDS)
@State(Scope.Thread)
class ParquetBench {
import MagnolifyBench._
import ParquetStates._
import magnolify.avro._
import org.apache.avro.generic.GenericRecord

private val genericRecord = AvroType[Nested].to(nested)
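// The *Avro benchmarks below go through parquet-avro's AvroWriteSupport/AvroReadSupport over
// the same Parquet schema, as a baseline for the magnolify-parquet case class benchmarks.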

@Benchmark def parquetWriteMagnolify(state: ParquetCaseClassWriteState): Unit =
state.writer.write(nested)
@Benchmark def parquetWriteAvro(state: ParquetAvroWriteState): Unit =
state.writer.write(genericRecord)

@Benchmark def parquetReadMagnolify(state: ParquetCaseClassReadState): Nested =
state.reader.read()
@Benchmark def parquetReadAvro(state: ParquetAvroReadState): GenericRecord = state.reader.read()
}
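
For reference, a minimal sketch (not part of this commit) of how ParquetBench could be launched programmatically through JMH's Runner API; in practice the jmh module is usually run via an sbt JMH plugin, and the RunParquetBench object name here is hypothetical.

import org.openjdk.jmh.runner.Runner
import org.openjdk.jmh.runner.options.OptionsBuilder

// Hypothetical launcher, not part of this commit: runs only the ParquetBench benchmarks.
object RunParquetBench {
  def main(args: Array[String]): Unit = {
    val opts = new OptionsBuilder()
      .include(classOf[magnolify.jmh.ParquetBench].getSimpleName) // regex match on benchmark class name
      .forks(1)
      .build()
    new Runner(opts).run()
  }
}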

@nowarn("cat=deprecation")
object ParquetStates {
import MagnolifyBench._
import magnolify.avro._
import magnolify.parquet._
import magnolify.parquet.ParquetArray.AvroCompat._
import org.apache.avro.generic.{GenericData, GenericRecord}
import org.apache.hadoop.conf.Configuration
import org.apache.parquet.conf.PlainParquetConfiguration
import org.apache.parquet.avro.{AvroReadSupport, AvroWriteSupport}
import org.apache.parquet.column.ParquetProperties
import org.apache.parquet.hadoop.api.{ReadSupport, WriteSupport}
import org.apache.parquet.schema.MessageType
import org.apache.parquet.io._
import org.apache.parquet.io.api.{Binary, RecordConsumer}
import org.apache.parquet.column.impl.ColumnWriteStoreV1

@State(Scope.Benchmark)
class ReadState[T](
schema: MessageType,
writeSupport: WriteSupport[T],
readSupport: ReadSupport[T],
record: T
) {
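// Shared read-path state: setup() writes one record into an in-memory page store via the
// given WriteSupport, then exposes a RecordReader over that page for the read benchmarks.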
import org.apache.parquet.hadoop.api.InitContext

var reader: RecordReader[T] = null

@Setup(Level.Trial)
def setup(): Unit = {
// Write page
val columnIO = new ColumnIOFactory(true).getColumnIO(schema)
val pageStore = new ParquetInMemoryPageStore(1)
val columnWriteStore = new ColumnWriteStoreV1(
schema,
pageStore,
ParquetProperties.builder.withPageSize(800).withDictionaryEncoding(false).build
)
val recordConsumer = columnIO.getRecordWriter(columnWriteStore)
writeSupport.init(new PlainParquetConfiguration())
writeSupport.prepareForWrite(recordConsumer)
writeSupport.write(record)
recordConsumer.flush()
columnWriteStore.flush()

// Set up reader
val conf = new Configuration()
reader = columnIO.getRecordReader(
pageStore,
readSupport.prepareForRead(
conf,
new java.util.HashMap,
schema,
readSupport.init(new InitContext(conf, new java.util.HashMap, schema))
)
): @nowarn("cat=deprecation")
}
}

@State(Scope.Benchmark)
class WriteState[T](writeSupport: WriteSupport[T]) {
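// Shared write-path state: setup() binds the WriteSupport to a no-op RecordConsumer, so the
// write benchmarks measure only the record -> Parquet group conversion.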
val writer = writeSupport

@Setup(Level.Trial)
def setup(): Unit = {
writeSupport.init(new PlainParquetConfiguration())
// Use a no-op RecordConsumer; we want to measure only the record -> group conversion, and not pollute the
// benchmark with background tasks like flushing pages/blocks or validating records
writeSupport.prepareForWrite(new RecordConsumer {
override def startMessage(): Unit = {}
override def endMessage(): Unit = {}
override def startField(field: String, index: Int): Unit = {}
override def endField(field: String, index: Int): Unit = {}
override def startGroup(): Unit = {}
override def endGroup(): Unit = {}
override def addInteger(value: Int): Unit = {}
override def addLong(value: Long): Unit = {}
override def addBoolean(value: Boolean): Unit = {}
override def addBinary(value: Binary): Unit = {}
override def addFloat(value: Float): Unit = {}
override def addDouble(value: Double): Unit = {}
})
}
}

// R/W support for Group <-> Case Class Conversion (magnolify-parquet)
private val parquetType = ParquetType[Nested]
class ParquetCaseClassReadState
extends ParquetStates.ReadState[Nested](
parquetType.schema,
parquetType.writeSupport,
parquetType.readSupport,
nested
)
class ParquetCaseClassWriteState
extends ParquetStates.WriteState[Nested](parquetType.writeSupport)

// R/W support for Group <-> Avro Conversion (parquet-avro)
private val avroType = AvroType[Nested]
class ParquetAvroReadState
extends ParquetStates.ReadState[GenericRecord](
parquetType.schema,
new AvroWriteSupport[GenericRecord](
parquetType.schema,
parquetType.avroSchema,
GenericData.get()
),
new AvroReadSupport[GenericRecord](GenericData.get()),
avroType.to(nested)
)
class ParquetAvroWriteState
extends ParquetStates.WriteState[GenericRecord](
new AvroWriteSupport[GenericRecord](
parquetType.schema,
parquetType.avroSchema,
GenericData.get()
)
)
}

// Collections are not supported