Code Monkey home page Code Monkey logo

peer-crdt's Introduction

peer-crdt

NOT BEING ACTIVELY MAINTAINED

(Superseded by delta-crdts).

An extensible collection of operation-based CRDTs that are meant to work over a p2p network.

Index

API

CRDT.defaults(options)

Returns a CRDT collection that has these defaults for the crdt.create() options.

CRDT.create(type, id[, options])

  • type: string representing the CRDT type
  • id: string representing the identifier of this CRDT. This ID will be passed down into the Log constructor, identifying this CRDT to all peers.
  • options: optional options. See options.

Returns an new instance of the CRDT type.

crdt.on('change', () => {})

Emitted when the CRDT value has changed.

crdt.value()

Returns the latest computed CRDT value.

async crdt.peerId()

Resolves to a peer id. May be useful to identify current peer.

Other crdt methods

Different CRDT types can define different methods for manipulating the CRDT.

For instance, a G-Counter CRDT can define an increment method.

Composing

Allows the user to define high-level schemas, composing these low and high-level CRDTs into their own (observable) high-level structure classes.

CRDT.compose(schema)

Composes a new CRDT based on a schema.

Returns a constructor function for this composed CRDT.

const MyCRDT = CRDT.compose(schema)
const myCrdtInstance = MyCRDT(id)
  • schema: an object defining a schema. Example:
const schema = {
  a: 'g-set',
  b: 'lww-set'
}

(Internally, the IDs of the sub-CRDTs will be composed by appending the key to the CRDT ID. Ex: 'id-of-my-crdt/a')

Instead of a key-value map, you can create a schema based on an array. The keys for these values will be the array indexes. Example:

const schema = [
  'g-set',
  'lww-set'
]

Any change in a nested object will trigger a change event in the container CRDT.

You can then get the current value by doing:

const value = myCrdtInstance.value()

Full example:

const schema = {
  a: 'g-set',
  b: 'lww-set'
}
const MyCRDT = CRDT.compose(schema)
const myCrdtInstance = MyCRDT(id)

myCrdtInstance.on('deep change', () => {
  console.log('new value:', myCrdtInstance.value())
})

Dynamic composition

You can use a CRDT as a value of another CRDT. For that, you should use crdt.createForEmbed(type) like this:

const array = myCRDT.create('rga', 'embedding-test', options)

const counter = array.createForEmbed('g-counter')
array.push(counter)

array.once('change', (event) => {
  console.log(array.value()) // [0]

  array.on('deep change', () => {
    console.log(array.value()) // [1]
  })

  event.value.increment()
})

Options

Here are the options for the CRDT.create and composed CRDT constructor are:

  • network: a network plugin constructor. Should be a function with the following signature: function (id, log, onRemoteHead) and return an instance of Network
  • store: a constructor function with thw following signature: function (id), which returns an implementation of the Store interface
  • sign: a function that's used to generate the authentication data for a certain log entry. It will be called with the log entry (Object) and an array of parent entry ids (string), like this:
async function sign (entry, parents) {
  return await signSomehow(entry, parents)
}
  • authenticate: a function that's used to valudate the authentication data for a certain log entry. Should resolve to a boolean, indicating if this entry is authentic or not. It will be called with the log entry (Object), an array of parent entry ids (string) and a signature like this:
async function authenticate (entry, parents, signature) {
  return await authenticateSomehow(entry, parents, signature)
}
  • signAndEncrypt: an optional function that accepts a value object and resolves to a buffer, someting like this:
async function signAndEncrypt(value) {
  const serialized = Buffer.from(JSON.stringify(value))
  const buffer = signAndEncryptSomehow(serialized)
  return buffer
}

(if no options.signAndEncrypt is provided, the node is on read-only mode and cannot create entries).

  • decryptAndVerify: a function that accepts an encrypted message buffer and resolves to a value object, something like this:
async function decryptAndVerify(buffer) {
  const serialized = await decryptAndVerifySomehow(buffer)
  return JSON.parse(Buffer.from(serialized).toString())
}

signAndEncrypt/decryptAndVerify contract

