diff --git a/pom.xml b/pom.xml index 98c17d6..56e9627 100644 --- a/pom.xml +++ b/pom.xml @@ -15,6 +15,10 @@ 1.7 1.7 + 5.10.1 + 2.6.0-cdh${cdh.version} + 1.2.0-cdh${cdh.version} + @@ -22,77 +26,77 @@ org.apache.hadoop hadoop-common - 2.5.0-cdh5.3.0 + ${hadoop.version} org.apache.hadoop hadoop-common - 2.5.0-cdh5.3.0 + ${hadoop.version} test-jar tests org.apache.hbase hbase-client - 0.98.6-cdh5.3.0 + ${hbase.version} org.apache.hbase hbase-server - 0.98.6-cdh5.3.0 + ${hbase.version} test-jar org.apache.hbase hbase-server - 0.98.6-cdh5.3.0 + ${hbase.version} org.apache.hbase hbase-protocol - 0.98.6-cdh5.3.0 + ${hbase.version} org.apache.hbase hbase-hadoop2-compat - 0.98.6-cdh5.3.0 + ${hbase.version} org.apache.hbase hbase-hadoop2-compat - 0.98.6-cdh5.3.0 + ${hbase.version} test-jar tests org.apache.hbase hbase-common - 0.98.6-cdh5.3.0 + ${hbase.version} org.apache.hbase hbase-common - 0.98.6-cdh5.3.0 + ${hbase.version} test-jar tests org.apache.hbase hbase-hadoop-compat - 0.98.6-cdh5.3.0 + ${hbase.version} org.apache.hbase hbase-hadoop-compat - 0.98.6-cdh5.3.0 + ${hbase.version} test-jar tests com.cloudera.api cloudera-manager-api - 5.3.1 + ${cdh.version} org.apache.cxf diff --git a/src/main/java/org/apache/hadoop/hbase/client/HBaseAdminMultiCluster.java b/src/main/java/org/apache/hadoop/hbase/client/HBaseAdminMultiCluster.java index d5d2f7e..f238794 100644 --- a/src/main/java/org/apache/hadoop/hbase/client/HBaseAdminMultiCluster.java +++ b/src/main/java/org/apache/hadoop/hbase/client/HBaseAdminMultiCluster.java @@ -21,26 +21,8 @@ public class HBaseAdminMultiCluster extends HBaseAdmin { Map failoverAdminMap = new HashMap(); - public HBaseAdminMultiCluster(Configuration c) - throws MasterNotRunningException, ZooKeeperConnectionException, - IOException { - super(HBaseMultiClusterConfigUtil.splitMultiConfigFile(c).get( - HBaseMultiClusterConfigUtil.PRIMARY_NAME)); - - Map configs = HBaseMultiClusterConfigUtil - .splitMultiConfigFile(c); - - for (Entry entry : configs.entrySet()) { - - if (!entry.getKey().equals(HBaseMultiClusterConfigUtil.PRIMARY_NAME)) { - HBaseAdmin admin = new HBaseAdmin(entry.getValue()); - LOG.info("creating HBaseAdmin for : " + entry.getKey()); - failoverAdminMap.put(entry.getKey(), admin); - LOG.info(" - successfully creating HBaseAdmin for : " + entry.getKey()); - } - } - LOG.info("Successful loaded all HBaseAdmins"); - + public HBaseAdminMultiCluster(ClusterConnection connection) { + super(connection); } @Override @@ -230,7 +212,7 @@ public void createTableAsync(final HTableDescriptor desc, callArray[counter++] = new Callable() { @Override public Void call() throws Exception { - LOG.info("createTableAsync: " + desc.getName() + " for cluster: primary"); + LOG.info("createTableAsync: " + desc.getTableName() + " for cluster: primary"); HBaseAdminMultiCluster.super.createTableAsync(desc, splitKeys); return null; } @@ -240,7 +222,7 @@ public Void call() throws Exception { callArray[counter++] = new Callable() { @Override public Void call() throws Exception { - LOG.info("createTableAsync: " + desc.getName() + " for cluster: " + entry.getKey()); + LOG.info("createTableAsync: " + desc.getTableName() + " for cluster: " + entry.getKey()); entry.getValue().createTableAsync(desc, splitKeys); return null; } diff --git a/src/main/java/org/apache/hadoop/hbase/client/HBaseTableFunction.java b/src/main/java/org/apache/hadoop/hbase/client/HBaseTableFunction.java index e0c3a8f..d2c2b53 100644 --- a/src/main/java/org/apache/hadoop/hbase/client/HBaseTableFunction.java +++ b/src/main/java/org/apache/hadoop/hbase/client/HBaseTableFunction.java @@ -3,5 +3,5 @@ import org.apache.hadoop.hbase.client.HTableInterface; public interface HBaseTableFunction { - public T call(HTableInterface table) throws Exception; + public T call(Table table) throws Exception; } diff --git a/src/main/java/org/apache/hadoop/hbase/client/HConnectionManagerMultiClusterWrapper.java b/src/main/java/org/apache/hadoop/hbase/client/HConnectionManagerMultiClusterWrapper.java index f8fe1c0..5ef01ea 100644 --- a/src/main/java/org/apache/hadoop/hbase/client/HConnectionManagerMultiClusterWrapper.java +++ b/src/main/java/org/apache/hadoop/hbase/client/HConnectionManagerMultiClusterWrapper.java @@ -11,7 +11,7 @@ public class HConnectionManagerMultiClusterWrapper { - public static HConnection createConnection(Configuration conf) + public static Connection createConnection(Configuration conf) throws IOException { Logger LOG = Logger.getLogger(HConnectionManagerMultiClusterWrapper.class); @@ -21,31 +21,31 @@ public static HConnection createConnection(Configuration conf) if (failoverClusters.size() == 0) { LOG.info(" -- Getting a signle cluster connection !!"); - return HConnectionManager.createConnection(conf); + return ConnectionFactory.createConnection(conf); } else { Map configMap = HBaseMultiClusterConfigUtil .splitMultiConfigFile(conf); LOG.info(" -- Getting primary Connction"); - HConnection primaryConnection = HConnectionManager + Connection primaryConnection = ConnectionFactory .createConnection(configMap .get(HBaseMultiClusterConfigUtil.PRIMARY_NAME)); LOG.info(" --- Got primary Connction"); - ArrayList failoverConnections = new ArrayList(); + ArrayList failoverConnections = new ArrayList(); for (Entry entry : configMap.entrySet()) { if (!entry.getKey().equals(HBaseMultiClusterConfigUtil.PRIMARY_NAME)) { LOG.info(" -- Getting failure Connction"); - failoverConnections.add(HConnectionManager.createConnection(entry + failoverConnections.add(ConnectionFactory.createConnection(entry .getValue())); LOG.info(" --- Got failover Connction"); } } return new HConnectionMultiCluster(conf, primaryConnection, - failoverConnections.toArray(new HConnection[0])); + failoverConnections.toArray(new Connection[0])); } } } diff --git a/src/main/java/org/apache/hadoop/hbase/client/HConnectionMultiCluster.java b/src/main/java/org/apache/hadoop/hbase/client/HConnectionMultiCluster.java index bbd9cb8..4a7c63a 100644 --- a/src/main/java/org/apache/hadoop/hbase/client/HConnectionMultiCluster.java +++ b/src/main/java/org/apache/hadoop/hbase/client/HConnectionMultiCluster.java @@ -14,10 +14,10 @@ import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.MasterService.BlockingInterface; import org.apache.hadoop.hbase.util.Bytes; -public class HConnectionMultiCluster implements HConnection { +public class HConnectionMultiCluster implements Connection { - HConnection primaryConnection; - HConnection[] failoverConnections; + Connection primaryConnection; + Connection[] failoverConnections; Configuration originalConfiguration; boolean isMasterMaster; int waitTimeBeforeAcceptingResults; @@ -34,7 +34,7 @@ public class HConnectionMultiCluster implements HConnection { ExecutorService executor; public HConnectionMultiCluster(Configuration originalConfiguration, - HConnection primaryConnection, HConnection[] failoverConnections) { + Connection primaryConnection, Connection[] failoverConnections) { this.primaryConnection = primaryConnection; this.failoverConnections = failoverConnections; this.originalConfiguration = originalConfiguration; @@ -78,7 +78,7 @@ public HConnectionMultiCluster(Configuration originalConfiguration, public void abort(String why, Throwable e) { primaryConnection.abort(why, e); - for (HConnection failOverConnection : failoverConnections) { + for (Connection failOverConnection : failoverConnections) { failOverConnection.abort(why, e); } } @@ -96,7 +96,7 @@ public void close() throws IOException { LOG.error("Exception while closing primary", e); lastException = e; } - for (HConnection failOverConnection : failoverConnections) { + for (Connection failOverConnection : failoverConnections) { try { failOverConnection.close(); } catch (Exception e) { @@ -113,29 +113,29 @@ public Configuration getConfiguration() { return originalConfiguration; } - @Override - public HTableInterface getTable(String tableName) throws IOException { + + public Table getTable(String tableName) throws IOException { return this.getTable(Bytes.toBytes(tableName)); } - @Override - public HTableInterface getTable(byte[] tableName) throws IOException { + + public Table getTable(byte[] tableName) throws IOException { return this.getTable(TableName.valueOf(tableName)); } - @Override - public HTableInterface getTable(TableName tableName) throws IOException { + + public Table getTable(TableName tableName) throws IOException { LOG.info(" -- getting primaryHTable" + primaryConnection.getConfiguration().get("hbase.zookeeper.quorum")); - HTableInterface primaryHTable = primaryConnection.getTable(tableName); - primaryHTable.setAutoFlush(true, true); + Table primaryHTable = primaryConnection.getTable(tableName); + primaryConnection.getBufferedMutator(tableName).flush(); LOG.info(" --- got primaryHTable"); - ArrayList failoverHTables = new ArrayList(); - for (HConnection failOverConnection : failoverConnections) { + ArrayList failoverHTables = new ArrayList
(); + for (Connection failOverConnection : failoverConnections) { LOG.info(" -- getting failoverHTable:" + failOverConnection.getConfiguration().get("hbase.zookeeper.quorum")); - HTableInterface htable = failOverConnection.getTable(tableName); - htable.setAutoFlush(true, true); + Table htable = failOverConnection.getTable(tableName); + primaryConnection.getBufferedMutator(tableName).flush(); failoverHTables.add(htable); LOG.info(" --- got failoverHTable"); @@ -153,21 +153,21 @@ public HTableInterface getTable(TableName tableName) throws IOException { waitTimeFromLastPrimaryFail); } - public HTableInterface getTable(String tableName, ExecutorService pool) + public Table getTable(String tableName, ExecutorService pool) throws IOException { return this.getTable(TableName.valueOf(tableName), pool); } - public HTableInterface getTable(byte[] tableName, ExecutorService pool) + public Table getTable(byte[] tableName, ExecutorService pool) throws IOException { return this.getTable(TableName.valueOf(tableName), pool); } - public HTableInterface getTable(TableName tableName, ExecutorService pool) + public Table getTable(TableName tableName, ExecutorService pool) throws IOException { - HTableInterface primaryHTable = primaryConnection.getTable(tableName, pool); - ArrayList failoverHTables = new ArrayList(); - for (HConnection failOverConnection : failoverConnections) { + Table primaryHTable = primaryConnection.getTable(tableName, pool); + ArrayList
failoverHTables = new ArrayList
(); + for (Connection failOverConnection : failoverConnections) { failoverHTables.add(failOverConnection.getTable(tableName, pool)); } @@ -183,193 +183,117 @@ public HTableInterface getTable(TableName tableName, ExecutorService pool) waitTimeFromLastPrimaryFail); } + @Override + public BufferedMutator getBufferedMutator(TableName tableName) throws IOException { + return primaryConnection.getBufferedMutator(tableName); + } + + @Override + public BufferedMutator getBufferedMutator(BufferedMutatorParams bufferedMutatorParams) throws IOException { + return primaryConnection.getBufferedMutator(bufferedMutatorParams); + } + + @Override + public RegionLocator getRegionLocator(TableName tableName) throws IOException { + return primaryConnection.getRegionLocator(tableName); + } + + @Override + public Admin getAdmin() throws IOException { + return primaryConnection.getAdmin(); + } + public boolean isMasterRunning() throws MasterNotRunningException, - ZooKeeperConnectionException { - return primaryConnection.isMasterRunning(); + ZooKeeperConnectionException, IOException { + return primaryConnection.getAdmin().getClusterStatus().getMaster() != null; } - + public boolean isTableEnabled(TableName tableName) throws IOException { - return primaryConnection.isTableEnabled(tableName); + return primaryConnection.getAdmin().isTableEnabled(tableName); } @Deprecated public boolean isTableEnabled(byte[] tableName) throws IOException { - return primaryConnection.isTableEnabled(tableName); + return primaryConnection.getAdmin().isTableEnabled(TableName.valueOf(tableName)); } public boolean isTableDisabled(TableName tableName) throws IOException { - return primaryConnection.isTableDisabled(tableName); + return primaryConnection.getAdmin().isTableDisabled(tableName); } @Deprecated public boolean isTableDisabled(byte[] tableName) throws IOException { - return primaryConnection.isTableDisabled(tableName); + return primaryConnection.getAdmin().isTableDisabled(TableName.valueOf(tableName)); } public boolean isTableAvailable(TableName tableName) throws IOException { - return primaryConnection.isTableAvailable(tableName); + return primaryConnection.getAdmin().isTableAvailable(tableName); } @Deprecated public boolean isTableAvailable(byte[] tableName) throws IOException { - return primaryConnection.isTableAvailable(tableName); + return primaryConnection.getAdmin().isTableAvailable(TableName.valueOf(tableName)); } public boolean isTableAvailable(TableName tableName, byte[][] splitKeys) throws IOException { - return primaryConnection.isTableAvailable(tableName, splitKeys); + return primaryConnection.getAdmin().isTableAvailable(tableName, splitKeys); } @Deprecated public boolean isTableAvailable(byte[] tableName, byte[][] splitKeys) throws IOException { - return primaryConnection.isTableAvailable(tableName, splitKeys); + return primaryConnection.getAdmin().isTableAvailable(TableName.valueOf(tableName), splitKeys); } public HTableDescriptor[] listTables() throws IOException { - return primaryConnection.listTables(); + return primaryConnection.getAdmin().listTables(); } @Deprecated public String[] getTableNames() throws IOException { - return primaryConnection.getTableNames(); + TableName[] tableNames = primaryConnection.getAdmin().listTableNames(); + String[] tNames = new String[tableNames.length]; + int i =0; + for (TableName tn: tableNames) { + tNames[i] = tn.toString(); + i++; + } + return tNames; } public TableName[] listTableNames() throws IOException { - return primaryConnection.listTableNames(); + return primaryConnection.getAdmin().listTableNames(); } public HTableDescriptor getHTableDescriptor(TableName tableName) throws IOException { - return primaryConnection.getHTableDescriptor(tableName); + return primaryConnection.getAdmin().getTableDescriptor(tableName); } @Deprecated - public - HTableDescriptor getHTableDescriptor(byte[] tableName) throws IOException { - return primaryConnection.getHTableDescriptor(tableName); - } - - public HRegionLocation locateRegion(TableName tableName, byte[] row) - throws IOException { - return primaryConnection.locateRegion(tableName, row); - } - - @Deprecated - public HRegionLocation locateRegion(byte[] tableName, byte[] row) - throws IOException { - return primaryConnection.locateRegion(tableName, row); - } - - public void clearRegionCache() { - primaryConnection.clearRegionCache(); - } - - public void clearRegionCache(TableName tableName) { - primaryConnection.clearRegionCache(tableName); - } - - @Deprecated - public - void clearRegionCache(byte[] tableName) { - primaryConnection.clearRegionCache(tableName); - - } - - public void deleteCachedRegionLocation(HRegionLocation location) { - primaryConnection.deleteCachedRegionLocation(location); - } - - public HRegionLocation relocateRegion(TableName tableName, byte[] row) - throws IOException { - return primaryConnection.relocateRegion(tableName, row); - } - - @Deprecated - public - HRegionLocation relocateRegion(byte[] tableName, byte[] row) - throws IOException { - return primaryConnection.relocateRegion(tableName, row); - } - - public void updateCachedLocations(TableName tableName, byte[] rowkey, - Object exception, HRegionLocation source) { - primaryConnection.updateCachedLocations(tableName, rowkey, exception, source); - } - - @Deprecated - public - void updateCachedLocations(byte[] tableName, byte[] rowkey, Object exception, - HRegionLocation source) { - primaryConnection.updateCachedLocations(tableName, rowkey, exception, source); - } - - public HRegionLocation locateRegion(byte[] regionName) throws IOException { - return primaryConnection.locateRegion(regionName); - } - - public List locateRegions(TableName tableName) - throws IOException { - return primaryConnection.locateRegions(tableName); - } - - @Deprecated - public - List locateRegions(byte[] tableName) throws IOException { - return (List) primaryConnection.locateRegion(tableName); - } - - public List locateRegions(TableName tableName, - boolean useCache, boolean offlined) throws IOException { - return (List) primaryConnection.locateRegions(tableName, - useCache, offlined); - } - - @Deprecated - public List locateRegions(byte[] tableName, - boolean useCache, boolean offlined) throws IOException { - return (List) primaryConnection.locateRegions(tableName, - useCache, offlined); - } - - public BlockingInterface getMaster() throws IOException { - return primaryConnection.getMaster(); - } - - public org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService.BlockingInterface getAdmin( - ServerName serverName) throws IOException { - return primaryConnection.getAdmin(serverName); - } - - public org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService.BlockingInterface getClient( - ServerName serverName) throws IOException { - return primaryConnection.getClient(serverName); - } - - public org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService.BlockingInterface getAdmin( - ServerName serverName, boolean getMaster) throws IOException { - return primaryConnection.getAdmin(serverName); + public HTableDescriptor getHTableDescriptor(byte[] tableName) throws IOException { + return primaryConnection.getAdmin().getTableDescriptor(TableName.valueOf(tableName)); } public HRegionLocation getRegionLocation(TableName tableName, byte[] row, boolean reload) throws IOException { - return primaryConnection.getRegionLocation(tableName, row, reload); + return primaryConnection.getRegionLocator(tableName).getRegionLocation(row, reload); } @Deprecated public HRegionLocation getRegionLocation(byte[] tableName, byte[] row, boolean reload) throws IOException { - return primaryConnection.getRegionLocation(tableName, row, reload); + return primaryConnection.getRegionLocator(TableName.valueOf(tableName)).getRegionLocation(row, reload); } @Deprecated - public - void processBatch(List actions, TableName tableName, + public void processBatch(List actions, TableName tableName, ExecutorService pool, Object[] results) throws IOException, InterruptedException { throw new RuntimeException("processBatch not supported in " + this.getClass()); @@ -381,108 +305,52 @@ void processBatch(List actions, TableName tableName, void processBatch(List actions, byte[] tableName, ExecutorService pool, Object[] results) throws IOException, InterruptedException { - primaryConnection.processBatch(actions, tableName, pool, results); + primaryConnection.getTable(TableName.valueOf(tableName)).batch(actions, results); } @Deprecated public void processBatchCallback(List list, TableName tableName, ExecutorService pool, Object[] results, Callback callback) throws IOException, InterruptedException { - primaryConnection.processBatchCallback(list, tableName, pool, results, callback); + primaryConnection.getTable(tableName).batchCallback(list, results, callback); } @Deprecated public void processBatchCallback(List list, byte[] tableName, ExecutorService pool, Object[] results, Callback callback) throws IOException, InterruptedException { - primaryConnection.processBatchCallback(list, tableName, pool, results, callback); - - } - - public void setRegionCachePrefetch(TableName tableName, boolean enable) { - RuntimeException lastException = null; - try { - primaryConnection.setRegionCachePrefetch(tableName, enable); - } catch (RuntimeException e) { - LOG.error("Exception while closing primary", e); - lastException = e; - } - for (HConnection failOverConnection : failoverConnections) { - try { - failOverConnection.setRegionCachePrefetch(tableName, enable); - } catch (RuntimeException e) { - LOG.error("Exception while closing failOverConnection", e); - lastException = e; - } - } - if (lastException != null) { - throw lastException; - } - - } - - public void setRegionCachePrefetch(byte[] tableName, boolean enable) { - this.setRegionCachePrefetch(TableName.valueOf(tableName), enable); - } - - public boolean getRegionCachePrefetch(TableName tableName) { - return this.getRegionCachePrefetch(tableName); - } + primaryConnection.getTable(TableName.valueOf(tableName)).batchCallback(list, results, callback); - public boolean getRegionCachePrefetch(byte[] tableName) { - return this.getRegionCachePrefetch(TableName.valueOf(tableName)); - } - - public int getCurrentNrHRS() throws IOException { - return primaryConnection.getCurrentNrHRS(); } public HTableDescriptor[] getHTableDescriptorsByTableName( List tableNames) throws IOException { - return primaryConnection.getHTableDescriptorsByTableName(tableNames); + HTableDescriptor tdArr[] = new HTableDescriptor[tableNames.size()]; + int i=0; + for (TableName tn: tableNames) { + tdArr[i] = primaryConnection.getAdmin().getTableDescriptor(tn); + i++; + } + return tdArr; } @Deprecated public HTableDescriptor[] getHTableDescriptors(List tableNames) throws IOException { - return primaryConnection.getHTableDescriptors(tableNames); + HTableDescriptor tdArr[] = new HTableDescriptor[tableNames.size()]; + int i=0; + for (String tn: tableNames) { + tdArr[i] = primaryConnection.getAdmin().getTableDescriptor(TableName.valueOf(tn)); + i++; + } + return tdArr; } public boolean isClosed() { return primaryConnection.isClosed(); } - public void clearCaches(ServerName sn) { - RuntimeException lastException = null; - try { - primaryConnection.clearCaches(sn); - } catch (RuntimeException e) { - LOG.error("Exception while closing primary", e); - lastException = e; - } - for (HConnection failOverConnection : failoverConnections) { - try { - failOverConnection.clearCaches(sn); - } catch (RuntimeException e) { - LOG.error("Exception while closing failOverConnection", e); - lastException = e; - } - } - if (lastException != null) { - throw lastException; - } - } - - public boolean isDeadServer(ServerName serverName) { - return primaryConnection.isDeadServer(serverName); - } - - public NonceGenerator getNonceGenerator() { - return primaryConnection.getNonceGenerator(); - } - - @Deprecated public MasterKeepAliveConnection getKeepAliveMasterService() diff --git a/src/main/java/org/apache/hadoop/hbase/client/HTableMultiCluster.java b/src/main/java/org/apache/hadoop/hbase/client/HTableMultiCluster.java index 6ba0518..4d87608 100644 --- a/src/main/java/org/apache/hadoop/hbase/client/HTableMultiCluster.java +++ b/src/main/java/org/apache/hadoop/hbase/client/HTableMultiCluster.java @@ -7,10 +7,11 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.client.coprocessor.Batch.Call; import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback; import org.apache.hadoop.hbase.filter.CompareFilter; @@ -25,10 +26,10 @@ import java.util.Map.Entry; import java.util.concurrent.atomic.AtomicLong; -public class HTableMultiCluster implements HTableInterface { +public class HTableMultiCluster implements Table { - HTableInterface primaryHTable; - Collection failoverHTables; + Table primaryHTable; + Collection
failoverHTables; Configuration originalConfiguration; boolean isMasterMaster; int waitTimeBeforeAcceptingResults; @@ -56,8 +57,8 @@ public HTableStats getStats() { } public HTableMultiCluster(Configuration originalConfiguration, - HTableInterface primaryHTable, - Collection failoverHTables, boolean isMasterMaster, + Table primaryHTable, + Collection
failoverHTables, boolean isMasterMaster, int waitTimeBeforeAcceptingResults, int waitTimeBeforeRequestingFailover, int waitTimeBeforeMutatingFailover, int waitTimeBeforeMutatingFailoverWithPrimaryException, @@ -84,7 +85,7 @@ public HTableMultiCluster(Configuration originalConfiguration, } public byte[] getTableName() { - return primaryHTable.getTableName(); + return primaryHTable.getName().toBytes(); } public TableName getName() { @@ -104,12 +105,17 @@ public boolean exists(final Get get) throws IOException { return multiClusterExists(get).getOriginalReturn(); } + @Override + public boolean[] existsAll(List list) throws IOException { + return primaryHTable.existsAll(list); + } + public Tuple multiClusterExists(final Get get) throws IOException { long startTime = System.currentTimeMillis(); HBaseTableFunction function = new HBaseTableFunction() { @Override - public Boolean call(HTableInterface table) throws Exception { + public Boolean call(Table table) throws Exception { return table.exists(get); } }; @@ -133,9 +139,15 @@ public Tuple multiClusterExists(final List gets) throws IOExcept long startTime = System.currentTimeMillis(); HBaseTableFunction function = new HBaseTableFunction() { + Boolean[] bool = new Boolean[gets.size()]; + int i=0; @Override - public Boolean[] call(HTableInterface table) throws Exception { - return table.exists(gets); + public Boolean[] call(Table table) throws Exception { + for (Get gt: gets) { + bool[i]= table.exists(gt); + i++; + } + return bool; } }; @@ -182,7 +194,7 @@ public Tuple multiClusterGet(final Get get) throws IOException { HBaseTableFunction function = new HBaseTableFunction() { @Override - public Result call(HTableInterface table) throws Exception { + public Result call(Table table) throws Exception { return table.get(get); } }; @@ -207,7 +219,7 @@ public Tuple multiClusterGet(final List gets) throws IOException HBaseTableFunction function = new HBaseTableFunction() { @Override - public Result[] call(HTableInterface table) throws Exception { + public Result[] call(Table table) throws Exception { return table.get(gets); } }; @@ -235,8 +247,8 @@ public Tuple multiClusterGetRowOrBefore(final byte[] row, final byte[] f HBaseTableFunction function = new HBaseTableFunction() { @Override - public Result call(HTableInterface table) throws Exception { - return table.getRowOrBefore(row, family); + public Result call(Table table) throws Exception { + return (Result)table.getScanner(row, family); } }; @@ -261,7 +273,7 @@ public Tuple multiClusterGetScanner(final Scan scan) throws IOExc HBaseTableFunction function = new HBaseTableFunction() { @Override - public ResultScanner call(HTableInterface table) throws Exception { + public ResultScanner call(Table table) throws Exception { return table.getScanner(scan); } }; @@ -287,7 +299,7 @@ public Tuple multiClusterGetScanner(final byte[] family) throws I HBaseTableFunction function = new HBaseTableFunction() { @Override - public ResultScanner call(HTableInterface table) throws Exception { + public ResultScanner call(Table table) throws Exception { return table.getScanner(family); } }; @@ -314,7 +326,7 @@ public Tuple multiClusterGetScanner(final byte[] family, final by HBaseTableFunction function = new HBaseTableFunction() { @Override - public ResultScanner call(HTableInterface table) throws Exception { + public ResultScanner call(Table table) throws Exception { return table.getScanner(family, qualifier); } }; @@ -360,7 +372,7 @@ private Boolean autoFlushMutliClusterPut(final Put put) throws IOException { HBaseTableFunction function = new HBaseTableFunction() { @Override - public Void call(HTableInterface table) throws Exception { + public Void call(Table table) throws Exception { synchronized (table) { System.out.println("table.put.start:" + table.getConfiguration().get("hbase.zookeeper.quorum") + " " + table.getName() + " " + Bytes.toString(newPut.getRow())); try { @@ -398,7 +410,7 @@ private Put setTimeStampOfUnsetValues(final Put put, long ts) // This will protect us from a multicluster sumbission if (cell.getTimestamp() == HConstants.LATEST_TIMESTAMP) { newPut - .add(cell.getFamily(), cell.getQualifier(), ts, cell.getValue()); + .addColumn(CellUtil.cloneFamily(cell), CellUtil.cloneQualifier(cell), ts, CellUtil.cloneValue(cell)); } else { newPut.add(cell); } @@ -440,7 +452,7 @@ public Boolean autoFlushMutliClusterPut(final List puts) throws IOException HBaseTableFunction function = new HBaseTableFunction() { @Override - public Void call(HTableInterface table) throws Exception { + public Void call(Table table) throws Exception { table.put(newPuts); return null; } @@ -462,6 +474,11 @@ public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier, return primaryHTable.checkAndPut(row, family, qualifier, value, put); } + @Override + public boolean checkAndPut(byte[] bytes, byte[] bytes1, byte[] bytes2, CompareFilter.CompareOp compareOp, byte[] bytes3, Put put) throws IOException { + return primaryHTable.checkAndPut(bytes, bytes1, bytes2, compareOp, bytes3, put); + } + public void delete(final Delete delete) throws IOException { multiClusterDelete(delete); } @@ -471,7 +488,7 @@ public Boolean multiClusterDelete(final Delete delete) throws IOException { HBaseTableFunction function = new HBaseTableFunction() { @Override - public Void call(HTableInterface table) throws Exception { + public Void call(Table table) throws Exception { table.delete(delete); return null; } @@ -496,7 +513,7 @@ public Boolean multiClusterDelete(final List deletes) throws IOException HBaseTableFunction function = new HBaseTableFunction() { @Override - public Void call(HTableInterface table) throws Exception { + public Void call(Table table) throws Exception { table.delete(deletes); return null; } @@ -517,6 +534,11 @@ public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier, return primaryHTable.checkAndDelete(row, family, qualifier, value, delete); } + @Override + public boolean checkAndDelete(byte[] bytes, byte[] bytes1, byte[] bytes2, CompareFilter.CompareOp compareOp, byte[] bytes3, Delete delete) throws IOException { + return primaryHTable.checkAndDelete(bytes, bytes1, bytes2, compareOp, bytes3, delete); + } + public void mutateRow(final RowMutations rm) throws IOException { primaryHTable.mutateRow(rm); @@ -541,20 +563,6 @@ public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, durability); } - @Deprecated - public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, - long amount, boolean writeToWAL) throws IOException { - return primaryHTable.incrementColumnValue(row, family, qualifier, amount, - writeToWAL); - } - - public boolean isAutoFlush() { - - boolean primaryAnswer = primaryHTable.isAutoFlush(); - - return primaryAnswer; - } - public void flushCommits() throws IOException { if (bufferPutList.size() > 0) { autoFlushMutliClusterPut(bufferPutList); @@ -580,7 +588,7 @@ public void close() throws IOException { LOG.error("Exception while flushCommits primary", e); lastException = e; } - for (final HTableInterface failoverTable : failoverHTables) { + for (final Table failoverTable : failoverHTables) { try { synchronized (failoverTable) { failoverTable.close(); diff --git a/src/main/java/org/apache/hadoop/hbase/client/SpeculativeMutater.java b/src/main/java/org/apache/hadoop/hbase/client/SpeculativeMutater.java index 18f873a..ec155f5 100644 --- a/src/main/java/org/apache/hadoop/hbase/client/SpeculativeMutater.java +++ b/src/main/java/org/apache/hadoop/hbase/client/SpeculativeMutater.java @@ -21,8 +21,8 @@ public class SpeculativeMutater { public static Boolean mutate(final long waitToSendFailover, final long waitToSendFailoverWithException, final HBaseTableFunction function, - final HTableInterface primaryTable, - final Collection failoverTables, + final Table primaryTable, + final Collection
failoverTables, final AtomicLong lastPrimaryFail, final int waitTimeFromLastPrimaryFail) { ExecutorCompletionService exeS = new ExecutorCompletionService(exe); @@ -54,7 +54,7 @@ public Boolean call() throws Exception { } - for (final HTableInterface failoverTable : failoverTables) { + for (final Table failoverTable : failoverTables) { callables.add(new Callable() { public Boolean call() throws Exception { diff --git a/src/main/java/org/apache/hadoop/hbase/client/SpeculativeRequester.java b/src/main/java/org/apache/hadoop/hbase/client/SpeculativeRequester.java index 668d6e6..e466a74 100644 --- a/src/main/java/org/apache/hadoop/hbase/client/SpeculativeRequester.java +++ b/src/main/java/org/apache/hadoop/hbase/client/SpeculativeRequester.java @@ -36,8 +36,8 @@ public SpeculativeRequester(long waitTimeBeforeRequestingFailover, } public ResultWrapper request(final HBaseTableFunction function, - final HTableInterface primaryTable, - final Collection failoverTables) { + final Table primaryTable, + final Collection
failoverTables) { ExecutorCompletionService> exeS = new ExecutorCompletionService>(exe); @@ -64,7 +64,7 @@ public ResultWrapper call() throws Exception { }); } - for (final HTableInterface failoverTable : failoverTables) { + for (final Table failoverTable : failoverTables) { callables.add(new Callable>() { public ResultWrapper call() throws Exception { diff --git a/src/main/java/org/apache/hadoop/hbase/test/MultiHBaseClusterClientTest.java b/src/main/java/org/apache/hadoop/hbase/test/MultiHBaseClusterClientTest.java new file mode 100644 index 0000000..83d2032 --- /dev/null +++ b/src/main/java/org/apache/hadoop/hbase/test/MultiHBaseClusterClientTest.java @@ -0,0 +1,115 @@ +package org.apache.hadoop.hbase.test; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.HBaseMultiClusterConfigUtil; +import org.apache.hadoop.hbase.client.HConnectionManagerMultiClusterWrapper; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.security.UserGroupInformation; +import org.junit.Before; +import org.junit.Test; +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import org.junit.Assert; + +/** + * Created by balachandrapai on 19.06.17. + */ +public class MultiHBaseClusterClientTest { + Configuration combinedConfig; + Connection connection; + + @Before + public void initialize(){ + System.setProperty("java.security.krb5.conf", "/home/balachandrapai/Desktop/Security/krb5.conf"); + System.setProperty("sun.security.krb5.debug", "true"); + + //Primary Cluster + Configuration primaryConfig = HBaseConfiguration.create(); + primaryConfig.set("hbase.zookeeper.quorum", "cdhmaster1"); + primaryConfig.set("hbase.zookeeper.property.clientPort", "2181"); + primaryConfig.set("hadoop.security.authentication", "kerberos"); + primaryConfig.set("hbase.security.authentication", "kerberos"); + primaryConfig.set("hbase.master.kerberos.principal", "hbase/_HOST@EXAMPLE.COM"); + primaryConfig.set("hbase.regionserver.kerberos.principal", "hbase/_HOST@EXAMPLE.COM"); + UserGroupInformation.setConfiguration(primaryConfig); + System.out.println("Principal Authentication: "); + final String user = "hbase/cdhmaster1@EXAMPLE.COM"; + final String keyPath = "/home/balachandrapai/Desktop/Security/cdhmaster1/hbase.keytab"; + try { + UserGroupInformation.loginUserFromKeytab(user, keyPath); + } catch (IOException e) { + e.printStackTrace(); + } + + //failover Cluster + Configuration failover = HBaseConfiguration.create(); + failover.set("hbase.zookeeper.quorum", "cdhmaster2"); + failover.set("hbase.zookeeper.property.clientPort", "2181"); + failover.set("hadoop.security.authentication", "kerberos"); + failover.set("hbase.security.authentication", "kerberos"); + failover.set("hbase.master.kerberos.principal", "hbase/_HOST@EXAMPLE.COM"); + failover.set("hbase.regionserver.kerberos.principal", "hbase/_HOST@EXAMPLE.COM"); + UserGroupInformation.setConfiguration(primaryConfig); + System.out.println("Principal Authentication: "); + final String user2 = "hbase/cdhmaster2@EXAMPLE.COM"; + final String keyPath2 = "/home/balachandrapai/Desktop/Security/hbase.keytab"; + try { + UserGroupInformation.loginUserFromKeytab(user2, keyPath2); + } catch (IOException e) { + e.printStackTrace(); + } + + Map failoverClusters = new HashMap(); + failoverClusters.put("failover", failover); + + combinedConfig = HBaseMultiClusterConfigUtil.combineConfigurations(primaryConfig, failoverClusters); + + try { + connection = HConnectionManagerMultiClusterWrapper.createConnection(combinedConfig); + } catch (IOException e) { + e.printStackTrace(); + } + } + + @Test + public void readFromMCC(){ + try { + Table multiTable = connection.getTable(TableName.valueOf("t1")); + Get get1 = new Get(Bytes.toBytes("row1")); + + Result result = multiTable.get(get1); + + Assert.assertFalse(result.isEmpty()); + } catch (IOException e) { + e.printStackTrace(); + } + + } + + @Test + public void writeToMCC() { + + try { + Table multiTable = connection.getTable(TableName.valueOf("t1")); + Put put1 = new Put(Bytes.toBytes("row4")); + + put1.addColumn(Bytes.toBytes("cf1"), Bytes.toBytes("c1"), Bytes.toBytes("Data")); + multiTable.put(put1); + + Get get1 = new Get(Bytes.toBytes("row4")); + Result result = multiTable.get(get1); + + Assert.assertFalse(result.isEmpty()); + } catch (IOException e) { + e.printStackTrace(); + } + } +} diff --git a/src/main/java/org/apache/hadoop/hbase/test/MultiThreadedMultiClusterWithCmApiTest.java b/src/main/java/org/apache/hadoop/hbase/test/MultiThreadedMultiClusterWithCmApiTest.java index 20b6c4a..cd33b0a 100644 --- a/src/main/java/org/apache/hadoop/hbase/test/MultiThreadedMultiClusterWithCmApiTest.java +++ b/src/main/java/org/apache/hadoop/hbase/test/MultiThreadedMultiClusterWithCmApiTest.java @@ -59,6 +59,8 @@ public static void main(String[] args) throws Exception { cmHost2, username2, password2, cluster2, hbaseService2); + final Connection connection = HConnectionManagerMultiClusterWrapper.createConnection(config); + LOG.info("--Got Configuration"); final String tableName = args[10]; @@ -74,7 +76,7 @@ public static void main(String[] args) throws Exception { LOG.info("hbase.zookeeper.quorum: " + config.get("hbase.zookeeper.quorum")); LOG.info("hbase.failover.cluster.fail1.hbase.hstore.compaction.max: " + config.get("hbase.failover.cluster.fail1.hbase.hstore.compaction.max")); - HBaseAdmin admin = new HBaseAdminMultiCluster(config); + Admin admin = connection.getAdmin(); try { if (admin.tableExists(TableName.valueOf(tableName))) { @@ -107,19 +109,17 @@ public static void main(String[] args) throws Exception { splitKeys[8][0] = '8'; splitKeys[9][0] = '9'; - LOG.info(" - About to create Table " + tableD.getName()); + LOG.info(" - About to create Table " + tableD.getTableName()); admin.createTable(tableD, splitKeys); - LOG.info(" - Created Table " + tableD.getName()); + LOG.info(" - Created Table " + tableD.getTableName()); LOG.info("Getting HConnection"); config.set("hbase.client.retries.number", "1"); config.set("hbase.client.pause", "1"); - final HConnection connection = HConnectionManagerMultiClusterWrapper.createConnection(config); - LOG.info(" - Got HConnection: " + connection.getClass()); LOG.info("Getting HTable"); @@ -144,7 +144,7 @@ public static void main(String[] args) throws Exception { public void run() { try { Random r = new Random(); - HTableInterface table = connection.getTable(tableName); + Table table = connection.getTable(TableName.valueOf(tableName)); HTableStats stats = ((HTableMultiCluster) table).getStats(); stats.printStats(writer, 5000); @@ -153,7 +153,7 @@ public void run() { int hash = r.nextInt(10); Put put = new Put(Bytes.toBytes(hash + ".key." + i + "." + StringUtils.leftPad(String.valueOf(i * threadFinalNum), 12))); - put.add(Bytes.toBytes(familyName), Bytes.toBytes("C"), Bytes.toBytes("Value:" + i * threadFinalNum)); + put.addColumn(Bytes.toBytes(familyName), Bytes.toBytes("C"), Bytes.toBytes("Value:" + i * threadFinalNum)); table.put(put); Thread.sleep(millisecondToWait); diff --git a/src/main/java/org/apache/hadoop/hbase/test/MultiThreadedMultiClusterWithCombinedFileTest.java b/src/main/java/org/apache/hadoop/hbase/test/MultiThreadedMultiClusterWithCombinedFileTest.java index 2c17c0c..8612271 100644 --- a/src/main/java/org/apache/hadoop/hbase/test/MultiThreadedMultiClusterWithCombinedFileTest.java +++ b/src/main/java/org/apache/hadoop/hbase/test/MultiThreadedMultiClusterWithCombinedFileTest.java @@ -53,7 +53,9 @@ public static void main(String[] args) throws Exception { System.out.println("hbase.zookeeper.quorum: " + config.get("hbase.zookeeper.quorum")); System.out.println("hbase.failover.cluster.fail1.hbase.hstore.compaction.max: " + config.get("hbase.failover.cluster.fail1.hbase.hstore.compaction.max")); - HBaseAdmin admin = new HBaseAdminMultiCluster(config); + final Connection connection = HConnectionManagerMultiClusterWrapper.createConnection(config); + + Admin admin = connection.getAdmin(); try { admin.disableTable(TableName.valueOf(tableName)); @@ -87,7 +89,6 @@ public static void main(String[] args) throws Exception { config.set("hbase.client.retries.number", "1"); config.set("hbase.client.pause", "1"); - final HConnection connection = HConnectionManagerMultiClusterWrapper.createConnection(config); System.out.println(" - Got HConnection: " + connection.getClass()); @@ -107,14 +108,14 @@ public void run() { try { Random r = new Random(); for (int i = 1; i <= numberOfPuts; i++) { - HTableInterface table = connection.getTable(tableName); + Table table = connection.getTable(TableName.valueOf(tableName)); HTableStats stats = ((HTableMultiCluster) table).getStats(); stats.printStats(writer, 5000); int hash = r.nextInt(10); Put put = new Put(Bytes.toBytes(hash + ".key." + StringUtils.leftPad(String.valueOf(i * threadFinalNum), 12))); - put.add(Bytes.toBytes(familyName), Bytes.toBytes("C"), Bytes.toBytes("Value:" + i * threadFinalNum)); + put.addColumn(Bytes.toBytes(familyName), Bytes.toBytes("C"), Bytes.toBytes("Value:" + i * threadFinalNum)); table.put(put); Thread.sleep(millisecondToWait); diff --git a/src/main/java/org/apache/hadoop/hbase/test/RunMultiClusterTest.java b/src/main/java/org/apache/hadoop/hbase/test/RunMultiClusterTest.java index 90c57a4..4843932 100644 --- a/src/main/java/org/apache/hadoop/hbase/test/RunMultiClusterTest.java +++ b/src/main/java/org/apache/hadoop/hbase/test/RunMultiClusterTest.java @@ -39,8 +39,10 @@ public static void main(String[] args) throws Exception { System.out.println(ConfigConst.HBASE_FAILOVER_CLUSTERS_CONFIG + ": " + config.get(ConfigConst.HBASE_FAILOVER_CLUSTERS_CONFIG)); System.out.println("hbase.zookeeper.quorum: " + config.get("hbase.zookeeper.quorum")); System.out.println("hbase.failover.cluster.fail1.hbase.hstore.compaction.max: " + config.get("hbase.failover.cluster.fail1.hbase.hstore.compaction.max")); - - HBaseAdmin admin = new HBaseAdminMultiCluster(config); + + Connection connection = HConnectionManagerMultiClusterWrapper.createConnection(config); + + Admin admin = connection.getAdmin(); try { admin.disableTable(TableName.valueOf(tableName)); @@ -74,13 +76,11 @@ public static void main(String[] args) throws Exception { config.set("hbase.client.retries.number", "1"); config.set("hbase.client.pause", "1"); - HConnection connection = HConnectionManagerMultiClusterWrapper.createConnection(config); - System.out.println(" - Got HConnection: " + connection.getClass()); System.out.println("Getting HTable"); - HTableInterface table = connection.getTable(tableName); + Table table = connection.getTable(TableName.valueOf(tableName)); System.out.println("Got HTable: " + table.getClass()); @@ -91,7 +91,7 @@ public static void main(String[] args) throws Exception { for (int i = 1; i <= numberOfPuts; i++) { System.out.print("p"); Put put = new Put(Bytes.toBytes(i%10 + ".key." + StringUtils.leftPad(String.valueOf(i), 12))); - put.add(Bytes.toBytes(familyName), Bytes.toBytes("C"), Bytes.toBytes("Value:" + i)); + put.addColumn(Bytes.toBytes(familyName), Bytes.toBytes("C"), Bytes.toBytes("Value:" + i)); table.put(put); System.out.print("g"); diff --git a/src/test/java/org/apache/hadoop/hbase/client/HBaseMultiClusterClientTest.java b/src/test/java/org/apache/hadoop/hbase/client/HBaseMultiClusterClientTest.java index 79df9f2..6e0de49 100644 --- a/src/test/java/org/apache/hadoop/hbase/client/HBaseMultiClusterClientTest.java +++ b/src/test/java/org/apache/hadoop/hbase/client/HBaseMultiClusterClientTest.java @@ -7,8 +7,6 @@ import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.DynamicClassLoader; import org.junit.Test; - -import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; @@ -67,14 +65,14 @@ public void testHBaseMultiClusterClientTest() throws Exception { combinedConfig.setInt(ConfigConst.HBASE_WAIT_TIME_BEFORE_TRYING_PRIMARY_AFTER_FAILURE, 0); - HConnection connection = HConnectionManagerMultiClusterWrapper.createConnection(combinedConfig); + Connection connection = HConnectionManagerMultiClusterWrapper.createConnection(combinedConfig); - HTableInterface multiTable = connection.getTable(TABLE_NAME); + Table multiTable = connection.getTable(TABLE_NAME); Put put1 = new Put(Bytes.toBytes("A1")); - put1.add(FAM_NAME, QUAL_NAME, VALUE); + put1.addColumn(FAM_NAME, QUAL_NAME, VALUE); multiTable.put(put1); - multiTable.flushCommits(); + Get get1 = new Get(Bytes.toBytes("A1")); Result r1_1 = table1.get(get1); @@ -97,7 +95,7 @@ public void testHBaseMultiClusterClientTest() throws Exception { System.out.println("------------2"); Put put2 = new Put(Bytes.toBytes("A2")); - put2.add(FAM_NAME, QUAL_NAME, VALUE); + put2.addColumn(FAM_NAME, QUAL_NAME, VALUE); System.out.println("------------3"); table2.put(put2); @@ -121,10 +119,9 @@ public void testHBaseMultiClusterClientTest() throws Exception { System.out.println("------------7"); Put put3 = new Put(Bytes.toBytes("A3")); - put3.add(FAM_NAME, QUAL_NAME, VALUE); + put3.addColumn(FAM_NAME, QUAL_NAME, VALUE); multiTable = connection.getTable(TABLE_NAME); multiTable.put(put3); - multiTable.flushCommits(); System.out.println("------------8"); Get get3 = new Get(Bytes.toBytes("A3"));