diff --git a/ambry-api/src/main/java/com/github/ambry/notification/NotificationSystem.java b/ambry-api/src/main/java/com/github/ambry/notification/NotificationSystem.java index 5c8727d732..b900be591a 100644 --- a/ambry-api/src/main/java/com/github/ambry/notification/NotificationSystem.java +++ b/ambry-api/src/main/java/com/github/ambry/notification/NotificationSystem.java @@ -96,6 +96,15 @@ default void onBlobUndeleted(String blobId, String serviceId, Account account, C */ void onBlobReplicaDeleted(String sourceHost, int port, String blobId, BlobReplicaSourceType sourceType); + /** + * Notifies the underlying system when a purged state of a blob is replicated to a node + * @param sourceHost The source host from where the notification is being invoked + * @param port The port of the source host from where the notification is being invoked. + * @param blobId The id of the blob whose purged state has been replicated + * @param sourceType The source that purged the blob replica + */ + void onBlobReplicaPurged(String sourceHost, int port, String blobId, BlobReplicaSourceType sourceType); + /** * Notifies the underlying system when a updated state of a blob is replicated to a node * @param sourceHost The source host from where the notification is being invoked diff --git a/ambry-api/src/main/java/com/github/ambry/protocol/RequestAPI.java b/ambry-api/src/main/java/com/github/ambry/protocol/RequestAPI.java index a4c3d377dd..cb912b7379 100644 --- a/ambry-api/src/main/java/com/github/ambry/protocol/RequestAPI.java +++ b/ambry-api/src/main/java/com/github/ambry/protocol/RequestAPI.java @@ -55,6 +55,14 @@ public interface RequestAPI { */ void handleDeleteRequest(NetworkRequest request) throws IOException, InterruptedException; + /** + * Purges the blob from the store. + * @param request The request that contains the partition and id of the blob that needs to be purged. + * @throws IOException if there are I/O errors carrying our the required operation. + * @throws InterruptedException if request processing is interrupted. + */ + void handlePurgeRequest(NetworkRequest request) throws IOException, InterruptedException; + /** * Updates the TTL of a blob as required in {@code request}. * @param request The request that contains the partition and id of the blob that needs to be updated. diff --git a/ambry-api/src/main/java/com/github/ambry/store/StoreErrorCodes.java b/ambry-api/src/main/java/com/github/ambry/store/StoreErrorCodes.java index 4464998bc7..af13d31543 100644 --- a/ambry-api/src/main/java/com/github/ambry/store/StoreErrorCodes.java +++ b/ambry-api/src/main/java/com/github/ambry/store/StoreErrorCodes.java @@ -39,5 +39,7 @@ public enum StoreErrorCodes { Life_Version_Conflict, ID_Not_Deleted, ID_Undeleted, - ID_Deleted_Permanently + ID_Deleted_Permanently, + // TODO Efficient_Metadata_Operations_TODO : ID_Purged error should be handled in all of the store's methods. + ID_Purged } diff --git a/ambry-cloud/src/main/java/com/github/ambry/cloud/AmbryCloudRequests.java b/ambry-cloud/src/main/java/com/github/ambry/cloud/AmbryCloudRequests.java index 0860ec4939..2a43362a06 100644 --- a/ambry-cloud/src/main/java/com/github/ambry/cloud/AmbryCloudRequests.java +++ b/ambry-cloud/src/main/java/com/github/ambry/cloud/AmbryCloudRequests.java @@ -284,6 +284,11 @@ public void handleDeleteRequest(NetworkRequest request) throws InterruptedExcept } } + @Override + public void handlePurgeRequest(NetworkRequest request) { + throw new UnsupportedOperationException("Purge is not supported in cloud yet."); + } + @Override public void handleTtlUpdateRequest(NetworkRequest request) throws InterruptedException { diff --git a/ambry-cloud/src/main/java/com/github/ambry/cloud/VcrRequests.java b/ambry-cloud/src/main/java/com/github/ambry/cloud/VcrRequests.java index 020d938342..9dd2eec256 100644 --- a/ambry-cloud/src/main/java/com/github/ambry/cloud/VcrRequests.java +++ b/ambry-cloud/src/main/java/com/github/ambry/cloud/VcrRequests.java @@ -61,6 +61,11 @@ public void handleDeleteRequest(NetworkRequest request) throws IOException, Inte throw new UnsupportedOperationException("Request type not supported"); } + @Override + public void handlePurgeRequest(NetworkRequest request) throws IOException, InterruptedException { + throw new UnsupportedOperationException("Request type not supported"); + } + @Override public void handleTtlUpdateRequest(NetworkRequest request) throws IOException, InterruptedException { throw new UnsupportedOperationException("Request type not supported"); diff --git a/ambry-commons/src/main/java/com/github/ambry/commons/LoggingNotificationSystem.java b/ambry-commons/src/main/java/com/github/ambry/commons/LoggingNotificationSystem.java index 877868d540..8e4286bb4c 100644 --- a/ambry-commons/src/main/java/com/github/ambry/commons/LoggingNotificationSystem.java +++ b/ambry-commons/src/main/java/com/github/ambry/commons/LoggingNotificationSystem.java @@ -90,6 +90,11 @@ public void onBlobReplicaDeleted(String sourceHost, int port, String blobId, Blo logger.debug("onBlobReplicaDeleted {}, {}, {}, {}", sourceHost, port, blobId, sourceType); } + @Override + public void onBlobReplicaPurged(String sourceHost, int port, String blobId, BlobReplicaSourceType sourceType) { + logger.debug("onBlobReplicaPurged {}, {}, {}, {}", sourceHost, port, blobId, sourceType); + } + @Override public void onBlobReplicaUpdated(String sourceHost, int port, String blobId, BlobReplicaSourceType sourceType, UpdateType updateType, MessageInfo info) { diff --git a/ambry-commons/src/main/java/com/github/ambry/commons/ServerMetrics.java b/ambry-commons/src/main/java/com/github/ambry/commons/ServerMetrics.java index af775ebe8b..ac95f1030d 100644 --- a/ambry-commons/src/main/java/com/github/ambry/commons/ServerMetrics.java +++ b/ambry-commons/src/main/java/com/github/ambry/commons/ServerMetrics.java @@ -100,8 +100,11 @@ public class ServerMetrics { public final Histogram deleteBlobRequestQueueTimeInMs; public final Histogram deleteBlobProcessingTimeInMs; public final Histogram deleteBlobResponseQueueTimeInMs; + public final Histogram purgeBlobResponseQueueTimeInMs; public final Histogram deleteBlobSendTimeInMs; + public final Histogram purgeBlobSendTimeInMs; public final Histogram deleteBlobTotalTimeInMs; + public final Histogram purgeBlobTotalTimeInMs; public final Histogram undeleteBlobRequestQueueTimeInMs; public final Histogram undeleteBlobProcessingTimeInMs; @@ -255,6 +258,7 @@ public class ServerMetrics { public final Counter unExpectedStoreGetError; public final Counter unExpectedStoreTtlUpdateError; public final Counter unExpectedStoreDeleteError; + public final Counter unExpectedStorePurgeError; public final Counter unExpectedStoreUndeleteError; public final Counter unExpectedAdminOperationError; public final Counter unExpectedStoreFindEntriesError; @@ -263,6 +267,7 @@ public class ServerMetrics { public final Counter unknownFormatError; public final Counter idNotFoundError; public final Counter idDeletedError; + public final Counter idPurgedError; public final Counter idUndeletedError; public final Counter idNotDeletedError; public final Counter lifeVersionConflictError; @@ -271,6 +276,7 @@ public class ServerMetrics { public final Counter temporarilyDisabledError; public final Counter getAuthorizationFailure; public final Counter deleteAuthorizationFailure; + public final Counter purgeAuthorizationFailure; public final Counter undeleteAuthorizationFailure; public final Counter ttlUpdateAuthorizationFailure; public final Counter ttlAlreadyUpdatedError; @@ -393,8 +399,12 @@ public ServerMetrics(MetricRegistry registry, Class requestClass, Class se deleteBlobProcessingTimeInMs = registry.histogram(MetricRegistry.name(requestClass, "DeleteBlobProcessingTime")); deleteBlobResponseQueueTimeInMs = registry.histogram(MetricRegistry.name(requestClass, "DeleteBlobResponseQueueTime")); + purgeBlobResponseQueueTimeInMs = + registry.histogram(MetricRegistry.name(requestClass, "PurgeBlobResponseQueueTimeInMs")); deleteBlobSendTimeInMs = registry.histogram(MetricRegistry.name(requestClass, "DeleteBlobSendTime")); + purgeBlobSendTimeInMs = registry.histogram(MetricRegistry.name(requestClass, "PurgeBlobSendTimeInMs")); deleteBlobTotalTimeInMs = registry.histogram(MetricRegistry.name(requestClass, "DeleteBlobTotalTime")); + purgeBlobTotalTimeInMs = registry.histogram(MetricRegistry.name(requestClass, "PurgeBlobTotalTimeInMs")); undeleteBlobRequestQueueTimeInMs = registry.histogram(MetricRegistry.name(requestClass, "UndeleteBlobRequestQueueTime")); @@ -613,6 +623,7 @@ public ServerMetrics(MetricRegistry registry, Class requestClass, Class se unknownFormatError = registry.counter(MetricRegistry.name(requestClass, "UnknownFormatError")); idNotFoundError = registry.counter(MetricRegistry.name(requestClass, "IDNotFoundError")); idDeletedError = registry.counter(MetricRegistry.name(requestClass, "IDDeletedError")); + idPurgedError = registry.counter(MetricRegistry.name(requestClass, "IDPurgedError")); idUndeletedError = registry.counter(MetricRegistry.name(requestClass, "IDUndeletedError")); idNotDeletedError = registry.counter(MetricRegistry.name(requestClass, "IDNotDeletedError")); lifeVersionConflictError = registry.counter(MetricRegistry.name(requestClass, "lifeVersionConflictError")); @@ -622,6 +633,8 @@ public ServerMetrics(MetricRegistry registry, Class requestClass, Class se unExpectedStorePutError = registry.counter(MetricRegistry.name(requestClass, "UnexpectedStorePutError")); unExpectedStoreGetError = registry.counter(MetricRegistry.name(requestClass, "UnexpectedStoreGetError")); unExpectedStoreDeleteError = registry.counter(MetricRegistry.name(requestClass, "UnexpectedStoreDeleteError")); + unExpectedStorePurgeError = + registry.counter(MetricRegistry.name(requestClass, "UnExpectedStorePurgeError")); unExpectedStoreUndeleteError = registry.counter(MetricRegistry.name(requestClass, "UnexpectedStoreUndeleteError")); unExpectedAdminOperationError = registry.counter(MetricRegistry.name(requestClass, "UnexpectedAdminOperationError")); @@ -631,6 +644,8 @@ public ServerMetrics(MetricRegistry registry, Class requestClass, Class se registry.counter(MetricRegistry.name(requestClass, "UnexpectedStoreFindEntriesError")); getAuthorizationFailure = registry.counter(MetricRegistry.name(requestClass, "GetAuthorizationFailure")); deleteAuthorizationFailure = registry.counter(MetricRegistry.name(requestClass, "DeleteAuthorizationFailure")); + purgeAuthorizationFailure = + registry.counter(MetricRegistry.name(requestClass, "PurgeAuthorizationFailure")); undeleteAuthorizationFailure = registry.counter(MetricRegistry.name(requestClass, "UndeleteAuthorizationFailure")); ttlUpdateAuthorizationFailure = registry.counter(MetricRegistry.name(requestClass, "TtlUpdateAuthorizationFailure")); diff --git a/ambry-protocol/src/main/java/com/github/ambry/protocol/PurgeRequest.java b/ambry-protocol/src/main/java/com/github/ambry/protocol/PurgeRequest.java index 7012fc01c6..8340c5d0a2 100644 --- a/ambry-protocol/src/main/java/com/github/ambry/protocol/PurgeRequest.java +++ b/ambry-protocol/src/main/java/com/github/ambry/protocol/PurgeRequest.java @@ -35,6 +35,7 @@ public class PurgeRequest extends RequestOrResponse { private final static short CURRENT_VERSION = PURGE_MESSAGE_REQUEST_VERSION_1; private final BlobId blobId; + private final long purgeTimeInMs; /** * Helper to construct PurgeRequest from a stream @@ -55,20 +56,23 @@ public static PurgeRequest readFrom(DataInputStream stream, ClusterMap map) thro * @param correlationId the correlation id for the request * @param clientId the id of the client generating the request * @param blobId the blob ID whose TTL needs to be updated + * @param purgeTimeInMs deletion time of the blob in ms */ - public PurgeRequest(int correlationId, String clientId, BlobId blobId) { - this(correlationId, clientId, blobId, CURRENT_VERSION); + public PurgeRequest(int correlationId, String clientId, BlobId blobId, long purgeTimeInMs) { + this(correlationId, clientId, blobId, purgeTimeInMs, CURRENT_VERSION); } /** * @param correlationId the correlation id for the request * @param clientId the id of the client generating the request * @param blobId the blob ID that needs to be purged. + * @param purgeTimeInMs deletion time of the blob in ms * @param version the version of the {@link PurgeRequest}. */ - PurgeRequest(int correlationId, String clientId, BlobId blobId, short version) { + PurgeRequest(int correlationId, String clientId, BlobId blobId, long purgeTimeInMs, short version) { super(PurgeRequest, version, correlationId, clientId); this.blobId = blobId; + this.purgeTimeInMs = purgeTimeInMs; } @Override @@ -103,6 +107,13 @@ public short getContainerId() { return blobId.getContainerId(); } + /** + * @return the purge time. + */ + public long getPurgeTimeInMs() { + return purgeTimeInMs; + } + @Override public long sizeInBytes() { // header + blobId @@ -124,7 +135,8 @@ static PurgeRequest readFrom(DataInputStream stream, ClusterMap map) throws IOEx int correlationId = stream.readInt(); String clientId = Utils.readIntString(stream); BlobId id = new BlobId(stream, map); - return new PurgeRequest(correlationId, clientId, id, PURGE_MESSAGE_REQUEST_VERSION_1); + long purgeTimeInMs = stream.readLong(); + return new PurgeRequest(correlationId, clientId, id, purgeTimeInMs, PURGE_MESSAGE_REQUEST_VERSION_1); } } } diff --git a/ambry-rest/src/test/java/com/github/ambry/rest/NettyMessageProcessorTest.java b/ambry-rest/src/test/java/com/github/ambry/rest/NettyMessageProcessorTest.java index 86042c64ba..994f484368 100644 --- a/ambry-rest/src/test/java/com/github/ambry/rest/NettyMessageProcessorTest.java +++ b/ambry-rest/src/test/java/com/github/ambry/rest/NettyMessageProcessorTest.java @@ -482,6 +482,11 @@ public void onBlobReplicaDeleted(String sourceHost, int port, String blobId, Blo throw new IllegalStateException("Not implemented"); } + @Override + public void onBlobReplicaPurged(String sourceHost, int port, String blobId, BlobReplicaSourceType sourceType) { + throw new IllegalStateException("Not implemented"); + } + @Override public void onBlobReplicaUpdated(String sourceHost, int port, String blobId, BlobReplicaSourceType sourceType, UpdateType updateType, MessageInfo info) { diff --git a/ambry-server/src/integration-test/java/com/github/ambry/server/MockCluster.java b/ambry-server/src/integration-test/java/com/github/ambry/server/MockCluster.java index 72164299cf..c56fe8a3f3 100644 --- a/ambry-server/src/integration-test/java/com/github/ambry/server/MockCluster.java +++ b/ambry-server/src/integration-test/java/com/github/ambry/server/MockCluster.java @@ -419,6 +419,7 @@ class EventTracker { private final int numberOfReplicas; private final Helper creationHelper; private final Helper deletionHelper; + private final Helper purgeHelper; private final Helper undeleteHelper; private final Helper replicateHelper; private final ConcurrentMap updateHelpers = new ConcurrentHashMap<>(); @@ -496,6 +497,7 @@ private String getKey(String host, int port) { numberOfReplicas = expectedNumberOfReplicas; creationHelper = new Helper(numberOfReplicas); deletionHelper = new Helper(numberOfReplicas); + purgeHelper = new Helper(numberOfReplicas); undeleteHelper = new Helper(numberOfReplicas); // On-demand-replication, we usually only need one replica to replicate the Blob. replicateHelper = new Helper(1); @@ -519,6 +521,15 @@ void trackDeletion(String host, int port) { deletionHelper.track(host, port); } + /** + * Tracks the purge event that arrived on {@code host}:{@code port}. + * @param host the host that received the purge request. + * @param port the port of the host that describes the instance along with {@code host}. + */ + void trackPurge(String host, int port) { + purgeHelper.track(host, port); + } + /** * Tracks the undelete event that arrived on {@code host}:{@code port}. * @param host the host that received the undelete @@ -686,6 +697,13 @@ public synchronized void onBlobReplicaDeleted(String sourceHost, int port, Strin .trackDeletion(sourceHost, port); } + @Override + public synchronized void onBlobReplicaPurged(String sourceHost, int port, String blobId, + BlobReplicaSourceType sourceType) { + objectTracker.computeIfAbsent(blobId, k -> new EventTracker(getNumReplicas(blobId))) + .trackPurge(sourceHost, port); + } + @Override public synchronized void onBlobReplicaUpdated(String sourceHost, int port, String blobId, BlobReplicaSourceType sourceType, UpdateType updateType, MessageInfo info) { diff --git a/ambry-server/src/main/java/com/github/ambry/server/AmbryRequests.java b/ambry-server/src/main/java/com/github/ambry/server/AmbryRequests.java index 811d813fab..8e76d858f2 100644 --- a/ambry-server/src/main/java/com/github/ambry/server/AmbryRequests.java +++ b/ambry-server/src/main/java/com/github/ambry/server/AmbryRequests.java @@ -60,6 +60,8 @@ import com.github.ambry.protocol.GetResponse; import com.github.ambry.protocol.PartitionRequestInfo; import com.github.ambry.protocol.PartitionResponseInfo; +import com.github.ambry.protocol.PurgeRequest; +import com.github.ambry.protocol.PurgeResponse; import com.github.ambry.protocol.PutRequest; import com.github.ambry.protocol.PutResponse; import com.github.ambry.protocol.ReplicaMetadataRequest; @@ -494,6 +496,74 @@ public void handleDeleteRequest(NetworkRequest request) throws IOException, Inte metrics.deleteBlobTotalTimeInMs, null, null, totalTimeSpent)); } + @Override + public void handlePurgeRequest(NetworkRequest request) throws IOException, InterruptedException { + PurgeRequest purgeRequest = PurgeRequest.readFrom(new DataInputStream(request.getInputStream()), clusterMap); + long requestQueueTime = SystemTime.getInstance().milliseconds() - request.getStartTimeInMs(); + long totalTimeSpent = requestQueueTime; + long startTime = SystemTime.getInstance().milliseconds(); + PurgeResponse response = null; + try { + StoreKey convertedStoreKey = getConvertedStoreKeys(Collections.singletonList(purgeRequest.getBlobId())).get(0); + ServerErrorCode error = + validateRequest(purgeRequest.getBlobId().getPartition(), RequestOrResponseType.PurgeRequest, false); + if (error != ServerErrorCode.No_Error) { + logger.error("Validating purge request failed with error {} for request {}", error, purgeRequest); + response = new PurgeResponse(purgeRequest.getCorrelationId(), purgeRequest.getClientId(), error); + } else { + BlobId convertedBlobId = (BlobId) convertedStoreKey; + MessageInfo info = new MessageInfo.Builder(convertedBlobId, -1, convertedBlobId.getAccountId(), + convertedBlobId.getContainerId(), purgeRequest.getPurgeTimeInMs()).isDeleted(true) + .lifeVersion(MessageInfo.LIFE_VERSION_FROM_FRONTEND) + .build(); + Store storeToPurge = storeManager.getStore(purgeRequest.getBlobId().getPartition()); + storeToPurge.purge(Collections.singletonList(info)); + response = + new PurgeResponse(purgeRequest.getCorrelationId(), purgeRequest.getClientId(), ServerErrorCode.No_Error); + if (notification != null) { + notification.onBlobReplicaPurged(currentNode.getHostname(), currentNode.getPort(), convertedStoreKey.getID(), + BlobReplicaSourceType.PRIMARY); + } + } + } catch (StoreException e) { + boolean logInErrorLevel = false; + if (e.getErrorCode() == StoreErrorCodes.ID_Not_Found) { + metrics.idNotFoundError.inc(); + } else if (e.getErrorCode() == StoreErrorCodes.ID_Purged) { + metrics.idPurgedError.inc(); + } else if (e.getErrorCode() == StoreErrorCodes.Authorization_Failure) { + metrics.purgeAuthorizationFailure.inc(); + } else { + logInErrorLevel = true; + metrics.unExpectedStorePurgeError.inc(); + } + if (logInErrorLevel) { + logger.error("Store exception on a purge message with error code {} for request {}", e.getErrorCode(), + purgeRequest, e); + } else { + logger.trace("Store exception on a purge with error code {} for request {}", e.getErrorCode(), purgeRequest, + e); + } + response = new PurgeResponse(purgeRequest.getCorrelationId(), purgeRequest.getClientId(), + ErrorMapping.getStoreErrorMapping(e.getErrorCode())); + } catch (Exception e) { + logger.error("Unknown exception for purge request {}", purgeRequest, e); + response = new PurgeResponse(purgeRequest.getCorrelationId(), purgeRequest.getClientId(), + ServerErrorCode.Unknown_Error); + metrics.unExpectedStorePurgeError.inc(); + } finally { + long processingTime = SystemTime.getInstance().milliseconds() - startTime; + totalTimeSpent += processingTime; + publicAccessLogger.info("{} {} processingTime {}", purgeRequest, response, processingTime); + // Update request metrics. + RequestMetricsUpdater metricsUpdater = new RequestMetricsUpdater(requestQueueTime, processingTime, 0, 0, false); + purgeRequest.accept(metricsUpdater); + } + requestResponseChannel.sendResponse(response, request, + new ServerNetworkResponseMetrics(metrics.purgeBlobResponseQueueTimeInMs, metrics.purgeBlobSendTimeInMs, + metrics.purgeBlobTotalTimeInMs, null, null, totalTimeSpent)); + } + @Override public void handleTtlUpdateRequest(NetworkRequest request) throws IOException, InterruptedException { TtlUpdateRequest updateRequest; diff --git a/ambry-store/src/main/java/com/github/ambry/store/BlobStore.java b/ambry-store/src/main/java/com/github/ambry/store/BlobStore.java index cbb880270f..83ebe08d81 100644 --- a/ambry-store/src/main/java/com/github/ambry/store/BlobStore.java +++ b/ambry-store/src/main/java/com/github/ambry/store/BlobStore.java @@ -563,6 +563,11 @@ public void delete(List infosToDelete) throws StoreException { "Cannot delete id " + info.getStoreKey() + " since it is already deleted in the index.", StoreErrorCodes.ID_Deleted); } + if (value.isPurge()) { + throw new StoreException( + "Cannot purge id " + info.getStoreKey() + " since it is already purged in the index.", + StoreErrorCodes.ID_Purged); + } revisedLifeVersion = value.getLifeVersion(); } else { // This is a delete request from replication @@ -778,6 +783,10 @@ public void updateTtl(List infosToUpdate) throws StoreException { throw new StoreException( "Cannot update TTL of " + info.getStoreKey() + " since it is already deleted in the index.", StoreErrorCodes.ID_Deleted); + } else if (value.isPurge()) { + throw new StoreException( + "Cannot update TTL of " + info.getStoreKey() + " since it is already purged in the index.", + StoreErrorCodes.ID_Purged); } else if (value.isTtlUpdate()) { throw new StoreException("TTL of " + info.getStoreKey() + " is already updated in the index.", StoreErrorCodes.Already_Updated); @@ -805,6 +814,10 @@ public void updateTtl(List infosToUpdate) throws StoreException { throw new StoreException( "Cannot update TTL of " + info.getStoreKey() + " since it is already deleted in the index.", StoreErrorCodes.ID_Deleted); + } else if (value.isPurge()) { + throw new StoreException( + "Cannot update TTL of " + info.getStoreKey() + " since it is already purged in the index.", + StoreErrorCodes.ID_Purged); } else if (value.isTtlUpdate()) { throw new StoreException("TTL of " + info.getStoreKey() + " is already updated in the index.", StoreErrorCodes.Already_Updated); diff --git a/ambry-store/src/main/java/com/github/ambry/store/IndexValue.java b/ambry-store/src/main/java/com/github/ambry/store/IndexValue.java index 221bb58e1b..94d669507b 100644 --- a/ambry-store/src/main/java/com/github/ambry/store/IndexValue.java +++ b/ambry-store/src/main/java/com/github/ambry/store/IndexValue.java @@ -50,7 +50,7 @@ class IndexValue implements Comparable { enum Flags { - Delete_Index, Ttl_Update_Index, Undelete_Index + Delete_Index, Ttl_Update_Index, Undelete_Index, Purge_Index } final static byte FLAGS_DEFAULT_VALUE = (byte) 0; @@ -269,6 +269,14 @@ boolean isDelete() { return isFlagSet(Flags.Delete_Index); } + /** + * Helper function for isFlagSet(Flags.Purge_Index). + * @return true when the Purge_Index is set. + */ + boolean isPurge() { + return isFlagSet(Flags.Purge_Index); + } + /** * Helper function for isFlagSet(Flags.Undelete_Index). * @return true when the Undelete_Index is set. diff --git a/ambry-store/src/main/java/com/github/ambry/store/PersistentIndex.java b/ambry-store/src/main/java/com/github/ambry/store/PersistentIndex.java index 368dd789d2..6eb5ad2772 100644 --- a/ambry-store/src/main/java/com/github/ambry/store/PersistentIndex.java +++ b/ambry-store/src/main/java/com/github/ambry/store/PersistentIndex.java @@ -1026,6 +1026,12 @@ void validateSanityForUndelete(StoreKey key, List values, short life validateSanityForUndeleteWithoutLifeVersion(key, values); return; } + for (IndexValue indexValue : values) { + if (indexValue.isPurge()) { + throw new StoreException( + "Id " + key + " cannot be undeleted as it has been purged in index " + dataDir,StoreErrorCodes.ID_Purged); + } + } // This is from replication. For replication, undelete should be permitted only when // 1. The oldest record is PUT // 2. the latest record's lifeVersion is less then undelete's lifeVersion. @@ -1341,6 +1347,8 @@ BlobReadOptions getBlobReadInfo(StoreKey id, EnumSet getOptions } else { readOptions = getDeletedBlobReadOptions(value, id, indexSegments); } + } else if (value.isPurge()) { + throw new StoreException("Id " + id + " has been purged in index " + dataDir, StoreErrorCodes.ID_Purged); } else if (isExpired(value) && !getOptions.contains(StoreGetOptions.Store_Include_Expired)) { throw new StoreException("Id " + id + " has expired ttl in index " + dataDir, StoreErrorCodes.TTL_Expired); } else if (value.isUndelete()) { diff --git a/ambry-store/src/main/java/com/github/ambry/store/StoreMetrics.java b/ambry-store/src/main/java/com/github/ambry/store/StoreMetrics.java index ac43a3a29a..731bb087df 100644 --- a/ambry-store/src/main/java/com/github/ambry/store/StoreMetrics.java +++ b/ambry-store/src/main/java/com/github/ambry/store/StoreMetrics.java @@ -37,6 +37,7 @@ public class StoreMetrics { public final Timer getResponse; public final Timer putResponse; public final Timer deleteResponse; + public final Timer purgeResponse; public final Timer ttlUpdateResponse; public final Timer undeleteResponse; public final Timer findEntriesSinceResponse; @@ -87,6 +88,7 @@ public class StoreMetrics { public final Counter identicalPutAttemptCount; public final Counter getAuthorizationFailureCount; public final Counter deleteAuthorizationFailureCount; + public final Counter purgeAuthorizationFailureCount; public final Counter ttlUpdateAuthorizationFailureCount; public final Counter undeleteAuthorizationFailureCount; public final Counter keyInFindEntriesAbsent; @@ -141,6 +143,7 @@ public StoreMetrics(String prefix, MetricRegistry registry) { getResponse = registry.timer(MetricRegistry.name(BlobStore.class, name + "StoreGetResponse")); putResponse = registry.timer(MetricRegistry.name(BlobStore.class, name + "StorePutResponse")); deleteResponse = registry.timer(MetricRegistry.name(BlobStore.class, name + "StoreDeleteResponse")); + purgeResponse = registry.timer(MetricRegistry.name(BlobStore.class, name + "StorePurgeResponse")); ttlUpdateResponse = registry.timer(MetricRegistry.name(BlobStore.class, name + "StoreTtlUpdateResponse")); undeleteResponse = registry.timer(MetricRegistry.name(BlobStore.class, name + "StoreUndeleteResponse")); findEntriesSinceResponse = @@ -216,6 +219,8 @@ public StoreMetrics(String prefix, MetricRegistry registry) { registry.counter(MetricRegistry.name(BlobStore.class, name + "GetAuthorizationFailureCount")); deleteAuthorizationFailureCount = registry.counter(MetricRegistry.name(BlobStore.class, name + "DeleteAuthorizationFailureCount")); + purgeAuthorizationFailureCount = + registry.counter(MetricRegistry.name(BlobStore.class, name + "PurgeAuthorizationFailureCount")); ttlUpdateAuthorizationFailureCount = registry.counter(MetricRegistry.name(BlobStore.class, name + "TtlUpdateAuthorizationFailureCount")); undeleteAuthorizationFailureCount =