From 186ce6dfc2027b3331b1a4adeebb62542caed019 Mon Sep 17 00:00:00 2001 From: Jonathan Jarvis Date: Fri, 10 Nov 2017 16:46:24 -0600 Subject: [PATCH] PDI-16661 - HBaseOutput Tuple Write and Row Delete --- .../plugins/hbase/mapping/MappingEditor.java | 249 +++++++++--------- .../plugins/hbase/mapping/MappingUtils.java | 22 +- .../plugins/hbase/output/HBaseOutput.java | 231 +++++++++++----- .../plugins/hbase/output/HBaseOutputData.java | 11 +- .../hbase/output/HBaseOutputDialog.java | 74 ++++-- .../plugins/hbase/output/HBaseOutputMeta.java | 47 +++- .../hbase/output/KettleRowToHBaseTuple.java | 190 +++++++++++++ .../output/messages/messages_en_US.properties | 16 +- .../output/HBaseOutputMetaInjectionTest.java | 6 + .../output/KettleRowToHBaseTupleTest.java | 104 ++++++++ 10 files changed, 712 insertions(+), 238 deletions(-) create mode 100644 kettle-plugins/hbase/src/main/java/org/pentaho/big/data/kettle/plugins/hbase/output/KettleRowToHBaseTuple.java create mode 100644 kettle-plugins/hbase/src/test/java/org/pentaho/big/data/kettle/plugins/hbase/output/KettleRowToHBaseTupleTest.java diff --git a/kettle-plugins/hbase/src/main/java/org/pentaho/big/data/kettle/plugins/hbase/mapping/MappingEditor.java b/kettle-plugins/hbase/src/main/java/org/pentaho/big/data/kettle/plugins/hbase/mapping/MappingEditor.java index ca34cfc0a9e..dd43d9a1fdf 100644 --- a/kettle-plugins/hbase/src/main/java/org/pentaho/big/data/kettle/plugins/hbase/mapping/MappingEditor.java +++ b/kettle-plugins/hbase/src/main/java/org/pentaho/big/data/kettle/plugins/hbase/mapping/MappingEditor.java @@ -69,6 +69,7 @@ import org.pentaho.di.core.Const; import org.pentaho.di.core.row.RowMetaInterface; import org.pentaho.di.core.row.ValueMetaInterface; +import org.pentaho.di.core.util.Utils; import org.pentaho.di.i18n.BaseMessages; import org.pentaho.di.trans.TransMeta; import org.pentaho.di.ui.core.PropsUI; @@ -139,10 +140,9 @@ public class MappingEditor extends Composite implements ConfigurationProducer { protected TransMeta m_transMeta; public MappingEditor( Shell shell, Composite parent, ConfigurationProducer configProducer, - FieldProducer fieldProducer, int tableViewStyle, boolean allowTableCreate, PropsUI props, - TransMeta transMeta, NamedClusterService namedClusterService, - RuntimeTestActionService runtimeTestActionService, RuntimeTester runtimeTester, - NamedClusterServiceLocator namedClusterServiceLocator ) { + FieldProducer fieldProducer, int tableViewStyle, boolean allowTableCreate, PropsUI props, TransMeta transMeta, + NamedClusterService namedClusterService, RuntimeTestActionService runtimeTestActionService, + RuntimeTester runtimeTester, NamedClusterServiceLocator namedClusterServiceLocator ) { super( parent, SWT.NONE ); this.namedClusterServiceLocator = namedClusterServiceLocator; m_shell = shell; @@ -181,7 +181,8 @@ public MappingEditor( Shell shell, Composite parent, ConfigurationProducer confi fd.right = new FormAttachment( middle, -margin ); namedClusterLabel.setLayoutData( fd ); - namedClusterWidget = new NamedClusterWidgetImpl( this, false, namedClusterService, runtimeTestActionService, runtimeTester ); + namedClusterWidget = + new NamedClusterWidgetImpl( this, false, namedClusterService, runtimeTestActionService, runtimeTester ); namedClusterWidget.initiate(); props.setLook( namedClusterWidget ); fd = new FormData(); @@ -193,17 +194,16 @@ public MappingEditor( Shell shell, Composite parent, ConfigurationProducer confi m_currentConfiguration = m_configProducer.getCurrentConfiguration(); } - parent.addDisposeListener( - new DisposeListener() { - @Override - public void widgetDisposed( DisposeEvent de ) { - try { - resetConnection(); - } catch ( Exception e ) { - // we have to swallow it. - } - } - } ); + parent.addDisposeListener( new DisposeListener() { + @Override + public void widgetDisposed( DisposeEvent de ) { + try { + resetConnection(); + } catch ( Exception e ) { + // we have to swallow it. + } + } + } ); // table names Label tableNameLab = new Label( this, SWT.RIGHT ); @@ -318,19 +318,14 @@ public void widgetDefaultSelected( SelectionEvent e ) { // fields ColumnInfo[] colinf = - new ColumnInfo[] { - new ColumnInfo( Messages.getString( "HBaseInputDialog.Fields.FIELD_ALIAS" ), ColumnInfo.COLUMN_TYPE_TEXT, - false ), - new ColumnInfo( Messages.getString( "HBaseInputDialog.Fields.FIELD_KEY" ), ColumnInfo.COLUMN_TYPE_CCOMBO, - false ), - new ColumnInfo( Messages.getString( "HBaseInputDialog.Fields.FIELD_FAMILY" ), ColumnInfo.COLUMN_TYPE_CCOMBO, - false ), + new ColumnInfo[] { new ColumnInfo( Messages.getString( "HBaseInputDialog.Fields.FIELD_ALIAS" ), + ColumnInfo.COLUMN_TYPE_TEXT, false ), new ColumnInfo( Messages.getString( + "HBaseInputDialog.Fields.FIELD_KEY" ), ColumnInfo.COLUMN_TYPE_CCOMBO, false ), new ColumnInfo( Messages + .getString( "HBaseInputDialog.Fields.FIELD_FAMILY" ), ColumnInfo.COLUMN_TYPE_CCOMBO, false ), new ColumnInfo( Messages.getString( "HBaseInputDialog.Fields.FIELD_NAME" ), ColumnInfo.COLUMN_TYPE_TEXT, - false ), - new ColumnInfo( Messages.getString( "HBaseInputDialog.Fields.FIELD_TYPE" ), ColumnInfo.COLUMN_TYPE_CCOMBO, - false ), - new ColumnInfo( Messages.getString( "HBaseInputDialog.Fields.FIELD_INDEXED" ), ColumnInfo.COLUMN_TYPE_TEXT, - false ), }; + false ), new ColumnInfo( Messages.getString( "HBaseInputDialog.Fields.FIELD_TYPE" ), + ColumnInfo.COLUMN_TYPE_CCOMBO, false ), new ColumnInfo( Messages.getString( + "HBaseInputDialog.Fields.FIELD_INDEXED" ), ColumnInfo.COLUMN_TYPE_TEXT, false ), }; m_keyCI = colinf[1]; m_keyCI.setComboValues( new String[] { "N", "Y" } ); @@ -354,7 +349,7 @@ public String[] getComboValues( TableItem tableItem, int rowNr, int colNr ) { String[] comboValues = null; String keyOrNot = tableItem.getText( 2 ); - if ( Const.isEmpty( keyOrNot ) || keyOrNot.equalsIgnoreCase( "N" ) ) { + if ( Utils.isEmpty( keyOrNot ) || keyOrNot.equalsIgnoreCase( "N" ) ) { comboValues = new String[] { "String", "Integer", "Long", "Float", "Double", "Boolean", "Date", "BigNumber", "Serializable", "Binary" }; @@ -398,12 +393,29 @@ public void widgetSelected( SelectionEvent e ) { } } ); + m_keyValueTupleBut = new Button( this, SWT.PUSH | SWT.CENTER ); + props.setLook( m_keyValueTupleBut ); + m_keyValueTupleBut.setText( Messages.getString( "MappingDialog.KeyValueTemplate" ) ); + m_keyValueTupleBut.setToolTipText( Messages.getString( "MappingDialog.KeyValueTemplate.TipText" ) ); + fd = new FormData(); + fd.right = new FormAttachment( 100, 0 ); + fd.bottom = new FormAttachment( 100, -margin * 2 ); + m_keyValueTupleBut.setLayoutData( fd ); + + m_keyValueTupleBut.addSelectionListener( new SelectionAdapter() { + @Override + public void widgetSelected( SelectionEvent e ) { + populateTableWithTupleTemplate( m_allowTableCreate ); + } + } ); + if ( m_allowTableCreate ) { + m_getFieldsBut = new Button( this, SWT.PUSH | SWT.CENTER ); props.setLook( m_getFieldsBut ); m_getFieldsBut.setText( Messages.getString( "MappingDialog.GetIncomingFields" ) ); fd = new FormData(); - fd.right = new FormAttachment( 100, 0 ); + fd.right = new FormAttachment( m_keyValueTupleBut, -margin ); fd.bottom = new FormAttachment( 100, -margin * 2 ); m_getFieldsBut.setLayoutData( fd ); @@ -413,23 +425,7 @@ public void widgetSelected( SelectionEvent e ) { populateTableWithIncomingFields(); } } ); - } else { - m_keyValueTupleBut = new Button( this, SWT.PUSH | SWT.CENTER ); - props.setLook( m_keyValueTupleBut ); - m_keyValueTupleBut.setText( Messages.getString( "MappingDialog.KeyValueTemplate" ) ); - m_keyValueTupleBut.setToolTipText( Messages.getString( "MappingDialog.KeyValueTemplate.TipText" ) ); - fd = new FormData(); - fd.right = new FormAttachment( 100, 0 ); - fd.bottom = new FormAttachment( 100, -margin * 2 ); - m_keyValueTupleBut.setLayoutData( fd ); - - m_keyValueTupleBut.addSelectionListener( new SelectionAdapter() { - @Override - public void widgetSelected( SelectionEvent e ) { - populateTableWithTupleTemplate(); - } - } ); } m_fieldsView = new TableView( transMeta, this, tableViewStyle, colinf, 1, null, props ); @@ -442,14 +438,14 @@ public void widgetSelected( SelectionEvent e ) { m_fieldsView.setLayoutData( fd ); } - private void populateTableWithTupleTemplate() { + private void populateTableWithTupleTemplate( boolean fromOutputStep ) { Table table = m_fieldsView.table; Set existingRowAliases = new HashSet(); for ( int i = 0; i < table.getItemCount(); i++ ) { TableItem tableItem = table.getItem( i ); String alias = tableItem.getText( 1 ); - if ( !Const.isEmpty( alias ) ) { + if ( !Utils.isEmpty( alias ) ) { existingRowAliases.add( alias ); } } @@ -459,9 +455,9 @@ private void populateTableWithTupleTemplate() { // Ask what we should do with existing mapping data MessageDialog md = new MessageDialog( m_shell, Messages.getString( "MappingDialog.GetFieldsChoice.Title" ), null, Messages - .getString( "MappingDialog.GetFieldsChoice.Message", "" + existingRowAliases.size(), "" + 5 ), - MessageDialog.WARNING, new String[] { Messages.getString( "MappingOutputDialog.ClearAndAdd" ), - Messages.getString( "MappingOutputDialog.Cancel" ), }, 0 ); + .getString( "MappingDialog.GetFieldsChoice.Message", "" + existingRowAliases.size(), "" + ( fromOutputStep + ? /* 6 */ 5 : 5 ) ), MessageDialog.WARNING, new String[] { Messages.getString( + "MappingOutputDialog.ClearAndAdd" ), Messages.getString( "MappingOutputDialog.Cancel" ), }, 0 ); MessageDialog.setDefaultImage( GUIResource.getInstance().getImageSpoon() ); int idx = md.open(); choice = idx & 0xFF; @@ -490,6 +486,18 @@ private void populateTableWithTupleTemplate() { item.setText( 2, "N" ); item.setText( 5, "Long" ); + /* + * Disabled from GUI for now, since visibility/ACL processing + * requires an additional co-processor on HBase + * + if ( fromOutputStep ) { + item = new TableItem( table, SWT.NONE ); + item.setText( 1, "Visibility" ); + item.setText( 2, "N" ); + item.setText( 5, "String" ); + } + */ + m_fieldsView.removeEmptyRows(); m_fieldsView.setRowNums(); m_fieldsView.optWidth( true ); @@ -505,7 +513,7 @@ private void populateTableWithIncomingFields() { for ( int i = 0; i < table.getItemCount(); i++ ) { TableItem tableItem = table.getItem( i ); String alias = tableItem.getText( 1 ); - if ( !Const.isEmpty( alias ) ) { + if ( !Utils.isEmpty( alias ) ) { existingRowAliases.add( alias ); } } @@ -516,10 +524,10 @@ private void populateTableWithIncomingFields() { MessageDialog md = new MessageDialog( m_shell, Messages.getString( "MappingDialog.GetFieldsChoice.Title" ), null, Messages .getString( "MappingDialog.GetFieldsChoice.Message", "" + existingRowAliases.size(), "" - + incomingRowMeta.size() ), MessageDialog.WARNING, new String[] { - Messages.getString( "MappingDialog.AddNew" ), Messages.getString( "MappingOutputDialog.Add" ), - Messages.getString( "MappingOutputDialog.ClearAndAdd" ), - Messages.getString( "MappingOutputDialog.Cancel" ), }, 0 ); + + incomingRowMeta.size() ), MessageDialog.WARNING, new String[] { Messages.getString( + "MappingDialog.AddNew" ), Messages.getString( "MappingOutputDialog.Add" ), Messages.getString( + "MappingOutputDialog.ClearAndAdd" ), Messages.getString( + "MappingOutputDialog.Cancel" ), }, 0 ); MessageDialog.setDefaultImage( GUIResource.getInstance().getImageSpoon() ); int idx = md.open(); choice = idx & 0xFF; @@ -620,7 +628,7 @@ private void populateTableCombo( boolean force ) { m_existingTableNamesCombo.add( currentTableName ); } // restore any previous value - if ( !Const.isEmpty( existingName ) ) { + if ( !Utils.isEmpty( existingName ) ) { m_existingTableNamesCombo.setText( existingName ); } } catch ( Exception e ) { @@ -630,7 +638,7 @@ private void populateTableCombo( boolean force ) { } } - private void resetConnection() throws IOException { + private void resetConnection() throws IOException { if ( m_admin != null ) { m_admin.close(); } @@ -642,9 +650,8 @@ private boolean notInitializedMappingAdmin() { } private void showConnectionErrorDialog( Exception ex ) { - new ErrorDialog( m_shell, Messages.getString( "MappingDialog.Error.Title.UnableToConnect" ), Messages - .getString( "MappingDialog.Error.Message.UnableToConnect" ) - + "\n\n", ex ); + new ErrorDialog( m_shell, Messages.getString( "MappingDialog.Error.Title.UnableToConnect" ), Messages.getString( + "MappingDialog.Error.Message.UnableToConnect" ) + "\n\n", ex ); } private void deleteMapping() { @@ -655,14 +662,14 @@ private void deleteMapping() { return; } String tableName = ""; - if ( !Const.isEmpty( m_existingTableNamesCombo.getText().trim() ) ) { + if ( !Utils.isEmpty( m_existingTableNamesCombo.getText().trim() ) ) { tableName = m_existingTableNamesCombo.getText().trim(); if ( tableName.indexOf( '@' ) > 0 ) { tableName = tableName.substring( 0, tableName.indexOf( '@' ) ); } } - if ( Const.isEmpty( tableName ) || Const.isEmpty( m_existingMappingNamesCombo.getText().trim() ) ) { + if ( Utils.isEmpty( tableName ) || Utils.isEmpty( m_existingMappingNamesCombo.getText().trim() ) ) { MessageDialog.openError( m_shell, Messages.getString( "MappingDialog.Error.Title.MissingTableMappingName" ), Messages.getString( "MappingDialog.Error.Message.MissingTableMappingName" ) ); return; @@ -712,12 +719,14 @@ private void deleteMapping() { public Mapping getMapping( boolean performChecksAndShowGUIErrorDialog, List problems ) throws Exception { return getMapping( performChecksAndShowGUIErrorDialog, problems, false ); } + /** - * Parameter includeKeyToColumns should be true if only we need key to be included in mapColumns and mapAliases + * Parameter includeKeyToColumns should be true if only we need key to be included in mapColumns and mapAliases */ - public Mapping getMapping( boolean performChecksAndShowGUIErrorDialog, List problems, Boolean includeKeyToColumns ) { + public Mapping getMapping( boolean performChecksAndShowGUIErrorDialog, List problems, + Boolean includeKeyToColumns ) { String tableName = ""; - if ( !Const.isEmpty( m_existingTableNamesCombo.getText().trim() ) ) { + if ( !Utils.isEmpty( m_existingTableNamesCombo.getText().trim() ) ) { tableName = m_existingTableNamesCombo.getText().trim(); if ( tableName.indexOf( '@' ) > 0 ) { @@ -726,8 +735,8 @@ public Mapping getMapping( boolean performChecksAndShowGUIErrorDialog, List missingFamilies = new ArrayList(); @@ -768,19 +777,20 @@ public Mapping getMapping( boolean performChecksAndShowGUIErrorDialog, List= 5 && nrNonEmpty <= 6 ) { for ( int i = 0; i < nrNonEmpty; i++ ) { - if ( m_fieldsView.getNonEmpty( i ).getText( 1 ).equals( Mapping.TupleMapping.KEY.toString() ) - || m_fieldsView.getNonEmpty( i ).getText( 1 ).equals( Mapping.TupleMapping.FAMILY.toString() ) - || m_fieldsView.getNonEmpty( i ).getText( 1 ).equals( Mapping.TupleMapping.COLUMN.toString() ) - || m_fieldsView.getNonEmpty( i ).getText( 1 ).equals( Mapping.TupleMapping.VALUE.toString() ) - || m_fieldsView.getNonEmpty( i ).getText( 1 ).equals( Mapping.TupleMapping.TIMESTAMP.toString() ) ) { + if ( m_fieldsView.getNonEmpty( i ).getText( 1 ).equals( Mapping.TupleMapping.KEY.toString() ) || m_fieldsView + .getNonEmpty( i ).getText( 1 ).equals( Mapping.TupleMapping.FAMILY.toString() ) || m_fieldsView.getNonEmpty( + i ).getText( 1 ).equals( Mapping.TupleMapping.COLUMN.toString() ) || m_fieldsView.getNonEmpty( i ) + .getText( 1 ).equals( Mapping.TupleMapping.VALUE.toString() ) || m_fieldsView.getNonEmpty( i ) + .getText( 1 ).equals( Mapping.TupleMapping.TIMESTAMP.toString() ) || m_fieldsView.getNonEmpty( + i ).getText( 1 ).equals( MappingUtils.TUPLE_MAPPING_VISIBILITY ) ) { tupleIdCount++; } } } - if ( tupleIdCount == 5 ) { + if ( tupleIdCount == 5 || tupleIdCount == 6 ) { isTupleMapping = true; theMapping.setTupleMapping( true ); } @@ -789,10 +799,10 @@ public Mapping getMapping( boolean performChecksAndShowGUIErrorDialog, List 0 ) { @@ -1187,7 +1191,7 @@ private void loadTableViewFromMapping() { private void populateMappingComboAndFamilyStuff() { String tableName = ""; - if ( !Const.isEmpty( m_existingTableNamesCombo.getText().trim() ) ) { + if ( !Utils.isEmpty( m_existingTableNamesCombo.getText().trim() ) ) { tableName = m_existingTableNamesCombo.getText().trim(); if ( tableName.indexOf( '@' ) > 0 ) { @@ -1199,7 +1203,7 @@ private void populateMappingComboAndFamilyStuff() { m_familyCI.setComboValues( new String[] { "" } ); m_existingMappingNamesCombo.removeAll(); - if ( m_admin != null && !Const.isEmpty( tableName ) ) { + if ( m_admin != null && !Utils.isEmpty( tableName ) ) { try { // first get the existing mapping names (if any) @@ -1229,7 +1233,8 @@ private void populateMappingComboAndFamilyStuff() { } } - @Override public HBaseService getHBaseService() throws ClusterInitializationException { + @Override + public HBaseService getHBaseService() throws ClusterInitializationException { NamedCluster nc = namedClusterWidget.getSelectedNamedCluster(); return namedClusterServiceLocator.getService( nc, HBaseService.class ); } @@ -1246,7 +1251,7 @@ public String getCurrentConfiguration() { if ( nc != null ) { host = m_transMeta.environmentSubstitute( nc.getZooKeeperHost() ); - port = m_transMeta.environmentSubstitute( nc.getZooKeeperPort() ); + port = m_transMeta.environmentSubstitute( nc.getZooKeeperPort() ); } return host + ":" + port; } diff --git a/kettle-plugins/hbase/src/main/java/org/pentaho/big/data/kettle/plugins/hbase/mapping/MappingUtils.java b/kettle-plugins/hbase/src/main/java/org/pentaho/big/data/kettle/plugins/hbase/mapping/MappingUtils.java index 9793d5524a0..f1991916704 100644 --- a/kettle-plugins/hbase/src/main/java/org/pentaho/big/data/kettle/plugins/hbase/mapping/MappingUtils.java +++ b/kettle-plugins/hbase/src/main/java/org/pentaho/big/data/kettle/plugins/hbase/mapping/MappingUtils.java @@ -51,6 +51,8 @@ public class MappingUtils { private static final Set TUPLE_COLUMNS = new HashSet(); + public static final String TUPLE_MAPPING_VISIBILITY = "Visibility"; + static { TUPLE_COLUMNS.add( Mapping.TupleMapping.KEY.toString() ); TUPLE_COLUMNS.add( Mapping.TupleMapping.FAMILY.toString() ); @@ -70,8 +72,8 @@ public static MappingAdmin getMappingAdmin( ConfigurationProducer cProducer ) th } } - public static MappingAdmin getMappingAdmin( HBaseService hBaseService, VariableSpace variableSpace, - String siteConfig, String defaultConfig ) throws IOException { + public static MappingAdmin getMappingAdmin( HBaseService hBaseService, VariableSpace variableSpace, String siteConfig, + String defaultConfig ) throws IOException { HBaseConnection hBaseConnection = hBaseService.getHBaseConnection( variableSpace, siteConfig, defaultConfig, null ); return new MappingAdmin( hBaseConnection ); } @@ -88,7 +90,8 @@ public static Mapping getMapping( MappingDefinition mappingDefinition, HBaseServ throw new KettleException( Messages.getString( "MappingDialog.Error.Message.NoFieldsDefined" ) ); } - Mapping theMapping = hBaseService.getMappingFactory().createMapping( tableName, mappingDefinition.getMappingName() ); + Mapping theMapping = + hBaseService.getMappingFactory().createMapping( tableName, mappingDefinition.getMappingName() ); // is the mapping a tuple mapping? final boolean isTupleMapping = isTupleMapping( mappingDefinition ); if ( isTupleMapping ) { @@ -128,7 +131,8 @@ public static Mapping getMapping( MappingDefinition mappingDefinition, HBaseServ if ( !Const.isEmpty( column.getType() ) ) { type = column.getType(); } else { - throw new KettleException( Messages.getString( "MappingDialog.Error.Message.TypeIssue" ) + ": " + columnNumber ); + throw new KettleException( Messages.getString( "MappingDialog.Error.Message.TypeIssue" ) + ": " + + columnNumber ); } HBaseValueMetaInterfaceFactory valueMetaInterfaceFactory = hBaseService.getHBaseValueMetaInterfaceFactory(); @@ -160,8 +164,8 @@ public static Mapping getMapping( MappingDefinition mappingDefinition, HBaseServ theMapping.addMappedColumn( valueMeta, isTupleMapping ); } catch ( Exception ex ) { String message = - Messages.getString( "MappingDialog.Error.Message1.DuplicateColumn" ) + family + "," + colName - + Messages.getString( "MappingDialog.Error.Message2.DuplicateColumn" ); + Messages.getString( "MappingDialog.Error.Message1.DuplicateColumn" ) + family + "," + colName + Messages + .getString( "MappingDialog.Error.Message2.DuplicateColumn" ); throw new KettleException( message ); } } @@ -196,7 +200,7 @@ public static HBaseValueMetaInterface buildNonKeyValueMeta( String alias, String public static boolean isTupleMapping( MappingDefinition mappingDefinition ) { List mappingColumns = mappingDefinition.getMappingColumns(); int mappingSize = mappingColumns.size(); - if ( mappingSize != TUPLE_COLUMNS_COUNT ) { + if ( !( mappingSize == TUPLE_COLUMNS_COUNT || mappingSize == TUPLE_COLUMNS_COUNT + 1 ) ) { return false; } int tupleIdCount = 0; @@ -205,11 +209,11 @@ public static boolean isTupleMapping( MappingDefinition mappingDefinition ) { tupleIdCount++; } } - return tupleIdCount == TUPLE_COLUMNS_COUNT; + return tupleIdCount == TUPLE_COLUMNS_COUNT || tupleIdCount == TUPLE_COLUMNS_COUNT + 1; } public static boolean isTupleMappingColumn( String columnName ) { - return TUPLE_COLUMNS.contains( columnName ); + return TUPLE_COLUMNS.contains( columnName ) || columnName.equals( TUPLE_MAPPING_VISIBILITY ); } } diff --git a/kettle-plugins/hbase/src/main/java/org/pentaho/big/data/kettle/plugins/hbase/output/HBaseOutput.java b/kettle-plugins/hbase/src/main/java/org/pentaho/big/data/kettle/plugins/hbase/output/HBaseOutput.java index 55e36df10d3..2ef4549fdc5 100644 --- a/kettle-plugins/hbase/src/main/java/org/pentaho/big/data/kettle/plugins/hbase/output/HBaseOutput.java +++ b/kettle-plugins/hbase/src/main/java/org/pentaho/big/data/kettle/plugins/hbase/output/HBaseOutput.java @@ -29,18 +29,20 @@ import org.pentaho.big.data.api.cluster.service.locator.NamedClusterServiceLocator; import org.pentaho.big.data.kettle.plugins.hbase.mapping.MappingAdmin; +import org.pentaho.big.data.kettle.plugins.hbase.output.KettleRowToHBaseTuple.FieldException; import org.pentaho.bigdata.api.hbase.ByteConversionUtil; import org.pentaho.bigdata.api.hbase.HBaseConnection; import org.pentaho.bigdata.api.hbase.HBaseService; import org.pentaho.bigdata.api.hbase.mapping.Mapping; import org.pentaho.bigdata.api.hbase.meta.HBaseValueMetaInterface; +import org.pentaho.bigdata.api.hbase.table.HBaseDelete; import org.pentaho.bigdata.api.hbase.table.HBasePut; import org.pentaho.bigdata.api.hbase.table.HBaseTable; import org.pentaho.bigdata.api.hbase.table.HBaseTableWriteOperationManager; -import org.pentaho.di.core.Const; import org.pentaho.di.core.exception.KettleException; import org.pentaho.di.core.row.RowMetaInterface; import org.pentaho.di.core.row.ValueMetaInterface; +import org.pentaho.di.core.util.Utils; import org.pentaho.di.i18n.BaseMessages; import org.pentaho.di.trans.Trans; import org.pentaho.di.trans.TransMeta; @@ -66,7 +68,7 @@ public class HBaseOutput extends BaseStep implements StepInterface { private HBaseTableWriteOperationManager targetTableWriteOperationManager; public HBaseOutput( StepMeta stepMeta, StepDataInterface stepDataInterface, int copyNr, TransMeta transMeta, - Trans trans, NamedClusterServiceLocator namedClusterServiceLocator ) { + Trans trans, NamedClusterServiceLocator namedClusterServiceLocator ) { super( stepMeta, stepDataInterface, copyNr, transMeta, trans ); this.namedClusterServiceLocator = namedClusterServiceLocator; @@ -93,6 +95,12 @@ public HBaseOutput( StepMeta stepMeta, StepDataInterface stepDataInterface, int /** Index of the key in the incoming fields */ protected int m_incomingKeyIndex; + /** The ValueMetaInterface of the incoming key field */ + protected ValueMetaInterface m_incomingKeyValueMeta; + + /** Object used when a tuple is supplied as the incoming fields */ + protected KettleRowToHBaseTuple tupleRowConverter; + @Override public boolean processRow( StepMetaInterface smi, StepDataInterface sdi ) throws KettleException { @@ -112,7 +120,7 @@ public boolean processRow( StepMetaInterface smi, StepDataInterface sdi ) throws } } catch ( Exception ex ) { throw new KettleException( BaseMessages.getString( HBaseOutputMeta.PKG, - "HBaseOutput.Error.ProblemFlushingBufferedData", ex.getMessage() ), ex ); + "HBaseOutput.Error.ProblemFlushingBufferedData", ex.getMessage() ), ex ); } finally { try { targetTableWriteOperationManager.close(); @@ -124,7 +132,7 @@ public boolean processRow( StepMetaInterface smi, StepDataInterface sdi ) throws try { targetTable.close(); } catch ( IOException e ) { - //Ignore + // Ignore } try { @@ -152,7 +160,8 @@ public boolean processRow( StepMetaInterface smi, StepDataInterface sdi ) throws List connectionMessages = new ArrayList(); hBaseService = namedClusterServiceLocator.getService( m_meta.getNamedCluster(), HBaseService.class ); - m_hbAdmin = hBaseService.getHBaseConnection( this, environmentSubstitute( m_meta.getCoreConfigURL() ), + m_hbAdmin = + hBaseService.getHBaseConnection( this, environmentSubstitute( m_meta.getCoreConfigURL() ), environmentSubstitute( m_meta.getDefaultConfigURL() ), log ); m_bytesUtil = hBaseService.getByteConversionUtil(); @@ -174,7 +183,7 @@ public boolean processRow( StepMetaInterface smi, StepDataInterface sdi ) throws // check on the existence and readiness of the target table String targetName = environmentSubstitute( m_meta.getTargetTableName() ); - if ( Const.isEmpty( targetName ) ) { + if ( Utils.isEmpty( targetName ) ) { throw new KettleException( BaseMessages.getString( HBaseOutputMeta.PKG, "HBaseOutput.Error.NoTargetTableSpecified" ) ); } @@ -196,15 +205,15 @@ public boolean processRow( StepMetaInterface smi, StepDataInterface sdi ) throws // Get mapping details for the target table - if ( m_meta.getMapping() != null && Const.isEmpty( m_meta.getTargetMappingName() ) ) { + if ( m_meta.getMapping() != null && Utils.isEmpty( m_meta.getTargetMappingName() ) ) { m_tableMapping = m_meta.getMapping(); } else { try { logBasic( BaseMessages.getString( HBaseOutputMeta.PKG, "HBaseOutput.RetrievingMappingDetails" ) ); m_tableMapping = - m_mappingAdmin.getMapping( environmentSubstitute( m_meta.getTargetTableName() ), - environmentSubstitute( m_meta.getTargetMappingName() ) ); + m_mappingAdmin.getMapping( environmentSubstitute( m_meta.getTargetTableName() ), environmentSubstitute( + m_meta.getTargetMappingName() ) ); } catch ( Exception ex ) { throw new KettleException( BaseMessages.getString( HBaseOutputMeta.PKG, "HBaseOutput.Error.ProblemGettingMappingInfo", ex.getMessage() ), ex ); @@ -212,40 +221,51 @@ public boolean processRow( StepMetaInterface smi, StepDataInterface sdi ) throws } m_columnsMappedByAlias = m_tableMapping.getMappedColumns(); - if ( m_tableMapping.isTupleMapping() ) { - throw new KettleException( BaseMessages.getString( HBaseOutputMeta.PKG, - "HBaseOutput.Error.CantWriteUsingATupleMapping" ) ); - } + if ( !m_meta.m_deleteRowKey && m_tableMapping.isTupleMapping() ) { + /* + * We are not executing a delete and the mapping is a tuple mapping + * Deletes need to go through the other branch of code to decode the incoming key field index + */ + try { + tupleRowConverter = new KettleRowToHBaseTuple( getInputRowMeta(), m_tableMapping, m_columnsMappedByAlias ); + } catch ( Exception e ) { + throw new KettleException( e ); + } - // check that all incoming fields are in the mapping. - // fewer fields than the mapping defines is OK as long as we have - // the key as an incoming field. Can either use strict type checking - // or use an error stream for rows where type-conversion to the mapping - // types fail. Probably should use an error stream - e.g. could get rows - // with negative numeric key value where mapping specifies an unsigned key - boolean incomingKey = false; - RowMetaInterface inMeta = getInputRowMeta(); - for ( int i = 0; i < inMeta.size(); i++ ) { - ValueMetaInterface vm = inMeta.getValueMeta( i ); - String inName = vm.getName(); - - if ( m_tableMapping.getKeyName().equals( inName ) ) { - incomingKey = true; - m_incomingKeyIndex = i; - // should we check the type? - } else { - HBaseValueMetaInterface hvm = m_columnsMappedByAlias.get( inName.trim() ); - if ( hvm == null ) { - throw new KettleException( BaseMessages.getString( HBaseOutputMeta.PKG, - "HBaseOutput.Error.CantFindIncomingField", inName, m_tableMapping.getMappingName() ) ); + } else { + + // check that all incoming fields are in the mapping. + // fewer fields than the mapping defines is OK as long as we have + // the key as an incoming field. Can either use strict type checking + // or use an error stream for rows where type-conversion to the mapping + // types fail. Probably should use an error stream - e.g. could get rows + // with negative numeric key value where mapping specifies an unsigned key + boolean incomingKey = false; + RowMetaInterface inMeta = getInputRowMeta(); + for ( int i = 0; i < inMeta.size(); i++ ) { + ValueMetaInterface vm = inMeta.getValueMeta( i ); + String inName = vm.getName(); + + if ( m_tableMapping.getKeyName().equals( inName ) ) { + incomingKey = true; + m_incomingKeyIndex = i; + m_incomingKeyValueMeta = vm; + // should we check the type? + } else { + HBaseValueMetaInterface hvm = m_columnsMappedByAlias.get( inName.trim() ); + if ( hvm == null && !m_meta.getDeleteRowKey() ) { + throw new KettleException( BaseMessages.getString( HBaseOutputMeta.PKG, + "HBaseOutput.Error.CantFindIncomingField", inName, m_tableMapping.getMappingName() ) ); + } } } - } - if ( !incomingKey ) { - throw new KettleException( BaseMessages.getString( HBaseOutputMeta.PKG, - "HBaseOutput.Error.TableKeyNotPresentInIncomingFields", m_tableMapping.getKeyName(), m_tableMapping - .getMappingName() ) ); + if ( !incomingKey ) { + throw new KettleException( BaseMessages.getString( HBaseOutputMeta.PKG, + "HBaseOutput.Error.TableKeyNotPresentInIncomingFields", m_tableMapping.getKeyName(), m_tableMapping + .getMappingName() ) ); + } + } try { @@ -253,7 +273,7 @@ public boolean processRow( StepMetaInterface smi, StepDataInterface sdi ) throws // set a write buffer size (and disable auto flush) Long writeBufferSize = null; - if ( !Const.isEmpty( m_meta.getWriteBufferSize() ) ) { + if ( !Utils.isEmpty( m_meta.getWriteBufferSize() ) ) { writeBufferSize = Long.parseLong( environmentSubstitute( m_meta.getWriteBufferSize() ) ); logBasic( BaseMessages.getString( HBaseOutputMeta.PKG, "HBaseOutput.SettingWriteBuffer", writeBufferSize ) ); @@ -272,45 +292,110 @@ public boolean processRow( StepMetaInterface smi, StepDataInterface sdi ) throws m_data.setOutputRowMeta( getInputRowMeta() ); } - // Put the data - HBasePut hBasePut; - try { - // key must not be null - hBasePut = - HBaseOutputData.initializeNewPut( getInputRowMeta(), m_incomingKeyIndex, r, m_tableMapping, m_bytesUtil, - targetTableWriteOperationManager, !m_meta.getDisableWriteToWAL() ); - if ( hBasePut == null ) { - String errorDescriptions = - BaseMessages.getString( HBaseOutputMeta.PKG, "HBaseOutput.Error.IncomingRowHasNullKeyValue" ); + + if ( m_meta.getDeleteRowKey() ) { + + try { + + if ( m_incomingKeyValueMeta.isNull( r[m_incomingKeyIndex] ) ) { + throw new KettleException( BaseMessages.getString( HBaseOutputMeta.PKG, "HBaseOutput.Error.IncomingRowHasNullKeyValue" ) ); + } + + byte[] encodedKeyBytes = m_bytesUtil.encodeKeyValue( r[m_incomingKeyIndex], m_incomingKeyValueMeta, m_tableMapping.getKeyType() ); + HBaseDelete hBaseDelete = targetTableWriteOperationManager.createDelete( encodedKeyBytes ); + hBaseDelete.execute(); + + } catch ( Exception ex ) { + if ( getStepMeta().isDoingErrorHandling() ) { - String errorFields = m_tableMapping.getKeyName(); - putError( getInputRowMeta(), r, 1, errorDescriptions, errorFields, "HBaaseOutput001" ); + String errorDescriptions = ""; + if ( !Utils.isEmpty( ex.getMessage() ) ) { + errorDescriptions = ex.getMessage(); + } else { + errorDescriptions = BaseMessages.getString( HBaseOutputMeta.PKG, "HBaseOutput.Error.ErrorCreatingDelete" ); + } + putError( getInputRowMeta(), r, 1, errorDescriptions, m_tableMapping.getKeyName(), "HBaseOutput004" ); return true; } else { - throw new KettleException( errorDescriptions ); + throw new KettleException( ex ); } } - } catch ( Exception ex ) { - throw new KettleException( BaseMessages.getString( HBaseOutputMeta.PKG, - "HBaseOutput.Error.UnableToSetTargetTable" ), ex ); - } - // now encode the rest of the fields. Nulls do not get inserted of course - HBaseOutputData.addColumnsToPut( getInputRowMeta(), r, m_incomingKeyIndex, m_columnsMappedByAlias, hBasePut, - m_bytesUtil ); - - try { - hBasePut.execute(); - } catch ( Exception e ) { - String errorDescriptions = - BaseMessages - .getString( HBaseOutputMeta.PKG, "HBaseOutput.Error.ProblemInsertingRowIntoHBase", e.getMessage() ); - if ( getStepMeta().isDoingErrorHandling() ) { - String errorFields = "Unknown"; - putError( getInputRowMeta(), r, 1, errorDescriptions, errorFields, "HBaseOutput002" ); + } else { + // Put the data + HBasePut hBasePut; + + if ( tupleRowConverter != null ) { + + try { + + hBasePut = + tupleRowConverter.createTuplePut( targetTableWriteOperationManager, m_bytesUtil, r, !m_meta + .getDisableWriteToWAL() ); + } catch ( Exception ex ) { + + if ( getStepMeta().isDoingErrorHandling() ) { + String errorDescriptions = ""; + String errorFields = "Unknown"; + if ( ex instanceof FieldException ) { + errorFields = ( (FieldException) ex ).getFieldString(); + errorDescriptions = BaseMessages.getString( HBaseOutputMeta.PKG, "HBaseOutput.Error.MissingFieldData", errorFields ); + } else if ( !Utils.isEmpty( ex.getMessage() ) ) { + errorDescriptions = ex.getMessage(); + } else { + errorDescriptions = BaseMessages.getString( HBaseOutputMeta.PKG, "HBaseOutput.Error.ErrorCreatingPut" ); + } + putError( getInputRowMeta(), r, 1, errorDescriptions, errorFields, "HBaseOutput003" ); + + return true; + } else { + throw new KettleException( ex ); + } + + } + } else { - throw new KettleException( errorDescriptions, e ); + + try { + // key must not be null + hBasePut = + HBaseOutputData.initializeNewPut( getInputRowMeta(), m_incomingKeyIndex, r, m_tableMapping, m_bytesUtil, + targetTableWriteOperationManager, !m_meta.getDisableWriteToWAL() ); + if ( hBasePut == null ) { + String errorDescriptions = + BaseMessages.getString( HBaseOutputMeta.PKG, "HBaseOutput.Error.IncomingRowHasNullKeyValue" ); + if ( getStepMeta().isDoingErrorHandling() ) { + String errorFields = m_tableMapping.getKeyName(); + putError( getInputRowMeta(), r, 1, errorDescriptions, errorFields, "HBaseOutput001" ); + + return true; + } else { + throw new KettleException( errorDescriptions ); + } + } + } catch ( Exception ex ) { + throw new KettleException( BaseMessages.getString( HBaseOutputMeta.PKG, + "HBaseOutput.Error.UnableToSetTargetTable" ), ex ); + } + + // now encode the rest of the fields. Nulls do not get inserted of course + HBaseOutputData.addColumnsToPut( getInputRowMeta(), r, m_incomingKeyIndex, m_columnsMappedByAlias, hBasePut, + m_bytesUtil ); + } + + try { + hBasePut.execute(); + } catch ( Exception e ) { + String errorDescriptions = + BaseMessages.getString( HBaseOutputMeta.PKG, "HBaseOutput.Error.ProblemInsertingRowIntoHBase", e + .getMessage() ); + if ( getStepMeta().isDoingErrorHandling() ) { + String errorFields = "Unknown"; + putError( getInputRowMeta(), r, 1, errorDescriptions, errorFields, "HBaseOutput002" ); + } else { + throw new KettleException( errorDescriptions, e ); + } } } @@ -358,7 +443,7 @@ public void setStopped( boolean stopped ) { } } catch ( Exception ex ) { logError( BaseMessages.getString( HBaseOutputMeta.PKG, "HBaseOutput.Error.ProblemFlushingBufferedData", ex - .getMessage() ), ex ); + .getMessage() ), ex ); } } if ( m_hbAdmin != null ) { @@ -367,7 +452,7 @@ public void setStopped( boolean stopped ) { m_hbAdmin.close(); } catch ( Exception ex ) { logError( BaseMessages.getString( HBaseOutputMeta.PKG, "HBaseOutput.Error.ProblemWhenClosingConnection", ex - .getMessage() ), ex ); + .getMessage() ), ex ); } } } diff --git a/kettle-plugins/hbase/src/main/java/org/pentaho/big/data/kettle/plugins/hbase/output/HBaseOutputData.java b/kettle-plugins/hbase/src/main/java/org/pentaho/big/data/kettle/plugins/hbase/output/HBaseOutputData.java index 96c67245569..8cfeab6f632 100644 --- a/kettle-plugins/hbase/src/main/java/org/pentaho/big/data/kettle/plugins/hbase/output/HBaseOutputData.java +++ b/kettle-plugins/hbase/src/main/java/org/pentaho/big/data/kettle/plugins/hbase/output/HBaseOutputData.java @@ -81,9 +81,8 @@ public void setOutputRowMeta( RowMetaInterface rmi ) { * if a problem occurs when initializing the new put operation */ public static HBasePut initializeNewPut( RowMetaInterface inRowMeta, int keyIndex, Object[] kettleRow, - Mapping tableMapping, ByteConversionUtil bu, - HBaseTableWriteOperationManager hBaseTableWriteOperationManager, - boolean writeToWAL ) throws Exception { + Mapping tableMapping, ByteConversionUtil bu, HBaseTableWriteOperationManager hBaseTableWriteOperationManager, + boolean writeToWAL ) throws Exception { ValueMetaInterface keyvm = inRowMeta.getValueMeta( keyIndex ); if ( keyvm.isNull( kettleRow[keyIndex] ) ) { @@ -92,8 +91,7 @@ public static HBasePut initializeNewPut( RowMetaInterface inRowMeta, int keyInde byte[] encodedKey = bu.encodeKeyValue( kettleRow[keyIndex], keyvm, tableMapping.getKeyType() ); - HBasePut hBaseTablePut = hBaseTableWriteOperationManager - .createPut( bu.encodeKeyValue( kettleRow[ keyIndex ], keyvm, tableMapping.getKeyType() ) ); + HBasePut hBaseTablePut = hBaseTableWriteOperationManager.createPut( encodedKey ); hBaseTablePut.setWriteToWAL( writeToWAL ); return hBaseTablePut; } @@ -118,7 +116,7 @@ public static HBasePut initializeNewPut( RowMetaInterface inRowMeta, int keyInde * if a problem occurs when adding a column to the put operation */ public static void addColumnsToPut( RowMetaInterface inRowMeta, Object[] kettleRow, int keyIndex, - Map columnsMappedByAlias, HBasePut hBasePut, ByteConversionUtil bu ) + Map columnsMappedByAlias, HBasePut hBasePut, ByteConversionUtil bu ) throws KettleException { for ( int i = 0; i < inRowMeta.size(); i++ ) { @@ -160,4 +158,5 @@ public static URL stringToURL( String pathOrURL ) throws MalformedURLException { return result; } + } diff --git a/kettle-plugins/hbase/src/main/java/org/pentaho/big/data/kettle/plugins/hbase/output/HBaseOutputDialog.java b/kettle-plugins/hbase/src/main/java/org/pentaho/big/data/kettle/plugins/hbase/output/HBaseOutputDialog.java index de4b6a96a54..422fa525a13 100644 --- a/kettle-plugins/hbase/src/main/java/org/pentaho/big/data/kettle/plugins/hbase/output/HBaseOutputDialog.java +++ b/kettle-plugins/hbase/src/main/java/org/pentaho/big/data/kettle/plugins/hbase/output/HBaseOutputDialog.java @@ -64,6 +64,7 @@ import org.pentaho.di.core.Props; import org.pentaho.di.core.exception.KettleException; import org.pentaho.di.core.row.RowMetaInterface; +import org.pentaho.di.core.util.Utils; import org.pentaho.di.i18n.BaseMessages; import org.pentaho.di.trans.TransMeta; import org.pentaho.di.trans.step.BaseStepMeta; @@ -121,6 +122,9 @@ public class HBaseOutputDialog extends BaseStepDialog implements StepDialogInter private Button m_mappingNamesBut; private CCombo m_mappingNamesCombo; + //Delete row key line + private Button m_deleteRowKeyBut; + /** Store the mapping information in the step's meta data */ private Button m_storeMappingInStepMetaData; @@ -444,6 +448,7 @@ public void modifyText( ModifyEvent e ) { fd.right = new FormAttachment( m_mappingNamesBut, -margin ); m_mappingNamesCombo.setLayoutData( fd ); + // store mapping in meta data Label storeMapping = new Label( wConfigComp, SWT.RIGHT ); storeMapping.setText( BaseMessages.getString( HBaseOutputMeta.PKG, "HBaseOutputDialog.StoreMapping.Label" ) ); @@ -464,6 +469,33 @@ public void modifyText( ModifyEvent e ) { fd.top = new FormAttachment( m_mappingNamesCombo, margin ); m_storeMappingInStepMetaData.setLayoutData( fd ); + + //delete rows by key option + Label deleteRows = new Label( wConfigComp, SWT.RIGHT ); + deleteRows.setText( BaseMessages.getString( HBaseOutputMeta.PKG, "HBaseOutputDialog.DeleteRowKey.Label" ) ); + deleteRows.setToolTipText( BaseMessages.getString( HBaseOutputMeta.PKG, "HBaseOutputDialog.DeleteRowKey.TipText" ) ); + props.setLook( deleteRows ); + + fd = new FormData(); + fd.left = new FormAttachment( 0, 0 ); + fd.top = new FormAttachment( m_storeMappingInStepMetaData, margin ); + fd.right = new FormAttachment( middle, -margin ); + deleteRows.setLayoutData( fd ); + + m_deleteRowKeyBut = new Button( wConfigComp, SWT.CHECK ); + props.setLook( m_deleteRowKeyBut ); + fd = new FormData(); + fd.right = new FormAttachment( 100, 0 ); + fd.left = new FormAttachment( middle, 0 ); + fd.top = new FormAttachment( m_storeMappingInStepMetaData, margin ); + m_deleteRowKeyBut.setLayoutData( fd ); + + m_deleteRowKeyBut.addSelectionListener( new SelectionAdapter() { + public void widgetSelected( SelectionEvent se ) { + walEnabled(); + }; + } ); + // disable write to WAL Label disableWALLab = new Label( wConfigComp, SWT.RIGHT ); disableWALLab.setText( BaseMessages.getString( HBaseOutputMeta.PKG, "HBaseOutputDialog.DisableWAL.Label" ) ); @@ -472,7 +504,7 @@ public void modifyText( ModifyEvent e ) { props.setLook( disableWALLab ); fd = new FormData(); fd.left = new FormAttachment( 0, 0 ); - fd.top = new FormAttachment( m_storeMappingInStepMetaData, margin ); + fd.top = new FormAttachment( m_deleteRowKeyBut, margin ); fd.right = new FormAttachment( middle, -margin ); disableWALLab.setLayoutData( fd ); @@ -482,7 +514,7 @@ public void modifyText( ModifyEvent e ) { props.setLook( m_disableWriteToWALBut ); fd = new FormData(); fd.left = new FormAttachment( middle, 0 ); - fd.top = new FormAttachment( m_storeMappingInStepMetaData, margin ); + fd.top = new FormAttachment( m_deleteRowKeyBut, margin ); // fd.right = new FormAttachment(middle, -margin); m_disableWriteToWALBut.setLayoutData( fd ); @@ -624,7 +656,7 @@ protected void cancel() { } protected void ok() { - if ( Const.isEmpty( m_stepnameText.getText() ) ) { + if ( Utils.isEmpty( m_stepnameText.getText() ) ) { MessageBox mb = new MessageBox( shell, SWT.OK | SWT.ICON_ERROR ); mb.setText( BaseMessages.getString( HBaseOutputMeta.PKG, "System.StepJobEntryNameMissing.Title" ) ); mb.setMessage( BaseMessages.getString( HBaseOutputMeta.PKG, "System.JobEntryNameMissing.Msg" ) ); @@ -654,7 +686,7 @@ protected void ok() { updateMetaConnectionDetails( m_currentMeta ); if ( m_storeMappingInStepMetaData.getSelection() ) { - if ( Const.isEmpty( m_mappingNamesCombo.getText() ) ) { + if ( Utils.isEmpty( m_mappingNamesCombo.getText() ) ) { List problems = new ArrayList(); Mapping toSet = m_mappingEditor.getMapping( false, problems, false ); if ( problems.size() > 0 ) { @@ -731,7 +763,7 @@ protected void ok() { } protected void updateMetaConnectionDetails( HBaseOutputMeta meta ) { - if ( Const.isEmpty( m_stepnameText.getText() ) ) { + if ( Utils.isEmpty( m_stepnameText.getText() ) ) { return; } @@ -745,6 +777,8 @@ protected void updateMetaConnectionDetails( HBaseOutputMeta meta ) { meta.setTargetTableName( m_mappedTableNamesCombo.getText() ); meta.setTargetMappingName( m_mappingNamesCombo.getText() ); + meta.setDeleteRowKey( m_deleteRowKeyBut.getSelection() ); + meta.setDisableWriteToWAL( m_disableWriteToWALBut.getSelection() ); meta.setWriteBufferSize( m_writeBufferSizeText.getText() ); @@ -754,32 +788,38 @@ private void getData() { namedClusterWidget.setSelectedNamedCluster( m_currentMeta.getNamedCluster().getName() ); - if ( !Const.isEmpty( m_currentMeta.getCoreConfigURL() ) ) { + if ( !Utils.isEmpty( m_currentMeta.getCoreConfigURL() ) ) { m_coreConfigText.setText( m_currentMeta.getCoreConfigURL() ); } - if ( !Const.isEmpty( m_currentMeta.getDefaultConfigURL() ) ) { + if ( !Utils.isEmpty( m_currentMeta.getDefaultConfigURL() ) ) { m_defaultConfigText.setText( m_currentMeta.getDefaultConfigURL() ); } - if ( !Const.isEmpty( m_currentMeta.getTargetTableName() ) ) { + if ( !Utils.isEmpty( m_currentMeta.getTargetTableName() ) ) { m_mappedTableNamesCombo.setText( m_currentMeta.getTargetTableName() ); } - if ( !Const.isEmpty( m_currentMeta.getTargetMappingName() ) ) { + if ( !Utils.isEmpty( m_currentMeta.getTargetMappingName() ) ) { m_mappingNamesCombo.setText( m_currentMeta.getTargetMappingName() ); } + m_deleteRowKeyBut.setSelection( m_currentMeta.getDeleteRowKey() ); + m_disableWriteToWALBut.setSelection( m_currentMeta.getDisableWriteToWAL() ); - if ( !Const.isEmpty( m_currentMeta.getWriteBufferSize() ) ) { + walEnabled(); + + if ( !Utils.isEmpty( m_currentMeta.getWriteBufferSize() ) ) { m_writeBufferSizeText.setText( m_currentMeta.getWriteBufferSize() ); } - if ( Const.isEmpty( m_currentMeta.getTargetMappingName() ) && m_currentMeta.getMapping() != null ) { + if ( Utils.isEmpty( m_currentMeta.getTargetMappingName() ) && m_currentMeta.getMapping() != null ) { m_mappingEditor.setMapping( m_currentMeta.getMapping() ); m_storeMappingInStepMetaData.setSelection( true ); } + + } @Override public HBaseService getHBaseService() throws ClusterInitializationException { @@ -795,11 +835,11 @@ private void getData() { String defaultConf = ""; String zookeeperHosts = ""; - if ( !Const.isEmpty( m_coreConfigText.getText() ) ) { + if ( !Utils.isEmpty( m_coreConfigText.getText() ) ) { coreConf = transMeta.environmentSubstitute( m_coreConfigText.getText() ); } - if ( !Const.isEmpty( m_defaultConfigText.getText() ) ) { + if ( !Utils.isEmpty( m_defaultConfigText.getText() ) ) { defaultConf = transMeta.environmentSubstitute( m_defaultConfigText.getText() ); } @@ -808,7 +848,7 @@ private void getData() { zookeeperHosts = transMeta.environmentSubstitute( nc.getZooKeeperHost() ); } - if ( Const.isEmpty( zookeeperHosts ) && Const.isEmpty( coreConf ) && Const.isEmpty( defaultConf ) ) { + if ( Utils.isEmpty( zookeeperHosts ) && Utils.isEmpty( coreConf ) && Utils.isEmpty( defaultConf ) ) { throw new IOException( BaseMessages.getString( HBaseOutputMeta.PKG, "MappingDialog.Error.Message.CantConnectNoConnectionDetailsProvided" ) ); } @@ -851,7 +891,7 @@ private void setupMappedTableNames() { private void setupMappingNamesForTable( boolean quiet ) { m_mappingNamesCombo.removeAll(); - if ( !Const.isEmpty( m_mappedTableNamesCombo.getText() ) ) { + if ( !Utils.isEmpty( m_mappedTableNamesCombo.getText() ) ) { HBaseConnection connection = null; try { connection = getHBaseConnection(); @@ -906,4 +946,8 @@ public String getCurrentConfiguration() { updateMetaConnectionDetails( m_configurationMeta ); return m_configurationMeta.getXML(); } + + public void walEnabled() { + m_disableWriteToWALBut.setEnabled( !m_deleteRowKeyBut.getSelection() ); + } } diff --git a/kettle-plugins/hbase/src/main/java/org/pentaho/big/data/kettle/plugins/hbase/output/HBaseOutputMeta.java b/kettle-plugins/hbase/src/main/java/org/pentaho/big/data/kettle/plugins/hbase/output/HBaseOutputMeta.java index ce626c6c599..8c450c2215f 100644 --- a/kettle-plugins/hbase/src/main/java/org/pentaho/big/data/kettle/plugins/hbase/output/HBaseOutputMeta.java +++ b/kettle-plugins/hbase/src/main/java/org/pentaho/big/data/kettle/plugins/hbase/output/HBaseOutputMeta.java @@ -36,7 +36,6 @@ import org.pentaho.bigdata.api.hbase.mapping.Mapping; import org.pentaho.di.core.CheckResult; import org.pentaho.di.core.CheckResultInterface; -import org.pentaho.di.core.Const; import org.pentaho.di.core.annotations.Step; import org.pentaho.di.core.database.DatabaseMeta; import org.pentaho.di.core.exception.KettleException; @@ -45,6 +44,7 @@ import org.pentaho.di.core.injection.InjectionDeep; import org.pentaho.di.core.injection.InjectionSupported; import org.pentaho.di.core.row.RowMetaInterface; +import org.pentaho.di.core.util.Utils; import org.pentaho.di.core.variables.VariableSpace; import org.pentaho.di.core.variables.Variables; import org.pentaho.di.core.xml.XMLHandler; @@ -96,6 +96,10 @@ public class HBaseOutputMeta extends BaseStepMeta implements StepMetaInterface { @Injection( name = "TARGET_MAPPING_NAME" ) protected String m_targetMappingName; + /** if true then the incoming column with row key from the mapping will be deleted */ + @Injection( name = "DELETE_ROW_KEY" ) + protected boolean m_deleteRowKey; + /** if true then the WAL will not be written to */ @Injection( name = "DISABLE_WRITE_TO_WAL" ) protected boolean m_disableWriteToWAL; @@ -211,6 +215,14 @@ public String getTargetMappingName() { return m_targetMappingName; } + public boolean getDeleteRowKey() { + return m_deleteRowKey; + } + + public void setDeleteRowKey( boolean m_deleteRowKey ) { + this.m_deleteRowKey = m_deleteRowKey; + } + public void setDisableWriteToWAL( boolean d ) { m_disableWriteToWAL = d; } @@ -290,23 +302,27 @@ public String getXML() { namedClusterLoadSaveUtil .getXml( retval, namedClusterService, namedCluster, repository == null ? null : repository.getMetaStore(), getLog() ); - if ( !Const.isEmpty( m_coreConfigURL ) ) { + if ( !Utils.isEmpty( m_coreConfigURL ) ) { retval.append( "\n " ).append( XMLHandler.addTagValue( "core_config_url", m_coreConfigURL ) ); } - if ( !Const.isEmpty( m_defaultConfigURL ) ) { + if ( !Utils.isEmpty( m_defaultConfigURL ) ) { retval.append( "\n " ).append( XMLHandler.addTagValue( "default_config_url", m_defaultConfigURL ) ); } - if ( !Const.isEmpty( m_targetTableName ) ) { + if ( !Utils.isEmpty( m_targetTableName ) ) { retval.append( "\n " ).append( XMLHandler.addTagValue( "target_table_name", m_targetTableName ) ); } - if ( !Const.isEmpty( m_targetMappingName ) ) { + if ( !Utils.isEmpty( m_targetMappingName ) ) { retval.append( "\n " ).append( XMLHandler.addTagValue( "target_mapping_name", m_targetMappingName ) ); } - if ( !Const.isEmpty( m_writeBufferSize ) ) { + + retval.append( "\n " ).append( XMLHandler.addTagValue( "delete_rows_by_key", m_deleteRowKey ) ); + + if ( !Utils.isEmpty( m_writeBufferSize ) ) { retval.append( "\n " ).append( XMLHandler.addTagValue( "write_buffer_size", m_writeBufferSize ) ); } retval.append( "\n " ).append( XMLHandler.addTagValue( "disable_wal", m_disableWriteToWAL ) ); + if ( m_mapping != null ) { retval.append( m_mapping.getXML() ); } @@ -333,6 +349,10 @@ public StepDataInterface getStepData() { m_defaultConfigURL = XMLHandler.getTagValue( stepnode, "default_config_url" ); m_targetTableName = XMLHandler.getTagValue( stepnode, "target_table_name" ); m_targetMappingName = XMLHandler.getTagValue( stepnode, "target_mapping_name" ); + String deleteKeys = XMLHandler.getTagValue( stepnode, "delete_rows_by_key" ); + if ( !Utils.isEmpty( deleteKeys ) ) { + m_deleteRowKey = deleteKeys.equalsIgnoreCase( "Y" ); + } m_writeBufferSize = XMLHandler.getTagValue( stepnode, "write_buffer_size" ); String disableWAL = XMLHandler.getTagValue( stepnode, "disable_wal" ); m_disableWriteToWAL = disableWAL.equalsIgnoreCase( "Y" ); @@ -360,6 +380,7 @@ public StepDataInterface getStepData() { m_defaultConfigURL = rep.getStepAttributeString( id_step, 0, "default_config_url" ); m_targetTableName = rep.getStepAttributeString( id_step, 0, "target_table_name" ); m_targetMappingName = rep.getStepAttributeString( id_step, 0, "target_mapping_name" ); + m_deleteRowKey = rep.getStepAttributeBoolean( id_step, 0, "delete_rows_by_key" ); m_writeBufferSize = rep.getStepAttributeString( id_step, 0, "write_buffer_size" ); m_disableWriteToWAL = rep.getStepAttributeBoolean( id_step, 0, "disable_wal" ); @@ -380,19 +401,22 @@ public StepDataInterface getStepData() { @Override public void saveRep( Repository rep, IMetaStore metaStore, ObjectId id_transformation, ObjectId id_step ) throws KettleException { namedClusterLoadSaveUtil.saveRep( rep, metaStore, id_transformation, id_step, namedClusterService, namedCluster, getLog() ); - if ( !Const.isEmpty( m_coreConfigURL ) ) { + if ( !Utils.isEmpty( m_coreConfigURL ) ) { rep.saveStepAttribute( id_transformation, id_step, 0, "core_config_url", m_coreConfigURL ); } - if ( !Const.isEmpty( m_defaultConfigURL ) ) { + if ( !Utils.isEmpty( m_defaultConfigURL ) ) { rep.saveStepAttribute( id_transformation, id_step, 0, "default_config_url", m_defaultConfigURL ); } - if ( !Const.isEmpty( m_targetTableName ) ) { + if ( !Utils.isEmpty( m_targetTableName ) ) { rep.saveStepAttribute( id_transformation, id_step, 0, "target_table_name", m_targetTableName ); } - if ( !Const.isEmpty( m_targetMappingName ) ) { + if ( !Utils.isEmpty( m_targetMappingName ) ) { rep.saveStepAttribute( id_transformation, id_step, 0, "target_mapping_name", m_targetMappingName ); } - if ( !Const.isEmpty( m_writeBufferSize ) ) { + + rep.saveStepAttribute( id_transformation, id_step, 0, "delete_rows_by_key", m_deleteRowKey ); + + if ( !Utils.isEmpty( m_writeBufferSize ) ) { rep.saveStepAttribute( id_transformation, id_step, 0, "write_buffer_size", m_writeBufferSize ); } rep.saveStepAttribute( id_transformation, id_step, 0, "disable_wal", m_disableWriteToWAL ); @@ -407,6 +431,7 @@ public void setDefault() { m_defaultConfigURL = null; m_targetTableName = null; m_targetMappingName = null; + m_deleteRowKey = false; m_disableWriteToWAL = false; m_writeBufferSize = null; namedCluster = namedClusterService.getClusterTemplate(); diff --git a/kettle-plugins/hbase/src/main/java/org/pentaho/big/data/kettle/plugins/hbase/output/KettleRowToHBaseTuple.java b/kettle-plugins/hbase/src/main/java/org/pentaho/big/data/kettle/plugins/hbase/output/KettleRowToHBaseTuple.java new file mode 100644 index 00000000000..5280545a36f --- /dev/null +++ b/kettle-plugins/hbase/src/main/java/org/pentaho/big/data/kettle/plugins/hbase/output/KettleRowToHBaseTuple.java @@ -0,0 +1,190 @@ +/******************************************************************************* + * + * Pentaho Big Data + * + * Copyright (C) 2002-2017 by Hitachi Vantara : http://www.pentaho.com + * + ******************************************************************************* + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + ******************************************************************************/ + +package org.pentaho.big.data.kettle.plugins.hbase.output; + +import java.util.Map; + +import org.pentaho.big.data.kettle.plugins.hbase.mapping.MappingUtils; +import org.pentaho.bigdata.api.hbase.ByteConversionUtil; +import org.pentaho.bigdata.api.hbase.mapping.Mapping; +import org.pentaho.bigdata.api.hbase.mapping.Mapping.KeyType; +import org.pentaho.bigdata.api.hbase.meta.HBaseValueMetaInterface; +import org.pentaho.bigdata.api.hbase.table.HBasePut; +import org.pentaho.bigdata.api.hbase.table.HBaseTableWriteOperationManager; +import org.pentaho.di.core.exception.KettleException; +import org.pentaho.di.core.row.RowMetaInterface; +import org.pentaho.di.core.row.ValueMetaInterface; +import org.pentaho.di.i18n.BaseMessages; + +public class KettleRowToHBaseTuple { + + private int keyIndex = -1; + private ValueMetaInterface keyInMeta; + private KeyType keyType; + + private int familyIndex = -1; + private ValueMetaInterface familyInMeta; + + private int columnIndex = -1; + private ValueMetaInterface columnInMeta; + + private int valueIndex = -1; + private ValueMetaInterface valueInMeta; + private HBaseValueMetaInterface valueMeta; + + private int visibilityIndex = -1; + private ValueMetaInterface visibilityInMeta; + private HBaseValueMetaInterface visibilityMeta; + + /** + * Creates a conversion class that converts an incoming row object with values for the various Tuple fields into an HBasePut + * + * @param inputRowMeta + * The row meta of the incoming row structure + * @param tupleMapping + * The mapping in use for the step + * @param columnMapping + * The non-KEY columns in the mapping mapped by column alias + * @throws KettleException + */ + public KettleRowToHBaseTuple( RowMetaInterface inputRowMeta, Mapping tupleMapping, + Map columnMapping ) throws KettleException { + + String keyName = tupleMapping.getKeyName(); + keyIndex = inputRowMeta.indexOfValue( keyName ); + if ( keyIndex < 0 ) { + // No Key Column + throw new KettleException( BaseMessages.getString( HBaseOutputMeta.PKG, "HBaseOutput.Error.NoKeyColumn" ) ); + } + keyInMeta = inputRowMeta.getValueMeta( keyIndex ); + keyType = tupleMapping.getKeyType(); + + familyIndex = inputRowMeta.indexOfValue( Mapping.TupleMapping.FAMILY.toString() ); + if ( familyIndex < 0 ) { + throw new KettleException( BaseMessages.getString( HBaseOutputMeta.PKG, "HBaseOutput.Error.NoFamilyColumn" ) ); + } + familyInMeta = inputRowMeta.getValueMeta( familyIndex ); + + columnIndex = inputRowMeta.indexOfValue( Mapping.TupleMapping.COLUMN.toString() ); + if ( columnIndex < 0 ) { + throw new KettleException( BaseMessages.getString( HBaseOutputMeta.PKG, "HBaseOutput.Error.NoColumnColumn" ) ); + } + columnInMeta = inputRowMeta.getValueMeta( columnIndex ); + + // NOTE: TIMESTAMPS cannot be written via HBase Put, so the column is useless for writing + + valueIndex = inputRowMeta.indexOfValue( Mapping.TupleMapping.VALUE.toString() ); + if ( valueIndex < 0 ) { + throw new KettleException( BaseMessages.getString( HBaseOutputMeta.PKG, "HBaseOutput.Error.NoValueColumn" ) ); + } + valueInMeta = inputRowMeta.getValueMeta( valueIndex ); + valueMeta = columnMapping.get( valueInMeta.getName() ); + + // NOTE: The Visibility Index is optional + visibilityIndex = inputRowMeta.indexOfValue( MappingUtils.TUPLE_MAPPING_VISIBILITY ); + if ( visibilityIndex >= 0 ) { + visibilityInMeta = inputRowMeta.getValueMeta( visibilityIndex ); + visibilityMeta = columnMapping.get( visibilityInMeta.getName() ); + if ( visibilityMeta == null ) { + // There is no column mapping for Visibility, so disable it by removing the index in the RowMeta + visibilityInMeta = null; + visibilityIndex = -1; + } + } + + } + + /** + * Creates an HBasePut representing the tuple by extracting data from a row + * + * @param hBaseTableWriteOperationManager + * HBase write manager + * @param bu + * The Byte Conversion utility (Required for key conversion) + * @param row + * Object containing row data + * @param writeToWAL + * Should data be written to WAL? + * @return An HBase Put for the tuple + * @throws Exception + */ + public HBasePut createTuplePut( HBaseTableWriteOperationManager hBaseTableWriteOperationManager, + ByteConversionUtil bu, Object[] row, boolean writeToWAL ) throws Exception { + + if ( keyInMeta.isNull( row[keyIndex] ) ) { + throw new FieldException( Mapping.TupleMapping.KEY ); + } + if ( familyInMeta.isNull( row[familyIndex] ) ) { + throw new FieldException( Mapping.TupleMapping.FAMILY ); + } + if ( columnInMeta.isNull( row[columnIndex] ) ) { + throw new FieldException( Mapping.TupleMapping.COLUMN ); + } + if ( valueInMeta.isNull( row[valueIndex] ) ) { + throw new FieldException( Mapping.TupleMapping.VALUE ); + } + + byte[] encodedKey = bu.encodeKeyValue( row[keyIndex], keyInMeta, keyType ); + + HBasePut put = hBaseTableWriteOperationManager.createPut( encodedKey ); + + // Note: Families must always be string with the implementation of HBasePut + String columnFamily = familyInMeta.getString( row[familyIndex] ); + + boolean binaryColName = false; + String columnName = columnInMeta.getString( row[columnIndex] ); + if ( columnName.startsWith( "@@@binary@@@" ) ) { + // assume hex encoded column name + columnName = columnName.replace( "@@@binary@@@", "" ); + binaryColName = true; + } + + byte[] encodedValue = valueMeta.encodeColumnValue( row[valueIndex], valueInMeta ); + put.addColumn( columnFamily, columnName, binaryColName, encodedValue ); + + if ( visibilityIndex >= 0 && !visibilityInMeta.isNull( row[visibilityIndex] ) ) { + byte[] encodedVisibility = visibilityMeta.encodeColumnValue( row[visibilityIndex], visibilityInMeta ); + put.addColumn( columnFamily, MappingUtils.TUPLE_MAPPING_VISIBILITY, false, encodedVisibility ); + } + + put.setWriteToWAL( writeToWAL ); + return put; + } + + public static class FieldException extends Exception { + + public Mapping.TupleMapping field; + + public FieldException( Mapping.TupleMapping field ) { + super(); + this.field = field; + } + + public String getFieldString() { + return field.toString(); + } + + } + +} diff --git a/kettle-plugins/hbase/src/main/resources/org/pentaho/big/data/kettle/plugins/hbase/output/messages/messages_en_US.properties b/kettle-plugins/hbase/src/main/resources/org/pentaho/big/data/kettle/plugins/hbase/output/messages/messages_en_US.properties index b763ea6bf1d..ba6c376950a 100644 --- a/kettle-plugins/hbase/src/main/resources/org/pentaho/big/data/kettle/plugins/hbase/output/messages/messages_en_US.properties +++ b/kettle-plugins/hbase/src/main/resources/org/pentaho/big/data/kettle/plugins/hbase/output/messages/messages_en_US.properties @@ -17,7 +17,7 @@ HBaseOutputDialog.TableName.TipText=The name of the HBase table to write to HBaseOutputDialog.TableName.Button=Get table names HBaseOutputDialog.FileType.XML=XML config file HBaseOutputDialog.NamedCluster.Label=Hadoop cluster -HBaseOutputDialog.ClusterConfig.TipText=Hadoop cluster to use for setting ZooKeeper host(s) and port +HBaseOutputDialog.NamedCluster.TipText=Hadoop cluster to use for setting ZooKeeper host(s) and port HBaseOutputDialog.ClusterMissingValues.Msg=The selected Hadoop cluster is missing required values. HBaseOutputDialog.ClusterNotSelected.Msg=You must select a Hadoop cluster to continue. HBaseOutputDialog.NamedClusterNotSelected.Msg=You must select a Hadoop cluster to continue. @@ -27,6 +27,9 @@ HBaseOutputDialog.MappingName.TipText=Mapping to use for the above HBase table HBaseOutputDialog.MappingName.Button=Get mappings HBaseOutputDialog.MappingName.Button=Get mappings for the specified table +HBaseOutputDialog.DeleteRowKey.Label=Delete rows by mapping key +HBaseOutputDialog.DeleteRowKey.TipText=Deletes data from the HBase table based on the row key defined in the mapping + HBaseOutputDialog.StoreMapping.Label=Store mapping info in step meta data HBaseOutputDialog.StoreMapping.TipText=Store the mapping in the step''s meta data, rather than load it from HBase at runtime @@ -127,7 +130,6 @@ HBaseOutput.Error.TargetTableDoesNotExist=Target table "{0}" does not exist! HBaseOutput.Error.TargetTableIsNotAvailable=Target table "{0}" is not available! HBaseOutput.Error.ProblemWhenCheckingAvailReadiness=A problem occurred when trying to check availability/readiness of target table "{0}": {1} HBaseOutput.Error.ProblemGettingMappingInfo=Problem getting mapping information: {0} -HBaseOutput.Error.CantWriteUsingATupleMapping=HBaseOutput can't write using a tuple mapping! HBaseOutput.Error.CantFindIncomingField=Can't find incoming field "{0}" defined in the mapping "{1}" HBaseOutput.Error.TableKeyNotPresentInIncomingFields=The table key "{0}" defined in mapping "{1}" does not seem to be present in the incoming fields HBaseOutput.Error.ProblemConnectingToTargetTable=Problem connecting to target table: {0} @@ -138,6 +140,15 @@ HBaseOutput.Error.UnableToSetTargetTable=Unable to set a new target table to wri HBaseOutput.Error.UnableToAddColumnToTargetTablePut=Unable to add a column to the current target table put operation HBaseOutput.Error.ServiceStatus=Cannot communicate with HBaseService\nSaving the transformation may lose data.\nPlease correct the communication issue before working with this transformation\n +HBaseOutput.Error.ErrorCreatingDelete=Error creating the HBase delete! +HBaseOutput.Error.MissingFieldData=The incoming row tuple has a null value in the "{0}" field! +HBaseOutput.Error.ErrorCreatingPut=Error creating the HBase put for tuple row +HBaseOutput.Error.NoKeyColumn=No key field was found in the incoming stream +HBaseOutput.Error.NoFamilyColumn=No family field was found in the incoming stream +HBaseOutput.Error.NoColumnColumn=No column name field was found in the incoming stream +HBaseOutput.Error.NoValueColumn=No value field was found in the incoming stream + + Dialog.Error=Error HBaseOutput.Injection.HBASE_SITE_XML_URL=The address of the hbase-site.xml file. @@ -151,6 +162,7 @@ HBaseOutput.Injection.WRITE_BUFFER_SIZE=Specify the size of the write buffer use HBaseOutput.Injection.MAPPING=Mappings HBaseOutput.Injection.TABLE_NAME=The name of the HBase table. HBaseOutput.Injection.MAPPING_NAME=The name of the map to use for the HBase table. +HBaseOutput.Injection.DELETE_ROW_KEY=Delete the row key (specified in the mapping) from the HBase table. HBaseOutput.Injection.MAPPING_ALIAS=The name to assign to the HBase table key. HBaseOutput.Injection.MAPPING_KEY=This option indicates if the column is the key for the table. diff --git a/kettle-plugins/hbase/src/test/java/org/pentaho/big/data/kettle/plugins/hbase/output/HBaseOutputMetaInjectionTest.java b/kettle-plugins/hbase/src/test/java/org/pentaho/big/data/kettle/plugins/hbase/output/HBaseOutputMetaInjectionTest.java index 5f3683689cf..6220ed33d2d 100644 --- a/kettle-plugins/hbase/src/test/java/org/pentaho/big/data/kettle/plugins/hbase/output/HBaseOutputMetaInjectionTest.java +++ b/kettle-plugins/hbase/src/test/java/org/pentaho/big/data/kettle/plugins/hbase/output/HBaseOutputMetaInjectionTest.java @@ -65,6 +65,12 @@ public String get() { return meta.getTargetMappingName(); } } ); + check( "DELETE_ROW_KEY", new BooleanGetter() { + @Override + public boolean get() { + return meta.getDeleteRowKey(); + } + } ); check( "DISABLE_WRITE_TO_WAL", new BooleanGetter() { public boolean get() { return meta.getDisableWriteToWAL(); diff --git a/kettle-plugins/hbase/src/test/java/org/pentaho/big/data/kettle/plugins/hbase/output/KettleRowToHBaseTupleTest.java b/kettle-plugins/hbase/src/test/java/org/pentaho/big/data/kettle/plugins/hbase/output/KettleRowToHBaseTupleTest.java new file mode 100644 index 00000000000..34a07af9a38 --- /dev/null +++ b/kettle-plugins/hbase/src/test/java/org/pentaho/big/data/kettle/plugins/hbase/output/KettleRowToHBaseTupleTest.java @@ -0,0 +1,104 @@ +package org.pentaho.big.data.kettle.plugins.hbase.output; + +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.util.HashMap; +import java.util.Map; + +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mockito; +import org.pentaho.big.data.kettle.plugins.hbase.mapping.MappingUtils; +import org.pentaho.big.data.kettle.plugins.hbase.output.KettleRowToHBaseTuple.FieldException; +import org.pentaho.bigdata.api.hbase.ByteConversionUtil; +import org.pentaho.bigdata.api.hbase.mapping.Mapping; +import org.pentaho.bigdata.api.hbase.mapping.Mapping.KeyType; +import org.pentaho.bigdata.api.hbase.mapping.Mapping.TupleMapping; +import org.pentaho.bigdata.api.hbase.meta.HBaseValueMetaInterface; +import org.pentaho.bigdata.api.hbase.table.HBasePut; +import org.pentaho.bigdata.api.hbase.table.HBaseTableWriteOperationManager; +import org.pentaho.di.core.row.RowMetaInterface; +import org.pentaho.di.core.row.value.ValueMetaString; + +@RunWith( org.mockito.runners.MockitoJUnitRunner.class ) +public class KettleRowToHBaseTupleTest { + + @Test + public void testRowConversion() throws Exception { + + RowMetaInterface inputRowMeta = Mockito.mock( RowMetaInterface.class ); + + when(inputRowMeta.indexOfValue( Mapping.TupleMapping.KEY.toString() )).thenReturn( 0 ); + when(inputRowMeta.indexOfValue( Mapping.TupleMapping.FAMILY.toString() )).thenReturn( 1 ); + when(inputRowMeta.indexOfValue( Mapping.TupleMapping.COLUMN.toString() )).thenReturn( 2 ); + when(inputRowMeta.indexOfValue( Mapping.TupleMapping.VALUE.toString() )).thenReturn( 3 ); + when(inputRowMeta.indexOfValue( MappingUtils.TUPLE_MAPPING_VISIBILITY )).thenReturn( 4 ); + + ValueMetaString keyMeta = new ValueMetaString( Mapping.TupleMapping.KEY.toString() ); + ValueMetaString familyMeta = new ValueMetaString( Mapping.TupleMapping.FAMILY.toString() ); + ValueMetaString columnMeta = new ValueMetaString( Mapping.TupleMapping.COLUMN.toString() ); + ValueMetaString valueMeta = new ValueMetaString( Mapping.TupleMapping.VALUE.toString() ); + ValueMetaString visMeta = new ValueMetaString( MappingUtils.TUPLE_MAPPING_VISIBILITY ); + + when(inputRowMeta.getValueMeta( 0 ) ).thenReturn( keyMeta ); + when(inputRowMeta.getValueMeta( 1 ) ).thenReturn( familyMeta ); + when(inputRowMeta.getValueMeta( 2 ) ).thenReturn( columnMeta ); + when(inputRowMeta.getValueMeta( 3 ) ).thenReturn( valueMeta ); + when(inputRowMeta.getValueMeta( 4 ) ).thenReturn( visMeta ); + + + Mapping tupleMapping = Mockito.mock( Mapping.class ); + + when(tupleMapping.getKeyName()).thenReturn( Mapping.TupleMapping.KEY.toString() ); + when(tupleMapping.getKeyType()).thenReturn( KeyType.STRING ); + + + Map columnMap = new HashMap<>(); + + HBaseValueMetaInterface hvmi = Mockito.mock( HBaseValueMetaInterface.class ); + + columnMap.put( valueMeta.getName(), hvmi ); + + HBaseValueMetaInterface hvmiv = Mockito.mock( HBaseValueMetaInterface.class ); + + columnMap.put( visMeta.getName(), hvmiv ); + + + KettleRowToHBaseTuple rowConverter = new KettleRowToHBaseTuple( inputRowMeta, tupleMapping, columnMap ); + + + ByteConversionUtil byteConversionUtil = Mockito.mock( ByteConversionUtil.class ); + + String row[] = { "key", "family", "@@@binary@@@column", "value", "public" }; + + HBaseTableWriteOperationManager writeManager = Mockito.mock( HBaseTableWriteOperationManager.class ); + + HBasePut put = Mockito.mock( HBasePut.class ); + + when(writeManager.createPut( row[0].getBytes() ) ).thenReturn( put ); + + when(byteConversionUtil.encodeKeyValue(row[0], keyMeta, KeyType.STRING)).thenReturn( row[0].getBytes() ); + + rowConverter.createTuplePut( writeManager, byteConversionUtil, row, true ); + + verify( put, times( 1 ) ).addColumn( eq(row[1]), eq("column"), eq(true), any() ); + verify( put, times( 1 ) ).addColumn( eq(row[1]), eq( MappingUtils.TUPLE_MAPPING_VISIBILITY), eq( false ), any() ); + verify( put, times( 1 ) ).setWriteToWAL( true ); + + } + + @Test + public void testException() { + + FieldException fieldException = new FieldException( TupleMapping.KEY ); + Assert.assertEquals( fieldException.getFieldString(), TupleMapping.KEY.toString() ); + + } + + +}