Code Monkey home page Code Monkey logo

rabit's Introduction

Rabit: Reliable Allreduce and Broadcast Interface

Build Status Documentation Status

Recent developments of Rabit have been moved into dmlc/xgboost. See discussion in dmlc/xgboost#5995.

rabit is a light weight library that provides a fault tolerant interface of Allreduce and Broadcast. It is designed to support easy implementations of distributed machine learning programs, many of which fall naturally under the Allreduce abstraction. The goal of rabit is to support portable , scalable and reliable distributed machine learning programs.

Features

All these features comes from the facts about small rabbit:)

  • Portable: rabit is light weight and runs everywhere
    • Rabit is a library instead of a framework, a program only needs to link the library to run
    • Rabit only replies on a mechanism to start program, which was provided by most framework
    • You can run rabit programs on many platforms, including Yarn(Hadoop), MPI using the same code
  • Scalable and Flexible: rabit runs fast
    • Rabit program use Allreduce to communicate, and do not suffer the cost between iterations of MapReduce abstraction.
    • Programs can call rabit functions in any order, as opposed to frameworks where callbacks are offered and called by the framework, i.e. inversion of control principle.
    • Programs persist over all the iterations, unless they fail and recover.
  • Reliable: rabit dig burrows to avoid disasters
    • Rabit programs can recover the model and results using synchronous function calls.
    • Rabit programs can set rabit_boostrap_cache=1 to support allreduce/broadcast operations before loadcheckpoint rabit::Init(); -> rabit::AllReduce(); -> rabit::loadCheckpoint(); -> for () { rabit::AllReduce(); rabit::Checkpoint();} -> rabit::Shutdown();

Use Rabit

  • Type make in the root folder will compile the rabit library in lib folder
  • Add lib to the library path and include to the include path of compiler
  • Languages: You can use rabit in C++ and python
    • It is also possible to port the library to other languages

Contributing

Rabit is an open-source library, contributions are welcomed, including:

  • The rabit core library.
  • Customized tracker script for new platforms and interface of new languages.
  • Tutorial and examples about the library.

rabit's People

Contributors

abdealiloko avatar akrylysov avatar cblsjtu avatar chenqin avatar codingcat avatar dennisobrien avatar elferdo avatar ericchendm avatar export-default avatar felixybw avatar hcho3 avatar headupinclouds avatar hjk41 avatar horgh avatar kabu4i avatar lqhl avatar nachocano avatar nateagr avatar nthelement avatar snehlatamohite avatar sperlingxx avatar tbjohns avatar thirdwing avatar tomlaube avatar tqchen avatar trivialfis avatar zhengruifeng avatar ziyuehuang avatar zjf avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

rabit's Issues

What's the difference between AllreduceRing and AllreduceTree?

I noticed that when data is big enough, the code seems to choose TryAllreduceRing() function rather than TryAllreduceTree() function.
Is the TryAllreduceRing()'s performance better than TryAllreduceTree()?
If so, in what's way?
I guess the communication cost is the key.
I googled the question, but can't get the suitable answer.
Looking for your reply.

How to set the environment variables in rabit?

when I want to compile the C++ files,there are so much error like error: template with C linkageanderror: redeclaration of ‘unsigned int wait::<anonymous struct>::__w_retcode’ .etc.I don't how to fix it,but there must be something wrong in the environment variables.
This is my environment variables.

export CPLUS_INCLUDE_PATH=/home/panjianqiao/Downloads/rabit/include/rabit:/home/panjianqiao/Downloads/dmlc-core/include/dmlc:/home/panjianqiao/Downloads/dmlc-core/include:$CPLUS_INCLUDE_PATH
export LIBRARY_PATH=/home/panjianqiao/Downloads/rabit/lib:$LIBRARY_PATH
export LD_LIBRARY_PATH=/home/panjianqiao/Downloads/rabit/lib:$LD_LIBRARY_PATH

btw,my clone path is /home/panjianqiao/Downloads/rabit and /home/panjianqiao/Downloads/dmlc-core

[DISCUSSION] add random reshuffle support

Recently had conversation with (horovod) distributed tensor flow author, one of take away is Spark and other data processing frameworks lack of good random reshuffle support that can achieve good randomization as well as performance to move large chunk of training dataset around.
The abstract problem illustration as below, with N workers each holding M_i rows, we want to have an interface for each worker to call which will achieve certain randomization with lowest time cost.

Canonical random shuffle in Spark and other frameworks go through certain number of rows and get random destination one row after another. This bipartite graph shuffle may not achieve optimal network thought nor failure recovery in case of huge dataset.

Naive idea is to reduce random insert and delete rows from local dataset to bucketized swap as each worker random reshuffle. This benefit cache as it fit better with filesystem to memory swap. e.g if we divide 1M rows in single host into 100 groups, when every worker work on same group it sort rows ranked by other peer ranks and then swap(or lazy swap) with that peer once other peer also finished sorting in same group. After 1st group finished, next chunk of rows load into memory and repeat until M_i reached.

screen shot 2018-10-30 at 21 46 40

dmlc-submit does not accept ssh cluster type

dmlc-submit does not accept ssh cluster type.

I see that ssh.py exists, but it is not included in submit.py. I was able to run a small test with ssh cluster type after changing sybmit.py to include ssh.py and accepting it as an option.

Is there any specific reason why it was left out?

rabit - failure no network connection

xgboost4j / rabit seems to fail to start the tracker in case there are no network connections available.

I have to admit that I tried to run xgboost4j in spark local mode on my laptop whilst it was not connected to a WIFI. Still a strange error, I thought it would work in local only mode as well.

