tomasmikula / libretto
Declarative concurrency and stream processing library for Scala
License: Mozilla Public License 2.0
def divide[A]: Source[A] -⚬ (Source[A] |*| Source[A])
Forwards each incoming element to whichever output polls first.
Provide both a biased and an unbiased version.
Also, provide a version that divides elements equally between the outputs, regardless of demand.
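To pin down the two behaviors, here is a rough model in plain Scala. It is not Libretto's API: a Source is modeled as a List, and the outputs' demand is modeled as a hypothetical sequence of poll events. `DivideModel`, `PollLeft`/`PollRight`, `divideByDemand` and `divideEqually` are illustrative names only.

```scala
object DivideModel {
  // Hypothetical model of which output polled next.
  sealed trait Side
  case object PollLeft extends Side
  case object PollRight extends Side

  // Demand-driven: element i goes to whichever output issued the i-th poll.
  // Elements beyond the last poll remain unforwarded (zip truncates).
  def divideByDemand[A](elems: List[A], polls: List[Side]): (List[A], List[A]) = {
    val assigned = elems.zip(polls)
    (assigned.collect { case (a, PollLeft) => a },
     assigned.collect { case (a, PollRight) => a })
  }

  // Demand-independent: alternate elements between the outputs.
  def divideEqually[A](elems: List[A]): (List[A], List[A]) = {
    val indexed = elems.zipWithIndex
    (indexed.collect { case (a, i) if i % 2 == 0 => a },
     indexed.collect { case (a, i) if i % 2 == 1 => a })
  }
}
```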
To cover: |+| and |&|
Signaling, Deferrable/Junction) (#50)
Comonoid, pooling, reference counting) (#52)
In particular, don't require the keys of a tree to be Scala values (Val[K]).
Informally, by a left-biased racing operator we mean one which in case of a "tie" consistently favors the left contestant. (Note that we haven't defined what a "tie" is.)
Can we come up with a precise definition that does not depend on implementation details?
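One candidate definition, offered purely as an assumption: give each contestant's completion a timestamp on a shared logical clock, and call it a tie when both complete at the same logical instant. A left-biased race then picks the left contestant whenever its timestamp is less than or equal to the right's. Note that this still presupposes such a clock, so it only partially answers the question. `RaceModel` and `raceLeftBiased` are hypothetical.

```scala
object RaceModel {
  sealed trait Winner
  case object LeftWins extends Winner
  case object RightWins extends Winner

  // Each contestant completes at some logical time, or never (None).
  // On a tie (equal logical times) the left contestant consistently wins.
  def raceLeftBiased(left: Option[Long], right: Option[Long]): Option[Winner] =
    (left, right) match {
      case (Some(l), Some(r)) => Some(if (l <= r) LeftWins else RightWins)
      case (Some(_), None)    => Some(LeftWins)
      case (None, Some(_))    => Some(RightWins)
      case (None, None)       => None // the race never resolves
    }
}
```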
Define a type alias
type Initializing[A] = Done |*| A
to mean, by convention, that the mechanism that later produces A is still initializing until the Done signal arrives.
For example, Initializing[Pollable[A]] could be used to signal the event that the stream is connected to a source (e.g. to Kafka with the latest offsets) and that, after initialization completes, no messages from that source are missed.
Testing non-occurrence of an event is often very useful. However, it is generally tricky. It is like checking that a Future has not completed yet and will not complete unless additional actions are taken. That is not possible with Futures: there's no guarantee that the Future will not complete if we wait a little longer.
The poor man's approximation, waiting for some finite amount of time and then assuming that if the event has not occurred yet, it will not occur in the future, can be implemented by racing a timer against the said event.
We could do better (unless we need to interface with asynchronous Scala code, that is).
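For reference, the poor man's approximation looks roughly like this with plain Scala Futures. Here the "timer" is the timeout of `Await.ready`, which blocks the calling thread; `NonOccurrence.probablyNeverOccurs` is a hypothetical helper, and a `true` result is only a guess, since the event may still complete a moment later.

```scala
import scala.concurrent.{Await, Future, TimeoutException}
import scala.concurrent.duration._

object NonOccurrence {
  // Wait up to `timeout` for `event`. If it has not completed by then,
  // *assume* it never will. This is inherently inconclusive.
  def probablyNeverOccurs[A](event: Future[A], timeout: FiniteDuration): Boolean =
    try { Await.ready(event, timeout); false }
    catch { case _: TimeoutException => true }
}
```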
Consider SimulatedDSL extends ClosedDSL, which adds an operation
def exhaust[A, B]: (A =⚬ B) -⚬ (A =⚬ B)
which takes a subsystem A =⚬ B and makes ready all the outputs that can be made ready without taking additional inputs. Then, when we test whether an output is ready, the answer we get is conclusive.
Have a good story for how to do supervision of a sub-component, i.e. being able to recover from its unexpected errors.
Optimize the blueprint before executing it.
CoreStreams should not use stuff from ScalaDSL, namely Val, Neg, ...
Using, e.g., http4s.
Depends on some HTTP server (e.g. http4s) being available for Scala 3.
Implement a tree that supports modifications in different branches concurrently, i.e. update in one branch is not blocked until an update in a different branch completes. Mutual exclusion is needed only along the common path in the tree - as soon as the paths diverge, updates can occur concurrently.
Each node is an active entity (actor, agent) that communicates with its parent and children.
A node may signal completion to its parent.
A node has to listen concurrently (race) for communication from its parent and all children.
Use in Pollable.subscribeByKey, so that dispatch of a value for key k2 is not blocked until dispatch of a previous value for key k1 is complete.
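The issue envisions each node as an actor. As a point of comparison only, the sketch below swaps in a different, well-known technique, hand-over-hand locking (lock coupling), which yields the same property: two updates contend only along their common path prefix and proceed concurrently once their paths diverge. `CoupledTree` is hypothetical and not part of Libretto.

```scala
import java.util.concurrent.locks.ReentrantLock
import scala.collection.mutable

// Hypothetical tree addressed by key paths; each node guards its own state.
final class CoupledTree[K, V] {
  private final class Node {
    val lock = new ReentrantLock()
    var value: Option[V] = None
    val children: mutable.Map[K, Node] = mutable.Map.empty
  }

  private val root = new Node

  // Walk down the path holding at most two locks (parent, then child).
  // After the child's lock is acquired the parent's is released, so updates
  // in sibling branches are no longer blocked. Missing nodes are created.
  private def lockPath(path: List[K]): Node = {
    var current = root
    current.lock.lock()
    var rest = path
    while (rest.nonEmpty) {
      val next = current.children.getOrElseUpdate(rest.head, new Node)
      next.lock.lock()
      current.lock.unlock()
      current = next
      rest = rest.tail
    }
    current // returned locked; the caller must unlock it
  }

  def update(path: List[K])(f: Option[V] => V): Unit = {
    val node = lockPath(path)
    try node.value = Some(f(node.value))
    finally node.lock.unlock()
  }

  def get(path: List[K]): Option[V] = {
    val node = lockPath(path)
    try node.value
    finally node.lock.unlock()
  }
}
```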
def eval[A, B]: (Unlimited[Val[A] =⚬ Val[B]] |*| Pollable[A]) -⚬ Pollable[B]
The name eval comes from the similarity to DSL.eval (def eval[A, B]: ((A =⚬ B) |*| A) -⚬ B).
Also, implement a version evaluating multiple elements in parallel, taking parallelism: Int as an argument. (Perhaps as a different issue.)
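As a plain-Scala analogue only (Pollable and Unlimited are not modeled), the parallel variant could apply the function to at most `parallelism` elements at a time while preserving output order. `EvalModel.evalPar` is hypothetical, and fixed-size batching is just the simplest strategy; a sliding window would keep all slots busy when element costs vary.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object EvalModel {
  // Process `xs` in consecutive batches of at most `parallelism` elements.
  // Elements within a batch run concurrently; output order matches input order.
  def evalPar[A, B](parallelism: Int)(f: A => B)(xs: List[A])(implicit ec: ExecutionContext): Future[List[B]] = {
    require(parallelism > 0, "parallelism must be positive")
    xs.grouped(parallelism).foldLeft(Future.successful(List.empty[B])) { (acc, batch) =>
      acc.flatMap { done =>
        Future.traverse(batch)(a => Future(f(a))).map(done ++ _)
      }
    }
  }
}
```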
À la LambdaCart.
The price to pay for lambda syntax is that linearity checks move from compile-time to assembly-time.
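To make the trade-off concrete, here is a toy illustration, entirely hypothetical and not LambdaCart's or Libretto's mechanism: the lambda body builds an expression tree, the compiler happily accepts a body that uses its variable twice, and the violation is reported only when the lambda is "assembled".

```scala
object LinearLambda {
  // Hypothetical expression tree produced by a lambda body.
  sealed trait Expr
  final case class Var(id: Int) extends Expr
  final case class Pair(l: Expr, r: Expr) extends Expr

  private def uses(e: Expr): List[Int] = e match {
    case Var(id)    => List(id)
    case Pair(l, r) => uses(l) ++ uses(r)
  }

  // "Assemble" a one-argument lambda: run the body on a fresh variable,
  // then check that the variable occurs exactly once in the result.
  def assemble(body: Expr => Expr): Either[String, Expr] = {
    val x = Var(0)
    val out = body(x)
    uses(out).count(_ == x.id) match {
      case 1 => Right(out)
      case 0 => Left("variable x is never used")
      case n => Left(s"variable x is used $n times")
    }
  }
}
```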
Provide Libretto wrappers for network I/O.
Queue
Map
Signaling, Deferrable/Junction, Deferred/Detained, blockOutportUntilPing, blockInportUntilPong.
Scala 2 (2.13.4) doesn't properly typecheck stuff like
trait ScalaDSL {
  type Done
}

trait CoreLib[DSL <: ScalaDSL] {
  val dsl: DSL
}

trait BST[DSL <: ScalaDSL, Lib <: CoreLib[DSL]] {
  val dsl: DSL
  val lib: Lib with CoreLib[dsl.type]

  val lib1: lib.dsl.type = dsl
  val lib2: dsl.type = lib.dsl

  val a: dsl.Done = ???
  val b: lib.dsl.Done = a
  val c: dsl.Done = b
}
while Dotty does.
Also, Dotty finally supports path-dependent types in constructor parameter lists and in the extends clause, as in
class A
class B[T]
class Foo[T, U]
class Bar(val a: A)(val b: B[a.type]) extends Foo[a.type, b.type]
which will be useful.
Provide Libretto wrappers for working with files.
def feedback[X, A, B](f: (Pollable[X] |*| A) -⚬ (Pollable[X] |*| B)): A -⚬ B
Forward any poll of the input Pollable[X] to a poll of the output Pollable[X], via double inversion.
Possible with existing primitives and definitions.
Have a principled way (or two) of how to do logging/tracing.
Future-based implementation
Futures don't support removing listeners. This leads to a couple of problems.
There is no way to let a Future know that we are no longer interested in its result, let alone cascade that information to upstream Futures.
Some recursive programs may lead to arbitrarily long chains of Promises.
To illustrate the problem, let's try to define our own flatMap method on Future:
import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.util.{Failure, Success}

def flatMap[A, B](fa: Future[A])(f: A => Future[B])(implicit ec: ExecutionContext): Future[B] = {
  val pb = Promise[B]()
  fa.onComplete {
    case Success(a) => pb.completeWith(f(a))
    case Failure(e) => pb.failure(e)
  }
  pb.future
}
Now imagine an infinite chain of such flatMaps, as in
def loop()(implicit ec: ExecutionContext): Future[Unit] = {
  // using *our* flatMap defined above
  flatMap(step()) { _ =>
    loop()
  }
}

def step(): Future[Unit] = ???
loop() will create an infinite chain of Promises, thus leaking resources.
This problem does not occur with Future's flatMap method, because it has a specialized implementation that avoids building up a chain of promises by re-linking the listeners. That's something not available to client code like the Libretto implementation.
Now, Libretto uses constructs with Promises similar to the one in the above implementation of flatMap, though not exactly flatMaps.
Avoid Scala Futures altogether. Instead, introduce our own primitive that gives control over listeners. Also, it need not be as general-purpose as Future, but can exploit Libretto specifics, such as having exactly one listener.
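A minimal sketch of what such a primitive might look like, assuming exactly one listener per cell: the listener can be detached, and a completion arriving after detachment is simply dropped, so nothing keeps the listener reachable. `OneShot`, `listen`, `complete` and `detach` are hypothetical names; cascading detachment upstream is not shown.

```scala
import java.util.concurrent.atomic.AtomicReference

// Hypothetical single-listener, detachable result cell.
final class OneShot[A] {
  private sealed trait State
  private case object Empty extends State
  private final case class Listening(cb: A => Unit) extends State
  private final case class Completed(a: A) extends State
  private case object Detached extends State

  private val state = new AtomicReference[State](Empty)

  // Complete the cell, notifying the listener if one is attached.
  def complete(a: A): Unit = state.get() match {
    case s @ Empty         => if (!state.compareAndSet(s, Completed(a))) complete(a)
    case s @ Listening(cb) => if (state.compareAndSet(s, Completed(a))) cb(a) else complete(a)
    case Detached          => () // nobody is interested any more
    case Completed(_)      => throw new IllegalStateException("already completed")
  }

  // Attach the single listener; it runs immediately if already completed.
  def listen(cb: A => Unit): Unit = state.get() match {
    case s @ Empty    => if (!state.compareAndSet(s, Listening(cb))) listen(cb)
    case Completed(a) => cb(a)
    case Detached     => ()
    case Listening(_) => throw new IllegalStateException("only one listener allowed")
  }

  // Signal loss of interest; any attached listener is released.
  def detach(): Unit = state.set(Detached)
}
```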
CoreLib should not depend on ScalaDSL, only on CoreDSL.
Depends on #44.
f andThen g, where f: A -⚬ B and g: B -⚬ C, suggests that g takes place later in time than f, which is misleading. In case of negative information flow through B (e.g. when B = Need), there is an event in g occurring before some event in f.
Alternative name ideas: concat, cat, chain, connect, link.
Its symbolic name, >>>, is also misleading: it suggests that information flows from left to right, which again is wrong when there is negative information flow.
Alternative name ideas: <>, <|>, <:>.
The same goes for f after g: it does not mean that f is executed later in time than g.
after has been removed; the syntaxes f < g and f ⚬ g remain.
Wordy name ideas for ⚬: compose, tac (reversed cat).
Have a good story for IO and wrapping imperative, side-effecting APIs.
Update: provided by Resource Management (#10).
It will not support duplication, broadcast, or discarding values (and thus anything that pre-polls).
def broadcastByKey[A, K](key: A => K): Pollable[A] -⚬ Unlimited[Val[K] =⚬ Pollable[A]]
There is a trivial implementation via broadcast and filter. A sophisticated implementation would not emit an element to all subscribers (only to be filtered out by most of them), but only to the ones subscribed to that key.
Ideally, a subscriber to k1 should not backpressure k2.
Moved to #31.
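The difference from broadcast-then-filter can be sketched with a synchronous, non-thread-safe dispatcher indexed by key: an element is handed only to the subscribers of its own key. `KeyedDispatcher` is hypothetical, ignores Pollable entirely, and does nothing about backpressure between keys.

```scala
import scala.collection.mutable

// Hypothetical key-indexed dispatcher (synchronous, not thread-safe).
final class KeyedDispatcher[A, K](key: A => K) {
  private val subscribers = mutable.Map.empty[K, List[A => Unit]]

  def subscribe(k: K)(handler: A => Unit): Unit =
    subscribers.update(k, handler :: subscribers.getOrElse(k, Nil))

  // Only handlers registered for key(a) see `a`; no filtering on the subscriber side.
  def publish(a: A): Unit =
    subscribers.getOrElse(key(a), Nil).foreach(h => h(a))
}
```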
Randomize the execution path of concurrent code.
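One common approach, offered as an assumption rather than Libretto's plan, is a seeded random scheduler: each concurrent task is a sequence of steps, and at each point the scheduler picks a random task that still has steps left. A fixed seed makes a failing interleaving reproducible. `RandomScheduler.randomInterleaving` is hypothetical.

```scala
import scala.util.Random

object RandomScheduler {
  // Interleave the steps of several "threads", choosing the next thread at random.
  // Each thread's own step order is preserved; only the interleaving varies.
  def randomInterleaving[S](threads: Vector[List[S]], seed: Long): List[S] = {
    val rnd = new Random(seed)
    var remaining = threads.filter(_.nonEmpty)
    val out = List.newBuilder[S]
    while (remaining.nonEmpty) {
      val i = rnd.nextInt(remaining.size)
      val t = remaining(i)
      out += t.head
      remaining =
        if (t.tail.isEmpty) remaining.take(i) ++ remaining.drop(i + 1)
        else remaining.updated(i, t.tail)
    }
    out.result()
  }
}
```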
Have a good story for resource management.
Interaction between a Libretto program and components of the host language can in principle be done in two ways:
The Libretto program is the master that embeds and manages pockets of (effectful) host-language code.
This is supported via Resource Management (#2, #10).
It has a framework feel to it: everything has to be shoehorned to satisfy the framework's requirements; in this case, software components of the host language have to be recast as resources in a Libretto program. The problem is that it is not always possible or desirable to make Libretto the main driver of the application and subjugate everything else to it.
The (running) Libretto program is recast as a component in the host language, interacted with by the usual means of the host language (methods, callbacks).
Benefits:
NB: The interfacing code in the host language does not have the nice properties of Libretto programs, like ensured linearity, but that is an inevitable price to pay for interfacing with the dirty world of the host language.
This task is to support the second approach, namely to embed an instantiated Libretto program into the host language, by providing an imperative interface for interacting with it.
NB: It is not acceptable to embed mutable objects in a blueprint and then manipulate them from the outside. This might seem obvious, but it is common practice in IO monad-based effect libraries. For example, it is common to create a blueprint IO[A] that captures a queue (a mutable object) which is manipulated from outside the blueprint.
Make types such as Pollable[A], ... opaque.
Depends on mdoc supporting Scala 3.
ValSource.merge favors the first input, which is a consequence of the biased race. Implement an unbiased variant where the arguments to race are swapped after each step.
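The effect of swapping can be seen in a simple model where every element of both inputs is ready at once, so each step is a tie that the biased race resolves in favor of its preferred side. `MergeModel.merge` is a hypothetical sketch, not ValSource's implementation; with `alternate = true` the preference flips after every tie.

```scala
object MergeModel {
  // All elements of both inputs are ready, so whenever both inputs are
  // non-empty the race "ties" and picks the currently preferred side.
  def merge[A](left: List[A], right: List[A], alternate: Boolean): List[A] = {
    @annotation.tailrec
    def go(l: List[A], r: List[A], preferLeft: Boolean, acc: List[A]): List[A] =
      (l, r) match {
        case (Nil, Nil)     => acc.reverse
        case (x :: xs, Nil) => go(xs, Nil, preferLeft, x :: acc)
        case (Nil, y :: ys) => go(Nil, ys, preferLeft, y :: acc)
        case (x :: xs, y :: ys) =>
          // Biased race: take the preferred side; optionally swap preference.
          val next = if (alternate) !preferLeft else preferLeft
          if (preferLeft) go(xs, r, next, x :: acc)
          else go(l, ys, next, y :: acc)
      }
    go(left, right, preferLeft = true, Nil)
  }
}
```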
loadFunction[A, B]: Val[A -⚬ B] -⚬ (A =⚬ B)
loadFunctionN[A, B]: Val[A -⚬ B] -⚬ Unlimited[A =⚬ B]
def partition[A, B]: Pollable[Either[A, B]] -⚬ (Pollable[A] |*| Pollable[B])
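The intended routing can be pinned down with a List analogue (hypothetical; it ignores polling and demand, which are the actual substance of the Pollable version): Lefts go to the first output and Rights to the second, each keeping its relative order.

```scala
object PartitionModel {
  // List analogue of the proposed partition.
  def partition[A, B](xs: List[Either[A, B]]): (List[A], List[B]) =
    xs.partitionMap(x => x)
}
```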