Code Monkey home page Code Monkey logo

msb-java's People

Contributors

agritsik avatar alexandr-zolotov avatar anha1 avatar anstr avatar galkin avatar plaenen avatar rootvm avatar sergiimeleshko avatar sergiv83 avatar timbortnik avatar vossim avatar vso-tc avatar yuriysavchuk avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

msb-java's Issues

durationMS is 0 or negative value only

sample message

{"id":"ac5d40ba-5889-40c5-9101-653c114159d2","correlationId":"ccc893bc-e3f5-46a2-b991-74342e3da2de","tags":["ping-service","52365b7b-4691-40b8-8e59-3c484ef81600","ghcm-microservice"],"topics":{"to":"mdm:events:hotel:v1:response:msbd06a-ed59-4a39-9f95-811c5fb6ab87"},"meta":{"createdAt":"2015-12-08T19:42:10.780Z","publishedAt":"2015-12-08T19:42:10.899Z","durationMs":-119,"serviceDetails":{"name":"ghcm-microservice","version":"1.0.0","instanceId":"e36581d3-71a1-4b60-a437-cb559ef4087d","hostname":"msb-java-staging","ip":"10.182.8.68","pid":3264}},"ack":{"responderId":"155f5953-1e8a-4da3-b482-8c02abfbd294","responsesRemaining":-1},"payload":{"event":"convertHotelCode","body":{"convertedCodeValue":["A0003","A8565","305AL","305BJ"],"codeValue":"vsx2djy","sourceCodeTypeID":"13","targetCodeTypeID":"2"}}}

value for field is incorrect. see https://docs.oracle.com/javase/8/docs/api/java/time/Duration.html#between-java.time.temporal.Temporal-java.time.temporal.Temporal-

and pull request that introduce bug:3c88906

Improvement of MSB developer API

Issues with the current API

Here’s a part of DateExtractor implemented using msb-java (you can find full source here):

public class DateExtractor {

    public static void main(String... args) {

        MsbContext msbContext = new MsbContext.MsbContextBuilder().
                withShutdownHook(true).
                build();
        DefaultChannelMonitorAgent.start(msbContext);

        MessageTemplate options = new MessageTemplate();
        final String namespace = "search:parsers:facets:v1";

        final Pattern YEAR_PATTERN = Pattern.compile("^.*(20(\\d{2})).*$");

        ResponderServer.create(namespace, options, msbContext, (request, responder) -> {

                    RequestQuery query = request.getQueryAs(RequestQuery.class);
                    String queryString = query.getQ();
                    Matcher matcher = YEAR_PATTERN.matcher(queryString);

                    if (matcher.matches()) {
                        // send acknowledge
                        responder.sendAck(500, null);

                        // parse year
                        String str = matcher.group(1);
                        Integer year = Integer.valueOf(matcher.group(2));

                        // populate response body
                        Result result = new Result();
                        result.setStr(str);
                        result.setStartIndex(queryString.indexOf(str));
                        result.setEndIndex(queryString.indexOf(str) + str.length() - 1);
                        result.setInferredDate(new HashMap<>());
                        result.setProbability(0.9f);

                        Result.Date date = new Result.Date();
                        date.setYear(year);
                        result.setDate(date);

                        ResponseBody responseBody = new ResponseBody();
                        responseBody.setResults(Arrays.asList(result));
                        Payload responsePayload = new Payload.PayloadBuilder()
                                .withStatusCode(200)
                                .withBody(responseBody).build();

                        responder.send(responsePayload);
                    }
                })
                .listen();
    }
//…
}

This approach is cloned from msb (NodeJS) and is very flexible. However it leads to the following issues:

  • A lot boilerplate code that needs to be copy-pasted:
    • Initialization of MsbContext
    • Creation and hooking up of ResponderServer
    • Manual parsing of input message (request.getQueryAs(RequestQuery.class);)
    • Explicit starting of DefaultChannelMonitorAgent
  • A lot of static methods and constructors are used which complicates unit testing.
  • Lifecycle steps (initialization/shutdown) need to be manually triggered by microservice developer. In the example above we don't have to worry about that too much but imagine a microservice that needs to gracefully close database connection upon shutdown.
  • Microservice developer has to use too many MSB classes which may complicate further msb-java development (for compatibility reasons).

