Skip to content

Commit

Permalink
Cannot use whole DatanodeBlockID as part of mapKey as blockCommitSequ…
Browse files Browse the repository at this point in the history
…enceId remains changing. Issue discovered by Pratyush.
  • Loading branch information
ChenSammi committed Dec 17, 2024
1 parent ea2d594 commit ff217fb
Show file tree
Hide file tree
Showing 3 changed files with 7 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,10 @@ public long getReplicatedMinCommitIndex() {
}

public FileInputStream getFileInputStream(long id, DatanodeBlockID blockID) {
String mapKey = id + blockID.toString();
String mapKey = id + "-" + blockID.getLocalID();
if (LOG.isDebugEnabled()) {
LOG.debug("mapKey {}", mapKey);
}
return blockStreamCache.remove(mapKey);
}

Expand Down Expand Up @@ -514,10 +517,10 @@ public void run() {
DATA_TRANSFER_MAGIC_CODE + ", Received: " + buf[0] + ")");
}
DatanodeBlockID blockID = getBlockResponse.getBlockData().getBlockID();
String mapKey = responseProto.getCallId() + blockID.toString();
String mapKey = responseProto.getCallId() + "-" + blockID.getLocalID();
blockStreamCache.put(mapKey, fis[0]);
if (LOG.isDebugEnabled()) {
LOG.debug("received fd {} ", fis[0]);
LOG.debug("received fd {} with mapKey {}", fis[0], mapKey);
}
} catch (IOException e) {
LOG.warn("Failed to handle short-circuit information exchange", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public class ChecksumCache {
private static final int BLOCK_CHUNK_SIZE = 4 * 1024 * 1024; // 4 MB

public ChecksumCache(int bytesPerChecksum) {
LOG.info("Initializing ChecksumCache with bytesPerChecksum = {}", bytesPerChecksum);
LOG.debug("Initializing ChecksumCache with bytesPerChecksum = {}", bytesPerChecksum);
this.prevChunkLength = 0;
this.bytesPerChecksum = bytesPerChecksum;
// Set initialCapacity to avoid costly resizes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -235,9 +235,7 @@ public void run() {
LOG.error("Failed to processRequest {} {} {}", type, request.getClientId(), request.getCallId(), e);
} finally {
span.finish();
LOG.info("before counter is incremented {}", counter);
counter.incrementAndGet();
LOG.info("counter is incremented {}", counter);
}
}
}
Expand Down

0 comments on commit ff217fb

Please sign in to comment.