embulk / embulk-input-mongodb
MongoDB input plugin for Embulk loads records from MongoDB.
License: Apache License 2.0
Hi,
When I use incremental loading as described in the README and new records (as determined by the incremental field) have arrived since the last run, the plugin correctly uploads only the new records and writes the last record to the config file. However, if there are no new records since the last run, the plugin correctly uploads nothing but also empties the last_record field of the config file. This causes problems because the next Embulk run uploads everything again. Is this a bug or intended behavior, and are there any workarounds? Thanks!
@friendofasquid
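The behavior the report expects can be sketched in a few lines. This is only an illustration of the checkpoint rule, not the plugin's code: the helper name `next_last_record` and the record shape are assumptions made for the example.

```python
from typing import Optional


def next_last_record(previous: Optional[dict], new_records: list, field: str) -> Optional[dict]:
    """Return the checkpoint to persist after a run (hypothetical helper)."""
    if not new_records:
        # Nothing new was loaded: keep the old checkpoint instead of clearing it.
        return previous
    # Otherwise advance the checkpoint to the largest value seen in this run.
    newest = max(new_records, key=lambda record: record[field])
    return {field: newest[field]}


previous = {"rank": 9}
print(next_last_record(previous, [], "rank"))
print(next_last_record(previous, [{"rank": 10}, {"rank": 12}], "rank"))
```

With this rule an empty run leaves the stored last_record untouched, so the following run does not start over from the beginning.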
Hi,
Are there plans to upgrade the MongoDB Java driver to 3.6? According to http://mongodb.github.io/mongo-java-driver/3.6/upgrading/, driver 3.6 is backward compatible with earlier versions, while the driver version currently used by the plugin (3.2) is not compatible with newer versions. Since our company is upgrading to MongoDB 3.6, it would be great if this plugin could be updated to the latest version of the Java driver. Thanks!
@friendofasquid
This is a BREAKING CHANGE.
Works only with Embulk >= 0.8.7.
For the first release, only the JSON column type is supported.
{ "_id": "55eae883689a08361045d64a", "name": "obj1", "rank": 1, "value": 1.1, "created_at": { "$date" : 1441533998786 }, "embeded": { "key": "value1" } }
{ "_id": "55eae883689a08361045d64b", "name": "obj2", "rank": 2, "value": 2.2, "created_at": { "$date" : 1441533988786 }, "embeded": { "key": "value2" } }
{ "_id": "55eae883689a08361045d64c", "name": "obj3", "rank": 3, "value": 3.3, "created_at": { "$date" : 1441533978786 }, "embeded": { "key": "value3", "key2": "value3-2", "key3": ["v1", "v2"]} }
{ "_id": "55eae883689a08361045d64d", "name": "obj4", "rank": 4, "value": 4.4, "created_at": { "$date" : 1441533968786 }, "embeded": { "key": "value4" } }
{ "_id": "55eae883689a08361045d64e", "name": "obj5", "rank": 5, "value": 5.5, "created_at": { "$date" : 1441533958786 }, "embeded": { "key": "value5" } }
{ "_id": "55eae883689a08361045d64f", "name": "obj6", "rank": 6, "value": 6.6, "created_at": { "$date" : 1441533948786 }, "embeded": { "key": "value6" } }
{ "_id": "55eae883689a08361045d650", "name": "obj7", "rank": 7, "value": 7.7, "created_at": { "$date" : 1441533938786 }, "embeded": { "key": "value7" } }
{ "_id": "55eae883689a08361045d651", "name": "obj8", "rank": 8, "value": 8.8, "created_at": { "$date" : 1441533928786 }, "embeded": { "key": "value8" } }
{ "_id": "55eae883689a08361045d652", "name": "obj9", "rank": 9, "value": 9.9, "created_at": { "$date" : 1441533918786 }, "embeded": { "key": "value9" } }
in:
  type: mongodb
  uri: mongodb://localhost:27017/my_database
  collection: "my_collection"
  fields:
    - { name: id, type: string }
    - { name: name, type: string }
    - { name: rank, type: long }
    - { name: value, type: double }
    - { name: created_at, type: timestamp }
    - { name: embeded, type: json }
  query: '{ rank: { $gte: 3 } }'
  sort: '{ rank: -1 }'
  batch_size: 100
out:
  type: file
  path_prefix: ./tmp/full
  file_ext: csv
  formatter:
    type: csv
    header_line: true
    charset: UTF-8
    newline: CRLF
id,name,rank,value,created_at,embeded
55eae883689a08361045d652,obj9,9,9.9,2015-09-06 10:05:18.786000 +0000,"{""key"":""value9""}"
55eae883689a08361045d651,obj8,8,8.8,2015-09-06 10:05:28.786000 +0000,"{""key"":""value8""}"
55eae883689a08361045d650,obj7,7,7.7,2015-09-06 10:05:38.786000 +0000,"{""key"":""value7""}"
55eae883689a08361045d64f,obj6,6,6.6,2015-09-06 10:05:48.786000 +0000,"{""key"":""value6""}"
55eae883689a08361045d64e,obj5,5,5.5,2015-09-06 10:05:58.786000 +0000,"{""key"":""value5""}"
55eae883689a08361045d64d,obj4,4,4.4,2015-09-06 10:06:08.786000 +0000,"{""key"":""value4""}"
55eae883689a08361045d64c,obj3,3,3.3,2015-09-06 10:06:18.786000 +0000,"{""key2"":""value3-2"",""key3"":[""v1"",""v2""],""key"":""value3""}"
Hello,
I've been trying to use this source without success for UUID fields.
I managed to make it work by trying to decode the binary value as a UUID before decoding it as plain binary:
I'm not a MongoDB expert, but maybe there is a better way to identify the type as UUID and use the matching codec from the codec registry.
Has anyone had a similar issue?
Thank you!
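One way to identify a UUID, rather than guessing, is to look at the BSON binary subtype: subtype 4 is the standard UUID and subtype 3 is the legacy UUID. The sketch below shows that idea in Python and is not the plugin's Java codec. The function name `decode_binary` and the `java_legacy` flag are assumptions for illustration, and the byte reordering applies only to values written with the legacy Java encoding.

```python
import uuid

UUID_LEGACY_SUBTYPE = 3    # BSON binary subtype 3: legacy UUID
UUID_STANDARD_SUBTYPE = 4  # BSON binary subtype 4: standard UUID


def decode_binary(subtype: int, data: bytes, java_legacy: bool = True):
    """Return a uuid.UUID for UUID subtypes, otherwise the raw bytes."""
    if len(data) == 16 and subtype == UUID_STANDARD_SUBTYPE:
        return uuid.UUID(bytes=data)
    if len(data) == 16 and subtype == UUID_LEGACY_SUBTYPE:
        if java_legacy:
            # The legacy Java encoding stores each 8-byte half in reversed order.
            data = data[:8][::-1] + data[8:][::-1]
        return uuid.UUID(bytes=data)
    # Any other subtype (or length) is treated as plain binary.
    return data


original = uuid.UUID("00112233-4455-6677-8899-aabbccddeeff")
legacy_bytes = original.bytes[:8][::-1] + original.bytes[8:][::-1]
print(decode_binary(UUID_STANDARD_SUBTYPE, original.bytes))
print(decode_binary(UUID_LEGACY_SUBTYPE, legacy_bytes))
```

Checking the subtype first avoids misreading an arbitrary 16-byte binary value as a UUID.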
https://docs.mongodb.com/manual/reference/bson-types/
MongoDB has a Symbol type, but the current version of the plugin does not support it.
Symbol values can be converted into strings.
Hello,
We exported our MongoDB 4.4 database with this plugin and got invalid data.
It seems all columns are exported as the string "1".
I found that the driver version is very old.
embulk-input-mongodb/build.gradle
Line 33 in c54dfd9
It does not support MongoDB 4.2 or later.
https://www.mongodb.com/docs/drivers/java/sync/current/compatibility/
I'd like to open a pull request to update the driver.
Perhaps this is not in the spirit of Embulk, but I would like to send an aggregate query that looks like the one below. Is this currently possible?
db.getCollection('users').aggregate([
  { $match: {
    "active_from": { "$lte": new Date() },
    $or: [
      { "active_until": { "$gt": new Date() } },
      { "active_until": null }
    ]
  } },
  { $group: { _id: "$account_id", total: { $sum: 1 } } }
]);
I'd like to migrate to using separated URI parameters, but I am missing the ability to specify some required settings.
Currently I can specify:
uri: mongodb://myuser:mypassword@localhost:27017/my_database?authMechanism=SCRAM-SHA-1&authSource=another_database
However, I can't do this with the separated parameters. Could you add the ability to specify a uri_querystring option or the specific parameters that I need?
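Until such an option exists, one possible workaround is to keep the settings separate in your own tooling and assemble the `uri` value before writing the config. The sketch below does that with the Python standard library. `build_mongodb_uri` is a hypothetical helper, not part of the plugin or of Embulk.

```python
from urllib.parse import quote_plus, urlencode


def build_mongodb_uri(host, port, database, user=None, password=None, options=None):
    """Assemble a mongodb:// URI from separate settings (hypothetical helper)."""
    credentials = ""
    if user is not None:
        # Percent-encode credentials so characters like '@' or ':' stay unambiguous.
        credentials = quote_plus(user)
        if password is not None:
            credentials += ":" + quote_plus(password)
        credentials += "@"
    uri = f"mongodb://{credentials}{host}:{port}/{database}"
    if options:
        # Options become the query string, e.g. authMechanism and authSource.
        uri += "?" + urlencode(options)
    return uri


print(build_mongodb_uri(
    "localhost", 27017, "my_database",
    user="myuser", password="mypassword",
    options={"authMechanism": "SCRAM-SHA-1", "authSource": "another_database"},
))
```

For the values shown, the helper reproduces the URI quoted in the issue.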
I would like to use limit() to reduce the number of results returned in a single query. Is that possible now? I don't think so.
I would be happy to contribute if I could get some pointers on where to start!
An error occurs when using the incremental_field option together with query.
org.embulk.exec.PartialExecutionException: org.embulk.config.ConfigException: Could not generate new query for incremental load.
at org.embulk.exec.BulkLoader$LoaderState.buildPartialExecuteException(BulkLoader.java:340)
at org.embulk.exec.BulkLoader.doRun(BulkLoader.java:566)
at org.embulk.exec.BulkLoader.access$000(BulkLoader.java:35)
at org.embulk.exec.BulkLoader$1.run(BulkLoader.java:353)
at org.embulk.exec.BulkLoader$1.run(BulkLoader.java:350)
at org.embulk.spi.Exec.doWith(Exec.java:22)
at org.embulk.exec.BulkLoader.run(BulkLoader.java:350)
at org.embulk.EmbulkEmbed.run(EmbulkEmbed.java:242)
at org.embulk.EmbulkRunner.runInternal(EmbulkRunner.java:291)
at org.embulk.EmbulkRunner.run(EmbulkRunner.java:155)
at org.embulk.cli.EmbulkRun.runSubcommand(EmbulkRun.java:431)
at org.embulk.cli.EmbulkRun.run(EmbulkRun.java:90)
at org.embulk.cli.Main.main(Main.java:64)
Suppressed: java.lang.NullPointerException
at org.embulk.exec.BulkLoader.doCleanup(BulkLoader.java:463)
at org.embulk.exec.BulkLoader$3.run(BulkLoader.java:397)
at org.embulk.exec.BulkLoader$3.run(BulkLoader.java:394)
at org.embulk.spi.Exec.doWith(Exec.java:22)
at org.embulk.exec.BulkLoader.cleanup(BulkLoader.java:394)
at org.embulk.EmbulkEmbed.run(EmbulkEmbed.java:245)
... 5 more
Caused by: org.embulk.config.ConfigException: Could not generate new query for incremental load.
at org.embulk.input.mongodb.MongodbInputPlugin.buildIncrementalCondition(MongodbInputPlugin.java:394)
at org.embulk.input.mongodb.MongodbInputPlugin.transaction(MongodbInputPlugin.java:86)
at org.embulk.exec.BulkLoader.doRun(BulkLoader.java:507)
... 11 more
Error: org.embulk.config.ConfigException: Could not generate new query for incremental load.
I want to query the _id field by timestamp, but I hit a 'query' parameter error.
The following is my YAML:
in:
  type: mongodb
  hosts:
    - {host: 192.168.99.100, port: 32768}
  database: test
  collection: "restaurants"
  query: '{ "_id": { $gt: ObjectId(Math.floor((new Date("2015/4/4"))/1000).toString(16) + "0000000000000000") } }'
Error: Invalid JSON string was given for 'query' parameter. [{ "_id": { $gt: ObjectId(Math.floor((new Date("2015/4/4"))/1000).toString(16) + "0000000000000000") } }]
Thanks for the help.
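The error message says the 'query' value must be a JSON string, and the expression above is mongo shell JavaScript, which a JSON parser cannot evaluate. A possible approach is to compute the ObjectId boundary ahead of time and put only the resulting literal into the query. The sketch below does this in Python. The helper `object_id_lower_bound` is hypothetical, and it is an assumption that the plugin accepts the Extended JSON `$oid` form. The date is interpreted as UTC here, whereas `new Date("2015/4/4")` in the shell uses local time.

```python
import json
from datetime import datetime, timezone


def object_id_lower_bound(moment: datetime) -> str:
    """Return the smallest 24-hex-digit ObjectId string for the given instant."""
    seconds = int(moment.timestamp())
    # An ObjectId starts with a 4-byte big-endian timestamp (8 hex digits);
    # the remaining 8 bytes are zero-filled here.
    return format(seconds, "08x") + "0" * 16


boundary = object_id_lower_bound(datetime(2015, 4, 4, tzinfo=timezone.utc))
# Assumption: the 'query' parameter understands {"$oid": "..."} for ObjectId values.
query = json.dumps({"_id": {"$gt": {"$oid": boundary}}})
print(query)
```

The printed string is plain JSON and can be pasted into the `query` setting in place of the JavaScript expression.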
I got this error.
org.embulk.exec.PartialExecutionException: org.bson.BsonInvalidOperationException: readName can only be called when State is NAME, not when State is VALUE.
at org.embulk.exec.BulkLoader$LoaderState.buildPartialExecuteException(org/embulk/exec/BulkLoader.java:363)
at org.embulk.exec.BulkLoader.doRun(org/embulk/exec/BulkLoader.java:572)
at org.embulk.exec.BulkLoader.access$000(org/embulk/exec/BulkLoader.java:33)
at org.embulk.exec.BulkLoader$1.run(org/embulk/exec/BulkLoader.java:374)
at org.embulk.exec.BulkLoader$1.run(org/embulk/exec/BulkLoader.java:370)
at org.embulk.spi.Exec.doWith(org/embulk/spi/Exec.java:25)
at org.embulk.exec.BulkLoader.run(org/embulk/exec/BulkLoader.java:370)
at org.embulk.EmbulkEmbed.run(org/embulk/EmbulkEmbed.java:180)
at java.lang.reflect.Method.invoke(java/lang/reflect/Method.java:606)
at RUBY.run(uri:classloader:/embulk/runner.rb:84)
at RUBY.run(uri:classloader:/embulk/command/embulk_run.rb:306)
at RUBY.<top>(uri:classloader:/embulk/command/embulk_main.rb:2)
at org.jruby.RubyKernel.require(org/jruby/RubyKernel.java:937)
at usr.local.bin.embulk.embulk.command.embulk_bundle.<top>(file:/usr/local/bin/embulk!/embulk/command/embulk_bundle.rb:30)
at java.lang.invoke.MethodHandle.invokeWithArguments(java/lang/invoke/MethodHandle.java:599)
at org.embulk.cli.Main.main(org/embulk/cli/Main.java:23)
Caused by: org.bson.BsonInvalidOperationException: readName can only be called when State is NAME, not when State is VALUE.
at org.bson.AbstractBsonReader.throwInvalidState(org/bson/AbstractBsonReader.java:634)
at org.bson.AbstractBsonReader.readName(org/bson/AbstractBsonReader.java:546)
at org.embulk.input.mongodb.ValueCodec.decode(org/embulk/input/mongodb/ValueCodec.java:52)
at org.embulk.input.mongodb.ValueCodec.decode(org/embulk/input/mongodb/ValueCodec.java:24)
at com.mongodb.connection.ReplyMessage.<init>(com/mongodb/connection/ReplyMessage.java:57)
at com.mongodb.connection.QueryProtocol.execute(com/mongodb/connection/QueryProtocol.java:305)
at com.mongodb.connection.QueryProtocol.execute(com/mongodb/connection/QueryProtocol.java:54)
at com.mongodb.connection.DefaultServer$DefaultServerProtocolExecutor.execute(com/mongodb/connection/DefaultServer.java:159)
at com.mongodb.connection.DefaultServerConnection.executeProtocol(com/mongodb/connection/DefaultServerConnection.java:286)
at com.mongodb.connection.DefaultServerConnection.query(com/mongodb/connection/DefaultServerConnection.java:209)
at com.mongodb.operation.FindOperation$1.call(com/mongodb/operation/FindOperation.java:496)
at com.mongodb.operation.FindOperation$1.call(com/mongodb/operation/FindOperation.java:482)
at com.mongodb.operation.OperationHelper.withConnectionSource(com/mongodb/operation/OperationHelper.java:239)
at com.mongodb.operation.OperationHelper.withConnection(com/mongodb/operation/OperationHelper.java:212)
at com.mongodb.operation.FindOperation.execute(com/mongodb/operation/FindOperation.java:482)
at com.mongodb.operation.FindOperation.execute(com/mongodb/operation/FindOperation.java:79)
at com.mongodb.Mongo.execute(com/mongodb/Mongo.java:772)
at com.mongodb.Mongo$2.execute(com/mongodb/Mongo.java:759)
at com.mongodb.OperationIterable.iterator(com/mongodb/OperationIterable.java:47)
at com.mongodb.FindIterableImpl.iterator(com/mongodb/FindIterableImpl.java:143)
at org.embulk.input.mongodb.MongodbInputPlugin.run(org/embulk/input/mongodb/MongodbInputPlugin.java:173)
at org.embulk.exec.LocalExecutorPlugin$ScatterExecutor.runInputTask(org/embulk/exec/LocalExecutorPlugin.java:294)
at org.embulk.exec.LocalExecutorPlugin$ScatterExecutor.access$000(org/embulk/exec/LocalExecutorPlugin.java:212)
at org.embulk.exec.LocalExecutorPlugin$ScatterExecutor$1.call(org/embulk/exec/LocalExecutorPlugin.java:257)
at org.embulk.exec.LocalExecutorPlugin$ScatterExecutor$1.call(org/embulk/exec/LocalExecutorPlugin.java:253)
at java.util.concurrent.FutureTask.run(java/util/concurrent/FutureTask.java:262)
at java.util.concurrent.ThreadPoolExecutor.runWorker(java/util/concurrent/ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(java/util/concurrent/ThreadPoolExecutor.java:615)
at java.lang.Thread.run(java/lang/Thread.java:745)
Error: org.bson.BsonInvalidOperationException: readName can only be called when State is NAME, not when State is VALUE.