
Flink SQL Development Workflow and Templates at a Large Company

The End-to-End Flink SQL Development Workflow

First, identify the source. It is usually MySQL feeding Kafka/MQ, captured by reading the binlog; alternatively, the backend team may provide a Kafka/MQ topic directly. Next come the stream tables, most of which are stored in the message middleware. Finally, the results can land in StarRocks/Doris.

The workflow is as follows:

Creating stream tables

First create the databases that will hold the stream tables (the Kafka stream-table layers and the OLAP ODS/DWD/DWS/ADS layers) to simplify later work, then create and configure the stream tables themselves.

Real-time development covers three areas: 1. Flink SQL, for real-time SQL-style queries; 2. real-time JAR upload; 3. Flink CDC, for ingesting source databases.

CDC ingestion of the data source

Method 1: capture the binlog via a DTS configuration to generate the ODS stream table

Keep the task name consistent with the ODS task, then configure the MQ information and the data source (here the data source means the MySQL instance, i.e. its connection string).

After the task is created, subscribe to the topic and the MySQL binlog will be synchronized to the MQ.

Method 2: use a CDC connector to generate the ODS stream table
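Method 2 can be sketched as a `mysql-cdc` source table using the Flink CDC connector; the hostname, credentials, and column list below are illustrative placeholders, not the production configuration:

```sql
-- Minimal sketch of a Flink CDC source, assuming the flink-cdc mysql-cdc
-- connector is available on the platform. Host, port, user, password,
-- database and table names are all placeholders.
CREATE TABLE ods_trade_trade_order_cdc (
    order_id          STRING,
    order_status_code INT,
    PRIMARY KEY (order_id) NOT ENFORCED  -- CDC sources need a primary key for upserts
) WITH (
    'connector'     = 'mysql-cdc',
    'hostname'      = 'mysql-host',
    'port'          = '3306',
    'username'      = 'flink_user',
    'password'      = '******',
    'database-name' = 'trade',
    'table-name'    = 'trade_order'
);
```

This reads the binlog directly inside Flink, so no separate DTS subscription task is needed.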

Real-time table naming

dwd_<level-1 domain>_<level-2 domain>_ri

ads_<level-1 subject domain>_<level-2 subject domain>_ri

Flink SQL Development

Create a Flink SQL task from the Flink SQL template.

Since the stream tables already exist, the no-code mode can be used directly when all of the stream-table data needs to be loaded.

If you prefer to write the Flink SQL yourself, that is also supported.

Once the Flink parameters are configured, the job can be published.

Real-time operations

Real-time operations works like batch task operations: you can see the real-time jobs currently running online. Detailed diagnosis requires the Flink Web UI, which will be covered in a separate section.

DWD code template

Take the order table (main orders) as an example. The task is dwd_trade_order_detail_ri; the detail data lands in a Kafka stream table and in Hive.

--Create the source table
CREATE TABLE ods_trade_trade_order_ri (
    `message`   varchar
) WITH (
  'connector' = 'kafka',
  'topic' = 'YSHOPPING_TRADE_ORDER_CONNECTOR',
  'properties.bootstrap.servers' = 'xxx:9092,xxxx:9092',
  'properties.group.id' = 'GID_YSHOPPING_TRADE_ORDER_CONNECTOR',
  'scan.topic-partition-discovery.interval' = '20s',
  'format' = 'raw',  -- read each record as a single string; json.* options do not apply to 'raw'
  'scan.startup.mode' = 'timestamp',                 -- start reading from a point in time
  'scan.startup.timestamp-millis' = '1706630400000'  -- epoch millis to start from
);
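Besides starting from a timestamp, the Kafka connector's `scan.startup.mode` supports several other values; a quick reference:

```sql
-- Alternative values for 'scan.startup.mode' in the Flink Kafka connector:
--   'earliest-offset'  : read each partition from the beginning
--   'latest-offset'    : read only records produced after the job starts
--   'group-offsets'    : resume from the committed offsets of properties.group.id
--   'specific-offsets' : start from given offsets; also requires
--                        'scan.startup.specific-offsets'
```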


--Create the sink table
create table dwd_trade_order_detail_ri (
    order_id          varchar,      -- main order id
    order_status_code int,          -- main order status code
    order_status_name varchar,      -- main order status name
    product_amt       int,          -- total product amount, in cents
    freight_amt       int,          -- total freight amount, in cents
    buyer_id          varchar,
    create_time       timestamp(3), -- Flink SQL has no DATETIME type
    modify_time       timestamp(3)
) WITH (
  -- no consumer group needed here: this table only produces to the topic
  -- (in production this should be a dedicated DWD topic, not the ODS source topic)
  'connector' = 'kafka',
  'topic' = 'YSHOPPING_TRADE_ORDER_CONNECTOR',
  'properties.bootstrap.servers' = 'xxx:9092,xxxx:9092',
  'format' = 'json'  -- 'raw' only supports a single column, so serialize as JSON
);


create table dwd_trade_order_detail_2hive_ri (
--create the table in Hive first
    order_id          varchar,  -- main order id
    order_status_code int,      -- main order status code
    order_status_name varchar,  -- main order status name
    product_amt       int,      -- total product amount, in cents
    freight_amt       int,      -- total freight amount, in cents
    buyer_id          varchar,
    create_time       varchar,
    modify_time       varchar
)
WITH (
'path' = 'hdfs://cluster100/user/bdms/hive_db/yx_dwd.db/dwd_trade_order_detail_ri/ds=2024-10-22',
'krb.keytab' = 'sloth.keytab',
'krb.principal' = '**********',
'part.size' = '267386880',
'rollover.interval' = '900000',
'format' = 'parquet',
'compression' = 'none',
'partition.keys' = 'date',
'connector.type'='filesystem',
'connector.property-version'='1',
'update-mode'='append',
'hdfs-site'='hdfs-site.xml',
'core-site'='core-site.xml',
'krb.conf'='krb5.conf',
'is.related.mammunt'='false',
'part.prefix'='ndi-128',
'part.suffix'='-success',
'inactivity.interval'='300000'
);


INSERT INTO dwd_trade_order_detail_ri
SELECT
    COALESCE(json_value(message,'$.orderId'),'')  as order_id
    ,cast(json_value(message,'$.orderStatusCode') as int) as order_status_code
    ,case json_value(message,'$.orderStatusCode')
          when '1' then 'unpaid'
          when '2' then 'paid'
          when '3' then 'awaiting receipt'
          when '4' then 'received'
          when '5' then 'completed'
          when '6' then 'returned'
     end as order_status_name
    ,COALESCE(cast(json_value(message,'$.productAmt') as int),0) as product_amt
    ,COALESCE(cast(json_value(message,'$.freightAmt') as int),0) as freight_amt
    ,COALESCE(json_value(message,'$.buyerId'),'') as buyer_id
    ,TO_TIMESTAMP(json_value(message,'$.data'),'yyyy-MM-dd HH:mm:ss') as create_time
    ,TO_TIMESTAMP(json_value(message,'$.data'),'yyyy-MM-dd HH:mm:ss') as modify_time
from (
    select split_index(message,'orderLog:',1) as message
    from ods_trade_trade_order_ri
    where message like '%xxxxx%'
) src,
lateral table (
    json_array_to_map(message, 'data')  -- platform-provided UDF that expands the JSON array
  ) as t (data)
where is_del = 0  -- is_del is assumed to be exposed by the UDF output
;
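For reference, `JSON_VALUE` extracts a single scalar from a JSON string and returns it as a string (or NULL when the path is missing), which is why the template wraps it in `CAST`/`COALESCE`. An illustrative query with a made-up payload:

```sql
-- Illustrative only: a sample payload and what the extraction expressions yield.
SELECT
    json_value('{"orderId":"O1001","orderStatusCode":"2"}', '$.orderId')
        AS order_id,           -- 'O1001' (STRING)
    cast(json_value('{"orderId":"O1001","orderStatusCode":"2"}',
                    '$.orderStatusCode') as int)
        AS order_status_code;  -- 2 (INT)
```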

INSERT INTO dwd_trade_order_detail_2hive_ri
select order_id
      ,order_status_code
      ,order_status_name
      ,product_amt
      ,freight_amt
      ,buyer_id
      ,cast(create_time as varchar) as create_time
      ,cast(modify_time as varchar) as modify_time      
from dwd_trade_order_detail_ri
;

