
Comments (6)

wuchong avatar wuchong commented on August 16, 2024

Could you share the SQL query (including DDL) and the job graph in web UI?

from flink-cdc-connectors.

zqyltx2008 avatar zqyltx2008 commented on August 16, 2024

Could you share the SQL query (including DDL) and the job graph in web UI?

The DDL and SQL are as follows:
// Create the source table - bs_user_enroll
val bs_user_enroll_source_ddl =
"""
CREATE TABLE bs_user_enroll_source (
ue_id int,
ue_pt_id int,
ue_opt_id int,
ue_user_id int,
ue_state int,
ue_state_time varchar,
ue_time varchar,
ue_remark varchar,
ue_signt_ime varchar,
ue_sign_url varchar,
ue_relieve_sign_url varchar,
ue_sign_nbr varchar,
ue_relieved_nbr varchar,
ue_signup_id int,
ue_agreement_type int,
ue_settle_user_id int,
ue_fdd_sign_url varchar
) WITH (
'connector' = 'mysql-cdc',
'hostname' = '%s',
'port' = '%s',
'username' = '%s',
'password' = '%s',
'database-name' = '%s',
'table-name' = 'bs_user_enroll'
)
""".format(SOURCE_HOST, SOURCE_PORT, SOURCE_USERNAME, SOURCE_PASSWORD, SOURCE_DATABASE)
t_env.executeSql(bs_user_enroll_source_ddl)

  /* Create the MySQL dimension tables */
  // dim_pt_detail
  val dim_pt_detail_ddl =
    """
      CREATE TABLE dim_pt_detail (
          `pt_id` int,
          `pt_nbr` varchar,
          `pt_ct_id` int,
          `pt_bu_id` int,
          `pt_bu_name` varchar,
          `pt_user_id` int,
          `pt_opt_id` int,
          `pt_name` varchar,
          `pt_invoice` int,
          `pt_sku` int,
          `pt_state` int,
          `pt_state_name` varchar,
          `pt_state_remark` varchar,
          `pt_state_time` varchar,
          `pt_time` varchar,
          `pt_type` int,
          `pt_type_name` varchar,
          `pt_signup` int,
          `pt_signup_name` varchar,
          `pt_agreement` int,
          `pt_incubator_flag` int,
          `pt_incubator_flag_name` varchar,
          `pt_cycle` int,
          `pt_case_count` int,
          `pt_residue_count` int,
          `pt_start_time` varchar,
          `pt_end_time` varchar,
          `pt_root_nid` int,
          `pt_billing_type` varchar,
          `pt_fee_info` varchar,
          `pt_user_count` int,
          `pt_signup_count` int,
          `pt_is_plat` int,
          `pt_is_plat_name` varchar,
          `pt_plat_opt_id` int,
          `pt_desc` varchar,
          `pt_standard` varchar,
          `pt_attachment` varchar,
          `pt_suc_count` int,
          `pt_plat_fee` bigint,
          `pt_incubator_count` int,
          `pt_audit_remark` varchar,
          `pt_audit_opt_list` varchar,
          `pt_settle_user_id` int,
          `pt_fee_value` varchar,
          `pt_star` int,
          `pt_star_plat` int,
          `pt_ratie` int,
          `pt_is_lock` int,
          `pt_is_lock_name` varchar,
          `pt_account_days` varchar,
          `pt_remit_channel` int,
          `pt_account_type` int,
          `pt_sucfee` bigint,
          `pt_step` varchar,
          `pt_scene_file` varchar,
          `pt_must_signed` int,
          `pt_must_signed_name` varchar,
          `pt_auto_get` int,
          `pt_auto_get_name` varchar,
          `pt_auto_sub` int,
          `pt_auto_sub_name` varchar,
          `pt_max_count` int,
          `pt_root_id` int,
          `pt_root_nbr` varchar,
          `pt_root_user_id` int
      )  WITH (
          'connector' = 'mysql-cdc',
          'hostname' = '%s',
          'port' = '%s',
          'username' = '%s',
          'password' = '%s',
          'database-name' = '%s',
          'table-name' = 'dim_pt_detail'
      )
      """.format(HOST, PORT, USERNAME, PASSWORD, DATABASE)
  t_env.executeSql(dim_pt_detail_ddl)

  // mp_user
  val dim_mp_user_ddl =
    """
      CREATE TABLE dim_mp_user (
        `mp_id` int,
        `mp_name` varchar,
        `mp_ct_id` int,
        `mp_type` int,
        `mp_spec_flag` int,
        `mp_time` varchar,
        `mp_state` int,
        `mp_state_time` varchar COMMENT 'status time',
        `mp_link_phone` varchar COMMENT 'contact phone',
        `mp_link_man` varchar COMMENT 'contact person',
        `mp_link_email` varchar COMMENT 'email',
        `mp_link_address` varchar COMMENT 'contact address',
        `mp_bank` varchar,
        `mp_bank_no` varchar,
        `mp_bank_name` varchar,
        `mp_audit_state` int,
        `mp_audit_state_time` varchar,
        `mp_audit_remark` varchar,
        `mp_is_cancel` int,
        `mp_inc_state` tinyint,
        `mp_inc_state_time` varchar,
        `mp_oper_opt_id` int,
        `mp_oper_aft_opt_id` int,
        `mp_sale_opt_id` int,
        `mp_fin_opt_id` int,
        `mp_relation_user_id` int,
        `mp_sign_user_id` int,
        `mp_settle_user_id` int,
        `mp_proxy_user_id` int,
        `mp_pay_channel` int,
        `mp_auth_photo_url` varchar,
        `mp_top_photo_url` varchar,
        `mp_contract_id` int,
        `mp_industry` int,
        `mp_incubator` int,
        `mp_join_type` int,
        `mp_fee_json` varchar,
        `mp_settle_path_json` varchar,
        `mp_back_fee_json` varchar,
        `mp_plat_opt_id` int,
        `mp_opt_id` int,
        `mp_audit_opt_id` int,
        `mp_remark` varchar,
        `mp_video` varchar,
        `mp_inc_errcount` int,
        `mp_old_id` int,
        `mp_self_user_id` int
      )  WITH (
          'connector' = 'mysql-cdc',
          'hostname' = '%s',
          'port' = '%s',
          'username' = '%s',
          'password' = '%s',
          'database-name' = '%s',
          'table-name' = 'mp_user'
      )
      """.format(SOURCE_HOST, SOURCE_PORT, SOURCE_USERNAME, SOURCE_PASSWORD, SOURCE_DATABASE)
  t_env.executeSql(dim_mp_user_ddl)

  // dim_se_detail
  val dim_se_detail_ddl =
    """
      CREATE TABLE dim_se_detail (
        `se_id` int,
        `se_name` varchar,
        `se_ct_id` int,
        `se_type` int,
        `se_spec_flag` int,
        `se_time` varchar,
        `se_state` int,
        `se_state_time` varchar,
        `se_link_phone` varchar,
        `se_link_man` varchar,
        `se_link_email` varchar,
        `se_link_address` varchar,
        `se_bank` varchar,
        `se_bank_no` varchar,
        `se_bank_name` varchar,
        `se_audit_state` int,
        `se_audit_state_time` varchar,
        `se_audit_remark` varchar,
        `se_is_cancel` int,
        `se_inc_state` int,
        `se_inc_state_time` varchar,
        `se_oper_opt_id` int,
        `se_oper_aft_opt_id` int,
        `se_sale_opt_id` int,
        `se_fin_opt_id` int,
        `se_relation_user_id` int,
        `se_sign_user_id` int,
        `se_settle_user_id` int,
        `se_proxy_user_id` int,
        `se_pay_channel` int,
        `se_auth_photo_url` varchar,
        `se_top_photo_url` varchar,
        `se_contract_id` int,
        `se_industry` int,
        `se_incubator` int,
        `se_join_type` int,
        `se_fee_json` varchar,
        `se_settle_path_json` varchar,
        `se_back_fee_json` varchar,
        `se_plat_opt_id` int,
        `se_opt_id` int,
        `se_audit_opt_id` int,
        `se_remark` varchar,
        `se_video` varchar,
        `se_inc_errcount` int,
        `se_old_id` int,
        `se_self_user_id` int,
        `se_hm_name` varchar,
        `se_hm_sex` varchar,
        `se_hm_conceal` varchar,
        `se_hm_idcard` varchar,
        `se_hm_xphone` varchar,
        `se_hm_phone` varchar,
        `se_incubator_flag` int,
        `se_xbank_no` varchar,
        `se_audit_remark_name` varchar,
        `se_specialflag` int,
        `se_special_content` varchar,
        `se_bl_url` varchar,
        `se_incubator_type` varchar,
        `se_special_type` varchar,
        `se_incstate_name` varchar,
        `se_auditstate_name` varchar
      )  WITH (
          'connector' = 'mysql-cdc',
          'hostname' = '%s',
          'port' = '%s',
          'username' = '%s',
          'password' = '%s',
          'database-name' = '%s',
          'table-name' = 'dim_se_detail'
      )
      """.format(HOST, PORT, USERNAME, PASSWORD, DATABASE)
  t_env.executeSql(dim_se_detail_ddl)

  // dim_human_detail
  val dim_human_detail_ddl =
    """
      CREATE TABLE dim_human_detail (
        `hm_id` int COMMENT 'operator id',
        `hm_old_id` varchar COMMENT 'original user id',
        `hm_pwd` varchar COMMENT 'login password',
        `hm_phone` varchar COMMENT 'mobile phone number',
        `hm_email` varchar COMMENT 'email',
        `hm_name` varchar COMMENT 'user name',
        `hm_head_portrait` varchar COMMENT 'avatar',
        `hm_real_state` int COMMENT 'verification status',
        `hm_state_time` varchar COMMENT 'status time',
        `hm_state` int,
        `hm_time` varchar COMMENT 'creation time',
        `hm_activate_flag` int COMMENT 'activated: 1 = yes, 2 = no',
        `hm_last_login_time` varchar COMMENT 'last login time',
        `hm_enc_phone` varchar,
        `hm_xphone` varchar,
        `hm_ic_id` int COMMENT 'ID document id',
        `hm_readflag` varchar,
        `hm_wechat_id` varchar,
        `hm_sex` varchar COMMENT 'sex',
        `hm_conceal` varchar,
        `hm_idcard` varchar COMMENT 'ID card number',
        `hm_specialflag` int COMMENT 'whether a special user',
        `hm_special_content` varchar COMMENT 'reason for special-user status',
        `hm_special_type` varchar COMMENT 'special user type'
      )  WITH (
          'connector' = 'mysql-cdc',
          'hostname' = '%s',
          'port' = '%s',
          'username' = '%s',
          'password' = '%s',
          'database-name' = '%s',
          'table-name' = 'dim_human_detail'
      )
      """.format(HOST, PORT, USERNAME, PASSWORD, DATABASE)
  t_env.executeSql(dim_human_detail_ddl)

  // mp_operator_dept
  val dim_mp_operator_dept_ddl =
    """
      CREATE TABLE dim_mp_operator_dept (
        `od_id` int,
        `od_user_id` int,
        `od_dept_id` int,
        `od_opt_id` int,
        `od_father_opt_id` int,
        `od_time` varchar,
        `od_join_type` int,
        `od_state` int,
        `od_state_time` varchar,
        `od_remark` varchar
      )  WITH (
          'connector' = 'mysql-cdc',
          'hostname' = '%s',
          'port' = '%s',
          'username' = '%s',
          'password' = '%s',
          'database-name' = '%s',
          'table-name' = 'mp_operator_dept'
      )
      """.format(SOURCE_HOST, SOURCE_PORT, SOURCE_USERNAME, SOURCE_PASSWORD, SOURCE_DATABASE)
  t_env.executeSql(dim_mp_operator_dept_ddl)

  // mp_dept
  val dim_mp_dept_ddl =
    """
      CREATE TABLE dim_mp_dept (
        `dt_id` int,
        `dt_user_id` int,
        `dt_name` varchar,
        `dt_fatherid` int,
        `dt_name_path` varchar,
        `dt_id_path` varchar,
        `dt_state` int,
        `dt_state_time` varchar,
        `dt_root_id` int,
        `dt_is_temp` int
      )  WITH (
          'connector' = 'mysql-cdc',
          'hostname' = '%s',
          'port' = '%s',
          'username' = '%s',
          'password' = '%s',
          'database-name' = '%s',
          'table-name' = 'mp_dept'
      )
      """.format(SOURCE_HOST, SOURCE_PORT, SOURCE_USERNAME, SOURCE_PASSWORD, SOURCE_DATABASE)
  t_env.executeSql(dim_mp_dept_ddl)

  // sys_param
  val sys_param_dim_ddl =
    """
      CREATE TABLE sys_param_dim (
        `sp_id` varchar,
        `sp_father_id` varchar,
        `sp_name` varchar,
        `sp_value` varchar,
        `sp_ext_value` varchar,
        `sp_remark` varchar
      )  WITH (
          'connector' = 'mysql-cdc',
          'hostname' = '%s',
          'port' = '%s',
          'username' = '%s',
          'password' = '%s',
          'database-name' = '%s',
          'table-name' = 'sys_param'
      )
      """.format(SOURCE_HOST, SOURCE_PORT, SOURCE_USERNAME, SOURCE_PASSWORD, SOURCE_DATABASE)
  t_env.executeSql(sys_param_dim_ddl)

  /* Create the MySQL result (sink) table */
  // getstartuplist
  val getstartuplist_sink_ddl =
    """
      CREATE TABLE `getstartuplist_sink` (
        `uePtId` int,
        `ueId` int,
        `ueOptId` int,
        `ueStateName` varchar,
        `sp_remark` varchar,
        `ueState` int,
        `ueStateTime` varchar,
        `ueSignUrl` varchar,
        `ueRelieveSignUrl` varchar,
        `ueRelievedNbr` varchar,
        `ueSignupId` int,
        `ueRemark` varchar,
        `ptName` varchar,
        `platIncubatorFlag` int,
        `agreement` int,
        `ptTypeName` varchar,
        `ptEndTime` varchar,
        `ptStar` int,
        `ptNbr` varchar,
        `ptState` int,
        `ptStateName` varchar,
        `pt_user_id` int,
        `mpName` varchar,
        `mpId` int,
        `mpTime` varchar,
        `mpIncState` int,
        `star` bigint,
        `userIncubatorFlag` bigint,
        `mpBankNo` varchar,
        `bankNo` varchar,
        `mpStateTime` varchar,
        `mpIncStateTime` varchar,
        `mpAuditState` int,
        `mpRemark` varchar,
        `mpAuditStateTime` varchar,
        `mpAuditRemark` varchar,
        `mpAuditRemarkName` varchar,
        `incStateName` varchar,
        `incErroCount` int,
        `mpIncStateName` varchar,
        `mpAuditStateName` varchar,
        `icPhone` varchar,
        `phone` varchar,
        `icName` varchar,
        `spSex` varchar,
        `icAge` BIGINT,
        `icID` varchar,
        `icCode` varchar,
        `specialflag` bigint,
        `specialContent` varchar,
        `specialType` varchar,
        `blUrl` varchar,
        `dtNamePath` varchar,
        `od_dept_id` int,
        `mp_id` int,
        `ue_state_time` varchar,
        PRIMARY KEY (ueId) NOT ENFORCED
      ) WITH (
          'connector' = 'jdbc',
          'url' = '%s',
          'table-name' = 'getstartuplist',
          'driver' = 'com.mysql.cj.jdbc.Driver',
          'username' = '%s',
          'password' = '%s'
      )
      """.format(URL, USERNAME, PASSWORD)
  t_env.executeSql(getstartuplist_sink_ddl)

  // Register the UDF
  t_env.createTemporarySystemFunction("TimeUdf", classOf[TimeUdf])

  // Execute the SQL
  t_env.executeSql(
    """
      |insert into getstartuplist_sink
      |select
      |     a.ue_pt_id as uePtId
      |     ,a.ue_id AS ueId
      |     ,a.ue_opt_id AS ueOptId
      |     ,s1.sp_name AS ueStateName
      |     ,s1.sp_remark AS sp_remark
      |     ,a.ue_state AS ueState
      |     ,TimeUdf(a.ue_signt_ime) as ueStateTime
      |     ,a.ue_sign_url AS ueSignUrl
      |     ,a.ue_relieve_sign_url AS ueRelieveSignUrl
      |     ,a.ue_relieved_nbr AS ueRelievedNbr
      |     ,a.ue_signup_id AS ueSignupId
      |     ,COALESCE(a.ue_remark, '') AS ueRemark
      |     ,b.pt_name AS ptName
      |     ,b.pt_incubator_flag as platIncubatorFlag
      |     ,b.pt_agreement as agreement
      |     ,b.pt_type_name AS ptTypeName
      |     ,TimeUdf(b.pt_end_time) AS ptEndTime
      |     ,b.pt_star as ptStar
      |     ,COALESCE(b.pt_root_nbr,'') as ptNbr
      |     ,b.pt_state as ptState
      |     ,b.pt_state_name as ptStateName
      |     ,b.pt_user_id as pt_user_id
      |     ,c.mp_name AS mpName
      |     ,c.mp_id as mpId
      |     ,d.se_time as mpTime
      |     ,COALESCE(d.se_inc_state, 0) AS mpIncState
      |     ,cast(COALESCE(d.se_incubator,0) as bigint) as star
      |     ,cast(COALESCE(d.se_incubator_flag, 0) as bigint) as userIncubatorFlag
      |     ,d.se_xbank_no as mpBankNo
      |     ,COALESCE(d.se_bank_no,'') as bankNo
      |     ,TimeUdf(d.se_state_time) AS mpStateTime
      |     ,(case when COALESCE(d.se_inc_state, 0) =1 then TimeUdf(d.se_inc_state_time) else '' END) AS mpIncStateTime
      |     ,d.se_audit_state AS mpAuditState
      |     ,COALESCE(d.se_remark,'') as mpRemark
      |     ,TimeUdf(d.se_audit_state_time) AS mpAuditStateTime
      |     ,COALESCE(d.se_audit_remark, '') AS mpAuditRemark
      |     ,d.se_audit_remark_name as mpAuditRemarkName
      |     ,d.se_incubator_type as incStateName
      |     ,d.se_inc_errcount as incErroCount
      |     ,d.se_incstate_name AS mpIncStateName
      |     ,d.se_auditstate_name AS mpAuditStateName
      |     ,g.hm_xphone AS icPhone
      |     ,g.hm_phone AS phone
      |     ,g.hm_name AS icName
      |     ,g.hm_sex AS spSex
      |     ,(case when CHAR_LENGTH(g.hm_idcard)>14 then (YEAR(CURRENT_DATE) - cast(SUBSTR(g.hm_idcard, 7, 4) as int)) else 0 end) AS icAge
      |     ,g.hm_conceal AS icID
      |     ,g.hm_idcard as icCode
      |     ,cast(COALESCE(g.hm_specialflag, 0) as bigint) as specialflag
      |     ,g.hm_special_content as specialContent
      |     ,g.hm_special_type as specialType
      |     ,d.se_bl_url as blUrl
      |     ,COALESCE(f.dt_name_path, '') AS dtNamePath
      |     ,COALESCE(f.dt_id, 0) as od_dept_id
      |     ,d.se_id as mp_id
      |     ,a.ue_state_time
      |from bs_user_enroll_source a
      |left join dim_pt_detail b on a.ue_pt_id = b.pt_id
      |left join dim_mp_user c on b.pt_root_user_id = c.mp_id
      |left join dim_se_detail d on d.se_id = a.ue_user_id
      |left join dim_human_detail g on g.hm_id = a.ue_opt_id
      |left join dim_mp_operator_dept e on a.ue_opt_id = e.od_opt_id and b.pt_root_user_id = e.od_user_id
      |left join dim_mp_dept f on e.od_dept_id = f.dt_id
      |left join sys_param_dim s1 ON s1.sp_father_id = '41' AND  a.ue_state = cast(s1.sp_value as int)
    """.stripMargin
  )

Screenshots of the job in the web UI are below:
image
image

The query joins more than three tables.


zqyltx2008 avatar zqyltx2008 commented on August 16, 2024

Could you share the SQL query (including DDL) and the job graph in web UI?

With the same DDL and SQL, sinking to Elasticsearch through the ES connector works: the data is written correctly.
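
For readers who want to try the same workaround, here is a minimal sketch of what an Elasticsearch sink DDL could look like. It is not the DDL actually used in this job: the table name `getstartuplist_es_sink`, the helper `esSinkDdl`, and the `hosts` and `index` values are hypothetical, the connector is assumed to be `elasticsearch-7`, and the column list is abbreviated (a real sink would need the same columns as `getstartuplist_sink`).

```scala
// Sketch only: builds the DDL string for an Elasticsearch sink.
// `hosts` and `index` are hypothetical parameters; the column list is
// abbreviated and would have to mirror getstartuplist_sink in a real job.
def esSinkDdl(hosts: String, index: String): String =
  s"""
     |CREATE TABLE getstartuplist_es_sink (
     |  `ueId` int,
     |  `uePtId` int,
     |  `ueStateName` varchar,
     |  PRIMARY KEY (ueId) NOT ENFORCED
     |) WITH (
     |  'connector' = 'elasticsearch-7',
     |  'hosts' = '$hosts',
     |  'index' = '$index'
     |)
     |""".stripMargin

// Usage, with the thread's table environment:
// t_env.executeSql(esSinkDdl("http://localhost:9200", "getstartuplist"))
```

The primary key is kept so that the sink can apply the changelog from the CDC sources as upserts keyed on `ueId`.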


wuchong avatar wuchong commented on August 16, 2024

Looks like this is a bug in the JDBC connector sink.


zqyltx2008 avatar zqyltx2008 commented on August 16, 2024

Looks like this is a bug in the JDBC connector sink.

OK, thanks. I'll write to ES for now.


jjiey avatar jjiey commented on August 16, 2024

See FLINK-19423. It has been fixed; please use flink-connector-jdbc.jar version 1.11.3.
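
If the job is built with sbt (an assumption; the thread does not say which build tool is used), pinning the connector to the fixed release might look like the fragment below. Maven users would declare the matching `flink-connector-jdbc_2.11` or `flink-connector-jdbc_2.12` artifact instead.

```scala
// build.sbt fragment (sketch): pin the JDBC connector to the release that
// contains the fix. `%%` appends the project's Scala binary version
// (2.11 or 2.12) to the artifact name.
libraryDependencies += "org.apache.flink" %% "flink-connector-jdbc" % "1.11.3"
```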

