Comments (36)

jsigee87 commented on September 26, 2024 23

Wanted to bump this thread: I am currently trying to write async BigQuery jobs myself, and it would be great if the job were awaitable.

from python-bigquery.

chalmerlowe commented on September 26, 2024 21

Gonna close this out as "Will not work", due to conflicting priorities.

adamserafini commented on September 26, 2024 14

Any update from Google on asyncio compatibility for BigQuery lib?

kiraksi commented on September 26, 2024 12

For awareness: this is an internship project, and my internship ends in 2 weeks. I plan on implementing an AsyncClient for the async_query_and_wait RPC; other RPCs have been requested to be made asynchronous, which I may not have time for. For future maintainers, I will document my progress here, but please refer to the internal design doc on this feature, which can be found at my internship documentation website, go/kirnendra-internship

RPCs to make async:

  • query_and_wait - in progress
  • query
  • get_job
  • get_query_results
  • get_table
  • list_partitions
  • get_partition

adamserafini commented on September 26, 2024 10

Amazing @kiraksi, that is awesome.

But,
@google leadership: I find it sad that a 5-year-old issue to move the library to properly support asyncio Python has taken this long and is treated as a side project for the organisation.

The message I take from it is that the Google leadership simply doesn't care about supporting Python as a first-class citizen of the Google Cloud Developer Experience.

kiraksi commented on September 26, 2024 10

New updates: there are some blocking issues. To make a completely asynchronous RowIterator object for the query and query_and_wait methods, I would need an AsyncHTTPIterator, because RowIterator is a child of HTTPIterator in the google-api-core library. I have opened an issue for this here: googleapis/python-api-core#627, and I may make this PR myself. Until then, I will be focusing on the get_ methods, which won't have this blocking issue. Additionally, there is current work on this, but I would like this method exposed: googleapis/google-auth-library-python#613
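
To illustrate what an async counterpart of a page-based iterator might look like, here is a minimal sketch. It is not the google-api-core API: `AsyncPageIterator`, `fetch_page`, and `fake_fetch_page` are hypothetical names, and the page fetch is simulated with `asyncio.sleep` instead of an HTTP request.

```python
import asyncio


class AsyncPageIterator:
    """Hypothetical async iterator that yields rows page by page."""

    def __init__(self, fetch_page):
        # fetch_page: coroutine function taking a page token and
        # returning (rows, next_token); next_token is None on the last page.
        self._fetch_page = fetch_page

    async def __aiter__(self):
        # Written as an async generator, so `async for` can consume it directly.
        token = None
        while True:
            rows, token = await self._fetch_page(token)
            for row in rows:
                yield row
            if token is None:
                break


async def fake_fetch_page(token):
    """Stand-in for a network call: serves three small pages."""
    pages = {None: ([1, 2], "p2"), "p2": ([3, 4], "p3"), "p3": ([5], None)}
    await asyncio.sleep(0)  # yield control, as a real network await would
    return pages[token]


async def collect_rows():
    return [row async for row in AsyncPageIterator(fake_fetch_page)]


rows = asyncio.run(collect_rows())
print(rows)
```

The point of the sketch is that each page fetch is awaited, so other coroutines can run while a page is in flight; a synchronous iterator would block the event loop at that step.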

tseaver commented on September 26, 2024 9

@dkapitan Thanks for the follow-up. I agree that it would be ideal if jobs could be awaited.

You could simplify your example code using a set, e.g.:

from time import sleep

from google.cloud import bigquery

bq = bigquery.Client()  # assumed: `bq` is a BigQuery client

query1 = """
SELECT
  language.name,
  AVG(language.bytes)
FROM `bigquery-public-data.github_repos.languages`
, UNNEST(language) AS language
GROUP BY language.name"""

query2 = 'SELECT 2'

queries = [query1, query2]

# IDs of jobs that have not finished yet
awaiting_jobs = set()

def callback(future):
    # Remove the finished job from the pending set.
    awaiting_jobs.discard(future.job_id)

for query in queries:
    job = bq.query(query)
    awaiting_jobs.add(job.job_id)
    job.add_done_callback(callback)

while awaiting_jobs:
    print('waiting for jobs to finish ... sleeping for 1s')
    sleep(1)

print('all jobs done, do your stuff')

eboddington commented on September 26, 2024 9

Would definitely appreciate this as well

dkapitan commented on September 26, 2024 9

+1 for reopening!

chalmerlowe commented on September 26, 2024 9

At this moment, this task is on hold. We will revisit it when we have manpower. I am not gonna close it so that it remains on the radar.

tommyhutcheson commented on September 26, 2024 4

Hey @tswast @chalmerlowe
Is there an update on this issue and @kiraksi's PR #1853? The Python / BigQuery communities would love this feature to be available.

zbarnett commented on September 26, 2024 3

I just ran into this problem myself and found out that even though the job itself is "asynchronous" there are actually two places where synchronous I/O is happening.

From the example usage on https://googleapis.dev/python/bigquery/latest/index.html

from google.cloud import bigquery

client = bigquery.Client()

# Perform a query.
QUERY = (
    'SELECT name FROM `bigquery-public-data.usa_names.usa_1910_2013` '
    'WHERE state = "TX" '
    'LIMIT 100')
query_job = client.query(QUERY)  # API request
rows = query_job.result()  # Waits for query to finish

for row in rows:
    print(row.name)

client.query(QUERY) makes a synchronous network request, as does query_job.result().

Usually, blocking on result() takes the majority of the time, but I've seen cases where query() can take over a second to complete by itself.

As a workaround for now, I'm running all the BigQuery data fetching code in a separate thread to free up the event loop but it would be fantastic if the BigQuery API supported async I/O.

Example workaround (code fragment):

import asyncio

def get_data(client, query):
    # Blocking: submits the query and waits for its rows.
    return client.query(query).result()

# Inside a coroutine:
loop = asyncio.get_running_loop()
data = await loop.run_in_executor(None, get_data, client, query)
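
The same thread-offloading idea can be sketched as a self-contained script that runs several blocking calls concurrently. `blocking_query` is a hypothetical stand-in for `client.query(sql).result()`: it sleeps instead of calling BigQuery, so the example runs without credentials.

```python
import asyncio
import time


def blocking_query(sql):
    """Stand-in for client.query(sql).result(); sleeps instead of calling BigQuery."""
    time.sleep(0.2)
    return f"rows for: {sql}"


async def run_queries(queries):
    loop = asyncio.get_running_loop()
    # Each blocking call runs in the default thread pool, so the event loop stays free.
    tasks = [loop.run_in_executor(None, blocking_query, sql) for sql in queries]
    # gather() returns results in the order the tasks were passed in.
    return await asyncio.gather(*tasks)


results = asyncio.run(run_queries(["SELECT 1", "SELECT 2", "SELECT 3"]))
print(results)
```

This keeps the event loop responsive, but each in-flight query still occupies a thread, which is the cost a native async client would avoid.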

yanhong-zhao-ef commented on September 26, 2024 2

still waiting here... do you have an approximate timeline for this?

chalmerlowe commented on September 26, 2024 2

@adamserafini Thank you for your input.
We will take it into consideration.

pingyeh commented on September 26, 2024 2

Before the async libraries are ready, it seems the done callback is the best way to run multiple queries in parallel unless the developer is willing to do multi-threading.

Can you please improve the documentation on add_done_callback? It is unclear what the argument to the callback is, and what the callback can do with it. It's a Future class, but which Future class is it? Is it asyncio.Future or google.api_core.future.base.Future or something else?

tswast commented on September 26, 2024 1

We've identified January 1, 2020 as the EOL for Python 2 support in new versions of the BigQuery client library (googleapis/google-cloud-python#9036). We can revisit this request after that date.

tswast commented on September 26, 2024 1

I talked with some folks on Firestore who've implemented this. They suggest a trio of classes: async_batch, base_batch, batch https://github.com/googleapis/python-firestore/tree/master/google/cloud/firestore_v1 in order to avoid too much duplicated effort.
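
A rough sketch of how such a trio might avoid duplication: the base class holds everything that does no I/O, and the sync and async classes differ only in how they call the transport. All names here (`BaseClient`, `Client`, `AsyncClient`, and the fake transports) are hypothetical and are not taken from python-firestore or python-bigquery.

```python
import asyncio


class BaseClient:
    """Shared, I/O-free logic: building requests and parsing responses."""

    def __init__(self, transport):
        self._transport = transport

    def _build_request(self, sql):
        return {"query": sql}

    def _parse_response(self, response):
        return response["rows"]


class Client(BaseClient):
    """Synchronous variant: the transport is a plain function."""

    def query(self, sql):
        response = self._transport(self._build_request(sql))
        return self._parse_response(response)


class AsyncClient(BaseClient):
    """Asynchronous variant: the transport is a coroutine function."""

    async def query(self, sql):
        response = await self._transport(self._build_request(sql))
        return self._parse_response(response)


def fake_transport(request):
    # Stand-in for a blocking HTTP call.
    return {"rows": [request["query"]]}


async def fake_async_transport(request):
    # Stand-in for an awaited HTTP call.
    await asyncio.sleep(0)
    return fake_transport(request)


sync_rows = Client(fake_transport).query("SELECT 1")
async_rows = asyncio.run(AsyncClient(fake_async_transport).query("SELECT 1"))
print(sync_rows, async_rows)
```

Only the thin `query` wrappers are written twice; request building and response parsing live in one place.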

tseaver commented on September 26, 2024

@dkapitan Re: the async await / async def keywords: our codebase straddles both Python2 and Python3 until at least the end of this year, and thus cannot adopt the syntax directly in the codebase until we drop Python2 support.

We might be able to tweak the google.api_core.operation.Operation class to support that usage at a low level. Can you please post the traceback for the AttributeError?

Also, for your example above, your callback isn't doing anything with the result: it is called from a helper thread, and it would need to do something to signal the main thread.
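
One way a callback running in a helper thread could signal the main thread is a `threading.Event`. The sketch below uses `concurrent.futures` futures and a hypothetical `fake_job` function as stand-ins for BigQuery jobs, so it only illustrates the signalling pattern, not the BigQuery API.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


def fake_job(seconds):
    """Stand-in for a BigQuery job; sleeps instead of running a query."""
    time.sleep(seconds)
    return seconds


durations = [0.1, 0.2, 0.05]
all_done = threading.Event()
lock = threading.Lock()
finished = []


def on_done(future):
    # Runs in a worker thread once the future completes.
    with lock:
        finished.append(future.result())
        if len(finished) == len(durations):
            all_done.set()  # wake the main thread


with ThreadPoolExecutor() as pool:
    for seconds in durations:
        pool.submit(fake_job, seconds).add_done_callback(on_done)
    # Blocks until the last callback fires; no sleep/poll loop needed.
    signalled = all_done.wait(timeout=5)

print(signalled, sorted(finished))
```

The lock protects the shared list because callbacks can run in different worker threads at the same time.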

dkapitan commented on September 26, 2024

@tseaver thanks for clarifying. I have read up more on concurrent.futures and asyncio. I understand the latter is indeed quite new and Python 3 only. Will investigate the sample code a bit more, and get to you with the results and/or the traceback.

dkapitan commented on September 26, 2024

@tseaver here's traceback from the AttributeError

---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
<ipython-input-19-e9025f68a561> in <module>
      9 [operation.add_done_callback(my_callback) for operation in operations]
     10 results2 = []
---> 11 for future in as_completed(operations):
     12   results2.append(list(future.result()))

/usr/local/Cellar/python/3.7.4/Frameworks/Python.framework/Versions/3.7/lib/python3.7/concurrent/futures/_base.py in as_completed(fs, timeout)
    217     fs = set(fs)
    218     total_futures = len(fs)
--> 219     with _AcquireFutures(fs):
    220         finished = set(
    221                 f for f in fs

/usr/local/Cellar/python/3.7.4/Frameworks/Python.framework/Versions/3.7/lib/python3.7/concurrent/futures/_base.py in __enter__(self)
    144     def __enter__(self):
    145         for future in self.futures:
--> 146             future._condition.acquire()
    147 
    148     def __exit__(self, *args):

AttributeError: 'QueryJob' object has no attribute '_condition'
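
The traceback shows `concurrent.futures.as_completed` reaching for a private `_condition` attribute that the job object does not have. One possible workaround, sketched here under assumptions, is to submit each job's blocking `result()` call to a `ThreadPoolExecutor`, so that `as_completed` receives genuine `concurrent.futures.Future` objects. `FakeJob` is a hypothetical stand-in for a query job.

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed


class FakeJob:
    """Hypothetical stand-in for a query job with a blocking result()."""

    def __init__(self, job_id, seconds):
        self.job_id = job_id
        self._seconds = seconds

    def result(self):
        time.sleep(self._seconds)
        return [f"row from {self.job_id}"]


jobs = [FakeJob("slow", 0.3), FakeJob("fast", 0.05)]
completed = {}

with ThreadPoolExecutor(max_workers=len(jobs)) as pool:
    # Map each executor future back to the job it is waiting on.
    future_to_job = {pool.submit(job.result): job for job in jobs}
    for future in as_completed(future_to_job):
        job = future_to_job[future]
        completed[job.job_id] = list(future.result())

print(completed)
```

The trade-off is one worker thread per job that is being waited on.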

dkapitan commented on September 26, 2024

@tseaver
For now, I have decided to do the following:

from time import sleep

from google.cloud import bigquery

bq = bigquery.Client()  # assumed: `bq` is a BigQuery client


query1 = """
SELECT
  language.name,
  AVG(language.bytes)
FROM `bigquery-public-data.github_repos.languages`
, UNNEST(language) AS language
GROUP BY language.name"""
query2 = 'SELECT 2'


def dummy_callback(future):
    # Mark the finished job as done.
    jobs_done[future.job_id] = True


jobs = [bq.query(query1), bq.query(query2)]
jobs_done = {job.job_id: False for job in jobs}
for job in jobs:
    job.add_done_callback(dummy_callback)

# blocking loop to wait for jobs to finish
while not all(jobs_done.values()):
    print('waiting for jobs to finish ... sleeping for 1s')
    sleep(1)

print('all jobs done, do your stuff')

Rather than using as_completed, I prefer to use the built-in async functionality of the BigQuery jobs themselves. This also makes it possible for me to decompose the data pipeline into separate Cloud Functions, without having to keep the main ThreadPoolExecutor alive for the duration of the whole pipeline. Incidentally, this was the reason why I was looking into this: my pipelines are longer than the max timeout of 9 minutes for Cloud Functions (or even 15 minutes for Cloud Run).

Downside is I need to keep track of all the job_ids across the various functions, but that is relatively easy to solve when configuring the pipeline by specifying inputs and outputs such that they form a directed acyclic graph.

Any suggestions are welcome. Would be nice if at some point in the future, Google's api future plays nicely with Python's future (no pun intended).

dkapitan commented on September 26, 2024

@tseaver
Nice, thanks. Never thought of using a set to do a kind of countdown ... 😄

northtree commented on September 26, 2024

@tseaver Thanks for the simplified example. Could you fix two typos?

awaiting_jobs.discard(future.job_id)

job = bq.query(query)

tseaver commented on September 26, 2024

@northtree Thanks for catching those: updated.

sambcom commented on September 26, 2024

How do you get the rows from your different queries? Won't this block on job.result()? What am I missing?

dkapitan commented on September 26, 2024

@sambcom
apologies for the late response; forgot to subscribe to this issue.
In my use case, the results of the query are explicitly written to new tables, as part of an ELT pipeline. So that's no issue.

I understand that generically speaking, you could write query results to (temporary) tables in BigQuery, so this should not be a blocking issue.

yan-hic commented on September 26, 2024

We too have hundreds of load and dedupe queries running async. We do use futures, but only to submit the jobs at once and get the IDs.

From there, we regularly poke Stackdriver Logging (using Airflow) for the status of all jobs in one API call.
Waiting on the blocking result() is inefficient if you only want to know when jobs are done.

I wish get_job() could support multiple job IDs, but the API doesn't, and an enhancement request has stalled.

vgrabovets commented on September 26, 2024

Any news here? You've already dropped support for Python 2.

tswast commented on September 26, 2024

@yanhong-zhao-ef I'd love to have this feature, but it's going to take some design work to get right. In other Google Cloud libraries, asynchronous support is provided by a completely separate "AsyncClient". Since this BigQuery library is handwritten, I'd like to avoid having to maintain two copies of the same methods.

jarednash0 commented on September 26, 2024

@dkapitan Thanks for the follow-up. I agree that it would be ideal if jobs could be awaited.

You could simplify your example code using a set, e.g.:

from time import sleep

from google.cloud import bigquery

bq = bigquery.Client()  # assumed: `bq` is a BigQuery client

query1 = """
SELECT
  language.name,
  AVG(language.bytes)
FROM `bigquery-public-data.github_repos.languages`
, UNNEST(language) AS language
GROUP BY language.name"""

query2 = 'SELECT 2'

queries = [query1, query2]

# IDs of jobs that have not finished yet
awaiting_jobs = set()

def callback(future):
    # Remove the finished job from the pending set.
    awaiting_jobs.discard(future.job_id)

for query in queries:
    job = bq.query(query)
    awaiting_jobs.add(job.job_id)
    job.add_done_callback(callback)

while awaiting_jobs:
    print('waiting for jobs to finish ... sleeping for 1s')
    sleep(1)

print('all jobs done, do your stuff')

I tried implementing this with copy_table, but I added "assert future.done()" as the first line in callback() and found that the assertion was failing. This must mean that the callback is getting executed before the job is actually done. Can you confirm this isn't the intended functionality?

tswast commented on September 26, 2024

This must mean that the callback is getting executed before the job is actually done. Can you confirm this isn't the intended functionality?

@jarednash0 Definitely not intended. It'd be worth filing a separate issue for this so that we can address it, ideally with the code example that reproduces it.

theophile-dh commented on September 26, 2024

Any news on this issue? It would be a very useful feature

jasonwillschiu commented on September 26, 2024

This is probably not the best, but right now a simple way to get queries running synchronously is to chain the queries together in one mega(big)query haha.

My clunky example:

Q1 = f"""
  CREATE OR REPLACE TABLE `{project}.{dataset}.{table}`
  AS
  SELECT * EXCEPT({currency_pair}) 
  FROM `{project}.{dataset}.{table}`;
  """ 

Q2 = f"""
  CREATE OR REPLACE TABLE `{project}.{dataset}.{table}`
  AS
  (SELECT *
  FROM `{project}.{dataset}.{table}` tab1
  JOIN (
  SELECT {date_col}, {currency_pair}
  FROM `{project}.currency_conversion.{base_currency}_conversion_table` 
  ) currency
  ON tab1.date=currency.{date_col}
  );
  """

Chained_Query1 = Q1 + Q2
query_job = client.query(Chained_Query1)

adamserafini commented on September 26, 2024

AFAICT the above proposal isn't actually making the query awaitable though, is it? We want to run the query asynchronously, not synchronously.

jasonwillschiu commented on September 26, 2024

My bad, I thought my data was getting stuffed around because these BigQuery processes were asynchronous already.

Tunneller commented on September 26, 2024

Any news, kiraksi? I guess that your internship has ended. Will the PR still go forward? Can you elaborate more on what we will see? You mention that your solution will not be completely asynchronous?