tracker started, with env={}
16/11/11 19:16:00 INFO RabitTracker$TrackerProcessLogger: 2016-11-11 19:16:00,643 WARNING gethostbyname(socket.getfqdn()) failed... trying on hostname()
16/11/11 19:16:00 INFO RabitTracker$TrackerProcessLogger: Traceback (most recent call last):
16/11/11 19:16:00 INFO RabitTracker$TrackerProcessLogger:   File "/var/folders/zm/bts0g37j0l9637sjxb3b700h0000gn/T/tracker1300409870996753788.py", line 475, in <module>
16/11/11 19:16:00 INFO RabitTracker$TrackerProcessLogger:     main()
16/11/11 19:16:00 INFO RabitTracker$TrackerProcessLogger:   File "/var/folders/zm/bts0g37j0l9637sjxb3b700h0000gn/T/tracker1300409870996753788.py", line 470, in main
16/11/11 19:16:00 INFO RabitTracker$TrackerProcessLogger:     start_rabit_tracker(args)
16/11/11 19:16:00 INFO RabitTracker$TrackerProcessLogger:   File "/var/folders/zm/bts0g37j0l9637sjxb3b700h0000gn/T/tracker1300409870996753788.py", line 432, in start_rabit_tracker
16/11/11 19:16:00 INFO RabitTracker$TrackerProcessLogger:     rabit = RabitTracker(hostIP=get_host_ip(args.host_ip), nslave=args.num_workers)
16/11/11 19:16:00 INFO RabitTracker$TrackerProcessLogger:   File "/var/folders/zm/bts0g37j0l9637sjxb3b700h0000gn/T/tracker1300409870996753788.py", line 396, in get_host_ip
16/11/11 19:16:00 INFO RabitTracker$TrackerProcessLogger:     s.connect(('10.255.255.255', 1))
16/11/11 19:16:00 INFO RabitTracker$TrackerProcessLogger:   File "/usr/local/Cellar/python/2.7.12_2/Frameworks/Python.framework/Versions/2.7/lib/python2.7/socket.py", line 228, in meth
16/11/11 19:16:00 INFO RabitTracker$TrackerProcessLogger:     return getattr(self._sock,name)(*args)
16/11/11 19:16:00 INFO RabitTracker$TrackerProcessLogger: socket.error: [Errno 51] Network is unreachable
16/11/11 19:16:00 INFO RabitTracker$TrackerProcessLogger: Tracker Process ends with exit code 1
16/11/11 19:16:00 INFO XGBoostSpark: repartitioning training set to 4 partitions

How to run rabit on cluster?

please,I want to know more details about using the rabit to implement program communication in a distributed cluster.

are docs in guide/ folder out of date?

I don't see that ../tracker/rabit_demo.py file anywhere. Also I am wondering what's the expected output for the first c++ demo in guide.md, coz the output i got makes no sense to me at all.

$ python2.7 basic.py
@node[0] before-allreduce: a=[ 0. 1. 2.]
@node[0] after-allreduce-max: a=[ 0. 1. 2.]
@node[0] after-allreduce-sum: a=[ 0. 1. 2.]

a problem when use mpi cluster

I changed a little the example basic.cc , in the orignal code array a is just 3 elements , i malloc a space about 125000 double elements ,and the program stuck:
the command i use as follow:
rabit_mpi.py -n 4 "basic.exe 125000"
code as follow:

include <rabit.h>

using namespace rabit;
//const int N = 3;
int main(int argc, char _argv[]) {
int *a;
int *aa;
int len;
printf("start\n");
rabit::Init(argc, argv);
len = atoi(argv[1]);
a = (int *)malloc(len_sizeof(int));
aa = a;
for (int i = 0; i < len; ++i) {
*aa = rabit::GetRank() + i;
aa++;
}
Allreduceop::Sum(&a[0], len);
printf("done\n!");
rabit::Finalize();
return 0;
}
is it a bug?

Require more details about building rabit with MPI support

Hi,

I'm trying to build wormhole from scratch in which rabit is a dependent lib. My target platform is MPI-enable with assistance from some Batch Job Management System. I wonder if it is possible to add more details about bulidng rabit with MPI support in README. For example,

  1. In order to run wormhole with MPI, is it necessary to build rabit with MPI support? How?
  2. Regarding MPI libraries out there (OpenMPI, Intel MPI, MPICH2, MVAPCHI...), what't your recommendation and in which you have tested yet?

Best,

Boroadcast issue in openmp parallel loop

    template<typename DType>
    static void Allgather(std::vector<DType>& datas)
    {
        std::vector<std::vector<DType> > values(rabit::GetWorldSize());
        values[rabit::GetRank()] = move(datas);
        for (size_t i = 0; i < values.size(); i++)
        {
            rabit::Broadcast(&values[i], i);
            gezi::merge(datas, values[i]);
        }
    }

         #pragma omp parallel for
    for (size_t i = 0; i < 16; i++)
    {
        vector<int> values;
        if (rabit::GetRank() == 0)
        {
            values = { 1, 2, 3 };
        }
        else
        {
            values = { rabit::GetRank(), 4, 5, 6 };
        }
        Rabit::Allgather(values);
    }

This will cause
*** Aborted at 1434420821 (unix time) try "date -d @1434420821" if you are using GNU date ***

AssertError:Allreduce: boundary check

PC: @ 0x471f33 (anonymous namespace)::cpp_alloc()

AssertError:maxdf must be smaller than FDSETSIZE
AssertError:maxdf must be smaller than FDSETSIZE

AssertError:PushTemp inconsistent
^CTraceback (most recent call last):
File "/home/users/chenghuige/tools/rabit_demo.py", line 96, in
tracker.submit(args.nworker, [], fun_submit = mthread_submit, verbose = args.verbose)
File "/home/users/chenghuige/tools/rabit_tracker.py", line 316, in submit
master.accept_slaves(nslave)
File "/home/users/chenghuige/tools/rabit_tracker.py", line 258, in accept_slaves
fd, s_addr = self.sock.accept()
File "/home/users/chenghuige/.jumbo/lib/python2.7/socket.py", line 202, in accept
sock, addr = self._sock.accept()

