Code Monkey home page Code Monkey logo

Comments (220)

lesismal avatar lesismal commented on May 18, 2024 1

async read/write都是可选的,对时序有要求可以采用同步模式. 它们都通过了autobahn的测试,相比同步模式多了一些not strict.
事实上,我们跑benchmark才会出现单连接高并发的情况,实际业务锁竞争不会特别激烈.

这两条的前提都是基于运气好。业务量再小,只要时间足够,也会遇到问题——墨菲定律了解一下。

如果基础设施的代码基于运气,谁敢用啊兄弟,:joy:

from gws.

lesismal avatar lesismal commented on May 18, 2024 1

字节的主要是自己的简单链表+pool略有优化,用的for循环,你的是数组+递归,本质上你俩这个是相同的。

ants写的太复杂,我没深入读过它代码,简单扫了下应该是用的条件变量,类似c/cpp那套。因为写的太复杂,而且我看issue里有人遇到一些莫名其妙的bug,而且较高版本go里我benchmark了下它更慢,所以就更没打算看它源码了。

我那个协程池为了减少一个协程池只有单个数组或者链表时锁操作的竞争浪费,保留了一个常驻协程用chan做队列,当前并发度没有达到协程数量限制时则只要一个原子操作,比无竞争锁需要加锁解锁省掉一些原子操作,比竞争时try lock部分自旋节省更多,以一个常驻协程代价换更均衡的。对于单个conn做了上面说的那个时序保证、但这是conn自己的部分、并不是协程池本身的功能。我这里单独实现了一个有序化的,相当于把conn上那个拆出来了,可以配合任何协程池使用:
https://github.com/lesismal/serial

from gws.

lesismal avatar lesismal commented on May 18, 2024 1

回到问题本身,不管是否优化协程占用,应该严格保证发送顺序、而不是基于运气默认认为不会出现不一致。

from gws.

lesismal avatar lesismal commented on May 18, 2024 1

一个常驻协程无所谓, 连接上面的就可观了

我说的一个常驻协程是一个协程池只有一个常驻,一个nbio Engine上的所有conn共用同一个或几个这种协程池,并不是像gws这种一个conn上一个workerpool,所以不存在协程数量可观的问题。
一个conn上只是一个数组作为执行队列、不绑定协程池,而是这个conn解析出一个完整message时,把任务加入到这个conn的执行队列:
https://github.com/lesismal/nbio/blob/master/conn.go#L103
如果这个message的任务就是队首,则用协程池去异步执行并且循环取任务直到执行完所有,这里的c.p.g.Execute是协程池的pool.Go:
https://github.com/lesismal/nbio/blob/master/conn.go#L115
这样就能保证并发入队列的时候单个conn多个任务也只会启动并临时占用一个协程,执行完当前任务列表就退出,并且保证了顺序加入队列的任务的有序执行

from gws.

lesismal avatar lesismal commented on May 18, 2024 1

这里的85-88行取出所有后解锁了,下面循环发送的过程中,别的地方可能又WriteAsync并触发了新的job异步执行:
https://github.com/lxzan/gws/blob/master/writer.go#L85

这里取所有如果只加锁、等所有发送完再解锁,那其他地方WriteAsync又可能阻塞,所以这里取所有message不管加不加锁都不是正解。我前面说不要使用RWMutex、保证不了原子性也是类似的原因。
要不你先看下我那个conn.Execute或者serial再改:joy:

from gws.

lesismal avatar lesismal commented on May 18, 2024 1

可以试试去给官方pr下解决pipe的这个问题

from gws.

lesismal avatar lesismal commented on May 18, 2024 1

review了下新的release,异步写这块应该还是不对:

首先,writePublic执行实际写入,是有可能阻塞的,比如tcp窗口拥塞

通常是入chan这块怕阻塞需要select default:
https://github.com/lxzan/gws/blob/master/writer.go#L87
writePublic阻塞后,正在异步写的协程阻塞了,其他地方调用WriteAsync多次导致chan的size默认8也满了,那继续WriteAsync这里入chan就会阻塞,WriteAsync就阻塞了

而从chan里取出来执行写的协程,通常是不应该在外部范围加锁了:
https://github.com/lxzan/gws/blob/master/writer.go#L93
一是循环发送多个消息,这个锁粒度偏大,二是同样的writePublic可能阻塞了,这里锁的时间就更久了,其他地方WriteMessage就也会卡住

还有就是前面提到过的,你的AddJob是可能导致同时有多个协程存在,你这里循环发送外部加了锁,保证了时序,但是第一个循环发送的协程把所有消息都取出来发完了,其他几个协程进入循环后就是一次空跑浪费

我理解你这里这样设计是为了尽量不占用常驻协程,但用chan的方式好像是很难解决或者不是最优的

from gws.

lxzan avatar lxzan commented on May 18, 2024 1

哎,我放弃了

不好复现的场景解决起来很困难,工作日也没那么多时间,只能有空了再研究下

from gws.

lesismal avatar lesismal commented on May 18, 2024 1

问题,得测一下混合同步

锁粒度是一个问题
卡住是一个问题,本地环境用tcp比较难模拟出来,或者你继续魔改下net.Pipe,让它的writePublic卡住比如1s,看看广播时候其他连接的情况,或者测试里直接加代码打印下相关的时间就行

from gws.

lesismal avatar lesismal commented on May 18, 2024 1

用net.Pipe复现了下,与我肉眼分析的现象一致,你看后面Output的 broadcast 9 和测试代码中的注释,然后自己跑跑试下差不多就清楚了

由于net.Pipe用于完整逻辑gws一些地方有错误,我临时注释掉了两个地方、不影响测试io问题:

  1. serveWebSocket 中的:
// c.SetDeadline(time.Time{})
// c.SetReadDeadline(time.Time{})
// c.SetWriteDeadline(time.Time{})
// c.setNoDelay(c.conn)
  1. readMessage中的:
// if !c.fh.GetMask() {
//	return internal.CloseProtocolError
// }

测试代码如下:

package gws

import (
	"bufio"
	_ "embed"
	"fmt"
	"net"
	"testing"
	"time"
)

func TestWriteAsyncBlocking(t *testing.T) {
	var handler = new(webSocketMocker)
	var upgrader = NewUpgrader(
		WithEventHandler(handler),
		WithResponseHeader(nil),
		WithMaxContentLength(0),
		WithCheckOrigin(func(r *Request) bool {
			return true
		}),
	)

	allConns := map[*Conn]struct{}{}
	for i := 0; i < 3; i++ {
		svrConn, cliConn := net.Pipe() // no reading from another side
		var sbrw = bufio.NewReadWriter(bufio.NewReader(svrConn), bufio.NewWriter(svrConn))
		var svrSocket = serveWebSocket(upgrader, &Request{}, svrConn, sbrw, handler, false)
		go svrSocket.Listen()
		var cbrw = bufio.NewReadWriter(bufio.NewReader(cliConn), bufio.NewWriter(svrConn))
		var cliSocket = serveWebSocket(upgrader, &Request{}, cliConn, cbrw, handler, false)
		if i == 0 { // client 0 1s后再开始读取;1s内不读取消息,则svrSocket 0在发送chan取出一个msg进行writePublic时即开始阻塞
			time.AfterFunc(time.Second, func() {
				cliSocket.Listen()
			})
		} else {
			go cliSocket.Listen()
		}
		allConns[svrSocket] = struct{}{}
	}

	// 第一个msg被异步协程从chan取出了,取出后阻塞在writePublic、没有后续的取出,再入defaultAsyncIOGoLimit个msg到chan里,
	// 则defaultAsyncIOGoLimit+2个消息会导致入chan阻塞。
	// 1s后client 0开始读取,广播才会继续,这一轮对应的时间约为1s
	for i := 0; i <= defaultAsyncIOGoLimit+2; i++ {
		t := time.Now()
		for wsConn := range allConns {
			wsConn.WriteAsync(OpcodeBinary, []byte{0})
		}
		fmt.Printf("broadcast %d, used: %v\n", i, time.Since(t).Seconds())
	}

	time.Sleep(time.Second*2)
}

