
jzab's Introduction

Jzab


jzab is an implementation of ZooKeeper Atomic Broadcast (Zab) in Java. jzab's features include:

  • High throughput - benchmarked > 20k writes/sec on commodity hardware.
  • Fuzzy snapshot - minimizes service interruption while taking snapshots.
  • Dynamic reconfiguration - add/remove servers without restarting the cluster.
  • Minimal runtime dependencies - netty, protobuf, slf4j.
  • Secure communication - using SSL.

Applications using jzab

  • zabkv - A simple reference server.
  • pulsed - An HTTP-based replicated filestore for distributed coordination.

Documentation

Requirements

  • JDK 1.7 or later: javac -version
  • Maven 3 or later: mvn -v
  • Protocol Buffers compiler 2.6.*: protoc --version

How to build

To build the project, run:

mvn verify

jzab's People

Contributors

easonliao, fpj, hongchaodeng


jzab's Issues

Recording committed transactions to simplify logic

Hi,

I thought it over very carefully during the weekend and compared different methods. I am pretty sure this is the correct thing to do.

My suggestion is

  1. Append a commit marker (a bit, a byte, or a class) to every log entry on commit. This keeps a record of committed transactions and distinguishes them from uncommitted ones.
  2. On reboot, a replica immediately restores all committed transactions, even before leader election.
  3. In the sync phase, the follower sends its highest committed zxid. On receiving DIFF, the follower can truncate the uncommitted ones.
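A minimal in-memory sketch of the three steps, assuming a per-entry boolean marker and assuming COMMITs arrive in zxid order (so committed entries form a prefix of the log). The class and method names are hypothetical, and a real log would persist the marker on disk.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: every log entry carries a commit marker that is set on COMMIT.
// A real log would persist the marker; this in-memory version only shows the logic.
final class CommitMarkedLog {
  static final class Entry {
    final long epoch;
    final long xid;
    final String payload;
    boolean committed;  // the per-entry commit marker from step 1

    Entry(long epoch, long xid, String payload) {
      this.epoch = epoch;
      this.xid = xid;
      this.payload = payload;
    }
  }

  private final List<Entry> entries = new ArrayList<>();

  void append(long epoch, long xid, String payload) {
    entries.add(new Entry(epoch, xid, payload));
  }

  // Step 1: mark the entry as committed when COMMIT arrives.
  void commit(long epoch, long xid) {
    for (Entry e : entries) {
      if (e.epoch == epoch && e.xid == xid) {
        e.committed = true;
        return;
      }
    }
    throw new IllegalArgumentException("unknown zxid <" + epoch + "," + xid + ">");
  }

  // Step 2: on reboot, restore the committed prefix before leader election.
  List<String> committedPayloads() {
    List<String> out = new ArrayList<>();
    for (Entry e : entries) {
      if (!e.committed) {
        break;  // assumes commits arrive in zxid order, so the rest are uncommitted
      }
      out.add(e.payload);
    }
    return out;
  }

  // Step 3: the highest committed zxid the follower reports in the sync phase,
  // or null if nothing has been committed yet.
  String highestCommittedZxid() {
    Entry last = null;
    for (Entry e : entries) {
      if (!e.committed) {
        break;
      }
      last = e;
    }
    return last == null ? null : "<" + last.epoch + "," + last.xid + ">";
  }
}
```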

Advantages:

dynamic reconfig

This way, it doesn't need to write a temporary file to disk.

sync phase

Here is a case in the sync phase: A is the old leader, B is the new leader, and C is a follower of B.

A: <0,1> <0,2>
B: <0,1> <1,1>
C: <0,1>

How could you sync A with B?

  1. Traverse the log to figure out where they diverge: O(n) disk operations.
  2. Send the snapshot?!

Well, if A sends its highest committed zxid <0,1>, B can send a cheap DIFF containing <1,1>. Upon receiving the DIFF, A truncates its uncommitted txns (i.e. <0,2>) and the divergence is fixed!
So now the sync logic is simplified to either SNAP or DIFF.
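The A/B example can be sketched as below. This assumes the leader sends SNAP only when the follower's highest committed zxid is no longer in the leader's log (for instance after trimming); `diffOrNull` and `applyDiff` are hypothetical names, and logs are plain in-memory lists of zxids.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;

// Hypothetical sketch of the SNAP-or-DIFF decision, using the A/B example above.
final class SyncSketch {
  // A zxid ordered by epoch first, then by counter.
  static final class Zxid implements Comparable<Zxid> {
    final long epoch;
    final long xid;

    Zxid(long epoch, long xid) {
      this.epoch = epoch;
      this.xid = xid;
    }

    @Override public int compareTo(Zxid o) {
      int c = Long.compare(epoch, o.epoch);
      return c != 0 ? c : Long.compare(xid, o.xid);
    }

    @Override public boolean equals(Object o) {
      return o instanceof Zxid && compareTo((Zxid) o) == 0;
    }

    @Override public int hashCode() {
      return Objects.hash(epoch, xid);
    }

    @Override public String toString() {
      return "<" + epoch + "," + xid + ">";
    }
  }

  // Leader side: returns the DIFF (entries after the follower's highest committed zxid),
  // or null when that zxid is no longer in the leader's log and a SNAP must be sent.
  static List<Zxid> diffOrNull(List<Zxid> leaderLog, Zxid followerCommitted) {
    int start = 0;
    if (followerCommitted != null) {
      int idx = leaderLog.indexOf(followerCommitted);
      if (idx < 0) {
        return null;
      }
      start = idx + 1;
    }
    return new ArrayList<>(leaderLog.subList(start, leaderLog.size()));
  }

  // Follower side: truncate everything after its highest committed zxid, then append the DIFF.
  static void applyDiff(List<Zxid> followerLog, Zxid committed, List<Zxid> diff) {
    int keep = committed == null ? 0 : followerLog.indexOf(committed) + 1;
    followerLog.subList(keep, followerLog.size()).clear();
    followerLog.addAll(diff);
  }

  public static void main(String[] args) {
    List<Zxid> leaderB = Arrays.asList(new Zxid(0, 1), new Zxid(1, 1));
    List<Zxid> oldLeaderA = new ArrayList<>(Arrays.asList(new Zxid(0, 1), new Zxid(0, 2)));
    Zxid committedA = new Zxid(0, 1);

    List<Zxid> diff = diffOrNull(leaderB, committedA);
    applyDiff(oldLeaderA, committedA, diff);
    System.out.println(oldLeaderA);  // prints [<0,1>, <1,1>]
  }
}
```

No log traversal to find the divergence point is needed: the follower's committed zxid is the only lookup key.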

See this blog:
https://medium.com/@hdeng/dynamic-reconfiguration-3aa492148a0f

More

Not sure if this is an advantage: it no longer needs to wait until the end of the synchronization phase to deliver everything.

Notify user when zab gets removed?

I found this issue while implementing pulsed.

If a Zab server gets removed, the pulsed process should exit and print a message. But right now, the only way to check whether your Zab instance has been removed (by you or by others) is to check whether it is included in the clusterConfig passed to the leading and following callbacks.

This is not very straightforward; maybe add an exit callback?
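Until such a callback exists, an application could wrap the membership check itself. A minimal sketch, assuming the application can pass the server ids from clusterConfig into a helper from its leading/following callbacks; `RemovalWatcher` and `ExitCallback` are hypothetical and not part of jzab.

```java
import java.util.Collection;

// Hypothetical helper: turns the clusterConfig membership check into an explicit exit callback.
final class RemovalWatcher {
  interface ExitCallback {
    void removed(String serverId);
  }

  private final String myId;
  private final ExitCallback onExit;
  private boolean fired;

  RemovalWatcher(String myId, ExitCallback onExit) {
    this.myId = myId;
    this.onExit = onExit;
  }

  // Call from the leading/following callbacks with the servers in the current clusterConfig.
  // Returns true while this server is still a member; fires the exit callback once otherwise.
  boolean onClusterChange(Collection<String> servers) {
    if (servers.contains(myId)) {
      return true;
    }
    if (!fired) {
      fired = true;
      onExit.removed(myId);
    }
    return false;
  }
}
```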

Simplified reconfiguration for javazab

The dynamic configuration in the paper is hard to implement correctly and needs a lot of changes to our current code, so we're trying to see whether there are any simplifications we can make. There are two differences between ZooKeeper and javazab:

  • javazab doesn't assume the elected leader has the "best" history, so the elected leader will try to find the server who owns the "best" history and gets synchronized from it.
  • we only need to support add/remove one server at a time. This means if we move config S to config S', then Q(S) ∩ Q(S') != 0.

Based on these two differences, we proposed a simplified way of doing reconfiguration, but we're not quite sure if it's correct.

Our plan is:

  • One reconfig at a time (add one server / remove one server). This means that Q(S) ∩ Q(S') != 0
  • After COP, the leader only broadcasts to S'.
  • Once the leader receives an ACK for the COP from Q(S), the leader sends out ACTIVATE. If the leader is also in S', it continues its leadership. If the leader is not in S', the ACTIVATE message should designate a new leader.
  • The leader in S should not allow multiple pending COPs.
  • In case of the leader failure, the recovery is done with the latest 'seen' configuration. There are 2 cases:
    • Q(S) hasn't seen COP. In this case, the recovery is done with S. This is ok because S' could not have been activated without support from Q(S).
    • Q(S) has seen COP. In this case, the recovery is done with S'.
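The leader-side bookkeeping for these rules might look roughly like this. It is a sketch under simplifying assumptions: majority quorums, configurations as sets of server ids, and a single in-memory leader; broadcasting to S' and handing off leadership are not modeled. All names are hypothetical.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

// Hypothetical leader-side bookkeeping for one-server-at-a-time reconfiguration.
final class ReconfigSketch {
  private Set<String> current;  // S
  private Set<String> pending;  // S' while a COP is outstanding, otherwise null
  private final Set<String> copAcks = new HashSet<>();

  ReconfigSketch(Set<String> initial) {
    current = new HashSet<>(initial);
  }

  static int quorumSize(int n) {
    return n / 2 + 1;  // majority quorum
  }

  // Propose a COP: exactly one server added or removed, and no other COP pending.
  void proposeCop(Set<String> next) {
    if (pending != null) {
      throw new IllegalStateException("a COP is already pending");
    }
    if (symmetricDifference(current, next).size() != 1) {
      throw new IllegalArgumentException("must add or remove exactly one server");
    }
    pending = new HashSet<>(next);
    copAcks.clear();
  }

  // Record an ACK for the pending COP. Only ACKs from S count; returns true when a
  // quorum of S has acknowledged, i.e. when ACTIVATE should be sent.
  boolean onCopAck(String serverId) {
    if (pending == null) {
      return false;
    }
    if (current.contains(serverId)) {
      copAcks.add(serverId);
    }
    if (copAcks.size() < quorumSize(current.size())) {
      return false;
    }
    current = pending;  // S' is now active
    pending = null;
    return true;
  }

  Set<String> config() {
    return Collections.unmodifiableSet(current);
  }

  private static Set<String> symmetricDifference(Set<String> a, Set<String> b) {
    Set<String> union = new HashSet<>(a);
    union.addAll(b);
    Set<String> both = new HashSet<>(a);
    both.retainAll(b);
    union.removeAll(both);
    return union;
  }
}
```

With majority quorums, a one-server change keeps |Q(S)| + |Q(S')| larger than |S ∪ S'|, which is why any quorum of S and any quorum of S' overlap.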

Replaying logs up to only committed transactions

@m1ch1 @fpj

When a server crashes and restarts, it calls replayLogFrom, right?

It should only replay the log up to the committed txns. However, uncommitted txns are also appended to the log, so Zab should differentiate between the two.

I suggest changing the interface to:

replayLogFrom(Zxid begin, Zxid end)

and add a persistent variable committedUpTo.
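A rough sketch of how the two pieces fit together, assuming zxids are simplified to a single long and committedUpTo is kept in memory rather than persisted. Only the `replayLogFrom(begin, end)` shape comes from the proposal; the class and the other method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of replayLogFrom(begin, end) plus a committedUpTo variable.
// Zxids are simplified to a single long here.
final class ReplaySketch {
  private static final class Txn {
    final long zxid;
    final String body;

    Txn(long zxid, String body) {
      this.zxid = zxid;
      this.body = body;
    }
  }

  private final List<Txn> log = new ArrayList<>();
  private long committedUpTo = -1;  // would be persisted next to the log; -1 = nothing committed

  void append(long zxid, String body) {
    log.add(new Txn(zxid, body));  // proposals are logged before they are committed
  }

  void commit(long zxid) {
    committedUpTo = Math.max(committedUpTo, zxid);
  }

  long committedUpTo() {
    return committedUpTo;
  }

  // Proposed interface: replay only transactions with begin <= zxid <= end.
  List<String> replayLogFrom(long begin, long end) {
    List<String> out = new ArrayList<>();
    for (Txn t : log) {
      if (t.zxid >= begin && t.zxid <= end) {
        out.add(t.body);
      }
    }
    return out;
  }
}
```

On restart, the caller would use `replayLogFrom(begin, committedUpTo())` so that uncommitted entries at the tail are never delivered.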

Add a primitive to implement session tracker

I'd like to be able to use javazab to implement session tracker. Here is how ZooKeeper implements its session tracker:

  • Client sends connect_request. The request gets forwarded to the leader. The leader assigns session ID and session timeout, commits the session information, and responds to the request.
  • Once the client receives the response, it starts sending heartbeat periodically to the connected server, which might not be the leader. The server stores the heartbeat information in memory.
  • The leader periodically sends out ping to followers. The follower piggybacks the client heartbeat information in the ping response.
  • The leader uses this information to decide whether to expire client sessions.
  • If the client gets disconnected from the server, it tries to connect to another server and revalidate the session.

The key here is that only session creation/revalidation/expiration touches the disk, but heartbeating doesn't.

I think the simplest way to implement this in javazab would be to allow the leader to short-circuit from the preprocess() method by returning null or something. The application (session tracker) uses the send() method to forward the client session information to the leader. The leader can update the session information in the preprocess method and return null to indicate to javazab that the message doesn't need to be broadcasted.
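A minimal sketch of the proposed short-circuit, assuming a text message format ("CONNECT" and "HB <sessionId> <timestamp>") and a list that stands in for Zab's broadcast. The `preprocess` shown here only mirrors the idea of returning null. It is not jzab's actual signature.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the proposed short-circuit: preprocess() returns null for
// messages that only update in-memory state, and the leader then skips the broadcast.
final class SessionShortCircuit {
  private final Map<Long, Long> heartbeats = new HashMap<>();      // session id -> timestamp
  private final List<ByteBuffer> broadcasts = new ArrayList<>();  // stands in for Zab's broadcast
  private long nextSessionId = 1;

  static ByteBuffer utf8(String s) {
    return ByteBuffer.wrap(s.getBytes(StandardCharsets.UTF_8));
  }

  // Message format is an assumption: "CONNECT" or "HB <sessionId> <timestamp>".
  ByteBuffer preprocess(ByteBuffer message) {
    String m = StandardCharsets.UTF_8.decode(message).toString();
    if (m.startsWith("HB ")) {
      String[] parts = m.split(" ");
      heartbeats.put(Long.parseLong(parts[1]), Long.parseLong(parts[2]));
      return null;  // heartbeat: in memory only, nothing to broadcast
    }
    if (m.equals("CONNECT")) {
      return utf8("CREATE_SESSION " + nextSessionId++);  // session creation must be committed
    }
    throw new IllegalArgumentException("unknown message: " + m);
  }

  // Leader side: broadcast only when preprocess() produced a transaction.
  void onRequest(ByteBuffer message) {
    ByteBuffer txn = preprocess(message);
    if (txn != null) {
      broadcasts.add(txn);
    }
  }

  int broadcastCount() {
    return broadcasts.size();
  }

  Long lastHeartbeat(long sessionId) {
    return heartbeats.get(sessionId);
  }
}
```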

Move lastDeliveredZxid into PersistentState?

Currently, QuorumZab has a lastDeliveredZxid member. It's like a placeholder that we need to pass/get for each epoch. We should probably put it in PersistentState even though it's not persistent.

Figure out how to manage peer connections

Currently ZooKeeper has 2 different connection modes with 2 separate ports for leader election and quorum server.

For leader election, QuorumCnxManager manages one TCP connection for each pair, and if I remember correctly, the connections get closed after each election.

https://github.com/apache/zookeeper/blob/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java

Once the leader is elected, followers connect to the leader.

https://github.com/apache/zookeeper/blob/trunk/src/java/main/org/apache/zookeeper/server/quorum/Learner.java

  • Does it make sense to keep these 2 separate, or can we use a single TCP connection for both leader election and quorum server?
  • Another approach for QuorumCnxManager would be to use 2 TCP connections for each pair. I think it's simpler to implement, but of course the drawback is that you have 2 TCP connections instead of 1.
  • I'm thinking we'll use netty so that we can add SSL support easily.

Any thoughts?

Put snapshot + log implementation together?

Currently the log implementation and the snapshot implementation are totally separate. We probably need to group them together like ZooKeeper does (FileTxnSnapLog).

We do feel this will make our logic of taking snapshot and synchronization easier.

decide how to communicate errors back to the client

Flavio brought this up a while back. We should figure out how to communicate zab errors (e.g. disk full, unexpected thread death, corrupted log). One option is to add getFuture() method to QuorumZab. This way the client can either check the status periodically or block until QuorumZab stops.
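A minimal sketch of the getFuture() option, assuming QuorumZab's main loop runs as a single task in an executor. `ZabFutureSketch` is a hypothetical stand-in, but `ExecutorService.submit`, `Future.get` and `Future.isDone` are standard java.util.concurrent calls.

```java
import java.io.IOException;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical sketch of the getFuture() idea: the main loop runs in an executor, and the
// Future it returns completes (exceptionally, on errors) when Zab stops.
final class ZabFutureSketch {
  private final ExecutorService executor = Executors.newSingleThreadExecutor();
  private final Future<Void> future;

  ZabFutureSketch(Callable<Void> mainLoop) {
    this.future = executor.submit(mainLoop);
    executor.shutdown();  // accept no further tasks; the main loop keeps running
  }

  Future<Void> getFuture() {
    return future;
  }

  public static void main(String[] args) throws InterruptedException {
    ZabFutureSketch zab = new ZabFutureSketch(new Callable<Void>() {
      @Override public Void call() throws Exception {
        throw new IOException("disk full");  // simulated fatal error in the main loop
      }
    });
    try {
      zab.getFuture().get();  // block until Zab stops
    } catch (ExecutionException e) {
      System.out.println("Zab stopped: " + e.getCause());
    }
  }
}
```

A client that prefers polling can call `getFuture().isDone()` periodically instead of blocking in `get()`.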

Design changes

After discussing with Michi and taking reconfiguration into consideration, we might change some design of our implementation:

  • Instead of sharing quorumSet in all processors, each processor keeps a copy of quorumSet and their copies should be changed by passing messages.
  • Some new messages will be added. Since there might be messages in flight, we should synchronize the pipeline by passing messages through it.

Currently, on the leader side, we have the following pipeline:

MainThread -(REQUEST)-> PreProcessor -(PROPOSAL)-> PeerHandler -> ShortCircuitTransport -> SyncProposalProcessor -(ACK)-> Transport -> MainThread -> AckProcessor -(COMMIT)-> all the PeerHandlers.

For example, when a new follower of the current configuration joins, we need to synchronize it and bring it up to date. Some pending proposals are still in different processors of the pipeline and do not appear in the log yet, so they need to be captured and synchronized to the new peer. We need to inject two messages at the source of the pipeline:

  1. FLUSH.
  2. ADD_HANDLER.

The FLUSH message goes through PreProcessor - PeerHandler - SCTransport - SyncProposalProcessor - Transport - MainThread. Once it is captured in MainThread, we know the last proposal id that was proposed before the JOIN request arrived, and we start a new synchronization thread to synchronize the follower up to that point.

The ADD_HANDLER message goes through PreProcessor - PeerHandler - SCTransport - SyncProposalProcessor - Transport - MainThread -> AckProcessor. Once SyncProposalProcessor and AckProcessor receive this message, they add a new PeerHandler to their copies of quorumSet.

This is just an example of synchronizing a newly joined follower. We also need to use similar messages in reconfiguration (e.g. adding a new quorum set to AckProcessor and new peers to PreProcessor, or deleting the old quorum from AckProcessor and PreProcessor).
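The reason FLUSH works is FIFO ordering between stages: when FLUSH reaches the end of the pipeline, every earlier proposal has already passed through. Below is a two-stage sketch of that idea, assuming each stage is a thread reading from a LinkedBlockingQueue and keeping its own copy of quorumSet. The stages stand in for the real processors and every name is hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical two-stage pipeline: each stage owns its own copy of quorumSet, changed only by
// ADD_HANDLER messages, and every message is forwarded in FIFO order.
final class FlushSketch {
  enum Type { PROPOSAL, FLUSH, ADD_HANDLER }

  static final class Message {
    final Type type;
    final long zxid;    // meaningful for PROPOSAL
    final String peer;  // meaningful for ADD_HANDLER

    Message(Type type, long zxid, String peer) {
      this.type = type;
      this.zxid = zxid;
      this.peer = peer;
    }
  }

  static final class Stage implements Runnable {
    final BlockingQueue<Message> in = new LinkedBlockingQueue<>();
    final BlockingQueue<Message> out;
    final Set<String> quorumSet = Collections.synchronizedSet(new HashSet<String>());

    Stage(BlockingQueue<Message> out) {
      this.out = out;
    }

    @Override public void run() {
      try {
        while (true) {
          Message m = in.take();
          if (m.type == Type.ADD_HANDLER) {
            quorumSet.add(m.peer);  // only this stage's copy changes
          }
          out.put(m);  // FIFO: FLUSH never overtakes an earlier PROPOSAL
        }
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();  // stop the stage
      }
    }
  }

  static final class Pipeline {
    final BlockingQueue<Message> sink = new LinkedBlockingQueue<>();  // stands in for MainThread
    final Stage second = new Stage(sink);
    final Stage first = new Stage(second.in);
    private final List<Thread> threads = new ArrayList<>();

    void start() {
      for (Stage s : Arrays.asList(first, second)) {
        Thread t = new Thread(s);
        t.setDaemon(true);
        threads.add(t);
        t.start();
      }
    }

    void stop() {
      for (Thread t : threads) {
        t.interrupt();
      }
    }

    void submit(Message m) throws InterruptedException {
      first.in.put(m);
    }

    // Drains the sink until a message of the given type arrives and returns the zxid of the
    // last PROPOSAL seen before it (-1 if none): the point a new follower is synced up to.
    long awaitType(Type type) throws InterruptedException {
      long last = -1;
      while (true) {
        Message m = sink.take();
        if (m.type == type) {
          return last;
        }
        if (m.type == Type.PROPOSAL) {
          last = m.zxid;
        }
      }
    }
  }
}
```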

PreProcessor should only pass the "request" field of the Request message

I tried using quorum zab with zabkv. The deliver method fails to deserialize the message because the PreProcessor passes the whole serialized Request object instead of just the request field:

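Request request = this.requestQueue.take();
ZabMessage.Request req = request.getMessage().getRequest();
// Bug: req.toByteArray() serializes the whole ZabMessage.Request, so deliver()
// receives the wrapper instead of the application's payload. Only the
// "request" field of req should be passed to preprocess().
ByteBuffer bufReq = ByteBuffer.wrap(req.toByteArray());
// Invoke the callback to convert the request into a transaction.
ByteBuffer update = this.stateMachine.preprocess(this.nextZxid, bufReq);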

test timed out after xxxx milliseconds

There is probably something wrong with my local config, but I'm getting a lot of errors like this when running the tests:

java.lang.Exception: test timed out after 20000 milliseconds
    at java.net.InetAddress.getLocalHost(InetAddress.java:1456)
    at com.github.zk1931.jzab.TestBase.getHostPort(TestBase.java:98)
    at com.github.zk1931.jzab.TestBase.getUniqueHostPort(TestBase.java:109)
    at com.github.zk1931.jzab.SnapshotTest.testSnapshotCluster(SnapshotTest.java:185)

Any idea what could be wrong with my local address? This is happening on Mac OS, but I'll check on Linux as well.

Problems about snapshot

Hi @fpj , I have a question about snapshot.

ZooKeeper takes a snapshot once synchronization is done. Currently javazab doesn't take a snapshot after synchronization; it only takes one after a certain number of transactions have been delivered in the broadcasting phase. So I was wondering whether there is any reason for ZooKeeper to do that. I feel it doesn't matter whether we take a snapshot after synchronization, but I still want to verify it to make sure we don't miss anything.

Thanks!

About the synchronization phase

Hi @fpj, I have trouble understanding why the NEWLEADER proposal needs to be atomic.

When a follower receives the NEWLEADER proposal, it does two things:

1. Accepts all the transactions synchronized from leader.
2. Updates its f.a to e. 

The paper says the two steps need to be atomic. I can understand that we can't bump f.a before we accept all the transactions synchronized from the leader. But in #17, you said:

Atomic in that step means that either the server takes the proposed initial history entirely or not at all. If it takes it, then it means that it has accepted a new epoch.

I don't see why this step needs to be atomic. What happens if the follower or the leader crashes and the follower has accepted only part of the leader's initial history?

I think it's fine, since the follower only adds some more transactions to its log.

Could you tell me why this step needs to be atomic? I guess I missed something. Thanks!

Create a reference server against the single node zab

Implement a reference server in a separate repo once #12 is done. The purpose of the reference server is to demonstrate the usage of the ZAB API. It would be good to do this early to ensure that the interface is clean and complete. I'm thinking the reference server can be an HTTP-based queue server. It supports two operations, enqueue and dequeue:

enqueue: curl -X POST -d "hello world" example.com/myqueue
dequeue: curl -X GET example.com/myqueue

Queue is a good example for demonstrating how to transform non-idempotent operations to idempotent state updates.
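One way to do the transformation, sketched under stated assumptions: the leader's preprocess step assigns each enqueued element an explicit id and turns "dequeue" into "remove element #id", so re-applying a redelivered update leaves the state unchanged. The string update format and all names are hypothetical, and updates still in flight on the leader are ignored for brevity.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;

// Hypothetical sketch: non-idempotent queue operations become idempotent state updates.
final class IdempotentQueue {
  private final LinkedHashMap<Long, String> items = new LinkedHashMap<>();  // id -> value, FIFO
  private long nextId = 1;

  // Leader only: turns "enqueue v" into an update that names the element id explicitly.
  String preprocessEnqueue(String value) {
    return "ENQUEUE " + (nextId++) + " " + value;
  }

  // Leader only: turns "dequeue" into "remove element #id"; null if the queue is empty.
  // (Ignores updates that are still in flight, for brevity.)
  String preprocessDequeue() {
    Iterator<Long> it = items.keySet().iterator();
    return it.hasNext() ? "REMOVE " + it.next() : null;
  }

  // Every replica: re-applying an update that was just applied (e.g. a redelivery)
  // leaves the state unchanged.
  void apply(String update) {
    String[] p = update.split(" ", 3);
    long id = Long.parseLong(p[1]);
    if (p[0].equals("ENQUEUE")) {
      items.put(id, p[2]);
      nextId = Math.max(nextId, id + 1);  // keeps ids unique if this replica becomes leader
    } else if (p[0].equals("REMOVE")) {
      items.remove(id);
    } else {
      throw new IllegalArgumentException("unknown update: " + update);
    }
  }

  List<String> contents() {
    return new ArrayList<>(items.values());
  }
}
```

By contrast, a plain "DEQUEUE" update applied twice would remove two elements.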

Also, we need to find a place to host the Maven artifacts for javazab. We could probably just put them in another repo.

Change configuration in different processors by passing messages.

Like changing quorumSet by passing messages, we should also change the configuration in different processors (PreProcessor, AckProcessor) by passing messages instead of sharing a common one.

Currently we have ADD and REMOVE messages to notify processors of the recovery/death of followers in the current configuration, and I use the same messages for reconfig. However, they should be distinguished, since JOIN and LEAVE change the configuration as well as the quorumSet.

This brings a new problem. Before, only the ADD message needed to flush the pipeline, but now we also have to deal with JOIN and LEAVE. The tricky part is that each message passes through the main thread several times while the pipeline is being flushed, like:

MainThread -> PreProcessor -> MainThread -> SyncProposalProcessor -> MainThread -> AckProcessor

So when the main thread receives ADD/LEAVE/JOIN, it needs to figure out where the message should be passed next.

Before, since only ADD needed to flush the pipeline, once we got FLUSH_PRE_PROCESSOR or FLUSH_SYNC_PROCESSOR we knew it was for adding. But now we need to deal with more than one message type.

We should probably add one more field to the ADD/LEAVE/JOIN messages: the ID of the last processor that processed it, e.g. PreProcessor is 0, SyncProposalProcessor is 1, ... The main thread can then figure out where to send the message by looking at that ID.
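A small sketch of that routing rule, assuming the three-hop order shown above (PreProcessor, SyncProposalProcessor, AckProcessor) and immutable messages. Processors are represented by name only, and `Message`, `processedBy` and `nextProcessor` are hypothetical.

```java
// Hypothetical sketch of the proposed field: each ADD/LEAVE/JOIN message records the ID
// of the last processor that handled it, and the main thread routes on that ID alone.
final class ReconfigRouting {
  enum Kind { ADD, JOIN, LEAVE }

  // Processor IDs follow pipeline order: PreProcessor = 0, SyncProposalProcessor = 1, ...
  static final String[] PROCESSORS = {"PreProcessor", "SyncProposalProcessor", "AckProcessor"};

  static final class Message {
    final Kind kind;
    final int lastProcessed;  // -1 until the first processor has handled the message

    Message(Kind kind, int lastProcessed) {
      this.kind = kind;
      this.lastProcessed = lastProcessed;
    }

    // A processor returns the message to the main thread stamped with its own ID.
    Message processedBy(int processorId) {
      return new Message(kind, processorId);
    }
  }

  // Main thread: the next processor to send the message to, or null when the flush is complete.
  static String nextProcessor(Message m) {
    int next = m.lastProcessed + 1;
    return next < PROCESSORS.length ? PROCESSORS[next] : null;
  }
}
```

The routing does not depend on the message kind, so ADD, JOIN and LEAVE share one code path in the main thread.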

StateMachine interface

I was under the impression that deliver should be an upcall, while setState isn't, so I don't understand the comment about both being called by the same thread.

I'm also a bit curious about how you see an application using the zxid values. Perhaps we need some written guidelines.

Dynamic configuration for javazab

Hi Zookeeper gurus,

I'm quite confused about dynamic configuration, partly because I don't know much about the ZooKeeper implementation, and partly because the implementations of javazab and ZooKeeper are not exactly the same.

To make sure I can implement it correctly, I'll post all my questions and thoughts about reconfiguration here and I really need discussions like #17 to help me out.
@fpj @fengjingchao @m1ch1

Replace message passing by Disruptor

LMAX Disruptor is a high-performance framework for message passing, even faster than Java's native LinkedBlockingQueue.

I am thinking that the zab project might be a good place for it, e.g. in the stream processing.

Replace replayLogFrom with a 'lastZxid' parameter in the constructor

The user of QuorumZab cannot call replayLogFrom before the synchronization phase completes because the local log might get truncated. I think it's easier if the QuorumZab constructor takes the last committed zxid from the snapshot (null if there is no snapshot) and redelivers all the transactions from that zxid after the synchronization phase.

Snapshots

My understanding is that the application on top of zab will be the one maintaining the state (e.g., data objects, like znodes). In this case, the zab layer can't be responsible for generating snapshots, it must be the app on top. The zab layer is only responsible for ordering and maintaining the txn log.

There are two issues related to this:

  • Recovery: assuming the app takes snapshots, when a replica comes back up, it will request from the zab layer a suffix of txns. I assume that the app will load a snapshot, and this snapshot will have a reference to a txn t. This txn t must be the last one applied when the app started the snapshot. The app next requests the suffix corresponding to all txns after t. Consequently, we need a call to request such a suffix.
  • Trimming the log: we could have a call that allows a replica server to trim a prefix of the log. Once the app generates a snapshot successfully, it can tell the zab layer to drop a prefix of the log locally.
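The two calls could be as small as this. The sketch assumes zxids are simplified to longs and the log is an in-memory sorted map; `suffixAfter` and `trimUpTo` are hypothetical names for the "request a suffix" and "drop a prefix" operations.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical sketch of the two snapshot-related calls on the zab layer's txn log.
final class SnapshotLogSketch {
  private final NavigableMap<Long, String> log = new TreeMap<>();  // zxid -> txn

  void append(long zxid, String txn) {
    log.put(zxid, txn);
  }

  // Recovery: all txns strictly after t, the last txn applied when the snapshot started.
  List<String> suffixAfter(long t) {
    return new ArrayList<>(log.tailMap(t, false).values());
  }

  // Trimming: once the app has a snapshot covering t, drop txns up to and including t.
  void trimUpTo(long t) {
    log.headMap(t, true).clear();  // clearing the view removes entries from the log itself
  }

  int size() {
    return log.size();
  }
}
```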

I know we talked about postponing the implementation of snapshots, but I'm thinking that we probably don't need to do more than what I'm saying here. How does it sound?

Make quorumSet local to lead() method

Right now we need to remember to clear() the quorumSet at the beginning of each iteration.

https://github.com/ZK-1931/javazab/blob/master/src/main/java/org/apache/zab/Participant.java#L486

It's probably better to make the quorumSet local to the lead() method. This way it's also easier to unit test each method separately. It might look something like:

void lead() throws Exception {
  // quorumSet is local, so there is nothing to clear() between iterations.
  // Its type here is illustrative.
  Map<String, PeerHandler> quorumSet = getProposedEpochFromQuorum();
  proposeNewEpoch(quorumSet);
  waitEpochAckFromQuorum(quorumSet);
  String serverId = selectSyncHistoryOwner();
  synchronizeFromFollower(serverId);
  synchronizeFollowers(quorumSet);
  beginBroadcasting(quorumSet);
}

How do I start from a log dir?

I'm having trouble starting pulsed from a log dir.

$ ./bin/pulsed -port 8080 -addr localhost:5000
# ctrl-c
$ ./bin/pulsed -dir localhost:5000 -port 8080                     
2014-12-06 12:45:23,096 INFO  main org.eclipse.jetty.util.log:188 Logging initialized @235ms
2014-12-06 12:45:23,141 WARN  main com.github.zk1931.jzab.Zab:172 Caught an exception while initializing Zab.
Exception in thread "main" java.lang.IllegalStateException: Failed to initialize Zab.
        at com.github.zk1931.jzab.Zab.<init>(Zab.java:173)
        at com.github.zk1931.jzab.Zab.<init>(Zab.java:127)
        at com.github.zk1931.jzab.Zab.<init>(Zab.java:83)
        at com.github.zk1931.pulsed.Pulsed.<init>(Pulsed.java:63)
        at com.github.zk1931.pulsed.Main.main(Main.java:100)
Caused by: java.lang.RuntimeException: Can't find configuration file.
        at com.github.zk1931.jzab.Zab$MainThread.<init>(Zab.java:493)
        at com.github.zk1931.jzab.Zab.<init>(Zab.java:166)
$ find localhost:5000 
localhost:5000
localhost:5000/ack_epoch
localhost:5000/proposed_epoch
localhost:5000/cluster_config.000000000000000_000000000000000
localhost:5000/data000000000000000
localhost:5000/data000000000000000/transaction.000000000000000_000000000000000

It would be good if the help message of the pulsed command were more descriptive.

Don't send COMMIT with zxid higher than the last acknowledged zxid of the peer

@m1ch1 we discussed several ways to correctly synchronize a newly joined follower in the broadcasting phase. None of them is very easy to understand.

So here is a new approach that I think is easier to follow.

  • When a new follower joins, we just flush the pipeline, get the last proposed zxid, and synchronize the follower up to that point. It doesn't matter whether these proposals are committed or not; there's no extra logic to handle them.
  • On the follower side, once it receives all the proposals of the synchronization, it goes directly to the broadcasting phase without waiting for the COMMIT message. At this point, however, it does not notify the application to serve requests.
  • A newly joined follower notifies the application of the WORKING state only the first time it delivers transactions (i.e. the first time its CommitProcessor delivers transactions).

The tricky part is that in our current implementation, CommitProcessor only maintains pending transactions that are proposed after the synchronization phase. The reason is that when the CommitProcessor receives a COMMIT message, the transaction with that zxid might not be in the log yet.

The way to solve this is for the leader's AckProcessor never to send a COMMIT with a zxid higher than the follower's lastAckZxid. Currently, our implementation just finds the zxid that can be committed and broadcasts it to everyone. If we only send a COMMIT whose zxid is smaller than or equal to the follower's lastAckZxid, then whenever the follower receives a COMMIT for zxid z, the transaction with zxid z is guaranteed to be in its log.

Another advantage of doing this is that we no longer put PROPOSAL in CommitProcessor. Currently we give both COMMIT and PROPOSAL to CommitProcessor, which is a little confusing.
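The per-follower cap can be sketched as follows, assuming zxids are simplified to non-negative longs and the leader tracks, per follower, the last acknowledged zxid and the last COMMIT it sent. The method names are hypothetical.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch: the leader caps each follower's COMMIT at that follower's lastAckZxid,
// so a follower never sees a COMMIT for a transaction that is not yet in its log.
// Zxids are simplified to non-negative longs; -1 means "nothing".
final class CommitCapSketch {
  // The COMMIT zxid to send one follower, or -1 if it would not advance past lastSentCommit.
  static long commitFor(long committedZxid, long lastAckZxid, long lastSentCommit) {
    long z = Math.min(committedZxid, lastAckZxid);
    return z > lastSentCommit ? z : -1;
  }

  // Computes the per-follower COMMITs for the current committed zxid and records them as sent.
  static Map<String, Long> commitsToSend(long committedZxid, Map<String, Long> lastAck,
                                         Map<String, Long> lastSent) {
    Map<String, Long> out = new TreeMap<>();
    for (Map.Entry<String, Long> e : lastAck.entrySet()) {
      Long sent = lastSent.get(e.getKey());
      long z = commitFor(committedZxid, e.getValue(), sent == null ? -1 : sent);
      if (z >= 0) {
        out.put(e.getKey(), z);
        lastSent.put(e.getKey(), z);
      }
    }
    return out;
  }
}
```

A lagging follower gets a smaller COMMIT now and catches up on a later round, once its ACKs advance.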

missing protoc

I'm getting this error when compiling:

[ERROR] Failed to execute goal org.apache.maven.plugins:maven-antrun-plugin:1.3:run (generate-sources) on project javazab: An Ant BuildException has occured: Execute failed: java.io.IOException: Cannot run program "protoc": error=2, No such file or directory -> [Help 1]

Do I need to install the protobufs compiler on the side?

Zab interface improvements

I noticed a few things while implementing zabkv.

  • The user of Zab needs to be notified when the leader goes down so that it can clear all the pending transactions.
    https://github.com/ZK-1931/zabkv/blob/master/src/main/java/org/apache/zabkv/Database.java#L47
  • SingleNodeZab.send() should not throw IOException. I guess we could just get rid of SingleNodeZab once we have QuorumZab working with a single server.
  • The deliver method needs to be able to identify the origin of the message to match it up with the pending transaction. The user could encode the server id and the local transaction id directly into the message, but maybe Zab should provide this information.
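Until Zab provides the origin itself, the "encode it into the message" workaround might look like this. The binary layout, the `PendingMatcher` class and its methods are assumptions for illustration. `clearPending()` covers the first bullet (dropping pending transactions when the leader goes down).

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the workaround: each message carries the sender's server id and a
// local transaction id, so deliver() can match it with the pending client request.
final class PendingMatcher {
  private final String myId;
  private long nextLocalId = 1;
  private final Map<Long, String> pending = new LinkedHashMap<>();  // local txn id -> request

  PendingMatcher(String myId) {
    this.myId = myId;
  }

  // Wire format (an assumption): [int idLength][serverId bytes][long localId][payload bytes].
  ByteBuffer encode(String request) {
    long localId = nextLocalId++;
    pending.put(localId, request);
    byte[] server = myId.getBytes(StandardCharsets.UTF_8);
    byte[] body = request.getBytes(StandardCharsets.UTF_8);
    ByteBuffer buf = ByteBuffer.allocate(4 + server.length + 8 + body.length);
    buf.putInt(server.length).put(server).putLong(localId).put(body);
    buf.flip();
    return buf;
  }

  // Called from deliver(): returns the pending request this message completes,
  // or null if the message originated on another server.
  String onDeliver(ByteBuffer message) {
    ByteBuffer buf = message.duplicate();  // leave the caller's position untouched
    byte[] server = new byte[buf.getInt()];
    buf.get(server);
    long localId = buf.getLong();
    if (!new String(server, StandardCharsets.UTF_8).equals(myId)) {
      return null;
    }
    return pending.remove(localId);
  }

  // When the leader goes down, pending requests may never be delivered: fail them all.
  List<String> clearPending() {
    List<String> failed = new ArrayList<>(pending.values());
    pending.clear();
    return failed;
  }
}
```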
