Skip to content

Commit

Permalink
feat: 增加制品分析任务执行心跳 TencentBlueKing#985 (TencentBlueKing#1058)
Browse files Browse the repository at this point in the history
* feat: 增加制品分析任务执行心跳 TencentBlueKing#985

* feat: 增加制品分析任务执行心跳 TencentBlueKing#985

* feat: 增加制品分析任务执行心跳 TencentBlueKing#985

* feat: 增加制品分析任务执行心跳 TencentBlueKing#985

* feat: 增加制品分析任务执行心跳 TencentBlueKing#985

* feat: 增加制品分析任务执行心跳 TencentBlueKing#985

* feat: 增加制品分析任务执行心跳 TencentBlueKing#985

* feat: 增加制品分析任务执行心跳 TencentBlueKing#985
  • Loading branch information
cnlkl authored Aug 22, 2023
1 parent 34d6130 commit 9f78cbc
Show file tree
Hide file tree
Showing 15 changed files with 170 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,11 @@ class ExecutorScheduler @Autowired constructor(
scanClient.updateSubScanTaskStatus(subtask.taskId, SubScanTaskStatus.EXECUTING.name)

executingSubtaskExecutorMap[subtask.taskId] = scanExecutorFactory.get(subtask.scanner.type)
logger.info("task start, executing task count ${executingSubtaskExecutorMap.size}")
val executingCount = executingSubtaskExecutorMap.size
logger.info("task start, executing task count $executingCount")
executor.execute {
try {
startHeartbeat(subtask.taskId, executingCount)
doScan(subtask)
} finally {
executingSubtaskExecutorMap.remove(subtask.taskId)
Expand All @@ -97,6 +99,21 @@ class ExecutorScheduler @Autowired constructor(
return executingSubtaskExecutorMap.containsKey(taskId)
}

/**
* 开始发送任务心跳到制品分析服务
*/
private fun startHeartbeat(subtaskId: String, executingCount: Int) {
if (scannerExecutorProperties.heartbeatInterval.seconds > 0) {
val runnable = SubtaskHeartbeatRunnable(
this,
scannerExecutorProperties.heartbeatInterval,
scanClient,
subtaskId
)
Thread(runnable, "subtask-heartbeat-$executingCount").start()
}
}

private fun pullSubtaskAtFixedRate() {
val runnable = {
try {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Tencent is pleased to support the open source community by making BK-CI 蓝鲸持续集成平台 available.
*
* Copyright (C) 2023 THL A29 Limited, a Tencent company. All rights reserved.
*
* BK-CI 蓝鲸持续集成平台 is licensed under the MIT license.
*
* A copy of the MIT License is included in this file.
*
*
* Terms of the MIT License:
* ---------------------------------------------------
* Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated
* documentation files (the "Software"), to deal in the Software without restriction, including without limitation the
* rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to
* permit persons to whom the Software is furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all copies or substantial portions of
* the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
* LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
* NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
* SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/

package com.tencent.bkrepo.analysis.executor

import com.tencent.bkrepo.analyst.api.ScanClient
import org.slf4j.LoggerFactory
import java.time.Duration

class SubtaskHeartbeatRunnable(
private val executorScheduler: ExecutorScheduler,
private val heartbeatInterval: Duration,
private val scanClient: ScanClient,
private val subtaskId: String
): Runnable {
override fun run() {
while (executorScheduler.scanning(subtaskId)) {
try {
scanClient.heartbeat(subtaskId)
Thread.sleep(heartbeatInterval.toMillis())
} catch (ignore: Exception) {
logger.warn("subtask[$subtaskId] heartbeat failed", ignore)
}
}
}

companion object {
private val logger = LoggerFactory.getLogger(SubtaskHeartbeatRunnable::class.java)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ package com.tencent.bkrepo.analysis.executor.configuration

import org.springframework.boot.context.properties.ConfigurationProperties
import org.springframework.util.unit.DataSize
import java.time.Duration

@ConfigurationProperties("scanner.executor")
data class ScannerExecutorProperties(
Expand Down Expand Up @@ -63,5 +64,9 @@ data class ScannerExecutorProperties(
/**
* 是否输出分析工具容器执行日志
*/
var showContainerLogs: Boolean = true
var showContainerLogs: Boolean = true,
/**
* 子任务心跳间隔,为0时不上报心跳
*/
var heartbeatInterval: Duration = Duration.ofSeconds(0)
)
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,14 @@ interface ScanClient {
@RequestParam status: String
): Response<Boolean>

/**
* 维持任务心跳
*/
@PostMapping("/subtask/{subtaskId}/heartbeat")
fun heartbeat(
@PathVariable("subtaskId") subtaskId: String,
): Response<Boolean>

/**
* 根据许可id列表查询许可详细信息
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@ data class ScannerProperties(
* 为0时表示任务将不会因为阻塞而超时
*/
var blockTimeout: Duration = Duration.ofSeconds(DEFAULT_TASK_EXECUTE_TIMEOUT_SECONDS),
/**
* 任务心跳超时时间,当任务超过这个时间未上报状态时将会触发超时, 0表示不检查任务心跳
*/
var heartbeatTimeout: Duration = Duration.ofMinutes(0),
/**
* 任务最长执行时间,超过后将不再重试而是直接转为超时状态
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,11 @@ class ScanController @Autowired constructor(
return ResponseBuilder.success(scanService.updateSubScanTaskStatus(subScanTaskId, status))
}

override fun heartbeat(subtaskId: String): Response<Boolean> {
scanService.heartbeat(subtaskId)
return ResponseBuilder.success()
}

override fun licenseInfoByIds(licenseIds: List<String>): Response<Map<String, SpdxLicenseInfo>> {
return ResponseBuilder.success(licenseService.listLicenseByIds(licenseIds))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,4 +95,15 @@ class UserTemporaryScanController(
temporaryScanTokenService.checkToken(subtaskId, token)
return ResponseBuilder.success(scanService.updateSubScanTaskStatus(subtaskId, status))
}

@ApiOperation("维持任务心跳")
@PostMapping("/scan/subtask/{subtaskId}/heartbeat")
fun heartbeat(
@PathVariable subtaskId: String,
@RequestParam token: String
): Response<Void> {
temporaryScanTokenService.checkToken(subtaskId, token)
scanService.heartbeat(subtaskId)
return ResponseBuilder.success()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,9 @@ class SubScanTaskDao(
} else {
update.unset(TSubScanTask::timeoutDateTime.name)
}
if (status == EXECUTING || status == PULLED) {
update.set(TSubScanTask::heartbeatDateTime.name, now)
}

val updateResult = updateFirst(query, update)
if (updateResult.modifiedCount == 1L) {
Expand All @@ -136,6 +139,12 @@ class SubScanTaskDao(
return updateResult
}

fun heartbeat(subtaskId: String): UpdateResult {
val criteria = Criteria.where(ID).isEqualTo(subtaskId).and(TSubScanTask::status.name).`in`(PULLED, EXECUTING)
val update = Update.update(TSubScanTask::heartbeatDateTime.name, LocalDateTime.now())
return updateFirst(Query(criteria), update)
}

fun incExecutedTimes(subTaskId: String): UpdateResult {
val criteria = Criteria.where(ID).isEqualTo(subTaskId)
val update = Update()
Expand Down Expand Up @@ -229,22 +238,21 @@ class SubScanTaskDao(
/**
* 获取一个执行超时的任务
*
* @param timeoutSeconds 允许执行的最长时间
* @param heartbeatTimeoutSeconds 心跳超时时间
*/
fun firstTimeoutTask(timeoutSeconds: Long, dispatcher: String?): TSubScanTask? {
fun firstTimeoutTask(heartbeatTimeoutSeconds: Long, dispatcher: String?): TSubScanTask? {
val now = LocalDateTime.now()

val lastModifiedCriteria = Criteria
.where(TSubScanTask::lastModifiedDate.name).lt(now.minusSeconds(timeoutSeconds))
.and(TSubScanTask::timeoutDateTime.name).exists(false)

val timeoutCriteria = Criteria().orOperator(
TSubScanTask::timeoutDateTime.lt(now),
lastModifiedCriteria
)
val timeoutCriteria = ArrayList<Criteria>()
timeoutCriteria.add(TSubScanTask::timeoutDateTime.lt(now))
if(heartbeatTimeoutSeconds > 0) {
val heartbeatTimeoutCriteria = Criteria
.where(TSubScanTask::heartbeatDateTime.name).lt(now.minusSeconds(heartbeatTimeoutSeconds))
timeoutCriteria.add(heartbeatTimeoutCriteria)
}

val criteria = Criteria().andOperator(
timeoutCriteria,
Criteria().orOperator(timeoutCriteria),
TSubScanTask::status.inValues(PULLED.name, EXECUTING.name),
dispatcherCriteria(dispatcher)
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,11 @@ import io.kubernetes.client.openapi.ApiException
import io.kubernetes.client.util.ClientBuilder
import io.kubernetes.client.util.Config
import io.kubernetes.client.util.credentials.AccessTokenAuthentication
import java.time.Duration

fun buildCommand(cmd: String, baseUrl: String, subtaskId: String, token: String, ): List<String> {
fun buildCommand(
cmd: String, baseUrl: String, subtaskId: String, token: String, heartbeatTimeout: Duration
): List<String> {
val command = ArrayList<String>()
command.addAll(cmd.split(" "))
command.add("--url")
Expand All @@ -43,6 +46,8 @@ fun buildCommand(cmd: String, baseUrl: String, subtaskId: String, token: String,
command.add(token)
command.add("--task-id")
command.add(subtaskId)
command.add("--heartbeat")
command.add((heartbeatTimeout.seconds / 2L).toString())
return command
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,13 @@ class DockerDispatcher(
val scanner = subtask.scanner
require(scanner is StandardScanner)
try {
val command = buildCommand(scanner.cmd, scannerProperties.baseUrl, subtask.taskId, subtask.token!!)
val command = buildCommand(
cmd = scanner.cmd,
baseUrl = scannerProperties.baseUrl,
subtaskId = subtask.taskId,
token = subtask.token!!,
heartbeatTimeout = scannerProperties.heartbeatTimeout
)
val containerId = dockerClient.createContainer(
image = scanner.image, hostConfig = hostConfig(), cmd = command
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,9 @@ class KubernetesDeploymentDispatcher(
command.add(executionCluster.name)
command.add("--pull-retry")
command.add(executionCluster.pullRetry.toString())
command.add("--keep-running")
command.add("--heartbeat")
command.add((scannerProperties.heartbeatTimeout.seconds / 2L).toString())
return command
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,13 @@ class KubernetesDispatcher(
require(scanner is StandardScanner)
val jobName = jobName(subtask)
val containerImage = scanner.image
val cmd = buildCommand(scanner.cmd, scannerProperties.baseUrl, subtask.taskId, subtask.token!!)
val cmd = buildCommand(
cmd = scanner.cmd,
baseUrl = scannerProperties.baseUrl,
subtaskId = subtask.taskId,
token = subtask.token!!,
heartbeatTimeout = scannerProperties.heartbeatTimeout
)
val requestStorageSize = maxStorageSize(subtask.packageSize)
val jobActiveDeadlineSeconds = subtask.scanner.maxScanDuration(subtask.packageSize)
val k8sProps = executionCluster.kubernetesProperties
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,11 @@ import java.time.LocalDateTime
def = "{'projectId': 1, 'status': 1}",
background = true
),
CompoundIndex(
name = "status_metadata_idx",
def = "{'status': 1, 'metadata.key': 1, 'metadata.value': 1}",
background = true
),
CompoundIndex(
name = "parentScanTaskId_idx",
def = "{'parentScanTaskId': 1}",
Expand All @@ -76,7 +81,10 @@ class TSubScanTask(
* 执行超时时间点
*/
val timeoutDateTime: LocalDateTime? = null,

/**
* 任务上次心跳时间
*/
val heartbeatDateTime: LocalDateTime? = null,
triggerType: String? = null,
parentScanTaskId: String,
planId: String?,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,11 @@ interface ScanService {
*/
fun updateSubScanTaskStatus(subScanTaskId: String, subScanTaskStatus: String): Boolean

/**
* 记录任务心跳
*/
fun heartbeat(subScanTaskId: String)

/**
* 拉取子任务
* @param dispatcher 指定子任务分发器
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ class ScanServiceImpl @Autowired constructor(

override fun peek(dispatcher: String?): SubScanTask? {
val subtask = subScanTaskDao.firstTaskByStatusIn(listOf(SubScanTaskStatus.CREATED.name), dispatcher)
?: subScanTaskDao.firstTimeoutTask(DEFAULT_TASK_EXECUTE_TIMEOUT_SECONDS, dispatcher)
?: subScanTaskDao.firstTimeoutTask(scannerProperties.heartbeatTimeout.seconds, dispatcher)
return subtask?.let { SubtaskConverter.convert(it, scannerService.get(it.scanner)) }
}

Expand All @@ -215,6 +215,12 @@ class ScanServiceImpl @Autowired constructor(
return false
}

override fun heartbeat(subScanTaskId: String) {
if (subScanTaskDao.heartbeat(subScanTaskId).modifiedCount == 0L) {
throw NotFoundException(CommonMessageCode.RESOURCE_NOT_FOUND, subScanTaskId)
}
}

override fun get(subtaskId: String): SubScanTask {
return subScanTaskDao.findById(subtaskId)?.let {
SubtaskConverter.convert(it, scannerService.get(it.scanner))
Expand Down Expand Up @@ -248,7 +254,7 @@ class ScanServiceImpl @Autowired constructor(
while (true) {
// 优先返回待执行任务,再返回超时任务
val task = subScanTaskDao.firstTaskByStatusIn(listOf(SubScanTaskStatus.CREATED.name), dispatcher)
?: subScanTaskDao.firstTimeoutTask(DEFAULT_TASK_EXECUTE_TIMEOUT_SECONDS, dispatcher)
?: subScanTaskDao.firstTimeoutTask(scannerProperties.heartbeatTimeout.seconds, dispatcher)
?: return null

// 处于执行中的任务,而且任务执行了最大允许的次数,直接设置为失败
Expand Down

0 comments on commit 9f78cbc

Please sign in to comment.