Output:

roadcast 0, used: 0
broadcast 1, used: 0
broadcast 2, used: 0.0005029
broadcast 3, used: 0
broadcast 4, used: 0
broadcast 5, used: 0
broadcast 6, used: 0
broadcast 7, used: 0
broadcast 8, used: 0
broadcast 9, used: 1.0001524
broadcast 10, used: 0

from gws.

lesismal avatar lesismal commented on May 18, 2024

基于标准库阻塞io的websocket,melody虽然也有些浪费的地方,但它在gorilla基础上的封装方式是做得比较不错的,可以参考下:
https://github.com/olahol/melody

from gws.

lxzan avatar lxzan commented on May 18, 2024

async read/write都是可选的,对时序有要求可以采用同步模式. 它们都通过了autobahn的测试,相比同步模式多了一些not strict.

from gws.

lxzan avatar lxzan commented on May 18, 2024

事实上,我们跑benchmark才会出现单连接高并发的情况,实际业务锁竞争不会特别激烈.

from gws.

lxzan avatar lxzan commented on May 18, 2024

基于标准库阻塞io的websocket,melody虽然也有些浪费的地方,但它在gorilla基础上的封装方式是做得比较不错的,可以参考下: https://github.com/olahol/melody

下午去看看借鉴下

from gws.

lesismal avatar lesismal commented on May 18, 2024

时序的问题,如果你想节省协程也可以做到,消息入队列如果是队首则启动go后for循环去发送队列里的数据,这样也能做到没有待发送的数据时不用占用协程,类似我这里:
https://github.com/lesismal/nbio/blob/master/conn.go#L103

你现在的workerqueue size设置为1、do的时候改造下判断是否队首应该就可以。目前没有判断队首直接改size为1应该是不行的,并发AddJob会乱

from gws.

lxzan avatar lxzan commented on May 18, 2024

async read/write都是可选的,对时序有要求可以采用同步模式. 它们都通过了autobahn的测试,相比同步模式多了一些not strict.
事实上,我们跑benchmark才会出现单连接高并发的情况,实际业务锁竞争不会特别激烈.

这两条的前提都是基于运气好。业务量再小,只要时间足够,也会遇到问题——墨菲定律了解一下。

如果基础设施的代码基于运气,谁敢用啊兄弟,😂

并不是运气. 对于线上环境, 单连接内出现高并发, 十有八九是DDOS. 而且, 我这套基于任务队列的IO模型是有改进空间的. channel并发写之所以高效, 是因为并行写入内存(加锁解锁时间很短), 串行写入net.Conn.

from gws.

lxzan avatar lxzan commented on May 18, 2024

时序的问题,如果你想节省协程也可以做到,消息入队列如果是队首则启动go后for循环去发送队列里的数据,这样也能做到没有待发送的数据时不用占用协程,类似我这里: https://github.com/lesismal/nbio/blob/master/conn.go#L103

你现在的workerqueue size设置为1、do的时候改造下判断是否队首应该就可以。目前没有判断队首直接改size为1应该是不行的,并发AddJob会乱

被你发现了, 我就是不想增加常驻协程.

from gws.

lesismal avatar lesismal commented on May 18, 2024

1.14后,go的调度是抢占式的了,即使纯cpu消耗的代码也可能被中途打断的。所以这并不是你的并发量有多大的问题,只要发送队列大于等于2,基于墨菲定律,就肯定会发生。
这还不是运气式啥!?清醒一点

from gws.

lxzan avatar lxzan commented on May 18, 2024

1.14后,go的调度是抢占式的了,即使纯cpu消耗的代码也可能被中途打断的。所以这并不是你的并发量有多大的问题,只要发送队列大于等于2,基于墨菲定律,就肯定会发生。 这还不是运气式啥!?清醒一点

就墨菲定律来说, 你无法100%保证所有用户的QoS, 就像机械效率无法达到100%.

from gws.

lesismal avatar lesismal commented on May 18, 2024

被你发现了, 我就是不想增加常驻协程.

nbio里使用的默认协程池没任务的时候就是退出的不占用资源的,但是基于自然均衡的需求,留了一个带size chan当队列的常驻协程,一个协程成本无所谓。

from gws.

lesismal avatar lesismal commented on May 18, 2024

就墨菲定律来说, 你无法100%保证所有用户的QoS, 就像机械效率无法达到100%.

计算机科学的分层**很有道理。就可靠性来讲,我觉得:

  1. 每一层应该尽量保障自己这一层的可用性;
  2. 从硬件到软件协议栈,每一层都有校验和、可靠协议校验失败丢弃重发之类的机制,都是尽量确保自己这一层的稳定性可用性;
  3. 如果每一层都觉得其他层无法100%所以自己也基于运气,那整个系统的可靠性就更低了

而且这个问题,完全可以做到更好的可靠性代码+小于等于当前方案的消耗+更低的理论消耗上限

from gws.

lesismal avatar lesismal commented on May 18, 2024

这个workerqueue设计得挺好,但更适合用于通用协程池的场景。单个conn这种有时序要求的,再特化一下会更好

from gws.

lesismal avatar lesismal commented on May 18, 2024

字节的那个workerpool本质也是类似的,细节差异罢了

from gws.

lxzan avatar lxzan commented on May 18, 2024

这个workerqueue设计得挺好,但更适合用于通用协程池的场景。单个conn这种有时序要求的,再特化一下会更好

再加一个有锁队列(RWMutex),可以达到channel的效果

from gws.

lxzan avatar lxzan commented on May 18, 2024

这个workerqueue设计得挺好,但更适合用于通用协程池的场景。单个conn这种有时序要求的,再特化一下会更好

从我的concurrency库复制过来的. 看过字节和ants两个库,都没看明白原理😂

from gws.

lesismal avatar lesismal commented on May 18, 2024

再加一个有锁队列(RWMutex),可以达到channel的效果

应该是不需要用RWMutex的,这种队列+线程/协程池,通常是需要push/pop的原子性保障,读锁并发时没法保证push/pop的原子性,所以可能会带来bug。
如果只使用Lock/Unlock,则Mutex性能好于RWMutex,看下源码就懂了。lockSlow那个内联的注释让人心情非常愉悦:
https://github.com/golang/go/blob/master/src/sync/mutex.go#L89

from gws.

lxzan avatar lxzan commented on May 18, 2024

字节的主要是自己的简单链表+pool略有优化,用的for循环,你的是数组+递归,本质上你俩这个是相同的。

