Code Monkey home page Code Monkey logo

jeromq-jms's Introduction

JeroMQ JMS

Introduction

This is JMS 2.0.1 wrapper around ZERO MQ to enable JEE applications to use the ZMQ protocol. The current version uses the JERO MQ 0.4.3 ZMQ, but it should also work with JNI instances.

Core to the wrapper is the Gateway classes that acts as a publisher or subscriber within JMS to/from ZMQ. A gateway contains the protocol for the interaction with the external communicating instances. It also contains 1 or more ZMQ Sockets, to enable failover, and/or parallel through put.

Each gateway has a primary direction, either outgoing, or incoming. All the examples and test use both directions to test the ZERO MQ functionality.

I have made the wrapper very extensible to use;

  • the socket type (PUB, SUB, PULL, PUSH, etc…);
  • the adaptor/marshal for JMS messages to/from and external ZMQ format;
  • the subscription of messages using ZMQ functionality;
  • the JNDI context
  • the optional message store for DR
  • and more.

The library was aimed to work with Spring and with Tomcat. However, it should work in other JEE servers. For this reason I have implement a JMS URI. Sadly there is no open standard, but it is loosely based a similar functionality in Apache MQ.

jms:queue:queue_out?gateway=par&socket.type=DEALER&socket.bind=true&socket.addr=tcp://*:95862&redelivery.retry=3

WIKI

Refer to the WIKI for more details (https://github.com/zeromq/jeromq-jms/wiki)

Release 3.1

Minor release to fix 'inproc' socket addresses

  • inproc pub/sub appears to hang #13
  • removal of 'gateway' as a alternative prefix within the URI
  • Add 'context.name' within the URI to pool ZMQ context instances

Release 3.0

Major release to upgrade to Java 8.0. Othewrwise, bug fixes

  • Extended is not working #7
  • Creating a subscriber (almost) always takes exactly 5seconds #9
  • ZmqConnection.close should be a no-op #12
  • Connect jeromq-jms subscriber to zeroMQ Publisher #14

Release 2.0

This is a major release, with allot of bug fixes and new functionality

  • move the JMS version to 2.0.1 and implement the "simplified" API
  • update JeroMQ version 4.0, and general all version dependencies
  • extends the URI to enable extends of other URIs (drop duplication)
  • switch over gateway.{properties} to socket.{propertes} and adds ALL ZMQ properties (i.e. linger, HWM, etc...) to tweak the underlying ZMQ socket
  • adds DR to the gateways, to enable failover
  • allow N-N (broker-less) messaging (without PROXY n-1-n)
  • adds the ZMQ proxy (with failover) to enable n-1-n setups
  • fixes issues around the PAR protocol to enable DR, Failover, etc..
  • adds Google Protobuf and JSON message marshaling examples in the tests
  • adds Spring annotation based test examples
  • adds journal store functionality to enable BCP and loss less messaging

Examples

  • Simple Queue with ZMQ PUSH/PULL
jms:queue:queue_1?socket.addr=tcp://*:9728&event=stomp
  • Simple Topic with ZMQ PUB/SUB
jms:topic:topic_1?socket.addr=tcp://*:9711&event=stomp
  • Proxied N-1-N example

By specifying a proxy on the receiver queue defintion to enable multiple sender connecting to multiple receivers to enable fan in and out. Only one proxy can bind to the sockets, so any others will staying an PENDING state until the bound proxy drops out.

jms:queue:sender?socket.addr=tcp://*:9728&event=stomp
jms:queue:receiver?proxy.proxyAddr=tcp://*:9728&socket.addr=tcp://*:9729&socket.bind=false&event=stomp
  • Enable JOURNALING on a queue
jms:queue:queueWithJournal?gateway=par&gateway.socket=tcp://*:9711&event=stomp&journal=file
  • Queue showing ZMQ socket property example settings
  • Topic with ZMQ label filters (alternative to JMS subscription filtering)
jms:topic:topic_2?socket.addr=tpc://*:9990&filter=propertyTag&filter.subTags=NASA,APAC&filter.pubPropertyName=Region&event=stomp

Contribution Process

This project uses the C4 process for all code changes.

Licensing

Copyright (c) 2015 Jeremy Miller

Copyright other contributors as noted in the AUTHORS.txt file.

Free use of this software is granted under the terms of the Mozilla Public License Version 2.0 (MPL). If a copy of the MPL was not distributed with this file, You can obtain one at http://mozilla.org/MPL/2.0/.

jeromq-jms's People

Contributors

ggallen avatar mjeremym avatar scoheb avatar tommie-lie avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

jeromq-jms's Issues

Creating a subscriber (almost) always takes exactly 5seconds

I have source code like this:

TopicConnectionFactory zmqConnectionFactory = new ZmqConnectionFactory();
TopicConnection connection = zmqConnectionFactory.createTopicConnection();
final TopicSession session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);