Allreduce will some times hang

The code is like below, will hang in allreduce, but if remove "#pragma omp parallel for " will ok, even though allreduce is outside of parallel loop. During some run I also face "
:AssertError:check ack & check pt cannot occur together with normal ops
", ":zero size check point is not allowed", all these will not occur if remove "#pragma omp parallel for"

          vector<bool> needMoreStep(TrainData.NumFeatures, false);
            #pragma omp parallel for 
        for (int featureIndex = 0; featureIndex < TrainData.NumFeatures; featureIndex++)
        {
            if (IsFeatureOk(featureIndex))
            {
                needMoreStep[featureIndex] = CalculateSamllerChildHistogram(featureIndex);
            }
        }

            for (int featureIndex = 0; featureIndex < TrainData.NumFeatures; featureIndex++)
            {
                if (IsFeatureOk(featureIndex) && needMoreStep[featureIndex])
                {

               //rabit allreduce here.. will hang 
                        AllreduceSum((*_smallerChildHistogramArray)[featureIndex]);
                }
            }

[RFC] Rabit2 Design Proposal

Problem

MPI became de facto protocol for distributed machine learning synchronization framework. From traditional ML such as xgboost or light bgm to recent deep learning framework such as Pytorch or Tensorflow (horovod), MPI communication primitives fits well with machine learning iterative synchronization diagrams.

Majority of MPI frameworks such as facebook gloo or nvidia nccl or uber horvood focus on efficiency especially on GPU communication bandwidth. The driving force behind mostly were able to sync large neural network model efficiently and avoid congestion.

In light of larger dataset, distributed ml involves more hosts and running longer. The risk of single point of failure increases. MPI by nature requires all up which makes it vulnerable to single point of failure. In response to this challenge, majority of ML frameworks support synchronous external checkpoints where training job will perform upload model to external storage before the next iteration. In case any host failure happens, scheduler will redo data distribution, command workers pull last checkpoint from external storage and resume training.

The upside of such implementation divide synchronization failure recovery from MPI operation implementation. It allows various frameworks implement its own checkpointing logic and integrate with device specific synchronization libraries; The downside of such implementation is recovery usually take much longer as it involves reingest entire dataset and involve all workers bootstrap from last checkpoint. Moreover, checkpoint to external storage is relative expensive operation where users choose to do only after a few iterations, making training time lost grows even larger.

On the other venue, compute resource schedulers (K8S GCP, Peloton) introduced preemptive resource allocation to improve overall compute resource utilization. While preemptive resource allocation did improve overall resource utilization ratio and way cheaper to get from cloud vendors. It post a big stability challenge to large scale distributed ML jobs.

Architecture

Resilient ML sync designed to served as generic and performant infra piece tackling common issues in large scale machine learning jobs.

  • Partitioned Mapper: it served as interface with data source and resource scheduler. ML code along with ML sync worker were executed with mapper interface
  • Rabitagent: it served as agent on each mapper and form mapi network to execute allreduce workload from user ML code performantly and reliably.
  • User ML code: it served as building blocks of distributed machine learning building block, to end user Partitioned Mapper and ML sync worker are transparent thanks to machine learning frameworks support(XGBoost, Tensorflow)
  • Shard In Memory Format: thanks to various languages and frameworks involved in large scale machine learning. There is a need to avoid memory copy and ser/des cost.

    Capture

EventDriven Rabit FSM

we employed event based with epoll/kqueue, oa allreduce worker is going through following FSM while executing Allreduce /BroadCast workload. State transition of a worker is triggered by function calls or network events from epoll sockets on non blocking and threading fashion.

List of function call events from code are

  • Start agent: [F1]
  • Connect tracker [F2]
  • Connect peers [F3]
  • Shutdown agent [F4]
  • Call AllReduce [F5]
  • Call Broadcast [F6]
  • Accept missing peers[F7]

List of network events from between agents are

  • EPOLLIN [E1]
  • EPOLLOUT [E2]
  • EPOLLPRI [E3]
  • EPOLLERR [E4]
  • EPOLLUP [E5]

Minor note, thread safe exactly once delivery is guarded with low level edge trigger. It offers better threading parallelism when dealing with multiple connections heavy weight send/recv events handling.

  • AllReduce(AllGather) state, worker no longer doing full loop of connections. It instead only listen to set of connections with data and decide if partial reduce can be executed / partial write can be pipelined.
  • Broadcast(All Scatter) state, thanks to the efficiency of able to operate on large number of connections.(ref C10K), nodes on top of tree can choose to run parallel speculative execution and flood latest model to entire topology when socket EPOLL WRITE is available.
  • Peer Error state, EPOLL ERR/ EPOLLUP will be instrumented to all connected agents in topology. Agents not connected or not interacting peer in substate within allreduce/broadcast state doesn’t need to wait for new agent back online. So peer error state only triggered when E4/E5 happens and agent is reading/writing to it. Otherwise, it will be auto recovered without change agent state in separate thread. (More detail of failure recovery will be introduced in latter section)
  • Task Complete state: agent sends to all peers when it finished broadcast/allreduce task with out of bound message. When all agents recvs out of bound messages from all peers it interacted with, it goes to idle state (mark entire job finishes) and ready for the next job. In the next section of documentation, we will cover more how we utilize asynchronous nature of task complete state in MPI cluster to build efficient snapshot and asynchronous external checkpoint/restore.

2

Notice, with some code change, it is possible to build FSM based on current pollhelper interface instead of epoll. So this is not strictly depending on epoll but more good to IF MORE CONNECTIONS NEEDS TO BE KEPT

