Comments (8)
We've tried some basic fixes, such as capping the stream size, but the problem seems to lie within the Elasticsearch indexer itself, since documents flow into it without queuing up much in the river code.
Other than a naive rate throttle (x documents per second), is there a better way to monitor whether the Elasticsearch back end is keeping up, so that input can be throttled intelligently and the node prevented from falling over?
from elasticsearch-river-mongodb.
I ran into the OOM problem. It seems CouchDB has a 'throttling_size' setting that may keep the falling-over under control. Is there any setting like 'throttling_size' in the Mongo river?
I fixed this issue after modifying some code. Following the CouchDB river implementation, I changed the stream field type from TransferQueue&lt;Map&lt;String, Object&gt;&gt; to BlockingQueue&lt;Map&lt;String, Object&gt;&gt; and added a throttleSize variable to bound the queue size, just as the CouchDB river does. I also changed how data is added to the stream, from add to put, which blocks until there is space to hold the data.
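The difference between add and put on a bounded queue, which is the core of this fix, can be sketched in isolation (ThrottleDemo is an illustrative class name, not part of the river):

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class ThrottleDemo {
    public static void main(String[] args) throws Exception {
        // A bounded queue with 2 slots, standing in for the river's stream.
        BlockingQueue<String> stream = new ArrayBlockingQueue<>(2);
        stream.add("doc1");
        stream.add("doc2");

        // add() on a full ArrayBlockingQueue throws immediately...
        boolean addFailed = false;
        try {
            stream.add("doc3");
        } catch (IllegalStateException e) {
            addFailed = true;
        }
        System.out.println("add on full queue threw: " + addFailed);

        // ...while put() blocks until a consumer makes room.
        Thread consumer = new Thread(() -> {
            try {
                TimeUnit.MILLISECONDS.sleep(100);
                stream.take(); // the indexer draining one document
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();
        stream.put("doc3"); // waits for the take() instead of failing
        consumer.join();
        System.out.println("queue size after put: " + stream.size());
    }
}
```

This back-pressure is what keeps the oplog reader from outrunning the bulk indexer and exhausting the heap.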
Below is the changed block of my MongoDBRiver source code.
public final static String THROTTLE_SIZE_FIELD = "throttle_size";

private final BlockingQueue<Map<String, Object>> stream;

@SuppressWarnings("unchecked")
@Inject
public MongoDBRiver(final RiverName riverName,
        final RiverSettings settings,
        @RiverIndexName final String riverIndexName, final Client client,
        final ScriptService scriptService) {
    super(riverName, settings);
    if (logger.isDebugEnabled()) {
        logger.debug("Prefix: " + logger.getPrefix() + " - name: " + logger.getName());
    }
    this.riverIndexName = riverIndexName;
    this.client = client;
    String mongoHost;
    int mongoPort;
    if (settings.settings().containsKey(RIVER_TYPE)) {
        Map<String, Object> mongoSettings = (Map<String, Object>) settings
                .settings().get(RIVER_TYPE);
        if (mongoSettings.containsKey(SERVERS_FIELD)) {
            Object mongoServersSettings = mongoSettings.get(SERVERS_FIELD);
            logger.info("mongoServersSettings: " + mongoServersSettings);
            boolean array = XContentMapValues.isArray(mongoServersSettings);
            if (array) {
                ArrayList<Map<String, Object>> feeds = (ArrayList<Map<String, Object>>) mongoServersSettings;
                for (Map<String, Object> feed : feeds) {
                    mongoHost = XContentMapValues.nodeStringValue(feed.get(HOST_FIELD), null);
                    mongoPort = XContentMapValues.nodeIntegerValue(feed.get(PORT_FIELD), 0);
                    logger.info("Server: " + mongoHost + " - " + mongoPort);
                    try {
                        mongoServers.add(new ServerAddress(mongoHost, mongoPort));
                    } catch (UnknownHostException e) {
                        logger.error("Unknown mongo host: " + mongoHost, e);
                    }
                }
            }
        } else {
            mongoHost = XContentMapValues.nodeStringValue(
                    mongoSettings.get(HOST_FIELD), DEFAULT_DB_HOST);
            mongoPort = XContentMapValues.nodeIntegerValue(
                    mongoSettings.get(PORT_FIELD), DEFAULT_DB_PORT);
            try {
                mongoServers.add(new ServerAddress(mongoHost, mongoPort));
            } catch (UnknownHostException e) {
                logger.error("Unknown mongo host: " + mongoHost, e);
            }
        }
        // MongoDB options
        if (mongoSettings.containsKey(OPTIONS_FIELD)) {
            Map<String, Object> mongoOptionsSettings = (Map<String, Object>) mongoSettings.get(OPTIONS_FIELD);
            mongoSecondaryReadPreference = XContentMapValues.nodeBooleanValue(
                    mongoOptionsSettings.get(SECONDARY_READ_PREFERENCE_FIELD), false);
        } else {
            mongoSecondaryReadPreference = false;
        }
        // Credentials
        if (mongoSettings.containsKey(CREDENTIALS_FIELD)) {
            String dbCredential;
            String mau = "";
            String map = "";
            String mlu = "";
            String mlp = "";
            String mdu = "";
            String mdp = "";
            Object mongoCredentialsSettings = mongoSettings.get(CREDENTIALS_FIELD);
            boolean array = XContentMapValues.isArray(mongoCredentialsSettings);
            if (array) {
                ArrayList<Map<String, Object>> credentials = (ArrayList<Map<String, Object>>) mongoCredentialsSettings;
                for (Map<String, Object> credential : credentials) {
                    dbCredential = XContentMapValues.nodeStringValue(credential.get(DB_FIELD), null);
                    if (DB_ADMIN.equals(dbCredential)) {
                        mau = XContentMapValues.nodeStringValue(credential.get(USER_FIELD), null);
                        map = XContentMapValues.nodeStringValue(credential.get(PASSWORD_FIELD), null);
                    } else if (DB_LOCAL.equals(dbCredential)) {
                        mlu = XContentMapValues.nodeStringValue(credential.get(USER_FIELD), null);
                        mlp = XContentMapValues.nodeStringValue(credential.get(PASSWORD_FIELD), null);
                    } else {
                        mdu = XContentMapValues.nodeStringValue(credential.get(USER_FIELD), null);
                        mdp = XContentMapValues.nodeStringValue(credential.get(PASSWORD_FIELD), null);
                    }
                }
            }
            mongoAdminUser = mau;
            mongoAdminPassword = map;
            mongoLocalUser = mlu;
            mongoLocalPassword = mlp;
            mongoDbUser = mdu;
            mongoDbPassword = mdp;
        } else {
            mongoAdminUser = "";
            mongoAdminPassword = "";
            mongoLocalUser = "";
            mongoLocalPassword = "";
            mongoDbUser = "";
            mongoDbPassword = "";
        }
        mongoDb = XContentMapValues.nodeStringValue(
                mongoSettings.get(DB_FIELD), riverName.name());
        mongoCollection = XContentMapValues.nodeStringValue(
                mongoSettings.get(COLLECTION_FIELD), riverName.name());
        mongoGridFS = XContentMapValues.nodeBooleanValue(
                mongoSettings.get(GRIDFS_FIELD), false);
    } else {
        mongoHost = DEFAULT_DB_HOST;
        mongoPort = DEFAULT_DB_PORT;
        try {
            mongoServers.add(new ServerAddress(mongoHost, mongoPort));
        } catch (UnknownHostException e) {
            logger.error("Unknown mongo host: " + mongoHost, e);
        }
        mongoSecondaryReadPreference = false;
        mongoDb = riverName.name();
        mongoCollection = riverName.name();
        mongoGridFS = false;
        mongoAdminUser = "";
        mongoAdminPassword = "";
        mongoLocalUser = "";
        mongoLocalPassword = "";
        mongoDbUser = "";
        mongoDbPassword = "";
    }
    mongoOplogNamespace = mongoDb + "." + mongoCollection;
    if (settings.settings().containsKey(INDEX_OBJECT)) {
        Map<String, Object> indexSettings = (Map<String, Object>) settings
                .settings().get(INDEX_OBJECT);
        indexName = XContentMapValues.nodeStringValue(
                indexSettings.get(NAME_FIELD), mongoDb);
        typeName = XContentMapValues.nodeStringValue(
                indexSettings.get(TYPE_FIELD), mongoDb);
        bulkSize = XContentMapValues.nodeIntegerValue(
                indexSettings.get(BULK_SIZE_FIELD), 100);
        if (indexSettings.containsKey(BULK_TIMEOUT_FIELD)) {
            bulkTimeout = TimeValue.parseTimeValue(
                    XContentMapValues.nodeStringValue(
                            indexSettings.get(BULK_TIMEOUT_FIELD), "10ms"),
                    TimeValue.timeValueMillis(10));
        } else {
            bulkTimeout = TimeValue.timeValueMillis(10);
        }
        // Bound the in-memory stream; default to five bulk batches.
        throttleSize = XContentMapValues.nodeIntegerValue(
                indexSettings.get(THROTTLE_SIZE_FIELD), bulkSize * 5);
    } else {
        indexName = mongoDb;
        typeName = mongoDb;
        bulkSize = 100;
        bulkTimeout = TimeValue.timeValueMillis(10);
        throttleSize = bulkSize * 5;
    }
    // throttle_size = -1 disables throttling (unbounded queue); otherwise
    // a bounded queue provides back-pressure on the oplog reader.
    if (throttleSize == -1) {
        stream = new LinkedTransferQueue<Map<String, Object>>();
    } else {
        stream = new ArrayBlockingQueue<Map<String, Object>>(throttleSize);
    }
}

private void addToStream(final String operation,
        final BSONTimestamp currentTimestamp,
        final Map<String, Object> data) {
    data.put(OPLOG_TIMESTAMP, currentTimestamp);
    data.put(OPLOG_OPERATION, operation);
    try {
        // put() blocks until the indexer drains the queue, instead of
        // letting it grow without bound.
        stream.put(data);
    } catch (InterruptedException e) {
        // Restore the interrupt status so callers can observe it.
        Thread.currentThread().interrupt();
    }
}
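With this change, the queue bound can be configured alongside the existing bulk settings. A hypothetical river creation body (database, collection, and index names are placeholders; throttle_size and bulk_size are the fields read by the code above, with throttle_size defaulting to bulk_size * 5):

```json
{
    "type": "mongodb",
    "mongodb": {
        "db": "mydb",
        "collection": "mycollection"
    },
    "index": {
        "name": "mydb",
        "type": "mydb",
        "bulk_size": 100,
        "throttle_size": 500
    }
}
```

Setting throttle_size to -1 disables throttling and falls back to an unbounded LinkedTransferQueue.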
You can replace the existing jar with the new jar file. Download URL: http://dl.iteye.com/topics/download/b897a860-6828-32d2-90ad-d70fcf00c182
Hi Spancer,
Thank you, I will integrate this fix this week.
Thanks,
Richard.
I have the same issue on the project I am working on.
Is it possible to integrate the fix sooner?
Thanks!
Version 1.5.0 has been published. It should fix this issue (I was able to index 12 million documents with no issue).
Thanks a lot. I'll get it as soon as I can.