Code Monkey home page Code Monkey logo

redis-streams-broker's Introduction

redis-streams-broker

This package is based on redis stream data type which provides you with following features

  1. Broker to redis stream which can be used as centralized que between microservices. (Using Redis)
  2. Support for injectable redis client ioredisonly
  3. Guarantee of message delivery via consumer acknowledgements.
  4. Consumer Group functionality for scalability. (Just like Kafka)
  5. Option to drop a message when its acked, thus keeping memory footprint in check.

Getting Started

  1. Install using npm -i redis-streams-broker
  2. Require in your project. const brokerType = require('redis-streams-broker').StreamChannelBroker;
  3. Run redis on local docker if required. docker run --name streamz -p 6379:6379 -itd --rm redis:latest
  4. Instantiate with a redis client and name for the stream. const broker = new brokerType(redisClient, name);
  5. All done, Start using it!!.

Examples/Code snippets

  1. Please find example code for injectable ioredis client here
  2. Please find example code for injectable custom client here
  3. Please find multi threading examples here
  4. Please find async processing examples here
const Redis = require("ioredis");
const redisConnectionString = "redis://127.0.0.1:6379/";
const qName = "Queue";
const redisClient = new Redis(redisConnectionString);
const brokerType = require('redis-streams-broker').StreamChannelBroker;
const broker = new brokerType(redisClient, qName);

//Used to publish a paylod on stream.
const payloadId = await broker.publish({ a: "Hello", b: "World" }); 

//Creates a consumer group to receive payload
const consumerGroup = await broker.joinConsumerGroup("MyGroup"); 

//Registers a new consumer with Name and Callback for message handlling.
const subscriptionHandle = await consumerGroup.subscribe("Consumer1", newMessageHandler); 

// Handler for arriving Payload
async function newMessageHandler(payloads) {
    for (let index = 0; index < payloads.length; index++) {
        try {
            const element = payloads[index];
            console.log("Payload Id:", element.id); //Payload Id
            console.log("Payload Received from :", element.channel); //Stream name
            console.log("Actual Payload:", element.payload); //Actual Payload
            await element.markAsRead(); //Payload is marked as delivered or Acked also optionaly the message can be dropped.
        }
        catch (exception) {
            console.error(exception);
        }
    }
}

//Provides summary of payloads which have delivered but not acked yet.
const summary = await consumerGroup.pendingSummary();

//Unsubscribes the consumer from the group.
const sucess = consumerGroup.unsubscribe(subscriptionHandle); 

//Amount of memory consumed by this stream in bytes.
const consumedMem = await broker.memoryFootprint();

Built with

  1. Authors :heart for Open Source.
  2. nanoid for auto generating subscribtion handles.
  3. redis-scripto2 for handling lua scripts.
  4. relief-valve future refactoring to be open to any redis connection lib.

Contributions

  1. New ideas/techniques are welcomed.
  2. Raise a Pull Request.

Current Version:

0.0.15[Beta]

License

This project is contrubution to public domain and completely free for use, view LICENSE.md file for details.

API

Class StreamChannelBroker

  1. constructor(redisClient: any, channelName: string)

    Creates a broker instance.

    redisClient: Injectable redis client which will be used to send commands to redis server.

    channelName: Name of the stream key, if this doesnot exists it will be created on first push or group subscription.

  2. publish(payload: any, maximumApproximateMessages?: number, failOnMaxMessageCount:boolean): Promise<string>;

    Publishes provided message into the stream and returns id generated by server.

    payload: A JS object containing properties which are passed as key values pairs.

    maximumApproximateMessages: Appropiate length of the stream it is equal to ~ MAXLENGTH option in redis. Defaulted to 100, If negative number is passed then it behaves as non capped stream.

    failOnMaxMessageCount: if maximumApproximateMessages is positive number and failOnMaxMessageCount is set to true then it will only publish messages untill it reaches the maximum count post that it will start failling by returning null as message id, default value is false.

  3. joinConsumerGroup(groupName: string, readFrom: string): Promise<ConsumerGroup>

    Creates a consumer group on the given redis stream with information provided, if the group exists does nothing returning a ConsumerGroup object.

    groupName: Name of the group to be created ot joined.

    readFrom: Id of the mesage to start reading from. defaulted to $ to only read new messages recevied on redis, check redis docs for more info.

  4. memoryFootprint(): Promise<number>

    Returns number of bytes consumed by the current stream.

  5. destroy(): Promise<boolean>;

    Starts to unsubscribe all the handles that were subscribed to this instance.

