Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: 监听节点重命名事件,并更新对应目录信息#1195 #1204

Merged
merged 8 commits into from
Oct 10, 2023
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,9 @@ import com.tencent.bkrepo.common.artifact.event.node.NodeCopiedEvent
import com.tencent.bkrepo.common.artifact.event.node.NodeCreatedEvent
import com.tencent.bkrepo.common.artifact.event.node.NodeDeletedEvent
import com.tencent.bkrepo.common.artifact.event.node.NodeMovedEvent
import com.tencent.bkrepo.common.artifact.event.node.NodeRenamedEvent
import com.tencent.bkrepo.common.artifact.path.PathUtils
import com.tencent.bkrepo.common.artifact.path.PathUtils.combineFullPath
import com.tencent.bkrepo.repository.dao.NodeDao
import com.tencent.bkrepo.repository.model.TNode
import com.tencent.bkrepo.repository.service.node.NodeService
Expand All @@ -48,6 +50,7 @@ import org.springframework.data.mongodb.core.query.Query
import org.springframework.data.mongodb.core.query.and
import org.springframework.data.mongodb.core.query.isEqualTo
import org.springframework.data.mongodb.core.query.where
import org.springframework.scheduling.annotation.Async
import org.springframework.scheduling.annotation.Scheduled
import org.springframework.stereotype.Component
import java.time.LocalDateTime
Expand Down Expand Up @@ -86,10 +89,12 @@ class NodeModifyEventListener(
EventType.NODE_COPIED,
EventType.NODE_CREATED,
EventType.NODE_DELETED,
EventType.NODE_MOVED
EventType.NODE_MOVED,
EventType.NODE_RENAMED
)


