Code Monkey home page Code Monkey logo

Comments (11)

simonw avatar simonw commented on June 11, 2024

Looks like I'll need asyncio.run_coroutine_threadsafe(): https://docs.python.org/3/library/asyncio-task.html#asyncio.run_coroutine_threadsafe

asyncio.run_coroutine_threadsafe(coro, loop)

Unlike other asyncio functions this function requires the loop argument to be passed explicitly.

Also relevant: https://docs.python.org/3/library/asyncio-dev.html#asyncio-multithreading

This is a bit confusing though. There's also loop.call_soon(callback, *args) and its thread-safe variant loop.call_soon_threadsafe(callback, *args) which both take regular - not async def - Python functions and schedule them to be called on the event loop.

from datasette-upload-csvs.

simonw avatar simonw commented on June 11, 2024

I can use asyncio.get_running_loop() to get the current loop. Note that this can only be called in an async def function or a callback that has been passed to something like loop.call_soon() - if you call it outside of those you get this error:

>>> import asyncio
>>> asyncio.get_running_loop()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
RuntimeError: no running event loop
>>> 
>>> 
>>> import asyncio
>>> 
>>> def callback():
...     loop = asyncio.get_running_loop()
...     print("Current loop inside callback:", loop)
... 
>>> async def main():
...     loop = asyncio.get_running_loop()
...     loop.call_soon(callback)
... 
>>> asyncio.run(main())
Current loop inside callback: <_UnixSelectorEventLoop running=True closed=False debug=False>

from datasette-upload-csvs.

simonw avatar simonw commented on June 11, 2024

I'm going to try running the CSV parsing entirely in a dedicated thread. I'll start a new thread for each upload - since this is only available to signed-in users I'm not worried about thousands of concurrent uploads starving threads.

from datasette-upload-csvs.

simonw avatar simonw commented on June 11, 2024

I got a threaded version working but it feels a bit weird. I uploaded a 175MB CSV file through it and it seemed to work... but once the file had uploaded and the progress bar showed it to be fully processed hitting "refresh" on the table continued to increment the table count.

I think the thread just crammed a huge number of in-memory write operations into the in-memory queue and marked it as complete, then those kept on being processed later.

Here's the diff:

diff --git a/.gitignore b/.gitignore
index bc23806..aac8831 100644
--- a/.gitignore
+++ b/.gitignore
@@ -3,6 +3,7 @@ __pycache__/
 *.py[cod]
 *$py.class
 venv
+venv-1
 .eggs
 .pytest_cache
 *.egg-info
diff --git a/datasette_upload_csvs/__init__.py b/datasette_upload_csvs/__init__.py
index b15f26b..5a5d6cd 100644
--- a/datasette_upload_csvs/__init__.py
+++ b/datasette_upload_csvs/__init__.py
@@ -1,3 +1,4 @@
+import asyncio
 from datasette import hookimpl
 from datasette.utils.asgi import Response, Forbidden
 from charset_normalizer import detect
@@ -10,6 +11,7 @@ import io
 import os
 import sqlite_utils
 from sqlite_utils.utils import TypeTracker
+import threading
 import uuid
 
 
@@ -124,57 +126,105 @@ async def upload_csvs(scope, receive, datasette, request):
 
     await db.execute_write_fn(insert_initial_record)
 
