Code Monkey home page Code Monkey logo

alibaba-rsocket-broker's Introduction

logo

Gitter Maven GitHub repo size Open Issues Build Status Apache License 2

Alibaba RSocket Broker是一款基于RSocket协议的反应式对等通讯系统,为通讯多方构建分布式的RPC, Pub/Sub, Streaming等通讯支持。

  • 反应式: 无需担心线程模型、全异步化、流式背压支持、独特的对等通讯模式可适应各种内部网络环境和跨云混云的需求。
  • 程控:完善的控制面(Control Plane)支持,可定制和方便的功能扩展,如支持反向的Prometheus Metrics采集、ZipKin RSocket Collector、Chaos等。
  • 消息:面向消息通讯,服务路由、过滤、observability都非常简单。
  • 交换系统:完全分布式、异构系统整合简单,无论应用什么语言开发、部署在哪里,都可以相互通讯。

更多RSocket Broker资源和介绍,请访问以下资源:

RSocket Broker工作原理

RSocket Broker桥接应用间通讯的双方,相当于一个中间人的角色。 应用在启动后,和Broker创建一个长连接,在连接创建的时候需要标明自己的身份,如果是服务提供者,会注册自己能提供的服务信息。 Broker会针对所有的连接和服务列表建立对应的映射关系。 当一个应用需要调用其他服务时,应用会将请求以消息的方式发给Broker,然后Broker会解析消息的元信息,然后根据路由表将请求转发给服务提供者,然后将处理结果后的消息再转发给调用方。 Broker完全是异步化的,你不需要关心线程池这些概念,而且消息转发都是基于Zero Copy,所以性能非常高,这也是为何不用担心中心化Broker成为性能瓶颈的主要原因。

RSocket Broker Structure

通过上述的架构图,RSocket Broker彻底解决了传统设计中众多的问题:

  • 配置推送: 连接已经建立,只需要通过RSocket的metadataPush可以完成配置推送
  • 服务注册和发现:应用和Broker建立连接后,这个长连接就是服务注册和发现,你不需要额外的服务注册中心
  • 透明路由: 应用在调用服务时,不需要知道服务对应的应用信息, Broker会完成路由
  • Service-to-service调用: RSocket提供的4个模型可以很好地解决服务到服务调用的各种复杂需求
  • Load balancing: 所有的应用和Broker建立长连接后,负载均衡在broker中心路由表完成,对应用完全透明。
  • Circuit Breakers: 断路保护,现在调整为Back Pressure支持,更贴近实际业务场景
  • Distributed messaging: RSocket本身就是基于消息推送的,而且是分布式的。
  • 多语言支持: RSocket是一套标准协议,主流语言的SDK都有支持,详情请访问 RSocket SDK Stack

项目模块

  • alibaba-rsocket-service-common: RSocket服务接口定义基础模块,包括Annotation, Reactive相关框架和支撑类
  • alibaba-rsocket-core: RSocket核心功能模块
  • alibaba-rsocket-spring-boot-starter: Spring Boot Starter for RSocket, 包括RSocket服务发布和消费
  • alibaba-broker-spring-boot-starter: Spring Boot Starter for RSocket Broker, 方便第三方进行扩展
  • alibaba-rsocket-broker: Alibaba RSocket Broker参考实现
  • alibaba-broker-registry-client-spring-boot-starter: 通过RSocket Broker对外提供服务发现服务
  • alibaba-broker-config-client-spring-boot-starter: 通过RSocket Broker对外提供配置推送服务
  • rsocket-broker-gateway-http: RSocket Broker HTTP网关,将HTTP转换为RSocket协议
  • rsocket-broker-gateway-grpc: RSocket Broker gRPC网关,将gRPC转换为RSocket协议

开发环境要求

  • JDK 11: RSocket Broker Server基于Java 11,但是Broker Client等是Java 8兼容的
  • Maven 3.5.x
  • Node 16+: RSocket Broker采用Vaadin 23.0版本构建控制界面,所以你需要安装Node 16以上版本

如何运行Example?

注意: 样例代码中的AccountService接口采用了Protobuf进行序列化,使用了protobuf-maven-plugin生成对应的Protobuf, 建议使用IDE导入项目之前,首先在项目的根目录下执行一下"mvn -DskipTests package" 完成Protobuf对应的代码生成,不然直接在IDE中编译可能出现编译不通过的情况。

项目提供了完成的样例,你可以在example模块下找到,包括服务接口定义、服务实现和服务调用三个部分。

启动RSocket Broker
  • Jbang方式启动: 通过jbang rsocket-broker@alibaba-rsocket-broker 命令启动RSocket Broker
  • Docker Compose运行RSocket Broker: 在RSocket Broker项目目录下执行 'docker-compose up -d' 启动RSocket Broke
  • 在IDE中运行RSocket Broker: 找到AlibabaRSocketBrokerServer类,运行main函数,启动RSocket Broker
