双流Join

场景设定

某电商平台进行销售,实时计算可帮助平台在迅速出票的过程中,实现两个数据流的合并计算总订单数、总销售量。

解决方案

双流-1.png

操作步骤

订单源表dts_ds_orders_real

订单源表dts_ds_orderdetail_real

订单状态说明

源表-字段填写

在流计算源表中进行字段映射填写,从kafka中将数据流映射为具体字段。

映射表dts_ds_orders_real

after_dts_ordercodeofsys varchar as dts_ordercodeofsys
after_dts_paytime bigint as dts_paytime
after_dts_deliveredtime varchar as dts_deliveredtime
after_dts_storecode varchar as dts_storecode
after_dts_warehousecode varchar as dts_warehousecode
after_dts_cancelled bigint as dts_cancelled
after_dts_delivered bigint as dts_delivered
after_dts_receivercity varchar as dts_receivercity
after_dts_receiverprovince varchar as dts_receiverprovince
after_dts_record_id varchar as dts_record_id
after_dts_operation_flag varchar as dts_operation_flag
after_dts_instance_id varchar as dts_instance_id
after_dts_db_name varchar as dts_db_name
after_dts_table_name varchar as dts_table_name
after_dts_utc_timestamp varchar as dts_utc_timestamp
after_dts_before_flag varchar as dts_before_flag
after_dts_after_flag varchar as dts_after_flag

源表dts_ds_orders_real-详细配置

双流-2.png

映射表dts_ds_orderdetail_real

after_dts_ordercodeofsys varchar as dts_ordercodeofsys
after_dts_skuname varchar as dts_skuname
after_dts_skucode varchar as dts_skucode
after_dts_quantity bigint as dts_quantity
after_dts_dividedamount double as dts_dividedamount
after_dts_salechanneldividedamount double as dts_salechanneldividedamount
after_dts_initialcost double as dts_initialcost
after_dts_record_id varchar as dts_record_id
after_dts_operation_flag varchar as dts_operation_flag
after_dts_instance_id varchar as dts_instance_id
after_dts_db_name varchar as dts_db_name
after_dts_table_name varchar as dts_table_name
after_dts_utc_timestamp varchar as dts_utc_timestamp
after_dts_before_flag varchar as dts_before_flag
after_dts_after_flag varchar as dts_after_flag

源表dts_ds_orderdetail_real-详细配置

双流-3.png

创建结果表dts_ds_all_count_target

create table dts_ds_all_count_target(
bill_date bigint,--下单时间
bill_count bigint,--总的订单总数
qty bigint,--总的销售量
primary key (bill_date)
)

FlinkSQL代码

--FlinkSQL代码
--订单源表,最新交易时间的商品编号
CREATE VIEW new_paytime AS SELECT
dts_ordercodeofsys,
MAX(dts_paytime) AS dts_paytime
FROM
dts_ds_orders_real
GROUP BY
dts_ordercodeofsys;
--订单详情表,有效订单的订单编码、商品名称、商品编号、数量的信息
CREATE VIEW new_orderdetail AS
SELECT
dts_ordercodeofsys,
dts_skuname,
dts_skucode,
CASE
WHEN dts_operation_flag='U' AND dts_before_flag='Y' AND dts_after_flag='N' THEN -1*dts_quantity
WHEN dts_operation_flag='U' AND dts_before_flag='N' AND dts_after_flag='Y' THEN dts_quantity
WHEN dts_operation_flag='D' THEN -1*dts_quantity
ELSE dts_quantity
END AS dts_quantity
FROM
dts_ds_orderdetail_real;
--订单总单数,总销售量
INSERT
INTO
dts_ds_all_count_target
SELECT
a.dts_paytime AS bill_date,
COUNT(DISTINCT a.dts_ordercodeofsys) AS bill_count,
SUM(b.dts_quantity) AS qty
from
dts_ds_orders_real a
JOIN
new_orderdetail b
ON a.dts_ordercodeofsys=b.dts_ordercodeofsys
GROUP BY
a.dts_paytime;

运行结果

  • 实时采集、FlinkSQL任务有正常的数据吞吐

双流-4.png
双流-5.png
双流-6.png