Failure Recovery

In original rabit paper, it described failure recovery as following steps

  1. Pause every node until the failed node is fully recovered.
  2. Detect the model version we need to recover by obtaining the minimum operation number. This is done using the Consensus Protocol explained in section 3.2.2.
  3. Transfer the model to the failed node using the Routing Protocol explained in section
  4. Resume the execution of the failed node using the received model. 5. Resume the execution of the other nodes as soon as the failed node catches up.
  5. Single agent crash will stall entire network until 1) failed node back online 2) reach consensus which model to use 3) transferred virtual checkpoints from other agents using routing protocol.

Looking into communication between agents across entire topology, this global lock is not always needed. Those five steps recovery can be optimized with following locking improvements.

Let’s model a single task on a single agent like following:

Data flow

Output = reduce( input1, input2, input3… input_n, input self)

where each input can be parallelized (e.g a dedicated thread); Reduce function enjoy out of order execution property against inputs from sources f(a, b, c) = f(f(a,b), f(c)) and hence can prioritize execution against received results. we introduce two blocking primitives wait_for_peer_recovery and resume to handle peer error state (state transition logic skipped below)

input_i is future running with nonblocking recv 
return entire result from peer_i or reset and keep recv yet transfered result from new peer_j (with same rank as peer_i)
output = reduce(input_self)

while( not all source inputs received){
	for(int i = 0 ; i < n ; i++){
                 if(input_i marked error){
                     input_i.mark_blocked()
                     continue;
                 }
                if(input_i.reduced() == false){
                     output = reduce(reduce(input_i.get()), output)
                 }
         }

        if(all_input_reduced) break;

        for(int i = 0 ; i < n ; i++){
               if(input_i.blocked()){
                     input_i.wait_for_peer_recovery() //blocing call
                     input_i.resume() //resume listening from partial data
               }
        }
}

ouput.send(ouput)
mark allreduce task complete 
//keep output in send buffer until hear allreduce finished at sink peer 

The algorithm fits well with epoll_event based edge trigger input_i ready can be observed with EPOLLIN against each socket independently. Most heavy lift data transfer can be done in parallel while reduce function can be executed on partial inputs instead of waiting for the entire input set become available. It also avoid global locking of network topology with implied data dependency on any connected graph.

Lazy Blocking Recovery

Compared with baseline, we identified two cases where peers downing data exchange with crashing peers could prioritize works and block instead of reset.

  • Peer not waiting for result from crashed agent should keep running as usual.
  • Perr waiting for input from crashed source can still keep other input received/ apply function running. Once crashed source load checkpoint and backfill input, output can be generated against all existing results f(f(existing inputs), f(crashed_backfill_input))

Three cases of running ALLREDUCE MIN on tree based topology(it also applied to ring based)

  1. Agent 1 crash after agent 2 received entire data from agent 1. Async agent processor allreduce(min) keep executing and output 0 despite agent 1 is missing. Agent 1 recovery runs in parallel with allreduce task
  2. Agent 1 crash before send entire data to agent 2, async agent processor allreduce(min) keep executing and output 0 despite agent 1 is missing. It blocks and wait until agent 1 recover (peer error state). During waiting agent 1 recover time, if agent 0 crash, output will treat as case 1)
  3. Agent 1 crash before agent 2 broadcast to it, agent 2 broadcast rest agents to and wait for agent 1 to recover ( peer error state) wait for agent 1 back with latest virtual checkpoint from last iteration. Once agent 1 recover, agent 0 send output to agent 1 and jump to task complete state. At this point, output still stored in send buffer in case any peer crash before they report task complete. It only clear output when agent goes into wait task state.

5

Agent running on peer error state and agent already finished communicating with peers. This is the case where peer read finished all bytes. Agent in peer error state can keep running until next task read peer state. At the same time, the peer in error state(the one cause current agent in peer error state) needs to go through the recovery process by pulling the latest version of model as well as result collected from other peers via read peer. Such data lineage can be found with send buffer per link (up to one iteration). By storing model checkpoint along with send buffers per link in each agent across entire cluster(see rabit paper), rabit2 will be able to recover only crashed agents.

Partial Result

As stated in the previous sections, sources are sending data in non blocking and asynchronous fashion. In case agent crashed, recovered agent can sync with peer on how many bytes already sent to other side. Recovered peer would be able to just send starting from already sent offset instead of resend everything. This helps save network bandwidth.

Screen Shot 2019-04-04 at 11 26 50

minor problem: misc python2-only print statements

/lib/python3.6/site-packages/xgboost-0.7-py3.6.egg/xgboost/rabit/guide/basic.py", line 20
print '@node[%d] before-allreduce: a=%s' % (rank, str(a))
^
SyntaxError: invalid syntax

This shows up as a bit of noise in the install process.

Memory leak when broadcasting in a loop

Hi,

I am using the rabit code from aeb4008 as I am using gcc 4.4 which does not have nullptr and the next commit doesn't compile with the gcc 4.4 I have.

When I create a simple with a large integer array with lots of elements and broadcast it to all the nodes multiple times, I seem to be getting an error.

Here are my observations with 5 nodes/machines with 1GB memory allocated to them:

  • Integer array size: 8Million , 10 loops - Works
  • Integer array size: 80Million , 10 loops - Fails after 5 loops
  • Integer array size: 8Million , 100 loops - Fails after 32 loops

Also, If I give 3GB memory allocated to every node/machine, with: Integer array size: 8Million , 100 loops - it fails after 64 loops

I'm using a modified code from https://github.com/dmlc/rabit/blob/a9a2a69dc1144180a43f7d2d1097264482be7817/guide/broadcast.cc where I use a int array instead of a char array, and broadcasting in a loop.

It seems to be that after I broadcast the array once, the memory is not de allocated. Do I need to write some specific code to deallocate this array which has been broadcasted ?

