server's People

Contributors

alexsporn, bkupidura, boskywsmfn, clarkqaq, dadebue, deadprogram, dependabot[bot], dgduncan, eitol, gsagula, helderjnpinto, heya-naohiro, ianrose14, jeroenrinzema, jmacd, jphastings, kenuestar, mochi-co, muxxer, plourdedominic, rkennedy, soyoo, stffabi, thedevop, tommyminds, werbenhu, wind-c, x20080406, xyzj, zgwit


server's Issues

The OnConnect and OnDisconnect event hooks trigger in the wrong order when the same client ID is reused

My implementation:

server.Events.OnConnect = func(cl events.Client, pk events.Packet) {
	fmt.Printf("<< OnConnect client connected clientid=%s\n", cl.ID)
}

server.Events.OnDisconnect = func(cl events.Client, err error) {
	fmt.Printf("<< OnDisconnect client disconnected clientid=%s err=%v\n", cl.ID, err)
}

When client 1 connects to the server, it prints:

<< OnConnect client connected clientid=mqttx_f988b2da

When client 2 connects to the server using the same client ID as client 1, it prints:

<< OnConnect client connected clientid=mqttx_f988b2da
<< OnDisconnect client disconnected clientid=mqttx_f988b2da err=client session re-established

I think that when client 2 connects to the server using the same client ID as client 1, it should instead print:

<< OnDisconnect client disconnected clientid=mqttx_f988b2da err=client session re-established
<< OnConnect client connected clientid=mqttx_f988b2da

That is, OnDisconnect should be triggered before OnConnect.

[Missing package issue] C-based client gets disconnected

Folks, we have a simple unit test using a cross-stack client written in C (one of the most prominent in the ICT industry). Please check the logs and attachment below.
Ref:
https://github.com/emqx/nanomq
nanomq client built from source

D:\Demos\mqtt\examples\events>go run main.go
Mochi MQTT Server initializing... TCP
  Started!  
<< OnConnect client connected nanomq-d28f8d41: {FixedHeader:{Remaining:27 Type:1 Qos:0 Dup:false Retain:false} AllowClients:[] Topics:[] ReturnCodes:[] ProtocolName:[77 81 84 84] Qoss:[] Payload:[] Username:[] Password:[] WillMessage:[] ClientIdentifier:nanomq-d28f8d41 TopicName: WillTopic: PacketID:0 Keepalive:60 ReturnCode:0 ProtocolVersion:4 WillQos:0 ReservedBit:0 CleanSession:true WillFlag:false WillRetain:false UsernameFlag:false PasswordFlag:false SessionPresent:false}
<< OnDisconnect client dicconnected nanomq-d28f8d41: missing packet id
D:\Demos\gomqtt\nanomq\build\nanomq>nanomq.exe sub start -t "direct/publish"
connect_cb: connected!
disconnected
D:\Demos\mqtt\examples\events>go env
set GO111MODULE=
set GOARCH=amd64
set GOBIN=D:\Progra~1\Go\bin
set GOCACHE=C:\Users\phineas\AppData\Local\go-build
set GOENV=D:\Roaming\go\env
set GOEXE=.exe
set GOEXPERIMENT=
set GOFLAGS=
set GOHOSTARCH=amd64
set GOHOSTOS=windows
set GOINSECURE=
set GOMODCACHE=D:\Progra~1\Go\goget\pkg\mod
set GONOPROXY=
set GONOSUMDB=
set GOOS=windows
set GOPATH=D:\Progra~1\Go\goget
set GOPRIVATE=
set GOPROXY=https://proxy.golang.org,direct
set GOROOT=D:\Progra~1\Go
set GOSUMDB=sum.golang.org
set GOTMPDIR=
set GOTOOLDIR=D:\Progra~1\Go\pkg\tool\windows_amd64
set GOVCS=
set GOVERSION=go1.18
set GCCGO=gccgo
set GOAMD64=v1
set AR=ar
set CC=gcc
set CXX=g++
set CGO_ENABLED=1
set GOMOD=D:\Demos\gomqtt\MQTT.js\mqtt\go.mod
set GOWORK=
set CGO_CFLAGS=-g -O2
set CGO_CPPFLAGS=
set CGO_CXXFLAGS=-g -O2
set CGO_FFLAGS=-g -O2
set CGO_LDFLAGS=-g -O2
set PKG_CONFIG=pkg-config
set GOGCCFLAGS=-m64 -mthreads -fmessage-length=0 -fdebug-prefix-map=C:\Users\phineas\AppData\Local\Temp\go-build1490508260=/tmp/go-build -gno-record-gcc-switches

Concurrent map access for clients and inflights causes data race

Earlier today, @muXxer identified an issue in which the broker would crash with fatal errors, caused when the primary server routine and clients attempted to read and write from the inflight messages map simultaneously.

This is a new issue as of 1.3.0, and resulted from the changes made to inflight handling - specifically, allowing the server event loop to scan and resend inflights (where previously, inflights would only be engaged during connection establishment or within the client itself).

The collision occurred between three locations:

  1. Map reads at github.com/mochi-co/mqtt/server.(*Server).ResendClientInflight() (mqtt/server/server.go:912)
  2. Map writes at github.com/mochi-co/mqtt/server/internal/clients.(*Inflight).Delete() (server/internal/clients/clients.go:570)
  3. Map writes at github.com/mochi-co/mqtt/server/internal/clients.(*Clients).Add() (server/internal/clients/clients.go:48)

The problem was reproducible by placing the server under heavy load with inovex/mqtt-stresser, using qos=2 for both the publisher and subscriber: mqtt-stresser -broker tcp://localhost:1883 -num-clients=200 -num-messages=100 -subscriber-qos=2 -publisher-qos=2

A patch is immediately forthcoming and will be released as v1.3.2
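
For reference, a minimal sketch of the kind of guard that resolves this class of race, assuming a simplified Inflight type (not the library's actual internals): every read and write of the map goes through the same sync.RWMutex, and iteration works on a copy.

package inflight

import "sync"

// Message is a placeholder for an in-flight packet (hypothetical fields).
type Message struct {
	PacketID uint16
	Payload  []byte
}

// Inflight guards its message map with an RWMutex so the server's resend
// scan cannot race with the per-client add/delete paths.
type Inflight struct {
	sync.RWMutex
	internal map[uint16]Message
}

func New() *Inflight {
	return &Inflight{internal: make(map[uint16]Message)}
}

func (i *Inflight) Set(key uint16, in Message) {
	i.Lock()
	defer i.Unlock()
	i.internal[key] = in
}

func (i *Inflight) Delete(key uint16) {
	i.Lock()
	defer i.Unlock()
	delete(i.internal, key)
}

// GetAll returns a copy so callers can iterate without holding the lock.
func (i *Inflight) GetAll() map[uint16]Message {
	i.RLock()
	defer i.RUnlock()
	m := make(map[uint16]Message, len(i.internal))
	for k, v := range i.internal {
		m[k] = v
	}
	return m
}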

Slow clients slow down the whole broker

We are using the MQTT broker and publishing messages directly to all clients using the broker's Publish() func.
This func adds a new publish packet to the inlineMessages.pub buffered channel (size 1024), and the inlineClient() loop publishes those packets to all subscribed clients.
For each subscribed client this calls client.WritePacket(), which in the end calls Write() on the client's writer.

If a single subscribed client is too slow, that client's write buffer fills up and the whole inlineClient() loop hangs until the buffer has space again (see awaitEmpty inside Write()). Shortly afterwards the inlineMessages.pub buffered channel fills up and further calls to Publish() hang.

This means a single slow client (even one using QoS 0, with no guarantees of receiving packets) can make the whole broker wait indefinitely and stop delivering packets to every other client.

A possible workaround: instead of waiting for the buffer to be freed, return a "client buffer full" error and skip sending the packet to this client. If the client is using QoS 1/2, the inflight message retry mechanism should re-deliver the message.

What do you think? I can write a PR with these changes, or do you have a better solution to this problem?
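
As an illustration of the proposed workaround, a minimal sketch (not the library's actual write path) of a non-blocking send that returns a "client buffer full" error instead of waiting, assuming a per-client buffered channel as the outbound queue:

package main

import (
	"errors"
	"fmt"
)

var ErrClientBufferFull = errors.New("client buffer full")

// trySend attempts a non-blocking send of a packet to a client's outbound
// queue. If the queue is full it returns ErrClientBufferFull instead of
// blocking the shared inline publish loop; QoS 1/2 inflight retries could
// re-deliver the packet later.
func trySend(out chan<- []byte, pkt []byte) error {
	select {
	case out <- pkt:
		return nil
	default:
		return ErrClientBufferFull
	}
}

func main() {
	out := make(chan []byte, 2)
	for i := 0; i < 3; i++ {
		if err := trySend(out, []byte(fmt.Sprintf("msg %d", i))); err != nil {
			fmt.Println("skipping slow client:", err)
		}
	}
}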

Event OnMessage: stop a message from being published

Hi,
I just discovered this software and I like it. Thanks.

One question:

I want to generate new messages to different topics based on the original message, but I do not want to publish the original.

Is it possible to drop (not publish) a message?
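
For the first part (generating new messages from the original), a minimal sketch assuming the v1 events API where OnMessage receives the published packet and can return a modified copy; whether the original packet can be suppressed entirely is exactly the open question in this issue.

server.Events.OnMessage = func(cl events.Client, pk events.Packet) (events.Packet, error) {
	// Re-publish the payload to a derived topic using the broker's
	// direct Publish method.
	server.Publish("derived/"+pk.TopicName, pk.Payload, false)

	// Returning the (possibly modified) packet forwards it as usual;
	// an API to drop the original packet is what this issue asks for.
	return pk, nil
}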

Lock values are being copied in buffer and clients

go vet reports that lock values are being copied in clients.Inflight and circ.Buffer. Additionally, the signature for the buffer WriteTo is incorrect.

# github.com/mochi-co/mqtt/server
server/server.go:224:18: assignment copies lock value to cl.Inflight: github.com/mochi-co/mqtt/server/internal/clients.Inflight
# github.com/mochi-co/mqtt/server/internal/circ
server/internal/circ/buffer.go:77:9: return copies lock value: github.com/mochi-co/mqtt/server/internal/circ.Buffer contains sync.RWMutex
server/internal/circ/reader.go:18:3: literal copies lock value from b: github.com/mochi-co/mqtt/server/internal/circ.Buffer contains sync.RWMutex
server/internal/circ/reader.go:28:3: literal copies lock value from b: github.com/mochi-co/mqtt/server/internal/circ.Buffer contains sync.RWMutex
server/internal/circ/writer.go:19:3: literal copies lock value from b: github.com/mochi-co/mqtt/server/internal/circ.Buffer contains sync.RWMutex
server/internal/circ/writer.go:29:3: literal copies lock value from b: github.com/mochi-co/mqtt/server/internal/circ.Buffer contains sync.RWMutex
server/internal/circ/pool_test.go:11:20: call of require.NotNil copies lock value: sync.Pool contains sync.noCopy
server/internal/circ/writer.go:34:18: method WriteTo(w io.Writer) (total int, err error) should have signature WriteTo(io.Writer) (int64, error)

The above issues should be corrected.
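
For the buffer cases, the usual fix is to hand out pointers rather than values so the embedded sync.RWMutex is never copied. A minimal sketch of the pattern (simplified, not the library's actual types):

package circ

import "sync"

// Buffer contains a mutex, so it must not be copied after first use.
type Buffer struct {
	mu  sync.RWMutex
	buf []byte
}

// NewBuffer returns a *Buffer; returning Buffer by value here is what
// triggers the "return copies lock value" vet error.
func NewBuffer(size int) *Buffer {
	return &Buffer{buf: make([]byte, size)}
}

// Reader embeds a *Buffer instead of a Buffer value, so constructing a
// Reader no longer copies the lock.
type Reader struct {
	*Buffer
}

func NewReader(size int) *Reader {
	return &Reader{Buffer: NewBuffer(size)}
}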

Wrong go versioning in v2.0.0

Looks like versioning in v2.0.0 is broken.

% go get -v github.com/mochi-co/mqtt@latest
go: added github.com/mochi-co/mqtt v1.3.2
% go get -v github.com/mochi-co/mqtt@v2.0.0
go: github.com/mochi-co/mqtt@v2.0.0: invalid version: module contains a go.mod file, so module path must match major version ("github.com/mochi-co/mqtt/v2")
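
For reference, the Go modules convention behind this error: a v2+ release must carry the major version in its module path, so both the go.mod module directive and consumer import paths need the /v2 suffix before the tag resolves. A sketch of the expected shape (not the actual fix commit):

// go.mod at the repository root
module github.com/mochi-co/mqtt/v2

go 1.18

// consumers then fetch it as:
% go get github.com/mochi-co/mqtt/v2@v2.0.0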

Stale inflight messages

I started observing an increased number of inflight messages that almost never goes down. At the same time I didn't notice any disconnects from clients, so it looks like the clients simply didn't receive the messages and will never send an ACK (a network issue on the client side?).

I see that there is ResendClientInflight (https://github.com/mochi-co/mqtt/blob/master/server/server.go#L878), but it looks like it is only executed on new client connections (to send messages from the persistence store to reconnecting clients?).

Is there any internal mechanism that will try to re-deliver inflight messages, or should a mochi-co/mqtt user implement this on their own?

Add Client Username to Event Client Info

The events Client struct should expose the client's username to the event hook receiver functions.

Discussed in #77

Originally posted by youmisun, June 11, 2022:
The client information passed to OnDisconnect is too limited; I would like it to include the username. The listener information, by contrast, is not useful to me.

Internal publish

When embedding this server, can the application doing the embedding publish a message to a topic directly? If not, can that be added?
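
For anyone finding this later: the broker does expose a direct publish method when embedded. A minimal sketch, mirroring the Publish call used in the "Publish with retain" issue further down this page:

server := mqtt.New()
tcp := listeners.NewTCP("t1", ":1883")
server.AddListener(tcp, &listeners.Config{Auth: new(auth.Allow)})
server.Serve()

// Publish a message from the embedding application to a topic:
// arguments are (topic, payload, retain).
server.Publish("direct/publish", []byte("published from the embedding app"), false)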

Cluster MQTT broker based on mochi-co/mqtt

I was looking for a cluster-ready MQTT broker for my home environment (k8s) for some time. Unfortunately, all the brokers I checked (emqx, vernemq) had some issues (e.g. not compatible with Paho, or broken in a home environment).

So, based on mochi-co/mqtt, I built my own broker - I was heavily inspired by #45.

Maybe other people looking for clustering capabilities will be interested, or even better, some pieces can be used inside mochi-co/mqtt!

If you think that this is not a place for this kind of communication, please close :)

https://github.com/bkupidura/broker-ha

Messages silently dropped

Hi,

First of all, I want to thank you for this great module!

We have embedded this module into our application and noticed that fewer messages were received than expected. After eliminating other possibilities, we investigated the library more deeply and noticed that the BufferSize setting is not only the default size but also the absolute limit for each connection (one buffer for read, one for write); this wasn't clear from the documentation. When a single MQTT message is greater than the BufferSize, the read blocks until the connection times out, and this also does not trigger the On(Process)Message events.

We expect to handle a very large number of connections. The vast majority of messages are only a couple of KB (but a very small fraction can be many hundreds of KB or larger), and the message frequency is about once a minute. Given that the module allocates 2x the BufferSize (read and write) for each connection, we set a relatively small BufferSize. However, to handle the few corner-case messages, we would need to dramatically increase the BufferSize, which would substantially increase the total memory requirement.

There are a few possible improvements:

  1. Reject the message (and immediately disconnect the client) if its size is greater than BufferSize (maybe rename it?) after cl.ReadFixedHeader, and trigger an event. The current behavior is to silently deadlock on the read and disconnect due to timeout; rejecting has the same end result but fails faster and gives the upper layer visibility via an event callback. (See the sketch after this list.)
  2. For long-lived connections where the message frequency is as low as once a minute, allocating/GC'ing buffers as needed can be amortized, so the benefit of using sync.Pool is diminished. The much smaller memory footprint across a large number of concurrent connections can outweigh the reduction in allocation/GC overhead.
  3. Pre-allocate BufferSize, but dynamically allocate (or expand) the buffer as needed.
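
A minimal sketch of option 1, assuming a hypothetical check placed right after the fixed header is read (the names and wiring are illustrative, not the library's actual internals):

package main

import (
	"errors"
	"fmt"
)

// ErrPacketTooLarge is a hypothetical error for option 1: reject rather
// than silently stall when a packet exceeds the configured buffer size.
var ErrPacketTooLarge = errors.New("packet exceeds maximum buffer size")

// checkPacketSize would run right after cl.ReadFixedHeader, using the
// Remaining length from the fixed header to fail fast and let an event
// hook report the oversized packet to the embedding application.
func checkPacketSize(remaining, bufferSize int) error {
	if remaining > bufferSize {
		return fmt.Errorf("%w: %d > %d", ErrPacketTooLarge, remaining, bufferSize)
	}
	return nil
}

func main() {
	if err := checkPacketSize(300_000, 64*1024); err != nil {
		// In the proposal, the server would disconnect the client here
		// and fire an event instead of blocking on the read buffer.
		fmt.Println(err)
	}
}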

SIGSEGV crash when running examples/tcp/main.go

I noticed this issue when I switched from my PC to my Raspberry Pi. Trying to narrow down the cause, it seems I get the same SIGSEGV error when running the TCP example.

Running on a Raspberry Pi 3 Model B Plus Rev 1.3, ARMv7 Processor rev 4 (v7l). I've confirmed that there are no issues binding to the port.

root@housekeeper:/home/ogelami/mochi-mqtt-issue# go run main.go

Mochi MQTT Server initializing... TCP
  Started!
panic: runtime error: invalid memory address or nil pointer dereference
[signal SIGSEGV: segmentation violation code=0x1 addr=0x0 pc=0x1238c]

goroutine 22 [running]:
runtime/internal/atomic.goLoad64(0x1cae03c, 0x0, 0x0)
        /usr/lib/go-1.15/src/runtime/internal/atomic/atomic_arm.go:131 +0x1c
github.com/mochi-co/mqtt/server/listeners.(*TCP).Serve(0x1cae000, 0x1c9a028)
        /root/go/pkg/mod/github.com/mochi-co/[email protected]/server/listeners/tcp.go:89 +0x28
github.com/mochi-co/mqtt/server/listeners.(*Listeners).Serve.func1(0x1cac000, 0x29db60, 0x1cae000, 0x1c9a028)
        /root/go/pkg/mod/github.com/mochi-co/[email protected]/server/listeners/listeners.go:94 +0x70
created by github.com/mochi-co/mqtt/server/listeners.(*Listeners).Serve
        /root/go/pkg/mod/github.com/mochi-co/[email protected]/server/listeners/listeners.go:91 +0x9c
exit status 2

Broker can stall when stress testing 40 * 10000 messages

When running ./mqtt-stresser -broker tcp://localhost:1883 -num-clients=40 -num-messages=10000 the broker appears to occasionally stall, although it does not freeze, and it continues to process other messages as expected.

Testing against the latest https://github.com/fhmq/hmq we find only marginal performance difference.

This raises questions about whether there is a benefit to continue using circular buffers (which are difficult to maintain and control) now that the performance of channels has been improved significantly, or if the circular buffers mechanism should be replaced with a worker pool. This would also alleviate issues discussed in #95 and could potentially reduce the overall number of goroutines as mentioned in #80.

Discussion invited :)
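
To make the alternative concrete, a generic sketch of the worker-pool shape being discussed (plain channels, no circular buffers). This is not a drop-in replacement for the broker's internals, just an illustration:

package main

import (
	"fmt"
	"sync"
)

type job struct {
	clientID string
	payload  []byte
}

// startPool fans jobs out to a fixed number of workers over a buffered
// channel, which is the mechanism suggested as a replacement for the
// per-client circular buffers.
func startPool(workers int, jobs <-chan job) *sync.WaitGroup {
	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for j := range jobs {
				// The client write would happen here.
				fmt.Printf("deliver %d bytes to %s\n", len(j.payload), j.clientID)
			}
		}()
	}
	return &wg
}

func main() {
	jobs := make(chan job, 1024)
	wg := startPool(4, jobs)
	for i := 0; i < 10; i++ {
		jobs <- job{clientID: fmt.Sprintf("client-%d", i), payload: []byte("hello")}
	}
	close(jobs)
	wg.Wait()
}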

Malformed packet errors on websocket listener

Hi,

When publishing packets to the websocket listener using the Paho MQTT client library (both 3.1.1 and 5), I get many malformed packet errors. If I switch from the ws listener to the tcp listener, I do not get these errors with the exact same client code.

(Screenshot of the malformed packet errors, 2022-12-13.)

Any idea what the issue could be here?

Support TLS Client Certificates?

Hi, I'm looking for an embeddable MQTT broker and came across this project. It looks pretty good, but from a short look around I couldn't see that it supports using certificates to authorize MQTT clients. Is that the case, and if so, would it be possible to add it?

It needs a way to set RequireAndVerifyClientCert as the tls.ClientAuthType on the TLS config used by the server, and then some pluggable way to manage the pool of client CA certificates.
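
For reference, this is what the requested configuration looks like with the standard library alone; wiring this tls.Config into the broker's listener is the part that would need library support (file names here are placeholders):

package main

import (
	"crypto/tls"
	"crypto/x509"
	"log"
	"os"
)

func main() {
	// Load the server certificate and the CA pool used to verify clients.
	cert, err := tls.LoadX509KeyPair("server.crt", "server.key")
	if err != nil {
		log.Fatal(err)
	}
	caPEM, err := os.ReadFile("client-ca.crt")
	if err != nil {
		log.Fatal(err)
	}
	pool := x509.NewCertPool()
	pool.AppendCertsFromPEM(caPEM)

	tlsConfig := &tls.Config{
		Certificates: []tls.Certificate{cert},
		ClientAuth:   tls.RequireAndVerifyClientCert,
		ClientCAs:    pool,
	}
	_ = tlsConfig // pass this to the listener once the broker supports it
}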

Panic when stress testing after an "error writing to buffer io.Writer" error

mqtt-stresser.exe -broker tcps://localhost:18883 -skip-tls-verification -num-clients 10 -num-messages 80000 -rampup-delay 1s -rampup-size 10 -global-timeout 180s -timeout 20s

2022/11/07 19:12:35 error writing to buffer io.Writer; write tcp 127.0.0.1:18883->127.0.0.1:61022: wsasend: An existing connection was forcibly closed by the remote host.
2022/11/07 19:20:38 error writing to buffer io.Writer; write tcp 127.0.0.1:18883->127.0.0.1:61125: wsasend: An existing connection was forcibly closed by the remote host.
2022/11/07 19:20:38 error writing to buffer io.Writer; write tcp 127.0.0.1:18883->127.0.0.1:61126: wsasend: An existing connection was forcibly closed by the remote host.
2022/11/07 19:20:38 error writing to buffer io.Writer; write tcp 127.0.0.1:18883->127.0.0.1:61128: wsasend: An existing connection was forcibly closed by the remote host.
2022/11/07 19:20:38 error writing to buffer io.Writer; write tcp 127.0.0.1:18883->127.0.0.1:61127: wsasend: An existing connection was forcibly closed by the remote host.
2022/11/07 19:20:38 error writing to buffer io.Writer; write tcp 127.0.0.1:18883->127.0.0.1:61130: wsasend: An existing connection was forcibly closed by the remote host.
panic: runtime error: invalid memory address or nil pointer dereference
[signal 0xc0000005 code=0x0 addr=0x0 pc=0x10ff227]

goroutine 150 [running]:
github.com/mochi-co/mqtt/server/internal/circ.(*Writer).Write(0x0, {0xc0048eaa00, 0x4e, 0xf?})
C:/Users/test/go/pkg/mod/github.com/mochi-co/[email protected]/server/internal/circ/writer.go:80 +0x27
github.com/mochi-co/mqtt/server/internal/clients.(*Client).WritePacket(, {{0x4c, 0x3, 0x0, 0x0, 0x0}, {0x0, 0x0, 0x0}, {0x0, ...}, ...})
C:/Users/test/go/pkg/mod/github.com/mochi-co/[email protected]/server/internal/clients/clients.go:500 +0x3fd
github.com/mochi-co/mqtt/server.(*Server).writeClient(
, , {{0x0, 0x3, 0x0, 0x0, 0x0}, {0x0, 0x0, 0x0}, ...})
C:/Users/test/go/pkg/mod/github.com/mochi-co/[email protected]/server/server.go:440 +0x5d
github.com/mochi-co/mqtt/server.(*Server).publishToSubscribers(
, {{0x4c, 0x3, 0x0, 0x0, 0x0}, {0x0, 0x0, 0x0}, {0x0, ...}, ...})
C:/Users/test/go/pkg/mod/github.com/mochi-co/[email protected]/server/server.go:690 +0x634
github.com/mochi-co/mqtt/server.(*Server).processPublish(_, , {{0x4c, 0x3, 0x0, 0x0, 0x0}, {0x0, 0x0, 0x0}, ...})
C:/Users/test/go/pkg/mod/github.com/mochi-co/[email protected]/server/server.go:613 +0x698
github.com/mochi-co/mqtt/server.(*Server).processPacket(
, _, {{0x4c, 0x3, 0x0, 0x0, 0x0}, {0x0, 0x0, 0x0}, ...})
C:/Users/test/go/pkg/mod/github.com/mochi-co/[email protected]/server/server.go:463 +0x13b
github.com/mochi-co/mqtt/server/internal/clients.(*Client).Read(0xc0009087e0, 0xc0001d1a58)
C:/Users/test/go/pkg/mod/github.com/mochi-co/[email protected]/server/internal/clients/clients.go:386 +0x19d
github.com/mochi-co/mqtt/server.(*Server).EstablishConnection(0xc0001162c0, {0x11fd4b8, 0x2}, {0x1285e78, 0xc0004c0380}, {0x1283ca8?, 0x147d0d8?})
C:/Users/test/go/pkg/mod/github.com/mochi-co/[email protected]/server/server.go:364 +0x1152
github.com/mochi-co/mqtt/server/listeners.(*TCP).Serve.func1()
C:/Users/test/go/pkg/mod/github.com/mochi-co/[email protected]/server/listeners/tcp.go:106 +0x3d
created by github.com/mochi-co/mqtt/server/listeners.(*TCP).Serve
C:/Users/test/go/pkg/mod/github.com/mochi-co/[email protected]/server/listeners/tcp.go:105 +0xea

Unable to publish messages with larger payload sizes

I was testing the 1.2.0 branch of this broker and discovered that it appears to fail silently when I try to publish messages with a larger (>100 KB) binary payload.

Any thoughts? Thanks in advance.

Docker image

As mentioned in the readme, it'd be great to have a Docker image for this

Provide a means of changing client username

I have come across a situation where it is necessary to change the username used to authenticate the client. I am implementing a system where a user logs in using either (1) a username and password or (2) a web token obtained from authentication done by the client itself. Since the username user and the web-token user are one and the same, I need to change the client username to a userId that is the same regardless of how the client authenticates.

I have so far managed to do it this way:

  1. Changing the auth interface
    Authenticate(user, password []byte) (interface{}, error)

  2. Obtaining new username and assigning to client during authentication


	// if !ac.Authenticate(pk.Username, pk.Password) {
	// 	if err := s.ackConnection(cl, packets.CodeConnectBadAuthValues, false); err != nil {
	// 		return s.onError(cl.Info(), fmt.Errorf("invalid connection send ack: %w", err))
	// 	}
	// 	return s.onError(cl.Info(), ErrConnectionFailed)
	// }
	username, err := ac.Authenticate(pk.Username, pk.Password)
	if err != nil {
		if err := s.ackConnection(cl, packets.CodeConnectBadAuthValues, false); err != nil {
			return s.onError(cl.Info(), fmt.Errorf("invalid connection send ack: %w", err))
		}
		return s.onError(cl.Info(), ErrConnectionFailed)
	}
	if username != nil {
		pk.Username = []byte(username.(string))
		cl.Identify(lid, pk, ac)
	}

Is this a feature you could consider?

Performance drop versus v1

Hi Mochi,

I love the new features of v2, but I see a significant performance drop.

Could you explain the reasons?

mqtt-stresser -broker tcp://192.168.11.52:1883 -num-clients=10 -num-messages=10000

Receiving Throughput (v1)

Median: 45949 msg/sec
Median: 43708 msg/sec
Median: 37352 msg/sec
Median: 45628 msg/sec

Receiving Throughput (v2)

Median: 23287 msg/sec
Median: 23744 msg/sec
Median: 27340 msg/sec
Median: 23394 msg/sec

Messages with retain only persist once

While testing #42 as a fix for #37, I found an unrelated issue with message retention and persistence. Using only mosquitto_pub/sub for testing, the first time you publish with retain it saves to the DB (server.go: retainMessage q=1).

The second time you publish to the same topic, retainMessage in server.go has q=0 and doesn't save. Therefore, it must not be saving what it originally read in. I'm not sure if this applies to other persistent data. Example:

package main

import (
	"bufio"
	"flag"
	"fmt"
	"log"
	"os"
	"time"

	mqtt "github.com/mochi-co/mqtt/server"
	"github.com/mochi-co/mqtt/server/listeners"
	"github.com/mochi-co/mqtt/server/listeners/auth"
	"github.com/mochi-co/mqtt/server/persistence/bolt"
	"go.etcd.io/bbolt"
)

func main() {

	runNumber := flag.Int("run", 1, "number of times we've run")
	flag.Parse()

	server := mqtt.New()
	tcp := listeners.NewTCP("t1", ":1883")
	server.AddListener(tcp, &listeners.Config{Auth: new(auth.Allow)})

	err := server.AddStore(bolt.New("mochi-test.db", &bbolt.Options{
		Timeout: 500 * time.Millisecond,
	}))
	if err != nil {
		log.Fatal(err)

	}

	server.Serve()
	fmt.Printf("Server is running\n\n")

	if *runNumber == 1 {
		fmt.Printf("Send a message to broker with retain flag:\n")
		fmt.Printf("\nmosquitto_pub -r -t \"test/persist\" -m \"I Persist\" \n")
		fmt.Printf("\nPress ENTER to exit, then re-run with -run=2 flag")
		bufio.NewReader(os.Stdin).ReadBytes('\n')
		os.Exit(0)
	} else if *runNumber == 2 {
		fmt.Printf("This is the second run and message has been loaded from DB\n")
		fmt.Printf("Test that you can retrieve the message by running\n")
		fmt.Printf("\nmosquitto_sub -C 1 -t \"test/persist\"\n")

		fmt.Printf("\nThen press ENTER after you've confirmed you received the message\n")
		bufio.NewReader(os.Stdin).ReadBytes('\n')

		// Second run, (s *Server) retainMessage will not save in DB
		fmt.Printf("\nPublish the same message again with:\n")
		fmt.Printf("\nmosquitto_pub -r -t \"test/persist\" -m \"I persist\" \n\n")

		fmt.Printf("When complete, hit ENTER to exit and re-run with: -run=3 flag\n")
		bufio.NewReader(os.Stdin).ReadBytes('\n')
		os.Exit(0)

	} else {
		fmt.Printf("This is our third run. You should be able to get the message\n")
		fmt.Printf("by subscribing, but it doesn't work\n")
		fmt.Printf("\nmosquitto_sub -C 1 -t \"test/persist\"\n\n")

		fmt.Printf("Press ENTER to exit. If you re-run this test, remove mochi-test.db first\n")
		bufio.NewReader(os.Stdin).ReadBytes('\n')
		os.Exit(1)

	}
}

To test:

go run main.go 
mosquitto_pub -r -t "test/persist" -m "I persist"

go run main.go -run=2
mosquitto_sub -C 1 -t "test/persist"
# then
mosquitto_pub -r -t "test/persist" -m "I persist"

go run main.go -run=3
mosquitto_sub -C 1 -t "test/persist"
# Message was not saved in DB

Some tests fail due to test timing issues when using -count=100 / 1000

As per @rkennedy's comments in #24, various tests fail intermittently, typically when running go test ./... -count=100. Backtesting indicates these failures occur at least as early as v1.0.5. The failures can occur as rarely as 1 in every 500 runs. This is likely to be a problem with the tests rather than with the broker code.

Determine the precise cause for these intermittent failures and correct it.

After running go test ./... -count=100 several times, the known failures are as follows:

writing err io: read/write on closed pipe
--- FAIL: TestServerResendClientInflightBackoff (0.01s)
    server_test.go:2096: 
                Error Trace:    server_test.go:2096
                Error:          Not equal: 
                                expected: []byte{0x3a, 0xe, 0x0, 0x5, 0x61, 0x2f, 0x62, 0x2f, 0x63, 0x0, 0xb, 0x68, 0x65, 0x6c, 0x6c, 0x6f}
                                actual  : []byte{}
                            
                                Diff:
                                --- Expected
                                +++ Actual
                                @@ -1,3 +1,2 @@
                                -([]uint8) (len=16) {
                                - 00000000  3a 0e 00 05 61 2f 62 2f  63 00 0b 68 65 6c 6c 6f  |:...a/b/c..hello|
                                +([]uint8) {
                                 }
                Test:           TestServerResendClientInflightBackoff
writing err io: read/write on closed pipe
--- FAIL: TestServerResendClientInflightBackoff (0.00s)
    server_test.go:2105: 
                Error Trace:    server_test.go:2105
                Error:          Not equal: 
                                expected: 1
                                actual  : 2
                Test:           TestServerResendClientInflightBackoff
2022/01/30 10:16:33 error writing to buffer io.Writer; io: read/write on closed pipe
--- FAIL: TestServerWriteClient (0.01s)
    server_test.go:671: 
                Error Trace:    server_test.go:671
                Error:          Not equal: 
                                expected: []byte{0x70, 0x2, 0x0, 0xe}
                                actual  : []byte{}
                            
                                Diff:
                                --- Expected
                                +++ Actual
                                @@ -1,3 +1,2 @@
                                -([]uint8) (len=4) {
                                - 00000000  70 02 00 0e                                       |p...|
                                +([]uint8) {
                                 }
                Test:           TestServerWriteClient
2022/01/30 10:19:33 error writing to buffer io.Writer; io: read/write on closed pipe
--- FAIL: TestServerCloseClientLWT (0.01s)
    server_test.go:1756: 
                Error Trace:    server_test.go:1756
                Error:          Not equal: 
                                expected: []byte{0x30, 0xc, 0x0, 0x5, 0x61, 0x2f, 0x62, 0x2f, 0x63, 0x68, 0x65, 0x6c, 0x6c, 0x6f}
                                actual  : []byte{}
                            
                                Diff:
                                --- Expected
                                +++ Actual
                                @@ -1,3 +1,2 @@
                                -([]uint8) (len=14) {
                                - 00000000  30 0c 00 05 61 2f 62 2f  63 68 65 6c 6c 6f        |0...a/b/chello|
                                +([]uint8) {
                                 }
                Test:           TestServerCloseClientLWT

Please add any others to this issue if you see them.

Show the reason for client disconnects

While debugging an application, I was at some point confounded by what looked, to both the client and the server, like the other side had closed the connection. On further investigation, there were two identical clients, one of them a zombie process, both trying to connect; each time one connected, the other was disconnected.

The errors printed in OnDisconnect do not include the remote address field, which would have revealed the presence of duplicate client IDs; they also happened to look like ordinary client-initiated disconnects, so the server logs would not reveal that a session takeover was occurring.

Auditing the code, I learned that an explicit Disconnect packet can trigger disconnection, but so can EOF or a variety of errors in several code paths. The existing Client is well synchronized for this, but the original cause of stopping the connection was being lost. This change records the first cause of stopping the connection. In EstablishConnection(), the root cause is the one passed to OnDisconnect, so the server logs reveal the root of the problem. (Since this is MQTT 3.x, there is no way for the Disconnect packet to tell the client, so the client would have to place an explanation in its own logs.)

In addition, there is currently no way for a user to configure visibility into other errors, which might be less obviously the cause of (or associated with) a disconnect, or which might simply provide warning flags. I propose adding an OnError handler to support logging all errors.

Related to #21, since if there are duplicate clients it will be nice to say where they're coming from.
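
A sketch of the proposal from the embedding application's point of view; the OnDisconnect signature matches the existing events API, while OnError is the proposed addition (its signature here is an assumption, mirroring OnDisconnect):

// Log the root cause passed to OnDisconnect, including the session
// takeover case described above.
server.Events.OnDisconnect = func(cl events.Client, err error) {
	log.Printf("client %s disconnected: %v", cl.ID, err)
}

// Proposed: surface errors that are not (obviously) tied to a disconnect.
server.Events.OnError = func(cl events.Client, err error) {
	log.Printf("client %s error: %v", cl.ID, err)
}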

panic: runtime error: invalid memory address or nil pointer dereference

panic: runtime error: invalid memory address or nil pointer dereference
[signal SIGSEGV: segmentation violation code=0x1 addr=0x0 pc=0x1287e94]

goroutine 52708 [running]:
github.com/mochi-co/mqtt/server/internal/clients.(*Client).WritePacket(0xc000e63440, 0x0, 0x103, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, ...)
/app/vendor/github.com/mochi-co/mqtt/server/internal/clients/clients.go:446 +0x94
github.com/mochi-co/mqtt/server.(*Server).writeClient(0xc000652460, 0xc000e63440, 0x0, 0x103, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, ...)
/app/vendor/github.com/mochi-co/mqtt/server/server.go:413 +0x78
github.com/mochi-co/mqtt/server.(*Server).publishToSubscribers(0xc000652460, 0x5c, 0x10103, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, ...)
/app/vendor/github.com/mochi-co/mqtt/server/server.go:662 +0x45e
github.com/mochi-co/mqtt/server.(*Server).processPublish(0xc000652460, 0xc000b07560, 0x5c, 0x10103, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, ...)
/app/vendor/github.com/mochi-co/mqtt/server/server.go:586 +0x258
github.com/mochi-co/mqtt/server.(*Server).processPacket(0xc000652460, 0xc000b07560, 0x5c, 0x10103, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, ...)
/app/vendor/github.com/mochi-co/mqtt/server/server.go:436 +0x118
github.com/mochi-co/mqtt/server/internal/clients.(*Client).Read(0xc000b07560, 0xc002657848, 0xc001c96970, 0xf)
/app/vendor/github.com/mochi-co/mqtt/server/internal/clients/clients.go:375 +0x1f5
github.com/mochi-co/mqtt/server.(*Server).EstablishConnection(0xc000652460, 0x177de10, 0x2, 0x19857e0, 0xc000645038, 0x1965aa8, 0xc000370550, 0x0, 0x0)
/app/vendor/github.com/mochi-co/mqtt/server/server.go:342 +0x1071
github.com/mochi-co/mqtt/server/listeners.(*TCP).Serve.func1(0xc000370150, 0xc000492000, 0x19857e0, 0xc000645038)
/app/vendor/github.com/mochi-co/mqtt/server/listeners/tcp.go:106 +0x66
created by github.com/mochi-co/mqtt/server/listeners.(*TCP).Serve
/app/vendor/github.com/mochi-co/mqtt/server/listeners/tcp.go:105 +0x95

Publish with retain doesn't retain

Publish works fine except for honoring the retain flag. Broker works fine with external publishing with retain.
Using master branch: 460f0ef

package main

import (
	"fmt"
	"os"
	"os/signal"
	"syscall"
	"time"

	mqtt "github.com/mochi-co/mqtt/server"
	"github.com/mochi-co/mqtt/server/listeners"
	"github.com/mochi-co/mqtt/server/listeners/auth"
)

func main() {
	sigs := make(chan os.Signal, 1)
	done := make(chan bool, 1)
	signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
	go func() {
		<-sigs
		done <- true
	}()

	server := mqtt.New()
	tcp := listeners.NewTCP("t1", ":1883")
	server.AddListener(tcp, &listeners.Config{Auth: new(auth.Allow)})

	// server.Serve doesn't block, so no need for goroutine, but for some
	// reason, we have to delay a LONG time for Serve's goroutines to start-up
	// > 1 second on a 5900X !!
	server.Serve()
	fmt.Println("\nServer is running")
	fmt.Println("Sleeping for 2 seconds")
	time.Sleep(2 * time.Second)

	// Publish with retain - doesn't work
	server.Publish("direct/publish", []byte("Published from broker"), true)
	fmt.Println("\nMessage published to \"direct/publish\" with retain")

	fmt.Println("\nTry running: mosquitto_sub -v -t \"direct/publish\"")

	<-done
}
go run main.go

# In another terminal window

# The following shows nothing
mosquitto_sub -v -t "direct/publish"

# But this works:
mosquitto_pub -t "mosquitto/publish" -m "from mosquitto_pub" -r
mosquitto_sub -v -t "mosquitto/publish"

Am I missing something?

Do not disconnect existing session for new connection auth failures

In Server.EstablishConnection() there is an authorization test, and even if the test fails, the code first checks whether a session is present and disconnects the other, valid session. This could lead to an invalid connection disrupting a valid one.

It seems unintentional. Presumably MQTT does not require a correct sessionPresent response when authorization has failed.

Add to Authenticate more options

Hi.

Please add more options to the auth Authenticate interface.

I think the Authenticate interface could be changed to the following:

type AuthRequest struct {
	Remote   string
	ClientID string
	User     []byte
	Password []byte
}

type Controller interface {
	Authenticate(req AuthRequest) bool
...
}
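
A sketch of how an embedding application might implement the proposed interface (hypothetical, since the interface change has not been made):

// exampleAuth is an example Controller for the proposed interface,
// making use of the extra request fields.
type exampleAuth struct{}

func (a *exampleAuth) Authenticate(req AuthRequest) bool {
	// Reject a known-bad client ID regardless of credentials.
	if req.ClientID == "banned-client" {
		return false
	}
	// Otherwise fall back to a username/password check; req.Remote could
	// additionally be checked against an allow-list.
	return string(req.User) == "admin" && string(req.Password) == "secret"
}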

Migrate to Github Actions

Currently we're using Travis for build checks. It would be simpler if this was done using GitHub Actions.

How to create an anonymous listener for testing

For testing, I want to use an anonymous server port, as in httptest:

tcp := listeners.NewTCP("t1", "")

Doing so works, but I have no way to retrieve the assigned port, as the TCP listener's underlying net.Listener is private.

Alternatively, it would be nice if the listener could be created on the consumer side and passed into the server.
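
For comparison, this is the httptest-style pattern being asked for: bind to port 0, read the assigned address back from the listener, then hand the listener to the server. The last step is what the broker does not currently allow.

package main

import (
	"fmt"
	"log"
	"net"
)

func main() {
	// Ask the OS for an ephemeral port.
	ln, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		log.Fatal(err)
	}
	defer ln.Close()

	// The assigned port is available from the listener itself; a broker
	// API accepting a net.Listener (or exposing the address) would make
	// this usable in tests.
	fmt.Println("listening on port", ln.Addr().(*net.TCPAddr).Port)
}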

Optimise struct fields for better memory alignment

Following #17, we found that various struct fields were not 64-bit (8-byte) aligned, which resulted in panics on 32-bit builds. Fixing this has highlighted that there are other structs within the code which could be optimised for better memory alignment.

These should be refactored so that each struct uses the optimal amount of memory for better caching, and so that atomically accessed fields are 64-bit aligned for improved 32-bit compatibility.
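
A sketch of the field-ordering rule, assuming the usual sync/atomic constraint: on 32-bit platforms, 64-bit fields accessed atomically must be 64-bit aligned (placing them first in the struct guarantees this), and grouping the remaining fields from largest to smallest also removes padding.

package main

import "sync/atomic"

// counters keeps its atomically accessed int64 fields first so they are
// 64-bit aligned even on 32-bit ARM, where a misaligned atomic load
// panics (as seen in the Raspberry Pi SIGSEGV report above).
type counters struct {
	sent     int64 // atomic
	received int64 // atomic
	started  int64 // atomic
	// Smaller fields follow, largest to smallest, to minimise padding.
	port int32
	open bool
}

func main() {
	var c counters
	atomic.AddInt64(&c.sent, 1)
	_ = atomic.LoadInt64(&c.sent)
}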

Topic prefix based on host name?

Is there some way I could configure a root topic based on the host name that devices use when connecting to the broker?

What I mean is: my server is available on a wildcard domain, *.myserver.com. Clients can connect with e.g. foo.myserver.com or bar.myserver.com and publish/subscribe to topics. So is there some way that, if a client connects to foo.myserver.com and publishes to the topic 'someTopic', the server would make the actual topic "foo/someTopic"?