运行 RSocket Responder & Requester
  • 找到RSocketResponderServer类,运行main函数,启动RSocket Responder对外提供Reactive服务
  • 找到RSocketRequesterApp类,运行main函数,启动RSocket Requester, 进行Reactive Service消费
  • 在IDEA中,找到example.http,运行 "GET http://localhost:8181/user/2" 或者运行以下命令,进行服务调用测试。
$ curl http://localhost:8181/user/2

样例的详细介绍请访问 Example

RSocket服务编写流程

包括如何创建一个Reactive服务接口,在Responder端实现该接口,在Requester完成Reactive服务调用,以及通讯双方是如何和Broker交互的。

  • 创建一个RSocket服务接口,你可以创建一个单独的Maven Module存放这些接口,如user-service-api,样例代码如下:
public interface UserService {
    Mono<User> findById(Integer id);
}
  • 在RSocket Responder端实现该接口,同时给实现类添加 @RSocketService annotation,如下:
@RSocketService(serviceInterface = UserService.class)
@Service
public class UserServiceImpl implements UserService {
    @Override
    public Mono<User> findById(Integer id) {
        return Mono.just(new User(1, "nick:" + id));
    }
}

不少开发者会问道,如果是MySQL数据库,如何和Reactive集成。目前R2DBC有对MySQL的支持,你可以参考一个Spring Cloud RSocket + R2DBC + MySQL的Demo实现: https://github.com/linux-china/spring-cloud-function-demo/

  • 在RSocket Requester,以Proxy方式创建Reactive服务接口对应的Spring bean, 如下:
    @Bean
    public UserService userService(@Autowired UpstreamManager upstreamManager) {
        return RSocketRemoteServiceBuilder
                .client(UserService.class)
                .upstreamManager(upstreamManager)
                .build();
    }
  • 在RSocket Requester端,进行代码调用,如HTTP REST API提供给:
@RestController
public class PortalController {
    @Autowired
    UserService userService;

    @GetMapping("/user/{id}")
    public Mono<User> user(@PathVariable Integer id) {
        return userService.findById(id);
    }
}

样例项目请参考: https://github.com/alibaba-rsocket-broker/rsocket-broker-simple-example

References

alibaba-rsocket-broker's People

Contributors

alibaba-oss avatar anuger avatar chenggangpro avatar fox1ck avatar he-pin avatar hero-zhanghao avatar jackie1in avatar jimichan avatar kevinten10 avatar linux-china avatar sawravchy avatar slowrookie avatar szihai avatar xiaoboey avatar yiyanghua avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

alibaba-rsocket-broker's Issues

Reactive兼容RxJava 3

目前RSocket Broker支持RxJava 2, RxJava 3 估计在在1月底发布,要兼容 RxJava 3。 主要在RSocketRequestRpcProxy 和 RSocketResponderSupport 进行调整。

RSocket Payload自定数据类型支持

在RSocket Broker的场景中,一个RSocket长连接会传输各种类型的Payload,虽然RSocket长连接支持全局的Payload的data mime type,但是实际中会存在不用的业务场景使用不同的数据编码格式,如RPC调用和Kafka消息可能就是不同的数据类型,这个时候就需要给每一个Payload设置独立的数据编码格式,对于RPC场景,可能还存在对返回数据的数据编码需求,如以下签名:

User findUserByNick(String nick);

有可能是请求的时候是Text/Plain编码,而返回的对象结果要求是JSON编码,这种不同的编码需求主要是出于个性化和性能要求。 在RSocket 281的Metadata规范中,增加了data encoding和accept data encoding两者元数据类型,这样可以保证满足该场景的要求。

在RSocket Broker中,建议服务提供方支持多种数据类型,目前主要包括:

  • 数字类型编码支持: 如果Integer, long, double等
  • 字符串支持: String
  • byte数组: 如在文件上传的场景
  • 自定义序列化方式: JSON, Hessian, Protobuf, Avor这四者

Composite Metadata Extension: data Mime/type definition per-stream: rsocket/rsocket#281

手动执行 Example 代码长时间未操作后再次调用服务出现 500 错误

代码为 Example 中的代码,按照文档步骤操作后访问正常,
过了六小时未操作后再次调用接口服务器会报 500 异常,手动将 Broker、Respones、Request 重启后正常访问

