GWS

Simple, Fast, Reliable WebSocket Server & Client

Introduction

GWS (Go WebSocket) is a simple, fast, reliable, and feature-rich WebSocket implementation written in Go. It is designed for highly concurrent environments and is suitable for building APIs, proxies, games, live video, messaging, and similar services. It supports both the server and the client side with a simple API, which means you can easily write a server or client yourself.

GWS is built on an event-driven model. Every connection has a goroutine that handles its events, and events can be processed in a non-blocking way.

Why GWS

  • Simplicity and Ease of Use

    • User-Friendly: Simple and clear WebSocket Event API design makes server-client interaction easy.
    • Code Efficiency: Minimizes the amount of code needed to implement complex WebSocket solutions.
  • High-Performance

    • High IOPS Low Latency: Designed for rapid data transmission and reception, ideal for time-sensitive applications.
    • Low Memory Usage: Highly optimized memory multiplexing system to minimize memory usage and reduce your cost of ownership.
  • Reliability and Stability

    • Robust Error Handling: Advanced mechanisms to manage and mitigate errors, ensuring continuous operation.
    • Well-Developed Test Cases: Passed all Autobahn test cases, fully compliant with RFC 7692. Unit test coverage is almost 100%, covering all conditional branches.

Benchmark

IOPS (Echo Server)

GOMAXPROCS=4, Connection=1000, CompressEnabled=false

gorilla and nhooyr were benchmarked without their stream APIs.

GoBench

goos: linux
goarch: amd64
pkg: github.com/lxzan/gws
cpu: AMD Ryzen 5 PRO 4650G with Radeon Graphics
BenchmarkConn_WriteMessage/compress_disabled-8         	 7252513	     165.4 ns/op	       0 B/op	       0 allocs/op
BenchmarkConn_WriteMessage/compress_enabled-8          	   97394	     10391 ns/op	     349 B/op	       0 allocs/op
BenchmarkConn_ReadMessage/compress_disabled-8          	 7812108	     152.3 ns/op	      16 B/op	       0 allocs/op
BenchmarkConn_ReadMessage/compress_enabled-8           	  368712	      3248 ns/op	     108 B/op	       0 allocs/op
PASS

Feature

  • Event API
  • Broadcast
  • Dial via Proxy
  • Context-Takeover
  • Passed Autobahn Test Cases Server / Client
  • Concurrent & Asynchronous Non-Blocking Write

Attention

  • Errors returned by the exported gws.Conn methods can be ignored; they are handled internally.
  • Transferring large files with gws tends to block the connection.
  • If the HTTP server is reused, it is recommended to run ReadLoop in a goroutine, because blocking prevents the request context from being garbage-collected.

Install

go get -v github.com/lxzan/gws@latest

Event

type Event interface {
    OnOpen(socket *Conn)                        // connection is established
    OnClose(socket *Conn, err error)            // received a close frame or input/output error occurs
    OnPing(socket *Conn, payload []byte)        // received a ping frame
    OnPong(socket *Conn, payload []byte)        // received a pong frame
    OnMessage(socket *Conn, message *Message)   // received a text/binary frame
}

Quick Start

package main

import "github.com/lxzan/gws"

func main() {
	gws.NewServer(&gws.BuiltinEventHandler{}, nil).Run(":6666")
}

Best Practice

package main

import (
	"github.com/lxzan/gws"
	"net/http"
	"time"
)

const (
	PingInterval = 5 * time.Second
	PingWait     = 10 * time.Second
)

func main() {
	upgrader := gws.NewUpgrader(&Handler{}, &gws.ServerOption{
		ParallelEnabled:   true,                                 // Parallel message processing
		Recovery:          gws.Recovery,                         // Exception recovery
		PermessageDeflate: gws.PermessageDeflate{Enabled: true}, // Enable compression
	})
	http.HandleFunc("/connect", func(writer http.ResponseWriter, request *http.Request) {
		socket, err := upgrader.Upgrade(writer, request)
		if err != nil {
			return
		}
		go func() {
			socket.ReadLoop() // Run in a goroutine: blocking here prevents the request context from being garbage-collected.
		}()
	})
	http.ListenAndServe(":6666", nil)
}

type Handler struct{}

func (c *Handler) OnOpen(socket *gws.Conn) {
	_ = socket.SetDeadline(time.Now().Add(PingInterval + PingWait))
}

func (c *Handler) OnClose(socket *gws.Conn, err error) {}

func (c *Handler) OnPing(socket *gws.Conn, payload []byte) {
	_ = socket.SetDeadline(time.Now().Add(PingInterval + PingWait))
	_ = socket.WritePong(nil)
}

func (c *Handler) OnPong(socket *gws.Conn, payload []byte) {}

func (c *Handler) OnMessage(socket *gws.Conn, message *gws.Message) {
	defer message.Close()
	socket.WriteMessage(message.Opcode, message.Bytes())
}

More Examples

KCP

  • server
package main

import (
	"log"
	"github.com/lxzan/gws"
	kcp "github.com/xtaci/kcp-go"
)

func main() {
	listener, err := kcp.Listen(":6666")
	if err != nil {
		log.Println(err.Error())
		return
	}
	app := gws.NewServer(&gws.BuiltinEventHandler{}, nil)
	app.RunListener(listener)
}
  • client
package main

import (
	"github.com/lxzan/gws"
	kcp "github.com/xtaci/kcp-go"
	"log"
)

func main() {
	conn, err := kcp.Dial("127.0.0.1:6666")
	if err != nil {
		log.Println(err.Error())
		return
	}
	app, _, err := gws.NewClientFromConn(&gws.BuiltinEventHandler{}, nil, conn)
	if err != nil {
		log.Println(err.Error())
		return
	}
	app.ReadLoop()
}

Proxy

Dial via a proxy using the SOCKS5 protocol.

package main

import (
	"crypto/tls"
	"github.com/lxzan/gws"
	"golang.org/x/net/proxy"
	"log"
)

func main() {
	socket, _, err := gws.NewClient(new(gws.BuiltinEventHandler), &gws.ClientOption{
		Addr:      "wss://example.com/connect",
		TlsConfig: &tls.Config{InsecureSkipVerify: true},
		NewDialer: func() (gws.Dialer, error) {
			return proxy.SOCKS5("tcp", "127.0.0.1:1080", nil, nil)
		},
		PermessageDeflate: gws.PermessageDeflate{
			Enabled:               true,
			ServerContextTakeover: true,
			ClientContextTakeover: true,
		},
	})
	if err != nil {
		log.Println(err.Error())
		return
	}
	socket.ReadLoop()
}

Broadcast

Create a Broadcaster instance, call the Broadcast method in a loop to send messages to each client, and close the broadcaster to reclaim memory. The message is compressed only once.

func Broadcast(conns []*gws.Conn, opcode gws.Opcode, payload []byte) {
    var b = gws.NewBroadcaster(opcode, payload)
    defer b.Close()
    for _, item := range conns {
        _ = b.Broadcast(item)
    }
}

WriteWithTimeout

SetDeadline covers most scenarios. If you want to control the timeout of each individual write, you need to wrap the write in a WriteWithTimeout function. Note that creating and destroying the timer incurs some overhead.

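import (
	"sync/atomic"
	"time"

	"github.com/lxzan/gws"
)

// WriteWithTimeout writes p as a text message and closes the connection
// if the write has not finished within timeout.
func WriteWithTimeout(socket *gws.Conn, p []byte, timeout time.Duration) error {
	var sig = atomic.Uint32{} // 0 = pending, 1 = settled by the write or by the timer
	var timer = time.AfterFunc(timeout, func() {
		// The timer fired first: close the connection.
		if sig.CompareAndSwap(0, 1) {
			socket.WriteClose(1000, []byte("write timeout"))
		}
	})
	var err = socket.WriteMessage(gws.OpcodeText, p)
	// The write returned first: cancel the timer.
	if sig.CompareAndSwap(0, 1) {
		timer.Stop()
	}
	return err
}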

Pub / Sub

Use the event_emitter package to implement the publish-subscribe model. Wrap gws.Conn in a struct and implement the GetSubscriberID method, which returns the subscription ID; this ID must be unique. The subscription ID identifies the subscriber, who receives only messages for the topics they have subscribed to.

This example is useful for building chat rooms or push messages using gws. This means that a user can subscribe to one or more topics via websocket, and when a message is posted to that topic, all subscribers will receive the message.

package main

import (
	"github.com/lxzan/event_emitter"
	"github.com/lxzan/gws"
)

type Socket struct{ *gws.Conn }

func (c *Socket) GetSubscriberID() int64 {
	userId, _ := c.Session().Load("userId")
	return userId.(int64)
}

func (c *Socket) GetMetadata() event_emitter.Metadata {
	return c.Conn.Session()
}

func Sub(em *event_emitter.EventEmitter[*Socket], topic string, socket *Socket) {
	em.Subscribe(socket, topic, func(subscriber *Socket, msg any) {
		_ = msg.(*gws.Broadcaster).Broadcast(subscriber.Conn)
	})
}

func Pub(em *event_emitter.EventEmitter[*Socket], topic string, op gws.Opcode, msg []byte) {
	var broadcaster = gws.NewBroadcaster(op, msg)
	defer broadcaster.Close()
	em.Publish(topic, broadcaster)
}

Autobahn Test

cd examples/autobahn
mkdir reports
docker run -it --rm \
    -v ${PWD}/config:/config \
    -v ${PWD}/reports:/reports \
    crossbario/autobahn-testsuite \
    wstest -m fuzzingclient -s /config/fuzzingclient.json

Communication

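On WeChat, you need to add the maintainer as a friend before being invited to the group; please mention that you come from GitHub.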

Acknowledgments

The following project had particular influence on gws's design.

gws's Issues

Haha, I still couldn't resist

package main

import (
    "fmt"
    "time"
    "sync"
)

type (
    workerQueue struct {
        mu             sync.Mutex // lock
        q              []asyncJob // job queue
        maxConcurrency int32      // maximum concurrency
        curConcurrency int32      // current concurrency
    }

    asyncJob func()
)

// newWorkerQueue creates a job queue
func newWorkerQueue(maxConcurrency int32) *workerQueue {
    c := &workerQueue{
        mu:             sync.Mutex{},
        maxConcurrency: maxConcurrency,
        curConcurrency: 0,
    }
    return c
}

// getJob fetches one job
func (c *workerQueue) getJob(delta int32) asyncJob {
    c.mu.Lock()
    defer c.mu.Unlock()

    c.curConcurrency += delta
    if c.curConcurrency >= c.maxConcurrency {
        return nil
    }
    if len(c.q) == 0 {
        return nil
    }
    var result = c.q[0]
    c.q = c.q[1:]
    c.curConcurrency++
    return result
}

// do runs jobs in a loop
func (c *workerQueue) do(job asyncJob) {
    for job != nil {
        job()
        job = c.getJob(-1)
    }
}

// Push appends a job; it runs immediately if a worker slot is free
func (c *workerQueue) Push(job asyncJob) {
    c.mu.Lock()
    c.q = append(c.q, job)
    c.mu.Unlock()
    if job := c.getJob(0); job != nil {
        go c.do(job)
    }
}

func main() {
    wq := newWorkerQueue(8)
    wq.Push(func () { fmt.Println(1) })
    wq.Push(func () { fmt.Println(2) })
    wq.Push(func () { fmt.Println(3) })
    wq.Push(func () { fmt.Println(4) })
    wq.Push(func () { fmt.Println(5) })
    wq.Push(func () { fmt.Println(6) })
    time.Sleep(time.Millisecond * 1000)
}
go run c.go
6
4
2
3
1
5

See, I copied task.go over directly, so this should make it clear ^_^

Can library be modified to support (as a client) compression WITH server context takeover?

My use case involves many similar messages (150-400 bytes each) sent from a third-party server. I suppose the required bandwidth could be reduced by using server context takeover (as a client).
As far as I can see, the code currently does this:

gws/client.go

Line 135 in 5318d69

if compressEnabled && !strings.Contains(extensions, "server_no_context_takeover") {

SecWebSocketExtensions = Pair{"Sec-WebSocket-Extensions", "permessage-deflate; server_no_context_takeover; client_no_context_takeover"}

The "server_no_context_takeover" extension is hardcoded.
Is there any way to modify the library to use server context takeover, or is it too much hassle?

pass SubProtocols to Upgrade

Problem

Our subprotocols are not known beforehand; they are dynamic.
We therefore need to be able to specify the supported subprotocols for each WebSocket connection.

I implemented this by adding an UpgradeWithOptions function next to Upgrade.

master...mrkmrtns:gws:upgradewithoptions
You can check it out here; if it suits you, I can prepare a PR.

The UpgradeOptions struct can be extended in the future with other overrides or options. For now it contains only SubProtocols, which is what we need.

Any questions/recommendations are welcome.

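Question about the task unit

Why doesn't the task unit's job queue grow and shrink its capacity? It appends every time; won't that have some impact on memory?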
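
One observation that may bear on this question, offered as a sketch of Go slice semantics rather than a description of gws internals: popping with q = q[1:] shrinks the slice's capacity along with its length, so a later append that outgrows the remaining capacity allocates a fresh backing array sized to the live elements, and the old array becomes garbage. The hypothetical maxCapDuring helper below (not part of gws) pushes many jobs through a slice-backed FIFO and records the largest capacity it ever observes.

```go
package main

import "fmt"

type job func()

// maxCapDuring pushes `total` jobs through a slice-backed FIFO queue, never
// letting more than `outstanding` jobs wait at once, and returns the largest
// capacity the slice reached along the way.
func maxCapDuring(total, outstanding int) int {
	var q []job
	maxCap := 0
	for i := 0; i < total; i++ {
		q = append(q, func() {})
		if cap(q) > maxCap {
			maxCap = cap(q)
		}
		// Pop from the front once the queue is full.
		if len(q) >= outstanding {
			q[0] = nil // drop the reference so the closure can be collected
			q = q[1:]  // reslicing reduces both len and cap by one
		}
	}
	return maxCap
}

func main() {
	// The printed capacity stays close to the number of outstanding jobs,
	// no matter how many jobs pass through in total.
	fmt.Println(maxCapDuring(100000, 8))
}
```

Under these assumptions the backing array stays proportional to the number of queued jobs, not to the number of jobs ever pushed. Whether that is acceptable for a given workload is a separate question that this sketch does not settle.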

Support for wasm dialing

Is there any plan to support WebAssembly compiled projects? Right now, it will not work as it tries to dial using the default dialer.

Thread safety of methods WriteMessage, WritePing, WritePong, SetDeadline, WriteClose of gws.Conn

Hello, I'm considering usage of gws as a websocket client library.
I want to ask: is it safe to call methods WriteMessage, WritePing, WritePong, SetDeadline, WriteClose of same gws.Conn concurrently from distinct goroutines?

Example:

socket, _, err := gws.NewClient(...arguments here...)
go socket.ReadLoop()

go socket.WriteMessage(gws.OpcodeText, []byte("message1"))
go socket.SetDeadline(time.Now().Add(10 * time.Second))
go socket.WriteMessage(gws.OpcodeText, []byte("message2"))
go socket.WritePing([]byte("ping"))
go socket.WritePong([]byte("pong"))

Or should I wrap with mutex each call of those methods?

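An event for actively closing the connection?

There doesn't seem to be an event for actively closing the connection. In my error handler I called net.Conn.Close, but judging by the code that is unnecessary. So how should a close be triggered from the error event?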

Iteration plan

  • Constrain map keys with comparable
  • Add a threading model diagram

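Non-blocking write: performance, resource cost, and ordering

I took a quick look. Each conn carries its own workerQueue of size 8, but this seems to have several problems:

  1. doWrite takes a lock, so even when multiple goroutines write at the same time they queue up at the lock. This also aggravates lock contention and spinning and may raise CPU usage, so multi-goroutine asynchronous writes here probably do not improve performance.
  2. Under high concurrency, the number of goroutines and the resource consumption are higher.
  3. When this workerQueue executes concurrently, it does not seem to guarantee that data in the send queue goes out in the order it was enqueued by WriteAsync.
    Messages on a single conn often need an ordering guarantee. For example, a streaming-media push needs a broadcast in which one conn must not stall the others, so WriteAsync is required; but if the pushed data arrives out of order, the audio and video are garbled and frames are dropped. Likewise, a game may broadcast player actions in the order ABC and users receive them as CAB.

My suggestion is to do this part the conventional, simple way: a single goroutine + a limited-size chan + a select default/timeout send loop is enough. Continuing down the current path seems to be drifting further and further off course.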

Hijack error when upgrading the socket

When I integrated gws into a go-zero project, following the chatroom example, I got the following error:
error write response failed, error: http: connection has been hijacked caller=httpx/responses.go:81

contentLength and ReadMaxPayloadSize mismatch

I'm trying to verify the ReadMaxPayloadSize config option, but I can't quite make sense of it.

In readMessage() if I add print("contentLength = ", contentLength, " continuation? ", opcode == OpcodeContinuation, "\n") and then use this test script in Chrome:

w = new WebSocket('ws://localhost:5000/connect');
w.addEventListener('open', () => w.send('a'.repeat(1024)));

I would expect contentLength to be 1024. However, it prints contentLength = 11 continuation? false. Is there some unit mismatch or is it reading the contentLength incorrectly?

How to access request object or context

Hi there,

First off - thanks for this great package. I started building using it, and it works very well so far.

Now, my issue is - how can I access the request context inside a handler.

Let's say I followed the example in the README:

package main

import (
	"github.com/lxzan/gws"
	"net/http"
	"time"
)

const (
	PingInterval = 10 * time.Second
	PingWait     = 5 * time.Second
)

func main() {
	upgrader := gws.NewUpgrader(&Handler{}, &gws.ServerOption{
		ReadAsyncEnabled: true,
		CompressEnabled:  true,
		Recovery:         gws.Recovery,
	})
	http.HandleFunc("/connect", func(writer http.ResponseWriter, request *http.Request) {
		socket, err := upgrader.Upgrade(writer, request)
		if err != nil {
			return
		}
		go func() {
			// Blocking prevents the context from being GC.
			socket.ReadLoop()
		}()
	})
	http.ListenAndServe(":6666", nil)
}

type Handler struct{}

func (c *Handler) OnOpen(socket *gws.Conn) {
	_ = socket.SetDeadline(time.Now().Add(PingInterval + PingWait))
}

func (c *Handler) OnClose(socket *gws.Conn, err error) {}

func (c *Handler) OnPing(socket *gws.Conn, payload []byte) {
	_ = socket.SetDeadline(time.Now().Add(PingInterval + PingWait))
	_ = socket.WritePong(nil)
}

func (c *Handler) OnPong(socket *gws.Conn, payload []byte) {}

func (c *Handler) OnMessage(socket *gws.Conn, message *gws.Message) {
	defer message.Close()
	socket.WriteMessage(message.Opcode, message.Bytes())
}

Now, I have logic implemented inside my OnMessage receiver. I can access the socket and the message, but I cannot access the original request and any data attached to it. I also cannot access the context propagated from elsewhere.

In my case, I am mounting the WebSocket from this library on a route on an existing HTTP server. Specifically, using go-chi as a router, but it doesn't matter.

I have values injected into the request via middleware, such as user session from the database, parsed URL parameters, etc. I want to be able to pass these into the WebSocket handler in some way - best by using context.

Is there any such way or is it perhaps a feature that will be implemented in the near future?

Could the `gws.Conn.isClosed()` method be made public?

First of all, thank you for your work.

github.com/lxzan/[email protected]/conn.go:84

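I want to store the conn somewhere else and, before sending from there, check whether the WebSocket has already disconnected.

However, I found that this method is not exported. Could it be exposed to users? Would that cause any potential problems?

If it is not made public, what is the recommended approach? Should I handle the connection state in the OnOpen and OnClose callbacks and maintain it myself?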
