fastapi_websocket_pubsub's Introduction

⚡🗞️ FastAPI Websocket Pub/Sub

A fast and durable Pub/Sub channel over Websockets. The easiest way to create a live publish / subscribe multi-cast over the web.

Supported and tested on Python >= 3.7

As seen at PyCon IL 2021 and EuroPython 2021

Installation 🛠️

pip install fastapi_websocket_pubsub

Intro

The classic pub/sub pattern made easily accessible and scalable over the web and across your cloud in realtime; while enjoying the benefits of FastAPI (e.g. dependency injection).

FastAPI + WebSockets + PubSub == ⚡💪 ❤️

  • Subscribe

    • Clients subscribe to topics (arbitrary strings) and receive relevant events along with structured data (serialized with Pydantic).
      # Callback to be called upon event being published on server
      async def on_event(data):
          print("We got an event! with data- ", data)
      # Subscribe for the event 
      client.subscribe("my event", on_event)
  • Publish

    • Directly from server code to connected clients.
      app = FastAPI() 
      endpoint = PubSubEndpoint()
      endpoint.register_route(app, path="/pubsub")
      endpoint.publish(["my_event_topic"], data=["my", "data", 1])
    • From client to client (through the servers)
      async with PubSubClient(server_uri="ws://localhost/pubsub") as client:
          # Publish through the client connection; the server relays it to other subscribers
          await client.publish(["my_event_topic"], data=["my", "data", 1])
    • Across server instances (using broadcaster and a backend medium (e.g. Redis, Kafka, ...))
      • No matter which server a client connects to - it will get the messages it subscribes to
      app = FastAPI() 
      endpoint = PubSubEndpoint(broadcaster="postgres://localhost:5432/")
      
      @app.websocket("/pubsub")
      async def websocket_rpc_endpoint(websocket: WebSocket):
          await endpoint.main_loop(websocket)
      see examples/pubsub_broadcaster_server_example.py for full usage example
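
To make the subscribe / publish pattern above concrete, here is a minimal in-process sketch that uses only asyncio. It is not how fastapi_websocket_pubsub is implemented and involves no network; `TinyPubSub`, `demo`, and their methods are hypothetical names chosen for illustration.

```python
import asyncio
from collections import defaultdict
from typing import Any, Awaitable, Callable, DefaultDict, List

Callback = Callable[[Any], Awaitable[None]]


class TinyPubSub:
    """Hypothetical in-memory stand-in: maps topic strings to async callbacks."""

    def __init__(self) -> None:
        self._subscribers: DefaultDict[str, List[Callback]] = defaultdict(list)

    def subscribe(self, topic: str, callback: Callback) -> None:
        # Register a callback to run whenever `topic` is published
        self._subscribers[topic].append(callback)

    async def publish(self, topics: List[str], data: Any = None) -> int:
        """Run every callback registered on the given topics; return how many ran."""
        callbacks = [cb for topic in topics for cb in self._subscribers.get(topic, [])]
        if callbacks:
            await asyncio.gather(*(cb(data) for cb in callbacks))
        return len(callbacks)


async def demo() -> List[Any]:
    received: List[Any] = []

    async def on_event(data: Any) -> None:
        received.append(data)

    bus = TinyPubSub()
    bus.subscribe("my_event_topic", on_event)
    await bus.publish(["my_event_topic"], data=["my", "data", 1])
    # Nobody subscribed to this topic, so nothing is delivered
    await bus.publish(["unrelated_topic"], data="ignored")
    return received
```

Running `asyncio.run(demo())` collects only the data published on the subscribed topic. The real library adds the websocket transport, serialization, and cross-server broadcasting on top of this basic idea.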

Usage example (server publishing following HTTP trigger):

In the code below, a client connects to the server and subscribes to a topic named "triggered". Aside from the PubSub websocket, the server also exposes a regular HTTP route, which triggers publication of the event.

Server:

from fastapi import FastAPI
from fastapi_websocket_pubsub import PubSubEndpoint

app = FastAPI()
# Init endpoint
endpoint = PubSubEndpoint()
# Register the endpoint on the app
endpoint.register_route(app, "/pubsub")
# Register a regular HTTP route
@app.get("/trigger")
async def trigger_events():
    # Upon request, trigger an event (publish is a coroutine, so it must be awaited)
    await endpoint.publish(["triggered"])

Client:

import asyncio
from fastapi_websocket_pubsub import PubSubClient
# Callback to be called upon event being published on server
async def on_trigger(data):
    print("Trigger URL was accessed")

async def main():
    async with PubSubClient(server_uri="ws://localhost/pubsub") as client:
        # Subscribe for the event
        client.subscribe("triggered", on_trigger)
        # Keep the connection open so that events can arrive
        await client.wait_until_done()

asyncio.run(main())

More Examples

What can I do with this?

The combination of Websockets and bi-directional Pub/Sub is ideal for building realtime data propagation solutions at scale over the web.

  • Update mechanism
  • Remote control mechanism
  • Data processing
  • Distributed computing
  • Realtime communications over the web

Foundations:

  • Based on fastapi-websocket-rpc for a robust realtime bidirectional channel

  • Based on broadcaster for syncing server instances

  • Server Endpoint:

    • Based on FastAPI: enjoy all the benefits of a full ASGI platform, including Async-io and dependency injections (for example to authenticate connections)

    • Based on Pydantic: easily serialize structured data as part of RPC requests and responses. Simply pass Pydantic data models as PubSub published data to have them available as part of an event.

  • Client:

    • Based on Tenacity: allows configurable retries to keep the connection alive

      • see the retry_config parameter of WebSocketRpcClient.__init__
    • Based on python websockets - a more comprehensive client than the one offered by FastAPI

Logging

fastapi-websocket-pubsub uses fastapi-websocket-rpc for its logging configuration, which provides a helper logging module to control how logs are produced. See fastapi_websocket_rpc/logger.py. Use logging_config.set_mode or the 'WS_RPC_LOGGING' environment variable to choose the logging method you prefer, or override it completely via the standard logging configuration (e.g. 'logging.config.dictConfig'). All logger names start with 'fastapi.ws_rpc.pubsub'.

example:

# set RPC to log like UVICORN
from fastapi_websocket_rpc.logger import logging_config, LoggingModes
logging_config.set_mode(LoggingModes.UVICORN)
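
As a sketch of the 'logging.config.dictConfig' route mentioned above, the standard library can configure the parent of the 'fastapi.ws_rpc.pubsub' loggers directly. The handler name, formatter, and DEBUG level below are illustrative choices, and how this interacts with the mode selected through logging_config is an assumption to verify against the library.

```python
import logging
import logging.config

LOGGING = {
    "version": 1,
    "disable_existing_loggers": False,
    "formatters": {
        "plain": {"format": "%(asctime)s %(name)s %(levelname)s %(message)s"},
    },
    "handlers": {
        "console": {"class": "logging.StreamHandler", "formatter": "plain"},
    },
    "loggers": {
        # Parent of every logger whose name starts with 'fastapi.ws_rpc.pubsub'
        "fastapi.ws_rpc": {
            "handlers": ["console"],
            "level": "DEBUG",
            "propagate": False,
        },
    },
}

# Apply the configuration; child loggers inherit the level and handler
logging.config.dictConfig(LOGGING)
```

Because logger names are hierarchical, any logger named 'fastapi.ws_rpc.pubsub...' inherits the level and handler set on 'fastapi.ws_rpc' here.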

Pull requests - welcome!

  • Please include tests for new features

fastapi_websocket_pubsub's People

Contributors

arkadybag, asafc, ff137, graeme22, obsd, orweis, roekatz, singingwolfboy, sondrelg

fastapi_websocket_pubsub's Issues

Timeout Error

Hey guys, thanks for the fantastic work.

I've encountered a timeout error, but I couldn't find anywhere in the docs or source code how to set a timeout limit for the client (if that is possible at all). My server consumes an awaitable function that provides data at 1-minute intervals and then publishes the data, but the client times out after what seems like 5 seconds.

RPC Error
Traceback (most recent call last):
  File "/home/fernandopapi/.cache/pypoetry/virtualenvs/propdesk-ml-ops-nZ04QmJs-py3.8/lib/python3.8/site-packages/fastapi_websocket_rpc/websocket_rpc_client.py", line 118, in __connect__
    raw_ws = await self.conn.__aenter__()
  File "/home/fernandopapi/.cache/pypoetry/virtualenvs/propdesk-ml-ops-nZ04QmJs-py3.8/lib/python3.8/site-packages/websockets/legacy/client.py", line 633, in __aenter__
    return await self
  File "/home/fernandopapi/.cache/pypoetry/virtualenvs/propdesk-ml-ops-nZ04QmJs-py3.8/lib/python3.8/site-packages/websockets/legacy/client.py", line 650, in __await_impl_timeout__
    return await asyncio.wait_for(self.__await_impl__(), self.open_timeout)
  File "/usr/lib/python3.8/asyncio/tasks.py", line 501, in wait_for
    raise exceptions.TimeoutError()
asyncio.exceptions.TimeoutError

Can I set this TimeOut limit? Thanks!
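
The traceback above ends in `asyncio.wait_for(self.__await_impl__(), self.open_timeout)`, which suggests the timeout fires while the connection is being opened rather than while waiting for published data. The sketch below reproduces that mechanism with the standard library only; `slow_handshake` and `connect_with_timeout` are hypothetical stand-ins. The websockets `connect()` function accepts an `open_timeout` argument, but whether the pub/sub client forwards extra keyword arguments to it is an assumption to check against the installed version.

```python
import asyncio


async def slow_handshake(delay: float) -> str:
    # Stand-in for a connection attempt that needs `delay` seconds to complete
    await asyncio.sleep(delay)
    return "connected"


async def connect_with_timeout(delay: float, open_timeout: float) -> str:
    # Same pattern as in the traceback: bound the handshake with asyncio.wait_for
    try:
        return await asyncio.wait_for(slow_handshake(delay), open_timeout)
    except asyncio.TimeoutError:
        return "timed out"
```

If the handshake is slower than `open_timeout`, the result is a timeout regardless of how often the server later publishes data.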

too many file descriptors in select()

Environment: Windows 10, Python 3.7.
When more than 500 clients connect, the server shuts down with:

r, w, x = select.select(r, w, w, timeout)
ValueError: too many file descriptors in select()
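
On Windows, `select()` is limited to 512 descriptors in CPython, and the selector-based asyncio event loop (the default on Windows before Python 3.8) relies on it. One possible workaround, sketched here under the assumption that the server runner does not install its own loop policy afterwards, is to switch to the proactor event loop before the server starts. The helper name `use_proactor_loop_on_windows` is hypothetical.

```python
import asyncio
import sys


def use_proactor_loop_on_windows() -> bool:
    """Switch asyncio to the proactor loop on Windows; do nothing elsewhere."""
    if sys.platform == "win32":
        # The proactor loop uses I/O completion ports instead of select()
        asyncio.set_event_loop_policy(asyncio.WindowsProactorEventLoopPolicy())
        return True
    return False
```

The call would go at the top of the server entry point, before any event loop is created. The proactor loop had some feature gaps on Python 3.7, so upgrading Python may be the simpler route.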

Is there a detailed tutorial on "websocket"?

Sorry, English is not my native language; if we can't communicate well, I will close the issue.

I found the websocket example from #41

Before I found this example, it was not clear that the message should be sent after connecting.

Subscribe Msg

{
   "request": {
       "method": "subscribe",
       "arguments": {
           "topics": ["event"]
       }
   }
}

I understand that I can receive messages by sending Subscribe Msg.
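
For reference, the subscribe message quoted above can be produced as valid JSON from Python as follows. This is a sketch that only mirrors the shape shown in this issue; `build_subscribe_message` is a hypothetical helper, and it makes no claim about other fields the server may accept.

```python
import json
from typing import List


def build_subscribe_message(topics: List[str]) -> str:
    """Serialize a subscribe request in the shape quoted in this issue."""
    payload = {
        "request": {
            "method": "subscribe",
            "arguments": {"topics": topics},
        }
    }
    return json.dumps(payload)
```

A browser client would send the same string over the websocket once the connection is open.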

I understand sending messages to topics using Python.

endpoint = PubSubEndpoint(broadcaster="redis://localhost:6379")
await endpoint.publish(['event'], {'msg': 'ok'})

How can I send a message to a topic from Web JS?

Do you have any examples or documentation to look at?

I have a few questions

1. How do I send a message to the "event" topic?

let ws = new WebSocket("ws://localhost:8080/pubsub")
ws.send(JSON.stringify(????))

2. After the web client receives one message, no further messages arrive.

import asyncio
from fastapi_websocket_pubsub import PubSubEndpoint


async def main():
    endpoint = PubSubEndpoint(broadcaster='redis://localhost:6379')
    while True:
        await endpoint.publish(["event"], data={'msg': 'hello web'})
        await asyncio.sleep(1)


if __name__ == '__main__':
    asyncio.run(main())

But the web client does receive the message published by the trigger route.

async def events():
    await endpoint.publish(['event'], {"msg": "fastapi"})


@app.get('/trigger')
async def trigger_events():
    asyncio.create_task(events())

image

Optional dependencies - Broadcaster Postgres dependency problem

Hello people!
First and foremost, I really appreciate your project! I am also using it heavily in a library I am making - eventhive.

So, I am trying to use your fastapi PubSub in a project that cannot use compiled C extensions.

This wouldn't be a problem as fastapi is Python, but broadcaster does require a postgres dependency that contains C code, failing my build.

I propose to create optional dependencies, exactly like broadcaster does, so I can exclude the postgres dependency that creates the issue - I'm not planning to use postgres anyway!

Thanks again for your project, it really is very useful!

Plan to support Python 3.11

I tried to run it with Python 3.11, but the import fails.

Traceback (most recent call last):
  File "/workspace/api/main.py", line 6, in <module>
    from fastapi_websocket_pubsub import PubSubEndpoint
  File "/usr/local/lib/python3.11/site-packages/fastapi_websocket_pubsub/__init__.py", line 1, in <module>
    from .pub_sub_server import PubSubEndpoint
  File "/usr/local/lib/python3.11/site-packages/fastapi_websocket_pubsub/pub_sub_server.py", line 5, in <module>
    from fastapi_websocket_rpc import WebsocketRPCEndpoint
  File "/home/fastapi/.local/lib/python3.11/site-packages/fastapi_websocket_rpc/__init__.py", line 2, in <module>
    from .websocket_rpc_client import WebSocketRpcClient
  File "/home/fastapi/.local/lib/python3.11/site-packages/fastapi_websocket_rpc/websocket_rpc_client.py", line 12, in <module>
    from .rpc_channel import RpcChannel, OnConnectCallback, OnDisconnectCallback
  File "/home/fastapi/.local/lib/python3.11/site-packages/fastapi_websocket_rpc/rpc_channel.py", line 7, in <module>
    from asyncio.coroutines import coroutine
ImportError: cannot import name 'coroutine' from 'asyncio.coroutines' (/usr/local/lib/python3.11/asyncio/coroutines.py)
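
The failing import is expected on this interpreter: the generator-based `asyncio.coroutine` decorator was deprecated in Python 3.8 and removed in Python 3.11. The sketch below shows the native `async def` replacement and a small check for the legacy decorator; `fetch` and `has_legacy_coroutine_decorator` are hypothetical names, and the actual fix belongs in fastapi_websocket_rpc itself.

```python
import asyncio
import sys

# Old style, no longer importable on Python 3.11+:
#     from asyncio.coroutines import coroutine
#
#     @coroutine
#     def fetch():
#         yield from asyncio.sleep(0)


async def fetch() -> str:
    # Native coroutine: the replacement for a generator-based @coroutine function
    await asyncio.sleep(0)
    return "done"


def has_legacy_coroutine_decorator() -> bool:
    # True only on interpreters that still ship asyncio.coroutine (3.10 and earlier)
    return hasattr(asyncio, "coroutine")
```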

Channel Closed before RPC response for 5b0a37a08a51 could be received

  1. I send some log data to the client (web).
  2. Then I switch tabs (two tabs in one page).
  3. The server throws an exception like the one below:

fastapi_websocket_rpc.rpc_channel.RpcChannelClosedException: Channel Closed before RPC response for 4af54b2301024c419c305b0a37a08a51 could be received
Failed to notify subscriber sub_id=1c16a0c075014bb7a9f00022bde7c831 with topic=test_path_2#log_data
Traceback (most recent call last):
  File "Z:\datasmart\datasmart\venv\Lib\site-packages\fastapi_websocket_pubsub\event_notifier.py", line 222, in callback_subscribers
    await self.trigger_callback(data, topic, subscriber_id, event)
  File "Z:\datasmart\datasmart\venv\Lib\site-packages\fastapi_websocket_pubsub\event_notifier.py", line 180, in trigger_callback
    await subscription.callback(subscription, data)
  File "Z:\datasmart\datasmart_backend\venv\Lib\site-packages\fastapi_websocket_pubsub\rpc_event_methods.py", line 26, in callback
    await self.channel.other.notify(subscription=sub, data=data)
  File "Z:\datasmart\datasmart\venv\Lib\site-packages\fastapi_websocket_rpc\rpc_channel.py", line 397, in call
    return await self.wait_for_response(promise, timeout=timeout)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "Z:\datasmart\datasmart\venv\Lib\site-packages\fastapi_websocket_rpc\rpc_channel.py", line 370, in wait_for_response
    raise RpcChannelClosedException(
fastapi_websocket_rpc.rpc_channel.RpcChannelClosedException: Channel Closed before RPC response for fd885a529545428594b6c10ecb695e78 could be received

Could someone help me? Thanks very much.
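
The log above shows a notification failing because the subscriber's channel closed before it could respond. As a general illustration, and not the library's actual code, the sketch below fans a message out to several subscribers, tolerates the ones whose channel has closed, and drops them so later publishes skip them. `ChannelClosed`, `notify_all`, and `demo` are hypothetical names.

```python
import asyncio
from typing import Awaitable, Callable, Dict, List, Tuple


class ChannelClosed(Exception):
    """Hypothetical stand-in for an error raised when the peer has disconnected."""


Notify = Callable[[str], Awaitable[None]]


async def notify_all(subscribers: Dict[str, Notify], data: str) -> List[str]:
    """Notify every subscriber; return the ids whose channel was closed."""
    ids = list(subscribers)
    results = await asyncio.gather(
        *(subscribers[sub_id](data) for sub_id in ids), return_exceptions=True
    )
    closed = []
    for sub_id, result in zip(ids, results):
        if isinstance(result, ChannelClosed):
            closed.append(sub_id)
        elif isinstance(result, Exception):
            raise result  # unexpected failures still surface
    # Drop dead subscribers so later publishes do not hit them again
    for sub_id in closed:
        subscribers.pop(sub_id, None)
    return closed


async def demo() -> Tuple[List[str], List[str], List[str]]:
    delivered: List[str] = []

    async def alive(data: str) -> None:
        delivered.append(data)

    async def gone(data: str) -> None:
        raise ChannelClosed("peer left")

    subs: Dict[str, Notify] = {"tab-1": alive, "tab-2": gone}
    closed = await notify_all(subs, "log line")
    return closed, delivered, sorted(subs)
```

Whether the exception in this issue is harmless noise from a tab that went away or a sign of lost messages depends on the application, so this only shows one way to contain it.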

Demo: got Future <Future pending> attached to a different loop

I am trying the example out on Python 3.8.5 and getting the following error when invoking the /trigger endpoint:

$ PORT=8004 python pubsub_broadcaster_server_example.py 
INFO:     Started server process [58697]
INFO:     Waiting for application startup.
INFO:     Application startup complete.
INFO:     Uvicorn running on http://0.0.0.0:8004 (Press CTRL+C to quit)
INFO:     ('127.0.0.1', 37088) - "WebSocket /pubsub" [accepted]
INFO:     127.0.0.1:37124 - "GET /trigger/ HTTP/1.1" 307 Temporary Redirect
INFO:     127.0.0.1:37124 - "GET /trigger HTTP/1.1" 200 OK
Failed to notify subscriber sub_id=a8fd8a8aaf6d42238d6bda98f193bf40 with topic=germs
Traceback (most recent call last):
  File "/home/bridge/venv/websocket/lib/python3.8/site-packages/fastapi_websocket_pubsub/event_notifier.py", line 156, in callback_subscribers
    await self.trigger_callback(data, topic, subscriber_id, event)
  File "/home/bridge/venv/websocket/lib/python3.8/site-packages/fastapi_websocket_pubsub/event_notifier.py", line 129, in trigger_callback
    await subscription.callback(subscription, data)
  File "/home/bridge/venv/websocket/lib/python3.8/site-packages/fastapi_websocket_pubsub/event_broadcaster.py", line 157, in __broadcast_notifications__
    async with self._publish_lock:
  File "/usr/lib/python3.8/asyncio/locks.py", line 97, in __aenter__
    await self.acquire()
  File "/usr/lib/python3.8/asyncio/locks.py", line 203, in acquire
    await fut
RuntimeError: Task <Task pending name='Task-20' coro=<EventNotifier.callback_subscribers() running at /home/bridge/venv/websocket/lib/python3.8/site-packages/fastapi_websocket_pubsub/event_notifier.py:156> cb=[gather.<locals>._done_callback() at /usr/lib/python3.8/asyncio/tasks.py:758]> got Future <Future pending> attached to a different loop
INFO:     ('127.0.0.1', 37644) - "WebSocket /pubsub" [accepted]

The client does see the events though:

$ PORT=8004 python pubsub_client_example.py 
running callback for guns!
running callback for germs!
running callback for germs!
running callback steel!
Got data None

This is what I have installed:

aiohttp==3.7.4.post0
async-timeout==3.0.1
asyncpg==0.22.0
attrs==20.3.0
broadcaster==0.2.0
certifi==2020.12.5
chardet==4.0.0
click==7.1.2
fastapi==0.63.0
fastapi-websocket-pubsub==0.1.17
fastapi-websocket-rpc==0.1.18
flake8==3.9.0
h11==0.12.0
idna==2.10
iniconfig==1.1.1
mccabe==0.6.1
multidict==5.1.0
packaging==20.9
pkg-resources==0.0.0
pluggy==0.13.1
py==1.10.0
pycodestyle==2.7.0
pydantic==1.8.1
pyflakes==2.3.1
PyJWT==2.0.1
pyparsing==2.4.7
pytest==6.2.3
requests==2.25.1
sentry-sdk==1.0.0
six==1.15.0
starlette==0.13.6
tenacity==7.0.0
toml==0.10.2
typing-extensions==3.7.4.3
urllib3==1.26.4
uvicorn==0.13.4
websockets==8.1
yarl==1.6.3
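
A possible explanation, stated as an assumption rather than a diagnosis: before Python 3.10, `asyncio.Lock` captured the current event loop when it was constructed, so a lock created before the server started its own loop could end up bound to a different loop than the one that later awaits it. The sketch below shows the usual workaround of creating the lock lazily from inside a running coroutine; `LazyLockHolder` is a hypothetical class, not part of the library.

```python
import asyncio
from typing import Optional


class LazyLockHolder:
    """Hypothetical holder that creates its asyncio.Lock on first use."""

    def __init__(self) -> None:
        # Deliberately not creating the lock here: no loop may be running yet
        self._lock: Optional[asyncio.Lock] = None

    def _get_lock(self) -> asyncio.Lock:
        if self._lock is None:
            # Called from a coroutine, so the lock is created under the running loop
            self._lock = asyncio.Lock()
        return self._lock

    async def guarded(self, value: int) -> int:
        async with self._get_lock():
            await asyncio.sleep(0)
            return value * 2


# Safe to build at import time, before any event loop exists
holder = LazyLockHolder()
```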

ALL_TOPICS client subscription not working

When subscribing to ALL_TOPICS from a client they do not receive any updates.

server.py:

import asyncio

import uvicorn
from fastapi import FastAPI
from fastapi_websocket_pubsub import PubSubEndpoint
from fastapi_websocket_pubsub.event_notifier import ALL_TOPICS

app = FastAPI()
endpoint = PubSubEndpoint()
endpoint.register_route(app, "/pubsub")


async def on_events(subscription, data):
    print(f"Hello from {subscription.topic}: {data}")


@app.on_event("startup")
async def startup_client():
    # Subscribe to all incoming messages
    await endpoint.subscribe(ALL_TOPICS, on_events)


async def events():
    # await asyncio.sleep(1)
    await endpoint.publish("other", data={"info": 3, "some_data": "test123"})
    await asyncio.sleep(1)
    await endpoint.publish("other", data={"info": 2, "some_data": "test456"})
    await asyncio.sleep(1)
    await endpoint.publish("test", data={"info": 1, "some_data": "test999"})


@app.get("/request")
async def trigger_events():
    asyncio.create_task(events())


if __name__ == "__main__":

    uvicorn.run(
        "server:app",
        host="0.0.0.0",
        reload=False,
        port=8020,
    )

client.py:


import uvicorn
from fastapi import FastAPI
from fastapi_websocket_pubsub import PubSubClient
from fastapi_websocket_pubsub.event_notifier import ALL_TOPICS

app = FastAPI()
endpoint = PubSubClient()


async def on_events(data, topic):
    print(f"Received from {topic}: {data}")


@app.on_event("startup")
async def startup_client():
    # Subscribe to all incoming messages
    endpoint.subscribe(ALL_TOPICS, on_events)
    endpoint.start_client("ws://0.0.0.0:8020/pubsub")


@app.get("/")
async def get():
    await endpoint.publish("test", data={"info": 1, "some_data": "client"})


if __name__ == "__main__":

    uvicorn.run(
        "client:app",
        host="0.0.0.0",
        reload=False,
        port=8010,
    )

Expected behavior:
client.py will receive all data from server.py
server.py receives all data

Observed behaviour:
client.py does not receive any data
server.py receives all data

Private or Non-Broadcast Messages

Is there any concept of sending a message to a specific subscriber/group within this? Or is that on the roadmap at all?

Also - It looks like the example is slightly wrong here:

endpoint.register_route(app, "/pubsub")

I believe it needs to be:

endpoint.register_route(app, path="/pubsub")

If this doesn't make sense, I was hoping to have the same topics (chat_messages, events, etc) for various user levels. (guest, user, admin). So they would subscribe to /admin/chat_messages and get chat messages from all topics, whereas users would get messages from only the users channel. Hopefully that makes sense.
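
One way to approximate the per-level topics described above, using nothing more than a topic naming convention, is sketched below. It assumes that a message should be visible to its sender's level and to every higher level; `LEVELS`, `subscribe_topic`, and `publish_topics` are hypothetical names, and nothing here is a built-in feature of the library.

```python
from typing import List

# Lowest to highest privilege (assumed ordering)
LEVELS = ["guest", "user", "admin"]


def subscribe_topic(level: str, base_topic: str) -> str:
    """Topic a client at `level` subscribes to, e.g. 'admin/chat_messages'."""
    if level not in LEVELS:
        raise ValueError(f"unknown level: {level}")
    return f"{level}/{base_topic}"


def publish_topics(sender_level: str, base_topic: str) -> List[str]:
    """Topics to publish on so the sender's level and all higher levels receive it."""
    start = LEVELS.index(sender_level)
    return [f"{level}/{base_topic}" for level in LEVELS[start:]]
```

The list returned by `publish_topics("user", "chat_messages")` could be passed as the topics argument of a publish call. A naming scheme alone does not stop a client from subscribing to another level's topic, so access control would still have to be enforced on the server.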

Can we use celery as PUB/SUB?

I'm wondering whether we can use celery+rabbitmq as PUB/SUB and what modifications need to be done to achieve this? Thank you.

Javascript Client for Frontend

I was able to extract the payloads sent between client and server to create a Javascript client with websocket (code below).

But there are 2 issues, which I haven't solved yet:

  1. While working fine on Windows and Android Browsers, there are issues on Safari: The connection to the Websocket is established, but the browser doesn't receive the messages, when triggering pub.
  2. The connection pool doesn't seem to be cleaned up, when a client disconnects. I get these error messages, because a subscriber was not found:

Stack trace:

Failed to notify subscriber sub_id=3d0477159cdc4eb0bd8afefa9df82423 with topic=topicA
Traceback (most recent call last):
File "/home/koliham/.cache/pypoetry/virtualenvs/websocketsample-dFaV7d37-py3.9/lib/python3.9/site-packages/fastapi_websocket_pubsub/event_notifier.py", line 220, in callback_subscribers
  await self.trigger_callback(data, topic, subscriber_id, event)
File "/home/koliham/.cache/pypoetry/virtualenvs/websocketsample-dFaV7d37-py3.9/lib/python3.9/site-packages/fastapi_websocket_pubsub/event_notifier.py", line 178, in trigger_callback
  await subscription.callback(subscription, data)
File "/home/koliham/.cache/pypoetry/virtualenvs/websocketsample-dFaV7d37-py3.9/lib/python3.9/site-packages/fastapi_websocket_pubsub/event_broadcaster.py", line 159, in __broadcast_notifications__
  async with self._sharing_broadcast_channel:
File "/home/koliham/.cache/pypoetry/virtualenvs/websocketsample-dFaV7d37-py3.9/lib/python3.9/site-packages/broadcaster/_base.py", line 48, in __aenter__
  await self.connect()
File "/home/koliham/.cache/pypoetry/virtualenvs/websocketsample-dFaV7d37-py3.9/lib/python3.9/site-packages/broadcaster/_base.py", line 55, in connect
  await self._backend.connect()
File "/home/koliham/.cache/pypoetry/virtualenvs/websocketsample-dFaV7d37-py3.9/lib/python3.9/site-packages/broadcaster/_backends/postgres.py", line 13, in connect
  self._conn = await asyncpg.connect(self._url)
File "/home/koliham/.cache/pypoetry/virtualenvs/websocketsample-dFaV7d37-py3.9/lib/python3.9/site-packages/asyncpg/connection.py", line 2093, in connect
  return await connect_utils._connect(
File "/home/koliham/.cache/pypoetry/virtualenvs/websocketsample-dFaV7d37-py3.9/lib/python3.9/site-packages/asyncpg/connect_utils.py", line 903, in _connect
  raise last_error
File "/home/koliham/.cache/pypoetry/virtualenvs/websocketsample-dFaV7d37-py3.9/lib/python3.9/site-packages/asyncpg/connect_utils.py", line 889, in _connect
  return await _connect_addr(
File "/home/koliham/.cache/pypoetry/virtualenvs/websocketsample-dFaV7d37-py3.9/lib/python3.9/site-packages/asyncpg/connect_utils.py", line 781, in _connect_addr
  return await __connect_addr(params, timeout, True, *args)
File "/home/koliham/.cache/pypoetry/virtualenvs/websocketsample-dFaV7d37-py3.9/lib/python3.9/site-packages/asyncpg/connect_utils.py", line 833, in __connect_addr
  tr, pr = await compat.wait_for(connector, timeout=timeout)
File "/home/koliham/.cache/pypoetry/virtualenvs/websocketsample-dFaV7d37-py3.9/lib/python3.9/site-packages/asyncpg/compat.py", line 66, in wait_for
  return await asyncio.wait_for(fut, timeout)
File "/home/linuxbrew/.linuxbrew/opt/python@3.9/lib/python3.9/asyncio/tasks.py", line 481, in wait_for
  return fut.result()
File "/home/koliham/.cache/pypoetry/virtualenvs/websocketsample-dFaV7d37-py3.9/lib/python3.9/site-packages/asyncpg/connect_utils.py", line 700, in _create_ssl_connection
  do_ssl_upgrade = await pr.on_data
ConnectionError: unexpected connection_lost() call
Failed to notify subscriber sub_id=3d0477159cdc4eb0bd8afefa9df82423 with topic=topicC
Traceback (most recent call last):
File "/home/koliham/.cache/pypoetry/virtualenvs/websocketsample-dFaV7d37-py3.9/lib/python3.9/site-packages/fastapi_websocket_pubsub/event_notifier.py", line 220, in callback_subscribers
  await self.trigger_callback(data, topic, subscriber_id, event)
File "/home/koliham/.cache/pypoetry/virtualenvs/websocketsample-dFaV7d37-py3.9/lib/python3.9/site-packages/fastapi_websocket_pubsub/event_notifier.py", line 178, in trigger_callback
  await subscription.callback(subscription, data)
File "/home/koliham/.cache/pypoetry/virtualenvs/websocketsample-dFaV7d37-py3.9/lib/python3.9/site-packages/fastapi_websocket_pubsub/event_broadcaster.py", line 159, in __broadcast_notifications__
  async with self._sharing_broadcast_channel:
File "/home/koliham/.cache/pypoetry/virtualenvs/websocketsample-dFaV7d37-py3.9/lib/python3.9/site-packages/broadcaster/_base.py", line 48, in __aenter__
  await self.connect()
File "/home/koliham/.cache/pypoetry/virtualenvs/websocketsample-dFaV7d37-py3.9/lib/python3.9/site-packages/broadcaster/_base.py", line 55, in connect
  await self._backend.connect()
File "/home/koliham/.cache/pypoetry/virtualenvs/websocketsample-dFaV7d37-py3.9/lib/python3.9/site-packages/broadcaster/_backends/postgres.py", line 13, in connect
  self._conn = await asyncpg.connect(self._url)
File "/home/koliham/.cache/pypoetry/virtualenvs/websocketsample-dFaV7d37-py3.9/lib/python3.9/site-packages/asyncpg/connection.py", line 2093, in connect
  return await connect_utils._connect(
File "/home/koliham/.cache/pypoetry/virtualenvs/websocketsample-dFaV7d37-py3.9/lib/python3.9/site-packages/asyncpg/connect_utils.py", line 903, in _connect
  raise last_error
File "/home/koliham/.cache/pypoetry/virtualenvs/websocketsample-dFaV7d37-py3.9/lib/python3.9/site-packages/asyncpg/connect_utils.py", line 889, in _connect
  return await _connect_addr(
File "/home/koliham/.cache/pypoetry/virtualenvs/websocketsample-dFaV7d37-py3.9/lib/python3.9/site-packages/asyncpg/connect_utils.py", line 781, in _connect_addr
  return await __connect_addr(params, timeout, True, *args)
File "/home/koliham/.cache/pypoetry/virtualenvs/websocketsample-dFaV7d37-py3.9/lib/python3.9/site-packages/asyncpg/connect_utils.py", line 833, in __connect_addr
  tr, pr = await compat.wait_for(connector, timeout=timeout)
File "/home/koliham/.cache/pypoetry/virtualenvs/websocketsample-dFaV7d37-py3.9/lib/python3.9/site-packages/asyncpg/compat.py", line 66, in wait_for
  return await asyncio.wait_for(fut, timeout)
File "/home/linuxbrew/.linuxbrew/opt/python@3.9/lib/python3.9/asyncio/tasks.py", line 481, in wait_for
  return fut.result()
File "/home/koliham/.cache/pypoetry/virtualenvs/websocketsample-dFaV7d37-py3.9/lib/python3.9/site-packages/asyncpg/connect_utils.py", line 700, in _create_ssl_connection
  do_ssl_upgrade = await pr.on_data
ConnectionError: unexpected connection_lost() call

Frontend Code

<template>
<div class="text-center self-center">
     <div>Status is: {{status}} <q-btn color="primary" @click="checkStatus()">Check Status</q-btn></div>
    <div v-for="response in responses" :key="response">{{ response }}</div>
  </div>
</template>

<script>
export default {
name: 'PageIndex',

data() {
  return {
    inhalt: "",
    responses: [],
    status: -1
  }
},
created() {
  var payload = {request: {method: "subscribe", "arguments": {"topics":["topic1", "topic2", "topic3"]}}}
  var payload_str = JSON.stringify(payload)
  this.connection = new WebSocket("ws://localhost:8000/pubsub")

  this.connection.onmessage = (event) => {
    console.log(event)
    this.messageReceived(event.data)
  }

  this.connection.onopen = (event) => {
    this.connection.send(payload_str)
  }

},

methods: {
  checkStatus() {
    this.status = this.connection.readyState
  },
  messageReceived (content) {
  //called, when message is received
    this.responses.push(content)
  },
}
}
</script>

Backend Code

import asyncio

import uvicorn
from fastapi import FastAPI
from fastapi.routing import APIRouter
from fastapi_websocket_pubsub import PubSubEndpoint
from starlette.websockets import WebSocket

PORT = 8000


app = FastAPI()
router = APIRouter()
endpoint = PubSubEndpoint(broadcaster="postgres://postgres:pw@localhost:5432/")

@router.websocket("/pubsub")
async def websocket_rpc_endpoint(websocket: WebSocket):
  async with endpoint.broadcaster:
      await endpoint.main_loop(websocket)

app.include_router(router)


async def events(topic: str = "NoTopic"):
  await endpoint.publish([topic], {"somekey": "SomeValue"})


@app.get("/trigger")
async def trigger_events():
  asyncio.create_task(events("topic1"))
  asyncio.create_task(events("topic3"))


if __name__ == "__main__":
  uvicorn.run(app, host="0.0.0.0", port=PORT)

starlette requirement failing

I'm trying to use fastapi_websocket_pubsub on a Raspberry Pi. pip install doesn't work


pi@raspberrypi:~ $ pip install fastapi_websocket_pubsub
Looking in indexes: https://pypi.org/simple, https://www.piwheels.org/simple
Collecting fastapi_websocket_pubsub
  Could not find a version that satisfies the requirement fastapi_websocket_pubsub (from versions: )
No matching distribution found for fastapi_websocket_pubsub

so I tried to install from one of the release archives directly. This produced the error

fastapi 0.65.0 has requirement starlette==0.14.2, but you'll have starlette 0.13.6 which is incompatible.

Here's the complete output:


pi@raspberrypi:~/Downloads $ python3 -m pip install v0.1.18.tar.gz
Looking in indexes: https://pypi.org/simple, https://www.piwheels.org/simple
Processing ./v0.1.18.tar.gz
Collecting broadcaster==0.2.0 (from fastapi-websocket-pubsub==0.1.18)
  Downloading https://files.pythonhosted.org/packages/a8/28/500ebd98ca2cc8f70626bb3521fc3afed88c0d0c8a739630ccb5ec7ca47                                    2/broadcaster-0.2.0-py3-none-any.whl
Collecting fastapi-websocket-rpc>=0.1.18 (from fastapi-websocket-pubsub==0.1.18)
  Downloading https://files.pythonhosted.org/packages/5c/cd/fd643ea736ff81099f975cfaaaa1788014c98a0624035fed9a602d015ed                                    2/fastapi_websocket_rpc-0.1.18-py3-none-any.whl
Collecting fastapi (from fastapi-websocket-rpc>=0.1.18->fastapi-websocket-pubsub==0.1.18)
  Downloading https://files.pythonhosted.org/packages/2c/2a/4b5dcc2eabaa439f81387b0d3f80ebb2ea52b0436720b713fef79db32b7                                    e/fastapi-0.65.0-py3-none-any.whl (50kB)
    100% |โ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆ| 51kB 288kB/s
Collecting aiohttp>=3.7.2 (from fastapi-websocket-rpc>=0.1.18->fastapi-websocket-pubsub==0.1.18)
  Downloading https://www.piwheels.org/simple/aiohttp/aiohttp-3.7.4.post0-cp37-cp37m-linux_armv6l.whl (1.3MB)
    100% |โ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆ| 1.3MB 67kB/s
Collecting pydantic (from fastapi-websocket-rpc>=0.1.18->fastapi-websocket-pubsub==0.1.18)
  Downloading https://files.pythonhosted.org/packages/8f/c4/a83f0b745d824a85ee5e4c8d26329d1c687b3a9303eca762dcc37372173                                    4/pydantic-1.8.1-py3-none-any.whl (125kB)
    100% |โ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆ| 133kB 418kB/s
Collecting uvicorn (from fastapi-websocket-rpc>=0.1.18->fastapi-websocket-pubsub==0.1.18)
  Downloading https://files.pythonhosted.org/packages/c8/de/953f0289508b1b92debdf0a6822d9b88ffb0c6ad471d709cf639a2c8a17                                    6/uvicorn-0.13.4-py3-none-any.whl (46kB)
    100% |โ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆ| 51kB 427kB/s
Collecting starlette==0.13.6 (from fastapi-websocket-rpc>=0.1.18->fastapi-websocket-pubsub==0.1.18)
  Downloading https://files.pythonhosted.org/packages/c5/a4/c9e228d7d47044ce4c83ba002f28ff479e542455f0499198a3f77c94f56                                    4/starlette-0.13.6-py3-none-any.whl (59kB)
    100% |โ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆ| 61kB 185kB/s
Collecting websockets (from fastapi-websocket-rpc>=0.1.18->fastapi-websocket-pubsub==0.1.18)
  Downloading https://www.piwheels.org/simple/websockets/websockets-9.0.1-cp37-cp37m-linux_armv6l.whl (101kB)
    100% |████████████████████████████████| 102kB 265kB/s
Collecting requests>=2.25.0 (from fastapi-websocket-rpc>=0.1.18->fastapi-websocket-pubsub==0.1.18)
  Downloading https://files.pythonhosted.org/packages/29/c1/24814557f1d22c56d50280771a17307e6bf87b70727d975fd6b2ce6b014a/requests-2.25.1-py2.py3-none-any.whl (61kB)
    100% |████████████████████████████████| 61kB 192kB/s
Collecting tenacity>=6.3.1 (from fastapi-websocket-rpc>=0.1.18->fastapi-websocket-pubsub==0.1.18)
  Downloading https://files.pythonhosted.org/packages/41/ee/d6eddff86161c6a3a1753af4a66b06cbc508d3b77ca4698cd0374cd66531/tenacity-7.0.0-py2.py3-none-any.whl
Collecting multidict<7.0,>=4.5 (from aiohttp>=3.7.2->fastapi-websocket-rpc>=0.1.18->fastapi-websocket-pubsub==0.1.18)
  Downloading https://files.pythonhosted.org/packages/1c/74/e8b46156f37ca56d10d895d4e8595aa2b344cff3c1fb3629ec97a8656ccb/multidict-5.1.0.tar.gz (53kB)
    100% |████████████████████████████████| 61kB 265kB/s
  Installing build dependencies ... done
Requirement already satisfied: chardet<5.0,>=2.0 in /usr/lib/python3/dist-packages (from aiohttp>=3.7.2->fastapi-websocket-rpc>=0.1.18->fastapi-websocket-pubsub==0.1.18) (3.0.4)
Collecting yarl<2.0,>=1.0 (from aiohttp>=3.7.2->fastapi-websocket-rpc>=0.1.18->fastapi-websocket-pubsub==0.1.18)
  Downloading https://www.piwheels.org/simple/yarl/yarl-1.6.3-cp37-cp37m-linux_armv6l.whl (262kB)
    100% |████████████████████████████████| 266kB 260kB/s
Collecting async-timeout<4.0,>=3.0 (from aiohttp>=3.7.2->fastapi-websocket-rpc>=0.1.18->fastapi-websocket-pubsub==0.1.18)
  Downloading https://files.pythonhosted.org/packages/e1/1e/5a4441be21b0726c4464f3f23c8b19628372f606755a9d2e46c187e65ec4/async_timeout-3.0.1-py3-none-any.whl
Collecting typing-extensions>=3.6.5 (from aiohttp>=3.7.2->fastapi-websocket-rpc>=0.1.18->fastapi-websocket-pubsub==0.1.18)
  Downloading https://files.pythonhosted.org/packages/2e/35/6c4fff5ab443b57116cb1aad46421fb719bed2825664e8fe77d66d99bcbc/typing_extensions-3.10.0.0-py3-none-any.whl
Collecting attrs>=17.3.0 (from aiohttp>=3.7.2->fastapi-websocket-rpc>=0.1.18->fastapi-websocket-pubsub==0.1.18)
  Downloading https://files.pythonhosted.org/packages/20/a9/ba6f1cd1a1517ff022b35acd6a7e4246371dfab08b8e42b829b6d07913cc/attrs-21.2.0-py2.py3-none-any.whl (53kB)
    100% |████████████████████████████████| 61kB 491kB/s
Collecting h11>=0.8 (from uvicorn->fastapi-websocket-rpc>=0.1.18->fastapi-websocket-pubsub==0.1.18)
  Downloading https://files.pythonhosted.org/packages/60/0f/7a0eeea938eaf61074f29fed9717f2010e8d0e0905d36b38d3275a1e4622/h11-0.12.0-py3-none-any.whl (54kB)
    100% |████████████████████████████████| 61kB 519kB/s
Requirement already satisfied: click==7.* in /usr/lib/python3/dist-packages (from uvicorn->fastapi-websocket-rpc>=0.1.18->fastapi-websocket-pubsub==0.1.18) (7.0)
Requirement already satisfied: urllib3<1.27,>=1.21.1 in /usr/lib/python3/dist-packages (from requests>=2.25.0->fastapi-websocket-rpc>=0.1.18->fastapi-websocket-pubsub==0.1.18) (1.24.1)
Requirement already satisfied: certifi>=2017.4.17 in /usr/lib/python3/dist-packages (from requests>=2.25.0->fastapi-websocket-rpc>=0.1.18->fastapi-websocket-pubsub==0.1.18) (2018.8.24)
Requirement already satisfied: idna<3,>=2.5 in /usr/lib/python3/dist-packages (from requests>=2.25.0->fastapi-websocket-rpc>=0.1.18->fastapi-websocket-pubsub==0.1.18) (2.6)
Requirement already satisfied: six>=1.9.0 in /usr/lib/python3/dist-packages (from tenacity>=6.3.1->fastapi-websocket-rpc>=0.1.18->fastapi-websocket-pubsub==0.1.18) (1.12.0)
Building wheels for collected packages: fastapi-websocket-pubsub, multidict
  Running setup.py bdist_wheel for fastapi-websocket-pubsub ... done
  Stored in directory: /tmp/pip-ephem-wheel-cache-zmte8q0s/wheels/2d/ec/39/e3f0b6ac2e50b81155acd465120b86bb85aa6fd78248eb3d22
  Running setup.py bdist_wheel for multidict ... done
  Stored in directory: /home/pi/.cache/pip/wheels/e7/05/d2/f5c04c29d0e4b234dbcd4b609b51f8c65d67ff9bbd01c904b1
Successfully built fastapi-websocket-pubsub multidict
fastapi 0.65.0 has requirement starlette==0.14.2, but you'll have starlette 0.13.6 which is incompatible.
Installing collected packages: broadcaster, typing-extensions, pydantic, starlette, fastapi, multidict, yarl, async-timeout, attrs, aiohttp, h11, uvicorn, websockets, requests, tenacity, fastapi-websocket-rpc, fastapi-websocket-pubsub
  The script uvicorn is installed in '/home/pi/.local/bin' which is not on PATH.
  Consider adding this directory to PATH or, if you prefer to suppress this warning, use --no-warn-script-location.
Successfully installed aiohttp-3.7.4.post0 async-timeout-3.0.1 attrs-21.2.0 broadcaster-0.2.0 fastapi-0.65.0 fastapi-websocket-pubsub-0.1.18 fastapi-websocket-rpc-0.1.18 h11-0.12.0 multidict-5.1.0 pydantic-1.8.1 requests-2.25.1 starlette-0.13.6 tenacity-7.0.0 typing-extensions-3.10.0.0 uvicorn-0.13.4 websockets-9.0.1 yarl-1.6.3

Support for websockets>=11?

Hello! Thanks for this great library.

I'm currently trying to use it in an application, but another library I'm using requires websockets>=11. I noticed that this library requires an older version. Is there a particular reason for this, or do the dependencies just need to be updated?

Thanks!
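
For anyone hitting the same conflict, a quick way to see which websockets version is actually installed and whether it meets the >=11 floor is sketched below. This is only an illustration: `version_tuple`, `satisfies_minimum` and `installed_version` are hypothetical helper names, the comparison looks at the leading numeric components only (no pre-release handling), and `importlib.metadata` assumes Python 3.8 or newer.

```python
from importlib import metadata
from typing import Optional, Tuple


def version_tuple(version: str) -> Tuple[int, ...]:
    """Turn '11.0.3' into (11, 0, 3); stops at the first non-numeric part."""
    parts = []
    for piece in version.split("."):
        if not piece.isdigit():
            break
        parts.append(int(piece))
    return tuple(parts)


def satisfies_minimum(installed: str, minimum: str) -> bool:
    """True if the installed version is at least the minimum version."""
    return version_tuple(installed) >= version_tuple(minimum)


def installed_version(package: str) -> Optional[str]:
    """Return the installed version string, or None if the package is absent."""
    try:
        return metadata.version(package)
    except metadata.PackageNotFoundError:
        return None


if __name__ == "__main__":
    found = installed_version("websockets")
    if found is None:
        print("websockets is not installed")
    else:
        print(found, "meets >=11:", satisfies_minimum(found, "11"))
```

This only reports what is installed; whether the library works with a newer websockets release is still a question for the maintainers.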

How to use as just a Websocket server (for browser/mobile client)?

So I realize this project is intended for RPC between servers, wrapping fastapi_websocket_rpc. I do see some hints at browser use, such as this line from the README:

"easily accessible and scalable over the web and across your cloud in realtime"

And it seems like this project could really streamline FastAPI WebSocket endpoints even just for that use-case (browser/mobile clients). In particular, the auto-handling of broadcaster while still exposing on_connect|disconnect, marshaling request/response via Pydantic, etc. Things I'm struggling with using FastAPI WebSockets "raw" + broadcaster (discussion).

Anyway, that said I'm not clear on how to have the server both subscribe & publish. Eg, what I'm looking for in pseudo-code would be:

sockets = []

app.on_connect(websocket):
  sockets.append(websocket)

app.subscribe(topic, data):
  if topic == 'chat':
    save_to_db(data)
    for s in sockets:
      if s.hasPermission(topic):
        endpoint.publish('new-message', data)

That is, it's using both subscribe and publish. On the README there's separate setups for pub vs sub (ie: PubSubEndpoint vs PubSubClient). It strikes me they can both be used together on the FastAPI server, but I'm not sure how. The PubSubClient takes a server_uri, which in this scenario should just be "self" (or, PubSubClient should be available directly from PubSubEndpoint?). And the client_example loops main() in asyncio.run; which in the above scenario wouldn't be compatible with FastAPI's main loop (right)? Looking through the code, it strikes me that maybe I want to instantiate a custom EventNotifier and pass that to PubSubEndpoint, so I can both broadcast & listen - but I'm not sure I'm barking up the right tree.

TL;DR: how, just on the FastAPI server, can I both subscribe to messages from the browser, and publish things back?
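
To make the desired pattern concrete, here is a minimal, self-contained sketch of a server-side handler that both subscribes and publishes. It is not the fastapi_websocket_pubsub API: `LocalPubSub` is a hypothetical in-process hub written only with asyncio, and the two lists stand in for `save_to_db` and for messages pushed out to connected sockets.

```python
import asyncio
from collections import defaultdict
from typing import Any, Awaitable, Callable, DefaultDict, List, Tuple

Callback = Callable[[Any], Awaitable[None]]


class LocalPubSub:
    """Hypothetical in-process hub; NOT part of fastapi_websocket_pubsub."""

    def __init__(self) -> None:
        self._subscribers: DefaultDict[str, List[Callback]] = defaultdict(list)

    def subscribe(self, topic: str, callback: Callback) -> None:
        """Register an async callback for a topic."""
        self._subscribers[topic].append(callback)

    async def publish(self, topic: str, data: Any) -> None:
        """Await every callback registered for the topic, in order."""
        # Iterate over a copy so callbacks may subscribe during delivery.
        for callback in list(self._subscribers.get(topic, [])):
            await callback(data)


async def demo() -> Tuple[list, list]:
    hub = LocalPubSub()
    saved: list = []      # stands in for save_to_db(data)
    delivered: list = []  # stands in for messages sent to clients

    async def on_chat(data: Any) -> None:
        # Server-side subscriber: persist, then re-publish to listeners.
        saved.append(data)
        await hub.publish("new-message", data)

    async def on_new_message(data: Any) -> None:
        delivered.append(data)

    hub.subscribe("chat", on_chat)
    hub.subscribe("new-message", on_new_message)
    await hub.publish("chat", {"text": "hello"})
    return saved, delivered


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The point of the sketch is that a subscriber callback running on the server can itself publish to another topic without opening a second client connection. Whether PubSubEndpoint exposes an equivalent hook is exactly what the question asks.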


BTW, it may sound like I'm forcing a square peg into a round hole, but I actually want to use this project anyway for server->server RPC (a separate use case); so if it can streamline both scenarios, this would be a very valuable project indeed.