ants写的太复杂,我没深入读过它代码,简单扫了下应该是用的条件变量,类似c/cpp那套。因为写的太复杂,而且我看issue里有人遇到一些莫名其妙的bug,而且较高版本go里我benchmark了下它更慢,所以就更没打算看它源码了。

我那个协程池为了减少一个协程池只有单个数组或者链表时锁操作的竞争浪费,保留了一个常驻协程用chan做队列,当前并发度没有达到协程数量限制时则只要一个原子操作,比无竞争锁需要加锁解锁省掉一些原子操作,比竞争时try lock部分自旋节省更多,以一个常驻协程代价换更均衡的。对于单个conn做了上面说的那个时序保证、但这是conn自己的部分、并不是协程池本身的功能。我这里单独实现了一个有序化的,相当于把conn上那个拆出来了,可以配合任何协程池使用: https://github.com/lesismal/serial

一个常驻协程无所谓, 连接上面的就可观了

from gws.

lesismal avatar lesismal commented on May 18, 2024

对于传统的c/cpp那些框架,我这个时序的方案也是适用的,这样至少可以让多逻辑线程被均衡利用起来。不只是单个conn时序保证,还可以根据模块,每个模块继承/集成一个这种队列+线程池,就能做到逻辑多线程+各种时序保证了

from gws.

lxzan avatar lxzan commented on May 18, 2024

回到问题本身,不管是否优化协程占用,应该严格保证发送顺序、而不是基于运气默认认为不会出现不一致。

时序问题在v1.3.1已经解决了. 肝了一天, 累死了, 异步IO真是让人头大.

from gws.

lesismal avatar lesismal commented on May 18, 2024

时序问题在v1.3.1已经解决了. 肝了一天, 累死了, 异步IO真是让人头大.

这。。。我简单review了下,应该是没解决的。我展开了说,你看一下是不是这个道理:

1.3.1是每次WriteAsync都先入队列,然后AddJob,每个job执行时是doWriteAsync,但是doWriteAsync是每次先取出所有再循环发送。
比如连续WriteAsync M 个消息,入队列与实际执行写的job并不是一一对应的、不是入队一个写一个的。 而是很可能M个消息分散成了小于M份比如M/2份,被M/2个job分别拿到了去发送,而你一共AddJob了M次,所以还会有M/2个job被异步执行时实际是空跑、浪费了。而且,由于可能同时存在M/2个job在执行,仍然没有保障时序。

今天头大了可以先停一停,休息下散散步喝喝茶,等思路清晰了再改,否则一下子卡住可能越改越乱,我以前也是,好几次肝到凌晨5点甚至早上8点,整个人状态都不好了。。。

from gws.

lxzan avatar lxzan commented on May 18, 2024

时序问题在v1.3.1已经解决了. 肝了一天, 累死了, 异步IO真是让人头大.

这。。。我简单review了下,应该是没解决的。我展开了说,你看一下是不是这个道理:

1.3.1是每次WriteAsync都先入队列,然后AddJob,每个job执行时是doWriteAsync,但是doWriteAsync是每次先取出所有再循环发送。 比如连续WriteAsync M 个消息,入队列与实际执行写的job并不是一一对应的、不是入队一个写一个的。 而是很可能M个消息分散成了小于M份比如M/2份,被M/2个job分别拿到了去发送,而你一共AddJob了M次,所以还会有M/2个job被异步执行时实际是空跑、浪费了。而且,由于可能同时存在M/2个job在执行,仍然没有保障时序。

今天头大了可以先停一停,休息下散散步喝喝茶,等思路清晰了再改,否则一下子卡住可能越改越乱,我以前也是,好几次肝到凌晨5点甚至早上8点,整个人状态都不好了。。。

写入顺序就是入队顺序, 已经跑通了Autobahn测试, 没有再出现额外的几个Non-Strict了. 暂未发现异常, 跑Benchmark性能更强了 😂.

from gws.

lesismal avatar lesismal commented on May 18, 2024

还有一点,nbio的阻塞io的websocket conn的写,是按配置项决定是否开启单独协程负责写的。用户如果不需要广播,不开启就能省点协程。但是如果需要广播、用户配置了异步写,我是用单独的常驻协程处理异步写的,没有做成这种动态协程,因为动态协程性能要差一些,跨协程变量逃逸、调度亲和性都要差。
而且既然用户都使用了阻塞或者混合模式来处理conn而且配置了异步写,应该也不差这点协程数、而是追求性能多一些,并且如果是用混合模式,肯定也是配置了阻塞io的连接数量的,这个数量不会太大,多出这个数量的协程也没压力,其他超过阈值的部分的conn交给poller部分去节省,完全能cover住

from gws.

lesismal avatar lesismal commented on May 18, 2024

写入顺序就是入队顺序, 已经跑通了Autobahn测试, 没有再出现额外的几个Non-Strict了. 暂未发现异常, 跑Benchmark性能更强了

一些问题压测是测不出来的,autobahn也跑通也不代表就没有问题啊。。。
你先看我分析的代码逻辑,是不是确实存在这种可能性

from gws.

lxzan avatar lxzan commented on May 18, 2024

这里的85-88行取出所有后解锁了,下面循环发送的过程中,别的地方可能又WriteAsync并触发了新的job异步执行: https://github.com/lxzan/gws/blob/master/writer.go#L85

这里取所有如果只加锁、等所有发送完再解锁,那其他地方WriteAsync又可能阻塞,所以这里取所有message不管加不加锁都不是正解。我前面说不要使用RWMutex、保证不了原子性也是类似的原因。 要不你先看下我那个conn.Execute或者serial再改😂

改了再看吧, 今天改休息了

from gws.

lesismal avatar lesismal commented on May 18, 2024

嗯,先休息下吧,状态好了再看可能就清晰了。并发这块,没那么简单的,需要更多想象力去推演代码实际执行的情况

from gws.

lxzan avatar lxzan commented on May 18, 2024

写入顺序就是入队顺序, 已经跑通了Autobahn测试, 没有再出现额外的几个Non-Strict了. 暂未发现异常, 跑Benchmark性能更强了

一些问题压测是测不出来的,autobahn也跑通也不代表就没有问题啊。。。 你先看我分析的代码逻辑,是不是确实存在这种可能性

可以写单元测试测一下, 调用WriterAsync写入整数序列, Wait后检查IsSorted

from gws.

lesismal avatar lesismal commented on May 18, 2024

你要不说,我还没细看Wait(),这个Wait()。。。
我感觉你越走越远了。。。:joy:

from gws.

lesismal avatar lesismal commented on May 18, 2024

因为调度的先后是不可控的,所以如果代码本身存在时序错乱的可能性,那么简单测试正常也并不代表解决了问题,尤其是本地测试环境过于稳定、无法跑出临界情况很正常。即使概率再低,生产环境也是会出现的。

from gws.

lxzan avatar lxzan commented on May 18, 2024

因为调度的先后是不可控的,所以如果代码本身存在时序错乱的可能性,那么简单测试正常也并不代表解决了问题,尤其是本地测试环境过于稳定、无法跑出临界情况很正常。即使概率再低,生产环境也是会出现的。

有单元测试肯定好过没测试, 可以模拟各种情况,加随机延迟.

