diff --git a/ambry-clustermap/src/main/java/com/github/ambry/clustermap/AmbryReplicaSyncUpManager.java b/ambry-clustermap/src/main/java/com/github/ambry/clustermap/AmbryReplicaSyncUpManager.java index 5046a9d786..d9de3e9902 100644 --- a/ambry-clustermap/src/main/java/com/github/ambry/clustermap/AmbryReplicaSyncUpManager.java +++ b/ambry-clustermap/src/main/java/com/github/ambry/clustermap/AmbryReplicaSyncUpManager.java @@ -65,7 +65,7 @@ public void initiateBootstrap(ReplicaId replicaId) { @Override public void initiateFileCopy(ReplicaId replicaId) { - //To BE Filled. + //To Be Added With File Copy Protocol } @Override @@ -108,7 +108,7 @@ public void waitBootstrapCompleted(String partitionName) throws InterruptedExcep @Override public void waitForFileCopyCompleted(String partitionName) throws InterruptedException { - //To Be Filled + //To Be Added With File Copy Protocol } @Override @@ -204,7 +204,7 @@ public void onBootstrapComplete(ReplicaId replicaId) { @Override public void onFileCopyComplete(ReplicaId replicaId) { - // To Be Filled + //To Be Added With File Copy Protocol } @Override diff --git a/ambry-protocol/src/main/java/com/github/ambry/protocol/FileCopyGetChunkRequest.java b/ambry-protocol/src/main/java/com/github/ambry/protocol/FileCopyGetChunkRequest.java new file mode 100644 index 0000000000..c9a75ffc8a --- /dev/null +++ b/ambry-protocol/src/main/java/com/github/ambry/protocol/FileCopyGetChunkRequest.java @@ -0,0 +1,99 @@ +/** + * Copyright 2024 LinkedIn Corp. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + */ +package com.github.ambry.protocol; + +import com.github.ambry.clustermap.ClusterMap; +import com.github.ambry.clustermap.PartitionId; +import com.github.ambry.utils.Utils; +import java.io.DataInputStream; +import java.io.IOException; +import java.nio.charset.Charset; + + +public class FileCopyGetChunkRequest extends RequestOrResponse{ + private PartitionId partitionId; + private String fileName; + private long startOffset; + private long chunkLengthInBytes; + private static final short File_Chunk_Request_Version_V1 = 1; + private static final int File_Name_Size_In_Bytes = 4; + + + public FileCopyGetChunkRequest( short versionId, int correlationId, + String clientId, PartitionId partitionId, String fileName, long startOffset, long sizeInBytes) { + super(RequestOrResponseType.FileCopyGetChunkRequest, versionId, correlationId, clientId); + if(partitionId == null || fileName.isEmpty() || startOffset < 0 || sizeInBytes < 0){ + throw new IllegalArgumentException("PartitionId, FileName, StartOffset or SizeInBytes cannot be null or negative"); + } + this.partitionId = partitionId; + this.fileName = fileName; + this.startOffset = startOffset; + this.chunkLengthInBytes = sizeInBytes; + } + + public static FileCopyGetChunkRequest readFrom(DataInputStream stream, ClusterMap clusterMap) + throws IOException { + Short versionId = stream.readShort(); + validateVersion(versionId); + int correlationId = stream.readInt(); + String clientId = Utils.readIntString(stream); + PartitionId partitionId = clusterMap.getPartitionIdFromStream(stream); + String fileName = Utils.readIntString(stream); + long startOffset = stream.readLong(); + long sizeInBytes = stream.readLong(); + return new FileCopyGetChunkRequest(versionId, correlationId, clientId, partitionId, fileName, startOffset, sizeInBytes); + } + + protected void prepareBuffer(){ + super.prepareBuffer(); + bufferToSend.writeBytes(partitionId.getBytes()); + Utils.serializeString(bufferToSend, fileName, Charset.defaultCharset()); + bufferToSend.writeLong(startOffset); + bufferToSend.writeLong(chunkLengthInBytes); + } + + public String toString(){ + StringBuilder sb = new StringBuilder(); + sb.append("FileCopyProtocolGetChunkRequest[") + .append("PartitionId=").append(partitionId) + .append(", FileName=").append(fileName) + .append(", StartOffset=").append(startOffset) + .append(", SizeInBytes=").append(chunkLengthInBytes) + .append("]"); + return sb.toString(); + } + + public long sizeInBytes() { + return super.sizeInBytes() + partitionId.getBytes().length + File_Name_Size_In_Bytes + fileName.length() + Long.BYTES + Long.BYTES; + } + + public PartitionId getPartitionId() { + return partitionId; + } + public String getFileName() { + return fileName; + } + public long getStartOffset() { + return startOffset; + } + public long getChunkLengthInBytes() { + return chunkLengthInBytes; + } + + static void validateVersion(short version){ + if (version != File_Chunk_Request_Version_V1) { + throw new IllegalArgumentException("Unknown version for FileMetadataRequest: " + version); + } + } +} diff --git a/ambry-protocol/src/main/java/com/github/ambry/protocol/FileCopyGetMetaDataRequest.java b/ambry-protocol/src/main/java/com/github/ambry/protocol/FileCopyGetMetaDataRequest.java new file mode 100644 index 0000000000..ee87c3b236 --- /dev/null +++ b/ambry-protocol/src/main/java/com/github/ambry/protocol/FileCopyGetMetaDataRequest.java @@ -0,0 +1,82 @@ +/** + * Copyright 2024 LinkedIn Corp. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + */ +package com.github.ambry.protocol; + +import com.github.ambry.clustermap.ClusterMap; +import com.github.ambry.clustermap.PartitionId; +import com.github.ambry.utils.Utils; +import java.io.DataInputStream; +import java.io.IOException; +import java.nio.charset.Charset; + +public class FileCopyGetMetaDataRequest extends RequestOrResponse{ + private PartitionId partitionId; + private String hostName; + private static final short File_Metadata_Request_Version_V1 = 1; + private static final int HostName_Field_Size_In_Bytes = 4; + + public FileCopyGetMetaDataRequest(short versionId, int correlationId, String clientId, + PartitionId partitionId, String hostName) { + super(RequestOrResponseType.FileCopyGetMetaDataRequest, versionId, correlationId, clientId); + if (partitionId == null) { + throw new IllegalArgumentException("Partition cannot be null"); + } + if (hostName.isEmpty()){ + throw new IllegalArgumentException("Host Name cannot be null"); + } + this.partitionId = partitionId; + this.hostName = hostName; + } + + public String getHostName() { + return hostName; + } + + public PartitionId getPartitionId() { + return partitionId; + } + + protected static FileCopyGetMetaDataRequest readFrom(DataInputStream stream, ClusterMap clusterMap) throws IOException { + Short versionId = stream.readShort(); + validateVersion(versionId); + int correlationId = stream.readInt(); + String clientId = Utils.readIntString(stream); + String hostName = Utils.readIntString(stream); + PartitionId partitionId = clusterMap.getPartitionIdFromStream(stream); + return new FileCopyGetMetaDataRequest(versionId, correlationId, clientId, partitionId, hostName); + } + + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append("FileMetaDataRequest[").append("PartitionId=").append(partitionId).append(", HostName=").append(hostName) + .append("]"); + return sb.toString(); + } + + public long sizeInBytes() { + return super.sizeInBytes() + HostName_Field_Size_In_Bytes + hostName.length() + partitionId.getBytes().length; + } + + protected void prepareBuffer() { + super.prepareBuffer(); + Utils.serializeString(bufferToSend, hostName, Charset.defaultCharset()); + bufferToSend.writeBytes(partitionId.getBytes()); + } + + static void validateVersion(short version) { + if (version != File_Metadata_Request_Version_V1) { + throw new IllegalArgumentException("Unknown version for FileMetadataRequest: " + version); + } + } +} \ No newline at end of file diff --git a/ambry-protocol/src/main/java/com/github/ambry/protocol/FileCopyGetMetaDataResponse.java b/ambry-protocol/src/main/java/com/github/ambry/protocol/FileCopyGetMetaDataResponse.java new file mode 100644 index 0000000000..d8ec80fdeb --- /dev/null +++ b/ambry-protocol/src/main/java/com/github/ambry/protocol/FileCopyGetMetaDataResponse.java @@ -0,0 +1,93 @@ +/** + * Copyright 2024 LinkedIn Corp. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + */ +package com.github.ambry.protocol; + +import com.github.ambry.server.ServerErrorCode; +import com.github.ambry.utils.Utils; +import java.io.DataInputStream; +import java.io.IOException; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.List; + + +public class FileCopyGetMetaDataResponse extends Response { + private final int numberOfLogfiles; + private final List logInfos; + private static final short File_Copy_Protocol_Metadata_Response_Version_V1 = 1; + + public FileCopyGetMetaDataResponse(short versionId, int correlationId, String clientId, int numberOfLogfiles, + List logInfos, ServerErrorCode errorCode) { + super(RequestOrResponseType.FileCopyGetMetaDataResponse, versionId, correlationId, clientId, errorCode); + validateVersion(versionId); + this.numberOfLogfiles = numberOfLogfiles; + this.logInfos = logInfos; + } + + public static FileCopyGetMetaDataResponse readFrom(DataInputStream stream) throws IOException { + RequestOrResponseType type = RequestOrResponseType.values()[stream.readShort()]; + if (type != RequestOrResponseType.FileCopyGetMetaDataResponse) { + throw new IllegalArgumentException("The type of request response is not compatible. Expected : {}, Actual : {}" + + RequestOrResponseType.FileCopyGetMetaDataResponse + type); + } + short versionId = stream.readShort(); + int correlationId = stream.readInt(); + String clientId = Utils.readIntString(stream); + ServerErrorCode errorCode = ServerErrorCode.values()[stream.readShort()]; + + if(errorCode != ServerErrorCode.No_Error) { + //Setting the number of logfiles to 0 as there are no logfiles to be read. + return new FileCopyGetMetaDataResponse(versionId, correlationId, clientId, 0, new ArrayList<>(), errorCode); + } + + int numberOfLogfiles = stream.readInt(); + List logInfos = new ArrayList<>(); + for (int i = 0; i < numberOfLogfiles; i++) { + logInfos.add(LogInfo.readFrom(stream)); + } + return new FileCopyGetMetaDataResponse(versionId, correlationId, clientId, numberOfLogfiles, logInfos, errorCode); + } + protected void prepareBuffer() { + super.prepareBuffer(); + bufferToSend.writeInt(numberOfLogfiles); + for (LogInfo logInfo : logInfos) { + logInfo.writeTo(bufferToSend); + } + } + + public long sizeInBytes() { + return super.sizeInBytes() + Integer.BYTES + logInfos.stream().mapToLong(LogInfo::sizeInBytes).sum(); + } + + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append("FileMetaDataResponse[NumberOfLogfiles=").append(numberOfLogfiles).append(", logInfoList").append( + logInfos.toString()).append("]"); + return sb.toString(); + } + + public int getNumberOfLogfiles() { + return numberOfLogfiles; + } + + public List getLogInfos() { + return logInfos; + } + + static void validateVersion(short version) { + if (version != File_Copy_Protocol_Metadata_Response_Version_V1) { + throw new IllegalArgumentException("Unknown version for FileCopyProtocolMetaDataResponse: " + version); + } + } +} diff --git a/ambry-protocol/src/main/java/com/github/ambry/protocol/FileInfo.java b/ambry-protocol/src/main/java/com/github/ambry/protocol/FileInfo.java new file mode 100644 index 0000000000..020a8348f4 --- /dev/null +++ b/ambry-protocol/src/main/java/com/github/ambry/protocol/FileInfo.java @@ -0,0 +1,68 @@ +/** + * Copyright 2024 LinkedIn Corp. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + */ +package com.github.ambry.protocol; + +import com.github.ambry.utils.Utils; +import io.netty.buffer.ByteBuf; +import java.io.DataInputStream; +import java.io.IOException; +import java.nio.charset.Charset; + + +/** + * Contains the fileName and fileSizeInBytes for a local partition. This is used + * by LogInfo as part of filecopy metadata request. + */ +public class FileInfo { + private String fileName; + private long fileSizeInBytes; + + private static final int FileName_Field_Size_In_Bytes = 4; + + private static final int FileSize_Field_Size_In_Bytes = 8; + + + public FileInfo(String fileName, long fileSize) { + this.fileName = fileName; + this.fileSizeInBytes = fileSize; + } + + public long sizeInBytes() { + return FileName_Field_Size_In_Bytes + fileName.length() + FileSize_Field_Size_In_Bytes; + } + public static FileInfo readFrom(DataInputStream stream) throws IOException { + String fileName = Utils.readIntString(stream); + long fileSize = stream.readLong(); + return new FileInfo(fileName, fileSize); + } + public void writeTo(ByteBuf buf) { + Utils.serializeString(buf, fileName, Charset.defaultCharset()); + buf.writeLong(fileSizeInBytes); + } + + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append("FileInfo[").append("FileName=").append(fileName).append(", FileSizeInBytes=").append(fileSizeInBytes) + .append("]"); + return sb.toString(); + } + + public String getFileName() { + return fileName; + } + + public long getFileSizeInBytes() { + return fileSizeInBytes; + } +} diff --git a/ambry-protocol/src/main/java/com/github/ambry/protocol/LogInfo.java b/ambry-protocol/src/main/java/com/github/ambry/protocol/LogInfo.java new file mode 100644 index 0000000000..fb8d131a9c --- /dev/null +++ b/ambry-protocol/src/main/java/com/github/ambry/protocol/LogInfo.java @@ -0,0 +1,119 @@ +/** + * Copyright 2024 LinkedIn Corp. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + */ +package com.github.ambry.protocol; + +import com.github.ambry.utils.Utils; +import io.netty.buffer.ByteBuf; +import java.io.DataInputStream; +import java.io.IOException; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.List; + + +/** + * Contains the fileName, fileSizeInBytes, indexFiles and bloomFilters for a local partition. This is used + * by filecopy metadata request. + */ +public class LogInfo { + private String fileName; + private long fileSizeInBytes; + List indexFiles; + List bloomFilters; + + private static final int FileName_Field_Size_In_Bytes = 4; + private static final int FileSize_Field_Size_In_Bytes = 8; + + private static final int ListSize_In_Bytes = 4; + public LogInfo(String fileName, long fileSizeInBytes, List indexFiles, List bloomFilters) { + this.fileName = fileName; + this.fileSizeInBytes = fileSizeInBytes; + this.indexFiles = indexFiles; + this.bloomFilters = bloomFilters; + } + + public String getFileName() { + return fileName; + } + + public long getFileSizeInBytes() { + return fileSizeInBytes; + } + + public List getBloomFilters() { + return bloomFilters; + } + + public List getIndexFiles() { + return indexFiles; + } + + public long sizeInBytes() { + long size = FileName_Field_Size_In_Bytes + fileName.length() + FileSize_Field_Size_In_Bytes + ListSize_In_Bytes; + for (FileInfo fileInfo : indexFiles) { + size += fileInfo.sizeInBytes(); + } + size += ListSize_In_Bytes; + for (FileInfo fileInfo : bloomFilters) { + size += fileInfo.sizeInBytes(); + } + return size; + } + + public static LogInfo readFrom(DataInputStream stream) throws IOException { + String fileName = Utils.readIntString(stream ); + long fileSize = stream.readLong(); + List listOfIndexFiles = new ArrayList<>(); + List listOfBloomFilters = new ArrayList<>(); + + int indexFilesCount = stream.readInt(); + for (int i = 0; i < indexFilesCount; i++) { + listOfIndexFiles.add(FileInfo.readFrom(stream)); + } + + int bloomFiltersCount = stream.readInt(); + for(int i= 0;i< bloomFiltersCount; i++){ + listOfBloomFilters.add(FileInfo.readFrom(stream)); + } + return new LogInfo(fileName, fileSize, listOfIndexFiles, listOfBloomFilters); + } + + public void writeTo(ByteBuf buf){ + Utils.serializeString(buf, fileName, Charset.defaultCharset()); + buf.writeLong(fileSizeInBytes); + buf.writeInt(indexFiles.size()); + for(FileInfo fileInfo : indexFiles){ + fileInfo.writeTo(buf); + } + buf.writeInt(bloomFilters.size()); + for(FileInfo fileInfo: bloomFilters){ + fileInfo.writeTo(buf); + } + } + + public String toString(){ + StringBuilder sb = new StringBuilder(); + sb.append("LogInfo["); + sb.append("FileName=").append(fileName).append(", FileSizeInBytes=").append(fileSizeInBytes).append(","); + for(FileInfo fileInfo : indexFiles) { + sb.append(fileInfo.toString()); + } + for(FileInfo fileInfo: bloomFilters){ + sb.append(fileInfo.toString()); + } + sb.append("]"); + return sb.toString(); + } + +} diff --git a/ambry-protocol/src/main/java/com/github/ambry/protocol/RequestOrResponseType.java b/ambry-protocol/src/main/java/com/github/ambry/protocol/RequestOrResponseType.java index 115562bd9a..5d803a45eb 100644 --- a/ambry-protocol/src/main/java/com/github/ambry/protocol/RequestOrResponseType.java +++ b/ambry-protocol/src/main/java/com/github/ambry/protocol/RequestOrResponseType.java @@ -37,5 +37,9 @@ public enum RequestOrResponseType { PurgeRequest, PurgeResponse, BatchDeleteRequest, - BatchDeleteResponse + BatchDeleteResponse, + FileCopyGetChunkRequest, + FileCopyGetChunkResponse, + FileCopyGetMetaDataRequest, + FileCopyGetMetaDataResponse } diff --git a/ambry-protocol/src/test/java/com/github/ambry/protocol/RequestResponseTest.java b/ambry-protocol/src/test/java/com/github/ambry/protocol/RequestResponseTest.java index c416ec97bf..7bb4b1802e 100644 --- a/ambry-protocol/src/test/java/com/github/ambry/protocol/RequestResponseTest.java +++ b/ambry-protocol/src/test/java/com/github/ambry/protocol/RequestResponseTest.java @@ -65,6 +65,7 @@ import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; +import org.mockito.Mock; import static com.github.ambry.account.Account.*; import static com.github.ambry.account.Container.*; @@ -672,6 +673,168 @@ public void replicaMetadataRequestTest() throws IOException { MessageInfoAndMetadataListSerde.AUTO_VERSION = oldMessageInfoVersion; } + @Test + public void doFileCopyMetaDataRequestTest() throws IOException { + MockClusterMap clusterMap = new MockClusterMap(); + short requestVersionToUse = 1; + FileCopyGetMetaDataRequest request = + new FileCopyGetMetaDataRequest(requestVersionToUse, 111, "id1", + new MockPartitionId(), "host3"); + DataInputStream requestStream = serAndPrepForRead(request, -1, true); + FileCopyGetMetaDataRequest fileMetadataRequestFromBytes = + FileCopyGetMetaDataRequest.readFrom(requestStream, new MockClusterMap()); + Assert.assertEquals(fileMetadataRequestFromBytes.getHostName(), "host3"); + Assert.assertEquals(fileMetadataRequestFromBytes.getPartitionId().getId(), 0l); + Assert.assertEquals(fileMetadataRequestFromBytes.getPartitionId().toPathString(), "0"); + + request.release(); + + try{ + new FileCopyGetMetaDataRequest(requestVersionToUse, 111, "id1", + null, "host3"); + Assert.fail("Should have thrown exception"); + }catch (IllegalArgumentException e) { + //expected + } + + try{ + new FileCopyGetMetaDataRequest(requestVersionToUse, 111, "id1", + new MockPartitionId(), ""); + Assert.fail("Should have thrown exception"); + }catch (IllegalArgumentException e) { + //expected + } + } + + @Test + public void doFileInfoObjectParsingTest() throws IOException { + FileInfo fileInfo = new FileInfo("/tmp", 1000); + ByteBuf byteBuf = Unpooled.buffer(); + fileInfo.writeTo(byteBuf); + DataInputStream stream = new NettyByteBufDataInputStream(byteBuf); + FileInfo transformedFileInfo = FileInfo.readFrom(stream); + Assert.assertEquals(fileInfo.getFileName(), transformedFileInfo.getFileName()); + Assert.assertEquals(fileInfo.getFileSizeInBytes(), transformedFileInfo.getFileSizeInBytes()); + byteBuf.release(); + } + + @Test + public void doLogInfoParsingTest() throws IOException { + LogInfo logInfo = new LogInfo("0_index", 1000, + new ArrayList<>(Arrays.asList(new FileInfo("0_1_index", 1010))), + new ArrayList<>(Arrays.asList(new FileInfo("1_1_bloom", 1020)))); + + ByteBuf byteBuf = Unpooled.buffer(); + logInfo.writeTo(byteBuf); + DataInputStream stream = new NettyByteBufDataInputStream(byteBuf); + LogInfo transformedLogInfo = LogInfo.readFrom(stream); + Assert.assertEquals(logInfo.getFileName(), transformedLogInfo.getFileName()); + Assert.assertEquals(logInfo.getFileSizeInBytes(), transformedLogInfo.getFileSizeInBytes()); + byteBuf.release(); + } + + @Test + public void doFileCopyMetaDataResponseTest() throws IOException{ + short requestVersionToUse = 1; + LogInfo logInfo1 = new LogInfo("0_log", 1000, + new ArrayList<>(Arrays.asList(new FileInfo("0_1_index", 1010))), + new ArrayList<>(Arrays.asList(new FileInfo("0_1_bloom", 1020)))); + LogInfo logInfo2 = new LogInfo("1_log", 1050, + new ArrayList<>(Arrays.asList(new FileInfo("1_1_index", 1030))), + new ArrayList<>(Arrays.asList(new FileInfo("1_1_bloom", 1040)))); + List logInfoList = new ArrayList<>(Arrays.asList(logInfo1, logInfo2)); + FileCopyGetMetaDataResponse response = + new FileCopyGetMetaDataResponse(requestVersionToUse, 111, "id1", 2 , + logInfoList, ServerErrorCode.No_Error); + + DataInputStream requestStream = serAndPrepForRead(response, -1, false); + FileCopyGetMetaDataResponse fileCopyProtocolMetaDataResponseTranformed = + FileCopyGetMetaDataResponse.readFrom(requestStream); + Assert.assertEquals(fileCopyProtocolMetaDataResponseTranformed.getCorrelationId(), 111); + Assert.assertEquals(fileCopyProtocolMetaDataResponseTranformed.versionId, 1); + Assert.assertEquals(fileCopyProtocolMetaDataResponseTranformed.getError(), ServerErrorCode.No_Error); + Assert.assertEquals(fileCopyProtocolMetaDataResponseTranformed.getNumberOfLogfiles(), 2); + Assert.assertEquals(fileCopyProtocolMetaDataResponseTranformed.getLogInfos().size(), 2); + Assert.assertEquals(fileCopyProtocolMetaDataResponseTranformed.getLogInfos().get(0).getFileName(), "0_log"); + Assert.assertEquals(fileCopyProtocolMetaDataResponseTranformed.getLogInfos().get(0).getFileSizeInBytes(), 1000); + Assert.assertEquals(fileCopyProtocolMetaDataResponseTranformed.getLogInfos().get(0).getIndexFiles().size(), 1); + Assert.assertEquals(fileCopyProtocolMetaDataResponseTranformed.getLogInfos().get(0).getBloomFilters().size(), 1); + Assert.assertEquals(fileCopyProtocolMetaDataResponseTranformed.getLogInfos().get(1).getFileName(), "1_log"); + Assert.assertEquals(fileCopyProtocolMetaDataResponseTranformed.getLogInfos().get(1).getFileSizeInBytes(), 1050); + Assert.assertEquals(fileCopyProtocolMetaDataResponseTranformed.getLogInfos().get(1).getIndexFiles().size(), 1); + Assert.assertEquals(fileCopyProtocolMetaDataResponseTranformed.getLogInfos().get(1).getBloomFilters().size(), 1); + Assert.assertEquals(fileCopyProtocolMetaDataResponseTranformed.getLogInfos().get(0).getIndexFiles().get(0).getFileName(), "0_1_index"); + Assert.assertEquals(fileCopyProtocolMetaDataResponseTranformed.getLogInfos().get(0).getIndexFiles().get(0).getFileSizeInBytes(), 1010); + Assert.assertEquals(fileCopyProtocolMetaDataResponseTranformed.getLogInfos().get(0).getBloomFilters().get(0).getFileName(), "0_1_bloom"); + Assert.assertEquals(fileCopyProtocolMetaDataResponseTranformed.getLogInfos().get(0).getBloomFilters().get(0).getFileSizeInBytes(), 1020); + Assert.assertEquals(fileCopyProtocolMetaDataResponseTranformed.getLogInfos().get(1).getIndexFiles().get(0).getFileName(), "1_1_index"); + Assert.assertEquals(fileCopyProtocolMetaDataResponseTranformed.getLogInfos().get(1).getIndexFiles().get(0).getFileSizeInBytes(), 1030); + Assert.assertEquals(fileCopyProtocolMetaDataResponseTranformed.getLogInfos().get(1).getBloomFilters().get(0).getFileName(), "1_1_bloom"); + Assert.assertEquals(fileCopyProtocolMetaDataResponseTranformed.getLogInfos().get(1).getBloomFilters().get(0).getFileSizeInBytes(), 1040); + response.release(); + + + response = + new FileCopyGetMetaDataResponse(requestVersionToUse, 111, "id1", 2 , + logInfoList, ServerErrorCode.IO_Error); + DataInputStream requestStream1 = serAndPrepForRead(response, -1, false); + FileCopyGetMetaDataResponse fileCopyProtocolGetChunkResponse = + FileCopyGetMetaDataResponse.readFrom(requestStream1); + Assert.assertEquals(fileCopyProtocolGetChunkResponse.getError(), ServerErrorCode.IO_Error); + response.release(); + } + + @Test + public void doFileCopyChunkRequestTest() throws IOException{ + short requestVersionToUse = 1; + FileCopyGetChunkRequest request = + new FileCopyGetChunkRequest(requestVersionToUse, 111, "id1", + new MockPartitionId(), "file1", 1000, 100); + DataInputStream requestStream = serAndPrepForRead(request, -1, true); + FileCopyGetChunkRequest.readFrom(requestStream, new MockClusterMap()); + Assert.assertEquals(request.getFileName(), "file1"); + Assert.assertEquals(request.getChunkLengthInBytes(), 100); + Assert.assertEquals(request.getPartitionId().getId(), 0); + Assert.assertEquals(request.getPartitionId().toPathString(), "0"); + Assert.assertEquals(request.getCorrelationId(), 111); + Assert.assertEquals(request.getStartOffset(), 1000); + Assert.assertEquals(request.getVersionId(), requestVersionToUse); + request.release(); + + try{ + new FileCopyGetChunkRequest(requestVersionToUse, 111, "id1", + null, "file1", 1000, 0); + Assert.fail("Should have failed"); + } catch (IllegalArgumentException e){ + //expected + } + + try{ + new FileCopyGetChunkRequest(requestVersionToUse, 111, "id1", + new MockPartitionId(), "", 1000, 0); + Assert.fail("Should have failed"); + } catch (IllegalArgumentException e){ + //expected + } + + try{ + new FileCopyGetChunkRequest(requestVersionToUse, 111, "id1", + new MockPartitionId(), "file1", -1, 0); + Assert.fail("Should have failed"); + } catch (IllegalArgumentException e){ + //expected + } + + try{ + new FileCopyGetChunkRequest(requestVersionToUse, 111, "id1", + new MockPartitionId(), "file1", 1000, -1); + Assert.fail("Should have failed"); + } catch (IllegalArgumentException e){ + //expected + } + + } + private void doReplicaMetadataRequestTest(short responseVersionToUse, short requestVersionToUse, short messageInfoToUse, ReplicaType replicaType) throws IOException { MessageInfoAndMetadataListSerde.AUTO_VERSION = messageInfoToUse;