Code Monkey home page Code Monkey logo

kedro-wings's Introduction

Kedro Wings

As Seen on YouTube DataEngineerOne:

Give your next kedro project Wings! The perfect plugin for brand new pipelines, and new kedro users. This plugin enables easy and fast creation of datasets so that you can get straight into coding your pipelines.

Quick Start Usage Example: Iris Example

The following example is a recreation of the iris example pipeline.

Kedro Wings enables super fast creation of pipelines by taking care of all the catalog work for you. Catalog entries are automatically created by parsing the values for your nodes' inputs and outputs.

This pipeline automatically creates a dataset that reads from the iris.csv and then it creates 12 more datasets, corresponding to the outputs and inputs of the other datasets.

wing_example = Pipeline([
    node(
        split_data,
        inputs=['01_raw/iris.csv', 'params:example_test_data_ratio'],
        outputs=dict(
            train_x="02_intermediate/example_train_x.csv",
            train_y="02_intermediate/example_train_y.csv",
            test_x="02_intermediate/example_test_x.csv",
            test_y="02_intermediate/example_test_y.csv")
        ),
    node(
        train_model,
        ["02_intermediate/example_train_x.csv", "02_intermediate/example_train_y.csv", "parameters"],
        outputs="06_models/example_model.pkl",
    ),
    node(
        predict,
        inputs=dict(
            model="06_models/example_model.pkl",
            test_x="02_intermediate/example_test_x.csv"
        ),
        outputs="07_model_output/example_predictions.pkl",
    ),
    node(
        report_accuracy,
        inputs=["07_model_output/example_predictions.pkl", "02_intermediate/example_test_y.csv"],
        None
    ),
])

Quick Start Example: Chronocoding

Watch the video on Chronocoding here: Easy Stateful Pipelines with Chronocoding and Kedro Wings

Sometimes, there arises a need to rewrite data to the same path. This makes it easier to save state between kedro runs. Using KedroWings, you can automatically generate chronocoded datasets which temporally separates a read and write to a dataset.

By adding an ! at the end of a dataset, we signal to kedro that we wish to overwrite the data in that same filepath. Thus, we get around kedro's DAG requirement for datasets.

In Depth Breakdown on Chronocoding here: [KED-1667] Chronocoding: Solving the Problem of State Tracking with Temporally Sensitive DAGs

def state_modifier(state: str) -> str:
    current_value = int(state)
    new_value = current_value + 1
    return str(new_value)

def create_pipelines(**kwargs):
    return Pipeline([
        node(
            state_modifier,
            inputs="01_raw/state.txt",
            outputs="01_raw/state.txt!"
        ),
    ])

Installation

Kedro Wings is available on pypi, and is installed with kedro hooks.

pip install kedro-wings

Setup with Kedro Pipeline

Simply add a KedroWings instance to the ProjectContext hooks tuple.

from kedro_wings import KedroWings


class ProjectContext(KedroContext):
    hooks = (
        KedroWings(),
    )

Setup with Jupyter Notebook

Simply pass the kedro context into KedroWings, and it will automatically add all catalog entries from all available pipelines.

# Load the context if not using a kedro jupyter notebook
from kedro.framework.context import load_context
context = load_context('./')

# Pass the context into KedroWings
from kedro_wings import KedroWings
KedroWings(context=context)

# context catalog now has all wings datasets available.
context.catalog.list()

Usage

Catalog Creation

Catalog entries are created using dataset input and output strings. The API is simple:

inputs="[PATH]/[NAME].[EXT]"

The PATH portion determines the directory where a file will be saved. The NAME portion determines the final output name of the file to be saved. The EXT portion determines the dataset used to save and load that particular data.

Ex: Creating an iris.csv reader
node(split_data, inputs='01_raw/iris.csv', outputs='split_data_output')

This will create a pandas.CSVDataSet pointing at the 01_raw/iris.csv file.

Ex: Overwrite a Kedro Wing dataset using catalog.yml
# pipeline.py
node(split_data, inputs='01_raw/iris.csv', outputs='split_data_output')
# catalog.yml
01_raw/iris.csv':
    type: pandas.CSVDataSet
    filepath: data/01_raw/iris.csv

If a catalog entry already exists inside of catalog.yml, with a name that matches the wing catalog name, KedroWings will NOT create that catalog, and will instead defer to the catalog.yml entry.

