
prefect-aws's People

Contributors

ahuang11, akdienes, anna-geller, chotalia, codiac-b, davzucky, dependabot[bot], desertaxle, discdiver, dominictarro, drfraser, j-tr, jakekaplan, jamsi, jawnsy, jeanluciano, kevingrismore, markbruning, mattconger, neumann-nico, nick-amplify, prefect-collection-synchronizer[bot], serinamarie, taylor-curran, tinvaan, tpdorsey, urimandujano, willraphaelson, zanieb, zzstoatzz


prefect-aws's Issues

ECS block should have an optional field for ephemeralStorage

Expectation / Proposal

The ECS block is missing a field for ephemeralStorage. It would be nice to have that added as an optional field. This is very useful when you want to raise the default size from 20 GB up to 200 GB for a Fargate instance.

Currently you have to add the value via the task definition, which is not documented in the prefect-aws docs or in Anna's examples:

task_definition = {"ephemeralStorage": {"sizeInGiB": disk}}
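
For context, a minimal sketch of how that override might be attached to the block (assuming the block's task_definition field accepts a partial task definition; the block names and the 200 GiB value are illustrative):

from prefect_aws.credentials import AwsCredentials
from prefect_aws.ecs import ECSTask

ecs = ECSTask(
    aws_credentials=AwsCredentials.load("my-aws-creds"),  # hypothetical saved block
    # Workaround: raise ephemeral storage via the raw task definition,
    # since there is no dedicated field on the block yet.
    task_definition={"ephemeralStorage": {"sizeInGiB": 200}},
)
ecs.save("ecs-with-larger-disk", overwrite=True)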

Traceback / Example

ECS Task creation timeout logging

A customer experiences intermittent failures with ECS task registration. The error raised is a timeout; more explicit errors would be desirable.

Document "prefect" container name requirement for `ECSTask`

If using a predefined task definition, you must set a container's name to "prefect" so we know which container in the definition should be used to run your flow.

This is not clear in the documentation. Prose documentation would be helpful here.

S3Bucket does not support custom AwsClientParameters

If I wanted to access a public S3 bucket (or needed to change something with the s3 client) using S3Bucket, there is no way to do so because the calls to aws_credentials.get_boto3_session().client() don't pass in **aws_client_parameters.get_params_override() like e.g. s3_download() does.
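
Roughly, the change being asked for is to thread the overrides through when building the client. The names below follow the description above and are not the actual prefect_aws source:

# Current behaviour (client parameter overrides are silently dropped):
s3_client = aws_credentials.get_boto3_session().client("s3")

# Requested behaviour (mirroring what s3_download already does):
params_override = aws_client_parameters.get_params_override()
s3_client = aws_credentials.get_boto3_session().client("s3", **params_override)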

Agent submission to ECS fails with `prepare_for_flow_run() got an unexpected keyword argument 'deployment'`

Using Prefect version 2.6.5.
Prefect Agent is running in ECS.
When the agent tries to submit a flow run using an ECSTask block, the submission fails.

The flow log does not contain anything; I got the error via an email notification.

State message: Submission failed. TypeError: Infrastructure.prepare_for_flow_run() got an unexpected keyword argument 'deployment'

Agent Logs:

11:59:54.322 | INFO | prefect.agent - Submitting flow run '<flow_id>'
12:00:00.194 | ERROR | prefect.agent - Failed to get infrastructure for flow run '<flow_id>'.
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/site-packages/prefect/agent.py", line 229, in submit_run
    infrastructure = await self.get_infrastructure(flow_run)
  File "/usr/local/lib/python3.10/site-packages/prefect/agent.py", line 218, in get_infrastructure
    prepared_infrastructure = infrastructure_block.prepare_for_flow_run(flow_run)
  File "/usr/local/lib/python3.10/site-packages/prefect_aws/ecs.py", line 485, in prepare_for_flow_run
    new = super().prepare_for_flow_run(flow_run, deployment=deployment, flow=flow)
TypeError: Infrastructure.prepare_for_flow_run() got an unexpected keyword argument 'deployment'

Same code was working until Friday 2022/12/02.

Enable passing boto3 client configuration from a generic config class to all AWS blocks

Problem

boto3 clients allow additional configuration, e.g. for retries, but modifying it on every client separately could lead to unnecessary repetition or a need to redeploy an agent.

We currently recommend doing that at the agent level via environment variables, but it would be great if there were an option to modify those settings without having to redeploy the agent, e.g. to increase the number of retries or change the retry mode by editing a single config or custom block.

For instance, to allow client-side retries on ECS clients, we could pass this config on the ECSTask block:

    def _get_session_and_client(self) -> Tuple[boto3.Session, _ECSClient]:
        """
        Retrieve a boto3 session and ECS client
        """
        boto_session = self.aws_credentials.get_boto3_session()
        # Requires: from botocore.config import Config
        config = Config(
            retries={
                "max_attempts": 10,
                "mode": "standard",
            }
        )

        ecs_client = boto_session.client("ecs", config=config)
        return boto_session, ecs_client

However, users may want to globally configure retries for all failing AWS API calls (e.g. for S3, ECS and Batch clients).

Possible solution/option to consider

Configure a separate block/class AwsConfig:

ECSTask(
  aws_client=AwsConfig(..., credentials=AwsCredentials(...))
)

# or:

def _get_session_and_client(self) -> Tuple[boto3.Session, _ECSClient]:
    """
    Retrieve a boto3 session and ECS client
    """
    boto_session = self.aws_credentials.get_boto3_session()
    ecs_client = boto_session.client("ecs", config=self.config)
    return boto_session, ecs_client

Related issue: PrefectHQ/prefect#7689

cc @madkinsz

Add collection sync workflow using cruft

Add cruft to the repo to allow synchronization of this collection with the original template. Cruft can be added by running cruft link. Note that a starting commit will need to be specified; using the commit of the prefect-collection-template closest to the generation date of this repo is a good default.

`read_path` and `write_path` fail when `credentials` attribute is used

Expectation / Proposal

I expect to be able to use the credentials field along with the write_path and read_path methods. When I use credentials, I get the following error: ValueError: S3 Bucket requires either a minio_credentialsfield or an aws_credentials field.

Traceback / Example

    def _get_bucket_resource(self) -> boto3.resource:
        """
        Retrieves boto3 resource object for the configured bucket
        """
        if self.minio_credentials:
            params_override = (
                self.minio_credentials.aws_client_parameters.get_params_override()
            )
            if "endpoint_url" not in params_override and self.endpoint_url:
                params_override["endpoint_url"] = self.endpoint_url
            bucket = (
                self.minio_credentials.get_boto3_session()
                .resource("s3", **params_override)
                .Bucket(self.bucket_name)
            )
    
        elif self.aws_credentials:
            params_override = (
                self.aws_credentials.aws_client_parameters.get_params_override()
            )
            bucket = (
                self.aws_credentials.get_boto3_session()
                .resource("s3", **params_override)
                .Bucket(self.bucket_name)
            )
        else:
>           raise ValueError(
                "S3 Bucket requires either a minio_credentials"
                "field or an aws_credentials field."
            )
E           ValueError: S3 Bucket requires either a minio_credentialsfield or an aws_credentials field.

Make S3Bucket._resolve_path Windows-friendly

path = str(Path(bucket_folder) / path) if bucket_folder else path

Read/write from Windows OS produced object keys with backslashes and no folder hierarchy. This was due to pathlib inferring backslashes on Windows. Backslashes are frowned upon in AWS Object key naming guidelines.

Explicitly make Path return a POSIX path string.

        # If basepath provided, it means we won't write to the root dir of
        # the bucket. So we need to add it on the front of the path.
        # 
        # AWS object key naming guidelines require '/'. Get POSIX path to prevent
        # Path from inferring '\' on Windows OS
        path = (Path(self.basepath) / path).as_posix() if self.basepath else path

Creates a new task-definition for every deployment run

Here is the ECS block code:

ecs = ECSTask(
    task_definition_arn="arn:aws:ecs:us-west-2:acc_no:task-definition/hello_world",
    cluster="Data-Pipelines",
    image=None,
    task_customizations=[{
        "op": "add",
        "path": "/networkConfiguration/awsvpcConfiguration/securityGroups",
        "value": [
            "sg-0d483b5145df02ea9"
        ]
    }],
    auto_deregister_task_definition=True,
    vpc_id=aws_vpc_id,
    stream_output=True,
    configure_cloudwatch_logs=True,
    task_start_timeout_seconds=120,
)
ecs.save("proto", overwrite=True)

Here is the task definition:

{
    "family": "hello_world",
    "executionRoleArn": "arn:aws:iam::acc_no:role/ecsTaskExecutionRole",
    "taskRoleArn": "arn:aws:iam::acc_no:role/ecsTaskRole",
    "networkMode": "awsvpc",
    "containerDefinitions": [{
        "name": "prefect",
        "image": "acc_no.dkr.ecr.aws_region.amazonaws.com/hello_world:latest",
        "essential": true,
        "logConfiguration": {
            "logDriver": "awslogs",
            "secretOptions": null,
            "options": {
                "awslogs-group": "/ecs/pipelines",
                "awslogs-region": "aws_region",
                "awslogs-stream-prefix": "ecs"
            }
        }
    }],
    "requiresCompatibilities": [
        "FARGATE"
    ],
    "runtimePlatform": {
        "operatingSystemFamily": "LINUX",
        "cpuArchitecture": "ARM64"
    },
    "cpu": "1024",
    "memory": "2048"
}

Deployment:
prefect deployment build main.py:maintenance -q smo-proto -a -n ecs-task-test -ib ecs-task/proto --path /app
and running the deployment using the UI

Attaching a screenshot of the task definitions it has been creating. I started working with Prefect at task definition revision 8, and every run since has created a new revision.

@anna-geller Let me know if you need anything else.

IndexError: list index out of range occurs intermittently when utilizing the ECSTask block

Intermittently, when running a flow using the ECSTask block, the run appears to fail with this error in the agent logs: IndexError: list index out of range. This seems to occur before the flow run is actually submitted, so it doesn't appear to be tied to the flow code itself.

Agent Logs:

09:00:00.595 | ERROR | prefect.agent - Failed to submit flow run 'XXXXXXXXXXX' to infrastructure.
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/site-packages/prefect/agent.py", line 257, in _submit_run_and_capture_errors
    result = await infrastructure.run(task_status=task_status)
  File "/usr/local/lib/python3.10/site-packages/prefect_aws/ecs.py", line 456, in run
    ) = await run_sync_in_worker_thread(
  File "/usr/local/lib/python3.10/site-packages/prefect/utilities/asyncutils.py", line 68, in run_sync_in_worker_thread
    return await anyio.to_thread.run_sync(call, cancellable=True)
  File "/usr/local/lib/python3.10/site-packages/anyio/to_thread.py", line 31, in run_sync
    return await get_asynclib().run_sync_in_worker_thread(
  File "/usr/local/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 937, in run_sync_in_worker_thread
    return await future
  File "/usr/local/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 867, in run
    result = context.run(func, *args)
  File "/usr/local/lib/python3.10/site-packages/prefect_aws/ecs.py", line 561, in _create_task_and_wait_for_start
    self._wait_for_task_start(
  File "/usr/local/lib/python3.10/site-packages/prefect_aws/ecs.py", line 744, in _wait_for_task_start
    for task in self._watch_task_run(
  File "/usr/local/lib/python3.10/site-packages/prefect_aws/ecs.py", line 712, in _watch_task_run
    task = ecs_client.describe_tasks(tasks=[task_arn], cluster=cluster_arn)[
IndexError: list index out of range

Document how to use `S3Bucket` as storage for a deployment

Now that S3Bucket implements the interfaces necessary to be used as deployment storage, we should document how to use S3Bucket in a deployment.

Scenarios that are important to document (a sketch of the Python-based scenario follows the list):

  • CLI based deployment
  • Python based deployment
  • Using MinIO
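
For reference, a Python-based deployment using S3Bucket as storage might look roughly like the sketch below (the flow import, block slug, and deployment name are placeholders; the exact call should be confirmed against the docs once written):

from prefect.deployments import Deployment
from prefect_aws.s3 import S3Bucket

from my_project.flows import my_flow  # placeholder import

deployment = Deployment.build_from_flow(
    flow=my_flow,
    name="my-flow-s3-storage",
    storage=S3Bucket.load("my-s3-block"),  # previously saved S3Bucket block
)
deployment.apply()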

Error while monitoring ECS task due to missing log stream

botocore.errorfactory.ResourceNotFoundException: An error occurred (ResourceNotFoundException)
when calling the GetLogEvents operation: The specified log stream does not exist.


Replicating a report from a user in the community: https://prefect-community.slack.com/archives/CL09KU1K7/p1671026163262599

I've seen a couple of reports of this. Perhaps the log stream takes a moment to be created? It'd be good to retry here. Without more details it's unclear whether this is an issue with how the users are configuring their task definitions.
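
A minimal retry sketch for the monitoring code (the function and parameter names here are illustrative, not the actual prefect_aws internals):

import time

from botocore.exceptions import ClientError


def get_log_events_with_retry(logs_client, log_group, log_stream, attempts=5, delay=2):
    """Retry GetLogEvents while the log stream is still being created."""
    for _ in range(attempts):
        try:
            return logs_client.get_log_events(
                logGroupName=log_group, logStreamName=log_stream
            )
        except ClientError as exc:
            if exc.response["Error"]["Code"] != "ResourceNotFoundException":
                raise
            time.sleep(delay)
    raise RuntimeError(f"Log stream {log_stream!r} did not appear after {attempts} attempts")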

_version.py is missing

The _version.py file generated by versioneer wasn't committed when updating the repo to match the collection template. Need to make sure it's added.

If container name in task-def is not prefect throws Client Exception

If we want to use an image from a task definition ARN, we must set image=None in the ECSTask class, but I got botocore.errorfactory.ClientException: An error occurred (ClientException) when calling the RegisterTaskDefinition operation: The container.image should not be null or empty.
This problem was resolved by changing the container name in the task definition to prefect. There is no mention of this in the documentation, and I'm not sure if this will be a problem for some users.

Infer container definition name instead of requiring 'prefect'

First check

  • I added a descriptive title to this issue.
  • I used the GitHub search to find a similar issue and didn't find it.
  • I searched the Prefect documentation for this issue.
  • I checked that this issue is related to Prefect and not one of its dependencies.

Bug summary

When you create an ECS block and add a task definition, by default Prefect adds a second container definition called "prefect". The system should not add the default prefect container definition if you add your own container definition with the appropriate values.

The other issue is that when you add a value to the name field, the container in the task definition is still named "prefect" instead of the name you entered.

{
    "containerDefinitions": [
        {
            "name": "test_util_ecs_job",
            "image": "xxx.dkr.ecr.eu-west-1.amazonaws.com/test_util",
            "cpu": 0,
            "portMappings": [],
            "essential": true,
            "environment": [
                {
                    "name": "test",
                    "value": "TEST"
                }
            ],
            "mountPoints": [],
            "volumesFrom": []
        },
        {
            "name": "prefect",
            "image": "xxx.ecr.eu-west-1.amazonaws.com/test_util",
            "cpu": 0,
            "portMappings": [],
            "essential": true,
            "environment": [],
            

Reproduction

1. Create an ECS block, 
2. Add task definition (I didn't use the task arn field for this one)
3. Create a flow run
4. Examine the task definition and see that the default prefect definition is still present

Error

No response

Versions

Version:
Prefect 2.7.8
Python: 3.11.1
Prefect-aws: 0.2.3

Additional context

No response

Support public S3 buckets

The way the code is, AWS credentials must be supplied (see issue #102)

It should be a fairly simple enhancement - e.g. I wrote my own s3_list_objects() and merely had to replace

s3_client = aws_credentials.get_boto3_session().client(...)

with

# requires: from botocore import UNSIGNED; from botocore.config import Config
s3_client = boto3.client('s3', config=Config(signature_version=UNSIGNED))

Flow using ECSTask Infra Block remains in Pending state forever

During my migration to 2.0 I have noticed that if multiple flows are scheduled at the same time (e.g. they all have a schedule of 0 * * * *), only one or two of the flows will run, and the rest will remain in the Pending state forever.

I currently have one agent and one queue setup.

I am going to explore and see if setting up multiple agents may solve the problem.

'Unknown Parameter' Error in ECS when a Task Definition is Registered

When running a deployment that uses an ECSTask block, the Prefect agent is able to successfully retrieve the associated task definition; however, upon registering the task definition using the returned JSON object, a ParamValidationError is thrown from the underlying botocore library for the following fields in the task definition JSON: requiresAttributes, registeredAt, registeredBy.

Stack trace from the Prefect 2 agent:

08:33:19.822 | INFO | prefect.agent - Completed submission of flow run 'ad1f930d-875a-4878-99ed-36641a16f788'
08:33:19.821 | ERROR | prefect.agent - Failed to submit flow run 'ad1f930d-875a-4878-99ed-36641a16f788' to infrastructure.
Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/prefect/agent.py", line 233, in _submit_run_and_capture_errors
    result = await infrastructure.run(task_status=task_status)
  File "/usr/local/lib/python3.9/site-packages/prefect_aws/ecs.py", line 362, in run
    ) = await run_sync_in_worker_thread(
  File "/usr/local/lib/python3.9/site-packages/prefect/utilities/asyncutils.py", line 57, in run_sync_in_worker_thread
    return await anyio.to_thread.run_sync(call, cancellable=True)
  File "/usr/local/lib/python3.9/site-packages/anyio/to_thread.py", line 31, in run_sync
    return await get_asynclib().run_sync_in_worker_thread(
  File "/usr/local/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 937, in run_sync_in_worker_thread
    return await future
  File "/usr/local/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 867, in run
    result = context.run(func, *args)
  File "/usr/local/lib/python3.9/site-packages/prefect_aws/ecs.py", line 441, in _create_task_and_wait_for_start
    task_definition_arn = self._register_task_definition(
  File "/usr/local/lib/python3.9/site-packages/prefect_aws/ecs.py", line 818, in _register_task_definition
    response = ecs_client.register_task_definition(**task_definition_request)
  File "/usr/local/lib/python3.9/site-packages/botocore/client.py", line 508, in _api_call
    return self._make_api_call(operation_name, kwargs)
  File "/usr/local/lib/python3.9/site-packages/botocore/client.py", line 878, in _make_api_call
    request_dict = self._convert_to_request_dict(
  File "/usr/local/lib/python3.9/site-packages/botocore/client.py", line 939, in _convert_to_request_dict
    request_dict = self._serializer.serialize_to_request(
  File "/usr/local/lib/python3.9/site-packages/botocore/validate.py", line 381, in serialize_to_request
    raise ParamValidationError(report=report.generate_report())
botocore.exceptions.ParamValidationError: Parameter validation failed:
Unknown parameter in input: "requiresAttributes", must be one of: family, taskRoleArn, executionRoleArn, networkMode, containerDefinitions, volumes, placementConstraints, requiresCompatibilities, cpu, memory, tags, pidMode, ipcMode, proxyConfiguration, inferenceAccelerators, ephemeralStorage, runtimePlatform
Unknown parameter in input: "registeredAt", must be one of: family, taskRoleArn, executionRoleArn, networkMode, containerDefinitions, volumes, placementConstraints, requiresCompatibilities, cpu, memory, tags, pidMode, ipcMode, proxyConfiguration, inferenceAccelerators, ephemeralStorage, runtimePlatform
Unknown parameter in input: "registeredBy", must be one of: family, taskRoleArn, executionRoleArn, networkMode, containerDefinitions, volumes, placementConstraints, requiresCompatibilities, cpu, memory, tags, pidMode, ipcMode, proxyConfiguration, inferenceAccelerators, ephemeralStorage, runtimePlatform
08:33:19.819 | DEBUG | prefect.infrastructure.ecs-task - Task definition payload
...
08:33:19.813 | INFO | prefect.infrastructure.ecs-task - ECSTask 'cocky-bulldog': Registering task definition...
08:33:19.765 | INFO | prefect.infrastructure.ecs-task - ECSTask 'cocky-bulldog': Retrieving task definition 'manual_hello_world_test'...

It appears to be an issue on the AWS SDK side where some fields in the response from describe-task-definition are not compatible in a call to register-task-definition (aws/aws-sdk#38, aws/aws-sdk#406). The current workaround appears to be removing these fields when registering the task.

Steps to Reproduce:
We are currently defining our ECS task definitions within Terraform; however, I have manually created a new ECS task from within the ECS GUI. Viewing the Overview for this task in the GUI and clicking on the JSON tab, you can see that requiresAttributes, registeredAt and registeredBy are all defined.

Our ECSTask block is defined with the name of our ECS cluster, and we use what AWS calls the 'Task definition family' for the 'task_definition_arn' argument, which is correctly translated to the full ARN.

There doesn't appear to be a fix coming from the AWS SDK side, so this might need to be handled on the Prefect side.
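
A minimal sketch of that Prefect-side handling, assuming the fields to drop are the ones the validation error above complains about (plus the other response-only fields absent from its list of allowed parameters):

# Fields returned by describe_task_definition that register_task_definition
# rejects (they do not appear in the allowed-parameter list in the error above).
RESPONSE_ONLY_FIELDS = {
    "requiresAttributes",
    "registeredAt",
    "registeredBy",
    "taskDefinitionArn",
    "revision",
    "status",
    "compatibilities",
}


def strip_response_only_fields(task_definition: dict) -> dict:
    """Return a copy of the task definition that is safe to re-register."""
    return {k: v for k, v in task_definition.items() if k not in RESPONSE_ONLY_FIELDS}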

`S3Bucket` is incompatible with deployments

@gabcoyne ran into the following error when attempting to use an S3Bucket block document when creating a deployment:

AttributeError: 'S3Bucket' object has no attribute 'put_directory'

It appears that the deployments functionality expects a put_directory method on storage blocks with the following signature:

async def put_directory(
        self, local_path: str = None, to_path: str = None, ignore_file: str = None
    ) -> None:

Additionally, a get_directory method is expected with the following signature:

async def get_directory(
        self, from_path: str = None, local_path: str = None
    ) -> None:

S3Bucket should be updated to conform to the expected interface.

ECS Task: Specify pre-existing log group

When enabling CloudWatch logs for flows executing via an ECSTask infra block, it generates a new CloudWatch log group named "prefect". I would like to provide a reference to an existing log group and specify the log prefix for new streams.

Add SNS Notify Block

Proposal

It would be great to be able to use AWS's Simple Notification Service (SNS) to deliver Prefect notifications. Specifically, I'd like to be able to publish a message to a SNS topic as a notify capability. In order to be ultimately useful, this block would need to be usable in a Cloud automation.

Example

from prefect_aws.sns import SNS


def create_block():
    block = SNS(
        topic_arn="arn:aws:sns:us-east-1:123456789123:PrefectNotifications"
    )
    block.save("prefect-notifications")


def send_message(message: str):
    block = SNS.load("prefect-notifications")
    block.notify(message)

ECR registry block does not work.

ECR registry block's docker client throws a 404 not authenticated error on image pull. I know there were bugs with private registries having their own separate docker client that did not log in, but that seems fixed. This issue seems unresolved for ECR. I need the ECR block for running a local, on-prem agent that pulls from ECR.

Using ignore_file in S3Bucket.put_directory skips files which should be uploaded

When I try to use the ignore_file option in S3Bucket.put_directory, it skips files that should be included. As an example, I modified one test and simply added an empty .prefectignore file; no files are uploaded then.

async def test_put_directory_respects_local_path(
    s3_bucket: S3Bucket, tmp_path: Path, aws_creds_block
):
    (tmp_path / "file1.txt").write_text("FILE 1")
    (tmp_path / "file2.txt").write_text("FILE 2")
    (tmp_path / "folder1").mkdir()
    (tmp_path / "folder1" / "file3.txt").write_text("FILE 3")
    (tmp_path / "folder1" / "file4.txt").write_text("FILE 4")
    (tmp_path / "folder1" / "folder2").mkdir()
    (tmp_path / "folder1" / "folder2" / "file5.txt").write_text("FILE 5")
    (tmp_path / ".prefectignore").write_text("") # ADDED

    uploaded_file_count = await s3_bucket.put_directory(
        local_path=str(tmp_path / "folder1"),
        ignore_file=str(tmp_path / ".prefectignore"), # ADDED
    )
    assert uploaded_file_count == 3  # uploaded_file_count equals 0 

When using S3Bucket to deploy a Flow, the basepath is used twice

Previously I used S3(bucket_path="s3-bucket/my-dir") for deployments which worked as expected.

Now I use S3Bucket(bucket_name="s3-bucket", basepath="my-dir") and the files are saved under s3-bucket/my-dir/my-dir instead of s3-bucket/my-dir.

When deploying a flow with S3Bucket the put_directory method is called with to_path=None which is then resolved to basepath:

# put_directory
if to_path is None:
    to_path = str(self.basepath) if self.basepath is not None else ""

put_directory then calls the write_path method, which resolves the path again:

# write_path
path = self._resolve_path(path)

But path already includes the basepath

# _resolve_path
path = str(Path(self.basepath) / path) if self.basepath else path

I tried to change it in either put_directory or write_path, but then the other tests fail.

Maybe S3Bucket should not use basepath directly and should instead use a dedicated parameter, like S3 did with bucket_path? Might it also be worth including tests for deployments that use S3Bucket?

I am looking forward to the discussion and would like to take part in the solution.

Flows Using ECS Task infrastructure block crash when paused if using reschedule=True

When using the pause_flow_run function with reschedule=True on a flow with an ECS Task infrastructure block, the expectation is that, when paused, the ECS task will be deprovisioned and the flow will remain in a Paused state. When resumed, a new ECS task will be provisioned to run the remaining flow tasks (similar to raise PAUSE in 1.0).

What I am seeing is that when the flow hits the Paused state the ECS task deprovisions and then the flow moves from a Paused state to a Crashed state. The final State Message is "Flow run infrastructure exited with non-zero status code -1.". The last log message on the ECS task is "21:55:46.456 | INFO | prefect.engine - Engine execution of flow run '09d19369-66ad-4f96-837a-09b713e3ff46' is paused:"

I am using an ECS Task infrastructure block on Fargate. There doesn't seem to be any error, other than that Cloud expects the infrastructure to still be present and it is not.

Simple example flow code I am using:

from prefect import get_run_logger
from prefect import flow, task, pause_flow_run

@task(name="First Task", persist_result=True)
def first_task():
    logger=get_run_logger()
    logger.info('first task')
    return None
    
@task(name="Second Task", persist_result=True)
def second_task():
    logger=get_run_logger()
    logger.info('second task')
    return None

@flow(name="Test pause", result_storage='s3/prefect-results')
def test_pause_flow():
    first_result = first_task()
    pause_flow_run(reschedule=True, timeout=64800)
    second_result = second_task(wait_for=[first_result])

The ECS task definition was created using this terraform: https://github.com/PrefectHQ/prefect-recipes/tree/main/devops/infrastructure-as-code/aws/tf-prefect2-ecs-agent

Prefect deployment expecting string on basepath not `pathlib.Path`.

Overview

This is similar to an issue found in the prefect repo (PrefectHQ/prefect#7583), where an AttributeError is encountered when trying to build a deployment with the s3-bucket storage block type. As I mentioned in the other issue, the S3Bucket basepath attribute is of type pathlib.Path, not str, which I think caused the error:

basepath: Optional[Path] = Field(
    default=None,
    description="Location to write to and read from in the S3 bucket. Defaults to "
    "the root of the bucket.",
)

Accept file-like objects in `s3_upload`

Since s3.upload_fileobj is able to accept file-like objects as seen in the docs, we should match that within our s3_upload task too.

At the moment, it converts input data bytes into a BytesIO stream. We should:

  1. expand the type annotation of data from bytes to typing.IO or typing.BinaryIO
  2. detect whether it's a file-like object; if so, pass it directly into the underlying boto function, else convert it into a BytesIO (see the sketch below)
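
A minimal sketch of that detection step (the helper name is made up):

from io import BytesIO
from typing import BinaryIO, Union


def as_file_object(data: Union[bytes, BinaryIO]) -> BinaryIO:
    """Pass file-like objects straight through; wrap raw bytes in a BytesIO stream."""
    return data if hasattr(data, "read") else BytesIO(data)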

Add ability to override S3 endpoint_url

There are a lot of alternatives to AWS S3, like MinIO, which are used in the enterprise. To allow this module to work with them, it would be nice to be able to pass the S3 tasks a dictionary that is used when initializing the S3 client, as described in the boto3 docs.

This would also allow setting up certificate verification when using a custom CA bundle. https://github.com/boto/boto3/blob/develop/boto3/session.py.
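
For illustration, this is the kind of client configuration the issue is asking to expose (the endpoint and CA bundle paths are placeholders):

import boto3

s3_client = boto3.client(
    "s3",
    endpoint_url="https://minio.example.internal:9000",  # placeholder MinIO endpoint
    verify="/etc/ssl/certs/custom-ca-bundle.pem",  # placeholder custom CA bundle
)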

If you are happy with that change, I can create a PR for it.

Add deployment storage compatible `CodeCommitRepository` block

Create a CodeCommitRepository block that can be used as remote storage for a deployment. The block should follow a similar pattern to the GitHub remote storage block and have an AwsCredentials attribute that can be used for authentication.

This particular block was requested during a PACC course.

Prefect-aws==0.1.2 has a botocore==1.24.21 on Pipenv

Hey Prefect team!

I've stumbled upon this Pipenv issue. When trying to execute the following:
pipenv install prefect-aws

It installs v0.1.2, but when pipenv generates the lockfile, it pins botocore==1.24.21 as a subdependency. I see that in this repo it should be botocore==1.27.53. Perhaps there's a mismatch between this repo and the PyPI release? The v0.1.2 version on PyPI was last updated Aug 3, but the requirements.txt in this repo was last updated Aug 18.

Thanks in advance.

s3 exception when no credentials set

It appears as though a cryptic error is thrown if no credentials are defined in an S3 block and no credentials are defined locally either:

Traceback (most recent call last):
  File "/home/user/.cache/pypoetry/py3.10/lib/python3.10/site-packages/aiohttp/connector.py", line 986, in _wrap_create_connection
    return await self._loop.create_connection(*args, **kwargs)  # type: ignore[return-value]  # noqa
  File "/usr/lib/python3.10/asyncio/base_events.py", line 1049, in create_connection
    sock = await self._connect_sock(
  File "/usr/lib/python3.10/asyncio/base_events.py", line 960, in _connect_sock
    await self.sock_connect(sock, address)
  File "/usr/lib/python3.10/asyncio/selector_events.py", line 499, in sock_connect
    return await fut
asyncio.exceptions.CancelledError

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/user/.cache/pypoetry/py3.10/lib/python3.10/site-packages/aiohttp/client.py", line 535, in _request
    conn = await self._connector.connect(
  File "/home/user/.cache/pypoetry/py3.10/lib/python3.10/site-packages/aiohttp/connector.py", line 542, in connect
    proto = await self._create_connection(req, traces, timeout)
  File "/home/user/.cache/pypoetry/py3.10/lib/python3.10/site-packages/aiohttp/connector.py", line 907, in _create_connection
    _, proto = await self._create_direct_connection(req, traces, timeout)
  File "/home/user/.cache/pypoetry/py3.10/lib/python3.10/site-packages/aiohttp/connector.py", line 1175, in _create_direct_connection
    transp, proto = await self._wrap_create_connection(
  File "/home/user/.cache/pypoetry/py3.10/lib/python3.10/site-packages/aiohttp/connector.py", line 985, in _wrap_create_connection
    async with ceil_timeout(timeout.sock_connect):
  File "/home/user/.cache/pypoetry/py3.10/lib/python3.10/site-packages/async_timeout/__init__.py", line 129, in __aexit__
    self._do_exit(exc_type)
  File "/home/user/.cache/pypoetry/py3.10/lib/python3.10/site-packages/async_timeout/__init__.py", line 212, in _do_exit
    raise asyncio.TimeoutError
asyncio.exceptions.TimeoutError

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/user/.cache/pypoetry/py3.10/lib/python3.10/site-packages/aiobotocore/httpsession.py", line 178, in send
    response = await self._session.request(
  File "/home/user/.cache/pypoetry/py3.10/lib/python3.10/site-packages/aiohttp/client.py", line 539, in _request
    raise ServerTimeoutError(
aiohttp.client_exceptions.ServerTimeoutError: Connection timeout to host http://169.254.169.254/latest/api/token

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/user/.cache/pypoetry/py3.10/lib/python3.10/site-packages/prefect/cli/_utilities.py", line 41, in wrapper
    return fn(*args, **kwargs)
  File "/home/user/.cache/pypoetry/py3.10/lib/python3.10/site-packages/prefect/utilities/asyncutils.py", line 212, in wrapper
    return run_async_in_new_loop(async_fn, *args, **kwargs)
  File "/home/user/.cache/pypoetry/py3.10/lib/python3.10/site-packages/prefect/utilities/asyncutils.py", line 141, in run_async_in_new_loop
    return anyio.run(partial(__fn, *args, **kwargs))
  File "/home/user/.cache/pypoetry/py3.10/lib/python3.10/site-packages/anyio/_core/_eventloop.py", line 70, in run
    return asynclib.run(func, *args, **backend_options)
  File "/home/user/.cache/pypoetry/py3.10/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 292, in run
    return native_run(wrapper(), debug=debug)
  File "/usr/lib/python3.10/asyncio/runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "/usr/lib/python3.10/asyncio/base_events.py", line 646, in run_until_complete
    return future.result()
  File "/home/user/.cache/pypoetry/py3.10/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 287, in wrapper
    return await func(*args)
  File "/home/user/.cache/pypoetry/py3.10/lib/python3.10/site-packages/prefect/cli/deployment.py", line 577, in build
    deployment = await Deployment.build_from_flow(
  File "/home/user/.cache/pypoetry/py3.10/lib/python3.10/site-packages/prefect/deployments.py", line 598, in build_from_flow
    await deployment.upload_to_storage()
  File "/home/user/.cache/pypoetry/py3.10/lib/python3.10/site-packages/prefect/deployments.py", line 465, in upload_to_storage
    file_count = await self.storage.put_directory(
  File "/home/user/.cache/pypoetry/py3.10/lib/python3.10/site-packages/prefect/filesystems.py", line 459, in put_directory
    return await self.filesystem.put_directory(
  File "/home/user/.cache/pypoetry/py3.10/lib/python3.10/site-packages/prefect/filesystems.py", line 342, in put_directory
    self.filesystem.put_file(f, fpath, overwrite=True)
  File "/home/user/.cache/pypoetry/py3.10/lib/python3.10/site-packages/fsspec/asyn.py", line 111, in wrapper
    return sync(self.loop, func, *args, **kwargs)
  File "/home/user/.cache/pypoetry/py3.10/lib/python3.10/site-packages/fsspec/asyn.py", line 96, in sync
    raise return_result
  File "/home/user/.cache/pypoetry/py3.10/lib/python3.10/site-packages/fsspec/asyn.py", line 53, in _runner
    result[0] = await coro
  File "/home/user/.cache/pypoetry/py3.10/lib/python3.10/site-packages/s3fs/core.py", line 1015, in _put_file
    await self._call_s3(
  File "/home/user/.cache/pypoetry/py3.10/lib/python3.10/site-packages/s3fs/core.py", line 325, in _call_s3
    await self.set_session()
  File "/home/user/.cache/pypoetry/py3.10/lib/python3.10/site-packages/s3fs/core.py", line 473, in set_session
    self._s3 = await s3creator.__aenter__()
  File "/home/user/.cache/pypoetry/py3.10/lib/python3.10/site-packages/aiobotocore/session.py", line 22, in __aenter__
    self._client = await self._coro
  File "/home/user/.cache/pypoetry/py3.10/lib/python3.10/site-packages/aiobotocore/session.py", line 102, in _create_client
    credentials = await self.get_credentials()
  File "/home/user/.cache/pypoetry/py3.10/lib/python3.10/site-packages/aiobotocore/session.py", line 133, in get_credentials
    self._credentials = await (self._components.get_component(
  File "/home/user/.cache/pypoetry/py3.10/lib/python3.10/site-packages/aiobotocore/credentials.py", line 814, in load_credentials
    creds = await provider.load()
  File "/home/user/.cache/pypoetry/py3.10/lib/python3.10/site-packages/aiobotocore/credentials.py", line 486, in load
    metadata = await fetcher.retrieve_iam_role_credentials()
  File "/home/user/.cache/pypoetry/py3.10/lib/python3.10/site-packages/aiobotocore/utils.py", line 175, in retrieve_iam_role_credentials
    token = await self._fetch_metadata_token()
  File "/home/user/.cache/pypoetry/py3.10/lib/python3.10/site-packages/aiobotocore/utils.py", line 88, in _fetch_metadata_token
    response = await session.send(request.prepare())
  File "/home/user/.cache/pypoetry/py3.10/lib/python3.10/site-packages/aiobotocore/httpsession.py", line 210, in send
    raise ConnectTimeoutError(endpoint_url=request.url, error=e)
botocore.exceptions.ConnectTimeoutError: Connect timeout on endpoint URL: "http://169.254.169.254/latest/api/token"
An exception occurred.

ECSTask - Cannot specify a VPC that is owned by another AWS account

Using ECSTask - If I leave VPC Id configuration out of the infra block, then the tasks will run in the default VPC.

However, for a managed VPC, if I set that ID the agent reports the following botocore error:

botocore.errorfactory.InvalidParameterException:
An error occurred (InvalidParameterException) when calling the RunTask operation: 
At least one security group must be supplied when specifying subnets that are owned by a different account.

There is currently (2.4.0) no way to configure the security group/subnets, which was possible in Prefect 1 through run_task_kwargs.

Ideally, I'd like to use the existing VPC by configuring the security group and subnets.

Reduce chances of hitting AWS ECS Rate Limits

AWS will rate limit task definition registration for a given task family. There are two factors that make it easy for a relatively low volume of Prefect flows to hit rate limits:

  1. All Prefect flows are registered under the same task family "prefect" defined here. This is relevant because the error raised is RegisterTaskDefinition operation: Too many concurrent attempts to create a new revision of the specified family.
  2. Task definitions create a new log stream for each task created here, which in turn means that each flow run has a new task definition.

Solution:
I feel like creating task families per deployment will be the easiest way around this, as it reduces the number of registrations for a given family. However, this does not fully resolve the issue: it is nice to see a new log stream for every flow run, but the need to register a task definition for each run still leaves a chance of hitting rate limits.

ECSTask creates new AWS ECS task definition revision every time a flow run is created from a deployment

Using the _retrieve_task_definition classmethod to fetch a task definition from AWS renders the output below. There are six fields that are recreated every time a new task definition is registered, which causes the diffing logic to detect drift every single time: registeredBy, registeredAt, requiresAttributes, placementConstraints, status and revision. If these fields are stripped from the retrieved task definition, we can effectively reduce the need to register new task definitions and prevent AWS rate limits and other strange boto behavior:

{'taskDefinitionArn': 'arn:aws:ecs:us-east-2:455346737763:task-definition/prefect-agent-prod-fargate-prefect-agent:2',
 'containerDefinitions': [{'name': 'prefect-agent-prod-fargate-prefect-agent',
   'image': 'prefecthq/prefect:2-python3.10',
   'cpu': 1024,
   'memory': 2048,
   'portMappings': [],
   'essential': True,
   'command': ['prefect', 'agent', 'start', '-q', 'prod-ecs'],
   'environment': [{'name': 'PREFECT_API_URL',
     'value': 'https://api.prefect.cloud/api/accounts/0ff44498-d380-4d7b-bd68-9b52da03823f/workspaces/bb3005b9-99c6-4289-802b-6cfbfcffc5c0'},
    {'name': 'EXTRA_PIP_PACKAGES', 'value': 'prefect-aws s3fs'}],
   'mountPoints': [],
   'volumesFrom': [],
   'secrets': [{'name': 'PREFECT_API_KEY',
     'valueFrom': 'OBFUSCATED'}],
   'logConfiguration': {'logDriver': 'awslogs',
    'options': {'awslogs-group': 'prefect-agent-log-group-prod',
     'awslogs-region': 'us-east-2',
     'awslogs-stream-prefix': 'prefect-agent-prod-fargate-prefect-agent'}}}],
 'family': 'prefect-agent-prod-fargate-prefect-agent',
 'executionRoleArn': 'arn:aws:iam::455346737763:role/prefect-agent-execution-role-prod',
 'networkMode': 'awsvpc',
 'revision': 2,
 'volumes': [],
 'status': 'ACTIVE',
 'requiresAttributes': [{'name': 'com.amazonaws.ecs.capability.logging-driver.awslogs'},
  {'name': 'ecs.capability.execution-role-awslogs'},
  {'name': 'com.amazonaws.ecs.capability.docker-remote-api.1.19'},
  {'name': 'ecs.capability.secrets.asm.environment-variables'},
  {'name': 'com.amazonaws.ecs.capability.docker-remote-api.1.18'},
  {'name': 'ecs.capability.task-eni'}],
 'placementConstraints': [],
 'compatibilities': ['EC2', 'FARGATE'],
 'requiresCompatibilities': ['FARGATE'],
 'cpu': '1024',
 'memory': '2048',
 'registeredAt': datetime.datetime(2022, 12, 19, 16, 36, 58, 506000, tzinfo=tzlocal()),
 'registeredBy': 'arn:aws:sts::455346737763:assumed-role/AWSReservedSSO_AdministratorAccess_fe1980cf6870d6ed/[email protected]'}

Fix failing ECS task tests

Tests are failing in CI with the following error:



__________________________ test_launch_types[FARGATE] __________________________

aws_credentials = AwsCredentials(aws_access_key_id='access_key_id', aws_secret_access_key=SecretStr('**********'), aws_session_token=None, profile_name=None, region_name='us-east-1')
launch_type = 'FARGATE'

    @pytest.mark.usefixtures("ecs_mocks")
    @pytest.mark.parametrize("launch_type", ["EC2", "FARGATE", "FARGATE_SPOT"])
    async def test_launch_types(aws_credentials, launch_type: str):
        task = ECSTask(
            aws_credentials=aws_credentials,
            auto_deregister_task_definition=False,
            command=["prefect", "version"],
            launch_type=launch_type,
        )
        print(task.preview())
    
        session = aws_credentials.get_boto3_session()
        ecs_client = session.client("ecs")
    
>       task_arn = await run_then_stop_task(task)

...
self = <moto.ecs.models.Task object at 0x7fce6af6cf10>
cluster = <moto.ecs.models.Cluster object at 0x7fce65f7d190>
task_definition = <moto.ecs.models.TaskDefinition object at 0x7fce691a6e10>
container_instance_arn = 'arn:aws:ecs:us-east-1:123456789012:container-instance/default/e2f46c39-c747-4a71-af62-7426d6d1c898'
resource_requirements = {'CPU': 0, 'MEMORY': 0, 'PORTS': [], 'PORTS_UDP': []}
backend = <moto.ecs.models.EC2ContainerServiceBackend object at 0x7fce6aaa9610>
launch_type = 'FARGATE'
overrides = {'containerOverrides': [{'command': ['prefect', 'version'], 'environment': [{'name': 'PREFECT_ORION_DATABASE_CONNECTION_URL', 'value': 'sqlite+aiosqlite:////tmp/tmp9xu65h5q/orion.db'}], 'name': 'prefect'}]}
started_by = '', tags = []
networking_configuration = {'awsvpcConfiguration': {'assignPublicIp': 'ENABLED', 'subnets': ['subnet-5b085f7a', 'subnet-57e3fb65', 'subnet-544405f4', 'subnet-7be12aee', 'subnet-c47aeb59', 'subnet-0b5523d7']}}

    def __init__(
        self,
        cluster,
        task_definition,
        container_instance_arn,
        resource_requirements,
        backend,
        launch_type="",
        overrides=None,
        started_by="",
        tags=None,
        networking_configuration=None,
    ):
        self.id = str(mock_random.uuid4())
        self.cluster_name = cluster.name
        self.cluster_arn = cluster.arn
        self.container_instance_arn = container_instance_arn
        self.last_status = "RUNNING"
        self.desired_status = "RUNNING"
        self.task_definition_arn = task_definition.arn
        self.overrides = overrides or {}
        self.containers = []
        self.started_by = started_by
        self.tags = tags or []
        self.launch_type = launch_type
        self.stopped_reason = ""
        self.resource_requirements = resource_requirements
        self.region_name = cluster.region_name
        self._account_id = backend.account_id
        self._backend = backend
        self.attachments = []
    
        if task_definition.network_mode == "awsvpc":
            if not networking_configuration:
    
                raise InvalidParameterException(
                    "Network Configuration must be provided when networkMode 'awsvpc' is specified."
                )
    
            self.network_configuration = networking_configuration
            net_conf = networking_configuration["awsvpcConfiguration"]
            ec2_backend = ec2_backends[self._account_id][self.region_name]
    
            eni = ec2_backend.create_network_interface(
                subnet=net_conf["subnets"][0],
                private_ip_address=random_private_ip(),
>               group_ids=net_conf["securityGroups"],
                description="moto ECS",
            )
E           KeyError: 'securityGroups'

Originally posted by @ahuang11 in #125 (comment)

Add `s3_upload_file` task

Create a task that uploads a file to an S3 bucket. The task should accept a required file_name, bucket, and aws_credentials, an optional aws_client_parameters, an optional key argument, and an optional transfer_config object. It should return the key of the uploaded object.

The task should be similar to s3_upload but for files instead of file-like objects.

If you'd like to help contribute this, please let us know in the comments below, thanks! We're also happy to provide further guidance if desired.
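
A rough sketch of what the task could look like (not the final API; aws_client_parameters handling is omitted and the names are only suggestions):

from typing import Optional

from prefect import task
from prefect_aws.credentials import AwsCredentials


@task
def s3_upload_file(
    file_name: str,
    bucket: str,
    aws_credentials: AwsCredentials,
    key: Optional[str] = None,
    transfer_config=None,
) -> str:
    """Upload a local file to an S3 bucket and return the key of the uploaded object."""
    key = key or file_name
    s3_client = aws_credentials.get_boto3_session().client("s3")
    s3_client.upload_file(Filename=file_name, Bucket=bucket, Key=key, Config=transfer_config)
    return key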

References:
https://boto3.amazonaws.com/v1/documentation/api/latest/guide/s3-uploading-files.html
https://stackoverflow.com/questions/50105094/python-upload-large-files-s3-fast

`S3Bucket` methods are not sync compatible

Summary

Trying to use S3Bucket.read_path or S3Bucket.write_path in a sync context raises TypeError: cannot pickle 'coroutine' object

If I install prefect-aws in editable mode and add the sync_compatible decorator to each method, things work as expected.

MRE and output

from prefect import flow, task
from prefect_aws.s3 import S3Bucket

@task
def testing():
    my_bucket = S3Bucket.load("test-bucket")
    
    path = my_bucket.read_path("requirements.txt")

    print(path, type(path))

    return path

@flow(name="example_flow")
def test():
    result = testing()

if __name__ == "__main__":
    test()
❯ python return_state.py
14:47:28.885 | INFO    | prefect.engine - Created flow run 'mini-anaconda' for flow 'example_flow'
14:47:29.752 | INFO    | Flow run 'mini-anaconda' - Created task run 'testing-3152b825-0' for task 'testing'
14:47:29.754 | INFO    | Flow run 'mini-anaconda' - Executing 'testing-3152b825-0' immediately...
<coroutine object S3Bucket.write_path at 0x7f91e015f7c0> <class 'coroutine'>
14:47:30.080 | ERROR   | Flow run 'mini-anaconda' - Encountered exception during execution:
Traceback (most recent call last):
  File "/Users/nate/opt/miniconda3/envs/prefect-2/lib/python3.8/site-packages/prefect/engine.py", line 596, in orchestrate_flow_run
    result = await run_sync(flow_call)
  File "/Users/nate/opt/miniconda3/envs/prefect-2/lib/python3.8/site-packages/prefect/utilities/asyncutils.py", line 57, in run_sync_in_worker_thread
    return await anyio.to_thread.run_sync(call, cancellable=True)
  File "/Users/nate/opt/miniconda3/envs/prefect-2/lib/python3.8/site-packages/anyio/to_thread.py", line 31, in run_sync
    return await get_asynclib().run_sync_in_worker_thread(
  File "/Users/nate/opt/miniconda3/envs/prefect-2/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 937, in run_sync_in_worker_thread
    return await future
  File "/Users/nate/opt/miniconda3/envs/prefect-2/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 867, in run
    result = context.run(func, *args)
  File "return_state.py", line 19, in test
    print(result.result())
  File "/Users/nate/opt/miniconda3/envs/prefect-2/lib/python3.8/site-packages/prefect/orion/schemas/states.py", line 145, in result
    raise data
  File "/Users/nate/opt/miniconda3/envs/prefect-2/lib/python3.8/site-packages/prefect/task_runners.py", line 203, in submit
    result = await call()
  File "/Users/nate/opt/miniconda3/envs/prefect-2/lib/python3.8/site-packages/prefect/engine.py", line 1104, in begin_task_run
    return await orchestrate_task_run(
  File "/Users/nate/opt/miniconda3/envs/prefect-2/lib/python3.8/site-packages/prefect/engine.py", line 1233, in orchestrate_task_run
    terminal_state = await return_value_to_state(
  File "/Users/nate/opt/miniconda3/envs/prefect-2/lib/python3.8/site-packages/prefect/states.py", line 130, in return_value_to_state
    return Completed(data=DataDocument.encode(serializer, result))
  File "/Users/nate/opt/miniconda3/envs/prefect-2/lib/python3.8/site-packages/prefect/orion/schemas/data.py", line 42, in encode
    blob = lookup_serializer(encoding).dumps(data, **kwargs)
  File "/Users/nate/opt/miniconda3/envs/prefect-2/lib/python3.8/site-packages/prefect/serializers.py", line 59, in dumps
    data_bytes = cloudpickle.dumps(data)
  File "/Users/nate/opt/miniconda3/envs/prefect-2/lib/python3.8/site-packages/cloudpickle/cloudpickle_fast.py", line 73, in dumps
    cp.dump(obj)
  File "/Users/nate/opt/miniconda3/envs/prefect-2/lib/python3.8/site-packages/cloudpickle/cloudpickle_fast.py", line 633, in dump
    return Pickler.dump(self, obj)
TypeError: cannot pickle 'coroutine' object
14:47:30.089 | INFO    | Task run 'testing-3152b825-0' - Crash detected! Execution was interrupted by an unexpected exception.
14:47:30.268 | ERROR   | Flow run 'mini-anaconda' - Finished in state Failed('Flow run encountered an exception.')
Traceback (most recent call last):
  File "return_state.py", line 22, in <module>
    test()
  File "/Users/nate/opt/miniconda3/envs/prefect-2/lib/python3.8/site-packages/prefect/flows.py", line 388, in __call__
    return enter_flow_run_engine_from_flow_call(
  File "/Users/nate/opt/miniconda3/envs/prefect-2/lib/python3.8/site-packages/prefect/engine.py", line 159, in enter_flow_run_engine_from_flow_call
    return anyio.run(begin_run)
  File "/Users/nate/opt/miniconda3/envs/prefect-2/lib/python3.8/site-packages/anyio/_core/_eventloop.py", line 70, in run
    return asynclib.run(func, *args, **backend_options)
  File "/Users/nate/opt/miniconda3/envs/prefect-2/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 292, in run
    return native_run(wrapper(), debug=debug)
  File "/Users/nate/opt/miniconda3/envs/prefect-2/lib/python3.8/asyncio/runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "/Users/nate/opt/miniconda3/envs/prefect-2/lib/python3.8/asyncio/base_events.py", line 616, in run_until_complete
    return future.result()
  File "/Users/nate/opt/miniconda3/envs/prefect-2/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 287, in wrapper
    return await func(*args)
  File "/Users/nate/opt/miniconda3/envs/prefect-2/lib/python3.8/site-packages/prefect/client/orion.py", line 82, in with_injected_client
    return await fn(*args, **kwargs)
  File "/Users/nate/opt/miniconda3/envs/prefect-2/lib/python3.8/site-packages/prefect/engine.py", line 239, in create_then_begin_flow_run
    return state.result()
  File "/Users/nate/opt/miniconda3/envs/prefect-2/lib/python3.8/site-packages/prefect/orion/schemas/states.py", line 145, in result
    raise data
  File "/Users/nate/opt/miniconda3/envs/prefect-2/lib/python3.8/site-packages/prefect/engine.py", line 596, in orchestrate_flow_run
    result = await run_sync(flow_call)
  File "/Users/nate/opt/miniconda3/envs/prefect-2/lib/python3.8/site-packages/prefect/utilities/asyncutils.py", line 57, in run_sync_in_worker_thread
    return await anyio.to_thread.run_sync(call, cancellable=True)
  File "/Users/nate/opt/miniconda3/envs/prefect-2/lib/python3.8/site-packages/anyio/to_thread.py", line 31, in run_sync
    return await get_asynclib().run_sync_in_worker_thread(
  File "/Users/nate/opt/miniconda3/envs/prefect-2/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 937, in run_sync_in_worker_thread
    return await future
  File "/Users/nate/opt/miniconda3/envs/prefect-2/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 867, in run
    result = context.run(func, *args)
  File "return_state.py", line 19, in test
    print(result.result())
  File "/Users/nate/opt/miniconda3/envs/prefect-2/lib/python3.8/site-packages/prefect/orion/schemas/states.py", line 145, in result
    raise data
  File "/Users/nate/opt/miniconda3/envs/prefect-2/lib/python3.8/site-packages/prefect/task_runners.py", line 203, in submit
    result = await call()
  File "/Users/nate/opt/miniconda3/envs/prefect-2/lib/python3.8/site-packages/prefect/engine.py", line 1104, in begin_task_run
    return await orchestrate_task_run(
  File "/Users/nate/opt/miniconda3/envs/prefect-2/lib/python3.8/site-packages/prefect/engine.py", line 1233, in orchestrate_task_run
    terminal_state = await return_value_to_state(
  File "/Users/nate/opt/miniconda3/envs/prefect-2/lib/python3.8/site-packages/prefect/states.py", line 130, in return_value_to_state
    return Completed(data=DataDocument.encode(serializer, result))
  File "/Users/nate/opt/miniconda3/envs/prefect-2/lib/python3.8/site-packages/prefect/orion/schemas/data.py", line 42, in encode
    blob = lookup_serializer(encoding).dumps(data, **kwargs)
  File "/Users/nate/opt/miniconda3/envs/prefect-2/lib/python3.8/site-packages/prefect/serializers.py", line 59, in dumps
    data_bytes = cloudpickle.dumps(data)
  File "/Users/nate/opt/miniconda3/envs/prefect-2/lib/python3.8/site-packages/cloudpickle/cloudpickle_fast.py", line 73, in dumps
    cp.dump(obj)
  File "/Users/nate/opt/miniconda3/envs/prefect-2/lib/python3.8/site-packages/cloudpickle/cloudpickle_fast.py", line 633, in dump
    return Pickler.dump(self, obj)
TypeError: cannot pickle 'coroutine' object

Add AWS region to S3 block configuration

Problem

Deployment push (during build) and pull (when the agent runs a flow) using the S3 block depend on the region set in the environment (AWS_REGION or ~/.aws/config). If the S3 bucket is in a different region, the push/pull fails with "Access denied".

Workaround

Set AWS_REGION when deploying or running agent (problematic when multiple regions are used)

Possible solutions

  • Add region to S3 block configuration
  • Determine the bucket's region automatically on access (see the sketch below)
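
The second option could be as simple as one extra lookup before building the filesystem client (a sketch; the us-east-1 fallback is a quirk of the S3 API, which returns a null LocationConstraint for that region):

import boto3


def get_bucket_region(bucket_name: str) -> str:
    """Look up the region an S3 bucket lives in."""
    location = boto3.client("s3").get_bucket_location(Bucket=bucket_name)
    return location.get("LocationConstraint") or "us-east-1"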

ECS Task: Enable security groups

I would like to pass one or more existing security groups to enable communication between Orion, the generated ECS tasks, and other network-isolated services in my VPCs.
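
Until a dedicated field exists, the JSON-patch style task_customizations shown in an earlier issue above suggests one possible shape for this (the security group ID is a placeholder):

from prefect_aws.ecs import ECSTask

ecs = ECSTask(
    # ... other configuration ...
    task_customizations=[
        {
            "op": "add",
            "path": "/networkConfiguration/awsvpcConfiguration/securityGroups",
            "value": ["sg-0123456789abcdef0"],
        }
    ],
)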
