
manifold's People

Contributors

aengelberg, alexander-yakushev, andreasthoelke, arnaudgeiser, biiwide, bts, cosineblast, dm3, kachayev, kingmob, led, ljie-pi, lvh, malcolmsparks, matthiaslange, maxcountryman, meggermo, olegthecat, pawelstroinski, phillipvh, pyr, rborer, robertluo, slipset, sonntag, stig, tanzoniteblack, tyano, valerauko, ztellman


manifold's Issues

Question: How to extract exception error from deferred?

Using (http/websocket-connection req) I might get an error (in the form of an exception). How do I get it out so I can, say, print out what it was?

(let [stream @(http/websocket-connection req)]
  (if (d/deferred? stream)
    (println (.getMessage (d/error-value stream (Exception. "Unknown."))))
    (do-something-with stream)))

That seems to work, but feels like I'm missing the point.

If I call (.getMessage @stream), something locks up. Somehow a manifold.deferred.ErrorDeferred doesn't seem to behave like an ordinary deferred; I don't know why.

Also error-value is not in the documentation (which is marked as 0.1.1-SNAPSHOT).

I should probably be using d/loop and d/recur and d/catch and d/chain for processing messages, right?
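For reference, the usual pattern is to attach the error handler to the deferred rather than inspecting it after deref. A minimal sketch, assuming Aleph's http/websocket-connection and treating do-something-with as the hypothetical handler from the question:

```clojure
(require '[aleph.http :as http]
         '[manifold.deferred :as d])

(defn handler [req]
  (-> (http/websocket-connection req)
      ;; runs only on success, with the realized stream
      (d/chain (fn [stream] (do-something-with stream)))
      ;; runs only on error, with the underlying exception
      (d/catch Exception
          (fn [e] (println "websocket handshake failed:" (.getMessage e))))))
```

This avoids deref'ing the deferred before knowing whether it holds a value or an error.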

Fake clocks for deterministic testing

We have some code that wants to do something periodically. The original version used core.async + timeout, which, while a lot of code, allowed re-use of some code I wrote in icecap that gives me a fake clock, a la twisted.internet.task.Clock. I'll take it as given that having a deterministic clock like this is desirable; I'd be happy to elaborate if that's a point of contention 😄

That piece of core.async code predictably became a lot smaller when using manifold instead, but of course manifold happily ignored my pleas to rebind timeout. I'm trying to figure out the best way to do this in Manifold; I think the answer is to implement a fake scheduler. https://github.com/ztellman/manifold/blob/master/src/manifold/time.clj#L175 constrains this to be a ScheduledThreadPoolExecutor, which is fine. Alas, the let binding means I don't know how to override that. I'm also not entirely sure why it doesn't use (manifold.executor/executor); I think that's a thread local rather than a dynamic variable for performance reasons, but I'm not sure.

Thanks in advance!

cc @derwolfe

Unexpected 'map' behavior

When I try to run the trivial code

(require '[manifold.stream :as s])

(def a (s/stream))
(def b (s/stream))

(s/map (fn [x y] [x y]) a b)

I receive the following error:

IllegalArgumentException cannot convert manifold.stream$map$fn__4180 to source  manifold.stream/->source (stream.clj:202) 

Either I have a fundamental misunderstanding of the function, or this result is a bug :)
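A sketch of a workaround, assuming s/map transforms a single source and that s/zip is available to pair up values from the two streams (zip's output rate follows the slower of the two):

```clojure
(require '[manifold.stream :as s])

(def a (s/stream))
(def b (s/stream))

;; zip pairs up values from a and b into [x y] vectors,
;; which a single-source s/map can then transform
(def ab (s/map (fn [[x y]] [x y]) (s/zip a b)))
```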

should manifold.time/every and manifold.stream/periodically keep a process running?

I am relatively new to using manifold, but am accustomed to writing Twisted code. My expectation of every and periodically is that, once called, either could keep a process running indefinitely.

Unless I am misunderstanding how every and periodically function, it appears that they only keep running if another function is keeping the process alive. For example, my expectation of the code in jedesmal is that "I work" would be printed every second until the application is terminated. In reality, it prints once, then the process exits.

In short - my expectations have not matched what the library does :-). Does this seem like a valuable feature to you? Or is there another way that you believe this should be dealt with outside of manifold?

Thank you!
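For what it's worth, one way to keep the JVM alive while a scheduled task runs is to block the main thread explicitly. A sketch, assuming manifold.time/every and that the scheduler's own threads don't prevent JVM exit:

```clojure
(require '[manifold.time :as t])

(defn -main [& _]
  (t/every 1000 #(println "I work"))
  ;; block the main thread indefinitely; deref'ing a promise
  ;; that is never delivered is one common idiom
  @(promise))
```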

AssertionError not caught by manifold.deferred/catch

(require '[manifold.deferred :as d])

(let [dd (d/deferred)]
  (-> dd
      (d/chain #(do (assert (string? %) "invalid") %) prn)
      (d/catch #(str "something unexpected:" (.getMessage %))))
  (d/success! dd nil))

Executing the above, I expect one of two outcomes:

  1. prn is called with the string value put onto the deferred
  2. the catch handler prints some exception

What happens instead is that true is returned, prn is not called, and no exception is printed.

I tried looking into the catch/catch' code but nothing stood out, and AssertionError extends Throwable, so I'm not sure what's going on there.
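A workaround sketch until the underlying issue is resolved, assuming the problem is that non-Exception Throwables escape the chain's error path: rethrow the AssertionError as an Exception inside the step, which d/catch handles as expected.

```clojure
(require '[manifold.deferred :as d])

(let [dd (d/deferred)]
  (-> dd
      (d/chain (fn [v]
                 (try
                   (assert (string? v) "invalid")
                   v
                   ;; rethrow the Error as an Exception
                   (catch AssertionError e
                     (throw (ex-info "invalid input" {:value v} e)))))
               prn)
      (d/catch #(println "something unexpected:" (.getMessage %))))
  (d/success! dd nil))
```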

:scheme type inconsistency?

When making requests, the request map requires the scheme to be a str; otherwise https://github.com/ztellman/aleph/blob/0.4.0/src/aleph/http/client.clj#L65 fails. When receiving a request, the request map passed to the handler has :scheme as a keyword:

2015-Feb-20 15:29:13 -0800 zygalski.local DEBUG [shimmer.core] - incoming-request {:remote-addr 0:0:0:0:0:0:0:1, :headers {host localhost:7467, user-agent curl/7.37.1, accept */*}, :server-port 7467, :keep-alive? true, :uri /, :server-name localhost, :query-string nil, :body nil, :scheme :http, :request-method :get}

I'm not sure if this is intentional, or an unintentional inconsistency.

map is causing the stream to hang

For some reason this hangs (note the use of s/map):

> (->> [1 1 1 2 2 3] 
       s/->source
       (s/partition-by identity)
       (s/map s/stream->seq)
       (s/stream->seq)
       (println))

I would expect it to print:

((1 1 1) (2 2) (3))

But using s/mapcat seems to be ok:

> (->> [1 1 1 2 2 3] 
       s/->source
       (s/partition-by identity)
       (s/mapcat s/stream->seq)
       (s/stream->seq)
       (println))

(1 1 1 2 2 3)
nil

d/future versus clojure.core/future

If we create a d/chain containing a single d/future, the semantics of d/chain mean that a deref on the chain yields the deref of its 'links'. So for a single deferred link, a deref on the chain causes a deref on the link.

(deref (d/chain (d/future (Thread/sleep 1) :ok))) 
=> :ok

But if I use a clojure.core future instead, I get a pending future back when I deref the chain.

(deref (d/chain (future (Thread/sleep 1) :ok))) 
=> #<core$future_call$reify__6666@6c002f51: :pending>

I didn't expect this result, but perhaps there's a good reason for it

(I'm on beta5)
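A sketch of a workaround, relying on the documented claim that Clojure futures can be coerced into deferreds: coerce the clojure.core future explicitly, so chain resolves it like any other deferred.

```clojure
(require '[manifold.deferred :as d])

;; explicit coercion makes the clojure.core future behave
;; like a manifold deferred within the chain
@(d/chain (d/->deferred (future (Thread/sleep 1) :ok)))
;; => :ok
```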

error values sometimes unexpectedly becoming success values

It seems manifold.deferred/catch can transform error values into success values (rather than returning the value of the error handler) in some circumstances, perhaps when the erroring deferred is already realized by the time it is given to the catch (though I haven't taken any steps to validate this hypothesis).

;;  [manifold "0.1.1-alpha4"]
(require '[manifold.deferred :as d])

(->
 (d/chain (d/success-deferred :boo)
          (fn [v] (throw (ex-info "boo" {})))
          (fn [v] (prn "hi")))
 (d/catch (fn [e]
            (prn "error")
            :caught))
 deref)

;; as expected - prints "error", returns :caught

(->
 (d/chain (d/error-deferred :boo)
          (fn [v] (prn "hi")))
 deref)

;; as expected - explodes

(->
 (d/chain (d/error-deferred :boo)
          (fn [v] (prn "hi")))
 (d/catch (fn [e]
            (prn "error")
            :caught))
 deref)

;; whoa - doesn't print error, returns :boo

(->
 (d/chain (d/error-deferred :boo)
          (fn [v] (prn "hi")))
 (d/catch (fn [e]
            (prn "error")
            :caught))
 (d/chain (fn [v]
            v))
 deref)

;; returns :boo, so the error value has unexpectedly become the success value

Should core.async channels be deferrable?

Just wondering why manifold.deferred contains this logic:

(utils/when-core-async
  (extend-protocol Deferrable

    clojure.core.async.impl.channels.ManyToManyChannel
    (to-deferred [ch]
      (let [d (deferred)]
        (a/take! ch
          (fn [msg]
            (if (instance? Throwable msg)
              (error! d msg)
              (success! d msg))))
        d))))

I find I'm having to wrap a core.async channel in a record in order to return one from a d/chain step. That's OK, but I just wondered what the rationale is for making a core.async channel satisfy Deferrable. Is there some interaction between d/chain and core.async channels here?

NPE in graph when websocket stream disconnects

Using Aleph 0.4.1-alpha1 and Manifold 0.1.1-alpha2.

Here is the code that connects a socket stream to a bus subscription stream:

  (s/connect
    (s/filter
      (partial
        room-accept-event?
        idempotent-filter)
      (bus/subscribe
        chatrooms room))
    socket)

The filter fn is:

(defn- room-accept-event?
  [idempotent-filter event]
  (or
    (nil? event)
    (not (.add idempotent-filter event :ignored))))

idempotent-filter is a non-nil-safe bounded FIFO map; we accept nil values because we think nil is the proper indicator of a drained source and want to pass it on to the sink.

When a browser window gets brutally closed, the following NPE shows up in the log:

[manifold-scheduler-pool-1] 2015-07-22 10:39:54,626 ERROR [manifold.deferred] error in deferred handler
java.lang.NullPointerException
    at manifold.stream.graph$async_connect$this__6562.invoke(graph.clj:183)
    at manifold.stream.graph$async_connect$this__6562$fn__6563$fn__6564.invoke(graph.clj:162)
    at clojure.core$trampoline.invoke(core.clj:6037)
    at manifold.stream.graph$async_connect$this__6562$fn__6563.invoke(graph.clj:162)
    at manifold.deferred.Listener.onSuccess(deferred.clj:219)
    at manifold.deferred.Deferred$fn__6019.invoke(deferred.clj:398)
    at manifold.deferred.Deferred.success(deferred.clj:398)
    at manifold.deferred$success_BANG_.invoke(deferred.clj:245)
    at manifold.stream.default.Stream.put(default.clj:119)
    at manifold.stream.default.Stream.put(default.clj:141)
    at manifold.stream$filter$fn__7154.invoke(stream.clj:628)
    at manifold.stream.Callback.put(stream.clj:452)
    at manifold.stream.graph$async_send.invoke(graph.clj:50)
    at manifold.stream.graph$async_connect$this__6562.invoke(graph.clj:181)
    at manifold.stream.graph$async_connect$this__6562$fn__6563$fn__6564.invoke(graph.clj:162)
    at clojure.core$trampoline.invoke(core.clj:6037)
    at manifold.stream.graph$async_connect$this__6562$fn__6563.invoke(graph.clj:162)
    at manifold.deferred.Listener.onSuccess(deferred.clj:219)
    at manifold.deferred.Deferred$fn__6019.invoke(deferred.clj:398)
    at manifold.deferred.Deferred.success(deferred.clj:398)
    at manifold.deferred$success_BANG_.invoke(deferred.clj:245)
    at manifold.stream.default.Stream.put(default.clj:119)
    at manifold.stream.default.Stream.put(default.clj:141)
    at manifold.bus$event_bus$reify__9469$fn__9472.invoke(bus.clj:110)
    at clojure.core$map$fn__4553.invoke(core.clj:2624)
    at clojure.lang.LazySeq.sval(LazySeq.java:40)
    at clojure.lang.LazySeq.seq(LazySeq.java:49)
    at clojure.lang.RT.seq(RT.java:507)
    at clojure.core$seq__4128.invoke(core.clj:137)
    at clojure.core$apply.invoke(core.clj:630)
    at manifold.bus$event_bus$reify__9469.publish(bus.clj:110)
    at com.unbounce.events_gateway.server_push$broadcast_heartbeat.invoke(server_push.clj:325)
    at com.unbounce.events_gateway.server_push$make_heartbeater$_heartbeater__11697.invoke(server_push.clj:339)
    at manifold.time$fn__5771$every__5780$f__5781.invoke(time.clj:169)
    at clojure.lang.AFn.run(AFn.java:22)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at manifold.executor$thread_factory$reify__5598$f__5599.invoke(executor.clj:36)
    at clojure.lang.AFn.run(AFn.java:22)
    at java.lang.Thread.run(Thread.java:745)

It happens a few times per closed socket, when puts are attempted. Then things clear up internally and the NPE doesn't happen any more.

Deferred timeout! might not be working

I was running through the deferred documentation using the 0.1.0-beta8 version of manifold. I ran the following code:
@(d/timeout! (future (Thread/sleep 2000) :foo) 100)
and there is no TimeoutException; it runs for 2 seconds and returns :foo. When I followed the example and passed :bar as the timeout-value, it still returned :foo. Could there be an issue with the manifold.time/in function?
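A sketch of a cross-check, assuming the coercion of clojure.core futures is the variable here: with a manifold deferred (d/future) in place of the clojure.core future, timeout! should behave as documented.

```clojure
(require '[manifold.deferred :as d])

;; with a manifold deferred instead of a clojure.core future
@(d/timeout! (d/future (Thread/sleep 2000) :foo) 100)
;; expected: java.util.concurrent.TimeoutException

@(d/timeout! (d/future (Thread/sleep 2000) :foo) 100 :bar)
;; expected: :bar
```

If these behave correctly, the issue is in how the bare clojure.core future is (not) coerced, rather than in timeout! itself.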

manifold.stream creates dependency cycles

I'm not sure if this is technically a bug or not, but I thought it warranted discussion. Cursive includes functionality to check for dependency cycles in namespaces, and produces an error when the user tries to load a namespace involved in a cycle, the assumption being that they just created it inadvertently. For a while, the entire transitive closure of dependencies of the file being loaded was calculated and cycles anywhere in the graph were warned about. I've since fixed this, but while that was the case several users reported problems with manifold.stream: it is used from manifold.stream.core, .queue, .seq, .deferred, and .async, but also requires them itself later in the file. This will cause problems for people wanting to work on the manifold source with Cursive, and seems weird; can it be fixed?

take on a drained deferred source doesn't return nil

Let me dissect that a bit.

I created a stream from a manifold.deferred.

When I call take on the stream initially, it returns the value of the deferred (which is expected) and the stream is drained (also expected).

But then, when I call take again, it should return nil which it doesn't. Here's a sample repl session:

=> (def z (d/deferred))
#'z
=> (def a (s/->source z))
#'a
=> (d/success! z 42)
true
=> (s/drained? a)
false
=> @(s/take! a)
42
=> (s/drained? a)
true
=> @(s/take! a)

ClassCastException java.lang.Boolean cannot be cast to java.util.concurrent.Future  clojure.core/deref-future (core.clj:2180)

The reason I even came across this is that I was hoping that the deferred which is returned from manifold.stream/reduce could be used in a pipeline transparently. This is a trivial example of what I was wishing to do:

(->> (range 10)
     s/->source
     (s/reduce +)
     s/->source)

Question: Possibility to back-propagate errors

I am using the manifold.bus to communicate between components in my service.

The flow is usually like this:

event on api component ----->  (1) store event in database
                         |---> (2) some coercions ---> send event to an external service
                         |---> (3) some other invocation when intercepting this message

The problem is that, in the api component, I don't know whether any of these consumers throws an exception. For the first two listeners it is crucial that the user knows if anything went wrong with their initial event. The easiest approach would be to make the API handler aware of the components handling (1) and (2) and process them sequentially. But I do like the approach of a central "nerve system" which totally decouples components and would let me add new listeners without changing the dependency graph.

Could you give me a hint how this issue could be tackled the right "async/manifold" way? Thanks!

Question: How do I send multiple sources to a single sink?

I have two source streams and a single sink which writes to a file.

When stream A closes, it closes the sink and stream B doesn't finish writing.

I see I can provide {:downstream? false} on A, but doesn't that introduce a race condition? What if B happens to finish first?
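A sketch of one way around the race, assuming s/connect's {:downstream? false} option and s/on-drained: neither connection closes the sink on its own, and a shared counter closes it only after both sources have drained.

```clojure
(require '[manifold.stream :as s])

(defn connect-both [a b sink]
  ;; neither connection closes the sink when its source closes
  (s/connect a sink {:downstream? false})
  (s/connect b sink {:downstream? false})
  ;; close the sink only once both sources have drained,
  ;; regardless of which finishes first
  (let [remaining (atom 2)]
    (doseq [src [a b]]
      (s/on-drained src
                    #(when (zero? (swap! remaining dec))
                       (s/close! sink))))))
```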

Trouble with lazy sequences

Some of my deferreds yield a LazySeq. This seems to break manifold. Is there something I've misunderstood here? I can get around this by delazifying the result.

Here's an example of what I'm talking about:

@(d/chain [1 2 3] (fn [x] (map #(* 2 %) x)))
ClassCastException clojure.lang.LazySeq cannot be cast to java.util.concurrent.Future  clojure.core/deref-future (core.clj:2180)
@(d/chain [1 2 3] (fn [x] (vec (map #(* 2 %) x))))
=> [2 4 6]

core.async chans can't be coerced to deferreds

Example:

> (md/->deferred (clojure.core.async/chan))
IllegalArgumentException cannot convert clojure.core.async.impl.channels.ManyToManyChannel to deferred.  manifold.deferred/->deferred (deferred.clj:129)
> (md/->deferred (clojure.core.async/promise-chan))
IllegalArgumentException cannot convert clojure.core.async.impl.channels.ManyToManyChannel to deferred.  manifold.deferred/->deferred (deferred.clj:129)

md/chain appears to treat the chan as just another value:

> (md/chain (clojure.core.async/promise-chan) inc)
#<ErrorDeferred@7e9ead5e: ClassCastException clojure.core.async.impl.channels.ManyToManyChannel cannot be cast to java.lang.Number  clojure.lang.Numbers.inc (Numbers.java:112)
icecap.handlers.core> (md/chain (clojure.core.async/chan) inc)
#<ErrorDeferred@1625c143: ClassCastException clojure.core.async.impl.channels.ManyToManyChannel cannot be cast to java.lang.Number  clojure.lang.Numbers.inc (Numbers.java:112)

In all cases, I was expecting a deferred that would never fire (since the channel would never fire/close).

The documentation claims otherwise:

Values that can be coerced into a deferred include Clojure futures, Clojure promises, and core.async channels:

I don't know if the documentation is wrong or this is a bug. I could see how there's an impedance mismatch where you have to make sure the channel is closed before you can make the deferred fire?

[Question] How to properly release resource on closed/drained?

I have the following connection between an Aleph websocket sink and a bus source:

  (s/connect
    (s/filter
      (partial
        room-accept-event?
        idempotent-filter)
      (bus/subscribe
        chatrooms room))
    socket)

I wanted to check if the FIFOCache instance that's wrapped in idempotent-filter was collected after the socket disconnects.

Before that, I've added on-drained and on-closed listeners on the above to track streams events. Things look great: I see the socket close event immediately followed by the drained event from s/filter, followed by the drained event from bus/subscribe.

But when I inspect a heap dump (after GC) I see that there are still references maintained on the object I'm concerned about:

[screenshot from 2015-07-22 17:31:08: heap dump showing retained references to the object]

I'm concerned this could be a memory leak. This could also be a complete PEBKAC, whether I misinterpreted the heap dump or I forgot to perform a necessary clean-up on the streams to release related resources (I've tried {:upstream? true} but to no avail). I've also tried removing the filter and in that case, the FIFOCache instances are GCed away as expected.

Thanks for any light you can shed on this, even if it's for exposing my n00bness 😅

batch fails to exert back-pressure when downstream streams close

In the following case one would expect blocking put!s to stream a to wait forever once the batch size has been exceeded:

(ns test.batch
  (:require [manifold.stream :as s]))

(def a (s/stream* {:permanent? true}))
(def b (s/batch 10 a))
(def c (s/stream))

(s/connect b c)
(s/close! c)

(time (dotimes [_ 10000] @(s/put! a :test)))
;; "Elapsed time: 45.356857 msecs"

If c is not closed, put!s to stream a will eventually stop completing, as expected.

core.async channels are not deferrable?

In aleph there's an example for http that looks like:

(defn delayed-hello-world-handler
  "Alternately, we can use a [core.async](https://github.com/clojure/core.async) goroutine to
   create our response, and convert the channel it returns using
   `manifold.deferred/->deferred`. This is entirely equivalent to the previous implementation."
  [req]
  (d/->deferred
    (a/go
      (let [_ (a/<! (a/timeout 1000))]
        (hello-world-handler req)))))

When I try this, I get:

IllegalArgumentException cannot convert clojure.core.async.impl.channels.ManyToManyChannel to deferred.  manifold.deferred/->deferred (deferred.clj:129)

I can't get it to work using the REPL either:

$ lein repl

nREPL server started on port 59006 on host 127.0.0.1 - nrepl://127.0.0.1:59006
REPL-y 0.3.7, nREPL 0.2.10
Clojure 1.7.0
Java HotSpot(TM) 64-Bit Server VM 1.8.0_45-b14
    Docs: (doc function-name-here)
          (find-doc "part-of-name-here")
  Source: (source function-name-here)
 Javadoc: (javadoc java-object-or-class-here)
    Exit: Control+D or (exit) or (quit)
 Results: Stored in vars *1, *2, *3, an exception in *e

webapp.main=> (require '[clojure.core.async :as a])
nil
webapp.main=> (require '[manifold.deferred :as d])
nil
webapp.main=> (d/->deferred (a/chan))

IllegalArgumentException cannot convert clojure.core.async.impl.channels.ManyToManyChannel to deferred.  manifold.deferred/->deferred (deferred.clj:129)
webapp.main=> (d/->deferred (a/go))
IllegalArgumentException cannot convert clojure.core.async.impl.channels.ManyToManyChannel to deferred.  manifold.deferred/->deferred (deferred.clj:129)

webapp.main=> (d/deferrable? (a/go))
false
webapp.main=> 

Am I missing something? Is this a bug, or is the example out of date?

I discovered this when I (whimsically) attempted to implement each middleware with go blocks. (Presumably I need a chain of deferreds but I never got far enough to work through the logic.)


My deps:

 [aleph "0.4.1-alpha2" :exclusions [[clj-tuple]]]
   [byte-streams "0.2.0"]
     [primitive-math "0.1.4"]
   [io.netty/netty-all "4.1.0.Beta5"]
   [manifold "0.1.1-alpha3"]
     [io.aleph/dirigiste "0.1.1"]
     [riddley "0.1.10"]
   [potemkin "0.4.1"]
 [ch.qos.logback/logback-classic "1.1.3"]
   [ch.qos.logback/logback-core "1.1.3"]
   [org.slf4j/slf4j-api "1.7.7"]
 [clj-tuple "0.2.2"]
 [com.stuartsierra/component "0.2.3"]
   [com.stuartsierra/dependency "0.1.1"]
 [org.clojure/clojure "1.7.0"]
 [org.clojure/core.async "0.1.346.0-17112a-alpha"]
   [org.clojure/tools.analyzer.jvm "0.1.0-beta12"]
     [org.clojure/core.memoize "0.5.6"]
       [org.clojure/core.cache "0.6.3"]
         [org.clojure/data.priority-map "0.0.2"]
     [org.clojure/tools.analyzer "0.1.0-beta12"]
     [org.ow2.asm/asm-all "4.1"]
 [org.clojure/java.jdbc "0.4.1"]
 [org.clojure/tools.logging "0.3.1"]
 [org.postgresql/postgresql "9.4-1201-jdbc41"]
 [pandect "0.5.3"]
   [org.bouncycastle/bcprov-jdk15on "1.52" :exclusions [[org.clojure/clojure]]]

Construction of chain with clojure.core/apply causes hang in some circumstances

Probably related to #48

Here are some examples. The second one hangs on the deref.

Possibly a bug?

(require '[manifold.deferred :as d])

;; an example with applying d/chain to a vector of functions.

;; ok
(deref (d/chain
        nil
        identity
        (fn [_]
          (d/chain 100
                   (fn [v]
                     (d/future (str v)))))
        identity))


;; hangs!
(deref (apply d/chain
              nil
              [identity
               (fn [_]
                 (d/chain 100
                          (fn [v]
                            (d/future (str v)))))
               identity]))

;; is ok if one or other of the identity functions is omitted
(deref (apply d/chain
              nil
              [#_identity
               (fn [_]
                 (d/chain 100
                          (fn [v]
                            (d/future (str v)))))
               identity]))


;; but reduce is ok
(deref (reduce (fn [c n] (d/chain c n)) {}
               [identity
                (fn [_]
                  (d/chain 100
                           (fn [v]
                             (d/future (str v)))))
                identity]))

Splicing a channel into another channel, one result at a time

I have a piece of software that I'm porting from core.async to manifold. It has a simple interpreter that interprets "plans", which consist of:

  • unordered sets of other plans, which can be executed in any order, even concurrently
  • ordered vecs of other plans, which must be executed one at a time, in order
  • steps, which are individual instructions, like "make an HTTP request"

This is implemented with a simple dispatch multimethod:

(defmulti execute
  (fn [x] (cond (set? x) ::unordered-plans
                (vector? x) ::ordered-plans
                :else (:type x))))

Executing a step returns a chan (soon: stream) of results. Usually there's only 1 result on that stream, but e.g. an HTTP request for a large file might send progress reports every 10%.

The first implementation in core.async was:

(defmethod execute ::unordered-plans
  [plans]
  (async/merge (map execute plans)))

which was easy to port to:

(defmethod execute ::unordered-plans
  [plans]
  (ms/concat (ms/map execute plans)))

The second part was trickier. It was originally spelled:

(defmethod execute ::ordered-plans
  [plans]
  (let [out (chan)]
    (go (loop [plans plans]
          (let [results (<! (async/into [] (execute (first plans))))]
            (<! (async/onto-chan out results false)))
          (if (seq (rest plans))
            (recur (rest plans))
            (async/close! out))))
    out))

My first thought was to write:

(defmethod execute ::ordered-plans
  [plans]
  (let [out (ms/stream)]
    (ms/connect-via (ms/->source plans)
                    (fn [plan]
                      (ms/connect (execute plan) out))
                    out)
    out))

... but connect doesn't tell me when (execute plan) is done, so that doesn't work. Instead:

(defn impl-b
  [plans]
  (let [out (ms/stream)]
    (ms/connect-via (ms/->source plans)
                    (fn [plan]
                      ;; stream->seq's seq will block :(
                      (ms/put-all! out (ms/stream->seq (ms/->source (execute plan)))))
                    out)
    out))

... which works except for the seemingly unnecessary seq with blocking backpressure mechanics. I think I can just write it with deferred tricks:

(defn impl-c
  [plans]
  (let [out (ms/stream)]
    (ms/connect-via (ms/->source plans)
                    (fn [plan]
                      (let [s (execute plan)]
                        (md/loop []
                          (md/chain (ms/take! s)
                            (fn [v]
                              (if (some? v)
                                ;; wait for the put to complete before recurring
                                (md/chain (ms/put! out v)
                                          (fn [_] (md/recur)))
                                (ms/close! s)))))))
                    out)
    out))

... but that seems more involved than it needs to be, since it's spelling most of put-all!. Am I missing something, or is this how you write it? I guess what I want is put-all! for streams instead of seqs?

Is this a pattern you've noticed before? On a related note, (let [...] (connect(-via) in f out) out) seems to be a recurring pattern; do you have any opinions on the return values of connect(-via)?
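Pulling the loop above into the "put-all! for streams" helper the question asks for might look like the following sketch (put-all-from-stream! is a hypothetical name, not part of manifold):

```clojure
(require '[manifold.deferred :as md]
         '[manifold.stream :as ms])

(defn put-all-from-stream!
  "Puts every message from src onto dst, one at a time, respecting
  backpressure on each put. Returns a deferred that completes when
  src is drained."
  [dst src]
  (md/loop []
    (md/chain (ms/take! src ::drained)
      (fn [v]
        (if (identical? ::drained v)
          true
          (md/chain (ms/put! dst v)
                    (fn [_] (md/recur))))))))
```

The connect-via callback for ordered plans then reduces to (put-all-from-stream! out (execute plan)).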

let-flow chokes when the body closes over a deferred.

These two should be identical:

(ns let-flows-not
  (:require [manifold.deferred :as d]
            [manifold.stream :as s]))


(let [k (d/deferred)
      q (s/stream)
      x (d/chain' (s/try-put! q :x 10)
                  (fn [_] 
                    k
                    {:result :chain}))]
  (Thread/sleep 20)
  (prn :debug :x (d/realized? x))
  (when (d/realized? x)
    (prn :debug :x @x)))


(let [k (d/deferred)
      q (s/stream)
      x (d/let-flow' [_ (s/try-put! q :y 10)]
                     k
                     {:result :let-flow})]
  (Thread/sleep 20)
  (prn :debug :x (d/realized? x))
  (when (d/realized? x)
    (prn :debug :x @x)))

But using [manifold "0.1.1-alpha3"] the output is:

:debug :x true
:debug :x {:result :chain}
:debug :x false

It seems the second example blocks because k is not realized.

Inconsistent behaviour?

I'm having trouble detecting when to close the output stream in the example below; I don't know if it is OK to close the file in the on-drained callback.
I noticed different behaviour when using a stream versus a source, as described in the code below.

What's the recommended pattern for writing a stream down to a file, as attempted here?

(require '[byte-streams :as bs])
(require '[clojure.java.io :as io])
(require '[manifold.deferred :as d])
(require '[manifold.stream :as s])

(defn bug? [s]
  (let [out (io/output-stream "c:/tmp/f3.txt")]
    (->>
     s
     (s/map bs/to-byte-array)
     ((fn [s] (s/on-drained s 
                            (fn [] 
                              (println "2") (flush)
                              (.flush out) (.close out)
                              ))
        s))
     ((fn [source]
        (s/consume (fn [v] 
                     (.write out v)
                     (when (s/drained? source)
                       (println "1") (flush)
                       (.flush out) (.close out)
                       )) source))))))

;; case 1 prints: "2"
(let [s (s/stream)]
  (s/put! s "test")
  (bug? s)
  (s/close! s))

;; case 2 prints: "2" and "1", then throws an exception caused by writing to the closed file
(let [s  (-> (byte-array [1 2 3])
             (bs/to-byte-buffers)
             (s/->source))]
  (bug? s)
  (s/close! s))

Possible race condition when using future-with and dirigiste executor?

After tearing my hair out for a bit tracking this down, I think I've narrowed it to the util/future-with macro when using an Executor instance returned by dirigiste. A basic sometimes-repro case goes like this:

(dotimes [n1 100]
  (let [e (exec/fixed-executor 5)]
    (dotimes [n2 5]
      (md/future-with e (println (str "inside worker " n1 ":" n2))))
    (.shutdown e)))

More often than not, this will throw a java.util.concurrent.RejectedExecutionException at Executor.java:299. I installed a custom build of dirigiste with some additional logging output which traces the values of numWorkers and numThreads/maxThreadCount in Executors.java, and in both the fixedController and utilizationController implementations of shouldIncrement I was seeing an additional erroneous call with numWorkers failing the less-than inequality check.

If I had to guess, I'd say something funky is happening with the thread-binding management within the future-with macro. Maybe there's some non-threadsafe mutable state lurking in the Executor implementation logic?

Let me know if there's anything I can do to further help debug this. If it turns out the bug is a dirigiste issue rather than manifold, feel free to close this and I'll re-open the issue over on that project.

Allowing return of deferred values from catch?

Hey Zach,

What are your thoughts on allowing the return of deferred values from d/catch? Currently deferreds from d/catch aren't "flattened" as they are with d/chain and friends. e.g.:

user=> (d/catch (d/error-deferred (Exception.)) (fn [e] (d/success-deferred :delayed-recovery)))
<< << :delayed-recovery >> >>

I find myself wanting to do this so that I can catch errors while in a d/loop, and d/recur once I've recovered (asynchronously) from the error (e.g., performing a "cooldown" sleep before recurring.)

Thanks,
Brian
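In the meantime, one way to get the flattened behavior is to route the result of d/catch back through d/chain, which does resolve deferred return values. A sketch (catch-flat is my own name, not part of manifold):

```clojure
(require '[manifold.deferred :as d])

;; Sketch: d/chain resolves deferred values produced along the way,
;; so passing the catch result through a chain flattens the nesting.
(defn catch-flat
  [d' error-handler]
  (d/chain (d/catch d' error-handler) identity))

@(catch-flat (d/error-deferred (Exception.))
             (fn [_] (d/success-deferred :delayed-recovery)))
;; should yield :delayed-recovery rather than a nested deferred
```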

manifold.stream/try-put! to core.async sink throws exception

these are my dependencies

  :dependencies [[org.clojure/clojure "1.6.0"]
                 [org.clojure/core.async "0.1.346.0-17112a-alpha"]
                 [manifold "0.1.0"]]

and these are the steps to reproduce the problem

> (require '[clojure.core.async :as a])
nil
> (require '[manifold.stream :as s])
nil
> (def c (a/chan))
#'c
> (def si (s/->sink c))
#'si
> @(s/try-put! si ::input 1000 ::timeout)

Exception in thread "async-dispatch-1" java.lang.IllegalArgumentException: No implementation of method: :take! of protocol: #'clojure.core.async.impl.protocols/ReadPort found for class: clojure.lang.Keyword
    at clojure.core$_cache_protocol_fn.invoke(core_deftype.clj:544)
    at clojure.core.async.impl.protocols$eval512$fn__513$G__503__520.invoke(protocols.clj:15)
    at clojure.core.async$do_alts$fn__5334.invoke(async.clj:231)
    at clojure.core.async$do_alts.invoke(async.clj:223)
    at clojure.core.async$ioc_alts_BANG_.doInvoke(async.clj:359)
    at clojure.lang.RestFn.invoke(RestFn.java:494)
    at manifold.stream.async.CoreAsyncSink$f__8900$fn__8917$state_machine__5209__auto____8918$fn__8920.invoke(async.clj:145)
    at manifold.stream.async.CoreAsyncSink$f__8900$fn__8917$state_machine__5209__auto____8918.invoke(async.clj:145)
    at clojure.core.async.impl.ioc_macros$run_state_machine.invoke(ioc_macros.clj:940)
    at clojure.core.async.impl.ioc_macros$run_state_machine_wrapped.invoke(ioc_macros.clj:944)
    at manifold.stream.async.CoreAsyncSink$f__8900$fn__8917.invoke(async.clj:145)
    at clojure.lang.AFn.run(AFn.java:22)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

long chains of Deferred values hang

I came across this while debugging another problem.

(defn chain-delay-deferred
  [n t]
  (let [f (manifold.deferred/deferred)]
    (future (Thread/sleep t) (manifold.deferred/success! f 100))
    (reduce (fn [d i] (manifold.deferred/chain d (fn [v] (+ v i))))
            f
            (range 0 n))))

@(chain-delay-deferred 100 1000) ;; 5050
@(chain-delay-deferred 100000 1000) ;; hangs

There are some values of n where it hangs on some runs and returns on other runs. On my machine,

@(chain-delay-deferred 4567 1000)

seems to hang about half the time

I ran this against the current tip of master (4e1fff70c5380b33e9cc2f50f03ce16af6d393b4), but I observed the same problem against 0.1.1-alpha3.

manifold.stream/consume needs on-drained msg or deferred

Add some way using consume to have an on-drained event that will be delivered after all messages have been processed by consume. This is needed for logic where running on-drained prior to processing all messages in the stream would be not optimal.
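Until something like that exists, s/reduce can approximate it: it returns a deferred that is realized only after every message has gone through the reducing function. A sketch (handle-message! is a hypothetical stand-in for the per-message work):

```clojure
(require '[manifold.stream :as s])

;; Sketch: s/reduce yields a deferred that is realized only once the
;; source is drained and every message has been handled.
(def src (s/stream))

(def done
  (s/reduce (fn [_ msg]
              (handle-message! msg)  ; hypothetical per-message handler
              nil)
            nil
            src))

;; `done` fires after (s/close! src) *and* all pending messages
;; have been processed, i.e. the on-drained-after-processing event.
```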

deferred/chain only works for very few parameters....in a core.async go block

This might be related to #48, but I'm not sure. I was running into a problem where my code kept hitting an OutOfMemory error when compiling with `lein run` (in my IDE, I instead got an error about clojure.core/long not being found; I think there's a subtle difference in how they build). I eventually tracked it down to the length of a (d/chain ...) call inside a core.async go block, so I'm guessing this has to do with the interaction of multiple macros...

I was able to demonstrate it in the repl on manifold 0.1.1-alpha4 (Clojure 1.7.0 and Java 1.8.0_60-b27) by tweaking the example from https://github.com/ztellman/manifold/blob/master/docs/deferred.md#composing-with-deferreds

clj.core=> (def d (d/deferred))
#'clj.core/d
clj.core=> (d/chain d inc inc inc inc inc inc #(println "x + 3 =" %))
<< … >>
clj.core=> (d/success! d 0)
x + 3 = 6
true

That works with even a long chain of functions (hence why I added inc's).

clj.core=> (def d (d/deferred))
#'clj.core/d
clj.core=> (go (d/chain d inc inc inc inc inc inc #(println "x + 3 =" %)))

ExceptionInfo Class not found: clojure.core/long  clojure.core/ex-info (core.clj:4593)
clj.core=> (go (d/chain d inc inc inc inc inc #(println "x + 3 =" %)))

ExceptionInfo Class not found: clojure.core/long  clojure.core/ex-info (core.clj:4593)
clj.core=> (go (d/chain d inc inc inc inc #(println "x + 3 =" %)))

ExceptionInfo Class not found: clojure.core/long  clojure.core/ex-info (core.clj:4593)
clj.core=> (go (d/chain d inc inc inc #(println "x + 3 =" %)))

ExceptionInfo Class not found: clojure.core/long  clojure.core/ex-info (core.clj:4593)
clj.core=> (go (d/chain d inc inc #(println "x + 3 =" %)))

ExceptionInfo Class not found: clojure.core/long  clojure.core/ex-info (core.clj:4593)
clj.core=> (go (d/chain d inc #(println "x + 3 =" %)))
#object[clojure.core.async.impl.channels.ManyToManyChannel 0x7f78eaab "clojure.core.async.impl.channels.ManyToManyChannel@7f78eaab"]
clj.core=> (d/success! d 0)
x + 3 = 1
true

In this case, it works for three parameters to d/chain and no more.

Unfortunately, I'm fairly new to Clojure, so debugging this is probably beyond my ability. But, I did see that three arguments is the breakpoint for some code in that function, so that's probably a clue...
https://github.com/ztellman/manifold/blob/master/src/manifold/deferred.clj#L922-L925

I'm probably about to write the world's dumbest implementation of d/chain to fix this for my case, but it would be great if we could figure out if this is a core.async issue or a manifold issue and find a way to fix it...
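As a stopgap, the macro-expansion blowup can be sidestepped by folding the steps onto the deferred with the two-argument arity inside a plain function, so the go block only sees one small call. A sketch (chain-all is my own name):

```clojure
(require '[manifold.deferred :as d]
         '[clojure.core.async :refer [go]])

;; Sketch: build the chain with reduce instead of one big variadic
;; d/chain call, keeping the form inside the go block small.
(defn chain-all [d' fs]
  (reduce (fn [acc f] (d/chain acc f)) d' fs))

(def d (d/deferred))
(go (chain-all d [inc inc inc inc inc inc #(println "x + 6 =" %)]))
(d/success! d 0)
;; prints "x + 6 = 6"
```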

Retry Logic

I've recently written some retry logic using manifold.deferred and manifold.time that uses exponential back off to retry asynchronous operations. The current implementation is available here and its tests are here.

Do you think this is something that you might like to integrate into manifold? If so, I'd be happy to open up and work on a PR.
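For reference, the shape of such logic is roughly a d/loop whose error arm schedules the next attempt with manifold.time/in. A rough sketch, not the linked implementation (with-retries, base-ms, etc. are my own names; `f` is assumed to return a deferred):

```clojure
(require '[manifold.deferred :as d]
         '[manifold.time :as t])

;; Rough sketch of retry with exponential backoff. Success and error
;; are tagged so the catch handler never has to return a deferred.
(defn with-retries
  [f {:keys [max-retries base-ms] :or {max-retries 5 base-ms 100}}]
  (d/loop [attempt 0]
    (-> (d/chain (f) (fn [v] [::ok v]))
        (d/catch (fn [e] [::error e]))
        (d/chain
          (fn [[tag x]]
            (cond
              (= ::ok tag)
              x

              (< attempt max-retries)
              ;; t/in yields a deferred realized after the given delay
              (d/chain (t/in (* base-ms (Math/pow 2 attempt))
                             (constantly nil))
                       (fn [_] (d/recur (inc attempt))))

              :else
              (throw x)))))))
```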

Not all pending takes on stream are resolved when stream is closed

Here's what I'm seeing:

(def s (ms/stream))

(dotimes [n 10]
  (doto (Thread.
          (fn []
            (loop []
              (when-let [x @(ms/take! s)]
                (recur)))
            (println (str "Thread " n " ended"))))
    (.start)))

s  ;; << stream: {:pending-puts 0, :drained? false, :buffer-size 0, 
   ;;             :permanent? false, :type "manifold", :sink? true, :closed? false, 
   ;;             :pending-takes 10, :buffer-capacity 0, :source? true} >>

(ms/close! s)  ;; "Thread 0 ended"

s  ;; << stream: {:pending-puts 0, :drained? true, :buffer-size 0, 
   ;;             :permanent? false, :type "manifold", :sink? true, :closed? false, 
   ;;             :pending-takes 9, :buffer-capacity 10, :source? true} >>

Since all 10 threads are blocking on dereferencing the deferred returned by take!, I would expect that once close! is called on the stream, they'd all get nil and each would end. Instead, only one pending take seems to resolve, and the rest are kind of spinlocked.

Bug? Or am I making an incorrect assumption about the way close! works with the take! function? Would the drained? fix you made affect this? I haven't updated to the snapshot build yet, but I could if that might make a difference.

Closing the underlying core.async channel doesn't cause coerced source to become drained

When a Manifold source is created by coercing a core.async channel with ->source it doesn't become drained when the channel is closed. This behavior differs from what happens with connect and therefore doesn't feel intuitive to me. Could this be a bug or is there some reason why this is actually the intended behavior?

> (require '[manifold.stream :as s])
nil
> (require '[clojure.core.async :as a])
nil

> (def ch (a/chan))
#'ch
> (def source (s/->source ch))
#'source
> (a/close! ch)
nil
> (s/drained? source)
false
> (s/take! source ::drained)
<< ::drained >>
> (def ch (a/chan))
#'ch
> (def stream (s/stream))
#'stream
> (s/connect ch stream)
nil
> (a/close! ch)
nil
> (s/drained? stream)
true
> (s/take! stream ::drained)
<< ::drained >>
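Until that's resolved, a workaround consistent with the second transcript is to connect the channel into a plain manifold stream and consume that instead, since connect does propagate the close. A sketch:

```clojure
(require '[manifold.stream :as s]
         '[clojure.core.async :as a])

;; Sketch: connect propagates the channel close downstream, so the
;; resulting stream does become drained when the channel is closed.
(defn chan->source [ch]
  (let [out (s/stream)]
    (s/connect ch out)
    out))
```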

Different semantics for d/catch based on if deferred is realized

It seems that the semantics of manifold.deferred/catch change depending on whether or not the provided deferred value has already been realized. I would expect this function to work the same as manifold.deferred/timeout! works, always returning a new deferred value. Does this expectation make sense to you? Otherwise it seems that my code would have to check if the value has been realized which breaks the pattern.

See my example below. I would expect the final two values to be equivalent. That is, I would expect them both to be deferred values containing "badness".

user> (def d (d/deferred))
#'user/d
user> (def d' (d/catch d Exception #(.getMessage %)))
#'user/d'
user> (d/error! d (Exception. "badness"))
true
user> (d/catch d Exception #(.getMessage %))
"badness"
user> d'
<< "badness" >>
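A small wrapper can paper over the inconsistency until it's settled, by coercing an already-realized result back into a deferred (catch' is my own name):

```clojure
(require '[manifold.deferred :as d])

;; Sketch: force d/catch to always hand back a deferred, even when the
;; input was realized before the catch was attached.
(defn catch'
  [d' error-class error-handler]
  (let [r (d/catch d' error-class error-handler)]
    (if (d/deferred? r)
      r
      (d/success-deferred r))))
```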

Upstream flag in `connect` doesn't seem to work

Given the following:

(deftest test-connect-filter
  (let [s1 (s/stream)
        s2 (s/stream)]
    (s/connect s1 s2 {:upstream? true})
    (s/close! s2)
    (is (s/closed? s2))
    (is (s/closed? s1))))

the test fails as s1 is still open.

According to the documentation of connect I would expect both s1 and s2 to be closed after closing s2, even without specifying the upstream? flag as s2 is the only downstream sink. However, even when :upstream? is true, s1 stays open.

Could you please take a look and see whether this is a gross misunderstanding on my side or a bug?

Communicating backpressure when using consume-async

I have a paginated REST API, and I'm trying to shovel all of its data into a stream. Every request to the API contains n maps of data (that go into a stream), and, maybe, a link to the next page. I have something like this:

(defn scans!
  []
  (let [urls-stream (ms/stream 100) ;; absolutely no science here
        scans-stream (ms/stream 1000) ;; nor here
        shovel (fn [url]
                 (md/chain
                  (get-page! url)
                  (fn [{:keys [pagination scans]}]
                    (ms/put! urls-stream (:next pagination))
                    (ms/put-all! scans-stream scans))))]
    (ms/consume-async shovel urls-stream)
    scans-stream))

This somewhat communicates backpressure in that as soon as scans-stream is full or closed, put-all! will return false, and consume-async will interpret that as time to abort. What I really wanted (I think) is a deferred that fires with true once everything has successfully been put on scans-stream. That would stop consume-async from consuming another URL temporarily, which is the backpressure I was looking for.

I think that means I want something like:

(defn carefully-put!
  "Try putting msg on sink, waiting at most timeout msec. If it fails, try
  again indefinitely. Returns a deferred that fires true once successful.
  "
  [sink msg timeout]
  (md/loop [success? false]
    (if success?
      true
      (md/recur (ms/try-put! sink msg timeout)))))

(defn carefully-put-all!
  "Attempts to put msgs on sink, in order. Returns a deferred that
  fires with true when all msgs have successfully been put onto sink."
  [sink msgs timeout]
  (md/loop [[x & xs] (seq msgs)]
    (md/chain (carefully-put! sink x timeout)
      (fn [_]
        (if (seq xs)
          (md/recur xs)
          true)))))

(WARNING: I have not run this, there might be stupid bugs or typos.)

Presumably this needs to be a bit more clever. For example, if the sink is closed, or becomes closed in the mean while, it should stop trying and just return false.

Thoughts?
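Something along those lines could look like this; it bails out with false once the sink is closed instead of retrying forever (sketch only):

```clojure
(require '[manifold.deferred :as md]
         '[manifold.stream :as ms])

;; Sketch: retry the put until it lands, but stop as soon as the sink
;; is closed rather than spinning indefinitely.
(defn carefully-put!
  [sink msg timeout]
  (md/loop []
    (md/chain (ms/try-put! sink msg timeout ::timeout)
      (fn [r]
        (cond
          (true? r)         true
          (ms/closed? sink) false        ; sink gone, give up
          (= ::timeout r)   (md/recur)   ; just slow, try again
          :else             false)))))   ; put rejected outright
```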

Clarification on semantics of drained?

Based on the stream documentation, the semantics of the drained? function are described as:

Sources that will never produce any more messages (often because the corresponding sink is closed) are said to be drained. We may check whether a source is drained via drained? and on-drained.

I read this to mean a source is drained when it is both closed and has no pending puts, i.e. it will never produce another message because no more can be added and all pending messages have been taken.

However, this is the behavior I'm seeing:

(def a (ms/stream))

(ms/put! a 123)
(ms/put! a 123)

(ms/close! a)

a  ;; << stream: {:pending-puts 2, :drained? true, :buffer-size 0, 
   ;;             :permanent? false, :type "manifold", :sink? true, :closed? true, 
   ;;             :pending-takes 0, :buffer-capacity 0, :source? true} >>

@(ms/take! a)   ;; 123
@(ms/take! a)   ;; 123
@(ms/take! a)   ;; nil

I would expect that closed? would return true immediately after the call to close! has resolved, but drained? would only return true after the second call to take! has been resolved.

Am I understanding the intent correctly?

Unable to build documentation

On a fresh checkout of manifold, lein codox fails due to a missing plugin ('codox' is not a task. See 'lein help'.). I don't know if that's intended to work, but there's a codox in plugins, so I'm guessing it is. I was able to build the documentation by switching codox to lein-codox and updating the version to something available on Clojars as follows:

diff --git i/project.clj w/project.clj
index a2dc4d3..5cb811b 100644
--- i/project.clj
+++ w/project.clj
@@ -16,7 +16,7 @@
                    :benchmark :benchmark
                    :stress #(or (:stress %) (= :stress (:tag %)))
                    :all (constantly true)}
-  :plugins [[codox "0.8.10"]
+  :plugins [[lein-codox "0.9.4"]
             [lein-jammin "0.1.1"]
             [ztellman/lein-cljfmt "0.1.10"]]
   :cljfmt {:indents {#".*" [[:inner 0]]}}

(after which lein codox worked fine)

chain should document error behavior

One of deferred's critical features is that it knows how to represent errors. However, chain, one of the most common tools (at least for me) for working with deferreds, doesn't document what happens when its input deferred contains an error, or any of the intermediate fns raises.

WARNING: excessive pending puts (> 16384), closing stream

I have a very simple app (less than 200 lines of code) and I thought Manifold would be a good way to set up a simple jobs queue, but when I run the app, after about 15 to 30 minutes, I get this error:

 Aug 10, 2015 11:37:19 PM clojure.tools.logging$eval36$fn__40 invoke
 WARNING: excessive pending puts (> 16384), closing stream
 java.lang.IllegalStateException
at manifold.stream.default.Stream.put(default.clj:111)
at manifold.stream.default.Stream.put(default.clj:139)
at eh.streams$source_worker$fn__141.invoke(streams.clj:58)
at eh.streams$source_worker.invoke(streams.clj:55)

I have a "start" function that spins up 5 "source" workers and 5 "destination" workers:

 (defn start
   [map-of-config-info]
   (dotimes [_ 5]
     (future (source-worker map-of-config-info))
     (future (destination-worker))))

The source-worker pulls records out of Redis and puts them in an anonymous function for some processing and then puts the function on a stream. The destination-worker simply pulls the functions off the stream and executes them:

 (defn destination-worker []
   (loop [event-fn (stream/take! event-stream)]
     (@event-fn)
     (recur (stream/take! event-stream))))

I'm guessing the destination-workers run slowly, because they do all the work, whereas the source-workers do very little work, and so I would guess that slowly, over the course of 20 minutes or so, the puts! build up in excess of the takes!, and so I get this error? How could I test that?

I will experiment with having more destination-workers and less source-workers, though that seems a bit inexact.
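One quick test is to make the source workers respect backpressure by dereferencing the deferred returned by put! before fetching the next record; puts then block when consumers fall behind instead of queueing up. A sketch (fetch-record! and make-event-fn are hypothetical stand-ins for the Redis code):

```clojure
(require '[manifold.stream :as stream])

;; Sketch: block on each put! so a slow consumer throttles the
;; producer instead of letting pending puts pile up past the limit.
(defn source-worker [config]
  (loop []
    (let [event-fn (make-event-fn (fetch-record! config))]  ; hypothetical
      (when @(stream/put! event-stream event-fn)  ; waits until accepted
        (recur)))))
```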

Possible bug on default stream take timeout

I've been digging through the logic of the take implementation in default.clj while debugging #27. Possible I found an unrelated bug on https://github.com/ztellman/manifold/blob/master/src/manifold/stream/default.clj#L176:

;; add to the consumers queue
(if (and timeout (<= timeout 0))                      ;; Should be (>= timeout 0) here?
  (d/success-deferred timeout-val executor)
  (let [d (d/deferred executor)]
    (d/timeout! d timeout timeout-val)
    (let [c (Consumer. d default-val)]
      (if (.offer consumers c)
        d
        c))))

Unless I'm missing something (very possible!) a valid timeout value should always be some positive integer. As such, timeout will never validly evaluate to <= 0, and this `and` will always take the false branch.

0.1.4-alpha1 - stream/put! returns nil, rather than true or false

I updated to aleph 0.4.1-beta6, which pulls in manifold 0.1.4-alpha1.

On the old case:

(defn- send!
  [socket data]
  (let [rc @(stream/put! socket (codec/encode data))]
    (log/info "RC:" rc)
    rc))

This function (socket == websocket stream) returns true or false if the message makes it through.

With 0.1.4-alpha, it now returns nil.

Bug?
