Comments (8)

ajaxros commented on June 7, 2024

We've tried some basic fixes, such as capping the stream size, but the problem seems to lie in the Elasticsearch indexer itself: documents flow straight into it without queuing much in the river code.
Other than a naive rate throttle (x documents per second), is there a better way to monitor whether the Elasticsearch back end is keeping up, so that input can be throttled intelligently and it doesn't fall over?
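
One possible alternative to a fixed documents-per-second cap is to bound the number of bulk requests that are in flight at once, so the producer slows down only when the back end stops acknowledging batches. The following is a minimal sketch of that idea using a `Semaphore`, not code from the river: the class name `InFlightThrottle`, the `submit` and `demo` methods, and the simulated back end (a thread pool that sleeps) are all assumptions made for illustration.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper: caps how many bulk requests may be outstanding at once.
class InFlightThrottle {
    private final Semaphore permits;
    private final AtomicInteger inFlight = new AtomicInteger();
    private final AtomicInteger maxObserved = new AtomicInteger();

    InFlightThrottle(int maxInFlight) {
        this.permits = new Semaphore(maxInFlight);
    }

    // Blocks the caller until a slot is free, then hands the batch to the back end.
    void submit(ExecutorService backend, Runnable sendBulk) throws InterruptedException {
        permits.acquire();
        int now = inFlight.incrementAndGet();
        maxObserved.accumulateAndGet(now, Math::max);
        backend.execute(() -> {
            try {
                sendBulk.run();
            } finally {
                // Free the slot only once the back end has finished the batch.
                inFlight.decrementAndGet();
                permits.release();
            }
        });
    }

    int maxObserved() {
        return maxObserved.get();
    }

    private static void sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Simulates a slow back end and returns the highest in-flight count seen.
    static int demo(int maxInFlight, int batches) {
        InFlightThrottle throttle = new InFlightThrottle(maxInFlight);
        ExecutorService backend = Executors.newFixedThreadPool(8);
        try {
            for (int i = 0; i < batches; i++) {
                throttle.submit(backend, () -> sleepQuietly(5));
            }
            backend.shutdown();
            backend.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            backend.shutdownNow();
        }
        return throttle.maxObserved();
    }

    public static void main(String[] args) {
        System.out.println("max in-flight batches: " + demo(2, 20));
    }
}
```

With this shape, the thread reading from MongoDB stalls in `submit` whenever the back end falls behind, which is the same back-pressure effect a bounded queue gives, just measured in outstanding batches rather than queued documents.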

from elasticsearch-river-mongodb.

spancer commented on June 7, 2024

I ran into the OOM problem as well. It seems the CouchDB river has a 'throttling_size' setting that keeps this kind of overload under control. Is there a similar setting in the MongoDB river?

spancer commented on June 7, 2024

I fixed this issue by modifying the code. Following the CouchDB river implementation, I changed the type of the stream from TransferQueue<Map<String, Object>> to BlockingQueue<Map<String, Object>> and added a throttleSize variable that bounds the queue size, just as the CouchDB river does. I also changed how data is added to the stream, from add to put, so the producer waits until there is space in the queue.
Below is the changed block of my MongoDBRiver source.

public final static String THROTTLE_SIZE_FIELD = "throttle_size";

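private final BlockingQueue<Map<String, Object>> stream;
// Capacity of the stream queue; assigned in the constructor below.
private final int throttleSize;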

@SuppressWarnings("unchecked")
@Inject
public MongoDBRiver(final RiverName riverName,
        final RiverSettings settings,
        @RiverIndexName final String riverIndexName, final Client client,
        final ScriptService scriptService) {
    super(riverName, settings);
    if (logger.isDebugEnabled()) {
        logger.debug("Prefix: " + logger.getPrefix() + " - name: " + logger.getName());
    }
    this.riverIndexName = riverIndexName;
    this.client = client;
    String mongoHost;
    int mongoPort;

    if (settings.settings().containsKey(RIVER_TYPE)) {
        Map<String, Object> mongoSettings = (Map<String, Object>) settings
                .settings().get(RIVER_TYPE);
        if (mongoSettings.containsKey(SERVERS_FIELD)) {
            Object mongoServersSettings = mongoSettings.get(SERVERS_FIELD);
            logger.info("mongoServersSettings: " + mongoServersSettings);
            boolean array = XContentMapValues.isArray(mongoServersSettings);

            if (array) {
                ArrayList<Map<String, Object>> feeds = (ArrayList<Map<String, Object>>) mongoServersSettings;
                for (Map<String, Object> feed : feeds) {
                    mongoHost = XContentMapValues.nodeStringValue(feed.get(HOST_FIELD), null);
                    mongoPort = XContentMapValues.nodeIntegerValue(feed.get(PORT_FIELD), 0);
                    logger.info("Server: " + mongoHost + " - " + mongoPort);
                    try {
                        mongoServers.add(new ServerAddress(mongoHost, mongoPort));
                    } catch (UnknownHostException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }
                }   
            }
        }
        else {
            mongoHost = XContentMapValues.nodeStringValue(
                    mongoSettings.get(HOST_FIELD), DEFAULT_DB_HOST);
            mongoPort = XContentMapValues.nodeIntegerValue(
                    mongoSettings.get(PORT_FIELD), DEFAULT_DB_PORT);
            try {
                mongoServers.add(new ServerAddress(mongoHost, mongoPort));
            } catch (UnknownHostException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }

        // MongoDB options
        if (mongoSettings.containsKey(OPTIONS_FIELD)) {
            Map<String, Object> mongoOptionsSettings = (Map<String, Object>) mongoSettings.get(OPTIONS_FIELD);
            mongoSecondaryReadPreference = XContentMapValues.nodeBooleanValue(
                    mongoOptionsSettings.get(SECONDARY_READ_PREFERENCE_FIELD), false);
        }
        else {
            mongoSecondaryReadPreference = false;
        }

        // Credentials
        if (mongoSettings.containsKey(CREDENTIALS_FIELD)) {
            String dbCredential;
            String mau = "";
            String map = "";
            String mlu = "";
            String mlp = "";
            String mdu = "";
            String mdp = "";
            Object mongoCredentialsSettings = mongoSettings.get(CREDENTIALS_FIELD);
            boolean array = XContentMapValues.isArray(mongoCredentialsSettings);

            if (array) {
                ArrayList<Map<String, Object>> credentials = (ArrayList<Map<String, Object>>) mongoCredentialsSettings;
                for (Map<String, Object> credential : credentials) {
                    dbCredential = XContentMapValues.nodeStringValue(credential.get(DB_FIELD), null);
                    if (DB_ADMIN.equals(dbCredential)) {
                        mau = XContentMapValues.nodeStringValue(credential.get(USER_FIELD), null);
                        map = XContentMapValues.nodeStringValue(credential.get(PASSWORD_FIELD), null);
                    } else if (DB_LOCAL.equals(dbCredential)) {
                        mlu = XContentMapValues.nodeStringValue(credential.get(USER_FIELD), null);
                        mlp = XContentMapValues.nodeStringValue(credential.get(PASSWORD_FIELD), null);
                    } else {
                        mdu = XContentMapValues.nodeStringValue(credential.get(USER_FIELD), null);
                        mdp = XContentMapValues.nodeStringValue(credential.get(PASSWORD_FIELD), null);
                    }
                }   
            }
            mongoAdminUser = mau;
            mongoAdminPassword = map;
            mongoLocalUser = mlu;
            mongoLocalPassword = mlp;
            mongoDbUser = mdu;
            mongoDbPassword = mdp;

        } else {
            mongoAdminUser = "";
            mongoAdminPassword = "";
            mongoLocalUser = "";
            mongoLocalPassword = "";
            mongoDbUser = "";
            mongoDbPassword = "";
        }

        mongoDb = XContentMapValues.nodeStringValue(
                mongoSettings.get(DB_FIELD), riverName.name());
        mongoCollection = XContentMapValues.nodeStringValue(
                mongoSettings.get(COLLECTION_FIELD), riverName.name());
        mongoGridFS = XContentMapValues.nodeBooleanValue(
                mongoSettings.get(GRIDFS_FIELD), false);
    } else {
        mongoHost = DEFAULT_DB_HOST;
        mongoPort = DEFAULT_DB_PORT;
        try {
            mongoServers.add(new ServerAddress(mongoHost, mongoPort));
        } catch (UnknownHostException e) {
            e.printStackTrace();
        }
        mongoSecondaryReadPreference = false;
        mongoDb = riverName.name();
        mongoCollection = riverName.name();
        mongoGridFS = false;
        mongoAdminUser = "";
        mongoAdminPassword = "";
        mongoLocalUser = "";
        mongoLocalPassword = "";
        mongoDbUser = "";
        mongoDbPassword = "";
    }
    mongoOplogNamespace = mongoDb + "." + mongoCollection;

    if (settings.settings().containsKey(INDEX_OBJECT)) {
        Map<String, Object> indexSettings = (Map<String, Object>) settings
                .settings().get(INDEX_OBJECT);
        indexName = XContentMapValues.nodeStringValue(
                indexSettings.get(NAME_FIELD), mongoDb);
        typeName = XContentMapValues.nodeStringValue(
                indexSettings.get(TYPE_FIELD), mongoDb);
        bulkSize = XContentMapValues.nodeIntegerValue(
                indexSettings.get(BULK_SIZE_FIELD), 100);
        if (indexSettings.containsKey(BULK_TIMEOUT_FIELD)) {
            bulkTimeout = TimeValue.parseTimeValue(
                    XContentMapValues.nodeStringValue(
                            indexSettings.get(BULK_TIMEOUT_FIELD), "10ms"),
                    TimeValue.timeValueMillis(10));
        } else {
            bulkTimeout = TimeValue.timeValueMillis(10);
        }
        throttleSize = XContentMapValues.nodeIntegerValue(indexSettings.get(THROTTLE_SIZE_FIELD), bulkSize * 5);
    } else {
        indexName = mongoDb;
        typeName = mongoDb;
        bulkSize = 100;
        bulkTimeout = TimeValue.timeValueMillis(10);
        throttleSize = bulkSize * 5;
    }
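    if (throttleSize == -1) {
        // -1 disables throttling: LinkedTransferQueue is unbounded, so put() never blocks.
        stream = new LinkedTransferQueue<Map<String, Object>>();
    } else {
        // Bounded queue: put() blocks once throttleSize entries are waiting.
        stream = new ArrayBlockingQueue<Map<String, Object>>(throttleSize);
    }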
}

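private void addToStream(final String operation,
        final BSONTimestamp currentTimestamp,
        final Map<String, Object> data) {
    data.put(OPLOG_TIMESTAMP, currentTimestamp);
    data.put(OPLOG_OPERATION, operation);

    try {
        // put() waits while the bounded queue is full, which throttles the producer.
        stream.put(data);
    } catch (InterruptedException e) {
        // Restore the interrupt flag so the calling thread can stop cleanly.
        Thread.currentThread().interrupt();
    }
}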
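
The difference between add and put described above can be seen in isolation with a small, self-contained sketch. This is not part of the river; the class `BoundedStreamDemo` and its method names are hypothetical, and the queue holds plain values instead of oplog maps.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical demo of add() versus put() on a bounded queue.
class BoundedStreamDemo {

    // add() on a full bounded queue throws IllegalStateException instead of waiting.
    static boolean addFailsWhenFull() {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
        queue.add("first");
        try {
            queue.add("second");
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }

    // put() blocks the producer until the consumer frees space, so nothing is lost.
    static int producerWaitsForConsumer(int capacity, int documents) {
        BlockingQueue<Integer> stream = new ArrayBlockingQueue<>(capacity);
        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < documents; i++) {
                    stream.put(i); // waits whenever the queue already holds `capacity` items
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();

        int consumed = 0;
        try {
            while (consumed < documents) {
                Integer doc = stream.poll(1, TimeUnit.SECONDS);
                if (doc == null) {
                    break; // producer stalled unexpectedly; stop rather than hang
                }
                consumed++;
            }
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return consumed;
    }

    public static void main(String[] args) {
        System.out.println("add() rejected on full queue: " + addFailsWhenFull());
        System.out.println("documents consumed: " + producerWaitsForConsumer(5, 100));
    }
}
```

Because `ArrayBlockingQueue` never holds more than its capacity, memory use on the producer side stays bounded no matter how far the consumer falls behind.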

spancer commented on June 7, 2024

You can replace the existing jar with the new jar file. Download URL: http://dl.iteye.com/topics/download/b897a860-6828-32d2-90ad-d70fcf00c182

richardwilly98 commented on June 7, 2024

Hi Spancer,

Thank you, I will integrate this fix this week.

Thanks,
Richard.

renaudboutet commented on June 7, 2024

I have the same issue on the project I am working on.

Is it possible to integrate the fix faster?

Thanks!

richardwilly98 commented on June 7, 2024

Version 1.5.0 has been published. It should fix this issue (I was able to index 12 million documents without problems).

renaudboutet commented on June 7, 2024

Thanks a lot. I'll get it as soon as I can.
