rjagerman / glint
Glint: High performance Scala parameter server
License: MIT License
We need to set up documentation (e.g. on GitHub's free Pages hosting) that provides good descriptions of all the methods and classes the library offers.
I start a master service and configure the IP and port in a config file. Both the servers and the client are then set up by parsing this config file. Although the servers start successfully, the client fails with an ActorNotFound exception:
ActorSelection[Anchor(akka.tcp://glint-master@*.*.*.*:13370/), Path(/user/master)]
I am trying this implementation of Glint with Apache Flink. I am rather new to Akka and cannot figure out the following error message:
java.lang.NoSuchMethodError: akka.pattern.AskableActorRef$.$qmark$default$3$extension(Lakka/actor/ActorRef;Ljava/lang/Object;)Lakka/actor/ActorRef;
at glint.Client.<init>(Client.scala:40)
at glint.Client$$anonfun$start$1.apply(Client.scala:302)
at glint.Client$$anonfun$start$1.apply(Client.scala:300)
at scala.concurrent.Future$$anonfun$flatMap$1.apply(Future.scala:253)
at scala.concurrent.Future$$anonfun$flatMap$1.apply(Future.scala:251)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
at scala.concurrent.impl.ExecutionContextImpl$AdaptedForkJoinTask.exec(ExecutionContextImpl.scala:121)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
This message appears as soon as I try to create a client (i.e. val gc = Client()).
Is it possible that this message occurs because Apache Flink uses a different version of Akka than Glint, and Glint somehow ends up using the Akka version that Apache Flink has imported? If so, how can I ensure that the right version of Akka is used for Glint and Apache Flink respectively?
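One direction to investigate (a sketch, not a verified fix; the version number below is an assumption and should match Flink's own Akka version) is to pin a single Akka version for the whole dependency graph in the job's build.sbt:

// build.sbt -- force every module (including Glint) onto one Akka version (sketch)
dependencyOverrides += "com.typesafe.akka" %% "akka-actor"  % "2.3.7"
dependencyOverrides += "com.typesafe.akka" %% "akka-remote" % "2.3.7"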
Thanks
To improve performance we should replace the current Java serialization with something faster, such as Kryo or Protobuf.
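As a sketch of what this could look like via Akka's pluggable serialization (the serializer class below comes from the third-party akka-kryo-serialization library and is an assumption, not a current Glint dependency; the PullVector message is one of Glint's request types):

import com.typesafe.config.ConfigFactory

// Sketch: register a Kryo serializer and bind a Glint message type to it
val kryoConfig = ConfigFactory.parseString(
  """
    |akka.actor.serializers.kryo = "com.romix.akka.serialization.kryo.KryoSerializer"
    |akka.actor.serialization-bindings {
    |  "glint.messages.server.request.PullVector" = kryo
    |}
    |""".stripMargin)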
I want a global word dictionary, but I found that Glint has no API for a (key, value) store.
In general, SSP (stale synchronous parallel) mode gives better performance. Will Glint support SSP mode? Thanks.
glint.exceptions.PullFailedException: Failed 10 out of 10 attempts to push data
at glint.models.client.async.PullFSM.glint$models$client$async$PullFSM$$execute(PullFSM.scala:67)
at glint.models.client.async.PullFSM$$anonfun$request$1.applyOrElse(PullFSM.scala:79)
at glint.models.client.async.PullFSM$$anonfun$request$1.applyOrElse(PullFSM.scala:76)
at scala.concurrent.Future$$anonfun$onFailure$1.apply(Future.scala:136)
at scala.concurrent.Future$$anonfun$onFailure$1.apply(Future.scala:134)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
at scala.concurrent.impl.ExecutionContextImpl$$anon$3.exec(ExecutionContextImpl.scala:107)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Glint works well on my toy dataset, but fails with the exception above on a big dataset.
I compiled Glint using Scala 2.10.6 and successfully built the jars.
Then I tried to run the example in spark.md; however, I got this error when initializing the client:
@transient val client = Client()
java.lang.NoSuchMethodError: com.typesafe.config.Config.getDuration(Ljava/lang/String;Ljava/util/concurrent/TimeUnit;)J
at glint.Client$.start(Client.scala:294)
at glint.Client$.apply(Client.scala:270)
at glint.Client$.apply(Client.scala:258)
at ... (nested REPL wrapper frames, garbled in extraction)
at $iwC.<init>(<console>:43)
at <init>(<console>:45)
at .<init>(<console>:49)
at .<clinit>(<console>)
at .<init>(<console>:7)
at .<clinit>(<console>)
at $print(<console>)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1045)
at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1326)
at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:821)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:852)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:800)
at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:857)
at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:902)
at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:814)
at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:657)
at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:665)
at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:670)
at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:997)
at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$process(SparkILoop.scala:945)
at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1064)
at org.apache.spark.repl.Main$.main(Main.scala:35)
at org.apache.spark.repl.Main.main(Main.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:730)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
In that line of the file, it looks like it should be
val cols = Array.fill[Int](matrix.rows)(index)
instead of
val cols = Array.fill[Int](matrix.cols)(index)
Is that right?
Implement basic get and set functionality on the server-side of the parameter server.
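A minimal sketch of what basic get/set could look like (the message types and actor below are illustrative assumptions, not Glint's actual protocol):

import akka.actor.Actor

// Hypothetical request messages (for illustration only)
case class GetValue(key: Int)
case class SetValue(key: Int, value: Double)

// Sketch of a partial model actor with plain get/set semantics
class PartialModelActor(size: Int) extends Actor {
  private val data = new Array[Double](size)
  def receive = {
    case GetValue(key)        => sender() ! data(key)
    case SetValue(key, value) => data(key) = value
  }
}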
The parameter server will need to be started on certain machines in a cluster. The first step is to get an Akka listener working on these machines and to provide some form of stand-alone, easy deployment.
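A rough sketch of the start-up side (assumes the akka-remote module; the hostname is a placeholder, and the port reuses the default master port seen elsewhere in these issues):

import akka.actor.ActorSystem
import com.typesafe.config.ConfigFactory

// Sketch: start an ActorSystem that listens for remote connections
val remoteConfig = ConfigFactory.parseString(
  """
    |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
    |akka.remote.netty.tcp.hostname = "127.0.0.1"
    |akka.remote.netty.tcp.port = 13370
    |""".stripMargin)
val system = ActorSystem("glint-master", remoteConfig)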
It seems one can only pull (and push) pieces of a Matrix when the row index array and the column index array have the same length. E.g. this works:
val matrix = client.matrix[Double](10, 10)
val result = matrix.pull(Array(0L, 1L), Array(0, 1))
But this doesn't:
val matrix = client.matrix[Double](10, 10)
val result = matrix.pull(Array(0L, 1L), Array(0))
failing with:
Exception in thread "main" java.lang.ArrayIndexOutOfBoundsException: 1
at scala.collection.mutable.WrappedArray$ofInt.apply$mcII$sp(WrappedArray.scala:155)
at scala.collection.mutable.WrappedArray$ofInt.apply(WrappedArray.scala:155)
at scala.collection.mutable.WrappedArray$ofInt.apply(WrappedArray.scala:152)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at glint.models.client.async.AsyncBigMatrix$$anonfun$6.apply(AsyncBigMatrix.scala:101)
at glint.models.client.async.AsyncBigMatrix$$anonfun$6.apply(AsyncBigMatrix.scala:99)
at glint.models.client.async.AsyncBigMatrix$$anonfun$glint$models$client$async$AsyncBigMatrix$$mapPartitions$2.apply(AsyncBigMatrix.scala:169)
at glint.models.client.async.AsyncBigMatrix$$anonfun$glint$models$client$async$AsyncBigMatrix$$mapPartitions$2.apply(AsyncBigMatrix.scala:169)
Is this intended to be supported? I was actually trying to create a 1-column matrix to use in place of a vector, so that I could try GranularMatrix to see if I could scale the feature space. So I need to do, for example, matrix.push(keys, Array(0)) (i.e. only the first, and only, column of the matrix, but a slice of rows).
In a parameter server scenario, network delay is always an important issue. Each BigMatrix push and pull demands a tuple of size 3 (row, column, value), but the row and column indices could be merged into a single index, which would reduce network consumption by about one third. Also, requiring the row index to be a Long is a limitation (a Long is not always needed).
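For illustration, a single-index encoding could look like this (a sketch of the idea, not Glint API):

// Flatten (row, col) into one index and back; row is a Long, so the product stays Long
def flatten(row: Long, col: Int, numCols: Int): Long = row * numCols + col
def unflatten(index: Long, numCols: Int): (Long, Int) =
  (index / numCols, (index % numCols).toInt)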
By default the parameter server aggregates new values by addition. This is ideal for some algorithms, but not for others. I've made some changes to make aggregation more flexible in commit 5485c6c. Right now it supports aggregation through:
I've been unexpectedly busy these last summer months, so I unfortunately did not find much time to work on Glint. There has been some interesting discussion from users in issues #54, #49 and #28, thank you for your feedback! Starting from November I can free up time in my regular schedule to dedicate to the project.
To address some of the issues that other contributors and users have made, I plan to make some pretty big changes to Glint's internal workings. I'm opening it up for discussion here, so feel free to speak your mind. These are just some of the ideas I have and I love to hear your feedback, concerns and comments on these.
1) Integration with Spark
I have been working on a very simple proof of concept to simplify integration with Spark. In this concept you include Glint as a library in your application by adding a line to your build.sbt file and then call glint.run(sparkContext) somewhere in your code to start the entire Glint system (master and parameter servers) on your existing Spark cluster. This has several big advantages.
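A sketch of the intended usage (the library coordinates below are hypothetical placeholders; glint.run is the proposed entry point named above):

// build.sbt (hypothetical coordinates, for illustration only)
libraryDependencies += "ch.ethz.inf.da" %% "glint" % "0.1-SNAPSHOT"

// In the Spark application:
glint.run(sc) // proposed call: starts the Glint master and parameter servers on the Spark cluster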
The major drawback is that we would no longer be able to run Glint as a stand-alone service: this change makes Glint dependent on Spark to ship functions across machines, so we cannot run it outside of Spark. Nevertheless, I feel the benefits definitely outweigh the disadvantages.
2) Setting up benchmarks on simple machine-learning tasks
To measure the impact of future changes, we should have several standard machine-learning tasks on various data sizes (Linear SVM, Logistic Regression, etc.). Perhaps even very simple tasks such as an all-reduce. It is important to establish a baseline here, so that progress can be accurately measured.
3) Breeze dense & sparse vectors/matrices instead of Arrays
Currently all model data is stored as dense Arrays to keep garbage collection overhead from becoming extreme. However, sparse models have significant advantages in terms of scalability. I propose a rework of the internals of the PartialMatrix and PartialVector actors to use Breeze's dense and sparse classes under the hood. We can then return Breeze structures as the result of pull requests; they should be easier to use than raw Arrays for machine learning applications. To manage possible garbage collection problems, we could eventually look into things like scala-offheap.
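For instance (a sketch, assuming Breeze as a dependency):

import breeze.linalg.{DenseVector, SparseVector, Vector}

// Dense storage for fully-populated partitions ...
val dense: Vector[Double] = DenseVector.zeros[Double](1000000)
// ... and sparse storage where most entries stay zero
val sparse: Vector[Double] = SparseVector.zeros[Double](1000000)
// Both share the Vector[Double] interface, so pull results could expose either
dense(42) += 1.0
sparse(42) += 1.0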
4) Fault tolerance
If a parameter server goes down, you lose that part of the model. Depending on your algorithm this may range from "we can recover" to "very, very bad news". Other implementations of parameter servers boast a chord-like DHT structure that replicates the model across multiple machines to mitigate system failures (see figure 8 of Li et al. (2014)). However, this is likely to cause significant performance degradation, because push requests need to go through all replicated models on other machines. We can of course vary the replication rate to balance fault tolerance against performance. This will probably take the longest to build, as it is quite complicated to implement. Nevertheless, it seems like an important step to bring Glint to a larger audience.
5) Documentation
The documentation is in a bit of an awkward place right now. Several parts are missing, and other parts need to be clarified or rewritten. With the earlier proposed changes, the documentation could change quite significantly, so I don't want to waste a lot of time writing documentation that will soon be obsolete. If anyone has a good suggestion for a documentation framework that can easily publish to gh-pages, I'd love to hear it. I'm currently toying around with Sphinx and it works decently well; however, suggestions are welcome.
This causes errors when more than one parameter server is running.
I have been playing around with Glint for a few weeks and am excited about its potential. However, the counts I eventually want to push to the parameter server are larger than the maximum value of type Long. I see there isn't any support for BigInt; is there a reason for this?
We need to add instructions on how to run the system as a simple localhost test.
Analogous to #19, the same problem exists with the asynchronous implementation of BigVector.
I am following the instructions on the Glint Spark Integration page. I have a Glint "master" and a "server" running on localhost. I am trying to create a default Glint client in the Spark shell and am getting the following error:
java.lang.NoSuchMethodError: scala.collection.immutable.HashSet$.empty()Lscala/collection/immutable/HashSet;
at akka.actor.ActorCell$.<init>(ActorCell.scala:336)
at akka.actor.ActorCell$.<clinit>(ActorCell.scala)
at akka.actor.RootActorPath.$div(ActorPath.scala:185)
at akka.actor.LocalActorRefProvider.<init>(ActorRefProvider.scala:465)
at akka.remote.RemoteActorRefProvider.<init>(RemoteActorRefProvider.scala:124)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at akka.actor.ReflectiveDynamicAccess$$anonfun$createInstanceFor$2.apply(DynamicAccess.scala:78)
at scala.util.Try$.apply(Try.scala:192)
at akka.actor.ReflectiveDynamicAccess.createInstanceFor(DynamicAccess.scala:73)
at akka.actor.ReflectiveDynamicAccess$$anonfun$createInstanceFor$3.apply(DynamicAccess.scala:84)
at akka.actor.ReflectiveDynamicAccess$$anonfun$createInstanceFor$3.apply(DynamicAccess.scala:84)
at scala.util.Success.flatMap(Try.scala:231)
at akka.actor.ReflectiveDynamicAccess.createInstanceFor(DynamicAccess.scala:84)
at akka.actor.ActorSystemImpl.liftedTree1$1(ActorSystem.scala:585)
at akka.actor.ActorSystemImpl.<init>(ActorSystem.scala:578)
at akka.actor.ActorSystem$.apply(ActorSystem.scala:142)
at akka.actor.ActorSystem$.apply(ActorSystem.scala:119)
at glint.Client$.start(Client.scala:289)
at glint.Client$.apply(Client.scala:270)
at glint.Client$.apply(Client.scala:258)
... 48 elided
In a simple sbt console, however, I am able to create a Glint client.
We wish to distribute (key, value) pairs over parameter servers. This can be done consistently via key hashing. To get started, assume we have n parameter servers labeled 1 ... n. This information can be stored in a context-like object which is distributed to all workers. We can then hash a key and take the hash modulo n, which gives us the appropriate server storing that key. For a large set of keys and a proper hash function, this yields a roughly uniform distribution.
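A minimal sketch of the routing computation (using 0-based server indices):

// Route a key to one of n parameter servers by hashing (sketch)
def serverFor(key: String, n: Int): Int =
  (key.hashCode & 0x7fffffff) % n // mask the sign bit so the index is non-negative

// Example: with 4 servers, keys spread roughly uniformly
Seq("apple", "banana", "cherry").map(k => serverFor(k, 4))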
Hi Rolf,
I saw your comment in ticket SPARK-6932. I'm excited and very interested in this project. I seriously believe a Parameter Server is an essential piece for further scaling ML algorithms into super-high-dimensional spaces. Do you have any examples of how to try this project out on YARN?
Many Thanks!
Yitong
Artery will be the new Akka remoting, based on Aeron IPC. It will be interesting to see the performance implications for communication.
I have millions of distinct 64-byte features; remapping features to continuous ids is expensive. Can Glint support that?
We should separate out the performance benchmark tests and run them automatically in CI. Additionally, more performance benchmarks should be written to properly test all the different components. This should make it easy to see when future code changes significantly impact performance.
Hello Rolf!
I have had a look at your code; what troubles me is how Glint interfaces with Spark. I don't even see a line of code related to Spark.
Best Wishes!
Codelife
Would there be interest in some random initialisation mechanism for vectors / matrices? Often it is OK to init with 0s (the implicit case with new Glint data structures), but sometimes you want a random normal or random uniform init. It's possible to do it yourself via a foreachPartition and a push, but it is a bit of a hack really. It seems much simpler and cleaner to provide an interface for this, like client.random.vector.randn[Double](dim, mean = 0, std = 1.0).
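For reference, the do-it-yourself variant looks roughly like this (a sketch; assumes an existing Glint BigVector[Double] named vector and the usual implicit execution context and timeout in scope):

import scala.util.Random

val dim = 1000000
val keys   = (0L until dim).toArray                  // all indices of the vector
val values = Array.fill(dim)(Random.nextGaussian())  // N(0, 1) samples
vector.push(keys, values)                            // result Future ignored in this sketch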
What do you think?
@rjagerman when doing some localhost testing I've run into timeout issues during push/pull of a vector. When the vectors are relatively small (~100k elements) everything seems to work fine. When I've tried larger feature dimensions (1,355,191 elements, for the news20.binary libsvm dataset) things start timing out. At the beginning of an iteration in rdd.foreachPartition I do:
import scala.concurrent.Await
import scala.concurrent.duration._

val values = Await.result(vector.pull(keys), 5 seconds)
// compute gradient and update
...
vector.push(keys, update.data)
After turning on more logging (client.akka.loglevel) I see the following:
[WARN] [09/21/2016 10:53:55.915] [glint-client-akka.remote.default-remote-dispatcher-5] [akka.tcp://glint-client@192.168.100.122:50080/system/endpointManager/reliableEndpointWriter-akka.tcp%3A%2F%2Fglint-server%40192.168.100.122%3A63032-1] Association with remote system [akka.tcp://glint-server@192.168.100.122:63032] has failed, address is now gated for [5000] ms. Reason: [Disassociated]
[INFO] [09/21/2016 10:53:55.921] [glint-client-akka.actor.default-dispatcher-2] [akka://glint-client/deadLetters] Message [glint.messages.server.request.PullVector] from Actor[akka://glint-client/temp/$f] to Actor[akka://glint-client/deadLetters] was not delivered. [1] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[INFO] [09/21/2016 10:53:55.921] [glint-client-akka.actor.default-dispatcher-2] [akka://glint-client/deadLetters] Message [glint.messages.server.request.PullVector] from Actor[akka://glint-client/temp/$j] to Actor[akka://glint-client/deadLetters] was not delivered. [2] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[INFO] [09/21/2016 10:53:55.921] [glint-client-akka.actor.default-dispatcher-2] [akka://glint-client/system/endpointManager/reliableEndpointWriter-akka.tcp%3A%2F%2Fglint-server%40192.168.100.122%3A63032-1/endpointWriter] Message [akka.remote.EndpointWriter$AckIdleCheckTimer$] from Actor[akka://glint-client/system/endpointManager/reliableEndpointWriter-akka.tcp%3A%2F%2Fglint-server%40192.168.100.122%3A63032-1/endpointWriter#1446622162] to Actor[akka://glint-client/system/endpointManager/reliableEndpointWriter-akka.tcp%3A%2F%2Fglint-server%40192.168.100.122%3A63032-1/endpointWriter#1446622162] was not delivered. [3] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[INFO] [09/21/2016 10:53:55.923] [glint-client-akka.actor.default-dispatcher-3] [akka://glint-client/deadLetters] Message [akka.remote.RemoteWatcher$Heartbeat$] from Actor[akka://glint-client/system/remote-watcher#-1279985067] to Actor[akka://glint-client/deadLetters] was not delivered. [4] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[WARN] [09/21/2016 10:53:55.925] [glint-client-akka.remote.default-remote-dispatcher-13] [akka.tcp://glint-client@192.168.100.122:50080/system/endpointManager/reliableEndpointWriter-akka.tcp%3A%2F%2Fglint-server%40192.168.100.122%3A63204-2] Association with remote system [akka.tcp://glint-server@192.168.100.122:63204] has failed, address is now gated for [5000] ms. Reason: [Disassociated]
[INFO] [09/21/2016 10:53:55.926] [glint-client-akka.actor.default-dispatcher-2] [akka://glint-client/deadLetters] Message [akka.remote.RemoteWatcher$Heartbeat$] from Actor[akka://glint-client/system/remote-watcher#-1279985067] to Actor[akka://glint-client/deadLetters] was not delivered. [5] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[INFO] [09/21/2016 10:53:55.927] [glint-client-akka.actor.default-dispatcher-2] [akka://glint-client/deadLetters] Message [glint.messages.server.request.PullVector] from Actor[akka://glint-client/temp/$k] to Actor[akka://glint-client/deadLetters] was not delivered. [6] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[INFO] [09/21/2016 10:53:55.928] [glint-client-akka.actor.default-dispatcher-2] [akka://glint-client/system/endpointManager/reliableEndpointWriter-akka.tcp%3A%2F%2Fglint-server%40192.168.100.122%3A63204-2/endpointWriter] Message [akka.remote.EndpointWriter$BackoffTimer$] from Actor[akka://glint-client/deadLetters] to Actor[akka://glint-client/system/endpointManager/reliableEndpointWriter-akka.tcp%3A%2F%2Fglint-server%40192.168.100.122%3A63204-2/endpointWriter#-1245711256] was not delivered. [7] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[INFO] [09/21/2016 10:53:55.932] [glint-client-akka.actor.default-dispatcher-2] [akka://glint-client/system/endpointManager/reliableEndpointWriter-akka.tcp%3A%2F%2Fglint-server%40192.168.100.122%3A63032-1/endpointWriter] Message [akka.remote.EndpointWriter$BackoffTimer$] from Actor[akka://glint-client/system/endpointManager/reliableEndpointWriter-akka.tcp%3A%2F%2Fglint-server%40192.168.100.122%3A63032-1/endpointWriter#1446622162] to Actor[akka://glint-client/system/endpointManager/reliableEndpointWriter-akka.tcp%3A%2F%2Fglint-server%40192.168.100.122%3A63032-1/endpointWriter#1446622162] was not delivered. [8] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[INFO] [09/21/2016 10:53:56.292] [glint-client-akka.actor.default-dispatcher-3] [akka://glint-client/deadLetters] Message [akka.remote.RemoteWatcher$Heartbeat$] from Actor[akka://glint-client/system/remote-watcher#-1279985067] to Actor[akka://glint-client/deadLetters] was not delivered. [9] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[INFO] [09/21/2016 10:53:56.292] [glint-client-akka.actor.default-dispatcher-3] [akka://glint-client/deadLetters] Message [akka.remote.RemoteWatcher$Heartbeat$] from Actor[akka://glint-client/system/remote-watcher#-1279985067] to Actor[akka://glint-client/deadLetters] was not delivered. [10] dead letters encountered, no more dead letters will be logged. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
16/09/21 10:54:01 ERROR Executor: Exception in task 1.0 in stage 28.0 (TID 59)
java.util.concurrent.TimeoutException: Futures timed out after [5 seconds]
Any idea on the cause? I would expect the vector array size to be around 10.8 MB. I've set the max frame size and send/receive buffer sizes to 32 MB and launched 1 master and 2 servers with this config, as well as my client app with the same config (based on the one you provided earlier).
Strangely, I'm not seeing exceptions like in #48
Currently the code has several hard-coded parameters and timeouts that work well but should still be configurable. The main objective is to unify all parameters into a single default configuration.
Some examples:
Hey Rolf!
I am struggling to set up Glint on a standalone Spark cluster. I am following your tutorial with one master and one server (and I use the standard conf from your repo).
I can manipulate the BigVector fine on the master node, but my futures time out if I try a .push() within an RDD foreach statement.
So the client seems to be running fine, but it cannot be accessed from the worker nodes.
thanks
christian
16/07/18 17:26:19 ERROR yarn.ApplicationMaster: User class threw exception: akka.actor.ActorNotFound: Actor not found for: ActorSelection[Anchor(akka.tcp://glint-master@*.*.*.*:13370/), Path(/user/master)]
akka.actor.ActorNotFound: Actor not found for: ActorSelection[Anchor(akka.tcp://glint-master@*.*.*.*:13370/), Path(/user/master)]
at akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:65)
at akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:63)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
at akka.dispatch.BatchingExecutor$Batch.run(BatchingExecutor.scala:73)
at akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.unbatchedExecute(Future.scala:74)
at akka.dispatch.BatchingExecutor$class.execute(BatchingExecutor.scala:120)
at akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.execute(Future.scala:73)
at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248)
at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:266)
at akka.actor.EmptyLocalActorRef.specialHandle(ActorRef.scala:533)
at akka.actor.DeadLetterActorRef.specialHandle(ActorRef.scala:569)
at akka.actor.DeadLetterActorRef.$bang(ActorRef.scala:559)
at akka.remote.RemoteActorRefProvider$RemoteDeadLetterActorRef.$bang(RemoteActorRefProvider.scala:87)
at akka.remote.EndpointWriter.postStop(Endpoint.scala:557)
at akka.actor.Actor$class.aroundPostStop(Actor.scala:477)
at akka.remote.EndpointActor.aroundPostStop(Endpoint.scala:411)
at akka.actor.dungeon.FaultHandling$class.akka$actor$dungeon$FaultHandling$$finishTerminate(FaultHandling.scala:210)
at akka.actor.dungeon.FaultHandling$class.terminate(FaultHandling.scala:172)
at akka.actor.ActorCell.terminate(ActorCell.scala:369)
at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:462)
at akka.actor.ActorCell.systemInvoke(ActorCell.scala:478)
at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:263)
at akka.dispatch.Mailbox.run(Mailbox.scala:219)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
I have set a fixed IP in the config file, but the actor selection still seems to look on localhost.
The problems for which a parameter server is used typically have a huge, sparse weight vector/matrix. We need a save operation to store the weights to HDFS.
It is not immediately obvious how to use the implicit execution context and timeout together with Spark. In particular, the following piece of code will not run:
import scala.concurrent.ExecutionContext
import scala.concurrent.duration._
import akka.util.Timeout

implicit val ec = ExecutionContext.Implicits.global
implicit val timeout = new Timeout(30 seconds)
rdd.foreach {
  case value => vector.push(Array(0L), Array(value))
}
Spark attempts to serialize both the execution context and the timeout, which causes errors because these objects are never meant to be serialized. Instead, one would have to write something like this:
rdd.foreach {
  case value =>
    implicit val ec = ExecutionContext.Implicits.global
    implicit val timeout = new Timeout(30 seconds)
    vector.push(Array(0L), Array(value))
}
To make this easier, it might be a good idea to either remove the implicits and make them configurable defaults, or to add new methods that don't require the implicits.
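For example, a non-implicit variant could look like this (a hypothetical signature, not Glint's current API):

import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration._
import akka.util.Timeout

trait BigVectorOps {
  // Hypothetical overload: explicit defaults instead of implicit parameters (sketch)
  def push(keys: Array[Long], values: Array[Double],
           ec: ExecutionContext = ExecutionContext.Implicits.global,
           timeout: Timeout = Timeout(30.seconds)): Future[Boolean]
}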
Currently data is sent as an Array of keys/values in a basic Akka message. For best performance it is worthwhile to investigate low-level Akka TCP streams.
To make it easier to use, we should publish the package to a centralized repository such as Sonatype.
We need to add instructions on how to run the system on a cluster.
BigMatrix supports pulling a set of row indices, i.e. matrix.pull(rows: Array[Long]). For push, only matrix.push(rows: Array[Long], cols: Array[Int], values: Array[V]) is supported.
For the common use case of pushing a set of rows, this effectively doubles the set of indices that must be sent (2 index entries per value). Though I'm not sure it makes much difference in performance (it may add a bit to the communication cost of large pushes), for ease of use it would be helpful to be able to push a set of rows in the same manner as pull; a sketch of the idea follows.
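A client-side convenience wrapper illustrates the idea (a sketch; the BigMatrix import path, the implicit parameters, and the Future[Boolean] return type are assumptions based on the docs quoted above):

import scala.concurrent.{ExecutionContext, Future}
import akka.util.Timeout
import glint.models.client.BigMatrix

// Push whole rows through the existing (rows, cols, values) API (sketch)
def pushRows[V](matrix: BigMatrix[V], rows: Array[Long], values: Array[V], numCols: Int)
               (implicit ec: ExecutionContext, timeout: Timeout): Future[Boolean] = {
  val expandedRows = rows.flatMap(r => Array.fill(numCols)(r))    // each row index repeated numCols times
  val expandedCols = rows.flatMap(_ => (0 until numCols).toArray) // 0..numCols-1 for every row
  matrix.push(expandedRows, expandedCols, values)                 // values laid out row-major
}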
In order to facilitate an easier setup for localhost testing it would be nice to spawn a glint subsystem from within an application that uses the framework. E.g.:
val system = Client.localSystem(nrOfParameterServers: Int, [optional configuration])
// do work
system.shutdown()
It should then run a master node and the specified number of parameter server nodes. We could run all of them within the same JVM process, although the overhead might be a bit much.
This will make it easier to run localhost tests without having to start a master and server in separate terminal windows. The current automated tests do something similar already.
Hi @rjagerman, this project looks very interesting and I'd like to explore it a bit more. You mention Spark integration as a goal, has there been work done on that? What about example algorithms using this parameter server?
Currently Akka configuration is supplied as Strings in the code with the appropriate string replacements built in. There is probably a better way to do this.
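One option (a sketch using the Typesafe Config API, on which Akka already depends) would be to compose the configuration programmatically instead of via string replacement:

import com.typesafe.config.{Config, ConfigFactory, ConfigValueFactory}

// Sketch: override only host/port on top of a packaged default configuration
def serverConfig(host: String, port: Int): Config =
  ConfigFactory.load()
    .withValue("akka.remote.netty.tcp.hostname", ConfigValueFactory.fromAnyRef(host))
    .withValue("akka.remote.netty.tcp.port", ConfigValueFactory.fromAnyRef(port))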
Hi, all,
It is so exciting to see this project!
Recently I have been working on huge-dimensional machine learning, and I found that a Parameter Server is indispensable.
From a quick look at parts of the code, I wonder why the APIs matrix.push and vector.push add the given value to the corresponding element on the PS server rather than replacing it.
Generally, users may have several update-like demands, such as replace, accumulate, or other UDF-style functions to refresh the parameter values on the PS server. What should I do to satisfy these demands, or do you have any plans in this direction?
Thanks so much!
Instead of sending a push/pull request every time, it would be nice to have some automatic mechanism that batches multiple requests together before sending them.
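A minimal client-side sketch of the idea (the BigVector import path is an assumption; flushing is size-based here, but could also be time-based):

import scala.collection.mutable.ArrayBuffer
import scala.concurrent.ExecutionContext
import akka.util.Timeout
import glint.models.client.BigVector

// Buffer key/value updates and push them in one request once the buffer is full (sketch)
class PushBuffer(vector: BigVector[Double], flushSize: Int = 10000)
                (implicit ec: ExecutionContext, timeout: Timeout) {
  private val keys = ArrayBuffer.empty[Long]
  private val values = ArrayBuffer.empty[Double]

  def add(key: Long, value: Double): Unit = {
    keys += key
    values += value
    if (keys.size >= flushSize) flush()
  }

  def flush(): Unit = if (keys.nonEmpty) {
    vector.push(keys.toArray, values.toArray) // single batched request; result Future ignored here
    keys.clear()
    values.clear()
  }
}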
Implement basic get and set functionality on the client-side of the parameter server.
This also requires some consideration for integration with Spark. The client interface should be serializable so Spark tasks can use it.