
stac-task's Introduction

STAC Task (stac-task)


This Python library consists of the Task class, which is used to create custom tasks based on a "STAC In, STAC Out" approach. The Task class acts as a wrapper around custom code and provides several convenience methods for modifying STAC Items and creating derived Items, as well as a CLI.

This library is based on a branch of cirrus-lib but aims to be more generic.

Quickstart for Creating New Tasks

from typing import Any

from stactask import Task, DownloadConfig

class MyTask(Task):
    name = "my-task"
    description = "this task does it all"

    def validate(self, payload: dict[str, Any]) -> bool:
        return len(self.items) == 1

    def process(self, **kwargs: Any) -> list[dict[str, Any]]:
        item = self.items[0]

        # download a datafile
        item = self.download_item_assets(
            item,
            config=DownloadConfig(include=['data'])
        )

        # operate on the local file to create a new asset
        item = self.upload_item_assets_to_s3(item)

        # this task returns a single item
        return [item.to_dict(include_self_link=True, transform_hrefs=False)]

Task Input

Field Name Type Description
type string Must be FeatureCollection
features [Item] A list of STAC Items
process ProcessDefinition A Process Definition

ProcessDefinition Object

A STAC task can be provided additional configuration via the 'process' field in the input ItemCollection.

Field Name Type Description
description string Optional description of the process configuration
upload_options UploadOptions Options used when uploading assets to a remote server
tasks Map<str, Map> Dictionary of task configurations. A list of task configurations is supported for backwards compatibility reasons, but a dictionary should be preferred.

UploadOptions Object

Field Name Type Description
path_template string REQUIRED A string template for specifying the location of uploaded assets
public_assets [str] A list of asset keys that should be marked as public when uploaded
headers Map<str, str> A set of key/value headers to send when uploading data to S3
collections Map<str, str> A mapping of output collection name to a JSONPath pattern (for matching Items)
s3_urls bool Controls whether the final published URLs should be s3 (s3://bucket/key) or https URLs

path_template

The path_template string is a way to control the output location of uploaded assets from a STAC Item using metadata from the Item itself. The template can contain fixed strings along with variables used for substitution. See the PySTAC documentation for LayoutTemplate for a list of supported template variables and their meaning.
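The substitution mechanics can be illustrated with Python's stdlib string.Template. This is an illustration only: stac-task actually uses pystac's LayoutTemplate, which derives variables such as ${year} from the Item itself rather than from a hand-built dictionary.

```python
from string import Template

# Illustration only: stac-task uses pystac's LayoutTemplate, which derives
# variables such as ${year} from the Item itself; here we substitute by hand.
template = "s3://my-bucket/${collection}/${year}/${id}"
variables = {"collection": "landsat-c2l2", "year": "2023", "id": "LC08_L2SP_001"}

path = Template(template).substitute(variables)
print(path)  # s3://my-bucket/landsat-c2l2/2023/LC08_L2SP_001
```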

collections

The collections dictionary provides a collection ID and JSONPath pattern for matching against STAC Items. At the end of processing, before the final STAC Items are returned, the Task class can be used to assign all of the Items to specific collection IDs. For each Item the JSONPath pattern for all collections will be compared. The first match will cause the Item's Collection ID to be set to the provided value.

For example:

"collections": {
    "landsat-c2l2": "$[?(@.id =~ 'LC08.*')]"
}

In this example, the task will set any STAC Items that have an ID beginning with "LC08" to the landsat-c2l2 collection.

See JSONPath Online Evaluator to experiment with JSONPath and regex101 to experiment with regex.

tasks

The tasks field is a dictionary with an optional entry for each task, keyed by task name. If present, the entry is a dictionary that is converted to a set of keyword arguments and passed to the Task's process function. The documentation for each task will provide the list of available parameters.

{
    "tasks": {
        "task-a": {
            "param1": "value1"
        },
        "task-c": {
            "param2": "value2"
        }
    }
}

In the example above, a task named task-a would have param1=value1 passed as a keyword argument, while task-c would have param2=value2 passed. If a task-b were run, it would not be passed any keyword arguments.
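A minimal sketch of how the tasks dictionary maps to keyword arguments. The helper names task_parameters and process_task_a are hypothetical, used only to illustrate the lookup-and-unpack pattern.

```python
payload = {
    "process": {
        "tasks": {
            "task-a": {"param1": "value1"},
            "task-c": {"param2": "value2"},
        }
    }
}

def task_parameters(payload: dict, task_name: str) -> dict:
    # Tasks without an entry receive no keyword arguments
    return payload["process"]["tasks"].get(task_name, {})

def process_task_a(param1=None):
    return param1

print(process_task_a(**task_parameters(payload, "task-a")))  # value1
print(task_parameters(payload, "task-b"))  # {}
```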

TaskConfig Object

DEPRECATED: tasks should be a dictionary of parameters, with task names as keys. See tasks for more information.

A Task Configuration contains information for running a specific task.

Field Name Type Description
name str REQUIRED Name of the task
parameters Map<str, str> Dictionary of keyword parameters that will be passed to the Task's process function

Full Process Definition Example

Process definitions are sometimes called "Payloads":

{
    "description": "My process configuration",
    "upload_options": {
        "path_template": "s3://my-bucket/${collection}/${year}/${month}/${day}/${id}",
        "collections": {
            "landsat-c2l2": "$[?(@.id =~ 'LC08.*')]"
        }
    },
    "tasks": {
        "task-name": {
            "param": "value"
        }
    }
}

Migration

0.4.x -> 0.5.x

In 0.5.0, the previous use of fsspec to download Item Assets has been replaced with the stac-asset library. This has necessitated a change in the parameters that the download methods accept.

The primary change is that the Task methods download_item_assets and download_items_assets (items plural) now accept fewer explicit and implicit (kwargs) parameters.

Previously, the methods looked like:

def download_item_assets(
    self,
    item: Item,
    path_template: str = "${collection}/${id}",
    keep_original_filenames: bool = False,
    **kwargs: Any,
) -> Item:

but now look like:

def download_item_assets(
    self,
    item: Item,
    path_template: str = "${collection}/${id}",
    config: Optional[DownloadConfig] = None,
) -> Item:

Similarly, the asset_io package methods were previously:

async def download_item_assets(
    item: Item,
    assets: Optional[list[str]] = None,
    save_item: bool = True,
    overwrite: bool = False,
    path_template: str = "${collection}/${id}",
    absolute_path: bool = False,
    keep_original_filenames: bool = False,
    **kwargs: Any,
) -> Item:

and are now:

async def download_item_assets(
    item: Item,
    path_template: str = "${collection}/${id}",
    config: Optional[DownloadConfig] = None,
) -> Item:

Additionally, arbitrary kwargs were passed through to fsspec as configuration. The most common was requester_pays, which sets the Requester Pays flag on AWS S3 requests.

Many of these parameters can be directly translated into configuration passed in a DownloadConfig object, which is just a wrapper over the stac_asset.Config object.

Migration of these various parameters to DownloadConfig is as follows:

  • assets: set include
  • requester_pays: set s3_requester_pays = True
  • keep_original_filenames: set file_name_strategy to FileNameStrategy.FILE_NAME if True or FileNameStrategy.KEY if False
  • overwrite: set overwrite
  • save_item: none, Item is always saved
  • absolute_path: none. To create or retrieve the Asset hrefs as absolute paths, use either Item#make_all_asset_hrefs_absolute() or Asset#get_absolute_href()
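As a sketch, the mapping above could be applied mechanically like this. migrate_download_kwargs is a hypothetical helper, and the DownloadConfig fields are represented here as a plain dict rather than the real stac_asset.Config object.

```python
# Hypothetical helper translating pre-0.5 download kwargs into the
# equivalent DownloadConfig fields (shown here as a plain dict).
def migrate_download_kwargs(old: dict) -> dict:
    config = {}
    if "assets" in old:
        config["include"] = old["assets"]
    if old.get("requester_pays"):
        config["s3_requester_pays"] = True
    if "keep_original_filenames" in old:
        config["file_name_strategy"] = (
            "FILE_NAME" if old["keep_original_filenames"] else "KEY"
        )
    if "overwrite" in old:
        config["overwrite"] = old["overwrite"]
    # save_item and absolute_path have no DownloadConfig equivalent
    return config

print(migrate_download_kwargs({"assets": ["data"], "requester_pays": True}))
# {'include': ['data'], 's3_requester_pays': True}
```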

Development

Clone, install in editable mode with development requirements, and install the pre-commit hooks:

git clone https://github.com/stac-utils/stac-task
cd stac-task
pip install -e '.[dev]'
pre-commit install

To run the tests:

pytest

To lint all the files:

pre-commit run --all-files

Contributing

Use GitHub issues and pull requests.

stac-task's People

Contributors: dependabot[bot], drnextgis, gadomski, ircwaves, jkeifer, jsignell, matthewhanson, philvarner

stac-task's Issues

DownloadError with no message

In the middle of download_items_assets execution I'm getting a DownloadError with no message:

---------------------------------------------------------------------------
DownloadError                             Traceback (most recent call last)
Cell In [4], line 19
      8 results = catalog.search(
      9     filter={
     10         "op": "and",
   (...)
     15     }
     16 )
     17 items = results.items()
---> 19 await download_items_assets(items, path_template="input_chunks/${id}")

File .env/lib/python3.10/site-packages/stactask/asset_io.py:40, in download_items_assets(items, path_template, config, keep_non_downloaded)
     34 async def download_items_assets(
     35     items: Iterable[Item],
     36     path_template: str = "${collection}/${id}",
     37     config: Optional[DownloadConfig] = None,
     38     keep_non_downloaded: bool = True,
     39 ) -> list[Item]:
---> 40     return await asyncio.gather(
     41         *[
     42             asyncio.create_task(
     43                 download_item_assets(
     44                     item=item,
     45                     path_template=path_template,
     46                     config=config,
     47                     keep_non_downloaded=keep_non_downloaded,
     48                 )
     49             )
     50             for item in items
     51         ]
     52     )

File .env/lib/python3.10/site-packages/stactask/asset_io.py:25, in download_item_assets(item, path_template, config, keep_non_downloaded)
     19 async def download_item_assets(
     20     item: Item,
     21     path_template: str = "${collection}/${id}",
     22     config: Optional[DownloadConfig] = None,
     23     keep_non_downloaded: bool = True,
     24 ) -> Item:
---> 25     return await stac_asset.download_item(
     26         item=item.clone(),
     27         directory=LayoutTemplate(path_template).substitute(item),
     28         file_name="item.json",
     29         config=config,
     30         keep_non_downloaded=keep_non_downloaded,
     31     )

File .env/lib/python3.10/site-packages/stac_asset/_functions.py:234, in download_item(item, directory, file_name, infer_file_name, config, messages, clients, keep_non_downloaded)
    229 async with Downloads(
    230     config=config or Config(),
    231     clients=clients,
    232 ) as downloads:
    233     await downloads.add(item, Path(directory), file_name, keep_non_downloaded)
--> 234     await downloads.download(messages)
    236 self_href = item.get_self_href()
    237 if self_href:

File .env/lib/python3.10/site-packages/stac_asset/_functions.py:172, in Downloads.download(self, messages)
    170             exceptions.append(result.error)
    171 if exceptions:
--> 172     raise DownloadError(exceptions)

DownloadError: 

Add ability to bootstrap new task

Looking for some way (a CLI seems the most obvious) to bootstrap a new task repository. This would include for example:

  • standard Python project files
  • skeleton task.py file
  • skeleton Dockerfile

I've been finding a great deal of variation in how various task repos are laid out, so some guidance is definitely needed.

Construct new output in handler

Currently the handler takes the array of Items returned by process, puts them in the features field of the input payload, and returns it.

This means the output payload will retain any additional fields from the input payload, which may not be what is intended. The output of a Task should be a valid ItemCollection with an id and process definition, whereas the input could be any JSON at all, as long as it has a process definition and, optionally, an id.

Construct a new ItemCollection output to return, populating with the output from the Task.process into the features field and copying over the process definition.
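A minimal sketch of the proposed behavior, using only the fields named above. build_output is a hypothetical helper, not existing stac-task API.

```python
# Sketch of the proposed behavior: build a fresh ItemCollection rather than
# mutating the input payload (build_output is a hypothetical helper).
def build_output(input_payload: dict, items: list[dict]) -> dict:
    output = {
        "type": "FeatureCollection",
        "features": items,
        "process": input_payload["process"],
    }
    if "id" in input_payload:
        output["id"] = input_payload["id"]
    return output

payload = {"id": "run-1", "process": {}, "extra_field": "dropped"}
out = build_output(payload, [{"id": "item-1"}])
print("extra_field" in out)  # False
```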

Support for other cloud storage providers

This repo is very interesting. Looks like only S3 storage is supported at the moment. Does the roadmap include adding support for other cloud providers like Azure Blob?

Add option for not including processing:software field

The Processing Extension states that its intent is to describe software that was used to process the data, rather than the metadata. Not all tasks process the data, so processing:software shouldn't be added in those cases. There should be a flag or something in the Task that indicates whether the field should be added.

Add option to download assets with original filename

Currently stactask will download assets and name the file as the asset key plus the original extension, thereby keeping the extension but possibly (probably) renaming the file.

Sometimes a subprocess will expect the file to be named the same as it was originally. This should be offered as an option.

Collections should be an array?

Per the documentation in the README:

The collections dictionary provides a collection ID and JSONPath pattern for matching against STAC Items. At the end of processing, before the final STAC Items are returned, the Task class can be used to assign all of the Items to specific collection IDs. For each Item the JSONPath pattern for all collections will be compared. The first match will cause the Item's Collection ID to be set to the provided value.

This sounds fine except that dictionaries are maps in json and do not have any guarantees about order preservation, i.e., maps in the json spec are considered unordered.

Best practices typically suggest the use of arrays where ordering is meaningful, and maps where uniqueness is required. In this case ordering is meaningful and mandating collection name uniqueness could be problematic for some use cases (think cases where multiple patterns might be used to check for and assign collection membership to a single collection). So it seems like collections should be an array of collections-matching objects (CollectionsMatchers?).

I'd propose this "CollectionMatcher" object at minimum contain a type and collection_name property. The type would be used to resolve a matcher from a discriminated union of supported matchers. To start we'd support only one type, jsonpath, which also requires a pattern property. With this idea the example from the README becomes:

"collections": [
    {
        "type": "jsonpath",
        "pattern": "$[?(@.id =~ 'LC08.*')]",
        "collection_name": "landsat-c2l2"
    }
]

rename package to stac-workflow

The name stac-task may be misleading due to the use of "tasking" with regard to ordering future collects of data from a satellite or remote platform. See https://github.com/Element84/stat-api-spec

In this context a "task" is a processing function that takes in 0 or more STAC Items and returns 1 or more STAC Items. Where used, the term workflow refers to a series of tasks chained together.

Suggest renaming repo to stac-process to better reflect what it does.

Sort stac_extensions

During post processing of STAC Items (currently in the handler, when the task name and version are added), the stac_extensions should be sorted. This will ensure consistent output files that only differ in the order in which extensions were added.
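For example, the post-processing step could normalize extension order with the stdlib. item is a plain Item dict here, and sorted(set(...)) both orders and de-duplicates.

```python
# Sorting (and de-duplicating) stac_extensions so that otherwise-identical
# outputs compare equal regardless of the order extensions were added.
item = {
    "stac_extensions": [
        "https://stac-extensions.github.io/projection/v1.1.0/schema.json",
        "https://stac-extensions.github.io/eo/v1.1.0/schema.json",
    ]
}

item["stac_extensions"] = sorted(set(item["stac_extensions"]))
```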

Types of Tasks

In @gadomski's PR #42 several types of tasks are defined.

class Task(BaseModel, ABC, Generic[Input, Output]):
    """A generic task."""

class PassthroughTask(Task[Anything, Anything]):
    """A simple task that doesn't modify the items at all."""

class StacOutputTask(Task[Input, Item], ABC):
    """Anything in, STAC out task."""

class ItemTask(StacOutputTask[Item], ABC):
    """STAC In, STAC Out task."""

class HrefTask(StacOutputTask[Href], ABC):
    """Href in, STAC Out task."""

I really like this way to define the input and output for different types of tasks, especially if it gives us JSON Schema!

Want to review these two Tasks:
StacOutputTask - Anything in, STAC out task.
HrefTask - Href in, STAC Out task

These tasks capture the need to create STAC Items from scratch. In the current payload structure you pass parameters to the task in the process definition; you don't hand them in as part of the Task Input (which would normally be a FeatureCollection). So the href (or multiple hrefs), along with other parameters, would be provided in the process.tasks.taskname.parameterfield. I think that should be the preferred model, and Input/Output is always going to be STAC Items, or nothing.

Next is the ItemTask, which is defined over a single Item, but stac-task tasks currently operate on ItemCollections. A STAC task can take in 1 or more STAC Items as input and return 1 or more STAC Items. Note that this is not 1:1; a task doesn't process each item independently to create an array of output items (although you could write a task that does that). A task might take in one Item and create two derived Items from it, or it might take in an Item of data and a few other Items of auxiliary data used in the processing to create a single output Item.

Each task would have requirements on the number of input Items.

So I'd propose
StacOutputTask - Nothing in, STAC out task
ItemCollectionTask - ItemCollection in, ItemCollection out

I suppose we could also have an ItemTask for single Item input and output (a most common scenario), but I'm not sure I see the advantage over using ItemCollection with 1 Item.

Support for `process` arrays

We've had a bit of a disconnect between this lib and cirrus for some time. Cirrus supports an array of process definitions in payloads under the process key, whereas stac-task only supports a single process definition object. The reason for a process array is to support workflow chaining.

This disconnect is only increasing, with cirrus v1 supporting only process arrays, for consistency and simplicity (only one right way to do things). We also chose to only support array values for process in swoop (and we had to add workarounds in each task we implemented because of this). Without a change to support process array values, stac-task based tasks are not compatible with swoop and will no longer be compatible with cirrus out-of-the-box.
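A hypothetical compatibility shim illustrating the normalization; normalize_process is not part of stac-task.

```python
# Hypothetical shim (not part of stac-task): accept either a single process
# definition object or an array, normalizing to the array form cirrus expects.
def normalize_process(process):
    return process if isinstance(process, list) else [process]

print(normalize_process({"tasks": {}}))    # [{'tasks': {}}]
print(normalize_process([{"tasks": {}}]))  # [{'tasks': {}}]
```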

RuntimeError: This event loop is already running

When I run this code with Python 3.9:

import asyncio
from stactask.asset_io import download_items_assets
from pystac_client import Client

catalog = Client.open(...)
results = catalog.search(...)
items = results.items()

loop = asyncio.get_event_loop()
loop.run_until_complete(download_items_assets(items, path_template="input_chunks/${id}"))

I'm getting this error:

---------------------------------------------------------------------------
RuntimeError                              Traceback (most recent call last)
Cell In[20], line 1
----> 1 loop.run_until_complete(download_items_assets(items, path_template="input_chunks/${id}"))

File ~/.pyenv/versions/3.9.0/lib/python3.9/asyncio/base_events.py:618, in BaseEventLoop.run_until_complete(self, future)
    607 """Run until the Future is done.
    608 
    609 If the argument is a coroutine, it is wrapped in a Task.
   (...)
    615 Return the Future's result, or raise its exception.
    616 """
    617 self._check_closed()
--> 618 self._check_running()
    620 new_task = not futures.isfuture(future)
    621 future = tasks.ensure_future(future, loop=self)

File ~/.pyenv/versions/3.9.0/lib/python3.9/asyncio/base_events.py:578, in BaseEventLoop._check_running(self)
    576 def _check_running(self):
    577     if self.is_running():
--> 578         raise RuntimeError('This event loop is already running')
    579     if events._get_running_loop() is not None:
    580         raise RuntimeError(
    581             'Cannot run the event loop while another loop is running')

RuntimeError: This event loop is already running
stactask==0.5.0
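The error arises because Jupyter already runs an event loop, so a second loop.run_until_complete cannot be started; inside a notebook the coroutine can simply be awaited. Outside a notebook, asyncio.run is the usual entry point. A small illustrative guard, where run_coro and work are hypothetical names (work stands in for download_items_assets):

```python
import asyncio

# 'work' is a hypothetical coroutine standing in for download_items_assets
async def work():
    return 42

def run_coro(coro):
    """Run a coroutine, but fail clearly if a loop is already running
    (as it is inside Jupyter, where you should just 'await' instead)."""
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        return asyncio.run(coro)  # no loop running: safe to start one
    raise RuntimeError("event loop already running; use 'await coro' instead")

print(run_coro(work()))  # 42
```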

Task.upload_item_assets_to_s3 has wrong typing for item argument.

Task.upload_item_assets_to_s3 has type Dict for the item argument: https://github.com/stac-utils/stac-task/blob/4ff3c9e51e83fd88ed7aea639a70c80dae555a95/stactask/task.py#L192C9-L192C33

but that function (for the most part) just forwards the call to asset_io.upload_item_assets_to_s3, which has type Item for item.

Looking at the asset_io code, it should be type Item.

Add local asset methods

These utility methods have been useful in Task implementations and should be added to the base Task class

def is_local_asset(self, asset: Asset) -> bool:
    return bool(asset.href.startswith(str(self._workdir)))

def get_local_asset_keys(self, item: Item) -> list[str]:
    return [key for key, asset in item.assets.items() if self.is_local_asset(asset)]

DeprecationWarning: There is no current event loop

Running tests yields deprecation warning

tests/test_task_download.py::test_download_nosuch_asset
  /Users/philvarner/code/stac-task/stactask/task.py:238: DeprecationWarning: There is no current event loop
    loop = asyncio.get_event_loop()

-- Docs: https://docs.pytest.org/en/stable/how-to/capture-warnings.html

Description of the issue is here: https://stackoverflow.com/questions/73361664/asyncio-get-event-loop-deprecationwarning-there-is-no-current-event-loop
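The warning can be illustrated in isolation: on Python 3.10+, calling asyncio.get_event_loop() with no running loop is deprecated, while asyncio.run creates, uses, and closes a fresh loop. fetch is a hypothetical coroutine standing in for the download methods.

```python
import asyncio

# 'fetch' is a hypothetical coroutine standing in for the download methods
async def fetch():
    return "done"

# Deprecated pattern (warns on Python 3.10+ when no loop is running):
#   loop = asyncio.get_event_loop()
#   result = loop.run_until_complete(fetch())

# Preferred: asyncio.run creates, uses, and closes a fresh event loop
result = asyncio.run(fetch())
print(result)  # done
```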

Add Item post processing function

Currently the handler does some post processing on the Items (it adds in the task name and version).

This should be factored out into its own function that can be extended by users subclassing Task.

Also see #22

Incorrect documentation on `collections` location in `ProcessDefinition` object

In several locations, the README indicates the collections field should be in the top-level of the ProcessDefinition object. However, task.py accesses collections from the UploadOptions object within the ProcessDefinition object.

The example process definition given in task.py shows the correct location of the collections field.

mypy error Class cannot subclass "Task" (has type "Any")

I have code like:

class MyDatasetToStac(Task): 

That returns this error from mypy:

task.py:30: error: Class cannot subclass "Task" (has type "Any")  [misc]

The fix is to add:

class MyDatasetToStac(Task):  # type: ignore

However, it would be nice if stactask provided the correct type information so that this was not necessary.

Description of this general issue is here: https://stackoverflow.com/questions/49888155/class-cannot-subclass-qobject-has-type-any-using-mypy

Add find_collection method

def find_collection(self, item_dict: dict[str, Any]) -> Optional[str]:
    return next(
        (
            c
            for c, expr in self.upload_options.get("collections", {}).items()
            if stac_jsonpath_match(item_dict, expr)
        ),
        None,
    )

Add --save-workdir to the README

It would be helpful to highlight --save-workdir as a flag in the README. I had forgotten about it; the main problem is that forgetting it causes developers (🤚) to do unintended working-directory things.

We may also want to discuss the separation of data from "your code", why the workdir is a tmp dir and that it gets deleted by default.

Change validate to instance method

Currently, Task.validate is marked as a class method, but there are scenarios where having access to the task instance within the validate method would be beneficial. For example, if a STAC item property name is passed as a task parameter and we need to validate that all input items possess that property, access to self.parameters would be useful. What do you think about changing validate to a regular instance method?
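A minimal stand-in sketching what an instance-method validate could look like. MyTask and the required_property parameter are illustrative, not stac-task API.

```python
# Minimal stand-in for Task showing validate as an instance method with
# access to self.parameters (class and parameter names are illustrative).
class MyTask:
    def __init__(self, items: list[dict], parameters: dict):
        self.items = items
        self.parameters = parameters

    def validate(self) -> bool:
        required = self.parameters.get("required_property")
        if required is None:
            return True
        return all(required in item["properties"] for item in self.items)

task = MyTask(
    items=[{"properties": {"eo:cloud_cover": 10}}],
    parameters={"required_property": "eo:cloud_cover"},
)
print(task.validate())  # True
```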

In some cases, add more version information to processing metadata

While we'd like to think that we're always running code that's been tagged with a version, in reality we're sometimes (often?!) running an unreleased version. It'd be useful to provide a switch to turn on commit SHA version information in:

item["properties"]["processing:software"] = {cls.name: cls.version}

E.g. maybe:

{
    "processing:software": {
        "my-awesome-task": "0.1.0 @ abcdef0123456789"
    }
}
