

Epidemic Broadcast Trees

This module is loosely based on the plumtree paper, Epidemic Broadcast Trees (EBT), but adapted to also replicate logs, and optimized to achieve minimal overhead (the cost of the protocol is linear in the number of messages to be sent).

It is an algorithm that combines the robustness of a flooding epidemic gossip broadcast with the efficiency of a tree model. It is intended for implementing realtime protocols (such as chat, scuttlebutt, or radio/video) over networks with random topology, or networks where peers may otherwise be unable to all connect to each other or to a central hub.

Although the primary motivation for this module is to use it in secure scuttlebutt, it is intended to be decoupled enough to be used in other applications.

Example

Implement a simple in-memory log replicator.

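var EBT = require('epidemic-broadcast-trees')

var clocks = {}
var logs = {}

function append (msg, cb) {
  //create the author's log on first use, and keep it in logs.
  var log = logs[msg.author] = logs[msg.author] || {}
  var expected = Object.keys(log).length + 1
  //check that this is the next expected message.
  if(msg.sequence !== expected)
    cb(new Error('out of order, found:'+msg.sequence+', expected:'+expected))
  else {
    log[msg.sequence] = msg
    //tell ebt about every successfully appended message.
    ebt.onAppend(msg)
    cb()
  }
}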

var ebt = EBT({
  //NOTE: in this example, we are using readable strings for clarity
  //but ideally you'd use cryptographic ids, like public keys.
  id: 'alice',
  getClock: function (id, cb) {
    //load the peer clock for id.
    cb(null, clocks[id] || {})
  },
  setClock: function (id, clock) {
    //setClock doesn't take a cb, but it's okay for it to be async.
    clocks[id] = clock
  },
  getAt: function (pair, cb) {
    //load a particular message, by id:sequence
    if(!logs[pair.id] || !logs[pair.id][pair.sequence])
      cb(new Error('not found'))
    else
      cb(null, logs[pair.id][pair.sequence])
  },
  append: append
})

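//append our first message through the local append function,
//which stores it and then calls ebt.onAppend.
append({
  author: 'alice', sequence: 1, content: {}
}, function (err) { if(err) throw err })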

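//must explicitly say which feeds we are replicating.
ebt.request('alice', true)
ebt.request('bob', true)

//create a stream and pipe it to another instance.
//version (currently 3) and isClient are required.
//remote_stream is a placeholder for the other side: for example, the stream
//returned by bob's own ebt.createStream('alice', 3, false) in the same process.
var stream = ebt.createStream('bob', 3, true)
stream.pipe(remote_stream).pipe(stream)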

Note about push-stream: push-stream is fairly new, so you will probably need to convert the stream to a pull-stream to connect it to network IO and serialization.

var pull = require('pull-stream')
var pushToPull = require('push-stream-to-pull-stream')
//remote_id is the remote peer's id; remote_pull_stream is a duplex
//pull-stream connected to that peer over the network.
var stream = pushToPull(ebt.createStream(remote_id, 3, true))
pull(stream, remote_pull_stream, stream)

API

EBT(opts) => ebt

where opts provides the necessary things to connect ebt to your system.

opts = {
  id: string,
  timeout: 3000, //default,
  getClock: function (id, cb),
  setClock: function (id, clock),
  getAt: function ({id:string, sequence:number}, cb),
  append: function (msg, cb),
  isFeed: function (id),
  isMsg: function(data),
  getMsgAuthor: function(msg),
  getMsgSequence: function(msg)
}

Create a new EBT instance. id is a unique identifier of the current peer. In secure-scuttlebutt this is an ed25519 public key.

getClock(id, cb) and setClock(id, clock) load and save a peer's clock object. This is used to save bandwidth when reconnecting to a peer.

getAt({id, sequence}, cb) retrieves the message at a given sequence in a feed. Messages must have {author, sequence, content} fields.

append(msg, cb) appends a particular message to the log.

timeout is used to decide when to switch a feed to another peer. This is essential to detecting when a peer may have stalled.

isFeed(id) is a validation function that returns true if id is a valid feed identifier. If not, the id is ignored.

Optional (for backwards compatibility):

isMsg(data) is a validation function used to distinguish between data messages and status messages. A message must contain an author field that corresponds to the feed identifier and a sequence field.

getMsgAuthor(msg) is a function that given a message returns the author.

getMsgSequence(msg) is a function that given a message returns the sequence.
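The validators above are only described in prose. As a minimal sketch, assuming messages shaped like {author, sequence, content} and plain-string feed ids as in the example, they might look like this. These are illustrative implementations, not the module's built-in defaults; a real SSB app would check the feed id format.

```javascript
// Hypothetical validators, assuming messages of the form
// {author, sequence, content} and string feed ids.
function isFeed (id) {
  // Accept any non-empty string; a real app should check the id format.
  return typeof id === 'string' && id.length > 0
}

function isMsg (data) {
  // Data messages carry an author and a positive integer sequence;
  // anything else (e.g. a notes / vector clock object) is a status message.
  return data != null && typeof data === 'object' &&
    isFeed(data.author) &&
    Number.isInteger(data.sequence) && data.sequence > 0
}

function getMsgAuthor (msg) { return msg.author }
function getMsgSequence (msg) { return msg.sequence }
```

These could be passed as opts.isFeed, opts.isMsg, opts.getMsgAuthor and opts.getMsgSequence.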

ebt.onAppend (msg)

When a message is appended to the database, tell ebt about it. This must be called whenever a message is successfully appended to the database.

ebt.createStream(id, version, isClient) => PushStream

Create a stream for replication; returns a push-stream. The current version is 3, and isClient must be either true or false. The client side of the stream waits for the server to send its vector clock before replying. This means that if the server doesn't actually support this API, it has a chance to send back an error before the client sends a potentially large vector clock.

ebt.request(id, follow)

Tell ebt to replicate a particular feed. id is a feed id, and follow is a boolean. If follow is false, but previously was called with true, ebt will stop replicating that feed.

ebt.progress()

Returns an object which represents the current replication progress.

An example output looks like this; all values are integers >= 0.

{
  start: S, //where we were at when we started
  current: C, //operations done
  total: T //operations expected
}
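For display purposes (e.g. a progress bar) the object can be reduced to a single fraction. This is a hypothetical helper, not part of the module, and it assumes start, current and total are counted on the same scale, so the work done in this session is current - start out of total - start.

```javascript
// Hypothetical helper: reduce a {start, current, total} progress object
// to a fraction in [0, 1].
// Assumption: all three values are counted on the same scale.
function progressFraction (p) {
  var remaining = p.total - p.start
  if (remaining <= 0) return 1 // nothing was left to do: treat as complete
  var fraction = (p.current - p.start) / remaining
  // Clamp, in case current briefly falls outside [start, total].
  return Math.min(1, Math.max(0, fraction))
}
```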

this follows a common pattern used across ssbc modules for representing progress, used for example here: https://github.com/ssbc/scuttlebot/blob/master/lib/progress.js

ebt.state

The state of the replication is available at ebt.state. Read only access is okay, but updating should only be done via ebt methods.

{
  id: <id>, //our id,
  clock: {<id>: <seq>}, //our local clock,
  follows: {<id>: <boolean>}, //who we replicate, true if we replicate.
  blocks: {<id>: {<id>: <boolean>}}, //who blocks who, true if they are blocked.
  peers: { //currently connected peers
    <id>: {
      clock: {<id>: <seq|-1>}, //feeds that we KNOW the peer is up to. -1 if they do not replicate that feed.
      msgs: [<msg>], //queue of messages waiting to be sent.
      retrive: [<id>], //ids of feeds ready for the next message to be retrieved.
      notes: null || {<id>: <encoded_seq>}, //notes object (encoded vector clock to be sent)
      replicating: { //feeds being replicated to peer.
        <id>: {
          rx: <boolean>, //true if we have asked to receive this feed
          tx: <boolean>, //true if we have been asked to send this feed
          sent: <seq|-1|null>, //sequence number of message we have sent.
          requested: <seq|-1|null> //sequence number the remote peer asked for, and thus we know they have.
        }
      }
    }
  },
  receive: [<msg>] //queue of incoming messages
}

Notes: <X> denotes a value type.

<id> is a "feed id" value for which opts.isFeed(id) === true. (Note: this doesn't actually need to be an SSB feed id; this module can be used for other things too.)

<seq> is a positive integer or zero. -1 is used to represent that the peer is explicitly not replicating that feed.

<msg> is a message for which opts.isMsg(msg) === true.

Replication overview

The state of other peers is stored outside this module, in the SSB-EBT module. See getClock & setClock.

Notes (aka the vector clock) are stored as { feed: (seq === -1 ? -1 : seq << 1 | !rx) }, i.e. each value is seq * 2 + (rx ? 0 : 1). The sequence can be extracted using getSequence, and the rx flag using getReceive (rx is true when the encoded value is even). -1 means do not replicate.
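A minimal sketch of that encoding, written from the formula above. The function names mirror getSequence and getReceive, but these bodies are illustrative rather than the module's own code, and encodeNote is a hypothetical name.

```javascript
// Sketch of the note encoding:
//   -1           -> do not replicate this feed
//   even (2*seq) -> up to seq, and please send it to us (rx = true)
//   odd (2*seq+1)-> up to seq, but don't send it to us (rx = false)
function encodeNote (seq, rx) {
  if (seq === -1) return -1
  // Same as (seq << 1) | !rx, without the 32-bit limit of <<.
  return seq * 2 + (rx ? 0 : 1)
}

function getSequence (note) {
  return note === -1 ? -1 : Math.floor(note / 2)
}

function getReceive (note) {
  // Even values mean rx = true; -1 means we don't replicate at all.
  return note !== -1 && note % 2 === 0
}
```

Packing the rx bit into the sequence keeps each entry of the vector clock a single integer, so changing rx for a feed costs no more bandwidth than announcing a new sequence.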

Two peers connect: one is the client (who initiated the connection), and the other is the server (that received the request). The protocol is mostly the same for clients and servers, with one exception: the server starts by sending its vector clock (notes first).
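The ordering rule can be modelled as a tiny state machine. This is a hypothetical sketch, not the module's internals: createHandshake and send are made-up names, and it tracks "have we received the remote's notes" with explicit state.

```javascript
// Hypothetical model of the notes-first handshake.
// send(notes) is assumed to write a notes object to the wire.
function createHandshake (isClient, localNotes, send) {
  var remoteNotes = null
  var sentLocal = false

  function sendLocalOnce () {
    if (!sentLocal) {
      sentLocal = true
      send(localNotes)
    }
  }

  // The server speaks first.
  if (!isClient) sendLocalOnce()

  return {
    onRemoteNotes: function (notes) {
      remoteNotes = notes
      // The client replies only after the server's notes arrive, so an
      // unsupported server can error before a large clock is sent.
      sendLocalOnce()
    },
    getRemoteNotes: function () { return remoteNotes }
  }
}
```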

It's assumed that data changes follow a long-tail pattern: a small number of peers are highly active, but many peers only add data slowly. Connections between peers may be short-lived, and most feeds may not change at all between one connection and the next. If we sent the whole vector clock on each connection, that would add up to a significant overhead.

Request skipping avoids a great deal of this bandwidth overhead. Each peer remembers the vector clock sent by the other peer. On a new connection, before sending its vector clock, a peer first checks it against the vector clock that the remote peer sent last time. If its sequence for a feed is the same as the one in the saved record of the remote peer, that feed is left out of the request (hence the name "request skipping").

The saved record of the remote peer's vector clock may differ from their actual vector clock. But if they have a new sequence for that feed, they will include that feed in their request, and the local peer will respond by sending an additional partial vector clock including its sequence for that feed. Once both sides have exchanged their sequence for a particular feed, replication of messages in that feed may occur.
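A minimal sketch of request skipping, assuming clocks are plain {feedId: seq} objects and ignoring the rx bit. skipRequests and answerSkipped are hypothetical names used for illustration only.

```javascript
function has (obj, key) {
  return Object.prototype.hasOwnProperty.call(obj, key)
}

// localClock:      our sequence for every feed we replicate
// lastRemoteClock: the clock the remote sent on the previous connection
// Returns the (usually much smaller) clock to send on this connection.
function skipRequests (localClock, lastRemoteClock) {
  var toSend = {}
  Object.keys(localClock).forEach(function (feed) {
    // Same sequence as last time: skip it; the remote will mention the
    // feed itself if it has something new.
    if (lastRemoteClock[feed] !== localClock[feed]) toSend[feed] = localClock[feed]
  })
  return toSend
}

// When the remote's clock mentions a feed we skipped, reply with a partial
// clock so both sides have exchanged their sequence for that feed.
function answerSkipped (localClock, alreadySent, remoteClock) {
  var extra = {}
  Object.keys(remoteClock).forEach(function (feed) {
    if (has(localClock, feed) && !has(alreadySent, feed)) extra[feed] = localClock[feed]
  })
  return extra
}
```

For example, with local {a: 5, b: 3, c: 7} and a saved remote clock {a: 5, b: 2}, only b and c are sent; if the remote then mentions a, the local peer answers with {a: 5}.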

When connected to multiple peers, request new messages for a given feed (rx) from only one of them. See test/multiple.js.

Following and blocking are handled in EBT. Following is the signal for which feeds to replicate. EBT won't connect to a peer that has been blocked. It will not send a peer's messages (including our own) to another peer if the first peer blocks the second.
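One way to pick the single peer that transmits a feed is sketched below, combining the rule above with opts.timeout to detect a stalled sender. This is a hypothetical illustration, not the module's algorithm: chooseRxPeer, seqOf and the per-peer lastRx timestamp (time of the last message received from that peer) are assumptions.

```javascript
// peers: {peerId: {clock: {feedId: seq}, lastRx: msTimestamp}}
function seqOf (peer, feed) {
  // Guard against a peer whose clock has not arrived yet.
  return (peer && peer.clock && peer.clock[feed] != null) ? peer.clock[feed] : 0
}

// Returns the id of the peer that should send us `feed`, or null.
function chooseRxPeer (feed, localSeq, peers, current, now, timeout) {
  var cur = peers[current]
  var curAhead = seqOf(cur, feed) > localSeq
  // Keep the current sender while it has messages we lack and isn't stalled.
  if (curAhead && now - cur.lastRx < timeout) return current
  // Otherwise switch to another peer that is ahead of us, if any.
  var ids = Object.keys(peers)
  for (var i = 0; i < ids.length; i++) {
    if (ids[i] !== current && seqOf(peers[ids[i]], feed) > localSeq) return ids[i]
  }
  // No alternative: stay with a slow sender, or ask nobody.
  return curAhead ? current : null
}
```

Every other peer stays lazy for that feed (rx = false) but keeps exchanging sequence numbers, so it can take over if the chosen peer stalls.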
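A sketch of those two rules, assuming the state.blocks shape documented above, where blocks[a][b] === true means a blocks b. canSendTo and shouldConnect are hypothetical names.

```javascript
// True if `a` blocks `b` according to the blocks map.
function blocks_ (blocks, a, b) {
  return !!(blocks[a] && blocks[a][b])
}

// Never forward author's messages to a peer that author has blocked.
function canSendTo (blocks, author, peer) {
  return !blocks_(blocks, author, peer)
}

// Don't connect to a peer that we (selfId) have blocked.
function shouldConnect (blocks, selfId, peer) {
  return !blocks_(blocks, selfId, peer)
}
```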

The tests are very readable because they use a simulator where a trace of the run is saved and pretty printed. See test/two.js for a good example.

Comparison to plumtree

I had an idea for a gossip protocol that avoided retransmitting messages by putting unneeded connections into standby mode (which can be brought back into service when necessary), and then was pleasantly surprised to discover it was not a new idea: it had already been described in a paper, and there is an Erlang implementation of that paper.

There are some small differences, mainly because I want to send messages in order, which makes it easy to represent which messages have not been seen using just an incrementing sequence number per feed.

However, plumtree is solely a broadcast protocol, not an eventually consistent replication protocol. Since we are replicating logs, it's also necessary to send a handshake to request the feeds from the right points. If you are replicating thousands of feeds, the size of the handshake is significant, so we introduce an algorithm for "request skipping" that avoids sending unnecessary requests and saves a lot of bandwidth compared to requesting all feeds on each connection.
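To illustrate why in-order delivery makes the bookkeeping cheap: one integer per feed summarizes everything seen from that feed, so any incoming message can be classified with a single comparison. classify is a hypothetical helper, assuming messages shaped like {author, sequence}.

```javascript
// clock: {feedId: highest sequence we have}
function classify (clock, msg) {
  var have = clock[msg.author] || 0
  if (msg.sequence <= have) return 'duplicate' // already seen
  if (msg.sequence === have + 1) return 'next' // append, then advance clock
  return 'gap' // out of order: we still need have + 1 first
}
```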

Related work

Brisa also describes a broadcast protocol that at first glance looks very close to the EBT paper. It is modelled using two components: tree construction/maintenance and peer sampling. In SSB terminology, peer sampling is the job done by SSB conn. Brisa uses HyParView, written by the same authors as the EBT paper, for peer sampling. Compared to EBT, Brisa does not depend on a lazy mode between peers where only the sequence information is maintained; instead it depends on HyParView to detect failures. This has the advantage that it does not need a timer, which is highly latency sensitive. It also has a nice property in how messages are disseminated: they are piggybacked with information about the tree, which allows parent selection to make better choices because it has a better view of the network. The sequence numbers are an important part of the protocol implemented here because, as described earlier, they are used to ensure that messages are disseminated in an eventually consistent manner.

TODO

License

MIT


Contributors

arj03, carloslfu, christianbundy, cryptix, dominictarr, jaminfarr, mixmix, staltz


Issues

write after ebt stream ended

Running 6.3.4 I am now getting the following crash in my logs every few minutes when the pub starts syncing data. Not sure what would be causing this. The remote printed below is another pub if that helps chase it down.

 /home/pi/easy-ssb-pub/node_modules/epidemic-broadcast-trees/stream.js:40
  if(this.ended) throw new Error('write after ebt stream ended:'+this.remote)
                  ^
 Error: write after ebt stream ended:@XgC5wDA2EW++ufaDrjRDHXA7Dyd1ce5bTenCm2u6PZU=.ed25519
     at EBTStream.write (/home/pi/easy-ssb-pub/node_modules/epidemic-broadcast-trees/stream.js:40:24)
     at next (/home/pi/easy-ssb-pub/node_modules/push-stream-to-pull-stream/sink.js:31:21)
     at PacketStreamSubstream.weird.read (/home/pi/easy-ssb-pub/node_modules/muxrpc/pull-weird.js:33:7)
     at PacketStream._onstream (/home/pi/easy-ssb-pub/node_modules/packet-stream/index.js:204:12)
     at PacketStream.write (/home/pi/easy-ssb-pub/node_modules/packet-stream/index.js:135:41)
     at /home/pi/easy-ssb-pub/node_modules/muxrpc/pull-weird.js:56:15
     at /home/pi/easy-ssb-pub/node_modules/pull-stream/sinks/drain.js:24:37
     at /home/pi/easy-ssb-pub/node_modules/pull-goodbye/node_modules/pull-stream/throughs/filter.js:17:11
     at Object.cb (/home/pi/easy-ssb-pub/node_modules/packet-stream-codec/index.js:111:11)
     at drain (/home/pi/easy-ssb-pub/node_modules/pull-reader/index.js:39:14)

Loosely Based on Epidemic Broadcast Trees?!

After spending some time with the Epidemic Broadcast Trees (EBT) paper by Leitão and trying to run your example, I am wondering what "This module is loosely based on plumtree Epidemic Broadcast Trees EBT paper" exactly means. Could you explain in which ways you differ from the paper?
Cheers!

Request skipping

I found a couple of links and mentions about this, but I couldn't find a clear explanation, although it sounds simple. Is it just leaving out unchanged feeds in the note/vector clock updates?

What is the `remote_stream` in the example code?

//create a stream and pipe it to another instance
var stream = ebt.createStream('bob')
stream.pipe(remote_stream).pipe(stream)

I'm trying to understand where that remote_stream variable comes from. And more importantly what it's for.

better protocol documentation

Trying to add a feature, I found myself forgetting lots of stuff about how the protocol works,
such as: when peers connect, one is the client (who calls), the server is expected to send vector clock (notes) first. the place where this happens looks like this:

  //client should wait for the server notes, so that stream
  //can error before a peer sends a massive handshake.
  if(peer.replicating == null) return state

Kinda difficult to understand; it's not very obvious what peer.replicating being unset means...
in connect:

    //if we are client, wait until we receive notes to send code.
    //this is a weird way of doing it! shouldn't we just have a bit of state
    //for wether we have received a vector clock
    replicating: ev.client ? null : {}

now I recall, this was a JGIW (just get it working)...

Crash in canSend

Yesterday my ssb pub was crashing repeatedly, with the following error. I poked around a little trying to figure out the root cause and settled on checking if state was valid in this function. Not sure if that's the preferred solution but it did stop the crash and get my pub running better again. Let me know if there's anything I can do to help

/home/pi/easy-ssb-pub/node_modules/epidemic-broadcast-trees/stream.js:84
      state.msgs.length || state.notes
           ^
TypeError: Cannot read property 'msgs' of undefined
    at EBTStream.canSend (/home/pi/easy-ssb-pub/node_modules/epidemic-broadcast-
    at EBTStream.resume (/home/pi/easy-ssb-pub/node_modules/epidemic-broadcast-t
    at Object.update (/home/pi/easy-ssb-pub/node_modules/epidemic-broadcast-tree
    at Timeout._onTimeout (/home/pi/easy-ssb-pub/node_modules/epidemic-broadcast
    at ontimeout (timers.js:365:14)
    at Timer.unrefdHandle (timers.js:466:3)

Crash in isAvailable

I was running 6.0.1 and hit the following crasher bug. Not sure if this is still a bug in latest version.

/home/chrx/.ssb/node_modules/ssb-ebt/node_modules/epidemic-broadcast-trees/events.js:34
if((peer.clock[feed_id] || 0) > (state.clock[feed_id] || 0)) {
^

TypeError: Cannot read property '@6CAxOI3f+LUOVrbAl0IemqiS7ATpQvr9Mdw9LC4+Uv0=.ed25519' of null
at isAvailable (/home/chrx/.ssb/node_modules/ssb-ebt/node_modules/epidemic-broadcast-trees/events.js:34:21)
at Object.exports.timeout (/home/chrx/.ssb/node_modules/ssb-ebt/node_modules/epidemic-broadcast-trees/events.js:351:22)
at Timeout._onTimeout (/home/chrx/.ssb/node_modules/ssb-ebt/node_modules/epidemic-broadcast-trees/index.js:87:25)
at ontimeout (timers.js:478:11)
at Timer.unrefdHandle (timers.js:598:5)

Document Wire Protocol

It would be nice to have a Documentation on the Wire Protocol. Doing some simple debugging I see that the vector clock is first exchanged then the sequences are sent. But how is backpressure handled?

Add a `pauseRx(<feedId>, isPaused)` method.

According to @dominictarr using request(<feedId>, false) is not a good way to pause replication of a feed.

Context from this ssb message: "On that note, I've just realized that you can't just flipflop replicate.request, because when you do replicate.request(id, false) it will send an "I don't want id" signal (-1) and that will screw with request-skipping, because if you disconnect from the remote during the -1 state, they will remember that you don't want that feed and not mention it next time. (This may break eventual consistency; there are loads of tests in ebt to catch things like this.)

Instead, we want to send the signal that we do want that feed, but not right now! This is already sent if you are getting it faster from another peer. See the code in epidemic-broadcast-trees/v3.js (and v2.js was the old way to encode that). For example, epidemic-broadcast-trees/events.js timeout (line 390) checks if we are expecting messages on a feed but haven't got them, and cycles to another peer for that feed. It calls setNotes(peer, feed, seq, rx) to tell peer that we do (rx=true) or do not (rx=false) want to receive feed from them, but that we are up to seq.

To pause a feed, we just need to be able to set all peers to rx=false for a given feed."

Missing deps?

Seems all the dependencies are missing? At least push-stream is missing.

6.3.4 breaks scuttlebot tests

Looks like the new block code in 6.3.4 breaks test/block2 and test/block3 in sbot because replicate:finish is called twice.

replicating with multiple peers

if you connect to multiple peers at once, it's possible that they send the same messages. Especially in the case of the initial sync, this could result in transmitting many more messages than expected.

The most obvious solution would be to activate transmit on only the first peer, and put the others in lazy mode. However, if the first peer then sends messages very slowly (possibly so slowly it looks like it isn't sending anything), you may get stalled, because currently you only switch peers when you receive a note from another peer.

so if A has {a: 5, b:5} and B has {a:5, b:5} and we are connected to both and have A sending a,b to us, but it pauses at a:3 and b:3, then we are just stuck, even though we could just ask B for those feeds.

I think the solution is to track the latest time we received a message from a peer, and if the time since then is greater than some threshold, switch peers (randomly?) if there is some other peer more up to date than us.

It might also be a good idea when connected to peers to request from each peer randomly too.

documentation drive:

@corlock wrote:

General documentation of EBT's makes sense, but what was unclear to me was where the actual networking and message propagation is handled. My current understanding is that this is connected to the createStream method, which isn't really documented in this module. It is referenced, but not made explicit what that means or how streams interface with EBT.

would an example clear it up? hmm, I see it just says

var stream = ebt.createStream('bob')
stream.pipe(remote_stream).pipe(stream)

That's a pattern that will be familiar if you know my earlier streams work, but it isn't explained and no explanation is linked. I'm not sure where that documentation should be, but it should be somewhere.

Crash replicating

/home/chrx/dev/scuttlebot/node_modules/epidemic-broadcast-trees/events.js:415
if(peer.clock[feed_id] || 0 > state.clock[feed_id] || 0) {
^
TypeError: Cannot read property '@f/6sQ6d2CMxRUhLpspgGIulDxDCwYD7DzFzPNr7u5AU=.ed25519' of null
at /home/chrx/dev/scuttlebot/node_modules/epidemic-broadcast-trees/events.js:415:20
at eachFrom (/home/chrx/dev/scuttlebot/node_modules/epidemic-broadcast-trees/events.js:75:8)
at Object.exports.timeout (/home/chrx/dev/scuttlebot/node_modules/epidemic-broadcast-trees/events.js:413:5)
at Timeout._onTimeout (/home/chrx/dev/scuttlebot/node_modules/epidemic-broadcast-trees/index.js:94:25)
at ontimeout (timers.js:478:11)
at Timer.unrefdHandle (timers.js:598:5)
