任务管理

实时开发提供了一套完成的WEB-SQL IDE工具,同时还提供原生Flink代码的提交运行。

任务类型

实时开发提供了3种任务类型,分别适用于不同的使用场景:

  • FlinkSQL任务

SQL任务支持您直接在Web端编辑和维护Flink SQL代码,并可方便地调试运行和协作开发。实时开发还支持代码内容的版本管理、关键字高亮能功能。

  • Flink任务

基于原生Flink Java API的任务。

  • 实时采集任务

实时任务主要完成数据实时采集以供后续实时开发或其他组件处理,详细的配置规范请参考 实时采集 模块。

实时开发流程

实时开发流程.jpg

如上图所示,实时任务的处理流程通常分3步,前2步建立好数据源(维表)与数据目标,之后再写数据计算的逻辑,完成对实时数据的分析处理。

FlinkStreamSQL概念.jpg

FlinkSQL任务使用SQL的形式处理Kafka内的流式数据,因此,在处理时,需要开发人员将Kafka内的数据看做一张虚拟的"`二维表`",之后即可通过FlinkStreamSQL实现对二维表的处理,并将计算结果输出。

新建任务

流计算提供了3种任务类型,本节以创建SQL任务为例,介绍如何创建一个任务并编辑代码内容。

FlinkSQL任务

本节以创建FlinkSQL任务为例,介绍一个FlinkSQL任务创建的完整流程。

新建FlinkSQL任务

进入数据开发 菜单,点击创建任务 按钮,并填写新建任务弹出框中的配置项,配置项说明:

  1. 任务名称:需输入英文字母、数字、下划线组成,不超过64个字符。

  2. 任务类型:可选择FlinkSQL、Flink、实时采集。

  3. 存储位置:此任务在页面左侧的任务存储结构中的位置。

  4. 描述:此任务的描述,可输入长度不超过200个的任意字符。

点击 保存 ,弹窗关闭,即完成了新建任务,同时系统自动打开新建的SQL任务。

可视化添加数据源

在SQL任务中,用户可以通过引用数据资产中的元数据表和直接创建两种方式方式添加数据源。

数据开发仅能引用自身有权限的元数据表进行任务创建,项目管理员及以上角色可直接引用数据源进行任务创建
引用元数据表

在打开SQL任务后,点击右侧面板的 源表/维表/结果表 ,打开配置面板:

  1. 点击添加输入,若需要添加多个数据源作为输入时,可继续在下面的模块点击添加输入

  2. 字段信息:默认加载元数据表信息,用户不可对字段进行编辑和修改,但可修改别名以简化SQL开发中的使用。+

  3. 其他参数:默认加载元数据表的初始化参数,在 源表/维表/结果表 的使用中都有不同的详细参数配置,数据开发也可针对部分进行细致调整。

新增元数据表与数据权限需要使用到数据资产模块中的部分功能,具体请参考 标准化建表数据权限管理
直接创建

在打开SQL任务后,点击右侧面板的 源表,打开配置面板:

  1. 点击添加输入,若需要添加多路Kafka作为输入时,可继续在下面的模块点击添加输入

  2. 选择数据源类型:目前只支持Kafka

  3. 选择Kafka Topic;

  4. 映射表:由Kafka Topic内的数据映射到Flink中的"表",需在此输入Flink的表名,从而在Flink中以SQL的形式处理数据;

  5. 字段信息:将Json格式中的数据映射为关系型数据表,即Flink中此表对应的字段信息和类型。

    • 点击 数据预览 查看采集到的Json语句的格式

    • 根据字段名称进行映射,输入模式为 <源表字段名><字段类型>AS<源表映射字段名>多字段信息通过回车进行分割。仅支持JSON格式数据源,若为嵌套格式,字段名称由JSON的各层级key组合隔开,例如:

原Json

{
  "a":
  {
    "b":"1",
    "c":"2"
  }
}

在实时采集任务中不勾选Json平铺,源表中填写的字段信息

a.b varchar as b a.c varchar as c

在实时采集任务中勾选Json平铺后

{
	 "a_b": "1",
   "a_c":"2"
}

源表中填写的字段信息

a_b varchar as b
a_c varchar as c