2019-12-11 10:27:20.625  INFO 97481 --- [           main] o.s.b.web.embedded.netty.NettyWebServer  : Netty started on port(s): 8181
2019-12-11 10:27:20.628  INFO 97481 --- [           main] c.a.s.b.r.demo.RSocketRequesterApp       : Started RSocketRequesterApp in 1.702 seconds (JVM running for 2.302)
io.rsocket.exceptions.ConnectionErrorException: No keep-alive acks for 65000 ms
	at io.rsocket.RSocketRequester.terminate(RSocketRequester.java:115)
	at io.rsocket.keepalive.KeepAliveSupport.tryTimeout(KeepAliveSupport.java:110)
	at io.rsocket.keepalive.KeepAliveSupport$ClientKeepAliveSupport.onIntervalTick(KeepAliveSupport.java:146)
	at io.rsocket.keepalive.KeepAliveSupport.lambda$start$0(KeepAliveSupport.java:54)
	at reactor.core.publisher.LambdaSubscriber.onNext(LambdaSubscriber.java:160)
	at reactor.core.publisher.FluxInterval$IntervalRunnable.run(FluxInterval.java:123)
	at reactor.core.scheduler.PeriodicWorkerTask.call(PeriodicWorkerTask.java:59)
	at reactor.core.scheduler.PeriodicWorkerTask.run(PeriodicWorkerTask.java:73)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
2019-12-11 16:14:48.833 ERROR 97481 --- [ctor-http-nio-4] a.w.r.e.AbstractErrorWebExceptionHandler : [fc5bd7d9]  500 Server Error for HTTP GET "/user/1"

io.rsocket.exceptions.ConnectionErrorException: No keep-alive acks for 65000 ms
	at io.rsocket.RSocketRequester.terminate(RSocketRequester.java:115) ~[rsocket-core-1.0.0-RC5.jar:na]
	Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: 
Error has been observed at the following site(s):
	|_ checkpoint ⇢ Handler com.alibaba.spring.boot.rsocket.demo.PortalController#user(Integer) [DispatcherHandler]
	|_ checkpoint ⇢ org.springframework.boot.actuate.metrics.web.reactive.server.MetricsWebFilter [DefaultWebFilterChain]
	|_ checkpoint ⇢ HTTP GET "/user/1" [ExceptionHandlingWebHandler]
Stack trace:
		at io.rsocket.RSocketRequester.terminate(RSocketRequester.java:115) ~[rsocket-core-1.0.0-RC5.jar:na]
		at io.rsocket.keepalive.KeepAliveSupport.tryTimeout(KeepAliveSupport.java:110) ~[rsocket-core-1.0.0-RC5.jar:na]
		at io.rsocket.keepalive.KeepAliveSupport$ClientKeepAliveSupport.onIntervalTick(KeepAliveSupport.java:146) ~[rsocket-core-1.0.0-RC5.jar:na]
		at io.rsocket.keepalive.KeepAliveSupport.lambda$start$0(KeepAliveSupport.java:54) ~[rsocket-core-1.0.0-RC5.jar:na]
		at reactor.core.publisher.LambdaSubscriber.onNext(LambdaSubscriber.java:160) ~[reactor-core-3.3.1.RELEASE.jar:3.3.1.RELEASE]
		at reactor.core.publisher.FluxInterval$IntervalRunnable.run(FluxInterval.java:123) ~[reactor-core-3.3.1.RELEASE.jar:3.3.1.RELEASE]
		at reactor.core.scheduler.PeriodicWorkerTask.call(PeriodicWorkerTask.java:59) ~[reactor-core-3.3.1.RELEASE.jar:3.3.1.RELEASE]
		at reactor.core.scheduler.PeriodicWorkerTask.run(PeriodicWorkerTask.java:73) ~[reactor-core-3.3.1.RELEASE.jar:3.3.1.RELEASE]
		at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[na:1.8.0_212]
		at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) ~[na:1.8.0_212]
		at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) ~[na:1.8.0_212]
		at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) ~[na:1.8.0_212]
		at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[na:1.8.0_212]
		at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[na:1.8.0_212]
		at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_212]

当Responder端的service未指定version时,Broker的控制台报错

1. Stack

	at com.alibaba.rsocket.broker.web.ui.ServicesView.lambda$services$0(ServicesView.java:50) ~[classes/:na]
	at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193) ~[na:1.8.0_231]
	at java.util.concurrent.ConcurrentHashMap$ValueSpliterator.forEachRemaining(ConcurrentHashMap.java:3566) ~[na:1.8.0_231]
	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482) ~[na:1.8.0_231]
	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472) ~[na:1.8.0_231]
	at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708) ~[na:1.8.0_231]
	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) ~[na:1.8.0_231]
	at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499) ~[na:1.8.0_231]

使用state machine来管理应用的状态

目前的应用状态主要包括 connected, serving, paused, stopped,同时触发不同的逻辑。 使用状态机管理逻辑可能更清晰,代码也容易管理。 目前spring state machine的flux支持还没有发布,不过目前可以做一些测试,性能不用担心,主要是StateMachine对象的大小。

spring-projects/spring-statemachine#397

应用端到Broker集群的Load Balance架构和稳定性

目前RSocket主要涉及到的Load Balance到Broker 集群的多连接管理。 Alibaba RSocket Broker采用share nothing架构,也就是集群中broker相互之间不通讯,不承担消息转发。 目前客户端SDK的设计思路是通过集群推过来的拓扑结构变更,SDK端完成连接的重连和路由更新等。 目前代码只有基础功能,需要再细致完善,提升稳定性。

服务路由性能提升

