
lavinmq's Introduction


LavinMQ

A message queue server that implements the AMQP 0-9-1 protocol. Written in Crystal.

It aims to be very fast with low RAM requirements, to handle very long queues and many connections, and to require minimal configuration.

Read more at LavinMQ.com

Installation

From source

Begin by installing Crystal. Refer to Crystal's installation documentation for instructions.

Clone the git repository and build the project.

git clone git@github.com:cloudamqp/lavinmq.git
cd lavinmq
make
sudo make install # optional

Now, LavinMQ is ready to be used. You can check the version with:

lavinmq -v

Debian/Ubuntu

curl -fsSL https://packagecloud.io/cloudamqp/lavinmq/gpgkey | gpg --dearmor | sudo tee /usr/share/keyrings/lavinmq.gpg > /dev/null
. /etc/os-release
echo "deb [signed-by=/usr/share/keyrings/lavinmq.gpg] https://packagecloud.io/cloudamqp/lavinmq/$ID $VERSION_CODENAME main" | sudo tee /etc/apt/sources.list.d/lavinmq.list
sudo apt-get update
sudo apt-get install lavinmq

If you need a specific version of LavinMQ, install it with sudo apt-get install lavinmq=<version>. This works for both upgrades and downgrades.

Fedora

sudo tee /etc/yum.repos.d/lavinmq.repo << 'EOF'
[lavinmq]
name=LavinMQ
baseurl=https://packagecloud.io/cloudamqp/lavinmq/fedora/$releasever/$basearch
gpgkey=https://packagecloud.io/cloudamqp/lavinmq/gpgkey
repo_gpgcheck=1
gpgcheck=0
EOF
sudo dnf install lavinmq

Usage

LavinMQ only requires one argument: the path to a data directory.

Run LavinMQ with: lavinmq -D /var/lib/lavinmq

More configuration options can be viewed with -h. You can also specify a configuration file; see extras/lavinmq.ini for an example.
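
As a starting point, a configuration file could look like the sketch below. Only the [replication] keys appear in this README; the [main], [amqp] and [mgmt] section and key names are illustrative guesses, so treat extras/lavinmq.ini as the authoritative reference:

```ini
[main]
data_dir = /var/lib/lavinmq   ; same directory as the -D flag

[amqp]
port = 5672                   ; client connections

[mgmt]
port = 15672                  ; management UI / HTTP API

[replication]
bind = 0.0.0.0
port = 5679
```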

Docker

Docker images are published to Docker Hub. Fetch and run the latest version with:

docker run --rm -it -p 5672:5672 -p 15672:15672 -v /var/lib/lavinmq:/tmp/amqp cloudamqp/lavinmq

You can then visit the management UI at http://localhost:15672 and start publishing/consuming messages via amqp://guest:guest@localhost.

Debugging

On Linux, perf is the tool of choice for tracing and measuring performance.

To see which syscalls are being made, use:

perf trace -p $(pidof lavinmq)

To get a live analysis of the most frequently called functions, run:

perf top -p $(pidof lavinmq)

A more detailed tutorial on perf is available here.

On macOS, Instruments (bundled with Xcode) can be used for tracing.

Memory garbage collection can be diagnosed with boehm-gc environment variables, such as GC_PRINT_STATS=1.

Contributing

Kindly read our contributing guide.

LavinMQ with various platforms

All AMQP client libraries work with LavinMQ, and AMQP client libraries exist for almost every platform. Here are guides for a couple of common platforms:

  1. Ruby
  2. Node.js
  3. Java
  4. Python
  5. PHP
  6. Crystal

Performance

A single m6g.large EC2 instance, with a GP3 EBS drive (XFS formatted), can sustain about 700,000 messages/s (16-byte message body, single queue, single producer, single consumer). A single producer can push 1,600,000 msgs/s, and when there are no producers, auto-ack consumers can receive 1,200,000 msgs/s.

Enqueueing 100M messages uses only 25 MB of RAM. 8,000 connections use only about 400 MB of RAM. Declaring 100,000 queues uses about 100 MB of RAM. About 1,600 bindings per second can be made to non-durable queues, and about 1,000 bindings/s to durable queues.

Implementation

LavinMQ is written in Crystal, a modern language built on LLVM with a Ruby-like syntax. It uses an event loop library for IO, is garbage collected, adopts a CSP-like concurrency model and compiles down to a single binary. You can liken it to Go, but with a nicer syntax.

Instead of trying to cache messages in RAM, we write all messages as fast as we can to disk and let the OS cache do the caching.

Each queue is backed by a message store on disk, which is just a series of files (segments), by default 8 MB each. Message segments are memory-mapped files allocated using the mmap syscall. To prevent unnecessary memory usage, however, these files are unmapped and their memory freed when not in use. When a segment needs to be written or read, it is re-mapped, using only the memory needed for that specific segment. Each incoming message is appended to the last segment, prefixed with a timestamp, its exchange name, routing key and message headers.
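
As an illustration, the segment scheme can be sketched in Python (LavinMQ itself is Crystal; the record framing and file naming below are simplified stand-ins, not the real on-disk format):

```python
import os
import struct
import time

SEGMENT_SIZE = 8 * 1024 * 1024  # start a new segment file at 8 MB

class SegmentStore:
    """Toy append-only message store: a directory of msgs.NNNNNNNNNN files."""

    def __init__(self, directory):
        os.makedirs(directory, exist_ok=True)
        self.directory = directory
        self.seg_id = 0
        self.seg = None
        self._open_segment()

    def _open_segment(self):
        if self.seg:
            self.seg.close()
        path = os.path.join(self.directory, "msgs.%010d" % self.seg_id)
        self.seg = open(path, "ab")

    def append(self, exchange, routing_key, body):
        # Prefix each message with a timestamp and field lengths, roughly
        # mirroring the "timestamp, exchange name, routing key" framing.
        record = struct.pack("<dII", time.time(), len(exchange), len(routing_key))
        record += exchange + routing_key + struct.pack("<I", len(body)) + body
        if self.seg.tell() + len(record) > SEGMENT_SIZE:
            self.seg_id += 1
            self._open_segment()
        pos = (self.seg_id, self.seg.tell())  # the message's "segment position"
        self.seg.write(record)
        self.seg.flush()
        return pos
```

Appending returns a (segment, offset) pair, which is the role the segment position plays in the Publish flow described below.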

When a message is consumed, it is read sequentially from the segments. The position of each acknowledged (or rejected) message in the segment is written to an "ack" file (one per segment). If a message is requeued, its position is added to an in-memory queue. On boot, all acked message positions are read from the "ack" files, and those positions are then skipped when delivering messages sequentially from the message segments. Segments are deleted when all messages in them are acknowledged.
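
A toy model of the ack-file idea (names and structures here are illustrative, not LavinMQ's actual implementation):

```python
# Toy model: acked positions are recorded (in LavinMQ, appended to a
# per-segment "ack" file) and skipped during sequential reads.

class AckTrackedQueue:
    def __init__(self):
        self.messages = {}   # position -> message body
        self.acked = set()   # positions recorded in the "ack" file

    def publish(self, body):
        self.messages[len(self.messages)] = body

    def ack(self, pos):
        self.acked.add(pos)

    def read_all(self):
        # Sequential read that skips acknowledged positions, as on boot.
        return [body for pos, body in sorted(self.messages.items())
                if pos not in self.acked]
```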

Declarations of queues, exchanges and bindings are written to a definitions file (if the target is durable), encoded as the AMQP frames they arrived in. Periodically this file is compacted/garbage-collected by writing only the current in-memory state to the file (getting rid of all delete events). The file is read on boot to restore all definitions.
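
The compaction step can be illustrated with a sketch (plain tuples stand in for the AMQP frames LavinMQ stores verbatim):

```python
# Toy definitions log: (op, kind, name) tuples stand in for AMQP frames.

def replay(events):
    """Rebuild the in-memory state by applying the log in order (as on boot)."""
    state = set()
    for op, kind, name in events:
        if op == "declare":
            state.add((kind, name))
        elif op == "delete":
            state.discard((kind, name))
    return state

def compact(events):
    # Compaction: rewrite the log as pure declarations of the current state,
    # discarding delete events and the declarations they cancelled out.
    return [("declare", kind, name) for kind, name in sorted(replay(events))]
```

Replaying the compacted log yields the same state as replaying the full log, but the file no longer grows with every delete.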

All non-AMQP objects, like users, vhosts, policies, etc., are stored in JSON files. These types of objects usually don't have a high turnover rate, so we believe JSON makes it easy for operators to modify things while the server is not running, should it ever be needed.

In the data directory we store users.json and vhosts.json as mentioned earlier, and each vhost has a directory in which we store definitions.amqp (encoded as AMQP frames), policies.json and the message segments, named e.g. msgs.0000000124. Each vhost directory is named after the SHA-1 hash of its real name. The same goes for the queue directories in the vhost directory. The queue directories contain only two files, ack and enq, also described earlier.
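
For example, a vhost's directory name can be derived like this (a Python sketch; LavinMQ does the equivalent in Crystal):

```python
import hashlib

def dir_name(real_name):
    # Vhost (and queue) directories are named after the SHA-1 hex digest
    # of the real name, which keeps arbitrary names filesystem-safe.
    return hashlib.sha1(real_name.encode()).hexdigest()
```

The default vhost "/" thus maps to a fixed 40-character hex directory name.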

Flows

Here is an architectural description of the different flows in the server.

Publish

Client#read_loop reads from the socket; it calls Channel#start_publish for the Basic.Publish frame and Channel#add_content for Body frames. When all content has been received (and appended to an IO::Memory object), it calls VHost#publish with a Message struct. VHost#publish finds all matching queues, writes the message to the message store, and then calls Queue#publish with the segment position. Queue#publish writes to the message store.
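
A minimal model of this flow (Python stand-ins for the Crystal classes; the exact-match routing below is a simplistic placeholder for real AMQP bindings):

```python
# Toy publish flow: the vhost routes a message to all matching queues;
# each queue records the segment position.

class Queue:
    def __init__(self, name):
        self.name = name
        self.positions = []  # segment positions of messages in this queue

    def publish(self, segment_position):
        self.positions.append(segment_position)

class VHost:
    def __init__(self):
        self.bindings = {}  # routing key -> queues
        self.store = []     # stand-in for the on-disk message store

    def bind(self, routing_key, queue):
        self.bindings.setdefault(routing_key, []).append(queue)

    def publish(self, routing_key, body):
        queues = self.bindings.get(routing_key, [])
        if not queues:
            return None  # unroutable: nothing is written
        self.store.append(body)    # write the message once
        pos = len(self.store) - 1  # its segment position
        for q in queues:
            q.publish(pos)         # each queue references the position
        return pos
```

Note that the message body is written once; queues bound to the same routing key share the segment position rather than copies of the body.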

Consume

When Client#read_loop receives a Basic.Consume frame, it creates a Consumer instance and adds it to the queue's list of consumers. Each consumer has a deliver_loop fiber that is notified by an internal Channel when new messages are available in the queue.
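
The deliver_loop pattern can be sketched with a thread standing in for the Crystal fiber and queue.Queue for the internal Channel (illustrative only):

```python
# Toy deliver_loop: a thread stands in for the fiber, and a queue.Queue
# for the internal notification channel that wakes the consumer.
import queue
import threading

class Consumer:
    def __init__(self):
        self.channel = queue.Queue()  # internal notification channel
        self.delivered = []
        self.fiber = threading.Thread(target=self.deliver_loop)
        self.fiber.start()

    def deliver_loop(self):
        while True:
            msg = self.channel.get()  # blocks until a message is available
            if msg is None:           # sentinel: consumer cancelled
                return
            self.delivered.append(msg)

    def cancel(self):
        self.channel.put(None)
        self.fiber.join()
```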

Getting help

For questions or suggestions:

Features

  • AMQP 0-9-1 compatible
  • AMQPS (TLS)
  • HTTP API
  • Publisher confirm
  • Transactions
  • Policies
  • Shovels
  • Queue federation
  • Exchange federation
  • Dead-lettering
  • TTL support on queue, message, and policy level
  • CC/BCC
  • Alternative exchange
  • Exchange to exchange bindings
  • Direct-reply-to RPC
  • Users and ACL rules
  • VHost separation
  • Consumer cancellation
  • Queue max-length
  • Import/export of definitions
  • Priority queues
  • Delayed exchanges
  • AMQP WebSocket
  • Single active consumer
  • Replication
  • Stream queues

Currently missing but planned features

  • Automatic leader election in clusters

Known differences to other AMQP servers

There are a few edge-cases that are handled a bit differently in LavinMQ compared to other AMQP servers.

  • When comparing queue/exchange/binding arguments all number types (e.g. 10 and 10.0) are considered equivalent
  • When comparing queue/exchange/binding arguments non-effective parameters are also considered, and not ignored
  • TTL of queues and messages are correct to the 0.1 second, not to the millisecond
  • Newlines are not removed from queue or exchange names; they are forbidden
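
The first rule can be illustrated with a sketch (hypothetical helper, not LavinMQ code):

```python
# Sketch: numeric argument values compare by value, so 10 (int) and
# 10.0 (float) count as equivalent when comparing declaration arguments.

def args_equivalent(a, b):
    if a.keys() != b.keys():
        return False
    for key in a:
        x, y = a[key], b[key]
        numeric = isinstance(x, (int, float)) and isinstance(y, (int, float))
        if numeric:
            if x != y:  # int/float compare by value: 10 == 10.0
                return False
        elif x != y:
            return False
    return True
```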

Replication

LavinMQ supports replication between a leader server and one or more followers. All changes on the leader are replicated to the followers.

Replication configuration

A shared secret is used to allow nodes in a cluster to communicate. Make sure the .replication_secret file is identical in the data directories of all nodes.

Then enable the replication listener on the leader:

[replication]
bind = 0.0.0.0
port = 5679

or start LavinMQ with:

lavinmq --data-dir /var/lib/lavinmq --replication-bind 0.0.0.0 --replication-port 5679

Configure the follower(s) to connect to the leader:

[replication]
follow = tcp://hostname:port

or start LavinMQ with:

lavinmq --data-dir /var/lib/lavinmq-follower --follow tcp://leader.example.com:5679

Stream queues

Stream queues are like append-only logs and can be consumed multiple times. Each consumer can start reading from anywhere in the queue (using the x-stream-offset consumer argument), over and over again. Stream queues differ from normal queues in that messages are not deleted when a consumer acknowledges them (see Retention).

Retention

Messages are only deleted when max-length, max-length-bytes or max-age limits are applied, either as queue arguments or as policies. The limits are checked only when new messages are published to the queue, and only act on whole segments (by default 8 MiB), so the limits aren't necessarily exact. For example, even with a max-age limit set, if no new messages are published to the queue, messages far older than the specified limit may still be available in the stream queue.
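
A sketch of the whole-segment granularity (illustrative only; segments are represented just by their sizes in bytes, oldest first):

```python
# Toy retention check: limits act on whole segments, so the oldest segment
# is only dropped in full, and the newest segment is never dropped.

def apply_max_length_bytes(segment_sizes, max_bytes):
    """segment_sizes: sizes in bytes, oldest first. Mutates and returns it."""
    while sum(segment_sizes) > max_bytes and len(segment_sizes) > 1:
        segment_sizes.pop(0)  # drop the whole oldest segment
    return segment_sizes
```

The second assertion below shows why the limits aren't exact: a single segment is kept even when it alone exceeds the limit.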

Contributors

License

The software is licensed under the Apache License 2.0.

Copyright 2018-2024 84codes AB

LavinMQ is a trademark of 84codes AB

lavinmq's People

Contributors

antondalgren, baelter, carlhoerberg, dentarg, dependabot[bot], jage, johanrhodin, kickster97, magnushoerberg, markus812498, nyior, oskgu360, snichme, spuun, tbroden84, viktorerlingsson, weistrand


lavinmq's Issues

Make sure delivery_loop has exited before closing index files

^CShutting down gracefully...
vhost=/ queue=perf-test: Unexpected exception in deliver_loop: Closed stream (IO::Error)
from ../../../../usr/share/crystal/src/io.cr:118:5 in 'check_open'
from ../../../../usr/share/crystal/src/io/buffered.cr:131:15 in 'write'
from src/avalanchemq/segment_position.cr:36:7 in 'to_io'
from ../../../../usr/share/crystal/src/io.cr:861:5 in 'write_bytes'
from src/avalanchemq/queue/durable_queue.cr:107:9 in 'delete_message'
from src/avalanchemq/queue/durable_queue.cr:103:15 in 'delete_message'
from src/avalanchemq/queue/queue.cr:572:9 in 'expire_msg'
from src/avalanchemq/queue/queue.cr:554:11 in 'expire_messages'
from src/avalanchemq/queue/queue.cr:339:13 in 'consumer_or_expire'
from src/avalanchemq/queue/queue.cr:284:11 in 'deliver_loop'
from src/avalanchemq/queue/queue.cr:79:9 in '->'
from ../../../../usr/share/crystal/src/primitives.cr:255:3 in 'run'
from ../../../../usr/share/crystal/src/fiber.cr:92:34 in '->'
from ???

FreeBSD build fails `Error: No statfs implementation`

I've found a couple of failures while building on FreeBSD. If these can be fixed, I can add avalanchemq to FreeBSD ports/packages. I will report it in the crystal repo & link back to here.

> shards version
1.0.0-alpha.27

> crystal version
Crystal 0.36.1 ()

LLVM: 10.0.1
Default target: x86_64-portbld-freebsd13.0

> uname -a
FreeBSD wintermute 13.0-RC4 FreeBSD ... amd64

> gsh
commit a8c83ffc5d458d672758be35ec78a4752151332c (HEAD -> main, origin/main, origin/HEAD)

> shards build --release --production
Resolving dependencies
Fetching https://github.com/cloudamqp/amq-protocol.cr.git
Fetching https://github.com/tbrand/router.cr.git
Fetching https://github.com/luislavena/radix.git
Fetching https://github.com/schovi/baked_file_system.git
Fetching https://github.com/cloudamqp/amqp-client.cr.git
Fetching https://github.com/84codes/http-protection.git
Fetching https://github.com/84codes/systemd.cr.git
Fetching https://github.com/crystal-lang/logger.cr.git
Using radix (0.4.0)
Using router (0.2.7)
Installing baked_file_system (0.9.8 at fd6ba88)
Installing amq-protocol (1.0.0)
Installing amqp-client (1.0.0)
Installing http-protection (0.2.0 at 893a191)
Using systemd (1.1.2)
Installing logger (0.1.0)
Building: avalanchemq
Error target avalanchemq failed to compile:
Showing last frame. Use --error-trace for full trace.

In src/avalanchemq.cr:2:1

 2 | require "./stdlib/*"
     ^
Error: No statfs implementation

FreeBSD's statfs(2) is similar to Darwin's:

      *	filesystem statistics
      */

     #define MFSNAMELEN	     16		     /*	length of type name including null */
     #define MNAMELEN	     1024	     /*	size of	on/from	name bufs */
     #define STATFS_VERSION  0x20140518	     /*	current	version	number */

     struct statfs {
     uint32_t f_version;	     /*	structure version number */
     uint32_t f_type;		     /*	type of	filesystem */
     uint64_t f_flags;		     /*	copy of	mount exported flags */
     uint64_t f_bsize;		     /*	filesystem fragment size */
     uint64_t f_iosize;		     /*	optimal	transfer block size */
     uint64_t f_blocks;		     /*	total data blocks in filesystem	*/
     uint64_t f_bfree;		     /*	free blocks in filesystem */
     int64_t  f_bavail;		     /*	free blocks avail to non-superuser */
     uint64_t f_files;		     /*	total file nodes in filesystem */
     int64_t  f_ffree;		     /*	free nodes avail to non-superuser */
     uint64_t f_syncwrites;	     /*	count of sync writes since mount */
     uint64_t f_asyncwrites;	     /*	count of async writes since mount */
     uint64_t f_syncreads;	     /*	count of sync reads since mount	*/
     uint64_t f_asyncreads;	     /*	count of async reads since mount */
     uint64_t f_spare[10];	     /*	unused spare */
     uint32_t f_namemax;	     /*	maximum	filename length	*/
     uid_t     f_owner;		     /*	user that mounted the filesystem */
     fsid_t    f_fsid;		     /*	filesystem id */
     char      f_charspare[80];		 /* spare string space */
     char      f_fstypename[MFSNAMELEN]; /* filesystem type name */
     char      f_mntfromname[MNAMELEN];	 /* mounted filesystem */
     char      f_mntonname[MNAMELEN];	 /* directory on which mounted */
     };

A partial patch for src/stdlib/filesystem.cr looks like this, but I have no idea yet how the fields (with C comments still) map to crystal syntax/structures:

  {% elsif flag?(:freebsd) %}
    struct Statfs
     #define STATFS_VERSION  0x20140518	     /*	current	version	number */
      version : UInt32
      type : UInt32
      flags : UInt64
      bsize : UInt64
      iosize : UInt64
      blocks : UInt64
      bfree : UInt64
      bavail : Int64
      files : UInt64
      ffree : Int64
      syncwrites : UInt64
      asyncwrites : UInt64
      syncreads : UInt64
      asyncreads : UInt64
     uint64_t f_spare[10];	     /*	unused spare */
      namemax : UInt32
     uid_t     f_owner;		     /*	user that mounted the filesystem */
     fsid_t    f_fsid;		     /*	filesystem id */
     char      f_charspare[80];		 /* spare string space */
     char      f_fstypename[MFSNAMELEN]; /* filesystem type name */
     char      f_mntfromname[MNAMELEN];	 /* mounted filesystem */
     char      f_mntonname[MNAMELEN];	 /* directory on which mounted */
    end

There's a similar error for getrlimit getrlimit(2)

Error target avalanchemq failed to compile:
Showing last frame. Use --error-trace for full trace.

In src/stdlib/resource.cr:21:22

 21 | fun getrlimit(Int, Rlimit*) : Int
                         ^-----
Error: undefined constant Rlimit

which appears to be https://cgit.freebsd.org/src/tree/sys/sys/resource.h#n112

#define	RLIMIT_NOFILE	8		/* number of open files */
#define	RLIMIT_SBSIZE	9		/* maximum size of all socket buffers */

I'd be happy to collaborate on this if you're willing to help out with the crystal syntax etc.

Can't import definitions file

Trying to import the attached JSON results in an error (Upload failed). The following is logged:

Sep 13 20:45:42 test-freezing-alpaca-01 avalanchemq[4996]: vhost=padolfex: Compacting definitions
Sep 13 20:45:42 test-freezing-alpaca-01 avalanchemq[4996]: amqpserver: vhost=padolfex created
Sep 13 20:45:42 test-freezing-alpaca-01 avalanchemq[4996]: httpserver: method=POST path=/api/definitions/upload status=500 error=Missing hash key: "guest" (KeyError)
Sep 13 20:45:42 test-freezing-alpaca-01 avalanchemq[4996]:   from home/runner/work/avalanchemq/avalanchemq/src/avalanchemq/vhost_store.cr:34:7 in 'create'
Sep 13 20:45:42 test-freezing-alpaca-01 avalanchemq[4996]:   from home/runner/work/avalanchemq/avalanchemq/src/avalanchemq/http/controller/definitions.cr:130:11 in 'import_definitions'
Sep 13 20:45:42 test-freezing-alpaca-01 avalanchemq[4996]:   from home/runner/work/avalanchemq/avalanchemq/src/avalanchemq/http/controller/definitions.cr:29:15 in '->'
Sep 13 20:45:42 test-freezing-alpaca-01 avalanchemq[4996]:   from usr/share/crystal/src/primitives.cr:255:3 in 'call'
Sep 13 20:45:42 test-freezing-alpaca-01 avalanchemq[4996]:   from usr/share/crystal/src/http/server/handler.cr:28:7 in 'call'
Sep 13 20:45:42 test-freezing-alpaca-01 avalanchemq[4996]:   from usr/share/crystal/src/http/server/handler.cr:28:7 in 'call_next'
Sep 13 20:45:42 test-freezing-alpaca-01 avalanchemq[4996]:   from home/runner/work/avalanchemq/avalanchemq/src/avalanchemq/http/handler/basic_auth.cr:31:26 in 'call'
Sep 13 20:45:42 test-freezing-alpaca-01 avalanchemq[4996]:   from usr/share/crystal/src/http/server/handler.cr:28:7 in 'call_next'
Sep 13 20:45:42 test-freezing-alpaca-01 avalanchemq[4996]:   from home/runner/work/avalanchemq/avalanchemq/src/avalanchemq/http/controller/static.cr:20:11 in 'call'
Sep 13 20:45:42 test-freezing-alpaca-01 avalanchemq[4996]:   from usr/share/crystal/src/http/server/handler.cr:28:7 in 'call'
Sep 13 20:45:42 test-freezing-alpaca-01 avalanchemq[4996]:   from usr/share/crystal/src/http/server/handler.cr:28:7 in 'call_next'
Sep 13 20:45:42 test-freezing-alpaca-01 avalanchemq[4996]:   from home/runner/work/avalanchemq/avalanchemq/src/avalanchemq/http/handler/defaults_handler.cr:13:9 in 'call'
Sep 13 20:45:42 test-freezing-alpaca-01 avalanchemq[4996]:   from usr/share/crystal/src/http/server/handler.cr:28:7 in 'call_next'
Sep 13 20:45:42 test-freezing-alpaca-01 avalanchemq[4996]:   from usr/share/crystal/src/http/server/handlers/compress_handler.cr:12:5 in 'call'
Sep 13 20:45:42 test-freezing-alpaca-01 avalanchemq[4996]:   from usr/share/crystal/src/http/server/handler.cr:28:7 in 'call'
Sep 13 20:45:42 test-freezing-alpaca-01 avalanchemq[4996]:   from usr/share/crystal/src/http/server/request_processor.cr:51:11 in 'process'
Sep 13 20:45:42 test-freezing-alpaca-01 avalanchemq[4996]:   from usr/share/crystal/src/http/server.cr:498:5 in '->'
Sep 13 20:45:42 test-freezing-alpaca-01 avalanchemq[4996]:   from usr/share/crystal/src/primitives.cr:255:3 in 'run'
Sep 13 20:45:42 test-freezing-alpaca-01 avalanchemq[4996]:   from ???

Message bodies not deleted from disk on queue purge/delete

To reproduce:

  1. Generate a file with somewhat larger messages
  2. Send them to an AvalancheMQ server (for example dd if=file | amqpcat --producer --uri=$CLOUDAMQP_URL --exchange testing --routing-key blah)
  3. Purge the queue.
    At this point I expect disk usage to drop down to zero. It does not.
  4. Delete the queue.
    At this point I expect disk usage to drop down to zero. It does not.

avalanchemqctl set_policy inconsistency. Can't set priority

  • When i run for example
    avalanchemqctl set_policy 'queue_A' '{"message-ttl": 50}'
    I get "Command failed:" but the policy is created
    (If the policy exists the command is executed, and policy is updated without any error.)

  • I try to apply a priority but it is ignored. for example
    avalanchemqctl set_policy Policy4 'queue_A' '{"message-ttl": 50}' --priority 8
    The policy is created/updated but the priority is ignored and defaults to 0

Unknown key name 'StartLimitIntervalSec' in section 'Service'

When upgrading to 1.0.0-alpha.30, the following is logged:

Apr 27 07:47:18 host-01 systemd[1]: Stopped AvalancheMQ.
Apr 27 07:47:18 host-01 systemd[1]: Reloading.
Apr 27 07:47:19 host-01 systemd[1]: /lib/systemd/system/avalanchemq.service:12: Unknown key name 'StartLimitIntervalSec' in section 'Service', ignoring.
Apr 27 07:47:19 host-01 systemd[1]: iscsid.socket: Socket unit configuration has changed while unit has been running, no open socket file descriptor left
. The socket unit is not functional until restarted.
Apr 27 07:47:19 host-01 systemd[1]: lvm2-lvmpolld.socket: Socket unit configuration has changed while unit has been running, no open socket file descript
or left. The socket unit is not functional until restarted.
Apr 27 07:47:19 host-01 systemd[1]: Reloading.
Apr 27 07:47:20 host-01 systemd[1]: /lib/systemd/system/avalanchemq.service:12: Unknown key name 'StartLimitIntervalSec' in section 'Service', ignoring.
Apr 27 07:47:20 host-01 systemd[1]: iscsid.socket: Socket unit configuration has changed while unit has been running, no open socket file descriptor left
. The socket unit is not functional until restarted.
Apr 27 07:47:20 host-01 systemd[1]: lvm2-lvmpolld.socket: Socket unit configuration has changed while unit has been running, no open socket file descript
or left. The socket unit is not functional until restarted.
Apr 27 07:47:20 host-01 systemd[1]: Starting Message of the Day...
Apr 27 07:47:20 host-01 systemd[1]: motd-news.service: Succeeded.
Apr 27 07:47:20 host-01 systemd[1]: Finished Message of the Day.
Apr 27 07:47:20 host-01 systemd[1]: Starting AvalancheMQ...

Support for limits

  • Max connections per vhost #372
  • Max connections per user
  • Messages per second
  • Operations per second (declaring things)
  • Max queues per vhost #372
  • Max exchanges
  • Max bindings
  • Max channels

Get containerized/cgroup memory maximum

Podman/cgroupV2 exposes /sys/fs/cgroup/memory.max (and others) inside the container. Use that, when available, instead of checking the physical memory available. (It can change, so poll it.)

memory.current might be a better alternative than RSS too

Replacing a policy for a queue can fail to update queue behavior

Using 1.0.0.pre.alpha.19

Steps:

  1. Create a queue 'mess_limit'
  2. Add policy for queue with max_length_byte set to 1000
  3. Publish some messages that will exceed limit. Policy is enforced
  4. Add policy with greater priority for queue with max_length set to 1000000
  5. Publish more messages [doesn't matter if older messages are purged]

The max_length_byte limit is still applied to the queue. It should be max_length.

I tried other variations:

  • first max_length limit, then Max_length_byte
  • max_length_byte, then smaller max_length_byte

Both of those worked as expected.

server locks up during throughput load test

issue

  • after load testing, avalanche locks up completely and will not shut down
  • kill -TERM doesn't work, nor control-C
  • kill -9 succeeds

environment

reproduce

  • grab a FreeBSD 13.0-RELEASE system
# pkg bootstrap -f
# pkg install -y security/sudo
# pkg add https://pkg.cabal5.net/FreeBSD:13:amd64/All/avalanchemq-1.0.0.a.31.txz
### amend /usr/local/etc/avalanchemq/config.ini as required
# sudo -u avalanchemq avalanchemq --config=/usr/local/etc/avalanchemq/config.ini
### in a new terminal
# avalanchemqperf throughput
... wait a bit

Now, try to control-c in both windows, doing avalanchemqperf first. I have had hangs on either end regularly.

dtruss of processes shows:

  • client hangs are repeatedly doing sendto(11,"\^A\0\^A\0\0\0\^R\0<\0(\0\0\0\tp"...,72,0,NULL,0) = 72 (0x48)
  • yet server has already reported it closed the connection:
2021-08-07 09:33:56 +00:00 [INFO] amqpserver: Updated permissions to vhost=/ for user=__direct
2021-08-07 09:33:56 +00:00 [WARN] : Certificate for AMQPS not configured
2021-08-07 09:33:56 +00:00 [INFO] httpserver: Bound to 100.64.59.243:15672
2021-08-07 09:33:56 +00:00 [INFO] httpserver: Bound to /var/run/avalanchemq/http.sock
2021-08-07 09:33:56 +00:00 [INFO] httpserver: Bound to /tmp/avalanchemq/http.sock
2021-08-07 09:33:56 +00:00 [INFO] amqpserver: Listening on 100.64.59.243:5672
2021-08-07 09:33:56 +00:00 [INFO] amqpserver: Listening on /var/run/avalanchemq/avalanchemq.sock
2021-08-07 09:34:03 +00:00 [INFO] vhost=/ client=100.64.59.243:55121: Connection (100.64.59.243:55121 -> 100.64.59.243:5672) established for user=guest
2021-08-07 09:34:03 +00:00 [INFO] vhost=/ client=100.64.59.243:55122: Connection (100.64.59.243:55122 -> 100.64.59.243:5672) established for user=guest
2021-08-07 09:34:58 +00:00 [INFO] vhost=/: GC segments, collecting sps used 0B memory
2021-08-07 09:34:59 +00:00 [INFO] vhost=/: GC segments, garbage collecting used 0B memory
2021-08-07 09:34:59 +00:00 [INFO] vhost=/: GC segments, garbage collecting took 417.123632 ms
2021-08-07 09:34:59 +00:00 [INFO] vhost=/: GC segments, compact internal queues used 0B memory
2021-08-07 09:34:59 +00:00 [INFO] vhost=/: GC segments, GC collect used 0B memory
load: 7.02  cmd: avalanchemq 77562 [running] 128.34r 69.19u 38.70s 100% 4470060k
sys_recvfrom+0x86 amd64_syscall+0x10c fast_syscall_common+0xf8 
GC Warning: Repeated allocation of very large block (appr. size 3221229568):
        May lead to memory leak and poor performance
2021-08-07 09:38:33 +00:00 [WARN] vhost=/ client=100.64.59.243:55122 channel=1: Error when publishing message #<OverflowError:Arithmetic overflow>
2021-08-07 09:38:33 +00:00 [ERROR] vhost=/ client=100.64.59.243:55122: Unexpected error, while reading: Arithmetic overflow (OverflowError)
  from ???
  from ???
  from ???
  from ???
  from ???
  from ???
  from ???
  from ???
  from ???

2021-08-07 09:38:33 +00:00 [INFO] vhost=/ client=100.64.59.243:55122: Closing, INTERNAL_ERROR - Arithmetic overflow
2021-08-07 09:38:33 +00:00 [INFO] vhost=/ client=100.64.59.243:55122: Connection=100.64.59.243:55122 -> 100.64.59.243:5672 disconnected

Config option if connections via unix socket are TLS terminated already, and show that in the UI

Currently, when we have nginx as a TLS terminator in front of avalanchemq, all connections coming from there look like non-TLS connections, because nginx doesn't support proxy protocol version 2. That will be confusing to users who don't understand why their connections aren't over TLS. So let's add a config option that marks all connections from that endpoint as TLS connections regardless.

Calling delete on a consumer that has already been deleted

Calls to AvalancheMQ::Vhost::Queue#rm_consumer are made more than once for a consumer.

The consumer is not deleted from the AvalancheMQ::Client::Channel@consumers array when AvalancheMQ::Client::Channel::Consumer#cancel is called.

KeyError on restart

When restarting avalanchemq we get a KeyError, which results in the broker never starting; some data has to be wiped in order for the broker to come back up.

This only affects the broker if it has queues with x-message-ttl.

After some research I've narrowed it down to this: an SP loaded from the enq file references a segment that doesn't exist anymore. It wants to read from the segment to get metadata to find out whether the message should be expired or not.

alpha.21 still errors with KeyError on restart; we now handle the error, but it shouldn't occur in the first place.

Table headers and data desync on page refresh

Screenshot 2021-01-20 at 20 07 35

Steps to reproduce:

  1. Create a queue or similar data and go to the queues page
  2. Filter out some header columns via the +/- button
  3. Refresh the page or similar
  4. Table Header columns are reset but data is not -> creates confusion

Deleted queues got recreated on server crash

Seen on: v1.0.0-alpha.22
What happened:

  1. queue got created through amqpcat
  2. queue got manually deleted through UI
  3. Server crashed due to GC error (maybe a restart does it as well?)
  4. On startup queue got recreated

Jan 20 13:54:46 [59652]: vhost=****** queue=testqueue: (messages=0) Deleted
Jan 20 14:58:48 [66463]: vhost=****** queue=testqueue: Restoring index
Jan 20 14:58:48 [66463]: vhost=****** queue=testqueue: 0 messages

Pause queue should only pause the consumers on the queue

Pause queue should pause the consumers on the queue, but it should still be possible to get/reject messages on the queue from the UI and API.

The idea behind this feature is to be able to remove messages from a queue without needing to stop the consumers.
When paused, all consumers should be stopped and all messages moved from unacked to ready.
