Code Monkey home page Code Monkey logo

fastapi_websocket_pubsub's Issues

Javascript Client for Frontend

I was able to extract the payloads sent between client and server to create a Javascript client with websocket (code below).

But there are 2 issues, which I haven't solved yet:

  1. While working fine on Windows and Android Browsers, there are issues on Safari: The connection to the Websocket is established, but the browser doesn't receive the messages, when triggering pub.
  2. The connection pool doesn't seem to be cleaned up, when a client disconnects. I get these error messages, because a subscriber was not found:

Click to see stack trace!

Failed to notify subscriber sub_id=3d0477159cdc4eb0bd8afefa9df82423 with topic=topicA
Traceback (most recent call last):
File "/home/koliham/.cache/pypoetry/virtualenvs/websocketsample-dFaV7d37-py3.9/lib/python3.9/site-packages/fastapi_websocket_pubsub/", line 220, in callback_subscribers
  await self.trigger_callback(data, topic, subscriber_id, event)
File "/home/koliham/.cache/pypoetry/virtualenvs/websocketsample-dFaV7d37-py3.9/lib/python3.9/site-packages/fastapi_websocket_pubsub/", line 178, in trigger_callback
  await subscription.callback(subscription, data)
File "/home/koliham/.cache/pypoetry/virtualenvs/websocketsample-dFaV7d37-py3.9/lib/python3.9/site-packages/fastapi_websocket_pubsub/", line 159, in __broadcast_notifications__
  async with self._sharing_broadcast_channel:
File "/home/koliham/.cache/pypoetry/virtualenvs/websocketsample-dFaV7d37-py3.9/lib/python3.9/site-packages/broadcaster/", line 48, in __aenter__
  await self.connect()
File "/home/koliham/.cache/pypoetry/virtualenvs/websocketsample-dFaV7d37-py3.9/lib/python3.9/site-packages/broadcaster/", line 55, in connect
  await self._backend.connect()
File "/home/koliham/.cache/pypoetry/virtualenvs/websocketsample-dFaV7d37-py3.9/lib/python3.9/site-packages/broadcaster/_backends/", line 13, in connect
  self._conn = await asyncpg.connect(self._url)
