Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

【有奖征文】DSS在蔚来汽车的实践 #26

Open
sparklezzz opened this issue Nov 21, 2020 · 1 comment
Open

【有奖征文】DSS在蔚来汽车的实践 #26

sparklezzz opened this issue Nov 21, 2020 · 1 comment

Comments

@sparklezzz
Copy link

sparklezzz commented Nov 21, 2020

1. 背景

蔚来汽车的数据平台Insight,经过三年多的发展,已经有了比较完善的各个组件和工具,可以完成数据采集、处理、分析、生成中间表,生成指标,形成报表,元信息管理、权限管理(Crystal)等等。很多人会疑问,为什么需要单独做一个一站式数据应用交互管理平台?

原因有以下几点:

  1. 能够提高用户的开发效率。目前数据平台的用户在进行业务开发、数据查询,要分别进入不同的组件进行操作,使用感觉上是比较割裂的,比如:查询元数据的时候,使用自研的Crystal工具;做数据分析时候,使用Zeppelin交互式查询平台;进行任务调度的时候,使用Airflow或者Oozie上去配置;下载HDFS上的数据的时候,需要使用hue去下载。因此我们认为,如果能有一个统一的一站式数据开发、分析、可视化的平台,可以降低用户使用Insight数据平台的成本,并起到一定的在内部推广Insight的作用。

  2. 能够将用户的所有脚本都管控起来,之后就可以通过代码扫描来监控代码质量,落实业务规范,以及合理化资源的使用情况等,甚至可以通过对脚本进行限制或者改写,来避免一些不合理的集群使用方式。

在今年下半年,经过一段时间的一站式数据应用交互管理平台的技术调研选型,我们最终确定以微众银行开源的DataSphere Studio作为Insight的一站式数据应用交互管理平台,并根据公司业务需要进行一些定制开发。

下图展示了目前Insight数据平台架构和DSS在其中的定位:

image2020-11-20_20-7-25

2. DSS与Airflow的结合

由于公司已经较大范围了使用了Airflow作为数据平台任务调度工具,但是DataSphere Studio目前只支持Azkaban的调度工具,尚未适配支持Airflow。

因次我们进行了一些二次开发,将DSS和Airflow初步结合了起来。修改的地方仅在DSS一个repo中,已经向社区提交了Pull request: https://github.com/WeBankFinTech/DataSphereStudio/pull/241/files

运行截图如下:

企业微信截图_3689aaf2-0672-41b1-ae7a-c5c80fe23613

企业微信截图_9e16da55-8759-4e05-85ad-64027edc3c6e

我们主要做了以下工作:

  1. 参照DSS已有调度模块dss-azkaban-scheduler-appjoint,开发了一个新的模块:dss-airflow-scheduler-appjoint。
  2. 参照plugins中azkaban的插件linkis-jobtype,开发了一个airflow worker上的运行client:linkis-airflow-client。

DSS和Airflow交互的总体架构图如图所示:

image2020-11-21_18-50-36

模块位置如下图所示:

image2020-11-20_16-44-50

2.1. dss-airflow-scheduler-appjoint

该模块主体也是一个SchedulerAppJoint类的实现,和dss-azkaban-scheduler-appjoint是替代关系。由于Airflow中没有project的概念,为了保留dss中project的概念,该模块实现方式如下:

  1. 发布一个project时候,将这个project中的每个flow都生成一个airflow的DAG的py文件,通过airflow的Restful API发布到Airflow中。这里做了一个优化,我们在提交前,会先从Airflow中获取Project下所有flow对应的DAG文件,筛选出变化的DAG文件进行发布。
  2. 删除一个project时候,将这个project中的每个flow对应的airflow的DAG的py,都通过Airflow的Restful API从Airflow中删除
  3. 删除单个flow的时候,需要通过Airflow的Restful API将flow对应的Airflow的dag删除
  4. 权限上,支持Airflow的LDAP和RBAC两种认证方式

2.2. linkis-airflow-client

  1. 对于Airflow一个Dag中的每个node,执行时候都调用linkis airflow client,它负责连接到linkis gateway中去执行具体任务(执行spark sql/datachecker等等),并获得gateway返回的任务日志进行输出。
  2. linkis airflow client是一个java进程,在Airflow的dag节点中,通过bash_operator进行调用。
  3. client的jar包需要提前部署在airflow的所有worker节点的相同路径中。

2.3. airflow-rest-api-plugin的扩展