-    def insert_docs(database):
-        reader = csv_std.reader(codecs.iterdecode(csv.file, encoding))
-        headers = next(reader)
-
-        tracker = TypeTracker()
+    # We run the CSV parser in a thread, sending 100 rows at a time to the DB
+    def parse_csv_in_thread(event_loop, csv_file, db, table_name, task_id):
+        try:
+            reader = csv_std.reader(codecs.iterdecode(csv_file, encoding))
+            headers = next(reader)
+
+            tracker = TypeTracker()
+
+            docs = tracker.wrap(dict(zip(headers, row)) for row in reader)
+
+            i = 0
+
+            def docs_with_progress():
+                nonlocal i
+                for doc in docs:
+                    i += 1
+                    yield doc
+                    if i % 10 == 0:
+
+                        def update_progress(conn):
+                            database = sqlite_utils.Database(conn)
+                            database["_csv_progress_"].update(
+                                task_id,
+                                {
+                                    "rows_done": i,
+                                    "bytes_done": csv_file.tell(),
+                                },
+                            )
+
+                        asyncio.run_coroutine_threadsafe(
+                            db.execute_write_fn(update_progress), event_loop
+                        )
+
+            def write_batch(batch):
+                def insert_batch(conn):
+                    database = sqlite_utils.Database(conn)
+                    database[table_name].insert_all(batch, alter=True)
+
+                asyncio.run_coroutine_threadsafe(
+                    db.execute_write_fn(insert_batch), event_loop
+                )
 
-        docs = tracker.wrap(dict(zip(headers, row)) for row in reader)
+            batch = []
+            batch_size = 0
+            for doc in docs_with_progress():
+                batch.append(doc)
+                batch_size += 1
+                if batch_size > 100:
+                    write_batch(batch)
+                    batch = []
+                    batch_size = 0
+
+            if batch:
+                write_batch(batch)
+
+            # Mark progress as complete
+            def mark_complete(conn):
+                nonlocal i
+                database = sqlite_utils.Database(conn)
+                database["_csv_progress_"].update(
+                    task_id,
+                    {
+                        "rows_done": i,
+                        "bytes_done": total_size,
+                        "completed": str(datetime.datetime.utcnow()),
+                    },
+                )
 
-        i = 0
+            asyncio.run_coroutine_threadsafe(
+                db.execute_write_fn(mark_complete), event_loop
+            )
 
-        def docs_with_progress():
-            nonlocal i
-            for doc in docs:
-                i += 1
-                yield doc
-                if i % 10 == 0:
-                    database["_csv_progress_"].update(
-                        task_id,
-                        {
-                            "rows_done": i,
-                            "bytes_done": csv.file.tell(),
-                        },
-                    )
+            # Transform columns to detected types
+            def transform_columns(conn):
+                database = sqlite_utils.Database(conn)
+                database[table_name].transform(types=tracker.types)
 
-        database[table_name].insert_all(
-            docs_with_progress(), alter=True, batch_size=100
-        )
-        database["_csv_progress_"].update(
-            task_id,
-            {
-                "rows_done": i,
-                "bytes_done": total_size,
-                "completed": str(datetime.datetime.utcnow()),
-            },
-        )
-        # Transform columns to detected types
-        database[table_name].transform(types=tracker.types)
-        return database[table_name].count
+            asyncio.run_coroutine_threadsafe(
+                db.execute_write_fn(transform_columns), event_loop
+            )
+        except Exception as error:
 
-    def insert_docs_catch_errors(conn):
-        database = sqlite_utils.Database(conn)
-        with conn:
-            try:
-                insert_docs(database)
-            except Exception as error:
+            def insert_error(conn):
+                database = sqlite_utils.Database(conn)
                 database["_csv_progress_"].update(
                     task_id,
                     {"error": str(error)},
                 )
 
