diff --git a/ambry-protocol/src/main/java/com/github/ambry/protocol/CompactMessageRequest.java b/ambry-protocol/src/main/java/com/github/ambry/protocol/CompactMessageRequest.java index aefaeb13f2..90ce06308f 100644 --- a/ambry-protocol/src/main/java/com/github/ambry/protocol/CompactMessageRequest.java +++ b/ambry-protocol/src/main/java/com/github/ambry/protocol/CompactMessageRequest.java @@ -28,6 +28,7 @@ public class CompactMessageRequest extends RequestOrResponse { private final static short CURRENT_VERSION = COMPACT_MESSAGE_REQUEST_VERSION_1; private final BlobId blobId; + private final long operationTimeInMs; /** * Helper to construct TtlUpdateRequest from a stream @@ -48,20 +49,23 @@ public static CompactMessageRequest readFrom(DataInputStream stream, ClusterMap * @param correlationId the correlation id for the request * @param clientId the id of the client generating the request * @param blobId the blob ID whose TTL needs to be updated + * @param operationTimeMs the time of the operation (in ms) */ - public CompactMessageRequest(int correlationId, String clientId, BlobId blobId) { - this(correlationId, clientId, blobId, CURRENT_VERSION); + public CompactMessageRequest(int correlationId, String clientId, BlobId blobId, long operationTimeMs) { + this(correlationId, clientId, blobId, operationTimeMs, CURRENT_VERSION); } /** * @param correlationId the correlation id for the request * @param clientId the id of the client generating the request * @param blobId the blob ID whose TTL needs to be updated + * @param operationTimeMs the time of the operation (in ms) * @param version the version of the TtlUpdateRequest */ - CompactMessageRequest(int correlationId, String clientId, BlobId blobId, short version) { + CompactMessageRequest(int correlationId, String clientId, BlobId blobId, long operationTimeMs, short version) { super(CompactMessageRequest, version, correlationId, clientId); this.blobId = blobId; + this.operationTimeInMs = operationTimeMs; } @Override @@ -96,17 +100,24 @@ public short getContainerId() { return blobId.getContainerId(); } + /** + * @return the time of the operation (in ms) + */ + public long getOperationTimeInMs() { + return operationTimeInMs; + } + @Override public long sizeInBytes() { - // header + blobId - return super.sizeInBytes() + blobId.sizeInBytes(); + // header + blobId + op time ms + return super.sizeInBytes() + blobId.sizeInBytes() + Long.BYTES; } @Override public String toString() { return "CompactMessageRequest[" + "BlobID=" + blobId + ", " + "PartitionId=" + blobId.getPartition() + ", " + "ClientId=" + clientId + ", " + "CorrelationId=" + correlationId + ", " + "AccountId=" + blobId.getAccountId() + ", " - + "ContainerId=" + blobId.getContainerId() + "]"; + + "ContainerId=" + blobId.getContainerId() + "OperationTimeMs=" + operationTimeInMs + "]"; } /** @@ -117,7 +128,8 @@ static CompactMessageRequest readFrom(DataInputStream stream, ClusterMap map) th int correlationId = stream.readInt(); String clientId = Utils.readIntString(stream); BlobId id = new BlobId(stream, map); - return new CompactMessageRequest(correlationId, clientId, id, COMPACT_MESSAGE_REQUEST_VERSION_1); + long operationTimeMs = stream.readLong(); + return new CompactMessageRequest(correlationId, clientId, id, operationTimeMs, COMPACT_MESSAGE_REQUEST_VERSION_1); } } } diff --git a/ambry-server/src/main/java/com/github/ambry/server/AmbryRequests.java b/ambry-server/src/main/java/com/github/ambry/server/AmbryRequests.java index 811d813fab..f0e3dc3057 100644 --- a/ambry-server/src/main/java/com/github/ambry/server/AmbryRequests.java +++ b/ambry-server/src/main/java/com/github/ambry/server/AmbryRequests.java @@ -494,6 +494,76 @@ public void handleDeleteRequest(NetworkRequest request) throws IOException, Inte metrics.deleteBlobTotalTimeInMs, null, null, totalTimeSpent)); } + public void handleCompactMessageRequest(NetworkRequest request) throws IOException, InterruptedException { + CompactMessageRequest compactMessageRequest = CompactMessageRequest.readFrom(new DataInputStream(request.getInputStream()), clusterMap); + long requestQueueTime = SystemTime.getInstance().milliseconds() - request.getStartTimeInMs(); + long totalTimeSpent = requestQueueTime; + long startTime = SystemTime.getInstance().milliseconds(); + CompactMessageResponse response = null; + try { + StoreKey convertedStoreKey = getConvertedStoreKeys(Collections.singletonList(compactMessageRequest.getBlobId())).get(0); + ServerErrorCode error = + validateRequest(compactMessageRequest.getBlobId().getPartition(), RequestOrResponseType.CompactMessageRequest, false); + if (error != ServerErrorCode.No_Error) { + logger.error("Validating delete request failed with error {} for request {}", error, compactMessageRequest); + response = new CompactMessageResponse(compactMessageRequest.getCorrelationId(), compactMessageRequest.getClientId(), error); + } else { + BlobId convertedBlobId = (BlobId) convertedStoreKey; + MessageInfo info = new MessageInfo.Builder(convertedBlobId, -1, convertedBlobId.getAccountId(), + convertedBlobId.getContainerId(), compactMessageRequest.getOperationTimeInMs()).isDeleted(true) + .lifeVersion(MessageInfo.LIFE_VERSION_FROM_FRONTEND) + .build(); + Store storeToDelete = storeManager.getStore(compactMessageRequest.getBlobId().getPartition()); + storeToDelete.delete(Collections.singletonList(info)); + response = + new CompactMessageResponse(compactMessageRequest.getCorrelationId(), compactMessageRequest.getClientId(), ServerErrorCode.No_Error); + // TODO Handle notification + if (notification != null) { + notification.onBlobReplicaDeleted(currentNode.getHostname(), currentNode.getPort(), convertedStoreKey.getID(), + BlobReplicaSourceType.PRIMARY); + } + } + } catch (StoreException e) { + boolean logInErrorLevel = false; + if (e.getErrorCode() == StoreErrorCodes.ID_Not_Found) { + metrics.idNotFoundError.inc(); + } else if (e.getErrorCode() == StoreErrorCodes.TTL_Expired) { + metrics.ttlExpiredError.inc(); + } else if (e.getErrorCode() == StoreErrorCodes.ID_Deleted) { + metrics.idDeletedError.inc(); + } else if (e.getErrorCode() == StoreErrorCodes.Authorization_Failure) { + metrics.deleteAuthorizationFailure.inc(); + } else { + logInErrorLevel = true; + metrics.unExpectedStoreDeleteError.inc(); + } + if (logInErrorLevel) { + logger.error("Store exception on a delete with error code {} for request {}", e.getErrorCode(), deleteRequest, + e); + } else { + logger.trace("Store exception on a delete with error code {} for request {}", e.getErrorCode(), deleteRequest, + e); + } + response = new DeleteResponse(deleteRequest.getCorrelationId(), deleteRequest.getClientId(), + ErrorMapping.getStoreErrorMapping(e.getErrorCode())); + } catch (Exception e) { + logger.error("Unknown exception for delete request {}", deleteRequest, e); + response = new DeleteResponse(deleteRequest.getCorrelationId(), deleteRequest.getClientId(), + ServerErrorCode.Unknown_Error); + metrics.unExpectedStoreDeleteError.inc(); + } finally { + long processingTime = SystemTime.getInstance().milliseconds() - startTime; + totalTimeSpent += processingTime; + publicAccessLogger.info("{} {} processingTime {}", deleteRequest, response, processingTime); + // Update request metrics. + RequestMetricsUpdater metricsUpdater = new RequestMetricsUpdater(requestQueueTime, processingTime, 0, 0, false); + deleteRequest.accept(metricsUpdater); + } + requestResponseChannel.sendResponse(response, request, + new ServerNetworkResponseMetrics(metrics.deleteBlobResponseQueueTimeInMs, metrics.deleteBlobSendTimeInMs, + metrics.deleteBlobTotalTimeInMs, null, null, totalTimeSpent)); + } + @Override public void handleTtlUpdateRequest(NetworkRequest request) throws IOException, InterruptedException { TtlUpdateRequest updateRequest;