Skip to content

Commit

Permalink
Disable retries on table save, increase write timeouts, fix typo (#253)
Browse files Browse the repository at this point in the history
## Summary

<!--- HINT: Replace #nnn with corresponding Issue number, if you are
fixing an existing issue -->

There can be a corruption issue in HTS when writes are retried after a
timeout exception:
1. Write times out with `IllegalStateException` but the mysql commit may
have succeeded.
2. A retry is attempted and faces a 409 error with
`CommitFailedException` due to already having the commit, which would
then clean up the metadata files while the server still has a pointer to
these files if the first commit actually succeeded under the hood.
3. Table is no longer queryable and enters a corrupted state.

We want to disable retries on table writes in the tables service. We
also want to increase the timeout for writes to reduce the odds of
timeouts occurring, since retries are no longer done this shouldn't be
increasing latency.

Also fixes a small typo `HouseTableRepositoryStateUnkownException` ->
`HouseTableRepositoryStateUnknownException`

## Changes

- [ ] Client-facing API Changes
- [x] Internal API Changes
- [x] Bug Fixes
- [ ] New Features
- [ ] Performance Improvements
- [ ] Code Style
- [x] Refactoring
- [ ] Documentation
- [x] Tests

For all the boxes checked, please include additional details of the
changes made in this pull request.

## Testing Done
<!--- Check any relevant boxes with "x" -->

- [ ] Manually Tested on local docker setup. Please include commands
ran, and their output.
- [ ] Added new tests for the changes made.
- [x] Updated existing tests to reflect the changes made.
- [ ] No tests added or updated. Please explain why. If unsure, please
feel free to ask for help.
- [ ] Some other form of testing like staging or soak time in
production. Please explain.

For all the boxes checked, include a detailed description of the testing
done for the changes made in this pull request.

# Additional Information

- [ ] Breaking Changes
- [ ] Deprecations
- [ ] Large PR broken into smaller PRs, and PR plan linked in the
description.

For all the boxes checked, include additional details of the changes
made in this pull request.

---------

Co-authored-by: William Lo <[email protected]>
  • Loading branch information
Will-Lo and William Lo authored Nov 14, 2024
1 parent 2f3eb80 commit 071c2b0
Show file tree
Hide file tree
Showing 5 changed files with 126 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
import com.linkedin.openhouse.internal.catalog.repository.exception.HouseTableCallerException;
import com.linkedin.openhouse.internal.catalog.repository.exception.HouseTableConcurrentUpdateException;
import com.linkedin.openhouse.internal.catalog.repository.exception.HouseTableNotFoundException;
import com.linkedin.openhouse.internal.catalog.repository.exception.HouseTableRepositoryStateUnkownException;
import com.linkedin.openhouse.internal.catalog.repository.exception.HouseTableRepositoryStateUnknownException;
import io.netty.resolver.dns.DnsNameResolverTimeoutException;
import java.time.Duration;
import java.util.Arrays;
Expand All @@ -39,12 +39,15 @@
public class HouseTableRepositoryImpl implements HouseTableRepository {

/**
* The request timeout is decided based on retry template logic and server side gateway timeout of
* 60 sec. The retry template has retry max attempt of 3 with 2 secs delay (with delay multiplier
* as attempt increases) between each retry. So the overall retry process should complete within
* 60 sec.
* The read request timeout is decided based on retry template logic and server side gateway
* timeout of 60 sec. The retry template has retry max attempt of 3 with 2 secs delay (with delay
* multiplier as attempt increases) between each retry. So the overall retry process should
* complete within 60 sec.
*/
private static final int REQUEST_TIMEOUT_SECONDS = 17;
private static final int READ_REQUEST_TIMEOUT_SECONDS = 30;

/** Write request timeout is 60 secs due to no retries on table write operations */
private static final int WRITE_REQUEST_TIMEOUT_SECONDS = 60;

@Autowired private UserTableApi apiInstance;

Expand Down Expand Up @@ -87,7 +90,7 @@ public List<HouseTable> findAllByDatabaseId(String databaseId) {

return getHtsRetryTemplate(
Arrays.asList(
HouseTableRepositoryStateUnkownException.class, IllegalStateException.class))
HouseTableRepositoryStateUnknownException.class, IllegalStateException.class))
.execute(
context ->
apiInstance
Expand All @@ -96,7 +99,7 @@ public List<HouseTable> findAllByDatabaseId(String databaseId) {
.flatMapMany(Flux::fromIterable)
.map(houseTableMapper::toHouseTable)
.collectList()
.block(Duration.ofSeconds(REQUEST_TIMEOUT_SECONDS)));
.block(Duration.ofSeconds(READ_REQUEST_TIMEOUT_SECONDS)));
}

