gbaranski / ezsockets Goto Github PK
View Code? Open in Web Editor NEWHigh-level declarative API for building WebSocket Clients and Servers in Rust ๐ฆ
Home Page: https://docs.rs/ezsockets
License: MIT License
High-level declarative API for building WebSocket Clients and Servers in Rust ๐ฆ
Home Page: https://docs.rs/ezsockets
License: MIT License
ATM the current implementation only accepts String, but would it be possible to modify the text function to accept both String and &str? It's minor, but I think it could be some nice syntactic sugar to add.
Thanks
After fixing the handshake protocol error with PR #64, I am seeing another bug when closing clients. If I close a session from either the server OR the client interface, I get the following trace:
2023-07-31T16:58:13.956716Z TRACE ezsockets::socket: sending message: Close(Some(CloseFrame { code: Normal, reason: "test" }))
2023-07-31T16:58:13.956955Z TRACE ezsockets::socket: received message: Ok(Close(Some(CloseFrame { code: Normal, reason: "test" })))
2023-07-31T16:58:13.957047Z TRACE ezsockets::socket: received message: Ok(Close(Some(CloseFrame { code: Normal, reason: "test" })))
2023-07-31T16:58:13.957062Z ERROR ezsockets::socket: failed to send message. stream is closed
The last two messages are unexpected. I have no idea why this is happening, so I hope you can investigate and fix this. There should be a valid session-closing workflow for both server and client that causes no errors in the log.
If a client wants to reconnect, it first waits for a reconnect interval to pass. Is there a specific reason it needs to be this way? I'd rather try to reconnect immediately, then if that fails wait for the reconnect interval to pass.
It was moved to the start of the loop in this commit without explanation.
There is no customization of the Ping/Pong messages sent/read to/from the server.
The assumption is that the server is always sending timestamps in milliseconds, but this is not always the case.
A common scenario for a WSS server is to send custom message (e.g. with ping UUID) and expect that same message in pong response. Not necessarily a timestamp.
Could we add a mechanism to allow the user to provide some logic for the Ping/Pong messaging process?
The way I see it, we could add an enum HeartbeatMode in Config, with multiple choice like HeartbeatMode::TimeStamp or HeartbeatMode::Custom("always-send-this-string") or HeartbeatMode::None (disable heartbeat logic) or HeartbeatMode::Random...
There could be better ways to implement this customization, but I believe adding this feature would benefit the lib.
Thanks
The Websocket spec states that:
"a Pong frame sent in response to a Ping frame must have identical 'Application data' as found in the message body of the Ping frame being replied to." - source.
This means every ping received by the client should result in a pong reply with the same bytes as the received ping.
I don't see anywhere in the codebase where this is implemented.
I am happy to fix this and make a PR, however I'd like to know where people think the best place to handle it is.
Should the pong reply happen at the Socket
level so that it is abstracted away from the client? If so, should the user be able to specify a closure to handle the payload much like they can with the heartbeat?
Or should new Message::Ping(bytes)
and Message::Pong(bytes)
be created and on_ping
and on_pong
be added to the client (with default implementations) to allow the user to decide how they want to reply?
Open to all suggestions and ideas.
Is there a way to disable the default logs like
2024-03-13T15:49:57.809170Z INFO ezsockets::server: starting websocket server
I'm trying to create a server based on the echo-server example and it's working great but I want to print my own status messages if possible.
As I see, the session hashmap is encapsulated in the Server struct. I would also need to send requests to the clients from the server side(and wait for the response from the client) based on a channel event from another tokio task(serialport). So I guess it is either
maybe you could point me to another way, how to communicate with the server instance, or would I need to write something custom myself?
as mentioned by https://reddit.com/r/rust/comments/11tg44a/_/jckkxb5/?context=1
run_on() takes get_args: impl Fn(&mut Socket) -> GetArgsFut
, but Socket is just a pair of channels. It's not possible to do anything useful with get_args, even in demos
Ping/Pong are not available in all browsers, and if they are it always happens in the background. The canonical web_sys::WebSocket
implementation offers no Ping/Pong functionality.
This is a problem because I want to enable browser-based websocket clients in ezsockets (e.g. this is the best contender), but ezsockets depends on Ping/Pong
for the heartbeat. (I want to abstract out the client's tokio_tungstenite::connect_async()
call.)
To get around this, I want to:
A) Move the Socket
's last_alive
to always update when receiving a message, instead of only when receiving a Pong.
B) In my downstream project, use Message::Text
to manually implement a Ping/Pong keep-alive when compiling to WASM.
C) Disable the keep-alive mechanism entirely when compiling to WASM (do this in addition to A, since servers won't compile to WASM and still need the keep-alive mechanism).
Alternatively, we could inject the keep-alive Ping/Pong into Message::Text
in ezsockets (pretty invasive).
In my use case of communicating to a game server's WS API, an additional authentication header is required in the initial HTTP connection, which can be achieved with tokio_tungstenite
for example.
ezsockets::client::ClientConfig
however currently seems to be limited to only being able to add a basic auth header..?
I'm curious if there's a reason for not exposing the headers & only allowing addition of basic auth?
I had trouble understanding this crate's API because a lot of the names overlap with each other. After lots of reading the source code...
Suggestions:
SessionExt
-> SessionHandler
ClientExt
-> ClientHandler
ServerExt
-> ConnectionHandler
SessionExt::on_text()/on_binary()/on_call()
-> SessionHandler::on_client_text()/on_client_binary()/on_session_call()
ClientExt::on_text()/on_binary()/on_call()
-> ClientHandler::on_server_text()/on_server_binary()/on_client_call()
ClientExt::on_close()
-> ClientHandler::on_server_close()
ServerExt::on_call()
-> ConnectionHandler::on_server_call()
(can also do on_client_connect()/on_client_disconnect()
for consistency)ServerExt::Session
-> ConnectionHandler::SessionHandler
(this the worst offender here, since ServerExt::on_connect()
returns an ezsockets::Session
NOT a ServerExt::Session
)Args
from SessionExt
and just use a generic parameter on ServerExt::on_connect()
(it is very not-obvious why SessionExt
has Args
)Messages store buffers with unbounded size. Since messages will be logged at log level trace
, log files have potentially unbounded size even with only one server/client pair. I'm not sure if the websockets backend has size constraints, but if not it may be nice to add them in ezsockets.
ClientExt::on_close()
should take the closure frame as an input, so implementers can customize what on_close()
does based on the closure reason. For example, the client may not want to reconnect if the server commanded it to close (e.g. because the client was banned, which means future reconnect attempts will be rejected).
In my usage, I extract jwt from query parameter and extract real ip address from X-forwarded-For.
But there's no way to do it now. I'd like to implement these method and raise a PR later
The calls to .unwrap()
in the client, server, and session APIs all race with the dropping of the corresponding actors. In the session code, for example, you have PANIC_MESSAGE_UNHANDLED_CLOSE
, but it is not possible to race-free test liveness before invoking one of those methods without some kind of lock (which would be hard to implement). Instead, I recommend returning Results from all of those methods. Websocket servers and clients should never panic, because that is an attack vector for DOS.
EDiT: this problem is exacerbated by the inability to call std::panic::catch_unwind()
on any of the objects in this crate, since they are not 'unwind safe'.
When my client gets disconnected, I'd like to do some custom action (like print a warning) as well as attempting to reconnect. Right now the only thing I can customize is how long it waits to reconnect, right?
Hi,
I can't find how to send text from on_text with a client so I'm asking if this is possible? and if so, could you please tell me how to do it?
Currently you can only reject a connection after the websockets backend has accepted the connection. This means clients will always see 'connected' -> 'disconnected' events even if their connection request was rejected by your custom ServerExt
.
There should be an additional step in the connection-acceptance protocol for pre-validating requests before the websockets backend can accept the connection.
It should be possible to move that step into the axum Upgrade::from_request()
method and tungstenite Acceptor .. callback
closure, although it's not clear the optimal/correct approach since server actors are async from their servers (there is a risk of introducing race conditions between pre-validating requests and registering connections in the ServerExt
, e.g. if you reject duplicate connections).
It would great if ezsockets::Client
could be used in the browser. There are a few things blocking that scenario:
tokio
is not supported in the browser.I wrote a crate enfync
that supports arbitrary async backends (including tokio
and WASM).
tokio-tungstenite
is not supported in the browser (can't do tokio_tungstenite::connect()
).There is a nice crate tokio-tungstenite-wasm
that provides a WASM client connector with a very similar interface to tokio_tungstenite::connect()
(actually it uses tokio-tungstenite
for the native backend, but we don't want that since his crate doesn't emit Ping/Pong messages).
I will write an experimental implementation using the above solutions, and follow up here.
Adding configurable socket through SocketConfig could allow users to change the default heartbeat duration, timeout and message. It is already implemented on the client side.
For tungstenite:
ezsockets/src/server_runners/tungstenite.rs
Lines 63 to 109 in 932eb74
and for Axum, on_upgrade_with_config() function is not used anywhere.
ezsockets/src/server_runners/axum.rs
Lines 149 to 160 in 932eb74
It comes down into three options:
Server::create()
, global options for all incoming sessions.Session::create()
, but it could be a little bit tricky, as Socket
is already created when the Session::create()
is called.on_upgrade_with_config()
in case of Axum. It would work only for AxumFirst option doesn't seem bad? Unless you want to have different configuration for each of the clients connected to your server, but is it actually necessary?
Looks like if first connection failed, then the websocket client will not perform reconnect. Is this by design?
If I want to reconnect, what should I do to perform pretty?
Secondly, looks like websocket client does not export reconnect interval?
use async_trait::async_trait;
use ezsockets::ClientConfig;
use std::io::BufRead;
use url::Url;
struct Client {}
#[async_trait]
impl ezsockets::ClientExt for Client {
type Params = ();
async fn text(&mut self, text: String) -> Result<(), ezsockets::Error> {
println!("received message: {text}");
Ok(())
}
async fn binary(&mut self, bytes: Vec<u8>) -> Result<(), ezsockets::Error> {
println!("received bytes: {bytes:?}");
Ok(())
}
async fn call(&mut self, params: Self::Params) -> Result<(), ezsockets::Error> {
let () = params;
Ok(())
}
}
#[tokio::main]
async fn main() {
let url = Url::parse("ws://localhost:8080").unwrap();
let config = ClientConfig::new(url);
let (handle, future) = ezsockets::connect(|_client| Client {}, config).await;
tokio::spawn(async move {
match future.await {
Ok(_) => {
println!("OK.")
},
Err(_) => {
// First connection failed.
// What should I do to perform reconnect?
println!("Connection failed.")
},
}
// future.await.unwrap();
});
let stdin = std::io::stdin();
let lines = stdin.lock().lines();
for line in lines {
let line = line.unwrap();
println!("sending {line}");
handle.text(line);
}
}
Is WSS supported for client and server? And how would a simple example look?
Hi there, great work on this crate
I'm using an on_connect implementation such as the following to have the server kill a connection if it doesn't pass some checks (specifically, using axum to extract headers and validate data passed within them)
async fn on_connect(&mut self, socket: Socket, address: SocketAddr, args: <Self::Session as SessionExt>::Args) -> Result<ezsockets::Session<<Self::Session as SessionExt>::ID, <Self::Session as SessionExt>::Call>, Error> {
info!("New connection from {}, {:?}", &address, &args);
return Err(anyhow!("Not implemented!").into());
}
which results in the following panic (removed full backtrace, will attach if relevant)
thread 'tokio-runtime-worker' panicked at 'called `Result::unwrap()` on an `Err` value: ()', /Users/roee/.cargo/registry/src/github.com-1ecc6299db9ec823/ezsockets-0.5.1/src/server.rs:248:14
which leads me to the following snippet in server.rs
pub async fn accept(
&self,
socket: Socket,
address: SocketAddr,
args: <E::Session as SessionExt>::Args,
) -> <E::Session as SessionExt>::ID {
let (sender, receiver) = oneshot::channel();
self.connections
.send(NewConnection {
socket,
address,
args,
respond_to: sender,
})
.map_err(|_| ())
.unwrap();
receiver.await.unwrap()
}
A potential solution would be to eliminate the unwrap from both expressions, or modify the return type of the 'accept' function to an enumeration that includes either a session or an exit code.
Iโm very new to Rust (i just recently finished my Hello World program) but i am looking for a basic server/client system that can send JSON files back and forth for compliance checks of endpoints.
basicly the server will send the client a list of checks to perform (atm registry checks since thats what im learning) which the client performs and then sends back the results. The server will then store those results in a (postgresql) database where a web client then can grab the results and build piecharts of it.
Something like that :)
The Websockets specification does not allow additional headers in the HTTP connection request. This is a problem because additional connect headers are not supported by other websocket implementations (e.g. browser websockets).
Instead, I think we should force additional headers to go in the URI as 'query' elements.
I need to respond to connection drops on the client, can you add reconnect() and close() callback handlers to ClientExt? I could make a PR if you're busy.
When you use tokio-tungstenite
to run a server, accepting a new connection blocks the acceptor loop until the websocket handshake is complete. This opens a DOS vector where malicious clients can prevent other clients from connecting by hogging the acceptor loop.
The solution here seems to be spawning a new tokio task to finish accepting a connection after the TcpListener
connects.
If you send a message into a client, it can silently fail if the underlying connection is broken while trying to send the message. Normally when working with a raw websocket API, the send method will return an error if sending failed. However, ezsockets defers sending to the internal async mechanisms, so success/failure of a message is hidden from the API.
One possible solution is for the binary/text methods to return a handle to a oneshot channel which will contain the eventual result of sending (i.e. Result<(), ezsockets::SendError>
with a new SendError
enum). Then if the channel breaks or returns Err
, you will know that a send message fails. Doing it this way affects perf, obviously, but would be a reliable mechanism for reporting async success/failure.
Where users log in, join channels, and send texts? And if so, do you have any pointers? Iโm looking to learn rust :)
thread 'tokio-runtime-worker' panicked at 'called Result::unwrap()
on an Err
value: SendError
/// Calls a method on the session
pub async fn call(&self, params: P) {
self.calls.send(params).unwrap();
}
I assume that .unwrap()'s in many places can cause panics which makes websocket server not able to handle further connections.
Line 148 in 99534d1
Are these sends always succeeding and are fine to be unwrapped?
My idea is to rename all handlers to on_.....
, for example binary()
to on_binary()
or text()
to on_text()
.
as suggested in #15
A declarative, efficient, and flexible JavaScript library for building user interfaces.
๐ Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
An Open Source Machine Learning Framework for Everyone
The Web framework for perfectionists with deadlines.
A PHP framework for web artisans
Bring data to life with SVG, Canvas and HTML. ๐๐๐
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
Some thing interesting about web. New door for the world.
A server is a program made to process requests and deliver data to clients.
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
Some thing interesting about visualization, use data art
Some thing interesting about game, make everyone happy.
We are working to build community through open source technology. NB: members must have two-factor auth.
Open source projects and samples from Microsoft.
Google โค๏ธ Open Source for everyone.
Alibaba Open Source for everyone
Data-Driven Documents codes.
China tencent open source team.