Code Monkey home page Code Monkey logo

Comments (25)

wuchong avatar wuchong commented on June 28, 2024 2
  1. 是的。目前确实有这个问题。也是将来的一个改进方向。
  2. 支持的。具体可以看 debezium 的文档。

from flink-cdc.

jindyliu avatar jindyliu commented on June 28, 2024
  1. 是的。目前确实有这个问题。也是将来的一个改进方向。
  2. 支持的。具体可以看 debezium 的文档。

1、、这几天看了下debezium,这个snapshot reader + binlog的切换确实,相比其它cdc工具的一个亮点。目前公司内部的一些cdc 系统(自研)都是增量数据往kafka发送的模式,全量数据基本都是离线导入,kafka offsize按全量导入的时间点往前点开始消费,主要大多数的job场景都只需at_least_once的要求。

2、关于1中的效率问题除了i/o线程不能复用以外,还有体现在dump的效率上;现在debezium mysql connector无法感知后续的任务是什么,debezium都会把表的全量数据导入(写死了为select * from table),对于10亿级的表,有些任务其它加了些字段过滤条件后,大部分可能只有几百w条左右需要全量输入,效率一下可以提升满多的。后续的优化思路上,jark会在这方面做考虑么?

另外,看过你的一些分享,你概括的几种场景的flink cdc接入方式:你开源的mysql-cdc的方式确实对于没有历史包袱的公司很方便(批流统一起来,还能Exactly-once);但估计有些公司都已经是用了cdc(平台)+kafka的架构,并运行了较久且有专门人员维护,主要是中间层为做业务上的解藕。像这种cdc平台+kafka的场景,如果要用flink的对接做一些任务,除了去做一个cdc contector(类flink的canal-json 、debezium-json)做格式的适配外,全量数据的导入是个问题;就拿一个实时宽表来看,但历史数据怎么先流入flink并和后续的kafka的结合起来,任务目前语义上可以at_least_once(大不了把kafka offsize往前移多一点,比如半小时)。

关于cdc的历史数据与增量数据的按顺序拼接在一起,不知道jark可以分享flink一些做法和思路吗?
我目前没找到flink比较好的做法,尝试过往两个思路想:
a、历史数据做为一个自定义的stream, 历史数据支持条件过滤(非全表),并将历史数据转成cdc的格式流(都是insert);增量cdc数据做为别一个stream,但这里flink里没有很好的算子(不知道是不是我没找到),去协调两个stream的消费顺序,需先装历史数据先消费完再消费kafka数据……(这里先不考虑语义的Exactly-once,先只考虑业务的at_least_once)
b、像这个开源的mysql cdc connector,自定义一个source function,将jdbc的历史数据与kafka的stream数据做合并,但初看了下kafka的源代码,感觉没完全看懂之前,容易搞出bug,并且后续维护成本较高...
想问下,jark有没有一些可实践的思路或已经实践的路路推荐 ^_^

from flink-cdc.

wuchong avatar wuchong commented on June 28, 2024
  1. filter&projection pushdown 是后续的优化方向
  2. 全量数据+增量数据拼接的问题
  • 如果用的 debezium+kafka,这个问题应该也还好,应为 debezium 支持全量+增量同步到 kafka。 kafka topic 可以开启 compaction 机制,所以 kafka topic 中存了全量+增量的数据,但中间的历史过程会清理,所以存储上一般不是问题。
  • 如果上面存储还是存不下,flink 社区有些公司在尝试多层级存储的机制,即历史数据存在 s3,临近的数据存 kafka。 flink source 读这种数据时,先读 s3, 再读 kafka (s3中有存对应 kafka 中的位点,所以也能exactly once)。我们把这种叫做 hybrid source。
  • 另外一种思路是利用支持多层级存储的 message queue, 比如pulsar自动支持老数据放到 hdfs 上。 kafka 也有个 类似的 KIP 在做这样的事情。

from flink-cdc.

jindyliu avatar jindyliu commented on June 28, 2024
  1. filter&projection pushdown 是后续的优化方向
  2. 全量数据+增量数据拼接的问题
  • 如果用的 debezium+kafka,这个问题应该也还好,应为 debezium 支持全量+增量同步到 kafka。 kafka topic 可以开启 compaction 机制,所以 kafka topic 中存了全量+增量的数据,但中间的历史过程会清理,所以存储上一般不是问题。
  • 如果上面存储还是存不下,flink 社区有些公司在尝试多层级存储的机制,即历史数据存在 s3,临近的数据存 kafka。 flink source 读这种数据时,先读 s3, 再读 kafka (s3中有存对应 kafka 中的位点,所以也能exactly once)。我们把这种叫做 hybrid source。
  • 另外一种思路是利用支持多层级存储的 message queue, 比如pulsar自动支持老数据放到 hdfs 上。 kafka 也有个 类似的 KIP 在做这样的事情。

