Skip to content

Commit

Permalink
Start handling compact message.
Browse files Browse the repository at this point in the history
  • Loading branch information
github-actions committed Sep 18, 2023
1 parent cda2489 commit c3a1fd0
Show file tree
Hide file tree
Showing 2 changed files with 89 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ public class CompactMessageRequest extends RequestOrResponse {
private final static short CURRENT_VERSION = COMPACT_MESSAGE_REQUEST_VERSION_1;

private final BlobId blobId;
private final long operationTimeInMs;

/**
* Helper to construct TtlUpdateRequest from a stream
Expand All @@ -48,20 +49,23 @@ public static CompactMessageRequest readFrom(DataInputStream stream, ClusterMap
* @param correlationId the correlation id for the request
* @param clientId the id of the client generating the request
* @param blobId the blob ID whose TTL needs to be updated
* @param operationTimeMs the time of the operation (in ms)
*/
public CompactMessageRequest(int correlationId, String clientId, BlobId blobId) {
this(correlationId, clientId, blobId, CURRENT_VERSION);
public CompactMessageRequest(int correlationId, String clientId, BlobId blobId, long operationTimeMs) {
this(correlationId, clientId, blobId, operationTimeMs, CURRENT_VERSION);
}

/**
* @param correlationId the correlation id for the request
* @param clientId the id of the client generating the request
* @param blobId the blob ID whose TTL needs to be updated
* @param operationTimeMs the time of the operation (in ms)
* @param version the version of the TtlUpdateRequest
*/
CompactMessageRequest(int correlationId, String clientId, BlobId blobId, short version) {
CompactMessageRequest(int correlationId, String clientId, BlobId blobId, long operationTimeMs, short version) {
super(CompactMessageRequest, version, correlationId, clientId);
this.blobId = blobId;
this.operationTimeInMs = operationTimeMs;
}

@Override
Expand Down Expand Up @@ -96,17 +100,24 @@ public short getContainerId() {
return blobId.getContainerId();
}

/**
* @return the time of the operation (in ms)
*/
public long getOperationTimeInMs() {
return operationTimeInMs;
}

@Override
public long sizeInBytes() {
// header + blobId
return super.sizeInBytes() + blobId.sizeInBytes();
// header + blobId + op time ms
return super.sizeInBytes() + blobId.sizeInBytes() + Long.BYTES;
}

@Override
public String toString() {
return "CompactMessageRequest[" + "BlobID=" + blobId + ", " + "PartitionId=" + blobId.getPartition() + ", " + "ClientId="
+ clientId + ", " + "CorrelationId=" + correlationId + ", " + "AccountId=" + blobId.getAccountId() + ", "
+ "ContainerId=" + blobId.getContainerId() + "]";
+ "ContainerId=" + blobId.getContainerId() + "OperationTimeMs=" + operationTimeInMs + "]";
}

/**
Expand All @@ -117,7 +128,8 @@ static CompactMessageRequest readFrom(DataInputStream stream, ClusterMap map) th
int correlationId = stream.readInt();
String clientId = Utils.readIntString(stream);
BlobId id = new BlobId(stream, map);
return new CompactMessageRequest(correlationId, clientId, id, COMPACT_MESSAGE_REQUEST_VERSION_1);
long operationTimeMs = stream.readLong();
return new CompactMessageRequest(correlationId, clientId, id, operationTimeMs, COMPACT_MESSAGE_REQUEST_VERSION_1);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -494,6 +494,76 @@ public void handleDeleteRequest(NetworkRequest request) throws IOException, Inte
metrics.deleteBlobTotalTimeInMs, null, null, totalTimeSpent));
}

public void handleCompactMessageRequest(NetworkRequest request) throws IOException, InterruptedException {
CompactMessageRequest compactMessageRequest = CompactMessageRequest.readFrom(new DataInputStream(request.getInputStream()), clusterMap);
long requestQueueTime = SystemTime.getInstance().milliseconds() - request.getStartTimeInMs();
long totalTimeSpent = requestQueueTime;
long startTime = SystemTime.getInstance().milliseconds();
CompactMessageResponse response = null;
try {
StoreKey convertedStoreKey = getConvertedStoreKeys(Collections.singletonList(compactMessageRequest.getBlobId())).get(0);
ServerErrorCode error =
validateRequest(compactMessageRequest.getBlobId().getPartition(), RequestOrResponseType.CompactMessageRequest, false);
if (error != ServerErrorCode.No_Error) {
logger.error("Validating delete request failed with error {} for request {}", error, compactMessageRequest);
response = new CompactMessageResponse(compactMessageRequest.getCorrelationId(), compactMessageRequest.getClientId(), error);
} else {
BlobId convertedBlobId = (BlobId) convertedStoreKey;
MessageInfo info = new MessageInfo.Builder(convertedBlobId, -1, convertedBlobId.getAccountId(),
convertedBlobId.getContainerId(), compactMessageRequest.getOperationTimeInMs()).isDeleted(true)
.lifeVersion(MessageInfo.LIFE_VERSION_FROM_FRONTEND)
.build();
Store storeToDelete = storeManager.getStore(compactMessageRequest.getBlobId().getPartition());
storeToDelete.delete(Collections.singletonList(info));
response =
new CompactMessageResponse(compactMessageRequest.getCorrelationId(), compactMessageRequest.getClientId(), ServerErrorCode.No_Error);
// TODO Handle notification
if (notification != null) {
notification.onBlobReplicaDeleted(currentNode.getHostname(), currentNode.getPort(), convertedStoreKey.getID(),
BlobReplicaSourceType.PRIMARY);
}
}
} catch (StoreException e) {
boolean logInErrorLevel = false;
if (e.getErrorCode() == StoreErrorCodes.ID_Not_Found) {
metrics.idNotFoundError.inc();
} else if (e.getErrorCode() == StoreErrorCodes.TTL_Expired) {
metrics.ttlExpiredError.inc();
} else if (e.getErrorCode() == StoreErrorCodes.ID_Deleted) {
metrics.idDeletedError.inc();
} else if (e.getErrorCode() == StoreErrorCodes.Authorization_Failure) {
metrics.deleteAuthorizationFailure.inc();
} else {
logInErrorLevel = true;
metrics.unExpectedStoreDeleteError.inc();
}
if (logInErrorLevel) {
logger.error("Store exception on a delete with error code {} for request {}", e.getErrorCode(), deleteRequest,
e);
} else {
logger.trace("Store exception on a delete with error code {} for request {}", e.getErrorCode(), deleteRequest,
e);
}
response = new DeleteResponse(deleteRequest.getCorrelationId(), deleteRequest.getClientId(),
ErrorMapping.getStoreErrorMapping(e.getErrorCode()));
} catch (Exception e) {
logger.error("Unknown exception for delete request {}", deleteRequest, e);
response = new DeleteResponse(deleteRequest.getCorrelationId(), deleteRequest.getClientId(),
ServerErrorCode.Unknown_Error);
metrics.unExpectedStoreDeleteError.inc();
} finally {
long processingTime = SystemTime.getInstance().milliseconds() - startTime;
totalTimeSpent += processingTime;
publicAccessLogger.info("{} {} processingTime {}", deleteRequest, response, processingTime);
// Update request metrics.
RequestMetricsUpdater metricsUpdater = new RequestMetricsUpdater(requestQueueTime, processingTime, 0, 0, false);
deleteRequest.accept(metricsUpdater);
}
requestResponseChannel.sendResponse(response, request,
new ServerNetworkResponseMetrics(metrics.deleteBlobResponseQueueTimeInMs, metrics.deleteBlobSendTimeInMs,
metrics.deleteBlobTotalTimeInMs, null, null, totalTimeSpent));
}

@Override
public void handleTtlUpdateRequest(NetworkRequest request) throws IOException, InterruptedException {
TtlUpdateRequest updateRequest;
Expand Down

0 comments on commit c3a1fd0

Please sign in to comment.