目前路由的算法是 group + service name + version,然后对其进行hashcode,然后在进行bitmap匹配查找。 可以考虑在调用方提前将路由信息进行HashCode化,然后基于Integer Hashcode进行bitmap匹配,这样只需要读取composite metadata的前8个字节(1 + 3 + 4)就可以完成路由匹配。 考虑到Hashcode各个语言的一致性性,还是采用 MurmurHash3 算法。

Alibaba RSocket Broker的Docker镜像

提供RSocket Broker 开发环境Docker镜像,方便使用Docker或者Docker Compose的开发者快速启动RSocket Broker进行相关的测试。 目前已经RSocket Broker已经使用Jib进行镜像制作,但是还没有提供到docker hub上。

Alibaba RSocket Broker 1.0.0.M2

准备1.0.0.M2发布:

  • RSocket
    • RSocket Java SDK 1.0.0
    • Spring Boot 2.3.0
  • Development
    • Performance testing for memory leak
  • Deployment
    • Maven Central Deployment
  • Documents
    • M2 Release Note

Scalecube-cluster支持broker到broker的rpc调用

在某些情况下,需要支持broker到broker直接的RPC调用,例如访问broker的一些配置等,目前一个场景就是broker要支持外部应用应用接入,应用无法直接访问broker的内部IP,如office的机器访问云vpc内部的rsocker broker集群,这个时候broker要有两个ip,所以对外部应用推送就需要走外部IP或者域名推送,内部应用则走内部IP推送。

解决思路:

  • 添加json rpc支持,这个比较简单一些,scalecube-cluster已经支持request/response,添加一个规范就可以。
  • 添加自定义的scalecube-jackson-codec支持,支持broker直接的对象序列化,目前主要要支持CloudEvents的JSON序列化

Broker RPC特性可以方便后续对broker的功能的扩展,broker之间通讯也比较简洁。

为RSocketRemoteServiceBuilder添加Interceptor支持

RSocket默认支持Interceptor机制,你可以通过ClientRSocketFactory.addRequesterPlugin() 添加你自定义的RSocketInterceptor,但是这些interceptor相对比较低级一些,如果你要获取具体的信息还需要进行ByteBuf解析,尤其是metadata。 如目前的Zipkin Trace实现,实现相对麻烦一些,如果以Interceptor实现,就会简单很多。

Alpha Release Roadmap

准备Alpha1的版本发布,一些要解决的核心问题:

  • RSocket
    • RSocket Java SDK 1.0.0
  • Development
    • Deploy artifacts to Maven central repository(进行中)
    • External apps integration from outside of the cluster network
    • Integration testing for all features
    • JDK 8, 11 & 14 compatible testing
  • Deployment
    • Dev deployment
    • Gossip cluster deployment
  • Documents
    • Alibaba RSocket Broker Website
    • Alibaba RSocket Broker Wiki

如果大家还有一些比较关注的问题,这里可以留言一下。

Kotlin Coroutines & Flow支持

目前RSocket Service接口支持Reactor,RxJava2, RxJava 3,还没有对Kolin Coroutines和 Flow的支持。 Kotlin Coroutines和Reactive都可以相互转换的,这里调研一下大家是否有在Kotlin? 是否有意愿使用Kotlin的原生接口?

interface UserService {
    suspend fun getAdmin(): String
    suspend fun getNickById(id: Int): String
    fun getAllNames(): Flow<String>
}

Java Proxy机制调整

目前RSocket Service是基于Java Interface的,所以需要通过Proxy机制代理进行RSocket接口调用。
对比JDK Proxy, Javassist 和 Byte Buddy 发现,ByteBuddy的性能最高。 一个简单的性能测试数据如下,而且ByteBuddy对Java Interface的default method处理也比较友好。

ByteBuddyProxyTest.testOperation   thrpt    2  1391028488.716         ops/s
JavassistProxyTest.testOperation   thrpt    2  582515791.244          ops/s
JdkProxyTest.testOperation         thrpt    2  233792969.032          ops/s

所以Java Proxy代理机制调整到ByteBuddy上。

编译错误:npm ERR! Unexpected end of JSON input while parsing near '...2oYSQ4\nrAd/cAHSAmngL'

[INFO] Added 40 dependencies to 'D:\IdeaProjects\alibaba-rsocket-broker\alibaba-broker-server\target\frontend\package.json'
[INFO] Updated npm D:\IdeaProjects\alibaba-rsocket-broker\alibaba-broker-server\target\frontend\package.json.
[INFO] Running npm install ...
npm ERR! Unexpected end of JSON input while parsing near '...2oYSQ4\nrAd/cAHSAmngL'

npm ERR! A complete log of this run can be found in:
npm ERR! E:\Program Files\nodejs\node_cache_logs\2019-12-16T05_49_17_435Z-debug.log
[ERROR] >>> Dependency ERROR. Check that all required dependencies are deployed in npm repositories.

maven 版本 :3.63
nodejs 版本:v12.13.1

