
streaming's Introduction


Contents

§ 1. The freely generated stream on a streamable functor

§ 2. A freely generated stream of individual Haskell values is a Producer, Generator or Source

§ 3. Streaming.Prelude

§ 4. Mother's Prelude v. Streaming.Prelude

§ 5. How come there's not one of those fancy "ListT done right" implementations in here?

§ 6. Didn't I hear that free monads are a dog from the point of view of efficiency?

§ 7. Interoperation with the streaming-io libraries

§ 8. Where can I find examples of use?

§ 9. Problems

§ 10. Implementation and benchmarking notes


§ 1. The freely generated stream on a streamable functor

Stream can be used wherever FreeT or Coroutine are used. The compiler's standard range of optimizations works better for operations written in terms of Stream. Stream f m r, like FreeT f m r or Coroutine f m r, is of course extremely general, and many functor-general combinators are exported by the general module Streaming.

In the applications we are thinking of, the general type Stream f m r expresses a succession of steps arising in a monad m, with a shape determined by the 'functor' parameter f, and resulting in a final value r. In the first instance you might read Stream as Repeatedly, with the understanding that one way of doing something some number of times is to do it no times at all.

Readings of f can be wildly various. Thus, for example,

 Stream Identity IO r

is the type of an indefinitely delayed IO r, or an extended IO process broken into stages marked by the Identity constructor. This is the Trampoline type of the "Coroutine Pipelines" tutorial, and the IterT of the free library (which is mysteriously not identified with FreeT Identity - all of the associated combinators are found within the general Streaming module.)

In particular, though, given readings of f and m we can, for example, always consider the type Stream (Stream f m) m r, in which steps of the form Stream f m are joined end to end. Such a stream-of-streams might arise in any number of ways; a crude (because hyper-general) way would be with

chunksOf :: (Monad m, Functor f) => Int -> Stream f m r -> Stream (Stream f m) m r

and we can always rejoin such a stream with

concats :: (Monad m, Functor f) => Stream (Stream f m) m r -> Stream f m r

But other things can be chunked and concatenated in that sense; they need not themselves be explicitly represented in terms of Stream; indeed chunksOf and concats are modeled on those in pipes-group. In our variant of pipes-group, these have the types

chunksOf :: Monad m => Int -> Producer a m r -> Stream (Producer a m) m r
concats ::  Monad m =>  Stream (Producer a m) m r -> Producer a m r
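As a rough, self-contained illustration of the functor-general pair, here is a sketch that does not import streaming at all. It re-declares Of and a toy Stream with the three constructors given in the implementation notes (§ 10). Every function below (bindS, splitsAt, chunksOf, concats, each, collect) is a local stand-in written for this example, not the library's code. chunksOf turns each run of n steps into one outer step whose result is the rest of the stream. concats runs each inner stream and continues with whatever it returns.

```haskell
-- Toy stand-ins for illustration only; these are not the streaming library's definitions.
data Of a r = !a :> r

instance Functor (Of a) where
  fmap f (a :> r) = a :> f r

data Stream f m r
  = Return r
  | Step !(f (Stream f m r))
  | Effect (m (Stream f m r))

-- Substitute a continuation for the final result (what >>= would do).
bindS :: (Functor f, Functor m) => Stream f m a -> (a -> Stream f m b) -> Stream f m b
bindS s k = go s
  where
    go (Return a) = k a
    go (Step fs)  = Step (fmap go fs)
    go (Effect m) = Effect (fmap go m)

instance (Functor f, Functor m) => Functor (Stream f m) where
  fmap g s = bindS s (Return . g)

-- Pass through n Step layers, then return the untouched remainder as the result.
splitsAt :: (Functor f, Functor m) => Int -> Stream f m r -> Stream f m (Stream f m r)
splitsAt n s | n <= 0 = Return s
splitsAt _ (Return r) = Return (Return r)
splitsAt n (Effect m) = Effect (fmap (splitsAt n) m)
splitsAt n (Step fs)  = Step (fmap (splitsAt (n - 1)) fs)

-- Every outer step is an inner stream of at most n steps that returns the rest.
chunksOf :: (Functor f, Functor m) => Int -> Stream f m r -> Stream (Stream f m) m r
chunksOf n = go
  where
    size = max 1 n -- n <= 0 would otherwise produce empty chunks forever
    go (Return r) = Return r
    go (Effect m) = Effect (fmap go m)
    go s          = Step (fmap go (splitsAt size s))

-- Join the chunks end to end: run each inner stream, then continue with its result.
concats :: (Functor f, Functor m) => Stream (Stream f m) m r -> Stream f m r
concats (Return r)   = Return r
concats (Effect m)   = Effect (fmap concats m)
concats (Step inner) = bindS inner concats

-- Helpers for trying it out with individual values.
each :: [a] -> Stream (Of a) m ()
each = foldr (\a rest -> Step (a :> rest)) (Return ())

collect :: Monad m => Stream (Of a) m r -> m ([a], r)
collect (Return r) = return ([], r)
collect (Effect m) = m >>= collect
collect (Step (a :> rest)) = do
  (xs, r) <- collect rest
  return (a : xs, r)
```

Chunking each [1 .. 7] by 3 this way gives the inner streams 1, 2, 3, then 4, 5, 6, then 7. concats restores the original sequence.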

§ 2. A freely generated stream of individual Haskell values is a Producer, Generator or Source

Of course, as soon as you grasp the general form of succession you are already in possession of the most basic concrete form: a simple succession of individual Haskell values one after another, the effectful list or sequence. This is just Stream ((,) a) m r. Here we prefer to write Stream (Of a) m r, strictifying the left element of the pair with

data Of a r = !a :> r deriving Functor

Either way, the pairing just links the present element with the rest of the stream. The primitive yield statement just expresses the pairing of the yielded item with the rest of the stream; or rather it is itself the trivial singleton stream.

yield 17  :: Stream (Of Int) IO ()
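The pairing can be written out by hand. This minimal sketch re-declares Of and a toy Stream rather than importing the library, so yield and each here are illustrative reconstructions. collect is a hypothetical helper that runs the effects and gathers the elements together with the final result.

```haskell
-- Toy re-declarations for illustration; not imported from the streaming package.
data Of a r = !a :> r

instance Functor (Of a) where
  fmap f (a :> r) = a :> f r

data Stream f m r
  = Return r                   -- the stream has ended with result r
  | Step !(f (Stream f m r))   -- one functorial step, e.g. an element paired with the rest
  | Effect (m (Stream f m r))  -- an action in m that produces the rest of the stream

-- The trivial singleton stream: one element paired with an empty remainder.
yield :: a -> Stream (Of a) m ()
yield a = Step (a :> Return ())

-- Link the elements of a list into a stream, one pairing per element.
each :: [a] -> Stream (Of a) m ()
each = foldr (\a rest -> Step (a :> rest)) (Return ())

-- Run the effects, collecting the elements together with the final result.
collect :: Monad m => Stream (Of a) m r -> m ([a], r)
collect (Return r) = return ([], r)
collect (Effect m) = m >>= collect
collect (Step (a :> rest)) = do
  (xs, r) <- collect rest
  return (a : xs, r)
```

Under these definitions yield 17 is just Step (17 :> Return ()), and each strings such pairs together.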

Streaming.Prelude is focused on the manipulation of this all-important stream-form, which appears in the streaming IO libraries under titles like:

io-streams: Generator a r
pipes:      Producer a m r
conduit:    ConduitM () o m r
streaming:  Stream (Of a) m r

The only difference is that in streaming the simple generator or producer concept is formulated explicitly in terms of the general concept of successive connection. But this is a concept you need and already possess anyway, as your comprehension of the streaming ABCs showed.

The special case of a stream of individual Haskell values that simply comes to an end without a special result is variously expressed thus:

io-streams: InputStream a 
pipes:      Producer a m ()
conduit:    Source m a
machines:   SourceT m a (= forall k. MachineT m k a)
streaming:  Stream (Of a) m ()

Note that the above libraries generally employ elaborate systems of type synonyms in order to intimate to the reader the meaning of specialized forms; io-streams is an exception. This library, streaming, is completely opposed to this tendency, and exports no synonyms.

§ 3. Streaming.Prelude

Streaming.Prelude closely follows Pipes.Prelude. But since it restricts itself to use only of the general idea of streaming, it cleverly omits the pipes:

ghci> S.stdoutLn $ S.take 2 S.stdinLn
let's<Enter>
let's
stream<Enter>
stream

Here's a little connect and resume, as the streaming-io experts call it:

ghci> rest <- S.print $ S.splitAt 3 $ S.each [1..10]
1
2
3
ghci> S.sum rest
49

Somehow, we didn't even need a four-character operator for that, nor advice about best practices! - just ordinary Haskell common sense.
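The trick is that splitting returns the unconsumed remainder as the stream's final result, and S.print passes that result through. A toy re-creation follows. splitAtS, printEach and sumStream are hypothetical names for local stand-ins on a re-declared Stream type, not the library's implementations.

```haskell
-- Toy stand-ins written for this illustration; not the library's implementations.
data Of a r = !a :> r

data Stream f m r
  = Return r
  | Step !(f (Stream f m r))
  | Effect (m (Stream f m r))

each :: [a] -> Stream (Of a) m ()
each = foldr (\a rest -> Step (a :> rest)) (Return ())

-- Pass n elements through; the untouched remainder becomes the final result.
splitAtS :: Functor m => Int -> Stream (Of a) m r -> Stream (Of a) m (Stream (Of a) m r)
splitAtS n s | n <= 0 = Return s
splitAtS _ (Return r) = Return (Return r)
splitAtS n (Effect m) = Effect (fmap (splitAtS n) m)
splitAtS n (Step (a :> rest)) = Step (a :> splitAtS (n - 1) rest)

-- Print every element, then return whatever the stream returns.
printEach :: Show a => Stream (Of a) IO r -> IO r
printEach (Return r) = return r
printEach (Effect m) = m >>= printEach
printEach (Step (a :> rest)) = print a >> printEach rest

-- Sum the elements with a strict accumulator, discarding the final result.
sumStream :: (Monad m, Num a) => Stream (Of a) m r -> m a
sumStream = go 0
  where
    go acc (Return _) = return acc
    go acc (Effect m) = m >>= go acc
    go acc (Step (a :> rest)) = let acc' = acc + a in acc' `seq` go acc' rest
```

With these, rest <- printEach (splitAtS 3 (each [1 .. 10])) prints 1, 2 and 3. sumStream rest then returns 49, as in the session above.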

§ 4. Mother's Prelude v. Streaming.Prelude

The effort of Streaming.Prelude is to leverage the intuition the user has acquired in mastering Prelude and Data.List and to elevate her understanding into a general comprehension of effectful streaming transformations. Unsurprisingly, it takes longer to type out the signatures. It cannot be emphasized enough, though, that the transpositions are totally mechanical:

Data.List.Split.chunksOf :: Int -> [a]          -> [[a]]
Streaming.chunksOf       :: Int -> Stream f m r -> Stream (Stream f m) m r

Prelude.splitAt   :: Int -> [a]          -> ([a],[a])
Streaming.splitAt :: Int -> Stream f m r -> Stream f m (Stream f m r)

These concepts are "functor general", in the jargon used in the documentation, and are thus exported by the main Streaming module. Something like break requires us to inspect individual values for their properties, so it is found in Streaming.Prelude:

Prelude.break           :: (a -> Bool) -> [a]               -> ([a],[a])
Streaming.Prelude.break :: (a -> Bool) -> Stream (Of a) m r -> Stream (Of a) m (Stream (Of a) m r)

It is easy to prove that resistance to these types is resistance to effectful streaming itself. I will labor this point a bit more below, but you can also find it developed, with greater skill, in the documentation for the pipes libraries.
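To show how mechanical the transposition is, here is break on a toy re-declaration of Stream (Of a) m r. breakS, each and collect are hypothetical local names, not Streaming.Prelude's code. Where Prelude.break returns two lists, this version streams the prefix and returns the remainder, untouched, as its result.

```haskell
-- Toy stand-ins for illustration; not Streaming.Prelude's definitions.
data Of a r = !a :> r

data Stream f m r
  = Return r
  | Step !(f (Stream f m r))
  | Effect (m (Stream f m r))

each :: [a] -> Stream (Of a) m ()
each = foldr (\a rest -> Step (a :> rest)) (Return ())

collect :: Monad m => Stream (Of a) m r -> m ([a], r)
collect (Return r) = return ([], r)
collect (Effect m) = m >>= collect
collect (Step (a :> rest)) = do
  (xs, r) <- collect rest
  return (a : xs, r)

-- Pass elements through until one satisfies p; that element and everything after it
-- are returned, unconsumed, as the final result.
breakS :: Functor m => (a -> Bool) -> Stream (Of a) m r -> Stream (Of a) m (Stream (Of a) m r)
breakS p = go
  where
    go (Return r) = Return (Return r)
    go (Effect m) = Effect (fmap go m)
    go s@(Step (a :> rest))
      | p a       = Return s
      | otherwise = Step (a :> go rest)
```

Collecting both halves of breakS (> 3) (each [1 .. 6]) gives the same pair of lists as Prelude.break (> 3) [1 .. 6]. The difference is that nothing after the prefix is touched until the caller asks for it.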

§ 5. How come there's not one of those fancy "ListT done right" implementations in here?

The use of the final return value appears to be a complication, but in fact it is essentially contained in the idea of effectful streaming. This is why this library does not export a "ListT done right", which would be simple enough - following pipes, as usual:

newtype ListT m a = ListT (Stream (Of a) m ())

The associated monad instance would wrap

yield :: (Monad m)            => a -> Stream (Of a) m ()
for   :: (Monad m, Functor f) => Stream (Of a) m r -> (a -> Stream f m ()) -> Stream f m r

To see the trouble, consider this signature for splitting a ListT very much done right. Here's what becomes of chunksOf. As long as we are trapped in some sort of ListT, however much rightly implemented, these operations can't be made to stream; something like a list must be accumulated. Similarly, try to imagine adding a splitAt or lines function to this API. It would accumulate strict text forever, just as this does and this doesn't and this doesn't. The difference is simply that the latter libraries operate with the general concept of streaming, and the whole implementation is governed by it. The attractions of the various "ListT done right" implementations are superficial; the concept belongs to logic programming, not stream programming.

Note similarly that you can write a certain kind of take and drop with the machines library - as you can even with a "ListT done right". But I wish you luck writing splitAt! Similarly you can write a getContents; but I wish you luck dividing the resulting bytestream on its lines. This is - as usual! - because the library was not written with the general concept of effectful succession or streaming in view. Materials for sinking some elements of a stream in one way, and others in other ways - copying each line to a different file, as it might be, but without accumulation - are documented within. So are myriad other elementary operations of streaming io.

§ 6. Didn't I hear that free monads are a dog from the point of view of efficiency?

We noted above that if we instantiate Stream f m r to Stream ((,) a) m r or the like, we get the standard idea of a producer or generator. If it is instantiated to Stream Identity m r then we have the standard free monad construction. This construction is subject to certain familiar objections from an efficiency perspective; efforts have been made to substitute exotic CPS-ed implementations and so forth. It is an interesting topic.

But in fact, the standard alarmist talk about retraversing binds, quadratic explosions, costly appends and so on becomes transparent nonsense with Stream f m r in its streaming use. The conceptual power needed to see this is basically nil: where m is read as IO, or some transformed IO, the dreaded retraversing of the binds in a stream expression would involve repeating all the past actions. Don't worry: to get, e.g., the second chunk of bytes from a handle, you won't need to start over and get the first one again! The first chunk has vanished into an unrepeatable past.

All of the difficulties a streaming library is attempting to avoid are concentrated in the deep irrationality of

sequence :: (Monad m, Traversable t) => t (m a) -> m (t a)

In the streaming context, this becomes

sequence :: (Monad m, Functor f) => Stream f m r -> Stream f m r
sequence = id

It is of course easy enough to define

accumulate :: (Monad m, Traversable f) => Stream f m r -> m (Stream f Identity r)

or reifyBindsRetraversingWherePossible, or _ICan'tTakeThisStreamingAnymore, as you might call it. The types themselves teach the user how to avoid or control the sort of accumulation characteristic of sequence in its various guises, e.g. mapM f = sequence . map f, traverse f = sequence . fmap f and replicateM n = sequence . replicate n. See for example the types of

Control.Monad.replicateM :: Int -> m a -> m [a]
Streaming.Prelude.replicateM :: Int -> m a -> Stream (Of a) m ()
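The difference shows up in a small sketch on a toy Stream type. replicateS, accumulate and collect are hypothetical names here, not the library's definitions. replicateS emits each result as soon as its action has run. accumulate runs every effect before anything can be consumed, which is the sequence-like behaviour described above. accumulate is specialized here to Of a, because the effects sitting under the functor have to be pulled out, and a bare Functor constraint does not allow that in general.

```haskell
import Data.Functor.Identity (Identity (..))

-- Toy stand-ins for illustration; not the streaming library's definitions.
data Of a r = !a :> r

data Stream f m r
  = Return r
  | Step !(f (Stream f m r))
  | Effect (m (Stream f m r))

-- Streaming replicateM: each result is emitted as soon as its action has run.
replicateS :: Monad m => Int -> m a -> Stream (Of a) m ()
replicateS n act
  | n <= 0    = Return ()
  | otherwise = Effect $ do
      a <- act
      return (Step (a :> replicateS (n - 1) act))

-- Run every effect up front, leaving an effect-free stream (the sequence-like shape).
accumulate :: Monad m => Stream (Of a) m r -> m (Stream (Of a) Identity r)
accumulate (Return r) = return (Return r)
accumulate (Effect m) = m >>= accumulate
accumulate (Step (a :> rest)) = do
  rest' <- accumulate rest
  return (Step (a :> rest'))

collect :: Monad m => Stream (Of a) m r -> m ([a], r)
collect (Return r) = return ([], r)
collect (Effect m) = m >>= collect
collect (Step (a :> rest)) = do
  (xs, r) <- collect rest
  return (a : xs, r)
```

Pulling just the first element out of replicateS 5 act runs act once. Doing the same after accumulate has already run it five times.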

If you want to tempt fate and replicate the irrationality of Control.Monad.replicateM, then sure, you can define the hermaphroditic chimera

accumulate . Streaming.Prelude.replicateM :: Int -> m a -> m (Stream (Of a) Identity ())

which is what we find in our diseased base libraries. But once you know how to operate with a stream directly you will see less and less point in what is called extracting the (structured) value from IO. Consider the apparently innocent distinction between

"getContents" :: String

and

getContents :: IO String 

Omitting consideration of eof, we might define getContents thus

getContents = sequence $ repeat getChar

There it is again! The very devil! By contrast there is no distinction between

"getContents" :: Stream (Of Char) m ()  -- the IsString instance is monad-general

and

getContents :: MonadIO m => Stream (Of Char) m ()

They unify just fine. That is, if I make the type synonym

type String m r = Stream (Of Char) m r

I get, for example:

"getLine"                              :: String m  ()
getLine                                :: String IO ()
"getLine" >> getLine                   :: String IO ()
splitAt 20 $ "getLine" >> getLine      :: String IO (String IO ())
length $ "getLine" >> getLine          :: IO Int

and can dispense with half the advice they will give you on #haskell. It is only a slight exaggeration to say that a stream should never be "extracted from IO".
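Here is a self-contained sketch of that unification under stated assumptions. It uses a toy Stream with hand-written Functor, Applicative and Monad instances and an IsString instance of our own, which is not necessarily the library's exact instance. The a ~ Char and r ~ () constraints let a bare literal be used with >> before its element and result types are known. lengthS, collect and getLineS are hypothetical names.

```haskell
{-# LANGUAGE FlexibleInstances, TypeFamilies, OverloadedStrings #-}
import Prelude hiding (String)
import Data.String (IsString (..))

-- Toy re-declarations for this sketch; not the streaming library's code.
data Of a r = !a :> r

instance Functor (Of a) where
  fmap f (a :> r) = a :> f r

data Stream f m r
  = Return r
  | Step !(f (Stream f m r))
  | Effect (m (Stream f m r))

instance (Functor f, Functor m) => Functor (Stream f m) where
  fmap g = go
    where
      go (Return r) = Return (g r)
      go (Step fs)  = Step (fmap go fs)
      go (Effect m) = Effect (fmap go m)

instance (Functor f, Functor m) => Applicative (Stream f m) where
  pure = Return
  sf <*> sx = sf >>= \g -> fmap g sx

instance (Functor f, Functor m) => Monad (Stream f m) where
  s >>= k = go s
    where
      go (Return r) = k r
      go (Step fs)  = Step (fmap go fs)
      go (Effect m) = Effect (fmap go m)

-- The synonym from the text (Prelude's String is hidden above).
type String m r = Stream (Of Char) m r

-- Equality constraints let a bare literal fix its element and result types.
instance (a ~ Char, r ~ (), Functor m) => IsString (Stream (Of a) m r) where
  fromString = foldr (\c rest -> Step (c :> rest)) (Return ())

-- A line read from stdin, presented as a stream of characters.
getLineS :: String IO ()
getLineS = Effect (fmap fromString getLine)

lengthS :: Monad m => Stream (Of a) m r -> m Int
lengthS = go 0
  where
    go n (Return _) = return n
    go n (Effect m) = m >>= go n
    go n (Step (_ :> rest)) = let n' = n + 1 in n' `seq` go n' rest

collect :: Monad m => Stream (Of a) m r -> m ([a], r)
collect (Return r) = return ([], r)
collect (Effect m) = m >>= collect
collect (Step (a :> rest)) = do
  (xs, r) <- collect rest
  return (a : xs, r)
```

With these definitions, "getLine" >> getLineS and "getLine" >> "!" have the same type, String IO (). lengthS of the latter is 8.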

With sequence and traverse, we accumulate a pure succession of pure values from a pure succession of monadic values. Why bother if you have an intrinsically monadic conception of succession or traversal? Stream f m r gives you an immense body of such structures and a simple discipline for working with them. Sprinkle id freely through your program, under various names, if you get homesick for sequence and company.

§ 7. Interoperation with the streaming-io libraries

The simplest form of interoperation with pipes is accomplished with this isomorphism:

Pipes.unfoldr Streaming.next        :: Stream (Of a) m r   -> Producer a m r
Streaming.unfoldr Pipes.next        :: Producer a m r      -> Stream (Of a) m r                     
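To see why these compositions are mutually inverse (up to extra effect layers), here is a toy sketch. A second Stream (Of a) m r stands in for pipes' Producer a m r, so no pipes import is needed. next and unfoldr below are local re-implementations written to match the shapes used above. They are not copied from either library.

```haskell
-- Toy stand-ins for illustration; neither pipes' nor streaming's actual code.
data Of a r = !a :> r

data Stream f m r
  = Return r
  | Step !(f (Stream f m r))
  | Effect (m (Stream f m r))

-- Peel off one element, running any effects that precede it.
next :: Monad m => Stream (Of a) m r -> m (Either r (a, Stream (Of a) m r))
next (Return r) = return (Left r)
next (Effect m) = m >>= next
next (Step (a :> rest)) = return (Right (a, rest))

-- Rebuild a stream from a seed and a step function that may end with a result r.
unfoldr :: Monad m => (s -> m (Either r (a, s))) -> s -> Stream (Of a) m r
unfoldr step = go
  where
    go s = Effect $ do
      e <- step s
      return $ case e of
        Left r        -> Return r
        Right (a, s') -> Step (a :> go s')

each :: [a] -> Stream (Of a) m ()
each = foldr (\a rest -> Step (a :> rest)) (Return ())

collect :: Monad m => Stream (Of a) m r -> m ([a], r)
collect (Return r) = return ([], r)
collect (Effect m) = m >>= collect
collect (Step (a :> rest)) = do
  (xs, r) <- collect rest
  return (a : xs, r)
```

unfoldr next s yields the same elements and result as s, with one extra Effect layer per step.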

Of course, streaming can be mixed with pipes wherever pipes itself employs Control.Monad.Trans.Free; speedups are frequently appreciable. (This was the original purpose of the main Streaming module, which just mechanically transposes a simple optimization employed in Pipes.Internal.) Interoperation with io-streams is thus:

Streaming.reread IOStreams.read     :: InputStream a       -> Stream (Of a) IO ()
IOStreams.unfoldM Streaming.uncons  :: Stream (Of a) IO () -> IO (InputStream a)

A simple exit to conduit would be, e.g.:

Conduit.unfoldM Streaming.uncons    :: Stream (Of a) m ()  -> Source m a

These conversions should never be more expensive than a single >-> or =$=.

At a much more general level, we also of course have interoperation with free:

Free.iterTM  Stream.wrap              :: FreeT f m a -> Stream f m a
Stream.iterTM Free.wrap               :: Stream f m a -> FreeT f m a 
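Here is a self-contained sketch of the two directions. It uses the Either-based approximation of FreeT given in the implementation notes rather than the free package's own type. fromFreeT and toFreeT are hypothetical names standing in for the iterTM/wrap compositions above.

```haskell
-- Toy stand-ins; FreeT here is the approximation from the implementation notes,
-- not the free package's definition.
data Of a r = !a :> r

instance Functor (Of a) where
  fmap f (a :> r) = a :> f r

data Stream f m r
  = Return r
  | Step !(f (Stream f m r))
  | Effect (m (Stream f m r))

newtype FreeT f m r = FreeT { runFreeT :: m (Either r (f (FreeT f m r))) }

-- Every FreeT layer is an action; it becomes an Effect around a Return or a Step.
fromFreeT :: (Functor f, Functor m) => FreeT f m r -> Stream f m r
fromFreeT (FreeT m) = Effect (fmap layer m)
  where
    layer (Left r)   = Return r
    layer (Right fs) = Step (fmap fromFreeT fs)

-- Pure constructors get a trivial action; Effect layers are merged into the next one.
toFreeT :: (Functor f, Monad m) => Stream f m r -> FreeT f m r
toFreeT (Return r) = FreeT (return (Left r))
toFreeT (Step fs)  = FreeT (return (Right (fmap toFreeT fs)))
toFreeT (Effect m) = FreeT (m >>= runFreeT . toFreeT)

each :: [a] -> Stream (Of a) m ()
each = foldr (\a rest -> Step (a :> rest)) (Return ())

collect :: Monad m => Stream (Of a) m r -> m ([a], r)
collect (Return r) = return ([], r)
collect (Effect m) = m >>= collect
collect (Step (a :> rest)) = do
  (xs, r) <- collect rest
  return (a : xs, r)
```

Note how toFreeT has to invent a return for every pure step, while fromFreeT wraps every layer in an Effect. The Stream representation can keep pure and effectful layers apart.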

§ 8. Where can I find examples of use?

For some simple ghci examples, see the commentary throughout the Prelude module. For slightly more advanced usage see the commentary in the haddocks of streaming-bytestring and e.g. these replicas of shell-like programs from the io-streams tutorial. Here's a simple streaming GET request with intrinsically streaming byte streams. Here is a comically simple 'high - low' game.

§ 9. Problems

Questions about this library can be put as issues through the github site or on the pipes mailing list. (This library understands itself as part of the pipes "ecosystem.")

§ 10. Implementation and benchmarking notes

This library defines an optimized FreeT with an eye to use with streaming libraries, namely:

data Stream f m r
     = Return r
     | Step !(f (Stream f m r))
     | Effect (m (Stream f m r))

in place of the standard FreeT that we find in the free library, which is approximately:

newtype FreeT f m r = FreeT {runFreeT :: m (Either r (f (FreeT f m r)))}

Rather than wrapping each step in a monadic 'layer', such a layer is put alongside separate 'pure' constructors for a functor 'layer' and a final return value. The maneuver is very friendly to the compiler, but requires a bit of subtlety to protect a sound monad instance. Just such an optimization is adopted internally by the pipes library. As in pipes, the constructors are here left in an Internal module; the main Streaming module exports the type itself and various operations and instances.
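For illustration, here is one way to write the Functor, Applicative and Monad instances for that data type. It is a self-contained sketch rather than the library's code, and yield and collect are hypothetical helpers. Binding substitutes the continuation for every Return and leaves Step and Effect layers where they are. As we read the remark about subtlety, raw constructors can tell apart streams that ought to be equal. Effect (return s) and s are different terms that produce the same steps and effects once run, so the monad laws hold only up to that equivalence. That is presumably why direct access to the constructors is kept out of the main module.

```haskell
-- Self-contained sketch; these are not the streaming library's definitions.
data Of a r = !a :> r

instance Functor (Of a) where
  fmap f (a :> r) = a :> f r

data Stream f m r
  = Return r
  | Step !(f (Stream f m r))
  | Effect (m (Stream f m r))

-- fmap touches only the final result; steps and effects are rebuilt around it.
instance (Functor f, Functor m) => Functor (Stream f m) where
  fmap g = go
    where
      go (Return r) = Return (g r)
      go (Step fs)  = Step (fmap go fs)
      go (Effect m) = Effect (fmap go m)

instance (Functor f, Functor m) => Applicative (Stream f m) where
  pure = Return
  sf <*> sx = sf >>= \g -> fmap g sx

-- (>>=) replaces every Return r with k r, leaving Step and Effect layers in place.
instance (Functor f, Functor m) => Monad (Stream f m) where
  s >>= k = go s
    where
      go (Return r) = k r
      go (Step fs)  = Step (fmap go fs)
      go (Effect m) = Effect (fmap go m)

yield :: a -> Stream (Of a) m ()
yield a = Step (a :> Return ())

collect :: Monad m => Stream (Of a) m r -> m ([a], r)
collect (Return r) = return ([], r)
collect (Effect m) = m >>= collect
collect (Step (a :> rest)) = do
  (xs, r) <- collect rest
  return (a : xs, r)
```

Because >>= only rebuilds the spine it walks, the layers themselves stay plain data that the compiler can see through.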

I ran a simple benchmark (adjusting a script of John Wiegley) using a very simple composition of functions:

toList 
. filter (\x -> x `mod` 2 == 0) 
. map (+1) 
. drop 1000 
. map (+1) 
. filter even 
. each

as interpreted by various libraries: streaming, conduit, pipes, io-streams and machines.

The results were fairly pleasing:

benchmarking sum/streaming
time                 8.996 ms   (8.910 ms .. 9.068 ms)
                     0.999 R²   (0.998 R² .. 1.000 R²)
mean                 9.060 ms   (9.004 ms .. 9.122 ms)
std dev              164.6 μs   (123.9 μs .. 251.9 μs)

benchmarking sum/conduit
time                 15.77 ms   (15.66 ms .. 15.89 ms)
                     0.999 R²   (0.998 R² .. 1.000 R²)
mean                 15.78 ms   (15.70 ms .. 15.89 ms)
std dev              245.3 μs   (176.5 μs .. 379.7 μs)

benchmarking sum/pipes
time                 57.94 ms   (57.68 ms .. 58.27 ms)
                     1.000 R²   (1.000 R² .. 1.000 R²)
mean                 58.10 ms   (57.92 ms .. 58.27 ms)
std dev              324.2 μs   (214.1 μs .. 468.8 μs)

benchmarking sum/iostreams
time                 61.96 ms   (61.36 ms .. 62.53 ms)
                     1.000 R²   (0.999 R² .. 1.000 R²)
mean                 61.80 ms   (61.54 ms .. 62.08 ms)
std dev              543.7 μs   (375.1 μs .. 715.7 μs)

benchmarking sum/machine
time                 260.4 ms   (257.2 ms .. 263.6 ms)
                     1.000 R²   (0.999 R² .. 1.000 R²)
mean                 259.7 ms   (258.4 ms .. 260.6 ms)
std dev              1.284 ms   (565.9 μs .. 1.690 ms)
variance introduced by outliers: 16% (moderately inflated)

benchmarking basic/streaming
time                 74.86 ms   (70.07 ms .. 78.78 ms)
                     0.994 R²   (0.987 R² .. 0.999 R²)
mean                 78.25 ms   (75.55 ms .. 84.10 ms)
std dev              6.301 ms   (1.995 ms .. 10.17 ms)
variance introduced by outliers: 19% (moderately inflated)

benchmarking basic/conduit
time                 90.06 ms   (66.61 ms .. 114.4 ms)
                     0.876 R²   (0.658 R² .. 0.977 R²)
mean                 98.63 ms   (85.28 ms .. 116.5 ms)
std dev              23.06 ms   (10.61 ms .. 30.72 ms)
variance introduced by outliers: 65% (severely inflated)

benchmarking basic/pipes
time                 180.9 ms   (158.7 ms .. 201.3 ms)
                     0.989 R²   (0.971 R² .. 1.000 R²)
mean                 190.5 ms   (183.0 ms .. 197.8 ms)
std dev              10.16 ms   (4.910 ms .. 14.86 ms)
variance introduced by outliers: 14% (moderately inflated)

benchmarking basic/iostreams
time                 269.7 ms   (243.8 ms .. 303.9 ms)
                     0.995 R²   (0.985 R² .. 1.000 R²)
mean                 264.2 ms   (254.0 ms .. 272.0 ms)
std dev              10.87 ms   (5.762 ms .. 15.06 ms)
variance introduced by outliers: 16% (moderately inflated)

benchmarking basic/machine
time                 397.7 ms   (324.4 ms .. 504.8 ms)
                     0.992 R²   (0.977 R² .. 1.000 R²)
mean                 407.7 ms   (391.1 ms .. 420.3 ms)
std dev              19.40 ms   (0.0 s .. 21.88 ms)
variance introduced by outliers: 19% (moderately inflated)

 

This sequence of pre-packaged combinators is, I think, as friendly as it could possibly be to the more recent conduit fusion framework. That framework of course doesn't apply to user-defined operations; there we should expect times like those shown for pipes. Since the combinators from streaming are defined with naive recursion, more or less as the user might, we have reason to think this result is characteristic, but much more benchmarking is needed before anything can be said with certainty.

streaming's People

Contributors

aherrmann, andreaspk, andrewthad, bardurarantsson, bodigrim, bravit, chessai, danidiaz, fosskers, galenhuntington, ggreif, ivan-m, jhrcek, kccqzy, larskuhtz, michaelt, mitchellwrosen, ocharles, ondrap, pierrer, quasicomputational, ryanglscott, sergv, shlok, tikhonjelvis, tomjaguarpaw, torgeirsh, treeowl


streaming's Issues

Should effectful streams *ever* be passed as arguments to more than one function?

Let's say I have an IO-based Stream. Is it ever a good idea to pass the same Stream value to two different functions? As opposed to passing it to a function that returns a new Stream, or to a function that eliminates the Stream and brings us back to the base monad.

If the answer is no, perhaps we should document it, because newcomers to the library might not be aware of the need to follow that discipline (linear types could make it a compile-time restriction, but we don't have them in Haskell yet).

Is there any counterexample to this?

Perhaps use explicit "fold" style

Many streaming functions factor through a "fold" that guarantees that they consume their arguments sequentially and transform them with monad homomorphisms before re-emitting them. (cf. foldMap). Is this style something you're interested in pursuing, either internally or in the public API?

{-# LANGUAGE Rank2Types #-}
{-# LANGUAGE ExistentialQuantification #-}

import qualified Streaming as S
import qualified Streaming.Prelude as S
import qualified Control.Monad.Free as F
import qualified Control.Monad.Trans.Writer as W
import qualified Data.Monoid as M

data Fold f m n = forall o. Monad o => Fold (forall a. f a -> o a)
                                            (forall a. m a -> o a)
                                            (forall a. o a -> n a)


fold :: (Functor f,
         Monad n,
         Monad m)
     => Fold f m n
     -> S.Stream f m a
     -> n a
fold fold s = case fold of
  Fold ff fm fo -> fo (S.iterT f (S.hoist fm s))
    where f = S.join . ff

erase :: Monad m
      => Fold (S.Of t) m (S.Stream S.Identity m)
erase = Fold f S.lift id
  where f (_ S.:> rest) = S.yields (S.Identity rest)

filter :: Monad m
       => t
       -> Fold (S.Of Bool) m (S.Stream (S.Of Bool) m)
filter pred = Fold f S.lift id
  where f (a S.:> rest) = if a
                          then S.yield a >> return rest
                          else return rest

with :: (Monad m, Functor f)
     => (a -> f x)
     -> Fold (S.Of a) m (S.Stream f m)
with g = Fold f S.lift id
  where f (a S.:> rest) = S.yields (g a) >> return rest

-- This one needs a little post processing
elem :: (Eq a, Monad m)
     => a
     -> Fold (S.Of a) m (W.WriterT M.Any m)
elem a' = Fold f S.lift id
  where f (a S.:> rest) = if a == a'
                          then W.tell (M.Any True) >> return rest
                          else return rest

Performance issue with "for"

According to the documentation, for str f = concats (with str f), but the two have very different performance. Let's try a contrived example, using with first.

main :: IO ()
main = S.effects . S.concats . flip S.with S.yield $ S.each [1..20000]

This performs quite reasonably:

       8,360,600 bytes allocated in the heap
         245,616 bytes copied during GC
          35,824 bytes maximum residency (2 sample(s))
          50,192 bytes maximum slop
               5 MB total memory in use (0 MB lost due to fragmentation)

                                     Tot time (elapsed)  Avg pause  Max pause
  Gen  0         6 colls,     6 par    0.000s   0.001s     0.0001s    0.0005s
  Gen  1         2 colls,     1 par    0.000s   0.001s     0.0003s    0.0006s

  Parallel GC work balance: 0.31% (serial 0%, perfect 100%)

  TASKS: 6 (1 bound, 5 peak workers (5 total), using -N4)

  SPARKS: 0 (0 converted, 0 overflowed, 0 dud, 0 GC'd, 0 fizzled)

  INIT    time    0.000s  (  0.001s elapsed)
  MUT     time    0.000s  (  0.003s elapsed)
  GC      time    0.000s  (  0.001s elapsed)
  EXIT    time    0.000s  (  0.000s elapsed)
  Total   time    0.000s  (  0.005s elapsed)

  Alloc rate    0 bytes per MUT second

  Productivity 100.0% of total user, 62.1% of total elapsed

What about for?

main :: IO ()
main = S.effects . flip S.for S.yield $ S.each [1..20000]

Now things get interesting:

  25,709,918,336 bytes allocated in the heap
   9,582,313,464 bytes copied during GC
       3,345,424 bytes maximum residency (1845 sample(s))
       4,580,976 bytes maximum slop
              20 MB total memory in use (0 MB lost due to fragmentation)

                                     Tot time (elapsed)  Avg pause  Max pause
  Gen  0     13035 colls, 13035 par   22.609s  16.827s     0.0013s    0.0743s
  Gen  1      1845 colls,  1844 par    3.719s   3.461s     0.0019s    0.0323s

  Parallel GC work balance: 4.54% (serial 0%, perfect 100%)

  TASKS: 6 (1 bound, 5 peak workers (5 total), using -N4)

  SPARKS: 0 (0 converted, 0 overflowed, 0 dud, 0 GC'd, 0 fizzled)

  INIT    time    0.000s  (  0.001s elapsed)
  MUT     time   10.891s  ( 10.745s elapsed)
  GC      time   26.328s  ( 20.288s elapsed)
  EXIT    time    0.000s  ( -0.002s elapsed)
  Total   time   37.219s  ( 31.031s elapsed)

  Alloc rate    2,360,738,555 bytes per MUT second

  Productivity  29.3% of total user, 34.6% of total elapsed

Not quite sure what goes wrong here, but at least it's easy to work around by always using with instead of for.
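The law quoted from the documentation can be checked on element output with a toy, self-contained sketch. with, for, concats and bindS below are local re-implementations on a re-declared Stream type, not the library's code. So this says nothing about where the library's for loses time; it only shows that the two formulations yield the same elements.

```haskell
-- Toy re-implementations for illustration; not the streaming library's code.
data Of a r = !a :> r

instance Functor (Of a) where
  fmap f (a :> r) = a :> f r

data Stream f m r
  = Return r
  | Step !(f (Stream f m r))
  | Effect (m (Stream f m r))

-- Substitute a continuation for the final result (what >>= would do).
bindS :: (Functor f, Functor m) => Stream f m a -> (a -> Stream f m b) -> Stream f m b
bindS s k = go s
  where
    go (Return a) = k a
    go (Step fs)  = Step (fmap go fs)
    go (Effect m) = Effect (fmap go m)

instance (Functor f, Functor m) => Functor (Stream f m) where
  fmap g s = bindS s (Return . g)

-- Replace each element with an f-shaped step built from it; the rest continues inside.
with :: (Functor f, Functor m) => Stream (Of a) m r -> (a -> f x) -> Stream f m r
with s g = go s
  where
    go (Return r)         = Return r
    go (Effect m)         = Effect (fmap go m)
    go (Step (a :> rest)) = Step (fmap (const (go rest)) (g a))

concats :: (Functor f, Functor m) => Stream (Stream f m) m r -> Stream f m r
concats (Return r)   = Return r
concats (Effect m)   = Effect (fmap concats m)
concats (Step inner) = bindS inner concats

-- for, written directly: splice in the stream built from each element, then continue.
for :: (Functor f, Functor m) => Stream (Of a) m r -> (a -> Stream f m x) -> Stream f m r
for s g = go s
  where
    go (Return r)         = Return r
    go (Effect m)         = Effect (fmap go m)
    go (Step (a :> rest)) = bindS (g a) (const (go rest))

each :: [a] -> Stream (Of a) m ()
each = foldr (\a rest -> Step (a :> rest)) (Return ())

collect :: Monad m => Stream (Of a) m r -> m ([a], r)
collect (Return r) = return ([], r)
collect (Effect m) = m >>= collect
collect (Step (a :> rest)) = do
  (xs, r) <- collect rest
  return (a : xs, r)
```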

maybe you need two parameters for sliding window && adding tumbling window

Hi,

after checking your code for the sliding window, I am thinking maybe you can also provide an implementation that takes two parameters -- window size and sliding interval.

also, maybe you can consider the implementation of tumbling windows as well.
ref: https://stackoverflow.com/questions/12602368/sliding-vs-tumbling-windows

I am new to Haskell. If I make some incorrect observations here, please ignore my notes.

Thanks for this nice work!

Unintuitive Functor and Applicative instances

I had expected that S.map and fmap would work the same. E.g., to my mind these should have been equivalent:

x = S.map (+1) $ S.each [1..10]
x' = fmap (+1) $ S.each [1..10] -- this gives a compiler error

Because that's what happens to lists with map and fmap. Instead fmap only affects the result of the stream.

Similarly, with the Applicative instance:

y = S.zipWith (/) (S.each [1..10]) (S.each [2..11])
y' = (/) <$> S.each [1..10] <*> S.each [2..11] -- this gives a compiler error

What is the motivation behind such an implementation? There's certainly a downside for me here since, it seems, I can't have a general function working on Functors that would do the same thing for lists and streams.
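One way to see the current behaviour: Stream (Of a) m r is a monad in its last parameter r, so its Functor instance has to act on r, the value returned when the stream ends. The elements live in the Of a layer instead. The minimal sketch below uses a toy re-declaration; mapS and collect are hypothetical names, and S.map is the library's element-wise map.

```haskell
-- Toy re-declarations for illustration; not the streaming library's code.
data Of a r = !a :> r

instance Functor (Of a) where
  fmap f (a :> r) = a :> f r

data Stream f m r
  = Return r
  | Step !(f (Stream f m r))
  | Effect (m (Stream f m r))

-- The Functor instance reaches only the final result r, never the elements.
instance (Functor f, Functor m) => Functor (Stream f m) where
  fmap g = go
    where
      go (Return r) = Return (g r)
      go (Step fs)  = Step (fmap go fs)
      go (Effect m) = Effect (fmap go m)

-- Element-wise mapping has to be a separate function, like S.map.
mapS :: Functor m => (a -> b) -> Stream (Of a) m r -> Stream (Of b) m r
mapS g = go
  where
    go (Return r)         = Return r
    go (Effect m)         = Effect (fmap go m)
    go (Step (a :> rest)) = Step (g a :> go rest)

collect :: Monad m => Stream (Of a) m r -> m ([a], r)
collect (Return r) = return ([], r)
collect (Effect m) = m >>= collect
collect (Step (a :> rest)) = do
  (xs, r) <- collect rest
  return (a : xs, r)
```

For a stream yielding 1 and 2 and returning 10, fmap (+ 1) gives elements 1, 2 and result 11. mapS (+ 1) gives elements 2, 3 and result 10.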

Deprecate mapsExposed and mapsMExposed

Original issue by @treeowl:

I don't believe that either mapsExposed or mapsMExposed actually exposes anything it shouldn't. Whereas hoist must be passed a monad morphism to get sensible results, I'm pretty sure these functions will work with arbitrary natural transformations.

Request for co-maintainership

I have an interest in this package and use it in numerous places both recreationally and at work. I would like to aid in the maintainership of this package on GitHub, and take a stab at some of the issues that have been open longer.

support window and MonadOfNoReturn

What is the support window for streaming? I would like to drop support for versions of base prior to Applicative-Monad, and change all instances of 'return' in the code to 'pure'.

Add fusion rules

I certainly don't know which ones we want, but it would be nice to get some sort of fusion. Here's one possibility to consider:

The hoist, maps, and >>= operations can be combined into a single operation

{-# LANGUAGE RankNTypes, ScopedTypeVariables #-}
import Control.Monad (liftM)
import Streaming.Internal (Stream (..))

hoistMapsBind :: forall f g m n r s. (Monad m, Functor f)
               => (forall x. f x -> g x)
               -> (forall x. m x -> n x)
               -> (r -> Stream g n s)
               -> Stream f m r
               -> Stream g n s
-- hoistMapsBind phi psi thn str ~= hoist psi (maps phi str) >>= thn
hoistMapsBind phi psi thn = loop where
  -- the explicit forall above lets this local signature mention f, m, g, n, r and s
  loop :: Stream f m r -> Stream g n s
  loop (Return r) = thn r
  loop (Effect m) = Effect $ psi (liftM loop m)
  loop (Step f) = Step $ phi (fmap loop f)

If we so desire, we can rewrite applications of hoist, maps, and >>= to applications of hoistMapsBind, and then fuse them using the following rule:

"hmb/hmb" forall (phi1 :: forall x. f x -> g x) (psi1 :: forall x. m x -> n x) (thn1 :: r -> Stream g n s)
                 (phi2 :: forall x. g x -> h x) (psi2 :: forall x. n x -> o x) (thn2 :: s -> Stream h o t)
                 (str :: Stream f m r).
            hoistMapsBind phi2 psi2 thn2 (hoistMapsBind phi1 psi1 thn1 str) =
              hoistMapsBind (\x -> phi2 (phi1 x)) (\x -> psi2 (psi1 x))
                            (\r -> hoistMapsBind phi2 psi2 thn2 (thn1 r)) str

But of course there could be much more general fusion opportunities we should be considering instead. I am especially curious about whether there's any way to convince GHC to do some of the heavy lifting itself.

Add tests

Original issue by @treeowl:

There doesn't seem to be a test suite. I would recommend adding, at least:

  1. Modules implementing the entire non-internal streaming API using FreeT instead of Stream. I think these might as well be exposed, so users can play with them.
  2. Tests verifying that Stream operations behave the same as their FreeT equivalents. See my old SO question How can I test functions polymorphic over applicatives for some ideas about being arbitrary. In this case, I suspect we want to use functors built from algebraic bits (Sum, Product, Compose, Identity, Const) for f parameters, and free monads over such functors for m parameters.

Release to Hackage

Any chance of getting a release to Hackage? The current Hackage version contains an asymptotic performance regression #47.

How to best write "forWithState"?

It took me a very long time to work out how to write this. I'm not particularly familiar with streaming. Are there any library functions I could have taken advantage of to implement this more easily?

forWithState
  :: forall a f s m r
   . (Monad m, Functor f)
  => S.Stream (S.Of a) m r
  -> s
  -> ((a, s) -> S.Stream f m s)
  -> S.Stream f m (r, s)
forWithState stream s f = St.runStateT (switch inStateT) s
 where
  inStateT :: S.Stream f (St.StateT s m) r
  inStateT = S.for (S.hoist S.lift stream) $ \a -> do
    s  <- St.lift St.get
    s' <- S.hoist S.lift (f (a, s))
    St.lift (St.put s')

switch :: (Functor f,
           Monad m,
           Monad (t m),
           St.MonadTrans t,
           Monad (t (S.Stream f m)),
           S.MFunctor t)
       => S.Stream f (t m) a
       -> t (S.Stream f m) a
switch s = S.destroy s yieldsT effectT pure

yieldsT
  :: (Functor f, Monad m, St.MonadTrans t, Monad (t (S.Stream f m)))
  => f (t (S.Stream f m) a)
  -> t (S.Stream f m) a
yieldsT = St.join . S.lift . S.yields

effectT
  :: ( Monad (t (streamf m))
     , Monad m
     , St.MonadTrans streamf
     , S.MFunctor t
     )
  => t m (t (streamf m) a)
  -> t (streamf m) a
effectT = St.join . S.hoist S.lift

documentation cleanup

The documentation has some minor problems, and still contains references to things that are no longer used, e.g. things from the resourcet package.
I will probably make a PR sometime within the next few days.

Please tag releases

It's generally good practice to create a tag for each release that is pushed to Hackage.

add wrapEffect to the library

Hi,

Would you consider adding a new function, wrapEffect, to the library?

-- | Before evaluating the monadic action that returns the next step of the 'Stream',
-- runs the monadic computation @m a@ and passes its result to a second monadic
-- computation, @a -> m ()@, which is run after that step has been extracted
wrapEffect :: (Monad m, Functor f) => m a -> (a -> m ()) -> Stream f m r -> Stream f m r
wrapEffect m f = loop
  where loop stream = do
          x <- lift m
          step <- lift $ inspect stream
          lift $ f x
          either return loop' step
        loop' = wrap . fmap loop

Of course, the function could also be written directly against the internals of the streaming library,
using the Stream constructors (Return, Step, Effect) instead of the inspect I used above.
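A minimal sketch of that constructor-based variant, assuming the constructors exported by Streaming.Internal in streaming-0.2. The names wrapEffectInternal, advance and actionLog are hypothetical; advance plays the role of inspect by running Effect layers until it reaches a Step or a Return, and the "before, inspect, after" ordering matches the proposal above.

```haskell
import Data.IORef (modifyIORef', newIORef, readIORef)
import Streaming.Internal (Stream (..))
import qualified Streaming.Prelude as S

-- Hypothetical constructor-based version of the proposed wrapEffect.
wrapEffectInternal :: (Monad m, Functor f) => m a -> (a -> m ()) -> Stream f m r -> Stream f m r
wrapEffectInternal before after = loop
  where
    loop stream = Effect $ do
      x <- before             -- run the "acquire" action
      step <- advance stream  -- expose the next step, like 'inspect'
      after x                 -- run the "release" action with the acquired value
      pure (either Return (Step . fmap loop) step)

    -- Run Effect layers until the stream reaches a Step or a Return.
    advance (Return r) = pure (Left r)
    advance (Effect m) = m >>= advance
    advance (Step fs) = pure (Right fs)

-- Demo: record the order in which actions run while streaming two elements.
actionLog :: IO [String]
actionLog = do
  ref <- newIORef []
  let logMsg msg = modifyIORef' ref (msg :)
      source = S.mapM (\x -> logMsg ("produce " ++ show x) >> pure x) (S.each [1, 2 :: Int])
  S.mapM_ (\x -> logMsg ("consume " ++ show x))
    (wrapEffectInternal (logMsg "acquire") (\() -> logMsg "release") source)
  reverse <$> readIORef ref
```

Each element is produced between an acquire and a release, but consumed after the release, which is what lets another thread take its turn while this one processes the element.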

This function will allow, for example, making synchronized streams that could be shared
amongst threads, from ordinary streams:

-- | transforms a stream into a synchronized stream with the help of a 'TMVar'
synchronizeStream :: (MonadIO m) => TMVar a -> (a -> a) -> Stream (Of b) m r -> Stream (Of b) m r
synchronizeStream mvar f = wrapEffect (liftIO $ atomically $ takeTMVar mvar) (liftIO . atomically . putTMVar mvar . f)

-- | a lighter variant of 'synchronizeStream' that makes no use of the synchronization
-- variable's value
synchronizeStream' :: (MonadIO m) => TMVar a -> Stream (Of b) m r -> Stream (Of b) m r
synchronizeStream' mvar = synchronizeStream mvar id

This could be used to have a stream processed by multiple concurrent worker threads.

As an example, this is how I use it in my code:

mvar  <- liftIO $ newTMVarIO ()
let stream = synchronizeStream' mvar $ fromSocket udpSocket 512
workers <- liftIO $ replicateM 10 $ async (workerThread stream)
liftIO $ sequence_ $ map wait workers

where fromSocket creates a stream of messages received on a UDP port.

Add sliding window sums

slidingWindowSum
  :: (Monad m, Semigroup s)
  => Int
  -> Stream (Of s) m r
  -> Stream (Of s) m r

This could be used for things like precise sliding window means.

Rather than carrying around a Seq s, this would carry a finger tree-like structure with s annotations. I think a structure like Okasaki's implicit queues, augmented with annotations, will probably do the trick quite nicely; we shouldn't need anything quite as heavy as Hinze-Paterson 2–3 finger trees.

Sliding window min and max could actually use this too, but those can sometimes free memory more quickly with a specialized implementation.
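A minimal sketch of the proposed slidingWindowSum, using a swapped-in technique: instead of the annotated implicit queue suggested above, it uses the simpler two-stack queue with cached aggregates. That gives amortized O(1) work per element, but an occasional O(k) pause when the back list is reversed into the front. Window, push, pop and total are hypothetical helpers, and emitting nothing until the window is full is an assumed semantics.

```haskell
import Control.Monad.Trans.Class (lift)
import Streaming (Of (..), Stream)
import qualified Streaming.Prelude as S

-- Two-stack queue annotated with semigroup aggregates (hypothetical helper).
data Window s = Window
  { winFront   :: [(s, s)] -- oldest first; snd = this element <> all later front elements
  , winBack    :: [s]      -- newest first
  , winBackAgg :: Maybe s  -- all back elements combined, oldest to newest
  , winSize    :: !Int
  }

emptyWindow :: Window s
emptyWindow = Window [] [] Nothing 0

-- Append the newest element in O(1).
push :: Semigroup s => s -> Window s -> Window s
push x (Window f b agg n) = Window f (x : b) (Just (maybe x (<> x) agg)) (n + 1)

-- Drop the oldest element; amortized O(1), since each element moves to the front once.
pop :: Semigroup s => Window s -> Window s
pop w@(Window [] [] _ _) = w
pop (Window [] b _ n) = pop (Window (zip ys (scanr1 (<>) ys)) [] Nothing n)
  where
    ys = reverse b
pop (Window (_ : f) b agg n) = Window f b agg (n - 1)

-- Combine the whole window, oldest to newest, in O(1).
total :: Semigroup s => Window s -> Maybe s
total (Window f _ agg _) = case f of
  [] -> agg
  (_, a) : _ -> Just (maybe a (a <>) agg)

-- Emit one combined value per full window of size n (n is clamped to at least 1).
slidingWindowSum :: (Monad m, Semigroup s) => Int -> Stream (Of s) m r -> Stream (Of s) m r
slidingWindowSum n = go emptyWindow
  where
    k = max 1 n
    go w str = do
      e <- lift (S.next str)
      case e of
        Left r -> pure r
        Right (x, rest) -> do
          let w1 = push x w
              w2 = if winSize w1 > k then pop w1 else w1
          mapM_ S.yield (if winSize w2 == k then total w2 else Nothing)
          go w2 rest
```

Because the aggregates respect order, this also works for non-commutative semigroups such as String; for sliding window means, s could be a pair of running sum and count.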

[Discussion] Should we bring back exceptions instances?

I'm not sure about the safety implications, but since we've removed ResourceT support, we can't easily use S.copy to write a Stream to multiple files (streaming-with needs a MonadMask instance on the monad for bracket to work).

S.nub, referred to in the documentation, does not exist.

In the example here:

>>> runResourceT $ (S.writeFile "hello2.txt" . S.nub) $ store (S.writeFile "hello.txt" . S.filter (/= "world")) $ each ["hello", "world", "goodbye", "world"]

This function was removed in 855e725. I'm not sure why, as it seems useful, but perhaps there is now another way to do it. The commit message just refers to an Alternative instance, and I don't see how that would help nubbing a stream.
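Until something equivalent is available, a hypothetical nubStream can be written with Data.Set from containers. This is a sketch, not the removed S.nub: it keeps the first occurrence of each element and remembers every element seen so far, so its memory grows with the number of distinct elements.

```haskell
import Control.Monad.Trans.Class (lift)
import qualified Data.Set as Set
import Streaming (Of (..), Stream)
import qualified Streaming.Prelude as S

-- Hypothetical replacement for the removed S.nub.
nubStream :: (Monad m, Ord a) => Stream (Of a) m r -> Stream (Of a) m r
nubStream = go Set.empty
  where
    go seen str = do
      e <- lift (S.next str)
      case e of
        Left r -> pure r
        Right (x, rest)
          | x `Set.member` seen -> go seen rest               -- drop duplicates
          | otherwise -> S.yield x >> go (Set.insert x seen) rest
```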

Stream f m r as generalised vector type?

Generalising the Stream type in Data.Vector.Fusion.Stream.Monadic to

data Stream f m r = forall s. Stream (s -> m (Step f s r)) s

data Step f s r = Yield !(f s)
                | Skip s
                | Return r

I have benchmarked a few of its operations here. Most of the functions are direct copies from vector, and it seems to perform as well as vector.
Comments?
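A tiny, self-contained sketch of the proposed representation, assuming the ExistentialQuantification extension. Of, fromListS, filterS and toListS are local, hypothetical names rather than streaming's or vector's API. It shows why Skip is there: filterS can discard an element without recursing, so every step function stays non-recursive, which is what vector-style stream fusion relies on.

```haskell
{-# LANGUAGE ExistentialQuantification #-}

-- Local stand-in for streaming's pair functor.
data Of a b = a :> b

data Step f s r = Yield !(f s) | Skip s | Return r

-- A stream is a step function plus a hidden seed of some existential type s.
data Stream f m r = forall s. Stream (s -> m (Step f s r)) s

fromListS :: Monad m => [a] -> Stream (Of a) m ()
fromListS = Stream next
  where
    next [] = pure (Return ())
    next (x : xs) = pure (Yield (x :> xs))

-- Skip lets filter advance the seed without producing an element.
filterS :: Functor m => (a -> Bool) -> Stream (Of a) m r -> Stream (Of a) m r
filterS p (Stream next s0) = Stream (fmap keep . next) s0
  where
    keep (Yield (a :> s)) | not (p a) = Skip s
    keep step = step

-- Run the stream, collecting the elements and the final return value.
toListS :: Monad m => Stream (Of a) m r -> m ([a], r)
toListS (Stream next s0) = go s0
  where
    go s = do
      step <- next s
      case step of
        Return r -> pure ([], r)
        Skip s' -> go s'
        Yield (a :> s') -> do
          (as, r) <- go s'
          pure (a : as, r)
```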

“slidingWindow 1” is not behaving as expected.

slidingWindow 1 is not behaving as expected in streaming-0.2.1.0 (Stack resolver lts-12.4). Example:

$ stack ghci
ghci> import qualified Streaming.Prelude as S
ghci> S.print $ S.slidingWindow 1 $ S.each "1234"
fromList "1"
fromList "2"
fromList "23"
fromList "34"

The expected result is

fromList "1"
fromList "2"
fromList "3"
fromList "4"

`for` appears to have superlinear performance degradation?

I tried porting something from conduit to streaming and was getting really terrible performance. Just messing around with likely culprits, I ended up with this little test case:

import           Data.Attoparsec.Text (isEndOfLine)
import           Data.Attoparsec.ByteString.Char8 (takeTill, endOfLine)
import Data.Attoparsec.ByteString.Streaming (parsed)
import qualified Data.ByteString.Streaming.Char8 as BS8
import qualified Streaming.Prelude as P
import           Control.Monad.Trans.Resource (runResourceT)
import Data.Function ((&))

-- With both "bad" lines commented out - ~3M lines/sec, 188MB allocated
-- With second "bad" line uncommented - ~41K lines/sec, 2.23GB allocated
-- With first "bad" line uncommented - strongly superlinear behavior!
-- 1300 lines -> .053s, 95M
-- 2600 lines -> .168s, 407M
-- 3900 lines -> .368s, 942M
-- 5200 lines -> .752s, 1.7G
countWords :: IO Int
countWords = runResourceT $ 
    BS8.readFile "/usr/share/dict/web2"
    & parsed (takeTill isEndOfLine <* endOfLine)
    -- & flip P.for P.yield -- Horrendous performance
    -- & flip P.with (P.:> ()) -- this seems fine
    & P.map (const Nothing)
    -- & P.concat -- Bad performance
    & P.fold_ (\i _ -> i+1) 0 id

main :: IO ()
main = countWords >>= print

It looks like for is the most likely culprit. Switching from ResourceT IO to IO and using fromHandle instead of readFile does not improve matters, so it's not just a poor implementation of *> for ResourceT.

Unintuitive interaction between chunksOf and effects

I'm not sure whether this should be considered a bug, but the chunksOf function interacts with effects in a way that surprised me. Consider this code:

{-# language ScopedTypeVariables #-}

import Streaming
import qualified Streaming.Prelude as S
import Control.Monad (replicateM_)

data A = A deriving Show
data B = B deriving Show

chunkList :: forall m. Monad m => m (Of [B] (Of [([B], [A])] ()))
chunkList = S.toList . S.toList . S.mapped toLists . chunksOf 2 $ interleavedStream where
    interleavedStream :: Stream (Of A) (Stream (Of B) m) ()
    interleavedStream = replicateM_ 4 $ do
        S.yield A
        lift (S.yield B)

    toLists :: Stream (Of A) (Stream (Of B) m) x
            -> Stream (Of B) m (Of ([B], [A]) x)
    toLists chunk = do
        bs :> as :> r <- lift . S.toList . S.toList $ chunk
        pure $ (bs, as) :> r

The toLists action is meant to collect the values of the chunk (stream of A), as well as effects from the underlying monad (stream of B). This is the unexpected result:

> chunkList
[B,B] :> ([([B],[A,A]),([B],[A,A])] :> ())

B values that occur between chunk boundaries end up in an "outer" stream instead of being included inside the chunks. In all fairness, the type of chunkList does suggest what is going to happen, but I find it a bit unintuitive.

streaming-0.2.0.0 has no bracketStream

When upgrading to lts-10.3 with stack, I noticed that it comes with streaming-0.2.0.0 and that bracketStream has been removed from the library. I couldn't find this change in changelog.md.

Is there a reason this was removed? Is there a different tactic for bracketing a stream?
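One tactic that does not need bracketStream, sketched here on the assumption that the whole stream can be consumed inside one scope: bracket the consumer rather than the stream, so the handle is closed promptly when withFile returns. countLines is a hypothetical name; S.fromHandle and S.length_ are from Streaming.Prelude.

```haskell
import System.IO (IOMode (ReadMode), withFile)
import qualified Streaming.Prelude as S

-- The stream never escapes the withFile scope, so the handle cannot leak.
countLines :: FilePath -> IO Int
countLines path = withFile path ReadMode $ \h -> S.length_ (S.fromHandle h)
```

The catch is that the stream must be fully consumed before withFile returns; returning the Stream itself out of that scope would use a closed handle.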

Eliminate resourcet

This library uses MonadResource in two places: readFile and writeFile. It isn't even needed in writeFile. In readFile, it provides a dangerous opportunity to leak file descriptors since it cannot actually provide prompt finalization for a Stream.

In the spirit of pipes and pipes-safe, I'm going to remove this approach to resource management from streaming. If someone would like to create a streaming-safe package, that would be a better place to try to handle finalization correctly.

unnecessary alias hides relationship of Streaming to regular haskell concepts?

I noticed "yield" looks like it's merely "pure", but the documentation doesn't describe any difference. If they're the same, the documentation should only discuss "pure"; if they're different, the documentation for "yield" should explain how it relates to and differs from "pure".
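They are in fact different: S.yield :: Monad m => a -> Stream (Of a) m () emits its argument as an element of the stream, while pure a emits nothing and makes a the stream's final return value r. A small sketch (emitted and returned are hypothetical names):

```haskell
import Streaming (Of (..), Stream)
import qualified Streaming.Prelude as S

-- yield emits 'x' as an element and returns ()
emitted :: Monad m => Stream (Of Char) m ()
emitted = S.yield 'x'

-- pure emits nothing; 'x' becomes the stream's final return value
returned :: Monad m => Stream (Of Char) m Char
returned = pure 'x'
```

Running both through S.toList gives "x" :> () for the first and "" :> 'x' for the second.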

Add streaming to Stackage Nightly

It would be nice to add streaming and its associated packages to Stackage Nightly.

This would keep it from disappearing from the next LTS release.

Thanks

Eliminate time

This library only depends on time to provide the seconds function. This function makes some strong assumptions about what the user wants to do with their timestamps, and its use of a non-monotonic clock makes it potentially dangerous. I'm going to remove it.

Cut a new release of streaming-utils

I just transferred streaming-utils to haskell-streaming (here), but it doesn't have issues enabled. I also don't have admin access to enable issues, so apologies for the odd roundabout way of opening an issue here instead.

Anyway, streaming-utils on Hackage isn't buildable with the latest versions of dependencies, notably streaming-0.2. In haskell-streaming/streaming-utils I've fixed that. We just need to get maintainership on Hackage and upload. @andrewthad, would you mind getting that access?

Stepping Down

When I started maintaining streaming a few years ago, I was actively trying to use it in applications. Since then, I've slowly found this abstraction is not a good fit for the domain I'm working in. I've not been active in maintaining this for a while. I wanted to communicate that I'm not planning on maintaining any of the streaming libraries anymore, and I have removed myself as an owner of the haskell-streaming organization. @treeowl and @chessai are both owners on the haskell-streaming organization and have done a good job handling issues, and I'm sure they will continue to do so.

Good luck to everyone who gets mileage out of what streaming offers. I've gotten a lot from just thinking about the streaming pattern even though its incarnation in streaming and streaming-bytestring didn't end up being exactly what I needed.

Unexpected behavior of Streaming.Prelude.head

Consider this GHCi session:

λ> import qualified Streaming.Prelude as S
λ> :set -XTypeApplications 
λ> S.head $ S.readLn @IO @Int
1<Enter>
2<Enter> -- Why didn't the streaming end here? I'd expect this to return `Just 1 :> ()`
3<Enter>
^CInterrupted.

Compare this to the behavior of head_, which does what I would expect (ending the stream after the first successful read):

λ> S.head_ $ S.readLn @IO @Int
1
Just 1
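This difference follows from the types. S.head :: Monad m => Stream (Of a) m r -> m (Of (Maybe a) r) has to return the stream's final value r, so after finding the first element it keeps running the rest of the stream; S.head_ :: Monad m => Stream (Of a) m r -> m (Maybe a) discards r and can stop early. A small sketch that counts how many elements each consumer forces (elementsForced is a hypothetical helper):

```haskell
import Data.IORef (modifyIORef', newIORef, readIORef)
import Streaming (Of (..), Stream)
import qualified Streaming.Prelude as S

-- Run a consumer on a three-element stream and count how many elements it forced.
elementsForced :: (Stream (Of Int) IO () -> IO b) -> IO Int
elementsForced consume = do
  ref <- newIORef (0 :: Int)
  let counted = S.mapM (\x -> modifyIORef' ref (+ 1) >> pure x) (S.each [1, 2, 3])
  _ <- consume counted
  readIORef ref
```

Here elementsForced S.head forces all three elements and elementsForced S.head_ forces one. If you need the first element and also the rest of the stream, S.next returns both without running the remainder.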

Improve sliding window min/max

#95 added sliding window min/max functions with big-O optimal amortized bounds. But a new minimum/maximum can force a pause proportional to the number of elements in the window. If we instead use a more careful sorted list representation, we should be able to cut the pauses to logarithmic time. A relatively simple approach would use a finger tree with a particular monoid described in the Hinze–Paterson paper on 2–3 finger trees, but we can probably tweak it a tad to do better without much difficulty. I don't know if there are more specialized structures that can do even better.

Generalize unzip, perhaps

Original issue by @treeowl:

I don't know if this makes sense, but it should work at least for Of and (,). The constraints on f look scary, but they really boil down to f being a bifunctor that's a comonad in its second argument.

 unzip :: (Monad m, Bifunctor f, Functor (f b), Comonad (f (a,b)))
   => Stream (f (a,b)) m r -> Stream (f a) (Stream (f b) m) r
 unzip = loop where
   loop str = case str of
     Return r -> Return r
     Effect m -> Effect (liftM loop (lift m))
     Step f -> Step
              . first fst
              . extend (Effect . Step . first snd)
              . fmap (Return . loop) $ f

I don't know if the extra laziness here will cause any performance problems. If it does, and if this idea is actually good for something, we can offer both versions. As with copy, we really only need the power of Extend, not of Comonad.

Need Semigroup instance for Of

Streaming fails to build on GHC HEAD:

Building library for streaming-0.2.0.0..
[1 of 4] Compiling Data.Functor.Of  ( src/Data/Functor/Of.hs, dist/build/Data/Functor/Of.o )
src/Data/Functor/Of.hs:22:10: error:
    • Could not deduce (Semigroup (Of a b))
        arising from the superclasses of an instance declaration
      from the context: (Monoid a, Monoid b)
        bound by the instance declaration
        at src/Data/Functor/Of.hs:22:10-48
    • In the instance declaration for ‘Monoid (Of a b)’
   |
22 | instance (Monoid a, Monoid b) => Monoid (Of a b) where
   |          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
Failed to install streaming-0.2.0.0
cabal: Error: some packages failed to install:
streaming-0.2.0.0-CBylW9J269QGm01HTBln5E failed during the building phase. The
exception was:
ExitFailure 1

I'll try and do a PR for this tomorrow.
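Since GHC 8.4 (base 4.11), Semigroup is a superclass of Monoid, so Of needs a Semigroup instance alongside the existing Monoid one. A sketch of the missing instance, assuming the same component-wise behavior as the Monoid instance; it defines a local copy of Of so it compiles on its own and is not necessarily the patch that was merged.

```haskell
-- Local stand-in for Data.Functor.Of.
data Of a b = !a :> b
  deriving (Eq, Show)

infixr 5 :>

-- Combine both components pointwise.
instance (Semigroup a, Semigroup b) => Semigroup (Of a b) where
  (m :> w) <> (m' :> w') = (m <> m') :> (w <> w')

-- mappend now defaults to (<>), so only mempty is needed.
instance (Monoid a, Monoid b) => Monoid (Of a b) where
  mempty = mempty :> mempty
```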

A way to not repeat the monadic action in derived streams?

To illustrate:

import Streaming
import qualified Streaming.Prelude as S
import qualified Network.WebSockets as WS

messageId :: MyMessage -> Int
-- we're connecting to some websocket using the `websockets` library
ws :: WS.ClientApp ()
ws connection = do
  S.print . zipSelf . S.map messageId . S.repeatM $ do
    message <- WS.receiveData connection
    return (message :: MyMessage)
    where
      zipSelf s = S.zipWith (,) s s

If I ran ws it would print out something like

(1, 2)
(3, 4)
(5, 6)
...

What I actually want is for it to not duplicate calls to WS.receiveData and output

(1, 1)
(2, 2)
(3, 3)
...

Is this at all possible?

In this example one could easily replace the culprit of the issue - zipSelf - with a call to S.map and be done with it. However the actual code I'm working with is a bit more involved - I've created a bunch of stream processor functions, that somehow transform the original stream, whose results could then as well be fed through some other stream processors or combined and so on. The problem is that each of the derived streams competes for messages from the websocket, fetching them by itself.

My domain logic is rather easily expressed in the above manner. Rewriting it into a single pass over the original stream doesn't seem feasible. If what I'm asking for isn't possible, is there some other approach to structuring my computations I could use that would make sense here?
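One approach the library offers for consuming the same elements in two ways without re-running the effects is S.copy, which duplicates each element into an extra stream layer. A minimal sketch under that assumption; sumAndLength and demo are hypothetical names, and an IORef counter stands in for WS.receiveData.

```haskell
import Data.IORef (modifyIORef', newIORef, readIORef)
import Streaming (Of (..), Stream)
import qualified Streaming.Prelude as S

-- Sum and count the same elements in one pass: the outer layer is summed,
-- the copied layer is counted, and the underlying effects run only once.
sumAndLength :: Monad m => Stream (Of Int) m r -> m (Of Int (Of Int r))
sumAndLength = S.sum . S.length . S.copy

-- Demo: count how many times the source effect runs.
demo :: IO (Int, Int, Int)
demo = do
  ref <- newIORef (0 :: Int)
  let source = S.mapM (\x -> modifyIORef' ref (+ 1) >> pure x) (S.each [1, 2, 3])
  total :> (count :> ()) <- sumAndLength source
  calls <- readIORef ref
  pure (total, count, calls)
```

This fits consumers that can be stacked one inside the other, such as folds or writers; it does not by itself give a pointwise zip of two derived streams.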

travis ci problems

I've added a .travis.yml file and enabled Travis for the haskell-streaming org. However, when I look at the repos for our org on travis-ci.com, streaming itself does not appear.

@andrewthad @treeowl might you know why this is the case?

Is there the ability to trace?

Is there a function to do something like

trace :: Monad m
      => (t -> Maybe String)
      -> Stream (Of t) m r
      -> Stream (Of t) m r
trace f s = for s $ \i -> do
  maybe (return ()) traceM (f i)
  yield i
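A sketch of the same idea using S.chain from Streaming.Prelude, which runs a monadic action on every element and passes the element on unchanged. traceStream is a hypothetical name chosen to avoid clashing with Debug.Trace.trace; as with any use of traceM, messages go to stderr and their timing depends on evaluation order in m.

```haskell
import Debug.Trace (traceM)
import Streaming (Of (..), Stream)
import qualified Streaming.Prelude as S

-- Hypothetical helper: print a trace message for elements where f returns Just.
traceStream :: Monad m => (t -> Maybe String) -> Stream (Of t) m r -> Stream (Of t) m r
traceStream f = S.chain (maybe (pure ()) traceM . f)
```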

Don't derive Data

Currently, we derive a Data instance for Stream. This seems wrong, because it allows users to distinguish streams based on Effect layering. The way to stick with the current general approach is to work on unexposed streams. Another major option would be to proclaim that Stream is just a representation of something like FreeT, and make a Data instance emulating that. That might be going too far, though.

Severe performance problem with `S.concat` - O(n²) in stream items

I tested this against version 0.2.1.0 (from stackage lts-10.9).

There seems to be a severe performance problem with S.concat: it appears to be O(n²) in the number of streamed items. I used it in a long-running pipe, and over time I got 100% CPU usage and very slow processing.

The fixedSConcat below is specialized to lists rather than any Foldable, but still...

import qualified Streaming.Prelude as S
import qualified Streaming.Internal as S
import Streaming.Prelude (Of(..))
import Criterion.Main

test1 :: Int -> IO (Of Int ())
test1 mx = S.sum $ fixedSConcat $ S.take mx $ S.repeat [1]

test2 :: Int -> IO (Of Int ())
test2 mx = S.sum $ S.concat $ S.take mx $ S.repeat [1]

fixedSConcat :: Monad m => S.Stream (S.Of [a]) m r ->  S.Stream (S.Of a) m r
fixedSConcat = loop
  where
    loop str = case str of
        S.Return r -> S.Return r
        S.Effect m -> S.Effect (fmap loop m)
        S.Step (lst :> as) -> 
          let inner [] = loop as
              inner (x:rest) = S.Step (x :> inner rest)
          in inner lst

main :: IO ()
main =
  defaultMain [
      bgroup "streaming - 1000" 
        [ bench "fixedSConcat"  $ whnfIO (test1 1000)
        , bench "S.concat" $ whnfIO (test2 1000)
      ],
      bgroup "streaming - 2000" 
        [ bench "fixedSConcat"  $ whnfIO (test1 2000)
        , bench "S.concat" $ whnfIO (test2 2000)
      ]
    ]
benchmarking streaming - 1000/fixedSConcat
time                 55.25 μs   (54.58 μs .. 56.08 μs)
                     0.999 R²   (0.998 R² .. 0.999 R²)
mean                 55.36 μs   (54.76 μs .. 56.15 μs)
std dev              2.353 μs   (1.836 μs .. 3.406 μs)
variance introduced by outliers: 47% (moderately inflated)

benchmarking streaming - 1000/S.concat
time                 35.65 ms   (35.07 ms .. 36.33 ms)
                     0.998 R²   (0.995 R² .. 1.000 R²)
mean                 35.63 ms   (35.22 ms .. 36.09 ms)
std dev              902.6 μs   (631.5 μs .. 1.314 ms)

benchmarking streaming - 2000/fixedSConcat
time                 115.2 μs   (112.0 μs .. 119.3 μs)
                     0.991 R²   (0.981 R² .. 0.998 R²)
mean                 113.8 μs   (112.1 μs .. 117.1 μs)
std dev              7.680 μs   (4.765 μs .. 14.05 μs)
variance introduced by outliers: 66% (severely inflated)

benchmarking streaming - 2000/S.concat
time                 149.0 ms   (133.6 ms .. 158.2 ms)
                     0.993 R²   (0.980 R² .. 0.999 R²)
mean                 169.4 ms   (160.4 ms .. 187.3 ms)
std dev              17.29 ms   (5.326 ms .. 24.42 ms)
variance introduced by outliers: 27% (moderately inflated)
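fixedSConcat can be generalized to any Foldable with the same direct recursion over the Stream constructors, by replacing the hand-written inner loop with foldr. A sketch (concatFoldable is a hypothetical name, and it has not been benchmarked against the numbers above):

```haskell
import Streaming (Of (..))
import Streaming.Internal (Stream (..))
import qualified Streaming.Prelude as S

-- Emit every element of each Foldable container, then continue with the rest.
concatFoldable :: (Monad m, Foldable f) => Stream (Of (f a)) m r -> Stream (Of a) m r
concatFoldable = loop
  where
    loop str = case str of
      Return r -> Return r
      Effect m -> Effect (fmap loop m)
      Step (xs :> rest) -> foldr (\x acc -> Step (x :> acc)) (loop rest) xs
```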
