haskell-distributed / distributed-process
Cloud Haskell core libraries
Home Page: http://haskell-distributed.github.io
At the moment it is not possible to wait for a message from the main channel or from a typed channel. It might be useful to add a function
expectChan :: Serializable a => Process (ReceivePort a)
which creates a ReceivePort for messages of a specific type sent to the main channel. This ReceivePort can then be merged with other ReceivePorts as usual.
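For illustration, the merging behaviour the proposal relies on can be sketched with plain STM; `Port`, `fromQueue`, and `mergePorts` below are illustrative stand-ins, not part of the distributed-process API:

```haskell
import Control.Concurrent.STM

-- A toy stand-in for ReceivePort: something we can only read from.
newtype Port a = Port (STM a)

fromQueue :: TQueue a -> Port a
fromQueue q = Port (readTQueue q)

-- Merge two ports of the same element type: a read succeeds on
-- whichever port has a message available.
mergePorts :: Port a -> Port a -> Port a
mergePorts (Port a) (Port b) = Port (a `orElse` b)

receivePort :: Port a -> IO a
receivePort (Port stm) = atomically stm
```

A hypothetical expectChan would then simply expose the main mailbox as one more port to be merged in this way.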
Write a script-driven Network.Transport implementation with which we can test for specific network failures, and then use QuickCheck to generate scripts (similar to how ByteString is tested).
With the standard Cloud Haskell primitives it is impossible to write processes such as a proxy; for instance, we cannot write something like
proxy :: Process ()
proxy = forever $ do
  msg <- expect
  send someOtherProcess msg
which forwards messages of any type. The most recent version of Cloud Haskell supports matchAny, with which the above process can be written as
proxy :: Process ()
proxy = forever $ do
  msg <- receiveWait [ matchAny return ]
  forward someOtherProcess msg
but we still cannot write something like
proxy :: Process ()
proxy = forever $ do
  (destination, msg) <- expect
  send destination msg
For this we would need an alternative Binary encoding (maybe even something like protocol buffers?) which would allow us to decode a message into a pair of messages without knowing the types of the pair components, i.e., something like
decodePair :: ByteString -> (ByteString, ByteString)
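For instance, a length-prefixed encoding makes the pair structure recoverable without knowing the component types. A sketch using strict ByteStrings and a fixed-width decimal length prefix (an arbitrary choice for illustration; a real implementation would likely use a binary length field):

```haskell
import qualified Data.ByteString.Char8 as BS
import Data.ByteString (ByteString)

-- Hypothetical pair encoding: an 8-digit decimal length prefix for
-- the first component, then both payloads back to back.
encodePair :: ByteString -> ByteString -> ByteString
encodePair a b = BS.pack (pad (show (BS.length a))) <> a <> b
  where pad s = replicate (8 - length s) '0' ++ s

-- Decode without knowing anything about the component types.
decodePair :: ByteString -> (ByteString, ByteString)
decodePair bs =
  let (lenStr, rest) = BS.splitAt 8 bs
      n = read (BS.unpack lenStr)
  in BS.splitAt n rest
```

decodePair (encodePair a b) recovers (a, b) for arbitrary payloads, which is exactly the property the proxy needs.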
bind: failed (Cannot assign requested address (WSAEADDRNOTAVAIL))
I am not able to install distributed-process alongside other packages (e.g., snap).
It seems this is due to version conflicts with random and time.
cabal install distributed-process
Resolving dependencies...
In order, the following would be installed:
random-1.0.1.1 (reinstall) changes: time-1.4 -> 1.2.0.5
distributed-process-0.2.1 (reinstall) changes: random-1.0.1.1 added
when I pass --force-reinstalls, other packages appear broken (red in ghc-pkg list);
when I force-reinstall those, then distributed-process is broken.
It seems that the current implementation doesn't like IPv4 when binding the server and doesn't like IPv6 when connecting from a client.
Using distributed-process v0.3.1 installed with cabal install.
Workstation :: ~DEV » runhaskell Server.hs 127.0.0.1 12493
Bind to 127.0.0.1:12493
Server.hs: bind: unsupported operation (Can't assign requested address)
Workstation :: ~DEV » runhaskell Server.hs ::ffff:127.0.0.1 12493
Bind to ::ffff:127.0.0.1:12493
Echo server started at ::ffff:127.0.0.1:12493:0
Workstation :: ~DEV » runhaskell Client.hs ::ffff:127.0.0.1 11111 127.0.0.1:12493:0
ConnectionOpened 1024 ReliableOrdered 127.0.0.1:12493:0
Received 1024 ["Hello world"]
ConnectionClosed 1024
Workstation :: ~DEV » runhaskell Client.hs ::ffff:127.0.0.1 11111 ::ffff:127.0.0.1:12493:0
Client.hs: Error connecting: TransportError ConnectFailed "user error (Could not parse)"
so that startSlave and startMaster are truly optional.
It'd be great to have a go at implementing OTP style behaviours (a la gen_server, supervisor, etc) now that distributed-process is looking stable. I'm quite happy to contribute, but there are no contribution guidelines, so I'm not sure whether I should fork and add a distributed-process-framework sub package to the repository, do this as a separate project, or what. Any pointers would be much appreciated.
There should be something like receiveTimeout/expectTimeout for channels.
receiveChanTimeout :: Serializable a => Int -> ReceivePort a -> Process (Maybe a)
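The proposed semantics can be imitated with System.Timeout wrapped around any blocking read; a base-only sketch against a plain Chan instead of a real ReceivePort:

```haskell
import Control.Concurrent.Chan
import System.Timeout (timeout)

-- Timeout in microseconds, matching the convention of expectTimeout.
-- Returns Nothing if no message arrives within the given time.
readChanTimeout :: Int -> Chan a -> IO (Maybe a)
readChanTimeout usecs chan = timeout usecs (readChan chan)
```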
I understand that with every library, you can only do so much
with the API doc - at some point users just have to read
the underlying paper to get the fundamental abstractions right, and there's no shortcut.
But - here for me the problem is not so much the abstraction,
but rather the technicalities of the work-arounds for not having "static".
With technicalities, examples are quite helpful.
You have them distributed over the API docs, that's fine,
but a minimal complete example would be a welcome addition.
I tried for the better part of an hour until arriving at the following. Is it idiomatic?
{-# LANGUAGE TemplateHaskell #-}
import System.Environment (getArgs)
import Control.Distributed.Process
import Control.Distributed.Process.Closure
import Control.Distributed.Process.Node (initRemoteTable)
import Control.Distributed.Process.Backend.SimpleLocalnet

compute :: Integer -> Process Integer
compute n = do
  liftIO $ putStrLn "the slave does a computation"
  return $ n + 1

$(remotable ['compute])

master :: Backend -> [NodeId] -> Process ()
master backend slaves = do
  liftIO . putStrLn $ "Slaves: " ++ show slaves
  case slaves of
    [] -> liftIO $ putStrLn "no slaves"
    s : _ -> do
      out <- call $(functionSDict 'compute) s $ $(mkClosure 'compute) (10 :: Integer)
      liftIO $ putStrLn $ show out
  terminateAllSlaves backend

main :: IO ()
main = do
  args <- getArgs
  let rtable :: RemoteTable
      rtable = Main.__remoteTable initRemoteTable
  case args of
    ["master", host, port] -> do
      backend <- initializeBackend host port rtable
      startMaster backend (master backend)
    ["slave", host, port] -> do
      backend <- initializeBackend host port rtable
      startSlave backend
    _ -> error "usage: (master|slave) HOST PORT"
Suppose A and B are connected, but the connection breaks. When A realizes this immediately and sends a new (heavyweight) connection request to B, then it /might/ happen that B has not yet realized that the current connection has broken and will therefore reject the incoming request from A as invalid.
This is low priority because
In particular, various closeXYZ operations are not implemented.
The underlying motivation is pub-sub and Erlang process groups seems like a good starting point. Is there support for process groups in distributed-process or are there plans to support it in the future?
When one CH process A monitors another B, it expects to be notified if the connection between them breaks, even when A never sends anything to B (but only receives messages from B). This means that it is not enough to rely on send to detect network problems. This can be solved at the CH level or at the NT level.
The static label used for mkStatic is ModuleName.function_name, but the ModuleName is not used for the source and target dictionaries.
The following code in Network.Transport.Util is unsafe with regard to asynchronous exceptions. If an asynchronous exception is thrown to the forked thread before the endpoint address is put into the addr MVar, then the final takeMVar will dead-lock:
spawn :: Transport -> (EndPoint -> IO ()) -> IO EndPointAddress
spawn transport proc = do
  addr <- newEmptyMVar
  forkIO $ do
    Right endpoint <- newEndPoint transport
    putMVar addr (address endpoint)
    proc endpoint
  takeMVar addr
One way to solve this is using a combination of mask and try. However, a much simpler implementation is:
spawn :: Transport -> (EndPoint -> IO ()) -> IO EndPointAddress
spawn transport proc = do
  Right endpoint <- newEndPoint transport
  forkIO $ proc endpoint
  return $ address endpoint
Since the original code has to wait for the completion of newEndPoint anyway, we could just as well execute it in the main thread and only execute proc endpoint in a separate thread. No need for an MVar, so no possibility of dead-lock.
However since this code is so simple I wonder if there's actually a need for this combinator. Is it used anywhere? If not, I propose to remove it.
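The hazard itself is easy to reproduce with plain concurrency primitives: if the forked thread dies before its putMVar, the parent's takeMVar blocks forever. A base-only sketch with the same shape as the original spawn (the timeout is there only so the demonstration terminates; unsafeHandshake is a made-up name):

```haskell
import Control.Concurrent
import System.Timeout (timeout)

-- Same shape as the unsafe spawn: the parent blocks until the child
-- has performed its putMVar.
unsafeHandshake :: IO () -> IO (Maybe ())
unsafeHandshake body = do
  addr <- newEmptyMVar
  tid <- forkIO $ do
    body                           -- if the thread dies during body...
    putMVar addr ()                -- ...this never runs
  killThread tid                   -- simulate the asynchronous exception
  timeout 100000 (takeMVar addr)   -- Nothing if the child died before putMVar
```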
Perhaps this is too much to hope for, but it would be ideal for packages such as meta-par, HDpH and distributed-process to be able to share basic RPC functionality (Closure/Static).
With meta-par we copied and hacked a version of the original Cloud Haskell ("Remote") Closure. The ugly bit was that it had hard-coded recognition of the IO and ProcessM monads, and we had to tweak that to include monad-par's "Par" monad.
I don't understand the current Closure implementation, but it looks like it may be the case that CP.hs (the part that deals with Process values) is pretty well isolated from the rest. Does that mean that everything but CP.hs could become its own package?
It looks like TH.hs also deals with Process presently. TH.hs would either need to be replicated in all consumers of the hypothetical factored library, or it would need to become more extensible. When we were using the Closure mechanism in monad-par/meta-par this was the sticking point -- we didn't see a way to do it without adding extra arguments to the compile-time TH functions (e.g. remotable), which would be very ugly.
Any good ideas here?
Consider a process such as
pingServer :: Process ()
pingServer = forever $ do
  client <- expect
  send client ()
This server "leaks" connections. Since CH guarantees ordering, we cannot simply close connections after a timeout, for instance. The programmer can fix this manually:
pingServer :: Process ()
pingServer = forever $ do
  client <- expect
  send client ()
  reconnect client
(perhaps the name reconnect is confusing?), but ideally we would take care of this at the CH level instead.
so that we have multiple independent Cloud Haskell applications running on the same network.
Hello,
I'm primarily interested in using Cloud Haskell for running computations on clusters that use job schedulers like Platform LSF and Oracle Grid Engine. The typical pattern is that I submit an array of N jobs to the scheduler, and the scheduler decides which machines to run them on, and at what time. As a user, some key features of this setup are:
Different processes in the job array are started at different times. This is typically because there are other users on the cluster, and the scheduler uses priorities and queues to determine what should be run when.
The number of processes running at any given time is almost always less than N. The simplest example of this is if I schedule 1000 jobs on a cluster with 100 machines, then obviously some of the jobs must be run in sequence. A more common example for me is that the cluster is busy, and my jobs get interleaved with jobs from other users, reducing the effective number of available machines.
I have no control over which machines processes are started on. There's also no way to know which machine a process will be started on before the process actually starts.
Individual processes may be killed or suspended at any time. This is most common when they happen to be running on a machine for which another user has higher priority (enough to kick me off).
I'm wondering what would be involved in writing a Cloud Haskell backend for this type of environment. I have written some ad-hoc programs to deal with this sort of thing before. An example situation is that I have a function f which is very expensive to compute, and I would like to farm different calls to this function out to different machines. The model I've used is:
This is all to cope with the (somewhat frustrating) fact that the "cloud" is dynamic, and many of its properties are only known at runtime. The number of available slaves can grow or shrink during the course of the computation. From what I've read, it looks like Cloud Haskell prefers to assume that the size and topology of the cloud is static. Is this necessary? Any recommendations on writing a backend for the environment above?
Erlang uses cookies to authenticate nodes; we could do something similar or something much more sophisticated. This should be done in individual backends, as security might vary widely from one setup to another. Ideally, security is handled entirely within the backend (within the network-transport layer?) so that the core Cloud Haskell library is unaffected.
functionDict should be functionSDict?
This avoids the need for Control.Distributed.Process.Node in applications that use it.
I had some issues where cabal had installed more than one version of these packages and apps were compiled using a mix of versions. I wonder whether having a meta package (no code) that includes the other ones would help manage this.
I had to do something like this to get out of the cabal mess:
ghc-pkg list | grep -e distributed -e network-transport | xargs -t -I pkg ghc-pkg unregister pkg --force
And then reinstall the packages.
Maybe the following is already a good way?
cabal install distributed-process-simplelocalnet
On line 451 of TestTransport.hs :
https://github.com/haskell-distributed/distributed-process/blob/edsko/network-transport-2/tests/TestTransport.hs#L451
If I add another line beneath this, so that two connections are made between an endpoint and its own address, then the 'testConnectToSelf' fails. e.g.
testConnectToSelf :: Transport -> Int -> IO ()
testConnectToSelf transport numPings = do
  done <- newEmptyMVar
  Right endpoint <- newEndPoint transport
  tlog "Creating self-connection"
  Right conn <- connect endpoint (address endpoint) ReliableOrdered
  Right dummyConn1 <- connect endpoint (address endpoint) ReliableOrdered
It throws an error at line 473:
https://github.com/haskell-distributed/distributed-process/blob/edsko/network-transport-2/tests/TestTransport.hs#L473
where (Received cid' msg) fails to pattern-match:
Running "ConnectToSelf": failed (exception: TestTransport.hs:473:46-99: Non-exhaustive patterns in lambda
So - having multiple connections from an endpoint to its address appears to be causing problems.
if I change the cabal constraints to
Build-Depends: base < 4.7 && > 4.4,
               ghc-prim >= 0.2 && < 0.4,
               binary >= 0.5 && < 0.6
it seems to build fine, and then I can cabal install distributed-process without any compile time problems
(i'll start playing around and testing if it works as desired on ghc 7.6 shortly :) )
I don't know the technical implications of this, but I think functions like genericGet and genericPut would make things jolly convenient for tutorial writers.
In the course of adding a new named pipes transport, I noticed the following behavior of the TCP transport on demo0.
Ten messages are sent, one concatenated message is received, resulting in the output below.
Note: I'm testing this in rev 1611171 (not yet merged) but it should apply to rev 8c57693 as well.
MVAR Transport:
logServer rcvd: ["hello 1"]
logServer rcvd: ["hello 2"]
logServer rcvd: ["hello 3"]
logServer rcvd: ["hello 4"]
logServer rcvd: ["hello 5"]
logServer rcvd: ["hello 6"]
logServer rcvd: ["hello 7"]
logServer rcvd: ["hello 8"]
logServer rcvd: ["hello 9"]
logServer rcvd: ["hello 10"]
TCP Transport:
logServer rcvd: ["hello 1hello 2hello 3hello 4hello 5hello 6hello 7hello 8hello 9hello 10"]
Pipes Transport:
logServer rcvd: ["hello 1"]
logServer rcvd: ["hello 2"]
logServer rcvd: ["hello 3"]
logServer rcvd: ["hello 4"]
logServer rcvd: ["hello 5"]
logServer rcvd: ["hello 6"]
logServer rcvd: ["hello 7"]
logServer rcvd: ["hello 8"]
logServer rcvd: ["hello 9"]
logServer rcvd: ["hello 10"]
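The concatenation suggests the TCP transport is delivering raw socket reads without message framing. Boundaries can be preserved over a byte stream by length-prefixing each message; a sketch using strict ByteStrings and a fixed-width decimal prefix (illustrative only, not the actual network-transport-tcp wire format):

```haskell
import qualified Data.ByteString.Char8 as BS
import Data.ByteString (ByteString)

-- Frame each message with an 8-digit decimal length prefix.
frame :: ByteString -> ByteString
frame msg = BS.pack (pad (show (BS.length msg))) <> msg
  where pad s = replicate (8 - length s) '0' ++ s

-- Split a concatenated stream back into the original messages.
unframe :: ByteString -> [ByteString]
unframe bs
  | BS.null bs = []
  | otherwise =
      let (lenStr, rest) = BS.splitAt 8 bs
          n = read (BS.unpack lenStr)
          (msg, rest') = BS.splitAt n rest
      in msg : unframe rest'
```

With framing in place, the receiver recovers ten separate "hello" messages from the single concatenated read.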
I'm having a crack at a zeromq transport and I'm having some trouble building the examples. DemoTransport.hs refers to Network.Transport.MVar, which doesn't seem to exist. When I comment DemoTransport out of the makefile (and remove the ".exe" suffixes) I get:
ghc -package-conf ../network-transport-zmq/cabal-dev/packages-7.4.2.conf -O2 -rtsopts -threaded --make DemoProcess.hs -o DemoProcess
[1 of 1] Compiling Main ( DemoProcess.hs, DemoProcess.o )
DemoProcess.hs:7:31:
Module `Network.Transport.TCP' does not export `mkTransport'
DemoProcess.hs:7:44:
Module `Network.Transport.TCP' does not export `TCPConfig(..)'
make: *** [DemoProcess] Error 1
Are the examples still the preferred method to play around with the library or should I be looking at other things now, like benchmarks and tests?
Cheers,
Ben
(sorry Edsko, here's another beginner's request)
From reading the API docs I guess that Control.Distributed.Process.call is one basic method to "run a process remotely and wait for it to reply".
(I guess "to reply" means "execute 'return x' in the Process monad"?)
The type of spawn contains both Static and Closure, and when I track their API docs, I just see data declarations with hidden constructors. What I'm missing is a note (right there) about how such objects should be created - or how the construction can be avoided.
Since ByteStrings can point into (larger) bytestrings, it is easy to create "bytestring memory leaks". We have already fixed some of these, but there may be others.
Is it possible to make distributed-process depend on binary-0.6 rather than 0.5?
I wanted to use Cloud Haskell with this new database library (which requires binary-0.6)
http://hackage.haskell.org/package/CurryDB
This worked in the 'remote' Cloud Haskell package (and works in Erlang):
https://github.com/dysinger/distributed-process-broken/blob/master/Main.hs
Is there a chance we see a tutorial on task/promise layer as implemented in the prototype?
Or is it job of another package?
I would like to implement simple tasks on a master connected to many worker nodes, like:
do promises <- mapM startComputation inputList
   results <- mapM redeemPromise promises
And I have no idea where to start...
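As a starting point before reaching for the real task layer, the promise pattern in the snippet can be sketched with plain MVars (startComputation and redeemPromise here are local stand-ins, not the prototype's API):

```haskell
import Control.Concurrent

type Promise a = MVar a

-- Fork the computation and hand back a promise for its result.
startComputation :: (a -> b) -> a -> IO (Promise b)
startComputation f x = do
  p <- newEmptyMVar
  _ <- forkIO $ putMVar p $! f x  -- force the result in the worker thread
  return p

-- Block until the forked computation has delivered its result.
redeemPromise :: Promise a -> IO a
redeemPromise = takeMVar
```

With these, the two mapM lines above start all computations concurrently and then collect the results in order.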
I just want to play around with distributed-process
(and later I want my students to do this).
So I find it very useful to have a working example in the docs.
http://hackage.haskell.org/packages/archive/distributed-process-simplelocalnet/0.2.0.1/doc/html/Control-Distributed-Process-Backend-SimpleLocalnet.html
I can compile this (I put -threaded -rtsopts just out of habit, but is it necessary? If so, the doc should say.)
but for running,
I am missing a sentence that says how this program should be used.
I guess, start slave(s) and master - but on the same machine?
I do not understand what the second cmd line argument ("host") is used for.
When I start slave and master on one machine,
and use "localhost" for host, it does not seem to work, but it does with "127.0.0.1"
(so perhaps this is an issue with my resolver).
I do not understand whether this example is supposed to work with slaves/master on different machines. If so, then again I do not see what "host" arguments to use.
The API doc of "findPeers" does not help:
"findPeers t sends out a who's there? request,..."
since it does not say to whom this message is sent.
Sure, these are very basic questions, but if the package is
going to be used widely,
then I guess they will come up over and over again.
If you think the answer would be too long to put into the API doc,
then put it elsewhere, and refer to it.
Thanks, Johannes.
Small CH messages have a large overhead. For instance, consider sending (Nothing :: Maybe ProcessId). The binary encoding of Nothing is one byte, but the total message sent looks like
<channel ID> 4 bytes
<message length> 4 bytes
<message type fingerprint> 16 bytes
<payload> 1 byte
For an overhead of 2400% :) Obvious ways to reduce the overhead are
Messages sent by Cloud Haskell processes are always sent whole, independent of their size. This means that when a process sends a very large message, it can hog the network connection (moreover, Network.Socket.ByteString can't handle large messages).
See comment at haskell/network#58 (comment) -- number of chunks passed to sendMany should be short (<= 16).
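A starting point would be to split large payloads into bounded chunks before handing them to the socket layer; the chunking itself is straightforward (the chunk size is an arbitrary illustration):

```haskell
import qualified Data.ByteString as BS
import Data.ByteString (ByteString)

-- Split a payload into chunks of at most n bytes, so that a large
-- message does not monopolize the connection and the list handed to
-- sendMany can be kept short by sending a few chunks at a time.
chunksOf :: Int -> ByteString -> [ByteString]
chunksOf n bs
  | n <= 0     = error "chunksOf: chunk size must be positive"
  | BS.null bs = []
  | otherwise  = let (c, rest) = BS.splitAt n bs
                 in c : chunksOf n rest
```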
The bug is found in DemoTransport.hs, which will hang when trying to killThread (Pipes backend only, demo3 and demo4). Yet this hang happens ONLY under GHC 7.0.4 and 7.2.1. Under GHC 6.12.3 and GHC 7.4.1 it works fine!
At first I thought this may be an issue with non-allocating threads not
being preempted by the runtime system (and therefore not servicing the
ThreadKilled asynchronous exception). But it's hard to explain that
pattern of outcomes on different GHC versions.
See commit 48e257a, which closes the entire "bundle" of (outgoing and incoming) connections to another endpoint. Basically, "disconnect completely from this other endpoint" (a "heavyweight disconnect").
Once this is implemented we can resolve a TODO in Control.Distributed.Process.Node.