Skip to content

Commit

Permalink
minor refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
Jaroslav Bobrowski committed Nov 14, 2024
1 parent cba1dda commit d276b48
Show file tree
Hide file tree
Showing 6 changed files with 112 additions and 67 deletions.
4 changes: 2 additions & 2 deletions src/main/scala/com/campudus/tableaux/TableauxConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ class TableauxConfig(
override val vertx: Vertx,
val authConfig: JsonObject,
val databaseConfig: JsonObject,
workingDirectory: String,
uploadsDirectory: String,
val workingDirectory: String,
val uploadsDirectory: String,
val rolePermissions: JsonObject,
val openApiUrl: Option[String] = None,
val isPublicFileServerEnabled: Boolean = false,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ class TableauxController(
for {
createdRows <- repository.createRows(table, seq, rowPermissionsOpt)
} yield {
createdRows.rows.foreach(singleCreatedRow => messagingClient.rowCreated(tableId, singleCreatedRow.id))
createdRows.rows.foreach(row => messagingClient.rowCreated(tableId, row.id))
createdRows
}
case None =>
Expand Down
20 changes: 14 additions & 6 deletions src/main/scala/com/campudus/tableaux/router/SystemRouter.scala
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,7 @@ class SystemRouter(override val config: TableauxConfig, val controller: SystemCo
service <-
controller.createService(name, serviceType, ordering, displayName, description, active, config, scope)
} yield {
messagingClient.servicesChange()
messagingClient.servicesChanged()
service
}
}
Expand All @@ -292,7 +292,6 @@ class SystemRouter(override val config: TableauxConfig, val controller: SystemCo
for {
serviceId <- getServiceId(context)
} yield {
messagingClient.servicesChange()
sendReply(
context,
asyncGetReply {
Expand All @@ -308,8 +307,13 @@ class SystemRouter(override val config: TableauxConfig, val controller: SystemCo
val config = getNullableObject("config")(json)
val scope = getNullableObject("scope")(json)

controller
.updateService(serviceId, name, serviceType, ordering, displayName, description, active, config, scope)
for {
service <- controller
.updateService(serviceId, name, serviceType, ordering, displayName, description, active, config, scope)
} yield {
messagingClient.servicesChanged()
service
}
}
)
}
Expand All @@ -323,11 +327,15 @@ class SystemRouter(override val config: TableauxConfig, val controller: SystemCo
for {
serviceId <- getServiceId(context)
} yield {
messagingClient.servicesChange()
sendReply(
context,
asyncGetReply {
controller.deleteService(serviceId)
for {
service <- controller.deleteService(serviceId)
} yield {
messagingClient.servicesChanged()
service
}
}
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,46 +26,44 @@ object MessagingVerticle {
val KEY_TABLE_ID = "tableId"
val KEY_COLUMN_ID = "columnId"
val KEY_ROW_ID = "rowId"
val ID_KEYS: Seq[String] = Seq(KEY_TABLE_ID, KEY_COLUMN_ID, KEY_ROW_ID)

val ADDRESS_CELL_CHANGED = "message.cell.change"
val ADDRESS_SERVICES_CHANGE = "message.services.change"
val ADDRESS_COLUMN_CREATED = "message.columns.created"
val ADDRESS_COLUMN_CHANGED = "message.columns.changed"
val ADDRESS_COLUMN_DELETED = "message.columns.deleted"

val ADDRESS_TABLE_CREATED = "message.tables.created"
val ADDRESS_TABLE_CHANGED = "message.tables.changed"
val ADDRESS_TABLE_DELETED = "message.tables.deleted"

val ADDRESS_ROW_DELETED = "message.rows.deleted"
val ADDRESS_ROW_CREATED = "message.rows.created"
val ADDRESS_ROW_ANNOTATION_CHANGED = "message.rows.annotation.changed"

val ADDRESS_CELL_ANNOTATION_CHANGED = "message.cells.annotation.changed"
val ID_KEYS: Seq[String] = Seq(KEY_TABLE_ID, KEY_COLUMN_ID, KEY_ROW_ID)

val EVENT_TYPE_TABLE_CREATE = "table_create"
val EVENT_TYPE_TABLE_CHANGE = "table_change"
val EVENT_TYPE_TABLE_DELETE = "table_delete"
val EVENT_TYPE_ROW_CREATED = "row_create"
val EVENT_TYPE_ROW_DELETE = "row_delete"
val ADDRESS_CELL_CHANGED = "message.cell.changed"
val ADDRESS_SERVICES_CHANGED = "message.services.changed"
val ADDRESS_COLUMN_CREATED = "message.column.created"
val ADDRESS_COLUMN_CHANGED = "message.column.changed"
val ADDRESS_COLUMN_DELETED = "message.column.deleted"
val ADDRESS_TABLE_CREATED = "message.table.created"
val ADDRESS_TABLE_CHANGED = "message.table.changed"
val ADDRESS_TABLE_DELETED = "message.table.deleted"
val ADDRESS_ROW_CREATED = "message.row.created"
val ADDRESS_ROW_DELETED = "message.row.deleted"
val ADDRESS_ROW_ANNOTATION_CHANGED = "message.row.annotation.changed"
val ADDRESS_CELL_ANNOTATION_CHANGED = "message.cell.annotation.changed"

val EVENT_TYPE_TABLE_CREATED = "table_created"
val EVENT_TYPE_TABLE_CHANGED = "table_changed"
val EVENT_TYPE_TABLE_DELETED = "table_deleted"
val EVENT_TYPE_ROW_CREATED = "row_created"
val EVENT_TYPE_ROW_DELETED = "row_deleted"
val EVENT_TYPE_ROW_ANNOTATION_CHANGED = "row_annotation_changed"
val EVENT_TYPE_COLUMN_CREATE = "column_create"
val EVENT_TYPE_COLUMN_CHANGE = "column_change"
val EVENT_TYPE_COLUMN_DELETE = "column_delete"
val EVENT_TYPE_COLUMN_CREATED = "column_created"
val EVENT_TYPE_COLUMN_CHANGED = "column_changed"
val EVENT_TYPE_COLUMN_DELETED = "column_deleted"
val EVENT_TYPE_CELL_ANNOTATION_CHANGED = "cell_annotation_changed"
val EVENT_TYPE_CELL_CHANGED = "cell_changed"

val eventTypes: Seq[String] = Seq(
EVENT_TYPE_TABLE_CREATE,
EVENT_TYPE_TABLE_CHANGE,
EVENT_TYPE_TABLE_DELETE,
EVENT_TYPE_TABLE_CREATED,
EVENT_TYPE_TABLE_CHANGED,
EVENT_TYPE_TABLE_DELETED,
EVENT_TYPE_ROW_CREATED,
EVENT_TYPE_ROW_DELETE,
EVENT_TYPE_ROW_DELETED,
EVENT_TYPE_ROW_ANNOTATION_CHANGED,
EVENT_TYPE_COLUMN_CREATE,
EVENT_TYPE_COLUMN_CHANGE,
EVENT_TYPE_COLUMN_DELETE,
EVENT_TYPE_COLUMN_CREATED,
EVENT_TYPE_COLUMN_CHANGED,
EVENT_TYPE_COLUMN_DELETED,
EVENT_TYPE_CELL_CHANGED,
EVENT_TYPE_CELL_ANNOTATION_CHANGED
)
Expand Down Expand Up @@ -108,14 +106,17 @@ class MessagingVerticle(tableauxConfig: TableauxConfig) extends ScalaVerticle

service.serviceType.toString == ServiceType.LISTENER && hasRequiredConfigValues
})

val listenerMap = eventTypes.foldLeft[Map[String, Seq[Service]]](Map()) { (acc, eventType) =>
{
val listenersWithEventType = listeners.filter(service => {
service.config.getJsonArray("events", Json.arr()).asScala.toSeq.contains(eventType)
})

acc + (eventType -> listenersWithEventType)
}
}

listenerMap
}
}
Expand All @@ -126,6 +127,7 @@ class MessagingVerticle(tableauxConfig: TableauxConfig) extends ScalaVerticle
maybeColumnJson: Option[JsonObject]
): Seq[Service] = {
val listenersForEventType = listeners getOrElse (eventType, Seq())

val getFilterRules: (String, String, JsonObject) => Seq[JsonObject] =
(objectType: String, ruleType: String, scope: JsonObject) => {
scope.getJsonObject(objectType, Json.obj()).getJsonArray(ruleType, Json.arr()).asScala.toSeq.map(obj => {
Expand Down Expand Up @@ -163,6 +165,7 @@ class MessagingVerticle(tableauxConfig: TableauxConfig) extends ScalaVerticle
val tableIncludes = getFilterRules("tables", "includes", scope)
val tableExcludes = getFilterRules("tables", "excludes", scope)
val shouldIncludeTable = applyRules(tableIncludes, table) && !applyRules(tableExcludes, table)

maybeColumnJson match {
case Some(column) =>
val columnIncludes = getFilterRules("columns", "includes", scope)
Expand All @@ -180,12 +183,14 @@ class MessagingVerticle(tableauxConfig: TableauxConfig) extends ScalaVerticle
}

val filtered = listenersForEventType.filter(filterFunction)

filtered

}

override def startFuture(): Future[_] = {
logger.info("start future")

val isAuthorization: Boolean = !tableauxConfig.authConfig.isEmpty
val roles = tableauxConfig.rolePermissions

Expand All @@ -194,6 +199,7 @@ class MessagingVerticle(tableauxConfig: TableauxConfig) extends ScalaVerticle
val vertxAccess = new VertxAccess {
override val vertx: Vertx = MessagingVerticle.this.vertx
}

val connection = SQLConnection(vertxAccess, tableauxConfig.databaseConfig)
val dbConnection = DatabaseConnection(vertxAccess, connection)

Expand All @@ -216,7 +222,7 @@ class MessagingVerticle(tableauxConfig: TableauxConfig) extends ScalaVerticle
eventBus.consumer(address, errorHandler(handler) _).completionFuture()
}

listen(ADDRESS_CELL_CHANGED, messageHandlerCellChange)
listen(ADDRESS_CELL_CHANGED, messageHandlerCellChanged)
listen(ADDRESS_COLUMN_CREATED, messageHandlerColumnCreated())
listen(ADDRESS_COLUMN_CHANGED, messageHandlerColumnChanged)
listen(ADDRESS_COLUMN_DELETED, messageHandlerColumnDeleted)
Expand All @@ -228,7 +234,7 @@ class MessagingVerticle(tableauxConfig: TableauxConfig) extends ScalaVerticle
listen(ADDRESS_ROW_ANNOTATION_CHANGED, messageHandlerRowAnnotationChanged)
listen(ADDRESS_CELL_ANNOTATION_CHANGED, messageHandlerCellAnnotationChanged)

eventBus.consumer(ADDRESS_SERVICES_CHANGE, messageHandlerUpdateListeners).completionFuture()
eventBus.consumer(ADDRESS_SERVICES_CHANGED, messageHandlerUpdateListeners).completionFuture()
}

private def messageHandlerUpdateListeners(message: Message[JsonObject]): Unit = {
Expand All @@ -240,17 +246,20 @@ class MessagingVerticle(tableauxConfig: TableauxConfig) extends ScalaVerticle
}
}

private def messageHandlerCellChange(message: Message[JsonObject]): Future[Seq[Any]] = {
private def messageHandlerCellChanged(message: Message[JsonObject]): Future[Seq[Any]] = {
implicit val user: TableauxUser = TableauxUser("Messaging Verticle", Seq("dev"))

val body = message.body()
val tableId = body.getLong("tableId").asInstanceOf[TableId]
val columnId = body.getLong("columnId").asInstanceOf[ColumnId]
val rowId = body.getLong("rowId").asInstanceOf[RowId]

for {
table <- tableauxModel.retrieveTable(tableId, isInternalCall = true)
column <- tableauxModel.retrieveColumn(table, columnId)
cell <- tableauxModel.retrieveCell(table, column.id, rowId)
dependentCells <- tableauxModel.retrieveDependentCells(table, rowId)

dependentCellValues <- Future.sequence(dependentCells.flatMap {
case (table, linkColumn, rowIds) =>
rowIds.map(rowId => {
Expand All @@ -259,8 +268,10 @@ class MessagingVerticle(tableauxConfig: TableauxConfig) extends ScalaVerticle
})
})
})

data = Json.obj("cell" -> cell.getJson, "dependentCells" -> dependentCellValues)
listeners = getApplicableListeners(EVENT_TYPE_CELL_CHANGED, Some(table.getJson), Some(column.getJson))

listenerResponses <-
sendMessage(listeners, createPayload(EVENT_TYPE_CELL_CHANGED, Some(tableId), Some(columnId), Some(rowId), data))
} yield {
Expand Down Expand Up @@ -310,13 +321,14 @@ class MessagingVerticle(tableauxConfig: TableauxConfig) extends ScalaVerticle
}

private def messageHandlerColumnCreated(
eventType: String = EVENT_TYPE_COLUMN_CREATE
eventType: String = EVENT_TYPE_COLUMN_CREATED
)(
message: Message[JsonObject]
): Future[Seq[Any]] = {
val body = message.body()
val tableId = body.getLong("tableId").asInstanceOf[TableId]
val columnId = body.getLong("columnId")

for {
table <- tableauxModel.retrieveTable(tableId, isInternalCall = true)
column <- tableauxModel.retrieveColumn(table, columnId)
Expand All @@ -330,12 +342,13 @@ class MessagingVerticle(tableauxConfig: TableauxConfig) extends ScalaVerticle
}

private def messageHandlerTableCreated(
eventType: String = EVENT_TYPE_TABLE_CREATE
eventType: String = EVENT_TYPE_TABLE_CREATED
)(
message: Message[JsonObject]
): Future[Seq[Any]] = {
val body = message.body()
val tableId = body.getLong("tableId").asInstanceOf[TableId]

for {
table <- tableauxModel.retrieveTable(tableId, isInternalCall = true)
listeners = getApplicableListeners(eventType, Some(table.getJson), None)
Expand All @@ -348,11 +361,11 @@ class MessagingVerticle(tableauxConfig: TableauxConfig) extends ScalaVerticle
}

private def messageHandlerTableChanged(message: Message[JsonObject]): Future[Seq[Any]] = {
messageHandlerTableCreated(EVENT_TYPE_TABLE_CHANGE)(message)
messageHandlerTableCreated(EVENT_TYPE_TABLE_CHANGED)(message)
}

private def messageHandlerColumnChanged(message: Message[JsonObject]): Future[Seq[Any]] = {
messageHandlerColumnCreated(EVENT_TYPE_COLUMN_CHANGE)(message)
messageHandlerColumnCreated(EVENT_TYPE_COLUMN_CHANGED)(message)
}

private def getJsonValueAsOption[A](key: String, obj: JsonObject): Option[A] = {
Expand All @@ -376,19 +389,22 @@ class MessagingVerticle(tableauxConfig: TableauxConfig) extends ScalaVerticle
val body = message.body()
val (tableId, columnId, rowId) = getIds(body)
val deletedTable = body.getJsonObject("table", Json.obj())
val listeners = getApplicableListeners(EVENT_TYPE_TABLE_DELETE, Some(deletedTable), None)
sendMessage(listeners, createPayload(EVENT_TYPE_TABLE_DELETE, tableId, columnId, rowId, deletedTable))
val listeners = getApplicableListeners(EVENT_TYPE_TABLE_DELETED, Some(deletedTable), None)

sendMessage(listeners, createPayload(EVENT_TYPE_TABLE_DELETED, tableId, columnId, rowId, deletedTable))
}

private def messageHandlerColumnDeleted(message: Message[JsonObject]): Future[Seq[Any]] = {
val body = message.body()
val (tableId, columnId, rowId) = getIds(body)
val deletedColumn = body.getJsonObject("column", Json.obj())

for {
table <- tableauxModel.retrieveTable(tableId.get, isInternalCall = true)
listeners = getApplicableListeners(EVENT_TYPE_COLUMN_DELETE, Some(table.getJson), Some(deletedColumn))
listeners = getApplicableListeners(EVENT_TYPE_COLUMN_DELETED, Some(table.getJson), Some(deletedColumn))

listenerResponses <-
sendMessage(listeners, createPayload(EVENT_TYPE_COLUMN_DELETE, tableId, columnId, rowId, deletedColumn))
sendMessage(listeners, createPayload(EVENT_TYPE_COLUMN_DELETED, tableId, columnId, rowId, deletedColumn))
} yield {
listenerResponses
}
Expand All @@ -399,8 +415,8 @@ class MessagingVerticle(tableauxConfig: TableauxConfig) extends ScalaVerticle
val (tableId, columnId, rowId) = getIds(body)
for {
table <- tableauxModel.retrieveTable(tableId.get, isInternalCall = true)
listeners = getApplicableListeners(EVENT_TYPE_ROW_DELETE, Some(table.getJson), None)
listenerResponses <- sendMessage(listeners, createPayload(EVENT_TYPE_ROW_DELETE, tableId, columnId, rowId))
listeners = getApplicableListeners(EVENT_TYPE_ROW_DELETED, Some(table.getJson), None)
listenerResponses <- sendMessage(listeners, createPayload(EVENT_TYPE_ROW_DELETED, tableId, columnId, rowId))
} yield {
listenerResponses
}
Expand All @@ -409,10 +425,12 @@ class MessagingVerticle(tableauxConfig: TableauxConfig) extends ScalaVerticle
private def messageHandlerRowCreated(message: Message[JsonObject]): Future[Seq[Any]] = {
val body = message.body()
val (tableId, columnId, rowId) = getIds(body)

for {
table <- tableauxModel.retrieveTable(tableId.get, isInternalCall = true)
row <- tableauxModel.retrieveRow(table, rowId.get)
listeners = getApplicableListeners(EVENT_TYPE_ROW_CREATED, Some(table.getJson), None)

listenerResponses <-
sendMessage(listeners, createPayload(EVENT_TYPE_ROW_CREATED, tableId, columnId, rowId, row.getJson))
} yield {
Expand All @@ -427,8 +445,10 @@ class MessagingVerticle(tableauxConfig: TableauxConfig) extends ScalaVerticle
table <- tableauxModel.retrieveTable(tableId.get, isInternalCall = true)
column <- tableauxModel.retrieveColumn(table, columnId.get)
cellAnnotations <- tableauxModel.retrieveCellAnnotations(table, columnId.get, rowId.get)

listeners =
getApplicableListeners(EVENT_TYPE_CELL_ANNOTATION_CHANGED, Some(table.getJson), Some(column.getJson))

listenerResponses <- sendMessage(
listeners,
createPayload(EVENT_TYPE_CELL_ANNOTATION_CHANGED, tableId, columnId, rowId, cellAnnotations.getJson)
Expand All @@ -441,11 +461,14 @@ class MessagingVerticle(tableauxConfig: TableauxConfig) extends ScalaVerticle
private def messageHandlerRowAnnotationChanged(message: Message[JsonObject]): Future[Seq[Any]] = {
val body = message.body()
val (tableId, columnId, rowId) = getIds(body)

for {
table <- tableauxModel.retrieveTable(tableId.get, isInternalCall = true)
row <- tableauxModel.retrieveRow(table, rowId.get)

listeners =
getApplicableListeners(EVENT_TYPE_ROW_ANNOTATION_CHANGED, Some(table.getJson), None)

listenerResponses <- sendMessage(
listeners,
createPayload(EVENT_TYPE_ROW_ANNOTATION_CHANGED, tableId, columnId, rowId, row.getJson)
Expand Down
Loading

0 comments on commit d276b48

Please sign in to comment.