File "/home/koliham/.cache/pypoetry/virtualenvs/websocketsample-dFaV7d37-py3.9/lib/python3.9/site-packages/asyncpg/", line 2093, in connect
  return await connect_utils._connect(
File "/home/koliham/.cache/pypoetry/virtualenvs/websocketsample-dFaV7d37-py3.9/lib/python3.9/site-packages/asyncpg/", line 903, in _connect
  raise last_error
File "/home/koliham/.cache/pypoetry/virtualenvs/websocketsample-dFaV7d37-py3.9/lib/python3.9/site-packages/asyncpg/", line 889, in _connect
  return await _connect_addr(
File "/home/koliham/.cache/pypoetry/virtualenvs/websocketsample-dFaV7d37-py3.9/lib/python3.9/site-packages/asyncpg/", line 781, in _connect_addr
  return await __connect_addr(params, timeout, True, *args)
File "/home/koliham/.cache/pypoetry/virtualenvs/websocketsample-dFaV7d37-py3.9/lib/python3.9/site-packages/asyncpg/", line 833, in __connect_addr
  tr, pr = await compat.wait_for(connector, timeout=timeout)
File "/home/koliham/.cache/pypoetry/virtualenvs/websocketsample-dFaV7d37-py3.9/lib/python3.9/site-packages/asyncpg/", line 66, in wait_for
  return await asyncio.wait_for(fut, timeout)
File "/home/linuxbrew/.linuxbrew/opt/[email protected]/lib/python3.9/asyncio/", line 481, in wait_for
  return fut.result()
File "/home/koliham/.cache/pypoetry/virtualenvs/websocketsample-dFaV7d37-py3.9/lib/python3.9/site-packages/asyncpg/", line 700, in _create_ssl_connection
  do_ssl_upgrade = await pr.on_data
ConnectionError: unexpected connection_lost() call
Failed to notify subscriber sub_id=3d0477159cdc4eb0bd8afefa9df82423 with topic=topicC
Traceback (most recent call last):
File "/home/koliham/.cache/pypoetry/virtualenvs/websocketsample-dFaV7d37-py3.9/lib/python3.9/site-packages/fastapi_websocket_pubsub/", line 220, in callback_subscribers
  await self.trigger_callback(data, topic, subscriber_id, event)
File "/home/koliham/.cache/pypoetry/virtualenvs/websocketsample-dFaV7d37-py3.9/lib/python3.9/site-packages/fastapi_websocket_pubsub/", line 178, in trigger_callback
  await subscription.callback(subscription, data)
File "/home/koliham/.cache/pypoetry/virtualenvs/websocketsample-dFaV7d37-py3.9/lib/python3.9/site-packages/fastapi_websocket_pubsub/", line 159, in __broadcast_notifications__
  async with self._sharing_broadcast_channel:
File "/home/koliham/.cache/pypoetry/virtualenvs/websocketsample-dFaV7d37-py3.9/lib/python3.9/site-packages/broadcaster/", line 48, in __aenter__
  await self.connect()
File "/home/koliham/.cache/pypoetry/virtualenvs/websocketsample-dFaV7d37-py3.9/lib/python3.9/site-packages/broadcaster/", line 55, in connect
  await self._backend.connect()
File "/home/koliham/.cache/pypoetry/virtualenvs/websocketsample-dFaV7d37-py3.9/lib/python3.9/site-packages/broadcaster/_backends/", line 13, in connect
  self._conn = await asyncpg.connect(self._url)
File "/home/koliham/.cache/pypoetry/virtualenvs/websocketsample-dFaV7d37-py3.9/lib/python3.9/site-packages/asyncpg/", line 2093, in connect
  return await connect_utils._connect(
File "/home/koliham/.cache/pypoetry/virtualenvs/websocketsample-dFaV7d37-py3.9/lib/python3.9/site-packages/asyncpg/", line 903, in _connect
  raise last_error
File "/home/koliham/.cache/pypoetry/virtualenvs/websocketsample-dFaV7d37-py3.9/lib/python3.9/site-packages/asyncpg/", line 889, in _connect
  return await _connect_addr(
File "/home/koliham/.cache/pypoetry/virtualenvs/websocketsample-dFaV7d37-py3.9/lib/python3.9/site-packages/asyncpg/", line 781, in _connect_addr
  return await __connect_addr(params, timeout, True, *args)
File "/home/koliham/.cache/pypoetry/virtualenvs/websocketsample-dFaV7d37-py3.9/lib/python3.9/site-packages/asyncpg/", line 833, in __connect_addr
  tr, pr = await compat.wait_for(connector, timeout=timeout)
File "/home/koliham/.cache/pypoetry/virtualenvs/websocketsample-dFaV7d37-py3.9/lib/python3.9/site-packages/asyncpg/", line 66, in wait_for
  return await asyncio.wait_for(fut, timeout)
File "/home/linuxbrew/.linuxbrew/opt/[email protected]/lib/python3.9/asyncio/", line 481, in wait_for
  return fut.result()
File "/home/koliham/.cache/pypoetry/virtualenvs/websocketsample-dFaV7d37-py3.9/lib/python3.9/site-packages/asyncpg/", line 700, in _create_ssl_connection
  do_ssl_upgrade = await pr.on_data
ConnectionError: unexpected connection_lost() call

Frontend Code

<div class="text-center self-center">
     <div>Status is: {{status}} <q-btn color="primary" @click="checkStatus()">Check Status</q-btn></div>
    <div v-for="response in responses" :key="response">{{ response }}</div>

export default {
name: 'PageIndex',

data() {
  return {
    inhalt: "",
    responses: [],
    status: -1
created() {
  var payload = {request: {method: "subscribe", "arguments": {"topics":["topic1", "topic2", "topic3"]}}}
  var payload_str = JSON.stringify(payload)
  this.connection = new WebSocket("ws://localhost:8000/pubsub")

  this.connection.onmessage = (event) => {

  this.connection.onopen = (event) => {


methods: {
  checkStatus() {
    this.status = this.connection.readyState
  messageReceived (content) {
  //called, when message is received

Backend Code

import asyncio

import uvicorn
from fastapi import FastAPI
from fastapi.routing import APIRouter
from fastapi_websocket_pubsub import PubSubEndpoint
from starlette.websockets import WebSocket

PORT = 8000

app = FastAPI()
router = APIRouter()
endpoint = PubSubEndpoint(broadcaster="postgres://postgres:pw@localhost:5432/")

async def websocket_rpc_endpoint(websocket: WebSocket):
  async with endpoint.broadcaster:
      await endpoint.main_loop(websocket)


async def events(topic: str = "NoTopic"):
  await endpoint.publish([topic], {"somekey": "SomeValue"})

async def trigger_events():

if __name__ == "__main__":, host="", port=PORT)

Is there a detailed tutorial on "websocket"?

Sorry, my native language is not English, my English is poor, if communication is not good I will close issues.

I found the websocket example from #41

Before I found this example, it was not clear that the message should be sent after connecting.

Subscribe Msg

   request:  {
       method: "subscribe",
       "arguments": {
       "topics": ["event"]

I understand that I can receive messages by sending Subscribe Msg.

I understand sending messages to topics using Python.

endpoint = PubSubEndpoint(broadcaster="redis://localhost:6379")
await endpoint.publish(['event'], {'msg': 'ok'})

How can I send a message to a topic from Web JS?

Do you have any examples or documentation to look at?

I have a few questions

1.I want to send a message to Event topic?

let ws = new WebSocket("ws://localhost:8080/pubsub")

2.After the web receives a message once, there are no more messages.

import asyncio
from fastapi_websocket_pubsub import PubSubEndpoint

async def main():
    endpoint = PubSubEndpoint(broadcaster='redis://localhost:6379')
    while True:
        await endpoint.publish(["event"], data={'msg': 'hello web'})
        await asyncio.sleep(1)

if __name__ == '__main__':

But the web can receive the trigger message.

async def events():
    await endpoint.publish(['event'], {"msg": "fastapi"})

async def trigger_events():


Optional dependencies - Broadcaster Postgres dependency problem

Hello people!
First and foremost, I really appreciate your project! I am also using it heavily in a library I am making - eventhive.

So, I am trying to use your fastapi PubSub in a project that cannot use compiled C extensions.

This wouldn't be a problem as fastapi is Python, but broadcaster does require a postgres dependency that contains C code, failing my build.

I propose to create optional dependencies, exactly like broadcaster does, so I can exclude the postgres dependency that creates the issue - I'm not planning to use postgres anyway!

Thanks again for your project, it really is very useful!

Timeout Error

Hey guys, thanks for the fantastic work.

I've encountered a timeout error, but nowhere in the docs or source code I found how to set a limit (if possible at all) for the client. My server consumes an awaitable function that provides data at 1 minute intervals. The server then publishes the data. But the client times out at what it seems like 5 seconds

RPC Error
Traceback (most recent call last):
  File "/home/fernandopapi/.cache/pypoetry/virtualenvs/propdesk-ml-ops-nZ04QmJs-py3.8/lib/python3.8/site-packages/fastapi_websocket_rpc/", line 118, in __connect__
    raw_ws = await self.conn.__aenter__()
  File "/home/fernandopapi/.cache/pypoetry/virtualenvs/propdesk-ml-ops-nZ04QmJs-py3.8/lib/python3.8/site-packages/websockets/legacy/", line 633, in __aenter__
    return await self
  File "/home/fernandopapi/.cache/pypoetry/virtualenvs/propdesk-ml-ops-nZ04QmJs-py3.8/lib/python3.8/site-packages/websockets/legacy/", line 650, in __await_impl_timeout__
    return await asyncio.wait_for(self.__await_impl__(), self.open_timeout)
  File "/usr/lib/python3.8/asyncio/", line 501, in wait_for
    raise exceptions.TimeoutError()

Can I set this TimeOut limit? Thanks!

Plan to support Python 3.11

I try to run with Python 3.11 but it can't.

Traceback (most recent call last):
  File "/workspace/api/", line 6, in <module>
    from fastapi_websocket_pubsub import PubSubEndpoint
  File "/usr/local/lib/python3.11/site-packages/fastapi_websocket_pubsub/", line 1, in <module>
    from .pub_sub_server import PubSubEndpoint
  File "/usr/local/lib/python3.11/site-packages/fastapi_websocket_pubsub/", line 5, in <module>
    from fastapi_websocket_rpc import WebsocketRPCEndpoint
  File "/home/fastapi/.local/lib/python3.11/site-packages/fastapi_websocket_rpc/", line 2, in <module>
    from .websocket_rpc_client import WebSocketRpcClient
  File "/home/fastapi/.local/lib/python3.11/site-packages/fastapi_websocket_rpc/", line 12, in <module>
    from .rpc_channel import RpcChannel, OnConnectCallback, OnDisconnectCallback
  File "/home/fastapi/.local/lib/python3.11/site-packages/fastapi_websocket_rpc/", line 7, in <module>
    from asyncio.coroutines import coroutine
ImportError: cannot import name 'coroutine' from 'asyncio.coroutines' (/usr/local/lib/python3.11/asyncio/

Channel Closed before RPC response for 5b0a37a08a51 could be received

  1. I send some log data to client(web)
  2. Then I switch tabs (two tab in one page)
  3. server throws exception like below:

fastapi_websocket_rpc.rpc_channel.RpcChannelClosedException: Channel Closed before RPC response for 4af54b2301024c419c30
5b0a37a08a51 could be received
Failed to notify subscriber sub_id=1c16a0c075014bb7a9f00022bde7c831 with topic=test_path_2#log_data
Traceback (most recent call last):
File "Z:\datasmart\datasmart\venv\Lib\site-packages\fastapi_websocket_pubsub\", line 222, in
await self.trigger_callback(data, topic, subscriber_id, event)
File "Z:\datasmart\datasmart\venv\Lib\site-packages\fastapi_websocket_pubsub\", line 180, in
await subscription.callback(subscription, data)
File "Z:\datasmart\datasmart_backend\venv\Lib\site-packages\fastapi_websocket_pubsub\", line 26, i
n callback
await, data=data)
File "Z:\datasmart\datasmart\venv\Lib\site-packages\fastapi_websocket_rpc\", line 397, in call
return await self.wait_for_response(promise, timeout=timeout)
File "Z:\datasmart\datasmart\venv\Lib\site-packages\fastapi_websocket_rpc\", line 370, in wait_f
raise RpcChannelClosedException(
fastapi_websocket_rpc.rpc_channel.RpcChannelClosedException: Channel Closed before RPC response for fd885a529545428594b6
c10ecb695e78 could be received

someone cloud help me? thanks very much.

How to use as just a Websocket server (for browser/mobile client)?

So I realize this project is intended for RPC between servers, wrapping fastapi_websocket_rpc. I do see some hints at browser

easily accessible and scalable over the web and across your cloud in realtime

And it seems like this project could really streamline FastAPI WebSocket endpoints even just for that use-case (browser/mobile clients). In particular, the auto-handling of broadcaster while still exposing on_connect|disconnect, marshaling request/response via Pydantic, etc. Things I'm struggling with using FastAPI WebSockets "raw" + broadcaster (discussion).

Anyway, that said I'm not clear on how to have the server both subscribe & publish. Eg, what I'm looking for in pseudo-code would be:

sockets = []


app.subscribe(topic, data):
  if topic == 'chat':
    for s in sockets:
      if s.hasPermission(topic):
        endpoint.publish('new-message', data)

That is, it's using both subscribe and publish. On the README there's separate setups for pub vs sub (ie: PubSubEndpoint vs PubSubClient). It strikes me they can both be used together on the FastAPI server, but I'm not sure how. The PubSubClient takes a server_uri, which in this scenario should just be "self" (or, PubSubClient should be available directly from PubSubEndpoint?). And the client_example loops main() in; which in the above scenario wouldn't be compatible with FastAPI's main loop (right)? Looking through the code, it strikes me that maybe I want to instantiate a custom EventNotifier and pass that to PubSubEndpoint, so I can both broadcast & listen - but I'm not sure I'm barking up the right tree.

TL;DR: how, just on the FastAPI server, can I both subscribe to messages from the browser, and publish things back?

BTW, it may sound like I'm fitting circle to square peg, but I actually want to use this project anyway for server->server RPC (separate); so if it can streamline both scenarios, this would be a very valuable project indeed.

Demo: got Future <Future pending> attached to a different loop

I am trying the example out on Python 3.8.5 and getting the following error when invoking the /trigger endpoint:

$ PORT=8004 python 
INFO:     Started server process [58697]
INFO:     Waiting for application startup.
INFO:     Application startup complete.
INFO:     Uvicorn running on (Press CTRL+C to quit)
INFO:     ('', 37088) - "WebSocket /pubsub" [accepted]
INFO: - "GET /trigger/ HTTP/1.1" 307 Temporary Redirect
INFO: - "GET /trigger HTTP/1.1" 200 OK
Failed to notify subscriber sub_id=a8fd8a8aaf6d42238d6bda98f193bf40 with topic=germs
Traceback (most recent call last):
  File "/home/bridge/venv/websocket/lib/python3.8/site-packages/fastapi_websocket_pubsub/", line 156, in callback_subscribers
    await self.trigger_callback(data, topic, subscriber_id, event)
  File "/home/bridge/venv/websocket/lib/python3.8/site-packages/fastapi_websocket_pubsub/", line 129, in trigger_callback
    await subscription.callback(subscription, data)
  File "/home/bridge/venv/websocket/lib/python3.8/site-packages/fastapi_websocket_pubsub/", line 157, in __broadcast_notifications__
    async with self._publish_lock:
  File "/usr/lib/python3.8/asyncio/", line 97, in __aenter__
    await self.acquire()
  File "/usr/lib/python3.8/asyncio/", line 203, in acquire
    await fut
RuntimeError: Task <Task pending name='Task-20' coro=<EventNotifier.callback_subscribers() running at /home/bridge/venv/websocket/lib/python3.8/site-packages/fastapi_websocket_pubsub/> cb=[gather.<locals>._done_callback() at /usr/lib/python3.8/asyncio/]> got Future <Future pending> attached to a different loop
INFO:     ('', 37644) - "WebSocket /pubsub" [accepted]

The client does see the events though:

$ PORT=8004 python 
running callback for guns!
running callback for germs!
running callback for germs!
running callback steel!
Got data None

This what I have installed:


starlette requirement failing

I'm trying to use fastapi_websocket_pubsub on a Raspberry Pi. pip install doesn't work

pi@raspberrypi:~ $ pip install fastapi_websocket_pubsub
Looking in indexes:,
Collecting fastapi_websocket_pubsub
  Could not find a version that satisfies the requirement fastapi_websocket_pubsub (from versions: )
No matching distribution found for fastapi_websocket_pubsub

so I tried to install from one of the release archives directly. This produced the error

fastapi 0.65.0 has requirement starlette==0.14.2, but you'll have starlette 0.13.6 which is incompatible.

Here's the complete output:

pi@raspberrypi:~/Downloads $ python3 -m pip install v0.1.18.tar.gz
Looking in indexes:,
Processing ./v0.1.18.tar.gz
Collecting broadcaster==0.2.0 (from fastapi-websocket-pubsub==0.1.18)
  Downloading                                    2/broadcaster-0.2.0-py3-none-any.whl
Collecting fastapi-websocket-rpc>=0.1.18 (from fastapi-websocket-pubsub==0.1.18)
  Downloading                                    2/fastapi_websocket_rpc-0.1.18-py3-none-any.whl
Collecting fastapi (from fastapi-websocket-rpc>=0.1.18->fastapi-websocket-pubsub==0.1.18)
  Downloading                                    e/fastapi-0.65.0-py3-none-any.whl (50kB)
    100% |โ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆ| 51kB 288kB/s
Collecting aiohttp>=3.7.2 (from fastapi-websocket-rpc>=0.1.18->fastapi-websocket-pubsub==0.1.18)
  Downloading (1.3MB)
    100% |โ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆ| 1.3MB 67kB/s
Collecting pydantic (from fastapi-websocket-rpc>=0.1.18->fastapi-websocket-pubsub==0.1.18)
  Downloading                                    4/pydantic-1.8.1-py3-none-any.whl (125kB)
    100% |โ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆ| 133kB 418kB/s
Collecting uvicorn (from fastapi-websocket-rpc>=0.1.18->fastapi-websocket-pubsub==0.1.18)
  Downloading                                    6/uvicorn-0.13.4-py3-none-any.whl (46kB)
    100% |โ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆ| 51kB 427kB/s
Collecting starlette==0.13.6 (from fastapi-websocket-rpc>=0.1.18->fastapi-websocket-pubsub==0.1.18)
  Downloading                                    4/starlette-0.13.6-py3-none-any.whl (59kB)
    100% |โ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆ| 61kB 185kB/s
Collecting websockets (from fastapi-websocket-rpc>=0.1.18->fastapi-websocket-pubsub==0.1.18)
  Downloading (101kB)
    100% |โ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆ| 102kB 265kB/s
Collecting requests>=2.25.0 (from fastapi-websocket-rpc>=0.1.18->fastapi-websocket-pubsub==0.1.18)
  Downloading                                    a/requests-2.25.1-py2.py3-none-any.whl (61kB)
    100% |โ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆ| 61kB 192kB/s
Collecting tenacity>=6.3.1 (from fastapi-websocket-rpc>=0.1.18->fastapi-websocket-pubsub==0.1.18)
  Downloading                                    1/tenacity-7.0.0-py2.py3-none-any.whl
Collecting multidict<7.0,>=4.5 (from aiohttp>=3.7.2->fastapi-websocket-rpc>=0.1.18->fastapi-websocket-pubsub==0.1.18)
  Downloading                                    b/multidict-5.1.0.tar.gz (53kB)
    100% |โ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆ| 61kB 265kB/s
  Installing build dependencies ... done
Requirement already satisfied: chardet<5.0,>=2.0 in /usr/lib/python3/dist-packages (from aiohttp>=3.7.2->fastapi-websoc                                    ket-rpc>=0.1.18->fastapi-websocket-pubsub==0.1.18) (3.0.4)
Collecting yarl<2.0,>=1.0 (from aiohttp>=3.7.2->fastapi-websocket-rpc>=0.1.18->fastapi-websocket-pubsub==0.1.18)
  Downloading (262kB)
    100% |โ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆ| 266kB 260kB/s
Collecting async-timeout<4.0,>=3.0 (from aiohttp>=3.7.2->fastapi-websocket-rpc>=0.1.18->fastapi-websocket-pubsub==0.1.1                                    8)
  Downloading                                    4/async_timeout-3.0.1-py3-none-any.whl
Collecting typing-extensions>=3.6.5 (from aiohttp>=3.7.2->fastapi-websocket-rpc>=0.1.18->fastapi-websocket-pubsub==0.1.                                    18)
  Downloading                                    c/typing_extensions-
Collecting attrs>=17.3.0 (from aiohttp>=3.7.2->fastapi-websocket-rpc>=0.1.18->fastapi-websocket-pubsub==0.1.18)
  Downloading                                    c/attrs-21.2.0-py2.py3-none-any.whl (53kB)
    100% |โ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆ| 61kB 491kB/s
Collecting h11>=0.8 (from uvicorn->fastapi-websocket-rpc>=0.1.18->fastapi-websocket-pubsub==0.1.18)
  Downloading                                    2/h11-0.12.0-py3-none-any.whl (54kB)
    100% |โ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆ| 61kB 519kB/s
Requirement already satisfied: click==7.* in /usr/lib/python3/dist-packages (from uvicorn->fastapi-websocket-rpc>=0.1.1                                    8->fastapi-websocket-pubsub==0.1.18) (7.0)
Requirement already satisfied: urllib3<1.27,>=1.21.1 in /usr/lib/python3/dist-packages (from requests>=2.25.0->fastapi-                                    websocket-rpc>=0.1.18->fastapi-websocket-pubsub==0.1.18) (1.24.1)
Requirement already satisfied: certifi>=2017.4.17 in /usr/lib/python3/dist-packages (from requests>=2.25.0->fastapi-web                                    socket-rpc>=0.1.18->fastapi-websocket-pubsub==0.1.18) (2018.8.24)
Requirement already satisfied: idna<3,>=2.5 in /usr/lib/python3/dist-packages (from requests>=2.25.0->fastapi-websocket                                    -rpc>=0.1.18->fastapi-websocket-pubsub==0.1.18) (2.6)
Requirement already satisfied: six>=1.9.0 in /usr/lib/python3/dist-packages (from tenacity>=6.3.1->fastapi-websocket-rp                                    c>=0.1.18->fastapi-websocket-pubsub==0.1.18) (1.12.0)
Building wheels for collected packages: fastapi-websocket-pubsub, multidict
  Running bdist_wheel for fastapi-websocket-pubsub ... done
  Stored in directory: /tmp/pip-ephem-wheel-cache-zmte8q0s/wheels/2d/ec/39/e3f0b6ac2e50b81155acd465120b86bb85aa6fd78248                                    eb3d22
  Running bdist_wheel for multidict ... done
  Stored in directory: /home/pi/.cache/pip/wheels/e7/05/d2/f5c04c29d0e4b234dbcd4b609b51f8c65d67ff9bbd01c904b1
Successfully built fastapi-websocket-pubsub multidict
fastapi 0.65.0 has requirement starlette==0.14.2, but you'll have starlette 0.13.6 which is incompatible.
Installing collected packages: broadcaster, typing-extensions, pydantic, starlette, fastapi, multidict, yarl, async-tim                                    eout, attrs, aiohttp, h11, uvicorn, websockets, requests, tenacity, fastapi-websocket-rpc, fastapi-websocket-pubsub
  The script uvicorn is installed in '/home/pi/.local/bin' which is not on PATH.
  Consider adding this directory to PATH or, if you prefer to suppress this warning, use --no-warn-script-location.
Successfully installed aiohttp-3.7.4.post0 async-timeout-3.0.1 attrs-21.2.0 broadcaster-0.2.0 fastapi-0.65.0 fastapi-we                                    bsocket-pubsub-0.1.18 fastapi-websocket-rpc-0.1.18 h11-0.12.0 multidict-5.1.0 pydantic-1.8.1 requests-2.25.1 starlette-                                    0.13.6 tenacity-7.0.0 typing-extensions- uvicorn-0.13.4 websockets-9.0.1 yarl-1.6.3

Private or Non-Broadcast Messages

Is there any concept of sending a message to a specific subscriber/group within this? Or is that on the roadmap at all?

Also - It looks like the example is slightly wrong here:

endpoint.register_route(app, "/pubsub")

I believe it needs to be:

endpoint.register_route(app, path="/pubsub")

If this doesn't make sense, I was hoping to have the same topics (chat_messages, events, etc) for various user levels. (guest, user, admin). So they would subscribe to /admin/chat_messages and get chat messages from all topics, whereas users would get messages from only the users channel. Hopefully that makes sense.

too many file descriptors in select()

ValueError: too many file descriptors in select()
when clients connect more than 500 , server will have shutdown
r, w, x =, w, w, timeout)
ValueError: too many file descriptors in select()

Can we use celery as PUB/SUB?

I'm wondering whether we can use celery+rabbitmq as PUB/SUB and what modifications need to be done to achieve this? Thank you.

ALL_TOPICS client subscription not working

When subscribing to ALL_TOPICS from a client they do not receive any updates.

import uvicorn
from fastapi import FastAPI
from fastapi_websocket_pubsub import PubSubEndpoint
import asyncio
import uvicorn
from fastapi import FastAPI

app = FastAPI()
endpoint = PubSubEndpoint()
endpoint.register_route(app, "/pubsub")

async def on_events(subscription, data):
    print(f"Hello from {subscription.topic}: {data}")

async def startup_client():
    # Subscribe to all incoming messages
    await endpoint.subscribe(ALL_TOPICS, on_events)

async def events():
    # await asyncio.sleep(1)
    await endpoint.publish("other", data={"info": 3, "some_data": "test123"})
    await asyncio.sleep(1)
    await endpoint.publish("other", data={"info": 2, "some_data": "test456"})
    await asyncio.sleep(1)
    await endpoint.publish("test", data={"info": 1, "some_data": "test999"})

async def trigger_events():

if __name__ == "__main__":

from fastapi_websocket_pubsub.event_notifier import ALL_TOPICS
import uvicorn

from fastapi_websocket_pubsub import PubSubClient

app = FastAPI()
endpoint = PubSubClient()

async def on_events(data, topic):
    print(f"Received from {topic}: {data}")

async def startup_client():
    # Subscribe to all incoming messages
    endpoint.subscribe(ALL_TOPICS, on_events)

async def get():
    await endpoint.publish("test", data={"info": 1, "some_data": "client"})

if __name__ == "__main__":

Expected behavior: will receive all data from receives all data

Observed behaviour: does not receive any data receives all data

Support for websockets>=11?

Hello! Thanks for this great library.

I'm currently trying to use in an application, but another library I'm using requires websockets>=11. I noticed this library requires an older version. Is there a particular reason for this, or do the dependencies just need to be updated?


Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    ๐Ÿ–– Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. ๐Ÿ“Š๐Ÿ“ˆ๐ŸŽ‰

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google โค๏ธ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.