Mcc cdh5.10.1 #1

Status: Open. Wants to merge 17 commits into master (base). The diff below shows changes from 6 of those commits.
pom.xml (29 changes: 16 additions, 13 deletions)

@@ -15,84 +15,87 @@
     <maven.compiler.target>1.7</maven.compiler.target>
     <maven.compiler.source>1.7</maven.compiler.source>
+    <hadoop.version>2.6.0-cdh5.10.1</hadoop.version>

Review comment (Member): To avoid duplication, use the variable for CDH in this one, i.e. reference a single shared CDH version property (cdh.version, per the comment below) instead of repeating 5.10.1.

+    <hbase.version>1.2.0-cdh5.10.1</hbase.version>

Review comment (Member): Ditto.

+    <cloudera-manager.version>5.10.1</cloudera-manager.version>

Review comment (Member): Rename this one to cdh.version.

   </properties>

   <dependencies>
     <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-common</artifactId>
-      <version>2.6.0-cdh5.10.1</version>
+      <version>${hadoop.version}</version>
     </dependency>
     <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-common</artifactId>
-      <version>2.6.0-cdh5.10.1</version>
+      <version>${hadoop.version}</version>
       <type>test-jar</type>
       <classifier>tests</classifier>
     </dependency>
     <dependency>
       <groupId>org.apache.hbase</groupId>
       <artifactId>hbase-client</artifactId>
-      <version>1.2.0-cdh5.10.1</version>
+      <version>${hbase.version}</version>
     </dependency>
     <dependency>
       <groupId>org.apache.hbase</groupId>
       <artifactId>hbase-server</artifactId>
-      <version>1.2.0-cdh5.10.1</version>
+      <version>${hbase.version}</version>
       <type>test-jar</type>
     </dependency>
     <dependency>
       <groupId>org.apache.hbase</groupId>
       <artifactId>hbase-server</artifactId>
-      <version>1.2.0-cdh5.10.1</version>
+      <version>${hbase.version}</version>
     </dependency>
     <dependency>
       <groupId>org.apache.hbase</groupId>
       <artifactId>hbase-protocol</artifactId>
-      <version>1.2.0-cdh5.10.1</version>
+      <version>${hbase.version}</version>
     </dependency>
     <dependency>
       <groupId>org.apache.hbase</groupId>
       <artifactId>hbase-hadoop2-compat</artifactId>
-      <version>1.2.0-cdh5.10.1</version>
+      <version>${hbase.version}</version>
       <!-- <scope>runtime</scope> -->

Review comment (Member): Remove this comment.

     </dependency>
     <dependency>
       <groupId>org.apache.hbase</groupId>
       <artifactId>hbase-hadoop2-compat</artifactId>
-      <version>1.2.0-cdh5.10.1</version>
+      <version>${hbase.version}</version>
       <type>test-jar</type>
       <classifier>tests</classifier>
     </dependency>
     <dependency>
       <groupId>org.apache.hbase</groupId>
       <artifactId>hbase-common</artifactId>
-      <version>1.2.0-cdh5.10.1</version>
+      <version>${hbase.version}</version>
     </dependency>
     <dependency>
       <groupId>org.apache.hbase</groupId>
       <artifactId>hbase-common</artifactId>
-      <version>1.2.0-cdh5.10.1</version>
+      <version>${hbase.version}</version>
       <type>test-jar</type>
       <classifier>tests</classifier>
     </dependency>
     <dependency>
       <groupId>org.apache.hbase</groupId>
       <artifactId>hbase-hadoop-compat</artifactId>
-      <version>1.2.0-cdh5.10.1</version>
+      <version>${hbase.version}</version>
     </dependency>
     <dependency>
       <groupId>org.apache.hbase</groupId>
       <artifactId>hbase-hadoop-compat</artifactId>
-      <version>1.2.0-cdh5.10.1</version>
+      <version>${hbase.version}</version>
       <type>test-jar</type>
       <classifier>tests</classifier>
     </dependency>
     <dependency>
       <groupId>com.cloudera.api</groupId>
       <artifactId>cloudera-manager-api</artifactId>
-      <version>5.10.1</version>
+      <version>${cloudera-manager.version}</version>
     </dependency>
     <dependency>
       <groupId>org.apache.cxf</groupId>
HBaseAdminMultiCluster.java

@@ -230,7 +230,7 @@ public void createTableAsync(final HTableDescriptor desc,
     callArray[counter++] = new Callable<Void>() {
       @Override
       public Void call() throws Exception {
-        LOG.info("createTableAsync: " + desc.getName() + " for cluster: primary");
+        LOG.info("createTableAsync: " + desc.getTableName() + " for cluster: primary");
         HBaseAdminMultiCluster.super.createTableAsync(desc, splitKeys);
         return null;
       }

@@ -240,7 +240,7 @@ public Void call() throws Exception {
     callArray[counter++] = new Callable<Void>() {
       @Override
       public Void call() throws Exception {
-        LOG.info("createTableAsync: " + desc.getName() + " for cluster: " + entry.getKey());
+        LOG.info("createTableAsync: " + desc.getTableName() + " for cluster: " + entry.getKey());
         entry.getValue().createTableAsync(desc, splitKeys);
         return null;
       }
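Note: the getName() to getTableName() switch matters beyond deprecation. On the HBase 1.x API, HTableDescriptor.getName() returns the table name as a byte[], so concatenating it into the log message prints the array's identity hash rather than the name. A minimal sketch of the difference (class and table names illustrative):

    import org.apache.hadoop.hbase.HTableDescriptor;
    import org.apache.hadoop.hbase.TableName;

    public class TableNameLogDemo {
      public static void main(String[] args) {
        HTableDescriptor desc = new HTableDescriptor(TableName.valueOf("t1"));
        // byte[] concatenation prints something like "createTableAsync: [B@6d06d69c"
        System.out.println("createTableAsync: " + desc.getName());
        // TableName.toString() prints "createTableAsync: t1"
        System.out.println("createTableAsync: " + desc.getTableName());
      }
    }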
HBaseMultiClusterConfigUtil.java

@@ -199,13 +199,6 @@ public static Configuration combineConfigurations(Configuration primary, Configuration failover)
     return combineConfigurations(primary, map);
   }

-  public static Configuration combineConfigurations(Configuration primary, Configuration failover, Configuration failover2) {
-    Map<String, Configuration> map = new HashMap<String, Configuration>();
-    map.put("failover1", failover);
-    map.put("failover2", failover2);
-    return combineConfigurations(primary, map);
-  }
-
   // This method obtains the Hbase configuration from 2 clusters and combines
   // them
   public static Configuration combineConfigurations(String host1, String user1, String pwd1,
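Note: dropping the three-argument overload looks safe because the map-based overload already covers any number of failover clusters. A hypothetical caller sketch (assuming the enclosing class is HBaseMultiClusterConfigUtil; the configuration objects stand in for already-loaded cluster configs):

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.hadoop.conf.Configuration;

    public class CombineConfigsDemo {
      static Configuration combine(Configuration primaryConf,
                                   Configuration failoverConf1,
                                   Configuration failoverConf2) {
        // The map keys match the ones the removed overload used internally.
        Map<String, Configuration> failovers = new HashMap<String, Configuration>();
        failovers.put("failover1", failoverConf1);
        failovers.put("failover2", failoverConf2);
        return HBaseMultiClusterConfigUtil.combineConfigurations(primaryConf, failovers);
      }
    }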
HConnectionMultiCluster.java

@@ -185,22 +185,22 @@ public Table getTable(TableName tableName, ExecutorService pool)

   @Override
   public BufferedMutator getBufferedMutator(TableName tableName) throws IOException {
-    return null;
+    return primaryConnection.getBufferedMutator(tableName);
   }

   @Override
   public BufferedMutator getBufferedMutator(BufferedMutatorParams bufferedMutatorParams) throws IOException {
-    return null;
+    return primaryConnection.getBufferedMutator(bufferedMutatorParams);
   }

   @Override
   public RegionLocator getRegionLocator(TableName tableName) throws IOException {
-    return null;
+    return primaryConnection.getRegionLocator(tableName);
   }

   @Override
   public Admin getAdmin() throws IOException {
-    return null;
+    return primaryConnection.getAdmin();
   }

   public boolean isMasterRunning() throws MasterNotRunningException,
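Note: these four overrides previously returned null, so any caller that asked the multi-cluster connection for an Admin, RegionLocator, or BufferedMutator hit a NullPointerException at first use. With the delegation in place, a caller sketch like the following works (connection construction elided; `connection` is assumed to be the multi-cluster wrapper):

    import java.io.IOException;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Admin;
    import org.apache.hadoop.hbase.client.Connection;

    public class AdminDemo {
      static void listTables(Connection connection) throws IOException {
        // Served by the primary cluster's connection after this patch.
        try (Admin admin = connection.getAdmin()) {
          for (TableName name : admin.listTableNames()) {
            System.out.println(name.getNameAsString());
          }
        }
      }
    }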
@@ -255,9 +255,16 @@ public HTableDescriptor[] listTables() throws IOException {
   }

   @Deprecated
-  // public String[] getTableNames() throws IOException {
-  //   return primaryConnection.getAdmin().listTableNames().toString();
-  // }
+  public String[] getTableNames() throws IOException {
+    TableName[] tableNames = primaryConnection.getAdmin().listTableNames();
+    String[] tNames = new String[tableNames.length];
+    int i = 0;
+    for (TableName tn : tableNames) {
+      tNames[i] = tn.toString();
+      i++;
+    }
+    return tNames;
+  }

   public TableName[] listTableNames() throws IOException {
     return primaryConnection.getAdmin().listTableNames();
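Note: the commented-out body this replaces could never have worked: listTableNames() returns TableName[], and toString() on an array yields the array's identity string (a single String, not a String[]). The element-by-element copy is the fix. For reference, TableName renders the same text either way (values illustrative):

    TableName tn = TableName.valueOf("ns", "orders");
    String a = tn.toString();          // "ns:orders"
    String b = tn.getNameAsString();   // "ns:orders" (same text)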
@@ -454,17 +461,29 @@ public <R> void processBatchCallback(List<? extends Row> list,
   // return primaryConnection.getCurrentNrHRS();
   // }

-  // public HTableDescriptor[] getHTableDescriptorsByTableName(
-  //     List<TableName> tableNames) throws IOException {
-  //   return primaryConnection.getAdmin().getTableDescriptor(tableNames)
-  // }
-  //
-  // @Deprecated
-  // public
-  // HTableDescriptor[] getHTableDescriptors(List<String> tableNames)
-  //     throws IOException {
-  //   return primaryConnection.getAdmin().getTableDescriptor(tableNames);
-  // }
+  public HTableDescriptor[] getHTableDescriptorsByTableName(
+      List<TableName> tableNames) throws IOException {
+    HTableDescriptor[] tdArr = new HTableDescriptor[tableNames.size()];
+    int i = 0;
+    for (TableName tn : tableNames) {
+      tdArr[i] = primaryConnection.getAdmin().getTableDescriptor(tn);
+      i++;
+    }
+    return tdArr;
+  }
+
+  @Deprecated
+  public HTableDescriptor[] getHTableDescriptors(List<String> tableNames)
+      throws IOException {
+    HTableDescriptor[] tdArr = new HTableDescriptor[tableNames.size()];
+    int i = 0;
+    for (String tn : tableNames) {
+      tdArr[i] = primaryConnection.getAdmin().getTableDescriptor(TableName.valueOf(tn));
+      i++;
+    }
+    return tdArr;
+  }
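Note: two possible refinements here, both assuming the CDH 5.10 Admin API matches upstream HBase 1.2. First, the loops call primaryConnection.getAdmin() on every iteration and never close the returned Admin. Second, Admin already exposes a batch descriptor lookup, so the first method could presumably delegate directly. A hedged alternative sketch, not the patch author's code:

    public HTableDescriptor[] getHTableDescriptorsByTableName(List<TableName> tableNames)
        throws IOException {
      // One Admin, closed when done, and one batch lookup instead of a per-table call.
      try (Admin admin = primaryConnection.getAdmin()) {
        return admin.getTableDescriptorsByTableName(tableNames);
      }
    }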

   public boolean isClosed() {
     return primaryConnection.isClosed();
HTableMultiCluster.java

@@ -7,10 +7,7 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.*;
Review comment (Member): Please don't use * imports.
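Note: the wildcard presumably crept in because the patch now also needs CellUtil (used further down in this file) in addition to the four classes previously imported. If nothing else from the package is referenced, the explicit list the reviewer is asking for would be:

    import org.apache.hadoop.hbase.Cell;
    import org.apache.hadoop.hbase.CellUtil;
    import org.apache.hadoop.hbase.HConstants;
    import org.apache.hadoop.hbase.HTableDescriptor;
    import org.apache.hadoop.hbase.TableName;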

 import org.apache.hadoop.hbase.client.coprocessor.Batch.Call;
 import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback;
 import org.apache.hadoop.hbase.filter.CompareFilter;

@@ -106,7 +103,7 @@ public boolean exists(final Get get) throws IOException {

   @Override
   public boolean[] existsAll(List<Get> list) throws IOException {
-    return new boolean[0];
+    return primaryHTable.existsAll(list);
   }

   public Tuple<Boolean> multiClusterExists(final Get get) throws IOException {

@@ -130,31 +127,37 @@ public Boolean call(Table table) throws Exception {
     return new Tuple<Boolean>(result.isPrimary, doesExist);
   }

-  // public Boolean[] exists(final List<Get> gets) throws IOException {
-  //   return multiClusterExists(gets).getOriginalReturn();
-  // }
+  public Boolean[] exists(final List<Get> gets) throws IOException {
+    return multiClusterExists(gets).getOriginalReturn();
+  }

-  // public Tuple<Boolean[]> multiClusterExists(final List<Get> gets) throws IOException {
-  //   long startTime = System.currentTimeMillis();
-  //
-  //   HBaseTableFunction<Boolean[]> function = new HBaseTableFunction<Boolean[]>() {
-  //     @Override
-  //     public Boolean[] call(Table table) throws Exception {
-  //       return table.exists(gets);
-  //     }
-  //   };
-  //
-  //   SpeculativeRequester.ResultWrapper<Boolean[]> result = (new SpeculativeRequester<Boolean[]>(
-  //       waitTimeBeforeRequestingFailover, waitTimeBeforeAcceptingResults, lastPrimaryFail,
-  //       waitTimeFromLastPrimaryFail)).
-  //       request(function, primaryHTable, failoverHTables);
-  //
-  //   stats.addGetList(result.isPrimary, System.currentTimeMillis() - startTime);
-  //
-  //   Boolean[] doesExists = result.t;
-  //
-  //   return new Tuple<Boolean[]>(result.isPrimary, doesExists);
-  // }
+  public Tuple<Boolean[]> multiClusterExists(final List<Get> gets) throws IOException {
+    long startTime = System.currentTimeMillis();
+
+    HBaseTableFunction<Boolean[]> function = new HBaseTableFunction<Boolean[]>() {
+      Boolean[] bool = new Boolean[gets.size()];
+      int i = 0;
+      @Override
+      public Boolean[] call(Table table) throws Exception {
+        for (Get gt : gets) {
+          bool[i] = table.exists(gt);
+          i++;
+        }
+        return bool;
+      }
+    };
+
+    SpeculativeRequester.ResultWrapper<Boolean[]> result = (new SpeculativeRequester<Boolean[]>(
+        waitTimeBeforeRequestingFailover, waitTimeBeforeAcceptingResults, lastPrimaryFail,
+        waitTimeFromLastPrimaryFail)).
+        request(function, primaryHTable, failoverHTables);
+
+    stats.addGetList(result.isPrimary, System.currentTimeMillis() - startTime);
+
+    Boolean[] doesExists = result.t;
+
+    return new Tuple<Boolean[]>(result.isPrimary, doesExists);
+  }
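Note: one caveat with the restored implementation: `bool` and `i` are fields of the anonymous function instance, and the speculative requester appears to hand that same instance to both the primary and the failover tables. A second invocation would then resume with a stale index and run off the end of the array. A sketch that keeps the state local to call() (same result, per-invocation state):

    HBaseTableFunction<Boolean[]> function = new HBaseTableFunction<Boolean[]>() {
      @Override
      public Boolean[] call(Table table) throws Exception {
        // Fresh array and index per invocation, so speculative calls against
        // different tables cannot interfere with one another.
        Boolean[] exists = new Boolean[gets.size()];
        int i = 0;
        for (Get gt : gets) {
          exists[i++] = table.exists(gt);
        }
        return exists;
      }
    };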

   public void batch(final List<? extends Row> actions, final Object[] results)
       throws IOException, InterruptedException {
@@ -241,10 +244,7 @@ public Tuple<Result> multiClusterGetRowOrBefore(final byte[] row, final byte[] family)
     HBaseTableFunction<Result> function = new HBaseTableFunction<Result>() {
       @Override
       public Result call(Table table) throws Exception {
-
         return (Result)table.getScanner(row, family);
-
-        //return table.getRowOrBefore(row, family);
       }
     };
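Note: the context line `return (Result)table.getScanner(row, family);` is untouched by this hunk but looks suspect: Table.getScanner(byte[], byte[]) is the (family, qualifier) overload and returns a ResultScanner, so the cast to Result fails at runtime. The usual substitute for the removed getRowOrBefore is a short reversed scan; a hedged sketch:

    // Emulates "closest row at or before `row`", the old getRowOrBefore contract.
    Scan scan = new Scan(row);   // a reversed scan starts at `row`, inclusive
    scan.setReversed(true);
    scan.addFamily(family);
    scan.setCaching(1);
    try (ResultScanner scanner = table.getScanner(scan)) {
      return scanner.next();     // first row <= `row`, or null if none exists
    }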

@@ -406,7 +406,7 @@ private Put setTimeStampOfUnsetValues(final Put put, long ts)
       // This will protect us from a multi-cluster submission
       if (cell.getTimestamp() == HConstants.LATEST_TIMESTAMP) {
         newPut
-            .addColumn(cell.getFamily(), cell.getQualifier(), ts, cell.getValue());
+            .addColumn(CellUtil.cloneFamily(cell), CellUtil.cloneQualifier(cell), ts, CellUtil.cloneValue(cell));
       } else {
         newPut.add(cell);
       }
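Note: on the replacement methods: Cell#getFamily/getQualifier/getValue were deprecated because each call implies a fresh copy out of the cell's backing array, and they disappear from later Cell implementations; CellUtil makes that copy explicit. Roughly, the clone is equivalent to the following (sketch, using java.util.Arrays and the Cell offset/length accessors):

    // What CellUtil.cloneFamily(cell) does, in effect:
    byte[] family = Arrays.copyOfRange(
        cell.getFamilyArray(),
        cell.getFamilyOffset(),
        cell.getFamilyOffset() + cell.getFamilyLength());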
@@ -472,7 +472,7 @@ public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier,

   @Override
   public boolean checkAndPut(byte[] bytes, byte[] bytes1, byte[] bytes2, CompareFilter.CompareOp compareOp, byte[] bytes3, Put put) throws IOException {
-    return false;
+    return primaryHTable.checkAndPut(bytes, bytes1, bytes2, compareOp, bytes3, put);
   }

   public void delete(final Delete delete) throws IOException {

@@ -532,7 +532,7 @@ public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier,

   @Override
   public boolean checkAndDelete(byte[] bytes, byte[] bytes1, byte[] bytes2, CompareFilter.CompareOp compareOp, byte[] bytes3, Delete delete) throws IOException {
-    return false;
+    return primaryHTable.checkAndDelete(bytes, bytes1, bytes2, compareOp, bytes3, delete);
   }

   public void mutateRow(final RowMutations rm) throws IOException {
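Note: with the stubs replaced, the CompareOp variants of checkAndPut and checkAndDelete behave like their single-cluster counterparts, executed against the primary table. A minimal usage sketch (row, family, qualifier, and values are illustrative):

    Put put = new Put(Bytes.toBytes("row1"));
    put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("q"), Bytes.toBytes("new"));
    // Applies the Put only if cf:q currently equals "old" on the primary cluster.
    boolean applied = table.checkAndPut(
        Bytes.toBytes("row1"), Bytes.toBytes("cf"), Bytes.toBytes("q"),
        CompareFilter.CompareOp.EQUAL, Bytes.toBytes("old"), put);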