from gws.

lesismal avatar lesismal commented on May 18, 2024

有些临界情况,本地测试是很难触发的,甚至没办法模拟出来,但生产环境几乎肯定会出现。
所以,单元测试是相当于必需品,得有,但它解决不了一切,还是需要更多工作。

from gws.

lesismal avatar lesismal commented on May 18, 2024

我之前给melody的pr,就是读代码时肉眼看到的,一看就有问题,但它的测试以及那么多人用于生产那么久都没有人碰到或或者指出来。那个问题不复杂,你这里的并发时序性比那个还要复杂些,需要想象力去推演,因为很难通过测试模拟触发,因为本地测试环境比较稳定,即使是压测,多数时候也是按最顺畅的分支去执行的,所以触发不了,但问题可能确实存在

from gws.

lxzan avatar lxzan commented on May 18, 2024

我之前给melody的pr,就是读代码时肉眼看到的,一看就有问题,但它的测试以及那么多人用于生产那么久都没有人碰到或或者指出来。那个问题不复杂,你这里的并发时序性比那个还要复杂些,需要想象力去推演,因为很难通过测试模拟触发,因为本地测试环境比较稳定,即使是压测,多数时候也是按最顺畅的分支去执行的,所以触发不了,但问题可能确实存在

我用net.Pipe写单元测试时碰到了一个很奇怪的问题,conn.Write之后直接没有往下执行了,导致后面的OnError没有被解发

from gws.

lesismal avatar lesismal commented on May 18, 2024

我用net.Pipe写单元测试时碰到了一个很奇怪的问题,conn.Write之后直接没有往下执行了,导致后面的OnError没有被解发

可以把完整示例发一份,我有空了也看一下

from gws.

lxzan avatar lxzan commented on May 18, 2024

简单看了下,每个conn上自带一个size为8的workerQueue,但是这样似乎有几个问题:

  1. doWrite中是有锁的,所以即使多个协程同时写,也是在锁这里排队,而且还加剧了锁的竞争自旋、可能cpu消耗更高了,所以这里的多协程异步写目测并不能提高性能
  2. 高并发时协程数量、资源消耗更多
  3. 这个workerQueue并发执行时,好像并不能保证发送队列中的数据按照WriteAsync时入队列的顺序。
    而单个conn的消息,很多时候是需要时序保证的。比如流媒体推流需要广播不能允许被单个conn卡其他conn,所以就得WriteAsync,但是推送的数据顺序错乱了音视频就乱码了丢帧了。比如游戏一些玩家操作顺序广播出去,本来是ABC,用户收到变成了CAB。。。

兄弟我建议这块就按常规简单的方式来,单个协程+limited size chan+select default/timeout循环发送就可以了,继续这样下去好像越走越远了。。。

这样子吗?
image

from gws.

lxzan avatar lxzan commented on May 18, 2024

我用net.Pipe写单元测试时碰到了一个很奇怪的问题,conn.Write之后直接没有往下执行了,导致后面的OnError没有被解发

可以把完整示例发一份,我有空了也看一下

跑一下TestRead, 必现; 有点担心net.TcpConn和tls.Conn是不是也有这个问题; 得到妥善解决的话就可以去掉Wait了, https://github.com/lxzan/gws/blob/debug/reader_test.go

from gws.

lxzan avatar lxzan commented on May 18, 2024

我用net.Pipe写单元测试时碰到了一个很奇怪的问题,conn.Write之后直接没有往下执行了,导致后面的OnError没有被解发

可以把完整示例发一份,我有空了也看一下

可有可能只有net.Pipe()创建的连接有这个问题, 在一个协程内读写会造成死锁

from gws.

lxzan avatar lxzan commented on May 18, 2024

我用net.Pipe写单元测试时碰到了一个很奇怪的问题,conn.Write之后直接没有往下执行了,导致后面的OnError没有被解发

可以把完整示例发一份,我有空了也看一下

应该是pipe本身的问题, 某一端在read函数里面调用write会造成死锁

net.Conn和tls.Conn没这毛病

from gws.

lesismal avatar lesismal commented on May 18, 2024

简单看了下,每个conn上自带一个size为8的workerQueue,但是这样似乎有几个问题:

  1. doWrite中是有锁的,所以即使多个协程同时写,也是在锁这里排队,而且还加剧了锁的竞争自旋、可能cpu消耗更高了,所以这里的多协程异步写目测并不能提高性能
  2. 高并发时协程数量、资源消耗更多
  3. 这个workerQueue并发执行时,好像并不能保证发送队列中的数据按照WriteAsync时入队列的顺序。
    而单个conn的消息,很多时候是需要时序保证的。比如流媒体推流需要广播不能允许被单个conn卡其他conn,所以就得WriteAsync,但是推送的数据顺序错乱了音视频就乱码了丢帧了。比如游戏一些玩家操作顺序广播出去,本来是ABC,用户收到变成了CAB。。。

兄弟我建议这块就按常规简单的方式来,单个协程+limited size chan+select default/timeout循环发送就可以了,继续这样下去好像越走越远了。。。

这样子吗? image

不是完整代码,没法细分析。但单从这个锁+for循环chan就感觉不对,锁粒度太大了容易卡

from gws.

lesismal avatar lesismal commented on May 18, 2024

net.Pipe好像是写的一方要等读的一方读取对应字节数之后才会返回、否则一直阻塞,测试代码我好没细看,你看下是不是这个原因

from gws.

lesismal avatar lesismal commented on May 18, 2024

我用net.Pipe写单元测试时碰到了一个很奇怪的问题,conn.Write之后直接没有往下执行了,导致后面的OnError没有被解发

可以把完整示例发一份,我有空了也看一下

可有可能只有net.Pipe()创建的连接有这个问题, 在一个协程内读写会造成死锁

如果是同一个协程内先写后读,那肯定是阻塞了。没读取完成或者close之前写会一直阻塞不返回。你看下源码:
https://github.com/golang/go/blob/master/src/net/pipe.go#L195
这里就是先写入buf到chan,从另一个chan阻塞读取对方传入的已读字节数,如果对方没读完,循环继续写,同一个协程内先写后读肯定是要卡这里的

养成习惯,多看标准库源码

from gws.

lxzan avatar lxzan commented on May 18, 2024

net.Pipe好像是写的一方要等读的一方读取对应字节数之后才会返回、否则一直阻塞,测试代码我好没细看,你看下是不是这个原因

把pipe.go复制过来魔改了下,解决单元测试死锁了

from gws.

lxzan avatar lxzan commented on May 18, 2024

我用net.Pipe写单元测试时碰到了一个很奇怪的问题,conn.Write之后直接没有往下执行了,导致后面的OnError没有被解发

可以把完整示例发一份,我有空了也看一下

可有可能只有net.Pipe()创建的连接有这个问题, 在一个协程内读写会造成死锁

如果是同一个协程内先写后读,那肯定是阻塞了。没读取完成或者close之前写会一直阻塞不返回。你看下源码:
https://github.com/golang/go/blob/master/src/net/pipe.go#L195
这里就是先写入buf到chan,从另一个chan阻塞读取对方传入的已读字节数,如果对方没读完,循环继续写,同一个协程内先写后读肯定是要卡这里的

