java-kcp's People

Contributors

l42111996

java-kcp's Issues

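Some suggestions and questions

1. If the project targets JDK 8, AtomicInteger or AtomicLong could be replaced with LongAdder, which performs better under high concurrency.
2. While reading the source I noticed that TimerThreadFacotry is misspelled (IDEA flags it); it could be renamed.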
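
As a minimal sketch of the first suggestion, the following shows a counter backed by LongAdder that is incremented from several threads. The class PacketStats and its methods are hypothetical and are not part of java-kcp; only LongAdder itself is a real JDK 8 class. LongAdder tends to help when a counter is written often and read rarely, because sum() is not an atomic snapshot while writers are active.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;

/** Hypothetical statistics holder; not a java-kcp class. */
class PacketStats {
    // LongAdder spreads updates over several internal cells, which reduces
    // contention when many threads increment the same counter.
    private final LongAdder outPkts = new LongAdder();

    void onPacketSent() {
        outPkts.increment();
    }

    long outPkts() {
        // Exact only once all writers have finished.
        return outPkts.sum();
    }

    /** Increments the counter from several threads and returns the final total. */
    static long demo(int threads, int incrementsPerThread) {
        PacketStats stats = new PacketStats();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            pool.execute(() -> {
                for (int i = 0; i < incrementsPerThread; i++) {
                    stats.onPacketSent();
                }
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return stats.outPkts();
    }
}
```

Calling PacketStats.demo(4, 10000) performs 40000 increments in total and reads the sum only after the pool has terminated.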

Are the KCP parameters configured in LockStepSynchronizationServer/client considered best practice?
Are they set the same way in production projects?
Thanks!

Android integration problem

Integrating on Android requires Java 8 support and a dependency on 'com.lmax:disruptor:3.4.2', but packaging fails with the error "MethodHandle.invoke and MethodHandle.invokeExact are only supported starting with Android O (--min-api 26)".

java.lang.IndexOutOfBoundsException: PooledUnsafeDirectByteBuf

Tested with two PCs. One runs KcpRttExampleServer:

    KcpRttExampleServer kcpRttExampleServer = new KcpRttExampleServer();
    ChannelConfig channelConfig = new ChannelConfig();
    channelConfig.nodelay(true,30,2,true);
    channelConfig.setSndwnd(1024);
    channelConfig.setRcvwnd(1024);
    channelConfig.setMtu(1400);
    channelConfig.setFecDataShardCount(10);
    channelConfig.setFecParityShardCount(3);
    channelConfig.setAckNoDelay(true);
    channelConfig.setTimeoutMillis(5000);
    channelConfig.setUseConvChannel(true);
    channelConfig.setCrc32Check(false);
    KcpServer kcpServer = new KcpServer();
    kcpServer.init(kcpRttExampleServer,channelConfig,20003);

The other runs KcpRttExampleClient:

   public KcpRttExampleClient() {
    data = Unpooled.buffer(220000);
    for (int i = 0; i < data.capacity(); i++) {
        data.writeByte((byte) i);
    }

    rtts = new int[30000];
    for (int i = 0; i < rtts.length; i++) {
        rtts[i] = -1;
    }
    startTime = System.currentTimeMillis();
    scheduleSrv = new ScheduledThreadPoolExecutor(1);
}

public static void main(String[] args) {
    ChannelConfig channelConfig = new ChannelConfig();
    channelConfig.nodelay(true,30,2,true);
    channelConfig.setSndwnd(1024);
    channelConfig.setRcvwnd(1024);
    channelConfig.setMtu(1400);
    channelConfig.setAckNoDelay(true);
    channelConfig.setConv(55);
    channelConfig.setFecDataShardCount(10);
    channelConfig.setFecParityShardCount(3);
    channelConfig.setCrc32Check(false);
    KcpClient kcpClient = new KcpClient();
    kcpClient.init(channelConfig);

    KcpRttExampleClient kcpClientRttExample = new KcpRttExampleClient();
    kcpClient.connect(new InetSocketAddress("192.168.2.180",20003),channelConfig,kcpClientRttExample);

}

@Override
public void onConnected(Ukcp ukcp) {
    future = scheduleSrv.scheduleWithFixedDelay(() -> {
        ByteBuf byteBuf = rttMsg(++count);
        ukcp.write(byteBuf);
        byteBuf.release();
        if (count >= rtts.length) {
            // finish
            future.cancel(true);
            byteBuf = rttMsg(-1);
            ukcp.write(byteBuf);
            byteBuf.release();

        }
    }, 20, 20, TimeUnit.MILLISECONDS);
}

@Override
public void handleReceive(ByteBuf byteBuf, Ukcp ukcp) {
    int curCount = byteBuf.readShort();

    if (curCount == -1) {
        scheduleSrv.schedule(new Runnable() {
            @Override
            public void run() {
                int sum = 0;
                for (int rtt : rtts) {
                    sum += rtt;
                }
                System.out.println("average: "+ (sum / rtts.length));
                System.out.println(Snmp.snmp.toString());
                ukcp.close();
                //ukcp.setTimeoutMillis(System.currentTimeMillis());
                System.exit(0);
            }
        }, 3, TimeUnit.SECONDS);
    } else {
        int idx = curCount - 1;
        long time = byteBuf.readInt();
        if (rtts[idx] != -1) {
            System.out.println("???");
        }
        //log.info("rcv count {} {}", curCount, System.currentTimeMillis());
        rtts[idx] = (int) (System.currentTimeMillis() - startTime - time);
        System.out.println("rtt : "+ curCount+"  "+ rtts[idx]);
    }
}

@Override
public void handleException(Throwable ex, Ukcp kcp)
{
    ex.printStackTrace();
}

@Override
public void handleClose(Ukcp kcp) {
    scheduleSrv.shutdown();
    try {
        scheduleSrv.awaitTermination(3, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }

    int sum = 0;
    int max = 0;
    for (int rtt : rtts) {
        if(rtt>max){
            max = rtt;
        }
        sum += rtt;
    }
    System.out.println("average: "+ (sum / rtts.length)+" max:"+max);
    System.out.println(Snmp.snmp.toString());
    System.out.println("lost percent: "+(Snmp.snmp.RetransSegs.doubleValue()/Snmp.snmp.OutPkts.doubleValue()));


}


/**
 * count+timestamp+dataLen+data
 *
 * @param count
 * @return
 */
public ByteBuf rttMsg(int count) {
    ByteBuf buf = Unpooled.buffer(220000);
    buf.writeShort(count);
    buf.writeInt((int) (System.currentTimeMillis() - startTime));

    //int dataLen = new Random().nextInt(200);
    //buf.writeBytes(new byte[dataLen]);

    int dataLen = data.readableBytes();
    buf.writeShort(dataLen);
    buf.writeBytes(data, data.readerIndex(), dataLen);

    return buf;
}

KcpRttExampleServer reports an error:

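bytebuf length: 1394, length read: 26846
[104] [-34] [56] [0] [0] [0] [-127] [75] [0] [4] [20]...
Snmp{BytesSent=0, BytesReceived=0, MaxConn=0, ActiveOpens=0, PassiveOpens=0, CurrEstab=0, InErrs=0, InCsumErrors=0, KCPInErrors=0, packets received=12071, packets sent=15022, InSegs=9058, OutSegs=11557, bytes received=7091032, bytes sent=14605712, total retransmissions=6269, fast retransmissions=0, idle fast retransmissions=0, timeout retransmissions=6269, duplicate packets received=1687, FEC recovered=16, FEC recovery errors=1, FEC data shards received=9050, FEC parity shards received=3021, FEC data packets evicted from the redundancy cache=2085, FEC duplicate data packets received=0}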

java.lang.IndexOutOfBoundsException: PooledUnsafeDirectByteBuf(ridx: 2, widx: 1394, cap: 1394).slice(2, 26846)
at io.netty.buffer.AbstractUnpooledSlicedByteBuf.checkSliceOutOfBounds(AbstractUnpooledSlicedByteBuf.java:474)
at io.netty.buffer.AbstractUnpooledSlicedByteBuf.<init>(AbstractUnpooledSlicedByteBuf.java:38)
at io.netty.buffer.UnpooledSlicedByteBuf.<init>(UnpooledSlicedByteBuf.java:24)
at io.netty.buffer.AbstractByteBuf.slice(AbstractByteBuf.java:1223)
at com.backblaze.erasure.fec.FecDecode.decode(FecDecode.java:211)
at kcp.Ukcp.input(Ukcp.java:135)
at kcp.ReadTask.execute(ReadTask.java:60)
at threadPool.netty.NettyMessageExecutor.lambda$execute$0(NettyMessageExecutor.java:32)
at io.netty.channel.DefaultEventLoop.run(DefaultEventLoop.java:54)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at java.lang.Thread.run(Thread.java:748)

recieveList is full

server :

    ChannelConfig channelConfig = new ChannelConfig();
    channelConfig.nodelay(true,30,2,true);
    channelConfig.setSndwnd(1024);
    channelConfig.setRcvwnd(1024);
    channelConfig.setMtu(1400);
    channelConfig.setFecDataShardCount(10);
    channelConfig.setFecParityShardCount(3);
    channelConfig.setAckNoDelay(true);
    channelConfig.setTimeoutMillis(10000);
    channelConfig.setUseConvChannel(true);
    channelConfig.setCrc32Check(false);
    KcpServer kcpServer = new KcpServer();
    kcpServer.init(kcpRttExampleServer,channelConfig,20003);

client:

    ChannelConfig channelConfig = new ChannelConfig();
    channelConfig.nodelay(true,30,2,true);
    channelConfig.setSndwnd(1024);
    channelConfig.setRcvwnd(1024);
    channelConfig.setMtu(1400);
    channelConfig.setAckNoDelay(true);
    channelConfig.setConv(55);

    channelConfig.setFecDataShardCount(10);
    channelConfig.setFecParityShardCount(3);
    channelConfig.setCrc32Check(false);
    //channelConfig.setTimeoutMillis(10000);
    //channelConfig.setAckMaskSize(32);
    KcpClient kcpClient = new KcpClient();

Initializing the payload with data = Unpooled.buffer(220000);
reproduces this bug.

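Can this interoperate with kcp-go?

I want to use kcp-go as the server and Java as the client, but I could not find an easy-to-follow example. Could you tell me which file I can use as a reference? Thank you.
A simple example along the lines of the code below would be very welcome.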

package main

import (
	"fmt"
	"io"
	"net"
	"github.com/pkg/errors"
	"github.com/xtaci/kcp-go"
	"time"
)

func checkerr(err error) {
	if err != nil {
		panic(err)
	}
}

func Svr(laddr string) {
	lis, err := kcp.ListenWithOptions(laddr, nil, 10, 3)
	checkerr(err)
	for {
		conn, e := lis.AcceptKCP()
		checkerr(e)
		fmt.Println(conn.RemoteAddr())
		go func(conn net.Conn) {
			var buff = make([]byte, 1024, 1024)
			for {
				n, e := conn.Read(buff)
				if e != nil {
					if e == io.EOF {
						break
					}
					fmt.Println(errors.Wrap(e, "hello?"))
					break
				}
				fmt.Println("recv from client:", buff[:n])
			}
		}(conn)
	}
}

func main() {
	go Svr(":10000")
	go Svr(":10001")
	clt, err := kcp.DialWithOptions("localhost:10001", nil, 10, 3)
	checkerr(err)
	clt.Write([]byte("hello!!!!!11111111111111111111111111"))
	clt.Write([]byte("hello!!!!!2222222222222222222222222222222"))
	for {
		time.Sleep(1 * time.Second)
	}
}

FecEncode thread-safety problem

Could it be made thread-safe?

This field is easily modified by multiple threads and cannot be isolated on its own.
//count the number of datashards collected
private int shardCount;

This causes a java.lang.ArrayIndexOutOfBoundsException: 14 error.
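
One way to picture the request is the sketch below, which guards a shard counter and its backing array with a single lock so that the index can never be advanced past the end of the array by a concurrent caller. SynchronizedShardCollector is a hypothetical stand-in written for this illustration. It is not the FecEncode class from java-kcp, and whether locking or confining each encoder to one thread is the better fit for the library is left open.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical collector of data shards; not java-kcp's FecEncode. */
class SynchronizedShardCollector {
    private final byte[][] shards;
    // Number of data shards collected so far; guarded by "this".
    private int shardCount;

    SynchronizedShardCollector(int dataShards) {
        this.shards = new byte[dataShards][];
    }

    /**
     * Stores one shard. Returns the complete group once it is full,
     * otherwise null. The index update and the array write happen under
     * the same lock, so two threads cannot race on shardCount.
     */
    synchronized List<byte[]> add(byte[] shard) {
        shards[shardCount++] = shard;
        if (shardCount < shards.length) {
            return null;
        }
        List<byte[]> group = new ArrayList<>(Arrays.asList(shards));
        shardCount = 0; // start the next group
        return group;
    }

    /** Adds shards from several threads and returns the number of full groups. */
    static int demo(int threads, int perThread, int dataShards) {
        SynchronizedShardCollector collector = new SynchronizedShardCollector(dataShards);
        AtomicInteger groups = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            pool.execute(() -> {
                for (int i = 0; i < perThread; i++) {
                    if (collector.add(new byte[] {(byte) i}) != null) {
                        groups.incrementAndGet();
                    }
                }
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return groups.get();
    }
}
```

Without the synchronized keyword, two threads could both read the same shardCount or increment it beyond the array length, which matches the kind of failure reported here.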

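Reconnection after a disconnect

I tested restarting the Java client to simulate a disconnect and reconnect, using Conv to identify the user. The time between the disconnect and the restart did not exceed the timeout. After the client restarted, the server's business layer received no data. Looking at the server code, the sn sent by the restarted client starts again from 0, but that is smaller than the previous rcvNxt, so the segments are skipped. How can this be solved? Doesn't the Wi-Fi to 4G switch you mentioned have the same problem? The code is as follows: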
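
The code referred to above is not included on this page. What follows is not that code but a simplified model of the symptom described: a receiver that only accepts sequence numbers inside its current window will ignore a restarted sender whose sn begins again at 0. ReceiveWindowModel is hypothetical, uses long arithmetic, and ignores the 32-bit wraparound handling that a real KCP implementation needs.

```java
/** Simplified model of a KCP-style receive window check; not java-kcp code. */
class ReceiveWindowModel {
    private long rcvNxt;      // next sequence number expected in order
    private final int rcvWnd; // receive window size in segments

    ReceiveWindowModel(long rcvNxt, int rcvWnd) {
        this.rcvNxt = rcvNxt;
        this.rcvWnd = rcvWnd;
    }

    /** Returns true if a data segment with this sn falls inside the window. */
    boolean accepts(long sn) {
        return sn >= rcvNxt && sn < rcvNxt + rcvWnd;
    }

    /** Advances the window if sn is exactly the next expected segment. */
    boolean deliverInOrder(long sn) {
        if (sn != rcvNxt) {
            return false;
        }
        rcvNxt++;
        return true;
    }
}
```

With rcvNxt at 500 from the previous session, accepts(0) is false, while a fresh model created with rcvNxt 0 accepts sn 0. This suggests, as an assumption rather than a confirmed fix, that the server would need fresh per-connection state for the restarted client, for example by using a new Conv or by closing the old session before the client reconnects.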

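100% CPU usage in production

After running in production for some time (the interval varies), CPU usage may reach 100%. I have not yet found what triggers it.
Below is the stack of one thread at that moment: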
"epollEventLoopGroup-5-3" #28 prio=10 os_prio=0 cpu=14498125.28ms elapsed=61602.15s tid=0x00007f88917db800 nid=0x7262 runnable [0x00007f8818dcc000]
java.lang.Thread.State: TIMED_WAITING (parking)
at jdk.internal.misc.Unsafe.park([email protected]/Native Method)
at java.util.concurrent.locks.LockSupport.parkNanos([email protected]/LockSupport.java:357)
at com.lmax.disruptor.MultiProducerSequencer.next(MultiProducerSequencer.java:136)
at com.lmax.disruptor.MultiProducerSequencer.next(MultiProducerSequencer.java:105)
at com.lmax.disruptor.RingBuffer.next(RingBuffer.java:263)
at com.ath.io.kcp.base.threadPool.thread.DisruptorSingleExecutor.execute(DisruptorSingleExecutor.java:132)
at com.ath.io.kcp.base.baseKcp.ServerChannelHandler.channelRead(ServerChannelHandler.java:87)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
at io.netty.channel.epoll.EpollDatagramChannel.read(EpollDatagramChannel.java:679)
at io.netty.channel.epoll.EpollDatagramChannel.access$100(EpollDatagramChannel.java:58)
at io.netty.channel.epoll.EpollDatagramChannel$EpollDatagramChannelUnsafe.epollInReady(EpollDatagramChannel.java:497)
at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:475)
at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:378)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.lang.Thread.run([email protected]/Thread.java:834)

fecDecode throws java.lang.IndexOutOfBoundsException

if (left > 0) {
    shard.writeBytes(zeros, left);
    zeros.resetReaderIndex();
}

java.lang.IndexOutOfBoundsException: writerIndex(283) + minWritableBytes(1111) exceeds maxCapacity(283): PooledSlicedByteBuf(ridx: 0, widx: 283, cap: 283/283, unwrapped: PooledUnsafeDirectByteBuf(ridx: 289, widx: 289, cap: 289))
at io.netty.buffer.AbstractByteBuf.ensureWritable0(AbstractByteBuf.java:296)
at io.netty.buffer.AbstractByteBuf.ensureWritable(AbstractByteBuf.java:282)
at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1105)
at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1098)

Implementing broadcast

@ServerEndpoint("/websocket/TimeTrialWebSocket/{id}")
public class TimeTrialWebSocket {
    private static int onlineCount = 0;
    private static ConcurrentHashMap<String, TimeTrialWebSocket> webSocketSet = new ConcurrentHashMap<String, TimeTrialWebSocket>();
    private Session WebSocketsession;
    private String id = "";
    private JSONObject sendMsg = new JSONObject(); 

    @OnOpen
    public void onOpen(@PathParam(value = "id") String param, Session WebSocketsession, EndpointConfig config) {
        System.out.println(param);
        id = param;
        this.WebSocketsession = WebSocketsession;
        webSocketSet.put(param, this);
        addOnlineCount();           
        System.out.println("A new connection joined! Current online count: " + getOnlineCount());
    }
 
    @OnClose
    public void onClose() {
        if (!id.equals("")) {
            webSocketSet.remove(id);  
            subOnlineCount();         
            System.out.println("A connection closed! Current online count: " + getOnlineCount());
        }
    }
 
    @OnMessage
    public void onMessage(String message, Session session) {
        sendToUser(message);
        //sendAll(message) ;
    }

    public void sendToUser(String message) {
    	sendMsg = JSONObject.parseObject(message);
    	String cid = sendMsg.getString("cid");
    	
        //String now = getNowTime();
    	for (String key : webSocketSet.keySet()) {
          try {
        	  if (key.split(",")[1].equals(cid) && !key.equals(id)){
        		  webSocketSet.get(key).sendMessage(message);
        	  }              
          } catch (IOException e) {
              e.printStackTrace();
          }
       }     
    }

    @OnError
    public void onError(Session session, Throwable error) {
        System.out.println("An error occurred");
        error.printStackTrace();
    }

    public void sendMessage(String message) throws IOException {
        this.WebSocketsession.getBasicRemote().sendText(message);
        //this.session.getAsyncRemote().sendText(message);
    }
    public static synchronized int getOnlineCount() {
        return onlineCount;
    }
    public static synchronized void addOnlineCount() {
    	TimeTrialWebSocket.onlineCount++;
    }
    public static synchronized void subOnlineCount() {
    	TimeTrialWebSocket.onlineCount--;
    }
}

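Can KCP implement broadcast? I tried imitating the WebSocket approach above and it failed. It seems KCP does not provide this kind of mechanism (for example, there is nothing like private Session WebSocketsession;).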
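
KCP is a point-to-point protocol, so broadcast would normally be built at the application level by keeping track of every connection and writing to each one. The sketch below shows that idea in a library-independent form. BroadcastRegistry is hypothetical, and S stands for whatever object represents one connection; with java-kcp that might be the Ukcp instance passed to onConnected, which is an assumption based on the listener callbacks shown in other issues on this page.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

/** Hypothetical application-level registry of connections, keyed by an id. */
class BroadcastRegistry<S> {
    private final Map<String, S> sessions = new ConcurrentHashMap<>();

    /** Call when a connection is established. */
    void register(String id, S session) {
        sessions.put(id, session);
    }

    /** Call when a connection is closed. */
    void unregister(String id) {
        sessions.remove(id);
    }

    /**
     * Applies the send action to every registered session except the sender.
     * Returns the number of sessions the action was applied to.
     */
    int broadcast(String senderId, Consumer<S> send) {
        int recipients = 0;
        for (Map.Entry<String, S> entry : sessions.entrySet()) {
            if (!entry.getKey().equals(senderId)) {
                send.accept(entry.getValue());
                recipients++;
            }
        }
        return recipients;
    }
}
```

In a java-kcp listener, register could be called from onConnected and unregister from handleClose, with the send action calling ukcp.write on each session. Buffer reference counting for a message shared across several writes is not shown and would need care.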

Question about the two input(data, true,current) calls when fecDecode is used in Ukcp's input method

            FecPacket fecPacket = FecPacket.newFecPacket(data);
            if (fecPacket.getFlag() == Fec.typeData) {
                data.skipBytes(2);
                input(data, true,current);
            }
            if (fecPacket.getFlag() == Fec.typeData || fecPacket.getFlag() == Fec.typeParity) {
                List<ByteBuf> byteBufs = fecDecode.decode(fecPacket);
                if (byteBufs != null) {
                    ByteBuf byteBuf;
                    for (int i = 0; i < byteBufs.size(); i++) {
                        byteBuf = byteBufs.get(i);
                        input(byteBuf, false,current);
                        byteBuf.release();
                    }
                }

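When fecPacket.getFlag() == Fec.typeData, input(data, true,current) is called first. What is the purpose of calling input again after decoding? I have not been able to work this out and hope the author can explain. Thanks.

I looked at it again this afternoon.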

                data.skipBytes(2);
                input(data, true,current);

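Here the original data, with no forward error correction applied, is passed straight on for parsing.
After that, the packets produced by forward error correction are parsed as well.
What I don't understand is why forward error correction isn't applied first and the result passed on afterwards. Alternatively, if the first input parses normally, could the forward error correction step be skipped? I hope the author can explain. Thanks.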
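
One plausible reading, offered here as an assumption and not as the author's answer, is that a data shard already carries the original payload and can be delivered immediately, while the decoder only returns shards that were lost and have just been reconstructed. The toy model below illustrates that split using a single XOR parity shard, which is a much simpler scheme than the Reed-Solomon coding the stack traces on this page point to. XorFecGroup and its methods are hypothetical, and all shards are assumed to have equal length.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy FEC group with one parity shard (XOR of all data shards). */
class XorFecGroup {
    private final byte[][] data;
    private byte[] parity;
    private int received;

    XorFecGroup(int dataShards) {
        this.data = new byte[dataShards][];
    }

    /** Computes the parity shard for equal-length data shards. */
    static byte[] parityOf(byte[]... shards) {
        byte[] p = new byte[shards[0].length];
        for (byte[] shard : shards) {
            for (int j = 0; j < p.length; j++) {
                p[j] ^= shard[j];
            }
        }
        return p;
    }

    /**
     * A data shard arrived. The caller can hand it to the protocol layer
     * right away; the returned list holds only newly recovered shards.
     */
    List<byte[]> onData(int index, byte[] shard) {
        if (data[index] == null) {
            data[index] = shard;
            received++;
        }
        return tryRecover();
    }

    /** A parity shard arrived; it carries no payload of its own. */
    List<byte[]> onParity(byte[] shard) {
        parity = shard;
        return tryRecover();
    }

    private List<byte[]> tryRecover() {
        List<byte[]> recovered = new ArrayList<>();
        // One XOR parity shard can rebuild exactly one missing data shard.
        if (parity == null || received != data.length - 1) {
            return recovered;
        }
        byte[] missing = parity.clone();
        int missingIndex = -1;
        for (int i = 0; i < data.length; i++) {
            if (data[i] == null) {
                missingIndex = i;
                continue;
            }
            for (int j = 0; j < missing.length; j++) {
                missing[j] ^= data[i][j];
            }
        }
        data[missingIndex] = missing;
        received++;
        recovered.add(missing);
        return recovered;
    }
}
```

In this model, waiting for decoding before delivering anything would add latency to every packet, and skipping decoding whenever a data shard parses normally would mean lost shards are never rebuilt. Each shard is delivered at most once: either directly on arrival or from the recovered list.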

What causes the "Conv inconsistency" error after a connection is established?

Incoming connection disruptorExecutorPool1/10.0.0.7:55938
java.io.IOException: Conv inconsistency
at kcp.Ukcp.input(Ukcp.java:163)
at kcp.Ukcp.input(Ukcp.java:149)
at kcp.RecieveTask.execute(RecieveTask.java:62)
at threadPool.thread.DistriptorHandler.execute(DistriptorHandler.java:17)
at threadPool.thread.DistriptorEventHandler.onEvent(DistriptorEventHandler.java:10)
at threadPool.thread.DistriptorEventHandler.onEvent(DistriptorEventHandler.java:5)
at com.lmax.disruptor.BatchEventProcessor.processEvents(BatchEventProcessor.java:168)
at com.lmax.disruptor.BatchEventProcessor.run(BatchEventProcessor.java:125)
at java.lang.Thread.run(Thread.java:748)

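iMessageExecutorPool: about the threading model

Tested on a single PC, running two KcpRttExampleClient instances connected to KcpRttExampleServer.
ScheduleTask, ReadTask and WriteTask all actually run on one thread and execute serially. What is the reason for this design?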
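
A common motivation for this kind of model, stated here as general background and not as the author's rationale, is thread confinement: if every task for a connection runs on the same thread, the connection's state can be plain fields with no locks. The sketch below shows the pattern with a JDK single-thread executor. SerializedConnection and its methods are hypothetical and only loosely mirror the read and write tasks named above.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Hypothetical connection whose state is touched by one executor thread only. */
class SerializedConnection {
    private final ExecutorService executor = Executors.newSingleThreadExecutor();
    // Plain fields: safe without locks because only the executor thread uses them.
    private int pending;
    private int processed;

    /** Models a read task: one more message is waiting. */
    void submitRead() {
        executor.execute(() -> pending++);
    }

    /** Models a write task: consumes one waiting message if there is one. */
    void submitWrite() {
        executor.execute(() -> {
            if (pending > 0) {
                pending--;
                processed++;
            }
        });
    }

    /** Waits for queued tasks to finish and returns the processed count. */
    int closeAndCountProcessed() {
        executor.shutdown();
        try {
            executor.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return processed;
    }

    /** Submits read/write pairs from several producer threads. */
    static int demo(int producers, int messagesEach) {
        SerializedConnection conn = new SerializedConnection();
        Thread[] threads = new Thread[producers];
        for (int p = 0; p < producers; p++) {
            threads[p] = new Thread(() -> {
                for (int i = 0; i < messagesEach; i++) {
                    conn.submitRead();
                    conn.submitWrite();
                }
            });
            threads[p].start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return conn.closeAndCountProcessed();
    }
}
```

Tasks may be submitted from any thread, but they execute one at a time in queue order, so the unsynchronized counters stay consistent. The trade-off is that one slow task delays every other task for that connection.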
