Flink Syntax

Real-time computing jobs support multiple Flink versions. This topic describes the syntax differences between those versions.

Data types

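  • Flink 1.12 supports structured data types, such as ARRAY<t>, t ARRAY, MAP<kt, vt>, MULTISET<t>, t MULTISET, ROW<n0 t0, n1 t1, ...>, and ROW(n0 t0, n1 t1, ...).

  • Of the structured data types, Flink 1.10 supports only ARRAY<t>.

  • Flink 1.10 supports the Long and String types. Flink 1.12 does not support these two types.

  • For all types supported by Flink 1.12, see: Data types supported by Flink 1.12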
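To make the structured types above concrete, here is a minimal Flink 1.12 sketch. The table name and column names are hypothetical, and the 'blackhole' connector is used only so the DDL has a WITH clause; whether it is available in your environment is an assumption.

```sql
-- Hypothetical table showing each structured type from the list above.
CREATE TABLE type_demo (
  tags       ARRAY<VARCHAR>,                -- ARRAY<t>
  scores     INT ARRAY,                     -- t ARRAY (equivalent spelling)
  attributes MAP<VARCHAR, INT>,             -- MAP<kt, vt>
  words      MULTISET<VARCHAR>,             -- MULTISET<t>
  address    ROW<city VARCHAR, zip VARCHAR>, -- ROW<n0 t0, n1 t1, ...>
  contact    ROW(phone VARCHAR, email VARCHAR) -- ROW(n0 t0, n1 t1, ...)
) WITH (
  'connector' = 'blackhole'
);
```

On Flink 1.10, only the ARRAY<t> column in this sketch would be accepted according to the list above.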

Queries (SELECT)

Flink 1.12

query:
  values
  | {
      select
      | selectWithoutFrom
      | query UNION [ ALL ] query
      | query EXCEPT query
      | query INTERSECT query
    }
    [ ORDER BY orderItem [, orderItem ]* ]
    [ LIMIT { count | ALL } ]
    [ OFFSET start { ROW | ROWS } ]
    [ FETCH { FIRST | NEXT } [ count ] { ROW | ROWS } ONLY]

orderItem:
  expression [ ASC | DESC ]

select:
  SELECT [ ALL | DISTINCT ]
  { * | projectItem [, projectItem ]* }
  FROM tableExpression
  [ WHERE booleanExpression ]
  [ GROUP BY { groupItem [, groupItem ]* } ]
  [ HAVING booleanExpression ]
  [ WINDOW windowName AS windowSpec [, windowName AS windowSpec ]* ]

selectWithoutFrom:
  SELECT [ ALL | DISTINCT ]
  { * | projectItem [, projectItem ]* }

projectItem:
  expression [ [ AS ] columnAlias ]
  | tableAlias . *

tableExpression:
  tableReference [, tableReference ]*
  | tableExpression [ NATURAL ] [ LEFT | RIGHT | FULL ] JOIN tableExpression [ joinCondition ]

joinCondition:
  ON booleanExpression
  | USING '(' column [, column ]* ')'

tableReference:
  tablePrimary
  [ matchRecognize ]
  [ [ AS ] alias [ '(' columnAlias [, columnAlias ]* ')' ] ]

tablePrimary:
  [ TABLE ] tablePath [ dynamicTableOptions ] [systemTimePeriod] [[AS] correlationName]
  | LATERAL TABLE '(' functionName '(' expression [, expression ]* ')' ')'
  | UNNEST '(' expression ')'

tablePath:
  [ [ catalogName . ] schemaName . ] tableName

systemTimePeriod:
  FOR SYSTEM_TIME AS OF dateTimeExpression

dynamicTableOptions:
  /*+ OPTIONS(key=val [, key=val]*) */

key:
  stringLiteral

val:
  stringLiteral

values:
  VALUES expression [, expression ]*

groupItem:
  expression
  | '(' ')'
  | '(' expression [, expression ]* ')'
  | CUBE '(' expression [, expression ]* ')'
  | ROLLUP '(' expression [, expression ]* ')'
  | GROUPING SETS '(' groupItem [, groupItem ]* ')'

windowRef:
    windowName
  | windowSpec

windowSpec:
    [ windowName ]
    '('
    [ ORDER BY orderItem [, orderItem ]* ]
    [ PARTITION BY expression [, expression ]* ]
    [
        RANGE numericOrIntervalExpression {PRECEDING}
      | ROWS numericExpression {PRECEDING}
    ]
    ')'

matchRecognize:
      MATCH_RECOGNIZE '('
      [ PARTITION BY expression [, expression ]* ]
      [ ORDER BY orderItem [, orderItem ]* ]
      [ MEASURES measureColumn [, measureColumn ]* ]
      [ ONE ROW PER MATCH ]
      [ AFTER MATCH
            ( SKIP TO NEXT ROW
            | SKIP PAST LAST ROW
            | SKIP TO FIRST variable
            | SKIP TO LAST variable
            | SKIP TO variable )
      ]
      PATTERN '(' pattern ')'
      [ WITHIN intervalLiteral ]
      DEFINE variable AS condition [, variable AS condition ]*
      ')'

measureColumn:
      expression AS alias

pattern:
      patternTerm [ '|' patternTerm ]*

patternTerm:
      patternFactor [ patternFactor ]*

patternFactor:
      variable [ patternQuantifier ]

patternQuantifier:
      '*'
  |   '*?'
  |   '+'
  |   '+?'
  |   '?'
  |   '??'
  |   '{' { [ minRepeat ], [ maxRepeat ] } '}' ['?']
  |   '{' repeat '}'
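As an illustration of how several of these grammar rules combine, the following Flink 1.12 sketch joins a stream against a lookup table using systemTimePeriod, then filters, groups, and applies HAVING. The tables orders and users and all of their columns are hypothetical; the sketch assumes orders has a processing-time attribute proc_time and that users is backed by a connector that supports lookups.

```sql
-- Hypothetical tables:
--   orders(order_id, user_id, amount, proc_time)  -- proc_time is a processing-time attribute
--   users(user_id, user_name)                     -- lookup (dimension) table
SELECT
  o.user_id,
  u.user_name,
  SUM(o.amount) AS total_amount
FROM orders AS o
JOIN users FOR SYSTEM_TIME AS OF o.proc_time AS u   -- systemTimePeriod
  ON o.user_id = u.user_id                          -- joinCondition
WHERE o.amount > 0
GROUP BY o.user_id, u.user_name
HAVING SUM(o.amount) > 100;
```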

CREATE statements

CREATE TABLE

Flink 1.12

CREATE TABLE [catalog_name.][db_name.]table_name
  (
    { <physical_column_definition> | <metadata_column_definition> | <computed_column_definition> }[ , ...n]
    [ <watermark_definition> ]
    [ <table_constraint> ][ , ...n]
  )
  [COMMENT table_comment]
  [PARTITIONED BY (partition_column_name1, partition_column_name2, ...)]
  WITH (key1=val1, key2=val2, ...)
  [ LIKE source_table [( <like_options> )] ]

<physical_column_definition>:
  column_name column_type [ <column_constraint> ] [COMMENT column_comment]

<column_constraint>:
  [CONSTRAINT constraint_name] PRIMARY KEY NOT ENFORCED

<table_constraint>:
  [CONSTRAINT constraint_name] PRIMARY KEY (column_name, ...) NOT ENFORCED

<metadata_column_definition>:
  column_name column_type METADATA [ FROM metadata_key ] [ VIRTUAL ]

<computed_column_definition>:
  column_name AS computed_column_expression [COMMENT column_comment]

<watermark_definition>:
  WATERMARK FOR rowtime_column_name AS watermark_strategy_expression

<source_table>:
  [catalog_name.][db_name.]table_name

<like_options>:
{
   { INCLUDING | EXCLUDING } { ALL | CONSTRAINTS | PARTITIONS }
 | { INCLUDING | EXCLUDING | OVERWRITING } { GENERATED | OPTIONS | WATERMARKS }
}[, ...]
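A minimal sketch of a Flink 1.12 table that uses physical, metadata, and computed columns together with a watermark. The table name, column names, topic, broker address, and group id are placeholders, and the example assumes the Kafka connector and JSON format are available in your environment.

```sql
-- Hypothetical source table; all option values are placeholders.
CREATE TABLE orders (
  order_id   BIGINT,
  user_id    BIGINT,
  amount     DECIMAL(10, 2),
  order_time TIMESTAMP(3),
  -- Metadata column: Kafka record timestamp; VIRTUAL excludes it when writing.
  record_ts  TIMESTAMP(3) METADATA FROM 'timestamp' VIRTUAL,
  -- Computed column: processing-time attribute.
  proc_time  AS PROCTIME(),
  -- Watermark: tolerate events that arrive up to 5 seconds late.
  WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) COMMENT 'orders read from Kafka'
WITH (
  'connector' = 'kafka',
  'topic' = 'orders',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'demo-group',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
);
```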

Flink 1.10

CREATE TABLE table_name
  (
    { <physical_column_definition> | <computed_column_definition> }[ , ...n]
    [ <watermark_definition> ]
    [ <table_constraint> ][ , ...n]
    [ PRIMARY KEY (k1, k2, ...) ]
    [ PERIOD FOR SYSTEM_TIME ]
  )
  [COMMENT table_comment]
  WITH (key1=val1, key2=val2, ...)

<physical_column_definition>:
  column_name column_type [ <column_constraint> ] [ AS <alias>]

<computed_column_definition>:
  column_name AS computed_column_expression

<PRIMARY KEY (k1, k2, ...)>:
  Primary key of the dimension table, used as the join condition in the ON clause.
  Each of k1, k2, ... is either a physical column name or an alias.

<PERIOD FOR SYSTEM_TIME>:
  Flag that marks the table as a dimension table.

<watermark_definition>:
  WATERMARK FOR rowtime_column_name AS watermark_strategy_expression

<source_table>:
  table_name
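Following the Flink 1.10 grammar above, a dimension table might be declared roughly as follows. This is only a sketch: the table and column names are hypothetical, and the WITH keys are placeholders copied from the grammar, because the source does not list the connector options.

```sql
-- Hypothetical Flink 1.10 dimension table, per the grammar above.
CREATE TABLE user_dim (
  user_id   BIGINT,
  user_name VARCHAR,
  PRIMARY KEY (user_id),   -- used as the join condition in the ON clause
  PERIOD FOR SYSTEM_TIME   -- marks this table as a dimension table
) WITH (
  key1 = val1,             -- placeholder: replace with real connector options
  key2 = val2
);
```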

CREATE FUNCTION

Flink 1.12

CREATE [TEMPORARY|TEMPORARY SYSTEM] FUNCTION
  [IF NOT EXISTS] [[catalog_name.]db_name.]function_name
  AS identifier [LANGUAGE JAVA|SCALA|PYTHON]
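For example, a Java UDF could be registered as a temporary function like this. The function name and the class com.example.udf.ToUpper are hypothetical; the class is assumed to be on the job's classpath.

```sql
-- Register a hypothetical Java UDF for the current session only.
CREATE TEMPORARY FUNCTION IF NOT EXISTS to_upper
  AS 'com.example.udf.ToUpper' LANGUAGE JAVA;
```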

CREATE VIEW

Flink 1.12

CREATE [TEMPORARY] VIEW [IF NOT EXISTS] [catalog_name.][db_name.]view_name
  [{columnName [, columnName ]* }] [COMMENT view_comment]
  AS query_expression

Flink 1.10

CREATE VIEW view_name AS query_expression
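The difference between the two versions can be sketched as follows. The view name and the orders table are hypothetical. Note that in the 1.12 statement the column list is written in parentheses.

```sql
-- Flink 1.12: temporary view with IF NOT EXISTS, an explicit column list, and a comment.
CREATE TEMPORARY VIEW IF NOT EXISTS big_orders (order_id, amount)
  COMMENT 'orders above 100'
  AS SELECT order_id, amount FROM orders WHERE amount > 100;

-- Flink 1.10: only the basic form is available.
CREATE VIEW big_orders AS
  SELECT order_id, amount FROM orders WHERE amount > 100;
```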

DROP statements

Flink 1.10 does not support these statements. All syntax below is for Flink 1.12.

DROP TABLE

DROP TABLE [IF EXISTS] [catalog_name.][db_name.]table_name

DROP VIEW

DROP [TEMPORARY] VIEW  [IF EXISTS] [catalog_name.][db_name.]view_name

DROP FUNCTION

DROP [TEMPORARY|TEMPORARY SYSTEM] FUNCTION [IF EXISTS] [catalog_name.][db_name.]function_name;
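A short Flink 1.12 sketch of the three DROP forms. All object names are hypothetical.

```sql
-- IF EXISTS avoids an error when the object is missing.
DROP TABLE IF EXISTS orders;
DROP TEMPORARY VIEW IF EXISTS big_orders;
DROP TEMPORARY FUNCTION IF EXISTS to_upper;
```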

ALTER statements

Flink 1.10 does not support these statements. All syntax below is for Flink 1.12.

ALTER TABLE

Rename a table

ALTER TABLE [catalog_name.][db_name.]table_name RENAME TO new_table_name
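  • Renames the specified table to new_table_name.

Set or modify table properties

ALTER TABLE [catalog_name.][db_name.]table_name SET (key1=val1, key2=val2, ...)
  • Sets one or more properties on the specified table. If a property is already set on the table, the new value overwrites the old one.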

ALTER DATABASE

ALTER DATABASE [catalog_name.]db_name SET (key1=val1, key2=val2, ...)
  • Sets one or more properties on the specified database. If a property is already set on the database, the new value overwrites the old one.

ALTER FUNCTION

ALTER [TEMPORARY|TEMPORARY SYSTEM] FUNCTION
  [IF EXISTS] [catalog_name.][db_name.]function_name
  AS identifier [LANGUAGE JAVA|SCALA|PYTHON]
  • Updates the specified function with a new identifier and an optional language tag.
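The ALTER statements above might be used as in the following Flink 1.12 sketch. The table, database, and function names, the property keys and values, and the class name are all hypothetical.

```sql
-- Rename a table, then override one of its properties.
ALTER TABLE orders RENAME TO orders_v2;
ALTER TABLE orders_v2 SET ('scan.startup.mode' = 'latest-offset');

-- Set a database property (the key 'owner' is only an illustration).
ALTER DATABASE demo_db SET ('owner' = 'data-team');

-- Point an existing catalog function at a new implementation class.
ALTER FUNCTION IF EXISTS demo_db.to_upper
  AS 'com.example.udf.ToUpperV2' LANGUAGE JAVA;
```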

INSERT statements

Insert query results into a table

Flink 1.12

INSERT { INTO | OVERWRITE } [catalog_name.][db_name.]table_name [PARTITION part_spec] select_statement

part_spec:
  (part_col_name1=val1 [, part_col_name2=val2, ...])
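  • INSERT OVERWRITE overwrites any existing data in the table or partition. INSERT INTO appends the new data to the table or partition.

  • The PARTITION clause should contain the static partition columns to insert into, along with their values.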

Flink 1.10

INSERT INTO table_name select_statement
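The two forms can be compared with a small sketch. The tables sales_by_day (assumed to be partitioned by dt), orders_clean, and orders are hypothetical.

```sql
-- Flink 1.12: append query results into one static partition.
-- Replace INTO with OVERWRITE to replace the partition's existing data instead.
INSERT INTO sales_by_day PARTITION (dt = '2021-01-01')
SELECT order_id, user_id, amount
FROM orders
WHERE amount > 0;

-- Flink 1.10: only the basic form, without catalog/database prefix or PARTITION.
INSERT INTO orders_clean
SELECT order_id, user_id, amount
FROM orders
WHERE amount > 0;
```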

Insert values into a table

Flink 1.10 does not support this syntax.

Flink 1.12

INSERT { INTO | OVERWRITE } [catalog_name.][db_name.]table_name VALUES values_row [, values_row ...]

values_row:
  (val1 [, val2, ...])
  • INSERT OVERWRITE overwrites any existing data in the table. INSERT INTO appends the new data to the table.
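For instance, literal rows could be inserted as in this Flink 1.12 sketch. The table users and its two columns (user_id, user_name) are hypothetical.

```sql
-- Each parenthesized group is one values_row.
INSERT INTO users VALUES
  (1, 'alice'),
  (2, 'bob');
```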