Possible solution: inversion of control

With inversion of control and dependency injection we could solve all the problems described in the previous section. The general idea is to free microservice developer from implementing own main method. Instead he/she has to implement some simple interface with business logic and have the lifecycle methods of that interface invoked externally by msb-java "infrastructure" (whatever it is).

The idea is taken from Java Servlet specification because each microservice is in some way resembles good-old javax.servlet.http.HttpServlet.

However different microservices might have different lifecycles so let's discuss various types of them in detail.

Common microservices

Microservice driven by MSB messages

This type of microservice listens to a single request topic, does something and produces acks and responses to the corresponding response topic. DateExtractor considered above is an example of such microservice. It obtains a query string, tries to parse a date from it and sends the result back.

Optionally during request processing such microservice can consult with another microservice by sending request to it. For this reason we need to inject an instance of Requester during initialization. An important point here is that with such approach we have to use single instance of Requester inside the microservice as opposed to creating a new one for each request (as currently implemented).

Here's the proposed interface for such type of microservice:

public interface MsbMicroservlet {
    /**
     * Invoked during initialization of microservice
     * @param msbContext holds initial configuration and core MSB objects
     * @param requester can be used to send requests to (and process responses from) other microservices via bus
     */
    void init(MsbContext msbContext, Requester requester);

    /**
     * Processes incoming request
     * @param request the payload of the incoming request
     * @param responder allows to send responses and acks back
     */
    void process(Payload request, Responder responder);

    /**
     * Shuts down this microservice
     * @param shouldInterrupt whether active tasks should be interrupted
     */
    void shutdown(boolean shouldInterrupt);
}

Of course something has to instantiate the object that implements such interface and subscribe it to the given namespace. That something is going to be instance of library class MsbMicroserviceRunner which has main method implemented. It creates instances of MsbMicroservlet and invokes their lifecycle methods.

So each microservice is launched from command line as a separate OS process like this:

java -cp my_microservice.jar:msb.jar MsbMicroserviceRunner

Also MsbMicroserviceRunner has to know the actual class and namespace name. To achieve this microservice author specifies this information in the config file:

msbConfig {
#...
  microservletClass = io.github.tcdl.examples.DateExtractor
  microservletNamespace = search:parsers:facets:v1
#...
}

Microservice driven by messages coming from external systems

Another common type of microservice doesn't explicitly subscribe to serve requests from bus but rather only puts them there.
Examples are:

  • http2bus listens to HTTP requests, converts them into bus messages and publishes into the configured topics. After that MSB responses are collected, converted back to HTTP and sent back.
  • monitor and logger have internal timer that periodically triggers heartbeat requests into a _channels:heartbeat topic to get stats from other microservices. Actually they also subscribe to _channels:announce so they are hybrid (driven both by internal timer and by incoming MSB bus messages).

In this case interface MsbMicroservlet given in the previous section also works and:

  • Service setup (starting of heartbeater or HTTP server) should be done in init
  • process may be no-op if the microservice doesn't subscribe to any topic.

An interesting caveat is that we need to reserve a value to specify that MsbMicroservlet shouldn't listen to any topic:

msbConfig {
#...
  microservletNamespace = NONE
#...
}

Less-common microservices

Microservice that listens to multiple topics

Now let's consider a microservice that subscribes to multiple topics (possibly dynamically). An example of such microservice would be logger that dynamically subscribes to every topic that it gets from other microservices.

To streamline development of such microservices we need to introduce further changes in interfaces:

First of all we need to add another parameter to MsbMicroservlet.init:

    /**
     * Invoked during initialization of microservice
     * @param msbContext holds initial configuration and core MSB objects
     * @param requester can be used to send requests to (and process responses from) other microservices via bus
     * @param msbMicroservletManager allows to dynamically instantiate new services in the same JVM
     */
    void init(MsbContext msbContext, Requester requester, MsbMicroservletManager msbMicroservletManager);

