prefect-ray's Introduction

prefect-ray

Welcome!

Visit the full docs here to see additional examples and the API reference.

prefect-ray contains Prefect integrations with the Ray execution framework, a flexible distributed computing framework for Python.

Provides a RayTaskRunner that enables Prefect flows to execute tasks in parallel using Ray.

Getting Started

Python setup

Requires an installation of Python 3.8 or newer.

We recommend using a Python virtual environment manager such as pipenv, conda, or virtualenv.

These tasks are designed to work with Prefect 2.0. For more information about how to use Prefect, please refer to the Prefect documentation.

Installation

Install prefect-ray with pip:

pip install prefect-ray

Users running Apple Silicon (such as M1 Macs) will need to install Ray in a miniforge conda environment. See the Ray installation documentation for more details.

Running tasks on Ray

The RayTaskRunner is a Prefect task runner that submits tasks to Ray for parallel execution.

By default, a temporary Ray instance is created for the duration of the flow run.

For example, this flow counts from 0 to 9 in parallel, so the numbers may print in any order.

import time

from prefect import flow, task
from prefect_ray import RayTaskRunner

@task
def shout(number):
    time.sleep(0.5)
    print(f"#{number}")

@flow(task_runner=RayTaskRunner)
def count_to(highest_number):
    for number in range(highest_number):
        shout.submit(number)

if __name__ == "__main__":
    count_to(10)

# outputs
#3
#7
#2
#6
#4
#0
#1
#5
#8
#9

If you already have a Ray instance running, you can provide the connection URL via an address argument.

To configure your flow to use the RayTaskRunner:

  1. Make sure the prefect-ray collection is installed as described earlier: pip install prefect-ray.
  2. In your flow code, import RayTaskRunner from prefect_ray.task_runners.
  3. Assign it as the task runner when the flow is defined using the task_runner=RayTaskRunner argument.

For example, this flow uses the RayTaskRunner with a local, temporary Ray instance created by Prefect at flow run time.

from prefect import flow
from prefect_ray.task_runners import RayTaskRunner

@flow(task_runner=RayTaskRunner())
def my_flow():
    ... 

This flow uses the RayTaskRunner configured to access an existing Ray instance at ray://192.0.2.255:8786.

from prefect import flow
from prefect_ray.task_runners import RayTaskRunner

@flow(task_runner=RayTaskRunner(address="ray://192.0.2.255:8786"))
def my_flow():
    ... 

RayTaskRunner accepts the following optional parameters:

| Parameter | Description |
| --- | --- |
| address | Address of a currently running Ray instance, starting with the ray:// URI. |
| init_kwargs | Additional kwargs to use when calling ray.init. |

Note that Ray Client uses the ray:// URI to indicate the address of a Ray instance. If you don't provide the address of a Ray instance, Prefect creates a temporary instance automatically.

!!! warning "Ray environment limitations"
    Ray support for non-x86/64 architectures such as ARM/M1 processors is limited when installing from pip alone, and Ray will be skipped during installation of Prefect. It is possible to manually install the blocking component with conda. See the Ray documentation for instructions.

See the [Ray installation documentation](https://docs.ray.io/en/latest/ray-overview/installation.html) for further compatibility information.

Running tasks on a Ray remote cluster

When using the RayTaskRunner with a remote Ray cluster, you may run into issues that are not seen when using a local Ray instance. To resolve these issues, we recommend taking the following steps when working with a remote Ray cluster:

  1. By default, Prefect will not persist any data to the filesystem of the remote Ray worker. However, if you want to take advantage of Prefect's caching ability, you will need to configure remote result storage to persist results across task runs.

We recommend using the Prefect UI to configure a storage block to use for remote results storage.

Here's an example of a flow that uses caching and remote result storage:

from typing import List

from prefect import flow, get_run_logger, task
from prefect.filesystems import S3
from prefect.tasks import task_input_hash
from prefect_ray.task_runners import RayTaskRunner


# The result of this task will be cached in the configured result storage
@task(cache_key_fn=task_input_hash)
def say_hello(name: str) -> str:
    logger = get_run_logger()
    # This log statement will print only on the first run. Subsequent runs will be cached.
    logger.info(f"hello {name}!")
    return name


@flow(
    task_runner=RayTaskRunner(
        address="ray://<instance_public_ip_address>:10001",
    ),
    # Using an S3 block that has already been created via the Prefect UI
    result_storage="s3/my-result-storage",
)
def greetings(names: List[str]) -> None:
    for name in names:
        say_hello.submit(name)


if __name__ == "__main__":
    greetings(["arthur", "trillian", "ford", "marvin"])
  2. If you get an error stating that the module 'prefect' cannot be found, ensure prefect is installed on the remote cluster, with:

pip install prefect

  3. If you get an error with a message similar to "File system created with scheme 's3' could not be created", ensure the required Python modules are installed on both local and remote machines. The required prerequisite modules can be found in the Prefect documentation. For example, if using S3 for the remote storage:

pip install s3fs

  4. If you are seeing timeout or other connection errors, double check the address provided to the RayTaskRunner. The address should look similar to: address='ray://<head_node_ip_address>:10001':

RayTaskRunner(address="ray://1.23.199.255:10001")
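
A quick check on the shape of the address string can catch the most common mistakes before a connection attempt times out. The helper below is a hypothetical sketch and not part of prefect-ray: `check_ray_address` is an illustrative name, and it only validates the format of the string (the ray:// scheme, a host, and a port). It does not verify that a cluster is reachable.

```python
from urllib.parse import urlsplit


def check_ray_address(address: str) -> list:
    """Return a list of problems found in a Ray Client address (empty if none)."""
    problems = []
    parts = urlsplit(address)
    if parts.scheme != "ray":
        problems.append("address should start with ray://")
    if not parts.hostname:
        problems.append("address is missing a host")
    try:
        port = parts.port
    except ValueError:
        # urlsplit raises ValueError when the port is not a valid integer.
        problems.append("port is not a valid number")
    else:
        if port is None:
            problems.append("address is missing a port, e.g. :10001")
    return problems


# Hypothetical usage: print any problems before building the task runner.
for problem in check_ray_address("ray://1.23.199.255:10001"):
    print(problem)
```

An empty list means the string has the expected shape; connection errors past that point more likely come from networking (firewalls, port forwarding) than from the address format.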

Specifying remote options

The remote_options context can be used to control the task’s remote options.

For example, we can set the number of CPUs and GPUs to use for the process task:

from prefect import flow, task
from prefect_ray.task_runners import RayTaskRunner
from prefect_ray.context import remote_options

@task
def process(x):
    return x + 1


@flow(task_runner=RayTaskRunner())
def my_flow():
    # equivalent to setting @ray.remote(num_cpus=4, num_gpus=2)
    with remote_options(num_cpus=4, num_gpus=2):
        process.submit(42)
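
One way to picture how a context manager like remote_options can scope settings to a block of code is with Python's contextvars module. The sketch below is a simplified stand-in, not prefect-ray's actual implementation: `scoped_options` and `current_options` are hypothetical names, and the way nested blocks merge their options is an assumption made for illustration.

```python
import contextvars
from contextlib import contextmanager

# Holds the options that apply to the code currently executing.
# The default dict is never mutated; new dicts are built on each change.
_options = contextvars.ContextVar("options", default={})


def current_options() -> dict:
    """Return a copy of the options active at the call site."""
    return dict(_options.get())


@contextmanager
def scoped_options(**new_options):
    """Apply options inside the with-block, then restore the previous ones."""
    merged = {**_options.get(), **new_options}
    token = _options.set(merged)
    try:
        yield
    finally:
        _options.reset(token)


with scoped_options(num_cpus=4, num_gpus=2):
    inside = current_options()
    with scoped_options(num_gpus=0):
        nested = current_options()
outside = current_options()
```

Under these assumptions, a submit call made inside the with-block could read `current_options()` and forward the values to the execution backend, while code outside the block sees no options at all.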

Resources

If you encounter any bugs while using prefect-ray, feel free to open an issue in the prefect-ray repository.

If you have any questions or issues while using prefect-ray, you can find help in the Prefect Slack community.

Feel free to star or watch prefect-ray for updates, too!

Development

Contributing

If you'd like to help fix an issue or add a feature to prefect-ray, please propose changes through a pull request from a fork of the repository.

  1. Fork the repository
  2. Clone the forked repository
  3. Install the repository and its dependencies:
 pip install -e ".[dev]"

  4. Make desired changes
  5. Add tests
  6. Insert an entry to CHANGELOG.md
  7. Install pre-commit to perform quality checks prior to commit:
 pre-commit install
  8. git commit, git push, and create a pull request

prefect-ray's People

Contributors

ahuang11, anticorrelator, chrisguidry, dependabot[bot], desertaxle, discdiver, j-tr, pcmoritz, prefect-collection-synchronizer[bot], rpeden, toro-berlin, tpdorsey, urimandujano, zanieb, zzstoatzz

prefect-ray's Issues

Return object references from tasks?

Currently, our main flow script (outside of tasks) always runs on the head node. In some cases, we'd like workers to return Ray object references to larger objects to avoid high memory utilization in the head node. Unfortunately, it appears that prefect-ray automatically retrieves objects from the object store when you return an object reference, and I haven't been able to find a way to bypass that and return the object ID itself back from a task on a worker to the flow on the head node.

Is this behavior supported? Otherwise it seems like I need to make sure my head node has enough resources to support all of the flows we're running in parallel, which is not ideal.

Expectation / Proposal

In Ray, you can ray.put() a value and use the returned object reference to ray.get() it later. This really helps avoid keeping a ton of data in RAM solely to pass it between tasks - instead we use the object store as a shared data repository.

We'd like to be able to ray.put some larger data structures (pandas dataframes, etc) into the object store by hand and then return object references. Currently, object references returned from a task to the main flow are automatically retrieved, forcing the flow to keep this data in local memory.

Traceback / Example

This is contrived - I don't currently have a min viable example but can put that together if the issue is unclear.

import prefect
import ray
from prefect_ray.context import remote_options
from prefect_ray.task_runners import RayTaskRunner


@prefect.task
def read_records():
    # Placeholder for any function that loads a pandas DataFrame
    df = python_function_that_loads_pandas_dataframe()

    data_ref = ray.put(df)
    return data_ref


@prefect.flow(task_runner=RayTaskRunner(address="my_address"))
def primary_process():
    with remote_options(num_cpus=1, memory=4_000_000):
        records_ref = read_records.submit().result()

        # Currently `records_ref` is a pandas DataFrame, but I want a ray object ref

FileNotFoundError errors when running with a remote ray cluster

When running a flow from my laptop against a remote Ray cluster, Prefect tries to reference directories that only exist on my laptop (e.g. /Users/tekumara/.prefect/storage):

$ python flows/ray_flow.py
20:38:31.738 | INFO    | prefect.engine - Created flow run 'fiery-copperhead' for flow 'greetings'
20:38:31.738 | INFO    | Flow run 'fiery-copperhead' - Starting 'RayTaskRunner'; submitted tasks will be run in parallel...
20:38:31.790 | INFO    | prefect.task_runner.ray - Connecting to an existing Ray instance at ray://127.0.0.1:10001
20:38:35.568 | INFO    | prefect.task_runner.ray - Using Ray cluster with 1 nodes.
20:38:35.568 | INFO    | prefect.task_runner.ray - The Ray UI is available at 10.136.205.104:8265
20:38:37.838 | INFO    | Flow run 'fiery-copperhead' - Created task run 'say_hello-811087cd-0' for task 'say_hello'
20:38:39.172 | INFO    | Flow run 'fiery-copperhead' - Submitted task run 'say_hello-811087cd-0' for execution.
20:38:39.477 | INFO    | Flow run 'fiery-copperhead' - Created task run 'say_goodbye-261e56a8-0' for task 'say_goodbye'
20:38:39.531 | INFO    | Flow run 'fiery-copperhead' - Submitted task run 'say_goodbye-261e56a8-0' for execution.
20:38:39.838 | INFO    | Flow run 'fiery-copperhead' - Created task run 'say_hello-811087cd-1' for task 'say_hello'
20:38:39.887 | INFO    | Flow run 'fiery-copperhead' - Submitted task run 'say_hello-811087cd-1' for execution.
20:38:40.169 | INFO    | Flow run 'fiery-copperhead' - Created task run 'say_goodbye-261e56a8-1' for task 'say_goodbye'
20:38:40.205 | INFO    | Flow run 'fiery-copperhead' - Submitted task run 'say_goodbye-261e56a8-1' for execution.
(get_dashboard_url pid=385) /home/ray/anaconda3/lib/python3.9/site-packages/paramiko/transport.py:236: CryptographyDeprecationWarning: Blowfish has been deprecated
(get_dashboard_url pid=385)   "class": algorithms.Blowfish,
20:38:40.504 | INFO    | Flow run 'fiery-copperhead' - Created task run 'say_hello-811087cd-2' for task 'say_hello'
20:38:40.540 | INFO    | Flow run 'fiery-copperhead' - Submitted task run 'say_hello-811087cd-2' for execution.
20:38:40.826 | INFO    | Flow run 'fiery-copperhead' - Created task run 'say_goodbye-261e56a8-2' for task 'say_goodbye'
20:38:40.862 | INFO    | Flow run 'fiery-copperhead' - Submitted task run 'say_goodbye-261e56a8-2' for execution.
(begin_task_run pid=385) 03:38:41.096 | INFO    | Task run 'say_hello-811087cd-0' - hello arthur
20:38:41.165 | INFO    | Flow run 'fiery-copperhead' - Created task run 'say_hello-811087cd-3' for task 'say_hello'
20:38:41.207 | INFO    | Flow run 'fiery-copperhead' - Submitted task run 'say_hello-811087cd-3' for execution.
20:38:41.510 | INFO    | Flow run 'fiery-copperhead' - Created task run 'say_goodbye-261e56a8-3' for task 'say_goodbye'
20:38:41.551 | INFO    | Flow run 'fiery-copperhead' - Submitted task run 'say_goodbye-261e56a8-3' for execution.
(begin_task_run pid=385) 03:38:41.605 | INFO    | Task run 'say_hello-811087cd-1' - hello trillian
(begin_task_run pid=385) print goodbye
(begin_task_run pid=385) 03:38:42.104 | INFO    | Task run 'say_goodbye-261e56a8-3' - goodbye marvin
(begin_task_run pid=385) print goodbye
(begin_task_run pid=385) 03:38:42.593 | INFO    | Task run 'say_goodbye-261e56a8-2' - goodbye ford
(begin_task_run pid=385) print goodbye
(begin_task_run pid=385) 03:38:43.139 | INFO    | Task run 'say_goodbye-261e56a8-1' - goodbye trillian
(begin_task_run pid=385) 03:38:43.635 | INFO    | Task run 'say_hello-811087cd-3' - hello marvin
(begin_task_run pid=385) print goodbye
(begin_task_run pid=385) 03:38:44.126 | INFO    | Task run 'say_goodbye-261e56a8-0' - goodbye arthur
20:38:44.644 | INFO    | Task run 'say_hello-811087cd-0' - Crash detected! Execution was interrupted by an unexpected exception.
(begin_task_run pid=385) 03:38:44.623 | INFO    | Task run 'say_hello-811087cd-2' - hello ford
20:38:44.936 | INFO    | Task run 'say_goodbye-261e56a8-0' - Crash detected! Execution was interrupted by an unexpected exception.
20:38:45.279 | INFO    | Task run 'say_hello-811087cd-1' - Crash detected! Execution was interrupted by an unexpected exception.
20:38:45.561 | INFO    | Task run 'say_goodbye-261e56a8-1' - Crash detected! Execution was interrupted by an unexpected exception.
20:38:45.845 | INFO    | Task run 'say_hello-811087cd-2' - Crash detected! Execution was interrupted by an unexpected exception.
20:38:46.193 | INFO    | Task run 'say_goodbye-261e56a8-2' - Crash detected! Execution was interrupted by an unexpected exception.
20:38:46.472 | INFO    | Task run 'say_hello-811087cd-3' - Crash detected! Execution was interrupted by an unexpected exception.
20:38:46.751 | INFO    | Task run 'say_goodbye-261e56a8-3' - Crash detected! Execution was interrupted by an unexpected exception.
20:38:48.741 | ERROR   | Flow run 'fiery-copperhead' - Finished in state Failed('8/8 states failed.')
Traceback (most recent call last):
  File "/Users/tekumara/code/orion-demo/flows/ray_flow.py", line 38, in <module>
    greetings(["arthur", "trillian", "ford", "marvin"])
  File "/Users/tekumara/code/orion-demo/.venv/lib/python3.9/site-packages/prefect/flows.py", line 367, in __call__
    return enter_flow_run_engine_from_flow_call(
  File "/Users/tekumara/code/orion-demo/.venv/lib/python3.9/site-packages/prefect/engine.py", line 150, in enter_flow_run_engine_from_flow_call
    return anyio.run(begin_run)
  File "/Users/tekumara/code/orion-demo/.venv/lib/python3.9/site-packages/anyio/_core/_eventloop.py", line 70, in run
    return asynclib.run(func, *args, **backend_options)
  File "/Users/tekumara/code/orion-demo/.venv/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 292, in run
    return native_run(wrapper(), debug=debug)
  File "/Users/tekumara/.pyenv/versions/3.9.13/lib/python3.9/asyncio/runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "/Users/tekumara/.pyenv/versions/3.9.13/lib/python3.9/asyncio/base_events.py", line 647, in run_until_complete
    return future.result()
  File "/Users/tekumara/code/orion-demo/.venv/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 287, in wrapper
    return await func(*args)
  File "/Users/tekumara/code/orion-demo/.venv/lib/python3.9/site-packages/prefect/client.py", line 104, in with_injected_client
    return await fn(*args, **kwargs)
  File "/Users/tekumara/code/orion-demo/.venv/lib/python3.9/site-packages/prefect/engine.py", line 226, in create_then_begin_flow_run
    return state.result()
  File "/Users/tekumara/code/orion-demo/.venv/lib/python3.9/site-packages/prefect/orion/schemas/states.py", line 159, in result
    state.result()
  File "/Users/tekumara/code/orion-demo/.venv/lib/python3.9/site-packages/prefect/orion/schemas/states.py", line 145, in result
    raise data
ray.exceptions.RayTaskError: ray::begin_task_run() (pid=385, ip=10.136.205.104)
FileNotFoundError: [Errno 2] No such file or directory: '/Users/tekumara/.prefect/storage'

During handling of the above exception, another exception occurred:

ray::begin_task_run() (pid=385, ip=10.136.205.104)
  File "/home/ray/anaconda3/lib/python3.9/pathlib.py", line 1313, in mkdir
    self._accessor.mkdir(self, mode)
FileNotFoundError: [Errno 2] No such file or directory: '/Users/tekumara/.prefect'

During handling of the above exception, another exception occurred:

ray::begin_task_run() (pid=385, ip=10.136.205.104)
  File "/home/ray/anaconda3/lib/python3.9/pathlib.py", line 1313, in mkdir
    self._accessor.mkdir(self, mode)
FileNotFoundError: [Errno 2] No such file or directory: '/Users/tekumara'

During handling of the above exception, another exception occurred:

ray::begin_task_run() (pid=385, ip=10.136.205.104)
  File "/Users/tekumara/code/orion-demo/.venv/lib/python3.9/site-packages/prefect/utilities/asyncutils.py", line 193, in wrapper
  File "/tmp/ray/session_2022-07-19_04-28-59_171336_7/runtime_resources/pip/3a2844a8f2bb50700ae9149311bce23037a95e22/virtualenv/lib/python3.9/site-packages/prefect/utilities/asyncutils.py", line 140, in run_async_in_new_loop
    return anyio.run(partial(__fn, *args, **kwargs))
  File "/home/ray/anaconda3/lib/python3.9/site-packages/anyio/_core/_eventloop.py", line 56, in run
    return asynclib.run(func, *args, **backend_options)
  File "/home/ray/anaconda3/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 233, in run
    return native_run(wrapper(), debug=debug)
  File "/home/ray/anaconda3/lib/python3.9/asyncio/runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "/home/ray/anaconda3/lib/python3.9/asyncio/base_events.py", line 642, in run_until_complete
    return future.result()
  File "/home/ray/anaconda3/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 228, in wrapper
    return await func(*args)
  File "/tmp/ray/session_2022-07-19_04-28-59_171336_7/runtime_resources/pip/3a2844a8f2bb50700ae9149311bce23037a95e22/virtualenv/lib/python3.9/site-packages/prefect/engine.py", line 944, in begin_task_run
    return await orchestrate_task_run(
  File "/tmp/ray/session_2022-07-19_04-28-59_171336_7/runtime_resources/pip/3a2844a8f2bb50700ae9149311bce23037a95e22/virtualenv/lib/python3.9/site-packages/prefect/engine.py", line 1081, in orchestrate_task_run
    await _persist_serialized_result(
  File "/tmp/ray/session_2022-07-19_04-28-59_171336_7/runtime_resources/pip/3a2844a8f2bb50700ae9149311bce23037a95e22/virtualenv/lib/python3.9/site-packages/prefect/results.py", line 15, in _persist_serialized_result
    await filesystem.write_path(key, content)
  File "/tmp/ray/session_2022-07-19_04-28-59_171336_7/runtime_resources/pip/3a2844a8f2bb50700ae9149311bce23037a95e22/virtualenv/lib/python3.9/site-packages/prefect/filesystems.py", line 94, in write_path
    path.parent.mkdir(exist_ok=True, parents=True)
  File "/home/ray/anaconda3/lib/python3.9/pathlib.py", line 1317, in mkdir
    self.parent.mkdir(parents=True, exist_ok=True)
  File "/home/ray/anaconda3/lib/python3.9/pathlib.py", line 1317, in mkdir
    self.parent.mkdir(parents=True, exist_ok=True)
  File "/home/ray/anaconda3/lib/python3.9/pathlib.py", line 1317, in mkdir
    self.parent.mkdir(parents=True, exist_ok=True)
  File "/home/ray/anaconda3/lib/python3.9/pathlib.py", line 1313, in mkdir
    self._accessor.mkdir(self, mode)
PermissionError: [Errno 13] Permission denied: '/Users'

flows/ray_flow.py:

from typing import List

from prefect import flow, get_run_logger, task
from prefect_ray.task_runners import RayTaskRunner


@task
def say_hello(name: str) -> None:
    logger = get_run_logger()
    logger.info(f"hello {name}")


@task
def say_goodbye(name: str) -> None:
    # logs not currently working see https://github.com/PrefectHQ/prefect-ray/issues/25
    logger = get_run_logger()
    print("print goodbye")
    logger.info(f"goodbye {name}")


# run on an existing ray cluster
@flow(
    task_runner=RayTaskRunner(
        # 127.0.0.1:10001 is port-forwarded to the remote ray cluster
        address="ray://127.0.0.1:10001",
        init_kwargs={"runtime_env": {"pip": ["prefect==2.0b12"]}},
    )
)
def greetings(names: List[str]) -> None:
    for name in names:
        say_hello.submit(name)
        say_goodbye.submit(name)


if __name__ == "__main__":
    greetings(["arthur", "trillian", "ford", "marvin"])

prefect 2.0b12

AWS EC2 Spot Interrupt fails entire flow

Expectation / Proposal

Prefect-Ray can run tasks on Ray clusters that utilize AWS EC2 spot instances.

Current behavior

Flows run with prefect-ray on EC2 spot clusters will immediately fail the whole flow as soon as one spot-type worker node receives a spot interrupt.

Minimal Reproducible Example

from datetime import datetime
from time import sleep

from prefect import flow, get_run_logger, task
from prefect_ray.task_runners import RayTaskRunner


@task
def wait_for_an_eternity():
    logger = get_run_logger()

    logger.info(f"STARTING TASK: {datetime.now()}")
    sleep(180000)  # seconds = 50h
    logger.info(f"STOPPING TASK: {datetime.now()}")


@flow(
    name="EC2-Spot-Interrupt-Lab",
    task_runner=RayTaskRunner(address="cluster-with-ec2-spot-workers"),
)
def main():
    logger = get_run_logger()
    logger.info("start")

    wait_for_an_eternity.submit()

Steps to reproduce:

  1. Set up a Ray cluster that does not run tasks on the head node and consists of only EC2 spot worker nodes
  2. Run the flow above
  3. Either wait for a worker node to get a spot interrupt notice (and subsequently be "stolen") or manually trigger a failed worker node in the cluster by terminating the worker node EC2 instance

Result:

  • Task run '...' received abort during orchestration: This run cannot transition to the RUNNING state from the RUNNING state. Task run is in RUNNING state.
  • Task is still in state RUNNING
  • Flow is immediately in state FAILED

Business perspective

Running workloads on EC2 spot instances significantly reduces computation costs compared to on-demand instances. Especially for our data processing workloads with little to no side effects, rerunning a task in case of a spot instance termination is no problem from an application point of view. Ray as our distributed compute framework is already capable of provisioning replacement instances in case of spot instance termination and rerunning the interrupted task.
Therefore, we need to run our processing with Prefect and Ray on EC2 Spot Instances. This issue blocks us from using EC2 spot and crashes every workload that runs for an extended time.
This issue was discovered after rolling out the solution to production pipelines.

Our error analysis

Prefect's PreventRedundantTransitions orchestration policy prevents a repeated task execution. The task remains in the Running state when the instance is terminated, and the replacement instance tries to start the task from scratch, which includes proposing the Running state again. That proposal is aborted, and the task fails. This effectively makes it impossible to leverage Ray's retry mechanism and use spot instances, which has direct cost implications.
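
The failure mode described here can be illustrated with a toy model. This is a deliberately simplified sketch, not Prefect's orchestration code: `ToyTaskRun`, `propose_state`, and the rule that a proposal equal to the current state is aborted are assumptions that mirror only the behavior reported in this issue.

```python
class TransitionAborted(Exception):
    """Raised when a proposed state transition is rejected."""


class ToyTaskRun:
    """Hypothetical stand-in for a task run tracked by an orchestrator."""

    def __init__(self):
        self.state = "PENDING"

    def propose_state(self, new_state: str) -> None:
        # Simplified redundancy rule: reject a proposal that repeats the
        # current state, as in the RUNNING -> RUNNING case from the log.
        if new_state == self.state:
            raise TransitionAborted(
                f"cannot transition to {new_state} from {self.state}"
            )
        self.state = new_state


run = ToyTaskRun()
run.propose_state("RUNNING")  # the first worker starts the task

# The worker is terminated without reporting a final state, so the
# orchestrator still sees RUNNING when the retry proposes RUNNING again.
try:
    run.propose_state("RUNNING")
    retry_accepted = True
except TransitionAborted:
    retry_accepted = False
```

In this model the retry is rejected because nothing moved the run out of RUNNING before the replacement worker picked it up, which is the gap the mitigation ideas try to close.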

Our Initial Ideas for potential mitigation

Possible solutions that would enable using spot instances and automatic retries on termination could include:

  1. An adaptation of the PreventRedundantTransitions policy. The intended use case for this policy is to prevent the flow from progressing backwards or multiple agents from picking up the same task. The policy could potentially be adapted to be aware of the origin of the transition request and handle this specific case separately.
  2. Gracefully handling instance termination. The application or orchestration code could react to a spot instance termination notice, which is usually published 2 minutes before the instance actually terminates, and transition the task into a state that can later transition again into Running.
    1. This could potentially be modeled as transitioning to Paused and then transitioning the same task to Running again,
    2. or as a transition to Failed/Crashed and some shutdown logic that submits an entirely new task for the rerun.
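
As a rough sketch of the second idea, code on a worker could poll the EC2 instance metadata path `latest/meta-data/spot/instance-action`, which AWS documents as returning a small JSON notice once an interruption is scheduled and a 404 otherwise. The function below only parses such a notice. `seconds_until_interruption` is a hypothetical helper name, the HTTP request is left out, and what Prefect state to move the task into afterwards is not settled by this issue.

```python
import json
from datetime import datetime, timezone
from typing import Optional


def seconds_until_interruption(body: Optional[str], now: datetime) -> Optional[float]:
    """Return seconds left before a spot interruption, or None if none is pending.

    `body` is the text returned by the spot instance-action metadata path,
    or None when the request answered 404 (no interruption scheduled).
    """
    if body is None:
        return None
    notice = json.loads(body)
    # The notice carries the planned action time as a UTC timestamp.
    when = datetime.strptime(notice["time"], "%Y-%m-%dT%H:%M:%SZ").replace(
        tzinfo=timezone.utc
    )
    return (when - now).total_seconds()


# Sample notice in the shape shown in the AWS documentation.
sample = '{"action": "terminate", "time": "2017-09-18T08:22:00Z"}'
now = datetime(2017, 9, 18, 8, 20, 0, tzinfo=timezone.utc)
remaining = seconds_until_interruption(sample, now)
```

A watcher built on this could start its shutdown logic as soon as the function returns a number instead of None, leaving roughly the two-minute window mentioned in the list to hand the task off.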

First class results error using remote ray

I checked out the PrefectHQ/prefect#6908 branch on both the local and remote machines, then ran this code (if address is omitted, it works). Any ideas @madkinsz?

from typing import List

from prefect import flow, get_run_logger, task
from prefect_ray.task_runners import RayTaskRunner
from prefect.settings import temporary_settings, PREFECT_LOCAL_STORAGE_PATH

# from prefect_dask.task_runners import DaskTaskRunner


@task
def say_hello(name: str) -> None:
    logger = get_run_logger()
    logger.info(f"hello {name}")


@task
def say_goodbye(name: str) -> None:
    # logs not currently working see https://github.com/PrefectHQ/prefect-ray/issues/25
    logger = get_run_logger()
    print("print goodbye")
    logger.info(f"goodbye {name}")


# run on an existing ray cluster
@flow(
    task_runner=RayTaskRunner(
        # 127.0.0.1:10001 is port-forwarded to the remote ray cluster
        address="ray://myaddress",
    )
)
def greetings(names: List[str]) -> None:
    for name in names:
        say_hello.submit(name)


if __name__ == "__main__":
    greetings(["arthur", "trillian", "ford", "marvin"])

Results in:

15:29:39.838 | INFO    | prefect.engine - Created flow run 'wakeful-condor' for flow 'greetings'
15:29:39.839 | INFO    | prefect.task_runner.ray - Connecting to an existing Ray instance at ray://3.95.199.184:10001
15:29:43.498 | INFO    | prefect.task_runner.ray - Using Ray cluster with 1 nodes.
15:29:44.509 | INFO    | Flow run 'wakeful-condor' - Created task run 'say_hello-811087cd-0' for task 'say_hello'
15:29:46.272 | INFO    | Flow run 'wakeful-condor' - Submitted task run 'say_hello-811087cd-0' for execution.
15:29:46.366 | INFO    | Flow run 'wakeful-condor' - Created task run 'say_hello-811087cd-2' for task 'say_hello'
15:29:46.458 | INFO    | Flow run 'wakeful-condor' - Submitted task run 'say_hello-811087cd-2' for execution.
15:29:46.461 | INFO    | Flow run 'wakeful-condor' - Created task run 'say_hello-811087cd-3' for task 'say_hello'
15:29:46.553 | INFO    | Flow run 'wakeful-condor' - Submitted task run 'say_hello-811087cd-3' for execution.
15:29:46.554 | INFO    | Flow run 'wakeful-condor' - Created task run 'say_hello-811087cd-1' for task 'say_hello'
15:29:46.647 | INFO    | Flow run 'wakeful-condor' - Submitted task run 'say_hello-811087cd-1' for execution.
15:29:46.728 | INFO    | Task run 'say_hello-811087cd-0' - Crash detected! Execution was interrupted by an unexpected exception.
15:29:46.849 | INFO    | Task run 'say_hello-811087cd-1' - Crash detected! Execution was interrupted by an unexpected exception.
15:29:47.005 | INFO    | Task run 'say_hello-811087cd-2' - Crash detected! Execution was interrupted by an unexpected exception.
15:29:47.134 | INFO    | Task run 'say_hello-811087cd-3' - Crash detected! Execution was interrupted by an unexpected exception.
15:29:48.747 | ERROR   | Flow run 'wakeful-condor' - Finished in state Failed('4/4 states failed.')
Traceback (most recent call last):
  File "/Users/andrew/test2.py", line 40, in <module>
    greetings(["arthur", "trillian", "ford", "marvin"])
  File "/Users/andrew/Applications/python/prefect/src/prefect/flows.py", line 390, in __call__
    return enter_flow_run_engine_from_flow_call(
  File "/Users/andrew/Applications/python/prefect/src/prefect/engine.py", line 152, in enter_flow_run_engine_from_flow_call
    return anyio.run(begin_run)
  File "/Users/andrew/mambaforge/envs/ray310/lib/python3.10/site-packages/anyio/_core/_eventloop.py", line 70, in run
    return asynclib.run(func, *args, **backend_options)
  File "/Users/andrew/mambaforge/envs/ray310/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 292, in run
    return native_run(wrapper(), debug=debug)
  File "/Users/andrew/mambaforge/envs/ray310/lib/python3.10/asyncio/runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "/Users/andrew/mambaforge/envs/ray310/lib/python3.10/asyncio/base_events.py", line 646, in run_until_complete
    return future.result()
  File "/Users/andrew/mambaforge/envs/ray310/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 287, in wrapper
    return await func(*args)
  File "/Users/andrew/Applications/python/prefect/src/prefect/client/orion.py", line 86, in with_injected_client
    return await fn(*args, **kwargs)
  File "/Users/andrew/Applications/python/prefect/src/prefect/engine.py", line 232, in create_then_begin_flow_run
    return state.result()
  File "/Users/andrew/Applications/python/prefect/src/prefect/client/schemas.py", line 116, in result
    state.result()
  File "/Users/andrew/Applications/python/prefect/src/prefect/client/schemas.py", line 102, in result
    raise data
TypeError: missing a required argument: 'result_factory'

Flows with many tasks unable to complete

Expectation / Proposal

With the example code below, only 5 tasks can be scheduled by Ray at any time, and each task will take between 45 and 135 seconds. We schedule 15,000 tasks, and although each task does not use much memory (and only 5 can run at any given time), eventually Ray stops being able to schedule any tasks, so the flow run just hangs.
Ray error:

Warning: The following resource request cannot be scheduled right now: {'CPU': 1.0}. This is likely due to all cluster resources being claimed by actors. Consider creating fewer actors or adding more nodes to this Ray cluster.

The flow completes normally for lower num_map_keys, e.g. 100.
It also completes normally with num_map_keys=15000 using the default concurrent task runner.

https://docs.ray.io/en/latest/ray-core/patterns/limit-pending-tasks.html gives a possible resolution to this; prefect should expose the maximum number of pending Ray tasks as a configurable variable.
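
The idea behind that pattern is backpressure: stop submitting new work once a fixed number of tasks are in flight, and wait for one to finish before adding more. The sketch below shows the same idea with the standard library's concurrent.futures instead of Ray, so it runs without a cluster. `run_with_backpressure` and `max_pending` are hypothetical names, not an existing prefect-ray or Ray option.

```python
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait


def run_with_backpressure(fn, inputs, max_pending: int, max_workers: int = 5):
    """Submit fn(x) for each input while keeping at most max_pending in flight."""
    if max_pending < 1:
        raise ValueError("max_pending must be at least 1")
    results = {}
    pending = {}  # future -> index of the input it was submitted with
    peak_pending = 0
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        for index, value in enumerate(inputs):
            # Block until a slot frees up before submitting more work.
            while len(pending) >= max_pending:
                done, _ = wait(pending, return_when=FIRST_COMPLETED)
                for future in done:
                    results[pending.pop(future)] = future.result()
            pending[pool.submit(fn, value)] = index
            peak_pending = max(peak_pending, len(pending))
        # Drain whatever is still running once all inputs are submitted.
        for future, index in pending.items():
            results[index] = future.result()
    ordered = [results[i] for i in range(len(results))]
    return ordered, peak_pending


squares, peak = run_with_backpressure(lambda n: n**2, range(100), max_pending=10)
```

With Ray, the waiting step would be `ray.wait` on the list of pending object references, as the linked page describes; the submission loop keeps the same shape.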

Traceback / Example

from time import sleep

import numpy as np
from prefect import task, flow
from prefect_ray.task_runners import RayTaskRunner


@task
def a(n: int, t: int, jitter_pct: float) -> float:
    sleep_time = t * (1 + jitter_pct * (2 * np.random.default_rng().random() - 1))
    print(f"task a with {n=}, sleeping {sleep_time}s")
    sleep(sleep_time)
    return n**2

@task
def b(n: float, t: int, jitter_pct: float) -> float:
    sleep_time = t * (1 + jitter_pct * (2 * np.random.default_rng().random() - 1))
    print(f"task b with {n=}, sleeping {sleep_time}s")
    sleep(sleep_time)
    return n**0.5


@task
def c(n: float, t: int, jitter_pct: float) -> int:
    sleep_time = t * (1 + jitter_pct * (2 * np.random.default_rng().random() - 1))
    print(f"task c with {n=}, sleeping {sleep_time}s")
    sleep(sleep_time)
    return int(n*2)


@flow(task_runner=RayTaskRunner(init_kwargs={"num_cpus": 5}))
def many_map_tasks(num_map_keys: int = 15000, t: int = 90, jitter_pct: float = 0.5):
    squares = a.map(range(num_map_keys), t, jitter_pct)
    roots = b.map(squares, t, jitter_pct)
    doubles = c.map(roots, t, jitter_pct)
    return [f.result() for f in doubles]


if __name__ == "__main__":
    print(many_map_tasks())

Flow crashes after multiple hanged tasks with "Prefect Task run TASK_RUN_ID already finished"

Prefect Version

Version: 2.6.7
API version: 0.8.3
Python version: 3.8.9
Git commit: 9074c158
Built: Thu, Nov 10, 2022 1:07 PM
OS/Arch: linux/x86_64
Profile: cloud
Server type: cloud

Prefect Ray Version

0.2.1

Ray Version

2.1.0

Describe the current behavior

I'm running a Ray cluster in Azure (cluster YAML attached below). I created a small flow, using the Azure Container Instance Job infrastructure block, that looks like this:

@task
async def my_task(id: int):
    logger = get_run_logger()
    logger.info(f"Hello from my_task #{id}! Sleeping for 30 seconds...")
    time.sleep(30)
    logger.info(f"my_task #{id} done!")

@flow(
    name=FLOW_NAME,
    task_runner=RayTaskRunner(address="ray://10.0.0.4:10001")
)
def my_flow(loglevel: str = "DEBUG"):
    logconfig = setup_logging.submit(loglevel=loglevel)
    logger = get_run_logger()
    logger.info("Starting")
    # Spam ray tasks here!
    for i in range(50):
        my_task.submit(i + 1, wait_for=[logconfig])
        time.sleep(5)

After running this flow for about 5 minutes, I'm starting to see a lot of tasks that are stuck as 'running' indefinitely. Furthermore, these tasks are printing the following message to the log:

Hello from my_task #42! Sleeping for 30 seconds...                    05:15:10 PM
Task run 'b4a5a5f5-1d72-4146-9476-cf157e686df3' already finished.     05:15:39 PM

After seeing a handful of tasks exhibiting this behavior, the flow promptly crashes with no further error message. The agent logs the following:

16:11:52.674 | INFO    | prefect.infrastructure.container-instance-job - AzureContainerInstanceJob 'myflow-161585': Running command...                   
16:11:52.676 | INFO    | prefect.agent - Completed submission of flow run '87eba3cb-f09f-4b21-90f5-e0d7923801e4'                                          
16:16:55.294 | INFO    | prefect.infrastructure.container-instance-job - AzureContainerInstanceJob 'myflow-161585': Completed command run.               
16:16:55.296 | INFO    | prefect.infrastructure.container-instance-job - AzureContainerInstanceJob 'myflow-161585': Deleting container...    

The Ray cluster doesn't log anything of note, as far as I can tell. I'm not entirely sure what's happening here, but according to this discussion, I'm not the only person affected: https://discourse.prefect.io/t/edgecase-error-prefect-task-run-task-run-id-already-finished/1836

Describe the proposed behavior

Of course, the tasks shouldn't hang with "task already finished"; they should either exit properly and crash, or continue running properly.

Ray Dockerfile

FROM rayproject/ray:2.1.0-py38-cpu

# Don't buffer console output
ENV PYTHONUNBUFFERED=1
# Don't compile bytecode, we're only running once
ENV PYTHONDONTWRITEBYTECODE=1
ENV CODE_PATH=/home/ray/anaconda3/lib/python3.8/site-packages/

COPY ./pipeline/requirements.txt ${CODE_PATH}/pipeline/requirements.txt
RUN pip install pip --upgrade && \
    pip install -r ${CODE_PATH}/pipeline/requirements.txt

COPY ./logging.yml ${CODE_PATH}/logging.yml
COPY ./flow_audit ${CODE_PATH}/flow_audit
COPY ./orchestration ${CODE_PATH}/orchestration
COPY ./pipeline ${CODE_PATH}/pipeline

Ray Cluster YAML

Please ignore the comments here, they're mostly from the default cluster YAML. We run a custom docker image as seen below from censoredregistry.azurecr.io/ray-cluster:latest (not the actual URL). The Dockerfile is attached above.

# An unique identifier for the head node and workers of this cluster.
cluster_name: test

# The maximum number of workers nodes to launch in addition to the head
# node.
max_workers: 25

# The autoscaler will scale up the cluster faster with higher upscaling speed.
# E.g., if the task requires adding more nodes then autoscaler will gradually
# scale up the cluster in chunks of upscaling_speed*currently_running_nodes.
# This number should be > 0.
upscaling_speed: 100.0

# This executes all commands on all nodes in the docker container,
# and opens all the necessary ports to support the Ray cluster.
# Empty object means disabled.
docker:
    # image: "rayproject/ray-ml:latest-gpu" # You can change this to latest-cpu if you don't need GPU support and want a faster startup
    image: censoredregistry.azurecr.io/ray-cluster:latest # use this one if you don't need ML dependencies, it's faster to pull
    container_name: "ray_container"
    # If true, pulls latest version of image. Otherwise, `docker run` will only pull the image
    # if no cached version is present.
    pull_before_run: True
    run_options:   # Extra options to pass into "docker run"
        - --ulimit nofile=65536:65536

    # Example of running a GPU head with CPU workers
    # head_image: "rayproject/ray-ml:latest-gpu"
    # Allow Ray to automatically detect GPUs

    # worker_image: "rayproject/ray-ml:latest-cpu"
    # worker_run_options: []

# If a node is idle for this many minutes, it will be removed.
idle_timeout_minutes: 30 # TODO: Should probably be something like 5 minutes

# Cloud-provider specific configuration.
provider:
    type: azure
    # https://azure.microsoft.com/en-us/global-infrastructure/locations
    location: westeurope
    cache_stopped_nodes: False
    resource_group: rg-ray-test
    # Below settings only applicable for AWS, not Azure.
    # security_group:
    #     GroupName: ray-nsg
    #     IpPermissions:
    #           - FromPort: 10001
    #             ToPort: 10001
    #             IpProtocol: TCP
    #             IpRanges:
    #                 # This will enable inbound access from ALL IPv4 addresses.
    #                 - CidrIp: 194.218.21.196
    # set subscription id otherwise the default from az cli will be used
    subscription_id: CENSORED_SUBSCRIPTION_ID
    # set unique subnet mask or a random mask will be used
    subnet_mask: 10.0.0.0/24
    # set unique id for resources in this cluster
    # if not set a default id will be generated based on the resource group and cluster name
    unique_id: test

# How Ray will authenticate with newly launched nodes.
auth:
    ssh_user: ray
    # you must specify paths to matching private and public key pair files
    # use `ssh-keygen -t rsa -b 4096` to generate a new ssh key pair
    ssh_private_key: ~/.ssh/ray-test
    # changes to this should match what is specified in file_mounts
    ssh_public_key: ~/.ssh/ray-test.pub

# More specific customization to node configurations can be made using the ARM template azure-vm-template.json file
# See documentation here: https://docs.microsoft.com/en-us/azure/templates/microsoft.compute/2019-03-01/virtualmachines
# Changes to the local file will be used during deployment of the head node, however worker nodes deployment occurs
# on the head node, so changes to the template must be included in the wheel file used in setup_commands section below

# Tell the autoscaler the allowed node types and the resources they provide.
# The key is the name of the node type, which is just for debugging purposes.
# The node config specifies the launch config and physical instance type.
available_node_types:
    ray.head.default:
        # The resources provided by this node type.
        resources: {"CPU": 0}
        # Provider-specific config, e.g. instance type.
        node_config:
            azure_arm_parameters:
                vmSize: Standard_D2s_v3
                # List images https://docs.microsoft.com/en-us/azure/virtual-machines/linux/cli-ps-findimage
                imagePublisher: microsoft-dsvm
                imageOffer: ubuntu-1804
                imageSku: 1804-gen2
                imageVersion: latest

    ray.worker.default:
        # The minimum number of worker nodes of this type to launch.
        # This number should be >= 0.
        min_workers: 0
        # The maximum number of worker nodes of this type to launch.
        # This takes precedence over min_workers.
        max_workers: 25
        # The resources provided by this node type.
        resources: {"CPU": 2}
        # Provider-specific config, e.g. instance type.
        node_config:
            azure_arm_parameters:
                vmSize: Standard_D2s_v3
                # List images https://docs.microsoft.com/en-us/azure/virtual-machines/linux/cli-ps-findimage
                imagePublisher: microsoft-dsvm
                imageOffer: ubuntu-1804
                imageSku: 1804-gen2
                imageVersion: latest
                # optionally set priority to use Spot instances
                # priority: Spot
                # set a maximum price for spot instances if desired
                # billingProfile:
                #     maxPrice: -1

# Specify the node type of the head node (as configured above).
head_node_type: ray.head.default

# Files or directories to copy to the head and worker nodes. The format is a
# dictionary from REMOTE_PATH: LOCAL_PATH, e.g.
file_mounts: {
#    "/path1/on/remote/machine": "/path1/on/local/machine",
#    "/path2/on/remote/machine": "/path2/on/local/machine",
     "~/.ssh/ray-test.pub": "~/.ssh/ray-test.pub"
}

# Files or directories to copy from the head node to the worker nodes. The format is a
# list of paths. The same path on the head node will be copied to the worker node.
# This behavior is a subset of the file_mounts behavior. In the vast majority of cases
# you should just use file_mounts. Only use this if you know what you're doing!
cluster_synced_files: []

# Whether changes to directories in file_mounts or cluster_synced_files in the head node
# should sync to the worker node continuously
file_mounts_sync_continuously: False

# Patterns for files to exclude when running rsync up or rsync down
rsync_exclude:
    - "**/.git"
    - "**/.git/**"

# Pattern files to use for filtering out files when running rsync up or rsync down. The file is searched for
# in the source directory and recursively through all subdirectories. For example, if .gitignore is provided
# as a value, the behavior will match git's behavior for finding and using .gitignore files.
rsync_filter:
    - ".gitignore"

# List of commands that will be run before `setup_commands`. If docker is
# enabled, these commands will run outside the container and before docker
# is setup.
initialization_commands:
    # add /home/ray to path
    - export PATH=$PATH:/home/ray
    # enable docker setup
    - sudo usermod -aG docker $USER || true
    - sleep 10  # delay to avoid docker permission denied errors
    # get rid of annoying Ubuntu message
    - touch ~/.sudo_as_admin_successful
    # Wait for auto upgrade that might run in the background.
    - bash -c $'ps -e | grep apt | awk \'{print $1}\' | xargs tail -f --pid || true'
    # Log in to azure container registry (by ray-msi-test)
    - az login --identity
    - docker login censoredregistry.azurecr.io -u censoreduser -p censoredpassword # Exactly how the password is retrieved is censored.

# List of shell commands to run to set up nodes.
# NOTE: rayproject/ray-ml:latest has ray latest bundled
setup_commands: []
    # Note: if you're developing Ray, you probably want to create a Docker image that
    # has your Ray repo pre-cloned. Then, you can replace the pip installs
    # below with a git checkout <your_sha> (and possibly a recompile).
    # To run the nightly version of ray (as opposed to the latest), either use a rayproject docker image
    # that has the "nightly" (e.g. "rayproject/ray-ml:nightly-gpu") or uncomment the following line:
    # - pip install -U "ray[default] @ https://s3-us-west-2.amazonaws.com/ray-wheels/latest/ray-3.0.0.dev0-cp38-cp38-manylinux2014_x86_64.whl"

# Custom commands that will be run on the head node after common setup.
# NOTE: rayproject/ray-ml:latest has azure packages bundled
head_setup_commands: []
    # - pip install -U azure-cli-core==2.22.0 azure-mgmt-compute==14.0.0 azure-mgmt-msi==1.0.0 azure-mgmt-network==10.2.0 azure-mgmt-resource==13.0.0

# Custom commands that will be run on worker nodes after common setup.
worker_setup_commands: []

# Command to start ray on the head node. You don't need to change this.
head_start_ray_commands:
    - export AUTOSCALER_MAX_NUM_FAILURES=inf;
    - ray stop
    - ray start --head --port=6379 --object-manager-port=8076 --autoscaling-config=~/ray_bootstrap_config.yaml

# Command to start ray on worker nodes. You don't need to change this.
worker_start_ray_commands:
    - ray stop
    - ray start --address=$RAY_HEAD_IP:6379 --object-manager-port=8076

Flow code executes before dependencies present on Ray cluster

First check

  • I added a descriptive title to this issue.
  • I used the GitHub search to find a similar issue and didn't find it.
  • I searched the Prefect documentation for this issue.
  • I checked that this issue is related to Prefect and not one of its dependencies.

Bug summary

In a dependent deployment pattern using a multi-node Ray cluster, the code in some flows seems to execute before all dependencies are present. The parent flow has about 20 subflows, all using the same code; about half fail and the other half work. This feels like a race condition. If the failed extracts are rerun, they usually pass.

Adding a sleep before executing the code almost entirely eliminates the error.
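
A fixed sleep can be replaced by polling until the import actually succeeds. The sketch below is only a workaround idea and rests on an assumption the report does not confirm: that the missing package becomes importable shortly after the task starts. The helper wait_for_module is hypothetical and is not part of Prefect, prefect-ray, or Ray.

```python
import importlib
import time


def wait_for_module(name, timeout=60.0, interval=1.0):
    """Retry importing a module until it succeeds or the timeout expires."""
    deadline = time.monotonic() + timeout
    while True:
        try:
            return importlib.import_module(name)
        except ModuleNotFoundError:
            if time.monotonic() >= deadline:
                # Give up and surface the original error to the caller.
                raise
            # Let the import system notice files installed since the last try.
            importlib.invalidate_caches()
            time.sleep(interval)


if __name__ == "__main__":
    # "json" is used here only because it is always available.
    print(wait_for_module("json", timeout=5.0).__name__)
```

Unlike a fixed sleep, this returns as soon as the dependency is present and still fails loudly if it never arrives.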

Storage: GCS Block
Infrastructure: Process
Stacktrace of ModuleNotFoundError

Reproduction

futures = []
for table in tables:
    parameters = {
        'table': table,
        'extract_date': extract_date.strftime("%Y-%m-%d"),
        'dest_bucket': dest_bucket,
        'config_path': config_path,
        'path': path
    }

    futures.append(run_deployment(
        name=billing_extract_deployment,
        flow_run_name='{}_{}'.format(table,extract_date.strftime("%d%m%Y")),
        parameters=parameters))

# Wait for all sub flows to complete.
summaries = []
for index, future in enumerate(await asyncio.gather(*futures)):

Error

Encountered exception during execution:

Traceback (most recent call last):
  File "/home/ssm-user/.local/lib/python3.9/site-packages/prefect/engine.py", line 673, in orchestrate_flow_run
    result = await flow_call.aresult()
  File "/home/ssm-user/.local/lib/python3.9/site-packages/prefect/_internal/concurrency/calls.py", line 181, in aresult
    return await asyncio.wrap_future(self.future)
  File "/home/ssm-user/.local/lib/python3.9/site-packages/prefect/_internal/concurrency/calls.py", line 194, in _run_sync
    result = self.fn(*self.args, **self.kwargs)
  File "flows/delta_extract.py", line 112, in flow
    current_source_table = load_current_source_table.submit(
  File "/home/ssm-user/.local/lib/python3.9/site-packages/prefect/client/schemas.py", line 107, in result
    return get_state_result(self, raise_on_failure=raise_on_failure, fetch=fetch)
  File "/home/ssm-user/.local/lib/python3.9/site-packages/prefect/states.py", line 76, in get_state_result
    return _get_state_result(state, raise_on_failure=raise_on_failure)
  File "/home/ssm-user/.local/lib/python3.9/site-packages/prefect/utilities/asyncutils.py", line 260, in coroutine_wrapper
    return call()
  File "/home/ssm-user/.local/lib/python3.9/site-packages/prefect/_internal/concurrency/calls.py", line 245, in __call__
    return self.result()
  File "/home/ssm-user/.local/lib/python3.9/site-packages/prefect/_internal/concurrency/calls.py", line 173, in result
    return self.future.result(timeout=timeout)
  File "/usr/lib64/python3.9/concurrent/futures/_base.py", line 433, in result
    return self.__get_result()
  File "/usr/lib64/python3.9/concurrent/futures/_base.py", line 389, in __get_result
    raise self._exception
  File "/home/ssm-user/.local/lib/python3.9/site-packages/prefect/_internal/concurrency/calls.py", line 218, in _run_async
    result = await coro
  File "/home/ssm-user/.local/lib/python3.9/site-packages/prefect/states.py", line 91, in _get_state_result
    raise await get_state_exception(state)
ray.exceptions.RaySystemError: System error: No module named 'oci'
traceback: Traceback (most recent call last):
  File "/home/ssm-user/.local/lib/python3.9/site-packages/ray/_private/serialization.py", line 369, in deserialize_objects
    obj = self._deserialize_object(data, metadata, object_ref)
  File "/home/ssm-user/.local/lib/python3.9/site-packages/ray/_private/serialization.py", line 252, in _deserialize_object
    return self._deserialize_msgpack_data(data, metadata_fields)
  File "/home/ssm-user/.local/lib/python3.9/site-packages/ray/_private/serialization.py", line 207, in _deserialize_msgpack_data
    python_objects = self._deserialize_pickle5_data(pickle5_data)
  File "/home/ssm-user/.local/lib/python3.9/site-packages/ray/_private/serialization.py", line 197, in _deserialize_pickle5_data
    obj = pickle.loads(in_band)
ModuleNotFoundError: No module named 'oci'

03:30:19 AM    prefect.flow_runs    INFO

Crash detected! Execution was interrupted by an unexpected exception: ray.exceptions.RaySystemError: System error: No module named 'oci'
traceback: Traceback (most recent call last):
  File "/home/ssm-user/.local/lib/python3.9/site-packages/ray/_private/serialization.py", line 369, in deserialize_objects
    obj = self._deserialize_object(data, metadata, object_ref)
  File "/home/ssm-user/.local/lib/python3.9/site-packages/ray/_private/serialization.py", line 252, in _deserialize_object
    return self._deserialize_msgpack_data(data, metadata_fields)
  File "/home/ssm-user/.local/lib/python3.9/site-packages/ray/_private/serialization.py", line 207, in _deserialize_msgpack_data
    python_objects = self._deserialize_pickle5_data(pickle5_data)
  File "/home/ssm-user/.local/lib/python3.9/site-packages/ray/_private/serialization.py", line 197, in _deserialize_pickle5_data
    obj = pickle.loads(in_band)
ModuleNotFoundError: No module named 'oci'

Versions

2.9.0

Additional context

No response

`timeout_seconds` not enforced with `RayTaskRunner`

Expectation / Proposal

timeout_seconds should be enforced just like in normal tasks / flows

Traceback / Example

With the following example, we'd expect the task to fail after 3 seconds, but it sleeps for the entire 1e3 seconds.

from time import sleep

from prefect_ray import RayTaskRunner

from prefect import flow, task

count = 0


@task(retries=1, retry_delay_seconds=10, timeout_seconds=3)
def sleep_task():
    print("Sleeping")
    global count
    count += 1
    print(f"incremented count to {count}")
    sleep(1e3)


@flow(task_runner=RayTaskRunner(), log_prints=True)
def sleepy():
    sleep_task.submit()


if __name__ == "__main__":
    try:
        sleepy()
    except Exception as exc:
        print(exc)
        print(count)
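
For comparison, the behavior the report expects (a blocking call abandoned once its time budget runs out) can be sketched with the standard library alone. This is only an illustration: call_with_timeout is a hypothetical helper, and it is not how Prefect or prefect-ray implement timeout_seconds.

```python
import time
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import TimeoutError as FutureTimeoutError


def call_with_timeout(fn, timeout_seconds, *args, **kwargs):
    """Run fn in a worker thread and stop waiting after timeout_seconds."""
    pool = ThreadPoolExecutor(max_workers=1)
    future = pool.submit(fn, *args, **kwargs)
    try:
        # Raises FutureTimeoutError if fn has not returned in time.
        return future.result(timeout=timeout_seconds)
    finally:
        # Do not block on the worker. A running thread cannot be force-killed,
        # so an abandoned call keeps running in the background until it returns.
        pool.shutdown(wait=False)


if __name__ == "__main__":
    try:
        call_with_timeout(time.sleep, 0.1, 1.0)
    except FutureTimeoutError:
        print("timed out")
```

The caller regains control at the deadline, but the underlying work is not interrupted, which is one reason enforcing timeouts on remote workers is harder than it looks.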

How to properly execute flows with cpu/gpu requirements

We were wondering how to specify that a Prefect flow (or even individual tasks) should only run on instances that meet its resource requirements. What is the annotation to manage this at the flow/task level?
Basically, we would like to use Ray's @ray.remote(num_gpus=1) annotation.

Scenario: We have a Ray cluster up and running with CPU and GPU worker nodes. We would now like, for example, to execute a flow only on the GPU nodes and have them scale out accordingly.

We experimented with num_gpus, request_cpus, etc., but have not had success yet.

Here is some example code:

from prefect import task, flow
from prefect_ray.task_runners import RayTaskRunner
import time


RayRunner = RayTaskRunner(
    address="auto",
    init_kwargs={
        "num_gpus": 2,
    }
)


@flow(name="time-wait-flow", task_runner=RayTaskRunner)
def time_wait_flow_gpu():
    factor = 50
    times = [5.0, 16.3, 25.5]
    for i in range(0, factor):
        for time_in_secs in times:
            wait_time.submit(time_in_secs)


@task
def wait_time(time_in_secs):
    time.sleep(time_in_secs)


if __name__ == "__main__":
    time_wait_flow_gpu()

prefect worker start with resources?

Usually I would start a Ray worker node with this bash command:

ray start --address=ray_head:${REDISPORT} --resources='{"my_custom_resource": 10}' --block

Now I want to use prefect-ray, and I understand that I need to start workers like this:

prefect worker start --pool 'my-own-pool' --work-queue 'default'

How should I let this worker know that it has "my_custom_resource"=10, so that a Prefect flow executes on this worker when my_custom_resource is needed?
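
The --resources value in the ray start command is a JSON object, so shell quoting is easy to get wrong. A small sketch of building that command line from Python follows. The helper ray_start_command and the address ray_head:6379 are hypothetical, and this covers only the quoting; it does not answer how a Prefect worker would advertise custom resources.

```python
import json
import shlex


def ray_start_command(head_address, resources):
    """Build a `ray start` command line with a safely quoted --resources value."""
    resources_json = json.dumps(resources)
    return " ".join(
        [
            "ray",
            "start",
            f"--address={head_address}",
            # shlex.quote wraps the JSON in single quotes for the shell.
            f"--resources={shlex.quote(resources_json)}",
            "--block",
        ]
    )


if __name__ == "__main__":
    print(ray_start_command("ray_head:6379", {"my_custom_resource": 10}))
```

Generating the string this way avoids hand-escaping the inner double quotes.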

Expectation / Proposal

Traceback / Example

TypeError upon completion

Unsure why this is crashing at the end. @desertaxle @madkinsz thoughts?

import time

from prefect import flow, task
from prefect_ray import RayTaskRunner

@task
def shout(number):
    time.sleep(0.5)
    print(f"#{number}")

@flow(task_runner=RayTaskRunner)
def count_to(highest_number):
    for number in range(highest_number):
        shout.submit(number)

if __name__ == "__main__":
    count_to(3)

# outputs
#0
#2
#1
(begin_task_run pid=67366) #1
(begin_task_run pid=67368) #0
(begin_task_run pid=67371) #2
---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
/Users/andrew/Applications/python/prefect-dask/prefect_dask/test.ipynb Cell 2 in <cell line: 16>()
     14         shout.submit(number)
     16 if __name__ == "__main__":
---> 17     count_to(3)

File ~/mambaforge/envs/ray/lib/python3.9/site-packages/prefect/flows.py:367, in Flow.__call__(self, *args, **kwargs)
    364 # Convert the call args/kwargs to a parameter dict
    365 parameters = get_call_parameters(self.fn, args, kwargs)
--> 367 return enter_flow_run_engine_from_flow_call(
    368     self, parameters, return_type="result"
    369 )

File ~/mambaforge/envs/ray/lib/python3.9/site-packages/prefect/engine.py:131, in enter_flow_run_engine_from_flow_call(flow, parameters, return_type)
    127 if in_async_main_thread():
    128     # An event loop is already running and we must create a blocking portal to
    129     # run async code from this synchronous context
    130     with start_blocking_portal() as portal:
--> 131         return portal.call(begin_run)
    132 else:
    133     # An event loop is not running so we will create one
    134     return anyio.run(begin_run)

File ~/mambaforge/envs/ray/lib/python3.9/site-packages/anyio/from_thread.py:283, in BlockingPortal.call(self, func, *args)
    268 def call(
    269     self,
    270     func: Callable[..., Union[Coroutine[Any, Any, T_Retval], T_Retval]],
    271     *args: object
    272 ) -> T_Retval:
    273     """
    274     Call the given function in the event loop thread.
    275 
   (...)
    281 
    282     """
--> 283     return cast(T_Retval, self.start_task_soon(func, *args).result())

File ~/mambaforge/envs/ray/lib/python3.9/concurrent/futures/_base.py:446, in Future.result(self, timeout)
    444     raise CancelledError()
    445 elif self._state == FINISHED:
--> 446     return self.__get_result()
    447 else:
    448     raise TimeoutError()

File ~/mambaforge/envs/ray/lib/python3.9/concurrent/futures/_base.py:391, in Future.__get_result(self)
    389 if self._exception:
    390     try:
--> 391         raise self._exception
    392     finally:
    393         # Break a reference cycle with the exception in self._exception
    394         self = None

File ~/mambaforge/envs/ray/lib/python3.9/site-packages/anyio/from_thread.py:219, in BlockingPortal._call_func(self, func, args, kwargs, future)
    216             else:
    217                 future.add_done_callback(callback)
--> 219             retval = await retval
    220 except self._cancelled_exc_class:
    221     future.cancel()

File ~/mambaforge/envs/ray/lib/python3.9/site-packages/prefect/client.py:107, in inject_client.<locals>.with_injected_client(*args, **kwargs)
    105 async with client_context as client:
    106     kwargs.setdefault("client", client)
--> 107     return await fn(*args, **kwargs)

File ~/mambaforge/envs/ray/lib/python3.9/site-packages/prefect/engine.py:203, in create_then_begin_flow_run(flow, parameters, return_type, client)
    199     engine_logger.info(
    200         f"Flow run {flow_run.name!r} received invalid parameters and is marked as failed."
    201     )
    202 else:
--> 203     state = await begin_flow_run(
    204         flow=flow, flow_run=flow_run, parameters=parameters, client=client
    205     )
    207 if return_type == "state":
    208     return state

File ~/mambaforge/envs/ray/lib/python3.9/site-packages/prefect/engine.py:332, in begin_flow_run(flow, flow_run, parameters, client)
    329         result_storage = TempStorageBlock()
    330     flow_run_context.result_storage = result_storage
--> 332     terminal_state = await orchestrate_flow_run(
    333         flow,
    334         flow_run=flow_run,
    335         parameters=parameters,
    336         client=client,
    337         partial_flow_run_context=flow_run_context,
    338     )
    340 # If debugging, use the more complete `repr` than the usual `str` description
    341 display_state = repr(terminal_state) if PREFECT_DEBUG_MODE else str(terminal_state)

File ~/mambaforge/envs/ray/lib/python3.9/contextlib.py:670, in AsyncExitStack.__aexit__(self, *exc_details)
    666 try:
    667     # bare "raise exc_details[1]" replaces our carefully
    668     # set-up context
    669     fixed_ctx = exc_details[1].__context__
--> 670     raise exc_details[1]
...
--> 651         cb_suppress = cb(*exc_details)
    652     else:
    653         cb_suppress = await cb(*exc_details)

TypeError: 'dict' object is not callable

Failed to unpickle serialized exception when running on remote cluster

09:59:53.784 | INFO    | prefect.engine - Created flow run 'glittering-bull' for flow 'greetings'
09:59:53.784 | INFO    | prefect.task_runner.ray - Connecting to an existing Ray instance at ray://127.0.0.1:10001
09:59:57.618 | INFO    | prefect.task_runner.ray - Using Ray cluster with 2 nodes.
09:59:57.618 | INFO    | prefect.task_runner.ray - The Ray UI is available at 10.42.1.9:8265
09:59:57.890 | INFO    | Flow run 'glittering-bull' - Created task run 'say_hello-811087cd-0' for task 'say_hello'
09:59:59.292 | INFO    | Flow run 'glittering-bull' - Submitted task run 'say_hello-811087cd-0' for execution.
09:59:59.318 | INFO    | Flow run 'glittering-bull' - Created task run 'say_hello-811087cd-2' for task 'say_hello'
09:59:59.326 | INFO    | Flow run 'glittering-bull' - Submitted task run 'say_hello-811087cd-2' for execution.
09:59:59.409 | INFO    | Flow run 'glittering-bull' - Created task run 'say_hello-811087cd-1' for task 'say_hello'
09:59:59.421 | INFO    | Flow run 'glittering-bull' - Submitted task run 'say_hello-811087cd-1' for execution.
09:59:59.462 | INFO    | Flow run 'glittering-bull' - Created task run 'say_goodbye-261e56a8-0' for task 'say_goodbye'
09:59:59.476 | INFO    | Flow run 'glittering-bull' - Submitted task run 'say_goodbye-261e56a8-0' for execution.
09:59:59.527 | INFO    | Flow run 'glittering-bull' - Created task run 'say_goodbye-261e56a8-2' for task 'say_goodbye'
09:59:59.539 | INFO    | Flow run 'glittering-bull' - Submitted task run 'say_goodbye-261e56a8-2' for execution.
09:59:59.623 | INFO    | Flow run 'glittering-bull' - Created task run 'say_goodbye-261e56a8-1' for task 'say_goodbye'
09:59:59.636 | INFO    | Flow run 'glittering-bull' - Submitted task run 'say_goodbye-261e56a8-1' for execution.
09:59:59.733 | INFO    | Flow run 'glittering-bull' - Created task run 'say_goodbye-261e56a8-3' for task 'say_goodbye'
09:59:59.745 | INFO    | Flow run 'glittering-bull' - Submitted task run 'say_goodbye-261e56a8-3' for execution.
09:59:59.839 | INFO    | Flow run 'glittering-bull' - Created task run 'say_hello-811087cd-3' for task 'say_hello'
09:59:59.850 | INFO    | Flow run 'glittering-bull' - Submitted task run 'say_hello-811087cd-3' for execution.
Failed to unpickle serialized exception
Failed to unpickle serialized exception
Failed to unpickle serialized exception
Failed to unpickle serialized exception
Failed to unpickle serialized exception
Failed to unpickle serialized exception
Failed to unpickle serialized exception
Failed to unpickle serialized exception
10:00:05.415 | INFO    | Task run 'say_hello-811087cd-0' - Crash detected! Execution was interrupted by an unexpected exception.
10:00:05.443 | INFO    | Task run 'say_goodbye-261e56a8-0' - Crash detected! Execution was interrupted by an unexpected exception.
10:00:05.469 | INFO    | Task run 'say_hello-811087cd-1' - Crash detected! Execution was interrupted by an unexpected exception.
10:00:05.500 | INFO    | Task run 'say_goodbye-261e56a8-1' - Crash detected! Execution was interrupted by an unexpected exception.
10:00:05.528 | INFO    | Task run 'say_hello-811087cd-2' - Crash detected! Execution was interrupted by an unexpected exception.
10:00:05.554 | INFO    | Task run 'say_goodbye-261e56a8-2' - Crash detected! Execution was interrupted by an unexpected exception.
10:00:05.579 | INFO    | Task run 'say_hello-811087cd-3' - Crash detected! Execution was interrupted by an unexpected exception.
10:00:05.607 | INFO    | Task run 'say_goodbye-261e56a8-3' - Crash detected! Execution was interrupted by an unexpected exception.
10:00:06.767 | ERROR   | Flow run 'glittering-bull' - Finished in state Failed('8/8 states failed.')
Traceback (most recent call last):
  File "/usr/lib/python3.9/runpy.py", line 197, in _run_module_as_main
    return _run_code(code, main_globals, None,
  File "/usr/lib/python3.9/runpy.py", line 87, in _run_code
    exec(code, run_globals)
  File "/home/compute/code/orion-demo/flows/ray_flow.py", line 49, in <module>
    greetings(["arthur", "trillian", "ford", "marvin"])
  File "/home/compute/code/orion-demo/.venv/lib/python3.9/site-packages/prefect/flows.py", line 384, in __call__
    return enter_flow_run_engine_from_flow_call(
  File "/home/compute/code/orion-demo/.venv/lib/python3.9/site-packages/prefect/engine.py", line 158, in enter_flow_run_engine_from_flow_call
    return anyio.run(begin_run)
  File "/home/compute/code/orion-demo/.venv/lib/python3.9/site-packages/anyio/_core/_eventloop.py", line 70, in run
    return asynclib.run(func, *args, **backend_options)
  File "/home/compute/code/orion-demo/.venv/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 292, in run
    return native_run(wrapper(), debug=debug)
  File "/usr/lib/python3.9/asyncio/runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "/usr/lib/python3.9/asyncio/base_events.py", line 647, in run_until_complete
    return future.result()
  File "/home/compute/code/orion-demo/.venv/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 287, in wrapper
    return await func(*args)
  File "/home/compute/code/orion-demo/.venv/lib/python3.9/site-packages/prefect/client.py", line 103, in with_injected_client
    return await fn(*args, **kwargs)
  File "/home/compute/code/orion-demo/.venv/lib/python3.9/site-packages/prefect/engine.py", line 238, in create_then_begin_flow_run
    return state.result()
  File "/home/compute/code/orion-demo/.venv/lib/python3.9/site-packages/prefect/orion/schemas/states.py", line 157, in result
    state.result()
  File "/home/compute/code/orion-demo/.venv/lib/python3.9/site-packages/prefect/orion/schemas/states.py", line 143, in result
    raise data
ray.exceptions.RaySystemError: System error: Failed to unpickle serialized exception
traceback: Traceback (most recent call last):
  File "/home/ray/anaconda3/lib/python3.9/site-packages/ray/exceptions.py", line 46, in from_ray_exception
    return pickle.loads(ray_exception.serialized_exception)
TypeError: __init__() missing 2 required keyword-only arguments: 'request' and 'response'

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/ray/anaconda3/lib/python3.9/site-packages/ray/_private/serialization.py", line 352, in deserialize_objects
    obj = self._deserialize_object(data, metadata, object_ref)
  File "/home/ray/anaconda3/lib/python3.9/site-packages/ray/_private/serialization.py", line 264, in _deserialize_object
    return RayError.from_bytes(obj)
  File "/home/ray/anaconda3/lib/python3.9/site-packages/ray/exceptions.py", line 40, in from_bytes
    return RayError.from_ray_exception(ray_exception)
  File "/home/ray/anaconda3/lib/python3.9/site-packages/ray/exceptions.py", line 49, in from_ray_exception
    raise RuntimeError(msg) from e
RuntimeError: Failed to unpickle serialized exception

from typing import List

from prefect import flow, get_run_logger, task
from prefect_ray.task_runners import RayTaskRunner
import sys

@task
def say_hello(name: str) -> None:
    print(f"say_hello {name}", file=sys.stderr)
    logger = get_run_logger()
    logger.info(f"hello {name}")


@task
def say_goodbye(name: str) -> None:
    # logs not currently working see https://github.com/PrefectHQ/prefect-ray/issues/25
    logger = get_run_logger()
    print("print goodbye")
    logger.info(f"goodbye {name}")


# run on an existing ray cluster
@flow(
    task_runner=RayTaskRunner(
        address="ray://10.97.36.20:10001",
        init_kwargs={
            "runtime_env": {
                "pip": ["prefect==2.3.2"],
                "env_vars": {
                    "PREFECT_API_URL": "xxx",
                    "PREFECT_API_KEY": "yyy"
                }
            }
        },
    )
)
def greetings(names: List[str]) -> None:
    for name in names:
        # tasks must be submitted to run on ray
        # if called without .submit() they are still tracked but
        # run immediately and locally rather than async on ray
        say_hello.submit(name)
        say_goodbye.submit(name)


if __name__ == "__main__":
    greetings(["arthur", "trillian", "ford", "marvin"])

prefect 2.3.2
ray 2.0.0
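
The TypeError at the bottom of the remote traceback is what Python raises when it rebuilds an exception whose __init__ has required keyword-only arguments: by default, a pickled exception is rebuilt by calling its class with only the positional args. Below is a minimal sketch of that failure and one possible way around it. StatusError, PicklableStatusError and _rebuild are hypothetical names for illustration, not classes from Prefect, Ray, or any HTTP library.

```python
import pickle


class StatusError(Exception):
    """Hypothetical exception whose __init__ needs keyword-only arguments."""

    def __init__(self, message, *, request, response):
        super().__init__(message)
        self.request = request
        self.response = response


def _rebuild(cls, message, request, response):
    # Helper used by pickle to call the constructor with keyword arguments.
    return cls(message, request=request, response=response)


class PicklableStatusError(StatusError):
    """Same exception, but it tells pickle how to rebuild itself."""

    def __reduce__(self):
        return (_rebuild, (type(self), self.args[0], self.request, self.response))


def round_trip(exc):
    """Serialize and deserialize an exception, as a remote worker boundary would."""
    return pickle.loads(pickle.dumps(exc))


try:
    round_trip(StatusError("boom", request="GET /", response=500))
    failure = None
except TypeError as err:
    # Unpickling called StatusError("boom") without request/response.
    failure = str(err)

restored = round_trip(PicklableStatusError("boom", request="GET /", response=500))
print(failure)
print(restored.request, restored.response)
```

If the exception class comes from a third-party library and cannot be changed, catching it inside the task and re-raising a plain exception with a string message is another option worth trying.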

Sometimes log output is missing

from prefect import flow, task
from prefect_ray.task_runners import RayTaskRunner


@task
def count(number):
    with open(f"{number}.txt", "w") as f:
        f.write(str(number))
    print(number)

@flow(task_runner=RayTaskRunner())
def count_to(highest_number):
    for number in range(1, highest_number + 1):
        count.with_options(name=f"{number}").submit(number)


if __name__ == "__main__":
    count_to(3)

In the log below, task run '2-464db843-1' is never reported as Completed, even though task 2 printed its output; only runs 1 and 3 are.

13:34:57.620 | INFO    | prefect.engine - Created flow run 'adept-jackrabbit' for flow 'count-to'
13:34:57.620 | INFO    | Flow run 'adept-jackrabbit' - Using task runner 'RayTaskRunner'
13:34:57.620 | INFO    | prefect.task_runner.ray - Creating a local Ray instance
13:35:02.659 | INFO    | prefect.task_runner.ray - Using Ray cluster with 1 nodes.
13:35:02.671 | WARNING | Flow run 'adept-jackrabbit' - No default storage is configured on the server. Results from this flow run will be stored in a temporary directory in its runtime environment.
13:35:02.712 | INFO    | Flow run 'adept-jackrabbit' - Created task run '1-464db843-0' for task '1'
13:35:02.789 | INFO    | Flow run 'adept-jackrabbit' - Created task run '2-464db843-1' for task '2'
13:35:02.903 | INFO    | Flow run 'adept-jackrabbit' - Created task run '3-464db843-2' for task '3'
(begin_task_run pid=97109) 1
(begin_task_run pid=97108) 3
(begin_task_run pid=97109) 13:35:09.111 | INFO    | Task run '1-464db843-0' - Finished in state Completed()
(begin_task_run pid=97108) 13:35:09.097 | INFO    | Task run '3-464db843-2' - Finished in state Completed()
(begin_task_run pid=97107) 2
13:35:12.537 | INFO    | Flow run 'adept-jackrabbit' - Finished in state Completed('All states completed.')

Subflows with RayTaskRunner

Prefect 2.0 supports subflows, which is a great new addition. However, there is a bug when nested flows each use a RayTaskRunner(): the subflow crashes, complaining that ray.init() was called twice.

It would be nice if this use case were supported, since it would allow combining different parallel flows.

To reproduce, use:

from prefect import task, flow
from prefect_ray import RayTaskRunner


@task
def some_task(input):
    return input


@flow(task_runner=RayTaskRunner())
def my_subflow():
    some_task.submit(1)
    some_task.submit(2)
    some_task.submit(3)


@flow(task_runner=RayTaskRunner())
def some_flow():
    my_subflow()
    some_task.submit(4)


some_flow()

The error is:

09:15:38.789 | INFO    | prefect.engine - Created flow run 'terrestrial-bird' for flow 'some-flow'
09:15:38.789 | INFO    | prefect.task_runner.ray - Creating a local Ray instance
2022-08-17 09:15:40,455	INFO services.py:1470 -- View the Ray dashboard at http://127.0.0.1:8265
09:15:42.168 | INFO    | prefect.task_runner.ray - Using Ray cluster with 1 nodes.
09:15:42.168 | INFO    | prefect.task_runner.ray - The Ray UI is available at 127.0.0.1:8265
09:15:42.355 | INFO    | Flow run 'terrestrial-bird' - Created subflow run 'small-marten' for flow 'my-subflow'
09:15:42.356 | INFO    | prefect.task_runner.ray - Creating a local Ray instance
09:15:42.357 | ERROR   | Flow run 'small-marten' - Crash detected! Execution was interrupted by an unexpected exception.
09:15:42.390 | ERROR   | Flow run 'terrestrial-bird' - Encountered exception during execution:
Traceback (most recent call last):
  File "/home/robbert/.virtualenvs/altoida/lib/python3.8/site-packages/prefect/engine.py", line 557, in orchestrate_flow_run
    result = await run_sync(flow_call)
  File "/home/robbert/.virtualenvs/altoida/lib/python3.8/site-packages/prefect/utilities/asyncutils.py", line 56, in run_sync_in_worker_thread
    return await anyio.to_thread.run_sync(call, cancellable=True)
  File "/home/robbert/.virtualenvs/altoida/lib/python3.8/site-packages/anyio/to_thread.py", line 31, in run_sync
    return await get_asynclib().run_sync_in_worker_thread(
  File "/home/robbert/.virtualenvs/altoida/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 937, in run_sync_in_worker_thread
    return await future
  File "/home/robbert/.virtualenvs/altoida/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 867, in run
    result = context.run(func, *args)
  File "/home/robbert/programming/python/altoida/altoida_ml/altoida_ml/robbert/snippits/prefect_bugs/ray_run_manager.py", line 25, in some_flow
    my_subflow()
  File "/home/robbert/.virtualenvs/altoida/lib/python3.8/site-packages/prefect/flows.py", line 390, in __call__
    return enter_flow_run_engine_from_flow_call(
  File "/home/robbert/.virtualenvs/altoida/lib/python3.8/site-packages/prefect/engine.py", line 156, in enter_flow_run_engine_from_flow_call
    return run_async_from_worker_thread(begin_run)
  File "/home/robbert/.virtualenvs/altoida/lib/python3.8/site-packages/prefect/utilities/asyncutils.py", line 136, in run_async_from_worker_thread
    return anyio.from_thread.run(call)
  File "/home/robbert/.virtualenvs/altoida/lib/python3.8/site-packages/anyio/from_thread.py", line 49, in run
    return asynclib.run_async_from_thread(func, *args)
  File "/home/robbert/.virtualenvs/altoida/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 970, in run_async_from_thread
    return f.result()
  File "/usr/lib/python3.8/concurrent/futures/_base.py", line 444, in result
    return self.__get_result()
  File "/usr/lib/python3.8/concurrent/futures/_base.py", line 389, in __get_result
    raise self._exception
  File "/home/robbert/.virtualenvs/altoida/lib/python3.8/site-packages/prefect/client.py", line 104, in with_injected_client
    return await fn(*args, **kwargs)
  File "/home/robbert/.virtualenvs/altoida/lib/python3.8/site-packages/prefect/engine.py", line 450, in create_and_begin_subflow_run
    task_runner = await stack.enter_async_context(flow.task_runner.start())
  File "/usr/lib/python3.8/contextlib.py", line 568, in enter_async_context
    result = await _cm_type.__aenter__(cm)
  File "/usr/lib/python3.8/contextlib.py", line 171, in __aenter__
    return await self.gen.__anext__()
  File "/home/robbert/.virtualenvs/altoida/lib/python3.8/site-packages/prefect/task_runners.py", line 168, in start
    await self._start(exit_stack)
  File "/home/robbert/.virtualenvs/altoida/lib/python3.8/site-packages/prefect_ray/task_runners.py", line 195, in _start
    context = ray.init(*init_args, **self.init_kwargs)
  File "/home/robbert/.virtualenvs/altoida/lib/python3.8/site-packages/ray/_private/client_mode_hook.py", line 105, in wrapper
    return func(*args, **kwargs)
  File "/home/robbert/.virtualenvs/altoida/lib/python3.8/site-packages/ray/worker.py", line 977, in init
    raise RuntimeError(
RuntimeError: Maybe you called ray.init twice by accident? This error can be suppressed by passing in 'ignore_reinit_error=True' or by calling 'ray.shutdown()' prior to 'ray.init()'.
09:15:45.181 | ERROR   | Flow run 'terrestrial-bird' - Finished in state Failed('Flow run encountered an exception.')
Traceback (most recent call last):
  File "/home/robbert/programming/python/altoida/altoida_ml/altoida_ml/robbert/snippits/prefect_bugs/ray_run_manager.py", line 29, in <module>
    some_flow()
  File "/home/robbert/.virtualenvs/altoida/lib/python3.8/site-packages/prefect/flows.py", line 390, in __call__
    return enter_flow_run_engine_from_flow_call(
  File "/home/robbert/.virtualenvs/altoida/lib/python3.8/site-packages/prefect/engine.py", line 152, in enter_flow_run_engine_from_flow_call
    return anyio.run(begin_run)
  File "/home/robbert/.virtualenvs/altoida/lib/python3.8/site-packages/anyio/_core/_eventloop.py", line 70, in run
    return asynclib.run(func, *args, **backend_options)
  File "/home/robbert/.virtualenvs/altoida/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 292, in run
    return native_run(wrapper(), debug=debug)
  File "/usr/lib/python3.8/asyncio/runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "/usr/lib/python3.8/asyncio/base_events.py", line 616, in run_until_complete
    return future.result()
  File "/home/robbert/.virtualenvs/altoida/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 287, in wrapper
    return await func(*args)
  File "/home/robbert/.virtualenvs/altoida/lib/python3.8/site-packages/prefect/client.py", line 104, in with_injected_client
    return await fn(*args, **kwargs)
  File "/home/robbert/.virtualenvs/altoida/lib/python3.8/site-packages/prefect/engine.py", line 228, in create_then_begin_flow_run
    return state.result()
  File "/home/robbert/.virtualenvs/altoida/lib/python3.8/site-packages/prefect/orion/schemas/states.py", line 145, in result
    raise data
  File "/home/robbert/.virtualenvs/altoida/lib/python3.8/site-packages/prefect/engine.py", line 557, in orchestrate_flow_run
    result = await run_sync(flow_call)
  File "/home/robbert/.virtualenvs/altoida/lib/python3.8/site-packages/prefect/utilities/asyncutils.py", line 56, in run_sync_in_worker_thread
    return await anyio.to_thread.run_sync(call, cancellable=True)
  File "/home/robbert/.virtualenvs/altoida/lib/python3.8/site-packages/anyio/to_thread.py", line 31, in run_sync
    return await get_asynclib().run_sync_in_worker_thread(
  File "/home/robbert/.virtualenvs/altoida/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 937, in run_sync_in_worker_thread
    return await future
  File "/home/robbert/.virtualenvs/altoida/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 867, in run
    result = context.run(func, *args)
  File "/home/robbert/programming/python/altoida/altoida_ml/altoida_ml/robbert/snippits/prefect_bugs/ray_run_manager.py", line 25, in some_flow
    my_subflow()
  File "/home/robbert/.virtualenvs/altoida/lib/python3.8/site-packages/prefect/flows.py", line 390, in __call__
    return enter_flow_run_engine_from_flow_call(
  File "/home/robbert/.virtualenvs/altoida/lib/python3.8/site-packages/prefect/engine.py", line 156, in enter_flow_run_engine_from_flow_call
    return run_async_from_worker_thread(begin_run)
  File "/home/robbert/.virtualenvs/altoida/lib/python3.8/site-packages/prefect/utilities/asyncutils.py", line 136, in run_async_from_worker_thread
    return anyio.from_thread.run(call)
  File "/home/robbert/.virtualenvs/altoida/lib/python3.8/site-packages/anyio/from_thread.py", line 49, in run
    return asynclib.run_async_from_thread(func, *args)
  File "/home/robbert/.virtualenvs/altoida/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 970, in run_async_from_thread
    return f.result()
  File "/usr/lib/python3.8/concurrent/futures/_base.py", line 444, in result
    return self.__get_result()
  File "/usr/lib/python3.8/concurrent/futures/_base.py", line 389, in __get_result
    raise self._exception
  File "/home/robbert/.virtualenvs/altoida/lib/python3.8/site-packages/prefect/client.py", line 104, in with_injected_client
    return await fn(*args, **kwargs)
  File "/home/robbert/.virtualenvs/altoida/lib/python3.8/site-packages/prefect/engine.py", line 450, in create_and_begin_subflow_run
    task_runner = await stack.enter_async_context(flow.task_runner.start())
  File "/usr/lib/python3.8/contextlib.py", line 568, in enter_async_context
    result = await _cm_type.__aenter__(cm)
  File "/usr/lib/python3.8/contextlib.py", line 171, in __aenter__
    return await self.gen.__anext__()
  File "/home/robbert/.virtualenvs/altoida/lib/python3.8/site-packages/prefect/task_runners.py", line 168, in start
    await self._start(exit_stack)
  File "/home/robbert/.virtualenvs/altoida/lib/python3.8/site-packages/prefect_ray/task_runners.py", line 195, in _start
    context = ray.init(*init_args, **self.init_kwargs)
  File "/home/robbert/.virtualenvs/altoida/lib/python3.8/site-packages/ray/_private/client_mode_hook.py", line 105, in wrapper
    return func(*args, **kwargs)
  File "/home/robbert/.virtualenvs/altoida/lib/python3.8/site-packages/ray/worker.py", line 977, in init
    raise RuntimeError(
RuntimeError: Maybe you called ray.init twice by accident? This error can be suppressed by passing in 'ignore_reinit_error=True' or by calling 'ray.shutdown()' prior to 'ray.init()'.

I solved the problem in my private package by using a modified RayTaskRunner. In particular, I modified the _start function so that it begins with:

if self.address:
    self.logger.info(
        f"Connecting to an existing Ray instance at {self.address}"
    )
    init_args = (self.address,)
elif not ray.is_initialized():
    self.logger.info("Creating a local Ray instance")
    init_args = ()

Note the ray.is_initialized call.
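
The guard can be tried without a Ray installation. The sketch below is only an illustration of the "initialise once, reuse afterwards" pattern: FakeRay is a hypothetical stand-in that mimics the init bookkeeping of the ray module, and start_runner is a made-up helper, not the prefect-ray implementation. Unlike the snippet above, it checks is_initialized first for both branches, which is a choice made for this sketch.

```python
class FakeRay:
    """Hypothetical stand-in for the ray module; only tracks initialisation."""

    def __init__(self):
        self._initialized = False
        self.init_calls = 0

    def is_initialized(self):
        return self._initialized

    def init(self, *args, **kwargs):
        # Mirror the behaviour seen in the traceback: a second init fails.
        if self._initialized:
            raise RuntimeError("Maybe you called ray.init twice by accident?")
        self._initialized = True
        self.init_calls += 1


def start_runner(ray_module, address=None):
    """Initialise the backend once; later (nested) runners reuse it."""
    if ray_module.is_initialized():
        return "reused"
    if address:
        ray_module.init(address)
        return "connected"
    ray_module.init()
    return "created"


backend = FakeRay()
parent_state = start_runner(backend)   # outer flow starts the instance
subflow_state = start_runner(backend)  # nested flow must not call init again
print(parent_state, subflow_state, backend.init_calls)
```

A runner that reuses an instance it did not create should probably also skip shutting it down on exit, so the parent flow keeps its cluster.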

Prefect-Ray spins up workers before the inputs for the worker tasks are ready / before the task's dependencies are complete

Hi folks, we're using Prefect and Ray to orchestrate distributed workflows. When we create a future with x = task_1.submit() and pass it as an input to another task, y = task_2.submit(x=x), Ray spins up a worker for task_2 with the requested resources, and that worker then waits on x.result().

This is obviously undesirable! If we had x1 = task1(); x2 = task2(x1=x1); x3=task3(x2=x2); ..., all the later tasks in the pipeline would be wasting worker resources while waiting for the earlier workers to complete. We would like the worker for a task to only launch when the task inputs become ready.

Is this known behavior / does anyone know how to solve this?

Here's a working example, using "prefect==2.9.0", "prefect-ray==0.2.4", "ray[default]==2.2.0"

from prefect import flow, task
from prefect_ray import RayTaskRunner
from prefect_ray.context import remote_options
import time

@task
def task_a():
    time.sleep(100)
    return 'a'

@task
def task_b():
    return 'b'

@flow(
    task_runner=RayTaskRunner(
        address="ray://ray-cluster-kuberay-head-svc:10001"
    )
)
def multi_test():
    with remote_options(num_cpus=1, num_gpus=1):
        x = task_a.submit()
    for i in range(10):
        with remote_options(num_cpus=1, num_gpus=1):
            task_b.submit(wait_for=[x])

if __name__ == '__main__':
    print(multi_test())

When run, we immediately get 11 workers created, per the image.
image
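
To make the desired behaviour concrete, here is a small sketch of "submit the downstream task only once its inputs are ready", written with the standard library's concurrent.futures instead of Ray. submit_when_ready is a hypothetical helper for illustration; it is not part of Prefect, prefect-ray, or Ray, and it does not show how either library schedules work.

```python
import threading
import time
from concurrent.futures import Future, ThreadPoolExecutor


def submit_when_ready(pool, fn, deps):
    """Submit fn(*dep_results) to pool only after every future in deps is done."""
    outcome = Future()
    remaining = len(deps)
    lock = threading.Lock()

    def copy_result(inner):
        # Forward the inner future's result or exception to the caller.
        exc = inner.exception()
        if exc is not None:
            outcome.set_exception(exc)
        else:
            outcome.set_result(inner.result())

    def launch():
        try:
            args = [dep.result() for dep in deps]  # all done, so no blocking
            inner = pool.submit(fn, *args)
        except Exception as exc:  # upstream failure or pool already shut down
            outcome.set_exception(exc)
            return
        inner.add_done_callback(copy_result)

    def on_dep_done(_dep):
        nonlocal remaining
        with lock:
            remaining -= 1
            ready = remaining == 0
        if ready:
            launch()

    if not deps:
        launch()
    for dep in deps:
        dep.add_done_callback(on_dep_done)
    return outcome


def slow_upstream():
    time.sleep(0.2)
    return "a"


def downstream(value):
    return value + "b"


# A single worker is enough: downstream takes a slot only after upstream ends.
with ThreadPoolExecutor(max_workers=1) as pool:
    x = pool.submit(slow_upstream)
    y = submit_when_ready(pool, downstream, [x])
    result = y.result(timeout=5)

print(result)
```

The point of the design is that no worker slot is held while waiting: the wait lives in a callback, and the downstream call enters the pool only when its arguments exist.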

how to set resources on @task

Ray can set resources for each operator. How can I set resources on a @task?
Something like this:

@task(cluster_kwargs={"n_workers": 1, "resources": {"GPU": 1, "process": 1}})
def spliter(file):
    ...  # do the splitting

@task(cluster_kwargs={"n_workers": 5, "resources": {"GPU": 1, "process": 1}})
def process(str):
    ...  # do the processing

Add collection sync workflow using cruft

Add cruft to repo to allow synchronization of this collection with the original template.
Cruft can be added by running cruft link. Note that a starting commit will need to be specified.
Using the commit of the prefect-collection-template closest to the generation date of this repo
is a good default.

Circular import error upon deployment

(begin_task_run pid=8142)   File "/home/ray/anaconda3/lib/python3.8/site-packages/prefect/software/python.py", line 8, in <module>
(begin_task_run pid=8142)     from prefect.software.pip import PipRequirement, current_environment_requirements
(begin_task_run pid=8142)   File "/home/ray/anaconda3/lib/python3.8/site-packages/prefect/software/pip.py", line 4, in <module>
(begin_task_run pid=8142)     import packaging.requirements
(begin_task_run pid=8142)   File "/home/ray/anaconda3/lib/python3.8/site-packages/prefect/packaging/__init__.py", line 1, in <module>
(begin_task_run pid=8142)     from prefect.packaging.docker import DockerPackager
(begin_task_run pid=8142) ImportError: cannot import name 'DockerPackager' from partially initialized module 'prefect.packaging.docker' (most likely due to a circular import) (/home/ray/anaconda3/lib/python3.8/site-packages/prefect/packaging/docker.py)

The issue is that in a deployment, ray includes /home/ray/anaconda3/lib/python3.8/site-packages/prefect/ in its sys.path, and in this directory, there’s a packaging directory which confuses the from packaging import version statement (it tries to pull from prefect.packaging rather than the pypi packaging).

To work around this issue, override sys.argv[0] = str(Path.cwd()) in prefect_ray/task_runners.py
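
To check whether a worker is affected, it can help to list which search-path entries provide something importable as packaging. The helper below is a rough diagnostic sketch: shadowing_entries is a hypothetical name, it only looks for regular packages and single-file modules (namespace packages and custom importers are ignored), and the demo uses temporary directories rather than a real Ray image.

```python
import tempfile
from pathlib import Path


def shadowing_entries(module_name, search_path):
    """Return the entries of search_path that contain module_name, in order."""
    hits = []
    for entry in search_path:
        base = Path(entry)
        is_package = (base / module_name / "__init__.py").is_file()
        is_module = (base / f"{module_name}.py").is_file()
        if is_package or is_module:
            hits.append(str(entry))
    return hits


# Demo layout: a "prefect" directory that has its own packaging subpackage,
# listed on the search path before the directory holding the real one.
with tempfile.TemporaryDirectory() as tmp:
    prefect_dir = Path(tmp) / "prefect"
    site_dir = Path(tmp) / "site-packages"
    for parent in (prefect_dir, site_dir):
        (parent / "packaging").mkdir(parents=True)
        (parent / "packaging" / "__init__.py").write_text("")

    hits = shadowing_entries("packaging", [str(prefect_dir), str(site_dir)])
    hit_count = len(hits)
    # The first hit is the one a plain "import packaging" would pick up.
    first_hit_is_prefect = hits[0] == str(prefect_dir)

print(hit_count, first_hit_is_prefect)
```

Inside a task, calling shadowing_entries("packaging", sys.path) and logging the result should show whether the prefect directory comes first.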

`RayTaskRunner` crashes with `pydantic>=2.0.0`

prefect-ray crashes when using pydantic>=2.0.0.

Expectation / Proposal

RayTaskRunner is expected to work with both pydantic v1 and pydantic v2, especially now that prefect 2.13.4 is out and has itself been updated to handle pydantic v2's newer schema.

Prefect 2.13.4 supports pydantic v2, but anyone who wants to use prefect-ray with it cannot use pydantic v2 and has to fall back to pydantic v1.

I'm not certain if this is to be fixed in prefect-ray or if this is a prefect issue. Guidance appreciated.

Traceback / Example

❯ pip freeze | grep -E 'pydantic|prefect'
prefect==2.14.3
prefect-dask==0.2.4
prefect-ray==0.3.0
prefect-shell==0.2.0
pydantic==2.4.2
pydantic_core==2.10.1

from prefect import flow, task
from prefect_ray import RayTaskRunner

@task
def _():
    pass

@flow(task_runner=RayTaskRunner())
def main():
    _.submit()

if __name__ == "__main__":
    main()

Crash log with pydantic==2.4.2

23:18:38.369 | DEBUG   | prefect.profiles - Using profile 'default'
23:18:39.286 | DEBUG   | prefect.client - Using ephemeral application with database at sqlite+aiosqlite:////home/msq/.prefect/prefect.db
23:18:39.346 | INFO    | prefect.engine - Created flow run 'uber-koala' for flow 'main'
23:18:39.346 | DEBUG   | Flow run 'uber-koala' - Starting 'RayTaskRunner'; submitted tasks will be run in parallel...
23:18:39.347 | DEBUG   | prefect.task_runner.ray - Starting task runner...
23:18:39.347 | INFO    | prefect.task_runner.ray - Creating a local Ray instance
2023-11-03 23:18:40,978 INFO worker.py:1616 -- Started a local Ray instance. View the dashboard at http://127.0.0.1:8265
23:18:41.885 | INFO    | prefect.task_runner.ray - Using Ray cluster with 1 nodes.
23:18:41.886 | INFO    | prefect.task_runner.ray - The Ray UI is available at 127.0.0.1:8265
23:18:41.889 | DEBUG   | prefect.client - Using ephemeral application with database at sqlite+aiosqlite:////home/msq/.prefect/prefect.db
23:18:41.923 | DEBUG   | Flow run 'uber-koala' - Executing flow 'main' for flow run 'uber-koala'...
23:18:41.924 | DEBUG   | Flow run 'uber-koala' - Beginning execution...
23:18:41.945 | INFO    | Flow run 'uber-koala' - Created task run '_-0' for task '_'
23:18:41.953 | DEBUG   | prefect.task_runner.ray - Shutting down task runner...
23:18:44.613 | ERROR   | Flow run 'uber-koala' - Crash detected! Execution was interrupted by an unexpected exception: TypeError: remote() argument after ** must be a mapping, not FieldInfo
23:18:44.615 | DEBUG   | Flow run 'uber-koala' - Crash details:
Traceback (most recent call last):
  File "/usr/lib/python3.8/contextlib.py", line 189, in __aexit__
    await self.gen.athrow(typ, value, traceback)
  File "/home/msq/work/virt-envs/abyss-3.8/lib/python3.8/site-packages/prefect/task_runners.py", line 187, in start
    yield self
  File "/home/msq/work/virt-envs/abyss-3.8/lib/python3.8/site-packages/prefect/engine.py", line 531, in begin_flow_run
    terminal_or_paused_state = await orchestrate_flow_run(
  File "/home/msq/work/virt-envs/abyss-3.8/lib/python3.8/site-packages/prefect/engine.py", line 843, in orchestrate_flow_run
    waited_for_task_runs = await wait_for_task_runs_and_report_crashes(
  File "/home/msq/work/virt-envs/abyss-3.8/lib/python3.8/site-packages/prefect/engine.py", line 1861, in wait_for_task_runs_and_report_crashes
    states = await gather(*(future._wait for future in task_run_futures))
  File "/home/msq/work/virt-envs/abyss-3.8/lib/python3.8/site-packages/prefect/utilities/asyncutils.py", line 417, in gather
    keys.append(tg.start_soon(call))
  File "/home/msq/work/virt-envs/abyss-3.8/lib/python3.8/site-packages/prefect/utilities/asyncutils.py", line 393, in __aexit__
    retval = await self._task_group.__aexit__(*tb)
  File "/home/msq/work/virt-envs/abyss-3.8/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 597, in __aexit__
    raise exceptions[0]
  File "/home/msq/work/virt-envs/abyss-3.8/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 668, in task_done
    exc = _task.exception()
asyncio.exceptions.CancelledError

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/msq/work/virt-envs/abyss-3.8/lib/python3.8/site-packages/prefect/engine.py", line 1944, in report_flow_run_crashes
    yield
  File "/usr/lib/python3.8/contextlib.py", line 662, in __aexit__
    cb_suppress = await cb(*exc_details)
  File "/home/msq/work/virt-envs/abyss-3.8/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 597, in __aexit__
    raise exceptions[0]
  File "/home/msq/work/virt-envs/abyss-3.8/lib/python3.8/site-packages/prefect/engine.py", line 1402, in create_task_run_then_submit
    await submit_task_run(
  File "/home/msq/work/virt-envs/abyss-3.8/lib/python3.8/site-packages/prefect/engine.py", line 1463, in submit_task_run
    future = await task_runner.submit(
  File "/home/msq/work/virt-envs/abyss-3.8/lib/python3.8/site-packages/prefect_ray/task_runners.py", line 172, in submit
    ray_decorator = ray.remote(**remote_options)
TypeError: remote() argument after ** must be a mapping, not FieldInfo
23:18:44.672 | DEBUG   | prefect.engine - Reported crashed flow run 'uber-koala' successfully!
Traceback (most recent call last):
  File "/usr/lib/python3.8/contextlib.py", line 189, in __aexit__
    await self.gen.athrow(typ, value, traceback)
  File "/home/msq/work/virt-envs/abyss-3.8/lib/python3.8/site-packages/prefect/task_runners.py", line 187, in start
    yield self
  File "/home/msq/work/virt-envs/abyss-3.8/lib/python3.8/site-packages/prefect/engine.py", line 531, in begin_flow_run
    terminal_or_paused_state = await orchestrate_flow_run(
  File "/home/msq/work/virt-envs/abyss-3.8/lib/python3.8/site-packages/prefect/engine.py", line 843, in orchestrate_flow_run
    waited_for_task_runs = await wait_for_task_runs_and_report_crashes(
  File "/home/msq/work/virt-envs/abyss-3.8/lib/python3.8/site-packages/prefect/engine.py", line 1861, in wait_for_task_runs_and_report_crashes
    states = await gather(*(future._wait for future in task_run_futures))
  File "/home/msq/work/virt-envs/abyss-3.8/lib/python3.8/site-packages/prefect/utilities/asyncutils.py", line 417, in gather
    keys.append(tg.start_soon(call))
  File "/home/msq/work/virt-envs/abyss-3.8/lib/python3.8/site-packages/prefect/utilities/asyncutils.py", line 393, in __aexit__
    retval = await self._task_group.__aexit__(*tb)
  File "/home/msq/work/virt-envs/abyss-3.8/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 597, in __aexit__
    raise exceptions[0]
  File "/home/msq/work/virt-envs/abyss-3.8/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 668, in task_done
    exc = _task.exception()
asyncio.exceptions.CancelledError

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "prefect_ray_pydantic_v2.py", line 13, in <module>
    main()
  File "/home/msq/work/virt-envs/abyss-3.8/lib/python3.8/site-packages/prefect/flows.py", line 1079, in __call__
    return enter_flow_run_engine_from_flow_call(
  File "/home/msq/work/virt-envs/abyss-3.8/lib/python3.8/site-packages/prefect/engine.py", line 283, in enter_flow_run_engine_from_flow_call
    retval = from_sync.wait_for_call_in_loop_thread(
  File "/home/msq/work/virt-envs/abyss-3.8/lib/python3.8/site-packages/prefect/_internal/concurrency/api.py", line 243, in wait_for_call_in_loop_thread
    return call.result()
  File "/home/msq/work/virt-envs/abyss-3.8/lib/python3.8/site-packages/prefect/_internal/concurrency/calls.py", line 282, in result
    return self.future.result(timeout=timeout)
  File "/home/msq/work/virt-envs/abyss-3.8/lib/python3.8/site-packages/prefect/_internal/concurrency/calls.py", line 168, in result
    return self.__get_result()
  File "/usr/lib/python3.8/concurrent/futures/_base.py", line 389, in __get_result
    raise self._exception
  File "/home/msq/work/virt-envs/abyss-3.8/lib/python3.8/site-packages/prefect/_internal/concurrency/calls.py", line 345, in _run_async
    result = await coro
  File "/home/msq/work/virt-envs/abyss-3.8/lib/python3.8/site-packages/prefect/client/utilities.py", line 51, in with_injected_client
    return await fn(*args, **kwargs)
  File "/home/msq/work/virt-envs/abyss-3.8/lib/python3.8/site-packages/prefect/engine.py", line 375, in create_then_begin_flow_run
    state = await begin_flow_run(
  File "/home/msq/work/virt-envs/abyss-3.8/lib/python3.8/site-packages/prefect/engine.py", line 531, in begin_flow_run
    terminal_or_paused_state = await orchestrate_flow_run(
  File "/usr/lib/python3.8/contextlib.py", line 679, in __aexit__
    raise exc_details[1]
  File "/usr/lib/python3.8/contextlib.py", line 189, in __aexit__
    await self.gen.athrow(typ, value, traceback)
  File "/home/msq/work/virt-envs/abyss-3.8/lib/python3.8/site-packages/prefect/engine.py", line 1944, in report_flow_run_crashes
    yield
  File "/usr/lib/python3.8/contextlib.py", line 662, in __aexit__
    cb_suppress = await cb(*exc_details)
  File "/home/msq/work/virt-envs/abyss-3.8/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 597, in __aexit__
    raise exceptions[0]
  File "/home/msq/work/virt-envs/abyss-3.8/lib/python3.8/site-packages/prefect/engine.py", line 1402, in create_task_run_then_submit
    await submit_task_run(
  File "/home/msq/work/virt-envs/abyss-3.8/lib/python3.8/site-packages/prefect/engine.py", line 1463, in submit_task_run
    future = await task_runner.submit(
  File "/home/msq/work/virt-envs/abyss-3.8/lib/python3.8/site-packages/prefect_ray/task_runners.py", line 172, in submit
    ray_decorator = ray.remote(**remote_options)
TypeError: remote() argument after ** must be a mapping, not FieldInfo
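
The final TypeError says that the value unpacked with ** in ray.remote(**remote_options) was a FieldInfo object rather than a dict. The sketch below reproduces that class of error with stand-ins and shows one defensive check. FieldInfo and remote here are hypothetical placeholders, not the pydantic or Ray objects, and as_options is a made-up helper; why the real value is a FieldInfo under pydantic v2 is not established by this issue.

```python
from collections.abc import Mapping


class FieldInfo:
    """Hypothetical stand-in for a field descriptor; not the pydantic class."""


def remote(**options):
    """Hypothetical stand-in for ray.remote; simply echoes its options."""
    return options


def as_options(value):
    """Return value as a dict if it is a mapping, otherwise no options."""
    return dict(value) if isinstance(value, Mapping) else {}


try:
    remote(**FieldInfo())
    error = None
except TypeError as exc:
    # Same failure mode as in the crash log: ** needs a mapping.
    error = str(exc)

safe = remote(**as_options(FieldInfo()))
passed = remote(**as_options({"num_cpus": 1}))
print(error)
print(safe, passed)
```

Falling back to empty options hides the underlying problem, so in real code raising a clearer error may be the better choice.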

Recognize Ray Cluster defined via env var RAY_ADDRESS

Current Behaviour
Our infrastructure setup provides the ray cluster address in the environment variable RAY_ADDRESS. Currently, prefect_ray does not recognize and respect the set env var and starts to create a local ray instance. However, ray.init() does recognize the env var and tries to connect to the existing cluster.
In our setup, this leads to an error in case of a terminated cluster that has to be restarted: prefect_ray does not wait for the restart to be finished before starting the task execution.

[~]$ export RAY_ADDRESS=anyscale://prefect-flow-template_cluster
[~]$ python example.py ray
ray_address='anyscale://prefect-flow-template_cluster'
11:43:53.872 | INFO    | prefect.engine - Created flow run 'nondescript-urchin' for flow 'flow'
11:43:53.874 | INFO    | prefect.task_runner.ray - Creating a local Ray instance
2022-12-12 11:43:53,874	INFO worker.py:1230 -- Using address anyscale://prefect-flow-template_cluster set in the environment variable RAY_ADDRESS
Authenticating
Loaded Anyscale authentication token from ~/.anyscale/credentials.json.
[...]
Waiting for cluster prefect-flow-template_cluster to start. This may take a few minutes
14:24:33.357 | ERROR   | Flow run 'prompt-anaconda' - Crash detected! Execution was interrupted by an unexpected exception: click.exceptions.ClickException: Error while starting cluster prefect-flow-template_cluster: KeyError: 'auth'

14:24:33.624 | ERROR   | prefect.engine - Engine execution of flow run '83baaa1a-643c-4e57-9ed9-e41b02ed4f6e' exited with unexpected exception

Desired Behaviour
Support RAY_ADDRESS as described in the ray documentation.

You can also define an environment variable called RAY_ADDRESS in the same format as the address parameter to connect to an existing cluster with ray.init() or ray.init(address="auto").
https://docs.ray.io/en/latest/ray-core/package-ref.html#ray-init

Proposed Solution
Add a check for whether the RAY_ADDRESS env var is set at https://github.com/PrefectHQ/prefect-ray/blob/main/prefect_ray/task_runners.py#L206.
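
A minimal sketch of such a check, assuming an explicit address should take precedence over the environment. resolve_ray_address is a hypothetical helper name, not something that exists in prefect-ray, and the environ parameter is only there so the function can be exercised without touching the real environment.

```python
import os


def resolve_ray_address(explicit_address=None, environ=None):
    """Prefer an explicit address, then RAY_ADDRESS, else None (local instance)."""
    env = os.environ if environ is None else environ
    return explicit_address or env.get("RAY_ADDRESS") or None


from_env = resolve_ray_address(
    environ={"RAY_ADDRESS": "anyscale://prefect-flow-template_cluster"}
)
explicit = resolve_ray_address(
    "ray://10.97.36.20:10001",
    environ={"RAY_ADDRESS": "anyscale://prefect-flow-template_cluster"},
)
local = resolve_ray_address(environ={})
print(from_env, explicit, local)
```

With a helper like this, the runner could log "Connecting to an existing Ray instance" whenever the result is not None, instead of reporting a local instance while ray.init() connects elsewhere.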

Flow & Task Code example.py

import os
import sys

from prefect import flow, task
from prefect.task_runners import SequentialTaskRunner
from prefect_ray import RayTaskRunner


@task
def task():
    print("task execution")


@flow(log_prints=True)
def flow():
    print("flow execution start")
    task.submit()
    print("flow execution end")

def main(argv):
    if (argv[1] == "sequential"):
        flow.with_options(task_runner=SequentialTaskRunner())()
    elif (str(argv[1]) == "ray"):
        ray_address = str(os.environ.get("RAY_ADDRESS"))
        print(f"{ray_address=}")
        flow.with_options(task_runner=RayTaskRunner())()

if __name__ == "__main__":  # noqa
    main(sys.argv)

Handle dependent tasks concurrently in the `RayTaskRunner`

When the RayTaskRunner was updated in #35, wrapping of Prefect futures with Ray task futures was introduced. For tasks with upstream dependencies, the upstream tasks need to synchronously unwrap the Ray future to provide the output of the upstream task to the downstream task. This may result in degraded performance for flows that use the RayTaskRunner with many dependent tasks. The aim is to find a way to handle Ray futures concurrently, so that flows with dependent tasks can execute concurrently.

@anticorrelator @zangell44 feel free to add any additional details or correct any inaccuracies in my description of the issue!
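
As a rough illustration of the difference between unwrapping upstream results one at a time and waiting on them together, here is a standard-library asyncio sketch. It does not use Ray or Prefect futures; upstream, resolve_sequentially and resolve_concurrently are hypothetical names, and the sleeps only stand in for upstream task run time.

```python
import asyncio
import time


async def upstream(value, delay):
    """Stand-in for an upstream task that takes `delay` seconds."""
    await asyncio.sleep(delay)
    return value


async def resolve_sequentially(awaitables):
    # Each result is awaited before the next one starts being waited on.
    return [await item for item in awaitables]


async def resolve_concurrently(awaitables):
    # All inputs are waited on together; total time is about the slowest one.
    return list(await asyncio.gather(*awaitables))


def make_inputs():
    return [upstream("a", 0.2), upstream("b", 0.2), upstream("c", 0.2)]


def timed(resolver):
    start = time.perf_counter()
    values = asyncio.run(resolver(make_inputs()))
    return values, time.perf_counter() - start


sequential_values, sequential_seconds = timed(resolve_sequentially)
concurrent_values, concurrent_seconds = timed(resolve_concurrently)
print(sequential_values, concurrent_values)
print(concurrent_seconds < sequential_seconds)
```

Both resolvers return the values in input order; only the waiting strategy differs.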

Specifying resources for task causes flow to fail/crash

NOTE: duplicate of existing issue in Prefect repository (PrefectHQ/prefect#10542)

First check

  • I added a descriptive title to this issue.
  • I used the GitHub search to find a similar issue and didn't find it.
  • I searched the Prefect documentation for this issue.
  • I checked that this issue is related to Prefect and not one of its dependencies.

Bug summary

  1. When using the prefect-ray integration, I get an error when I specify the resources that a task needs while calling it from my flow.

  2. The following code reproduces the error:

Example from prefect-ray integration docs:

from prefect import flow, task
from prefect_ray.task_runners import RayTaskRunner
from prefect_ray.context import remote_options

@task
def process(x):
    return x + 1


@flow(task_runner=RayTaskRunner("ray://<my-ray-service>:10001"))
def my_flow():
    # equivalent to setting @ray.remote(num_cpus=1)
    with remote_options(num_cpus=1):
        process.submit(42)

my_flow()

Deployment is submitted using CLI:

prefect deployment build flow.py:my_flow \
    --name k8sjob \
    --pool my-k8s-pool \
    --infra kubernetes-job  \
    --override namespace="prefect" \
    --override image="<my-image>"  \
    --apply
  3. The stack trace from the code above, when looking at the flow run logs:
(screenshot of the flow run logs)

I have given the full stack-trace from the kubernetes job below.

If I simply create my flow without specifying the resources, e.g.

@flow(task_runner=RayTaskRunner("ray://<my-ray-service>:10001"))
def my_flow():
    process(42)

Everything works as expected.

  4. The following infrastructure was used:
  • Prefect agent running inside kubernetes cluster
  • Prefect deployment, that uses the agent work pool. The deployment is a kubernetes-type deployment, which means my tasks get executed as a job inside my kubernetes cluster
  • ray cluster also running inside kubernetes

I have the same version installed for all images involved in this (prefect==2.11.5, ray==2.5.0, prefect-ray==0.2.5)

Reproduction

See code block above

Error

15:01:06.927 | INFO    | Flow run 'dynamic-dingo' - Downloading flow code from storage at ''
15:01:07.919 | WARNING | ray._private.worker - Failed to set SIGTERM handler, processes mightnot be cleaned up properly on exit.
15:01:07.963 | INFO    | prefect.task_runner.ray - Connecting to an existing Ray instance at ray://ray-cluster-kuberay-head-svc.ray.svc.cluster.local:10001
15:01:09.806 | INFO    | prefect.task_runner.ray - Using Ray cluster with 2 nodes.
15:01:09.807 | INFO    | prefect.task_runner.ray - The Ray UI is available at 10.0.7.18:8265
15:01:10.063 | INFO    | Flow run 'dynamic-dingo' - Created task run 'process-0' for task 'process'
15:01:12.005 | ERROR   | Flow run 'dynamic-dingo' - Crash detected! Execution was interrupted by an unexpected exception: RuntimeError: There is no current event loop in thread 'ray_client_server_1'.
15:01:12.063 | ERROR   | prefect.engine - Engine execution of flow run '481bee97-825c-4270-a097-348984ee8b38' exited with unexpected exception
Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/prefect/engine.py", line 2477, in <module>
    enter_flow_run_engine_from_subprocess(flow_run_id)
  File "/usr/local/lib/python3.9/site-packages/prefect/engine.py", line 296, in enter_flow_run_engine_from_subprocess
    state = from_sync.wait_for_call_in_loop_thread(
  File "/usr/local/lib/python3.9/site-packages/prefect/_internal/concurrency/api.py", line 243, in wait_for_call_in_loop_thread
    return call.result()
  File "/usr/local/lib/python3.9/site-packages/prefect/_internal/concurrency/calls.py", line 282, in result
    return self.future.result(timeout=timeout)
  File "/usr/local/lib/python3.9/site-packages/prefect/_internal/concurrency/calls.py", line 168, in result
    return self.__get_result()
  File "/usr/local/lib/python3.9/concurrent/futures/_base.py", line 391, in __get_result
    raise self._exception
  File "/usr/local/lib/python3.9/site-packages/prefect/_internal/concurrency/calls.py", line 345, in _run_async
    result = await coro
  File "/usr/local/lib/python3.9/site-packages/prefect/client/utilities.py", line 51, in with_injected_client
    return await fn(*args, **kwargs)
  File "/usr/local/lib/python3.9/site-packages/prefect/engine.py", line 441, in retrieve_flow_then_begin_flow_run
    return await begin_flow_run(
  File "/usr/local/lib/python3.9/site-packages/prefect/engine.py", line 519, in begin_flow_run
    terminal_or_paused_state = await orchestrate_flow_run(
  File "/usr/local/lib/python3.9/contextlib.py", line 670, in __aexit__
    raise exc_details[1]
  File "/usr/local/lib/python3.9/contextlib.py", line 199, in __aexit__
    await self.gen.athrow(typ, value, traceback)
  File "/usr/local/lib/python3.9/site-packages/prefect/engine.py", line 1913, in report_flow_run_crashes
    yield
  File "/usr/local/lib/python3.9/contextlib.py", line 653, in __aexit__
    cb_suppress = await cb(*exc_details)
  File "/usr/local/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 597, in __aexit__
    raise exceptions[0]
  File "/usr/local/lib/python3.9/site-packages/prefect/engine.py", line 1390, in create_task_run_then_submit
    await submit_task_run(
  File "/usr/local/lib/python3.9/site-packages/prefect/engine.py", line 1451, in submit_task_run
    future = await task_runner.submit(
  File "/usr/local/lib/python3.9/site-packages/prefect_ray/task_runners.py", line 176, in submit
    self._ray_refs[key] = ray_decorator(self._run_prefect_task).remote(
  File "/usr/local/lib/python3.9/site-packages/ray/remote_function.py", line 133, in _remote_proxy
    return self._remote(args=args, kwargs=kwargs, **self._default_options)
  File "/usr/local/lib/python3.9/site-packages/ray/util/tracing/tracing_helper.py", line 306, in _invocation_remote_span
    return method(self, args, kwargs, *_args, **_kwargs)
  File "/usr/local/lib/python3.9/site-packages/ray/remote_function.py", line 252, in _remote
    return client_mode_convert_function(self, args, kwargs, **task_options)
  File "/usr/local/lib/python3.9/site-packages/ray/_private/client_mode_hook.py", line 164, in client_mode_convert_function
    return client_func._remote(in_args, in_kwargs, **kwargs)
  File "/usr/local/lib/python3.9/site-packages/ray/util/client/common.py", line 298, in _remote
    return self.options(**option_args).remote(*args, **kwargs)
  File "/usr/local/lib/python3.9/site-packages/ray/util/client/common.py", line 581, in remote
    return return_refs(ray.call_remote(self, *args, **kwargs))
  File "/usr/local/lib/python3.9/site-packages/ray/util/client/api.py", line 100, in call_remote
    return self.worker.call_remote(instance, *args, **kwargs)
  File "/usr/local/lib/python3.9/site-packages/ray/util/client/worker.py", line 556, in call_remote
    task = instance._prepare_client_task()
  File "/usr/local/lib/python3.9/site-packages/ray/util/client/common.py", line 587, in _prepare_client_task
    task = self._remote_stub._prepare_client_task()
  File "/usr/local/lib/python3.9/site-packages/ray/util/client/common.py", line 324, in _prepare_client_task
    self._ensure_ref()
  File "/usr/local/lib/python3.9/site-packages/ray/util/client/common.py", line 319, in _ensure_ref
    self._ref = ray.worker._put_pickled(
  File "/usr/local/lib/python3.9/site-packages/ray/util/client/worker.py", line 510, in _put_pickled
    raise cloudpickle.loads(resp.error)
RuntimeError: There is no current event loop in thread 'ray_client_server_1'.
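The final error message is the one asyncio raises when `asyncio.get_event_loop()` is called in a non-main thread that has no event loop set. The standalone sketch below reproduces that behaviour with the standard library only; the function and thread names are made up, and it does not claim to identify where in Ray or Prefect the call happens.

```python
import asyncio
import threading
from typing import List


def loop_lookup_in_thread(create_loop: bool) -> str:
    """Call asyncio.get_event_loop() in a worker thread and report the outcome."""
    outcome: List[str] = []

    def worker() -> None:
        loop = None
        if create_loop:
            # Give this thread its own loop before anything asks for one.
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)
        try:
            asyncio.get_event_loop()
            outcome.append("ok")
        except RuntimeError as exc:
            outcome.append(str(exc))
        finally:
            if loop is not None:
                asyncio.set_event_loop(None)
                loop.close()

    thread = threading.Thread(target=worker, name="demo_worker_0")
    thread.start()
    thread.join()
    return outcome[0]
```

Calling `loop_lookup_in_thread(create_loop=False)` returns the "no current event loop" message, while `create_loop=True` succeeds because the thread sets a loop first.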

Versions

Version:             2.11.5
API version:         0.8.4
Python version:      3.9.17
Git commit:          a597971f
Built:               Thu, Aug 24, 2023 2:14 PM
OS/Arch:             linux/x86_64
Profile:             default
Server type:         server

Python 3.10 support

Expectation / Proposal

Python 3.10 is supported by Ray since v2.0.0.
It would be nice to also support it in prefect-ray.

Traceback / Example

ModuleNotFoundError 'ray'

With Python 3.9 and 3.10

prefect-ray 0.1.2 pypi_0 pypi

Traceback (most recent call last):
  File "/Users/jeffhale/Desktop/prefect/test-doc-examples/ex2.py", line 330, in <module>
    from prefect_ray import RayTaskRunner
  File "/Users/jeffhale/miniforge3/envs/ray/lib/python3.9/site-packages/prefect_ray/__init__.py", line 6, in <module>
    from .task_runners import RayTaskRunner  # noqa
  File "/Users/jeffhale/miniforge3/envs/ray/lib/python3.9/site-packages/prefect_ray/task_runners.py", line 52, in <module>
    import ray
ModuleNotFoundError: No module named 'ray'

Prefect Logger does not work with native Ray Functions

Expectation / Proposal

Using the Prefect Logger with Ray Functions on the RayTaskRunner does not correctly propagate the logs to Prefect Cloud.

Traceback / Example

import ray
from prefect import flow, get_run_logger
from prefect_ray import RayTaskRunner


@flow(task_runner=RayTaskRunner)
def main(*, range_ceiling: int = 4) -> None:
    logger = get_run_logger()
    logger.info(f"Start of 'Simplified SPIKE ML on Prefect' with parameters {range_ceiling=}.")

    @ray.remote
    def square(number: int) -> int:
        logger.info(f"calculating the square of {number=}")
        return number * number

    futures = [square.remote(i) for i in range(range_ceiling)]
    logger.info(ray.get(futures))


if __name__ == "__main__":
    main()


prefect2: No module named 'prefect.utilities.asyncio'

Seeing the following error with prefect2

python flow.py
Traceback (most recent call last):
  File "flow.py", line 8, in <module>
    from prefect_ray.task_runners import RayTaskRunner
  File "/home/ubuntu/miniconda3/envs/prefect/lib/python3.8/site-packages/prefect_ray/__init__.py", line 6, in <module>
    from .task_runners import RayTaskRunner  # noqa
  File "/home/ubuntu/miniconda3/envs/prefect/lib/python3.8/site-packages/prefect_ray/task_runners.py", line 58, in <module>
    from prefect.utilities.asyncio import A, sync_compatible
ModuleNotFoundError: No module named 'prefect.utilities.asyncio'

Repro

python3 -m venv venv 
./venv/bin/python -m pip install --upgrade "prefect>=2.0b" "prefect-ray"
./venv/bin/python -c "from prefect_ray.task_runners import RayTaskRunner"
Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "/home/ubuntu/workspace/prefect/prefect-master/flows/molecule_collection/venv/lib/python3.8/site-packages/prefect_ray/__init__.py", line 6, in <module>
    from .task_runners import RayTaskRunner  # noqa
  File "/home/ubuntu/workspace/prefect/prefect-master/flows/molecule_collection/venv/lib/python3.8/site-packages/prefect_ray/task_runners.py", line 58, in <module>
    from prefect.utilities.asyncio import A, sync_compatible
ModuleNotFoundError: No module named 'prefect.utilities.asyncio'

Circular Import on Deployment (again)

@ahuang11 I'm running into an issue similar to #33:

My env (inside docker container):


<details>
<summary>Package                      Version
---------------------------- -----------
adal                         1.2.7
adlfs                        2023.1.0
aiohttp                      3.8.4
aiohttp-cors                 0.7.0
aiosignal                    1.3.1
aiosqlite                    0.18.0
alembic                      1.9.3
anyio                        3.6.2
appdirs                      1.4.4
apprise                      1.2.1
asgi-lifespan                2.0.0
asttokens                    2.2.1
async-timeout                4.0.2
asyncpg                      0.27.0
attrs                        22.2.0
azure-common                 1.1.28
azure-core                   1.26.3
azure-datalake-store         0.0.52
azure-identity               1.12.0
azure-mgmt-containerinstance 10.0.0
azure-mgmt-core              1.3.2
azure-mgmt-resource          22.0.0
azure-storage-blob           12.14.1
backcall                     0.2.0
beautifulsoup4               4.11.2
bleach                       6.0.0
blessed                      1.20.0
blosc2                       2.0.0
bokeh                        2.4.3
Bottleneck                   1.3.6
cachelib                     0.10.2
cachetools                   5.3.0
certifi                      2022.12.7
cffi                         1.15.1
charset-normalizer           3.0.1
click                        8.1.3
cloudpickle                  2.2.1
colorama                     0.4.6
colorful                     0.5.5
comm                         0.1.2
connectorx                   0.3.1
contourpy                    1.0.7
coolname                     2.2.0
cramjam                      2.6.2
croniter                     1.3.8
cryptography                 39.0.1
cssselect                    1.2.0
cycler                       0.11.0
Cython                       0.29.33
dask                         2023.2.0
dateparser                   1.1.7
debugpy                      1.6.6
decorator                    5.1.1
Deprecated                   1.2.13
distlib                      0.3.6
distributed                  2023.2.0
dnspython                    2.3.0
docker                       6.0.1
et-xmlfile                   1.1.0
exchange-calendars           4.2.5
executing                    1.2.0
fastapi                      0.91.0
fastparquet                  2023.2.0
filelock                     3.9.0
finta                        1.3
fonttools                    4.38.0
frozendict                   2.3.4
frozenlist                   1.3.3
fsspec                       2023.1.0
gevent                       22.10.2
google-api-core              2.11.0
google-auth                  2.16.0
googleapis-common-protos     1.58.0
gpustat                      1.0.0
greenlet                     2.0.2
griffe                       0.25.4
grpcio                       1.51.1
h11                          0.14.0
h2                           4.1.0
h5py                         3.8.0
HeapDict                     1.0.1
howdoi                       2.0.20
hpack                        4.0.0
html5lib                     1.1
httpcore                     0.16.3
httpx                        0.23.3
hyperframe                   6.0.1
idna                         3.4
importlib-metadata           6.0.0
ipykernel                    6.21.2
ipython                      8.10.0
ipython-suggestions          1.0.0
ipywidgets                   8.0.4
isodate                      0.6.1
jedi                         0.18.2
Jinja2                       3.1.2
jsonpatch                    1.32
jsonpointer                  2.3
jsonschema                   4.17.3
jupyter_client               8.0.2
jupyter_core                 5.2.0
jupyterlab-widgets           3.0.5
keep                         2.10.1
kiwisolver                   1.4.4
korean-lunar-calendar        0.3.1
kubernetes                   25.3.0
locket                       1.0.0
lxml                         4.9.2
Mako                         1.2.4
Markdown                     3.4.1
markdown-it-py               2.1.0
MarkupSafe                   2.1.2
matplotlib                   3.6.3
matplotlib-inline            0.1.6
mdurl                        0.1.2
modin                        0.18.1
mongoengine                  0.26.0
msal                         1.21.0
msal-extensions              1.0.0
msgpack                      1.0.4
msrest                       0.7.1
multidict                    6.0.4
multitasking                 0.0.11
nest-asyncio                 1.5.6
numexpr                      2.8.4
numpy                        1.24.2
nvidia-ml-py                 11.495.46
oauthlib                     3.2.2
opencensus                   0.11.1
opencensus-context           0.1.3
openpyxl                     3.1.1
orjson                       3.8.6
packaging                    23.0
pandas                       1.5.3
parallel-ssh                 2.12.0
parso                        0.8.3
partd                        1.3.0
pathlib2                     2.3.7.post1
pathspec                     0.11.0
pendulum                     2.1.2
pexpect                      4.8.0
pickleshare                  0.7.5
Pillow                       9.4.0
pip                          23.0
platformdirs                 3.0.0
portalocker                  2.7.0
prefect                      2.8.0
prefect-alert                0.1.3
prefect-azure                0.2.4
prefect-dask                 0.2.2
prefect-ray                  0.2.2
prefect-shell                0.1.4
prometheus-client            0.13.1
prompt-toolkit               3.0.36
protobuf                     3.20.3
psutil                       5.9.4
ptyprocess                   0.7.0
pure-eval                    0.2.2
py-cpuinfo                   9.0.0
py-spy                       0.3.14
pyarrow                      11.0.0
pyasn1                       0.4.8
pyasn1-modules               0.2.8
pycparser                    2.21
pydantic                     1.10.4
PyGithub                     1.57
Pygments                     2.14.0
PyJWT                        2.6.0
pyluach                      2.1.0
pymongo                      4.3.3
PyNaCl                       1.5.0
pyparsing                    3.0.9
pyquery                      2.0.0
pyrsistent                   0.19.3
pyte                         0.8.1
python-dateutil              2.8.2
python-slugify               8.0.0
pytz                         2022.7.1
pytz-deprecation-shim        0.1.0.post0
pytzdata                     2020.1
PyYAML                       6.0
pyzmq                        25.0.0
QuantStats                   0.0.59
ray                          2.2.0
readchar                     4.0.3
regex                        2022.10.31
requests                     2.28.2
requests-oauthlib            1.3.1
rfc3986                      1.5.0
rich                         13.3.1
rsa                          4.9
scipy                        1.10.0
seaborn                      0.12.2
setuptools                   67.2.0
sidetable                    0.9.1
six                          1.16.0
smart-open                   6.3.0
sniffio                      1.3.0
sortedcontainers             2.4.0
soupsieve                    2.3.2.post1
SQLAlchemy                   1.4.46
ssh-python                   1.0.0
ssh2-python                  1.0.0
stack-data                   0.6.2
starlette                    0.24.0
swifter                      1.3.4
sxmpy                        0.3.7
ta-lib-bin                   0.4.26
tables                       3.8.0
tabulate                     0.9.0
tblib                        1.7.0
terminaltables               3.1.10
text-unidecode               1.3
thefuck                      3.32
toml                         0.10.2
toolz                        0.12.0
tornado                      6.2
tqdm                         4.64.1
traitlets                    5.9.0
typer                        0.7.0
typing_extensions            4.4.0
tzdata                       2022.7
tzlocal                      4.2
urllib3                      1.26.14
uvicorn                      0.20.0
virtualenv                   20.19.0
wcwidth                      0.2.6
webencodings                 0.5.1
websocket-client             1.5.1
websockets                   10.4
wheel                        0.38.4
widgetsnbextension           4.0.5
wrapt                        1.14.1
yarl                         1.8.2
yfinance                     0.2.11
zict                         2.2.0
zipp                         3.13.0
zope.event                   4.6
zope.interface               5.5.2</summary>
</details>

python==3.9.16
ray==2.2.0
prefect-ray

Traceback (most recent call last):
  File "/opt/pysetup/.venv/lib/python3.9/site-packages/prefect/engine.py", line 650, in orchestrate_flow_run
    result = await run_sync(flow_call)
  File "/opt/pysetup/.venv/lib/python3.9/site-packages/prefect/utilities/asyncutils.py", line 156, in run_sync_in_interruptible_worker_thread
    tg.start_soon(
  File "/opt/pysetup/.venv/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 662, in __aexit__
    raise exceptions[0]
  File "/opt/pysetup/.venv/lib/python3.9/site-packages/anyio/to_thread.py", line 31, in run_sync
    return await get_asynclib().run_sync_in_worker_thread(
  File "/opt/pysetup/.venv/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 937, in run_sync_in_worker_thread
    return await future
  File "/opt/pysetup/.venv/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 867, in run
    result = context.run(func, *args)
  File "/opt/pysetup/.venv/lib/python3.9/site-packages/prefect/utilities/asyncutils.py", line 135, in capture_worker_thread_and_result
    result = __fn(*args, **kwargs)
  File "sxm_exporter.py", line 471, in Archive_QueueViewer_Loader
    dfDirs = get_archive_directories.submit(
  File "/opt/pysetup/.venv/lib/python3.9/site-packages/prefect/futures.py", line 226, in result
    return sync(
  File "/opt/pysetup/.venv/lib/python3.9/site-packages/prefect/utilities/asyncutils.py", line 267, in sync
    return run_async_from_worker_thread(__async_fn, *args, **kwargs)
  File "/opt/pysetup/.venv/lib/python3.9/site-packages/prefect/utilities/asyncutils.py", line 177, in run_async_from_worker_thread
    return anyio.from_thread.run(call)
  File "/opt/pysetup/.venv/lib/python3.9/site-packages/anyio/from_thread.py", line 49, in run
    return asynclib.run_async_from_thread(func, *args)
  File "/opt/pysetup/.venv/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 970, in run_async_from_thread
    return f.result()
  File "/usr/local/lib/python3.9/concurrent/futures/_base.py", line 446, in result
    return self.__get_result()
  File "/usr/local/lib/python3.9/concurrent/futures/_base.py", line 391, in __get_result
    raise self._exception
  File "/opt/pysetup/.venv/lib/python3.9/site-packages/prefect/futures.py", line 237, in _result
    return await final_state.result(raise_on_failure=raise_on_failure, fetch=True)
  File "/opt/pysetup/.venv/lib/python3.9/site-packages/prefect/states.py", line 89, in _get_state_result
    raise await get_state_exception(state)
ray.exceptions.RaySystemError: System error: cannot import name 'FlowRun' from partially initialized module 'prefect.client.schemas' (most likely due to a circular import) (/opt/pysetup/.venv/lib/python3.9/site-packages/prefect/client/schemas.py)
traceback: Traceback (most recent call last):
  File "/opt/pysetup/.venv/lib/python3.9/site-packages/ray/serialization.py", line 332, in deserialize_objects
    obj = self._deserialize_object(data, metadata, object_ref)
  File "/opt/pysetup/.venv/lib/python3.9/site-packages/ray/serialization.py", line 235, in _deserialize_object
    return self._deserialize_msgpack_data(data, metadata_fields)
  File "/opt/pysetup/.venv/lib/python3.9/site-packages/ray/serialization.py", line 190, in _deserialize_msgpack_data
    python_objects = self._deserialize_pickle5_data(pickle5_data)
  File "/opt/pysetup/.venv/lib/python3.9/site-packages/ray/serialization.py", line 180, in _deserialize_pickle5_data
    obj = pickle.loads(in_band)
  File "/opt/pysetup/.venv/lib/python3.9/site-packages/prefect/__init__.py", line 25, in <module>
    from prefect.states import State
  File "/opt/pysetup/.venv/lib/python3.9/site-packages/prefect/states.py", line 14, in <module>
    from prefect.client.schemas import State as State
  File "/opt/pysetup/.venv/lib/python3.9/site-packages/prefect/client/schemas.py", line 6, in <module>
    from prefect.orion import schemas
  File "/opt/pysetup/.venv/lib/python3.9/site-packages/prefect/orion/__init__.py", line 1, in <module>
    from . import models
  File "/opt/pysetup/.venv/lib/python3.9/site-packages/prefect/orion/models/__init__.py", line 1, in <module>
    from . import (
  File "/opt/pysetup/.venv/lib/python3.9/site-packages/prefect/orion/models/block_documents.py", line 13, in <module>
    from prefect.orion import schemas
  File "/opt/pysetup/.venv/lib/python3.9/site-packages/prefect/orion/schemas/__init__.py", line 1, in <module>
    from . import states, schedules, core, sorting, filters, responses, actions
  File "/opt/pysetup/.venv/lib/python3.9/site-packages/prefect/orion/schemas/states.py", line 13, in <module>
    from prefect.orion.utilities.schemas import DateTimeTZ, IDBaseModel, PrefectBaseModel
  File "/opt/pysetup/.venv/lib/python3.9/site-packages/prefect/orion/utilities/schemas.py", line 16, in <module>
    from packaging.version import Version
  File "/opt/pysetup/.venv/lib/python3.9/site-packages/prefect/packaging/__init__.py", line 1, in <module>
    from prefect.packaging.docker import DockerPackager
  File "/opt/pysetup/.venv/lib/python3.9/site-packages/prefect/packaging/__init__.py", line 1, in <module>
    from prefect.packaging.docker import DockerPackager
  File "/opt/pysetup/.venv/lib/python3.9/site-packages/prefect/packaging/docker.py", line 16, in <module>
    from prefect.flows import Flow, load_flow_from_script
  File "/opt/pysetup/.venv/lib/python3.9/site-packages/prefect/flows.py", line 36, in <module>
    from prefect.context import PrefectObjectRegistry, registry_from_script
  File "/opt/pysetup/.venv/lib/python3.9/site-packages/prefect/context.py", line 33, in <module>
    from prefect.client.orion import OrionClient
  File "/opt/pysetup/.venv/lib/python3.9/site-packages/prefect/client/orion.py", line 19, in <module>
    from prefect.client.schemas import FlowRun, OrchestrationResult, TaskRun
ImportError: cannot import name 'FlowRun' from partially initialized module 'prefect.client.schemas' (most likely due to a circular import) (/opt/pysetup/.venv/lib/python3.9/site-packages/prefect/client/schemas.py)
04:02:01 PM

Crash detected! Execution was interrupted by an unexpected exception: ray.exceptions.RaySystemError: System error: cannot import name 'FlowRun' from partially initialized module 'prefect.client.schemas' (most likely due to a circular import) (/opt/pysetup/.venv/lib/python3.9/site-packages/prefect/client/schemas.py)
ImportError: cannot import name 'FlowRun' from partially initialized module 'prefect.client.schemas' (most likely due to a circular import) (/opt/pysetup/.venv/lib/python3.9/site-packages/prefect/client/schemas.py)
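In the traceback, `from packaging.version import Version` is followed by a frame in `prefect/packaging/__init__.py`. That suggests the top-level name `packaging` resolved to Prefect's own subpackage, which would happen if the `prefect` package directory itself were on the import path. The sketch below is one way to check for such shadowing; the helper name `shadowing_entries` is hypothetical, and the diagnosis is an assumption drawn only from those two frames.

```python
import os
from typing import Iterable, List


def shadowing_entries(
    module_name: str, search_path: Iterable[str], expected_dir: str
) -> List[str]:
    """Return import-path directories, other than expected_dir, that provide module_name."""
    expected = os.path.abspath(expected_dir)
    hits = []
    for entry in search_path:
        # An empty entry in sys.path stands for the current working directory.
        directory = os.path.abspath(entry or os.getcwd())
        if directory == expected:
            continue
        candidates = (
            os.path.join(directory, module_name, "__init__.py"),
            os.path.join(directory, module_name + ".py"),
        )
        if any(os.path.isfile(path) for path in candidates):
            hits.append(directory)
    return hits
```

On a Ray worker, something like `shadowing_entries("packaging", sys.path, "/opt/pysetup/.venv/lib/python3.9/site-packages")` would list any extra directories that also supply a `packaging` module.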

Error in test suite

We're seeing the error:

RuntimeError: There is no current event loop in thread 'ray_client_server_0'.

The error comes from the prefect-ray test suite when running locally, and tests are also timing out in CI after 6 hours. As a cursory triage, I tried different versions of ray and prefect to see whether a regression was introduced at a certain point, but I wasn't able to narrow it down.
