
crew's People

Contributors

olivroy, shikokuchuo, wlandau, wlandau-lilly


crew's Issues

Complete redesign to prepare for async

crew needs a non-blocking way to run small meta-tasks like launching and polling cloud workers. Recent conversations with Kirill, as well as Gabor's task queue blog post, are leading me to a completely different design of crew.

A worker should not have the crew object as a field. A worker needs both a cache and a store as fields. The cache is either a local file system or an environment in memory, depending on the subclass. The store is either a local file system or a prefix on an S3 bucket, again depending on the subclass. Both the cache and the store are subclasses of class "store".

A worker keeps track of multiple states that should update asynchronously through polling and invocation (a rough sketch follows the list):

  • up: TRUE if the worker was running when last checked, FALSE otherwise.
  • input: job instructions if the worker is occupied, NULL if the worker is free (either no job assigned or all job output collected).
  • output: return value (a monad) of the job if available, NULL otherwise.
  • progress: "input_cache", "input_store", "output_store", "output_cache", or "none", depending on where the worker job data is at a given moment.
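
The following is a minimal sketch of how these fields might look on an R6 worker class, assuming R6 is the class system; the class, field, and argument names are hypothetical, not the final interface.

library(R6)

# Hypothetical sketch of the worker fields described above.
class_worker <- R6Class(
  classname = "worker",
  public = list(
    cache = NULL,      # subclass of "store": local file system or in-memory environment
    store = NULL,      # subclass of "store": local file system or an S3 prefix
    up = FALSE,        # TRUE if the worker was running when last checked
    input = NULL,      # job instructions if occupied, NULL if free
    output = NULL,     # monad with the job return value, NULL if unavailable
    progress = "none", # "input_cache", "input_store", "output_store", "output_cache", or "none"
    initialize = function(cache = NULL, store = NULL) {
      self$cache <- cache
      self$store <- store
    }
  )
)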

Simplify the launcher plugin interface

The launch_worker() method should only need to accept the call to crew_worker(), the launcher name, and the worker token. Everything else should be internal. This will be a breaking change, but it will be well worth the initial pain.
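
A hedged sketch of what the slimmed-down method could look like, using a local callr process as the backend; the argument names are assumptions based on the description above, not the actual plugin API.

# Hypothetical minimal plugin method: run the crew_worker() call on the
# target platform. Here `call` is assumed to be a text string of R code.
launch_worker <- function(call, launcher, token) {
  callr::r_bg(
    func = function(code) eval(parse(text = code)),
    args = list(code = call)
  )
}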

Test errors receiving output

f921df7 has test failures because receive() fails to delete the job output file at the end. But strangely, the previous commit passes all tests consistently. The only change is that the failing commit does not delete the job input file inside the worker loop, which should theoretically be independent of the job output file.

Common data

clustermq has a feature called "common data": objects that are part of the worker environment for all tasks. These objects need only get sent once, rather than with each new task. If the objects get assigned to .GlobalEnv on the worker (@mschubert, is this what clustermq does?), then I can implement this feature in crew without asking @shikokuchuo to implement it in mirai::server().
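
If assignment to the global environment turns out to be the mechanism, a minimal sketch of what the worker could do before entering its task loop might look like this (the helper name is hypothetical):

# Hypothetical helper: populate the worker's global environment with the
# common data objects once, so subsequent tasks can use them without resending.
crew_common_data <- function(objects) {
  list2env(as.list(objects), envir = globalenv())
}

# Example usage on the worker (object names are illustrative):
# crew_common_data(list(big_model = readRDS("model.rds"), n_draws = 1000L))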

Release

Hi, I'm trying to get this package onto conda-forge. Could a release be made on GitHub to facilitate that?

restarts

Should probably handle this at the crew level to account for worker crashes.

avoid blocking the main process

Problems/observations

  • Launching workers, sending jobs, polling workers, and receiving output may take a long time and block the main process.
  • This blocking is not so bad for a callr-based crew, especially if the scheduler does not need to send much data (think tar_target(retrieval = "worker")).
  • The number of high-performance workers in a pipeline could number in the thousands, but local computers can only support a handful of processes for local scheduling.

Proposal

Establish an outer crew with callr workers. Each outer worker runs a crew of its own. These inner crews will interface to AWS Batch, GCP, etc., where launching and polling are the most costly in terms of runtime and blocking. Because all this slow stuff happens in outer callr workers, the main process will not be blocked. And it will be straightforward to control the number of outer vs inner workers.
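
A rough sketch of the nesting idea, reusing the controller API that appears in the reprexes elsewhere in this document; a callr controller stands in for the future cloud backend, and the task body is illustrative only.

library(crew)
crew_session_start()
# Outer controller with callr workers. Each outer task runs an inner crew
# that would, in practice, launch cloud workers (AWS Batch, GCP, etc.).
outer <- crew_controller_callr(workers = 2L)
outer$start()
outer$push(
  name = "inner_crew",
  command = {
    inner <- crew::crew_controller_callr(workers = 4L) # stand-in for a cloud launcher
    inner$start()
    # ... forward jobs to `inner`, collect results, then clean up ...
    inner$terminate()
  }
)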

Difficulty

I think this is perfectly doable with nested crews. We just need #2 to make sure jobs can access inner crews.

persistent data for a worker

If a worker's purpose is to manage an inner crew, then jobs submitted from the outer crew need to be able to access the inner crew object.

internal nomenclature

What I have been calling a "queue" is really a task queue. What I have been calling a "store" is really a message queue. I think the internal naming conventions should refer to the appropriate terms "task queue" and "message queue". Prefixes "tq_" and "mq_" seem reasonable.

seeds

crew_eval() should take a seed and then call withr::local_seed() for reproducibility. The default seed should be drawn at random using the RNG state of the parent R session. The seed that was set should be returned in the crew_eval() monad and passed on to the results of the controller.
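
A minimal sketch of that seed handling, assuming the default is drawn from the parent session's RNG; the argument names and monad fields shown here are assumptions.

# Hypothetical sketch: draw a default seed from the parent session's RNG,
# set it locally for the evaluation, and record it in the returned monad.
crew_eval <- function(command, seed = sample.int(.Machine$integer.max, 1L)) {
  withr::local_seed(seed)            # restores the previous RNG state on exit
  result <- eval(command)            # `command` is assumed to be a quoted expression
  list(result = result, seed = seed) # monad fields are illustrative
}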

crew functionality

  • Cap the maximum number of workers.
  • Determine whether the crew can accept work, based on which workers are idle and how many exist relative to the cap.
  • Send a job without the user having to specify the worker: the crew should first look for an available worker using the worker statuses and launch a new one only if none is free (see the sketch after this list).
  • Receive job results without the user having to specify the worker: the crew should search through the worker statuses.
  • Shut down all workers.
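
A hedged sketch of that worker-selection logic; none of the names below are the crew API, just a plain-R illustration.

# Hypothetical sketch of sending a job without the user naming a worker:
# prefer an idle worker, otherwise launch a new one if under the cap.
send_job <- function(crew, job) {
  idle <- Filter(function(worker) worker$up && is.null(worker$input), crew$workers)
  if (length(idle) > 0L) {
    worker <- idle[[1L]]
  } else if (length(crew$workers) < crew$workers_max) {
    worker <- crew$launch_worker()
  } else {
    stop("crew cannot accept work: all workers are busy and the cap is reached")
  }
  worker$send(job)
}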

Avoid shutdown jobs

Prework

Description

My early attempts at task queues explored the idea of using a special job to shut down a worker. This shutdown method is proving to be unreliable, and it is too important to fail. The technique for shutting down workers will need to be backend-specific, and this is not always available. For batchtools futures, in the absence of HenrikBengtsson/future#93, those workers will need to be transient. But I still think batchtools futures will make a nice task queue because async can speed up the startup/polling/cleanup for large collections of futures (i.e. to work around HenrikBengtsson/future#602).

future work

Is this a good time to look at cloud backends? Maybe an AWS Batch queue with an EFS data store? Or should crew step aside for rrq?

Superfluous launches in the case of fully transient workers

Promoting #50 to an issue so it is more visible. See the reprex below. I am not sure where the problem comes from. Things may improve with shikokuchuo/mirai#38, and they may improve if we can figure out a way to use mirai's own sockets instead of custom bus sockets to detect when particular new instances of worker processes connect.

library(crew)
crew_session_start()
x <- crew_controller_callr(
  tasks_max = 1L,
  workers = 1L
)
x$start()
for (index in seq_len(100L)) {
  name <- paste0("task_", index)
  x$push(name = name, command = index, data = list(index = index))
  message(paste("push", name))
}
x$wait()
results <- NULL
while (!is.null(out <- x$pop(scale = FALSE))) {
  if (!is.null(out)) {
    results <- dplyr::bind_rows(results, out)
  }
}
length(unique(results$socket_session))
#> [1] 100
utils::stack(x$summary(-contains(c("controller", "seconds"))))
#> values              ind
#> 1  ws://10.0.0.9:54499    worker_socket
#> 2                FALSE worker_connected
#> 3                FALSE      worker_busy
#> 4                  114  worker_launches # It is not usually this high, but it is always a little over 100 (e.g. 105 in other test runs)
#> 5                  101 worker_instances
#> 6                    0   tasks_assigned
#> 7                    1   tasks_complete
#> 8                  100     popped_tasks
#> 9                    0    popped_errors
#> 10                   0  popped_warnings
x$terminate()

slowness in controller$collect()

I am testing the throughput of small tasks for persistent workers: https://github.com/wlandau/crew/blob/main/tests/performance/test-persistent-throughput.R. The tasks push really fast, but in pop() there is a distinct bottleneck at collect(), and it takes over 12 seconds to pop 200 tasks. I tracked it down to:

if (!mirai::unresolved(task$handle[[1L]])) {

If I replace this line with if (TRUE) {, then the same code runs in a fraction of a second (but gives incorrect answers).

I am trying to reproduce this bottleneck using mirai alone (version 0.7.2.9042), but all I notice is that of the thousand mirai()s I submit, only a few get assigned, and the whole thing runs very fast.

library(mirai)
daemons(n = 1L, url = "ws://127.0.0.1:5000")
#> [1] 1
px <- callr::r_bg(function() mirai::server("ws://127.0.0.1:5000/1"))
tasks <- lapply(seq_len(1e3), function(x) mirai(1))
Sys.sleep(5)
daemons()
#> $connections
#> [1] 1
#> 
#> $daemons
#>                       status_online status_busy tasks_assigned tasks_complete
#> ws://127.0.0.1:5000/1             1           0             37             37
#>                       instance #
#> ws://127.0.0.1:5000/1          1
system.time(out <- lapply(tasks, function(x) x$data))
#>    user  system elapsed 
#>   0.016   0.001   0.018
px$kill()
#> [1] TRUE
daemons(0)
#> [1] 0

Created on 2023-03-14 with reprex v2.0.2

@shikokuchuo, is there a faster way to check if a mirai is resolved? Also, in the above reprex, do you know why only 37 tasks get assigned out of the 1000 I submitted?

Long docs for branch 11

Update for the interface in branch 11. The README should describe the problem and solution in high-level prose. There should be a basic usage vignette that demonstrates the session and bg queues, and there should be a vignette for the future queue. Maybe also a design vignette for extending crew.

nuanced crash handling

Prework

  • Read and agree to the Contributor Code of Conduct and contributing guidelines.
  • If there is already a relevant issue, whether open or closed, comment on the existing thread instead of posting a new issue.
  • New features take time and effort to create, and they take even more effort to maintain. So if the purpose of the feature is to resolve a struggle you are encountering personally, please consider first posting a "trouble" or "other" issue so we can discuss your use case and search for existing solutions first.
  • Format your code according to the tidyverse style guide.

Proposal

If there is a crash, by default the queue will shut down all the workers and throw an error. But eventually there should be more options, like restarting up to some fixed number of attempts.
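
A hedged sketch of the restart option; the function, fields, and methods below are hypothetical, not part of crew.

# Hypothetical sketch: relaunch a crashed worker up to a fixed number of
# attempts, then shut everything down and throw an error.
handle_crash <- function(crew, worker, attempts_max = 3L) {
  if (worker$crashes < attempts_max) {
    worker$crashes <- worker$crashes + 1L
    crew$relaunch(worker)
  } else {
    crew$terminate() # shut down all the workers
    stop("worker crashed more than ", attempts_max, " times")
  }
}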

packages

The launcher should have a common set of packages and the package library paths so each worker can load them on launch.
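
A minimal sketch of what the worker could run at startup, assuming the launcher forwards a character vector of packages and library paths; the helper name is hypothetical.

# Hypothetical startup step on the worker: prepend the launcher's library
# paths and load the common packages before accepting tasks.
load_packages <- function(packages, library) {
  .libPaths(c(library, .libPaths()))
  lapply(packages, require, character.only = TRUE, quietly = TRUE)
  invisible(NULL)
}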

Task queues with transient workers

In addition to the stuff with {rrq} I propose in #21, crew could also support queues with completely transient workers. This might scale better in some cases, and it would make it easier to work with just the web APIs to monitor jobs. This approach would also let me warm up to using AWS Batch on a serious basis before I need to worry about the network programming that the {rrq} stuff requires. And it would make use of the work I put into the code base in the commits leading up to f6a5769.

Code coverage below par

                   filename     functions line value
558     R/crew_controller.R       collect  197     0
364 R/crew_launcher_callr.R launch_worker  107     0
367 R/crew_launcher_callr.R launch_worker  108     0
369 R/crew_launcher_callr.R launch_worker  109     0
372 R/crew_launcher_callr.R launch_worker  110     0
375 R/crew_launcher_callr.R launch_worker  111     0
378 R/crew_launcher_callr.R launch_worker  112     0
580       R/crew_launcher.R        launch  203     0

GitHub interactions are temporarily limited because the maintainer is out of office.

Vacation mode

When this issue is open, vacation mode is turned on. That means GitHub interactions are temporarily limited, so users cannot open or comment on issues or discussions until I return and re-enable them (see the return date below). When this issue is closed, vacation mode is off and interactions are possible again.

Thanks

Vacation mode helps me rest because it prevents tasks from piling up in my absence. Thank you for your patience and understanding.

Day of my return

Already returned.

Vacation mode source code

Hanging mirai dispatcher

Although I cannot reliably reproduce it, I still frequently observe that the mirai dispatcher keeps running long after daemons(0), and RStudio segfaults when I restart my R session if I forget to manually terminate the dispatcher with htop. I am concerned that crew users might encounter the same issue without knowing how many dangling R processes are accumulating behind the scenes. It would be great to find a way to manually access the dispatcher's PID so crew can terminate it directly.

Begin a major rewrite with rrq

rrq is a Redis-based task queue for R. It manages communication between the controller and the workers. More specifically, it brokers tasks, dynamically down-scales workers, and handles errors. But it does not start the Redis server, and it can only launch workers on the local machine. My goal is for crew to fill in those last two pieces. In the upcoming phase of development, crew will serve as a launcher for rrq and a future-like unifying R interface to many different backend computing environments that run the workers. I will start with traditional schedulers like SGE and SLURM, where it is standard practice to trust the local network and establish direct Redis connections between the controller and the workers. Later on (outside the scope of this first issue) I will work on a more secure communication layer between the user and the controller in cases where ports cannot be safely exposed (AWS EC2, Batch, etc.).

Worker statistics

For the controller and controller group, it would be nice to combine the output of mirai::daemons()$daemons with information about the load balancing of tasks into a single tidy data frame.
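
A hedged sketch of that data frame assembly, assuming daemons are currently connected; controller-side counters would be joined in separately.

# Hypothetical sketch: turn the daemons matrix into a data frame keyed by
# worker socket, ready to merge with controller-side task counters.
daemons_matrix <- mirai::daemons()$daemons
stats <- data.frame(
  worker_socket = rownames(daemons_matrix),
  daemons_matrix,
  row.names = NULL
)
# Then merge() in per-worker counters (e.g. popped tasks) by worker_socket.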

nested crews

Solution to #3. Let's create outer callr workers that run inner crews of cloud workers. We can accomplish this with a special subclass of class_worker_callr() where launch() has a new event loop that creates an inner crew and forwards outer jobs to it. #2 is no longer needed.