And here's MsbMicroservletManager:

public interface MsbMicroservletManager {
    /**
     * Initializes a given microservice by its class name and subscribes it to the given namespace. All initialization lifecycle steps are executed.
     * @param namespace defines a topic to subscribe to
     * @param microserviceClass defines a class name to instantiate microservice from
     * @return initialized microservice instance
     */
    MsbMicroservlet initMicroservice(String namespace, Class<MsbMicroservlet> microserviceClass);

    /**
     * Shuts down the microservice by executing its shutdown lifecycle steps. The microservice is also unsubscribed from its namespace
     * @param namespace defines a topic that the microservice is subscribed to
     */
    void shutdownMicroservice(String namespace);

    /**
     * Shuts down the microservice by executing its shutdown lifecycle steps. The microservice is also unsubscribed from its namespace
     * @param microservice the microservice to shut down
     */
    void shutdownMicroservice(MsbMicroservlet microservice);
}

Another point is that probably those instances of MsbMicroservlet need to share some state so we need to enrich MsbContext as well:

public class MsbContext {
//...
    public Set<String> getAttributeNames() {
        //...
    }

    public Object getAttribute(String name) {
        // ...
    }

    void setAttribute(String name, Object value) {
        // ...
    }

    void removeAttribute(String name) {
        // ...
    }
//...
}

NOTE: The actual logger uses raw broker consumers to react on messages because in that particular case there's no need to send the replies back.

Even more!!!

The current flexible approach is not going away. For ninjas that need to implement other patterns not covered above we still expose ChannelManager, Requester, ResponderServer through MsbContext. That is going to allow doing any low-level stuff one may need.

Deadlocks using TestMsbAdapterFactory

We have run into a deadlock while running tests with the TestMsbAdapterFactory as brokerAdapterFactory. In our case it happened when we had a responder server that handed off the work to another thread that then tried to send a response with the responder that it got from the response context.

The problem seems to be that multiple threads access methods on the TestMsbStorageForAdapterFactory instance of that factory concurrently and everything is synchronized, so the entire thing deadlocks.

It looks like this could be solved by using java.util.concurrent.ConcurrentMap instances.

Expose full incoming message to handlers

In rare cases microservice developer needs access not only to message payload but to message envelope as well. Current implementation allows sneaky access to it in io.github.tcdl.msb.api.ResponderServer.RequestHandler via responder.getOriginalMessage() but this doesn't look very elegantly. So:

  1. Consider adding the whole message to the signature of io.github.tcdl.msb.api.ResponderServer.RequestHandler.process
  2. Consider adding the whole response message to the signature of io.github.tcdl.msb.api.Requester.onResponse instead of having separate method onRawResponse

Channel monitor agent throws exceptions upon microservices shutdown

The monitor throws exception upon microservice shutdown

15:43:17,122 ERROR [AMQP Connection 127.0.0.1:5672] AmqpAutoRecoveringChannel - Shutdown is NOT initiated by application. Resetting the channel.
com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange '_channels:heartbeat' in vhost '/', class-id=60, method-id=40)
    at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:478)
    at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:315)
    at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:144)
    at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91)
    at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:552)
    at java.lang.Thread.run(Thread.java:745)

Steps to reproduce:

  1. Start io.github.tcdl.msb.examples.Monitor
  2. Start io.github.tcdl.msb.examples.DateExtractor
  3. Observe monitor logging heartbeat to console (as intended)
  4. Stop the microservice
  5. Observe abovementioned error in monitor console (not intended)

Improve response timeout processing

  • Both msb and msb-java have hardcodded default response timeout 3000 ms.
  • In case when a small timeout like "1" is provided by a mistake in client code, response timeout can't be scheduled:
TimeoutManager Enabling response timeout for -6 ms
TimeoutManager Unable to schedule timeout with negative delay : -6

These issues could be resolved, when introducing a configuration parameter like "minWaitForResponsesTimeout" that is used both as minimal and as a default timeout value. Please consider to introduce this change.

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.