update flink flow submit name

liuwenli committed Dec 6, 2018
1 parent 8b1c21f commit 07bde55
Showing 3 changed files with 19 additions and 11 deletions.
14 changes: 11 additions & 3 deletions docs/deployment.md
@@ -208,6 +208,16 @@ maintenance = {

To use checkpointing, set flink.checkpoint.enable=true. You can also configure the checkpoint interval and the storage backend: flink.checkpoint.interval sets the interval between checkpoints (default 60000 ms), and flink.stateBackend sets where checkpoint data is stored.
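Putting these keys together, a minimal checkpoint configuration might look like the following sketch (the HDFS path is an illustrative assumption, not a value from this repository):

```properties
# Enable Flink checkpointing for the stream
flink.checkpoint.enable=true
# Interval between checkpoints in milliseconds (60000 is the default)
flink.checkpoint.interval=60000
# Where checkpoint state is stored (illustrative HDFS path)
flink.stateBackend="hdfs:///wormhole/checkpoints"
```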

#### Feedback State Storage Configuration

Before version 0.6, Wormhole stored feedback state in Elasticsearch by default. Starting with version 0.6, users can choose between Elasticsearch and MySQL as the storage backend according to their needs. To migrate the storage location from ES to MySQL, follow the steps below. The storage backend is selected via monitor.database.type:

`monitor.database.type="MYSQL" # store feedback state in MySQL`

`monitor.database.type="ES" # store feedback state in Elasticsearch`

When MySQL is selected, add the feedback_flow_stats table definition to wormhole/rider/conf/wormhole.sql and execute that file against the database configured for Wormhole, so that the feedback_flow_stats table is created in the database.
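As a sketch of that step, the file can be executed with the mysql command-line client; the host, user, and database names below are placeholders for your own deployment:

```shell
# Run wormhole.sql against the database configured for Wormhole
# (replace the <...> placeholders with your deployment's values).
mysql -h <mysql-host> -u <user> -p <wormhole-database> < wormhole/rider/conf/wormhole.sql
```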

#### Wormhole Cluster Deployment

**Deployment Notes**
@@ -370,9 +380,7 @@ update udf set map_or_agg='udf';

(2) Stop all flows

Before starting version 0.6.0-beta, stop all flows from the previous version (including Flink and Spark).

After starting 0.6.0-beta, restart these flows.
Before starting version 0.6.0-beta, stop all Sparkx flows from the previous version (including flows in the starting, running, suspending, and updating states), and record the topic offsets the current stream has consumed to; when restarting the stream, manually set it to consume from the recorded offsets.
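One way to record and later restore those offsets, assuming the stream consumes through an ordinary Kafka consumer group (group, topic, and broker names below are placeholders), is Kafka's bundled kafka-consumer-groups tool:

```shell
# Record the group's current committed offsets before the upgrade
kafka-consumer-groups.sh --bootstrap-server <broker:9092> \
  --describe --group <stream-consumer-group>

# After upgrading, reset the group to a recorded offset before restarting
kafka-consumer-groups.sh --bootstrap-server <broker:9092> \
  --group <stream-consumer-group> --topic <topic> \
  --reset-offsets --to-offset <recorded-offset> --execute
```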

#### Upgrading from 0.5.0–0.5.2 to 0.6.0

12 changes: 6 additions & 6 deletions rider/conf/wormhole.sql
@@ -213,7 +213,7 @@ CREATE TABLE IF NOT EXISTS `flow` (
`consumed_protocol` VARCHAR(100) NOT NULL,
`sink_config` VARCHAR(5000) NOT NULL,
`tran_config` LONGTEXT NULL,
`table_keys` VARCHAR(1000) NULL,
`table_keys` VARCHAR(100) NULL,
`desc` VARCHAR(1000) NULL,
`status` VARCHAR(200) NOT NULL,
`started_time` TIMESTAMP NULL,
@@ -233,7 +233,7 @@ alter table `flow` modify column `consumed_protocol` VARCHAR(100);
alter table `flow` add column `parallelism` INT NULL after `sink_ns`;
alter table `flow` add column `log_path` VARCHAR(2000) NULL after `stopped_time`;
alter table `flow` add column `flow_name` VARCHAR(200) NOT NULL;
alter table `flow` add column `table_keys` VARCHAR(1000) NULL;
alter table `flow` add column `table_keys` VARCHAR(100) NULL;
alter table `flow` add column `desc` VARCHAR(1000) NULL;

CREATE TABLE IF NOT EXISTS `directive` (
@@ -341,7 +341,7 @@ CREATE TABLE IF NOT EXISTS `job` (
`source_config` VARCHAR(2000) NULL,
`sink_config` VARCHAR(2000) NULL,
`tran_config` VARCHAR(5000) NULL,
`table_keys` VARCHAR(1000) NULL,
`table_keys` VARCHAR(100) NULL,
`desc` VARCHAR(1000) NULL,
`status` VARCHAR(200) NOT NULL,
`spark_appid` VARCHAR(200) NULL,
@@ -369,7 +369,7 @@ alter table `job` modify column `sink_config` varchar(2000);
alter table `job` add column `jvm_driver_config` VARCHAR(1000) NULL;
alter table `job` add column `jvm_executor_config` VARCHAR(1000) NULL;
alter table `job` add column `others_config` VARCHAR(1000) NULL;
alter table `job` add column `table_keys` VARCHAR(1000) NULL;
alter table `job` add column `table_keys` VARCHAR(100) NULL;
alter table `job` add column `desc` VARCHAR(1000) NULL;


@@ -379,7 +379,7 @@ CREATE TABLE IF NOT EXISTS `udf` (
`full_class_name` VARCHAR(200) NOT NULL,
`jar_name` VARCHAR(200) NOT NULL,
`stream_type` VARCHAR(100) NOT NULL,
`map_or_agg` VARCHAR(100) NOT NULL,
`map_or_agg` VARCHAR(10) NOT NULL,
`desc` VARCHAR(200) NULL,
`public` TINYINT(1) NOT NULL,
`create_time` TIMESTAMP NOT NULL DEFAULT '1970-01-01 08:00:01',
@@ -392,7 +392,7 @@ ENGINE = InnoDB CHARSET=utf8 COLLATE=utf8_unicode_ci;

drop index `full_class_name_UNIQUE` on `udf`;
alter table `udf` add `stream_type` VARCHAR(100) NOT NULL;
alter table `udf` add `map_or_agg` VARCHAR(100) NOT NULL;
alter table `udf` add `map_or_agg` VARCHAR(10) NOT NULL;

CREATE TABLE IF NOT EXISTS `feedback_heartbeat` (
`id` BIGINT NOT NULL AUTO_INCREMENT,
@@ -1101,8 +1101,8 @@ object FlowUtils extends RiderLogger {
}

def getFlowName(flowId: Long, sourceNs: String, sinkNs: String): String =
if (RiderConfig.riderServer.clusterId != "") s"${RiderConfig.riderServer.clusterId}-$flowId-$sourceNs-$sinkNs".toLowerCase
else s"$flowId-$sourceNs-$sinkNs".toLowerCase
if (RiderConfig.riderServer.clusterId != "") s"${RiderConfig.riderServer.clusterId}-$sourceNs-$sinkNs".toLowerCase
else s"$sourceNs-$sinkNs".toLowerCase

def updateUdfsByStart(flowId: Long, udfIds: Seq[Long], userId: Long): Unit = {
if (udfIds.nonEmpty) {
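The change above drops the flow id from the submitted flow name, so the name is now clusterId-sourceNs-sinkNs (or sourceNs-sinkNs when no cluster id is configured), lowercased. A minimal Python sketch of that naming logic (the real implementation is the Scala shown above; the function and parameter names here are illustrative):

```python
def get_flow_name(cluster_id: str, source_ns: str, sink_ns: str) -> str:
    """Mirror FlowUtils.getFlowName after this commit:
    clusterId-sourceNs-sinkNs, lowercased; flowId is no longer included."""
    if cluster_id != "":
        return f"{cluster_id}-{source_ns}-{sink_ns}".lower()
    return f"{source_ns}-{sink_ns}".lower()

# With a cluster id configured, the id is prefixed to the name:
print(get_flow_name("ClusterA", "kafka.source.ns", "mysql.sink.ns"))
# Without a cluster id, the name is just sourceNs-sinkNs:
print(get_flow_name("", "kafka.source.ns", "mysql.sink.ns"))
```

Because the flow id no longer appears, two flows sharing the same source and sink namespaces would now submit under the same name; this matches the commit's intent of naming by namespaces only.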