是不是我本地网络比较慢,包下载的不完整?

Prometheus Service Discovery支持

目前RSocket Broker已经提供对各个应用的Prometheus的采集支持,主要是通过MetricsService这个RSocket服务完成的。 而且提供了MetricsScrapeController可以输出Prometheus的格式。

实现一个基于RSocket Broker的Prometheus Service Discovery,完成对接入到RSocket Broker的所有应用进行Prometheus的metrics抓取。

https://prometheus.io/blog/2018/07/05/implementing-custom-sd/

https://github.com/prometheus/prometheus/tree/master/discovery

Hessian序列化增加Java 8数据类型支持,同时兼容Dubbo Hessian Lite

Hessian序列化增加Java 8数据类型支持,如Java time类型,Optional支持。

兼容Dubbo Hessian Lite,相关的代码来自Dubbo Hessian Lite,去除Java反射逻辑,RSocket支持Java 8+版本,不需要使用反射来支持Java 8数据类型。

实现逻辑: META-INF/hessian/serializers添加对应的Serializer类。

其他类型支持,如Joda time, 需要自己添加对应的Serializer类。

有个报错的信息, RST-600500: Failed to parse composite metadata

2020-04-28 00:02:11.427 INFO 1 --- [tor-tcp-epoll-4] b.r.b.r.RSocketBrokerHandlerRegistryImpl : RST-500200: Succeed to accept connection from rsocket-user-service
2020-04-28 00:02:11.460 ERROR 1 --- [tor-tcp-epoll-4] c.a.r.listen.CompositeMetadataRSocket : RST-600500: Failed to parse composite metadata

java.lang.NullPointerException: null
at java.util.Objects.requireNonNull(Objects.java:203) ~[na:1.8.0_242]
at io.micrometer.core.instrument.ImmutableTag.(ImmutableTag.java:35) ~[micrometer-core-1.3.2.jar:1.3.2]
at io.micrometer.core.instrument.Tag.of(Tag.java:29) ~[micrometer-core-1.3.2.jar:1.3.2]
at io.micrometer.core.instrument.Tags.of(Tags.java:254) ~[micrometer-core-1.3.2.jar:1.3.2]
at io.micrometer.core.instrument.MeterRegistry.counter(MeterRegistry.java:363) ~[micrometer-core-1.3.2.jar:1.3.2]
at io.micrometer.core.instrument.Metrics.counter(Metrics.java:76) ~[micrometer-core-1.3.2.jar:1.3.2]
at com.alibaba.rsocket.listen.CompositeMetadataRSocket.metrics(CompositeMetadataRSocket.java:155) ~[alibaba-rsocket-core-0.1.0-SNAPSHOT.jar:na]
at com.alibaba.rsocket.listen.CompositeMetadataRSocket.requestResponse(CompositeMetadataRSocket.java:70) ~[alibaba-rsocket-core-0.1.0-SNAPSHOT.jar:na]
at io.rsocket.RSocketResponder.requestResponse(RSocketResponder.java:193) [rsocket-core-1.0.0-RC5.jar:na]
at io.rsocket.RSocketResponder.handleFrame(RSocketResponder.java:299) [rsocket-core-1.0.0-RC5.jar:na]
at reactor.core.publisher.LambdaSubscriber.onNext(LambdaSubscriber.java:160) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
at reactor.core.publisher.MonoFlatMapMany$FlatMapManyInner.onNext(MonoFlatMapMany.java:242) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
at reactor.core.publisher.FluxGroupBy$UnicastGroupedFlux.drainRegular(FluxGroupBy.java:554) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
at reactor.core.publisher.FluxGroupBy$UnicastGroupedFlux.drain(FluxGroupBy.java:630) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
at reactor.core.publisher.FluxGroupBy$UnicastGroupedFlux.subscribe(FluxGroupBy.java:696) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
at reactor.core.publisher.Flux.subscribe(Flux.java:8174) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
at reactor.core.publisher.MonoFlatMapMany$FlatMapManyMain.onNext(MonoFlatMapMany.java:188) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1637) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
at reactor.core.publisher.MonoProcessor.onNext(MonoProcessor.java:317) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
at io.rsocket.internal.ClientServerInputMultiplexer.lambda$new$1(ClientServerInputMultiplexer.java:116) ~[rsocket-core-1.0.0-RC5.jar:na]
at reactor.core.publisher.LambdaSubscriber.onNext(LambdaSubscriber.java:160) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
at reactor.core.publisher.FluxGroupBy$GroupByMain.drainLoop(FluxGroupBy.java:380) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
at reactor.core.publisher.FluxGroupBy$GroupByMain.drain(FluxGroupBy.java:316) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
at reactor.core.publisher.FluxGroupBy$GroupByMain.onNext(FluxGroupBy.java:201) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:114) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:114) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
at reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:218) ~[reactor-netty-0.9.4.RELEASE.jar:0.9.4.RELEASE]
at reactor.netty.channel.FluxReceive.onInboundNext(FluxReceive.java:351) ~[reactor-netty-0.9.4.RELEASE.jar:0.9.4.RELEASE]
at reactor.netty.channel.ChannelOperations.onInboundNext(ChannelOperations.java:348) ~[reactor-netty-0.9.4.RELEASE.jar:0.9.4.RELEASE]
at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:90) ~[reactor-netty-0.9.4.RELEASE.jar:0.9.4.RELEASE]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:377) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:355) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:321) ~[netty-codec-4.1.45.Final.jar:4.1.45.Final]
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:295) ~[netty-codec-4.1.45.Final.jar:4.1.45.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:377) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:355) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:377) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:792) ~[netty-transport-native-epoll-4.1.45.Final-linux-x86_64.jar:4.1.45.Final]
at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:475) ~[netty-transport-native-epoll-4.1.45.Final-linux-x86_64.jar:4.1.45.Final]
at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:378) ~[netty-transport-native-epoll-4.1.45.Final-linux-x86_64.jar:4.1.45.Final]
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) ~[netty-common-4.1.45.Final.jar:4.1.45.Final]
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.45.Final.jar:4.1.45.Final]
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.45.Final.jar:4.1.45.Final]
at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_242]

