单流会话窗口

场景设定

某商家双十一期间进行商品促销活动,商家希望能够指定商品没有发生购买行为10s后查看该商品的销售总额。实时计算能够将商品出售情况作为数据流,采用会话窗口(Session Window)的方式,设定一个10s的会话间隔(Session Gap),即可完成对销售总额的自动计算。
会话-1.png

解决方案

会话-2.png

操作步骤

源表-字段填写

在流计算源表中进行字段映射填写,从kafka中将数据流映射为具体字段。

after_event_id int as event_id
after_id varchar as id
after_buy_id varchar as buy_id
after_goods_type varchar as goods_type
after_pay_time timestamp as pay_time
after_buy_amount double as buy_amount
after_but_way varchar as buy_way

源表-详细配置

时间列选择字段类型为Timestamp或Bigint类型的数据。
滑动-3.png [[8ZzZY]]

FlinkSQL代码

对Kafka数据流内容进行SUM运算,记录会话窗口开始/结束时间,并在最后根据商品ID和会话窗口进行划分。

INSERT INTO dts_ds_session_target
SELECT id as id,
      SUM(buy_amount) AS totalsale,
      SESSION_START(ROWTIME, INTERVAL '10' SECOND ) as sws,
      SESSION_END(ROWTIME, INTERVAL '10' SECOND )   as swe
FROM dts_ds_merchant_source
GROUP BY id,SESSION(ROWTIME, INTERVAL '10' SECOND )

运行结果

  • 实时采集、FlinkSQL任务有正常的数据吞吐

滑动-4.png 会话-5.png

  • MYSQL结果表有正常数据写入

根据数据量不同,实时计算处理时长不同,且根据会话间隔不同,计算任务自动重启。
会话-6.png