final Topic topicA = session.createTopic("jms:topic:topic_A?socket.addr=tcp://localhost:55555");

Instant before = Instant.now();
TopicSubscriber subscriber = session.createSubscriber(topicA);
System.out.println("A: subscribing took " + Duration.between(before, Instant.now()).toMillis() + "ms");

It always prints a duration of a little over 5000ms. I don't lose any messages during those 5 seconds, instead they are buffered and I receive them all at once. But createSubscriber just blocks for 5 seconds.

5000ms, incidentally, is the value of SOCKET_STATUS_TIMEOUT_MILLI_SECOND. Looking at https://github.com/zeromq/jeromq-jms/blob/master/src/main/java/org/zeromq/jms/protocol/AbstractZmqGateway.java#L187, if the timeout is -1, waitTime is set to 5000. The socket status seems to be asynchronous, as usually the first execution of the loop in waitOnStatus always yields PENDING for the status. It then sleeps for waitTime milliseconds which was set to 5000. Normally, the socket becomes RUNNING only a few milliseconds later, so that I'm waiting uselessly for ~4950ms.

Judging from the naming of the constants, what the code should be doing if the timeout was given as -1 is to keep polling the state but cancel the polling after SOCKET_STATUS_TIMEOUT_MILLI_SECOND. Instead, it polls only every SOCKET_STATUS_TIMEOUT_MILLI_SECOND and still waits infinitely long.

I prepared a pull request to fix this issue.

Connect jeromq-jms subscriber to zeroMQ Publisher

I am trying to connect a jms based subscriber to a plain ZeroMQ Publisher. But the subscriber does not recive any message. Is there any way to achive this?

