From 07bde55752a5d2dd1f07a5dfe44a1a47ff484a70 Mon Sep 17 00:00:00 2001 From: liuwenli Date: Thu, 6 Dec 2018 15:49:24 +0800 Subject: [PATCH] update flink flow submit name --- docs/deployment.md | 14 +++++++++++--- rider/conf/wormhole.sql | 12 ++++++------ .../main/scala/edp/rider/rest/util/FlowUtils.scala | 4 ++-- 3 files changed, 19 insertions(+), 11 deletions(-) diff --git a/docs/deployment.md b/docs/deployment.md index d206d93a1..80cf63f06 100644 --- a/docs/deployment.md +++ b/docs/deployment.md @@ -208,6 +208,16 @@ maintenance = { 如果使用checkpoint则需要配置flink.checkpoint.enable=true,另外还可以设置checkpoint的间隔时间和存储系统。通过flink.checkpoint.interval可设置checkpoint的间隔时间,默认为60000ms。通过flink.stateBackend可设置checkpoint的存储位置。 +#### Feedback State存储位置配置 + +wormhole在0.6版本之前的feedback state默认存储在ES中,在0.6版本之后,将支持用户根据需求在ES与MySQL中间选择合适的存储库进行数据存储。如果需要将存储位置由ES迁往MySQL,可以参照下面的步骤进行配置。通过配置monitor.database.type选择存储位置 + +`monitor.database.type="MYSQL" #存储到mysql中` + +`monitor.database.type="ES" #存储到ES中` + +当选择存储到mysql时,需要在wormhole/rider/conf/wormhole.sql新建feedback_flow_stats表,并在wormhole配置的数据库中执行该文件,从而在数据库中建立feedback_flow_stats表 + #### Wormhole集群部署 **部署说明** @@ -370,9 +380,7 @@ update udf set map_or_agg='udf'; (2)停止所有flow -在0.6.0-beta版本启动之前,停止以前版本所有的flow(包括flink和spark)。 - -启动0.6.0-beta版本之后,重启这些flow即可。 +在0.6.0-beta版本启动之前,需停止以前版本所有sparkx的flow(包括starting、running、suspending、updating状态的flow), 并记录当前stream消费到的topic offset,重启stream时,手动设定从之前记录的offset消费 #### 0.5.0-0.5.2版本升级到0.6.0版本 diff --git a/rider/conf/wormhole.sql b/rider/conf/wormhole.sql index 82cf562cf..d551283b5 100644 --- a/rider/conf/wormhole.sql +++ b/rider/conf/wormhole.sql @@ -213,7 +213,7 @@ CREATE TABLE IF NOT EXISTS `flow` ( `consumed_protocol` VARCHAR(100) NOT NULL, `sink_config` VARCHAR(5000) NOT NULL, `tran_config` LONGTEXT NULL, - `table_keys` VARCHAR(1000) NULL, + `table_keys` VARCHAR(100) NULL, `desc` VARCHAR(1000) NULL, `status` VARCHAR(200) NOT NULL, `started_time` TIMESTAMP NULL, @@ -233,7 +233,7 @@ alter table `flow` modify column `consumed_protocol` VARCHAR(100); alter table `flow` add column `parallelism` INT NULL after `sink_ns`; alter table `flow` add column `log_path` VARCHAR(2000) NULL after `stopped_time`; alter table `flow` add column `flow_name` VARCHAR(200) NOT NULL; -alter table `flow` add column `table_keys` VARCHAR(1000) NULL; +alter table `flow` add column `table_keys` VARCHAR(100) NULL; alter table `flow` add column `desc` VARCHAR(1000) NULL; CREATE TABLE IF NOT EXISTS `directive` ( @@ -341,7 +341,7 @@ CREATE TABLE IF NOT EXISTS `job` ( `source_config` VARCHAR(2000) NULL, `sink_config` VARCHAR(2000) NULL, `tran_config` VARCHAR(5000) NULL, - `table_keys` VARCHAR(1000) NULL, + `table_keys` VARCHAR(100) NULL, `desc` VARCHAR(1000) NULL, `status` VARCHAR(200) NOT NULL, `spark_appid` VARCHAR(200) NULL, @@ -369,7 +369,7 @@ alter table `job` modify column `sink_config` varchar(2000); alter table `job` add column `jvm_driver_config` VARCHAR(1000) NULL; alter table `job` add column `jvm_executor_config` VARCHAR(1000) NULL; alter table `job` add column `others_config` VARCHAR(1000) NULL; -alter table `job` add column `table_keys` VARCHAR(1000) NULL; +alter table `job` add column `table_keys` VARCHAR(100) NULL; alter table `job` add column `desc` VARCHAR(1000) NULL; @@ -379,7 +379,7 @@ CREATE TABLE IF NOT EXISTS `udf` ( `full_class_name` VARCHAR(200) NOT NULL, `jar_name` VARCHAR(200) NOT NULL, `stream_type` VARCHAR(100) NOT NULL, - `map_or_agg` VARCHAR(100) NOT NULL, + `map_or_agg` VARCHAR(10) NOT NULL, `desc` VARCHAR(200) NULL, `public` TINYINT(1) NOT NULL, `create_time` TIMESTAMP NOT NULL DEFAULT '1970-01-01 08:00:01', @@ -392,7 +392,7 @@ ENGINE = InnoDB CHARSET=utf8 COLLATE=utf8_unicode_ci; drop index `full_class_name_UNIQUE` on `udf`; alter table `udf` add `stream_type` VARCHAR(100) NOT NULL; -alter table `udf` add `map_or_agg` VARCHAR(100) NOT NULL; +alter table `udf` add `map_or_agg` VARCHAR(10) NOT NULL; CREATE TABLE IF NOT EXISTS `feedback_heartbeat` ( `id` BIGINT NOT NULL AUTO_INCREMENT, diff --git a/rider/rider-server/src/main/scala/edp/rider/rest/util/FlowUtils.scala b/rider/rider-server/src/main/scala/edp/rider/rest/util/FlowUtils.scala index a0de139b0..b117f523e 100644 --- a/rider/rider-server/src/main/scala/edp/rider/rest/util/FlowUtils.scala +++ b/rider/rider-server/src/main/scala/edp/rider/rest/util/FlowUtils.scala @@ -1101,8 +1101,8 @@ object FlowUtils extends RiderLogger { } def getFlowName(flowId: Long, sourceNs: String, sinkNs: String): String = - if (RiderConfig.riderServer.clusterId != "") s"${RiderConfig.riderServer.clusterId}-$flowId-$sourceNs-$sinkNs".toLowerCase - else s"$flowId-$sourceNs-$sinkNs".toLowerCase + if (RiderConfig.riderServer.clusterId != "") s"${RiderConfig.riderServer.clusterId}-$sourceNs-$sinkNs".toLowerCase + else s"$sourceNs-$sinkNs".toLowerCase def updateUdfsByStart(flowId: Long, udfIds: Seq[Long], userId: Long): Unit = { if (udfIds.nonEmpty) {