

streamcache: in-memory caching stream reader


Package streamcache implements a Go in-memory byte cache mechanism that allows multiple callers to read some or all of the contents of a source io.Reader, while only reading from the source reader once. When only the final reader remains, the cache is discarded and the final reader reads directly from the source. This is particularly useful for scenarios where multiple readers may wish to sample the start of a stream, but only one reader will read the entire stream.

Let's say we have a program typedetect, and we're reading from stdin. For example:

$ cat myfile.ext | typedetect  

In this scenario, typedetect wants to detect and print the type of data in the file/pipe, and then print the contents. That detection sampling could be done in a separate goroutine per sampler type. The input file could be, let's say, a JSON file, or an XML file.

The obvious approach is to inspect the first few tokens of the input, and check if the tokens are either valid JSON or valid XML. After that process, let's say we want to dump out a preview of the file contents.

Package streamcache provides a facility to create a Stream from an underlying io.Reader (os.Stdin in this scenario), and to spawn multiple readers, each of which can operate independently, in their own goroutines if desired. The underlying source (again, os.Stdin in this scenario) is read from only once, but its data is available to multiple readers, because that data is cached in memory.

That is, until only the final reader remains (after Stream.Seal has been invoked), at which point the cache is discarded and the final reader reads directly from the underlying source.
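To make that concrete, below is a minimal sketch of the typedetect idea. It is not the actual typedetect example program: the sampler helpers and the 1KB sample size are assumptions for illustration. Two readers each sample the start of the stream, and a final reader, created before Seal is invoked, prints the full contents.

package main

import (
    "context"
    "encoding/json"
    "encoding/xml"
    "fmt"
    "io"
    "os"

    "github.com/neilotoole/streamcache"
)

func main() {
    ctx := context.Background()
    stream := streamcache.New(os.Stdin)

    // Each sampler gets its own reader. The bytes it consumes are
    // cached, so later readers still see the entire stream.
    jsonR := stream.NewReader(ctx)
    xmlR := stream.NewReader(ctx)

    isJSON := sampleJSON(jsonR) // hypothetical helper, defined below
    isXML := sampleXML(xmlR)    // hypothetical helper, defined below
    _ = jsonR.Close()
    _ = xmlR.Close()

    switch {
    case isJSON:
        fmt.Println("detected: JSON")
    case isXML:
        fmt.Println("detected: XML")
    default:
        fmt.Println("detected: unknown")
    }

    // The final reader dumps the full contents. After Seal, no new
    // readers can be created; once this reader is the only one left
    // and the cache is exhausted, it reads directly from stdin.
    out := stream.NewReader(ctx)
    stream.Seal()
    defer out.Close()
    _, _ = io.Copy(os.Stdout, out)
}

// sampleJSON reports whether the first 1KB of r begins with a valid
// JSON token.
func sampleJSON(r io.Reader) bool {
    _, err := json.NewDecoder(io.LimitReader(r, 1024)).Token()
    return err == nil
}

// sampleXML reports whether the first 1KB of r contains an XML start
// element.
func sampleXML(r io.Reader) bool {
    dec := xml.NewDecoder(io.LimitReader(r, 1024))
    for {
        tok, err := dec.Token()
        if err != nil {
            return false
        }
        if _, ok := tok.(xml.StartElement); ok {
            return true
        }
    }
}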

Usage

Add to your go.mod via go get:

go get github.com/neilotoole/streamcache

Here's a simple example that copies the contents of stdin to stdout and stderr, and prints the number of bytes read.

package main

import (
    "context"
    "errors"
    "fmt"
    "io"
    "os"

    "github.com/neilotoole/streamcache"
)

// Write stdin to both stdout and stderr.
// Some error handling omitted for brevity.
func main() {
    ctx := context.Background()
    stream := streamcache.New(os.Stdin)

    r1 := stream.NewReader(ctx)
    go func() {
        defer r1.Close()
        io.Copy(os.Stdout, r1)
    }()

    r2 := stream.NewReader(ctx)
    go func() {
        defer r2.Close()
        io.Copy(os.Stderr, r2)
    }()
    
    stream.Seal()   // Indicate that there'll be no more readers...
    <-stream.Done() // Receives when all readers are closed.

    if err := stream.Err(); err != nil && !errors.Is(err, io.EOF) {
        fmt.Fprintln(os.Stderr, "error:", err)
        os.Exit(1)
    }

    fmt.Fprintf(os.Stdout, "Read %d bytes from stdin\n", stream.Size())
}

Executing the above program:

$ go install github.com/neilotoole/streamcache/examples/in-out-err
$ echo "hello world" | in-out-err
hello world
hello world
Read 12 bytes from stdin

Examples

  • in-out-err: copy stdin to both stdout and stderr.
  • typedetect: detect the type of input data, and print the head and tail of the contents.
  • multicase: transform each line of input to upper, lower, and title case.

Related work

  • djherbis/fscache
  • sq uses streamcache to stream stdin / HTTP response bodies, allowing sq to begin processing data on the fly.
  • fifomu is a FIFO mutex, used by streamcache, which in turn is used by sq.


Issues

Add a "cache size limit" option

Per this reddit comment:

I would suggest having an optional variant that takes some sort of cache size limit as well, in the general spirit of "I should always be able to set limits on the resources an unbounded stream will consume". It is effectively impossible to implement that from the outside (you could but you'd basically be reimplementing the entire package for that). Exceeding the cache size can then return a special error for that case.

A potential solution (using func opts) might look like:

stream := streamcache.New(os.Stdin, streamcache.MaxCacheSize(1000000))

r := stream.NewReader(ctx)

if _, err := io.Copy(dest, r); err != nil {
  if errors.Is(err, streamcache.ErrCacheLimit) {
    // ... do something
  }
}

Maybe func opts are overkill when there's only one option. It could also be something like:

stream := streamcache.NewWithLimit(os.Stdin, 100000)

Or maybe just a method on Stream:

stream := streamcache.New(os.Stdin)
stream.SetMaxCacheSize(1000000)
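None of the APIs above exist today. A crude interim workaround (an assumption on my part, not part of the package) is to bound the source itself with io.LimitReader before handing it to streamcache.New, which also bounds the cache. Its drawback is exactly what the comment above asks to fix: the stream is silently truncated at the limit, and readers see a plain io.EOF rather than a dedicated cache-limit error.

package main

import (
    "context"
    "io"
    "os"

    "github.com/neilotoole/streamcache"
)

func main() {
    // Cap the bytes read from the source (and thus cached) at 1MB.
    // Beyond the limit, readers simply receive io.EOF: there is no
    // way to distinguish a truncated stream from a complete one,
    // which is why a built-in MaxCacheSize option with a dedicated
    // error would be preferable.
    const limit = 1 << 20
    stream := streamcache.New(io.LimitReader(os.Stdin, limit))

    r := stream.NewReader(context.Background())
    stream.Seal()
    defer r.Close()
    _, _ = io.Copy(os.Stdout, r)
}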
