clj-commons / manifold
A compatibility layer for event-driven abstractions
See http://dev.clojure.org/jira/browse/CLJ-1641 for some details.
It hasn't been decided whether this change will stay in Clojure 1.7.0, but I wanted you to be aware of it.
Using (http/websocket-connection req), I might get an error (in the form of an exception). How do I get it out so I can, say, print out what it was?
(let [stream @(http/websocket-connection req)]
  (if (d/deferred? stream)
    (println (.getMessage (d/error-value stream (Exception. "Unknown."))))
    (do-something-with stream)))
That seems to work, but feels like I'm missing the point.
If I call (.getMessage @stream), something locks up. Somehow a manifold.deferred.ErrorDeferred is not really a deferred? I don't know.
Also, error-value is not in the documentation (which is marked as 0.1.1-SNAPSHOT).
I should probably be using d/loop and d/recur and d/catch and d/chain for processing messages, right?
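A minimal sketch of the d/chain / d/catch style asked about above. It does not call Aleph: the deferred passed to the hypothetical helper describe-connection is assumed to stand in for whatever (http/websocket-connection req) returns. Instead of pulling the error out with error-value, the failure is turned into an ordinary value by d/catch.

```clojure
(ns example.ws-errors
  (:require [manifold.deferred :as d]))

(defn describe-connection
  "Hypothetical helper. `conn-d` stands in for the deferred returned by
  (http/websocket-connection req); success and failure both become maps."
  [conn-d]
  (-> conn-d
      (d/chain (fn [stream] {:ok stream}))
      ;; only Exceptions are handled here; the handler's return value
      ;; becomes the value of the resulting deferred
      (d/catch Exception (fn [e] {:error (.getMessage ^Exception e)}))))

(comment
  @(describe-connection (d/error-deferred (Exception. "connection refused")))
  ;; => {:error "connection refused"}
  )
```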
We have some code that wants to do something periodically. The original version used core.async + timeout, which, while a lot of code, allowed reuse of some code I wrote in icecap that gives me a fake clock, à la twisted.internet.task.Clock. I'll take it as given that having a deterministic clock like this is desirable; I'd be happy to elaborate if that's a point of contention.
That piece of core.async code predictably became a lot smaller when using manifold instead, but of course manifold happily ignored my pleas to rebind timeout. I'm trying to figure out what the best way to do this in Manifold would be. I think the answer is to implement a fake scheduler. https://github.com/ztellman/manifold/blob/master/src/manifold/time.clj#L175 constrains this to be a ScheduledThreadPoolExecutor, which is fine. Alas, the let binding means I don't know how to override that. I'm not entirely sure why that doesn't use (manifold.executor/executor). I think that's a thread local and not a dynamic variable because of performance, but I'm not sure.
Thanks in advance!
cc @derwolfe
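Newer Manifold releases ship manifold.time/mock-clock, which can replace the scheduler via with-clock and be moved forward deterministically with advance; check that your version has these before relying on them. A sketch under that assumption, where ticks-after is a hypothetical helper that counts how often a periodic task fires in simulated time:

```clojure
(ns example.fake-clock
  (:require [manifold.time :as t]))

(defn ticks-after
  "Hypothetical helper: schedules a counter with t/every on a mock clock,
  advances that clock by `elapsed-ms`, and returns the count. Nothing
  actually sleeps; due tasks run inside `advance`."
  [period-ms elapsed-ms]
  (let [clock (t/mock-clock)
        hits  (atom 0)]
    ;; t/every goes through whichever clock with-clock has bound
    (t/with-clock clock
      (t/every period-ms #(swap! hits inc)))
    (t/advance clock elapsed-ms)
    @hits))
```

Whether a zero initial delay also fires once at time 0 is a detail of the mock clock, so a test is safer asserting a lower bound than an exact count.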
When I try to run the trivial code
(require '[manifold.stream :as s])
(def a (s/stream))
(def b (s/stream))
(s/map (fn [x y] [x y]) a b)
I receive the following error:
IllegalArgumentException cannot convert manifold.stream$map$fn__4180 to source manifold.stream/->source (stream.clj:202)
Either I have a fundamental misunderstanding of the function, or this result is a bug :)
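One way to get the pairing the snippet is after, sketched on the assumption that s/zip is available: zip the two sources into a single stream of tuples, then map a one-argument function over it. The helper name pairs is hypothetical.

```clojure
(ns example.zip-streams
  (:require [manifold.stream :as s]))

(defn pairs
  "Hypothetical helper: s/zip takes one message from each source per round
  and emits them together, so s/map only ever sees a single value."
  [a b]
  (s/map (fn [[x y]] [x y]) (s/zip a b)))

(comment
  (s/stream->seq (pairs (s/->source [1 2 3]) (s/->source [:x :y :z])))
  ;; => ([1 :x] [2 :y] [3 :z])
  )
```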
Core.async channels can be coerced into manifold streams, but are there plans to allow a manifold stream to be used as a core.async channel?
I am relatively new to using manifold, but am accustomed to writing Twisted code. My expectation of every and periodically is that, once called, either could keep a process running indefinitely.
Unless I am misunderstanding how every and periodically function, it appears that they only keep running if something else is keeping the process alive. For example, my expectation of the code in jedesmal is that "I work" would be printed every second until the application is terminated. In reality, it prints once, then exits.
In short, my expectations have not matched what the library does :-). Does this seem like a valuable feature to you? Or is there another way that you believe this should be dealt with outside of manifold?
Thank you!
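The behaviour described suggests Manifold's scheduler threads do not keep the JVM alive by themselves, so one option outside the library is to block the main thread explicitly. A sketch with a hypothetical run-until helper, relying on t/every returning a zero-argument cancel function:

```clojure
(ns example.keep-alive
  (:require [manifold.deferred :as d]
            [manifold.time :as t]))

(defn run-until
  "Hypothetical helper: invokes `f` every `period-ms` milliseconds and blocks
  the calling thread until the deferred `stop` is realized, then cancels
  the schedule. The blocking deref is what keeps a -main alive."
  [period-ms f stop]
  (let [cancel (t/every period-ms f)]
    (try
      @stop
      (finally
        (cancel)))))

(comment
  ;; in -main: a deferred that is never realized blocks forever
  (run-until 1000 #(println "I work") (d/deferred)))
```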
(require '[manifold.deferred :as d])
(let [dd (d/deferred)]
  (-> dd
      (d/chain #(do (assert (string? %) "invalid") %) prn)
      (d/catch #(println "something unexpected:" (.getMessage %))))
  (d/success! dd nil))
Executing the above, I expect one of two outcomes: either prn is called with the string value put onto the deferred, or the catch handler prints some exception. What happens instead is that true is returned, prn is not called, and no exception is printed.
I tried looking into the catch/catch' code, but nothing stood out, and AssertionError extends Throwable, so I'm not sure what's going on there.
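Note that the true seen at the REPL is the return value of d/success!, the last form in the let; the pipeline's own result is never looked at. A sketch that keeps a handle on the chained deferred so its value can be inspected. It throws an ex-info instead of using assert, to keep the Exception vs. Error question out of the picture; validate-and-print is a hypothetical name.

```clojure
(ns example.chain-catch
  (:require [manifold.deferred :as d]))

(defn validate-and-print
  "Hypothetical helper returning the deferred produced by the chain/catch
  pipeline, so that its final value can be dereferenced."
  [dd]
  (-> dd
      (d/chain (fn [v]
                 (when-not (string? v)
                   (throw (ex-info "invalid" {:value v})))
                 v)
               (fn [v] (prn v) v))
      ;; the handler's return value becomes the result; nothing is printed
      ;; unless the handler itself prints
      (d/catch (fn [e] (str "something unexpected: " (.getMessage ^Throwable e))))))

(comment
  (let [dd     (d/deferred)
        result (validate-and-print dd)]
    (d/success! dd nil) ; returns true, which is what the REPL echoed
    @result)
  ;; => "something unexpected: invalid"
  )
```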
When making requests, the request map requires the scheme to be a str; otherwise https://github.com/ztellman/aleph/blob/0.4.0/src/aleph/http/client.clj#L65 fails. When receiving a request, the request map passed to the handler has :scheme as a keyword:
2015-Feb-20 15:29:13 -0800 zygalski.local DEBUG [shimmer.core] - incoming-request {:remote-addr 0:0:0:0:0:0:0:1, :headers {host localhost:7467, user-agent curl/7.37.1, accept */*}, :server-port 7467, :keep-alive? true, :uri /, :server-name localhost, :query-string nil, :body nil, :scheme :http, :request-method :get}
I'm not sure if this is intentional, or an unintentional inconsistency.
For some reason this hangs (note the use of s/map):
> (->> [1 1 1 2 2 3]
       s/->source
       (s/partition-by identity)
       (s/map s/stream->seq)
       (s/stream->seq)
       (println))
I would expect it to print:
((1 1 1) (2 2) (3))
But using s/mapcat seems to be OK:
> (->> [1 1 1 2 2 3]
       s/->source
       (s/partition-by identity)
       (s/mapcat s/stream->seq)
       (s/stream->seq)
       (println))
(1 1 1 2 2 3)
nil
If we create a d/chain containing a single d/future, the semantics of d/chain mean that a deref on the chain will provide the deref of its 'links'. So for a single deferred link, a deref on the chain causes a deref on the link.
(deref (d/chain (d/future (Thread/sleep 1) :ok)))
=> :ok
But if I use a clojure.core future instead, I get a pending future back when I deref the chain.
(deref (d/chain (future (Thread/sleep 1) :ok)))
=> #<core$future_call$reify__6666@6c002f51: :pending>
I didn't expect this result, but perhaps there's a good reason for it.
(I'm on beta5)
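Whatever d/chain does implicitly in a given version, coercing the clojure.core future explicitly with d/->deferred before chaining makes the chain wait for the future's value. A small sketch; chained-future-value is a hypothetical name:

```clojure
(ns example.core-future
  (:require [manifold.deferred :as d]))

(defn chained-future-value
  "Hypothetical helper: wraps a clojure.core future with d/->deferred so the
  chain receives the future's value rather than the future itself."
  []
  (d/chain (d/->deferred (future (Thread/sleep 1) 41))
           inc))

(comment
  @(chained-future-value)
  ;; => 42
  )
```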
It seems manifold.deferred/catch can transform error values into success values (rather than returning the value of the error handler) in some circumstances, perhaps when the erroring deferred is already realized when it is given to catch (though I haven't taken any steps to validate this hypothesis).
;; [manifold "0.1.1-alpha4"]
(require '[manifold.deferred :as d])
(-> (d/chain (d/success-deferred :boo)
             (fn [v] (throw (ex-info "boo" {})))
             (fn [v] (prn "hi")))
    (d/catch (fn [e]
               (prn "error")
               :caught))
    deref)
;; as expected: prints "error", returns :caught
(-> (d/chain (d/error-deferred :boo)
             (fn [v] (prn "hi")))
    deref)
;; as expected: explodes
(-> (d/chain (d/error-deferred :boo)
             (fn [v] (prn "hi")))
    (d/catch (fn [e]
               (prn "error")
               :caught))
    deref)
;; whoa: doesn't print "error", returns :boo
(-> (d/chain (d/error-deferred :boo)
             (fn [v] (prn "hi")))
    (d/catch (fn [e]
               (prn "error")
               :caught))
    (d/chain (fn [v]
               v))
    deref)
;; returns :boo, so the error value has unexpectedly become the success value
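The expected behaviour from the report can be pinned down as a clojure.test regression test, which should pass on a release where this is fixed. This sketch uses an ex-info as the error value rather than :boo, so it does not depend on how non-Throwable error values are matched; the namespace and test names are hypothetical.

```clojure
(ns example.catch-regression-test
  (:require [clojure.test :refer [deftest is]]
            [manifold.deferred :as d]))

(deftest catch-after-chain-on-realized-error-deferred
  (let [handled (atom false)
        result  (-> (d/chain (d/error-deferred (ex-info "boo" {}))
                             (fn [_] (prn "hi")))
                    (d/catch (fn [_]
                               (reset! handled true)
                               :caught))
                    deref)]
    ;; expected: the handler runs and its value becomes the result
    (is @handled)
    (is (= :caught result))))
```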
Just wondering why manifold.deferred contains this logic:
(utils/when-core-async
  (extend-protocol Deferrable
    clojure.core.async.impl.channels.ManyToManyChannel
    (to-deferred [ch]
      (let [d (deferred)]
        (a/take! ch
          (fn [msg]
            (if (instance? Throwable msg)
              (error! d msg)
              (success! d msg))))
        d))))
I find I'm having to wrap a core.async channel in a record in order to return one from a d/chain function. That's OK, but I just wondered what the rationale was for making a core.async channel satisfy Deferrable. Is there some interaction between d/chain and core.async channels here?
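The record workaround mentioned above, sketched: given the extend-protocol shown, a bare channel returned from a chain step would be treated as a deferred (its first message), while a plain record is not Deferrable and passes through untouched. ChanBox and open-channel are hypothetical names.

```clojure
(ns example.chan-box
  (:require [clojure.core.async :as a]
            [manifold.deferred :as d]))

;; a plain record does not satisfy Deferrable, so d/chain leaves it alone
(defrecord ChanBox [ch])

(defn open-channel
  "Hypothetical chain step that hands a fresh channel to the next step."
  [_]
  (->ChanBox (a/chan 1)))

(comment
  (let [box @(d/chain (d/success-deferred :start) open-channel)]
    (a/>!! (:ch box) :hello)
    (a/<!! (:ch box))))
;; => :hello
```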
I suppose there are problems with JavaScript's single-threaded execution in the browser, but you always have brilliant solutions ;-)
Using Aleph 0.4.1-alpha1 and Manifold 0.1.1-alpha2.
Here is the code that connects a socket stream to a bus subscription stream:
(s/connect
  (s/filter
    (partial room-accept-event? idempotent-filter)
    (bus/subscribe chatrooms room))
  socket)
The filter fn is:
(defn- room-accept-event?
  [idempotent-filter event]
  (or (nil? event)
      (not (.add idempotent-filter event :ignored))))
idempotent-filter is a non-nil-safe bounded FIFO map; we accept nil values because we think nil is the proper indicator of a drained source and want to pass it to the sink.
When a browser window gets brutally closed, the following NPE shows up in the log:
[manifold-scheduler-pool-1] 2015-07-22 10:39:54,626 ERROR [manifold.deferred] error in deferred handler java.lang.NullPointerException at manifold.stream.graph$async_connect$this__6562.invoke(graph.clj:183) at manifold.stream.graph$async_connect$this__6562$fn__6563$fn__6564.invoke(graph.clj:162) at clojure.core$trampoline.invoke(core.clj:6037) at manifold.stream.graph$async_connect$this__6562$fn__6563.invoke(graph.clj:162) at manifold.deferred.Listener.onSuccess(deferred.clj:219) at manifold.deferred.Deferred$fn__6019.invoke(deferred.clj:398) at manifold.deferred.Deferred.success(deferred.clj:398) at manifold.deferred$success_BANG_.invoke(deferred.clj:245) at manifold.stream.default.Stream.put(default.clj:119) at manifold.stream.default.Stream.put(default.clj:141) at manifold.stream$filter$fn__7154.invoke(stream.clj:628) at manifold.stream.Callback.put(stream.clj:452) at manifold.stream.graph$async_send.invoke(graph.clj:50) at manifold.stream.graph$async_connect$this__6562.invoke(graph.clj:181) at manifold.stream.graph$async_connect$this__6562$fn__6563$fn__6564.invoke(graph.clj:162) at clojure.core$trampoline.invoke(core.clj:6037) at manifold.stream.graph$async_connect$this__6562$fn__6563.invoke(graph.clj:162) at manifold.deferred.Listener.onSuccess(deferred.clj:219) at manifold.deferred.Deferred$fn__6019.invoke(deferred.clj:398) at manifold.deferred.Deferred.success(deferred.clj:398) at manifold.deferred$success_BANG_.invoke(deferred.clj:245) at manifold.stream.default.Stream.put(default.clj:119) at manifold.stream.default.Stream.put(default.clj:141) at manifold.bus$event_bus$reify__9469$fn__9472.invoke(bus.clj:110) at clojure.core$map$fn__4553.invoke(core.clj:2624) at clojure.lang.LazySeq.sval(LazySeq.java:40) at clojure.lang.LazySeq.seq(LazySeq.java:49) at clojure.lang.RT.seq(RT.java:507) at clojure.core$seq__4128.invoke(core.clj:137) at clojure.core$apply.invoke(core.clj:630) at manifold.bus$event_bus$reify__9469.publish(bus.clj:110) at 
com.unbounce.events_gateway.server_push$broadcast_heartbeat.invoke(server_push.clj:325) at com.unbounce.events_gateway.server_push$make_heartbeater$_heartbeater__11697.invoke(server_push.clj:339) at manifold.time$fn__5771$every__5780$f__5781.invoke(time.clj:169) at clojure.lang.AFn.run(AFn.java:22) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at manifold.executor$thread_factory$reify__5598$f__5599.invoke(executor.clj:36) at clojure.lang.AFn.run(AFn.java:22) at java.lang.Thread.run(Thread.java:745)
It happens a few times per closed socket, when puts are attempted. Then things clear up internally and the NPE doesn't happen any more.
I was running through the deferred documentation using the 0.1.0-beta8 version of manifold. I ran the following code:
@(d/timeout! (future (Thread/sleep 2000) :foo) 100)
and there is no TimeoutException. It runs for 2 seconds and spits out :foo. When I followed the example and placed :bar as the timeout-value, it still returns :foo. Could there be an issue with the manifold.time/in function?
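d/timeout! acts on a Manifold deferred, and the snippet above hands it a clojure.core future, which appears to be why the timeout never takes effect. A sketch that follows the documentation's pattern by creating the slow value with d/future; slow is a hypothetical helper.

```clojure
(ns example.timeouts
  (:require [manifold.deferred :as d])
  (:import [java.util.concurrent TimeoutException]))

(defn slow
  "Hypothetical helper: a Manifold future that takes two seconds."
  []
  (d/future (Thread/sleep 2000) :foo))

(comment
  ;; with a timeout-value, the deferred yields it after 100ms
  @(d/timeout! (slow) 100 :bar)
  ;; => :bar

  ;; without one, deref throws a TimeoutException
  (try @(d/timeout! (slow) 100)
       (catch TimeoutException _ :timed-out))
  ;; => :timed-out
  )
```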
I'm not sure if this is technically a bug or not, but I thought it warranted discussion. Cursive includes functionality to check for dependency cycles in namespaces, and will produce an error when the user tries to load a namespace involved in a cycle, the assumption being that they just created it inadvertently. For a while there, the entire transitive closure of dependencies of the file being loaded was calculated and cycles anywhere in the graph were warned about. I've since fixed this, but while that was the case, several users reported problems with manifold.stream. This is used from manifold.stream.core, .queue, .seq, .deferred and .async, but also requires them itself later in the file. This will cause problems for people wanting to work on the manifold source with Cursive, and seems weird; can it be fixed?
Let me dissect that a bit.
I created a stream from a manifold.deferred.
When I call take! on the stream initially, it returns the value of the deferred (which is expected) and the stream is drained (also expected).
But then, when I call take! again, it should return nil, which it doesn't. Here's a sample REPL session:
=> (def z (d/deferred))
'z
=> (def a (s/->source z))
'a
=> (d/success! z 42)
true
=> (s/drained? a)
false
=> @(s/take! a)
42
=> (s/drained? a)
true
=> @(s/take! a)
ClassCastException java.lang.Boolean cannot be cast to java.util.concurrent.Future clojure.core/deref-future (core.clj:2180)
The reason I even came across this is that I was hoping the deferred returned from manifold.stream/reduce could be used in a pipeline transparently. This is a trivial example of what I wanted to do:
(->> (range 10)
     s/->source
     (s/reduce +)
     s/->source)
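Since s/reduce already returns a deferred, a pipeline can continue from it with d/chain instead of coercing it back into a source. A sketch; sum-then-inc is a hypothetical name and inc stands in for whatever the next step would be:

```clojure
(ns example.reduce-chain
  (:require [manifold.deferred :as d]
            [manifold.stream :as s]))

(defn sum-then-inc
  "Hypothetical helper: reduces a source to a deferred sum, then chains the
  next step onto that deferred."
  [xs]
  (-> (s/reduce + (s/->source xs))
      (d/chain inc)))

(comment
  @(sum-then-inc (range 10))
  ;; => 46
  )
```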
I am using the manifold.bus to communicate between components in my service.
The flow is usually like this:
event on api component -----> (1) store event in database
|---> (2) some coercions ---> send event to an external service
|---> (3) some other invocation when intercepting this message
The problem is that, in the API component, I don't know whether any of these consumers throws an exception. For the first two listeners, it is crucial that the user knows if anything went wrong with their initial event. The easiest approach would be to make the API handler aware of the components handling (1) and (2) and process the event sequentially. But I do like the approach of a central "nerve system", which totally decouples components and means new listeners can be added without changing the dependency graph.
Could you give me a hint on how this issue could be tackled the right "async/manifold" way? Thanks!
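One possible design, sketched rather than offered as a built-in feature: bus/publish! only reports delivery to the subscriber streams, so the publisher can attach one ack deferred per consumer it cares about and wait on all of them with d/zip, while the bus itself stays decoupled. publish-and-await! and ack! are hypothetical helpers, and the ::acks key is an assumed convention between publisher and consumers.

```clojure
(ns example.bus-acks
  (:require [manifold.bus :as bus]
            [manifold.deferred :as d]
            [manifold.stream :as s]))

(defn publish-and-await!
  "Hypothetical helper. Publishes `event` with one ack deferred per consumer
  name in `required`; the returned deferred yields once every required
  consumer has acked, or errors if one of them reports an error."
  [b topic event required]
  (let [acks (zipmap required (repeatedly d/deferred))]
    (bus/publish! b topic (assoc event ::acks acks))
    (apply d/zip (vals acks))))

(defn ack!
  "Hypothetical helper for consumers: runs `f` on the event and reports the
  outcome through this consumer's ack deferred, if the publisher asked for one."
  [event consumer-name f]
  (when-let [ack (get-in event [::acks consumer-name])]
    (try
      (d/success! ack (f event))
      (catch Exception e
        (d/error! ack e)))))

(comment
  (def b (bus/event-bus))
  (s/consume #(ack! % :db (fn [_] :stored)) (bus/subscribe b :events))
  @(publish-and-await! b :events {:id 1} [:db]))
```

Consumers that are not in `required` simply ignore the acks, so new listeners can still be added without the API component knowing about them.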
I have two source streams and a single sink which writes to a file.
When stream A closes, it closes the sink, and stream B doesn't finish writing.
I see I can provide {:downstream? false} on A, but doesn't that introduce a race condition? What if B happens to finish first?
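A sketch that sidesteps the race by closing the sink only after every source has been fully written, assuming your Manifold version provides s/consume-async (it takes the next message only after the previous put! has completed, and its deferred yields once the source is drained). merge-then-close! is a hypothetical name.

```clojure
(ns example.merge-close
  (:require [manifold.deferred :as d]
            [manifold.stream :as s]))

(defn merge-then-close!
  "Hypothetical helper: feeds every source into `sink` and closes `sink`
  only after all sources are drained and their last puts have completed.
  Returns a deferred yielding true once the sink is closed."
  [sink & sources]
  (-> (apply d/zip
             ;; one consume-async per source; each waits on its own put!s
             (map (fn [src] (s/consume-async #(s/put! sink %) src)) sources))
      (d/chain (fn [_]
                 (s/close! sink)
                 true))))
```

Messages from different sources may interleave, but whichever source finishes first no longer decides when the sink closes.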
Some of my deferreds yield a LazySeq. This seems to break manifold. Is there something I've misunderstood here? I can get around this by delazifying the result.
Here's an example of what I'm talking about:
@(d/chain [1 2 3] (fn [x] (map #(* 2 %) x)))
ClassCastException clojure.lang.LazySeq cannot be cast to java.util.concurrent.Future clojure.core/deref-future (core.clj:2180)
@(d/chain [1 2 3] (fn [x] (vec (map #(* 2 %) x))))
=> [2 4 6]
Example:
> (md/->deferred (clojure.core.async/chan))
IllegalArgumentException cannot convert clojure.core.async.impl.channels.ManyToManyChannel to deferred. manifold.deferred/->deferred (deferred.clj:129)
> (md/->deferred (clojure.core.async/promise-chan))
IllegalArgumentException cannot convert clojure.core.async.impl.channels.ManyToManyChannel to deferred. manifold.deferred/->deferred (deferred.clj:129)
md/chain appears to treat the chan as just another value:
> (md/chain (clojure.core.async/promise-chan) inc)
#<ErrorDeferred@7e9ead5e: ClassCastException clojure.core.async.impl.channels.ManyToManyChannel cannot be cast to java.lang.Number clojure.lang.Numbers.inc (Numbers.java:112)
icecap.handlers.core> (md/chain (clojure.core.async/chan) inc)
#<ErrorDeferred@1625c143: ClassCastException clojure.core.async.impl.channels.ManyToManyChannel cannot be cast to java.lang.Number clojure.lang.Numbers.inc (Numbers.java:112)
In all cases, I was expecting a deferred that would never fire (since the channel would never fire/close).
The documentation claims otherwise:
Values that can be coerced into a deferred include Clojure futures, Clojure promises, and core.async channels:
I don't know if the documentation is wrong or this is a bug. I could see how there's an impedance mismatch where you have to make sure the channel is closed before you can make the deferred fire?
I see that manifold.deferred futures can be coerced into sources (via ->source), but regular Clojure ones cannot. Was this intentional? I'd be happy to try to make a patch if not.
I have the following connection between an Aleph websocket sink and a bus source:
(s/connect
  (s/filter
    (partial room-accept-event? idempotent-filter)
    (bus/subscribe chatrooms room))
  socket)
I wanted to check whether the FIFOCache instance that's wrapped in idempotent-filter was collected after the socket disconnects.
Before that, I added on-drained and on-closed listeners on the above to track stream events. Things look great: I see the socket close event immediately followed by the drained event from s/filter, followed by the drained event from bus/subscribe.
But when I inspect a heap dump (after GC) I see that there are still references maintained on the object I'm concerned about:
I'm concerned this could be a memory leak. This could also be a complete PEBKAC: either I misinterpreted the heap dump, or I forgot to perform a necessary clean-up on the streams to release related resources (I've tried {:upstream? true}, but to no avail). I've also tried removing the filter, and in that case the FIFOCache instances are GCed away as expected.
Thanks for any light you can shed on this, even if it's for exposing my n00bness.
In the following case, one would expect blocking puts to stream a to wait forever once the batch size has been exceeded:
(ns test.batch
  (:require [manifold.stream :as s]))
(def a (s/stream* {:permanent? true}))
(def b (s/batch 10 a))
(def c (s/stream))
(s/connect b c)
(s/close! c)
(time (dotimes [_ 10000] @(s/put! a :test)))
;; "Elapsed time: 45.356857 msecs"
If c is not closed, puts to stream a will eventually stop completing, as expected.
In aleph, there's an example for http that looks like:
(defn delayed-hello-world-handler
  "Alternately, we can use a [core.async](https://github.com/clojure/core.async) goroutine to
   create our response, and convert the channel it returns using
   `manifold.deferred/->deferred`. This is entirely equivalent to the previous implementation."
  [req]
  (d/->deferred
    (a/go
      (let [_ (a/<! (a/timeout 1000))]
        (hello-world-handler req)))))
When I try this, I get:
IllegalArgumentException cannot convert clojure.core.async.impl.channels.ManyToManyChannel to deferred. manifold.deferred/->deferred (deferred.clj:129)
I can't get it to work using the REPL either:
$ lein repl
nREPL server started on port 59006 on host 127.0.0.1 - nrepl://127.0.0.1:59006
REPL-y 0.3.7, nREPL 0.2.10
Clojure 1.7.0
Java HotSpot(TM) 64-Bit Server VM 1.8.0_45-b14
Docs: (doc function-name-here)
(find-doc "part-of-name-here")
Source: (source function-name-here)
Javadoc: (javadoc java-object-or-class-here)
Exit: Control+D or (exit) or (quit)
Results: Stored in vars *1, *2, *3, an exception in *e
webapp.main=> (require '[clojure.core.async :as a])
nil
webapp.main=> (require '[manifold.deferred :as d])
nil
webapp.main=> (d/->deferred (a/chan))
IllegalArgumentException cannot convert clojure.core.async.impl.channels.ManyToManyChannel to deferred. manifold.deferred/->deferred (deferred.clj:129)
webapp.main=> (d/->deferred (a/go))
IllegalArgumentException cannot convert clojure.core.async.impl.channels.ManyToManyChannel to deferred. manifold.deferred/->deferred (deferred.clj:129)
webapp.main=> (d/deferrable? (a/go))
false
webapp.main=>
Am I missing something? Is this a bug, or is the example out of date?
I discovered this when I (whimsically) attempted to implement each middleware with go blocks. (Presumably I need a chain of deferreds but I never got far enough to work through the logic.)
My deps:
[aleph "0.4.1-alpha2" :exclusions [[clj-tuple]]]
[byte-streams "0.2.0"]
[primitive-math "0.1.4"]
[io.netty/netty-all "4.1.0.Beta5"]
[manifold "0.1.1-alpha3"]
[io.aleph/dirigiste "0.1.1"]
[riddley "0.1.10"]
[potemkin "0.4.1"]
[ch.qos.logback/logback-classic "1.1.3"]
[ch.qos.logback/logback-core "1.1.3"]
[org.slf4j/slf4j-api "1.7.7"]
[clj-tuple "0.2.2"]
[com.stuartsierra/component "0.2.3"]
[com.stuartsierra/dependency "0.1.1"]
[org.clojure/clojure "1.7.0"]
[org.clojure/core.async "0.1.346.0-17112a-alpha"]
[org.clojure/tools.analyzer.jvm "0.1.0-beta12"]
[org.clojure/core.memoize "0.5.6"]
[org.clojure/core.cache "0.6.3"]
[org.clojure/data.priority-map "0.0.2"]
[org.clojure/tools.analyzer "0.1.0-beta12"]
[org.ow2.asm/asm-all "4.1"]
[org.clojure/java.jdbc "0.4.1"]
[org.clojure/tools.logging "0.3.1"]
[org.postgresql/postgresql "9.4-1201-jdbc41"]
[pandect "0.5.3"]
[org.bouncycastle/bcprov-jdk15on "1.52" :exclusions [[org.clojure/clojure]]]
Probably related to #48
Here are some examples. The second one hangs on the deref.
Possibly a bug?
(require '[manifold.deferred :as d])
;; an example with applying d/chain to a vector of functions.
;; ok
(deref (d/chain
         nil
         identity
         (fn [_]
           (d/chain 100
                    (fn [v]
                      (d/future (str v)))))
         identity))
;; hangs!
(deref (apply d/chain
nil
[identity
(fn [_]
(d/chain 100
(fn [v]
(d/future (str v)))))
identity]))
;; is ok if one or the other of the identity functions is omitted
(deref (apply d/chain
nil
[#_identity
(fn [_]
(d/chain 100
(fn [v]
(d/future (str v)))))
identity]))
;; but reduce is ok
(deref (reduce (fn [c n] (d/chain c n)) {}
[identity
(fn [_]
(d/chain 100
(fn [v]
(d/future (str v)))))
identity]))
I have a piece of software that I'm porting from core.async to manifold. It has a simple interpreter that interprets "plans", which consist of:
This is implemented with a simple dispatch multimethod:
(defmulti execute
(fn [x] (cond (set? x) ::unordered-plans
(vector? x) ::ordered-plans
:else (:type x))))
Executing a step returns a chan (soon: stream) of results. Usually there's only one result on that stream, but, e.g., an HTTP request for a large file might send progress reports every 10%.
The first implementation in core.async was:
(defmethod execute ::unordered-plans
[plans]
(async/merge (map execute plans)))
which was easy to port to:
(defmethod execute ::unordered-plans
[plans]
(ms/concat (ms/map execute plans)))
The second part was trickier. It was originally spelled:
(defmethod execute ::ordered-plans
  [plans]
  (let [out (chan)]
    (go (loop [plans plans]
          (let [results (<! (async/into [] (execute (first plans))))]
            (<! (async/onto-chan out results false)))
          (if (seq (rest plans))
            (recur (rest plans))
            (async/close! out))))
    out))
My first thought was to write:
(defmethod execute ::ordered-plans
  [plans]
  (let [out (ms/stream)]
    (ms/connect-via (ms/->source plans)
                    (fn [plan]
                      (ms/connect (execute plan) out))
                    out)
    out))
... but connect doesn't tell me when (execute plan) is done, so that doesn't work. Instead:
(defn impl-b
  [plans]
  (let [out (ms/stream)]
    (ms/connect-via (ms/->source plans)
                    (fn [plan]
                      ;; stream->seq's seq will block :(
                      (ms/put-all! out (ms/stream->seq (ms/->source (execute plan)))))
                    out)
    out))
... which works except for the seemingly unnecessary seq with blocking backpressure mechanics. I think I can just write it with deferred tricks:
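(defn impl-c
  [plans]
  (let [out (ms/stream)]
    (ms/connect-via (ms/->source plans)
                    (fn [plan]
                      (let [s (execute plan)]
                        (md/loop [v (ms/take! s)]
                          (if (some? v)
                            ;; wait for the put before taking the next value
                            (md/chain (ms/put! out v)
                                      (fn [_] (md/recur (ms/take! s))))
                            ;; recur must be in tail position, so close in the else branch
                            (do (ms/close! s)
                                true)))))
                    out)
    out))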
... but that seems more involved than it needs to be, since it's spelling out most of put-all!. Am I missing something, or is this how you write it? I guess what I want is put-all! for streams instead of seqs?
Is this a pattern you've noticed before? On a related note, (let [...] (connect(-via) in f out) out) seems to be a recurring pattern; do you have any opinions on the return values of connect(-via)?
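A sketch of the "put-all! for streams" idea, assuming s/consume-async is available in your Manifold version: it puts each message and waits for that put before taking the next, and its deferred can be returned from a connect-via callback so that plans run one after another. drain-into! and run-ordered are hypothetical names, not library functions.

```clojure
(ns example.ordered-plans
  (:require [manifold.stream :as s]))

(defn drain-into!
  "Hypothetical 'put-all! for streams': puts every message of `src` into
  `out`, waiting on each put, and returns a deferred that yields once `src`
  is drained. `out` is left open."
  [src out]
  (s/consume-async #(s/put! out %) src))

(defn run-ordered
  "Runs `execute` (which returns a stream) on each plan in order, forwarding
  every result to one output stream."
  [execute plans]
  (let [out (s/stream)]
    ;; connect-via waits for each callback's deferred before taking the
    ;; next plan, and closes `out` once `plans` is exhausted
    (s/connect-via (s/->source plans) #(drain-into! (execute %) out) out)
    out))
```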
These two should be identical:
(ns let-flows-not
  (:require [manifold.deferred :as d]
            [manifold.stream :as s]))
(let [k (d/deferred)
      q (s/stream)
      x (d/chain' (s/try-put! q :x 10)
                  (fn [_]
                    k
                    {:result :chain}))]
  (Thread/sleep 20)
  (prn :debug :x (d/realized? x))
  (when (d/realized? x)
    (prn :debug :x @x)))
(let [k (d/deferred)
      q (s/stream)
      x (d/let-flow' [_ (s/try-put! q :y 10)]
          k
          {:result :let-flow})]
  (Thread/sleep 20)
  (prn :debug :x (d/realized? x))
  (when (d/realized? x)
    (prn :debug :x @x)))
But using [manifold "0.1.1-alpha3"], the output is:
:debug :x true
:debug :x {:result :chain}
:debug :x false
It seems the second let is blocking because k is not realized.
I have some problem detecting when to close the output stream in the example below. I do not know if it is ok to close the file in the on-drained callback.
I noticed different behaviour when using stream and source as described in the code below.
What's the recommended pattern for writing a stream to a file, as attempted here?
(require '[byte-streams :as bs])
(require '[clojure.java.io :as io])
(require '[manifold.deferred :as d])
(require '[manifold.stream :as s])
(defn bug? [s]
  (let [out (io/output-stream "c:/tmp/f3.txt")]
    (->> s
         (s/map bs/to-byte-array)
         ((fn [s]
            (s/on-drained s
                          (fn []
                            (println "2") (flush)
                            (.flush out) (.close out)))
            s))
         ((fn [source]
            (s/consume (fn [v]
                         (.write out v)
                         (when (s/drained? source)
                           (println "1") (flush)
                           (.flush out) (.close out)))
                       source))))))
;; case 1 prints: "2"
(let [s (s/stream)]
(s/put! s "test")
(bug? s)
(s/close! s))
;; case 2 prints "2" and "1", then throws an exception because it writes to a closed file
(let [s (-> (byte-array [1 2 3])
(bs/to-byte-buffers)
(s/->source))]
(bug? s)
(s/close! s))
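A sketch of one pattern that closes the file exactly once: chain the close onto the deferred returned by s/consume, which (assuming a Manifold version where consume returns a deferred) is realized when the source is exhausted, after the callback has seen every message. write-stream! is a hypothetical name; in the code above you would first map with bs/to-byte-array and pass (io/output-stream ...) as `out`.

```clojure
(ns example.write-stream
  (:require [manifold.deferred :as d]
            [manifold.stream :as s]))

(defn write-stream!
  "Hypothetical helper: writes every byte-array message from `source` to the
  OutputStream `out`, then flushes and closes `out` once `source` is
  exhausted. The returned deferred yields true afterwards."
  [source ^java.io.OutputStream out]
  (d/chain (s/consume (fn [^bytes bs] (.write out bs)) source)
           (fn [_]
             (.flush out)
             (.close out)
             true)))
```

Because the close only runs once, neither on-drained nor a drained? check inside the callback is needed.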
After tearing my hair out for a bit tracking this down, I think I've narrowed it to the util/future-with macro when using an Executor instance returned by dirigiste. A basic sometimes-repro case goes like this:
(dotimes [n1 100]
  (let [e (exec/fixed-executor 5)]
    (dotimes [n2 5]
      (md/future-with e (println (str "inside worker " n1 ":" n2))))
    (.shutdown e)))
More often than not, this will throw a java.util.concurrent.RejectedExecutionException at Executor.java:299. I installed a custom build of dirigiste with some additional logging output which traces the values of numWorkers and numThreads/maxThreadCount in Executors.java, and in both the fixedController and utilizationController implementations of shouldIncrement I was seeing an additional erroneous call with numWorkers failing the less-than inequality check.
If I had to guess, I'd say something funky is happening with the thread-binding management within the future-with macro. Maybe there's some non-thread-safe mutable state lurking in the Executor implementation logic?
Let me know if there's anything I can do to further help debug this. If it turns out the bug is a dirigiste issue rather than manifold, feel free to close this and I'll re-open the issue over on that project.
Hey Zach,
What are your thoughts on allowing the return of deferred values from d/catch? Currently, deferreds returned from d/catch handlers aren't "flattened" as they are with d/chain and friends, e.g.:
user=> (d/catch (d/error-deferred (Exception.)) (fn [e] (d/success-deferred :delayed-recovery)))
<< << :delayed-recovery >> >>
I find myself wanting to do this so that I can catch errors while in a d/loop, and d/recur once I've recovered (asynchronously) from the error (e.g., performing a "cooldown" sleep before recurring).
Thanks,
Brian
These are my dependencies:
:dependencies [[org.clojure/clojure "1.6.0"]
[org.clojure/core.async "0.1.346.0-17112a-alpha"]
[manifold "0.1.0"]]
And these are the steps to reproduce the problem:
> (require '[clojure.core.async :as a])
nil
> (require '[manifold.stream :as s])
nil
> (def c (a/chan))
#'c
> (def si (s/->sink c))
#'si
> @(s/try-put! si ::input 1000 ::timeout)
Exception in thread "async-dispatch-1" java.lang.IllegalArgumentException: No implementation of method: :take! of protocol: #'clojure.core.async.impl.protocols/ReadPort found for class: clojure.lang.Keyword
at clojure.core$_cache_protocol_fn.invoke(core_deftype.clj:544)
at clojure.core.async.impl.protocols$eval512$fn__513$G__503__520.invoke(protocols.clj:15)
at clojure.core.async$do_alts$fn__5334.invoke(async.clj:231)
at clojure.core.async$do_alts.invoke(async.clj:223)
at clojure.core.async$ioc_alts_BANG_.doInvoke(async.clj:359)
at clojure.lang.RestFn.invoke(RestFn.java:494)
at manifold.stream.async.CoreAsyncSink$f__8900$fn__8917$state_machine__5209__auto____8918$fn__8920.invoke(async.clj:145)
at manifold.stream.async.CoreAsyncSink$f__8900$fn__8917$state_machine__5209__auto____8918.invoke(async.clj:145)
at clojure.core.async.impl.ioc_macros$run_state_machine.invoke(ioc_macros.clj:940)
at clojure.core.async.impl.ioc_macros$run_state_machine_wrapped.invoke(ioc_macros.clj:944)
at manifold.stream.async.CoreAsyncSink$f__8900$fn__8917.invoke(async.clj:145)
at clojure.lang.AFn.run(AFn.java:22)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
It has manifold.stream.core, but looks like it should be manifold.stream.seq
I came across this while debugging another problem:
(defn chain-delay-deferred
  [n t]
  (let [f (manifold.deferred/deferred)]
    (future (Thread/sleep t) (manifold.deferred/success! f 100))
    (reduce (fn [d i] (manifold.deferred/chain d (fn [v] (+ v i))))
            f
            (range 0 n))))
@(chain-delay-deferred 100 1000) ;; 5050
@(chain-delay-deferred 100000 1000) ;; hangs
There are some values of n where it hangs on some runs and returns on others. On my machine,
@(chain-delay-deferred 4567 1000)
seems to hang about half the time.
I ran this against the current tip of master (4e1fff70c5380b33e9cc2f50f03ce16af6d393b4), but I was observing the same problem against 0.1.1-alpha3.
Add some way, using consume, to have an on-drained event that is delivered after all messages have been processed by consume. This is needed for logic where running on-drained before all messages in the stream have been processed would not be optimal.
This might be related to #48, but I'm not sure. I was running into a problem where my code kept hitting an OutOfMemory error when it was compiling with lein run (in my IDE, I instead got an error about clojure.core/long not being found; I think there's a subtle difference in how they build). I eventually tracked it down to the length of a (d/chain ...) call inside a core.async go block, so I'm guessing this has to do with the interaction of multiple macros...
I was able to demonstrate it in the repl on manifold 0.1.1-alpha4 (Clojure 1.7.0 and Java 1.8.0_60-b27) by tweaking the example from https://github.com/ztellman/manifold/blob/master/docs/deferred.md#composing-with-deferreds
clj.core=> (def d (d/deferred))
#'clj.core/d
clj.core=> (d/chain d inc inc inc inc inc inc #(println "x + 3 =" %))
<< … >>
clj.core=> (d/success! d 0)
x + 3 = 6
true
That works with even a long chain of functions (hence why I added inc's).
clj.core=> (def d (d/deferred))
#'clj.core/d
clj.core=> (go (d/chain d inc inc inc inc inc inc #(println "x + 3 =" %)))
ExceptionInfo Class not found: clojure.core/long clojure.core/ex-info (core.clj:4593)
clj.core=> (go (d/chain d inc inc inc inc inc #(println "x + 3 =" %)))
ExceptionInfo Class not found: clojure.core/long clojure.core/ex-info (core.clj:4593)
clj.core=> (go (d/chain d inc inc inc inc #(println "x + 3 =" %)))
ExceptionInfo Class not found: clojure.core/long clojure.core/ex-info (core.clj:4593)
clj.core=> (go (d/chain d inc inc inc #(println "x + 3 =" %)))
ExceptionInfo Class not found: clojure.core/long clojure.core/ex-info (core.clj:4593)
clj.core=> (go (d/chain d inc inc #(println "x + 3 =" %)))
ExceptionInfo Class not found: clojure.core/long clojure.core/ex-info (core.clj:4593)
clj.core=> (go (d/chain d inc #(println "x + 3 =" %)))
#object[clojure.core.async.impl.channels.ManyToManyChannel 0x7f78eaab "clojure.core.async.impl.channels.ManyToManyChannel@7f78eaab"]
clj.core=> (d/success! d 0)
x + 3 = 1
true
In this case, it works for three parameters to d/chain and no more.
Unfortunately, I'm fairly new to Clojure, so debugging this is probably beyond my ability. But, I did see that three arguments is the breakpoint for some code in that function, so that's probably a clue...
https://github.com/ztellman/manifold/blob/master/src/manifold/deferred.clj#L922-L925
I'm probably about to write the world's dumbest implementation of d/chain to fix this for my case, but it would be great if we could figure out if this is a core.async issue or a manifold issue and find a way to fix it...
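One possible workaround, sketched here and not verified against manifold 0.1.1-alpha4: since the same d/chain call works at the REPL outside of go, move it into an ordinary function and call that function from the go block, so the go macro never has to process the long d/chain form itself. The helper name add-six is hypothetical, and this only sidesteps the problem; it doesn't say whether the root cause is in core.async or manifold.

```clojure
(require '[manifold.deferred :as d]
         '[clojure.core.async :refer [go <!!]])

;; Hypothetical helper, defined outside any go block: the long d/chain call is
;; compiled as ordinary Clojure, so go only sees a plain function call.
(defn add-six [x]
  (d/chain x inc inc inc inc inc inc))

(def input (d/deferred))
(def ch (go (add-six input)))   ;; the go block's channel receives the chained deferred
(d/success! input 0)
(def result (<!! ch))           ;; take the deferred off the channel
@result                         ;; => 6
```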
I've recently written some retry logic using manifold.deferred and manifold.time that uses exponential backoff to retry asynchronous operations. The current implementation is available here and its tests are here.
Do you think this is something you might like to integrate into manifold? If so, I'd be happy to open a PR and work on it.
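The linked implementation isn't reproduced here, but the general shape of exponential backoff on top of manifold.deferred and manifold.time can be sketched as below. This is an illustration, not the proposed PR: retry-with-backoff and its parameters are hypothetical, and it assumes f is a no-arg function that returns a deferred (rather than throwing synchronously). Each failure is tagged instead of recurring from inside the catch handler, so the retry decision lives in a single chain step.

```clojure
(require '[manifold.deferred :as d]
         '[manifold.time :as mt])

(defn retry-with-backoff
  "Hypothetical helper. Calls f (a no-arg fn returning a deferred) up to
  max-attempts times, waiting base-ms, 2*base-ms, 4*base-ms, ... between
  failed attempts. Returns a deferred with the first successful value, or
  the last error once every attempt has failed."
  [f max-attempts base-ms]
  (d/loop [attempt 1]
    (d/chain
      ;; tag the outcome so success and failure are handled in one place
      (d/catch (d/chain (f) (fn [v] [::ok v]))
               Throwable
               (fn [e] [::error e]))
      (fn [[tag v]]
        (cond
          (= ::ok tag) v
          (< attempt max-attempts)
          ;; wait base-ms * 2^(attempt - 1), then start the next attempt
          (d/chain (mt/in (* base-ms (bit-shift-left 1 (dec attempt)))
                          (constantly nil))
                   (fn [_] (d/recur (inc attempt))))
          :else (d/error-deferred v))))))
```

Tagging keeps the catch handler free of control flow; the cost is one extra vector allocation per attempt.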
Here's what I'm seeing:
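(require '[manifold.stream :as ms])
(def s (ms/stream))
(dotimes [n 10]
  (doto (Thread.
          (fn []
            ;; keep taking until take! yields nil (stream closed and drained)
            (loop []
              (when-let [x @(ms/take! s)]
                (recur)))
            (println (str "Thread " n " ended"))))
    (.start)))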
s ;; << stream: {:pending-puts 0, :drained? false, :buffer-size 0,
;; :permanent? false, :type "manifold", :sink? true, :closed? false,
;; :pending-takes 10, :buffer-capacity 0, :source? true} >>
(ms/close! s) ;; "Thread 0 ended"
s ;; << stream: {:pending-puts 0, :drained? true, :buffer-size 0,
;; :permanent? false, :type "manifold", :sink? true, :closed? false,
;; :pending-takes 9, :buffer-capacity 10, :source? true} >>
Since all 10 threads are blocking on dereferencing the deferred returned by take!, I would expect that once close! is called on the stream, they'd all get nil and each would end. Instead, only one pending take seems to resolve, and the rest just hang.
Bug? Or am I making an incorrect assumption about the way close! works with the take! function? Would the drained? fix you made affect this? I haven't updated to the snapshot build yet, but I could if that might make a difference.
When a Manifold source is created by coercing a core.async channel with ->source, it doesn't become drained when the channel is closed. This behavior differs from what happens with connect and therefore doesn't feel intuitive to me. Could this be a bug, or is there some reason why this is actually the intended behavior?
> (require '[manifold.stream :as s])
nil
> (require '[clojure.core.async :as a])
nil
> (def ch (a/chan))
#'ch
> (def source (s/->source ch))
#'source
> (a/close! ch)
nil
> (s/drained? source)
false
> (s/take! source ::drained)
<< ::drained >>
> (def ch (a/chan))
#'ch
> (def stream (s/stream))
#'stream
> (s/connect ch stream)
nil
> (a/close! ch)
nil
> (s/drained? stream)
true
> (s/take! stream ::drained)
<< ::drained >>
It seems that the semantics of manifold.deferred/catch change depending on whether or not the provided deferred value has already been realized. I would expect this function to work the same way manifold.deferred/timeout! does, always returning a new deferred value. Does this expectation make sense to you? Otherwise it seems that my code would have to check whether the value has been realized, which breaks the pattern.
See my example below. I would expect the final two values to be equivalent. That is, I would expect them both to be deferred values containing "badness".
user> (def d (d/deferred))
#'user/d
user> (def d' (d/catch d Exception #(.getMessage %)))
#'user/d'
user> (d/error! d (Exception. "badness"))
true
user> (d/catch d Exception #(.getMessage %))
"badness"
user> d'
<< "badness" >>
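Until the behavior is settled, one workaround under the semantics described above is to normalize the result: if d/catch hands back a plain value, wrap it with d/success-deferred. catch-deferred is a hypothetical name; d/catch, d/deferred? and d/success-deferred are existing manifold.deferred functions. On versions where d/catch always returns a deferred, the wrapper is a no-op.

```clojure
(require '[manifold.deferred :as d])

(defn catch-deferred
  "Hypothetical wrapper around d/catch that always returns a deferred, even
  when d/catch returns a plain value for an already-realized input."
  [x error-class handler]
  (let [r (d/catch x error-class handler)]
    (if (d/deferred? r)
      r
      (d/success-deferred r))))

(def src (d/deferred))
(d/error! src (Exception. "badness"))
(def handled (catch-deferred src Exception #(.getMessage ^Exception %)))
```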
Given the following:
(require '[clojure.test :refer [deftest is]]
         '[manifold.stream :as s])
(deftest test-connect-filter
  (let [s1 (s/stream)
        s2 (s/stream)]
    (s/connect s1 s2 {:upstream? true})
    (s/close! s2)
    (is (s/closed? s2))
    (is (s/closed? s1))))
the test fails because s1 is still open.
According to the documentation of connect, I would expect both s1 and s2 to be closed after closing s2, even without specifying the upstream? flag, since s2 is the only downstream sink. However, even when :upstream? is true, s1 stays open.
Could you please take a look and tell me whether this is a gross misunderstanding on my side or a bug?
I have a paginated REST API, and I'm trying to shovel all of its data into a stream. Every response from the API contains n maps of data (which go into a stream) and, maybe, a link to the next page. I have something like this:
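(require '[manifold.deferred :as md]
         '[manifold.stream :as ms])
(defn scans!
  []
  (let [urls-stream (ms/stream 100)   ;; absolutely no science here
        scans-stream (ms/stream 1000) ;; nor here
        ;; get-page! (not shown) presumably returns a deferred page map
        shovel (fn [url]
                 (md/chain
                   (get-page! url)
                   (fn [{:keys [pagination scans]}]
                     ;; the last page has no next link, so only enqueue a real URL
                     (when-let [next-url (:next pagination)]
                       (ms/put! urls-stream next-url))
                     (ms/put-all! scans-stream scans))))]
    (ms/consume-async shovel urls-stream)
    scans-stream))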
This somewhat communicates backpressure, in that as soon as scans-stream is full or closed, put-all! will return false, and consume-async will interpret that as time to abort. What I really wanted (I think) is a deferred that fires with true once everything has successfully been put on scans-stream. That would temporarily stop consume-async from consuming another URL, which is the backpressure I was looking for.
I think that means I want something like:
(defn carefully-put!
  "Try putting msg on sink, waiting at most timeout msec. If it fails, try
  again indefinitely. Returns a deferred that fires true once successful."
  [sink msg timeout]
  (md/loop []
    ;; try-put! returns a deferred, so wait for its result before deciding
    (md/chain (ms/try-put! sink msg timeout)
      (fn [success?]
        (if success?
          true
          (md/recur))))))
(defn carefully-put-all!
  "Attempts to put msgs on sink, in order. Returns a deferred that
  fires with true when all msgs have successfully been put onto sink."
  [sink msgs timeout]
  (md/loop [msgs (seq msgs)]
    (if msgs
      ;; wait for each put to succeed before moving on to the next msg
      (md/chain (carefully-put! sink (first msgs) timeout)
        (fn [_] (md/recur (next msgs))))
      true)))
(WARNING: I have not run this; there might be stupid bugs or typos.)
Presumably this needs to be a bit more clever. For example, if the sink is closed, or becomes closed in the meantime, it should stop trying and just return false.
Thoughts?
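The refinement suggested above (stop once the sink is closed) can be sketched by giving ms/try-put! a distinct timeout value through its four-argument arity, so that a timeout, which should be retried, can be told apart from false, which try-put! yields when the put fails outright (for example, because the sink is closed). This is a minimal sketch under those assumptions; ::timeout is an arbitrary sentinel keyword.

```clojure
(require '[manifold.deferred :as md]
         '[manifold.stream :as ms])

(defn carefully-put!
  "Puts msg on sink, retrying every timeout msec. Returns a deferred that
  yields true once the put succeeds, or false if the put fails outright
  (for example, because sink is closed)."
  [sink msg timeout]
  (md/loop []
    (md/chain (ms/try-put! sink msg timeout ::timeout)
      (fn [result]
        (if (identical? ::timeout result)
          (md/recur)   ;; timed out: try again
          result)))))  ;; true on success, false on failure

(defn carefully-put-all!
  "Puts msgs on sink in order. Yields true once all of them are accepted,
  or false as soon as one put fails."
  [sink msgs timeout]
  (md/loop [msgs (seq msgs)]
    (if-not msgs
      true
      (md/chain (carefully-put! sink (first msgs) timeout)
        (fn [ok?]
          (if ok?
            (md/recur (next msgs))
            false))))))
```

Because each put is awaited before the next one starts, the deferred from carefully-put-all! only fires once the sink has accepted every message, which should give consume-async the pause it needs between URLs.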
Based on the stream documentation, the semantics of the drained? function are described as:
Sources that will never produce any more messages (often because the corresponding sink is closed) is said to be drained. We may check whether a source is drained via drained? and on-drained.
I read this to mean a source that is both closed and has no pending puts, i.e. one that will never produce another message because no more can be added and all pending messages have been taken.
However, this is the behavior I'm seeing:
(def a (ms/stream))
(ms/put! a 123)
(ms/put! a 123)
(ms/close! a)
a ;; << stream: {:pending-puts 2, :drained? true, :buffer-size 0,
;; :permanent? false, :type "manifold", :sink? true, :closed? true,
;; :pending-takes 0, :buffer-capacity 0, :source? true} >>
@(ms/take! a) ;; 123
@(ms/take! a) ;; 123
@(ms/take! a) ;; nil
I would expect closed? to return true immediately after the call to close! has resolved, but drained? to return true only after the second call to take! has been resolved.
Am I understanding the intent correctly?
On a fresh checkout of manifold, lein codox fails due to a missing plugin ('codox' is not a task. See 'lein help'.). I don't know whether that's intended to work, but there's a codox entry in :plugins, so I'm guessing it is. I was able to build the documentation by switching codox to lein-codox and updating the version to one available on Clojars, as follows:
diff --git i/project.clj w/project.clj
index a2dc4d3..5cb811b 100644
--- i/project.clj
+++ w/project.clj
@@ -16,7 +16,7 @@
:benchmark :benchmark
:stress #(or (:stress %) (= :stress (:tag %)))
:all (constantly true)}
- :plugins [[codox "0.8.10"]
+ :plugins [[lein-codox "0.9.4"]
[lein-jammin "0.1.1"]
[ztellman/lein-cljfmt "0.1.10"]]
:cljfmt {:indents {#".*" [[:inner 0]]}}
(after which lein codox worked fine)
One of deferred's critical features is that it knows how to represent errors. However, chain, one of the most common tools (at least for me) for working with deferreds, doesn't document what happens when its input deferred contains an error, or when any of the intermediate fns throws.
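For reference, here is a sketch of how chain behaves with errors in recent manifold releases (worth confirming against the version in use). An error in the input skips every fn. An exception thrown by one step skips the remaining steps. In both cases the error surfaces on the deferred returned by chain, where d/catch can handle it. describe-error is a hypothetical helper.

```clojure
(require '[manifold.deferred :as d])

(defn describe-error [e]
  [:caught (.getMessage ^Exception e)])

;; 1. The input deferred is already an error: none of the fns passed to
;;    chain run, and the returned deferred carries the same error.
(def from-input
  (-> (d/error-deferred (ex-info "input failed" {}))
      (d/chain inc inc)
      (d/catch Exception describe-error)))

;; 2. An intermediate fn throws: the remaining fns are skipped, and the
;;    exception becomes the error of the returned deferred.
(def from-step
  (-> (d/success-deferred 1)
      (d/chain inc
               (fn [_] (throw (ex-info "step failed" {})))
               inc)
      (d/catch Exception describe-error)))

@from-input ;; => [:caught "input failed"]
@from-step  ;; => [:caught "step failed"]
```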
I have a very simple app (fewer than 200 lines of code), and I thought Manifold would be a good way to set up a simple job queue, but when I run the app, after about 15 to 30 minutes I get this error:
Aug 10, 2015 11:37:19 PM clojure.tools.logging$eval36$fn__40 invoke
WARNING: excessive pending puts (> 16384), closing stream
java.lang.IllegalStateException
at manifold.stream.default.Stream.put(default.clj:111)
at manifold.stream.default.Stream.put(default.clj:139)
at eh.streams$source_worker$fn__141.invoke(streams.clj:58)
at eh.streams$source_worker.invoke(streams.clj:55)
I have a "start" function that spins up 5 "source" workers and 5 "destination" workers:
(defn start
[map-of-config-info]
(dotimes [_ 5]
(future (source-worker map-of-config-info))
(future (destination-worker))))
The source-worker pulls records out of Redis, wraps each one in an anonymous function that does some processing, and then puts that function on a stream. The destination-worker simply pulls the functions off the stream and executes them:
(defn destination-worker []
(loop [event-fn (stream/take! event-stream)]
(@event-fn)
(recur (stream/take! event-stream))))
I'm guessing the destination-workers run slowly because they do all the work, whereas the source-workers do very little. So, over the course of 20 minutes or so, the put!s slowly build up in excess of the take!s, and I get this error? How could I test that?
I will experiment with having more destination-workers and fewer source-workers, though that seems a bit inexact.
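One way to test that guess, sketched under the assumption that each producer runs on its own thread (as the futures above do): have the producer deref the deferred returned by put!, so it waits until the stream accepts each message instead of piling up pending puts, and watch the :pending-puts entry of manifold.stream/description. blocking-producer and pending-puts are hypothetical helper names.

```clojure
(require '[manifold.stream :as stream])

(defn blocking-producer
  "Hypothetical producer: derefs each put!, so the calling thread waits
  until the stream accepts a message before producing the next one."
  [s items]
  (doseq [x items]
    @(stream/put! s x)))

(defn pending-puts
  "Hypothetical helper: the :pending-puts count from stream/description."
  [s]
  (:pending-puts (stream/description s)))

;; Usage: one producer on its own thread, a buffer of 5, nobody taking yet.
(def events (stream/stream 5))
(def producer (future (blocking-producer events (range 20))))
(Thread/sleep 100)
(pending-puts events) ;; at most 1: the single producer is parked on a full buffer
```

With this shape, a slow consumer slows the producers down instead of triggering the "excessive pending puts" error; the pending-puts count should never exceed the number of producer threads.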
The documentation says that bus/publish! "publishes a message on the bus, returning a deferred result representing the message being accepted by all subscribers."
That's not what I see when testing it: if an external consumer fails to receive a message, the function still returns a deferred true.
I've been digging through the logic of the take implementation in default.clj while debugging #27, and I may have found an unrelated bug at https://github.com/ztellman/manifold/blob/master/src/manifold/stream/default.clj#L176:
;; add to the consumers queue
(if (and timeout (<= timeout 0)) ;; Should be (>= timeout 0) here?
(d/success-deferred timeout-val executor)
(let [d (d/deferred executor)]
(d/timeout! d timeout timeout-val)
(let [c (Consumer. d default-val)]
(if (.offer consumers c)
d
c))))
Unless I'm missing something (very possible!), a valid timeout value should always be some positive integer. As such, timeout will never validly evaluate to <= 0, and this (and ...) check will always take the false path.
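For context, one reading of the quoted branch is that it serves as a fast path for a non-positive timeout, such as a try-take! with a timeout of 0 on an empty stream. In that case no consumer is queued and timeout-val is returned immediately, while a nil timeout skips the branch and the take waits indefinitely. A quick REPL sketch of that reading, using manifold.stream/try-take!'s four-argument arity (source, default-val, timeout, timeout-val); ::drained and ::timeout are arbitrary sentinel keywords:

```clojure
(require '[manifold.stream :as s])

;; Timeout of 0 on an empty, open stream: this reaches the
;; (<= timeout 0) branch, which returns the timeout value right away.
(def immediate (s/try-take! (s/stream) ::drained 0 ::timeout))

;; A closed stream with nothing left yields default-val instead.
(def closed-stream (doto (s/stream) s/close!))
(def drained (s/try-take! closed-stream ::drained 0 ::timeout))
```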
I updated to aleph 0.4.1-beta6, which pulls in manifold 0.1.4-alpha1.
With the old version:
(defn- send!
[socket data]
(let [rc @(stream/put! socket (codec/encode data))]
(log/info "RC:" rc)
rc))
This function (where socket is a websocket stream) returns true or false depending on whether the message makes it through.
With 0.1.4-alpha, it now returns nil.
Bug?