Code Monkey home page Code Monkey logo

garagemq's People

Contributors

andhe avatar bondarevpavel avatar dependabot[bot] avatar lbajolet avatar valinurovam avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

garagemq's Issues

Pluggable backend support?

It may be interesting see how it performs with different backends, I'm thinking in goleveldb (or the C++ implementation + cgo) and/or rocksdb

Any way to silence/reformat the logs?

Using the garageMQ library in my app, I see lots of logs coming from it, in a format that differs from mine. That is a problem for log consumers like CloudWatch or ElasticSearch FileBeats as they expect all logs to be in the same format. Can I somehow completely silence the logs coming from the library, or specify an output format? Currently, it looks like this (the first line is mine, and the rest is from garageMQ):

[2023-05-09 22:16:35.284] INFO Starting RabbitMQ Simulation service at amqp://localhost:55672
INFO[0000] Server starting pid=78687
INFO[0000] Open db storage engine=buntdb path=":memory:/buntdb/cf1e8c14e54505f60aa10ceb8d5d8ab3"
INFO[0000] Initialize default vhost vhost=/
INFO[0000] Initialize host message msgStorage
INFO[0000] Open db storage engine=buntdb path=":memory:/buntdb/179eac422549de60ed2ede0fdbec4b2b"
INFO[0000] Open db storage engine=buntdb path=":memory:/buntdb/179eac422549de60ed2ede0fdbec4b2b.transient"

[POSSIBLE BUG] - Publisher get frozen when publishing without any delay

I'm using BadgerDB as persistent DB and when trying to publish around 500K messages with Persistent messages for testing without any delay, it causes publisher to freeze.

package main

import (
	"bytes"
	"context"
	"fmt"
	"log"
	"time"
	
	amqp "github.com/oarkflow/amqp/amqp091"
	
	grabbit "github.com/oarkflow/amqp"
)

func OnPubReattempting(name string, retry int) bool {
	log.Printf("callback_redo: {%s} retry count {%d}", name, retry)
	return true // want continuing
}

// OnNotifyPublish CallbackNotifyPublish
func OnNotifyPublish(confirm amqp.Confirmation, ch *grabbit.Channel) {
	log.Printf("callback: publish confirmed status [%v] from queue [%s]\n", confirm.Ack, ch.Queue())
}

// OnNotifyReturn CallbackNotifyReturn
func OnNotifyReturn(_ amqp.Return, ch *grabbit.Channel) {
	log.Printf("callback: publish returned from queue [%s]\n", ch.Queue())
}

func PublishMsg(publisher *grabbit.Publisher, start, end int) {
	message := amqp.Publishing{DeliveryMode: amqp.Persistent} // This just runs for some messages and just freezes
	// message := amqp.Publishing{}
	message.Headers = map[string]any{
		"next-queue": "I'm loving it",
	}
	data := make([]byte, 0, 64)
	buff := bytes.NewBuffer(data)
	
	for i := start; i < end; i++ {
		// <-time.After(1 * time.Millisecond)
		buff.Reset()
		buff.WriteString(fmt.Sprintf("test number %04d", i))
		message.Body = buff.Bytes()
		log.Println("going to send:", buff.String())
		
		if err := publisher.Publish(message); err != nil {
			log.Println("publishing failed with: ", err)
		}
	}
}

func main() {
	ctxMaster, ctxCancel := context.WithCancel(context.TODO())
	conn := grabbit.NewConnection("amqp://guest:guest@localhost:5672", amqp.Config{}, grabbit.WithConnectionCtx(ctxMaster))
	pubOpt := grabbit.DefaultPublisherOptions()
	pubOpt.WithKey("workload").WithContext(ctxMaster).WithConfirmationsCount(20)
	
	topos := make([]*grabbit.TopologyOptions, 0, 8)
	topos = append(topos, &grabbit.TopologyOptions{
		Name:          "workload",
		IsDestination: true,
		Durable:       true,
		Declare:       true,
	})
	publisher := grabbit.NewPublisher(conn, pubOpt,
		grabbit.WithChannelCtx(ctxMaster),
		grabbit.WithChannelTopology(topos),
		grabbit.OnChannelRecovering(OnPubReattempting),
		grabbit.OnPublishSuccess(OnNotifyPublish),
		grabbit.OnPublishFailure(OnNotifyReturn),
	)
	if !publisher.AwaitAvailable(30*time.Second, 1*time.Second) {
		log.Println("publisher not ready yet")
		ctxCancel()
		return
	}
	
	PublishMsg(publisher, 0, 500000)
}

