-
Notifications
You must be signed in to change notification settings - Fork 275
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Bootstrap Performance Improvement[PR2] : Adding Network Protocol Clas…
…ses For File Copy And Implementation For Handling Async File Copy Termination (#2969) * Adding File Based Replication Protocol Changes * Updating protocol changes * File Copy Protocol Changes * Adding File Copy Related Method For File Copy Sync Up * Adding File Copy Related Method For File Copy Sync Up * Removing Code Changes For Replica Sync Up Manager * Adding Liscense Info To New Classes * Adding Liscense Info To New Classes * Setting Up Indentation * Fixing test Cases * Adding Review Changes * Updating review comments * Updating Review Comments * Updating review comments * Updating Test Cases * making review Changes
- Loading branch information
Showing
8 changed files
with
632 additions
and
4 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
99 changes: 99 additions & 0 deletions
99
ambry-protocol/src/main/java/com/github/ambry/protocol/FileCopyGetChunkRequest.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,99 @@ | ||
/** | ||
* Copyright 2024 LinkedIn Corp. All rights reserved. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
*/ | ||
package com.github.ambry.protocol; | ||
|
||
import com.github.ambry.clustermap.ClusterMap; | ||
import com.github.ambry.clustermap.PartitionId; | ||
import com.github.ambry.utils.Utils; | ||
import java.io.DataInputStream; | ||
import java.io.IOException; | ||
import java.nio.charset.Charset; | ||
|
||
|
||
public class FileCopyGetChunkRequest extends RequestOrResponse{ | ||
private PartitionId partitionId; | ||
private String fileName; | ||
private long startOffset; | ||
private long chunkLengthInBytes; | ||
private static final short File_Chunk_Request_Version_V1 = 1; | ||
private static final int File_Name_Size_In_Bytes = 4; | ||
|
||
|
||
public FileCopyGetChunkRequest( short versionId, int correlationId, | ||
String clientId, PartitionId partitionId, String fileName, long startOffset, long sizeInBytes) { | ||
super(RequestOrResponseType.FileCopyGetChunkRequest, versionId, correlationId, clientId); | ||
if(partitionId == null || fileName.isEmpty() || startOffset < 0 || sizeInBytes < 0){ | ||
throw new IllegalArgumentException("PartitionId, FileName, StartOffset or SizeInBytes cannot be null or negative"); | ||
} | ||
this.partitionId = partitionId; | ||
this.fileName = fileName; | ||
this.startOffset = startOffset; | ||
this.chunkLengthInBytes = sizeInBytes; | ||
} | ||
|
||
public static FileCopyGetChunkRequest readFrom(DataInputStream stream, ClusterMap clusterMap) | ||
throws IOException { | ||
Short versionId = stream.readShort(); | ||
validateVersion(versionId); | ||
int correlationId = stream.readInt(); | ||
String clientId = Utils.readIntString(stream); | ||
PartitionId partitionId = clusterMap.getPartitionIdFromStream(stream); | ||
String fileName = Utils.readIntString(stream); | ||
long startOffset = stream.readLong(); | ||
long sizeInBytes = stream.readLong(); | ||
return new FileCopyGetChunkRequest(versionId, correlationId, clientId, partitionId, fileName, startOffset, sizeInBytes); | ||
} | ||
|
||
protected void prepareBuffer(){ | ||
super.prepareBuffer(); | ||
bufferToSend.writeBytes(partitionId.getBytes()); | ||
Utils.serializeString(bufferToSend, fileName, Charset.defaultCharset()); | ||
bufferToSend.writeLong(startOffset); | ||
bufferToSend.writeLong(chunkLengthInBytes); | ||
} | ||
|
||
public String toString(){ | ||
StringBuilder sb = new StringBuilder(); | ||
sb.append("FileCopyProtocolGetChunkRequest[") | ||
.append("PartitionId=").append(partitionId) | ||
.append(", FileName=").append(fileName) | ||
.append(", StartOffset=").append(startOffset) | ||
.append(", SizeInBytes=").append(chunkLengthInBytes) | ||
.append("]"); | ||
return sb.toString(); | ||
} | ||
|
||
public long sizeInBytes() { | ||
return super.sizeInBytes() + partitionId.getBytes().length + File_Name_Size_In_Bytes + fileName.length() + Long.BYTES + Long.BYTES; | ||
} | ||
|
||
public PartitionId getPartitionId() { | ||
return partitionId; | ||
} | ||
public String getFileName() { | ||
return fileName; | ||
} | ||
public long getStartOffset() { | ||
return startOffset; | ||
} | ||
public long getChunkLengthInBytes() { | ||
return chunkLengthInBytes; | ||
} | ||
|
||
static void validateVersion(short version){ | ||
if (version != File_Chunk_Request_Version_V1) { | ||
throw new IllegalArgumentException("Unknown version for FileMetadataRequest: " + version); | ||
} | ||
} | ||
} |
82 changes: 82 additions & 0 deletions
82
ambry-protocol/src/main/java/com/github/ambry/protocol/FileCopyGetMetaDataRequest.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,82 @@ | ||
/** | ||
* Copyright 2024 LinkedIn Corp. All rights reserved. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
*/ | ||
package com.github.ambry.protocol; | ||
|
||
import com.github.ambry.clustermap.ClusterMap; | ||
import com.github.ambry.clustermap.PartitionId; | ||
import com.github.ambry.utils.Utils; | ||
import java.io.DataInputStream; | ||
import java.io.IOException; | ||
import java.nio.charset.Charset; | ||
|
||
public class FileCopyGetMetaDataRequest extends RequestOrResponse{ | ||
private PartitionId partitionId; | ||
private String hostName; | ||
private static final short File_Metadata_Request_Version_V1 = 1; | ||
private static final int HostName_Field_Size_In_Bytes = 4; | ||
|
||
public FileCopyGetMetaDataRequest(short versionId, int correlationId, String clientId, | ||
PartitionId partitionId, String hostName) { | ||
super(RequestOrResponseType.FileCopyGetMetaDataRequest, versionId, correlationId, clientId); | ||
if (partitionId == null) { | ||
throw new IllegalArgumentException("Partition cannot be null"); | ||
} | ||
if (hostName.isEmpty()){ | ||
throw new IllegalArgumentException("Host Name cannot be null"); | ||
} | ||
this.partitionId = partitionId; | ||
this.hostName = hostName; | ||
} | ||
|
||
public String getHostName() { | ||
return hostName; | ||
} | ||
|
||
public PartitionId getPartitionId() { | ||
return partitionId; | ||
} | ||
|
||
protected static FileCopyGetMetaDataRequest readFrom(DataInputStream stream, ClusterMap clusterMap) throws IOException { | ||
Short versionId = stream.readShort(); | ||
validateVersion(versionId); | ||
int correlationId = stream.readInt(); | ||
String clientId = Utils.readIntString(stream); | ||
String hostName = Utils.readIntString(stream); | ||
PartitionId partitionId = clusterMap.getPartitionIdFromStream(stream); | ||
return new FileCopyGetMetaDataRequest(versionId, correlationId, clientId, partitionId, hostName); | ||
} | ||
|
||
public String toString() { | ||
StringBuilder sb = new StringBuilder(); | ||
sb.append("FileMetaDataRequest[").append("PartitionId=").append(partitionId).append(", HostName=").append(hostName) | ||
.append("]"); | ||
return sb.toString(); | ||
} | ||
|
||
public long sizeInBytes() { | ||
return super.sizeInBytes() + HostName_Field_Size_In_Bytes + hostName.length() + partitionId.getBytes().length; | ||
} | ||
|
||
protected void prepareBuffer() { | ||
super.prepareBuffer(); | ||
Utils.serializeString(bufferToSend, hostName, Charset.defaultCharset()); | ||
bufferToSend.writeBytes(partitionId.getBytes()); | ||
} | ||
|
||
static void validateVersion(short version) { | ||
if (version != File_Metadata_Request_Version_V1) { | ||
throw new IllegalArgumentException("Unknown version for FileMetadataRequest: " + version); | ||
} | ||
} | ||
} |
93 changes: 93 additions & 0 deletions
93
ambry-protocol/src/main/java/com/github/ambry/protocol/FileCopyGetMetaDataResponse.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,93 @@ | ||
/** | ||
* Copyright 2024 LinkedIn Corp. All rights reserved. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
*/ | ||
package com.github.ambry.protocol; | ||
|
||
import com.github.ambry.server.ServerErrorCode; | ||
import com.github.ambry.utils.Utils; | ||
import java.io.DataInputStream; | ||
import java.io.IOException; | ||
import java.nio.charset.Charset; | ||
import java.util.ArrayList; | ||
import java.util.List; | ||
|
||
|
||
public class FileCopyGetMetaDataResponse extends Response { | ||
private final int numberOfLogfiles; | ||
private final List<LogInfo> logInfos; | ||
private static final short File_Copy_Protocol_Metadata_Response_Version_V1 = 1; | ||
|
||
public FileCopyGetMetaDataResponse(short versionId, int correlationId, String clientId, int numberOfLogfiles, | ||
List<LogInfo> logInfos, ServerErrorCode errorCode) { | ||
super(RequestOrResponseType.FileCopyGetMetaDataResponse, versionId, correlationId, clientId, errorCode); | ||
validateVersion(versionId); | ||
this.numberOfLogfiles = numberOfLogfiles; | ||
this.logInfos = logInfos; | ||
} | ||
|
||
public static FileCopyGetMetaDataResponse readFrom(DataInputStream stream) throws IOException { | ||
RequestOrResponseType type = RequestOrResponseType.values()[stream.readShort()]; | ||
if (type != RequestOrResponseType.FileCopyGetMetaDataResponse) { | ||
throw new IllegalArgumentException("The type of request response is not compatible. Expected : {}, Actual : {}" + | ||
RequestOrResponseType.FileCopyGetMetaDataResponse + type); | ||
} | ||
short versionId = stream.readShort(); | ||
int correlationId = stream.readInt(); | ||
String clientId = Utils.readIntString(stream); | ||
ServerErrorCode errorCode = ServerErrorCode.values()[stream.readShort()]; | ||
|
||
if(errorCode != ServerErrorCode.No_Error) { | ||
//Setting the number of logfiles to 0 as there are no logfiles to be read. | ||
return new FileCopyGetMetaDataResponse(versionId, correlationId, clientId, 0, new ArrayList<>(), errorCode); | ||
} | ||
|
||
int numberOfLogfiles = stream.readInt(); | ||
List<LogInfo> logInfos = new ArrayList<>(); | ||
for (int i = 0; i < numberOfLogfiles; i++) { | ||
logInfos.add(LogInfo.readFrom(stream)); | ||
} | ||
return new FileCopyGetMetaDataResponse(versionId, correlationId, clientId, numberOfLogfiles, logInfos, errorCode); | ||
} | ||
protected void prepareBuffer() { | ||
super.prepareBuffer(); | ||
bufferToSend.writeInt(numberOfLogfiles); | ||
for (LogInfo logInfo : logInfos) { | ||
logInfo.writeTo(bufferToSend); | ||
} | ||
} | ||
|
||
public long sizeInBytes() { | ||
return super.sizeInBytes() + Integer.BYTES + logInfos.stream().mapToLong(LogInfo::sizeInBytes).sum(); | ||
} | ||
|
||
public String toString() { | ||
StringBuilder sb = new StringBuilder(); | ||
sb.append("FileMetaDataResponse[NumberOfLogfiles=").append(numberOfLogfiles).append(", logInfoList").append( | ||
logInfos.toString()).append("]"); | ||
return sb.toString(); | ||
} | ||
|
||
public int getNumberOfLogfiles() { | ||
return numberOfLogfiles; | ||
} | ||
|
||
public List<LogInfo> getLogInfos() { | ||
return logInfos; | ||
} | ||
|
||
static void validateVersion(short version) { | ||
if (version != File_Copy_Protocol_Metadata_Response_Version_V1) { | ||
throw new IllegalArgumentException("Unknown version for FileCopyProtocolMetaDataResponse: " + version); | ||
} | ||
} | ||
} |
68 changes: 68 additions & 0 deletions
68
ambry-protocol/src/main/java/com/github/ambry/protocol/FileInfo.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,68 @@ | ||
/** | ||
* Copyright 2024 LinkedIn Corp. All rights reserved. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
*/ | ||
package com.github.ambry.protocol; | ||
|
||
import com.github.ambry.utils.Utils; | ||
import io.netty.buffer.ByteBuf; | ||
import java.io.DataInputStream; | ||
import java.io.IOException; | ||
import java.nio.charset.Charset; | ||
|
||
|
||
/** | ||
* Contains the fileName and fileSizeInBytes for a local partition. This is used | ||
* by LogInfo as part of filecopy metadata request. | ||
*/ | ||
public class FileInfo { | ||
private String fileName; | ||
private long fileSizeInBytes; | ||
|
||
private static final int FileName_Field_Size_In_Bytes = 4; | ||
|
||
private static final int FileSize_Field_Size_In_Bytes = 8; | ||
|
||
|
||
public FileInfo(String fileName, long fileSize) { | ||
this.fileName = fileName; | ||
this.fileSizeInBytes = fileSize; | ||
} | ||
|
||
public long sizeInBytes() { | ||
return FileName_Field_Size_In_Bytes + fileName.length() + FileSize_Field_Size_In_Bytes; | ||
} | ||
public static FileInfo readFrom(DataInputStream stream) throws IOException { | ||
String fileName = Utils.readIntString(stream); | ||
long fileSize = stream.readLong(); | ||
return new FileInfo(fileName, fileSize); | ||
} | ||
public void writeTo(ByteBuf buf) { | ||
Utils.serializeString(buf, fileName, Charset.defaultCharset()); | ||
buf.writeLong(fileSizeInBytes); | ||
} | ||
|
||
public String toString() { | ||
StringBuilder sb = new StringBuilder(); | ||
sb.append("FileInfo[").append("FileName=").append(fileName).append(", FileSizeInBytes=").append(fileSizeInBytes) | ||
.append("]"); | ||
return sb.toString(); | ||
} | ||
|
||
public String getFileName() { | ||
return fileName; | ||
} | ||
|
||
public long getFileSizeInBytes() { | ||
return fileSizeInBytes; | ||
} | ||
} |
Oops, something went wrong.