Code Monkey home page Code Monkey logo

rsocket-go's People

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

rsocket-go's Issues

Roadmap and the future?

I've just heard of rsocket as an alternative to gRPC and I'm wondering what is the future of rsocket-go.

rsocket-go has not seen as much love as other implementation, notably c++ and especially java and has sparse commits this year. Is it because the protocol is fully implemented?

All but one TODO has been completed. Does that mean that we could get v1 soon, or at least RC? If there are more to do, could this be updated to include what else needs to be done so that the community can engage better.

Parallel request processing

I've noticed the following in my testing with RequestResponse:

1. Server responds with mono.Just

// server code 
Acceptor(func(setup payload.SetupPayload, sendingSocket rsocket.CloseableRSocket) (rsocket.RSocket, error) {
	return rsocket.NewAbstractSocket(rsocket.RequestResponse(func(request payload.Payload) mono.Mono {
                time.Sleep(time.Second)
		return mono.Just(payload.NewString("data", "metadata"))
	})), nil
}).
  • Single client connection, multiple requests: requests are processed in series
  • Multiple client connection, single request per client: requests are processed in parallel

2. Server responds with mono.Create

// server code 
Acceptor(func(setup payload.SetupPayload, sendingSocket rsocket.CloseableRSocket) (rsocket.RSocket, error) {
	return rsocket.NewAbstractSocket(rsocket.RequestResponse(func(request payload.Payload) mono.Mono {
		return mono.Create(func(ctx context.Context, s mono.Sink) {
			time.Sleep(time.Second)
			s.Success(payload.NewString("data", "metadata"))
		})
	})), nil
}).
  • Single client connection, multiple requests: requests are processed in parallel
  • Multiple client connection, single request per client: requests are processed in parallel

Is this by the rsocket protocol specification (could not find any reference)? Or was this a design choice or perhaps it was a non-intentional feature? In any case, will this functionality persist in v1?

unexport type how to implement

inServerBuilder interface
it unexport(private) type serverResumeOptions in OpServerResume, how to implement "Resume(opts ...OpServerResume) ServerBuilder"?

L37: Go Custom Transports

Make gRPC run on RSocket transports:

  • Transports message-based: wrapper metadata(gRPC metadata) and data in Payload
  • ServeTransport & ClientTransport: almost same in RSocket transportation implementation.

https://github.com/grpc/proposal/pull/103/files
https://github.com/grpc/proposal/blob/9b5e98916beeb8955264b28c54b8ffbdaa1d12fa/L37-go-custom-transports.md

An RSocket adapter for gRPC transportation and make RSocket and gRPC interoperate each other. Some consideration as following:

References:

TLS support

Please consider to add TLS support. websocket with TLS and TCP with TLS. Now Java SDK use Netty Reactor TCPServer and TCPClient to add TLS support.

Some considerations:

  • TLSv1.3 and TLSv1.2 with BoringSSL?
  • self signed certification trust manager support
  • Certification fingerprint validation support from requester
  • uri handler support for wss and tcps

Sending anything on the client socket, without client having Acceptor defined, closes the socket.

As the title says, if I use sendingSocket to send anything to the client that does not have Acceptor function defined, it causes it to close the client socket without error. Not sure if this is correct behavior.

Steps to Reproduce

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/jjeffcaii/reactor-go/scheduler"
	"github.com/rsocket/rsocket-go"
	"github.com/rsocket/rsocket-go/payload"
	"github.com/rsocket/rsocket-go/rx/mono"
)

const transportString = "tcp://127.0.0.1:7878"

func main() {
	readyCh := make(chan struct{})
	go server(readyCh)
	<-readyCh
	client()
}

var schedulerMap map[byte]scheduler.Scheduler

func server(readyCh chan struct{}) {
	schedulerMap = make(map[byte]scheduler.Scheduler)
	err := rsocket.Receive().
		OnStart(func() {
			close(readyCh)
		}).
		Acceptor(func(setup payload.SetupPayload, sendingSocket rsocket.CloseableRSocket) (rsocket.RSocket, error) {
			fmt.Println("new connection")
			sendingSocket.MetadataPush(payload.New(nil, []byte{0x44}))
			// sendingSocket.FireAndForget(payload.New(nil, []byte{0x44}))
			// sendingSocket.RequestResponse(payload.New(nil, []byte{0x44})).Subscribe(context.Background())
			fmt.Println("asynchronous metadata push sent")
			return rsocket.NewAbstractSocket(
				rsocket.RequestResponse(func(msg payload.Payload) mono.Mono {
					return mono.Just(msg)
				}),
			), nil
		}).
		Transport(transportString).
		Serve(context.Background())

	panic(err)
}

func client() {

	fmt.Println("new client start")
	client, err := rsocket.
		Connect().
		OnClose(func(err error) {
			fmt.Println("client closed", err)
		}).
		// Uncomment acceptor for the client not to close
		// Acceptor(func(socket rsocket.RSocket) rsocket.RSocket {
		// 	return rsocket.NewAbstractSocket()
		// }).
		Transport(transportString).
		Start(context.Background())

	if err != nil {
		panic(err)
	}

	fmt.Println("new client end")

	// wait for a bit so that the client closes, panic ensues
	time.Sleep(time.Second)

	client.
		RequestResponse(payload.New(nil, nil)).
		DoOnSuccess(func(input payload.Payload) {
			fmt.Println("success")
		}).
		DoOnError(func(err error) {
			fmt.Println(err)
		}).
		Block(context.Background())

	client.Close()
}

