Code Monkey home page Code Monkey logo

go-mod-messaging's Introduction

go-mod-messaging

Build Status Code Coverage Go Report Card GitHub Latest Dev Tag) GitHub Latest Stable Tag) GitHub License GitHub go.mod Go version GitHub Pull Requests GitHub Contributors GitHub Committers GitHub Commit Activity

Messaging client library for use by Go implementation of EdgeX micro services. This project contains the abstract Message Bus interface and an implementation for Redis Pub/Sub, MQTT and NATS. These interface functions connect, publish, subscribe and disconnect to/from the Message Bus. For more information see the MessageBus documentation.

What is this repository for?

  • Create new MessageClient
  • Connect to the Message Bus
  • Public messages to the Message Bus
  • Subscribe to and receives messages from the Messsage Bus
  • Disconnect from the Message Bus

Installation

  • Make sure you have modules enabled, i.e. have an initialized go.mod file
  • If your code is in your GOPATH then make sure GO111MODULE=on is set
  • Run go get github.com/edgexfoundry/go-mod-messaging/v3
    • This will add the go-mod-messaging to the go.mod file and download it into the module cache

How to Use

This library is used by Go programs for interacting with the Message Bus (i.e. redis).

The Message Bus connection information as well as which implementation to use is stored in the service's toml configuration as:

MessageBus:
  Protocol: redis
  Host: localhost
  Port: 6379
  Type: redis

Additional Configuration

Individual client abstractions allow additional configuration properties which can be provided via configuration file:

MessageBus:
  Protocol: tcp
  Host: localhost
  Port: 1883
  Type: mqtt
  Topic: events
  Optional:
    ClientId: MyClient
    Username: MyUsername
    ...

Or programmatically in the Optional field of the MessageBusConfig struct. For example,

types.MessageBusConfig{
				Broker: types.HostInfo{Host: "example.com", Port: 9090, Protocol: "tcp"},
				Optional: map[string]string{
					"ClientId":          "MyClientID",
					"Username":          "MyUser",
					"Password":          "MyPassword",
					...
				}}

NOTE
For complete details on configuration options see the MessageBus documentation

Usage

The following code snippets demonstrate how a service uses this messaging module to create a connection, send messages, and receive messages.

This code snippet shows how to connect to the abstract message bus.

var messageBus messaging.MessageClient

var err error
messageBus, err = msgFactory.NewMessageClient(types.MessageBusConfig{
    Broker:   types.HostInfo{
    Host:     Configuration.MessageBus.Host,
    Port:     Configuration.MessageBus.Port,
    Protocol: Configuration.MessageBus.Protocol,
  },
  Type: Configuration.MessageBus.Type,})

if err != nil {
  LoggingClient.Error("failed to create messaging client: " + err.Error())
}

err = messsageBus.Connect()

if err != nil {
  LoggingClient.Error("failed to connect to message bus: " + err.Error())
}

This code snippet shows how to publish a message to the abstract message bus.

...
payload, err := json.Marshal(evt)
...
msgEnvelope := types.MessageEnvelope{
  CorrelationID: evt.CorrelationId,
  Payload:       payload,
  ContentType:   clients.ContentJson,
}

err = messageBus.Publish(msgEnvelope, Configuration.MessageBus.Topic)

This code snippet shows how to subscribe to the abstract message bus.

messageBus, err := factory.NewMessageClient(types.MessageBusConfig{
    Broker:   types.HostInfo{
    Host:     Configuration.MessageBus.Host,
    Port:     Configuration.MessageBus.Port,
    Protocol: Configuration.MessageBus.Protocol,
  },
  Type: Configuration.MessageBus.Type,
})

if err != nil {
  LoggingClient.Error("failed to create messaging client: " + err.Error())
  return
}

if err := messageBus.Connect(); err != nil {
  LoggingClient.Error("failed to connect to message bus: " + err.Error())
  return
}

topics := []types.TopicChannel{
    {
      Topic:    Configuration.MessageBus.Topic,
      Messages: messages,
    },
}

err = messageBus.Subscribe(topics, messageErrors)
if err != nil {
  LoggingClient.Error("failed to subscribe for event messages: " + err.Error())
  return
}

This code snippet shows how to receive data on the message channel after you have subscribed to the bus.

...

