Skip to content

Commit

Permalink
add cache for row level annotations
Browse files Browse the repository at this point in the history
  • Loading branch information
zingmane committed May 15, 2024
1 parent 652355a commit 041655e
Show file tree
Hide file tree
Showing 3 changed files with 230 additions and 40 deletions.
47 changes: 47 additions & 0 deletions src/main/scala/com/campudus/tableaux/cache/CacheClient.scala
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,53 @@ class CacheClient(vertxAccess: VertxAccess) extends VertxAccess {
eventBus.sendFuture(CacheVerticle.ADDRESS_INVALIDATE_ROW_PERMISSIONS, obj)
}

def setRowLevelAnnotations(
tableId: TableId,
rowId: RowId,
rowLevelAnnotations: RowLevelAnnotations
): Future[_] = {
val rowValue = Json.obj("value" -> rowLevelAnnotations.getJson)
val obj = rowKey(tableId, rowId).copy().mergeIn(rowValue)
println(s"set cache value for row level annotations: $obj")
eventBus.sendFuture(CacheVerticle.ADDRESS_SET_ROW_LEVEL_ANNOTATIONS, obj, options)
}

def retrieveRowLevelAnnotations(tableId: TableId, rowId: RowId): Future[Option[RowLevelAnnotations]] = {
val obj = rowKey(tableId, rowId)

eventBus
.sendFuture[JsonObject](CacheVerticle.ADDRESS_RETRIEVE_ROW_LEVEL_ANNOTATIONS, obj, options)
.map(value => {
value match {
case v if v.body().containsKey("value") => {
val rawRowLevelAnnotations = v.body().getValue("value").asInstanceOf[JsonObject]
val finalFlag = rawRowLevelAnnotations.getBoolean("final", false)
val archivedFlag = rawRowLevelAnnotations.getBoolean("archived", false)
Some(RowLevelAnnotations(finalFlag, archivedFlag))
}
case _ => {
None
}
}
})
.recoverWith({
case ex: ReplyException if ex.failureCode() == CacheVerticle.NOT_FOUND_FAILURE =>
Future.successful(None)
case ex =>
Future.failed(ex)
})
}

def invalidateRowLevelAnnotations(tableId: TableId, rowId: RowId): Future[_] = {
val obj = Json.obj("tableId" -> tableId, "rowId" -> rowId)
eventBus.sendFuture(CacheVerticle.ADDRESS_INVALIDATE_ROW_LEVEL_ANNOTATIONS, obj)
}

def invalidateTableRowLevelAnnotations(tableId: TableId): Future[_] = {
val obj = Json.obj("tableId" -> tableId)
eventBus.sendFuture(CacheVerticle.ADDRESS_INVALIDATE_TABLE_ROW_LEVEL_ANNOTATIONS, obj)
}