Server.Serve() Got panic when shutting down client channel

Hi team,
I'm trying the client.RequestChannel feature and got panic in server side when shutting down the client side.(kill the client's process)
The testing code is very simple:
server:

err := rsocket.Receive().
    Resume().
    Acceptor(func(setup payload.SetupPayload, sendingSocket rsocket.CloseableRSocket) (rsocket.RSocket, error) {
        return rsocket.NewAbstractSocket(
            rsocket.RequestResponse(func(msg payload.Payload) mono.Mono {
                log.Printf("msg received: %+v", msg)
                return mono.Just(payload.NewString("OK", ""))
            }),
            rsocket.RequestChannel(func(msgs rx.Publisher) flux.Flux {
                return msgs.(flux.Flux).DoOnNext(func(msg payload.Payload) {
                    log.Printf("msg received: %+v", msg)
                })
            }),
        ), nil
    }).
    Transport("tcp://0.0.0.0:7878").
    Serve(context.Background())

the panic:

2019/11/18 20:37:35 [WARN] send frame failed: send on closed channel
panic: close of closed channel

goroutine 23 [running]:
github.com/rsocket/rsocket-go/internal/framing.(*BaseFrame).Done(0xc0003f8d80)
        /Users/my-path/go_home/pkg/mod/github.com/rsocket/[email protected]/internal/framing/frame.go:160 +0x2f
github.com/rsocket/rsocket-go/internal/transport.(*Transport).Send(0xc000186000, 0x13f8760, 0xc0003b1378, 0x0, 0x13eed80, 0xc000122500)
        /Users/my-path/go_home/pkg/mod/github.com/rsocket/[email protected]/internal/transport/transport.go:82 +0x1a3
github.com/rsocket/rsocket-go/internal/socket.(*DuplexRSocket).drainOutBack(0xc000190000)
        /Users/my-path/go_home/pkg/mod/github.com/rsocket/[email protected]/internal/socket/duplex.go:984 +0xe5
github.com/rsocket/rsocket-go/internal/socket.(*DuplexRSocket).loopWrite(0xc000190000, 0x13f51c0, 0xc0000e0300, 0x0, 0x0)
        /Users/my-path/go_home/pkg/mod/github.com/rsocket/[email protected]/internal/socket/duplex.go:1065 +0x109
github.com/rsocket/rsocket-go/internal/socket.(*resumeServerSocket).Start(0xc000122080, 0x13f51c0, 0xc0000e0300, 0x0, 0x0)
        /Users/my-path/go_home/pkg/mod/github.com/rsocket/[email protected]/internal/socket/server_resume.go:36 +0x89
github.com/rsocket/rsocket-go.(*server).serve.func3.2(0x13f51c0, 0xc0000e0300, 0x13f8b60, 0xc000122080)
        /Users/my-path/go_home/pkg/mod/github.com/rsocket/[email protected]/server.go:220 +0x49
created by github.com/rsocket/rsocket-go.(*server).serve.func3
        /Users/my-path/go_home/pkg/mod/github.com/rsocket/[email protected]/server.go:219 +0x316

although the client was shutdown without closing the channel, I suppose the server should still be running instead of exit directly

Deconstructing error

From what I can tell, there is no way of deconstructing the error message to get the error data and error code. As can be seen from the code bellow, error is of type *framing.FrameError which is an internal package and therefore not accessible outside.

Can it be considered exposing FrameError in some way to allow users to get parts of the error?

package main

import (
	"context"
	"fmt"
	"sync"
	"time"

	"github.com/rsocket/rsocket-go"
	"github.com/rsocket/rsocket-go/payload"
	"github.com/rsocket/rsocket-go/rx"
	"github.com/rsocket/rsocket-go/rx/mono"
)

func main() {
	go server()
	time.Sleep(time.Second)
	client()
}

func server() {
	err := rsocket.Receive().
		Acceptor(func(setup payload.SetupPayload, sendingSocket rsocket.CloseableRSocket) (rsocket.RSocket, error) {
			return rsocket.NewAbstractSocket(rsocket.RequestResponse(func(msg payload.Payload) mono.Mono {
				return mono.Error(fmt.Errorf("mono error"))
			})), nil
		}).
		Transport("tcp://127.0.0.1:7878").
		Serve(context.Background())
	panic(err)

}

func client() {
	wg := sync.WaitGroup{}

	client, err := rsocket.Connect().
		Transport("tcp://127.0.0.1:7878").
		Start(context.Background())

	if err != nil {
		panic(err)
	}

	wg.Add(1)
	client.RequestResponse(payload.NewString("abc", "def")).DoOnSuccess(func(input payload.Payload) {
		fmt.Println(input)
	}).DoOnError(func(err error) {
		// Unaccessible 
		// e := err.(*framing.FrameError)
		// fmt.Println(string(e.ErrorData()))
		// fmt.Println(e.ErrorCode())
		
		fmt.Println(err)
		fmt.Printf("%T\n", err)
	}).DoFinally(func(s rx.SignalType) {
		wg.Done()
	}).Subscribe(context.Background())

	wg.Wait()
	client.Close()
}

