nfibrokerage / slipstream Goto Github PK
View Code? Open in Web Editor NEWA slick WebSocket client for Phoenix Channels
Home Page: https://hex.pm/packages/slipstream
License: Apache License 2.0
A slick WebSocket client for Phoenix Channels
Home Page: https://hex.pm/packages/slipstream
License: Apache License 2.0
instrumentation with telemetry could be valuable, but I'm not sure what events we would want to emit
what questions would we be trying to answer with telemetry?
I heavily use Dash for local doc storage and it can't seem to download the latest versions of the hex docs.
So..I really have no idea why this would be happening, it could be problem with dash itself. But reporting just in case there is something to fix.
FWIW v0.5.2
was the last doc version to work. >= 0.5.3
seems to be broken
Ill investigate a bit more, but just as an FYI for posterity..
unknown message {:gun_error, #PID<0.16322.1>, {:badstate, 'Connection needs to be upgraded to Websocket before the gun:ws_send/1 function can be used.'}}
heard in Slipstream.Connection
please open an issue in NFIBrokerage/slipstream with this message and
any available information.
starting to feel a need for #4 so we can understand how a connection gets in a state where this happens
Phoenix.LiveView.update/3
is a pretty helpful tool for updating an assign
it'd be pretty nice to have this functionality for Slipstream.Socket
s so we can use Map.merge/2
or &[topic | &1]
to update values over time
if a Phoenix.Channel
can reply with a reply signature ({:reply, reply, socket}
) or Phoenix.Channel.reply/2
, surely we the client writers should be able to reply to server pushes?
Hello,
when I connect to a socket that rejects the connection in the sockets connect
function, i get a gun error
14:20:29.039 [error] unknown message {:gun_response, #PID<0.532.0>, #Reference<0.1111790471.536608770.196337>, :fin, 403, [{"cache-control", "max-age=0, private, must-revalidate"}, {"content-length", "0"}, {"date", "Fri, 18 Jun 2021 12:20:28 GMT"}, {"server", "Cowboy"}]} heard in Slipstream.Connection.Pipeline please open an issue in NFIBrokerage/slipstream with this message and any available information.
and the slipstream handle_disconnect
callback is called several seconds (10 or more) later with the reason :closed_by_remote
.
Proper handling via the handle_disconnect
or maybe a new handle_reject
callback would be nice.
Hey there!
First off, love the library. Huge thanks for building it ๐
Documentation in question: https://github.com/NFIBrokerage/slipstream/blob/main/lib/slipstream.ex#L537
As I'm using it, I noticed I was getting an error due to double wrapping the socket in :ok
tuples. Is line meant to be
@impl Slipstream
def handle_disconnect(_reason, socket) do
reconnect(socket)
end
instead of {:ok, reconnect(socket)
?
So far, this lib is proving very intriguing and promising. Thanks! โค๏ธ
One of the pitfalls we're running into, however, is the requirement for :phoenix
as a dependency which seems to only be for the %Phoenix.Socket.Message{}
struct used in a few places
In some cases, like mine, where dependencies are tightly controlled, this can be a deal breaker.
I see that there are some local and testing cases that require it, so it could be scoped to dev
and test
:
{:phoenix, "~> 1.0", only: [:dev, :test]}
As for the struct, my current idea is just to reimplement and change as needed when phoenix changes. Or some clever meta programming could be done to pull definition from the dev dependency, but might be getting too clever there ๐.
Though I might actually suggest supporting the concept of a :serializer
that can also be configured by the user. That would make this match functionality with phoenix and support those either using the V1 serializer, or those who have abandoned JSON as the primary serialization and made their own for the server instead of forcing the V2 serialization structure thats default in phoenix now.
Thoughts?
I have been tinkering to see if a successful return value for push/4
implies any kind of network ack. I wondered - does it mean that my channel process on the other end has the message in its inbox?
The answer is "no". In my testing, if I'm pushing to a remote server, push/4
returns {:ok, ref}
even if I've turned off WiFi (as long as I use heartbeat_interval_msec: 0
so that Slipstream doesn't notice that the connection is down).
The docs say
push/4
blocks until the message has been sent by the transport.
which in hindsight, is fairly clear that no TCP ack
is implied. But I wonder: would it be better to say something like "until the message has been accepted by the OS network stack for sending"?
I have a small toy application to test connectivity from a Slipstream client to a Phoenix channel.
(Using Slipstream 1.1.0)
The app works perfectly in OTP 25:
$ iex -S mix
Erlang/OTP 25 [erts-13.2.2.1] [source] [64-bit] [smp:8:8] [ds:8:8:10] [async-threads:1] [jit]
Compiling 1 file (.ex)
23:23:42.051 [info] Requesting connection to socket ws://localhost:4000/device_socket/websocket
Interactive Elixir (1.15.0) - press Ctrl+C to exit (type h() ENTER for help)
iex(1)>
23:23:42.169 [info] Connected to channel, joining topic device:general
23:23:42.190 [info] Successfully joined device:general, sending HELLO
However, if I switch my environment to OTP 26, I get this crashing error that seems to happen when Slipstream attempts to decode a tcp message:
$ iex -S mix
23:30:05.621 [info] Requesting connection to socket ws://localhost:4000/device_socket/websocket
Erlang/OTP 26 [erts-14.0.1] [source] [64-bit] [smp:8:8] [ds:8:8:10] [async-threads:1] [jit]
Interactive Elixir (1.15.0) - press Ctrl+C to exit (type h() ENTER for help)
iex(1)>
23:30:05.732 [info] Connected to channel, joining topic device:general
23:30:05.739 [error] GenServer #PID<0.196.0> terminating
** (FunctionClauseError) no function clause matching in Slipstream.Connection.Impl.decode_message/2
(slipstream 1.1.0) lib/slipstream/connection/impl.ex:98: Slipstream.Connection.Impl.decode_message({:error, {:malformed_reserved, <<0::size(3)>>}}, %Slipstream.Connection.State{connection_id: "ad0861bdd01fedb0", trace_id: "6971b906fd5bd2013fd8b7b2e8ff7ad9", client_pid: #PID<0.194.0>, client_ref: #Reference<0.3755692760.2161901571.81591>, config: %Slipstream.Configuration{uri: %URI{scheme: "ws", authority: "localhost:4000", userinfo: nil, host: "localhost", port: 4000, path: "/device_socket/websocket", query: nil, fragment: nil}, heartbeat_interval_msec: 30000, headers: [], serializer: Slipstream.Serializer.PhoenixSocketV2Serializer, json_parser: Jason, reconnect_after_msec: [10, 50, 100, 150, 200, 250, 500, 1000, 2000, 5000], rejoin_after_msec: [100, 500, 1000, 2000, 5000, 10000], mint_opts: [protocols: [:http1]], extensions: [], test_mode?: false}, conn: %Mint.HTTP1{host: "localhost", port: 4000, request: nil, streaming_request: nil, socket: #Port<0.4>, transport: Mint.Core.Transport.TCP, mode: :active, scheme_as_string: "http", requests: {[], []}, state: :open, buffer: "", proxy_headers: [], private: %{scheme: :ws, mode: :active, extensions: [], sec_websocket_key: "InnXaRXzD6Kjl48WKxOuYg==", websockets: [#Reference<0.3755692760.2161901572.79796>]}, log: false}, websocket: %Mint.WebSocket{extensions: [], fragment: nil, private: %{}, buffer: ""}, request_ref: #Reference<0.3755692760.2161901572.79796>, join_params: nil, heartbeat_timer: {:interval, #Reference<0.3755692760.2161901572.79812>}, heartbeat_ref: nil, metadata: %{connection_id: "ad0861bdd01fedb0", state: %Slipstream.Connection.State{connection_id: "ad0861bdd01fedb0", trace_id: "6971b906fd5bd2013fd8b7b2e8ff7ad9", client_pid: #PID<0.194.0>, client_ref: #Reference<0.3755692760.2161901571.81591>, config: %Slipstream.Configuration{uri: %URI{scheme: "ws", authority: "localhost:4000", userinfo: nil, host: "localhost", port: 4000, path: "/device_socket/websocket", query: nil, fragment: nil}, heartbeat_interval_msec: 30000, headers: [], serializer: Slipstream.Serializer.PhoenixSocketV2Serializer, json_parser: Jason, reconnect_after_msec: [10, 50, 100, 150, 200, 250, 500, 1000, 2000, 5000], rejoin_after_msec: [100, 500, 1000, 2000, 5000, 10000], mint_opts: [protocols: [:http1]], extensions: [], test_mode?: false}, conn: nil, websocket: nil, request_ref: nil, join_params: nil, heartbeat_timer: nil, heartbeat_ref: nil, metadata: nil, status: :opened, joins: %{}, leaves: %{}, current_ref: 0, current_ref_str: "0"}, trace_id: "6971b906fd5bd2013fd8b7b2e8ff7ad9", start_time: ~U[2023-06-19 21:30:05.693241Z], start_time_monotonic: -576460751472882083}, status: :connected, joins: %{"device:general" => "1"}, leaves: %{}, current_ref: 1, current_ref_str: "1"})
(slipstream 1.1.0) lib/slipstream/connection/pipeline.ex:122: anonymous fn/2 in Slipstream.Connection.Pipeline.decode_message/1
(elixir 1.15.0) lib/enum.ex:1693: Enum."-map/2-lists^map/1-1-"/2
(slipstream 1.1.0) lib/slipstream/connection/pipeline.ex:121: Slipstream.Connection.Pipeline.decode_message/1
(slipstream 1.1.0) lib/slipstream/connection/pipeline.ex:34: anonymous fn/1 in Slipstream.Connection.Pipeline.handle/2
(slipstream 1.1.0) lib/slipstream/connection/telemetry.ex:29: anonymous fn/2 in Slipstream.Connection.Telemetry.span/2
(telemetry 1.2.1) /Users/guille/personal/channels_mvp/slipstream_client/deps/telemetry/src/telemetry.erl:321: :telemetry.span/3
(slipstream 1.1.0) lib/slipstream/connection/telemetry.ex:25: Slipstream.Connection.Telemetry.span/2
Last message: {:tcp, #Port<0.4>, <<129, 68, 91, 34, 49, 34, 44, 34, 49, 34, 44, 34, 100, 101, 118, 105, 99, 101, 58, 103, 101, 110, 101, 114, 97, 108, 34, 44, 34, 112, 104, 120, 95, 114, 101, 112, 108, 121, 34, 44, 123, 34, 114, 101, 115, 112, 111, ...>>}
State: %Slipstream.Connection.State{connection_id: "ad0861bdd01fedb0", trace_id: "6971b906fd5bd2013fd8b7b2e8ff7ad9", client_pid: #PID<0.194.0>, client_ref: #Reference<0.3755692760.2161901571.81591>, config: %Slipstream.Configuration{uri: %URI{scheme: "ws", authority: "localhost:4000", userinfo: nil, host: "localhost", port: 4000, path: "/device_socket/websocket", query: nil, fragment: nil}, heartbeat_interval_msec: 30000, headers: [], serializer: Slipstream.Serializer.PhoenixSocketV2Serializer, json_parser: Jason, reconnect_after_msec: [10, 50, 100, 150, 200, 250, 500, 1000, 2000, 5000], rejoin_after_msec: [100, 500, 1000, 2000, 5000, 10000], mint_opts: [protocols: [:http1]], extensions: [], test_mode?: false}, conn: %Mint.HTTP1{host: "localhost", port: 4000, request: nil, streaming_request: nil, socket: #Port<0.4>, transport: Mint.Core.Transport.TCP, mode: :active, scheme_as_string: "http", requests: {[], []}, state: :open, buffer: "", proxy_headers: [], private: %{scheme: :ws, mode: :active, extensions: [], sec_websocket_key: "InnXaRXzD6Kjl48WKxOuYg==", websockets: [#Reference<0.3755692760.2161901572.79796>]}, log: false}, websocket: %Mint.WebSocket{extensions: [], fragment: nil, private: %{}, buffer: ""}, request_ref: #Reference<0.3755692760.2161901572.79796>, join_params: nil, heartbeat_timer: {:interval, #Reference<0.3755692760.2161901572.79812>}, heartbeat_ref: nil, metadata: %{connection_id: "ad0861bdd01fedb0", state: %Slipstream.Connection.State{connection_id: "ad0861bdd01fedb0", trace_id: "6971b906fd5bd2013fd8b7b2e8ff7ad9", client_pid: #PID<0.194.0>, client_ref: #Reference<0.3755692760.2161901571.81591>, config: %Slipstream.Configuration{uri: %URI{scheme: "ws", authority: "localhost:4000", userinfo: nil, host: "localhost", port: 4000, path: "/device_socket/websocket", query: nil, fragment: nil}, heartbeat_interval_msec: 30000, headers: [], serializer: Slipstream.Serializer.PhoenixSocketV2Serializer, json_parser: Jason, reconnect_after_msec: [10, 50, 100, 150, 200, 250, 500, 1000, 2000, 5000], rejoin_after_msec: [100, 500, 1000, 2000, 5000, 10000], mint_opts: [protocols: [:http1]], extensions: [], test_mode?: false}, conn: nil, websocket: nil, request_ref: nil, join_params: nil, heartbeat_timer: nil, heartbeat_ref: nil, metadata: nil, status: :opened, joins: %{}, leaves: %{}, current_ref: 0, current_ref_str: "0"}, trace_id: "6971b906fd5bd2013fd8b7b2e8ff7ad9", start_time: ~U[2023-06-19 21:30:05.693241Z], start_time_monotonic: -576460751472882083}, status: :connected, joins: %{"device:general" => "1"}, leaves: %{}, current_ref: 1, current_ref_str: "1"}
Not sure if this is a known issue from OTP 26 or if there is some extra configuration that I'm missing. Any pointers?
Would you be open to changing the API of reconnect/1
to also accept the same options as connect/2
?
If you are working with an authenticated websocket server and your token expires (403), you'll want to refresh the token and then reconnect with a new query param. If reconnect/1
also took the same options as connect/2
, you could handle the token refresh in response to a 403 and rebuild the uri inside of handle_disconnect
.
I am open to helping out here, but I wanted to check first if the changes made sense. What do you think? Or is there a better way to handle this scenario?
Thanks
Hello!
Thanks for the library, we've been really enjoying it.
Recently we noticed our server taking up a lot of memory and we tracked it down to Slipstream when joining lots of channels concurrently. When we put a delay between the joins, the memory goes back down to nearly nothing.
I put together a quick phx app to reproduce the situation here https://github.com/cadebward/slipstream_memory.
Run that app as you normally would and open up whatever you use to monitor memory. I'm running MacOS and I see memory go straight up to 20Gb within a few moments of the server being online.
Here's the point of interest: https://github.com/cadebward/slipstream_memory/blob/master/lib/slip/room_server.ex#L43-L51
According to this comment , I'm supposed to open an issue ๐
I'm trying to experiment with Slipstream to connect to a server via Client-side SSL (This is for connecting Nerves device to nerves-hub.org BTW) and that seems to be causing an error here. I might not have gun options right as it is new to me..
Happy to help test and contribute as needed.
tls_opts = [
cacerts: [
<<der_encoded_cert, 1, ...>>,
<<der_encoded_cert, 2, ...>>
],
keyfile: "nerves-hub/poser-x86_64-key.pem",
certfile: "nerves-hub/poser-x86_64-cert.pem",
verify: :verify_peer,
server_name_indication: 'device.nerves-hub.org'
]
uri = "wss://0.0.0.0:4001/socket/websocket"
opts = [
gun_open_options: %{transport: :tls, tls_opts: tls_opts},
uri: uri
]
connect(opts)
12:12:02.649 [error] GenServer #PID<0.490.0> terminating
** (MatchError) no match of right hand side value: {:error, {:options, {:tls_opts, [cacerts: [<<der_encoded_cert, 1, ....>>], keyfile: "nerves-hub/poser-x86_64-key.pem", certfile: "nerves-hub/poser-x86_64-cert.pem", verify: :verify_peer, server_name_indication: 'device.nerves-hub.org']}}}
(slipstream 0.0.0) lib/slipstream/connection/impl.ex:28: Slipstream.Connection.Impl.connect/1
(slipstream 0.0.0) lib/slipstream/connection/pipeline.ex:179: Slipstream.Connection.Pipeline.handle_message/1
(slipstream 0.0.0) lib/slipstream/connection/pipeline.ex:36: anonymous fn/1 in Slipstream.Connection.Pipeline.handle/2
(slipstream 0.0.0) lib/slipstream/connection/telemetry.ex:29: anonymous fn/2 in Slipstream.Connection.Telemetry.span/2
(telemetry 0.4.2) /home/jonjon/repos/poser/deps/telemetry/src/telemetry.erl:262: :telemetry.span/3
(slipstream 0.0.0) lib/slipstream/connection/telemetry.ex:25: Slipstream.Connection.Telemetry.span/2
(stdlib 3.14) gen_server.erl:689: :gen_server.try_dispatch/4
(stdlib 3.14) gen_server.erl:431: :gen_server.loop/7
Last message: {:continue, :connect}
State: %Slipstream.Connection.State{client_pid: #PID<0.488.0>, client_ref: #Reference<0.3467614670.2497970181.119212>, config: %Slipstream.Configuration{gun_open_options: %{protocols: [:http], tls_opts: [cacerts: [<<der_encoded_cert, 1, ....>>], keyfile: "nerves-hub/poser-x86_64-key.pem", certfile: "nerves-hub/poser-x86_64-cert.pem", verify: :verify_peer, server_name_indication: 'device.nerves-hub.org'], transport: :tls}, headers: [], heartbeat_interval_msec: 30000, json_parser: Jason, reconnect_after_msec: [10, 50, 100, 150, 200, 250, 500, 1000, 2000, 5000], rejoin_after_msec: [100, 500, 1000, 2000, 5000, 10000], test_mode?: false, uri: %URI{authority: "0.0.0.0:4001", fragment: nil, host: "0.0.0.0", path: "/socket/websocket", port: 4001, query: nil, scheme: "wss", userinfo: nil}}, conn: nil, connection_id: "9e8af19ef7543727", current_ref: 0, current_ref_str: "0", heartbeat_ref: nil, heartbeat_timer: nil, join_params: nil, joins: %{}, leaves: %{}, metadata: %{connection_id: "9e8af19ef7543727", start_time: ~U[2021-02-19 19:12:02.643673Z], start_time_monotonic: -576460751244116933, state: %Slipstream.Connection.State{client_pid: #PID<0.488.0>, client_ref: #Reference<0.3467614670.2497970181.119212>, config: %Slipstream.Configuration{gun_open_options: %{protocols: [:http], tls_opts: [cacerts: [<<der_encoded_cert, 1, ....>>], keyfile: "nerves-hub/poser-x86_64-key.pem", certfile: "nerves-hub/poser-x86_64-cert.pem", verify: :verify_peer, server_name_indication: 'device.nerves-hub.org'], transport: :tls}, headers: [], heartbeat_interval_msec: 30000, json_parser: Jason, reconnect_after_msec: [10, 50, 100, 150, 200, 250, 500, 1000, 2000, 5000], rejoin_after_msec: [100, 500, 1000, 2000, 5000, 10000], test_mode?: false, uri: %URI{authority: "0.0.0.0:4001", fragment: nil, host: "0.0.0.0", path: "/socket/websocket", port: 4001, query: nil, scheme: "wss", userinfo: nil}}, conn: nil, connection_id: "9e8af19ef7543727", current_ref: 0, current_ref_str: "0", heartbeat_ref: nil, heartbeat_timer: nil, join_params: nil, joins: %{}, leaves: %{}, metadata: nil, status: :opened, stream_ref: nil, trace_id: "2efa9caa07a01c2b9c0b541007a8b9db"}, trace_id: "2efa9caa07a01c2b9c0b541007a8b9db"}, status: :opened, stream_ref: nil, trace_id: "2efa9caa07a01c2b9c0b541007a8b9db"}
some clients simply re-broadcast a message after joining a topic
I've been calling them a Repeater in the Haste implementation
it probably makes sense to solidify this with an example+tutorial
I'm trying out this library to create a client that connects to a pretty standard Phoenix channel on the server. I'd like to use token based authentication, as shown in the Phoenix docs. This is how it looks on the server:
def connect(%{"token" => token}, socket, _connect_info) do
case Tokens.authenticate_token(token) do
...
And the question is: how do I pass the token from the client? There's no params
option in the configuration and I don't know how headers
/mint_opts
can be used here.
Hi, I've been using slipstream for a little bit and it's been great. I ran into one issue that I think might be a bug but I'm not sure. I have the following code in a slipstream-powered genserver:
@impl true
def handle_continue(:await_connection, %{assigns: %{partner: partner, uri: uri}} = socket) do
socket =
with {:ok, socket} <- connect(uri: uri),
{:ok, socket} <- await_connect(socket) do
Logger.debug("Joined #{partner}")
join(socket, "policy_updates")
end
{:noreply, socket}
end
I assign the partner
on init, and it's just an atom, the idea is I'm spinning up multiple of these servers for each partner
. If I inspect the socket assigns before and after, I notice that the assigns get cleared after the call to join/3
, because my handle_join/2
blows up as it expects a partner
in the assigns. IO.inspect
ing the socket reveals the assigns are there before join/3
and then is an empty map afterwards. I can easily fix this issue by just assigning the partner
again after the join, but it felt like something was up. If this is intentional behavior feel free to disregard.
slipstream should provide tooling for testing connections
this shouldn't be exceptionally hard to implement: the Slipstream.Test
(or whatever module) should provide functions for sending Slipstream.Events
directly to the client
we can also add a flag in configuration test_mode?
(default false
) which could turn off all :gun
operations (so the test process does not try to connect to any server)
unknown message {:gun_down, #PID<0.24164.0>, :http, :normal, [#Reference<0.1639604841.1126694913.160354>], []}
heard in Slipstream.Connection
please open an issue in NFIBrokerage/slipstream with this message and
any available information.
implemented a client lately that wanted to maximize uptime to its joined topic, re-joining on disconnect (e.g. the websocket-server service restarting)
we can use an implementation like this (pardon my pseudocode):
defmodule MyClient do
use Slipstream
def join(topic) do
GenServer.cast(__MODULE__, {:join, topic})
end
def start_link(config) do
Slipstream.start_link(__MODULE__, config, name: __MODULE__)
end
@impl Slipstream
def init(config) do
socket =
connect!(config)
|> assign(:topics, [])
{:ok, socket}
end
@impl Slipstream
def handle_cast({:join, topic}, socket) do
{:ok, socket |> assign(:topics, [topic | socket.assigns.topic]) |> join(topic)}
end
@impl Slipstream
def handle_connect(socket) do
socket =
Enum.reduce(socket.assigns.topics, socket, fn topic, socket ->
with false <- joined?(socket, topic),
{:ok, socket} <- rejoin(socket, topic) do
socket
else
_ -> socket
end
end)
{:ok, socket}
end
end
this isn't a worry with clients that know the topic they'd like to join ahead-of-time, but for clients that join topics dynamically, they have to roll their own re-join-after-re-connect (as above)
that implementation is not too beefy, could just write an example and be done, but it might make sense to make this the default behavior if it comes in line with phoenix.js
's behavior
see #26
JSON is a good default but it's not the only encoding method out there. Phoenix supports arbitrary encoding schemes with its Phoenix.Socket.Serializer
behaviour.
Slipstream should provide a similar behaviour and allow one to pass a module which implements the behaviour as configuration. We should be able to write a custom parser in Slipstream.Configuration
that validates that a :serializer
key implements the behaviour.
(Say we provide c:Slipstream.Serializer.encode!/1
and c:Slipstream.Serializer.decode!/2
, we should be able to write a function that works with NimbleOptions by comparing proposed_module.__info__(:functions)
with Slipstream.Serializer.behaviour_info(:callbacks)
)
Default implementations should also match Phoenix's:
Hi, first of all, nice work y'all <3
I've been testing Mint.WebSocket
to create integrate a WebSocket client into our project but I couldn't use Slipstream very well.
So, I have a question, does Slipstream works with Sockets wihtout the Channel API? Only sending and receiving messages from the WebSocket? I could do it with Mint.WebSocket
, but I still want to try with Slipstream, since your API is much more simple to develop with.
I'm asking just to make sure before doing anything here.
19:43:52.840 [error] unknown message {:gun_down, #PID<0.3419.4>, :http, :closed, [#Reference<0.3754436833.3376939009.223252>], []}
heard in Slipstream.Connection
please open an issue in NFIBrokerage/slipstream with this message and
any available information.
see #7
occurred after that error, but from the same thing: HTTP being closed
I know it is not really an issue, but right now I am a bit lost and could need some help :)
I have a synchronous handle_call/3
callback like this:
def handle_call({:track_event, name, payload}, _from, %{assigns: %{channel: channel}} = socket) do
{:ok, ref} = push(socket, channel, name, payload)
result = case await_reply(ref) do
{:error, :timeout} -> :error
reply -> reply
end
{:reply, result, socket}
end
How would I test such a function? connect_and_assert_join/4
works as expected. But I do not know how to implement assert_push/4
and reply/3
. They work, but I always trigger the callbacks timeout. I could test the handle_call/3
function directly, but that triggers a process attempted to call itself
error.
shouldn't be widely used, but should be useful during these early days of using slipstream and catching rogue :gun
messages
Slipstream.push/5
should allow pushing binary data as a binary WebSocket frame. The same is allowed on the server side (docs). The typespec should become:
@spec push(
socket :: Socket.t(),
topic :: String.t(),
event :: String.t(),
- params :: json_serializable(),
+ params :: json_serializable() | {:binary, binary()},
timeout :: timeout()
) :: {:ok, push_reference()} | {:error, reason :: term()}
and anything passed in as {:binary, data}
should not be passed through the serializer.
See #28 (comment)
I'm trying to send some binary data from the server (Phoenix Channels) to the client, running Slipstream.
On the server side, this is something like:
AppWeb.Endpoint.broadcast!("channel", "topic", {:binary, binary_data})
which should invoke handle_message
on SlipStream side.
Instead this seems to be invoking handle_info/2
with the following parameters:
#1 nil
#2 state
It looks like the client side stack trace is something like:
deps/slipstream/lib/slipstream.ex:83: ClientApp.Websocket.handle_info/2
lib/slipstream/callback.ex:34
19:43:52.840 [error] unknown message {:gun_error, #PID<0.3419.4>, {:websocket, #Reference<0.3754436833.3376939009.223252>, "VFeeglQh/qqSFe9rqSM5FQ==", [], %{}}, {:closed, 'The connection was lost.'}}
heard in Slipstream.Connection
please open an issue in NFIBrokerage/slipstream with this message and
any available information.
saw this at the tail end of a service using slipstream shutting down with a slipstream connection still open.
(we do our networking with Istio, so when our services shut down, the network shuts down faster than the app, leading to nice connection failure messages like this)
https://github.com/NFIBrokerage/mint_web_socket is getting somewhat mature and it now supports HTTP/2.
Once elixir-mint/mint_web_socket#9 and associated mint PRs are merged, it'd be nice to try out replacing :gun
with it.
:gun
is fine and fun to work with, but it has (as Frank points out here nerves-hub/nerves_hub_link#68 (comment)) some very specific hex dependencies that make it hard to use cowboy (usually by proxy of phoenix) and gun in the same project without an override.
This will be a somewhat large refactor of Slipstream.Connection
so I think it's best to completely blow away the code for Slipstream.Connection
and Slipstream.Connection.Pipeline
and rewrite them from scratch. Mostly I want to do this because the paradigm of working with mint (and specifically Mint.WebSocket) is very different from the prescribed-process-architecture approach of gun (because of mint's processless architecture).
It may also be a nice time to bring in https://github.com/elixir-ecto/connection and fashion Slipstream.Connection after Spear.Connection (with the notable difference that Slipstream.Connection is a temporary genserver whose lifecycle is controlled by the user/author).
in the implementation, leaves are certainly different than topic closes
it seems most implementations would treat leaves differently than topic closes, so I think it makes sense to have a c:Slipstream.handle_leave/2
separate from c:Slipstream.handle_topic_close/3
A declarative, efficient, and flexible JavaScript library for building user interfaces.
๐ Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
An Open Source Machine Learning Framework for Everyone
The Web framework for perfectionists with deadlines.
A PHP framework for web artisans
Bring data to life with SVG, Canvas and HTML. ๐๐๐
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
Some thing interesting about web. New door for the world.
A server is a program made to process requests and deliver data to clients.
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
Some thing interesting about visualization, use data art
Some thing interesting about game, make everyone happy.
We are working to build community through open source technology. NB: members must have two-factor auth.
Open source projects and samples from Microsoft.
Google โค๏ธ Open Source for everyone.
Alibaba Open Source for everyone
Data-Driven Documents codes.
China tencent open source team.