prefecthq / prefect-aws
Prefect integrations with AWS.
Home Page: https://PrefectHQ.github.io/prefect-aws/
License: Apache License 2.0
Create a task that uploads a file to an S3 bucket. The task should accept required file_name, bucket, and aws_credentials arguments, an optional aws_client_parameters argument, an optional key argument, and an optional transfer_config object. It should return the key of the uploaded object.
The task should be similar to s3_upload but for files instead of file-like objects.
If you'd like to help contribute this, please let us know in the comments below, thanks! We're also happy to provide further guidance if desired.
References:
https://boto3.amazonaws.com/v1/documentation/api/latest/guide/s3-uploading-files.html
https://stackoverflow.com/questions/50105094/python-upload-large-files-s3-fast
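A minimal sketch of what such a task could look like, assuming the existing AwsCredentials block API; the task name and body are illustrative rather than an existing implementation, and aws_client_parameters handling is omitted for brevity:

from typing import Optional

from boto3.s3.transfer import TransferConfig
from prefect import task
from prefect_aws import AwsCredentials


@task
def s3_upload_file(
    file_name: str,
    bucket: str,
    aws_credentials: AwsCredentials,
    key: Optional[str] = None,
    transfer_config: Optional[TransferConfig] = None,
) -> str:
    """Upload a local file to an S3 bucket and return the uploaded object's key."""
    # Default the object key to the local file name when none is given.
    key = key or file_name
    s3_client = aws_credentials.get_boto3_session().client("s3")
    # boto3's upload_file handles multipart uploads according to transfer_config.
    s3_client.upload_file(file_name, bucket, key, Config=transfer_config)
    return key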
Implement change to template in PrefectHQ/prefect-collection-template#42
Deployment push (during build) and pull (when the agent runs the flow) using the S3 block depends on the region set in the environment (AWS_REGION or ~/.aws/config). If the S3 bucket is in a different region, push/pull fails with "Access denied".
Workaround: set AWS_REGION when deploying or running the agent (problematic when multiple regions are used).
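A minimal sketch of that workaround; the region value is an assumption for illustration:

import os

# Pin the bucket's region before building the deployment or starting the agent.
# The value must match the region of the S3 bucket used by the block.
os.environ["AWS_REGION"] = "eu-west-1"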
I expect to be able to use the credentials field along with the write_path and read_path methods. When I use credentials, I get the following error: ValueError: S3 Bucket requires either a minio_credentialsfield or an aws_credentials field.
def _get_bucket_resource(self) -> boto3.resource:
    """
    Retrieves boto3 resource object for the configured bucket
    """
    if self.minio_credentials:
        params_override = (
            self.minio_credentials.aws_client_parameters.get_params_override()
        )
        if "endpoint_url" not in params_override and self.endpoint_url:
            params_override["endpoint_url"] = self.endpoint_url
        bucket = (
            self.minio_credentials.get_boto3_session()
            .resource("s3", **params_override)
            .Bucket(self.bucket_name)
        )
    elif self.aws_credentials:
        params_override = (
            self.aws_credentials.aws_client_parameters.get_params_override()
        )
        bucket = (
            self.aws_credentials.get_boto3_session()
            .resource("s3", **params_override)
            .Bucket(self.bucket_name)
        )
    else:
>       raise ValueError(
            "S3 Bucket requires either a minio_credentials"
            "field or an aws_credentials field."
        )
E       ValueError: S3 Bucket requires either a minio_credentialsfield or an aws_credentials field.
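A workaround sketch given the error above: pass the credentials block through the aws_credentials field instead of credentials (the bucket and block names here are hypothetical):

from prefect_aws import AwsCredentials
from prefect_aws.s3 import S3Bucket

s3_bucket = S3Bucket(
    bucket_name="my-bucket",                          # hypothetical bucket
    aws_credentials=AwsCredentials.load("my-creds"),  # hypothetical block name
)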
When you create a task definition using the ECS block and add a task definition, Prefect by default adds a second container definition called "prefect". The system should not add the default prefect container definition if you add your own container definition with the appropriate values.
The other issue is that when you add a value to the name field, the container in the task definition will still be named "prefect" instead of the name you entered.
{
    "containerDefinitions": [
        {
            "name": "test_util_ecs_job",
            "image": "xxx.dkr.ecr.eu-west-1.amazonaws.com/test_util",
            "cpu": 0,
            "portMappings": [],
            "essential": true,
            "environment": [
                {
                    "name": "test",
                    "value": "TEST"
                }
            ],
            "mountPoints": [],
            "volumesFrom": []
        },
        {
            "name": "prefect",
            "image": "xxx.ecr.eu-west-1.amazonaws.com/test_util",
            "cpu": 0,
            "portMappings": [],
            "essential": true,
            "environment": [],
1. Create an ECS block,
2. Add task definition (I didn't use the task arn field for this one)
3. Create a flow run
4. Examine the task definition and see that the default prefect definition is still present
Version:
Prefect 2.7.8
Python: 3.11.1
Prefect-aws: 0.2.3
When enabling CloudWatch logs for flows executing in an ECSTask infra block, it generates a new CW log group of "prefect." I would like to provide a reference to an existing log group and specify the log prefix for new streams.
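A hypothetical illustration of the requested interface; the cloudwatch_logs_options field and its keys are assumptions about what the API could look like, not an existing API:

from prefect_aws.ecs import ECSTask

ecs_task = ECSTask(
    configure_cloudwatch_logs=True,
    # Hypothetical: reuse an existing log group and set a stream prefix
    # instead of letting Prefect create a new "prefect" group.
    cloudwatch_logs_options={
        "awslogs-group": "/ecs/my-existing-group",
        "awslogs-stream-prefix": "my-flow",
    },
)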
When running a deployment that uses an ECSTask block, the Prefect agent is able to successfully retrieve the associated task definition; however, upon registering the task definition using the returned JSON object, a ParamValidationError is thrown from the underlying botocore library for the following fields in the task definition JSON: requiresAttributes, registeredAt, registeredBy.
Stack trace from the Prefect 2 agent:
13/10/2022, 09:33:19 | 08:33:19.822 | INFO | prefect.agent - Completed submission of flow run 'ad1f930d-875a-4878-99ed-36641a16f788' | prefect-2-agent
13/10/2022, 09:33:19 | 08:33:19.821 | ERROR | prefect.agent - Failed to submit flow run 'ad1f930d-875a-4878-99ed-36641a16f788' to infrastructure. | prefect-2-agent
13/10/2022, 09:33:19 | Traceback (most recent call last): | prefect-2-agent
13/10/2022, 09:33:19 | File "/usr/local/lib/python3.9/site-packages/prefect/agent.py", line 233, in _submit_run_and_capture_errors | prefect-2-agent
13/10/2022, 09:33:19 | result = await infrastructure.run(task_status=task_status) | prefect-2-agent
13/10/2022, 09:33:19 | File "/usr/local/lib/python3.9/site-packages/prefect_aws/ecs.py", line 362, in run | prefect-2-agent
13/10/2022, 09:33:19 | ) = await run_sync_in_worker_thread( | prefect-2-agent
13/10/2022, 09:33:19 | File "/usr/local/lib/python3.9/site-packages/prefect/utilities/asyncutils.py", line 57, in run_sync_in_worker_thread | prefect-2-agent
13/10/2022, 09:33:19 | return await anyio.to_thread.run_sync(call, cancellable=True) | prefect-2-agent
13/10/2022, 09:33:19 | File "/usr/local/lib/python3.9/site-packages/anyio/to_thread.py", line 31, in run_sync | prefect-2-agent
13/10/2022, 09:33:19 | return await get_asynclib().run_sync_in_worker_thread( | prefect-2-agent
13/10/2022, 09:33:19 | File "/usr/local/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 937, in run_sync_in_worker_thread | prefect-2-agent
13/10/2022, 09:33:19 | return await future | prefect-2-agent
13/10/2022, 09:33:19 | File "/usr/local/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 867, in run | prefect-2-agent
13/10/2022, 09:33:19 | result = context.run(func, *args) | prefect-2-agent
13/10/2022, 09:33:19 | File "/usr/local/lib/python3.9/site-packages/prefect_aws/ecs.py", line 441, in _create_task_and_wait_for_start | prefect-2-agent
13/10/2022, 09:33:19 | task_definition_arn = self._register_task_definition( | prefect-2-agent
13/10/2022, 09:33:19 | File "/usr/local/lib/python3.9/site-packages/prefect_aws/ecs.py", line 818, in _register_task_definition | prefect-2-agent
13/10/2022, 09:33:19 | response = ecs_client.register_task_definition(**task_definition_request) | prefect-2-agent
13/10/2022, 09:33:19 | File "/usr/local/lib/python3.9/site-packages/botocore/client.py", line 508, in _api_call | prefect-2-agent
13/10/2022, 09:33:19 | return self._make_api_call(operation_name, kwargs) | prefect-2-agent
13/10/2022, 09:33:19 | File "/usr/local/lib/python3.9/site-packages/botocore/client.py", line 878, in _make_api_call | prefect-2-agent
13/10/2022, 09:33:19 | request_dict = self._convert_to_request_dict( | prefect-2-agent
13/10/2022, 09:33:19 | File "/usr/local/lib/python3.9/site-packages/botocore/client.py", line 939, in _convert_to_request_dict | prefect-2-agent
13/10/2022, 09:33:19 | request_dict = self._serializer.serialize_to_request( | prefect-2-agent
13/10/2022, 09:33:19 | File "/usr/local/lib/python3.9/site-packages/botocore/validate.py", line 381, in serialize_to_request | prefect-2-agent
13/10/2022, 09:33:19 | raise ParamValidationError(report=report.generate_report()) | prefect-2-agent
13/10/2022, 09:33:19 | botocore.exceptions.ParamValidationError: Parameter validation failed: | prefect-2-agent
13/10/2022, 09:33:19 | Unknown parameter in input: "requiresAttributes", must be one of: family, taskRoleArn, executionRoleArn, networkMode, containerDefinitions, volumes, placementConstraints, requiresCompatibilities, cpu, memory, tags, pidMode, ipcMode, proxyConfiguration, inferenceAccelerators, ephemeralStorage, runtimePlatform | prefect-2-agent
13/10/2022, 09:33:19 | Unknown parameter in input: "registeredAt", must be one of: family, taskRoleArn, executionRoleArn, networkMode, containerDefinitions, volumes, placementConstraints, requiresCompatibilities, cpu, memory, tags, pidMode, ipcMode, proxyConfiguration, inferenceAccelerators, ephemeralStorage, runtimePlatform | prefect-2-agent
13/10/2022, 09:33:19 | Unknown parameter in input: "registeredBy", must be one of: family, taskRoleArn, executionRoleArn, networkMode, containerDefinitions, volumes, placementConstraints, requiresCompatibilities, cpu, memory, tags, pidMode, ipcMode, proxyConfiguration, inferenceAccelerators, ephemeralStorage, runtimePlatform | prefect-2-agent
13/10/2022, 09:33:19 | 08:33:19.819 | DEBUG | prefect.infrastructure.ecs-task - Task definition payload | prefect-2-agent
...
13/10/2022, 09:33:19 | 08:33:19.813 | INFO | prefect.infrastructure.ecs-task - ECSTask 'cocky-bulldog': Registering task definition... | prefect-2-agent
13/10/2022, 09:33:19 | 08:33:19.765 | INFO | prefect.infrastructure.ecs-task - ECSTask 'cocky-bulldog': Retrieving task definition 'manual_hello_world_test'... | prefect-2-agent
It appears to be an issue on the AWS SDK side where some fields in the response from describe-task-definition are not compatible in a call to register-task-definition (aws/aws-sdk#38, aws/aws-sdk#406). The current workaround appears to be removing these fields when registering the task.
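A minimal sketch of that workaround: strip the read-only fields returned by describe_task_definition before passing the definition back to register_task_definition. The field list combines the three from the error above with other fields known to appear only in describe output:

import boto3

ecs_client = boto3.client("ecs")

response = ecs_client.describe_task_definition(
    taskDefinition="manual_hello_world_test"
)
task_definition = response["taskDefinition"]

# These keys appear in describe_task_definition output but are rejected
# as input by register_task_definition.
for field in (
    "taskDefinitionArn",
    "requiresAttributes",
    "registeredAt",
    "registeredBy",
    "compatibilities",
    "revision",
    "status",
):
    task_definition.pop(field, None)

ecs_client.register_task_definition(**task_definition)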
Steps to Reproduce:
We are currently defining our ECS task definitions within Terraform; however, I have manually created a new ECS task from within the ECS GUI. Viewing the Overview for this task in the GUI and clicking on the JSON tab, you can see that requiresAttributes, registeredAt, and registeredBy are all defined.
Our ECSTask block is defined with the name of our ECS cluster, and we use what AWS calls the 'Task definition family' for the 'task_definition_arn' argument, which is correctly translated to the full ARN.
There doesn't appear to be a fix coming from the AWS SDK side, so this might need to be something that is handled on the Prefect side.
Using Prefect version 2.6.5.
Prefect Agent is running in ECS.
When the agent tries to submit a flow run using the ECSTask block:
The flow log does not contain anything; I got the error from an email notification.
State message: Submission failed. TypeError: Infrastructure.prepare_for_flow_run() got an unexpected keyword argument 'deployment'
Agent Logs:
| 2022-12-05T13:59:54.323+02:00 | 11:59:54.322 | INFO | prefect.agent - Submitting flow run '<flow_id>'
| 2022-12-05T14:00:00.194+02:00 | 12:00:00.194 | ERROR | prefect.agent - Failed to get infrastructure for flow run '<flow_id>'.
| 2022-12-05T14:00:00.194+02:00 | Traceback (most recent call last):
| 2022-12-05T14:00:00.194+02:00 | File "/usr/local/lib/python3.10/site-packages/prefect/agent.py", line 229, in submit_run
| 2022-12-05T14:00:00.194+02:00 | infrastructure = await self.get_infrastructure(flow_run)
| 2022-12-05T14:00:00.194+02:00 | File "/usr/local/lib/python3.10/site-packages/prefect/agent.py", line 218, in get_infrastructure
| 2022-12-05T14:00:00.194+02:00 | prepared_infrastructure = infrastructure_block.prepare_for_flow_run(flow_run)
| 2022-12-05T14:00:00.194+02:00 | File "/usr/local/lib/python3.10/site-packages/prefect_aws/ecs.py", line 485, in prepare_for_flow_run
| 2022-12-05T14:00:00.194+02:00 | new = super().prepare_for_flow_run(flow_run, deployment=deployment, flow=flow)
| 2022-12-05T14:00:00.194+02:00 | TypeError: Infrastructure.prepare_for_flow_run() got an unexpected keyword argument 'deployment'
Same code was working until Friday 2022/12/02.
Implement change to template in PrefectHQ/prefect-collection-template#43
This is similar to an issue found in the prefect repo (PrefectHQ/prefect#7583), where an AttributeError is encountered when trying to build a deployment with the s3-bucket storage block type. As I mentioned in the other issue, the S3Bucket attribute basepath is of type pathlib.Path, not str, which I think caused the error:
Lines 272 to 276 in 519105b
A customer experiences intermittent failures with ECS task registration. The error raised is a timeout; more explicit errors would be desirable.
I would like to pass existing security group(s) to enable communication between Orion, generated ECS tasks, and other network-isolated services in my VPCs.
Trying to use S3Bucket.read_path or S3Bucket.write_path in a sync context raises TypeError: cannot pickle 'coroutine' object.
If I install prefect-aws in editable mode and add the sync_compatible decorator to each method, things work as expected.
from prefect import flow, task
from prefect_aws.s3 import S3Bucket


@task
def testing():
    my_bucket = S3Bucket.load("test-bucket")
    path = my_bucket.read_path("requirements.txt")
    print(path, type(path))
    return path


@flow(name="example_flow")
def test():
    result = testing()


if __name__ == "__main__":
    test()
❯ python return_state.py
14:47:28.885 | INFO | prefect.engine - Created flow run 'mini-anaconda' for flow 'example_flow'
14:47:29.752 | INFO | Flow run 'mini-anaconda' - Created task run 'testing-3152b825-0' for task 'testing'
14:47:29.754 | INFO | Flow run 'mini-anaconda' - Executing 'testing-3152b825-0' immediately...
<coroutine object S3Bucket.write_path at 0x7f91e015f7c0> <class 'coroutine'>
14:47:30.080 | ERROR | Flow run 'mini-anaconda' - Encountered exception during execution:
Traceback (most recent call last):
File "/Users/nate/opt/miniconda3/envs/prefect-2/lib/python3.8/site-packages/prefect/engine.py", line 596, in orchestrate_flow_run
result = await run_sync(flow_call)
File "/Users/nate/opt/miniconda3/envs/prefect-2/lib/python3.8/site-packages/prefect/utilities/asyncutils.py", line 57, in run_sync_in_worker_thread
return await anyio.to_thread.run_sync(call, cancellable=True)
File "/Users/nate/opt/miniconda3/envs/prefect-2/lib/python3.8/site-packages/anyio/to_thread.py", line 31, in run_sync
return await get_asynclib().run_sync_in_worker_thread(
File "/Users/nate/opt/miniconda3/envs/prefect-2/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 937, in run_sync_in_worker_thread
return await future
File "/Users/nate/opt/miniconda3/envs/prefect-2/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 867, in run
result = context.run(func, *args)
File "return_state.py", line 19, in test
print(result.result())
File "/Users/nate/opt/miniconda3/envs/prefect-2/lib/python3.8/site-packages/prefect/orion/schemas/states.py", line 145, in result
raise data
File "/Users/nate/opt/miniconda3/envs/prefect-2/lib/python3.8/site-packages/prefect/task_runners.py", line 203, in submit
result = await call()
File "/Users/nate/opt/miniconda3/envs/prefect-2/lib/python3.8/site-packages/prefect/engine.py", line 1104, in begin_task_run
return await orchestrate_task_run(
File "/Users/nate/opt/miniconda3/envs/prefect-2/lib/python3.8/site-packages/prefect/engine.py", line 1233, in orchestrate_task_run
terminal_state = await return_value_to_state(
File "/Users/nate/opt/miniconda3/envs/prefect-2/lib/python3.8/site-packages/prefect/states.py", line 130, in return_value_to_state
return Completed(data=DataDocument.encode(serializer, result))
File "/Users/nate/opt/miniconda3/envs/prefect-2/lib/python3.8/site-packages/prefect/orion/schemas/data.py", line 42, in encode
blob = lookup_serializer(encoding).dumps(data, **kwargs)
File "/Users/nate/opt/miniconda3/envs/prefect-2/lib/python3.8/site-packages/prefect/serializers.py", line 59, in dumps
data_bytes = cloudpickle.dumps(data)
File "/Users/nate/opt/miniconda3/envs/prefect-2/lib/python3.8/site-packages/cloudpickle/cloudpickle_fast.py", line 73, in dumps
cp.dump(obj)
File "/Users/nate/opt/miniconda3/envs/prefect-2/lib/python3.8/site-packages/cloudpickle/cloudpickle_fast.py", line 633, in dump
return Pickler.dump(self, obj)
TypeError: cannot pickle 'coroutine' object
14:47:30.089 | INFO | Task run 'testing-3152b825-0' - Crash detected! Execution was interrupted by an unexpected exception.
14:47:30.268 | ERROR | Flow run 'mini-anaconda' - Finished in state Failed('Flow run encountered an exception.')
Traceback (most recent call last):
File "return_state.py", line 22, in <module>
test()
File "/Users/nate/opt/miniconda3/envs/prefect-2/lib/python3.8/site-packages/prefect/flows.py", line 388, in __call__
return enter_flow_run_engine_from_flow_call(
File "/Users/nate/opt/miniconda3/envs/prefect-2/lib/python3.8/site-packages/prefect/engine.py", line 159, in enter_flow_run_engine_from_flow_call
return anyio.run(begin_run)
File "/Users/nate/opt/miniconda3/envs/prefect-2/lib/python3.8/site-packages/anyio/_core/_eventloop.py", line 70, in run
return asynclib.run(func, *args, **backend_options)
File "/Users/nate/opt/miniconda3/envs/prefect-2/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 292, in run
return native_run(wrapper(), debug=debug)
File "/Users/nate/opt/miniconda3/envs/prefect-2/lib/python3.8/asyncio/runners.py", line 44, in run
return loop.run_until_complete(main)
File "/Users/nate/opt/miniconda3/envs/prefect-2/lib/python3.8/asyncio/base_events.py", line 616, in run_until_complete
return future.result()
File "/Users/nate/opt/miniconda3/envs/prefect-2/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 287, in wrapper
return await func(*args)
File "/Users/nate/opt/miniconda3/envs/prefect-2/lib/python3.8/site-packages/prefect/client/orion.py", line 82, in with_injected_client
return await fn(*args, **kwargs)
File "/Users/nate/opt/miniconda3/envs/prefect-2/lib/python3.8/site-packages/prefect/engine.py", line 239, in create_then_begin_flow_run
return state.result()
File "/Users/nate/opt/miniconda3/envs/prefect-2/lib/python3.8/site-packages/prefect/orion/schemas/states.py", line 145, in result
raise data
File "/Users/nate/opt/miniconda3/envs/prefect-2/lib/python3.8/site-packages/prefect/engine.py", line 596, in orchestrate_flow_run
result = await run_sync(flow_call)
File "/Users/nate/opt/miniconda3/envs/prefect-2/lib/python3.8/site-packages/prefect/utilities/asyncutils.py", line 57, in run_sync_in_worker_thread
return await anyio.to_thread.run_sync(call, cancellable=True)
File "/Users/nate/opt/miniconda3/envs/prefect-2/lib/python3.8/site-packages/anyio/to_thread.py", line 31, in run_sync
return await get_asynclib().run_sync_in_worker_thread(
File "/Users/nate/opt/miniconda3/envs/prefect-2/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 937, in run_sync_in_worker_thread
return await future
File "/Users/nate/opt/miniconda3/envs/prefect-2/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 867, in run
result = context.run(func, *args)
File "return_state.py", line 19, in test
print(result.result())
File "/Users/nate/opt/miniconda3/envs/prefect-2/lib/python3.8/site-packages/prefect/orion/schemas/states.py", line 145, in result
raise data
File "/Users/nate/opt/miniconda3/envs/prefect-2/lib/python3.8/site-packages/prefect/task_runners.py", line 203, in submit
result = await call()
File "/Users/nate/opt/miniconda3/envs/prefect-2/lib/python3.8/site-packages/prefect/engine.py", line 1104, in begin_task_run
return await orchestrate_task_run(
File "/Users/nate/opt/miniconda3/envs/prefect-2/lib/python3.8/site-packages/prefect/engine.py", line 1233, in orchestrate_task_run
terminal_state = await return_value_to_state(
File "/Users/nate/opt/miniconda3/envs/prefect-2/lib/python3.8/site-packages/prefect/states.py", line 130, in return_value_to_state
return Completed(data=DataDocument.encode(serializer, result))
File "/Users/nate/opt/miniconda3/envs/prefect-2/lib/python3.8/site-packages/prefect/orion/schemas/data.py", line 42, in encode
blob = lookup_serializer(encoding).dumps(data, **kwargs)
File "/Users/nate/opt/miniconda3/envs/prefect-2/lib/python3.8/site-packages/prefect/serializers.py", line 59, in dumps
data_bytes = cloudpickle.dumps(data)
File "/Users/nate/opt/miniconda3/envs/prefect-2/lib/python3.8/site-packages/cloudpickle/cloudpickle_fast.py", line 73, in dumps
cp.dump(obj)
File "/Users/nate/opt/miniconda3/envs/prefect-2/lib/python3.8/site-packages/cloudpickle/cloudpickle_fast.py", line 633, in dump
return Pickler.dump(self, obj)
TypeError: cannot pickle 'coroutine' object
Hey Prefect team!
I've stumbled upon this Pipenv issue. When trying to execute the following:
pipenv install prefect-aws
It installs v0.1.2, but when pipenv generates the lockfile, it pins a botocore==1.24.21 subdependency. I see that in this repo it should be botocore==1.27.53. Perhaps there's a mismatch between this repo and the PyPI release? The v0.1.2 release on PyPI was last updated Aug 3, but the requirements.txt in this repo was last updated Aug 18.
Thanks in advance.
There are a lot of alternatives to AWS S3, like MinIO, which are used in the enterprise. To allow this module to work with them, it would be nice to allow passing a dictionary to the S3 task that would be used when initializing the S3 client; see, for example, the boto3 docs.
This would also allow setting up verification when using a custom CA bundle. https://github.com/boto/boto3/blob/develop/boto3/session.py.
If you are happy with that change I can create a PR for it.
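A minimal sketch of the idea, assuming the dictionary is forwarded verbatim to boto3.client; the endpoint and CA bundle path are illustrative:

import boto3

client_parameters = {
    "endpoint_url": "https://minio.internal.example.com",  # hypothetical MinIO endpoint
    "verify": "/etc/ssl/certs/custom-ca-bundle.pem",       # custom CA bundle
}
s3_client = boto3.client("s3", **client_parameters)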
When using the pause_flow_run function with the argument reschedule=True on a flow with an ECS Task infrastructure block, the expectation is that, when paused, the ECS task will be deprovisioned and the flow will remain in a Paused state; when resumed, a new ECS task will be provisioned to run the remaining flow tasks (similar to raise PAUSE in 1.0).
What I am seeing is that when the flow hits the Paused state, the ECS task deprovisions and then the flow moves from a Paused state to a Crashed state. The final state message is "Flow run infrastructure exited with non-zero status code -1.". The last log message on the ECS task is "21:55:46.456 | INFO | prefect.engine - Engine execution of flow run '09d19369-66ad-4f96-837a-09b713e3ff46' is paused:"
I am using an ECS Task infrastructure block on Fargate. There doesn't seem to be any error other than that Cloud expects the infrastructure to still be present and it is not.
Simple example flow code I am using:
from prefect import get_run_logger
from prefect import flow, task, pause_flow_run


@task(name="First Task", persist_result=True)
def first_task():
    logger = get_run_logger()
    logger.info('first task')
    return None


@task(name="Second Task", persist_result=True)
def second_task():
    logger = get_run_logger()
    logger.info('second task')
    return None


@flow(name="Test pause", result_storage='s3/prefect-results')
def test_pause_flow():
    first_result = first_task()
    pause_flow_run(reschedule=True, timeout=64800)
    second_result = second_task(wait_for=[first_result])
The ECS task definition was created using this terraform: https://github.com/PrefectHQ/prefect-recipes/tree/main/devops/infrastructure-as-code/aws/tf-prefect2-ecs-agent
AWS will rate limit task definition registration for a given task family. There are two factors that make it easy for a relatively low volume of Prefect flows to hit rate limits:
RegisterTaskDefinition operation: Too many concurrent attempts to create a new revision of the specified family.
Solution:
I feel like creating task families per deployment will be the easiest way around this, as it reduces the number of registrations for a given family. However, this does not fully resolve the issue: it is nice to see a new log stream for every flow run, but the need to register task definitions still introduces a chance for rate limits to be hit.
Presently, for an ECSTask block, we can only provide the VPC ID. I do not want these flows running on public subnets, so it would be awesome to be able to specify subnet IDs.
Previously I used S3(bucket_path="s3-bucket/my-dir") for deployments, which worked as expected. Now I use S3Bucket(bucket_name="s3-bucket", basepath="my-dir") and the files are saved under s3-bucket/my-dir/my-dir instead of s3-bucket/my-dir.
When deploying a flow with S3Bucket, the put_directory method is called with to_path=None, which is then resolved to basepath:
# put_directory
if to_path is None:
    to_path = str(self.basepath) if self.basepath is not None else ""
put_directory then calls the write_path method, which resolves the path again:
# write_path
path = self._resolve_path(path)
But path already includes the basepath:
# _resolve_path
path = str(Path(self.basepath) / path) if self.basepath else path
I tried to change it in either put_directory or write_path, but then other tests fail.
Maybe S3Bucket should not use basepath directly and instead use an individual parameter, like S3 did with bucket_path?
It might also be worth including tests for deployments using S3Bucket.
I am looking forward to the discussion and would like to take part in the solution.
botocore.errorfactory.ResourceNotFoundException: An error occurred (ResourceNotFoundException) when calling the GetLogEvents operation: The specified log stream does not exist.
Replicating a report from a user in the community: https://prefect-community.slack.com/archives/CL09KU1K7/p1671026163262599
I've seen a couple of reports of this. Perhaps the log stream takes a bit to be populated? It'd be good to retry here. Without more details, it's unclear whether this is an issue with the way users are configuring their task definitions.
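A retry sketch for the race described above, assuming the log stream may simply take a moment to appear after the task starts; the attempt count and delay are arbitrary:

import time

import boto3

logs_client = boto3.client("logs")


def get_log_events_with_retry(group: str, stream: str, attempts: int = 5, delay: float = 2.0):
    """Poll for log events, retrying while the stream does not exist yet."""
    for attempt in range(attempts):
        try:
            return logs_client.get_log_events(
                logGroupName=group, logStreamName=stream
            )
        except logs_client.exceptions.ResourceNotFoundException:
            if attempt == attempts - 1:
                raise
            time.sleep(delay)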
botocore.config.Config.signature_version uses strings to identify signers and a singleton object, botocore.UNSIGNED, to prevent signing. ClientParameters.config does not have an option to pass the singleton instead of a string.
Proposal: {"signature_version": "unsigned"} passes botocore.UNSIGNED to botocore.config.Config instead of the string "unsigned".
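A minimal sketch of the proposed mapping, translating the string "unsigned" into the botocore.UNSIGNED sentinel before building the Config object:

import botocore
from botocore.config import Config


def build_config(params: dict) -> Config:
    # Users write {"signature_version": "unsigned"}; botocore expects the
    # UNSIGNED sentinel rather than a string for this case.
    if params.get("signature_version") == "unsigned":
        params = {**params, "signature_version": botocore.UNSIGNED}
    return Config(**params)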
It appears that a cryptic error is thrown if no credentials are defined in an S3 block and no credentials are defined locally either:
Traceback (most recent call last):
File "/home/user/.cache/pypoetry/py3.10/lib/python3.10/site-packages/aiohttp/connector.py", line 986, in _wrap_create_connection
return await self._loop.create_connection(*args, **kwargs) # type: ignore[return-value] # noqa
File "/usr/lib/python3.10/asyncio/base_events.py", line 1049, in create_connection
sock = await self._connect_sock(
File "/usr/lib/python3.10/asyncio/base_events.py", line 960, in _connect_sock
await self.sock_connect(sock, address)
File "/usr/lib/python3.10/asyncio/selector_events.py", line 499, in sock_connect
return await fut
asyncio.exceptions.CancelledError
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/home/user/.cache/pypoetry/py3.10/lib/python3.10/site-packages/aiohttp/client.py", line 535, in _request
conn = await self._connector.connect(
File "/home/user/.cache/pypoetry/py3.10/lib/python3.10/site-packages/aiohttp/connector.py", line 542, in connect
proto = await self._create_connection(req, traces, timeout)
File "/home/user/.cache/pypoetry/py3.10/lib/python3.10/site-packages/aiohttp/connector.py", line 907, in _create_connection
_, proto = await self._create_direct_connection(req, traces, timeout)
File "/home/user/.cache/pypoetry/py3.10/lib/python3.10/site-packages/aiohttp/connector.py", line 1175, in _create_direct_connection
transp, proto = await self._wrap_create_connection(
File "/home/user/.cache/pypoetry/py3.10/lib/python3.10/site-packages/aiohttp/connector.py", line 985, in _wrap_create_connection
async with ceil_timeout(timeout.sock_connect):
File "/home/user/.cache/pypoetry/py3.10/lib/python3.10/site-packages/async_timeout/__init__.py", line 129, in __aexit__
self._do_exit(exc_type)
File "/home/user/.cache/pypoetry/py3.10/lib/python3.10/site-packages/async_timeout/__init__.py", line 212, in _do_exit
raise asyncio.TimeoutError
asyncio.exceptions.TimeoutError
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/home/user/.cache/pypoetry/py3.10/lib/python3.10/site-packages/aiobotocore/httpsession.py", line 178, in send
response = await self._session.request(
File "/home/user/.cache/pypoetry/py3.10/lib/python3.10/site-packages/aiohttp/client.py", line 539, in _request
raise ServerTimeoutError(
aiohttp.client_exceptions.ServerTimeoutError: Connection timeout to host http://169.254.169.254/latest/api/token
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/home/user/.cache/pypoetry/py3.10/lib/python3.10/site-packages/prefect/cli/_utilities.py", line 41, in wrapper
return fn(*args, **kwargs)
File "/home/user/.cache/pypoetry/py3.10/lib/python3.10/site-packages/prefect/utilities/asyncutils.py", line 212, in wrapper
return run_async_in_new_loop(async_fn, *args, **kwargs)
File "/home/user/.cache/pypoetry/py3.10/lib/python3.10/site-packages/prefect/utilities/asyncutils.py", line 141, in run_async_in_new_loop
return anyio.run(partial(__fn, *args, **kwargs))
File "/home/user/.cache/pypoetry/py3.10/lib/python3.10/site-packages/anyio/_core/_eventloop.py", line 70, in run
return asynclib.run(func, *args, **backend_options)
File "/home/user/.cache/pypoetry/py3.10/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 292, in run
return native_run(wrapper(), debug=debug)
File "/usr/lib/python3.10/asyncio/runners.py", line 44, in run
return loop.run_until_complete(main)
File "/usr/lib/python3.10/asyncio/base_events.py", line 646, in run_until_complete
return future.result()
File "/home/user/.cache/pypoetry/py3.10/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 287, in wrapper
return await func(*args)
File "/home/user/.cache/pypoetry/py3.10/lib/python3.10/site-packages/prefect/cli/deployment.py", line 577, in build
deployment = await Deployment.build_from_flow(
File "/home/user/.cache/pypoetry/py3.10/lib/python3.10/site-packages/prefect/deployments.py", line 598, in build_from_flow
await deployment.upload_to_storage()
File "/home/user/.cache/pypoetry/py3.10/lib/python3.10/site-packages/prefect/deployments.py", line 465, in upload_to_storage
file_count = await self.storage.put_directory(
File "/home/user/.cache/pypoetry/py3.10/lib/python3.10/site-packages/prefect/filesystems.py", line 459, in put_directory
return await self.filesystem.put_directory(
File "/home/user/.cache/pypoetry/py3.10/lib/python3.10/site-packages/prefect/filesystems.py", line 342, in put_directory
self.filesystem.put_file(f, fpath, overwrite=True)
File "/home/user/.cache/pypoetry/py3.10/lib/python3.10/site-packages/fsspec/asyn.py", line 111, in wrapper
return sync(self.loop, func, *args, **kwargs)
File "/home/user/.cache/pypoetry/py3.10/lib/python3.10/site-packages/fsspec/asyn.py", line 96, in sync
raise return_result
File "/home/user/.cache/pypoetry/py3.10/lib/python3.10/site-packages/fsspec/asyn.py", line 53, in _runner
result[0] = await coro
File "/home/user/.cache/pypoetry/py3.10/lib/python3.10/site-packages/s3fs/core.py", line 1015, in _put_file
await self._call_s3(
File "/home/user/.cache/pypoetry/py3.10/lib/python3.10/site-packages/s3fs/core.py", line 325, in _call_s3
await self.set_session()
File "/home/user/.cache/pypoetry/py3.10/lib/python3.10/site-packages/s3fs/core.py", line 473, in set_session
self._s3 = await s3creator.__aenter__()
File "/home/user/.cache/pypoetry/py3.10/lib/python3.10/site-packages/aiobotocore/session.py", line 22, in __aenter__
self._client = await self._coro
File "/home/user/.cache/pypoetry/py3.10/lib/python3.10/site-packages/aiobotocore/session.py", line 102, in _create_client
credentials = await self.get_credentials()
File "/home/user/.cache/pypoetry/py3.10/lib/python3.10/site-packages/aiobotocore/session.py", line 133, in get_credentials
self._credentials = await (self._components.get_component(
File "/home/user/.cache/pypoetry/py3.10/lib/python3.10/site-packages/aiobotocore/credentials.py", line 814, in load_credentials
creds = await provider.load()
File "/home/user/.cache/pypoetry/py3.10/lib/python3.10/site-packages/aiobotocore/credentials.py", line 486, in load
metadata = await fetcher.retrieve_iam_role_credentials()
File "/home/user/.cache/pypoetry/py3.10/lib/python3.10/site-packages/aiobotocore/utils.py", line 175, in retrieve_iam_role_credentials
token = await self._fetch_metadata_token()
File "/home/user/.cache/pypoetry/py3.10/lib/python3.10/site-packages/aiobotocore/utils.py", line 88, in _fetch_metadata_token
response = await session.send(request.prepare())
File "/home/user/.cache/pypoetry/py3.10/lib/python3.10/site-packages/aiobotocore/httpsession.py", line 210, in send
raise ConnectTimeoutError(endpoint_url=request.url, error=e)
botocore.exceptions.ConnectTimeoutError: Connect timeout on endpoint URL: "http://169.254.169.254/latest/api/token"
An exception occurred.
The ECS block is missing a field for ephemeralStorage. It would be nice if we could get that added as an optional field. This is very useful if you want to adjust the default size from 20GB up to 200GB for a Fargate instance.
Currently you have to add the value as a task definition override; this is not documented in the prefect-aws docs or in Anna's examples:
task_definition = {"ephemeralStorage": {"sizeInGiB": disk}}
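A sketch of that workaround through the ECSTask task_definition field; 200 GiB is the Fargate maximum mentioned above:

from prefect_aws.ecs import ECSTask

ecs_task = ECSTask(
    # Merge ephemeral storage into the generated task definition.
    task_definition={"ephemeralStorage": {"sizeInGiB": 200}},
)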
Tests are failing in CI with the following error:
__________________________ test_launch_types[FARGATE] __________________________
aws_credentials = AwsCredentials(aws_access_key_id='access_key_id', aws_secret_access_key=SecretStr('**********'), aws_session_token=None, profile_name=None, region_name='us-east-1')
launch_type = 'FARGATE'
@pytest.mark.usefixtures("ecs_mocks")
@pytest.mark.parametrize("launch_type", ["EC2", "FARGATE", "FARGATE_SPOT"])
async def test_launch_types(aws_credentials, launch_type: str):
    task = ECSTask(
        aws_credentials=aws_credentials,
        auto_deregister_task_definition=False,
        command=["prefect", "version"],
        launch_type=launch_type,
    )
    print(task.preview())

    session = aws_credentials.get_boto3_session()
    ecs_client = session.client("ecs")

>   task_arn = await run_then_stop_task(task)
...
self = <moto.ecs.models.Task object at 0x7fce6af6cf10>
cluster = <moto.ecs.models.Cluster object at 0x7fce65f7d190>
task_definition = <moto.ecs.models.TaskDefinition object at 0x7fce691a6e10>
container_instance_arn = 'arn:aws:ecs:us-east-1:123456789012:container-instance/default/e2f46c39-c747-4a71-af62-7426d6d1c898'
resource_requirements = {'CPU': 0, 'MEMORY': 0, 'PORTS': [], 'PORTS_UDP': []}
backend = <moto.ecs.models.EC2ContainerServiceBackend object at 0x7fce6aaa9610>
launch_type = 'FARGATE'
overrides = {'containerOverrides': [{'command': ['prefect', 'version'], 'environment': [{'name': 'PREFECT_ORION_DATABASE_CONNECTION_URL', 'value': 'sqlite+aiosqlite:////tmp/tmp9xu65h5q/orion.db'}], 'name': 'prefect'}]}
started_by = '', tags = []
networking_configuration = {'awsvpcConfiguration': {'assignPublicIp': 'ENABLED', 'subnets': ['subnet-5b085f7a', 'subnet-57e3fb65', 'subnet-544405f4', 'subnet-7be12aee', 'subnet-c47aeb59', 'subnet-0b5523d7']}}
def __init__(
    self,
    cluster,
    task_definition,
    container_instance_arn,
    resource_requirements,
    backend,
    launch_type="",
    overrides=None,
    started_by="",
    tags=None,
    networking_configuration=None,
):
    self.id = str(mock_random.uuid4())
    self.cluster_name = cluster.name
    self.cluster_arn = cluster.arn
    self.container_instance_arn = container_instance_arn
    self.last_status = "RUNNING"
    self.desired_status = "RUNNING"
    self.task_definition_arn = task_definition.arn
    self.overrides = overrides or {}
    self.containers = []
    self.started_by = started_by
    self.tags = tags or []
    self.launch_type = launch_type
    self.stopped_reason = ""
    self.resource_requirements = resource_requirements
    self.region_name = cluster.region_name
    self._account_id = backend.account_id
    self._backend = backend
    self.attachments = []
    if task_definition.network_mode == "awsvpc":
        if not networking_configuration:
            raise InvalidParameterException(
                "Network Configuration must be provided when networkMode 'awsvpc' is specified."
            )
        self.network_configuration = networking_configuration
        net_conf = networking_configuration["awsvpcConfiguration"]
        ec2_backend = ec2_backends[self._account_id][self.region_name]
        eni = ec2_backend.create_network_interface(
            subnet=net_conf["subnets"][0],
            private_ip_address=random_private_ip(),
>           group_ids=net_conf["securityGroups"],
            description="moto ECS",
        )
E       KeyError: 'securityGroups'
Originally posted by @ahuang11 in #125 (comment)
Using the _retrieve_task_definition classmethod to fetch a task definition from AWS renders the output below. Six fields are created every time a new task definition is registered, which will cause diffing logic to detect drift every single time a task definition is registered. These fields are registeredBy, registeredAt, requiresAttributes, placementConstraints, status, and revision. If these fields are stripped from the retrieved task definition, we can effectively reduce the need to register new task definitions and prevent AWS rate limits and other strange boto behavior:
{'taskDefinitionArn': 'arn:aws:ecs:us-east-2:455346737763:task-definition/prefect-agent-prod-fargate-prefect-agent:2',
'containerDefinitions': [{'name': 'prefect-agent-prod-fargate-prefect-agent',
'image': 'prefecthq/prefect:2-python3.10',
'cpu': 1024,
'memory': 2048,
'portMappings': [],
'essential': True,
'command': ['prefect', 'agent', 'start', '-q', 'prod-ecs'],
'environment': [{'name': 'PREFECT_API_URL',
'value': 'https://api.prefect.cloud/api/accounts/0ff44498-d380-4d7b-bd68-9b52da03823f/workspaces/bb3005b9-99c6-4289-802b-6cfbfcffc5c0'},
{'name': 'EXTRA_PIP_PACKAGES', 'value': 'prefect-aws s3fs'}],
'mountPoints': [],
'volumesFrom': [],
'secrets': [{'name': 'PREFECT_API_KEY',
'valueFrom': 'OBFUSCATED'}],
'logConfiguration': {'logDriver': 'awslogs',
'options': {'awslogs-group': 'prefect-agent-log-group-prod',
'awslogs-region': 'us-east-2',
'awslogs-stream-prefix': 'prefect-agent-prod-fargate-prefect-agent'}}}],
'family': 'prefect-agent-prod-fargate-prefect-agent',
'executionRoleArn': 'arn:aws:iam::455346737763:role/prefect-agent-execution-role-prod',
'networkMode': 'awsvpc',
'revision': 2,
'volumes': [],
'status': 'ACTIVE',
'requiresAttributes': [{'name': 'com.amazonaws.ecs.capability.logging-driver.awslogs'},
{'name': 'ecs.capability.execution-role-awslogs'},
{'name': 'com.amazonaws.ecs.capability.docker-remote-api.1.19'},
{'name': 'ecs.capability.secrets.asm.environment-variables'},
{'name': 'com.amazonaws.ecs.capability.docker-remote-api.1.18'},
{'name': 'ecs.capability.task-eni'}],
'placementConstraints': [],
'compatibilities': ['EC2', 'FARGATE'],
'requiresCompatibilities': ['FARGATE'],
'cpu': '1024',
'memory': '2048',
'registeredAt': datetime.datetime(2022, 12, 19, 16, 36, 58, 506000, tzinfo=tzlocal()),
'registeredBy': 'arn:aws:sts::455346737763:assumed-role/AWSReservedSSO_AdministratorAccess_fe1980cf6870d6ed/[email protected]'}
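A minimal sketch of the proposed stripping, using the six field names listed above:

# Fields that change on every registration and should be ignored when
# diffing a retrieved task definition against the requested one.
VOLATILE_FIELDS = (
    "registeredBy",
    "registeredAt",
    "requiresAttributes",
    "placementConstraints",
    "status",
    "revision",
)


def strip_volatile_fields(task_definition: dict) -> dict:
    return {k: v for k, v in task_definition.items() if k not in VOLATILE_FIELDS}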
Line 394 in 7748ffe
Read/write from Windows OS produced object keys with backslashes and no folder hierarchy. This was due to pathlib inferring backslashes on Windows. Backslashes are frowned upon in the AWS object key naming guidelines.
Fix: explicitly make Path return a POSIX path string.
# If basepath provided, it means we won't write to the root dir of
# the bucket. So we need to add it on the front of the path.
#
# AWS object key naming guidelines require '/'. Get POSIX path to prevent
# Path from inferring '\' on Windows OS
path = (Path(self.basepath) / path).as_posix() if self.basepath else path
During my migration to 2.0 I have noticed that if multiple flows are scheduled at the same time (e.g. they all have a schedule of 0 * * * *), only one or two of the flows will run, and the rest will remain in the Pending state forever.
I currently have one agent and one queue setup.
I am going to explore and see if setting up multiple agents may solve the problem.
Intermittently, when running a flow using the ECSTask block, the run appears to fail with this error appearing in the agent logs: IndexError: list index out of range. This seems to occur before the flow run is actually submitted, so it doesn't appear tied to the flow code itself.
Agent Logs:
09:00:00.595 | ERROR | prefect.agent - Failed to submit flow run 'XXXXXXXXXXX' to infrastructure.
Traceback (most recent call last):
File "/usr/local/lib/python3.10/site-packages/prefect/agent.py", line 257, in _submit_run_and_capture_errors
result = await infrastructure.run(task_status=task_status)
File "/usr/local/lib/python3.10/site-packages/prefect_aws/ecs.py", line 456, in run
) = await run_sync_in_worker_thread(
File "/usr/local/lib/python3.10/site-packages/prefect/utilities/asyncutils.py", line 68, in run_sync_in_worker_thread
return await anyio.to_thread.run_sync(call, cancellable=True)
File "/usr/local/lib/python3.10/site-packages/anyio/to_thread.py", line 31, in run_sync
return await get_asynclib().run_sync_in_worker_thread(
File "/usr/local/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 937, in run_sync_in_worker_thread
return await future
File "/usr/local/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 867, in run
result = context.run(func, *args)
File "/usr/local/lib/python3.10/site-packages/prefect_aws/ecs.py", line 561, in _create_task_and_wait_for_start
self._wait_for_task_start(
File "/usr/local/lib/python3.10/site-packages/prefect_aws/ecs.py", line 744, in _wait_for_task_start
for task in self._watch_task_run(
File "/usr/local/lib/python3.10/site-packages/prefect_aws/ecs.py", line 712, in _watch_task_run
task = ecs_client.describe_tasks(tasks=[task_arn], cluster=cluster_arn)[
IndexError: list index out of range
If we want to use an image from a task-def ARN, we must set image=None in the ECSTask class, but I got botocore.errorfactory.ClientException: An error occurred (ClientException) when calling the RegisterTaskDefinition operation: The container.image should not be null or empty.
This problem was resolved by changing the container name in the task-def to prefect. There was no mention of this in the documentation. I'm not sure if this will be a problem for some users.
Here is the ECS block code:
ecs = ECSTask(
    task_definition_arn="arn:aws:ecs:us-west-2:acc_no:task-definition/hello_world",
    cluster="Data-Pipelines",
    image=None,
    task_customizations=[{
        "op": "add",
        "path": "/networkConfiguration/awsvpcConfiguration/securityGroups",
        "value": [
            "sg-0d483b5145df02ea9"
        ]
    }],
    auto_deregister_task_definition=True,
    vpc_id=aws_vpc_id,
    stream_output=True,
    configure_cloudwatch_logs=True,
    task_start_timeout_seconds=120,
)
ecs.save("proto", overwrite=True)
Here is the task definition:
{
    "family": "hello_world",
    "executionRoleArn": "arn:aws:iam::acc_no:role/ecsTaskExecutionRole",
    "taskRoleArn": "arn:aws:iam::acc_no:role/ecsTaskRole",
    "networkMode": "awsvpc",
    "containerDefinitions": [{
        "name": "prefect",
        "image": "acc_no.dkr.ecr.aws_region.amazonaws.com/hello_world:latest",
        "essential": true,
        "logConfiguration": {
            "logDriver": "awslogs",
            "secretOptions": null,
            "options": {
                "awslogs-group": "/ecs/pipelines",
                "awslogs-region": "aws_region",
                "awslogs-stream-prefix": "ecs"
            }
        }
    }],
    "requiresCompatibilities": [
        "FARGATE"
    ],
    "runtimePlatform": {
        "operatingSystemFamily": "LINUX",
        "cpuArchitecture": "ARM64"
    },
    "cpu": "1024",
    "memory": "2048"
}
Deployment:
prefect deployment build main.py:maintenance -q smo-proto -a -n ecs-task-test -ib ecs-task/proto --path /app
and running the deployment using the UI
Attaching a screenshot of the task definitions it has been creating:
I have started working on Prefect with task-def v8; for every run it creates a new task-def.
@anna-geller Let me know if you need anything else
ECSTask sometimes creates a new task definition based on a copy of an existing task definition. To avoid task creation errors, ECSTask excludes several properties that sometimes get copied over from existing tasks. registeredAt is part of the exclusion list, but deregisteredAt is not, which causes errors when a copied task contains deregisteredAt (see the example below). To prevent this error, ECSTask should add deregisteredAt to its exclusion list.
Submission failed. botocore.exceptions.ParamValidationError: Parameter validation failed: Unknown parameter in input: "deregisteredAt", must be one of: family, taskRoleArn, executionRoleArn, networkMode, containerDefinitions, volumes, placementConstraints, requiresCompatibilities, cpu, memory, tags, pidMode, ipcMode, proxyConfiguration, inferenceAccelerators, ephemeralStorage, runtimePlatform
As the code stands, AWS credentials must be supplied (see issue #102).
It should be a fairly simple enhancement; e.g. I wrote my own s3_list_objects() and merely had to replace
s3_client = aws_credentials.get_boto3_session().client(...)
with
s3_client = boto3.client('s3', config=Config(signature_version=UNSIGNED))
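A self-contained sketch of that anonymous-access variant; the function name mirrors the one mentioned above and the return shape is illustrative:

import boto3
from botocore import UNSIGNED
from botocore.config import Config


def s3_list_objects(bucket: str, prefix: str = ""):
    # An unsigned client requires no AWS credentials, which is enough for
    # public buckets.
    s3_client = boto3.client("s3", config=Config(signature_version=UNSIGNED))
    response = s3_client.list_objects_v2(Bucket=bucket, Prefix=prefix)
    return response.get("Contents", [])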
When using an ECSTask, if you specify a task definition ARN, Prefect will replace its image with the base Prefect image unless you pass the image parameter to the ECSTask.
If I have prefect working using an aws s3 storage block, and then install prefect_aws, I can no longer use the aws s3 block.
Presumably this is due to some aws package incompatibilities. Here is an example of it occurring:
Starting from a clean virtual environment, on ubuntu 20.22
python3 -m venv testv
# make sure you modify .prefectignore to exclude this,
# otherwise you'll upload the whole venv!
source ./testv/bin/activate
pip install -U pip
pip install prefect s3fs
prefect deployment build ./log_flow.py:log_flow \
-n s3test \
-sb s3/scratchs3/ptut/ \
-q test \
--param name="test run"
This works fine.
Now, if I install prefect_aws:
pip install prefect_aws
prefect deployment build ./log_flow.py:log_flow \
-n s3test \
-sb s3/scratchs3/ptut/ \
-q test \
--param name="test run"
I get the following:
testv/lib/python3.9/site-packages/botocore/utils.py:1720: FutureWarning: The S3RegionRedirector class has been deprecated for a new internal replacement. A future version of botocore may remove this class.
warnings.warn(
Traceback (most recent call last):
File "/path/to/project/prefect_tutorial/testv/lib/python3.9/site-packages/s3fs/core.py", line 112, in _error_wrapper
return await func(*args, **kwargs)
File "/path/to/project/prefect_tutorial/testv/lib/python3.9/site-packages/aiobotocore/client.py", line 358, in _make_api_call
raise error_class(parsed_response, operation_name)
botocore.errorfactory.NoSuchBucket: An error occurred (NoSuchBucket) when calling the PutObject operation: The specified bucket does not exist
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/path/to/project/prefect_tutorial/testv/lib/python3.9/site-packages/prefect/cli/_utilities.py", line 41, in wrapper
return fn(*args, **kwargs)
File "/path/to/project/prefect_tutorial/testv/lib/python3.9/site-packages/prefect/utilities/asyncutils.py", line 230, in coroutine_wrapper
return run_async_in_new_loop(async_fn, *args, **kwargs)
File "/path/to/project/prefect_tutorial/testv/lib/python3.9/site-packages/prefect/utilities/asyncutils.py", line 181, in run_async_in_new_loop
return anyio.run(partial(__fn, *args, **kwargs))
File "/path/to/project/prefect_tutorial/testv/lib/python3.9/site-packages/anyio/_core/_eventloop.py", line 70, in run
return asynclib.run(func, *args, **backend_options)
File "/path/to/project/prefect_tutorial/testv/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 292, in run
return native_run(wrapper(), debug=debug)
File "/usr/lib/python3.9/asyncio/runners.py", line 44, in run
return loop.run_until_complete(main)
File "/usr/lib/python3.9/asyncio/base_events.py", line 647, in run_until_complete
return future.result()
File "/path/to/project/prefect_tutorial/testv/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 287, in wrapper
return await func(*args)
File "/path/to/project/prefect_tutorial/testv/lib/python3.9/site-packages/prefect/cli/deployment.py", line 1097, in build
deployment = await Deployment.build_from_flow(
File "/path/to/project/prefect_tutorial/testv/lib/python3.9/site-packages/prefect/deployments.py", line 764, in build_from_flow
await deployment.upload_to_storage(ignore_file=ignore_file)
File "/path/to/project/prefect_tutorial/testv/lib/python3.9/site-packages/prefect/deployments.py", line 609, in upload_to_storage
file_count = await self.storage.put_directory(
File "/path/to/project/prefect_tutorial/testv/lib/python3.9/site-packages/prefect/filesystems.py", line 494, in put_directory
return await self.filesystem.put_directory(
File "/path/to/project/prefect_tutorial/testv/lib/python3.9/site-packages/prefect/filesystems.py", line 370, in put_directory
self.filesystem.put_file(f, fpath, overwrite=True)
File "/path/to/project/prefect_tutorial/testv/lib/python3.9/site-packages/fsspec/asyn.py", line 115, in wrapper
return sync(self.loop, func, *args, **kwargs)
File "/path/to/project/prefect_tutorial/testv/lib/python3.9/site-packages/fsspec/asyn.py", line 100, in sync
raise return_result
File "/path/to/project/prefect_tutorial/testv/lib/python3.9/site-packages/fsspec/asyn.py", line 55, in _runner
result[0] = await coro
File "/path/to/project/prefect_tutorial/testv/lib/python3.9/site-packages/s3fs/core.py", line 1110, in _put_file
await self._call_s3(
File "/path/to/project/prefect_tutorial/testv/lib/python3.9/site-packages/s3fs/core.py", line 347, in _call_s3
return await _error_wrapper(
File "/path/to/project/prefect_tutorial/testv/lib/python3.9/site-packages/s3fs/core.py", line 139, in _error_wrapper
raise err
FileNotFoundError: The specified bucket does not exist
An exception occurred.
Only thing that changed was the aws/boto packages.
The difference before and after for me looks like:
13c13,14
< botocore==1.27.59
---
> boto3==1.26.86
> botocore==1.29.86
51a53,54
> mypy-boto3-s3==1.26.62
> mypy-boto3-secretsmanager==1.26.49
57a61
> prefect-aws==0.2.4
77a82
> s3transfer==0.6.0
So botocore upgraded, and some new packages.
My complete working (pre prefect_aws) environment looks like:
on 🌱 main [!+?⇡] via 🐍 v3.9.16 (.venv39) ❯ cat pre_prefect_aws.txt
aiobotocore==2.4.2
aiohttp==3.8.4
aioitertools==0.11.0
aiosignal==1.3.1
aiosqlite==0.18.0
alembic==1.10.1
anyio==3.6.2
apprise==1.3.0
asgi-lifespan==2.0.0
async-timeout==4.0.2
asyncpg==0.27.0
attrs==22.2.0
botocore==1.27.59
cachetools==5.3.0
certifi==2022.12.7
cffi==1.15.1
charset-normalizer==3.1.0
click==8.1.3
cloudpickle==2.2.1
colorama==0.4.6
coolname==2.2.0
croniter==1.3.8
cryptography==39.0.2
dateparser==1.1.7
docker==6.0.1
fastapi==0.93.0
frozenlist==1.3.3
fsspec==2023.3.0
google-auth==2.16.2
greenlet==2.0.2
griffe==0.25.5
h11==0.14.0
h2==4.1.0
hpack==4.0.0
httpcore==0.16.3
httpx==0.23.3
hyperframe==6.0.1
idna==3.4
importlib-metadata==6.0.0
Jinja2==3.1.2
jmespath==1.0.1
jsonpatch==1.32
jsonpointer==2.3
jsonschema==4.17.3
kubernetes==26.1.0
Mako==1.2.4
Markdown==3.4.1
markdown-it-py==2.2.0
MarkupSafe==2.1.2
mdurl==0.1.2
multidict==6.0.4
oauthlib==3.2.2
orjson==3.8.7
packaging==23.0
pathspec==0.11.0
pendulum==2.1.2
prefect==2.8.4
pyasn1==0.4.8
pyasn1-modules==0.2.8
pycparser==2.21
pydantic==1.10.5
Pygments==2.14.0
pyrsistent==0.19.3
python-dateutil==2.8.2
python-slugify==8.0.1
pytz==2022.7.1
pytz-deprecation-shim==0.1.0.post0
pytzdata==2020.1
PyYAML==6.0
readchar==4.0.3
regex==2022.10.31
requests==2.28.2
requests-oauthlib==1.3.1
rfc3986==1.5.0
rich==13.3.2
rsa==4.9
s3fs==2023.3.0
six==1.16.0
sniffio==1.3.0
SQLAlchemy==1.4.46
starlette==0.25.0
text-unidecode==1.3
toml==0.10.2
typer==0.7.0
typing_extensions==4.5.0
tzdata==2022.7
tzlocal==4.2
urllib3==1.26.14
uvicorn==0.20.0
websocket-client==1.5.1
websockets==10.4
wrapt==1.15.0
yarl==1.8.2
zipp==3.15.0
I didn't expect installing that package would break Prefect. I propose we work out what the conflicts are and either specify acceptable versions, or work out why it can no longer find the bucket and fix it.
Add cruft to the repo to allow synchronization of this collection with the original template.
Cruft can be added by running cruft link. Note that a starting commit will need to be specified; using the commit of the prefect-collection-template closest to the generation date of this repo is a good default.
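A sketch of the command, assuming cruft's link subcommand and its --checkout option; the template URL and placeholder commit should be verified against the cruft docs:
cruft link https://github.com/PrefectHQ/prefect-collection-template --checkout <starting-commit>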
Users would like to adopt Prefect 2 with task support for AWS services similar to Prefect 1. Specifically, there are requests for ClientWaiter and BatchSubmit.
Source here: https://github.com/PrefectHQ/prefect/tree/master/tests/tasks/aws
Hi, when I use the S3 Bucket (MinIO) block on Prefect I get an error like:
botocore.exceptions.NoCredentialsError: Unable to locate credentials
After some investigation, I found that this line is the culprit:
Line 305 in 7a17997
If I change this line to:
credentials: Optional[MinIOCredentials] = Field(
things will be OK.
Python version: 3.10.10
prefect version: 2.8.0
prefect-aws version: commit 7a17997
from prefect import flow, task
from prefect_aws.s3 import S3Bucket
from prefect_aws import MinIOCredentials


@flow
def my_favorite_function():
    minio_credentials_block = MinIOCredentials.load("testblock")
    s3_bucket_block = S3Bucket(
        bucket_name="testblock",
        minio_credentials=minio_credentials_block,
    )
    print(s3_bucket_block.list_objects(""))
    return "test"


print(my_favorite_function())
If I wanted to access a public S3 bucket (or needed to change something with the S3 client) using S3Bucket, there is no way to do so, because the calls to aws_credentials.get_boto3_session().client() don't pass in **aws_client_parameters.get_params_override() like, e.g., s3_download() does.
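A sketch of the suggested fix, forwarding the client parameter overrides the way s3_download does; the default-constructed blocks are for illustration only:

from prefect_aws import AwsClientParameters, AwsCredentials

aws_credentials = AwsCredentials()            # default credential chain, for illustration
aws_client_parameters = AwsClientParameters()

# Forward the overrides (endpoint_url, verify, config, ...) to the client.
s3_client = aws_credentials.get_boto3_session().client(
    "s3", **aws_client_parameters.get_params_override()
)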
Using ECSTask: if I leave the VPC ID configuration out of the infra block, the tasks will run in the default VPC.
However, for a managed VPC, if I set that ID, the agent reports the following botocore error:
botocore.errorfactory.InvalidParameterException: An error occurred (InvalidParameterException) when calling the RunTask operation: At least one security group must be supplied when specifying subnets that are owned by a different account.
There is currently no way to configure the security group/subnets in 2.4.0, which was possible in Prefect 1 through run_task_kwargs.
Ideally, I'd like to use the existing VPC by configuring the security group and subnets.
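A workaround sketch using the task_customizations JSON-patch mechanism shown elsewhere in this document; the VPC, security group, and subnet IDs are hypothetical:

from prefect_aws.ecs import ECSTask

ecs_task = ECSTask(
    vpc_id="vpc-0123456789abcdef0",
    task_customizations=[
        {
            "op": "add",
            "path": "/networkConfiguration/awsvpcConfiguration/securityGroups",
            "value": ["sg-0123456789abcdef0"],
        },
        {
            "op": "add",
            "path": "/networkConfiguration/awsvpcConfiguration/subnets",
            "value": ["subnet-0123456789abcdef0"],
        },
    ],
)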
Currently, we only expose the first three positional args, Fileobj, Bucket, and Key, but we should expose the other keyword arguments as well.
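For reference, a sketch of the kind of call the task could expose; ExtraArgs, Callback, and Config are the remaining upload_fileobj keyword arguments, and the bucket name is hypothetical:

import boto3
from boto3.s3.transfer import TransferConfig

s3_client = boto3.client("s3")

with open("data.bin", "rb") as f:
    s3_client.upload_fileobj(
        f,
        "my-bucket",
        "data.bin",
        ExtraArgs={"ServerSideEncryption": "AES256"},
        Config=TransferConfig(multipart_threshold=16 * 1024 * 1024),
    )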
When I try to use the ignore_file option in S3Bucket.put_directory, it skips files which should be included. As an example, I modified one test and simply added an empty .prefectignore file; no files are uploaded then.
async def test_put_directory_respects_local_path(
    s3_bucket: S3Bucket, tmp_path: Path, aws_creds_block
):
    (tmp_path / "file1.txt").write_text("FILE 1")
    (tmp_path / "file2.txt").write_text("FILE 2")
    (tmp_path / "folder1").mkdir()
    (tmp_path / "folder1" / "file3.txt").write_text("FILE 3")
    (tmp_path / "folder1" / "file4.txt").write_text("FILE 4")
    (tmp_path / "folder1" / "folder2").mkdir()
    (tmp_path / "folder1" / "folder2" / "file5.txt").write_text("FILE 5")
    (tmp_path / ".prefectignore").write_text("")  # ADDED

    uploaded_file_count = await s3_bucket.put_directory(
        local_path=str(tmp_path / "folder1"),
        ignore_file=str(tmp_path / ".prefectignore"),  # ADDED
    )
    assert uploaded_file_count == 3  # uploaded_file_count equals 0
Since s3.upload_fileobj is able to accept file-like objects, as seen in the docs, we should match that within our s3_upload task too.
At the moment, it converts the input data bytes into a BytesIO stream. We should change data from bytes to typing.IO or typing.BinaryIO.
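A sketch of the proposed change: accept any binary file-like object and only wrap raw bytes for backwards compatibility; the helper name is hypothetical:

import io
from typing import BinaryIO, Union


def _as_fileobj(data: Union[bytes, BinaryIO]) -> BinaryIO:
    # Wrap bytes in BytesIO for backwards compatibility; pass file-like
    # objects straight through so streaming uploads avoid an extra copy.
    return io.BytesIO(data) if isinstance(data, bytes) else data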
boto3 clients allow additional configuration, e.g. for retries, but modifying it on every client separately could lead to unnecessary repetition or a need to redeploy an agent.
We currently recommend doing that on the agent level via environment variables, but it would be great if there were an option to modify those settings without having to redeploy an agent, e.g. to be able to increase the number of retries or change the retry mode through a single config or custom block.
For instance, to allow client-side retries on ECS clients, we could pass this config on the ECSTask block:
def _get_session_and_client(self) -> Tuple[boto3.Session, _ECSClient]:
    """
    Retrieve a boto3 session and ECS client
    """
    boto_session = self.aws_credentials.get_boto3_session()
    config = Config(
        retries={
            'max_attempts': 10,
            'mode': 'standard'
        }
    )
    ecs_client = boto_session.client("ecs", config=config)
    return boto_session, ecs_client
However, users may want to globally configure retries for all failing AWS API calls (e.g. for S3, ECS and Batch clients).
Configure a separate block/class AwsConfig:
ECSTask(
    aws_client=AwsConfig(..., credentials=AwsCredentials(...))
)

# or:

def _get_session_and_client(self) -> Tuple[boto3.Session, _ECSClient]:
    """
    Retrieve a boto3 session and ECS client
    """
    boto_session = self.aws_credentials.get_boto3_session()
    ecs_client = boto_session.client("ecs", config=self.config)
    return boto_session, ecs_client
Related issue: PrefectHQ/prefect#7689
cc @madkinsz
@gabcoyne ran into the following error when attempting to use an S3Bucket block document when creating a deployment:
AttributeError: 'S3Bucket' object has no attribute 'put_directory'
It appears that the deployments functionality expects a put_directory method on storage blocks with the following signature:
async def put_directory(
    self, local_path: str = None, to_path: str = None, ignore_file: str = None
) -> None:
Additionally, a get_directory method is expected with the following signature:
async def get_directory(
    self, from_path: str = None, local_path: str = None
) -> None:
S3Bucket should be updated to conform to the expected interface.
If using a predefined task definition, you must set a container's name to "prefect" so we know which container in the definition should be used for the deployment of your flow.
This is not clear in the documentation. Prose documentation would be helpful here.
The _version.py file generated by versioneer wasn't committed when updating the repo to match the collection template. We need to make sure it's added.