
glint's Introduction

glint's People

Contributors

rjagerman, wuciawe


glint's Issues

BigMatrix push and pull have high network consumption

In a parameter server scenario, network delay is always an important issue. However, each BigMatrix push and pull sends a tuple of size 3 (row indices, column indices, values). The row and column indices could be merged into a single flat index, which would reduce network consumption by about a third. Also, requiring the row index to be a Long is a limitation; a Long is not always needed.
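
For illustration, a minimal sketch of the kind of encoding this suggests (not glint API; it assumes the number of columns of the matrix is known):

// Illustration only: fold a (row, col) pair into one flat index so that push/pull
// only needs a single index array instead of two.
def flatIndex(row: Long, col: Int, numCols: Int): Long = row * numCols + col

// Recover the original coordinates from a flat index.
def rowOf(flat: Long, numCols: Int): Long = flat / numCols
def colOf(flat: Long, numCols: Int): Int  = (flat % numCols).toInt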

Struggling with data transfer / actor disassociated

@rjagerman when doing some localhost testing I've run into timeout issues during push/pull of a vector.

When the vectors are relatively small (like ~100k elements) everything seems to work fine. When I've tried larger feature dimensions (1,355,191 elements for news20.binary libsvm dataset) things start timing out.

At the beginning of an iteration in rdd.foreachPartition I do:

val values = Await.result(vector.pull(keys), 5 seconds)
// compute gradient and update
...
vector.push(keys, update.data)

After turning on more logging via client.akka.loglevel, I see the following:

[WARN] [09/21/2016 10:53:55.915] [glint-client-akka.remote.default-remote-dispatcher-5] [akka.tcp://[email protected]:50080/system/endpointManager/reliableEndpointWriter-akka.tcp%3A%2F%2Fglint-server%40192.168.100.122%3A63032-1] Association with remote system [akka.tcp://[email protected]:63032] has failed, address is now gated for [5000] ms. Reason: [Disassociated] 
[INFO] [09/21/2016 10:53:55.921] [glint-client-akka.actor.default-dispatcher-2] [akka://glint-client/deadLetters] Message [glint.messages.server.request.PullVector] from Actor[akka://glint-client/temp/$f] to Actor[akka://glint-client/deadLetters] was not delivered. [1] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[INFO] [09/21/2016 10:53:55.921] [glint-client-akka.actor.default-dispatcher-2] [akka://glint-client/deadLetters] Message [glint.messages.server.request.PullVector] from Actor[akka://glint-client/temp/$j] to Actor[akka://glint-client/deadLetters] was not delivered. [2] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[INFO] [09/21/2016 10:53:55.921] [glint-client-akka.actor.default-dispatcher-2] [akka://glint-client/system/endpointManager/reliableEndpointWriter-akka.tcp%3A%2F%2Fglint-server%40192.168.100.122%3A63032-1/endpointWriter] Message [akka.remote.EndpointWriter$AckIdleCheckTimer$] from Actor[akka://glint-client/system/endpointManager/reliableEndpointWriter-akka.tcp%3A%2F%2Fglint-server%40192.168.100.122%3A63032-1/endpointWriter#1446622162] to Actor[akka://glint-client/system/endpointManager/reliableEndpointWriter-akka.tcp%3A%2F%2Fglint-server%40192.168.100.122%3A63032-1/endpointWriter#1446622162] was not delivered. [3] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[INFO] [09/21/2016 10:53:55.923] [glint-client-akka.actor.default-dispatcher-3] [akka://glint-client/deadLetters] Message [akka.remote.RemoteWatcher$Heartbeat$] from Actor[akka://glint-client/system/remote-watcher#-1279985067] to Actor[akka://glint-client/deadLetters] was not delivered. [4] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[WARN] [09/21/2016 10:53:55.925] [glint-client-akka.remote.default-remote-dispatcher-13] [akka.tcp://[email protected]:50080/system/endpointManager/reliableEndpointWriter-akka.tcp%3A%2F%2Fglint-server%40192.168.100.122%3A63204-2] Association with remote system [akka.tcp://[email protected]:63204] has failed, address is now gated for [5000] ms. Reason: [Disassociated] 
[INFO] [09/21/2016 10:53:55.926] [glint-client-akka.actor.default-dispatcher-2] [akka://glint-client/deadLetters] Message [akka.remote.RemoteWatcher$Heartbeat$] from Actor[akka://glint-client/system/remote-watcher#-1279985067] to Actor[akka://glint-client/deadLetters] was not delivered. [5] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[INFO] [09/21/2016 10:53:55.927] [glint-client-akka.actor.default-dispatcher-2] [akka://glint-client/deadLetters] Message [glint.messages.server.request.PullVector] from Actor[akka://glint-client/temp/$k] to Actor[akka://glint-client/deadLetters] was not delivered. [6] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[INFO] [09/21/2016 10:53:55.928] [glint-client-akka.actor.default-dispatcher-2] [akka://glint-client/system/endpointManager/reliableEndpointWriter-akka.tcp%3A%2F%2Fglint-server%40192.168.100.122%3A63204-2/endpointWriter] Message [akka.remote.EndpointWriter$BackoffTimer$] from Actor[akka://glint-client/deadLetters] to Actor[akka://glint-client/system/endpointManager/reliableEndpointWriter-akka.tcp%3A%2F%2Fglint-server%40192.168.100.122%3A63204-2/endpointWriter#-1245711256] was not delivered. [7] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[INFO] [09/21/2016 10:53:55.932] [glint-client-akka.actor.default-dispatcher-2] [akka://glint-client/system/endpointManager/reliableEndpointWriter-akka.tcp%3A%2F%2Fglint-server%40192.168.100.122%3A63032-1/endpointWriter] Message [akka.remote.EndpointWriter$BackoffTimer$] from Actor[akka://glint-client/system/endpointManager/reliableEndpointWriter-akka.tcp%3A%2F%2Fglint-server%40192.168.100.122%3A63032-1/endpointWriter#1446622162] to Actor[akka://glint-client/system/endpointManager/reliableEndpointWriter-akka.tcp%3A%2F%2Fglint-server%40192.168.100.122%3A63032-1/endpointWriter#1446622162] was not delivered. [8] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[INFO] [09/21/2016 10:53:56.292] [glint-client-akka.actor.default-dispatcher-3] [akka://glint-client/deadLetters] Message [akka.remote.RemoteWatcher$Heartbeat$] from Actor[akka://glint-client/system/remote-watcher#-1279985067] to Actor[akka://glint-client/deadLetters] was not delivered. [9] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[INFO] [09/21/2016 10:53:56.292] [glint-client-akka.actor.default-dispatcher-3] [akka://glint-client/deadLetters] Message [akka.remote.RemoteWatcher$Heartbeat$] from Actor[akka://glint-client/system/remote-watcher#-1279985067] to Actor[akka://glint-client/deadLetters] was not delivered. [10] dead letters encountered, no more dead letters will be logged. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
16/09/21 10:54:01 ERROR Executor: Exception in task 1.0 in stage 28.0 (TID 59)
java.util.concurrent.TimeoutException: Futures timed out after [5 seconds]

Any idea what the cause is? I would expect the vector array size to be around 10.8 MB. I've set the max frame and send/receive buffer sizes to 32 MB and launched one master and two servers with this config, as well as my client app with the same config (based on the one you provided earlier).
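
For reference, a sketch of the Akka classic remoting settings involved (how glint nests this block inside its own client/server configuration is an assumption here and may differ):

import com.typesafe.config.ConfigFactory

// Sketch only: the frame and buffer settings referred to above.
val remotingTweaks = ConfigFactory.parseString("""
  akka.remote.netty.tcp {
    maximum-frame-size  = 32 MiB
    send-buffer-size    = 32 MiB
    receive-buffer-size = 32 MiB
  }
""")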

Strangely, I'm not seeing exceptions like in #48

Implement consistent key hashing for (key,value) distribution over parameter servers

We wish to distribute (key, value) pairs over parameter servers. This can be done consistently via key hashing. To get started, assume we have n parameter servers labeled 1 ... n. This information can be stored in a context-like object that is distributed to all workers. We can then hash a key and take the result modulo n, which gives us the server storing that key. For a large set of keys and a proper hash function, we get a roughly uniform distribution.
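
A minimal sketch of the idea (illustrative only, not existing glint code):

import scala.util.hashing.MurmurHash3

// Map a key to one of numServers parameter servers by hashing it and taking the
// modulo. Any well-mixed hash works; MurmurHash3 is used purely for illustration.
def serverFor(key: Long, numServers: Int): Int = {
  val h = MurmurHash3.stringHash(key.toString)
  Math.floorMod(h, numServers)  // non-negative index in 0 until numServers
}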

BigMatrix should support push(rows)

BigMatrix supports pulling a set of row indices, i.e. matrix.pull(rows: Array[Long]). For push, only matrix.push(rows: Array[Long], cols: Array[Int], values: Array[V]) is supported.

For the common use case of pulling/pushing a set of rows, this effectively doubles the number of indices that must be sent (two entries per value). I'm not sure it makes much difference for performance (it may add a bit to the communication cost of large pushes), but for ease of use it would be helpful to be able to push a set of rows in the same manner as pull.
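
As an illustration of the current situation, a client-side sketch (a hypothetical helper, not glint API) that expands a dense per-row update into the (rows, cols, values) arrays the current push expects:

// Hypothetical helper: turn one dense update per row into the parallel arrays
// required by the current push signature.
def expandRows(rowIndices: Array[Long], rowValues: Array[Array[Double]])
    : (Array[Long], Array[Int], Array[Double]) = {
  val numCols = rowValues.head.length
  val rows   = rowIndices.flatMap(r => Array.fill(numCols)(r))     // each row index repeated per column
  val cols   = rowIndices.flatMap(_ => (0 until numCols).toArray)  // 0 .. numCols-1 for every row
  val values = rowValues.flatten
  (rows, cols, values)
}

// Usage with a glint BigMatrix in scope:
//   val (r, c, v) = expandRows(rowIndices, updates)
//   matrix.push(r, c, v)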

Client-side get and set functionality

Implement basic get and set functionality on the client-side of the parameter server.

This also requires some consideration for integration with Spark. The client interface should be serializable so Spark tasks can use it.

Documentation: Library API

We need to set up documentation (e.g. on GitHub's free pages) that provides good descriptions of all the methods and classes the library offers.

Actor Not Found

I start a master service and configure the IP and port in a config file. Both the server and the client are then set up by parsing this config file. Although the servers start successfully, the client fails with an ActorNotFound exception:
ActorSelection[Anchor(akka.tcp://glint-master@*.*.*.*:13370/), Path(/user/master)]

why make "push" as an “accumulator” rather than “replacer”?

Hi, all,
It is so exciting to see this project!

Recently I have been working on very high-dimensional machine learning, and I found that a parameter server is indispensable.

From briefly looking through parts of the code, I wonder why the APIs "matrix.push" and "vector.push" add the given value to the corresponding element on the parameter server rather than replacing it?

Generally, users may have several kinds of "update-like" needs, such as replace, accumulate, or other user-defined functions to refresh the parameter values on the server. What should I do to satisfy these needs, or do you have any related plans?

Thanks so much!

A question about glint

Hello Rolf!
I have had a look at your code. What troubles me is how glint interfaces with Spark; I don't even see a single line of code related to Spark.
Best Wishes!
Codelife

Got runtime issues when using spark-shell

I compiled Glint using Scala 2.10.6 and successfully got the jars.

Then I tried to run the example in spark.md; however, I got this error when initializing the client:

@transient val client = Client()

java.lang.NoSuchMethodError: com.typesafe.config.Config.getDuration(Ljava/lang/String;Ljava/util/concurrent/TimeUnit;)J
at glint.Client$.start(Client.scala:294)
at glint.Client$.apply(Client.scala:270)
at glint.Client$.apply(Client.scala:258)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:26)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:31)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:33)
at $iwC$$iwC$$iwC$$iwC$$iwC.(:35)
at $iwC$$iwC$$iwC$$iwC.(:37)
at $iwC$$iwC$$iwC.(:39)
at $iwC$$iwC.(:41)
at $iwC.(:43)
at (:45)
at .(:49)
at .()
at .(:7)
at .()
at $print()
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1045)
at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1326)
at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:821)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:852)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:800)
at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:857)
at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:902)
at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:814)
at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:657)
at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:665)
at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:670)
at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:997)
at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$process(SparkILoop.scala:945)
at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1064)
at org.apache.spark.repl.Main$.main(Main.scala:35)
at org.apache.spark.repl.Main.main(Main.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:730)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

Cannot find master

16/07/18 17:26:19 ERROR yarn.ApplicationMaster: User class threw exception: akka.actor.ActorNotFound: Actor not found for: ActorSelection[Anchor(akka.tcp://[email protected]:13370/), Path(/user/master)]
akka.actor.ActorNotFound: Actor not found for: ActorSelection[Anchor(akka.tcp://[email protected]:13370/), Path(/user/master)]
    at akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:65)
    at akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:63)
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
    at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
    at akka.dispatch.BatchingExecutor$Batch.run(BatchingExecutor.scala:73)
    at akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.unbatchedExecute(Future.scala:74)
    at akka.dispatch.BatchingExecutor$class.execute(BatchingExecutor.scala:120)
    at akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.execute(Future.scala:73)
    at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
    at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248)
    at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:266)
    at akka.actor.EmptyLocalActorRef.specialHandle(ActorRef.scala:533)
    at akka.actor.DeadLetterActorRef.specialHandle(ActorRef.scala:569)
    at akka.actor.DeadLetterActorRef.$bang(ActorRef.scala:559)
    at akka.remote.RemoteActorRefProvider$RemoteDeadLetterActorRef.$bang(RemoteActorRefProvider.scala:87)
    at akka.remote.EndpointWriter.postStop(Endpoint.scala:557)
    at akka.actor.Actor$class.aroundPostStop(Actor.scala:477)
    at akka.remote.EndpointActor.aroundPostStop(Endpoint.scala:411)
    at akka.actor.dungeon.FaultHandling$class.akka$actor$dungeon$FaultHandling$$finishTerminate(FaultHandling.scala:210)
    at akka.actor.dungeon.FaultHandling$class.terminate(FaultHandling.scala:172)
    at akka.actor.ActorCell.terminate(ActorCell.scala:369)
    at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:462)
    at akka.actor.ActorCell.systemInvoke(ActorCell.scala:478)
    at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:263)
    at akka.dispatch.Mailbox.run(Mailbox.scala:219)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

I have set a fixed IP in the config file, but the actor lookup seems to go to localhost.

Spark integration, and examples

Hi @rjagerman, this project looks very interesting and I'd like to explore it a bit more. You mention Spark integration as a goal; has there been any work done on that? What about example algorithms using this parameter server?

Make localhost testing easier

In order to facilitate an easier setup for localhost testing it would be nice to spawn a glint subsystem from within an application that uses the framework. E.g.:

val system = Client.localSystem( nrOfParameterServers: Int,  [optional configuration] )
// do work
system.shutdown()

It should then run a master node and the specified number of parameter server nodes. We could run all of them within the same JVM process, although the overhead might be a bit much.

This will make it easier to run localhost tests without having to start a master and server in separate terminal windows. The current automated tests do something similar already.

Random init

Would there be interest in some random initialisation mechanism for vectors / matrices? Often it is ok to init with 0s (the implicit case with new Glint data structures). But sometimes you want to have random normal or random uniform init.

It's possible to do this yourself via a foreachPartition and a push, but it is a bit of a hack really. It seems much simpler and cleaner to provide an interface for this, like client.random.vector.randn[Double](dim, mean = 0, std = 1.0).
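
For context, the current workaround looks roughly like this sketch (it assumes a vector created via client.vector[Double](dim) and the implicit execution context and timeout used in other snippets in these issues):

import scala.concurrent.{Await, ExecutionContext}
import scala.concurrent.duration._
import scala.util.Random
import akka.util.Timeout

implicit val ec: ExecutionContext = ExecutionContext.Implicits.global
implicit val timeout: Timeout = Timeout(30 seconds)

val dim  = 1000000
val keys = Array.tabulate(dim)(i => i.toLong)        // every index of the vector
val init = Array.fill(dim)(Random.nextGaussian())    // random normal initial values

// vector is assumed to be an existing glint BigVector[Double]
Await.result(vector.push(keys, init), 30 seconds)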

What do you think?

Akka Actor Error when initializing Glint Client

I am trying this implementation of Glint with Apache Flink. I am rather new to Akka and cannot figure out the following error message:

java.lang.NoSuchMethodError: akka.pattern.AskableActorRef$.$qmark$default$3$extension(Lakka/actor/ActorRef;Ljava/lang/Object;)Lakka/actor/ActorRef;
    at glint.Client.<init>(Client.scala:40)
    at glint.Client$$anonfun$start$1.apply(Client.scala:302)
    at glint.Client$$anonfun$start$1.apply(Client.scala:300)
    at scala.concurrent.Future$$anonfun$flatMap$1.apply(Future.scala:253)
    at scala.concurrent.Future$$anonfun$flatMap$1.apply(Future.scala:251)
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
    at scala.concurrent.impl.ExecutionContextImpl$AdaptedForkJoinTask.exec(ExecutionContextImpl.scala:121)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

This message appears as soon as I try to create a client (i.e. val gc = Client()).
Is it possible that this happens because Apache Flink uses a different version of Akka than Glint, and Glint somehow ends up using the Akka version that Apache Flink has imported? If so, how can I ensure that the right version of Akka is used for Glint and Apache Flink respectively?

Thanks

Cannot create Glint Client in Apache Spark

I am following the instructions on the Glint Spark Integration page. I have a Glint "master" and a "server" running on localhost. I am trying to create a default Glint client in Spark shell and am getting the following error:
java.lang.NoSuchMethodError: scala.collection.immutable.HashSet$.empty()Lscala/collection/immutable/HashSet;
    at akka.actor.ActorCell$.<init>(ActorCell.scala:336)
    at akka.actor.ActorCell$.<clinit>(ActorCell.scala)
    at akka.actor.RootActorPath.$div(ActorPath.scala:185)
    at akka.actor.LocalActorRefProvider.<init>(ActorRefProvider.scala:465)
    at akka.remote.RemoteActorRefProvider.<init>(RemoteActorRefProvider.scala:124)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
    at akka.actor.ReflectiveDynamicAccess$$anonfun$createInstanceFor$2.apply(DynamicAccess.scala:78)
    at scala.util.Try$.apply(Try.scala:192)
    at akka.actor.ReflectiveDynamicAccess.createInstanceFor(DynamicAccess.scala:73)
    at akka.actor.ReflectiveDynamicAccess$$anonfun$createInstanceFor$3.apply(DynamicAccess.scala:84)
    at akka.actor.ReflectiveDynamicAccess$$anonfun$createInstanceFor$3.apply(DynamicAccess.scala:84)
    at scala.util.Success.flatMap(Try.scala:231)
    at akka.actor.ReflectiveDynamicAccess.createInstanceFor(DynamicAccess.scala:84)
    at akka.actor.ActorSystemImpl.liftedTree1$1(ActorSystem.scala:585)
    at akka.actor.ActorSystemImpl.<init>(ActorSystem.scala:578)
    at akka.actor.ActorSystem$.apply(ActorSystem.scala:142)
    at akka.actor.ActorSystem$.apply(ActorSystem.scala:119)
    at glint.Client$.start(Client.scala:289)
    at glint.Client$.apply(Client.scala:270)
    at glint.Client$.apply(Client.scala:258)
    ... 48 elided
In a plain sbt console, however, I am able to create a Glint client.

PullFailedException on a large dataset

glint.exceptions.PullFailedException: Failed 10 out of 10 attempts to push data
    at glint.models.client.async.PullFSM.glint$models$client$async$PullFSM$$execute(PullFSM.scala:67)
    at glint.models.client.async.PullFSM$$anonfun$request$1.applyOrElse(PullFSM.scala:79)
    at glint.models.client.async.PullFSM$$anonfun$request$1.applyOrElse(PullFSM.scala:76)
    at scala.concurrent.Future$$anonfun$onFailure$1.apply(Future.scala:136)
    at scala.concurrent.Future$$anonfun$onFailure$1.apply(Future.scala:134)
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
    at scala.concurrent.impl.ExecutionContextImpl$$anon$3.exec(ExecutionContextImpl.scala:107)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Glint works well on my toy dataset, but fails with this exception on a big dataset.

Unify parameters into single configuration

Currently the code has several hard-coded parameters and timeouts that work well but should still be configurable. The main objective is to unify all parameters into a single default configuration.

Some examples:

Better integration of akka configuration

Currently Akka configuration is supplied as Strings in the code with the appropriate string replacements built in. There is probably a better way to do this.

Easier serialization for Spark

It is not immediately obvious how to use the implicit execution context and timeout together with Spark. In particular, the following piece of code will not run:

implicit val ec = ExecutionContext.Implicits.global
implicit val timeout = new Timeout(30 seconds)
rdd.foreach {
    case value => vector.push(Array(0L), Array(value))
}

Spark attempts to serialize both the execution context and the timeout, which causes errors because these objects are never meant to be serialized. Instead, one would have to write something like this:

rdd.foreach {
    case value =>
      implicit val ec = ExecutionContext.Implicits.global
      implicit val timeout = new Timeout(30 seconds)
      vector.push(Array(0L), Array(value))
}

To make this easier, it might be a good idea to either remove the implicits and make these configurable defaults or to add new methods that don't have the implicits as a requirement.
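
One possible shape for the second option, sketched as a hypothetical trait (not the current glint interface):

import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration._
import akka.util.Timeout

// Hypothetical sketch: keep the existing implicit-based method and add a variant
// with built-in defaults that is safe to call inside Spark closures without
// capturing anything non-serializable.
trait PushableVector[V] {
  def push(keys: Array[Long], values: Array[V])
          (implicit ec: ExecutionContext, timeout: Timeout): Future[Boolean]

  def pushWithDefaults(keys: Array[Long], values: Array[V]): Future[Boolean] =
    push(keys, values)(ExecutionContext.Implicits.global, Timeout(30 seconds))
}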

Add options for different types of aggregation

By default the parameter server aggregates new values by addition. This is ideal for some algorithms, but not for others. I've made some changes to make aggregation more flexible in commit 5485c6c. Right now it supports aggregation through:

  • Addition (adds the existing value and pushed value together, this is still the default option)
  • Maximum (takes the largest of the existing value and pushed value)
  • Minimum (takes the smallest of the existing value and pushed value)
  • Replacement (always replaces existing value with pushed value)
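
As a rough illustration, these modes can be thought of as binary functions over the existing and pushed value (the actual API introduced in 5485c6c may look different):

// Illustration only: each mode combines the value already stored on the server
// with the value being pushed.
sealed trait Aggregation { def apply(existing: Double, pushed: Double): Double }
case object Add     extends Aggregation { def apply(e: Double, p: Double): Double = e + p }          // default
case object Maximum extends Aggregation { def apply(e: Double, p: Double): Double = math.max(e, p) }
case object Minimum extends Aggregation { def apply(e: Double, p: Double): Double = math.min(e, p) }
case object Replace extends Aggregation { def apply(e: Double, p: Double): Double = p }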

Automatic performance benchmarks

We should separate out the performance benchmark tests and run them automatically on CI. Additionally, more performance benchmark tests should be written to properly test the performance of all the different components. This should make it easy to see when future code changes impact performance in a significant way.

cluster conf example

Hey Rolf!

I am struggling to set up glint on a standalone Spark cluster. I am following your tutorial with one master and one server (and I use the standard conf from your repo).

I can manipulate the BigVector fine on the master node, but my futures time out if I try a .push() within an RDD foreach statement.

So the client seems to be running fine, but it cannot be accessed from the worker nodes.

thanks
christian

Start ParameterServer threads on workers

The parameter server will need to be started on certain machines in a cluster. The first step is to get an Akka listener working on machines and provide some form of stand-alone and easy deployment.

Can glint support BigInt type?

I have been playing around with glint for a few weeks and am excited about its potential. However, the counts I want to eventually push to the parameter server are larger than the max value of type Long. I see there isn't any support for BigInt. I'm wondering if there is any reason for this?

Not able to pull matrix slice with rows != cols

It seems one can only pull (and push) pieces of a Matrix that have the same number of rows & columns:

e.g. this works:

val matrix = client.matrix[Double](10, 10)
val result = matrix.pull(Array(0L, 1L), Array(0, 1))

But this doesn't:

val matrix = client.matrix[Double](10, 10)
val result = matrix.pull(Array(0L, 1L), Array(0))

with

Exception in thread "main" java.lang.ArrayIndexOutOfBoundsException: 1
    at scala.collection.mutable.WrappedArray$ofInt.apply$mcII$sp(WrappedArray.scala:155)
    at scala.collection.mutable.WrappedArray$ofInt.apply(WrappedArray.scala:155)
    at scala.collection.mutable.WrappedArray$ofInt.apply(WrappedArray.scala:152)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.Iterator$class.foreach(Iterator.scala:893)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.AbstractTraversable.map(Traversable.scala:104)
    at glint.models.client.async.AsyncBigMatrix$$anonfun$6.apply(AsyncBigMatrix.scala:101)
    at glint.models.client.async.AsyncBigMatrix$$anonfun$6.apply(AsyncBigMatrix.scala:99)
    at glint.models.client.async.AsyncBigMatrix$$anonfun$glint$models$client$async$AsyncBigMatrix$$mapPartitions$2.apply(AsyncBigMatrix.scala:169)
    at glint.models.client.async.AsyncBigMatrix$$anonfun$glint$models$client$async$AsyncBigMatrix$$mapPartitions$2.apply(AsyncBigMatrix.scala:169)

Is this intended to be supported? I was actually trying to create a one-column matrix to use in place of a vector, so that I could try to use GranularMatrix to see if I could scale the feature space. So I need to do matrix.push(keys, Array(0)), for example (i.e. only the first, and only, column of the matrix, but a slice of rows).

How to try it out in a cluster env like YARN

Hi Rolf,
I saw your comment in ticket SPARK-6932. I'm excited and very interested in this project. I seriously believe a parameter server is an essential piece for further scaling ML algorithms into super high dimensional spaces. Do you have any examples of how to try this project out on YARN?

Many Thanks!

Yitong

Rework of Glint internals

I've been unexpectedly busy these last summer months, so I unfortunately did not find much time to work on Glint. There has been some interesting discussion from users in issues #54, #49 and #28, thank you for your feedback! Starting from November I can free up time in my regular schedule to dedicate to the project.

To address some of the issues that other contributors and users have raised, I plan to make some pretty big changes to Glint's internal workings. I'm opening it up for discussion here, so feel free to speak your mind. These are just some of the ideas I have, and I'd love to hear your feedback, concerns and comments on them.

1) Integration with Spark
I have been working on a very simple proof of concept to simplify integration with Spark. In this concept you can include Glint as a library in your application by adding a line to your build.sbt file and then call glint.run(sparkcontext) somewhere in your code to start the entire Glint system (master and parameter servers) on your existing Spark cluster. This has several big advantages:

  1. It makes Glint much easier to use. You don't have to run it as separate java processes.
  2. It enables us to very easily ship user-defined functions to the parameter server because the compiled jar gets distributed by Spark for us. This enables custom processing at the parameter servers to simplify things like L1 regularization.

The major drawback is that we would no longer be able to run Glint as a stand-alone service. This change will make Glint dependent on Spark to ship functions across machines. As a result we can not run it outside of Spark. Nevertheless, I feel the benefits definitely outweigh the disadvantages.
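
For concreteness, usage in this concept might look roughly like the following sketch (glint.run is the proof of concept mentioned above, not a released API, and its exact signature is an assumption):

import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("glint-on-spark"))

// Start the Glint master and parameter servers on the existing Spark cluster.
glint.run(sc)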

2) Setting up benchmarks on simple machine-learning tasks
To measure the impact of future changes, we should have several standard machine-learning tasks on various data sizes (Linear SVM, Logistic Regression, etc.). Perhaps even very simple tasks such as an all-reduce. It is important to establish a baseline here, so that progress can be accurately measured.

3) Breeze dense & sparse vectors/matrices instead of Arrays
Currently all model data is stored as dense Arrays to prevent garbage collection overhead from becoming extreme. However, sparse models have significant advantages in terms of scalability. I propose a rework of the internals of the PartialMatrix and PartialVector actors to utilize breeze's dense and sparse classes under the hood. We can return breeze structures as the result of pull requests; they should be easier to use than Arrays for machine learning applications. To manage possible garbage collection problems, we could eventually look into things like scala-offheap.
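
For reference, the breeze structures in question (illustration only, not glint code):

import breeze.linalg.{DenseVector, SparseVector}

// Dense storage keeps a full Array[Double]; sparse storage keeps only the
// non-zero (index, value) pairs, which is what makes very large, mostly-empty
// models cheaper to hold and to ship over the network.
val dense  = DenseVector.zeros[Double](1000000)
val sparse = SparseVector.zeros[Double](1000000)
sparse(42) = 3.14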

4) Fault tolerance
If a parameter server goes down, you lose that part of the model. Depending on your algorithm this may range from "we can recover" to "very very bad news". Other implementations of parameter servers boast a chord-like DHT structure that replicates the model across multiple machines to mitigate system failures (see figure 8 of Li et al (2014)). However, this is likely to give us significant performance degradation, because push requests need to go through all replicated models on other machines. We can of course vary the replication rate to balance fault-tolerance against performance. This will probably take the longest as it is quite complicated to implement. Nevertheless, it seems like an important thing to do to bring Glint to a larger audience.

5) Documentation
The documentation is in a bit of an awkward place right now. Several parts are missing, and other parts need to be clarified or rewritten. With the earlier proposed changes the documentation could change quite significantly, so I don't want to waste a lot of time writing documentation that will be obsolete soon. If anyone has a good suggestion for a documentation framework from which we can easily publish to gh-pages, I'd love to hear it. I'm currently toying around with Sphinx and it works decently well; however, suggestions are welcome.