def invalidateCellValue(tableId: TableId, columnId: ColumnId, rowId: RowId): Future[_] = {
val obj = cellKey(tableId, columnId, rowId)
eventBus.sendFuture(CacheVerticle.ADDRESS_INVALIDATE_CELL, obj, options)
Expand Down
199 changes: 161 additions & 38 deletions src/main/scala/com/campudus/tableaux/cache/CacheVerticle.scala
Original file line number Diff line number Diff line change
Expand Up @@ -35,18 +35,26 @@ object CacheVerticle {
val NOT_FOUND_FAILURE: Int = 404
val INVALID_MESSAGE: Int = 400

// cell based addresses
val ADDRESS_SET_CELL: String = "cache.set.cell"
val ADDRESS_SET_ROW_PERMISSIONS: String = "cache.set.row_permissions"
val ADDRESS_RETRIEVE_CELL: String = "cache.retrieve.cell"
val ADDRESS_RETRIEVE_ROW_PERMISSIONS: String = "cache.retrieve.row_permissions"

val ADDRESS_INVALIDATE_CELL: String = "cache.invalidate.cell"
val ADDRESS_INVALIDATE_COLUMN: String = "cache.invalidate.column"
val ADDRESS_INVALIDATE_ROW: String = "cache.invalidate.row"
val ADDRESS_INVALIDATE_TABLE: String = "cache.invalidate.table"
val ADDRESS_INVALIDATE_ALL: String = "cache.invalidate.all"

// row permissions based addresses
val ADDRESS_SET_ROW_PERMISSIONS: String = "cache.set.row_permissions"
val ADDRESS_RETRIEVE_ROW_PERMISSIONS: String = "cache.retrieve.row_permissions"
val ADDRESS_INVALIDATE_ROW_PERMISSIONS: String = "cache.invalidate.row_permissions"

// row level annotations based addresses
val ADDRESS_SET_ROW_LEVEL_ANNOTATIONS: String = "cache.set.row_level_annotations"
val ADDRESS_RETRIEVE_ROW_LEVEL_ANNOTATIONS: String = "cache.retrieve.row_level_annotations"
val ADDRESS_INVALIDATE_ROW_LEVEL_ANNOTATIONS: String = "cache.invalidate.row_level_annotations"
val ADDRESS_INVALIDATE_TABLE_ROW_LEVEL_ANNOTATIONS: String = "cache.invalidate.table.row_level_annotations"

val TIMEOUT_AFTER_MILLISECONDS: Int = 400

def apply(tableauxConfig: TableauxConfig): CacheVerticle = {
Expand All @@ -62,10 +70,11 @@ class CacheVerticle(tableauxConfig: TableauxConfig) extends ScalaVerticle with L

type CellCaches = mutable.Map[(TableId, ColumnId), Cache[AnyRef]]
type RowPermissionsCaches = mutable.Map[(TableId), Cache[AnyRef]]
type Caches = Either[CellCaches, RowPermissionsCaches]
type RowLevelAnnotationsCache = mutable.Map[(TableId), Cache[AnyRef]]

private val cellCaches: CellCaches = mutable.Map.empty
private val rowPermissionsCaches: RowPermissionsCaches = mutable.Map.empty
private val rowLevelAnnotationsCache: RowLevelAnnotationsCache = mutable.Map.empty

override def startFuture(): Future[_] = {
logger.info(
Expand All @@ -84,16 +93,31 @@ class CacheVerticle(tableauxConfig: TableauxConfig) extends ScalaVerticle with L
private def registerOnEventBus(): Future[_] = {
Future.sequence(
Seq(
// cell
registerHandler(eventBus, ADDRESS_SET_CELL, messageHandlerSetCell),
registerHandler(eventBus, ADDRESS_RETRIEVE_CELL, messageHandlerRetrieveCell),
registerHandler(eventBus, ADDRESS_SET_ROW_PERMISSIONS, messageHandlerSetRowPermissions),
registerHandler(eventBus, ADDRESS_RETRIEVE_ROW_PERMISSIONS, messageHandlerRetrieveRowPermissions),
registerHandler(eventBus, ADDRESS_INVALIDATE_CELL, messageHandlerInvalidateCell),
registerHandler(eventBus, ADDRESS_INVALIDATE_COLUMN, messageHandlerInvalidateColumn),
registerHandler(eventBus, ADDRESS_INVALIDATE_ROW, messageHandlerInvalidateRow),
registerHandler(eventBus, ADDRESS_INVALIDATE_TABLE, messageHandlerInvalidateTable),
registerHandler(eventBus, ADDRESS_INVALIDATE_ALL, messageHandlerInvalidateAll),
registerHandler(eventBus, ADDRESS_INVALIDATE_ROW_PERMISSIONS, messageHandlerInvalidateRowPermissions)
// row permissions
registerHandler(eventBus, ADDRESS_SET_ROW_PERMISSIONS, messageHandlerSetRowPermissions),
registerHandler(eventBus, ADDRESS_RETRIEVE_ROW_PERMISSIONS, messageHandlerRetrieveRowPermissions),
registerHandler(eventBus, ADDRESS_INVALIDATE_ROW_PERMISSIONS, messageHandlerInvalidateRowPermissions),
// row level annotations
registerHandler(eventBus, ADDRESS_SET_ROW_LEVEL_ANNOTATIONS, messageHandlerSetRowLevelAnnotations),
registerHandler(eventBus, ADDRESS_RETRIEVE_ROW_LEVEL_ANNOTATIONS, messageHandlerRetrieveRowLevelAnnotations),
registerHandler(
eventBus,
ADDRESS_INVALIDATE_ROW_LEVEL_ANNOTATIONS,
messageHandlerInvalidateRowLevelAnnotations
),
registerHandler(
eventBus,
ADDRESS_INVALIDATE_TABLE_ROW_LEVEL_ANNOTATIONS,
messageHandlerInvalidateTableRowLevelAnnotations
)
)
)
}
Expand Down Expand Up @@ -135,6 +159,16 @@ class CacheVerticle(tableauxConfig: TableauxConfig) extends ScalaVerticle with L
}
}

private def getRowLevelAnnotationsCache(tableId: TableId): Cache[AnyRef] = {
rowLevelAnnotationsCache.get(tableId) match {
case Some(cache) => cache
case None =>
val cache: Cache[AnyRef] = GuavaCache(createCache())
rowLevelAnnotationsCache.put((tableId), cache)
cache
}
}

private def removeCache(tableId: TableId, columnId: ColumnId): Unit = cellCaches.remove((tableId, columnId))

private def extractTableColumnRow(obj: JsonObject): Option[(TableId, ColumnId, RowId)] = {
Expand Down Expand Up @@ -203,7 +237,6 @@ class CacheVerticle(tableauxConfig: TableauxConfig) extends ScalaVerticle with L
extractTableRow(obj) match {
case Some((tableId, rowId)) => {
implicit val scalaCache: Cache[AnyRef] = getRowPermissionsCache(tableId)
logger.info(s"messageHandlerSetRowPermissions $obj $tableId $rowId $value")
put(rowId)(value).map(_ => replyJson(message, tableId, rowId))
}
case None => {
Expand Down Expand Up @@ -247,16 +280,117 @@ class CacheVerticle(tableauxConfig: TableauxConfig) extends ScalaVerticle with L
case Some((tableId, columnId, rowId)) =>
// invalidate cell
implicit val scalaCache: Cache[AnyRef] = getCellCache(tableId, columnId)

remove(rowId)
.map(_ => replyJson(message, tableId, columnId, rowId))
remove(rowId).map(_ => replyJson(message, tableId, columnId, rowId))

case None =>
logger.error("Message invalid: Fields (tableId, columnId, rowId) should be a Long")
message.fail(INVALID_MESSAGE, "Message invalid: Fields (tableId, columnId, rowId) should be a Long")
}
}

private def messageHandlerSetRowLevelAnnotations(message: Message[JsonObject]): Unit = {
val obj = message.body()
val value = obj.getValue("value")

extractTableRow(obj) match {
case Some((tableId, rowId)) => {
implicit val scalaCache: Cache[AnyRef] = getRowLevelAnnotationsCache(tableId)
put(rowId)(value).map(_ => replyJson(message, tableId, rowId))
}
case None => {
logger.error("Message invalid: Fields (tableId, rowId) should be a Long")
message.fail(INVALID_MESSAGE, "Message invalid: Fields (tableId, rowId) should be a Long")
}
}
}

private def messageHandlerRetrieveRowLevelAnnotations(message: Message[JsonObject]): Unit = {
val obj = message.body()

extractTableRow(obj) match {
case Some((tableId, rowId)) =>
implicit val scalaCache: Cache[AnyRef] = getRowLevelAnnotationsCache(tableId)

get(rowId).map({
case Some(value) => {
val reply = Json.obj(
"tableId" -> tableId,
"rowId" -> rowId,
"value" -> value
)

println(s"hey, ausm cache raus: $value")
message.reply(reply)
}
case None => {
logger.debug(s"messageHandlerRetrieveRowLevelAnnotations $tableId, $rowId not found")
message.fail(NOT_FOUND_FAILURE, "Not found")
}
})

case None =>
logger.error("Message invalid: Fields (tableId, rowId) should be a Long")
message.fail(INVALID_MESSAGE, "Message invalid: Fields (tableId, rowId) should be a Long")
}
}

private def messageHandlerInvalidateRowLevelAnnotations(message: Message[JsonObject]): Unit = {
extractTableRow(message.body()) match {
case Some((tableId, rowId)) =>
// invalidate cell
implicit val scalaCache: Cache[AnyRef] = getRowLevelAnnotationsCache(tableId)

remove(rowId).map(_ => replyJson(message, tableId, rowId))

case None =>
logger.error("Message invalid: Fields (tableId, rowId) should be a Long")
message.fail(INVALID_MESSAGE, "Message invalid: Fields (tableId, rowId) should be a Long")
}
}

private def messageHandlerInvalidateTableRowLevelAnnotations(message: Message[JsonObject]): Unit = {
val obj = message.body()
println(s"caches clear arrived for all tables")

(for {
tableId <- Option(obj.getLong("tableId")).map(_.toLong)
} yield tableId) match {
case Some(tableId) =>
Future.sequence(filterRowLevelAnnotationsCache(tableId)
.map(implicit cache => removeAll()))
.map(_ => {
val reply = Json.obj("tableId" -> tableId)
message.reply(reply)
})

case None =>
logger.error("Message invalid: Fields (tableId) should be a Long")
message.fail(INVALID_MESSAGE, "Message invalid: Fields (tableId) should be a Long")
}

// extractTableRow(message.body()) match {
// case Some((tableId, rowId)) =>
// // invalidate cell
// implicit val scalaCache: Cache[AnyRef] = getRowLevelAnnotationsCache(tableId)

// remove(rowId).map(_ => replyJson(message, tableId, rowId))

// case None =>
// logger.error("Message invalid: Fields (tableId, rowId) should be a Long")
// message.fail(INVALID_MESSAGE, "Message invalid: Fields (tableId, rowId) should be a Long")
// }

// Future.sequence(rowLevelAnnotationsCache.map({
// case ((tableId), cache) =>
// implicit val implicitCache: Cache[AnyRef] = implicitly(cache)

// removeAll().map(_ => rowLevelAnnotationsCache.remove(tableId))
// })).onComplete(_ => {
// rowLevelAnnotationsCache.clear()
// message.reply(Json.emptyObj())
// })
}

private def replyJson(message: Message[JsonObject], tableId: TableId, columnId: ColumnId, rowId: RowId): Unit = {
val reply = Json.obj("tableId" -> tableId, "columnId" -> columnId, "rowId" -> rowId)
message.reply(reply)
Expand Down Expand Up @@ -299,11 +433,10 @@ class CacheVerticle(tableauxConfig: TableauxConfig) extends ScalaVerticle with L
}).values
}

private def filterRowPermissionCaches(tableId: TableId) = {
private def filterRowLevelAnnotationsCache(tableId: TableId) = {
// invalidate table
rowPermissionsCaches.filterKeys({
case (cachedTableId) =>
cachedTableId == tableId
rowLevelAnnotationsCache.filterKeys({
case (cachedTableId) => cachedTableId == tableId
}).values
}

Expand Down Expand Up @@ -356,35 +489,25 @@ class CacheVerticle(tableauxConfig: TableauxConfig) extends ScalaVerticle with L
}

private def messageHandlerInvalidateAll(message: Message[JsonObject]): Unit = {
Future
.sequence(cellCaches.map({
case ((tableId, columnId), cache) =>
implicit val implicitCache: Cache[AnyRef] = implicitly(cache)

removeAll().map(_ => removeCache(tableId, columnId))
}))
.onComplete(_ => {
cellCaches.clear()
message.reply(Json.emptyObj())
})
Future.sequence(cellCaches.map({
case ((tableId, columnId), cache) =>
implicit val implicitCache: Cache[AnyRef] = implicitly(cache)

removeAll().map(_ => removeCache(tableId, columnId))
})).onComplete(_ => {
cellCaches.clear()
message.reply(Json.emptyObj())
})
}

private def messageHandlerInvalidateRowPermissions(message: Message[JsonObject]): Unit = {
val obj = message.body()

(extractTableRow(obj)) match {
extractTableRow(message.body()) match {
case Some((tableId, rowId)) =>
Future
.sequence(
filterRowPermissionCaches(tableId)
.map(implicit cache => {
remove(rowId)
})
)
.map(_ => {
val reply = Json.obj("tableId" -> tableId, "rowId" -> rowId)
message.reply(reply)
})
// invalidate cell
implicit val scalaCache: Cache[AnyRef] = getRowPermissionsCache(tableId)
remove(rowId).map(_ => replyJson(message, tableId, rowId))

case None =>
logger.error("Message invalid: Fields (tableId, rowId) should be a Long")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -538,6 +538,7 @@ class TableauxModel(
} yield ()
})

_ <- CacheClient(this.connection).invalidateRowLevelAnnotations(table.id, rowId)
} yield ()

};
Expand Down Expand Up @@ -570,6 +571,7 @@ class TableauxModel(
_ <- Future.sequence(
rowSeq.map(row => updateRowAnnotations(table, row.id, finalFlagOpt, archivedFlagOpt))
)
_ <- CacheClient(this.connection).invalidateTableRowLevelAnnotations(table.id)
} yield ()
}

Expand Down Expand Up @@ -970,6 +972,7 @@ class TableauxModel(
}

for {
rowLevelAnnotationsCache <- CacheClient(this.connection).retrieveRowLevelAnnotations(column.table.id, rowId)
valueCache <- CacheClient(this.connection).retrieveCellValue(column.table.id, column.id, rowId)

value <- valueCache match {
Expand Down Expand Up @@ -1018,8 +1021,25 @@ class TableauxModel(
Future.successful(value)
}

// TODO use cache for rowLevelAnnotations
(rowLevelAnnotations, _, _) <- retrieveRowModel.retrieveAnnotations(column.table.id, rowId, Seq(column))
rowLevelAnnotations <- rowLevelAnnotationsCache match {
case Some(annotations) => {
println(s"Cache hit for rowLevelAnnotations for table ${column.table.id} and row $rowId")
Future.successful(annotations)
}
case None => {
for {
(rowLevelAnnotations, _, _) <- retrieveRowModel.retrieveAnnotations(column.table.id, rowId, Seq(column))
} yield {
println(
s"Cache miss for rowLevelAnnotations for table ${column.table.id} and row $rowId, rowLevelAnnotations: $rowLevelAnnotations"
)
// fire-and-forget don't need to wait for this to return
CacheClient(this.connection).setRowLevelAnnotations(column.table.id, rowId, rowLevelAnnotations)
rowLevelAnnotations
}

}
}
} yield {
Cell(column, rowId, resultValue, rowLevelAnnotations)
}
Expand Down

0 comments on commit 041655e

Please sign in to comment.