@edu.umd.cs.findbugs.annotations.SuppressFBWarnings(
Expand All @@ -107,23 +110,20 @@ public HouseTable save(HouseTable entity) {
CreateUpdateEntityRequestBodyUserTable requestBody =
new CreateUpdateEntityRequestBodyUserTable().entity(houseTableMapper.toUserTable(entity));

return getHtsRetryTemplate(Arrays.asList(IllegalStateException.class))
.execute(
context ->
apiInstance
.putUserTable(requestBody)
.map(EntityResponseBodyUserTable::getEntity)
.map(houseTableMapper::toHouseTable)
.onErrorResume(this::handleHtsHttpError)
.block(Duration.ofSeconds(REQUEST_TIMEOUT_SECONDS)));
return apiInstance
.putUserTable(requestBody)
.map(EntityResponseBodyUserTable::getEntity)
.map(houseTableMapper::toHouseTable)
.onErrorResume(this::handleHtsHttpError)
.block(Duration.ofSeconds(WRITE_REQUEST_TIMEOUT_SECONDS));
}

@Override
public Optional<HouseTable> findById(HouseTablePrimaryKey houseTablePrimaryKey) {

return getHtsRetryTemplate(
Arrays.asList(
HouseTableRepositoryStateUnkownException.class, IllegalStateException.class))
HouseTableRepositoryStateUnknownException.class, IllegalStateException.class))
.execute(
context ->
apiInstance
Expand All @@ -133,7 +133,7 @@ public Optional<HouseTable> findById(HouseTablePrimaryKey houseTablePrimaryKey)
.map(houseTableMapper::toHouseTable)
.switchIfEmpty(Mono.empty())
.onErrorResume(this::handleHtsHttpError)
.blockOptional(Duration.ofSeconds(REQUEST_TIMEOUT_SECONDS)));
.blockOptional(Duration.ofSeconds(READ_REQUEST_TIMEOUT_SECONDS)));
}