实时计算通过 _ 将嵌套JSON进行平铺展开。

  1. 时间特征:Flink分为ProcTime和 EventTime两种时间特征。

    1. ProcTime:处理时间指执行对应Operation的设备的系统时间;

    2. EventTime:事件时间是每个单独事件在它的生产设备上发生的时间,若选择了EventTime,则还需补充时间列、偏移量和时区信息,这是Flink Watermark机制的要求。

      1. 时间列必须是映射表中已声明的一列(当前仅支持为Timestamp类型),含义是基于该列生成Watermark,并且标识该列为Event Time列,可以在后续Query中用来定义窗口。

      2. 偏移量单位为毫秒,含义为Watermark值与Event time值的偏移量。通常一条记录中的某个字段就代表了该记录的发生时间。此处的详细信息可参考 Watermark

      3. 通过配置作业的时区调整时间类型数据的输出结果。默认时区为东八区(Asia/Shanghai)。此处的详情信息可参考 时区

  2. 并行度:算子的并发数,指的是Flink集群的Task Slot的数量;

    • 在使用Event Time Watermark的时候,rowtime必须是 TIMESTAMP 的类型,现在支持毫秒级别的、在Unix时间戳里是13位的,如果是其他类型或者是在Unix时间戳不是13位的建议使用计算列来做转换。

    • 如果数据源注册了watermarker:默认添加了 rowtime 字段,否则添加 processtime 字段;

每个任务的并行度配置不能超过整个Flink集群的Task Slot的数量,每个任务的并行度累加起来也不能超出,否则会造成任务的报错;

可视化添加数据目标

在SQL任务中,用户可以直接通过页面配置的方式添加MySQL、ElasticSearch、HBase、Redis等结果数据的输出目标,在打开SQL任务后,点击右侧面板的 结果表 ,打开配置面板:

  1. 点击 添加输入 ,若需要添加多路输出时,可继续在下面的模块点击 添加输入

  2. 选择存储类型:可选择MySQL、HBase、ElasticSearch、Redis等数据库;

    1. 若选择了MySQL,则还需选择MySQL中的一张表;

    2. 若选择了HBase,则还需选择HBase中的一张表及rowkey;

    3. 若选择了ElasticSearch,则还需选择ElasticSearch中的索引、id和索引类型;

    4. 若选择了Redis,则还需要选择Redis中的一张表及主键;

  3. 映射表:由Kafka Topic内的数据映射到Flink中的"`表`",需在此输入Flink的表名,从而在Flink中以SQL的形式处理数据;

  4. 字段信息:即Flink中此表对应的字段信息和类型。输入模式为 ` <源表字段名><字段类型>AS <源表映射字段名>` ,多字段信息通过回车进行分割

  5. 并行度:算子的并发数,指的是Flink集群的Task Slot的数量;

  6. 数据输出时间:结果表输出数据的时间间隔,任务运行后每满足指定时间间隔就输出一次数据

  7. 数据输出条数:结果表输出数据的条数间隔,任务运行后每满足指定条数就输出一次数据

数据输出时间和数据输出条数为独立计算,两者触发的 输出数据 操作不会互相重置各自的计数。

每个任务的并行度配置不能超过整个Flink集群的Task Slot的数量,每个任务的并行度累加起来也不能超出,否则会造成任务的报错;

  • 支持目标数据库

当前版本实时计算支持以下作为流式计算的数据输出目标数据库:

  • MySQL

  • Oracle

  • Kudu

  • HBase

  • ElasticSearch

  • ElasticSearch 6

  • Redis

  • MongoDB

  • Kafka

  • Impala

  • Polardb for MySQL 8

可视化添加维表

在SQL任务中,用户可以直接通过页面配置的方式添加MySQL作为维表数据源,在打开SQL任务后,点击右侧面板的维表,打开配置面板:

  1. 点击添加输入,若需要添加多路维表时,可继续在下面的模块点击添加输入

  2. 选择数据源类型:目前只支持MySQL;

  3. 选择MySQL中的一张表;

  4. 输入映射表:Flink中的"表";

  5. 字段信息:即MySQL中的哪些字段需要作为维表加载;

  6. 主键:维表中的主键信息,系统进行缓存刷新时会根据主键判断数据的超时时间;

  7. 并行度:算子的并发数,指的是Flink集群的Task Slot的数量;

  8. 缓存策略

    • LRU(部分缓存):Least Recently Used,记录维表每条数据的访问热度,仅缓存热度较高的数据;

    • None(不缓存):表示不需要缓存

    • ALL(全缓存):缓存维表的全部数据;

  9. 缓存大小:即缓存空间需要保存多少条数据;

  10. 缓存超时时间:数据超出缓存时间时会在缓存空间中被删除

