时间属性

本文为您介绍Flink SQL所支持的Event Time和Processing Time时间类型。
Flink支持三种与流数据处理相关的时间概念:Processing Time、Event Time和Ingestion Time。

1576217052010 f9605b16 58d2 4362 b7d4 fbf0fff5bc7a

Flink SQL仅支持其中的两种时间类型Event Time和Processing Time:

  • EventTime:您提供的事件时间(通常是数据的最原始的创建时间)。

  • Processing Time(Proctime):系统对事件进行处理的本地系统时间,单位为毫秒。

Event Time必须是您提供在数据储存里的数据。

EventTime

Event Time也称为Row Time。EventTime时间属性必须在源表DDL中声明,可以将源表中的某一字段声明成Event Time。目前只支持将 TIMESTAMP 类型(将来会支持LONG类型)声明成RowTime字段。如果源表中需要声明为Event Time的列不是 TIMESTAMP 类型,需要借助 计算列(链接待补充) ,基于现有列构造出一个 TIMESTAMP 类型的列。
由于数据本身的乱序、网络的抖动(网络堵塞导致的数据传输延迟的变化)或者其它原因,导致了数据到达的顺序和被处理的顺序,可能是不一致的(乱序)。因此定义一个RowTime字段,需要明文定义一个Watermark计算方法。
窗口函数基于EventTime聚合的示例如下:

CREATE TABLE FullLinkTest(
    after_id int AS id,
    after_userid varchar AS userid,
    after_username varchar AS username,
    after_prodid varchar AS prodid,
    after_price double AS price,
    after_amount int AS amount,
    after_discount double AS discount,
    after_tm bigint AS tm,
    WATERMARK FOR tm AS withOffset(tm,30000)  --Watermark计算方法。
 )WITH(
    type ='kafka11',
    bootstrapServers ='<yourbootstrapServers>',
    zookeeperQuorum ='<yourzookeeperQuorum>',
    offsetReset ='latest',
    topic ='<yourtopicname>',
    timezone='<yourtimezone>',
    topicIsPattern ='false',
    parallelism ='1'
 );

CREATE TABLE totalSales(
    totalSales DOUBLE,
    tms TIMESTAMP,
    tme TIMESTAMP
 )WITH(
    type ='mysql',
    url ='<yourmysqlurl>',
    userName ='<youruserName>',
    password ='<yourpassword>',
    tableName ='<yourtableName>',
    parallelism ='1'
 );

insert into totalSales
    select
        sum(price * amount * discount) as totalSales,
        TIMESTAMPADD(HOUR,8,TUMBLE_START( ROWTIME,INTERVAL '10' SECOND)) as tms,
        TIMESTAMPADD(HOUR,8,TUMBLE_END (ROWTIME,INTERVAL '10' SECOND)) as tme
    from
        FullLinkTest
    group by
        TUMBLE( ROWTIME,INTERVAL '10' SECOND);

Processing Time(Proctime)

Processing Time是指正在执行相应操作的机器的系统时间,即物理现实时间。当一个实时计算依赖ProcTIme时间列运行时,所有基于时间的操作(如Window窗口)将使用运行实时计算机器的系统时钟。
若Window窗口函数基于ProcTime,且开窗间隔为1 小时,则Flink会自动将任务启动时间划分在某一整点区间内,而非从启动时间开始间隔一小时进行开窗操作。 例如,如果实时计算任务设定开窗间隔为1小时且在9:15 am开始运行,则第一个Window窗口将包括在9:15 am和10:00 am之间处理的事件(自动将任务划分在9~10点这一整点区间),下一个窗口将包括在10:00 am和11:00 am之间处理的事件,依此类推。详情可查看 FlinkV1.8官方文档中对Proctime的相关解释
相较于Event Time,Processing Time有如下特点:

  • 简单易行,不用考虑实时计算任务和机器之间的延迟问题

  • 高性能,低延迟

窗口函数基于Processing Time聚合的示例如下:

CREATE TABLE mq_stream (
  a VARCHAR,
  b VARCHAR,
  c BIGINT,
  d AS PROCTIME()   --在数据源表的声明中明文定义一个Processing Time列。
) WITH (
  type = 'mq',
  topic = '<yourTopic>',
  accessId = '<yourAccessId>',
  accessKey = '<yourAccessSecret>'
);
CREATE TABLE rds_output (
  id VARCHAR,
  c TIMESTAMP,
  f TIMESTAMP,
  cnt BIGINT
) with (
  type = 'rds',
  url = '<yourDatebaseURL>',
  tableName = '<yourDatabasTableName>',
  userName = '<yourUserName>',
  password = '<yourPassword>'
);
INSERT INTO rds_output
SELECT a AS id,
     SESSION_START(d, INTERVAL '1' SECOND) AS c,
     SESSION_END(d, INTERVAL '1' SECOND) AS f,
     COUNT(a) AS cnt
FROM mq_stream
GROUP BY SESSION(d, INTERVAL '1' SECOND), a

时间属性字段传递

时间属性字段经过如下操作后会失去时间属性特性 :