Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix/archive-link-cell-values #285

Merged
merged 7 commits into from
May 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 25 additions & 5 deletions src/main/resources/swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -1644,13 +1644,15 @@
"id": 1,
"values": [
"Shimano"
]
],
"final": true
},
{
"id": 2,
"values": [
"Sram "
]
],
"archived": true
}
]
}
Expand Down Expand Up @@ -1713,13 +1715,15 @@
"id": 1,
"values": [
"Shimano"
]
],
"final": true
},
{
"id": 2,
"values": [
"Sram "
]
"Sram"
],
"archived": true
}
]
}
Expand Down Expand Up @@ -2476,6 +2480,9 @@
"allOf": [
{
"$ref": "#/definitions/Property:Status"
},
{
"$ref": "#/definitions/Property:RowLevelAnnotations"
}
],
"properties": {
Expand Down Expand Up @@ -3956,6 +3963,19 @@
"example": 1
}
}
},
"Property:RowLevelAnnotations": {
"type": "object",
"properties": {
"final": {
"type": "boolean",
"example": true
},
"archived": {
"type": "boolean",
"example": true
}
}
}
},
"parameters": {
Expand Down
46 changes: 46 additions & 0 deletions src/main/scala/com/campudus/tableaux/cache/CacheClient.scala
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,52 @@ class CacheClient(vertxAccess: VertxAccess) extends VertxAccess {
eventBus.sendFuture(CacheVerticle.ADDRESS_INVALIDATE_ROW_PERMISSIONS, obj)
}

def setRowLevelAnnotations(
tableId: TableId,
rowId: RowId,
rowLevelAnnotations: RowLevelAnnotations
): Future[_] = {
val rowValue = Json.obj("value" -> rowLevelAnnotations.getJson)
val obj = rowKey(tableId, rowId).copy().mergeIn(rowValue)
eventBus.sendFuture(CacheVerticle.ADDRESS_SET_ROW_LEVEL_ANNOTATIONS, obj, options)
}

def retrieveRowLevelAnnotations(tableId: TableId, rowId: RowId): Future[Option[RowLevelAnnotations]] = {
val obj = rowKey(tableId, rowId)

eventBus
.sendFuture[JsonObject](CacheVerticle.ADDRESS_RETRIEVE_ROW_LEVEL_ANNOTATIONS, obj, options)
.map(value => {
value match {
case v if v.body().containsKey("value") => {
val rawRowLevelAnnotations = v.body().getValue("value").asInstanceOf[JsonObject]
val finalFlag = rawRowLevelAnnotations.getBoolean("final", false)
val archivedFlag = rawRowLevelAnnotations.getBoolean("archived", false)
Some(RowLevelAnnotations(finalFlag, archivedFlag))
}
case _ => {
None
}
}
})
.recoverWith({
case ex: ReplyException if ex.failureCode() == CacheVerticle.NOT_FOUND_FAILURE =>
Future.successful(None)
case ex =>
Future.failed(ex)
})
}

def invalidateRowLevelAnnotations(tableId: TableId, rowId: RowId): Future[_] = {
val obj = Json.obj("tableId" -> tableId, "rowId" -> rowId)
eventBus.sendFuture(CacheVerticle.ADDRESS_INVALIDATE_ROW_LEVEL_ANNOTATIONS, obj)
}

def invalidateTableRowLevelAnnotations(tableId: TableId): Future[_] = {
val obj = Json.obj("tableId" -> tableId)
eventBus.sendFuture(CacheVerticle.ADDRESS_INVALIDATE_TABLE_ROW_LEVEL_ANNOTATIONS, obj)
}

def invalidateCellValue(tableId: TableId, columnId: ColumnId, rowId: RowId): Future[_] = {
val obj = cellKey(tableId, columnId, rowId)
eventBus.sendFuture(CacheVerticle.ADDRESS_INVALIDATE_CELL, obj, options)
Expand Down
175 changes: 137 additions & 38 deletions src/main/scala/com/campudus/tableaux/cache/CacheVerticle.scala
Original file line number Diff line number Diff line change
Expand Up @@ -35,18 +35,26 @@ object CacheVerticle {
val NOT_FOUND_FAILURE: Int = 404
val INVALID_MESSAGE: Int = 400

// cell based addresses
val ADDRESS_SET_CELL: String = "cache.set.cell"
val ADDRESS_SET_ROW_PERMISSIONS: String = "cache.set.row_permissions"
val ADDRESS_RETRIEVE_CELL: String = "cache.retrieve.cell"
val ADDRESS_RETRIEVE_ROW_PERMISSIONS: String = "cache.retrieve.row_permissions"

val ADDRESS_INVALIDATE_CELL: String = "cache.invalidate.cell"
val ADDRESS_INVALIDATE_COLUMN: String = "cache.invalidate.column"
val ADDRESS_INVALIDATE_ROW: String = "cache.invalidate.row"
val ADDRESS_INVALIDATE_TABLE: String = "cache.invalidate.table"
val ADDRESS_INVALIDATE_ALL: String = "cache.invalidate.all"

// row permissions based addresses
val ADDRESS_SET_ROW_PERMISSIONS: String = "cache.set.row_permissions"
val ADDRESS_RETRIEVE_ROW_PERMISSIONS: String = "cache.retrieve.row_permissions"
val ADDRESS_INVALIDATE_ROW_PERMISSIONS: String = "cache.invalidate.row_permissions"

// row level annotations based addresses
val ADDRESS_SET_ROW_LEVEL_ANNOTATIONS: String = "cache.set.row_level_annotations"
val ADDRESS_RETRIEVE_ROW_LEVEL_ANNOTATIONS: String = "cache.retrieve.row_level_annotations"
val ADDRESS_INVALIDATE_ROW_LEVEL_ANNOTATIONS: String = "cache.invalidate.row_level_annotations"
val ADDRESS_INVALIDATE_TABLE_ROW_LEVEL_ANNOTATIONS: String = "cache.invalidate.table.row_level_annotations"

val TIMEOUT_AFTER_MILLISECONDS: Int = 400

def apply(tableauxConfig: TableauxConfig): CacheVerticle = {
Expand All @@ -62,10 +70,11 @@ class CacheVerticle(tableauxConfig: TableauxConfig) extends ScalaVerticle with L

type CellCaches = mutable.Map[(TableId, ColumnId), Cache[AnyRef]]
type RowPermissionsCaches = mutable.Map[(TableId), Cache[AnyRef]]
type Caches = Either[CellCaches, RowPermissionsCaches]
type RowLevelAnnotationsCache = mutable.Map[(TableId), Cache[AnyRef]]

private val cellCaches: CellCaches = mutable.Map.empty
private val rowPermissionsCaches: RowPermissionsCaches = mutable.Map.empty
private val rowLevelAnnotationsCache: RowLevelAnnotationsCache = mutable.Map.empty

override def startFuture(): Future[_] = {
logger.info(
Expand All @@ -84,16 +93,31 @@ class CacheVerticle(tableauxConfig: TableauxConfig) extends ScalaVerticle with L
private def registerOnEventBus(): Future[_] = {
Future.sequence(
Seq(
// cell
registerHandler(eventBus, ADDRESS_SET_CELL, messageHandlerSetCell),
registerHandler(eventBus, ADDRESS_RETRIEVE_CELL, messageHandlerRetrieveCell),
registerHandler(eventBus, ADDRESS_SET_ROW_PERMISSIONS, messageHandlerSetRowPermissions),
registerHandler(eventBus, ADDRESS_RETRIEVE_ROW_PERMISSIONS, messageHandlerRetrieveRowPermissions),
registerHandler(eventBus, ADDRESS_INVALIDATE_CELL, messageHandlerInvalidateCell),
registerHandler(eventBus, ADDRESS_INVALIDATE_COLUMN, messageHandlerInvalidateColumn),
registerHandler(eventBus, ADDRESS_INVALIDATE_ROW, messageHandlerInvalidateRow),
registerHandler(eventBus, ADDRESS_INVALIDATE_TABLE, messageHandlerInvalidateTable),
registerHandler(eventBus, ADDRESS_INVALIDATE_ALL, messageHandlerInvalidateAll),
registerHandler(eventBus, ADDRESS_INVALIDATE_ROW_PERMISSIONS, messageHandlerInvalidateRowPermissions)
// row permissions
registerHandler(eventBus, ADDRESS_SET_ROW_PERMISSIONS, messageHandlerSetRowPermissions),
registerHandler(eventBus, ADDRESS_RETRIEVE_ROW_PERMISSIONS, messageHandlerRetrieveRowPermissions),
registerHandler(eventBus, ADDRESS_INVALIDATE_ROW_PERMISSIONS, messageHandlerInvalidateRowPermissions),
// row level annotations
registerHandler(eventBus, ADDRESS_SET_ROW_LEVEL_ANNOTATIONS, messageHandlerSetRowLevelAnnotations),
registerHandler(eventBus, ADDRESS_RETRIEVE_ROW_LEVEL_ANNOTATIONS, messageHandlerRetrieveRowLevelAnnotations),
registerHandler(
eventBus,
ADDRESS_INVALIDATE_ROW_LEVEL_ANNOTATIONS,
messageHandlerInvalidateRowLevelAnnotations
),
registerHandler(
eventBus,
ADDRESS_INVALIDATE_TABLE_ROW_LEVEL_ANNOTATIONS,
messageHandlerInvalidateTableRowLevelAnnotations
)
)
)
}
Expand Down Expand Up @@ -135,6 +159,16 @@ class CacheVerticle(tableauxConfig: TableauxConfig) extends ScalaVerticle with L
}
}

private def getRowLevelAnnotationsCache(tableId: TableId): Cache[AnyRef] = {
rowLevelAnnotationsCache.get(tableId) match {
case Some(cache) => cache
case None =>
val cache: Cache[AnyRef] = GuavaCache(createCache())
rowLevelAnnotationsCache.put((tableId), cache)
cache
}
}

private def removeCache(tableId: TableId, columnId: ColumnId): Unit = cellCaches.remove((tableId, columnId))

private def extractTableColumnRow(obj: JsonObject): Option[(TableId, ColumnId, RowId)] = {
Expand Down Expand Up @@ -203,7 +237,6 @@ class CacheVerticle(tableauxConfig: TableauxConfig) extends ScalaVerticle with L
extractTableRow(obj) match {
case Some((tableId, rowId)) => {
implicit val scalaCache: Cache[AnyRef] = getRowPermissionsCache(tableId)
logger.info(s"messageHandlerSetRowPermissions $obj $tableId $rowId $value")
put(rowId)(value).map(_ => replyJson(message, tableId, rowId))
}
case None => {
Expand Down Expand Up @@ -247,16 +280,93 @@ class CacheVerticle(tableauxConfig: TableauxConfig) extends ScalaVerticle with L
case Some((tableId, columnId, rowId)) =>
// invalidate cell
implicit val scalaCache: Cache[AnyRef] = getCellCache(tableId, columnId)

remove(rowId)
.map(_ => replyJson(message, tableId, columnId, rowId))
remove(rowId).map(_ => replyJson(message, tableId, columnId, rowId))

case None =>
logger.error("Message invalid: Fields (tableId, columnId, rowId) should be a Long")
message.fail(INVALID_MESSAGE, "Message invalid: Fields (tableId, columnId, rowId) should be a Long")
}
}

private def messageHandlerSetRowLevelAnnotations(message: Message[JsonObject]): Unit = {
val obj = message.body()
val value = obj.getValue("value")

extractTableRow(obj) match {
case Some((tableId, rowId)) => {
implicit val scalaCache: Cache[AnyRef] = getRowLevelAnnotationsCache(tableId)
put(rowId)(value).map(_ => replyJson(message, tableId, rowId))
}
case None => {
logger.error("Message invalid: Fields (tableId, rowId) should be a Long")
message.fail(INVALID_MESSAGE, "Message invalid: Fields (tableId, rowId) should be a Long")
}
}
}

private def messageHandlerRetrieveRowLevelAnnotations(message: Message[JsonObject]): Unit = {
val obj = message.body()

extractTableRow(obj) match {
case Some((tableId, rowId)) =>
implicit val scalaCache: Cache[AnyRef] = getRowLevelAnnotationsCache(tableId)

get(rowId).map({
case Some(value) => {
val reply = Json.obj(
"tableId" -> tableId,
"rowId" -> rowId,
"value" -> value
)

message.reply(reply)
}
case None => {
logger.debug(s"messageHandlerRetrieveRowLevelAnnotations $tableId, $rowId not found")
message.fail(NOT_FOUND_FAILURE, "Not found")
}
})

case None =>
logger.error("Message invalid: Fields (tableId, rowId) should be a Long")
message.fail(INVALID_MESSAGE, "Message invalid: Fields (tableId, rowId) should be a Long")
}
}

private def messageHandlerInvalidateRowLevelAnnotations(message: Message[JsonObject]): Unit = {
extractTableRow(message.body()) match {
case Some((tableId, rowId)) =>
// invalidate cell
implicit val scalaCache: Cache[AnyRef] = getRowLevelAnnotationsCache(tableId)

remove(rowId).map(_ => replyJson(message, tableId, rowId))

case None =>
logger.error("Message invalid: Fields (tableId, rowId) should be a Long")
message.fail(INVALID_MESSAGE, "Message invalid: Fields (tableId, rowId) should be a Long")
}
}

private def messageHandlerInvalidateTableRowLevelAnnotations(message: Message[JsonObject]): Unit = {
val obj = message.body()

(for {
tableId <- Option(obj.getLong("tableId")).map(_.toLong)
} yield tableId) match {
case Some(tableId) =>
Future.sequence(filterRowLevelAnnotationsCache(tableId)
.map(implicit cache => removeAll()))
.map(_ => {
val reply = Json.obj("tableId" -> tableId)
message.reply(reply)
})

case None =>
logger.error("Message invalid: Fields (tableId) should be a Long")
message.fail(INVALID_MESSAGE, "Message invalid: Fields (tableId) should be a Long")
}
}

private def replyJson(message: Message[JsonObject], tableId: TableId, columnId: ColumnId, rowId: RowId): Unit = {
val reply = Json.obj("tableId" -> tableId, "columnId" -> columnId, "rowId" -> rowId)
message.reply(reply)
Expand Down Expand Up @@ -299,11 +409,10 @@ class CacheVerticle(tableauxConfig: TableauxConfig) extends ScalaVerticle with L
}).values
}

private def filterRowPermissionCaches(tableId: TableId) = {
private def filterRowLevelAnnotationsCache(tableId: TableId) = {
// invalidate table
rowPermissionsCaches.filterKeys({
case (cachedTableId) =>
cachedTableId == tableId
rowLevelAnnotationsCache.filterKeys({
case (cachedTableId) => cachedTableId == tableId
}).values
}

Expand Down Expand Up @@ -356,35 +465,25 @@ class CacheVerticle(tableauxConfig: TableauxConfig) extends ScalaVerticle with L
}

private def messageHandlerInvalidateAll(message: Message[JsonObject]): Unit = {
Future
.sequence(cellCaches.map({
case ((tableId, columnId), cache) =>
implicit val implicitCache: Cache[AnyRef] = implicitly(cache)

removeAll().map(_ => removeCache(tableId, columnId))
}))
.onComplete(_ => {
cellCaches.clear()
message.reply(Json.emptyObj())
})
Future.sequence(cellCaches.map({
case ((tableId, columnId), cache) =>
implicit val implicitCache: Cache[AnyRef] = implicitly(cache)

removeAll().map(_ => removeCache(tableId, columnId))
})).onComplete(_ => {
cellCaches.clear()
message.reply(Json.emptyObj())
})
}

private def messageHandlerInvalidateRowPermissions(message: Message[JsonObject]): Unit = {
val obj = message.body()

(extractTableRow(obj)) match {
extractTableRow(message.body()) match {
case Some((tableId, rowId)) =>
Future
.sequence(
filterRowPermissionCaches(tableId)
.map(implicit cache => {
remove(rowId)
})
)
.map(_ => {
val reply = Json.obj("tableId" -> tableId, "rowId" -> rowId)
message.reply(reply)
})
// invalidate cell
implicit val scalaCache: Cache[AnyRef] = getRowPermissionsCache(tableId)
remove(rowId).map(_ => replyJson(message, tableId, rowId))

case None =>
logger.error("Message invalid: Fields (tableId, rowId) should be a Long")
Expand Down
Loading
Loading