The options.decryptAndVerify function should be the inverse of options.signAndEncrypt.

const value = 'some value'
const signedAndEncrypted = await options.signAndEncrypt(value)
const decryptedValue = await options.decryptAndVerify(signedAndEncrypted)

assert(value === decryptedValue)

Errors

If options.decryptAndVerify(buffer) cannot verify a message, it should resolve to an error.

Built-in types

All the types in this package are operation-based CRDTs.

The following types are built-in:

Counters

Name Identifier Mutators Value Type
Increment-only Counter g-counter .increment() int
PN-Counter pn-counter .increment(),.decrement() int

Sets

Name Identifier Mutators Value Type
Grow-Only Set g-set .add(element) Set
Two-Phase Set 2p-set .add(element), .remove(element) Set
Last-Write-Wins Set lww-set .add(element), .remove(element) Set
Observerd-Remove Set or-set .add(element), .remove(element) Set

Arrays

Name Identifier Mutators Value Type
Replicable Growable Array rga .push(element), .insertAt(pos, element), .removeAt(pos), .set(pos, element) Array
TreeDoc treedoc .push(element), .insertAt(pos, element), .removeAt(pos, length), .set(pos, element) Array

Registers

Name Identifier Mutators Value Type
Last-Write-Wins Register lww-register .set(key, value) Map
Multi-Value Register mv-register .set(key, value) Map (maps a key to an array of concurrent values)

(TreeDoc is explained in this document)

(For the other types, a detailed explanation is in this document.)

Text

Name Identifier Mutators Value Type
Text based on Treedoc treedoc-text .push(string), .insertAt(pos, string), .removeAt(pos, length) String

Extending types

This package allows you to define new CRDT types.

CRDT.define(name, definition)

Defines a new CRDT type with a given name and definition.

The definition is an object with the following attributes:

  • first: a function that returns the initial value
  • reduce: a function that accepts a message and the previous value and returns the new value
  • mutators: an object containing named mutator functions, which should return the generated message for each mutation

Example of a G-Counter:

{
  first: () => 0,
  reduce: (message, previous) => message + previous,
  mutators: {
    increment: () => 1
  }
}

Read-only nodes

You can create a read-only node if you don't pass it an options.encrypt function.

const readOnlyNode = crdt.create('g-counter', 'some-id', {
  network, store, decrypt
})

await readOnlyNode.network.start()

Opaque replication

A node can be setup as a replicating node, while not being able to decrypt any of the CRDT operation data, thus not being able to track state.

Example:

const replicatingNode = crdt.replicate('some-id')

await replicatingNode.network.start()

Interfaces

Store

A store instance should expose the following methods:

  • async empty (): resovles to a boolean indicating if this store has no entries
  • async put (entry): puts an arbitrary JS object and resolves to a unique identifier for that object. The same object should generate the exact same id.
  • async get (id): gets an object from the store. Resolves to undefined if entry couldn't be found.
  • async setHead(id): stores the current head (string).
  • async getHead(): retrieves the current head.

Network

A network constructor should return a network instance and have the following signature:

function createNetwork(id, log, onRemoteHead) {
  return new SomeKindOfNetwork()
}

onRemoteHead is a function that should be called once a remote head is detected. It should be called with one argument: the remote head id.

A network instance should expose the following interface:

  • async start(): starts the network
  • async stop(): stops the network
  • async get(id): tries retrieveing a specific entry from the network
  • setHead(headId): sets the current log head

Internals

docs/INTERNAL.md

License

MIT

peer-crdt's People

Contributors

fritzy avatar hacdias avatar legastero avatar pgte avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

peer-crdt's Issues

Treedoc type throws on erasing

Erasing all of the text from a treedoc/treedoc-text throws an exception:

Uncaught (in promise) TypeError: Cannot read property '1' of undefined
    at sliceAndRemove (treedoc.js:306)
    at remove (treedoc.js:160)
    at walkDepthFirst (treedoc.js:180)
    at walkDepthFirst (treedoc.js:346)
    at Array.removeAt (treedoc.js:164)
    at Promise (type.js:177)
    at new Promise (<anonymous>)
    at EventEmitter._args [as removeAt] (type.js:175)
    at setInterval (index.js:67)

