Comments (23)
Another workaround would be to use `sh` and do something like:

```python
import sh
sh.gsutil(*"-m cp data gs://mybucket/data".split())
```
from python-storage.
@hector-sab Shelling out to `gsutil` has significant limitations:

- Uploads don't create the empty "placeholder" directory-like entries that are needed to make `gcsfuse` run at a reasonable speed. It's a 10x speed factor.
- Parallel downloads will put all the files into the same directory, except when using `-r` to recursively download a directory. So I can't give it a list of files from multiple directories to put into multiple directories.
- Error handling and recovery: can one reliably parse its stdout to tell which transfers succeeded?
Can we reopen this as a feature request? It would be useful to have helpers for recursive copies similar to gsutil.
@tswast, @frankyn Perhaps we could resolve this by adding a more robust sample to the docs, based on the one @dhermes outlined above. E.g.:

```python
"""Sample: show parallel uploads to a bucket."""
from concurrent import futures
import os
import sys

from google.api_core import exceptions
from google.cloud.storage import Client


def ensure_bucket(client, bucket_name):
    try:
        return client.get_bucket(bucket_name)
    except exceptions.NotFound:
        return client.create_bucket(bucket_name)


def get_files(path):
    for directory, _, filenames in os.walk(path):
        for filename in filenames:
            yield os.path.join(directory, filename)


def upload_serial(bucket, filename):
    blob = bucket.blob(filename)
    blob.upload_from_filename(filename)


def main(argv):
    bucket_name, dirnames = argv[0], argv[1:]
    client = Client()
    bucket = ensure_bucket(client, bucket_name)
    pool = futures.ThreadPoolExecutor(10)
    uploads = []
    for dirname in dirnames:
        for filename in get_files(dirname):
            upload = pool.submit(upload_serial, bucket, filename)
            uploads.append(upload)
    futures.wait(uploads)


if __name__ == '__main__':
    main(sys.argv[1:])
```
I think that the following also guarantees that all tasks complete:

```python
with ThreadPoolExecutor(max_workers=5) as executor:
    results = executor.map(upload, files)
    for res in results:
        pass
```
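To make that concrete, here is a self-contained sketch of the pattern, in which `upload` is a hypothetical stand-in for the real per-blob upload. Consuming the iterator returned by `executor.map` inside the `with` block both waits for every task and re-raises any exception a worker hit, so failures can't pass silently:

```python
from concurrent.futures import ThreadPoolExecutor

def upload(filename):
    # Hypothetical stand-in for blob.upload_from_filename(filename).
    return filename

files = ["a.txt", "b.txt", "c.txt"]
with ThreadPoolExecutor(max_workers=5) as executor:
    # map() returns a lazy iterator; list() consumes it, which blocks
    # until every task finishes and re-raises any worker exception.
    # Unlike as_completed(), map() yields results in submission order.
    results = list(executor.map(upload, files))
```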
Using a workaround similar to the above, we're getting a bunch of these errors:

```
urllib3.connectionpool(WARNING): Connection pool is full, discarding connection: storage.googleapis.com
```

even with fairly modest concurrency:

```python
pool = futures.ThreadPoolExecutor(max_workers=5)
```

(With the default, which is 5x the number of cores, we were getting even more of those errors.)
Have other folks seen that? Since the calls to urllib3 are made inside the library, is there any hook to increase the connection pool size or otherwise resolve this issue? Given that `gsutil` has an option for this, I would echo that it would be very convenient to have this implemented within the library itself.
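One possible hook, offered as a sketch rather than a supported API: the storage client's transport is a `requests`-based `AuthorizedSession`, so mounting an `HTTPAdapter` with a larger pool should raise the connection-pool ceiling. Note that `client._http` is a private attribute and could change between releases; the runnable part below demonstrates the mount on a plain `requests.Session`:

```python
import requests
import requests.adapters

# An adapter with a larger connection pool than the urllib3 default.
adapter = requests.adapters.HTTPAdapter(pool_connections=10, pool_maxsize=10)

# Demonstrated on a plain Session; google-cloud-storage's client._http
# wraps one, so the same mount can be applied there (at your own risk,
# since _http is private API):
session = requests.Session()
session.mount("https://", adapter)

# Untested sketch against the storage client:
# from google.cloud import storage
# client = storage.Client()
# client._http.mount("https://", adapter)
```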
> assuming that either each blob's upload uses a separate HTTP transport object, or that if they share one, it's thread-safe.

We use `requests`, so it shares a thread-safe connection pool. :)
Hi --

Code like the following (based pretty closely on the example above, only for downloading files) was resulting in missing files a good amount of the time (with a single thread it works reliably, and it fails more often as the number of threads increases):

```python
import os
import warnings
from concurrent import futures

from google.cloud import storage


def get_templates(download_folder):
    # Filter warnings about using default credentials.
    with warnings.catch_warnings():
        warnings.filterwarnings("ignore")
        storage_client = storage.Client()
    # The "folder" where the files you want to download are.
    bucket = storage_client.get_bucket('somebucket')
    blobs = bucket.list_blobs()
    pool = futures.ThreadPoolExecutor(max_workers=5)
    downloads = []
    # Iterating through for loop one by one using API call.
    for blob in blobs:
        download = pool.submit(_download_files, download_folder, bucket, blob)
        downloads.append(download)
    futures.wait(downloads)


def _download_files(download_folder, bucket, blob):
    # Have tried pre-creating these as well.
    name = blob.name
    if not name.endswith("/"):
        subdir = os.path.dirname(name)
        subdir_path = '{}/{}'.format(download_folder, subdir)
        if not os.path.exists(subdir_path):
            os.makedirs(subdir_path)
        # The name includes the sub-directory path.
        file_path = '{}/{}'.format(download_folder, name)
        blob.download_to_filename(file_path)
```

Even after following some suggestions from a helpful person on the GCP community Slack, and using `futures.as_completed()`, it still takes me about 11 seconds to copy a directory structure with 41 relatively small Jinja templates from GCS. The same thing with, e.g., `gsutil -m cp`:

```
- [41/41 files][ 24.5 KiB/ 24.5 KiB] 100% Done
Operation completed over 41 objects/24.5 KiB.
gsutil -m cp -r gs://xxx /tmp  2.55s user 1.06s system 131% cpu 2.736 total
```
Yes, in the end, the urllib3 warnings were a red herring (though I'm pretty sure I was able to get them with fewer than 10 threads). A solutions engineer on the GCP Slack gave me an example that fixes the missing-files issue.

Either way, I do think this is functionality that should be built into the module.
@bencaine1 You can do this as follows:

```python
import os
import threading


def get_files(path):
    for directory, _, filenames in os.walk(path):
        for filename in filenames:
            yield os.path.join(directory, filename)


def upload_serial(bucket, filename):
    blob = bucket.blob(filename)
    blob.upload_from_filename(filename)


def upload_parallel(bucket, path):
    threads = []
    for filename in get_files(path):
        # You'll probably want something a bit more deliberate here,
        # e.g. a thread pool.
        thread = threading.Thread(target=upload_serial, args=(bucket, filename))
        thread.start()
        threads.append(thread)
    for thread in threads:
        thread.join()
```

I am going to close this since the feature is "supported" in this way. I'm happy to continue discussing / reconsider if you'd like.
Thanks, @dhermes.

An extra note, for context: the `-m` flag in gsutil spawns multiple processes, with multiple threads per process, to perform the individual copies in parallel. The solution @dhermes mentioned above is roughly equivalent, assuming that either each blob's upload uses a separate HTTP transport object, or that if they share one, it's thread-safe. (Gsutil uses httplib2 for its transport, which isn't thread-safe, so we create an individual http object for each thread to use.)
@frankyn I'll let you prioritize this one among other GCS feature requests.
Acking
@tswast is there any update regarding this feature request? Thanks.
@ayserarmiti none yet. I have a meeting with some of the other Python client maintainers next week to determine priority / design of useful features such as this.
Hi folks,

PR googleapis/google-cloud-python#8483 attempted to resolve this feature request, but it's not within the scope of what should be worked on at this time; we're prioritizing fixing reliability bugs in our clients over feature requests. Additionally, this feature in particular requires a design review that has not yet been prioritized.

Thank you for your patience. We will revisit this feature request in the future.
Related:
https://issuetracker.google.com/issues/111828021
@wyardley I'm only starting to get the urllib3 warning when the thread pool size is above 10 workers.
Even then, I think it's only a warning and the same request will be retried: https://stackoverflow.com/a/53765527
I agree that it should be part of the library.
What fixed the missing files issue?
It seemed to have to do with using `futures.wait()`. I adapted a variation that @domZippilli kindly threw together, and that's worked relatively well, though it's not quite as simple as the examples above. Maybe he'd be willing to gist it here.
Of course! Here's the gist. Usual disclaimer that this is offered as-is, with no guarantee that it will work or even not hurt you; use at your own risk.

This was about 30 days ago so my memory is a bit foggy, but ISTR the key here is that one collects the futures (in this case the `downloads` list) and then checks them using the `as_completed` function. This ensures that the program waits on every Future returning, as it must block on `as_completed` until they're all done, and of course call `future.result()` for each. My intuition with the first implementation is that some of the futures were not completing before the program exited, causing the missing files.

Then again, I cannot tell you why that would be happening based on the pydoc for `futures.wait()`, which is basically an abstraction for what I implemented here. So there's probably something I'm missing, but I've always found this works better.
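For readers following along, the collect-then-`as_completed` pattern looks roughly like this minimal, self-contained sketch, where `download` is a hypothetical stand-in for the real per-blob download:

```python
from concurrent.futures import ThreadPoolExecutor, as_completed

def download(name):
    # Hypothetical stand-in for blob.download_to_filename(); any
    # exception raised here is stored on the Future.
    return name.upper()

with ThreadPoolExecutor(max_workers=5) as pool:
    downloads = [pool.submit(download, n) for n in ["a", "b", "c"]]
    results = []
    for future in as_completed(downloads):
        # result() blocks until this future is done and re-raises any
        # worker exception, so a failed transfer can't go unnoticed.
        results.append(future.result())
# as_completed() yields in completion order, which is nondeterministic.
```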
I had also tried some simpler suggestions for fixing the above (including, if memory serves, returning something from `_download_files()`), and they didn't seem to work either.
It should! Executor's `map` function rules. And I believe the iterator yields the result of each future, so that is a succinct way of doing the same.