养成习惯,多看标准库源码

断点追踪到pipe.go里面了

from gws.

lxzan avatar lxzan commented on May 18, 2024

review了下新的release,异步写这块应该还是不对:

首先,writePublic执行实际写入,是有可能阻塞的,比如tcp窗口拥塞

通常是入chan这块怕阻塞需要select default:
https://github.com/lxzan/gws/blob/master/writer.go#L87
writePublic阻塞后,正在异步写的协程阻塞了,其他地方调用WriteAsync多次导致chan的size默认8也满了,那继续WriteAsync这里入chan就会阻塞,WriteAsync就阻塞了

而从chan里取出来执行写的协程,通常是不应该在外部范围加锁了:
https://github.com/lxzan/gws/blob/master/writer.go#L93
一是循环发送多个消息,这个锁粒度偏大,二是同样的writePublic可能阻塞了,这里锁的时间就更久了,其他地方WriteMessage就也会卡住

还有就是前面提到过的,你的AddJob是可能导致同时有多个协程存在,你这里循环发送外部加了锁,保证了时序,但是第一个循环发送的协程把所有消息都取出来发完了,其他几个协程进入循环后就是一次空跑浪费

我理解你这里这样设计是为了尽量不占用常驻协程,但用chan的方式好像是很难解决或者不是最优的

现在的任务队列读写分离了,addJob不会导致并行多个协程,因为写队列并行度是1. 业务中应该尽量使用WriteMessage而不是WriteAsync,广播场景才迫切需要非阻塞.

from gws.

lesismal avatar lesismal commented on May 18, 2024

哦看到了:

writeTaskQ:      newWorkerQueue(1),

那多个写的问题不存在。但是我上面说的其他的问题还是存在

from gws.

lesismal avatar lesismal commented on May 18, 2024

我看你把这个关掉了,别着急以为解决了,并发没你们以为的那么简单。
确认解决、理解了以后,可以把这个issue delete掉。

from gws.

lxzan avatar lxzan commented on May 18, 2024

压测表明同步写和异步写性能差不多

from gws.

lesismal avatar lesismal commented on May 18, 2024

兄弟,我跟你讲了好几天了,你还没搞明白啊!
这不是压测性能好不好的问题,也不是压测没碰到bug就可以不搞的问题!这是你的代码这样设计,生产上存在时序错乱导致业务bug的问题!而且这个bug别人很难定位分析!

from gws.

lesismal avatar lesismal commented on May 18, 2024

每一层,要确保自己这一层的正确性稳定性,压测,我昨天压测一些正常频率范围的,gws的数据并不比其他框架高,在我环境下,一些压测参数下,gws cpu跑不上来,比如cpu 300% 20w qps,但是别的框架能跑到cpu 400%和更高的qps,数据只是举例子

测试也不能只围绕一个非正常频率的压测参数来做定论,性能、稳定性都要全面考量的,做工程要讲究严谨,如果只是这种态度对待基础框架的稳定性,我只能把gws跟gobwas/ws归为一类了

from gws.

lxzan avatar lxzan commented on May 18, 2024

WriteAsync串行写入时序怎么会乱呢. writePublic那里明天再研究下,我还没跑过pprof分析

from gws.

lesismal avatar lesismal commented on May 18, 2024

很多并发的复杂问题,本地压测根本就测不出来,但是生产上几乎是必现的。工程还是管理上讲究几个9的标准,大厂主力业务的靠谱团队的review、产品测试验收逻辑,都是非常严格的。还有比如今天v站发的那个cloudflare的案例,人家每秒 4000w 请求,这么大量下,人家文章里也说了:

面对每秒高达 4000 万的请求总量,即使流经 Cloudflare 网络的全部请求中有 0.001% 发生问题,代表的也是冲击数百万用户的大事件

我工作了十几年,虽然遇到过的大事故不多,但偶尔遇到,都会感叹墨菲定律,真的是得越严谨越好

from gws.

lesismal avatar lesismal commented on May 18, 2024

WriteAsync串行写入时序怎么会乱呢. writePublic那里明天再研究下,我还没跑过pprof分析

你发新版本之前是存在乱的问题。新版本刚才你说了并行度是1、我也看到代码确实是1了,所以应该是不会乱了。
但是卡住的问题依然没有解决。
我今晚第一次留言的那一楼里说的是几种情况下导致的卡的问题,一开始没看到你把write的workerQueue改成1的并行度了所以以为会有同时多个协程异步写、前面写完所有后面的协程空跑浪费的情况,但是是1了就不至于空跑浪费,我今天也没说会乱,而是卡住的问题

我估计这个问题,跑pprof也分析也是难度很大,本地测试甚至可能不会触发

from gws.

lxzan avatar lxzan commented on May 18, 2024

WriteAsync串行写入时序怎么会乱呢. writePublic那里明天再研究下,我还没跑过pprof分析

你发新版本之前是存在乱的问题。新版本刚才你说了并行度是1、我也看到代码确实是1了,所以应该是不会乱了。
但是卡住的问题依然没有解决。
我今晚第一次留言的那一楼里说的是几种情况下导致的卡的问题,一开始没看到你把write的workerQueue改成1的并行度了所以以为会有同时多个协程异步写、前面写完所有后面的协程空跑浪费的情况,但是是1了就不至于空跑浪费,我今天也没说会乱,而是卡住的问题

我估计这个问题,跑pprof也分析也是难度很大,本地测试甚至可能不会触发

这种细节问题,后面再研究下优化方式吧

from gws.

lxzan avatar lxzan commented on May 18, 2024

每一层,要确保自己这一层的正确性稳定性,压测,我昨天压测一些正常频率范围的,gws的数据并不比其他框架高,在我环境下,一些压测参数下,gws cpu跑不上来,比如cpu 300% 20w qps,但是别的框架能跑到cpu 400%和更高的qps,数据只是举例子

测试也不能只围绕一个非正常频率的压测参数来做定论,性能、稳定性都要全面考量的,做工程要讲究严谨,如果只是这种态度对待基础框架的稳定性,我只能把gws跟gobwas/ws归为一类了

低频写入我也测过,iops一样,gws延迟分布和百分位好看一点. 高频写跑满过万兆带宽,600% CPU.

from gws.

lesismal avatar lesismal commented on May 18, 2024

再说一句吧,之前只看了写入这块,刚才你提到读的并发度是8,所以也看了下,你这里派发给用户的message又是不保证时序的了。。。
不只是你写数据给client需要时序,你收到数据传递给用户,也存在类似的需求场景。。。

from gws.

lesismal avatar lesismal commented on May 18, 2024

保守点 defaultAsyncIOGoLimit 改成 1 默认保证时序可能好些

from gws.

lxzan avatar lxzan commented on May 18, 2024

保守点 defaultAsyncIOGoLimit 改成 1 默认保证时序可能好些

是否并行处理请求是有开关的

from gws.

lesismal avatar lesismal commented on May 18, 2024

好吧。。确实,肝这些伤身体得很,多运动

from gws.

lxzan avatar lxzan commented on May 18, 2024

锁粒度的问题,得测一下混合同步写和异步写

from gws.

lxzan avatar lxzan commented on May 18, 2024

用net.Pipe复现了下,与我肉眼分析的现象一致,你看后面Output的 broadcast 9 和测试代码中的注释,然后自己跑跑试下差不多就清楚了

