diff --git a/src/main/scala/com/campudus/tableaux/cache/CacheClient.scala b/src/main/scala/com/campudus/tableaux/cache/CacheClient.scala index 3a9cae08..b5252be9 100644 --- a/src/main/scala/com/campudus/tableaux/cache/CacheClient.scala +++ b/src/main/scala/com/campudus/tableaux/cache/CacheClient.scala @@ -96,6 +96,53 @@ class CacheClient(vertxAccess: VertxAccess) extends VertxAccess { eventBus.sendFuture(CacheVerticle.ADDRESS_INVALIDATE_ROW_PERMISSIONS, obj) } + def setRowLevelAnnotations( + tableId: TableId, + rowId: RowId, + rowLevelAnnotations: RowLevelAnnotations + ): Future[_] = { + val rowValue = Json.obj("value" -> rowLevelAnnotations.getJson) + val obj = rowKey(tableId, rowId).copy().mergeIn(rowValue) + println(s"set cache value for row level annotations: $obj") + eventBus.sendFuture(CacheVerticle.ADDRESS_SET_ROW_LEVEL_ANNOTATIONS, obj, options) + } + + def retrieveRowLevelAnnotations(tableId: TableId, rowId: RowId): Future[Option[RowLevelAnnotations]] = { + val obj = rowKey(tableId, rowId) + + eventBus + .sendFuture[JsonObject](CacheVerticle.ADDRESS_RETRIEVE_ROW_LEVEL_ANNOTATIONS, obj, options) + .map(value => { + value match { + case v if v.body().containsKey("value") => { + val rawRowLevelAnnotations = v.body().getValue("value").asInstanceOf[JsonObject] + val finalFlag = rawRowLevelAnnotations.getBoolean("final", false) + val archivedFlag = rawRowLevelAnnotations.getBoolean("archived", false) + Some(RowLevelAnnotations(finalFlag, archivedFlag)) + } + case _ => { + None + } + } + }) + .recoverWith({ + case ex: ReplyException if ex.failureCode() == CacheVerticle.NOT_FOUND_FAILURE => + Future.successful(None) + case ex => + Future.failed(ex) + }) + } + + def invalidateRowLevelAnnotations(tableId: TableId, rowId: RowId): Future[_] = { + val obj = Json.obj("tableId" -> tableId, "rowId" -> rowId) + eventBus.sendFuture(CacheVerticle.ADDRESS_INVALIDATE_ROW_LEVEL_ANNOTATIONS, obj) + } + + def invalidateTableRowLevelAnnotations(tableId: TableId): Future[_] = { + val obj = Json.obj("tableId" -> tableId) + eventBus.sendFuture(CacheVerticle.ADDRESS_INVALIDATE_TABLE_ROW_LEVEL_ANNOTATIONS, obj) + } + def invalidateCellValue(tableId: TableId, columnId: ColumnId, rowId: RowId): Future[_] = { val obj = cellKey(tableId, columnId, rowId) eventBus.sendFuture(CacheVerticle.ADDRESS_INVALIDATE_CELL, obj, options) diff --git a/src/main/scala/com/campudus/tableaux/cache/CacheVerticle.scala b/src/main/scala/com/campudus/tableaux/cache/CacheVerticle.scala index 0b0c981d..6e4d8e85 100644 --- a/src/main/scala/com/campudus/tableaux/cache/CacheVerticle.scala +++ b/src/main/scala/com/campudus/tableaux/cache/CacheVerticle.scala @@ -35,18 +35,26 @@ object CacheVerticle { val NOT_FOUND_FAILURE: Int = 404 val INVALID_MESSAGE: Int = 400 + // cell based addresses val ADDRESS_SET_CELL: String = "cache.set.cell" - val ADDRESS_SET_ROW_PERMISSIONS: String = "cache.set.row_permissions" val ADDRESS_RETRIEVE_CELL: String = "cache.retrieve.cell" - val ADDRESS_RETRIEVE_ROW_PERMISSIONS: String = "cache.retrieve.row_permissions" - val ADDRESS_INVALIDATE_CELL: String = "cache.invalidate.cell" val ADDRESS_INVALIDATE_COLUMN: String = "cache.invalidate.column" val ADDRESS_INVALIDATE_ROW: String = "cache.invalidate.row" val ADDRESS_INVALIDATE_TABLE: String = "cache.invalidate.table" val ADDRESS_INVALIDATE_ALL: String = "cache.invalidate.all" + + // row permissions based addresses + val ADDRESS_SET_ROW_PERMISSIONS: String = "cache.set.row_permissions" + val ADDRESS_RETRIEVE_ROW_PERMISSIONS: String = "cache.retrieve.row_permissions" val ADDRESS_INVALIDATE_ROW_PERMISSIONS: String = "cache.invalidate.row_permissions" + // row level annotations based addresses + val ADDRESS_SET_ROW_LEVEL_ANNOTATIONS: String = "cache.set.row_level_annotations" + val ADDRESS_RETRIEVE_ROW_LEVEL_ANNOTATIONS: String = "cache.retrieve.row_level_annotations" + val ADDRESS_INVALIDATE_ROW_LEVEL_ANNOTATIONS: String = "cache.invalidate.row_level_annotations" + val ADDRESS_INVALIDATE_TABLE_ROW_LEVEL_ANNOTATIONS: String = "cache.invalidate.table.row_level_annotations" + val TIMEOUT_AFTER_MILLISECONDS: Int = 400 def apply(tableauxConfig: TableauxConfig): CacheVerticle = { @@ -62,10 +70,11 @@ class CacheVerticle(tableauxConfig: TableauxConfig) extends ScalaVerticle with L type CellCaches = mutable.Map[(TableId, ColumnId), Cache[AnyRef]] type RowPermissionsCaches = mutable.Map[(TableId), Cache[AnyRef]] - type Caches = Either[CellCaches, RowPermissionsCaches] + type RowLevelAnnotationsCache = mutable.Map[(TableId), Cache[AnyRef]] private val cellCaches: CellCaches = mutable.Map.empty private val rowPermissionsCaches: RowPermissionsCaches = mutable.Map.empty + private val rowLevelAnnotationsCache: RowLevelAnnotationsCache = mutable.Map.empty override def startFuture(): Future[_] = { logger.info( @@ -84,16 +93,31 @@ class CacheVerticle(tableauxConfig: TableauxConfig) extends ScalaVerticle with L private def registerOnEventBus(): Future[_] = { Future.sequence( Seq( + // cell registerHandler(eventBus, ADDRESS_SET_CELL, messageHandlerSetCell), registerHandler(eventBus, ADDRESS_RETRIEVE_CELL, messageHandlerRetrieveCell), - registerHandler(eventBus, ADDRESS_SET_ROW_PERMISSIONS, messageHandlerSetRowPermissions), - registerHandler(eventBus, ADDRESS_RETRIEVE_ROW_PERMISSIONS, messageHandlerRetrieveRowPermissions), registerHandler(eventBus, ADDRESS_INVALIDATE_CELL, messageHandlerInvalidateCell), registerHandler(eventBus, ADDRESS_INVALIDATE_COLUMN, messageHandlerInvalidateColumn), registerHandler(eventBus, ADDRESS_INVALIDATE_ROW, messageHandlerInvalidateRow), registerHandler(eventBus, ADDRESS_INVALIDATE_TABLE, messageHandlerInvalidateTable), registerHandler(eventBus, ADDRESS_INVALIDATE_ALL, messageHandlerInvalidateAll), - registerHandler(eventBus, ADDRESS_INVALIDATE_ROW_PERMISSIONS, messageHandlerInvalidateRowPermissions) + // row permissions + registerHandler(eventBus, ADDRESS_SET_ROW_PERMISSIONS, messageHandlerSetRowPermissions), + registerHandler(eventBus, ADDRESS_RETRIEVE_ROW_PERMISSIONS, messageHandlerRetrieveRowPermissions), + registerHandler(eventBus, ADDRESS_INVALIDATE_ROW_PERMISSIONS, messageHandlerInvalidateRowPermissions), + // row level annotations + registerHandler(eventBus, ADDRESS_SET_ROW_LEVEL_ANNOTATIONS, messageHandlerSetRowLevelAnnotations), + registerHandler(eventBus, ADDRESS_RETRIEVE_ROW_LEVEL_ANNOTATIONS, messageHandlerRetrieveRowLevelAnnotations), + registerHandler( + eventBus, + ADDRESS_INVALIDATE_ROW_LEVEL_ANNOTATIONS, + messageHandlerInvalidateRowLevelAnnotations + ), + registerHandler( + eventBus, + ADDRESS_INVALIDATE_TABLE_ROW_LEVEL_ANNOTATIONS, + messageHandlerInvalidateTableRowLevelAnnotations + ) ) ) } @@ -135,6 +159,16 @@ class CacheVerticle(tableauxConfig: TableauxConfig) extends ScalaVerticle with L } } + private def getRowLevelAnnotationsCache(tableId: TableId): Cache[AnyRef] = { + rowLevelAnnotationsCache.get(tableId) match { + case Some(cache) => cache + case None => + val cache: Cache[AnyRef] = GuavaCache(createCache()) + rowLevelAnnotationsCache.put((tableId), cache) + cache + } + } + private def removeCache(tableId: TableId, columnId: ColumnId): Unit = cellCaches.remove((tableId, columnId)) private def extractTableColumnRow(obj: JsonObject): Option[(TableId, ColumnId, RowId)] = { @@ -203,7 +237,6 @@ class CacheVerticle(tableauxConfig: TableauxConfig) extends ScalaVerticle with L extractTableRow(obj) match { case Some((tableId, rowId)) => { implicit val scalaCache: Cache[AnyRef] = getRowPermissionsCache(tableId) - logger.info(s"messageHandlerSetRowPermissions $obj $tableId $rowId $value") put(rowId)(value).map(_ => replyJson(message, tableId, rowId)) } case None => { @@ -247,9 +280,7 @@ class CacheVerticle(tableauxConfig: TableauxConfig) extends ScalaVerticle with L case Some((tableId, columnId, rowId)) => // invalidate cell implicit val scalaCache: Cache[AnyRef] = getCellCache(tableId, columnId) - - remove(rowId) - .map(_ => replyJson(message, tableId, columnId, rowId)) + remove(rowId).map(_ => replyJson(message, tableId, columnId, rowId)) case None => logger.error("Message invalid: Fields (tableId, columnId, rowId) should be a Long") @@ -257,6 +288,109 @@ class CacheVerticle(tableauxConfig: TableauxConfig) extends ScalaVerticle with L } } + private def messageHandlerSetRowLevelAnnotations(message: Message[JsonObject]): Unit = { + val obj = message.body() + val value = obj.getValue("value") + + extractTableRow(obj) match { + case Some((tableId, rowId)) => { + implicit val scalaCache: Cache[AnyRef] = getRowLevelAnnotationsCache(tableId) + put(rowId)(value).map(_ => replyJson(message, tableId, rowId)) + } + case None => { + logger.error("Message invalid: Fields (tableId, rowId) should be a Long") + message.fail(INVALID_MESSAGE, "Message invalid: Fields (tableId, rowId) should be a Long") + } + } + } + + private def messageHandlerRetrieveRowLevelAnnotations(message: Message[JsonObject]): Unit = { + val obj = message.body() + + extractTableRow(obj) match { + case Some((tableId, rowId)) => + implicit val scalaCache: Cache[AnyRef] = getRowLevelAnnotationsCache(tableId) + + get(rowId).map({ + case Some(value) => { + val reply = Json.obj( + "tableId" -> tableId, + "rowId" -> rowId, + "value" -> value + ) + + println(s"hey, ausm cache raus: $value") + message.reply(reply) + } + case None => { + logger.debug(s"messageHandlerRetrieveRowLevelAnnotations $tableId, $rowId not found") + message.fail(NOT_FOUND_FAILURE, "Not found") + } + }) + + case None => + logger.error("Message invalid: Fields (tableId, rowId) should be a Long") + message.fail(INVALID_MESSAGE, "Message invalid: Fields (tableId, rowId) should be a Long") + } + } + + private def messageHandlerInvalidateRowLevelAnnotations(message: Message[JsonObject]): Unit = { + extractTableRow(message.body()) match { + case Some((tableId, rowId)) => + // invalidate cell + implicit val scalaCache: Cache[AnyRef] = getRowLevelAnnotationsCache(tableId) + + remove(rowId).map(_ => replyJson(message, tableId, rowId)) + + case None => + logger.error("Message invalid: Fields (tableId, rowId) should be a Long") + message.fail(INVALID_MESSAGE, "Message invalid: Fields (tableId, rowId) should be a Long") + } + } + + private def messageHandlerInvalidateTableRowLevelAnnotations(message: Message[JsonObject]): Unit = { + val obj = message.body() + println(s"caches clear arrived for all tables") + + (for { + tableId <- Option(obj.getLong("tableId")).map(_.toLong) + } yield tableId) match { + case Some(tableId) => + Future.sequence(filterRowLevelAnnotationsCache(tableId) + .map(implicit cache => removeAll())) + .map(_ => { + val reply = Json.obj("tableId" -> tableId) + message.reply(reply) + }) + + case None => + logger.error("Message invalid: Fields (tableId) should be a Long") + message.fail(INVALID_MESSAGE, "Message invalid: Fields (tableId) should be a Long") + } + + // extractTableRow(message.body()) match { + // case Some((tableId, rowId)) => + // // invalidate cell + // implicit val scalaCache: Cache[AnyRef] = getRowLevelAnnotationsCache(tableId) + + // remove(rowId).map(_ => replyJson(message, tableId, rowId)) + + // case None => + // logger.error("Message invalid: Fields (tableId, rowId) should be a Long") + // message.fail(INVALID_MESSAGE, "Message invalid: Fields (tableId, rowId) should be a Long") + // } + + // Future.sequence(rowLevelAnnotationsCache.map({ + // case ((tableId), cache) => + // implicit val implicitCache: Cache[AnyRef] = implicitly(cache) + + // removeAll().map(_ => rowLevelAnnotationsCache.remove(tableId)) + // })).onComplete(_ => { + // rowLevelAnnotationsCache.clear() + // message.reply(Json.emptyObj()) + // }) + } + private def replyJson(message: Message[JsonObject], tableId: TableId, columnId: ColumnId, rowId: RowId): Unit = { val reply = Json.obj("tableId" -> tableId, "columnId" -> columnId, "rowId" -> rowId) message.reply(reply) @@ -299,11 +433,10 @@ class CacheVerticle(tableauxConfig: TableauxConfig) extends ScalaVerticle with L }).values } - private def filterRowPermissionCaches(tableId: TableId) = { + private def filterRowLevelAnnotationsCache(tableId: TableId) = { // invalidate table - rowPermissionsCaches.filterKeys({ - case (cachedTableId) => - cachedTableId == tableId + rowLevelAnnotationsCache.filterKeys({ + case (cachedTableId) => cachedTableId == tableId }).values } @@ -356,35 +489,25 @@ class CacheVerticle(tableauxConfig: TableauxConfig) extends ScalaVerticle with L } private def messageHandlerInvalidateAll(message: Message[JsonObject]): Unit = { - Future - .sequence(cellCaches.map({ - case ((tableId, columnId), cache) => - implicit val implicitCache: Cache[AnyRef] = implicitly(cache) - - removeAll().map(_ => removeCache(tableId, columnId)) - })) - .onComplete(_ => { - cellCaches.clear() - message.reply(Json.emptyObj()) - }) + Future.sequence(cellCaches.map({ + case ((tableId, columnId), cache) => + implicit val implicitCache: Cache[AnyRef] = implicitly(cache) + + removeAll().map(_ => removeCache(tableId, columnId)) + })).onComplete(_ => { + cellCaches.clear() + message.reply(Json.emptyObj()) + }) } private def messageHandlerInvalidateRowPermissions(message: Message[JsonObject]): Unit = { val obj = message.body() - (extractTableRow(obj)) match { + extractTableRow(message.body()) match { case Some((tableId, rowId)) => - Future - .sequence( - filterRowPermissionCaches(tableId) - .map(implicit cache => { - remove(rowId) - }) - ) - .map(_ => { - val reply = Json.obj("tableId" -> tableId, "rowId" -> rowId) - message.reply(reply) - }) + // invalidate cell + implicit val scalaCache: Cache[AnyRef] = getRowPermissionsCache(tableId) + remove(rowId).map(_ => replyJson(message, tableId, rowId)) case None => logger.error("Message invalid: Fields (tableId, rowId) should be a Long") diff --git a/src/main/scala/com/campudus/tableaux/database/model/TableauxModel.scala b/src/main/scala/com/campudus/tableaux/database/model/TableauxModel.scala index ca7cd26c..0d3e73d6 100644 --- a/src/main/scala/com/campudus/tableaux/database/model/TableauxModel.scala +++ b/src/main/scala/com/campudus/tableaux/database/model/TableauxModel.scala @@ -538,6 +538,7 @@ class TableauxModel( } yield () }) + _ <- CacheClient(this.connection).invalidateRowLevelAnnotations(table.id, rowId) } yield () }; @@ -570,6 +571,7 @@ class TableauxModel( _ <- Future.sequence( rowSeq.map(row => updateRowAnnotations(table, row.id, finalFlagOpt, archivedFlagOpt)) ) + _ <- CacheClient(this.connection).invalidateTableRowLevelAnnotations(table.id) } yield () } @@ -970,6 +972,7 @@ class TableauxModel( } for { + rowLevelAnnotationsCache <- CacheClient(this.connection).retrieveRowLevelAnnotations(column.table.id, rowId) valueCache <- CacheClient(this.connection).retrieveCellValue(column.table.id, column.id, rowId) value <- valueCache match { @@ -1018,8 +1021,25 @@ class TableauxModel( Future.successful(value) } - // TODO use cache for rowLevelAnnotations - (rowLevelAnnotations, _, _) <- retrieveRowModel.retrieveAnnotations(column.table.id, rowId, Seq(column)) + rowLevelAnnotations <- rowLevelAnnotationsCache match { + case Some(annotations) => { + println(s"Cache hit for rowLevelAnnotations for table ${column.table.id} and row $rowId") + Future.successful(annotations) + } + case None => { + for { + (rowLevelAnnotations, _, _) <- retrieveRowModel.retrieveAnnotations(column.table.id, rowId, Seq(column)) + } yield { + println( + s"Cache miss for rowLevelAnnotations for table ${column.table.id} and row $rowId, rowLevelAnnotations: $rowLevelAnnotations" + ) + // fire-and-forget don't need to wait for this to return + CacheClient(this.connection).setRowLevelAnnotations(column.table.id, rowId, rowLevelAnnotations) + rowLevelAnnotations + } + + } + } } yield { Cell(column, rowId, resultValue, rowLevelAnnotations) }