
signac-flow's Introduction

signac-flow - manage workflows with signac


The signac framework helps users manage and scale file-based workflows, facilitating data reuse, sharing, and reproducibility.

The signac-flow tool provides the basic components to set up simple to complex workflows for projects managed by the signac framework. This includes the definition of data pipelines, the execution of data space operations, and the submission of operations to high-performance supercomputers.

Note: Check out row, the actively maintained spiritual successor to signac-flow.

Resources

Installation

The recommended installation method for signac-flow is through conda or pip. The software is tested for Python versions 3.6+ and is built for all major platforms.

To install signac-flow via the conda-forge channel, execute:

conda install -c conda-forge signac-flow

To install signac-flow via pip, execute:

pip install signac-flow

Detailed information about alternative installation methods can be found in the documentation.
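As a quick check that the installation succeeded, you can import the package (it imports as flow) and print its version:

import flow
print(flow.__version__)  # prints the installed signac-flow version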

Testing

You can test this package by executing

$ python -m pytest tests/

within the repository root directory.

Acknowledgment

When using signac as part of your work towards a publication, we would really appreciate it if you acknowledged signac appropriately. We have prepared examples of how to do that here. Thank you very much!

The signac framework is a NumFOCUS Affiliated Project.

signac-flow's People

Contributors

ac-optimus, atravitz, b-butler, bdice, berceanu, cbkerr, charlottez112, csadorf, dependabot[bot], erinvchen, iblanco11981870, javierbg, joaander, justingilmer, kidrahahjo, klywang, mattwthompson, melodyyzh, michiboo, mikemhenry, pre-commit-ci[bot], ramanishsingh, rosecers, seoulfood, shannon-moran, swerdfish, tcmoore3, vishav1771, vyasr, zhou-pj


signac-flow's Issues

API Consolidation

Original report by Carl Simon Adorf (Bitbucket: csadorf, GitHub: csadorf).


Proposal for API consolidation

The FlowProject API has significantly grown since version 0.3 and warrants consolidation for version 0.6.

This is the current consolidation plan:

Candidates for removal:

FlowProject.next_operation()  # replace with FlowProject.next_operations()
FlowProject.update_aliases()
FlowProject.export_job_stati()
FlowProject.eligible()

Candidates for making private:

FlowProject.add_submit_args()
FlowProject.add_script_args()
FlowProject.add_print_status_args()
FlowProject.format_row()
FlowProject.update_stati()
FlowProject.map_scheduler_jobs()

Candidates for signature modification:

FlowProject.scheduler_jobs()

Implementation

In order to keep the current project.py module clean, I propose to move all deprecated methods into a single section that is clearly marked using high-visibility comments, such as:

#!python
### BEGIN DEPRECATED METHODS ###

# All methods within this section have been deprecated as of version 0.6.
...
### END DEPRECATED METHODS ###

Proposal for operation hooks

Original report by Carl Simon Adorf (Bitbucket: csadorf, GitHub: csadorf).


About

This is a proposal for a feature that would allow users to specify hook functions that are executed either before or after the execution of an operation.
The purpose is to simplify the execution of specific functions before and after the execution of a job-operation, such as logging or data upload functions.

A hook function takes an instance of JobOperation as its first argument, which allows access to the name of the operation, the job it operates on, the command, and optional execution directives.
For example:

#!python
def log_operation(op):
    log("Operation {} started.".format(op))

This log function could then be specified to be executed on operation start:

#!python

@Project.operation
@Project.hook.on_start(log_operation)
def hello(job):
    print('hello', job)

API

Hooks are in general defined as lists of functions, as part of the flow.hooks.Hooks class.

  • on_start: Executed before an operation is executed.
  • on_finish: Executed after an operation has completed (with or without error).
  • on_success: Executed after an operation has completed without error.
  • on_fail: Executed after an operation has completed with an error.

Low-level API

The low-level API allows the installation of hooks on the project instance level via the manipulation of the hooks instance variable:

#!python
project = Project()
project.hooks.on_start.append(my_log_function)

And when adding operations, again via a dictionary:

#!python
project.add_operation(
    name='hello',
    cmd='echo hello {job._id}',
    hooks={'on_start': [my_log_function]})

The hooks are executed in order of definition, where project-wide hooks are executed prior to operation-wide hooks.
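As an illustration of that ordering, a minimal sketch (the names below are hypothetical, not part of the proposal's API) could look like this:

#!python
# Sketch of the proposed execution order: project-wide hooks run first,
# followed by operation-specific hooks, each in order of definition.
def _run_on_start_hooks(project_hooks, operation_hooks, op):
    for hook in list(project_hooks) + list(operation_hooks):
        hook(op)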

Decorator API

Alternatively, hooks can be installed via the decorator API:

#!python
from flow import hook

@Project.hook.on_start(log_operation)
def my_op(job):
    pass

Note that the hooks are only executed if the operation is executed through the FlowProject interface.

Configurational API

Finally, hooks can be installed directly in the signac configuration by specifying a function that installs the hooks during project initialization.

#!bash
# signac.rc

[flow]
hooks = my_hooks.install_hooks

The hooks config key is interpreted as a list; multiple modules would be provided as a comma-separated list.
The my_hooks module must be directly importable and should have an install_hooks() function in the global namespace similar to this:

#!python
# my_hooks.py

def log_operation(op):
    log("Operation {} started.".format(op))

def install_hooks(project):
    project.hooks.on_start.append(log_operation)
    return project

The install_hooks() function should return the project argument so that we can use the following idiom: install_hooks(Project()).main().
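For example, a project.py script following this idiom could look like the sketch below (assuming the my_hooks module shown above is importable):

#!python
# project.py (sketch)
from flow import FlowProject
from my_hooks import install_hooks

class Project(FlowProject):
    pass

if __name__ == '__main__':
    # install_hooks() returns the project, so we can chain directly into main().
    install_hooks(Project()).main()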

The default hook installation function is called install_hooks, so in this case the following configuration would be equivalent:

#!bash
[flow]
hooks = my_hooks

As a final example, assuming that the flow package had a module that would install hooks for the automatic generation of snapshots after execution, it would likely be installed like this:

#!bash
[flow]
hooks = flow.snapshots

The cluster job id should be project specific

Original report by Carl Simon Adorf (Bitbucket: csadorf, GitHub: csadorf).


The cluster job id is currently generated from the job id and operation name; however, it should also be specific to the project directory, because otherwise it is difficult to submit cluster jobs that share a project name and state point.

I propose an id generated from the following arguments:

  1. project root directory,
  2. job id,
  3. operation-name,
  4. counter index.

The counter index would currently always default to zero, but moving forward it would allow us to submit the same job-operation multiple times, for example as a chained array.
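A possible construction (a sketch only; the exact format and hash length are open questions) could combine the four components like this:

#!python
# Sketch: derive a project-specific cluster job id from the four components above.
import hashlib

def _cluster_job_id(project_root, job_id, operation_name, index=0):
    # Hash the project root directory so ids stay short but remain project-specific.
    root_hash = hashlib.sha1(project_root.encode('utf-8')).hexdigest()[:8]
    return '{}/{}/{}/{:04d}'.format(root_hash, job_id, operation_name, index)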

Flagging Operations as "submit only"

Original report by RoseCers (Bitbucket: RoseCers).


I'd like command line functionality to be added such that I can say "python project.py run -o all_not_for_submit --progress --parallel", so that I can run all quick aggregators without trying to run operations that should be "submit only".

UMich Flux Hadoop is incorrectly detected as having a scheduler

Description

On host flux-hadoop-login2.arc-ts.umich.edu, signac-flow incorrectly matches the hostname pattern and thinks this is a cluster with a scheduler. I am not sure what the real problem is, though - I tried editing the hostname pattern using the suggested_pattern below and still had an issue. I just know that the hostname is one likely part of the solution.

To reproduce

import flow.environments
import re
current_pattern = flow.environments.umich.FluxEnvironment().hostname_pattern  # '(nyx|flux).*.umich.edu'
suggested_pattern = '(nyx|flux)((?!hadoop).)*.umich.edu'
good_fqdn = 'flux-login2.arc-ts.umich.edu'
bad_fqdn = 'flux-hadoop-login2.arc-ts.umich.edu'
re.match(current_pattern, good_fqdn)  # returns a match on the torque cluster
re.match(current_pattern, bad_fqdn)  # returns a match on the hadoop cluster - this is wrong
re.match(suggested_pattern, good_fqdn)  # still returns a match on the torque cluster
re.match(suggested_pattern, bad_fqdn)  # does not return a match on the hadoop cluster - this is right

Error output

bdice@flux-hadoop-login2 ~/project $ ./project.py status -d --show-traceback                                                 [5/1046]
Query scheduler...
WARNING:flow.project:Error occurred while querying scheduler: 'Torque not available.'.
ERROR:flow.project:Error occured during status update. Use '--show-traceback' to show the full traceback or '--ignore-errors' to complete the update anyways.
ERROR: Encountered error during program execution: 'Torque not available.'
Execute with '--show-traceback' or '--debug' to get more information.
Traceback (most recent call last):
  File "/home/bdice/.local/lib/python3.6/site-packages/flow/scheduling/torque.py", line 29, in _fetch
    result = io.BytesIO(subprocess.check_output(cmd.split()))
  File "/sw/dsi/centos7/x86-64/Anaconda3-5.0.1/lib/python3.6/subprocess.py", line 336, in check_output
    **kwargs).stdout
  File "/sw/dsi/centos7/x86-64/Anaconda3-5.0.1/lib/python3.6/subprocess.py", line 403, in run
    with Popen(*popenargs, **kwargs) as process:
  File "/sw/dsi/centos7/x86-64/Anaconda3-5.0.1/lib/python3.6/subprocess.py", line 709, in __init__
    restore_signals, start_new_session)
  File "/sw/dsi/centos7/x86-64/Anaconda3-5.0.1/lib/python3.6/subprocess.py", line 1344, in _execute_child
    raise child_exception_type(errno_num, err_msg, err_filename)
FileNotFoundError: [Errno 2] No such file or directory: 'qstat': 'qstat'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "./project.py", line 86, in <module>
    Project().main()
  File "/home/bdice/.local/lib/python3.6/site-packages/flow/project.py", line 2943, in main
    _exit_or_raise()
  File "/home/bdice/.local/lib/python3.6/site-packages/flow/project.py", line 2911, in main
    args.func(args)
  File "/home/bdice/.local/lib/python3.6/site-packages/flow/project.py", line 2571, in _main_status
    self.print_status(jobs=jobs, **args)
  File "/home/bdice/.local/lib/python3.6/site-packages/flow/project.py", line 1242, in print_status
    self._fetch_scheduler_status(jobs, scheduler, err, ignore_errors)
  File "/home/bdice/.local/lib/python3.6/site-packages/flow/project.py", line 1122, in _fetch_scheduler_status
    for sjob in self.scheduler_jobs(scheduler):
  File "/home/bdice/.local/lib/python3.6/site-packages/flow/project.py", line 1000, in scheduler_jobs
    for sjob in self._expand_bundled_jobs(scheduler.jobs()):
  File "/home/bdice/.local/lib/python3.6/site-packages/flow/project.py", line 976, in _expand_bundled_jobs
    for job in scheduler_jobs:
  File "/home/bdice/.local/lib/python3.6/site-packages/flow/scheduling/torque.py", line 85, in jobs
    nodes = _fetch(user=self.user)
  File "/home/bdice/.local/lib/python3.6/site-packages/flow/scheduling/torque.py", line 31, in _fetch
    raise RuntimeError("Torque not available.")
RuntimeError: Torque not available.

System configuration

  • Operating System [e.g. macOS]: Linux-3.10.0-862.el7.x86_64-x86_64-with-centos-7.5.1804-Core
  • Version of Python [e.g. 3.7]: 3.6.3 (Anaconda 64-bit, GCC 7.2.0)
  • Version of signac [e.g. 1.0]: 0.9.5
  • Version of signac-flow: 0.6.4

Proposal for improved execution flow

Original report by Mike Henry (Bitbucket: mikemhenry, GitHub: mikemhenry).


Let's use this issue to track ideas on how to add some functionality to signac-flow that will keep jobs 'flowing'.

A few things to consider:

  • Use the hostname to only run a subset of operations
  • Look at systemd or some sort of signaling-type daemon to keep things moving along
  • Talk with HPC admins to see what they think would be effective

Improve contributor documentation

Original report by Mike Henry (Bitbucket: mikemhenry, GitHub: mikemhenry).


So as a new contributor, it was easy to find this page, which gives a good overview. There is a note:

Please see the individual package documentation for detailed guidelines on how to contribute to a specific package.

This note had me check here for extra information, but there really isn't any. The same is true for signac and signac-dashboard.

The tip section is good, but a few more hints would be nice:

  • How to set up dev environment
    • I don't think this needs to be opinionated (i.e., virtualenv or conda env), but it could at least include the dependencies needed for development, like flake8, coverage, Jinja2, mock, and cloudpickle.
  • How to run tests
    • coverage run --source=flow/ -m unittest discover tests/

Other than that, everything was pretty self-explanatory; however, there isn't a CONTRIBUTORS.md or a CHANGES.md in signac-flow.

Detailed project status failing on Blue Waters

Original report by Michael Howard (Bitbucket: mphoward, GitHub: mphoward).


I recently had an issue crop up with the detailed project status on Blue Waters, possibly due to a programming environment change there. It had been working previously, but after their system update, I got the following error:

> python project.py status --detailed
Update status cache:|#########################################################################################|100%
Collect job status info: 100%|█████████████████████████████████████████████████████████████████████| 81/81 [00:02<00:00, 27.74it/s]
# Overview:
Total # of jobs: 81

label         ratio
------------  --------------------------------------------------
sampled       |########################################| 100.00%
equilibrated  |########################################| 100.00%
ERROR: Encountered error during program execution: ''list' object has no attribute 'copy''
Execute with '--debug' to get more information.

The version of signac-flow is:

> flow --version
signac-flow 0.6.0

with python 2.7.11 on Blue Waters.

If I alter flow/project.py on lines 1266 and 1269 to replace row.copy() by list(row), then the correct output is generated. I suspect a different code path was taken previously.

Bundling jobs smaller than node size

Original report by Michael Howard (Bitbucket: mphoward, GitHub: mphoward).


I've run into an issue using the --bundle and --parallel options of the submit interface for an operation in my project. After my simulations are complete, I want to run an analysis on each job that uses only 1 core. Since I need to take a whole node, I'd like to place 16 of these operations per node and execute in parallel. The submit command generates a script like I would expect, where multiple commands are submitted in background followed by a wait.

This works well with slurm + srun, but I am running on Blue Waters with torque + aprun. I noticed that my jobs were hitting the walltime, even though there should have been no problem. I dug into the Blue Waters documentation and found that, apparently, only one aprun command is permitted on a compute node at a time (https://bluewaters.ncsa.illinois.edu/job-bundling). I believe this either serializes or kills the other aprun jobs (not sure).

The solution, as suggested in the Blue Waters docs, is to either use another tool or to use xargs to run multiple commands. I opted to use xargs, since I could dump out the submission script to file and then modify it by moving lines around. This is obviously frustrating and error prone, especially as more tasks are run.

Is there any way of customizing how jobs are bundled by environment? Or, could there be a mechanism for selecting from a few "standard" bundle options for different schedulers?

The job status does not automatically update in between submissions

Original report by Carl Simon Adorf (Bitbucket: csadorf, GitHub: csadorf).


Observed behavior

Resubmitting jobs fails if the user did not update the status after the initial submission.

Expected behavior

The job status should be automatically updated before each submission.

Background

Currently, it is necessary to manually run the status update before submitting new jobs, in order to synchronize the project's status metadata with the one from the scheduler.
However, this may not be obvious to the user and is probably not communicated well enough in the documentation.

Proposed Solution

Run a status query automatically before each submission, but cache queries (with a short timeout) to ensure that scripted submissions do not result in overly frequent polling.
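A minimal sketch of such a short-lived cache (all names here are hypothetical; a real implementation would live inside FlowProject):

#!python
# Sketch: cache scheduler status queries for a short period to avoid frequent polling.
import time

_status_cache = {'time': 0.0, 'result': None}

def cached_scheduler_query(fetch, timeout=60):
    """Call fetch() at most once per `timeout` seconds; otherwise return the cached result."""
    now = time.time()
    if _status_cache['result'] is None or now - _status_cache['time'] > timeout:
        _status_cache['result'] = fetch()
        _status_cache['time'] = now
    return _status_cache['result']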

Mapping of scheduler jobs to project

Original report by Carl Simon Adorf (Bitbucket: csadorf, GitHub: csadorf).


Problem

The FlowProject currently provides the scheduler_jobs() and the map_scheduler_jobs() method. These can be used to identify scheduler jobs that belong to the project within the current environment, but are still kind of awkward to use. For example, it should be simple to iterate through all scheduler-jobs associated with the current project, e.g. to change their status.

Current Solution

This is the code currently required to do so:

#!python
import flow

project = flow.FlowProject()
env = flow.get_environment()

sjobs = project.scheduler_jobs(env.scheduler_type())
sjobs_map = project.map_scheduler_jobs(sjobs)

for job in project:
    for sjobs in sjobs_map[job.get_id()].values():
        for sjob in sjobs:
            pass  # do something with sjob

The reason for this rather convoluted approach is to ensure that the environment scheduler is queried only once, as opposed to multiple times (for example, once per job).

Proposed Enhancement

I propose to protect the environment scheduler resource, using the following API:

#!python
import flow

project = flow.FlowProject()
env = flow.get_environment()

result = project.query_scheduler(env)
for job in project:
    for op_name, sjob in result(job):
        pass  # do something with sjob

Restore job-based granularity

Original report by Jens Glaser (Bitbucket: jens_glaser, GitHub: jglaser).


I have to admit that I don't quite understand the submission interface. Currently it maps operations onto real cluster jobs. However, that may be a finer level of granularity than one really wants if the operations are quick, but the job scheduler throughput is slow. One may end up waiting for the next cluster slot to free up between two trivial tasks. I don't see an option to generate scripts that basically contain

python project.py run -j <my_job_id>

In other words, bundle all job-specific tasks into one single cluster job.

Alternatively, the execution graph could be traversed, as proposed for issue #35, and only the last operation would be issued, automatically fulfilling previous dependencies.

Bridges Template missing --gres=gpu:p100:2 with GPU-shared partition

Description

Trying to submit a job to the GPU-shared partition on Bridges with signac-flow version 0.6.4 results in the failure sbatch: error: No gpu count given, specify --gres=gpu.

To reproduce

Within the template bridges.sh, resources are allocated based on the partition chosen:

{% elif partition == 'GPU-shared' %}
#SBATCH -N {{ nn|default(1, true)|check_utilization(gpu_tasks, 1, threshold, 'GPU') }}
#SBATCH --ntasks-per-node=16
{% elif 'shared' in partition %}
#SBATCH -N {{ nn|default(1, true) }}
#SBATCH --ntasks-per-node={{ cpu_tasks }}
{% else %}

This section needs #SBATCH --gres=gpu:p100:2

Error output


sbatch: error: No gpu count given, specify --gres=gpu
             See https://www.psc.edu/bridges/user-guide
sbatch: error: Batch job submission failed: Invalid generic resource (gres) specification
Submission error: sbatch error:
make: *** [run] Error 1 

System configuration


  • Operating System: Linux-3.10.0-693.21.1.el7.x86_64-x86_64-with-centos-7.4.1708-Core
  • Version of Python: 3.7.1 (default, Dec 14 2018, 19:28:38) [GCC 7.3.0]
  • Version of signac: 0.9.5
  • Version of signac-flow: 0.6.4

lambda functions not evaluated in directives

Original report by Tim Moore (Bitbucket: mootimot, GitHub: tcmoore3).


I'm trying to use a lambda function to set the number of mpi ranks to use for an operation:

@directives(executable=lambda job: 'mpirun --bind-to none -n {} python'.format(9))

When I check the submission, the lambda isn't actually evaluated, but the function object itself is printed:

$ python project.py submit --pretend
...
mpirun --bind-to none -n <function <lambda> at 0x619d17620> python project.py exec ...

I expected this to print mpirun --bind-to none -n 9 python ..., but that is not what's happening.

Negative lines omitted?

Original report by Bradley Dice (Bitbucket: bdice, GitHub: bdice).


When outputting the current status with the setting --overview-max-lines 8, I see a negative number of lines omitted if I have fewer lines than the max. This seems strange. Maybe this should print nothing there for negative values?

Status project 'MyProject':
Total # of jobs: 45

label          progress
-------------  --------------------------------------------------
precompressed  |########################################| 100.00%
Lines omitted: -7
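A possible fix (just a sketch) would be to clamp the value and suppress the line when nothing was actually omitted:

#!python
# Sketch: only report omitted lines when the count is positive.
def print_lines_omitted(total_lines, max_lines):
    omitted = total_lines - max_lines
    if omitted > 0:
        print('Lines omitted:', omitted)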

Export workflow graph

Original report by Carl Simon Adorf (Bitbucket: csadorf, GitHub: csadorf).


As per offline discussion, we determined that it should be possible to explicitly export the directed graph defined by a FlowProject, e.g., as a NetworkX graph structure.

In loose relation to that, we should also think about how to submit a complete workflow to a scheduler instead of individual operations.
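A rough sketch of what such an export could produce (the function name and the way dependencies are gathered are placeholders, not an actual API):

#!python
# Sketch: represent a FlowProject workflow as a networkx.DiGraph.
import networkx as nx

def build_workflow_graph(operation_names, dependencies):
    """operation_names: iterable of operation names;
    dependencies: iterable of (upstream, downstream) operation-name pairs."""
    graph = nx.DiGraph()
    graph.add_nodes_from(operation_names)
    graph.add_edges_from(dependencies)
    return graph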

Control `--show-traceback` via a configuration option

Feature description

I would like to be able to override the default behavior of --show-traceback with a configuration key. For example, the following configuration key:

[flow]
show_traceback = true

should enable showing the traceback by default.

New `with_job` decorator for operations

Original report by Carl Simon Adorf (Bitbucket: csadorf, GitHub: csadorf).


I would like to add a flow.with_job decorator that switches into the job's workspace for the duration of an operation.

For example:

@FlowProject.operation
@with_job
def hello(job):
    print("hello {}".format(job))

would be equivalent to

@FlowProject.operation
def hello(job):
    with job:
        print("hello {}".format(job))

Similarly for cmd-operations:

from flow import FlowProject, cmd, with_job

@FlowProject.operation
@cmd
@with_job
def hello(job):
    return "echo 'hello {}'".format(job)

would be equivalent to

@FlowProject.operation
@cmd
def hello_cmd(job):
    return 'trap "cd `pwd`" EXIT && cd {job.ws} && echo "hello {job}"'.format(job=job)
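For the plain Python-function case, a minimal sketch of such a decorator could rely on the fact that signac jobs already act as context managers that switch into their workspace:

# Sketch of a possible with_job implementation (Python-function operations only;
# cmd-operations would need the shell-level equivalent shown above).
import functools

def with_job(func):
    @functools.wraps(func)
    def wrapper(job, *args, **kwargs):
        with job:  # the signac job context manager changes into the job's workspace
            return func(job, *args, **kwargs)
    return wrapper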

How is --debug meant to be used?

Original report by Jens Glaser (Bitbucket: jens_glaser, GitHub: jglaser).


Trying to use the --debug option gives me

[jsglaser@collins 1brf_md_assembly]$ python project.py --debug exec init_protein_config 6be70023b335b224517d50cf397b6cd
1.768879647180012
DEBUG:signac.contrib.project:Reading cache...
DEBUG:signac.contrib.project:No cache file found.
Usage: project.py [options]

project.py: error: no such option: --debug

but the error message says

project.py:122  |          mc = hpmc.integrate.sphere_union(seed=sp['seed'])
ERROR: Encountered error during program execution: ''v''
Execute with '--debug' to get more information.

--pretend and --serial break job listing

Original report by Vyas Ramasubramani (Bitbucket: vramasub, GitHub: vyasr).


In flow/project.py's _submit function, the line constructing the after variable breaks under certain circumstances:

#!python

Line 293: after = ':'.join(after.split(':') + [scheduler_job_id])

This breaks because when --pretend is specified along with --serial (and not --bundle), the scheduler_job_id is None because scheduler.submit does nothing, and then the string join fails when trying to join a NoneType.

Not sure what the desired fix would be here, and it shouldn't be affecting anything in practice.

SLURM scheduler driver broken when qstat not available

Original report by Carl Simon Adorf (Bitbucket: csadorf, GitHub: csadorf).


The slurm driver currently uses the qstat command to query cluster jobs.
Some cluster environments with a slurm scheduler provide the qstat command, but many don't.
The slurm scheduler driver should be patched to use the squeue command.
Below is a tested prototype implementation:

#!python
import getpass
import subprocess

import flow
from flow.manage import ClusterJob, JobStatus  # import location may differ between flow versions


def _fetch(user=None):

    def parse_status(s):
        s = s.strip()
        if s == 'PD':
            return JobStatus.queued
        elif s == 'R':
            return JobStatus.active
        elif s in ['CG', 'CD', 'CA', 'TO']:
            return JobStatus.inactive
        elif s in ['F', 'NF']:
            return JobStatus.error
        return JobStatus.registered

    if user is None:
        user = getpass.getuser()

    cmd = ['squeue', '-u', user, '-h', '-o "%2t %100j"']
    try:
        result = subprocess.check_output(cmd).decode()
    except subprocess.CalledProcessError as error:
        print('error', error)
        raise
    except FileNotFoundError:
        raise RuntimeError("Slurm not available.")
    lines = result.split('\n')
    for line in lines:
        if line:
            status, name = line.strip()[1:-1].split()
            yield ClusterJob(name, parse_status(status))


class SQueueSlurmScheduler(flow.slurm.SlurmScheduler):
    def jobs(self):
        self._prevent_dos()
        for job in _fetch(user=self.user):
            yield job

The TORQUE scheduler is_present() method triggers subprocess.CalledProcessError

Description

The TORQUE scheduler is_present() method forks to qsub -V to determine whether the scheduler is present. On some systems this fork will return with a nonzero exit status because the current context does not allow for submission, e.g., on a compute node.

To reproduce

For example, try to call project.py status on a Cray compute node.
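A more robust presence check (a sketch, not the current implementation) could look for the executable on the PATH instead of forking it:

# Sketch: detect the TORQUE scheduler by locating qsub on the PATH
# instead of running `qsub -V`, which may fail on compute nodes.
import shutil

def is_present():
    return shutil.which('qsub') is not None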

Proposal: A library for project templates

Original report by Carl Simon Adorf (Bitbucket: csadorf, GitHub: csadorf).


Proposal for a template library

Right now there is one maintained project template that serves to provide a generic example and guidance for users on how to set up a project. For that purpose it needs to remain rather abstract and simple.

With a library of different templates we could cater much better to the specific needs of individual workflows. With that in mind, we would need some way to manage and distribute these templates.

Library Format

I propose to use the anaconda style, where a git repo may contain one or multiple templates (recipes).
That would mean a repo would have, for example, the following directory structure:

#!bash
flow-template-library:
./minimal/
./minimal/project.py
./hoomd/
./hoomd/project.py
./hoomd/operations.py
./gromacs/
./gromacs/project.py
./gromacs/operations.py
...

Installation

With the above format we would have the following installation methods:

Manual Installation

The user just clones the whole repo and copies the files into their project.

#!bash
git clone [email protected]:glotzer/flow-template-library.git
cp flow-template-library/hoomd/* my_hoomd_project/

Guided Installation

The user would use the flow interface, for example like this:

#!bash
git clone [email protected]:glotzer/flow-template-library.git
cd my_hoomd_project
flow init --src=../flow-template-library -t hoomd

or directly with the remote source:

#!bash
cd my_hoomd_project
flow init [email protected]:glotzer/flow-template-library.git -t hoomd

It would be possible to configure a repository like a channel:

#!bash
signac config add flow.templates.url [email protected]:glotzer/flow-template-library.git

cd my_hoomd_project
flow init -t hoomd

Versioning

There should be a standardized way to indicate which flow version a template has been written and tested for.
For example, by adding the following line to the project.py file:

#!python
flow.required_version("0.5.0")

Integrated Testing

Templates should have simple routines for the initialization and execution of simple operations.
Not all operations need to be testable.

Proposal for the improved management of provenance and reproducibility

Original report by Carl Simon Adorf (Bitbucket: csadorf, GitHub: csadorf).


About

This is a proposal for the integration of features into signac-flow that would simplify provenance management and increase the chance for reproducibility.
Such features would enable users to keep track of the exact path that data took from its source to the current / latest stage.

Note: This feature depends on and is developed in tandem with the snapshot feature (signac/issue #69) for the signac core application.

Provenance vs. Reproducibility

First, we need to clearly define and distinguish what we mean by data provenance and reproducibility.
The first refers to the ability to know exactly where data came from and which operations were applied to it, while reproducibility means that we can also execute these operations again.

To preserve the provenance of the data set, we need to track

  1. The original state of the (input) data,
  2. the order of operations that were applied to the data set,
  3. any parameters that were provided as arguments,
  4. the state of the data set before application,
  5. the software and hardware environment that was used for execution [1].

To be able to immediately reproduce the workflow we also need to

  1. Have access to all scripts used for execution,
  2. have access to the original or an equivalent execution environment (hardware and software), or at least the ability to create such an equivalent environment.

[1] The approach for tracking the environment is described in the next section.

Approach

The general idea is to create a snapshot before the execution of a data space operation and to store operation-related metadata in a log file or a log document entry.
This may be done manually, but could also happen automatically, for example when operations are executed through the flow.run() or the Project.run() interface.

Such a log entry would have the following fields:

{
  timestamp: float,  # The UTC time in seconds since the UNIX epoch when the operation was triggered.
  snapshot_id: int, # The id of the snapshot taken immediately before execution.
  operation: str,  # The name of the operation.
  cmd: str, # The command used for execution of this operation.
  canonical: bool, # Indicates whether the command is notated in canonical form [1].
  environment: dict,  # Information about the execution environment [2].
  num_ranks: int, # The number of MPI ranks used for execution.
  note: str,  # Optional user-defined remarks or comments.
}

A log entry is either appended to a field called log as part of the job document and/or written to a collection-based log file.

[1] The recorded command is assumed to be executed within the project root directory and should ideally be in canonical notation.
A canonical command is a command that is a function of the job and is therefore easily transferable between jobs.
For example, the user may have executed operation my_op in parallel using the flow.run() interface with the command python operations.py my_op; however, the canonical command would be python operations.py my_op {job._id}.
All commands executed through the Project.run() and the flow.run() interface would automatically be canonical; other commands might require some extra treatment.

CAVEAT: The actual command used for execution cannot always be accurately preserved.
For example, it is possible to record the number of MPI ranks used for execution, but it is not possible to automatically record which mpirun wrapper variant was actually used for execution and with which parallelization layout.
We are assuming that this is desired behavior and that the operation's logic is not a function of the parallelization layout.

[2] Capturing the hardware and software (library) environment accurately and comprehensively is only possible under specific circumstances, e.g., when using a containerized environment. Existing tools such as pip freeze are capable of capturing most software versions within the current Python environment; however, local extensions to the Python path or other locally installed packages may not be captured.
Non-Python tools are even harder to detect automatically.
We therefore propose a good-enough approach, where it is mostly the user's responsibility to ensure that all essential environment parameters are accurately tracked, while the framework assists with that as much as reasonably possible.
See below for the proposed API on how to track the environment.

Proposed API

Keeping an operations log

Logging of operations would be enabled in the following ways:

  1. Manually via the internal flow._log_operation() function,
  2. Manually via the low-level flow.log_operation() functor,
  3. through arguments to flow.run(),
  4. through arguments to Project.run().

The API would also be exposed via the command line and the configuration, but for the sake of brevity we are going to focus on the Python API here.

Manual logging

This is how we would create a manual log entry:

flow.log_operation(
    # An instance of JobOperation
    operation,
    # Indicate whether the command is in canonical form (see above) (default: False).
    canonical=False,
    # Generate snapshots before execution if True (default: True).
    snapshot=True,
    # The field within the job document that contains the operations log.
    # None means 'do not store' (default: 'log').
    field='log',
    # The filename of a log file relative to the job's workspace path.
    file='operations.log',
    # An additional user-defined comment (default: None).
    note="Executed through aprun.",
    # Restore the job if the execution of the operation fails (default: True) [1].
    restore_on_fail=True,
): {<log entry>} (dict)
  • The file argument can alternatively be a file-like object.
  • The snapshot argument may optionally be a mapping, to be able to forward arguments to the snapshot function, e.g., {'deep': True}.

Manual tracking is useful where operations are not executed through the flow interface. It is up to the user to ensure that the entry is correct and that the operation is actually executed after creating the log entry.

[1] The log_operation() functor may be used as a context manager, where the log entry is only created if the execution of the operation actually succeeded; otherwise, the snapshot is restored:

op = JobOperation('myop', job, './src/my_op {}'.format(job))
with log_operation(op):
    subprocess.run(op.cmd)

This would be roughly equivalent to:

op = JobOperation('myop', job, './src/my_op {}'.format(job))
log_entry = log_operation(op, field=None)
try:
    subprocess.run(op.cmd)
except:
    project.restore(_id=log_entry['snapshot_id'])
    raise
else:
    flow._log_operation(log_entry)

Semi-automatic logging through the flow interface

Alternatively, we can use the extended flow.run() interface to keep track:

flow.run(
    # An optional sequence of job-operations (default: None).
    operations=None,
    # The field within the job document that contains the operations log.
    # None means 'do not store' (default: 'log').
    log='log',
    # Specify a log file name relative to the job's workspace (default: None).
    logfile='operations.log',
    # Generate snapshots before execution if True (default: True).
    snapshot=True,
    # An optional parser argument to add the run CLI arguments to (default: None).
    parser=None,
    # Additional keyword arguments to be forwarded to the operations function.
    **kwargs
    )
  • When the first argument is None, the command line interface is started (replicating previous behavior).
  • The logfile argument can alternatively be a file-like object.
  • The snapshot argument may optionally be a mapping, to be able to forward arguments to the snapshot function, e.g., {'deep': True}.

Using the flow.run() interface, the command would be logged in canonical form for the execution of a single operation for a single job. That means even if the operation was actually executed for all jobs like this:
python operations.py my_op, it would be logged as python operations.py my_op {job._id}.

Keeping track of the hardware and software (library) environment

The capturing of environment parameters would be embedded into the existing flow.environment structure.
The DefaultEnvironment would be extended to capture a few parameters that are always accessible, such as the Python version or information provided by the platform module.
For everything else, user input is needed.
The current definition of the Environment class could be extended in this form:

class MyEnvironment(DefaultEnvironment):

    MyEnvironment.track_version('gcc', cmd='gcc --version')

Some tracking may also be enabled through the configuration:

[flow]
[[environment]]
[[[track]]]
[[[[gcc]]]]
cmd="gcc --version"

Example Usage

Listing all operations that have been applied to a specific job from the beginning to now would be trivial:

pprint(job.document['log'])

Assuming we wanted to replicate the workflow applied to a_job to another_job, and that commands have been recorded canonically, this is a possible approach:

log = a_job.document['log']
for ts in sorted(log):
    subprocess.run(log[ts]['cmd'].format(job=another_job))

A canonical command is a function of the job's state point and document, e.g.: ./src/operations.py my_op {job._id}

The logic implemented above would also be available as a high-level function, for instance to replay the exact same sequence of operations:

# Execute with subprocess.run():
for job_op in flow.replay(a_job.document['log'], job=another_job):
    subprocess.run(job_op.cmd)

# Execute with flow.run():
flow.run(flow.replay(a_job.document['log']), job=another_job)

Detect and automatically execute dependencies of an operation

Original report by Jens Glaser (Bitbucket: jens_glaser, GitHub: jglaser).


For local testing, I would expect that @Project.pre(...) decorators are honored when I try to execute an operation where the precondition is not fulfilled.

E.g.
python project.py exec simulate

should evaluate the preconditions of that @Project.operation (simulate) and automatically execute the operations needed to fulfill them. Instead, it tries to directly execute the operation in question and fails.

Decoding status output

Original report by Vyas Ramasubramani (Bitbucket: vramasub, GitHub: vyasr).


When decoding the output of subprocess commands (specifically in the schedulers, e.g. slurm.py, that parse the output of qstat), the presence of unprintable or unencodable characters in the output can cause Python 3 to raise a UnicodeEncodeError. The best way to fix this might just be to change the relevant line to

#!python
result = subprocess.check_output(cmd.split()).decode(errors = 'backslashreplace')

to explicitly ignore the errors when decoding, but further investigation is needed.

More detailed information on the options available for fixing this problem is documented on this webpage.

The return value of next_operation() is ignored for <project> submit / script / run

Original report by Carl Simon Adorf (Bitbucket: csadorf, GitHub: csadorf).


With the last update, not only the next operation is run/submitted or evaluated for script generation, but all next operations, using the FlowProject.next_operations() method.

While this is generally an improvement and leads to arguably more expected behavior, it also leads to backwards-compatibility issues with projects that have explicitly overloaded the next_operation() method. With this update, next operations are gathered from both methods for the above-mentioned functions. Duplicates are automatically discarded.

single string names input won't be auto detected in project.submit()

Description

When submitting jobs with the Python interface using project.submit(), if the names argument is a single string rather than a list, submit won't auto-detect the single string and won't submit any jobs.

To reproduce

project.submit(names='operation_name',...)
won't submit any jobs, as compared to

project.submit(names=['operation_name'],...)
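A possible fix (sketch) would be to normalize the argument inside submit() so that a single string is accepted as well:

# Sketch: normalize the names argument so that a single string behaves like a one-element list.
def _normalize_names(names):
    if names is None:
        return None
    if isinstance(names, str):
        return [names]
    return list(names)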

System configuration

  • Operating System: stampede2
  • Version of Python: 3.7
  • Version of signac: 0.9.5
  • Version of signac-flow: 0.6.4

Evaluation of solutions for a more concise command line API

Original report by Carl Simon Adorf (Bitbucket: csadorf, GitHub: csadorf).


The command line API to request the status, submit jobs, and run jobs is currently a little verbose.

For users who use the template, the commands for status and submit require the execution of modules within the my_project package, leading to the following patterns:

#!bash
$ python -m my_project.status
$ python -m my_project.submit

By design the execution script is not part of the package (in the template) and needs to be executed like this:

#!bash
python scripts/run.py

We propose that the API should be more concise.

A simple workaround with the current state is the use of aliases, such as

#!bash
alias submit="python -m my_project.submit"
alias status="python -m my_project.status"
alias run="python scripts/run.py"

However, aliases are probably not a good long-term solution to this issue.

It needs to be evaluated what other methods could be applied to solve this issue.

Use environment profiles on opt-out basis

Original report by Carl Simon Adorf (Bitbucket: csadorf, GitHub: csadorf).


To use the shipped environment profiles, users are currently required to opt in by adding the following line to the project.py file: import flow.environments.

We do this as a precautionary measure; however, I think it is fair to assume that this should be the default behavior.

I propose that we switch to an opt-out model with 0.6, where users can opt out of using profiles by changing the signac configuration:

[flow]
use_shipped_environments = False

or by switching a global variable: flow.USE_SHIPPED_ENVIRONMENTS = False.

The exact name of the variable would be up for debate.

Streamline the environment selection and deselection

Original report by Carl Simon Adorf (Bitbucket: csadorf, GitHub: csadorf).


It should be straightforward and properly documented how to select and deselect environments using command line options, global variables, environment variables, and config variables. This kind of selection is already partially implemented.

I propose the following hierarchy following standard practices.
The environment is determined (in order) by:

  1. A command line option / function argument to submit.
  2. A global variable set in the signac root namespace.
  3. An environment variable.
  4. A configuration variable.
  5. Automatic determination.

The automatic environment detection can be uniformly deactivated by setting any of these arguments or variables to an empty string ("").
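A sketch of the proposed resolution order (every name below, including the environment variable, is hypothetical):

#!python
# Sketch of the proposed selection hierarchy; an empty string disables automatic detection.
import os

def determine_environment(cli_env=None, global_env=None, config_env=None,
                          autodetect=lambda: 'DefaultEnvironment'):
    for candidate in (cli_env,                                    # 1. command line / function argument
                      global_env,                                 # 2. global variable
                      os.environ.get('SIGNAC_FLOW_ENVIRONMENT'),  # 3. environment variable (name is hypothetical)
                      config_env):                                # 4. configuration variable
        if candidate is not None:
            return None if candidate == "" else candidate
    return autodetect()                                           # 5. automatic determination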

Tagging @harperic .

.bundle file not found when running python project.py status

Original report by RoseCers (Bitbucket: RoseCers).


Traceback (most recent call last):
  File "../../../job_scripts/project.py", line 108, in <module>
    Project().main()
  File "/home/rosecers/.local/lib/python3.5/site-packages/flow/project.py", line 2785, in main
    _exit_or_raise()
  File "/home/rosecers/.local/lib/python3.5/site-packages/flow/project.py", line 2754, in main
    args.func(args)
  File "/home/rosecers/.local/lib/python3.5/site-packages/flow/project.py", line 2428, in _main_status
    self.print_status(jobs=jobs, **args)
  File "/home/rosecers/.local/lib/python3.5/site-packages/flow/project.py", line 1141, in print_status
    self._fetch_scheduler_status(jobs, scheduler, err, ignore_errors)
  File "/home/rosecers/.local/lib/python3.5/site-packages/flow/project.py", line 1023, in _fetch_scheduler_status
    for sjob in self.scheduler_jobs(scheduler):
  File "/home/rosecers/.local/lib/python3.5/site-packages/flow/project.py", line 901, in scheduler_jobs
    for sjob in self._expand_bundled_jobs(scheduler.jobs()):
  File "/home/rosecers/.local/lib/python3.5/site-packages/flow/project.py", line 879, in _expand_bundled_jobs
    with open(self._fn_bundle(job.name())) as file:
FileNotFoundError: [Errno 2] No such file or directory: '/scratch/sglotzer1_fluxoe/***/.bundles/***/bundle/73f21***'
