Comments (11)
Looks like I'll need asyncio.run_coroutine_threadsafe(): https://docs.python.org/3/library/asyncio-task.html#asyncio.run_coroutine_threadsafe
asyncio.run_coroutine_threadsafe(coro, loop)
Unlike other asyncio functions this function requires the loop argument to be passed explicitly.
Also relevant: https://docs.python.org/3/library/asyncio-dev.html#asyncio-multithreading
This is a bit confusing though. There's also loop.call_soon(callback, *args) and its thread-safe variant loop.call_soon_threadsafe(callback, *args), which both take regular (not async def) Python functions and schedule them to be called on the event loop.
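To make that distinction concrete, here's a minimal self-contained sketch (names are illustrative, not from the plugin): a plain function gets scheduled from a thread with loop.call_soon_threadsafe(), while a coroutine has to go through asyncio.run_coroutine_threadsafe(), which also hands back a concurrent.futures.Future the thread can wait on.

```python
import asyncio
import threading

results = []

def plain_callback(value):
    # A regular function: scheduled from a thread via call_soon_threadsafe()
    results.append(("callback", value))

async def coroutine_job(value):
    # A coroutine: scheduled from a thread via run_coroutine_threadsafe()
    results.append(("coroutine", value))
    return value * 2

async def main():
    loop = asyncio.get_running_loop()

    def worker():
        # Schedule a plain function on the loop from this thread...
        loop.call_soon_threadsafe(plain_callback, 1)
        # ...and a coroutine; this returns a concurrent.futures.Future
        # whose .result() blocks this thread until the loop has run it
        future = asyncio.run_coroutine_threadsafe(coroutine_job(2), loop)
        results.append(("result", future.result(timeout=5)))

    thread = threading.Thread(target=worker)
    thread.start()
    # Yield to the loop so the scheduled work gets a chance to run
    await asyncio.sleep(0.2)
    thread.join()

asyncio.run(main())
print(results)
```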
from datasette-upload-csvs.
I can use asyncio.get_running_loop() to get the current loop. Note that this can only be called in an async def function or a callback that has been passed to something like loop.call_soon() - if you call it outside of those you get this error:
>>> import asyncio
>>> asyncio.get_running_loop()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
RuntimeError: no running event loop
>>>
>>> import asyncio
>>>
>>> def callback():
...     loop = asyncio.get_running_loop()
...     print("Current loop inside callback:", loop)
...
>>> async def main():
...     loop = asyncio.get_running_loop()
...     loop.call_soon(callback)
...
>>> asyncio.run(main())
Current loop inside callback: <_UnixSelectorEventLoop running=True closed=False debug=False>
I'm going to try running the CSV parsing entirely in a dedicated thread. I'll start a new thread for each upload - since this is only available to signed-in users I'm not worried about thousands of concurrent uploads starving threads.
I got a threaded version working but it feels a bit weird. I uploaded a 175MB CSV file through it and it seemed to work... but once the file had uploaded and the progress bar showed it as fully processed, hitting "refresh" on the table page showed the row count still climbing.
I think the thread just crammed a huge number of write operations into the in-memory queue and marked the upload as complete; those queued writes then kept on being processed afterwards.
Here's the diff:
diff --git a/.gitignore b/.gitignore
index bc23806..aac8831 100644
--- a/.gitignore
+++ b/.gitignore
@@ -3,6 +3,7 @@ __pycache__/
 *.py[cod]
 *$py.class
 venv
+venv-1
 .eggs
 .pytest_cache
 *.egg-info
diff --git a/datasette_upload_csvs/__init__.py b/datasette_upload_csvs/__init__.py
index b15f26b..5a5d6cd 100644
--- a/datasette_upload_csvs/__init__.py
+++ b/datasette_upload_csvs/__init__.py
@@ -1,3 +1,4 @@
+import asyncio
 from datasette import hookimpl
 from datasette.utils.asgi import Response, Forbidden
 from charset_normalizer import detect
@@ -10,6 +11,7 @@ import io
 import os
 import sqlite_utils
 from sqlite_utils.utils import TypeTracker
+import threading
 import uuid
@@ -124,57 +126,105 @@ async def upload_csvs(scope, receive, datasette, request):
     await db.execute_write_fn(insert_initial_record)

-    def insert_docs(database):
-        reader = csv_std.reader(codecs.iterdecode(csv.file, encoding))
-        headers = next(reader)
-
-        tracker = TypeTracker()
+    # We run the CSV parser in a thread, sending 100 rows at a time to the DB
+    def parse_csv_in_thread(event_loop, csv_file, db, table_name, task_id):
+        try:
+            reader = csv_std.reader(codecs.iterdecode(csv_file, encoding))
+            headers = next(reader)
+
+            tracker = TypeTracker()
+
+            docs = tracker.wrap(dict(zip(headers, row)) for row in reader)
+
+            i = 0
+
+            def docs_with_progress():
+                nonlocal i
+                for doc in docs:
+                    i += 1
+                    yield doc
+                    if i % 10 == 0:
+
+                        def update_progress(conn):
+                            database = sqlite_utils.Database(conn)
+                            database["_csv_progress_"].update(
+                                task_id,
+                                {
+                                    "rows_done": i,
+                                    "bytes_done": csv_file.tell(),
+                                },
+                            )
+
+                        asyncio.run_coroutine_threadsafe(
+                            db.execute_write_fn(update_progress), event_loop
+                        )
+
+            def write_batch(batch):
+                def insert_batch(conn):
+                    database = sqlite_utils.Database(conn)
+                    database[table_name].insert_all(batch, alter=True)
+
+                asyncio.run_coroutine_threadsafe(
+                    db.execute_write_fn(insert_batch), event_loop
+                )
-        docs = tracker.wrap(dict(zip(headers, row)) for row in reader)
+            batch = []
+            batch_size = 0
+            for doc in docs_with_progress():
+                batch.append(doc)
+                batch_size += 1
+                if batch_size > 100:
+                    write_batch(batch)
+                    batch = []
+                    batch_size = 0
+
+            if batch:
+                write_batch(batch)
+
+            # Mark progress as complete
+            def mark_complete(conn):
+                nonlocal i
+                database = sqlite_utils.Database(conn)
+                database["_csv_progress_"].update(
+                    task_id,
+                    {
+                        "rows_done": i,
+                        "bytes_done": total_size,
+                        "completed": str(datetime.datetime.utcnow()),
+                    },
+                )
-        i = 0
+            asyncio.run_coroutine_threadsafe(
+                db.execute_write_fn(mark_complete), event_loop
+            )
-        def docs_with_progress():
-            nonlocal i
-            for doc in docs:
-                i += 1
-                yield doc
-                if i % 10 == 0:
-                    database["_csv_progress_"].update(
-                        task_id,
-                        {
-                            "rows_done": i,
-                            "bytes_done": csv.file.tell(),
-                        },
-                    )
+            # Transform columns to detected types
+            def transform_columns(conn):
+                database = sqlite_utils.Database(conn)
+                database[table_name].transform(types=tracker.types)
-        database[table_name].insert_all(
-            docs_with_progress(), alter=True, batch_size=100
-        )
-        database["_csv_progress_"].update(
-            task_id,
-            {
-                "rows_done": i,
-                "bytes_done": total_size,
-                "completed": str(datetime.datetime.utcnow()),
-            },
-        )
-        # Transform columns to detected types
-        database[table_name].transform(types=tracker.types)
-        return database[table_name].count
+            asyncio.run_coroutine_threadsafe(
+                db.execute_write_fn(transform_columns), event_loop
+            )
+        except Exception as error:
-    def insert_docs_catch_errors(conn):
-        database = sqlite_utils.Database(conn)
-        with conn:
-            try:
-                insert_docs(database)
-            except Exception as error:
+            def insert_error(conn):
+                database = sqlite_utils.Database(conn)
                 database["_csv_progress_"].update(
                     task_id,
                     {"error": str(error)},
                 )
-    await db.execute_write_fn(insert_docs_catch_errors, block=False)
+            asyncio.run_coroutine_threadsafe(
+                db.execute_write_fn(insert_error), event_loop
+            )
+
+    loop = asyncio.get_running_loop()
+
+    # Start that thread running in the default executor in the background
+    loop.run_in_executor(
+        None, parse_csv_in_thread, loop, csv.file, db, table_name, task_id
+    )
     if formdata.get("xhr"):
         return Response.json(
Also, the tests pass, but running them produces a huge number of lines like this:
Task was destroyed but it is pending!
task: <Task pending name='Task-193' coro=<Database.execute_write_fn() done, defined at /Users/simon/.local/share/virtualenvs/datasette-upload-csvs-Tx5Sw3VW/lib/python3.10/site-packages/datasette/database.py:162> wait_for=<Future pending cb=[Task.task_wakeup()]> cb=[_chain_future.<locals>._call_set_state() at /Users/simon/.pyenv/versions/3.10.4/lib/python3.10/asyncio/futures.py:391]>
Task was destroyed but it is pending!
task: <Task pending name='Task-194' coro=<Database.execute_write_fn() done, defined at /Users/simon/.local/share/virtualenvs/datasette-upload-csvs-Tx5Sw3VW/lib/python3.10/site-packages/datasette/database.py:162> wait_for=<Future pending cb=[Task.task_wakeup()]> cb=[_chain_future.<locals>._call_set_state() at /Users/simon/.pyenv/versions/3.10.4/lib/python3.10/asyncio/futures.py:391]>
Resulting file was pretty big prior to a vacuum too, but that may be from previous unrelated experiments:
$ ls -lah temp.db
-rw-r--r--@ 1 simon staff 458M Jan 24 16:07 temp.db
$ sqlite-utils vacuum temp.db
$ ls -lah temp.db
-rw-r--r--@ 1 simon staff 280M Jan 24 16:10 temp.db
I think this may be a classic queue problem. It turns out parsing the CSV file as fast as possible produces writes faster than the database can keep up with. What we actually want is to send those writes at a pace slightly slower than what the DB can handle, to keep DB capacity open for other things that might be going on.
I may need to use https://pypi.org/project/janus/ - which is already used elsewhere in Datasette.
Actually it turns out it's only used in one place in Datasette: to set up a tiny queue on which to send a message back when you perform a blocking write operation: https://github.com/simonw/datasette/blob/7a5adb592ae6674a2058639c66e85eb1b49448fb/datasette/database.py#L201-L210
Note that setting it up with janus.Queue(maxsize=100) should cause writes to the queue to block until there is some space in it once that maximum size has been hit.
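I haven't wired janus in yet, but the backpressure a bounded queue provides can be sketched with just the standard library: a producer thread does blocking puts onto a bounded asyncio.Queue by waiting on run_coroutine_threadsafe(), so it stalls whenever the consumer falls behind. The names below are illustrative, not the janus API.

```python
import asyncio
import threading

async def main():
    loop = asyncio.get_running_loop()
    # A bounded queue: put() suspends once maxsize items are waiting -
    # the same backpressure janus.Queue(maxsize=...) gives you across threads
    queue = asyncio.Queue(maxsize=5)
    consumed = []

    def producer():
        for i in range(20):
            # Blocking put from the thread: wait until the loop-side put completes,
            # which in turn waits for queue space when the queue is full
            asyncio.run_coroutine_threadsafe(queue.put(i), loop).result()
        # Sentinel to tell the consumer we're done
        asyncio.run_coroutine_threadsafe(queue.put(None), loop).result()

    thread = threading.Thread(target=producer)
    thread.start()
    while True:
        item = await queue.get()
        if item is None:
            break
        consumed.append(item)
    thread.join()
    return consumed

consumed = asyncio.run(main())
print(consumed)
```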
Thinking more about this.
The goal here is to have batches of rows from the CSV file written to the database as quickly as possible while still keeping the server able to do other things.
I think the best way to do this is to send batches of 100 rows at a time to the db.execute_write_fn() function - maybe even with a tiny artificial sleep between batches to give other writes more of a chance to grab a turn?
The file has already been uploaded and saved to the temporary directory before the import kicks off. It would be nice if parsing could start happening while the file was still uploading but I'm OK ignoring that for the moment.
Is CSV parsing an I/O or a CPU bound activity? I'm going to guess it's I/O bound, which means that running it in a regular thread should still provide a performance benefit.
So I think I want a regular thread to run the CSV parsing, which sends 100 rows at a time to db.execute_write_fn().
The biggest question though is whether those calls should be blocking, such that the file reading operation pauses until the batch has been written to the database.
I didn't make them blocking in my first version of this, which resulted in the weird behaviour where the writes continued long after the progress bar had finished.
I'm going to try making them blocking writes instead. That means I need to figure out how to perform a blocking write from a thread, even though that thread isn't running async def code that can await the call.
I think I can use this pattern for that:
# Submit the coroutine to a given loop
future = asyncio.run_coroutine_threadsafe(coro, loop)
# Wait for the result with an optional timeout argument
assert future.result(timeout) == 3
So maybe I just need to call future.result() without a timeout to wait for that thing to finish writing to the DB?
Adding that future.result() call really did do the job!
The server stayed fully responsive during the upload.
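End to end, that pattern looks something like this sketch: a parsing thread that blocks on each batch write via future.result(). Here write_batch() is a hypothetical stand-in for db.execute_write_fn() - the real call writes the batch to SQLite.

```python
import asyncio
import csv
import io
import threading

written = []

async def write_batch(batch):
    # Stand-in for db.execute_write_fn(); pretend this writes to SQLite
    written.append(list(batch))

def parse_in_thread(loop, csv_file, batch_size=100):
    reader = csv.reader(csv_file)
    headers = next(reader)
    batch = []
    for row in reader:
        batch.append(dict(zip(headers, row)))
        if len(batch) >= batch_size:
            # Blocking write: the thread pauses until the loop has written the batch,
            # so parsing can never race ahead of the database
            asyncio.run_coroutine_threadsafe(write_batch(batch), loop).result()
            batch = []
    if batch:
        asyncio.run_coroutine_threadsafe(write_batch(batch), loop).result()

async def main():
    loop = asyncio.get_running_loop()
    csv_file = io.StringIO("id,name\n" + "".join(f"{i},row{i}\n" for i in range(250)))
    # Run the parser in the default executor and wait for it to finish
    await loop.run_in_executor(None, parse_in_thread, loop, csv_file)

asyncio.run(main())
print([len(b) for b in written])  # batches of 100, 100, 50
```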
Weird, still that same test failure where types are not converted on some Python versions for Datasette 1.0.
I ended up running the CSV parsing in an asyncio task rather than a thread, yielding every 100 records.
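The comment doesn't include the final code, but the task-based version presumably looks something like this sketch, where write_rows() is a hypothetical stand-in for db.execute_write_fn(): because the task awaits after every batch, control returns to the event loop and the server stays responsive without any threads at all.

```python
import asyncio
import csv
import io

written = []

async def write_rows(rows):
    # Stand-in for db.execute_write_fn(); records how many rows each batch had
    written.append(len(rows))

async def parse_csv_task(csv_file, batch_size=100):
    reader = csv.reader(csv_file)
    headers = next(reader)
    batch = []
    for row in reader:
        batch.append(dict(zip(headers, row)))
        if len(batch) >= batch_size:
            # Awaiting here yields to the event loop between batches
            await write_rows(batch)
            batch = []
    if batch:
        await write_rows(batch)

async def main():
    csv_file = io.StringIO("id,name\n" + "".join(f"{i},row{i}\n" for i in range(120)))
    # Schedule the import as a background task instead of a thread
    task = asyncio.create_task(parse_csv_task(csv_file))
    await task

asyncio.run(main())
print(written)  # [100, 20]
```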