-    await db.execute_write_fn(insert_docs_catch_errors, block=False)
+            asyncio.run_coroutine_threadsafe(
+                db.execute_write_fn(insert_error), event_loop
+            )
+
+    loop = asyncio.get_running_loop()
+
+    # Start that thread running in the default executor in the background
+    loop.run_in_executor(
+        None, parse_csv_in_thread, loop, csv.file, db, table_name, task_id
+    )
 
     if formdata.get("xhr"):
         return Response.json(

from datasette-upload-csvs.

simonw avatar simonw commented on June 11, 2024

Also running the tests pass but I get a huge number of lines like this:

task: <Task pending name='Task-193' coro=<Database.execute_write_fn() done, defined at /Users/simon/.local/share/virtualenvs/datasette-upload-csvs-Tx5Sw3VW/lib/python3.10/site-packages/datasette/database.py:162> wait_for=<Future pending cb=[Task.task_wakeup()]> cb=[_chain_future.<locals>._call_set_state() at /Users/simon/.pyenv/versions/3.10.4/lib/python3.10/asyncio/futures.py:391]>
Task was destroyed but it is pending!
task: <Task pending name='Task-194' coro=<Database.execute_write_fn() done, defined at /Users/simon/.local/share/virtualenvs/datasette-upload-csvs-Tx5Sw3VW/lib/python3.10/site-packages/datasette/database.py:162> wait_for=<Future pending cb=[Task.task_wakeup()]> cb=[_chain_future.<locals>._call_set_state() at /Users/simon/.pyenv/versions/3.10.4/lib/python3.10/asyncio/futures.py:391]>

Resulting file was pretty big prior to a vacuum too, but that may be from previous unrelated experiments:

$ ls -lah temp.db
-rw-r--r--@ 1 simon  staff   458M Jan 24 16:07 temp.db
$ sqlite-utils vacuum temp.db
$ ls -lah temp.db            
-rw-r--r--@ 1 simon  staff   280M Jan 24 16:10 temp.db

from datasette-upload-csvs.

simonw avatar simonw commented on June 11, 2024

I think this may be a classic queue problem. Turns out parsing the CSV file as fast as possible produces writes at a rate that's too high for the database to keep up. What we actually want to do is send those writes at a pace that's slightly slower than what the DB can handle, to keep DB capacity open for other stuff that might be going on.

I may need to use https://pypi.org/project/janus/ - which is already used elsewhere in Datasette.

Actually it turns out it's only used in one place in Datasette: to set up a tiny queue on which to send a message back when you perform a blocking write operation: https://github.com/simonw/datasette/blob/7a5adb592ae6674a2058639c66e85eb1b49448fb/datasette/database.py#L201-L210

Note that setting it up with janus.Queue(maxsize=100) should cause writes to the queue to block until there is some space in it once that maximum size has been hit.

from datasette-upload-csvs.

simonw avatar simonw commented on June 11, 2024

Thinking more about this.

The goal here is to have batches of rows from the CSV file written to the database as quickly as possible while still keeping the server able to do other things.

I think the best way to do this is to send batches of 100 rows at a time to the db.execute_write_fn() function - maybe even with a tiny artificial sleep between to give other writes more of a chance to grab a turn?

The file has already been uploaded and saved to the temporary directory before the import kicks off. It would be nice if parsing could start happening while the file was still uploading but I'm OK ignoring that for the moment.

Is CSV parsing an I/O or a CPU bound activity? I'm going to guess it's I/O bound, which means that running it in a regular thread should still provide a performance benefit.

So I think I want a regular thread to run the CSV parsing, which sends 100 rows at a time to db.execute_write_fn().

The biggest question though is whether those calls should be blocking, such that the file reading operation pauses until the batch has been written to the database.

I didn't make them blocking in my first version of this, which resulted in the weird behaviour where the writes continued long after the progress bar had finished.

I'm going to try making them blocking writes instead. That means I need to figure out how to perform a blocking write from a thread, even though that thread isn't running as async def code that can await the call.

from datasette-upload-csvs.

simonw avatar simonw commented on June 11, 2024

I think I can use this pattern for that:

# Submit the coroutine to a given loop
future = asyncio.run_coroutine_threadsafe(coro, loop)

# Wait for the result with an optional timeout argument
assert future.result(timeout) == 3

So maybe I just need to call future.result() without a timeout to wait for that thing to finish writing to the DB?

from datasette-upload-csvs.

simonw avatar simonw commented on June 11, 2024

Adding that future.result() call really did do the job!

progress

The server stayed fully responsive during the upload.

from datasette-upload-csvs.

simonw avatar simonw commented on June 11, 2024

Weird, still that same test failure where types are not converted on some Python versions for Datasette 1.0.

from datasette-upload-csvs.

simonw avatar simonw commented on June 11, 2024

I ended up running the CSV parsing in an asyncio task rather than a thread, yielding every 100 records.

from datasette-upload-csvs.

Related Issues (20)

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.