l42111996 / java-kcp
A reliable UDP networking library (KCP algorithm) implemented in Java on top of Netty. It includes an FEC implementation and can be used for games, video, acceleration, and similar workloads.
License: Apache License 2.0
For example, compare with kcptun and dragonite-java.
Please disable FEC before running performance tests :)
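1. If the project targets JDK 8, AtomicInteger or AtomicLong can be replaced with LongAdder, which performs better under high concurrency.
2. While reading some of the source I noticed that TimerThreadFacotry is misspelled (IDEA flags it too); it could be corrected.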
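To illustrate the first suggestion, here is a minimal sketch of a contended counter built on java.util.concurrent.atomic.LongAdder. The class and method names are hypothetical and are not taken from java-kcp; the sketch only shows the general pattern of many writer threads calling increment() and a single reader calling sum() afterwards.

```java
import java.util.concurrent.atomic.LongAdder;

public class LongAdderCounterSketch {

    // Hypothetical helper: several threads bump one shared counter.
    public static long countConcurrently(int threads, int incrementsPerThread) {
        LongAdder counter = new LongAdder();
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                for (int i = 0; i < incrementsPerThread; i++) {
                    // Updates are spread over internal cells, which reduces contention.
                    counter.increment();
                }
            });
            workers[t].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting", e);
            }
        }
        // sum() is exact here because all writers have finished.
        return counter.sum();
    }

    public static void main(String[] args) {
        System.out.println(countConcurrently(8, 100_000)); // prints 800000
    }
}
```

Note that sum() is not an atomic snapshot while writers are still running, so LongAdder fits statistics-style counters better than values that drive control flow (sequence numbers, for example).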
Are the KCP parameter settings used in LockStepSynchronizationServer/client considered best practice?
Are the same settings used in production projects?
thanks!
The details of the KCP header fields can be seen clearly.
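For readers who want to inspect those fields in code, the sketch below parses a KCP segment header. It assumes the standard 24-byte little-endian layout of the reference KCP implementation (conv, cmd, frg, wnd, ts, sn, una, len). The class name is hypothetical and this is not java-kcp's own parser; if FEC or CRC32 checking is enabled, extra bytes precede the KCP header and must be skipped first.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

public class KcpHeaderSketch {
    // Assumed size of the standard KCP segment header.
    public static final int HEADER_SIZE = 24;

    public final long conv; // conversation id (uint32)
    public final int cmd;   // command, e.g. 81 = push, 82 = ack (uint8)
    public final int frg;   // fragment count remaining (uint8)
    public final int wnd;   // receive window size (uint16)
    public final long ts;   // timestamp (uint32)
    public final long sn;   // sequence number (uint32)
    public final long una;  // next unacknowledged sequence number (uint32)
    public final long len;  // payload length (uint32)

    private KcpHeaderSketch(long conv, int cmd, int frg, int wnd,
                            long ts, long sn, long una, long len) {
        this.conv = conv; this.cmd = cmd; this.frg = frg; this.wnd = wnd;
        this.ts = ts; this.sn = sn; this.una = una; this.len = len;
    }

    public static KcpHeaderSketch parse(byte[] datagram) {
        if (datagram.length < HEADER_SIZE) {
            throw new IllegalArgumentException("need at least " + HEADER_SIZE + " bytes");
        }
        ByteBuffer buf = ByteBuffer.wrap(datagram).order(ByteOrder.LITTLE_ENDIAN);
        long conv = buf.getInt() & 0xFFFFFFFFL; // mask to read as unsigned
        int cmd = buf.get() & 0xFF;
        int frg = buf.get() & 0xFF;
        int wnd = buf.getShort() & 0xFFFF;
        long ts = buf.getInt() & 0xFFFFFFFFL;
        long sn = buf.getInt() & 0xFFFFFFFFL;
        long una = buf.getInt() & 0xFFFFFFFFL;
        long len = buf.getInt() & 0xFFFFFFFFL;
        return new KcpHeaderSketch(conv, cmd, frg, wnd, ts, sn, una, len);
    }

    @Override
    public String toString() {
        return "conv=" + conv + " cmd=" + cmd + " frg=" + frg + " wnd=" + wnd
                + " ts=" + ts + " sn=" + sn + " una=" + una + " len=" + len;
    }
}
```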
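ByteBuf byteBuf = queue.poll(); returns null, and the send task stops.
Why does it return null? Because sending is fast and enqueueing is slow.
C# has DotNetty; it would be great to have a C# version.
Integrating on Android requires Java 8 support and a dependency on 'com.lmax:disruptor:3.4.2', but packaging fails with the error "MethodHandle.invoke and MethodHandle.invokeExact are only supported starting with Android O (--min-api 26)".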
Tested with two PCs. One runs KcpRttExampleServer:
KcpRttExampleServer kcpRttExampleServer = new KcpRttExampleServer();
ChannelConfig channelConfig = new ChannelConfig();
channelConfig.nodelay(true,30,2,true);
channelConfig.setSndwnd(1024);
channelConfig.setRcvwnd(1024);
channelConfig.setMtu(1400);
channelConfig.setFecDataShardCount(10);
channelConfig.setFecParityShardCount(3);
channelConfig.setAckNoDelay(true);
channelConfig.setTimeoutMillis(5000);
channelConfig.setUseConvChannel(true);
channelConfig.setCrc32Check(false);
KcpServer kcpServer = new KcpServer();
kcpServer.init(kcpRttExampleServer,channelConfig,20003);
The other runs KcpRttExampleClient:
public KcpRttExampleClient() {
    data = Unpooled.buffer(220000);
    for (int i = 0; i < data.capacity(); i++) {
        data.writeByte((byte) i);
    }
    rtts = new int[30000];
    for (int i = 0; i < rtts.length; i++) {
        rtts[i] = -1;
    }
    startTime = System.currentTimeMillis();
    scheduleSrv = new ScheduledThreadPoolExecutor(1);
}

public static void main(String[] args) {
    ChannelConfig channelConfig = new ChannelConfig();
    channelConfig.nodelay(true,30,2,true);
    channelConfig.setSndwnd(1024);
    channelConfig.setRcvwnd(1024);
    channelConfig.setMtu(1400);
    channelConfig.setAckNoDelay(true);
    channelConfig.setConv(55);
    channelConfig.setFecDataShardCount(10);
    channelConfig.setFecParityShardCount(3);
    channelConfig.setCrc32Check(false);
    KcpClient kcpClient = new KcpClient();
    kcpClient.init(channelConfig);
    KcpRttExampleClient kcpClientRttExample = new KcpRttExampleClient();
    kcpClient.connect(new InetSocketAddress("192.168.2.180",20003),channelConfig,kcpClientRttExample);
}

@Override
public void onConnected(Ukcp ukcp) {
    future = scheduleSrv.scheduleWithFixedDelay(() -> {
        ByteBuf byteBuf = rttMsg(++count);
        ukcp.write(byteBuf);
        byteBuf.release();
        if (count >= rtts.length) {
            // finish
            future.cancel(true);
            byteBuf = rttMsg(-1);
            ukcp.write(byteBuf);
            byteBuf.release();
        }
    }, 20, 20, TimeUnit.MILLISECONDS);
}

@Override
public void handleReceive(ByteBuf byteBuf, Ukcp ukcp) {
    int curCount = byteBuf.readShort();
    if (curCount == -1) {
        scheduleSrv.schedule(new Runnable() {
            @Override
            public void run() {
                int sum = 0;
                for (int rtt : rtts) {
                    sum += rtt;
                }
                System.out.println("average: "+ (sum / rtts.length));
                System.out.println(Snmp.snmp.toString());
                ukcp.close();
                //ukcp.setTimeoutMillis(System.currentTimeMillis());
                System.exit(0);
            }
        }, 3, TimeUnit.SECONDS);
    } else {
        int idx = curCount - 1;
        long time = byteBuf.readInt();
        if (rtts[idx] != -1) {
            System.out.println("???");
        }
        //log.info("rcv count {} {}", curCount, System.currentTimeMillis());
        rtts[idx] = (int) (System.currentTimeMillis() - startTime - time);
        System.out.println("rtt : "+ curCount+" "+ rtts[idx]);
    }
}

@Override
public void handleException(Throwable ex, Ukcp kcp)
{
    ex.printStackTrace();
}

@Override
public void handleClose(Ukcp kcp) {
    scheduleSrv.shutdown();
    try {
        scheduleSrv.awaitTermination(3, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    int sum = 0;
    int max = 0;
    for (int rtt : rtts) {
        if(rtt>max){
            max = rtt;
        }
        sum += rtt;
    }
    System.out.println("average: "+ (sum / rtts.length)+" max:"+max);
    System.out.println(Snmp.snmp.toString());
    System.out.println("lost percent: "+(Snmp.snmp.RetransSegs.doubleValue()/Snmp.snmp.OutPkts.doubleValue()));
}

/**
 * count+timestamp+dataLen+data
 *
 * @param count
 * @return
 */
public ByteBuf rttMsg(int count) {
    ByteBuf buf = Unpooled.buffer(220000);
    buf.writeShort(count);
    buf.writeInt((int) (System.currentTimeMillis() - startTime));
    //int dataLen = new Random().nextInt(200);
    //buf.writeBytes(new byte[dataLen]);
    int dataLen = data.readableBytes();
    buf.writeShort(dataLen);
    buf.writeBytes(data, data.readerIndex(), dataLen);
    return buf;
}
KcpRttExampleServer reports an error:
ByteBuf length: 1394, length read: 26846
[104] [-34] [56] [0] [0] [0] [-127] [75] [0] [4] [20] ...
Snmp{BytesSent=0, BytesReceived=0, MaxConn=0, ActiveOpens=0, PassiveOpens=0, CurrEstab=0, InErrs=0, InCsumErrors=0, KCPInErrors=0, 收到包=12071, 发送包=15022, InSegs=9058, OutSegs=11557, 收到字节=7091032, 发送字节=14605712, 总共重发数=6269, 快速重发数=0, 空闲快速重发数=0, 超时重发数=6269, 收到重复包数量=1687, fec恢复数=16, fec恢复错误数=1, 收到fecData数=9050, 收到fecParity数=3021, fec缓存冗余淘汰data包数=2085, fec收到重复的数据包=0}
java.lang.IndexOutOfBoundsException: PooledUnsafeDirectByteBuf(ridx: 2, widx: 1394, cap: 1394).slice(2, 26846)
at io.netty.buffer.AbstractUnpooledSlicedByteBuf.checkSliceOutOfBounds(AbstractUnpooledSlicedByteBuf.java:474)
at io.netty.buffer.AbstractUnpooledSlicedByteBuf.<init>(AbstractUnpooledSlicedByteBuf.java:38)
at io.netty.buffer.UnpooledSlicedByteBuf.<init>(UnpooledSlicedByteBuf.java:24)
at io.netty.buffer.AbstractByteBuf.slice(AbstractByteBuf.java:1223)
at com.backblaze.erasure.fec.FecDecode.decode(FecDecode.java:211)
at kcp.Ukcp.input(Ukcp.java:135)
at kcp.ReadTask.execute(ReadTask.java:60)
at threadPool.netty.NettyMessageExecutor.lambda$execute$0(NettyMessageExecutor.java:32)
at io.netty.channel.DefaultEventLoop.run(DefaultEventLoop.java:54)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at java.lang.Thread.run(Thread.java:748)
Server:
ChannelConfig channelConfig = new ChannelConfig();
channelConfig.nodelay(true,30,2,true);
channelConfig.setSndwnd(1024);
channelConfig.setRcvwnd(1024);
channelConfig.setMtu(1400);
channelConfig.setFecDataShardCount(10);
channelConfig.setFecParityShardCount(3);
channelConfig.setAckNoDelay(true);
channelConfig.setTimeoutMillis(10000);
channelConfig.setUseConvChannel(true);
channelConfig.setCrc32Check(false);
KcpServer kcpServer = new KcpServer();
kcpServer.init(kcpRttExampleServer,channelConfig,20003);
Client:
ChannelConfig channelConfig = new ChannelConfig();
channelConfig.nodelay(true,30,2,true);
channelConfig.setSndwnd(1024);
channelConfig.setRcvwnd(1024);
channelConfig.setMtu(1400);
channelConfig.setAckNoDelay(true);
channelConfig.setConv(55);
channelConfig.setFecDataShardCount(10);
channelConfig.setFecParityShardCount(3);
channelConfig.setCrc32Check(false);
//channelConfig.setTimeoutMillis(10000);
//channelConfig.setAckMaskSize(32);
KcpClient kcpClient = new KcpClient();
Initializing the data packet with data = Unpooled.buffer(220000);
reproduces this bug.
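In my tests, if the server restarts, messages sent by the existing client no longer get through. Is there a way to handle this?
I would like to use kcp-go as the server and Java as the client, but I could not find an easy-to-follow example. Could you tell me which file I can use as a reference? Many thanks.
A simple example along the lines of the code below would be ideal. Thank you very much.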
package main

import (
	"fmt"
	"io"
	"net"
	"github.com/pkg/errors"
	"github.com/xtaci/kcp-go"
	"time"
)

func checkerr(err error) {
	if err != nil {
		panic(err)
	}
}

func Svr(laddr string) {
	lis, err := kcp.ListenWithOptions(laddr, nil, 10, 3)
	checkerr(err)
	for {
		conn, e := lis.AcceptKCP()
		checkerr(e)
		fmt.Println(conn.RemoteAddr())
		go func(conn net.Conn) {
			var buff = make([]byte, 1024, 1024)
			for {
				n, e := conn.Read(buff)
				if e != nil {
					if e == io.EOF {
						break
					}
					fmt.Println(errors.Wrap(e, "hello?"))
					break
				}
				fmt.Println("recv from client:", buff[:n])
			}
		}(conn)
	}
}

func main() {
	go Svr(":10000")
	go Svr(":10001")
	clt, err := kcp.DialWithOptions("localhost:10001", nil, 10, 3)
	checkerr(err)
	clt.Write([]byte("hello!!!!!11111111111111111111111111"))
	clt.Write([]byte("hello!!!!!2222222222222222222222222222222"))
	for {
		time.Sleep(1 * time.Second)
	}
}
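As in the title.
Has throughput been compared against TCP?
Could this be made thread-safe?
The variable is easily modified under multiple threads and cannot be isolated on its own.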
//count the number of datashards collected
private int shardCount;
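This causes a java.lang.ArrayIndexOutOfBoundsException: 14 error.
After running in production for some time (the interval varies), CPU usage can hit 100%. So far I have not found what triggers it.
Below is the stack of one thread at the time: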
"epollEventLoopGroup-5-3" #28 prio=10 os_prio=0 cpu=14498125.28ms elapsed=61602.15s tid=0x00007f88917db800 nid=0x7262 runnable [0x00007f8818dcc000]
java.lang.Thread.State: TIMED_WAITING (parking)
at jdk.internal.misc.Unsafe.park([email protected]/Native Method)
at java.util.concurrent.locks.LockSupport.parkNanos([email protected]/LockSupport.java:357)
at com.lmax.disruptor.MultiProducerSequencer.next(MultiProducerSequencer.java:136)
at com.lmax.disruptor.MultiProducerSequencer.next(MultiProducerSequencer.java:105)
at com.lmax.disruptor.RingBuffer.next(RingBuffer.java:263)
at com.ath.io.kcp.base.threadPool.thread.DisruptorSingleExecutor.execute(DisruptorSingleExecutor.java:132)
at com.ath.io.kcp.base.baseKcp.ServerChannelHandler.channelRead(ServerChannelHandler.java:87)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
at io.netty.channel.epoll.EpollDatagramChannel.read(EpollDatagramChannel.java:679)
at io.netty.channel.epoll.EpollDatagramChannel.access$100(EpollDatagramChannel.java:58)
at io.netty.channel.epoll.EpollDatagramChannel$EpollDatagramChannelUnsafe.epollInReady(EpollDatagramChannel.java:497)
at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:475)
at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:378)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.lang.Thread.run([email protected]/Thread.java:834)
if (left > 0) {
    shard.writeBytes(zeros, left);
    zeros.resetReaderIndex();
}
java.lang.IndexOutOfBoundsException: writerIndex(283) + minWritableBytes(1111) exceeds maxCapacity(283): PooledSlicedByteBuf(ridx: 0, widx: 283, cap: 283/283, unwrapped: PooledUnsafeDirectByteBuf(ridx: 289, widx: 289, cap: 289))
at io.netty.buffer.AbstractByteBuf.ensureWritable0(AbstractByteBuf.java:296)
at io.netty.buffer.AbstractByteBuf.ensureWritable(AbstractByteBuf.java:282)
at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1105)
at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1098)
As in the title.
@ServerEndpoint("/websocket/TimeTrialWebSocket/{id}")
public class TimeTrialWebSocket {
    private static int onlineCount = 0;
    private static ConcurrentHashMap<String, TimeTrialWebSocket> webSocketSet = new ConcurrentHashMap<String, TimeTrialWebSocket>();
    private Session WebSocketsession;
    private String id = "";
    private JSONObject sendMsg = new JSONObject();

    @OnOpen
    public void onOpen(@PathParam(value = "id") String param, Session WebSocketsession, EndpointConfig config) {
        System.out.println(param);
        id = param;
        this.WebSocketsession = WebSocketsession;
        webSocketSet.put(param, this);
        addOnlineCount();
        System.out.println("New connection joined! Current online count: " + getOnlineCount());
    }

    @OnClose
    public void onClose() {
        if (!id.equals("")) {
            webSocketSet.remove(id);
            subOnlineCount();
            System.out.println("A connection closed! Current online count: " + getOnlineCount());
        }
    }

    @OnMessage
    public void onMessage(String message, Session session) {
        sendToUser(message);
        //sendAll(message) ;
    }

    public void sendToUser(String message) {
        sendMsg = JSONObject.parseObject(message);
        String cid = sendMsg.getString("cid");
        //String now = getNowTime();
        for (String key : webSocketSet.keySet()) {
            try {
                if (key.split(",")[1].equals(cid) && !key.equals(id)){
                    webSocketSet.get(key).sendMessage(message);
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    @OnError
    public void onError(Session session, Throwable error) {
        System.out.println("An error occurred");
        error.printStackTrace();
    }

    public void sendMessage(String message) throws IOException {
        this.WebSocketsession.getBasicRemote().sendText(message);
        //this.session.getAsyncRemote().sendText(message);
    }

    public static synchronized int getOnlineCount() {
        return onlineCount;
    }

    public static synchronized void addOnlineCount() {
        TimeTrialWebSocket.onlineCount++;
    }

    public static synchronized void subOnlineCount() {
        TimeTrialWebSocket.onlineCount--;
    }
}
Can KCP support broadcasting? I tried to imitate the WebSocket code above, but it failed; it seems KCP does not provide this directly.
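KCP itself is a point-to-point protocol, so broadcasting is usually built at the application layer, much like the WebSocket code above: keep a registry of live connections and write the message to each one. The sketch below shows that pattern with plain JDK types. The Connection interface is a hypothetical stand-in for a per-client handle such as Ukcp, and the class is not part of java-kcp.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class BroadcastSketch {

    /** Hypothetical stand-in for a per-client connection handle (e.g. a Ukcp). */
    public interface Connection {
        void write(byte[] payload);
    }

    // Registry of live connections keyed by an application-chosen id.
    private final Map<String, Connection> connections = new ConcurrentHashMap<>();

    /** Call when a client connects (for example from an onConnected callback). */
    public void register(String id, Connection connection) {
        connections.put(id, connection);
    }

    /** Call when a client disconnects (for example from a handleClose callback). */
    public void unregister(String id) {
        connections.remove(id);
    }

    /**
     * Writes the payload to every registered connection except the sender.
     * Returns how many connections the payload was written to.
     */
    public int broadcast(String senderId, byte[] payload) {
        int sent = 0;
        for (Map.Entry<String, Connection> entry : connections.entrySet()) {
            if (!entry.getKey().equals(senderId)) {
                entry.getValue().write(payload);
                sent++;
            }
        }
        return sent;
    }

    public static void main(String[] args) {
        BroadcastSketch hub = new BroadcastSketch();
        hub.register("a", p -> System.out.println("a got " + p.length + " bytes"));
        hub.register("b", p -> System.out.println("b got " + p.length + " bytes"));
        System.out.println("sent to " + hub.broadcast("a", new byte[]{1, 2, 3}));
    }
}
```

With java-kcp, register and unregister would presumably be driven by the listener callbacks shown elsewhere on this page (onConnected, handleClose), and each write would go through ukcp.write(byteBuf); buffer ownership and release rules for a shared ByteBuf would need checking against the library.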
FecPacket fecPacket = FecPacket.newFecPacket(data);
if (fecPacket.getFlag() == Fec.typeData) {
    data.skipBytes(2);
    input(data, true,current);
}
if (fecPacket.getFlag() == Fec.typeData || fecPacket.getFlag() == Fec.typeParity) {
    List<ByteBuf> byteBufs = fecDecode.decode(fecPacket);
    if (byteBufs != null) {
        ByteBuf byteBuf;
        for (int i = 0; i < byteBufs.size(); i++) {
            byteBuf = byteBufs.get(i);
            input(byteBuf, false,current);
            byteBuf.release();
        }
    }
}
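When fecPacket.getFlag() == Fec.typeData, input(data, true,current) is called first. What is the purpose of calling input again after decoding? I have not fully understood this and hope the author can explain. Thanks.
I took another look this afternoon.
data.skipBytes(2);
input(data, true,current);
Here the original payload, with no forward error correction applied, is passed straight on for parsing.
The later part takes the packets produced by forward error correction and parses them again.
What I don't understand is why error correction isn't done first and the result passed on afterwards, or alternatively why FEC can't be skipped whenever the first input call parses successfully. I hope the author can clarify. Thanks.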
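One plausible reading, assuming java-kcp mirrors kcp-go here, is that a data shard already carries an intact payload and is delivered at once to avoid waiting for the rest of the group, while decode only returns shards that were lost and then rebuilt from parity. Every data shard is still fed to the decoder because it may be needed later to rebuild a missing sibling. The sketch below illustrates this two-path flow with a single XOR parity shard, a much simpler code than the Reed-Solomon scheme the library uses; all names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

public class FecFastPathSketch {
    private final byte[][] shards;   // data shards seen so far, null = missing
    private byte[] parity;           // XOR of all data shards in the group
    private boolean recovered;       // at most one shard is recoverable with XOR
    private final List<byte[]> delivered = new ArrayList<>();

    public FecFastPathSketch(int dataShards) {
        this.shards = new byte[dataShards][];
    }

    /** Sender side: parity is the XOR of equally sized data shards. */
    public static byte[] xorParity(byte[][] data) {
        byte[] p = new byte[data[0].length];
        for (byte[] shard : data) {
            for (int i = 0; i < p.length; i++) {
                p[i] ^= shard[i];
            }
        }
        return p;
    }

    /** Fast path: deliver the payload immediately, then remember it for recovery. */
    public void onDataShard(int index, byte[] payload) {
        delivered.add(payload);
        shards[index] = payload;
        tryRecover();
    }

    /** Parity carries no payload of its own; it can only help rebuild a lost shard. */
    public void onParityShard(byte[] p) {
        parity = p;
        tryRecover();
    }

    // Slow path: runs only when exactly one data shard is missing and parity is known.
    private void tryRecover() {
        if (parity == null || recovered) {
            return;
        }
        int missing = -1;
        for (int i = 0; i < shards.length; i++) {
            if (shards[i] == null) {
                if (missing != -1) {
                    return; // more than one shard missing: XOR cannot recover
                }
                missing = i;
            }
        }
        if (missing == -1) {
            return; // nothing was lost, so there is nothing to rebuild
        }
        byte[] rebuilt = parity.clone();
        for (byte[] shard : shards) {
            if (shard != null) {
                for (int i = 0; i < rebuilt.length; i++) {
                    rebuilt[i] ^= shard[i];
                }
            }
        }
        shards[missing] = rebuilt;
        recovered = true;
        delivered.add(rebuilt); // only the rebuilt shard is delivered here
    }

    public List<byte[]> delivered() {
        return delivered;
    }
}
```

If the lost shard shows up late after being rebuilt, this sketch would deliver it twice; in a KCP stack that is harmless because segments are de-duplicated by sequence number.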
有连接进来disruptorExecutorPool1/10.0.0.7:55938
java.io.IOException: Conv inconsistency
at kcp.Ukcp.input(Ukcp.java:163)
at kcp.Ukcp.input(Ukcp.java:149)
at kcp.RecieveTask.execute(RecieveTask.java:62)
at threadPool.thread.DistriptorHandler.execute(DistriptorHandler.java:17)
at threadPool.thread.DistriptorEventHandler.onEvent(DistriptorEventHandler.java:10)
at threadPool.thread.DistriptorEventHandler.onEvent(DistriptorEventHandler.java:5)
at com.lmax.disruptor.BatchEventProcessor.processEvents(BatchEventProcessor.java:168)
at com.lmax.disruptor.BatchEventProcessor.run(BatchEventProcessor.java:125)
at java.lang.Thread.run(Thread.java:748)
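Should this be changed to long?
Also, with clustered multi-process communication, the id is set by the client and can easily collide. Would switching to a UUID be a good idea, or is there a better suggestion?
Tested on a single PC, running two KcpRttExampleClient instances connected to KcpRttExampleServer.
ScheduleTask, ReadTask, and WriteTask all actually run on a single thread and operate serially. What is the reason for this design?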