for {
select {
  case e := <-errors:
  // handle errors
  ...
  
  case msgEnvelope := <-messages:
    LoggingClient.Info(fmt.Sprintf("Event received on message queue. Topic: %s, Correlation-id: %s ", Configuration.MessageBus.Topic, msgEnvelope.CorrelationID))
    if msgEnvelope.ContentType != clients.ContentJson {
      LoggingClient.Error(fmt.Sprintf("Incorrect content type for event message. Received: %s, Expected: %s", msgEnvelope.ContentType, clients.ContentJson))
      continue
    }
    str := string(msgEnvelope.Payload)
    event := parseEvent(str)
    if event == nil {
      continue
    }
}
...

go-mod-messaging's People

Contributors

adam-intel avatar akramtexas avatar alexcuse avatar anthonymbonafide avatar bill-mahoney avatar bnevis-i avatar brandonforster avatar cloudxxx8 avatar dependabot[bot] avatar dweomer avatar ejlee3 avatar ernestojeda avatar farshidtz avatar felixting avatar jamesrgregg avatar jinfahua avatar jinlinguan avatar jpwhitemn avatar jwagantall avatar lenny-goodell avatar michaelestrin avatar rsdmike avatar soda480 avatar tmpowers avatar tonyespy avatar tsconn23 avatar weichou1229 avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

go-mod-messaging's Issues

Create Abstract Messaging with a zeromq implementation.

Currently the messaging functionality connecting core-data with export-distro is tightly coupled to ZeroMQ. We need to put this behind an interface and implement with the appropriate type. The initial thought was to do this in a manner commensurate with how we've integrated database providers (interface in the respective service, type in an internal package). However during the Application Services face-to-face in Boston, an additional requirement has become apparent. That is, this same capability needs to be usable by the soon to be created App Functions SDK. A discussion was had w/r/t implementing this in an externally usable package or module which would then be consumed by the App Functions SDK and edgex-go.

The package would contain a go.mod designating it as a module which can be consumed by any number of 3rd party apps/services.

Does the Messages Channel Have to Contain Pointers?

Messages chan *MessageEnvelope

Any reason other than support of legacy export-distro that this has to be pointers? Export-distro currently expects pointers to be passed through the bus, but this is needless because a message published to a bus should be immutable. Thus it's perfectly fine to pass a copy into the bus and not a mutable pointer.

If you agree, then please remove the pointer definition so I can use it in integrating CBOR into export-distro.

MQTT Implementation doesn't handle different hosts for Publish & Subscribe

The MessageBus interface supports separate PublishHost and SubscribeHost. The MQTT implementation doesn't handle this scenario properly. It only use either the PublishHost or SubscribeHost and makes one connection.

If PublishHost and SubscribeHost are different it should make separate connections for each.

SendMessage race condition when Publishing

The ZMQ SendMessage method is not thread safe. When calling this method from Publish, we must use a mutex. This manifests itself in multiple ways. First was edgexfoundry/edgex-go#1512 in Export Distro. Additionally it causes Core Data to crash either in the zmq C++ library or in the go ZMQ package as this stack trace shows

2019-07-15T18:01:40.276841834Z level=INFO ts=2019-07-15T18:01:40.276727191Z app=edgex-core-data source=event.go:240 msg="Putting event on message queue"
2019-07-15T18:01:40.276869619Z level=INFO ts=2019-07-15T18:01:40.276766473Z app=edgex-core-data source=event.go:240 msg="Putting event on message queue"
2019-07-15T18:01:40.277256157Z fatal error: unexpected signal during runtime execution
2019-07-15T18:01:40.277279205Z fatal error: unexpected signal during runtime execution
2019-07-15T18:01:40.279370588Z [signal SIGSEGV: segmentation violation code=0x1 addr=0x59 pc=0x7f462e34946a]
2019-07-15T18:01:40.279383539Z 
2019-07-15T18:01:40.279387709Z runtime stack:
2019-07-15T18:01:40.279712745Z runtime.throw(0xc8702c, 0x2a)
2019-07-15T18:01:40.279720001Z     /usr/local/go/src/runtime/panic.go:617 +0x72
2019-07-15T18:01:40.279723059Z runtime.sigpanic()
2019-07-15T18:01:40.279725844Z     /usr/local/go/src/runtime/signal_unix.go:374 +0x4a9
2019-07-15T18:01:40.279728636Z 
2019-07-15T18:01:40.279736899Z goroutine 24007 [syscall]:
2019-07-15T18:01:40.279740092Z runtime.cgocall(0xac14b0, 0xc000366ce8, 0xc000028770)
2019-07-15T18:01:40.279742826Z     /usr/local/go/src/runtime/cgocall.go:128 +0x5b fp=0xc000366cb8 sp=0xc000366c80 pc=0x40665b
2019-07-15T18:01:40.279760909Z github.com/pebbe/zmq4._C2func_zmq_send(0x7f462e17a740, 0xc000028770, 0x6, 0x2, 0x0, 0x0, 0x0)
2019-07-15T18:01:40.279776945Z     _cgo_gotypes.go:487 +0x55 fp=0xc000366ce8 sp=0xc000366cb8 pc=0x904c75
2019-07-15T18:01:40.279789559Z github.com/pebbe/zmq4.(*Socket).SendBytes.func1(0xc00044c090, 0xc000366d98, 0xc000028770, 0x6, 0x8, 0x2, 0xc000028770, 0xc00012a0b8, 0x6)
2019-07-15T18:01:40.279798832Z     /go/pkg/mod/github.com/pebbe/[email protected]/zmq4.go:998 +0x12d fp=0xc000366d48 sp=0xc000366ce8 pc=0x91018d
2019-07-15T18:01:40.279815441Z github.com/pebbe/zmq4.(*Socket).SendBytes(0xc00044c090, 0xc000028770, 0x6, 0x8, 0x2, 0x8, 0x280, 0x8)
2019-07-15T18:01:40.279823532Z     /go/pkg/mod/github.com/pebbe/[email protected]/zmq4.go:998 +0x8f fp=0xc000366dc0 sp=0xc000366d48 pc=0x90da7f
2019-07-15T18:01:40.279835711Z github.com/pebbe/zmq4.(*Socket).Send(...)
2019-07-15T18:01:40.279840333Z     /go/pkg/mod/github.com/pebbe/[email protected]/zmq4.go:982
2019-07-15T18:01:40.279854548Z github.com/pebbe/zmq4.(*Socket).sendMessage(0xc00044c090, 0x0, 0xc000366f00, 0x2, 0x2, 0x0, 0x0, 0xb3f620)
2019-07-15T18:01:40.279867252Z     /go/pkg/mod/github.com/pebbe/[email protected]/utils.go:82 +0x7b9 fp=0xc000366e98 sp=0xc000366dc0 pc=0x903689
2019-07-15T18:01:40.279975409Z github.com/pebbe/zmq4.(*Socket).SendMessage(...)
2019-07-15T18:01:40.279981557Z     /go/pkg/mod/github.com/pebbe/[email protected]/utils.go:18
2019-07-15T18:01:40.279986748Z github.com/edgexfoundry/go-mod-messaging/internal/pkg/zeromq.(*zeromqClient).Publish(0xc000250000, 0x0, 0x0, 0xc000524000, 0x24, 0xc0001c22c0, 0x15d, 0x160, 0xc759d2, 0x10, ...)

Do the following to recreate this issue:

  1. Run EdgeX using https://github.com/edgexfoundry/developer-scripts/blob/master/compose-files/docker-compose-edinburgh-no-secty-1.0.0.yml
  2. From Consul set the core-data & export-distro logging levels to TRACE so you can see the data is flowing.
  3. Using a DB tool like Robo 3T, connect to the EdgeX Mongo instance and set the autoEvents frequency to 500ms for all defined devices in the MetaData
  4. Restart device-virtual using docker restart <container id>
  5. Run device-random natively with AutoEvents frequency set to 500ms for all devices in the local configuration.toml

Within a few minutes you will see core-data crash or see issue edgexfoundry/edgex-go#1512

Add running "go fmt" to "make test" in makefile and run it

go fmt is required to be run for all PRs. To enforce this we are adding running go fmt to the test target in all makefiles. As part of doing this effort make test should be run to ensure all go files are formatted so that subsequent PRs don't have changes to unrelated files just for formatting.

Add LICENSE

Since this is an EdgeX provided library, it needs to have an LICENSE file, specifically Apache 2.0

Setting Redis Password for RedisStreams fails due to reflect on private filed being set

The redisstreams implemenation attempt to set the password in the OptionalClientConfiguration via reflection.
This fails since the password field is not publicly exposed.

type OptionalClientConfiguration struct {
	password string
}
```

Also the README says to set `Password` in the `Optional` properties which is a `map[string]string` while it uses the struct field name `password` to index into the map which is case sensitive, thus not finding the password.

Change the struct field name to `Password` fixes both of this issues.

type OptionalClientConfiguration struct {
Password string
}

Upgrade to Go 1.15.2

The following files also need to be updated:

Dockerfile.build
go.mod
Jenkinsfile (pull go reference)
README (update min go??)

Register Custom MessageClient implementations with factory

I am working on services that exploit additional binding types for the message bus (eg Amazon SQS) to bring in workloads from various legacy applications without changing. Would like to be able to register these custom types with something like

messaging.RegisterCustomType("my-custom-client-type", MyFactoryFunc)

To have them picked up by NewMessageClient. These clients would be largely experimental and I'm not sure if any will end up worth contributing back right now. But this seems like it would be a good extensibility point in general, and allow me to work on clients in isolation without messing with the SDK or go-mod-messaging which can get unwieldy.

Runtime panic when running unit-tests against go-mod-messaging module

The current code base of master branch will encounter runtime panic, as shown below, when running make test:

Moreover, we shall also consider to apply DevOps process to this module, so any new code commits could be verified.

CGO_ENABLED=1 GO111MODULE=on go test ./... -coverprofile=coverage.out ./...
?   	github.com/edgexfoundry/go-mod-messaging/v2/internal/pkg	[no test files]
--- FAIL: TestClientCreatorTLS (0.00s)
    --- FAIL: TestClientCreatorTLS/Skip_TLS_Config_from_PEM_Block_for_non-supported_TLS_protocols (0.00s)
panic: runtime error: invalid memory address or nil pointer dereference [recovered]
	panic: runtime error: invalid memory address or nil pointer dereference
[signal SIGSEGV: segmentation violation code=0x1 addr=0x18 pc=0x6cbb0d]

goroutine 32 [running]:
testing.tRunner.func1.1(0x7071c0, 0x963b00)
	/usr/local/go/src/testing/testing.go:1072 +0x30d
testing.tRunner.func1(0xc000153b00)
	/usr/local/go/src/testing/testing.go:1075 +0x41a
panic(0x7071c0, 0x963b00)
	/usr/local/go/src/runtime/panic.go:969 +0x1b9
github.com/edgexfoundry/go-mod-messaging/v2/internal/pkg/mqtt.TestClientCreatorTLS.func1(0xc000153b00)
	/home/judehung/MyDev/DevRepos/EdgeX/go-mod-messaging/internal/pkg/mqtt/client_test.go:444 +0x1ad
testing.tRunner(0xc000153b00, 0xc00004b650)
	/usr/local/go/src/testing/testing.go:1123 +0xef
created by testing.(*T).Run
	/usr/local/go/src/testing/testing.go:1168 +0x2b3
FAIL	github.com/edgexfoundry/go-mod-messaging/v2/internal/pkg/mqtt	0.009s
ok  	github.com/edgexfoundry/go-mod-messaging/v2/internal/pkg/redis/streams	0.014s	coverage: 77.1% of statements
?   	github.com/edgexfoundry/go-mod-messaging/v2/internal/pkg/redis/streams/mocks	[no test files]
--- FAIL: TestPublish (0.00s)
panic: runtime error: invalid memory address or nil pointer dereference [recovered]
	panic: runtime error: invalid memory address or nil pointer dereference
[signal SIGSEGV: segmentation violation code=0x1 addr=0x0 pc=0x5d78d6]

goroutine 8 [running]:
testing.tRunner.func1.1(0x60d6c0, 0x98bc50)
	/usr/local/go/src/testing/testing.go:1072 +0x30d
testing.tRunner.func1(0xc0000da300)
	/usr/local/go/src/testing/testing.go:1075 +0x41a
panic(0x60d6c0, 0x98bc50)
	/usr/local/go/src/runtime/panic.go:969 +0x1b9
sync.(*Mutex).Lock(...)
	/usr/local/go/src/sync/mutex.go:74
github.com/edgexfoundry/go-mod-messaging/v2/internal/pkg/zeromq.(*zeromqClient).bindToPort(0xc0000a3180, 0xc000018b60, 0xc, 0x0, 0x0)
	/home/judehung/MyDev/DevRepos/EdgeX/go-mod-messaging/internal/pkg/zeromq/client.go:235 +0x56
github.com/edgexfoundry/go-mod-messaging/v2/internal/pkg/zeromq.(*zeromqClient).Publish(0xc0000a3180, 0x0, 0x0, 0x646052, 0x3, 0xc000018b40, 0xa, 0xa, 0x0, 0x0, ...)
	/home/judehung/MyDev/DevRepos/EdgeX/go-mod-messaging/internal/pkg/zeromq/client.go:70 +0x268
github.com/edgexfoundry/go-mod-messaging/v2/internal/pkg/zeromq.TestPublish(0xc0000da300)
	/home/judehung/MyDev/DevRepos/EdgeX/go-mod-messaging/internal/pkg/zeromq/client_test.go:112 +0x125
testing.tRunner(0xc0000da300, 0x65a8f8)
	/usr/local/go/src/testing/testing.go:1123 +0xef
created by testing.(*T).Run
	/usr/local/go/src/testing/testing.go:1168 +0x2b3
FAIL	github.com/edgexfoundry/go-mod-messaging/v2/internal/pkg/zeromq	0.006s
ok  	github.com/edgexfoundry/go-mod-messaging/v2/messaging	0.003s	coverage: 85.7% of statements
ok  	github.com/edgexfoundry/go-mod-messaging/v2/messaging/mqtt	0.003s	coverage: 100.0% of statements
ok  	github.com/edgexfoundry/go-mod-messaging/v2/messaging/redis	0.003s	coverage: 100.0% of statements
ok  	github.com/edgexfoundry/go-mod-messaging/v2/pkg/types	0.004s	coverage: 90.9% of statements
FAIL
Makefile:6: recipe for target 'test' failed
make: *** [test] Error 1

Allow MQTT MessageHandler to receive other formats of MQTT message payload

The current MQTT MessageHandler only accepts the message payload that can be unmarshaled to MessageEnvelope. However, in some scenarios, the message payload may not fit the MessageEnvelope structure. e.g. when using the MQTT message client in app-service as trigger and the SubscribeHost is AWS IoT instead of EdgeX CoreData, the message payload should have its own format (in this case, it has the AWS IoT Device Shadow document format).

`correlationId` constant value doesn't match that used by core data

The Correlation ID key was recently change from correlation-id to X-Correlation-ID in go-mod-core-contracts, which edgex-go uses when the correlation-id is saved in the Context that is passed to NewMessageEnvelope.

The value for the correlationId constant used to extract the correlation-id from the context must match the new value of X-Correlation-ID.

Cannot connect to MQTT Message bus

Below is Golang code,

package main

import (
	"context"
	"fmt"
	"github.com/edgexfoundry/go-mod-core-contracts/clients/coredata"
	"github.com/edgexfoundry/go-mod-core-contracts/clients/urlclient/local"
	"github.com/edgexfoundry/go-mod-core-contracts/models"
	"github.com/edgexfoundry/go-mod-messaging/messaging"
	"github.com/edgexfoundry/go-mod-messaging/pkg/types"
	"log"
	"time"
)

func main() {
	var msgConfig2 = types.MessageBusConfig{
		PublishHost: types.HostInfo{
			Host:     "10.211.55.11",
			Port:     1883,
			Protocol: "tcp",
		},
		Optional:map[string]string{
			"ClientId": "0001_client_id",
		},
		Type:messaging.MQTT,
	}
	if msgClient, err := messaging.NewMessageClient(msgConfig2); err != nil {
		log.Fatal(err)
	} else {
		if ec := msgClient.Connect(); ec != nil {
			log.Fatal(ec)
		}
		client := coredata.NewEventClient(local.New("test1"))
		var testEvent = models.Event{Device: "demo1", Created: 123, Modified: 123, Origin: 123}
		var r1 = models.Reading{Pushed: 123, Created: 123, Origin: 123, Modified: 123, Device: "test device name", Name: "Temperature", Value: "20"}
		var r2 = models.Reading{Pushed: 123, Created: 123, Origin: 123, Modified: 123, Device: "test device name", Name: "Humidity", Value: "30"}

		testEvent.Readings = append(testEvent.Readings, r1, r2)

		data, err := client.MarshalEvent(testEvent)
		if err != nil {
			fmt.Errorf("unexpected error MarshalEvent %v", err)
		} else {
			fmt.Println(string(data))
		}

		env := types.NewMessageEnvelope([]byte(data), context.Background())
		env.ContentType = "application/json"

		if e := msgClient.Publish(env, "application"); e != nil {
			log.Fatal(e)
		} else {
			fmt.Printf("pubToAnother successful: %s\n", data)
		}
		time.Sleep(1500 * time.Millisecond)
	}
}

Make sure that the MQTT broker is available, but it reports timeout error.

telnet 10.211.55.11 1883
Trying 10.211.55.11...
Connected to 10.211.55.11.
Escape character is '^]'.
^]
telnet> quit
Connection closed.
RockydeMBP-2:kuiper rockyjin$ xsql/test/test 
2020/03/31 15:02:04 Timeout occured while performing a 'Connect' operation: Unable to connect

Add TLS support to MQTT Client

Following the work done in #40, we need a way to allow for configuring the tls.Config struct for the underlying MQTT client so that we can secure the communication.

Allow "clean session" config option in MQTT implementation.

This is important for when an application on the subscribing end goes down, the data can be retrieved from MQTT broker. The specific use case I'm trying to handle is if the kuiper rules engine restarts, right now there is no easy way to retrieve the data missed during the shutdown period.

QoS has no effect on the behavior of MQTT msgbus

In functions Publish and Subscribe for MQTT implementation, WillQoS and WillRetained are used instead of the QoS and Retained set in the msgbus configuration. Also, WillQoS and WillRetained are not set anywhere in this module so their default values are used always.

zeromq client times out/loses connection after not sending any messages

Issue description

When using app functions SDK to subscribe to zeromq, I notice if I stop ingesting data for a few time and then try to ingest again, the client loses connection and stops receiving events (not throwing any errors or logs). It will work again only if I restart the container.

After some investigation, this can be solved by setting TCP_KEEPALIVE, TCP_KEEPALIVE_IDLE and TCP_KEEPALIVE_INTVL through http://api.zeromq.org/3-2:zmq-setsockopt

reference: https://stackoverflow.com/a/45109409

The golang dependency pebbe/zmq4 exposes the functions to set KEEPALIVE settings https://github.com/pebbe/zmq4/blob/master/socketset.go#L292-L315

Another potential solution is to keep the connection alive by sending a ping every second or so from the publisher (core-data).

Environment

  • Ubuntu 16.04
  • EdgeX Edinburgh 1.0.1
  • App functions SDK v0.2.0-dev.8
  • go-mod-core-contracts v0.1.0

Replace ZeroMQ library for a pure golang ZeroMQ library

Currently the ZeroMQ library forces EdgeX to use cgo, but there are several reasons to use a pure golang ZeroMQ library instead of cgo.

Using a pure golang ZeroMQ will remove the dependency of external libraries, resolving issues like #54, making easier to work with EdgeX, also making possible to cross compile, and several other benefits.

EdgeX is using pebbe/zmq4, which forces the use of cgo. But this library can be replaced by go-zeromq/zmq4, which is a pure golang ZeroMQ library.

Add received on Topic to MessageEnvelope

The subscribe handlers need to set the Topic in the MessageEnvelope that the message was received on.

This will allows clients, like app services, to have topic specific logic for handling the incoming messages.

The README is outdated

We should update the README and sample code. The Redis Stream should be replaced with Redis pub/sub.

RediStreams implementation doesn't support topic wildcards

In our Ireland implementation, Events are now published to the Message Bus using multi level topics like: edgex/events/<profile>/<device>/<source>
Example: edgex/events/Random-Integer/Random-Integer-Device-001/Int64

For ZMQ and MQTT clients can then subscribe using wild cards to filter what they receive.

Examples to receive all Events:
ZMQ: edgex/events
MQTT: edgex/events/#

Examples to receive just Events from the Random-Integer-Device-001 device
ZMQ: edgex/events/Random-Integer/Random-Integer-Device
MQTT: edgex/events/Random-Integer/Random-Integer-Device/#

The RedisStreams implementation of the Message Bus doesn't recognize any type of wild card in the subscribe topic.
If an Event is published to edgex/events/Random-Integer/Random-Integer-Device-001/Int64, the client has to subscribe to edgex/events/Random-Integer/Random-Integer-Device-001/Int64

Tests should run cleanly when passed the -race option

To ensure our tests (and to some extent production code) are reliable, we should be running unit tests with the -race argument in the Makefile. Currently, the tests fail when run in this mode.

Failing tests:

TestDisconnect()

This test should succeed when passed -race, and then we should add -race to the test target in our Makefile.

Improve ZeroMQ Connection w/r/t Concurrency

I am seeing an issue that I believe is related to a race condition in the ZeroMQ connection logic. If a tight enough group of readings comes through when the client is initialized, it can cause more than one goroutine to attempt to bind to the ZeroMQ port. When this happens, it can cause a condition where messages are logged as being published successfully but they never reach their destination (such as core-data publishing to export-distro). Recommend connecting to ZeroMQ be implemented in a concurrency-safe manner.

Add minimum supported Go version to go.mod

Locally, I've upgraded to Go 1.13 to test it out. The project hasn't officially rev'ed to that version yet. One of the things I notice is that when I do a make test on a module, an entry indicating go 1.13 as the minimum supported version is put into the go.mod. According to the following article, we should have started seeing this behavior with Go 1.12, but I don't recall that being the case

https://golang.org/doc/go1.12#modules

I wouldn't want this to be checked in as go 1.13 until the project moves to that version. I've tested compilation of the edgex-go services locally with the version directive set to go 1.12 and everything works as far as I can tell. Again, this directive is only indicating a minimum supported version.

I think we should do this fairly soon as we could easily get contributions from people who've upgraded to Go 1.13 and who might miss this small edit.

MQTT Client Drops Subscriptions on reconnect

We have been dealing with an unstable network connection the last few days and we are seeing services subscribing through an MQTT messagebus binding left in a zombie state. It seems like the problem is no reconnect behavior specified to the underlying paho client, so after reconnection the channels used in subscription simply go quiet.

Add Mock case to NewMessageClient

For the purposes of unit testing, recommend the NewMessageClient be extended to return a mock implementation of the MessageClient interface. This would increase our ability to unit test logic like the following from edgexfoundry/edgex-go#1185

func initMessaging(messageErrors chan error, messages chan *types.MessageEnvelope) {
	// Create the messaging client
	msgClient, err := messaging.NewMessageClient(types.MessageBusConfig{
		SubscribeHost: types.HostInfo{
			Host:     Configuration.MessageQueue.Host,
			Port:     Configuration.MessageQueue.Port,
			Protocol: Configuration.MessageQueue.Protocol,
		},
		Type: Configuration.MessageQueue.Type,
	})

	if err != nil {
		LoggingClient.Error("failed to create messaging client: " + err.Error())
		return
	}

	if err := msgClient.Connect(); err != nil {
		LoggingClient.Error("failed to connect to message bus: " + err.Error())
		return
	}

	topics := []types.TopicChannel{
		{
			Topic:    Configuration.MessageQueue.Topic,
			Messages: messages,
		},
	}

	LoggingClient.Info("Connecting to incoming message bus at: " + Configuration.MessageQueue.Uri())

	err = msgClient.Subscribe(topics, messageErrors)
	if err != nil {
		LoggingClient.Error("failed to subscribe for event messages: " + err.Error())
		return
	}

	LoggingClient.Info("Connected to inbound event messages for topic: " + Configuration.MessageQueue.Topic)
}

The above function could be refactored with the following signature wherein the supplied MessageClient is a mock initialized during initializeClient on init.go

func initMessaging(client messaging.MessageClient) (chan error, chan *types.MessageEnvelope){
...
}

Add checksum property to Message Envelope

With the CBOR work being done in edgex-go issue #1216, we need to be able to communicate the checksum to use when referencing an Event so that consumers can ack the message via core-data API call. We can add that information in the message envelope for users to use when applicable. For more details please see the comments in the edgex-go issue.

ZeroMQ and Windows

Since the ZeroMQ implementation has so many hoops to jump through and external libraries to compile, we should consider excluding ZeroMQ from all Windows based builds by default and relying on MQTT.

Refactor to move messaging folder out of /pkg

factory.go imports from /internal so it should not be under /pkg as this breaks the intent of separating /internal from /pkg.

The messaging folder simply need to be moved up to the root level.

Added New function for MessageEnvelope

There's too much explicit knowledge required for a publisher to publish with respect to creating the MessageEnvelope. Add NewMessageEnvelope something like this:

func NewMessageEnvelope(payload []byte, ctx context.Context) MessageEnvelope {
   env := msgTypes.MessageEnvelope{}
   env.CorrelationID = {get this from ctx}
   env.ContentType = {get this from ctx}
   env.Payload = payload

   return env
}

The host, port & protocol configuration are lost with MQTT SubscribeHost

I specified below info (MQTT message bus with subscribed) as in below, but it reports timeout problem.

func subEventsFromMQTT() {
	var msgConfig1 = types.MessageBusConfig{
		SubscribeHost: types.HostInfo{
			Host:     "broker.emqx.io",
			Port:     1883,
			Protocol: "tcp",
		},
		Type:messaging.MQTT,
	}

	if msgClient, err := messaging.NewMessageClient(msgConfig1); err != nil {
		fmt.Printf("%s", err)
	} else {
		if ec := msgClient.Connect(); ec != nil {
			fmt.Printf("%s", ec)
		} else {
			//log.Infof("The connection to edgex messagebus is established successfully.")
			messages := make(chan types.MessageEnvelope)
			topics := []types.TopicChannel{{Topic: "events", Messages: messages}}
			err := make(chan error)
			if e := msgClient.Subscribe(topics, err); e != nil {
				//log.Errorf("Failed to subscribe to edgex messagebus topic %s.\n", e)
				fmt.Printf("%s", e)
			} else {
				var count int = 0
				for {
					select {
					case e1 := <-err:
						fmt.Printf("%s", "%s\n", e1)
						return
					case env := <-messages:
						count ++
						fmt.Printf("%s\n", env.Payload)
						//if count == 1 {
						//	return
						//}
					}
				}
			}
		}
	}
}

After checking code, I found that this is caused by method CreateMQTTClientConfiguration in edgexfoundry/[email protected]/internal/pkg/mqtt/client_options.go only extract information with PublishHost type. Does it mean that currently the SDK only supports to pub message to MQTT message bus?

// CreateMQTTClientConfiguration constructs a MQTTClientConfig based on the provided MessageBusConfig.
func CreateMQTTClientConfiguration(messageBusConfig types.MessageBusConfig) (MQTTClientConfig, error) {
	brokerUrl := messageBusConfig.PublishHost.GetHostURL()
	_, err := url.Parse(brokerUrl)
	if err != nil {
		return MQTTClientConfig{}, pkg.NewBrokerURLErr(fmt.Sprintf("Failed to parse broker: %v", err))
	}

	mqttClientOptions := CreateMQTTClientOptionsWithDefaults()
	err = load(messageBusConfig.Optional, &mqttClientOptions)
	if err != nil {
		return MQTTClientConfig{}, err
	}

	return MQTTClientConfig{
		BrokerURL:         brokerUrl,
		MQTTClientOptions: mqttClientOptions,
	}, nil
}

ZeroMQ send might be interrupted by system call after Go v1.14

Starting with Go 1.14, on Unix-like systems, you will get a lot of interrupted signal calls. See the top of a package documentation for a fix.

This issue would cause event missing at runtime.

Error log example:
level=ERROR ts=2020-11-23T05:44:17.994238292Z app=edgex-core-data source=event.go:303 msg="Unable to send message for event: {...} interrupted system call"

Reference:
https://pkg.go.dev/github.com/pebbe/zmq4#section-documentation

There are two options to prevent this.

The first option is to build your program with the environment variable:
GODEBUG=asyncpreemptoff=1

The second option is to let the program retry after an interrupted system call.

Remove Attribution.txt file

The Attribution.txt file is not needed since this is a module and doesn't create a direct artifact that is published.

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    ๐Ÿ–– Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. ๐Ÿ“Š๐Ÿ“ˆ๐ŸŽ‰

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google โค๏ธ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.