Let user specify a Proxy func for NewWebsocketClientTransport

Motivation

For the NewWebsocketClientTransport, the proxy function is hardcoded to http.ProxyFromEnvironment. This is not ideal for processes where multiple Proxy settings are required. Java SDK allows the user to pass in an httpClient, which can have its own Proxy configuration.

Desired solution

When invoking NewWebsocketClientTransport, let the user provide a Proxy function that can be passed to websocket.Dialer. The function signature is a de-facto standard, it's used by http.Transport as well.

Considered alternatives

None.

Additional context

The change is very straight-forward and provides more flexibility to the user of rsocket-go.

Proposed changes in my fork: ronakg@5133e49. I can raise the PR once we have discussed the solution in this issue.

关于NewRoundRobinBalancer的疑问

如果我在连接客户端的时候指定了n个server,在线上跑的时候,突然一个server挂掉了,是怎么处理的呢--NewRoundRobinBalancer。你有wechat或者别的联系方式吗?群也行,我提的问题,觉得发在issues这里不太好.....另外,我发现我用etcd做服务发现是多余的。

Need to call Error twice in flux.Sink

In a flux.Create function I need to call sink.Error twice for it to actually respond with the error, otherwise it will block indefinitely.

package main

import (
	"context"
	"fmt"
	"sync"
	"time"

	"github.com/rsocket/rsocket-go"
	"github.com/rsocket/rsocket-go/payload"
	"github.com/rsocket/rsocket-go/rx"
	"github.com/rsocket/rsocket-go/rx/flux"
)

const transportString = "tcp://127.0.0.1:7878"
const number = 13

func main() {
	readyCh := make(chan struct{})
	go server(readyCh)
	<-readyCh
	client()
	time.Sleep(time.Second)
}

func server(readyCh chan struct{}) {

	requestChannelHandler := rsocket.RequestChannel(func(msgs rx.Publisher) flux.Flux {
		return flux.Create(func(ctx context.Context, s flux.Sink) {
			msgs.(flux.Flux).DoOnNext(func(msg payload.Payload) {
				fmt.Print(int8(msg.Data()[0]), " ")
				if int8(msg.Data()[0]) == 1 {
					fmt.Println("sending error")
					s.Error(fmt.Errorf("error on %d", int8(msg.Data()[0])))
					// uncomment bellow error for it to send error 
					//s.Error(fmt.Errorf("second error on %d", int8(msg.Data()[0])))
				} else {
					fmt.Println("sending response")
					s.Next(msg)
				}
			}).DoOnComplete(func() {
				fmt.Println("DoOnComplete server")
				s.Complete()
			}).DoFinally(func(sig rx.SignalType) {
				fmt.Println("DoFinally server")
			}).Subscribe(context.Background())
		})
	})

	err := rsocket.Receive().
		OnStart(func() { close(readyCh) }).
		Acceptor(func(setup payload.SetupPayload, sendingSocket rsocket.CloseableRSocket) (rsocket.RSocket, error) {
			return rsocket.NewAbstractSocket(requestChannelHandler), nil
		}).
		Transport(transportString).
		Serve(context.Background())

	panic(err)
}

func client() {
	client, err := rsocket.Connect().Transport(transportString).Start(context.Background())
	if err != nil {
		panic(err)
	}
	defer client.Close()

	wg := sync.WaitGroup{}
	wg.Add(1)

	counter := 0

	client.RequestChannel(
		flux.Create(func(ctx context.Context, s flux.Sink) {
			for i := 0; i < 5; i++ {
				s.Next(payload.New([]byte{byte(i)}, nil))
			}
			s.Complete()
		}),
	).DoOnNext(func(input payload.Payload) {
		fmt.Println(input)
		counter = counter + 1
	}).DoOnError(func(err error) {
		fmt.Println("error", err)
	}).DoFinally(func(s rx.SignalType) {
		fmt.Println("DoFinally client")
		wg.Done()
	}).Subscribe(context.Background())

	wg.Wait()
}

Additional question: Since I can only send one error and after the error server will not send any more responses so it should stop execution of the DoOnNext function. Is it possible to stop execution if an error occurred?

Performance tuning

Benchmarking simple nil echo server to compare rsocket and http to see the overhead for small payloads to see how much can I squeeze out of rsocket.

For http I'm using fasthttp and it can do ~367000 req/s with 100 concurrent requests.
With RSocket, to achieve the best performance that I could I had to create 100 connections to get ~400000 req/s
In both situations the CPU was 100%.

Is it possible to increase performance? Possibly with less opened connections.

