
Watermill


Watermill is a Go library for working efficiently with message streams. It is intended for building event-driven applications, enabling event sourcing, RPC over messages, sagas, and basically whatever else comes to your mind. You can use conventional pub/sub implementations like Kafka or RabbitMQ, but also HTTP or MySQL binlog, if that fits your use case.

Goals

  • Easy to understand.
  • Universal - event-driven architecture, messaging, stream processing, CQRS - use it for whatever you need.
  • Fast (see Benchmarks).
  • Flexible with middlewares, plugins and Pub/Sub configurations.
  • Resilient - using proven technologies and passing stress tests (see Stability).

Getting Started

Pick whichever you like best, or go through them in order:

  1. Follow the Getting Started guide.
  2. See examples below.
  3. Read the full documentation: https://watermill.io/

Our online hands-on training

Examples

Background

Building distributed and scalable services is rarely as easy as some may suggest. There is a lot of hidden knowledge that comes with writing such systems. Just like you don't need to know the whole TCP stack to create an HTTP REST server, you shouldn't need to study all of this knowledge to start building message-driven applications.

Watermill's goal is to make communication with messages as easy to use as HTTP routers. It provides the tools needed to begin working with event-driven architecture and allows you to learn the details on the go.

At the heart of Watermill there is one simple interface:

func(*Message) ([]*Message, error)

Your handler receives a message and decides whether to publish new message(s) or return an error. What happens next is up to the middlewares you've chosen.

You can find more about our motivations in our Introducing Watermill blog post.

Pub/Subs

All publishers and subscribers have to implement an interface:

type Publisher interface {
	Publish(topic string, messages ...*Message) error
	Close() error
}

type Subscriber interface {
	Subscribe(ctx context.Context, topic string) (<-chan *Message, error)
	Close() error
}

Supported Pub/Subs:

Documentation for all Pub/Sub implementations can be found in the documentation.

Unofficial libraries

Can't find your favorite Pub/Sub or library integration? Check Awesome Watermill.

If you know another library or are an author of one, please add it to the list.

Contributing

Please check our contributing guide.

Stability

Watermill v1.0.0 has been released and is production-ready. The public API is stable and will not change without changing the major version.

To ensure that all Pub/Subs are stable and safe to use in production, we created a set of tests that need to pass for each of the implementations before merging to master. All tests are also executed in stress mode - that means that we are running all the tests 20x in parallel.

All tests are run with the race detector enabled (the -race flag).

For more information about debugging tests, check the tests troubleshooting guide.

Benchmarks

Initial tools for benchmarking Pub/Subs can be found in watermill-benchmark.

All benchmarks are being done on a single 16 CPU VM instance, running one binary and dependencies in Docker Compose.

These numbers are meant to serve as a rough estimate of how fast messages can be processed by different Pub/Subs. Keep in mind that, depending on your setup and configuration, results can be vastly different (both much lower and much higher).

Here's the short version for message size of 16 bytes.

Pub/Sub                            Publish (messages/s)   Subscribe (messages/s)
GoChannel                                       331,882                  118,943
Redis Streams                                    61,642                   11,213
NATS JetStream (16 subscribers)                  49,255                   33,009
Kafka (one node)                                 44,090                  108,285
SQL (MySQL)                                       5,599                      167
SQL (PostgreSQL, batch size=1)                    3,834                      455
Google Cloud Pub/Sub                              3,689                   30,229
AMQP                                              2,702                   13,192

Support

If you didn't find the answer to your question in the documentation, feel free to ask us directly!

Please join us on the #watermill channel on the Three Dots Labs Discord.

Every bit of feedback is very welcome and appreciated. Please submit it using the survey.

Why the name?

It processes streams!

License

MIT License


watermill's Issues

Clarification for reading multiple events

In the docs, BookingsFinancialReport only listens for RoomBooked. What if it needed to listen to multiple event types to form the read model?

