窗口函数
什么是窗口函数
FlinkSQL支持对一个特定的窗口的聚合。例如有用户想统计在过去的1分钟内有多少用户点击了某个的网页。在这种情况下,我们可以定义一个窗口,用来收集最近一分钟内的数据,并对这个窗口内的数据进行计算。
FlinkSQL支持的窗口聚合主要是两种:window aggregate,和over aggregate。他们最核心的区别是over aggregate从语义上保障了对每个输入都有一个输出,因此over agregate常被用于ranking,moving average等场景。关于over aggregate的细节可以参考后面的over aggregate的session章。本节下来主要介绍window aggregate。
Window aggregate支持两种时间类型做窗口:Event Time和Processing Time。每种类型下,又分别支持三种窗口类型:滚动窗口(TUMBLE),滑动窗口(HOP),和会话窗口(SESSION)。
-
时间类型
BlinkSQL支持两种时间:
-
Event Time:用户提供的事件时间(通常是数据的最原始的创建时间),event time一定是用户提供在表的schema里的数据
-
Processing Time:表示系统对事件进行处理的本地系统时间
下图中是不同时间属性在流上的概念:
从上面的定义可以看出,ingestion time和 processing time是系统为流记录增加的时间属性,用户并不能控制。EventTime则是流记录本身携带的时间属性,但由于数据本身有乱序,加之网络抖动或其它原因,eventTime为t1时刻的纪录,有可能会晚于t2(t2 > t1)时刻的被Flink处理。
-
基于processing time的Aggregate
processing time是系统产生的,不在用户的原始数据中,数栈无需显示定义一个processtime,下面给出一个基于processing time的Aggregat例子:
SELECT COUNT(amount) OVER (
PARTITION BY user
ORDER BY proctime
ROWS BETWEEN 2 PRECEDING AND CURRENT ROW)
FROM Orders
-
基于Event Time的Aggregate
event time是用户的原始数据,我们不需要显式重新定义一个event time列, 但是我们要求用户必须指定watermark的计算方法。这是因为用户的数据往往是乱序的,如果不配置一个watermark来合理的delay用户数据,那样数据聚合的结果往往都有很大的偏差,详情请参考Watermark。
实时计算中的窗口类型
目前实时计算全面支持Flink三种窗口类型,通过不同窗口类型的使用可覆盖绝大多数使用场景。
滚动窗口(Tumble Window)
滚动窗口将每个元素分配到一个指定窗口大小的窗口中,滚动窗口有一个固定的大小,并且不会出现重叠。例如:如果指定了一个5分钟大小的滚动窗口,那么无限流的数据会根据时间划分成[0:00 - 0:05), [0:05, 0:10), [0:10, 0:15)… 等窗口。如下图展示了一个 30秒大小的滚动窗口划分。
-
窗口函数语法
用在GROUP BY子句中,定义window。
TUMBLE(time_attr, size_interval)
-
time_attr:参数必须是流中的一个合法的时间属性字段,即指定了 processing time 或是 event time;
-
size_interval:窗口时间间隔;
-
使用案例
-
请参考常见场景Demo中的 单流滚动窗口。
滑动窗口(Slide Window)
定义一个跳跃的时间窗口(在 Table API 中称为滑动窗口)。滑动窗口有一个固定的持续时间( 第二个 interval 参数 )以及一个滑动的间隔(第一个 interval 参数 )。若滑动间隔小于窗口的持续时间,滑动窗口则会出现重叠;因此,行将会被分配到多个窗口中。比如,一个大小为 15 分组的滑动窗口,其滑动间隔为 5 分钟,将会把每一行数据分配到 3 个 15 分钟的窗口中。滑动窗口可以定义在事件时间(批处理、流处理)或处理时间(流处理)上。
-
窗口函数语法
用在GROUP BY子句中,定义window。
HOP(time_attr, slide_interval, size_interval)
-
time_attr:参数必须是流中的一个合法的时间属性字段,即指定了 processing time 或是 event time;
-
slide_interval:滑动窗口的滑动时间间隔;
-
size_interval:滑动窗口的固定时间间隔;
-
使用案例
-
请参考常见场景Demo中的 单流滑动窗口 。
会话窗口(Session Window)
定义一个会话时间窗口。会话时间窗口没有一个固定的持续时间,但是它们的边界会根据 interval 所定义的不活跃时间所确定;即一个会话时间窗口在定义的间隔时间内没有时间出现,该窗口会被关闭。例如时间窗口的间隔时间是 30 分钟,当其不活跃的时间达到30分钟后,若观测到新的记录,则会启动一个新的会话时间窗口(否则该行数据会被添加到当前的窗口),且若在 30 分钟内没有观测到新纪录,这个窗口将会被关闭。会话时间窗口可以使用事件时间(批处理、流处理)或处理时间(流处理)
-
窗口函数语法
用在GROUP BY子句中,定义window。
SESSION(time_attr, interval)
-
time_attr:参数必须是流中的一个合法的时间属性字段,即指定了 processing time 或是 event time;
-
interval:会话窗口的gap时间间隔;
-
使用案例
-
请参考常见场景Demo中的 单流会话窗口 。
选择分组窗口的开始和结束时间戳
可以使用以下辅助函数选择组窗口的开始和结束时间戳以及时间属性:
辅助函数 | 描述 | ||
---|---|---|---|
TUMBLE_START(time_attr, interval) HOP_START(time_attr, interval, interval) SESSION_START(time_attr, interval) |
返回相对应的滚动、滑动和会话窗口范围内的下界时间戳。 |
||
TUMBLE_END(time_attr, interval) HOP_END(time_attr, interval, interval) SESSION_END(time_attr, interval) |
返回相对应的滚动、滑动和会话窗口范围以外的上界时间戳。
|
||
TUMBLE_ROWTIME(time_attr, interval) HOP_ROWTIME(time_attr, interval, interval) SESSION_ROWTIME(time_attr, interval) |
返回相对应的滚动、滑动和会话窗口范围以内的上界时间戳。 |
||
TUMBLE_PROCTIME(time_attr, interval) HOP_PROCTIME(time_attr, interval, interval) SESSION_PROCTIME(time_attr, interval) |
返回一个可用于后续需要基于时间的操作的 处理时间参数,比如基于时间窗口的 join 以及 分组窗口或分组窗口上的聚合. |
辅助函数必须使用与 GROUP BY 子句中的分组窗口函数完全相同的参数来调用。 |
以下的例子展示了如何在流处理表中指定使用分组窗口函数的 SQL 查询:
SELECT user,
TUMBLE_START(rowtime, INTERVAL '1' DAY) as wStart,
SUM(amount)
FROM Orders
GROUP BY TUMBLE(rowtime, INTERVAL '1' DAY), user
SELECT user, SUM(amount) FROM Orders GROUP BY TUMBLE(proctime, INTERVAL '1' DAY), user