Class ConsumerGroup

  1. subscribe(consumerName: string, handler: (payload: Payload[]) => Promise<boolean>, pollSpan?: number, payloadsToFetch?: number, subscriptionHandle?: string, readPending?: boolean): Promise<string>

    Subscribes to stream to start receiving events when new payload arrives, this internally creates a polling system to check for new messages in stream. returns subscription name.

    consumerName: Name of the consumer who is subscribing via the consumer group object.

    handler: A callback function which will be invoked when new message a.k.a payload(s) arrive. Should be of signature (payload: Payload[]) => Promise<number> should be async & return from this function is number of messages to fetch from redis(expected +ve number; -ve or 0 will unsubscribe from the group stopping all further reads from stream,if NAN then defaults to number provided when subscribing), look at Payload class below for more details.

    pollSpan: Number of millisecond to wait after completion of handler to check for next available message in stream. Defaulted to 1000 milliseconds.

    payloadsToFetch: Maximum number of messages to fetch in one poll to server this is simillar to COUNT command in redis, this is optional and defaulted to 2.

    subscriptionHandle: Name for subscription handler this is what will be returned from the function, this is defaulted to unique shortid.

    readPending: If set to true will read all messages from start of the stream ie: Id = 0 which are in pending list of this consumer and group, once all pending are read it will automatically switch to latest messages from the stream. If set to false it will always look for new message from the stream, this is defaulted to false.

  2. unsubscribe(subscriptionHandle: string): Promise<boolean>

    Unsubscribes from the stream for the given subscriptionhandle, returns true for sucess and false for failure.

    subscriptionHandle: Name of the subscription handle which was returned by subscribe api.

  3. pendingSummary(): Promise<GroupSummary>

    Returns details of the pending items for the given group by exposing GroupSummary object.

Class Payload

  1. channel: string: Name of the stream key in redis.

  2. id: string: Id of the message being received.

  3. payload: any: Actual payload to processs.

  4. markAsRead(deleteMessage?: boolean): Promise<boolean>

    This function helps to ack the payload as read or processed, returns status of the operation via boolean return type true indicating success.

    deleteMessage: if set to true it will ack & delete the message from the stream if set to false will only ack the message defaulted to false.

Class GroupSummary

  1. total: number: This is the total number of messages in pending list.
  2. firstId: string: Id of the first message which is pending.
  3. lastId: string: Id of the last message which is pending.
  4. consumerStats: any: Extra information provided by XPENDING command.

redis-streams-broker's People

Contributors

dependabot[bot] avatar lragji avatar saksham-verma-bh avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar

redis-streams-broker's Issues

error TS1016: A required parameter cannot follow an optional parameter.

error TS1016: A required parameter cannot follow an optional parameter.
Change the parameter failOnMaxMessageCount to optional or put it in front

This is a compile time error

yarn run v1.22.19
$ tsc -p tsconfig.build.json
node_modules/redis-streams-broker/index.d.ts:4:64 - error TS1016: A required parameter cannot follow an optional parameter.

4 publish(payload: any, maximumApproximateMessages?: number, failOnMaxMessageCount: boolean): Promise;
~~~~~~~~~~~~~~~~~~~~~

Found 1 error in node_modules/redis-streams-broker/index.d.ts:4

scripts folder cannot be found when used from npm

Hi! ๐Ÿ‘‹

Firstly, thanks for your work on this project! ๐Ÿ™‚

Today I used patch-package to patch [email protected] for the project I'm working on.

When the module is loaded from npm the scripts dir cannot be found. This is because of different a different current working dir. Simple take any of the examples and try to run them from a clean npm init and you will see.

** reproduction steps **

yarn init
yarn add redist-stream-broker ioredis

...copy any of the examples in the index.js...

yarn .

** The fix **
The fix is to search from the directory where the file exists instead of the cwd.

Here is the diff that solved my problem:

diff --git a/node_modules/redis-streams-broker/index.js b/node_modules/redis-streams-broker/index.js
index 7c614b8..7f7981b 100644
--- a/node_modules/redis-streams-broker/index.js
+++ b/node_modules/redis-streams-broker/index.js
@@ -1,3 +1,4 @@
+const path = require("path");
 const shortid = require("shortid");
 const Scripto = require("redis-scripto");
 
@@ -19,7 +20,7 @@ class StreamChannelBroker {
         this._acknowledgeMessage = this._destroyingCheckWrapper(this._acknowledgeMessage.bind(this));
         this._unsubscribe = this._destroyingCheckWrapper(this._unsubscribe.bind(this), false);
         this._groupPendingSummary = this._destroyingCheckWrapper(this._groupPendingSummary.bind(this), false);
-        this._scriptManager.loadFromDir('./scripts');
+        this._scriptManager.loadFromDir(path.join(__dirname, 'scripts'));
     }
 
     _destroyingCheckWrapper(fn, async = true) {

This issue body was partially generated by patch-package.

error TS1016: A required parameter cannot follow an optional parameter.

in the index.d.ts the class StreamChannelBroker, the method publish on line 4
publish(payload: any, maximumApproximateMessages?: number, failOnMaxMessageCount: boolean): Promise;

when using the StreamChannelBroker class in my project, I get compilation error :

node_modules/redis-streams-broker/index.d.ts:4:64 - error TS1016: A required parameter cannot follow an optional parameter.

4 publish(payload: any, maximumApproximateMessages?: number, failOnMaxMessageCount: boolean): Promise;
~~~~~~~~~~~~~~~~~~~~~

Found 1 error.

on aws linux, there is an issue with # prefix for members

It works fine on my macosx. however on aws linux, there is a compiler error see below.
When I remove the # prefix with __ it works.

node_modules/redis-streams-broker/asyncProcessor.js:2
#dataProcessingHandler
^
SyntaxError: Invalid or unexpected token
at Module._compile (internal/modules/cjs/loader.js:723:23)
at Object.Module._extensions..js (internal/modules/cjs/loader.js:789:10)
at Module.load (internal/modules/cjs/loader.js:653:32)
at tryModuleLoad (internal/modules/cjs/loader.js:593:12)
at Function.Module

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    ๐Ÿ–– Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. ๐Ÿ“Š๐Ÿ“ˆ๐ŸŽ‰

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google โค๏ธ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.