diff --git a/ambry-api/src/main/java/com/github/ambry/notification/NotificationSystem.java b/ambry-api/src/main/java/com/github/ambry/notification/NotificationSystem.java index 5c8727d732..b900be591a 100644 --- a/ambry-api/src/main/java/com/github/ambry/notification/NotificationSystem.java +++ b/ambry-api/src/main/java/com/github/ambry/notification/NotificationSystem.java @@ -96,6 +96,15 @@ default void onBlobUndeleted(String blobId, String serviceId, Account account, C */ void onBlobReplicaDeleted(String sourceHost, int port, String blobId, BlobReplicaSourceType sourceType); + /** + * Notifies the underlying system when a purged state of a blob is replicated to a node + * @param sourceHost The source host from where the notification is being invoked + * @param port The port of the source host from where the notification is being invoked. + * @param blobId The id of the blob whose purged state has been replicated + * @param sourceType The source that purged the blob replica + */ + void onBlobReplicaPurged(String sourceHost, int port, String blobId, BlobReplicaSourceType sourceType); + /** * Notifies the underlying system when a updated state of a blob is replicated to a node * @param sourceHost The source host from where the notification is being invoked diff --git a/ambry-api/src/main/java/com/github/ambry/protocol/RequestAPI.java b/ambry-api/src/main/java/com/github/ambry/protocol/RequestAPI.java index a4c3d377dd..cb912b7379 100644 --- a/ambry-api/src/main/java/com/github/ambry/protocol/RequestAPI.java +++ b/ambry-api/src/main/java/com/github/ambry/protocol/RequestAPI.java @@ -55,6 +55,14 @@ public interface RequestAPI { */ void handleDeleteRequest(NetworkRequest request) throws IOException, InterruptedException; + /** + * Purges the blob from the store. + * @param request The request that contains the partition and id of the blob that needs to be purged. + * @throws IOException if there are I/O errors carrying our the required operation. + * @throws InterruptedException if request processing is interrupted. + */ + void handlePurgeRequest(NetworkRequest request) throws IOException, InterruptedException; + /** * Updates the TTL of a blob as required in {@code request}. * @param request The request that contains the partition and id of the blob that needs to be updated. diff --git a/ambry-cloud/src/main/java/com/github/ambry/cloud/AmbryCloudRequests.java b/ambry-cloud/src/main/java/com/github/ambry/cloud/AmbryCloudRequests.java index 0860ec4939..2a43362a06 100644 --- a/ambry-cloud/src/main/java/com/github/ambry/cloud/AmbryCloudRequests.java +++ b/ambry-cloud/src/main/java/com/github/ambry/cloud/AmbryCloudRequests.java @@ -284,6 +284,11 @@ public void handleDeleteRequest(NetworkRequest request) throws InterruptedExcept } } + @Override + public void handlePurgeRequest(NetworkRequest request) { + throw new UnsupportedOperationException("Purge is not supported in cloud yet."); + } + @Override public void handleTtlUpdateRequest(NetworkRequest request) throws InterruptedException { diff --git a/ambry-cloud/src/main/java/com/github/ambry/cloud/VcrRequests.java b/ambry-cloud/src/main/java/com/github/ambry/cloud/VcrRequests.java index 020d938342..9dd2eec256 100644 --- a/ambry-cloud/src/main/java/com/github/ambry/cloud/VcrRequests.java +++ b/ambry-cloud/src/main/java/com/github/ambry/cloud/VcrRequests.java @@ -61,6 +61,11 @@ public void handleDeleteRequest(NetworkRequest request) throws IOException, Inte throw new UnsupportedOperationException("Request type not supported"); } + @Override + public void handlePurgeRequest(NetworkRequest request) throws IOException, InterruptedException { + throw new UnsupportedOperationException("Request type not supported"); + } + @Override public void handleTtlUpdateRequest(NetworkRequest request) throws IOException, InterruptedException { throw new UnsupportedOperationException("Request type not supported"); diff --git a/ambry-commons/src/main/java/com/github/ambry/commons/LoggingNotificationSystem.java b/ambry-commons/src/main/java/com/github/ambry/commons/LoggingNotificationSystem.java index 877868d540..8e4286bb4c 100644 --- a/ambry-commons/src/main/java/com/github/ambry/commons/LoggingNotificationSystem.java +++ b/ambry-commons/src/main/java/com/github/ambry/commons/LoggingNotificationSystem.java @@ -90,6 +90,11 @@ public void onBlobReplicaDeleted(String sourceHost, int port, String blobId, Blo logger.debug("onBlobReplicaDeleted {}, {}, {}, {}", sourceHost, port, blobId, sourceType); } + @Override + public void onBlobReplicaPurged(String sourceHost, int port, String blobId, BlobReplicaSourceType sourceType) { + logger.debug("onBlobReplicaPurged {}, {}, {}, {}", sourceHost, port, blobId, sourceType); + } + @Override public void onBlobReplicaUpdated(String sourceHost, int port, String blobId, BlobReplicaSourceType sourceType, UpdateType updateType, MessageInfo info) { diff --git a/ambry-commons/src/main/java/com/github/ambry/commons/ServerMetrics.java b/ambry-commons/src/main/java/com/github/ambry/commons/ServerMetrics.java index af775ebe8b..e7c7cfe843 100644 --- a/ambry-commons/src/main/java/com/github/ambry/commons/ServerMetrics.java +++ b/ambry-commons/src/main/java/com/github/ambry/commons/ServerMetrics.java @@ -100,8 +100,11 @@ public class ServerMetrics { public final Histogram deleteBlobRequestQueueTimeInMs; public final Histogram deleteBlobProcessingTimeInMs; public final Histogram deleteBlobResponseQueueTimeInMs; + public final Histogram purgeBlobResponseQueueTimeInMs; public final Histogram deleteBlobSendTimeInMs; + public final Histogram purgeBlobSendTimeInMs; public final Histogram deleteBlobTotalTimeInMs; + public final Histogram purgeBlobTotalTimeInMs; public final Histogram undeleteBlobRequestQueueTimeInMs; public final Histogram undeleteBlobProcessingTimeInMs; @@ -255,6 +258,7 @@ public class ServerMetrics { public final Counter unExpectedStoreGetError; public final Counter unExpectedStoreTtlUpdateError; public final Counter unExpectedStoreDeleteError; + public final Counter unExpectedStorePurgeError; public final Counter unExpectedStoreUndeleteError; public final Counter unExpectedAdminOperationError; public final Counter unExpectedStoreFindEntriesError; @@ -271,6 +275,7 @@ public class ServerMetrics { public final Counter temporarilyDisabledError; public final Counter getAuthorizationFailure; public final Counter deleteAuthorizationFailure; + public final Counter purgeAuthorizationFailure; public final Counter undeleteAuthorizationFailure; public final Counter ttlUpdateAuthorizationFailure; public final Counter ttlAlreadyUpdatedError; @@ -393,8 +398,12 @@ public ServerMetrics(MetricRegistry registry, Class requestClass, Class se deleteBlobProcessingTimeInMs = registry.histogram(MetricRegistry.name(requestClass, "DeleteBlobProcessingTime")); deleteBlobResponseQueueTimeInMs = registry.histogram(MetricRegistry.name(requestClass, "DeleteBlobResponseQueueTime")); + purgeBlobResponseQueueTimeInMs = + registry.histogram(MetricRegistry.name(requestClass, "PurgeBlobResponseQueueTimeInMs")); deleteBlobSendTimeInMs = registry.histogram(MetricRegistry.name(requestClass, "DeleteBlobSendTime")); + purgeBlobSendTimeInMs = registry.histogram(MetricRegistry.name(requestClass, "PurgeBlobSendTimeInMs")); deleteBlobTotalTimeInMs = registry.histogram(MetricRegistry.name(requestClass, "DeleteBlobTotalTime")); + purgeBlobTotalTimeInMs = registry.histogram(MetricRegistry.name(requestClass, "PurgeBlobTotalTimeInMs")); undeleteBlobRequestQueueTimeInMs = registry.histogram(MetricRegistry.name(requestClass, "UndeleteBlobRequestQueueTime")); @@ -622,6 +631,8 @@ public ServerMetrics(MetricRegistry registry, Class requestClass, Class se unExpectedStorePutError = registry.counter(MetricRegistry.name(requestClass, "UnexpectedStorePutError")); unExpectedStoreGetError = registry.counter(MetricRegistry.name(requestClass, "UnexpectedStoreGetError")); unExpectedStoreDeleteError = registry.counter(MetricRegistry.name(requestClass, "UnexpectedStoreDeleteError")); + unExpectedStorePurgeError = + registry.counter(MetricRegistry.name(requestClass, "UnExpectedStorePurgeError")); unExpectedStoreUndeleteError = registry.counter(MetricRegistry.name(requestClass, "UnexpectedStoreUndeleteError")); unExpectedAdminOperationError = registry.counter(MetricRegistry.name(requestClass, "UnexpectedAdminOperationError")); @@ -631,6 +642,8 @@ public ServerMetrics(MetricRegistry registry, Class requestClass, Class se registry.counter(MetricRegistry.name(requestClass, "UnexpectedStoreFindEntriesError")); getAuthorizationFailure = registry.counter(MetricRegistry.name(requestClass, "GetAuthorizationFailure")); deleteAuthorizationFailure = registry.counter(MetricRegistry.name(requestClass, "DeleteAuthorizationFailure")); + purgeAuthorizationFailure = + registry.counter(MetricRegistry.name(requestClass, "PurgeAuthorizationFailure")); undeleteAuthorizationFailure = registry.counter(MetricRegistry.name(requestClass, "UndeleteAuthorizationFailure")); ttlUpdateAuthorizationFailure = registry.counter(MetricRegistry.name(requestClass, "TtlUpdateAuthorizationFailure")); diff --git a/ambry-protocol/src/main/java/com/github/ambry/protocol/CompactMessageRequest.java b/ambry-protocol/src/main/java/com/github/ambry/protocol/CompactMessageRequest.java deleted file mode 100644 index 90ce06308f..0000000000 --- a/ambry-protocol/src/main/java/com/github/ambry/protocol/CompactMessageRequest.java +++ /dev/null @@ -1,135 +0,0 @@ -/* - * Copyright 2023 LinkedIn Corp. All rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - */ -package com.github.ambry.protocol; - -import com.github.ambry.clustermap.ClusterMap; -import com.github.ambry.commons.BlobId; -import com.github.ambry.utils.Utils; -import java.io.DataInputStream; -import java.io.IOException; - -import static com.github.ambry.protocol.RequestOrResponseType.CompactMessageRequest; - - -public class CompactMessageRequest extends RequestOrResponse { - - static final short COMPACT_MESSAGE_REQUEST_VERSION_1 = 1; - private final static short CURRENT_VERSION = COMPACT_MESSAGE_REQUEST_VERSION_1; - - private final BlobId blobId; - private final long operationTimeInMs; - - /** - * Helper to construct TtlUpdateRequest from a stream - * @param stream the stream to read data from - * @param map the {@link ClusterMap} to use - * @return a TtlUpdateRequest based on data read off of the stream - * @throws IOException if there were any problems reading the stream - */ - public static CompactMessageRequest readFrom(DataInputStream stream, ClusterMap map) throws IOException { - short version = stream.readShort(); - if (version == COMPACT_MESSAGE_REQUEST_VERSION_1) { - return CompactMessageRequest_V1.readFrom(stream, map); - } - throw new IllegalStateException("Unknown TTL Request version " + version); - } - - /** - * @param correlationId the correlation id for the request - * @param clientId the id of the client generating the request - * @param blobId the blob ID whose TTL needs to be updated - * @param operationTimeMs the time of the operation (in ms) - */ - public CompactMessageRequest(int correlationId, String clientId, BlobId blobId, long operationTimeMs) { - this(correlationId, clientId, blobId, operationTimeMs, CURRENT_VERSION); - } - - /** - * @param correlationId the correlation id for the request - * @param clientId the id of the client generating the request - * @param blobId the blob ID whose TTL needs to be updated - * @param operationTimeMs the time of the operation (in ms) - * @param version the version of the TtlUpdateRequest - */ - CompactMessageRequest(int correlationId, String clientId, BlobId blobId, long operationTimeMs, short version) { - super(CompactMessageRequest, version, correlationId, clientId); - this.blobId = blobId; - this.operationTimeInMs = operationTimeMs; - } - - @Override - public void accept(RequestVisitor visitor) { - // TODO Efficient Metadata Operations: Implement this. - } - - @Override - protected void prepareBuffer() { - super.prepareBuffer(); - bufferToSend.writeBytes(blobId.toBytes()); - } - - /** - * @return the blob ID whose TTL needs to be updated. - */ - public BlobId getBlobId() { - return blobId; - } - - /** - * @return the id of the account that the blob belongs to - */ - public short getAccountId() { - return blobId.getAccountId(); - } - - /** - * @return the id of the container that the blob belongs to - */ - public short getContainerId() { - return blobId.getContainerId(); - } - - /** - * @return the time of the operation (in ms) - */ - public long getOperationTimeInMs() { - return operationTimeInMs; - } - - @Override - public long sizeInBytes() { - // header + blobId + op time ms - return super.sizeInBytes() + blobId.sizeInBytes() + Long.BYTES; - } - - @Override - public String toString() { - return "CompactMessageRequest[" + "BlobID=" + blobId + ", " + "PartitionId=" + blobId.getPartition() + ", " + "ClientId=" - + clientId + ", " + "CorrelationId=" + correlationId + ", " + "AccountId=" + blobId.getAccountId() + ", " - + "ContainerId=" + blobId.getContainerId() + "OperationTimeMs=" + operationTimeInMs + "]"; - } - - /** - * Class to read protocol version 1 CompactMessageRequest from the stream. - */ - public static class CompactMessageRequest_V1 { - static CompactMessageRequest readFrom(DataInputStream stream, ClusterMap map) throws IOException { - int correlationId = stream.readInt(); - String clientId = Utils.readIntString(stream); - BlobId id = new BlobId(stream, map); - long operationTimeMs = stream.readLong(); - return new CompactMessageRequest(correlationId, clientId, id, operationTimeMs, COMPACT_MESSAGE_REQUEST_VERSION_1); - } - } -} diff --git a/ambry-protocol/src/main/java/com/github/ambry/protocol/CompactMessageResponse.java b/ambry-protocol/src/main/java/com/github/ambry/protocol/CompactMessageResponse.java deleted file mode 100644 index 7742bb688f..0000000000 --- a/ambry-protocol/src/main/java/com/github/ambry/protocol/CompactMessageResponse.java +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Copyright 2023 LinkedIn Corp. All rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - */ -package com.github.ambry.protocol; - -import com.github.ambry.server.ServerErrorCode; -import com.github.ambry.utils.Utils; -import java.io.DataInputStream; -import java.io.IOException; - - -/** - * Response to a {@link CompactMessageRequest} - */ -public class CompactMessageResponse extends Response { - private static final short COMPACT_MESSAGE_RESPONSE_VERSION_1 = 1; - - /** - * @param correlationId the correlation id from the {@link TtlUpdateRequest} - * @param clientId the id of the client from the {@link TtlUpdateRequest} - * @param error the {@link ServerErrorCode} for the operation - */ - public CompactMessageResponse(int correlationId, String clientId, ServerErrorCode error) { - super(RequestOrResponseType.TtlUpdateResponse, COMPACT_MESSAGE_RESPONSE_VERSION_1, correlationId, clientId, error); - } - - /** - * Helper to help construct TtlUpdateResponse from the {@code stream}. - * @param stream the stream to read bytes from - * @return a TtlUpdateResponse based on data read from the {@code stream} - * @throws IOException if there was any problem reading the stream - */ - public static CompactMessageResponse readFrom(DataInputStream stream) throws IOException { - RequestOrResponseType type = RequestOrResponseType.values()[stream.readShort()]; - if (type != RequestOrResponseType.CompactMessageResponse) { - throw new IllegalArgumentException("The type of request response is not compatible"); - } - short version = stream.readShort(); - if (version != COMPACT_MESSAGE_RESPONSE_VERSION_1) { - throw new IllegalStateException("Unknown CompactMessageResponse version: " + version); - } - int correlationId = stream.readInt(); - String clientId = Utils.readIntString(stream); - ServerErrorCode error = ServerErrorCode.values()[stream.readShort()]; - return new CompactMessageResponse(correlationId, clientId, error); - } - - @Override - public String toString() { - return "CompactMessageResponse[" + "ServerErrorCode=" + getError() + "]"; - } -} diff --git a/ambry-protocol/src/main/java/com/github/ambry/protocol/PurgeRequest.java b/ambry-protocol/src/main/java/com/github/ambry/protocol/PurgeRequest.java index 7012fc01c6..8340c5d0a2 100644 --- a/ambry-protocol/src/main/java/com/github/ambry/protocol/PurgeRequest.java +++ b/ambry-protocol/src/main/java/com/github/ambry/protocol/PurgeRequest.java @@ -35,6 +35,7 @@ public class PurgeRequest extends RequestOrResponse { private final static short CURRENT_VERSION = PURGE_MESSAGE_REQUEST_VERSION_1; private final BlobId blobId; + private final long purgeTimeInMs; /** * Helper to construct PurgeRequest from a stream @@ -55,20 +56,23 @@ public static PurgeRequest readFrom(DataInputStream stream, ClusterMap map) thro * @param correlationId the correlation id for the request * @param clientId the id of the client generating the request * @param blobId the blob ID whose TTL needs to be updated + * @param purgeTimeInMs deletion time of the blob in ms */ - public PurgeRequest(int correlationId, String clientId, BlobId blobId) { - this(correlationId, clientId, blobId, CURRENT_VERSION); + public PurgeRequest(int correlationId, String clientId, BlobId blobId, long purgeTimeInMs) { + this(correlationId, clientId, blobId, purgeTimeInMs, CURRENT_VERSION); } /** * @param correlationId the correlation id for the request * @param clientId the id of the client generating the request * @param blobId the blob ID that needs to be purged. + * @param purgeTimeInMs deletion time of the blob in ms * @param version the version of the {@link PurgeRequest}. */ - PurgeRequest(int correlationId, String clientId, BlobId blobId, short version) { + PurgeRequest(int correlationId, String clientId, BlobId blobId, long purgeTimeInMs, short version) { super(PurgeRequest, version, correlationId, clientId); this.blobId = blobId; + this.purgeTimeInMs = purgeTimeInMs; } @Override @@ -103,6 +107,13 @@ public short getContainerId() { return blobId.getContainerId(); } + /** + * @return the purge time. + */ + public long getPurgeTimeInMs() { + return purgeTimeInMs; + } + @Override public long sizeInBytes() { // header + blobId @@ -124,7 +135,8 @@ static PurgeRequest readFrom(DataInputStream stream, ClusterMap map) throws IOEx int correlationId = stream.readInt(); String clientId = Utils.readIntString(stream); BlobId id = new BlobId(stream, map); - return new PurgeRequest(correlationId, clientId, id, PURGE_MESSAGE_REQUEST_VERSION_1); + long purgeTimeInMs = stream.readLong(); + return new PurgeRequest(correlationId, clientId, id, purgeTimeInMs, PURGE_MESSAGE_REQUEST_VERSION_1); } } } diff --git a/ambry-rest/src/test/java/com/github/ambry/rest/NettyMessageProcessorTest.java b/ambry-rest/src/test/java/com/github/ambry/rest/NettyMessageProcessorTest.java index 86042c64ba..994f484368 100644 --- a/ambry-rest/src/test/java/com/github/ambry/rest/NettyMessageProcessorTest.java +++ b/ambry-rest/src/test/java/com/github/ambry/rest/NettyMessageProcessorTest.java @@ -482,6 +482,11 @@ public void onBlobReplicaDeleted(String sourceHost, int port, String blobId, Blo throw new IllegalStateException("Not implemented"); } + @Override + public void onBlobReplicaPurged(String sourceHost, int port, String blobId, BlobReplicaSourceType sourceType) { + throw new IllegalStateException("Not implemented"); + } + @Override public void onBlobReplicaUpdated(String sourceHost, int port, String blobId, BlobReplicaSourceType sourceType, UpdateType updateType, MessageInfo info) { diff --git a/ambry-server/src/integration-test/java/com/github/ambry/server/MockCluster.java b/ambry-server/src/integration-test/java/com/github/ambry/server/MockCluster.java index 72164299cf..a731a2cbe7 100644 --- a/ambry-server/src/integration-test/java/com/github/ambry/server/MockCluster.java +++ b/ambry-server/src/integration-test/java/com/github/ambry/server/MockCluster.java @@ -419,6 +419,7 @@ class EventTracker { private final int numberOfReplicas; private final Helper creationHelper; private final Helper deletionHelper; + private final Helper purgeHelper; private final Helper undeleteHelper; private final Helper replicateHelper; private final ConcurrentMap updateHelpers = new ConcurrentHashMap<>(); @@ -496,6 +497,7 @@ private String getKey(String host, int port) { numberOfReplicas = expectedNumberOfReplicas; creationHelper = new Helper(numberOfReplicas); deletionHelper = new Helper(numberOfReplicas); + purgeHelper = new Helper(numberOfReplicas); undeleteHelper = new Helper(numberOfReplicas); // On-demand-replication, we usually only need one replica to replicate the Blob. replicateHelper = new Helper(1); @@ -519,6 +521,15 @@ void trackDeletion(String host, int port) { deletionHelper.track(host, port); } + /** + * Tracks the purge event that arrived on {@code host}:{@code port}. + * @param host the host that received the compact message. + * @param port the port of the host that describes the instance along with {@code host}. + */ + void trackPurge(String host, int port) { + purgeHelper.track(host, port); + } + /** * Tracks the undelete event that arrived on {@code host}:{@code port}. * @param host the host that received the undelete @@ -686,6 +697,13 @@ public synchronized void onBlobReplicaDeleted(String sourceHost, int port, Strin .trackDeletion(sourceHost, port); } + @Override + public synchronized void onBlobReplicaPurged(String sourceHost, int port, String blobId, + BlobReplicaSourceType sourceType) { + objectTracker.computeIfAbsent(blobId, k -> new EventTracker(getNumReplicas(blobId))) + .trackPurge(sourceHost, port); + } + @Override public synchronized void onBlobReplicaUpdated(String sourceHost, int port, String blobId, BlobReplicaSourceType sourceType, UpdateType updateType, MessageInfo info) { diff --git a/ambry-server/src/main/java/com/github/ambry/server/AmbryRequests.java b/ambry-server/src/main/java/com/github/ambry/server/AmbryRequests.java index f0e3dc3057..9bc0431b8a 100644 --- a/ambry-server/src/main/java/com/github/ambry/server/AmbryRequests.java +++ b/ambry-server/src/main/java/com/github/ambry/server/AmbryRequests.java @@ -494,32 +494,32 @@ public void handleDeleteRequest(NetworkRequest request) throws IOException, Inte metrics.deleteBlobTotalTimeInMs, null, null, totalTimeSpent)); } - public void handleCompactMessageRequest(NetworkRequest request) throws IOException, InterruptedException { - CompactMessageRequest compactMessageRequest = CompactMessageRequest.readFrom(new DataInputStream(request.getInputStream()), clusterMap); + @Override + public void handlePurgeRequest(NetworkRequest request) throws IOException, InterruptedException { + PurgeRequest purgeRequest = PurgeRequest.readFrom(new DataInputStream(request.getInputStream()), clusterMap); long requestQueueTime = SystemTime.getInstance().milliseconds() - request.getStartTimeInMs(); long totalTimeSpent = requestQueueTime; long startTime = SystemTime.getInstance().milliseconds(); - CompactMessageResponse response = null; + PurgeResponse response = null; try { - StoreKey convertedStoreKey = getConvertedStoreKeys(Collections.singletonList(compactMessageRequest.getBlobId())).get(0); + StoreKey convertedStoreKey = getConvertedStoreKeys(Collections.singletonList(purgeRequest.getBlobId())).get(0); ServerErrorCode error = - validateRequest(compactMessageRequest.getBlobId().getPartition(), RequestOrResponseType.CompactMessageRequest, false); + validateRequest(purgeRequest.getBlobId().getPartition(), RequestOrResponseType.PurgeRequest, false); if (error != ServerErrorCode.No_Error) { - logger.error("Validating delete request failed with error {} for request {}", error, compactMessageRequest); - response = new CompactMessageResponse(compactMessageRequest.getCorrelationId(), compactMessageRequest.getClientId(), error); + logger.error("Validating delete request failed with error {} for request {}", error, purgeRequest); + response = new PurgeResponse(purgeRequest.getCorrelationId(), purgeRequest.getClientId(), error); } else { BlobId convertedBlobId = (BlobId) convertedStoreKey; MessageInfo info = new MessageInfo.Builder(convertedBlobId, -1, convertedBlobId.getAccountId(), - convertedBlobId.getContainerId(), compactMessageRequest.getOperationTimeInMs()).isDeleted(true) + convertedBlobId.getContainerId(), purgeRequest.getPurgeTimeInMs()).isDeleted(true) .lifeVersion(MessageInfo.LIFE_VERSION_FROM_FRONTEND) .build(); - Store storeToDelete = storeManager.getStore(compactMessageRequest.getBlobId().getPartition()); - storeToDelete.delete(Collections.singletonList(info)); + Store storeToDelete = storeManager.getStore(purgeRequest.getBlobId().getPartition()); + storeToDelete.purge(Collections.singletonList(info)); response = - new CompactMessageResponse(compactMessageRequest.getCorrelationId(), compactMessageRequest.getClientId(), ServerErrorCode.No_Error); - // TODO Handle notification + new PurgeResponse(purgeRequest.getCorrelationId(), purgeRequest.getClientId(), ServerErrorCode.No_Error); if (notification != null) { - notification.onBlobReplicaDeleted(currentNode.getHostname(), currentNode.getPort(), convertedStoreKey.getID(), + notification.onBlobReplicaPurged(currentNode.getHostname(), currentNode.getPort(), convertedStoreKey.getID(), BlobReplicaSourceType.PRIMARY); } } @@ -532,36 +532,36 @@ public void handleCompactMessageRequest(NetworkRequest request) throws IOExcepti } else if (e.getErrorCode() == StoreErrorCodes.ID_Deleted) { metrics.idDeletedError.inc(); } else if (e.getErrorCode() == StoreErrorCodes.Authorization_Failure) { - metrics.deleteAuthorizationFailure.inc(); + metrics.purgeAuthorizationFailure.inc(); } else { logInErrorLevel = true; metrics.unExpectedStoreDeleteError.inc(); } if (logInErrorLevel) { - logger.error("Store exception on a delete with error code {} for request {}", e.getErrorCode(), deleteRequest, + logger.error("Store exception on a compact message with error code {} for request {}", e.getErrorCode(), purgeRequest, e); } else { - logger.trace("Store exception on a delete with error code {} for request {}", e.getErrorCode(), deleteRequest, + logger.trace("Store exception on a compact with error code {} for request {}", e.getErrorCode(), purgeRequest, e); } - response = new DeleteResponse(deleteRequest.getCorrelationId(), deleteRequest.getClientId(), + response = new PurgeResponse(purgeRequest.getCorrelationId(), purgeRequest.getClientId(), ErrorMapping.getStoreErrorMapping(e.getErrorCode())); } catch (Exception e) { - logger.error("Unknown exception for delete request {}", deleteRequest, e); - response = new DeleteResponse(deleteRequest.getCorrelationId(), deleteRequest.getClientId(), + logger.error("Unknown exception for compact message request {}", purgeRequest, e); + response = new PurgeResponse(purgeRequest.getCorrelationId(), purgeRequest.getClientId(), ServerErrorCode.Unknown_Error); - metrics.unExpectedStoreDeleteError.inc(); + metrics.unExpectedStorePurgeError.inc(); } finally { long processingTime = SystemTime.getInstance().milliseconds() - startTime; totalTimeSpent += processingTime; - publicAccessLogger.info("{} {} processingTime {}", deleteRequest, response, processingTime); + publicAccessLogger.info("{} {} processingTime {}", purgeRequest, response, processingTime); // Update request metrics. RequestMetricsUpdater metricsUpdater = new RequestMetricsUpdater(requestQueueTime, processingTime, 0, 0, false); - deleteRequest.accept(metricsUpdater); + purgeRequest.accept(metricsUpdater); } requestResponseChannel.sendResponse(response, request, - new ServerNetworkResponseMetrics(metrics.deleteBlobResponseQueueTimeInMs, metrics.deleteBlobSendTimeInMs, - metrics.deleteBlobTotalTimeInMs, null, null, totalTimeSpent)); + new ServerNetworkResponseMetrics(metrics.purgeBlobResponseQueueTimeInMs, metrics.purgeBlobSendTimeInMs, + metrics.purgeBlobTotalTimeInMs, null, null, totalTimeSpent)); } @Override