Target performance for use in peerpad

Since we want to use peer-crdt for peerpad, we need to create a performance test and determine a performance target.

  • 1,000 dag operation test with several branches and simulated network latency
    • several branches
    • network latency
    • use the register type and collide occasionally on set
  • determine how many operations per second a typical peerpad session creates
    • 3 people typing at once
    • y.js + richtext type
    • Can peer-crdt keep up with a similar workload?
  • develop textarea demo with peer-crdt
  • develop richtext type for for peer-crdt

At this point, we can start implementing more optimizations if needed (faster dag merging, rollups to remove the need to fetch serially, etc).

fast multi-log sync

The setting

Even though peer-crdt is transport agnostic, it makes some assumptions about the transport that it's using underneath. Here are some of these assumptions:

  • Each CRDT has an independent append-only operation log.
  • In a CRDT, each local operation generates a data block that is stored on this append-only log.
  • An operations is content-addressable. When stored, an operation is identified by a unique string (from here on we'll call it Content ID, or CID for short).
  • Operations are back-linked to represent causal dependency. An operation that's appended to the log points to the operation (or operations) that preceded it.
  • When concurrent operations are detected, a special merge log entry containing no data is created, pointing to the concurrent operations as its parents.
  • At any given point in time, each replica has a HEAD CID, which is the CID of the latest log entry.

The transport, (peer-crdt-ipfs, for instance) is responsible for:

  • creating new log entries locally
  • broadcasting the local HEAD CID to other participating replicas
  • making log entries available for replication by request

The problem

Even though it's very IPFS-y, this structure poses some performance challenges, namely the replication performance:

To replicate a set of log entries, the receiving replica has to pull them one by one. For each log entry received, it has to inspect it and get the log entry parent CIDs. For each one, it has to verify if it exists locally. If it doesn't exist locally, it has to fetch it from the IPFS network. And so own recursively until all the missing log entries are replicated. This means that it has to do a series of sequential lookups to sync the local log with a remote log. This is obviously very inefficient.

(One optimisation can is in place: Alongside with the HEAD CID, a replica can also broadcast all the parent content ids, up to a count of MAX_PARENTS. This allows a replica to make a bunch of fetch requests in parallel. But if MAX_PARENTS is greater than the count of missing log entries, we're back to the vey inefficient fetch-and-resolve-all-parents loop as before.)

Efficient replication requires knowing the remote replica state

I recently realised that this is inefficient because a replica doesn't know what is the replication state of another replica. If it did, it could send all the missing log entries in one burst.

Also, this happens because of content-addressing. A CID is made from hashing the block content, and so doesn't tell us anything from the replication state of the remote replica.

We need a solution that allows a replica to know exactly which log entries a remote replica needs, so that it can send them immediately and in one burst.

One solution

Vector clocks. (If you are not familiar with VCs, this is a good explanation).

Here I propose a solution where replicas keep track of the remote replica state, and push changes instead of expecting the remote to fetch them.

Here is, broadly the main algorithm I propose:

  • Every time a replica creates an operation, it increments the vector clock for its replica id.
  • A replica frequently broadcasts the current vector clock to other replicas.
  • Each replica keeps track of the replication state of other remote replicas by keeping their HEAD vector clock in memory. This replica state is updated in these events:
    • every time a replica receives the remote HEAD vector clock it updates the replica state.
    • every time a replica sends log entries to a remote replica, it updates the replica state.
  • A replica does its best to keep remote replicas in sync. When it detects that a remote vector clock diverged (which happens when receiving a remote VC update or when local changes happen), it starts the sync protocol, which is simply:
    • is the local state bigger or divergent than the remote replica state?
    • if so, send the missing log entries in one burst
    • and update the remote replica vector clock

Illustrated

A picture is worth a thousand words. Let's then illustrate how this could work with two replicas.

One writer, one reader

Replica A creates 3 operations, O1, O2 and O3.
Replica B does not create any operation.

Our goal here is simply to keep replica B in sync with the changes created by replica A.

untitled drawing-28

Replica A starts by creating each log entry. Each log entry has a vector clock. For the last log entry, O3, the vector clock is {A: 3}.

Now replica B comes online. Since it has no entries, it's vector clock is empty. It broadcasts it's VC to other listening nodes. Replica A receives that message, and now it knows about replica B's state.

fast-multilog-swap

Replica A realises that replica B vector clock is smaller than the local vector clock. By doing some simple vector clock math, it realises that replica B is missing 3 log entries. It fetches those entries and sends them (O1, O2 and O3) to replica B in one burst. It also sends each log entry correspondent vector clock. All this in one burst.

fast-multilog-swap-2

Now replica B has all the missing log entries, both replicas converged:

fast-multilog-swap-3

1 more writer

Now replica B makes some changes to the CRDT, creating 2 new operations (O4 and O5). It also updates the vector clock for each operation:

fast-multilog-swap-5

Now replicas B head is {A:3, B:2}. Because replica B knows that the replication state of replica a is {A:3}, it realizes that replica's A vector clock is smaller than the current local vector clock. Because of that, replica B immediately realizes it needs to send replica A some entries to update it's state.

And so replica B calculates the difference between it's own vector clock and replica A's vector clock. Because of this, it realizes that replica A is missing 2 log entries with the following vector clocks: {A:3, B:1} and {A:3, B:2}, which correspond to operations O4 and O5. Replica B sends these entries to replica A:

fast-multilog-swap-6

Now, replica A has the missing operations, and is able to integrate them into its log:

fast-multilog-swap-7

The two replicas are in sync.

Two concurrent writers

Now let's see how our system can handle two concurrent writes, one done by each replica.

Let's continue with out example replicas:

Replica A creates a new operation and appends it to the local log. Since the VC for the previous head was {A:3, B:2}, the VC for the new operation (which we're calling O6) is going to be {A:4, B:2}

At the same time, replica B also creates a new operation and appends it to the local log. Since the VC for the previous head was {A:3, B:2}, the VC for this new operation (which we're calling O7) is going to be {A:3, B:3}.

fast-multilog-swap-12

Now both replicas have different head vector clocks. Replica A's head VC is {A:4, B:2} and replica B's head VC is {A:3, B:3}.

Let's see what happens in each replica once each new operation is created.

Replica A:

Replica A has a head VC of {A:4, B:2} and knows replica B has a VC of {A:3, B:2}, which means it knows B is lagging behind. By doing some vector clock math it knows that replica B is missing one operation that corresponds to the VC {A:4, B:2}. It then fetches that operation from the operation log and sends it to replica B.

fast-multilog-swap-10

Replica B:

When it creates operation O7 (which has a vector clock of {A:3, B:3}) replica B knows that replica A has a VC of {A:3, B:2}, and is lagging behind. By doing some vector clock math it knows that replica A is missing one operation that corresponds to the VC {A:3, B:3}. It then fetches that operation from the operation log and sends it to replica A.

fast-multilog-swap-11


Now, each replica has all the operations in their log:

fast-multilog-swap-13

Convergence

But now each replica doesn't have a defined HEAD operation. Since there was a divergence, each replica will have to create a merge log entry pointing to both parents. The vector clock for each of the merge entries will be the same: it will be the result of the merge of both vector clocks:

vc1 := {A:4, B:2}
vc2 := {A:3, B:3}
merged := merge(vc1, vc2) // {A:4, B:3}

The result of the merging operation is {A:4, B:3}, and that will be the vector clock for both merge log entries:

fast-multilog-swap-14

Now both replicas A and B are definitely in sync.

in sync

Treedoc Text Cursor Positioning

Is there a related form of the treedoc type that can track and expose a cursor position across edits?

Without that, binding to an editor like a textarea has troubles as the user's cursor gets thrown to odd spots, inserting text in the wrong place when typing.

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    ๐Ÿ–– Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. ๐Ÿ“Š๐Ÿ“ˆ๐ŸŽ‰

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google โค๏ธ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.