emitter-io / go
Go/Golang client for emitter
License: Eclipse Public License 1.0
Starting from the sample code, p.Occupancy always shows 0, no matter how many instances of the example I have running and subscribed at the same time. However, p.Who does list the clients that are subscribed.
package main

import (
	"fmt"
	"time"

	emitter "github.com/emitter-io/go"
)

func main() {
	// Create the options with default values
	o := emitter.NewClientOptions()
	// Set the message handler
	o.SetOnMessageHandler(func(client emitter.Emitter, msg emitter.Message) {
		fmt.Printf("Received message: %s\n", msg.Payload())
	})
	// Set the presence notification handler
	o.SetOnPresenceHandler(func(_ emitter.Emitter, p emitter.PresenceEvent) {
		fmt.Printf("Occupancy: %v\n", p.Occupancy)
		fmt.Println("Who:", p.Who)
	})
	// Create a new emitter client and connect to the broker
	c := emitter.NewClient(o)
	sToken := c.Connect()
	if sToken.Wait() && sToken.Error() != nil {
		panic("Error on Client.Connect(): " + sToken.Error().Error())
	}
	// Subscribe to the presence demo channel
	c.Subscribe("X4-nUeHjiAygHMdN8wst82S3c2KcCMn7", "presence-demo/1")
	// Publish to the channel
	c.Publish("X4-nUeHjiAygHMdN8wst82S3c2KcCMn7", "presence-demo/1", "hello")
	// Ask for presence
	r := emitter.NewPresenceRequest()
	r.Key = "X4-nUeHjiAygHMdN8wst82S3c2KcCMn7"
	r.Channel = "presence-demo/1"
	c.Presence(r)
	// Stop after 10 seconds
	time.Sleep(10 * time.Second)
}
Output:
$ go run main.go
Occupancy: 0
Who: [{S25DTNEXPNLFWGXT5IEUEQK6CM } {5X46AEOWRUQSBRVEKVJQLG44YQ }]
So, the NewV1 function signature has changed. It no longer returns an ID and an error, only the ID. The call in this client just needs to be updated to reflect that NewV1 can no longer fail and so does not return an error. I'm happy to submit a PR if that's okay. I keep modifying this locally, but every time I update a dependency and run dep ensure, my change gets blown away and my app won't compile anymore. I'd much rather this just be updated. :)
I have a simple HTTP server with an endpoint (GET http://localhost:8099/keygen) to generate a key. The handler for this endpoint invokes the GenerateKey func from the emitter-io client.
I have benchmarked this API endpoint and found that the invocations of GenerateKey end up in a deadlock state when I try to maintain a fixed number of concurrent requests for a period of time. The problem is easily reproducible with 50 concurrent requests over a 2 minute period. Note that the problem does not appear when running a single keygen request at a time.
Here is a sample program to reproduce the deadlock. Be sure to set appropriate values for the SECRET_KEY, BROKER_HOST and BROKER_PORT constants.
Start it with the -race flag: go run -race main.go
package main

import (
	"fmt"
	"io"
	"log"
	"net/http"

	emitter "github.com/emitter-io/go/v2"
	uuid "github.com/satori/go.uuid"
)

const (
	SECRET_KEY  = "<your_secret_key>"
	BROKER_HOST = "172.16.238.16"
	BROKER_PORT = "8080"
)

func main() {
	// Create the client and connect to the broker
	c, err := emitter.Connect(fmt.Sprintf("tcp://%s:%s", BROKER_HOST, BROKER_PORT),
		func(_ *emitter.Client, msg emitter.Message) {
			log.Fatalf("[emitter] -> [B] received: '%s' topic: '%s'\n", msg.Payload(), msg.Topic())
		})
	if err != nil {
		log.Panic(err)
	}
	id := c.ID()
	fmt.Println("[emitter] -> [B] my name is " + id)
	// HTTP server
	keygenHandler := func(w http.ResponseWriter, req *http.Request) {
		channelId := uuid.NewV4()
		var key string
		key, err = c.GenerateKey(SECRET_KEY, fmt.Sprintf("channel/%s/", channelId), "r", 600)
		if err != nil {
			log.Panic(err)
		}
		fmt.Println(key)
		io.WriteString(w, key)
	}
	http.HandleFunc("/keygen", keygenHandler)
	log.Fatal(http.ListenAndServe(":8099", nil))
}
Sample console output follows. The data race warning is displayed, and the client then stops processing GenerateKey requests. Once it is in this state, you have to restart the application before the emitter client will process keygen requests again.
[emitter] -> [B] my name is 4V6M6QLQUALAVMBFHYAB4FBUAM
Dth2fColoWPf-ev9oMrALfiR4tHq8IfJ
il0NhYJBE39DjyZEW0E5JE_UkSs1ZpjT
==================
WARNING: DATA RACE
Write at 0x00c000012d10 by goroutine 23:
main.main.func2()
/home/fedora/code/go/src/gallagher.com/emitter/main.go:39 +0x1cc
net/http.HandlerFunc.ServeHTTP()
/usr/local/go/src/net/http/server.go:2012 +0x51
net/http.(*ServeMux).ServeHTTP()
/usr/local/go/src/net/http/server.go:2387 +0x288
net/http.serverHandler.ServeHTTP()
/usr/local/go/src/net/http/server.go:2807 +0xce
net/http.(*conn).serve()
/usr/local/go/src/net/http/server.go:1895 +0x837
Previous write at 0x00c000012d10 by goroutine 17:
main.main.func2()
/home/fedora/code/go/src/gallagher.com/emitter/main.go:39 +0x1cc
net/http.HandlerFunc.ServeHTTP()
/usr/local/go/src/net/http/server.go:2012 +0x51
net/http.(*ServeMux).ServeHTTP()
/usr/local/go/src/net/http/server.go:2387 +0x288
net/http.serverHandler.ServeHTTP()
/usr/local/go/src/net/http/server.go:2807 +0xce
net/http.(*conn).serve()
/usr/local/go/src/net/http/server.go:1895 +0x837
Goroutine 23 (running) created at:
net/http.(*Server).Serve()
/usr/local/go/src/net/http/server.go:2933 +0x5b6
net/http.(*Server).ListenAndServe()
/usr/local/go/src/net/http/server.go:2830 +0x102
net/http.ListenAndServe()
/usr/local/go/src/net/http/server.go:3086 +0x394
main.main()
/home/fedora/code/go/src/gallagher.com/emitter/main.go:50 +0x39f
Goroutine 17 (running) created at:
net/http.(*Server).Serve()
/usr/local/go/src/net/http/server.go:2933 +0x5b6
net/http.(*Server).ListenAndServe()
/usr/local/go/src/net/http/server.go:2830 +0x102
net/http.ListenAndServe()
/usr/local/go/src/net/http/server.go:3086 +0x394
main.main()
/home/fedora/code/go/src/gallagher.com/emitter/main.go:50 +0x39f
==================
A6hXQ3rYsDnqumkJI6P6d7AK_d1Em9Qi
Ntz_T1oKWqJITuND_XzOedEE6zHE38nE
UzZ2LRhUI1TrabWS3zgbAAU_4EskDihq
3vPlZHTZLfGVnYqHhlxd6NT1yM6VeeYq
KuwMTDGgCNynLaV0WwGCLPxLvrG_-Mrw
L3vPQNZ_C3AkUEQUfEJasW-8_76j3tyw
4FI-mJx0CjwI-zN7Zf30F61VZvYwAyFD
919r9uI313ab3z9KOv7cnHy8FEwY1vSL
UbmcPFKhI2Ud-qK3p35qYxr8UGdQKmmm
iIkN4riDZwCkFh3uuGA0g0LDFgB20shT
kmvTzE-6M0HKiIv1Abp-KDRqQI0THnv7
BQ2uc8FOb7xIi9JHqjmn69X6EQaLFc_O
ZAdYcVfG5z9_6cU75F0XD5m2_hFy0qAp
eEiYIuOet1LoRltOC1BPJ6caE9rAUN8K
aeS3Uy-jWw-LvtBLMVats9EyS9jnj0SV
0cFPKG1W_EaWff5dWCzgT9n_OM-MgiRO
XPwNcVWJEyE-r1q41rtWcMzRIDGL674T
b8k9eXyYYJA6tK6KjZc76lic1Qu_MYfk
k3EuWAQ2aWCx7zxzjwEb9gwSUfWoOzDN
GyxFArsBXfT6yg24rmRnDI1p753qLisu
C7YrBQE4IJ8rgxAJPPRUBl-RJjEESIh8
37zUludZIaCYSzorWLSRSA5QG-Vh0Wex
LhjXcEGiO68vEcm44l8YntyMBGywcgZx
gPqq6uKs8Z4QLiMh7rNLjF7dv-mx5iJ1
I tested from and until options with my C# library but could not get them working.
Can you please confirm that these options actually work?
PS: There is a bug in WithUntil option in the go source code:
// WithUntil request messages until a point in time.
func WithUntil(until time.Time) Option {
return option("from=" + strconv.FormatInt(getUTCTimestamp(until), 10))
}
"from=" must be "until=".
I would like to join the developers.
I want to add: SetCleanSession and QoS=2 configuration.
When I try to go get github.com/emitter-io/go, I get:
# github.com/emitter-io/go
../../go/src/github.com/emitter-io/go/options.go:35:35: multiple-value uuid.NewV1() in single-value context
Line 35 in f6f9eac
Starting from the sample code and adding:
r.Status = false
r.Changes = true
The PresenceEvent received for "changes" shows Who as empty ([]) and Occupancy as 0. Output:
$ go run main2.go
Received message: hello
Occupancy: 0
Who: []
Occupancy: 0
Who: []
Full code:
package main

import (
	"fmt"
	"time"

	emitter "github.com/emitter-io/go"
)

func main() {
	// Create the options with default values
	o := emitter.NewClientOptions()
	// Set the message handler
	o.SetOnMessageHandler(func(client emitter.Emitter, msg emitter.Message) {
		fmt.Printf("Received message: %s\n", msg.Payload())
	})
	// Set the presence notification handler
	o.SetOnPresenceHandler(func(_ emitter.Emitter, p emitter.PresenceEvent) {
		fmt.Printf("Occupancy: %v\n", p.Occupancy)
		fmt.Println("Who:", p.Who)
	})
	// Create a new emitter client and connect to the broker
	c := emitter.NewClient(o)
	sToken := c.Connect()
	if sToken.Wait() && sToken.Error() != nil {
		panic("Error on Client.Connect(): " + sToken.Error().Error())
	}
	// Subscribe to the presence demo channel
	c.Subscribe("X4-nUeHjiAygHMdN8wst82S3c2KcCMn7", "presence-demo/1")
	// Publish to the channel
	c.Publish("X4-nUeHjiAygHMdN8wst82S3c2KcCMn7", "presence-demo/1", "hello")
	// Ask for presence changes only
	r := emitter.NewPresenceRequest()
	r.Key = "X4-nUeHjiAygHMdN8wst82S3c2KcCMn7"
	r.Channel = "presence-demo/1"
	r.Status = false
	r.Changes = true
	c.Presence(r)
	// Stop after 10 seconds
	time.Sleep(10 * time.Second)
}
We should give tinygo a try. It's very active and has been reasonably sturdy in my tests.
https://github.com/tinygo-org/tinygo
It does not support 100% of the Go standard library, though. But there is a good chance it will work.
I have not tried compiling the go client with it yet.
The simplest way is to use Docker as shown here: https://tinygo.org/getting-started/using-docker/
This would allow reuse of the go client on tiny embedded devices.
Pretty cool if we can leverage the code and logic.
Hi, if you don't set the default OnMessage handler, the specific message handlers passed to Subscribe are not called. I just wasted more time on this than I'd like to admit.
I see the fix is here, so it would be nice to get a new version tag.
Hi, I'm getting the race condition below when emitter disappears and reappears (reconnect). I get it consistently when killing emitter with SIGTERM.
It looks like the issue is in github.com/eclipse/paho.mqtt.golang; however, it was potentially fixed in v1.2.0.
WARNING: DATA RACE
Read at 0x00c00010cb18 by goroutine 62:
github.com/eclipse/paho%2emqtt%2egolang.keepalive()
/home/dev/go/pkg/mod/github.com/eclipse/[email protected]/ping.go:42 +0x22e
Previous write at 0x00c00010cb18 by goroutine 28:
github.com/eclipse/paho%2emqtt%2egolang.(*client).reconnect()
/home/dev/go/pkg/mod/github.com/eclipse/[email protected]/client.go:377 +0x11f8
Goroutine 62 (running) created at:
github.com/eclipse/paho%2emqtt%2egolang.(*client).reconnect()
/home/dev/go/pkg/mod/github.com/eclipse/[email protected]/client.go:374 +0x15fc
Goroutine 28 (running) created at:
github.com/eclipse/paho%2emqtt%2egolang.(*client).internalConnLost()
/home/dev/go/pkg/mod/github.com/eclipse/[email protected]/client.go:465 +0x10c
Hi, I've been trying to use emitter for 4 months now, but am constantly plagued with reconnection issues. Basically, whenever there is a network interruption, the Go client seems to notice the problem, but it fails to reconnect and doesn't panic, which leaves the client in a disconnected state.
This is happening in both the v1 and v2 code, and with both standalone emitter servers as well as clustered servers. The problem is much more common when running the emitter behind an AWS Application Load Balancer with a client TTL above 30 seconds (I suspect the AWS ALB is killing the connection more frequently in this case).
I previously found some issues in the dependency, paho.mqtt.golang, related to auto-reconnection, so I implemented an OnConnectionLost() handler that tries to reconnect, but that doesn't seem to help either. It seems the only way to reconnect reliably is to panic and let my process supervisor restart my application.
Is anyone else experiencing this problem, and/or is there something I can do to gather conclusive information about the problem?
I've tried the stable version, and also the latest master, via my application's Gopkg.toml:
[[constraint]]
branch = "master"
# version = "2.0.4"
name = "github.com/emitter-io/go"
[[override]]
branch = "master"
# version = "c606a4c5dacdea0598aed291be3120479c509e43"
name = "github.com/eclipse/paho.mqtt.golang"
Thanks!
I use Flutter, and instead of using the Emitter Dart client lib, it's also possible to use the Go client lib by compiling it with gomobile.
Then it's really easy to embed the gomobile lib inside Flutter. You can even use event streams.
I am raising this because it's another way to get the same thing done and has some advantages:
Of course there are some bad things too:
I created a chat topic like this:
ga, _ := c.GenerateKey("", "xm/chat/group/a/", "rwlsp", 0)
err = c.Presence(ga, "xm/chat/group/a/", true, true)
if err != nil {
	panic(err)
}
I created several clients that connect to the emitter server, like this:
err := c.Presence("", "xm/chat/group/a/", true, true)
if err != nil {
	panic(err)
}
err = c.Subscribe("", "xm/chat/group/a/", nil)
if err != nil {
	panic(err)
}
Then, after 10 seconds, I unsubscribe:
err := c.Unsubscribe(key, channel)
if err != nil {
	fmt.Println(err)
}
I do not receive the subscribe/unsubscribe events, although I do receive the status response for each client.
I want to be notified whenever any client subscribes or unsubscribes.
func getOpts() []func(*emitter.Client) {
	return []func(*emitter.Client){
		emitter.WithAutoReconnect(true),
		emitter.WithBrokers("tcp://127.0.0.1:8788"),
		emitter.WithClientID("sys"),
		emitter.WithPassword("123456"),
		emitter.WithUsername("sys-user"),
	}
}
I create the client with a client ID, username and password, and publish a message like this:
c.Publish(key, channel, "hello a", emitter.WithTTL(86400*30), emitter.WithoutEcho(), emitter.WithLast(999), emitter.WithAtLeastOnce())
When a client disconnects and later comes back online, it receives the old messages again, but what I want is for it to receive only the messages it has never received before.
When I try to run the sample, on the first line:
o := emitter.NewClientOptions()
I get this error:
vendor/github.com/emitter-io/go/options.go:33:10: assignment mismatch: 2 variables but 1 values
How can I solve this?
Sample:
fmt.Println("[emitter] <- [B] querying own name")
id := c.ID()
fmt.Println("[emitter] -> [B] my name is " + id)
Run result:
PS C:\Go\src\emitter-go> go run .\main.go
[emitter] <- [B] querying own name
It waits for the response forever. Why? Thanks.
I'm happy to carry out this migration. There are a couple of breaking changes I would like to suggest for this API, and it would be great if we could first switch over to Go modules and tag the code in its current state, so that people using this version can continue to use it if they wish.
There is an error at "emitter-io/go/options.go:33:10: assignment mismatch: 2 variables but 1 values".
I'm trying out your MQTT server and using this client for my tests. I just realized that I don't get any error if I set my key or channel wrong.
func (h *Hub) Subscribe() {
	sToken := h.c.Subscribe("wrong key", "wrong channel")
	if sToken.Wait() && sToken.Error() != nil {
		log.Fatalln("Error on Hub.Subscribe(): " + sToken.Error().Error())
	}
}
This issue left me in the dark for an hour until I realized that I was using the wrong key.
Thank you.
github.com/emitter-io/go/options.go:33:10: assignment mismatch: 2 variables but 1 values
The upstream API changed, but emitter-io/go vendors a fixed version of the dependency, so that needs to change too.