CC @joshyjoseph

Does the source package has any dependencies? like python version etc.

The Python version I am using is:

Python 3.5.0a1 (default, Jul 15 2015, 17:58:06)
[GCC 4.1.2 20080704 (Red Hat 4.1.2-51)] on linux

When I run "python ../tracker/rabit_demo.py -n 2 basic.rabit".
I get :

Traceback (most recent call last):
File "../tracker/rabit_demo.py", line 96, in
tracker.submit(args.nworker, [], fun_submit = mthread_submit, verbose = args.verbose)
File "/home/zhaoyin.zy/rabit/tracker/rabit_tracker.py", line 316, in submit
master.accept_slaves(nslave)
File "/home/zhaoyin.zy/rabit/tracker/rabit_tracker.py", line 259, in accept_slaves
s = SlaveEntry(fd, s_addr)
File "/home/zhaoyin.zy/rabit/tracker/rabit_tracker.py", line 53, in init
magic = slave.recvint()
File "/home/zhaoyin.zy/rabit/tracker/rabit_tracker.py", line 35, in recvint
return struct.unpack('@i', self.recvall(4))[0]
File "/home/zhaoyin.zy/rabit/tracker/rabit_tracker.py", line 33, in recvall
return ''.join(res)
TypeError: sequence item 0: expected str instance, bytes found
AssertError:ReConnectLink failure 2
Socket RecvAll Error:Connection reset by peer

Fault tolerance not work: Allreduce Recovered data size do not match the specification of function call

I ran xgboost on yarn and test if fault tolerance could work. I started 4 workers. When xgboost started to update model, I killed one worker(called worker0). Yarn started a worker named worker0_1 instead. But the worker failed finally due to this error:  Allreduce Recovered data size do not match the specification of function call.
The responding code is,

allreduce_robust.cc(line 817)
if (role == kRequestData || role == kHaveData) {
utils::Check(data_size == size,
"Allreduce Recovered data size do not match the specification of function call.\n"
"Please check if calling sequence of recovered program is the "
"same the original one in current VersionNumber");
}

I printed some details then. data_size is 800 and size is 8. But I don't know the reason.

Publish rabit on Pypi ?

Currently to use rabit we checkout, compile and manually include the libs into our python project

This also means forking rabit.py and adapting _loadlib to load our own lib instead of rabit.so

(as it is also done for XGBoost)

This nevertheless means some code duplication.

It would be easier for us if we directly could add rabit to our requirements.txt and it would pull rabit's pythons files, headers & natives libs.

Any plans on creating a wheel for rabit that would be published to pypi ?

As an alternative we could adapt rabit.py to be able to load with any lib (such that when compiling we just copy the file instead of forking it)
Seems like a minor change in _loadlib and maybe make _find_lib_path injectible.

stale guide.md?

it seems that the directory structure of rabit has changed a lot, e.g. rabit_demo.py does not appear anywhere while guide.md indicates that users can run the demo with this command...

