
elasticsearch-river-rethinkdb's Issues

River does not restart after crashing/erroring

We have a RethinkDB cluster that is read into an Elasticsearch cluster using this plugin. Recently, when we updated RethinkDB (which caused downtime on RethinkDB), the plugin failed to find the tables and bailed out, as shown in the logs below.

As a result, the documents in ES and RethinkDB went out of sync, and we only noticed that the plugin had completely bailed out when we went into the logs. Restarting the ES node where the plugin was running fixes this, but it would be great if the plugin attempted to restart after a failure, as ES itself does.

Logs:

[2015-04-07 11:56:48,630][INFO ][river.rethinkdb.feedworker] [cortex.layouts] Synced 450 documents
[2015-04-07 18:14:03,286][INFO ][river.rethinkdb.feedworker] [cortex.layouts] Synced 460 documents
[2015-04-08 04:02:28,360][INFO ][river.rethinkdb.feedworker] [cortex.layouts] Synced 470 documents
[2015-04-08 06:19:23,834][INFO ][river.rethinkdb.feedworker] [cortex.layouts] Synced 480 documents
[2015-04-08 12:13:20,714][INFO ][river.rethinkdb.feedworker] [cortex.layouts] Synced 490 documents
[2015-04-08 12:29:34,956][INFO ][cluster.metadata         ] [cortex-elasticsearch2] [cortex] update_mapping [layouts] (dynamic)
[2015-04-08 13:55:32,769][INFO ][cluster.metadata         ] [cortex-elasticsearch2] [cortex] update_mapping [layouts] (dynamic)
[2015-04-08 13:58:30,165][INFO ][river.rethinkdb.feedworker] [cortex.layouts] Synced 500 documents
[2015-04-08 22:07:51,018][WARN ][monitor.jvm              ] [cortex-elasticsearch2] [gc][young][4331504][511] duration [1.4s], collections [1]/[2.4s], total [1.4s]/[2.5m], memory [567.9mb]->[40mb]/[3.9gb], all_pools {[young] [531.5mb]->[3.5mb]/[532.5mb]}{[survivor] [1.4mb]->[1.2mb]/[66.5mb]}{[old] [34.8mb]->[35.3mb]/[3.3gb]}
[2015-04-14 19:56:46,340][INFO ][river.rethinkdb.feedworker] [cortex.layouts] Synced 510 documents
[2015-04-14 22:28:37,068][INFO ][river.rethinkdb.feedworker] [cortex.layouts] Synced 520 documents
[2015-04-18 14:15:29,601][WARN ][monitor.jvm              ] [cortex-elasticsearch2] [gc][young][5166964][660] duration [1.2s], collections [1]/[1.5s], total [1.2s]/[2.6m], memory [569.6mb]->[43.2mb]/[3.9gb], all_pools {[young] [532.5mb]->[6.2mb]/[532.5mb]}{[survivor] [31.1kb]->[33kb]/[66.5mb]}{[old] [37mb]->[37mb]/[3.3gb]}
[2015-04-20 18:56:07,735][INFO ][river.rethinkdb.feedworker] [cortex.layouts] Synced 530 documents
[2015-04-28 15:57:42,071][ERROR][river.rethinkdb.feedworker] [cortex.symlinks] Worker has a problem: RUNTIME_ERROR: Changefeed aborted (table unavailable).
[2015-04-28 15:57:42,075][INFO ][river.rethinkdb.feedworker] [cortex.symlinks] This probably isn't recoverable, bailing.
[2015-04-28 15:57:42,075][ERROR][river.rethinkdb.feedworker] [cortex.symlinks] failed due to exception
com.rethinkdb.RethinkDBException: RUNTIME_ERROR: Changefeed aborted (table unavailable).
    at com.rethinkdb.response.DBResultFactory.convert(DBResultFactory.java:25)
    at com.rethinkdb.Cursor.loadNextBatch(Cursor.java:62)
    at com.rethinkdb.Cursor.next(Cursor.java:85)
    at org.elasticsearch.river.rethinkdb.FeedWorker.run(FeedWorker.java:77)
    at java.lang.Thread.run(Thread.java:744)
[2015-04-28 15:57:42,075][INFO ][river.rethinkdb.feedworker] [cortex.symlinks] thread shutting down
[2015-04-28 15:57:42,075][ERROR][river.rethinkdb.feedworker] [cortex.templates] Worker has a problem: RUNTIME_ERROR: Changefeed aborted (table unavailable).
[2015-04-28 15:57:42,078][ERROR][river.rethinkdb.feedworker] [cortex.layouts] Worker has a problem: RUNTIME_ERROR: Changefeed aborted (table unavailable).
[2015-04-28 15:57:42,080][INFO ][river.rethinkdb.feedworker] [cortex.layouts] This probably isn't recoverable, bailing.
[2015-04-28 15:57:42,080][INFO ][river.rethinkdb.feedworker] [cortex.templates] This probably isn't recoverable, bailing.
[2015-04-28 15:57:42,080][ERROR][river.rethinkdb.feedworker] [cortex.layouts] failed due to exception
com.rethinkdb.RethinkDBException: RUNTIME_ERROR: Changefeed aborted (table unavailable).
    at com.rethinkdb.response.DBResultFactory.convert(DBResultFactory.java:25)
    at com.rethinkdb.Cursor.loadNextBatch(Cursor.java:62)
    at com.rethinkdb.Cursor.next(Cursor.java:85)
    at org.elasticsearch.river.rethinkdb.FeedWorker.run(FeedWorker.java:77)
    at java.lang.Thread.run(Thread.java:744)
[2015-04-28 15:57:42,080][INFO ][river.rethinkdb.feedworker] [cortex.layouts] thread shutting down
[2015-04-28 15:57:42,080][ERROR][river.rethinkdb.feedworker] [cortex.templates] failed due to exception
com.rethinkdb.RethinkDBException: RUNTIME_ERROR: Changefeed aborted (table unavailable).
    at com.rethinkdb.response.DBResultFactory.convert(DBResultFactory.java:25)
    at com.rethinkdb.Cursor.loadNextBatch(Cursor.java:62)
    at com.rethinkdb.Cursor.next(Cursor.java:85)
    at org.elasticsearch.river.rethinkdb.FeedWorker.run(FeedWorker.java:77)
    at java.lang.Thread.run(Thread.java:744)
[2015-04-28 15:57:42,081][INFO ][river.rethinkdb.feedworker] [cortex.templates] thread shutting down
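The restart behaviour requested in this issue could be approximated by wrapping each feed worker in a supervisor loop with exponential backoff instead of letting the thread exit. The sketch below is a minimal illustration, not the plugin's code: `Restarter` and `runWithRestarts` are hypothetical names, and it assumes the worker is a `Runnable` that fails with an unchecked exception when the changefeed aborts (the stack trace suggests `RethinkDBException` surfaces from `Cursor.next` that way). Note that documents changed while the feed was down would still be missed unless the backfill is re-run after a restart.

```java
import java.util.concurrent.TimeUnit;

public final class Restarter {
    private Restarter() {}

    /**
     * Hypothetical supervisor: runs worker and restarts it with exponential
     * backoff whenever it throws. Returns how many restarts were needed before
     * the worker returned normally; rethrows the last failure once
     * maxRestarts restarts have been used up.
     */
    public static int runWithRestarts(Runnable worker, long initialDelayMs,
                                      long maxDelayMs, int maxRestarts)
            throws InterruptedException {
        long delay = initialDelayMs;
        for (int restarts = 0; ; restarts++) {
            try {
                worker.run();
                return restarts; // clean exit: stop supervising
            } catch (RuntimeException e) {
                if (restarts >= maxRestarts) {
                    throw e; // give up and let the caller log the failure
                }
                System.err.println("worker failed: " + e.getMessage()
                        + "; restarting in " + delay + " ms");
                TimeUnit.MILLISECONDS.sleep(delay);
                // Double the wait each time, capped at maxDelayMs.
                delay = Math.min(delay * 2, maxDelayMs);
            }
        }
    }
}
```

A feed thread would then call something like `Restarter.runWithRestarts(worker, 1000, 60000, Integer.MAX_VALUE)` rather than running the worker directly; capping the delay keeps reconnect attempts frequent enough to recover shortly after RethinkDB comes back.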

Incorrect Authorization Key

I get an authorization error even with the correct password. I double-checked this; could it be parsed incorrectly somewhere? With no password, it works fine.

[2014-10-04 15:20:46,170][ERROR][river.rethinkdb.feedworker] [InkOverFlow.Posts] failed due to exception
com.rethinkdb.RethinkDBException: ERROR: Incorrect authorization key.

    at com.rethinkdb.RethinkDBConnection.reconnect(RethinkDBConnection.java:56)
    at com.rethinkdb.RethinkDBConnection.<init>(RethinkDBConnection.java:45)
    at com.rethinkdb.RethinkDBConnection.<init>(RethinkDBConnection.java:37)
    at com.rethinkdb.RethinkDB.connect(RethinkDB.java:66)
    at org.elasticsearch.river.rethinkdb.FeedWorker.connect(FeedWorker.java:41)
    at org.elasticsearch.river.rethinkdb.FeedWorker.run(FeedWorker.java:64)
    at java.lang.Thread.run(Thread.java:745)
[2014-10-04 15:20:46,170][ERROR][river.rethinkdb.feedworker] [InkOverFlow.Users] failed due to exception
com.rethinkdb.RethinkDBException: ERROR: Incorrect authorization key.

    at com.rethinkdb.RethinkDBConnection.reconnect(RethinkDBConnection.java:56)
    at com.rethinkdb.RethinkDBConnection.<init>(RethinkDBConnection.java:45)
    at com.rethinkdb.RethinkDBConnection.<init>(RethinkDBConnection.java:37)
    at com.rethinkdb.RethinkDB.connect(RethinkDB.java:66)
    at org.elasticsearch.river.rethinkdb.FeedWorker.connect(FeedWorker.java:41)
    at org.elasticsearch.river.rethinkdb.FeedWorker.run(FeedWorker.java:64)
    at java.lang.Thread.run(Thread.java:745)
[2014-10-04 15:20:46,174][INFO ][river.rethinkdb.feedworker] [InkOverFlow.Posts] thread shutting down
[2014-10-04 15:20:46,175][INFO ][river.rethinkdb.feedworker] [InkOverFlow.Users] thread shutting down
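One way to check whether the key is being mangled before it reaches the server is to build the handshake bytes yourself and compare them with what you expect. The sketch below assumes the older V0_2 handshake described in RethinkDB's `ql2.proto` (magic number, then the key length as a 4-byte little-endian integer, then the key as ASCII); the magic constant is quoted from that file and should be double-checked, and the driver bundled with this plugin may use a different protocol version. `AuthHandshake` is a hypothetical name.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.charset.StandardCharsets;

public final class AuthHandshake {
    private AuthHandshake() {}

    // V0_2 magic number as listed in RethinkDB's ql2.proto (assumption: verify
    // against the protocol version your driver actually speaks).
    static final int V0_2 = 0x723081e1;

    /** Builds: magic (4 bytes LE) + key length (4 bytes LE) + key bytes (ASCII). */
    public static byte[] build(String authKey) {
        byte[] key = authKey.getBytes(StandardCharsets.US_ASCII);
        ByteBuffer buf = ByteBuffer.allocate(8 + key.length).order(ByteOrder.LITTLE_ENDIAN);
        buf.putInt(V0_2);       // protocol version magic
        buf.putInt(key.length); // length prefix: stray whitespace in the key changes this
        buf.put(key);           // the key itself
        return buf.array();
    }
}
```

Comparing the length prefix for the configured `auth_key` against the real key length is a quick way to spot, for example, a trailing newline picked up from the river configuration (a hypothesis, not something the logs confirm).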

Crash: java.lang.Double cannot be cast to java.lang.String

Hi,

I am seeing a crash of the plugin in the Elasticsearch logs:

==> /var/log/elasticsearch/elasticsearch.log <==
[2014-11-26 01:04:34,074][INFO ][node                     ] [elasticsearch1] initialized
[2014-11-26 01:04:34,076][INFO ][node                     ] [elasticsearch1] starting ...
[2014-11-26 01:04:34,085][INFO ][transport                ] [elasticsearch1] bound_address {local[1]}, publish_address {local[1]}
[2014-11-26 01:04:34,132][INFO ][discovery                ] [elasticsearch1] elasticsearch/CQXRqkhGSzew7d72s5HpPA
[2014-11-26 01:04:34,135][INFO ][cluster.service          ] [elasticsearch1] master {new [elasticsearch1][CQXRqkhGSzew7d72s5HpPA][es1][local[1]]{local=true}}, removed {[elasticsearch1][VvEi9KvjTzilxNsqceBiwQ][es1][local[1]]{local=true},}, reason: local-disco-initial_connect(master)
[2014-11-26 01:04:34,401][INFO ][http                     ] [elasticsearch1] bound_address {inet[/0:0:0:0:0:0:0:0:8000]}, publish_address {inet[/128.199.226.132:8000]}
[2014-11-26 01:04:34,401][INFO ][node                     ] [elasticsearch1] started
[2014-11-26 01:04:35,941][INFO ][gateway                  ] [elasticsearch1] recovered [3] indices into cluster_state
[2014-11-26 01:04:36,900][INFO ][river.rethinkdb          ] [elasticsearch1] [rethinkdb][rethinkdb] ChangeRecords: ChangeRecords({twitter={samples=ChangeRecord(twitter,samples,backfill,}})
[2014-11-26 01:04:36,908][INFO ][river.rethinkdb          ] [elasticsearch1] [rethinkdb][rethinkdb] Starting up RethinkDB River for 10.130.204.52:8000
[2014-11-26 01:04:36,916][INFO ][river.rethinkdb          ] [elasticsearch1] [rethinkdb][rethinkdb] Starting feed watcher for twitter.samples
[2014-11-26 01:04:37,805][INFO ][river.rethinkdb.feedworker] [twitter.samples] Beginning backfill of documents
[2014-11-26 01:04:38,385][ERROR][river.rethinkdb.feedworker] [twitter.samples] failed due to exception
java.lang.ClassCastException: java.lang.Double cannot be cast to java.lang.String
    at org.elasticsearch.river.rethinkdb.FeedWorker.backfill(FeedWorker.java:151)
    at org.elasticsearch.river.rethinkdb.FeedWorker.run(FeedWorker.java:70)
    at java.lang.Thread.run(Thread.java:745)
[2014-11-26 01:04:38,392][INFO ][river.rethinkdb.feedworker] [twitter.samples] thread shutting down

The exception comes from this block of code:

bulkRequest.add(client.prepareIndex(
        changeRecord.targetIndex,
        changeRecord.targetType,
        (String) doc.get(primaryKey)) // <-------- ClassCastException thrown here
    .setSource(doc)
);

I'm streaming raw tweet data into RethinkDB and then indexing it in Elasticsearch using this plugin (for fun). I'm able to store the raw tweet in Elasticsearch directly, so this does not seem to be a mapping issue.

I believe the problem is this: the plugin casts the primary key value to a String, but the data coming from Twitter has a numeric id, which arrives as a Double. Hence the error when type casting!

The tweet object looks like this:

{
  "created_at": "Wed Nov 26 05:37:56 +0000 2014",
  "id": 537480352798097400,
  "id_str": "537480352798097409",
  "text": "RT @LittleLiars: How great were @janelparrish and @iamValC on #DWTS tonight? See the results here: http://t.co/wT1cCvagxR http://t.co/F9LMP…"
}

I'm not fluent in Java, but I believe that instead of casting, it should convert the primary key value into a string with something like String.valueOf().
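A conversion along those lines could replace the cast. The sketch below is a hypothetical helper (`DocIds.toDocId` is not part of the plugin) that accepts any primary-key value: strings pass through, and whole-number doubles are printed as plain digits, because `Double.toString` would use scientific notation for a value this large and append `.0` to small ones.

```java
import java.math.BigDecimal;

public final class DocIds {
    private DocIds() {}

    /** Hypothetical helper: converts a RethinkDB primary-key value into an Elasticsearch _id. */
    public static String toDocId(Object key) {
        if (key == null) {
            throw new IllegalArgumentException("document has no primary key");
        }
        if (key instanceof Double || key instanceof Float) {
            double d = ((Number) key).doubleValue();
            // Whole numbers: emit digits only, with no exponent and no trailing ".0".
            if (!Double.isInfinite(d) && d == Math.rint(d)) {
                return new BigDecimal(d).toPlainString();
            }
            return Double.toString(d);
        }
        // Strings, Longs, Integers, etc. already have a sensible toString().
        return key.toString();
    }
}
```

In `FeedWorker.backfill` the cast would become `DocIds.toDocId(doc.get(primaryKey))`. One caveat: a double cannot represent integers above 2^53 exactly, so the tweet id 537480352798097409 is stored as 537480352798097408 and that is the `_id` this produces. Using `id_str` as the table's primary key avoids the precision loss.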

Index does not exist when I query http://localhost:9200/rethinkdbname/rethinkdoc/_search?q=eMail:nikola

I got the error:

{
    "error": "IndexMissingException[[rethinkdbname] missing]",
    "status": 404
}

when trying to search http://localhost:9200/rethinkdbname/rethinkdoc/_search?q=eMail:nikola

http://localhost:9200/_river/rethinkdb/_meta gives me:

{
    "_index": "_river",
    "_type": "rethinkdb",
    "_id": "_meta",
    "_version": 4,
    "found": true,
    "_source": {
        "type": "rethinkdb",
        "rethinkdb": {
            "databases": {
                "rethinkdbname": {
                    "rethinkdoc": {
                        "backfill": true
                    }
                }
            },
            "host": "localhost",
            "port": 28015,
            "auth_key": "4f752a0aac5a1a2ed0a6627854d174facb99dc36cd756776b609e9cb8dcce275"
        }
    }
}

Backfill fails at >1000 docs

I haven't figured out exactly how many docs it takes for backfill to start failing, but here's what I've seen:

I created a river from a table that had 1221 items. In the logs, I see "Backfilled 1221 items", yet Elasticsearch only has 1027 indexed.

The problem gets worse the more initial documents you have. When trying to backfill ~15k docs, only 3k-8k get indexed. The number indexed changes every time you nuke the index and re-run the river.

When starting with 999 items, all 999 get indexed :)

Using version 1.0.0
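The gap between "Backfilled 1221 items" and 1027 indexed documents suggests, though the issue does not confirm it, that some bulk items are rejected (for example when the bulk queue is full) and the failures go unchecked, so the log counts documents sent rather than documents indexed. In the Elasticsearch Java client, `BulkResponse.hasFailures()` and the per-item responses report such failures. The sketch below shows the idea independently of Elasticsearch: `BulkSink` is a hypothetical stand-in for one bulk call that returns the ids of failed items, and only confirmed documents are counted while failed ones are re-sent.

```java
import java.util.ArrayList;
import java.util.List;

public final class CheckedBackfill {
    private CheckedBackfill() {}

    /** Hypothetical stand-in for one bulk request: returns the ids that failed. */
    public interface BulkSink {
        List<String> send(List<String> ids);
    }

    /**
     * Sends ids in batches of batchSize, re-sending failed items up to
     * maxRetries times. Returns the number of documents confirmed indexed.
     */
    public static int backfill(List<String> ids, int batchSize, int maxRetries, BulkSink sink) {
        int confirmed = 0;
        for (int start = 0; start < ids.size(); start += batchSize) {
            List<String> batch =
                    new ArrayList<>(ids.subList(start, Math.min(start + batchSize, ids.size())));
            for (int attempt = 0; !batch.isEmpty() && attempt <= maxRetries; attempt++) {
                List<String> failed = sink.send(batch);
                confirmed += batch.size() - failed.size(); // count only successes
                batch = new ArrayList<>(failed);           // retry just the failures
            }
            if (!batch.isEmpty()) {
                System.err.println("giving up on " + batch.size() + " documents");
            }
        }
        return confirmed;
    }
}
```

With a count based on confirmed items, the "Backfilled N items" log line would match what Elasticsearch actually holds, and any remaining shortfall would at least be reported.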

elasticsearch-river-rethinkdb has Private Subrepositories

ts2@linux-0fiz:~/tmp/xx10/demo$ ls
ts2@linux-0fiz:~/tmp/xx10/demo$ date
Wed Apr 12 10:09:57 EEST 2017
ts2@linux-0fiz:~/tmp/xx10/demo$ git clone --recursive https://github.com/rethinkdb/elasticsearch-river-rethinkdb.git
Cloning into 'elasticsearch-river-rethinkdb'...
remote: Counting objects: 247, done.
remote: Total 247 (delta 0), reused 0 (delta 0), pack-reused 247
Receiving objects: 100% (247/247), 43.56 KiB | 0 bytes/s, done.
Resolving deltas: 100% (108/108), done.
Checking connectivity... done.
Submodule 'rethink-java-driver' (git@github.com:npiv/rethink-java-driver) registered for path 'rethink-java-driver'
Cloning into 'rethink-java-driver'...
Permission denied (publickey).
fatal: Could not read from remote repository.

Please make sure you have the correct access rights
and the repository exists.
Clone of 'git@github.com:npiv/rethink-java-driver' into submodule path 'rethink-java-driver' failed
ts2@linux-0fiz:~/tmp/xx10/demo$

Does this incorporate the RethinkDB changefeed for near-realtime ingestion?

From glancing at the docs, it seems I need to manually start (re)indexing of an ES index based on a RethinkDB table. Is that correct?

For occasional full re-indexing this is perfect. However, it would be great to be able to hook the ES river into the RethinkDB changefeed for near-realtime updating and syncing of ES with RethinkDB. I'm sure this is not a novel idea :), but are there any thoughts or progress in this direction?
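For context, the logs in the first issue above ("Changefeed aborted (table unavailable)") show the river does read a changefeed after backfilling. Syncing from a changefeed amounts to applying each change record, which RethinkDB delivers as `{old_val, new_val}`: a non-null `new_val` is an insert or update, and a null `new_val` is a delete. The sketch below illustrates that rule; `ChangeApplier` is a hypothetical name and an in-memory map stands in for the Elasticsearch index, so it shows the logic only, not the plugin's implementation. `String.valueOf` on the key is deliberately simplistic.

```java
import java.util.HashMap;
import java.util.Map;

public final class ChangeApplier {
    // Stand-in for an Elasticsearch index: _id -> document source.
    private final Map<String, Map<String, Object>> index = new HashMap<>();
    private final String primaryKey;

    public ChangeApplier(String primaryKey) {
        this.primaryKey = primaryKey;
    }

    /** Applies one changefeed record of the form {old_val: ..., new_val: ...}. */
    @SuppressWarnings("unchecked")
    public void apply(Map<String, Object> change) {
        Map<String, Object> newVal = (Map<String, Object>) change.get("new_val");
        Map<String, Object> oldVal = (Map<String, Object>) change.get("old_val");
        if (newVal != null) {
            // Insert or update: (re)index the new version under its primary key.
            index.put(String.valueOf(newVal.get(primaryKey)), newVal);
        } else if (oldVal != null) {
            // Delete: new_val is null, so drop the old document.
            index.remove(String.valueOf(oldVal.get(primaryKey)));
        }
    }

    public Map<String, Map<String, Object>> index() {
        return index;
    }
}
```

In a real river, `index.put` and `index.remove` would become Elasticsearch index and delete requests.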

Mark this deprecated in favor of Logstash plugin

We should mark this as no longer supported in favor of https://github.com/rethinkdb/logstash-input-rethinkdb.
We're going to update the logstash input soon to make use of RethinkDB 2.2's new include_initial feature (rethinkdb/rethinkdb#3197). Also the river plugin uses a protobuf-based driver, which will no longer work with RethinkDB 2.2.

We should add a "deprecated / no longer maintained" warning in the README, and replace any references in our docs by references to the logstash plugin.

Elasticsearch unable to find plugin when used with Docker

I'm trying to create a Dockerfile that will install Elasticsearch along with this plugin. When I use Homebrew locally on my machine, everything is fine. But in this container, when Elasticsearch boots, it gives the following error.

I'm using: https://github.com/paramaggarwal/elasticsearch/blob/master/Dockerfile

[ root@741cf0741851:/elasticsearch/config ]$ /elasticsearch/bin/elasticsearch
[2014-10-09 11:12:10,517][INFO ][node                     ] [Paragon] version[1.3.2], pid[173], build[dee175d/2014-08-13T14:29:30Z]
[2014-10-09 11:12:10,518][INFO ][node                     ] [Paragon] initializing ...
[2014-10-09 11:12:10,524][TRACE][plugins                  ] [Paragon] --- adding plugin [/elasticsearch/plugins/river-rethinkdb]
[2014-10-09 11:12:10,528][WARN ][plugins                  ] [Paragon] failed to load plugin from [file:/elasticsearch/plugins/river-rethinkdb/es-plugin.properties]
org.elasticsearch.ElasticsearchException: Failed to load plugin class [org.elasticsearch.plugin.river.rethinkdb.RethinkDBRiverPlugin]
    at org.elasticsearch.plugins.PluginsService.loadPlugin(PluginsService.java:531)
    at org.elasticsearch.plugins.PluginsService.loadPluginsFromClasspath(PluginsService.java:406)
    at org.elasticsearch.plugins.PluginsService.<init>(PluginsService.java:115)
    at org.elasticsearch.node.internal.InternalNode.<init>(InternalNode.java:146)
    at org.elasticsearch.node.NodeBuilder.build(NodeBuilder.java:159)
    at org.elasticsearch.bootstrap.Bootstrap.setup(Bootstrap.java:70)
    at org.elasticsearch.bootstrap.Bootstrap.main(Bootstrap.java:203)
    at org.elasticsearch.bootstrap.Elasticsearch.main(Elasticsearch.java:32)
Caused by: java.lang.UnsupportedClassVersionError: org/elasticsearch/plugin/river/rethinkdb/RethinkDBRiverPlugin : Unsupported major.minor version 52.0
    at java.lang.ClassLoader.defineClass1(Native Method)
    at java.lang.ClassLoader.defineClass(ClassLoader.java:800)
    at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
    at java.net.URLClassLoader.defineClass(URLClassLoader.java:449)
    at java.net.URLClassLoader.access$100(URLClassLoader.java:71)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:361)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
    at org.elasticsearch.plugins.PluginsService.loadPlugin(PluginsService.java:508)
    ... 7 more
[2014-10-09 11:12:10,552][WARN ][plugins                  ] [Paragon] failed to load plugin from [jar:file:/elasticsearch/plugins/river-rethinkdb/elasticsearch-river-rethinkdb-1.0.0.jar!/es-plugin.properties]
org.elasticsearch.ElasticsearchException: Failed to load plugin class [org.elasticsearch.plugin.river.rethinkdb.RethinkDBRiverPlugin]
    at org.elasticsearch.plugins.PluginsService.loadPlugin(PluginsService.java:531)
    at org.elasticsearch.plugins.PluginsService.loadPluginsFromClasspath(PluginsService.java:406)
    at org.elasticsearch.plugins.PluginsService.<init>(PluginsService.java:115)
    at org.elasticsearch.node.internal.InternalNode.<init>(InternalNode.java:146)
    at org.elasticsearch.node.NodeBuilder.build(NodeBuilder.java:159)
    at org.elasticsearch.bootstrap.Bootstrap.setup(Bootstrap.java:70)
    at org.elasticsearch.bootstrap.Bootstrap.main(Bootstrap.java:203)
    at org.elasticsearch.bootstrap.Elasticsearch.main(Elasticsearch.java:32)
Caused by: java.lang.UnsupportedClassVersionError: org/elasticsearch/plugin/river/rethinkdb/RethinkDBRiverPlugin : Unsupported major.minor version 52.0
    at java.lang.ClassLoader.defineClass1(Native Method)
    at java.lang.ClassLoader.defineClass(ClassLoader.java:800)
    at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
    at java.net.URLClassLoader.defineClass(URLClassLoader.java:449)
    at java.net.URLClassLoader.access$100(URLClassLoader.java:71)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:361)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
    at org.elasticsearch.plugins.PluginsService.loadPlugin(PluginsService.java:508)
    ... 7 more
[2014-10-09 11:12:10,567][DEBUG][plugins                  ] [Paragon] [/elasticsearch/plugins/river-rethinkdb/_site] directory does not exist.
[2014-10-09 11:12:10,567][INFO ][plugins                  ] [Paragon] loaded [], sites []
[2014-10-09 11:12:14,704][INFO ][node                     ] [Paragon] initialized
[2014-10-09 11:12:14,705][INFO ][node                     ] [Paragon] starting ...
[2014-10-09 11:12:14,815][INFO ][transport                ] [Paragon] bound_address {inet[/0:0:0:0:0:0:0:0:9300]}, publish_address {inet[/172.17.0.30:9300]}
[2014-10-09 11:12:14,836][INFO ][discovery                ] [Paragon] elasticsearch/_nWsA46NSXeNA5g9AGf08g
[2014-10-09 11:12:17,881][INFO ][cluster.service          ] [Paragon] new_master [Paragon][_nWsA46NSXeNA5g9AGf08g][741cf0741851][inet[/172.17.0.30:9300]], reason: zen-disco-join (elected_as_master)
[2014-10-09 11:12:17,975][INFO ][http                     ] [Paragon] bound_address {inet[/0:0:0:0:0:0:0:0:9200]}, publish_address {inet[/172.17.0.30:9200]}
[2014-10-09 11:12:17,981][INFO ][node                     ] [Paragon] started
[2014-10-09 11:12:18,025][INFO ][gateway                  ] [Paragon] recovered [0] indices into cluster_state
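The key line is `Unsupported major.minor version 52.0`: class-file major version 52 corresponds to Java 8, so the plugin jar was compiled for Java 8 while the JVM in the container is older. The usual fixes are to run Elasticsearch on Java 8 in the image or to rebuild the plugin targeting an older Java release. The sketch below (the class name `ClassVersion` is hypothetical) reads the version straight from a class file header, which is big-endian: the magic `0xCAFEBABE`, then the minor version, then the major version.

```java
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;

public final class ClassVersion {
    private ClassVersion() {}

    /** Returns the class-file major version (e.g. 52 for Java 8). */
    public static int majorVersion(InputStream in) throws IOException {
        DataInputStream data = new DataInputStream(in); // reads big-endian, as class files are
        if (data.readInt() != 0xCAFEBABE) {
            throw new IOException("not a class file");
        }
        data.readUnsignedShort();        // minor_version
        return data.readUnsignedShort(); // major_version
    }

    /** Maps a major version to the Java release that emits it (valid for 49 and above). */
    public static int javaRelease(int major) {
        return major - 44;
    }
}
```

To check the plugin, open `elasticsearch-river-rethinkdb-1.0.0.jar` with `java.util.jar.JarFile`, look up `org/elasticsearch/plugin/river/rethinkdb/RethinkDBRiverPlugin.class` with `getJarEntry`, and pass `getInputStream(entry)` to `majorVersion`; then compare the result with the output of `java -version` inside the container.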
