mod-rxvertx

A Vert.x module that adds support for Reactive Extensions (Rx) using the RxJava library. This allows Vert.x developers to use the type-safe, composable RxJava API to build Vert.x verticles.

Dependencies

  • The module wraps the Vert.x core objects to add Observable support, so it is tightly bound to the Vert.x release.
  • This module also contains the Netflix RxJava library.

Status

Currently, Observable wrappers are provided for:

  • EventBus
  • HttpServer
  • HttpClient
  • NetServer
  • NetClient
  • Timer

There are also base Observable adapters that map Handler and AsyncResultHandler to Observable. These can be used to call other Handler-based APIs.

Support is coming soon for:

  • FileSystem
  • SockJSServer

Usage

This is a non-runnable module, which means you add it to your module via the "includes" attribute of mod.json.

All standard API methods of the form

void method(args...,Handler<T> handler)

are typically available in the form

Observable<T> method(args...)

where the operation is executed immediately or

Observable<T> observeMethod(args...)

where the operation is executed on subscribe. The latter form is the more 'pure' Rx method and should be used where possible. It is required, for example, to maintain the semantics of concat.
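
The difference between the two forms matters most when operations are composed in sequence. The following is a minimal, JDK-only sketch of that distinction; it does not use RxJava or this module. Source, eager, deferred and concat are hypothetical stand-ins for an Observable, the method(...) form, the observeMethod(...) form and a simplified Rx concat. The sketch records the order in which two operations start.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Supplier;

final class SubscribeSemantics {
    /** Hypothetical stand-in for an Observable that emits exactly one value. */
    interface Source<T> {
        void subscribe(Consumer<T> onNext);
    }

    /** Like method(args...): the operation runs now; subscribers get the stored result. */
    static <T> Source<T> eager(Supplier<T> operation) {
        T result = operation.get();
        return onNext -> onNext.accept(result);
    }

    /** Like observeMethod(args...): the operation runs each time someone subscribes. */
    static <T> Source<T> deferred(Supplier<T> operation) {
        return onNext -> onNext.accept(operation.get());
    }

    /** Simplified concat: subscribe to the second source only after the first has emitted. */
    static <T> Source<T> concat(Source<T> first, Source<T> second) {
        return onNext -> first.subscribe(value -> {
            onNext.accept(value);
            second.subscribe(onNext);
        });
    }

    /** Builds a concat of two operations, subscribes to it, and returns the event log. */
    static List<String> run(boolean useDeferred) {
        List<String> log = new ArrayList<>();
        Supplier<String> opA = () -> { log.add("start A"); return "A"; };
        Supplier<String> opB = () -> { log.add("start B"); return "B"; };

        Source<String> a = useDeferred ? deferred(opA) : eager(opA);
        Source<String> b = useDeferred ? deferred(opB) : eager(opB);
        log.add("subscribe");
        concat(a, b).subscribe(value -> log.add("got " + value));
        return log;
    }

    public static void main(String[] args) {
        System.out.println("eager:    " + run(false));
        System.out.println("deferred: " + run(true));
    }
}
```

In the eager run both operations start before anything subscribes, so B has already started by the time A's result is delivered. In the deferred run B starts only after A has emitted, which is the ordering concat is meant to provide.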

EventBus

RxEventBus rxEventBus = new RxEventBus(vertx.eventBus());
rxEventBus.<String>registerHandler("foo").subscribe(new Action1<RxMessage<String>>() {
  public void call(RxMessage<String> message) {
    // Send a single reply
    message.reply("pong!");
  }
});

Observable<RxMessage<String>> obs = rxEventBus.send("foo", "ping!");

obs.subscribe(
  new Action1<RxMessage<String>>() {
    public void call(RxMessage<String> message) {
      // Handle response
    }
  },
  new Action1<Throwable>() {
    public void call(Throwable err) {
      // Handle error
    }
  }
);

Scheduler

The standard RxJava schedulers are not compatible with Vert.x. To preserve the Vert.x threading model, all callbacks to a Verticle must be made in the context of that Verticle instance.

RxVertx provides a custom Scheduler implementation that uses the Verticle context to schedule timers and ensure callbacks run on the correct context.

In the following example the scheduler is used to run a Timer and then buffer the output.

Note: The RxVertx scheduler must always be used to observe results inside the Verticle. It is possible to use the other Schedulers (e.g. for blocking calls) as long as you always use observeOn to route the callbacks onto the Verticle event loop. For timers, it is more efficient to just use the Vert.x scheduler.

RxVertx rx = new RxVertx(vertx);
Observable o = (some observable source)

Observable
    .timer(10, 10, TimeUnit.MILLISECONDS, rx.contextScheduler())
    .buffer(100, TimeUnit.MILLISECONDS, rx.contextScheduler())
    .take(10)
    .subscribe(...)
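
The rule that callbacks must always be routed back onto the Verticle context can be illustrated without RxJava. The following JDK-only sketch is an analogy, not the RxVertx API: a single-threaded executor stands in for the Verticle event loop (the role played by rx.contextScheduler()), a separate pool stands in for a scheduler used for blocking calls, and thenApplyAsync plays the role of observeOn. All class, method and thread names here are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class ContextRouting {
    /**
     * Runs a blocking call on a worker thread, then delivers the callback on the
     * single "context" thread. Returns the name of the thread that ran the callback.
     */
    static String callbackThreadName() {
        // Stand-in for the Verticle event loop: exactly one thread runs all callbacks.
        ExecutorService context = Executors.newSingleThreadExecutor(
                r -> new Thread(r, "verticle-context"));
        // Stand-in for a scheduler that is allowed to block.
        ExecutorService workers = Executors.newFixedThreadPool(2,
                r -> new Thread(r, "blocking-worker"));
        try {
            return CompletableFuture
                    .supplyAsync(() -> "result of a blocking call", workers)
                    // Analogous to observeOn: hop back to the context thread
                    // before any callback code runs.
                    .thenApplyAsync(result -> Thread.currentThread().getName(), context)
                    .join();
        } finally {
            workers.shutdown();
            context.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("callback ran on: " + callbackThreadName());
    }
}
```

The blocking work runs on a worker thread, but the callback always runs on the single context thread, which is the property the RxVertx scheduler is described as preserving.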

Timer

The timer functions are provided via the RxVertx wrapper. The timer is set on subscribe. To cancel a timer that has not yet fired, or a periodic timer, just unsubscribe.

RxVertx rx = new RxVertx(vertx);
rx.setTimer(100).subscribe(new Action1<Long>() {
  public void call(Long t) {
    // Timer fired
  }
});

The new Scheduler means you can use the native RxJava timer methods, so this Timer may be deprecated in the future.

Helper

The support class RxSupport provides several helper methods for some standard tasks.

Streams

There are two primary wrappers:

Observable RxSupport.toObservable(ReadStream)

Converts a ReadStream into an Observable<Buffer>.

RxSupport.stream(Observable,WriteStream)

Streams the output of an Observable to a WriteStream.

Note that this method does not handle writeQueueFull, so it cannot be used as a pump.

Contributors

ddossot, petermd, purplefox, sharathp, squaredfinancialit, tavisrudd
