Code Monkey home page Code Monkey logo

gtrs's Introduction

Go Typed Redis Streams

Go Report Card

Effectively reading Redis streams requires some work: counting ids, prefetching and buffering, asynchronously sending acknowledgements and parsing entries. What if it was just the following?

consumer := NewGroupConsumer[MyType](...)
for msg := range consumer.Chan() {
  // Handle mssage
  consumer.Ack(msg)
}

Wait...it is! ๐Ÿ”ฅ

Quickstart

Define a type that represents your stream data. It'll be parsed automatically with all field names converted to snake case. Missing fields will be skipped silently. You can also use the ConvertibleFrom and ConvertibleTo interfaces to do custom parsing.

// maps to {"name": , "priority": }
type Event struct {
  Name     string
  Priority int
}

Consumers

Consumers allow reading redis streams through Go channels. Specify context, a redis client and where to start reading. Make sure to specify StreamConsumerConfig, if you don't like the default ones or want optimal performance. New entries are fetched asynchronously to provide a fast flow ๐Ÿš‚

consumer := NewConsumer[Event](ctx, rdb, StreamIDs{"my-stream": "$"})

for msg := range cs.Chan() {
  if msg.Err != nil {
    continue
  }
  var event Event = msg.Data
}

Don't forget to Close() the consumer. If you want to start reading again where you left off, you can save the last StreamIDs.

ids := cs.Close()

Group Consumers

They work just like regular consumers and allow sending acknowledgements asynchronously. Beware to use Ack only if you keep processing new messages - that is inside a consuming loop or from another goroutine. Even though this introduces a two-sided depdendecy, the consumer is avoids deadlocks.

cs := NewGroupConsumer[Event](ctx, rdb, "group", "consumer", "stream", ">")

for msg := range cs.Chan() {
  cs.Ack(msg)
}

Stopped processing? Check your errors ๐Ÿ”Ž

// Wait for all acknowledgements to complete
errors := cs.AwaitAcks()

// Acknowledgements that were not sent yet or their errors were not consumed
remaining := cs.Close()

Error handling

This is where the simplicity fades a litte, but only a little :) The channel provides not just values, but also errors. Those can be only of three types:

  • ReadError reports a failed XRead/XReadGroup request. Consumer will close the channel after this error
  • AckError reports a failed XAck request
  • ParseError speaks for itself

Consumers don't send errors on cancellation and immediately close the channel.

switch errv := msg.Err.(type) {
case nil: // This interface-nil comparison in safe
  fmt.Println("Got", msg.Data)
case ReadError:
  fmt.Println("ReadError caused by", errv.Err)
  return // last message in channel
case AckError:
  fmt.Printf("Ack failed %v-%v caused by %v\n", msg.Stream, msg.ID, errv.Err)
case ParseError:
  fmt.Println("Failed to parse", errv.Data)
}

All those types are wrapping errors. For example, ParseError can be unwrapped to:

  • Find out why the default parser failed via FieldParseError (e.g. assigning string to int field)
  • Catch custom errors from ConvertibleFrom
var fpe FieldParseError
if errors.As(msg.Err, &fpe) {
  fmt.Printf("Failed to parse field %v because %v", fpe.Field, fpe.Err)
}

errors.Is(msg.Err, errMyTypeFailedToParse)

Streams

Streams are simple wrappers for basic redis commands on a stream.

stream := NewStream[Event](rdb, "my-stream")
stream.Add(ctx, Event{
  Kind:     "Example event",
  Priority: 1,
})

Installation

go get github.com/dranikpg/gtrs

Gtrs is still in its early stages and might change in further releases.

Examples

Performance

go test -run ^$ -bench BenchmarkConsumer -cpu=1

The iteration cost on a mocked client is about 500-700 ns depending on buffer sizes, which gives it a throughput close to 2 million entries a second ๐Ÿš€. Getting bad results? Make sure to set large buffer sizes in the options.

gtrs's People

Contributors

dranikpg avatar nathan-cormier avatar hiendaovinh avatar

Watchers

James Cloos avatar

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    ๐Ÿ–– Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. ๐Ÿ“Š๐Ÿ“ˆ๐ŸŽ‰

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google โค๏ธ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.