2020-04-28 00:02:11.463 ERROR 1 --- [tor-tcp-epoll-4] c.a.r.listen.impl.RSocketListenerImpl : RST-200501: Exception during rsocket call

io.rsocket.exceptions.InvalidException: RST-600500: Failed to parse composite metadata
at com.alibaba.rsocket.listen.CompositeMetadataRSocket.requestResponse(CompositeMetadataRSocket.java:76) ~[alibaba-rsocket-core-0.1.0-SNAPSHOT.jar:na]
at io.rsocket.RSocketResponder.requestResponse(RSocketResponder.java:193) [rsocket-core-1.0.0-RC5.jar:na]
at io.rsocket.RSocketResponder.handleFrame(RSocketResponder.java:299) [rsocket-core-1.0.0-RC5.jar:na]
at reactor.core.publisher.LambdaSubscriber.onNext(LambdaSubscriber.java:160) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
at reactor.core.publisher.MonoFlatMapMany$FlatMapManyInner.onNext(MonoFlatMapMany.java:242) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
at reactor.core.publisher.FluxGroupBy$UnicastGroupedFlux.drainRegular(FluxGroupBy.java:554) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
at reactor.core.publisher.FluxGroupBy$UnicastGroupedFlux.drain(FluxGroupBy.java:630) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
at reactor.core.publisher.FluxGroupBy$UnicastGroupedFlux.subscribe(FluxGroupBy.java:696) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
at reactor.core.publisher.Flux.subscribe(Flux.java:8174) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
at reactor.core.publisher.MonoFlatMapMany$FlatMapManyMain.onNext(MonoFlatMapMany.java:188) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1637) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
at reactor.core.publisher.MonoProcessor.onNext(MonoProcessor.java:317) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
at io.rsocket.internal.ClientServerInputMultiplexer.lambda$new$1(ClientServerInputMultiplexer.java:116) ~[rsocket-core-1.0.0-RC5.jar:na]
at reactor.core.publisher.LambdaSubscriber.onNext(LambdaSubscriber.java:160) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
at reactor.core.publisher.FluxGroupBy$GroupByMain.drainLoop(FluxGroupBy.java:380) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
at reactor.core.publisher.FluxGroupBy$GroupByMain.drain(FluxGroupBy.java:316) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
at reactor.core.publisher.FluxGroupBy$GroupByMain.onNext(FluxGroupBy.java:201) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:114) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:114) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
at reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:218) ~[reactor-netty-0.9.4.RELEASE.jar:0.9.4.RELEASE]
at reactor.netty.channel.FluxReceive.onInboundNext(FluxReceive.java:351) ~[reactor-netty-0.9.4.RELEASE.jar:0.9.4.RELEASE]
at reactor.netty.channel.ChannelOperations.onInboundNext(ChannelOperations.java:348) ~[reactor-netty-0.9.4.RELEASE.jar:0.9.4.RELEASE]
at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:90) ~[reactor-netty-0.9.4.RELEASE.jar:0.9.4.RELEASE]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:377) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:355) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:321) ~[netty-codec-4.1.45.Final.jar:4.1.45.Final]
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:295) ~[netty-codec-4.1.45.Final.jar:4.1.45.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:377) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:355) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:377) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:792) ~[netty-transport-native-epoll-4.1.45.Final-linux-x86_64.jar:4.1.45.Final]
at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:475) ~[netty-transport-native-epoll-4.1.45.Final-linux-x86_64.jar:4.1.45.Final]
at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:378) ~[netty-transport-native-epoll-4.1.45.Final-linux-x86_64.jar:4.1.45.Final]
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) ~[netty-common-4.1.45.Final.jar:4.1.45.Final]
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.45.Final.jar:4.1.45.Final]
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.45.Final.jar:4.1.45.Final]
at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_242]