@Async
@EventListener(ArtifactEvent::class)
fun handle(event: ArtifactEvent) {
if (!acceptTypes.contains(event.type)) {
Expand Down Expand Up @@ -133,49 +138,70 @@ class NodeModifyEventListener(
logger.info("event type ${event.type}")
val modifiedNodeList = mutableListOf<ModifiedNodeInfo>()
when (event.type) {
EventType.NODE_MOVED -> {
require(event is NodeMovedEvent)
val dstFullPath = buildDstFullPath(event.dstFullPath, event.resourceKey)
val createdNode = ModifiedNodeInfo(
projectId = event.dstProjectId,
repoName = event.dstRepoName,
fullPath = dstFullPath
)
EventType.NODE_DELETED -> {
require(event is NodeDeletedEvent)
val deletedNode = ModifiedNodeInfo(
projectId = event.projectId,
repoName = event.repoName,
fullPath = event.resourceKey,
deleted = true
)
modifiedNodeList.add(createdNode)
modifiedNodeList.add(deletedNode)
}
EventType.NODE_DELETED -> {
require(event is NodeDeletedEvent)
EventType.NODE_CREATED -> {
require(event is NodeCreatedEvent)
val createdNode = ModifiedNodeInfo(
projectId = event.projectId,
repoName = event.repoName,
fullPath = event.resourceKey
)
modifiedNodeList.add(createdNode)
}
EventType.NODE_RENAMED -> {
require(event is NodeRenamedEvent)
// 节点重命名逻辑和其他操作不同,它会对旧节点下的目录删除,然后新建,但是对于非目录节点是进行更新动作,而不是删除再新建
// 节点重命名操作只需要更新该节点下的子目录的统计信息,不需要更新其上层目录统计信息
val renamedNode = ModifiedNodeInfo(
projectId = event.projectId,
repoName = event.repoName,
fullPath = event.newFullPath,
includePrefix = event.newFullPath
)
modifiedNodeList.add(renamedNode)
}
EventType.NODE_MOVED -> {
require(event is NodeMovedEvent)
// 1 move空目录,2 move到已存在目录, 3 move到新目录 4 同路径,跳过 5 src为dst目录下的子节点,跳过
// 针对1 2 两种情况,需要判断原目录中的节点,然后再进行目标目录的统计信息更新
val createdNode = ModifiedNodeInfo(
projectId = event.dstProjectId,
repoName = event.dstRepoName,
fullPath = event.dstFullPath,
srcProjectId = event.projectId,
srcRepoName = event.repoName,
srcFullPath = event.resourceKey,
srcDeleted = true
)
val deletedNode = ModifiedNodeInfo(
projectId = event.projectId,
repoName = event.repoName,
fullPath = event.resourceKey,
deleted = true
)
modifiedNodeList.add(createdNode)
modifiedNodeList.add(deletedNode)
}
EventType.NODE_COPIED -> {
require(event is NodeCopiedEvent)
val dstFullPath = buildDstFullPath(event.dstFullPath, event.resourceKey)
// 1 copy空目录, 2 copy到已存在目录,3 copy到新目录 4 同路径,跳过 5 src为dst目录下的子节点,跳过
// 针对1 2 两种情况,需要判断原目录中的节点,然后再进行目标目录的统计信息更新
val createdNode = ModifiedNodeInfo(
projectId = event.dstProjectId,
repoName = event.dstRepoName,
fullPath = dstFullPath
)
modifiedNodeList.add(createdNode)
}
EventType.NODE_CREATED -> {
require(event is NodeCreatedEvent)
val createdNode = ModifiedNodeInfo(
projectId = event.projectId,
repoName = event.repoName,
fullPath = event.resourceKey
fullPath = event.dstFullPath,
srcProjectId = event.projectId,
srcRepoName = event.repoName,
srcFullPath = event.resourceKey,
)
modifiedNodeList.add(createdNode)
}
Expand Down Expand Up @@ -203,64 +229,83 @@ class NodeModifyEventListener(
logger.info("start to stat modified node size with fullPath ${node.fullPath}" +
" in repo ${node.projectId}|${node.repoName}")
if (node.folder) {
val sourceNodes = filterSourceNodesFromMoveOrCopy(modifiedNode)
logger.info("the size of node ${modifiedNode.srcFullPath} is ${sourceNodes?.size}" +
" in repo ${modifiedNode.srcProjectId}|${modifiedNode.srcRepoName}")
if (sourceNodes != null && sourceNodes.isEmpty()) return
findAndCacheSubFolders(
artifactInfo = artifactInfo,
deleted = node.nodeInfo.deleted,
deletedFlag = modifiedNode.deleted
deletedFlag = modifiedNode.deleted,
includePrefix = modifiedNode.includePrefix,
sourceNodes = sourceNodes
)
} else {
updateCache(
projectId = artifactInfo.projectId,
repoName = artifactInfo.repoName,
fullPath = artifactInfo.getArtifactFullPath(),
size = node.size,
deleted = modifiedNode.deleted
deleted = modifiedNode.deleted,
includePrefix = modifiedNode.includePrefix
)
}
}

/**
* 更新缓存
* 当要更新包含该文件所有的目录的缓存记录时includePrefix为空
* 当只需要更新特定目录前缀目录的缓存记录时设置includePrefix
*/
private fun updateCache(
projectId: String,
repoName: String,
fullPath: String,
size: Long,
deleted: Boolean = false
deleted: Boolean = false,
includePrefix: String? = null
) {

// 更新当前节点所有上级目录统计信息
PathUtils.resolveAncestorFolder(fullPath).forEach{
if (it != PathUtils.ROOT) {
val key = Triple(projectId, repoName, it)
var (cachedSize, nodeNum) = cache.getIfPresent(key) ?: Pair(0L, 0L)
if (deleted) {
cachedSize -= size
nodeNum -= 1
} else {
cachedSize += size
nodeNum += 1
}
cache.put(key, Pair(cachedSize, nodeNum))
val folderPaths = PathUtils.resolveAncestorFolder(fullPath)
folderPaths.forEach { it ->
if (it == PathUtils.ROOT) return@forEach
// 当只需要更新特定目录前缀目录的缓存记录时设置folderPrefix
if (!includePrefix.isNullOrEmpty() && !it.startsWith(includePrefix)) return@forEach
val key = Triple(projectId, repoName, it)
var (cachedSize, nodeNum) = cache.getIfPresent(key) ?: Pair(0L, 0L)
if (deleted) {
cachedSize -= size
nodeNum -= 1
} else {
cachedSize += size
nodeNum += 1
}
cache.put(key, Pair(cachedSize, nodeNum))
}
}

private fun findAndCacheSubFolders(
artifactInfo: ArtifactInfo,
deleted: String? = null,
deletedFlag: Boolean = false
deletedFlag: Boolean = false,
includePrefix: String? = null,
sourceNodes: List<String>? = null
) {
findAllNodesUnderFolder(
artifactInfo.projectId,
artifactInfo.repoName,
artifactInfo.getArtifactFullPath(),
deleted = deleted
).forEach {
if (!sourceNodes.isNullOrEmpty() && !sourceNodes.contains(it.fullPath)) return@forEach
updateCache(
projectId = artifactInfo.projectId,
repoName = artifactInfo.repoName,
fullPath = it.fullPath.getFolderPath(),
fullPath = it.fullPath,
size = it.size,
deleted = deletedFlag
deleted = deletedFlag,
includePrefix = includePrefix
)
}
}
Expand All @@ -278,6 +323,42 @@ class NodeModifyEventListener(
}


/**
* 针对move/copy情况下目标节点是目录的情况下,过滤出变更的节点信息
* 可能情况:1 源节点为空目录 2 目标节点为已存在的目录,其下可能已经包含文件
*/
private fun filterSourceNodesFromMoveOrCopy(modifiedNode: ModifiedNodeInfo): List<String>? {
if (modifiedNode.srcFullPath.isNullOrEmpty()) return null
val artifactInfo = ArtifactInfo(
projectId = modifiedNode.srcProjectId!!,
repoName = modifiedNode.srcRepoName!!,
artifactUri = modifiedNode.srcFullPath!!
)
val sourceNodes = mutableListOf<String>()
val node = if (modifiedNode.srcDeleted) {
nodeService.getDeletedNodeDetail(artifactInfo).firstOrNull() ?: return emptyList()
} else {
// 查询节点信息,当节点新增,然后删除后可能会找不到节点
nodeService.getNodeDetail(artifactInfo)
?: nodeService.getDeletedNodeDetail(artifactInfo).firstOrNull() ?: return emptyList()
}
val path = PathUtils.resolveParent(modifiedNode.srcFullPath!!)
if (node.folder) {
findAllNodesUnderFolder(
artifactInfo.projectId,
artifactInfo.repoName,
artifactInfo.getArtifactFullPath(),
node.nodeInfo.deleted
).map {
sourceNodes.add(combineFullPath(modifiedNode.fullPath, it.fullPath.removePrefix(path)))
}
} else {
sourceNodes.add(combineFullPath(modifiedNode.fullPath, node.fullPath.removePrefix(path)))
}
return sourceNodes
}


/**
* 查询目录下的节点,排除path为"/"的节点
*/
Expand All @@ -303,28 +384,23 @@ class NodeModifyEventListener(
return Query(criteria).withHint(TNode.FULL_PATH_IDX)
}


private fun buildDstFullPath(dstFullPath: String, srcFullPath: String): String {
val path = PathUtils.toPath(dstFullPath)
val name = PathUtils.resolveName(srcFullPath)
return PathUtils.combineFullPath(path, name)
}

private fun String.getFolderPath(): String {
val path = PathUtils.resolveParent(this)
return PathUtils.normalizeFullPath(path)
}

private data class ModifiedNodeInfo(
var projectId: String,
var repoName: String,
var fullPath: String,
var deleted: Boolean = false
var deleted: Boolean = false,
// 针对重命名去过滤上层目录
var includePrefix: String? = null,
// 针对move/copy 目标节点是目录的情况下去判断来源节点信息
var srcProjectId: String? = null,
var srcRepoName: String? = null,
var srcFullPath: String? = null,
var srcDeleted: Boolean = false
)

companion object {
private val logger = LoggerFactory.getLogger(NodeModifyEventListener::class.java)
private const val FIXED_DELAY = 30000L
private const val FIXED_DELAY = 10000L
private val IGNORE_PROJECT_PREFIX_LIST = listOf("CODE_", "CLOSED_SOURCE_", "git_")
private val IGNORE_REPO_LIST = listOf(REPORT, LOG)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,17 +199,17 @@ abstract class NodeBaseService(
deletedTime: LocalDateTime?
) {
with(node) {
if (isGenericRepo(repo)) {
publishEvent(buildCreatedEvent(node))
}
reportNode2Bkbase(node)
val createEnd = System.currentTimeMillis()
val timeout = createEnd - createStart > repositoryProperties.nodeCreateTimeout
if (timeout) {
logger.info("Create node[$fullPath] timeout")
rollbackCreate(parents, node, deletedTime)
throw ErrorCodeException(ArtifactMessageCode.NODE_CREATE_TIMEOUT, fullPath)
}
if (isGenericRepo(repo)) {
publishEvent(buildCreatedEvent(node))
}
reportNode2Bkbase(node)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,13 +73,11 @@ open class NodeMoveCopySupport(

override fun moveNode(moveRequest: NodeMoveCopyRequest) {
moveCopy(moveRequest, true)
publishEvent(NodeEventFactory.buildMovedEvent(moveRequest))
logger.info("Move node success: [$moveRequest]")
}

override fun copyNode(copyRequest: NodeMoveCopyRequest) {
moveCopy(copyRequest, false)
publishEvent(NodeEventFactory.buildCopiedEvent(copyRequest))
logger.info("Copy node success: [$copyRequest]")
}

Expand All @@ -97,6 +95,11 @@ open class NodeMoveCopySupport(
} else {
moveCopyFile(this)
}
if (move) {
publishEvent(NodeEventFactory.buildMovedEvent(request))
} else {
publishEvent(NodeEventFactory.buildCopiedEvent(request))
}
}
}

Expand Down Expand Up @@ -184,6 +187,8 @@ open class NodeMoveCopySupport(
path = dstPath,
name = dstName,
fullPath = dstFullPath,
size = if (node.folder) 0 else node.size,
nodeNum = if (node.folder) null else node.nodeNum,
lastModifiedBy = operator,
lastModifiedDate = LocalDateTime.now()
)
Expand Down
Loading