由于net.Pipe用于完整逻辑gws一些地方有错误,我临时注释掉了两个地方、不影响测试io问题:

  1. serveWebSocket 中的:
// c.SetDeadline(time.Time{})
// c.SetReadDeadline(time.Time{})
// c.SetWriteDeadline(time.Time{})
// c.setNoDelay(c.conn)
  1. readMessage中的:
// if !c.fh.GetMask() {
//	return internal.CloseProtocolError
// }

测试代码如下:

package gws

import (
	"bufio"
	_ "embed"
	"fmt"
	"net"
	"testing"
	"time"
)

func TestWriteAsyncBlocking(t *testing.T) {
	var handler = new(webSocketMocker)
	var upgrader = NewUpgrader(
		WithEventHandler(handler),
		WithResponseHeader(nil),
		WithMaxContentLength(0),
		WithCheckOrigin(func(r *Request) bool {
			return true
		}),
	)

	allConns := map[*Conn]struct{}{}
	for i := 0; i < 3; i++ {
		svrConn, cliConn := net.Pipe() // no reading from another side
		var sbrw = bufio.NewReadWriter(bufio.NewReader(svrConn), bufio.NewWriter(svrConn))
		var svrSocket = serveWebSocket(upgrader, &Request{}, svrConn, sbrw, handler, false)
		go svrSocket.Listen()
		var cbrw = bufio.NewReadWriter(bufio.NewReader(cliConn), bufio.NewWriter(svrConn))
		var cliSocket = serveWebSocket(upgrader, &Request{}, cliConn, cbrw, handler, false)
		if i == 0 { // client 0 1s后再开始读取;1s内不读取消息,则svrSocket 0在发送chan取出一个msg进行writePublic时即开始阻塞
			time.AfterFunc(time.Second, func() {
				cliSocket.Listen()
			})
		} else {
			go cliSocket.Listen()
		}
		allConns[svrSocket] = struct{}{}
	}

	// 第一个msg被异步协程从chan取出了,取出后阻塞在writePublic、没有后续的取出,再入defaultAsyncIOGoLimit个msg到chan里,
	// 则defaultAsyncIOGoLimit+2个消息会导致入chan阻塞。
	// 1s后client 0开始读取,广播才会继续,这一轮对应的时间约为1s
	for i := 0; i <= defaultAsyncIOGoLimit+2; i++ {
		t := time.Now()
		for wsConn := range allConns {
			wsConn.WriteAsync(OpcodeBinary, []byte{0})
		}
		fmt.Printf("broadcast %d, used: %v\n", i, time.Since(t).Seconds())
	}

	time.Sleep(time.Second*2)
}

Output:

roadcast 0, used: 0
broadcast 1, used: 0
broadcast 2, used: 0.0005029
broadcast 3, used: 0
broadcast 4, used: 0
broadcast 5, used: 0
broadcast 6, used: 0
broadcast 7, used: 0
broadcast 8, used: 0
broadcast 9, used: 1.0001524
broadcast 10, used: 0

感谢

from gws.

lxzan avatar lxzan commented on May 18, 2024

channel缓冲总有被塞满的时候,还是用有锁队列好,不能因为一个连接影响到整体.

from gws.

lxzan avatar lxzan commented on May 18, 2024

dev分支初步解决了, 后面再细细优化

=== RUN TestWriteAsyncBlocking
broadcast 0, used: 8.125µs
broadcast 1, used: 708ns
broadcast 2, used: 500ns
broadcast 3, used: 291ns
broadcast 4, used: 1.208µs
broadcast 5, used: 2.166µs
broadcast 6, used: 208ns
broadcast 7, used: 209ns
broadcast 8, used: 208ns
broadcast 9, used: 167ns
broadcast 10, used: 167ns
--- PASS: TestWriteAsyncBlocking (2.00s)
PASS

from gws.

lesismal avatar lesismal commented on May 18, 2024

之前没有仔细看workerQueue,刚才又看了下,好像是有问题的,没法保证并发度边界。
早上起床气还没散尽,我不确定自己脑袋是否清醒,你看下对不对:

  1. 这里,比如当前有一个job,前面已经启动了第一个协程,这个协程 job.Do() 并且 decrease() 后,curConcurrency 为0
    https://github.com/lxzan/gws/blob/master/aio.go#L63
  2. 在第一个协程 getJob 之前,刚好其他地方又 AddJob 并且由于 curConcurrency 为0,所以第二个协程也启动了

而且即使只有一个协程可能也有问题,因为 decrease() 是每次执行完一个job就decrease()一下,即使并发度只有1、多次AddJob没有被上面的临界情况导致多个协程,但是单个协程执行完多个job后也是increase了很多次、curConcurrency可能为负数,并发度控制无效、时序也可能错乱

from gws.

lesismal avatar lesismal commented on May 18, 2024

之前没有仔细看workerQueue,刚才又看了下,好像是有问题的,没法保证并发度边界。 早上起床气还没散尽,我不确定自己脑袋是否清醒,你看下对不对:

  1. 这里,比如当前有一个job,前面已经启动了第一个协程,这个协程 job.Do() 并且 decrease() 后,curConcurrency 为0
    https://github.com/lxzan/gws/blob/master/aio.go#L63
  2. 在第一个协程 getJob 之前,刚好其他地方又 AddJob 并且由于 curConcurrency 为0,所以第二个协程也启动了

而且即使只有一个协程可能也有问题,因为 decrease() 是每次执行完一个job就decrease()一下,即使并发度只有1、多次AddJob没有被上面的临界情况导致多个协程,但是单个协程执行完多个job后也是increase了很多次、curConcurrency可能为负数,并发度控制无效、时序也可能错乱

哦,好像没问题,是getJob成功后才++

from gws.

lxzan avatar lxzan commented on May 18, 2024

之前没有仔细看workerQueue,刚才又看了下,好像是有问题的,没法保证并发度边界。 早上起床气还没散尽,我不确定自己脑袋是否清醒,你看下对不对:

  1. 这里,比如当前有一个job,前面已经启动了第一个协程,这个协程 job.Do() 并且 decrease() 后,curConcurrency 为0
    https://github.com/lxzan/gws/blob/master/aio.go#L63
  2. 在第一个协程 getJob 之前,刚好其他地方又 AddJob 并且由于 curConcurrency 为0,所以第二个协程也启动了

而且即使只有一个协程可能也有问题,因为 decrease() 是每次执行完一个job就decrease()一下,即使并发度只有1、多次AddJob没有被上面的临界情况导致多个协程,但是单个协程执行完多个job后也是increase了很多次、curConcurrency可能为负数,并发度控制无效、时序也可能错乱

应该没问题, decrease和addJob是互斥的, 单元测试都通过了(加了随机延迟)

from gws.

lxzan avatar lxzan commented on May 18, 2024

初版https://github.com/lxzan/concurrency处理并发边界有点问题, 我写单元测试的时候发现并解决了

from gws.

lxzan avatar lxzan commented on May 18, 2024

decrease和getJob可以结合起来,减少加锁解锁几十纳秒

from gws.

lxzan avatar lxzan commented on May 18, 2024

@lesismal issue里面还有未解决的问题吗?

from gws.

