Flink SQL Development Workflow and Templates at Large Tech Companies
The End-to-End Flink SQL Development Pipeline
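Start by identifying the source. Data usually flows from MySQL into Kafka/MQ by reading the binlog, although you can also consume a Kafka/MQ topic that the backend team provides directly. Next come the stream tables, most of which live in middleware. Finally, results can be written to StarRocks or Doris.
The process is as follows: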
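Stream Table Creation
First create the databases that will hold the stream tables, that is, the ODS, DWD, DWS, and ADS databases for the Kafka and OLAP stream tables, so that later steps are easier. Then create and configure the stream tables themselves.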
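The database-creation step can be sketched in Flink SQL as follows. This is a minimal illustration, not a platform's actual procedure: the database names simply mirror the layer names mentioned in the text, and many platforms expose this step through a UI instead.

```sql
-- Hypothetical layout: one database per warehouse layer.
CREATE DATABASE IF NOT EXISTS ods COMMENT 'raw stream tables ingested from source systems';
CREATE DATABASE IF NOT EXISTS dwd COMMENT 'cleaned detail-level stream tables';
CREATE DATABASE IF NOT EXISTS dws COMMENT 'lightly aggregated stream tables';
CREATE DATABASE IF NOT EXISTS ads COMMENT 'application-facing result tables';

-- Switch to the layer you are about to develop in.
USE dwd;
```

With Flink's default in-memory catalog these databases only last for the session; a persistent catalog is needed for them to survive across tasks.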
Real-time development has three parts: 1. Flink SQL, for querying real-time data with SQL; 2. uploading real-time JAR packages; 3. Flink CDC, for ingesting source databases.
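Data Source CDC Ingestion
Method 1: read the binlog, configure DTS, and generate the ODS stream table
Keep the task name the same as the ODS task, then configure the MQ information and the data source (here the data source means the MySQL instance information, i.e. the connection string).
After creating the task, subscribe to the topic, and the MySQL binlog is synchronized to MQ.
Method 2: use CDC ingestion to generate the ODS stream table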
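For Method 2, a source table backed by the Flink CDC `mysql-cdc` connector might look like the sketch below. The table name, host, credentials, database, and column list are all assumptions made for illustration; the column names have to match the real MySQL table.

```sql
-- Hypothetical ODS table that reads the MySQL binlog directly through Flink CDC.
CREATE TABLE ods_trade_trade_order_cdc_ri (
    order_id          STRING,
    order_status_code INT,
    product_amt       INT,
    freight_amt       INT,
    buyer_id          STRING,
    create_time       TIMESTAMP(3),
    modify_time       TIMESTAMP(3),
    PRIMARY KEY (order_id) NOT ENFORCED      -- CDC sources need a primary key to apply updates
) WITH (
    'connector'         = 'mysql-cdc',
    'hostname'          = 'mysql-host',      -- assumed host
    'port'              = '3306',
    'username'          = 'flink_reader',    -- assumed account with replication privileges
    'password'          = '******',
    'database-name'     = 'trade',           -- assumed database
    'table-name'        = 'trade_order',     -- assumed table
    'scan.startup.mode' = 'initial'          -- take a snapshot first, then follow the binlog
);
```

Unlike Method 1, no intermediate MQ topic is involved: the Flink job itself reads the snapshot and the binlog.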
Real-Time Table Naming
dwd_<level-1 domain>_<level-2 domain>_ri
ads_<level-1 subject domain>_<level-2 subject domain>_ri
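Flink SQL Development
Select the Flink SQL template to create a Flink SQL task.
Because the stream tables were already created earlier, you can use no-code mode directly if you need to import all of a stream table's data.
If you would rather write the Flink SQL yourself, you can do that too.
Once the Flink parameters are configured, the task can be published online.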
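The Flink parameters mentioned above are usually set in a platform form, but the same settings can be expressed as `SET` statements at the top of the script. The values below are illustrative assumptions, not recommendations from the original workflow.

```sql
-- Illustrative job-level settings; tune every value for the actual workload.
SET 'pipeline.name' = 'dwd_trade_order_detail_ri';   -- job name shown in the Flink Web UI
SET 'parallelism.default' = '2';                     -- default operator parallelism
SET 'execution.checkpointing.interval' = '60s';      -- how often to take a checkpoint
SET 'execution.checkpointing.mode' = 'EXACTLY_ONCE'; -- checkpointing guarantee
SET 'table.exec.state.ttl' = '1 d';                  -- expire idle state after one day
SET 'table.local-time-zone' = 'Asia/Shanghai';       -- session time zone for time functions
```

The quoted `SET 'key' = 'value'` form is the syntax accepted by the Flink SQL client from version 1.13 onward; older clients use an unquoted form.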
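Real-Time Operations
Real-time operations work like regular task operations: you can see which real-time tasks are currently running online. Pinpointing specific problems requires the Flink Web UI, which will be covered in a separate section later.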
DWD Code Template
Taking the order table (main order) as an example, the task is dwd_trade_order_detail_ri, and the detail data is written to both a Kafka stream table and Hive.
-- Create the source table
CREATE TABLE ods_trade_trade_order_ri (
    `message` VARCHAR
) WITH (
    'connector' = 'kafka',
    'topic' = 'YSHOPPING_TRADE_ORDER_CONNECTOR',
    'properties.bootstrap.servers' = 'xxx:9092,xxxx:9092',
    'properties.group.id' = 'GID_YSHOPPING_TRADE_ORDER_CONNECTOR',
    'scan.topic-partition-discovery.interval' = '20s',
    'format' = 'raw',                                  -- each record is one unparsed string, so json.* options do not apply
    'scan.startup.mode' = 'timestamp',                 -- start consuming from a given time
    'scan.startup.timestamp-millis' = '1706630400000'  -- start time: 2024-01-31 00:00:00 UTC+8
);

-- Create the sink tables
CREATE TABLE dwd_trade_order_detail_ri (
    order_id          VARCHAR,       -- main order number
    order_status_code INT,           -- main order status code
    order_status_name VARCHAR,       -- main order status name
    product_amt       INT,           -- total product amount, in fen
    freight_amt       INT,           -- total freight amount, in fen
    buyer_id          VARCHAR,
    create_time       TIMESTAMP(3),  -- Flink SQL has no DATETIME type
    modify_time       TIMESTAMP(3)
) WITH (
    -- this table receives the data parsed from ODS, so no consumer group is configured
    'connector' = 'kafka',
    'topic' = 'YSHOPPING_TRADE_ORDER_CONNECTOR',  -- as given; in practice the DWD table needs its own topic, distinct from the ODS one
    'properties.bootstrap.servers' = 'xxx:9092,xxxx:9092',
    'format' = 'json'                             -- 'raw' only supports a single-column table
);

CREATE TABLE dwd_trade_order_detail_2hive_ri (  -- create the table in Hive first
    order_id          VARCHAR,  -- main order number
    order_status_code INT,      -- main order status code
    order_status_name VARCHAR,  -- main order status name
    product_amt       INT,      -- total product amount, in fen
    freight_amt       INT,      -- total freight amount, in fen
    buyer_id          VARCHAR,
    create_time       VARCHAR,
    modify_time       VARCHAR
) WITH (
    -- platform-specific filesystem sink options, kept as given
    'path' = 'hdfs://cluster100/user/bdms/hive_db/yx_dwd.db/dwd_trade_order_detail_ri/ds=2024-10-22',
    'krb.keytab' = 'sloth.keytab',
    'krb.principal' = '**********',
    'part.size' = '267386880',
    'rollover.interval' = '900000',
    'format' = 'parquet',
    'compression' = 'none',
    'partition.keys' = 'date',
    'connector.type' = 'filesystem',
    'connector.property-version' = '1',
    'update-mode' = 'append',
    'hdfs-site' = 'hdfs-site.xml',
    'core-site' = 'core-site.xml',
    'krb.conf' = 'krb5.conf',
    'is.related.mammunt' = 'false',
    'part.prefix' = 'ndi-128',
    'part.suffix' = '-success',
    'inactivity.interval' = '300000'
);

-- Parse the raw ODS message and write the detail rows to the DWD Kafka table
INSERT INTO dwd_trade_order_detail_ri
SELECT
    COALESCE(JSON_VALUE(message, '$.orderId'), '') AS order_id,
    CAST(JSON_VALUE(message, '$.orderStatusCode') AS INT) AS order_status_code,
    CASE JSON_VALUE(message, '$.orderStatusCode')
        WHEN '1' THEN '未付款'  -- unpaid
        WHEN '2' THEN '已付款'  -- paid
        WHEN '3' THEN '待收貨'  -- awaiting receipt
        WHEN '4' THEN '已收貨'  -- received
        WHEN '5' THEN '已完結'  -- completed
        WHEN '6' THEN '退貨'    -- returned
    END AS order_status_name,
    COALESCE(CAST(JSON_VALUE(message, '$.productAmt') AS INT), 0) AS product_amt,
    COALESCE(CAST(JSON_VALUE(message, '$.freightAmt') AS INT), 0) AS freight_amt,  -- cast before COALESCE so both arguments are INT
    COALESCE(JSON_VALUE(message, '$.buyerId'), '') AS buyer_id,                    -- buyer_id is VARCHAR, so default to ''
    -- both timestamps read '$.data' as given; substitute the actual create/modify time paths
    TO_TIMESTAMP(JSON_VALUE(message, '$.data'), 'yyyy-MM-dd HH:mm:ss') AS create_time,
    TO_TIMESTAMP(JSON_VALUE(message, '$.data'), 'yyyy-MM-dd HH:mm:ss') AS modify_time
    -- a ts column, FROM_UNIXTIME(CAST(SUBSTRING(JSON_VALUE(message, '$.ts'), 1, 10) AS BIGINT)),
    -- can be added once the sink table declares a matching column
FROM (
    SELECT SPLIT_INDEX(message, 'orderLog:', 1) AS message
    FROM ods_trade_trade_order_ri
    WHERE message LIKE '%xxxxx%'
) AS src,
LATERAL TABLE (json_array_to_map(message, 'data')) AS t (data)  -- json_array_to_map is a platform UDTF, not a Flink built-in
WHERE is_del = 0;  -- is_del is not declared by the tables above; it must be exposed by the parsed payload

-- Copy the DWD stream into the Hive table
INSERT INTO dwd_trade_order_detail_2hive_ri
SELECT
    order_id,
    order_status_code,
    order_status_name,
    product_amt,
    freight_amt,
    buyer_id,
    CAST(create_time AS VARCHAR) AS create_time,
    CAST(modify_time AS VARCHAR) AS modify_time
FROM dwd_trade_order_detail_ri;
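The overview mentions that final results can land in StarRocks or Doris. As a rough sketch of that last hop, the statements below aggregate the DWD stream into an ADS table through the Apache Doris Flink connector. The ADS table name, its columns, the FE address, and the credentials are all hypothetical, and the target Doris table is assumed to use the Unique Key model so that updated aggregates overwrite earlier rows.

```sql
-- Hypothetical ADS result table stored in Doris.
CREATE TABLE ads_trade_order_stat_ri (
    stat_date         STRING,
    order_status_code INT,
    order_cnt         BIGINT,
    product_amt       BIGINT,   -- total product amount, in fen
    PRIMARY KEY (stat_date, order_status_code) NOT ENFORCED
) WITH (
    'connector'         = 'doris',
    'fenodes'           = 'doris-fe:8030',                -- assumed FE HTTP address
    'table.identifier'  = 'ads.ads_trade_order_stat_ri',  -- assumed db.table in Doris
    'username'          = 'flink_writer',                 -- assumed account
    'password'          = '******',
    'sink.label-prefix' = 'ads_trade_order_stat_ri'       -- must be unique per job
);

-- Daily order count and product amount per order status.
INSERT INTO ads_trade_order_stat_ri
SELECT
    DATE_FORMAT(create_time, 'yyyy-MM-dd')  AS stat_date,
    order_status_code,
    COUNT(DISTINCT order_id)                AS order_cnt,
    SUM(CAST(product_amt AS BIGINT))        AS product_amt
FROM dwd_trade_order_detail_ri
GROUP BY DATE_FORMAT(create_time, 'yyyy-MM-dd'), order_status_code;
```

Because the aggregation is unbounded, the query emits updates rather than appends, which is why the sink declares a primary key.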