
embulk-input-mongodb's Introduction

MongoDB input plugin for Embulk

Build Status

MongoDB input plugin for Embulk loads records from MongoDB. This plugin loads documents as single-column records (the column name is "record"). You can use filter plugins such as embulk-filter-expand_json or embulk-filter-add_time to convert the JSON column into typed columns. The rename filter is also useful for renaming the typed columns.

Overview

This plugin only works with embulk >= 0.8.8.

  • Plugin type: input
  • Guess supported: no

Configuration

  • Connection parameters (one of the following is required):

    • use MongoDB connection string URI
    • use separated URI parameters
      • hosts: list of hosts; each host is a pair of host (string, required) and port (integer, optional, default: 27017)
      • auth_method: auth method. One of scram-sha-1, mongodb-cr, auto (string, optional, default: null)
      • auth_source: auth source, i.e. the database name where the user is defined (string, optional, default: null)
      • user: (string, optional)
      • password: (string, optional)
      • database: (string, required)
      • tls: true to use TLS to connect to the host (boolean, optional, default: false)
      • tls_insecure: true to disable various certificate validations (boolean, optional, default: false)
  • collection: source collection name (string, required)

  • fields: (deprecated) array of hashes, each with the following two fields (array, required)
    • name: name of the column
    • type: column type, one of boolean, long, double, string, timestamp

  • id_field_name: name of the ObjectId field. Set this if you want to change the default name _id (string, optional, default: "_id")

  • query: a JSON document used for querying the source collection. Documents are loaded from the collection if they match this condition. (string, optional)

  • projection: a JSON document used for projecting query results. Only the fields selected by the projection are loaded. (string, optional)

  • sort: Ordering of results (string, optional)

  • aggregation: Aggregation query (string, optional) See Aggregation query for more detail.

  • batch_size: Limits the number of objects returned in one batch (integer, optional, default: 10000)

  • incremental_field: list of field names (list, optional; cannot be used with the sort option)

  • last_record: last loaded record, used for incremental loading (hash, optional)

  • stop_on_invalid_record: stop the bulk load transaction if a document includes an invalid record (such as an unsupported object type) (boolean, optional, default: false)

  • json_column_name: column name used in outputs (string, optional, default: "record")

Example

Authentication

Use separated URI parameters

in:
  type: mongodb
  hosts:
  - {host: localhost, port: 27017}
  user: myuser
  password: mypassword
  database: my_database
  auth_method: scram-sha-1
  auth_source: auth_db
  collection: "my_collection"

If you set auth_method: auto, the client negotiates the best mechanism based on the version of the server it is authenticating to.

If the server version is 3.0 or higher, the driver will authenticate using the SCRAM-SHA-1 mechanism.

Otherwise, the driver will authenticate using the MONGODB_CR mechanism.

Use URI String

in:
  type: mongodb
  uri: mongodb://myuser:mypassword@localhost:27017/my_database?authMechanism=SCRAM-SHA-1&authSource=another_database

Exporting all objects

Specify with a MongoDB connection string URI.

in:
  type: mongodb
  uri: mongodb://myuser:mypassword@localhost:27017/my_database
  collection: "my_collection"

Specify with separated URI parameters.

in:
  type: mongodb
  hosts:
  - {host: localhost, port: 27017}
  - {host: example.com, port: 27017}
  user: myuser
  password: mypassword
  database: my_database
  collection: "my_collection"

Filtering documents by query and projection

in:
  type: mongodb
  uri: mongodb://myuser:mypassword@localhost:27017/my_database
  collection: "my_collection"
  query: '{ field1: { $gte: 3 } }'
  projection: '{ "_id": 1, "field1": 1, "field2": 0 }'
  sort: '{ "field1": 1 }'

Incremental loading

in:
  type: mongodb
  uri: mongodb://myuser:mypassword@localhost:27017/my_database
  collection: "my_collection"
  query: '{ field1: { $gt: 3 } }'
  projection: '{ "_id": 1, "field1": 1, "field2": 1 }'
  incremental_field:
    - "field2"
  last_record: {"field2": 13215}

The plugin will generate a new query and sort value from these options. You can't use the incremental_field option together with the sort option.

query { field1: { $gt: 3 }, field2: { $gt: 13215 } }
sort  { "field2": 1 }  # field2 ascending
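The merge can be sketched in Python. This is a hypothetical helper, not the plugin's actual implementation (the real logic lives in MongodbInputPlugin.buildIncrementalCondition), and it assumes the query is strict JSON:

```python
import json

def build_incremental_query(query, incremental_fields, last_record):
    # Merge the user query with a {"$gt": last_value} clause per
    # incremental field, and sort ascending on those fields.
    # Hypothetical sketch of the plugin's behavior, not its real code.
    merged = json.loads(query) if query else {}
    for field in incremental_fields:
        merged[field] = {"$gt": last_record[field]}
    sort = {field: 1 for field in incremental_fields}  # 1 = ascending
    return merged, sort

query, sort = build_incremental_query(
    '{"field1": {"$gt": 3}}', ["field2"], {"field2": 13215})
print(query)  # {'field1': {'$gt': 3}, 'field2': {'$gt': 13215}}
print(sort)   # {'field2': 1}
```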

You have to specify last_record in the extended JSON form when the field type is ObjectId or DateTime.

# ObjectId field
in:
  type: mongodb
  incremental_field:
    - "_id"
  last_record: {"_id": {"$oid": "5739b2261c21e58edfe39716"}}

# DateTime field
in:
  type: mongodb
  incremental_field:
    - "time_field"
  last_record: {"time_field": {"$date": "2015-01-25T13:23:15.000Z"}}
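The $oid and $date wrappers above follow MongoDB's extended JSON form. A minimal Python sketch of producing them, assuming values are either datetime objects or 24-character ObjectId hex strings (the helper name is made up for illustration):

```python
from datetime import datetime, timezone

def to_extended_json_value(value):
    # Wrap datetimes as {"$date": ...} and ObjectId-like hex strings
    # as {"$oid": ...}; anything else passes through unchanged.
    # Hypothetical helper for building last_record values.
    if isinstance(value, datetime):
        return {"$date": value.strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3] + "Z"}
    if isinstance(value, str) and len(value) == 24 \
            and all(c in "0123456789abcdef" for c in value.lower()):
        return {"$oid": value}
    return value

print(to_extended_json_value(datetime(2015, 1, 25, 13, 23, 15, tzinfo=timezone.utc)))
# {'$date': '2015-01-25T13:23:15.000Z'}
print(to_extended_json_value("5739b2261c21e58edfe39716"))
# {'$oid': '5739b2261c21e58edfe39716'}
```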

Run incremental load

$ embulk run /path/to/config.yml -c config-diff.yml

Aggregation query

This plugin supports aggregation queries. You can write a complex query like the one below.

The aggregation option can't be used with the sort, limit, skip, or query options. Incremental loading also doesn't work with aggregation queries.

in:
  type: mongodb
  aggregation: { $match: {"int32_field": {"$gte": 5}} }

See also Aggregation and Aggregation Pipeline Stages in the MongoDB Manual.

Advanced usage with filter plugins

in:
  type: mongodb
  uri: mongodb://myuser:mypassword@localhost:27017/my_database
  collection: "my_collection"
  query: '{ "age": { $gte: 3 } }'
  projection: '{ "_id": 1, "age": 1, "ts": 1, "firstName": 1, "lastName": 1 }'

filters:
  # convert json column into typed columns
  - type: expand_json
    json_column_name: record
    expanded_columns:
      - {name: _id, type: long}
      - {name: ts, type: string}
      - {name: firstName, type: string}
      - {name: lastName, type: string}

  # rename column names
  - type: rename
    columns:
      _id: id
      firstName: first_name
      lastName: last_name

  # convert string "ts" column into timestamp "time" column
  - type: add_time
    from_column:
      name: ts
      timestamp_format: "%Y-%m-%dT%H:%M:%S.%N%z"
    to_column:
      name: time
      type: timestamp

Build

$ ./gradlew gem

Test

First install Docker and Docker Compose, then run docker-compose up -d so that a MongoDB server is launched locally. You can then run the tests with ./gradlew test.

$ docker-compose up -d
Creating embulk-input-mongodb_server ... done
Creating mongo-express               ... done
Creating mongoClientTemp             ... done

$ docker-compose ps
           Name                          Command                 State                            Ports
------------------------------------------------------------------------------------------------------------------------------
embulk-input-mongodb_server   docker-entrypoint.sh mongod      Up           0.0.0.0:27017->27017/tcp, 0.0.0.0:27018->27018/tcp
mongo-express                 tini -- /docker-entrypoint ...   Up           0.0.0.0:8081->8081/tcp
mongoClientTemp               docker-entrypoint.sh mongo ...   Restarting

$ ./gradlew test  # -t to watch change of files and rebuild continuously

embulk-input-mongodb's People

Contributors

amaya382, dmikurube, hakobera, kamatama41, legiangthanh, sakama, shoken0x


embulk-input-mongodb's Issues

Incremental loading resets last_record field in the config file if no new record is uploaded

Hi,

When I use incremental loading as specified in the README, and there have been new records (as specified in incremental_field) since the last run, the plugin correctly uploads only the new records and writes the last record to the config file. However, if there are no new records since the last run, the plugin correctly uploads nothing, but it also empties the last_record field of the config file. This causes problems, as the next run of Embulk will upload everything again. Just wondering if this is a bug or intended behavior, and whether there are any workarounds. Thanks! @friendofasquid

error when using query with incremental load

An error occurs when using the incremental_field option together with query.

org.embulk.exec.PartialExecutionException: org.embulk.config.ConfigException: Could not generate new query for incremental load.
        at org.embulk.exec.BulkLoader$LoaderState.buildPartialExecuteException(BulkLoader.java:340)
        at org.embulk.exec.BulkLoader.doRun(BulkLoader.java:566)
        at org.embulk.exec.BulkLoader.access$000(BulkLoader.java:35)
        at org.embulk.exec.BulkLoader$1.run(BulkLoader.java:353)
        at org.embulk.exec.BulkLoader$1.run(BulkLoader.java:350)
        at org.embulk.spi.Exec.doWith(Exec.java:22)
        at org.embulk.exec.BulkLoader.run(BulkLoader.java:350)
        at org.embulk.EmbulkEmbed.run(EmbulkEmbed.java:242)
        at org.embulk.EmbulkRunner.runInternal(EmbulkRunner.java:291)
        at org.embulk.EmbulkRunner.run(EmbulkRunner.java:155)
        at org.embulk.cli.EmbulkRun.runSubcommand(EmbulkRun.java:431)
        at org.embulk.cli.EmbulkRun.run(EmbulkRun.java:90)
        at org.embulk.cli.Main.main(Main.java:64)
        Suppressed: java.lang.NullPointerException
                at org.embulk.exec.BulkLoader.doCleanup(BulkLoader.java:463)
                at org.embulk.exec.BulkLoader$3.run(BulkLoader.java:397)
                at org.embulk.exec.BulkLoader$3.run(BulkLoader.java:394)
                at org.embulk.spi.Exec.doWith(Exec.java:22)
                at org.embulk.exec.BulkLoader.cleanup(BulkLoader.java:394)
                at org.embulk.EmbulkEmbed.run(EmbulkEmbed.java:245)
                ... 5 more
Caused by: org.embulk.config.ConfigException: Could not generate new query for incremental load.
        at org.embulk.input.mongodb.MongodbInputPlugin.buildIncrementalCondition(MongodbInputPlugin.java:394)
        at org.embulk.input.mongodb.MongodbInputPlugin.transaction(MongodbInputPlugin.java:86)
        at org.embulk.exec.BulkLoader.doRun(BulkLoader.java:507)
        ... 11 more

Error: org.embulk.config.ConfigException: Could not generate new query for incremental load.

how to query _id with timestamp

I want to query _id field with timestamp, but hit 'query' parameter error.

following is my yaml:

in:
  type: mongodb
  hosts:
  - {host: 192.168.99.100, port: 32768}
  database: test
  collection: "restaurants"
  query: '{ "_id": { $gt: ObjectId(Math.floor((new Date("2015/4/4"))/1000).toString(16) + "0000000000000000") } }'
Error: Invalid JSON string was given for 'query' parameter. [{ "_id": { $gt: ObjectId(Math.floor((new Date("2015/4/4"))/1000).toString(16) + "0000000000000000") } }]

Thanks for the help.
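The query option takes plain JSON, so shell JavaScript such as ObjectId(...) is rejected. One workaround is to precompute the ObjectId boundary outside MongoDB and pass it in extended JSON form. A Python sketch (the helper name is made up for illustration):

```python
from datetime import datetime, timezone

def objectid_lower_bound(dt):
    # The first 4 bytes of an ObjectId are the creation time in epoch
    # seconds, so the smallest ObjectId for a given time is that
    # timestamp in hex followed by 16 zero hex digits.
    return format(int(dt.timestamp()), "08x") + "0" * 16

oid = objectid_lower_bound(datetime(2015, 4, 4, tzinfo=timezone.utc))
print('{"_id": {"$gt": {"$oid": "%s"}}}' % oid)
# {"_id": {"$gt": {"$oid": "551f29800000000000000000"}}}
```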

Run an aggregate query

Perhaps not in the spirit of Embulk, but I would like to send an aggregate query like the one below. Is this currently possible?

db.getCollection('users').aggregate([
    { $match: {
        "active_from":{"$lte":new Date() },
        $or: [
            { "active_until":{"$gt":new Date() }},
            { "active_until": null }
        ]
    } },
    { $group: {_id: "$account_id", total: {$sum: 1} } }
]);

Issue decoding UUID

Hello,

I've been trying to use this source without success for UUID fields.

Managed to make it work by trying to decode the binary as a UUID before decoding it as plain binary:

[screenshot of the workaround]

Not a mongo expert, but maybe there is a better way to actually identify the type as UUID and use the codec as codec from the codec registry.

Anyone had any similar issue?

Thank you!
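One possible direction, sketched in Python: check the BSON binary subtype and decode 16-byte payloads of the UUID subtypes with the standard uuid module. This is an approximation only; subtype 3 is the legacy UUID subtype, and its byte order depends on the driver that wrote it:

```python
import uuid

def binary_to_uuid(data, subtype):
    # Subtype 4 is the standard UUID binary subtype; subtype 3 is the
    # legacy one (byte order is driver-dependent, assumed big-endian
    # here). Other subtypes fall back to a hex string.
    if subtype in (3, 4) and len(data) == 16:
        return str(uuid.UUID(bytes=data))
    return data.hex()

print(binary_to_uuid(bytes(range(16)), 4))
# 00010203-0405-0607-0809-0a0b0c0d0e0f
```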

Support JSON type

This is a BREAKING CHANGE.
Works only with embulk >= 0.8.7.

For the first release, just support the JSON column type.

Source data

{ "_id": "55eae883689a08361045d64a", "name": "obj1", "rank": 1, "value": 1.1, "created_at": { "$date" : 1441533998786 }, "embeded": { "key": "value1" } }
{ "_id": "55eae883689a08361045d64b", "name": "obj2", "rank": 2, "value": 2.2, "created_at": { "$date" : 1441533988786 }, "embeded": { "key": "value2" } }
{ "_id": "55eae883689a08361045d64c", "name": "obj3", "rank": 3, "value": 3.3, "created_at": { "$date" : 1441533978786 }, "embeded": { "key": "value3", "key2": "value3-2", "key3": ["v1", "v2"]} }
{ "_id": "55eae883689a08361045d64d", "name": "obj4", "rank": 4, "value": 4.4, "created_at": { "$date" : 1441533968786 }, "embeded": { "key": "value4" } }
{ "_id": "55eae883689a08361045d64e", "name": "obj5", "rank": 5, "value": 5.5, "created_at": { "$date" : 1441533958786 }, "embeded": { "key": "value5" } }
{ "_id": "55eae883689a08361045d64f", "name": "obj6", "rank": 6, "value": 6.6, "created_at": { "$date" : 1441533948786 }, "embeded": { "key": "value6" } }
{ "_id": "55eae883689a08361045d650", "name": "obj7", "rank": 7, "value": 7.7, "created_at": { "$date" : 1441533938786 }, "embeded": { "key": "value7" } }
{ "_id": "55eae883689a08361045d651", "name": "obj8", "rank": 8, "value": 8.8, "created_at": { "$date" : 1441533928786 }, "embeded": { "key": "value8" } }
{ "_id": "55eae883689a08361045d652", "name": "obj9", "rank": 9, "value": 9.9, "created_at": { "$date" : 1441533918786 }, "embeded": { "key": "value9" } }

embulk config

in:
  type: mongodb
  uri: mongodb://localhost:27017/my_database
  collection: "my_collection"
  fields:
    - { name: id, type: string }
    - { name: name, type: string }
    - { name: rank, type: long }
    - { name: value, type: double }
    - { name: created_at, type: timestamp }
    - { name: embeded, type: json }
  query: '{ rank: { $gte: 3 } }'
  sort: '{ rank: -1 }'
  batch_size: 100
out:
  type: file
  path_prefix: ./tmp/full
  file_ext: csv
  formatter:
    type: csv
    header_line: true
    charset: UTF-8
    newline: CRLF

Output

id,name,rank,value,created_at,embeded
55eae883689a08361045d652,obj9,9,9.9,2015-09-06 10:05:18.786000 +0000,"{""key"":""value9""}"
55eae883689a08361045d651,obj8,8,8.8,2015-09-06 10:05:28.786000 +0000,"{""key"":""value8""}"
55eae883689a08361045d650,obj7,7,7.7,2015-09-06 10:05:38.786000 +0000,"{""key"":""value7""}"
55eae883689a08361045d64f,obj6,6,6.6,2015-09-06 10:05:48.786000 +0000,"{""key"":""value6""}"
55eae883689a08361045d64e,obj5,5,5.5,2015-09-06 10:05:58.786000 +0000,"{""key"":""value5""}"
55eae883689a08361045d64d,obj4,4,4.4,2015-09-06 10:06:08.786000 +0000,"{""key"":""value4""}"
55eae883689a08361045d64c,obj3,3,3.3,2015-09-06 10:06:18.786000 +0000,"{""key2"":""value3-2"",""key3"":[""v1"",""v2""],""key"":""value3""}"

Allow limiting resultset returned

I would like to use limit() to reduce the number of results returned in a single query. Is that possible now? I don't think so.

Would be happy to help contribute, if I could have some help on where to start!

readName can only be called when State is NAME, not when State is VALUE.

I got this error.

org.embulk.exec.PartialExecutionException: org.bson.BsonInvalidOperationException: readName can only be called when State is NAME, not when State is VALUE.
    at org.embulk.exec.BulkLoader$LoaderState.buildPartialExecuteException(org/embulk/exec/BulkLoader.java:363)
    at org.embulk.exec.BulkLoader.doRun(org/embulk/exec/BulkLoader.java:572)
    at org.embulk.exec.BulkLoader.access$000(org/embulk/exec/BulkLoader.java:33)
    at org.embulk.exec.BulkLoader$1.run(org/embulk/exec/BulkLoader.java:374)
    at org.embulk.exec.BulkLoader$1.run(org/embulk/exec/BulkLoader.java:370)
    at org.embulk.spi.Exec.doWith(org/embulk/spi/Exec.java:25)
    at org.embulk.exec.BulkLoader.run(org/embulk/exec/BulkLoader.java:370)
    at org.embulk.EmbulkEmbed.run(org/embulk/EmbulkEmbed.java:180)
    at java.lang.reflect.Method.invoke(java/lang/reflect/Method.java:606)
    at RUBY.run(uri:classloader:/embulk/runner.rb:84)
    at RUBY.run(uri:classloader:/embulk/command/embulk_run.rb:306)
    at RUBY.<top>(uri:classloader:/embulk/command/embulk_main.rb:2)
    at org.jruby.RubyKernel.require(org/jruby/RubyKernel.java:937)
    at usr.local.bin.embulk.embulk.command.embulk_bundle.<top>(file:/usr/local/bin/embulk!/embulk/command/embulk_bundle.rb:30)
    at java.lang.invoke.MethodHandle.invokeWithArguments(java/lang/invoke/MethodHandle.java:599)
    at org.embulk.cli.Main.main(org/embulk/cli/Main.java:23)
Caused by: org.bson.BsonInvalidOperationException: readName can only be called when State is NAME, not when State is VALUE.
    at org.bson.AbstractBsonReader.throwInvalidState(org/bson/AbstractBsonReader.java:634)
    at org.bson.AbstractBsonReader.readName(org/bson/AbstractBsonReader.java:546)
    at org.embulk.input.mongodb.ValueCodec.decode(org/embulk/input/mongodb/ValueCodec.java:52)
    at org.embulk.input.mongodb.ValueCodec.decode(org/embulk/input/mongodb/ValueCodec.java:24)
    at com.mongodb.connection.ReplyMessage.<init>(com/mongodb/connection/ReplyMessage.java:57)
    at com.mongodb.connection.QueryProtocol.execute(com/mongodb/connection/QueryProtocol.java:305)
    at com.mongodb.connection.QueryProtocol.execute(com/mongodb/connection/QueryProtocol.java:54)
    at com.mongodb.connection.DefaultServer$DefaultServerProtocolExecutor.execute(com/mongodb/connection/DefaultServer.java:159)
    at com.mongodb.connection.DefaultServerConnection.executeProtocol(com/mongodb/connection/DefaultServerConnection.java:286)
    at com.mongodb.connection.DefaultServerConnection.query(com/mongodb/connection/DefaultServerConnection.java:209)
    at com.mongodb.operation.FindOperation$1.call(com/mongodb/operation/FindOperation.java:496)
    at com.mongodb.operation.FindOperation$1.call(com/mongodb/operation/FindOperation.java:482)
    at com.mongodb.operation.OperationHelper.withConnectionSource(com/mongodb/operation/OperationHelper.java:239)
    at com.mongodb.operation.OperationHelper.withConnection(com/mongodb/operation/OperationHelper.java:212)
    at com.mongodb.operation.FindOperation.execute(com/mongodb/operation/FindOperation.java:482)
    at com.mongodb.operation.FindOperation.execute(com/mongodb/operation/FindOperation.java:79)
    at com.mongodb.Mongo.execute(com/mongodb/Mongo.java:772)
    at com.mongodb.Mongo$2.execute(com/mongodb/Mongo.java:759)
    at com.mongodb.OperationIterable.iterator(com/mongodb/OperationIterable.java:47)
    at com.mongodb.FindIterableImpl.iterator(com/mongodb/FindIterableImpl.java:143)
    at org.embulk.input.mongodb.MongodbInputPlugin.run(org/embulk/input/mongodb/MongodbInputPlugin.java:173)
    at org.embulk.exec.LocalExecutorPlugin$ScatterExecutor.runInputTask(org/embulk/exec/LocalExecutorPlugin.java:294)
    at org.embulk.exec.LocalExecutorPlugin$ScatterExecutor.access$000(org/embulk/exec/LocalExecutorPlugin.java:212)
    at org.embulk.exec.LocalExecutorPlugin$ScatterExecutor$1.call(org/embulk/exec/LocalExecutorPlugin.java:257)
    at org.embulk.exec.LocalExecutorPlugin$ScatterExecutor$1.call(org/embulk/exec/LocalExecutorPlugin.java:253)
    at java.util.concurrent.FutureTask.run(java/util/concurrent/FutureTask.java:262)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(java/util/concurrent/ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(java/util/concurrent/ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(java/lang/Thread.java:745)

Error: org.bson.BsonInvalidOperationException: readName can only be called when State is NAME, not when State is VALUE.

Support specifying authSource and authMechanism

I'd like to migrate to using separated URI parameters, but I am missing the ability to specify some required settings.

Currently I can specify:

uri: mongodb://myuser:mypassword@localhost:27017/my_database?authMechanism=SCRAM-SHA-1&authSource=another_database

However, I can't do this with the separated parameters. Could you add the ability to specify uri_querystring or the specific parameters that I need?
