diff --git a/kettle-plugins/druid-m/pom.xml b/kettle-plugins/druid-m/pom.xml new file mode 100644 index 00000000000..1a13e3af497 --- /dev/null +++ b/kettle-plugins/druid-m/pom.xml @@ -0,0 +1,76 @@ + + + 4.0.0 + + pentaho + pentaho-big-data-kettle-plugins + 8.0-SNAPSHOT + + pentaho-big-data-kettle-plugins-druid + 8.0-SNAPSHOT + bundle + Pentaho Community Edition Project: ${project.artifactId} + a Pentaho open source project + http://www.pentaho.com + + + pentaho-kettle + kettle-core + ${dependency.kettle.revision} + provided + + + pentaho-kettle + kettle-engine + ${dependency.kettle.revision} + provided + + + pentaho-kettle + kettle-ui-swt + ${dependency.kettle.revision} + provided + + + pentaho-kettle + kettle-engine + ${dependency.kettle.revision} + tests + test + + + + + com.alibaba + fastjson + 1.2.38 + + + + org.apache.httpcomponents + httpclient + 4.5.3 + + + + + + + org.apache.felix + maven-bundle-plugin + ${dependency.maven-bundle-plugin.version} + true + + + ${project.artifactId} + + org.eclipse.swt*;resolution:=optional,org.pentaho.di.ui.xul*;resolution:=optional,org.pentaho.ui.xul*;resolution:=optional,org.pentaho.di.osgi,org.pentaho.di.core.plugins,* + + + + + + + \ No newline at end of file diff --git a/kettle-plugins/druid-m/src/main/java/org/pentaho/big/data/kettle/plugins/druid/input/output/output/DruidOutputStep.java b/kettle-plugins/druid-m/src/main/java/org/pentaho/big/data/kettle/plugins/druid/input/output/output/DruidOutputStep.java new file mode 100644 index 00000000000..8cb564c5658 --- /dev/null +++ b/kettle-plugins/druid-m/src/main/java/org/pentaho/big/data/kettle/plugins/druid/input/output/output/DruidOutputStep.java @@ -0,0 +1,301 @@ +package org.pentaho.big.data.kettle.plugins.druid.input.output.output; + +import org.apache.http.HttpEntity; +import org.apache.http.ParseException; +import org.apache.http.client.ClientProtocolException; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpPost; +import org.apache.http.entity.StringEntity; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClients; +import org.apache.http.util.EntityUtils; +import org.pentaho.di.core.Const; +import org.pentaho.di.core.exception.KettleException; +import org.pentaho.di.core.exception.KettleStepException; +import org.pentaho.di.core.row.RowMeta; +import org.pentaho.di.core.row.RowMetaInterface; +import org.pentaho.di.trans.Trans; +import org.pentaho.di.trans.TransMeta; +import org.pentaho.di.trans.step.*; + +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONArray; +import com.alibaba.fastjson.JSONObject; + +import java.io.IOException; +import java.io.UnsupportedEncodingException; +import java.text.ParsePosition; +import java.text.SimpleDateFormat; +import java.util.Date; + +/** + * @Title: 步骤�? + * @Package plugin.template + * @Description: TODO(插件导出) + * @author http://www.Teld.cn + * @date 2010-8-8 下午05:10:26 + * @version V1.0 + */ + +public class DruidOutputStep extends BaseStep implements StepInterface { + + private DruidOutputStepData data; + private DruidOutputStepMeta meta; + + public DruidOutputStep(StepMeta s, StepDataInterface stepDataInterface, int c, TransMeta t, Trans dis) { + super(s, stepDataInterface, c, t, dis); + + } + + public void dispose(StepMetaInterface smi, StepDataInterface sdi) { + meta = (DruidOutputStepMeta) smi; + data = (DruidOutputStepData) sdi; + + super.dispose(smi, sdi); + } + + public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException { + meta = (DruidOutputStepMeta) smi; + data = (DruidOutputStepData) sdi; + String URL = meta.getOutputField(); + String table = meta.getOutputvalue(); + String stime = meta.getOutputSTime(); + String etime = meta.getOutputETime(); + String utcif = Integer.valueOf(meta.getOutputUtcOf())*60+""; + + String gojson = "{\n" + " \"queryType\": \"select\",\n" + " \"dataSource\": \"HATest\",\n" + + " \"granularity\": \"all\",\n" + " \"intervals\": [\n" + " \"2017-05-01/2017-05-02\"\n" + + " ],\n" + " \"pagingSpec\":{\"pagingIdentifiers\": {}, \"threshold\":1000000000} \n" + "}"; + JSONObject jsStr = (JSONObject) JSONObject.parse(gojson); // 将字符串{“id”:1} + jsStr.put("dataSource", table); + jsStr.put("intervals", ortime(stime) + "/" + ortime(etime)); + String getTime = jsStr.get("intervals").toString() + .replace("\"", "").split("/")[0]; + String UTF_8 = getCutTime(getTime, utcif); + + String endTime = jsStr.get("intervals").toString() + .replace("\"", "").split("/")[1]; + + String end_UTF_8 = getCutTime(endTime, utcif); + if (first) { + first = false; + // data.outputRowMeta = (RowMetaInterface) getInputRowMeta().clone(); + // //只能在getrow之后再用 ming + data.outputRowMeta = new RowMeta(); + meta.getFields(data.outputRowMeta, getStepname(), null, null, this); + logBasic("template step initialized successfully"); + } + Ontime(UTF_8, end_UTF_8, jsStr.toJSONString(), URL, data.outputRowMeta); + setOutputDone(); + return false; + } + public String ortime(String stime) { + String year = split_string(stime)[2]; + String day = split_string(stime)[1]; + String Month = split_string(stime)[0]; + stime = year + "-" + Month + "-" + day+"T00:00:00"; + return stime; + + } + public String[] split_string(String stime) { + String[] sstime = stime.split("/"); + return sstime; + + } + public boolean init(StepMetaInterface smi, StepDataInterface sdi) { + meta = (DruidOutputStepMeta) smi; + data = (DruidOutputStepData) sdi; + return super.init(smi, sdi); + + } + + public void stop() { + dispose(meta, data); + logBasic("Finished, processing " + getLinesRead() + " rows"); + markStop(); + } + + // Run is were the action happens! + public void run() { + logBasic("Starting to run..."); + try { + processRow(meta, data); + } catch (Exception e) { + logError("Unexpected error : " + e.toString()); + logError(Const.getStackTracker(e)); + setErrors(1); + stopAll(); + } finally { + dispose(meta, data); + logBasic("Finished, processing " + getLinesRead() + " rows"); + markStop(); + } + } + + public static Date strToDateLong(String strDate) { + SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss"); + ParsePosition pos = new ParsePosition(0); + Date strtodate = formatter.parse(strDate, pos); + return strtodate; + } + + public static String getPreTime(String sj1, String jj) { + SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss"); + String mydate1 = ""; + try { + Date date1 = format.parse(sj1); + long Time = (date1.getTime() / 1000) + Integer.parseInt(jj) * 60; + date1.setTime(Time * 1000); + mydate1 = format.format(date1); + } catch (Exception e) { + } + return mydate1; + } + + public static String getCutTime(String sj1, String jj) { + SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss"); + String mydate1 = ""; + try { + Date date1 = format.parse(sj1); + long Time = (date1.getTime() / 1000) - Integer.parseInt(jj) * 60; + date1.setTime(Time * 1000); + mydate1 = format.format(date1); + } catch (Exception e) { + } + return mydate1; + } + + public static String doHttpPost(String xmlInfo, String URL) { + System.out.println("发起的数据:" + xmlInfo); + byte[] xmlData = xmlInfo.getBytes(); + final String CONTENT_TYPE_TEXT_JSON = "application/json"; + CloseableHttpClient client = HttpClients.createDefault(); + + HttpPost httpPost = new HttpPost(URL); + httpPost.setHeader("Content-Type", "application/json;charset=UTF-8"); + + StringEntity se = null; + try { + se = new StringEntity(xmlInfo); + } catch (UnsupportedEncodingException e) { + e.printStackTrace(); + } + se.setContentType(CONTENT_TYPE_TEXT_JSON); + + httpPost.setEntity(se); + + CloseableHttpResponse response2 = null; + + try { + response2 = client.execute(httpPost); + } catch (ClientProtocolException e) { + e.printStackTrace(); + } catch (IOException e) { + e.printStackTrace(); + } + HttpEntity entity2 = null; + entity2 = response2.getEntity(); + String s2 = null; + try { + s2 = EntityUtils.toString(entity2, "UTF-8"); + } catch (ParseException e) { + e.printStackTrace(); + } catch (IOException e) { + e.printStackTrace(); + } + return s2; + } + + public void Ontime(String UTF_8, String end_UTF_8, String jsonc, String URL, RowMetaInterface outputRowMeta) { + int maxThread = Thread.activeCount() + 4; + while (true) { + if (strToDateLong(UTF_8).getTime() < strToDateLong(end_UTF_8).getTime()) { + UTF_8 = getPreTime(UTF_8, "1"); + String UTF_8_1 = getCutTime(UTF_8, "1"); + + String query_time = "{" + "\"" + "intervals" + "\"" + ":" + "[" + "\"" + UTF_8_1 + "/" + UTF_8 + "\"" + + "]" + "}"; + JSONObject test1 = (JSONObject) JSON.parse(query_time); + JSONObject jsStr = (JSONObject) JSONObject.parse(jsonc); + jsStr.put("intervals", test1.get("intervals")); + + try { + while (Thread.activeCount() > maxThread) { + Thread.sleep(200); + } + } catch (InterruptedException e) { + e.printStackTrace(); + } + try { + AsyncPost_BatchInputSize(JSON.toJSONString(jsStr), URL); + } catch (Exception e) { + e.printStackTrace(); + } + } else { + if (Thread.activeCount() <= (maxThread - 4)) { + try { + Thread thread = new Thread(); + if (thread.isAlive()) { + Thread.sleep(1000); + } + ; + + } catch (InterruptedException e) { + e.printStackTrace(); + } + break; + } + + } + } + + } + + public void AsyncPost_BatchInputSize(String c_json, String c_url) { + Runnable task = new Runnable() { + @Override + public void run() { + try { + Stored_procedure(c_json, c_url); + } catch (Exception e) { + e.printStackTrace(); + try { + Thread.sleep(50); + System.out.println("出现问题,已经等待了0.05秒钟,尝试重新请求" + c_json); + Stored_procedure(c_json, c_url); + } catch (KettleStepException | InterruptedException e1) { + e1.printStackTrace(); + } + } + } + }; + Thread thread = new Thread(task); + thread.start(); + try { + Thread.sleep(100); + } catch (InterruptedException e) { + e.printStackTrace(); + } + + } + + public void Stored_procedure(String c_json, String c_url) throws KettleStepException { + + Object[] outputRow = new Object[meta.getOutputList().length]; + + String postResult = doHttpPost(c_json, c_url); + JSONArray test_json = (JSONArray) JSON.parse(postResult); + JSONArray jieg = test_json.getJSONObject(0).getJSONObject("result").getJSONArray("events"); + Object[] temp = jieg.getJSONObject(0).getJSONObject("event").keySet().toArray(); + for (int i = 0; i < jieg.size(); i++) { + for (int j = 0; j < outputRow.length; j++) { + outputRow[j] = jieg.getJSONObject(i).getJSONObject("event").get(temp[j]); + } + data.outputRowMeta = new RowMeta(); + meta.getFields(data.outputRowMeta, getStepname(), null, null, this); + putRow(data.outputRowMeta, outputRow); + + } + } + +} diff --git a/kettle-plugins/druid-m/src/main/java/org/pentaho/big/data/kettle/plugins/druid/input/output/output/DruidOutputStepData.java b/kettle-plugins/druid-m/src/main/java/org/pentaho/big/data/kettle/plugins/druid/input/output/output/DruidOutputStepData.java new file mode 100644 index 00000000000..bbb755a4e08 --- /dev/null +++ b/kettle-plugins/druid-m/src/main/java/org/pentaho/big/data/kettle/plugins/druid/input/output/output/DruidOutputStepData.java @@ -0,0 +1,25 @@ +package org.pentaho.big.data.kettle.plugins.druid.input.output.output; + +/** + * @Title: 数据类 + * @Package plugin.template + * @Description: TODO(用一句话描述该文件做什么) + * @author http://www.ahuoo.com + * @date 2010-8-8 下午05:10:26 + * @version V1.0 + */ + +import org.pentaho.di.core.row.RowMetaInterface; +import org.pentaho.di.trans.step.BaseStepData; +import org.pentaho.di.trans.step.StepDataInterface; + +public class DruidOutputStepData extends BaseStepData implements StepDataInterface { + + public RowMetaInterface outputRowMeta; + + public DruidOutputStepData() + { + super(); + } +} + diff --git a/kettle-plugins/druid-m/src/main/java/org/pentaho/big/data/kettle/plugins/druid/input/output/output/DruidOutputStepDialog.java b/kettle-plugins/druid-m/src/main/java/org/pentaho/big/data/kettle/plugins/druid/input/output/output/DruidOutputStepDialog.java new file mode 100644 index 00000000000..2a57a615928 --- /dev/null +++ b/kettle-plugins/druid-m/src/main/java/org/pentaho/big/data/kettle/plugins/druid/input/output/output/DruidOutputStepDialog.java @@ -0,0 +1,397 @@ +package org.pentaho.big.data.kettle.plugins.druid.input.output.output; + +import java.io.IOException; +import java.io.UnsupportedEncodingException; + +import org.apache.http.HttpEntity; +import org.apache.http.ParseException; +import org.apache.http.client.ClientProtocolException; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpPost; +import org.apache.http.entity.StringEntity; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClients; +import org.apache.http.util.EntityUtils; +import org.eclipse.swt.SWT; +import org.eclipse.swt.events.SelectionAdapter; +import org.eclipse.swt.events.SelectionEvent; +import org.eclipse.swt.events.SelectionListener; +import org.eclipse.swt.events.ShellAdapter; +import org.eclipse.swt.events.ShellEvent; +import org.eclipse.swt.widgets.Button; +import org.eclipse.swt.widgets.DateTime; +import org.eclipse.swt.widgets.Display; +import org.eclipse.swt.widgets.Event; +import org.eclipse.swt.widgets.Label; +import org.eclipse.swt.widgets.Listener; +import org.eclipse.swt.widgets.Shell; +import org.eclipse.swt.widgets.Table; +import org.eclipse.swt.widgets.TableColumn; +import org.eclipse.swt.widgets.TableItem; +import org.eclipse.swt.widgets.Text; +import org.pentaho.di.i18n.BaseMessages; +import org.pentaho.di.trans.TransMeta; +import org.pentaho.di.ui.trans.step.BaseStepDialog; + +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONArray; +import com.alibaba.fastjson.JSONObject; + +import org.pentaho.di.trans.step.BaseStepMeta; +import org.pentaho.di.trans.step.StepDialogInterface; + +/** + * @Title: 对话框类 + * @Package plugin.template + * @Description: TODO(用一句话描述该文件做什么) + * @author http://www.ahuoo.com + * @date 2010-8-8 下午05:10:26 + * @version V1.0 + */ + +public class DruidOutputStepDialog extends BaseStepDialog implements StepDialogInterface { + + private static Class PKG = DruidOutputStepMeta.class; // for i18n purposes + + private DruidOutputStepMeta input; + + // output field name + private Text text; + private Label lbljson; + private Table table; + private Text UTC_Offset; + private Text Druid_Table; + private DateTime dateTime; + private DateTime dateTime_1; + + public DruidOutputStepDialog(Shell parent, Object in, TransMeta transMeta, String sname) { + super(parent, (BaseStepMeta) in, transMeta, sname); + input = (DruidOutputStepMeta) in; + } + + public String open() { + Display display = Display.getDefault(); + + shell = new Shell(); + // head title + shell.setSize(425, 518); + shell.setText("SWT Application"); + + // 输入url的文字 + Label lblNewLabel = new Label(shell, SWT.NONE); + lblNewLabel.setBounds(10, 10, 61, 17); + lblNewLabel.setText("http Druid"); + props.setLook(lblNewLabel); + + // url input + text = new Text(shell, SWT.BORDER); + text.setBounds(77, 7, 320, 23); + props.setLook(text); + + // 输入的文字 + lbljson = new Label(shell, SWT.NONE); + lbljson.setBounds(10, 47, 61, 17); + lbljson.setText("Druid_Table"); + props.setLook(lbljson); + + // table inout + Druid_Table = new Text(shell, SWT.BORDER); + Druid_Table.setBounds(77, 44, 159, 23); + props.setLook(Druid_Table); + + // time space + dateTime = new DateTime(shell, SWT.BORDER); + dateTime.setBounds(77, 87, 88, 24); + props.setLook(dateTime); + + // time space + dateTime_1 = new DateTime(shell, SWT.BORDER); + dateTime_1.setBounds(269, 87, 88, 24); + props.setLook(dateTime_1); + + // start time + Label lblStarttime = new Label(shell, SWT.NONE); + lblStarttime.setBounds(10, 94, 61, 17); + lblStarttime.setText("Start_Time"); + + // end time + Label lblEndtime = new Label(shell, SWT.NONE); + lblEndtime.setBounds(210, 94, 52, 17); + lblEndtime.setText("End_time"); + + // 偏移时区 + Label lblUtc = new Label(shell, SWT.NONE); + lblUtc.setBounds(10, 132, 76, 17); + lblUtc.setText("UTC_Offset"); + + // 偏移输入框 + UTC_Offset = new Text(shell, SWT.BORDER); + UTC_Offset.setBounds(92, 129, 73, 23); + props.setLook(UTC_Offset); + + // 获取元数据的按钮 + Button btnTest = new Button(shell, SWT.NONE); + btnTest.setBounds(10, 161, 61, 27); + btnTest.setText("G_metaD"); + // props.setLook(btnTest); + + // 显示表格的文字 + Label label = new Label(shell, SWT.NONE); + label.setBounds(10, 224, 61, 17); + label.setText("view data"); + // props.setLook(label); + // 多行文本输入框 + + // 显示的表格 + table = new Table(shell, SWT.MULTI | SWT.FULL_SELECTION | SWT.BORDER); + table.setBounds(77, 158, 320, 249); + table.setHeaderVisible(true); + table.setLinesVisible(true); + + // 每一列 + TableColumn idColumn = new TableColumn(table, SWT.LEFT); + idColumn.setText("Id"); + idColumn.setWidth(43); + TableColumn usernameColumn = new TableColumn(table, SWT.LEFT); + usernameColumn.setText("Key"); + usernameColumn.setWidth(63); + TableColumn passwordColumn = new TableColumn(table, SWT.LEFT); + passwordColumn.setText("Value"); + passwordColumn.setWidth(207); + + // props.setLook(table); + // 尾部信息 + + Label lblNewLabel_1 = new Label(shell, SWT.NONE); + lblNewLabel_1.setBounds(124, 458, 179, 17); + lblNewLabel_1.setText("BigData-Druid-Export"); + + // OK and cancel buttons + wOK = new Button(shell, SWT.PUSH); + wOK.setText(BaseMessages.getString(PKG, "System.Button.OK")); + wOK.setBounds(100, 423, 80, 27); + props.setLook(wOK); + wCancel = new Button(shell, SWT.PUSH); + wCancel.setText(BaseMessages.getString(PKG, "System.Button.Cancel")); + wCancel.setBounds(223, 423, 80, 27); + props.setLook(wCancel); + + // test + + btnTest.addSelectionListener(new SelectionListener() { + @Override + public void widgetSelected(SelectionEvent arg0) { + + String URL = text.getText(); + String xmlInfo = gojson(Druid_Table.getText(),replace_string(dateTime.toString()), replace_string(dateTime_1.toString())).toJSONString(); + JSONObject jsStr = (JSONObject) JSONObject.parse(xmlInfo); + JSONObject pagingSpec_content = jsStr.getJSONObject("pagingSpec"); + pagingSpec_content.put("threshold", "1"); + jsStr.put("pagingSpec", pagingSpec_content); + String selectM = doHttpPost(JSON.toJSONString(jsStr), URL); + JSONArray test_json = (JSONArray) JSON.parse(selectM); + JSONObject jieg = test_json.getJSONObject(0).getJSONObject("result").getJSONArray("events") + .getJSONObject(0).getJSONObject("event"); + table.removeAll(); + for (int i = 0; i < jieg.keySet().size(); i++) { + TableItem item = new TableItem(table, SWT.LEFT); + String[] valueStrings = { "" + i, (String) jieg.keySet().toArray()[i], + jieg.getString((String) jieg.keySet().toArray()[i]) }; + + item.setText(valueStrings); + } + } + + @Override + public void widgetDefaultSelected(SelectionEvent arg0) { + + } + }); + + // Add listeners + lsCancel = new Listener() { + public void handleEvent(Event e) { + cancel(); + } + }; + lsOK = new Listener() { + public void handleEvent(Event e) { + ok(); + } + }; + + wCancel.addListener(SWT.Selection, lsCancel); + wOK.addListener(SWT.Selection, lsOK); + + lsDef = new SelectionAdapter() { + public void widgetDefaultSelected(SelectionEvent e) { + ok(); + } + }; + + // 设置监听后就保存 + text.addSelectionListener(lsDef); + Druid_Table.addSelectionListener(lsDef); + dateTime.addSelectionListener(lsDef); + dateTime_1.addSelectionListener(lsDef); + UTC_Offset.addSelectionListener(lsDef); + + // Detect X or ALT-F4 or something that kills this window... + shell.addShellListener(new ShellAdapter() { + public void shellClosed(ShellEvent e) { + cancel(); + } + }); + + // Set the shell size, based upon previous time... + // setSize(); + + getData(); + input.setChanged(changed); + + shell.open(); + shell.layout(); + while (!shell.isDisposed()) { + if (!display.readAndDispatch()) + display.sleep(); + } + return stepname; + } + + // Read data and place it in the dialog + public void getData() { + // wStepname.selectAll(); + text.setText(input.getOutputField()); + Druid_Table.setText(input.getOutputvalue()); + dateTime.setYear(Integer.valueOf(split_string(input.getOutputSTime())[2])); + dateTime.setMonth(Integer.valueOf(split_string(input.getOutputSTime())[0]) - 1); + dateTime.setDay(Integer.valueOf(split_string(input.getOutputSTime())[1])); + + dateTime_1.setYear(Integer.valueOf(split_string(input.getOutputETime())[2])); + dateTime_1.setMonth(Integer.valueOf(split_string(input.getOutputETime())[0]) - 1); + dateTime_1.setDay(Integer.valueOf(split_string(input.getOutputETime())[1])); + + UTC_Offset.setText(input.getOutputUtcOf()); + if (input.getOutputList().length > 2) { + + for (int row = 0; row < input.getOutputList().length; row++) { + TableItem item = new TableItem(table, SWT.LEFT); + String[] valueStrings = { input.getOutputList()[row][0].toString(), + input.getOutputList()[row][1].toString(), input.getOutputList()[row][2].toString() }; + item.setText(valueStrings); + } + ; + } else { + System.out.println(input.getOutputList().length); + + } + } + + private void cancel() { + stepname = null; + input.setChanged(changed); + dispose(); + } + + // let the plugin know about the entered data + private void ok() { + // stepname = wStepname.getText(); // return value + input.setOutputField(text.getText()); + input.setOutputvalue(Druid_Table.getText()); + input.setOutputSTime(replace_string(dateTime.toString())); + input.setOutputETime(replace_string(dateTime_1.toString())); + input.setOutputUtcOf(UTC_Offset.getText()); + + TableItem[] items = table.getItems(); + String outputList[][] = new String[items.length][table.getColumnCount()]; + for (int i = 0; i < items.length; i++) { + for (int j = 0; j < table.getColumnCount(); j++) + outputList[i][j] = items[i].getText(j); + } + input.setOutputList(outputList); + dispose(); + } + + public static String doHttpPost(String xmlInfo, String URL) { + System.out.println("发起的数据:" + xmlInfo); + + final String CONTENT_TYPE_TEXT_JSON = "application/json"; + CloseableHttpClient client = HttpClients.createDefault(); + + HttpPost httpPost = new HttpPost(URL); + httpPost.setHeader("Content-Type", "application/json;charset=UTF-8"); + + StringEntity se = null; + try { + se = new StringEntity(xmlInfo); + } catch (UnsupportedEncodingException e) { + e.printStackTrace(); + } + se.setContentType(CONTENT_TYPE_TEXT_JSON); + + httpPost.setEntity(se); + + CloseableHttpResponse response2 = null; + + try { + response2 = client.execute(httpPost); + } catch (ClientProtocolException e) { + e.printStackTrace(); + } catch (IOException e) { + e.printStackTrace(); + } + HttpEntity entity2 = null; + entity2 = response2.getEntity(); + String s2 = null; + try { + s2 = EntityUtils.toString(entity2, "UTF-8"); + } catch (ParseException e) { + e.printStackTrace(); + } catch (IOException e) { + e.printStackTrace(); + } + return s2; + } + + public String replace_string(String stime) { + stime = stime.replace("DateTime {", "").replace("}", ""); + return stime; + + } + + public String ortime(String stime) { + String year = split_string(stime)[2]; + String day = split_string(stime)[1]; + String Month = split_string(stime)[0]; + stime = year + "-" + Month + "-" + day; + return stime; + + } + +// //限制查询性能 只是为了前端不崩溃 并且可以显示元数据 跟流程无关 +// public String or1time(String stime) { +// String year = split_string(stime)[2]; +// String day = (Integer.valueOf(split_string(stime)[1])+1)+""; +// String Month = split_string(stime)[0]; +// stime = year + "-" + Month + "-" + day; +// return stime; +// +// } + + public String[] split_string(String stime) { + String[] sstime = stime.split("/"); + return sstime; + + } + + public JSONObject gojson(String tables, String stime, String etime) { + String gojson = "{\n" + " \"queryType\": \"select\",\n" + " \"dataSource\": \"HATest\",\n" + + " \"granularity\": \"all\",\n" + " \"intervals\": [\n" + " \"2017-05-01/2017-05-02\"\n" + + " ],\n" + " \"pagingSpec\":{\"pagingIdentifiers\": {}, \"threshold\":1} \n" + "}"; + JSONObject jsStr = (JSONObject) JSONObject.parse(gojson); // 将字符串{“id”:1} + jsStr.put("dataSource", Druid_Table.getText()); + jsStr.put("intervals", ortime(stime) + "/" + ortime(etime)); + return jsStr; + } + +} diff --git a/kettle-plugins/druid-m/src/main/java/org/pentaho/big/data/kettle/plugins/druid/input/output/output/DruidOutputStepMeta.java b/kettle-plugins/druid-m/src/main/java/org/pentaho/big/data/kettle/plugins/druid/input/output/output/DruidOutputStepMeta.java new file mode 100644 index 00000000000..04f29ee06a8 --- /dev/null +++ b/kettle-plugins/druid-m/src/main/java/org/pentaho/big/data/kettle/plugins/druid/input/output/output/DruidOutputStepMeta.java @@ -0,0 +1,262 @@ +package org.pentaho.big.data.kettle.plugins.druid.input.output.output; + +import java.util.List; +import java.util.Map; + +import org.eclipse.swt.widgets.Shell; +import org.pentaho.di.core.*; +import org.pentaho.di.core.database.DatabaseMeta; +import org.pentaho.di.core.exception.*; +import org.pentaho.di.core.row.*; +import org.pentaho.di.core.variables.VariableSpace; +import org.pentaho.di.core.xml.XMLHandler; +import org.pentaho.di.i18n.BaseMessages; +import org.pentaho.di.repository.*; +import org.pentaho.di.trans.*; +import org.pentaho.di.trans.step.*; +import org.w3c.dom.Node; +import org.pentaho.di.core.annotations.Step; + +/** + * @Title: 元数据类 + * @Package plugin.template + * @Description: TODO(用一句话描述该文件做什么) + * @author http://www.ahuoo.com + * @date 2010-8-8 下午05:10:26 + * @version V1.0 + */ +@Step(id = "DruidOutput", image = "druido.png", name = "DruidOutput.Name", description = "DruidOutput.Description", + categoryDescription = "i18n:org.pentaho.di.trans.step:BaseStep.Category.BigData", + documentationUrl = "http://wiki.pentaho.com/display/EAI/Druid+Output", + i18nPackageName = "org.pentaho.di.trans.steps.druidoutput") +public class DruidOutputStepMeta extends BaseStepMeta implements StepMetaInterface { + + private static Class PKG = DruidOutputStepMeta.class; // for i18n purposes + private String outputField = "url"; + private String outputValue = "table"; + private String outputSTime = "str"; + private String outputETime = "estr"; + private String outputUtcOf = "0"; + private String outputList[][] = new String[1][3]; + + // private String[] outputList = { "" + 1, (String) "test", (String) "test" }; + + public String getOutputField() { + return outputField; + } + + public String getOutputUtcOf() { + return outputUtcOf; + } + + public void setOutputUtcOf(String outputUtcOf) { + this.outputUtcOf = outputUtcOf; + } + + public String getOutputValue() { + return outputValue; + } + + public void setOutputValue(String outputValue) { + this.outputValue = outputValue; + } + + public String getOutputSTime() { + return outputSTime; + } + + public void setOutputSTime(String outputSTime) { + this.outputSTime = outputSTime; + } + + public String getOutputETime() { + return outputETime; + } + + public void setOutputETime(String outputETime) { + this.outputETime = outputETime; + } + + public void setOutputField(String outputField) { + this.outputField = outputField; + } + + public String getOutputvalue() { + return outputValue; + } + + public void setOutputvalue(String outputvalue) { + this.outputValue = outputvalue; + } + + public String[][] getOutputList() { + return outputList; + } + + public void setOutputList(String[][] outputList) { + this.outputList = outputList; + } + + public String getXML() throws KettleValueException { + String retval = ""; + retval += " " + getOutputField() + "" + Const.CR; + retval += " " + getOutputvalue() + "" + Const.CR; + retval += " " + getOutputSTime() + "" + Const.CR; + retval += " " + getOutputETime() + "" + Const.CR; + retval += " " + getOutputUtcOf() + "" + Const.CR; + int row = getRow(getOutputList()); + int col = getCol(getOutputList()); + String str = convertToString(getOutputList(), row, col); + retval += " " + row + "" + Const.CR; + retval += " " + col + "" + Const.CR; + retval += " " + str + "" + Const.CR; + + return retval; + } + + public void getFields(RowMetaInterface r, String origin, RowMetaInterface[] info, StepMeta nextStep, + VariableSpace space) { + + ValueMetaInterface v0 = null; + + for (int i = 0; i < outputList.length; i++) { + v0 = new ValueMeta(); + v0.setName(outputList[i][1]); + v0.setType(ValueMeta.TYPE_STRING); + v0.setTrimType(ValueMeta.TRIM_TYPE_BOTH); + v0.setOrigin(origin); + r.addValueMeta(v0); + } + } + + public Object clone() { + Object retval = super.clone(); + return retval; + } + + public void loadXML(Node stepnode, List databases, Map counters) + throws KettleXMLException { + + try { + setOutputField(XMLHandler.getNodeValue(XMLHandler.getSubNode(stepnode, "outputfield"))); + setOutputvalue(XMLHandler.getNodeValue(XMLHandler.getSubNode(stepnode, "outputvalue"))); + setOutputSTime(XMLHandler.getNodeValue(XMLHandler.getSubNode(stepnode, "outputstime"))); + setOutputETime(XMLHandler.getNodeValue(XMLHandler.getSubNode(stepnode, "outputetime"))); + setOutputUtcOf(XMLHandler.getNodeValue(XMLHandler.getSubNode(stepnode, "outpututcof"))); + + String temp = XMLHandler.getNodeValue(XMLHandler.getSubNode(stepnode, "outputlist")); + String row = XMLHandler.getNodeValue(XMLHandler.getSubNode(stepnode, "outputrow")); + String col = XMLHandler.getNodeValue(XMLHandler.getSubNode(stepnode, "outputcol")); + String[][] arrayConvert = new String[Integer.parseInt(row)][Integer.parseInt(col)]; + arrayConvert = convertToArray(temp, Integer.parseInt(row), Integer.parseInt(col)); + setOutputList(arrayConvert); + } catch (Exception e) { + throw new KettleXMLException("Template Plugin Unable to read step info from XML node", e); + } + + } + + public void setDefault() { + outputField = "http://xxxx:8082/druid/v2/?pretty"; + outputValue = "HATest"; + outputList[0][0] = "" + 1; + outputList[0][1] = "" + "test"; + outputList[0][2] = "" + "test1"; + + outputSTime = "9/29/2017"; + outputETime = "9/29/2017"; + outputUtcOf = "0"; + } + + public void check(List remarks, TransMeta transmeta, StepMeta stepMeta, RowMetaInterface prev, + String input[], String output[], RowMetaInterface info) { + CheckResult cr; + + // See if we have input streams leading to this step! + if (input.length > 0) { + cr = new CheckResult(CheckResult.TYPE_RESULT_OK, "Step is receiving info from other steps.", stepMeta); + remarks.add(cr); + } else { + cr = new CheckResult(CheckResult.TYPE_RESULT_ERROR, "No input received from other steps!", stepMeta); + remarks.add(cr); + } + + } + + public StepDialogInterface getDialog(Shell shell, StepMetaInterface meta, TransMeta transMeta, String name) { + return new DruidOutputStepDialog(shell, meta, transMeta, name); + } + + public StepInterface getStep(StepMeta stepMeta, StepDataInterface stepDataInterface, int cnr, TransMeta transMeta, + Trans disp) { + return new DruidOutputStep(stepMeta, stepDataInterface, cnr, transMeta, disp); + } + + public StepDataInterface getStepData() { + return new DruidOutputStepData(); + } + + public void readRep(Repository rep, ObjectId id_step, List databases, Map counters) + throws KettleException { + try { + outputField = rep.getStepAttributeString(id_step, "outputfield"); //$NON-NLS-1$ + outputValue = rep.getStepAttributeString(id_step, "outputvalue"); //$NON-NLS-1$ + } catch (Exception e) { + throw new KettleException( + BaseMessages.getString(PKG, "TemplateStep.Exception.UnexpectedErrorInReadingStepInfo"), e); + } + } + + public void saveRep(Repository rep, ObjectId id_transformation, ObjectId id_step) throws KettleException { + try { + rep.saveStepAttribute(id_transformation, id_step, "outputfield", outputField); //$NON-NLS-1$ + rep.saveStepAttribute(id_transformation, id_step, "outputvalue", outputValue); //$NON-NLS-1$ + } catch (Exception e) { + throw new KettleException( + BaseMessages.getString(PKG, "TemplateStep.Exception.UnableToSaveStepInfoToRepository") + id_step, + e); + } + } + + // -------序列化组件----------- + public int getRow(String[][] array) { + int row = 0; + if (array != null) { + row = array.length; // 行 + } + return row; + } + + public int getCol(String[][] array) { + int col = 0; + if (array != null) { + col = array[0].length; // 列 + } + return col; + } + + public String convertToString(String[][] array, int row, int col) { + String str = ""; + String tempStr = null; + for (int i = 0; i < row; i++) { + for (int j = 0; j < col; j++) { + tempStr = String.valueOf(array[i][j]); + str = str + tempStr + ","; + } + } + return str; + } + + public String[][] convertToArray(String str, int row, int col) { + String[][] arrayConvert = new String[row][col]; + int count = 0; + String[] strArray = str.split(","); + for (int i = 0; i < row; i++) { + for (int j = 0; j < col; j++) { + arrayConvert[i][j] = (String) strArray[count]; + ++count; + } + } + return arrayConvert; + } +} diff --git a/kettle-plugins/druid-m/src/main/resources/META-INF/MANIFEST.MF b/kettle-plugins/druid-m/src/main/resources/META-INF/MANIFEST.MF new file mode 100644 index 00000000000..b490b0e8c65 --- /dev/null +++ b/kettle-plugins/druid-m/src/main/resources/META-INF/MANIFEST.MF @@ -0,0 +1,4 @@ +Manifest-Version: 1.0 +Class-Path: httpclient-4.5.3.jar httpcore-4.4.6.jar fastjson-1.2.38.ja + r commons-codec-1.10.jar commons-logging-1.1.3.jar + diff --git a/kettle-plugins/druid-m/src/main/resources/druid.png b/kettle-plugins/druid-m/src/main/resources/druid.png new file mode 100644 index 00000000000..981005e5131 Binary files /dev/null and b/kettle-plugins/druid-m/src/main/resources/druid.png differ diff --git a/kettle-plugins/druid-m/src/main/resources/org.pentaho.big.data.kettle.plugins.druid/input.messages/input.messages/messages_en_US.properties b/kettle-plugins/druid-m/src/main/resources/org.pentaho.big.data.kettle.plugins.druid/input.messages/input.messages/messages_en_US.properties new file mode 100644 index 00000000000..8c2edd2a139 --- /dev/null +++ b/kettle-plugins/druid-m/src/main/resources/org.pentaho.big.data.kettle.plugins.druid/input.messages/input.messages/messages_en_US.properties @@ -0,0 +1,2 @@ +Template.Shell.Title=Template step +Template.FieldName.Label=Output field name