permitio / fastapi_websocket_pubsub Goto Github PK
View Code? Open in Web Editor NEWA fast and durable Pub/Sub channel over Websockets. FastAPI + WebSockets + PubSub == โก ๐ช โค๏ธ
Home Page: https://permit.io
License: MIT License
A fast and durable Pub/Sub channel over Websockets. FastAPI + WebSockets + PubSub == โก ๐ช โค๏ธ
Home Page: https://permit.io
License: MIT License
I was able to extract the payloads sent between client and server to create a Javascript client with websocket (code below).
But there are 2 issues, which I haven't solved yet:
Failed to notify subscriber sub_id=3d0477159cdc4eb0bd8afefa9df82423 with topic=topicA
Traceback (most recent call last):
File "/home/koliham/.cache/pypoetry/virtualenvs/websocketsample-dFaV7d37-py3.9/lib/python3.9/site-packages/fastapi_websocket_pubsub/event_notifier.py", line 220, in callback_subscribers
await self.trigger_callback(data, topic, subscriber_id, event)
File "/home/koliham/.cache/pypoetry/virtualenvs/websocketsample-dFaV7d37-py3.9/lib/python3.9/site-packages/fastapi_websocket_pubsub/event_notifier.py", line 178, in trigger_callback
await subscription.callback(subscription, data)
File "/home/koliham/.cache/pypoetry/virtualenvs/websocketsample-dFaV7d37-py3.9/lib/python3.9/site-packages/fastapi_websocket_pubsub/event_broadcaster.py", line 159, in __broadcast_notifications__
async with self._sharing_broadcast_channel:
File "/home/koliham/.cache/pypoetry/virtualenvs/websocketsample-dFaV7d37-py3.9/lib/python3.9/site-packages/broadcaster/_base.py", line 48, in __aenter__
await self.connect()
File "/home/koliham/.cache/pypoetry/virtualenvs/websocketsample-dFaV7d37-py3.9/lib/python3.9/site-packages/broadcaster/_base.py", line 55, in connect
await self._backend.connect()
File "/home/koliham/.cache/pypoetry/virtualenvs/websocketsample-dFaV7d37-py3.9/lib/python3.9/site-packages/broadcaster/_backends/postgres.py", line 13, in connect
self._conn = await asyncpg.connect(self._url)
File "/home/koliham/.cache/pypoetry/virtualenvs/websocketsample-dFaV7d37-py3.9/lib/python3.9/site-packages/asyncpg/connection.py", line 2093, in connect
return await connect_utils._connect(
File "/home/koliham/.cache/pypoetry/virtualenvs/websocketsample-dFaV7d37-py3.9/lib/python3.9/site-packages/asyncpg/connect_utils.py", line 903, in _connect
raise last_error
File "/home/koliham/.cache/pypoetry/virtualenvs/websocketsample-dFaV7d37-py3.9/lib/python3.9/site-packages/asyncpg/connect_utils.py", line 889, in _connect
return await _connect_addr(
File "/home/koliham/.cache/pypoetry/virtualenvs/websocketsample-dFaV7d37-py3.9/lib/python3.9/site-packages/asyncpg/connect_utils.py", line 781, in _connect_addr
return await __connect_addr(params, timeout, True, *args)
File "/home/koliham/.cache/pypoetry/virtualenvs/websocketsample-dFaV7d37-py3.9/lib/python3.9/site-packages/asyncpg/connect_utils.py", line 833, in __connect_addr
tr, pr = await compat.wait_for(connector, timeout=timeout)
File "/home/koliham/.cache/pypoetry/virtualenvs/websocketsample-dFaV7d37-py3.9/lib/python3.9/site-packages/asyncpg/compat.py", line 66, in wait_for
return await asyncio.wait_for(fut, timeout)
File "/home/linuxbrew/.linuxbrew/opt/[email protected]/lib/python3.9/asyncio/tasks.py", line 481, in wait_for
return fut.result()
File "/home/koliham/.cache/pypoetry/virtualenvs/websocketsample-dFaV7d37-py3.9/lib/python3.9/site-packages/asyncpg/connect_utils.py", line 700, in _create_ssl_connection
do_ssl_upgrade = await pr.on_data
ConnectionError: unexpected connection_lost() call
Failed to notify subscriber sub_id=3d0477159cdc4eb0bd8afefa9df82423 with topic=topicC
Traceback (most recent call last):
File "/home/koliham/.cache/pypoetry/virtualenvs/websocketsample-dFaV7d37-py3.9/lib/python3.9/site-packages/fastapi_websocket_pubsub/event_notifier.py", line 220, in callback_subscribers
await self.trigger_callback(data, topic, subscriber_id, event)
File "/home/koliham/.cache/pypoetry/virtualenvs/websocketsample-dFaV7d37-py3.9/lib/python3.9/site-packages/fastapi_websocket_pubsub/event_notifier.py", line 178, in trigger_callback
await subscription.callback(subscription, data)
File "/home/koliham/.cache/pypoetry/virtualenvs/websocketsample-dFaV7d37-py3.9/lib/python3.9/site-packages/fastapi_websocket_pubsub/event_broadcaster.py", line 159, in __broadcast_notifications__
async with self._sharing_broadcast_channel:
File "/home/koliham/.cache/pypoetry/virtualenvs/websocketsample-dFaV7d37-py3.9/lib/python3.9/site-packages/broadcaster/_base.py", line 48, in __aenter__
await self.connect()
File "/home/koliham/.cache/pypoetry/virtualenvs/websocketsample-dFaV7d37-py3.9/lib/python3.9/site-packages/broadcaster/_base.py", line 55, in connect
await self._backend.connect()
File "/home/koliham/.cache/pypoetry/virtualenvs/websocketsample-dFaV7d37-py3.9/lib/python3.9/site-packages/broadcaster/_backends/postgres.py", line 13, in connect
self._conn = await asyncpg.connect(self._url)
File "/home/koliham/.cache/pypoetry/virtualenvs/websocketsample-dFaV7d37-py3.9/lib/python3.9/site-packages/asyncpg/connection.py", line 2093, in connect
return await connect_utils._connect(
File "/home/koliham/.cache/pypoetry/virtualenvs/websocketsample-dFaV7d37-py3.9/lib/python3.9/site-packages/asyncpg/connect_utils.py", line 903, in _connect
raise last_error
File "/home/koliham/.cache/pypoetry/virtualenvs/websocketsample-dFaV7d37-py3.9/lib/python3.9/site-packages/asyncpg/connect_utils.py", line 889, in _connect
return await _connect_addr(
File "/home/koliham/.cache/pypoetry/virtualenvs/websocketsample-dFaV7d37-py3.9/lib/python3.9/site-packages/asyncpg/connect_utils.py", line 781, in _connect_addr
return await __connect_addr(params, timeout, True, *args)
File "/home/koliham/.cache/pypoetry/virtualenvs/websocketsample-dFaV7d37-py3.9/lib/python3.9/site-packages/asyncpg/connect_utils.py", line 833, in __connect_addr
tr, pr = await compat.wait_for(connector, timeout=timeout)
File "/home/koliham/.cache/pypoetry/virtualenvs/websocketsample-dFaV7d37-py3.9/lib/python3.9/site-packages/asyncpg/compat.py", line 66, in wait_for
return await asyncio.wait_for(fut, timeout)
File "/home/linuxbrew/.linuxbrew/opt/[email protected]/lib/python3.9/asyncio/tasks.py", line 481, in wait_for
return fut.result()
File "/home/koliham/.cache/pypoetry/virtualenvs/websocketsample-dFaV7d37-py3.9/lib/python3.9/site-packages/asyncpg/connect_utils.py", line 700, in _create_ssl_connection
do_ssl_upgrade = await pr.on_data
ConnectionError: unexpected connection_lost() call
<template>
<div class="text-center self-center">
<div>Status is: {{status}} <q-btn color="primary" @click="checkStatus()">Check Status</q-btn></div>
<div v-for="response in responses" :key="response">{{ response }}</div>
</div>
</template>
<script>
export default {
name: 'PageIndex',
data() {
return {
inhalt: "",
responses: [],
status: -1
}
},
created() {
var payload = {request: {method: "subscribe", "arguments": {"topics":["topic1", "topic2", "topic3"]}}}
var payload_str = JSON.stringify(payload)
this.connection = new WebSocket("ws://localhost:8000/pubsub")
this.connection.onmessage = (event) => {
console.log(event)
this.messageReceived(event.data)
}
this.connection.onopen = (event) => {
this.connection.send(payload_str)
}
},
methods: {
checkStatus() {
this.status = this.connection.readyState
},
messageReceived (content) {
//called, when message is received
this.responses.push(content)
},
}
}
</script>
import asyncio
import uvicorn
from fastapi import FastAPI
from fastapi.routing import APIRouter
from fastapi_websocket_pubsub import PubSubEndpoint
from starlette.websockets import WebSocket
PORT = 8000
app = FastAPI()
router = APIRouter()
endpoint = PubSubEndpoint(broadcaster="postgres://postgres:pw@localhost:5432/")
@router.websocket("/pubsub")
async def websocket_rpc_endpoint(websocket: WebSocket):
async with endpoint.broadcaster:
await endpoint.main_loop(websocket)
app.include_router(router)
async def events(topic: str = "NoTopic"):
await endpoint.publish([topic], {"somekey": "SomeValue"})
@app.get("/trigger")
async def trigger_events():
asyncio.create_task(events("topic1"))
asyncio.create_task(events("topic3"))
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=PORT)
Sorry, my native language is not English, my English is poor, if communication is not good I will close issues.
I found the websocket example from #41
Before I found this example, it was not clear that the message should be sent after connecting.
{
request: {
method: "subscribe",
"arguments": {
"topics": ["event"]
}
}
}
I understand that I can receive messages by sending Subscribe Msg
.
I understand sending messages to topics using Python.
endpoint = PubSubEndpoint(broadcaster="redis://localhost:6379")
await endpoint.publish(['event'], {'msg': 'ok'})
How can I send a message to a topic from Web JS?
Do you have any examples or documentation to look at?
Event topic
?let ws = new WebSocket("ws://localhost:8080/pubsub")
ws.send(JSON.stringify(????))
import asyncio
from fastapi_websocket_pubsub import PubSubEndpoint
async def main():
endpoint = PubSubEndpoint(broadcaster='redis://localhost:6379')
while True:
await endpoint.publish(["event"], data={'msg': 'hello web'})
await asyncio.sleep(1)
if __name__ == '__main__':
asyncio.run(main())
But the web can receive the trigger message.
async def events():
await endpoint.publish(['event'], {"msg": "fastapi"})
@app.get('/trigger')
async def trigger_events():
asyncio.create_task(events())
Hello people!
First and foremost, I really appreciate your project! I am also using it heavily in a library I am making - eventhive.
So, I am trying to use your fastapi PubSub in a project that cannot use compiled C extensions.
This wouldn't be a problem as fastapi is Python, but broadcaster does require a postgres dependency that contains C code, failing my build.
I propose to create optional dependencies, exactly like broadcaster does, so I can exclude the postgres dependency that creates the issue - I'm not planning to use postgres anyway!
Thanks again for your project, it really is very useful!
how can i get client's data on server
Hey guys, thanks for the fantastic work.
I've encountered a timeout error, but nowhere in the docs or source code I found how to set a limit (if possible at all) for the client. My server consumes an awaitable function that provides data at 1 minute intervals. The server then publishes the data. But the client times out at what it seems like 5 seconds
RPC Error
Traceback (most recent call last):
File "/home/fernandopapi/.cache/pypoetry/virtualenvs/propdesk-ml-ops-nZ04QmJs-py3.8/lib/python3.8/site-packages/fastapi_websocket_rpc/websocket_rpc_client.py", line 118, in __connect__
raw_ws = await self.conn.__aenter__()
File "/home/fernandopapi/.cache/pypoetry/virtualenvs/propdesk-ml-ops-nZ04QmJs-py3.8/lib/python3.8/site-packages/websockets/legacy/client.py", line 633, in __aenter__
return await self
File "/home/fernandopapi/.cache/pypoetry/virtualenvs/propdesk-ml-ops-nZ04QmJs-py3.8/lib/python3.8/site-packages/websockets/legacy/client.py", line 650, in __await_impl_timeout__
return await asyncio.wait_for(self.__await_impl__(), self.open_timeout)
File "/usr/lib/python3.8/asyncio/tasks.py", line 501, in wait_for
raise exceptions.TimeoutError()
asyncio.exceptions.TimeoutError
Can I set this TimeOut limit? Thanks!
I try to run with Python 3.11 but it can't.
Traceback (most recent call last):
File "/workspace/api/main.py", line 6, in <module>
from fastapi_websocket_pubsub import PubSubEndpoint
File "/usr/local/lib/python3.11/site-packages/fastapi_websocket_pubsub/__init__.py", line 1, in <module>
from .pub_sub_server import PubSubEndpoint
File "/usr/local/lib/python3.11/site-packages/fastapi_websocket_pubsub/pub_sub_server.py", line 5, in <module>
from fastapi_websocket_rpc import WebsocketRPCEndpoint
File "/home/fastapi/.local/lib/python3.11/site-packages/fastapi_websocket_rpc/__init__.py", line 2, in <module>
from .websocket_rpc_client import WebSocketRpcClient
File "/home/fastapi/.local/lib/python3.11/site-packages/fastapi_websocket_rpc/websocket_rpc_client.py", line 12, in <module>
from .rpc_channel import RpcChannel, OnConnectCallback, OnDisconnectCallback
File "/home/fastapi/.local/lib/python3.11/site-packages/fastapi_websocket_rpc/rpc_channel.py", line 7, in <module>
from asyncio.coroutines import coroutine
ImportError: cannot import name 'coroutine' from 'asyncio.coroutines' (/usr/local/lib/python3.11/asyncio/coroutines.py)
fastapi_websocket_rpc.rpc_channel.RpcChannelClosedException: Channel Closed before RPC response for 4af54b2301024c419c30
5b0a37a08a51 could be received
Failed to notify subscriber sub_id=1c16a0c075014bb7a9f00022bde7c831 with topic=test_path_2#log_data
Traceback (most recent call last):
File "Z:\datasmart\datasmart\venv\Lib\site-packages\fastapi_websocket_pubsub\event_notifier.py", line 222, in
callback_subscribers
await self.trigger_callback(data, topic, subscriber_id, event)
File "Z:\datasmart\datasmart\venv\Lib\site-packages\fastapi_websocket_pubsub\event_notifier.py", line 180, in
trigger_callback
await subscription.callback(subscription, data)
File "Z:\datasmart\datasmart_backend\venv\Lib\site-packages\fastapi_websocket_pubsub\rpc_event_methods.py", line 26, i
n callback
await self.channel.other.notify(subscription=sub, data=data)
File "Z:\datasmart\datasmart\venv\Lib\site-packages\fastapi_websocket_rpc\rpc_channel.py", line 397, in call
return await self.wait_for_response(promise, timeout=timeout)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "Z:\datasmart\datasmart\venv\Lib\site-packages\fastapi_websocket_rpc\rpc_channel.py", line 370, in wait_f
or_response
raise RpcChannelClosedException(
fastapi_websocket_rpc.rpc_channel.RpcChannelClosedException: Channel Closed before RPC response for fd885a529545428594b6
c10ecb695e78 could be received
someone cloud help me? thanks very much.
Do you have a JavaScript client that can connect to an instance of this running on the server?
So I realize this project is intended for RPC between servers, wrapping fastapi_websocket_rpc. I do see some hints at browser
easily accessible and scalable over the web and across your cloud in realtime
And it seems like this project could really streamline FastAPI WebSocket endpoints even just for that use-case (browser/mobile clients). In particular, the auto-handling of broadcaster
while still exposing on_connect|disconnect
, marshaling request/response via Pydantic, etc. Things I'm struggling with using FastAPI WebSockets "raw" + broadcaster
(discussion).
Anyway, that said I'm not clear on how to have the server both subscribe & publish. Eg, what I'm looking for in pseudo-code would be:
sockets = []
app.on_connect(websocket):
sockets.append(websocket)
app.subscribe(topic, data):
if topic == 'chat':
save_to_db(data)
for s in sockets:
if s.hasPermission(topic):
endpoint.publish('new-message', data)
That is, it's using both subscribe and publish. On the README there's separate setups for pub vs sub (ie: PubSubEndpoint vs PubSubClient). It strikes me they can both be used together on the FastAPI server, but I'm not sure how. The PubSubClient takes a server_uri, which in this scenario should just be "self" (or, PubSubClient should be available directly from PubSubEndpoint?). And the client_example loops main()
in asyncio.run
; which in the above scenario wouldn't be compatible with FastAPI's main loop (right)? Looking through the code, it strikes me that maybe I want to instantiate a custom EventNotifier and pass that to PubSubEndpoint, so I can both broadcast & listen - but I'm not sure I'm barking up the right tree.
TL;DR: how, just on the FastAPI server, can I both subscribe to messages from the browser, and publish things back?
BTW, it may sound like I'm fitting circle to square peg, but I actually want to use this project anyway for server->server RPC (separate); so if it can streamline both scenarios, this would be a very valuable project indeed.
The example pubsub_server_example.py indicates in comments that notifier_client_test.py should be run but this script does not seem to be in the repo anywhere.
I am trying the example out on Python 3.8.5 and getting the following error when invoking the /trigger
endpoint:
$ PORT=8004 python pubsub_broadcaster_server_example.py
INFO: Started server process [58697]
INFO: Waiting for application startup.
INFO: Application startup complete.
INFO: Uvicorn running on http://0.0.0.0:8004 (Press CTRL+C to quit)
INFO: ('127.0.0.1', 37088) - "WebSocket /pubsub" [accepted]
INFO: 127.0.0.1:37124 - "GET /trigger/ HTTP/1.1" 307 Temporary Redirect
INFO: 127.0.0.1:37124 - "GET /trigger HTTP/1.1" 200 OK
Failed to notify subscriber sub_id=a8fd8a8aaf6d42238d6bda98f193bf40 with topic=germs
Traceback (most recent call last):
File "/home/bridge/venv/websocket/lib/python3.8/site-packages/fastapi_websocket_pubsub/event_notifier.py", line 156, in callback_subscribers
await self.trigger_callback(data, topic, subscriber_id, event)
File "/home/bridge/venv/websocket/lib/python3.8/site-packages/fastapi_websocket_pubsub/event_notifier.py", line 129, in trigger_callback
await subscription.callback(subscription, data)
File "/home/bridge/venv/websocket/lib/python3.8/site-packages/fastapi_websocket_pubsub/event_broadcaster.py", line 157, in __broadcast_notifications__
async with self._publish_lock:
File "/usr/lib/python3.8/asyncio/locks.py", line 97, in __aenter__
await self.acquire()
File "/usr/lib/python3.8/asyncio/locks.py", line 203, in acquire
await fut
RuntimeError: Task <Task pending name='Task-20' coro=<EventNotifier.callback_subscribers() running at /home/bridge/venv/websocket/lib/python3.8/site-packages/fastapi_websocket_pubsub/event_notifier.py:156> cb=[gather.<locals>._done_callback() at /usr/lib/python3.8/asyncio/tasks.py:758]> got Future <Future pending> attached to a different loop
INFO: ('127.0.0.1', 37644) - "WebSocket /pubsub" [accepted]
The client does see the events though:
$ PORT=8004 python pubsub_client_example.py
running callback for guns!
running callback for germs!
running callback for germs!
running callback steel!
Got data None
This what I have installed:
aiohttp==3.7.4.post0
async-timeout==3.0.1
asyncpg==0.22.0
attrs==20.3.0
broadcaster==0.2.0
certifi==2020.12.5
chardet==4.0.0
click==7.1.2
fastapi==0.63.0
fastapi-websocket-pubsub==0.1.17
fastapi-websocket-rpc==0.1.18
flake8==3.9.0
h11==0.12.0
idna==2.10
iniconfig==1.1.1
mccabe==0.6.1
multidict==5.1.0
packaging==20.9
pkg-resources==0.0.0
pluggy==0.13.1
py==1.10.0
pycodestyle==2.7.0
pydantic==1.8.1
pyflakes==2.3.1
PyJWT==2.0.1
pyparsing==2.4.7
pytest==6.2.3
requests==2.25.1
sentry-sdk==1.0.0
six==1.15.0
starlette==0.13.6
tenacity==7.0.0
toml==0.10.2
typing-extensions==3.7.4.3
urllib3==1.26.4
uvicorn==0.13.4
websockets==8.1
yarl==1.6.3
I'm trying to use fastapi_websocket_pubsub on a Raspberry Pi. pip install doesn't work
pi@raspberrypi:~ $ pip install fastapi_websocket_pubsub
Looking in indexes: https://pypi.org/simple, https://www.piwheels.org/simple
Collecting fastapi_websocket_pubsub
Could not find a version that satisfies the requirement fastapi_websocket_pubsub (from versions: )
No matching distribution found for fastapi_websocket_pubsub
so I tried to install from one of the release archives directly. This produced the error
fastapi 0.65.0 has requirement starlette==0.14.2, but you'll have starlette 0.13.6 which is incompatible.
Here's the complete output:
pi@raspberrypi:~/Downloads $ python3 -m pip install v0.1.18.tar.gz
Looking in indexes: https://pypi.org/simple, https://www.piwheels.org/simple
Processing ./v0.1.18.tar.gz
Collecting broadcaster==0.2.0 (from fastapi-websocket-pubsub==0.1.18)
Downloading https://files.pythonhosted.org/packages/a8/28/500ebd98ca2cc8f70626bb3521fc3afed88c0d0c8a739630ccb5ec7ca47 2/broadcaster-0.2.0-py3-none-any.whl
Collecting fastapi-websocket-rpc>=0.1.18 (from fastapi-websocket-pubsub==0.1.18)
Downloading https://files.pythonhosted.org/packages/5c/cd/fd643ea736ff81099f975cfaaaa1788014c98a0624035fed9a602d015ed 2/fastapi_websocket_rpc-0.1.18-py3-none-any.whl
Collecting fastapi (from fastapi-websocket-rpc>=0.1.18->fastapi-websocket-pubsub==0.1.18)
Downloading https://files.pythonhosted.org/packages/2c/2a/4b5dcc2eabaa439f81387b0d3f80ebb2ea52b0436720b713fef79db32b7 e/fastapi-0.65.0-py3-none-any.whl (50kB)
100% |โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ| 51kB 288kB/s
Collecting aiohttp>=3.7.2 (from fastapi-websocket-rpc>=0.1.18->fastapi-websocket-pubsub==0.1.18)
Downloading https://www.piwheels.org/simple/aiohttp/aiohttp-3.7.4.post0-cp37-cp37m-linux_armv6l.whl (1.3MB)
100% |โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ| 1.3MB 67kB/s
Collecting pydantic (from fastapi-websocket-rpc>=0.1.18->fastapi-websocket-pubsub==0.1.18)
Downloading https://files.pythonhosted.org/packages/8f/c4/a83f0b745d824a85ee5e4c8d26329d1c687b3a9303eca762dcc37372173 4/pydantic-1.8.1-py3-none-any.whl (125kB)
100% |โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ| 133kB 418kB/s
Collecting uvicorn (from fastapi-websocket-rpc>=0.1.18->fastapi-websocket-pubsub==0.1.18)
Downloading https://files.pythonhosted.org/packages/c8/de/953f0289508b1b92debdf0a6822d9b88ffb0c6ad471d709cf639a2c8a17 6/uvicorn-0.13.4-py3-none-any.whl (46kB)
100% |โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ| 51kB 427kB/s
Collecting starlette==0.13.6 (from fastapi-websocket-rpc>=0.1.18->fastapi-websocket-pubsub==0.1.18)
Downloading https://files.pythonhosted.org/packages/c5/a4/c9e228d7d47044ce4c83ba002f28ff479e542455f0499198a3f77c94f56 4/starlette-0.13.6-py3-none-any.whl (59kB)
100% |โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ| 61kB 185kB/s
Collecting websockets (from fastapi-websocket-rpc>=0.1.18->fastapi-websocket-pubsub==0.1.18)
Downloading https://www.piwheels.org/simple/websockets/websockets-9.0.1-cp37-cp37m-linux_armv6l.whl (101kB)
100% |โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ| 102kB 265kB/s
Collecting requests>=2.25.0 (from fastapi-websocket-rpc>=0.1.18->fastapi-websocket-pubsub==0.1.18)
Downloading https://files.pythonhosted.org/packages/29/c1/24814557f1d22c56d50280771a17307e6bf87b70727d975fd6b2ce6b014 a/requests-2.25.1-py2.py3-none-any.whl (61kB)
100% |โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ| 61kB 192kB/s
Collecting tenacity>=6.3.1 (from fastapi-websocket-rpc>=0.1.18->fastapi-websocket-pubsub==0.1.18)
Downloading https://files.pythonhosted.org/packages/41/ee/d6eddff86161c6a3a1753af4a66b06cbc508d3b77ca4698cd0374cd6653 1/tenacity-7.0.0-py2.py3-none-any.whl
Collecting multidict<7.0,>=4.5 (from aiohttp>=3.7.2->fastapi-websocket-rpc>=0.1.18->fastapi-websocket-pubsub==0.1.18)
Downloading https://files.pythonhosted.org/packages/1c/74/e8b46156f37ca56d10d895d4e8595aa2b344cff3c1fb3629ec97a8656cc b/multidict-5.1.0.tar.gz (53kB)
100% |โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ| 61kB 265kB/s
Installing build dependencies ... done
Requirement already satisfied: chardet<5.0,>=2.0 in /usr/lib/python3/dist-packages (from aiohttp>=3.7.2->fastapi-websoc ket-rpc>=0.1.18->fastapi-websocket-pubsub==0.1.18) (3.0.4)
Collecting yarl<2.0,>=1.0 (from aiohttp>=3.7.2->fastapi-websocket-rpc>=0.1.18->fastapi-websocket-pubsub==0.1.18)
Downloading https://www.piwheels.org/simple/yarl/yarl-1.6.3-cp37-cp37m-linux_armv6l.whl (262kB)
100% |โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ| 266kB 260kB/s
Collecting async-timeout<4.0,>=3.0 (from aiohttp>=3.7.2->fastapi-websocket-rpc>=0.1.18->fastapi-websocket-pubsub==0.1.1 8)
Downloading https://files.pythonhosted.org/packages/e1/1e/5a4441be21b0726c4464f3f23c8b19628372f606755a9d2e46c187e65ec 4/async_timeout-3.0.1-py3-none-any.whl
Collecting typing-extensions>=3.6.5 (from aiohttp>=3.7.2->fastapi-websocket-rpc>=0.1.18->fastapi-websocket-pubsub==0.1. 18)
Downloading https://files.pythonhosted.org/packages/2e/35/6c4fff5ab443b57116cb1aad46421fb719bed2825664e8fe77d66d99bcb c/typing_extensions-3.10.0.0-py3-none-any.whl
Collecting attrs>=17.3.0 (from aiohttp>=3.7.2->fastapi-websocket-rpc>=0.1.18->fastapi-websocket-pubsub==0.1.18)
Downloading https://files.pythonhosted.org/packages/20/a9/ba6f1cd1a1517ff022b35acd6a7e4246371dfab08b8e42b829b6d07913c c/attrs-21.2.0-py2.py3-none-any.whl (53kB)
100% |โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ| 61kB 491kB/s
Collecting h11>=0.8 (from uvicorn->fastapi-websocket-rpc>=0.1.18->fastapi-websocket-pubsub==0.1.18)
Downloading https://files.pythonhosted.org/packages/60/0f/7a0eeea938eaf61074f29fed9717f2010e8d0e0905d36b38d3275a1e462 2/h11-0.12.0-py3-none-any.whl (54kB)
100% |โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ| 61kB 519kB/s
Requirement already satisfied: click==7.* in /usr/lib/python3/dist-packages (from uvicorn->fastapi-websocket-rpc>=0.1.1 8->fastapi-websocket-pubsub==0.1.18) (7.0)
Requirement already satisfied: urllib3<1.27,>=1.21.1 in /usr/lib/python3/dist-packages (from requests>=2.25.0->fastapi- websocket-rpc>=0.1.18->fastapi-websocket-pubsub==0.1.18) (1.24.1)
Requirement already satisfied: certifi>=2017.4.17 in /usr/lib/python3/dist-packages (from requests>=2.25.0->fastapi-web socket-rpc>=0.1.18->fastapi-websocket-pubsub==0.1.18) (2018.8.24)
Requirement already satisfied: idna<3,>=2.5 in /usr/lib/python3/dist-packages (from requests>=2.25.0->fastapi-websocket -rpc>=0.1.18->fastapi-websocket-pubsub==0.1.18) (2.6)
Requirement already satisfied: six>=1.9.0 in /usr/lib/python3/dist-packages (from tenacity>=6.3.1->fastapi-websocket-rp c>=0.1.18->fastapi-websocket-pubsub==0.1.18) (1.12.0)
Building wheels for collected packages: fastapi-websocket-pubsub, multidict
Running setup.py bdist_wheel for fastapi-websocket-pubsub ... done
Stored in directory: /tmp/pip-ephem-wheel-cache-zmte8q0s/wheels/2d/ec/39/e3f0b6ac2e50b81155acd465120b86bb85aa6fd78248 eb3d22
Running setup.py bdist_wheel for multidict ... done
Stored in directory: /home/pi/.cache/pip/wheels/e7/05/d2/f5c04c29d0e4b234dbcd4b609b51f8c65d67ff9bbd01c904b1
Successfully built fastapi-websocket-pubsub multidict
fastapi 0.65.0 has requirement starlette==0.14.2, but you'll have starlette 0.13.6 which is incompatible.
Installing collected packages: broadcaster, typing-extensions, pydantic, starlette, fastapi, multidict, yarl, async-tim eout, attrs, aiohttp, h11, uvicorn, websockets, requests, tenacity, fastapi-websocket-rpc, fastapi-websocket-pubsub
The script uvicorn is installed in '/home/pi/.local/bin' which is not on PATH.
Consider adding this directory to PATH or, if you prefer to suppress this warning, use --no-warn-script-location.
Successfully installed aiohttp-3.7.4.post0 async-timeout-3.0.1 attrs-21.2.0 broadcaster-0.2.0 fastapi-0.65.0 fastapi-we bsocket-pubsub-0.1.18 fastapi-websocket-rpc-0.1.18 h11-0.12.0 multidict-5.1.0 pydantic-1.8.1 requests-2.25.1 starlette- 0.13.6 tenacity-7.0.0 typing-extensions-3.10.0.0 uvicorn-0.13.4 websockets-9.0.1 yarl-1.6.3
Is there any concept of sending a message to a specific subscriber/group within this? Or is that on the roadmap at all?
Also - It looks like the example is slightly wrong here:
endpoint.register_route(app, "/pubsub")
I believe it needs to be:
endpoint.register_route(app, path="/pubsub")
If this doesn't make sense, I was hoping to have the same topics (chat_messages, events, etc) for various user levels. (guest, user, admin). So they would subscribe to /admin/chat_messages and get chat messages from all topics, whereas users would get messages from only the users channel. Hopefully that makes sense.
ValueError: too many file descriptors in select()
windows10
python3.7
when clients connect more than 500 , server will have shutdown
r, w, x = select.select(r, w, w, timeout)
ValueError: too many file descriptors in select()
fastapi_websocket_pubsub/README.md
Line 110 in 90304e3
Instead of on_event
it should be on_trigger
, am I right?
I'm wondering whether we can use celery+rabbitmq as PUB/SUB and what modifications need to be done to achieve this? Thank you.
I want to build a Single Page Application that subscribes to channels.
The connection itself is accepted, but how do I subscribe to channels in Javascript?
When subscribing to ALL_TOPICS from a client they do not receive any updates.
server.py:
import uvicorn
from fastapi import FastAPI
from fastapi_websocket_pubsub import PubSubEndpoint
import asyncio
import uvicorn
from fastapi import FastAPI
app = FastAPI()
endpoint = PubSubEndpoint()
endpoint.register_route(app, "/pubsub")
async def on_events(subscription, data):
print(f"Hello from {subscription.topic}: {data}")
@app.on_event("startup")
async def startup_client():
# Subscribe to all incoming messages
await endpoint.subscribe(ALL_TOPICS, on_events)
async def events():
# await asyncio.sleep(1)
await endpoint.publish("other", data={"info": 3, "some_data": "test123"})
await asyncio.sleep(1)
await endpoint.publish("other", data={"info": 2, "some_data": "test456"})
await asyncio.sleep(1)
await endpoint.publish("test", data={"info": 1, "some_data": "test999"})
@app.get("/request")
async def trigger_events():
asyncio.create_task(events())
if __name__ == "__main__":
uvicorn.run(
"server:app",
host="0.0.0.0",
reload=False,
port=8020,
)
client.py:
from fastapi_websocket_pubsub.event_notifier import ALL_TOPICS
import uvicorn
from fastapi_websocket_pubsub import PubSubClient
app = FastAPI()
endpoint = PubSubClient()
async def on_events(data, topic):
print(f"Received from {topic}: {data}")
@app.on_event("startup")
async def startup_client():
# Subscribe to all incoming messages
endpoint.subscribe(ALL_TOPICS, on_events)
endpoint.start_client("ws://0.0.0.0:8020/pubsub")
@app.get("/")
async def get():
await endpoint.publish("test", data={"info": 1, "some_data": "client"})
if __name__ == "__main__":
uvicorn.run(
"client:app",
host="0.0.0.0",
reload=False,
port=8010,
)
Expected behavior:
client.py will receive all data from server.py
server.py receives all data
Observed behaviour:
client.py does not receive any data
server.py receives all data
Hello! Thanks for this great library.
I'm currently trying to use in an application, but another library I'm using requires websockets>=11. I noticed this library requires an older version. Is there a particular reason for this, or do the dependencies just need to be updated?
Thanks!
A declarative, efficient, and flexible JavaScript library for building user interfaces.
๐ Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
An Open Source Machine Learning Framework for Everyone
The Web framework for perfectionists with deadlines.
A PHP framework for web artisans
Bring data to life with SVG, Canvas and HTML. ๐๐๐
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
Some thing interesting about web. New door for the world.
A server is a program made to process requests and deliver data to clients.
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
Some thing interesting about visualization, use data art
Some thing interesting about game, make everyone happy.
We are working to build community through open source technology. NB: members must have two-factor auth.
Open source projects and samples from Microsoft.
Google โค๏ธ Open Source for everyone.
Alibaba Open Source for everyone
Data-Driven Documents codes.
China tencent open source team.