To run:
pip install .
run-detection
To install when developing:
pip install .[dev]
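Note: depending on your shell you may need to escape or quote the square brackets (zsh requires this, for example). This also installs the development tools: pytest, pylint, mypy, etc.
To demo and test, the easiest way to exercise the whole of run detection currently is to start an ActiveMQ container and send it messages: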
docker run -p 61613:61613 -p 61616:61616 -p 8161:8161 rmohr/activemq
- Log in to the web GUI at http://localhost:8161
- Create a new queue called "Interactive-Reduction"
- Send messages to that queue via the web GUI
- Verify they are printed.
- The containers are stored in the organisation's container registry on GitHub.
- Docker must be installed to build the container.
- Build the container by running:
docker build . -f ./container/rundetection.D -t ghcr.io/interactivereduction/rundetection
- Run the container by running:
docker run -it --rm --mount source=/archive,target=/archive --name rundetection ghcr.io/interactivereduction/rundetection
- To push containers you will need to set up the correct access; you can follow this guide.
- Upload the container by running (should be handled by CI, but this can be done manually if needed):
docker push ghcr.io/interactivereduction/rundetection -a
- To pull containers you will also need the permissions described in the guide above.
- Pull the container by running:
docker pull ghcr.io/interactivereduction/rundetection:latest
To run the unit tests only, run:
pytest . --ignore test/test_e2e.py
To run the e2e tests:
cd test
docker-compose up -d
cd ..
pytest test/test_e2e.py
This will pull the Kafka/ActiveMQ containers and build the run detection container. Any code changes made after starting run detection will require the run detection container to be rebuilt.
For a run to be sent downstream, the metadata of the received file must meet the specification for that instrument.
The specifications for each instrument are found in rundetection/specifications/<instrument>_specification.json
An example specification file:
{
"enabled": true
}
Within the JSON file, each field is considered to be a Rule and has a class associated with it, e.g. the EnabledRule class.
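The mapping from specification fields to rule objects can be sketched as follows. This is a minimal, self-contained illustration and not the project's actual code: the `Rule`, `EnabledRule` and `NexusMetadata` classes here are simplified stand-ins, the `rules_from_specification` helper is hypothetical, and the behaviour of `EnabledRule.verify` (returning the configured boolean) is an assumption.

```python
import json
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Any, Generic, List, TypeVar

T = TypeVar("T")


@dataclass
class NexusMetadata:
    """Stand-in for the project's metadata class (assumption)."""
    experiment_title: str


class Rule(Generic[T], ABC):
    """Stand-in base class: a rule stores the value read from the specification."""

    def __init__(self, value: T) -> None:
        self._value = value

    @abstractmethod
    def verify(self, metadata: NexusMetadata) -> bool:
        """Return whether the metadata satisfies this rule."""


class EnabledRule(Rule[bool]):
    def verify(self, metadata: NexusMetadata) -> bool:
        # Assumed behaviour: the rule passes only when "enabled" is true
        return self._value


def rules_from_specification(text: str) -> List[Rule[Any]]:
    """Hypothetical helper: build one rule object per field of the JSON text."""
    known = {"enabled": EnabledRule}
    specification = json.loads(text)
    return [known[key.lower()](value) for key, value in specification.items()]


rules = rules_from_specification('{"enabled": true}')
metadata = NexusMetadata(experiment_title="example run")
print(all(rule.verify(metadata) for rule in rules))  # prints: True
```

In this sketch an unknown field raises a `KeyError`; the project's own factory, shown further down, raises a `MissingRuleError` instead.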
Below is an example of adding a new rule. The example is unrealistic, but it shows how much flexibility there is.
- Update the specification file:
{ "enabled": true, "skipTitlesIncluding": ["foo", "bar", "baz"] }
- Create the Rule implementation:
class SkipTitlesIncludingRule(Rule[List[str]]):
    def verify(self, metadata: NexusMetadata):
        return any(word in metadata.experiment_title for word in self._value)
- Update the RuleFactory:
def rule_factory(key: str, value: T) -> Rule[T]:
    """
    Given the rule key, and rule value, return the rule implementation
    :param key: The key of the rule
    :param value: The value of the rule
    :return: The Rule implementation
    """
    match key.lower():
        case "enabled":
            if isinstance(value, bool):
                return EnabledRule(value)
            else:
                raise ValueError(f"Bad value: {value} in rule: {key}")
        case "skiptitlesincluding":
            if isinstance(value, list):
                return SkipTitlesIncludingRule(value)
            else:
                raise ValueError(f"Bad value: {value} in rule: {key}")
        case _:
            raise MissingRuleError(f"Implementation of Rule: {key} does not exist.")
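To check how the example rule behaves, it can be exercised in isolation. The sketch below is a simplified copy of `SkipTitlesIncludingRule` without the project's `Rule` base class, and `FakeMetadata` is a hypothetical stand-in that only provides the `experiment_title` attribute the rule reads.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class FakeMetadata:
    """Hypothetical stand-in exposing only the attribute the rule uses."""
    experiment_title: str


class SkipTitlesIncludingRule:
    """Simplified copy of the example rule, without the Rule base class."""

    def __init__(self, value: List[str]) -> None:
        self._value = value

    def verify(self, metadata: FakeMetadata) -> bool:
        # True when the title contains any of the configured words
        return any(word in metadata.experiment_title for word in self._value)


rule = SkipTitlesIncludingRule(["foo", "bar", "baz"])
matching = rule.verify(FakeMetadata("calibration foo run"))
non_matching = rule.verify(FakeMetadata("vanadium run"))
print(matching, non_matching)  # prints: True False
```

As written, `verify` returns True when the title contains one of the listed words, and the check is a case-sensitive substring match (so "football" would also match "foo"). Whether a True result means the run is skipped or sent downstream depends on how the project interprets the return value of `verify`, so confirm that before adapting the example.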