/**
Expand All @@ -159,14 +159,14 @@ private Mono<? extends HouseTable> handleHtsHttpError(Throwable e) {
} else if (e instanceof WebClientResponseException
&& ((WebClientResponseException) e).getStatusCode().is5xxServerError()) {
return Mono.error(
new HouseTableRepositoryStateUnkownException(
new HouseTableRepositoryStateUnknownException(
"Cannot determine if HTS has persisted the proposed change", e));
} else if (ExceptionUtils.indexOfThrowable(e, DnsNameResolverTimeoutException.class)
!= -1) { // DnsNameResolverTimeoutException appears nested within exception causes and
// ExceptionUtils class is used to match the occurrence of this failure. Retry is done
// for this failure using existing retry template.
return Mono.error(
new HouseTableRepositoryStateUnkownException(
new HouseTableRepositoryStateUnknownException(
"HTS service could not be resolved due to DNS lookup timeout", e));
} else {
return Mono.error(new RuntimeException("UNKNOWN and unhandled failure from HTS:", e));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package com.linkedin.openhouse.internal.catalog.repository.exception;

/** Exception thrown when HTS returns a 5xx. */
public class HouseTableRepositoryStateUnknownException extends HouseTableRepositoryException {
public HouseTableRepositoryStateUnknownException(String message, Throwable cause) {
super(message, cause);
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
import com.linkedin.openhouse.internal.catalog.repository.exception.HouseTableCallerException;
import com.linkedin.openhouse.internal.catalog.repository.exception.HouseTableConcurrentUpdateException;
import com.linkedin.openhouse.internal.catalog.repository.exception.HouseTableNotFoundException;
import com.linkedin.openhouse.internal.catalog.repository.exception.HouseTableRepositoryStateUnkownException;
import com.linkedin.openhouse.internal.catalog.repository.exception.HouseTableRepositoryStateUnknownException;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
import java.io.IOException;
import java.nio.file.Files;
Expand Down Expand Up @@ -322,7 +322,7 @@ void testDoCommitExceptionHandling() {
CommitFailedException.class,
() -> openHouseInternalTableOperations.doCommit(base, metadata));
when(mockHouseTableRepository.save(Mockito.any(HouseTable.class)))
.thenThrow(HouseTableRepositoryStateUnkownException.class);
.thenThrow(HouseTableRepositoryStateUnknownException.class);
Assertions.assertThrows(
CommitStateUnknownException.class,
() -> openHouseInternalTableOperations.doCommit(base, metadata));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,27 +16,35 @@
import com.linkedin.openhouse.internal.catalog.repository.exception.HouseTableCallerException;
import com.linkedin.openhouse.internal.catalog.repository.exception.HouseTableConcurrentUpdateException;
import com.linkedin.openhouse.internal.catalog.repository.exception.HouseTableNotFoundException;
import com.linkedin.openhouse.internal.catalog.repository.exception.HouseTableRepositoryStateUnkownException;
import com.linkedin.openhouse.internal.catalog.repository.exception.HouseTableRepositoryStateUnknownException;
import io.netty.handler.codec.dns.DefaultDnsQuestion;
import io.netty.handler.codec.dns.DnsRecordType;
import io.netty.resolver.dns.DnsNameResolverTimeoutException;
import java.io.IOException;
import java.lang.reflect.Field;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.TimeUnit;
import okhttp3.mockwebserver.MockResponse;
import okhttp3.mockwebserver.MockWebServer;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.context.TestConfiguration;
import org.springframework.boot.test.mock.mockito.SpyBean;
import org.springframework.context.annotation.Bean;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.util.ReflectionUtils;
import reactor.core.publisher.Mono;

/** As part of prerequisite of this test, bring up the /hts Springboot application. */
@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_EACH_TEST_METHOD)
Expand All @@ -49,6 +57,8 @@ public class HouseTableRepositoryImplTest {

@Autowired HouseTableMapper houseTableMapper;

@SpyBean UserTableApi userTableApi;

@TestConfiguration
public static class MockWebServerConfiguration {
/**
Expand Down Expand Up @@ -384,7 +394,7 @@ public void testRetryForFindByIdHtsCall() {
((HouseTableRepositoryImpl) htsRepo)
.getHtsRetryTemplate(
Arrays.asList(
HouseTableRepositoryStateUnkownException.class, IllegalStateException.class))
HouseTableRepositoryStateUnknownException.class, IllegalStateException.class))
.registerListener(retryListener);
Assertions.assertThrows(
HouseTableConcurrentUpdateException.class, () -> htsRepo.findById(testKey));
Expand All @@ -393,7 +403,7 @@ public void testRetryForFindByIdHtsCall() {
}

@Test
public void testNoRetryForStateUnkown() {
public void testNoRetryForStateUnknown() {
for (int i : Arrays.asList(500, 501, 502, 503, 504)) {
mockHtsServer.enqueue(
new MockResponse()
Expand All @@ -405,9 +415,10 @@ public void testNoRetryForStateUnkown() {
.getHtsRetryTemplate(Collections.singletonList(IllegalStateException.class))
.registerListener(retryListener);
Assertions.assertThrows(
HouseTableRepositoryStateUnkownException.class, () -> htsRepo.save(HOUSE_TABLE));
HouseTableRepositoryStateUnknownException.class, () -> htsRepo.save(HOUSE_TABLE));
int actualRetryCount = retryListener.getRetryCount();
Assertions.assertEquals(actualRetryCount, 1);
// Should not be retrying table writes regardless of the error to avoid corruption
Assertions.assertEquals(actualRetryCount, 0);
}
}

Expand All @@ -423,7 +434,7 @@ public void testRetryForHtsFindByIdCallOnConcurrentException() {
((HouseTableRepositoryImpl) htsRepo)
.getHtsRetryTemplate(
Arrays.asList(
HouseTableRepositoryStateUnkownException.class, IllegalStateException.class))
HouseTableRepositoryStateUnknownException.class, IllegalStateException.class))
.registerListener(retryListener);

HouseTablePrimaryKey testKey =
Expand All @@ -436,4 +447,82 @@ public void testRetryForHtsFindByIdCallOnConcurrentException() {
int actualRetryCount = retryListener.getRetryCount();
Assertions.assertEquals(actualRetryCount, 1);
}

@Test
public void testWriteTimeout() {
EntityResponseBodyUserTable putResponse = new EntityResponseBodyUserTable();
putResponse.entity(houseTableMapper.toUserTable(HOUSE_TABLE));
int writeTimeout = 60;
mockHtsServer.enqueue(
new MockResponse()
.setResponseCode(200)
.setBody((new Gson()).toJson(putResponse))
.setHeadersDelay(writeTimeout - 2, TimeUnit.SECONDS)
.addHeader("Content-Type", "application/json"));
Assertions.assertDoesNotThrow(() -> htsRepo.save(HOUSE_TABLE));
}

@Test
public void testReadTimeoutWithRetries() {
EntityResponseBodyUserTable response = new EntityResponseBodyUserTable();
response.entity(houseTableMapper.toUserTable(HOUSE_TABLE));
int readTimeout = 30;
mockHtsServer.enqueue(
new MockResponse()
.setResponseCode(200)
.setBody((new Gson()).toJson(response))
.setHeadersDelay(readTimeout + 1, TimeUnit.SECONDS)
.addHeader("Content-Type", "application/json"));
mockHtsServer.enqueue(
new MockResponse()
.setResponseCode(200)
.setBody((new Gson()).toJson(response))
.setHeadersDelay(readTimeout + 1, TimeUnit.SECONDS)
.addHeader("Content-Type", "application/json"));
mockHtsServer.enqueue(
new MockResponse()
.setResponseCode(200)
.setBody((new Gson()).toJson(response))
.setHeadersDelay(0, TimeUnit.SECONDS) // Last attempt should pass
.addHeader("Content-Type", "application/json"));

CustomRetryListener retryListener = new CustomRetryListener();
((HouseTableRepositoryImpl) htsRepo)
.getHtsRetryTemplate(
Arrays.asList(
HouseTableRepositoryStateUnknownException.class, IllegalStateException.class))
.registerListener(retryListener);

Assertions.assertDoesNotThrow(
() ->
htsRepo.findById(
HouseTablePrimaryKey.builder()
.tableId(HOUSE_TABLE.getTableId())
.databaseId(HOUSE_TABLE.getDatabaseId())
.build()));
Assertions.assertEquals(retryListener.getRetryCount(), 2);
}

@Test
void testDnsResolverTimeoutDoesNotRetry() {
DnsNameResolverTimeoutException mockDnsException =
new DnsNameResolverTimeoutException(
InetSocketAddress.createUnresolved("localhost", 8080),
new DefaultDnsQuestion("test", DnsRecordType.ANY),
"DNS resolution timeout");
Mockito.doReturn(Mono.error(mockDnsException)).when(userTableApi).putUserTable(Mockito.any());

CustomRetryListener retryListener = new CustomRetryListener();
((HouseTableRepositoryImpl) htsRepo)
.getHtsRetryTemplate(
Arrays.asList(
HouseTableRepositoryStateUnknownException.class, IllegalStateException.class))
.registerListener(retryListener);

Assertions.assertThrows(
HouseTableRepositoryStateUnknownException.class, () -> htsRepo.save(HOUSE_TABLE));
int actualRetryCount = retryListener.getRetryCount();
// Should not be retrying table writes regardless of the error to avoid corruption
Assertions.assertEquals(actualRetryCount, 0);
}
}

0 comments on commit 071c2b0

Please sign in to comment.