MongoFlink is a connector between MongoDB and Apache Flink. It acts as a Flink sink (and an experimental Flink source), and provides a transaction mode (which ensures exactly-once semantics) for MongoDB 4.2 and above, and a non-transaction mode for MongoDB 3.0 and above.
MongoFlink is at an early stage of development, and any use, feedback, or contribution is welcome!
- Flink 1.12 or above. MongoFlink is built on top of the new sink API added in FLIP-143 for Flink 1.12.0.
- MongoDB 3.0 or above. The official MongoDB Java driver supports 3.0 and above.
- JDK 1.8 or above.
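Because transaction mode needs MongoDB 4.2+ while non-transaction mode works from 3.0, a job may want to pick the value of `sink.transaction.enable` from the server version. The following is a minimal sketch under that assumption: the class `SinkModeSelector` and its methods are hypothetical helpers, not part of MongoFlink, and how you obtain the server version string (for example, from the `buildInfo` command) is left to you.

```java
/**
 * Hypothetical helper (not part of MongoFlink): maps a MongoDB server version
 * to the value of sink.transaction.enable, following the version requirements
 * listed above (transactions need 4.2+, the sink itself needs 3.0+).
 */
public final class SinkModeSelector {

    private SinkModeSelector() {}

    /** Parses "major.minor[.patch...]" into {major, minor}; the patch part is ignored. */
    static int[] parseMajorMinor(String version) {
        String[] parts = version.trim().split("\\.");
        if (parts.length < 2) {
            throw new IllegalArgumentException("Unexpected version: " + version);
        }
        return new int[] {Integer.parseInt(parts[0]), Integer.parseInt(parts[1])};
    }

    /** True if the version is at least major.minor. */
    static boolean atLeast(String version, int major, int minor) {
        int[] v = parseMajorMinor(version);
        return v[0] > major || (v[0] == major && v[1] >= minor);
    }

    /** Returns "true" or "false" for sink.transaction.enable; rejects servers older than 3.0. */
    static String transactionEnabledFor(String serverVersion) {
        if (!atLeast(serverVersion, 3, 0)) {
            throw new IllegalArgumentException("MongoDB " + serverVersion + " is older than 3.0");
        }
        return String.valueOf(atLeast(serverVersion, 4, 2));
    }

    public static void main(String[] args) {
        System.out.println(transactionEnabledFor("4.2.1"));  // true
        System.out.println(transactionEnabledFor("4.0.20")); // false
        System.out.println(transactionEnabledFor("3.6.23")); // false
    }
}
```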
MongoFlink is not published to Maven Central yet, so users need to build the artifacts first.
Check out the project, and use Maven to install it into your local repository.
$ mvn install
Then add the following dependency in your project's pom.xml.
<dependency>
    <groupId>org.mongoflink</groupId>
    <artifactId>mongo-flink</artifactId>
    <version>0.1-SNAPSHOT</version>
</dependency>
Use MongoSink in your Flink application.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// non-transactional sink with a flush strategy of 1000 documents or 10 seconds
Properties properties = new Properties();
properties.setProperty(MongoOptions.SINK_TRANSACTION_ENABLED, "false");
properties.setProperty(MongoOptions.SINK_FLUSH_ON_CHECKPOINT, "false");
properties.setProperty(MongoOptions.SINK_FLUSH_SIZE, String.valueOf(1_000L));
properties.setProperty(MongoOptions.SINK_FLUSH_INTERVAL, String.valueOf(10_000L));
env.addSource(...)
    .sinkTo(new MongoSink<>("mongodb://user:password@127.0.0.1:27017", "mydb", "mycollection",
        new StringDocumentSerializer(), properties));
env.execute();
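The example above configures a non-transactional sink that flushes after 1000 buffered documents or 10 seconds, whichever comes first. The sketch below illustrates that size-or-interval policy in isolation. It is a hypothetical class (`SizeOrIntervalBuffer`), not MongoFlink's actual implementation; the injectable clock and the `writer` callback are assumptions made so the logic can be run without Flink or MongoDB. Note that it only checks the interval when a new element arrives, whereas a real sink would likely also need a timer so that an idle stream still gets flushed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.LongSupplier;

/**
 * Hypothetical sketch (not MongoFlink's code) of a flush policy like the one
 * configured by sink.flush.size and sink.flush.interval.
 */
public final class SizeOrIntervalBuffer<T> {
    private final int flushSize;
    private final long flushIntervalMs;
    private final LongSupplier clock;       // current time in ms; injectable for testing
    private final Consumer<List<T>> writer; // a real sink would write the batch to MongoDB here
    private final List<T> buffer = new ArrayList<>();
    private long lastFlushMs;

    public SizeOrIntervalBuffer(int flushSize, long flushIntervalMs,
                                LongSupplier clock, Consumer<List<T>> writer) {
        this.flushSize = flushSize;
        this.flushIntervalMs = flushIntervalMs;
        this.clock = clock;
        this.writer = writer;
        this.lastFlushMs = clock.getAsLong();
    }

    /** Buffers one element and flushes if either the size or the interval threshold is reached. */
    public void add(T element) {
        buffer.add(element);
        if (buffer.size() >= flushSize
                || clock.getAsLong() - lastFlushMs >= flushIntervalMs) {
            flush();
        }
    }

    /** Hands all buffered elements to the writer as one batch and resets the interval timer. */
    public void flush() {
        if (!buffer.isEmpty()) {
            writer.accept(new ArrayList<>(buffer));
            buffer.clear();
        }
        lastFlushMs = clock.getAsLong();
    }

    public int pending() {
        return buffer.size();
    }

    public static void main(String[] args) {
        long[] now = {0L};
        List<Integer> batchSizes = new ArrayList<>();
        SizeOrIntervalBuffer<String> buf = new SizeOrIntervalBuffer<>(
                1_000, 10_000L, () -> now[0], batch -> batchSizes.add(batch.size()));

        for (int i = 0; i < 2_500; i++) {
            buf.add("doc-" + i);        // the size threshold triggers two flushes
        }
        now[0] = 10_000L;               // simulate 10 seconds passing
        buf.add("late-doc");            // the interval threshold triggers a flush
        System.out.println(batchSizes); // [1000, 1000, 501]
    }
}
```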
MongoFlink is configured through the Properties object passed to MongoSink, using the following keys.
key | description | default value |
---|---|---|
sink.transaction.enable | Whether to use transactions in MongoSink (requires MongoDB 4.2+). | false |
sink.flush.on-checkpoint | Whether to flush the buffered documents on checkpoint barriers. | false |
sink.flush.size | Maximum number of buffered documents before a flush. Only takes effect when sink.flush.on-checkpoint is false. | 1000 |
sink.flush.interval | Flush interval in milliseconds. Only takes effect when sink.flush.on-checkpoint is false. | 30000 |
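To make the table's defaults and its "only takes effect when" rules concrete, here is a minimal sketch that resolves these keys from a Properties object. The class `SinkOptionsResolver` is hypothetical; the key strings and default values are copied from the table, and it is an assumption that MongoFlink's own parsing and validation behave the same way.

```java
import java.util.Properties;

/**
 * Hypothetical sketch: resolves the documented sink options from Properties,
 * falling back to the defaults listed in the configuration table.
 */
public final class SinkOptionsResolver {
    static final String TRANSACTION_ENABLE = "sink.transaction.enable";
    static final String FLUSH_ON_CHECKPOINT = "sink.flush.on-checkpoint";
    static final String FLUSH_SIZE = "sink.flush.size";
    static final String FLUSH_INTERVAL = "sink.flush.interval";

    final boolean transactionEnabled;
    final boolean flushOnCheckpoint;
    final long flushSize;       // only meaningful when flushOnCheckpoint is false
    final long flushIntervalMs; // only meaningful when flushOnCheckpoint is false

    SinkOptionsResolver(Properties props) {
        // Boolean.parseBoolean treats anything other than "true" (ignoring case) as false.
        transactionEnabled = Boolean.parseBoolean(props.getProperty(TRANSACTION_ENABLE, "false"));
        flushOnCheckpoint = Boolean.parseBoolean(props.getProperty(FLUSH_ON_CHECKPOINT, "false"));
        flushSize = Long.parseLong(props.getProperty(FLUSH_SIZE, "1000"));
        flushIntervalMs = Long.parseLong(props.getProperty(FLUSH_INTERVAL, "30000"));
        if (flushSize <= 0 || flushIntervalMs <= 0) {
            throw new IllegalArgumentException("flush size and interval must be positive");
        }
    }

    /** Summarizes the effective behavior; size and interval are ignored when flushing on checkpoints. */
    String describe() {
        String mode = transactionEnabled ? "transactional" : "non-transactional";
        if (flushOnCheckpoint) {
            return mode + ", flush on checkpoint";
        }
        return mode + ", flush every " + flushSize + " documents or " + flushIntervalMs + " ms";
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty(FLUSH_SIZE, "500");
        // prints: non-transactional, flush every 500 documents or 30000 ms
        System.out.println(new SinkOptionsResolver(props).describe());
    }
}
```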
Feel free to file an issue if you need help adopting MongoFlink. Please describe your environment in the issue (e.g. which MongoDB and Flink versions you are using).
Pull requests that improve MongoFlink are welcome. Please state the purpose (and the design, if it is a big feature) to make reviewing easier.