Code Monkey home page Code Monkey logo

grenache-nodejs-http's Introduction

Grenache Node.JS HTTP implementation

Grenache is a micro-framework for connecting microservices. Its simple and optimized for performance.

Internally, Grenache uses Distributed Hash Tables (DHT, known from Bittorrent) for Peer to Peer connections. You can find more details how Grenche internally works at the Main Project Homepage

Setup

Install

npm install --save grenache-nodejs-http

Other Requirements

Install Grenache Grape: https://github.com/bitfinexcom/grenache-grape:

npm i -g grenache-grape
// Start 2 Grapes
grape --dp 20001 --aph 30001 --bn '127.0.0.1:20002'
grape --dp 20002 --aph 40001 --bn '127.0.0.1:20001'

Examples

RPC Server / Client

This RPC Server example announces a service called rpc_test on the overlay network. When a request from a client is received, it replies with world. It receives the payload hello from the client.

The client sends hello and receives world from the server.

Internally the DHT is asked for the IP of the server and then the request is done as Peer-to-Peer request via websockets.

Grape:

grape --dp 20001 --aph 30001 --bn '127.0.0.1:20002'
grape --dp 20002 --aph 40001 --bn '127.0.0.1:20001'

Server:

const Link = require('grenache-nodejs-link')

const link = new Link({
  grape: 'http://127.0.0.1:30001'
})
link.start()

const peer = new PeerRPCServer(link, {
  timeout: 300000
})
peer.init()

const service = peer.transport('server')
service.listen(_.random(1000) + 1024)

setInterval(function () {
  link.announce('rpc_test', service.port, {})
}, 1000)

service.on('request', (rid, key, payload, handler) => {
  console.log(payload) // hello
  handler.reply(null, 'world')
})

Client:

const Link = require('grenache-nodejs-link')

const link = new Link({
  grape: 'http://127.0.0.1:30001'
})
link.start()

const peer = new PeerRPCClient(link, {})
peer.init()

peer.request('rpc_test', 'hello', { timeout: 10000 }, (err, data) => {
  if (err) {
    console.error(err)
    process.exit(-1)
  }
  console.log(data) // world
})

Code Server Code Client

API

Class: PeerRPCServer

Event: 'stream'

Always emitted as son as a request arrives. Emits the raw req and res streams of the request and some preparsed metadata. Used for streaming. If disableBuffered is set to false, the server will attempt to buffer after emitting the stream event.

serviceStr.on('stream', (req, res, meta, handler) => {
  console.log(meta) // meta.isStream === true

  const [rid, key] = meta.infoHeaders

  req.pipe(process.stdout)

  handler.reply(rid, null, 'world') // convenience reply
})

Example.

Event: 'request'

Emitted when a request from a RPC client is received. In the lifecycle of a request this happens after the server has parsed an buffered the whole data. When the server runs with disableBuffered: true, the event must emitted manually, if needed, or by calling the buffering request handlers manually.

  • rid unique request id
  • key name of the service
  • payload Payload sent by client
  • handler Handler object, used to reply to a client.
service.on('request', (rid, key, payload, handler) => {
  handler.reply(null, 'world')
})

new PeerRPCServer(link, [options])

  • link <Object> Instance of a Link Class
  • options <Object>
    • disableBuffered <Boolean> Disable automatic buffering of the incoming request data stream. Useful for streaming.
    • timeout <Object> Server-side socket timeout
    • secure <Object> TLS options
      • key <Buffer>
      • cert <Buffer>
      • ca <Buffer>
      • requestCert <Boolean>
      • rejectUnauthorized <Boolean>

Creates a new instance of a PeerRPCServer, which connects to the DHT using the passed link.

peer.init()

Sets the peer active. Must get called before we get a transport to set up a server.

peer.transport('server')

Must get called after the peer is active. Sets peer into server- mode.

peer.listen(port)

Lets the PeerRPCServer listen on the desired port. The port is stored in the DHT.

peer.port

Port of the server (set by listen(port)).

Example

This RPC Server example announces a service called rpc_test on the overlay network. When a request from a client is received, it replies with world. It receives the payload hello from the client.

The client sends hello and receives world from the server.

Internally the DHT is asked for the IP of the server and then the request is done as Peer-to-Peer request via websockets.

Server:

const Link = require('grenache-nodejs-link')

const link = new Link({
  grape: 'http://127.0.0.1:30001'
})
link.start()

const peer = new PeerRPCServer(link, {})
peer.init()

const service = peer.transport('server')
service.listen(_.random(1000) + 1024)

setInterval(function () {
  link.announce('rpc_test', service.port, {})
}, 1000)

service.on('request', (rid, key, payload, handler) => {
  console.log(payload) // hello
  handler.reply(null, 'world')
})

Client:

const Link = require('grenache-nodejs-link')

const link = new Link({
  grape: 'http://127.0.0.1:30001'
})
link.start()

const peer = new PeerRPCClient(link, {})
peer.init()