Default Datasets

The following are the datasets available by default.

default_dataset_configs={
".csv": {"type": "pandas.CSVDataSet"},
".yml": {"type": "yaml.YAMLDataSet"},
".yaml": {"type": "yaml.YAMLDataSet"},
".xls": {"type": "pandas.ExcelDataSet"},
".txt": {"type": "text.TextDataSet"},
".png": {"type": "pillow.ImageDataSet"},
".jpg": {"type": "pillow.ImageDataSet"},
".jpeg": {"type": "pillow.ImageDataSet"},
".img": {"type": "pillow.ImageDataSet"},
".pkl": {"type": "pickle.PickleDataSet"},
".parquet": {"type": "pandas.ParquetDataSet"},
".json": {"type": "json.JSONDataSet"}, # Only available in kedro 0.16.3
}

Configuration

Kedro Wings supports configuration on instantiation of the hook.

KedroWings(dataset_configs, paths, root, namespaces, enabled, context)

dataset_configs

:param dataset_configs: A mapping of file name extensions to the type of dataset to be created.

This allows the default dataset configurations to be overridden. This also allows the default extension to dataset mapping to be overridden or extended for other datasets.

Longer extensions are prioritized over shorter extensions, meaning multiple encoding methods can be applied to a single filetype.

Ex: Make default csv files use pipes as separators
KedroWings(dataset_configs={
    '.csv': {'type': 'pandas.CSVDataSet', 'sep': '|'},
})
Ex: Use dataset types directly
from kedro.extras.dataset import pandas
KedroWings(dataset_configs={
    '.csv': pandas.CSVDataSet,
})
Ex: Save CSVs with pipes or commas
from kedro.extras.dataset import pandas
KedroWings(dataset_configs={
    '.comma.csv': pandas.CSVDataSet,
    '.pipe.csv': {'type': 'pandas.CSVDataSet', 'sep': '|'},
})

paths

This allows specified paths to be remapped

:param paths: A mapping of old path names to new path names.
Ex: Moving data from 06_models to a new_models folder
KedroWings(paths={
    '06_models': 'new_models',
})

root

This setting is prepended to any paths parsed. This is useful if the dataset supports fsspec.

:param root: The root directory to save files to. Default: data
Ex: Saving data to s3 instead of the local directory.
KedroWings(root='s3a://my-bucket/kedro-data')
Ex: Allow individual datasets to choose their root
KedroWings(root=None)

namespaces

Namespaces from modular pipelines are supported. This parameter should be a list of the namespaces that KedroWings should account for. If a namespace is encountered, the output filepath will include the namespace in the extension.

Ex: Namespace

The determined file paths would be iris.example1.csv and iris2.example2.csv.

KedroWings(namespaces=['example1'])

pipeline(Pipeline([node(lambda x: x, inputs='iris.csv', outputs='iris2.csv')]), namespace="example1")

enabled

This setting allows easy enabling and disabling of the plugin.

:param enabled: Convenience flag to enable or disable this plugin.
Ex: Use an environment variable to enable or disable wings
KedroWings(enabled=os.getenv('ENABLE_WINGS'))

kedro-wings's People

Contributors

tam-nguyen-vn avatar tamsanh avatar waylonwalker avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar

kedro-wings's Issues

Feature Request: support dataset versioning

Versioning in Kedro is great. Even though we have an alternative to use different root for KedroWings, I believe it would be much better to support versions out of the box.

We can do that by adding extra parameter to KedroWings, namely versioned: bool = False. When it set True, we can update the KedroWings._dataset_configs. I've tried it by subclassing from KedroWings and it works fine.

Feature Request: cast Kedro Wings spells on `after_catalog_created`, not `before_pipeline_run`

First of all, thank you for the great and simple way to checkpoint my pipelines! You already saved a lot of my time.

Now, according to the current implementation, Kedro Wings transforms datasets only for the pipeline which was requested to run. This is okay in the most of the cases. However, it would be better to process datasets from all pipelines.

For example:

  • it will allow to use "fixed" datasets inside the kedro ipython
  • or to see them in kedro viz
  • this also works better with %reload_kedro IPython/Jupyter magic.

At the moment we can hack our way by inheriting from KedroWings, replacing before_pipeline_run to suppress this hook and creating a hook for after_catalog_created:

