patriknw / akka-data-replication
Replication of CRDTs in Akka Cluster
License: Other
Hi,
Do you have any Java CRDT implementations? In this repository I can only find Scala implementations, and though both languages are similar, I was looking specifically for a Java implementation.
Thanks,
David
Caused confusion for someone reading the code.
As discovered by @huntc and @hseeberger when removing from and adding to a multi-map (ORMap with ORSet values).
Add unit tests that verify/illustrate the behavior.
I just studied the code and I think it's a great jump forward towards stateful data stream processing.
One of the things to think about is state partitioning. Based on what I see, all the cluster nodes in a particular role take part in the replication cycle: https://github.com/patriknw/akka-data-replication/blob/master/src/main/scala/akka/contrib/datareplication/Replicator.scala#L1066-L1068
So if I have a total of 10GB of state to maintain for my stream processing, I will have to allocate 10GB of memory on each node. It would be great to manage the number of partitions as well as the partition replication factor, Kafka-style. This way I would be able to have 10 partitions of approximately 1GB each.
Assuming a replication factor of 3 and partitions spread evenly over 10 nodes, I would have to allocate only about 3GB on each node.
This would need to be combined with partition based routing to make sure each request gets routed to a partition leader.
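The memory estimate above can be checked with a small calculation. This is only a sketch: `PartitionSizing` and `perNodeGb` are hypothetical names, and it assumes replicas are spread evenly across the nodes, which the proposal does not spell out.

```scala
object PartitionSizing {
  /** Average state held per node in GB, assuming replicas are spread evenly
    * over all nodes (an assumption made for this illustration only).
    */
  def perNodeGb(totalStateGb: Double, replicationFactor: Int, nodes: Int): Double = {
    require(nodes >= replicationFactor, "need at least as many nodes as replicas")
    // Every GB of state is stored replicationFactor times across the cluster.
    totalStateGb * replicationFactor / nodes
  }
}
```

With 10GB of state, a replication factor of 3 and 10 nodes this gives 3GB per node. Full replication corresponds to a replication factor equal to the number of nodes, which gives back the 10GB per node of the current design.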
Two concurrent updates can overwrite the data anyway if one happens to use the right sequence number.
The local sequence number for optimistic concurrency control of local updates is not the right approach, and it adds a lot of complexity to the API usage.
I will remove the seqNo, but that also means that the RemovedNodePruning features must be adjusted. First step will be to disable that feature until I have been able to think it through.
For example there could be sealed trait Replicator.UpdateResponse and then Replicator.UpdateSuccess, Replicator.WrongSeqNr, etc. would mix that in.
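A minimal sketch of what such a hierarchy could look like. The fields (`key`, `currentSeqNr`) and the `describe` helper are assumptions made for illustration, not the library's actual definitions.

```scala
object ReplicatorSketch {
  // Common supertype for every reply to an update (hypothetical shape).
  sealed trait UpdateResponse { def key: String }

  final case class UpdateSuccess(key: String) extends UpdateResponse
  final case class WrongSeqNr(key: String, currentSeqNr: Long) extends UpdateResponse

  // Because the trait is sealed, the compiler can warn about a missing case here.
  def describe(response: UpdateResponse): String = response match {
    case UpdateSuccess(k)       => s"update of $k succeeded"
    case WrongSeqNr(k, current) => s"update of $k rejected, current seqNr is $current"
  }
}
```

The benefit of the sealed trait is that a caller can match on all replies in one place and get an exhaustiveness warning when a new response type is added.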
Hi folks, I was wondering what recommendation you would make if we need a CRDT data model today. Would it be prudent to use akka-persistence and akka-data-replication together to provide this, or go with something like Riak 2.0 CRDTs (or Redis if that's your thing) until this is ready?
A JVM crash before the updated value was sent out with gossip means that the update is "lost".
The use case for WriteOne is when you need low latency and it's acceptable to lose updates if the JVM crashes before the data has been spread to other nodes.
For example: web page counter
Current state should be sent back immediately as a Changed message. I think it should be optional, i.e. a parameter in the Subscribe message.
The word "datareplication" doesn't exist. Usually one would write "data replication", hence for the name of this project the hyphenated version should be used.
I wonder if the current names are too technical for a broader audience, which doesn't care about the papers.
Here is one proposal:
Flag -> ReplicatedFlag
GCounter -> ReplicatedGrowOnlyCounter
GSet -> ReplicatedGrowOnlySet
LWWMap -> ReplicatedLastWriterWinsMap
LWWRegister -> ReplicatedLastWriterWinsRegister
ORMap -> ReplicatedMap
ORMultiMap -> ReplicatedMultiMap
ORSet -> ReplicatedSet
PNCounter -> ReplicatedCounter
PNCounterMap -> ReplicatedCounterMap
and here is another:
Flag -> Flag
GCounter -> GrowOnlyCounter
GSet -> GrowOnlySet
LWWMap -> LastWriterWinsMap
LWWRegister -> LastWriterWinsRegister
ORMap -> ReplicatedMap
ORMultiMap -> ReplicatedMultiMap
ORSet -> ReplicatedSet
PNCounter -> Counter
PNCounterMap -> CounterMap
Noticed by xmal: https://twitter.com/xmal/statuses/476059053719056384
That was how I started, but then I thought that the Riak orswot merged with existing. Probably I misunderstood the Erlang code: https://github.com/basho/riak_dt/blob/develop/src/riak_dt_orswot.erl#L381-L392
I noticed that akka-data-replication also uses VectorClock (sourced from Akka). I have some questions:
Using ordered UniqueAddress for comparison in Akka's VectorClock may be OK, because the Akka membership status is maintained by one 'leader' node, and the leader comes first in the UniqueAddress ordering of the node ring (is my understanding correct?).
But for ORSet's merge, the set's VectorClock is compared with an element's 'birth dot' in the 'mergeDisjointKeys' function: 'if (vclock > dots || vclock == dots)'.
This '>' comparison also uses ordered UniqueAddress. Is that necessary? Or could it just compare 'vclock.versions.get(node)' with the dots?
If I am missing something, please correct me. Thanks!
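The per-node comparison suggested in the question can be sketched as below. This is not the library's VectorClock: `DotCheck`, `Node` and `dominates` are hypothetical names, and both the set's clock and the element's dots are modelled as plain maps from node to counter.

```scala
object DotCheck {
  // Simplified stand-ins for illustration; a node is just a name here.
  type Node = String
  type Versions = Map[Node, Long]

  /** True if every dot (node -> counter) has already been seen by `versions`,
    * i.e. the clock's counter for that node is at least the dot's counter.
    * Only per-node counters are compared, so no ordering of nodes is needed.
    */
  def dominates(versions: Versions, dots: Versions): Boolean =
    dots.forall { case (node, counter) => versions.getOrElse(node, 0L) >= counter }
}
```

For example, a clock of `Map("a" -> 3L, "b" -> 1L)` dominates the dot `Map("a" -> 2L)` but not `Map("c" -> 1L)`, because node `c` has not been seen at all.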
I've noticed doing this on occasion:
ORSet.empty + nodeBundleFile
It'd be nice to have some convenience constructors such that the following could be achieved:
ORSet(nodeBundleFile)
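Such a constructor is typically an apply method on the companion object. The sketch below uses a toy `SimpleSet` rather than the real ORSet, which additionally tracks per-node metadata; all names in it are hypothetical.

```scala
object ConvenienceSketch {
  // Toy immutable set standing in for ORSet (no replication metadata).
  final class SimpleSet[A] private (val elements: Set[A]) {
    def +(element: A): SimpleSet[A] = new SimpleSet(elements + element)
  }

  object SimpleSet {
    def empty[A]: SimpleSet[A] = new SimpleSet(Set.empty[A])

    // Convenience constructor: SimpleSet(a, b) instead of SimpleSet.empty + a + b.
    def apply[A](elements: A*): SimpleSet[A] =
      elements.foldLeft(empty[A])(_ + _)
  }
}
```

With this in place, `SimpleSet("x")` and `SimpleSet.empty[String] + "x"` build the same value.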
It only means that it could not meet the given write consistency level.
Perhaps rename it to something less fatal than Failed. Perhaps UpdateTimeout?
I think this has been mentioned in the docs, but I have seen confusion around it.
We need one for read and write consistency, but without the request:
Replicator.Update(replicatorKey, ORSet.empty, Replicator.ReadQuorum, Replicator.WriteQuorum, timeout.duration)
Changed message should only be created by the replicator.
Same with some other classes.
Currently it's in "compile" (default) configuration, see https://github.com/patriknw/akka-datareplication/blob/master/build.sbt#L19.
Specialized ORMap with ORSet values
It was disabled in #16
Reported by @hseeberger
:+ and :- are terribly misleading, at least if one knows the Scala collections. There the colon is only used for sequences (always facing the col(on)lection).
Designing for eventual consistency is difficult, or at least different. We should provide guidelines for various design choices, such as read/write consistency levels.
Let's collect some topics in this issue. Please contribute. Links to blog posts or other related reading/presentations?
Looks like the published Maven artifact is not compatible with JDK 6?
java.lang.UnsupportedClassVersionError: akka/contrib/datareplication/protobuf/msg/ReplicatorMessages$Status : Unsupported major.minor version 52.0
at java.lang.ClassLoader.defineClass1(Native Method)
at java.lang.ClassLoader.defineClassCond(ClassLoader.java:631)
at java.lang.ClassLoader.defineClass(ClassLoader.java:615)
at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:141)
at java.net.URLClassLoader.defineClass(URLClassLoader.java:283)
at java.net.URLClassLoader.access$000(URLClassLoader.java:58)
at java.net.URLClassLoader$1.run(URLClassLoader.java:197)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:190)
at java.lang.ClassLoader.loadClass(ClassLoader.java:306)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:301)
at java.lang.ClassLoader.loadClass(ClassLoader.java:247)
at akka.contrib.datareplication.protobuf.ReplicatorMessageSerializer.statusToProto(ReplicatorMessageSerializer.scala:78)
at akka.contrib.datareplication.protobuf.ReplicatorMessageSerializer.toBinary(ReplicatorMessageSerializer.scala:54)
at akka.remote.serialization.MessageContainerSerializer.serializeSelection(MessageContainerSerializer.scala:36)
at akka.remote.serialization.MessageContainerSerializer.toBinary(MessageContainerSerializer.scala:25)
at akka.remote.MessageSerializer$.serialize(MessageSerializer.scala:36)
at akka.remote.EndpointWriter$$anonfun$serializeMessage$1.apply(Endpoint.scala:845)
at akka.remote.EndpointWriter$$anonfun$serializeMessage$1.apply(Endpoint.scala:845)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at akka.remote.EndpointWriter.serializeMessage(Endpoint.scala:844)
at akka.remote.EndpointWriter.writeSend(Endpoint.scala:747)
at akka.remote.EndpointWriter$$anonfun$2.applyOrElse(Endpoint.scala:722)
at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
at akka.remote.EndpointActor.aroundReceive(Endpoint.scala:415)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
at akka.actor.ActorCell.invoke(ActorCell.scala:487)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
at akka.dispatch.Mailbox.run(Mailbox.scala:220)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
system] in Update and Get messages.
The reason is that there is no good default. It depends, and is something that should be thought of by the user.
Our workaround is to clone your repo and do a 'git checkout tags/v0.7;sbt publishLocal'. If you remove your .ivy2 (or rename it) you'll hit the same problem. Perhaps I can publish it to maven or somewhere else? I can provide more details like the project where this is occurring, etc.
Thanks
Kam
Perhaps consider the use of Scala's MultiMap inside the Replicator when managing subscribers, although it is a mutable data structure of course. It may simplify the code though.
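A rough sketch of subscriber bookkeeping with `scala.collection.mutable.MultiMap`. `SubscriberRegistry` and its methods are hypothetical, and plain strings stand in for the subscribing ActorRefs. Note that MultiMap is deprecated as of Scala 2.13, so this compiles there with a deprecation warning.

```scala
import scala.collection.mutable

object SubscriberRegistry {
  // key -> subscribers; strings stand in for ActorRefs in this illustration.
  private val subscribers =
    new mutable.HashMap[String, mutable.Set[String]] with mutable.MultiMap[String, String]

  def subscribe(key: String, subscriber: String): Unit = {
    subscribers.addBinding(key, subscriber) // creates the set on first use
    ()
  }

  def unsubscribe(key: String, subscriber: String): Unit = {
    subscribers.removeBinding(key, subscriber) // drops the key when its set becomes empty
    ()
  }

  def subscribersOf(key: String): Set[String] =
    subscribers.get(key).map(_.toSet).getOrElse(Set.empty[String])
}
```

The main simplification is that `addBinding` and `removeBinding` take care of creating and cleaning up the per-key sets, which otherwise has to be done by hand.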
When using Update with readConsistency != ReadOne and the Get fails, the Update should not continue, as it does now.
I must investigate if we need to support current behavior also.
From docs:
If readConsistency != ReadOne it will first retrieve the data from other nodes and then apply the modify function with the latest data. If the read fails the update will continue anyway, using the local value of the data.
To support "read your own writes" all incoming commands for this key will be buffered until the read is completed and the function has been applied.
After upgrading the current project to Akka 2.4-M1, I found that the registration never succeeds.
I am trying to find out why.
replicator ! Update(registryKey, ORMultiMap.empty[Service], WriteAll(timeout), Some(cacheRegisteringPromise(promise, service)))(_.addBinding(service.name, service))
This never succeeds.
Maybe I will try the Akka 2.4 snapshot once it is merged into Akka.
... instead of only the message. This would not only allow for very elegant programming (using failure messages extending Exception), but also properly signal different failure cases.