mochi-mqtt / server
The fully compliant, embeddable high-performance Go MQTT v5 server for IoT, smarthome, and pubsub
License: MIT License
My implementation code:
server.Events.OnConnect = func(cl events.Client, pk events.Packet) {
	fmt.Printf("<< OnConnect client connected clientid=%s\n", cl.ID)
}
server.Events.OnDisconnect = func(cl events.Client, err error) {
	fmt.Printf("<< OnDisconnect client disconnected clientid=%s, err=%v\n", cl.ID, err)
}
When client 1 connects to the server, it prints:
<< OnConnect client connected clientid=mqttx_f988b2da
When client 2 connects to the server using the same client ID as client 1, it prints:
<< OnConnect client connected clientid=mqttx_f988b2da
<< OnDisconnect client disconnected clientid=mqttx_f988b2da err=client session re-established
I think that when client 2 connects to the server using the same client ID as client 1, it should instead print:
<< OnDisconnect client disconnected clientid=mqttx_f988b2da err=client session re-established
<< OnConnect client connected clientid=mqttx_f988b2da
That is, OnDisconnect should be triggered before OnConnect.
Folks, we have a simple unit test with a cross-stack client written in C (one of the most prominent in the ICT industry). Please help check the logs and attachment below.
Ref:
https://github.com/emqx/nanomq
nanomq client built from source
D:\Demos\mqtt\examples\events>go run main.go
Mochi MQTT Server initializing... TCP
Started!
<< OnConnect client connected nanomq-d28f8d41: {FixedHeader:{Remaining:27 Type:1 Qos:0 Dup:false Retain:false} AllowClients:[] Topics:[] ReturnCodes:[] ProtocolName:[77 81 84 84] Qoss:[] Payload:[] Username:[] Password:[] WillMessage:[] ClientIdentifier:nanomq-d28f8d41 TopicName: WillTopic: PacketID:0 Keepalive:60 ReturnCode:0 ProtocolVersion:4 WillQos:0 ReservedBit:0 CleanSession:true WillFlag:false WillRetain:false UsernameFlag:false PasswordFlag:false SessionPresent:false}
<< OnDisconnect client disconnected nanomq-d28f8d41: missing packet id
D:\Demos\gomqtt\nanomq\build\nanomq>nanomq.exe sub start -t "direct/publish"
connect_cb: connected!
disconnected
D:\Demos\mqtt\examples\events>go env
set GO111MODULE=
set GOARCH=amd64
set GOBIN=D:\Progra~1\Go\bin
set GOCACHE=C:\Users\phineas\AppData\Local\go-build
set GOENV=D:\Roaming\go\env
set GOEXE=.exe
set GOEXPERIMENT=
set GOFLAGS=
set GOHOSTARCH=amd64
set GOHOSTOS=windows
set GOINSECURE=
set GOMODCACHE=D:\Progra~1\Go\goget\pkg\mod
set GONOPROXY=
set GONOSUMDB=
set GOOS=windows
set GOPATH=D:\Progra~1\Go\goget
set GOPRIVATE=
set GOPROXY=https://proxy.golang.org,direct
set GOROOT=D:\Progra~1\Go
set GOSUMDB=sum.golang.org
set GOTMPDIR=
set GOTOOLDIR=D:\Progra~1\Go\pkg\tool\windows_amd64
set GOVCS=
set GOVERSION=go1.18
set GCCGO=gccgo
set GOAMD64=v1
set AR=ar
set CC=gcc
set CXX=g++
set CGO_ENABLED=1
set GOMOD=D:\Demos\gomqtt\MQTT.js\mqtt\go.mod
set GOWORK=
set CGO_CFLAGS=-g -O2
set CGO_CPPFLAGS=
set CGO_CXXFLAGS=-g -O2
set CGO_FFLAGS=-g -O2
set CGO_LDFLAGS=-g -O2
set PKG_CONFIG=pkg-config
set GOGCCFLAGS=-m64 -mthreads -fmessage-length=0 -fdebug-prefix-map=C:\Users\phineas\AppData\Local\Temp\go-build1490508260=/tmp/go-build -gno-record-gcc-switches
I tried to use OnMessage to change the Retain flag, but it seems the flag is evaluated before the OnMessage callback is triggered. Would it be possible to change that order, to allow changing the Retain flag?
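For illustration, this is roughly the usage I have in mind; a minimal sketch assuming the v1 events API, where OnMessage may return a modified copy of the packet:

// Sketch only: the intent is to set Retain here, but as described above the
// flag appears to be evaluated before this callback runs.
server.Events.OnMessage = func(cl events.Client, pk events.Packet) (events.Packet, error) {
	pkx := pk
	pkx.FixedHeader.Retain = true // currently has no effect on retention
	return pkx, nil
}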
Earlier today, @muXxer identified an issue in which the broker would crash with fatal errors, caused when the primary server routine and clients attempted to read and write from the inflight messages map simultaneously.
This is a new issue as of 1.3.0, and resulted from the changes made to inflight handling - specifically, allowing the server event loop to scan and resend inflights (where previously, inflights would only be engaged during connection establishment or within the client itself).
The collision occurred between three locations in the code.
The problem was reproducible by placing the server under heavy load with inovex/mqtt-stresser, using qos=2 for both the publisher and subscriber: mqtt-stresser -broker tcp://localhost:1883 -num-clients=200 -num-messages=100 -subscriber-qos=2 -publisher-qos=2
A patch is immediately forthcoming and will be released as v1.3.2.
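A rough illustration of the kind of guard required follows; this is a sketch only, not the actual patch, and the type and method names here are assumptions:

package clients

import "sync"

// InflightMessage stands in for the stored in-flight packet (fields trimmed).
type InflightMessage struct {
	Sent    int64
	Resends int
}

// Inflight guards the in-flight message map so the server's resend scan and
// the client read/write paths cannot access it concurrently.
type Inflight struct {
	sync.RWMutex
	internal map[uint16]InflightMessage
}

// Set stores an in-flight message under the write lock.
func (i *Inflight) Set(key uint16, in InflightMessage) {
	i.Lock()
	defer i.Unlock()
	i.internal[key] = in
}

// GetAll returns a snapshot copy so callers can iterate without holding the lock.
func (i *Inflight) GetAll() map[uint16]InflightMessage {
	i.RLock()
	defer i.RUnlock()
	out := make(map[uint16]InflightMessage, len(i.internal))
	for k, v := range i.internal {
		out[k] = v
	}
	return out
}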
We are using the MQTT broker and publishing messages directly to all clients using the broker's Publish() func. This func adds a new publish packet to the inlineMessages.pub buffered channel (size 1024), and the inlineClient() loop will publish those packets to all subscribed clients. For each subscribed client this will call client.WritePacket(), which in the end will call Write() on the client's writer.
If a single subscribed client is too slow, the client's write buffer will fill up and the whole inlineClient() loop will hang until this client's buffer has space again (see awaitEmpty inside Write()). Shortly after, the inlineMessages.pub buffered channel will fill up and further calls to Publish() will hang.
This means a single slow client (even one using QoS 0 with no guarantees of receiving packets) can make the whole broker wait indefinitely and not deliver any more packets to any client.
A possible workaround would be, instead of waiting for the buffer to be freed, to return a "client buffer full" error and skip sending the packet to this client. If the client is using QoS 1/2, the inflight message retry mechanism should re-deliver the message.
What do you think? I can write a PR with these changes. Or do you have a better solution to this problem?
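A minimal, generic sketch of the "skip instead of block" idea, assuming one buffered channel per subscriber; this is purely illustrative and does not mirror the library's internal structure:

package main

import (
	"errors"
	"fmt"
)

var errClientBufferFull = errors.New("client buffer full")

type packet struct {
	topic   string
	payload []byte
}

// trySend enqueues the packet if there is room, and returns an error instead
// of blocking the inline publish loop when the subscriber's buffer is full.
// For QoS 0 the packet is simply dropped; for QoS 1/2 the inflight retry
// mechanism would be expected to re-deliver it.
func trySend(queue chan packet, pk packet) error {
	select {
	case queue <- pk:
		return nil
	default:
		return errClientBufferFull
	}
}

func main() {
	q := make(chan packet, 1)
	fmt.Println(trySend(q, packet{topic: "a/b", payload: []byte("x")})) // nil
	fmt.Println(trySend(q, packet{topic: "a/b", payload: []byte("y")})) // client buffer full
}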
Hi,
I just discovered this software and i liked it. Thanks.
One question:
I want to generate new messages to different topics based on the original message, but I do not want to publish the original.
Is it possible to delete (not publish) a message?
Hi @mochi-co
I think it would be interesting to have onConnect and onDisconnect event hooks, at least to log connections.
Igor
go vet reports that lock values are being copied in clients.Inflight and circ.Buffer. Additionally, the signature for the buffer WriteTo is incorrect.
# github.com/mochi-co/mqtt/server
server/server.go:224:18: assignment copies lock value to cl.Inflight: github.com/mochi-co/mqtt/server/internal/clients.Inflight
# github.com/mochi-co/mqtt/server/internal/circ
server/internal/circ/buffer.go:77:9: return copies lock value: github.com/mochi-co/mqtt/server/internal/circ.Buffer contains sync.RWMutex
server/internal/circ/reader.go:18:3: literal copies lock value from b: github.com/mochi-co/mqtt/server/internal/circ.Buffer contains sync.RWMutex
server/internal/circ/reader.go:28:3: literal copies lock value from b: github.com/mochi-co/mqtt/server/internal/circ.Buffer contains sync.RWMutex
server/internal/circ/writer.go:19:3: literal copies lock value from b: github.com/mochi-co/mqtt/server/internal/circ.Buffer contains sync.RWMutex
server/internal/circ/writer.go:29:3: literal copies lock value from b: github.com/mochi-co/mqtt/server/internal/circ.Buffer contains sync.RWMutex
server/internal/circ/pool_test.go:11:20: call of require.NotNil copies lock value: sync.Pool contains sync.noCopy
server/internal/circ/writer.go:34:18: method WriteTo(w io.Writer) (total int, err error) should have signature WriteTo(io.Writer) (int64, error)
The above issues should be corrected.
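For the WriteTo warning, a minimal sketch of the conforming signature (the buffer internals are elided; only the signature and the int64 return matter here):

package circ

import "io"

// Buffer stands in for the package's circular buffer (fields omitted).
type Buffer struct{}

// WriteTo must return int64 to satisfy io.WriterTo and clear the vet warning.
func (b *Buffer) WriteTo(w io.Writer) (total int64, err error) {
	// ...copy buffered bytes into w, accumulating the count into total...
	return total, err
}

// Compile-time check that *Buffer satisfies io.WriterTo.
var _ io.WriterTo = (*Buffer)(nil)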
Is the func (s *Server) Publish(topic string, payload []byte, retain bool) error method not available in the v2 version?
Folks, we love your work. Please add support for the MQTT 5 protocol.
Cheers,
Looks like versioning in v2.0.0 is broken.
% go get -v github.com/mochi-co/mqtt@latest
go: added github.com/mochi-co/mqtt v1.3.2
% go get -v github.com/mochi-co/mqtt@v2.0.0
go: github.com/mochi-co/mqtt@v2.0.0: invalid version: module contains a go.mod file, so module path must match major version ("github.com/mochi-co/mqtt/v2")
I started observing an increased number of inflight messages which almost never go down. At the same time, I didn't notice any disconnects from clients, so it looks like the clients just didn't receive the message and will never send an ACK (a network issue on the client side?).
I see that there is ResendClientInflight (https://github.com/mochi-co/mqtt/blob/master/server/server.go#L878), but it looks like it is executed only on a new client connection (to send messages from the persistence store to reconnecting clients?).
Is there any internal mechanism which will try to deliver inflight messages, or should the mochi-co/mqtt user implement this on their own?
The events Client struct should expose the client's username to the event hook receiver functions.
Originally posted by youmisun June 11, 2022
The client information in OnDisconnect is too limited; I hope it can include the username information. The listener information, on the other hand, is not useful to me.
Congrats on the v2 release!
The go doc could look better, with the following improvements: separate the leading comment from the package line to prevent it showing in the doc (see https://github.com/fluhus/godoc-tricks/blob/master/doc.go#L1-L5 for an example).
When embedding this server, can the application doing the embedding publish a message to a topic directly? If not, can that be added?
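For reference, a minimal sketch of direct publishing, assuming the v1 Publish(topic, payload, retain) func shown in other issues on this page and a server instance set up as in the TCP examples:

// Publish directly from the embedding application to a topic.
err := server.Publish("embedded/status", []byte("hello from the host app"), false)
if err != nil {
	log.Println("direct publish failed:", err)
}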
I was looking for a cluster-ready MQTT broker for my home environment (k8s) for some time already, but unfortunately all the ones I checked (emqx, vernemq) have some issues (e.g. not compatible with Paho, or broken in a home environment).
So, based on mochi-co/mqtt, I built my own broker; I was heavily inspired by #45.
Maybe other people looking for clustering capabilities will be interested, or even better, some pieces can be used inside mochi-co/mqtt!
If you think that this is not a place for this kind of communication, please close :)
Hi,
First of all, I want to thank you for this great module!
We have embedded this module into our application, and we noticed that the number of messages received was lower than expected. After eliminating all other possibilities, we started investigating deeper into the library and noticed that the BufferSize setting is not only the default size but also the absolute limit (which wasn't clear from the documentation) for each connection (one buffer for read and one for write). When a single MQTT message is greater than the BufferSize, it is blocked until the connection timeout, and this also does not trigger the OnMessage/OnProcessMessage events.
We expect to handle a very large number of connections; the vast majority of messages are only a couple of KB (but a very small fraction can be many hundred KB or larger), and the message frequency is once a minute. Given that the module allocates 2x the BufferSize (read and write) for each connection, we set a relatively small BufferSize. However, to handle a few corner-case messages, we would need to dramatically increase the BufferSize, which would substantially increase the total memory requirement.
There are a few possible improvements:
I noticed this issue when I switched from my PC to my Raspberry Pi; trying to narrow down the cause, it seems like I get the same SIGSEGV error when running the TCP example.
Running on a Raspberry Pi 3 Model B Plus Rev 1.3, ARMv7 Processor rev 4 (v7l). I've confirmed that there are no issues binding to the port.
root@housekeeper:/home/ogelami/mochi-mqtt-issue# go run main.go
Mochi MQTT Server initializing... TCP
Started!
panic: runtime error: invalid memory address or nil pointer dereference
[signal SIGSEGV: segmentation violation code=0x1 addr=0x0 pc=0x1238c]
goroutine 22 [running]:
runtime/internal/atomic.goLoad64(0x1cae03c, 0x0, 0x0)
/usr/lib/go-1.15/src/runtime/internal/atomic/atomic_arm.go:131 +0x1c
github.com/mochi-co/mqtt/server/listeners.(*TCP).Serve(0x1cae000, 0x1c9a028)
/root/go/pkg/mod/github.com/mochi-co/[email protected]/server/listeners/tcp.go:89 +0x28
github.com/mochi-co/mqtt/server/listeners.(*Listeners).Serve.func1(0x1cac000, 0x29db60, 0x1cae000, 0x1c9a028)
/root/go/pkg/mod/github.com/mochi-co/[email protected]/server/listeners/listeners.go:94 +0x70
created by github.com/mochi-co/mqtt/server/listeners.(*Listeners).Serve
/root/go/pkg/mod/github.com/mochi-co/[email protected]/server/listeners/listeners.go:91 +0x9c
exit status 2
When running ./mqtt-stresser -broker tcp://localhost:1883 -num-clients=40 -num-messages=10000, the broker appears to occasionally stall, although it does not freeze, and it continues to process other messages as expected.
Testing against the latest https://github.com/fhmq/hmq we find only marginal performance difference.
This raises questions about whether there is a benefit to continue using circular buffers (which are difficult to maintain and control) now that the performance of channels has been improved significantly, or if the circular buffers mechanism should be replaced with a worker pool. This would also alleviate issues discussed in #95 and could potentially reduce the overall number of goroutines as mentioned in #80.
Discussion invited :)
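A minimal, generic sketch of the worker-pool direction, purely to illustrate the idea under discussion (it does not reflect the broker's internals):

package main

import (
	"fmt"
	"sync"
)

type job struct {
	clientID string
	payload  []byte
}

// startPool launches n workers draining a shared queue; each worker would
// perform the client write that a per-client circular buffer handles today.
func startPool(n int, jobs <-chan job) *sync.WaitGroup {
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for j := range jobs {
				fmt.Printf("deliver %d bytes to %s\n", len(j.payload), j.clientID)
			}
		}()
	}
	return &wg
}

func main() {
	jobs := make(chan job, 1024)
	wg := startPool(4, jobs)
	jobs <- job{clientID: "c1", payload: []byte("hello")}
	close(jobs)
	wg.Wait()
}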
Hi, I'm looking for an embeddable MQTT broker and came across this project, and it looks pretty good, but from a short look around I couldn't see that it supports using certificates to authorize MQTT clients. Is that the case, and if so, would it be possible to add that?
It needs a way to set RequireAndVerifyClientCert as the tls.ClientAuthType on the TLS config being used by the server, and then some pluggable way to manage the pool of client CA certificates.
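A minimal sketch of the TLS configuration this would require, using only the standard crypto/tls and crypto/x509 packages (file names are placeholders; how this would be wired into the listener config is the open question):

package main

import (
	"crypto/tls"
	"crypto/x509"
	"log"
	"os"
)

// clientCertTLSConfig builds a TLS config that rejects clients without a
// certificate signed by one of the trusted client CAs.
func clientCertTLSConfig(serverCert tls.Certificate, caFile string) *tls.Config {
	caPEM, err := os.ReadFile(caFile)
	if err != nil {
		log.Fatal(err)
	}
	pool := x509.NewCertPool()
	if !pool.AppendCertsFromPEM(caPEM) {
		log.Fatal("failed to parse client CA certificates")
	}
	return &tls.Config{
		Certificates: []tls.Certificate{serverCert},
		ClientAuth:   tls.RequireAndVerifyClientCert,
		ClientCAs:    pool,
	}
}

func main() {
	cert, err := tls.LoadX509KeyPair("server.crt", "server.key")
	if err != nil {
		log.Fatal(err)
	}
	_ = clientCertTLSConfig(cert, "client-ca.crt")
}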
mqtt-stresser.exe -broker tcps://localhost:18883 -skip-tls-verification -num-clients 10 -num-messages 80000 -rampup-delay 1s -rampup-size 10 -global-timeout 180s -timeout 20s
2022/11/07 19:12:35 error writing to buffer io.Writer; write tcp 127.0.0.1:18883->127.0.0.1:61022: wsasend: An existing connection was forcibly closed by the remote host.
2022/11/07 19:20:38 error writing to buffer io.Writer; write tcp 127.0.0.1:18883->127.0.0.1:61125: wsasend: An existing connection was forcibly closed by the remote host.
2022/11/07 19:20:38 error writing to buffer io.Writer; write tcp 127.0.0.1:18883->127.0.0.1:61126: wsasend: An existing connection was forcibly closed by the remote host.
2022/11/07 19:20:38 error writing to buffer io.Writer; write tcp 127.0.0.1:18883->127.0.0.1:61128: wsasend: An existing connection was forcibly closed by the remote host.
2022/11/07 19:20:38 error writing to buffer io.Writer; write tcp 127.0.0.1:18883->127.0.0.1:61127: wsasend: An existing connection was forcibly closed by the remote host.
2022/11/07 19:20:38 error writing to buffer io.Writer; write tcp 127.0.0.1:18883->127.0.0.1:61130: wsasend: An existing connection was forcibly closed by the remote host.
panic: runtime error: invalid memory address or nil pointer dereference
[signal 0xc0000005 code=0x0 addr=0x0 pc=0x10ff227]
goroutine 150 [running]:
github.com/mochi-co/mqtt/server/internal/circ.(*Writer).Write(0x0, {0xc0048eaa00, 0x4e, 0xf?})
C:/Users/test/go/pkg/mod/github.com/mochi-co/[email protected]/server/internal/circ/writer.go:80 +0x27
github.com/mochi-co/mqtt/server/internal/clients.(*Client).WritePacket(, {{0x4c, 0x3, 0x0, 0x0, 0x0}, {0x0, 0x0, 0x0}, {0x0, ...}, ...})
C:/Users/test/go/pkg/mod/github.com/mochi-co/[email protected]/server/internal/clients/clients.go:500 +0x3fd
github.com/mochi-co/mqtt/server.(*Server).writeClient(, , {{0x0, 0x3, 0x0, 0x0, 0x0}, {0x0, 0x0, 0x0}, ...})
C:/Users/test/go/pkg/mod/github.com/mochi-co/[email protected]/server/server.go:440 +0x5d
github.com/mochi-co/mqtt/server.(*Server).publishToSubscribers(, {{0x4c, 0x3, 0x0, 0x0, 0x0}, {0x0, 0x0, 0x0}, {0x0, ...}, ...})
C:/Users/test/go/pkg/mod/github.com/mochi-co/[email protected]/server/server.go:690 +0x634
github.com/mochi-co/mqtt/server.(*Server).processPublish(_, , {{0x4c, 0x3, 0x0, 0x0, 0x0}, {0x0, 0x0, 0x0}, ...})
C:/Users/test/go/pkg/mod/github.com/mochi-co/[email protected]/server/server.go:613 +0x698
github.com/mochi-co/mqtt/server.(*Server).processPacket(, _, {{0x4c, 0x3, 0x0, 0x0, 0x0}, {0x0, 0x0, 0x0}, ...})
C:/Users/test/go/pkg/mod/github.com/mochi-co/[email protected]/server/server.go:463 +0x13b
github.com/mochi-co/mqtt/server/internal/clients.(*Client).Read(0xc0009087e0, 0xc0001d1a58)
C:/Users/test/go/pkg/mod/github.com/mochi-co/[email protected]/server/internal/clients/clients.go:386 +0x19d
github.com/mochi-co/mqtt/server.(*Server).EstablishConnection(0xc0001162c0, {0x11fd4b8, 0x2}, {0x1285e78, 0xc0004c0380}, {0x1283ca8?, 0x147d0d8?})
C:/Users/test/go/pkg/mod/github.com/mochi-co/[email protected]/server/server.go:364 +0x1152
github.com/mochi-co/mqtt/server/listeners.(*TCP).Serve.func1()
C:/Users/test/go/pkg/mod/github.com/mochi-co/[email protected]/server/listeners/tcp.go:106 +0x3d
created by github.com/mochi-co/mqtt/server/listeners.(*TCP).Serve
C:/Users/test/go/pkg/mod/github.com/mochi-co/[email protected]/server/listeners/tcp.go:105 +0xea
Ongoing discussion about current and future releases can be found on the discussions page:
I was testing out the 1.2.0 branch of this broker, and discovered that it appears to fail silently when I try to publish messages with a larger-sized (>100k) binary payload.
Any thoughts? Thanks in advance.
As mentioned in the readme, it'd be great to have a Docker image for this
I have come across a situation where it is necessary to change the username used to authenticate the client. I am implementing a system where a user logs in using either (1) a username and password, or (2) a web token obtained from authentication done by the client itself. Since the username user and the web-token user are one and the same, I need to change the client username to a userId which is the same regardless of how the client authenticates.
I have so far managed to do it this way:
Changing the auth interface:
Authenticate(user, password []byte) (interface{}, error)
Obtaining the new username and assigning it to the client during authentication:
// if !ac.Authenticate(pk.Username, pk.Password) {
// 	if err := s.ackConnection(cl, packets.CodeConnectBadAuthValues, false); err != nil {
// 		return s.onError(cl.Info(), fmt.Errorf("invalid connection send ack: %w", err))
// 	}
// 	return s.onError(cl.Info(), ErrConnectionFailed)
// }
username, err := ac.Authenticate(pk.Username, pk.Password)
if err != nil {
	if err := s.ackConnection(cl, packets.CodeConnectBadAuthValues, false); err != nil {
		return s.onError(cl.Info(), fmt.Errorf("invalid connection send ack: %w", err))
	}
	return s.onError(cl.Info(), ErrConnectionFailed)
}
if username != nil {
	pk.Username = []byte(username.(string))
	cl.Identify(lid, pk, ac)
}
Is this a feature you could consider?
In the OnConnect event it would be interesting to be able to read the client's net.Conn, for example to view the RemoteAddr.
Hi Mochi,
I love the new features of v2, but I see a significant performance drop.
Could you explain the reasons?
mqtt-stresser -broker tcp://192.168.11.52:1883 -num-clients=10 -num-messages=10000
Median: 45949 msg/sec
Median: 43708 msg/sec
Median: 37352 msg/sec
Median: 45628 msg/sec
Median: 23287 msg/sec
Median: 23744 msg/sec
Median: 27340 msg/sec
Median: 23394 msg/sec
While testing #42 as a fix for #37, I found an unrelated issue with message retention and persistence. Using only mosquitto_pub/sub for testing, the first time you publish with retain, it will save to DB. (server.go: retainMessage q=1)
The second time you publish to the same topic, retainMessage in server.go has q=0 and doesn't save. Therefore, it must not be saving what it originally reads in. Not sure if this applies to other persistent data. Example:
package main

import (
	"bufio"
	"flag"
	"fmt"
	"log"
	"os"
	"time"

	mqtt "github.com/mochi-co/mqtt/server"
	"github.com/mochi-co/mqtt/server/listeners"
	"github.com/mochi-co/mqtt/server/listeners/auth"
	"github.com/mochi-co/mqtt/server/persistence/bolt"
	"go.etcd.io/bbolt"
)

func main() {
	runNumber := flag.Int("run", 1, "number of times we've run")
	flag.Parse()

	server := mqtt.New()
	tcp := listeners.NewTCP("t1", ":1883")
	server.AddListener(tcp, &listeners.Config{Auth: new(auth.Allow)})
	err := server.AddStore(bolt.New("mochi-test.db", &bbolt.Options{
		Timeout: 500 * time.Millisecond,
	}))
	if err != nil {
		log.Fatal(err)
	}

	server.Serve()
	fmt.Printf("Server is running\n\n")

	if *runNumber == 1 {
		fmt.Printf("Send a message to broker with retain flag:\n")
		fmt.Printf("\nmosquitto_pub -r -t \"test/persist\" -m \"I Persist\" \n")
		fmt.Printf("\nPress ENTER to exit, then re-run with -run=2 flag")
		bufio.NewReader(os.Stdin).ReadBytes('\n')
		os.Exit(0)
	} else if *runNumber == 2 {
		fmt.Printf("This is the second run and message has been loaded from DB\n")
		fmt.Printf("Test that you can retrieve the message by running\n")
		fmt.Printf("\nmosquitto_sub -C 1 -t \"test/persist\"\n")
		fmt.Printf("\nThen press ENTER after you've confirmed you received the message\n")
		bufio.NewReader(os.Stdin).ReadBytes('\n')
		// Second run, (s *Server) retainMessage will not save in DB
		fmt.Printf("\nPublish the same message again with:\n")
		fmt.Printf("\nmosquitto_pub -r -t \"test/persist\" -m \"I persist\" \n\n")
		fmt.Printf("When complete, hit ENTER to exit and re-run with: -run=3 flag\n")
		bufio.NewReader(os.Stdin).ReadBytes('\n')
		os.Exit(0)
	} else {
		fmt.Printf("This is our third run. You should be able to get the message\n")
		fmt.Printf("by subscribing, but it doesn't work\n")
		fmt.Printf("\nmosquitto_sub -C 1 -t \"test/persist\"\n\n")
		fmt.Printf("Press ENTER to exit. If you re-run this test, remove mochi-test.db first\n")
		bufio.NewReader(os.Stdin).ReadBytes('\n')
		os.Exit(1)
	}
}
To test:
go run main.go
mosquitto_pub -r -t "test/persist" -m "I persist"
go run main.go -run=2
mosquitto_sub -C 1 -t "test/persist"
# then
mosquitto_pub -r -t "test/persist" -m "I persist"
go run main.go -run=3
mosquitto_sub -C 1 -t "test/persist"
# Message was not saved in DB
Hi,
How can I authenticate each MQTT client against the user's own web server / web service?
Thanks
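A minimal sketch of one approach, assuming the v1 auth.Controller interface (Authenticate and ACL) and a hypothetical HTTP endpoint of your own:

package main

import (
	"net/http"
	"net/url"

	mqtt "github.com/mochi-co/mqtt/server"
	"github.com/mochi-co/mqtt/server/listeners"
)

// webAuth delegates credential checks to an external web service.
type webAuth struct {
	endpoint string // placeholder, e.g. https://example.com/mqtt-auth
}

// Authenticate posts the credentials and treats a 200 response as success.
func (a *webAuth) Authenticate(user, password []byte) bool {
	resp, err := http.PostForm(a.endpoint, url.Values{
		"username": {string(user)},
		"password": {string(password)},
	})
	if err != nil {
		return false
	}
	defer resp.Body.Close()
	return resp.StatusCode == http.StatusOK
}

// ACL allows all topics in this sketch; it could also defer to the web service.
func (a *webAuth) ACL(user []byte, topic string, write bool) bool {
	return true
}

func main() {
	server := mqtt.New()
	tcp := listeners.NewTCP("t1", ":1883")
	server.AddListener(tcp, &listeners.Config{Auth: &webAuth{endpoint: "https://example.com/mqtt-auth"}})
	server.Serve()
	select {} // block forever
}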
As per @rkennedy's comments in #24, various tests fail intermittently and typically show up when running go test ./... -count=100. Backtesting indicates these failures occur at least as early as v1.0.5. The failures can occur as few as 1 in every 500 runs. This is likely to be a problem with the tests rather than a problem with the broker code.
Determine the precise cause of these intermittent failures and correct it.
After running go test ./... -count=100 several times, the known failures are as follows:
writing err io: read/write on closed pipe
--- FAIL: TestServerResendClientInflightBackoff (0.01s)
server_test.go:2096:
Error Trace: server_test.go:2096
Error: Not equal:
expected: []byte{0x3a, 0xe, 0x0, 0x5, 0x61, 0x2f, 0x62, 0x2f, 0x63, 0x0, 0xb, 0x68, 0x65, 0x6c, 0x6c, 0x6f}
actual : []byte{}
Diff:
--- Expected
+++ Actual
@@ -1,3 +1,2 @@
-([]uint8) (len=16) {
- 00000000 3a 0e 00 05 61 2f 62 2f 63 00 0b 68 65 6c 6c 6f |:...a/b/c..hello|
+([]uint8) {
}
Test: TestServerResendClientInflightBackoff
writing err io: read/write on closed pipe
--- FAIL: TestServerResendClientInflightBackoff (0.00s)
server_test.go:2105:
Error Trace: server_test.go:2105
Error: Not equal:
expected: 1
actual : 2
Test: TestServerResendClientInflightBackoff
2022/01/30 10:16:33 error writing to buffer io.Writer; io: read/write on closed pipe
--- FAIL: TestServerWriteClient (0.01s)
server_test.go:671:
Error Trace: server_test.go:671
Error: Not equal:
expected: []byte{0x70, 0x2, 0x0, 0xe}
actual : []byte{}
Diff:
--- Expected
+++ Actual
@@ -1,3 +1,2 @@
-([]uint8) (len=4) {
- 00000000 70 02 00 0e |p...|
+([]uint8) {
}
Test: TestServerWriteClient
2022/01/30 10:19:33 error writing to buffer io.Writer; io: read/write on closed pipe
--- FAIL: TestServerCloseClientLWT (0.01s)
server_test.go:1756:
Error Trace: server_test.go:1756
Error: Not equal:
expected: []byte{0x30, 0xc, 0x0, 0x5, 0x61, 0x2f, 0x62, 0x2f, 0x63, 0x68, 0x65, 0x6c, 0x6c, 0x6f}
actual : []byte{}
Diff:
--- Expected
+++ Actual
@@ -1,3 +1,2 @@
-([]uint8) (len=14) {
- 00000000 30 0c 00 05 61 2f 62 2f 63 68 65 6c 6c 6f |0...a/b/chello|
+([]uint8) {
}
Test: TestServerCloseClientLWT
Please add any others to this issue if you see them.
While debugging an application, I was at some point confounded by what looked to both the client and the server like the other side had closed the connection. On further investigation, there were two identical clients, one of them a zombie process, that were both trying to connect, and each time one connected the other was disconnected.
The errors that were printed in OnDisconnect do not include the remote address field, which would have revealed the presence of duplicate client IDs; they also happened to look like ordinary client-initiated disconnects, so the server logs would not reveal that a session takeover was happening.
Auditing the code, I learned that an explicit Disconnect packet could trigger disconnection, but so could EOF or a variety of errors in several code paths. The existing Client was well synchronized for this, but the original cause of stopping the connection was being lost. This change records the first cause used to stop the connection. In EstablishConnection(), the root cause will be the one passed to OnDisconnect, so the server logs will reveal the root of the problem. (Since this is MQTT 3.x, there is no way for the Disconnect packet to tell the client, so it could place an explanation in its own logs.)
In addition, there would be no way for a user to configure visibility into other errors, which might less obviously be the cause of or associated with disconnection, or which simply provide warning flags. I propose adding an OnError handler to support logging all errors.
Related to #21, since if there are duplicate clients it will be nice to say where they're coming from.
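A sketch of what the proposed hook could look like; OnError is hypothetical and does not exist in the current API, the shape simply mirrors the other server.Events callbacks:

// Hypothetical proposed hook: surface every error the server encounters so
// the embedding application can log it with its own context.
server.Events.OnError = func(cl events.Client, err error) {
	log.Printf("client %s error: %v", cl.ID, err)
}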
In the processPublish method, when a client publishes a QoS 2 message, the broker should set an inflight entry, but the newest version does not.
panic: runtime error: invalid memory address or nil pointer dereference
[signal SIGSEGV: segmentation violation code=0x1 addr=0x0 pc=0x1287e94]
goroutine 52708 [running]:
github.com/mochi-co/mqtt/server/internal/clients.(*Client).WritePacket(0xc000e63440, 0x0, 0x103, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, ...)
/app/vendor/github.com/mochi-co/mqtt/server/internal/clients/clients.go:446 +0x94
github.com/mochi-co/mqtt/server.(*Server).writeClient(0xc000652460, 0xc000e63440, 0x0, 0x103, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, ...)
/app/vendor/github.com/mochi-co/mqtt/server/server.go:413 +0x78
github.com/mochi-co/mqtt/server.(*Server).publishToSubscribers(0xc000652460, 0x5c, 0x10103, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, ...)
/app/vendor/github.com/mochi-co/mqtt/server/server.go:662 +0x45e
github.com/mochi-co/mqtt/server.(*Server).processPublish(0xc000652460, 0xc000b07560, 0x5c, 0x10103, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, ...)
/app/vendor/github.com/mochi-co/mqtt/server/server.go:586 +0x258
github.com/mochi-co/mqtt/server.(*Server).processPacket(0xc000652460, 0xc000b07560, 0x5c, 0x10103, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, ...)
/app/vendor/github.com/mochi-co/mqtt/server/server.go:436 +0x118
github.com/mochi-co/mqtt/server/internal/clients.(*Client).Read(0xc000b07560, 0xc002657848, 0xc001c96970, 0xf)
/app/vendor/github.com/mochi-co/mqtt/server/internal/clients/clients.go:375 +0x1f5
github.com/mochi-co/mqtt/server.(*Server).EstablishConnection(0xc000652460, 0x177de10, 0x2, 0x19857e0, 0xc000645038, 0x1965aa8, 0xc000370550, 0x0, 0x0)
/app/vendor/github.com/mochi-co/mqtt/server/server.go:342 +0x1071
github.com/mochi-co/mqtt/server/listeners.(*TCP).Serve.func1(0xc000370150, 0xc000492000, 0x19857e0, 0xc000645038)
/app/vendor/github.com/mochi-co/mqtt/server/listeners/tcp.go:106 +0x66
created by github.com/mochi-co/mqtt/server/listeners.(*TCP).Serve
/app/vendor/github.com/mochi-co/mqtt/server/listeners/tcp.go:105 +0x95
Publish works fine except for honoring the retain flag. Broker works fine with external publishing with retain.
Using master branch: 460f0ef
package main

import (
	"fmt"
	"os"
	"os/signal"
	"syscall"
	"time"

	mqtt "github.com/mochi-co/mqtt/server"
	"github.com/mochi-co/mqtt/server/listeners"
	"github.com/mochi-co/mqtt/server/listeners/auth"
)

func main() {
	sigs := make(chan os.Signal, 1)
	done := make(chan bool, 1)
	signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
	go func() {
		<-sigs
		done <- true
	}()

	server := mqtt.New()
	tcp := listeners.NewTCP("t1", ":1883")
	server.AddListener(tcp, &listeners.Config{Auth: new(auth.Allow)})

	// server.Serve doesn't block, so no need for a goroutine, but for some
	// reason we have to delay a LONG time for Serve's goroutines to start up:
	// > 1 second on a 5900X!
	server.Serve()
	fmt.Println("\nServer is running")
	fmt.Println("Sleeping for 2 seconds")
	time.Sleep(2 * time.Second)

	// Publish with retain - doesn't work
	server.Publish("direct/publish", []byte("Published from broker"), true)
	fmt.Println("\nMessage published to \"direct/publish\" with retain")
	fmt.Println("\nTry running: mosquitto_sub -v -t \"direct/publish\"")

	<-done
}
go run main.go
# In another terminal window
# The following shows nothing
mosquitto_sub -v -t "direct/publish"
# But this works:
mosquitto_pub -t "mosquitto/publish" -m "from mosquitto_pub" -r
mosquitto_sub -v -t "mosquitto/publish"
Am I missing something?
In Server.EstablishConnection() there is an authorization test, and even if the test fails it first checks whether there's a session present and disconnects the other, valid session. This could lead to an invalid connection disrupting a valid connection.
It seems unintentional. Presumably MQTT does not require a correct response for sessionPresent when authorization has failed.
I use Mosquitto as a broker with the go-auth plugin for authentication and ACL management using MySQL or MongoDB on an Ubuntu server. I hope you will add this type of feature; when it is added to your application, I will switch over from Mosquitto.
Thanks for making a super-fast MQTT broker application using Go.
The benchmarks in the readme are a little out of date; it would be good to refresh them.
Hi.
Please add more options to the auth Authenticate interface.
I think the Authenticate interface could be changed to the following:
type AuthRequest struct {
	Remote   string
	ClientID string
	User     []byte
	Password []byte
}

type Controller interface {
	Authenticate(req AuthRequest) bool
	...
}
It would be nice to have the Python paho interoperability tests run against example/paho/main.go when a PR is merged into a v* branch.
Currently we're using Travis for build checks. It would be simpler if this was done using GitHub Actions.
For testing I want to use an anonymous server port like in httptest:
tcp := listeners.NewTCP("t1", "")
Doing so works, but I have no option to retrieve the port, as the TCP listener is private.
Alternatively, it would be nice if the listener could be created and passed into the server on the consumer side.
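A sketch of the consumer-side pattern I have in mind, using only the standard library; this is the desired usage, not something the current listener API supports:

package main

import (
	"log"
	"net"
)

func main() {
	// Create the listener on the consumer side so the ephemeral port can be
	// read back; the broker would then (hypothetically) accept this
	// net.Listener instead of an address string.
	ln, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		log.Fatal(err)
	}
	defer ln.Close()
	port := ln.Addr().(*net.TCPAddr).Port
	log.Printf("test broker would listen on port %d", port)
}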
Following #17, we found that various struct fields were not 64-bit aligned, which resulted in panics on 32-bit builds. Fixing this has highlighted that there are other structs within the code which can be optimised for better memory alignment.
These should be refactored so that each struct uses the optimal amount of memory for better caching, and so that all 64-bit fields are 8-byte aligned for improved 32-bit compatibility.
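A small generic illustration of the reordering involved (not a specific struct from this codebase): on 32-bit platforms, sync/atomic requires 64-bit fields to be 8-byte aligned, which is easiest to guarantee by placing them first in the struct.

package main

import "sync/atomic"

// poorlyAligned risks a panic on 32-bit builds: the int64 follows a bool, so
// it can land on a 4-byte (not 8-byte) boundary, and atomic operations on it
// will fault.
type poorlyAligned struct {
	closed bool
	sent   int64
}

// wellAligned places the 64-bit field first, which the runtime guarantees to
// be 8-byte aligned in an allocated struct, and it also wastes less padding.
type wellAligned struct {
	sent   int64
	closed bool
}

func main() {
	w := &wellAligned{}
	atomic.AddInt64(&w.sent, 1)
	_ = poorlyAligned{}
}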
Hello, I'm trying to use this example by Eclipse:
https://www.eclipse.org/paho/index.php?page=clients/js/index.php
But when I try to connect to the WebSocket server, I get this error:
I'm using this example: https://github.com/mochi-co/mqtt/blob/master/examples/websocket/main.go
Thanks :c
Instead of 1.0.0, would you please re-tag them correctly?
Is there some way I could configure something so that I can set a root topic based on the host name that devices are connecting to the broker with?
What I mean is my server is available with a wildcard domain - *.myserver.com. Clients can then connect with eg foo.myserver.com or bar.myserver.com and publish / subscribe to topics, so is there some way if a client connects to foo.myserver.com and publishes to topic 'someTopic' that the server would make the actual topic "foo/someTopic"?