Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Efficient_Metadata_Operations] [DEPENDS_ON PR#2555] Handle purged blobs in store operations. #2578

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,15 @@ default void onBlobUndeleted(String blobId, String serviceId, Account account, C
*/
void onBlobReplicaDeleted(String sourceHost, int port, String blobId, BlobReplicaSourceType sourceType);

/**
* Notifies the underlying system when a purged state of a blob is replicated to a node
* @param sourceHost The source host from where the notification is being invoked
* @param port The port of the source host from where the notification is being invoked.
* @param blobId The id of the blob whose purged state has been replicated
* @param sourceType The source that purged the blob replica
*/
void onBlobReplicaPurged(String sourceHost, int port, String blobId, BlobReplicaSourceType sourceType);

/**
* Notifies the underlying system when a updated state of a blob is replicated to a node
* @param sourceHost The source host from where the notification is being invoked
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,14 @@ public interface RequestAPI {
*/
void handleDeleteRequest(NetworkRequest request) throws IOException, InterruptedException;

/**
* Purges the blob from the store.
* @param request The request that contains the partition and id of the blob that needs to be purged.
* @throws IOException if there are I/O errors carrying our the required operation.
* @throws InterruptedException if request processing is interrupted.
*/
void handlePurgeRequest(NetworkRequest request) throws IOException, InterruptedException;

/**
* Updates the TTL of a blob as required in {@code request}.
* @param request The request that contains the partition and id of the blob that needs to be updated.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,5 +39,7 @@ public enum StoreErrorCodes {
Life_Version_Conflict,
ID_Not_Deleted,
ID_Undeleted,
ID_Deleted_Permanently
ID_Deleted_Permanently,
// TODO Efficient_Metadata_Operations_TODO : ID_Purged error should be handled in all of the store's methods.
ID_Purged
}
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,11 @@ public void handleDeleteRequest(NetworkRequest request) throws InterruptedExcept
}
}

@Override
public void handlePurgeRequest(NetworkRequest request) {
throw new UnsupportedOperationException("Purge is not supported in cloud yet.");
}

@Override
public void handleTtlUpdateRequest(NetworkRequest request) throws InterruptedException {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,11 @@ public void handleDeleteRequest(NetworkRequest request) throws IOException, Inte
throw new UnsupportedOperationException("Request type not supported");
}

@Override
public void handlePurgeRequest(NetworkRequest request) throws IOException, InterruptedException {
throw new UnsupportedOperationException("Request type not supported");
}

@Override
public void handleTtlUpdateRequest(NetworkRequest request) throws IOException, InterruptedException {
throw new UnsupportedOperationException("Request type not supported");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,11 @@ public void onBlobReplicaDeleted(String sourceHost, int port, String blobId, Blo
logger.debug("onBlobReplicaDeleted {}, {}, {}, {}", sourceHost, port, blobId, sourceType);
}

@Override
public void onBlobReplicaPurged(String sourceHost, int port, String blobId, BlobReplicaSourceType sourceType) {
logger.debug("onBlobReplicaPurged {}, {}, {}, {}", sourceHost, port, blobId, sourceType);
}

@Override
public void onBlobReplicaUpdated(String sourceHost, int port, String blobId, BlobReplicaSourceType sourceType,
UpdateType updateType, MessageInfo info) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,11 @@ public class ServerMetrics {
public final Histogram deleteBlobRequestQueueTimeInMs;
public final Histogram deleteBlobProcessingTimeInMs;
public final Histogram deleteBlobResponseQueueTimeInMs;
public final Histogram purgeBlobResponseQueueTimeInMs;
public final Histogram deleteBlobSendTimeInMs;
public final Histogram purgeBlobSendTimeInMs;
public final Histogram deleteBlobTotalTimeInMs;
public final Histogram purgeBlobTotalTimeInMs;

public final Histogram undeleteBlobRequestQueueTimeInMs;
public final Histogram undeleteBlobProcessingTimeInMs;
Expand Down Expand Up @@ -255,6 +258,7 @@ public class ServerMetrics {
public final Counter unExpectedStoreGetError;
public final Counter unExpectedStoreTtlUpdateError;
public final Counter unExpectedStoreDeleteError;
public final Counter unExpectedStorePurgeError;
public final Counter unExpectedStoreUndeleteError;
public final Counter unExpectedAdminOperationError;
public final Counter unExpectedStoreFindEntriesError;
Expand All @@ -263,6 +267,7 @@ public class ServerMetrics {
public final Counter unknownFormatError;
public final Counter idNotFoundError;
public final Counter idDeletedError;
public final Counter idPurgedError;
public final Counter idUndeletedError;
public final Counter idNotDeletedError;
public final Counter lifeVersionConflictError;
Expand All @@ -271,6 +276,7 @@ public class ServerMetrics {
public final Counter temporarilyDisabledError;
public final Counter getAuthorizationFailure;
public final Counter deleteAuthorizationFailure;
public final Counter purgeAuthorizationFailure;
public final Counter undeleteAuthorizationFailure;
public final Counter ttlUpdateAuthorizationFailure;
public final Counter ttlAlreadyUpdatedError;
Expand Down Expand Up @@ -393,8 +399,12 @@ public ServerMetrics(MetricRegistry registry, Class<?> requestClass, Class<?> se
deleteBlobProcessingTimeInMs = registry.histogram(MetricRegistry.name(requestClass, "DeleteBlobProcessingTime"));
deleteBlobResponseQueueTimeInMs =
registry.histogram(MetricRegistry.name(requestClass, "DeleteBlobResponseQueueTime"));
purgeBlobResponseQueueTimeInMs =
registry.histogram(MetricRegistry.name(requestClass, "PurgeBlobResponseQueueTimeInMs"));
deleteBlobSendTimeInMs = registry.histogram(MetricRegistry.name(requestClass, "DeleteBlobSendTime"));
purgeBlobSendTimeInMs = registry.histogram(MetricRegistry.name(requestClass, "PurgeBlobSendTimeInMs"));
deleteBlobTotalTimeInMs = registry.histogram(MetricRegistry.name(requestClass, "DeleteBlobTotalTime"));
purgeBlobTotalTimeInMs = registry.histogram(MetricRegistry.name(requestClass, "PurgeBlobTotalTimeInMs"));

undeleteBlobRequestQueueTimeInMs =
registry.histogram(MetricRegistry.name(requestClass, "UndeleteBlobRequestQueueTime"));
Expand Down Expand Up @@ -613,6 +623,7 @@ public ServerMetrics(MetricRegistry registry, Class<?> requestClass, Class<?> se
unknownFormatError = registry.counter(MetricRegistry.name(requestClass, "UnknownFormatError"));
idNotFoundError = registry.counter(MetricRegistry.name(requestClass, "IDNotFoundError"));
idDeletedError = registry.counter(MetricRegistry.name(requestClass, "IDDeletedError"));
idPurgedError = registry.counter(MetricRegistry.name(requestClass, "IDPurgedError"));
idUndeletedError = registry.counter(MetricRegistry.name(requestClass, "IDUndeletedError"));
idNotDeletedError = registry.counter(MetricRegistry.name(requestClass, "IDNotDeletedError"));
lifeVersionConflictError = registry.counter(MetricRegistry.name(requestClass, "lifeVersionConflictError"));
Expand All @@ -622,6 +633,8 @@ public ServerMetrics(MetricRegistry registry, Class<?> requestClass, Class<?> se
unExpectedStorePutError = registry.counter(MetricRegistry.name(requestClass, "UnexpectedStorePutError"));
unExpectedStoreGetError = registry.counter(MetricRegistry.name(requestClass, "UnexpectedStoreGetError"));
unExpectedStoreDeleteError = registry.counter(MetricRegistry.name(requestClass, "UnexpectedStoreDeleteError"));
unExpectedStorePurgeError =
registry.counter(MetricRegistry.name(requestClass, "UnExpectedStorePurgeError"));
unExpectedStoreUndeleteError = registry.counter(MetricRegistry.name(requestClass, "UnexpectedStoreUndeleteError"));
unExpectedAdminOperationError =
registry.counter(MetricRegistry.name(requestClass, "UnexpectedAdminOperationError"));
Expand All @@ -631,6 +644,8 @@ public ServerMetrics(MetricRegistry registry, Class<?> requestClass, Class<?> se
registry.counter(MetricRegistry.name(requestClass, "UnexpectedStoreFindEntriesError"));
getAuthorizationFailure = registry.counter(MetricRegistry.name(requestClass, "GetAuthorizationFailure"));
deleteAuthorizationFailure = registry.counter(MetricRegistry.name(requestClass, "DeleteAuthorizationFailure"));
purgeAuthorizationFailure =
registry.counter(MetricRegistry.name(requestClass, "PurgeAuthorizationFailure"));
undeleteAuthorizationFailure = registry.counter(MetricRegistry.name(requestClass, "UndeleteAuthorizationFailure"));
ttlUpdateAuthorizationFailure =
registry.counter(MetricRegistry.name(requestClass, "TtlUpdateAuthorizationFailure"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ public class PurgeRequest extends RequestOrResponse {
private final static short CURRENT_VERSION = PURGE_MESSAGE_REQUEST_VERSION_1;

private final BlobId blobId;
private final long purgeTimeInMs;

/**
* Helper to construct PurgeRequest from a stream
Expand All @@ -55,20 +56,23 @@ public static PurgeRequest readFrom(DataInputStream stream, ClusterMap map) thro
* @param correlationId the correlation id for the request
* @param clientId the id of the client generating the request
* @param blobId the blob ID whose TTL needs to be updated
* @param purgeTimeInMs deletion time of the blob in ms
*/
public PurgeRequest(int correlationId, String clientId, BlobId blobId) {
this(correlationId, clientId, blobId, CURRENT_VERSION);
public PurgeRequest(int correlationId, String clientId, BlobId blobId, long purgeTimeInMs) {
this(correlationId, clientId, blobId, purgeTimeInMs, CURRENT_VERSION);
}

/**
* @param correlationId the correlation id for the request
* @param clientId the id of the client generating the request
* @param blobId the blob ID that needs to be purged.
* @param purgeTimeInMs deletion time of the blob in ms
* @param version the version of the {@link PurgeRequest}.
*/
PurgeRequest(int correlationId, String clientId, BlobId blobId, short version) {
PurgeRequest(int correlationId, String clientId, BlobId blobId, long purgeTimeInMs, short version) {
super(PurgeRequest, version, correlationId, clientId);
this.blobId = blobId;
this.purgeTimeInMs = purgeTimeInMs;
}

@Override
Expand Down Expand Up @@ -103,6 +107,13 @@ public short getContainerId() {
return blobId.getContainerId();
}

/**
* @return the purge time.
*/
public long getPurgeTimeInMs() {
return purgeTimeInMs;
}

@Override
public long sizeInBytes() {
// header + blobId
Expand All @@ -124,7 +135,8 @@ static PurgeRequest readFrom(DataInputStream stream, ClusterMap map) throws IOEx
int correlationId = stream.readInt();
String clientId = Utils.readIntString(stream);
BlobId id = new BlobId(stream, map);
return new PurgeRequest(correlationId, clientId, id, PURGE_MESSAGE_REQUEST_VERSION_1);
long purgeTimeInMs = stream.readLong();
return new PurgeRequest(correlationId, clientId, id, purgeTimeInMs, PURGE_MESSAGE_REQUEST_VERSION_1);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -482,6 +482,11 @@ public void onBlobReplicaDeleted(String sourceHost, int port, String blobId, Blo
throw new IllegalStateException("Not implemented");
}

@Override
public void onBlobReplicaPurged(String sourceHost, int port, String blobId, BlobReplicaSourceType sourceType) {
throw new IllegalStateException("Not implemented");
}

@Override
public void onBlobReplicaUpdated(String sourceHost, int port, String blobId, BlobReplicaSourceType sourceType,
UpdateType updateType, MessageInfo info) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -419,6 +419,7 @@ class EventTracker {
private final int numberOfReplicas;
private final Helper creationHelper;
private final Helper deletionHelper;
private final Helper purgeHelper;
private final Helper undeleteHelper;
private final Helper replicateHelper;
private final ConcurrentMap<UpdateType, Helper> updateHelpers = new ConcurrentHashMap<>();
Expand Down Expand Up @@ -496,6 +497,7 @@ private String getKey(String host, int port) {
numberOfReplicas = expectedNumberOfReplicas;
creationHelper = new Helper(numberOfReplicas);
deletionHelper = new Helper(numberOfReplicas);
purgeHelper = new Helper(numberOfReplicas);
undeleteHelper = new Helper(numberOfReplicas);
// On-demand-replication, we usually only need one replica to replicate the Blob.
replicateHelper = new Helper(1);
Expand All @@ -519,6 +521,15 @@ void trackDeletion(String host, int port) {
deletionHelper.track(host, port);
}

/**
* Tracks the purge event that arrived on {@code host}:{@code port}.
* @param host the host that received the purge request.
* @param port the port of the host that describes the instance along with {@code host}.
*/
void trackPurge(String host, int port) {
purgeHelper.track(host, port);
}

/**
* Tracks the undelete event that arrived on {@code host}:{@code port}.
* @param host the host that received the undelete
Expand Down Expand Up @@ -686,6 +697,13 @@ public synchronized void onBlobReplicaDeleted(String sourceHost, int port, Strin
.trackDeletion(sourceHost, port);
}

@Override
public synchronized void onBlobReplicaPurged(String sourceHost, int port, String blobId,
BlobReplicaSourceType sourceType) {
objectTracker.computeIfAbsent(blobId, k -> new EventTracker(getNumReplicas(blobId)))
.trackPurge(sourceHost, port);
}

@Override
public synchronized void onBlobReplicaUpdated(String sourceHost, int port, String blobId,
BlobReplicaSourceType sourceType, UpdateType updateType, MessageInfo info) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@
import com.github.ambry.protocol.GetResponse;
import com.github.ambry.protocol.PartitionRequestInfo;
import com.github.ambry.protocol.PartitionResponseInfo;
import com.github.ambry.protocol.PurgeRequest;
import com.github.ambry.protocol.PurgeResponse;
import com.github.ambry.protocol.PutRequest;
import com.github.ambry.protocol.PutResponse;
import com.github.ambry.protocol.ReplicaMetadataRequest;
Expand Down Expand Up @@ -494,6 +496,74 @@ public void handleDeleteRequest(NetworkRequest request) throws IOException, Inte
metrics.deleteBlobTotalTimeInMs, null, null, totalTimeSpent));
}

@Override
public void handlePurgeRequest(NetworkRequest request) throws IOException, InterruptedException {
PurgeRequest purgeRequest = PurgeRequest.readFrom(new DataInputStream(request.getInputStream()), clusterMap);
long requestQueueTime = SystemTime.getInstance().milliseconds() - request.getStartTimeInMs();
long totalTimeSpent = requestQueueTime;
long startTime = SystemTime.getInstance().milliseconds();
PurgeResponse response = null;
try {
StoreKey convertedStoreKey = getConvertedStoreKeys(Collections.singletonList(purgeRequest.getBlobId())).get(0);
ServerErrorCode error =
validateRequest(purgeRequest.getBlobId().getPartition(), RequestOrResponseType.PurgeRequest, false);
if (error != ServerErrorCode.No_Error) {
logger.error("Validating purge request failed with error {} for request {}", error, purgeRequest);
response = new PurgeResponse(purgeRequest.getCorrelationId(), purgeRequest.getClientId(), error);
} else {
BlobId convertedBlobId = (BlobId) convertedStoreKey;
MessageInfo info = new MessageInfo.Builder(convertedBlobId, -1, convertedBlobId.getAccountId(),
convertedBlobId.getContainerId(), purgeRequest.getPurgeTimeInMs()).isDeleted(true)
.lifeVersion(MessageInfo.LIFE_VERSION_FROM_FRONTEND)
.build();
Store storeToPurge = storeManager.getStore(purgeRequest.getBlobId().getPartition());
storeToPurge.purge(Collections.singletonList(info));
response =
new PurgeResponse(purgeRequest.getCorrelationId(), purgeRequest.getClientId(), ServerErrorCode.No_Error);
if (notification != null) {
notification.onBlobReplicaPurged(currentNode.getHostname(), currentNode.getPort(), convertedStoreKey.getID(),
BlobReplicaSourceType.PRIMARY);
}
}
} catch (StoreException e) {
boolean logInErrorLevel = false;
if (e.getErrorCode() == StoreErrorCodes.ID_Not_Found) {
metrics.idNotFoundError.inc();
} else if (e.getErrorCode() == StoreErrorCodes.ID_Purged) {
metrics.idPurgedError.inc();
} else if (e.getErrorCode() == StoreErrorCodes.Authorization_Failure) {
metrics.purgeAuthorizationFailure.inc();
} else {
logInErrorLevel = true;
metrics.unExpectedStorePurgeError.inc();
}
if (logInErrorLevel) {
logger.error("Store exception on a purge message with error code {} for request {}", e.getErrorCode(),
purgeRequest, e);
} else {
logger.trace("Store exception on a purge with error code {} for request {}", e.getErrorCode(), purgeRequest,
e);
}
response = new PurgeResponse(purgeRequest.getCorrelationId(), purgeRequest.getClientId(),
ErrorMapping.getStoreErrorMapping(e.getErrorCode()));
} catch (Exception e) {
logger.error("Unknown exception for purge request {}", purgeRequest, e);
response = new PurgeResponse(purgeRequest.getCorrelationId(), purgeRequest.getClientId(),
ServerErrorCode.Unknown_Error);
metrics.unExpectedStorePurgeError.inc();
} finally {
long processingTime = SystemTime.getInstance().milliseconds() - startTime;
totalTimeSpent += processingTime;
publicAccessLogger.info("{} {} processingTime {}", purgeRequest, response, processingTime);
// Update request metrics.
RequestMetricsUpdater metricsUpdater = new RequestMetricsUpdater(requestQueueTime, processingTime, 0, 0, false);
purgeRequest.accept(metricsUpdater);
}
requestResponseChannel.sendResponse(response, request,
new ServerNetworkResponseMetrics(metrics.purgeBlobResponseQueueTimeInMs, metrics.purgeBlobSendTimeInMs,
metrics.purgeBlobTotalTimeInMs, null, null, totalTimeSpent));
}

@Override
public void handleTtlUpdateRequest(NetworkRequest request) throws IOException, InterruptedException {
TtlUpdateRequest updateRequest;
Expand Down
Loading