admin管理员组文章数量:1122849
一、实现方案
表1 是用户进店记录
表2 是用户留资信息
业务:用户留资在前,然后才会有跟进记录,数据上留资时间必然在进店跟进之前,最小相差时间毫秒级别。
通过将留资信息实时同步holo,利用holo做维表的方式,进店记录表通过会话窗口延迟5分钟关联确保数据完全能关联到不会丢失。
二、代码实现
--SQL
--********************************************************************--
--Author: mdp_prd
--CreateTime: 2023-05-05 11:51:38
--Comment: 事件引擎 展厅接待
--********************************************************************--
--声明组织区域维度表 dealer_shop_a_d
CREATE TABLE dealer_shop_a_d(
dealer_shop_no VARCHAR COMMENT '专营店编码,长度默认为50',
province_name VARCHAR COMMENT '省份名称',
city_name VARCHAR COMMENT '城市名称',
warzone_name VARCHAR COMMENT '战区名称',
latest_shop_no VARCHAR COMMENT '最新的专营店编码,长度默认为50',
short_name VARCHAR COMMENT '专营店简称',
group_name_full VARCHAR COMMENT '集团店全称',
group_name VARCHAR COMMENT '集团简称',
oz_name_full VARCHAR COMMENT '区域店全称',
oz_name VARCHAR COMMENT '区域简称',
category_name VARCHAR COMMENT '店面类别',
investor_name VARCHAR COMMENT '投资人',
name VARCHAR Comment '专营店名称',
PRIMARY KEY (dealer_shop_no),
PERIOD FOR SYSTEM_TIME
) with (
type = 'odps',
endPoint = 'http://阿里云odpsendpoint/api',
project = 'dwd',
accessId = '*******************',
accessKey = '&&&&&&&&&&&&&&&&&&&&&&&&&',
tableName = 'dealer_shop_a_d',
maxRowCount = '99999999',
`partition` = 'data_date=max_pt()',
cache = 'ALL',
cacheTTLMs = '3600000'
);
--声明dict码表 follow_sys_dict_a_d
CREATE TABLE follow_sys_dict_a_d(
id varchar COMMENT 'ID',
dict_type BIGINT COMMENT '字典分组',
dict_code BIGINT COMMENT '字典明细项Code',
dict_name varchar COMMENT '字典明细项名称',
dict_sort BIGINT COMMENT '排序',
p_code BIGINT COMMENT '父级字典 关联主表中的dict_code字段 默认0表示无父级节点',
remark varchar COMMENT '备注',
application varchar COMMENT '字典所属系统,默认为空,所有系统都可使用',
version BIGINT COMMENT '版本',
create_time varchar COMMENT '创建时间',
create_by varchar COMMENT '创建人',
update_time varchar COMMENT '更新时间',
update_by varchar COMMENT '更新人',
PRIMARY KEY (dict_code),
PERIOD FOR SYSTEM_TIME
) with (
type = 'odps',
endPoint = 'http://阿里云odpsendpoint/api',
project = 'ads_rpt',
accessId = '***************',
accessKey = '&&&&&&&&&&&&&&&&&&&&&',
tableName = 'follow_sys_dict_a_d',
maxRowCount = '99999999',
`partition` = 'data_date=max_pt()',
cache = 'ALL',
cacheTTLMs = '3600000'
);
--声明SCRM商机表 tb_scrm_opportunity_d
CREATE TABLE tb_scrm_opportunity_d(
id VARCHAR --潜客表ID
,shop_id VARCHAR --店铺ID
,opportunity_type BIGINT --线索类型(线上/线下)
,brand_name VARCHAR
,model_name VARCHAR
,model_code VARCHAR
,customer_name VARCHAR
,customer_mobile_phone VARCHAR
,PRIMARY KEY (id),
PERIOD FOR SYSTEM_TIME
) with (
type = 'hologres',
dbname = 'ods',
tablename = 'scrm.tb_scrm_opportunity_d',
username = '********************',
password = '&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&',
endpoint = '阿里云hologres的url:80',
cache = 'None',
cacheSize = '100000',
cacheTTLMs = '60000',
async = 'false'
);
--声明车型维表
CREATE TABLE brand_model_a_d (
model_code VARCHAR COMMENT '车型编码',
model_name VARCHAR COMMENT '车型',
PRIMARY KEY (model_code),
PERIOD FOR SYSTEM_TIME
) with (
type = 'odps',
endPoint = 'http://阿里云odps/api',
project = 'dwd',
accessId = '***************',
accessKey = '&&&&&&&&&&&&&&&&&&&&&&&&&&&',
tableName = 'brand_model_a_d ',
maxRowCount = '99999999',
`partition` = 'data_date=max_pt()',
cache = 'ALL',
cacheTTLMs = '3600000'
);
--声明SCRM FoLLOW
CREATE TABLE follow_p(
id VARCHAR Comment '跟进单号' --ID
,opportunity_id VARCHAR comment '商机ID'
,shop_id VARCHAR Comment '虚拟店ID'
,follow_time TIMESTAMP Comment '实际跟进时间'
,follow_mode BIGINT comment '跟进方式编码'
,follow_result_code BIGINT comment '跟进结果编码'
,sa_name VARCHAR comment '销售顾问名称'
,event_type BIGINT comment '事件类型'
,new_dts_sync_dts_after_flag VARCHAR --所有列的值是否更新后的值取值:Y或N。
,new_dts_sync_dts_operation_flag VARCHAR --操作类型取值:I:INSERT操作。D:DELETE操作。U:UPDATE操作。
--这里通过process time 时间窗口延迟流数据源5分钟的作用,在5分02秒是触发计算
,ts as proctime()
(,ts as TO_TIMESTAMP (NOW () * 1000),
WATERMARK wk FOR ts as withOffset (ts, 2000) -- 为rowtime定义watermark
这样定义有bug,窗口不会到时关闭,需要通过阿里自己的关键词)
) with (
type = 'datahub',
endPoint = 'http://datahub的endPoint ',
accessId = '**************************',
accessKey = '&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&',
project = 'scrm',
topic = 'follow_p',
maxRetryTimes = '20',
retryIntervalMs = '1000',
batchReadSize = '500',
lengthCheck = 'NONE',
columnErrorDebug = 'true'
);
create table print_table0(
event_name VARCHAR --事件名称
,json_message VARCHAR --事件内容
,unique_id VARCHAR --unique_id
,etl_time TIMESTAMP --数据时间
) with (
type='print'
-- type='datahub',
-- endPoint = 'https://datahub的endPoint ',
-- project = 'it',
-- topic='event_info',
-- accessId = '*****************',
-- accessKey = '&&&&&&&&&&&&&&&&&&&&&&&&',
-- batchSize='300',
-- batchWriteTimeoutMs='1000'
);
create view view0 as (
select
follow.id,
follow.opportunity_id,
follow.shop_id,
follow.follow_time,
follow.follow_mode,
follow.follow_result_code,
follow.sa_name,
follow.event_type,
follow.new_dts_sync_dts_after_flag,
follow.new_dts_sync_dts_operation_flag
FROM (
select
*
FROM
follow_p
where
new_dts_sync_dts_after_flag = 'Y'
AND new_dts_sync_dts_operation_flag <> 'D'
AND follow_mode = '10420004'
AND follow_result_code is not null
AND event_type is not null
AND DATE_FORMAT (
TO_TIMESTAMP (FROM_UNIXTIME (cast (follow_time as bigint) / 1000)),
'yyyy-MM-dd HH:mm:ss'
) > Date_format (
substr (cast (current_timestamp as varchar), 1, 10),
'yyyy-MM-dd',
'yyyy-MM-dd HH:mm:ss'
)
) as follow
GROUP
BY SESSION (ts, INTERVAL '5' minute),
follow.id,
follow.opportunity_id,
follow.shop_id,
follow.follow_time,
follow.follow_mode,
follow.follow_result_code,
follow.sa_name,
follow.event_type,
follow.new_dts_sync_dts_after_flag,
follow.new_dts_sync_dts_operation_flag
);
create view view1 as (
select
follow.id
,follow.opportunity_id
,follow.shop_id
,follow.follow_time
,follow.follow_mode
,follow.follow_result_code
,follow.sa_name
,follow.event_type
,follow.new_dts_sync_dts_after_flag
,follow.new_dts_sync_dts_operation_flag
,opportunity.id opportunityid --潜客表ID
,opportunity.shop_id --店铺ID
,opportunity.opportunity_type --线索类型(线上/线下)
,opportunity.brand_name
,opportunity.model_name
,opportunity.model_code
,opportunity.customer_name
,opportunity.customer_mobile_phone
FROM
view0 as follow
LEFT JOIN
tb_scrm_opportunity_d FOR SYSTEM_TIME AS OF PROCTIME() AS opportunity
ON follow.opportunity_id=opportunity.id
);
insert into print_table0
select
'exhibithall_reception' event_name
,concat('{',CONCAT_WS(',',brand,car_model,`name` ,phone,id,dealer_code,dealer_shop_name,follow_time,action_time,follow_mode,follow_result,sa_name,event_type
),'}') json_message
,concat('event_exhibithall_reception','-',followid) unique_id
,cast(cast(CURRENT_TIMESTAMP as bigint)*1000 as timestamp) etl_time
FROM
(select
CONCAT_WS(':','\"brand\"',concat('\"',case when follow.brand_name='A' then 'A'
WHEN follow.brand_name='B' then 'B'
WHEN follow.brand_name='C' or follow.brand_name='c' then 'C'
WHEN follow.brand_name='D' then 'D'
when follow.brand_name='E' then 'E'
ELSE '' END ,'\"')) brand
,CONCAT_WS(':','\"car_model\"',concat('\"',coalesce(model.model_name,''),'\"')) car_model
,CONCAT_WS(':','\"name\"',concat('\"',coalesce(follow.customer_name,''),'\"')) `name`
,CONCAT_WS(':','\"phone\"',concat('\"',coalesce(follow.customer_mobile_phone,''),'\"')) phone
,CONCAT_WS(':','\"id\"',concat('\"',coalesce(follow.id,''),'\"')) id
,CONCAT_WS(':','\"dealer_code\"',concat('\"',coalesce(dealer_shop.latest_shop_no,''),'\"')) dealer_code
,CONCAT_WS(':','\"dealer_shop_name\"',concat('\"',coalesce(dealer_shop.name,''),'\"')) dealer_shop_name --专营店名称
,CONCAT_WS(':','\"follow_time\"',concat('\"',coalesce(DATE_FORMAT(TO_TIMESTAMP(FROM_UNIXTIME(cast(follow.follow_time as bigint)/1000)) ,'yyyy-MM-dd HH:mm:ss'),''),'\"')) follow_time
,CONCAT_WS(':','\"action_time\"',concat('\"',coalesce(DATE_FORMAT(TO_TIMESTAMP(FROM_UNIXTIME(cast(follow.follow_time as bigint)/1000)) ,'yyyy-MM-dd HH:mm:ss'),''),'\"')) action_time
,CONCAT_WS(':','\"follow_mode\"',concat('\"',coalesce(dict0.dict_name,''),'\"')) as follow_mode --跟进方式
,CONCAT_WS(':','\"follow_result\"',concat('\"',coalesce(dict1.dict_name,''),'\"')) as follow_result --跟进结果
,CONCAT_WS(':','\"sa_name\"',concat('\"',coalesce(follow.sa_name,''),'\"')) sa_name
,CONCAT_WS(':','\"event_type\"',concat('\"',coalesce(dict2.dict_name,''),'\"')) as event_type --跟进事件类型
,follow.id as followid
FROM
view1 as follow
LEFT JOIN
brand_model_a_d FOR SYSTEM_TIME AS OF PROCTIME() AS model
ON follow.model_code=model.model_code
left join
dealer_shop_a_d FOR SYSTEM_TIME AS OF PROCTIME() AS dealer_shop
ON follow.shop_id = dealer_shop.dealer_shop_no
left JOIN
follow_sys_dict_a_d FOR SYSTEM_TIME AS OF PROCTIME() AS dict0
ON follow.follow_mode=dict0.dict_code
left JOIN
follow_sys_dict_a_d FOR SYSTEM_TIME AS OF PROCTIME() AS dict1
ON follow.follow_result_code=dict1.dict_code
left JOIN
follow_sys_dict_a_d FOR SYSTEM_TIME AS OF PROCTIME() AS dict2
ON follow.event_type=dict2.dict_code
WHERE DATE_FORMAT(TO_TIMESTAMP(FROM_UNIXTIME(cast(follow_time as bigint)/1000)),'yyyy-MM-dd HH:mm:ss')
>Date_format(substr(cast(current_timestamp as varchar),1,10),'yyyy-MM-dd','yyyy-MM-dd HH:mm:ss')
) T
;
版权声明:本文标题:Blink 会话窗口解决双流关联ID关联不上问题 内容由网友自发贡献,该文观点仅代表作者本人, 转载请联系作者并注明出处:http://www.betaflare.com/biancheng/1725943522a1034462.html, 本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容,一经查实,本站将立刻删除。
发表评论