rsocket / rsocket-go
rsocket-go implementation
License: Apache License 2.0
I've just heard of rsocket as an alternative to gRPC and I'm wondering what the future of rsocket-go is. rsocket-go has not seen as much love as other implementations, notably C++ and especially Java, and has had sparse commits this year. Is that because the protocol is fully implemented? All but one TODO has been completed. Does that mean that we could get v1 soon, or at least an RC? If there is more to do, could this be updated to include what else needs to be done, so that the community can engage better?
I've noticed the following in my testing with RequestResponse:
mono.Just
// server code
Acceptor(func(setup payload.SetupPayload, sendingSocket rsocket.CloseableRSocket) (rsocket.RSocket, error) {
	return rsocket.NewAbstractSocket(rsocket.RequestResponse(func(request payload.Payload) mono.Mono {
		time.Sleep(time.Second)
		return mono.Just(payload.NewString("data", "metadata"))
	})), nil
}).
mono.Create
// server code
Acceptor(func(setup payload.SetupPayload, sendingSocket rsocket.CloseableRSocket) (rsocket.RSocket, error) {
	return rsocket.NewAbstractSocket(rsocket.RequestResponse(func(request payload.Payload) mono.Mono {
		return mono.Create(func(ctx context.Context, s mono.Sink) {
			time.Sleep(time.Second)
			s.Success(payload.NewString("data", "metadata"))
		})
	})), nil
}).
Is this required by the rsocket protocol specification (I could not find any reference)? Or was this a design choice, or perhaps an unintentional feature? In any case, will this functionality persist in v1?
In the ServerBuilder interface, OpServerResume is defined in terms of the unexported (private) type serverResumeOptions. How can "Resume(opts ...OpServerResume) ServerBuilder" be implemented?
Make gRPC run on RSocket transports:
https://github.com/grpc/proposal/pull/103/files
https://github.com/grpc/proposal/blob/9b5e98916beeb8955264b28c54b8ffbdaa1d12fa/L37-go-custom-transports.md
An RSocket adapter for gRPC transports would let RSocket and gRPC interoperate with each other. Some considerations are as follows:
References:
Please consider adding TLS support: WebSocket with TLS and TCP with TLS. The Java SDK currently uses the Reactor Netty TCPServer and TCPClient to add TLS support.
Some considerations:
JSON is a popular encoding mechanism. The SDK should support JSON-format payloads.
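As a rough illustration of what JSON payload support amounts to, the sketch below encodes a value to bytes and decodes it again using only the standard library. The `greeting` type and the helper names are hypothetical, and nothing here is part of the rsocket-go API.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// greeting is a hypothetical message type used only for illustration.
type greeting struct {
	Name string `json:"name"`
	Lang string `json:"lang"`
}

// encodeJSON turns a value into bytes suitable for a payload's data section.
func encodeJSON(v interface{}) ([]byte, error) {
	return json.Marshal(v)
}

// decodeGreeting parses payload bytes back into a greeting.
func decodeGreeting(data []byte) (greeting, error) {
	var g greeting
	err := json.Unmarshal(data, &g)
	return g, err
}

func main() {
	data, err := encodeJSON(greeting{Name: "rsocket", Lang: "go"})
	if err != nil {
		panic(err)
	}
	fmt.Println(string(data)) // the bytes that would travel as payload data

	g, err := decodeGreeting(data)
	if err != nil {
		panic(err)
	}
	fmt.Println(g.Name, g.Lang)
}
```

Bytes produced this way could presumably be handed to payload.New(data, metadata), the constructor used in other snippets on this page.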
The RSocket Java SDK has implemented this feature:
https://github.com/rsocket/rsocket-java/blob/develop/rsocket-core/src/main/java/io/rsocket/StreamIdSupplier.java
Consideration:
Client and server should only matter when establishing the connection. Once done, either side can be requester or responder. Their behavior should be the same.
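The linked StreamIdSupplier concerns how each side picks stream IDs. A minimal Go sketch of that idea follows, assuming the protocol rules that stream IDs are 31 bits wide, that 0 is reserved for the connection, and that the client uses odd IDs while the server uses even ones. The type and function names are hypothetical and this is not a port of the Java class.

```go
package main

import "fmt"

const streamIDMask = 0x7FFFFFFF // stream IDs are 31 bits wide

// streamIDSupplier hands out stream IDs for one side of a connection.
// It is not safe for concurrent use; a real implementation would need
// locking or atomics.
type streamIDSupplier struct {
	next uint32
}

func newClientSupplier() *streamIDSupplier { return &streamIDSupplier{next: 1} }
func newServerSupplier() *streamIDSupplier { return &streamIDSupplier{next: 2} }

// nextID returns the next ID that is non-zero and not reported as in use.
// It loops forever if every ID of this parity is in use.
func (s *streamIDSupplier) nextID(inUse func(uint32) bool) uint32 {
	for {
		id := s.next & streamIDMask
		s.next += 2 // uint32 arithmetic wraps around and keeps the parity
		if id != 0 && !inUse(id) {
			return id
		}
	}
}

func main() {
	none := func(uint32) bool { return false }

	client, server := newClientSupplier(), newServerSupplier()
	fmt.Println(client.nextID(none), client.nextID(none))
	fmt.Println(server.nextID(none), server.nextID(none))

	// Near the top of the 31-bit range the supplier wraps and skips 0.
	wrapping := &streamIDSupplier{next: 0x7FFFFFFE}
	fmt.Println(wrapping.nextID(none), wrapping.nextID(none))
}
```

Only the choice of starting parity depends on which side opened the connection, which fits the consideration above that the roles should not matter afterwards.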
As the title says, if I use sendingSocket to send anything to a client that does not have an Acceptor function defined, the client socket is closed without an error. I'm not sure whether this is correct behavior.
package main
import (
"context"
"fmt"
"time"
"github.com/jjeffcaii/reactor-go/scheduler"
"github.com/rsocket/rsocket-go"
"github.com/rsocket/rsocket-go/payload"
"github.com/rsocket/rsocket-go/rx/mono"
)
const transportString = "tcp://127.0.0.1:7878"
func main() {
readyCh := make(chan struct{})
go server(readyCh)
<-readyCh
client()
}
var schedulerMap map[byte]scheduler.Scheduler
func server(readyCh chan struct{}) {
schedulerMap = make(map[byte]scheduler.Scheduler)
err := rsocket.Receive().
OnStart(func() {
close(readyCh)
}).
Acceptor(func(setup payload.SetupPayload, sendingSocket rsocket.CloseableRSocket) (rsocket.RSocket, error) {
fmt.Println("new connection")
sendingSocket.MetadataPush(payload.New(nil, []byte{0x44}))
// sendingSocket.FireAndForget(payload.New(nil, []byte{0x44}))
// sendingSocket.RequestResponse(payload.New(nil, []byte{0x44})).Subscribe(context.Background())
fmt.Println("asynchronous metadata push sent")
return rsocket.NewAbstractSocket(
rsocket.RequestResponse(func(msg payload.Payload) mono.Mono {
return mono.Just(msg)
}),
), nil
}).
Transport(transportString).
Serve(context.Background())
panic(err)
}
func client() {
fmt.Println("new client start")
client, err := rsocket.
Connect().
OnClose(func(err error) {
fmt.Println("client closed", err)
}).
// Uncomment acceptor for the client not to close
// Acceptor(func(socket rsocket.RSocket) rsocket.RSocket {
// return rsocket.NewAbstractSocket()
// }).
Transport(transportString).
Start(context.Background())
if err != nil {
panic(err)
}
fmt.Println("new client end")
// wait for a bit so that the client closes, panic ensues
time.Sleep(time.Second)
client.
RequestResponse(payload.New(nil, nil)).
DoOnSuccess(func(input payload.Payload) {
fmt.Println("success")
}).
DoOnError(func(err error) {
fmt.Println(err)
}).
Block(context.Background())
client.Close()
}
Hi team,
I'm trying the client.RequestChannel feature and got a panic on the server side when shutting down the client (killing the client's process).
The test code is very simple.
Server:
err := rsocket.Receive().
Resume().
Acceptor(func(setup payload.SetupPayload, sendingSocket rsocket.CloseableRSocket) (rsocket.RSocket, error) {
return rsocket.NewAbstractSocket(
rsocket.RequestResponse(func(msg payload.Payload) mono.Mono {
log.Printf("msg received: %+v", msg)
return mono.Just(payload.NewString("OK", ""))
}),
rsocket.RequestChannel(func(msgs rx.Publisher) flux.Flux {
return msgs.(flux.Flux).DoOnNext(func(msg payload.Payload) {
log.Printf("msg received: %+v", msg)
})
}),
), nil
}).
Transport("tcp://0.0.0.0:7878").
Serve(context.Background())
the panic:
2019/11/18 20:37:35 [WARN] send frame failed: send on closed channel
panic: close of closed channel
goroutine 23 [running]:
github.com/rsocket/rsocket-go/internal/framing.(*BaseFrame).Done(0xc0003f8d80)
/Users/my-path/go_home/pkg/mod/github.com/rsocket/[email protected]/internal/framing/frame.go:160 +0x2f
github.com/rsocket/rsocket-go/internal/transport.(*Transport).Send(0xc000186000, 0x13f8760, 0xc0003b1378, 0x0, 0x13eed80, 0xc000122500)
/Users/my-path/go_home/pkg/mod/github.com/rsocket/[email protected]/internal/transport/transport.go:82 +0x1a3
github.com/rsocket/rsocket-go/internal/socket.(*DuplexRSocket).drainOutBack(0xc000190000)
/Users/my-path/go_home/pkg/mod/github.com/rsocket/[email protected]/internal/socket/duplex.go:984 +0xe5
github.com/rsocket/rsocket-go/internal/socket.(*DuplexRSocket).loopWrite(0xc000190000, 0x13f51c0, 0xc0000e0300, 0x0, 0x0)
/Users/my-path/go_home/pkg/mod/github.com/rsocket/[email protected]/internal/socket/duplex.go:1065 +0x109
github.com/rsocket/rsocket-go/internal/socket.(*resumeServerSocket).Start(0xc000122080, 0x13f51c0, 0xc0000e0300, 0x0, 0x0)
/Users/my-path/go_home/pkg/mod/github.com/rsocket/[email protected]/internal/socket/server_resume.go:36 +0x89
github.com/rsocket/rsocket-go.(*server).serve.func3.2(0x13f51c0, 0xc0000e0300, 0x13f8b60, 0xc000122080)
/Users/my-path/go_home/pkg/mod/github.com/rsocket/[email protected]/server.go:220 +0x49
created by github.com/rsocket/rsocket-go.(*server).serve.func3
/Users/my-path/go_home/pkg/mod/github.com/rsocket/[email protected]/server.go:219 +0x316
Although the client was shut down without closing the channel, I would expect the server to keep running instead of exiting.
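The "close of closed channel" panic in the trace is what Go raises whenever close is called twice on the same channel. A common general-purpose guard is sync.Once, sketched below. The `doneSignal` type is hypothetical and is not the library's BaseFrame; whether this is the right fix for the reported code path is an assumption.

```go
package main

import (
	"fmt"
	"sync"
)

// doneSignal is a hypothetical completion signal that tolerates repeated Done calls.
type doneSignal struct {
	once sync.Once
	ch   chan struct{}
}

func newDoneSignal() *doneSignal {
	return &doneSignal{ch: make(chan struct{})}
}

// Done closes the channel at most once, however many times it is called.
func (d *doneSignal) Done() {
	d.once.Do(func() { close(d.ch) })
}

// IsDone reports whether Done has been called.
func (d *doneSignal) IsDone() bool {
	select {
	case <-d.ch:
		return true
	default:
		return false
	}
}

func main() {
	d := newDoneSignal()
	fmt.Println(d.IsDone())
	d.Done()
	d.Done() // a bare close(d.ch) here would panic with "close of closed channel"
	fmt.Println(d.IsDone())
}
```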
From what I can tell, there is no way to deconstruct the error to get the error data and error code. As can be seen from the code below, the error is of type *framing.FrameError, which lives in an internal package and is therefore not accessible from outside.
Could FrameError be exposed in some way, so that users can get at the parts of the error?
package main
import (
"context"
"fmt"
"sync"
"time"
"github.com/rsocket/rsocket-go"
"github.com/rsocket/rsocket-go/payload"
"github.com/rsocket/rsocket-go/rx"
"github.com/rsocket/rsocket-go/rx/mono"
)
func main() {
go server()
time.Sleep(time.Second)
client()
}
func server() {
err := rsocket.Receive().
Acceptor(func(setup payload.SetupPayload, sendingSocket rsocket.CloseableRSocket) (rsocket.RSocket, error) {
return rsocket.NewAbstractSocket(rsocket.RequestResponse(func(msg payload.Payload) mono.Mono {
return mono.Error(fmt.Errorf("mono error"))
})), nil
}).
Transport("tcp://127.0.0.1:7878").
Serve(context.Background())
panic(err)
}
func client() {
wg := sync.WaitGroup{}
client, err := rsocket.Connect().
Transport("tcp://127.0.0.1:7878").
Start(context.Background())
if err != nil {
panic(err)
}
wg.Add(1)
client.RequestResponse(payload.NewString("abc", "def")).DoOnSuccess(func(input payload.Payload) {
fmt.Println(input)
}).DoOnError(func(err error) {
// Inaccessible
// e := err.(*framing.FrameError)
// fmt.Println(string(e.ErrorData()))
// fmt.Println(e.ErrorCode())
fmt.Println(err)
fmt.Printf("%T\n", err)
}).DoFinally(func(s rx.SignalType) {
wg.Done()
}).Subscribe(context.Background())
wg.Wait()
client.Close()
}
For NewWebsocketClientTransport, the proxy function is hardcoded to http.ProxyFromEnvironment. This is not ideal for processes that need multiple proxy settings. The Java SDK allows the user to pass in an httpClient, which can have its own proxy configuration.
When invoking NewWebsocketClientTransport, let the user provide a Proxy function that can be passed to websocket.Dialer. The function signature is a de facto standard; it is used by http.Transport as well.
None.
The change is very straightforward and gives users of rsocket-go more flexibility.
Proposed changes in my fork: ronakg@5133e49. I can raise the PR once we have discussed the solution in this issue.
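If I specify n servers when connecting the client and one of them suddenly goes down in production, how is that handled by NewRoundRobinBalancer? Do you have WeChat or some other contact method? A group chat would work too; I don't feel the issues page is the right place for my questions. Also, I found that using etcd for service discovery is redundant in my case.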
In a flux.Create function I need to call sink.Error twice for it to actually respond with the error; otherwise it blocks indefinitely.
package main
import (
"context"
"fmt"
"sync"
"time"
"github.com/rsocket/rsocket-go"
"github.com/rsocket/rsocket-go/payload"
"github.com/rsocket/rsocket-go/rx"
"github.com/rsocket/rsocket-go/rx/flux"
)
const transportString = "tcp://127.0.0.1:7878"
const number = 13
func main() {
readyCh := make(chan struct{})
go server(readyCh)
<-readyCh
client()
time.Sleep(time.Second)
}
func server(readyCh chan struct{}) {
requestChannelHandler := rsocket.RequestChannel(func(msgs rx.Publisher) flux.Flux {
return flux.Create(func(ctx context.Context, s flux.Sink) {
msgs.(flux.Flux).DoOnNext(func(msg payload.Payload) {
fmt.Print(int8(msg.Data()[0]), " ")
if int8(msg.Data()[0]) == 1 {
fmt.Println("sending error")
s.Error(fmt.Errorf("error on %d", int8(msg.Data()[0])))
// uncomment the second error below for the error to be sent
//s.Error(fmt.Errorf("second error on %d", int8(msg.Data()[0])))
} else {
fmt.Println("sending response")
s.Next(msg)
}
}).DoOnComplete(func() {
fmt.Println("DoOnComplete server")
s.Complete()
}).DoFinally(func(sig rx.SignalType) {
fmt.Println("DoFinally server")
}).Subscribe(context.Background())
})
})
err := rsocket.Receive().
OnStart(func() { close(readyCh) }).
Acceptor(func(setup payload.SetupPayload, sendingSocket rsocket.CloseableRSocket) (rsocket.RSocket, error) {
return rsocket.NewAbstractSocket(requestChannelHandler), nil
}).
Transport(transportString).
Serve(context.Background())
panic(err)
}
func client() {
client, err := rsocket.Connect().Transport(transportString).Start(context.Background())
if err != nil {
panic(err)
}
defer client.Close()
wg := sync.WaitGroup{}
wg.Add(1)
counter := 0
client.RequestChannel(
flux.Create(func(ctx context.Context, s flux.Sink) {
for i := 0; i < 5; i++ {
s.Next(payload.New([]byte{byte(i)}, nil))
}
s.Complete()
}),
).DoOnNext(func(input payload.Payload) {
fmt.Println(input)
counter = counter + 1
}).DoOnError(func(err error) {
fmt.Println("error", err)
}).DoFinally(func(s rx.SignalType) {
fmt.Println("DoFinally client")
wg.Done()
}).Subscribe(context.Background())
wg.Wait()
}
Additional question: since I can only send one error, and after the error the server will not send any more responses, it should stop executing the DoOnNext function. Is it possible to stop execution once an error has occurred?
I'm benchmarking a simple nil echo server to compare RSocket and HTTP, to see the overhead for small payloads and how much I can squeeze out of RSocket.
For HTTP I'm using fasthttp, and it can do ~367,000 req/s with 100 concurrent requests.
With RSocket, the best performance I could achieve required 100 connections, which gave ~400,000 req/s.
In both cases the CPU was at 100%.
Is it possible to increase performance, ideally with fewer open connections?
func TestHTTP(t *testing.T) {
go fasthttp.ListenAndServe(":8081", func(ctx *fasthttp.RequestCtx) { ctx.Response.SetBody(ctx.Request.Body()) })
time.Sleep(time.Second)
client := &fasthttp.Client{
ReadTimeout: time.Second * 5,
WriteTimeout: time.Second * 5,
MaxIdleConnDuration: time.Hour,
NoDefaultUserAgentHeader: true,
DisableHeaderNamesNormalizing: true,
DisablePathNormalizing: true,
Dial: (&fasthttp.TCPDialer{Concurrency: 4096, DNSCacheDuration: time.Hour}).Dial,
}
limit := make(chan struct{}, 100)
wg := sync.WaitGroup{}
const n = 10_000_000
wg.Add(n)
now := time.Now()
for i := 0; i < n; i++ {
limit <- struct{}{}
go func() {
req := fasthttp.AcquireRequest()
req.SetRequestURI("http://localhost:8081/")
req.Header.SetMethod(fasthttp.MethodGet)
resp := fasthttp.AcquireResponse()
err := client.Do(req, resp)
fasthttp.ReleaseRequest(req)
if err != nil {
panic(fmt.Sprintf("ERR Connection error: %v\n", err))
}
fasthttp.ReleaseResponse(resp)
<-limit
wg.Done()
}()
}
wg.Wait()
fmt.Println(time.Since(now))
// 10_000_000/27.181s = 367904 req/s
}
func TestRR(t *testing.T) {
go func() {
rsocket.
Receive().
Acceptor(func(ctx context.Context, setup payload.SetupPayload, socket rsocket.CloseableRSocket) (rsocket.RSocket, error) {
return rsocket.NewAbstractSocket(
rsocket.RequestResponse(func(request payload.Payload) (response mono.Mono) {
return mono.Create(func(ctx context.Context, s mono.Sink) {
s.Success(request)
})
}),
), nil
}).
Transport(rsocket.TCPServer().SetHostAndPort("0.0.0.0", 9000).Build()).
Serve(context.Background())
}()
time.Sleep(time.Second)
const cons = 100
count := int64(0)
conWg := sync.WaitGroup{}
conWg.Add(cons)
for i := 0; i < cons; i++ {
go func() {
p := payload.New(nil, nil)
client, err := rsocket.
Connect().
Transport(rsocket.TCPClient().SetHostAndPort("127.0.0.1", 9000).Build()).
Start(context.Background())
if err != nil {
panic(err)
}
now := time.Now()
limit := make(chan struct{}, 1000)
const n = 100_000
wg := sync.WaitGroup{}
wg.Add(n)
for i := 0; i < n; i++ {
limit <- struct{}{}
client.
RequestResponse(p).
DoOnSuccess(func(input payload.Payload) error {
atomic.AddInt64(&count, 1)
<-limit
wg.Done()
return nil
}).
Subscribe(context.Background())
}
wg.Wait()
fmt.Println(count, time.Since(now))
conWg.Done()
}()
}
conWg.Wait()
fmt.Println(count)
// 10_000_000/24.923s = 401235 req/s
}
Hi,
Is there a third-party build service that you can set up for this? We have been using TravisCI for the other RSocket projects.
Add a merge function for Flux.
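To make the requested merge semantics concrete, here is a small sketch that merges several sources into one stream using plain channels. It deliberately does not use the Flux API; `merge` and `emit` are hypothetical helpers, and a real Flux merge would also have to deal with errors, cancellation and backpressure.

```go
package main

import (
	"fmt"
	"sync"
)

// merge forwards every value from all inputs to one output channel and
// closes the output once all inputs are closed. Ordering across inputs
// is not defined.
func merge(inputs ...<-chan int) <-chan int {
	out := make(chan int)
	var wg sync.WaitGroup
	wg.Add(len(inputs))
	for _, in := range inputs {
		go func(in <-chan int) {
			defer wg.Done()
			for v := range in {
				out <- v
			}
		}(in)
	}
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

// emit returns a channel that produces the given values and is then closed.
func emit(values ...int) <-chan int {
	ch := make(chan int)
	go func() {
		defer close(ch)
		for _, v := range values {
			ch <- v
		}
	}()
	return ch
}

func main() {
	sum, count := 0, 0
	for v := range merge(emit(1, 2, 3), emit(10, 20)) {
		sum += v
		count++
	}
	fmt.Println(count, sum) // prints "5 36"
}
```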
Hello folks :)
I'm afraid we may end up with competing implementations. We have a (semi) spec compliant rsocket protocol implementation for Go, which we want to use in Thrift.
A couple weeks ago I started socializing the idea of open sourcing it in this repository and started setting up the automation to sync our codebase over here. But earlier this week I noticed you started landing your implementation and was surprised.
I don't know who's governing the 'rsocket' entity here on GitHub, but we should probably talk.
Is there a slack or irc where you hang out? Please let me know.
I have tried to use RequestChannel and it seems to hang indefinitely on new requests from the client. I can see that when a client connects, the Acceptor function is called, but none of the functions after rsocket.RequestChannel are called.
This issue is reproducible in the word counter example. If I check out the current HEAD of master and run the word counter example, it hangs indefinitely with no terminal output.
If I check out the original commit that introduced the example and run it, it prints the word counts in the terminal and then exits without any errors.
Working my way back through the recent commits, I can see that d8b9e5f was most likely the commit that broke the example, but I don't understand the codebase well enough to provide any more insight.
I have tried this with Go 1.16 and 1.15.7 so far, and both produce the same results.
If the server responds with a REJECTED_SETUP error, the client's Start call does not return an error.
If the server returns an error (https://github.com/rsocket/rsocket-go/blob/master/examples/echo/echo.go#L55), the callback should fire and an error should be returned to the client.
Instead, the error is returned through an additional mechanism, such as a context, and this happens asynchronously.
https://github.com/kuronyago/rx
go run cmd/server/main.go
go run cmd/client/main.go
Maybe this is the expected behavior, but then it is not clear how to get a REJECTED_SETUP error synchronously. The only option I can think of is time.Sleep, but that is disgusting.
OS (uname -a): Linux 4.19.128
Hey, recently I encountered an error with communication via RSocket.
The idea of the code is to provide a tree structure of the file system to the other side of the communication.
On the server side, where the file system calculation is done, I receive the following log from this place:
https://github.com/d-wojciechowski/plm-companion-addon/blob/golang-just-16-update/server/FileServerService.go
Please take a look at the handler function Navigate, where that calculation is done.
Server starting with parameters: -v: true, -noWnc: true, -port: 4040
INFO : 2021/04/24 10:59:58 Server starting with parameters: -v: true, -noWnc: true, -port: 4040
INFO : 2021/04/24 10:59:58 Attempt to create server on addr 0.0.0.0:4040
INFO : 2021/04/24 10:59:58 Server instance started on addr 0.0.0.0:4040
INFO : 2021/04/24 11:00:13 Acceptor initialization started
INFO : 2021/04/24 11:00:13 RequestResponse initialization start
INFO : 2021/04/24 11:00:13 RequestResponse initialization start
INFO : 2021/04/24 11:00:13 RequestChannel initialization start
INFO : 2021/04/24 11:00:13 RequestChannel initialization ended
INFO : 2021/04/24 11:00:13 requestStreamHandler initialization start
INFO : 2021/04/24 11:00:13 requestStreamHandler initialization ended
INFO : 2021/04/24 11:00:13 Acceptor initialization ended
INFO : 2021/04/24 11:00:13 Service FileService with method navigate execution start
INFO : 2021/04/24 11:00:13 Navigation to
INFO : 2021/04/24 11:00:13 Path is C:, Users, allst, AppData, Local, Temp
INFO : 2021/04/24 11:00:13 Starting build of response data
INFO : 2021/04/24 11:00:13 Build C: path element
INFO : 2021/04/24 11:00:13 Build Users path element
INFO : 2021/04/24 11:00:13 Build allst path element
INFO : 2021/04/24 11:00:13 Build AppData path element
INFO : 2021/04/24 11:00:13 Build Local path element
INFO : 2021/04/24 11:00:13 Build Temp path element
INFO : 2021/04/24 11:00:13 Full response build finished.
INFO : 2021/04/24 11:00:13 Service FileService with method navigate execution ended
panic: runtime error: invalid memory address or nil pointer dereference
[signal 0xc0000005 code=0x0 addr=0x28 pc=0x891f0c]
goroutine 46 [running]:
github.com/rsocket/rsocket-go/internal/socket.(*DuplexConnection).sendPayload.func2.1()
C:/Users/allst/go/pkg/mod/github.com/rsocket/[email protected]/internal/socket/duplex.go:1062 +0x2c
github.com/rsocket/rsocket-go/core/framing.(*writeableFrame).Done(...)
C:/Users/allst/go/pkg/mod/github.com/rsocket/[email protected]/core/framing/writeable.go:25
github.com/rsocket/rsocket-go/core/framing.(*WriteablePayloadFrame).Done(0xc00009ea00)
C:/Users/allst/go/pkg/mod/github.com/rsocket/[email protected]/core/framing/writeable_payload.go:88 +0xc7
github.com/rsocket/rsocket-go/core/transport.(*Transport).Send.func1(0xc00022dd50, 0xa7bc98, 0xc00009ea00)
C:/Users/allst/go/pkg/mod/github.com/rsocket/[email protected]/core/transport/transport.go:112 +0x4d
github.com/rsocket/rsocket-go/core/transport.(*Transport).Send(0xc0000a40c0, 0xa7bc98, 0xc00009ea00, 0xc00022dd00, 0x0, 0x0)
C:/Users/allst/go/pkg/mod/github.com/rsocket/[email protected]/core/transport/transport.go:124 +0x1ad
github.com/rsocket/rsocket-go/internal/socket.(*DuplexConnection).drainOne(0xc00006d180, 0xa7bc98, 0xc00009ea00, 0x0)
C:/Users/allst/go/pkg/mod/github.com/rsocket/[email protected]/internal/socket/duplex.go:1182 +0x134
github.com/rsocket/rsocket-go/internal/socket.(*DuplexConnection).drain(0xc00006d180, 0x0, 0xc000302000)
C:/Users/allst/go/pkg/mod/github.com/rsocket/[email protected]/internal/socket/duplex.go:1163 +0x165
github.com/rsocket/rsocket-go/internal/socket.(*DuplexConnection).LoopWrite(0xc00006d180, 0xa79758, 0xc000050ec0, 0x0, 0x0)
C:/Users/allst/go/pkg/mod/github.com/rsocket/[email protected]/internal/socket/duplex.go:1299 +0x172
github.com/rsocket/rsocket-go/internal/socket.(*resumeServerSocket).Start(0xc000030e60, 0xa79758, 0xc000050ec0, 0x0, 0x0)
C:/Users/allst/go/pkg/mod/github.com/rsocket/[email protected]/internal/socket/resumable_server_socket.go:36 +0x7d
github.com/rsocket/rsocket-go.(*server).Serve.func3.2(0xa79758, 0xc000050ec0, 0xa80a18, 0xc000030e60)
C:/Users/allst/go/pkg/mod/github.com/rsocket/[email protected]/server.go:202 +0x50
created by github.com/rsocket/rsocket-go.(*server).Serve.func3
C:/Users/allst/go/pkg/mod/github.com/rsocket/[email protected]/server.go:201 +0x6b0
The panic is raised after:
return mono.Just(toPayload(getFullResult(root, protoPath.FullExpand), make([]byte, 1)))
On the Kotlin/Java side I get the following exception:
java.lang.RuntimeException: com.google.protobuf.InvalidProtocolBufferException: While parsing a protocol message, the input ended unexpectedly in the middle of a field. This could mean either that the input has been truncated or that an embedded message misreported its own length.
The code responsible for that:
return FileServiceClient(connector.getConnection())
.navigate(pathObj)
.retry(5)
.block() ?: emptyResponse
The FileServiceClient is generated by rsocket-rpc-java, code you can check here : https://github.com/d-wojciechowski/plm-companion/blob/master/src/main/kotlin/pl/dwojciechowski/service/impl/FileServiceImpl.kt
Exception occurred: panic
"unreachable: should never occur socket.requestChannelCallback!"
rsocket.RequestChannel(func(remote flux.Flux) flux.Flux {
	var sub rx.Subscription
	remote.Subscribe(
		context.Background(),
		rx.OnSubscribe(func(ctx context.Context, s rx.Subscription) {
			sub = s
			sub.Request(1)
		}),
		rx.OnComplete(func() {
			print()
		}),
		rx.OnError(func(e error) {
			print()
		}),
		rx.OnNext(func(input payload.Payload) error {
			sub.Cancel() // for some reason, I don't want to continue
			return nil
		}),
	)
	return flux.Create(func(ctx context.Context, s flux.Sink) {})
}),
switch vv := v.(type) {
case requestResponseCallbackReverse:
	vv.su.Cancel()
case requestStreamCallbackReverse:
	vv.su.Cancel()
case requestChannelCallback:
	vv.snd.Cancel() // should I?
case respondChannelCallback:
	vv.snd.Cancel()
default:
	panic(fmt.Sprintf("unreachable: should never occur %T!", vv))
}
f = cli.client.RequestStream(payload.New(req.Bytes(), context.Marshal(c)))
cc, cancel := context.WithTimeout(context.TODO(), timeout)
f.
	SubscribeOn(scheduler.Parallel()).
	DoFinally(func(s rx.SignalType) {
		// TODO: handle rx.SignalType
		cancel()
		close(rsp)
		close(errs)
	}).
	Subscribe(
		cc,
		rx.OnNext(func(p payload.Payload) error {
			rsp <- payload.Clone(p).Data()
			return nil
		}),
		rx.OnError(func(e error) {
			errs <- e
		}),
	)
Are there any examples of rsocket being used for IM (instant messaging)?
Hi,
In line 91 of socket.go, p.MP should be p.FF, I think.
Sorry, I'm a rookie in Golang; I just want to know how to download it with go mod.
First of all, thank you for implementing the ability to deconstruct the error.
Now I would like to add custom error codes to a service, for example request validation errors, an invalid authentication token error, or a missing tracing ID error.
The rsocket protocol states the following: "[Error Code] Values in the range of 0x00301 to 0xFFFFFFFE are reserved for application layer errors."
With the current version (v0.5.10), the only way to pass custom error codes is to serialize a custom error object into the error string that is passed to the Error(err error) function of a mono, flux, or sink, or returned from the Acceptor function. This seems like a bad way of doing it, so I was wondering whether it would be possible to specify an error code alongside the message.
From what I can see, with the writeError function almost everything is in place to support this, and all we need is an interface to construct the new frame error. Something along the lines of:
type ErrorStruct struct {
	code    uint32
	message string
}

func NewErrorStruct(code uint32, message string) *ErrorStruct { ... }

// duplex.go
...
func (p *DuplexRSocket) writeError(sid uint32, e error) {
	// ignore the sending error because the current socket has been closed.
	if e == errSocketClosed {
		return
	}
	if v, ok := e.(*framing.FrameError); ok {
		p.sendFrame(v)
	} else if v, ok := e.(*ErrorStruct); ok {
		// perhaps some validation in case the code is out of the acceptable range
		p.sendFrame(framing.NewFrameError(sid, v.code, []byte(v.message)))
	} else {
		p.sendFrame(framing.NewFrameError(sid, common.ErrorCodeApplicationError, []byte(e.Error())))
	}
}
...
Protobuf is an important encoding mechanism. It is especially important for Go, since gRPC is widely used by Go programmers.
We are using flux and trying to cancel a subscriber using rx.subscription.Cancel().
But we are seeing the following panic error:
panic: frame has been released!
goroutine 682 [running]:
github.com/rsocket/rsocket-go/core/framing.(*bufferedFrame).HasFlag(...)
/go/pkg/mod/github.com/rsocket/[email protected]/core/framing/buffered.go:47
github.com/rsocket/rsocket-go/core/framing.(*bufferedFrame).trySeekMetadataLen(0xc00007ea80?, 0x5578bb59a7cd?)
/go/pkg/mod/github.com/rsocket/[email protected]/core/framing/buffered.go:100 +0x19d
github.com/rsocket/rsocket-go/core/framing.(*bufferedFrame).trySliceMetadata(0xc000c75f50, 0x0)
/go/pkg/mod/github.com/rsocket/[email protected]/core/framing/buffered.go:113 +0x27
github.com/rsocket/rsocket-go/core/framing.(*PayloadFrame).Metadata(0x1b2?)
/go/pkg/mod/github.com/rsocket/[email protected]/core/framing/payload.go:61 +0x1e
Please share any suggestions or advice if we're missing anything here.
Go version: 1.19.4
rsocket library version: v0.8.12
Libraries used: rx/subscriber.go, which uses reactor.Subscription (reactor-go/subscriber.go)
Other libraries: reactor v0.5.5
OS (uname -a): macOS Ventura
Using multiple client connection pools on the same machine causes performance to drop sharply.
//1.2021-01-24
//cpu: i9 2.3hz
//rom:16G
//direct use origin rsocket
//-------------tps------------ 153754
//-------------tps------------ 154480
//-------------tps------------ 153816
//-------------tps------------ 159207
//-------------tps------------ 156221
After establishing multiple connections using a pool:
//-------------tps------------ 67442
//-------------tps------------ 68952
//-------------tps------------ 68716
//-------------tps------------ 70428
//-------------tps------------ 67465
//-------------tps------------ 67358
Currently there is no way of accessing the context that was used to run the rsocket server.
Example 1 - I could initiate a logger with some options and attach it to the context that is used to run the rsocket server. I could then access the same context in my AbstractSocket function and use the same logger.
Example 2 - If my abstract socket has a long-running task, having a cancelable context available means that I could gracefully stop execution when the server is asked to shut down.
The first thing that comes to mind is to change the signature of OptAbstractSocket to func(context.Context, *socket.AbstractRSocket), making the context available on every request.
Use the main context when creating AbstractSocket.
package main
import (
"context"
"fmt"
"log"
"time"
"github.com/rsocket/rsocket-go"
"github.com/rsocket/rsocket-go/payload"
"github.com/rsocket/rsocket-go/rx/mono"
)
func main() {
go server()
time.Sleep(time.Second)
client()
}
func server() {
mainCtx := context.WithValue(context.Background(), "KEY", "VALUE")
err := rsocket.Receive().
Acceptor(func(setup payload.SetupPayload, sendingSocket rsocket.CloseableRSocket) (rsocket.RSocket, error) {
return rsocket.NewAbstractSocket(
rsocket.RequestResponse(func(msg payload.Payload) mono.Mono {
// no context available here
// return mono.Just(msg)
return mono.Create(func(ctx context.Context, sink mono.Sink) {
// ctx is not the same context
fmt.Printf("key KEY value: %v\n", ctx.Value("KEY"))
sink.Success(msg)
})
}),
), nil
}).
Transport(rsocket.TCPServer().SetAddr(":7878").Build()).
Serve(mainCtx)
log.Fatalln(err)
}
func client() {
cli, err := rsocket.Connect().
SetupPayload(payload.NewString("Hello", "World")).
Transport(rsocket.TCPClient().SetHostAndPort("127.0.0.1", 7878).Build()).
Start(context.Background())
if err != nil {
panic(err)
}
defer cli.Close()
// Send request
result, err := cli.RequestResponse(payload.NewString("你好", "世界")).Block(context.Background())
if err != nil {
panic(err)
}
log.Println("response:", result.DataUTF8())
}
When building using go version 1.18.x the build fails with:
github.com/jjeffcaii/reactor-go/flux
../../../../go/pkg/mod/github.com/jjeffcaii/[email protected]/flux/types.go:37:16: invalid use of type alias FnSwitchOnFirst in recursive type (see issue #50729)
⨯ release failed after 1.09s error=failed to build for linux_arm_5: exit status 2: # github.com/rsocket/rsocket-go/internal/common
../../Documents/Code/go/pkg/mod/github.com/rsocket/[email protected]/internal/common/bytedump.go:36:12: constant 4294967295 overflows int
../../Documents/Code/go/pkg/mod/github.com/rsocket/[email protected]/internal/common/bytedump.go:36:24: constant 4294967296 overflows int
../../Documents/Code/go/pkg/mod/github.com/rsocket/[email protected]/internal/common/bytedump.go:104:20: constant 4294967295 overflows int
../../Documents/Code/go/pkg/mod/github.com/rsocket/[email protected]/internal/common/bytedump.go:104:32: constant 4294967296 overflows int
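The "constant 4294967295 overflows int" errors are typical of 32-bit targets such as linux_arm_5, where Go's int is 32 bits wide and cannot hold a value that large. The sketch below shows one portable way to write such a bound check. What bytedump.go actually computes on those lines is not known here, so `fitsUint32` is purely a hypothetical illustration.

```go
package main

import (
	"fmt"
	"math"
)

// fitsUint32 reports whether n can be stored in a uint32.
// Converting to int64 first keeps the comparison valid on targets
// where int is only 32 bits wide.
func fitsUint32(n int) bool {
	return n >= 0 && int64(n) <= math.MaxUint32
}

// By contrast, writing `n <= 4294967295` with n of type int does not
// compile on 32-bit targets: the untyped constant is converted to int
// and overflows, which matches the errors in the log above.

func main() {
	fmt.Println(fitsUint32(42), fitsUint32(-1))
}
```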
This is needed for a bidirectional channel to trigger on the first item.
public final <V> Flux<V> switchOnFirst(BiFunction<Signal<? extends T>,Flux<T>,Publisher<? extends V>> transformer)
Java code in RSocket
public interface ResponderRSocket extends RSocket {
  /**
   * Implement this method to peek at the first payload of the incoming request stream without
   * having to subscribe to Publisher<Payload> payloads
   *
   * @param payload First payload in the stream - this is the same payload as the first payload in
   *     Publisher<Payload> payloads
   * @param payloads Stream of request payloads.
   * @return Stream of response payloads.
   */
  default Flux<Payload> requestChannel(Payload payload, Publisher<Payload> payloads) {
    return requestChannel(payloads);
  }
}
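A rough Go analogue of the switchOnFirst idea, sketched with plain channels rather than the Flux API, is shown below. All names are hypothetical. Unlike the Reactor operator, this version blocks until the first item arrives and ignores errors and backpressure.

```go
package main

import (
	"fmt"
	"strings"
)

// switchOnFirst reads the first item from in, then hands that item and a
// channel replaying the full stream (first item included) to transform.
// If in is closed before producing anything, it returns a closed channel.
func switchOnFirst(in <-chan string, transform func(first string, all <-chan string) <-chan string) <-chan string {
	first, ok := <-in
	if !ok {
		empty := make(chan string)
		close(empty)
		return empty
	}
	all := make(chan string)
	go func() {
		defer close(all)
		all <- first
		for v := range in {
			all <- v
		}
	}()
	return transform(first, all)
}

// emitStrings returns a channel that produces the given values and is then closed.
func emitStrings(values ...string) <-chan string {
	ch := make(chan string)
	go func() {
		defer close(ch)
		for _, v := range values {
			ch <- v
		}
	}()
	return ch
}

// headerTransform treats the first item as a routing header: it drops the
// header and upper-cases the remaining items when the header is "upper".
func headerTransform(first string, all <-chan string) <-chan string {
	out := make(chan string)
	go func() {
		defer close(out)
		<-all // drop the replayed first item, which acted as the header
		for v := range all {
			if first == "upper" {
				v = strings.ToUpper(v)
			}
			out <- v
		}
	}()
	return out
}

func main() {
	for v := range switchOnFirst(emitStrings("upper", "a", "b"), headerTransform) {
		fmt.Println(v)
	}
}
```

This mirrors the Java default method above, where the first payload is available to the responder before the stream is subscribed.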
2021-02-09 16:27:06.195195 I | [ERROR] send resume response failed: flush failed: short write
2021-02-09 16:27:06.195223 I | [ERROR] send frame failed: write frame failed: short write
2021-02-09 16:27:06.195242 I | [ERROR] send frame failed: write frame failed: short write
2021-02-09 16:27:06.195258 I | [ERROR] flush failed: flush failed: short write
2021-02-09 16:27:07.092670 I | [ERROR] send resume response failed: flush failed: short write
2021-02-09 16:27:07.092704 I | [ERROR] send frame failed: write frame failed: short write
2021-02-09 16:27:07.092721 I | [ERROR] send frame failed: write frame failed: short write
2021-02-09 16:27:07.092730 I | [ERROR] flush failed: flush failed: short write
2021-02-09 16:27:07.200096 I | [ERROR] send resume response failed: flush failed: short write
2021-02-09 16:27:07.200122 I | [ERROR] send frame failed: write frame failed: short write
2021-02-09 16:27:07.200136 I | [ERROR] send frame failed: write frame failed: short write
2021-02-09 16:27:07.200144 I | [ERROR] flush failed: flush failed: short write
2021-02-09 16:27:08.095270 I | [ERROR] send resume response failed: flush failed: short write
2021-02-09 16:27:08.095290 I | [ERROR] send frame failed: write frame failed: short write
2021-02-09 16:27:08.095306 I | [ERROR] send frame failed: write frame failed: short write
2021-02-09 16:27:08.095313 I | [ERROR] flush failed: flush failed: short write
api:https://github.com/go-roc/roc/blob/main/_example/api/api.hello/main.go
srv:https://github.com/go-roc/roc/blob/main/_example/srv/srv.hello/main.go
Client configuration:
https://github.com/go-roc/roc/blob/232e721817fe6390a75bcf6523626f528c61c3d8/client/client.go#L29
Server startup configuration:
https://github.com/go-roc/roc/blob/232e721817fe6390a75bcf6523626f528c61c3d8/server/server.go#L146
There seems to be a problem with the checksum validation of the github.com/panjf2000/ants dependency.
Output of go get -u github.com/rsocket/rsocket-go:
go: finding github.com/rsocket/rsocket-go v0.3.2
go: downloading github.com/rsocket/rsocket-go v0.3.2
go: extracting github.com/rsocket/rsocket-go v0.3.2
go: downloading github.com/pkg/errors v0.8.1
go: downloading github.com/jjeffcaii/reactor-go v0.0.16
go: extracting github.com/jjeffcaii/reactor-go v0.0.16
go: downloading github.com/panjf2000/ants v1.2.0
go: extracting github.com/pkg/errors v0.8.1
verifying github.com/panjf2000/[email protected]: checksum mismatch
downloaded: h1:Ufw4aDz9RqH1RVblx2W9L9Uv5vSX5apbX5+peR7LQ5k=
sum.golang.org: h1:pMQ1/XpSgnWx3ro4y1xr/uA3jXUsTuAaU3Dm0JjwggE=
SECURITY ERROR
This download does NOT match the one reported by the checksum server.
The bits may have been replaced on the origin server, or an attacker may
have intercepted the download attempt.
For more information, see 'go help module-auth'.
/go/pkg/mod/github.com/rsocket/[email protected]/rx/subscriber.go:12:60: cannot use func literal (type func(interface {})) as type func(interface {}) error in argument to reactor.OnNext
go.mod
module rmw
go 1.15
require (
github.com/golang/protobuf v1.4.2
github.com/jjeffcaii/reactor-go v0.2.2
github.com/rsocket/rsocket-go v0.5.13
github.com/rsocket/rsocket-rpc-go v0.0.1
github.com/stretchr/testify v1.6.1
)