Do not use. This is an exploratory project to learn more about what it takes to build typical real time systems.
Other projects you should probably consider instead:
This implements a trivial subset of the API commonly found in streaming data systems. You can have a Stream and can create various other streams from that stream using common modifiers like map, filter, scan, etc. Eventually you create a sink that consumes results.
from streams import Stream
from operator import add

def inc(x):
    return x + 1

source = Stream()
L = []

stream = source.map(inc).scan(add)
stream.sink(L.append)
stream.sink(print)
Now, as you feed data into the source, all of the operations trigger as necessary:
>>> for i in range(3):
... source.emit(i)
1
3
6
>>> L
[1, 3, 6]
You can use the typical map/filter/scan syntax. Everything can have multiple subscribers at any point in the stream.
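The push-based model behind this, with multiple subscribers fanning out from one point, can be sketched in a few lines of plain Python. This is a hypothetical toy (`MiniStream` is made up for illustration), not this library's actual implementation:

```python
# Hypothetical sketch of a push-based stream: each stream keeps a list of
# subscribers and pushes every emitted element to all of them.

class MiniStream:
    def __init__(self):
        self.subscribers = []

    def map(self, func):
        # Create a child stream that receives func(x) for every emitted x
        child = MiniStream()
        self.subscribers.append(lambda x: child.emit(func(x)))
        return child

    def sink(self, func):
        # Terminal consumer: call func on every element
        self.subscribers.append(func)

    def emit(self, x):
        # Push x to every subscriber; this is how one point in the
        # stream fans out to multiple downstream consumers
        for subscriber in self.subscribers:
            subscriber(x)

source = MiniStream()
doubled = source.map(lambda x: 2 * x)

seen_raw, seen_doubled = [], []
source.sink(seen_raw.append)       # one subscriber on the source itself
doubled.sink(seen_doubled.append)  # another, downstream of the map

for i in range(3):
    source.emit(i)
# seen_raw == [0, 1, 2]; seen_doubled == [0, 2, 4]
```

Nothing here requires threads or an event loop; emitting is just a cascade of ordinary function calls.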
Additionally, everything responds to backpressure, so if the sink blocks the source will block (although you can add in buffers if desired). Everything also supports asynchronous workloads with Tornado coroutines, so you can do async/await stuff if you prefer (or gen.coroutine/yield in Python 2).
async def f(result):
    ... do non-blocking stuff with result ...

stream.sink(f)  # f might impose waits, like while a database ingests results

for i in range(10):
    await source.emit(i)  # waiting at the sink is felt here at the source
This means that if the sinks can't keep up then the sources will stop pushing data into the system. This is useful to control buildup.
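The way a slow sink slows the source can be sketched with the standard library's asyncio (the library itself uses Tornado coroutines; this stand-alone toy is only an illustration of the awaiting-the-whole-chain idea):

```python
import asyncio
import time

# Hypothetical sketch of backpressure with async/await: emit awaits every
# subscriber before returning, so the source feels each sink's delay.

class AsyncStream:
    def __init__(self):
        self.subscribers = []

    def sink(self, func):
        self.subscribers.append(func)

    async def emit(self, x):
        # Await the full downstream chain before handing control back
        for subscriber in self.subscribers:
            await subscriber(x)

async def slow_sink(x):
    await asyncio.sleep(0.01)  # stand-in for a slow database ingest

async def main():
    source = AsyncStream()
    source.sink(slow_sink)
    start = time.monotonic()
    for i in range(5):
        await source.emit(i)  # each emit waits on the sink
    return time.monotonic() - start

elapsed = asyncio.run(main())
# elapsed is at least ~5 * 0.01s: the sink's waits accumulated at the source
```

A buffer stage would sit between the two and return from `emit` early until it fills, which is the trade-off mentioned above.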
By connecting sources to sinks you can create feedback loops. Here is a tiny web crawler:
import requests

source = Stream()
pages = source.unique()
content = (pages.map(requests.get)
                .map(lambda x: x.content))
links = (content.map(get_list_of_links)  # get_list_of_links is user-defined
                .concat())
links.sink(source.emit)

pages.sink(print)
>>> source.emit('http://github.com')
http://github.com
http://github.com/features
http://github.com/business
http://github.com/explore
http://github.com/pricing
...
This was not an intentional feature of the system. It just fell out from the design.
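The feedback loop can be reproduced without any networking by swapping `requests.get` for an in-memory link graph. This is a hypothetical sketch (the `MiniStream` class and `link_graph` are made up for illustration), showing why `unique()` is what makes the loop terminate:

```python
# Hypothetical sketch of the crawler's feedback loop with a fake link graph.

class MiniStream:
    def __init__(self):
        self.subscribers = []

    def map(self, func):
        child = MiniStream()
        self.subscribers.append(lambda x: child.emit(func(x)))
        return child

    def unique(self):
        # Only pass along elements not seen before; without this the
        # feedback loop would recurse forever on cyclic links
        child = MiniStream()
        seen = set()
        def on_element(x):
            if x not in seen:
                seen.add(x)
                child.emit(x)
        self.subscribers.append(on_element)
        return child

    def concat(self):
        # Flatten emitted lists into individual elements
        child = MiniStream()
        def on_element(xs):
            for x in xs:
                child.emit(x)
        self.subscribers.append(on_element)
        return child

    def sink(self, func):
        self.subscribers.append(func)

    def emit(self, x):
        for subscriber in self.subscribers:
            subscriber(x)

# A tiny cyclic link graph standing in for real HTTP requests
link_graph = {'a': ['b', 'c'], 'b': ['a', 'c'], 'c': []}

source = MiniStream()
pages = source.unique()

crawled = []
pages.sink(crawled.append)

links = pages.map(lambda page: link_graph[page]).concat()
links.sink(source.emit)  # feedback: discovered links go back into the source

source.emit('a')
# crawled == ['a', 'b', 'c']: the loop stops once unique() has seen
# every reachable page
```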
Everything above runs with normal Python in the main thread or optionally in a Tornado event loop. Alternatively this library plays well with Dask. You can scatter data to the cluster, map and scan things up there, gather back, etc.
(source.to_dask()
       .scatter()
       .map(func)   # Runs on a cluster
       .scan(func)  # Runs on a cluster
       .gather()
       .sink(...))
source = Stream()
output = open('out', 'w')

s = (source.map(json.loads)        # Parse lines of JSON data
           .timed_window(0.050)    # Collect data into 50ms batches
           .filter(len)            # Remove any batches that didn't have data
           .to_dask().scatter()    # Send to cluster
           .map(pd.DataFrame)      # Convert to pandas dataframes on the cluster
           .map(pd.DataFrame.sum)  # Sum rows of each batch on the cluster
           .scan(add)              # Maintain running sum of all data on the cluster
           .gather()               # Collect results back to local machine
           .map(str)               # Convert to string
           .sink(output.write))    # Write to file
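Without a cluster handy, the batch-and-reduce semantics of that pipeline can be walked through in plain Python. This is a hypothetical sketch with no Dask, pandas, or real timing; explicit batches stand in for the timed windows:

```python
import json
from operator import add

# Hypothetical plain-Python walk-through of the pipeline's semantics.
lines = ['{"x": 1}', '{"x": 2}', '{"x": 3}', '{"x": 4}']

parsed = [json.loads(line) for line in lines]        # .map(json.loads)
batches = [parsed[0:2], [], parsed[2:4]]             # .timed_window(0.050); a window may be empty
batches = [b for b in batches if len(b)]             # .filter(len) drops the empty batch
sums = [sum(d['x'] for d in b) for b in batches]     # per-batch sum, like the DataFrame sum

# .scan(add): emit a running total as each batch sum arrives
running = []
total = 0
for s in sums:
    total = add(total, s)
    running.append(total)
# running == [3, 10]: first 1+2, then (1+2)+(3+4)
```

The real pipeline does the same thing, except the per-batch work happens on the cluster and results stream back through `gather`.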
from some_kafka_library import KafkaReader

topic = KafkaReader().subscribe('data')

while True:
    for line in topic:
        source.emit(line)
This is still a toy library. It has never been used for anything, so presumably many things are wrong. I've tried to build a simple system that can grow if use cases arrive. Here are some obvious things that are missing:
- A lot of API. I recommend looking at the Rx or Flink APIs to get a sense of what people often need.
- Integration with collections like lists, numpy arrays, or pandas dataframes. For example, we should be able to think about streams of lists of things. In this case seq_stream.map(func) would apply the function across every element in the constituent lists.
- Thinking about time. It would be nice to be able to annotate elements with things like event and processing time, and have this information pass through operations like map.
- Multi-stream operations like zip and joins
- More APIs for common endpoints like Kafka
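As a sketch of the streams-of-lists point above, the intended semantics would broadcast the mapped function over each element inside a batch rather than over the batch itself. This is hypothetical behavior (`seq_map` is made up), not something implemented:

```python
# Hypothetical broadcast-map over a "stream" of lists (here, just an
# iterable of batches): func applies to each element, not to each batch.

def seq_map(func, batches):
    for batch in batches:
        yield [func(x) for x in batch]

batches = [[1, 2], [3, 4, 5]]
result = list(seq_map(lambda x: x + 1, batches))
# result == [[2, 3], [4, 5, 6]]
```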
On the other hand, a few things work in its favor:

- It's small.
- It scales down and is very lightweight. Common operations can be used without concurrency, without event loops, without Dask: nothing but pure Python.
- I think that it can probably combine well with Dask to do very large things.