Screenshot

Seems Channel is closed before all messages are being published

Does this MQ brocker support cluster?

We know RabbitMQ supports cluster and HA, but the Erlang implemented lets somebody have no choice. We want to search for a substitution.Does this MQ brocker support cluster?

[POSSIBLE BUG] - Server or badger seems to freeze when consumer is rerun when using Topic Exchange

When using topic exchange, the server freezes and doesn't respond.

Code example: https://gist.github.com/sujit-baniya/81894e8b0dc3b1c1f4934b0051e88414

Steps to reproduce:

  • Run server
  • Run consumer as go run receive.go "kern.*"
  • Emit payload as go run emit.go "kern.critical" "A critical kernel error"
  • Close the consumer using Ctrl + c
  • Rerun the consumer as go run receive.go "kern.*". Here it just freezes

Concern Regarding Code Quality of Badger

My Posts will be deleted on badger repository. So posting it here, since you guys use badger.

Badger team did changes on May 9th to introduce move keys.
dgraph-io/badger@7af0076

There were many bugs, issues related to same thing which were fixed over months. The last fix was done a week back. The issues ranged from badger being stuck in recursive loop during get to badger not returning data which was inserted.

dgraph-io/badger@d1185f0
dgraph-io/badger@af99e5f
dgraph-io/badger@d055ef4
dgraph-io/badger@3340933
dgraph-io/badger@2e3a32f

@manishrjain How do you plan to address the code quality of badger.

Changes for ARM

Hi. I try build gragemq for ARM(QNAP NAS) and get fail at work. I try made some changes for running code at ARM. All changes here. What you recommend improve in commit for pull request?

Support signaling readiness to systemd

Hi,

FYI I recently packaged garagemq for Debian (currently stuck in the NEW queue: http://ftp-master.debian.org/new.html ). While doing so I added a systemd service file that will allow to easily start up garagemq as a service on the system. FWIW The file can be found at: https://salsa.debian.org/go-team/packages/garagemq/blob/master/debian/garagemq.service (feel free to ship this upstream if you wish!). This lead up to the feature request I'm about to describe.....

Please consider supporting signalling "readiness" via the sd_notify protocol. This will allow systemd to properly track when garagemq is actually ready to start serving requests instead of blindly assuming it's up and running as soon as the binary has been launched (which opens up for a race and can cause depending services to fail if they start and fail to connect before garagemq is ready), i.e. with readiness support we can change from Type=simple to Type=notify in the service file which is much better.

I think the most popular helper library is https://github.com/coreos/go-systemd
Example usage can be found at eg. https://vincent.bernat.ch/en/blog/2017-systemd-golang

Note that the above example URL (vincent bernat) also talks about "liveness" (aka watchdog) which would be another really nice thing to have support for. Extensive information on the systemd watchdog concept can be found at http://0pointer.de/blog/projects/watchdog.html . The applications responsibility can be reduced down to a/ reading the WATCHDOG_USEC environment variable and (if set) b/ simply kicking the watchdog every WATCHDOG_USEC/2 from the main thread (ie. the best place where getting stuck might actually the watchdog kicking to stop happening).

I'd be happy to answer any questions or provide patches. If you have any opinions on the details for what a patch needs to get accepted for this I'd be happy to hear them as soon as possible! :)

copyright/licensing feedback

Hello,