class EarlyBird(KedroWings):
    def __init__(self, *args, **kwargs):
        super(EarlyBird, self).__init__(*args, **kwargs)
        self._context: Optional[KedroContext] = None

    def set_context(self, context: KedroContext):
        self._context = context

    def before_pipeline_run(
            self, run_params: Dict, pipeline: Pipeline, catalog: DataCatalog, name: str = None,
    ):
        log.info(f'Adding pipeline "{name}" checkpoints with Kedro Wings...')
        super(EarlyBird, self).before_pipeline_run(run_params, pipeline, catalog)

    @hook_impl
    def after_catalog_created(self, catalog: DataCatalog, *args, **kwargs):
        for name, pipeline in self._context.pipelines.items():
            self.before_pipeline_run(None, pipeline, catalog, name)

However, the problem with this solution is that we need to pass context to the plugin. For example (in settings.py for Kedro 0.17.0):

class ProjectContext(KedroContext):
    def __init__(self, *args, **kwargs):
        super(ProjectContext, self).__init__(*args, **kwargs)
        load_dotenv()

        for item in HOOKS:   # in Kedro 0.17.0 hooks are defined as a config variable in the `settings.py`
            if isinstance(item, EarlyBird):
                item.set_context(self)

I personally not very happy with this solution because it breaks Framework API and may cause problems with next Kedro versions (I've just upgraded from 0.16.6 to 0.17.0 without much pain but who knows).

Feature Request: support multiple roots

For now we either define "global" root for all checkpoints or have to specify separate root for each type of checkpoints. This is okay but sometimes we need to transfer data between locations.

Possible scenarios:

  • checkpoint some of the datasets locally
  • or publish reports to the different S3 bucket

For now to support such behavior I am overriding datasets in the catalog. But I believe we can make it much better by introducing locations and tag inputs/outputs with location tags. For example:

  • 01_raw/base.csv will be check-pointed in the default root location.
  • while 01_raw/base.csv#reports will be saved to the reports location.

I am not sure whether #* suffix is the best API though. Looks like the possible interference with dataset transcoding convention should be considered.

Feature Request: support dataset layers

There is a handy feature in Kedro: special tags for datasets according to the data engineering convention. It is quite useful in combination with kedro viz or (as in my case) for creating a UI on top of Kedro and sorting/filtering datasets according to their position in a pipeline.

I think we need to support this feature and provide some way to specify the layer for pipeline checkpoints.

Currently I am subclassing KedroWings and use simple rules based on the first two letters of dataset locations:

class ExplicitLark(KedroWings):
    LAYERS: Dict[str, str] = {
        '01': 'raw',
        '02': 'intermediate',
        '03': 'primary',
        '04': 'feature',
        '05': 'model_input',
        '06': 'model',
        '07': 'model_output',
        '08': 'reporting',
    }

   @hook_impl    
   def before_pipeline_run(
            self, run_params: Dict, pipeline: Pipeline, catalog: DataCatalog, name: str = None,
    ):
        super(EarlyBird, self).before_pipeline_run(run_params, pipeline, catalog)
        self._update_layers(catalog)

    def _update_layers(self, catalog: DataCatalog):
        for dataset_name in catalog.list(regex_search=r'^\d+_.*'):
            layer_code = dataset_name[:2]
            if layer_code in self.LAYERS:
                layer_name = self.LAYERS[layer_code]
                catalog.layers[layer_name] = catalog.layers.get(layer_name, set())
                catalog.layers[layer_name].add(dataset_name)

I think we can provide additional parameter to the KedroWings which accepts dictionary with regular expression -> layer name and use the default convention for XX_* datasets.

Support for namespaced modular pipelines

Hello,

Thanks for the great plug-in! I am trying to figure out how to save different model outputs into different folders. I followed your tutorial for generating pipelines using modular pipelines and separate namespaces .

 node(
   func=inference_single_model,
   inputs=["test_subset_s3uris", "params:model_name"],
   outputs="07_model_output/testset_predictions.csv")  # not in catalog; Kedro-Wings hook handles saving

However, the namspace prefixes the pipeline_key in front of outputs, so instead of being stored in data/07_model_output/MyPipelineKey.testset_predictions.csv it is saved in data/MyPipelineKey.07_model_output/testset_predictions.csv.

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    ๐Ÿ–– Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. ๐Ÿ“Š๐Ÿ“ˆ๐ŸŽ‰

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google โค๏ธ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.