Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(memory) BinaryVector fixes for M1 Mac #1429

Merged
merged 2 commits into from
Aug 3, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 17 additions & 1 deletion memory/src/main/scala/filodb.memory/format/BinaryVector.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ object BinaryVector {
*/
type BinaryVectorPtr = BinaryRegion.NativePointer

val isArm64 = System.getProperty("os.arch").equalsIgnoreCase("aarch64")

/**
* Returns the vector type and subtype from the WireFormat bytes of a BinaryVector
*/
Expand Down Expand Up @@ -343,7 +345,21 @@ trait BinaryAppendableVector[@specialized(Int, Long, Double, Boolean) A] {
def checkSize(need: Int, have: Int): AddResponse =
if (!(need <= have)) VectorTooSmall(need, have) else Ack

@inline final def incNumBytes(inc: Int): Unit = UnsafeUtils.unsafe.getAndAddInt(UnsafeUtils.ZeroPointer, addr, inc)
@inline final def incNumBytes(inc: Int): Unit =
if (BinaryVector.isArm64) {
// This is a temporary fix for M1 Mac till the operation
// UnsafeUtils.unsafe.getAndAddInt(UnsafeUtils.ZeroPointer, addr, inc) works. Issue with compareAndSet
// failing with a SIGBUS when a getAndAddInt is invoked on an address offset that is one of the three
// preceding every multiple of 16. For e.g when addrs is 13, 14, 15, 29, 30, 31, 45, 46, 47 byte offset from the
// base offset, getAndAddInt fails, however, when we perform get, increment and put in a non atomic operation
// it succeeds. This temporary work around for non atomic CAS will only happen on M1 Mac laptops till a fix is
// in place and we no longer need to handle these cases separately
// Since this code is invoked in ingestion path, ingestion performance needs benchmarking even for non M1 arch
val newIncVal = UnsafeUtils.unsafe.getInt(UnsafeUtils.ZeroPointer, addr) + inc
UnsafeUtils.unsafe.putInt(UnsafeUtils.ZeroPointer, addr, newIncVal)
} else {
UnsafeUtils.unsafe.getAndAddInt(UnsafeUtils.ZeroPointer, addr, inc)
}

/**
* Allocates a new instance of itself growing by factor growFactor.
Expand Down
14 changes: 7 additions & 7 deletions memory/src/test/scala/filodb.memory/BlockMemFactorySpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,14 @@ class BlockMemFactorySpec extends AnyFlatSpec with Matchers {
val stats = new MemoryStats(Map("test1" -> "test1"))
val blockManager = new PageAlignedBlockManager(2048 * 1024, stats, testReclaimer, 1, evictionLock)
val bmf = new BlockMemFactory(blockManager, 50, Map("test" -> "val"), false)

val allocationSize = 1000 * PageManager.getInstance().pageSize().toInt / 4096
// simulate encoding of multiple ts partitions in flush group

for { flushGroup <- 0 to 1 } {
for {tsParts <- 0 to 5} {
bmf.startMetaSpan()
for {chunks <- 0 to 3} {
bmf.allocateOffheap(1000)
bmf.allocateOffheap(allocationSize)
}
bmf.endMetaSpan(d => {}, 45)
}
Expand All @@ -50,15 +50,15 @@ class BlockMemFactorySpec extends AnyFlatSpec with Matchers {
it should "Mark all blocks of BlockMemFactory as reclaimable when used in ODP by DemandPagedChunkStore" in {
val stats = new MemoryStats(Map("test1" -> "test1"))
val blockManager = new PageAlignedBlockManager(2048 * 1024, stats, testReclaimer, 1, evictionLock)

val allocationSize = 1000 * PageManager.getInstance().pageSize().toInt / 4096
// create block mem factories for different time buckets
val bmf = new BlockMemFactory(blockManager, 50, Map("test" -> "val"), true)

// simulate paging in chunks from cassandra
for {tsParts <- 0 to 10} {
bmf.startMetaSpan()
for {chunks <- 0 to 3} {
bmf.allocateOffheap(1000)
bmf.allocateOffheap(allocationSize)
}
bmf.endMetaSpan(d => {}, 45)
}
Expand Down Expand Up @@ -96,13 +96,13 @@ class BlockMemFactorySpec extends AnyFlatSpec with Matchers {

// create block mem factories for different time buckets
val odpFactory = new BlockMemFactory(blockManager, 50, Map("test" -> "val"), true)

val allocationSize = 1000 * PageManager.getInstance().pageSize().toInt / 4096
// simulate encoding of multiple ts partitions in flush group
for { flushGroup <- 0 to 1 } {
for {tsParts <- 0 to 5} {
ingestionFactory.startMetaSpan()
for {chunks <- 0 to 3} {
ingestionFactory.allocateOffheap(1000)
ingestionFactory.allocateOffheap(allocationSize)
}
ingestionFactory.endMetaSpan(d => {}, 45)
}
Expand All @@ -119,7 +119,7 @@ class BlockMemFactorySpec extends AnyFlatSpec with Matchers {
for {tsParts <- 0 to 10} {
odpFactory.startMetaSpan()
for {chunks <- 0 to 3} {
odpFactory.allocateOffheap(1000)
odpFactory.allocateOffheap(allocationSize)
}
odpFactory.endMetaSpan(d => {}, 45)
}
Expand Down
32 changes: 18 additions & 14 deletions memory/src/test/scala/filodb.memory/BlockSpec.scala
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
package filodb.memory

import scala.language.reflectiveCalls
import com.kenai.jffi.PageManager

import scala.language.reflectiveCalls
import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll}
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers
Expand All @@ -11,7 +12,9 @@ class BlockSpec extends AnyFlatSpec with Matchers with BeforeAndAfter with Befor

val evictionLock = new EvictionLock
val stats = new MemoryStats(Map("test1" -> "test1"))
val blockManager = new PageAlignedBlockManager(2048 * 1024, stats, testReclaimer, 1, evictionLock)
val numPagesPerBlock = 1
val blockManager = new PageAlignedBlockManager(2048 * 1024, stats, testReclaimer, numPagesPerBlock, evictionLock)
val expectedEmptyBlockSize = numPagesPerBlock * PageManager.getInstance().pageSize()

before {
testReclaimer.reclaimedBytes = 0
Expand All @@ -24,27 +27,28 @@ class BlockSpec extends AnyFlatSpec with Matchers with BeforeAndAfter with Befor

it should "allocate metadata and report remaining bytes accurately" in {
val block = blockManager.requestBlock(false).get
block.capacity shouldEqual 4096
block.remaining shouldEqual 4096
block.capacity shouldEqual expectedEmptyBlockSize
block.remaining shouldEqual expectedEmptyBlockSize

// Now change the position
block.position(200)
block.remaining shouldEqual 3896
block.remaining shouldEqual (expectedEmptyBlockSize - 200)

// Now allocate some metadata and watch remaining bytes shrink
block.allocMetadata(40)
block.remaining shouldEqual 3854
block.remaining shouldEqual (expectedEmptyBlockSize - 200 - 40 - 2)
}

it should "return null when allocate metadata if not enough space" in {
val block = blockManager.requestBlock(false).get
block.capacity shouldEqual 4096
block.remaining shouldEqual 4096
block.capacity shouldEqual expectedEmptyBlockSize
block.remaining shouldEqual expectedEmptyBlockSize

block.position(3800)
block.remaining shouldEqual (4096-3800)
val newPosition = PageManager.getInstance().pageSize().toInt - 299
block.position(newPosition)
block.remaining shouldEqual (expectedEmptyBlockSize-newPosition)
intercept[OutOfOffheapMemoryException] { block.allocMetadata(300) }
block.remaining shouldEqual (4096-3800) // still same space remaining
block.remaining shouldEqual (expectedEmptyBlockSize-newPosition) // still same space remaining
}

it should "not reclaim when block has not been marked reclaimable" in {
Expand All @@ -55,14 +59,14 @@ class BlockSpec extends AnyFlatSpec with Matchers with BeforeAndAfter with Befor

it should "call reclaimListener with address of all allocated metadatas" in {
val block = blockManager.requestBlock(false).get
block.capacity shouldEqual 4096
block.remaining shouldEqual 4096
block.capacity shouldEqual expectedEmptyBlockSize
block.remaining shouldEqual expectedEmptyBlockSize

block.position(200)
val addr1 = block.allocMetadata(30)
val addr2 = block.allocMetadata(40)

block.remaining shouldEqual 3822
block.remaining shouldEqual (expectedEmptyBlockSize - 200 - 30 - 2 - 40 - 2)

//XXX for debugging
println(block.detailedDebugString)
Expand Down