Comments (36)
Wanted to bump this thread: I am currently trying to write async BigQuery jobs myself, and it would be great if the job were awaitable.
from python-bigquery.
Gonna close this out as "Will not work", due to conflicting priorities.
from python-bigquery.
Any update from Google on asyncio compatibility for the BigQuery lib?
from python-bigquery.
For awareness: this is an internship project and my internship ends in 2 weeks. I plan on implementing an AsyncClient for the async_query_and_wait RPC. Other RPCs have also been requested to be made asynchronous, which I may not have time for. For future maintainers, I will document my progress here, but please refer to the internal design doc on this feature, which can be found at my internship documentation website, go/kirnendra-internship.
RPCs to make async:
- query_and_wait - in progress
- query
- get_job
- get_query_results
- get_table
- list_partitions
- get_partition
from python-bigquery.
Amazing @kiraksi, that is awesome.
But,
@google leadership: I find it sad that a 5-year-old issue to move the library to properly support asyncio Python has taken this long and is treated as a side project for the organisation.
The message I take from it is that the Google leadership simply doesn't care about supporting Python as a first-class citizen of the Google Cloud Developer Experience.
from python-bigquery.
New updates: there are some blocking issues. To make a completely asynchronous RowIterator object for the query and query_and_wait methods, which is a child of HTTPIterator in the google-api-core library, I would need an AsyncHTTPIterator. I have opened an issue for this here: googleapis/python-api-core#627, and I may make this PR myself. Until then, I will be focusing on the get_ methods, which don't have this blocking issue. Additionally, there is current work on this, but I would like this method exposed: googleapis/google-auth-library-python#613
from python-bigquery.
@dkapitan Thanks for the follow-up. I agree that it would be ideal if jobs could be awaited.
You could simplify your example code using a set, e.g.:
from time import sleep

from google.cloud import bigquery

bq = bigquery.Client()

query1 = """
SELECT
  language.name,
  AVG(language.bytes)
FROM `bigquery-public-data.github_repos.languages`
, UNNEST(language) AS language
GROUP BY language.name"""
query2 = 'SELECT 2'
queries = [query1, query2]
awaiting_jobs = set()

def callback(future):
    # Invoked from a helper thread; drop the job from the pending set.
    awaiting_jobs.discard(future.job_id)

for query in queries:
    job = bq.query(query)
    awaiting_jobs.add(job.job_id)
    job.add_done_callback(callback)

while awaiting_jobs:
    print('waiting for jobs to finish ... sleeping for 1s')
    sleep(1)
print('all jobs done, do your stuff')
from python-bigquery.
Would definitely appreciate this as well
from python-bigquery.
+1 for reopening!
from python-bigquery.
At this moment, this task is on hold. We will revisit it when we have manpower. I am not gonna close it so that it remains on the radar.
from python-bigquery.
Hey @tswast @chalmerlowe
Is there an update on this issue and on @kiraksi's PR #1853? The Python / BigQuery communities would love this feature to be available.
from python-bigquery.
I just ran into this problem myself and found out that even though the job itself is "asynchronous" there are actually two places where synchronous I/O is happening.
From the example usage on https://googleapis.dev/python/bigquery/latest/index.html:
from google.cloud import bigquery

client = bigquery.Client()

# Perform a query.
QUERY = (
    'SELECT name FROM `bigquery-public-data.usa_names.usa_1910_2013` '
    'WHERE state = "TX" '
    'LIMIT 100')
query_job = client.query(QUERY)  # API request
rows = query_job.result()  # Waits for query to finish

for row in rows:
    print(row.name)

client.query(QUERY) makes a synchronous network request, as does query_job.result().
Usually, blocking on result() takes the majority of the time, but I've seen cases where query() can take over a second to complete by itself.
As a workaround for now, I'm running all the BigQuery data fetching code in a separate thread to free up the event loop but it would be fantastic if the BigQuery API supported async I/O.
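Example workaround (code fragment):
import asyncio

def get_data(client, query):
    # Blocking: sends the query and waits for the rows.
    return client.query(query).result()

async def fetch(client, query):
    loop = asyncio.get_running_loop()
    # Run the blocking call in the default thread pool so the event loop stays free.
    return await loop.run_in_executor(None, get_data, client, query)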
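The thread-offloading workaround above extends naturally to several queries at once. The following is a minimal sketch, not the library's API: `FakeClient` and its `query_rows` method are hypothetical stand-ins for a blocking BigQuery call, used here only so the example runs without credentials.

```python
import asyncio
import time


class FakeClient:
    """Hypothetical stand-in for a BigQuery client; blocks like a network call."""

    def query_rows(self, sql):
        time.sleep(0.1)  # simulate blocking I/O
        return [sql.upper()]


def get_data(client, sql):
    # Blocking call; it runs in a worker thread, not on the event loop.
    return client.query_rows(sql)


async def fetch_all(client, queries):
    loop = asyncio.get_running_loop()
    # Schedule every blocking call on the default thread pool at once.
    tasks = [loop.run_in_executor(None, get_data, client, q) for q in queries]
    # gather() returns results in the same order as the submitted tasks.
    return await asyncio.gather(*tasks)


results = asyncio.run(fetch_all(FakeClient(), ["select 1", "select 2"]))
```

With a real client, `get_data` would presumably call `client.query(sql).result()` as in the fragment above; the event loop stays responsive because each blocking call lives on its own worker thread.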
from python-bigquery.
still waiting here... do you have an approximate timeline for this?
from python-bigquery.
@adamserafini Thank you for your input.
We will take it into consideration.
from python-bigquery.
Before the async libraries are ready, it seems the done callback is the best way to run multiple queries in parallel unless the developer is willing to do multi-threading.
Can you please improve the documentation on add_done_callback? It is unclear what the argument to the callback is, and what the callback can do with it. It's a Future class, but which Future class is it? Is it asyncio.Future or google.api_core.future.base.Future or something else?
from python-bigquery.
We've identified EOL for new versions of the BigQuery client library as January 1, 2020 in googleapis/google-cloud-python#9036. We can revisit this request after that date.
from python-bigquery.
I talked with some folks on Firestore who've implemented this. They suggest a trio of classes (async_batch, base_batch, batch) in order to avoid too much duplicated effort: https://github.com/googleapis/python-firestore/tree/master/google/cloud/firestore_v1
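A minimal sketch of how such a base/sync/async trio might share code, assuming the I/O-free logic can be isolated in the base class. Every name here (`BaseClient`, `Client`, `AsyncClient`, the transports, the request dict) is hypothetical and illustrative only; it is not Firestore's or BigQuery's actual implementation.

```python
import asyncio


class BaseClient:
    """Shared, I/O-free logic: building requests and parsing responses."""

    def _build_request(self, sql):
        return {"query": sql.strip()}

    def _parse_response(self, response):
        return response["rows"]


class Client(BaseClient):
    def __init__(self, transport):
        self._transport = transport  # hypothetical blocking callable

    def query(self, sql):
        return self._parse_response(self._transport(self._build_request(sql)))


class AsyncClient(BaseClient):
    def __init__(self, transport):
        self._transport = transport  # hypothetical coroutine function

    async def query(self, sql):
        response = await self._transport(self._build_request(sql))
        return self._parse_response(response)


def fake_transport(request):
    # Stand-in for a blocking HTTP call.
    return {"rows": [request["query"]]}


async def fake_async_transport(request):
    # Stand-in for an awaitable HTTP call.
    return fake_transport(request)


sync_rows = Client(fake_transport).query(" SELECT 1 ")
async_rows = asyncio.run(AsyncClient(fake_async_transport).query(" SELECT 1 "))
```

Only the thin `query` wrappers are duplicated; request construction and response parsing live in one place.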
from python-bigquery.
@dkapitan Re: the async / await / async def keywords: our codebase straddles both Python 2 and Python 3 until at least the end of this year, and thus cannot adopt the syntax directly in the codebase until we drop Python 2 support.
We might be able to tweak the google.api_core.operation.Operation class to support that usage at a low level. Can you please post the traceback for the AttributeError?
Also, for your example above, your callback isn't doing anything with result: it is being called from a helper thread, and would need to do something to signal the main thread.
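One way a callback running on a helper thread can signal the main thread is a `threading.Event`. The sketch below uses `concurrent.futures` futures as a stand-in for BigQuery jobs (an assumption made so the example is runnable without credentials); the callback records each result and sets the event once all are in.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

results = []
results_lock = threading.Lock()
all_done = threading.Event()
expected = 3


def on_done(future):
    # Runs on a worker thread (or in the caller, if the future already finished).
    with results_lock:
        results.append(future.result())
        if len(results) == expected:
            all_done.set()  # wake up the main thread


with ThreadPoolExecutor(max_workers=3) as pool:
    for n in range(expected):
        pool.submit(pow, n, 2).add_done_callback(on_done)
    # The main thread blocks here instead of spinning in a sleep loop.
    finished = all_done.wait(timeout=5)
```

The lock guards the shared list because callbacks may fire concurrently from different threads.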
from python-bigquery.
@tseaver thanks for clarifying. I have read up more on concurrent.futures and asyncio. I understand the latter is indeed quite new and Python 3 only. Will investigate the sample code a bit more, and get back to you with the results and/or the traceback.
from python-bigquery.
@tseaver here's the traceback from the AttributeError:
---------------------------------------------------------------------------
AttributeError Traceback (most recent call last)
<ipython-input-19-e9025f68a561> in <module>
9 [operation.add_done_callback(my_callback) for operation in operations]
10 results2 = []
---> 11 for future in as_completed(operations):
12 results2.append(list(future.result()))
/usr/local/Cellar/python/3.7.4/Frameworks/Python.framework/Versions/3.7/lib/python3.7/concurrent/futures/_base.py in as_completed(fs, timeout)
217 fs = set(fs)
218 total_futures = len(fs)
--> 219 with _AcquireFutures(fs):
220 finished = set(
221 f for f in fs
/usr/local/Cellar/python/3.7.4/Frameworks/Python.framework/Versions/3.7/lib/python3.7/concurrent/futures/_base.py in __enter__(self)
144 def __enter__(self):
145 for future in self.futures:
--> 146 future._condition.acquire()
147
148 def __exit__(self, *args):
AttributeError: 'QueryJob' object has no attribute '_condition'
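The traceback shows `as_completed` reaching for `_condition`, an internal attribute of `concurrent.futures.Future` that the job object does not have. A possible workaround, sketched here under the assumption that each job exposes a blocking `result()` method, is to submit those `result()` calls to a `ThreadPoolExecutor` so that `as_completed` receives genuine `concurrent.futures` futures. `FakeJob` is a hypothetical stand-in for a query job.

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed


class FakeJob:
    """Hypothetical stand-in for a query job: result() blocks, then returns rows."""

    def __init__(self, job_id, delay):
        self.job_id = job_id
        self._delay = delay

    def result(self):
        time.sleep(self._delay)
        return [self.job_id]


jobs = [FakeJob("slow", 0.3), FakeJob("fast", 0.05)]
finished_order = []

with ThreadPoolExecutor(max_workers=len(jobs)) as pool:
    # Map each real Future back to the job whose result() it is waiting on.
    future_to_job = {pool.submit(job.result): job for job in jobs}
    for future in as_completed(future_to_job):
        rows = future.result()  # already complete, so this does not block
        finished_order.append(future_to_job[future].job_id)
```

The cost is one worker thread per in-flight job, which is the trade-off discussed elsewhere in this thread.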
from python-bigquery.
@tseaver
For now, I have decided to do the following:
from time import sleep

query1 = """
SELECT
  language.name,
  AVG(language.bytes)
FROM `bigquery-public-data.github_repos.languages`
, UNNEST(language) AS language
GROUP BY language.name"""
query2 = 'SELECT 2'

def dummy_callback(future):
    # Mark the finished job as done; mutating the dict needs no `global`.
    jobs_done[future.job_id] = True

jobs = [bq.query(query1), bq.query(query2)]
jobs_done = {job.job_id: False for job in jobs}
for job in jobs:
    job.add_done_callback(dummy_callback)

# blocking loop to wait for jobs to finish
while not all(jobs_done.values()):
    print('waiting for jobs to finish ... sleeping for 1s')
    sleep(1)
print('all jobs done, do your stuff')
Rather than using as_completed, I prefer to use the built-in async functionality of the BigQuery jobs themselves. This also makes it possible for me to decompose the data pipeline into separate Cloud Functions, without having to keep the main ThreadPoolExecutor alive for the duration of the whole pipeline. Incidentally, this was the reason I was looking into this: my pipelines run longer than the max timeout of 9 minutes for Cloud Functions (or even 15 minutes for Cloud Run).
The downside is that I need to keep track of all the job_ids across the various functions, but that is relatively easy to solve when configuring the pipeline by specifying inputs and outputs such that they form a directed acyclic graph.
Any suggestions are welcome. It would be nice if, at some point in the future, Google's API future played nicely with Python's future (no pun intended).
from python-bigquery.
@tseaver
Nice, thanks. Never thought of using a set to do a kind of countdown ... 😄
from python-bigquery.
@tseaver Thanks for the simplified example. Could you fix two typos?
awaiting_jobs.discard(future.job_id)
job = bq.query(query)
from python-bigquery.
@northtree Thanks for catching those: updated.
from python-bigquery.
How do you get the rows from your different queries? Won't this block on job.result()? What am I missing?
from python-bigquery.
@sambcom
apologies for the late response; forgot to subscribe to this issue.
In my use case, the results of the query are explicitly written to new tables, as part of an ELT pipeline. So that's no issue.
I understand that generically speaking, you could write query results to (temporary) tables in BigQuery, so this should not be a blocking issue.
from python-bigquery.
We too have hundreds of load and dedupe queries running async. We do use futures, but only to submit the jobs at once and get the IDs.
From there, we regularly poll Stackdriver logging (using Airflow) for the status of all jobs in one API call.
Waiting on a blocking result() is inefficient if you only want to know when jobs are done.
I wish get_job() could support multiple job IDs, but the API doesn't, and an enhancement request has stalled.
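For comparison, a polling loop that only checks status and never blocks on `result()` might look like the sketch below. It assumes each job object offers a non-blocking `done()` check; `FakeJob` and `wait_for_jobs` are hypothetical names for illustration, and with a real client each check would presumably cost one API call per job rather than one call for all jobs.

```python
import time


class FakeJob:
    """Hypothetical stand-in for a submitted job with a done() status check."""

    def __init__(self, job_id, finish_at):
        self.job_id = job_id
        self._finish_at = finish_at

    def done(self):
        return time.monotonic() >= self._finish_at


def wait_for_jobs(jobs, poll_interval=0.05, timeout=5.0):
    """Poll until every job reports done; return job IDs in completion order."""
    pending = {job.job_id: job for job in jobs}
    completed = []
    deadline = time.monotonic() + timeout
    while pending:
        for job_id in [jid for jid, job in pending.items() if job.done()]:
            del pending[job_id]
            completed.append(job_id)
        if not pending:
            break
        if time.monotonic() > deadline:
            raise TimeoutError(f"jobs still running: {sorted(pending)}")
        time.sleep(poll_interval)
    return completed


now = time.monotonic()
completed = wait_for_jobs([FakeJob("load", now + 0.1), FakeJob("dedupe", now + 0.2)])
```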
from python-bigquery.
Any news here? You've already dropped support for Python 2.
from python-bigquery.
@yanhong-zhao-ef I'd love to have this feature, but it's going to take some design work to get right. In other Google Cloud libraries, asynchronous support is provided by a completely separate "AsyncClient". Since this BigQuery library is handwritten, I'd like to avoid having to maintain two copies of the same methods.
from python-bigquery.
@dkapitan Thanks for the follow-up. I agree that it would be ideal if jobs could be awaited.
You could simplify your example code using a set, e.g.:
from time import sleep

query1 = """
SELECT
  language.name,
  AVG(language.bytes)
FROM `bigquery-public-data.github_repos.languages`
, UNNEST(language) AS language
GROUP BY language.name"""
query2 = 'SELECT 2'
queries = [query1, query2]
awaiting_jobs = set()

def callback(future):
    awaiting_jobs.discard(future.job_id)

for query in queries:
    job = bq.query(query)
    awaiting_jobs.add(job.job_id)
    job.add_done_callback(callback)

while awaiting_jobs:
    print('waiting for jobs to finish ... sleeping for 1s')
    sleep(1)
print('all jobs done, do your stuff')
I tried implementing this with copy_table, but I added "assert future.done()" as the first line in callback() and found that the assertion was failing. This must mean that the callback is getting executed before the job is actually done. Can you confirm this isn't the intended functionality?
from python-bigquery.
This must mean that the callback is getting executed before the job is actually done. Can you confirm this isn't the intended functionality?
@jarednash0 Definitely not intended. It'd be worth filing a separate issue for this so that we can address it, ideally with the code example that reproduces it.
from python-bigquery.
Any news on this issue? It would be a very useful feature
from python-bigquery.
This is probably not the best, but right now a simple way to get queries running synchronously is to chain the queries together in one mega(big)query haha.
My clunky example:
Q1 = f"""
CREATE OR REPLACE TABLE `{project}.{dataset}.{table}`
AS
SELECT * EXCEPT({currency_pair})
FROM `{project}.{dataset}.{table}`;
"""
Q2 = f"""
CREATE OR REPLACE TABLE `{project}.{dataset}.{table}`
AS
(SELECT *
FROM `{project}.{dataset}.{table}` tab1
JOIN (
SELECT {date_col}, {currency_pair}
FROM `{project}.currency_conversion.{base_currency}_conversion_table`
) currency
ON tab1.date=currency.{date_col}
);
"""
Chained_Query1 = Q1 + Q2
query_job = client.query(Chained_Query1)
from python-bigquery.
AFAICT the above proposal isn't actually making the query awaitable though, is it? we want to run the query asynchronously, not synchronously.
from python-bigquery.
My bad, I thought my data was getting stuffed around because these bigquery processes were asynchronous already
from python-bigquery.
Any news kiraksi? I guess that your internship has ended. Will the PR still go forward? Can you elaborate more on what we will see? You mention that your solution will not be completely asynchronous?
from python-bigquery.