lxzan avatar lxzan commented on May 18, 2024

简单看了下melody, 感觉性能不会很好, 大量使用channel, 而且map没有分段, 牺牲性能换取便利.

from gws.

lesismal avatar lesismal commented on May 18, 2024

@lesismal issue里面还有未解决的问题吗?

今天比较忙没继续看,估计应该没问题了吧,先关掉了

简单看了下melody, 感觉性能不会很好, 大量使用channel, 而且map没有分段, 牺牲性能换取便利.

melody 接口设计的不错,而且它比较早,那时候整个社区直接使用gorilla的很多都用错了,melody问题不大。而且早期可选的ws库没几个,其他几个拉跨。
melody就是按多用chan少用锁那套搞的,接口比较优雅、性能不是它的首要目标

from gws.

lxzan avatar lxzan commented on May 18, 2024

对于channel我一直心存敬畏, 一不小心就死锁了, 封装和抽象屏蔽了细节

from gws.

lesismal avatar lesismal commented on May 18, 2024

协程和chan是go最大的杀器,chan相比于其他语言用进程线程间的那些同步、通信syscall,要简单太多了,省太多体力,而且清凉成本低。

from gws.

lesismal avatar lesismal commented on May 18, 2024

对比了下 workerQueue 的调度性能:

// pool_test.go
package gws

import (
	"sync"
	"testing"

	"github.com/bytedance/gopkg/util/gopool"
	"github.com/lesismal/nbio/taskpool"
	"github.com/panjf2000/ants/v2"
)

const (
	PoolSize   = 100
	BenchTimes = 10000
)

func BenchmarkGo(b *testing.B) {
	b.ReportAllocs()
	b.ResetTimer()

	for i := 0; i < b.N; i++ {
		wg := sync.WaitGroup{}
		wg.Add(BenchTimes)
		for j := 0; j < BenchTimes; j++ {
			go func() {
				demoFunc()
				wg.Done()
			}()
		}
		wg.Wait()
	}
}

func BenchmarkGwsWorkerQueue(b *testing.B) {
	b.ReportAllocs()
	b.ResetTimer()
	wq := newWorkerQueue(PoolSize)
	for i := 0; i < b.N; i++ {
		wg := sync.WaitGroup{}
		wg.Add(BenchTimes)
		for j := 0; j < BenchTimes; j++ {
			wq.AddJob(asyncJob{
				Do: func(interface{}) error {
					demoFunc()
					wg.Done()
					return nil
				},
			})
		}
		wg.Wait()
	}
}

func BenchmarkGopool(b *testing.B) {
	p := gopool.NewPool("test", PoolSize, &gopool.Config{})

	b.ReportAllocs()
	b.ResetTimer()

	for i := 0; i < b.N; i++ {
		wg := sync.WaitGroup{}
		wg.Add(BenchTimes)
		for j := 0; j < BenchTimes; j++ {
			p.Go(func() {
				demoFunc()
				wg.Done()
			})
		}
		wg.Wait()
	}
}

func BenchmarkAnts(b *testing.B) {
	p, _ := ants.NewPool(PoolSize)
	defer p.Release()

	b.ReportAllocs()
	b.ResetTimer()

	for i := 0; i < b.N; i++ {
		wg := sync.WaitGroup{}
		wg.Add(BenchTimes)
		for j := 0; j < BenchTimes; j++ {
			p.Submit(func() {
				demoFunc()
				wg.Done()
			})
		}
		wg.Wait()
	}
}

func BenchmarkNbio(b *testing.B) {
	p := taskpool.NewMixedPool(PoolSize, 1, 10000)
	defer p.Stop()

	b.ReportAllocs()
	b.ResetTimer()

	for i := 0; i < b.N; i++ {
		wg := sync.WaitGroup{}
		wg.Add(BenchTimes)
		for j := 0; j < BenchTimes; j++ {
			p.Go(func() {
				demoFunc()
				wg.Done()
			})
		}
		wg.Wait()
	}
}

func demoFunc() int {
	var sum int
	for i := 0; i < 1000; i++ {
		sum += i
	}
	return sum
}

Output:

BenchmarkGo
BenchmarkGo-8               	     672	   1840049 ns/op	  162122 B/op	   10005 allocs/op
BenchmarkGwsWorkerQueue
BenchmarkGwsWorkerQueue-8   	     168	   7032465 ns/op	 1268412 B/op	   20229 allocs/op
BenchmarkGopool
BenchmarkGopool-8           	     450	   2651943 ns/op	  183109 B/op	   10587 allocs/op
BenchmarkAnts
BenchmarkAnts-8             	     252	   4578326 ns/op	  160239 B/op	   10004 allocs/op
BenchmarkNbio
BenchmarkNbio-8             	     482	   2281592 ns/op	  479608 B/op	   29984 allocs/op

from gws.

lesismal avatar lesismal commented on May 18, 2024

又对比了下 workerQueue 并发为1时相当于我的 serial ,也对比了下:

func BenchmarkGwsWorkerQueueSerial(b *testing.B) {
	b.ReportAllocs()
	b.ResetTimer()
	wq := newWorkerQueue(1)
	for i := 0; i < b.N; i++ {
		wg := sync.WaitGroup{}
		wg.Add(BenchTimes)
		for j := 0; j < BenchTimes; j++ {
			wq.AddJob(asyncJob{
				Do: func(interface{}) error {
					demoFunc()
					wg.Done()
					return nil
				},
			})
		}
		wg.Wait()
	}
}

func BenchmarkLesismalSerial(b *testing.B) {
	b.ReportAllocs()
	b.ResetTimer()
	s := serial.New(func(f func()) {
		f()
	}, func(f func()) {
		go f()
	})
	for i := 0; i < b.N; i++ {
		wg := sync.WaitGroup{}
		wg.Add(BenchTimes)
		for j := 0; j < BenchTimes; j++ {
			s.Go(func() {
				demoFunc()
				wg.Done()
			})
		}
		wg.Wait()
	}
}

Output:

BenchmarkGwsWorkerQueueSerial
BenchmarkGwsWorkerQueueSerial-8   	     160	   7565818 ns/op	 1355702 B/op	   20009 allocs/op
BenchmarkLesismalSerial
BenchmarkLesismalSerial-8         	     202	   6067438 ns/op	  327063 B/op	   20018 allocs/op

from gws.

lxzan avatar lxzan commented on May 18, 2024

对比了下 workerQueue 的调度性能:

// pool_test.go
package gws

import (
	"sync"
	"testing"

	"github.com/bytedance/gopkg/util/gopool"
	"github.com/lesismal/nbio/taskpool"
	"github.com/panjf2000/ants/v2"
)

const (
	PoolSize   = 100
	BenchTimes = 10000
)

func BenchmarkGo(b *testing.B) {
	b.ReportAllocs()
	b.ResetTimer()

	for i := 0; i < b.N; i++ {
		wg := sync.WaitGroup{}
		wg.Add(BenchTimes)
		for j := 0; j < BenchTimes; j++ {
			go func() {
				demoFunc()
				wg.Done()
			}()
		}
		wg.Wait()
	}
}

