Fix catalog drop table for dual storage (#237)
## Summary

Currently, `dropTable` in the catalog assumes the cluster default storage is the storage for every table that was created. That assumption is incorrect with the introduction of per-table storage. This PR fixes that bug by resolving the correct `FileIO` for the table being dropped and performing the cleanup with that `FileIO`.
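
For reference, a minimal sketch of the per-table resolution that `dropTable` now relies on is included below. It is reconstructed from the partial `resolveFileIO` hunk in this diff and is not the verbatim implementation; in particular, the exception handling and the `fromString` conversion from the stored storage-type string back to a `StorageType.Type` are assumptions.

```java
// Sketch only: reconstructed from the partial resolveFileIO hunk in this PR.
// The catch clause and the String -> StorageType.Type conversion (assumed
// helper storageType.fromString) are illustrative assumptions.
protected FileIO resolveFileIO(TableIdentifier tableIdentifier) {
  Optional<HouseTable> houseTable = Optional.empty();
  try {
    // The House Tables service records which storage each table was created on.
    houseTable =
        houseTableRepository.findById(
            HouseTablePrimaryKey.builder()
                .databaseId(tableIdentifier.namespace().toString())
                .tableId(tableIdentifier.name())
                .build());
  } catch (RuntimeException e) {
    log.debug("No House Table entry found for {}", tableIdentifier, e);
  }
  // Prefer the table's own storage type; fall back to the cluster default,
  // which matches the pre-fix behavior for tables without a recorded type.
  return houseTable
      .map(HouseTable::getStorageType)
      .map(storageType::fromString) // assumed: String -> StorageType.Type
      .map(fileIOManager::getFileIO)
      .orElseGet(() -> fileIOManager.getFileIO(storageManager.getDefaultStorage().getType()));
}
```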


## Changes

- [ ] Client-facing API Changes
- [ ] Internal API Changes
- [x] Bug Fixes
- [ ] New Features
- [ ] Performance Improvements
- [ ] Code Style
- [x] Refactoring
- [ ] Documentation
- [x] Tests

For all the boxes checked, please include additional details of the
changes made in this pull request.

## Testing Done
<!--- Check any relevant boxes with "x" -->

- [ ] Manually Tested on local docker setup. Please include commands
ran, and their output.
- [x] Added new tests for the changes made.
- [ ] Updated existing tests to reflect the changes made.
- [ ] No tests added or updated. Please explain why. If unsure, please
feel free to ask for help.
- [ ] Some other form of testing like staging or soak time in
production. Please explain.

Added a `DualStorageTest` that exercises creating and dropping tables on both HDFS and local storage.

## Additional Information

- [ ] Breaking Changes
- [ ] Deprecations
- [ ] Large PR broken into smaller PRs, and PR plan linked in the
description.

For all the boxes checked, include additional details of the changes
made in this pull request.
vikrambohra authored Oct 23, 2024
1 parent 79e2a9f commit 2960f85
Showing 6 changed files with 140 additions and 20 deletions.
@@ -96,6 +96,7 @@ public List<TableIdentifier> listTables(Namespace namespace) {
@Override
public boolean dropTable(TableIdentifier identifier, boolean purge) {
String tableLocation = loadTable(identifier).location();
FileIO fileIO = resolveFileIO(identifier);
log.debug("Dropping table {}, purge:{}", tableLocation, purge);
try {
houseTableRepository.deleteById(
@@ -110,7 +111,6 @@ public boolean dropTable(TableIdentifier identifier, boolean purge) {
}
if (purge) {
// Delete data and metadata files from storage.
FileIO fileIO = fileIOManager.getFileIO(storageManager.getDefaultStorage().getType());
if (fileIO instanceof SupportsPrefixOperations) {
log.debug("Deleting files for table {}", tableLocation);
((SupportsPrefixOperations) fileIO).deletePrefix(tableLocation);
@@ -137,7 +137,7 @@ public void renameTable(TableIdentifier from, TableIdentifier to) {
* @param tableIdentifier
* @return fileIO
*/
private FileIO resolveFileIO(TableIdentifier tableIdentifier) {
protected FileIO resolveFileIO(TableIdentifier tableIdentifier) {
Optional<HouseTable> houseTable = Optional.empty();
try {
houseTable =
@@ -125,6 +125,12 @@ public interface TablesMapper {
})
TableDtoPrimaryKey toTableDtoPrimaryKey(TableIdentifier tableIdentifier);

@Mappings({
@Mapping(source = "tableDto.databaseId", target = "databaseId"),
@Mapping(source = "tableDto.tableId", target = "tableId")
})
TableDtoPrimaryKey toTableDtoPrimaryKey(TableDto tableDto);

@Mappings({
@Mapping(
conditionExpression = "java(tableIdentifier.namespace() != null)",
@@ -0,0 +1,84 @@
package com.linkedin.openhouse.tables.e2e.h2;

import static com.linkedin.openhouse.tables.model.TableModelConstants.buildGetTableResponseBodyWithDbTbl;
import static com.linkedin.openhouse.tables.model.TableModelConstants.buildTableDto;

import com.linkedin.openhouse.cluster.storage.StorageManager;
import com.linkedin.openhouse.cluster.storage.StorageType;
import com.linkedin.openhouse.internal.catalog.model.HouseTable;
import com.linkedin.openhouse.internal.catalog.model.HouseTablePrimaryKey;
import com.linkedin.openhouse.internal.catalog.repository.HouseTableRepository;
import com.linkedin.openhouse.tables.dto.mapper.TablesMapper;
import com.linkedin.openhouse.tables.mock.properties.CustomClusterPropertiesInitializer;
import com.linkedin.openhouse.tables.model.TableDto;
import com.linkedin.openhouse.tables.model.TableDtoPrimaryKey;
import com.linkedin.openhouse.tables.repository.OpenHouseInternalRepository;
import org.apache.iceberg.catalog.Catalog;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.mock.mockito.SpyBean;
import org.springframework.test.context.ContextConfiguration;

@SpringBootTest
@ContextConfiguration(initializers = CustomClusterPropertiesInitializer.class)
public class DualStorageTest {

@Autowired HouseTableRepository houseTablesRepository;

@SpyBean @Autowired OpenHouseInternalRepository openHouseInternalRepository;

@Autowired StorageManager storageManager;

@Autowired Catalog catalog;

@Autowired TablesMapper tablesMapper;

@Test
public void testCreateDropTableDualStorage() {

// Test create table
// db.table should be created on hdfs storage
TableDto hdfsTableDto = buildTableDto(buildGetTableResponseBodyWithDbTbl("db", "table"));
openHouseInternalRepository.save(hdfsTableDto);
TableDtoPrimaryKey hdfsDtoPrimaryKey = tablesMapper.toTableDtoPrimaryKey(hdfsTableDto);
Assertions.assertTrue(openHouseInternalRepository.existsById(hdfsDtoPrimaryKey));
HouseTablePrimaryKey hdfsHtsPrimaryKey =
HouseTablePrimaryKey.builder()
.databaseId(hdfsDtoPrimaryKey.getDatabaseId())
.tableId(hdfsDtoPrimaryKey.getTableId())
.build();
Assertions.assertTrue(houseTablesRepository.existsById(hdfsHtsPrimaryKey));
HouseTable houseTable = houseTablesRepository.findById(hdfsHtsPrimaryKey).get();
// storage type hdfs
Assertions.assertEquals(StorageType.HDFS.getValue(), houseTable.getStorageType());

// local_db.table should be created on local storage
TableDto localTableDto = buildTableDto(buildGetTableResponseBodyWithDbTbl("local_db", "table"));
openHouseInternalRepository.save(localTableDto);
TableDtoPrimaryKey localDtoPrimaryKey = tablesMapper.toTableDtoPrimaryKey(localTableDto);
Assertions.assertTrue(openHouseInternalRepository.existsById(localDtoPrimaryKey));
HouseTablePrimaryKey localHtsPrimaryKey =
HouseTablePrimaryKey.builder()
.databaseId(localDtoPrimaryKey.getDatabaseId())
.tableId(localDtoPrimaryKey.getTableId())
.build();
Assertions.assertTrue(houseTablesRepository.existsById(localHtsPrimaryKey));
houseTable = houseTablesRepository.findById(localHtsPrimaryKey).get();
// storage type local
Assertions.assertEquals(StorageType.LOCAL.getValue(), houseTable.getStorageType());

// Test Drop Table
openHouseInternalRepository.deleteById(hdfsDtoPrimaryKey);
Assertions.assertFalse(openHouseInternalRepository.existsById(hdfsDtoPrimaryKey));
openHouseInternalRepository.deleteById(localDtoPrimaryKey);
Assertions.assertFalse(openHouseInternalRepository.existsById(localDtoPrimaryKey));
}

@AfterAll
static void unsetSysProp() {
System.clearProperty("OPENHOUSE_CLUSTER_CONFIG_PATH");
}
}
@@ -2,7 +2,7 @@

import com.linkedin.openhouse.cluster.storage.StorageManager;
import com.linkedin.openhouse.cluster.storage.configs.StorageProperties;
import com.linkedin.openhouse.cluster.storage.selector.impl.DefaultStorageSelector;
import com.linkedin.openhouse.cluster.storage.selector.impl.RegexStorageSelector;
import com.linkedin.openhouse.tables.mock.properties.CustomClusterPropertiesInitializer;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
@@ -20,11 +20,10 @@ public class StoragePropertiesConfigTest {
@MockBean private StorageManager storageManager;
private static final String DEFAULT_TYPE = "hdfs";

private static final String DEFAULT_ENDPOINT = "hdfs://localhost:9000";
private static final String DEFAULT_ENDPOINT = "file:///";

private static final String ANOTHER_TYPE = "objectstore";
private static final String ANOTHER_TYPE = "local";

private static final String ANOTHER_ENDPOINT = "http://localhost:9000";
private static final String NON_EXISTING_TYPE = "non-existing-type";

@Test
@@ -41,7 +40,7 @@ public void testStorageTypeEndpoint() {
@Test
public void testStorageTypeLookup() {
Assertions.assertEquals(
ANOTHER_ENDPOINT, storageProperties.getTypes().get(ANOTHER_TYPE).getEndpoint());
DEFAULT_ENDPOINT, storageProperties.getTypes().get(ANOTHER_TYPE).getEndpoint());
}

@Test
@@ -60,13 +59,14 @@ public void testStorageSelector() {
Assertions.assertNotNull(storageProperties.getStorageSelector());
Assertions.assertEquals(
storageProperties.getStorageSelector().getName(),
DefaultStorageSelector.class.getSimpleName());
RegexStorageSelector.class.getSimpleName());
Assertions.assertNotNull(storageProperties.getStorageSelector().getParameters());
Assertions.assertEquals(storageProperties.getStorageSelector().getParameters().size(), 2);
Assertions.assertEquals(
storageProperties.getStorageSelector().getParameters().get("prop1"), "value1");
storageProperties.getStorageSelector().getParameters().get("regex"),
"local_db\\.[a-zA-Z0-9_]+$");
Assertions.assertEquals(
storageProperties.getStorageSelector().getParameters().get("prop2"), "value2");
storageProperties.getStorageSelector().getParameters().get("storage-type"), "local");
}

@AfterAll
@@ -396,6 +396,39 @@ public static TableDto buildTableDto(GetTableResponseBody getTableResponseBody)
.build();
}

public static GetTableResponseBody buildGetTableResponseBodyWithDbTbl(String db, String table) {
return GetTableResponseBody.builder()
.tableId(table)
.databaseId(db)
.clusterId(CLUSTER_NAME)
.tableUri(String.format("%s.%s.%s", CLUSTER_NAME, db, table))
.tableLocation(INITIAL_TABLE_VERSION)
.tableVersion(INITIAL_TABLE_VERSION)
.tableUUID(UUID.randomUUID().toString())
.tableProperties(buildTableProperties(TABLE_PROPS))
.tableCreator(TEST_USER)
.schema(HEALTH_SCHEMA_LITERAL)
.policies(TABLE_POLICIES)
.tableType(TableType.PRIMARY_TABLE)
.timePartitioning(
TimePartitionSpec.builder()
.columnName("timestampCol")
.granularity(TimePartitionSpec.Granularity.HOUR)
.build())
.clustering(
Arrays.asList(
ClusteringColumn.builder().columnName("id").build(),
ClusteringColumn.builder()
.columnName("name")
.transform(
Transform.builder()
.transformType(Transform.TransformType.TRUNCATE)
.transformParams(Collections.singletonList("10"))
.build())
.build()))
.build();
}

public static CreateUpdateTableRequestBody buildCreateUpdateTableRequestBody(
GetTableResponseBody getTableResponseBody) {
return buildCreateUpdateTableRequestBody(getTableResponseBody, false);
17 changes: 7 additions & 10 deletions services/tables/src/test/resources/cluster-test-properties.yaml
@@ -7,20 +7,17 @@ cluster:
types:
hdfs:
rootpath: "/tmp/unittest"
endpoint: "hdfs://localhost:9000"
endpoint: "file:///"
parameters:
key1: value1
objectstore:
rootpath: "tmpbucket"
endpoint: "http://localhost:9000"
parameters:
key2: value2
token: xyz
local:
rootpath: "/tmp/unittest"
endpoint: "file:///"
storage-selector:
name: "DefaultStorageSelector"
name: "RegexStorageSelector"
parameters:
prop1: "value1"
prop2: "value2"
regex: local_db\.[a-zA-Z0-9_]+$
storage-type: local
housetables:
base-uri: "http://localhost:8080"
tables:
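
For readers unfamiliar with the selector change in the test config above, the sketch below illustrates the routing that `DualStorageTest` depends on: tables matching `local_db\.[a-zA-Z0-9_]+$` land on local storage, everything else on the default HDFS storage. It is not the upstream `RegexStorageSelector`; the class and method names are hypothetical, and only the regex and storage-type values come from `cluster-test-properties.yaml`.

```java
import java.util.regex.Pattern;

// Illustrative only: shows why DualStorageTest expects "db.table" on HDFS and
// "local_db.table" on local storage under the regex configured above. The
// upstream RegexStorageSelector interface and wiring are not reproduced here.
public final class RegexRoutingSketch {

  // Regex taken from cluster-test-properties.yaml in this PR.
  private static final Pattern LOCAL_TABLES = Pattern.compile("local_db\\.[a-zA-Z0-9_]+$");

  // Returns "local" for fully qualified table names matching the regex,
  // otherwise the cluster default "hdfs".
  static String selectStorageType(String databaseId, String tableId) {
    String fullyQualifiedName = databaseId + "." + tableId;
    return LOCAL_TABLES.matcher(fullyQualifiedName).matches() ? "local" : "hdfs";
  }

  public static void main(String[] args) {
    System.out.println(selectStorageType("local_db", "table")); // prints: local
    System.out.println(selectStorageType("db", "table"));       // prints: hdfs
  }
}
```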
