prefecthq / prefect-ray
Prefect integrations with Ray
Home Page: https://prefecthq.github.io/prefect-ray/
License: Apache License 2.0
(begin_task_run pid=8142) File "/home/ray/anaconda3/lib/python3.8/site-packages/prefect/software/python.py", line 8, in <module>
(begin_task_run pid=8142) from prefect.software.pip import PipRequirement, current_environment_requirements
(begin_task_run pid=8142) File "/home/ray/anaconda3/lib/python3.8/site-packages/prefect/software/pip.py", line 4, in <module>
(begin_task_run pid=8142) import packaging.requirements
(begin_task_run pid=8142) File "/home/ray/anaconda3/lib/python3.8/site-packages/prefect/packaging/__init__.py", line 1, in <module>
(begin_task_run pid=8142) from prefect.packaging.docker import DockerPackager
(begin_task_run pid=8142) ImportError: cannot import name 'DockerPackager' from partially initialized module 'prefect.packaging.docker' (most likely due to a circular import) (/home/ray/anaconda3/lib/python3.8/site-packages/prefect/packaging/docker.py)
The issue is that in a deployment, Ray includes /home/ray/anaconda3/lib/python3.8/site-packages/prefect/ in its sys.path, and this directory contains a packaging subpackage, which confuses the from packaging import version statement: it pulls from prefect.packaging rather than from the PyPI packaging library.
To work around this issue, override sys.argv[0] = str(Path.cwd()) in prefect_ray/task_runners.py.
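A different workaround, offered only as a sketch, is to drop the prefect package directory from sys.path in the worker before anything imports packaging. The helper name strip_package_dir is hypothetical, and the sketch assumes the offending entry is exactly the .../site-packages/prefect directory shown in the traceback above.

```python
import os
from typing import List, Sequence


def strip_package_dir(paths: Sequence[str], package_dir: str) -> List[str]:
    """Return a copy of a sys.path-style list without package_dir.

    A package directory such as .../site-packages/prefect should never be a
    sys.path entry itself: if it is, its subpackages (prefect/packaging)
    shadow top-level distributions with the same name (PyPI 'packaging').
    """
    target = os.path.normpath(package_dir)
    # normpath makes "/x/prefect" and "/x/prefect/" compare equal.
    return [p for p in paths if os.path.normpath(p) != target]


# Hypothetical usage in code that runs inside a Ray worker, before anything
# imports `packaging` (find_spec on a top-level name does not import it):
#   import importlib.util, sys
#   spec = importlib.util.find_spec("prefect")
#   if spec is not None:
#       for pkg_dir in spec.submodule_search_locations or []:
#           sys.path[:] = strip_package_dir(sys.path, pkg_dir)
```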
With the example code below, only 5 tasks can be scheduled by Ray at any time, and each task takes between 45 and 135 seconds. We map over 15,000 keys through three chained tasks, and although each task uses little memory (and only 5 can run at any given time), Ray eventually stops being able to schedule any tasks, so the flow run just hangs.
Ray error:
Warning: The following resource request cannot be scheduled right now: {'CPU': 1.0}. This is likely due to all cluster resources being claimed by actors. Consider creating fewer actors or adding more nodes to this Ray cluster.
The flow completes normally for lower num_map_keys, e.g. 100.
It also completes normally with num_map_keys=15000 using the default concurrent task runner.
The Ray pattern at https://docs.ray.io/en/latest/ray-core/patterns/limit-pending-tasks.html suggests a possible resolution: prefect-ray should expose the maximum number of pending Ray tasks as a configurable setting.
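The pattern on the linked Ray page blocks the submitting loop once a fixed number of tasks are pending. The sketch below shows that loop with the standard library's concurrent.futures so it runs without a Ray cluster; in Ray the blocking call would be ray.wait(refs, num_returns=1). The max_pending knob and the bounded_map name are hypothetical: they illustrate the kind of setting this issue asks for, not an existing RayTaskRunner parameter.

```python
from concurrent.futures import FIRST_COMPLETED, Executor, Future, ThreadPoolExecutor, wait
from typing import Callable, Iterable, List, Set, TypeVar

T = TypeVar("T")
R = TypeVar("R")


def bounded_map(
    executor: Executor,
    fn: Callable[[T], R],
    items: Iterable[T],
    max_pending: int,
) -> List[R]:
    """Submit fn(item) for every item, never keeping more than max_pending in flight."""
    if max_pending < 1:
        raise ValueError("max_pending must be at least 1")
    in_flight: Set["Future[R]"] = set()
    ordered: List["Future[R]"] = []
    for item in items:
        if len(in_flight) >= max_pending:
            # Block the submitter until at least one in-flight call finishes;
            # wait() returns (done, not_done) and we keep only the unfinished ones.
            _done, in_flight = wait(in_flight, return_when=FIRST_COMPLETED)
        future = executor.submit(fn, item)
        in_flight.add(future)
        ordered.append(future)
    # Results come back in submission order; .result() re-raises task errors.
    return [f.result() for f in ordered]


if __name__ == "__main__":
    with ThreadPoolExecutor(max_workers=5) as pool:
        print(bounded_map(pool, lambda n: n * n, range(10), max_pending=5))
```

The design choice is backpressure on the producer: memory held by queued work stays proportional to max_pending rather than to the number of mapped keys.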
from time import sleep
import numpy as np
from prefect import task, flow
from prefect_ray.task_runners import RayTaskRunner
@task
def a(n: int, t: int, jitter_pct: float) -> float:
    sleep_time = t * (1 + jitter_pct * (2 * np.random.default_rng().random() - 1))
    print(f"task a with {n=}, sleeping {sleep_time}s")
    sleep(sleep_time)
    return n**2
@task
def b(n: float, t: int, jitter_pct: float) -> float:
    sleep_time = t * (1 + jitter_pct * (2 * np.random.default_rng().random() - 1))
    print(f"task b with {n=}, sleeping {sleep_time}s")
    sleep(sleep_time)
    return n**0.5
@task
def c(n: float, t: int, jitter_pct: float) -> int:
    sleep_time = t * (1 + jitter_pct * (2 * np.random.default_rng().random() - 1))
    print(f"task c with {n=}, sleeping {sleep_time}s")
    sleep(sleep_time)
    return int(n*2)
@flow(task_runner=RayTaskRunner(init_kwargs={"num_cpus": 5}))
def many_map_tasks(num_map_keys: int = 15000, t: int = 90, jitter_pct: float = 0.5):
    squares = a.map(range(num_map_keys), t, jitter_pct)
    roots = b.map(squares, t, jitter_pct)
    doubles = c.map(roots, t, jitter_pct)
    return [f.result() for f in doubles]
if __name__ == "__main__":
    print(many_map_tasks())
Version: 2.6.7
API version: 0.8.3
Python version: 3.8.9
Git commit: 9074c158
Built: Thu, Nov 10, 2022 1:07 PM
OS/Arch: linux/x86_64
Profile: cloud
Server type: cloud
0.2.1
2.1.0
I'm running a Ray cluster in Azure (cluster YAML attached below). I created a small flow, using the Azure Container Instance Job infrastructure block, that looks like this:
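import time
from prefect import flow, get_run_logger, task
from prefect_ray.task_runners import RayTaskRunner
# FLOW_NAME and setup_logging are defined elsewhere in the reporter's project
@task
async def my_task(id: int):
    logger = get_run_logger()
    logger.info(f"Hello from my_task #{id}! Sleeping for 30 seconds...")
    # note: time.sleep is a blocking call inside an async task (kept as originally reported)
    time.sleep(30)
    logger.info(f"my_task #{id} done!")
@flow(
    name=FLOW_NAME,
    task_runner=RayTaskRunner(address="ray://10.0.0.4:10001")
)
def my_flow(loglevel: str = "DEBUG"):
    logconfig = setup_logging.submit(loglevel=loglevel)
    logger = get_run_logger()
    logger.info("Starting")
    # Spam ray tasks here!
    for i in range(50):
        my_task.submit(i + 1, wait_for=[logconfig])
        time.sleep(5)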
After running this flow for about 5 minutes, I'm starting to see a lot of tasks that are stuck as 'running' indefinitely. Furthermore, these tasks are printing the following message to the log:
Hello from my_task #42! Sleeping for 30 seconds... 05:15:10 PM
Task run 'b4a5a5f5-1d72-4146-9476-cf157e686df3' already finished. 05:15:39 PM
After seeing a handful of tasks exhibiting this behavior, the flow promptly crashes with no further error message. The agent logs the following:
16:11:52.674 | INFO | prefect.infrastructure.container-instance-job - AzureContainerInstanceJob 'myflow-161585': Running command...
16:11:52.676 | INFO | prefect.agent - Completed submission of flow run '87eba3cb-f09f-4b21-90f5-e0d7923801e4'
16:16:55.294 | INFO | prefect.infrastructure.container-instance-job - AzureContainerInstanceJob 'myflow-161585': Completed command run.
16:16:55.296 | INFO | prefect.infrastructure.container-instance-job - AzureContainerInstanceJob 'myflow-161585': Deleting container...
The Ray cluster doesn't log anything of note, as far as I can tell. I'm not entirely sure what's happening here, but according to this discussion, https://discourse.prefect.io/t/edgecase-error-prefect-task-run-task-run-id-already-finished/1836, I'm not the only person affected.
The tasks shouldn't hang after logging "already finished"; they should either crash cleanly or keep running normally.
FROM rayproject/ray:2.1.0-py38-cpu
# Don't buffer console output
ENV PYTHONUNBUFFERED=1
# Don't compile bytecode, we're only running once
ENV PYTHONDONTWRITEBYTECODE=1
ENV CODE_PATH=/home/ray/anaconda3/lib/python3.8/site-packages/
COPY ./pipeline/requirements.txt ${CODE_PATH}/pipeline/requirements.txt
RUN pip install pip --upgrade && \
pip install -r ${CODE_PATH}/pipeline/requirements.txt
COPY ./logging.yml ${CODE_PATH}/logging.yml
COPY ./flow_audit ${CODE_PATH}/flow_audit
COPY ./orchestration ${CODE_PATH}/orchestration
COPY ./pipeline ${CODE_PATH}/pipeline
Please ignore the comments here; they're mostly from the default cluster YAML. We run the custom Docker image censoredregistry.azurecr.io/ray-cluster:latest (not the actual URL), as seen below; its Dockerfile is attached above.
# A unique identifier for the head node and workers of this cluster.
cluster_name: test
# The maximum number of worker nodes to launch in addition to the head
# node.
max_workers: 25
# The autoscaler will scale up the cluster faster with higher upscaling speed.
# E.g., if the task requires adding more nodes then autoscaler will gradually
# scale up the cluster in chunks of upscaling_speed*currently_running_nodes.
# This number should be > 0.
upscaling_speed: 100.0
# This executes all commands on all nodes in the docker container,
# and opens all the necessary ports to support the Ray cluster.
# Empty object means disabled.
docker:
    # image: "rayproject/ray-ml:latest-gpu" # You can change this to latest-cpu if you don't need GPU support and want a faster startup
    image: censoredregistry.azurecr.io/ray-cluster:latest # use this one if you don't need ML dependencies, it's faster to pull
    container_name: "ray_container"
    # If true, pulls latest version of image. Otherwise, `docker run` will only pull the image
    # if no cached version is present.
    pull_before_run: True
    run_options:  # Extra options to pass into "docker run"
        - --ulimit nofile=65536:65536
    # Example of running a GPU head with CPU workers
    # head_image: "rayproject/ray-ml:latest-gpu"
    # Allow Ray to automatically detect GPUs
    # worker_image: "rayproject/ray-ml:latest-cpu"
    # worker_run_options: []
# If a node is idle for this many minutes, it will be removed.
idle_timeout_minutes: 30 # TODO: Should probably be something like 5 minutes
# Cloud-provider specific configuration.
provider:
    type: azure
    # https://azure.microsoft.com/en-us/global-infrastructure/locations
    location: westeurope
    cache_stopped_nodes: False
    resource_group: rg-ray-test
    # Below settings only applicable for AWS, not Azure.
    # security_group:
    #     GroupName: ray-nsg
    #     IpPermissions:
    #         - FromPort: 10001
    #           ToPort: 10001
    #           IpProtocol: TCP
    #           IpRanges:
    #               # This will enable inbound access from ALL IPv4 addresses.
    #               - CidrIp: 194.218.21.196
    # set subscription id otherwise the default from az cli will be used
    subscription_id: CENSORED_SUBSCRIPTION_ID
    # set unique subnet mask or a random mask will be used
    subnet_mask: 10.0.0.0/24
    # set unique id for resources in this cluster
    # if not set a default id will be generated based on the resource group and cluster name
    unique_id: test
# How Ray will authenticate with newly launched nodes.
auth:
    ssh_user: ray
    # you must specify paths to matching private and public key pair files
    # use `ssh-keygen -t rsa -b 4096` to generate a new ssh key pair
    ssh_private_key: ~/.ssh/ray-test
    # changes to this should match what is specified in file_mounts
    ssh_public_key: ~/.ssh/ray-test.pub
# More specific customization to node configurations can be made using the ARM template azure-vm-template.json file
# See documentation here: https://docs.microsoft.com/en-us/azure/templates/microsoft.compute/2019-03-01/virtualmachines
# Changes to the local file will be used during deployment of the head node, however worker nodes deployment occurs
# on the head node, so changes to the template must be included in the wheel file used in setup_commands section below
# Tell the autoscaler the allowed node types and the resources they provide.
# The key is the name of the node type, which is just for debugging purposes.
# The node config specifies the launch config and physical instance type.
available_node_types:
    ray.head.default:
        # The resources provided by this node type.
        resources: {"CPU": 0}
        # Provider-specific config, e.g. instance type.
        node_config:
            azure_arm_parameters:
                vmSize: Standard_D2s_v3
                # List images https://docs.microsoft.com/en-us/azure/virtual-machines/linux/cli-ps-findimage
                imagePublisher: microsoft-dsvm
                imageOffer: ubuntu-1804
                imageSku: 1804-gen2
                imageVersion: latest
    ray.worker.default:
        # The minimum number of worker nodes of this type to launch.
        # This number should be >= 0.
        min_workers: 0
        # The maximum number of worker nodes of this type to launch.
        # This takes precedence over min_workers.
        max_workers: 25
        # The resources provided by this node type.
        resources: {"CPU": 2}
        # Provider-specific config, e.g. instance type.
        node_config:
            azure_arm_parameters:
                vmSize: Standard_D2s_v3
                # List images https://docs.microsoft.com/en-us/azure/virtual-machines/linux/cli-ps-findimage
                imagePublisher: microsoft-dsvm
                imageOffer: ubuntu-1804
                imageSku: 1804-gen2
                imageVersion: latest
                # optionally set priority to use Spot instances
                # priority: Spot
                # set a maximum price for spot instances if desired
                # billingProfile:
                #     maxPrice: -1
# Specify the node type of the head node (as configured above).
head_node_type: ray.head.default
# Files or directories to copy to the head and worker nodes. The format is a
# dictionary from REMOTE_PATH: LOCAL_PATH, e.g.
file_mounts: {
#    "/path1/on/remote/machine": "/path1/on/local/machine",
#    "/path2/on/remote/machine": "/path2/on/local/machine",
    "~/.ssh/ray-test.pub": "~/.ssh/ray-test.pub"
}
# Files or directories to copy from the head node to the worker nodes. The format is a
# list of paths. The same path on the head node will be copied to the worker node.
# This behavior is a subset of the file_mounts behavior. In the vast majority of cases
# you should just use file_mounts. Only use this if you know what you're doing!
cluster_synced_files: []
# Whether changes to directories in file_mounts or cluster_synced_files in the head node
# should sync to the worker node continuously
file_mounts_sync_continuously: False
# Patterns for files to exclude when running rsync up or rsync down
rsync_exclude:
    - "**/.git"
    - "**/.git/**"
# Pattern files to use for filtering out files when running rsync up or rsync down. The file is searched for
# in the source directory and recursively through all subdirectories. For example, if .gitignore is provided
# as a value, the behavior will match git's behavior for finding and using .gitignore files.
rsync_filter:
    - ".gitignore"
# List of commands that will be run before `setup_commands`. If docker is
# enabled, these commands will run outside the container and before docker
# is setup.
initialization_commands:
    # add /home/ray to path
    - export PATH=$PATH:/home/ray
    # enable docker setup
    - sudo usermod -aG docker $USER || true
    - sleep 10  # delay to avoid docker permission denied errors
    # get rid of annoying Ubuntu message
    - touch ~/.sudo_as_admin_successful
    # Wait for auto upgrade that might run in the background.
    - bash -c $'ps -e | grep apt | awk \'{print $1}\' | xargs tail -f --pid || true'
    # Log in to azure container registry (by ray-msi-test)
    - az login --identity
    - docker login censoredregistry.azurecr.io -u censoreduser -p censoredpassword # Exactly how the password is retrieved is censored.
# List of shell commands to run to set up nodes.
# NOTE: rayproject/ray-ml:latest has ray latest bundled
setup_commands: []
    # Note: if you're developing Ray, you probably want to create a Docker image that
    # has your Ray repo pre-cloned. Then, you can replace the pip installs
    # below with a git checkout <your_sha> (and possibly a recompile).
    # To run the nightly version of ray (as opposed to the latest), either use a rayproject docker image
    # that has the "nightly" (e.g. "rayproject/ray-ml:nightly-gpu") or uncomment the following line:
    # - pip install -U "ray[default] @ https://s3-us-west-2.amazonaws.com/ray-wheels/latest/ray-3.0.0.dev0-cp38-cp38-manylinux2014_x86_64.whl"
# Custom commands that will be run on the head node after common setup.
# NOTE: rayproject/ray-ml:latest has azure packages bundled
head_setup_commands: []
    # - pip install -U azure-cli-core==2.22.0 azure-mgmt-compute==14.0.0 azure-mgmt-msi==1.0.0 azure-mgmt-network==10.2.0 azure-mgmt-resource==13.0.0
# Custom commands that will be run on worker nodes after common setup.
worker_setup_commands: []
# Command to start ray on the head node. You don't need to change this.
head_start_ray_commands:
    - export AUTOSCALER_MAX_NUM_FAILURES=inf;
    - ray stop
    - ray start --head --port=6379 --object-manager-port=8076 --autoscaling-config=~/ray_bootstrap_config.yaml
# Command to start ray on worker nodes. You don't need to change this.
worker_start_ray_commands:
    - ray stop
    - ray start --address=$RAY_HEAD_IP:6379 --object-manager-port=8076
I checked out the PrefectHQ/prefect#6908 branch on both the local and remote machines, then ran this code (it works if address is omitted). Any ideas, @madkinsz?
from typing import List
from prefect import flow, get_run_logger, task
from prefect_ray.task_runners import RayTaskRunner
from prefect.settings import temporary_settings, PREFECT_LOCAL_STORAGE_PATH
# from prefect_dask.task_runners import DaskTaskRunner
@task
def say_hello(name: str) -> None:
    logger = get_run_logger()
    logger.info(f"hello {name}")
@task
def say_goodbye(name: str) -> None:
    # logs not currently working see https://github.com/PrefectHQ/prefect-ray/issues/25
    logger = get_run_logger()
    print("print goodbye")
    logger.info(f"goodbye {name}")
# run on an existing ray cluster
@flow(
    task_runner=RayTaskRunner(
        # 127.0.0.1:10001 is port-forwarded to the remote ray cluster
        address="ray://myaddress",
    )
)
def greetings(names: List[str]) -> None:
    for name in names:
        say_hello.submit(name)
if __name__ == "__main__":
    greetings(["arthur", "trillian", "ford", "marvin"])
Results in:
15:29:39.838 | INFO | prefect.engine - Created flow run 'wakeful-condor' for flow 'greetings'
15:29:39.839 | INFO | prefect.task_runner.ray - Connecting to an existing Ray instance at ray://3.95.199.184:10001
15:29:43.498 | INFO | prefect.task_runner.ray - Using Ray cluster with 1 nodes.
15:29:44.509 | INFO | Flow run 'wakeful-condor' - Created task run 'say_hello-811087cd-0' for task 'say_hello'
15:29:46.272 | INFO | Flow run 'wakeful-condor' - Submitted task run 'say_hello-811087cd-0' for execution.
15:29:46.366 | INFO | Flow run 'wakeful-condor' - Created task run 'say_hello-811087cd-2' for task 'say_hello'
15:29:46.458 | INFO | Flow run 'wakeful-condor' - Submitted task run 'say_hello-811087cd-2' for execution.
15:29:46.461 | INFO | Flow run 'wakeful-condor' - Created task run 'say_hello-811087cd-3' for task 'say_hello'
15:29:46.553 | INFO | Flow run 'wakeful-condor' - Submitted task run 'say_hello-811087cd-3' for execution.
15:29:46.554 | INFO | Flow run 'wakeful-condor' - Created task run 'say_hello-811087cd-1' for task 'say_hello'
15:29:46.647 | INFO | Flow run 'wakeful-condor' - Submitted task run 'say_hello-811087cd-1' for execution.
15:29:46.728 | INFO | Task run 'say_hello-811087cd-0' - Crash detected! Execution was interrupted by an unexpected exception.
15:29:46.849 | INFO | Task run 'say_hello-811087cd-1' - Crash detected! Execution was interrupted by an unexpected exception.
15:29:47.005 | INFO | Task run 'say_hello-811087cd-2' - Crash detected! Execution was interrupted by an unexpected exception.
15:29:47.134 | INFO | Task run 'say_hello-811087cd-3' - Crash detected! Execution was interrupted by an unexpected exception.
15:29:48.747 | ERROR | Flow run 'wakeful-condor' - Finished in state Failed('4/4 states failed.')
Traceback (most recent call last):
File "/Users/andrew/test2.py", line 40, in <module>
greetings(["arthur", "trillian", "ford", "marvin"])
File "/Users/andrew/Applications/python/prefect/src/prefect/flows.py", line 390, in __call__
return enter_flow_run_engine_from_flow_call(
File "/Users/andrew/Applications/python/prefect/src/prefect/engine.py", line 152, in enter_flow_run_engine_from_flow_call
return anyio.run(begin_run)
File "/Users/andrew/mambaforge/envs/ray310/lib/python3.10/site-packages/anyio/_core/_eventloop.py", line 70, in run
return asynclib.run(func, *args, **backend_options)
File "/Users/andrew/mambaforge/envs/ray310/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 292, in run
return native_run(wrapper(), debug=debug)
File "/Users/andrew/mambaforge/envs/ray310/lib/python3.10/asyncio/runners.py", line 44, in run
return loop.run_until_complete(main)
File "/Users/andrew/mambaforge/envs/ray310/lib/python3.10/asyncio/base_events.py", line 646, in run_until_complete
return future.result()
File "/Users/andrew/mambaforge/envs/ray310/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 287, in wrapper
return await func(*args)
File "/Users/andrew/Applications/python/prefect/src/prefect/client/orion.py", line 86, in with_injected_client
return await fn(*args, **kwargs)
File "/Users/andrew/Applications/python/prefect/src/prefect/engine.py", line 232, in create_then_begin_flow_run
return state.result()
File "/Users/andrew/Applications/python/prefect/src/prefect/client/schemas.py", line 116, in result
state.result()
File "/Users/andrew/Applications/python/prefect/src/prefect/client/schemas.py", line 102, in result
raise data
TypeError: missing a required argument: 'result_factory'
With Python 3.9 and 3.10
prefect-ray 0.1.2 pypi_0 pypi
Traceback (most recent call last):
File "/Users/jeffhale/Desktop/prefect/test-doc-examples/ex2.py", line 330, in <module>
from prefect_ray import RayTaskRunner
File "/Users/jeffhale/miniforge3/envs/ray/lib/python3.9/site-packages/prefect_ray/__init__.py", line 6, in <module>
from .task_runners import RayTaskRunner # noqa
File "/Users/jeffhale/miniforge3/envs/ray/lib/python3.9/site-packages/prefect_ray/task_runners.py", line 52, in <module>
import ray
ModuleNotFoundError: No module named 'ray'
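The traceback shows prefect_ray installed but ray missing from the same environment. One way to surface that before importing RayTaskRunner is to check whether each module can be located without importing it; missing_modules is a hypothetical helper, and the actual fix is to install Ray into the same environment as prefect-ray.

```python
import importlib.util
from typing import Iterable, List


def missing_modules(names: Iterable[str]) -> List[str]:
    """Return the top-level module names that cannot be found on sys.path.

    importlib.util.find_spec locates a top-level module without executing it,
    so this check has no import side effects. Pass top-level names only.
    """
    return [name for name in names if importlib.util.find_spec(name) is None]


if __name__ == "__main__":
    missing = missing_modules(["prefect", "prefect_ray", "ray"])
    if missing:
        print("not installed in this environment:", ", ".join(missing))
```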
When the RayTaskRunner was updated in #35, wrapping of Prefect futures with Ray task futures was introduced. For tasks with upstream dependencies, the upstream Ray futures must be unwrapped synchronously to pass the upstream task's output to the downstream task. This may degrade performance for flows that use the RayTaskRunner with many dependent tasks. The aim is to find a way to handle Ray futures concurrently so that flows with dependent tasks can still execute concurrently.
@anticorrelator @zangell44 feel free to add any additional details or correct any inaccuracies in my description of the issue!
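For comparison, Ray itself can resolve dependencies without blocking the caller: when an ObjectRef is passed directly as an argument to another .remote() call, Ray waits for it before running the downstream task. The sketch below shows the same idea with asyncio so it runs without Ray: each downstream coroutine awaits its own upstream future, so all chains progress concurrently instead of being unwrapped one at a time by the submitting code. The names upstream, downstream, run_chains and timed_run are hypothetical stand-ins, not prefect-ray internals.

```python
import asyncio
import time
from typing import List, Tuple


async def upstream(x: int, delay: float) -> int:
    await asyncio.sleep(delay)  # stand-in for real upstream work
    return x * 2


async def downstream(dep: "asyncio.Future[int]", delay: float) -> int:
    # The dependency is awaited inside the downstream unit of work, so the
    # code that submits work never blocks on an upstream result.
    value = await dep
    await asyncio.sleep(delay)  # stand-in for real downstream work
    return value + 1


async def run_chains(n: int, delay: float) -> List[int]:
    # Schedule every upstream first, then attach one downstream to each.
    ups = [asyncio.ensure_future(upstream(i, delay)) for i in range(n)]
    downs = [asyncio.ensure_future(downstream(u, delay)) for u in ups]
    return list(await asyncio.gather(*downs))


def timed_run(n: int, delay: float) -> Tuple[List[int], float]:
    start = time.perf_counter()
    results = asyncio.run(run_chains(n, delay))
    return results, time.perf_counter() - start
```

With n=10 and delay=0.05, the ten chains typically finish in about two delays of wall-clock time, because no chain waits for another chain's upstream result.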
Python 3.10 is supported by Ray since v2.0.0.
It would be nice to also support it in prefect-ray.
This is a follow-up issue to this PR: this QA comment shows that, to the best of my knowledge, it still doesn't work for Ray.
LMK if you need more info or help with QA
This issue is a blocker for LiveEO usage of RayTaskRunner
cc @madkinsz
Current Behaviour
Our infrastructure setup provides the Ray cluster address in the environment variable RAY_ADDRESS. Currently, prefect_ray does not respect this variable: it treats the run as local and logs that it is creating a local Ray instance. However, ray.init() does recognize the variable and tries to connect to the existing cluster.
In our setup, this leads to an error when a terminated cluster has to be restarted: prefect_ray does not wait for the restart to finish before starting task execution.
[~]$ export RAY_ADDRESS=anyscale://prefect-flow-template_cluster
[~]$ python example.py ray
ray_address='anyscale://prefect-flow-template_cluster'
11:43:53.872 | INFO | prefect.engine - Created flow run 'nondescript-urchin' for flow 'flow'
11:43:53.874 | INFO | prefect.task_runner.ray - Creating a local Ray instance
2022-12-12 11:43:53,874 INFO worker.py:1230 -- Using address anyscale://prefect-flow-template_cluster set in the environment variable RAY_ADDRESS
Authenticating
Loaded Anyscale authentication token from ~/.anyscale/credentials.json.
[...]
Waiting for cluster prefect-flow-template_cluster to start. This may take a few minutes
14:24:33.357 | ERROR | Flow run 'prompt-anaconda' - Crash detected! Execution was interrupted by an unexpected exception: click.exceptions.ClickException: Error while starting cluster prefect-flow-template_cluster: KeyError: 'auth'
14:24:33.624 | ERROR | prefect.engine - Engine execution of flow run '83baaa1a-643c-4e57-9ed9-e41b02ed4f6e' exited with unexpected exception
Desired Behaviour
Support RAY_ADDRESS as described in the Ray documentation:
You can also define an environment variable called RAY_ADDRESS in the same format as the address parameter to connect to an existing cluster with ray.init() or ray.init(address="auto").
https://docs.ray.io/en/latest/ray-core/package-ref.html#ray-init
Proposed Solution
Add a check for whether the RAY_ADDRESS environment variable is set at https://github.com/PrefectHQ/prefect-ray/blob/main/prefect_ray/task_runners.py#L206.
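A minimal sketch of that check, assuming the runner should prefer an explicit address argument, then fall back to RAY_ADDRESS, and only create a local instance when neither is set. resolve_ray_address is a hypothetical helper, not an existing prefect-ray function.

```python
import os
from typing import Mapping, Optional


def resolve_ray_address(
    explicit: Optional[str],
    environ: Mapping[str, str] = os.environ,
) -> Optional[str]:
    """Pick the Ray address a task runner should connect to.

    Order: an explicit address argument, then the RAY_ADDRESS environment
    variable. None means "start a local Ray instance".
    """
    if explicit:
        return explicit
    # Treat an empty RAY_ADDRESS the same as an unset one.
    return environ.get("RAY_ADDRESS") or None
```

With RAY_ADDRESS=anyscale://prefect-flow-template_cluster and no explicit address, the helper returns the Anyscale address, so a runner using it could log that it is connecting to an existing cluster instead of "Creating a local Ray instance".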
Flow & Task Code (example.py)
import os
import sys
from prefect import flow, task
from prefect.task_runners import SequentialTaskRunner
from prefect_ray import RayTaskRunner
@task
def task():
    print("task execution")
@flow(log_prints=True)
def flow():
    print("flow execution start")
    task.submit()
    print("flow execution end")
def main(argv):
    if (argv[1] == "sequential"):
        flow.with_options(task_runner=SequentialTaskRunner())()
    elif (str(argv[1]) == "ray"):
        ray_address = str(os.environ.get("RAY_ADDRESS"))
        print(f"{ray_address=}")
        flow.with_options(task_runner=RayTaskRunner())()
if __name__ == "__main__":  # noqa
    main(sys.argv)
We were wondering how to specify that a Prefect flow (or even individual tasks) should only run on instances that meet given resource requirements. What is the annotation to manage this at the flow/task level?
Basically, we would like the equivalent of Ray's @ray.remote(num_gpus=1) annotation.
Scenario: we have a Ray cluster up and running with CPU and GPU worker nodes. We would like, for example, to execute a flow only on the GPU nodes and have those nodes scale out accordingly.
We experimented with num_gpus, request_cpus, etc., but have not had success yet.
Here is some example code:
from prefect import task, flow
from prefect_ray.task_runners import RayTaskRunner
import time
RayRunner = RayTaskRunner(
    address="auto",
    init_kwargs={
        # note: ray.init() rejects num_gpus when connecting to an existing
        # cluster; it only sizes a newly started local instance
        "num_gpus": 2,
    }
)
# pass the configured RayRunner instance, not the RayTaskRunner class
@flow(name="time-wait-flow", task_runner=RayRunner)
def time_wait_flow_gpu():
    factor = 50
    times = [5.0, 16.3, 25.5]
    for i in range(0, factor):
        for time_in_secs in times:
            wait_time.submit(time_in_secs)
@task
def wait_time(time_in_secs):
    time.sleep(time_in_secs)
if __name__ == "__main__":
    time_wait_flow_gpu()
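For context on the question above: Ray places work by matching logical resource requests against what each node advertises. A task declared with @ray.remote(num_gpus=1), or given .options(num_gpus=1), can only run on a node that advertises at least one GPU, and the autoscaler launches such a node if the cluster config defines a node type that can satisfy the request. The toy function below models only that matching rule; it is not Ray's scheduler and not a prefect-ray API, and the node names are made up.

```python
from typing import List, Mapping


def feasible_nodes(
    nodes: Mapping[str, Mapping[str, float]],
    request: Mapping[str, float],
) -> List[str]:
    """Return the names of nodes whose advertised resources cover request.

    A resource a node does not advertise counts as 0, so a CPU-only node
    never satisfies a request for {"GPU": 1}.
    """
    return [
        name
        for name, available in nodes.items()
        if all(available.get(res, 0.0) >= amount for res, amount in request.items())
    ]
```

Whether a Prefect task submitted through RayTaskRunner can carry such a request is exactly what this issue asks.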
Seeing the following error with Prefect 2:
python flow.py
Traceback (most recent call last):
File "flow.py", line 8, in <module>
from prefect_ray.task_runners import RayTaskRunner
File "/home/ubuntu/miniconda3/envs/prefect/lib/python3.8/site-packages/prefect_ray/__init__.py", line 6, in <module>
from .task_runners import RayTaskRunner # noqa
File "/home/ubuntu/miniconda3/envs/prefect/lib/python3.8/site-packages/prefect_ray/task_runners.py", line 58, in <module>
from prefect.utilities.asyncio import A, sync_compatible
ModuleNotFoundError: No module named 'prefect.utilities.asyncio'
Repro
python3 -m venv venv
./venv/bin/python -m pip install --upgrade "prefect>=2.0b" "prefect-ray"
./venv/bin/python -c "from prefect_ray.task_runners import RayTaskRunner"
Traceback (most recent call last):
File "<string>", line 1, in <module>
File "/home/ubuntu/workspace/prefect/prefect-master/flows/molecule_collection/venv/lib/python3.8/site-packages/prefect_ray/__init__.py", line 6, in <module>
from .task_runners import RayTaskRunner # noqa
File "/home/ubuntu/workspace/prefect/prefect-master/flows/molecule_collection/venv/lib/python3.8/site-packages/prefect_ray/task_runners.py", line 58, in <module>
from prefect.utilities.asyncio import A, sync_compatible
ModuleNotFoundError: No module named 'prefect.utilities.asyncio'
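Later tracebacks on this page load helpers from prefect/utilities/asyncutils.py, which suggests the module was renamed between Prefect releases; the usual remedy is installing a prefect-ray release that matches the installed Prefect. If code must tolerate both layouts, one sketch is to try candidate module names in order. import_first is a hypothetical helper, and whether every name prefect_ray needs exists in asyncutils is an assumption to verify against the installed Prefect.

```python
import importlib
from types import ModuleType
from typing import List, Sequence


def import_first(candidates: Sequence[str]) -> ModuleType:
    """Import and return the first module in candidates that exists."""
    errors: List[str] = []
    for name in candidates:
        try:
            return importlib.import_module(name)
        except ModuleNotFoundError as exc:
            # Note: this also catches a missing dependency *inside* a candidate.
            errors.append(f"{name}: {exc}")
    raise ModuleNotFoundError("no candidate module could be imported: " + "; ".join(errors))


# Hypothetical usage, assuming the newer module name:
#   asyncutils = import_first(["prefect.utilities.asyncutils", "prefect.utilities.asyncio"])
#   sync_compatible = asyncutils.sync_compatible
```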
Prefect 2.0 supports subflows, which is a great new addition. However, there is a bug when nesting flows that each use RayTaskRunner(): the run crashes, complaining that ray.init() was called twice.
It would be nice if this use case were supported, since it would allow combining different parallel flows.
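The RuntimeError in this report suggests passing ignore_reinit_error=True or calling ray.shutdown() first, and Ray also offers ray.is_initialized() to guard a second init. A further option, sketched here under the assumption that nested runners should share one runtime, is reference counting: the first runner to start initializes, and only the last one to finish shuts down. SharedRuntime is hypothetical; init_fn and shutdown_fn are injected so the sketch runs without Ray.

```python
import threading
from typing import Callable


class SharedRuntime:
    """Reference-counted wrapper so nested task runners share one runtime."""

    def __init__(self, init_fn: Callable[[], None], shutdown_fn: Callable[[], None]) -> None:
        self._init_fn = init_fn
        self._shutdown_fn = shutdown_fn
        self._users = 0
        self._lock = threading.Lock()

    def acquire(self) -> bool:
        """Start the runtime on first use; return True if this caller started it."""
        with self._lock:
            started = self._users == 0
            if started:
                self._init_fn()  # if this raises, the count is left unchanged
            self._users += 1
            return started

    def release(self) -> None:
        """Drop one user; shut the runtime down when the last user leaves."""
        with self._lock:
            if self._users == 0:
                raise RuntimeError("release() called more times than acquire()")
            self._users -= 1
            if self._users == 0:
                self._shutdown_fn()
```

With Ray, init_fn could be a lambda calling ray.init(...) with the runner's kwargs and shutdown_fn could be ray.shutdown; the parent flow's runner would then own the cluster and the subflow's runner would reuse it.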
To reproduce, use:
from prefect import task, flow
from prefect_ray import RayTaskRunner
@task
def some_task(input):
    return input
@flow(task_runner=RayTaskRunner())
def my_subflow():
    some_task.submit(1)
    some_task.submit(2)
    some_task.submit(3)
@flow(task_runner=RayTaskRunner())
def some_flow():
    my_subflow()
    some_task.submit(4)
some_flow()
The error is:
09:15:38.789 | INFO | prefect.engine - Created flow run 'terrestrial-bird' for flow 'some-flow'
09:15:38.789 | INFO | prefect.task_runner.ray - Creating a local Ray instance
2022-08-17 09:15:40,455 INFO services.py:1470 -- View the Ray dashboard at http://127.0.0.1:8265
09:15:42.168 | INFO | prefect.task_runner.ray - Using Ray cluster with 1 nodes.
09:15:42.168 | INFO | prefect.task_runner.ray - The Ray UI is available at 127.0.0.1:8265
09:15:42.355 | INFO | Flow run 'terrestrial-bird' - Created subflow run 'small-marten' for flow 'my-subflow'
09:15:42.356 | INFO | prefect.task_runner.ray - Creating a local Ray instance
09:15:42.357 | ERROR | Flow run 'small-marten' - Crash detected! Execution was interrupted by an unexpected exception.
09:15:42.390 | ERROR | Flow run 'terrestrial-bird' - Encountered exception during execution:
Traceback (most recent call last):
File "/home/robbert/.virtualenvs/altoida/lib/python3.8/site-packages/prefect/engine.py", line 557, in orchestrate_flow_run
result = await run_sync(flow_call)
File "/home/robbert/.virtualenvs/altoida/lib/python3.8/site-packages/prefect/utilities/asyncutils.py", line 56, in run_sync_in_worker_thread
return await anyio.to_thread.run_sync(call, cancellable=True)
File "/home/robbert/.virtualenvs/altoida/lib/python3.8/site-packages/anyio/to_thread.py", line 31, in run_sync
return await get_asynclib().run_sync_in_worker_thread(
File "/home/robbert/.virtualenvs/altoida/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 937, in run_sync_in_worker_thread
return await future
File "/home/robbert/.virtualenvs/altoida/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 867, in run
result = context.run(func, *args)
File "/home/robbert/programming/python/altoida/altoida_ml/altoida_ml/robbert/snippits/prefect_bugs/ray_run_manager.py", line 25, in some_flow
my_subflow()
File "/home/robbert/.virtualenvs/altoida/lib/python3.8/site-packages/prefect/flows.py", line 390, in __call__
return enter_flow_run_engine_from_flow_call(
File "/home/robbert/.virtualenvs/altoida/lib/python3.8/site-packages/prefect/engine.py", line 156, in enter_flow_run_engine_from_flow_call
return run_async_from_worker_thread(begin_run)
File "/home/robbert/.virtualenvs/altoida/lib/python3.8/site-packages/prefect/utilities/asyncutils.py", line 136, in run_async_from_worker_thread
return anyio.from_thread.run(call)
File "/home/robbert/.virtualenvs/altoida/lib/python3.8/site-packages/anyio/from_thread.py", line 49, in run
return asynclib.run_async_from_thread(func, *args)
File "/home/robbert/.virtualenvs/altoida/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 970, in run_async_from_thread
return f.result()
File "/usr/lib/python3.8/concurrent/futures/_base.py", line 444, in result
return self.__get_result()
File "/usr/lib/python3.8/concurrent/futures/_base.py", line 389, in __get_result
raise self._exception
File "/home/robbert/.virtualenvs/altoida/lib/python3.8/site-packages/prefect/client.py", line 104, in with_injected_client
return await fn(*args, **kwargs)
File "/home/robbert/.virtualenvs/altoida/lib/python3.8/site-packages/prefect/engine.py", line 450, in create_and_begin_subflow_run
task_runner = await stack.enter_async_context(flow.task_runner.start())
File "/usr/lib/python3.8/contextlib.py", line 568, in enter_async_context
result = await _cm_type.__aenter__(cm)
File "/usr/lib/python3.8/contextlib.py", line 171, in __aenter__
return await self.gen.__anext__()
File "/home/robbert/.virtualenvs/altoida/lib/python3.8/site-packages/prefect/task_runners.py", line 168, in start
await self._start(exit_stack)
File "/home/robbert/.virtualenvs/altoida/lib/python3.8/site-packages/prefect_ray/task_runners.py", line 195, in _start
context = ray.init(*init_args, **self.init_kwargs)
File "/home/robbert/.virtualenvs/altoida/lib/python3.8/site-packages/ray/_private/client_mode_hook.py", line 105, in wrapper
return func(*args, **kwargs)
File "/home/robbert/.virtualenvs/altoida/lib/python3.8/site-packages/ray/worker.py", line 977, in init
raise RuntimeError(
RuntimeError: Maybe you called ray.init twice by accident? This error can be suppressed by passing in 'ignore_reinit_error=True' or by calling 'ray.shutdown()' prior to 'ray.init()'.
09:15:45.181 | ERROR | Flow run 'terrestrial-bird' - Finished in state Failed('Flow run encountered an exception.')
Traceback (most recent call last):
File "/home/robbert/programming/python/altoida/altoida_ml/altoida_ml/robbert/snippits/prefect_bugs/ray_run_manager.py", line 29, in <module>
some_flow()
File "/home/robbert/.virtualenvs/altoida/lib/python3.8/site-packages/prefect/flows.py", line 390, in __call__
return enter_flow_run_engine_from_flow_call(
File "/home/robbert/.virtualenvs/altoida/lib/python3.8/site-packages/prefect/engine.py", line 152, in enter_flow_run_engine_from_flow_call
return anyio.run(begin_run)
File "/home/robbert/.virtualenvs/altoida/lib/python3.8/site-packages/anyio/_core/_eventloop.py", line 70, in run
return asynclib.run(func, *args, **backend_options)
File "/home/robbert/.virtualenvs/altoida/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 292, in run
return native_run(wrapper(), debug=debug)
File "/usr/lib/python3.8/asyncio/runners.py", line 44, in run
return loop.run_until_complete(main)
File "/usr/lib/python3.8/asyncio/base_events.py", line 616, in run_until_complete
return future.result()
File "/home/robbert/.virtualenvs/altoida/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 287, in wrapper
return await func(*args)
File "/home/robbert/.virtualenvs/altoida/lib/python3.8/site-packages/prefect/client.py", line 104, in with_injected_client
return await fn(*args, **kwargs)
File "/home/robbert/.virtualenvs/altoida/lib/python3.8/site-packages/prefect/engine.py", line 228, in create_then_begin_flow_run
return state.result()
File "/home/robbert/.virtualenvs/altoida/lib/python3.8/site-packages/prefect/orion/schemas/states.py", line 145, in result
raise data
File "/home/robbert/.virtualenvs/altoida/lib/python3.8/site-packages/prefect/engine.py", line 557, in orchestrate_flow_run
result = await run_sync(flow_call)
File "/home/robbert/.virtualenvs/altoida/lib/python3.8/site-packages/prefect/utilities/asyncutils.py", line 56, in run_sync_in_worker_thread
return await anyio.to_thread.run_sync(call, cancellable=True)
File "/home/robbert/.virtualenvs/altoida/lib/python3.8/site-packages/anyio/to_thread.py", line 31, in run_sync
return await get_asynclib().run_sync_in_worker_thread(
File "/home/robbert/.virtualenvs/altoida/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 937, in run_sync_in_worker_thread
return await future
File "/home/robbert/.virtualenvs/altoida/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 867, in run
result = context.run(func, *args)
File "/home/robbert/programming/python/altoida/altoida_ml/altoida_ml/robbert/snippits/prefect_bugs/ray_run_manager.py", line 25, in some_flow
my_subflow()
File "/home/robbert/.virtualenvs/altoida/lib/python3.8/site-packages/prefect/flows.py", line 390, in __call__
return enter_flow_run_engine_from_flow_call(
File "/home/robbert/.virtualenvs/altoida/lib/python3.8/site-packages/prefect/engine.py", line 156, in enter_flow_run_engine_from_flow_call
return run_async_from_worker_thread(begin_run)
File "/home/robbert/.virtualenvs/altoida/lib/python3.8/site-packages/prefect/utilities/asyncutils.py", line 136, in run_async_from_worker_thread
return anyio.from_thread.run(call)
File "/home/robbert/.virtualenvs/altoida/lib/python3.8/site-packages/anyio/from_thread.py", line 49, in run
return asynclib.run_async_from_thread(func, *args)
File "/home/robbert/.virtualenvs/altoida/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 970, in run_async_from_thread
return f.result()
File "/usr/lib/python3.8/concurrent/futures/_base.py", line 444, in result
return self.__get_result()
File "/usr/lib/python3.8/concurrent/futures/_base.py", line 389, in __get_result
raise self._exception
File "/home/robbert/.virtualenvs/altoida/lib/python3.8/site-packages/prefect/client.py", line 104, in with_injected_client
return await fn(*args, **kwargs)
File "/home/robbert/.virtualenvs/altoida/lib/python3.8/site-packages/prefect/engine.py", line 450, in create_and_begin_subflow_run
task_runner = await stack.enter_async_context(flow.task_runner.start())
File "/usr/lib/python3.8/contextlib.py", line 568, in enter_async_context
result = await _cm_type.__aenter__(cm)
File "/usr/lib/python3.8/contextlib.py", line 171, in __aenter__
return await self.gen.__anext__()
File "/home/robbert/.virtualenvs/altoida/lib/python3.8/site-packages/prefect/task_runners.py", line 168, in start
await self._start(exit_stack)
File "/home/robbert/.virtualenvs/altoida/lib/python3.8/site-packages/prefect_ray/task_runners.py", line 195, in _start
context = ray.init(*init_args, **self.init_kwargs)
File "/home/robbert/.virtualenvs/altoida/lib/python3.8/site-packages/ray/_private/client_mode_hook.py", line 105, in wrapper
return func(*args, **kwargs)
File "/home/robbert/.virtualenvs/altoida/lib/python3.8/site-packages/ray/worker.py", line 977, in init
raise RuntimeError(
RuntimeError: Maybe you called ray.init twice by accident? This error can be suppressed by passing in 'ignore_reinit_error=True' or by calling 'ray.shutdown()' prior to 'ray.init()'.
I solved the problem in my private package by using a modified RayTaskRunner. In particular, I modified the _start function to begin with:
if self.address:
    self.logger.info(
        f"Connecting to an existing Ray instance at {self.address}"
    )
    init_args = (self.address,)
elif not ray.is_initialized():
    self.logger.info("Creating a local Ray instance")
    init_args = ()
Note the ray.is_initialized() call.
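As posted, the fragment leaves init_args unset when Ray is already running and no address is given, so presumably the rest of the modified method also skips ray.init in that case. The error message itself names two alternatives: passing ignore_reinit_error=True, or calling ray.shutdown() before ray.init(). Below is a minimal, testable sketch of the decision logic, with the "is Ray running?" check passed in as a callable so it can run without Ray installed. The helper name choose_ray_init_args is hypothetical and is not part of prefect-ray.

```python
from typing import Callable, Optional, Tuple


def choose_ray_init_args(
    address: Optional[str], is_initialized: Callable[[], bool]
) -> Optional[Tuple[str, ...]]:
    """Decide how (or whether) to call ray.init.

    Returns the positional args for ray.init, or None when an already running
    Ray instance should be reused and ray.init must be skipped entirely.
    """
    if address:
        # Explicit address: connect to that cluster, as the stock runner does.
        return (address,)
    if is_initialized():
        # A parent flow (or the user) already started Ray in this process,
        # e.g. when a subflow also uses RayTaskRunner.
        return None
    # No address and no running instance: start a local Ray instance.
    return ()
```

Inside the runner this would be used as init_args = choose_ray_init_args(self.address, ray.is_initialized), followed by ray.init(*init_args, **self.init_kwargs) only when init_args is not None. A reasonable design choice (an assumption, not something the original states) is that whoever started Ray also owns shutting it down, so a subflow that reused the instance should not call ray.shutdown() on exit.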
from prefect import flow, task
from prefect_ray.task_runners import RayTaskRunner


@task
def count(number):
    with open(f"{number}.txt", "w") as f:
        f.write(str(number))
    print(number)


@flow(task_runner=RayTaskRunner())
def count_to(highest_number):
    for number in range(1, highest_number + 1):
        count.with_options(name=f"{number}").submit(number)


if __name__ == "__main__":
    count_to(3)
In the output below, task run 2-464db843-1 is never reported as completed; only runs 1 and 3 are.
13:34:57.620 | INFO | prefect.engine - Created flow run 'adept-jackrabbit' for flow 'count-to'
13:34:57.620 | INFO | Flow run 'adept-jackrabbit' - Using task runner 'RayTaskRunner'
13:34:57.620 | INFO | prefect.task_runner.ray - Creating a local Ray instance
13:35:02.659 | INFO | prefect.task_runner.ray - Using Ray cluster with 1 nodes.
13:35:02.671 | WARNING | Flow run 'adept-jackrabbit' - No default storage is configured on the server. Results from this flow run will be stored in a temporary directory in its runtime environment.
13:35:02.712 | INFO | Flow run 'adept-jackrabbit' - Created task run '1-464db843-0' for task '1'
13:35:02.789 | INFO | Flow run 'adept-jackrabbit' - Created task run '2-464db843-1' for task '2'
13:35:02.903 | INFO | Flow run 'adept-jackrabbit' - Created task run '3-464db843-2' for task '3'
(begin_task_run pid=97109) 1
(begin_task_run pid=97108) 3
(begin_task_run pid=97109) 13:35:09.111 | INFO | Task run '1-464db843-0' - Finished in state Completed()
(begin_task_run pid=97108) 13:35:09.097 | INFO | Task run '3-464db843-2' - Finished in state Completed()
(begin_task_run pid=97107) 2
13:35:12.537 | INFO | Flow run 'adept-jackrabbit' - Finished in state Completed('All states completed.')
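For longer runs, one way to spot task runs that were created but never logged a final state is to scan the log. The sketch below assumes exactly the line formats shown above ("Created task run '...'" and "Task run '...' - Finished in state ..."); the function name is hypothetical. A missing log line does not by itself mean the task failed: here task 2 printed 2, so it evidently ran, and only its state message is missing.

```python
import re
from typing import Iterable, List

# Patterns match the Prefect log lines shown in the output above.
CREATED = re.compile(r"Created task run '([^']+)'")
FINISHED = re.compile(r"Task run '([^']+)' - Finished in state")


def runs_without_final_state(log_lines: Iterable[str]) -> List[str]:
    """Return task runs that were created but never logged 'Finished in state'."""
    created: List[str] = []
    finished = set()
    for line in log_lines:
        match = CREATED.search(line)
        if match:
            created.append(match.group(1))
            continue
        match = FINISHED.search(line)
        if match:
            finished.add(match.group(1))
    # Preserve creation order so the report reads like the log.
    return [run for run in created if run not in finished]
```

Given an open log file f, runs_without_final_state(f) applied to the output above would report ['2-464db843-1'].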
Add cruft to the repo to allow synchronization of this collection with the original template.
Cruft can be added by running cruft link. Note that a starting commit will need to be specified.
Using the commit of the prefect-collection-template closest to the generation date of this repo is a good default.
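One way to find that commit is to list the template's history with git log --format="%H %cI" (full hash plus strict ISO 8601 committer date) and pick the newest commit made on or before the date this repo was generated. Reading "closest" as "newest commit not after the generation date" is an assumption, as are the helper names below. The generation date must be timezone-aware so it compares cleanly with the parsed commit dates. The resulting SHA is then supplied to cruft link as the starting commit; check cruft link --help for how your cruft version accepts it.

```python
from datetime import datetime
from typing import Iterable, List, Optional, Tuple


def parse_git_log(text: str) -> List[Tuple[str, datetime]]:
    """Parse `git log --format="%H %cI"` output into (sha, commit time) pairs."""
    commits = []
    for line in text.splitlines():
        if line.strip():
            sha, iso_date = line.split()
            # %cI produces offsets like +02:00, which fromisoformat accepts.
            commits.append((sha, datetime.fromisoformat(iso_date)))
    return commits


def pick_template_commit(
    commits: Iterable[Tuple[str, datetime]], generated_at: datetime
) -> Optional[str]:
    """Return the newest commit made on or before generated_at, or None."""
    eligible = [(when, sha) for sha, when in commits if when <= generated_at]
    return max(eligible)[1] if eligible else None
```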
09:59:53.784 | INFO | prefect.engine - Created flow run 'glittering-bull' for flow 'greetings'
09:59:53.784 | INFO | prefect.task_runner.ray - Connecting to an existing Ray instance at ray://127.0.0.1:10001
09:59:57.618 | INFO | prefect.task_runner.ray - Using Ray cluster with 2 nodes.
09:59:57.618 | INFO | prefect.task_runner.ray - The Ray UI is available at 10.42.1.9:8265
09:59:57.890 | INFO | Flow run 'glittering-bull' - Created task run 'say_hello-811087cd-0' for task 'say_hello'
09:59:59.292 | INFO | Flow run 'glittering-bull' - Submitted task run 'say_hello-811087cd-0' for execution.
09:59:59.318 | INFO | Flow run 'glittering-bull' - Created task run 'say_hello-811087cd-2' for task 'say_hello'
09:59:59.326 | INFO | Flow run 'glittering-bull' - Submitted task run 'say_hello-811087cd-2' for execution.
09:59:59.409 | INFO | Flow run 'glittering-bull' - Created task run 'say_hello-811087cd-1' for task 'say_hello'
09:59:59.421 | INFO | Flow run 'glittering-bull' - Submitted task run 'say_hello-811087cd-1' for execution.
09:59:59.462 | INFO | Flow run 'glittering-bull' - Created task run 'say_goodbye-261e56a8-0' for task 'say_goodbye'
09:59:59.476 | INFO | Flow run 'glittering-bull' - Submitted task run 'say_goodbye-261e56a8-0' for execution.
09:59:59.527 | INFO | Flow run 'glittering-bull' - Created task run 'say_goodbye-261e56a8-2' for task 'say_goodbye'
09:59:59.539 | INFO | Flow run 'glittering-bull' - Submitted task run 'say_goodbye-261e56a8-2' for execution.
09:59:59.623 | INFO | Flow run 'glittering-bull' - Created task run 'say_goodbye-261e56a8-1' for task 'say_goodbye'
09:59:59.636 | INFO | Flow run 'glittering-bull' - Submitted task run 'say_goodbye-261e56a8-1' for execution.
09:59:59.733 | INFO | Flow run 'glittering-bull' - Created task run 'say_goodbye-261e56a8-3' for task 'say_goodbye'
09:59:59.745 | INFO | Flow run 'glittering-bull' - Submitted task run 'say_goodbye-261e56a8-3' for execution.
09:59:59.839 | INFO | Flow run 'glittering-bull' - Created task run 'say_hello-811087cd-3' for task 'say_hello'
09:59:59.850 | INFO | Flow run 'glittering-bull' - Submitted task run 'say_hello-811087cd-3' for execution.
Failed to unpickle serialized exception
Failed to unpickle serialized exception
Failed to unpickle serialized exception
Failed to unpickle serialized exception
Failed to unpickle serialized exception
Failed to unpickle serialized exception
Failed to unpickle serialized exception
Failed to unpickle serialized exception
10:00:05.415 | INFO | Task run 'say_hello-811087cd-0' - Crash detected! Execution was interrupted by an unexpected exception.
10:00:05.443 | INFO | Task run 'say_goodbye-261e56a8-0' - Crash detected! Execution was interrupted by an unexpected exception.
10:00:05.469 | INFO | Task run 'say_hello-811087cd-1' - Crash detected! Execution was interrupted by an unexpected exception.
10:00:05.500 | INFO | Task run 'say_goodbye-261e56a8-1' - Crash detected! Execution was interrupted by an unexpected exception.
10:00:05.528 | INFO | Task run 'say_hello-811087cd-2' - Crash detected! Execution was interrupted by an unexpected exception.
10:00:05.554 | INFO | Task run 'say_goodbye-261e56a8-2' - Crash detected! Execution was interrupted by an unexpected exception.
10:00:05.579 | INFO | Task run 'say_hello-811087cd-3' - Crash detected! Execution was interrupted by an unexpected exception.
10:00:05.607 | INFO | Task run 'say_goodbye-261e56a8-3' - Crash detected! Execution was interrupted by an unexpected exception.
10:00:06.767 | ERROR | Flow run 'glittering-bull' - Finished in state Failed('8/8 states failed.')
Traceback (most recent call last):
File "/usr/lib/python3.9/runpy.py", line 197, in _run_module_as_main
return _run_code(code, main_globals, None,
File "/usr/lib/python3.9/runpy.py", line 87, in _run_code
exec(code, run_globals)
File "/home/compute/code/orion-demo/flows/ray_flow.py", line 49, in <module>
greetings(["arthur", "trillian", "ford", "marvin"])
File "/home/compute/code/orion-demo/.venv/lib/python3.9/site-packages/prefect/flows.py", line 384, in __call__
return enter_flow_run_engine_from_flow_call(
File "/home/compute/code/orion-demo/.venv/lib/python3.9/site-packages/prefect/engine.py", line 158, in enter_flow_run_engine_from_flow_call
return anyio.run(begin_run)
File "/home/compute/code/orion-demo/.venv/lib/python3.9/site-packages/anyio/_core/_eventloop.py", line 70, in run
return asynclib.run(func, *args, **backend_options)
File "/home/compute/code/orion-demo/.venv/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 292, in run
return native_run(wrapper(), debug=debug)
File "/usr/lib/python3.9/asyncio/runners.py", line 44, in run
return loop.run_until_complete(main)
File "/usr/lib/python3.9/asyncio/base_events.py", line 647, in run_until_complete
return future.result()
File "/home/compute/code/orion-demo/.venv/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 287, in wrapper
return await func(*args)
File "/home/compute/code/orion-demo/.venv/lib/python3.9/site-packages/prefect/client.py", line 103, in with_injected_client
return await fn(*args, **kwargs)
File "/home/compute/code/orion-demo/.venv/lib/python3.9/site-packages/prefect/engine.py", line 238, in create_then_begin_flow_run
return state.result()
File "/home/compute/code/orion-demo/.venv/lib/python3.9/site-packages/prefect/orion/schemas/states.py", line 157, in result
state.result()
File "/home/compute/code/orion-demo/.venv/lib/python3.9/site-packages/prefect/orion/schemas/states.py", line 143, in result
raise data
ray.exceptions.RaySystemError: System error: Failed to unpickle serialized exception
traceback: Traceback (most recent call last):
File "/home/ray/anaconda3/lib/python3.9/site-packages/ray/exceptions.py", line 46, in from_ray_exception
return pickle.loads(ray_exception.serialized_exception)
TypeError: __init__() missing 2 required keyword-only arguments: 'request' and 'response'
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/home/ray/anaconda3/lib/python3.9/site-packages/ray/_private/serialization.py", line 352, in deserialize_objects
obj = self._deserialize_object(data, metadata, object_ref)
File "/home/ray/anaconda3/lib/python3.9/site-packages/ray/_private/serialization.py", line 264, in _deserialize_object
return RayError.from_bytes(obj)
File "/home/ray/anaconda3/lib/python3.9/site-packages/ray/exceptions.py", line 40, in from_bytes
return RayError.from_ray_exception(ray_exception)
File "/home/ray/anaconda3/lib/python3.9/site-packages/ray/exceptions.py", line 49, in from_ray_exception
raise RuntimeError(msg) from e
RuntimeError: Failed to unpickle serialized exception
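The innermost error above is a TypeError raised while Ray unpickles the exception a task raised: that exception's __init__ requires keyword-only request and response arguments. A plausible but unconfirmed culprit is an HTTP error from the Prefect client such as httpx.HTTPStatusError, whose constructor takes request and response as keyword-only arguments (the flow below ships placeholder PREFECT_API_URL and PREFECT_API_KEY values to the cluster). The standard-library sketch below reproduces the failure with a hypothetical KwOnlyError. BaseException's default pickling rebuilds an exception by calling its class with self.args only, so keyword-only arguments are lost. Overriding __reduce__, as in the hypothetical PicklableKwOnlyError, makes the round trip work.

```python
import pickle
from typing import Optional


class KwOnlyError(Exception):
    """Hypothetical exception shaped like an HTTP status error."""

    def __init__(self, message: str, *, request: str, response: str) -> None:
        super().__init__(message)  # only (message,) ends up in self.args
        self.request = request
        self.response = response


def unpickle_error(exc: BaseException) -> Optional[str]:
    """Return the TypeError message if exc cannot survive a pickle round trip."""
    data = pickle.dumps(exc)  # pickling itself succeeds
    try:
        pickle.loads(data)  # unpickling calls type(exc)(*exc.args)
    except TypeError as err:
        return str(err)
    return None


def _rebuild(cls, message, request, response):
    # Module-level so pickle can reference it by name.
    return cls(message, request=request, response=response)


class PicklableKwOnlyError(KwOnlyError):
    def __reduce__(self):
        # Pass the keyword-only arguments back explicitly when unpickling.
        return (_rebuild, (type(self), self.args[0], self.request, self.response))
```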
import sys
from typing import List

from prefect import flow, get_run_logger, task
from prefect_ray.task_runners import RayTaskRunner


@task
def say_hello(name: str) -> None:
    print(f"say_hello {name}", file=sys.stderr)
    logger = get_run_logger()
    logger.info(f"hello {name}")


@task
def say_goodbye(name: str) -> None:
    # logs not currently working see https://github.com/PrefectHQ/prefect-ray/issues/25
    logger = get_run_logger()
    print("print goodbye")
    logger.info(f"goodbye {name}")


# run on an existing ray cluster
@flow(
    task_runner=RayTaskRunner(
        address="ray://10.97.36.20:10001",
        init_kwargs={
            "runtime_env": {
                "pip": ["prefect==2.3.2"],
                "env_vars": {
                    "PREFECT_API_URL": "xxx",
                    "PREFECT_API_KEY": "yyy",
                },
            }
        },
    )
)
def greetings(names: List[str]) -> None:
    for name in names:
        # tasks must be submitted to run on ray
        # if called without .submit() they are still tracked but
        # run immediately and locally rather than async on ray
        say_hello.submit(name)
        say_goodbye.submit(name)


if __name__ == "__main__":
    greetings(["arthur", "trillian", "ford", "marvin"])
prefect 2.3.2
ray 2.0.0
I ran into this issue during local testing with the latest versions of prefect and prefect-ray, and Ray 2.8.0.
I can provide an example to show it later.
Hi folks, we're using Prefect and Ray to orchestrate distributed workflows. When we create a future with x = task_1.submit() and pass that future as an input to another task, y = task_2.submit(x=x), Ray spins up a worker for task_2 with the requested resources, and this worker then waits on x.result().
This is obviously undesirable! If we had x1 = task1(); x2 = task2(x1=x1); x3 = task3(x2=x2); ..., all the later tasks in the pipeline would waste worker resources while waiting for the earlier tasks to complete. We would like the worker for a task to launch only once the task's inputs are ready.
Is this known behavior, and does anyone know how to solve it?
Here's a working example, using "prefect==2.9.0", "prefect-ray==0.2.4", "ray[default]==2.2.0"
import time

from prefect import flow, task
from prefect_ray import RayTaskRunner
from prefect_ray.context import remote_options


@task
def task_a():
    time.sleep(100)
    return 'a'


@task
def task_b():
    return 'b'


@flow(
    task_runner=RayTaskRunner(
        address="ray://ray-cluster-kuberay-head-svc:10001"
    )
)
def multi_test():
    with remote_options(num_cpus=1, num_gpus=1):
        x = task_a.submit()
    for i in range(10):
        with remote_options(num_cpus=1, num_gpus=1):
            task_b.submit(wait_for=[x])


if __name__ == '__main__':
    print(multi_test())
When run, 11 workers are created immediately, per the image.
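Prefect and Ray specifics aside, the behavior being asked for is dependency-gated submission: a downstream task is handed to the worker pool only after its upstream futures have resolved, so it never holds a worker slot while blocked. The sketch below illustrates the idea with the standard library's concurrent.futures instead of Prefect or Ray; submit_when_ready is a hypothetical helper and not part of prefect-ray. It does not show how prefect-ray submits work internally. Note that the caller must wait on the returned futures before the executor shuts down, because the downstream submit happens from a callback.

```python
import threading
from concurrent.futures import Executor, Future, ThreadPoolExecutor
from typing import Any, Callable


def _forward(src: Future, dst: Future) -> None:
    """Copy the outcome of a finished future into another future."""
    exc = src.exception()
    if exc is not None:
        dst.set_exception(exc)
    else:
        dst.set_result(src.result())


def submit_when_ready(executor: Executor, fn: Callable[..., Any], *deps: Future) -> Future:
    """Submit fn to the executor only after every upstream future has finished.

    Upstream results are passed to fn positionally, in order. Until all deps
    are done, fn holds no worker slot; the returned future tracks its result.
    """
    outer: Future = Future()
    remaining = [len(deps)]
    lock = threading.Lock()

    def launch() -> None:
        try:
            args = [d.result() for d in deps]
        except BaseException as exc:  # upstream failed: skip fn, propagate error
            outer.set_exception(exc)
            return
        executor.submit(fn, *args).add_done_callback(lambda f: _forward(f, outer))

    def on_dep_done(_: Future) -> None:
        with lock:
            remaining[0] -= 1
            ready = remaining[0] == 0
        if ready:
            launch()

    if not deps:
        launch()
    for dep in deps:
        dep.add_done_callback(on_dep_done)
    return outer


if __name__ == "__main__":
    with ThreadPoolExecutor(max_workers=2) as pool:
        a = pool.submit(lambda: 1)
        b = submit_when_ready(pool, lambda x: x + 1, a)     # runs after a
        c = submit_when_ready(pool, lambda x, y: x * y, a, b)  # runs after a and b
        print(c.result())  # wait inside the with-block so callbacks can still submit
```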
Propagate the Prefect task name (the function name, or the name override passed to the task decorator) to the Ray task.
Unsure why this is crashing at the end. @desertaxle @madkinsz thoughts?
import time

from prefect import flow, task
from prefect_ray import RayTaskRunner


@task
def shout(number):
    time.sleep(0.5)
    print(f"#{number}")


@flow(task_runner=RayTaskRunner)
def count_to(highest_number):
    for number in range(highest_number):
        shout.submit(number)


if __name__ == "__main__":
    count_to(3)
# outputs
#0
#2
#1
(begin_task_run pid=67366) #1
(begin_task_run pid=67368) #0
(begin_task_run pid=67371) #2
---------------------------------------------------------------------------
TypeError Traceback (most recent call last)
/Users/andrew/Applications/python/prefect-dask/prefect_dask/test.ipynb Cell 2 in <cell line: 16>()
14 shout.submit(number)
16 if __name__ == "__main__":
---> 17 count_to(3)
File ~/mambaforge/envs/ray/lib/python3.9/site-packages/prefect/flows.py:367, in Flow.__call__(self, *args, **kwargs)
364 # Convert the call args/kwargs to a parameter dict
365 parameters = get_call_parameters(self.fn, args, kwargs)
--> 367 return enter_flow_run_engine_from_flow_call(
368 self, parameters, return_type="result"
369 )
File ~/mambaforge/envs/ray/lib/python3.9/site-packages/prefect/engine.py:131, in enter_flow_run_engine_from_flow_call(flow, parameters, return_type)
127 if in_async_main_thread():
128 # An event loop is already running and we must create a blocking portal to
129 # run async code from this synchronous context
130 with start_blocking_portal() as portal:
--> 131 return portal.call(begin_run)
132 else:
133 # An event loop is not running so we will create one
134 return anyio.run(begin_run)
File ~/mambaforge/envs/ray/lib/python3.9/site-packages/anyio/from_thread.py:283, in BlockingPortal.call(self, func, *args)
268 def call(
269 self,
270 func: Callable[..., Union[Coroutine[Any, Any, T_Retval], T_Retval]],
271 *args: object
272 ) -> T_Retval:
273 """
274 Call the given function in the event loop thread.
275
(...)
281
282 """
--> 283 return cast(T_Retval, self.start_task_soon(func, *args).result())
File ~/mambaforge/envs/ray/lib/python3.9/concurrent/futures/_base.py:446, in Future.result(self, timeout)
444 raise CancelledError()
445 elif self._state == FINISHED:
--> 446 return self.__get_result()
447 else:
448 raise TimeoutError()
File ~/mambaforge/envs/ray/lib/python3.9/concurrent/futures/_base.py:391, in Future.__get_result(self)
389 if self._exception:
390 try:
--> 391 raise self._exception
392 finally:
393 # Break a reference cycle with the exception in self._exception
394 self = None
File ~/mambaforge/envs/ray/lib/python3.9/site-packages/anyio/from_thread.py:219, in BlockingPortal._call_func(self, func, args, kwargs, future)
216 else:
217 future.add_done_callback(callback)
--> 219 retval = await retval
220 except self._cancelled_exc_class:
221 future.cancel()
File ~/mambaforge/envs/ray/lib/python3.9/site-packages/prefect/client.py:107, in inject_client.<locals>.with_injected_client(*args, **kwargs)
105 async with client_context as client:
106 kwargs.setdefault("client", client)
--> 107 return await fn(*args, **kwargs)
File ~/mambaforge/envs/ray/lib/python3.9/site-packages/prefect/engine.py:203, in create_then_begin_flow_run(flow, parameters, return_type, client)
199 engine_logger.info(
200 f"Flow run {flow_run.name!r} received invalid parameters and is marked as failed."
201 )
202 else:
--> 203 state = await begin_flow_run(
204 flow=flow, flow_run=flow_run, parameters=parameters, client=client
205 )
207 if return_type == "state":
208 return state
File ~/mambaforge/envs/ray/lib/python3.9/site-packages/prefect/engine.py:332, in begin_flow_run(flow, flow_run, parameters, client)
329 result_storage = TempStorageBlock()
330 flow_run_context.result_storage = result_storage
--> 332 terminal_state = await orchestrate_flow_run(
333 flow,
334 flow_run=flow_run,
335 parameters=parameters,
336 client=client,
337 partial_flow_run_context=flow_run_context,
338 )
340 # If debugging, use the more complete `repr` than the usual `str` description
341 display_state = repr(terminal_state) if PREFECT_DEBUG_MODE else str(terminal_state)
File ~/mambaforge/envs/ray/lib/python3.9/contextlib.py:670, in AsyncExitStack.__aexit__(self, *exc_details)
666 try:
667 # bare "raise exc_details[1]" replaces our carefully
668 # set-up context
669 fixed_ctx = exc_details[1].__context__
--> 670 raise exc_details[1]
...
--> 651 cb_suppress = cb(*exc_details)
652 else:
653 cb_suppress = await cb(*exc_details)
TypeError: 'dict' object is not callable
The latest version of Ray, 2.x.x, has been released for quite a while now and brings many improvements. Ray is also working on releasing the next version, 3.x.x, which will support Python 3.11 (ray-project/ray#27881).