Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bootstrap Performance Improvement[PR2] : Adding Network Protocol Classes For File Copy And Implementation For Handling Async File Copy Termination #2969

Merged
merged 16 commits into from
Dec 20, 2024
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public void initiateBootstrap(ReplicaId replicaId) {

@Override
public void initiateFileCopy(ReplicaId replicaId) {
//To BE Filled.
//To Be Added With File Copy Protocol
}

@Override
Expand Down Expand Up @@ -108,7 +108,7 @@ public void waitBootstrapCompleted(String partitionName) throws InterruptedExcep

@Override
public void waitForFileCopyCompleted(String partitionName) throws InterruptedException {
//To Be Filled
//To Be Added With File Copy Protocol
}

@Override
Expand Down Expand Up @@ -204,7 +204,7 @@ public void onBootstrapComplete(ReplicaId replicaId) {

@Override
public void onFileCopyComplete(ReplicaId replicaId) {
// To Be Filled
//To Be Added With File Copy Protocol
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/**
* Copyright 2024 LinkedIn Corp. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
package com.github.ambry.protocol;

import com.github.ambry.clustermap.ClusterMap;
import com.github.ambry.clustermap.PartitionId;
import com.github.ambry.utils.Utils;
import java.io.DataInputStream;
import java.io.IOException;
import java.nio.charset.Charset;


public class FileCopyGetChunkRequest extends RequestOrResponse{
private PartitionId partitionId;
private String fileName;
private long startOffset;
private long chunkLengthInBytes;
private static final short File_Chunk_Request_Version_V1 = 1;
private static final int File_Name_Size_In_Bytes = 4;


public FileCopyGetChunkRequest( short versionId, int correlationId,
String clientId, PartitionId partitionId, String fileName, long startOffset, long sizeInBytes) {
super(RequestOrResponseType.FileCopyGetChunkRequest, versionId, correlationId, clientId);
if(partitionId == null || fileName.isEmpty() || startOffset < 0 || sizeInBytes < 0){
throw new IllegalArgumentException("PartitionId, FileName, StartOffset or SizeInBytes cannot be null or negative");
}
this.partitionId = partitionId;
this.fileName = fileName;
this.startOffset = startOffset;
this.chunkLengthInBytes = sizeInBytes;
}

public static FileCopyGetChunkRequest readFrom(DataInputStream stream, ClusterMap clusterMap)
throws IOException {
Short versionId = stream.readShort();
validateVersion(versionId);
int correlationId = stream.readInt();
String clientId = Utils.readIntString(stream);
PartitionId partitionId = clusterMap.getPartitionIdFromStream(stream);
String fileName = Utils.readIntString(stream);
long startOffset = stream.readLong();
long sizeInBytes = stream.readLong();
return new FileCopyGetChunkRequest(versionId, correlationId, clientId, partitionId, fileName, startOffset, sizeInBytes);
}

protected void prepareBuffer(){
super.prepareBuffer();
bufferToSend.writeBytes(partitionId.getBytes());
Utils.serializeString(bufferToSend, fileName, Charset.defaultCharset());
bufferToSend.writeLong(startOffset);
bufferToSend.writeLong(chunkLengthInBytes);
}

public String toString(){
StringBuilder sb = new StringBuilder();
sb.append("FileCopyProtocolGetChunkRequest[")
.append("PartitionId=").append(partitionId)
.append(", FileName=").append(fileName)
.append(", StartOffset=").append(startOffset)
.append(", SizeInBytes=").append(chunkLengthInBytes)
.append("]");
return sb.toString();
}

public long sizeInBytes() {
return super.sizeInBytes() + partitionId.getBytes().length + File_Name_Size_In_Bytes + fileName.length() + Long.BYTES + Long.BYTES;
}

public PartitionId getPartitionId() {
return partitionId;
}
public String getFileName() {
return fileName;
}
public long getStartOffset() {
return startOffset;
}
public long getChunkLengthInBytes() {
return chunkLengthInBytes;
}

static void validateVersion(short version){
if (version != File_Chunk_Request_Version_V1) {
throw new IllegalArgumentException("Unknown version for FileMetadataRequest: " + version);
aga9900 marked this conversation as resolved.
Show resolved Hide resolved
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/**
* Copyright 2024 LinkedIn Corp. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
package com.github.ambry.protocol;

import com.github.ambry.clustermap.ClusterMap;
import com.github.ambry.clustermap.PartitionId;
import com.github.ambry.utils.Utils;
import java.io.DataInputStream;
import java.io.IOException;
import java.nio.charset.Charset;

public class FileCopyGetMetaDataRequest extends RequestOrResponse{
private PartitionId partitionId;
private String hostName;
private static final short File_Metadata_Request_Version_V1 = 1;
private static final int HostName_Field_Size_In_Bytes = 4;
aga9900 marked this conversation as resolved.
Show resolved Hide resolved

public FileCopyGetMetaDataRequest(short versionId, int correlationId, String clientId,
PartitionId partitionId, String hostName) {
super(RequestOrResponseType.FileCopyGetMetaDataRequest, versionId, correlationId, clientId);
if (partitionId == null) {
throw new IllegalArgumentException("Partition cannot be null");
}
if (hostName.isEmpty()){
throw new IllegalArgumentException("Host Name cannot be null");
}
this.partitionId = partitionId;
this.hostName = hostName;
}

public String getHostName() {
return hostName;
}

public PartitionId getPartitionId() {
return partitionId;
}

protected static FileCopyGetMetaDataRequest readFrom(DataInputStream stream, ClusterMap clusterMap) throws IOException {
Short versionId = stream.readShort();
validateVersion(versionId);
int correlationId = stream.readInt();
String clientId = Utils.readIntString(stream);
String hostName = Utils.readIntString(stream);
PartitionId partitionId = clusterMap.getPartitionIdFromStream(stream);
return new FileCopyGetMetaDataRequest(versionId, correlationId, clientId, partitionId, hostName);
}

public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("FileMetaDataRequest[").append("PartitionId=").append(partitionId).append(", HostName=").append(hostName)
.append("]");
return sb.toString();
}

public long sizeInBytes() {
return super.sizeInBytes() + HostName_Field_Size_In_Bytes + hostName.length() + partitionId.getBytes().length;
}

protected void prepareBuffer() {
super.prepareBuffer();
Utils.serializeString(bufferToSend, hostName, Charset.defaultCharset());
bufferToSend.writeBytes(partitionId.getBytes());
}

static void validateVersion(short version) {
if (version != File_Metadata_Request_Version_V1) {
throw new IllegalArgumentException("Unknown version for FileMetadataRequest: " + version);
}
}
}
aga9900 marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/**
* Copyright 2024 LinkedIn Corp. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
package com.github.ambry.protocol;

import com.github.ambry.server.ServerErrorCode;
import com.github.ambry.utils.Utils;
import java.io.DataInputStream;
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;


public class FileCopyGetMetaDataResponse extends Response {
private final int numberOfLogfiles;
private final List<LogInfo> logInfos;
private static final short File_Copy_Protocol_Metadata_Response_Version_V1 = 1;

public FileCopyGetMetaDataResponse(short versionId, int correlationId, String clientId, int numberOfLogfiles,
List<LogInfo> logInfos, ServerErrorCode errorCode) {
super(RequestOrResponseType.FileCopyGetMetaDataResponse, versionId, correlationId, clientId, errorCode);
validateVersion(versionId);
this.numberOfLogfiles = numberOfLogfiles;
this.logInfos = logInfos;
}

public static FileCopyGetMetaDataResponse readFrom(DataInputStream stream) throws IOException {
RequestOrResponseType type = RequestOrResponseType.values()[stream.readShort()];
if (type != RequestOrResponseType.FileCopyGetMetaDataResponse) {
throw new IllegalArgumentException("The type of request response is not compatible. Expected : {}, Actual : {}" +
RequestOrResponseType.FileCopyGetMetaDataResponse + type);
}
short versionId = stream.readShort();
int correlationId = stream.readInt();
String clientId = Utils.readIntString(stream);
ServerErrorCode errorCode = ServerErrorCode.values()[stream.readShort()];

if(errorCode != ServerErrorCode.No_Error) {
//Setting the number of logfiles to 0 as there are no logfiles to be read.
return new FileCopyGetMetaDataResponse(versionId, correlationId, clientId, 0, new ArrayList<>(), errorCode);
}

int numberOfLogfiles = stream.readInt();
List<LogInfo> logInfos = new ArrayList<>();
for (int i = 0; i < numberOfLogfiles; i++) {
logInfos.add(LogInfo.readFrom(stream));
}
return new FileCopyGetMetaDataResponse(versionId, correlationId, clientId, numberOfLogfiles, logInfos, errorCode);
}
protected void prepareBuffer() {
super.prepareBuffer();
bufferToSend.writeInt(numberOfLogfiles);
for (LogInfo logInfo : logInfos) {
logInfo.writeTo(bufferToSend);
}
}

public long sizeInBytes() {
return super.sizeInBytes() + Integer.BYTES + logInfos.stream().mapToLong(LogInfo::sizeInBytes).sum();
}

public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("FileMetaDataResponse[NumberOfLogfiles=").append(numberOfLogfiles).append(", logInfoList").append(
logInfos.toString()).append("]");
return sb.toString();
}

public int getNumberOfLogfiles() {
return numberOfLogfiles;
}

public List<LogInfo> getLogInfos() {
return logInfos;
}

static void validateVersion(short version) {
if (version != File_Copy_Protocol_Metadata_Response_Version_V1) {
throw new IllegalArgumentException("Unknown version for FileCopyProtocolMetaDataResponse: " + version);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/**
* Copyright 2024 LinkedIn Corp. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
package com.github.ambry.protocol;

import com.github.ambry.utils.Utils;
import io.netty.buffer.ByteBuf;
import java.io.DataInputStream;
import java.io.IOException;
import java.nio.charset.Charset;


/**
* Contains the fileName and fileSizeInBytes for a local partition. This is used
* by LogInfo as part of filecopy metadata request.
*/
public class FileInfo {
private String fileName;
private long fileSizeInBytes;

private static final int FileName_Field_Size_In_Bytes = 4;

private static final int FileSize_Field_Size_In_Bytes = 8;


public FileInfo(String fileName, long fileSize) {
this.fileName = fileName;
this.fileSizeInBytes = fileSize;
}

public long sizeInBytes() {
return FileName_Field_Size_In_Bytes + fileName.length() + FileSize_Field_Size_In_Bytes;
}
public static FileInfo readFrom(DataInputStream stream) throws IOException {
String fileName = Utils.readIntString(stream);
long fileSize = stream.readLong();
return new FileInfo(fileName, fileSize);
}
public void writeTo(ByteBuf buf) {
Utils.serializeString(buf, fileName, Charset.defaultCharset());
buf.writeLong(fileSizeInBytes);
}

public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("FileInfo[").append("FileName=").append(fileName).append(", FileSizeInBytes=").append(fileSizeInBytes)
.append("]");
return sb.toString();
}

public String getFileName() {
return fileName;
}

public long getFileSizeInBytes() {
return fileSizeInBytes;
}
}
Loading
Loading