Comments (33)

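l42111996 commented on May 18, 2024

Are you running the two KcpRttExampleClient instances as two separate processes? For a single Ukcp connection, ScheduleTask, ReadTask, and WriteTask all run on the same thread, so they execute serially. Multiple Ukcp connections run in parallel with one another. This keeps the business logic lock-free: serial within a connection and parallel across connections, which makes full use of multiple cores and is also friendly to the CPU cache.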

from java-kcp.
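
The "serial within a connection, parallel across connections" model described in this comment can be illustrated with a small sketch. This is not java-kcp's actual code: `PinnedExecutors` and its methods are hypothetical names, and the counter increment stands in for the read, write, and schedule tasks. The idea shown is only that every task for one connection id is routed to the same single-threaded worker, so per-connection state needs no lock.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch; these are not java-kcp classes.
final class PinnedExecutors {
    private final ExecutorService[] workers;

    PinnedExecutors(int threads) {
        workers = new ExecutorService[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = Executors.newSingleThreadExecutor();
        }
    }

    // All tasks for one connection id run on the same worker thread, in order.
    void execute(int connectionId, Runnable task) {
        workers[Math.floorMod(connectionId, workers.length)].execute(task);
    }

    // Stops accepting tasks and waits for queued ones to finish.
    void shutdownAndWait() {
        for (ExecutorService w : workers) {
            w.shutdown();
        }
        try {
            for (ExecutorService w : workers) {
                w.awaitTermination(5, TimeUnit.SECONDS);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        PinnedExecutors pool = new PinnedExecutors(4);
        int[] counters = new int[8]; // one plain int per connection, no synchronization
        for (int round = 0; round < 1000; round++) {
            for (int conn = 0; conn < counters.length; conn++) {
                final int id = conn;
                pool.execute(id, () -> counters[id]++); // stand-in for per-connection work
            }
        }
        pool.shutdownAndWait();
        for (int c : counters) {
            System.out.println(c);
        }
    }
}
```

Each counter is only ever touched by the worker its connection is pinned to, which is why the unsynchronized increment is safe here.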

l42111996 commented on May 18, 2024

If they are in the same process, the best practice is to create a single KcpRttExampleClient instance and call connect multiple times, once for each server.

testwen00 commented on May 18, 2024

I would like reads and writes to run on separate threads. With a single thread, I am worried that performance will suffer.

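l42111996 commented on May 18, 2024

This thread only handles KCP-layer logic; it does not do any network I/O. Analysis with visual shows that this logic is not the bottleneck. In testing, a single thread serving a single connection with FEC enabled is estimated to process about 60,000 packets per second. At 1 KB per packet that is 60,000 × 1 KB, roughly 60 MB per second, which should cover 99% of use cases. Running in parallel, it can reach at least 70% of the CPU's capacity, so overall throughput = CPU count × 60,000 × 1 KB × 70%.
Separating reads from writes would require locking. Single-connection throughput might go up by 20%, but throughput of the system as a whole would drop considerably.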
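
The throughput estimate in this comment can be worked through numerically. The sketch below only restates the figures quoted there (60,000 packets per second, 1 KB per packet, 70% parallel efficiency); reading "1K" as 1,000 bytes and using 8 cores are assumptions made for illustration, and the class name is hypothetical.

```java
final class ThroughputEstimate {
    // Figures quoted in the thread; they are the commenter's estimates, not measurements made here.
    static final long PACKETS_PER_SEC = 60_000;
    static final long BYTES_PER_PACKET = 1_000;      // assumption: "1K" taken as 1,000 bytes
    static final double PARALLEL_EFFICIENCY = 0.70;

    // Throughput of one connection handled by one KCP thread.
    static long perConnectionBytesPerSec() {
        return PACKETS_PER_SEC * BYTES_PER_PACKET;
    }

    // Overall throughput = CPU count x per-connection throughput x efficiency.
    static double systemBytesPerSec(int cpuCount) {
        return cpuCount * perConnectionBytesPerSec() * PARALLEL_EFFICIENCY;
    }

    public static void main(String[] args) {
        System.out.println("per connection (MB/s): " + perConnectionBytesPerSec() / 1e6);
        System.out.println("8 cores (MB/s): " + systemBytesPerSec(8) / 1e6);
    }
}
```

With these inputs the per-connection figure is 60 MB/s and the 8-core figure is about 336 MB/s.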

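testwen00 commented on May 18, 2024

In actual testing, performance is affected. With sender packets of 200,000 and above, once a lot of data builds up in the receivequeue or writequeue, all data processing is delayed, sends fall behind, and the server eventually times out on read.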

testwen00 commented on May 18, 2024

For example, with the example from #15, receiving data is very slow and takes a long time.

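l42111996 commented on May 18, 2024

> For example, with the example from #15, receiving data is very slow and takes a long time.

I could not reproduce this problem locally. Judging from the log above, either the bandwidth is saturated or your LAN router has reached its limit; the LAN hit a 50% packet loss rate.
You could try running a few virtual machines locally instead.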

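testwen00 commented on May 18, 2024

1. I am on a PC, running everything on the same machine, with a wireless NIC.
2. The data packets need to be large: 200,000 bytes.

My guess is that the packet is too large and gets split into many fragments, each of which needs an ACK round trip, and that is what stretches the time out.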

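l42111996 commented on May 18, 2024

> 1. I am on a PC, running everything on the same machine, with a wireless NIC.
> 2. The data packets need to be large: 200,000 bytes.
>
> My guess is that the packet is too large and gets split into many fragments, each of which needs an ACK round trip, and that is what stretches the time out.

I ran your example locally as two processes connected over 127.0.0.1. Bandwidth peaked at 38 MB and the printed round-trip RTT stayed under 10 ms, so I suspect a NIC or router problem. Kuaishou ran a similar test with this library, and their bottleneck also turned out to be the company's internal router.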

testwen00 commented on May 18, 2024

By my measurements, the time goes into FEC encoding and decoding, which average 1 ms per call. That adds up, and the latency grows.

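l42111996 commented on May 18, 2024

> By my measurements, the time goes into FEC encoding and decoding, which average 1 ms per call. That adds up, and the latency grows.

What CPU is that? Why does it take so long? FEC encoding could be split out onto its own thread; decoding is not easy to move to a separate thread for now.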

testwen00 commented on May 18, 2024

i7-7700 3.6GHz
16 GB RAM
Performance is very good below 10,000 bytes.
At 100,000 bytes it breaks down.

Timing code added to ReedSolomon:

    long start = System.currentTimeMillis();
    // Do the coding.
    LOOP.codeSomeShards(
            parityRows,
            shards, dataShardCount,
            outputs, parityShardCount,
            offset, byteCount);
    long end = System.currentTimeMillis();
    if ((end - start) > 0)
        System.out.println("codeSomeShards use time = " + (end - start));

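
The timing snippet in this comment uses `System.currentTimeMillis()`, whose millisecond resolution makes individual calls of around 1 ms hard to measure accurately. A common alternative is `System.nanoTime()`, which the JDK provides for elapsed-time measurement, combined with an accumulated average. The sketch below is illustrative only: `CodingTimer` is a hypothetical helper, the XOR loop is a stand-in workload, and in the thread's setting the timed call would be `LOOP.codeSomeShards(...)`.

```java
// Hypothetical helper for averaging the cost of many short calls; not thread-safe.
final class CodingTimer {
    private long totalNanos;
    private long calls;

    // Times one invocation of the supplied work using the monotonic clock.
    void time(Runnable work) {
        long start = System.nanoTime();
        work.run();
        totalNanos += System.nanoTime() - start;
        calls++;
    }

    long calls() {
        return calls;
    }

    // Average cost per call in microseconds, or 0 if nothing was timed.
    double averageMicros() {
        return calls == 0 ? 0.0 : (totalNanos / 1_000.0) / calls;
    }

    public static void main(String[] args) {
        CodingTimer timer = new CodingTimer();
        byte[] shard = new byte[1400];
        for (int i = 0; i < 10_000; i++) {
            // Stand-in workload; replace with the real coding call when measuring FEC.
            timer.time(() -> {
                for (int j = 0; j < shard.length; j++) {
                    shard[j] ^= (byte) j;
                }
            });
        }
        System.out.println("calls=" + timer.calls() + " avg(us)=" + timer.averageMicros());
    }
}
```

Averaging over many calls also smooths out JIT warm-up, which a per-call print does not.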

l42111996 commented on May 18, 2024

> i7-7700 3.6GHz
> 16 GB RAM
> Performance is very good below 10,000 bytes.
> At 100,000 bytes it breaks down.
>
> Timing code added to ReedSolomon:
>
>     long start = System.currentTimeMillis();
>     // Do the coding.
>     LOOP.codeSomeShards(
>             parityRows,
>             shards, dataShardCount,
>             outputs, parityShardCount,
>             offset, byteCount);
>     long end = System.currentTimeMillis();
>     if ((end - start) > 0)
>         System.out.println("codeSomeShards use time = " + (end - start));

Is this on Windows? I will test it tomorrow.

testwen00 commented on May 18, 2024

Windows 10
JDK 1.8

l42111996 commented on May 18, 2024

> Windows 10
> JDK 1.8

Do you have the complete test code?

testwen00 commented on May 18, 2024

KcpRttExampleServer:

    ChannelConfig channelConfig = new ChannelConfig();
    channelConfig.nodelay(true,40,2,true);
    channelConfig.setSndwnd(1024);
    channelConfig.setRcvwnd(1024);
    channelConfig.setMtu(1400);
    channelConfig.setFecDataShardCount(10);
    channelConfig.setFecParityShardCount(3);
    channelConfig.setAckNoDelay(true);
    channelConfig.setTimeoutMillis(10000);
    channelConfig.setUseConvChannel(true);
    channelConfig.setCrc32Check(false);

KcpRttExampleClient:

public class KcpRttExampleClient implements KcpListener {
private static final Logger log = LoggerFactory.getLogger(KcpRttExampleClient.class);
private final ByteBuf data;
private int[] rtts;
private volatile int count;
private ScheduledExecutorService scheduleSrv;
private ScheduledFuture<?> future = null;
private final long startTime ;

public KcpRttExampleClient() {
    data = Unpooled.buffer(200000);
    for (int i = 0; i < data.capacity(); i++) {
        data.writeByte((byte) i);
    }
    rtts = new int[30000];
    for (int i = 0; i < rtts.length; i++) {
        rtts[i] = -1;
    }
    startTime = System.currentTimeMillis();
    scheduleSrv = new ScheduledThreadPoolExecutor(1);
}

public static void main(String[] args) {
    ChannelConfig channelConfig = new ChannelConfig();
    channelConfig.nodelay(true,10,2,true);
    channelConfig.setSndwnd(1024);
    channelConfig.setRcvwnd(1024);
    channelConfig.setMtu(1400);
    channelConfig.setAckNoDelay(true);
    channelConfig.setConv(55);
    channelConfig.setFecDataShardCount(10);
    channelConfig.setFecParityShardCount(3);
    channelConfig.setCrc32Check(false);
    KcpClient kcpClient = new KcpClient();
    kcpClient.init(channelConfig);
    KcpRttExampleClient kcpClientRttExample = new KcpRttExampleClient();
    kcpClient.connect(new InetSocketAddress("127.0.0.1",20003),channelConfig,kcpClientRttExample);
}

@Override
public void onConnected(Ukcp ukcp) {
    future = scheduleSrv.scheduleWithFixedDelay(() -> {
        ByteBuf byteBuf = rttMsg(++count);
        ukcp.write(byteBuf);
        byteBuf.release();
        if (count >= rtts.length) {
            // finish
            future.cancel(true);
            byteBuf = rttMsg(-1);
            ukcp.write(byteBuf);
            byteBuf.release();

        }
    }, 5, 5, TimeUnit.MILLISECONDS);
}

@Override
public void handleReceive(ByteBuf byteBuf, Ukcp ukcp) {
    int curCount = byteBuf.readShort();
    if (curCount == -1) {
        scheduleSrv.schedule(new Runnable() {
            @Override
            public void run() {
                int sum = 0;
                for (int rtt : rtts) {
                    sum += rtt;
                }
                System.out.println("average: "+ (sum / rtts.length));
                System.out.println(Snmp.snmp.toString());
                ukcp.close();
                //ukcp.setTimeoutMillis(System.currentTimeMillis());
                System.exit(0);
            }
        }, 3, TimeUnit.SECONDS);
    } else {
        int idx = curCount - 1;
        long time = byteBuf.readInt();
        if (rtts[idx] != -1) {
            System.out.println("???");
        }
        //log.info("rcv count {} {}", curCount, System.currentTimeMillis());
        rtts[idx] = (int) (System.currentTimeMillis() - startTime - time);
        System.out.println("rtt : "+ curCount+"  "+ rtts[idx]);
    }
}

@Override
public void handleException(Throwable ex, Ukcp kcp)
{
    ex.printStackTrace();
}

@Override
public void handleClose(Ukcp kcp) {
    scheduleSrv.shutdown();
    try {
        scheduleSrv.awaitTermination(3, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }

    int sum = 0;
    int max = 0;
    for (int rtt : rtts) {
        if(rtt>max){
            max = rtt;
        }
        sum += rtt;
    }
    System.out.println("average: "+ (sum / rtts.length)+" max:"+max);
    System.out.println(Snmp.snmp.toString());
    System.out.println("lost percent: "+(Snmp.snmp.RetransSegs.doubleValue()/Snmp.snmp.OutPkts.doubleValue()));


}


/**
 * count+timestamp+dataLen+data
 *
 * @param count
 * @return
 */
public ByteBuf rttMsg(int count) {
    ByteBuf buf = Unpooled.buffer(200000);
    buf.writeShort(count);
    buf.writeInt((int) (System.currentTimeMillis() - startTime));

    //int dataLen = new Random().nextInt(200);
    //buf.writeBytes(new byte[dataLen]);

    int dataLen = data.readableBytes();
    buf.writeShort(dataLen);
    buf.writeBytes(data, data.readerIndex(), dataLen);
    return buf;
}

}

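
One detail of the test client is worth checking when adapting it: `rttMsg` writes `dataLen` with `buf.writeShort(dataLen)`, and Netty's `ByteBuf.writeShort(int)` keeps only the low 16 bits of its argument, so a length of 200,000 does not fit. The client's own `handleReceive` never reads this field back, so whether it matters depends on what the peer does with it; nothing in the thread establishes a link to the reported delays. The sketch below shows the truncation using `java.nio.ByteBuffer` as a stand-in for Netty's `ByteBuf` (an assumption made to keep the example dependency-free); `LengthFieldDemo` is a hypothetical name.

```java
import java.nio.ByteBuffer;

final class LengthFieldDemo {
    // Writes the length as a 16-bit field and reads it back as an unsigned value.
    static int roundTripAsShort(int length) {
        ByteBuffer buf = ByteBuffer.allocate(2);
        buf.putShort((short) length); // only the low 16 bits survive
        buf.flip();
        return buf.getShort() & 0xFFFF;
    }

    // Writes the length as a 32-bit field, which holds 200,000 without loss.
    static int roundTripAsInt(int length) {
        ByteBuffer buf = ByteBuffer.allocate(4);
        buf.putInt(length);
        buf.flip();
        return buf.getInt();
    }

    public static void main(String[] args) {
        System.out.println(roundTripAsShort(200_000)); // prints 3392
        System.out.println(roundTripAsInt(200_000));   // prints 200000
    }
}
```

A 32-bit length field (`writeInt` in Netty) avoids the truncation for payloads of this size.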

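l42111996 commented on May 18, 2024

System: Windows 10, CPU: i7-3770, 16 GB RAM. Running this program, analysis shows the bottleneck is network I/O; bandwidth reached 600 Mb.
[image]
Thread analysis:
[image]
All KCP work (FEC encoding and decoding, KCP packet assembly and merging, and so on) takes up 60% and has not hit a bottleneck yet.
Windows uses the select model, so performance is somewhat worse there; on Linux or macOS it would be much better.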

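testwen00 commented on May 18, 2024

Why is CPU usage so high? With 2 connections it goes straight to 90%. That makes it unusable; can it be optimized?
My wireless NIC is a 300M one. Under the same conditions TCP does much better, so KCP's advantage does not show.
For the FEC algorithm, would you consider using C++ and calling it from Java through a DLL?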

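l42111996 commented on May 18, 2024

> Why is CPU usage so high? With 2 connections it goes straight to 90%. That makes it unusable; can it be optimized?

A single connection is handled by only 2 threads: one for network send and receive, and one for KCP logic and FEC encoding and decoding. It uses at most two cores, so 2 connections use at most 4 cores. Your i7-7700 has 8 hardware threads, so usage should top out at 50%. You need to check what else is using the CPU; on Windows, for example, 360 inspects traffic and consumes CPU.

> My wireless NIC is a 300M one. Under the same conditions TCP does much better, so KCP's advantage does not show.

TCP has flow control and KCP does not. When the link is saturated, TCP will indeed do much better.

> For the FEC algorithm, would you consider using C++ and calling it from Java through a DLL?

The Java FEC does lag somewhat behind the C++ and Go versions. Going through a DLL will take some time. All memory in this version is off-heap and is not managed by the JVM's GC; it would have to be copied from off-heap to on-heap memory and then passed to the C++ layer, which would increase GC pressure, so the actual performance is hard to predict.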

l42111996 commented on May 18, 2024

Try version 1.4; FEC should be somewhat better there.

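l42111996 commented on May 18, 2024

After optimization, FEC handles roughly 400 MB per second on a single core for encoding plus decoding, which is enough for now. When I have time I will optimize it further into a C++ version.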

testwen00 commented on May 18, 2024

I just bought a 650M wireless NIC :)
The optimization helps, but latency still gets worse and worse, and a single connection uses 40~50% CPU.
rtt : 3733 6415
rtt : 3734 6399
rtt : 3735 6407
rtt : 3736 6400
rtt : 3737 6382
rtt : 3738 6418
rtt : 3739 6411
rtt : 3740 6405
rtt : 3741 6398
rtt : 3742 6393
rtt : 3743 6416
rtt : 3744 6418
rtt : 3745 6419
rtt : 3746 6419
rtt : 3747 6412
rtt : 3748 6421
rtt : 3749 6414
The time keeps growing, so it seems this is only suitable for small-packet communication.

l42111996 commented on May 18, 2024

A 650 Mb NIC tops out at 81.25 MB even when fully saturated. The example above easily saturates the NIC, so latency is expected; the bottleneck is still this wireless NIC.
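
The claim that the example saturates a 650 Mb link can be checked with rough arithmetic. The sketch below is an estimate under stated assumptions, not a measurement: it takes the 200,000-byte message and the 5 ms send interval from the first test client posted in the thread, the 10 data + 3 parity FEC setting from its config, assumes the server echoes each message back (as an RTT test implies), and ignores KCP, FEC, and UDP headers. `LinkLoadEstimate` is a hypothetical name.

```java
final class LinkLoadEstimate {
    // Offered load in megabits per second for periodic messages with FEC overhead.
    static double megabitsPerSec(long bytesPerMessage, double intervalMillis,
                                 int dataShards, int parityShards, boolean echoed) {
        double messagesPerSec = 1000.0 / intervalMillis;
        double fecFactor = (dataShards + parityShards) / (double) dataShards;
        double directions = echoed ? 2.0 : 1.0; // an echo doubles traffic on a shared medium
        return bytesPerMessage * 8 * messagesPerSec * fecFactor * directions / 1e6;
    }

    // Converts a link rate in megabits per second to megabytes per second.
    static double megabytesPerSec(double megabits) {
        return megabits / 8.0;
    }

    public static void main(String[] args) {
        System.out.println("650 Mb link in MB/s: " + megabytesPerSec(650));
        System.out.println("one-way load in Mb/s: " + megabitsPerSec(200_000, 5, 10, 3, false));
        System.out.println("with echo in Mb/s: " + megabitsPerSec(200_000, 5, 10, 3, true));
    }
}
```

Under these assumptions the offered load is about 416 Mb/s one way and about 832 Mb/s with the echo, which exceeds 650 Mb/s before any retransmissions are counted.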

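l42111996 commented on May 18, 2024

In an environment without packet loss, KCP cannot compete with TCP, because its window is fixed. Under packet loss, KCP does considerably better. KCP is mostly used in scenarios that need low latency over networks with some packet loss.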
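
One way to read the "fixed window" remark is as the usual window bound: a sender can have at most one window of segments in flight per round trip, so throughput is capped at window × segment size ÷ RTT. The sketch below applies that bound to the window of 1024 and MTU of 1400 used in the thread's configs. Treating the full MTU as payload and the RTT values chosen are simplifying assumptions, and `WindowCap` is a hypothetical name.

```java
final class WindowCap {
    // Upper bound on throughput when at most windowSegments segments are in flight per RTT.
    static double maxBytesPerSec(int windowSegments, int bytesPerSegment, double rttMillis) {
        return windowSegments * (double) bytesPerSegment * 1000.0 / rttMillis;
    }

    public static void main(String[] args) {
        double[] rtts = {1.0, 10.0, 100.0}; // illustrative RTTs in milliseconds
        for (double rtt : rtts) {
            double mbPerSec = maxBytesPerSec(1024, 1400, rtt) / 1e6;
            System.out.println("rtt " + rtt + " ms -> at most " + mbPerSec + " MB/s");
        }
    }
}
```

At a 10 ms RTT the bound is about 143 MB/s, and it shrinks in proportion as the RTT grows, whereas TCP adjusts its window to the path.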

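testwen00 commented on May 18, 2024

I found a C++ version of reed-solomon whose code matches the current Java version: https://github.com/DrPizza/reed-solomon
The C++ code runs, but I am not familiar with C++ and did not manage to wrap it as a DLL called from Java. Please help when you have time.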

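l42111996 commented on May 18, 2024

> I found a C++ version of reed-solomon whose code matches the current Java version: https://github.com/DrPizza/reed-solomon
> The C++ code runs, but I am not familiar with C++ and did not manage to wrap it as a DLL called from Java. Please help when you have time.

It will be a while; I have been quite busy lately. I will get to it once this busy stretch is over.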

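l42111996 commented on May 18, 2024

Take a look at the nativeFec branch, which calls a C version of FEC through JNI. So far I have only built the macOS lib; I will commit the C code later so that you can compile it for your own platform.
FEC performance, Java vs. JNI C:
java: 420 MB/s
jni c: 750 MB/s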

l42111996 commented on May 18, 2024

> I found a C++ version of reed-solomon whose code matches the current Java version: https://github.com/DrPizza/reed-solomon
> The C++ code runs, but I am not familiar with C++ and did not manage to wrap it as a DLL called from Java. Please help when you have time.

https://github.com/l42111996/reedsolomon_jni

testwen00 commented on May 18, 2024

I had been trying with JNA all along. I will try this over the New Year holiday. Thank you very much.

testwen00 commented on May 18, 2024

The dylib cannot be loaded on Windows. This one is for macOS, right?

native\libjni.dylib: Can't load this .dll (machine code=0x7) on a AMD 64-bit platform
at java.lang.ClassLoader$NativeLibrary.load(Native Method)
at java.lang.ClassLoader.loadLibrary0(ClassLoader.java:1941)
at java.lang.ClassLoader.loadLibrary(ClassLoader.java:1824)
at java.lang.Runtime.load0(Runtime.java:809)
at java.lang.System.load(System.java:1086)
at com.backblaze.erasure.fecNative.ReedSolomonC.(ReedSolomonC.java:13)
at com.backblaze.erasure.fecNative.ReedSolomonNative.(ReedSolomonNative.java:12)
at com.backblaze.erasure.FecAdapt.(FecAdapt.java:20)
at test.KcpRttExampleServer.main(KcpRttExampleServer.java:27)


l42111996 commented on May 18, 2024

> The dylib cannot be loaded on Windows. This one is for macOS, right?
>
> native\libjni.dylib: Can't load this .dll (machine code=0x7) on a AMD 64-bit platform
> at java.lang.ClassLoader$NativeLibrary.load(Native Method)
> at java.lang.ClassLoader.loadLibrary0(ClassLoader.java:1941)
> at java.lang.ClassLoader.loadLibrary(ClassLoader.java:1824)
> at java.lang.Runtime.load0(Runtime.java:809)
> at java.lang.System.load(System.java:1086)
> at com.backblaze.erasure.fecNative.ReedSolomonC.(ReedSolomonC.java:13)
> at com.backblaze.erasure.fecNative.ReedSolomonNative.(ReedSolomonNative.java:12)
> at com.backblaze.erasure.FecAdapt.(FecAdapt.java:20)
> at test.KcpRttExampleServer.main(KcpRttExampleServer.java:27)

https://github.com/l42111996/reedsolomon_jni

You will need to look up how to compile this yourself, since I do not have a Windows machine. Put the result in the same directory as the dylib and it should work.
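
The load failure in this exchange comes from handing a macOS `.dylib` to a Windows JVM. A common way to avoid hard-coding one platform's file name is `System.mapLibraryName`, which maps a base name to the platform's convention. The sketch below is an assumption about how such a lookup could be written, not the loading code in the nativeFec branch: `NativeLibLocator` is a hypothetical helper, the `native` directory and `jni` base name simply mirror the path in the stack trace, and a library built with a different file name (some toolchains add a `lib` prefix on Windows) would need that name instead.

```java
import java.io.File;

// Hypothetical helper; the actual loader in the nativeFec branch may differ.
final class NativeLibLocator {
    // Resolves the platform-specific file name for a library base name inside a directory.
    static File resolve(File dir, String baseName) {
        return new File(dir, System.mapLibraryName(baseName));
    }

    static String describePlatform() {
        return System.getProperty("os.name") + " / " + System.getProperty("os.arch");
    }

    public static void main(String[] args) {
        File lib = resolve(new File("native"), "jni");
        System.out.println(describePlatform() + " -> " + lib.getPath());
        if (lib.isFile()) {
            System.load(lib.getAbsolutePath()); // System.load requires an absolute path
        } else {
            System.out.println("No native build found for this platform.");
        }
    }
}
```

Checking for the file before loading leaves room to fall back to the pure-Java FEC when no native build exists for the current platform.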

testwen00 commented on May 18, 2024

With CLion-2020.3.2.win + mingw it compiles, the DLL is generated, and it loads.

However, running the test code above now produces an error:

java.io.IOException: No enough bytes of head
at kcp.Ukcp.input(Ukcp.java:170)
at kcp.Ukcp.input(Ukcp.java:156)
at kcp.ReadTask.execute(ReadTask.java:58)
at threadPool.netty.NettyMessageExecutor.lambda$execute$0(NettyMessageExecutor.java:32)
at io.netty.channel.DefaultEventLoop.run(DefaultEventLoop.java:54)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at java.lang.Thread.run(Thread.java:748)

KcpRttExampleClient:

public class KcpRttExampleClient implements KcpListener {

private final ByteBuf data;

private int[] rtts;

private volatile int count;

private ScheduledExecutorService scheduleSrv;

private ScheduledFuture<?> future = null;

private final long startTime ;

public KcpRttExampleClient() {
    data = Unpooled.buffer(200000);
    for (int i = 0; i < data.capacity(); i++) {
        data.writeByte((byte) i);
    }

    rtts = new int[30000];
    for (int i = 0; i < rtts.length; i++) {
        rtts[i] = -1;
    }
    startTime = System.currentTimeMillis();
    scheduleSrv = new ScheduledThreadPoolExecutor(1);
}

public static void main(String[] args) {
    ChannelConfig channelConfig = new ChannelConfig();
    channelConfig.nodelay(true,40,2,true);
    channelConfig.setSndwnd(1024);
    channelConfig.setRcvwnd(1024);
    channelConfig.setMtu(1400);
    channelConfig.setAckNoDelay(true);
    channelConfig.setConv(55);

    channelConfig.setFecAdapt(new FecAdapt(10,3));
    channelConfig.setCrc32Check(true);
    //channelConfig.setTimeoutMillis(10000);
    //channelConfig.setAckMaskSize(32);
    KcpClient kcpClient = new KcpClient();
    kcpClient.init(channelConfig);

    KcpRttExampleClient kcpClientRttExample = new KcpRttExampleClient();
    kcpClient.connect(new InetSocketAddress("127.0.0.1",20003),channelConfig,kcpClientRttExample);

    //kcpClient.connect(new InetSocketAddress("10.60.100.191",20003),channelConfig,kcpClientRttExample);
}

@Override
public void onConnected(Ukcp ukcp) {
    future = scheduleSrv.scheduleWithFixedDelay(() -> {
        ByteBuf byteBuf = rttMsg(++count);
        ukcp.write(byteBuf);
        byteBuf.release();
        if (count >= rtts.length) {
            // finish
            future.cancel(true);
            byteBuf = rttMsg(-1);
            ukcp.write(byteBuf);
            byteBuf.release();

        }
    }, 20, 20, TimeUnit.MILLISECONDS);
}

@Override
public void handleReceive(ByteBuf byteBuf, Ukcp ukcp) {
    int curCount = byteBuf.readShort();

    if (curCount == -1) {
        scheduleSrv.schedule(new Runnable() {
            @Override
            public void run() {
                int sum = 0;
                for (int rtt : rtts) {
                    sum += rtt;
                }
                System.out.println("average: "+ (sum / rtts.length));
                System.out.println(Snmp.snmp.toString());
                ukcp.close();
                //ukcp.setTimeoutMillis(System.currentTimeMillis());
                System.exit(0);
            }
        }, 3, TimeUnit.SECONDS);
    } else {
        int idx = curCount - 1;
        long time = byteBuf.readInt();
        if (rtts[idx] != -1) {
            System.out.println("???");
        }
        //log.info("rcv count {} {}", curCount, System.currentTimeMillis());
        rtts[idx] = (int) (System.currentTimeMillis() - startTime - time);
        System.out.println("rtt : "+ curCount+"  "+ rtts[idx]);
    }
}

@Override
public void handleException(Throwable ex, Ukcp kcp)
{
    ex.printStackTrace();
}

@Override
public void handleClose(Ukcp kcp) {
    scheduleSrv.shutdown();
    try {
        scheduleSrv.awaitTermination(3, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }

    int sum = 0;
    int max = 0;
    for (int rtt : rtts) {
        if(rtt>max){
            max = rtt;
        }
        sum += rtt;
    }
    System.out.println("average: "+ (sum / rtts.length)+" max:"+max);
    System.out.println(Snmp.snmp.toString());
    System.out.println("lost percent: "+(Snmp.snmp.RetransSegs.doubleValue()/Snmp.snmp.OutPkts.doubleValue()));


}


/**
 * count+timestamp+dataLen+data
 *
 * @param count
 * @return
 */
public ByteBuf rttMsg(int count) {
    ByteBuf buf = Unpooled.buffer(200000);
    buf.writeShort(count);
    buf.writeInt((int) (System.currentTimeMillis() - startTime));

    //int dataLen = new Random().nextInt(200);
    //buf.writeBytes(new byte[dataLen]);

    int dataLen = data.readableBytes();
    buf.writeShort(dataLen);
    buf.writeBytes(data, data.readerIndex(), dataLen);

    return buf;
}

}