维表定义后的字段在一个Job中的不同SQL代码段可重复使用,无需反复重新定义

环境参数配置

实时计算支持用户手动对当前任务的系统环境参数进行配置,具体配置参数请参考环境参数中的注释内容。

本地调试

在SQL任务中,用户完成相关SQL任务的编辑后,可点击 调试 按键,打开调试窗口:

4 5 2021 16 39 45 PM
  1. 用户配置的源表均会显示在窗口左侧的列表中,当数据配置完毕,会显示绿色√标识

  2. 选中目标数据表,点击下载模版,可下载包含对应字段信息的CSV数据模板

  3. 点击上传测试数据,将对应CSV数据文件上传,读取完成后可在数据预览中查看详情;

  4. 也可直接点击 采集线上数据,通过设置采集条数,从Kafka中进行读取,

  5. 确定数据内容后点击 确定,此时任务提交至后台进行本地调试,等待片刻后用户可查看到具体调试结果。

新建Flink任务

交互方式与新建FlinkSQL任务类似,但还需补充如下信息: . 资源:基于原生Flink Java API编写的Jar包,需要用户提前上传(上传方式可参考资源管理)。
. mainClass:用户jar包的入口函数。
. 命令行参数:请输入对应MainClass的入口命令行参数,例如 String[] args。
. 附加资源:关联上传至项目内的其他资源文件,例如Kerberos认证使用krb5、Keytab文件等,具体使用可参考 Flink Jar Demo

多资源引用创建Flink任务时无法基于K8s进行运行和调度。

新建实时采集任务

交互方式与新建FlinkSQL任务类似,但还需补充配置模式信息。
实时采集提供两种模式进行选择,具体说明如下:

模式名称 说明

向导模式

提供向导式的开发引导,通过可视化的填写和下一步的引导,帮助快速完成数据同步任务的配置工作。学习成本低,但无法享受到一些高级功能。

脚本模式

通过直接编写数据同步的JSON脚本来完成数据同步开发,适合高级用户,学习成本较高。脚本模式可以提供更丰富灵活的能力,做精细化的配置管理。

如果选择了向导模式,您可以参考 实时采集 完成任务配置。
如果选择了脚本模式,您可以在编辑区左上角点击 导入模板,并选择数据源类型、数据库等信息,确定后即可导入模板,您只需编辑其中一部分信息.

数据源配置中密码已加密,导入模版后需要重新填写数据源密码.
通过克隆功能复制的脚本任务密码有效,可无需修改.

向导/脚本模式一旦选择,不可修改。向导模式可转化为脚本模式(操作不可逆)。

其他亮点功能

其他亮点功能包括:

  • Flink SQL智能提示

您在输入Flink SQL过程中,IDE提供包括关键字、内置函数等提示功能。

  • Flink SQL语法高亮

针对Flink SQL关键字,提供不同颜色的语法高亮功能,以区分Flink SQL不同结构。

  • Flink SQL语法检测

您在完成Flink SQL的输入后,可点击 语法检测 按键对所有代码进行语法检测并修正。

  • SQL版本管理

数据开发涵盖了日常开发工作的关键领域,包括代码辅助、代码版本。数据开发提供了一个代码版本管理功能。您每次提交即可生成一个代码版本,代码版本为追踪修改以及日后回滚所用。
您每次提交一个作业,提交之后,实时计算均会为他生成一份代码快照,用于日后的代码追踪使用。在SQL任务的 任务详情 面板查看 历史提交版本 ,罗列了该任务的所有版本信息。您可以使用对比功能,查看最新代码和指定版本的差异。

  • Flink SQL本地调试

您在完成Flink SQL的输入后,可点击 调试 按键,通过本地上传CSV文件或采集线上数据进行本地调试,对业务逻辑进行验证。