
doris-flink-connector's Introduction

Flink Connector for Apache Doris


Flink Doris Connector

Flink Doris Connector now supports Flink versions 1.11 through 1.19.

If you wish to contribute to or use the connector for Flink 1.13 (or earlier), please use the branch-for-flink-before-1.13 branch.

For more information about compilation and usage, please visit the Flink Doris Connector documentation.

License

Apache License, Version 2.0

How to Build

Before building, you need to copy custom_env.sh.tpl to custom_env.sh and configure it.

git clone [email protected]:apache/doris-flink-connector.git
cd doris-flink-connector/flink-doris-connector
./build.sh


Code Style

Code Formatting

You need to install the google-java-format plugin. Spotless, together with google-java-format, is used to format the code.

It is recommended to automatically format your code by applying the following settings:

  1. Go to "Settings" → "Other Settings" → "google-java-format Settings".
  2. Tick the checkbox to enable the plugin.
  3. Change the code style to "Android Open Source Project (AOSP) style".
  4. Go to "Settings" → "Tools" → "Actions on Save".
  5. Under "Formatting Actions", select "Optimize imports" and "Reformat file".
  6. From the "All file types list" next to "Reformat code", select "Java".

For earlier IntelliJ IDEA versions, steps 4 to 6 are replaced by the following.

  • 4. Go to "Settings" → "Other Settings" → "Save Actions".
  • 5. Under "General", enable your preferred settings for when to format the code, e.g. "Activate save actions on save".
  • 6. Under "Formatting Actions", select "Optimize imports" and "Reformat file".
  • 7. Under "File Path Inclusions", add an entry for .*\.java to avoid formatting other file types.

The whole project can then be formatted with the command mvn spotless:apply.
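As a quick sanity check that the formatter is active: AOSP style uses 4-space indentation (rather than Google style's 2 spaces), with braces on the same line. A correctly formatted class looks roughly like this (the class itself is just an illustrative example):

```java
// A small example formatted in AOSP style: 4-space indentation,
// opening braces on the same line, one statement per line.
public class FormatExample {
    private final String name;

    public FormatExample(String name) {
        this.name = name;
    }

    public String greet() {
        return "Hello, " + name + "!";
    }
}
```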

Checkstyle

Checkstyle is used to enforce static coding guidelines.

  1. Go to "Settings" → "Tools" → "Checkstyle".
  2. Set "Scan Scope" to "Only Java sources (including tests)".
  3. For "Checkstyle Version" select "8.14".
  4. Under "Configuration File" click the "+" icon to add a new configuration.
  5. Set "Description" to "doris-flink-connector".
  6. Select "Use a local Checkstyle file" and link it to the file tools/maven/checkstyle.xml which is located within your cloned repository.
  7. Select "Store relative to project location" and click "Next".
  8. Configure the property checkstyle.suppressions.file with the value suppressions.xml and click "Next".
  9. Click "Finish".
  10. Select "doris-flink-connector" as the only active configuration file and click "Apply".

You can now import the Checkstyle configuration for the Java code formatter.

  1. Go to "Settings" → "Editor" → "Code Style" → "Java".
  2. Click the gear icon next to "Scheme" and select "Import Scheme" → "Checkstyle Configuration".
  3. Navigate to and select tools/maven/checkstyle.xml located within your cloned repository.

Then you can click "View" → "Tool Windows" → "Checkstyle" and use the "Check Module" button in the opened tool window to run Checkstyle. Alternatively, run the command mvn clean compile checkstyle:checkstyle.

Report issues or submit a pull request

If you find any bugs, feel free to file a GitHub issue or fix it by submitting a pull request.

Contact Us

Contact us through the following mailing list.

Name: [email protected]
Scope: Development-related discussions (Subscribe / Unsubscribe / Archives)

doris-flink-connector's People

Contributors

bingquanzhao, caoliang-web, codeantg, cygnusdark, dinggege1024, dongliang-0, gnehil, gogowen, hf200012, huyuanfeng2018, ikazuchi-akira, jnsimba, legendtkl, liuyaolin, lsy3993, madongz, morningman, morningman-cmy, murong00, myasuka, qg-lin, smallhibiscus, thehuldra, vinlee19, wolfboys, wunan1210, wuwenchi, xhmz, yangzhg, zy-kkk


doris-flink-connector's Issues

[Enhancement] Build script improvements

Search before asking

  • I had searched in the issues and found no similar issues.

Description

Currently, the environment checks in the build.sh script (for thrift, java, maven, etc.) need to be optimized:

  1. thrift: verify that the installed version is 0.13.0.
  2. Add mvnw support (for environments without Maven installed).
  3. Improve the Java environment check.
  4. The release process requires manually replacing versions (see the doc); these steps can be improved.

Solution

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

[Bug] StreamLoadException: stream load error: failed to stream load data with label: flink_connector_20221017_170854_ef4f558e2bf44994a469c148bdd0c7b1

Search before asking

  • I had searched in the issues and found no similar issues.

Version

1.13.6

What's Wrong?

2022-10-17 17:09:08.886 WARN 19180 --- [Unnamed (1/1)#0] o.a.doris.flink.table.DorisStreamLoad : failed to stream load data with label: flink_connector_20221017_170905_802f8e174c884bd9805c8bc60c72ac50

org.apache.http.conn.HttpHostConnectException: Connect to 172.20.0.3:8040 [/172.20.0.3] failed: Connection refused: connect
at org.apache.http.impl.conn.DefaultHttpClientConnectionOperator.connect(DefaultHttpClientConnectionOperator.java:156)
at org.apache.http.impl.conn.PoolingHttpClientConnectionManager.connect(PoolingHttpClientConnectionManager.java:376)
at org.apache.http.impl.execchain.MainClientExec.establishRoute(MainClientExec.java:393)
at org.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:236)
at org.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:186)
at org.apache.http.impl.execchain.RetryExec.execute(RetryExec.java:89)
at org.apache.http.impl.execchain.RedirectExec.execute(RedirectExec.java:110)
at org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:185)
at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:83)
at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:108)
at org.apache.doris.flink.table.DorisStreamLoad.loadBatch(DorisStreamLoad.java:136)
at org.apache.doris.flink.table.DorisStreamLoad.load(DorisStreamLoad.java:99)
at org.apache.doris.flink.table.DorisDynamicOutputFormat.flush(DorisDynamicOutputFormat.java:309)
at org.apache.doris.flink.table.DorisDynamicOutputFormat.writeRecord(DorisDynamicOutputFormat.java:223)
at org.apache.doris.flink.cfg.GenericDorisSinkFunction.invoke(GenericDorisSinkFunction.java:51)
at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:54)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:38)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
at org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitRecord(SourceOperatorStreamTask.java:188)
at org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:110)
at org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:101)
at com.ververica.cdc.connectors.mysql.source.reader.MySqlRecordEmitter$OutputCollector.collect(MySqlRecordEmitter.java:143)
at com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema.deserialize(JsonDebeziumDeserializationSchema.java:74)
at com.ververica.cdc.connectors.mysql.source.reader.MySqlRecordEmitter.emitElement(MySqlRecordEmitter.java:118)
at com.ververica.cdc.connectors.mysql.source.reader.MySqlRecordEmitter.emitRecord(MySqlRecordEmitter.java:100)
at com.ververica.cdc.connectors.mysql.source.reader.MySqlRecordEmitter.emitRecord(MySqlRecordEmitter.java:54)
at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:128)
at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:305)
at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:69)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:423)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:684)
at org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:639)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:650)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:623)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.net.ConnectException: Connection refused: connect
at java.net.DualStackPlainSocketImpl.connect0(Native Method)
at java.net.DualStackPlainSocketImpl.socketConnect(DualStackPlainSocketImpl.java:79)
at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
at java.net.PlainSocketImpl.connect(PlainSocketImpl.java:172)
at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
at java.net.Socket.connect(Socket.java:589)
at org.apache.http.conn.socket.PlainConnectionSocketFactory.connectSocket(PlainConnectionSocketFactory.java:75)
at org.apache.http.impl.conn.DefaultHttpClientConnectionOperator.connect(DefaultHttpClientConnectionOperator.java:142)
... 45 common frames omitted

2022-10-17 17:09:08.888 INFO 19180 --- [Unnamed (1/1)#0] o.a.doris.flink.table.DorisStreamLoad : Streamload Response:{"status":-1,"respMsg":"Connect to 172.20.0.3:8040 [/172.20.0.3] failed: Connection refused: connect","respContent":"failed to stream load data with label: flink_connector_20221017_170905_802f8e174c884bd9805c8bc60c72ac50"}
2022-10-17 17:09:08.889 ERROR 19180 --- [Unnamed (1/1)#0] o.a.d.f.table.DorisDynamicOutputFormat : doris sink error, retry times = 2

org.apache.doris.flink.exception.StreamLoadException: stream load error: failed to stream load data with label: flink_connector_20221017_170905_802f8e174c884bd9805c8bc60c72ac50
at org.apache.doris.flink.table.DorisStreamLoad.load(DorisStreamLoad.java:102)
at org.apache.doris.flink.table.DorisDynamicOutputFormat.flush(DorisDynamicOutputFormat.java:309)
at org.apache.doris.flink.table.DorisDynamicOutputFormat.writeRecord(DorisDynamicOutputFormat.java:223)
at org.apache.doris.flink.cfg.GenericDorisSinkFunction.invoke(GenericDorisSinkFunction.java:51)
	... (remaining frames identical to the first stack trace above)

2022-10-17 17:09:08.890 INFO 19180 --- [Unnamed (1/1)#0] o.a.d.f.table.DorisDynamicOutputFormat : Send request to Doris FE 'http://192.168.77.210:8030/api/backends?is_alive=true' with user 'root'.
2022-10-17 17:09:08.928 INFO 19180 --- [Unnamed (1/1)#0] o.a.d.f.table.DorisDynamicOutputFormat : Backend Info:{"backends":[{"ip":"172.20.0.3","http_port":8040,"is_alive":true}]}
2022-10-17 17:09:08.929 WARN 19180 --- [Unnamed (1/1)#0] o.a.d.f.table.DorisDynamicOutputFormat : streamload error,switch be: http://172.20.0.3:8040/api/ritrac/server_monitor_info/_stream_load?

org.apache.doris.flink.exception.StreamLoadException: stream load error: failed to stream load data with label: flink_connector_20221017_170905_802f8e174c884bd9805c8bc60c72ac50
at org.apache.doris.flink.table.DorisStreamLoad.load(DorisStreamLoad.java:102)
at org.apache.doris.flink.table.DorisDynamicOutputFormat.flush(DorisDynamicOutputFormat.java:309)
at org.apache.doris.flink.table.DorisDynamicOutputFormat.writeRecord(DorisDynamicOutputFormat.java:223)
at org.apache.doris.flink.cfg.GenericDorisSinkFunction.invoke(GenericDorisSinkFunction.java:51)
	... (remaining frames identical to the first stack trace above)

What You Expected?

The above error occurs when using Flink CDC to import data into a Doris instance deployed in Docker.

How to Reproduce?

No response

Anything Else?

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

[Bug] Job hangs for a long time because no timeout is set during Flink SQL validation

Search before asking

  • I had searched in the issues and found no similar issues.

Version

flink 1.15

What's Wrong?

When we use this connector to write data into Doris, the job hangs for a long time because no timeout is set.

What You Expected?

The job should fail with a timeout error instead of hanging.
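As a hedged illustration of the requested fix (not the connector's actual code), an HTTP call in Java can be given both a connect timeout and a per-request timeout so a dead endpoint fails fast instead of blocking the job. The class name and endpoint below are hypothetical:

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.time.Duration;

public class TimeoutSketch {
    // Build a request with an explicit per-request timeout, so a stalled
    // response aborts with HttpTimeoutException instead of hanging.
    public static HttpRequest buildRequest(String url, Duration timeout) {
        return HttpRequest.newBuilder(URI.create(url))
                .timeout(timeout) // per-request timeout
                .GET()
                .build();
    }

    // Build a client with a connect timeout, so an unreachable host
    // fails fast during connection establishment.
    public static HttpClient buildClient(Duration connectTimeout) {
        return HttpClient.newBuilder()
                .connectTimeout(connectTimeout)
                .build();
    }
}
```

Both timeouts are needed: the connect timeout covers unreachable hosts, while the request timeout covers connections that are accepted but never answered.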

How to Reproduce?

No response

Anything Else?

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

[Feature] Write data to two Doris clusters at the same time

Search before asking

  • I had searched in the issues and found no similar issues.

Description

How can the code be changed so that this connector writes data to two Doris clusters at the same time (to achieve master-slave cluster data synchronization)? Can you provide some ideas?
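One possible idea, sketched here with plain Java consumers rather than the connector's real classes (DualSinkWriter is a hypothetical name): wrap the two cluster sinks behind a single writer that forwards every record to both.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// A minimal fan-out writer: every record is forwarded to all registered
// sinks, e.g. one per Doris cluster. Failure handling (what to do when
// one cluster is down) is the hard part and is intentionally left out.
public class DualSinkWriter<T> implements Consumer<T> {
    private final List<Consumer<T>> sinks = new ArrayList<>();

    public DualSinkWriter<T> addSink(Consumer<T> sink) {
        sinks.add(sink);
        return this;
    }

    @Override
    public void accept(T record) {
        for (Consumer<T> sink : sinks) {
            sink.accept(record); // synchronous forward to each cluster
        }
    }
}
```

In a real Flink job the simpler route is usually to attach two sink operators to the same stream; the sketch above only shows the fan-out idea itself.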

Use case

to achieve master-slave cluster data synchronization

Related issues

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

[Improvement] Bump Flink CDC MySQL Connector version to 2.4.0

Search before asking

  • I had searched in the issues and found no similar issues.

Description

Flink CDC 2.4 was released on Jul 25 and provides many new features, for example, automatically closing idle readers at the end of the snapshot phase. I have already upgraded part of our online streaming ETL jobs to the Flink CDC 2.4 and Flink Doris Connector 1.4 framework, and they look good so far.
In the process I solved some problems, and I want to contribute my code to the community.

Solution

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

[Bug] Error: Unsupported OS type: msys when building on Windows

Search before asking

  • I had searched in the issues and found no similar issues.

Version

<groupId>org.apache.doris</groupId>
<artifactId>flink-doris-connector-${flink.minor.version}</artifactId>
<version>1.0.0-SNAPSHOT</version>

What's Wrong?

I have copied custom_env.sh.tpl to custom_env.sh. When I used "sh build.sh --flink 1.12.0 --scala 2.11" to build the project, I got the error "Error: Unsupported OS type: msys". My OS is Windows 10, and I used Git Bash to execute the command.

What You Expected?

Can someone help me?

How to Reproduce?

No response

Anything Else?

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

[Bug] (concurrency) Thread safety problem of loading data to Doris

Search before asking

  • I had searched in the issues and found no similar issues.

Version

1.1.1

What's Wrong?

When I used flink-doris-connector_1.14_2.12 (1.1.1) to load data into Doris, checkpoints failed due to a java.lang.InterruptedException thrown by the exception checker, even though the Stream Load jobs in Doris were normal.

In the source code, the exception checker invokes DorisStreamLoad.handlePreCommitResponse() while the variable loading is true. In DorisWriter.prepareCommit(), loading is set to false to disable the exception checker, and then DorisStreamLoad.handlePreCommitResponse() is invoked. However, the exception checker thread may invoke handlePreCommitResponse() before the main thread sets loading to false, so the exception checker thread and the main thread can invoke handlePreCommitResponse() at the same time, which may cause concurrent access problems.

What You Expected?

The exception checker thread and the main thread should invoke handlePreCommitResponse() with proper synchronization.
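The race described above can be avoided by guarding the response handling with a single lock. A self-contained sketch follows; PreCommitGuard, loading, and handlePreCommitResponse are illustrative stand-ins for the connector's actual fields and methods:

```java
public class PreCommitGuard {
    private final Object lock = new Object();
    private boolean loading = true;
    private int handled = 0;

    // Called by both the exception-checker thread and the main thread;
    // the lock guarantees the pre-commit response is handled at most once:
    // whichever thread enters first flips `loading` and wins, the other
    // observes loading == false and backs off.
    public boolean handlePreCommitResponse() {
        synchronized (lock) {
            if (!loading) {
                return false; // already handled by the other thread
            }
            loading = false;
            handled++;
            return true;
        }
    }

    public int handledCount() {
        return handled;
    }
}
```

The key point is that the check of `loading` and the state change happen atomically under the same lock, so the checker thread and the main thread can no longer interleave between them.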

How to Reproduce?

No response

Anything Else?

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

[Bug] CDCTools job cannot restart from previous checkpoint/savepoint when multiple tables in sync

Search before asking

  • I had searched in the issues and found no similar issues.

Version

1.4.0

What's Wrong?

CDCTools job cannot restart from previous checkpoint/savepoint when multiple tables in sync.

What You Expected?

The job should be able to use previous checkpoint/savepoint to restart.

How to Reproduce?

To reproduce, sync multiple tables, for example:
--including-tables test|test2|test_clone|test_clone2

The reason is that Flink is a stateful application, and the official recommendation is that each stateful operator has a unique uid.
I fixed this issue by using the table name as the uid of its associated sink operators.
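The fix can be sketched as deriving a stable, unique uid from each table name, so state in a checkpoint can be matched back to the right sink operator on restart. SinkUids and the uid format below are a simplified illustration, not the actual CDCTools code:

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class SinkUids {
    // Derive a deterministic uid per table; the same table always maps to
    // the same uid, which is what lets a restarted job find its state.
    public static String sinkUid(String tableName) {
        return "doris-sink-" + tableName;
    }

    // All uids must be unique across the synced tables, otherwise two
    // sink operators would collide when restoring state.
    public static boolean allUnique(List<String> tables) {
        Set<String> uids = new HashSet<>();
        for (String t : tables) {
            if (!uids.add(sinkUid(t))) {
                return false;
            }
        }
        return true;
    }
}
```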

Anything Else?

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

[Bug] Multiple doris sink writer error when disable 2pc

Search before asking

  • I had searched in the issues and found no similar issues.

Version

1.1.0

What's Wrong?

One source stream writes to two Doris sinks in a single Flink application. Both sinks have enable-2pc set to false but share the same label prefix. The error stack was attached as a screenshot.

What You Expected?

The application should not fail.

How to Reproduce?

No response

Anything Else?

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

[Feature] Can one job work for multiple tables?

Search before asking

  • I had searched in the issues and found no similar issues.

Description

It's wasteful for my system to give every table its own slot, because my tables are not that big and do not change that fast.

My temporary solution is creating a custom StreamLoad and DorisWriter, and it works for all listening tables.

Use case

One job can work for multiple tables.

Related issues

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

[Bug] QueryPlan deserialization error if there are exceptions

Search before asking

  • I had searched in the issues and found no similar issues.

Version

1.1.0

What's Wrong?

Deserialization error on QueryPlan with exceptions.


What You Expected?

Fix the deserialization error and correct the error stack.

How to Reproduce?

No response

Anything Else?

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

Suggestion: integrate the Postgres database

Search before asking

  • I had searched in the issues and found no similar issues.

Description

Postgres is a widely used relational database, and Flink CDC already supports Postgres connections. Doris could add matching support to make the ecosystem more complete.

Use case

No response

Related issues

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

[Bug] Infinite 2pc retry

Search before asking

  • I had searched in the issues and found no similar issues.

Version

1.15
sink.max-retries = 1

What's Wrong?

The connector retries the 2pc commit infinitely: the commit fails with "transaction not found" and the task keeps restarting and retrying forever, even though sink.max-retries is set to 1.
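A hedged sketch of the expected behavior, assuming a hypothetical commit action: cap the number of attempts at sink.max-retries and rethrow the last failure, instead of looping forever on a transaction that no longer exists.

```java
import java.util.function.Supplier;

public class BoundedRetry {
    // Run the commit action up to maxRetries + 1 times; if every attempt
    // fails, rethrow the last failure so the job fails visibly instead of
    // retrying forever.
    public static <T> T commitWithRetry(Supplier<T> action, int maxRetries) {
        RuntimeException last = null;
        for (int attempt = 0; attempt <= maxRetries; attempt++) {
            try {
                return action.get();
            } catch (RuntimeException e) {
                last = e; // remember the failure and retry, up to the cap
            }
        }
        throw last;
    }
}
```

A real fix would likely also distinguish retriable errors (network hiccups) from permanent ones like "transaction not found", which should fail immediately.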


2022-11-18 11:53:48,008 INFO  org.apache.flink.runtime.taskmanager.Task                    [] - Freeing task resources for Source: purchase[1] -> DropUpdateBefore[2] -> ConstraintEnforcer[3] -> doris_purchase[3]: Writer -> doris_purchase[3]: Committer (1/1)#65212 (019990f47bcb3edc3ef9a00232143186).
2022-11-18 11:53:48,008 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Un-registering task and sending final execution state FAILED to JobManager for task Source: purchase[1] -> DropUpdateBefore[2] -> ConstraintEnforcer[3] -> doris_purchase[3]: Writer -> doris_purchase[3]: Committer (1/1)#65212 019990f47bcb3edc3ef9a00232143186.
2022-11-18 11:53:49,013 INFO  org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Activate slot e32293a602f95c9f67ad0f8e2c653365.
2022-11-18 11:53:49,013 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Received task Source: purchase[1] -> DropUpdateBefore[2] -> ConstraintEnforcer[3] -> doris_purchase[3]: Writer -> doris_purchase[3]: Committer (1/1)#65213 (0eedf3500794baa86ab2fdfcbefeb1c2), deploy into slot with allocation id e32293a602f95c9f67ad0f8e2c653365.
2022-11-18 11:53:49,013 INFO  org.apache.flink.runtime.taskmanager.Task                    [] - Source: purchase[1] -> DropUpdateBefore[2] -> ConstraintEnforcer[3] -> doris_purchase[3]: Writer -> doris_purchase[3]: Committer (1/1)#65213 (0eedf3500794baa86ab2fdfcbefeb1c2) switched from CREATED to DEPLOYING.
2022-11-18 11:53:49,014 INFO  org.apache.flink.runtime.taskmanager.Task                    [] - Loading JAR files for task Source: purchase[1] -> DropUpdateBefore[2] -> ConstraintEnforcer[3] -> doris_purchase[3]: Writer -> doris_purchase[3]: Committer (1/1)#65213 (0eedf3500794baa86ab2fdfcbefeb1c2) [DEPLOYING].
2022-11-18 11:53:49,014 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask          [] - No state backend has been configured, using default (HashMap) org.apache.flink.runtime.state.hashmap.HashMapStateBackend@3d7d4ae1
2022-11-18 11:53:49,014 INFO  org.apache.flink.runtime.state.StateBackendLoader            [] - State backend loader loads the state backend as HashMapStateBackend
2022-11-18 11:53:49,014 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask          [] - Checkpoint storage is set to 'jobmanager'
2022-11-18 11:53:49,014 INFO  org.apache.flink.runtime.taskmanager.Task                    [] - Source: purchase[1] -> DropUpdateBefore[2] -> ConstraintEnforcer[3] -> doris_purchase[3]: Writer -> doris_purchase[3]: Committer (1/1)#65213 (0eedf3500794baa86ab2fdfcbefeb1c2) switched from DEPLOYING to INITIALIZING.
2022-11-18 11:53:49,020 WARN  org.apache.flink.metrics.MetricGroup                         [] - Name collision: Group already contains a Metric with the name 'currentEmitEventTimeLag'. Metric will not be reported.[localhost, taskmanager, localhost:33197-e443eb, insert-into_default_catalog.default_database.doris_purchase, Source: purchase[1], 0]
2022-11-18 11:53:49,020 WARN  org.apache.flink.metrics.MetricGroup                         [] - Name collision: Group already contains a Metric with the name 'sourceIdleTime'. Metric will not be reported.[localhost, taskmanager, localhost:33197-e443eb, insert-into_default_catalog.default_database.doris_purchase, Source: purchase[1], 0]
2022-11-18 11:53:49,021 INFO  org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Closing Source Reader.
2022-11-18 11:53:49,021 WARN  org.apache.flink.runtime.taskmanager.Task                    [] - Source: purchase[1] -> DropUpdateBefore[2] -> ConstraintEnforcer[3] -> doris_purchase[3]: Writer -> doris_purchase[3]: Committer (1/1)#65213 (0eedf3500794baa86ab2fdfcbefeb1c2) switched from INITIALIZING to FAILED with failure cause: org.apache.doris.flink.exception.DorisRuntimeException: Commit failed {
    "status": "Fail",
    "msg": "errCode = 2, detailMessage = transaction [49734] not found"
}
	at org.apache.doris.flink.sink.committer.DorisCommitter.commitTransaction(DorisCommitter.java:116)
	at org.apache.doris.flink.sink.committer.DorisCommitter.commit(DorisCommitter.java:71)
	at org.apache.flink.streaming.api.transformations.SinkV1Adapter$CommitterAdapter.commit(SinkV1Adapter.java:282)
	at org.apache.flink.streaming.runtime.operators.sink.committables.CheckpointCommittableManagerImpl.commit(CheckpointCommittableManagerImpl.java:127)
	at org.apache.flink.streaming.runtime.operators.sink.CommitterOperator.commitAndEmit(CommitterOperator.java:176)
	at org.apache.flink.streaming.runtime.operators.sink.CommitterOperator.commitAndEmitCheckpoints(CommitterOperator.java:160)
	at org.apache.flink.streaming.runtime.operators.sink.CommitterOperator.initializeState(CommitterOperator.java:121)
	at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:122)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:286)
	at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:700)
	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:676)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:643)
	at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
	at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:917)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
	at java.lang.Thread.run(Thread.java:748)

2022-11-18 11:53:49,021 INFO  org.apache.flink.runtime.taskmanager.Task                    [] - Freeing task resources for Source: purchase[1] -> DropUpdateBefore[2] -> ConstraintEnforcer[3] -> doris_purchase[3]: Writer -> doris_purchase[3]: Committer (1/1)#65213 (0eedf3500794baa86ab2fdfcbefeb1c2).
2022-11-18 11:53:49,021 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Un-registering task and sending final execution state FAILED to JobManager for task Source: purchase[1] -> DropUpdateBefore[2] -> ConstraintEnforcer[3] -> doris_purchase[3]: Writer -> doris_purchase[3]: Committer (1/1)#65213 0eedf3500794baa86ab2fdfcbefeb1c2.
2022-11-18 11:53:50,025 INFO  org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Activate slot e32293a602f95c9f67ad0f8e2c653365.
2022-11-18 11:53:50,026 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Received task Source: purchase[1] -> DropUpdateBefore[2] -> ConstraintEnforcer[3] -> doris_purchase[3]: Writer -> doris_purchase[3]: Committer (1/1)#65214 (f1346df276ea9b9c68b8907169fe5e3e), deploy into slot with allocation id e32293a602f95c9f67ad0f8e2c653365.
2022-11-18 11:53:50,026 INFO  org.apache.flink.runtime.taskmanager.Task                    [] - Source: purchase[1] -> DropUpdateBefore[2] -> ConstraintEnforcer[3] -> doris_purchase[3]: Writer -> doris_purchase[3]: Committer (1/1)#65214 (f1346df276ea9b9c68b8907169fe5e3e) switched from CREATED to DEPLOYING.
2022-11-18 11:53:50,026 INFO  org.apache.flink.runtime.taskmanager.Task                    [] - Loading JAR files for task Source: purchase[1] -> DropUpdateBefore[2] -> ConstraintEnforcer[3] -> doris_purchase[3]: Writer -> doris_purchase[3]: Committer (1/1)#65214 (f1346df276ea9b9c68b8907169fe5e3e) [DEPLOYING].
2022-11-18 11:53:50,026 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask          [] - No state backend has been configured, using default (HashMap) org.apache.flink.runtime.state.hashmap.HashMapStateBackend@3c9058a4
2022-11-18 11:53:50,026 INFO  org.apache.flink.runtime.state.StateBackendLoader            [] - State backend loader loads the state backend as HashMapStateBackend
2022-11-18 11:53:50,026 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask          [] - Checkpoint storage is set to 'jobmanager'
2022-11-18 11:53:50,026 INFO  org.apache.flink.runtime.taskmanager.Task                    [] - Source: purchase[1] -> DropUpdateBefore[2] -> ConstraintEnforcer[3] -> doris_purchase[3]: Writer -> doris_purchase[3]: Committer (1/1)#65214 (f1346df276ea9b9c68b8907169fe5e3e) switched from DEPLOYING to INITIALIZING.
2022-11-18 11:53:50,032 WARN  org.apache.flink.metrics.MetricGroup                         [] - Name collision: Group already contains a Metric with the name 'currentEmitEventTimeLag'. Metric will not be reported.[localhost, taskmanager, localhost:33197-e443eb, insert-into_default_catalog.default_database.doris_purchase, Source: purchase[1], 0]
2022-11-18 11:53:50,032 WARN  org.apache.flink.metrics.MetricGroup                         [] - Name collision: Group already contains a Metric with the name 'sourceIdleTime'. Metric will not be reported.[localhost, taskmanager, localhost:33197-e443eb, insert-into_default_catalog.default_database.doris_purchase, Source: purchase[1], 0]
2022-11-18 11:53:50,033 INFO  org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Closing Source Reader.
2022-11-18 11:53:50,033 WARN  org.apache.flink.runtime.taskmanager.Task                    [] - Source: purchase[1] -> DropUpdateBefore[2] -> ConstraintEnforcer[3] -> doris_purchase[3]: Writer -> doris_purchase[3]: Committer (1/1)#65214 (f1346df276ea9b9c68b8907169fe5e3e) switched from INITIALIZING to FAILED with failure cause: org.apache.doris.flink.exception.DorisRuntimeException: Commit failed {
    "status": "Fail",
    "msg": "errCode = 2, detailMessage = transaction [49734] not found"
}
	at org.apache.doris.flink.sink.committer.DorisCommitter.commitTransaction(DorisCommitter.java:116)
	at org.apache.doris.flink.sink.committer.DorisCommitter.commit(DorisCommitter.java:71)
	at org.apache.flink.streaming.api.transformations.SinkV1Adapter$CommitterAdapter.commit(SinkV1Adapter.java:282)
	at org.apache.flink.streaming.runtime.operators.sink.committables.CheckpointCommittableManagerImpl.commit(CheckpointCommittableManagerImpl.java:127)
	at org.apache.flink.streaming.runtime.operators.sink.CommitterOperator.commitAndEmit(CommitterOperator.java:176)
	at org.apache.flink.streaming.runtime.operators.sink.CommitterOperator.commitAndEmitCheckpoints(CommitterOperator.java:160)
	at org.apache.flink.streaming.runtime.operators.sink.CommitterOperator.initializeState(CommitterOperator.java:121)
	at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:122)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:286)
	at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:700)
	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:676)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:643)
	at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
	at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:917)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
	at java.lang.Thread.run(Thread.java:748)

What You Expected?

Do not retry the 2PC commit indefinitely when the transaction can no longer be found on the Doris BE.
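One way the fix could look, sketched with an invented helper name (this is not the connector's actual code): classify the commit error before retrying, treating "transaction not found" as terminal.

```java
// Hypothetical classification helper (name invented for illustration): a commit
// that fails because the transaction no longer exists on the BE cannot succeed
// on retry, so it should be surfaced as a terminal error instead of retried.
class CommitErrorClassifier {
    static boolean isRetriable(String detailMessage) {
        // "transaction [...] not found" means the txn was already aborted or expired
        return !detailMessage.contains("not found");
    }
}
```

The committer would then re-throw instead of looping when `isRetriable` returns false.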

How to Reproduce?

You can reproduce this by killing the Doris backend while Flink is sinking data.

Anything Else?

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

Release Note 1.1.1

Feature

Add JsonDebeziumSchemaSerializer for schema change with datastream #64

Thanks

Thanks to everyone who has contributed to this release:
@JNSimba

Release Note 1.1.0

Feature

  1. Refactored DorisSource based on FLIP-27

  2. Using the two-phase commit of Stream Load in Doris 1.x, the Flink Doris Connector implements exactly-once semantics

  3. Support stream loading in JSON format, using read_json_by_line instead of strip_outer_array

Bugfix

  1. Fix the column mapping between the Flink schema and the Doris schema

  2. Fix Flink DATE and TIMESTAMP types not being mapped correctly

  3. Fix the decimal conversion bug for row types

Note: This version no longer supports Flink versions prior to 1.14; subsequent Flink versions will receive long-term support based on this release.

Thanks

Thanks to everyone who has contributed to this release:

@aiwenmo
@bridgeDream
@cxzl25
@gj-zhang
@hf200012
@JNSimba
@liuyaolin
@madongz
@morningman
@stalary
@yangzhg
@Yankee24

[Bug] The log does not show the problem

Search before asking

  • I had searched in the issues and found no similar issues.

Version

1.1.0

What's Wrong?

The log does not show the problem

SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
ERROR StatusLogger Unrecognized format specifier [d]
ERROR StatusLogger Unrecognized conversion specifier [d] starting at position 16 in conversion pattern.
ERROR StatusLogger Unrecognized format specifier [thread]
ERROR StatusLogger Unrecognized conversion specifier [thread] starting at position 25 in conversion pattern.
ERROR StatusLogger Unrecognized format specifier [level]
ERROR StatusLogger Unrecognized conversion specifier [level] starting at position 35 in conversion pattern.
ERROR StatusLogger Unrecognized format specifier [logger]
ERROR StatusLogger Unrecognized conversion specifier [logger] starting at position 47 in conversion pattern.
ERROR StatusLogger Unrecognized format specifier [msg]
ERROR StatusLogger Unrecognized conversion specifier [msg] starting at position 54 in conversion pattern.
ERROR StatusLogger Unrecognized format specifier [n]
ERROR StatusLogger Unrecognized conversion specifier [n] starting at position 56 in conversion pattern.
ERROR StatusLogger Unrecognized format specifier [d]
ERROR StatusLogger Unrecognized conversion specifier [d] starting at position 16 in conversion pattern.
ERROR StatusLogger Unrecognized format specifier [thread]
ERROR StatusLogger Unrecognized conversion specifier [thread] starting at position 25 in conversion pattern.
ERROR StatusLogger Unrecognized format specifier [level]
ERROR StatusLogger Unrecognized conversion specifier [level] starting at position 35 in conversion pattern.
ERROR StatusLogger Unrecognized format specifier [logger]
ERROR StatusLogger Unrecognized conversion specifier [logger] starting at position 47 in conversion pattern.
ERROR StatusLogger Unrecognized format specifier [msg]
ERROR StatusLogger Unrecognized conversion specifier [msg] starting at position 54 in conversion pattern.
ERROR StatusLogger Unrecognized format specifier [n]
ERROR StatusLogger Unrecognized conversion specifier [n] starting at position 56 in conversion pattern.

What You Expected?

The log displays normally.

How to Reproduce?

No response

Anything Else?

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

[Proposal] Make datastream connector more user-friendly when constructing a DorisSource

Search before asking

  • I had searched in the issues and found no similar issues.

Description

When constructing a DorisSink, we can directly call DorisSink’s builder method since it has a static inner Builder.

However, when constructing a DorisSource, the builder method lives in a separate class named DorisSourceBuilder rather than in DorisSource itself.

Would it be more user-friendly if DorisSource also had a static inner Builder class, so that users can follow the same pattern when creating Doris sources and sinks?
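The shape the proposal asks for can be sketched with placeholder names (these are not the connector's real classes, just an illustration of the static-inner-builder pattern):

```java
// Illustrative only: a static inner Builder on the source class itself,
// mirroring how DorisSink.builder() is called. All names are placeholders.
class SimpleSource<T> {
    final T deserializer;

    private SimpleSource(T deserializer) {
        this.deserializer = deserializer;
    }

    // Entry point lives on the source class, not a separate builder class.
    static <T> Builder<T> builder() {
        return new Builder<>();
    }

    static class Builder<T> {
        private T deserializer;

        Builder<T> setDeserializer(T d) {
            this.deserializer = d;
            return this;
        }

        SimpleSource<T> build() {
            return new SimpleSource<>(deserializer);
        }
    }
}
```

With this shape, `SimpleSource.<X>builder()...build()` reads the same way as the existing sink construction.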

Use case

No response

Related issues

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

[Bug] Row type decimal convert bug

Search before asking

  • I had searched in the issues and found no similar issues.

Version

1.0.0

What's Wrong?

The scale is converted incorrectly when querying a decimal-type field. For example, 30 is returned as 0.03.
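A minimal illustration of the kind of mix-up described (this is not the connector's actual code): a decimal arrives as an unscaled integer plus a scale, and applying the scale to a value that is already at full precision turns 30 into 0.030.

```java
import java.math.BigDecimal;
import java.math.BigInteger;

// The value 30 with a wrongly applied scale of 3 becomes 30 x 10^-3 = 0.030,
// matching the symptom in this report; with scale 0 it stays 30.
BigInteger unscaled = BigInteger.valueOf(30);
BigDecimal wrong = new BigDecimal(unscaled, 3); // misreads 30 as 0.030
BigDecimal right = new BigDecimal(unscaled, 0); // value used as-is
```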

What You Expected?

30 should be returned as 30.

How to Reproduce?

Query a decimal-type field.

Anything Else?

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

[Bug] Stream load fails when there's no data

Search before asking

  • I had searched in the issues and found no similar issues.

Version

1.1.0

What's Wrong?

Stream load fails if there's no data to flush.


We could fix this by skipping the flush when there is no pending data.
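The proposed guard can be sketched as follows (the name is invented; this is not the actual DorisWriter code):

```java
// Hypothetical guard: only issue a stream load when data was buffered since the
// last checkpoint; with nothing pending, skipping the load avoids the failure.
class FlushGuard {
    static boolean shouldFlush(long pendingBytes) {
        return pendingBytes > 0;
    }
}
```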

What You Expected?

No failure when there's no input data.

How to Reproduce?

Submit a job with an empty data source and Doris sink.

Anything Else?

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

[Bug] The length of the PostgreSQL character type and the Doris char type have different meanings

Search before asking

  • I had searched in the issues and found no similar issues.

Version

Apache Doris: 1.2.7.1
flink-doris-connector: 1.17-1.5.0-SNAPSHOT
PostgreSQL: 12

What's Wrong?

In PostgreSQL, the n in character(n) is the number of characters, while in Doris the n in char(n) is the number of bytes, so a direct mapping can make the input longer than the schema allows.

What You Expected?

Use the varchar mapping logic for the PostgreSQL character type.
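One possible mapping, sketched as an assumption rather than the connector's implementation: treat character(n) as n characters and reserve 4 bytes per character on the Doris side, whose char/varchar length is counted in bytes (65533 is Doris's varchar length cap).

```java
// Illustrative mapping helper (assumption, names invented): size the Doris
// varchar in bytes, 4 bytes per source character, capped at Doris's limit.
class PgTypeMapper {
    static String mapCharacter(int n) {
        return "varchar(" + Math.min(4L * n, 65533L) + ")";
    }
}
```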

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

[Bug] varchar length is insufficient when an emoji appears in an automatically defined varchar column

Search before asking

  • I had searched in the issues and found no similar issues.

Version

Doris 1.2.5
Mysql: 5.7+

What's Wrong?

When the MySQL charset is utf8mb4, which supports emoji, one emoji is encoded as four bytes, but the program only handles the Chinese case, in which one Chinese character is encoded as three bytes.

Error msg

Caused by: org.apache.doris.flink.exception.DorisRuntimeException: stream load error: [INTERNAL_ERROR]too many filtered rows, see more in http://127.0.0.1:8040/api/_load_error_log?file=__shard_0/error_log_insert_stmt_c54018ca99a31941-7cfcd16524d4948b_c54018ca99a31941_7cfcd16524d4948b
	at org.apache.doris.flink.sink.writer.DorisWriter.prepareCommit(DorisWriter.java:158)
	at org.apache.flink.streaming.api.transformations.SinkV1Adapter$SinkWriterV1Adapter.prepareCommit(SinkV1Adapter.java:151)
	at org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.emitCommittables(SinkWriterOperator.java:196)
	at org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.prepareSnapshotPreBarrier(SinkWriterOperator.java:166)
	at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.prepareSnapshotPreBarrier(RegularOperatorChain.java:89)
	at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:300)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$12(StreamTask.java:1253)
	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:1241)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointAsyncInMailbox(StreamTask.java:1126)
	... 13 more

url above content

curl http://127.0.0.1:8040/api/_load_error_log?file=__shard_0/error_log_insert_stmt_c54018ca99a31941-7cfcd16524d4948b_c54018ca99a31941_7cfcd16524d4948b 

Reason: column_name[str], the length of input is too long than schema. first 32 bytes of input str: [😊] schema length: 3; actual length: 4; . src line [];

What You Expected?

varchar content may contain emoji, so the table auto-created by the program should extend the column length to 4 times the original column length rather than 3.
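The byte counts behind this report can be checked directly: a Chinese character takes 3 bytes in UTF-8, but an emoji takes 4, so sizing columns at 3x the source length truncates rows containing emoji.

```java
import java.nio.charset.StandardCharsets;

// UTF-8 widths: U+4E2D ("中") encodes to 3 bytes, U+1F60A ("😊") to 4 bytes,
// which is why a 3x multiplier is one byte short for emoji.
int chineseBytes = "中".getBytes(StandardCharsets.UTF_8).length; // 3
int emojiBytes = "😊".getBytes(StandardCharsets.UTF_8).length;   // 4
```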

How to Reproduce?

Mysql table create ddl:

CREATE TABLE `emoj_str` (
  `id` int(11) NOT NULL,
  `str` varchar(1) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4

Error data:

insert into emoj_str value (1,'😊');

Running command line:

flink116 run \
-t yarn-per-job \
-Dyarn.application.name=mysql-sync-database \
-Dexecution.checkpointing.interval=10s \
-Dparallelism.default=1 \
-c org.apache.doris.flink.tools.cdc.CdcTools \
/data/****/flink-doris-connector-1.16-1.4.0.jar \
mysql-sync-database
--database "test_emoji"
--mysql-conf hostname=127.0.0.1
--mysql-conf username="user"
--mysql-conf password="password"
--mysql-conf database-name="test_emoji"
--mysql-conf port=1111
--mysql-conf scan.startup.mode="initial"
--including-tables "emoj_str"
--sink-conf fenodes=127.0.0.1:8030
--sink-conf username=user
--sink-conf password=password
--sink-conf jdbc-url=jdbc:mysql://127.0.0.1:9030
--table-conf replication_num=3
--job-name test

Anything Else?

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

[Enhancement] DorisRowConverter needs to support TIMEZONE

Search before asking

  • I had searched in the issues and found no similar issues.

Description

What's Wrong?
DorisRowConverter needs to support TIMEZONE.

What You Expected?
For TIMESTAMP_WITH_TIME_ZONE, the internal conversion should honor the time zone.
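The requested behavior can be sketched with the standard java.time API (this is an assumption about the desired conversion, not the converter's actual code):

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;

// Sketch: render an instant in a configured session time zone instead of a
// fixed zone, which is what a TIMESTAMP_WITH_TIME_ZONE conversion needs.
class TimeZoneConvert {
    static LocalDateTime toLocal(Instant instant, ZoneId zone) {
        return instant.atZone(zone).toLocalDateTime();
    }
}
```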

Solution

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

[Bug] [Connector] Empty readOptions in the Build() method of setDorisReadOptions

Search before asking

  • I had searched in the issues and found no similar issues.

Version

1.2.4.1

What's Wrong?

Some code in the DorisExecutionOptions constructor is redundant. When building a DorisSink, the dorisReadOptions property is defaulted in the build() method when it is null, so the setDorisReadOptions(DorisReadOptions.builder().build()) configuration is not required when creating the connection. The build() method that creates a DorisSource does not do this, however, so that configuration must be added when creating a DorisSource connection, even though it only supplies a DorisReadOptions object with no explicit properties set. It should not be necessary to add this configuration when creating a connection.

What You Expected?

I suggest following the DorisSink approach when creating a DorisSource: in build(), check whether readOptions is empty and, if so, assign a default value. Then this configuration does not need to be added when creating a connection.
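The suggested default-when-null behavior can be sketched with placeholder classes (not the connector's real ones):

```java
// Illustrative builder sketch: default the read options in build() instead of
// leaving them null, so an unset field cannot cause a NullPointerException later.
class ReadOptions {
    final int requestTimeoutMs;

    ReadOptions(int requestTimeoutMs) {
        this.requestTimeoutMs = requestTimeoutMs;
    }
}

class SourceBuilder {
    private ReadOptions readOptions;

    SourceBuilder setReadOptions(ReadOptions o) {
        this.readOptions = o;
        return this;
    }

    ReadOptions build() {
        if (readOptions == null) {
            // fall back to defaults, the way DorisSink's build() is described to
            readOptions = new ReadOptions(30_000);
        }
        return readOptions;
    }
}
```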

How to Reproduce?

// DorisSource
DorisSource<List<?>> dorisSource = DorisSourceBuilder.<List<?>>builder()
        .setDorisOptions(builder.build())
        .setDorisReadOptions(DorisReadOptions.builder().build()) // NullPointerException if not set for DorisSource
        .setDeserializer(new SimpleListDeserializationSchema())
        .build();
// DorisSink
DorisSink.builder()
        .setDorisReadOptions(DorisReadOptions.builder().build()) // optional for DorisSink
        .setDorisOptions(DorisOptions.builder()
                .setFenodes("hadoop102:7030")
                .setTableIdentifier("test.table1")
                .setUsername("root")
                .setPassword("aaaaaa")
                .build())

Anything Else?

no

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

Release Note 1.4.0

Feature & Improvement

  1. Support the Flink 1.17 version
  2. Support synchronizing an entire MySQL database
    In version 1.4.0, Flink CDC is integrated to support whole-database synchronization from MySQL, so users can use the connector to quickly ingest upstream business databases.
  3. Asynchronous lookup join was introduced in version 1.4.0, with query batching logic added on top of it.
  4. 1.4.0 introduced the thrift-service SDK, which shields the Thrift plug-in and lowers the barrier to compiling.
  5. When there is no data to import, no Stream Load connection is opened.
  6. Version 1.4.0 introduces a polling mechanism: the BE node is rotated at each checkpoint, avoiding the pressure of a single node acting as the Coordinator for a long time.
  7. Support reading Doris's DecimalV3/DateV2/DateTimeV2/Array/JSONB types (requires Doris 2.0 or above).

Bug

  1. Fix the concurrency problem in the Stream Load connection check
  2. Fix some connection timeout parameters for Doris not taking effect
  3. Fix the hidden separator not taking effect when writing
  4. Fix serialization failure when the query plan for reading Doris is too long

Thanks

Thanks to everyone who has contributed to this release:
@caoliang-web
@DongLiang-0
@gnehil
@GoGoWen
@JNSimba
@legendtkl
@lsy3993
@Myasuka
@wolfboys

[Bug] When using the Doris source to read data from Doris 2.0, the BE crashes

Search before asking

  • I had searched in the issues and found no similar issues.

Version

apache-doris-2.0-beta-bin-x64-noavx2
flink-doris-connector-1.17.0

What's Wrong?

When using DorisSourceFunction to query data:
DataStreamSource<List<?>> listDataStreamSource = env.addSource(new DorisSourceFunction(new DorisStreamOptions(props), new SimpleListDeserializationSchema()));
be.out then prints the following error log:
I0808 09:57:44.648388 30942 fragment_mgr.cpp:745] Register query/load memory tracker, query/load id: bb343d2f65b046ae-a5e7eca8e02e1ede limit: 2.00 GB
I0808 09:57:44.648418 30942 plan_fragment_executor.cpp:115] PlanFragmentExecutor::prepare|query_id=TUniqueId(hi=-4957270016049264978, lo=-6491960127771500834)|instance_id=TUniqueId(hi=6071672337474381689, lo=-9070538737182937440)|backend_num=0|pthread_id=139938785482496
I0808 09:57:44.648770 7034 fragment_mgr.cpp:521] PlanFragmentExecutor::_exec_actual|query_id=bb343d2f65b046ae-a5e7eca8e02e1ede|instance_id=5442e9d263b9e779-821ef91388ca6ea0|pthread_id=139944866322176
I0808 09:57:44.648802 7034 plan_fragment_executor.cpp:253] PlanFragmentExecutor::open|query_id=TUniqueId(hi=-4957270016049264978, lo=-6491960127771500834)|instance_id=TUniqueId(hi=6071672337474381689, lo=-9070538737182937440)|mem_limit=2.00 GB
*** Query id: bb343d2f65b046ae-a5e7eca8e02e1ede ***
*** Aborted at 1691459864 (unix time) try "date -d @1691459864" if you are using GNU date ***
*** Current BE git commitID: afe6bb9 ***
*** SIGSEGV address not mapped to object (@0x40) received by PID 6627 (TID 7034 OR 0x7f47740b9700) from PID 64; stack trace: ***
0# doris::signal::(anonymous namespace)::FailureSignalHandler(int, siginfo_t*, void*) at /root/src/doris/be/src/common/signal_handler.h:413
1# os::Linux::chained_handler(int, siginfo*, void*) in /home/software/jenkins/java/jdk1.8.0_202/jre/lib/amd64/server/libjvm.so
2# JVM_handle_linux_signal in /home/software/jenkins/java/jdk1.8.0_202/jre/lib/amd64/server/libjvm.so
3# signalHandler(int, siginfo*, void*) in /home/software/jenkins/java/jdk1.8.0_202/jre/lib/amd64/server/libjvm.so
4# 0x00007F481A0D42F0 in /lib64/libc.so.6
5# memcpy at /root/src/doris/be/src/glibc-compatibility/memcpy/memcpy_x86_64.cpp:219
6# arrow::FixedSizeBinaryBuilder::AppendValues(unsigned char const*, long, unsigned char const*) in /home/data/apache-doris-2.0/be/lib/doris_be
7# doris::vectorized::DataTypeNumberSerDe<__int128>::write_column_to_arrow(doris::vectorized::IColumn const&, unsigned char const*, arrow::ArrayBuilder*, int, int) const at /root/src/doris/be/src/vec/data_types/serde/data_type_number_serde.cpp:86
8# doris::FromBlockConverter::convert(std::shared_ptrarrow::RecordBatch) at /root/src/doris/be/src/util/arrow/block_convertor.cpp:392
9# doris::convert_to_arrow_batch(doris::vectorized::Block const&, std::shared_ptrarrow::Schema const&, arrow::MemoryPool
, std::shared_ptrarrow::RecordBatch) in /home/data/apache-doris-2.0/be/lib/doris_be
10# doris::vectorized::MemoryScratchSink::send(doris::RuntimeState
, doris::vectorized::Block*, bool) at /root/src/doris/be/src/vec/sink/vmemory_scratch_sink.cpp:83
11# doris::PlanFragmentExecutor::open_vectorized_internal() in /home/data/apache-doris-2.0/be/lib/doris_be
12# doris::PlanFragmentExecutor::open() at /root/src/doris/be/src/runtime/plan_fragment_executor.cpp:273
13# doris::FragmentExecState::execute() at /root/src/doris/be/src/runtime/fragment_mgr.cpp:263
14# doris::FragmentMgr::_exec_actual(std::shared_ptrdoris::FragmentExecState, std::function<void (doris::RuntimeState*, doris::Status*)> const&) at /root/src/doris/be/src/runtime/fragment_mgr.cpp:527
15# std::_Function_handler<void (), doris::FragmentMgr::exec_plan_fragment(doris::TExecPlanFragmentParams const&, std::function<void (doris::RuntimeState*, doris::Status*)> const&)::$_0>::_M_invoke(std::_Any_data const&) at /var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/std_function.h:291
16# doris::ThreadPool::dispatch_thread() in /home/data/apache-doris-2.0/be/lib/doris_be
17# doris::Thread::supervise_thread(void*) at /root/src/doris/be/src/util/thread.cpp:466
18# start_thread in /lib64/libpthread.so.0
19# __clone in /lib64/libc.so.6
./bin/start_be.sh: line 308:  6627 Segmentation fault (core dumped) ${LIMIT:+${LIMIT}} "${DORIS_HOME}/lib/doris_be" "$@" 2>&1 < /dev/null

What You Expected?

how to fix it?

How to Reproduce?

No response

Anything Else?

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

[Bug] NOT NULL column with a default value errors when Flink writes null

Search before asking

  • I had searched in the issues and found no similar issues.

Version

Doris: 1.2.4
Flink: 1.1.4
flink-doris-connector: 1.1.1

What's Wrong?

When Flink writes to Doris, the error "Reason: column(xxx) values is null while columns is not nullable" is reported.

UNIQUE model. The column xxx is defined as NOT NULL DEFAULT '1', i.e. a default value is specified after NOT NULL.

When the inserted field value is null, why does the default value not take effect, instead of an error being raised directly?

What You Expected?

When the inserted field value is null, no error should be raised, and the default value specified after NOT NULL should take effect.

How to Reproduce?

No response

Anything Else?

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

[Feature] When synchronizing an entire database, automatically exclude table and column names that do not conform to Doris naming rules

Search before asking

  • I had searched in the issues and found no similar issues.

Description

Doris table and column names do not support Chinese or other special characters.

When using whole-database synchronization to generate Doris CREATE TABLE statements, tables and columns whose names do not conform to Doris naming rules should be excluded.

Likewise, non-conforming columns should be ignored during Flink CDC synchronization.

Use case

No response

Related issues

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

[Bug] No checkpointing is enabled by default when using CdcTools

Search before asking

  • I had searched in the issues and found no similar issues.

Version

I'm using code from the master branch

What's Wrong?

When I use CdcTools to sync a MySQL database to Doris, no checkpointing is enabled by default, so no data is delivered to Doris.

What You Expected?

Fix this by adding parameters for the Flink checkpointing configuration to the program args.
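The proposed arg handling can be sketched like this (the flag name is invented for illustration): parse an optional checkpoint interval from the program args, which CdcTools could then pass to env.enableCheckpointing(intervalMs) instead of relying on the cluster default.

```java
// Illustrative arg parsing (hypothetical flag name): return the interval given
// on the command line, or a default when the flag is absent.
class CheckpointArgs {
    static long intervalMs(String[] args, long defaultMs) {
        for (int i = 0; i + 1 < args.length; i++) {
            if ("--checkpointing-interval".equals(args[i])) {
                return Long.parseLong(args[i + 1]);
            }
        }
        return defaultMs;
    }
}
```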

How to Reproduce?

No response

Anything Else?

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

[Bug][MultiThread] Flink connector: stream load finished unexpectedly, interrupt worker thread! Stream closed

Search before asking

  • I had searched in the issues and found no similar issues.

Version

version: 1.14_2.11-1.1.1

What's Wrong?

Flink is interrupted by a multi-threading issue. The root cause is that dorisStreamLoad.handlePreCommitResponse handles the same future twice (details in the reproduction steps below).

please refer to the log:
2023-04-24 05:09:45.779 INFO [Sink doris-sink-minu: (13/20)#0] org.apache.doris.flink.sink.writer.DorisStreamLoad - load Result {
    "TxnId": 19257242,
    "Label": "minutely_zt_video_effects_1682243486014_12_9760",
    "TwoPhaseCommit": "true",
    "Status": "Success",
    "Message": "OK",
    "NumberTotalRows": 0,
    "NumberLoadedRows": 0,
    "NumberFilteredRows": 0,
    "NumberUnselectedRows": 0,
    "LoadBytes": 0,
    "LoadTimeMs": 30053,
    "BeginTxnTimeMs": 0,
    "StreamLoadPutTimeMs": 21,
    "ReadDataTimeMs": 0,
    "WriteDataTimeMs": 29957,
    "CommitAndPublishTimeMs": 0
}

2023-04-24 05:09:45.779 ERROR [stream-load-check-th:k-thread-1] org.apache.doris.flink.sink.writer.DorisWriter - stream load finished unexpectedly, interrupt worker thread! Stream closed

2023-04-24 05:09:45.779 INFO [Sink doris-sink-minu: (13/20)#0] org.apache.doris.flink.sink.writer.RecordBuffer - start buffer data, read queue size 0, write queue size 400

2023-04-24 05:09:45.779 INFO [Sink doris-sink-minu: (13/20)#0] org.apache.doris.flink.sink.writer.DorisStreamLoad - stream load started for minutely_zt_video_effects_1682243486014_12_9761

2023-04-24 05:09:45.779 INFO [stream-load-upload-t:d-thread-1] org.apache.doris.flink.sink.writer.DorisStreamLoad - start execute load

2023-04-24 05:09:45.786 INFO [stream-load-upload-t:d-thread-1] org.apache.http.impl.execchain.RetryExec - I/O exception (java.net.SocketException) caught when processing request to {}->http://11.148.133.129:8240: Socket is closed

2023-04-24 05:09:45.786 INFO [stream-load-upload-t:d-thread-1] org.apache.http.impl.execchain.RetryExec - Retrying request to {}->http://11.148.133.129:8240

2023-04-24 05:09:45.789 INFO [Sink doris-sink-dail: (13/20)#0] org.apache.doris.flink.sink.writer.DorisStreamLoad - load Result {

The log may also end with "Stream closed"; the surrounding output is identical to the excerpt above.

What You Expected?

works well.

How to Reproduce?

steps to reproduce:
1. The main thread calls prepareCommit and sets loading = false to stop load-1. The HTTP request returns success, so dorisStreamLoad.getPendingLoadFuture().isDone() is true. The checker thread now starts to run.
2. The checker thread reaches "if (dorisStreamLoad.getPendingLoadFuture() != null && dorisStreamLoad.getPendingLoadFuture().isDone())", and the check passes as in step 1. The main thread then starts a new load-2, setting loading = true and creating a new future. Back on the checker thread, the "if (!loading)" check also passes, so it continues to "RespContent content = dorisStreamLoad.handlePreCommitResponse(dorisStreamLoad.getPendingLoadFuture().get());". Because the load is still running, this call blocks until the load finishes.
3. After a while, the main thread calls prepareCommit to finish load-2. The future completes and wakes both the main thread and the checker thread to process the same response concurrently, and the checker interrupts itself.
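The steps above describe a check-then-act race on the loading flag and the pending future. A minimal sketch of a fix (not the connector's code, names invented): guard both under one lock so the checker's test and the writer's handoff cannot interleave, and hand off the future so the response is handled at most once.

```java
import java.util.concurrent.CompletableFuture;

// Minimal sketch: the writer's state transitions and the checker's poll all run
// under the same monitor, closing the window described in steps 1-3 above.
class LoadState {
    private boolean loading;
    private CompletableFuture<String> pendingLoadFuture;

    synchronized void startLoad() {
        loading = true;
        pendingLoadFuture = new CompletableFuture<>();
    }

    synchronized void finishLoad(String response) {
        loading = false;
        pendingLoadFuture.complete(response);
    }

    // Checker thread: consume a finished load at most once.
    synchronized String pollFinishedLoad() {
        if (!loading && pendingLoadFuture != null && pendingLoadFuture.isDone()) {
            String response = pendingLoadFuture.join();
            pendingLoadFuture = null; // hand off so no second thread processes it
            return response;
        }
        return null;
    }
}
```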

Anything Else?

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

[Feature] Optimize the synchronization of primary key field and unique field

Search before asking

  • I had searched in the issues and found no similar issues.

Description


If the source database table does not use a primary key, then check if there is a unique key index in the source database table. If a unique key index exists in the table, use the corresponding unique key index field as the field for the Doris unique key. When using a unique key index, it is necessary to adjust the field order in the Doris table creation statement. This adjustment ensures that the fields in the Doris table match the order of the fields in the unique key index

Use case

No response

Related issues

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

[Bug] After adding a DEFAULT CURRENT_TIMESTAMP column to a unique table, some rows are not filled with current_timestamp

Search before asking

  • I had searched in the issues and found no similar issues.

Version

2.0.0-alpha

What's Wrong?

A table in Doris had a column added via alter table add column data_ts datetime default current_timestamp, but only some rows were filled with the value; others were not.

Moreover, for a table created via whole-database synchronization, this column was added on the Doris side (the MySQL source table does not have it), and when new data is sunk in, the column is not filled either.

What You Expected?

Either every row should be filled, or rows written before the column was added should be left unfilled.

How to Reproduce?

No response

Anything Else?

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

[Enhancement]

Search before asking

  • I had searched in the issues and found no similar issues.

Description

In version 1.4.0, Release Note 1.4.0 shows:
"6. Version 1.4.0 introduces a polling mechanism, and BE nodes are replaced during each Checkpoint, avoiding the pressure of a single node acting as a Coordinator for a long time."
This improvement works well for a single stream load task, but in a high-concurrency scenario the load tasks are all routed to one BE node, putting great pressure on that node. In my environment (72 parallelism, 3 FE nodes, 6 BE nodes), one BE node acting as Coordinator for all 72 subtasks becomes overloaded shortly after startup. Traffic needs to be distributed near-evenly across all BE nodes.

Solution

In DorisWriter.initializeLoad(), around line 107:

// cache backends
this.backends = RestService.getBackendsV2(dorisOptions, dorisReadOptions, LOG);
Collections.shuffle(backends); // is it possible to add this line?
String backend = getAvailableBackend();
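The effect of the proposed shuffle can be illustrated with a small, hypothetical sketch (BackendSelector and its names are illustrative, not the connector's actual classes): each subtask shuffles its cached backend list once, then cycles through it round-robin, so the subtasks no longer all start from the same first BE entry.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical sketch of the proposal, not the connector's real code:
// shuffle the cached backend list once per writer subtask, then iterate
// round-robin the way getAvailableBackend() cycles candidates.
public class BackendSelector {
    private final List<String> backends;
    private int pos = 0;

    public BackendSelector(List<String> cachedBackends) {
        this.backends = new ArrayList<>(cachedBackends);
        Collections.shuffle(this.backends); // the proposed one-line change
    }

    public String next() {
        return backends.get(pos++ % backends.size());
    }

    public static void main(String[] args) {
        BackendSelector selector =
                new BackendSelector(List.of("be1:8040", "be2:8040", "be3:8040"));
        // one full cycle still visits every BE exactly once, in shuffled order
        for (int i = 0; i < 3; i++) {
            System.out.println(selector.next());
        }
    }
}
```

Because the shuffle happens independently in each subtask, the first backend each Coordinator picks differs across the 72 subtasks, spreading the load across all BE nodes.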

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

[Bug] invalid maven metadata for 1.4.0 release

Search before asking

  • I had searched in the issues and found no similar issues.

Version

1.4.0

What's Wrong?

The metadata on maven central is incorrect. If you look at https://search.maven.org/artifact/org.apache.doris/flink-doris-connector-1.17/1.4.0/jar the metadata file contains <revision>1.4.0-SNAPSHOT</revision> and <version>${revision}</version>.

In previous releases it appears there was <version>1.3.0</version> and no <revision> tag.

What You Expected?

<revision>1.4.0-SNAPSHOT</revision> should be 1.4.0, or there should be no <revision> tag and <version> should be set to 1.4.0.

How to Reproduce?

Add flink-doris-connector dependency to your project, per https://search.maven.org/artifact/org.apache.doris/flink-doris-connector-1.17/1.4.0/jar.

For example implementation("org.apache.doris:flink-doris-connector-1.17:1.4.0") if using gradle kotlin DSL.

The build fails because the metadata is inconsistent:

> Could not resolve org.apache.doris:flink-doris-connector-1.17:1.4.0.
         > inconsistent module metadata found. Descriptor: org.apache.doris:flink-doris-connector-1.17:1.4.0-SNAPSHOT Errors: bad version: expected='1.4.0' found='1.4.0-SNAPSHOT'

Anything Else?

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

[Feature] Synchronize the 'change column' statement (DDL) to Doris

Search before asking

  • I had searched in the issues and found no similar issues.

Description

I noticed that the JsonDebeziumSchemaSerializer only detects schema changes for ADD or DROP. Is 'alter table change column' not synchronized because it is unsupported, or for some other reason?
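If support is simply missing, one conceivable starting point is recognizing the statement in the DDL text. The sketch below is purely hypothetical (DdlChangeDetector and its regex are illustrative, not code from JsonDebeziumSchemaSerializer) and shows how the old and new column names could be extracted from a MySQL CHANGE COLUMN statement:

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical sketch: alongside the existing ADD/DROP handling, a
// CHANGE COLUMN pattern could pull the old and new column names out of
// the DDL carried in the Debezium record.
public class DdlChangeDetector {
    private static final Pattern CHANGE_COLUMN = Pattern.compile(
            "ALTER\\s+TABLE\\s+\\S+\\s+CHANGE\\s+(?:COLUMN\\s+)?(\\w+)\\s+(\\w+)",
            Pattern.CASE_INSENSITIVE);

    // returns {oldName, newName}, or null when the DDL is not CHANGE COLUMN
    public static String[] parseChangeColumn(String ddl) {
        Matcher m = CHANGE_COLUMN.matcher(ddl);
        return m.find() ? new String[] {m.group(1), m.group(2)} : null;
    }

    public static void main(String[] args) {
        String[] cols =
                parseChangeColumn("ALTER TABLE t1 CHANGE COLUMN old_c new_c INT");
        System.out.println(cols[0] + " -> " + cols[1]); // old_c -> new_c
    }
}
```

A real implementation would also need to map the type change and issue the corresponding schema-change call against Doris, which is more involved than name extraction.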

Use case

No response

Related issues

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

[Enhancement] Allow assigning BE addresses directly when the BE nodes only have intranet IPs

Search before asking

  • I had searched in the issues and found no similar issues.

Description

When using Stream Load to import data via the FE's public network address, the request is redirected to a BE intranet IP, which causes a connect timeout.

Solution

Can we add an option "benodes" to DorisOptions to force the backend IPs to use?

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

[Bug] IllegalArgumentException: Row arity: 5, but serializer arity: 4

Search before asking

  • I had searched in the issues and found no similar issues.

Version

flink version : flink-1.14.4-scala_2.12

flink doris connector version : flink-doris-connector-1.14_2.12-1.0.3.jar

doris version : doris-0.15.0

What's Wrong?

Submitted a job using the Flink sql-client: an "insert into ... select" statement.

error logs:

2022-04-22 22:27:56
java.lang.IllegalArgumentException: Row arity: 5, but serializer arity: 4
at org.apache.flink.table.runtime.typeutils.RowDataSerializer.copy(RowDataSerializer.java:124)
at org.apache.flink.table.runtime.typeutils.RowDataSerializer.copy(RowDataSerializer.java:48)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:80)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29)
at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollect(StreamSourceContexts.java:418)
at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collect(StreamSourceContexts.java:513)
at org.apache.flink.streaming.api.operators.StreamSourceContexts$SwitchingOnClose.collect(StreamSourceContexts.java:103)
at org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.run(InputFormatSourceFunction.java:92)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:67)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:323)

What You Expected?

The job should run without this error.

How to Reproduce?

No response

Anything Else?

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

[Bug] Dataloss silently during streamload

Search before asking

  • I had searched in the issues and found no similar issues.

Version

1.1.0

What's Wrong?

We have a job that loads MySQL CDC data into Doris with Doris Flink connector 1.1.0. The Flink logs and metrics show that there are 1464 records written, but Streamload only gets 1463 (verified via Doris select and streamload logs), leaving 1 record missing.


I'm pretty sure there are no other streamloads getting the remaining 1 record.

What You Expected?

No data loss.

How to Reproduce?

No response

Anything Else?

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

[Bug] Time difference problem in Doris default value column

Search before asking

  • I had searched in the issues and found no similar issues.

Version

flink-1.15.2,doris-2.0.1-rc03

What's Wrong?

Data consumed by Flink is inserted into Doris. Doris has a column defined as default current_timestamp(3), and querying the data shows that the Doris default-value time is earlier than the business time, which does not match normal logic.

What You Expected?

The Doris default-value time should be later than the business time.

How to Reproduce?

No response

Anything Else?

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

[Feature] Add documentation to support developers to read

Search before asking

  • I had searched in the issues and found no similar issues.

Description

The Chinese and English documentation for doris-flink-connector lacks details for users, for example on CDC synchronization.

Use case

No response

Related issues

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

[Bug] DorisRuntimeException: stream load error

Search before asking

  • I had searched in the issues and found no similar issues.

Version

doris :1.1.2
flink-doris-connector-1.14_2.12 :1.1.0

What's Wrong?

When a BE node process fails, the Flink program cannot recover automatically. The following error occurs:

org.apache.flink.streaming.runtime.tasks.AsynchronousException: Caught exception while processing timer.
at org.apache.flink.streaming.runtime.tasks.StreamTask$StreamTaskAsyncExceptionHandler.handleAsyncException(StreamTask.java:1583)
at org.apache.flink.streaming.runtime.tasks.StreamTask.handleAsyncException(StreamTask.java:1559)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1704)
at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$null$22(StreamTask.java:1693)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:338)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:324)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:812)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:764)
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:1062)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:1041)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:857)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:643)
at java.lang.Thread.run(Thread.java:748)
Caused by: TimerException{org.apache.doris.flink.exception.DorisRuntimeException: stream load error: null}
... 14 more
Caused by: org.apache.doris.flink.exception.DorisRuntimeException: stream load error: null
at org.apache.doris.flink.sink.committer.DorisCommitter.commitTransaction(DorisCommitter.java:107)
at org.apache.doris.flink.sink.committer.DorisCommitter.commit(DorisCommitter.java:71)
at org.apache.flink.streaming.runtime.operators.sink.StreamingCommitterHandler.commit(StreamingCommitterHandler.java:54)
at org.apache.flink.streaming.runtime.operators.sink.AbstractStreamingCommitterHandler.retry(AbstractStreamingCommitterHandler.java:96)
at org.apache.flink.streaming.runtime.operators.sink.AbstractCommitterHandler.retry(AbstractCommitterHandler.java:66)
at org.apache.flink.streaming.runtime.operators.sink.CommitRetrier.retry(CommitRetrier.java:80)
at org.apache.flink.streaming.runtime.operators.sink.CommitRetrier.lambda$retryAt$0(CommitRetrier.java:63)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1702)
... 13 more

What You Expected?

When one of the Doris cluster's BE fails, the Flink process resumes automatically.

How to Reproduce?

No response

Anything Else?

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

[Bug] connector build base flink 1.14.x failed

Search before asking

  • I had searched in the issues and found no similar issues.

Version

master

What's Wrong?

sh build.sh --flink 1.14.3 --scala 2.12

[ERROR] Failed to execute goal on project flink-doris-connector-1.14: Could not resolve dependencies for project 

org.apache.doris:flink-doris-connector-1.14:jar:1.0.0-SNAPSHOT: The following artifacts could not be resolved: 

org.apache.flink:flink-clients:jar:1.14.3, org.apache.flink:flink-table-planner-loader:jar:1.14.3, org.apache.flink:flink-table-api-

java-bridge:jar:1.14.3, org.apache.flink:flink-table-runtime:jar:1.14.3: Failure to find org.apache.flink:flink-clients:jar:1.14.3 in 

https://repo.maven.apache.org/maven2 was cached in the local repository, resolution will not be reattempted until the update 

interval of central has elapsed or updates are forced -> [Help 1]

What You Expected?

✅ Build success (sh build.sh --flink 1.14.3 --scala 2.12)

How to Reproduce?

flink-table-planner-loader only supports Flink 1.15.x.

The build needs to be made compatible with earlier Flink versions.

Anything Else?

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct
