
mbassador's Introduction



MBassador is a light-weight, high-performance event bus implementing the publish subscribe pattern. It is designed for ease of use and aims to be feature rich and extensible while preserving resource efficiency and performance.

The core of MBassador is built around a custom data structure that provides non-blocking reads and minimized lock contention for writes such that performance degradation of concurrent read/write access is minimal. Benchmarks that illustrate the advantages of this design are available in this github repository.


The code is production ready: 86% instruction coverage, 82% branch coverage with randomized and concurrently run test sets, and no major bug has been reported in the last 18 months. No modifications to the core will be made without thoroughly testing the code.

Usage | Features | Installation | Wiki | Release Notes | Integrations | Credits | Contribute | License


Using MBassador in your project is very easy. Create as many instances of MBassador as you like (usually a singleton will do) with bus = new MBassador(), mark and configure your message handlers with @Handler annotations, and finally register the listeners at any MBassador instance with bus.subscribe(aListener). Start sending messages to your listeners using one of MBassador's publication methods, bus.post(message).now() or bus.post(message).asynchronously().

As a first reference, consider this illustrative example. You might want to have a look at the collection of examples to see its features in more detail.

// Define your handlers

@Listener(references = References.Strong)
class SimpleFileListener{

    @Handler
    public void handle(File file){
      // do something with the file
    }

    @Handler(delivery = Invoke.Asynchronously)
    public void expensiveOperation(File file){
      // do something with the file
    }

    @Handler(condition = "msg.size >= 10000")
    @Enveloped(messages = {HashMap.class, LinkedList.class})
    public void handleLarge(MessageEnvelope envelope) {
       // handle objects without common super type
    }
}

// somewhere else in your code

MBassador bus = new MBassador();
bus.subscribe(new SimpleFileListener());
bus.post(new File("/tmp/smallfile.csv")).now();
bus.post(new File("/tmp/bigfile.csv")).asynchronously();


Annotation driven

Annotation    Function
@Handler      Mark a method as message handler
@Listener     Can be used to customize listener wide configuration like the used reference type
@Enveloped    A message envelope can be used to pass messages of different types into a single handler
@Filter       Add filtering to prevent certain messages from being published

Delivers everything, respects type hierarchy

Messages do not need to implement any interface and can be of any type. The class hierarchy of a message is considered during message delivery, such that handlers will also receive subtypes of the message type they consume for - e.g. a handler of Object.class receives everything. Messages that do not match any handler result in the publication of a DeadMessage object which wraps the original message. DeadMessage events can be handled by registering listeners that handle DeadMessage.
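The delivery rule described above can be modelled with plain JDK classes. The following is a minimal sketch, not MBassador's implementation: TypeAwareBus and DeadEvent are hypothetical names, and handlers are registered as lambdas instead of annotated methods.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Simplified model of type-aware delivery; NOT MBassador's actual implementation.
class TypeAwareBus {

    // Hypothetical stand-in for MBassador's DeadMessage wrapper.
    static final class DeadEvent {
        final Object original;
        DeadEvent(Object original) { this.original = original; }
    }

    private static final class Subscription {
        final Class<?> type;
        final Consumer<Object> handler;
        Subscription(Class<?> type, Consumer<Object> handler) {
            this.type = type;
            this.handler = handler;
        }
    }

    private final List<Subscription> subscriptions = new ArrayList<>();

    <T> void subscribe(Class<T> type, Consumer<? super T> handler) {
        subscriptions.add(new Subscription(type, msg -> handler.accept(type.cast(msg))));
    }

    // Returns the number of handlers that received the original message.
    int publish(Object message) {
        int delivered = deliver(message);
        if (delivered == 0 && !(message instanceof DeadEvent)) {
            // No handler matched: wrap the message and deliver the wrapper instead.
            deliver(new DeadEvent(message));
        }
        return delivered;
    }

    private int deliver(Object message) {
        int count = 0;
        for (Subscription s : subscriptions) {
            // isInstance is true for the handler's type and all of its subtypes.
            if (s.type.isInstance(message)) {
                s.handler.accept(message);
                count++;
            }
        }
        return count;
    }
}
```

In this model a handler subscribed for Number.class also receives Integer messages, and a String published to a bus without a matching handler reaches only the DeadEvent handlers.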

Synchronous and asynchronous message delivery

There are two types of (a-)synchronicity when using MBassador: message dispatch and handler invocation.

Message dispatch

Synchronous dispatch means that the publish method blocks until all handlers have been processed. Note: This does not necessarily imply that each handler has been invoked and received the message, because synchronous dispatch can be combined with asynchronous handlers. This is the semantics of publish(Object obj) and post(Object obj).now()

Asynchronous dispatch means that the publish method returns immediately and the message will be dispatched in another thread (fire and forget). This is the semantics of publishAsync(Object obj) and post(Object obj).asynchronously()

Handler invocation

Synchronous handlers are invoked sequentially and from the same thread within a running publication. With asynchronous handlers, the actual handler invocation is pushed to a queue that is processed by a pool of worker threads.
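The difference between the two dispatch modes can be sketched with a plain ExecutorService. This is an illustration under simplifying assumptions, not MBassador's code: DispatchModesBus is a hypothetical class, all handlers are synchronous, and a single dispatcher thread is used.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

// Simplified model of the two dispatch modes; NOT MBassador's actual implementation.
class DispatchModesBus implements AutoCloseable {

    private final List<Consumer<Object>> handlers = new CopyOnWriteArrayList<>();

    // One daemon dispatcher thread, so asynchronous messages are dispatched in FIFO order.
    private final ExecutorService dispatcher = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "dispatcher");
        t.setDaemon(true);
        return t;
    });

    void subscribe(Consumer<Object> handler) { handlers.add(handler); }

    // Synchronous dispatch: returns only after every handler has been called.
    void publish(Object message) {
        for (Consumer<Object> h : handlers) h.accept(message);
    }

    // Asynchronous dispatch: returns immediately; the future completes once dispatch is done.
    CompletableFuture<Void> publishAsync(Object message) {
        return CompletableFuture.runAsync(() -> publish(message), dispatcher);
    }

    @Override
    public void close() { dispatcher.shutdown(); }
}
```

Returning a future from publishAsync is a choice made for this sketch so that callers can wait for completion when they need to; a pure fire-and-forget variant would simply discard it.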

Configurable reference types

By default, MBassador uses weak references for listeners to relieve the programmer of the need to explicitly unsubscribe listeners that are not used anymore and to avoid memory leaks. This is very comfortable in container managed environments where listeners are created and destroyed by frameworks, e.g. Spring, Guice etc. Just add everything to the bus, it will ignore objects without handlers and automatically clean up orphaned weak references after the garbage collector has done its job.

Instead of using weak references, a listener can be configured to be referenced using strong references using @Listener(references=References.Strong). Strongly referenced listeners will stick around until explicitly unsubscribed.
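The two reference types can be illustrated with java.lang.ref.WeakReference. The sketch below is a simplified model and not MBassador's data structure: ReferenceAwareBus and its methods are hypothetical, and it is not thread-safe.

```java
import java.lang.ref.WeakReference;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;

// Simplified model of weakly and strongly referenced listeners; NOT MBassador's implementation.
class ReferenceAwareBus {

    private final List<WeakReference<Consumer<Object>>> weak = new ArrayList<>();
    private final List<Consumer<Object>> strong = new ArrayList<>();

    void subscribeWeak(Consumer<Object> listener) { weak.add(new WeakReference<>(listener)); }

    void subscribeStrong(Consumer<Object> listener) { strong.add(listener); }

    boolean unsubscribe(Consumer<Object> listener) {
        boolean removedWeak = weak.removeIf(ref -> ref.get() == listener);
        boolean removedStrong = strong.remove(listener);
        return removedWeak || removedStrong;
    }

    // Returns the number of listeners that received the message.
    int publish(Object message) {
        int delivered = 0;
        for (Iterator<WeakReference<Consumer<Object>>> it = weak.iterator(); it.hasNext(); ) {
            Consumer<Object> listener = it.next().get();
            if (listener == null) {
                it.remove();   // referent was garbage collected: drop the orphaned entry
            } else {
                listener.accept(message);
                delivered++;
            }
        }
        for (Consumer<Object> listener : strong) {
            listener.accept(message);
            delivered++;
        }
        return delivered;
    }
}
```

A weakly subscribed listener keeps receiving messages only while something else in the application still references it; a strongly subscribed one stays until unsubscribe is called.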

Message filtering

MBassador offers static message filtering. Filters are configured using annotations and multiple filters can be attached to a single message handler. Since version 1.2.0 Java EL expressions in @Handler are another way to define conditional message dispatch. Messages that have matching handlers but do not pass the configured filters result in the publication of a FilteredMessage object which wraps the original message. FilteredMessage events can be handled by registering listeners that handle FilteredMessage.

Note: Since version 1.3.1 it is possible to wrap a filter in a custom annotation for reuse

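    public static final class RejectAllFilter implements IMessageFilter {
        @Override
        public boolean accepts(Object event,  SubscriptionContext context) {
            return false;
        }
    }

    @IncludeFilters({@Filter(RejectAllFilter.class)})
    @Retention(RetentionPolicy.RUNTIME)
    public @interface RejectAll {}

    public static class FilteredMessageListener{
        // will cause republication of a FilteredEvent
        @Handler
        @RejectAll
        public void handleNone(Object any){
            // never reached: RejectAllFilter rejects every message
        }
    }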

Enveloped messages

Message handlers can declare to receive an enveloped message using @Enveloped. The envelope can wrap different types of messages to allow a single handler to handle multiple, unrelated message types.

Handler priorities

A handler can be associated with a priority to influence the order in which messages are delivered when multiple matching handlers exist.
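Priority-ordered delivery can be sketched as a sorted handler list. This is only a model: PriorityBus is a hypothetical class, and the convention that higher values are invoked first is an assumption of the sketch.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.function.Consumer;

// Simplified model of priority-ordered delivery; NOT MBassador's actual implementation.
class PriorityBus {

    private static final class Entry {
        final int priority;
        final Consumer<Object> handler;
        Entry(int priority, Consumer<Object> handler) {
            this.priority = priority;
            this.handler = handler;
        }
    }

    private final List<Entry> entries = new ArrayList<>();

    void subscribe(int priority, Consumer<Object> handler) {
        entries.add(new Entry(priority, handler));
        // Assumption for this sketch: higher priority values are invoked first.
        // List.sort is stable, so equal priorities keep their subscription order.
        entries.sort(Comparator.comparingInt((Entry e) -> e.priority).reversed());
    }

    void publish(Object message) {
        for (Entry e : entries) e.handler.accept(message);
    }
}
```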

Custom error handling

Errors during message delivery are sent to all registered error handlers which can be added to the bus as necessary.
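The idea of forwarding delivery errors to registered error handlers can be sketched as follows. ErrorReportingBus is a hypothetical class and the BiConsumer-based handler shape is an assumption of this sketch, not the interface MBassador uses.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

// Simplified model of publication error handling; NOT MBassador's actual implementation.
class ErrorReportingBus {

    private final List<Consumer<Object>> handlers = new CopyOnWriteArrayList<>();
    // Hypothetical error-handler shape: receives the failing message and the cause.
    private final List<BiConsumer<Object, Throwable>> errorHandlers = new CopyOnWriteArrayList<>();

    void subscribe(Consumer<Object> handler) { handlers.add(handler); }

    void addErrorHandler(BiConsumer<Object, Throwable> errorHandler) { errorHandlers.add(errorHandler); }

    void publish(Object message) {
        for (Consumer<Object> handler : handlers) {
            try {
                handler.accept(message);
            } catch (RuntimeException e) {
                // A failing handler must not prevent delivery to the remaining handlers.
                for (BiConsumer<Object, Throwable> eh : errorHandlers) eh.accept(message, e);
            }
        }
    }
}
```

Catching the exception per handler keeps one faulty listener from silently stopping delivery to the others, which is the main reason to route errors through a separate channel.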


MBassador is designed to be extensible with custom implementations of various components like message dispatchers and handler invocations (using the decorator pattern), metadata reader (you can add your own annotations) and factories for different kinds of objects. A configuration object is used to customize the different configurable parts, see Features


MBassador is available from the Maven Central Repository using the following coordinates:


You can also download binary releases and javadoc from the Maven Central repository. Of course you can always clone the repository and build from source.


There is ongoing effort to extend documentation and provide code samples and detailed explanations of how the message bus works. Code samples can also be found in the various test cases. Please read about the terminology used in this project to avoid confusion and misunderstanding.


There is a spring-extension available to support CDI-like transactional message sending in a Spring environment. This is a good example of integration with other frameworks. Another example is the Guice integration.


The initial inspiration for creating this component comes from Google Guava's event bus implementation. I liked the simplicity of its design and I trust in the code quality of google libraries. Unfortunately it uses strong references only.

Thanks to all contributors, especially

Many thanks also to ej-technologies for providing an open source license of JProfiler and Jetbrains for a license of IntelliJ IDEA

OSS used by MBassador: jUnit | maven | mockito | slf4j | Odysseus JUEL


Pick an issue from the list of open issues and start implementing. Make your PRs small and provide test code! Take a look at this issue for a good example.

Note: Due to the complexity of the data structure and synchronization code it took quite a while to get a stable core. New features will only be implemented if they do not require significant modification to the core. The primary focus of MBassador is to provide high-performance extended pub/sub.

Sample code and documentation are both much appreciated contributions. Integration with different frameworks is especially valuable. Feel free to create Wiki pages to share your code and ideas. Example: Guice integration


This project is distributed under the terms of the MIT License. See file "LICENSE" for further reference.

mbassador's People


bdavisx, bennidi, bigbear3001, chriskingnet, cyberoblivion, dependabot[bot], durron597, kashike, kenfromnn, kolybelkin, leif81, lennartj, manish364824, merjadok, rossi1337, toddcostella, yaronyam



mbassador's Issues

sync/async publish/handling

A question over sync/async publish/handling.
My need is to ALWAYS publish in a 'fire and forget' way with a FIFO policy, so I guess I should always use the publishAsync() method. If I got it correctly, MBassador will manage these messages with a pool of internal threads.
What I think I don't understand properly (after reading the Javadoc) is the @Handler policy, in particular the delivery mode and the synchronization option, and how they affect the behaviour of both the MBassador threads and the listener threads. I hope I made my doubt clear.

Vetoing feature


Is it possible to have a vetoing capability in a future release? By vetoing I mean a special handler annotation such as VetoingHandler whose methods would be invoked before the normal Handlers. Every VetoingHandler-annotated method would return a boolean indicating whether it vetoes the handling of the event by the normal Handlers. If any of the VetoingHandlers returns false, the handling of the current event stops and no Handler is invoked.
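To make the request concrete, here is a minimal sketch of the proposed behaviour in plain Java. VetoingBus and its methods are hypothetical and not part of MBassador; vetoing handlers are modelled as predicates that return false to veto.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;

// Sketch of the requested behaviour; VetoingBus and its methods are hypothetical.
class VetoingBus {

    private final List<Predicate<Object>> vetoers = new ArrayList<>();
    private final List<Consumer<Object>> handlers = new ArrayList<>();

    // A vetoing handler returns false to veto the event.
    void subscribeVetoing(Predicate<Object> vetoer) { vetoers.add(vetoer); }

    void subscribe(Consumer<Object> handler) { handlers.add(handler); }

    // Returns true if the event reached the normal handlers, false if it was vetoed.
    boolean publish(Object event) {
        for (Predicate<Object> vetoer : vetoers) {
            if (!vetoer.test(event)) return false;   // first veto stops all further handling
        }
        for (Consumer<Object> handler : handlers) handler.accept(event);
        return true;
    }
}
```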

Thank you!

Use of interfaces for messages

Perhaps I am just dense or missing the documentation, but it seems like MBassador does not support interface message types. i.e.

@Handler
public void handleTestMessageStrong(TestMessage message) {
   // do something
}

In this example, if TestMessage is an interface the handler will not be invoked. Is that correct? (It seems to be so in my testing.) If so, it might be worthwhile to clarify that in the documentation. I think class hierarchy sub-types are generally considered to include interfaces in Java.
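For reference, this is how the set of types a message could be delivered as can be computed in plain Java when interfaces are included. It is a JDK-only sketch with hypothetical names (InterfaceMatchDemo, ConcreteMessage) and makes no claim about how MBassador matches handlers internally.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-JDK illustration; it makes no claim about how MBassador matches handlers.
class InterfaceMatchDemo {

    interface TestMessage { }                                    // hypothetical message interface
    static final class ConcreteMessage implements TestMessage { }

    // Collects every type a message could be delivered as: its class,
    // all superclasses, and all interfaces implemented along the way.
    static List<Class<?>> deliverableTypes(Class<?> messageType) {
        List<Class<?>> result = new ArrayList<>();
        for (Class<?> c = messageType; c != null; c = c.getSuperclass()) {
            if (!result.contains(c)) result.add(c);
            collectInterfaces(c, result);
        }
        return result;
    }

    private static void collectInterfaces(Class<?> type, List<Class<?>> into) {
        for (Class<?> i : type.getInterfaces()) {
            if (!into.contains(i)) into.add(i);
            collectInterfaces(i, into);   // interfaces may extend other interfaces
        }
    }
}
```

Walking only getSuperclass() would miss TestMessage here, which is one way a bus could end up ignoring handlers declared on an interface type.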

mbassador in the GWT environment

In the GWT or SmartGWT I found two issues:

  1. The library cannot be used because there is no GWT module.
  2. If I try to include the source to be compiled in the GWT environment it won't compile due to the reference to threading API.

My request is that one of the two solutions below be provided:

A. A version of the library that 1) also has a GWT module for inheritance and 2) has no reference to threading API

B. A source version which includes no reference to threading API that can be included and compiled along with the application code.
A is preferred but I understand the B is less work. Either would be usable.

Just as a motivator I believe that Web applications are trending to the AJAX paradigm and also that GWT and/or SmartGWT is very popular with Java developers. As you've pointed out event driven UI applications is a great loosely coupled paradigm and helps support the MVC pattern. This is a great library that could be used with a GWT application and reduce a lot of hard-wired code.

GWT's built-in EventBus is pretty messy, complicated, and hard to read.


filter usage

Hi everybody.

I would like to know if I'm using filters in the correct way.
public class OutcomingMessage extends ZTCFrame implements net.engio.mbassy.listener.IMessageFilter

The purpose is to receive only instances of classes that extend the OutcomingMessage class (I need to receive just the messages that are to be written out on a stream).
@Handler(filters = {@Filter(OutcomingMessage.class)})
public void outcomingMessageHandler (ZTCFrame ztcFrame){

    // need to treat the outgoing message as a ZTCFrame
}

Is it properly written?

And one more thing: what should I do with the IMessageFilter method "accepts"?

public class OutcomingMessage implements IMessageFilter{

    public boolean accepts(Object arg0, MessageHandlerMetadata arg1) {
        // TODO Auto-generated method stub
        return false;
    }
}

thank you for helping

Setting the Dispatcher thread count via the AbstractMessageBus constructor

Hi Benni,

I found the following in the code yesterday when my thread settings didn't seem to have the expected effect.

Starting at line 82, org.mbassy.AbstractMessageBus currently reads:

public AbstractMessageBus(int dispatcherThreadCount) {
    this(2, new ThreadPoolExecutor(5, 50, 1, TimeUnit.MINUTES, new LinkedBlockingQueue<Runnable>()));
}

but I'm guessing it should be;

public AbstractMessageBus(int dispatcherThreadCount) {
    this(dispatcherThreadCount, new ThreadPoolExecutor(5, 50, 1, TimeUnit.MINUTES, new LinkedBlockingQueue<Runnable>()));
}

Hope I've got this right.

David E.

Setting custom Executor Service doesn't actually work

I found this in a more realistic example, but here's my program that demonstrates that your ExecutorService (specified in the configuration) is never used. Each method is wrapped with a System.out.println statement, none of the Executor methods are called, ever. Further, you call new Thread in, and I see no code in that file that actually calls service.submit.

import java.util.Collection;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import net.engio.mbassy.bus.BusConfiguration;
import net.engio.mbassy.bus.MBassador;

public class MultithreadedTest {
    public static void main(String... args) {
        BusConfiguration config = BusConfiguration.Default();
        ExecutorService defaultService = config.getExecutor();
        config.setExecutor(new WrappedExecutorService(defaultService));

        MBassador<Object> bus = new MBassador<>(config);
        bus.publishAsync("Nothing gets printed to the console, ever!");
    }

    // Logs every call and then delegates to the wrapped service
    public static class WrappedExecutorService implements ExecutorService {
        private final ExecutorService wrappedService;

        public WrappedExecutorService(ExecutorService wrappedService) {
            this.wrappedService = wrappedService;
        }

        public void execute(Runnable command) {
            System.out.println("Calling execute: " + command);
            wrappedService.execute(command);
        }

        public void shutdown() {
            System.out.println("Calling shutdown.");
            wrappedService.shutdown();
        }

        public List<Runnable> shutdownNow() {
            System.out.println("Calling shutdownNow.");
            return wrappedService.shutdownNow();
        }

        public boolean isShutdown() {
            System.out.println("Calling isShutdown.");
            return wrappedService.isShutdown();
        }

        public boolean isTerminated() {
            System.out.println("Calling isTerminated.");
            return wrappedService.isTerminated();
        }

        public boolean awaitTermination(long timeout, TimeUnit unit)
                throws InterruptedException {
            System.out.println("Calling awaitTermination: " + timeout + " " + unit);
            return wrappedService.awaitTermination(timeout, unit);
        }

        public <T> Future<T> submit(Callable<T> task) {
            System.out.println("Calling submit: " + task);
            return wrappedService.submit(task);
        }

        public <T> Future<T> submit(Runnable task, T result) {
            System.out.println("Calling submit: " + task + " " + result);
            return wrappedService.submit(task, result);
        }

        public Future<?> submit(Runnable task) {
            System.out.println("Calling submit: " + task);
            return wrappedService.submit(task);
        }

        public <T> List<Future<T>> invokeAll(
                Collection<? extends Callable<T>> tasks)
                throws InterruptedException {
            System.out.println("Calling invokeAll: " + tasks);
            return wrappedService.invokeAll(tasks);
        }

        public <T> List<Future<T>> invokeAll(
                Collection<? extends Callable<T>> tasks, long timeout,
                TimeUnit unit) throws InterruptedException {
            System.out.println("Calling invokeAll: " + tasks + " " + timeout + " " + unit);
            return wrappedService.invokeAll(tasks, timeout, unit);
        }

        public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
                throws InterruptedException, ExecutionException {
            System.out.println("Calling invokeAny: " + tasks);
            return wrappedService.invokeAny(tasks);
        }

        public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                long timeout, TimeUnit unit) throws InterruptedException,
                ExecutionException, TimeoutException {
            System.out.println("Calling invokeAny: " + tasks + " " + timeout + " " + unit);
            return wrappedService.invokeAny(tasks, timeout, unit);
        }
    }
}

Sample application never stops

I created the following sample application in order to test how MBassador works and found that it never stops, even after all of the application's threads exit.

The application's source code is below.

Event class:

package org.mbassy.tests;

public class Event {

    private int eventId;
    private String eventName;

    public Event(final int eventId, final String eventName) {
        this.eventId = eventId;
        this.eventName = eventName;
    }

    public void setEventId(final int eventId) {
        this.eventId = eventId;
    }

    public int getEventId() {
        return eventId;
    }

    public void setEventName(final String eventName) {
        this.eventName = eventName;
    }

    public String getEventName() {
        return eventName;
    }

    public String toString() {
        return "Event [ID = " + eventId + ", Name = " + eventName + "]";
    }
}

Another event class:

package org.mbassy.tests;

public class MoreSpecificEvent extends Event {

    public MoreSpecificEvent(final int eventId, final String eventName) {
        super(eventId, eventName);
    }

    public String toString() {
        return "MoreSpecificEvent [" + super.toString() + "]";
    }
}

Event handler:

package org.mbassy.tests;

import java.util.concurrent.atomic.AtomicInteger;

import org.mbassy.listener.Listener;
import org.mbassy.listener.Mode;

public class EventHandler {

    /**
     * Counts the calls to this class's handler methods.
     */
    private final AtomicInteger counter = new AtomicInteger(0);

    /**
     * Event handlers need to implement equals and hashCode since they are stored in a
     * WeakHashMap inside the org.mbassy.common.ConcurrentSet<T> class.
     * So, I need to have at least one member variable.
     */
    private final int uid;

    public EventHandler(final int uid) {
        this.uid = uid;
    }

    @Listener(dispatch = Mode.Asynchronous)
    public void handleEvent(final Event event) {
        System.out.println(counter.incrementAndGet() + " handlings. Current event: " + event.toString());
    }

    @Listener(dispatch = Mode.Asynchronous)
    public void handleEvent(final MoreSpecificEvent event) {
        System.out.println(counter.incrementAndGet() + " handlings. Current event: " + event.toString());
    }

    public int hashCode() {
        final int prime = 31;
        int result = 1;
        result = prime * result + uid;
        return result;
    }

    public boolean equals(final Object obj) {
        if (this == obj) {
            return true;
        }
        if (obj == null) {
            return false;
        }
        if (getClass() != obj.getClass()) {
            return false;
        }
        final EventHandler other = (EventHandler) obj;
        if (uid != other.uid) {
            return false;
        }
        return true;
    }
}

Main class:

package org.mbassy.tests;

import org.mbassy.BusConfiguration;
import org.mbassy.MBassador;

public class Launcher {

    public static void main(final String[] args) throws InterruptedException {
        final MBassador<Event> eventBus = new MBassador<Event>(new BusConfiguration());

        // registering
        eventBus.subscribe(new EventHandler(1));

        // publishing
        for (int i = 0; i < 10; i++) {
            Event myEvent;
            if (i % 2 == 0) {
                myEvent = new Event(i, "Event ID: " + i);
            }
            else {
                myEvent = new MoreSpecificEvent(i, "Event ID: " + i);
            }
            eventBus.publishAsync(myEvent); // assumed: the original publish call is missing from the source
        }

        System.out.println("Launcher finished its job.");
    }
}

All works just fine except the fact that this application never stops.

I put a breakpoint at System.out.println("Launcher finished its job.") line and inspected running threads. They are:

EventBus Launcher [Java Application]    
    org.mbassy.tests.Launcher at localhost:4227 
        Thread [main] (Suspended (breakpoint at line 26 in Launcher))   
        Daemon Thread [Thread-0] (Running)  
        Daemon Thread [Thread-1] (Running)  
        Thread [pool-1-thread-1] (Running)  
        Thread [pool-1-thread-2] (Running)  
        Thread [pool-1-thread-3] (Running)  
        Thread [pool-1-thread-4] (Running)  
        Thread [pool-1-thread-5] (Running)  
    C:\DEV\Programs\Java\jdk1.6.0_20\bin\javaw.exe (12.12.2012 16:18:32)    

Looks like we have:

  • the main thread, suspended at breakpoint;
  • two daemon threads, which correspond to message dispatchers;
  • and five threads belonging to the ThreadPoolExecutor object which was created in the org.mbassy.BusConfiguration class constructor.

After inspecting all this information I just resumed the main thread and found that it finished its job and disappeared from the running threads list:

EventBus Launcher [Java Application]    
    org.mbassy.tests.Launcher at localhost:4227 
        Daemon Thread [Thread-0] (Running)  
        Daemon Thread [Thread-1] (Running)  
        Thread [pool-1-thread-1] (Running)  
        Thread [pool-1-thread-2] (Running)  
        Thread [pool-1-thread-3] (Running)  
        Thread [pool-1-thread-4] (Running)  
        Thread [pool-1-thread-5] (Running)  
        Thread [DestroyJavaVM] (Running)    
    C:\DEV\Programs\Java\jdk1.6.0_20\bin\javaw.exe (12.12.2012 16:18:32)    

We can also see a new thread called 'DestroyJavaVM' in this list.
The application remains in this state forever, and the only way to stop it is to terminate the VM.
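The pool-1-thread-N workers above are non-daemon threads, and the JVM does not exit while any non-daemon thread is alive. One way to avoid that, sketched here with plain JDK classes, is a ThreadFactory that marks pool threads as daemons. DaemonThreads and its method names are hypothetical and not part of MBassador.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

// Plain-JDK sketch: worker threads marked as daemons do not keep the JVM alive.
class DaemonThreads {

    static ThreadFactory daemonFactory(String prefix) {
        AtomicInteger counter = new AtomicInteger();
        return runnable -> {
            Thread t = new Thread(runnable, prefix + "-" + counter.incrementAndGet());
            t.setDaemon(true);   // the JVM may exit while this thread is still idle
            return t;
        };
    }

    static ExecutorService newDaemonPool(int threads) {
        return Executors.newFixedThreadPool(threads, daemonFactory("handler"));
    }
}
```

The trade-off is that daemon threads are abandoned at JVM exit even if work is still queued, so an explicit shutdown() remains the safer option when pending messages must be processed.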

I found only one way to avoid it: get the ExecutorService object by calling the org.mbassy.MBassador.getExecutor() method and call the shutdown() on it:

Main class:

public class Launcher {

    public static void main(final String[] args) throws InterruptedException {

        ... create MBassador instance, register, publish ...

        Thread.sleep(1000); // just to make sure that all events are processed
        ((ExecutorService) eventBus.getExecutor()).shutdown(); // force shutdown
        System.out.println("Launcher finished its job.");
    }
}


Some suggestions about BusConfiguration and @Handler.


net.engio.mbassy.bus.BusConfiguration has only one default constructor:

    public BusConfiguration() {
        this.numberOfMessageDispatchers = 2;
        this.maximumNumberOfPendingMessages = Integer.MAX_VALUE;
        this.executor = new ThreadPoolExecutor(10, 10, 1, TimeUnit.MINUTES, new LinkedBlockingQueue<Runnable>(), DaemonThreadFactory);
    }

So, when I construct it and want to replace its executor with another one, I first have to get the default executor and shut it down. The code is tedious:

        BusConfiguration eventBusConfig = new BusConfiguration();
        // used in synchronous environment only!

        ExecutorService defaultExecutor = eventBusConfig.getExecutor();
        defaultExecutor.shutdown();
        try {
            defaultExecutor.awaitTermination(1, TimeUnit.MINUTES);
        }
        catch (InterruptedException e) {
            logger.error("Failed to close default executor service!", e);
            // ignored
        }
        eventBusConfig.setExecutor(new ThreadPoolExecutor(1, 1,
                                                          0L, TimeUnit.MILLISECONDS,
                                                          new LinkedBlockingQueue<Runnable>(), new ThreadFactory()
        {
            public Thread newThread(Runnable r) {
                Thread thread = Executors.defaultThreadFactory().newThread(r);
                return thread;
            }
        }));
        eventBusConfig.setMetadataReader(new SubscribeConfigChangedMedataReader());
        this.eventBus = new MBassador<>(eventBusConfig);

Maybe it needs a more suitable design, for example moving the code in the constructor to the Default() method:

 public static BusConfiguration Default() {
      BusConfiguration defaultConfig =  new BusConfiguration();
      // initialize default value
     return defaultConfig;
 }


the test case: src\test\java\org\mbassy\ has a method called testIteratorCleanup():

    public void testIteratorCleanup(){
        final HashSet<Object> persistingCandidates = new HashSet<Object>();
        final ConcurrentSet<Object> testSet = new ConcurrentSet<Object>();
        Random rand = new Random();

        for (int i = 0; i < numberOfElements; i++) {
            Object candidate = new Object();

            if (rand.nextInt() % 3 == 0) {
                persistingCandidates.add(candidate);
            }
            testSet.add(candidate);
        }

        // this will remove all objects that have not been inserted into the set of persisting candidates

        ConcurrentExecutor.runConcurrent(new Runnable() {
            public void run() {
                for (Object testObject : testSet) {
                    // do nothing
                    // just iterate to trigger automatic clean up
                }
            }
        }, numberOfThreads);

        assertEquals(persistingCandidates.size(), testSet.size());
        for (Object test : testSet) {
            assertTrue(persistingCandidates.contains(test));
        }
    }

On my machine (Windows 8 x64 + JDK 1.7_10), this test case fails because the GC threads run slowly. I have to add a pause to work around it.

    public void testIteratorCleanup() throws Exception{
        final HashSet<Object> persistingCandidates = new HashSet<Object>();
        final ConcurrentSet<Object> testSet = new ConcurrentSet<Object>();
        Random rand = new Random();

        for (int i = 0; i < numberOfElements; i++) {
            Object candidate = new Object();

            if (rand.nextInt() % 3 == 0) {
                persistingCandidates.add(candidate);
            }
            testSet.add(candidate);
        }

        // this will remove all objects that have not been inserted into the set of persisting candidates
        // add this to prevent some case that GC Threads are processed too slow.

        ConcurrentExecutor.runConcurrent(new Runnable() {
            public void run() {
                for (Object testObject : testSet) {
                    // do nothing
                    // just iterate to trigger automatic clean up
                }
            }
        }, numberOfThreads);

        assertEquals(persistingCandidates.size(), testSet.size());
        for (Object test : testSet) {
            assertTrue(persistingCandidates.contains(test));
        }
    }


I think it may be a good idea to support custom annotations that can subscribe to events. For example, change the definition of @Handler so that it can also be applied to another custom annotation:

@Retention(value = RetentionPolicy.RUNTIME)
@Target(value = {ElementType.METHOD,ElementType.ANNOTATION_TYPE})
public @interface Handler {}

Then I can define my custom event handler annotation:

@Retention(value = RetentionPolicy.RUNTIME)
@Target(value = {ElementType.METHOD})
@Handler
public @interface ConfigChangedEventHandle {}

Change Request

Thanks for the great software.

Is there a way to report the error thrown by a handler back to the publisher (the entity that calls publish())? Currently, I can register an IPublicationErrorHandler, but this approach has a couple of limitations:

  1. I can add an error handler but cannot remove one.
  2. There is no way to register a per-thread error handler. So, if my publisher is multithreaded, e.g. a singleton, then I cannot register multiple error handlers to handle errors for each invoking thread separately.

My usecase is quite straightforward. I have a singleton which publishes synchronous events via an Mbassador object, and I want to take some action based on whether an error has occurred. Since each publication is occurring in its own thread, I need to handle each thread's error in isolation/separately.

Simply adding a version of publish that takes an IPublicationErrorHandler as extra parameter will meet my requirement. The MBassador implementation will then call handle error on all registered error handlers (as before) as well as on my passed/adhoc error handler


How To Use MBassador?

I have written some classes to test MBassador, but I found that the main thread hangs unless I call System.exit(0). Did I miss something?


public class PersonNewEvent {

    private String name = "cool name";
    private int age = 20;

    /**
     * @return the name
     */
    public String getName() {
        return name;
    }

    /**
     * @return the age
     */
    public int getAge() {
        return age;
    }

    /* (non-Javadoc)
     * @see java.lang.Object#toString()
     */
    public String toString() {
        StringBuilder builder = new StringBuilder();
        builder.append("PersonNewEvent [name=").append(name).append(", age=")
                .append(age).append("]");
        return builder.toString();
    }
}


public class PersonEventHandler {

    @Handler
    public void handlePersonNewEvent(PersonNewEvent event) {
        System.out.println("handle PersonNewEvent" + event.toString());
    }

    @Handler
    public void handlePersonNewEvent2(PersonNewEvent event) {
        System.out.println("handle PersonNewEvent 2" + event.toString());
    }
}

--------------------The Test Class-------------------
public class TestEvent {
    public static void main(String[] args) {
        MBassador bus = new MBassador(new BusConfiguration());
        PersonEventHandler listener = new PersonEventHandler();
        // the listener will be registered using a weak-reference
        bus.subscribe(listener);
        // objects without handlers will be ignored
        //bus.subscribe(new ClassWithoutAnyDefinedHandlers());
        PersonNewEvent event = new PersonNewEvent();
    }
}

AbstractMessageBus.addAsynchronousDeliveryRequest(SubscriptionDeliveryRequest<T> request) method improvement

I just noticed that introduction of the MaximumNumberOfPendingMessages option was a little bit incomplete.

The AbstractMessageBus.addAsynchronousDeliveryRequest(SubscriptionDeliveryRequest<T> request) method internally calls the LinkedBlockingQueue.offer(E e) method.

According to the Javadoc, the offer(E e) method does not make the running thread wait for space to become available. It simply returns false when the queue's capacity is exceeded.

So, several published events may be lost when queue size is small and there is no way to detect it.

I suggest the following improvement:

  1. The MBassador.publishAsync(T message) should return true or false, just like the LinkedBlockingQueue.offer(E e) method. In this case the method's caller will be able to determine whether the message was published or not.
  2. Add new method boolean MBassador.publishAsync(T message, long timeout, TimeUnit unit) throws InterruptedException which should internally call the LinkedBlockingQueue.offer(E e, long timeout, TimeUnit unit) method (see Javadoc).
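The difference between the two offer variants can be seen in isolation with a plain LinkedBlockingQueue. This is only a sketch of the queue behaviour the suggestion relies on; the class BoundedQueueDemo and its tryEnqueue helpers are hypothetical names and not part of MBassador.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical helper, not MBassador code: shows what a caller could learn
// if the result of offer(...) were passed back instead of being dropped.
class BoundedQueueDemo {

    // Non-blocking: returns false immediately when the queue is full.
    static boolean tryEnqueue(LinkedBlockingQueue<String> queue, String message) {
        return queue.offer(message);
    }

    // Waits up to the given timeout for free space before giving up.
    static boolean tryEnqueue(LinkedBlockingQueue<String> queue, String message,
                              long timeout, TimeUnit unit) {
        try {
            return queue.offer(message, timeout, unit);
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return false;
        }
    }
}
```

With a queue of capacity 1, the second non-blocking call returns false instead of silently losing the message, and the timed variant succeeds once a consumer has taken an element.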

Dead message handler doesn't work correctly with unsubscribe

The dead message handler doesn't work correctly after the last listener unsubscribes. Here is a test case to demonstrate. I am using 1.1.7.

import net.engio.mbassy.bus.BusConfiguration;
import net.engio.mbassy.bus.MBassador;
import net.engio.mbassy.common.DeadMessage;
import net.engio.mbassy.listener.Handler;

import static org.junit.Assert.*;
import org.junit.Test;

public class BusTest {
    public void testBus() {
        // Some strings to play with
        String first = "Hello World!";
        String second = "Another String!";

        // Make a bus
        MBassador<Object> bus = new MBassador<>(BusConfiguration.Default());

        // Make some stuff to handle messages
        MsgHandler handler = new MsgHandler();
        DeadHandler deadHandler = new DeadHandler();

        // Only add the dead message handler

        // The message should be caught, as it's the only listener

        // Clear deadmessage for future tests
        deadHandler.deadMessage = null;

        // Add the real handler, and publish the message again

        // Verify we got it
        assertEquals(first, handler.theString);
        // And that the dead handler didn't get it.

        // Unsubscribe, publish a different string

        // We should still be the first message, because we unsubscribed
        assertEquals(first, handler.theString);

        // One message while the handler was subscribed, two while it wasn't.
        assertEquals(1, handler.counter);
        assertEquals(2, deadHandler.counter); // This fails!

        // This should be caught again, as there are no listeners.
        assertNotNull(deadHandler.deadMessage); // This fails!

    public static class MsgHandler {
        public String theString;

        public int counter = 0;

        public void handleMessage(String theMessage) {
            theString = theMessage;

    public static class DeadHandler {
        public DeadMessage deadMessage;
        public int counter = 0;

        public void handleDead(DeadMessage message) {
            deadMessage = message;

Messages pooling

Is it possible to pool created messages? I can obtain new messages/events from my own pool, but I can't find a proper moment to release those objects (put them back into the pool).

Possible bug in the org.mbassy.listener.MetadataReader.isHandler(Method m) method

This method has the following condition: annotation.equals(Listener.class), i.e. java.lang.annotation.Annotation.equals(Class). The objects being compared are therefore unlikely to be instances of the same class at runtime.

Normally, such a condition looks like annotation.annotationType().equals(Listener.class).
Could you please check that everything is OK with this method?
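The difference between the two comparisons can be reproduced without MBassador. The sketch below uses a hypothetical @Marker annotation as a stand-in for the real handler annotation; all names in it are made up for illustration.

```java
import java.lang.annotation.Annotation;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;

class AnnotationEqualsDemo {

    // Hypothetical stand-in for the handler annotation.
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    @interface Marker {}

    static class Sample {
        @Marker
        public void handle(String message) {}
    }

    // Compares an annotation instance with a Class object: never equal.
    static boolean matchesByEquals(Annotation annotation) {
        return annotation.equals(Marker.class);
    }

    // Compares the annotation's type with the Class object: the usual check.
    static boolean matchesByType(Annotation annotation) {
        return annotation.annotationType().equals(Marker.class);
    }

    // Reads the @Marker instance from Sample.handle(String) via reflection.
    static Annotation markerOnHandle() {
        try {
            Method method = Sample.class.getMethod("handle", String.class);
            return method.getAnnotation(Marker.class);
        } catch (NoSuchMethodException ex) {
            throw new IllegalStateException(ex);
        }
    }
}
```

For the annotation returned by markerOnHandle(), matchesByEquals yields false while matchesByType yields true.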

No way to shutdown an MBassador instance

First of all let me say how impressed I am at the ease of use of MBassador. Thanks for all the great work!

Now onto my issue.

I am integrating the message bus into a Tomcat application. I am storing the bus as a singleton in the ServletContext and accessing it from there.

I have a ServletContextListener that creates an MBassador instance and starts it.

This all works rather well.

Now, when I shutdown my application the Threads that MBassador uses as dispatchers are not shut down.

After a little investigation I found that these threads are not accessible in any way, so I cannot kill them. I found that MBassador has a "finalize()" method that calls a private "shutdown()" method, which stops these threads.

The only solution I could come up with for stopping these threads when the application is stopped was to subclass MBassador and create a method that calls "finalize()":

private static final class StoppableMBassador<T> extends MBassador<T> {

        public StoppableMBassador(BusConfiguration configuration) {
            super(configuration);
        }

        public void stop() {
            try {
                finalize();
            } catch (Throwable ex) {
                throw new RuntimeException(ex);
            }
            try {
                ((ExecutorService) getExecutor()).awaitTermination(1, TimeUnit.MINUTES);
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
        }
}
This seems a little hacky to me. Could you make a public "shutdown()" method that can be called in this situation?
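For illustration, this is roughly the shape of public shutdown method I have in mind, sketched on a plain ExecutorService. StoppableDispatcher is a hypothetical class and says nothing about how MBassador manages its dispatcher threads internally.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical dispatcher, not MBassador code.
class StoppableDispatcher {

    private final ExecutorService executor = Executors.newFixedThreadPool(2);

    void dispatch(Runnable task) {
        executor.execute(task);
    }

    // Stops accepting new tasks and waits for already queued ones to finish.
    // Returns true if all worker threads terminated within the timeout.
    boolean shutdown(long timeout, TimeUnit unit) {
        executor.shutdown();
        try {
            return executor.awaitTermination(timeout, unit);
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return false;
        }
    }

    boolean isTerminated() {
        return executor.isTerminated();
    }
}
```

A ServletContextListener could then call shutdown(...) from contextDestroyed so that no worker threads outlive the web application.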

ConcurrentSetTest yields an error when run within the suite

Running mvn clean test -Dtest=ConcurrentSetTest results in the test passing OK.

However, running mvn clean test yields a build failure:
Failed tests: testIteratorCleanup(net.engio.mbassy.ConcurrentSetTest): expected:<33278> but was:<53438>

The surefire report provides little else in terms of information:

Test set: net.engio.mbassy.ConcurrentSetTest

Tests run: 7, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 7.159 sec <<< FAILURE!
testIteratorCleanup(net.engio.mbassy.ConcurrentSetTest) Time elapsed: 1.256 sec <<< FAILURE!
java.lang.AssertionError: expected:<33278> but was:<53438>
    at
    at org.junit.Assert.failNotEquals(
    at org.junit.Assert.assertEquals(
    at org.junit.Assert.assertEquals(
    at net.engio.mbassy.common.UnitTest.assertEquals(
    at net.engio.mbassy.ConcurrentSetTest.testIteratorCleanup(
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(
    at java.lang.reflect.Method.invoke(
    at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(
    at
    at org.junit.runners.model.FrameworkMethod.invokeExplosively(
    at org.junit.internal.runners.statements.InvokeMethod.evaluate(
    at org.junit.runners.ParentRunner.runLeaf(
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(
    at org.junit.runners.ParentRunner$
    at org.junit.runners.ParentRunner$1.schedule(
    at org.junit.runners.ParentRunner.runChildren(
    at org.junit.runners.ParentRunner.access$000(
    at org.junit.runners.ParentRunner$2.evaluate(
    at
    at org.apache.maven.surefire.junit4.JUnit4Provider.execute(
    at org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(
    at org.apache.maven.surefire.junit4.JUnit4Provider.invoke(
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(
    at java.lang.reflect.Method.invoke(
    at org.apache.maven.surefire.util.ReflectionUtils.invokeMethodWithArray(
    at org.apache.maven.surefire.booter.ProviderFactory$ProviderProxy.invoke(
    at org.apache.maven.surefire.booter.ProviderFactory.invokeProvider(
    at org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(
    at org.apache.maven.surefire.booter.ForkedBooter.main(

Issue with message delivery when using method overloading and inheritance

I've recently switched from EventBus to MBassador for performance reasons. It makes a world of difference! However, I did run into an issue which results in messages not being delivered. It's easily reproducible and is best explained by an example.

In the example below, I'd expect the two message handlers that listen for event "TestEventA" to be invoked. Instead, only one is invoked. However, if the event handler for event "TestEventB" is removed, both message handlers are invoked as expected. I've narrowed the issue down to something involving method overloading and inheritance (also see the comments in the code for useful hints).

Also, though not really relevant for this issue, Google makes it really hard to find your library due to MBassador being "corrected" to ambassador. It's a shame that such a great library is so hard to find.

import net.engio.mbassy.listener.*;

public class Issue extends IssueBase
  public static void main(String[] args)
    new Issue();
    mBassadorA.publishAsync( new TestEventA() );

    catch (InterruptedException e)


   * (!) Remove this method and BOTH message handlers in class IssueBase will be called.
  public void handleEvent(TestEventB event)
    System.out.println("Received event B");

import net.engio.mbassy.bus.BusConfiguration;
import net.engio.mbassy.bus.MBassador;
import net.engio.mbassy.listener.Handler;
import net.engio.mbassy.listener.Listener;
import net.engio.mbassy.listener.References;

@Listener(references = References.Strong)
public class IssueBase
  static MBassador<BaseEventClassA> mBassadorA = new MBassador<BaseEventClassA>( BusConfiguration.Default() );

  public IssueBase()

   * (!) If this method is removed, NO event handler will be called.
  public void handleEventWithNonOverloadedMethodName(TestEventA event)
    System.out.println("Received event A (non-overloaded method)");

  public void handleEvent(TestEventA event)
    System.out.println("Received event A (overloaded method)");

  // ---------------------------------------------------------------------------

  public static class TestEventA extends BaseEventClassA {}
  public static class TestEventB extends BaseEventClassA {}
  public static class BaseEventClassA {}

child event bus

I love your work on mbassador and particularly its flexibility but there isn't any hint to create a child event bus, maybe this is something missing and will lead to a new feature :)
Given an A event bus, I'd like to create a B event bus receiving all A's events but keeping its own event B for itself.
Maybe I'm not clear enough here as I'm not a native English speaker.

Getting exception propagate from the handler to the sending code


I'm using MBassador with great joy, currently only using its basic features.

I would like to decouple few aspects of my application, like e.g. authorization (permission checks inside my BL). For that, synchronous messages can be used. But I would like to get authorization exceptions propagated out to the code that sends the message. e.g.:

try {
    bus.publish(new PermissionCheck(object, action, currentUser));   // assume handler throws when needed
} catch (PermissionException e) { /* react to the denied permission */ }

As handlers can be prioritized, I think such an approach can make sense.
According to the usage documentation, this use case / behavior does not seem to be supported currently.
Do you think it can be supported? or do you see some other way to support my use case?

Potential for integer overflow in SubscriptionByPriorityDesc comparator which can result in handlers being called in the wrong order

There's a bug in the SubscriptionByPriorityDesc comparator:

public static final Comparator<Subscription> SubscriptionByPriorityDesc = new Comparator<Subscription>() {
    public int compare(Subscription o1, Subscription o2) {
        int result = o1.getPriority() - o2.getPriority();
        return result == 0 ? : result;

The following line has potential for integer overflow:

int result = o1.getPriority() - o2.getPriority();

When o1.getPriority() returns -2 and o2.getPriority() returns Integer.MAX_VALUE, the subtraction overflows: result wraps around to Integer.MAX_VALUE, a positive value even though o1 has the lower priority, and the handlers will get called in the wrong order.

I verified this behavior by setting a breakpoint in the comparator and using the testcode shown below.

public class ListeningBean {

    @Handler(priority = -2)
    public void handleTestMessageHighestPriority(TestMessage message) {

    @Handler(priority = Integer.MAX_VALUE)
    public void handleTestMessageLowestPriority(TestMessage message) {


public class App {

    public static void main(String[] args ) {
        MBassador<TestMessage> bus = new MBassador<TestMessage>(BusConfiguration.Default());
        ListeningBean listener = new ListeningBean();
        TestMessage message = new TestMessage();


public class TestMessage {}
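The overflow itself can be reproduced without the bus. The sketch below contrasts the subtraction with Integer.compare (available since Java 7), which cannot overflow; PriorityCompareDemo is a hypothetical class that only mirrors the arithmetic of the comparator, not its tie-breaking.

```java
// Hypothetical demo class, not MBassador code.
class PriorityCompareDemo {

    // Mirrors the comparator's arithmetic: overflows for distant values.
    static int bySubtraction(int priority1, int priority2) {
        return priority1 - priority2;
    }

    // Overflow-free alternative: only the sign of the result is meaningful.
    static int byCompare(int priority1, int priority2) {
        return Integer.compare(priority1, priority2);
    }

    public static void main(String[] args) {
        // -2 - Integer.MAX_VALUE wraps around to a positive number.
        System.out.println(bySubtraction(-2, Integer.MAX_VALUE) > 0);
        // Integer.compare reports -2 as smaller, as expected.
        System.out.println(byCompare(-2, Integer.MAX_VALUE) < 0);
    }
}
```

Both lines print true: the subtraction yields a positive result for a smaller left-hand priority, while Integer.compare keeps the correct sign.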



'pendingMessages' LinkedBlockingQueue capacity


Are there any plans to allow configuring the capacity of the 'pendingMessages' LinkedBlockingQueue?
I think it would be nice to have such an option to prevent excessive queue growth when the producer thread adds elements faster than the consumer can process them.

Delayed message dispatch?

I just found MBassador and am really enjoying it. Thanks, bennidi, for the excellent work on it.

I have a use case in which I post messages to the bus in response to user input in a text editor. However, I'd like to delay dispatch of the message (by, say, 250 milliseconds) to ensure that user typing is complete. The message posted to the bus will kick off a (potentially) long-running task that I'd like to be able to interrupt if additional user input is received before the task is complete.

Do you think that it would be possible to add a method such as

MessagePublication later(long delay, long timeout, TimeUnit units)

to IMessageBus.IPostCommand? Furthermore, would it be possible to put a method interrupt() on MessagePublication that interrupts the Thread (kills the task) that is asynchronously delivering the message?

Or, is there, perhaps, a better way to do this? I suppose I could dispatch the message asynchronously and then simply wait 250 milliseconds at the top of the Handler method. But I think it might be more elegant to have MBassador handle the delay in a consistent manner.

I'm wondering what would be considered best practice here.
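In the meantime, a delay of this kind could probably be handled outside the bus. The following is a minimal sketch using a ScheduledExecutorService from the JDK: each new message cancels the previously scheduled one, so only the last message within the delay window is forwarded. DebouncedPublisher and its methods are hypothetical names; the Consumer passed in is assumed to wrap whatever publication call is used (for example a lambda calling the bus).

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical helper, not part of MBassador.
class DebouncedPublisher<T> {

    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();
    private final Consumer<T> publisher;
    private final long delayMillis;
    private ScheduledFuture<?> pending;

    DebouncedPublisher(Consumer<T> publisher, long delayMillis) {
        this.publisher = publisher;
        this.delayMillis = delayMillis;
    }

    // Schedules the message and drops any message that is still waiting.
    synchronized void publishLater(T message) {
        if (pending != null) {
            pending.cancel(false);
        }
        pending = scheduler.schedule(
                () -> publisher.accept(message), delayMillis, TimeUnit.MILLISECONDS);
    }

    // Lets the last pending message go out, then stops the scheduler thread.
    boolean close(long timeout, TimeUnit unit) {
        scheduler.shutdown();
        try {
            return scheduler.awaitTermination(timeout, unit);
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

This covers only the delay. Interrupting a handler that is already running would still need cooperation from the handler itself, for example by checking the thread's interrupt flag.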

Thanks again for the excellent code.

Unit Tests failing

Just downloaded mbassador to try out in code I'm writing. The following tests are failing for me:

  1. net.engio.mbassy.MBassadorTest.testAsynchronousMessagePublication()
  2. net.engio.mbassy.SyncBusTest.testSynchronousMessagePublication()
  3. net.engio.mbassy.DeadMessageTest.testDeadMessage()

They all seem to fail in the same way: the expected count and the actual count of iterations run are not equal. Each time I run, I get different values for the actual iterations, so my expectation is that something isn't waiting around for the concurrent tests to complete. I've glanced briefly at the code and it looks as though it ought to work, but it still fails.

In addition, if I run the tests using the debugger, some of the failing tests succeed (although it's not the same set every time), but when running outside of the debugger all three fail every time.

My configuration is as follows:

Win 7, 32GB RAM, IBM RSA 8.5 (running Java 1.6)

Remote delivery ?

A remote delivery feature would be very cool, as it would make distributed system design easy. So far, Akka is the only library I know of that offers this.

Subscription Comparator is limiting the number of classes that are notified of events

Hi Benni,

I spent half of yesterday debugging an issue I was seeing when I would subscribe multiple classes to the same event but only see the first 2 registered listeners being notified so I thought I'd share my findings with you.

I've forked your repo and updated it with the debugging log entries I created and a bit of hack code to get around the cause of the problem I was seeing.

I've also added a new unit test class (org.mbassy.MBassadorExtendedTest) which exposes the issue for you to see.

In the Subscription class I've included a boolean switch in the SubscriptionByPriorityDesc comparator so you can turn the original code on and off to see its effect on the unit test results.

Hope this is useful.

David E.

Missing events

I am benchmarking MBassador and other event bus libraries, and there seems to be a bug in MBassador: events go missing. Here is a reproducible test.
Whatever value nbListener is set to, a few dozen events are always missing.
FYI: the same test with the EventBus library works like a charm with 60k listeners.

public class MbassadorTest {
private AtomicInteger nbreceived = new AtomicInteger(0);
int nbListener = 2000;
public void sendEvent() {
for (int i = 0; i < nbListener; i++) {
MyEventBus.getInstance().subscribe(new MyListener());
MyEventBus.getInstance().publish(new MyEvent());

public void check() {
    System.out.println("received : " + nbreceived.get());
    assertEquals(nbListener, nbreceived.get());

public void endEvent(MyEndEvent event) {
    System.out.println("nb received : " + nbreceived.get());


public class MyListener {
public MyListener() {
public void WorkerChangeEvent(MyEvent e) {
MyEventBus.getInstance().publish(new MyEndEvent());
public class MyEvent {
public MyEvent() {
public class MyEndEvent {
public MyEndEvent() {

Filter on listeners

I have a unique tree-like Hierarchical data structure composed of Nodes. I would like to send events from a node to all the other Nodes but the one who sent it.

I was thinking I could do something like this:

public void nodeStarted(NodeMessage message) {

public void start() {
   eventBus.publish(new NodeMessage(this, "blabla"));

public boolean accepts(Object message, MessageHandlerMetadata meta) {
    NodeMessage nm = (NodeMessage)message;
    return nm.getNode() != meta.getListener();

Unfortunately, getListener doesn't exist, and upon further investigation it turns out the filter is run only once, not once per listener.

I have another related usecase that I hope MBassador can solve, but perhaps you have an alternate solution? =)
Since the nodes are in a tree-like data structure, most messages should be sent ONLY to the node's parents. Again, if filters were applied per listener, I could easily check whether the current listener is part of nm.getNode().getParents(). An ugly workaround would be to implement this check in the message handler, but that is terribly inefficient, since the message is dispatched to ALL nodes even if only one parent actually does something meaningful.

Perhaps a sophisticated (but very elegant) solution like MBassador isn't what's best for my usecases. Do you know of any other libraries that work well with tree-like hierarchy. Thanks.

Improve message publication performance

I wonder if SubscriptionManager.getSubscriptionsByMessageType(Class messageType) can be improved by using a cache. If you're willing to add Guava as a dependency, you could create a Cache<Class, Collection<Subscription>> to keep track of it, so the work only needs to be done once instead of every time. The lookup could be especially slow for messages that have many interfaces and subclasses.

In order to keep it accurate, when subscribe and unsubscribe are called, you could use invalidateAll, something like invalidateAll(ReflectionUtils.getSuperClasses(listener)).

This is complicated enough that I haven't tried implementing it yet [so I don't know what the performance improvement would be], but I might if you think it's a good idea.
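To make the idea a bit more concrete, here is a rough sketch of the caching part only. It swaps Guava's Cache for a plain ConcurrentHashMap from the JDK, caches the super types of a message class rather than the subscriptions themselves, and clears everything on invalidation. SuperTypeCache and its methods are hypothetical names, not existing MBassador code.

```java
import java.util.ArrayDeque;
import java.util.Collections;
import java.util.Deque;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical cache of "message class -> all classes and interfaces it extends".
class SuperTypeCache {

    private final Map<Class<?>, Set<Class<?>>> cache = new ConcurrentHashMap<>();
    private final AtomicInteger computations = new AtomicInteger();

    // Walks the hierarchy only on the first request for a given type.
    Set<Class<?>> superTypesOf(Class<?> type) {
        return cache.computeIfAbsent(type, this::collect);
    }

    // Would be called from subscribe/unsubscribe in the scheme described above.
    void invalidateAll() {
        cache.clear();
    }

    int computationCount() {
        return computations.get();
    }

    // Collects the type itself, its superclasses and all implemented interfaces.
    private Set<Class<?>> collect(Class<?> type) {
        computations.incrementAndGet();
        Set<Class<?>> result = new LinkedHashSet<>();
        Deque<Class<?>> todo = new ArrayDeque<>();
        todo.push(type);
        while (!todo.isEmpty()) {
            Class<?> current = todo.pop();
            if (!result.add(current)) {
                continue; // already visited via another path
            }
            if (current.getSuperclass() != null) {
                todo.push(current.getSuperclass());
            }
            for (Class<?> implemented : current.getInterfaces()) {
                todo.push(implemented);
            }
        }
        return Collections.unmodifiableSet(result);
    }
}
```

Clearing the whole map is the bluntest form of invalidation; the finer-grained invalidateAll(ReflectionUtils.getSuperClasses(listener)) variant would keep unrelated entries warm.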

How to deal with event handler priorities which are not known until runtime?

In my specific use case I have a set of objects in which each of them can have one or more annotated handler methods. All handler methods of a particular object must have the same priority but this priority is not known until runtime. This problem boils down to the handlers being called in the same order as the objects were registered to the event bus (FIFO-order). Is it possible to guarantee this behavior, and, if so, how?

subscription objects and subscribing

I have a spring based application and the proxies are jamming up subscription. I can get the target class of the proxy using spring techniques but I need to be able to add a subscription to an event bus that is not just passing the actual object itself that has the @subscribe annotation, but add a subscription to a bus by creating a "subscription" object directly then registering.

Does the event bus support this type of direct registration?



For some reason I still get messages published to my listeners after I have unsubscribed them. I looked at the code and found the following:

The message is published to all listeners which are in following collection: subscriptionsPerMessage. This collection is filled when calling subscribe(), but it is not cleared when calling unsubscribe().

The unsubscribe() only looks at the subscriptionsPerListener collection.

Can someone help me with this? Am I understanding/using it wrong?


Handlers priority

The priority of the handlers doesn't work correctly.

public static final Comparator<Subscription> SubscriptionByPriorityDesc = new Comparator<Subscription>() {
        public int compare(Subscription o1, Subscription o2) {
            int byPriority = ((Integer)o1.getPriority()).compareTo(o2.getPriority());
            return byPriority == 0 ? : byPriority;

The comparator's name is SubscriptionByPriorityDesc, but it sorts in ascending order.
It seems to me that the correct code is:

int byPriority = ((Integer)o2.getPriority()).compareTo(o1.getPriority());
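The effect of swapping the operands can be checked on plain integers. This is just a sketch with a hypothetical DescendingPriorityDemo class that sorts priorities directly instead of Subscription objects.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical demo class, not MBassador code.
class DescendingPriorityDemo {

    // o1 compared to o2: lowest priority first.
    static final Comparator<Integer> ASCENDING = (o1, o2) -> o1.compareTo(o2);

    // o2 compared to o1: highest priority first.
    static final Comparator<Integer> DESCENDING = (o1, o2) -> o2.compareTo(o1);

    // Returns a sorted copy and leaves the input list untouched.
    static List<Integer> sorted(List<Integer> priorities, Comparator<Integer> order) {
        List<Integer> copy = new ArrayList<>(priorities);
        copy.sort(order);
        return copy;
    }
}
```

Sorting the priorities 1, 10, 5 with DESCENDING gives 10, 5, 1, whereas ASCENDING gives 1, 5, 10.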

Fire a single Listener with multiple/different Events

Hi Ben,

I'm facing the situation where several different events should/could fire the same Listener (handler).

So far, and if I understand correctly, MBassador allows one Event to fire one or more Listener(s), but a Listener may only be fired by one Event.

Is there a smart way to make it so a given Listener can be fired by multiple Events ?

Feel free to edit my post as needed. Thanks for the library, it's easy and fun to use so far !

What is the difference between post and publish?

I am familiar with Guava's EventBus library. But it seems mbassador's publish is the same as EventBus's post. However, it seems to me, from briefly looking at the source code, that the post seems to return a 'packaged' message that hasn't been published yet.

I wanted some clarification on this. Perhaps this should be added to the wiki (i.e terminology section)?

Inherit @Listener annotation

To me it would make sense for the @Listener annotation to be inherited automatically. I'd therefore propose adding the @Inherited meta-annotation to the annotation type:

@Retention(value = RetentionPolicy.RUNTIME)
@Target(value = {ElementType.TYPE})
public @interface Listener {
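To show what @Inherited changes, here is a small standalone sketch. PlainMarker and InheritedMarker are hypothetical annotations used instead of the real @Listener, so this only demonstrates the standard Java behaviour for class-level annotations.

```java
import java.lang.annotation.Annotation;
import java.lang.annotation.ElementType;
import java.lang.annotation.Inherited;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

class InheritedAnnotationDemo {

    // Without @Inherited: visible on the annotated class only.
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.TYPE)
    @interface PlainMarker {}

    // With @Inherited: also visible on subclasses of the annotated class.
    @Inherited
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.TYPE)
    @interface InheritedMarker {}

    @PlainMarker
    @InheritedMarker
    static class Base {}

    static class Child extends Base {}

    // True if the subclass reports the given annotation type.
    static boolean childHas(Class<? extends Annotation> annotationType) {
        return Child.class.isAnnotationPresent(annotationType);
    }
}
```

Here childHas(InheritedMarker.class) is true and childHas(PlainMarker.class) is false. Note that @Inherited only applies to annotations on classes, not on interfaces or methods.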

Can I Use MBassador In Our Project?

Our project is based on Spring, and we have layers such as facade/application/domain, as described in DDD.

Is it possible to use MBassador as an infrastructure component, for example as a mechanism to publish domain events?

For example, when a book is purchased:

//how to inject the MBassador eventBus?

But I can't figure out how to inject MBassador into our domain objects.

Probably filtered events should be treated as dead events

Hello Benjamin,

I just thought that it would be nice to have the possibility to process filtered events as dead events.

Consider the following example.
I need to have several listeners which should filter event objects depending on different criteria. For example Listener1 should process events with their member variable equal to 'A', Listener2 - "B", Listener3 - "C" and so on.

Also I need to have ListenerX which should process all other events which are not covered by Listener1, Listener2 ... ListenerX-1.

Currently I can achieve this by creating a specific filter for ListenerX which filters out all events that satisfy Listener1, Listener2 ... ListenerX-1.

Such an approach is not very convenient because I need too many conditions in the ListenerX filter.

It would be much easier for me to handle all events which have been filtered by Listener1, Listener2 ... ListenerX-1 as dead events.

What do you think?
