Code Monkey home page Code Monkey logo

queue-schedule's Introduction

Queue Shedule

NPM version Build status Test coverage

Kafka is a high avaliable message queue, but it lacks of consuming message with a slow speed. Some of task with no need to finish it at none, and we want to complete it with a small cost. This is just the reason why we develop Queue Shedule.

Install

npm install queue-schedule

How to use

Use rdkafka

const Kafka = require('node-rdkafka');
const {RdKafkaProducer,RdKafkaConsumer} = require('queue-schedule');
const producerRd = new Kafka.HighLevelProducer({
    'metadata.broker.list': KAFKA_HOST,
    'linger.ms':0.1,
    'queue.buffering.max.ms': 500,
    'queue.buffering.max.messages':1000,
    // debug: 'all'
});
producerRd.on('event.error',function(err) {
    slogger.error('producer error');
});
producerRd.on('event.log',function(log) {
    slogger.debug('producer log',log);
});
const producer = new RdKafkaProducer({
    name : SCHEDULE_NAME1,
    topic: TOPIC_NAME1,
    producer:producerRd,
    delayInterval: 500
});
producer.addData(FIST_DATA, {},function(err) {
    if (err) {
        slogger.error('write to queue error',err);
        return done('write to queue error');
    }
    slogger.info('write to kafka finished');
});


const consumer = new Kafka.KafkaConsumer({
    'metadata.broker.list': KAFKA_HOST,
    'group.id': 'test-rdkafka-0',
    'auto.offset.reset':'earliest',
    'socket.keepalive.enable': true,
    'socket.nagle.disable': true,
    'enable.auto.commit': true,
    'fetch.wait.max.ms': 5,
    'fetch.error.backoff.ms': 5,
    'queued.max.messages.kbytes': 1024 * 10,
    debug:'all'
});
let hasDone = false;
new RdKafkaConsumer({
    name: 'kafka',
    consumer,
    topics: [ TOPIC_NAME1],
    
    doTask:function(messages,callback) {
        slogger.trace(messages);
    },
    readCount : 1,
    pauseTime : 500,
    idleCheckInter: 10 * 1000
}).on(RdKafkaConsumer.EVENT_CONSUMER_ERROR,function(err) {
    slogger.error('consumer error',err);
    hasDone = true;
    done(err);
}).on(RdKafkaConsumer.EVENT_CLIENT_READY,function() {
    slogger.trace('the consumer client is ready');
    
}).on(RdKafkaConsumer.EVENT_LOG,function(log) {
    // slogger.trace(JSON.stringify(log));
});

Using kafkajs

const { Kafka } = require('kafkajs');
const {KafkaJsProducer,KafkaJsConsumer} = require('queue-schedule');

const FIST_DATA = {a:1,b:2};
const SCHEDULE_NAME1 = 'schedule1';
const TOPIC_NAME1 = 'topic.kafkajs';
const client =  new Kafka({
    brokers: ['xxxx', 'yyyy']
});

const producer = new KafkaJsProducer({
    topic: TOPIC_NAME1,
    client,
});
producer.addData(FIST_DATA, {},function(err) {
    if (err) {
        console.error('write to queue error',err);
        return;
    }
    console.info('write to kafka finished');
});
producer.on(KafkaJsProducer.EVENT_PRODUCER_ERROR, function(err) {
    console.error('error in consumer', err);
});

new KafkaJsConsumer({
    name: 'kafka',
    client,
    topic: TOPIC_NAME1,
    consumerOption: {
        groupId: 'kafkajs',
        fromBeginning: true
    },
    doTask:function(messages,callback) {
        console.log(messages);
        const value = messages[0].value;//read the first value
        let data = null;
        try {
            data = JSON.parse(value);
            console.log('recieve data',data);
        } catch (e) {
            console.error('parse message error',e);
        }

        callback();//the next loop
    },
    readCount : 1,
    pauseTime : 500,
    idleCheckInter: 10 * 1000
}).on(KafkaJsConsumer.EVENT_CONSUMER_ERROR,function(err) {
    console.error('consumer error',err);
    hasDone = true;
    done(err);
}).on(KafkaJsConsumer.EVENT_CONSUMER_READY,function() {
    console.log('the consumer is ready');
});

API

For detail usage, see the document online here.

License

MIT

queue-schedule's People

Contributors

yunnysunny avatar e174596549 avatar

Watchers

 avatar James Cloos avatar  avatar

queue-schedule's Issues

Memory leak when can't connect to kafka server

In RdKafkaProducer, it uses kafka connection promise's then function to send data. For not adding the promise's catch function, when kafka server is not connectable, it will create a lot of then functions.

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    ๐Ÿ–– Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. ๐Ÿ“Š๐Ÿ“ˆ๐ŸŽ‰

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google โค๏ธ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.