gRPC到RSocket协议转换gateway

和HTTP REST API到RSocket转换一样,是否要增加gRPC到RSocket协议转换的gateway? 有需求的留言一下,目前内部已经有一个原型版本。

Upgrade to RSocket 1.0.0-RC7

还是有一些变化,主要是UriHandler.java 和 UriTransportRegistry.java 被删除啦,目前RSocket Broker要根据schema做transport层识别,可能需要将这些代码转换到broker内部,RSocket-CLI是添加了这些代码。

如下改变:

  • 保留 UriHandler和UriTransportRegistry
  • RSocketFactory调整到 RSocketServer 和 RSocketConnector

alibaba-rsocket-spring-boot-starter-1.0.0.M1问题反馈

  1. 自动化配置RSocketListenerAutoConfiguration中,当properties中没有配置rsocket.port时,以下表达式会返回true,导致应用启动了监听处理逻辑。建议改为 ${rsocket.port:0}!=0
@Configuration
@ConditionalOnExpression("'${rsocket.port}'!='0'")
public class RSocketListenerAutoConfiguration {
  1. 没有生成spring-autoconfigure-metadata.properties文件导致RSocketAutoConfiguration中@ConditionalOnClass加载了类而报错
    @Bean
    @ConditionalOnClass(PrometheusMeterRegistry.class)
    public MetricsService metricsService(PrometheusMeterRegistry meterRegistry) {
        return new MetricsServicePrometheusImpl(meterRegistry);
    }

文档建议添加如下依赖

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-autoconfigure-processor</artifactId>
            <optional>true</optional>
        </dependency>

参考文档
报错信息如下

java.lang.IllegalStateException: Failed to introspect Class [com.alibaba.spring.boot.rsocket.RSocketAutoConfiguration] from ClassLoader [sun.misc.Launcher$AppClassLoader@18b4aac2]
	at org.springframework.util.ReflectionUtils.getDeclaredMethods(ReflectionUtils.java:481) ~[spring-core-5.2.5.RELEASE.jar:5.2.5.RELEASE]
	at org.springframework.util.ReflectionUtils.doWithMethods(ReflectionUtils.java:358) ~[spring-core-5.2.5.RELEASE.jar:5.2.5.RELEASE]
	at org.springframework.util.ReflectionUtils.getUniqueDeclaredMethods(ReflectionUtils.java:414) ~[spring-core-5.2.5.RELEASE.jar:5.2.5.RELEASE]
	at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.lambda$getTypeForFactoryMethod$2(AbstractAutowireCapableBeanFactory.java:743) ~[spring-beans-5.2.5.RELEASE.jar:5.2.5.RELEASE]
	at java.util.concurrent.ConcurrentHashMap.computeIfAbsent(ConcurrentHashMap.java:1660) ~[na:1.8.0_241]
	at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.getTypeForFactoryMethod(AbstractAutowireCapableBeanFactory.java:742) ~[spring-beans-5.2.5.RELEASE.jar:5.2.5.RELEASE]
	at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.determineTargetType(AbstractAutowireCapableBeanFactory.java:681) ~[spring-beans-5.2.5.RELEASE.jar:5.2.5.RELEASE]
	at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.predictBeanType(AbstractAutowireCapableBeanFactory.java:649) ~[spring-beans-5.2.5.RELEASE.jar:5.2.5.RELEASE]
	at org.springframework.beans.factory.support.AbstractBeanFactory.isFactoryBean(AbstractBeanFactory.java:1605) ~[spring-beans-5.2.5.RELEASE.jar:5.2.5.RELEASE]
	at org.springframework.beans.factory.support.DefaultListableBeanFactory.doGetBeanNamesForType(DefaultListableBeanFactory.java:523) ~[spring-beans-5.2.5.RELEASE.jar:5.2.5.RELEASE]
	at org.springframework.beans.factory.support.DefaultListableBeanFactory.getBeanNamesForType(DefaultListableBeanFactory.java:494) ~[spring-beans-5.2.5.RELEASE.jar:5.2.5.RELEASE]
	at org.springframework.beans.factory.support.DefaultListableBeanFactory.getBeansOfType(DefaultListableBeanFactory.java:616) ~[spring-beans-5.2.5.RELEASE.jar:5.2.5.RELEASE]
	at org.springframework.beans.factory.support.DefaultListableBeanFactory.getBeansOfType(DefaultListableBeanFactory.java:608) ~[spring-beans-5.2.5.RELEASE.jar:5.2.5.RELEASE]
	at org.springframework.context.support.AbstractApplicationContext.getBeansOfType(AbstractApplicationContext.java:1242) ~[spring-context-5.2.5.RELEASE.jar:5.2.5.RELEASE]
	at org.springframework.boot.SpringApplication.getExitCodeFromMappedException(SpringApplication.java:880) [spring-boot-2.3.0.M4.jar:2.3.0.M4]
	at org.springframework.boot.SpringApplication.getExitCodeFromException(SpringApplication.java:868) [spring-boot-2.3.0.M4.jar:2.3.0.M4]
	at org.springframework.boot.SpringApplication.handleExitCode(SpringApplication.java:855) [spring-boot-2.3.0.M4.jar:2.3.0.M4]
	at org.springframework.boot.SpringApplication.handleRunFailure(SpringApplication.java:806) [spring-boot-2.3.0.M4.jar:2.3.0.M4]
	at org.springframework.boot.SpringApplication.run(SpringApplication.java:325) [spring-boot-2.3.0.M4.jar:2.3.0.M4]
	at org.springframework.boot.SpringApplication.run(SpringApplication.java:1237) [spring-boot-2.3.0.M4.jar:2.3.0.M4]
	at org.springframework.boot.SpringApplication.run(SpringApplication.java:1226) [spring-boot-2.3.0.M4.jar:2.3.0.M4]
	at com.example.spring.webflux.SpringWebfluxDemoApplication.main(SpringWebfluxDemoApplication.java:29) [classes/:na]
Caused by: java.lang.NoClassDefFoundError: io/micrometer/prometheus/PrometheusMeterRegistry
	at java.lang.Class.getDeclaredMethods0(Native Method) ~[na:1.8.0_241]
	at java.lang.Class.privateGetDeclaredMethods(Class.java:2701) ~[na:1.8.0_241]
	at java.lang.Class.getDeclaredMethods(Class.java:1975) ~[na:1.8.0_241]
	at org.springframework.util.ReflectionUtils.getDeclaredMethods(ReflectionUtils.java:463) ~[spring-core-5.2.5.RELEASE.jar:5.2.5.RELEASE]
	... 21 common frames omitted
Caused by: java.lang.ClassNotFoundException: io.micrometer.prometheus.PrometheusMeterRegistry
	at java.net.URLClassLoader.findClass(URLClassLoader.java:382) ~[na:1.8.0_241]
	at java.lang.ClassLoader.loadClass(ClassLoader.java:418) ~[na:1.8.0_241]
	at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:355) ~[na:1.8.0_241]
	at java.lang.ClassLoader.loadClass(ClassLoader.java:351) ~[na:1.8.0_241]
	... 25 common frames omitted

