From be9bc6a972c73617f29bd3348d3cbd3cf3b63c94 Mon Sep 17 00:00:00 2001 From: zacYL <100330102+zacYL@users.noreply.github.com> Date: Wed, 27 Dec 2023 09:45:46 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E5=88=86=E5=9D=97=E4=B8=8A=E4=BC=A0?= =?UTF-8?q?=E6=A0=A1=E9=AA=8Crange=20#1552=20(#1570)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * feat: 分块上传校验range #1552 * feat: 代码调整 #1552 * feat: 代码调整 #1552 * feat: 注释调整 #1552 * feat: 注释调整 #1552 --- .../common/storage/core/FileBlockSupport.kt | 11 ++ .../core/operation/FileBlockOperation.kt | 5 + .../storage/filesystem/FileSystemClient.kt | 13 ++- .../service/impl/BlobChunkedServiceImpl.kt | 102 ++++++++++++++---- 4 files changed, 108 insertions(+), 23 deletions(-) diff --git a/src/backend/common/common-storage/storage-service/src/main/kotlin/com/tencent/bkrepo/common/storage/core/FileBlockSupport.kt b/src/backend/common/common-storage/storage-service/src/main/kotlin/com/tencent/bkrepo/common/storage/core/FileBlockSupport.kt index 5f535916f8..fb8d124b91 100644 --- a/src/backend/common/common-storage/storage-service/src/main/kotlin/com/tencent/bkrepo/common/storage/core/FileBlockSupport.kt +++ b/src/backend/common/common-storage/storage-service/src/main/kotlin/com/tencent/bkrepo/common/storage/core/FileBlockSupport.kt @@ -63,6 +63,17 @@ abstract class FileBlockSupport : CleanupSupport() { } } + override fun findLengthOfAppendFile(appendId: String, storageCredentials: StorageCredentials?): Long { + val credentials = getCredentialsOrDefault(storageCredentials) + val tempClient = getTempClient(credentials) + try { + return tempClient.length(CURRENT_PATH, appendId) + } catch (exception: Exception) { + logger.error("Failed to read length of id [$appendId] on [${credentials.key}]", exception) + throw StorageErrorException(StorageMessageCode.STORE_ERROR) + } + } + override fun append(appendId: String, artifactFile: ArtifactFile, storageCredentials: StorageCredentials?): Long { val credentials = getCredentialsOrDefault(storageCredentials) val tempClient = getTempClient(credentials) diff --git a/src/backend/common/common-storage/storage-service/src/main/kotlin/com/tencent/bkrepo/common/storage/core/operation/FileBlockOperation.kt b/src/backend/common/common-storage/storage-service/src/main/kotlin/com/tencent/bkrepo/common/storage/core/operation/FileBlockOperation.kt index d18ad8bcef..f4fb936e1e 100644 --- a/src/backend/common/common-storage/storage-service/src/main/kotlin/com/tencent/bkrepo/common/storage/core/operation/FileBlockOperation.kt +++ b/src/backend/common/common-storage/storage-service/src/main/kotlin/com/tencent/bkrepo/common/storage/core/operation/FileBlockOperation.kt @@ -47,6 +47,11 @@ interface FileBlockOperation { */ fun createAppendId(storageCredentials: StorageCredentials?): String + /** + * 查询追加文件长度 + */ + fun findLengthOfAppendFile(appendId: String, storageCredentials: StorageCredentials?): Long + /** * 追加文件,返回当前文件长度 * appendId: 文件追加Id diff --git a/src/backend/common/common-storage/storage-service/src/main/kotlin/com/tencent/bkrepo/common/storage/filesystem/FileSystemClient.kt b/src/backend/common/common-storage/storage-service/src/main/kotlin/com/tencent/bkrepo/common/storage/filesystem/FileSystemClient.kt index 8ada37d609..8b8769871e 100644 --- a/src/backend/common/common-storage/storage-service/src/main/kotlin/com/tencent/bkrepo/common/storage/filesystem/FileSystemClient.kt +++ b/src/backend/common/common-storage/storage-service/src/main/kotlin/com/tencent/bkrepo/common/storage/filesystem/FileSystemClient.kt @@ -35,10 +35,10 @@ import java.io.IOException import java.io.InputStream import java.nio.channels.FileChannel import java.nio.channels.ReadableByteChannel +import java.nio.file.FileAlreadyExistsException import java.nio.file.Files import java.nio.file.Path import java.nio.file.Paths -import java.nio.file.FileAlreadyExistsException /** * 本地文件存储客户端 @@ -297,6 +297,17 @@ class FileSystemClient(private val root: String) { } } + /** + * 获取文件大小 + */ + fun length(dir: String, filename: String): Long { + val filePath = Paths.get(this.root, dir, filename) + if (!Files.isRegularFile(filePath)) { + throw IllegalArgumentException("[$filePath] is not a regular file.") + } + return Files.size(filePath) + } + private fun transfer(input: ReadableByteChannel, output: FileChannel, size: Long, append: Boolean = false) { val startPosition: Long = if (append) output.size() else 0L var bytesCopied: Long diff --git a/src/backend/replication/biz-replication/src/main/kotlin/com/tencent/bkrepo/replication/service/impl/BlobChunkedServiceImpl.kt b/src/backend/replication/biz-replication/src/main/kotlin/com/tencent/bkrepo/replication/service/impl/BlobChunkedServiceImpl.kt index ad0d35ace6..86e4e34786 100644 --- a/src/backend/replication/biz-replication/src/main/kotlin/com/tencent/bkrepo/replication/service/impl/BlobChunkedServiceImpl.kt +++ b/src/backend/replication/biz-replication/src/main/kotlin/com/tencent/bkrepo/replication/service/impl/BlobChunkedServiceImpl.kt @@ -33,6 +33,7 @@ import com.tencent.bkrepo.common.artifact.api.ArtifactFile import com.tencent.bkrepo.common.service.util.HttpContextHolder import com.tencent.bkrepo.common.storage.core.StorageService import com.tencent.bkrepo.common.storage.credentials.StorageCredentials +import com.tencent.bkrepo.common.storage.message.StorageErrorException import com.tencent.bkrepo.common.storage.pojo.FileInfo import com.tencent.bkrepo.replication.constant.BOLBS_UPLOAD_FIRST_STEP_URL_STRING import com.tencent.bkrepo.replication.exception.ReplicationMessageCode @@ -74,36 +75,45 @@ class BlobChunkedServiceImpl( ) { val range = HttpContextHolder.getRequest().getHeader("Content-Range") val length = HttpContextHolder.getRequest().contentLength - if (!range.isNullOrEmpty() && length > -1) { - logger.info("range $range, length $length, uuid $uuid") - val (start, end) = getRangeInfo(range) - // 判断要上传的长度是否超长 - if (end - start > length - 1) { - buildBlobUploadPatchResponse( + + val lengthOfAppendFile = storageService.findLengthOfAppendFile(uuid, credentials) + logger.info("current length of append file is $lengthOfAppendFile") + val (patchLen, status) = when (chunkedRequestCheck( uuid = uuid, - locationStr = buildLocationUrl(uuid, projectId, repoName), - response = HttpContextHolder.getResponse(), - range = length.toLong(), - status = HttpStatus.REQUESTED_RANGE_NOT_SATISFIABLE + lengthOfAppendFile = lengthOfAppendFile, + range = range, + contentLength = length + )) { + RangeStatus.ILLEGAL_RANGE -> { + Pair(length.toLong(), HttpStatus.REQUESTED_RANGE_NOT_SATISFIABLE) + } + RangeStatus.READY_TO_APPEND -> { + val patchLen = storageService.append( + appendId = uuid, + artifactFile = artifactFile, + storageCredentials = credentials + ) + logger.info( + "Part of file with sha256 $sha256 in repo $projectId|$repoName " + + "has been uploaded, size pf append file is $patchLen and uuid: $uuid" ) - return + Pair(patchLen, HttpStatus.ACCEPTED) + } + else -> { + logger.info( + "Part of file with sha256 $sha256 in repo $projectId|$repoName " + + "already appended, size pf append file is $lengthOfAppendFile and uuid: $uuid") + Pair(lengthOfAppendFile, HttpStatus.ACCEPTED) } } - val patchLen = storageService.append( - appendId = uuid, - artifactFile = artifactFile, - storageCredentials = credentials - ) - logger.info( - "Part of file with sha256 $sha256 in repo $projectId|$repoName " + - "has been uploaded, uploaded size is $patchLen uuid: $uuid," - ) buildBlobUploadPatchResponse( uuid = uuid, locationStr = buildLocationUrl(uuid, projectId, repoName), response = HttpContextHolder.getResponse(), - range = patchLen + range = patchLen, + status = status ) + } override fun finishChunkedUpload( @@ -123,7 +133,11 @@ class BlobChunkedServiceImpl( } else { null } - val fileInfo = storageService.finishAppend(uuid, credentials, originalFileInfo) + val fileInfo = try { + storageService.finishAppend(uuid, credentials, originalFileInfo) + } catch (e: StorageErrorException) { + throw BadRequestException(ReplicationMessageCode.REPLICA_ARTIFACT_BROKEN, sha256) + } logger.info( "The file with sha256 $sha256 in repo $projectId|$repoName has been uploaded with uuid: $uuid," + " received sha256 of file is ${fileInfo.sha256}") @@ -142,11 +156,55 @@ class BlobChunkedServiceImpl( ) } + private fun chunkedRequestCheck( + contentLength: Int, + range: String?, + uuid: String, + lengthOfAppendFile: Long + ): RangeStatus { + // 当range不存在或者length < 0时 + if (!validateValue(contentLength, range)) { + return RangeStatus.ILLEGAL_RANGE + } + logger.info("range $range, length $contentLength, uuid $uuid") + val (start, end) = getRangeInfo(range!!) + // 当上传的长度和range内容不匹配时 + return if ((end - start) != (contentLength - 1).toLong()) { + RangeStatus.ILLEGAL_RANGE + } else { + // 当追加的文件大小和range的起始大小一致时代表写入正常 + if (start == lengthOfAppendFile) { + RangeStatus.READY_TO_APPEND + } else if (start > lengthOfAppendFile) { + // 当追加的文件大小比start小时,说明文件写入有误 + RangeStatus.ILLEGAL_RANGE + } else { + // 当追加的文件大小==end+1时,可能存在重试导致已经写入一次 + if (lengthOfAppendFile == end + 1) { + RangeStatus.ALREADY_APPENDED + } else { + // 当追加的文件大小大于start时,并且不等于end+1时,文件已损坏 + RangeStatus.ILLEGAL_RANGE + } + } + } + } + + private fun validateValue(contentLength: Int, range: String?): Boolean { + return !(range.isNullOrEmpty() || contentLength < 0) + } + private fun buildLocationUrl(uuid: String, projectId: String, repoName: String) : String { val path = BOLBS_UPLOAD_FIRST_STEP_URL_STRING.format(projectId, repoName) return serviceName+path+uuid } + enum class RangeStatus { + ILLEGAL_RANGE, + ALREADY_APPENDED, + READY_TO_APPEND; + } + companion object { private val logger = LoggerFactory.getLogger(BlobChunkedServiceImpl::class.java) }