Skip to content

Commit

Permalink
include colIdx in DFS packet header
Browse files Browse the repository at this point in the history
  • Loading branch information
jiajunmao committed Sep 2, 2024
1 parent cd4951a commit 6cbe0f4
Show file tree
Hide file tree
Showing 6 changed files with 140 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,9 @@ public class DFSPacket {
private int traceParentsUsed;
private Span span;

// MLEC
private Integer colIdx;

/**
* Create a new packet.
*
Expand All @@ -100,6 +103,33 @@ public DFSPacket(byte[] buf, int chunksPerPkt, long offsetInBlock, long seqno,
maxChunks = chunksPerPkt;
}

/**
 * Create a new packet that additionally carries an MLEC column index in
 * its header (written out by {@link #writeTo}).
 *
 * @param buf the buffer storing data and checksums
 * @param chunksPerPkt maximum number of chunks per packet.
 * @param offsetInBlock offset in bytes into the HDFS block.
 * @param seqno the sequence number of this packet
 * @param checksumSize the size of checksum
 * @param lastPacketInBlock if this is the last packet
 * @param colIdx erasure-coding column index to embed in the packet
 *               header; may be {@code null} when no column index applies
 */
public DFSPacket(byte[] buf, int chunksPerPkt, long offsetInBlock, long seqno,
int checksumSize, boolean lastPacketInBlock, Integer colIdx) {
this.lastPacketInBlock = lastPacketInBlock;
this.numChunks = 0;
this.offsetInBlock = offsetInBlock;
this.seqno = seqno;

this.buf = buf;
this.colIdx = colIdx;

// Checksums are laid out first (after the reserved header space),
// followed by the data region sized for chunksPerPkt chunks.
checksumStart = PacketHeader.PKT_MAX_HEADER_LEN;
checksumPos = checksumStart;
dataStart = checksumStart + (chunksPerPkt * checksumSize);
dataPos = dataStart;
maxChunks = chunksPerPkt;
}

/**
* Write data to this packet.
*
Expand Down Expand Up @@ -165,7 +195,7 @@ public synchronized void writeTo(DataOutputStream stm) throws IOException {
final int pktLen = HdfsConstants.BYTES_IN_INTEGER + dataLen + checksumLen;

PacketHeader header = new PacketHeader(
pktLen, offsetInBlock, seqno, lastPacketInBlock, dataLen, syncBlock);
pktLen, offsetInBlock, seqno, lastPacketInBlock, dataLen, syncBlock, colIdx);

if (checksumPos != dataStart) {
// Move the checksum to cover the gap. This can happen for the last
Expand Down Expand Up @@ -364,4 +394,8 @@ public void setSpan(Span span) {
/** @return the tracing span attached to this packet, or {@code null} if none. */
public Span getSpan() {
  return this.span;
}

/**
 * Returns the MLEC column index carried by this packet, or {@code null}
 * when none was supplied (the non-colIdx constructors never set it).
 *
 * <p>Declared to return {@code Integer} rather than {@code int}: the
 * backing field is nullable, and auto-unboxing a null {@code colIdx}
 * into a primitive return would throw a {@link NullPointerException}.
 */
private Integer getColIdx() {
  return this.colIdx;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,40 @@ public class PacketHeader {
// No-arg constructor leaving all fields unset.
// NOTE(review): presumably used by read/parse paths that populate the
// proto afterwards — confirm against callers.
public PacketHeader() {
}

/**
 * Create a packet header that optionally carries an MLEC column index.
 *
 * @param packetLen total packet length in bytes; must be at least 4
 * @param offsetInBlock offset in bytes into the HDFS block
 * @param seqno sequence number of the packet
 * @param lastPacketInBlock whether this is the last packet in the block
 * @param dataLen length of the data section
 * @param syncBlock whether the block should be synced on write
 * @param colIdx erasure-coding column index, or {@code null} to leave the
 *               optional proto field unset
 * @throws IllegalArgumentException if {@code packetLen} is smaller than 4
 */
public PacketHeader(int packetLen, long offsetInBlock, long seqno,
    boolean lastPacketInBlock, int dataLen, boolean syncBlock, Integer colIdx) {
  this.packetLen = packetLen;
  Preconditions.checkArgument(packetLen >= Ints.BYTES,
      "packet len %s should always be at least 4 bytes",
      packetLen);

  // Build the common fields once; the two optional fields below are only
  // set when present, so an absent colIdx / false syncBlock adds no bytes.
  PacketHeaderProto.Builder builder = PacketHeaderProto.newBuilder()
      .setOffsetInBlock(offsetInBlock)
      .setSeqno(seqno)
      .setLastPacketInBlock(lastPacketInBlock)
      .setDataLen(dataLen);

  if (colIdx != null) {
    builder.setColIdx(colIdx);
  }

  if (syncBlock) {
    // Only set syncBlock if it is specified.
    // This is wire-incompatible with Hadoop 2.0.0-alpha due to HDFS-3721
    // because it changes the length of the packet header, and BlockReceiver
    // in that version did not support variable-length headers.
    builder.setSyncBlock(true);
  }

  proto = builder.build();
}

public PacketHeader(int packetLen, long offsetInBlock, long seqno,
boolean lastPacketInBlock, int dataLen, boolean syncBlock) {
this.packetLen = packetLen;
Expand Down Expand Up @@ -117,6 +151,10 @@ public boolean getSyncBlock() {
return proto.getSyncBlock();
}

/**
 * Returns the MLEC column index from the header, or {@code null} when the
 * optional field was not set by the sender.
 *
 * <p>Without the {@code hasColIdx()} guard, an unset {@code optional int32}
 * reads back as its proto default (0), which is indistinguishable from a
 * genuine column 0. NOTE(review): callers that unbox this result must now
 * handle {@code null} — confirm the receive path does.
 */
public Integer getColIdx() {
  return proto.hasColIdx() ? proto.getColIdx() : null;
}

@Override
public String toString() {
return "PacketHeader with packetLen=" + packetLen +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,7 @@ message PacketHeaderProto {
required bool lastPacketInBlock = 3;
required sfixed32 dataLen = 4;
optional bool syncBlock = 5 [default = false];
optional int32 colIdx = 6;
}

// Status is a 4-bit enum
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -717,7 +717,7 @@ private int receivePacket() throws IOException {
List<DnodeAttributes> dnodes = zfsTools.getFailedChunks("pool");
DnodeAttributes reconDnode = null;
for (DnodeAttributes dn : dnodes) {
if (dn.path.contains(String.valueOf(block.getBlockId()))) {
if (dn.path.contains(String.valueOf(block.getBlockId())) && !dn.path.contains("meta")) {
reconDnode = dn;
}
}
Expand All @@ -728,8 +728,9 @@ private int receivePacket() throws IOException {
throw new IllegalStateException("Cannot find reconstruction dnode for block " + block.getBlockId());
}

LOG.info("Writing to dnode at colIdx {}", header.getColIdx());
// The column index is now carried in the DFS packet header (colIdx field).
new Tools().writeRepairData("pool", reconDnode, 0, 0, dataBuf.array());
new Tools().writeRepairData("pool", reconDnode, 0, header.getColIdx(), dataBuf.array());
}

if (onDiskLen<offsetInBlock) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,50 @@ void transferData2Target(byte[] packetBuf) throws IOException {
}
}

/**
 * Send the contents of {@code targetBuffer} to the reconstruction target,
 * stamping every outgoing packet with the given MLEC column index.
 *
 * @param packetBuf reusable buffer backing each {@link DFSPacket}
 * @param colIdx erasure-coding column index to embed in each packet header
 * @throws IOException if writing to the target stream fails
 */
void transferData2Target(byte[] packetBuf, int colIdx) throws IOException {
  // Nothing staged for this target; avoid emitting empty packets.
  if (targetBuffer.remaining() == 0) {
    return;
  }

  LOG.info("Transferring {} bytes in StripedBlockWriter", packetBuf.length);

  // Compute the chunked checksums over the staged data. Direct buffers
  // need a direct scratch buffer from the pool; heap buffers can be
  // checksummed in place via the backing array.
  if (targetBuffer.isDirect()) {
    ByteBuffer checksumScratch =
        BUFFER_POOL.getBuffer(true, stripedWriter.getChecksumBuf().length);
    stripedWriter.getChecksum().calculateChunkedSums(
        targetBuffer, checksumScratch);
    checksumScratch.get(stripedWriter.getChecksumBuf());
    BUFFER_POOL.putBuffer(checksumScratch);
  } else {
    stripedWriter.getChecksum().calculateChunkedSums(
        targetBuffer.array(), 0, targetBuffer.remaining(),
        stripedWriter.getChecksumBuf(), 0);
  }

  // Drain the buffer one packet at a time, pairing each data slice with
  // its corresponding slice of the checksum buffer.
  int checksumOffset = 0;
  while (targetBuffer.remaining() > 0) {
    DFSPacket packet = new DFSPacket(packetBuf,
        stripedWriter.getMaxChunksPerPacket(),
        blockOffset4Target, seqNo4Target++,
        stripedWriter.getChecksumSize(), false, colIdx);
    int packetCapacity = stripedWriter.getMaxChunksPerPacket()
        * stripedWriter.getBytesPerChecksum();
    int bytesToSend = Math.min(targetBuffer.remaining(), packetCapacity);
    // One checksum per (possibly partial) chunk: ceil(bytesToSend / bpc).
    int checksumLen = ((bytesToSend - 1) / stripedWriter.getBytesPerChecksum() + 1)
        * stripedWriter.getChecksumSize();
    packet.writeChecksum(stripedWriter.getChecksumBuf(), checksumOffset, checksumLen);
    checksumOffset += checksumLen;
    packet.writeData(targetBuffer, bytesToSend);

    // Send packet
    packet.writeTo(targetOutputStream);

    blockOffset4Target += bytesToSend;
    stripedWriter.getReconstructor().incrBytesWritten(bytesToSend);
  }
}

// send an empty packet to mark the end of the block
void endTargetBlock(byte[] packetBuf) throws IOException {
DFSPacket packet = new DFSPacket(packetBuf, 0,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,25 @@ int transferData2Targets() {
return nSuccess;
}

/**
 * MLEC variant of {@code transferData2Targets()}: pushes the staged data to
 * every still-healthy target, stamping packets with the given column index.
 * A target that throws is marked failed; others are unaffected.
 *
 * @param columnIdx erasure-coding column index for the packet headers
 * @return the number of targets written successfully
 */
int transferData2Targets(int columnIdx) {
  int succeeded = 0;
  for (int i = 0; i < targets.length; i++) {
    // Skip targets already marked as failed.
    if (!targetsStatus[i]) {
      continue;
    }
    boolean ok = false;
    try {
      writers[i].transferData2Target(packetBuf, columnIdx);
      succeeded++;
      ok = true;
    } catch (IOException e) {
      LOG.error("Error while transferring data to target", e);
    }
    targetsStatus[i] = ok;
  }
  return succeeded;
}

/**
* Send an empty packet to mark the end of the block.
*/
Expand Down

0 comments on commit 6cbe0f4

Please sign in to comment.