Disconnected from the target VM, address: '127.0.0.1:1064', transport: 'socket'

Process finished with exit code 1

添加Travis CI支持

使用Travis CI进行持续集成,同时在README.md添加build status图标。

RSocket Broker Banner

目前使用Spring Boot默认的banner,调整为粉色的 Alibaba RSocket Broker

支持Mon<ByteBuffer>避免反复序列化的问题

在不少场景中,我们调用远程服务, 获取资源对象,然后再对象输出。 如下述代码中,我们调用远程服务,获取用户,然后再将用户信息以REST API方式输出。 但是这里有序列化的问题: RPC网络调用有两次序列化和反序列化,然后REST API输出又再进行JSON序列化,代码如下:

    @GetMapping(value = "/user/{id}", produces = "application/json")
    public Mono<User> jsonBytes(@PathVariable Integer id) {
        return userService.findUserById(id);
    }

能否有一个机制,在服务端就输出对应的数据格式,如JSON,而通讯的过程中都不要涉及多次序列化的问题,而是将服务端输出的bytes直接输出给最终的调用者,代码如下:

    @GetMapping(value = "/user/{id}", produces = "application/json")
    public Mono<ByteBuffer> jsonBytes(@PathVariable Integer id) {
        return userService.findUserById(id);
    }

Vaadin to Vue or React?

目前RSocket Broker的控制台使用Vaadin开发,主要是方便Java程序员,同时减少各种REST API调用的问题,不知道多少同学对其他JS框架的了解程度。 这里调查一下,是否需要将控制台UI调整到Vue、React等框架? 但是Node环境和基本开发这个都是需要的。

RSocket HTTP Gateway的调整

考虑到开发和部署的便捷性,RSocket Broker Server内置支持RSocket Service的HTTP访问,格式如下,请注意 "/api" 前缀。

POST http://127.0.0.1:9998/api/com.alibaba.user.UserService/findById
Authorization: Bearer jwt_token
Content-Type: application/json

[
  1
]

这里有一个小问题: Broker Console目前是用Vaadin开发,而Vaadin现在还不能支持WebFlux,所以RSocket REST API并不能很好地利用WebFlux的Reactive特性,性能和thread模型还是Spring MVC的。
Vaadin的reactive支持要等待这里: vaadin/spring#565

而alibaba-broker-http-gateway还继续保留,这个是完全Reactive的,同时方便开发者对其他协议进行集成,如gRPC, Dubbo等。 如果大家有gRPC -> RSocket的需求,我们会考虑在http gateway中添加gRPC的集成。

如果大家有更好的建议,欢迎留言。

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.