Comments (6)
Could you share the SQL query (including DDL) and the job graph in web UI?
from flink-cdc-connectors.
Could you share the SQL query (including DDL) and the job graph in web UI?
DDL 和 SQL 如下:
// 创建源表 - bs_user_enroll
val bs_user_enroll_source_ddl =
"""
CREATE TABLE bs_user_enroll_source
(
ue_id
int,
ue_pt_id
int,
ue_opt_id
int,
ue_user_id
int,
ue_state
int,
ue_state_time
varchar,
ue_time
varchar,
ue_remark
varchar,
ue_signt_ime
varchar,
ue_sign_url
varchar,
ue_relieve_sign_url
varchar,
ue_sign_nbr
varchar,
ue_relieved_nbr
varchar,
ue_signup_id
int,
ue_agreement_type
int,
ue_settle_user_id
int,
ue_fdd_sign_url
varchar
) WITH (
'connector' = 'mysql-cdc',
'hostname' = '%s',
'port' = '%s',
'username' = '%s',
'password' = '%s',
'database-name' = '%s',
'table-name' = 'bs_user_enroll'
)
""".format(SOURCE_HOST, SOURCE_PORT, SOURCE_USERNAME, SOURCE_PASSWORD, SOURCE_DATABASE)
t_env.executeSql(bs_user_enroll_source_ddl)
/* 创建mysql维表 */
// dim_pt_detail
val dim_pt_detail_ddl =
"""
CREATE TABLE dim_pt_detail (
`pt_id` int,
`pt_nbr` varchar,
`pt_ct_id` int,
`pt_bu_id` int,
`pt_bu_name` varchar,
`pt_user_id` int,
`pt_opt_id` int,
`pt_name` varchar,
`pt_invoice` int,
`pt_sku` int,
`pt_state` int,
`pt_state_name` varchar,
`pt_state_remark` varchar,
`pt_state_time` varchar,
`pt_time` varchar,
`pt_type` int,
`pt_type_name` varchar,
`pt_signup` int,
`pt_signup_name` varchar,
`pt_agreement` int,
`pt_incubator_flag` int,
`pt_incubator_flag_name` varchar,
`pt_cycle` int,
`pt_case_count` int,
`pt_residue_count` int,
`pt_start_time` varchar,
`pt_end_time` varchar,
`pt_root_nid` int,
`pt_billing_type` varchar,
`pt_fee_info` varchar,
`pt_user_count` int,
`pt_signup_count` int,
`pt_is_plat` int,
`pt_is_plat_name` varchar,
`pt_plat_opt_id` int,
`pt_desc` varchar,
`pt_standard` varchar,
`pt_attachment` varchar,
`pt_suc_count` int,
`pt_plat_fee` bigint,
`pt_incubator_count` int,
`pt_audit_remark` varchar,
`pt_audit_opt_list` varchar ,
`pt_settle_user_id` int,
`pt_fee_value` varchar,
`pt_star` int,
`pt_star_plat` int,
`pt_ratie` int,
`pt_is_lock` int,
`pt_is_lock_name` varchar,
`pt_account_days` varchar,
`pt_remit_channel` int,
`pt_account_type` int,
`pt_sucfee` bigint,
`pt_step` varchar ,
`pt_scene_file` varchar,
`pt_must_signed` int,
`pt_must_signed_name` varchar,
`pt_auto_get` int,
`pt_auto_get_name` varchar,
`pt_auto_sub` int,
`pt_auto_sub_name` varchar,
`pt_max_count` int,
`pt_root_id` int,
`pt_root_nbr` varchar,
`pt_root_user_id` int
) WITH (
'connector' = 'mysql-cdc',
'hostname' = '%s',
'port' = '%s',
'username' = '%s',
'password' = '%s',
'database-name' = '%s',
'table-name' = 'dim_pt_detail'
)
""".format(HOST, PORT, USERNAME, PASSWORD, DATABASE)
t_env.executeSql(dim_pt_detail_ddl)
// mp_user
val dim_mp_user_ddl =
"""
CREATE TABLE dim_mp_user (
`mp_id` int,
`mp_name` varchar,
`mp_ct_id` int,
`mp_type` int,
`mp_spec_flag` int,
`mp_time` varchar,
`mp_state` int,
`mp_state_time` varchar COMMENT '状态时间',
`mp_link_phone` varchar COMMENT '联系方式',
`mp_link_man` varchar COMMENT '联系人',
`mp_link_email` varchar COMMENT '邮箱',
`mp_link_address` varchar COMMENT '联系地址',
`mp_bank` varchar,
`mp_bank_no` varchar,
`mp_bank_name` varchar,
`mp_audit_state` int,
`mp_audit_state_time` varchar,
`mp_audit_remark` varchar,
`mp_is_cancel` int,
`mp_inc_state` tinyint,
`mp_inc_state_time` varchar,
`mp_oper_opt_id` int,
`mp_oper_aft_opt_id` int,
`mp_sale_opt_id` int,
`mp_fin_opt_id` int,
`mp_relation_user_id` int,
`mp_sign_user_id` int,
`mp_settle_user_id` int,
`mp_proxy_user_id` int,
`mp_pay_channel` int,
`mp_auth_photo_url` varchar,
`mp_top_photo_url` varchar,
`mp_contract_id` int,
`mp_industry` int,
`mp_incubator` int,
`mp_join_type` int,
`mp_fee_json` varchar,
`mp_settle_path_json` varchar,
`mp_back_fee_json` varchar,
`mp_plat_opt_id` int,
`mp_opt_id` int,
`mp_audit_opt_id` int,
`mp_remark` varchar,
`mp_video` varchar,
`mp_inc_errcount` int,
`mp_old_id` int,
`mp_self_user_id` int
) WITH (
'connector' = 'mysql-cdc',
'hostname' = '%s',
'port' = '%s',
'username' = '%s',
'password' = '%s',
'database-name' = '%s',
'table-name' = 'mp_user'
)
""".format(SOURCE_HOST, SOURCE_PORT, SOURCE_USERNAME, SOURCE_PASSWORD, SOURCE_DATABASE)
t_env.executeSql(dim_mp_user_ddl)
// dim_se_detail
val dim_se_detail_ddl =
"""
CREATE TABLE dim_se_detail (
`se_id` int,
`se_name` varchar,
`se_ct_id` int,
`se_type` int,
`se_spec_flag` int,
`se_time` varchar,
`se_state` int,
`se_state_time` varchar,
`se_link_phone` varchar,
`se_link_man` varchar,
`se_link_email` varchar,
`se_link_address` varchar,
`se_bank` varchar,
`se_bank_no` varchar,
`se_bank_name` varchar,
`se_audit_state` int,
`se_audit_state_time` varchar,
`se_audit_remark` varchar,
`se_is_cancel` int,
`se_inc_state` int,
`se_inc_state_time` varchar,
`se_oper_opt_id` int,
`se_oper_aft_opt_id` int,
`se_sale_opt_id` int,
`se_fin_opt_id` int,
`se_relation_user_id` int,
`se_sign_user_id` int,
`se_settle_user_id` int,
`se_proxy_user_id` int,
`se_pay_channel` int,
`se_auth_photo_url` varchar,
`se_top_photo_url` varchar,
`se_contract_id` int,
`se_industry` int,
`se_incubator` int,
`se_join_type` int,
`se_fee_json` varchar,
`se_settle_path_json` varchar ,
`se_back_fee_json` varchar,
`se_plat_opt_id` int,
`se_opt_id` int,
`se_audit_opt_id` int,
`se_remark` varchar,
`se_video` varchar,
`se_inc_errcount` int,
`se_old_id` int,
`se_self_user_id` int,
`se_hm_name` varchar,
`se_hm_sex` varchar,
`se_hm_conceal` varchar,
`se_hm_idcard` varchar,
`se_hm_xphone` varchar,
`se_hm_phone` varchar,
`se_incubator_flag` int,
`se_xbank_no` varchar,
`se_audit_remark_name` varchar,
`se_specialflag` int,
`se_special_content` varchar,
`se_bl_url` varchar,
`se_incubator_type` varchar,
`se_special_type` varchar,
`se_incstate_name` varchar,
`se_auditstate_name` varchar
) WITH (
'connector' = 'mysql-cdc',
'hostname' = '%s',
'port' = '%s',
'username' = '%s',
'password' = '%s',
'database-name' = '%s',
'table-name' = 'dim_se_detail'
)
""".format(HOST, PORT, USERNAME, PASSWORD, DATABASE)
t_env.executeSql(dim_se_detail_ddl)
// dim_human_detail
val dim_human_detail_ddl =
"""
CREATE TABLE dim_human_detail (
`hm_id` int COMMENT '操作员id',
`hm_old_id` varchar COMMENT '原用户id',
`hm_pwd` varchar COMMENT '登录密码',
`hm_phone` varchar COMMENT '手机号码',
`hm_email` varchar COMMENT '邮箱',
`hm_name` varchar COMMENT '用户名',
`hm_head_portrait` varchar COMMENT '头像',
`hm_real_state` int COMMENT '认证状态',
`hm_state_time` varchar COMMENT '状态时间',
`hm_state` int,
`hm_time` varchar COMMENT '创建时间',
`hm_activate_flag` int COMMENT '是否激活 1是 2否',
`hm_last_login_time` varchar COMMENT '最后一次登录时间',
`hm_enc_phone` varchar,
`hm_xphone` varchar,
`hm_ic_id` int COMMENT '证件id',
`hm_readflag` varchar,
`hm_wechat_id` varchar,
`hm_sex` varchar COMMENT '性别',
`hm_conceal` varchar,
`hm_idcard` varchar COMMENT '身份证号',
`hm_specialflag` int COMMENT '是否特殊用户',
`hm_special_content` varchar COMMENT '特殊用户原因',
`hm_special_type` varchar COMMENT '特殊用户类型'
) WITH (
'connector' = 'mysql-cdc',
'hostname' = '%s',
'port' = '%s',
'username' = '%s',
'password' = '%s',
'database-name' = '%s',
'table-name' = 'dim_human_detail'
)
""".format(HOST, PORT, USERNAME, PASSWORD, DATABASE)
t_env.executeSql(dim_human_detail_ddl)
// mp_operator_dept
val dim_mp_operator_dept_ddl =
"""
CREATE TABLE dim_mp_operator_dept (
`od_id` int,
`od_user_id` int,
`od_dept_id` int,
`od_opt_id` int,
`od_father_opt_id` int,
`od_time` varchar,
`od_join_type` int,
`od_state` int,
`od_state_time` varchar,
`od_remark` varchar
) WITH (
'connector' = 'mysql-cdc',
'hostname' = '%s',
'port' = '%s',
'username' = '%s',
'password' = '%s',
'database-name' = '%s',
'table-name' = 'mp_operator_dept'
)
""".format(SOURCE_HOST, SOURCE_PORT, SOURCE_USERNAME, SOURCE_PASSWORD, SOURCE_DATABASE)
t_env.executeSql(dim_mp_operator_dept_ddl)
// mp_dept
val dim_mp_dept_ddl =
"""
CREATE TABLE dim_mp_dept (
`dt_id` int,
`dt_user_id` int,
`dt_name` varchar,
`dt_fatherid` int,
`dt_name_path` varchar,
`dt_id_path` varchar,
`dt_state` int,
`dt_state_time` varchar,
`dt_root_id` int,
`dt_is_temp` int
) WITH (
'connector' = 'mysql-cdc',
'hostname' = '%s',
'port' = '%s',
'username' = '%s',
'password' = '%s',
'database-name' = '%s',
'table-name' = 'mp_dept'
)
""".format(SOURCE_HOST, SOURCE_PORT, SOURCE_USERNAME, SOURCE_PASSWORD, SOURCE_DATABASE)
t_env.executeSql(dim_mp_dept_ddl)
// sys_param
val sys_param_dim_ddl =
"""
CREATE TABLE sys_param_dim (
`sp_id` varchar,
`sp_father_id` varchar,
`sp_name` varchar,
`sp_value` varchar,
`sp_ext_value` varchar,
`sp_remark` varchar
) WITH (
'connector' = 'mysql-cdc',
'hostname' = '%s',
'port' = '%s',
'username' = '%s',
'password' = '%s',
'database-name' = '%s',
'table-name' = 'sys_param'
)
""".format(SOURCE_HOST, SOURCE_PORT, SOURCE_USERNAME, SOURCE_PASSWORD, SOURCE_DATABASE)
t_env.executeSql(sys_param_dim_ddl)
/* 创建mysql结果表 */
// getstartuplist
val getstartuplist_sink_ddl =
"""
CREATE TABLE `getstartuplist_sink` (
`uePtId` int,
`ueId` int,
`ueOptId` int,
`ueStateName` varchar,
`sp_remark` varchar,
`ueState` int,
`ueStateTime` varchar,
`ueSignUrl` varchar,
`ueRelieveSignUrl` varchar,
`ueRelievedNbr` varchar,
`ueSignupId` int,
`ueRemark` varchar,
`ptName` varchar,
`platIncubatorFlag` int,
`agreement` int,
`ptTypeName` varchar,
`ptEndTime` varchar,
`ptStar` int,
`ptNbr` varchar,
`ptState` int,
`ptStateName` varchar,
`pt_user_id` int,
`mpName` varchar,
`mpId` int,
`mpTime` varchar,
`mpIncState` int,
`star` bigint,
`userIncubatorFlag` bigint,
`mpBankNo` varchar,
`bankNo` varchar,
`mpStateTime` varchar,
`mpIncStateTime` varchar,
`mpAuditState` int,
`mpRemark` varchar,
`mpAuditStateTime` varchar,
`mpAuditRemark` varchar,
`mpAuditRemarkName` varchar,
`incStateName` varchar,
`incErroCount` int,
`mpIncStateName` varchar,
`mpAuditStateName` varchar,
`icPhone` varchar,
`phone` varchar,
`icName` varchar,
`spSex` varchar,
`icAge` BIGINT,
`icID` varchar,
`icCode` varchar,
`specialflag` bigint,
`specialContent` varchar,
`specialType` varchar,
`blUrl` varchar,
`dtNamePath` varchar,
`od_dept_id` int,
`mp_id` int,
`ue_state_time` varchar,
PRIMARY KEY (ueId) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = '%s',
'table-name' = 'getstartuplist',
'driver' = 'com.mysql.cj.jdbc.Driver',
'username' = '%s',
'password' = '%s'
)
""".format(URL, USERNAME, PASSWORD)
t_env.executeSql(getstartuplist_sink_ddl)
// 注册函数
t_env.createTemporarySystemFunction("TimeUdf", classOf[TimeUdf])
// 执行sql
t_env.executeSql(
"""
|insert into getstartuplist_sink
|select
| a.ue_pt_id as uePtId
| ,a.ue_id AS ueId
| ,a.ue_opt_id AS ueOptId
| ,s1.sp_name AS ueStateName
| ,s1.sp_remark AS sp_remark
| ,a.ue_state AS ueState
| ,TimeUdf(a.ue_signt_ime) as ueStateTime
| ,a.ue_sign_url AS ueSignUrl
| ,a.ue_relieve_sign_url AS ueRelieveSignUrl
| ,a.ue_relieved_nbr AS ueRelievedNbr
| ,a.ue_signup_id AS ueSignupId
| ,COALESCE(a.ue_remark, '') AS ueRemark
| ,b.pt_name AS ptName
| ,b.pt_incubator_flag as platIncubatorFlag
| ,b.pt_agreement as agreement
| ,b.pt_type_name AS ptTypeName
| ,TimeUdf(b.pt_end_time) AS ptEndTime
| ,b.pt_star as ptStar
| ,COALESCE(b.pt_root_nbr,'') as ptNbr
| ,b.pt_state as ptState
| ,b.pt_state_name as ptStateName
| ,b.pt_user_id as pt_user_id
| ,c.mp_name AS mpName
| ,c.mp_id as mpId
| ,d.se_time as mpTime
| ,COALESCE(d.se_inc_state, 0) AS mpIncState
| ,cast(COALESCE(d.se_incubator,0) as bigint) as star
| ,cast(COALESCE(d.se_incubator_flag, 0) as bigint) as userIncubatorFlag
| ,d.se_xbank_no as mpBankNo
| ,COALESCE(d.se_bank_no,'') as bankNo
| ,TimeUdf(d.se_state_time) AS mpStateTime
| ,(case when COALESCE(d.se_inc_state, 0) =1 then TimeUdf(d.se_inc_state_time) else '' END) AS mpIncStateTime
| ,d.se_audit_state AS mpAuditState
| ,COALESCE(d.se_remark,'') as mpRemark
| ,TimeUdf(d.se_audit_state_time) AS mpAuditStateTime
| ,COALESCE(d.se_audit_remark, '') AS mpAuditRemark
| ,d.se_audit_remark_name as mpAuditRemarkName
| ,d.se_incubator_type as incStateName
| ,d.se_inc_errcount as incErroCount
| ,d.se_incstate_name AS mpIncStateName
| ,d.se_auditstate_name AS mpAuditStateName
| ,g.hm_xphone AS icPhone
| ,g.hm_phone AS phone
| ,g.hm_name AS icName
| ,g.hm_sex AS spSex
| ,(case when CHAR_LENGTH(g.hm_idcard)>14 then (YEAR(CURRENT_DATE) - cast(SUBSTR(g.hm_idcard, 7, 4) as int)) else 0 end) AS icAge
| ,g.hm_conceal AS icID
| ,g.hm_idcard as icCode
| ,cast(COALESCE(g.hm_specialflag, 0) as bigint) as specialflag
| ,g.hm_special_content as specialContent
| ,g.hm_special_type as specialType
| ,d.se_bl_url as blUrl
| ,COALESCE(f.dt_name_path, '') AS dtNamePath
| ,COALESCE(f.dt_id, 0) as od_dept_id
| ,d.se_id as mp_id
| ,a.ue_state_time
|from bs_user_enroll_source a
|left join dim_pt_detail b on a.ue_pt_id = b.pt_id
|left join dim_mp_user c on b.pt_root_user_id = c.mp_id
|left join dim_se_detail d on d.se_id = a.ue_user_id
|left join dim_human_detail g on g.hm_id = a.ue_opt_id
|left join dim_mp_operator_dept e on a.ue_opt_id = e.od_opt_id and b.pt_root_user_id = e.od_user_id
|left join dim_mp_dept f on e.od_dept_id = f.dt_id
|left join sys_param_dim s1 ON s1.sp_father_id = '41' AND a.ue_state = cast(s1.sp_value as int)
""".stripMargin
)
超过了3表联查
from flink-cdc-connectors.
Could you share the SQL query (including DDL) and the job graph in web UI?
同样的DDL和SQL,通过es connector sink到es,数据能正常写入
from flink-cdc-connectors.
Looks like this is a bug in JDBC conector sink .
from flink-cdc-connectors.
Looks like this is a bug in JDBC conector sink .
嗯嗯 谢谢哈 我先写到es
from flink-cdc-connectors.
ref FLINK-19423, It has been fixed, use flink-connector-jdbc.jar with version 1.11.3 pls
from flink-cdc-connectors.
Related Issues (20)
- I found flink-cdc supported connectors using YAML definition,its very convenient,i love it.so my question is when other pipeline is supported?for example,i want sync data form oracle to pg or pg to mysql.Thanks! HOT 1
- Flink Mongodb CDC在遇到DDL操作(eg: db.c.drop()),程序报错, 期待相应功能的支持 HOT 1
- SQL Server does not support scan. startup. mode: initial only HOT 2
- flink cdc 2.4.2版本关于报错io.debezium.DebeziumException: Failed to deserialize data of EventHeaderV4 的问题 HOT 1
- Exception in thread "main" java.lang.NoSuchMethodError: 'void org.apache.flink.streaming.runtime.operators.sink.CommitterOperatorFactory.<init>(org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink, boolean, boolean)' HOT 3
- Information that has disappeared from github appears on the web page. HOT 1
- DataStreamSource oracle ID HOT 1
- Information that has disappeared from github appears on the web page. HOT 2
- supported mysql8.4? HOT 1
- Flink 1.18.1 and 1.19.0 having build jdk 1.8 and causing incompatibilities with Java 17 HOT 3
- update operation does not take effect HOT 5
- mysql pipeline connector lost table which database and table with the same name
- Caused by: java.lang.ClassCastException: cannot assign instance of org.apache.doris.flink.cfg.DorisOptions to field org.apache.flink.cdc.connectors.doris.sink.DorisMetadataApplier.dorisOptions of type org.apache.doris.flink.cfg.DorisOptions in instance of org.apache.flink.cdc.connectors.doris.sink.DorisMetadataApplier HOT 1
- flink-cdc sql sqlserver HOT 4
- [ERROR] Could not execute SQL statement. Reason: java.io.StreamCorruptedException: unexpected block data HOT 1
- lost data when gtid with Multiple transaction id on mgr cluster HOT 3
- flinkcdc3 official website operation error HOT 5
- When restoring a task from a savepoint, tables that were previously synchronized but later deleted still exist in a state HOT 1
- java.lang.IllegalStateException: Failed to send request to coordinator: com.ververica.cdc.runtime.operators.schema.event.ReleaseUpstreamRequest@5b433541 HOT 3
- addSplits only Snapshot,not foud binlog,so, incrementing data don't sync to sink,need help HOT 2
Recommend Projects
-
React
A declarative, efficient, and flexible JavaScript library for building user interfaces.
-
Vue.js
🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
-
Typescript
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
-
TensorFlow
An Open Source Machine Learning Framework for Everyone
-
Django
The Web framework for perfectionists with deadlines.
-
Laravel
A PHP framework for web artisans
-
D3
Bring data to life with SVG, Canvas and HTML. 📊📈🎉
-
Recommend Topics
-
javascript
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
-
web
Some thing interesting about web. New door for the world.
-
server
A server is a program made to process requests and deliver data to clients.
-
Machine learning
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
-
Visualization
Some thing interesting about visualization, use data art
-
Game
Some thing interesting about game, make everyone happy.
Recommend Org
-
Facebook
We are working to build community through open source technology. NB: members must have two-factor auth.
-
Microsoft
Open source projects and samples from Microsoft.
-
Google
Google ❤️ Open Source for everyone.
-
Alibaba
Alibaba Open Source for everyone
-
D3
Data-Driven Documents codes.
-
Tencent
China tencent open source team.
from flink-cdc-connectors.