1、这个公司的cdc目前有历史包袱在,当时选型并没有用debezium+kafka ;pulsar 的Segment 机制在历史数据存储上比kafka确实好很多,只不过目前公司mq还没有引入pulsar o(╥﹏╥)o;

2、感觉jark你说的hybrid source这个更接近我们的场景,能切换读取数据,感觉可以参考下是怎么实现的,请问,flink 的hybrid source社区已经开有开源项目出来了吗?
目前我在看到flink下,看能不能从一些flink的算子的角度上,解决这个流的数据顺序与切换的能力,比如历史流与kafka的增量流做Connected 成ConnectedStreams,再做RichCoFlatMapFunction,RichCoFlatMapFunction里先缓存kafka的binlog,等全量流接收完往下流发完(全量流里,jdbc读完数据后设置一个结束flag),再发kafka流,从而模似一个debezium 支持全量+增量同步到 kafka的过程。
感觉貌似可以,但也有些问题,比如全量任务时长与缓存大小是个问题,这里感觉也只能先保证at_least_once,不过语义业务侧的任务目前可以接受。比起你说的hybrid source方案,我感觉应该是low很多,不过先抛出来(^.^)。还有个算子层面有个接口InputSelectable好像是可以选择一条记录,但实际还没用过,不知道这个InputSelectable能不能也能实现像你说的hybrid source的类似的能力。

from flink-cdc.

wuchong avatar wuchong commented on June 28, 2024

hybrid source 目前社区还没有官方的出来,也还在孵化中,以前我们做过一个基于 FLIP-7 的 POC,不过也只做了一半:https://github.com/wuchong/flink-hackathon

这个全量流+增量流的连接,在DataStream 上是可以做的,不过在 SQL 上比较麻烦(没有这种语义的算子)。

from flink-cdc.

jindyliu avatar jindyliu commented on June 28, 2024

hybrid source 目前社区还没有官方的出来,也还在孵化中,以前我们做过一个基于 FLIP-7 的 POC,不过也只做了一半:https://github.com/wuchong/flink-hackathon

这个全量流+增量流的连接,在DataStream 上是可以做的,不过在 SQL 上比较麻烦(没有这种语义的算子)。

1、感谢,flink-hackathon我这边也看看,要消化下,看看思路。是拉hybird-sql-source分支还是master?

2、我现在能想到的flink + kakfa的架构下的思路,确实只想到先解决DataStream的(历史与增量数据)结合问题,然后在用DataStream看能不能转成table,再在table级别下,去看看能不能直接加载sql语句去处理业务逻辑,没做过从底层,到table,再到sql整个路径上的,不知道可不可行?看看能否解决大部分的sql语句场景。jark这方面有社区的公司尝试过吗?

from flink-cdc.

wuchong avatar wuchong commented on June 28, 2024
  1. master 分支
  2. 思路上是可行的。目前业界这方面尝试还比较少。

from flink-cdc.

SEZ9 avatar SEZ9 commented on June 28, 2024

想问,这个问题目前有解决思路了吗。 目前在实践mysql -flink cdc 增量数据同步过程中,因为一个库要同步40多张表,每个表一个cdc链接,发现都是独立的线程去执行binlog dump。这样导致binlog被重复读取,导致mysql生产压力大。目前有个两个疑惑:
1、单张表的cdc订阅是否也是从头拉取全库的binlog文件进行解析;
2、如果cdc统一只开一个任务订阅binlog ,可以不指定表结构,只把数据全部同步到kafka,再后端再去做解析,数据上能区分库和表?是否可行
image

from flink-cdc.

wuchong avatar wuchong commented on June 28, 2024

@SEZ9 如果有从库的话,可以连接从库。

  1. 默认都是从最新 binlog 位置读取的。
  2. 可行。你可以自己部署 debezium 把数据写到 kafka,然后用 flink 的kafka connector + debezium-json format 来解析。

from flink-cdc.

snowwolf007cn avatar snowwolf007cn commented on June 28, 2024
  1. filter&projection pushdown 是后续的优化方向
    这个部分目前进展怎么样了?我用的1.4.0,似乎仍然没有实现pushdown?

from flink-cdc.

leonardBang avatar leonardBang commented on June 28, 2024

filter & project pushdown 只是后续的优化方向,还未开始做...

from flink-cdc.

gitdongjian avatar gitdongjian commented on June 28, 2024

目前cdc支持gtid点位消费吗?

from flink-cdc.

wuchong avatar wuchong commented on June 28, 2024

@gitdongjian 支持。

from flink-cdc.

gitdongjian avatar gitdongjian commented on June 28, 2024

from flink-cdc.

wuchong avatar wuchong commented on June 28, 2024

