Skip to content

Commit

Permalink
feat: 优化Archive服务压缩性能 TencentBlueKing#1688
Browse files Browse the repository at this point in the history
* impr: 优化系统gc TencentBlueKing#1648

* impr: 优化Archive服务压缩性能 TencentBlueKing#1688

* fix: 修复gc节点为空和高峰时微服务请求报错的情况 TencentBlueKing#1688

* fix: 修复在压缩处理异常时,任务订阅被取消的场景 TencentBlueKing#1648

* fix: 修复在压缩处理异常时,任务订阅被取消的场景 TencentBlueKing#1648

* feat: 添加修改状态接口 TencentBlueKing#1648

* feat: 调整签名文件清理周期 TencentBlueKing#1648
  • Loading branch information
felixncheng authored Jan 23, 2024
1 parent 87333c7 commit 6c5a32c
Show file tree
Hide file tree
Showing 30 changed files with 1,013 additions and 194 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,4 @@ package com.tencent.bkrepo.archive.constant
const val XZ_SUFFIX = ".xz"
const val DEEP_ARCHIVE = "DEEP_ARCHIVE"
const val DEFAULT_KEY = "default"
const val MAX_CHAIN_LENGTH = 10
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,5 @@ data class CompressFileRequest(
val baseSha256: String,
val baseSize: Long,
val storageCredentialsKey: String?,
val sync: Boolean = false,
val operator: String = SYSTEM_USER,
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package com.tencent.bkrepo.archive.request

import com.tencent.bkrepo.archive.CompressStatus
import com.tencent.bkrepo.repository.constant.SYSTEM_USER

data class UpdateCompressFileStatusRequest(
val sha256: String,
val storageCredentialsKey: String?,
val status: CompressStatus,
val operator: String = SYSTEM_USER,
)
4 changes: 4 additions & 0 deletions src/backend/archive/biz-archive/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,8 @@ dependencies {
api(project(":common:common-mongo"))
api(project(":common:common-mongo-reactive"))
implementation("io.micrometer:micrometer-registry-prometheus")
testImplementation("org.springframework.boot:spring-boot-starter-test")
testImplementation("de.flapdoodle.embed:de.flapdoodle.embed.mongo")
testImplementation("org.mockito.kotlin:mockito-kotlin")
testImplementation("io.mockk:mockk")
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,4 +50,10 @@ data class ArchiveProperties(
* 恢复数量限制
* */
var restoreLimit: Int = 1000,

/**
* gc 压缩相关配置
* */
@NestedConfigurationProperty
val compress: CompressProperties = CompressProperties(),
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package com.tencent.bkrepo.archive.config

import java.time.Duration

data class CompressProperties(
var signThreads: Int = 1, // 文件签名:CPU IO
var ioThreads: Int = 1, // 文件下载:网络 IO
var diffThreads: Int = 1, // 文件差分:CPU 内存 IO
var patchThreads: Int = 1, // 文件合并:IO
var ratio: Float = 0.5f, // 重复率阈值
var signFileCacheTime: Duration = Duration.ofHours(6), // 签名文件缓存事件
)
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
package com.tencent.bkrepo.archive.controller.user

import com.tencent.bkrepo.archive.request.UpdateCompressFileStatusRequest
import com.tencent.bkrepo.archive.service.CompressService
import com.tencent.bkrepo.archive.service.SystemAdminService
import com.tencent.bkrepo.common.security.permission.Principal
import com.tencent.bkrepo.common.security.permission.PrincipalType
import org.springframework.web.bind.annotation.PostMapping
import org.springframework.web.bind.annotation.PutMapping
import org.springframework.web.bind.annotation.RequestBody
import org.springframework.web.bind.annotation.RequestMapping
import org.springframework.web.bind.annotation.RequestParam
import org.springframework.web.bind.annotation.RestController
Expand All @@ -13,9 +17,15 @@ import org.springframework.web.bind.annotation.RestController
@RequestMapping("/api/archive/admin")
class SystemAdminController(
private val systemAdminService: SystemAdminService,
private val compressService: CompressService,
) {
@PutMapping("/stop")
fun stop(@RequestParam jobName: String) {
systemAdminService.stop(jobName)
}

@PostMapping("/compress/update")
fun updateCompressFileStatus(@RequestBody request: UpdateCompressFileStatusRequest) {
compressService.updateStatus(request)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package com.tencent.bkrepo.archive.job

import com.tencent.bkrepo.common.artifact.stream.Range
import com.tencent.bkrepo.common.storage.credentials.StorageCredentials
import reactor.core.publisher.Mono
import java.io.File

interface FileProvider {
fun get(sha256: String, range: Range, storageCredentials: StorageCredentials): Mono<File>
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package com.tencent.bkrepo.archive.job

import com.tencent.bkrepo.common.artifact.stream.Range
import com.tencent.bkrepo.common.storage.credentials.StorageCredentials
import com.tencent.bkrepo.common.storage.innercos.retry
import com.tencent.bkrepo.common.storage.monitor.Throughput
import com.tencent.bkrepo.common.storage.util.StorageUtils
import org.slf4j.LoggerFactory
import reactor.core.publisher.Mono
import reactor.core.scheduler.Schedulers
import java.io.File
import java.nio.file.Files
import java.nio.file.Path
import java.util.concurrent.Executor
import kotlin.system.measureNanoTime

class FileStorageFileProvider(
private val fileDir: Path,
private val diskFreeThreshold: Long,
private val executor: Executor,
) : FileProvider {

override fun get(sha256: String, range: Range, storageCredentials: StorageCredentials): Mono<File> {
val filePath = fileDir.resolve(sha256)
if (Files.exists(filePath)) {
return Mono.just(filePath.toFile())
}
return Mono.fromCallable {
logger.info("Downloading $sha256 on ${storageCredentials.key}")
if (!Files.exists(filePath)) {
download(sha256, range, storageCredentials, filePath)
}
val file = filePath.toFile()
if (range != Range.FULL_RANGE) {
check(range.length == file.length())
}
file
}.publishOn(Schedulers.fromExecutor(executor))
}

private fun download(sha256: String, range: Range, storageCredentials: StorageCredentials, filePath: Path) {
retry(RETRY_TIMES) {
checkDiskSpace()
val nanos = measureNanoTime {
StorageUtils.downloadUseLocalPath(sha256, range, storageCredentials, filePath)
}
val throughput = Throughput(Files.size(filePath), nanos)
logger.info("Success to download file [$sha256] on ${storageCredentials.key}, $throughput.")
}
}

private fun checkDiskSpace() {
var diskFreeInBytes = fileDir.toFile().usableSpace
while (diskFreeInBytes < diskFreeThreshold) {
logger.info(
"DFree disk space below threshold.Available:" +
" $diskFreeInBytes bytes (threshold: $diskFreeThreshold).",
)
Thread.sleep(CHECK_INTERVAL)
diskFreeInBytes = fileDir.toFile().usableSpace
}
}

companion object {
private val logger = LoggerFactory.getLogger(FileStorageFileProvider::class.java)
private const val RETRY_TIMES = 3
private const val CHECK_INTERVAL = 60000L
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package com.tencent.bkrepo.archive.job.compress

import com.tencent.bkrepo.common.bksync.file.BDUtils
import com.tencent.bkrepo.common.storage.monitor.Throughput
import org.slf4j.LoggerFactory
import reactor.core.publisher.Mono
import reactor.core.scheduler.Schedulers
import java.io.File
import java.nio.file.Path
import java.util.concurrent.Executor

class BDCompressor(
private val ratio: Float,
private val executor: Executor,
) {

/**
* 根据源文件和签名文件,压缩成新的bd文件
* */
fun compress(
srcFile: Mono<File>,
checksumFile: Mono<File>,
srcKey: String,
destKey: String,
workDir: Path,
): Mono<File> {
return Mono.zip(checksumFile, srcFile) { checksum, src ->
compress(src, checksum, srcKey, destKey, workDir)
}.flatMap { it }
}

private fun compress(src: File, checksum: File, srcKey: String, destKey: String, workDir: Path): Mono<File> {
return Mono.fromCallable {
try {
val start = System.nanoTime()
val file = BDUtils.deltaByChecksumFile(src, checksum, srcKey, destKey, workDir, ratio)
val nanos = System.nanoTime() - start
val throughput = Throughput(nanos, file.length())
logger.info("Success to bd compress $srcKey,$throughput.")
file
} finally {
src.delete()
}
}.publishOn(Schedulers.fromExecutor(executor))
}

companion object {
private val logger = LoggerFactory.getLogger(BDCompressor::class.java)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package com.tencent.bkrepo.archive.job.compress

import com.tencent.bkrepo.common.bksync.file.BDUtils
import com.tencent.bkrepo.common.storage.monitor.Throughput
import org.slf4j.LoggerFactory
import reactor.core.publisher.Mono
import reactor.core.scheduler.Schedulers
import java.io.File
import java.nio.file.Path
import java.util.concurrent.Executor

class BDUncompressor(
private val executor: Executor,
) {

/**
* 根据源文件和签名文件,压缩成新的bd文件
* */
fun patch(bdFile: Mono<File>, baseFile: Mono<File>, sha256: String, workDir: Path): Mono<File> {
return Mono.zip(bdFile, baseFile) { bd, bsf ->
uncompress(bd, bsf, sha256, workDir)
}.flatMap { it }
}

private fun uncompress(bdFile: File, baseFile: File, sha256: String, workDir: Path): Mono<File> {
return Mono.fromCallable {
try {
val start = System.nanoTime()
val file = BDUtils.patch(bdFile, baseFile, workDir)
val nanos = System.nanoTime() - start
val throughput = Throughput(nanos, file.length())
logger.info("Success to bd uncompress $sha256,$throughput.")
file
} finally {
bdFile.delete()
baseFile.delete()
}
}.publishOn(Schedulers.fromExecutor(executor))
}

companion object {
private val logger = LoggerFactory.getLogger(BDUncompressor::class.java)
}
}
Loading

0 comments on commit 6c5a32c

Please sign in to comment.