How to run /guide/*.cc

In the rabit/doc/guide.md, we can run the example code in /guide by using python script:

../tracker/rabit_demo.py -n 2 basic.rabit

However, I can't find the ../tracker/rabit_demo.py

Is the file in dmlc-core?

Uploading }12@IC(U7N9(IGZ3KMX`ALV.png…

intra-node memory sharing?

I am interested to see if it is possible to add the feature of intra-node memory sharing to rabit. That said, is it possible for different processors at the same node to work with the same array. As long as the user guarantees the data race will not happen in the design of algorithm, this can be quite useful. For example, a machine learning algorithm whose inputs are (SkipGram) word vectors may want to ensure the embedding of dictionary is one copy per node (saving memory).

In the backend implementation, I believe it is the three commands: shm_open() - ftruncate() - mmap(), which I think is not very difficult to implement into rabit.

Purpose of -msse2 flag used while compiling the rabit source code ?

Hi,

Its been observed that -msse2 flag is used while compiling the code. We have not found any SSE/Intel specific code available in rabit source code. Can we remove the flag since it is no longer needed? It will help to compile code different architectures (apart from x86) without changes in Makefile.

I think this can be solved like what has been done in MXNet: https://github.com/dmlc/mxnet/blob/master/make/config.mk#L119-L124

Thanks,
Snehlata Mohite.

problem with submit the basic.exe to msmpi cluster

follow your guide i compile the basic.exe ,and use rabit_demo.py to test it locally, the result is correct.
but when I try to excute as follow ,the result is wrong :

D:\cluster\clustertest\Release>mpiexec -n 4 basic.exe
@node[0] before-allreduce: a={0, 1, 2}
@node[0] after-allreduce-max: a={0, 1, 2}
@node[0] after-allreduce-sum: a={0, 1, 2}
@node[0] before-allreduce: a={0, 1, 2}
@node[0] after-allreduce-max: a={0, 1, 2}
@node[0] after-allreduce-sum: a={0, 1, 2}
@node[0] before-allreduce: a={0, 1, 2}
@node[0] after-allreduce-max: a={0, 1, 2}
@node[0] after-allreduce-sum: a={0, 1, 2}
@node[0] before-allreduce: a={0, 1, 2}
@node[0] after-allreduce-max: a={0, 1, 2}
@node[0] after-allreduce-sum: a={0, 1, 2}

by the way ,when I use the msmpi.dll and the include folder they apply to code ,and use the mpiexec command , the result is correct.
why can't use mpiexec to submit the rabit job ?

[RFC] vectorize reducer

Hi Folks,

We got some first hand results from running reducer in parallel with openmp. In light of hacky nature and thread "abuse" in previously proposed solution. I think it might worth a bit discussion before we move forward.

Why we address this problem, really?

Short answer is we have seen user running larger reducer exceed general guided depth 6-8. Per iteration speed got hit hard in distributed training as we sync on every depth on each iteration.

What is the problem, precisely?
Screenshot from 2019-09-02 10-57-35
Screenshot from 2019-09-02 10-57-07
compiler will have hard time figure out data-flow and alignment when it tried to vectorize it.

How do we plan to address this problem?

We have 3 options

  1. OpenMP simd pragma
    pro is this is a proven solution which might less likely see bugs or issues given it's well supported.
    more detail here http://hpac.rwth-aachen.de/teaching/pp-16/material/08.OpenMP-4.pdf con is this would introduce openmp4 dependency.

  2. use vector class https://github.com/vectorclass/version2 pro is we can use this as testing ground and optimize xgb as a whole. The con is this is a pretty new project, given it's experimental nature, we might run into various of bugs and issues.

  3. hand tune loop and use compiler vectorization, pro is we control how we do it under gcc, con is this will make code less maintainable on various of platforms.
    https://stackoverflow.com/questions/32000917/c-loop-optimization-help-for-final-assignment-with-compiler-optimization-disabl

For now, I would propose we choose least resistance path and go with openmp simd.

Thanks,
Chen

Why we define a vector<uint64_t> in allreduce_base.c ?

When I read the code in src/allreduc_base.c, I found that you define a vector<uint64_t> buffer.

I'm a little confused.

Why not define a vector<char> buffer?

What's the benefit?

Could you explain more specific and name an example?

          // recv buffer ot get data from child

	// aligned with 64 bits, will be all to perform 64 bits operations freely

	std::vector<uint64_t> buffer_;`

Multi-threading and Rabit allreduce/broadcast ops

Hi everyone,

I've developed allreduce, broadcast and allgather ops for TensorFlow based on Rabit ops. While digging into Rabit ops, I realized that they are not thread safe. So I limited to 1 the number of threads used by TensorFlow to compute the graph so far.

Now, I wonder if there is a way to execute several allreduce/broadcast/allgather in parallel. I've looked into the code of XGBoost to get any hint but I did not manage to find parallel calls of Rabit ops. Is there any plan to make Rabit ops thread safe ?

Thanks in advance for your help.

Rabit should import dmlc-core as a CMake target

Similar to #82 , I'm testing PR #113 and in order to compile rabit I need to pull in the dmlc-core headers dmlc-core/include/dmlc and copy those into rabit/include to make the project compile.

First: Am I doing this right?
Second: If the dmlc headers are indeed a requirement to the project, should they be included (and version) in the rabit repo? Not sure what the motivation is for keeping them separate (I can imagine downstream projects that depend on both like xgboost though)

nullptr issue in Mac Apple Clang

When compiling rabit in Apple's Clang compiler I seem to be getting this error:

In file included from rabit/src/engine.cc:17:
rabit/src/./thread_local.h:43:36: error: initializer for thread-local variable must be a constant
      expression
    static MX_TREAD_LOCAL T* ptr = nullptr;
                                   ^~~~~~~
/Library/Developer/CommandLineTools/usr/include/c++/v1/__nullptr:49:17: note: expanded from macro 'nullptr'
#define nullptr _VSTD::__get_nullptr_t()
                ^~~~~~~~~~~~~~~~~~~~~~~~
/Library/Developer/CommandLineTools/usr/include/c++/v1/__config:392:15: note: expanded from macro '_VSTD'
#define _VSTD std::_LIBCPP_NAMESPACE
              ^

This is occurring with latest master branch

Rabit Architecture Diagram/Ports

Hello,

I need to setup an environment where there is a RabitTracker and multiple Rabit slave nodes. I need to do this in it's own environment, but I have to set up firewall rules for "security" reasons. It's clear what port the RabitTracker will be on by default, but what kind of ports need to be open to the Slaves? Do the slaves make calls to each other or does it only call the RabitTracker? It's unclear as there are limited docs/architectural diagrams. I looked through the source code, but it's not very clear.

Cheers,

Can it support simple mpi send recv ?

I am new to parallel programming and find rabit very elegant, easy to use.
For one simple scenario, i want to train an ensemble of trainers, each distinct node will train one and create one predictor, I want to get all the predictors in node 0 and save.How to realize this ?
Now I use rabit::Broadcast , it works but might simple send to node 0 operation be better?
Now my code looks like below

        if (rabit::GetWorldSize() > 1)
        {
            svec spredictors(_predictors.size(), "");
            for (size_t i = 0; i < _predictors.size(); i++)
            {
                if (_predictors[i] != nullptr)
                {
                    spredictors[i] = serialize_util::save(_predictors[i]);
                }
                rabit::Broadcast(&spredictors[i], i % rabit::GetWorldSize()); 
                if (_predictors[i] == nullptr)
                {
                    _predictors[i] = serialize_util::load<PredictorPtr>(spredictors[i]);
                }
            }
        }

Barrier() Feature?

Hi, I was curious if it is possible to add a barrier function like MPI_Barrier(). Currently, it can be done by articulating a Broadcast() call.

How to make each node use multi core

I use rabit_mpi.py, locally I have two computers, and my hosts file looks like
cat ~/hosts
cp01 slots=1
cp02 slots=1
My command like
rabit_mpi.py -n 2 -H ~/hosts "./exe"

However it will use only one core in each computer,
omp_get_num_procs() --- [1]

But when submitting to big cluster in my company, each node will use full cores for openmp multithread.

How can locally do the same ?

Reducing objects whose size is not known at compile time

Right now the interface of rabit AllReduce requires that the items being reduced are POD types whose byte size is know at compile time.

I think the reason for that is that internally it uses MPI_ALLREDUCE which also requires POD data types and the count of items to calculate the memory cost as sizeof(DATA_TYPE) * count.

My question is this: Are there any constraints for performing reductions on objects whose size is only known at runtime? I'm pretty sure it's not possible with the current codebase, I'm just wondering if I'm missing something that would make this impossible at the MPI side (I know for example that there are not sparse collectives in MPI).

add timeout thread to avoid rabit hang forever

We observed some rabit agent hang forever in some production jobs, this is due to fault tolerant underlying assumption "failed worker will retry and catch up".

In some cases where scheduler has limited resources or configured not launching retry task, rest of fleet will hang forever.

Some options

  • add heatbeat to tracker and shutdown worker after timeout
  • use socket OOB message to propagate and init count down thread, shutdown agent after timeout

heartbeat put periodical weight on tracker which already shows performance issue given large scale cluster. This might not be best approach moving forward.

Socket OOB seems more promising. The idea is when allreduce/broadcast/checkpoint operations return error due to connected peers giving out socket error. CheckAndRecover implementation checks return type which already false and reset links. we might be able to have a singleton thread which sleeps for configurable time before exit program. Only when tracker signal worker with recover signal, we might terminate that singleton timeout thread

  /*!
   * \brief if err_type indicates an error
   *         recover links according to the error type reported
   *        if there is no error, return true
   * \param err_type the type of error happening in the system
   * \return true if err_type is kSuccess, false otherwise
   */
  bool CheckAndRecover(ReturnType err_type);

