aws-stream-sort

A cursor / priority queue built on DynamoDB and Lambda, optionally using DynamoDB Streams (no SQS). It enables the following:

out-of-order messages -> [lambda using this library] -> in-order event stream from DynamoDB

Usage

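const AWS = require('aws-sdk')
const co = require('co').wrap
const { createAutopilot } = require('aws-cursor')
const docClient = new AWS.DynamoDB.DocumentClient()
const autopilot = createAutopilot({
  docClient,
  cursorTable: 'MyCursorTableName',
  itemsTable: 'MyItemsTableName',
  // property that identifies the queue an item belongs to
  queueProp: 'author',
  // property that holds the item's sequence number within its queue
  seqProp: 'seq'
})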

// lambda - incoming message processor

exports.enqueue = co(function* (event, context, callback) {
  try {
    const result = yield autopilot.put(event)
    if (result.new > result.old) {
      // you can safely process items with sequence numbers `result.old < seq <= result.new`
      // or you can stream from the CursorTable to another lambda and process there (below)
    }

    callback()
  } catch (err) {
    console.error(err)
    callback(err)
  }
})

// lambda - cursor table stream processor

exports.process = co(function* (event, context, callback) {
  // inspect OldImage and NewImage
  // to get change in cursor position
  // process batch indicated by the change in position
  callback()
})

The algorithm

Given two tables:

  • items: table for your events
  • cursor: table for storing cursor position for queues Q1, Q2, ...

For an incoming message M with sequence number S, which falls into queue Q1:

  1. Store M in items
  2. If S is the next sequence number in Q1, continue to step 3 (else, exit)
  3. Increment the cursor value for Q1
  4. Scan ahead up to batchSize items in items to see whether future messages have already arrived out of order, and advance the cursor for Q1 past any contiguous run found
  5. Repeat step 4 as long as there are more in-order items ahead
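
The steps above can be sketched in memory, without DynamoDB. This is only an illustration under stated assumptions, not the library's implementation: the `put` helper, the two `Map` stand-ins for the tables, the default `batchSize` of 10, and the convention that sequence numbers start at 0 (so an empty queue has cursor -1) are all hypothetical.

```javascript
// In-memory stand-ins for the two tables (assumption: illustration only).
const items = new Map() // `${queue}:${seq}` -> message
const cursors = new Map() // queue -> last in-order sequence number

const key = (queue, seq) => `${queue}:${seq}`

// Hypothetical helper mirroring steps 1-5; not part of the library's API.
// Assumes sequence numbers start at 0, so an untouched queue has cursor -1.
function put (queue, seq, message, batchSize = 10) {
  const old = cursors.has(queue) ? cursors.get(queue) : -1

  // Step 1: store the message.
  items.set(key(queue, seq), message)

  // Step 2: only the next expected sequence number can move the cursor.
  if (seq !== old + 1) return { old, new: old }

  // Step 3: advance the cursor to S.
  let position = seq

  // Steps 4-5: scan ahead one batch at a time; keep going only while
  // a whole batch turned out to be contiguous.
  let scanMore = true
  while (scanMore) {
    let found = 0
    while (found < batchSize && items.has(key(queue, position + 1))) {
      position++
      found++
    }
    scanMore = found === batchSize
  }

  cursors.set(queue, position)
  return { old, new: position }
}

// Messages 1 and 2 arrive before 0; the cursor moves only once 0 lands.
console.log(put('alice', 1, 'b')) // { old: -1, new: -1 }
console.log(put('alice', 2, 'c')) // { old: -1, new: -1 }
console.log(put('alice', 0, 'a')) // { old: -1, new: 2 }
```

The returned `old` and `new` values play the same role as `result.old` and `result.new` in the Usage section: items with `old < seq <= new` are safe to process in order.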

The DynamoDB stream from the cursor table will be in order per queue, with the old and new cursor positions for that queue in OldImage and NewImage respectively.
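
A stream-processing lambda could extract the change in cursor position along these lines. This is a sketch, not code from the library: the `cursorChanges` helper is hypothetical, and it assumes that the cursor item stores the queue id as a string under `queueProp` and the position as a number under `seqProp`, and that the stream is configured with the NEW_AND_OLD_IMAGES view type so both images are present on updates.

```javascript
// Hypothetical helper: turn a DynamoDB Streams event from the cursor table
// into a list of { queue, old, new } position changes.
// Assumptions: attribute names match queueProp/seqProp from the Usage
// section, and a first-time INSERT (no OldImage) starts from -1.
function cursorChanges (event, { queueProp = 'author', seqProp = 'seq' } = {}) {
  return event.Records
    .filter(record => record.eventName === 'INSERT' || record.eventName === 'MODIFY')
    .map(record => {
      const { OldImage, NewImage } = record.dynamodb
      return {
        queue: NewImage[queueProp].S,
        // Stream images use DynamoDB's attribute-value format,
        // where numbers arrive as strings under the `N` key.
        old: OldImage ? Number(OldImage[seqProp].N) : -1,
        new: Number(NewImage[seqProp].N)
      }
    })
}

// Example event with a single cursor update for queue "alice".
const sampleEvent = {
  Records: [{
    eventName: 'MODIFY',
    dynamodb: {
      OldImage: { author: { S: 'alice' }, seq: { N: '2' } },
      NewImage: { author: { S: 'alice' }, seq: { N: '5' } }
    }
  }]
}

console.log(cursorChanges(sampleEvent)) // [ { queue: 'alice', old: 2, new: 5 } ]
```

Each change then identifies the batch to process: the items in that queue with sequence numbers greater than `old` and up to `new`.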

Setup

  1. queueProp and seqProp should be the partition key and sort key, respectively, of the items table
  2. The cursor table should have queueProp as its partition key (no sort key)

See ./test/inbox-table-schema.json and ./test/cursor-table-schema.json for an example of a compatible schema.
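
For orientation, CreateTable parameters matching the two rules above might look like the following. This is a sketch and not a copy of the schema files in ./test: the table names and the `author` / `seq` properties are taken from the Usage section, while the attribute types (string queue id, numeric sequence number), the on-demand billing mode, and the stream settings are assumptions.

```javascript
// Items table: queueProp is the partition key, seqProp the sort key.
// Assumption: `author` is a string and `seq` is a number.
const itemsTableParams = {
  TableName: 'MyItemsTableName',
  AttributeDefinitions: [
    { AttributeName: 'author', AttributeType: 'S' },
    { AttributeName: 'seq', AttributeType: 'N' }
  ],
  KeySchema: [
    { AttributeName: 'author', KeyType: 'HASH' },
    { AttributeName: 'seq', KeyType: 'RANGE' }
  ],
  BillingMode: 'PAY_PER_REQUEST'
}

// Cursor table: queueProp is the partition key, with no sort key.
// The stream is only needed if a second lambda consumes cursor changes.
const cursorTableParams = {
  TableName: 'MyCursorTableName',
  AttributeDefinitions: [
    { AttributeName: 'author', AttributeType: 'S' }
  ],
  KeySchema: [
    { AttributeName: 'author', KeyType: 'HASH' }
  ],
  BillingMode: 'PAY_PER_REQUEST',
  StreamSpecification: {
    StreamEnabled: true,
    StreamViewType: 'NEW_AND_OLD_IMAGES'
  }
}
```

Either object can be passed to the `createTable` method of an `AWS.DynamoDB` client.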

Testing

To run the tests, first run localstack

Contributors

mvayngrib
