Skip to content

Commit

Permalink
feat: 当文件大小超过配置的clientMaxBodySize时,使用分块上传进行分发#1185 (#1187)
Browse files Browse the repository at this point in the history
* feat: 当文件大小超过配置的clientMaxBodySize时,使用分块上传进行分发#1185

* feat: 代码检查问题修复 #1185

* feat: 传入md5值 #1185

* feat: url参数修复 #1185

* feat: 修复OutOfMemoryError #1185

* feat: 读取限速配置 #1185

* feat: 打印调整 #1185

* feat: 打印调整 #1185

* feat: 打印调整 #1185
  • Loading branch information
zacYL authored Sep 21, 2023
1 parent 3bc6c37 commit 2616836
Show file tree
Hide file tree
Showing 14 changed files with 175 additions and 90 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -78,11 +78,15 @@ abstract class FileBlockSupport : CleanupSupport() {
}
}

override fun finishAppend(appendId: String, storageCredentials: StorageCredentials?): FileInfo {
override fun finishAppend(
appendId: String,
storageCredentials: StorageCredentials?,
fileInfo: FileInfo?
): FileInfo {
val credentials = getCredentialsOrDefault(storageCredentials)
val tempClient = getTempClient(credentials)
try {
val fileInfo = tempClient.load(CURRENT_PATH, appendId)?.let { storeMergedFile(it, credentials) }
val fileInfo = tempClient.load(CURRENT_PATH, appendId)?.let { storeMergedFile(it, credentials, fileInfo) }
?: throw IllegalArgumentException("Append file does not exist.")
tempClient.delete(CURRENT_PATH, appendId)
logger.info("Success to finish append file [$appendId], file info [$fileInfo]")
Expand Down Expand Up @@ -206,18 +210,27 @@ abstract class FileBlockSupport : CleanupSupport() {
}
}

private fun storeMergedFile(file: File, credentials: StorageCredentials): FileInfo {
val sha256 = file.sha256()
val md5 = file.md5()
/**
* 合并文件并返回对应FileInfo(sha256、md5、size)
* 当 fileInfo不为空时:避免当文件过大时生成 sha256 或者 md5 需要过长时间,信任传递进来的 sha256 和md5 值
* 当 fileInfo为空时,生成对应的 sha256 或者 md5
*/
private fun storeMergedFile(file: File, credentials: StorageCredentials, fileInfo: FileInfo? = null): FileInfo {
val size = file.length()
val fileInfo = FileInfo(sha256, md5, size)
val path = fileLocator.locate(sha256)
if (!doExist(path, sha256, credentials)) {
doStore(path, sha256, file.toArtifactFile(), credentials)
val realFileInfo = if (fileInfo == null) {
FileInfo(file.sha256(), file.md5(), size)
} else {
logger.info("File [$sha256] exist, skip store.")
if (fileInfo.size != size)
throw IllegalArgumentException("Merged file is broken!")
FileInfo(fileInfo.sha256, fileInfo.md5, size)
}
return fileInfo
val path = fileLocator.locate(realFileInfo.sha256)
if (!doExist(path, realFileInfo.sha256, credentials)) {
doStore(path, realFileInfo.sha256, file.toArtifactFile(), credentials)
} else {
logger.info("File [${realFileInfo.sha256}] exist, skip store.")
}
return realFileInfo
}

companion object {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ interface FileBlockOperation {
* 结束追加,存储并返回完整文件
* appendId: 文件追加Id
*/
fun finishAppend(appendId: String, storageCredentials: StorageCredentials?): FileInfo
fun finishAppend(appendId: String, storageCredentials: StorageCredentials?, fileInfo: FileInfo? = null): FileInfo

/**
* 创建分块存储目录,返回分块存储Id
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ const val NODE_FULL_PATH = "fullPath"
const val SIZE = "size"
const val REPOSITORY_INFO = "repo"
const val SHA256 = "sha256"
const val MD5 = "md5"
const val FILE = "file"
const val STORAGE_KEY = "storageKey"
const val CHUNKED_UPLOAD = "chunkedUpload"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,5 +94,10 @@ data class ReplicationProperties(
* 分发任务调度服务器所需账户密码
*/
var dispatchUser: String? = null,
var dispatchPwd: String? = null
var dispatchPwd: String? = null,
/**
* 针对部分 client_max_body_size 大小限制,
* 导致超过该请求的文件无法使用普通上传
*/
var clientMaxBodySize: Long = 10 * 1024 * 1024 * 1024L
)
Original file line number Diff line number Diff line change
Expand Up @@ -188,19 +188,23 @@ class BlobReplicaController(
artifactFile: ArtifactFile,
@RequestParam sha256: String,
@RequestParam storageKey: String? = null,
@RequestParam size: Long? = null,
@RequestParam md5: String? = null,
@PathVariable uuid: String,
@PathVariable projectId: String,
@PathVariable repoName: String,
) {
logger.info("The file with sha256 [$sha256] will be finished with $uuid")
logger.info("The file (sha256 [$sha256], size [$size], md5 [$md5]) will be finished with $uuid")
val credentials = baseCacheHandler.credentialsCache.get(storageKey.orEmpty())
blobChunkedService.finishChunkedUpload(
projectId,
repoName,
credentials,
sha256,
artifactFile,
uuid
projectId = projectId,
repoName = repoName,
credentials = credentials,
sha256 = sha256,
artifactFile = artifactFile,
uuid = uuid,
size = size,
md5 = md5
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,10 @@ import com.tencent.bkrepo.common.artifact.exception.RepoNotFoundException
import com.tencent.bkrepo.common.artifact.exception.VersionNotFoundException
import com.tencent.bkrepo.common.artifact.stream.Range
import com.tencent.bkrepo.common.storage.core.StorageService
import com.tencent.bkrepo.common.storage.pojo.FileInfo
import com.tencent.bkrepo.replication.constant.MD5
import com.tencent.bkrepo.replication.constant.NODE_FULL_PATH
import com.tencent.bkrepo.replication.constant.SIZE
import com.tencent.bkrepo.repository.api.MetadataClient
import com.tencent.bkrepo.repository.api.NodeClient
import com.tencent.bkrepo.repository.api.PackageClient
import com.tencent.bkrepo.repository.api.ProjectClient
Expand All @@ -64,7 +65,6 @@ class LocalDataManager(
private val repositoryClient: RepositoryClient,
private val nodeClient: NodeClient,
private val packageClient: PackageClient,
private val metadataClient: MetadataClient,
private val storageService: StorageService
) {

Expand Down Expand Up @@ -183,9 +183,9 @@ class LocalDataManager(
projectId: String,
repoName: String,
sha256: String
): Long {
): FileInfo {
val queryModel = NodeQueryBuilder()
.select(NODE_FULL_PATH, SIZE)
.select(NODE_FULL_PATH, SIZE, MD5)
.projectId(projectId)
.repoName(repoName)
.sha256(sha256)
Expand All @@ -194,7 +194,11 @@ class LocalDataManager(
if (result == null || result.records.isEmpty()) {
throw NodeNotFoundException(sha256)
}
return result.records[0][SIZE].toString().toLong()
return FileInfo(
sha256 = sha256,
md5 = result.records[0][MD5].toString(),
size = result.records[0][SIZE].toString().toLong()
)
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ class FilePushContext(
val context: ReplicaContext,
val sha256: String? = null,
val size: Long? = null,
val md5: String? = null,
val digest: String? = null,
val name: String,
val token: String? = null,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,12 @@ import com.tencent.bkrepo.common.api.constant.HttpHeaders
import com.tencent.bkrepo.common.api.constant.MediaTypes
import com.tencent.bkrepo.common.api.constant.StringPool
import com.tencent.bkrepo.common.artifact.stream.Range
import com.tencent.bkrepo.common.artifact.stream.rateLimit
import com.tencent.bkrepo.common.artifact.util.http.UrlFormatter
import com.tencent.bkrepo.common.storage.pojo.FileInfo
import com.tencent.bkrepo.replication.config.ReplicationProperties
import com.tencent.bkrepo.replication.constant.CHUNKED_UPLOAD
import com.tencent.bkrepo.replication.constant.MD5
import com.tencent.bkrepo.replication.constant.REPOSITORY_INFO
import com.tencent.bkrepo.replication.constant.SHA256
import com.tencent.bkrepo.replication.constant.SIZE
Expand Down Expand Up @@ -77,7 +80,7 @@ abstract class ArtifactReplicationHandler(
val clusterUrl = context.cluster.url
val clusterName = context.cluster.name
logger.info(
"Will try to push $name file $sha256 " +
"Will try to push $name file $digest or $sha256 " +
"in repo ${context.localProjectId}|${context.localRepo} to remote cluster $clusterName."
)
logger.info("Will try to obtain uuid from remote cluster $clusterName for blob $name|$digest")
Expand All @@ -86,16 +89,15 @@ abstract class ArtifactReplicationHandler(
if (!sessionIdHandlerResult.isSuccess) {
return false
}
val (sha256, size) = getBlobSha256AndSize(filePushContext)
val fileInfo = getBlobFileInfo(filePushContext)
logger.info(
"Will try to push file with ${sessionIdHandlerResult.location} " +
"in chunked upload way to remote cluster $clusterUrl for blob $name|$sha256"
"in chunked upload way to remote cluster $clusterUrl for blob $name|${fileInfo.sha256}"
)
// 需要将大文件进行分块上传
var chunkedUploadResult = try {
processFileChunkUpload(
size = size,
sha256 = sha256,
fileInfo = fileInfo,
filePushContext = filePushContext,
location = buildRequestUrl(clusterUrl, sessionIdHandlerResult.location)
)
Expand All @@ -108,22 +110,21 @@ abstract class ArtifactReplicationHandler(
return false
}
chunkedUploadResult = processBlobUploadWithSingleChunk(
size = size,
sha256 = sha256,
fileInfo = fileInfo,
location = buildRequestUrl(clusterUrl, sessionIdHandlerResult.location),
filePushContext = filePushContext
)
}

if (!chunkedUploadResult.isSuccess) return false
logger.info(
"The file $name|$sha256 is pushed " +
"The file $name|${fileInfo.sha256} is pushed " +
"and will try to send a completed request with ${chunkedUploadResult.location}."
)
val sessionCloseHandlerResult = processSessionCloseHandler(
location = buildRequestUrl(clusterUrl, chunkedUploadResult.location),
filePushContext = filePushContext,
sha256 = sha256
fileInfo = fileInfo
)
return sessionCloseHandlerResult.isSuccess
}
Expand Down Expand Up @@ -173,33 +174,33 @@ abstract class ArtifactReplicationHandler(
* 上传file文件step2: patch分块上传
*/
private fun processFileChunkUpload(
size: Long,
sha256: String,
fileInfo: FileInfo,
location: String?,
filePushContext: FilePushContext
): DefaultHandlerResult? {
var startPosition: Long = 0
var chunkedHandlerResult: DefaultHandlerResult? = null
val (params, ignoredFailureCode) = buildChunkUploadRequestInfo(sha256, filePushContext)
while (startPosition < size) {
val offset = size - startPosition - replicationProperties.chunkedSize
val (params, ignoredFailureCode) = buildChunkUploadRequestInfo(fileInfo.sha256, filePushContext)
while (startPosition < fileInfo.size) {
val offset = fileInfo.size - startPosition - replicationProperties.chunkedSize
val byteCount: Long = if (offset < 0) {
(size - startPosition)
(fileInfo.size - startPosition)
} else {
replicationProperties.chunkedSize
}
val contentRange = "$startPosition-${startPosition + byteCount - 1}"
logger.info(
"${Thread.currentThread().name} start is $startPosition, " +
"size is $size, byteCount is $byteCount contentRange is $contentRange"
"size is ${fileInfo.size}, byteCount is $byteCount contentRange is $contentRange"
)
val range = Range(startPosition, startPosition + byteCount - 1, size)
val range = Range(startPosition, startPosition + byteCount - 1, fileInfo.size)
val input = localDataManager.loadInputStreamByRange(
sha256, range, filePushContext.context.localProjectId, filePushContext.context.localRepoName
fileInfo.sha256, range, filePushContext.context.localProjectId, filePushContext.context.localRepoName
)
val patchBody: RequestBody = RequestBody.create(
MediaTypes.APPLICATION_OCTET_STREAM.toMediaTypeOrNull(), input.readBytes()
val rateLimitInputStream = input.rateLimit(
replicationProperties.rateLimit.toBytes()
)
val patchBody: RequestBody = StreamRequestBody(rateLimitInputStream, byteCount)
val patchHeader = Headers.Builder()
.add(HttpHeaders.CONTENT_TYPE, MediaTypes.APPLICATION_OCTET_STREAM)
.add(HttpHeaders.CONTENT_RANGE, contentRange)
Expand All @@ -209,16 +210,17 @@ abstract class ArtifactReplicationHandler(
REPOSITORY_INFO,
"${filePushContext.context.localProjectId}|${filePushContext.context.localRepoName}"
)
.add(SHA256, sha256)
.add(SIZE, size.toString())
.add(SHA256, fileInfo.sha256)
.add(SIZE, fileInfo.size.toString())
.add(MD5, fileInfo.md5)
.build()
val property = RequestProperty(
requestBody = patchBody,
authorizationCode = filePushContext.token,
requestMethod = RequestMethod.PATCH,
headers = patchHeader,
requestUrl = location,
requestTag = buildRequestTag(filePushContext.context, sha256 + range, byteCount),
requestTag = buildRequestTag(filePushContext.context, fileInfo.sha256 + range, byteCount),
params = params
)
chunkedHandlerResult = DefaultHandler.process(
Expand Down Expand Up @@ -250,13 +252,13 @@ abstract class ArtifactReplicationHandler(
*/
private fun processSessionCloseHandler(
location: String?,
sha256: String,
fileInfo: FileInfo,
filePushContext: FilePushContext
): DefaultHandlerResult {
val putBody: RequestBody = RequestBody.create(
null, ByteString.EMPTY
)
val params = buildSessionCloseRequestParam(sha256, filePushContext)
val params = buildSessionCloseRequestParam(fileInfo, filePushContext)
val putHeader = Headers.Builder()
.add(HttpHeaders.CONTENT_TYPE, MediaTypes.APPLICATION_OCTET_STREAM)
.add(HttpHeaders.CONTENT_LENGTH, "0")
Expand All @@ -278,7 +280,7 @@ abstract class ArtifactReplicationHandler(
}

open fun buildSessionCloseRequestParam(
sha256: String,
fileInfo: FileInfo,
filePushContext: FilePushContext
) : String {
return StringPool.EMPTY
Expand All @@ -290,23 +292,28 @@ abstract class ArtifactReplicationHandler(
* 针对部分registry不支持将blob分成多块上传,将blob文件整块上传
*/
private fun processBlobUploadWithSingleChunk(
size: Long,
sha256: String,
fileInfo: FileInfo,
location: String?,
filePushContext: FilePushContext
): DefaultHandlerResult {
with(filePushContext) {
logger.info("Will upload blob $sha256 in a single patch request")
val params = buildBlobUploadWithSingleChunkRequestParam(sha256, filePushContext)
logger.info("Will upload blob ${fileInfo.sha256} in a single patch request")
val params = buildBlobUploadWithSingleChunkRequestParam(fileInfo.sha256, filePushContext)
val inputStream = localDataManager.loadInputStream(
fileInfo.sha256, fileInfo.size, context.localProjectId, context.localRepoName
)
val rateLimitInputStream = inputStream.rateLimit(
replicationProperties.rateLimit.toBytes()
)
val patchBody = StreamRequestBody(
localDataManager.loadInputStream(sha256, size, context.localProjectId, context.localRepoName),
size
rateLimitInputStream,
fileInfo.size
)
val patchHeader = Headers.Builder()
.add(HttpHeaders.CONTENT_TYPE, MediaTypes.APPLICATION_OCTET_STREAM)
.add(HttpHeaders.CONTENT_RANGE, "0-${0 + size - 1}")
.add(HttpHeaders.CONTENT_RANGE, "0-${0 + fileInfo.size - 1}")
.add(REPOSITORY_INFO, "${context.localProjectId}|${context.localRepoName}")
.add(SHA256, sha256)
.add(SHA256, fileInfo.sha256)
.add(HttpHeaders.CONTENT_LENGTH, "$size")
.add(CHUNKED_UPLOAD, CHUNKED_UPLOAD)
.build()
Expand All @@ -317,7 +324,7 @@ abstract class ArtifactReplicationHandler(
headers = patchHeader,
requestUrl = location,
params = params,
requestTag = buildRequestTag(context, sha256, size)
requestTag = buildRequestTag(context, fileInfo.sha256, fileInfo.size)
)
return DefaultHandler.process(
httpClient = httpClient,
Expand All @@ -336,7 +343,7 @@ abstract class ArtifactReplicationHandler(



abstract fun getBlobSha256AndSize(filePushContext: FilePushContext): Pair<String, Long>
abstract fun getBlobFileInfo(filePushContext: FilePushContext): FileInfo

/**
* 获取上传blob的location
Expand Down
Loading

0 comments on commit 2616836

Please sign in to comment.