通过API方式Airflow进行交互,有两种方式:(1)Github上的第三方开源Restful API实现,见 https://github.com/teamclairvoyant/airflow-rest-api-plugin (2)使用Airflow官方提供的实验性的Restful API: https://airflow.apache.org/docs/1.10.3/api.html 。经过调研,我们发现上述两种方式都缺少一些DSS和Airflow中所需要的Restful功能。考虑到第三方开源实现的代码。理解起来容易且便于扩展(比如一些Restful API本质上是去Airflow机器上执行文件系统操作,或者是调用Airflow的cli命令),因此在第三方开源实现上进行了一些扩展,加入一些新的功能,比如:

  1. 删除某个DAG文件
  2. 删除某个DAG在Airflow上的元信息
  3. 获得一个DAG文件的内容(base64编码)
  4. 获取某个DAG在某一次运行实例的日志

具体的扩展实现可以参考:https://github.com/sparklezzz/airflow-rest-api-plugin/blob/master/plugins/rest_api_plugin.py

2.4. 可以改进的地方

  1. schedulis是针对azkaban定制的,这部分没有做修改,所以只是直接指向airflow的web首页,当做一个内嵌的iframe使用,不带查看project的功能。如果需要,可以考虑使用Airflow的Restful API,开发一个简单的airflow日志查询和任务控制页面。
  2. linkis-airflow-client,目前配置的单个java进程实例内存占用限制在64MB,但多了的话还是会给airflow worker节点造成一定内存压力。可以考虑实现一个单进程的java服务,负责从各个linkis/dss服务获取任务运行日志,在airflow节点上常驻;一个DSS的Airflow任务开始调度时候,airflow woker和该java守护进程交互,从java守护进程中拉取linkis/dss的任务运行日志。
  3. dss-airflow-scheduler-appjoint和dss-azkaban-scheduler-appjoint是互斥的。

3. Linkis对Yarn Cluster,Kerboros的支持

3.1. 对Kerberos的支持

我们使用CDH 5.15版本, 其上的Hive、HDFS、Yarn都启用了Kerberos认证,并且CDH使用Sentry做Hive和HDFS的权限管理。

为了适配这种环境, 我们对Linkis相关模块进行修改,统一使用Hadoop的proxy user机制以DSS用户的身份来访问文件系统和提交Spark Job,这样不再需要给每个用户生成keytab。

由此,可以把用户由Linkis产生的私有数据都放在HDFS上,通过proxy user机制实现更方便的权限控制。

3.2. 对Yarn Cluster模式的支持

3.2.1. engine生命周期管理

在client模式下,engine本质上在使用端口号作为'id'。这一套机制的合理性建立在所有engine都和engine manager在同一台机器的前提下。

engine manager会使用当前机器的可用的端口作为engine的port,但是当engine位于另一台服务器上的时候,比如在spark的yarn cluster模式下,这个port不一定可用, 而且不同的engine实例的port可能是相同的。这就给engine的生命周期管理带来了一些问题。

我们对Linkis进行了改造:

  1. 使用一个单独的id来标识engine
  2. engine自己寻找本地可用端口
  3. engine在Eureka上注册元数据,以使得engine manager可以识别自己管理的engine
  4. 使用Yarn客户端来kill掉engine。

3.2.2. 部署

  1. 通过proxy user机制,以DSS用户的身份提交Spark程序和申请资源。
  2. 将Spark engine对Linkis的相关依赖打成一个jar包,并且shade掉与CDH自带版本相冲突的Guava库。

4. CICD落地实践

4.1. 问题

Linkis 和 DSS 这两个项目中包含大量的 maven module,我们内部用到了 11 个 Linkis module 和 8 个 DSS module。每个 module 对应一个 Spring 服务。虽然官方提供了安装和启动脚本,但是这两个脚本是面向整个项目的,不够灵活,也不够便捷。特别是我们对 Linkis 和 DSS 进行二次开发和测试的时候,如果只改了某个 module 的一部分代码,如何快速的部署并且测试改动的效果?如果采用官方的脚本,需要对整个项目的所有module编译并打包,然后部署,效率非常低。

4.2. 方案

为了解决上述问题,我们设计了一个 module 级别 CICD 的流程,方便开发高效测试和部署代码。具体流程是:开发本地修改代码 -> GitLab -> Jenkins -> 服务器。

4.3. 实现

我们梳理了 Linkis 和 DSS 两个项目的 module,参考了官方的部署和启动脚本,将每个 module 编译、打包、部署、根据环境下发配置参数和启动步骤进行拆分,通过脚本实现了 module 级别的自动化部署。

image2020-11-20_15-34-22

4.4. 效果


image2020-11-20_15-34-50

4.5. 待解决的问题

  1. module之间也有依赖关系,有时候修改一处代码,可能需要部署多个 module,还无法做到智能化。
  2. module级别k8s部署

5. 未来计划

后续我们对DSS有以下的二次开发计划:

  1. 前端页面的定制化改造,以适配公司业务分析人员的特定需要
  2. 更加便捷的script的版本管理功能
  3. 二进制数据结果下载功能

同时未来也会把一些较为通用的有价值的Feature贡献回社区,希望其他使用DSS的公司和个人能够从中受益。

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants
@sparklezzz and others