实时采集

以实时电商案例数据源表存储为例,为您介绍如何准备数据源表存储并将其实时采集到Kafka内等待消费。

数据源表创建

在本地MySQL数据库通过建表语句创建数据源表

-- 创建数据源表 singal_stream_simple_source
create table singal_stream_simple_source
(
   event_id   int unsigned auto_increment, -- 数据库自增id,主键
   id         varchar(255),                -- 商品id
   buyer_id   varchar(255),                -- 消费者id
   goods_type varchar(255),                -- 商品类型
   pay_time   timestamp,                --  支付时间
   buy_amount double,                      -- 购买金额
   buy_way    varchar(255),                -- 购买渠道
   primary key (event_id)
)

数据源接入

1.登陆 数栈-实时计算模块
2.选择目标项目,点击 数据开发,进入项目。
3.点击 数据源-引用数据源 ,弹出 引用数据源 窗口如下:

列表中仅展示分配给实时开发的数据源,若无目标数据源需前往 数据源中心 分配至实时开发产品。

4.根据数据源类型进行筛选,选择目标数据源接入至实时开发中。

创建实时采集任务

静态关系数据库中的数据通过实时采集任务可转化为Kafka内的数据流,从而进入Flinkx被消费。
1.在 数据开发 页面,点击创建任务 ,创建 实时采集 任务如下:

实时采集-1.png

2.数据源类型选择 MySQLBinLog ,选择实时采集的数据源表,并配置相关参数

实时采集-2.png

3.指向目标Kafka的某一Topic(后续FlinkSQL任务中会调用这一Topic)

实时采集-4.png

4.根据需求配置速率,默认不修改

实时采集-5.png

5.再次浏览所有配置内容,确认无误后点击 保存 即完成实时采集任务配置。

实时采集-6.png

此时,实时采集任务已创建于开发环境,后续调用需提交至生产环境,具体操作可参考 任务上线及运维