Comments (3)
I guess this issue was created based on a question I asked on Slack. My concern was that `s3_upload` and the underlying S3 block functions work on a bytes object instead of a file-like object, so the whole file needs to be in memory (or memory-mapped), which is either memory-intensive or hackish depending on which you do. Either a file-like object or a file path would be fine, but the solution suggested above (and by Nate on Slack) was to take a file path and use multipart upload. This works fine for my use case, and I wrote an implementation around it. It doesn't cover your entire checklist and is not completely generalized, but maybe it could be the basis for an implementation, or at least spark some ideas. It's important to note that `upload_file` goes off in another thread, so you need to use the callback function to report when it has finished if you want to sync things up. All that being said, the code below is provided as-is, since I have only tested it for my use case.
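```python
import threading
from pathlib import Path
from typing import Optional

from boto3.s3.transfer import TransferConfig
from prefect.filesystems import S3
from prefect_aws import AwsCredentials


class S3Future:
    """Progress callback for upload_file that can also be waited on."""

    def __init__(self, filename: Path, final_key: str):
        self.key = final_key
        self._filename = filename
        self._size = filename.stat().st_size
        self._seen_so_far = 0
        # A single Condition guards the byte counter and wakes any waiters.
        self._condition = threading.Condition()

    def __call__(self, bytes_amount: int) -> None:
        # Called with the number of bytes transferred since the last call.
        with self._condition:
            self._seen_so_far += bytes_amount
            if self._seen_so_far >= self._size:
                self._condition.notify_all()

    def done(self) -> bool:
        with self._condition:
            return self._seen_so_far >= self._size

    def wait(self, timeout: Optional[float] = None) -> bool:
        # wait_for re-checks the predicate, so this returns immediately
        # if the upload finished before wait() was called.
        with self._condition:
            return self._condition.wait_for(
                lambda: self._seen_so_far >= self._size, timeout
            )


def upload_s3(fpath: Path, key: str) -> S3Future:
    aws_credentials = AwsCredentials.load("dataflowops")
    s3_client = aws_credentials.get_boto3_session().client("s3")
    fs: S3 = S3.load("dataflowops")
    # bucket_path is expected to look like "<bucket>/<key prefix>".
    bucket, keypath = fs.bucket_path.split("/", 1)
    final_key = "/".join((keypath.rstrip("/"), key.lstrip("/")))
    future = S3Future(fpath, final_key)
    # Values kept from the original post. Note that 1024 * 25 is 25 KiB,
    # while S3 requires multipart parts of at least 5 MiB, so 25 MiB
    # (1024 * 1024 * 25) may have been intended.
    config = TransferConfig(
        multipart_threshold=1024 * 25,
        max_concurrency=10,
        multipart_chunksize=1024 * 25,
        use_threads=True,
    )
    s3_client.upload_file(
        str(fpath),
        bucket,
        final_key,
        Config=config,
        Callback=future,
    )
    return future
```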
(edit: changed to use `threading.Condition()` instead of polling `done()`)
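For anyone curious about the condition-variable approach mentioned in the edit note, here is a minimal, standard-library-only sketch of the pattern. `ProgressLatch` and `fake_upload` are hypothetical names made up for illustration; `fake_upload` only simulates a transfer that reports progress from another thread and does not touch boto3 or S3.

```python
import threading
from typing import Callable, Optional


class ProgressLatch:
    """Counts reported bytes and wakes waiters once `total` is reached."""

    def __init__(self, total: int):
        self._total = total
        self._seen = 0
        self._cond = threading.Condition()

    def __call__(self, n: int) -> None:
        # Progress callback: add the newly transferred bytes under the lock.
        with self._cond:
            self._seen += n
            if self._seen >= self._total:
                self._cond.notify_all()

    def wait(self, timeout: Optional[float] = None) -> bool:
        # wait_for checks the predicate first, so a transfer that already
        # finished does not leave the caller blocked forever.
        with self._cond:
            return self._cond.wait_for(lambda: self._seen >= self._total, timeout)


def fake_upload(callback: Callable[[int], None], total: int, chunk: int) -> None:
    """Stand-in for a real transfer: reports progress in chunks."""
    sent = 0
    while sent < total:
        step = min(chunk, total - sent)
        callback(step)
        sent += step


latch = ProgressLatch(total=100)
worker = threading.Thread(target=fake_upload, args=(latch, 100, 30))
worker.start()
finished = latch.wait(timeout=5)  # blocks without polling
worker.join()
print(finished)
```

Compared with looping on `done()` and sleeping, the waiter here is woken as soon as the last chunk is reported, and the optional timeout keeps it from hanging if the callback never reaches the expected total.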
from prefect-aws.
Thanks for correcting me. I created a separate issue for expanding `s3_upload` for file-like objects here.
from prefect-aws.
S3Bucket block now implements this.
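As a rough sketch of what that could look like from the caller's side: the helper below is hypothetical (`upload_without_buffering` is not part of prefect-aws) and assumes the installed prefect-aws version exposes `upload_from_path(from_path, to_path)` and `upload_from_file_object(from_file_object, to_path)` on `S3Bucket`. Please check the documentation for your version before relying on it.

```python
from pathlib import Path
from typing import BinaryIO, Union


def upload_without_buffering(bucket, source: Union[str, Path, BinaryIO], key: str):
    """Upload a path or an open binary file through an S3Bucket-like block.

    `bucket` is duck-typed: anything with the two upload methods assumed
    above will work, which also makes the helper easy to test with a stub.
    """
    if isinstance(source, (str, Path)):
        # File on disk: let the block read it from the path.
        return bucket.upload_from_path(from_path=source, to_path=key)
    # Open file-like object: hand it over without reading it into bytes first.
    return bucket.upload_from_file_object(from_file_object=source, to_path=key)


# Usage sketch (block name is a placeholder, not from this thread):
# from prefect_aws import S3Bucket
# bucket = S3Bucket.load("my-bucket-block")
# upload_without_buffering(bucket, Path("big_file.bin"), "data/big_file.bin")
```

Either branch avoids building one large bytes object in memory, which was the original concern in this thread.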
from prefect-aws.