Comments (33)
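Are the two KcpRttExampleClient instances running as two separate processes? For a single Ukcp connection, ScheduleTask, ReadTask and WriteTask all run on one thread, so they execute serially. Different Ukcp connections run in parallel with each other. This keeps the business logic lock-free: serial within a connection, parallel across connections. That makes full use of multiple cores and is also friendly to the CPU cache.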
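The "serial within a connection, parallel across connections" model described above can be sketched with plain JDK executors. This is a minimal illustration, not java-kcp's actual implementation: the class names `PinnedConnectionDemo` and `Connection` and the `handled` counter are hypothetical, and the three tasks merely increment a counter where the real ones would do KCP work.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical sketch, not java-kcp code: each connection is pinned to one single-thread executor. */
final class PinnedConnectionDemo {

    static final class Connection {
        private final ExecutorService loop; // the only thread that touches this connection's state
        private long handled;               // plain field: no lock needed under thread confinement

        Connection(ExecutorService loop) { this.loop = loop; }

        // Stand-ins for the read, write and schedule tasks; all run on the same thread.
        void read()     { loop.execute(() -> handled++); }
        void write()    { loop.execute(() -> handled++); }
        void schedule() { loop.execute(() -> handled++); }

        long handled() {
            try {
                // Read the counter on the owning thread too, after all queued tasks.
                return loop.submit(() -> handled).get();
            } catch (Exception e) {
                throw new IllegalStateException(e);
            }
        }
    }

    static long[] run(int connections, int rounds) {
        int threads = Math.max(1, Math.min(connections, Runtime.getRuntime().availableProcessors()));
        List<ExecutorService> loops = new ArrayList<>();
        for (int i = 0; i < threads; i++) loops.add(Executors.newSingleThreadExecutor());

        // Connections are spread over the loops; each one stays on its loop for life.
        List<Connection> conns = new ArrayList<>();
        for (int i = 0; i < connections; i++) conns.add(new Connection(loops.get(i % threads)));

        for (int r = 0; r < rounds; r++) {
            for (Connection c : conns) { c.read(); c.write(); c.schedule(); }
        }

        long[] totals = new long[connections];
        for (int i = 0; i < connections; i++) totals[i] = conns.get(i).handled();
        for (ExecutorService loop : loops) loop.shutdown();
        return totals;
    }

    public static void main(String[] args) {
        for (long total : run(4, 1000)) System.out.println(total);
    }
}
```

Because every task of a connection lands on the same single-thread executor, the counter never needs a lock, while separate connections can still progress on separate cores.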
from java-kcp.
If they are in the same process, the best practice is to create a single KcpRttExampleClient instance and call connect several times, once for each server.
from java-kcp.
I would like reads and writes to run on separate threads. With a single thread I worry that performance will suffer.
from java-kcp.
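The thread here handles only KCP-layer logic and does no network I/O. Profiling with visual shows that this logic is not the bottleneck. In tests, a single thread serving a single connection with FEC is estimated to process about 60,000 packets per second. At 1 KB per packet that is 60,000 × 1 KB per second, which should be enough for 99% of workloads. Running in parallel reaches at least 70% CPU efficiency, so overall throughput = number of CPUs × 60,000 × 1 KB × 70%.
If reads and writes were split across threads, locking would be required. Single-connection throughput might go up by 20%, but throughput of the system as a whole would drop considerably.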
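The estimate above can be worked through numerically. The sketch below only plugs in the figures quoted in this comment (60,000 packets/s per core, 1 KB taken as 1024 bytes, 70% parallel efficiency); the class and method names are hypothetical and the result is a back-of-envelope number, not a benchmark.

```java
/** Hypothetical helper: throughput = cores x 60,000 packets/s x 1 KB x 70%. */
final class ThroughputEstimate {
    static final long PACKETS_PER_SECOND_PER_CORE = 60_000L; // figure quoted in the thread
    static final long BYTES_PER_PACKET = 1024L;              // "1K" per packet, assumed to mean 1024 bytes
    static final long PARALLEL_EFFICIENCY_PERCENT = 70L;     // "at least 70%" from the thread

    static long perCoreBytesPerSecond() {
        return PACKETS_PER_SECOND_PER_CORE * BYTES_PER_PACKET;
    }

    static long systemBytesPerSecond(int cores) {
        // Integer arithmetic keeps the result exact.
        return cores * perCoreBytesPerSecond() * PARALLEL_EFFICIENCY_PERCENT / 100L;
    }

    public static void main(String[] args) {
        System.out.println("per core: " + perCoreBytesPerSecond() + " B/s");
        for (int cores : new int[] {1, 4, 8}) {
            System.out.println(cores + " cores: " + systemBytesPerSecond(cores) + " B/s");
        }
    }
}
```

With 8 cores this comes to 344,064,000 B/s, roughly 344 MB/s of KCP-layer processing capacity under the stated assumptions.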
from java-kcp.
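In actual testing, performance does suffer. With sender packets of 200,000 and above, once the receive queue or write queue builds up, all data processing is delayed, sends fall behind, and the server eventually hits a read timeout.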
from java-kcp.
For example, with the example from #15, receiving data is very slow and takes a long time.
from java-kcp.
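I could not reproduce the problem above locally. Judging from the log above, either the bandwidth is saturated or your LAN router has reached its limit: the LAN hit a 50% packet loss rate.
You could try running a few virtual machines locally.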
from java-kcp.
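1. I am on a PC, running everything on the local machine, with a wireless NIC.
2. The packets need to be large: 200,000 bytes.
My guess is that the packet is too large and is split into many fragments. Each fragment needs an ACK round trip, which stretches the total time.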
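To put a number on "many fragments", here is a rough count under stated assumptions: each fragment carries at most MTU minus the standard 24-byte KCP header, and the extra FEC and CRC headers that java-kcp may add are ignored, so the real count can be somewhat higher. The class name is hypothetical; the 8 bytes of message header come from the `rttMsg` layout (short + int + short).

```java
/** Hypothetical estimate of how many KCP fragments one application message needs. */
final class FragmentEstimate {
    static final int KCP_HEADER_BYTES = 24; // standard KCP segment header; FEC/CRC overhead ignored

    static int fragments(int messageBytes, int mtu) {
        int payloadPerFragment = mtu - KCP_HEADER_BYTES;
        // Ceiling division: a partly filled last fragment still counts.
        return (messageBytes + payloadPerFragment - 1) / payloadPerFragment;
    }

    public static void main(String[] args) {
        int header = 2 + 4 + 2; // count + timestamp + dataLen, as written by rttMsg
        for (int payload : new int[] {10_000, 100_000, 200_000}) {
            System.out.println(payload + " bytes -> " + fragments(header + payload, 1400) + " fragments");
        }
    }
}
```

Under these assumptions a 200,000-byte message at MTU 1400 needs 146 fragments, against 8 for a 10,000-byte message.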
from java-kcp.
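I ran your example locally as two processes connected over 127.0.0.1. Bandwidth peaked at 38 MB/s and the printed round-trip RTT stayed within 10 ms, so I suspect the NIC or the router. Kuaishou ran a similar test with this library, and there too the bottleneck turned out to be the company's internal router.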
from java-kcp.
By my measurements the time goes into FEC encoding and decoding, 1 ms on average. It adds up, and the latency grows.
from java-kcp.
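What CPU is that? Why does it take so long? FEC encoding could be moved out onto its own thread. Decoding is harder to move onto a separate thread for now.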
from java-kcp.
i7-7700 3.6GHz
16 GB RAM
Performance is fine when testing below 10,000 bytes.
At 100,000 bytes it no longer works.
Timing code added to ReedSolomon:
    long start = System.currentTimeMillis();
    // Do the coding.
    LOOP.codeSomeShards(
            parityRows,
            shards, dataShardCount,
            outputs, parityShardCount,
            offset, byteCount);
    long end = System.currentTimeMillis();
    // Only calls that take at least 1 ms are printed.
    if ((end - start) > 0)
        System.out.println("codeSomeShards use time = " + (end - start));
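`System.currentTimeMillis()` has millisecond resolution at best, so calls that take well under 1 ms print nothing and an average is hard to read off. A possible alternative, sketched here with a hypothetical `CodingTimer` class, accumulates `System.nanoTime()` deltas and reports the mean per call. The XOR loop is only a stand-in workload, not the Reed-Solomon code.

```java
/** Hypothetical accumulator for sub-millisecond timings based on System.nanoTime(). */
final class CodingTimer {
    private long totalNanos;
    private long calls;

    void time(Runnable work) {
        long start = System.nanoTime();
        work.run();
        totalNanos += System.nanoTime() - start;
        calls++;
    }

    long calls() {
        return calls;
    }

    double averageMicros() {
        return calls == 0 ? 0.0 : totalNanos / 1_000.0 / calls;
    }

    public static void main(String[] args) {
        CodingTimer timer = new CodingTimer();
        byte[] shard = new byte[1400];
        for (int i = 0; i < 10_000; i++) {
            // Stand-in workload; in the thread this would wrap the codeSomeShards call.
            timer.time(() -> {
                for (int j = 0; j < shard.length; j++) shard[j] ^= (byte) j;
            });
        }
        System.out.printf("calls=%d avg=%.3f us%n", timer.calls(), timer.averageMicros());
    }
}
```

Reporting one average at the end also avoids printing from inside the hot path, which can itself distort the measurement.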
from java-kcp.
Is this on Windows? I will test it tomorrow.
from java-kcp.
windows10
jdk1.8
from java-kcp.
Do you have the complete test code?
from java-kcp.
KcpRttExampleServer:
    ChannelConfig channelConfig = new ChannelConfig();
    channelConfig.nodelay(true,40,2,true);
    channelConfig.setSndwnd(1024);
    channelConfig.setRcvwnd(1024);
    channelConfig.setMtu(1400);
    channelConfig.setFecDataShardCount(10);
    channelConfig.setFecParityShardCount(3);
    channelConfig.setAckNoDelay(true);
    channelConfig.setTimeoutMillis(10000);
    channelConfig.setUseConvChannel(true);
    channelConfig.setCrc32Check(false);
KcpRttExampleClient:
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import java.net.InetSocketAddress;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.ScheduledFuture;
    import java.util.concurrent.ScheduledThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    // java-kcp imports (ChannelConfig, KcpClient, KcpListener, Ukcp, Snmp) omitted

    public class KcpRttExampleClient implements KcpListener {
        private static final Logger log = LoggerFactory.getLogger(KcpRttExampleClient.class);
        private final ByteBuf data;
        private int[] rtts;
        private volatile int count;
        private ScheduledExecutorService scheduleSrv;
        private ScheduledFuture<?> future = null;
        private final long startTime ;

        public KcpRttExampleClient() {
            // 200,000-byte payload sent with every message
            data = Unpooled.buffer(200000);
            for (int i = 0; i < data.capacity(); i++) {
                data.writeByte((byte) i);
            }
            rtts = new int[30000];
            for (int i = 0; i < rtts.length; i++) {
                rtts[i] = -1;
            }
            startTime = System.currentTimeMillis();
            scheduleSrv = new ScheduledThreadPoolExecutor(1);
        }

        public static void main(String[] args) {
            ChannelConfig channelConfig = new ChannelConfig();
            channelConfig.nodelay(true,10,2,true);
            channelConfig.setSndwnd(1024);
            channelConfig.setRcvwnd(1024);
            channelConfig.setMtu(1400);
            channelConfig.setAckNoDelay(true);
            channelConfig.setConv(55);
            channelConfig.setFecDataShardCount(10);
            channelConfig.setFecParityShardCount(3);
            channelConfig.setCrc32Check(false);
            KcpClient kcpClient = new KcpClient();
            kcpClient.init(channelConfig);
            KcpRttExampleClient kcpClientRttExample = new KcpRttExampleClient();
            kcpClient.connect(new InetSocketAddress("127.0.0.1",20003),channelConfig,kcpClientRttExample);
        }

        @Override
        public void onConnected(Ukcp ukcp) {
            // Send one message every 5 ms until rtts.length messages have gone out.
            future = scheduleSrv.scheduleWithFixedDelay(() -> {
                ByteBuf byteBuf = rttMsg(++count);
                ukcp.write(byteBuf);
                byteBuf.release();
                if (count >= rtts.length) {
                    // finish
                    future.cancel(true);
                    byteBuf = rttMsg(-1);
                    ukcp.write(byteBuf);
                    byteBuf.release();
                }
            }, 5, 5, TimeUnit.MILLISECONDS);
        }

        @Override
        public void handleReceive(ByteBuf byteBuf, Ukcp ukcp) {
            int curCount = byteBuf.readShort();
            if (curCount == -1) {
                scheduleSrv.schedule(new Runnable() {
                    @Override
                    public void run() {
                        int sum = 0;
                        for (int rtt : rtts) {
                            sum += rtt;
                        }
                        System.out.println("average: "+ (sum / rtts.length));
                        System.out.println(Snmp.snmp.toString());
                        ukcp.close();
                        //ukcp.setTimeoutMillis(System.currentTimeMillis());
                        System.exit(0);
                    }
                }, 3, TimeUnit.SECONDS);
            } else {
                int idx = curCount - 1;
                long time = byteBuf.readInt();
                if (rtts[idx] != -1) {
                    System.out.println("???");
                }
                //log.info("rcv count {} {}", curCount, System.currentTimeMillis());
                rtts[idx] = (int) (System.currentTimeMillis() - startTime - time);
                System.out.println("rtt : "+ curCount+" "+ rtts[idx]);
            }
        }

        @Override
        public void handleException(Throwable ex, Ukcp kcp)
        {
            ex.printStackTrace();
        }

        @Override
        public void handleClose(Ukcp kcp) {
            scheduleSrv.shutdown();
            try {
                scheduleSrv.awaitTermination(3, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            int sum = 0;
            int max = 0;
            for (int rtt : rtts) {
                if(rtt>max){
                    max = rtt;
                }
                sum += rtt;
            }
            System.out.println("average: "+ (sum / rtts.length)+" max:"+max);
            System.out.println(Snmp.snmp.toString());
            System.out.println("lost percent: "+(Snmp.snmp.RetransSegs.doubleValue()/Snmp.snmp.OutPkts.doubleValue()));
        }

        /**
         * count+timestamp+dataLen+data
         *
         * @param count
         * @return
         */
        public ByteBuf rttMsg(int count) {
            ByteBuf buf = Unpooled.buffer(200000);
            buf.writeShort(count);
            buf.writeInt((int) (System.currentTimeMillis() - startTime));
            //int dataLen = new Random().nextInt(200);
            //buf.writeBytes(new byte[dataLen]);
            int dataLen = data.readableBytes();
            // Note: dataLen is 200000 here and writeShort keeps only the low 16 bits.
            buf.writeShort(dataLen);
            buf.writeBytes(data, data.readerIndex(), dataLen);
            return buf;
        }
    }
from java-kcp.
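OS: Windows 10, CPU: i7-3770, 16 GB RAM. Running this program, the analysis shows the bottleneck is network I/O. Bandwidth reached 600 Mb.
Thread analysis:
All KCP work (FEC encoding and decoding, KCP packet fragmentation and reassembly, and so on) used 60% and has not yet hit its limit.
On Windows the select model is used, so performance is somewhat worse. On Linux or macOS it would be much better.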
from java-kcp.
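Why is CPU usage so high? With 2 connections open it goes straight to 90%. That makes it unusable. Can it be optimized?
My wireless NIC is a 300M one. Under the same conditions TCP does much better, and KCP's advantage does not show.
Could the FEC algorithm be written in C++ and called from Java through a DLL?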
from java-kcp.
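> Why is CPU usage so high? With 2 connections open it goes straight to 90%. That makes it unusable. Can it be optimized?
A single connection is handled by only 2 threads, one for network send/receive and one for KCP logic and FEC encoding/decoding, so it uses at most two cores. Two connections use at most 4 cores. Your i7-7700 has 8 threads, so usage should top out at about 50%. You need to work out what else is consuming CPU. On Windows, for example, 360 inspects traffic and uses CPU.
> My wireless NIC is a 300M one. Under the same conditions TCP does much better, and KCP's advantage does not show.
TCP has flow control and KCP does not. If the link is saturated, TCP will indeed do much better.
> Could the FEC algorithm be written in C++ and called from Java through a DLL?
The Java FEC is indeed somewhat behind the C++ and Go versions. Using a DLL will take some time. All memory in this version is off-heap and not managed by the JVM GC, so the off-heap memory would have to be copied onto the heap before being passed to the C++ layer. That would increase GC pressure, and the resulting performance is hard to estimate.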
from java-kcp.
Try version 1.4. FEC is a bit better there.
from java-kcp.
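After optimization, FEC handles roughly 400 MB per second on a single core for encoding plus decoding, which is enough for now. When I have time I will optimize it further into a C++ version.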
from java-kcp.
I just bought a 650M wireless NIC :)
The optimization helps, but the latency still keeps getting worse, and a single connection uses 40~50% CPU.
rtt : 3733 6415
rtt : 3734 6399
rtt : 3735 6407
rtt : 3736 6400
rtt : 3737 6382
rtt : 3738 6418
rtt : 3739 6411
rtt : 3740 6405
rtt : 3741 6398
rtt : 3742 6393
rtt : 3743 6416
rtt : 3744 6418
rtt : 3745 6419
rtt : 3746 6419
rtt : 3747 6412
rtt : 3748 6421
rtt : 3749 6414
The time keeps growing. It looks like this is only suitable for small-packet communication.
from java-kcp.
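A 650 Mb NIC tops out at 81.25 MB/s. The example above easily saturates the NIC, so latency is to be expected. The bottleneck is still this wireless NIC.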
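The arithmetic behind this can be checked with a short sketch. It assumes the send pattern of the first client listing (one message of 8 + 200,000 bytes at most every 5 ms) and counts application payload only, ignoring KCP headers, FEC parity and retransmissions. The class and method names are hypothetical.

```java
/** Hypothetical comparison of link capacity with the load the test client offers. */
final class LinkBudget {
    // Convert a link rate in megabits per second to megabytes per second.
    static double megabytesPerSecond(double megabitsPerSecond) {
        return megabitsPerSecond / 8.0;
    }

    // Payload offered per second when one message is sent every intervalMillis.
    static double offeredMegabytesPerSecond(int messageBytes, int intervalMillis) {
        return messageBytes * (1000.0 / intervalMillis) / 1_000_000.0;
    }

    public static void main(String[] args) {
        int messageBytes = 2 + 4 + 2 + 200_000; // header fields written by rttMsg plus the payload
        System.out.println("650 Mb/s link: " + megabytesPerSecond(650) + " MB/s");
        System.out.println("offered at 5 ms: " + offeredMegabytesPerSecond(messageBytes, 5) + " MB/s");
    }
}
```

That is about 40 MB/s of payload in one direction before any protocol overhead, against an 81.25 MB/s ceiling, and the echoed replies travel over the same link.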
from java-kcp.
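In a loss-free environment KCP cannot compete with TCP, because its window is fixed. Under packet loss KCP does considerably better. KCP is mostly used where low latency is required over a network with some packet loss.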
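One way to see what a fixed window implies: a sender can have at most one window of segments in flight per round trip, so throughput is bounded by window × segment payload / RTT. The sketch below applies that general sliding-window bound to the settings from this thread (window 1024, MTU 1400). It assumes a 24-byte KCP header per segment and ignores FEC, ACK traffic and loss; the class name is hypothetical.

```java
/** Hypothetical upper bound on throughput for a fixed send window. */
final class WindowCeiling {
    static final int KCP_HEADER_BYTES = 24; // standard KCP segment header, assumed here

    static long maxBytesPerSecond(int windowSegments, int mtu, int rttMillis) {
        long bytesPerRtt = (long) windowSegments * (mtu - KCP_HEADER_BYTES);
        return bytesPerRtt * 1000L / rttMillis;
    }

    public static void main(String[] args) {
        for (int rtt : new int[] {1, 10, 100}) {
            System.out.println("rtt " + rtt + " ms -> " + maxBytesPerSecond(1024, 1400, rtt) + " B/s");
        }
    }
}
```

The ceiling falls in proportion to RTT, whereas TCP can grow its window when the path allows it.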
from java-kcp.
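I found a C++ version of reed-solomon whose code matches the current Java version: https://github.com/DrPizza/reed-solomon
The C++ code runs, but I am not familiar with C++ and could not get a Java wrapper calling the DLL to work. Please help when you have time.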
from java-kcp.
In a while. I have been quite busy lately, so once this busy stretch is over.
from java-kcp.
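Take a look at the nativeFec branch, which calls a C version of FEC through JNI. So far I have only built the macOS lib. I will commit the C code later so you can compile a version for your own platform.
FEC performance, Java vs. JNI C:
java: 420 MB/s
jni c: 750 MB/s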
from java-kcp.
https://github.com/l42111996/reedsolomon_jni
from java-kcp.
I had been trying with JNA all along. I will try this over the New Year holiday. Thank you very much.
from java-kcp.
The dylib cannot be loaded on Windows. This one is for macOS, right?
native\libjni.dylib: Can't load this .dll (machine code=0x7) on a AMD 64-bit platform
at java.lang.ClassLoader$NativeLibrary.load(Native Method)
at java.lang.ClassLoader.loadLibrary0(ClassLoader.java:1941)
at java.lang.ClassLoader.loadLibrary(ClassLoader.java:1824)
at java.lang.Runtime.load0(Runtime.java:809)
at java.lang.System.load(System.java:1086)
at com.backblaze.erasure.fecNative.ReedSolomonC.(ReedSolomonC.java:13)
at com.backblaze.erasure.fecNative.ReedSolomonNative.(ReedSolomonNative.java:12)
at com.backblaze.erasure.FecAdapt.(FecAdapt.java:20)
at test.KcpRttExampleServer.main(KcpRttExampleServer.java:27)
from java-kcp.
https://github.com/l42111996/reedsolomon_jni
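You will need to look up how to compile this yourself, since I do not have a Windows machine. Put the result in the same directory as the dylib.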
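Since each platform needs its own build of the native library, the loader has to pick the file that matches the running OS. A minimal sketch of that selection follows. The `NativeLibName` class and its `fileName` method are hypothetical and are not how java-kcp locates its library; the base name `jni` is taken from the `libjni.dylib` file in the error above.

```java
import java.util.Locale;

/** Hypothetical helper that maps an os.name value to a platform-specific library file name. */
final class NativeLibName {
    static String fileName(String osName, String baseName) {
        String os = osName.toLowerCase(Locale.ROOT);
        if (os.startsWith("windows")) {
            return baseName + ".dll";
        }
        if (os.startsWith("mac")) {
            return "lib" + baseName + ".dylib";
        }
        // Assumed default for Linux and other Unix-like systems.
        return "lib" + baseName + ".so";
    }

    public static void main(String[] args) {
        String os = System.getProperty("os.name");
        System.out.println(os + " -> " + fileName(os, "jni"));
        // The JDK offers the same mapping for the current platform.
        System.out.println("mapLibraryName: " + System.mapLibraryName("jni"));
    }
}
```

The chosen file can then be handed to `System.load` with an absolute path, which is the call that appears in the stack trace above.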
from java-kcp.
Built with CLion-2020.3.2.win + mingw. It compiles, produces a DLL, and the DLL loads.
However, running the test code above now reports an error:
java.io.IOException: No enough bytes of head
at kcp.Ukcp.input(Ukcp.java:170)
at kcp.Ukcp.input(Ukcp.java:156)
at kcp.ReadTask.execute(ReadTask.java:58)
at threadPool.netty.NettyMessageExecutor.lambda$execute$0(NettyMessageExecutor.java:32)
at io.netty.channel.DefaultEventLoop.run(DefaultEventLoop.java:54)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at java.lang.Thread.run(Thread.java:748)
KcpRttExampleClient:
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import java.net.InetSocketAddress;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.ScheduledFuture;
    import java.util.concurrent.ScheduledThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    // java-kcp imports (ChannelConfig, FecAdapt, KcpClient, KcpListener, Ukcp, Snmp) omitted

    public class KcpRttExampleClient implements KcpListener {
        private final ByteBuf data;
        private int[] rtts;
        private volatile int count;
        private ScheduledExecutorService scheduleSrv;
        private ScheduledFuture<?> future = null;
        private final long startTime ;

        public KcpRttExampleClient() {
            // 200,000-byte payload sent with every message
            data = Unpooled.buffer(200000);
            for (int i = 0; i < data.capacity(); i++) {
                data.writeByte((byte) i);
            }
            rtts = new int[30000];
            for (int i = 0; i < rtts.length; i++) {
                rtts[i] = -1;
            }
            startTime = System.currentTimeMillis();
            scheduleSrv = new ScheduledThreadPoolExecutor(1);
        }

        public static void main(String[] args) {
            ChannelConfig channelConfig = new ChannelConfig();
            channelConfig.nodelay(true,40,2,true);
            channelConfig.setSndwnd(1024);
            channelConfig.setRcvwnd(1024);
            channelConfig.setMtu(1400);
            channelConfig.setAckNoDelay(true);
            channelConfig.setConv(55);
            channelConfig.setFecAdapt(new FecAdapt(10,3));
            channelConfig.setCrc32Check(true);
            //channelConfig.setTimeoutMillis(10000);
            //channelConfig.setAckMaskSize(32);
            KcpClient kcpClient = new KcpClient();
            kcpClient.init(channelConfig);
            KcpRttExampleClient kcpClientRttExample = new KcpRttExampleClient();
            kcpClient.connect(new InetSocketAddress("127.0.0.1",20003),channelConfig,kcpClientRttExample);
            //kcpClient.connect(new InetSocketAddress("10.60.100.191",20003),channelConfig,kcpClientRttExample);
        }

        @Override
        public void onConnected(Ukcp ukcp) {
            // Send one message every 20 ms until rtts.length messages have gone out.
            future = scheduleSrv.scheduleWithFixedDelay(() -> {
                ByteBuf byteBuf = rttMsg(++count);
                ukcp.write(byteBuf);
                byteBuf.release();
                if (count >= rtts.length) {
                    // finish
                    future.cancel(true);
                    byteBuf = rttMsg(-1);
                    ukcp.write(byteBuf);
                    byteBuf.release();
                }
            }, 20, 20, TimeUnit.MILLISECONDS);
        }

        @Override
        public void handleReceive(ByteBuf byteBuf, Ukcp ukcp) {
            int curCount = byteBuf.readShort();
            if (curCount == -1) {
                scheduleSrv.schedule(new Runnable() {
                    @Override
                    public void run() {
                        int sum = 0;
                        for (int rtt : rtts) {
                            sum += rtt;
                        }
                        System.out.println("average: "+ (sum / rtts.length));
                        System.out.println(Snmp.snmp.toString());
                        ukcp.close();
                        //ukcp.setTimeoutMillis(System.currentTimeMillis());
                        System.exit(0);
                    }
                }, 3, TimeUnit.SECONDS);
            } else {
                int idx = curCount - 1;
                long time = byteBuf.readInt();
                if (rtts[idx] != -1) {
                    System.out.println("???");
                }
                //log.info("rcv count {} {}", curCount, System.currentTimeMillis());
                rtts[idx] = (int) (System.currentTimeMillis() - startTime - time);
                System.out.println("rtt : "+ curCount+" "+ rtts[idx]);
            }
        }

        @Override
        public void handleException(Throwable ex, Ukcp kcp)
        {
            ex.printStackTrace();
        }

        @Override
        public void handleClose(Ukcp kcp) {
            scheduleSrv.shutdown();
            try {
                scheduleSrv.awaitTermination(3, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            int sum = 0;
            int max = 0;
            for (int rtt : rtts) {
                if(rtt>max){
                    max = rtt;
                }
                sum += rtt;
            }
            System.out.println("average: "+ (sum / rtts.length)+" max:"+max);
            System.out.println(Snmp.snmp.toString());
            System.out.println("lost percent: "+(Snmp.snmp.RetransSegs.doubleValue()/Snmp.snmp.OutPkts.doubleValue()));
        }

        /**
         * count+timestamp+dataLen+data
         *
         * @param count
         * @return
         */
        public ByteBuf rttMsg(int count) {
            ByteBuf buf = Unpooled.buffer(200000);
            buf.writeShort(count);
            buf.writeInt((int) (System.currentTimeMillis() - startTime));
            //int dataLen = new Random().nextInt(200);
            //buf.writeBytes(new byte[dataLen]);
            int dataLen = data.readableBytes();
            // Note: dataLen is 200000 here and writeShort keeps only the low 16 bits.
            buf.writeShort(dataLen);
            buf.writeBytes(data, data.readerIndex(), dataLen);
            return buf;
        }
    }
from java-kcp.