func TestHTTP(t *testing.T) {
	go fasthttp.ListenAndServe(":8081", func(ctx *fasthttp.RequestCtx) { ctx.Response.SetBody(ctx.Request.Body()) })

	time.Sleep(time.Second)
	client := &fasthttp.Client{
		ReadTimeout:                   time.Second * 5,
		WriteTimeout:                  time.Second * 5,
		MaxIdleConnDuration:           time.Hour,
		NoDefaultUserAgentHeader:      true,
		DisableHeaderNamesNormalizing: true,
		DisablePathNormalizing:        true,
		Dial:                          (&fasthttp.TCPDialer{Concurrency: 4096, DNSCacheDuration: time.Hour}).Dial,
	}

	limit := make(chan struct{}, 100)
	wg := sync.WaitGroup{}
	const n = 10_000_000
	wg.Add(n)
	now := time.Now()
	for i := 0; i < n; i++ {
		limit <- struct{}{}
		go func() {
			req := fasthttp.AcquireRequest()
			req.SetRequestURI("http://localhost:8081/")
			req.Header.SetMethod(fasthttp.MethodGet)
			resp := fasthttp.AcquireResponse()
			err := client.Do(req, resp)
			fasthttp.ReleaseRequest(req)
			if err != nil {
				panic(fmt.Sprintf("ERR Connection error: %v\n", err))
			}
			fasthttp.ReleaseResponse(resp)
			<-limit
			wg.Done()
		}()
	}
	wg.Wait()
	fmt.Println(time.Since(now))
	// 10_000_000/27,181s = 367904 req/s
}
func TestRR(t *testing.T) {
	go func() {
		rsocket.
			Receive().
			Acceptor(func(ctx context.Context, setup payload.SetupPayload, socket rsocket.CloseableRSocket) (rsocket.RSocket, error) {
				return rsocket.NewAbstractSocket(
					rsocket.RequestResponse(func(request payload.Payload) (response mono.Mono) {
						return mono.Create(func(ctx context.Context, s mono.Sink) {
							s.Success(request)
						})
					}),
				), nil
			}).
			Transport(rsocket.TCPServer().SetHostAndPort("0.0.0.0", 9000).Build()).
			Serve(context.Background())
	}()
	time.Sleep(time.Second)

	const cons = 100
	count := int64(0)
	conWg := sync.WaitGroup{}
	conWg.Add(cons)
	for i := 0; i < cons; i++ {
		go func() {
			p := payload.New(nil, nil)

			client, err := rsocket.
				Connect().
				Transport(rsocket.TCPClient().SetHostAndPort("127.0.0.1", 9000).Build()).
				Start(context.Background())
			if err != nil {
				panic(err)
			}

			now := time.Now()
			limit := make(chan struct{}, 1000)
			const n = 100_000
			wg := sync.WaitGroup{}
			wg.Add(n)
			for i := 0; i < n; i++ {
				limit <- struct{}{}

				client.
					RequestResponse(p).
					DoOnSuccess(func(input payload.Payload) error {
						atomic.AddInt64(&count, 1)
						<-limit
						wg.Done()
						return nil
					}).
					Subscribe(context.Background())
			}

			wg.Wait()
			fmt.Println(count, time.Since(now))
			conWg.Done()
		}()
	}
	conWg.Wait()
	fmt.Println(count)
	// 10_000_000/24.923s = 401235 req/s
}

build server

Hi,

Is there a third party build that you can setup for this? We have been using TravisCI for the other RSocket projects.

Competing implementations

Hello folks :)

I'm afraid we may end up with competing implementations. We have a (semi) spec compliant rsocket protocol implementation for Go, which we want to use in Thrift.

A couple weeks ago I started socializing the idea of open sourcing it in this repository and started setting up the automation to sync our codebase over here. But earlier this week I noticed you started landing your implementation and was surprised.

I don't know who's governing the 'rsocket' entity here on GitHub, but we should probably talk.

Is there a slack or irc where you hang out? Please let me know.

Word Counter example hangs indefinitely

I have tried to use RequestChannel and it seems to hang indefinitely on new requests from client. I can see that when a client connects, the Acceptor function is called but none of the functions after rsocket.RequestChannel are called.

This issue is reproduceable in the word counter example. If I checkout the current HEAD of master, and run the word counter example, it will hang indefinitely with no terminal output produced.

If I checkout the original commit that introduced the example and run it, it will produce the word counts in the terminal and exit afterwards without any errors.

If I work my way back from the recent commits, I can see that d8b9e5f was most likely the commit that broke the example, but I don't understand the codebase enough to be able to provide any more insights.

I have tried this with go 1.16 and 1.15.7 so far and they both produce the same results.

client connect: rejected setup

If the server responds with a REJECTED_SETUP error, the Start client call does not return an error

Expected Behavior

