phoenix_pubsub's Issues

Calling Presence.track in a channel process changes the behavior of the channel when the server is stopped

If you have two channels, call them AChannel and BChannel defined as:

defmodule AChannel do
  use Phoenix.Channel

  def join("room:lobby", _, socket) do
    send(self(), :after_join)
    {:ok, socket}
  end

  def handle_info(:after_join, socket) do
    Presence.track(socket, "1234", %{})
    {:noreply, socket}
  end
end

defmodule BChannel do
  use Phoenix.Channel

  def join("room:lobby", _, socket) do
    {:ok, socket}
  end
end

If you deploy this server in a release, then stopping or restarting the release (e.g. bin/myapp restart) has subtly different behavior:

In the AChannel case, when the server is restarted, the client will receive an onClose event.
In the BChannel case, when the server is restarted, the client will receive an onError event!

This results in the client for AChannel not attempting to reconnect to the channel when the server comes back up, but the client in BChannel will reconnect.

I believe this is due to this line:
https://github.com/phoenixframework/phoenix_pubsub/blob/master/lib/phoenix/tracker.ex#L402

I think this causes the process in AChannel to get shut down correctly when the supervision trees are brought down, so it sends a close message, whereas people are probably expecting the behavior exhibited by BChannel.

Not sure what the fix is here - setting trap_exit on the channel process seems like overkill. Perhaps the Tracker should only set up a monitor?
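For reference, the trap_exit option mentioned above would look roughly like the sketch below in the channel (illustration only; as noted, it feels like overkill, and you would need a handle_info clause for the resulting {:EXIT, ...} messages):

def handle_info(:after_join, socket) do
  # Trap exits so a linked process exiting delivers an {:EXIT, ...} message
  # instead of terminating this channel process with the same exit reason.
  Process.flag(:trap_exit, true)
  Presence.track(socket, "1234", %{})
  {:noreply, socket}
end

def handle_info({:EXIT, _pid, _reason}, socket) do
  {:noreply, socket}
end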

Possible issue with distributed heterogeneous nodes

I have two connected nodes. One runs a Phoenix app, the other is plain ol' Elixir. The Phoenix node has pubsub started automatically, since it is a dependency. The latter starts PubSub manually.

The Phoenix node broadcasts a message:

Phoenix.PubSub.broadcast!(
  Hangman.Connector,
  "hangman:game_stats",
  {:game_stats, "123", "cat", true, ~w{ a b c }}
)

On the plain ol' Elixir node, I get an error:

14:27:13.889 [error] GenServer Hangman.Connector terminating
** (UndefinedFunctionError) function Phoenix.Channel.Server.fastlane/3 is undefined (module Phoenix.Channel.Server is not available)                 
    Phoenix.Channel.Server.fastlane([{#PID<0.277.0>, nil}], :none, {:game_stats, "123", "cat", true, ["a", "b", "c"]})                               
    (phoenix_pubsub) lib/phoenix/pubsub/local.ex:101: anonymous fn/7 in Phoenix.PubSub.Local.broadcast/6                                             
    (elixir) lib/enum.ex:2940: Enum.reduce_range_inc/4
    (phoenix_pubsub) lib/phoenix/pubsub/local.ex:100: Phoenix.PubSub.Local.broadcast/6                                                               
    (phoenix_pubsub) lib/phoenix/pubsub/pg2_server.ex:52: Phoenix.PubSub.PG2Server.handle_info/2                                                     
Last message: {:forward_to_local, Phoenix.Channel.Server, :none, "hangman:game_stats", {:game_stats, "123", "cat", true, ["a", "b", "c"]}}           
State: %{name: Hangman.Connector, pool_size: 4}

I'm guessing the problem is that the Phoenix side assumes the other side is a Phoenix app and passes the Phoenix.Channel.Server module as the fastlane implementation (which doesn't exist on the plain Elixir node).
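For context, the subscriber on the plain Elixir node presumably looks something like this sketch (the module and handler body are assumptions, not the actual Hangman code; it assumes a phoenix_pubsub version providing Phoenix.PubSub.subscribe/2):

defmodule StatsListener do
  use GenServer

  def start_link(_opts), do: GenServer.start_link(__MODULE__, :ok)

  def init(:ok) do
    # Subscribe this process to the topic on the locally started PubSub server.
    :ok = Phoenix.PubSub.subscribe(Hangman.Connector, "hangman:game_stats")
    {:ok, %{}}
  end

  def handle_info({:game_stats, id, word, won?, letters}, state) do
    IO.inspect({id, word, won?, letters}, label: "game_stats")
    {:noreply, state}
  end
end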

Is this bad config on my part?

Dave

Presence timeout on heavy load

Environment

  • Elixir version (elixir -v): 1.4.2
  • Phoenix version (mix deps): 1.2.1
  • Operating system: Ubuntu server

Expected behavior

Presence works fine

Actual behavior

I have three nodes with this hardware:
2 * Intel(R) Xeon(R) CPU E5-2695 v3 @ 2.30GHz
64 GB Ram
256 SSD

When each of the servers is under ~300-400 channel requests per second and ~35,000 user connections (system load 15/56), I get timeout errors from the list and track functions.

Presence.list error:

** (stop) exited in: GenServer.call(App.Presence, {:list, "user:e6728e0c-3170-4863-b8b0-8e7e0b00e813"}, 5000)
    ** (EXIT) time out
    (elixir) lib/gen_server.ex:737: GenServer.call/3
    lib/phoenix/tracker.ex:199: Phoenix.Tracker.list/2
    (phoenix) lib/phoenix/presence.ex:236: Phoenix.Presence.list/2
    (app) web/channels/presence.ex:8: App.Presence.user_online?/1
    (app) web/channels/user_channel.ex:318: anonymous fn/1 in App.UserChannel.push_notification/1
    (elixir) lib/task/supervised.ex:85: Task.Supervised.do_apply/2
    (stdlib) proc_lib.erl:247: :proc_lib.init_p_do_apply/3
Function: #Function<2.63574933/0 in App.UserChannel./1>
    Args: []

Presence.track error:

** (stop) exited in: GenServer.call(App.Presence, {:track, #PID<0.28571.6>, "user:9cab5aab-7737-49
bc-a4a2-faba1380b95a", "online", %{}}, 5000)
    ** (EXIT) time out
    (elixir) lib/gen_server.ex:737: GenServer.call/3
    (app) web/channels/user_channel.ex:67: anonymous fn/1 in App.UserChannel.handle_info/2
    (elixir) lib/task/supervised.ex:85: Task.Supervised.do_apply/2
    (stdlib) proc_lib.erl:247: :proc_lib.init_p_do_apply/3
Function: #Function<1.63574933/0 in App.UserChannel.handle_info/2>
    Args: []

This is the track code section:

with {:ok, _} <- Presence.track(socket, "online", %{}) do
  # do something
end

{:noreply, socket}

This is the list code section:

  def user_online?(user_id) do
    "user:" <> user_id
    |> Presence.list
    |> Map.has_key?("online")
  end
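One knob worth checking is the tracker pool size; a sketch, assuming a phoenix_pubsub version whose Phoenix.Tracker accepts a :pool_size option to shard the tracker (MyTracker and App.PubSub are illustrative names):

opts = [name: MyTracker, pubsub_server: App.PubSub, pool_size: 4]
Phoenix.Tracker.start_link(MyTracker, opts, opts)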

phoenixframework/phoenix#2238

Increase heartbeat time

Just send a heartbeat if you haven't broadcast any delta in a while. Basically, send an empty delta if no deltas were sent in the last X seconds, where X seconds is the heartbeat time. We can increase it to at least 15 seconds (Erlang would only detect a node failure after 60 seconds): http://erlang.org/doc/man/kernel_app.html

Every time the delta is sent, we should also include the dotted version vectors.
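A self-contained sketch of that idea outside the Tracker itself (HeartbeatSketch and the :delta_sent notification are made-up names, used only to illustrate the timing logic):

defmodule HeartbeatSketch do
  use GenServer

  @heartbeat_ms 15_000

  def start_link(broadcast_fun), do: GenServer.start_link(__MODULE__, broadcast_fun)

  def init(broadcast_fun) do
    Process.send_after(self(), :heartbeat, @heartbeat_ms)
    {:ok, %{broadcast: broadcast_fun, last_sent: System.monotonic_time(:millisecond)}}
  end

  # Called whenever a real delta goes out, so the heartbeat can stay quiet.
  def handle_cast({:delta_sent, at}, state), do: {:noreply, %{state | last_sent: at}}

  def handle_info(:heartbeat, state) do
    now = System.monotonic_time(:millisecond)

    state =
      if now - state.last_sent >= @heartbeat_ms do
        # Nothing went out during the window: send an empty delta that also
        # carries the dotted version vectors, acting as the heartbeat.
        state.broadcast.(:empty_delta)
        %{state | last_sent: now}
      else
        state
      end

    Process.send_after(self(), :heartbeat, @heartbeat_ms)
    {:noreply, state}
  end
end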

Potential problem with documentation

In the documentation (https://hexdocs.pm/phoenix_pubsub/Phoenix.Tracker.html) under the heading "Implementing a Tracker" where it does:

defmodule MyTracker do
  @behaviour Phoenix.Tracker
  ...

It then goes on to define a start_link that calls GenServer.start_link

Should that sample code not include use GenServer as well? Without it, the module does not define child_spec and, like #94, problems with the deprecation of Supervisor.Spec ensue.
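For reference, one way to sidestep the Supervisor.Spec deprecation without pulling in use GenServer is to define child_spec/1 by hand; a sketch, with start_link/1 delegating to Phoenix.Tracker.start_link/3 and minimal callbacks:

defmodule MyTracker do
  @behaviour Phoenix.Tracker

  # Hand-rolled child spec, so the module can be listed directly in a
  # supervisor's children list.
  def child_spec(opts) do
    %{id: __MODULE__, start: {__MODULE__, :start_link, [opts]}}
  end

  def start_link(opts) do
    opts = Keyword.merge([name: __MODULE__], opts)
    Phoenix.Tracker.start_link(__MODULE__, opts, opts)
  end

  def init(opts), do: {:ok, %{pubsub_server: Keyword.fetch!(opts, :pubsub_server)}}

  def handle_diff(_diff, state), do: {:ok, state}
end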

(if the change is appropriate, I would be happy to put together a pull request to make it)

Remote node doesn't receive last update right before process terminates

We use Phoenix.Tracker for keeping the state of currently active calls. Each call has a certain state in the metadata of the tracker. When a process updates the state in the terminate callback, the update of the metadata is only published locally. The remote node doesn't see this updated metadata but only sees the process leave since it terminated within the broadcast_window.

I made an example project that demos the above scenario. You can find it here: https://github.com/tverlaan/presence_multinode

We discussed possible improvements and alternative solutions during ElixirConfEU, but we didn't come to a conclusion just yet.
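The pattern in question looks roughly like this (tracker, topic, and metadata names are made up for illustration):

def terminate(_reason, %{call_id: call_id} = _state) do
  # Last-moment metadata update; remote nodes may only ever see the leave,
  # not this update, when the process dies within the broadcast window.
  Phoenix.Tracker.update(CallTracker, self(), "calls", call_id, %{state: :ended})
  :ok
end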

Pull in Presence CRDT into project

At your earliest convenience @asonge. I can even handle the namespace rename if you just want to copy the impl and test files over. I just want to make sure your contribution stays intact :)

Ignore list

We ran into a case where the process that published to a channel is also a subscriber of that channel. Is there an easy way to pass along a list of pids that should NOT receive the message even though they are subscribers? It would be pretty easy for us to hack this into phoenix_pubsub, but I was wondering if there was already support for such an operation before we pulled out the machete :)
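For what it's worth, when the only pid to exclude is the publisher itself, broadcast_from/4 already covers that case; an arbitrary ignore list would be a new feature:

# Deliver to every subscriber of the topic except the calling process.
Phoenix.PubSub.broadcast_from(MyApp.PubSub, self(), "room:1", {:event, :payload})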

Thanks,

-Chris

Make Presence.track/4 and Presence.untrack/3 idempotent

Currently it is possible to track the same {pid, topic, key} tuple multiple times, which causes an error on untrack (since ets returns more than one row). I believe the operation should be idempotent (except for phx_ref), replacing the old record with the new meta. Currently it is possible to crash the GenServer, which probably should not be the case.
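A minimal sketch of the duplicate-track scenario (tracker, topic, and key names assumed):

{:ok, _ref1} = Phoenix.Tracker.track(MyTracker, self(), "room:1", "user:1", %{})
{:ok, _ref2} = Phoenix.Tracker.track(MyTracker, self(), "room:1", "user:1", %{})

# Two rows now exist for the same {pid, topic, key}, so this untrack
# matches more than one ets row and errors:
Phoenix.Tracker.untrack(MyTracker, self(), "room:1", "user:1")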

Add invariants to Tracker configuration

We need to validate and raise for violations. For example, the down_period must be at least 2x the broadcast_period, and the permdown_period must be greater than the down_period.
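A sketch of what such a check could look like (the option names are the existing Tracker options; the helper itself is hypothetical):

defp validate_periods!(opts) do
  broadcast = Keyword.fetch!(opts, :broadcast_period)
  down      = Keyword.fetch!(opts, :down_period)
  permdown  = Keyword.fetch!(opts, :permdown_period)

  unless down >= 2 * broadcast do
    raise ArgumentError, ":down_period must be at least twice :broadcast_period"
  end

  unless permdown > down do
    raise ArgumentError, ":permdown_period must be greater than :down_period"
  end

  opts
end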

Phoenix.Tracker desync after connecting to node

When Phoenix.Tracker nodes connect for the first time, and both nodes already hold presences, their state will not merge correctly.

Steps to reproduce

Pull https://github.com/szlend/tracker_test

Node 1

iex --sname n1 --cookie test -S mix

for n <- 1..51, do: Phoenix.Tracker.track(TrackerTest.Tracker, self(), "test", "#{inspect Node.self()}_#{n}", %{})

Node 2

iex --sname n2 --cookie test -S mix

for n <- 1..51, do: Phoenix.Tracker.track(TrackerTest.Tracker, self(), "test", "#{inspect Node.self()}_#{n}", %{})

Node.connect :"n1@your_hostname"

Both nodes

Wait a while for Phoenix.Tracker to do its thing, then:

Phoenix.Tracker.list(TrackerTest.Tracker, "test") |> Enum.count

# Node 1
> 51

# Node 2
> 102

I can only reproduce this issue when the number of tracked presences is above 50 on each node. Looking at the debug logs, I can see one difference:

When N > 50, I get:

sending delta generation 1

When N <= 50, I get:

falling back to sending entire crdt

Question on Tracker.list

Hey all,

We have integrated phoenix_pubsub into our application and it is great. I noticed in the Phoenix.Tracker.State module that the "values" ets table is created via the following call:

values: :ets.new(:values, [:ordered_set])

When one calls the Phoenix.Tracker.list function, it returns a copy of the State object and then calls :ets.select on the :values tid. Since this query happens in the caller process and not in the GenServer managing the state object, I was wondering why {:read_concurrency, true} is not given as an option.
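In other words, the question is whether the table could simply be created as:

values: :ets.new(:values, [:ordered_set, {:read_concurrency, true}])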

For our use case, when a user logs in they pull a friends list and check it against the Tracker system, so we are doing a lot of concurrent reads.

Thanks!

Efficient way to get Tracker.count

Hi, I've been playing with phoenix_pubsub and the tracker feature. I've built an HTTP endpoint to show how many tracked clients the system holds, using this:

`size = length(Phoenix.Tracker.list(My.Tracker, "bucket"))`

When the system has more than 10k tracked entries in the registry, the length call takes forever to process. Is there any way to get a count over the Phoenix.PubSub.PG2 registry in a more efficient way?
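One pattern that avoids walking the full presence list is to keep a per-topic counter updated from the Tracker's handle_diff callback; a sketch (not an existing API, names are illustrative):

defmodule My.CountingTracker do
  @behaviour Phoenix.Tracker

  def start_link(opts) do
    opts = Keyword.merge([name: __MODULE__], opts)
    Phoenix.Tracker.start_link(__MODULE__, opts, opts)
  end

  def init(opts) do
    :ets.new(:tracker_counts, [:named_table, :public, read_concurrency: true])
    {:ok, %{pubsub_server: Keyword.fetch!(opts, :pubsub_server)}}
  end

  # Keep a running count per topic as joins/leaves are observed, so callers
  # can read the size cheaply instead of listing everything.
  def handle_diff(diff, state) do
    for {topic, {joins, leaves}} <- diff do
      :ets.update_counter(:tracker_counts, topic, length(joins) - length(leaves), {topic, 0})
    end

    {:ok, state}
  end

  def count(topic) do
    case :ets.lookup(:tracker_counts, topic) do
      [{^topic, n}] -> n
      [] -> 0
    end
  end
end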

Zombie Pids from a restarted node may linger in the crdt.

When a replica is restarted, permdown is invoked on the other nodes and the old instance is removed from the State. However, it seems that not all of the old instance's info is cleaned up, and sometimes the old instance resurfaces. Eventually, this old instance info finds its way back to the restarted replica and the zombie pids are made available again.

This is easier to reproduce with many nodes and low broadcast periods.

Here node4 (of 4 nodes) is restarted, and the old instance {:"[email protected]", 1473221321103120} shows up along with invalid pids.

AudiaMBP:Dht gabiz$ iex --name [email protected] -S mix
Erlang/OTP 19 [erts-8.0] [source] [64-bit] [smp:8:8] [async-threads:10] [hipe] [kernel-poll:false]

Interactive Elixir (1.3.2) - press Ctrl+C to exit (type h() ENTER for help)
iex([email protected])1>
21:08:45.127 [debug] [email protected]: transfer_req from :"[email protected]"

21:08:45.129 [debug] {:"[email protected]", 1473221325050827}: sending delta generation 1

21:08:45.129 [debug] [email protected]: transfer_req from :"[email protected]"

21:08:45.129 [debug] {:"[email protected]", 1473221325050827}: sending delta generation 1

21:08:45.141 [debug] [email protected]: transfer_req from :"[email protected]"

21:08:45.141 [debug] {:"[email protected]", 1473221325050827}: sending delta generation 1

21:08:45.177 [debug] [email protected]: replica up from :"[email protected]"

21:08:45.177 [debug] [email protected]: replica up from :"[email protected]"

21:08:45.194 [debug] [email protected]: replica up from :"[email protected]"

21:08:45.210 [debug] [email protected]: transfer_req from [email protected]

21:08:45.212 [debug] [email protected]: transfer_ack from :"[email protected]"
iex([email protected])1> GenServer.call(Dispatch.Registry, {:list, "dht"})
%Phoenix.Tracker.State{cloud: #MapSet<[]>,
 context: %{{:"[email protected]", 1473221166666092} => 1,
   {:"[email protected]", 1473221261730782} => 1,
   {:"[email protected]", 1473221317066008} => 1,
   {:"[email protected]", 1473221321103120} => 1,
   {:"[email protected]", 1473221325050827} => 1},
 delta: %Phoenix.Tracker.State{cloud: #MapSet<[]>, context: %{}, delta: :unset,
  mode: :delta, pids: nil,
  range: {%{{:"[email protected]", 1473221325050827} => 1},
   %{{:"[email protected]", 1473221325050827} => 1}},
  replica: {:"[email protected]", 1473221325050827}, replicas: %{}, values: %{}},
 mode: :normal, pids: 245817, range: {%{}, %{}},
 replica: {:"[email protected]", 1473221325050827},
 replicas: %{{:"[email protected]", 1473221166666092} => :up,
   {:"[email protected]", 1473221261730782} => :up,
   {:"[email protected]", 1473221317066008} => :up,
   {:"[email protected]", 1473221325050827} => :up}, values: 241720}

iex([email protected])3> Phoenix.Tracker.list(Dispatch.Registry, "dht")
[{#PID<15449.228.0>,
  %{node: :"[email protected]", phx_ref: "Pe6xs3WtaDM=", state: :online}},
 {#PID<15551.232.0>,
  %{node: :"[email protected]", phx_ref: "PGUKQ2hd0fI=", state: :online}},
 {#PID<0.236.0>,
  %{node: :"[email protected]", phx_ref: "m1qD8cn+eMc=", state: :online}},
 {#PID<0.236.0>,
  %{node: :"[email protected]", phx_ref: "wDLkDaoxAL8=", state: :online}},
 {#PID<15448.268.0>,
  %{node: :"[email protected]", phx_ref: "teoKZJQijs8=", state: :online}}]

The Collectable protocol is deprecated for non-empty lists

I am getting this:

Enum.into/2 or "for" comprehensions with an :into option is incorrect when collecting into non-empty lists. If you're collecting into a non-empty keyword list, consider using Keyword.merge/2 instead. If you're collecting into a non-empty list, consider concatenating the two lists with the ++ operator.
(elixir) lib/collectable.ex:83: Collectable.List.into/1
(phoenix_pubsub) lib/phoenix/tracker/state.ex:380: Phoenix.Tracker.State.merge_deltas/2
(phoenix_pubsub) lib/phoenix/tracker/delta_generation.ex:34: Phoenix.Tracker.DeltaGeneration.do_push/4
(phoenix_pubsub) lib/phoenix/tracker/shard.ex:519: Phoenix.Tracker.Shard.push_delta_generation/2
(phoenix_pubsub) lib/phoenix/tracker/shard.ex:169: Phoenix.Tracker.Shard.handle_info/2

With Elixir 1.8 and phoenix_pubsub 1.1.1.

Enumerable.MapSet.reduce function not found

Hi guys, I'm using phoenix_pubsub in a project. I've pushed it to a production app and I'm doing some load testing; sadly, I'm getting a lot of errors. One of them is:

function Enumerable.MapSet.reduce/3 is undefined or private. Did you mean one of:

      * reduce/3

The trace shows that the error comes from lib/phoenix/tracker/state.ex:362.

I then removed the build and recompiled everything; the same error persists, but with a different trace:

function Enumerable.MapSet.reduce/3 is undefined (module Enumerable.MapSet is not available)

I'm using:
Erlang/OTP 19 [erts-8.1] [source] [64-bit] [async-threads:10] [kernel-poll:false]
Elixir 1.3.4

Dialyzer issues being caused in main phoenix repo

With regard to phoenixframework/phoenix#3027, I have tracked it down to shard.ex:

@spec list(pid, topic) :: [presence]
def list(server_pid, topic) do
  server_pid
  |> GenServer.call({:list, topic})
  |> State.get_by_topic(topic)
end

The spec needs to be updated to @spec list(pid | atom, topic) :: [presence]
In addition, there are several other dialyzer issues in the shard.ex file that I have fixes for.

I notice that the dialyzer package is no longer supported; could I also switch to the recommended https://github.com/Comcast/dialyzex or https://github.com/jeremyjh/dialyxir?

Fastlanes via subscribe

I can't find a way to make it work.

I.e.:

Phoenix.PubSub.subscribe(:freeling, "freeling:#{id}", fastlane: {some_module_pid, SomeModule, ["some_id1"]})

The normal subscriber is always called, but not the fastlane destination.

I also tried printing the subscribers; everything looks right, they do have fastlanes:

[{#PID<0.959.0>,
  {#PID<0.1006.0>, SomeModule,
   ["bdb59e6a-2a5d-11e6-be56-685b35999bd2"]}}]
[{#PID<0.959.0>,
  {#PID<0.1007.0>, SomeModule,
   ["bdb59e6a-2a5d-11e6-be56-685b35999bd2"]}}]

It seems that sending to the local pid is forced (lib/phoenix/pubsub/local.ex#L115):

  defp do_broadcast(nil, pubsub_server, shard, from, topic, msg) do
    pubsub_server
    |> subscribers_with_fastlanes(topic, shard)
    |> Enum.each(fn
      {pid, _} when pid == from -> :noop
      {pid, _} -> send(pid, msg)
    end)
  end

memory leak (monitor refs) in Phoenix.PubSub.Local

I think I found a memory leak in Phoenix.PubSub.Local with monitor refs and long running subscribers that join topics frequently.

Each time a process subscribes to a topic, Process.monitor is called:

Process.monitor(pid)

This causes the Local GenServer to create a new monitor for the subscriber pid, regardless of whether it's already monitoring that pid as a result of an earlier subscription to another (or the same) topic.

The ref isn't captured when it's created and isn't demonitored when the subscriber unsubscribes from the topic. It appears that the only way these monitors are cleaned up is if the subscriber or the Local GenServer process exits.

Below is an iex session using phoenix_pubsub master demonstrating the issue.

Interactive Elixir (1.2.0) - press Ctrl+C to exit (type h() ENTER for help)
iex(1)> {:ok, pubsub} = Phoenix.PubSub.LocalSupervisor.start_link(:monitor_leak_test, 1, [])
{:ok, #PID<0.128.0>}
iex(2)> [{_, shard?, _, _}] = Supervisor.which_children(pubsub)
[{0, #PID<0.129.0>, :supervisor, [Supervisor]}]
iex(3)> [{Phoenix.PubSub.Local, local, _, _}, _gc] = Supervisor.which_children(shard?)
[{Phoenix.PubSub.Local, #PID<0.131.0>, :worker, [Phoenix.PubSub.Local]},
 {Phoenix.PubSub.GC, #PID<0.130.0>, :worker, [Phoenix.PubSub.GC]}]
iex(4)> subscriber = spawn fn -> receive do end end
#PID<0.135.0>
iex(5)> :erlang.process_info(local, :memory)
{:memory, 2824}
iex(6)> (1..10000) |> Enum.each(fn(_) -> Phoenix.PubSub.subscribe(:monitor_leak_test, subscriber, "foo") && Phoenix.PubSub.unsubscribe(:monitor_leak_test, subscriber, "foo") end)
:ok
iex(7)> :erlang.process_info(local, :memory)
{:memory, 736752}
iex(8)> :erlang.garbage_collect(local)
true
iex(9)> :erlang.process_info(local, :memory)
{:memory, 722824}
iex(10)> Process.exit(subscriber, :kill)
true
iex(11)> :erlang.garbage_collect(local)
true
iex(12)> :erlang.process_info(local, :memory)
{:memory, 2824}

As always thanks for all the work you do on Phoenix!

:ets.lookup(nil, :subscribe) problem

Hey @chrismccord !

Some folks are running into the following issue. Given a supervision tree that looks like:

    children = [
      supervisor(App.Endpoint, []),
      supervisor(Absinthe.Subscription, [App.Endpoint]),
    ]

The Absinthe.Subscription starts a set of proxy processes which, as part of their boot process, do:

def init({pubsub, shard}) do
  :ok = pubsub.subscribe(topic(shard))
  {:ok, %__MODULE__{pubsub: pubsub}}
end

pubsub in this case is the App.Endpoint, per the argument to the Absinthe.Subscription child spec.

This produces the following error:

=INFO REPORT==== 13-Jul-2017::15:12:30 ===
    application: logger
    exited: stopped
    type: temporary
** (Mix) Could not start application frontend: Frontend.Application.start(:normal, []) returned an error: shutdown: failed to start child: Absinthe.Subscription
    ** (EXIT) shutdown: failed to start child: Absinthe.Subscription.ProxySupervisor
        ** (EXIT) shutdown: failed to start child: 1
            ** (EXIT) an exception was raised:
                ** (ArgumentError) argument error
                    (stdlib) :ets.lookup(nil, :subscribe)
                    (phoenix_pubsub) lib/phoenix/pubsub.ex:288: Phoenix.PubSub.call/3
                    (absinthe) lib/absinthe/subscription/proxy.ex:19: Absinthe.Subscription.Proxy.init/1
                    (stdlib) gen_server.erl:365: :gen_server.init_it/2
                    (stdlib) gen_server.erl:333: :gen_server.init_it/6
                    (stdlib) proc_lib.erl:247: :proc_lib.init_p_do_apply/3

I haven't run into this myself so it's possibly config related, but I'm having difficulty tracking down what might be wrong.

no function clause matching in Phoenix.PubSub.subscribe/2

Hello,

I just noticed that while you can use Phoenix.PubSub.subscribe/2 to subscribe to presence changes, you may only do so when your topic is of binary type.
On the other hand, one can track presence with arbitrary terms as the topic, and presence listing works fine (I haven't tested a multi-node setup).

So this does work:

iex(dingen@localhost)15> Phoenix.PubSub.subscribe(Dingen.PubSub, "dingen:12345")
:ok


iex(dingen@localhost)7> DingenWeb.Presence.track(self(), "dingen:12345", "dingen", %{})
{:ok, "SMcm2taTxLw="}


iex(dingen@localhost)17> flush
%Phoenix.Socket.Broadcast{event: "presence_diff",
 payload: %{joins: %{"dingen" => %{metas: [%{phx_ref: "SMcm2taTxLw="}]}},
   leaves: %{}}, topic: "dingen:12345"}
:ok

While this doesn't

iex(dingen@localhost)15> Phoenix.PubSub.subscribe(Dingen.PubSub, {"game", 12345})
** (FunctionClauseError) no function clause matching in Phoenix.PubSub.subscribe/2

    The following arguments were given to Phoenix.PubSub.subscribe/2:

        # 1
        Dingen.PubSub

        # 2
        {"game", 12345}

    Attempted function clauses (showing 1 out of 1):

        def subscribe(server, topic) when is_atom(server) and -is_binary(topic)-

    (phoenix_pubsub) lib/phoenix/pubsub.ex:151: Phoenix.PubSub.subscribe/2


iex(dingen@localhost)4> DingenWeb.Presence.track(self(), {"game", 12345}, "dingen", %{})
{:ok, "RY19Y4RqB8Q="}

On IRC I heard that I should not count on Presence.track allowing arbitrary topic names, so maybe a guard clause is missing there. Or maybe it's the other way around: Presence supports it by design and maybe PubSub should too?

Anyway, I thought I'd rather report it.

Best regards

broadcast_from!/4 is undefined or private

I am using pubsub without Phoenix, in version 1.1.2, and there are some tests for the tracker behaviour. Now and then the tests fail with the following message:

** (UndefinedFunctionError) function Phoenix.PubSub.broadcast_from!/4 is undefined or private
    (phoenix_pubsub) Phoenix.PubSub.broadcast_from!(:results, #PID<0.340.0>, "phx_presence:Elixir.Styx.PubSub.Tracker_shard3", {:pub, :heartbeat, {:nonode@nohost, 1564048528061830}, %Phoenix.Tracker.State{clouds: %{{:nonode@nohost, 1564048528061830} => #MapSet<[{{:nonode@nohost, 1564048528061830}, 1}, {{:nonode@nohost, 1564048528061830}, 2}]>}, context: %{}, delta: :unset, mode: :delta, pids: nil, range: {%{{:nonode@nohost, 1564048528061830} => 0}, %{{:nonode@nohost, 1564048528061830} => 2}}, replica: {:nonode@nohost, 1564048528061830}, replicas: %{}, values: %{}}, {{:nonode@nohost, 1564048528061830}, %{{:nonode@nohost, 1564048528061830} => 2}}})
    (phoenix_pubsub) lib/phoenix/tracker/shard.ex:453: Phoenix.Tracker.Shard.broadcast_delta_heartbeat/1
    (phoenix_pubsub) lib/phoenix/tracker/shard.ex:169: Phoenix.Tracker.Shard.handle_info/2
    (stdlib) gen_server.erl:637: :gen_server.try_dispatch/4
    (stdlib) gen_server.erl:711: :gen_server.handle_msg/6
    (stdlib) proc_lib.erl:249: :proc_lib.init_p_do_apply/3
Last message: :heartbeat

In which cases can this happen?

Message ordering

Should message ordering be preserved when a broadcast call returns before another broadcast call begins? I've done some testing and found some counterexamples -- not sure whether it's a bug or not.

Is a `:one_for_one` restart strategy safe for my supervisor?

We're using this library in a non-Phoenix project and are defining our supervision tree ourselves. Here are the children I'm defining under a supervisor:

    [
      supervisor(Task.Supervisor, [[name: MyApp.PubSubTracker.TaskSupervisor]]),
      supervisor(Phoenix.PubSub.PG2, [MyApp.PubSub, []]),
      worker(MyApp.PubSubTracker, [[
        pubsub_server: MyApp.PubSub,
        name: MyApp.PubSubTracker,
        task_sup: MyApp.PubSubTracker.TaskSupervisor,
      ]]),
    ]

We're trying to figure out what restart strategy to use. When I'm dealing with GenServers I've written, I can generally figure out if :one_for_one is safe or if I need to use :rest_for_one. Here I'm not sure, because I don't know the details of how the tracker and pubsub processes interact. Is it safe for them to be restarted individually (allowing us to use :one_for_one)? Or do we need to use :rest_for_one?
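For reference, the :rest_for_one variant is just the same children with a different strategy; given the order above, it restarts the tracker whenever the pubsub server restarts (whether that is actually required is exactly the question):

Supervisor.start_link(children, strategy: :rest_for_one)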

If possible, it would be nice for this to be in the docs.

Thanks!

Network traffic up for 20 minutes after a server restart

We're using Phoenix Presence (latest version in the master branch). We have a Kubernetes setup where we have multiple pods running.

We've noticed that every time we restart a pod or do a rolling update, network traffic is elevated for 20 minutes.

I was able to replicate it in our beta environment when I had 10K online connections and I restarted one pod:
(Screenshot from 2019-11-27: network traffic graph)
As you can see, traffic went up around 11:53 and came back down around 12:14.

I think it's related to the permdown_period setting, which by default is 20 minutes. I tried to replicate this with just the phoenix_pubsub library, without a web server, but wasn't able to. EDIT: It is related. If I change it to 10 minutes, then network traffic is up for only 10 minutes.
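For reference, permdown_period is a Tracker option (in milliseconds), so something like the sketch below should shorten that window; names and values are illustrative, and when going through Phoenix.Presence the option is typically forwarded via the Presence/Tracker start options (worth verifying for your Phoenix version):

opts = [
  name: MyTracker,
  pubsub_server: MyApp.PubSub,
  # default is 20 minutes
  permdown_period: :timer.minutes(10)
]

Phoenix.Tracker.start_link(MyTracker, opts, opts)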

I also did a tcpdump inside one pod to see where the traffic is coming from/going to. It was all between the presence servers themselves. I think these are the state synchronization messages.

Do you have any suggestions what to look for or how to gather more information?

Providing custom `Tracker.list` timeout

Tracker.Shard uses a GenServer.call which doesn't accept any options from the caller. This call can be expensive in a large shard or one which is under heavy load.

Is it worthwhile to expose a timeout option all the way through the function calls? My thought is to set a low timeout value when writing a listeners? function and then just assume true if the call times out.
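A sketch of the intended call site; note the timeout argument to Phoenix.Tracker.list does not exist today and is exactly what this issue proposes exposing:

def listeners?(topic, timeout \\ 500) do
  try do
    Phoenix.Tracker.list(MyTracker, topic, timeout) != []
  catch
    # If the shard is too busy to answer in time, assume someone is listening.
    :exit, {:timeout, _} -> true
  end
end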

I'm happy to add this in but wanted to run it by the maintainers first. I could see a desire to expose this through all of the Tracker public APIs.

Phoenix.PubSub.PG2.start_link/1 missing?

I noticed that it's not possible to start Phoenix.PubSub.PG2 by using:

children = [
  {Phoenix.PubSub.PG2, [:pubsub_name, []]}
]
...
Supervisor.start_link(children, opts)

because it results in ** (UndefinedFunctionError) function Phoenix.PubSub.PG2Server.start_link/1 is undefined or private.

Is this the intended behaviour or am I missing anything?
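For comparison, the old Supervisor.Spec form should still work here, since it ends up calling Phoenix.PubSub.PG2.start_link/2 with both arguments (a sketch, assuming phoenix_pubsub 1.x):

import Supervisor.Spec

children = [
  supervisor(Phoenix.PubSub.PG2, [:pubsub_name, []])
]

Supervisor.start_link(children, strategy: :one_for_one)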

Thanks!

Mismatched pool sizes cause errors/missed messages

When using pubsub with multi-node, if the nodes are configured with different pool sizes, the following situations can occur:

  1. pool_size is lower than the broadcaster - this results in the error below
  2. pool_size is greater than the broadcaster - this results in missed messages

The issue stems from forwarding the pool_size based on the broadcaster - https://github.com/phoenixframework/phoenix_pubsub/blob/master/lib/phoenix/pubsub/pg2_server.ex#L36
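Until that's addressed, the workaround presumably is to keep :pool_size identical on every node, e.g. via the same endpoint pubsub config everywhere (names are illustrative):

config :my_app, MyAppWeb.Endpoint,
  pubsub: [name: MyApp.PubSub, adapter: Phoenix.PubSub.PG2, pool_size: 4]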


Error:

14:41:19.297 [error] Task #PID<0.5811.0> started from Phoenix.PubSub.PubSub terminating
** (MatchError) no match of right hand side value: lib/phoenix/pubsub/local.ex:228: Phoenix.PubSub.Local.pools_for_shard/2
(phoenix_pubsub) lib/phoenix/pubsub/local.ex:223: Phoenix.PubSub.Local.local_for_shard/2
(phoenix_pubsub) lib/phoenix/pubsub/local.ex:152: Phoenix.PubSub.Local.subscribers_with_fastlanes/3
(phoenix_pubsub) lib/phoenix/pubsub/local.ex:112: Phoenix.PubSub.Local.do_broadcast/6
(phoenix_pubsub) lib/phoenix/pubsub/local.ex:103: anonymous fn/7 in Phoenix.PubSub.Local.broadcast/6
(elixir) lib/task/supervised.ex:94: Task.Supervised.do_apply/2
(elixir) lib/task/supervised.ex:45: Task.Supervised.reply/5
(stdlib) proc_lib.erl:240: :proc_lib.init_p_do_apply/3
Function: #Function<4.20419525/0 in Phoenix.PubSub.Local.broadcast/6>
Args: []

Latest Version of Phoenix.PubSub in Hex

I started studying Phoenix.PubSub a bit and added {:phoenix_pubsub, "~> 2.0"} as a dependency, as the front page instructs. But it seems the latest version is 1.1.2 and 2.0 is not available on hex.pm:

** (Mix) No matching version for phoenix_pubsub ~> 2.0 (from: mix.exs) in registry

The latest version is: 1.1.2

Should the GitHub repo be used directly? How long will it take to show up in hex.pm?

The scalability of elixir/phoenix in the future

I've noticed that lasp/partisan enables Erlang/Elixir to scale up to 1k nodes; however, Elixir/Phoenix itself is built entirely on distributed Erlang. Does Phoenix.PubSub.Redis mitigate this problem? What if we want to scale Elixir/Phoenix up to 1k nodes?

GC is extremely slow

Hey!
I am developing a small experimental app for a project I am working on.
I planned on using Phoenix.PubSub for setting up subscriptions in a cluster of machines, so a game process can subscribe to the events it cares about.
Due to the high scalability requirements, I tested adding and deleting lots of processes early on and saw a huge bottleneck in the Phoenix.PubSub.GC module.

Subscribing my 50k test processes to 10 topics is done in well under 2s; removing their subscriptions takes ~50s with all schedulers maxing out: https://dsh.re/19075

This is obviously a super synthetic benchmark, but I was wondering whether the GC part has been looked at from a performance perspective.
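A rough reconstruction of that kind of benchmark (the names and the subscribe/unsubscribe arities are assumptions, based on the forms used elsewhere in these issues):

{:ok, _} = Phoenix.PubSub.PG2.start_link(:bench, [])

pids = for _ <- 1..50_000, do: spawn(fn -> Process.sleep(:infinity) end)
topics = for n <- 1..10, do: "topic:#{n}"

{sub_us, _} = :timer.tc(fn ->
  for pid <- pids, topic <- topics, do: Phoenix.PubSub.subscribe(:bench, pid, topic)
end)

{unsub_us, _} = :timer.tc(fn ->
  for pid <- pids, topic <- topics, do: Phoenix.PubSub.unsubscribe(:bench, pid, topic)
end)

IO.puts("subscribe: #{sub_us / 1_000_000}s, unsubscribe: #{unsub_us / 1_000_000}s")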
