
iteratee's Introduction

iteratee.io


This project is an iteratee implementation for Cats that began as a port of Scalaz's iteratee package, although the API and implementation are now very different from Scalaz's. There are API docs (but they're a work in progress), and I've published a blog post introducing the project.

The motivations for the port are similar to those for circe—in particular I'm aiming for a more consistent API, better performance, and better documentation.

Note that this library doesn't support many of the use cases that fs2 (formerly Scalaz Stream) is designed to handle. It doesn't support nondeterministic reading from multiple streams, for example, and in general it is a less appropriate choice for situations where concurrency and parallelism are primary goals. Where the use cases of fs2 and this library do overlap, however, this library is often likely to be the simpler, faster solution.

The initial performance benchmarks look promising. For example, here are the throughput results for summing a sequence of numbers with this library and cats.Id (II), this library and Monix's Task (IM), this library and Scalaz's Task (IT), this library and Twitter futures (IR), Scalaz Stream (S), scalaz-iteratee (Z), play-iteratee (P), the Scala collections library (C), and fs2 (F). Higher numbers are better.

Benchmark                      Mode  Cnt      Score     Error  Units
InMemoryBenchmark.sumInts0II  thrpt   80  10225.388 ± 191.612  ops/s
InMemoryBenchmark.sumInts1IM  thrpt   80  13395.800 ±  30.912  ops/s
InMemoryBenchmark.sumInts2IT  thrpt   80  18609.579 ±  47.491  ops/s
InMemoryBenchmark.sumInts3IR  thrpt   80  15999.740 ± 114.949  ops/s
InMemoryBenchmark.sumInts4S   thrpt   80     72.074 ±   1.209  ops/s
InMemoryBenchmark.sumInts5Z   thrpt   80    310.472 ±   4.368  ops/s
InMemoryBenchmark.sumInts6P   thrpt   80     43.071 ±   0.543  ops/s
InMemoryBenchmark.sumInts7C   thrpt   80  12975.042 ±  48.702  ops/s
InMemoryBenchmark.sumInts8F   thrpt   80   9610.699 ±  41.936  ops/s

And the results for collecting the first 10,000 values from an infinite stream of non-negative numbers into a Vector:

Benchmark                         Mode  Cnt     Score    Error  Units
StreamingBenchmark.takeLongs0II  thrpt   80  2787.725 ± 16.812  ops/s
StreamingBenchmark.takeLongs1IM  thrpt   80  1617.848 ± 19.899  ops/s
StreamingBenchmark.takeLongs2IT  thrpt   80  1052.494 ±  7.707  ops/s
StreamingBenchmark.takeLongs3IR  thrpt   80   979.514 ± 26.197  ops/s
StreamingBenchmark.takeLongs4S   thrpt   80    56.882 ±  0.969  ops/s
StreamingBenchmark.takeLongs5Z   thrpt   80   154.103 ± 10.350  ops/s
StreamingBenchmark.takeLongs6P   thrpt   80     1.216 ±  0.005  ops/s
StreamingBenchmark.takeLongs7C   thrpt   80  3273.158 ± 55.187  ops/s
StreamingBenchmark.takeLongs8F   thrpt   80     7.915 ±  0.044  ops/s

And allocation rates (lower is better):

Benchmark                                            Mode  Cnt           Score          Error  Units
InMemoryBenchmark.sumInts0II:gc.alloc.rate.norm     thrpt   20      159953.462 ±       11.863   B/op
InMemoryBenchmark.sumInts1IM:gc.alloc.rate.norm     thrpt   20      160203.272 ±        5.949   B/op
InMemoryBenchmark.sumInts2IT:gc.alloc.rate.norm     thrpt   20      160622.026 ±        6.323   B/op
InMemoryBenchmark.sumInts3IR:gc.alloc.rate.norm     thrpt   20      160398.303 ±        6.685   B/op
InMemoryBenchmark.sumInts4S:gc.alloc.rate.norm      thrpt   20    63936897.241 ±   320928.043   B/op
InMemoryBenchmark.sumInts5Z:gc.alloc.rate.norm      thrpt   20    16401510.998 ±        6.115   B/op
InMemoryBenchmark.sumInts6P:gc.alloc.rate.norm      thrpt   20    13802446.593 ±   229152.745   B/op
InMemoryBenchmark.sumInts7C:gc.alloc.rate.norm      thrpt   20      159851.547 ±       14.556   B/op
InMemoryBenchmark.sumInts8F:gc.alloc.rate.norm      thrpt   20      260454.260 ±     1522.736   B/op

Benchmark                                            Mode  Cnt           Score          Error  Units
StreamingBenchmark.takeLongs0II:gc.alloc.rate.norm  thrpt   20     3043720.338 ±        0.018   B/op
StreamingBenchmark.takeLongs1IM:gc.alloc.rate.norm  thrpt   20     3444961.639 ±        4.168   B/op
StreamingBenchmark.takeLongs2IT:gc.alloc.rate.norm  thrpt   20     5804308.795 ±    61718.228   B/op
StreamingBenchmark.takeLongs3IR:gc.alloc.rate.norm  thrpt   20     5124124.296 ±        5.147   B/op
StreamingBenchmark.takeLongs4S:gc.alloc.rate.norm   thrpt   20    75347149.315 ±   555268.150   B/op
StreamingBenchmark.takeLongs5Z:gc.alloc.rate.norm   thrpt   20    28588033.048 ±   238419.245   B/op
StreamingBenchmark.takeLongs6P:gc.alloc.rate.norm   thrpt   20  1206196498.000 ±    71329.621   B/op
StreamingBenchmark.takeLongs7C:gc.alloc.rate.norm   thrpt   20      526752.310 ±        0.029   B/op
StreamingBenchmark.takeLongs8F:gc.alloc.rate.norm   thrpt   20   531380973.839 ± 13505581.754   B/op

License

iteratee.io is licensed under the Apache License, Version 2.0 (the "License"); you may not use this software except in compliance with the License.

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

iteratee's People

Contributors

alexandru, anovstrup, bmjames, channingwalton, crafter6432, crispywalrus, dchenbecker, halcat0x15a, ilya-murzinov, johnynek, jonm, larsrh, liff, purefn, retronym, romaniakovlev, rossabaker, runarorama, scala-steward, sullivan-, tonymorris, travisbrown, travisbrown-stripe, xuwei-k

iteratee's Issues

Avoid resource leaks

Right now in some of the FileModule read method implementations (for example), if a FileReader is instantiated successfully but then new BufferedReader(...) fails, the FileReader will not be closed.

This is kind of a corner case but it should be fixed.
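One way to close this kind of gap is sketched below. It is only an illustration, not the FileModule code: wrapOrClose and openBuffered are hypothetical names, and the sketch assumes the fix can be expressed as "close the inner resource if wrapping it throws".

```scala
import java.io.{BufferedReader, Closeable, File, FileReader}

object SafeOpen {
  // Hypothetical helper (not part of iteratee.io): wrap an already-open
  // resource, and close it if the wrapping step itself throws.
  def wrapOrClose[A <: Closeable, B](inner: A)(wrap: A => B): B =
    try wrap(inner)
    catch {
      case t: Throwable =>
        // Close the inner resource, keeping the original failure as the primary error.
        try inner.close()
        catch { case closeError: Throwable => t.addSuppressed(closeError) }
        throw t
    }

  // The FileReader is closed if constructing the BufferedReader fails.
  def openBuffered(file: File): BufferedReader =
    wrapOrClose(new FileReader(file))(new BufferedReader(_))
}
```

If the wrapping succeeds, closing the outer BufferedReader closes the FileReader as usual, so the helper only matters on the failure path.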

Move iteratee-twitter into catbird

Now that Cats is approaching 1.0 and the core iteratee API is getting close to something I'd consider stable (apart from an upcoming set of changes to iteratee-files), I'm considering moving the iteratee-twitter module into catbird.

This would allow us to decouple the iteratee.io release cycle from Twitter Util's—we'll be more easily able to publish new iteratee-twitter releases for every Twitter Util release (which tends to happen every month or two).

Add helper methods for working with side-effecting iteratees

From Gitter, in response to a question by @alexknvl:

I was thinking just last week of adding something like the following methods to the Iteratee companion object (omitting constraints and other details):

def foreach[F[_], A](f: A => Unit): Iteratee[F, A, Unit]
def foreachM[F[_], A](f: A => F[Unit]): Iteratee[F, A, Unit]

And then on Iteratee instances:

def discard: Iteratee[F, E, Unit]

I'd happily accept a PR adding these—I probably wouldn't get to them this week myself.

Add a buffer Enumeratee

It would be nice to have a buffer Enumeratee in order to speed up latency-dependent iteratees.

Track down issue with large negative argument to take

One of the jobs for #9 failed for a reason apparently unrelated to the changes there:

[info] - take *** FAILED ***
[info]   GeneratorDrivenPropertyCheckFailedException was thrown during property evaluation.
[info]    (IterateeSuite.scala:120)
[info]     Falsified after 8 successful property evaluations.
[info]     Location: (IterateeSuite.scala:120)
[info]     Occurred when passed generated values (
[info]       arg0 = LargeEnumerator(Stream(2147483647, ?),io.iteratee.Enumerator$$anon$15@421c498f),
[info]       arg1 = -2147483648
[info]     )

This is a little disturbing, but I'm just restarting the job for now.

Add fs2 module

I'd like to add a module with support for fs2's Task that's similar to what we have for Scalaz, as well as some converters to and from fs2's types.

tailRecM doesn't guarantee stack safety with large chunks

This is fixed in #141, but I figured I should report it for clarity.

The problem is that when you have a chunk that's large enough to feed several thousand loops of a tailRecM, you might get a stack overflow if the underlying monad's flatMap isn't stack-safe. For example:

scala> import cats.FlatMap, cats.instances.option._
import cats.FlatMap
import cats.instances.option._

scala> import io.iteratee._, io.iteratee.modules.option._
import io.iteratee._
import io.iteratee.modules.option._

scala> type OptionIntIteratee[A] = Iteratee[Option, Int, A]
defined type alias OptionIntIteratee

scala> val it = FlatMap[OptionIntIteratee].tailRecM[Int, Option[Int]](0)(i =>
     |   head[Int].map { case x if i > 5000 => Right(x); case x => Left(i + 1) }
     | )
it: OptionIntIteratee[Option[Int]] = io.iteratee.Iteratee@77a2db9f

scala> iterate(0)(_ + 1).into(it)
res0: Option[Option[Int]] = Some(Some(5001))

scala> enumVector((0 to 10000).toVector).into(it)
java.lang.StackOverflowError
  at io.iteratee.internal.Step$PureCont.bind(Step.scala:103)
  at io.iteratee.Iteratee$$anonfun$flatMap$1.apply(Iteratee.scala:53)
  at io.iteratee.Iteratee$$anonfun$flatMap$1.apply(Iteratee.scala:53)
  at scala.Option.flatMap(Option.scala:171)
  at cats.instances.OptionInstances$$anon$1.flatMap(option.scala:21)
  at cats.instances.OptionInstances$$anon$1.flatMap(option.scala:9)
  at io.iteratee.Iteratee.flatMap(Iteratee.scala:53)
  ...

The fact that Cats's FlatMap[Option] has a safe tailRecM should be enough here, but it's not.

The fix is a smarter tailRecM implementation for Iteratee that uses the underlying monad's tailRecM when sequencing on a single chunk.
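The difference between the two looping strategies can be seen without the library at all. The following is a minimal sketch on plain Option, not the Iteratee implementation from #141; naiveLoop and safeLoop are hypothetical names used only for illustration.

```scala
import scala.annotation.tailrec

object TailRecSketch {
  // Loops by recursing inside flatMap: every iteration adds stack frames,
  // so enough Left steps overflow the stack when flatMap is strict.
  def naiveLoop[A, B](a: A)(f: A => Option[Either[A, B]]): Option[B] =
    f(a).flatMap {
      case Left(next) => naiveLoop(next)(f)
      case Right(b)   => Some(b)
    }

  // Same semantics as a tail-recursive loop: constant stack depth,
  // which is what a stack-safe tailRecM for Option provides.
  @tailrec
  def safeLoop[A, B](a: A)(f: A => Option[Either[A, B]]): Option[B] =
    f(a) match {
      case None             => None
      case Some(Left(next)) => safeLoop(next)(f)
      case Some(Right(b))   => Some(b)
    }
}
```

Delegating to the underlying monad's tailRecM within a single chunk corresponds to using the second form instead of the first.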

Investigate Travis CI failures

The build is failing after the 0.2.0 release commits, which didn't change any code. I can't reproduce the failures locally, so I'm going to write them off as Travis CI brokenness and try restarting them again tomorrow.

NPE when running readBytes with future module

The following Ammonite script

import $ivy.`io.iteratee::iteratee-files:0.8.0`

import scala.concurrent.ExecutionContext.Implicits.global

val fileModule = io.iteratee.modules.future
val fileFuture = io.iteratee.files.future

import cats.implicits._
import fileModule._
import fileFuture._

val res = readBytes(new java.io.File("/dev/urandom")).into(takeI(1))
res.onFailure { case t => t.printStackTrace }

prints the following stack trace:

java.lang.NullPointerException
        at scala.concurrent.impl.Promise$KeptPromise$Kept.onComplete(Promise.scala:368)
        at scala.concurrent.impl.Promise$KeptPromise$Kept.onComplete$(Promise.scala:367)
        at scala.concurrent.impl.Promise$KeptPromise$Successful.onComplete(Promise.scala:375)
        at scala.concurrent.impl.Promise.transformWith(Promise.scala:36)
        at scala.concurrent.impl.Promise.transformWith$(Promise.scala:34)
        at scala.concurrent.impl.Promise$KeptPromise$Successful.transformWith(Promise.scala:375)
        at scala.concurrent.Future.flatMap(Future.scala:301)
        at scala.concurrent.Future.flatMap$(Future.scala:301)
        at scala.concurrent.impl.Promise$KeptPromise$Successful.flatMap(Promise.scala:375)
        at cats.instances.FutureInstances$$anon$1.flatMap(future.scala:13)
        at cats.instances.FutureInstances$$anon$1.flatMap(future.scala:10)
        at io.iteratee.Enumeratee$$anon$25.apply(Enumeratee.scala:11)
        at io.iteratee.Enumerator.intoStep(Enumerator.scala:15)
        at io.iteratee.Enumerator.$anonfun$into$1(Enumerator.scala:17)
        at scala.concurrent.Future.$anonfun$flatMap$1(Future.scala:302)
        at scala.concurrent.impl.Promise.$anonfun$transformWith$1(Promise.scala:37)
        at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)
        at scala.concurrent.impl.ExecutionContextImpl$AdaptedForkJoinTask.exec(ExecutionContextImpl.scala:140)
        at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
        at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
        at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
        at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)

I am able to run simple enumerators like iterate(1)(_ + 1) with the future module, but all of the file enumerators that I tried fail with an NPE when run with the future module.

The NPE comes from executor.prepare(). Is the execution context in the future module null?

Enumerator.ensure needs lazy monads

See:

https://github.com/travisbrown/iteratee/blob/master/core/src/main/scala/io/iteratee/Enumerator.scala#L55

Since ensure takes a value, not a call-by-name argument, if the monad in question is strict, then the ensure action will already have happened by the time the call is made. This leads to strange behavior with, for instance, some of the file iteratees.

I think the solution would be to take an Eval[F[A]] there (or a call-by-name, but in Cats it seems that Eval is the standard way to go) so that the caller can pass Eval.later(captureEffect(stream.close())), which, I think, is what we want.
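The ordering problem can be reproduced with a strict effect type from the standard library. This is a minimal sketch using scala.util.Try; ensureStrict and ensureLazy are hypothetical stand-ins, not the signatures in Enumerator.scala, and a call-by-name parameter is used in place of Eval to keep the example dependency-free.

```scala
import scala.collection.mutable.ListBuffer
import scala.util.Try

object EnsureSketch {
  // Cleanup is passed as a value: with a strict effect such as Try it has
  // already run by the time this method body starts.
  def ensureStrict[A](body: () => Try[A])(cleanup: Try[Unit]): Try[A] = {
    val result = body()
    cleanup.flatMap(_ => result)
  }

  // Cleanup is passed by name: it is only evaluated after the body has run.
  def ensureLazy[A](body: () => Try[A])(cleanup: => Try[Unit]): Try[A] = {
    val result = body()
    cleanup.flatMap(_ => result)
  }

  // Records the order in which the two effects actually happen.
  def strictOrder(): List[String] = {
    val log = ListBuffer.empty[String]
    ensureStrict(() => Try { log += "read"; 1 })(Try { log += "close"; () })
    log.toList
  }

  def lazyOrder(): List[String] = {
    val log = ListBuffer.empty[String]
    ensureLazy(() => Try { log += "read"; 1 })(Try { log += "close"; () })
    log.toList
  }
}
```

In the strict version the "close" effect is recorded before "read", which mirrors a stream being closed before it is consumed.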

To work around this I made a quick Eval[M[T]] MonadError, but that was an annoying expenditure of time. (Just kidding! Like all functional programmers I secretly enjoy implementing such monads, at least the first 10-20 times I do it.)

Allow consumption of async sources

Right now there doesn't seem to be a way to create an Enumerator for an async source such as a Scala Future.

I would propose a syntax similar to Play's Enumerator.fromCallback.

Add fs2 converters

#171 addresses the main part of #166, but I'd also like to have converters allowing us to go between the iteratee abstractions and fs2's streams.

I'm not sure exactly what this would look like, and I don't want to block 0.9.0 waiting for us to figure it out, but it should happen before 1.0.

Consider more convenient syntax for transforming enumerators

Right now if you want e.g. to take a certain number of elements from an enumerator, you have to create a take enumeratee and use mapE. This is intentional, to keep the size of the Enumerator API reasonable, and to encourage people to compose transformations independently, but I'm thinking that for the sake of convenience we might want to provide the operations in EnumerateeModule on Enumerator directly.

some `Monad[M]` can be `FlatMap[M]` in Enumerator

(maybe elsewhere).

For instance: https://github.com/travisbrown/iteratee/blob/master/core/src/main/scala/io/iteratee/Enumerator.scala#L75

Some WriterT instances are only FlatMap (for instance if the type we are writing to is only a Semigroup). Also, Map[K, ?] has a FlatMap but no Monad. It may be a stretch, but it could be cool to imagine a bunch of parallel operations in a FlatMap[Map[K, ?]] that are the same up to a key. That might be a model of map/reduce?

Update to Finagle 6.36.0

The iteratee-twitter module currently depends on Finagle 6.35.0 via catbird.io 0.5.0. The next version of Finagle (6.36.0) is only published for Scala 2.11, and it's not yet clear exactly when Finch will update, so I'm currently keeping the catbird version unchanged, but eventually we'll need to update (and to configure SBT to publish iteratee-twitter only for 2.11).

iterateUntil,iterateUntilM: MatchError

scala.MatchError: Some(...) (of class scala.Some)
        at io.iteratee.Enumerator$$anon$18.io$iteratee$Enumerator$$anon$$loop(Enumerator.scala:224)
        at io.iteratee.Enumerator$$anon$18.apply(Enumerator.scala:228)
        at io.iteratee.Enumerator$$anon$3$$anonfun$apply$2.apply(Enumerator.scala:32)
        at io.iteratee.Enumerator$$anon$3$$anonfun$apply$2.apply(Enumerator.scala:32)
        at scala.concurrent.Future$$anonfun$flatMap$1.apply(Future.scala:251)
        at scala.concurrent.Future$$anonfun$flatMap$1.apply(Future.scala:249)
        at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)

Consider renaming sum

See @johnynek's question here.

I wonder if combine is a better name here. Algebird used plus/sum, but @non really does not like that except for monoids that are explicitly "additive" (in name).

Re-enable tests on Scala.js

In #455 the Scala.js tests in the tests module were failing on Scala 3, with some completely unhelpful messages:

[error] Error during tests:
[error] 	io.iteratee.EvalIterateeTests
[error] 	io.iteratee.TryEnumerateeTests
[error] 	io.iteratee.IdIterateeTests
[error] 	io.iteratee.EvalEnumerateeTests
[error] 	io.iteratee.TryEnumeratorTests
[error] 	io.iteratee.IdEnumeratorTests
[error] 	io.iteratee.TryIterateeTests
[error] 	io.iteratee.OptionEnumerateeTests
[error] 	io.iteratee.EitherEnumerateeTests
[error] 	io.iteratee.IdEnumerateeTests
[error] 	io.iteratee.EitherTEnumeratorTests
[error] 	io.iteratee.EitherTEnumerateeTests
[error] 	io.iteratee.EitherIterateeTests
[error] 	io.iteratee.EitherTIterateeTests
[error] 	io.iteratee.OptionEnumeratorTests
[error] 	io.iteratee.OptionIterateeTests
[error] 	io.iteratee.EvalEnumeratorTests
[error] 	io.iteratee.EitherEnumeratorTests
...
[error] (testsJS / Test / test) sbt.TestsFailedException: Tests unsuccessful

I tried conditionally disabling the tests for Scala.js on Scala 3 only, but that made them fail on 2.13 in a similar way, so I just disabled them on Scala.js altogether. That works fine for me, but if someone wants to take a look at some point, I would appreciate it.

Example with ManagedResource[]

Could you give an example of using your iteratees within a Scala-ARM (J. Suereth) ManagedResource[] context? Maybe a gist?

Rethink tests to more accurately represent usage

In #168 for example the fact that we're only testing FutureFileModule (not files.future) allowed an initialization order bug to slip through. I think that's a somewhat exceptional case, and the lack of coverage is caught by Scoverage, but it would be better not to have to worry about our test code being so abstract that stuff like that can slip through.

Add an enumeratee to support parallel processing

It would be nice to have something like gatherMap that would allow us to run a transformation in parallel over batches of items. I can imagine a method like this:

def gatherMap[F[_]: Monad, O, I]
  (gather: Vector[F[I]] => F[Vector[I]], size: Int)(f: O => F[I]): Enumeratee[F, O, I]

This would be a little nicer if we had a NonDeterminism type class, and I guess we could introduce our own, but passing in a gather function isn't too bad, either. I'd imagine the implementation would be pretty simple with sequenceI, but I haven't written it out yet.
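To make the batching idea concrete, here is a rough sketch that fixes F to scala.concurrent.Future and works on an in-memory Vector rather than an Enumeratee. The name gatherMapAll and its shape are assumptions for illustration only; it is not built on sequenceI and is not a proposed implementation.

```scala
import scala.concurrent.{ExecutionContext, Future}

object GatherMapSketch {
  // Hypothetical, Future-only analogue of the proposed gatherMap:
  // items within a batch are started together and combined by `gather`,
  // while batches themselves run one after another.
  def gatherMapAll[O, I](input: Vector[O], size: Int)(
    gather: Vector[Future[I]] => Future[Vector[I]]
  )(f: O => Future[I])(implicit ec: ExecutionContext): Future[Vector[I]] = {
    require(size > 0, "batch size must be positive")
    input.grouped(size).foldLeft(Future.successful(Vector.empty[I])) { (acc, batch) =>
      // The next batch is only started once the previous ones have completed.
      acc.flatMap(done => gather(batch.map(f)).map(done ++ _))
    }
  }
}
```

With Future, passing fs => Future.sequence(fs) as the gather function keeps results in input order, since Future.sequence preserves the order of the collection it is given.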

Investigate test coverage drop

49339ce updates the sbt-scoverage version from 1.2.0 to 1.3.5, which seems to have resulted in the benchmark tests not being counted in the aggregated coverage number used by Codecov.

Consider takeWhileF, etc.

#62 adds takeWhile and dropWhile enumeratees that take or drop values from a stream while a predicate E => Boolean holds. It may also be useful to have effectful versions of these enumeratees that use a predicate E => F[Boolean].
