双流Join
操作步骤
源表-字段填写
在流计算源表中进行字段映射填写,从kafka中将数据流映射为具体字段。
映射表dts_ds_orders_real
after_dts_ordercodeofsys varchar as dts_ordercodeofsys
after_dts_paytime bigint as dts_paytime
after_dts_deliveredtime varchar as dts_deliveredtime
after_dts_storecode varchar as dts_storecode
after_dts_warehousecode varchar as dts_warehousecode
after_dts_cancelled bigint as dts_cancelled
after_dts_delivered bigint as dts_delivered
after_dts_receivercity varchar as dts_receivercity
after_dts_receiverprovince varchar as dts_receiverprovince
after_dts_record_id varchar as dts_record_id
after_dts_operation_flag varchar as dts_operation_flag
after_dts_instance_id varchar as dts_instance_id
after_dts_db_name varchar as dts_db_name
after_dts_table_name varchar as dts_table_name
after_dts_utc_timestamp varchar as dts_utc_timestamp
after_dts_before_flag varchar as dts_before_flag
after_dts_after_flag varchar as dts_after_flag
映射表dts_ds_orderdetail_real
after_dts_ordercodeofsys varchar as dts_ordercodeofsys
after_dts_skuname varchar as dts_skuname
after_dts_skucode varchar as dts_skucode
after_dts_quantity bigint as dts_quantity
after_dts_dividedamount double as dts_dividedamount
after_dts_salechanneldividedamount double as dts_salechanneldividedamount
after_dts_initialcost double as dts_initialcost
after_dts_record_id varchar as dts_record_id
after_dts_operation_flag varchar as dts_operation_flag
after_dts_instance_id varchar as dts_instance_id
after_dts_db_name varchar as dts_db_name
after_dts_table_name varchar as dts_table_name
after_dts_utc_timestamp varchar as dts_utc_timestamp
after_dts_before_flag varchar as dts_before_flag
after_dts_after_flag varchar as dts_after_flag
创建结果表dts_ds_all_count_target
create table dts_ds_all_count_target(
bill_date bigint,--下单时间
bill_count bigint,--总的订单总数
qty bigint,--总的销售量
primary key (bill_date)
)
FlinkSQL代码
--FlinkSQL代码
--订单源表,最新交易时间的商品编号
CREATE VIEW new_paytime AS SELECT
dts_ordercodeofsys,
MAX(dts_paytime) AS dts_paytime
FROM
dts_ds_orders_real
GROUP BY
dts_ordercodeofsys;
--订单详情表,有效订单的订单编码、商品名称、商品编号、数量的信息
CREATE VIEW new_orderdetail AS
SELECT
dts_ordercodeofsys,
dts_skuname,
dts_skucode,
CASE
WHEN dts_operation_flag='U' AND dts_before_flag='Y' AND dts_after_flag='N' THEN -1*dts_quantity
WHEN dts_operation_flag='U' AND dts_before_flag='N' AND dts_after_flag='Y' THEN dts_quantity
WHEN dts_operation_flag='D' THEN -1*dts_quantity
ELSE dts_quantity
END AS dts_quantity
FROM
dts_ds_orderdetail_real;
--订单总单数,总销售量
INSERT
INTO
dts_ds_all_count_target
SELECT
a.dts_paytime AS bill_date,
COUNT(DISTINCT a.dts_ordercodeofsys) AS bill_count,
SUM(b.dts_quantity) AS qty
from
dts_ds_orders_real a
JOIN
new_orderdetail b
ON a.dts_ordercodeofsys=b.dts_ordercodeofsys
GROUP BY
a.dts_paytime;