bigqueue's Introduction

MIT license

bigqueue

bigqueue provides an embedded, fast, and persistent queue written in pure Go using memory-mapped (mmap) files. bigqueue is also thread safe.

Installation

go get github.com/grandecola/bigqueue

Requirements

  • Only works on Linux and Darwin
  • Only works on little-endian architectures

Usage

Standard API

Create or open a bigqueue:

bq, err := bigqueue.NewMmapQueue("path/to/queue")
defer bq.Close()

bigqueue persists the data of the queue in multiple Arenas. Each Arena is a file on disk that is mapped into memory (RAM) using the mmap syscall. The default size of each Arena is 128MB. It is possible to create a bigqueue with a custom Arena size:

bq, err := bigqueue.NewMmapQueue("path/to/queue", bigqueue.SetArenaSize(4*1024))
defer bq.Close()

bigqueue also allows setting the maximum amount of memory that it can use. By default, the maximum memory is set to [3 x Arena Size].

bq, err := bigqueue.NewMmapQueue("path/to/queue", bigqueue.SetArenaSize(4*1024),
	bigqueue.SetMaxInMemArenas(10))
defer bq.Close()

In this case, bigqueue will never map more than 4KB * 10 = 40KB of Arena memory. This limit does not include the memory used in buffers for copying data.
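
The bound is plain arithmetic: Arena size multiplied by the maximum number of in-memory Arenas. A minimal sketch of that calculation follows; the helper `maxMappedBytes` is hypothetical and is not part of bigqueue.

```go
package main

import "fmt"

// maxMappedBytes is a hypothetical helper (not part of bigqueue) that
// returns the upper bound on memory-mapped bytes for a given Arena size
// and maximum number of in-memory Arenas.
func maxMappedBytes(arenaSize, maxInMemArenas int) int {
	return arenaSize * maxInMemArenas
}

func main() {
	// 4KB Arenas with at most 10 Arenas in memory.
	fmt.Println(maxMappedBytes(4*1024, 10)) // 40960 bytes = 40KB
	// Defaults stated in this README: 128MB Arenas, 3 Arenas in memory.
	fmt.Println(maxMappedBytes(128*1024*1024, 3)) // 402653184 bytes = 384MB
}
```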

bigqueue allows setting a periodic flush based on either elapsed time or the number of mutate (enqueue/dequeue) operations. A flush syncs the in-memory changes of all memory-mapped files to disk. This is a best-effort flush.

This is how we can set these options:

bq, err := bigqueue.NewMmapQueue("path/to/queue", bigqueue.SetPeriodicFlushOps(2))

In this case, a flush is done after every two mutate operations.

bq, err := bigqueue.NewMmapQueue("path/to/queue", bigqueue.SetPeriodicFlushDuration(time.Minute))

In this case, a flush is done once one minute has elapsed and an Enqueue or Dequeue is called.
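
To make the two triggers concrete, here is a minimal sketch of how a flush decision could be evaluated on every mutate operation. The type `flushPolicy`, its fields, and `demo` are hypothetical names used for illustration; this is not bigqueue's internal implementation.

```go
package main

import (
	"fmt"
	"time"
)

// flushPolicy is a hypothetical type (not bigqueue's implementation) that
// decides, on every mutate operation, whether a flush is due.
type flushPolicy struct {
	maxOps    int           // flush after this many mutate ops; 0 disables
	maxAge    time.Duration // flush once this much time has passed; 0 disables
	ops       int           // mutate ops since the last flush
	lastFlush time.Time     // time of the last flush
}

// onMutate records one enqueue/dequeue at time now and reports whether
// the caller should flush. Counters reset whenever a flush is due.
func (p *flushPolicy) onMutate(now time.Time) bool {
	p.ops++
	byOps := p.maxOps > 0 && p.ops >= p.maxOps
	byAge := p.maxAge > 0 && now.Sub(p.lastFlush) >= p.maxAge
	if byOps || byAge {
		p.ops = 0
		p.lastFlush = now
		return true
	}
	return false
}

// demo replays the two settings shown in this README against fixed timestamps.
func demo() [4]bool {
	start := time.Unix(0, 0)
	byOps := &flushPolicy{maxOps: 2, lastFlush: start}
	byAge := &flushPolicy{maxAge: time.Minute, lastFlush: start}
	return [4]bool{
		byOps.onMutate(start),                       // first op: no flush
		byOps.onMutate(start),                       // second op: flush
		byAge.onMutate(start.Add(30 * time.Second)), // too early
		byAge.onMutate(start.Add(61 * time.Second)), // a minute has passed
	}
}

func main() {
	fmt.Println(demo()) // [false true false true]
}
```

In this sketch the time-based check only runs inside `onMutate`, which mirrors the statement that a duration-based flush needs an Enqueue or Dequeue call to trigger it.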

Write to bigqueue:

err := bq.Enqueue([]byte("elem"))

bigqueue allows writing string data directly, avoiding conversion to []byte:

err := bq.EnqueueString("elem")

Read from bigqueue:

elem, err := bq.Dequeue()

We can also read string data from bigqueue:

elem, err := bq.DequeueString()

Check whether bigqueue is empty:

isEmpty := bq.IsEmpty()
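
The calls above can be combined into a drain loop. The sketch below is written against a small interface whose method signatures are assumptions inferred from the snippets in this README, and it uses a hypothetical in-memory stand-in (`memQueue`) so that it runs without touching disk. A queue returned by bigqueue.NewMmapQueue would be used in its place, provided its methods match these assumed signatures.

```go
package main

import (
	"errors"
	"fmt"
)

// queue lists the Standard API methods used below. The signatures are
// assumptions inferred from the snippets in this README.
type queue interface {
	Enqueue(elem []byte) error
	Dequeue() ([]byte, error)
	IsEmpty() bool
}

// memQueue is a hypothetical in-memory stand-in, not part of bigqueue.
type memQueue struct{ items [][]byte }

func (m *memQueue) Enqueue(elem []byte) error {
	// Copy the element so later changes by the caller do not affect the queue.
	m.items = append(m.items, append([]byte(nil), elem...))
	return nil
}

func (m *memQueue) Dequeue() ([]byte, error) {
	if len(m.items) == 0 {
		return nil, errors.New("queue is empty")
	}
	elem := m.items[0]
	m.items = m.items[1:]
	return elem, nil
}

func (m *memQueue) IsEmpty() bool { return len(m.items) == 0 }

// drain dequeues until the queue is empty and returns elements in FIFO order.
func drain(q queue) ([]string, error) {
	var out []string
	for !q.IsEmpty() {
		elem, err := q.Dequeue()
		if err != nil {
			return out, err
		}
		out = append(out, string(elem))
	}
	return out, nil
}

func main() {
	q := &memQueue{}
	for _, s := range []string{"a", "b", "c"} {
		if err := q.Enqueue([]byte(s)); err != nil {
			panic(err)
		}
	}
	fmt.Println(drain(q)) // [a b c] <nil>
}
```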

Advanced API

bigqueue allows reading data from the queue using consumers, similar to Kafka. This allows multiple consumers to read data at different offsets (not yet in a thread-safe manner). The offsets of each consumer are persisted on disk and can be retrieved by creating a consumer with the same name. Data will be read from the offset where the consumer left off.

We can create a new consumer as follows. The offsets of a new consumer are set to the start of the queue, at the first non-deleted element.

consumer, err := bq.NewConsumer("consumer")

We can also copy an existing consumer. This creates a consumer with the same offsets into the queue as the existing consumer.

copyConsumer, err := bq.FromConsumer("copyConsumer", consumer)

Now, read operations can be performed on the consumer:

isEmpty := consumer.IsEmpty()
elem, err := consumer.Dequeue()
str, err := consumer.DequeueString()
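
The per-consumer offset behaviour can be modelled in a few lines. The sketch below is a hypothetical in-memory model (`offsetLog` and its methods are illustrative names, not bigqueue's API or on-disk format) showing that each named consumer advances independently and that a copied consumer starts from the source consumer's offset. It has no deletion, so the start of the queue is always index 0.

```go
package main

import "fmt"

// offsetLog is a hypothetical in-memory model (not bigqueue's on-disk
// format) of a queue whose consumers each track their own read offset.
type offsetLog struct {
	elems   []string
	offsets map[string]int // consumer name -> index of next element to read
}

func newOffsetLog() *offsetLog {
	return &offsetLog{offsets: make(map[string]int)}
}

func (l *offsetLog) enqueue(s string) { l.elems = append(l.elems, s) }

// newConsumer registers name at the start of the log, or resumes from the
// stored offset if the name already exists.
func (l *offsetLog) newConsumer(name string) {
	if _, ok := l.offsets[name]; !ok {
		l.offsets[name] = 0
	}
}

// fromConsumer creates name with the same offset as the existing consumer src.
func (l *offsetLog) fromConsumer(name, src string) { l.offsets[name] = l.offsets[src] }

// dequeue returns the next element for name and advances only that consumer.
func (l *offsetLog) dequeue(name string) (string, bool) {
	off := l.offsets[name]
	if off >= len(l.elems) {
		return "", false
	}
	l.offsets[name] = off + 1
	return l.elems[off], true
}

func main() {
	l := newOffsetLog()
	l.enqueue("a")
	l.enqueue("b")
	l.newConsumer("consumer")
	first, _ := l.dequeue("consumer")
	l.fromConsumer("copyConsumer", "consumer") // the copy starts at offset 1
	second, _ := l.dequeue("copyConsumer")
	third, _ := l.dequeue("consumer")
	fmt.Println(first, second, third) // a b b
}
```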

Benchmarks

Setup

goos: linux
goarch: amd64
cpu: Intel(R) Core(TM) i7-9750H CPU @ 2.60GHz
Go version: 1.16

NewMmapQueue

BenchmarkNewMmapQueue/ArenaSize-4KB-12         	     312	   3762844 ns/op	    2506 B/op	      36 allocs/op
BenchmarkNewMmapQueue/ArenaSize-128KB-12       	     310	   3897333 ns/op	    2506 B/op	      36 allocs/op
BenchmarkNewMmapQueue/ArenaSize-4MB-12         	     301	   4033893 ns/op	    2504 B/op	      36 allocs/op
BenchmarkNewMmapQueue/ArenaSize-128MB-12       	     309	   3954329 ns/op	    2504 B/op	      36 allocs/op

Enqueue

BenchmarkEnqueue/ArenaSize-4KB/MessageSize-128B/MaxMem-12KB-12         	 1021855	      1172 ns/op	      15 B/op	       0 allocs/op
BenchmarkEnqueue/ArenaSize-4KB/MessageSize-128B/MaxMem-40KB-12         	  999122	      1178 ns/op	      15 B/op	       0 allocs/op
BenchmarkEnqueue/ArenaSize-4KB/MessageSize-128B/MaxMem-NoLimit-12      	 1000000	      1027 ns/op	      20 B/op	       0 allocs/op
BenchmarkEnqueue/ArenaSize-128KB/MessageSize-4KB/MaxMem-384KB-12       	  258444	      4602 ns/op	      14 B/op	       0 allocs/op
BenchmarkEnqueue/ArenaSize-128KB/MessageSize-4KB/MaxMem-1.25MB-12      	  246780	      4610 ns/op	      14 B/op	       0 allocs/op
BenchmarkEnqueue/ArenaSize-128KB/MessageSize-4KB/MaxMem-NoLimit-12     	  271261	      4118 ns/op	      14 B/op	       0 allocs/op
BenchmarkEnqueue/ArenaSize-4MB/MessageSize-128KB/MaxMem-12MB-12        	   10000	    108440 ns/op	      14 B/op	       0 allocs/op
BenchmarkEnqueue/ArenaSize-4MB/MessageSize-128KB/MaxMem-40MB-12        	   10000	    108159 ns/op	      14 B/op	       0 allocs/op
BenchmarkEnqueue/ArenaSize-4MB/MessageSize-128KB/MaxMem-NoLimit-12     	   10000	    104991 ns/op	      14 B/op	       0 allocs/op
BenchmarkEnqueue/ArenaSize-128MB/MessageSize-4MB/MaxMem-256MB-12       	     330	   3619772 ns/op	      13 B/op	       0 allocs/op
BenchmarkEnqueue/ArenaSize-128MB/MessageSize-4MB/MaxMem-1.25GB-12      	     339	   3502254 ns/op	      13 B/op	       0 allocs/op
BenchmarkEnqueue/ArenaSize-128MB/MessageSize-4MB/MaxMem-NoLimit-12     	     336	   3478795 ns/op	      13 B/op	       0 allocs/op

EnqueueString

BenchmarkEnqueueString/ArenaSize-4KB/MessageSize-128B/MaxMem-12KB-12   	  843966	      1186 ns/op	      15 B/op	       0 allocs/op
BenchmarkEnqueueString/ArenaSize-4KB/MessageSize-128B/MaxMem-40KB-12   	 1000000	      1180 ns/op	      15 B/op	       0 allocs/op
BenchmarkEnqueueString/ArenaSize-4KB/MessageSize-128B/MaxMem-NoLimit-12         	 1000000	      1026 ns/op	      15 B/op	       0 allocs/op
BenchmarkEnqueueString/ArenaSize-128KB/MessageSize-4KB/MaxMem-384KB-12          	  257824	      4642 ns/op	      14 B/op	       0 allocs/op
BenchmarkEnqueueString/ArenaSize-128KB/MessageSize-4KB/MaxMem-1.25MB-12         	  256230	      4621 ns/op	      14 B/op	       0 allocs/op
BenchmarkEnqueueString/ArenaSize-128KB/MessageSize-4KB/MaxMem-NoLimit-12        	  266560	      4101 ns/op	      14 B/op	       0 allocs/op
BenchmarkEnqueueString/ArenaSize-4MB/MessageSize-128KB/MaxMem-12MB-12           	   10000	    107929 ns/op	      14 B/op	       0 allocs/op
BenchmarkEnqueueString/ArenaSize-4MB/MessageSize-128KB/MaxMem-40MB-12           	   10000	    107948 ns/op	      14 B/op	       0 allocs/op
BenchmarkEnqueueString/ArenaSize-4MB/MessageSize-128KB/MaxMem-NoLimit-12        	   11434	    103482 ns/op	      13 B/op	       0 allocs/op
BenchmarkEnqueueString/ArenaSize-128MB/MessageSize-4MB/MaxMem-256MB-12          	     333	   3650641 ns/op	      13 B/op	       0 allocs/op
BenchmarkEnqueueString/ArenaSize-128MB/MessageSize-4MB/MaxMem-1.25GB-12         	     339	   3559835 ns/op	      13 B/op	       0 allocs/op
BenchmarkEnqueueString/ArenaSize-128MB/MessageSize-4MB/MaxMem-NoLimit-12        	     334	   3546090 ns/op	      13 B/op	       0 allocs/op

Dequeue

BenchmarkDequeue/ArenaSize-4KB/MessageSize-128B/MaxMem-12KB-12                  	 1000000	      3201 ns/op	     142 B/op	       1 allocs/op
BenchmarkDequeue/ArenaSize-4KB/MessageSize-128B/MaxMem-40KB-12                  	 1000000	      3187 ns/op	     142 B/op	       1 allocs/op
BenchmarkDequeue/ArenaSize-4KB/MessageSize-128B/MaxMem-NoLimit-12               	 6737412	       174.0 ns/op	     128 B/op	       1 allocs/op
BenchmarkDequeue/ArenaSize-128KB/MessageSize-4KB/MaxMem-384KB-12                	  502522	      3478 ns/op	    4109 B/op	       1 allocs/op
BenchmarkDequeue/ArenaSize-128KB/MessageSize-4KB/MaxMem-1.25MB-12               	  516555	      3509 ns/op	    4109 B/op	       1 allocs/op
BenchmarkDequeue/ArenaSize-128KB/MessageSize-4KB/MaxMem-NoLimit-12              	 1000000	      1156 ns/op	    4096 B/op	       1 allocs/op
BenchmarkDequeue/ArenaSize-4MB/MessageSize-128KB/MaxMem-12MB-12                 	   29844	     39677 ns/op	  131085 B/op	       1 allocs/op
BenchmarkDequeue/ArenaSize-4MB/MessageSize-128KB/MaxMem-40MB-12                 	   30626	     39388 ns/op	  131085 B/op	       1 allocs/op
BenchmarkDequeue/ArenaSize-4MB/MessageSize-128KB/MaxMem-NoLimit-12              	   45805	     26247 ns/op	  131072 B/op	       1 allocs/op
BenchmarkDequeue/ArenaSize-128MB/MessageSize-4MB/MaxMem-256MB-12                	    1005	   1241554 ns/op	 4194316 B/op	       1 allocs/op
BenchmarkDequeue/ArenaSize-128MB/MessageSize-4MB/MaxMem-1.25GB-12               	    1257	   1164477 ns/op	 4194314 B/op	       1 allocs/op
BenchmarkDequeue/ArenaSize-128MB/MessageSize-4MB/MaxMem-NoLimit-12              	    1260	    884842 ns/op	 4194304 B/op	       1 allocs/op

DequeueString

BenchmarkDequeueString/ArenaSize-4KB/MessageSize-128B/MaxMem-12KB-12            	 1000000	      3200 ns/op	     142 B/op	       1 allocs/op
BenchmarkDequeueString/ArenaSize-4KB/MessageSize-128B/MaxMem-40KB-12            	 1000000	      3206 ns/op	     142 B/op	       1 allocs/op
BenchmarkDequeueString/ArenaSize-4KB/MessageSize-128B/MaxMem-NoLimit-12         	 6239718	       188.8 ns/op	     128 B/op	       1 allocs/op
BenchmarkDequeueString/ArenaSize-128KB/MessageSize-4KB/MaxMem-384KB-12          	  501561	      3511 ns/op	    4109 B/op	       1 allocs/op
BenchmarkDequeueString/ArenaSize-128KB/MessageSize-4KB/MaxMem-1.25MB-12         	  507860	      3535 ns/op	    4109 B/op	       1 allocs/op
BenchmarkDequeueString/ArenaSize-128KB/MessageSize-4KB/MaxMem-NoLimit-12        	 1000000	      1236 ns/op	    4096 B/op	       1 allocs/op
BenchmarkDequeueString/ArenaSize-4MB/MessageSize-128KB/MaxMem-12MB-12           	   29692	     39532 ns/op	  131085 B/op	       1 allocs/op
BenchmarkDequeueString/ArenaSize-4MB/MessageSize-128KB/MaxMem-40MB-12           	   30268	     39709 ns/op	  131085 B/op	       1 allocs/op
BenchmarkDequeueString/ArenaSize-4MB/MessageSize-128KB/MaxMem-NoLimit-12        	   46911	     25956 ns/op	  131072 B/op	       1 allocs/op
BenchmarkDequeueString/ArenaSize-128MB/MessageSize-4MB/MaxMem-256MB-12          	     968	   1254574 ns/op	 4194316 B/op	       1 allocs/op
BenchmarkDequeueString/ArenaSize-128MB/MessageSize-4MB/MaxMem-1.25GB-12         	    1429	   1175763 ns/op	 4194314 B/op	       1 allocs/op
BenchmarkDequeueString/ArenaSize-128MB/MessageSize-4MB/MaxMem-NoLimit-12        	    1364	    865977 ns/op	 4194304 B/op	       1 allocs/op

Note: Before running the benchmarks, adjust ulimit and the vm.max_map_count parameter using the commands below:

ulimit -n 50000
echo 262144 > /proc/sys/vm/max_map_count

bigqueue's People

Contributors

ashish-goswami, darkcoderrises, mangalaman93, moredure, rohansuri

bigqueue's Issues

Limit memory size not available

bigqueue.SetArenaSize() defaults to 128MB.

bigqueue.SetMaxInMemArenas() defaults to 3.

Theoretically, the maximum amount of enqueued data is 128MB * 3, but in practice I can enqueue an unlimited amount of data.

How can I limit the enqueued data to 1024MB?

Support for limited memory use

Given that it is a queue, I think we need at least the following pages in memory -

  • the page that is currently written
  • the page that is currently read

Further, we can keep more pages after the currently read page in memory as well for more performance.

Whenever we mmap a new page, before mmap, we need to ensure that we do not cross the provided threshold limit for memory usage. If so, we remove some pages from memory before proceeding.
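
A minimal sketch of that check, under stated assumptions: `arenaCache` and `ensure` are hypothetical names, pages are tracked by index only, and the actual mmap/munmap calls are left out. Pages currently being read (head) or written (tail) are never evicted, so if nothing else is mapped the cap can be exceeded.

```go
package main

import "fmt"

// arenaCache is a hypothetical model (not bigqueue's arena manager) that
// tracks which page indices are mapped and enforces a cap on their count.
type arenaCache struct {
	maxArenas  int   // cap on the number of mapped pages
	mapped     []int // mapped page indices, oldest mapping first
	head, tail int   // pages currently being read and written
}

func (c *arenaCache) isMapped(idx int) bool {
	for _, m := range c.mapped {
		if m == idx {
			return true
		}
	}
	return false
}

// ensure maps page idx. Before doing so it unmaps the oldest pages other
// than head and tail until there is room. It returns the evicted indices.
func (c *arenaCache) ensure(idx int) []int {
	if c.isMapped(idx) {
		return nil
	}
	var evicted []int
	over := len(c.mapped) + 1 - c.maxArenas // pages that must be unmapped
	kept := c.mapped[:0]                    // filter in place
	for _, m := range c.mapped {
		if over > 0 && m != c.head && m != c.tail {
			evicted = append(evicted, m) // a real implementation would munmap here
			over--
			continue
		}
		kept = append(kept, m)
	}
	c.mapped = append(kept, idx) // a real implementation would mmap here
	return evicted
}

func main() {
	// Reader on page 0, writer on page 2, cap of 3 mapped pages.
	c := &arenaCache{maxArenas: 3, head: 0, tail: 2, mapped: []int{0, 1, 2}}
	evicted := c.ensure(3) // the writer needs a new page
	c.tail = 3
	fmt.Println(evicted, c.mapped) // [1] [0 2 3]
}
```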

Concurrent requests/reply IPC

I'm trying to improve the throughput of an app I'm building. Essentially, it currently uses Unix Domain Sockets to transfer messages between two processes. This is because they are well supported in many programming languages, and easy to use for request/reply.

But by combining the two processes into a single process, I get 10x the TPS. So I know there is up to a 10x potential improvement.

My question is: can mmap do this, given some constraints:

  • Request/reply (like HTTP).
  • Concurrent, multiple request/replies in flight.

Update APIs to use offsets

This is along the lines of how Kafka works. Given that bigqueue is persistent, it makes a lot of sense to not delete the data after it is read once. Certainly, it could be configured to do so, but that shouldn't be the default choice. Instead, we should allow using offsets per client so that the data could be read from anywhere in the queue as needed.

Add support for configuring bigqueue

We should use a Config object to allow configuring BigQueue. We should allow a useful default value for each configuration as well as ensure that configurations are set correctly using possible checks around each parameter. We should allow creating bigqueue without the config object using the default values.

Here is the list of configuration parameters -

  • Maximum size of Arena (check for a value at least as large as the size of an OS page)
  • Maximum memory used by BigQueue, allow an option for using minimum possible memory, or all available memory (check for a value of at least as much as 2 * Size of Arena)
  • GC frequency (check for a positive value)

We will have to persist the configuration parameters so that we can read them back across different invocations of the same application.

I still think that we should keep the directory as an argument to NewBigQueue to ensure an explicit invocation of creating a queue using a path. Given that the path is what defines a BigQueue, the expectation will be set properly in that if you lose the directory, you lose the queue.
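
A minimal sketch of what such a Config object with defaults and checks could look like. All names here (`Config`, `DefaultConfig`, `Validate`, and the fields) are hypothetical and do not describe bigqueue's actual API; the 128MB Arena size and the 3-Arena memory cap follow the README defaults, while the GC interval is an arbitrary placeholder.

```go
package main

import (
	"errors"
	"fmt"
	"os"
	"time"
)

// Config is a hypothetical configuration object for this proposal;
// field names and defaults are illustrative, not bigqueue's actual API.
type Config struct {
	ArenaSize   int           // bytes per Arena
	MaxMemory   int           // cap on mapped bytes; 0 means no limit
	GCFrequency time.Duration // interval between GC runs
}

// DefaultConfig returns a usable default for every parameter.
func DefaultConfig() Config {
	arena := 128 * 1024 * 1024
	return Config{ArenaSize: arena, MaxMemory: 3 * arena, GCFrequency: time.Minute}
}

// Validate applies the checks listed in the proposal.
func (c Config) Validate() error {
	if c.ArenaSize < os.Getpagesize() {
		return errors.New("arena size must be at least one OS page")
	}
	if c.MaxMemory != 0 && c.MaxMemory < 2*c.ArenaSize {
		return errors.New("max memory must be at least 2 * arena size")
	}
	if c.GCFrequency <= 0 {
		return errors.New("GC frequency must be positive")
	}
	return nil
}

func main() {
	fmt.Println(DefaultConfig().Validate()) // <nil>

	bad := DefaultConfig()
	bad.MaxMemory = bad.ArenaSize // below 2 * arena size
	fmt.Println(bad.Validate())   // max memory must be at least 2 * arena size
}
```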

Why mmap?

Just curious, why mmap the files? It seems like simply writing serially and reading serially also should work, no? Would also give higher control over flushing and buffering.

Support for periodic GC

GC will delete files from disk that are not in use anymore. This will be done periodically or on demand.

bigqueue panics when benchmarks are run

goos: linux
goarch: amd64
pkg: github.com/grandecola/bigqueue
BenchmarkNewMmapQueue/ArenaSize-4KB-8         	     288	   4376092 ns/op	    2853 B/op	      46 allocs/op
BenchmarkNewMmapQueue/ArenaSize-128KB-8       	     278	   4286689 ns/op	    2837 B/op	      46 allocs/op
BenchmarkNewMmapQueue/ArenaSize-4MB-8         	     282	   4316120 ns/op	    2835 B/op	      46 allocs/op
BenchmarkNewMmapQueue/ArenaSize-128MB-8       	     282	   4317199 ns/op	    2817 B/op	      46 allocs/op
BenchmarkEnqueue/ArenaSize-4KB/MessageSize-128B/MaxMem-12KB-8         	 1195894	      1006 ns/op	      50 B/op	       1 allocs/op
BenchmarkEnqueue/ArenaSize-4KB/MessageSize-128B/MaxMem-40KB-8         	  992317	      1037 ns/op	      50 B/op	       1 allocs/op
BenchmarkEnqueue/ArenaSize-4KB/MessageSize-128B/MaxMem-NoLimit-8      	 1240567	       967 ns/op	      53 B/op	       1 allocs/op
BenchmarkEnqueue/ArenaSize-128KB/MessageSize-4KB/MaxMem-384KB-8       	  321355	      3634 ns/op	      49 B/op	       1 allocs/op
BenchmarkEnqueue/ArenaSize-128KB/MessageSize-4KB/MaxMem-1.25MB-8      	  296962	      3627 ns/op	      49 B/op	       1 allocs/op
BenchmarkEnqueue/ArenaSize-128KB/MessageSize-4KB/MaxMem-NoLimit-8     	  337028	      3608 ns/op	      51 B/op	       1 allocs/op
BenchmarkEnqueue/ArenaSize-4MB/MessageSize-128KB/MaxMem-12MB-8        	   14071	     85243 ns/op	      49 B/op	       1 allocs/op
BenchmarkEnqueue/ArenaSize-4MB/MessageSize-128KB/MaxMem-40MB-8        	   14617	     82313 ns/op	      49 B/op	       1 allocs/op
BenchmarkEnqueue/ArenaSize-4MB/MessageSize-128KB/MaxMem-NoLimit-8     	   14323	     89502 ns/op	      52 B/op	       1 allocs/op
BenchmarkEnqueue/ArenaSize-128MB/MessageSize-4MB/MaxMem-256MB-8       	     469	   2677177 ns/op	      49 B/op	       1 allocs/op
BenchmarkEnqueue/ArenaSize-128MB/MessageSize-4MB/MaxMem-1.25GB-8      	     450	   3082011 ns/op	      50 B/op	       1 allocs/op
BenchmarkEnqueue/ArenaSize-128MB/MessageSize-4MB/MaxMem-NoLimit-8     	     444	   3013916 ns/op	      50 B/op	       1 allocs/op
BenchmarkEnqueueString/ArenaSize-4KB/MessageSize-128B/MaxMem-12KB-8   	 1000000	      1072 ns/op	      34 B/op	       1 allocs/op
BenchmarkEnqueueString/ArenaSize-4KB/MessageSize-128B/MaxMem-40KB-8   	 1152850	      1096 ns/op	      34 B/op	       1 allocs/op
BenchmarkEnqueueString/ArenaSize-4KB/MessageSize-128B/MaxMem-NoLimit-8         	panic: runtime error: index out of range [0] with length 0

goroutine 1864 [running]:
github.com/grandecola/mmap.(*File).Flush(0xc0009e4d50, 0x4, 0xc0002f8628, 0x685bc0)
	/home/aman/gocode/pkg/mod/github.com/grandecola/[email protected]/mmap_data.go:93 +0xd6
github.com/grandecola/bigqueue.(*arenaManager).flush(0xc000ed0180, 0xc000b9e120, 0xc0004dce08)
	/home/aman/gocode/src/github.com/grandecola/bigqueue/arenamanager.go:135 +0xaf
github.com/grandecola/bigqueue.(*MmapQueue).Flush(0xc00021c000, 0x0, 0x0)
	/home/aman/gocode/src/github.com/grandecola/bigqueue/bigqueue.go:163 +0x8a
github.com/grandecola/bigqueue.(*MmapQueue).periodicFlush(0xc00021c000)
	/home/aman/gocode/src/github.com/grandecola/bigqueue/bigqueue.go:208 +0x1d3
created by github.com/grandecola/bigqueue.NewMmapQueue
	/home/aman/gocode/src/github.com/grandecola/bigqueue/bigqueue.go:96 +0x422
exit status 2
FAIL	github.com/grandecola/bigqueue	64.609s

Build a queue lock

A lock that provides write, read, and delete locks.

Write Lock

Only one producer can write to BigQueue. Write is acceptable as long as it is at the tail of the queue.

Read Lock

Multiple consumers can simultaneously read from BigQueue. Read is acceptable as long as it is within the boundary of head to tail.

Delete Lock

This is fine; reads may fail if deletes are done before reads in the same region.

Microbenchmark

Add benchmarks to measure the performance of each function in BigQueue. This will answer the following questions -

  • Performance of read and write when size of the buffer is increased (with and without flush)
  • Performance for creating queues and closing them

Write multicore benchmark and Improve performance

Benchmarks are currently single threaded. Now that bigqueue is thread safe, we should be able to utilize multiple cores for higher throughput. We can also profile the system and improve the performance.

Allow flushing periodically

We should expose the flush function as part of the bigqueue interface. Additionally, we should not trust the OS periodic syncing, and instead, enable flushing periodically, with a timer or probably by amount of data change, with configuration parameters to choose the period.

Allow calls to be blocked on empty queue

This is similar to a producer-consumer model, except that the queue can never be full. We should potentially add an API such that instead of returning an error when the queue is empty, we simply block the caller until an element is added and can be returned.
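
One common way to get this behaviour in Go is a condition variable. The sketch below uses a hypothetical in-memory `blockingQueue` (not bigqueue's API) to show a dequeue that waits instead of returning an error on an empty queue.

```go
package main

import (
	"fmt"
	"sync"
)

// blockingQueue is a hypothetical in-memory queue (not bigqueue's API)
// whose dequeue blocks while the queue is empty instead of returning an error.
type blockingQueue struct {
	mu    sync.Mutex
	cond  *sync.Cond
	items []string
}

func newBlockingQueue() *blockingQueue {
	q := &blockingQueue{}
	q.cond = sync.NewCond(&q.mu)
	return q
}

// enqueue appends an element and wakes one waiting consumer.
func (q *blockingQueue) enqueue(s string) {
	q.mu.Lock()
	q.items = append(q.items, s)
	q.mu.Unlock()
	q.cond.Signal()
}

// dequeue waits until an element is available, then removes and returns it.
func (q *blockingQueue) dequeue() string {
	q.mu.Lock()
	defer q.mu.Unlock()
	for len(q.items) == 0 {
		q.cond.Wait() // releases mu while waiting, reacquires it on wake-up
	}
	s := q.items[0]
	q.items = q.items[1:]
	return s
}

func main() {
	q := newBlockingQueue()
	done := make(chan string)
	go func() { done <- q.dequeue() }() // blocks until "elem" arrives
	q.enqueue("elem")
	fmt.Println(<-done) // elem
}
```

The wait is wrapped in a loop because a woken goroutine must recheck the condition: another consumer may have taken the element first.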

Document thread-safety

There's no documentation on the thread-safety of the API. Would be great to have some explicit statements there, describing the current state of things.

Add support for multithreading

We can try two different approaches -

  • In this approach, we keep the queue single threaded and acquire lock from outside
  • In this approach, we acquire locks on all the subcomponents such as index, arena.
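
The first approach can be sketched as a thin wrapper that takes a single lock around every call. The types `unsafeQueue` and `lockedQueue` and the helper `enqueueConcurrently` are hypothetical and stand in for the real queue internals.

```go
package main

import (
	"fmt"
	"sync"
)

// unsafeQueue is a hypothetical single-threaded queue standing in for the
// queue internals.
type unsafeQueue struct{ items []string }

func (q *unsafeQueue) enqueue(s string) { q.items = append(q.items, s) }
func (q *unsafeQueue) size() int        { return len(q.items) }

// lockedQueue sketches the first approach: the inner queue stays single
// threaded and one lock acquired from outside guards every call.
type lockedQueue struct {
	mu    sync.Mutex
	inner unsafeQueue
}

func (q *lockedQueue) enqueue(s string) {
	q.mu.Lock()
	defer q.mu.Unlock()
	q.inner.enqueue(s)
}

func (q *lockedQueue) size() int {
	q.mu.Lock()
	defer q.mu.Unlock()
	return q.inner.size()
}

// enqueueConcurrently starts n goroutines that each enqueue one element
// and waits for all of them to finish.
func enqueueConcurrently(q *lockedQueue, n int) {
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			q.enqueue("elem")
		}()
	}
	wg.Wait()
}

func main() {
	var q lockedQueue
	enqueueConcurrently(&q, 100)
	fmt.Println(q.size()) // 100
}
```

A single outer lock is simple to reason about but serializes all operations; the second approach trades that simplicity for finer-grained locking on each subcomponent.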
