diff --git a/src/backend/archive/api-archive/src/main/kotlin/com/tencent/bkrepo/archive/constant/ArchiveConstants.kt b/src/backend/archive/api-archive/src/main/kotlin/com/tencent/bkrepo/archive/constant/ArchiveConstants.kt index 3cadf7e360..c8c098c9ab 100644 --- a/src/backend/archive/api-archive/src/main/kotlin/com/tencent/bkrepo/archive/constant/ArchiveConstants.kt +++ b/src/backend/archive/api-archive/src/main/kotlin/com/tencent/bkrepo/archive/constant/ArchiveConstants.kt @@ -3,3 +3,4 @@ package com.tencent.bkrepo.archive.constant const val XZ_SUFFIX = ".xz" const val DEEP_ARCHIVE = "DEEP_ARCHIVE" const val DEFAULT_KEY = "default" +const val MAX_CHAIN_LENGTH = 10 diff --git a/src/backend/archive/api-archive/src/main/kotlin/com/tencent/bkrepo/archive/request/CompressFileRequest.kt b/src/backend/archive/api-archive/src/main/kotlin/com/tencent/bkrepo/archive/request/CompressFileRequest.kt index 1a1051e1a6..493049cdf9 100644 --- a/src/backend/archive/api-archive/src/main/kotlin/com/tencent/bkrepo/archive/request/CompressFileRequest.kt +++ b/src/backend/archive/api-archive/src/main/kotlin/com/tencent/bkrepo/archive/request/CompressFileRequest.kt @@ -8,6 +8,5 @@ data class CompressFileRequest( val baseSha256: String, val baseSize: Long, val storageCredentialsKey: String?, - val sync: Boolean = false, val operator: String = SYSTEM_USER, ) diff --git a/src/backend/archive/api-archive/src/main/kotlin/com/tencent/bkrepo/archive/request/UpdateCompressFileStatusRequest.kt b/src/backend/archive/api-archive/src/main/kotlin/com/tencent/bkrepo/archive/request/UpdateCompressFileStatusRequest.kt new file mode 100644 index 0000000000..5e273b6211 --- /dev/null +++ b/src/backend/archive/api-archive/src/main/kotlin/com/tencent/bkrepo/archive/request/UpdateCompressFileStatusRequest.kt @@ -0,0 +1,11 @@ +package com.tencent.bkrepo.archive.request + +import com.tencent.bkrepo.archive.CompressStatus +import com.tencent.bkrepo.repository.constant.SYSTEM_USER + +data class UpdateCompressFileStatusRequest( + val sha256: String, + val storageCredentialsKey: String?, + val status: CompressStatus, + val operator: String = SYSTEM_USER, +) diff --git a/src/backend/archive/biz-archive/build.gradle.kts b/src/backend/archive/biz-archive/build.gradle.kts index 60b5367f22..5ccaefce77 100644 --- a/src/backend/archive/biz-archive/build.gradle.kts +++ b/src/backend/archive/biz-archive/build.gradle.kts @@ -7,4 +7,8 @@ dependencies { api(project(":common:common-mongo")) api(project(":common:common-mongo-reactive")) implementation("io.micrometer:micrometer-registry-prometheus") + testImplementation("org.springframework.boot:spring-boot-starter-test") + testImplementation("de.flapdoodle.embed:de.flapdoodle.embed.mongo") + testImplementation("org.mockito.kotlin:mockito-kotlin") + testImplementation("io.mockk:mockk") } \ No newline at end of file diff --git a/src/backend/archive/biz-archive/src/main/kotlin/com/tencent/bkrepo/archive/config/ArchiveProperties.kt b/src/backend/archive/biz-archive/src/main/kotlin/com/tencent/bkrepo/archive/config/ArchiveProperties.kt index d77d8282db..55ebe6feb4 100644 --- a/src/backend/archive/biz-archive/src/main/kotlin/com/tencent/bkrepo/archive/config/ArchiveProperties.kt +++ b/src/backend/archive/biz-archive/src/main/kotlin/com/tencent/bkrepo/archive/config/ArchiveProperties.kt @@ -50,4 +50,10 @@ data class ArchiveProperties( * 恢复数量限制 * */ var restoreLimit: Int = 1000, + + /** + * gc 压缩相关配置 + * */ + @NestedConfigurationProperty + val compress: CompressProperties = CompressProperties(), ) diff --git a/src/backend/archive/biz-archive/src/main/kotlin/com/tencent/bkrepo/archive/config/CompressProperties.kt b/src/backend/archive/biz-archive/src/main/kotlin/com/tencent/bkrepo/archive/config/CompressProperties.kt new file mode 100644 index 0000000000..47e4f1a53a --- /dev/null +++ b/src/backend/archive/biz-archive/src/main/kotlin/com/tencent/bkrepo/archive/config/CompressProperties.kt @@ -0,0 +1,12 @@ +package com.tencent.bkrepo.archive.config + +import java.time.Duration + +data class CompressProperties( + var signThreads: Int = 1, // 文件签名:CPU IO + var ioThreads: Int = 1, // 文件下载:网络 IO + var diffThreads: Int = 1, // 文件差分:CPU 内存 IO + var patchThreads: Int = 1, // 文件合并:IO + var ratio: Float = 0.5f, // 重复率阈值 + var signFileCacheTime: Duration = Duration.ofHours(6), // 签名文件缓存事件 +) diff --git a/src/backend/archive/biz-archive/src/main/kotlin/com/tencent/bkrepo/archive/controller/user/SystemAdminController.kt b/src/backend/archive/biz-archive/src/main/kotlin/com/tencent/bkrepo/archive/controller/user/SystemAdminController.kt index d0c90d4b28..22b575d9bb 100644 --- a/src/backend/archive/biz-archive/src/main/kotlin/com/tencent/bkrepo/archive/controller/user/SystemAdminController.kt +++ b/src/backend/archive/biz-archive/src/main/kotlin/com/tencent/bkrepo/archive/controller/user/SystemAdminController.kt @@ -1,9 +1,13 @@ package com.tencent.bkrepo.archive.controller.user +import com.tencent.bkrepo.archive.request.UpdateCompressFileStatusRequest +import com.tencent.bkrepo.archive.service.CompressService import com.tencent.bkrepo.archive.service.SystemAdminService import com.tencent.bkrepo.common.security.permission.Principal import com.tencent.bkrepo.common.security.permission.PrincipalType +import org.springframework.web.bind.annotation.PostMapping import org.springframework.web.bind.annotation.PutMapping +import org.springframework.web.bind.annotation.RequestBody import org.springframework.web.bind.annotation.RequestMapping import org.springframework.web.bind.annotation.RequestParam import org.springframework.web.bind.annotation.RestController @@ -13,9 +17,15 @@ import org.springframework.web.bind.annotation.RestController @RequestMapping("/api/archive/admin") class SystemAdminController( private val systemAdminService: SystemAdminService, + private val compressService: CompressService, ) { @PutMapping("/stop") fun stop(@RequestParam jobName: String) { systemAdminService.stop(jobName) } + + @PostMapping("/compress/update") + fun updateCompressFileStatus(@RequestBody request: UpdateCompressFileStatusRequest) { + compressService.updateStatus(request) + } } diff --git a/src/backend/archive/biz-archive/src/main/kotlin/com/tencent/bkrepo/archive/job/FileProvider.kt b/src/backend/archive/biz-archive/src/main/kotlin/com/tencent/bkrepo/archive/job/FileProvider.kt new file mode 100644 index 0000000000..33faea37c2 --- /dev/null +++ b/src/backend/archive/biz-archive/src/main/kotlin/com/tencent/bkrepo/archive/job/FileProvider.kt @@ -0,0 +1,10 @@ +package com.tencent.bkrepo.archive.job + +import com.tencent.bkrepo.common.artifact.stream.Range +import com.tencent.bkrepo.common.storage.credentials.StorageCredentials +import reactor.core.publisher.Mono +import java.io.File + +interface FileProvider { + fun get(sha256: String, range: Range, storageCredentials: StorageCredentials): Mono +} diff --git a/src/backend/archive/biz-archive/src/main/kotlin/com/tencent/bkrepo/archive/job/FileStorageFileProvider.kt b/src/backend/archive/biz-archive/src/main/kotlin/com/tencent/bkrepo/archive/job/FileStorageFileProvider.kt new file mode 100644 index 0000000000..edc77ec9c9 --- /dev/null +++ b/src/backend/archive/biz-archive/src/main/kotlin/com/tencent/bkrepo/archive/job/FileStorageFileProvider.kt @@ -0,0 +1,69 @@ +package com.tencent.bkrepo.archive.job + +import com.tencent.bkrepo.common.artifact.stream.Range +import com.tencent.bkrepo.common.storage.credentials.StorageCredentials +import com.tencent.bkrepo.common.storage.innercos.retry +import com.tencent.bkrepo.common.storage.monitor.Throughput +import com.tencent.bkrepo.common.storage.util.StorageUtils +import org.slf4j.LoggerFactory +import reactor.core.publisher.Mono +import reactor.core.scheduler.Schedulers +import java.io.File +import java.nio.file.Files +import java.nio.file.Path +import java.util.concurrent.Executor +import kotlin.system.measureNanoTime + +class FileStorageFileProvider( + private val fileDir: Path, + private val diskFreeThreshold: Long, + private val executor: Executor, +) : FileProvider { + + override fun get(sha256: String, range: Range, storageCredentials: StorageCredentials): Mono { + val filePath = fileDir.resolve(sha256) + if (Files.exists(filePath)) { + return Mono.just(filePath.toFile()) + } + return Mono.fromCallable { + logger.info("Downloading $sha256 on ${storageCredentials.key}") + if (!Files.exists(filePath)) { + download(sha256, range, storageCredentials, filePath) + } + val file = filePath.toFile() + if (range != Range.FULL_RANGE) { + check(range.length == file.length()) + } + file + }.publishOn(Schedulers.fromExecutor(executor)) + } + + private fun download(sha256: String, range: Range, storageCredentials: StorageCredentials, filePath: Path) { + retry(RETRY_TIMES) { + checkDiskSpace() + val nanos = measureNanoTime { + StorageUtils.downloadUseLocalPath(sha256, range, storageCredentials, filePath) + } + val throughput = Throughput(Files.size(filePath), nanos) + logger.info("Success to download file [$sha256] on ${storageCredentials.key}, $throughput.") + } + } + + private fun checkDiskSpace() { + var diskFreeInBytes = fileDir.toFile().usableSpace + while (diskFreeInBytes < diskFreeThreshold) { + logger.info( + "DFree disk space below threshold.Available:" + + " $diskFreeInBytes bytes (threshold: $diskFreeThreshold).", + ) + Thread.sleep(CHECK_INTERVAL) + diskFreeInBytes = fileDir.toFile().usableSpace + } + } + + companion object { + private val logger = LoggerFactory.getLogger(FileStorageFileProvider::class.java) + private const val RETRY_TIMES = 3 + private const val CHECK_INTERVAL = 60000L + } +} diff --git a/src/backend/archive/biz-archive/src/main/kotlin/com/tencent/bkrepo/archive/job/compress/BDCompressor.kt b/src/backend/archive/biz-archive/src/main/kotlin/com/tencent/bkrepo/archive/job/compress/BDCompressor.kt new file mode 100644 index 0000000000..34eda72059 --- /dev/null +++ b/src/backend/archive/biz-archive/src/main/kotlin/com/tencent/bkrepo/archive/job/compress/BDCompressor.kt @@ -0,0 +1,50 @@ +package com.tencent.bkrepo.archive.job.compress + +import com.tencent.bkrepo.common.bksync.file.BDUtils +import com.tencent.bkrepo.common.storage.monitor.Throughput +import org.slf4j.LoggerFactory +import reactor.core.publisher.Mono +import reactor.core.scheduler.Schedulers +import java.io.File +import java.nio.file.Path +import java.util.concurrent.Executor + +class BDCompressor( + private val ratio: Float, + private val executor: Executor, +) { + + /** + * 根据源文件和签名文件,压缩成新的bd文件 + * */ + fun compress( + srcFile: Mono, + checksumFile: Mono, + srcKey: String, + destKey: String, + workDir: Path, + ): Mono { + return Mono.zip(checksumFile, srcFile) { checksum, src -> + compress(src, checksum, srcKey, destKey, workDir) + }.flatMap { it } + } + + private fun compress(src: File, checksum: File, srcKey: String, destKey: String, workDir: Path): Mono { + return Mono.fromCallable { + try { + val start = System.nanoTime() + val file = BDUtils.deltaByChecksumFile(src, checksum, srcKey, destKey, workDir, ratio) + val nanos = System.nanoTime() - start + val throughput = Throughput(nanos, file.length()) + logger.info("Success to bd compress $srcKey,$throughput.") + file + } finally { + src.delete() + } + }.publishOn(Schedulers.fromExecutor(executor)) + } + + companion object { + private val logger = LoggerFactory.getLogger(BDCompressor::class.java) + } +} diff --git a/src/backend/archive/biz-archive/src/main/kotlin/com/tencent/bkrepo/archive/job/compress/BDUncompressor.kt b/src/backend/archive/biz-archive/src/main/kotlin/com/tencent/bkrepo/archive/job/compress/BDUncompressor.kt new file mode 100644 index 0000000000..cc3a40c749 --- /dev/null +++ b/src/backend/archive/biz-archive/src/main/kotlin/com/tencent/bkrepo/archive/job/compress/BDUncompressor.kt @@ -0,0 +1,44 @@ +package com.tencent.bkrepo.archive.job.compress + +import com.tencent.bkrepo.common.bksync.file.BDUtils +import com.tencent.bkrepo.common.storage.monitor.Throughput +import org.slf4j.LoggerFactory +import reactor.core.publisher.Mono +import reactor.core.scheduler.Schedulers +import java.io.File +import java.nio.file.Path +import java.util.concurrent.Executor + +class BDUncompressor( + private val executor: Executor, +) { + + /** + * 根据源文件和签名文件,压缩成新的bd文件 + * */ + fun patch(bdFile: Mono, baseFile: Mono, sha256: String, workDir: Path): Mono { + return Mono.zip(bdFile, baseFile) { bd, bsf -> + uncompress(bd, bsf, sha256, workDir) + }.flatMap { it } + } + + private fun uncompress(bdFile: File, baseFile: File, sha256: String, workDir: Path): Mono { + return Mono.fromCallable { + try { + val start = System.nanoTime() + val file = BDUtils.patch(bdFile, baseFile, workDir) + val nanos = System.nanoTime() - start + val throughput = Throughput(nanos, file.length()) + logger.info("Success to bd uncompress $sha256,$throughput.") + file + } finally { + bdFile.delete() + baseFile.delete() + } + }.publishOn(Schedulers.fromExecutor(executor)) + } + + companion object { + private val logger = LoggerFactory.getLogger(BDUncompressor::class.java) + } +} diff --git a/src/backend/archive/biz-archive/src/main/kotlin/com/tencent/bkrepo/archive/job/compress/BDZipManager.kt b/src/backend/archive/biz-archive/src/main/kotlin/com/tencent/bkrepo/archive/job/compress/BDZipManager.kt new file mode 100644 index 0000000000..93ee09fd83 --- /dev/null +++ b/src/backend/archive/biz-archive/src/main/kotlin/com/tencent/bkrepo/archive/job/compress/BDZipManager.kt @@ -0,0 +1,224 @@ +package com.tencent.bkrepo.archive.job.compress + +import com.google.common.util.concurrent.ThreadFactoryBuilder +import com.tencent.bkrepo.archive.CompressStatus +import com.tencent.bkrepo.archive.config.ArchiveProperties +import com.tencent.bkrepo.archive.event.StorageFileCompressedEvent +import com.tencent.bkrepo.archive.event.StorageFileUncompressedEvent +import com.tencent.bkrepo.archive.job.FileStorageFileProvider +import com.tencent.bkrepo.archive.model.TCompressFile +import com.tencent.bkrepo.archive.repository.CompressFileDao +import com.tencent.bkrepo.archive.repository.CompressFileRepository +import com.tencent.bkrepo.archive.utils.ArchiveDaoUtils.optimisticLock +import com.tencent.bkrepo.archive.utils.ArchiveUtils +import com.tencent.bkrepo.common.artifact.api.toArtifactFile +import com.tencent.bkrepo.common.artifact.stream.Range +import com.tencent.bkrepo.common.service.util.SpringContextUtils +import com.tencent.bkrepo.common.storage.core.StorageService +import com.tencent.bkrepo.common.storage.monitor.Throughput +import com.tencent.bkrepo.common.storage.util.toPath +import com.tencent.bkrepo.repository.api.FileReferenceClient +import org.slf4j.LoggerFactory +import org.springframework.stereotype.Component +import reactor.core.publisher.Mono +import java.nio.file.Files +import java.nio.file.Paths +import java.time.LocalDateTime + +/** + * bd压缩管理器,负责文件压缩与解压 + * */ +@Component +class BDZipManager( + private val compressFileDao: CompressFileDao, + private val archiveProperties: ArchiveProperties, + private val fileReferenceClient: FileReferenceClient, + private val compressFileRepository: CompressFileRepository, + private val storageService: StorageService, +) { + private val workDir = archiveProperties.workDir.toPath() + val signThreadPool = ArchiveUtils.newFixedAndCachedThreadPool( + archiveProperties.compress.signThreads, + ThreadFactoryBuilder().setNameFormat("bd-sign-%d").build(), + ) + val fileDownloadThreadPool = ArchiveUtils.newFixedAndCachedThreadPool( + archiveProperties.compress.ioThreads, + ThreadFactoryBuilder().setNameFormat("bd-io-%d").build(), + ) + val diffThreadPool = ArchiveUtils.newFixedAndCachedThreadPool( + archiveProperties.compress.diffThreads, + ThreadFactoryBuilder().setNameFormat("bd-diff-%d").build(), + ) + val patchThreadPool = ArchiveUtils.newFixedAndCachedThreadPool( + archiveProperties.compress.patchThreads, + ThreadFactoryBuilder().setNameFormat("bd-patch-%d").build(), + ) + + private val fileProvider = FileStorageFileProvider( + workDir.resolve(DOWNLOAD_DIR), + archiveProperties.threshold.toBytes(), + fileDownloadThreadPool, + ) + private val checksumProvider = ChecksumFileProvider( + workDir.resolve(SIGN_DIR), + fileProvider, + archiveProperties.compress.signFileCacheTime, + signThreadPool, + ) + private val bdCompressor = BDCompressor(archiveProperties.compress.ratio, diffThreadPool) + private val bdUncompressor = BDUncompressor(patchThreadPool) + + init { + val dirs = listOf(DOWNLOAD_DIR, SIGN_DIR, COMPRESS_DIR, UNCOMPRESS_DIR) + dirs.forEach { + val filePath = workDir.resolve(it) + if (!Files.exists(filePath)) { + Files.createDirectories(filePath) + } + } + } + + fun compress(file: TCompressFile) { + try { + compress0(file) + } catch (e: Exception) { + logger.error("Compress file [${file.sha256}] error", e) + } + } + + fun uncompress(file: TCompressFile) { + try { + uncompress0(file) + } catch (e: Exception) { + logger.error("Uncompress file [${file.sha256}] error", e) + } + } + + private fun compress0(file: TCompressFile) { + with(file) { + logger.info("Start compress file [$sha256].") + // 增量存储源文件和基础文件必须不同,不然会导致base文件丢失 + require(sha256 != baseSha256) { "Incremental storage source file and base file must be different." } + // 乐观锁 + val tryLock = compressFileDao.optimisticLock( + file, + TCompressFile::status.name, + CompressStatus.CREATED.name, + CompressStatus.COMPRESSING.name, + ) + if (!tryLock) { + logger.info("File[$sha256] already start compress.") + return + } + // 压缩 + val credentials = ArchiveUtils.getStorageCredentials(storageCredentialsKey) + val workDir = Paths.get(workDir.toString(), COMPRESS_DIR, sha256) + val srcFile = fileProvider.get(sha256, Range.full(uncompressedSize), credentials) + val baseRange = if (baseSize == null) Range.FULL_RANGE else Range.full(baseSize) + val checksumFile = checksumProvider.get(baseSha256, baseRange, credentials) + val begin = System.nanoTime() + bdCompressor.compress(srcFile, checksumFile, sha256, baseSha256, workDir) + .doOnSuccess { + val newFileName = sha256.plus(BD_FILE_SUFFIX) + storageService.store(newFileName, it.toArtifactFile(), credentials) + file.compressedSize = it.length() + file.status = CompressStatus.COMPRESSED + logger.info("Success to compress file [$sha256] on $storageCredentialsKey.") + } + .doOnError { + logger.info("Failed to compress file [$sha256].", it) + status = CompressStatus.COMPRESS_FAILED + fileReferenceClient.decrement(baseSha256, storageCredentialsKey) + } + .doFinally { + workDir.toFile().deleteRecursively() + file.lastModifiedDate = LocalDateTime.now() + compressFileRepository.save(file) + // 发送压缩事件 + val took = System.nanoTime() - begin + val throughput = Throughput(uncompressedSize, took) + val event = StorageFileCompressedEvent( + sha256 = sha256, + baseSha256 = baseSha256, + uncompressed = uncompressedSize, + compressed = compressedSize, + storageCredentialsKey = storageCredentialsKey, + throughput = throughput, + ) + SpringContextUtils.publishEvent(event) + logger.info("Complete compress file [$sha256] on $storageCredentialsKey") + } + .onErrorResume { Mono.empty() } + .subscribe() + } + } + + private fun uncompress0(file: TCompressFile) { + with(file) { + logger.info("Start uncompress file [$sha256].") + // 乐观锁 + val tryLock = compressFileDao.optimisticLock( + file, + TCompressFile::status.name, + CompressStatus.WAIT_TO_UNCOMPRESS.name, + CompressStatus.UNCOMPRESSING.name, + ) + if (!tryLock) { + logger.info("File[$sha256] already start uncompress.") + return + } + // 解压 + val credentials = ArchiveUtils.getStorageCredentials(storageCredentialsKey) + compressFileRepository.findBySha256AndStorageCredentialsKey(baseSha256, storageCredentialsKey)?.let { + if (file.status == CompressStatus.COMPLETED || file.status == CompressStatus.COMPRESSED) { + uncompress(it) + } + } + val workDir = Paths.get(workDir.toString(), UNCOMPRESS_DIR, sha256) + val bdFileName = sha256.plus(BD_FILE_SUFFIX) + val bdFile = fileProvider.get(bdFileName, Range.full(compressedSize), credentials) + val baseRange = if (baseSize == null) Range.FULL_RANGE else Range.full(baseSize) + val baseFile = fileProvider.get(baseSha256, baseRange, credentials) + val begin = System.nanoTime() + bdUncompressor.patch(bdFile, baseFile, sha256, workDir) + .doOnSuccess { + // 更新状态 + file.status = CompressStatus.UNCOMPRESSED + storageService.store(sha256, it.toArtifactFile(), credentials) + storageService.delete(bdFileName, credentials) + logger.info("Success to uncompress file [$sha256] on $storageCredentialsKey") + } + .doOnError { + logger.info("Failed to uncompress file [$sha256] on $storageCredentialsKey", it) + file.status = CompressStatus.UNCOMPRESS_FAILED + } + .doFinally { + workDir.toFile().deleteRecursively() + file.lastModifiedDate = LocalDateTime.now() + compressFileRepository.save(file) + val took = System.nanoTime() - begin + val throughput = Throughput(uncompressedSize, took) + val event = StorageFileUncompressedEvent( + sha256 = sha256, + compressed = compressedSize, + uncompressed = uncompressedSize, + storageCredentialsKey = storageCredentialsKey, + throughput = throughput, + ) + SpringContextUtils.publishEvent(event) + logger.info("Complete uncompress file [$sha256] on $storageCredentialsKey") + } + .onErrorResume { Mono.empty() } + .subscribe() + } + } + + companion object { + private val logger = LoggerFactory.getLogger(BDZipManager::class.java) + private const val BD_FILE_SUFFIX = ".bd" + private const val DOWNLOAD_DIR = "download" + private const val SIGN_DIR = "sign" + private const val COMPRESS_DIR = "compress" + private const val UNCOMPRESS_DIR = "uncompress" + } +} diff --git a/src/backend/archive/biz-archive/src/main/kotlin/com/tencent/bkrepo/archive/job/compress/ChecksumFileProvider.kt b/src/backend/archive/biz-archive/src/main/kotlin/com/tencent/bkrepo/archive/job/compress/ChecksumFileProvider.kt new file mode 100644 index 0000000000..134b5c0774 --- /dev/null +++ b/src/backend/archive/biz-archive/src/main/kotlin/com/tencent/bkrepo/archive/job/compress/ChecksumFileProvider.kt @@ -0,0 +1,81 @@ +package com.tencent.bkrepo.archive.job.compress + +import com.tencent.bkrepo.archive.job.FileProvider +import com.tencent.bkrepo.common.artifact.stream.Range +import com.tencent.bkrepo.common.bksync.BkSync +import com.tencent.bkrepo.common.storage.credentials.StorageCredentials +import com.tencent.bkrepo.common.storage.filesystem.cleanup.BasedAtimeAndMTimeFileExpireResolver +import com.tencent.bkrepo.common.storage.monitor.measureThroughput +import org.slf4j.LoggerFactory +import reactor.core.publisher.Mono +import reactor.core.scheduler.Schedulers +import java.io.File +import java.nio.file.Files +import java.nio.file.Path +import java.time.Duration +import java.util.concurrent.Executor +import java.util.concurrent.Executors +import java.util.concurrent.TimeUnit + +class ChecksumFileProvider( + private val workDir: Path, + private val fileProvider: FileProvider, + private val cacheTime: Duration, + private val executor: Executor, +) : FileProvider { + + private val fileExpireResolver = BasedAtimeAndMTimeFileExpireResolver(cacheTime) + private val monitorExecutor = Executors.newSingleThreadScheduledExecutor() + + init { + monitorExecutor.scheduleAtFixedRate(this::deleteAfterAccess, 0, 1, TimeUnit.HOURS) + } + + override fun get(sha256: String, range: Range, storageCredentials: StorageCredentials): Mono { + check(range.isFullContent()) + val filePath = workDir.resolve("$sha256.checksum") + if (Files.exists(filePath)) { + return Mono.just(filePath.toFile()) + } + return fileProvider.get(sha256, range, storageCredentials).flatMap { + signFile(it, filePath, sha256, storageCredentials.key) + } + } + + private fun signFile(file: File, checksumFilePath: Path, sha256: String, key: String?): Mono { + return Mono.fromCallable { + try { + sign(file, checksumFilePath, sha256, key) + } finally { + file.delete() + } + }.publishOn(Schedulers.fromExecutor(executor)) + } + + private fun sign(file: File, checksumFilePath: Path, sha256: String, key: String?): File { + synchronized(sha256.intern()) { + Files.newOutputStream(checksumFilePath).use { out -> + val bkSync = BkSync() + val throughput = measureThroughput(file.length()) { bkSync.checksum(file, out) } + logger.info("Success to sign file [$sha256] on $key, $throughput.") + } + checksumFilePath.toFile() + } + return checksumFilePath.toFile() + } + + private fun deleteAfterAccess() { + Files.list(workDir).use { + it.forEach { path -> + if (Files.isRegularFile(path) && fileExpireResolver.isExpired(path.toFile())) { + Files.deleteIfExists(path) + logger.info("Delete sign file ${path.toAbsolutePath()}.") + } + } + } + } + + companion object { + private val logger = LoggerFactory.getLogger(ChecksumFileProvider::class.java) + } +} diff --git a/src/backend/archive/biz-archive/src/main/kotlin/com/tencent/bkrepo/archive/job/compress/CompressJob.kt b/src/backend/archive/biz-archive/src/main/kotlin/com/tencent/bkrepo/archive/job/compress/CompressJob.kt index eb51f288d1..21525ca2d5 100644 --- a/src/backend/archive/biz-archive/src/main/kotlin/com/tencent/bkrepo/archive/job/compress/CompressJob.kt +++ b/src/backend/archive/biz-archive/src/main/kotlin/com/tencent/bkrepo/archive/job/compress/CompressJob.kt @@ -29,7 +29,7 @@ class CompressJob( return ReactiveDaoUtils.query(query, TCompressFile::class.java) } - @Scheduled(fixedDelay = 1, timeUnit = TimeUnit.HOURS) + @Scheduled(fixedDelay = 1, timeUnit = TimeUnit.DAYS) fun compress() { val subscriber = CompressSubscriber(compressService) listFiles().subscribe(subscriber) diff --git a/src/backend/archive/biz-archive/src/main/kotlin/com/tencent/bkrepo/archive/job/compress/UncompressJob.kt b/src/backend/archive/biz-archive/src/main/kotlin/com/tencent/bkrepo/archive/job/compress/UncompressJob.kt index ff8289dde3..85ce9379f7 100644 --- a/src/backend/archive/biz-archive/src/main/kotlin/com/tencent/bkrepo/archive/job/compress/UncompressJob.kt +++ b/src/backend/archive/biz-archive/src/main/kotlin/com/tencent/bkrepo/archive/job/compress/UncompressJob.kt @@ -28,7 +28,7 @@ class UncompressJob( return ReactiveDaoUtils.query(query, TCompressFile::class.java) } - @Scheduled(fixedDelay = 10, timeUnit = TimeUnit.SECONDS) + @Scheduled(fixedDelay = 1, timeUnit = TimeUnit.DAYS) fun uncompress() { val subscriber = UncompressSubscriber(compressService) listFiles().subscribe(subscriber) diff --git a/src/backend/archive/biz-archive/src/main/kotlin/com/tencent/bkrepo/archive/listener/StorageCompressListener.kt b/src/backend/archive/biz-archive/src/main/kotlin/com/tencent/bkrepo/archive/listener/StorageCompressListener.kt index 42b5126ca6..f45c3df273 100644 --- a/src/backend/archive/biz-archive/src/main/kotlin/com/tencent/bkrepo/archive/listener/StorageCompressListener.kt +++ b/src/backend/archive/biz-archive/src/main/kotlin/com/tencent/bkrepo/archive/listener/StorageCompressListener.kt @@ -20,22 +20,23 @@ class StorageCompressListener(val archiveMetrics: ArchiveMetrics) { * */ @EventListener(StorageFileCompressedEvent::class) fun compress(event: StorageFileCompressedEvent) { + val key = event.storageCredentialsKey ?: DEFAULT_KEY with(event) { - val ratio = StringPool.calculateRatio(uncompressed, compressed) - val uncompressedSize = HumanReadable.size(uncompressed) - val compressedSize = HumanReadable.size(compressed) - val releaseSize = HumanReadable.size(uncompressed - compressed) - logger.info( - "Success to compress file $sha256 on $storageCredentialsKey," + - "($uncompressedSize->$compressedSize,$releaseSize) ratio:$ratio ,$throughput", - ) - val key = event.storageCredentialsKey ?: DEFAULT_KEY - archiveMetrics.getCounter(ArchiveMetrics.Action.COMPRESSED, key).increment() - archiveMetrics.getSizeCounter(ArchiveMetrics.Action.COMPRESSED, key, TYPE, TAG_COMPRESSED) - .increment(compressed.toDouble()) - archiveMetrics.getSizeCounter(ArchiveMetrics.Action.COMPRESSED, key, TYPE, TAG_UNCOMPRESSED) - .increment(uncompressed.toDouble()) - archiveMetrics.getTimer(ArchiveMetrics.Action.COMPRESSED, key).record(throughput.duration) + logger.info("Success to compress file $sha256 on $storageCredentialsKey,$throughput.") + if (compressed != -1L) { + val ratio = StringPool.calculateRatio(uncompressed, compressed) + val uncompressedSize = HumanReadable.size(uncompressed) + val compressedSize = HumanReadable.size(compressed) + val freeSize = uncompressed - compressed + val compressInfo = "$uncompressedSize->$compressedSize,${HumanReadable.size(freeSize)},ratio:$ratio" + logger.info("File[$sha256] compress info: $compressInfo.") + // 释放存储 + archiveMetrics.getSizeCounter(ArchiveMetrics.Action.STORAGE_FREE, key).increment(freeSize.toDouble()) + } + archiveMetrics.getCounter(ArchiveMetrics.Action.COMPRESSED, key).increment() // 压缩个数 + archiveMetrics.getSizeCounter(ArchiveMetrics.Action.COMPRESSED, key) + .increment(throughput.bytes.toDouble()) // 压缩吞吐 + archiveMetrics.getTimer(ArchiveMetrics.Action.COMPRESSED, key).record(throughput.duration) // 压缩时长 } } @@ -47,19 +48,17 @@ class StorageCompressListener(val archiveMetrics: ArchiveMetrics) { with(event) { logger.info("Success to uncompress file $sha256 on $storageCredentialsKey,$throughput") val key = event.storageCredentialsKey ?: DEFAULT_KEY - archiveMetrics.getCounter(ArchiveMetrics.Action.UNCOMPRESSED, key).increment() - archiveMetrics.getSizeCounter(ArchiveMetrics.Action.UNCOMPRESSED, key, TYPE, TAG_COMPRESSED) - .increment(compressed.toDouble()) - archiveMetrics.getSizeCounter(ArchiveMetrics.Action.UNCOMPRESSED, key, TYPE, TAG_UNCOMPRESSED) - .increment(uncompressed.toDouble()) - archiveMetrics.getTimer(ArchiveMetrics.Action.UNCOMPRESSED, key).record(throughput.duration) + val allocateSize = uncompressed - compressed + archiveMetrics.getSizeCounter(ArchiveMetrics.Action.STORAGE_ALLOCATE, key) + .increment(allocateSize.toDouble()) // 新增存储 + archiveMetrics.getCounter(ArchiveMetrics.Action.UNCOMPRESSED, key).increment() // 解压个数 + archiveMetrics.getSizeCounter(ArchiveMetrics.Action.UNCOMPRESSED, key) + .increment(throughput.bytes.toDouble()) // 解压吞吐 + archiveMetrics.getTimer(ArchiveMetrics.Action.UNCOMPRESSED, key).record(throughput.duration) // 解压时长 } } companion object { private val logger = LoggerFactory.getLogger(StorageCompressListener::class.java) - private const val TYPE = "type" - private const val TAG_COMPRESSED = "compressed" - private const val TAG_UNCOMPRESSED = "uncompressed" } } diff --git a/src/backend/archive/biz-archive/src/main/kotlin/com/tencent/bkrepo/archive/metrics/ArchiveMetrics.kt b/src/backend/archive/biz-archive/src/main/kotlin/com/tencent/bkrepo/archive/metrics/ArchiveMetrics.kt index 378c31ab1b..ab7ba2b2f9 100644 --- a/src/backend/archive/biz-archive/src/main/kotlin/com/tencent/bkrepo/archive/metrics/ArchiveMetrics.kt +++ b/src/backend/archive/biz-archive/src/main/kotlin/com/tencent/bkrepo/archive/metrics/ArchiveMetrics.kt @@ -2,7 +2,7 @@ package com.tencent.bkrepo.archive.metrics import com.tencent.bkrepo.archive.ArchiveStatus import com.tencent.bkrepo.archive.CompressStatus -import com.tencent.bkrepo.archive.job.archive.ArchiveJob +import com.tencent.bkrepo.archive.job.compress.BDZipManager import com.tencent.bkrepo.archive.repository.ArchiveFileRepository import com.tencent.bkrepo.archive.repository.CompressFileRepository import io.micrometer.core.instrument.Counter @@ -17,36 +17,44 @@ import org.springframework.stereotype.Component * */ @Component class ArchiveMetrics( - val archiveJob: ArchiveJob, val archiveFileRepository: ArchiveFileRepository, val compressFileRepository: CompressFileRepository, + val bdZipManager: BDZipManager, ) : MeterBinder { lateinit var registry: MeterRegistry override fun bindTo(registry: MeterRegistry) { this.registry = registry // 下载量,队列 - Gauge.builder(FILE_DOWNLOAD_ACTIVE_COUNT, archiveJob.httpDownloadPool) { it.activeCount.toDouble() } + Gauge.builder(FILE_DOWNLOAD_ACTIVE_COUNT, bdZipManager.fileDownloadThreadPool) { it.activeCount.toDouble() } .description(FILE_DOWNLOAD_ACTIVE_COUNT_DESC) .register(registry) - Gauge.builder(FILE_DOWNLOAD_QUEUE_SIZE, archiveJob.httpDownloadPool) { it.queue.size.toDouble() } + Gauge.builder(FILE_DOWNLOAD_QUEUE_SIZE, bdZipManager.fileDownloadThreadPool) { it.queue.size.toDouble() } .description(FILE_DOWNLOAD_QUEUE_SIZE_DESC) .register(registry) // 压缩量,队列 - Gauge.builder(FILE_COMPRESS_ACTIVE_COUNT, archiveJob.compressPool) { it.activeCount.toDouble() } + Gauge.builder(FILE_COMPRESS_ACTIVE_COUNT, bdZipManager.diffThreadPool) { it.activeCount.toDouble() } .description(FILE_COMPRESS_ACTIVE_COUNT_DESC) .register(registry) - Gauge.builder(FILE_COMPRESS_QUEUE_SIZE, archiveJob.compressPool) { it.queue.size.toDouble() } + Gauge.builder(FILE_COMPRESS_QUEUE_SIZE, bdZipManager.diffThreadPool) { it.queue.size.toDouble() } .description(FILE_COMPRESS_QUEUE_SIZE_DESC) .register(registry) - // 上传量,队列 - Gauge.builder(FILE_UPLOAD_ACTIVE_COUNT, archiveJob.httpUploadPool) { it.activeCount.toDouble() } - .description(FILE_UPLOAD_ACTIVE_COUNT_DESC) + // 签名量,队列 + Gauge.builder(FILE_SING_ACTIVE_COUNT, bdZipManager.signThreadPool) { it.activeCount.toDouble() } + .description(FILE_SING_ACTIVE_COUNT_DESC) .register(registry) - Gauge.builder(FILE_UPLOAD_QUEUE_SIZE, archiveJob.httpUploadPool) { it.queue.size.toDouble() } - .description(FILE_UPLOAD_QUEUE_SIZE_DESC) + Gauge.builder(FILE_SING_QUEUE_SIZE, bdZipManager.signThreadPool) { it.queue.size.toDouble() } + .description(FILE_SIGN_QUEUE_SIZE_DESC) + .register(registry) + + // 解压量,队列 + Gauge.builder(FILE_PATCH_ACTIVE_COUNT, bdZipManager.patchThreadPool) { it.activeCount.toDouble() } + .description(FILE_PATCH_ACTIVE_COUNT_DESC) + .register(registry) + Gauge.builder(FILE_PATCH_QUEUE_SIZE, bdZipManager.patchThreadPool) { it.queue.size.toDouble() } + .description(FILE_PATCH_QUEUE_SIZE_DESC) .register(registry) // 归档文件状态 @@ -84,6 +92,8 @@ class ArchiveMetrics( Action.UNCOMPRESSED -> Counter.builder(FILE_UNCOMPRESSED_COUNTER) .description(FILE_UNCOMPRESSED_COUNTER_DESC) + + else -> throw IllegalArgumentException("Action $action not support.") } return builder.tag(TAG_CREDENTIALS_KEY, credentialsKey) .register(registry) @@ -108,6 +118,12 @@ class ArchiveMetrics( Action.UNCOMPRESSED -> Counter.builder(FILE_UNCOMPRESSED_SIZE_COUNTER) .description(FILE_UNCOMPRESSED_SIZE_COUNTER_DESC) + + Action.STORAGE_FREE -> Counter.builder(STORAGE_FREE_SIZE_COUNTER) + .description(STORAGE_FREE_SIZE_COUNTER_DESC) + + Action.STORAGE_ALLOCATE -> Counter.builder(STORAGE_ALLOCATE_SIZE_COUNTER) + .description(STORAGE_ALLOCATE_SIZE_COUNTER_DESC) } require(tags.size % 2 == 0) for (i in 0 until tags.lastIndex) { @@ -137,6 +153,8 @@ class ArchiveMetrics( Action.UNCOMPRESSED -> Timer.builder(FILE_UNCOMPRESSED_TIME) .description(FILE_UNCOMPRESSED_TIME_DESC) + + else -> throw IllegalArgumentException("Action $action not support.") } return builder.tag(TAG_CREDENTIALS_KEY, credentialsKey) .register(registry) @@ -150,6 +168,8 @@ class ArchiveMetrics( RESTORED, COMPRESSED, UNCOMPRESSED, + STORAGE_FREE, + STORAGE_ALLOCATE, } companion object { @@ -177,6 +197,14 @@ class ArchiveMetrics( private const val FILE_COMPRESS_ACTIVE_COUNT_DESC = "文件压缩实时数量" private const val FILE_COMPRESS_QUEUE_SIZE = "file.compress.queue.size" private const val FILE_COMPRESS_QUEUE_SIZE_DESC = "文件压缩队列大小" + private const val FILE_SING_ACTIVE_COUNT = "file.sign.active.count" + private const val FILE_SING_ACTIVE_COUNT_DESC = "文件签名实时数量" + private const val FILE_SING_QUEUE_SIZE = "file.sign.queue.size" + private const val FILE_SIGN_QUEUE_SIZE_DESC = "文件签名队列大小" + private const val FILE_PATCH_ACTIVE_COUNT = "file.patch.active.count" + private const val FILE_PATCH_ACTIVE_COUNT_DESC = "文件解压实时数量" + private const val FILE_PATCH_QUEUE_SIZE = "file.patch.queue.size" + private const val FILE_PATCH_QUEUE_SIZE_DESC = "文件解压队列大小" private const val ARCHIVE_FILE_STATUS_COUNTER = "file.archive.status.count" private const val ARCHIVE_FILE_STATUS_COUNTER_DESC = "归档文件状态统计" private const val COMPRESS_FILE_STATUS_COUNTER = "file.compress.status.count" @@ -193,6 +221,10 @@ class ArchiveMetrics( private const val FILE_UNCOMPRESSED_SIZE_COUNTER_DESC = "文件解压大小" private const val FILE_UNCOMPRESSED_TIME = "file.uncompress.time" private const val FILE_UNCOMPRESSED_TIME_DESC = "文件解压耗时" + private const val STORAGE_FREE_SIZE_COUNTER = "storage.free.size.count" + private const val STORAGE_FREE_SIZE_COUNTER_DESC = "存储释放大小" + private const val STORAGE_ALLOCATE_SIZE_COUNTER = "storage.allocate.size.count" + private const val STORAGE_ALLOCATE_SIZE_COUNTER_DESC = "存储新增大小" private const val TAG_CREDENTIALS_KEY = "credentialsKey" private const val TAG_STATUS = "status" } diff --git a/src/backend/archive/biz-archive/src/main/kotlin/com/tencent/bkrepo/archive/service/CompressService.kt b/src/backend/archive/biz-archive/src/main/kotlin/com/tencent/bkrepo/archive/service/CompressService.kt index afa8dd120b..ab2c079c78 100644 --- a/src/backend/archive/biz-archive/src/main/kotlin/com/tencent/bkrepo/archive/service/CompressService.kt +++ b/src/backend/archive/biz-archive/src/main/kotlin/com/tencent/bkrepo/archive/service/CompressService.kt @@ -1,16 +1,18 @@ package com.tencent.bkrepo.archive.service +import com.tencent.bkrepo.archive.job.Cancellable import com.tencent.bkrepo.archive.model.TCompressFile import com.tencent.bkrepo.archive.pojo.CompressFile import com.tencent.bkrepo.archive.request.CompleteCompressRequest import com.tencent.bkrepo.archive.request.CompressFileRequest import com.tencent.bkrepo.archive.request.DeleteCompressRequest import com.tencent.bkrepo.archive.request.UncompressFileRequest +import com.tencent.bkrepo.archive.request.UpdateCompressFileStatusRequest /** * 压缩服务 * */ -interface CompressService { +interface CompressService : Cancellable { /** * 压缩文件 * @return 1表示压缩成功,0表示未压缩 @@ -34,6 +36,11 @@ interface CompressService { * */ fun complete(request: CompleteCompressRequest) + /** + * 更新压缩文件状态 + * */ + fun updateStatus(request: UpdateCompressFileStatusRequest) + /** * 获取压缩信息 * */ diff --git a/src/backend/archive/biz-archive/src/main/kotlin/com/tencent/bkrepo/archive/service/CompressServiceImpl.kt b/src/backend/archive/biz-archive/src/main/kotlin/com/tencent/bkrepo/archive/service/CompressServiceImpl.kt index 54195cb0aa..73423885d3 100644 --- a/src/backend/archive/biz-archive/src/main/kotlin/com/tencent/bkrepo/archive/service/CompressServiceImpl.kt +++ b/src/backend/archive/biz-archive/src/main/kotlin/com/tencent/bkrepo/archive/service/CompressServiceImpl.kt @@ -1,63 +1,48 @@ package com.tencent.bkrepo.archive.service -import com.google.common.util.concurrent.ThreadFactoryBuilder import com.tencent.bkrepo.archive.ArchiveFileNotFound import com.tencent.bkrepo.archive.CompressStatus -import com.tencent.bkrepo.archive.config.ArchiveProperties import com.tencent.bkrepo.archive.constant.ArchiveMessageCode -import com.tencent.bkrepo.archive.event.StorageFileCompressedEvent -import com.tencent.bkrepo.archive.event.StorageFileUncompressedEvent +import com.tencent.bkrepo.archive.constant.MAX_CHAIN_LENGTH +import com.tencent.bkrepo.archive.job.compress.BDZipManager import com.tencent.bkrepo.archive.model.TCompressFile import com.tencent.bkrepo.archive.pojo.CompressFile -import com.tencent.bkrepo.archive.repository.CompressFileDao import com.tencent.bkrepo.archive.repository.CompressFileRepository import com.tencent.bkrepo.archive.request.CompleteCompressRequest import com.tencent.bkrepo.archive.request.CompressFileRequest import com.tencent.bkrepo.archive.request.DeleteCompressRequest import com.tencent.bkrepo.archive.request.UncompressFileRequest -import com.tencent.bkrepo.archive.utils.ArchiveDaoUtils.optimisticLock +import com.tencent.bkrepo.archive.request.UpdateCompressFileStatusRequest import com.tencent.bkrepo.archive.utils.ArchiveUtils import com.tencent.bkrepo.common.api.exception.ErrorCodeException -import com.tencent.bkrepo.common.bksync.transfer.exception.TooLowerReuseRateException -import com.tencent.bkrepo.common.service.util.SpringContextUtils import com.tencent.bkrepo.common.storage.core.StorageService -import com.tencent.bkrepo.common.storage.innercos.retry -import com.tencent.bkrepo.common.storage.monitor.measureThroughput import com.tencent.bkrepo.repository.api.FileReferenceClient import org.slf4j.LoggerFactory import org.springframework.stereotype.Service import reactor.core.publisher.Sinks -import reactor.core.scheduler.Schedulers import java.time.LocalDateTime +import java.util.concurrent.atomic.AtomicBoolean /** * 压缩服务实现类 * */ -@Suppress("LeakingThis") @Service class CompressServiceImpl( private val compressFileRepository: CompressFileRepository, private val storageService: StorageService, private val fileReferenceClient: FileReferenceClient, - private val compressFileDao: CompressFileDao, - archiveProperties: ArchiveProperties, + private val bdZipManager: BDZipManager, ) : CompressService { private val compressSink = Sinks.many().unicast().onBackpressureBuffer() private val uncompressSink = Sinks.many().unicast().onBackpressureBuffer() - - init { - val executor = ArchiveUtils.newFixedAndCachedThreadPool( - archiveProperties.ioThreads, - ThreadFactoryBuilder().setNameFormat("compress-worker-%d").build(), - ) - val scheduler = Schedulers.fromExecutor(executor) - compressSink.asFlux().parallel().runOn(scheduler).subscribe(this::compress0) - uncompressSink.asFlux().parallel().runOn(scheduler).subscribe(this::uncompress0) - } + private val compressDisposable = compressSink.asFlux().subscribe(bdZipManager::compress) + private val uncompressDisposable = uncompressSink.asFlux().subscribe(bdZipManager::uncompress) + private var shutdown = AtomicBoolean(false) override fun compress(request: CompressFileRequest) { with(request) { + require(sha256 != baseSha256) // 队头元素 val head = compressFileRepository.findBySha256AndStorageCredentialsKey(sha256, storageCredentialsKey) if (head != null && head.status != CompressStatus.NONE) { @@ -98,7 +83,7 @@ class CompressServiceImpl( status = CompressStatus.NONE, chainLength = 1, ) - if (newChain.status != CompressStatus.NONE) { + if (newChain.status != CompressStatus.NONE && newChain.status != CompressStatus.COMPRESS_FAILED) { throw ErrorCodeException(ArchiveMessageCode.BASE_COMPRESSED) } /* @@ -181,6 +166,23 @@ class CompressServiceImpl( } } + override fun updateStatus(request: UpdateCompressFileStatusRequest) { + with(request) { + val file = compressFileRepository.findBySha256AndStorageCredentialsKey(sha256, storageCredentialsKey) + ?: throw ArchiveFileNotFound(sha256) + val oldStatus = file.status + file.status = status + compressFileRepository.save(file) + if (status == CompressStatus.CREATED) { + compress(file) + } + if (status == CompressStatus.WAIT_TO_UNCOMPRESS) { + uncompress(file) + } + logger.info("Update file $sha256 status $oldStatus -> $status.") + } + } + override fun getCompressInfo(sha256: String, storageCredentialsKey: String?): CompressFile? { val file = compressFileRepository.findBySha256AndStorageCredentialsKey(sha256, storageCredentialsKey) if (file == null || file.status == CompressStatus.NONE) { @@ -212,122 +214,17 @@ class CompressServiceImpl( logger.info("Emit file ${file.sha256} to uncompress: $result") } - private fun compress0(file: TCompressFile) { - with(file) { - logger.info("Start compress file [$sha256].") - // 乐观锁 - val tryLock = compressFileDao.optimisticLock( - file, - TCompressFile::status.name, - CompressStatus.CREATED.name, - CompressStatus.COMPRESSING.name, - ) - if (!tryLock) { - logger.info("File[$sha256] already start compress.") - return - } - // 压缩 - val credentials = ArchiveUtils.getStorageCredentials(storageCredentialsKey) - var compressedSize = -1L - try { - val throughput = measureThroughput(uncompressedSize) { - compressedSize = retry( - times = RETRY_TIMES, - delayInSeconds = 1, - ignoreExceptions = listOf(TooLowerReuseRateException::class.java), - ) { - storageService.compress(sha256, uncompressedSize, baseSha256, baseSize, credentials, true) - } - } - if (compressedSize == -1L) { - return - } - // 更新状态 - file.compressedSize = compressedSize - file.status = CompressStatus.COMPRESSED - file.lastModifiedDate = LocalDateTime.now() - compressFileRepository.save(file) - val event = StorageFileCompressedEvent( - sha256 = sha256, - baseSha256 = baseSha256, - uncompressed = uncompressedSize, - compressed = compressedSize, - storageCredentialsKey = storageCredentialsKey, - throughput = throughput, - ) - SpringContextUtils.publishEvent(event) - } catch (e: TooLowerReuseRateException) { - logger.info("Reuse rate is too lower.") - compressFailed(file) - } catch (e: Exception) { - compressFailed(file) - throw e - } - } - } - - private fun uncompress0(file: TCompressFile) { - with(file) { - logger.info("Start uncompress file [$sha256].") - // 乐观锁 - val tryLock = compressFileDao.optimisticLock( - file, - TCompressFile::status.name, - CompressStatus.WAIT_TO_UNCOMPRESS.name, - CompressStatus.UNCOMPRESSING.name, - ) - if (!tryLock) { - logger.info("File[$sha256] already start uncompress.") - return - } - // 解压 - val credentials = ArchiveUtils.getStorageCredentials(storageCredentialsKey) - compressFileRepository.findBySha256AndStorageCredentialsKey(baseSha256, storageCredentialsKey)?.let { - if (file.status == CompressStatus.COMPLETED || file.status == CompressStatus.COMPRESSED) { - uncompress0(it) - } - } - try { - var ret = 0 - val throughput = measureThroughput(uncompressedSize) { - ret = storageService.uncompress(sha256, compressedSize, baseSha256, baseSize, credentials) - } - if (ret == 0) { - return - } - // 更新状态 - file.status = CompressStatus.UNCOMPRESSED - file.lastModifiedDate = LocalDateTime.now() - compressFileRepository.save(file) - val event = StorageFileUncompressedEvent( - sha256 = sha256, - compressed = compressedSize, - uncompressed = uncompressedSize, - storageCredentialsKey = storageCredentialsKey, - throughput = throughput, - ) - SpringContextUtils.publishEvent(event) - } catch (e: Exception) { - file.status = CompressStatus.UNCOMPRESS_FAILED - file.lastModifiedDate = LocalDateTime.now() - compressFileRepository.save(file) - throw e - } - } - } - - private fun compressFailed(file: TCompressFile) { - with(file) { - status = CompressStatus.COMPRESS_FAILED - lastModifiedDate = LocalDateTime.now() - compressFileRepository.save(file) - fileReferenceClient.decrement(baseSha256, storageCredentialsKey) + override fun cancel() { + if (shutdown.compareAndSet(false, true)) { + compressDisposable?.dispose() + uncompressDisposable?.dispose() + logger.info("Shutdown compress service successful.") + } else { + logger.info("Compress service has been shutdown.") } } companion object { private val logger = LoggerFactory.getLogger(CompressServiceImpl::class.java) - private const val MAX_CHAIN_LENGTH = 10 - private const val RETRY_TIMES = 3 } } diff --git a/src/backend/archive/biz-archive/src/main/kotlin/com/tencent/bkrepo/archive/utils/ArchiveUtils.kt b/src/backend/archive/biz-archive/src/main/kotlin/com/tencent/bkrepo/archive/utils/ArchiveUtils.kt index cdb4cb9e1e..cc1f0455d1 100644 --- a/src/backend/archive/biz-archive/src/main/kotlin/com/tencent/bkrepo/archive/utils/ArchiveUtils.kt +++ b/src/backend/archive/biz-archive/src/main/kotlin/com/tencent/bkrepo/archive/utils/ArchiveUtils.kt @@ -56,7 +56,10 @@ class ArchiveUtils( } fun getStorageCredentials(key: String?): StorageCredentials { - return storageCredentialsCache.get(key.orEmpty()) + return storageCredentialsCache.get(key.orEmpty()).apply { + // 指定使用本地路径进行cos分片下载 + upload.location = defaultStorageCredentials.upload.localPath + } } fun getRepositoryDetail(project: String, repoName: String): RepositoryDetail { @@ -88,6 +91,18 @@ class ArchiveUtils( ) } + fun newCachedThreadPool(maxThreads: Int, threadFactory: ThreadFactory): ThreadPoolExecutor { + return ThreadPoolExecutor( + 0, + maxThreads, + 60, + TimeUnit.SECONDS, + ArrayBlockingQueue(DEFAULT_BUFFER_SIZE), + threadFactory, + ThreadPoolExecutor.CallerRunsPolicy(), + ) + } + fun runCmd(cmd: List) { logger.debug("# ${cmd.joinToString(" ")}") val pb = ProcessBuilder() diff --git a/src/backend/archive/biz-archive/src/test/kotlin/com/tencent/bkrepo/archive/BaseTest.kt b/src/backend/archive/biz-archive/src/test/kotlin/com/tencent/bkrepo/archive/BaseTest.kt new file mode 100644 index 0000000000..dda9025035 --- /dev/null +++ b/src/backend/archive/biz-archive/src/test/kotlin/com/tencent/bkrepo/archive/BaseTest.kt @@ -0,0 +1,36 @@ +package com.tencent.bkrepo.archive + +import com.tencent.bkrepo.archive.utils.ArchiveUtils +import com.tencent.bkrepo.common.service.util.SpringContextUtils +import com.tencent.bkrepo.common.storage.util.StorageUtils +import io.mockk.every +import io.mockk.mockk +import io.mockk.mockkObject +import org.springframework.beans.factory.annotation.Autowired +import org.springframework.boot.SpringBootConfiguration +import org.springframework.boot.autoconfigure.EnableAutoConfiguration +import org.springframework.cloud.sleuth.Tracer +import org.springframework.cloud.sleuth.otel.bridge.OtelTracer +import org.springframework.context.annotation.ComponentScan +import org.springframework.test.context.TestPropertySource + +@ComponentScan("com.tencent.bkrepo.archive") +@SpringBootConfiguration +@EnableAutoConfiguration +@TestPropertySource(locations = ["classpath:bootstrap-ut.properties"]) +class BaseTest { + + @Autowired + lateinit var archiveUtils: ArchiveUtils + + @Autowired + lateinit var storageUtils: StorageUtils + + fun initMock() { + mockkObject(SpringContextUtils) + every { SpringContextUtils.publishEvent(any()) } returns Unit + val tracer = mockk() + every { SpringContextUtils.getBean() } returns tracer + every { tracer.currentSpan() } returns null + } +} diff --git a/src/backend/archive/biz-archive/src/test/kotlin/com/tencent/bkrepo/archive/job/compress/BDZipManagerTest.kt b/src/backend/archive/biz-archive/src/test/kotlin/com/tencent/bkrepo/archive/job/compress/BDZipManagerTest.kt new file mode 100644 index 0000000000..fa10465636 --- /dev/null +++ b/src/backend/archive/biz-archive/src/test/kotlin/com/tencent/bkrepo/archive/job/compress/BDZipManagerTest.kt @@ -0,0 +1,173 @@ +package com.tencent.bkrepo.archive.job.compress + +import com.tencent.bkrepo.archive.BaseTest +import com.tencent.bkrepo.archive.CompressStatus +import com.tencent.bkrepo.archive.model.TCompressFile +import com.tencent.bkrepo.archive.repository.CompressFileRepository +import com.tencent.bkrepo.common.artifact.api.ArtifactFile +import com.tencent.bkrepo.common.artifact.api.FileSystemArtifactFile +import com.tencent.bkrepo.common.artifact.hash.sha256 +import com.tencent.bkrepo.common.artifact.stream.Range +import com.tencent.bkrepo.common.bksync.file.BkSyncDeltaSource.Companion.toBkSyncDeltaSource +import com.tencent.bkrepo.common.service.util.ResponseBuilder +import com.tencent.bkrepo.common.storage.StorageAutoConfiguration +import com.tencent.bkrepo.common.storage.core.StorageService +import com.tencent.bkrepo.repository.api.FileReferenceClient +import com.tencent.bkrepo.repository.api.RepositoryClient +import com.tencent.bkrepo.repository.api.StorageCredentialsClient +import org.junit.jupiter.api.Assertions +import org.junit.jupiter.api.BeforeEach +import org.junit.jupiter.api.Test +import org.mockito.ArgumentMatchers +import org.mockito.Mockito +import org.springframework.beans.factory.annotation.Autowired +import org.springframework.boot.autoconfigure.ImportAutoConfiguration +import org.springframework.boot.autoconfigure.task.TaskExecutionAutoConfiguration +import org.springframework.boot.test.autoconfigure.data.mongo.DataMongoTest +import org.springframework.boot.test.mock.mockito.MockBean +import java.time.LocalDateTime +import kotlin.random.Random + +@DataMongoTest +@ImportAutoConfiguration(StorageAutoConfiguration::class, TaskExecutionAutoConfiguration::class) +class BDZipManagerTest @Autowired constructor( + private val storageService: StorageService, + private val bdZipManager: BDZipManager, + private val compressFileRepository: CompressFileRepository, +) : BaseTest() { + + @MockBean + lateinit var fileReferenceClient: FileReferenceClient + + @MockBean + lateinit var storageCredentialsClient: StorageCredentialsClient + + @MockBean + lateinit var repositoryClient: RepositoryClient + + @BeforeEach + fun beforeEach() { + initMock() + } + + @Test + fun compressTest() { + with(createCompressFile()) { + val cf = compressFileRepository.findBySha256AndStorageCredentialsKey(sha256, null) + Assertions.assertNotNull(cf) + Assertions.assertEquals(CompressStatus.COMPRESSED, cf!!.status) + Assertions.assertTrue(cf.compressedSize != -1L) + // 原文件还在,原文件的删除由单独job处理 + Assertions.assertTrue(storageService.exist(sha256, null)) + // 压缩文件存在 + Assertions.assertTrue(storageService.exist(sha256.plus(".bd"), null)) + storageService.load(sha256.plus(".bd"), Range.full(compressedSize), null)!!.use { + Assertions.assertDoesNotThrow { it.toBkSyncDeltaSource(createTempFile()) } + } + } + } + + @Test + fun compressFailedTest() { + var decrement = 0 + Mockito.`when`(fileReferenceClient.decrement(ArgumentMatchers.anyString(), ArgumentMatchers.isNull())).then { + println("decrement file reference") + decrement++ + ResponseBuilder.success(true) + } + val data1 = Random.nextBytes(Random.nextInt(1024, 1 shl 20)) + val data2 = data1.copyOfRange(Random.nextInt(1, 10), data1.size) + val artifactFile1 = createTempArtifactFile(data1) + val artifactFile2 = createTempArtifactFile(data2) + val file = TCompressFile( + createdBy = "ut", + createdDate = LocalDateTime.now(), + lastModifiedBy = "ut", + lastModifiedDate = LocalDateTime.now(), + sha256 = artifactFile1.getFileSha256(), + baseSha256 = artifactFile2.getFileSha256(), + uncompressedSize = 1, // set error + storageCredentialsKey = null, + status = CompressStatus.CREATED, + chainLength = 1, + + ) + storageService.store(artifactFile1.getFileSha256(), artifactFile1, null) + storageService.store(artifactFile2.getFileSha256(), artifactFile2, null) + compressFileRepository.save(file) + bdZipManager.compress(file) + Thread.sleep(1000) + val cf = compressFileRepository.findBySha256AndStorageCredentialsKey(artifactFile1.getFileSha256(), null) + Assertions.assertNotNull(cf) + Assertions.assertEquals(CompressStatus.COMPRESS_FAILED, cf!!.status) + Assertions.assertEquals(1, decrement) + } + + @Test + fun uncompressTest() { + val compressFile = createCompressFile() + storageService.delete(compressFile.sha256, null) + compressFile.status = CompressStatus.WAIT_TO_UNCOMPRESS + compressFileRepository.save(compressFile) + bdZipManager.uncompress(compressFile) + Thread.sleep(1000) + val cf = compressFileRepository.findBySha256AndStorageCredentialsKey(compressFile.sha256, null) + Assertions.assertEquals(CompressStatus.UNCOMPRESSED, cf!!.status) + with(cf) { + Assertions.assertTrue(storageService.exist(sha256, null)) + Assertions.assertFalse(storageService.exist(sha256.plus(".bd"), null)) + val load = storageService.load(sha256, Range.full(uncompressedSize), null) + Assertions.assertEquals(sha256, load!!.sha256()) + } + } + + @Test + fun uncompressFailedTest() { + val compressFile = createCompressFile() + storageService.delete(compressFile.sha256, null) + compressFile.status = CompressStatus.WAIT_TO_UNCOMPRESS + compressFileRepository.save(compressFile) + compressFile.compressedSize = 1 // set error + bdZipManager.uncompress(compressFile) + Thread.sleep(1000) + val cf = compressFileRepository.findBySha256AndStorageCredentialsKey(compressFile.sha256, null) + Assertions.assertEquals(CompressStatus.UNCOMPRESS_FAILED, cf!!.status) + with(cf) { + Assertions.assertFalse(storageService.exist(sha256, null)) + Assertions.assertTrue(storageService.exist(sha256.plus(".bd"), null)) + } + } + + private fun createCompressFile(): TCompressFile { + val data1 = Random.nextBytes(Random.nextInt(1024, 1 shl 20)) + val data2 = data1.copyOfRange(Random.nextInt(1, 10), data1.size) + val artifactFile1 = createTempArtifactFile(data1) + val artifactFile2 = createTempArtifactFile(data2) + val file = TCompressFile( + createdBy = "ut", + createdDate = LocalDateTime.now(), + lastModifiedBy = "ut", + lastModifiedDate = LocalDateTime.now(), + sha256 = artifactFile1.getFileSha256(), + baseSha256 = artifactFile2.getFileSha256(), + uncompressedSize = artifactFile1.getSize(), + storageCredentialsKey = null, + status = CompressStatus.CREATED, + chainLength = 1, + + ) + storageService.store(artifactFile1.getFileSha256(), artifactFile1, null) + storageService.store(artifactFile2.getFileSha256(), artifactFile2, null) + compressFileRepository.save(file) + bdZipManager.compress(file) + Thread.sleep(1000) + Assertions.assertEquals(CompressStatus.COMPRESSED, file.status) + return file + } + + private fun createTempArtifactFile(data: ByteArray): ArtifactFile { + val tempFile = createTempFile() + tempFile.writeBytes(data) + return FileSystemArtifactFile(tempFile) + } +} diff --git a/src/backend/archive/biz-archive/src/test/kotlin/com/tencent/bkrepo/archive/service/CompressServiceTest.kt b/src/backend/archive/biz-archive/src/test/kotlin/com/tencent/bkrepo/archive/service/CompressServiceTest.kt new file mode 100644 index 0000000000..bebcf06e39 --- /dev/null +++ b/src/backend/archive/biz-archive/src/test/kotlin/com/tencent/bkrepo/archive/service/CompressServiceTest.kt @@ -0,0 +1,104 @@ +package com.tencent.bkrepo.archive.service + +import com.tencent.bkrepo.archive.BaseTest +import com.tencent.bkrepo.archive.CompressStatus +import com.tencent.bkrepo.archive.constant.MAX_CHAIN_LENGTH +import com.tencent.bkrepo.archive.job.compress.BDZipManager +import com.tencent.bkrepo.archive.repository.CompressFileRepository +import com.tencent.bkrepo.archive.request.CompressFileRequest +import com.tencent.bkrepo.common.api.constant.StringPool +import com.tencent.bkrepo.common.api.exception.ErrorCodeException +import com.tencent.bkrepo.common.storage.StorageAutoConfiguration +import com.tencent.bkrepo.common.storage.core.StorageService +import com.tencent.bkrepo.repository.api.FileReferenceClient +import com.tencent.bkrepo.repository.api.RepositoryClient +import com.tencent.bkrepo.repository.api.StorageCredentialsClient +import org.junit.jupiter.api.Assertions +import org.junit.jupiter.api.BeforeEach +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.assertThrows +import org.springframework.beans.factory.annotation.Autowired +import org.springframework.boot.autoconfigure.ImportAutoConfiguration +import org.springframework.boot.autoconfigure.task.TaskExecutionAutoConfiguration +import org.springframework.boot.test.autoconfigure.data.mongo.DataMongoTest +import org.springframework.boot.test.mock.mockito.MockBean + +@DataMongoTest +@ImportAutoConfiguration(StorageAutoConfiguration::class, TaskExecutionAutoConfiguration::class) +class CompressServiceTest @Autowired constructor( + private val compressService: CompressService, + private val compressFileRepository: CompressFileRepository, +) : BaseTest() { + @MockBean + lateinit var bdZipManager: BDZipManager + + @MockBean + lateinit var storageService: StorageService + + @MockBean + lateinit var fileReferenceClient: FileReferenceClient + + @MockBean + lateinit var storageCredentialsClient: StorageCredentialsClient + + @MockBean + lateinit var repositoryClient: RepositoryClient + + @BeforeEach + fun beforeEach() { + compressFileRepository.deleteAll() + initMock() + } + + @Test + fun compressTest() { + with(createCompressFile()) { + val create = compressFileRepository.findBySha256AndStorageCredentialsKey(sha256, null) + Assertions.assertEquals(CompressStatus.CREATED, create!!.status) + } + } + + @Test + fun maxChainLengthTest() { + for (i in 0..MAX_CHAIN_LENGTH) { + if (i == MAX_CHAIN_LENGTH) { + assertThrows { createCompressFile("$i", "${i + 1}") } + } else { + createCompressFile("$i", "${i + 1}") + } + } + } + + @Test + fun compressFailedTest() { + val f1 = "f1" + val f2 = "f2" + createCompressFile(f1, f2) + // base已经压缩 + assertThrows { createCompressFile(f2, f1) } + // 相同的sha256 + assertThrows { createCompressFile(f1, f1) } + } + + @Test + fun cancelTest() { + compressService.cancel() + createCompressFile() + } + + private fun createCompressFile( + sha256: String = StringPool.randomString(64), + baseSha256: String = StringPool.randomString(64), + ): CompressFileRequest { + val request = CompressFileRequest( + sha256 = sha256, + size = 1, + baseSha256 = baseSha256, + baseSize = 1, + storageCredentialsKey = null, + operator = "ut-op", + ) + compressService.compress(request) + return request + } +} diff --git a/src/backend/common/common-storage/storage-service/src/main/kotlin/com/tencent/bkrepo/common/storage/core/CompressSupport.kt b/src/backend/common/common-storage/storage-service/src/main/kotlin/com/tencent/bkrepo/common/storage/core/CompressSupport.kt index fc9377037b..288986ead6 100644 --- a/src/backend/common/common-storage/storage-service/src/main/kotlin/com/tencent/bkrepo/common/storage/core/CompressSupport.kt +++ b/src/backend/common/common-storage/storage-service/src/main/kotlin/com/tencent/bkrepo/common/storage/core/CompressSupport.kt @@ -186,8 +186,7 @@ abstract class CompressSupport : OverlaySupport() { if (!Files.isDirectory(filePath.parent)) { Files.createDirectories(filePath.parent) } - val path = fileLocator.locate(digest) - val nanos = measureNanoTime { StorageUtils.download(path, digest, range, credentials, filePath) } + val nanos = measureNanoTime { StorageUtils.downloadUseLocalPath(digest, range, credentials, filePath) } if (range != Range.FULL_RANGE) { check(Files.size(filePath) == range.length) } diff --git a/src/backend/common/common-storage/storage-service/src/main/kotlin/com/tencent/bkrepo/common/storage/innercos/request/FileCleanupChunkedFutureListener.kt b/src/backend/common/common-storage/storage-service/src/main/kotlin/com/tencent/bkrepo/common/storage/innercos/request/FileCleanupChunkedFutureListener.kt index 74adf1567b..bb31e91c29 100644 --- a/src/backend/common/common-storage/storage-service/src/main/kotlin/com/tencent/bkrepo/common/storage/innercos/request/FileCleanupChunkedFutureListener.kt +++ b/src/backend/common/common-storage/storage-service/src/main/kotlin/com/tencent/bkrepo/common/storage/innercos/request/FileCleanupChunkedFutureListener.kt @@ -14,7 +14,7 @@ class FileCleanupChunkedFutureListener : ChunkedFutureListener { val file = future.get() try { Files.deleteIfExists(file.toPath()) - logger.info("Delete cos downloading temp file[$file] success.") + logger.debug("Delete cos downloading temp file[$file] success.") } catch (e: Exception) { logger.error("Delete cos downloading temp file[$file] failed.") } diff --git a/src/backend/common/common-storage/storage-service/src/main/kotlin/com/tencent/bkrepo/common/storage/innercos/request/SessionChunkedFutureListener.kt b/src/backend/common/common-storage/storage-service/src/main/kotlin/com/tencent/bkrepo/common/storage/innercos/request/SessionChunkedFutureListener.kt index 79261af0b5..f0a1c1aa2b 100644 --- a/src/backend/common/common-storage/storage-service/src/main/kotlin/com/tencent/bkrepo/common/storage/innercos/request/SessionChunkedFutureListener.kt +++ b/src/backend/common/common-storage/storage-service/src/main/kotlin/com/tencent/bkrepo/common/storage/innercos/request/SessionChunkedFutureListener.kt @@ -36,10 +36,10 @@ class SessionChunkedFutureListener(private val session: DownloadSession) : Chunk override fun done(future: Future?, getInputStreamTime: Long) { session.latencyTime = getInputStreamTime if (future == null || future.isCancelled) { - logger.info("Session[${session.id}] current latency $getInputStreamTime ms with no future") + logger.debug("Session[${session.id}] current latency $getInputStreamTime ms with no future") return } - logger.info("Session[${session.id}] finish read file[${future.get()}], current latency $getInputStreamTime ms") + logger.debug("Session[${session.id}] finish read file[${future.get()}], current latency $getInputStreamTime ms") } companion object { diff --git a/src/backend/common/common-storage/storage-service/src/main/kotlin/com/tencent/bkrepo/common/storage/util/StorageUtils.kt b/src/backend/common/common-storage/storage-service/src/main/kotlin/com/tencent/bkrepo/common/storage/util/StorageUtils.kt index d5c7b75a44..ba203bbc8e 100644 --- a/src/backend/common/common-storage/storage-service/src/main/kotlin/com/tencent/bkrepo/common/storage/util/StorageUtils.kt +++ b/src/backend/common/common-storage/storage-service/src/main/kotlin/com/tencent/bkrepo/common/storage/util/StorageUtils.kt @@ -4,6 +4,7 @@ import com.tencent.bkrepo.common.api.constant.StringPool import com.tencent.bkrepo.common.api.util.StreamUtils import com.tencent.bkrepo.common.artifact.stream.Range import com.tencent.bkrepo.common.storage.core.FileStorage +import com.tencent.bkrepo.common.storage.core.locator.FileLocator import com.tencent.bkrepo.common.storage.credentials.StorageCredentials import com.tencent.bkrepo.common.storage.filesystem.FileSystemClient import org.springframework.stereotype.Component @@ -13,30 +14,32 @@ import java.nio.file.Path @Component class StorageUtils( private val fileStorage: FileStorage, + private val fileLocator: FileLocator, ) { init { Companion.fileStorage = fileStorage + Companion.fileLocator = fileLocator } companion object { private lateinit var fileStorage: FileStorage + private lateinit var fileLocator: FileLocator private const val DOWNLOAD_PREFIX = "downloading_" private const val DOWNLOAD_SUFFIX = ".temp" /** * 下载文件到指定路径 - * @param path 文件源路径 * @param digest 文件名 * @param credentials 存储实例 * @param filePath 下载目标路径 * */ - fun download( - path: String, + fun downloadUseLocalPath( digest: String, range: Range = Range.FULL_RANGE, credentials: StorageCredentials, filePath: Path, ) { + val path = fileLocator.locate(digest) val dir = credentials.upload.localPath val fileName = StringPool.randomStringByLongValue(DOWNLOAD_PREFIX, DOWNLOAD_SUFFIX) val tempFile = FileSystemClient(dir).touch("", fileName) diff --git a/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/NodeCompressedJob.kt b/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/NodeCompressedJob.kt index 7c3e0d6eb5..0c84259ace 100644 --- a/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/NodeCompressedJob.kt +++ b/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/NodeCompressedJob.kt @@ -11,6 +11,7 @@ import com.tencent.bkrepo.job.batch.utils.RepositoryCommonUtils import com.tencent.bkrepo.job.config.properties.NodeCompressedJobProperties import com.tencent.bkrepo.repository.api.NodeClient import com.tencent.bkrepo.repository.pojo.node.service.NodeCompressedRequest +import org.slf4j.LoggerFactory import java.time.Duration import org.springframework.boot.context.properties.EnableConfigurationProperties import org.springframework.data.mongodb.core.query.Criteria @@ -53,6 +54,14 @@ class NodeCompressedJob( override fun run(row: CompressFile, collectionName: String, context: NodeContext) { with(row) { + val storageCredentials = storageCredentialsKey?.let { + RepositoryCommonUtils.getStorageCredentials(storageCredentialsKey) + } + val bdFileName = "$sha256.bd" + if (!storageService.exist(bdFileName, storageCredentials)) { + logger.warn("Miss file $bdFileName.") + return + } listNode(sha256, storageCredentialsKey).forEach { val compressedRequest = NodeCompressedRequest( projectId = it.projectId, @@ -62,9 +71,6 @@ class NodeCompressedJob( ) nodeClient.compressedNode(compressedRequest) } - val storageCredentials = storageCredentialsKey?.let { - RepositoryCommonUtils.getStorageCredentials(storageCredentialsKey) - } storageService.delete(sha256, storageCredentials) val request = CompleteCompressRequest(sha256, storageCredentialsKey, lastModifiedBy) archiveClient.completeCompress(request) @@ -101,4 +107,8 @@ class NodeCompressedJob( ) return NodeCommonUtils.findNodes(query, storageCredentialsKey) } + + companion object { + private val logger = LoggerFactory.getLogger(NodeCompressedJob::class.java) + } } diff --git a/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/NodeUncompressedJob.kt b/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/NodeUncompressedJob.kt index 95d0e205be..d1ef67e486 100644 --- a/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/NodeUncompressedJob.kt +++ b/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/NodeUncompressedJob.kt @@ -3,12 +3,15 @@ package com.tencent.bkrepo.job.batch import com.tencent.bkrepo.archive.CompressStatus import com.tencent.bkrepo.archive.api.ArchiveClient import com.tencent.bkrepo.archive.request.DeleteCompressRequest +import com.tencent.bkrepo.common.storage.core.StorageService import com.tencent.bkrepo.job.batch.base.MongoDbBatchJob import com.tencent.bkrepo.job.batch.context.NodeContext import com.tencent.bkrepo.job.batch.utils.NodeCommonUtils +import com.tencent.bkrepo.job.batch.utils.RepositoryCommonUtils import com.tencent.bkrepo.job.config.properties.NodeUncompressedJobProperties import com.tencent.bkrepo.repository.api.NodeClient import com.tencent.bkrepo.repository.pojo.node.service.NodeUnCompressedRequest +import org.slf4j.LoggerFactory import java.time.Duration import org.springframework.boot.context.properties.EnableConfigurationProperties import org.springframework.data.mongodb.core.query.Criteria @@ -31,6 +34,7 @@ class NodeUncompressedJob( properties: NodeUncompressedJobProperties, val nodeClient: NodeClient, val archiveClient: ArchiveClient, + val storageService: StorageService, ) : MongoDbBatchJob(properties) { override fun createJobContext(): NodeContext { @@ -49,6 +53,13 @@ class NodeUncompressedJob( override fun run(row: CompressFile, collectionName: String, context: NodeContext) { with(row) { + val storageCredentials = storageCredentialsKey?.let { + RepositoryCommonUtils.getStorageCredentials(storageCredentialsKey) + } + if (!storageService.exist(sha256, storageCredentials)) { + logger.warn("Miss file $sha256.") + return + } listNode(sha256, storageCredentialsKey).forEach { val compressedRequest = NodeUnCompressedRequest( projectId = it.projectId, @@ -93,4 +104,8 @@ class NodeUncompressedJob( ) return NodeCommonUtils.findNodes(query, storageCredentialsKey) } + + companion object { + private val logger = LoggerFactory.getLogger(NodeUncompressedJob::class.java) + } } diff --git a/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/SystemGcJob.kt b/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/SystemGcJob.kt index 3ca3cf40d4..973b9de085 100644 --- a/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/SystemGcJob.kt +++ b/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/SystemGcJob.kt @@ -10,6 +10,7 @@ import com.tencent.bkrepo.common.mongo.constant.MIN_OBJECT_ID import com.tencent.bkrepo.common.mongo.dao.util.sharding.HashShardingUtils import com.tencent.bkrepo.common.service.exception.RemoteErrorCodeException import com.tencent.bkrepo.common.storage.credentials.StorageCredentials +import com.tencent.bkrepo.common.storage.innercos.retry import com.tencent.bkrepo.fs.server.constant.FAKE_SHA256 import com.tencent.bkrepo.job.SHARDING_COUNT import com.tencent.bkrepo.job.batch.base.DefaultContextJob @@ -207,10 +208,10 @@ class SystemGcJob( } } } - return if (gcable) { - if (sampleNode != null) { - gcNodes.remove(sampleNode) - } + if (sampleNode != null) { + gcNodes.remove(sampleNode) + } + return if (gcable && gcNodes.size > 0) { Pair(gcNodes, newest) } else { null @@ -298,10 +299,16 @@ class SystemGcJob( storageCredentialsKey = storageCredentials?.key, ) return try { - archiveClient.compress(compressedRequest) - 1 + retry(RETRY_TIMES, ignoreExceptions = listOf(RemoteErrorCodeException::class.java)) { + archiveClient.compress(compressedRequest) + 1 + } } catch (ignore: RemoteErrorCodeException) { 0 + } catch (e: Exception) { + // 发送请求失败 + logger.error("Sending request failed", e) + 0 } } } @@ -338,5 +345,6 @@ class SystemGcJob( private const val SIZE_RATIO = 0.5 private val HAMMING_DISTANCE_INSTANCE = HammingDistance() private const val MIN_SAMPLING_GROUP_SIZE = 5 + private const val RETRY_TIMES = 3 } }