Problem with Rabit on a Dask + google-gke architecture

Hi,

On a simple Dask XGBoost run I get the following error. The sample code looks like:

from dask_ml.xgboost import XGBRegressor
est = XGBRegressor(...)
x = dd.read_csv('somedata.csv')
y = x.y
del x['y'] 
est.fit(x, y)

And the error is as follows:

---------------------------------------------------------------------------
OSError                                   Traceback (most recent call last)
<ipython-input-4-d50d84593355> in <module>()
      5 y = x.y
      6 del x['y']
----> 7 est.fit(x, y)

/opt/conda/lib/python3.6/site-packages/dask_xgboost/core.py in fit(self, X, y)
    239         xgb_options = self.get_xgb_params()
    240         self._Booster = train(client, xgb_options, X, y,
--> 241                               num_boost_round=self.n_estimators)
    242         return self
    243 

/opt/conda/lib/python3.6/site-packages/dask_xgboost/core.py in train(client, params, data, labels, dmatrix_kwargs, **kwargs)
    167     """
    168     return sync(client.loop, _train, client, params, data,
--> 169                 labels, dmatrix_kwargs, **kwargs)
    170 
    171 

/opt/conda/lib/python3.6/site-packages/distributed/utils.py in sync(loop, func, *args, **kwargs)
    273             e.wait(10)
    274     if error[0]:
--> 275         six.reraise(*error[0])
    276     else:
    277         return result[0]

/opt/conda/lib/python3.6/site-packages/six.py in reraise(tp, value, tb)
    691             if value.__traceback__ is not tb:
    692                 raise value.with_traceback(tb)
--> 693             raise value
    694         finally:
    695             value = None

/opt/conda/lib/python3.6/site-packages/distributed/utils.py in f()
    258             yield gen.moment
    259             thread_state.asynchronous = True
--> 260             result[0] = yield make_coro()
    261         except Exception as exc:
    262             error[0] = sys.exc_info()

/opt/conda/lib/python3.6/site-packages/tornado/gen.py in run(self)
   1097 
   1098                     try:
-> 1099                         value = future.result()
   1100                     except Exception:
   1101                         self.had_exception = True

/opt/conda/lib/python3.6/site-packages/tornado/gen.py in run(self)
   1105                     if exc_info is not None:
   1106                         try:
-> 1107                             yielded = self.gen.throw(*exc_info)
   1108                         finally:
   1109                             # Break up a reference to itself

/opt/conda/lib/python3.6/site-packages/dask_xgboost/core.py in _train(client, params, data, labels, dmatrix_kwargs, **kwargs)
    122     env = yield client._run_on_scheduler(start_tracker,
    123                                          host.strip('/:'),
--> 124                                          len(worker_map))
    125 
    126     # Tell each worker to train on the chunks/parts that it has locally

/opt/conda/lib/python3.6/site-packages/tornado/gen.py in run(self)
   1097 
   1098                     try:
-> 1099                         value = future.result()
   1100                     except Exception:
   1101                         self.had_exception = True

/opt/conda/lib/python3.6/site-packages/tornado/gen.py in run(self)
   1111                             exc_info = None
   1112                     else:
-> 1113                         yielded = self.gen.send(value)
   1114 
   1115                     if stack_context._state.contexts is not orig_stack_contexts:

/opt/conda/lib/python3.6/site-packages/distributed/client.py in _run_on_scheduler(self, function, *args, **kwargs)
   1911                                                      kwargs=dumps(kwargs))
   1912         if response['status'] == 'error':
-> 1913             six.reraise(*clean_exception(**response))
   1914         else:
   1915             raise gen.Return(response['result'])

/opt/conda/lib/python3.6/site-packages/six.py in reraise(tp, value, tb)
    690                 value = tp()
    691             if value.__traceback__ is not tb:
--> 692                 raise value.with_traceback(tb)
    693             raise value
    694         finally:

/opt/conda/lib/python3.6/site-packages/dask_xgboost/core.py in start_tracker()
     30     """ Start Rabit tracker """
     31     env = {'DMLC_NUM_WORKER': n_workers}
---> 32     rabit = RabitTracker(hostIP=host, nslave=n_workers)
     33     env.update(rabit.slave_envs())
     34 

/opt/conda/lib/python3.6/site-packages/dask_xgboost/tracker.py in __init__()
    166         for port in range(port, port_end):
    167             try:
--> 168                 sock.bind((hostIP, port))
    169                 self.port = port
    170                 break

OSError: [Errno 99] Cannot assign requested address

Also the result of:

import socket
socket.error is OSError

is true .
Any help will be greatly appreciated.

Thanks.

librabit_wrapper.so error?

Hi,

I am trying to install rabit on my mac (gcc 4.8.5) and got the following error after typing make. Could you help me figure out what's missing?

$ make
c++ -c -O3 -msse2 -Wall -Wextra -Wno-unused-parameter -Wno-unknown-pragmas -fPIC -o allreduce_base.o src/allreduce_base.cc
c++ -c -O3 -msse2 -Wall -Wextra -Wno-unused-parameter -Wno-unknown-pragmas -fPIC -o allreduce_robust.o src/allreduce_robust.cc
c++ -c -O3 -msse2 -Wall -Wextra -Wno-unused-parameter -Wno-unknown-pragmas -fPIC -o engine.o src/engine.cc
ar cr lib/librabit.a allreduce_base.o allreduce_robust.o engine.o
c++ -c -O3 -msse2 -Wall -Wextra -Wno-unused-parameter -Wno-unknown-pragmas -fPIC -o engine_mock.o src/engine_mock.cc
ar cr lib/librabit_mock.a allreduce_base.o allreduce_robust.o engine_mock.o
c++ -c -O3 -msse2 -Wall -Wextra -Wno-unused-parameter -Wno-unknown-pragmas -fPIC -o rabit_wrapper.o wrapper/rabit_wrapper.cc
c++ -O3 -msse2 -Wall -Wextra -Wno-unused-parameter -Wno-unknown-pragmas -fPIC -shared -o wrapper/librabit_wrapper.so rabit_wrapper.o lib/librabit.a -Llib -lrt
ld: library not found for -lrt
collect2: error: ld returned 1 exit status
make: *** [wrapper/librabit_wrapper.so] Error 1

git release

I didn't find a release in the repository. To facilitate package versioning, can a project maintainer please tag an initial release for this project? v1.0.0?

Compiling and running tests?

Hello, I thought I'd help out with testing the new features that @chenqin is going to be working on, but I can't seem to be able to get the tests to compile and run properly.

The README in the test folder refers to a keepalive.sh script but that doesn't seem to exist.

I'm able to build the project through CMake, however running make in the tests folder fails for the lazy_recover target with an unexpected error:

g++ -c -Wall -O3 -msse2  -Wno-unknown-pragmas -fPIC -I../include  -std=c++0x -o lazy_recover.o lazy_recover.cc
In file included from src/../include/rabit/internal/engine.h:10:0,
                 from src/engine_mpi.cc:14:
src/../include/rabit/internal/../serializable.h:12:10: fatal error: dmlc/io.h: No such file or directory
 #include "dmlc/io.h"
          ^~~~~~~~~~~
compilation terminated.
Makefile:88: recipe for target 'engine_mpi.o' failed
make[1]: *** [engine_mpi.o] Error 1

Which is weird because the io.h header is indeed under ../inlcude/dmlc.

When trying to run the rest of tests that do compile only the ring all reduce test runs to completion, the others raise runtime errors.

I'm wondering if I'm doing something wrong when running these tests, I simply call make -f test.mk <test-name>.

Do you have instructions for running the tests @chenqin ?

Eliminate extra dataset copy in Python.

During construction of DMatrix, Python wrapper might duplicate the dataset because it's not continuous or not of the right data type. But we can handle these situations inside c++ code to avoid constructing an extra copy of dataset.

My goal is not to share the underlying buffer with Python data structure, but to eliminate the extra copies constructed during conversion inside Python wrapper.

C vs C++ MPI Usage

As you know, the MPI standard deprecated C++ bindings. Some implementations still offer them (e.g. OpenMPI) while others do not (e.g. MSMPI). Would you be receptive to a re-write of the MPI-backend internals to use the MPI C API? Looking at #31 it seems like you're open to the change at least in principle. I am willing to do this, but I wanted to make sure before I started.

Problem with get the local IP in the basic.rabit example

Using the current code to get local IP, e.g. host = socket.gethostbyname(socket.getfqdn()) in rabit/tracker/rabit_tracker.py will get the following ERROR on MAC
"socket.gaierror: [Errno 8] nodename nor servname provided, or not known".
host = socket.gethostbyname(socket.gethostname()) doesn't work, too.

The following changes works for me:
host = socket.gethostbyname('localhost')
I don't know if it works in all situations. Any suggestion?

librabit_mpi compilation issues?

I'm not sure if this is actually an issue or if I'm just doing something stupid. Either way, I'll happily submit a PR with documentation and/or a patch once I figure it out.

When compiling librabit_mpi (with CXX=mpicxx MPICXX=mpicxx make mpi) the normal librabit symbols are not exported (RabitInit, RabitAllReduce, etc.).

With the addition of $(BPATH)/c_api.o to the librabit_mpi line of the Makefile, the symbols are properly exported by librabit_mpi.{a,so} and using librabit_mpi as xgboost's "LIB_RABIT" allows xgboost to compile.

So, is my workaround "correct" or is the intended usage of librabit_mpi something other as a drop-in for librabit, and it's correct that librabit_mpi doesn't export the librabit symbols?

Conflict of _assert with _assert defined in the standard headers on FreeBSD

/usr/ports/misc/xgboost/work/xgboost-0.90-235-gad4a1c73/rabit/src/allreduce_robust.cc:62:23: error: too many arguments provided to function-like macro invocation
      cur_cache_seq), "Shutdown: check point must return true");
                      ^
/usr/include/assert.h:54:9: note: macro '_assert' defined here
#define _assert(e)      assert(e)
        ^

please provide a release version

Hi, we chooses the xgboost as an important part of our business, but we expect all libraries to be release version. So would you please choose a branch to release a stable version?

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.