peer.request('rpc_test', 'hello', { timeout: 10000 }, (err, data) => {
  if (err) {
    console.error(err)
    process.exit(-1)
  }
  console.log(data) // world
})

Server Client

Class: PeerRPCClient

new PeerRPCClient(link, [options])

  • link <Object> Instance of a Link Class
  • options <Object>
    • maxActiveKeyDests <Number>
    • maxActiveDestTransports <Number>
    • secure <Object> TLS options
      • key <Buffer>
      • cert <Buffer>
      • ca <Buffer>
      • rejectUnauthorized <Boolean>

Creates a new instance of a PeerRPCClient, which connects to the DHT using the passed link.

A PeerRPCClient can communicate with multiple Servers and map work items over them. With maxActiveKeyDests you can limit the maximum amount of destinations. Additionally, you can limit the amount of transports with maxActiveDestTransports.

peer.init()

Sets the peer active. Must get called before we start to make requests.

peer.map(name, payload, [options], callback)

  • name <String> Name of the service to address
  • payload <String> Payload to send
  • options <Object> Options for the request
    • timeout <Number> timeout in ms
    • limit <Number> maximum requests per available worker
  • callback <Function>

Maps a number of requests over the amount of registered workers / PeerRPCServers. Example.

peer.request(name, payload, [options], callback)

  • name <String> Name of the service to address
  • payload <String> Payload to send
  • options <Object> Options for the request
    • timeout <Number> timeout in ms
    • retry <Number> attempts to make before giving up. default is 1
  • callback <Function>

Sends a single request to a RPC server/worker. Example.

peer.stream(name, opts)

  • name <String> Name of the service to address
  • options <Object> Options for the request
    • timeout <Number> timeout in ms
    • headers <Object> Headers to add to the request

Looks a service up and returns a req-object which is a stream. Additional parameters (e.g. content-type), can be added via options.

The default metadata values for the request id and key are automatically via header.

Example.

grenache-nodejs-http's People

Contributors

andrewreedy avatar face avatar mafintosh avatar prdn avatar robertkowalski avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

grenache-nodejs-http's Issues

Streaming support

Right now the http and ws adapters for grenache are both buffering data.

That is becoming a problem, when we send larger data. One contract all transports have with each other is that the message format it always in JSON. The message contains a key and a request / msg id. Our current use case for streaming is mainly binary data, like big file uploads.

Recently I thought about streaming integration into the Transports, but I ran into multiple problems.

Right now our system catches the request event emitted by the http module, then starts buffering and then emits itself a request event with the buffered data.

A typical handler looks like this:

service.on('request', (rid, key, payload, handler) => {
  // rid, key also comes from the buffered data, which is always JSON
  // payload is from the buffered data
  handler.reply(null, 'world')
})

The payload is buffered here:

socket.on('request', (req, rep) => {
let body = []
req.on('data', (chunk) => {
body.push(chunk)
}).on('end', () => {
body = Buffer.concat(body).toString()
const cert = secure ? req.socket.getPeerCertificate() : undefined
this.handleRequest(

How could a streaming API look like?

service.on('stream', (req, res) => {
  // rid, key would either have to be parsed from the stream or they are not needed... (?)
  req.pipe(consumer).on('end', () => { res.end('') })
})

internally we would take a look if we are about to receive a stream:

    socket.on('request', (req, rep) => {
      if (req.headers['transfer-encoding'] === 'chunked') {
        this.handleStream(req, rep)
        return
      }

      // old buffer code follows here...

handleStream would then emit a stream event:

handleStream (req, res) {
  this.emit('stream', req, res)
}

Thoughts and considerations:

  1. overwriting the request event

right now we catch the request event from the socket, then do the buffering and emit our own event named request with the buffered data. internally we could rename our request event to grenache-request and additionally just proxy the original request event. that allows users to build all kind of different, specialized request handlers. or maybe, to not introduce a breaking change, we additionally emit the original request event under a different name, e.g. original-request. this way users of the lib can build very specific, custom request handlers by accessing the raw request object.

  1. svc-js integration of streaming

right now we are searching for the corresponding method on the object and just call the method with the data that was buffered before: https://github.com/bitfinexcom/bfx-wrk-api/blob/c695ff36a801103bad6bb3c4fa059f38c7081f9a/api/base.js#L41-L66

With stream integration we have to pass the stream or chunks into the handler. the big question is how we can integrate that in a good way.

we could take the current api handler methods and let them emit data. still unclear is how to stream data back:

foo (space, args, cbOrStream) {
  this.on('data') // ...

  cbOrStream.end('') // uh?!
}

another way is to have two handlers:

foo (space, args, cb) {}
fooStream (space, req, res) {}

bfx-wrk-api would then check if the method exists and in an error case return something like ERR_API_NO_STREAM_METHOD. So it would depend on the service developer if they want to support streaming.

what do you think about the svc-js integration? do you have a more elegant solution?

Mono repository

Why not have everything grenache related in a mono repository?
Makes contribution easier.

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    ๐Ÿ–– Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. ๐Ÿ“Š๐Ÿ“ˆ๐ŸŽ‰

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google โค๏ธ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.