While preparing a package of garagemq for Debian I had to investigate the licensing and copyright status in great details. I'd like to provide some feedback from my findings of some things that are problematic for Debian and some details you might want to make more clear that might benefit others.

  • The readme/*.jpg screenshots seems to contain an embedded ICC profile in their EXIF data which Apple claims copyright on. It's not clear which license these are provided under (if any?).[1] It would be nice if you could strip off the ICC profile from the images to save us the time having to think about legality of redistributing your screenshots.
  • LICENSE mentions your copyright and license, but it could possibly be nice to more clearly/explicitly point out that protocol/*.xml has a different copyright holder and license.
  • You spell your name diffently in LICENSE vs in the source (which makes your preference of how to be attributed in the Debian package copyright non-obvious), might want to make it consistent? (first last vs last first) :)
  • Possibly point out that admin-frontend/build directory contains compiled/obfucated nodejs files -- without providing the corresponding source files in the garagemq repository.[2]

[1]: Apple copyright claims in readme/*.jpg

$ grep -i copyright readme/*.jpg
Binary file readme/channels.jpg matches
Binary file readme/connections.jpg matches
Binary file readme/exchanges.jpg matches
Binary file readme/overview.jpg matches
Binary file readme/queues.jpg matches
$ for a in readme/*.jpg ; do echo ===== $a ; identify -verbose $a  | grep -i copy ; done
===== readme/channels.jpg
    icc:copyright: Copyright Apple Inc., 2018
===== readme/connections.jpg
    icc:copyright: Copyright Apple Inc., 2018
===== readme/exchanges.jpg
    icc:copyright: Copyright Apple Inc., 2018
===== readme/overview.jpg
    icc:copyright: Copyright Apple Inc., 2018
===== readme/queues.jpg
    icc:copyright: Copyright Apple Inc., 2018

[2]: In Debian, and probably more places, this is problematic for two reasons. a/ a clear separation between "source" (preferable form of modification) and built files are desired. b/ corresponding source (and its licenses) must always be available to be allowed into Debian.

Is it possible to use GarageMQ as an embedded RabbitMQ/AMQP server?

I am building an API service for use by a vendor. The API service has various dependencies, like other services it calls or RabbitMQ queues it listens to. I need to build a simulation for teh vendor to use for their testing, that doesn't depend on any external services. For external http services I use httptest, and for external RabbitMQ dependency I am looking for a library that could simulate a running RabbitMQ/AMQP server, but while being a part of my API (the pia will internal talk to the embedded RabbitMQ as if it was real RabbitMQ hosted outside).

It short, can I somehow run your implementation as a library, starting the AMQP server it from my code?

[POSSIBLE BUG] - Consumer gets auto shutdown at random when publisher publishes multiple times

package utils

import (
	"flag"
	"fmt"
	"log"
	
	amqp "github.com/rabbitmq/amqp091-go"
)

var Conn *amqp.Connection
var Ch *amqp.Channel
var PublishKey = "kern.critical"
var ConsumerKey = "kern.*"
var Exchange = "test_exchange"
var Exchange2 = "test_exchange2"
var ConsumerName = "go-amqp-example"

type User struct {
	FirstName string `json:"first_name"`
	LastName  string `json:"last_name"`
}

var (
	amqpURI = flag.String("amqp", "amqp://guest:guest@localhost:5672/", "AMQP URI")
)

func FailOnError(err error, msg string) {
	if err != nil {
		panic(fmt.Sprintf("%s: %s", msg, err))
	}
}

func Init() {
	flag.Parse()
	initAmqp()
}

func initAmqp() {
	var err error
	Conn, err = amqp.Dial(*amqpURI)
	FailOnError(err, "Failed to connect to RabbitMQ")
	
	log.Printf("got Connection, getting Channel...")
	Ch, err = Conn.Channel()
	FailOnError(err, "Failed to open a channel")
	
}
// consumer.go
package main

import (
	"context"
	"encoding/json"
	"fmt"
	
	amqp "github.com/rabbitmq/amqp091-go"
	
	"github.com/valinurovam/garagemq/examples/utils"
)

func main() {
	utils.Init()
	err := utils.Ch.ExchangeDeclare(utils.Exchange, amqp.ExchangeTopic, true, false, false, false, nil)
	utils.FailOnError(err, "Failed to declare the Exchange")
	
	q, err := utils.Ch.QueueDeclare("test-queue-name", true, false, false, false, nil)
	utils.FailOnError(err, "Error declaring the Queue")
	
	err = utils.Ch.QueueBind(q.Name, utils.ConsumerKey, utils.Exchange, false, nil)
	utils.FailOnError(err, "Error binding to the Queue")
	err = utils.Ch.Qos(100, 10, true)
	utils.FailOnError(err, "Error on Qos")
	replies, err := utils.Ch.Consume(q.Name, utils.ConsumerName, true, false, false, false, nil)
	utils.FailOnError(err, "Error consuming the Queue")
	
	count := 1
	for r := range replies {
		if count%10 == 0 {
			err := utils.Ch.PublishWithContext(context.Background(), utils.Exchange2, "abc.info", false, false, amqp.Publishing{
				Headers:         r.Headers,
				ContentType:     r.ContentType,
				ContentEncoding: r.ContentEncoding,
				DeliveryMode:    r.DeliveryMode,
				Priority:        r.Priority,
				CorrelationId:   r.CorrelationId,
				ReplyTo:         r.ReplyTo,
				Expiration:      r.Expiration,
				MessageId:       r.MessageId,
				Timestamp:       r.Timestamp,
				Type:            r.Type,
				UserId:          r.UserId,
				AppId:           r.AppId,
				Body:            r.Body,
			})
			utils.FailOnError(err, "Error re-publishing message")
			r.Nack(true, false)
		} else {
			var user utils.User
			json.Unmarshal(r.Body, &user)
			fmt.Printf("FirstName: %s, LastName: %s\n", user.FirstName, user.LastName)
		}
		count++
	}
}
// publisher.go
package main

import (
	"context"
	"encoding/json"
	"log"
	"math/rand"
	"time"
	
	amqp "github.com/rabbitmq/amqp091-go"
	
	"github.com/valinurovam/garagemq/examples/utils"
)

func randomString(l int) string {
	bytes := make([]byte, l)
	for i := 0; i < l; i++ {
		bytes[i] = byte(randInt(65, 90))
	}
	return string(bytes)
}

func randInt(min int, max int) int {
	return min + rand.Intn(max-min)
}

func publishMessages(messages int) {
	for i := 0; i < messages; i++ {
		user := utils.User{
			FirstName: randomString(randInt(3, 10)),
			LastName:  randomString(randInt(3, 10)),
		}
		
		payload, _ := json.Marshal(user)
		err := utils.Ch.PublishWithContext(context.Background(), utils.Exchange, utils.PublishKey, false, false,
			amqp.Publishing{
				DeliveryMode: amqp.Persistent,
				ContentType:  "application/json",
				Body:         payload,
				Timestamp:    time.Now(),
			},
		)
		utils.FailOnError(err, "Failed to Publish on RabbitMQ")
	}
}

func main() {
	utils.Init()
	log.Println("Starting publisher...")
	publishMessages(10)
	time.Sleep(10 * time.Minute)
	defer utils.Ch.Close()
	defer utils.Conn.Close()
}

Video link: https://drive.google.com/file/d/1EaNCvXHRscRUyLDg0Jwsc9ENnC9uvGBd/view?usp=sharing

Support for durable queues?

If I try to open a durable queue, I get Exception (406) Reason: \"PRECONDITION_FAILED - inequivalent arg 'durable' for queue 'local.incoming_orders': received 'true' but current is 'false'

        // open a queue
	q, err := channel.QueueDeclare(
		config.Queue, // name
		true,         // durable <---- THIS DOESN'T WORK
		false,        // delete when unused
		false,        // exclusive
		false,        // no-wait
		nil,          // arguments
	)

@valinurovam - are durable queues not supported in your implementation?

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    ๐Ÿ–– Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. ๐Ÿ“Š๐Ÿ“ˆ๐ŸŽ‰

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google โค๏ธ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.