
prefect-ray's Issues

Circular import error upon deployment

(begin_task_run pid=8142)   File "/home/ray/anaconda3/lib/python3.8/site-packages/prefect/software/python.py", line 8, in <module>
(begin_task_run pid=8142)     from prefect.software.pip import PipRequirement, current_environment_requirements
(begin_task_run pid=8142)   File "/home/ray/anaconda3/lib/python3.8/site-packages/prefect/software/pip.py", line 4, in <module>
(begin_task_run pid=8142)     import packaging.requirements
(begin_task_run pid=8142)   File "/home/ray/anaconda3/lib/python3.8/site-packages/prefect/packaging/__init__.py", line 1, in <module>
(begin_task_run pid=8142)     from prefect.packaging.docker import DockerPackager
(begin_task_run pid=8142) ImportError: cannot import name 'DockerPackager' from partially initialized module 'prefect.packaging.docker' (most likely due to a circular import) (/home/ray/anaconda3/lib/python3.8/site-packages/prefect/packaging/docker.py)

The issue is that, in a deployment, Ray includes /home/ray/anaconda3/lib/python3.8/site-packages/prefect/ on its sys.path. That directory contains a packaging subdirectory, so imports of the packaging library resolve to prefect.packaging rather than the PyPI packaging package.

To work around this issue, override sys.argv[0] = str(Path.cwd()) in prefect_ray/task_runners.py.
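
A minimal sketch of that override (hypothetical placement; the report points at prefect_ray/task_runners.py, but it could equally be applied in the deployment entrypoint before the task runner starts):

import sys
from pathlib import Path

# Per the description above, the directory of the entrypoint script ends up on
# the Ray workers' sys.path in a deployment; pointing sys.argv[0] at the current
# working directory keeps the prefect package directory (and its packaging/
# subpackage) off that path.
sys.argv[0] = str(Path.cwd())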

Flows with many tasks unable to complete

Expectation / Proposal

With the example code below, only 5 tasks can be scheduled by Ray at any time, and each task takes between 45 and 135 seconds. We schedule 15,000 tasks, and although each task does not use much memory (and only 5 can run at any given time), eventually Ray stops being able to schedule any tasks, so the flow run just hangs.
Ray error:

Warning: The following resource request cannot be scheduled right now: {'CPU': 1.0}. This is likely due to all cluster resources being claimed by actors. Consider creating fewer actors or adding more nodes to this Ray cluster.

The flow completes normally for lower num_map_keys, e.g. 100.
It also completes normally with num_map_keys=15000 using the default concurrent task runner.

https://docs.ray.io/en/latest/ray-core/patterns/limit-pending-tasks.html gives a possible resolution to this; Prefect should expose the maximum number of pending Ray tasks as a configurable variable. (A batched-submission workaround sketch follows the example below.)

Traceback / Example

from time import sleep

import numpy as np
from prefect import task, flow
from prefect_ray.task_runners import RayTaskRunner


@task
def a(n: int, t: int, jitter_pct: float) -> float:
    sleep_time = t * (1 + jitter_pct * (2 * np.random.default_rng().random() - 1))
    print(f"task a with {n=}, sleeping {sleep_time}s")
    sleep(sleep_time)
    return n**2

@task
def b(n: float, t: int, jitter_pct: float) -> float:
    sleep_time = t * (1 + jitter_pct * (2 * np.random.default_rng().random() - 1))
    print(f"task b with {n=}, sleeping {sleep_time}s")
    sleep(sleep_time)
    return n**0.5


@task
def c(n: float, t: int, jitter_pct: float) -> int:
    sleep_time = t * (1 + jitter_pct * (2 * np.random.default_rng().random() - 1))
    print(f"task c with {n=}, sleeping {sleep_time}s")
    sleep(sleep_time)
    return int(n*2)


@flow(task_runner=RayTaskRunner(init_kwargs={"num_cpus": 5}))
def many_map_tasks(num_map_keys: int = 15000, t: int = 90, jitter_pct: float = 0.5):
    squares = a.map(range(num_map_keys), t, jitter_pct)
    roots = b.map(squares, t, jitter_pct)
    doubles = c.map(roots, t, jitter_pct)
    return [f.result() for f in doubles]


if __name__ == "__main__":
    print(many_map_tasks())
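
Until such a setting exists, one possible flow-level workaround is to submit the mapped work in bounded batches so that the number of pending Ray tasks stays small. A rough sketch building on the tasks defined above (the batch size is illustrative, and waiting on each batch trades away some throughput):

# Hypothetical batched variant of the flow above; BATCH_SIZE is illustrative.
BATCH_SIZE = 500

@flow(task_runner=RayTaskRunner(init_kwargs={"num_cpus": 5}))
def many_map_tasks_batched(num_map_keys: int = 15000, t: int = 90, jitter_pct: float = 0.5):
    results = []
    keys = list(range(num_map_keys))
    for start in range(0, num_map_keys, BATCH_SIZE):
        batch = keys[start : start + BATCH_SIZE]
        squares = a.map(batch, t, jitter_pct)
        roots = b.map(squares, t, jitter_pct)
        doubles = c.map(roots, t, jitter_pct)
        # Resolving the futures here blocks until the batch finishes, bounding
        # the number of tasks Ray has pending at any one time.
        results.extend(f.result() for f in doubles)
    return results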

Flow crashes after multiple hung tasks with "Prefect Task run TASK_RUN_ID already finished"

Prefect Version

Version: 2.6.7
API version: 0.8.3
Python version: 3.8.9
Git commit: 9074c158
Built: Thu, Nov 10, 2022 1:07 PM
OS/Arch: linux/x86_64
Profile: cloud
Server type: cloud

Prefect Ray Version

0.2.1

Ray Version

2.1.0

Describe the current behavior

I'm running a Ray cluster in Azure (cluster YAML attached below). I created a small flow, using the Azure Container Instance Job infrastructure block, that looks like this:

import time

from prefect import flow, get_run_logger, task
from prefect_ray.task_runners import RayTaskRunner

# FLOW_NAME and the setup_logging task are defined elsewhere in the project.


@task
async def my_task(id: int):
    logger = get_run_logger()
    logger.info(f"Hello from my_task #{id}! Sleeping for 30 seconds...")
    time.sleep(30)
    logger.info(f"my_task #{id} done!")


@flow(
    name=FLOW_NAME,
    task_runner=RayTaskRunner(address="ray://10.0.0.4:10001")
)
def my_flow(loglevel: str = "DEBUG"):
    logconfig = setup_logging.submit(loglevel=loglevel)
    logger = get_run_logger()
    logger.info("Starting")
    # Spam ray tasks here!
    for i in range(50):
        my_task.submit(i + 1, wait_for=[logconfig])
        time.sleep(5)

After running this flow for about 5 minutes, I'm starting to see a lot of tasks that are stuck as 'running' indefinitely. Furthermore, these tasks are printing the following message to the log:

Hello from my_task #42! Sleeping for 30 seconds...                    05:15:10 PM
Task run 'b4a5a5f5-1d72-4146-9476-cf157e686df3' already finished.     05:15:39 PM

After seeing a handful of tasks exhibiting this behavior, the flow promptly crashes with no further error message. The agent logs the following:

16:11:52.674 | INFO    | prefect.infrastructure.container-instance-job - AzureContainerInstanceJob 'myflow-161585': Running command...                   
16:11:52.676 | INFO    | prefect.agent - Completed submission of flow run '87eba3cb-f09f-4b21-90f5-e0d7923801e4'                                          
16:16:55.294 | INFO    | prefect.infrastructure.container-instance-job - AzureContainerInstanceJob 'myflow-161585': Completed command run.               
16:16:55.296 | INFO    | prefect.infrastructure.container-instance-job - AzureContainerInstanceJob 'myflow-161585': Deleting container...    

The Ray cluster doesn't log anything of note, as far as I can tell. I'm not entirely sure what's happening here, but per this discussion: https://discourse.prefect.io/t/edgecase-error-prefect-task-run-task-run-id-already-finished/1836 I'm not the only person affected by this.

Describe the proposed behavior

The tasks shouldn't hang with "task already finished"; they should either crash cleanly or continue running properly.

Ray Dockerfile

FROM rayproject/ray:2.1.0-py38-cpu

# Don't buffer console output
ENV PYTHONUNBUFFERED=1
# Don't compile bytecode, we're only running once
ENV PYTHONDONTWRITEBYTECODE=1
ENV CODE_PATH=/home/ray/anaconda3/lib/python3.8/site-packages/

COPY ./pipeline/requirements.txt ${CODE_PATH}/pipeline/requirements.txt
RUN pip install pip --upgrade && \
    pip install -r ${CODE_PATH}/pipeline/requirements.txt

COPY ./logging.yml ${CODE_PATH}/logging.yml
COPY ./flow_audit ${CODE_PATH}/flow_audit
COPY ./orchestration ${CODE_PATH}/orchestration
COPY ./pipeline ${CODE_PATH}/pipeline

Ray Cluster YAML

Please ignore the comments here, they're mostly from the default cluster YAML. We run a custom docker image as seen below from censoredregistry.azurecr.io/ray-cluster:latest (not the actual URL). The Dockerfile is attached above.

# A unique identifier for the head node and workers of this cluster.
cluster_name: test

# The maximum number of worker nodes to launch in addition to the head
# node.
max_workers: 25

# The autoscaler will scale up the cluster faster with higher upscaling speed.
# E.g., if the task requires adding more nodes then autoscaler will gradually
# scale up the cluster in chunks of upscaling_speed*currently_running_nodes.
# This number should be > 0.
upscaling_speed: 100.0

# This executes all commands on all nodes in the docker container,
# and opens all the necessary ports to support the Ray cluster.
# Empty object means disabled.
docker:
    # image: "rayproject/ray-ml:latest-gpu" # You can change this to latest-cpu if you don't need GPU support and want a faster startup
    image: censoredregistry.azurecr.io/ray-cluster:latest # use this one if you don't need ML dependencies, it's faster to pull
    container_name: "ray_container"
    # If true, pulls latest version of image. Otherwise, `docker run` will only pull the image
    # if no cached version is present.
    pull_before_run: True
    run_options:   # Extra options to pass into "docker run"
        - --ulimit nofile=65536:65536

    # Example of running a GPU head with CPU workers
    # head_image: "rayproject/ray-ml:latest-gpu"
    # Allow Ray to automatically detect GPUs

    # worker_image: "rayproject/ray-ml:latest-cpu"
    # worker_run_options: []

# If a node is idle for this many minutes, it will be removed.
idle_timeout_minutes: 30 # TODO: Should probably be something like 5 minutes

# Cloud-provider specific configuration.
provider:
    type: azure
    # https://azure.microsoft.com/en-us/global-infrastructure/locations
    location: westeurope
    cache_stopped_nodes: False
    resource_group: rg-ray-test
    # Below settings only applicable for AWS, not Azure.
    # security_group:
    #     GroupName: ray-nsg
    #     IpPermissions:
    #           - FromPort: 10001
    #             ToPort: 10001
    #             IpProtocol: TCP
    #             IpRanges:
    #                 # This will enable inbound access from ALL IPv4 addresses.
    #                 - CidrIp: 194.218.21.196
    # set subscription id otherwise the default from az cli will be used
    subscription_id: CENSORED_SUBSCRIPTION_ID
    # set unique subnet mask or a random mask will be used
    subnet_mask: 10.0.0.0/24
    # set unique id for resources in this cluster
    # if not set a default id will be generated based on the resource group and cluster name
    unique_id: test

# How Ray will authenticate with newly launched nodes.
auth:
    ssh_user: ray
    # you must specify paths to matching private and public key pair files
    # use `ssh-keygen -t rsa -b 4096` to generate a new ssh key pair
    ssh_private_key: ~/.ssh/ray-test
    # changes to this should match what is specified in file_mounts
    ssh_public_key: ~/.ssh/ray-test.pub

# More specific customization to node configurations can be made using the ARM template azure-vm-template.json file
# See documentation here: https://docs.microsoft.com/en-us/azure/templates/microsoft.compute/2019-03-01/virtualmachines
# Changes to the local file will be used during deployment of the head node, however worker nodes deployment occurs
# on the head node, so changes to the template must be included in the wheel file used in setup_commands section below

# Tell the autoscaler the allowed node types and the resources they provide.
# The key is the name of the node type, which is just for debugging purposes.
# The node config specifies the launch config and physical instance type.
available_node_types:
    ray.head.default:
        # The resources provided by this node type.
        resources: {"CPU": 0}
        # Provider-specific config, e.g. instance type.
        node_config:
            azure_arm_parameters:
                vmSize: Standard_D2s_v3
                # List images https://docs.microsoft.com/en-us/azure/virtual-machines/linux/cli-ps-findimage
                imagePublisher: microsoft-dsvm
                imageOffer: ubuntu-1804
                imageSku: 1804-gen2
                imageVersion: latest

    ray.worker.default:
        # The minimum number of worker nodes of this type to launch.
        # This number should be >= 0.
        min_workers: 0
        # The maximum number of worker nodes of this type to launch.
        # This takes precedence over min_workers.
        max_workers: 25
        # The resources provided by this node type.
        resources: {"CPU": 2}
        # Provider-specific config, e.g. instance type.
        node_config:
            azure_arm_parameters:
                vmSize: Standard_D2s_v3
                # List images https://docs.microsoft.com/en-us/azure/virtual-machines/linux/cli-ps-findimage
                imagePublisher: microsoft-dsvm
                imageOffer: ubuntu-1804
                imageSku: 1804-gen2
                imageVersion: latest
                # optionally set priority to use Spot instances
                # priority: Spot
                # set a maximum price for spot instances if desired
                # billingProfile:
                #     maxPrice: -1

# Specify the node type of the head node (as configured above).
head_node_type: ray.head.default

# Files or directories to copy to the head and worker nodes. The format is a
# dictionary from REMOTE_PATH: LOCAL_PATH, e.g.
file_mounts: {
#    "/path1/on/remote/machine": "/path1/on/local/machine",
#    "/path2/on/remote/machine": "/path2/on/local/machine",
     "~/.ssh/ray-test.pub": "~/.ssh/ray-test.pub"
}

# Files or directories to copy from the head node to the worker nodes. The format is a
# list of paths. The same path on the head node will be copied to the worker node.
# This behavior is a subset of the file_mounts behavior. In the vast majority of cases
# you should just use file_mounts. Only use this if you know what you're doing!
cluster_synced_files: []

# Whether changes to directories in file_mounts or cluster_synced_files in the head node
# should sync to the worker node continuously
file_mounts_sync_continuously: False

# Patterns for files to exclude when running rsync up or rsync down
rsync_exclude:
    - "**/.git"
    - "**/.git/**"

# Pattern files to use for filtering out files when running rsync up or rsync down. The file is searched for
# in the source directory and recursively through all subdirectories. For example, if .gitignore is provided
# as a value, the behavior will match git's behavior for finding and using .gitignore files.
rsync_filter:
    - ".gitignore"

# List of commands that will be run before `setup_commands`. If docker is
# enabled, these commands will run outside the container and before docker
# is setup.
initialization_commands:
    # add /home/ray to path
    - export PATH=$PATH:/home/ray
    # enable docker setup
    - sudo usermod -aG docker $USER || true
    - sleep 10  # delay to avoid docker permission denied errors
    # get rid of annoying Ubuntu message
    - touch ~/.sudo_as_admin_successful
    # Wait for auto upgrade that might run in the background.
    - bash -c $'ps -e | grep apt | awk \'{print $1}\' | xargs tail -f --pid || true'
    # Log in to azure container registry (by ray-msi-test)
    - az login --identity
    - docker login censoredregistry.azurecr.io -u censoreduser -p censoredpassword # Exactly how the password is retrieved is censored.

# List of shell commands to run to set up nodes.
# NOTE: rayproject/ray-ml:latest has ray latest bundled
setup_commands: []
    # Note: if you're developing Ray, you probably want to create a Docker image that
    # has your Ray repo pre-cloned. Then, you can replace the pip installs
    # below with a git checkout <your_sha> (and possibly a recompile).
    # To run the nightly version of ray (as opposed to the latest), either use a rayproject docker image
    # that has the "nightly" (e.g. "rayproject/ray-ml:nightly-gpu") or uncomment the following line:
    # - pip install -U "ray[default] @ https://s3-us-west-2.amazonaws.com/ray-wheels/latest/ray-3.0.0.dev0-cp38-cp38-manylinux2014_x86_64.whl"

# Custom commands that will be run on the head node after common setup.
# NOTE: rayproject/ray-ml:latest has azure packages bundled
head_setup_commands: []
    # - pip install -U azure-cli-core==2.22.0 azure-mgmt-compute==14.0.0 azure-mgmt-msi==1.0.0 azure-mgmt-network==10.2.0 azure-mgmt-resource==13.0.0

# Custom commands that will be run on worker nodes after common setup.
worker_setup_commands: []

# Command to start ray on the head node. You don't need to change this.
head_start_ray_commands:
    - export AUTOSCALER_MAX_NUM_FAILURES=inf;
    - ray stop
    - ray start --head --port=6379 --object-manager-port=8076 --autoscaling-config=~/ray_bootstrap_config.yaml

# Command to start ray on worker nodes. You don't need to change this.
worker_start_ray_commands:
    - ray stop
    - ray start --address=$RAY_HEAD_IP:6379 --object-manager-port=8076

First class results error using remote ray

I checked out the PrefectHQ/prefect#6908 branch on both the local and remote machines, then ran this code (if address is omitted, it works). Any ideas @madkinsz?

from typing import List

from prefect import flow, get_run_logger, task
from prefect_ray.task_runners import RayTaskRunner
from prefect.settings import temporary_settings, PREFECT_LOCAL_STORAGE_PATH

# from prefect_dask.task_runners import DaskTaskRunner


@task
def say_hello(name: str) -> None:
    logger = get_run_logger()
    logger.info(f"hello {name}")


@task
def say_goodbye(name: str) -> None:
    # logs not currently working see https://github.com/PrefectHQ/prefect-ray/issues/25
    logger = get_run_logger()
    print("print goodbye")
    logger.info(f"goodbye {name}")


# run on an existing ray cluster
@flow(
    task_runner=RayTaskRunner(
        # 127.0.0.1:10001 is port-forwarded to the remote ray cluster
        address="ray://myaddress",
    )
)
def greetings(names: List[str]) -> None:
    for name in names:
        say_hello.submit(name)


if __name__ == "__main__":
    greetings(["arthur", "trillian", "ford", "marvin"])

Results in:

15:29:39.838 | INFO    | prefect.engine - Created flow run 'wakeful-condor' for flow 'greetings'
15:29:39.839 | INFO    | prefect.task_runner.ray - Connecting to an existing Ray instance at ray://3.95.199.184:10001
15:29:43.498 | INFO    | prefect.task_runner.ray - Using Ray cluster with 1 nodes.
15:29:44.509 | INFO    | Flow run 'wakeful-condor' - Created task run 'say_hello-811087cd-0' for task 'say_hello'
15:29:46.272 | INFO    | Flow run 'wakeful-condor' - Submitted task run 'say_hello-811087cd-0' for execution.
15:29:46.366 | INFO    | Flow run 'wakeful-condor' - Created task run 'say_hello-811087cd-2' for task 'say_hello'
15:29:46.458 | INFO    | Flow run 'wakeful-condor' - Submitted task run 'say_hello-811087cd-2' for execution.
15:29:46.461 | INFO    | Flow run 'wakeful-condor' - Created task run 'say_hello-811087cd-3' for task 'say_hello'
15:29:46.553 | INFO    | Flow run 'wakeful-condor' - Submitted task run 'say_hello-811087cd-3' for execution.
15:29:46.554 | INFO    | Flow run 'wakeful-condor' - Created task run 'say_hello-811087cd-1' for task 'say_hello'
15:29:46.647 | INFO    | Flow run 'wakeful-condor' - Submitted task run 'say_hello-811087cd-1' for execution.
15:29:46.728 | INFO    | Task run 'say_hello-811087cd-0' - Crash detected! Execution was interrupted by an unexpected exception.
15:29:46.849 | INFO    | Task run 'say_hello-811087cd-1' - Crash detected! Execution was interrupted by an unexpected exception.
15:29:47.005 | INFO    | Task run 'say_hello-811087cd-2' - Crash detected! Execution was interrupted by an unexpected exception.
15:29:47.134 | INFO    | Task run 'say_hello-811087cd-3' - Crash detected! Execution was interrupted by an unexpected exception.
15:29:48.747 | ERROR   | Flow run 'wakeful-condor' - Finished in state Failed('4/4 states failed.')
Traceback (most recent call last):
  File "/Users/andrew/test2.py", line 40, in <module>
    greetings(["arthur", "trillian", "ford", "marvin"])
  File "/Users/andrew/Applications/python/prefect/src/prefect/flows.py", line 390, in __call__
    return enter_flow_run_engine_from_flow_call(
  File "/Users/andrew/Applications/python/prefect/src/prefect/engine.py", line 152, in enter_flow_run_engine_from_flow_call
    return anyio.run(begin_run)
  File "/Users/andrew/mambaforge/envs/ray310/lib/python3.10/site-packages/anyio/_core/_eventloop.py", line 70, in run
    return asynclib.run(func, *args, **backend_options)
  File "/Users/andrew/mambaforge/envs/ray310/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 292, in run
    return native_run(wrapper(), debug=debug)
  File "/Users/andrew/mambaforge/envs/ray310/lib/python3.10/asyncio/runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "/Users/andrew/mambaforge/envs/ray310/lib/python3.10/asyncio/base_events.py", line 646, in run_until_complete
    return future.result()
  File "/Users/andrew/mambaforge/envs/ray310/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 287, in wrapper
    return await func(*args)
  File "/Users/andrew/Applications/python/prefect/src/prefect/client/orion.py", line 86, in with_injected_client
    return await fn(*args, **kwargs)
  File "/Users/andrew/Applications/python/prefect/src/prefect/engine.py", line 232, in create_then_begin_flow_run
    return state.result()
  File "/Users/andrew/Applications/python/prefect/src/prefect/client/schemas.py", line 116, in result
    state.result()
  File "/Users/andrew/Applications/python/prefect/src/prefect/client/schemas.py", line 102, in result
    raise data
TypeError: missing a required argument: 'result_factory'

ModuleNotFoundError 'ray'

With Python 3.9 and 3.10

prefect-ray 0.1.2 pypi_0 pypi

Traceback (most recent call last):
  File "/Users/jeffhale/Desktop/prefect/test-doc-examples/ex2.py", line 330, in <module>
    from prefect_ray import RayTaskRunner
  File "/Users/jeffhale/miniforge3/envs/ray/lib/python3.9/site-packages/prefect_ray/__init__.py", line 6, in <module>
    from .task_runners import RayTaskRunner  # noqa
  File "/Users/jeffhale/miniforge3/envs/ray/lib/python3.9/site-packages/prefect_ray/task_runners.py", line 52, in <module>
    import ray
ModuleNotFoundError: No module named 'ray'

Handle dependent tasks concurrently in the `RayTaskRunner`

When the RayTaskRunner was updated in #35, wrapping of Prefect futures with Ray task futures was introduced. For tasks with upstream dependencies, the upstream Ray futures must be synchronously unwrapped to provide their outputs to the downstream task. This may degrade performance for flows that use the RayTaskRunner with many dependent tasks. The aim is to find a way to handle Ray futures concurrently so that dependent tasks can still run in parallel.

@anticorrelator @zangell44 feel free to add any additional details or correct any inaccuracies in my description of the issue!
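
For reference, plain Ray already defers scheduling when an ObjectRef is passed directly as an argument to a downstream remote function, so the driver never has to block on the upstream result. A minimal plain-Ray sketch (not prefect-ray code) of the behavior the task runner would ideally preserve:

import ray

ray.init()

@ray.remote
def upstream(x: int) -> int:
    return x * 2

@ray.remote
def downstream(y: int) -> int:
    # Ray delivers the resolved value of the upstream ObjectRef here; the task
    # is not scheduled until that value is ready.
    return y + 1

ref = upstream.remote(21)
result_ref = downstream.remote(ref)  # no blocking unwrap in the driver
print(ray.get(result_ref))  # 43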

Python 3.10 support

Expectation / Proposal

Ray has supported Python 3.10 since v2.0.0.
It would be nice for prefect-ray to support it as well.

Traceback / Example

Recognize Ray Cluster defined via env var RAY_ADDRESS

Current Behaviour
Our infrastructure setup provides the Ray cluster address in the environment variable RAY_ADDRESS. Currently, prefect_ray does not recognize or respect this env var and starts creating a local Ray instance. However, ray.init() does recognize the env var and tries to connect to the existing cluster.
In our setup, this leads to an error when a terminated cluster has to be restarted: prefect_ray does not wait for the restart to finish before starting task execution.

[~]$ export RAY_ADDRESS=anyscale://prefect-flow-template_cluster
[~]$ python example.py ray
ray_address='anyscale://prefect-flow-template_cluster'
11:43:53.872 | INFO    | prefect.engine - Created flow run 'nondescript-urchin' for flow 'flow'
11:43:53.874 | INFO    | prefect.task_runner.ray - Creating a local Ray instance
2022-12-12 11:43:53,874	INFO worker.py:1230 -- Using address anyscale://prefect-flow-template_cluster set in the environment variable RAY_ADDRESS
Authenticating
Loaded Anyscale authentication token from ~/.anyscale/credentials.json.
[...]
Waiting for cluster prefect-flow-template_cluster to start. This may take a few minutes
14:24:33.357 | ERROR   | Flow run 'prompt-anaconda' - Crash detected! Execution was interrupted by an unexpected exception: click.exceptions.ClickException: Error while starting cluster prefect-flow-template_cluster: KeyError: 'auth'

14:24:33.624 | ERROR   | prefect.engine - Engine execution of flow run '83baaa1a-643c-4e57-9ed9-e41b02ed4f6e' exited with unexpected exception

Desired Behaviour
Support RAY_ADDRESS as described in the ray documentation.

You can also define an environment variable called RAY_ADDRESS in the same format as the address parameter to connect to an existing cluster with ray.init() or ray.init(address="auto").
https://docs.ray.io/en/latest/ray-core/package-ref.html#ray-init

Proposed Solution
Add a check for whether the env var RAY_ADDRESS is set at https://github.com/PrefectHQ/prefect-ray/blob/main/prefect_ray/task_runners.py#L206.
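
A hedged sketch of what that check might look like (an illustrative helper, not the actual prefect-ray code):

import os
from typing import Optional, Tuple


def resolve_ray_init_args(explicit_address: Optional[str]) -> Tuple[str, ...]:
    # Prefer an explicit address, then the RAY_ADDRESS env var; only fall back
    # to creating a local Ray instance when neither is set.
    address = explicit_address or os.environ.get("RAY_ADDRESS")
    if address:
        return (address,)
    return ()

The task runner's _start could then pass *resolve_ray_init_args(self.address) to ray.init(...) and log whether it is connecting to an existing cluster or creating a local instance.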

Flow & Task Code example.py

import os
import sys

from prefect import flow, task
from prefect.task_runners import SequentialTaskRunner
from prefect_ray import RayTaskRunner


@task
def task():
    print("task execution")


@flow(log_prints=True)
def flow():
    print("flow execution start")
    task.submit()
    print("flow execution end")

def main(argv):
    if (argv[1] == "sequential"):
        flow.with_options(task_runner=SequentialTaskRunner())()
    elif (str(argv[1]) == "ray"):
        ray_address = str(os.environ.get("RAY_ADDRESS"))
        print(f"{ray_address=}")
        flow.with_options(task_runner=RayTaskRunner())()

if __name__ == "__main__":  # noqa
    main(sys.argv)

How to properly execute flows with cpu/gpu requirements

We were wondering how to specify that a Prefect flow (or even individual tasks) should only run on instances that fit the resource requirements. What is the annotation to manage this at the flow/task level?
Basically, we would like to use Ray's @ray.remote(num_gpus=1) decorator.

Scenario: we have a Ray cluster up and running with both CPU and GPU worker nodes. We would like, for example, to execute a flow only on the GPU nodes and have them scale out accordingly.

We experimented with num_gpus, request_cpus, etc., but have not had success yet.

Here is some example code:

from prefect import task, flow
from prefect_ray.task_runners import RayTaskRunner
import time


RayRunner = RayTaskRunner(
    address="auto",
    init_kwargs={
        "num_gpus": 2,
    }
)


@flow(name="time-wait-flow", task_runner=RayRunner)
def time_wait_flow_gpu():
    factor = 50
    times = [5.0, 16.3, 25.5]
    for i in range(0, factor):
        for time_in_secs in times:
            wait_time.submit(time_in_secs)


@task
def wait_time(time_in_secs):
    time.sleep(time_in_secs)


if __name__ == "__main__":
    time_wait_flow_gpu()
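
One approach worth trying, based on the prefect_ray.context.remote_options context manager that appears in other issues on this page (assuming a prefect-ray version that provides it): request Ray resources per task around the .submit() call rather than via init_kwargs. A sketch with an illustrative task:

from prefect import flow, task
from prefect_ray.task_runners import RayTaskRunner
from prefect_ray.context import remote_options


@task
def train_model():
    # placeholder for a GPU workload
    ...


@flow(task_runner=RayTaskRunner(address="auto"))
def gpu_flow():
    # Ask Ray to schedule this task only where a GPU is available.
    with remote_options(num_gpus=1):
        train_model.submit()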

prefect2: No module named 'prefect.utilities.asyncio'

Seeing the following error with prefect2

python flow.py
Traceback (most recent call last):
  File "flow.py", line 8, in <module>
    from prefect_ray.task_runners import RayTaskRunner
  File "/home/ubuntu/miniconda3/envs/prefect/lib/python3.8/site-packages/prefect_ray/__init__.py", line 6, in <module>
    from .task_runners import RayTaskRunner  # noqa
  File "/home/ubuntu/miniconda3/envs/prefect/lib/python3.8/site-packages/prefect_ray/task_runners.py", line 58, in <module>
    from prefect.utilities.asyncio import A, sync_compatible
ModuleNotFoundError: No module named 'prefect.utilities.asyncio'

Repro

python3 -m venv venv
./venv/bin/python -m pip install --upgrade "prefect>=2.0b" "prefect-ray"
./venv/bin/python -c "from prefect_ray.task_runners import RayTaskRunner"
Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "/home/ubuntu/workspace/prefect/prefect-master/flows/molecule_collection/venv/lib/python3.8/site-packages/prefect_ray/__init__.py", line 6, in <module>
    from .task_runners import RayTaskRunner  # noqa
  File "/home/ubuntu/workspace/prefect/prefect-master/flows/molecule_collection/venv/lib/python3.8/site-packages/prefect_ray/task_runners.py", line 58, in <module>
    from prefect.utilities.asyncio import A, sync_compatible
ModuleNotFoundError: No module named 'prefect.utilities.asyncio'

Subflows with RayTaskRunner

Prefect 2.0 supports subflows, which is a great new addition. There is a bug when nested flows use the same RayTaskRunner(): in essence, it crashes complaining about calling ray.init() twice.

It would be nice if this use case were supported, since it would allow combining different parallel flows.

To reproduce, use:

from prefect import task, flow
from prefect_ray import RayTaskRunner


@task
def some_task(input):
    return input


@flow(task_runner=RayTaskRunner())
def my_subflow():
    some_task.submit(1)
    some_task.submit(2)
    some_task.submit(3)


@flow(task_runner=RayTaskRunner())
def some_flow():
    my_subflow()
    some_task.submit(4)


some_flow()

The error is:

09:15:38.789 | INFO    | prefect.engine - Created flow run 'terrestrial-bird' for flow 'some-flow'
09:15:38.789 | INFO    | prefect.task_runner.ray - Creating a local Ray instance
2022-08-17 09:15:40,455	INFO services.py:1470 -- View the Ray dashboard at http://127.0.0.1:8265
09:15:42.168 | INFO    | prefect.task_runner.ray - Using Ray cluster with 1 nodes.
09:15:42.168 | INFO    | prefect.task_runner.ray - The Ray UI is available at 127.0.0.1:8265
09:15:42.355 | INFO    | Flow run 'terrestrial-bird' - Created subflow run 'small-marten' for flow 'my-subflow'
09:15:42.356 | INFO    | prefect.task_runner.ray - Creating a local Ray instance
09:15:42.357 | ERROR   | Flow run 'small-marten' - Crash detected! Execution was interrupted by an unexpected exception.
09:15:42.390 | ERROR   | Flow run 'terrestrial-bird' - Encountered exception during execution:
Traceback (most recent call last):
  File "/home/robbert/.virtualenvs/altoida/lib/python3.8/site-packages/prefect/engine.py", line 557, in orchestrate_flow_run
    result = await run_sync(flow_call)
  File "/home/robbert/.virtualenvs/altoida/lib/python3.8/site-packages/prefect/utilities/asyncutils.py", line 56, in run_sync_in_worker_thread
    return await anyio.to_thread.run_sync(call, cancellable=True)
  File "/home/robbert/.virtualenvs/altoida/lib/python3.8/site-packages/anyio/to_thread.py", line 31, in run_sync
    return await get_asynclib().run_sync_in_worker_thread(
  File "/home/robbert/.virtualenvs/altoida/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 937, in run_sync_in_worker_thread
    return await future
  File "/home/robbert/.virtualenvs/altoida/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 867, in run
    result = context.run(func, *args)
  File "/home/robbert/programming/python/altoida/altoida_ml/altoida_ml/robbert/snippits/prefect_bugs/ray_run_manager.py", line 25, in some_flow
    my_subflow()
  File "/home/robbert/.virtualenvs/altoida/lib/python3.8/site-packages/prefect/flows.py", line 390, in __call__
    return enter_flow_run_engine_from_flow_call(
  File "/home/robbert/.virtualenvs/altoida/lib/python3.8/site-packages/prefect/engine.py", line 156, in enter_flow_run_engine_from_flow_call
    return run_async_from_worker_thread(begin_run)
  File "/home/robbert/.virtualenvs/altoida/lib/python3.8/site-packages/prefect/utilities/asyncutils.py", line 136, in run_async_from_worker_thread
    return anyio.from_thread.run(call)
  File "/home/robbert/.virtualenvs/altoida/lib/python3.8/site-packages/anyio/from_thread.py", line 49, in run
    return asynclib.run_async_from_thread(func, *args)
  File "/home/robbert/.virtualenvs/altoida/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 970, in run_async_from_thread
    return f.result()
  File "/usr/lib/python3.8/concurrent/futures/_base.py", line 444, in result
    return self.__get_result()
  File "/usr/lib/python3.8/concurrent/futures/_base.py", line 389, in __get_result
    raise self._exception
  File "/home/robbert/.virtualenvs/altoida/lib/python3.8/site-packages/prefect/client.py", line 104, in with_injected_client
    return await fn(*args, **kwargs)
  File "/home/robbert/.virtualenvs/altoida/lib/python3.8/site-packages/prefect/engine.py", line 450, in create_and_begin_subflow_run
    task_runner = await stack.enter_async_context(flow.task_runner.start())
  File "/usr/lib/python3.8/contextlib.py", line 568, in enter_async_context
    result = await _cm_type.__aenter__(cm)
  File "/usr/lib/python3.8/contextlib.py", line 171, in __aenter__
    return await self.gen.__anext__()
  File "/home/robbert/.virtualenvs/altoida/lib/python3.8/site-packages/prefect/task_runners.py", line 168, in start
    await self._start(exit_stack)
  File "/home/robbert/.virtualenvs/altoida/lib/python3.8/site-packages/prefect_ray/task_runners.py", line 195, in _start
    context = ray.init(*init_args, **self.init_kwargs)
  File "/home/robbert/.virtualenvs/altoida/lib/python3.8/site-packages/ray/_private/client_mode_hook.py", line 105, in wrapper
    return func(*args, **kwargs)
  File "/home/robbert/.virtualenvs/altoida/lib/python3.8/site-packages/ray/worker.py", line 977, in init
    raise RuntimeError(
RuntimeError: Maybe you called ray.init twice by accident? This error can be suppressed by passing in 'ignore_reinit_error=True' or by calling 'ray.shutdown()' prior to 'ray.init()'.
09:15:45.181 | ERROR   | Flow run 'terrestrial-bird' - Finished in state Failed('Flow run encountered an exception.')
Traceback (most recent call last):
  File "/home/robbert/programming/python/altoida/altoida_ml/altoida_ml/robbert/snippits/prefect_bugs/ray_run_manager.py", line 29, in <module>
    some_flow()
  File "/home/robbert/.virtualenvs/altoida/lib/python3.8/site-packages/prefect/flows.py", line 390, in __call__
    return enter_flow_run_engine_from_flow_call(
  File "/home/robbert/.virtualenvs/altoida/lib/python3.8/site-packages/prefect/engine.py", line 152, in enter_flow_run_engine_from_flow_call
    return anyio.run(begin_run)
  File "/home/robbert/.virtualenvs/altoida/lib/python3.8/site-packages/anyio/_core/_eventloop.py", line 70, in run
    return asynclib.run(func, *args, **backend_options)
  File "/home/robbert/.virtualenvs/altoida/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 292, in run
    return native_run(wrapper(), debug=debug)
  File "/usr/lib/python3.8/asyncio/runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "/usr/lib/python3.8/asyncio/base_events.py", line 616, in run_until_complete
    return future.result()
  File "/home/robbert/.virtualenvs/altoida/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 287, in wrapper
    return await func(*args)
  File "/home/robbert/.virtualenvs/altoida/lib/python3.8/site-packages/prefect/client.py", line 104, in with_injected_client
    return await fn(*args, **kwargs)
  File "/home/robbert/.virtualenvs/altoida/lib/python3.8/site-packages/prefect/engine.py", line 228, in create_then_begin_flow_run
    return state.result()
  File "/home/robbert/.virtualenvs/altoida/lib/python3.8/site-packages/prefect/orion/schemas/states.py", line 145, in result
    raise data
  File "/home/robbert/.virtualenvs/altoida/lib/python3.8/site-packages/prefect/engine.py", line 557, in orchestrate_flow_run
    result = await run_sync(flow_call)
  File "/home/robbert/.virtualenvs/altoida/lib/python3.8/site-packages/prefect/utilities/asyncutils.py", line 56, in run_sync_in_worker_thread
    return await anyio.to_thread.run_sync(call, cancellable=True)
  File "/home/robbert/.virtualenvs/altoida/lib/python3.8/site-packages/anyio/to_thread.py", line 31, in run_sync
    return await get_asynclib().run_sync_in_worker_thread(
  File "/home/robbert/.virtualenvs/altoida/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 937, in run_sync_in_worker_thread
    return await future
  File "/home/robbert/.virtualenvs/altoida/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 867, in run
    result = context.run(func, *args)
  File "/home/robbert/programming/python/altoida/altoida_ml/altoida_ml/robbert/snippits/prefect_bugs/ray_run_manager.py", line 25, in some_flow
    my_subflow()
  File "/home/robbert/.virtualenvs/altoida/lib/python3.8/site-packages/prefect/flows.py", line 390, in __call__
    return enter_flow_run_engine_from_flow_call(
  File "/home/robbert/.virtualenvs/altoida/lib/python3.8/site-packages/prefect/engine.py", line 156, in enter_flow_run_engine_from_flow_call
    return run_async_from_worker_thread(begin_run)
  File "/home/robbert/.virtualenvs/altoida/lib/python3.8/site-packages/prefect/utilities/asyncutils.py", line 136, in run_async_from_worker_thread
    return anyio.from_thread.run(call)
  File "/home/robbert/.virtualenvs/altoida/lib/python3.8/site-packages/anyio/from_thread.py", line 49, in run
    return asynclib.run_async_from_thread(func, *args)
  File "/home/robbert/.virtualenvs/altoida/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 970, in run_async_from_thread
    return f.result()
  File "/usr/lib/python3.8/concurrent/futures/_base.py", line 444, in result
    return self.__get_result()
  File "/usr/lib/python3.8/concurrent/futures/_base.py", line 389, in __get_result
    raise self._exception
  File "/home/robbert/.virtualenvs/altoida/lib/python3.8/site-packages/prefect/client.py", line 104, in with_injected_client
    return await fn(*args, **kwargs)
  File "/home/robbert/.virtualenvs/altoida/lib/python3.8/site-packages/prefect/engine.py", line 450, in create_and_begin_subflow_run
    task_runner = await stack.enter_async_context(flow.task_runner.start())
  File "/usr/lib/python3.8/contextlib.py", line 568, in enter_async_context
    result = await _cm_type.__aenter__(cm)
  File "/usr/lib/python3.8/contextlib.py", line 171, in __aenter__
    return await self.gen.__anext__()
  File "/home/robbert/.virtualenvs/altoida/lib/python3.8/site-packages/prefect/task_runners.py", line 168, in start
    await self._start(exit_stack)
  File "/home/robbert/.virtualenvs/altoida/lib/python3.8/site-packages/prefect_ray/task_runners.py", line 195, in _start
    context = ray.init(*init_args, **self.init_kwargs)
  File "/home/robbert/.virtualenvs/altoida/lib/python3.8/site-packages/ray/_private/client_mode_hook.py", line 105, in wrapper
    return func(*args, **kwargs)
  File "/home/robbert/.virtualenvs/altoida/lib/python3.8/site-packages/ray/worker.py", line 977, in init
    raise RuntimeError(
RuntimeError: Maybe you called ray.init twice by accident? This error can be suppressed by passing in 'ignore_reinit_error=True' or by calling 'ray.shutdown()' prior to 'ray.init()'.

I solved the problem in my private package by using a modified RayTaskRunner. In particular, I modified the _start function to start with:

if self.address:
    self.logger.info(
        f"Connecting to an existing Ray instance at {self.address}"
    )
    init_args = (self.address,)
elif not ray.is_initialized():
    self.logger.info("Creating a local Ray instance")
    init_args = ()

Note the ray.is_initialized call.
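
As a stopgap, Ray's own error message above suggests ignore_reinit_error=True, which can be forwarded through the runner's init_kwargs so the nested runner tolerates the already-initialized Ray instance (an untested sketch reusing some_task from the reproduction above; the modified _start above is the cleaner fix):

# Variant of the subflow above; ignore_reinit_error is passed straight to
# ray.init() via init_kwargs, so the nested ray.init() call no longer raises.
@flow(task_runner=RayTaskRunner(init_kwargs={"ignore_reinit_error": True}))
def my_subflow():
    some_task.submit(1)
    some_task.submit(2)
    some_task.submit(3)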

Sometimes log output is missing

from prefect import flow, task
from prefect_ray.task_runners import RayTaskRunner


@task
def count(number):
    with open(f"{number}.txt", "w") as f:
        f.write(str(number))
    print(number)

@flow(task_runner=RayTaskRunner())
def count_to(highest_number):
    for number in range(1, highest_number + 1):
        count.with_options(name=f"{number}").submit(number)


if __name__ == "__main__":
    count_to(3)

In the output below, there is no line stating that task run '2-464db843-1' completed; only 1 and 3 are reported as finished.

13:34:57.620 | INFO    | prefect.engine - Created flow run 'adept-jackrabbit' for flow 'count-to'
13:34:57.620 | INFO    | Flow run 'adept-jackrabbit' - Using task runner 'RayTaskRunner'
13:34:57.620 | INFO    | prefect.task_runner.ray - Creating a local Ray instance
13:35:02.659 | INFO    | prefect.task_runner.ray - Using Ray cluster with 1 nodes.
13:35:02.671 | WARNING | Flow run 'adept-jackrabbit' - No default storage is configured on the server. Results from this flow run will be stored in a temporary directory in its runtime environment.
13:35:02.712 | INFO    | Flow run 'adept-jackrabbit' - Created task run '1-464db843-0' for task '1'
13:35:02.789 | INFO    | Flow run 'adept-jackrabbit' - Created task run '2-464db843-1' for task '2'
13:35:02.903 | INFO    | Flow run 'adept-jackrabbit' - Created task run '3-464db843-2' for task '3'
(begin_task_run pid=97109) 1
(begin_task_run pid=97108) 3
(begin_task_run pid=97109) 13:35:09.111 | INFO    | Task run '1-464db843-0' - Finished in state Completed()
(begin_task_run pid=97108) 13:35:09.097 | INFO    | Task run '3-464db843-2' - Finished in state Completed()
(begin_task_run pid=97107) 2
13:35:12.537 | INFO    | Flow run 'adept-jackrabbit' - Finished in state Completed('All states completed.')

Add collection sync workflow using cruft

Add cruft to the repo to allow synchronization of this collection with the original template. Cruft can be added by running cruft link. Note that a starting commit will need to be specified; using the commit of prefect-collection-template closest to the generation date of this repo is a good default.

Failed to unpickle serialized exception when running on remote cluster

09:59:53.784 | INFO    | prefect.engine - Created flow run 'glittering-bull' for flow 'greetings'
09:59:53.784 | INFO    | prefect.task_runner.ray - Connecting to an existing Ray instance at ray://127.0.0.1:10001
09:59:57.618 | INFO    | prefect.task_runner.ray - Using Ray cluster with 2 nodes.
09:59:57.618 | INFO    | prefect.task_runner.ray - The Ray UI is available at 10.42.1.9:8265
09:59:57.890 | INFO    | Flow run 'glittering-bull' - Created task run 'say_hello-811087cd-0' for task 'say_hello'
09:59:59.292 | INFO    | Flow run 'glittering-bull' - Submitted task run 'say_hello-811087cd-0' for execution.
09:59:59.318 | INFO    | Flow run 'glittering-bull' - Created task run 'say_hello-811087cd-2' for task 'say_hello'
09:59:59.326 | INFO    | Flow run 'glittering-bull' - Submitted task run 'say_hello-811087cd-2' for execution.
09:59:59.409 | INFO    | Flow run 'glittering-bull' - Created task run 'say_hello-811087cd-1' for task 'say_hello'
09:59:59.421 | INFO    | Flow run 'glittering-bull' - Submitted task run 'say_hello-811087cd-1' for execution.
09:59:59.462 | INFO    | Flow run 'glittering-bull' - Created task run 'say_goodbye-261e56a8-0' for task 'say_goodbye'
09:59:59.476 | INFO    | Flow run 'glittering-bull' - Submitted task run 'say_goodbye-261e56a8-0' for execution.
09:59:59.527 | INFO    | Flow run 'glittering-bull' - Created task run 'say_goodbye-261e56a8-2' for task 'say_goodbye'
09:59:59.539 | INFO    | Flow run 'glittering-bull' - Submitted task run 'say_goodbye-261e56a8-2' for execution.
09:59:59.623 | INFO    | Flow run 'glittering-bull' - Created task run 'say_goodbye-261e56a8-1' for task 'say_goodbye'
09:59:59.636 | INFO    | Flow run 'glittering-bull' - Submitted task run 'say_goodbye-261e56a8-1' for execution.
09:59:59.733 | INFO    | Flow run 'glittering-bull' - Created task run 'say_goodbye-261e56a8-3' for task 'say_goodbye'
09:59:59.745 | INFO    | Flow run 'glittering-bull' - Submitted task run 'say_goodbye-261e56a8-3' for execution.
09:59:59.839 | INFO    | Flow run 'glittering-bull' - Created task run 'say_hello-811087cd-3' for task 'say_hello'
09:59:59.850 | INFO    | Flow run 'glittering-bull' - Submitted task run 'say_hello-811087cd-3' for execution.
Failed to unpickle serialized exception
Failed to unpickle serialized exception
Failed to unpickle serialized exception
Failed to unpickle serialized exception
Failed to unpickle serialized exception
Failed to unpickle serialized exception
Failed to unpickle serialized exception
Failed to unpickle serialized exception
10:00:05.415 | INFO    | Task run 'say_hello-811087cd-0' - Crash detected! Execution was interrupted by an unexpected exception.
10:00:05.443 | INFO    | Task run 'say_goodbye-261e56a8-0' - Crash detected! Execution was interrupted by an unexpected exception.
10:00:05.469 | INFO    | Task run 'say_hello-811087cd-1' - Crash detected! Execution was interrupted by an unexpected exception.
10:00:05.500 | INFO    | Task run 'say_goodbye-261e56a8-1' - Crash detected! Execution was interrupted by an unexpected exception.
10:00:05.528 | INFO    | Task run 'say_hello-811087cd-2' - Crash detected! Execution was interrupted by an unexpected exception.
10:00:05.554 | INFO    | Task run 'say_goodbye-261e56a8-2' - Crash detected! Execution was interrupted by an unexpected exception.
10:00:05.579 | INFO    | Task run 'say_hello-811087cd-3' - Crash detected! Execution was interrupted by an unexpected exception.
10:00:05.607 | INFO    | Task run 'say_goodbye-261e56a8-3' - Crash detected! Execution was interrupted by an unexpected exception.
10:00:06.767 | ERROR   | Flow run 'glittering-bull' - Finished in state Failed('8/8 states failed.')
Traceback (most recent call last):
  File "/usr/lib/python3.9/runpy.py", line 197, in _run_module_as_main
    return _run_code(code, main_globals, None,
  File "/usr/lib/python3.9/runpy.py", line 87, in _run_code
    exec(code, run_globals)
  File "/home/compute/code/orion-demo/flows/ray_flow.py", line 49, in <module>
    greetings(["arthur", "trillian", "ford", "marvin"])
  File "/home/compute/code/orion-demo/.venv/lib/python3.9/site-packages/prefect/flows.py", line 384, in __call__
    return enter_flow_run_engine_from_flow_call(
  File "/home/compute/code/orion-demo/.venv/lib/python3.9/site-packages/prefect/engine.py", line 158, in enter_flow_run_engine_from_flow_call
    return anyio.run(begin_run)
  File "/home/compute/code/orion-demo/.venv/lib/python3.9/site-packages/anyio/_core/_eventloop.py", line 70, in run
    return asynclib.run(func, *args, **backend_options)
  File "/home/compute/code/orion-demo/.venv/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 292, in run
    return native_run(wrapper(), debug=debug)
  File "/usr/lib/python3.9/asyncio/runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "/usr/lib/python3.9/asyncio/base_events.py", line 647, in run_until_complete
    return future.result()
  File "/home/compute/code/orion-demo/.venv/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 287, in wrapper
    return await func(*args)
  File "/home/compute/code/orion-demo/.venv/lib/python3.9/site-packages/prefect/client.py", line 103, in with_injected_client
    return await fn(*args, **kwargs)
  File "/home/compute/code/orion-demo/.venv/lib/python3.9/site-packages/prefect/engine.py", line 238, in create_then_begin_flow_run
    return state.result()
  File "/home/compute/code/orion-demo/.venv/lib/python3.9/site-packages/prefect/orion/schemas/states.py", line 157, in result
    state.result()
  File "/home/compute/code/orion-demo/.venv/lib/python3.9/site-packages/prefect/orion/schemas/states.py", line 143, in result
    raise data
ray.exceptions.RaySystemError: System error: Failed to unpickle serialized exception
traceback: Traceback (most recent call last):
  File "/home/ray/anaconda3/lib/python3.9/site-packages/ray/exceptions.py", line 46, in from_ray_exception
    return pickle.loads(ray_exception.serialized_exception)
TypeError: __init__() missing 2 required keyword-only arguments: 'request' and 'response'

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/ray/anaconda3/lib/python3.9/site-packages/ray/_private/serialization.py", line 352, in deserialize_objects
    obj = self._deserialize_object(data, metadata, object_ref)
  File "/home/ray/anaconda3/lib/python3.9/site-packages/ray/_private/serialization.py", line 264, in _deserialize_object
    return RayError.from_bytes(obj)
  File "/home/ray/anaconda3/lib/python3.9/site-packages/ray/exceptions.py", line 40, in from_bytes
    return RayError.from_ray_exception(ray_exception)
  File "/home/ray/anaconda3/lib/python3.9/site-packages/ray/exceptions.py", line 49, in from_ray_exception
    raise RuntimeError(msg) from e
RuntimeError: Failed to unpickle serialized exception

from typing import List

from prefect import flow, get_run_logger, task
from prefect_ray.task_runners import RayTaskRunner
import sys

@task
def say_hello(name: str) -> None:
    print(f"say_hello {name}", file=sys.stderr)
    logger = get_run_logger()
    logger.info(f"hello {name}")


@task
def say_goodbye(name: str) -> None:
    # logs not currently working see https://github.com/PrefectHQ/prefect-ray/issues/25
    logger = get_run_logger()
    print("print goodbye")
    logger.info(f"goodbye {name}")


# run on an existing ray cluster
@flow(
    task_runner=RayTaskRunner(
        address="ray://10.97.36.20:10001",
        init_kwargs={
            "runtime_env": {
                "pip": ["prefect==2.3.2"],
                "env_vars": {
                    "PREFECT_API_URL": "xxx",
                    "PREFECT_API_KEY": "yyy"
                }
            }
        },
    )
)
def greetings(names: List[str]) -> None:
    for name in names:
        # tasks must be submitted to run on ray
        # if called without .submit() they are still tracked but
        # run immediately and locally rather than async on ray
        say_hello.submit(name)
        say_goodbye.submit(name)


if __name__ == "__main__":
    greetings(["arthur", "trillian", "ford", "marvin"])

prefect 2.3.2
ray 2.0.0

Prefect-Ray spins up workers before the inputs for the worker tasks are ready / before the task's dependencies are complete

Hi folks, we're using Prefect and Ray to orchestrate distributed workflows. When we generate a future x = task_1.submit() and pass that future as an input to another task, y = task_2.submit(x=x), Ray will spin up a worker for task_2 with the requested resources, and this worker will wait on x.result().

This is obviously undesirable! If we had x1 = task1(); x2 = task2(x1=x1); x3 = task3(x2=x2); ..., all the later tasks in the pipeline would waste worker resources while waiting for the earlier workers to complete. We would like the worker for a task to launch only when the task's inputs become ready.

Is this known behavior / does anyone know how to solve this?

Here's a working example, using "prefect==2.9.0", "prefect-ray==0.2.4", "ray[default]==2.2.0"

from prefect import flow, task
from prefect_ray import RayTaskRunner
from prefect_ray.context import remote_options
import time

@task
def task_a():
    time.sleep(100)
    return 'a'

@task
def task_b():
    return 'b'

@flow(
    task_runner=RayTaskRunner(
        address="ray://ray-cluster-kuberay-head-svc:10001"
    )
)
def multi_test():
    with remote_options(num_cpus=1, num_gpus=1):
        x = task_a.submit()
    for i in range(10):
        with remote_options(num_cpus=1, num_gpus=1):
            task_b.submit(wait_for=[x])

if __name__ == '__main__':
    print(multi_test())

When run, we immediately get 11 workers created, per the image.
[image]
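
One flow-level mitigation to experiment with (a sketch only, not a fix for the underlying behavior; it assumes PrefectFuture.wait() is available in this Prefect version): wait on the upstream future inside the flow before submitting its dependents, so their Ray workers are not requested while the dependency is still running. This trades submission concurrency for not holding idle workers:

# Variant of the flow above: block the flow run until task_a finishes so the
# task_b workers are only requested afterwards.
@flow(
    task_runner=RayTaskRunner(
        address="ray://ray-cluster-kuberay-head-svc:10001"
    )
)
def multi_test_deferred():
    with remote_options(num_cpus=1, num_gpus=1):
        x = task_a.submit()
    x.wait()  # block here instead of in an idle Ray worker
    for _ in range(10):
        with remote_options(num_cpus=1, num_gpus=1):
            task_b.submit(wait_for=[x])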

TypeError upon completion

Unsure why this is crashing at the end. @desertaxle @madkinsz thoughts?

import time

from prefect import flow, task
from prefect_ray import RayTaskRunner

@task
def shout(number):
    time.sleep(0.5)
    print(f"#{number}")

@flow(task_runner=RayTaskRunner)
def count_to(highest_number):
    for number in range(highest_number):
        shout.submit(number)

if __name__ == "__main__":
    count_to(3)

# outputs
#0
#2
#1
(begin_task_run pid=67366) #1
(begin_task_run pid=67368) #0
(begin_task_run pid=67371) #2
(Output truncated.)
---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
/Users/andrew/Applications/python/prefect-dask/prefect_dask/test.ipynb Cell 2 in <cell line: 16>()
     14         shout.submit(number)
     16 if __name__ == "__main__":
---> 17     count_to(3)

File ~/mambaforge/envs/ray/lib/python3.9/site-packages/prefect/flows.py:367, in Flow.__call__(self, *args, **kwargs)
    364 # Convert the call args/kwargs to a parameter dict
    365 parameters = get_call_parameters(self.fn, args, kwargs)
--> 367 return enter_flow_run_engine_from_flow_call(
    368     self, parameters, return_type="result"
    369 )

File ~/mambaforge/envs/ray/lib/python3.9/site-packages/prefect/engine.py:131, in enter_flow_run_engine_from_flow_call(flow, parameters, return_type)
    127 if in_async_main_thread():
    128     # An event loop is already running and we must create a blocking portal to
    129     # run async code from this synchronous context
    130     with start_blocking_portal() as portal:
--> 131         return portal.call(begin_run)
    132 else:
    133     # An event loop is not running so we will create one
    134     return anyio.run(begin_run)

File ~/mambaforge/envs/ray/lib/python3.9/site-packages/anyio/from_thread.py:283, in BlockingPortal.call(self, func, *args)
    268 def call(
    269     self,
    270     func: Callable[..., Union[Coroutine[Any, Any, T_Retval], T_Retval]],
    271     *args: object
    272 ) -> T_Retval:
    273     """
    274     Call the given function in the event loop thread.
    275 
   (...)
    281 
    282     """
--> 283     return cast(T_Retval, self.start_task_soon(func, *args).result())

File ~/mambaforge/envs/ray/lib/python3.9/concurrent/futures/_base.py:446, in Future.result(self, timeout)
    444     raise CancelledError()
    445 elif self._state == FINISHED:
--> 446     return self.__get_result()
    447 else:
    448     raise TimeoutError()

File ~/mambaforge/envs/ray/lib/python3.9/concurrent/futures/_base.py:391, in Future.__get_result(self)
    389 if self._exception:
    390     try:
--> 391         raise self._exception
    392     finally:
    393         # Break a reference cycle with the exception in self._exception
    394         self = None

File ~/mambaforge/envs/ray/lib/python3.9/site-packages/anyio/from_thread.py:219, in BlockingPortal._call_func(self, func, args, kwargs, future)
    216             else:
    217                 future.add_done_callback(callback)
--> 219             retval = await retval
    220 except self._cancelled_exc_class:
    221     future.cancel()

File ~/mambaforge/envs/ray/lib/python3.9/site-packages/prefect/client.py:107, in inject_client.<locals>.with_injected_client(*args, **kwargs)
    105 async with client_context as client:
    106     kwargs.setdefault("client", client)
--> 107     return await fn(*args, **kwargs)

File ~/mambaforge/envs/ray/lib/python3.9/site-packages/prefect/engine.py:203, in create_then_begin_flow_run(flow, parameters, return_type, client)
    199     engine_logger.info(
    200         f"Flow run {flow_run.name!r} received invalid parameters and is marked as failed."
    201     )
    202 else:
--> 203     state = await begin_flow_run(
    204         flow=flow, flow_run=flow_run, parameters=parameters, client=client
    205     )
    207 if return_type == "state":
    208     return state

File ~/mambaforge/envs/ray/lib/python3.9/site-packages/prefect/engine.py:332, in begin_flow_run(flow, flow_run, parameters, client)
    329         result_storage = TempStorageBlock()
    330     flow_run_context.result_storage = result_storage
--> 332     terminal_state = await orchestrate_flow_run(
    333         flow,
    334         flow_run=flow_run,
    335         parameters=parameters,
    336         client=client,
    337         partial_flow_run_context=flow_run_context,
    338     )
    340 # If debugging, use the more complete `repr` than the usual `str` description
    341 display_state = repr(terminal_state) if PREFECT_DEBUG_MODE else str(terminal_state)

File ~/mambaforge/envs/ray/lib/python3.9/contextlib.py:670, in AsyncExitStack.__aexit__(self, *exc_details)
    666 try:
    667     # bare "raise exc_details[1]" replaces our carefully
    668     # set-up context
    669     fixed_ctx = exc_details[1].__context__
--> 670     raise exc_details[1]
...
--> 651         cb_suppress = cb(*exc_details)
    652     else:
    653         cb_suppress = await cb(*exc_details)

TypeError: 'dict' object is not callable
