elasticsearch-river-rethinkdb's Issues
River does not restart after crashing/erroring
We have a setup of a RethinkDB cluster being read from an Elasticsearch cluster using this plugin. Recently when we updated RethinkDB - which caused downtime on RethinkDB - the plugin failed to find the tables and bailed out as in the logs below.
So the documents between ES and RethinkDB went out of sync - and only when we went into the logs did we notice that the plugin had completely bailed out. Restarting the ES node where the plugin was running fixes this - but it would be great if the plugin would attempt restarting after failure like ES itself.
Logs:
[2015-04-07 11:56:48,630][INFO ][river.rethinkdb.feedworker] [cortex.layouts] Synced 450 documents
[2015-04-07 18:14:03,286][INFO ][river.rethinkdb.feedworker] [cortex.layouts] Synced 460 documents
[2015-04-08 04:02:28,360][INFO ][river.rethinkdb.feedworker] [cortex.layouts] Synced 470 documents
[2015-04-08 06:19:23,834][INFO ][river.rethinkdb.feedworker] [cortex.layouts] Synced 480 documents
[2015-04-08 12:13:20,714][INFO ][river.rethinkdb.feedworker] [cortex.layouts] Synced 490 documents
[2015-04-08 12:29:34,956][INFO ][cluster.metadata ] [cortex-elasticsearch2] [cortex] update_mapping [layouts] (dynamic)
[2015-04-08 13:55:32,769][INFO ][cluster.metadata ] [cortex-elasticsearch2] [cortex] update_mapping [layouts] (dynamic)
[2015-04-08 13:58:30,165][INFO ][river.rethinkdb.feedworker] [cortex.layouts] Synced 500 documents
[2015-04-08 22:07:51,018][WARN ][monitor.jvm ] [cortex-elasticsearch2] [gc][young][4331504][511] duration [1.4s], collections [1]/[2.4s], total [1.4s]/[2.5m], memory [567.9mb]->[40mb]/[3.9gb], all_pools {[young] [531.5mb]->[3.5mb]/[532.5mb]}{[survivor] [1.4mb]->[1.2mb]/[66.5mb]}{[old] [34.8mb]->[35.3mb]/[3.3gb]}
[2015-04-14 19:56:46,340][INFO ][river.rethinkdb.feedworker] [cortex.layouts] Synced 510 documents
[2015-04-14 22:28:37,068][INFO ][river.rethinkdb.feedworker] [cortex.layouts] Synced 520 documents
[2015-04-18 14:15:29,601][WARN ][monitor.jvm ] [cortex-elasticsearch2] [gc][young][5166964][660] duration [1.2s], collections [1]/[1.5s], total [1.2s]/[2.6m], memory [569.6mb]->[43.2mb]/[3.9gb], all_pools {[young] [532.5mb]->[6.2mb]/[532.5mb]}{[survivor] [31.1kb]->[33kb]/[66.5mb]}{[old] [37mb]->[37mb]/[3.3gb]}
[2015-04-20 18:56:07,735][INFO ][river.rethinkdb.feedworker] [cortex.layouts] Synced 530 documents
[2015-04-28 15:57:42,071][ERROR][river.rethinkdb.feedworker] [cortex.symlinks] Worker has a problem: RUNTIME_ERROR: Changefeed aborted (table unavailable).
[2015-04-28 15:57:42,075][INFO ][river.rethinkdb.feedworker] [cortex.symlinks] This probably isn't recoverable, bailing.
[2015-04-28 15:57:42,075][ERROR][river.rethinkdb.feedworker] [cortex.symlinks] failed due to exception
com.rethinkdb.RethinkDBException: RUNTIME_ERROR: Changefeed aborted (table unavailable).
at com.rethinkdb.response.DBResultFactory.convert(DBResultFactory.java:25)
at com.rethinkdb.Cursor.loadNextBatch(Cursor.java:62)
at com.rethinkdb.Cursor.next(Cursor.java:85)
at org.elasticsearch.river.rethinkdb.FeedWorker.run(FeedWorker.java:77)
at java.lang.Thread.run(Thread.java:744)
[2015-04-28 15:57:42,075][INFO ][river.rethinkdb.feedworker] [cortex.symlinks] thread shutting down
[2015-04-28 15:57:42,075][ERROR][river.rethinkdb.feedworker] [cortex.templates] Worker has a problem: RUNTIME_ERROR: Changefeed aborted (table unavailable).
[2015-04-28 15:57:42,078][ERROR][river.rethinkdb.feedworker] [cortex.layouts] Worker has a problem: RUNTIME_ERROR: Changefeed aborted (table unavailable).
[2015-04-28 15:57:42,080][INFO ][river.rethinkdb.feedworker] [cortex.layouts] This probably isn't recoverable, bailing.
[2015-04-28 15:57:42,080][INFO ][river.rethinkdb.feedworker] [cortex.templates] This probably isn't recoverable, bailing.
[2015-04-28 15:57:42,080][ERROR][river.rethinkdb.feedworker] [cortex.layouts] failed due to exception
com.rethinkdb.RethinkDBException: RUNTIME_ERROR: Changefeed aborted (table unavailable).
at com.rethinkdb.response.DBResultFactory.convert(DBResultFactory.java:25)
at com.rethinkdb.Cursor.loadNextBatch(Cursor.java:62)
at com.rethinkdb.Cursor.next(Cursor.java:85)
at org.elasticsearch.river.rethinkdb.FeedWorker.run(FeedWorker.java:77)
at java.lang.Thread.run(Thread.java:744)
[2015-04-28 15:57:42,080][INFO ][river.rethinkdb.feedworker] [cortex.layouts] thread shutting down
[2015-04-28 15:57:42,080][ERROR][river.rethinkdb.feedworker] [cortex.templates] failed due to exception
com.rethinkdb.RethinkDBException: RUNTIME_ERROR: Changefeed aborted (table unavailable).
at com.rethinkdb.response.DBResultFactory.convert(DBResultFactory.java:25)
at com.rethinkdb.Cursor.loadNextBatch(Cursor.java:62)
at com.rethinkdb.Cursor.next(Cursor.java:85)
at org.elasticsearch.river.rethinkdb.FeedWorker.run(FeedWorker.java:77)
at java.lang.Thread.run(Thread.java:744)
[2015-04-28 15:57:42,081][INFO ][river.rethinkdb.feedworker] [cortex.templates] thread shutting down
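One way the restart behaviour requested above could look is a retry loop with exponential backoff around the worker's connect-and-read step. This is only a sketch, not the plugin's code: RetryingWorker, runWithRetry and its parameters are hypothetical names, and it assumes the failing step can be wrapped in a Callable.

```java
import java.util.concurrent.Callable;

// Hypothetical helper, not part of the river plugin.
final class RetryingWorker {

    /**
     * Calls the task until it succeeds or maxAttempts is reached,
     * doubling the wait between attempts up to maxDelayMs.
     */
    public static <T> T runWithRetry(Callable<T> task,
                                     int maxAttempts,
                                     long initialDelayMs,
                                     long maxDelayMs) throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        long delay = initialDelayMs;
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return task.call();
            } catch (Exception e) {
                last = e;
                if (attempt == maxAttempts) {
                    break; // out of attempts, rethrow below
                }
                Thread.sleep(delay);
                delay = Math.min(delay * 2, maxDelayMs);
            }
        }
        throw last;
    }
}
```

A worker could wrap its changefeed loop in such a call so that a temporary "table unavailable" error leads to a delayed reconnect rather than a permanent shutdown. Documents changed while the feed was down would still need a fresh backfill to get back in sync.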
Incorrect Authorization Key
I get an authorization error even with the correct password. I double-checked this; could it be parsed incorrectly somewhere? With no password, it works fine.
[2014-10-04 15:20:46,170][ERROR][river.rethinkdb.feedworker] [InkOverFlow.Posts] failed due to exception
com.rethinkdb.RethinkDBException: ERROR: Incorrect authorization key.
at com.rethinkdb.RethinkDBConnection.reconnect(RethinkDBConnection.java:56)
at com.rethinkdb.RethinkDBConnection.<init>(RethinkDBConnection.java:45)
at com.rethinkdb.RethinkDBConnection.<init>(RethinkDBConnection.java:37)
at com.rethinkdb.RethinkDB.connect(RethinkDB.java:66)
at org.elasticsearch.river.rethinkdb.FeedWorker.connect(FeedWorker.java:41)
at org.elasticsearch.river.rethinkdb.FeedWorker.run(FeedWorker.java:64)
at java.lang.Thread.run(Thread.java:745)
[2014-10-04 15:20:46,170][ERROR][river.rethinkdb.feedworker] [InkOverFlow.Users] failed due to exception
com.rethinkdb.RethinkDBException: ERROR: Incorrect authorization key.
at com.rethinkdb.RethinkDBConnection.reconnect(RethinkDBConnection.java:56)
at com.rethinkdb.RethinkDBConnection.<init>(RethinkDBConnection.java:45)
at com.rethinkdb.RethinkDBConnection.<init>(RethinkDBConnection.java:37)
at com.rethinkdb.RethinkDB.connect(RethinkDB.java:66)
at org.elasticsearch.river.rethinkdb.FeedWorker.connect(FeedWorker.java:41)
at org.elasticsearch.river.rethinkdb.FeedWorker.run(FeedWorker.java:64)
at java.lang.Thread.run(Thread.java:745)
[2014-10-04 15:20:46,174][INFO ][river.rethinkdb.feedworker] [InkOverFlow.Posts] thread shutting down
[2014-10-04 15:20:46,175][INFO ][river.rethinkdb.feedworker] [InkOverFlow.Users] thread shutting down
Crash: java.lang.Double cannot be cast to java.lang.String
Hi,
I am seeing a crash of the plugin in the Elasticsearch logs:
==> /var/log/elasticsearch/elasticsearch.log <==
[2014-11-26 01:04:34,074][INFO ][node ] [elasticsearch1] initialized
[2014-11-26 01:04:34,076][INFO ][node ] [elasticsearch1] starting ...
[2014-11-26 01:04:34,085][INFO ][transport ] [elasticsearch1] bound_address {local[1]}, publish_address {local[1]}
[2014-11-26 01:04:34,132][INFO ][discovery ] [elasticsearch1] elasticsearch/CQXRqkhGSzew7d72s5HpPA
[2014-11-26 01:04:34,135][INFO ][cluster.service ] [elasticsearch1] master {new [elasticsearch1][CQXRqkhGSzew7d72s5HpPA][es1][local[1]]{local=true}}, removed {[elasticsearch1][VvEi9KvjTzilxNsqceBiwQ][es1][local[1]]{local=true},}, reason: local-disco-initial_connect(master)
[2014-11-26 01:04:34,401][INFO ][http ] [elasticsearch1] bound_address {inet[/0:0:0:0:0:0:0:0:8000]}, publish_address {inet[/128.199.226.132:8000]}
[2014-11-26 01:04:34,401][INFO ][node ] [elasticsearch1] started
[2014-11-26 01:04:35,941][INFO ][gateway ] [elasticsearch1] recovered [3] indices into cluster_state
[2014-11-26 01:04:36,900][INFO ][river.rethinkdb ] [elasticsearch1] [rethinkdb][rethinkdb] ChangeRecords: ChangeRecords({twitter={samples=ChangeRecord(twitter,samples,backfill,}})
[2014-11-26 01:04:36,908][INFO ][river.rethinkdb ] [elasticsearch1] [rethinkdb][rethinkdb] Starting up RethinkDB River for 10.130.204.52:8000
[2014-11-26 01:04:36,916][INFO ][river.rethinkdb ] [elasticsearch1] [rethinkdb][rethinkdb] Starting feed watcher for twitter.samples
[2014-11-26 01:04:37,805][INFO ][river.rethinkdb.feedworker] [twitter.samples] Beginning backfill of documents
[2014-11-26 01:04:38,385][ERROR][river.rethinkdb.feedworker] [twitter.samples] failed due to exception
java.lang.ClassCastException: java.lang.Double cannot be cast to java.lang.String
at org.elasticsearch.river.rethinkdb.FeedWorker.backfill(FeedWorker.java:151)
at org.elasticsearch.river.rethinkdb.FeedWorker.run(FeedWorker.java:70)
at java.lang.Thread.run(Thread.java:745)
[2014-11-26 01:04:38,392][INFO ][river.rethinkdb.feedworker] [twitter.samples] thread shutting down
coming from this block of code:
bulkRequest.add(client.prepareIndex(
changeRecord.targetIndex,
changeRecord.targetType,
(String) doc.get(primaryKey)) // <--------
.setSource(doc)
);
I'm streaming raw tweet data into RethinkDB and then indexing it in Elasticsearch using this plugin (for fun). I'm able to store the raw tweet in Elasticsearch directly, so it does not seem to be a mapping issue.
I believe the problem is this: RethinkDB prefers storing the primary key id as a string, but the data coming from Twitter has the id as a double, hence the error when type casting!
The tweet object looks like this:
{
"created_at": "Wed Nov 26 05:37:56 +0000 2014",
"id": 537480352798097400,
"id_str": "537480352798097409",
"text": "RT @LittleLiars: How great were @janelparrish and @iamValC on #DWTS tonight? See the results here: http://t.co/wT1cCvagxR http://t.co/F9LMP…"
}
I'm not fluent in Java, but I believe that instead of type casting, it should convert the id primaryKey into string format with something like Integer.toString().
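The conversion suggested above could be sketched as below. This is an illustration under assumptions, not the plugin's actual fix: PrimaryKeys and primaryKeyToString are hypothetical names. Integer.toString() does not apply to a Double, and the default string form of a large double uses scientific notation, so the sketch goes through BigDecimal for whole-number doubles.

```java
import java.math.BigDecimal;

// Hypothetical helper, not part of the river plugin.
final class PrimaryKeys {

    /** Turns a primary key of any supported type into a document id string. */
    public static String primaryKeyToString(Object key) {
        if (key == null) {
            throw new IllegalArgumentException("document has no primary key");
        }
        if (key instanceof Double || key instanceof Float) {
            double d = ((Number) key).doubleValue();
            boolean finite = !Double.isInfinite(d) && !Double.isNaN(d);
            if (finite && d == Math.rint(d)) {
                // Whole-number doubles: print plain digits, no exponent.
                return new BigDecimal(d).toPlainString();
            }
        }
        // Strings, integers, longs and fractional numbers.
        return String.valueOf(key);
    }
}
```

The cast in the snippet above would then become primaryKeyToString(doc.get(primaryKey)). Note that the tweet in this report already shows precision loss in the numeric id (it ends in 400 while id_str ends in 409), so a string field such as id_str is likely the safer primary key for this data.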
Index does not exist when I query http://localhost:9200/rethinkdbname/rethinkdoc/_search?q=eMail:nikola
I got the error:
{ "error": "IndexMissingException[[rethinkdbname] missing]", "status": 404 }
when trying to search http://localhost:9200/rethinkdbname/rethinkdoc/_search?q=eMail:nikola
http://localhost:9200/_river/rethinkdb/_meta gives me:
{ "_index": "_river", "_type": "rethinkdb", "_id": "_meta", "_version": 4, "found": true, "_source": { "type": "rethinkdb", "rethinkdb": { "databases": { "rethinkdbname": { "rethinkdoc": { "backfill": true } } }, "host": "localhost", "port": 28015, "auth_key": "4f752a0aac5a1a2ed0a6627854d174facb99dc36cd756776b609e9cb8dcce275" } } }
Backfill fails at >1000 docs
I haven't figured out at exactly how many docs backfill starts failing, but here's what I've seen:
Created a river from a table that had 1221 items. In the logs, I see "Backfilled 1221 items" yet ElasticSearch only has 1027 indexed.
The problem gets worse the more initial documents you have. When trying to backfill ~15k docs, only 3k-8k get indexed. The number indexed changes every time you nuke the index and re-run the river.
When starting with 999 items, all 999 get indexed :)
Using version 1.0.0
elasticsearch-river-rethinkdb has Private Subrepositories
ts2@linux-0fiz:~/tmp/xx10/demo$ ls
ts2@linux-0fiz:~/tmp/xx10/demo$ date
Wed Apr 12 10:09:57 EEST 2017
ts2@linux-0fiz:~/tmp/xx10/demo$ git clone --recursive https://github.com/rethinkdb/elasticsearch-river-rethinkdb.git
Cloning into 'elasticsearch-river-rethinkdb'...
remote: Counting objects: 247, done.
remote: Total 247 (delta 0), reused 0 (delta 0), pack-reused 247
Receiving objects: 100% (247/247), 43.56 KiB | 0 bytes/s, done.
Resolving deltas: 100% (108/108), done.
Checking connectivity... done.
Submodule 'rethink-java-driver' ([email protected]:npiv/rethink-java-driver) registered for path 'rethink-java-driver'
Cloning into 'rethink-java-driver'...
Permission denied (publickey).
fatal: Could not read from remote repository.
Please make sure you have the correct access rights
and the repository exists.
Clone of '[email protected]:npiv/rethink-java-driver' into submodule path 'rethink-java-driver' failed
ts2@linux-0fiz:~/tmp/xx10/demo$
ES 1.4.x support
Just wondering if 1.4.x support is on the roadmap.
use logstash as new data ingestion strategy
It would be nice to have a way to specify exactly what is exposed to ES.
does this incorporate the rethinkDB changefeed for near-realtime ingestion?
From a glance at the docs, it seems I need to manually start (re)indexing of an ES index based on a RethinkDB table. Is that correct?
For occasional full re-indexing this is perfect. However, it would be great to be able to hook the ES river into the RethinkDB changefeed for near-realtime updating/syncing of ES with RethinkDB. I'm sure this is not a novel idea :), but are there any thoughts on this, or has any progress been made in this direction?
Mark this deprecated in favor of Logstash plugin
We should mark this as no longer supported in favor of https://github.com/rethinkdb/logstash-input-rethinkdb .
We're going to update the logstash input soon to make use of RethinkDB 2.2's new include_initial feature (rethinkdb/rethinkdb#3197). Also the river plugin uses a protobuf-based driver, which will no longer work with RethinkDB 2.2.
We should add a "deprecated / no longer maintained" warning in the README, and replace any references in our docs by references to the logstash plugin.
Elastic Search unable to find plugin when used with Docker
I'm trying to create a Dockerfile that will install Elasticsearch along with this plugin. When I use Homebrew locally on my machine, everything is fine. But in this container, when Elasticsearch boots, it gives the following error.
I'm using: https://github.com/paramaggarwal/elasticsearch/blob/master/Dockerfile
[ root@741cf0741851:/elasticsearch/config ]$ /elasticsearch/bin/elasticsearch
[2014-10-09 11:12:10,517][INFO ][node ] [Paragon] version[1.3.2], pid[173], build[dee175d/2014-08-13T14:29:30Z]
[2014-10-09 11:12:10,518][INFO ][node ] [Paragon] initializing ...
[2014-10-09 11:12:10,524][TRACE][plugins ] [Paragon] --- adding plugin [/elasticsearch/plugins/river-rethinkdb]
[2014-10-09 11:12:10,528][WARN ][plugins ] [Paragon] failed to load plugin from [file:/elasticsearch/plugins/river-rethinkdb/es-plugin.properties]
org.elasticsearch.ElasticsearchException: Failed to load plugin class [org.elasticsearch.plugin.river.rethinkdb.RethinkDBRiverPlugin]
at org.elasticsearch.plugins.PluginsService.loadPlugin(PluginsService.java:531)
at org.elasticsearch.plugins.PluginsService.loadPluginsFromClasspath(PluginsService.java:406)
at org.elasticsearch.plugins.PluginsService.<init>(PluginsService.java:115)
at org.elasticsearch.node.internal.InternalNode.<init>(InternalNode.java:146)
at org.elasticsearch.node.NodeBuilder.build(NodeBuilder.java:159)
at org.elasticsearch.bootstrap.Bootstrap.setup(Bootstrap.java:70)
at org.elasticsearch.bootstrap.Bootstrap.main(Bootstrap.java:203)
at org.elasticsearch.bootstrap.Elasticsearch.main(Elasticsearch.java:32)
Caused by: java.lang.UnsupportedClassVersionError: org/elasticsearch/plugin/river/rethinkdb/RethinkDBRiverPlugin : Unsupported major.minor version 52.0
at java.lang.ClassLoader.defineClass1(Native Method)
at java.lang.ClassLoader.defineClass(ClassLoader.java:800)
at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
at java.net.URLClassLoader.defineClass(URLClassLoader.java:449)
at java.net.URLClassLoader.access$100(URLClassLoader.java:71)
at java.net.URLClassLoader$1.run(URLClassLoader.java:361)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
at org.elasticsearch.plugins.PluginsService.loadPlugin(PluginsService.java:508)
... 7 more
[2014-10-09 11:12:10,552][WARN ][plugins ] [Paragon] failed to load plugin from [jar:file:/elasticsearch/plugins/river-rethinkdb/elasticsearch-river-rethinkdb-1.0.0.jar!/es-plugin.properties]
org.elasticsearch.ElasticsearchException: Failed to load plugin class [org.elasticsearch.plugin.river.rethinkdb.RethinkDBRiverPlugin]
at org.elasticsearch.plugins.PluginsService.loadPlugin(PluginsService.java:531)
at org.elasticsearch.plugins.PluginsService.loadPluginsFromClasspath(PluginsService.java:406)
at org.elasticsearch.plugins.PluginsService.<init>(PluginsService.java:115)
at org.elasticsearch.node.internal.InternalNode.<init>(InternalNode.java:146)
at org.elasticsearch.node.NodeBuilder.build(NodeBuilder.java:159)
at org.elasticsearch.bootstrap.Bootstrap.setup(Bootstrap.java:70)
at org.elasticsearch.bootstrap.Bootstrap.main(Bootstrap.java:203)
at org.elasticsearch.bootstrap.Elasticsearch.main(Elasticsearch.java:32)
Caused by: java.lang.UnsupportedClassVersionError: org/elasticsearch/plugin/river/rethinkdb/RethinkDBRiverPlugin : Unsupported major.minor version 52.0
at java.lang.ClassLoader.defineClass1(Native Method)
at java.lang.ClassLoader.defineClass(ClassLoader.java:800)
at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
at java.net.URLClassLoader.defineClass(URLClassLoader.java:449)
at java.net.URLClassLoader.access$100(URLClassLoader.java:71)
at java.net.URLClassLoader$1.run(URLClassLoader.java:361)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
at org.elasticsearch.plugins.PluginsService.loadPlugin(PluginsService.java:508)
... 7 more
[2014-10-09 11:12:10,567][DEBUG][plugins ] [Paragon] [/elasticsearch/plugins/river-rethinkdb/_site] directory does not exist.
[2014-10-09 11:12:10,567][INFO ][plugins ] [Paragon] loaded [], sites []
[2014-10-09 11:12:14,704][INFO ][node ] [Paragon] initialized
[2014-10-09 11:12:14,705][INFO ][node ] [Paragon] starting ...
[2014-10-09 11:12:14,815][INFO ][transport ] [Paragon] bound_address {inet[/0:0:0:0:0:0:0:0:9300]}, publish_address {inet[/172.17.0.30:9300]}
[2014-10-09 11:12:14,836][INFO ][discovery ] [Paragon] elasticsearch/_nWsA46NSXeNA5g9AGf08g
[2014-10-09 11:12:17,881][INFO ][cluster.service ] [Paragon] new_master [Paragon][_nWsA46NSXeNA5g9AGf08g][741cf0741851][inet[/172.17.0.30:9300]], reason: zen-disco-join (elected_as_master)
[2014-10-09 11:12:17,975][INFO ][http ] [Paragon] bound_address {inet[/0:0:0:0:0:0:0:0:9200]}, publish_address {inet[/172.17.0.30:9200]}
[2014-10-09 11:12:17,981][INFO ][node ] [Paragon] started
[2014-10-09 11:12:18,025][INFO ][gateway ] [Paragon] recovered [0] indices into cluster_state
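The "Unsupported major.minor version 52.0" in the trace above refers to the class file format: major version 52 corresponds to Java 8, so the error means the plugin jar was compiled for Java 8 and the JVM in the container is older. As a rough sketch for checking this (ClassVersion and its methods are hypothetical names, not part of Elasticsearch or the plugin), the version can be read from the first eight bytes of a .class file:

```java
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;

// Hypothetical helper for inspecting a compiled class.
final class ClassVersion {

    /** Reads the major version from the header of a .class file stream. */
    public static int majorVersion(InputStream in) throws IOException {
        DataInputStream data = new DataInputStream(in);
        if (data.readInt() != 0xCAFEBABE) {
            throw new IOException("not a class file");
        }
        data.readUnsignedShort();        // minor version, ignored here
        return data.readUnsignedShort(); // major version
    }

    /** Maps a class file major version to the Java release that introduced it. */
    public static String javaRelease(int major) {
        if (major < 45) {
            throw new IllegalArgumentException("unknown major version " + major);
        }
        // 45 -> 1.1, 46 -> 1.2, 47 -> 1.3, 48 -> 1.4, 49 -> 5, 50 -> 6, 51 -> 7, 52 -> 8
        return major < 49 ? "1." + (major - 44) : String.valueOf(major - 44);
    }
}
```

Running majorVersion on RethinkDBRiverPlugin.class extracted from the jar and comparing the result with the output of java -version inside the container would confirm the mismatch. The likely remedies are a Java 8 runtime in the image or a plugin build that targets the older JVM.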
Add script support
Many rivers support using server scripts:
http://www.elasticsearch.org/guide/en/elasticsearch/reference/current/modules-scripting.html
This would be nice to have at some point to allow filtering or joining per discussion in rethinkdb/rethinkdb#1009
use logging.yml file to set plugin log level
Not sure if this is the right place to put this issue.
I need to suppress the DEBUG logs caused by the com.rethinkdb.RethinkDBConnection package. It's not clear to me if the vendored rethinkdb client follows the elasticsearch logging conventions.
Thanks!