@gitdongjian , 目前还不支持自动切换,需要手动切换。比如原先的 hostname = A, 然后 A 挂了,B成为了新主,这时候 flink job 应该会挂。这时候需要 hostname 改成 B,然后将之前的作业从之前成功的 savepoint 恢复。

from flink-cdc.

gitdongjian avatar gitdongjian commented on June 28, 2024

from flink-cdc.

wuchong avatar wuchong commented on June 28, 2024

@gitdongjian 有GTID 的话,就会用 GTID 来恢复。看你上面的异常信息是有 GTID的,可能是新主上面binlog被清理了,没有这个 GTID 了。可以在你的新主上执行下 show global variables like '%gtid%'看看 3fa7d5bb-65f3-11eb-9413-b0262879b560:1-730774004 在不在你的 gtid_purged 里面。

from flink-cdc.

SEZ9 avatar SEZ9 commented on June 28, 2024

我这边也遇到了GTID 的问题,启动cdc任务,立刻报了个错,查阅了debezium文档 还没找到怎么指定gtid消费。。。有大佬可以指明一下吗。。。。
org.apache.kafka.connect.errors.ConnectException: The replication sender thread cannot start in AUTO_POSITION mode: this server has GTID_MODE = ON_PERMISSIVE instead of ON. Error code: 1236; SQLSTATE: HY000.
at io.debezium.connector.mysql.AbstractReader.wrap(AbstractReader.java:230)
at io.debezium.connector.mysql.AbstractReader.failed(AbstractReader.java:196)
at io.debezium.connector.mysql.BinlogReader$ReaderThreadLifecycleListener.onCommunicationFailure(BinlogReader.java:1125)
at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:985)
at com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:581)
at com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:860)
at java.lang.Thread.run(Thread.java:748)
Caused by: com.github.shyiko.mysql.binlog.network.ServerException: The replication sender thread cannot start in AUTO_POSITION mode: this server has GTID_MODE = ON_PERMISSIVE instead of ON.
at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:949)
... 3 more

from flink-cdc.

wuchong avatar wuchong commented on June 28, 2024

@SEZ9 这个错误的意思是你的 server 没有开启 GTID。 GTID_MODE 需要设置为 ON

from flink-cdc.

SEZ9 avatar SEZ9 commented on June 28, 2024

@SEZ9 这个错误的意思是你的 server 没有开启 GTID。 GTID_MODE 需要设置为 ON

疑惑~~~ 我们测试环境一直都是OFF,今天产线发现是配置的ON_PERMISSIVE,然后就这样了。。

from flink-cdc.

tcmtang avatar tcmtang commented on June 28, 2024

留言关注此问题

from flink-cdc.

noregretsforever avatar noregretsforever commented on June 28, 2024

问题1:binlog复制问题,这个 conentor (mysql-cdc)在定义源表时,定义几张表,就会建立几个链接去读binlog;感觉有点浪费,作业所需表多了几后,mysql应该i/o压力会比较大?

比如我定义两张表,连的都是同一个mysql server // 输入表test CREATE TABLE test ( idINT,nameVARCHAR(255),timeTIMESTAMP(3),status` INT, PRIMARY KEY(id) NOT ENFORCED ) WITH ( 'connector' = 'mysql-cdc', 'hostname' = 'localhost', 'port' = '3306', 'username' = 'root', 'password' = '1', 'database-name' = 'ai_ask', 'table-name' = 'test' );

// 输入表status CREATE TABLE status ( id INT, name VARCHAR(255), PRIMARY KEY(id) NOT ENFORCED ) WITH ( 'connector' = 'mysql-cdc', 'hostname' = 'localhost', 'port' = '3306', 'username' = 'root', 'password' = '1', 'database-name' = 'ai_task', 'table-name' = 'status' );

在mysql server侧会产生两个线程连接去对binlog进行i/o,感觉这样扩展性不高?比如在同一个mysql实例里,是不是只需要建一个i/o线程就够了?或者有使用上的建议可以规避这个可能的i/o问题吗,debezium有现成的参数可以合并吗?

2、是否支持mysql server的主从切换? 这个cdc connector 能支持mysql实例主从切换后,通过gtid进行binlog位点转换么?

@wuchong 求教~
这个问题现在是不是还没有好的解决方案,生产上还可能有整库同步的需要

from flink-cdc.

Blank-creator397 avatar Blank-creator397 commented on June 28, 2024

我也遇到了相同的问题,请问现在有解决吗0.0

from flink-cdc.

javaDer avatar javaDer commented on June 28, 2024

持续关注,求官方回复

from flink-cdc.

PatrickRen avatar PatrickRen commented on June 28, 2024

Closing this issue because it was created before version 2.3.0 (2022-11-10). Please try the latest version of Flink CDC to see if the issue has been resolved. If the issue is still valid, kindly report it on Apache Jira under project Flink with component tag Flink CDC. Thank you!

from flink-cdc.

Related Issues (20)

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.