
dataloader's Introduction


The merlin-dataloader lets you quickly train recommender models with TensorFlow, PyTorch, and JAX. It removes the biggest bottleneck in training recommender models by providing GPU-optimized dataloaders that read data directly into GPU memory and then hand it to TensorFlow and PyTorch through a zero-copy transfer using DLPack.

The benefits of the Merlin Dataloader include:

  • More than 10x speedup over native framework dataloaders
  • Handles larger than memory datasets
  • Per-epoch shuffling
  • Distributed training

Installation

Merlin-dataloader requires Python version 3.7+. Additionally, GPU support requires CUDA 11.0+.

To install using Conda:

conda install -c nvidia -c rapidsai -c numba -c conda-forge merlin-dataloader python=3.7 cudatoolkit=11.2

To install from PyPI:

pip install merlin-dataloader

There are also Docker containers on NGC that include merlin-dataloader and its dependencies.

Basic Usage

# Get a merlin dataset from a set of parquet files
import merlin.io
dataset = merlin.io.Dataset(PARQUET_FILE_PATHS, engine="parquet")

# Create a TensorFlow dataloader from the dataset, loading 65K items
# per batch
from merlin.dataloader.tensorflow import Loader
loader = Loader(dataset, batch_size=65536)

# Get a single batch of data. Inputs will be a dictionary mapping
# column names to TensorFlow tensors
inputs, target = next(loader)

# Train a Keras model with the dataloader
import tensorflow as tf
model = tf.keras.Model( ... )
model.fit(loader, epochs=5)

dataloader's People

Contributors

ajschmidt8, arturkasymov, ayodeawe, benfred, bschifferer, edknv, evenoldridge, jperez999, karlhigley, mikemckiernan, nv-alaiacano, oliverholworthy, radekosmulski, sararb


dataloader's Issues

Feed pre-trained embeddings to NVTabular

What is your question?
I have a dataset that includes a column of pre-trained embeddings. I couldn't find any documentation or examples on how this column should be passed to NVTabular. Is it treated as a continuous feature?

Out-of-memory error when iterating over merlin.dataloader.torch.Loader

I run into an out-of-memory error when iterating over the Merlin Loader in a Lightning Datamodule:

import os
from math import ceil
from os.path import join
from typing import Dict, List

import lightning.pytorch as pl
import merlin.io
import pyarrow as pa
from merlin.dataloader.torch import Loader
from merlin.dtypes import float32, int64
from merlin.schema import ColumnSchema, Schema

def _merlin_dataset_factory(path: str, columns: List[str], dataset_kwargs: Dict):
    return merlin.io.Dataset(
        path,
        engine='parquet',
        schema=Schema(
            [
                ColumnSchema(
                    'X', dtype=float32,
                    is_list=True, is_ragged=False,
                    properties={'value_count': {'max': n_cols}}
                )
            ] +
            [ColumnSchema(col, dtype=int64) for col in columns]
        ),
        **dataset_kwargs
    )

def _set_default_kwargs_dataloader(kwargs: Dict[str, any], train: bool = True):
    if kwargs is None:
        kwargs = {}

    parts_per_chunk = 8 if train else 1
    drop_last = True if train else False
    shuffle = True if train else False

    if 'parts_per_chunk' not in kwargs:
        kwargs['parts_per_chunk'] = parts_per_chunk
    if 'drop_last' not in kwargs:
        kwargs['drop_last'] = drop_last
    if 'shuffle' not in kwargs:
        kwargs['shuffle'] = shuffle

    return kwargs


def _set_default_kwargs_dataset(kwargs: Dict[str, any], train: bool = True):
    if kwargs is None:
        kwargs = {}

    part_size = '100MB' if train else '325MB'

    if all(['part_size' not in kwargs, 'part_mem_fraction' not in kwargs]):
        kwargs['part_size'] = part_size

    return kwargs


class MerlinDataModule(pl.LightningDataModule):

    def __init__(
            self,
            path: str,
            columns: List[str],
            batch_size: int,
            dataloader_kwargs_train: Dict = None,
            dataloader_kwargs_inference: Dict = None,
            dataset_kwargs_train: Dict = None,
            dataset_kwargs_inference: Dict = None
    ):
        super().__init__()

        for col in columns:
            assert col in PARQUET_SCHEMA.names

        self.dataloader_kwargs_train = _set_default_kwargs_dataloader(dataloader_kwargs_train, train=True)
        self.dataloader_kwargs_inference = _set_default_kwargs_dataloader(dataloader_kwargs_inference, train=False)

        self.train_dataset = _merlin_dataset_factory(
            join(path, 'train'),
            columns,
            _set_default_kwargs_dataset(dataset_kwargs_train, train=True)
        )
        self.val_dataset = _merlin_dataset_factory(
            join(path, 'val'),
            columns,
            _set_default_kwargs_dataset(dataset_kwargs_inference, train=False)
        )
        self.test_dataset = _merlin_dataset_factory(
            join(path, 'test'), columns, _set_default_kwargs_dataset(dataset_kwargs_inference, train=False))

        self.batch_size = batch_size

    def train_dataloader(self):
        return Loader(self.train_dataset, batch_size=self.batch_size, **self.dataloader_kwargs_train)

    def val_dataloader(self):
        return Loader(self.val_dataset, batch_size=self.batch_size, **self.dataloader_kwargs_inference)

    def test_dataloader(self):
        return Loader(self.test_dataset, batch_size=self.batch_size, **self.dataloader_kwargs_inference)

    def predict_dataloader(self):
        return Loader(self.test_dataset, batch_size=self.batch_size, **self.dataloader_kwargs_inference)

My versions are:
lightning: 2.0.1
merlin-core: 23.6.0
merlin-dataloader: 23.4.0
pyarrow: 12.0.1

And I call, e.g.,

for batch in datamodule.train_dataloader():
    batch_data = batch[0]['X']
    break

which takes ages and then crashes with out-of-memory.
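As a rough aid for reasoning about the settings above, the following sketch estimates how much data one loader chunk holds. It assumes that the loader concatenates parts_per_chunk partitions of roughly part_size each before slicing batches, and that size strings use decimal units; parse_size and chunk_bytes are hypothetical helpers, not part of merlin. The result is only a lower bound, since shuffling and list columns can add overhead.

```python
import re

# Decimal units are an assumption made for this sketch.
_UNITS = {"B": 1, "KB": 10**3, "MB": 10**6, "GB": 10**9}


def parse_size(text: str) -> int:
    """Convert a size string such as '100MB' into a number of bytes."""
    match = re.fullmatch(r"\s*(\d+(?:\.\d+)?)\s*([KMG]?B)\s*", text.upper())
    if match is None:
        raise ValueError(f"unrecognized size: {text!r}")
    number, unit = match.groups()
    return int(float(number) * _UNITS[unit])


def chunk_bytes(part_size: str, parts_per_chunk: int) -> int:
    """Estimate the bytes held by one chunk (parts_per_chunk partitions)."""
    return parse_size(part_size) * parts_per_chunk


# Training defaults from the data module above: 8 partitions of 100MB each.
train_chunk = chunk_bytes("100MB", 8)
# Inference defaults: 1 partition of 325MB.
inference_chunk = chunk_bytes("325MB", 1)
print(train_chunk, inference_chunk)
```

Lowering part_size or parts_per_chunk reduces this estimate, which may be a first thing to try when the loader runs out of memory.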

PyTorch Loader not working

I tried the PyTorch loader, but it gives me the following error:

BufferError: DLPack only supports signed/unsigned integers, float and complex dtypes.

If I switch to the TensorFlow loader, it works. Here's the code snippet:

import merlin.io

train_dataset = merlin.io.Dataset(train_path, engine="parquet")
from merlin.loader.torch import Loader
# from merlin.dataloader.tensorflow import Loader
dl = Loader(train_dataset, batch_size=4096)
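The error message says that DLPack only accepts integer, float, and complex dtypes, so one possible cause (an assumption, not confirmed in this report) is a column with another dtype, such as strings or timestamps. The sketch below lists such columns from a plain mapping of column name to dtype name; non_dlpack_columns and the example column names are hypothetical and not part of merlin.

```python
# Dtype name prefixes taken from the wording of the error message.
DLPACK_PREFIXES = ("int", "uint", "float", "complex")


def non_dlpack_columns(dtypes):
    """Return the column names whose dtype name is not int/uint/float/complex."""
    return [
        name
        for name, dtype in dtypes.items()
        if not str(dtype).startswith(DLPACK_PREFIXES)
    ]


# Hypothetical schema, for illustration only.
example = {
    "user_id": "int64",
    "price": "float32",
    "title": "object",
    "event_time": "datetime64[ns]",
}
print(non_dlpack_columns(example))
```

Columns reported this way would need to be encoded as numbers or dropped before the data reaches the loader.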

GPU is not detected properly when using SLURM

Hi,

I get the following error when using merlin-dataloader:

Description of the error:
The error seems to occur because the GPU is not detected here: https://github.com/NVIDIA-Merlin/core/blob/2fc6889cc5e9c7fbe497622710bea18c3344e824/merlin/core/compat.py#L26 (I verified that nvml.device_get_count() always returns 0 in my case, even though a GPU is available). I am not sure whether this is because I use SLURM or whether it is a general problem.

Because the HAS_GPU variable is then set to False, the concat function from merlin.core.dispatch uses pd.concat instead of the cudf version, which causes the error.
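The failure mode can be reproduced without any GPU libraries. The sketch below uses stand-in classes (HostFrame and DeviceFrame are hypothetical, not pandas or cudf types) to show how a flag-based branch like the one in the stack trace sends GPU data to the CPU concat function, and how a branch on the data's own type would avoid that. It illustrates the logic only and is not the fix adopted by the library.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class HostFrame:
    """Stand-in for a CPU (pandas-like) dataframe."""
    rows: List[int]


@dataclass
class DeviceFrame:
    """Stand-in for a GPU (cudf-like) dataframe."""
    rows: List[int]


def host_concat(frames):
    # Like the CPU concat in the trace, reject anything that is not a host frame.
    for frame in frames:
        if not isinstance(frame, HostFrame):
            raise TypeError(f"cannot concatenate object of type '{type(frame)}'")
    return HostFrame([row for frame in frames for row in frame.rows])


def device_concat(frames):
    return DeviceFrame([row for frame in frames for row in frame.rows])


def concat_by_flag(frames, has_gpu):
    # Mirrors the branch in the trace: a False flag overrides the data's type.
    if isinstance(frames[0], HostFrame) or not has_gpu:
        return host_concat(frames)
    return device_concat(frames)


def concat_by_type(frames):
    # Hypothetical alternative: dispatch on the type of the data itself.
    if isinstance(frames[0], DeviceFrame):
        return device_concat(frames)
    return host_concat(frames)


gpu_frames = [DeviceFrame([1, 2]), DeviceFrame([3])]
try:
    concat_by_flag(gpu_frames, has_gpu=False)
except TypeError as err:
    print("flag-based dispatch failed:", err)
print(concat_by_type(gpu_frames))
```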

Potential fix:
Replacing this line with HAS_GPU = True fixes the issue and everything works as expected in my case.
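Before patching the library, it may help to check what the job environment exposes. The sketch below reads CUDA_VISIBLE_DEVICES, which schedulers such as SLURM commonly set for GPU jobs; visible_gpu_ids is a hypothetical helper, and whether this variable explains the NVML result in this report is an assumption.

```python
import os


def visible_gpu_ids(env=None):
    """Return the device IDs listed in CUDA_VISIBLE_DEVICES.

    Returns None when the variable is unset (CUDA then exposes all devices)
    and an empty list when it is set but empty (no devices are exposed).
    """
    env = os.environ if env is None else env
    raw = env.get("CUDA_VISIBLE_DEVICES")
    if raw is None:
        return None
    return [token.strip() for token in raw.split(",") if token.strip()]


# Inspect the current process environment, e.g. inside the SLURM job.
print(visible_gpu_ids())
```

If this prints an empty list inside the job, the process was started without any visible GPU, which would point at the job configuration rather than at merlin.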

Environment info:
OS: Rocky Linux 8.7
Python: 3.10.9
merlin-core: 0.10.0
merlin-dataloader: 0.0.4
cudf-cu11: 23.02
rmm-cu11: 23.02
dask-cudf: 23.02

I installed both cudf + merlin via pip:
python -m pip install cudf-cu11==23.02 rmm-cu11==23.02 dask-cudf-cu11==23.02 --extra-index-url https://pypi.nvidia.com/
python -m pip install merlin-dataloader

Stacktrace:

---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)

File ~/Programs/anaconda3/envs/merlin/lib/python3.10/site-packages/merlin/dataloader/loader_base.py:282, in LoaderBase.__next__(self)
    280 def __next__(self):
    281     """Get the next batch."""
--> 282     return self._get_next_batch()

File ~/Programs/anaconda3/envs/merlin/lib/python3.10/site-packages/merlin/dataloader/loader_base.py:349, in LoaderBase._get_next_batch(self)
    347 # get the first chunks
    348 if self._batch_itr is None:
--> 349     self._fetch_chunk()
    351 # try to iterate through existing batches
    352 try:

File ~/Programs/anaconda3/envs/merlin/lib/python3.10/site-packages/merlin/dataloader/loader_base.py:298, in LoaderBase._fetch_chunk(self)
    296 if isinstance(chunks, Exception):
    297     self.stop()
--> 298     raise chunks
    299 self._batch_itr = iter(chunks)

File ~/Programs/anaconda3/envs/merlin/lib/python3.10/site-packages/merlin/dataloader/loader_base.py:767, in ChunkQueue.load_chunks(self, dev)
    765             self.chunk_logic(itr)
    766     else:
--> 767         self.chunk_logic(itr)
    768 except Exception as e:  # pylint: disable=broad-except
    769     self.put(e)

File ~/Programs/anaconda3/envs/merlin/lib/python3.10/site-packages/nvtx/nvtx.py:101, in annotate.__call__.<locals>.inner(*args, **kwargs)
     98 @wraps(func)
     99 def inner(*args, **kwargs):
    100     libnvtx_push_range(self.attributes, self.domain.handle)
--> 101     result = func(*args, **kwargs)
    102     libnvtx_pop_range(self.domain.handle)
    103     return result

File ~/Programs/anaconda3/envs/merlin/lib/python3.10/site-packages/merlin/dataloader/loader_base.py:740, in ChunkQueue.chunk_logic(self, itr)
    737 if spill is not None and not spill.empty:
    738     chunks.insert(0, spill)
--> 740 chunks = concat(chunks)
    741 chunks.reset_index(drop=True, inplace=True)
    742 chunks, spill = self.get_batch_div_chunk(chunks, self.dataloader.batch_size)

File ~/Programs/anaconda3/envs/merlin/lib/python3.10/site-packages/merlin/core/dispatch.py:476, in concat(objs, **kwargs)
    474     return dd.multi.concat(objs)
    475 elif isinstance(objs[0], (pd.DataFrame, pd.Series)) or not HAS_GPU:
--> 476     return pd.concat(objs, **kwargs)
    477 else:
    478     return cudf.core.reshape.concat(objs, **kwargs)

File ~/Programs/anaconda3/envs/merlin/lib/python3.10/site-packages/pandas/util/_decorators.py:311, in deprecate_nonkeyword_arguments.<locals>.decorate.<locals>.wrapper(*args, **kwargs)
    305 if len(args) > num_allow_args:
    306     warnings.warn(
    307         msg.format(arguments=arguments),
    308         FutureWarning,
    309         stacklevel=stacklevel,
    310     )
--> 311 return func(*args, **kwargs)

File ~/Programs/anaconda3/envs/merlin/lib/python3.10/site-packages/pandas/core/reshape/concat.py:294, in concat(objs, axis, join, ignore_index, keys, levels, names, verify_integrity, sort, copy)
     90 @deprecate_nonkeyword_arguments(version=None, allowed_args=["objs"])
     91 def concat(
     92     objs: Iterable[NDFrame] | Mapping[Hashable, NDFrame],
   (...)
    101     copy: bool = True,
    102 ) -> FrameOrSeriesUnion:
    103     """
    104     Concatenate pandas objects along a particular axis with optional set logic
    105     along the other axes.
   (...)
    292     ValueError: Indexes have overlapping values: ['a']
    293     """
--> 294     op = _Concatenator(
    295         objs,
    296         axis=axis,
    297         ignore_index=ignore_index,
    298         join=join,
    299         keys=keys,
    300         levels=levels,
    301         names=names,
    302         verify_integrity=verify_integrity,
    303         copy=copy,
    304         sort=sort,
    305     )
    307     return op.get_result()

File ~/Programs/anaconda3/envs/merlin/lib/python3.10/site-packages/pandas/core/reshape/concat.py:384, in _Concatenator.__init__(self, objs, axis, join, keys, levels, names, ignore_index, verify_integrity, copy, sort)
    379     if not isinstance(obj, (ABCSeries, ABCDataFrame)):
    380         msg = (
    381             f"cannot concatenate object of type '{type(obj)}'; "
    382             "only Series and DataFrame objs are valid"
    383         )
--> 384         raise TypeError(msg)
    386     ndims.add(obj.ndim)
    388 # get the sample
    389 # want the highest ndim that we have, and must be non-empty
    390 # unless all objs are empty

TypeError: cannot concatenate object of type '<class 'cudf.core.dataframe.DataFrame'>'; only Series and DataFrame objs are valid

Thanks for your help!
Felix

Does this work with images?

Do you have any notebooks or tutorials that show how to define a dataloader for image classification with PyTorch?

[BUG] Exception in model when using ragged tensors with tensorflow 2.10.0

Bug description

When using TensorFlow ragged tensors with Merlin Models and the latest tensorflow package (2.10.0) installed from PyPI, we get errors that show up in various places at random. This results in random failures when training a model that uses ragged features.

I've noticed that the more ragged tensors are involved in a model, the more likely it is to fail with an exception.

Steps/Code to reproduce bug

  1. Open the Merlin TensorFlow Docker container
docker run -it --gpus=all --rm nvcr.io/nvidia/merlin/merlin-tensorflow:22.10 bash
> cd /models
> pip install pytest-repeat
  2. Run the tests with the pre-installed version of TensorFlow
pytest -rsx --count 10 tests/unit/tf/transformers/test_block.py::test_transformer_with_causal_language_modeling
tests/unit/tf/transformers/test_block.py ....................
# ============ 20 passed, 81 warnings in 683.13s (0:11:23) =========
  3. Install the latest tensorflow package
pip install --upgrade tensorflow==2.10.0
  4. Run the same tests again
pytest -rsx --count 10 tests/unit/tf/transformers/test_block.py::test_transformer_with_causal_language_modeling
# tests/unit/tf/transformers/test_block.py .FF....FF.....F.....
# ============5 failed, 15 passed, 79 warnings in 660.47s (0:11:00) =============
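To quantify how flaky the test is, the progress string printed by pytest can be turned into a failure rate. The sketch below is a small stand-alone helper (failure_rate is a hypothetical name) that counts the characters "." for a pass, "F" for a failure, and "E" for an error.

```python
def failure_rate(progress: str) -> float:
    """Return the fraction of failed or errored tests in a pytest progress string."""
    outcomes = [char for char in progress if char in ".FE"]
    if not outcomes:
        raise ValueError("no test outcomes found")
    failures = sum(1 for char in outcomes if char in "FE")
    return failures / len(outcomes)


# Progress strings copied from the two runs above.
before_upgrade = failure_rate("....................")
after_upgrade = failure_rate(".FF....FF.....F.....")
print(before_upgrade, after_upgrade)
```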

Expected behavior

Tests should pass with the PyPI-published tensorflow package installed.

Environment details

  • Merlin version: 22.10
  • Platform: Ubuntu WSL-2
  • Python version: 3.8
  • TensorFlow version (GPU):
    • [2.10.0, 2.11.0rc2]
    • 2.10.0+nv22.10 (nvcr.io/nvidia/tensorflow:22.10-tf2-py3)
  • Note on nvidia/tensorflow compatibility: the 22.10 image requires a minimum driver version of R520 or R525 to avoid a CUDA LinkerError:
E           numba.cuda.cudadrv.driver.LinkerError: [222] Call to cuLinkAddData results in UNKNOWN_CUDA_ERROR
E           ptxas application ptx input, line 9; fatal   : Unsupported .version 7.8; current version is '7.7'

Example Errors

From tests/unit/tf/transformers/test_block.py::test_transformer_with_causal_language_modeling

In Embedding lookup
>       testing_utils.model_test(model, loader, run_eagerly=run_eagerly, reload_model=True)

tests/unit/tf/transformers/test_block.py:189:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
merlin/models/tf/utils/testing_utils.py:91: in model_test
    losses = model.fit(dataset, batch_size=50, epochs=epochs, steps_per_epoch=1, **fit_kwargs)
merlin/models/tf/models/base.py:831: in fit
    out = super().fit(**fit_kwargs)
/usr/local/lib/python3.8/dist-packages/keras/utils/traceback_utils.py:70: in error_handler
    raise e.with_traceback(filtered_tb) from None
merlin/models/tf/models/base.py:652: in train_step
    outputs = self.call_train_test(x, y, sample_weight=sample_weight, training=True)
merlin/models/tf/models/base.py:515: in call_train_test
    forward = self(
merlin/models/tf/models/base.py:1054: in call
    outputs, context = self._call_child(block, outputs, context)
merlin/models/tf/models/base.py:1083: in _call_child
    outputs = call_layer(child, inputs, **call_kwargs)
merlin/models/tf/utils/tf_utils.py:433: in call_layer
    return layer(inputs, *args, **filtered_kwargs)
merlin/models/tf/core/tabular.py:490: in _tabular_call
    outputs = self.super().__call__(inputs, *args, **kwargs)  # type: ignore
merlin/models/config/schema.py:58: in __call__
    return super().__call__(*args, **kwargs)
merlin/models/tf/core/combinators.py:549: in call
    out = call_layer(layer, layer_inputs, **kwargs)
merlin/models/tf/utils/tf_utils.py:433: in call_layer
    return layer(inputs, *args, **filtered_kwargs)
merlin/models/tf/core/tabular.py:490: in _tabular_call
    outputs = self.super().__call__(inputs, *args, **kwargs)  # type: ignore
merlin/models/config/schema.py:58: in __call__
    return super().__call__(*args, **kwargs)
merlin/models/tf/core/combinators.py:549: in call
    out = call_layer(layer, layer_inputs, **kwargs)
merlin/models/tf/utils/tf_utils.py:433: in call_layer
    return layer(inputs, *args, **filtered_kwargs)
merlin/models/config/schema.py:58: in __call__
    return super().__call__(*args, **kwargs)
merlin/models/tf/inputs/embedding.py:379: in call
    out[feature_name] = self._call_table(inputs[feature_name], **kwargs)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = EmbeddingTable(
  (features): Dict(
    (categories): ColumnSchema(name='categories', tags={<Tags.LIST: 'list'>, <Tags...s): tf.float32
    (event_weekday_sin): tf.float32
    (event_weekday_cos): tf.float32
    (user_age): tf.float32
  )
), inputs = <tensorflow.python.framework.sparse_tensor.SparseTensor object at 0x7f7d63b90820>
kwargs = {'features': {'categories': <tf.RaggedTensor [[229, 108, 27],
 [73, 66, 91],
 [91, 304, 40],
 [88, 24, 100],
 [173, 25...7, 48],
 [2, 26, 6],
 [119, 19, 10],
 [8, 24, 22],
 [25, 14, 9],
 [8, 39, 41],
 [48, 31, 41]]>}, 'testing': False, ...}

    def _call_table(self, inputs, **kwargs):
        if isinstance(inputs, tuple) and len(inputs) == 2:
            inputs = list_col_to_ragged(inputs)

        # Eliminating the last dim==1 of dense tensors before embedding lookup
        if isinstance(inputs, tf.Tensor):
            inputs = tf.squeeze(inputs, axis=-1)

        if isinstance(inputs, (tf.RaggedTensor, tf.SparseTensor)):
            if self.sequence_combiner and isinstance(self.sequence_combiner, str):
                if isinstance(inputs, tf.RaggedTensor):
                    inputs = inputs.to_sparse()
>               out = tf.nn.safe_embedding_lookup_sparse(
                    self.table.embeddings, inputs, None, combiner=self.sequence_combiner
                )
E               tensorflow.python.framework.errors_impl.InvalidArgumentError: Exception encountered when calling layer "categories" "                 f"(type EmbeddingTable).
E
E               {{function_node __wrapped__Reshape_device_/job:localhost/replica:0/task:0/device:GPU:0}} Input to reshape is a tensor with 128 values, but the requested shape has 16 [Op:Reshape]
E
E               Call arguments received by layer "categories" "                 f"(type EmbeddingTable):
E                 • inputs={'categories': '<tf.RaggedTensor [[229, 108, 27],\n [73, 66, 91],\n [91, 304, 40],\n [88, 24, 100],\n [173, 250, 222],\n [202, 53, 181],\n [148, 317, 298],\n [62, 133, 13]]>'}
E                 • kwargs={'features': {'item_id_seq': '<tf.RaggedTensor [[30, 36, 2],\n [63, 26, 7],\n [8, 2, 26],\n [7, 119, 19],\n [8, 8, 24],\n [23, 25, 14],\n [203, 8, 39],\n [7, 48, 31]]>', 'categories': '<tf.RaggedTensor [[229, 108, 27],\n [73, 66, 91],\n [91, 304, 40],\n [88, 24, 100],\n [173, 250, 222],\n [202, 53, 181],\n [148, 317, 298],\n [62, 133, 13]]>', 'test_user_id': 'tf.Tensor(shape=(8, 1), dtype=int64)', 'user_country': 'tf.Tensor(shape=(8, 1), dtype=int64)', 'item_age_days_norm': '<tf.RaggedTensor [[-2.872848, -2.2042453, -1.3785743, -1.8874695],\n [-1.6890998, -1.2722275, -2.3514903, -1.92955],\n [-2.8999336, -2.4636097, -2.2710497, -2.286526],\n [-1.5714741, -1.4789449, -2.3554823, -2.7618356],\n [-2.4185984, -2.0205548, -2.5964813, -1.3083836],\n [-2.3086674, -1.7392484, -1.7713305, -2.7666268],\n [-1.6371323, -1.2986643, -2.741214, -2.8319168],\n [-2.4287252, -2.1784492, -2.9063392, -2.1983893]]>', 'event_hour_sin': '<tf.RaggedTensor [[0.87519, 0.74108493, 0.7021362, 0.20880622],\n [0.7751159, 0.5112479, 0.6897644, 0.4618523],\n [0.9877395, 0.8067353, 0.2785073, 0.5127078],\n [0.42245722, 0.37151605, 0.662217, 0.29389524],\n [0.6415208, 0.8767573, 0.99127567, 0.13979311],\n [0.08620147, 0.9858557, 0.93777907, 0.27667108],\n [0.3221979, 0.8577759, 0.50887823, 0.37246662],\n [0.007813198, 0.11941633, 0.8280699, 0.46084708]]>', 'event_hour_cos': '<tf.RaggedTensor [[0.20659643, 0.34245858, 0.79789066, 0.6533956],\n [0.21133861, 0.29099822, 0.29909313, 0.38387182],\n [0.6483645, 0.26128438, 0.5097018, 0.0679879],\n [0.35938728, 0.122880295, 0.31028247, 0.42299688],\n [0.47082654, 0.3139253, 0.54357225, 0.33945748],\n [0.1049997, 0.27617747, 0.16228387, 0.25873512],\n [0.25580782, 0.6140291, 0.7268057, 0.018048134],\n [0.45832136, 0.25160003, 0.34657586, 0.24501482]]>', 'event_weekday_sin': '<tf.RaggedTensor [[0.2813703, 0.19528128, 0.9347062, 0.019609304],\n [0.21666977, 0.121215016, 0.5087735, 0.016004765],\n [0.039047234, 0.19097127, 0.24673569, 0.5377793],\n [0.5285091, 0.9157901, 0.11795353, 0.0054059345],\n [0.03237542, 0.58396244, 0.024678292, 0.45758602],\n [0.7823007, 0.16673656, 0.69379526, 0.50180435],\n [0.62802064, 0.8324896, 0.01632287, 0.74014264],\n [0.6049697, 0.14898655, 0.62781924, 0.20901294]]>', 'event_weekday_cos': '<tf.RaggedTensor [[0.643363, 0.7165992, 0.02970902, 0.96914697],\n [0.28388807, 0.6481853, 0.572021, 0.82755953],\n [0.46088678, 0.9416777, 0.7179773, 0.7201362],\n [0.8907028, 0.2865412, 0.24590795, 0.9174267],\n [0.5912441, 0.6970897, 0.44896245, 0.71856815],\n [0.6125141, 0.9272917, 0.64227724, 0.8949168],\n [0.5841027, 0.8976689, 0.9729557, 0.5763398],\n [0.1711828, 0.49837482, 0.11556943, 0.93513274]]>', 'user_age': 'tf.Tensor(shape=(8, 1), dtype=float32)'}, 'training': 'True', 'testing': 'False', 'mask': ('None',), 'targets': {'item_id_seq': '<tf.RaggedTensor [[36, 2, 59],\n [26, 7, 48],\n [2, 26, 6],\n [119, 19, 10],\n [8, 24, 22],\n [25, 14, 9],\n [8, 39, 41],\n [48, 31, 41]]>'}}

merlin/models/tf/inputs/embedding.py:397: InvalidArgumentError
In dataloader transform
>       predictions = model.predict(loader, batch_size=8, steps=1)

tests/unit/tf/transformers/test_block.py:194:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
merlin/models/tf/models/base.py:915: in predict
    out = super(BaseModel, self).predict(
/usr/local/lib/python3.8/dist-packages/keras/utils/traceback_utils.py:70: in error_handler
    raise e.with_traceback(filtered_tb) from None
merlin/models/tf/loader.py:337: in __getitem__
    return DataLoader.__next__(self)
merlin/models/loader/backend.py:356: in __next__
    return self._get_next_batch()
merlin/models/loader/backend.py:393: in _get_next_batch
    batch = next(self._batch_itr)
merlin/models/loader/backend.py:495: in <genexpr>
    return (self._handle_tensors(*batch) for batch in batches)
merlin/models/tf/loader.py:514: in _handle_tensors
    to_return = transform(*to_return)
merlin/models/tf/core/tabular.py:490: in _tabular_call
    outputs = self.super().__call__(inputs, *args, **kwargs)  # type: ignore
merlin/models/config/schema.py:58: in __call__
    return super().__call__(*args, **kwargs)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = SequencePredictNext(
  (_pre): ListToRagged()
), inputs = {'categories': <tf.RaggedTensor [[230, 232, 48, 89],
 [59, 60, 186, 213],
 [126, 26, 264, 130],
 [221, 254, 118, 94],
....7090351],
 [0.297698, 0.8401888, 0.033521578, 0.062045045],
 [0.101515464, 0.52725536, 0.55959797, 0.21533921]]>, ...}
targets = {'item_id_seq': <tf.RaggedTensor [[2, 19, 23],
 [7, 13, 11],
 [12, 19, 35],
 [21, 8, 17],
 [16, 87, 21],
 [3, 23, 45],
 [18, 33, 6],
 [4, 17, 40]]>}, training = False, testing = False, kwargs = {}, new_target = <tf.RaggedTensor [[2, 19, 23],
 [7, 13, 11],
 [12, 19, 35],
 [21, 8, 17],
 [16, 87, 21],
 [3, 23, 45],
 [18, 33, 6],
 [4, 17, 40]]>
new_inputs = {'item_id_seq': <tf.RaggedTensor [[22, 2, 19],
 [7, 7, 13],
 [24, 12, 19],
 [10, 21, 8],
 [11, 16, 87],
 [16, 3, 23],
 [8, 18, 33],
 [42, 4, 17]]>}, k = 'categories', v = <tf.RaggedTensor [[230, 232, 48, 89],
 [59, 60, 186, 213],
 [126, 26, 264, 130],
 [221, 254, 118, 94],
 [92, 32, 64, 307],
 [165, 217, 9, 200],
 [127, 19, 14, 262],
 [91, 31, 268, 196]]>

    def call(
        self, inputs: TabularData, targets=None, training=False, testing=False, **kwargs
    ) -> Tuple:
        self._check_seq_inputs_targets(inputs)

        # Shifts the target column to be the next item of corresponding input column
        new_target = inputs[self.target_name][:, 1:]
        if targets is None:
            targets = dict({self.target_name: new_target})
        elif isinstance(targets, dict):
            targets[self.target_name] = new_target
        else:
            raise ValueError("Targets should be None or a dict of tensors")

        new_inputs = dict()
        for k, v in inputs.items():
            if k in self.schema.column_names:
                # Removes the last item of the sequence, as it belongs to the target
>               new_inputs[k] = v[:, :-1]
E               tensorflow.python.framework.errors_impl.InvalidArgumentError: Exception encountered when calling layer "sequence_predict_next" "                 f"(type SequencePredictNext).
E
E               {{function_node __wrapped__StridedSlice_device_/job:localhost/replica:0/task:0/device:GPU:0}} Expected begin, end, and strides to be 1D equal size tensors, but got shapes [5,7], [1], and [1] instead. [Op:StridedSlice] name: sequence_predict_next/strided_slice/
E
E               Call arguments received by layer "sequence_predict_next" "                 f"(type SequencePredictNext):
E                 • inputs={'item_id_seq': '<tf.RaggedTensor [[22, 2, 19, 23],\n [7, 7, 13, 11],\n [24, 12, 19, 35],\n [10, 21, 8, 17],\n [11, 16, 87, 21],\n [16, 3, 23, 45],\n [8, 18, 33, 6],\n [42, 4, 17, 40]]>', 'categories': '<tf.RaggedTensor [[230, 232, 48, 89],\n [59, 60, 186, 213],\n [126, 26, 264, 130],\n [221, 254, 118, 94],\n [92, 32, 64, 307],\n [165, 217, 9, 200],\n [127, 19, 14, 262],\n [91, 31, 268, 196]]>', 'test_user_id': 'tf.Tensor(shape=(8, 1), dtype=int64)', 'user_country': 'tf.Tensor(shape=(8, 1), dtype=int64)', 'item_age_days_norm': '<tf.RaggedTensor [[-2.6321867, -1.8960141, -1.3827176, -1.2979091],\n [-1.8419394, -1.8213999, -2.562653, -2.6888773],\n [-2.4615552, -1.6159127, -1.673134, -2.5371401],\n [-1.5431116, -2.8743885, -2.3154233, -2.4626255],\n [-2.5722892, -2.3213966, -1.8599365, -1.783778],\n [-2.156473, -1.9776379, -2.3210335, -2.0802162],\n [-2.5183177, -2.0650043, -1.443693, -1.8066756],\n [-1.9003887, -2.8115828, -2.2143846, -2.8128927]]>', 'event_hour_sin': '<tf.RaggedTensor [[0.51098526, 0.70276326, 0.46969718, 0.2593699],\n [0.6366822, 0.3187587, 0.20556688, 0.66788745],\n [0.95823944, 0.6742351, 0.5742672, 0.48995376],\n [0.57856935, 0.03680166, 0.19356328, 0.38373777],\n [0.7118747, 0.22016901, 0.5754359, 0.2170586],\n [0.47503403, 0.3085081, 0.42746046, 0.62828666],\n [0.56101286, 0.34885383, 0.6834232, 0.7991214],\n [0.94796985, 0.9983975, 0.3882422, 0.3168175]]>', 'event_hour_cos': '<tf.RaggedTensor [[0.23879345, 0.56286687, 0.10395286, 0.9803779],\n [0.8735259, 0.8191224, 0.6987805, 0.9508517],\n [0.7893286, 0.25938335, 0.9413311, 0.39071575],\n [0.07055479, 0.80096203, 0.43780208, 0.2883036],\n [0.111770056, 0.26904875, 0.47182095, 0.9736114],\n [0.8101491, 0.82414633, 0.8063273, 0.6993653],\n [0.71110797, 0.47319838, 0.11871127, 0.9379435],\n [0.2919758, 0.99880177, 0.423404, 0.40579566]]>', 'event_weekday_sin': '<tf.RaggedTensor [[0.8151913, 0.5308951, 0.6098198, 0.62053984],\n [0.26776758, 0.1522134, 0.676475, 0.3599095],\n [0.3076552, 0.5671771, 0.044789243, 0.41746667],\n [0.31245553, 0.067309044, 0.94046533, 0.2758188],\n [0.99028224, 0.6741068, 0.32716697, 0.6070125],\n [0.046452753, 0.6973015, 0.91035575, 0.30170825],\n [0.5714769, 0.8249496, 0.60799205, 0.5094672],\n [0.25499228, 0.5543269, 0.293861, 0.17119424]]>', 'event_weekday_cos': '<tf.RaggedTensor [[0.70672184, 0.8013267, 0.90591705, 0.7007231],\n [0.29772517, 0.6715878, 0.78386587, 0.35773724],\n [0.7337082, 0.07127134, 0.8546552, 0.11059116],\n [0.82335657, 0.8343963, 0.09818372, 0.2662608],\n [0.27790034, 0.017068934, 0.21067896, 0.6397068],\n [0.13516696, 0.26911286, 0.04904374, 0.7090351],\n [0.297698, 0.8401888, 0.033521578, 0.062045045],\n [0.101515464, 0.52725536, 0.55959797, 0.21533921]]>', 'user_age': 'tf.Tensor(shape=(8, 1), dtype=float32)'}
E                 • targets=None
E                 • training=False
E                 • testing=False
E                 • kwargs=<class 'inspect._empty'>

merlin/models/tf/transforms/sequence.py:234: InvalidArgumentError
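For readers unfamiliar with the transform that fails above, the shift performed by the two slices in call can be written in plain Python. This is a list-based stand-in for the tensor slicing v[:, :-1] and inputs[...][:, 1:], using the first rows of item_id_seq from the trace; split_next_item is a hypothetical name and not part of Merlin Models.

```python
def split_next_item(sequences):
    """Split each sequence into model inputs and next-item targets.

    Inputs drop the last item of every sequence; targets drop the first,
    so targets[i][t] is the item that follows inputs[i][t].
    """
    inputs = [sequence[:-1] for sequence in sequences]
    targets = [sequence[1:] for sequence in sequences]
    return inputs, targets


# First three item_id_seq rows taken from the call arguments in the trace.
item_id_seq = [[22, 2, 19, 23], [7, 7, 13, 11], [24, 12, 19, 35]]
new_inputs, new_target = split_next_item(item_id_seq)
print(new_inputs)
print(new_target)
```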
In list_col_to_ragged
>       batch = next(iter(loader))[0]

tests/unit/tf/transformers/test_block.py:186:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
/usr/local/lib/python3.8/dist-packages/keras/utils/data_utils.py:515: in __iter__
    for item in (self[i] for i in range(len(self))):
/usr/local/lib/python3.8/dist-packages/keras/utils/data_utils.py:515: in <genexpr>
    for item in (self[i] for i in range(len(self))):
merlin/models/tf/loader.py:337: in __getitem__
    return DataLoader.__next__(self)
merlin/models/loader/backend.py:356: in __next__
    return self._get_next_batch()
merlin/models/loader/backend.py:393: in _get_next_batch
    batch = next(self._batch_itr)
merlin/models/loader/backend.py:495: in <genexpr>
    return (self._handle_tensors(*batch) for batch in batches)
merlin/models/tf/loader.py:514: in _handle_tensors
    to_return = transform(*to_return)
merlin/models/tf/core/tabular.py:487: in _tabular_call
    inputs = self.pre_call(inputs, transformations=pre)
merlin/models/tf/core/tabular.py:225: in pre_call
    return self._maybe_apply_transformations(
merlin/models/tf/core/tabular.py:298: in _maybe_apply_transformations
    return _transformations(inputs)
merlin/models/tf/core/tabular.py:490: in _tabular_call
    outputs = self.super().__call__(inputs, *args, **kwargs)  # type: ignore
merlin/models/config/schema.py:58: in __call__
    return super().__call__(*args, **kwargs)
/usr/local/lib/python3.8/dist-packages/keras/utils/traceback_utils.py:70: in error_handler
    raise e.with_traceback(filtered_tb) from None
merlin/models/tf/transforms/tensor.py:40: in call
    outputs[name] = list_col_to_ragged(val)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

col = (<tf.Tensor: shape=(32, 1), dtype=float32, numpy=
array([[0.6031388 ],
       [0.8644114 ],
       [0.76056653],
     ... numpy=
array([[4],
       [4],
       [4],
       [4],
       [4],
       [4],
       [4],
       [4]], dtype=int32)>)

    def list_col_to_ragged(col: Tuple[tf.Tensor, tf.Tensor]):
        values = col[0][:, 0]
>       row_lengths = col[1][:, 0]
E       tensorflow.python.framework.errors_impl.InvalidArgumentError: Exception encountered when calling layer "list_to_ragged" "                 f"(type ListToRagged).
E
E       {{function_node __wrapped__StridedSlice_device_/job:localhost/replica:0/task:0/device:GPU:0}} Expected begin, end, and strides to be 1D equal size tensors, but got shapes [2], [5,7], and [2] instead. [Op:StridedSlice] name: list_to_ragged/strided_slice/
E
E       Call arguments received by layer "list_to_ragged" "                 f"(type ListToRagged):
E         • inputs={'item_id_seq': ('tf.Tensor(shape=(32, 1), dtype=int64)', 'tf.Tensor(shape=(8, 1), dtype=int32)'), 'categories': ('tf.Tensor(shape=(32, 1), dtype=int64)', 'tf.Tensor(shape=(8, 1), dtype=int32)'), 'test_user_id': 'tf.Tensor(shape=(8, 1), dtype=int64)', 'user_country': 'tf.Tensor(shape=(8, 1), dtype=int64)', 'item_age_days_norm': ('tf.Tensor(shape=(32, 1), dtype=float32)', 'tf.Tensor(shape=(8, 1), dtype=int32)'), 'event_hour_sin': ('tf.Tensor(shape=(32, 1), dtype=float32)', 'tf.Tensor(shape=(8, 1), dtype=int32)'), 'event_hour_cos': ('tf.Tensor(shape=(32, 1), dtype=float32)', 'tf.Tensor(shape=(8, 1), dtype=int32)'), 'event_weekday_sin': ('tf.Tensor(shape=(32, 1), dtype=float32)', 'tf.Tensor(shape=(8, 1), dtype=int32)'), 'event_weekday_cos': ('tf.Tensor(shape=(32, 1), dtype=float32)', 'tf.Tensor(shape=(8, 1), dtype=int32)'), 'user_age': 'tf.Tensor(shape=(8, 1), dtype=float32)'}
E         • kwargs={'training': 'None'}

merlin/models/tf/utils/tf_utils.py:460: InvalidArgumentError
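For orientation, the failing helper treats each list column as a (values, row_lengths) pair: in the call arguments above, a flat column of 32 values plus 8 row lengths. The following is a minimal pure-Python sketch of that representation, not the Merlin implementation; `split_by_row_lengths` is a hypothetical name used only for illustration, and plain lists stand in for tensors.

```python
from itertools import accumulate


def split_by_row_lengths(values, row_lengths):
    """Split a flat list of values into rows using per-row lengths."""
    if sum(row_lengths) != len(values):
        raise ValueError("row_lengths must sum to the number of values")
    # Offsets mark where each row starts and ends in the flat values list.
    offsets = [0, *accumulate(row_lengths)]
    return [values[start:end] for start, end in zip(offsets, offsets[1:])]


# Same layout as the failing batch: 32 flat values, 8 rows of length 4.
flat_values = list(range(32))
rows = split_by_row_lengths(flat_values, [4] * 8)
print(len(rows), rows[0], rows[-1])
```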

From tests/unit/tf/transformers/test_block.py::test_retrieval_transformer (#833)

In Hugging Face TFAttention
>       predictions = model.predict(loader)

tests/unit/tf/transformers/test_block.py:66:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
merlin/models/tf/models/base.py:940: in predict
    out = super(BaseModel, self).predict(
/home/oliverholworthy/venv/lib/python3.8/site-packages/keras/utils/traceback_utils.py:70: in error_handler
    raise e.with_traceback(filtered_tb) from None
merlin/models/tf/models/base.py:730: in predict_step
    return self(x, training=False)
merlin/models/tf/models/base.py:1107: in call
    outputs, context = self._call_child(block, outputs, context)
merlin/models/tf/models/base.py:1136: in _call_child
    outputs = call_layer(child, inputs, **call_kwargs)
merlin/models/tf/utils/tf_utils.py:437: in call_layer
    return layer(inputs, *args, **filtered_kwargs)
merlin/models/tf/core/encoder.py:166: in __call__
    return super().__call__(inputs, **kwargs)
merlin/models/tf/core/encoder.py:143: in call
    return combinators.call_sequentially(
merlin/models/tf/core/combinators.py:819: in call_sequentially
    outputs = call_layer(layer, outputs, **kwargs)
merlin/models/tf/utils/tf_utils.py:437: in call_layer
    return layer(inputs, *args, **filtered_kwargs)
merlin/models/config/schema.py:58: in __call__
    return super().__call__(*args, **kwargs)
merlin/models/tf/transformers/block.py:131: in call
    transformer = self.transformer(pre)
/usr/local/lib/python3.8/dist-packages/transformers/modeling_tf_utils.py:412: in run_call_with_unpacked_inputs
    return func(self, **unpacked_inputs)
/usr/local/lib/python3.8/dist-packages/transformers/models/gpt2/modeling_tf_gpt2.py:480: in call
    outputs = block(
/usr/local/lib/python3.8/dist-packages/transformers/models/gpt2/modeling_tf_gpt2.py:255: in call
    output_attn = self.attn(
/usr/local/lib/python3.8/dist-packages/transformers/models/gpt2/modeling_tf_gpt2.py:197: in call
    attn_outputs = self._attn(query, key, value, attention_mask, head_mask, output_attentions, training=training)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = TFAttention(
  (c_attn): TFConv1D(
    (_feature_shapes): Dict(
      (item_id_seq): TensorShape([8, None])
      (cat...s): tf.float32
    (event_weekday_sin): tf.float32
    (event_weekday_cos): tf.float32
    (user_age): tf.float32
  )
)
q = <tf.Tensor: shape=(8, 4, 3, 12), dtype=float32, numpy=
array([[[[ 2.95891333e-02, -8.08279309e-03,  6.67379573e-02, .....98048e-02,  4.53588329e-02, ...,
           6.08591698e-02,  2.35463437e-02, -3.80779319e-02]]]],
      dtype=float32)>
k = <tf.Tensor: shape=(8, 4, 3, 12), dtype=float32, numpy=
array([[[[-2.16567628e-02, -6.11152239e-02, -1.46311242e-02, .....19633e-02, -1.48034040e-02, ...,
          -3.22266296e-02,  7.09668454e-03,  3.13788280e-02]]]],
      dtype=float32)>
v = <tf.Tensor: shape=(8, 4, 3, 12), dtype=float32, numpy=
array([[[[ 5.50495535e-02, -2.97006536e-02,  3.75673361e-02, .....80668e-02, -2.66220625e-02, ...,
          -4.91122231e-02,  5.88061437e-02,  2.58720685e-02]]]],
      dtype=float32)>, attention_mask = None, head_mask = None, output_attentions = False, training = False

    def _attn(self, q, k, v, attention_mask, head_mask, output_attentions, training=False):
        # q, k, v have shape [batch, heads, sequence, features]
        w = tf.matmul(q, k, transpose_b=True)
        if self.scale:
            dk = tf.cast(shape_list(k)[-1], dtype=w.dtype)  # scale attention_scores
            w = w / tf.math.sqrt(dk)

        if not self.is_cross_attention:
            # if only "normal" attention layer implements causal mask

            # w has shape [batch, heads, dst_sequence, src_sequence], where information flows from src to dst.
            _, _, nd, ns = shape_list(w)
            b = self.causal_attention_mask(nd, ns, dtype=w.dtype)
>           b = tf.reshape(b, [1, 1, nd, ns])
E           tensorflow.python.framework.errors_impl.InvalidArgumentError: Exception encountered when calling layer "attn" "                 f"(type TFAttention).
E
E           {{function_node __wrapped__Reshape_device_/job:localhost/replica:0/task:0/device:GPU:0}} Input to reshape is a tensor with 8 values, but the requested shape has 9 [Op:Reshape]
E
E           Call arguments received by layer "attn" "                 f"(type TFAttention):
E             • x=tf.Tensor(shape=(8, 3, 48), dtype=float32)
E             • layer_past=None
E             • attention_mask=None
E             • head_mask=None
E             • encoder_hidden_states=None
E             • encoder_attention_mask=None
E             • use_cache=True
E             • output_attentions=False
E             • training=False

/usr/local/lib/python3.8/dist-packages/transformers/models/gpt2/modeling_tf_gpt2.py:123: InvalidArgumentError

How to use dataloader without NVTabular?

@radekosmulski developed the examples for dataloaders with native TensorFlow/PyTorch: #47

How do we recommend using the dataloaders without NVTabular? Without NVTabular there is no schema, so every column is treated as an input feature.

In particular, TensorFlow Keras expects the dataloader to output (x, y). For now, @radekosmulski has added a parsing function: https://github.com/NVIDIA-Merlin/dataloader/blob/8157c7650248359201545934fd7d3e7f95b0eea8/examples/01a-Getting-started-Tensorflow.ipynb

label_column = 'rating'

def process_batch(data, _):
    x = {col: data[col] for col in data.keys() if col != label_column}
    y = data[label_column]
    
    return (x, y)

loader._map_fns = [process_batch]

I think the parsing function will always be required, which is not the best user experience.
Previously, the dataloader accepted the arguments cat_names, cont_names, and label_names.

I think there are multiple options:

  • An easy tool for defining a schema manually
  • Enabling parameters cat_names, cont_names, label_names, etc.
  • maybe there are more?
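As one illustration of the second option, a loader could accept `label_names` and split each batch itself. The helper below is a hypothetical sketch in plain Python (`split_batch` is not part of the dataloader API), with lists standing in for tensors:

```python
def split_batch(batch, label_names):
    """Split a dict batch into (features, labels) by column name."""
    missing = [name for name in label_names if name not in batch]
    if missing:
        raise KeyError(f"label columns not found in batch: {missing}")
    features = {col: val for col, val in batch.items() if col not in label_names}
    labels = {col: batch[col] for col in label_names}
    # Like process_batch above, return a single label bare rather than in a dict.
    if len(label_names) == 1:
        return features, labels[label_names[0]]
    return features, labels


batch = {"user_id": [1, 2], "item_id": [10, 20], "rating": [4.0, 5.0]}
x, y = split_batch(batch, label_names=["rating"])
print(sorted(x), y)
```

With more than one label name, the sketch returns the labels as a dictionary keyed by column, which leaves any concatenation to the caller.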

[BUG] Dataloader doesn't release memory; GPU memory usage grows

Bug description

I run a training script that repeatedly reinitializes NVTabular dataloaders. After each initialization, the available GPU memory (fmem = pynvml_mem_size(kind="free", index=0)) decreases. That is unexpected: the available GPU memory should stay constant. After training for a while, I run out of memory (OOM) for that reason. My use case is training per day, where each day is a separate file.

Results:
[41587769344,
41113812992,
40744714240,
40509833216,
40390295552,
39972962304,
39866007552,
39635320832,
39299776512,
39115227136,
38846791680,
38544801792,
38276366336,
38007930880,
37789827072,
37521391616,
37269733376,
37018075136,
36758028288,
36514758656,
36263100416,
36112105472,
35701063680,
35474571264,
35222913024]
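To put a number on the trend, the readings above can be differenced. The sketch below assumes the values are bytes of free GPU memory, one reading per loop iteration, as collected in `fmems` by the reproduction script; the unit is an assumption, since the report does not state it.

```python
# Free-memory readings copied from the results above (assumed to be bytes).
fmems = [
    41587769344, 41113812992, 40744714240, 40509833216, 40390295552,
    39972962304, 39866007552, 39635320832, 39299776512, 39115227136,
    38846791680, 38544801792, 38276366336, 38007930880, 37789827072,
    37521391616, 37269733376, 37018075136, 36758028288, 36514758656,
    36263100416, 36112105472, 35701063680, 35474571264, 35222913024,
]

# Drop in free memory between consecutive readings.
drops = [before - after for before, after in zip(fmems, fmems[1:])]
total_drop = fmems[0] - fmems[-1]
mean_drop_mib = total_drop / len(drops) / 1024**2

print(f"total drop: {total_drop / 1024**3:.2f} GiB over {len(drops)} iterations")
print(f"mean drop per iteration: {mean_drop_mib:.0f} MiB")
print("free memory never recovers:", all(d > 0 for d in drops))
```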

Steps/Code to reproduce bug

Data Generation

import cudf

df = cudf.DataFrame({
    'col1': list(range(100000000)),
    'col2': list(range(100000000)),
    'col3': list(range(100000000)),
    'target1': list(range(100000000)),
    'target2': list(range(100000000))
})

df.to_parquet('test'+str(0)+'.parquet')

Executing Script

import tensorflow as tf
gpus = tf.config.experimental.list_physical_devices("GPU")
for gpu in gpus:
    tf.config.experimental.set_memory_growth(gpu, True)

import merlin.models.tf.dataset as tf_dataloader
from merlin.schema.tags import Tags
import nvtabular as nvt

from nvtabular.utils import pynvml_mem_size

def map_output(x,y):
    out = []
    for tg in ['target1', 'target2']:
        out.append(y[tg])
    y = tf.concat(out, axis=1)
    return x, y

import gc
import glob

BATCH_SIZE = 2*64*1024
fmems = []

for j in range(25):
    print(j)
    fmem = pynvml_mem_size(kind="free", index=0)
    print(fmem)
    fmems.append(fmem)
    gc.collect()
    files = sorted(glob.glob(
        'test'+str(0)+'.parquet'
    ))
    train = nvt.Dataset(files, part_size="100MB")
    train_dl = tf_dataloader.BatchedDataset(
        train,
        batch_size = 1024*64,
        shuffle=True,
        drop_last=True,
        cat_names=['col1', 'col2', 'col3'],
        label_names=['target1', 'target2']
    ).map(map_output)
    gc.collect()
    for i, (inputs, labels) in enumerate(train_dl):
        if i>10:
            del train, train_dl
            break

Expected behavior

The available memory should be constant.

[BUG] Data parallel training freezes due to different number of batches

Bug description

In data parallel training, we start multiple workers, each with a differently initialized dataloader, and train with Horovod. After each batch update, the parameters are synced. The Merlin dataloader yields a different number of batches depending on the selected rank. As a result, some workers finish the training loop while others are still training, which causes Horovod to freeze.

import cudf
import os

import merlin.models.tf.dataset as tf_dataloader
import nvtabular as nvt

os.system('mkdir ./test/')

df = cudf.DataFrame({
    'col1': range(0,9000000)
})
df.to_parquet('./test/part_1.parquet')
df = cudf.DataFrame({
    'col1': range(0,10000000)
})
df.to_parquet('./test/part_2.parquet')
df = cudf.DataFrame({
    'col1': range(0,11000000)
})
df.to_parquet('./test/part_3.parquet')
df = cudf.DataFrame({
    'col1': range(0,12000000)
})
df.to_parquet('./test/part_4.parquet')

ds = nvt.Dataset('./test/*.parquet', part_size='100MB')
for i in range(4):
    train_dl = tf_dataloader.BatchedDataset(
        ds,
        batch_size = 1024*16,
        shuffle=True,
        drop_last=True,
        cat_names=['col1'],
        global_size=4,
        global_rank=i,
    )
    print(len(train_dl))

Output:

549
610
671
732
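The four lengths are consistent with each rank reading exactly one of the four files and dropping its last partial batch. That mapping is an inference from the numbers, not something the report states. Under that assumption, a short sketch reproduces the counts and shows one possible workaround: cap every rank at the smallest count so that all workers leave the training loop together.

```python
BATCH_SIZE = 1024 * 16

# Row counts of part_1..part_4 from the reproduction script.
rows_per_file = [9_000_000, 10_000_000, 11_000_000, 12_000_000]

# Assumption: rank i reads only file i, and drop_last=True discards the remainder.
batches_per_rank = [rows // BATCH_SIZE for rows in rows_per_file]
print(batches_per_rank)  # [549, 610, 671, 732]

# Every rank must run the same number of steps, so use the minimum.
steps_per_epoch = min(batches_per_rank)
print(steps_per_epoch)  # 549
```

In a real multi-worker job each rank only knows its own length, so the minimum would first have to be exchanged between workers before truncating each loader to `steps_per_epoch`.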

NVTabular KerasSequenceLoader takes a long time to load data

Describe the bug
A customer uses NVTabular KerasSequenceLoader with a parquet dataset to load data, but loading takes a long time: roughly 1.5 hours or more.

Reproduction Steps
The data file and script will be uploaded.

Expected behavior

Reduce the data loading time.

Environment (please complete the following information):

Container: nvcr.io/nvidia/merlin/merlin-tensorflow:latest

Thanks!

Dataloader does not work with tf.keras.layers.Embedding

As of 12372f4, dataloader does not work with tf.keras.layers.Embedding.

When we run the following:

import pandas as pd
import tensorflow as tf

from merlin.dataloader.tensorflow import Loader
from merlin.io.dataset import Dataset

dataset = Dataset(pd.DataFrame({"user_id": [0, 1, 2, 3, 4]}))
layer = tf.keras.layers.Embedding(input_dim=101, output_dim=8)

with Loader(dataset, batch_size=1) as loader:
    for idx, batch in enumerate(loader):
        print(f"Starting batch {idx}...")
        x, y = batch
        _ = layer(x["user_id"])
        print(f"Finished batch {idx}.")

the first batch works, but the second one fails immediately:

Starting batch 0...
Finished batch 0.
Starting batch 1...
2023-04-10 11:05:35.362655: F tensorflow/core/framework/tensor.cc:729] Check failed: IsAligned() ptr = 0x7fc81ae01008
Aborted (core dumped)

Environment

This seems to occur both in the nvcr.io/nvidia/merlin/merlin-tensorflow:23.02 container (with main branches pip-installed) and the current pre-release container.

Versions in the pre-release container (built on April 8, 2023):

$ docker run --rm -it --gpus 0 nvcr.io/nvstaging/merlin/tmp-merlin-tensorflow-stg:23.03 bash
root@7d9fc2b399bc:/opt/tritonserver# cd /core/
root@7d9fc2b399bc:/core# git log -1
commit fce0366a458e436101d91b23e08d60b7eeb8481b (grafted, HEAD -> main, origin/main, origin/HEAD)
root@7d9fc2b399bc:/core# cd /dataloader/
root@7d9fc2b399bc:/dataloader# git log -1
commit 12372f4c6562f296c510f6734e748ef54c375c33 (grafted, HEAD -> main, origin/main, origin/HEAD)
root@7d9fc2b399bc:/dataloader# python -c 'import tensorflow; print(tensorflow.__version__)'
2.11.0
root@7d9fc2b399bc:/dataloader#  python -c 'import cudf; print(cudf.__version__)'
22.12.00

Shuffle doesn't work

Hi all!

Below is an example of the code:

from merlin.loader.torch import Loader
from merlin.io import Dataset


train_ds = Dataset('train.parquet')
train_loader = Loader(train_ds, batch_size=65536, shuffle=True)

for batch in train_loader:
    print(batch)

After running it, I got the following:

TypeError: sample() got an unexpected keyword argument 'keep_index'

[BUG] Unable to extract session embeddings from a session-based transformer model

Bug description

I am trying to extract session embeddings, but neither of the following options works.

Option 1:

I tried these calls, but neither works:

model_transformer.query_embeddings(train, index='session_id')

or 

model_transformer.query_embeddings(train, batch_size = 1024,  index='session_id')

Option 2:

I am able to generate session embeddings for a single batch, but iterating over the loader batch by batch crashes.

This works:
model_transformer.query_encoder(batch[0])

But iterating over the loader batch by batch does not work:

all_sess_embeddings = []
for batch, _ in iter(loader):
    embds = model_transformer.query_encoder(batch).numpy()
    del batch
    gc.collect()
    all_sess_embeddings.append(embds)

Steps/Code to reproduce bug

Please go to this link to download the gist for the code to repro the issue:

https://gist.github.com/rnyak/d70822084c26ba6972615512e8a78bb2

Expected behavior

We should be able to extract session embeddings from query_model of the transformer model without any issues.

Environment details

  • Merlin version:
  • Platform:
  • Python version:
  • PyTorch version (GPU?):
  • Tensorflow version (GPU?): Using tensorflow 23.06 image with the latest branches pulled.

Support for images from spark DataFrame

Hi all,

I'm trying to train a user-item recommendation system using Databricks as the distributed computing platform.
I'm applying preprocessing steps to my user-related features as well as to the images. However, Spark does not work well with numpy arrays. Petastorm's approach to supporting image preprocessing on Spark is to encode all numpy arrays as bytearrays.
Unfortunately, your dataloader does not support bytearrays.

Is it possible to add support for a custom unpacking function?

NVTabular KerasSequenceLoader takes longer to load multi-hot features than one-hot features

  • If some of the features are multi-hot, loading takes 616 s.
  • If all features are one-hot, loading takes 13 s.

Container: nvcr.io/nvidia/merlin/merlin-tensorflow:22.11
The reproducing dataset and scripts are here.
genData/to_parquet.ipynb is the main script; change is_slot_multi to reproduce the issue.

is_slot_multi = True  # True means some of the features are multi-hot, false means all features are one-hot

[Feature Request] Make the torch dataloader support TensorDict

PyTorch has recently released tensordict, which provides a series of utilities for efficiently processing batches of heterogeneous tensor types.
It makes it easy to index multiple tensors at the same time, manipulate their shape, split them, and so on. It also makes it possible to work efficiently with nested data structures. Have a look at the README, the doc and tutorial.

For context, using TensorDict with replay buffers in TorchRL gave us at least an order of magnitude speed-up during data collection.

Would it be possible to consider a version of the dataloader that outputs a TensorDict instead of a dictionary, as in the tutorial?

[FEA] Data loader: support padding sparse sequential features on the left side

Is your feature request related to a problem? Please describe.
The PyT and TF dataloaders support padding list (sparse) features on the right, which means that shorter list sequences are filled with 0s on the right.
For sequential recommendation, a common use case is to keep the last N user interactions, which can be done either in preprocessing or on the model side. The NVT Slice op supports truncating to the last N elements (by providing negative limits).
But it is also useful to be able to do additional truncation on the model side (e.g. truncating with a larger max sequence length threshold in the Slice op and then tuning the best max sequence length according to model accuracy and training speed). To do such truncation on the model side, the padding needs to be applied by the dataloader on the left side of the sequence features, so that when they are converted to dense tensors the padding 0s are placed on the left. The features could then be sliced in the model like feature[:, -keep_last_n:] without losing the sequence features of users with fewer than N interactions.

Describe the solution you'd like
Create a dataloader argument, sparse_padding_side, which defaults to right but can be set to left.
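A small sketch of the requested behaviour, using plain Python lists in place of tensors. `pad_sequences` and its `side` argument are hypothetical stand-ins for the proposed `sparse_padding_side` option, not existing dataloader API:

```python
def pad_sequences(seqs, max_len, side="right", pad_value=0):
    """Truncate each sequence to its last max_len items, then pad to max_len."""
    if side not in ("left", "right"):
        raise ValueError("side must be 'left' or 'right'")
    if max_len < 1:
        raise ValueError("max_len must be >= 1")
    padded = []
    for seq in seqs:
        seq = list(seq)[-max_len:]  # keep the most recent interactions
        fill = [pad_value] * (max_len - len(seq))
        padded.append(fill + seq if side == "left" else seq + fill)
    return padded


sessions = [[1, 2, 3, 4, 5], [6, 7]]
keep_last_n = 3

left = pad_sequences(sessions, max_len=5, side="left")
right = pad_sequences(sessions, max_len=5, side="right")

# Equivalent of feature[:, -keep_last_n:] on a dense tensor.
print([row[-keep_last_n:] for row in left])   # [[3, 4, 5], [0, 6, 7]]
print([row[-keep_last_n:] for row in right])  # [[3, 4, 5], [0, 0, 0]]
```

With right padding, slicing the last three positions discards the short session entirely; with left padding its two interactions survive, which is the case the request describes.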

Update Transformers4Rec to use new dataloader package

Transformers4Rec depends on the NVTabular class TorchAsyncItr for the dataloader definition. As this class was already updated to use the new data loader, T4Rec has been implicitly updated to use it as well, which broke example notebooks and CI tests.

I am listing here the issues raised in T4Rec because of the new conventions implemented in the Merlin loader. The goal is to give enough context to decide what should be updated (in T4Rec or in the loader) and to ensure a stable integration with T4Rec:

  1. The new data loader sets the input types from the source dataset being loaded (parquet files), while T4Rec follows the convention of the old NVTabular loader, where inputs are always converted to the fixed dtypes int32 and float32. (more information can be found in this ticket)
  2. As the data loader no longer converts feature dtypes, this conversion should happen in the dataset before saving to disk. As an example, the NVTabular Criteo notebook shows how to convert hexadecimal features to numerical ones before saving to disk.
  3. The T4Rec model expects the data loader to load only the features specified in the model definition, while the new T4Rec loader returns all features specified in the original dataset. (more details can be found in this PR description)
  4. T4Rec is built on top of the NVTabular class TorchAsyncItr, which makes it prone to breaking each time NVTabular classes change. We could create the T4Rec loader class as a subclass of merlin.loader.torch.Loader instead.
  5. The T4Rec data loader class that used the NVTabular loader is registered as nvtabular. After changing T4Rec to use the Merlin data loader directly, we could change the registered name to something like merlin-loader.
  6. Some datasets can have continuous features with different types (float32 and float64). T4Rec models expect all continuous features to have the same type (all float32 or all float64, for example). In more detail, the issue arises because T4Rec concatenates all continuous values into one vector and uses an MLP projection to get their final embeddings.

Can't import Loader

As of this morning, the following commands produce the error below on import; the same commands worked about 12 hours ago:

!pip install merlin-dataloader
from merlin.loader.torch import Loader 
ImportError                               Traceback (most recent call last)
/tmp/ipykernel_27/1850034830.py in <module>
     14 get_ipython().system('pip install merlin-dataloader')
---> 15 from merlin.loader.torch import Loader
     16 from merlin.io import Dataset
     17 

/opt/conda/lib/python3.7/site-packages/merlin/loader/torch.py in <module>
     19 
     20 from merlin.core.dispatch import HAS_GPU
---> 21 from merlin.loader.loader_base import LoaderBase
     22 
     23 numpy_to_torch_dtype_dict = {

/opt/conda/lib/python3.7/site-packages/merlin/loader/loader_base.py in <module>
     38     pull_apart_list,
     39 )
---> 40 from merlin.dag import BaseOperator, ColumnSelector, DictArray, Graph, Node
     41 from merlin.dag.executors import LocalExecutor
     42 from merlin.io import shuffle_df

ImportError: cannot import name 'DictArray' from 'merlin.dag' (/opt/conda/lib/python3.7/site-packages/merlin/dag/__init__.py)
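The error says that the installed merlin-core does not provide a name that merlin-dataloader expects, which usually points to mismatched releases of the two packages, although the report does not confirm the cause. As a first diagnostic step, the installed versions can be printed with the standard library. This sketch assumes Python 3.8+ for `importlib.metadata` (the traceback above shows Python 3.7, where the `importlib_metadata` backport would be needed), and `installed_versions` is a hypothetical helper:

```python
from importlib.metadata import PackageNotFoundError, version


def installed_versions(packages):
    """Return {package: version string, or None if not installed}."""
    found = {}
    for name in packages:
        try:
            found[name] = version(name)
        except PackageNotFoundError:
            found[name] = None
    return found


for name, ver in installed_versions(["merlin-core", "merlin-dataloader"]).items():
    print(f"{name}: {ver or 'not installed'}")
```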

Device assignment does not work in PyTorch

As of 12372f4, device assignment in the PyTorch dataloader does not work correctly with multiple GPUs.

import os
import pandas as pd

from merlin.dataloader.torch import Loader
from merlin.io.dataset import Dataset


dataset = Dataset(pd.DataFrame({"a": list(range(10))}))
dataset = dataset.repartition(npartitions=2)

rank = int(os.environ["LOCAL_RANK"])

with Loader(
    dataset,
    batch_size=1,
    global_rank=rank,
    global_size=2,
    device=rank,
) as loader:
    for idx, batch in enumerate(loader):
        x, y = batch
        device = x["a"].device
        print(f"rank: {rank}, device: {device}")

When I run the above, I get:

root@ba87ba84045a:/dataloader# torchrun --nproc_per_node=2 test_torch_multi_gpu.py 
WARNING:torch.distributed.run:
*****************************************
Setting OMP_NUM_THREADS environment variable for each process to be 1 in default, to avoid your system being overloaded, please further tune the variable for optimal performance in your application as needed. 
*****************************************
/usr/local/lib/python3.8/dist-packages/merlin/dtypes/mappings/tf.py:52: UserWarning: Tensorflow dtype mappings did not load successfully due to an error: No module named 'tensorflow'
  warn(f"Tensorflow dtype mappings did not load successfully due to an error: {exc.msg}")
/usr/local/lib/python3.8/dist-packages/merlin/dtypes/mappings/tf.py:52: UserWarning: Tensorflow dtype mappings did not load successfully due to an error: No module named 'tensorflow'
  warn(f"Tensorflow dtype mappings did not load successfully due to an error: {exc.msg}")
rank: 0, device: cuda:0
rank: 0, device: cuda:0
rank: 0, device: cuda:0
rank: 0, device: cuda:0
rank: 0, device: cuda:0
rank: 1, device: cuda:0
rank: 1, device: cuda:0
rank: 1, device: cuda:0
rank: 1, device: cuda:0
rank: 1, device: cuda:0

But for rank 1, tensors are expected to be placed on cuda:1, not cuda:0.

[Bug] Conda install command in README cannot find merlin-loader package

Using the README's conda install command in a fresh environment fails with an error indicating the package is not available from any channels. I see there is a merlin-dataloader package in the nvidia channel associated with this repository. Should we be using this instead and update the README command?

(base) nicholasb@nicholasb-HP-Z8-G4-Workstation:~/NVIDIA$ mamba create -n test -c conda-forge python=3.7
...
(base) nicholasb@nicholasb-HP-Z8-G4-Workstation:~/NVIDIA$ conda activate test
(test) nicholasb@nicholasb-HP-Z8-G4-Workstation:~/NVIDIA$ conda install -c nvidia -c rapidsai -c numba -c conda-forge merlin-loader python=3.7 cudatoolkit=11.2
Collecting package metadata (current_repodata.json): done
Solving environment: failed with initial frozen solve. Retrying with flexible solve.
Collecting package metadata (repodata.json): done
Solving environment: failed with initial frozen solve. Retrying with flexible solve.

PackagesNotFoundError: The following packages are not available from current channels:

  - merlin-loader

Current channels:

  - https://conda.anaconda.org/nvidia/linux-64
  - https://conda.anaconda.org/nvidia/noarch
  - https://conda.anaconda.org/rapidsai/linux-64
  - https://conda.anaconda.org/rapidsai/noarch
  - https://conda.anaconda.org/numba/linux-64
  - https://conda.anaconda.org/numba/noarch
  - https://conda.anaconda.org/conda-forge/linux-64
  - https://conda.anaconda.org/conda-forge/noarch
  - https://repo.anaconda.com/pkgs/main/linux-64
  - https://repo.anaconda.com/pkgs/main/noarch
  - https://repo.anaconda.com/pkgs/r/linux-64
  - https://repo.anaconda.com/pkgs/r/noarch

To search for alternate channels that may provide the conda package you're
looking for, navigate to

    https://anaconda.org

and use the search bar at the top of the page.

GPU memory does not get freed up properly after each batch

Describe the issue:

The dataloader accumulates GPU memory across batches unless gc.collect() is called manually after each batch, or after every few batches (e.g. every 5th). In the example below, manually calling garbage collection saves around 7 GiB of peak GPU memory usage (11 GiB vs. 18 GiB). Is there a way to free GPU memory more reliably after each batch?

Minimal Complete Verifiable Example:

Create example data:

import pandas as pd
import numpy as np

n_samples = 20480

df = pd.DataFrame({
    'x': [np.random.uniform(size=(19357, )).astype('f4') for _ in range(n_samples)],
    'y': np.random.choice(range(100), size=n_samples).astype('i8')
})

df.to_parquet('test.parquet', row_group_size=1024, engine='pyarrow')

Check memory usage:

import merlin.io
from merlin.dataloader.torch import Loader
from merlin.schema import ColumnSchema, Schema

import gc
from pynvml import nvmlDeviceGetMemoryInfo, nvmlDeviceGetHandleByIndex


dataset = merlin.io.Dataset(
    'test.parquet', 
    engine='parquet', 
    part_size='180MB',
    schema=Schema([
        ColumnSchema(
            'x', dtype='float32', 
            is_list=True, is_ragged=False, 
            properties={'value_count': {'max': 19357}}
        ),
        ColumnSchema('y', dtype='int64')
    ])
)
print(dataset.partition_lens[:10])  # --> [2048, 2048, 2048, 2048, 2048, 2048, 2048, 2048, 2048, 2048]


def benchmark(dataset, batch_size=4096, n_samples=1_000_000, call_gc=False):
    handle = nvmlDeviceGetHandleByIndex(0)
    max_memory = nvmlDeviceGetMemoryInfo(handle).used

    num_iter = n_samples // batch_size
    loader = Loader(dataset, batch_size=batch_size, shuffle=True, drop_last=True).epochs(100)

    for i, (batch, _) in enumerate(loader):
        x, y = batch['x'], batch['y']
        max_memory = max((max_memory, nvmlDeviceGetMemoryInfo(handle).used))
        if call_gc:
            gc.collect()
        if i == num_iter:
            break  

    loader.stop()
    gc.collect()

    return max_memory

Without manually calling garbage collection

max_mem = benchmark(dataset, batch_size=4096, n_samples=300_000, call_gc=False)
print('Max GPU memory usage:', max_mem // 1024**2 , 'MiB') # --> Gives: Max GPU memory usage: 18435 MiB

With manually calling garbage collection

max_mem = benchmark(dataset, batch_size=4096, n_samples=300_000, call_gc=True)
print('Max GPU memory usage:', max_mem // 1024**2 , 'MiB')  # --> Gives: Max GPU memory usage: 11305 MiB
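The two runs above differ only in whether `gc.collect()` is called inside the loop. If collecting after every batch is too costly, the every-Nth-batch variant mentioned in the description could be wrapped in a small generator. This is a sketch of the workaround, not a fix for the underlying behaviour; `with_periodic_gc` is a hypothetical helper that works on any iterable:

```python
import gc


def with_periodic_gc(iterable, every=5):
    """Yield items unchanged, running gc.collect() after every `every`-th item."""
    if every < 1:
        raise ValueError("every must be >= 1")
    for count, item in enumerate(iterable, start=1):
        yield item
        # Runs when the consumer asks for the next item, i.e. after it
        # has finished with the current one.
        if count % every == 0:
            gc.collect()


# Stand-in for `for batch, _ in with_periodic_gc(loader, every=5): ...`
seen = [item for item in with_periodic_gc(range(12), every=5)]
print(seen == list(range(12)))  # True
```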

Environment:

OS: Rocky Linux 8.7
Python: 3.10.9
merlin-core: 0.10.0
merlin-dataloader: 0.0.4
cudf-cu11: 23.02
rmm-cu11: 23.02
dask-cudf: 23.02

I installed both cudf + merlin via pip:
python -m pip install cudf-cu11==23.02 rmm-cu11==23.02 dask-cudf-cu11==23.02 --extra-index-url https://pypi.nvidia.com/
python -m pip install merlin-dataloader