if server return error (https://github.com/rsocket/rsocket-go/blob/master/examples/echo/echo.go#L55) - the callback should work and an error should be returned to the client

Actual Behavior

the error is returned through an additional mechanism, such as a context, and this happens asynchronously

Steps to Reproduce

https://github.com/kuronyago/rx
go run cmd/server/main.go
go run cmd/client/main.go

Possible Solution

maybe this is the expected behavior, but then it is not clear how to get a REJECTED_SETUP error in sync, the only option I can think of is time.Sleep, but that is disgusting

Your Environment

  • RSocket version(s) used: github.com/rsocket/rsocket-go v0.8.0
  • OS and version (eg uname -a): Linux 4.19.128

RSocket mono failure.

Hey recently I encountered an error with communication via RSocket.

The idea of code is to provide a tree structure of file system on the other side of communication.

On the server side, where file system calculation is done I receive following log from this place:
https://github.com/d-wojciechowski/plm-companion-addon/blob/golang-just-16-update/server/FileServerService.go
please take a look at handler function Navigate, where that calculation is done.

Server starting with parameters: -v: true, -noWnc: true, -port: 4040
INFO : 2021/04/24 10:59:58 Server starting with parameters: -v: true, -noWnc: true, -port: 4040
INFO : 2021/04/24 10:59:58 Attempt to create server on addr 0.0.0.0:4040
INFO : 2021/04/24 10:59:58 Server instance started on addr 0.0.0.0:4040
INFO : 2021/04/24 11:00:13 Acceptor initialization started
INFO : 2021/04/24 11:00:13 RequestResponse initialization start
INFO : 2021/04/24 11:00:13 RequestResponse initialization start
INFO : 2021/04/24 11:00:13 RequestChannel initialization start
INFO : 2021/04/24 11:00:13 RequestChannel initialization ended
INFO : 2021/04/24 11:00:13 requestStreamHandler initialization start
INFO : 2021/04/24 11:00:13 requestStreamHandler initialization ended
INFO : 2021/04/24 11:00:13 Acceptor initialization ended
INFO : 2021/04/24 11:00:13 Service FileService with method navigate execution start
INFO : 2021/04/24 11:00:13 Navigation to 
INFO : 2021/04/24 11:00:13 Path is C:, Users, allst, AppData, Local, Temp
INFO : 2021/04/24 11:00:13 Starting build of response data
INFO : 2021/04/24 11:00:13 Build C: path element
INFO : 2021/04/24 11:00:13 Build Users path element
INFO : 2021/04/24 11:00:13 Build allst path element
INFO : 2021/04/24 11:00:13 Build AppData path element
INFO : 2021/04/24 11:00:13 Build Local path element
INFO : 2021/04/24 11:00:13 Build Temp path element
INFO : 2021/04/24 11:00:13 Full response build finished.
INFO : 2021/04/24 11:00:13 Service FileService with method navigate execution ended
panic: runtime error: invalid memory address or nil pointer dereference
[signal 0xc0000005 code=0x0 addr=0x28 pc=0x891f0c]

goroutine 46 [running]:
github.com/rsocket/rsocket-go/internal/socket.(*DuplexConnection).sendPayload.func2.1()
	C:/Users/allst/go/pkg/mod/github.com/rsocket/[email protected]/internal/socket/duplex.go:1062 +0x2c
github.com/rsocket/rsocket-go/core/framing.(*writeableFrame).Done(...)
	C:/Users/allst/go/pkg/mod/github.com/rsocket/[email protected]/core/framing/writeable.go:25
github.com/rsocket/rsocket-go/core/framing.(*WriteablePayloadFrame).Done(0xc00009ea00)
	C:/Users/allst/go/pkg/mod/github.com/rsocket/[email protected]/core/framing/writeable_payload.go:88 +0xc7
github.com/rsocket/rsocket-go/core/transport.(*Transport).Send.func1(0xc00022dd50, 0xa7bc98, 0xc00009ea00)
	C:/Users/allst/go/pkg/mod/github.com/rsocket/[email protected]/core/transport/transport.go:112 +0x4d
github.com/rsocket/rsocket-go/core/transport.(*Transport).Send(0xc0000a40c0, 0xa7bc98, 0xc00009ea00, 0xc00022dd00, 0x0, 0x0)
	C:/Users/allst/go/pkg/mod/github.com/rsocket/[email protected]/core/transport/transport.go:124 +0x1ad
github.com/rsocket/rsocket-go/internal/socket.(*DuplexConnection).drainOne(0xc00006d180, 0xa7bc98, 0xc00009ea00, 0x0)
	C:/Users/allst/go/pkg/mod/github.com/rsocket/[email protected]/internal/socket/duplex.go:1182 +0x134
github.com/rsocket/rsocket-go/internal/socket.(*DuplexConnection).drain(0xc00006d180, 0x0, 0xc000302000)
	C:/Users/allst/go/pkg/mod/github.com/rsocket/[email protected]/internal/socket/duplex.go:1163 +0x165
github.com/rsocket/rsocket-go/internal/socket.(*DuplexConnection).LoopWrite(0xc00006d180, 0xa79758, 0xc000050ec0, 0x0, 0x0)
	C:/Users/allst/go/pkg/mod/github.com/rsocket/[email protected]/internal/socket/duplex.go:1299 +0x172
github.com/rsocket/rsocket-go/internal/socket.(*resumeServerSocket).Start(0xc000030e60, 0xa79758, 0xc000050ec0, 0x0, 0x0)
	C:/Users/allst/go/pkg/mod/github.com/rsocket/[email protected]/internal/socket/resumable_server_socket.go:36 +0x7d
github.com/rsocket/rsocket-go.(*server).Serve.func3.2(0xa79758, 0xc000050ec0, 0xa80a18, 0xc000030e60)
	C:/Users/allst/go/pkg/mod/github.com/rsocket/[email protected]/server.go:202 +0x50
created by github.com/rsocket/rsocket-go.(*server).Serve.func3
	C:/Users/allst/go/pkg/mod/github.com/rsocket/[email protected]/server.go:201 +0x6b0

Panic is being raised after
return mono.Just(toPayload(getFullResult(root, protoPath.FullExpand), make([]byte, 1)))

On kotlin/java side I have following exception:

java.lang.RuntimeException: com.google.protobuf.InvalidProtocolBufferException: While parsing a protocol message, the input ended unexpectedly in the middle of a field. This could mean either that the input has been truncated or that an embedded message misreported its own length.

code which is responsible for that :

        return FileServiceClient(connector.getConnection())
            .navigate(pathObj)
            .retry(5)
            .block() ?: emptyResponse

The FileServiceClient is generated by rsocket-rpc-java, code you can check here : https://github.com/d-wojciechowski/plm-companion/blob/master/src/main/kotlin/pl/dwojciechowski/service/impl/FileServiceImpl.kt

Your Environment

  • RSocket version(s) used:
    x github.com/rsocket/rsocket-go v0.8.3
    x protobufVersion = "3.15.8"
    x rSocketRpcVersion = "0.3.0"
    x rSocketVersion = "1.1.0"
  • Platform (eg. JVM version (javar -version) or Node version (node --version)):
    x Java 8
    x GoLang 1.16
  • OS and version (eg uname -a):
    x Windows 10 2020 H2

requestChannel cancel panic

Expected Behavior

Actual Behavior

发生异常: panic
"unreachable: should never occur socket.requestChannelCallback!"

Steps to Reproduce

rsocket.RequestChannel(func(remote flux.Flux) flux.Flux {
					var sub rx.Subscription
					remote.Subscribe(
						context.Background(),
						rx.OnSubscribe(func(ctx context.Context, s rx.Subscription) {
							sub = s
							sub.Request(1)
						}),
						rx.OnComplete(func() {
							print()
						}),
						rx.OnError(func(e error) {
							print()
						}),
						rx.OnNext(func(input payload.Payload) error {
							sub.Cancel() //for some reason,i don't want continue
							return nil
						}),
					)
					return flux.Create(func(ctx context.Context, s flux.Sink) {})
				}),

Possible Solution

	switch vv := v.(type) {
	case requestResponseCallbackReverse:
		vv.su.Cancel()
	case requestStreamCallbackReverse:
		vv.su.Cancel()
	case requestChannelCallback:
		vv.snd.Cancel() //should i?
	case respondChannelCallback:
		vv.snd.Cancel()
	default:
		panic(fmt.Sprintf("unreachable: should never occur %T!", vv))
	}

Your Environment

  • RSocket version(s) used:
  • Other relevant libraries versions (eg. netty, ...):
  • Platform (eg. JVM version (javar -version) or Node version (node --version)):
  • OS and version (eg uname -a):

RequestStream with timeout context doesn't work

f = cli.client.RequestStream(payload.New(req.Bytes(), context.Marshal(c)))

cc, cancel := context.WithTimeout(context.TODO(), timeout)

f.
SubscribeOn(scheduler.Parallel()).
DoFinally(func(s rx.SignalType) {
//todo handler rx.SignalType
cancel()
close(rsp)
close(errs)
}).
Subscribe(
cc,
rx.OnNext(func(p payload.Payload) error {
rsp <- payload.Clone(p).Data()
return nil
}),
rx.OnError(func(e error) {
errs <- e
}),
)

Constructing an error

First of all, thank you for implementing the ability to deconstruct the error.

Now, I wish to add custom error codes into a service. For example: request validation errors, invalid authentication token error, missing tracing ID error.

The rsocket protocol states the following: [Error Code] Values in the range of 0x00301 to 0xFFFFFFFE are reserved for application layer errors.

With the current version (v0.5.10), the only way to pass custom error codes is with a custom error object serialized into an error string passed to a mono, flux and sink Error(err error) function or returned from the Acceptor function. This to me seems like a bad way of doing it, and I was wondering is there a possibility of implementing a way of specifying error code alongside the message?

From what I can see, with writeError function almost everything is in place for this to be supported and all we need is an interface to construct the new frame error. Something along the lines of:

struct ErrorStruct struct {
    code uint32
    message string
}

func NewErrorStruct(code uint32, message string) *ErrorStruct { ... }

// duplex.go
...
func (p *DuplexRSocket) writeError(sid uint32, e error) {
	// ignore sening error because current socket has been closed.
	if e == errSocketClosed {
		return
	}
	if v, ok := e.(*framing.FrameError); ok {
		p.sendFrame(v)
	} else if v, ok := e.(*ErrorStruct); ok {
                // perhaps some validation in a case the code in out of acceptable range
		p.sendFrame(framing.NewFrameError(sid, v.code, []byte(v.message)))
	} else {
		p.sendFrame(framing.NewFrameError(sid, common.ErrorCodeApplicationError, []byte(e.Error())))
	}
}
...

Support Protobuf

Protobuf is an important encoding mechanism. It is especially important for golang since GRpc is widely used by go programmers.

Panic seen when trying to close subscription

We are using flux and trying to cancel a subscriber using rx.subscription.Cancel().

But we are seeing the following panic error:

panic: frame has been released!

goroutine 682 [running]: 
github.com/rsocket/rsocket-go/core/framing.(*bufferedFrame).HasFlag(...)
        /go/pkg/mod/github.com/rsocket/[email protected]/core/framing/buffered.go:47
github.com/rsocket/rsocket-go/core/framing.(*bufferedFrame).trySeekMetadataLen(0xc00007ea80?, 0x5578bb59a7cd?)
        /go/pkg/mod/github.com/rsocket/[email protected]/core/framing/buffered.go:100 +0x19d
github.com/rsocket/rsocket-go/core/framing.(*bufferedFrame).trySliceMetadata(0xc000c75f50, 0x0)
        /go/pkg/mod/github.com/rsocket/[email protected]/core/framing/buffered.go:113 +0x27
github.com/rsocket/rsocket-go/core/framing.(*PayloadFrame).Metadata(0x1b2?)
        /go/pkg/mod/github.com/rsocket/[email protected]/core/framing/payload.go:61 +0x1e 

Please provide any suggestion/advise if we're missing anything here.

Your Environment

go version: 1.19.4
rsocket library version: v0.8.12
libraries used: rx/subscriber.go which uses reactor.Subscription (reactor-go/subscriber.go)

  • RSocket version(s) used: v0.8.12
  • Other relevant libraries versions (eg. netty, ...): reactor v0.5.5
  • Platform (eg. JVM version (javar -version) or Node version (node --version)):
  • OS and version (eg uname -a): macOS Ventura

rpc pool

同一台机器使用多个客户端连接池,性能急剧下降。
//1.2021-01-24
//cpu: i9 2.3hz
//rom:16G
//direct use origin rsocket
//-------------tps------------ 153754
//-------------tps------------ 154480
//-------------tps------------ 153816
//-------------tps------------ 159207
//-------------tps------------ 156221

使用pool建立多个连接之后
//-------------tps------------ 67442
//-------------tps------------ 68952
//-------------tps------------ 68716
//-------------tps------------ 70428
//-------------tps------------ 67465
//-------------tps------------ 67358

Access rsocket Serve context from within Acceptor

Currently there is not way of accesing context that was used to run rsocket server.

Motivation

Example 1 - I could inititate logger with some options that I attach to the context that is used to run rsocket server. Then I would acces the same context in my AbstractSocket function so I could use the same logger.

Example 2 - If my abstract socket has a long running task, having cancelable context available means that I could gracefully stop the execution if the server was requested to shutdown.

Desired solution

First thing that comes to mind is to change the signature of the OptAbstractSocket to func(context.Context, *socket.AbstractRSocket) thus having context available on every request.

Considered alternatives

Use the main context when creating AbstractSocket.

Example code

package main

import (
	"context"
	"fmt"
	"log"
	"time"

	"github.com/rsocket/rsocket-go"
	"github.com/rsocket/rsocket-go/payload"
	"github.com/rsocket/rsocket-go/rx/mono"
)

func main() {
	go server()
	time.Sleep(time.Second)
	client()
}

func server() {
	mainCtx := context.WithValue(context.Background(), "KEY", "VALUE")
	err := rsocket.Receive().
		Acceptor(func(setup payload.SetupPayload, sendingSocket rsocket.CloseableRSocket) (rsocket.RSocket, error) {
			return rsocket.NewAbstractSocket(
				rsocket.RequestResponse(func(msg payload.Payload) mono.Mono {
					// no context available here
					// return mono.Just(msg)
					return mono.Create(func(ctx context.Context, sink mono.Sink) {
						// ctx is not the same context
						fmt.Printf("key KEY value: %v\n", ctx.Value("KEY"))
						sink.Success(msg)
					})
				}),
			), nil
		}).
		Transport(rsocket.TCPServer().SetAddr(":7878").Build()).
		Serve(mainCtx)
	log.Fatalln(err)
}

func client() {
	cli, err := rsocket.Connect().
		SetupPayload(payload.NewString("Hello", "World")).
		Transport(rsocket.TCPClient().SetHostAndPort("127.0.0.1", 7878).Build()).
		Start(context.Background())
	if err != nil {
		panic(err)
	}
	defer cli.Close()
	// Send request
	result, err := cli.RequestResponse(payload.NewString("你好", "世界")).Block(context.Background())
	if err != nil {
		panic(err)
	}
	log.Println("response:", result.DataUTF8())

}

Build failure on 1.18

When building using go version 1.18.x the build fails with:

github.com/jjeffcaii/reactor-go/flux
../../../../go/pkg/mod/github.com/jjeffcaii/[email protected]/flux/types.go:37:16: invalid use of type alias FnSwitchOnFirst in recursive type (see issue #50729)

Compile failed on ARM platform.

⨯ release failed after 1.09s error=failed to build for linux_arm_5: exit status 2: # github.com/rsocket/rsocket-go/internal/common
../../Documents/Code/go/pkg/mod/github.com/rsocket/[email protected]/internal/common/bytedump.go:36:12: constant 4294967295 overflows int
../../Documents/Code/go/pkg/mod/github.com/rsocket/[email protected]/internal/common/bytedump.go:36:24: constant 4294967296 overflows int
../../Documents/Code/go/pkg/mod/github.com/rsocket/[email protected]/internal/common/bytedump.go:104:20: constant 4294967295 overflows int
../../Documents/Code/go/pkg/mod/github.com/rsocket/[email protected]/internal/common/bytedump.go:104:32: constant 4294967296 overflows int

Flux switchOnFirst operator support

for bi-direction channel to trigger first item.

public final <V> Flux<V> switchOnFirst(BiFunction<Signal<? extends T>,Flux<T>,Publisher<? extends V>> transformer)

https://projectreactor.io/docs/core/snapshot/api/reactor/core/publisher/Flux.html#switchOnFirst-java.util.function.BiFunction-

Java code in RSocket

public interface ResponderRSocket extends RSocket {
  /**
   * Implement this method to peak at the first payload of the incoming request stream without
   * having to subscribe to Publish&lt;Payload&gt; payloads
   *
   * @param payload First payload in the stream - this is the same payload as the first payload in
   *     Publisher&lt;Payload&gt; payloads
   * @param payloads Stream of request payloads.
   * @return Stream of response payloads.
   */
  default Flux<Payload> requestChannel(Payload payload, Publisher<Payload> payloads) {
    return requestChannel(payloads);
  }
}

开启租约和fragmet=4096的限制后

2021-02-09 16:27:06.195195 I | [ERROR] send resume response failed: flush failed: short write
2021-02-09 16:27:06.195223 I | [ERROR] send frame failed: write frame failed: short write
2021-02-09 16:27:06.195242 I | [ERROR] send frame failed: write frame failed: short write
2021-02-09 16:27:06.195258 I | [ERROR] flush failed: flush failed: short write
2021-02-09 16:27:07.092670 I | [ERROR] send resume response failed: flush failed: short write
2021-02-09 16:27:07.092704 I | [ERROR] send frame failed: write frame failed: short write
2021-02-09 16:27:07.092721 I | [ERROR] send frame failed: write frame failed: short write
2021-02-09 16:27:07.092730 I | [ERROR] flush failed: flush failed: short write
2021-02-09 16:27:07.200096 I | [ERROR] send resume response failed: flush failed: short write
2021-02-09 16:27:07.200122 I | [ERROR] send frame failed: write frame failed: short write
2021-02-09 16:27:07.200136 I | [ERROR] send frame failed: write frame failed: short write
2021-02-09 16:27:07.200144 I | [ERROR] flush failed: flush failed: short write
2021-02-09 16:27:08.095270 I | [ERROR] send resume response failed: flush failed: short write
2021-02-09 16:27:08.095290 I | [ERROR] send frame failed: write frame failed: short write
2021-02-09 16:27:08.095306 I | [ERROR] send frame failed: write frame failed: short write
2021-02-09 16:27:08.095313 I | [ERROR] flush failed: flush failed: short write

api:https://github.com/go-roc/roc/blob/main/_example/api/api.hello/main.go
srv:https://github.com/go-roc/roc/blob/main/_example/srv/srv.hello/main.go

客户端配置:
https://github.com/go-roc/roc/blob/232e721817fe6390a75bcf6523626f528c61c3d8/client/client.go#L29
服务器启动配置:
https://github.com/go-roc/roc/blob/232e721817fe6390a75bcf6523626f528c61c3d8/server/server.go#L146

Checksum mismatch for go 1.13 for ants Dependency

Seems that there is some problem with the checksum validation of the github.com/panjf2000/ants dependency.

Output of go get -u github.com/rsocket/rsocket-go:

go: finding github.com/rsocket/rsocket-go v0.3.2
go: downloading github.com/rsocket/rsocket-go v0.3.2
go: extracting github.com/rsocket/rsocket-go v0.3.2
go: downloading github.com/pkg/errors v0.8.1
go: downloading github.com/jjeffcaii/reactor-go v0.0.16
go: extracting github.com/jjeffcaii/reactor-go v0.0.16
go: downloading github.com/panjf2000/ants v1.2.0
go: extracting github.com/pkg/errors v0.8.1
verifying github.com/panjf2000/[email protected]: checksum mismatch
        downloaded: h1:Ufw4aDz9RqH1RVblx2W9L9Uv5vSX5apbX5+peR7LQ5k=
        sum.golang.org: h1:pMQ1/XpSgnWx3ro4y1xr/uA3jXUsTuAaU3Dm0JjwggE=

SECURITY ERROR
This download does NOT match the one reported by the checksum server.
The bits may have been replaced on the origin server, or an attacker may
have intercepted the download attempt.

For more information, see 'go help module-auth'.

/go/pkg/mod/github.com/rsocket/[email protected]/rx/subscriber.go:12:60: cannot use func literal (type func(interface {})) as type func(interface {}) error in argument to reactor.OnNext

/go/pkg/mod/github.com/rsocket/[email protected]/rx/subscriber.go:12:60: cannot use func literal (type func(interface {})) as type func(interface {}) error in argument to reactor.OnNext

go.mod

module rmw

go 1.15

require (
        github.com/golang/protobuf v1.4.2
        github.com/jjeffcaii/reactor-go v0.2.2
        github.com/rsocket/rsocket-go v0.5.13
        github.com/rsocket/rsocket-rpc-go v0.0.1
        github.com/stretchr/testify v1.6.1
)

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.