Publisher:

    Context context = ZMQ.context(1);

    final Socket publisherSocket = context.socket(ZMQ.PUB);

    publisherSocket.bind("tcp://*:9714");
    Thread t = new Thread(() -> {
        while (true) {

            // Write two messages, each with an envelope and content
            publisherSocket.sendMore("evt");
            publisherSocket.send("TestEvent");

        }

JMS Template configuration:

  final JmsTemplate template = new JmsTemplate();

    template.setPubSubDomain(true);
    template.setConnectionFactory(connectionFactory());
    template.setDefaultDestinationName(getPubServerUri(null));
    template.setPubSubNoLocal(false);
    template.setReceiveTimeout(100);
    return template;
private TopicConnectionFactory connectionFactory() {

      final TopicConnectionFactory connectionFactory = new ZmqConnectionFactory(
              new String[] { getPubServerUri(null)});

      return connectionFactory;
  }


  private ConnectionFactory connectionFactory() {

      final ConnectionFactory connectionFactory = new ZmqConnectionFactory(
              new String[] { getPubServerUri(null)});

      return connectionFactory;
  }

 private String getPubServerUri(ZMQ_CHANNEL channel)
  {
      return "jms:topic:all?gateway.addr=tcp://*:9714&socket.addr=tcp://*:9714";
  }

Subscriber:

           System.out.println(jmsTemplate.receiveAndConvert());

`inproc` pub/sub appears to hang

The example code below works if the topic is changed to use a tcp address but hangs indefinitely if an inproc address is used.

Could this be related in some way to zeromq/libzmq#1257? It seems unlikely, but....

Note: as written, this example throws an exception on successful completion because of #12.

public class Jms {

  public static void main(String[] args) throws Exception {
    TopicConnectionFactory connectionFactory = new ZmqConnectionFactory();
    try (Connection connection = connectionFactory.createConnection()) {
      Session session = connection.createSession(false, AUTO_ACKNOWLEDGE);

      Topic sessionsTopic = session.createTopic("jms:topic:sessions?socket.addr=inproc://example");
//      Topic sessionsTopic = session.createTopic("jms:topic:sessions?socket.addr=tcp://localhost:1234");

      MessageProducer producer = session.createProducer(sessionsTopic);

      CountDownLatch latch = new CountDownLatch(1);
      MessageConsumer consumer = session.createConsumer(sessionsTopic);
      consumer.setMessageListener(message -> {
        try {
          System.out.println(((TextMessage) message).getText());
        } catch (JMSException e) {
          throw new RuntimeException(e);
        }
        latch.countDown();
      });

      TextMessage message = session.createTextMessage("Hello, World!");

      producer.send(sessionsTopic, message);

      latch.await();
    }
  }
}

Release of this project?

Hi,

I just want to know if you intend on making a release of this product on Maven Central?

Thanks

Scott

`ZmqConnection.close` should be a no-op

javax.jms.Connection is AutoCloseable, so expected usage is in a try-with-resources block. This automatically calls close when leaving the block. The current implementation of ZmqConnection.close throws an UnsupportedOperationException. Making this a no-op (and potentially logging the problem?) would allow the expected JMS usage to work.

Extended is not working

Hello,

I've been testing this library and found that I cannot make "extend=" in the query params to work.
I've defined a parent topic and pass it in the ZmqConnectionFactory destinations parameter, then I've tried to create the topic using the ZmqSession obteined from the ZmqConnection created by the mentioned ZmqConnectionFactory, but I get an error saying "Unable to resolve 'socket.addr' or 'gateway.addr'" for my queue URI.

Additionally to the error previously mentioned, I've tried to download the project and see how ZmqExtendedURI is used, but I couldn't find any reference to that class being used, except for the corresponding UnitTest class.

Best regards,

tools.jar and jconsole.jar in pom.xml

I was just trying to use jeromq-jms in my spring boot project by using the following maven dependency of the newest (I suppose not yet official) release:

<dependency>
	<groupId>org.zeromq</groupId>
	<artifactId>jeromq-jms</artifactId>
	<version>3.0.1-RELEASE</version>
</dependency>

After that I have problems with these two dependencies:

<dependency>
	<groupId>com.sun.tools</groupId>
	<artifactId>jconsole</artifactId>
	<version>${version.java}</version>
	<scope>system</scope>
	<systemPath>${path.jconsolejar}</systemPath>
</dependency>
<dependency>
	<groupId>com.sun.tools</groupId>
	<artifactId>tools</artifactId>
	<version>${version.java}</version>
	<scope>system</scope>
			<systemPath>${path.toolsjar}</systemPath>
</dependency>

When using the lastest official release it works:

<dependency>
	<groupId>org.zeromq</groupId>
	<artifactId>jeromq-jms</artifactId>
	<version>2.0.2-RELEASE</version>
</dependency>

Why are these dependencies necessary? Shouldn't tools.jar and jconsole.jar be part of the installed jdk? In my installation (Mac OS 10.15.3, Amazon Coretto JDK) they are located here:

/Library/Java/JavaVirtualMachines/amazon-corretto-8.jdk/Contents/Home/lib

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.