func BenchmarkGwsWorkerQueue(b *testing.B) {
	b.ReportAllocs()
	b.ResetTimer()
	wq := newWorkerQueue(PoolSize)
	for i := 0; i < b.N; i++ {
		wg := sync.WaitGroup{}
		wg.Add(BenchTimes)
		for j := 0; j < BenchTimes; j++ {
			wq.AddJob(asyncJob{
				Do: func(interface{}) error {
					demoFunc()
					wg.Done()
					return nil
				},
			})
		}
		wg.Wait()
	}
}

func BenchmarkGopool(b *testing.B) {
	p := gopool.NewPool("test", PoolSize, &gopool.Config{})

	b.ReportAllocs()
	b.ResetTimer()

	for i := 0; i < b.N; i++ {
		wg := sync.WaitGroup{}
		wg.Add(BenchTimes)
		for j := 0; j < BenchTimes; j++ {
			p.Go(func() {
				demoFunc()
				wg.Done()
			})
		}
		wg.Wait()
	}
}

func BenchmarkAnts(b *testing.B) {
	p, _ := ants.NewPool(PoolSize)
	defer p.Release()

	b.ReportAllocs()
	b.ResetTimer()

	for i := 0; i < b.N; i++ {
		wg := sync.WaitGroup{}
		wg.Add(BenchTimes)
		for j := 0; j < BenchTimes; j++ {
			p.Submit(func() {
				demoFunc()
				wg.Done()
			})
		}
		wg.Wait()
	}
}

func BenchmarkNbio(b *testing.B) {
	p := taskpool.NewMixedPool(PoolSize, 1, 10000)
	defer p.Stop()

	b.ReportAllocs()
	b.ResetTimer()

	for i := 0; i < b.N; i++ {
		wg := sync.WaitGroup{}
		wg.Add(BenchTimes)
		for j := 0; j < BenchTimes; j++ {
			p.Go(func() {
				demoFunc()
				wg.Done()
			})
		}
		wg.Wait()
	}
}

func demoFunc() int {
	var sum int
	for i := 0; i < 1000; i++ {
		sum += i
	}
	return sum
}

Output:

BenchmarkGo
BenchmarkGo-8               	     672	   1840049 ns/op	  162122 B/op	   10005 allocs/op
BenchmarkGwsWorkerQueue
BenchmarkGwsWorkerQueue-8   	     168	   7032465 ns/op	 1268412 B/op	   20229 allocs/op
BenchmarkGopool
BenchmarkGopool-8           	     450	   2651943 ns/op	  183109 B/op	   10587 allocs/op
BenchmarkAnts
BenchmarkAnts-8             	     252	   4578326 ns/op	  160239 B/op	   10004 allocs/op
BenchmarkNbio
BenchmarkNbio-8             	     482	   2281592 ns/op	  479608 B/op	   29984 allocs/op

没想到差了这么多

from gws.

lesismal avatar lesismal commented on May 18, 2024

不过上面Output里的 ...ns/op,每op是1w次,所以单次其实性能差距不大,业务影响不大。

from gws.

lesismal avatar lesismal commented on May 18, 2024

我这AMD的笔记本,intel的以前试过其他几个、也是类似的结果,gws的我还没试,你可以试下intel的情况

from gws.

lesismal avatar lesismal commented on May 18, 2024

裸go肯定是最快,但是没法限制数量。
ants算是比较早、star最多的知名库了,但是当初我选型的时候,跑benchmark也是这类似的数据,我比较失望。
所以自己撸了一份,后来又看到字节的那个,也测了下对比,几种实现的差异我前面楼层说过的(除了ants我没看源码),性能差别大致就是那个原因了

from gws.

lxzan avatar lxzan commented on May 18, 2024

没有英特尔电脑

from gws.

lxzan avatar lxzan commented on May 18, 2024

mac上gws是400ns/op

from gws.

lesismal avatar lesismal commented on May 18, 2024

整体对比的数据是多少

from gws.

lxzan avatar lxzan commented on May 18, 2024

整体对比的数据是多少

没对比他们的. gws workerQueue是代码量最少的,可惜实现得优雅不代表性能好,不知道还有没有优化空间

from gws.

lxzan avatar lxzan commented on May 18, 2024

整体对比的数据是多少

你对比下8并发试试

from gws.

lesismal avatar lesismal commented on May 18, 2024

nbio那个MixedPool实际代码没多少,加了recover log算是比较浪费的了。。。

from gws.

lesismal avatar lesismal commented on May 18, 2024

8并发:

BenchmarkGo
BenchmarkGo-8                     	     698	   1815711 ns/op	  163157 B/op	   10007 allocs/op
BenchmarkGwsWorkerQueue
BenchmarkGwsWorkerQueue-8         	     154	   7720803 ns/op	 1246208 B/op	   20023 allocs/op
BenchmarkGopool
BenchmarkGopool-8                 	     469	   3104514 ns/op	  206124 B/op	   10047 allocs/op
BenchmarkAnts
BenchmarkAnts-8                   	     224	   5259368 ns/op	  160355 B/op	   10004 allocs/op
BenchmarkNbio
BenchmarkNbio-8                   	     493	   2417265 ns/op	  478422 B/op	   29934 allocs/op
BenchmarkGwsWorkerQueueSerial
BenchmarkGwsWorkerQueueSerial-8   	     158	   7411319 ns/op	 1355767 B/op	   20008 allocs/op
BenchmarkLesismalSerial
BenchmarkLesismalSerial-8         	     208	   5746581 ns/op	  335356 B/op	   20009 allocs/op

结论一致

from gws.

lesismal avatar lesismal commented on May 18, 2024

前面几个是一组、相同并发,后面这两个是并发1的,单独对比:
BenchmarkGwsWorkerQueueSerial
BenchmarkLesismalSerial

from gws.

lxzan avatar lxzan commented on May 18, 2024

我测试不开协程裸跑都要320ns/op

from gws.

lxzan avatar lxzan commented on May 18, 2024

某些平台编译器有毛病吧

goos: linux
goarch: amd64
pkg: demo
cpu: AMD Ryzen 5 PRO 4650G with Radeon Graphics
BenchmarkGwsWorkerQueue-8            194           6073131 ns/op         1355222 B/op      20106 allocs/op
BenchmarkDemoFunc-8                  494           2438866 ns/op               0 B/op          0 allocs/op
PASS
ok      demo    3.263s

goos: darwin
goarch: arm64
pkg: github.com/lxzan/gws
BenchmarkGwsWorkerQueue-8       284   4006893 ns/op  919388 B/op   22014 allocs/op
BenchmarkDemoFunc-8             367   3237271 ns/op       0 B/op       0 allocs/op
PASS
ok   github.com/lxzan/gws 3.193s

from gws.

lesismal avatar lesismal commented on May 18, 2024

我测试不开协程裸跑都要320ns/op

正常啊,里面有for循环求和代码的啊

from gws.

lxzan avatar lxzan commented on May 18, 2024

我测试不开协程裸跑都要320ns/op

正常啊,里面有for循环求和代码的啊

你看看linux-amd64和darwin-arm64的对比

from gws.

lesismal avatar lesismal commented on May 18, 2024

我测试不开协程裸跑都要320ns/op

正常啊,里面有for循环求和代码的啊

你看看linux-amd64和darwin-arm64的对比

你mac是m1/m2芯片的?

from gws.

Related Issues (20)

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.