xuxueli / xxl-rpc
A high performance, distributed RPC framework.(分布式服务框架XXL-RPC)
Home Page: http://www.xuxueli.com/xxl-rpc/
License: Apache License 2.0
Here, ThreadPoolUtil starts a thread pool with defaults of core=60, max=300, and non-reclaimable core threads to serve requests, yet there is almost no blocking inside it: the handler mostly reflect-invokes the business executor in the core package and returns right after enqueuing the task, and the business executor holds its own dedicated thread pool.
In my opinion, the default thread count here could be cut drastically (core=1 would suffice), improving overall resource utilization for projects that embed the xxl executor.
I haven't had time to study this part thoroughly, so I'm not certain the NettyServerHandler/NettyHttpServerHandler execution path never blocks under any circumstances. If it doesn't, or a small change could make it so, this thread pool could be removed entirely.
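The proposal above can be sketched as follows. `SmallServePool.newServePool` is a hypothetical factory, not the project's actual ThreadPoolUtil API; it only illustrates a core=1 pool whose idle threads are reclaimed:

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class SmallServePool {

    // Hypothetical replacement for the default (core=60, max=300,
    // non-reclaimable core threads) serve pool: one core thread, idle
    // threads reclaimed after 60 seconds.
    public static ThreadPoolExecutor newServePool() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1,                       // core=1, as proposed above
                60,                      // headroom for bursts
                60L, TimeUnit.SECONDS,   // reclaim idle threads
                new LinkedBlockingQueue<>(1000));
        pool.allowCoreThreadTimeOut(true); // even the core thread may be reclaimed
        return pool;
    }

    public static void main(String[] args) throws InterruptedException {
        ThreadPoolExecutor pool = newServePool();
        pool.execute(() -> System.out.println("dispatched"));
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
    }
}
```

Whether this is safe hinges on the question above: the serve pool must truly never block, or the aggressive sizing could starve requests.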
Does the project make use of any design patterns?
Our project uses xxl-job, which depends on xxl-rpc (version 1.4.1). Under certain conditions the scheduler thread pool accumulates tasks until new scheduling tasks are rejected.
I found two causes. First, NettyHttpConnectClient has no connection timeout (version 1.6 has since added one, hard-coded to 10 seconds).
Second, com.xxl.rpc.core.remoting.net.common.ConnectClient#getPool uses synchronized. Suppose xxl-job triggers the same task once per second, so getPool is called once per second, while a failing client acquisition only releases the lock after the 10-second timeout; every second another 9 threads block, and the scheduler pool eventually fills up.
Steps to reproduce:
In xxl-job, configure an executor with a manually specified IP and a task that triggers once per second (in theory any frequency faster than once per 10 seconds cannot release the lock in time). Pull the executor's machine off the LAN (unplug the network cable) and tasks pile up, with threads blocked at line 94 of getPool.
I suggest reworking this locking logic, e.g. with tryLock plus a timeout, to keep threads from blocking here.
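The tryLock suggestion could be sketched like this. `TryLockPool` and its map of placeholder clients are hypothetical stand-ins for ConnectClient#getPool, not the project's actual code:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

public class TryLockPool {

    // Placeholder for the real per-address ConnectClient instances.
    private static final Map<String, Object> clientMap = new ConcurrentHashMap<>();
    private static final ReentrantLock lock = new ReentrantLock();

    public static Object getPool(String address) {
        Object client = clientMap.get(address);
        if (client != null) {
            return client;
        }
        // Fail fast instead of queueing behind a connect that may hang for 10s.
        boolean locked = false;
        try {
            locked = lock.tryLock(1, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        if (!locked) {
            throw new IllegalStateException("connect-client lock busy: " + address);
        }
        try {
            // Double-check under the lock; the real code would connect here.
            return clientMap.computeIfAbsent(address, addr -> new Object());
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        // Repeated lookups for the same address return the cached client.
        System.out.println(getPool("127.0.0.1:9999") == getPool("127.0.0.1:9999"));
    }
}
```

Callers that hit the timeout get an immediate failure for that scheduling tick instead of occupying a scheduler thread for the full connect timeout.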
I'm currently using xxl-job in a Spring Cloud environment and want to pull addresses directly from the service registry.
In Docker, the admin's IP changes after a restart, so I can't resolve xxl-job-admin's IP by service name just once at startup.
I looked at the code hoping to extend LoadBalance, but it isn't easy to customize.
Please push a simplified version of the project. Thanks.
19:43:35.021 logback [nioEventLoopGroup-4-1] ERROR c.x.r.r.n.i.n.c.NettyHttpClientHandler - >>>>>>>>>>> xxl-rpc netty_http client caught exception
io.netty.handler.codec.TooLongFrameException: Response entity too large: DefaultHttpResponse(decodeResult: success, version: HTTP/1.1)
HTTP/1.1 200 OK
content-type: text/html;charset=UTF-8
content-length: 6818748
connection: keep-alive
	at io.netty.handler.codec.http.HttpObjectAggregator.handleOversizedMessage(HttpObjectAggregator.java:283)
	at io.netty.handler.codec.http.HttpObjectAggregator.handleOversizedMessage(HttpObjectAggregator.java:87)
	at io.netty.handler.codec.MessageAggregator.invokeHandleOversizedMessage(MessageAggregator.java:405)
	at io.netty.handler.codec.MessageAggregator.decode(MessageAggregator.java:254)
	at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:88)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:345)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:337)
	at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:438)
	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:323)
	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:310)
	at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:426)
	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:278)
	at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:253)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:345)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:337)
	at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:345)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:337)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1408)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:345)
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:930)
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:677)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:612)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:529)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:491)
	at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:905)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.lang.Thread.run(Thread.java:748)
I want to implement a log download feature, but netty complains that my message is too long. Where can I adjust netty's maximum message length?
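The limit comes from the `HttpObjectAggregator` added to the client (and server) pipeline. Assuming the pipeline setup quoted elsewhere in these issues, raising it is a one-argument change (16 MB here is an arbitrary example, not a recommended value):

```java
channel.pipeline()
        .addLast(new HttpClientCodec())
        .addLast(new HttpObjectAggregator(16 * 1024 * 1024)) // was 5*1024*1024 (5 MB)
        .addLast(new NettyHttpClientHandler(xxlRpcInvokerFactory, serializer));
```

Note that both the client and the server sides aggregate, so both limits need raising for large responses to pass.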
As the title says.
Should this be changed to existValues.addAll(childPathData.values()); ?
public static LoadBalance match(String name, LoadBalance defaultRouter) {
    for (LoadBalance item : LoadBalance.values()) {
        if (item.equals(name)) {    // bug: an enum constant is compared to a String, always false
            return item;
        }
    }
    return defaultRouter;
}

After the fix:

public static LoadBalance match(String name, LoadBalance defaultRouter) {
    for (LoadBalance item : LoadBalance.values()) {
        if (item.name().equals(name)) {
            return item;
        }
    }
    return defaultRouter;
}
How is cross-language invocation supposed to be used?
When registering a service via annotations, registration fails if the service is wrapped in a transaction.
For a direct service/client connection without a registry, is @XxlRpcReference(address = "127.0.0.1") the only way? Why can't it be done via invokerFactory.setServiceRegistryClass(LocalRegistry.class)?
As the title says.
Running out of memory because NioEventLoopGroups are created over and over when the network is unreachable is a problem I actually hit in my project.
Memory is also exhausted when there are too many connection targets (including dead ones); the problem is more pronounced in docker/k8s environments with randomly assigned networking.
NioEventLoopGroup should be a singleton. Here is my solution:
Define a self-checking singleton:
// Obtain the group via getEventLoopGroup(); never use the field directly
private static EventLoopGroup group = new NioEventLoopGroup();
private static final ReentrantLock reentrantLock = new ReentrantLock();

public static EventLoopGroup getEventLoopGroup() {
    // The NioEventLoopGroup is shared and in constant use, so never shut it down here
    boolean locked = false;
    try {
        locked = reentrantLock.tryLock(5, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
    if (!locked) {
        // note: do NOT unlock in a finally that also covers this path,
        // or an IllegalMonitorStateException is thrown when the lock was never acquired
        throw new IllegalStateException("Get the EventLoopGroup fail");
    }
    try {
        // Recreate the NioEventLoopGroup if it was shut down
        if (group == null || group.isShutdown() || group.isShuttingDown()) {
            group = new NioEventLoopGroup();
        }
        return group;
    } finally {
        reentrantLock.unlock();
    }
}
Call getEventLoopGroup() wherever the group is needed.
1. Is this a purely personal project?
2. How does it compare with Google's gRPC? More specifically, is this project an early experiment, or has it been validated in production? This is my first contact with RPC, and I've been studying this codebase recently.
3. Why did you want to build this framework?
In XXL-JOB-ADMIN, we can see a lot of NIO event loop groups if some error occurs.
For some reasons, we may register some unreachable ip to xxl-job-admin.
In xxl-job-admin, we should invoke XxlRpcReferenceBean
We can see code below
// do invoke
client.asyncSend(finalAddress, xxlRpcRequest);
In this case, XXL-RPC may produce a NettyHttpConnectClient.
But this may not work well:
public void init(String address, final Serializer serializer, final XxlRpcInvokerFactory xxlRpcInvokerFactory) throws Exception {
    if (!address.toLowerCase().startsWith("http")) {
        address = "http://" + address;	// IP:PORT, need parse to url
    }

    this.address = address;
    URL url = new URL(address);
    this.host = url.getHost();
    int port = url.getPort() > -1 ? url.getPort() : 80;

    this.group = new NioEventLoopGroup();
    Bootstrap bootstrap = new Bootstrap();
    bootstrap.group(group)
            .channel(NioSocketChannel.class)
            .handler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel channel) throws Exception {
                    channel.pipeline()
                            /*.addLast(new HttpResponseDecoder())
                            .addLast(new HttpRequestEncoder())*/
                            .addLast(new HttpClientCodec())
                            .addLast(new HttpObjectAggregator(5 * 1024 * 1024))
                            .addLast(new NettyHttpClientHandler(xxlRpcInvokerFactory, serializer));
                }
            })
            .option(ChannelOption.SO_KEEPALIVE, true);
    this.channel = bootstrap.connect(host, port).sync().channel();

    this.serializer = serializer;

    // valid
    if (!isValidate()) {
        close();
        return;
    }

    logger.debug(">>>>>>>>>>> xxl-rpc netty client proxy, connect to server success at host:{}, port:{}", host, port);
}
I don't think we should create a new NioEventLoopGroup here.
// serialize request
byte[] requestBytes = xxlRpcReferenceBean.getSerializer().serialize(xxlRpcRequest);
// httpclient
HttpClient httpClient = getJettyHttpClient(xxlRpcReferenceBean.getInvokerFactory());
// request
Request request = httpClient.newRequest(reqURL);
request.method(HttpMethod.POST);
request.timeout(xxlRpcReferenceBean.getTimeout() + 500, TimeUnit.MILLISECONDS); // async, not need timeout
request.content(new BytesContentProvider(requestBytes));
// invoke
request.send(new BufferingResponseListener() {
If connection failures (refused, timed out, unreachable) keep occurring during init, the Netty components are never closed, causing a memory leak. With enough of these errors there is a risk of OOM, and of fd/handle counts growing until they hit the system limit, at which point scheduling becomes impossible.
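A minimal guard for this, assuming the init code quoted above, is to release the group when connect fails so repeated failures don't accumulate event loops:

```java
try {
    this.channel = bootstrap.connect(host, port).sync().channel();
} catch (Exception e) {
    group.shutdownGracefully(); // release threads/sockets on failed connect
    throw e;
}
```

This is only a sketch of the cleanup point; the full fix would also cover the isValidate()/close() path and any other early exits from init.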
The two standalone Spring Boot sample modules, xxl-rpc-sample-client and xxl-rpc-sample-server, produce jars after a maven package build that cannot be run directly from a Linux terminal, because the third-party dependencies are not packaged into the final jar. I suggest the author update these two modules' pom.xml. Taking xxl-rpc-sample-server as an example, add the following to its pom.xml:
<build>
    <finalName>xxl-rpc-server</finalName>
    <plugins>
        <plugin>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-maven-plugin</artifactId>
            <configuration>
                <fork>true</fork>
                <mainClass>com.xxl.rpc.sample.server.XxlRpcServerApplication</mainClass>
            </configuration>
            <executions>
                <execution>
                    <goals>
                        <goal>repackage</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
With this, java -jar xxl-rpc-server.jar in a Linux terminal runs the module directly.
It would be good if the admin could disable executors at the IP level.
How do I configure the RpcProvider to use netty? I've gone through the docs and found no sample code.
http://www.xuxueli.com/xxl-rpc/#/?id=%E4%BA%8C%E3%80%81%E5%BF%AB%E9%80%9F%E5%85%A5%E9%97%A8%EF%BC%88springboot%E7%89%88%E6%9C%AC%EF%BC%89
Server side:
the service sets a version number,
e.g. @XxlRpcService(version = "1.1.1"),
and the registry UI shows the key as: com.iflytek.medical.service.DemoApi#1.1.1
Client side, a test program created in frameless mode:
// init client
DemoApi2 demoService = (DemoApi2) new XxlRpcReferenceBean(NetEnum.NETTY, Serializer.SerializeEnum.HESSIAN.getSerializer(), CallType.SYNC, LoadBalance.ROUND,
DemoApi2.class, "1.1.1", 500, "172.16.13.194:7088", null, null, null).getObject();
String rslt = demoService.sayHello("xxxxxx", "你好啊com.iflytek.medical.service.DemoApi2");
Exception after execution:
Connected to the target VM, address: '127.0.0.1:55563', transport: 'socket'
Exception in thread "main" com.xxl.rpc.util.XxlRpcException: The serviceKey[com.iflytek.medical.service.DemoApi] not found.
10:08:54.403 logback [main] INFO c.x.r.r.i.r.XxlRpcReferenceBean - >>>>>>>>>>> xxl-rpc, invoke error, address:172.16.13.194:7088, XxlRpcRequestXxlRpcRequest{requestId='4dd8df8f-f3b2-4687-a42c-cd24b3881993', createMillisTime=1571278132458, accessToken='null', className='com.iflytek.medical.service.DemoApi', methodName='sayHello', parameterTypes=[class java.lang.String, class java.lang.String], parameters=[xxxxxx, 你好啊com.iflytek.medical.service.DemoApi2], version='null'}
at com.xxl.rpc.remoting.invoker.reference.XxlRpcReferenceBean$1.invoke(XxlRpcReferenceBean.java:221)
at com.sun.proxy.$Proxy0.sayHello(Unknown Source)
at com.iflytek.medical.imeepclient.test.ClientTest.main(ClientTest.java:19)
The exception reports version='null', even though the version 1.1.1 was set in initConfig.
Looking at XxlRpcReferenceBean, the version field is never set when the request is constructed:
// request
XxlRpcRequest xxlRpcRequest = new XxlRpcRequest();
xxlRpcRequest.setRequestId(UUID.randomUUID().toString());
xxlRpcRequest.setCreateMillisTime(System.currentTimeMillis());
xxlRpcRequest.setAccessToken(accessToken);
xxlRpcRequest.setClassName(className);
xxlRpcRequest.setMethodName(methodName);
xxlRpcRequest.setParameterTypes(parameterTypes);
xxlRpcRequest.setParameters(parameters);
==================================
Changing the test to use a service interface without a version number works fine.
Is this a bug, or am I invoking it the wrong way?
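If this is indeed the bug, the presumed one-line fix, assuming a `version` field is in scope in XxlRpcReferenceBean as the other setters suggest, would be to set it alongside the other request fields:

```java
xxlRpcRequest.setVersion(version); // propagate the configured version into the request
```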
18:05:55.885 WARN 6 --- [nioEventLoopGroup-8-1] c.x.r.r.n.i.n.c.NettyHttpClientHandler : >>>>>>>>>>> xxl-rpc netty_http client received close
The consistent-hash load-balance strategy does not use the method arguments as the hash key.
When using the XxlRpcAdminRegister registry,
the service list data is kept in three places: a client-side local cache, the DB, and a file on disk.
I understand the local cache and the DB, but why keep another copy on disk,
with a thread needed to maintain consistency between them?
If the data were stored in just one place (the DB, or redis), it would not only remove the consistency-maintenance code, but requesters would also see no delay when a service is disabled.
When using xxl-job with the executor deployed in Docker, the container port differs from the host-mapped port, so the executor cannot register an externally reachable address. For example, the RPC port inside the container is 9999 but is mapped to host port 31520; external clients must use port 31520 to reach the service listening on 9999 inside the container.
The executor could read the host port its RPC service is mapped to, but the RPC framework has no interface supporting this style of registration, nor does the code provide an extension point for the registration logic.
Could you help with this? Thanks.
Jetty's limit was 2 MB. I upgraded xxl-job and Jetty was replaced with netty-http, and now you've made it 5 MB. That's enough for now, but it won't survive some careless developer going crazy with logging...
hi:
When will the fix for "long-connection heartbeat keep-alive: bidirectional heartbeat detection, periodically sending heartbeats to keep the connection alive" land in an official release?
Hello,
The GitHub Security Lab team has found a potential vulnerability in your project. Please create a Security Advisory and invite me in to further disclose and discuss the vulnerability details and potential fix. Alternatively, please add a Security Policy containing a security email address to send the details to, and/or enable Public Vulnerability Reporting so we can submit the details.
If you prefer to contact us by email, please reach out to [email protected] with reference to GHSL-2023-052.
Thank you,
GitHub Security Lab
channel.pipeline().addLast(new IdleStateHandler(5,5,10, TimeUnit.SECONDS))
The read and write idle times of IdleStateHandler must both be set; otherwise the heartbeat fails, and the channel gets closed while data is still being transferred, raising errors.
As the title says: the initConfig parameters of XxlRpcProviderFactory should take interfaces rather than enums, to make other implementations easier to plug in.
Why is an anonymous inner class used here? It's inconsistent with the previous client version.
Originally posted by @geercode in #35 (comment)
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
        .channel(NioSocketChannel.class)
        .handler(new ChannelInitializer<SocketChannel>() {
            @Override
            public void initChannel(SocketChannel channel) throws Exception {
                channel.pipeline()
                        .addLast(new IdleStateHandler(0, 0, 10, TimeUnit.MINUTES))
                        .addLast(new HttpClientCodec())
                        .addLast(new HttpObjectAggregator(5 * 1024 * 1024))
                        .addLast(new NettyHttpClientHandler(xxlRpcInvokerFactory, serializer));
            }
        })
        .option(ChannelOption.SO_KEEPALIVE, true);
Only keep-alive is set here.
We also need a connection timeout, among other options.
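For example, a connect timeout can be added next to the existing keep-alive option via Netty's standard `CONNECT_TIMEOUT_MILLIS` channel option (10 seconds here is an arbitrary illustrative value):

```java
bootstrap.option(ChannelOption.SO_KEEPALIVE, true)
         .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10 * 1000); // fail unreachable hosts fast
```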
This log is printed very frequently, roughly once every 10 minutes:
2021-02-01 16:44:09.010 [nioEventLoopGroup-12944-1] [TID: N/A] ERROR c.x.r.r.n.i.n.client.NettyHttpClientHandler - >>>>>>>>>>> xxl-rpc netty_http client caught exception
com.xxl.rpc.util.XxlRpcException: xxl-rpc request data empty.
at com.xxl.rpc.remoting.net.impl.netty_http.client.NettyHttpClientHandler.channelRead0(NettyHttpClientHandler.java:39)
at com.xxl.rpc.remoting.net.impl.netty_http.client.NettyHttpClientHandler.channelRead0(NettyHttpClientHandler.java:19)
at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
this.channel = bootstrap.connect(host, port).sync().channel();
If the connect here fails, the NioEventLoopGroup is never released.
ConnectClient
ConnectClient connectClient_new = connectClientImpl.newInstance();
connectClient_new.init(address, xxlRpcReferenceBean.getSerializer(), xxlRpcReferenceBean.getInvokerFactory());
connectClientMap.put(address, connectClient_new);