Code Monkey home page Code Monkey logo

axon's People

Contributors

alexeykupershtokh avatar fastner avatar forbeslindesay avatar gjohnson avatar james-huston avatar jcrugzz avatar jdesboeufs avatar lfk avatar loghorn avatar malash avatar mgesmundo avatar nickpoorman avatar nisaacson avatar porkchop avatar reynaldot avatar sailxjx avatar schneiderl avatar tj avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

axon's Issues

friendlier codec exceptions

Not super important, but might be helpful for new users. For example, accidentally sending objects without format('json').

assert.js:102
  throw new assert.AssertionError({
        ^
AssertionError: missing value
    at writeUInt32 (buffer.js:917:12)
    at Buffer.writeUInt32BE (buffer.js:950:3)
    at Message.pack (/Users/gjohnson/Projects/axon/lib/message.js:86:7)
    at Message.write (/Users/gjohnson/Projects/axon/lib/message.js:40:18)
    at RepSocket.Socket.pack (/Users/gjohnson/Projects/axon/lib/sockets/sock.js:121:11)
    at reply (/Users/gjohnson/Projects/axon/lib/sockets/rep.js:53:23)
    at RepSocket.<anonymous> (/Users/gjohnson/Projects/axon/examples/reqrep/rep.js:9:3)
    at RepSocket.EventEmitter.emit (events.js:91:17)
    at Parser.RepSocket.onmessage (/Users/gjohnson/Projects/axon/lib/sockets/rep.js:47:15)
    at Parser.frameBody (/Users/gjohnson/Projects/axon/lib/parser.js:106:10)

benchmark/pub.js often crashes after benchmark/sub.js exits

ubuntu 11.04, node 0.8.12, axon 0.4.2

wicked@wicked-desktop:~/s2s/node_modules/axon/benchmark$ node pub.js
pub bound
sending 1000 per tick
sending 1023 byte messages

events.js:68
        throw arguments[1]; // Unhandled 'error' event
                       ^
Error: This socket is closed.
    at Socket._write (net.js:518:19)
    at Socket.write (net.js:510:15)
    at PubSocket.flushBatch (/home/wicked/Alawar/s2s/node_modules/axon/lib/sockets/pub.js:57:10)
    at PubSocket.send (/home/wicked/Alawar/s2s/node_modules/axon/lib/sockets/pub.js:75:54)
    at more (/home/wicked/Alawar/s2s/node_modules/axon/benchmark/pub.js:21:42)
    at process.startup.processNextTick.process._tickCallback (node.js:244:9)

benchmark/pub.js often crashes after benchmark/sub.js exits

ubuntu 11.04, node 0.8.12, axon 0.4.2

wicked@wicked-desktop:~/s2s/node_modules/axon/benchmark$ node pub.js
...
  axon:pub flushBatch to 1 sockets, 1 writable +0ms
  axon:pub flushBatch to 1 sockets, 0 writable +1ms
  ...
  axon:pub flushBatch to 1 sockets, 0 writable +0ms
  axon:sock disconnect 127.0.0.1:50770 +633ms
  axon:sock start total sockets: 1 +0ms
  axon:sock remove socket 0 +0ms
  axon:sock end total sockets: 0 +0ms
  axon:sock on('error') socket= +3ms { _handle: null,
  _pendingWriteReqs: 1481,
  _flags: 0,
  _connectQueueSize: 0,
  destroyed: true,
  errorEmitted: true,
  bytesRead: 0,
  _bytesDispatched: 72095400,
  allowHalfOpen: false,
  writable: false,
  readable: false,
  server: 
   { _events: { connection: [Function], listening: [Function] },
     _connections: 0,
     connections: [Getter/Setter],
     allowHalfOpen: false,
     _handle: 
      { writeQueueSize: 0,
        onconnection: [Function: onconnection],
        owner: [Circular] },
     _connectionKey: '4:0.0.0.0:3003' },
  _peername: { address: '127.0.0.1', family: 'IPv4', port: 50770 },
  _events: 
   { data: [Function],
     error: [Function],
     timeout: [Function],
     close: [Function] },
  _connecting: false,
  _connectQueue: null,
  _idleNext: null,
  _idlePrev: null,
  _idleTimeout: -1 }
  axon:sock error This socket is closed. +1ms
  axon:sock start total sockets: 0 +0ms
  axon:sock removal of unknown socket +0ms

events.js:68
        throw arguments[1]; // Unhandled 'error' event
                       ^
Error: This socket is closed.
    at Socket._write (net.js:518:19)
    at Socket.write (net.js:510:15)
    at PubSocket.flushBatch (/home/wicked/Alawar/s2s/node_modules/axon/lib/sockets/pub.js:60:14)
    at PubSocket.send (/home/wicked/Alawar/s2s/node_modules/axon/lib/sockets/pub.js:80:54)
    at more (/home/wicked/Alawar/s2s/node_modules/axon/benchmark/pub.js:21:42)
    at process.startup.processNextTick.process._tickCallback (node.js:244:9)  

I propose similar fix to pub.js as done here:
websockets/ws#74

PS: also note double socket removal, once on 'close' event then once on 'error'.

timeout and/or re-send REQ/REP replies?

if there's no response and a pipe breaks etc that callback will be in limbo. in many cases this may not matter at all, we could just timeout and error with err.timeout, the lib can ignore it as necessary, but it would be nice if we had optional support for re-transmission

Multiple Pull Sockets Incorrectly Buffer Unread Messages

Please see the following example https://gist.github.com/3976658

Whenever a push socket has no clients, it will buffer messages until that client connects. However when I open two push sockets, the first socket places its buffered messages into the second sockets buffer.

  • A client connecting to PORT1 will not see any buffered messages
  • A client connecting to PORT2 will see buffered messages from both ports

docs for next release

Lots of stuff to cover.

sockets

  • req
  • rep
  • router
  • dealer

configuration

  • get/set
  • identities

misc

  • cover which sockets will NOT retry/buffer failed messages.
  • multiple connects/binds
  • ????

expose greetings

might be able to use this for some stuff that im sort of re-implementing since it's not currently exposed

remove socket -1

some bug:

axon:sock remove socket 0 +0ms
  axon:sock remove socket -1 +0ms

implementing request/reply

I hacked around with request/reply leveraging the multipart stuff and no callbacks. The thing to think about is how we want the reply socket to behave. I think in the ZMQ world the reply socket will block until it has sent back the reply to the peer. So that might not make a whole bunch of sense in the node world? maybe? So I think our version of reply would be a lot more like a dealer.

Anyhow, in my little example it uses a multipart message; message 1 being the "identity" of the request socket and message 2 plus being the body. It defaults to '\u0000' for the sole purpose of the reply socket being able to know if it needs to generate an identity for it.

var req = ss.socket('req')
  , rep = ss.socket('rep');

req.identity('tobi');

rep.bind(3000);
req.connect(3000);

req.on('message', function(pong){
  pong.toString().should.equal('pong');
  rep.close();
  req.close();
});

rep.on('message', function(id, ping){
  ping.toString().should.equal('ping');
  id.toString().should.equal('tobi');
  rep.send(id, 'pong');
});

req.send('ping');

4159bd2cb75d316c71ff2794df1028a334d82c7c

multiple connect examples

Need examples for these, probably add about the benefits of multiple backends with multiple connect()'s over something like cluster. (I get questions about axon+cluster)

unix sockets

I don't absolutely need this yet, but I have an app running now that does some local communication, so it would be nice to have at some point...

Perhaps:

.bind('unix:///tmp/foo');
.connect('unix:///tmp/foo');

sub.subscribe test issue

The test seems a bit wierd to me.

sub.subscribe(/^user:*/); // equivalent to /^user.*$/
sub.subscribe(/^page:view/); // equivalent to /^page:view.*$/

Therefore I think it doesn't demonstrate best practices that some users could look in tests for.

Queue buffering bug

So this is boggling my mind... if you run test.socket.reconnection.js a few times. A passing test looks like (I added some extra output):

writing 9
received 9
  ss:queue close +11ms
  ss:sock close +11ms
pull closed
  ss:sock connect 127.0.0.1:3000 +0ms
  ss:queue disconnect undefined +0ms
writing 10
  ss:queue connect 127.0.0.1 +1ms
  ss:queue flush +0ms
  ss:sock connect +1ms
  ss:queue flush +0ms
received 10
writing 11
received 11

So the disconnect event from Queue emitted just fine and "10" got buffered.... Now I also get this sometimes, causing the test to fail:

writing 9
received 9
  ss:queue close +9ms
  ss:sock close +9ms
pull closed
  ss:sock connect 127.0.0.1:3000 +1ms
writing 10
  ss:queue disconnect undefined +1ms
  ss:queue connect 127.0.0.1 +0ms
  ss:queue flush +0ms
  ss:sock connect +1ms
  ss:queue flush +1ms
writing 11
received 11

As you can see, "10" snuck in prior to disconnect being emitted from Queue but after the socket seemingly closed, causing "10" to never be received by the Pull socket.

The crazy thing is, I tried beating the timer with some nextTick hacks and even tried removing the socket check within Push#sendToPeers in hopes for a EPIPE or something, no luck though....

accept() socket error handling

internal sockets that you shouldn't really be interacting with, maybe it's best if we just self.emit('error', err) these. The one I ran across today was our RepSocket.prototype.onmessage, a reply() to a dead connection will error, however node is even worse because node will invoke the .write() callback, and emit "error" on the socket, so we would have to do:

sock.write(self.pack(args), function(err){
  if (err) {
    sock.once('error', function(){});
    self.emit('error', err);
  }
});

for me REP is acting as the bind() in this case but from what I can see we're not handing those socket errors and removing them

integration with express

Hi, the way req-rep is implemented is quite different from zmq so I was wondering what type of socket/pattern would suit best the following scenario.
user hit app.get(/) then the request is passed down to the pipeline (sequence of push-pull) until reaching the sink which in turn would reply to the request made by app.get(/) (but only after receiving the last message from the pipleline.)
I tried different patterns for connecting the starting route and the sink but they all have some drawback.
Using zmq I use to do it like that but as you know the node.js bind has all sort of unpredictable issue.

pull.on('message',function(data){
  reply.send(data)
})

HWM support

and emit an event when messages are dropped so you can implement your own queuing etc..

add batching to more than pub

once it's working well, or remove all together since it's not likely to have more than one msg per tick, but ill have to see how things work out

Peername for messages

Events like "connect" or "disconnect" come with a _peername object that looks like this : { address: '127.0.0.1', family: 'IPv4', port: 52824 }. This is very convenient to identify the specific origin of the event.

Is it possible to add the same object to all messages ?

// ^ dont do this

In the file examples/workers/consumer.js there is a comment // ^ dont do this. Please enhance the example to show what we should be doing instead. The examples should be there to help us learn how to do things properly.

add multipart support to socket patterns

leveraging the new stuff. Regular non-multipart:

sock.send('hey')

multipart:

sock.send('hey', 'there', 'tobi')
sock.send(['hey', 'there', 'tobi'])

sub.on('message', function(a, b, c){ ... })

I haven't checked but we'll have to make sure the new multipart stuff supports "nesting" for envelopes

pub-end filtering

for subscriptions. back when we were going the router/dealer route propagation would have been a bit more annoying but now that we would just relay with pub/sub all the way through this should be trivial

configurable bug

I think since configurable is attaching on socket.prototype and all socket's proto is socket.prototype we have sockets playing games on each other. :-)

Good example is in test.router.js, they both end up with the same identities. lol

add wildcard support in emitter

could be nice if EventEmitter2 syntax was supported on events

sock.on('*', function(data){
  console.log('yay wildcard', data);
});

"close" for multiple .connect()s should only emit once

when all have closed, currently we handle some of this as if there was only one connect(), same with "connect" event etc. we can have say "socket close" or similar to use internally (and for logging) in order to-reconnect

smaller default identities

for now we only really need them to be process specific so a PID is fine, this is just so that a reply to a dead req socket does not invoke a new callback since the ids get reset to 0

Expose .createStream()

I want to use axon with secure-peer for super easy security and thus need access to the raw stream. Any plans on doing this?

multipart delimiters

with zmq it was easy to just do sock.send([envelope, id, 0, buf]); but we need the nul ATM

do we even need delimiters? I'm not sure of zmq's internals on this but I'd think we should be able to arbitrarily shift / unshift to maintain the envelope "stack"

add full url support

hahaha i just screwed up the ordering and was creating socket files name ./192.168.0.198 hhaha, the port / host order feels unnatural, though it's what node does too I guess. I'd like to also have sock.connect('192.168.0.198:3000') to compliment sock.connect(3000, '192.168.0.198')

perf regression

on my mbp at least, ill try it on the air later and see if maybe there's just something funky going on with this machine and profile later:

      min: 853 ops/s
     mean: 15963 ops/s
   median: 26103 ops/s
    total: 59238 ops in 9.663s
  through: 15.5888671875 mb/s

perf regression in master

damn!

much lower:

      min: 372 ops/s
     mean: 7886 ops/s
   median: 4777 ops/s
    total: 22317 ops in 4.152s
  through: 7.701171875 mb/s

higher level libs

some ideas for fun or that may actually be useful (in other repos):

  • map / reduce over axon, shipping toString()ed js functions
  • unbuffered multipart (like multipart uploads etc)
  • rpc

req callback

since we dont lock-step maybe:

var sock = axon.socket('req');
sock.send('something', function(res){

})

Thoughts on Multipart

I have been thinking about REQ/REP a little bit which really relies heavily on multipart messages. So I thought I would throw out what I have been thinking to get some feedback on it before I start hacking away. (cc: #3).

Multipart Protocol

octet:     0      1      2      3       4     <length>
        +-------+------+------+------+-------+------------
        | final | meta | <length>            | data ...
        +-------+------+------+------+-------+------------
  • Final frame should be 1 or 0 indicating whether anymore frames are left in the "envelope".
  • This could ideally allow us to handle down to 2 byte frames. Perhaps when a sockets underlying parser is in the "meta" state, it could know how to handle various values of the meta octet for the situations where the length and data octets are irrelevant.

REQ socket

  • The REQ socket sends() data to a connected endpoint, It should never bind().
  • The second argument to send() is a flag as to whether more frames will be written; it defaults to false. We are just explicit were for examples sake.
  • After a message is emitted, the socket will be closed.
  • It is also follows ZMQ in that a null frame is appended to create an envelope.
var req = ss.socket('req');

req.format('json');
req.connect(3000);

req.on('message', function(msg) {
  console.log(msg);
});

req.send({ says: 'ping' }, false);

This would result in the following packets.

Null frame:
+-----+-----+-----+------+-----+-----
| 0x1 | 0x0 | 0x0 | 0x0  | 0x1 | '\0'
+-----+-----+-----+------+-----+-----
Body frame:
+-----+-----+-----+------+-----+---------------------
| 0x0 | 0x1 | 0x0 | 0x0  | 0xf | '{"says":"ping"}'
+-----+-----+-----+------+-----+---------------------

REP socket

  • A REP socket has no send().
  • Only message events will be emitted, which will contain a reply() function as the second argument. This can be invoked to build up an envelope to go back to the REQ socket.
  • The second argument to reply() is a flag as to whether more frames will be written; it defaults to false.
var rep = ss.socket('rep');

rep.format('json');
rep.bind(3000);

rep.on('message', function(msg, reply) {
  reply({ says: 'pong' }, false);
});

Just like ZMQ, once the REP socket receives a "final" message, it rips through all frames in the envelope upto and including the null frame. It then emits the message body.

Upon reply(), it will prepend the earlier ripped off frames and send the message back.

Similarly to the REP parsing process, REQ will rip through the all frames in the response envelope upto and including its original null frame, then emits the message body.


More?

With multipart we can do elegant multi-hop routing from peer to peer just like ZMQ with address frames in the envelopes. Something like REQ <-> ROUTER <-> DEALER <-> REP

REQ -> ROUTER

+-----+-----+-----+------+-----+-----
| 0x1 | 0x0 | 0x0 | 0x0  | 0x1 | '\0'
+-----+-----+-----+------+-----+-----
+-----+-----+-----+------+-----+---------------------
| 0x0 | 0x1 | 0x0 | 0x0  | 0xf | '{"says":"ping"}'
+-----+-----+-----+------+-----+---------------------

ROUTER -> DEALER

+-----+-----+-----+------+-----+-----
| 0x1 | 0x0 | 0x0 | 0x0  | 0x6 | 'req_id'
+-----+-----+-----+------+-----+-----
+-----+-----+-----+------+-----+-----
| 0x1 | 0x0 | 0x0 | 0x0  | 0x1 | '\0'
+-----+-----+-----+------+-----+-----
+-----+-----+-----+------+-----+---------------------
| 0x0 | 0x1 | 0x0 | 0x0  | 0xf | '{"says":"ping"}'
+-----+-----+-----+------+-----+---------------------

DEALER -> REP

+-----+-----+-----+------+-----+-----
| 0x1 | 0x0 | 0x0 | 0x0  | 0x9 | 'router_id'
+-----+-----+-----+------+-----+-----
+-----+-----+-----+------+-----+-----
| 0x1 | 0x0 | 0x0 | 0x0  | 0x6 | 'req_id'
+-----+-----+-----+------+-----+-----
+-----+-----+-----+------+-----+-----
| 0x1 | 0x0 | 0x0 | 0x0  | 0x1 | '\0'
+-----+-----+-----+------+-----+-----
+-----+-----+-----+------+-----+---------------------
| 0x0 | 0x1 | 0x0 | 0x0  | 0xf | '{"says":"ping"}'
+-----+-----+-----+------+-----+---------------------

Is subscribe() needed?

I can do the same in a more beautiful manner with no ifs in on('message') callback.

var axon = require('..')
  , should = require('should')
  , pub = axon.socket('pub')
  , sub = axon.socket('sub')
  , EventEmitter2 = require('eventemitter2').EventEmitter2;

pub.bind(3013);

var ee = new EventEmitter2({wildcard: true, delimiter: ':'});

ee.on('user:*', function(name) {
  msgs.push(this.event, name);
});
ee.on('page:view', function(name) {
  msgs.push(this.event, name);
  msgs.map(String).should.eql(expected);
  pub.close();
  sub.close();
});

var msgs = [];
sub.connect(3013, function(){
  pub.send('user:login', 'tobi');
  pub.send('user:login', 'loki');
  pub.send('user:logout', 'jane');
  pub.send('unrelated', 'message');
  pub.send('other', 'message');
  pub.send('page:view', '/home');
});

sub.on('message', function(type, name){
  ee.emit(String(type), String(name));
});

var expected = [
  'user:login',
  'tobi',
  'user:login',
  'loki',
  'user:logout',
  'jane',
  'page:view',
  '/home'
];

Probably it's even better to use eventemitter2 in axon internally and allow users subscribe to events like this: subscribe('user:*', function() {});

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    ๐Ÿ–– Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. ๐Ÿ“Š๐Ÿ“ˆ๐ŸŽ‰

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google โค๏ธ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.