Comments (18)
wow, changed it and it works now, that was fast
from prefect-gcp.
Thanks for the suggestion! Would you be interested in contributing a method?
It would be very similar to `list_blobs` and would simply involve:
- updating the name to `list_folders`
- replacing `if not blob.name.endswith("/")]` with `if blob.name.endswith("/")]`
- updating the docstring
- adding a test
https://github.com/PrefectHQ/prefect-gcp/blob/main/prefect_gcp/cloud_storage.py#L715-L745
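The change boils down to flipping one filter: in this approach, a "folder" is any blob whose name ends with "/", and everything else is a file. Below is a minimal standalone sketch of that split. It uses plain name strings instead of real `Blob` objects, and the helper `split_blobs_and_folders` is hypothetical, not part of prefect-gcp:

```python
from typing import Iterable, List, Tuple


def split_blobs_and_folders(names: Iterable[str]) -> Tuple[List[str], List[str]]:
    """Partition object names into (files, folders) by the trailing-slash rule.

    Hypothetical helper for illustration only. It mirrors the filter in
    list_blobs (not name.endswith("/")) and the proposed list_folders
    filter (name.endswith("/")).
    """
    files: List[str] = []
    folders: List[str] = []
    for name in names:
        # Names ending in "/" are treated as folder placeholder objects.
        if name.endswith("/"):
            folders.append(name)
        else:
            files.append(name)
    return files, folders


files, folders = split_blobs_and_folders(
    ["data/", "data/a.csv", "data/raw/", "data/raw/b.csv"]
)
print(files)    # ['data/a.csv', 'data/raw/b.csv']
print(folders)  # ['data/', 'data/raw/']
```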
Thank you! Please let me know if you have any questions.
I managed to adapt the `list_blobs` method. This is what it looks like:
````python
# Method of GcsBucket in prefect_gcp/cloud_storage.py; relies on that module's
# existing imports (sync_compatible, run_sync_in_worker_thread, List).
@sync_compatible
async def list_folders(self, folder: str = "") -> List[str]:
    """
    Lists all folders in the bucket.

    Args:
        folder: List all folders inside the given folder.

    Returns:
        A list of folders.

    Examples:
        Get all folders from a bucket named "my-bucket".
        ```python
        from prefect_gcp.cloud_storage import GcsBucket

        gcs_bucket = GcsBucket.load("my-bucket")
        gcs_bucket.list_folders()
        ```
    """
    client = self.gcp_credentials.get_cloud_storage_client()

    bucket_path = self._join_bucket_folder(folder)
    self.logger.info(f"Listing folders in bucket {bucket_path}.")

    blobs = await run_sync_in_worker_thread(
        client.list_blobs, self.bucket, prefix=bucket_path
    )

    # Keep only folder placeholders (names ending in "/") and return their
    # names, so the result matches the List[str] annotation.
    return [blob.name for blob in blobs if blob.name.endswith("/")]
````
However, I can't test it yet: it fails with a timeout error (code 113). The same happens with `list_blobs` and `get_directory` when using `GcsBucket`.
On the other hand, `filesystems.GCS` works as expected for downloading a directory. That seems to rule out GCP-side issues, so maybe I'm doing something wrong here.
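One caveat worth keeping in mind when testing: Cloud Storage buckets use a flat namespace by default, so an object named `data/raw/b.csv` does not by itself create `data/` or `data/raw/` objects. Names ending in "/" usually exist only when a placeholder was created explicitly (for example, by creating a folder in the Cloud Console), so the trailing-slash filter can miss "folders" that are only implied by object paths. If those should be listed too, one option, sketched here as an assumption rather than what this PR does, is to derive them from object names with `PurePosixPath`. The helper `implied_folders` is hypothetical:

```python
from pathlib import PurePosixPath
from typing import Iterable, List


def implied_folders(names: Iterable[str]) -> List[str]:
    """Return every folder implied by a collection of object names.

    Hypothetical helper for illustration. An explicit placeholder object
    (a name ending in "/") counts as a folder, and so does every parent
    path of every object. Results carry a trailing "/" to match the
    placeholder convention used by list_folders.
    """
    folders = set()
    for name in names:
        # PurePosixPath drops the trailing slash: "logs/" -> "logs".
        path = PurePosixPath(name)
        if name.endswith("/"):
            folders.add(str(path))
        for parent in path.parents:
            # The last parent is "." (the bucket root); skip it.
            if str(parent) != ".":
                folders.add(str(parent))
    return sorted(folder + "/" for folder in folders)


names = ["data/raw/b.csv", "logs/", "top.txt"]
# The trailing-slash filter alone finds only the explicit placeholder.
print([name for name in names if name.endswith("/")])  # ['logs/']
print(implied_folders(names))  # ['data/', 'data/raw/', 'logs/']
```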
I updated your comment with three backticks so it's formatted nicely.
Would you be able to create a PR? Then I can pull the branch down directly to test. Instructions here: https://github.com/PrefectHQ/prefect-gcp#contributing
done, let me know if there's any problem
Thanks for reporting this issue; it should be fixed in #120!
Okay, that PR is merged. Can you pull the latest main into your PR?
Yes, it's up to date now. I tried again with the new changes, but I get a 404 error from the GCP API. Would the stack trace help you?
Even with list_blobs?
Yes, I tried it with the name of a folder in the bucket as the argument, and also with `None`.
Can you share the code you used + traceback? Thanks!
```
In [1]: from prefect_gcp.cloud_storage import GcsBucket
In [2]: buck = GcsBucket.load("bucket-test-block")
In [3]: buck.list_blobs(None)
20:22:30.856 | INFO | prefect.GcsBucket - Listing blobs in bucket None.
---------------------------------------------------------------------------
NotFound Traceback (most recent call last)
Cell In[3], line 1
----> 1 buck.list_blobs(None)
File ~/anaconda3/envs/prefect-dev/lib/python3.10/site-packages/prefect/utilities/asyncutils.py:230, in sync_compatible.<locals>.coroutine_wrapper(*args, **kwargs)
226 return run_async_from_worker_thread(async_fn, *args, **kwargs)
227 else:
228 # In a sync context and there is no event loop; just create an event loop
229 # to run the async code then tear it down
--> 230 return run_async_in_new_loop(async_fn, *args, **kwargs)
File ~/anaconda3/envs/prefect-dev/lib/python3.10/site-packages/prefect/utilities/asyncutils.py:181, in run_async_in_new_loop(__fn, *args, **kwargs)
180 def run_async_in_new_loop(__fn: Callable[..., Awaitable[T]], *args: Any, **kwargs: Any):
--> 181 return anyio.run(partial(__fn, *args, **kwargs))
File ~/anaconda3/envs/prefect-dev/lib/python3.10/site-packages/anyio/_core/_eventloop.py:56, in run(func, backend, backend_options, *args)
54 try:
55 backend_options = backend_options or {}
---> 56 return asynclib.run(func, *args, **backend_options)
57 finally:
58 if token:
File ~/anaconda3/envs/prefect-dev/lib/python3.10/site-packages/anyio/_backends/_asyncio.py:233, in run(func, debug, use_uvloop, policy, *args)
230 del _task_states[task]
232 _maybe_set_event_loop_policy(policy, use_uvloop)
--> 233 return native_run(wrapper(), debug=debug)
File ~/anaconda3/envs/prefect-dev/lib/python3.10/asyncio/runners.py:44, in run(main, debug)
42 if debug is not None:
43 loop.set_debug(debug)
---> 44 return loop.run_until_complete(main)
45 finally:
46 try:
File ~/anaconda3/envs/prefect-dev/lib/python3.10/asyncio/base_events.py:649, in BaseEventLoop.run_until_complete(self, future)
646 if not future.done():
647 raise RuntimeError('Event loop stopped before Future completed.')
--> 649 return future.result()
File ~/anaconda3/envs/prefect-dev/lib/python3.10/site-packages/anyio/_backends/_asyncio.py:228, in run.<locals>.wrapper()
225 task.set_name(task_state.name)
227 try:
--> 228 return await func(*args)
229 finally:
230 del _task_states[task]
File ~/projects/prefect-gcp/prefect_gcp/cloud_storage.py:755, in GcsBucket.list_blobs(self, folder)
750 blobs = await run_sync_in_worker_thread(
751 client.list_blobs, self.bucket, prefix=bucket_path
752 )
754 # Ignore folders
--> 755 return [blob for blob in blobs if not blob.name.endswith("/")]
File ~/projects/prefect-gcp/prefect_gcp/cloud_storage.py:755, in <listcomp>(.0)
750 blobs = await run_sync_in_worker_thread(
751 client.list_blobs, self.bucket, prefix=bucket_path
752 )
754 # Ignore folders
--> 755 return [blob for blob in blobs if not blob.name.endswith("/")]
File ~/anaconda3/envs/prefect-dev/lib/python3.10/site-packages/google/api_core/page_iterator.py:208, in Iterator._items_iter(self)
206 def _items_iter(self):
207 """Iterator for each item returned."""
--> 208 for page in self._page_iter(increment=False):
209 for item in page:
210 self.num_results += 1
File ~/anaconda3/envs/prefect-dev/lib/python3.10/site-packages/google/api_core/page_iterator.py:244, in Iterator._page_iter(self, increment)
232 def _page_iter(self, increment):
233 """Generator of pages of API responses.
234
235 Args:
(...)
242 Page: each page of items from the API.
243 """
--> 244 page = self._next_page()
245 while page is not None:
246 self.page_number += 1
File ~/anaconda3/envs/prefect-dev/lib/python3.10/site-packages/google/api_core/page_iterator.py:373, in HTTPIterator._next_page(self)
366 """Get the next page in the iterator.
367
368 Returns:
369 Optional[Page]: The next page in the iterator or :data:`None` if
370 there are no pages left.
371 """
372 if self._has_next_page():
--> 373 response = self._get_next_page_response()
374 items = response.get(self._items_key, ())
375 page = Page(self, items, self.item_to_value, raw_page=response)
File ~/anaconda3/envs/prefect-dev/lib/python3.10/site-packages/google/api_core/page_iterator.py:432, in HTTPIterator._get_next_page_response(self)
430 params = self._get_query_params()
431 if self._HTTP_METHOD == "GET":
--> 432 return self.api_request(
433 method=self._HTTP_METHOD, path=self.path, query_params=params
434 )
435 elif self._HTTP_METHOD == "POST":
436 return self.api_request(
437 method=self._HTTP_METHOD, path=self.path, data=params
438 )
File ~/anaconda3/envs/prefect-dev/lib/python3.10/site-packages/google/cloud/storage/_http.py:72, in Connection.api_request(self, *args, **kwargs)
70 if retry:
71 call = retry(call)
---> 72 return call()
File ~/anaconda3/envs/prefect-dev/lib/python3.10/site-packages/google/api_core/retry.py:349, in Retry.__call__.<locals>.retry_wrapped_func(*args, **kwargs)
345 target = functools.partial(func, *args, **kwargs)
346 sleep_generator = exponential_sleep_generator(
347 self._initial, self._maximum, multiplier=self._multiplier
348 )
--> 349 return retry_target(
350 target,
351 self._predicate,
352 sleep_generator,
353 self._timeout,
354 on_error=on_error,
355 )
File ~/anaconda3/envs/prefect-dev/lib/python3.10/site-packages/google/api_core/retry.py:191, in retry_target(target, predicate, sleep_generator, timeout, on_error, **kwargs)
189 for sleep in sleep_generator:
190 try:
--> 191 return target()
193 # pylint: disable=broad-except
194 # This function explicitly must deal with broad exceptions.
195 except Exception as exc:
File ~/anaconda3/envs/prefect-dev/lib/python3.10/site-packages/google/cloud/_http/__init__.py:494, in JSONConnection.api_request(self, method, path, query_params, data, content_type, headers, api_base_url, api_version, expect_json, _target_object, timeout, extra_api_info)
482 response = self._make_request(
483 method=method,
484 url=url,
(...)
490 extra_api_info=extra_api_info,
491 )
493 if not 200 <= response.status_code < 300:
--> 494 raise exceptions.from_http_response(response)
496 if expect_json and response.content:
497 return response.json()
NotFound: 404 GET https://storage.googleapis.com/storage/v1/b/gs://testing_bucket_321/o?projection=noAcl&prefix=None&prettyPrint=false: Not Found
```
I think you specified the `bucket_name` with `gs://`, which you don't need in `prefect_gcp.GcsBucket` (but you do need in `prefect.GCS`, which is understandably confusing). I'll try to update the docs to mention excluding `gs://`.
Can you try dropping the `gs://` prefix from the `bucket_name` and try again?
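The 404 follows from the bucket name being placed verbatim in the request path: the traceback shows `.../storage/v1/b/gs://testing_bucket_321/o`, whereas the JSON API expects the bare bucket name after `/b/`. Below is a minimal sketch of normalizing a user-supplied name before saving the block. Both `normalize_bucket_name` and `list_objects_url` are hypothetical helpers, not prefect-gcp functions; the second only illustrates where the name ends up, since the google-cloud-storage client builds the URL itself:

```python
def normalize_bucket_name(bucket: str) -> str:
    """Strip a leading "gs://" and any trailing "/" from a bucket name.

    Hypothetical helper: GcsBucket expects the bare bucket name
    (e.g. "testing_bucket_321"), not a gs:// URL.
    """
    scheme = "gs://"
    if bucket.startswith(scheme):
        bucket = bucket[len(scheme):]
    return bucket.rstrip("/")


def list_objects_url(bucket: str) -> str:
    """Illustrative only: the JSON API listing path for a bucket."""
    return f"https://storage.googleapis.com/storage/v1/b/{normalize_bucket_name(bucket)}/o"


print(normalize_bucket_name("gs://testing_bucket_321"))  # testing_bucket_321
print(list_objects_url("gs://testing_bucket_321"))
# https://storage.googleapis.com/storage/v1/b/testing_bucket_321/o
```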
Awesome!
When you're ready, feel free to make a PR directly on this repo:
https://github.com/PrefectHQ/prefect-gcp/compare
Managed to wrap everything up and create a PR: #121
Please let me know if I should change something.
Related Issues (20)
- Unable to set Vertex runner disk size (but was able to in prefect 1)
- Emphasize project steps in documentation
- Create GCS Project Push Step
- Fix Official Docs on upload_from_dataframe
- Create GCS Project Pull Step
- Fix `upload_from_dataframe` Compressed Parquet serialization to .gz.parquet & .snappy.parquet
- VPC Connector annotations are in the wrong place in CloudRunJob
- VertexAICustomTrainingJob does not have accelerator_count
- `list_blobs` and `list_folder` log incorrect bucket name
- Add support of maxRetries to CloudRunJob
- Upgrade Cloud Run to use API v2
- Persist Labels from Prefect to Vertex AI Job
- Create Prefect GCP Worker that works with Cloud Run v2
- Create GCP Vertex AI worker
- Add `kill_infrastructure` support for Vertex AI Worker
- vpc-access-connector error in Google Cloud Run Work Pool
- Default Prefect Command for Cloud Run V2 Worker not up-to-date
- Cloud Run V2 Flow Needs to Refresh Credentials after being paused for 60+ minutes
- VPC Connector Name does not work in Cloud Run Worker v2
- Cloud Run V2 Worker is creating duplicate cloud run jobs