Comments (12)

zanieb commented on May 30, 2024

I think we'll want to consider the "local persistence for tasks on remote workers" story more fundamentally. I added a couple of tickets to the tracking pull request:

  • Ensure task run results persisted to local file systems on remote workers respect relative paths
  • Investigate storing results after return from the remote worker for task runs with local file systems

from prefect-ray.

desertaxle commented on May 30, 2024

As a workaround for this issue, we are adding instructions to the prefect-ray documentation (#47) recommending that the PREFECT_LOCAL_STORAGE_PATH setting be set to a path that is available both on the Ray workers and in the flow execution environment. This is not a perfect solution, but it should unblock current use cases. We will continue working to improve results management when local storage is used in conjunction with remote workers.

ahuang11 commented on May 30, 2024

Hi, thanks for sharing this!

I was wondering if you could test this with the newest Prefect version:
pip install -U "prefect>=2.0"

Also, I suspect you need to use remote storage for Ray if a remote address is provided; see:
https://orion-docs.prefect.io/concepts/storage/

tekumara commented on May 30, 2024

This still occurs in prefect 2.3.2.

I'm trying to run directly against the cluster without a Deployment, and so there is no explicit storage involved.

I would have expected it to be possible to run the flow locally but submit tasks to the remote Ray cluster.

tekumara commented on May 30, 2024

From the stack trace above it looks like the error is occurring in the filesystems module.

Digging into this further, it looks like Prefect is persisting task run results to PREFECT_LOCAL_STORAGE_PATH, which defaults to ${PREFECT_HOME}/storage. The problem seems to be that Prefect resolves ${PREFECT_HOME} in the process running the flow (e.g. on my laptop this is /Users/tekumara/.prefect) rather than in the process running the task (e.g. on the Ray cluster this would be /home/ray/.prefect).
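To make the mismatch concrete, here is a minimal sketch that mimics the resolution order described above: PREFECT_LOCAL_STORAGE_PATH falls back to ${PREFECT_HOME}/storage, and PREFECT_HOME is assumed to fall back to ~/.prefect. The function `resolve_local_storage_path` is hypothetical (it is not Prefect's code) and the two machines are simulated with plain dicts. It shows that a path resolved in the flow process differs from the one a worker would compute for itself, and that an explicit override makes both sides agree.

```python
from pathlib import PurePosixPath
from typing import Mapping


def resolve_local_storage_path(env: Mapping[str, str]) -> PurePosixPath:
    """Hypothetical resolver mimicking the defaults described in the thread."""
    # An explicit override wins on every machine that sets it.
    if "PREFECT_LOCAL_STORAGE_PATH" in env:
        return PurePosixPath(env["PREFECT_LOCAL_STORAGE_PATH"])
    # Otherwise derive from PREFECT_HOME, assumed to default to ~/.prefect.
    home = env.get("PREFECT_HOME") or str(PurePosixPath(env["HOME"]) / ".prefect")
    return PurePosixPath(home) / "storage"


# Simulated environments for the machine running the flow and a Ray worker.
laptop_env = {"HOME": "/Users/tekumara"}
worker_env = {"HOME": "/home/ray"}

# Resolved in the flow process and shipped to the worker (the reported behaviour).
eager_path = resolve_local_storage_path(laptop_env)
# What the worker would compute from its own environment.
lazy_path = resolve_local_storage_path(worker_env)
print(eager_path)  # /Users/tekumara/.prefect/storage
print(lazy_path)   # /home/ray/.prefect/storage

# With an explicit override, both sides resolve to the same path.
override = {"PREFECT_LOCAL_STORAGE_PATH": "/tmp/prefect/storage"}
print(
    resolve_local_storage_path({**laptop_env, **override})
    == resolve_local_storage_path({**worker_env, **override})
)  # True
```

The same reasoning explains why pointing PREFECT_HOME at a directory that exists on both machines also avoids the problem: every derived default then lands under that shared root.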

tekumara commented on May 30, 2024

My current workaround is, before running the flow, to set PREFECT_LOCAL_STORAGE_PATH to a writable path inside the container running on the Ray cluster, e.g.:

export PREFECT_LOCAL_STORAGE_PATH=/tmp/prefect/storage
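Since this workaround depends on the chosen path being writable where the tasks actually run, a quick check on the cluster may save a failed flow run. The helper `is_writable_dir` below is a hypothetical sketch, not part of Prefect or prefect-ray: it creates the directory if needed and then attempts a real write by creating and deleting a temporary file.

```python
import tempfile
from pathlib import Path


def is_writable_dir(path: str) -> bool:
    """Return True if `path` is (or can be made into) a directory we can write to.

    Hypothetical helper intended to be run where results are written,
    e.g. inside a Ray task, to sanity-check a candidate storage path.
    """
    try:
        # Fails with FileExistsError if `path` exists but is not a directory,
        # or PermissionError if it cannot be created.
        Path(path).mkdir(parents=True, exist_ok=True)
        # A real write test: create a file in the directory, then delete it on close.
        with tempfile.NamedTemporaryFile(dir=path):
            pass
        return True
    except OSError:
        return False
```

Assuming `ray.init(address=...)` has already connected to the cluster, it could be run remotely with `ray.get(ray.remote(is_writable_dir).remote("/tmp/prefect/storage"))`.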

ahuang11 commented on May 30, 2024

Thanks so much for helping debug this!

I think we could do this by wrapping call.func in task_runner.submit with temporary_settings:
https://github.com/PrefectHQ/prefect/blob/b65d1366eeb89fb3593a546459859d825af8f37d/src/prefect/settings.py#L806

For example:

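```python
from typing import Awaitable, Callable
from uuid import UUID

import ray
from prefect.orion.schemas.states import State
from prefect.settings import PREFECT_LOCAL_STORAGE_PATH, temporary_settings
from prefect.task_runners import R
from prefect.utilities.asyncutils import sync_compatible


# Proposed change to RayTaskRunner.submit (method excerpt)
async def submit(
    self,
    key: UUID,
    call: Callable[..., Awaitable[State[R]]],
) -> None:
    # Intended to override the storage path on the worker while the call runs
    def _submit_call_func_wrapper(*args, **kwargs):
        with temporary_settings(updates={PREFECT_LOCAL_STORAGE_PATH: "/tmp/prefect/storage"}):
            return call.func(*args, **kwargs)

    if not self._started:
        raise RuntimeError(
            "The task runner must be started before submitting work."
        )

    call_kwargs = self._optimize_futures(call.keywords)

    # Ray does not support the submission of async functions and we must create a
    # sync entrypoint
    self._ray_refs[key] = ray.remote(sync_compatible(_submit_call_func_wrapper)).remote(
        **call_kwargs
    )
```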

This might also resolve #37, together with Michael's suggestion.

Would you be interested in contributing a PR?

pcmoritz commented on May 30, 2024

We also ran into this. Our workaround was to run export PREFECT_HOME="/tmp/prefect" on the laptop, since that path exists on both the laptop and the cluster. We should try to fix this upstream; otherwise every single user of prefect-ray will run into this problem :)

zanieb commented on May 30, 2024

I'll attempt to address this as a part of PrefectHQ/prefect#6908

pcmoritz commented on May 30, 2024

Thanks, that's awesome! I'm happy to try out a PR once it is ready; I have a setup that reproduces the problem. I'm also happy to test whether #26 (comment) fixes the problem, in case that helps you @madkinsz :)

pcmoritz commented on May 30, 2024

Btw, unfortunately the workaround from #26 (comment) is not working for me, even after some modifications to fix its obvious problems (like moving sync_compatible into the function). Maybe temporary_settings is not enough: I have a feeling these settings are pickled with cloudpickle and temporary_settings cannot override that, but I didn't dig deep enough to really understand what is going on.
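One possible contributing factor, offered as a hypothesis rather than a confirmed diagnosis: if call.func is a coroutine function, a synchronous wrapper that calls it inside a `with` block only creates a coroutine object, and the block exits before that coroutine runs. That would be consistent with needing to move sync_compatible into the wrapper, although that change alone did not fix the issue here. The sketch below uses a plain `contextvars.ContextVar` as a stand-in for a settings context (assuming temporary_settings behaves similarly); `STORAGE_PATH` and `temporary_storage_path` are hypothetical names.

```python
import asyncio
import contextvars
from contextlib import contextmanager

# Stand-in for a settings context (hypothetical; not Prefect's implementation).
STORAGE_PATH = contextvars.ContextVar("STORAGE_PATH", default="~/.prefect/storage")


@contextmanager
def temporary_storage_path(value: str):
    """Temporarily override STORAGE_PATH, restoring the old value on exit."""
    token = STORAGE_PATH.set(value)
    try:
        yield
    finally:
        STORAGE_PATH.reset(token)


async def task_body() -> str:
    # The "setting" is read only when the coroutine actually runs.
    return STORAGE_PATH.get()


def wrapper_returning_coroutine():
    # Calling an async function just creates a coroutine object;
    # the with-block exits before any of its code executes.
    with temporary_storage_path("/tmp/prefect/storage"):
        return task_body()


def wrapper_running_coroutine():
    # Running the coroutine to completion inside the with-block keeps
    # the override active while the value is read.
    with temporary_storage_path("/tmp/prefect/storage"):
        return asyncio.run(task_body())


print(asyncio.run(wrapper_returning_coroutine()))  # ~/.prefect/storage
print(wrapper_running_coroutine())                 # /tmp/prefect/storage
```

Note that contextvars never cross process boundaries, so in the Ray case the override would have to be applied inside the worker process, which is what the wrapper attempts.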

What did work for me is the following atrocious hack:

```diff
diff --git a/prefect_ray/task_runners.py b/prefect_ray/task_runners.py
index deb7a8d..fcafb74 100644
--- a/prefect_ray/task_runners.py
+++ b/prefect_ray/task_runners.py
@@ -79,6 +79,11 @@ import anyio
 import ray
 from prefect.futures import PrefectFuture
 from prefect.orion.schemas.states import State
+from prefect.settings import (PREFECT_HOME,
+                              PREFECT_PROFILES_PATH,
+                              PREFECT_LOCAL_STORAGE_PATH,
+                              PREFECT_LOGGING_SETTINGS_PATH,
+                              PREFECT_ORION_DATABASE_CONNECTION_URL)
 from prefect.states import exception_to_crashed_state
 from prefect.task_runners import BaseTaskRunner, R, TaskConcurrencyType
 from prefect.utilities.asyncutils import sync_compatible
@@ -116,6 +121,13 @@ class RayTaskRunner(BaseTaskRunner):
         address: str = None,
         init_kwargs: dict = None,
     ):
+        import pathlib
+        PREFECT_HOME.value = lambda: pathlib.Path("/tmp/prefect")
+        PREFECT_PROFILES_PATH.value = lambda: pathlib.Path("/tmp/prefect/profiles.toml")
+        PREFECT_LOCAL_STORAGE_PATH.value = lambda: pathlib.Path("/tmp/prefect/storage")
+        PREFECT_LOGGING_SETTINGS_PATH.value = lambda: pathlib.Path("/tmp/prefect/logging.yml")
+        PREFECT_ORION_DATABASE_CONNECTION_URL.value = lambda: "sqlite+aiosqlite:////tmp/prefect/orion.db"
+
         # Store settings
         self.address = address
         self.init_kwargs = init_kwargs.copy() if init_kwargs else {}
```
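The patch above overrides five path-based settings in code. A possibly less invasive variant, sketched under the assumption that Prefect reads these settings from environment variables of the same name when they are set before Prefect is imported, is to build the same values once and export them. `prefect_env_for_shared_home` is a hypothetical helper; note that the database setting is a URL string rather than a filesystem path, so it is not wrapped in `pathlib.Path` (which would collapse the `////`).

```python
from typing import Dict


def prefect_env_for_shared_home(home: str = "/tmp/prefect") -> Dict[str, str]:
    """Hypothetical helper: point every path-based setting from the patch
    above at one directory that exists on both the laptop and the Ray nodes."""
    return {
        "PREFECT_HOME": home,
        "PREFECT_PROFILES_PATH": f"{home}/profiles.toml",
        "PREFECT_LOCAL_STORAGE_PATH": f"{home}/storage",
        "PREFECT_LOGGING_SETTINGS_PATH": f"{home}/logging.yml",
        # A SQLAlchemy-style URL: "sqlite+aiosqlite:///" followed by an absolute path.
        "PREFECT_ORION_DATABASE_CONNECTION_URL": f"sqlite+aiosqlite:///{home}/orion.db",
    }
```

For example, `os.environ.update(prefect_env_for_shared_home())` at the very top of the driver script, before `import prefect`. The directory itself still has to exist (or be creatable) on the Ray nodes.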

pcmoritz commented on May 30, 2024

Any updates on this?