My guess is you'd have to have a composite of structs, each listening for a single event type. I wanted to avoid boilerplate, but I'm just making sure before moving forward.

SMTP-based MQ engine

I was looking for an SMTP-based MQ pub/sub capability: basically using Gmail or any other SMTP provider as a "poor man's" MQ for a small number of messages. Has anyone seen such a system that could be wrapped as a Watermill component?

Redis-based MQ

Redis 5.0 released a new data type: Stream.
That makes it a really powerful MQ now.
Besides, Redis below version 5 may use the "list" data type to implement an MQ (I think it fits a software framework that doesn't have many third-party dependencies).

[watermill-amqp] What is the mechanism of exception handling?

If my RabbitMQ restarts,

how do I reestablish the channel?

[watermill] 2020/03/13 08:59:33.510953 subscriber.go:166: level=INFO msg="Starting consuming from AMQP channel" amqp_exchange_name= amqp_queue_name=listen_result_b111ad19-e4c0-4730-805c-3a44b5b7727f topic=listen_result_b111ad19-e4c0-4730-805c-3a44b5b7727f

Better integration with opentracing

Jaeger is something of a multi-language standard for opentracing: https://www.jaegertracing.io/
The required header name

const MessageUUIDHeaderKey = "_watermill_message_uuid"

is Watermill-specific, but messages produced/consumed by this library can be used by anything else. So I can suggest a few things:

Why would it be cool to have it?
If, for example, two companies use Watermill and Jaeger, they currently have to do the same integration work twice. Built-in integration would be faster and more comfortable.

MySQL Binlog subscriber

A MySQL binlog subscriber may be useful for implementing a tool like Debezium.

To check before implementation:

This implementation should be added in the https://github.com/ThreeDotsLabs/watermill-sql/ repository. It needs to be compatible with the already existing SQL Subscriber.

That means you should use the SQL publisher for publishing messages, and the binlog subscriber to read them.
We would, of course, keep the already existing SQL-based subscriber.

Cross-languages example

@roblaszczak
The architectural constructs are really nice.
But if someone has to use Python or Rust or whatever for their "microservice", what options are there?

gRPC API wrapping the Watermill bus and SQL layers?
gRPC gateway wrapping gRPC?
Other ideas?

Or is this out of scope?

Retrieve Kafka topic partitions offset

I have an API service which needs to consume a compacted Kafka topic before it can be considered ready to handle any request traffic. How can I determine its readiness?

In this case, it would be best to know how many messages are left to process before the consumer has caught up with the latest messages on the topic. It seems there is currently no way to know this.

I suggest retrieving the partition offsets at the time the consumer subscribes to a topic (for all consumed partitions). An alternative would be to add a field to messages, for example an IsLatest flag, to let the consumer know that a message was the last one at the time of retrieval (of course, the offsets keep growing, so it should not be considered an absolute indication; it's time-sensitive).

The best I can do as a workaround for now is to infer it: I can use the throughput and assume I've reached the end of the topic once the number of messages/sec drops significantly (during the catch-up phase, the consumer processes as many messages as it can; afterwards, only as many as the "live" events currently produced on the topic).

I'll be happy to know other alternatives people might have come up with. Thanks.

Add context to the message

It might be a good idea to add a context.Context to the message (just like in case of http.Request) to allow passing information through API boundaries. This could be a way to implement distributed tracing for example.

amqp message rejected if header is not string

I've got a message with a header that is not a string, and it gets rejected by Watermill:
[watermill] 2019/04/17 15:48:14.371452 subscriber.go:245: level=ERROR msg="Processing message failed, sending nack" amqp_exchange_name=x amqp_queue_name=y err="metadata x-death is not a string, but []interface {}....

The x-death header is added by the dead letter exchange, and I would like to process this message again after some time in the DL.

My flow looks like this:

  1. try to process message,
  2. after few NACKs message gets moved to dead letter queue not to block main queue,
  3. there's policy on DL that moves message back to main queue after some few minutes to try to process it again.

The simplest solution is to ignore headers that are not strings and not add them to Metadata - this would work for me.

I'm willing to make a fix/PR, but please advise whether that's a good solution or whether I should do it in some other way (maybe flattening

YugabyteDB

Have you seen YugabyteDB? It's a 100% open equivalent of CockroachDB.
https://github.com/yugabyte/yugabyte-db

It uses a PostgreSQL API,
and has Redis and Cassandra APIs too, but it's all done with the underlying YugabyteDB engine.
Pretty cool, and its basis is event-driven. It calls the read-only instances Tablets (cute name).

So I noticed you only have a MySQL driver at the moment.

EDIT: My mistake. Looks like you have a PostgreSQL driver happening here: https://github.com/ThreeDotsLabs/watermill-sql/blob/master/pkg/sql/schema_adapter_postgresql.go
So I think this might be perfect for using the DB as a message queue.
YugabyteDB has a proper CDC engine, so it's possible to listen to changes and then populate the "pub/sub" tables as needed. At the moment I think the engine is Java-based (OMG :)).
So I raised this issue, and it looks like they are sort of going to make it work with Golang: yugabyte/yugabyte-db#2513

Failing "go get -u github.com/ThreeDotsLabs/watermill" on Windows

The following fails on my Windows machine. It looks like watermill has some exotic external dependencies which do not work out of the box on Windows, nor is it documented that bzr is required.

This makes it hard to get a working environment, at least on Windows. It would be great if this could be simplified, or if some of the external deps could be internalized.

$ go get -u github.com/ThreeDotsLabs/watermill
go: finding github.com/armon/go-metrics latest
go: finding github.com/streadway/amqp latest
go: finding github.com/pascaldekloe/goe latest
go: finding golang.org/x/net latest
go: finding golang.org/x/sync latest
go: finding golang.org/x/oauth2 latest
go: finding github.com/eapache/go-xerial-snappy latest
go: finding golang.org/x/crypto latest
go: finding github.com/google/btree latest
go: finding golang.org/x/sys latest
go: finding golang.org/x/lint latest
go: finding google.golang.org/genproto latest
go: finding golang.org/x/build latest
go: finding github.com/shurcooL/gopherjslib latest
go: labix.org/v2/[email protected]: bzr branch --use-existing-dir https://launchpad.net/mgo/v2 . in C:\private-stuff\go-workspace\pkg\mod\cache\vcs\ca61c737a32b1e09a0919e15375f9c2b6aa09860cc097f1333b3c3d29e040ea8: exit status 4:
    bzr: ERROR: httplib.IncompleteRead: IncompleteRead(34 bytes read)

    Traceback (most recent call last):
         File "bzrlib\commands.pyo", line 920, in exception_to_return_code
         File "bzrlib\commands.pyo", line 1131, in run_bzr
         File "bzrlib\commands.pyo", line 673, in run_argv_aliases
         File "bzrlib\commands.pyo", line 695, in run
         File "bzrlib\cleanup.pyo", line 136, in run_simple
         File "bzrlib\cleanup.pyo", line 166, in _do_with_cleanups
         File "bzrlib\builtins.pyo", line 1438, in run
         File "bzrlib\controldir.pyo", line 779, in open_tree_or_branch
         File "bzrlib\controldir.pyo", line 459, in _get_tree_branch
         File "bzrlib\bzrdir.pyo", line 1082, in open_branch
         File "bzrlib\branch.pyo", line 2375, in open
         File "bzrlib\controldir.pyo", line 687, in open
         File "bzrlib\controldir.pyo", line 716, in open_from_transport
         File "bzrlib\transport\__init__.pyo", line 1718, in do_catching_redirections
         File "bzrlib\controldir.pyo", line 704, in find_format
         File "bzrlib\controldir.pyo", line 1149, in find_format
         File "C:/Program Files (x86)/Bazaar/plugins\git\__init__.py", line 235, in probe_transport
         File "C:/Program Files (x86)/Bazaar/plugins\git\__init__.py", line 182, in probe_http_transport
         File "socket.pyo", line 348, in read
         File "httplib.pyo", line 522, in read
         File "httplib.pyo", line 565, in _read_chunked
    IncompleteRead: IncompleteRead(34 bytes read)

    bzr 2.5.1 on python 2.6.6 (Windows-post2008Server-6.2.9200)
    arguments: ['bzr', 'branch', '--use-existing-dir',
        'https://launchpad.net/mgo/v2', '.']
    plugins: bzrtools[2.5.0], changelog_merge[2.5.1], colo[0.4.0],
        explorer[1.2.2], fastimport[0.14.0dev], git[0.6.8], launchpad[2.5.1],
        loom[2.3.0dev], netrc_credential_store[2.5.1], news_merge[2.5.1],
        pipeline[1.4.0], qbzr[0.22.3], rewrite[0.6.4dev], svn[1.2.2],
        upload[1.2.0dev], xmloutput[0.8.8]
    encoding: 'cp1252', fsenc: 'mbcs', lang: 'en_US.UTF-8'

    *** Bazaar has encountered an internal error.  This probably indicates a
          bug in Bazaar.  You can help us fix it by filing a bug report at
              https://bugs.launchpad.net/bzr/+filebug
          including this traceback and a description of the problem.
go: finding grpc.go4.org latest
go: finding golang.org/x/net v0.0.0-20190108225652-1e06a53dbb7e
go: golang.org/x/[email protected]: unknown revision 1e06a53dbb7e
go: finding github.com/shurcooL/github_flavored_markdown latest
go: finding github.com/shurcooL/httpgzip latest
go: launchpad.net/[email protected]: bzr branch --use-existing-dir https://launchpad.net/~niemeyer/gocheck/trunk . in C:\private-stuff\go-workspace\pkg\mod\cache\vcs\f46ce2ae80d31f9b0a29099baa203e3b6d269dace4e5357a2cf74bd109e13339: exit status 4:
    bzr: ERROR: httplib.IncompleteRead: IncompleteRead(34 bytes read)

    Traceback (most recent call last):
         File "bzrlib\commands.pyo", line 920, in exception_to_return_code
         File "bzrlib\commands.pyo", line 1131, in run_bzr
         File "bzrlib\commands.pyo", line 673, in run_argv_aliases
         File "bzrlib\commands.pyo", line 695, in run
         File "bzrlib\cleanup.pyo", line 136, in run_simple
         File "bzrlib\cleanup.pyo", line 166, in _do_with_cleanups
         File "bzrlib\builtins.pyo", line 1438, in run
         File "bzrlib\controldir.pyo", line 779, in open_tree_or_branch
         File "bzrlib\controldir.pyo", line 459, in _get_tree_branch
         File "bzrlib\bzrdir.pyo", line 1082, in open_branch
         File "bzrlib\branch.pyo", line 2375, in open
         File "bzrlib\controldir.pyo", line 687, in open
         File "bzrlib\controldir.pyo", line 716, in open_from_transport
         File "bzrlib\transport\__init__.pyo", line 1718, in do_catching_redirections
         File "bzrlib\controldir.pyo", line 704, in find_format
         File "bzrlib\controldir.pyo", line 1149, in find_format
         File "C:/Program Files (x86)/Bazaar/plugins\git\__init__.py", line 235, in probe_transport
         File "C:/Program Files (x86)/Bazaar/plugins\git\__init__.py", line 182, in probe_http_transport
         File "socket.pyo", line 348, in read
         File "httplib.pyo", line 522, in read
         File "httplib.pyo", line 565, in _read_chunked
     IncompleteRead: IncompleteRead(34 bytes read)

     bzr 2.5.1 on python 2.6.6 (Windows-post2008Server-6.2.9200)
     arguments: ['bzr', 'branch', '--use-existing-dir',
        'https://launchpad.net/~niemeyer/gocheck/trunk', '.']
    plugins: bzrtools[2.5.0], changelog_merge[2.5.1], colo[0.4.0],
        explorer[1.2.2], fastimport[0.14.0dev], git[0.6.8], launchpad[2.5.1],
        loom[2.3.0dev], netrc_credential_store[2.5.1], news_merge[2.5.1],
        pipeline[1.4.0], qbzr[0.22.3], rewrite[0.6.4dev], svn[1.2.2],
        upload[1.2.0dev], xmloutput[0.8.8]
    encoding: 'cp1252', fsenc: 'mbcs', lang: 'en_US.UTF-8'

    *** Bazaar has encountered an internal error.  This probably indicates a
          bug in Bazaar.  You can help us fix it by filing a bug report at
          https://bugs.launchpad.net/bzr/+filebug
          including this traceback and a description of the problem.
go get: error loading module requirements

Integration with Go Cloud Development Kit pub/sub

Hey guys. First of all, compliments on the project! Recently I came across the pub/sub abstraction in the Go Cloud Development Kit: https://gocloud.dev/pages/pubsub. I think it would be a great addition to integrate with this toolkit. It would save us from writing our own implementations for AWS and Azure, and let us leverage the power of the CDK 🎉

Log error when handler produced messages and returned an error

When an error is returned from a handler, we discard the produced messages.

	producedMessages, err := handler(msg)
	if err != nil {
		h.logger.Error("Handler returned error", err, nil)
		msg.Nack()
		return
	}

	if err := h.publishProducedMessages(producedMessages, msgFields); err != nil {
		h.logger.Error("Publishing produced messages failed", err, nil)
		msg.Nack()
		return
	}

We should log an error in this situation.

Found data race on Close() (in Router and GoChannel). Version v1.0.0-rc.1

Hello! I found some data races (probably).
Am I doing something wrong, maybe (closing incorrectly)?

Code:

type Server struct {
	serverPubSub *gochannel.GoChannel
	clientPubSub *gochannel.GoChannel
}
...

logger := log.NewWatermillLogAdapter(lggr)
ServerPubSub = gochannel.NewGoChannel(gochannel.Config{}, logger)
ClientPubSub = gochannel.NewGoChannel(gochannel.Config{}, logger)

...
defer s.Stop()

func (s *Server) Stop() {
	err := s.clientPubSub.Close()
	if err != nil {
		panic(err)
	}
	err = s.serverPubSub.Close()
	if err != nil {
		panic(err)
	}
}
...

Logs for gochannel.GoChannel.Close() (for message.Publisher)
go test -failfast -count 1 -race ./ledger/light/integration -run '^Test_BootstrapCalls$'

==================
WARNING: DATA RACE
Write at 0x00c0000fa288 by goroutine 7:
  github.com/insolar/insolar/vendor/github.com/ThreeDotsLabs/watermill/pubsub/gochannel.(*GoChannel).Close()
      /Users/ilyamarkin/go/src/github.com/insolar/insolar/vendor/github.com/ThreeDotsLabs/watermill/pubsub/gochannel/pubsub.go:272 +0xad
  github.com/insolar/insolar/ledger/light/integration_test.(*Server).Stop()
      /Users/ilyamarkin/go/src/github.com/insolar/insolar/ledger/light/integration/server_test.go:483 +0x122
  github.com/insolar/insolar/ledger/light/integration_test.Test_BootstrapCalls()
      /Users/ilyamarkin/go/src/github.com/insolar/insolar/ledger/light/integration/light_test.go:64 +0x499
  testing.tRunner()
      /usr/local/Cellar/go/1.12.7/libexec/src/testing/testing.go:865 +0x163

Previous read at 0x00c0000fa288 by goroutine 81:
  github.com/insolar/insolar/vendor/github.com/ThreeDotsLabs/watermill/pubsub/gochannel.(*GoChannel).Publish()
      /Users/ilyamarkin/go/src/github.com/insolar/insolar/vendor/github.com/ThreeDotsLabs/watermill/pubsub/gochannel/pubsub.go:83 +0x61
  github.com/insolar/insolar/insolar/bus.(*Bus).sendTarget()
      /Users/ilyamarkin/go/src/github.com/insolar/insolar/insolar/bus/bus.go:261 +0xf81
  github.com/insolar/insolar/insolar/bus.(*Bus).SendRole()
      /Users/ilyamarkin/go/src/github.com/insolar/insolar/insolar/bus/bus.go:179 +0x5ba
  github.com/insolar/insolar/ledger/light/proc.(*HotObjects).sendConfirmationToHeavy()
      /Users/ilyamarkin/go/src/github.com/insolar/insolar/ledger/light/proc/hot_data.go:175 +0x64f
  github.com/insolar/insolar/ledger/light/proc.(*HotObjects).Proceed()
      /Users/ilyamarkin/go/src/github.com/insolar/insolar/ledger/light/proc/hot_data.go:157 +0x1368
  github.com/insolar/insolar/insolar/flow/internal/thread.(*Thread).procedure.func1()
      /Users/ilyamarkin/go/src/github.com/insolar/insolar/insolar/flow/internal/thread/thread.go:143 +0x56

Goroutine 7 (running) created at:
  testing.(*T).Run()
      /usr/local/Cellar/go/1.12.7/libexec/src/testing/testing.go:916 +0x65a
  testing.runTests.func1()
      /usr/local/Cellar/go/1.12.7/libexec/src/testing/testing.go:1157 +0xa8
  testing.tRunner()
      /usr/local/Cellar/go/1.12.7/libexec/src/testing/testing.go:865 +0x163
  testing.runTests()
      /usr/local/Cellar/go/1.12.7/libexec/src/testing/testing.go:1155 +0x523
  testing.(*M).Run()
      /usr/local/Cellar/go/1.12.7/libexec/src/testing/testing.go:1072 +0x2eb
  main.main()
      _testmain.go:76 +0x222

And this - data race for message.Router.Close():

inRouter, _ := message.NewRouter(message.RouterConfig{}, logger)
outRouter, _ := message.NewRouter(message.RouterConfig{}, logger)
...

defer s.Stop()
...

func (s *Server) Stop() {
	err := s.inRouter.Close()
	if err != nil {
		panic(err)
	}
	err = s.outRouter.Close()
	if err != nil {
		panic(err)
	}
}

Logs for message.Router.Close() (for message.Publisher)
go test -failfast -count 1 -race ./ledger/light/integration -run '^Test_BasicOperations$'

==================
WARNING: DATA RACE
Read at 0x00c00058e5a0 by goroutine 141:
  github.com/insolar/insolar/vendor/github.com/ThreeDotsLabs/watermill/message.(*Router).Close()
      /Users/ilyamarkin/go/src/github.com/insolar/insolar/vendor/github.com/ThreeDotsLabs/watermill/message/router.go:350 +0x4f
  github.com/insolar/insolar/ledger/light/integration_test.(*Server).Stop()
      /Users/ilyamarkin/go/src/github.com/insolar/insolar/ledger/light/integration/server_test.go:499 +0x122
  github.com/insolar/insolar/ledger/light/integration_test.Test_BasicOperations()
      /Users/ilyamarkin/go/src/github.com/insolar/insolar/ledger/light/integration/light_test.go:195 +0x4cf
  testing.tRunner()
      /usr/local/Cellar/go/1.12.7/libexec/src/testing/testing.go:865 +0x163

Previous write at 0x00c00058e5a0 by goroutine 229:
  github.com/insolar/insolar/vendor/github.com/ThreeDotsLabs/watermill/message.(*Router).Close()
      /Users/ilyamarkin/go/src/github.com/insolar/insolar/vendor/github.com/ThreeDotsLabs/watermill/message/router.go:353 +0x6c
  github.com/insolar/insolar/vendor/github.com/ThreeDotsLabs/watermill/message.(*Router).closeWhenAllHandlersStopped()
      /Users/ilyamarkin/go/src/github.com/insolar/insolar/vendor/github.com/ThreeDotsLabs/watermill/message/router.go:334 +0x168

Goroutine 141 (running) created at:
  testing.(*T).Run()
      /usr/local/Cellar/go/1.12.7/libexec/src/testing/testing.go:916 +0x65a
  testing.runTests.func1()
      /usr/local/Cellar/go/1.12.7/libexec/src/testing/testing.go:1157 +0xa8
  testing.tRunner()
      /usr/local/Cellar/go/1.12.7/libexec/src/testing/testing.go:865 +0x163
  testing.runTests()
      /usr/local/Cellar/go/1.12.7/libexec/src/testing/testing.go:1155 +0x523
  testing.(*M).Run()
      /usr/local/Cellar/go/1.12.7/libexec/src/testing/testing.go:1072 +0x2eb
  main.main()
      _testmain.go:76 +0x222

Middleware per handler

Currently, you are not able to use middleware per handler in a "clean" way (you can still manually wrap the handler function, or use a separate router).

It would be nice to be able to add middleware per handler.

As a good reference point, you can use chi router: https://github.com/go-chi/chi

Add CloudEvents integration

CloudEvents is a specification for sending events in the cloud native environment. They also provide SDKs.

It might be a good addition to the project to support CloudEvents. Although it's not stable yet, as of 0.3 the "core" is considered to be stable enough.

[watermill-kafka] Stress test fails in CI

Because of the high load, the Watermill Kafka tests are failing in CI. The problem also occurs locally unless I raise ulimit.

There are probably multiple solutions:

  • find a way to set ulimit in CI (I tried to do it in the docker-compose config, but it didn't help)
  • check whether we can fix something in the code to limit the load
  • reduce the number of parallel tests (not preferred)

Example failed build: https://circleci.com/gh/ThreeDotsLabs/watermill-kafka/235?utm_campaign=vcs-integration-link&utm_medium=referral&utm_source=github-build-link

Retry/Middlewares + CQRS busted?

Hey gang, I'm opening this as a quick sanity check. I am using the CQRS facade and ran into an interesting issue:

If a handler returns an error (or panics), it will retry that handler until the message is successfully processed. I tried using the Retry middleware, but it does not work. The reason is that this line

returns an err (which is the err out of the handler) up the stack, instead of processing it inside of the retry loop.

simply changing it to

return nil, nil

fixes the issue.

What's odd (but perhaps separate) is that the Recoverer middleware has no effect on actual panics in CQRS.

In addition, I observed some weird behavior with backoff, and taking a look at the code I don't see how it accounts for backoff time.

I am happy to submit a PR to change that line if it's wrong, but I am afraid I don't quite understand how messages are handled, and it could easily break something somewhere else.

[watermill-amqp] construct subscriptions using a single AMQP connection

At present, amqp/subscriber.go establishes a new AMQP connection each time a Subscriber is instantiated. RabbitMQ recommends sharing a single connection. AMQP connections are heavy, consuming ~100 KB of RAM each on the RabbitMQ side, and slow to establish. (Citation.)

Ideally, Watermill's amqp.NewSubscriber method would automatically reuse a single connection, creating a new Channel instead. If this isn't feasible, please consider adding a new method that accepts an existing *amqp.Connection parameter (vs the current amqp.Config param).

It's worth mentioning that the above might require updating the reconnect logic, to manage concurrent reconnect attempts by multiple subscribers sharing the same connection.

Event Sourcing Completeness

As of today, we have a fantastic infrastructure to build a complete Event Sourced system with a few components.

  • Messages
  • Pub/Sub
    • Routing
    • Middlewares

And we have a few components related to Event Sourcing, following the CQRS docs.

However, I would like to track the completeness of Watermill based on what you
are trying to accomplish.

Completeness could mean either it supports the feature with some level of
abstraction and/or has some documentation about it helping to implement by
themselves.

These are some notes on what I think the completeness list would be, but feel free to send feedback. I am more interested in starting a discussion and having a thread for it.

Commands

  • Command Bus: Message bus for commands.
  • Command: data structure representing a command.
  • Command Handler: handles the incoming commands.
  • Router: routes the incoming command to its command handler.
  • Validations: validates the command before dispatching the command.

Events

  • Event Bus: Message bus for events.
  • Event: data structure representing an event.
  • Event Handler: handles the incoming events.
  • Event Store: stores events.
  • Snapshot: a snapshot of the event stream.

Extra

Also, add some extra abstractions or documentation, helping people to deal with
common use cases.

  • Idempotence: refers to the ability of a system to produce the same
    outcome even if an event or message is received more than once.
  • Aggregate Id: This field is used to associate the particular event to
    a specific Aggregate Root.
  • Causation ID: the ID of the command causing an event, or the event
    causing a command dispatch.
  • Correlation ID: the ID used to correlate related commands/events.

Aggregates

  • Aggregates: is comprised of its state (Aggregate Root), public command
    functions (Command Handlers), and state mutators (Event Handlers).
  • Aggregate Root: data structure representing an aggregate.

Process Manager

  • Process Manager: is responsible for coordinating one or more
    workflows. It handles events and dispatches commands in response.

Saga

Saga: distribution of multiple workflows across multiple systems,
each providing a path (fork) of compensating actions if any of the steps
in the workflow fails.

Read Model Projection

  • Read Model Projection: read model can be built using an event handler
    and whichever storage provider you prefer.

Thoughts?

PostgreSQL LISTEN/NOTIFY Subscriber

The same as #5, but for PostgreSQL :)

This implementation should be added in the https://github.com/ThreeDotsLabs/watermill-sql/ repository. It needs to be compatible with the already existing SQL Subscriber.

That means you should use the SQL publisher for publishing messages, and the new subscriber to read them.
We would, of course, keep the already existing SQL-based subscriber.

I'm no expert in the PostgreSQL area, so any comments and ideas are welcome! 😉

It probably depends on #127.

Wildcard subscription

It would be convenient to be able to subscribe to multiple topics via a wildcard character (asterisk):

Subscribe(ctx, "user.*")

Cannot install latest master

Trying to install the latest master using Go modules and a replace directive:

replace github.com/ThreeDotsLabs/watermill v0.4.0 => github.com/ThreeDotsLabs/watermill v0.4.1-0.20190601181058-54fc7f5042f7

I receive the following error:

go: github.com/nats-io/[email protected]: unknown revision v0.4.5
go: error loading module requirements

Looks like the repo has been archived and there is no such revision.

reuse nats streaming connection

I think it would be a good idea to reuse the NATS connection if I need to run a publisher and a subscriber in one program.

The scenario is as follows:

I need to subscribe to a queue to get commands from another agent, and send replies by issuing events on another queue. So I need to set up a publisher and a subscriber in one program, but I need to provide two different client IDs.

HTTP Publisher

The HTTP publisher implementation will work like webhooks, based on produced messages.

Define interface for infrastructure component

I think it is a good idea to define an interface for different infrastructure, to unify the message processing logic. It would also help when we need to write unit tests.

Here we defined an interface for Config.

End users may need to get configuration from the command prompt. There are also a few utilities to bind command flags with config files and environment variables, such as github.com/spf13/pflag. And there would be no need to handle type conversion etc. if the config struct used such utilities.
