nvidia-merlin / dataloader
The merlin dataloader lets you rapidly load tabular data for training deep learning models with TensorFlow, PyTorch or JAX
License: Apache License 2.0
Transformers4Rec depends on the NVTabular class TorchAsyncItr for its dataloader definition. Because that class was already updated to use the new data loader, T4Rec has been implicitly updated to use it as well, which broke example notebooks and CI tests.
I am listing here the issues raised in T4Rec because of the new conventions implemented in the merlin loader. The goal is to give enough context to decide what should be updated (in T4Rec or in the loader) and to ensure a stable integration with T4Rec:
The T4Rec loader is built on TorchAsyncItr, which makes it liable to break each time NVTabular classes change. We could create the T4Rec loader class as a subclass of merlin.loader.torch.Loader instead.
The loader is currently registered under the name nvtabular; after changing T4Rec to use the merlin data loader directly, we could change the registered name to something like merlin-loader.
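A rough sketch of what that could look like (hypothetical class and method, not existing T4Rec code):
from merlin.loader.torch import Loader

class T4RecDataLoader(Loader):
    """Hypothetical T4Rec-owned wrapper, insulating T4Rec from upstream changes."""

    def __next__(self):
        # Delegate batch creation to the merlin loader, then apply any
        # T4Rec-specific batch conventions in one place.
        batch = super().__next__()
        return batch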
Hi all,
I'm trying to train a recommendation system for user-items using Databricks as a distributed computing platform.
I'm applying preprocessing steps to my user-related features as well as to the images; however, Spark does not work well with numpy arrays. Petastorm's approach to supporting image preprocessing on Spark is to encode all numpy arrays as bytearray.
Unfortunately, your dataloader does not support bytearray. Is it possible to add support for a custom unpacking function?
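For reference, a minimal sketch of what such unpacking could look like today, applied before the data reaches the loader (the file name, column name, and dtype are hypothetical):
import numpy as np
import pandas as pd

def unpack(buf, dtype='float32'):
    # Decode a Petastorm-style bytearray back into a fixed-width numpy array.
    return np.frombuffer(buf, dtype=dtype)

df = pd.read_parquet('user_features.parquet')  # hypothetical file
df['image'] = df['image'].map(unpack)          # hypothetical column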
What is your question?
I have a dataset that includes a column of pre-trained embeddings. I couldn't find any documentation or examples on how this column should be passed to NVTabular. Is it treated as a continuous feature?
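Possibly relevant: based on the list-column schema used elsewhere in these issues, a fixed-length embedding column can be declared as a non-ragged list of floats so it is loaded as a dense continuous feature (a sketch, with a hypothetical column name and dimension):
from merlin.schema import ColumnSchema, Schema

schema = Schema([
    ColumnSchema(
        'pretrained_emb', dtype='float32',
        is_list=True, is_ragged=False,
        properties={'value_count': {'max': 768}}  # hypothetical embedding dim
    )
])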
Do you have any notebooks or tutorials showing how to define a dataloader for image classification with PyTorch?
As of 12372f4, the dataloader does not work with tf.keras.layers.Embedding.
When we run the following:
import pandas as pd
import tensorflow as tf
from merlin.dataloader.tensorflow import Loader
from merlin.io.dataset import Dataset
dataset = Dataset(pd.DataFrame({"user_id": [0, 1, 2, 3, 4]}))
layer = tf.keras.layers.Embedding(input_dim=101, output_dim=8)
with Loader(dataset, batch_size=1) as loader:
    for idx, batch in enumerate(loader):
        print(f"Starting batch {idx}...")
        x, y = batch
        _ = layer(x["user_id"])
        print(f"Finished batch {idx}.")
the first batch works, but after that it will immediately fail:
Starting batch 0...
Finished batch 0.
Starting batch 1...
2023-04-10 11:05:35.362655: F tensorflow/core/framework/tensor.cc:729] Check failed: IsAligned() ptr = 0x7fc81ae01008
Aborted (core dumped)
This seems to occur both in the nvcr.io/nvidia/merlin/merlin-tensorflow:23.02
container (with main branches pip-installed) and the current pre-release container.
Versions in the pre-release container (built on April 8, 2023):
$ docker run --rm -it --gpus 0 nvcr.io/nvstaging/merlin/tmp-merlin-tensorflow-stg:23.03 bash
root@7d9fc2b399bc:/opt/tritonserver# cd /core/
root@7d9fc2b399bc:/core# git log -1
commit fce0366a458e436101d91b23e08d60b7eeb8481b (grafted, HEAD -> main, origin/main, origin/HEAD)
root@7d9fc2b399bc:/core# cd /dataloader/
root@7d9fc2b399bc:/dataloader# git log -1
commit 12372f4c6562f296c510f6734e748ef54c375c33 (grafted, HEAD -> main, origin/main, origin/HEAD)
root@7d9fc2b399bc:/dataloader# python -c 'import tensorflow; print(tensorflow.__version__)'
2.11.0
root@7d9fc2b399bc:/dataloader# python -c 'import cudf; print(cudf.__version__)'
22.12.00
I tried the PyTorch loader, but it gives me the following error:
BufferError: DLPack only supports signed/unsigned integers, float and complex dtypes.
However, if I switch to TensorFlow it works. Here's the code snippet:
train_dataset = merlin.io.Dataset(train_path, engine="parquet")
from merlin.loader.torch import Loader
# from merlin.dataloader.tensorflow import Loader
dl = Loader(train_dataset, batch_size=4096)
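A hedged guess at the cause: the PyTorch loader moves columns through DLPack, which rejects dtypes such as bool or datetime. Casting offending columns before building the Dataset may avoid the error (a sketch; which column triggers it depends on the data):
import cudf
import merlin.io

df = cudf.read_parquet(train_path)
for col in df.columns:
    if df[col].dtype == 'bool':          # assumption: a bool column is the culprit
        df[col] = df[col].astype('int8')
train_dataset = merlin.io.Dataset(df)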
Using the README's conda install command in a fresh environment fails with an error indicating the package is not available from any channels. I see there is a merlin-dataloader package in the nvidia channel associated with this repository. Should we be using this instead and update the README command?
(base) nicholasb@nicholasb-HP-Z8-G4-Workstation:~/NVIDIA$ mamba create -n test -c conda-forge python=3.7
...
(base) nicholasb@nicholasb-HP-Z8-G4-Workstation:~/NVIDIA$ conda activate test
(test) nicholasb@nicholasb-HP-Z8-G4-Workstation:~/NVIDIA$ conda install -c nvidia -c rapidsai -c numba -c conda-forge merlin-loader python=3.7 cudatoolkit=11.2
Collecting package metadata (current_repodata.json): done
Solving environment: failed with initial frozen solve. Retrying with flexible solve.
Collecting package metadata (repodata.json): done
Solving environment: failed with initial frozen solve. Retrying with flexible solve.
PackagesNotFoundError: The following packages are not available from current channels:
- merlin-loader
Current channels:
- https://conda.anaconda.org/nvidia/linux-64
- https://conda.anaconda.org/nvidia/noarch
- https://conda.anaconda.org/rapidsai/linux-64
- https://conda.anaconda.org/rapidsai/noarch
- https://conda.anaconda.org/numba/linux-64
- https://conda.anaconda.org/numba/noarch
- https://conda.anaconda.org/conda-forge/linux-64
- https://conda.anaconda.org/conda-forge/noarch
- https://repo.anaconda.com/pkgs/main/linux-64
- https://repo.anaconda.com/pkgs/main/noarch
- https://repo.anaconda.com/pkgs/r/linux-64
- https://repo.anaconda.com/pkgs/r/noarch
To search for alternate channels that may provide the conda package you're
looking for, navigate to
https://anaconda.org
and use the search bar at the top of the page.
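If the published package name is indeed merlin-dataloader rather than merlin-loader, the install presumably becomes (untested):
conda install -c nvidia -c rapidsai -c numba -c conda-forge merlin-dataloader python=3.7 cudatoolkit=11.2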
If dataloaders are separated into their own repository, provide an example of how to use the dataloader (similar to the old NVTabular examples).
Hi,
I get the following error when using merlin-dataloader:
Description of the error:
The error seems to happen because the GPU is not detected here: https://github.com/NVIDIA-Merlin/core/blob/2fc6889cc5e9c7fbe497622710bea18c3344e824/merlin/core/compat.py#L26 (I verified that nvml.device_get_count() always returns 0 in my case, even though there is a GPU available). Not sure if this is because I use SLURM or whether this is a general problem?
The HAS_GPU variable is then set to False, so the concat function from merlin.core.dispatch uses pd.concat instead of the cudf version, causing the error.
Potential fix:
Replacing this line with HAS_GPU = True fixes the issue, and everything works as expected in my case.
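For anyone debugging the same thing, a quick check of what NVML itself reports inside the job (standard pynvml calls; if this prints 0 while nvidia-smi shows a GPU, the device-count check in merlin.core.compat is the problem):
from pynvml import nvmlInit, nvmlDeviceGetCount

nvmlInit()
print(nvmlDeviceGetCount())  # 0 here means HAS_GPU will be set to False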
Environment info:
OS: Rocky Linux 8.7
Python: 3.10.9
merlin-core: 0.10.0
merlin-dataloader: 0.0.4
cudf-cu11: 23.02
rmm-cu11: 23.02
dask-cudf: 23.02
I installed both cudf + merlin via pip:
python -m pip install cudf-cu11==23.02 rmm-cu11==23.02 dask-cudf-cu11==23.02 --extra-index-url https://pypi.nvidia.com/
python -m pip install merlin-dataloader
Stacktrace:
---------------------------------------------------------------------------
TypeError Traceback (most recent call last)
File ~/Programs/anaconda3/envs/merlin/lib/python3.10/site-packages/merlin/dataloader/loader_base.py:282, in LoaderBase.__next__(self)
280 def __next__(self):
281 """Get the next batch."""
--> 282 return self._get_next_batch()
File ~/Programs/anaconda3/envs/merlin/lib/python3.10/site-packages/merlin/dataloader/loader_base.py:349, in LoaderBase._get_next_batch(self)
347 # get the first chunks
348 if self._batch_itr is None:
--> 349 self._fetch_chunk()
351 # try to iterate through existing batches
352 try:
File ~/Programs/anaconda3/envs/merlin/lib/python3.10/site-packages/merlin/dataloader/loader_base.py:298, in LoaderBase._fetch_chunk(self)
296 if isinstance(chunks, Exception):
297 self.stop()
--> 298 raise chunks
299 self._batch_itr = iter(chunks)
File ~/Programs/anaconda3/envs/merlin/lib/python3.10/site-packages/merlin/dataloader/loader_base.py:767, in ChunkQueue.load_chunks(self, dev)
765 self.chunk_logic(itr)
766 else:
--> 767 self.chunk_logic(itr)
768 except Exception as e: # pylint: disable=broad-except
769 self.put(e)
File ~/Programs/anaconda3/envs/merlin/lib/python3.10/site-packages/nvtx/nvtx.py:101, in annotate.__call__.<locals>.inner(*args, **kwargs)
98 @wraps(func)
99 def inner(*args, **kwargs):
100 libnvtx_push_range(self.attributes, self.domain.handle)
--> 101 result = func(*args, **kwargs)
102 libnvtx_pop_range(self.domain.handle)
103 return result
File ~/Programs/anaconda3/envs/merlin/lib/python3.10/site-packages/merlin/dataloader/loader_base.py:740, in ChunkQueue.chunk_logic(self, itr)
737 if spill is not None and not spill.empty:
738 chunks.insert(0, spill)
--> 740 chunks = concat(chunks)
741 chunks.reset_index(drop=True, inplace=True)
742 chunks, spill = self.get_batch_div_chunk(chunks, self.dataloader.batch_size)
File ~/Programs/anaconda3/envs/merlin/lib/python3.10/site-packages/merlin/core/dispatch.py:476, in concat(objs, **kwargs)
474 return dd.multi.concat(objs)
475 elif isinstance(objs[0], (pd.DataFrame, pd.Series)) or not HAS_GPU:
--> 476 return pd.concat(objs, **kwargs)
477 else:
478 return cudf.core.reshape.concat(objs, **kwargs)
File ~/Programs/anaconda3/envs/merlin/lib/python3.10/site-packages/pandas/util/_decorators.py:311, in deprecate_nonkeyword_arguments.<locals>.decorate.<locals>.wrapper(*args, **kwargs)
305 if len(args) > num_allow_args:
306 warnings.warn(
307 msg.format(arguments=arguments),
308 FutureWarning,
309 stacklevel=stacklevel,
310 )
--> 311 return func(*args, **kwargs)
File ~/Programs/anaconda3/envs/merlin/lib/python3.10/site-packages/pandas/core/reshape/concat.py:294, in concat(objs, axis, join, ignore_index, keys, levels, names, verify_integrity, sort, copy)
90 @deprecate_nonkeyword_arguments(version=None, allowed_args=["objs"])
91 def concat(
92 objs: Iterable[NDFrame] | Mapping[Hashable, NDFrame],
(...)
101 copy: bool = True,
102 ) -> FrameOrSeriesUnion:
103 """
104 Concatenate pandas objects along a particular axis with optional set logic
105 along the other axes.
(...)
292 ValueError: Indexes have overlapping values: ['a']
293 """
--> 294 op = _Concatenator(
295 objs,
296 axis=axis,
297 ignore_index=ignore_index,
298 join=join,
299 keys=keys,
300 levels=levels,
301 names=names,
302 verify_integrity=verify_integrity,
303 copy=copy,
304 sort=sort,
305 )
307 return op.get_result()
File ~/Programs/anaconda3/envs/merlin/lib/python3.10/site-packages/pandas/core/reshape/concat.py:384, in _Concatenator.__init__(self, objs, axis, join, keys, levels, names, ignore_index, verify_integrity, copy, sort)
379 if not isinstance(obj, (ABCSeries, ABCDataFrame)):
380 msg = (
381 f"cannot concatenate object of type '{type(obj)}'; "
382 "only Series and DataFrame objs are valid"
383 )
--> 384 raise TypeError(msg)
386 ndims.add(obj.ndim)
388 # get the sample
389 # want the highest ndim that we have, and must be non-empty
390 # unless all objs are empty
TypeError: cannot concatenate object of type '<class 'cudf.core.dataframe.DataFrame'>'; only Series and DataFrame objs are valid
Thanks for your help!
Felix
Describe the issue:
Dataloader accumulates GPU memory across batches if not manually calling gc.collect()
after each batch or after every e.g every 5th batch. See example below, manually calling garbage collection saves around 7GiB
in max GPU memory usage (11GiB
vs 18GiB
). Is there a way to free up GPU memory more reliable after each batch?
Minimal Complete Verifiable Example:
Create example data:
import pandas as pd
import numpy as np
n_samples = 20480
df = pd.DataFrame({
    'x': [np.random.uniform(size=(19357, )).astype('f4') for _ in range(n_samples)],
    'y': np.random.choice(range(100), size=n_samples).astype('i8')
})
df.to_parquet('test.parquet', row_group_size=1024, engine='pyarrow')
Check memory usage:
import merlin.io
from merlin.dataloader.torch import Loader
from merlin.schema import ColumnSchema, Schema
import gc
from pynvml import nvmlDeviceGetMemoryInfo, nvmlDeviceGetHandleByIndex
dataset = merlin.io.Dataset(
    'test.parquet',
    engine='parquet',
    part_size='180MB',
    schema=Schema([
        ColumnSchema(
            'x', dtype='float32',
            is_list=True, is_ragged=False,
            properties={'value_count': {'max': 19357}}
        ),
        ColumnSchema('y', dtype='int64')
    ])
)
print(dataset.partition_lens[:10]) # --> [2048, 2048, 2048, 2048, 2048, 2048, 2048, 2048, 2048, 2048]
def benchmark(dataset, batch_size=4096, n_samples=1_000_000, call_gc=False):
    handle = nvmlDeviceGetHandleByIndex(0)
    max_memory = nvmlDeviceGetMemoryInfo(handle).used
    num_iter = n_samples // batch_size
    loader = Loader(dataset, batch_size=batch_size, shuffle=True, drop_last=True).epochs(100)
    for i, (batch, _) in enumerate(loader):
        x, y = batch['x'], batch['y']
        max_memory = max((max_memory, nvmlDeviceGetMemoryInfo(handle).used))
        if call_gc:
            gc.collect()
        if i == num_iter:
            break
    loader.stop()
    gc.collect()
    return max_memory
Without manually calling garbage collection
max_mem = benchmark(dataset, batch_size=4096, n_samples=300_000, call_gc=False)
print('Max GPU memory usage:', max_mem // 1024**2 , 'MiB') # --> Gives: Max GPU memory usage: 18435 MiB
With manually calling garbage collection
max_mem = benchmark(dataset, batch_size=4096, n_samples=300_000, call_gc=True)
print('Max GPU memory usage:', max_mem // 1024**2 , 'MiB') # --> Gives: Max GPU memory usage: 11305 MiB
Environment:
OS: Rocky Linux 8.7
Python: 3.10.9
merlin-core: 0.10.0
merlin-dataloader: 0.0.4
cudf-cu11: 23.02
rmm-cu11: 23.02
dask-cudf: 23.02
I installed both cudf + merlin via pip:
python -m pip install cudf-cu11==23.02 rmm-cu11==23.02 dask-cudf-cu11==23.02 --extra-index-url https://pypi.nvidia.com/
python -m pip install merlin-dataloader
When using TensorFlow ragged tensors with Merlin Models and the latest tensorflow package 2.10.0 installed from PyPI, we get errors that show up in various places at random. This results in random failures when training a model that uses ragged features.
I've noticed that the more ragged tensors are involved in a model, the more likely it is to fail with an exception.
docker run -it --gpus=all --rm nvcr.io/nvidia/merlin/merlin-tensorflow:22.10 bash
> cd /models
> pip install pytest-repeat
pytest -rsx --count 10 tests/unit/tf/transformers/test_block.py::test_transformer_with_causal_language_modeling
tests/unit/tf/transformers/test_block.py ....................
# ============ 20 passed, 81 warnings in 683.13s (0:11:23) =========
pip install --upgrade tensorflow==2.10.0
pytest -rsx --count 10 tests/unit/tf/transformers/test_block.py::test_transformer_with_causal_language_modeling
# tests/unit/tf/transformers/test_block.py .FF....FF.....F.....
# ============5 failed, 15 passed, 79 warnings in 660.47s (0:11:00) =============
Tests should pass with the pypi published tensorflow package installed.
nvcr.io/nvidia/tensorflow:22.10-tf2-py3
E numba.cuda.cudadrv.driver.LinkerError: [222] Call to cuLinkAddData results in UNKNOWN_CUDA_ERROR
E ptxas application ptx input, line 9; fatal : Unsupported .version 7.8; current version is '7.7'
tests/unit/tf/transformers/test_block.py::test_transformer_with_causal_language_modeling
> testing_utils.model_test(model, loader, run_eagerly=run_eagerly, reload_model=True)
tests/unit/tf/transformers/test_block.py:189:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
merlin/models/tf/utils/testing_utils.py:91: in model_test
losses = model.fit(dataset, batch_size=50, epochs=epochs, steps_per_epoch=1, **fit_kwargs)
merlin/models/tf/models/base.py:831: in fit
out = super().fit(**fit_kwargs)
/usr/local/lib/python3.8/dist-packages/keras/utils/traceback_utils.py:70: in error_handler
raise e.with_traceback(filtered_tb) from None
merlin/models/tf/models/base.py:652: in train_step
outputs = self.call_train_test(x, y, sample_weight=sample_weight, training=True)
merlin/models/tf/models/base.py:515: in call_train_test
forward = self(
merlin/models/tf/models/base.py:1054: in call
outputs, context = self._call_child(block, outputs, context)
merlin/models/tf/models/base.py:1083: in _call_child
outputs = call_layer(child, inputs, **call_kwargs)
merlin/models/tf/utils/tf_utils.py:433: in call_layer
return layer(inputs, *args, **filtered_kwargs)
merlin/models/tf/core/tabular.py:490: in _tabular_call
outputs = self.super().__call__(inputs, *args, **kwargs) # type: ignore
merlin/models/config/schema.py:58: in __call__
return super().__call__(*args, **kwargs)
merlin/models/tf/core/combinators.py:549: in call
out = call_layer(layer, layer_inputs, **kwargs)
merlin/models/tf/utils/tf_utils.py:433: in call_layer
return layer(inputs, *args, **filtered_kwargs)
merlin/models/tf/core/tabular.py:490: in _tabular_call
outputs = self.super().__call__(inputs, *args, **kwargs) # type: ignore
merlin/models/config/schema.py:58: in __call__
return super().__call__(*args, **kwargs)
merlin/models/tf/core/combinators.py:549: in call
out = call_layer(layer, layer_inputs, **kwargs)
merlin/models/tf/utils/tf_utils.py:433: in call_layer
return layer(inputs, *args, **filtered_kwargs)
merlin/models/config/schema.py:58: in __call__
return super().__call__(*args, **kwargs)
merlin/models/tf/inputs/embedding.py:379: in call
out[feature_name] = self._call_table(inputs[feature_name], **kwargs)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = EmbeddingTable(
(features): Dict(
(categories): ColumnSchema(name='categories', tags={<Tags.LIST: 'list'>, <Tags...s): tf.float32
(event_weekday_sin): tf.float32
(event_weekday_cos): tf.float32
(user_age): tf.float32
)
), inputs = <tensorflow.python.framework.sparse_tensor.SparseTensor object at 0x7f7d63b90820>
kwargs = {'features': {'categories': <tf.RaggedTensor [[229, 108, 27],
[73, 66, 91],
[91, 304, 40],
[88, 24, 100],
[173, 25...7, 48],
[2, 26, 6],
[119, 19, 10],
[8, 24, 22],
[25, 14, 9],
[8, 39, 41],
[48, 31, 41]]>}, 'testing': False, ...}
def _call_table(self, inputs, **kwargs):
if isinstance(inputs, tuple) and len(inputs) == 2:
inputs = list_col_to_ragged(inputs)
# Eliminating the last dim==1 of dense tensors before embedding lookup
if isinstance(inputs, tf.Tensor):
inputs = tf.squeeze(inputs, axis=-1)
if isinstance(inputs, (tf.RaggedTensor, tf.SparseTensor)):
if self.sequence_combiner and isinstance(self.sequence_combiner, str):
if isinstance(inputs, tf.RaggedTensor):
inputs = inputs.to_sparse()
> out = tf.nn.safe_embedding_lookup_sparse(
self.table.embeddings, inputs, None, combiner=self.sequence_combiner
)
E tensorflow.python.framework.errors_impl.InvalidArgumentError: Exception encountered when calling layer "categories" " f"(type EmbeddingTable).
E
E {{function_node __wrapped__Reshape_device_/job:localhost/replica:0/task:0/device:GPU:0}} Input to reshape is a tensor with 128 values, but the requested shape has 16 [Op:Reshape]
E
E Call arguments received by layer "categories" " f"(type EmbeddingTable):
E • inputs={'categories': '<tf.RaggedTensor [[229, 108, 27],\n [73, 66, 91],\n [91, 304, 40],\n [88, 24, 100],\n [173, 250, 222],\n [202, 53, 181],\n [148, 317, 298],\n [62, 133, 13]]>'}
E • kwargs={'features': {'item_id_seq': '<tf.RaggedTensor [[30, 36, 2],\n [63, 26, 7],\n [8, 2, 26],\n [7, 119, 19],\n [8, 8, 24],\n [23, 25, 14],\n [203, 8, 39],\n [7, 48, 31]]>', 'categories': '<tf.RaggedTensor [[229, 108, 27],\n [73, 66, 91],\n [91, 304, 40],\n [88, 24, 100],\n [173, 250, 222],\n [202, 53, 181],\n [148, 317, 298],\n [62, 133, 13]]>', 'test_user_id': 'tf.Tensor(shape=(8, 1), dtype=int64)', 'user_country': 'tf.Tensor(shape=(8, 1), dtype=int64)', 'item_age_days_norm': '<tf.RaggedTensor [[-2.872848, -2.2042453, -1.3785743, -1.8874695],\n [-1.6890998, -1.2722275, -2.3514903, -1.92955],\n [-2.8999336, -2.4636097, -2.2710497, -2.286526],\n [-1.5714741, -1.4789449, -2.3554823, -2.7618356],\n [-2.4185984, -2.0205548, -2.5964813, -1.3083836],\n [-2.3086674, -1.7392484, -1.7713305, -2.7666268],\n [-1.6371323, -1.2986643, -2.741214, -2.8319168],\n [-2.4287252, -2.1784492, -2.9063392, -2.1983893]]>', 'event_hour_sin': '<tf.RaggedTensor [[0.87519, 0.74108493, 0.7021362, 0.20880622],\n [0.7751159, 0.5112479, 0.6897644, 0.4618523],\n [0.9877395, 0.8067353, 0.2785073, 0.5127078],\n [0.42245722, 0.37151605, 0.662217, 0.29389524],\n [0.6415208, 0.8767573, 0.99127567, 0.13979311],\n [0.08620147, 0.9858557, 0.93777907, 0.27667108],\n [0.3221979, 0.8577759, 0.50887823, 0.37246662],\n [0.007813198, 0.11941633, 0.8280699, 0.46084708]]>', 'event_hour_cos': '<tf.RaggedTensor [[0.20659643, 0.34245858, 0.79789066, 0.6533956],\n [0.21133861, 0.29099822, 0.29909313, 0.38387182],\n [0.6483645, 0.26128438, 0.5097018, 0.0679879],\n [0.35938728, 0.122880295, 0.31028247, 0.42299688],\n [0.47082654, 0.3139253, 0.54357225, 0.33945748],\n [0.1049997, 0.27617747, 0.16228387, 0.25873512],\n [0.25580782, 0.6140291, 0.7268057, 0.018048134],\n [0.45832136, 0.25160003, 0.34657586, 0.24501482]]>', 'event_weekday_sin': '<tf.RaggedTensor [[0.2813703, 0.19528128, 0.9347062, 0.019609304],\n [0.21666977, 0.121215016, 0.5087735, 0.016004765],\n [0.039047234, 0.19097127, 0.24673569, 0.5377793],\n [0.5285091, 0.9157901, 0.11795353, 0.0054059345],\n [0.03237542, 0.58396244, 0.024678292, 0.45758602],\n [0.7823007, 0.16673656, 0.69379526, 0.50180435],\n [0.62802064, 0.8324896, 0.01632287, 0.74014264],\n [0.6049697, 0.14898655, 0.62781924, 0.20901294]]>', 'event_weekday_cos': '<tf.RaggedTensor [[0.643363, 0.7165992, 0.02970902, 0.96914697],\n [0.28388807, 0.6481853, 0.572021, 0.82755953],\n [0.46088678, 0.9416777, 0.7179773, 0.7201362],\n [0.8907028, 0.2865412, 0.24590795, 0.9174267],\n [0.5912441, 0.6970897, 0.44896245, 0.71856815],\n [0.6125141, 0.9272917, 0.64227724, 0.8949168],\n [0.5841027, 0.8976689, 0.9729557, 0.5763398],\n [0.1711828, 0.49837482, 0.11556943, 0.93513274]]>', 'user_age': 'tf.Tensor(shape=(8, 1), dtype=float32)'}, 'training': 'True', 'testing': 'False', 'mask': ('None',), 'targets': {'item_id_seq': '<tf.RaggedTensor [[36, 2, 59],\n [26, 7, 48],\n [2, 26, 6],\n [119, 19, 10],\n [8, 24, 22],\n [25, 14, 9],\n [8, 39, 41],\n [48, 31, 41]]>'}}
merlin/models/tf/inputs/embedding.py:397: InvalidArgumentError
> predictions = model.predict(loader, batch_size=8, steps=1)
tests/unit/tf/transformers/test_block.py:194:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
merlin/models/tf/models/base.py:915: in predict
out = super(BaseModel, self).predict(
/usr/local/lib/python3.8/dist-packages/keras/utils/traceback_utils.py:70: in error_handler
raise e.with_traceback(filtered_tb) from None
merlin/models/tf/loader.py:337: in __getitem__
return DataLoader.__next__(self)
merlin/models/loader/backend.py:356: in __next__
return self._get_next_batch()
merlin/models/loader/backend.py:393: in _get_next_batch
batch = next(self._batch_itr)
merlin/models/loader/backend.py:495: in <genexpr>
return (self._handle_tensors(*batch) for batch in batches)
merlin/models/tf/loader.py:514: in _handle_tensors
to_return = transform(*to_return)
merlin/models/tf/core/tabular.py:490: in _tabular_call
outputs = self.super().__call__(inputs, *args, **kwargs) # type: ignore
merlin/models/config/schema.py:58: in __call__
return super().__call__(*args, **kwargs)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = SequencePredictNext(
(_pre): ListToRagged()
), inputs = {'categories': <tf.RaggedTensor [[230, 232, 48, 89],
[59, 60, 186, 213],
[126, 26, 264, 130],
[221, 254, 118, 94],
....7090351],
[0.297698, 0.8401888, 0.033521578, 0.062045045],
[0.101515464, 0.52725536, 0.55959797, 0.21533921]]>, ...}
targets = {'item_id_seq': <tf.RaggedTensor [[2, 19, 23],
[7, 13, 11],
[12, 19, 35],
[21, 8, 17],
[16, 87, 21],
[3, 23, 45],
[18, 33, 6],
[4, 17, 40]]>}, training = False, testing = False, kwargs = {}, new_target = <tf.RaggedTensor [[2, 19, 23],
[7, 13, 11],
[12, 19, 35],
[21, 8, 17],
[16, 87, 21],
[3, 23, 45],
[18, 33, 6],
[4, 17, 40]]>
new_inputs = {'item_id_seq': <tf.RaggedTensor [[22, 2, 19],
[7, 7, 13],
[24, 12, 19],
[10, 21, 8],
[11, 16, 87],
[16, 3, 23],
[8, 18, 33],
[42, 4, 17]]>}, k = 'categories', v = <tf.RaggedTensor [[230, 232, 48, 89],
[59, 60, 186, 213],
[126, 26, 264, 130],
[221, 254, 118, 94],
[92, 32, 64, 307],
[165, 217, 9, 200],
[127, 19, 14, 262],
[91, 31, 268, 196]]>
def call(
self, inputs: TabularData, targets=None, training=False, testing=False, **kwargs
) -> Tuple:
self._check_seq_inputs_targets(inputs)
# Shifts the target column to be the next item of corresponding input column
new_target = inputs[self.target_name][:, 1:]
if targets is None:
targets = dict({self.target_name: new_target})
elif isinstance(targets, dict):
targets[self.target_name] = new_target
else:
raise ValueError("Targets should be None or a dict of tensors")
new_inputs = dict()
for k, v in inputs.items():
if k in self.schema.column_names:
# Removes the last item of the sequence, as it belongs to the target
> new_inputs[k] = v[:, :-1]
E tensorflow.python.framework.errors_impl.InvalidArgumentError: Exception encountered when calling layer "sequence_predict_next" " f"(type SequencePredictNext).
E
E {{function_node __wrapped__StridedSlice_device_/job:localhost/replica:0/task:0/device:GPU:0}} Expected begin, end, and strides to be 1D equal size tensors, but got shapes [5,7], [1], and [1] instead. [Op:StridedSlice] name: sequence_predict_next/strided_slice/
E
E Call arguments received by layer "sequence_predict_next" " f"(type SequencePredictNext):
E • inputs={'item_id_seq': '<tf.RaggedTensor [[22, 2, 19, 23],\n [7, 7, 13, 11],\n [24, 12, 19, 35],\n [10, 21, 8, 17],\n [11, 16, 87, 21],\n [16, 3, 23, 45],\n [8, 18, 33, 6],\n [42, 4, 17, 40]]>', 'categories': '<tf.RaggedTensor [[230, 232, 48, 89],\n [59, 60, 186, 213],\n [126, 26, 264, 130],\n [221, 254, 118, 94],\n [92, 32, 64, 307],\n [165, 217, 9, 200],\n [127, 19, 14, 262],\n [91, 31, 268, 196]]>', 'test_user_id': 'tf.Tensor(shape=(8, 1), dtype=int64)', 'user_country': 'tf.Tensor(shape=(8, 1), dtype=int64)', 'item_age_days_norm': '<tf.RaggedTensor [[-2.6321867, -1.8960141, -1.3827176, -1.2979091],\n [-1.8419394, -1.8213999, -2.562653, -2.6888773],\n [-2.4615552, -1.6159127, -1.673134, -2.5371401],\n [-1.5431116, -2.8743885, -2.3154233, -2.4626255],\n [-2.5722892, -2.3213966, -1.8599365, -1.783778],\n [-2.156473, -1.9776379, -2.3210335, -2.0802162],\n [-2.5183177, -2.0650043, -1.443693, -1.8066756],\n [-1.9003887, -2.8115828, -2.2143846, -2.8128927]]>', 'event_hour_sin': '<tf.RaggedTensor [[0.51098526, 0.70276326, 0.46969718, 0.2593699],\n [0.6366822, 0.3187587, 0.20556688, 0.66788745],\n [0.95823944, 0.6742351, 0.5742672, 0.48995376],\n [0.57856935, 0.03680166, 0.19356328, 0.38373777],\n [0.7118747, 0.22016901, 0.5754359, 0.2170586],\n [0.47503403, 0.3085081, 0.42746046, 0.62828666],\n [0.56101286, 0.34885383, 0.6834232, 0.7991214],\n [0.94796985, 0.9983975, 0.3882422, 0.3168175]]>', 'event_hour_cos': '<tf.RaggedTensor [[0.23879345, 0.56286687, 0.10395286, 0.9803779],\n [0.8735259, 0.8191224, 0.6987805, 0.9508517],\n [0.7893286, 0.25938335, 0.9413311, 0.39071575],\n [0.07055479, 0.80096203, 0.43780208, 0.2883036],\n [0.111770056, 0.26904875, 0.47182095, 0.9736114],\n [0.8101491, 0.82414633, 0.8063273, 0.6993653],\n [0.71110797, 0.47319838, 0.11871127, 0.9379435],\n [0.2919758, 0.99880177, 0.423404, 0.40579566]]>', 'event_weekday_sin': '<tf.RaggedTensor [[0.8151913, 0.5308951, 0.6098198, 0.62053984],\n [0.26776758, 0.1522134, 0.676475, 0.3599095],\n [0.3076552, 0.5671771, 0.044789243, 0.41746667],\n [0.31245553, 0.067309044, 0.94046533, 0.2758188],\n [0.99028224, 0.6741068, 0.32716697, 0.6070125],\n [0.046452753, 0.6973015, 0.91035575, 0.30170825],\n [0.5714769, 0.8249496, 0.60799205, 0.5094672],\n [0.25499228, 0.5543269, 0.293861, 0.17119424]]>', 'event_weekday_cos': '<tf.RaggedTensor [[0.70672184, 0.8013267, 0.90591705, 0.7007231],\n [0.29772517, 0.6715878, 0.78386587, 0.35773724],\n [0.7337082, 0.07127134, 0.8546552, 0.11059116],\n [0.82335657, 0.8343963, 0.09818372, 0.2662608],\n [0.27790034, 0.017068934, 0.21067896, 0.6397068],\n [0.13516696, 0.26911286, 0.04904374, 0.7090351],\n [0.297698, 0.8401888, 0.033521578, 0.062045045],\n [0.101515464, 0.52725536, 0.55959797, 0.21533921]]>', 'user_age': 'tf.Tensor(shape=(8, 1), dtype=float32)'}
E • targets=None
E • training=False
E • testing=False
E • kwargs=<class 'inspect._empty'>
merlin/models/tf/transforms/sequence.py:234: InvalidArgumentError
> batch = next(iter(loader))[0]
tests/unit/tf/transformers/test_block.py:186:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
/usr/local/lib/python3.8/dist-packages/keras/utils/data_utils.py:515: in __iter__
for item in (self[i] for i in range(len(self))):
/usr/local/lib/python3.8/dist-packages/keras/utils/data_utils.py:515: in <genexpr>
for item in (self[i] for i in range(len(self))):
merlin/models/tf/loader.py:337: in __getitem__
return DataLoader.__next__(self)
merlin/models/loader/backend.py:356: in __next__
return self._get_next_batch()
merlin/models/loader/backend.py:393: in _get_next_batch
batch = next(self._batch_itr)
merlin/models/loader/backend.py:495: in <genexpr>
return (self._handle_tensors(*batch) for batch in batches)
merlin/models/tf/loader.py:514: in _handle_tensors
to_return = transform(*to_return)
merlin/models/tf/core/tabular.py:487: in _tabular_call
inputs = self.pre_call(inputs, transformations=pre)
merlin/models/tf/core/tabular.py:225: in pre_call
return self._maybe_apply_transformations(
merlin/models/tf/core/tabular.py:298: in _maybe_apply_transformations
return _transformations(inputs)
merlin/models/tf/core/tabular.py:490: in _tabular_call
outputs = self.super().__call__(inputs, *args, **kwargs) # type: ignore
merlin/models/config/schema.py:58: in __call__
return super().__call__(*args, **kwargs)
/usr/local/lib/python3.8/dist-packages/keras/utils/traceback_utils.py:70: in error_handler
raise e.with_traceback(filtered_tb) from None
merlin/models/tf/transforms/tensor.py:40: in call
outputs[name] = list_col_to_ragged(val)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
col = (<tf.Tensor: shape=(32, 1), dtype=float32, numpy=
array([[0.6031388 ],
[0.8644114 ],
[0.76056653],
... numpy=
array([[4],
[4],
[4],
[4],
[4],
[4],
[4],
[4]], dtype=int32)>)
def list_col_to_ragged(col: Tuple[tf.Tensor, tf.Tensor]):
values = col[0][:, 0]
> row_lengths = col[1][:, 0]
E tensorflow.python.framework.errors_impl.InvalidArgumentError: Exception encountered when calling layer "list_to_ragged" " f"(type ListToRagged).
E
E {{function_node __wrapped__StridedSlice_device_/job:localhost/replica:0/task:0/device:GPU:0}} Expected begin, end, and strides to be 1D equal size tensors, but got shapes [2], [5,7], and [2] instead. [Op:StridedSlice] name: list_to_ragged/strided_slice/
E
E Call arguments received by layer "list_to_ragged" " f"(type ListToRagged):
E • inputs={'item_id_seq': ('tf.Tensor(shape=(32, 1), dtype=int64)', 'tf.Tensor(shape=(8, 1), dtype=int32)'), 'categories': ('tf.Tensor(shape=(32, 1), dtype=int64)', 'tf.Tensor(shape=(8, 1), dtype=int32)'), 'test_user_id': 'tf.Tensor(shape=(8, 1), dtype=int64)', 'user_country': 'tf.Tensor(shape=(8, 1), dtype=int64)', 'item_age_days_norm': ('tf.Tensor(shape=(32, 1), dtype=float32)', 'tf.Tensor(shape=(8, 1), dtype=int32)'), 'event_hour_sin': ('tf.Tensor(shape=(32, 1), dtype=float32)', 'tf.Tensor(shape=(8, 1), dtype=int32)'), 'event_hour_cos': ('tf.Tensor(shape=(32, 1), dtype=float32)', 'tf.Tensor(shape=(8, 1), dtype=int32)'), 'event_weekday_sin': ('tf.Tensor(shape=(32, 1), dtype=float32)', 'tf.Tensor(shape=(8, 1), dtype=int32)'), 'event_weekday_cos': ('tf.Tensor(shape=(32, 1), dtype=float32)', 'tf.Tensor(shape=(8, 1), dtype=int32)'), 'user_age': 'tf.Tensor(shape=(8, 1), dtype=float32)'}
E • kwargs={'training': 'None'}
merlin/models/tf/utils/tf_utils.py:460: InvalidArgumentError
tests/unit/tf/transformers/test_block.py::test_retrieval_transformer
(#833)
> predictions = model.predict(loader)
tests/unit/tf/transformers/test_block.py:66:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
merlin/models/tf/models/base.py:940: in predict
out = super(BaseModel, self).predict(
/home/oliverholworthy/venv/lib/python3.8/site-packages/keras/utils/traceback_utils.py:70: in error_handler
raise e.with_traceback(filtered_tb) from None
merlin/models/tf/models/base.py:730: in predict_step
return self(x, training=False)
merlin/models/tf/models/base.py:1107: in call
outputs, context = self._call_child(block, outputs, context)
merlin/models/tf/models/base.py:1136: in _call_child
outputs = call_layer(child, inputs, **call_kwargs)
merlin/models/tf/utils/tf_utils.py:437: in call_layer
return layer(inputs, *args, **filtered_kwargs)
merlin/models/tf/core/encoder.py:166: in __call__
return super().__call__(inputs, **kwargs)
merlin/models/tf/core/encoder.py:143: in call
return combinators.call_sequentially(
merlin/models/tf/core/combinators.py:819: in call_sequentially
outputs = call_layer(layer, outputs, **kwargs)
merlin/models/tf/utils/tf_utils.py:437: in call_layer
return layer(inputs, *args, **filtered_kwargs)
merlin/models/config/schema.py:58: in __call__
return super().__call__(*args, **kwargs)
merlin/models/tf/transformers/block.py:131: in call
transformer = self.transformer(pre)
/usr/local/lib/python3.8/dist-packages/transformers/modeling_tf_utils.py:412: in run_call_with_unpacked_inputs
return func(self, **unpacked_inputs)
/usr/local/lib/python3.8/dist-packages/transformers/models/gpt2/modeling_tf_gpt2.py:480: in call
outputs = block(
/usr/local/lib/python3.8/dist-packages/transformers/models/gpt2/modeling_tf_gpt2.py:255: in call
output_attn = self.attn(
/usr/local/lib/python3.8/dist-packages/transformers/models/gpt2/modeling_tf_gpt2.py:197: in call
attn_outputs = self._attn(query, key, value, attention_mask, head_mask, output_attentions, training=training)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = TFAttention(
(c_attn): TFConv1D(
(_feature_shapes): Dict(
(item_id_seq): TensorShape([8, None])
(cat...s): tf.float32
(event_weekday_sin): tf.float32
(event_weekday_cos): tf.float32
(user_age): tf.float32
)
)
q = <tf.Tensor: shape=(8, 4, 3, 12), dtype=float32, numpy=
array([[[[ 2.95891333e-02, -8.08279309e-03, 6.67379573e-02, .....98048e-02, 4.53588329e-02, ...,
6.08591698e-02, 2.35463437e-02, -3.80779319e-02]]]],
dtype=float32)>
k = <tf.Tensor: shape=(8, 4, 3, 12), dtype=float32, numpy=
array([[[[-2.16567628e-02, -6.11152239e-02, -1.46311242e-02, .....19633e-02, -1.48034040e-02, ...,
-3.22266296e-02, 7.09668454e-03, 3.13788280e-02]]]],
dtype=float32)>
v = <tf.Tensor: shape=(8, 4, 3, 12), dtype=float32, numpy=
array([[[[ 5.50495535e-02, -2.97006536e-02, 3.75673361e-02, .....80668e-02, -2.66220625e-02, ...,
-4.91122231e-02, 5.88061437e-02, 2.58720685e-02]]]],
dtype=float32)>, attention_mask = None, head_mask = None, output_attentions = False, training = False
def _attn(self, q, k, v, attention_mask, head_mask, output_attentions, training=False):
# q, k, v have shape [batch, heads, sequence, features]
w = tf.matmul(q, k, transpose_b=True)
if self.scale:
dk = tf.cast(shape_list(k)[-1], dtype=w.dtype) # scale attention_scores
w = w / tf.math.sqrt(dk)
if not self.is_cross_attention:
# if only "normal" attention layer implements causal mask
# w has shape [batch, heads, dst_sequence, src_sequence], where information flows from src to dst.
_, _, nd, ns = shape_list(w)
b = self.causal_attention_mask(nd, ns, dtype=w.dtype)
> b = tf.reshape(b, [1, 1, nd, ns])
E tensorflow.python.framework.errors_impl.InvalidArgumentError: Exception encountered when calling layer "attn" " f"(type TFAttention).
E
E {{function_node __wrapped__Reshape_device_/job:localhost/replica:0/task:0/device:GPU:0}} Input to reshape is a tensor with 8 values, but the requested shape has 9 [Op:Reshape]
E
E Call arguments received by layer "attn" " f"(type TFAttention):
E • x=tf.Tensor(shape=(8, 3, 48), dtype=float32)
E • layer_past=None
E • attention_mask=None
E • head_mask=None
E • encoder_hidden_states=None
E • encoder_attention_mask=None
E • use_cache=True
E • output_attentions=False
E • training=False
/usr/local/lib/python3.8/dist-packages/transformers/models/gpt2/modeling_tf_gpt2.py:123: InvalidArgumentError
I believe that the "validater" in the class name should probably be "validator."
I skimmed https://www.tensorflow.org/api_docs/python/tf/keras/utils/Sequence and its source on GitHub, and I couldn't find any precedent for "validater."
Is your feature request related to a problem? Please describe.
The PyTorch and TF dataloaders support padding list (sparse) features on the right, meaning that shorter list sequences are completed with 0s on the right.
For sequential recommendation, a common use case is to keep the last N user interactions, which can be done either in preprocessing or on the model side. The NVTabular Slice op supports truncating to the last N elements (by providing negative limits).
But it is also useful to be able to do additional truncation on the model side (e.g., truncating with a larger max sequence threshold in the Slice op and then tuning the best max sequence length according to model accuracy and training speed). For such truncation on the model side, the padding needs to be applied by the dataloader on the left side of the sequence features, so that when they are converted to dense tensors the padding 0s are placed on the left. Features could then be sliced in the model like feature[:, -keep_last_n:] without losing the sequence features of users with fewer than N interactions.
Describe the solution you'd like
Create an argument for the dataloader, sparse_padding_side, which defaults to right but can be set to left (see the illustration below).
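To illustrate the difference (plain numpy, shapes assumed): with right padding, slicing the last N positions returns padding instead of real items for short sequences, while with left padding it returns the actual tail of the sequence.
import numpy as np

keep_last_n = 3
right_padded = np.array([[5, 9, 0, 0]])  # sequence [5, 9] padded on the right
left_padded = np.array([[0, 0, 5, 9]])   # same sequence padded on the left

print(right_padded[:, -keep_last_n:])  # [[9 0 0]] -> real item 5 is lost
print(left_padded[:, -keep_last_n:])   # [[0 5 9]] -> both real items kept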
In NVTabular, we have multiple multi-GPU examples. We are moving them to Merlin as legacy.
PRs:
After we create the new example, let's remove multi-gpu-movielens from the legacy folder.
In data parallel training, we start multiple workers with different initializations of the dataloader and train with Horovod. After each batch update, the parameters are synced. The Merlin dataloader has a different number of batches depending on the selected rank. Therefore, some workers finish the training loop while others are still training, which causes Horovod to freeze.
import cudf
import os
import merlin.models.tf.dataset as tf_dataloader
import nvtabular as nvt
os.system('mkdir ./test/')
df = cudf.DataFrame({
    'col1': range(0, 9000000)
})
df.to_parquet('./test/part_1.parquet')
df = cudf.DataFrame({
    'col1': range(0, 10000000)
})
df.to_parquet('./test/part_2.parquet')
df = cudf.DataFrame({
    'col1': range(0, 11000000)
})
df.to_parquet('./test/part_3.parquet')
df = cudf.DataFrame({
    'col1': range(0, 12000000)
})
df.to_parquet('./test/part_4.parquet')
ds = nvt.Dataset('./test/*.parquet', part_size='100MB')
for i in range(4):
    train_dl = tf_dataloader.BatchedDataset(
        ds,
        batch_size=1024 * 16,
        shuffle=True,
        drop_last=True,
        cat_names=['col1'],
        global_size=4,
        global_rank=i,
    )
    print(len(train_dl))
Output:
549
610
671
732
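A possible workaround sketch until the loader shards evenly: cap every worker at the same step count so no rank keeps iterating after the others stop (here using the smallest shard size from the output above; in practice the minimum would need to be communicated across ranks):
steps_per_epoch = min(549, 610, 671, 732)  # smallest number of batches across ranks
for step, (inputs, labels) in enumerate(train_dl):
    if step >= steps_per_epoch:
        break
    # ...run the Horovod training step here...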
Hello,
I have a very large parquet file that the Loader is trying to load on a 24 GB GPU. Is there any way not to load the whole dataset into the dataloader at once?
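In case it helps, the Dataset API used elsewhere in these issues accepts a part_size, which lets the loader stream partitions instead of materializing the whole file (a sketch; the file name and size are placeholders):
import merlin.io
from merlin.dataloader.torch import Loader

dataset = merlin.io.Dataset('large_file.parquet', engine='parquet', part_size='500MB')
loader = Loader(dataset, batch_size=4096)  # iterates partition by partition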
Describe the bug
A customer uses the NVTabular KerasSequenceLoader and a parquet dataset to load data, but it takes a long time, almost 1.5+ hours.
Reproduction Steps
The data file and script will be uploaded.
Expected behavior
Reduced data loading time.
Environment (please complete the following information):
Container: nvcr.io/nvidia/merlin/merlin-tensorflow:latest
Thanks!
As of 12372f4, device assignment in the PyTorch dataloader does not work correctly with multiple GPUs.
import os
import pandas as pd
from merlin.dataloader.torch import Loader
from merlin.io.dataset import Dataset
dataset = Dataset(pd.DataFrame({"a": list(range(10))}))
dataset = dataset.repartition(npartitions=2)
rank = int(os.environ["LOCAL_RANK"])
with Loader(
    dataset,
    batch_size=1,
    global_rank=rank,
    global_size=2,
    device=rank,
) as loader:
    for idx, batch in enumerate(loader):
        x, y = batch
        device = x["a"].device
        print(f"rank: {rank}, device: {device}")
When I run the above, I get:
root@ba87ba84045a:/dataloader# torchrun --nproc_per_node=2 test_torch_multi_gpu.py
WARNING:torch.distributed.run:
*****************************************
Setting OMP_NUM_THREADS environment variable for each process to be 1 in default, to avoid your system being overloaded, please further tune the variable for optimal performance in your application as needed.
*****************************************
/usr/local/lib/python3.8/dist-packages/merlin/dtypes/mappings/tf.py:52: UserWarning: Tensorflow dtype mappings did not load successfully due to an error: No module named 'tensorflow'
warn(f"Tensorflow dtype mappings did not load successfully due to an error: {exc.msg}")
/usr/local/lib/python3.8/dist-packages/merlin/dtypes/mappings/tf.py:52: UserWarning: Tensorflow dtype mappings did not load successfully due to an error: No module named 'tensorflow'
warn(f"Tensorflow dtype mappings did not load successfully due to an error: {exc.msg}")
rank: 0, device: cuda:0
rank: 0, device: cuda:0
rank: 0, device: cuda:0
rank: 0, device: cuda:0
rank: 0, device: cuda:0
rank: 1, device: cuda:0
rank: 1, device: cuda:0
rank: 1, device: cuda:0
rank: 1, device: cuda:0
rank: 1, device: cuda:0
But for rank 1, tensors are expected to be placed on cuda:1, not cuda:0.
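A common workaround for this class of bug (not a fix in the loader itself) is to restrict each worker process to a single visible device before any CUDA library initializes:
import os

# Must run before torch/cudf touch CUDA: each rank then sees exactly one
# GPU, so anything that defaults to device 0 lands on the right card.
os.environ["CUDA_VISIBLE_DEVICES"] = os.environ["LOCAL_RANK"]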
PyTorch has recently released tensordict, which provides a series of utilities for efficiently processing batches of heterogeneous tensor types.
It makes it easy to index multiple tensors at the same time, manipulate their shapes, split them, etc. It also allows working efficiently with nested data structures. Have a look at the README, the docs, and the tutorial.
For context, using TensorDict with replay buffers in TorchRL gave us at least an order-of-magnitude speed-up during data collection.
Would it be possible to consider a version of the dataloader that outputs a TensorDict instead of a dictionary, as in the tutorial?
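For illustration, a minimal sketch of what that output could look like, built by wrapping the loader's current dict batch (tensordict is a separate PyPI package; this is not existing dataloader API):
import torch
from tensordict import TensorDict

batch = {"user_id": torch.tensor([1, 2, 3]), "item_id": torch.tensor([4, 5, 6])}
td = TensorDict(batch, batch_size=[3])
print(td[:2])  # indexes every feature tensor at once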
I run a training script and reinitialize the NVTabular dataloaders. After each initialization, the available GPU memory decreases (fmem = pynvml_mem_size(kind="free", index=0)). That is unexpected: the available GPU memory should be constant. After training for a while, I run OOM for that reason. The use case is that I want to train per day -> each day is a separate file.
Results:
[41587769344,
41113812992,
40744714240,
40509833216,
40390295552,
39972962304,
39866007552,
39635320832,
39299776512,
39115227136,
38846791680,
38544801792,
38276366336,
38007930880,
37789827072,
37521391616,
37269733376,
37018075136,
36758028288,
36514758656,
36263100416,
36112105472,
35701063680,
35474571264,
35222913024]
Data Generation
import cudf
df = cudf.DataFrame({
    'col1': list(range(100000000)),
    'col2': list(range(100000000)),
    'col3': list(range(100000000)),
    'target1': list(range(100000000)),
    'target2': list(range(100000000))
})
df.to_parquet('test' + str(0) + '.parquet')
Executing Script
import tensorflow as tf
gpus = tf.config.experimental.list_physical_devices("GPU")
for gpu in gpus:
tf.config.experimental.set_memory_growth(gpu, True)
import merlin.models.tf.dataset as tf_dataloader
from merlin.schema.tags import Tags
import nvtabular as nvt
from nvtabular.utils import pynvml_mem_size
def map_output(x, y):
    out = []
    for tg in ['target1', 'target2']:
        out.append(y[tg])
    y = tf.concat(out, axis=1)
    return x, y
import gc
import glob
BATCH_SIZE = 2*64*1024
fmems = []
for j in range(25):
    print(j)
    fmem = pynvml_mem_size(kind="free", index=0)
    print(fmem)
    fmems.append(fmem)
    gc.collect()
    files = sorted(glob.glob(
        'test' + str(0) + '.parquet'
    ))
    train = nvt.Dataset(files, part_size="100MB")
    train_dl = tf_dataloader.BatchedDataset(
        train,
        batch_size=1024 * 64,
        shuffle=True,
        drop_last=True,
        cat_names=['col1', 'col2', 'col3'],
        label_names=['target1', 'target2']
    ).map(map_output)
    gc.collect()
    for i, (inputs, labels) in enumerate(train_dl):
        if i > 10:
            del train, train_dl
            break
The available memory should be constant.
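One mitigation worth trying (hedged; stop() is shown on the torch Loader in another issue above, and whether BatchedDataset exposes it is an assumption): shut the loader down explicitly between files before dropping references.
train_dl.stop()        # assumption: releases the background chunk queue
del train, train_dl
gc.collect()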
Hi all!
Below is an example of the code:
from merlin.loader.torch import Loader
from merlin.io import Dataset
train_ds = Dataset('train.parquet')
train_loader = Loader(train_ds, batch_size=65536, shuffle=True)
for batch in train_loader:
    print(batch)
After running it, I got the following:
TypeError: sample() got an unexpected keyword argument 'keep_index'
@radekosmulski developed the examples for dataloaders with native TensorFlow/PyTorch:#47
I wonder how we recommend using the dataloaders without NVTabular. When we do not use NVTabular, we do not have a data schema; therefore, all columns are treated as input features.
In particular, TensorFlow Keras expects the output of the dataloader to be (x, y). Currently, @radekosmulski added a parsing function: https://github.com/NVIDIA-Merlin/dataloader/blob/8157c7650248359201545934fd7d3e7f95b0eea8/examples/01a-Getting-started-Tensorflow.ipynb
label_column = 'rating'
def process_batch(data, _):
    x = {col: data[col] for col in data.keys() if col != label_column}
    y = data[label_column]
    return (x, y)
loader._map_fns = [process_batch]
I think the parsing function will always be required, and therefore this is not the best user experience.
Previously, the dataloader supported the arguments cat_names, cont_names, and label_names.
I think there are multiple options:
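One of those options might be schema-driven target selection, sketched below (hedged: this assumes Schema item access/assignment and Tags.TARGET behave as in other Merlin examples, which I have not verified):
from merlin.schema.tags import Tags

schema = dataset.schema
schema['rating'] = schema['rating'].with_tags([Tags.TARGET])  # assumption
dataset.schema = schema
loader = Loader(dataset, batch_size=1024)  # batches would then yield (x, y)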
I run into an out-of-memory error when iterating over the Merlin Loader in a Lightning Datamodule:
import os
from math import ceil
from os.path import join
from typing import Dict, List
import lightning.pytorch as pl
import merlin.io
import pyarrow as pa
from merlin.dataloader.torch import Loader
from merlin.dtypes import float32, int64
from merlin.schema import ColumnSchema, Schema
def _merlin_dataset_factory(path: str, columns: List[str], dataset_kwargs: Dict):
    return merlin.io.Dataset(
        path,
        engine='parquet',
        schema=Schema(
            [
                ColumnSchema(
                    'X', dtype=float32,
                    is_list=True, is_ragged=False,
                    properties={'value_count': {'max': n_cols}}
                )
            ] +
            [ColumnSchema(col, dtype=int64) for col in columns]
        ),
        **dataset_kwargs
    )
def _set_default_kwargs_dataloader(kwargs: Dict[str, any], train: bool = True):
    if kwargs is None:
        kwargs = {}
    parts_per_chunk = 8 if train else 1
    drop_last = True if train else False
    shuffle = True if train else False
    if 'parts_per_chunk' not in kwargs:
        kwargs['parts_per_chunk'] = parts_per_chunk
    if 'drop_last' not in kwargs:
        kwargs['drop_last'] = drop_last
    if 'shuffle' not in kwargs:
        kwargs['shuffle'] = shuffle
    return kwargs
def _set_default_kwargs_dataset(kwargs: Dict[str, any], train: bool = True):
    if kwargs is None:
        kwargs = {}
    part_size = '100MB' if train else '325MB'
    if all(['part_size' not in kwargs, 'part_mem_fraction' not in kwargs]):
        kwargs['part_size'] = part_size
    return kwargs
class MerlinDataModule(pl.LightningDataModule):
    def __init__(
        self,
        path: str,
        columns: List[str],
        batch_size: int,
        dataloader_kwargs_train: Dict = None,
        dataloader_kwargs_inference: Dict = None,
        dataset_kwargs_train: Dict = None,
        dataset_kwargs_inference: Dict = None
    ):
        super().__init__()
        for col in columns:
            assert col in PARQUET_SCHEMA.names
        self.dataloader_kwargs_train = _set_default_kwargs_dataloader(dataloader_kwargs_train, train=True)
        self.dataloader_kwargs_inference = _set_default_kwargs_dataloader(dataloader_kwargs_inference, train=False)
        self.train_dataset = _merlin_dataset_factory(
            join(path, 'train'),
            columns,
            _set_default_kwargs_dataset(dataset_kwargs_train, train=True)
        )
        self.val_dataset = _merlin_dataset_factory(
            join(path, 'val'),
            columns,
            _set_default_kwargs_dataset(dataset_kwargs_inference, train=False)
        )
        self.test_dataset = _merlin_dataset_factory(
            join(path, 'test'), columns, _set_default_kwargs_dataset(dataset_kwargs_inference, train=False))
        self.batch_size = batch_size

    def train_dataloader(self):
        return Loader(self.train_dataset, batch_size=self.batch_size, **self.dataloader_kwargs_train)

    def val_dataloader(self):
        return Loader(self.val_dataset, batch_size=self.batch_size, **self.dataloader_kwargs_inference)

    def test_dataloader(self):
        return Loader(self.test_dataset, batch_size=self.batch_size, **self.dataloader_kwargs_inference)

    def predict_dataloader(self):
        return Loader(self.test_dataset, batch_size=self.batch_size, **self.dataloader_kwargs_inference)
My versions are:
lightning: 2.0.1
merlin-core: 23.6.0
merlin-dataloader: 23.4.0
pyarrow: 12.0.1
And I call, e.g.,
for batch in datamodule.train_dataloader():
    batch_data = batch[0]['X']
    break
which takes ages and then crashes with out-of-memory.
Container: nvcr.io/nvidia/merlin/merlin-tensorflow:22.11
The reproducing dataset and scripts are here.
genData/to_parquet.ipynb is the main script, and you can change is_slot_multi to reproduce:
is_slot_multi = True # True means some of the features are multi-hot, False means all features are one-hot
I am trying to extract embeddings, but the following options do not work.
Option 1:
I tried these scripts, but neither works:
model_transformer.query_embeddings(train, index='session_id')
or
model_transformer.query_embeddings(train, batch_size = 1024, index='session_id')
Option 2:
I am able to generate session embeddings for a single batch but it does not work if I iterate over the loader batch by batch, it crashes.
this works:
model_transformer.query_encoder(batch[0])
but iterating over the loader batch by batch does not work; it crashes:
all_sess_embeddings = []
for batch, _ in iter(loader):
    embds = model_transformer.query_encoder(batch).numpy()
    del batch
    gc.collect()
    all_sess_embeddings.append(embds)
Please go to this link to download the gist with the code to reproduce the issue:
https://gist.github.com/rnyak/d70822084c26ba6972615512e8a78bb2
We should be able to extract session embeddings from the query_model of the transformer model without any issues.
The following commands produce the below error on import as of this morning, compared to about 12 hours ago:
!pip install merlin-dataloader
from merlin.loader.torch import Loader
ImportError Traceback (most recent call last)
/tmp/ipykernel_27/1850034830.py in <module>
14 get_ipython().system('pip install merlin-dataloader')
---> 15 from merlin.loader.torch import Loader
16 from merlin.io import Dataset
17
/opt/conda/lib/python3.7/site-packages/merlin/loader/torch.py in <module>
19
20 from merlin.core.dispatch import HAS_GPU
---> 21 from merlin.loader.loader_base import LoaderBase
22
23 numpy_to_torch_dtype_dict = {
/opt/conda/lib/python3.7/site-packages/merlin/loader/loader_base.py in <module>
38 pull_apart_list,
39 )
---> 40 from merlin.dag import BaseOperator, ColumnSelector, DictArray, Graph, Node
41 from merlin.dag.executors import LocalExecutor
42 from merlin.io import shuffle_df
ImportError: cannot import name 'DictArray' from 'merlin.dag' (/opt/conda/lib/python3.7/site-packages/merlin/dag/__init__.py)
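This looks like a version mismatch between merlin-dataloader and merlin-core (DictArray moved between merlin-core releases). Upgrading the two packages together may resolve the import (a suggestion, not verified):
python -m pip install --upgrade merlin-dataloader merlin-core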