
Space: Unified Storage for Machine Learning



Unify data in your entire machine learning lifecycle with Space, a comprehensive storage solution that seamlessly handles data from ingestion to training.

Key Features:

  • Ground Truth Database
    • Store and manage multimodal data in open source file formats, row-based or columnar, locally or in the cloud.
    • Ingest from various sources, including ML datasets, files, and labeling tools.
    • Support data manipulation (append, insert, update, delete) and version control.
  • OLAP Database and Lakehouse
  • Distributed Data Processing Pipelines
    • Integrate with processing frameworks like Ray for efficient data transformation.
    • Store processed results as Materialized Views (MVs); incrementally update MVs when the source is changed.
  • Seamless Training Framework Integration
    • Access Space datasets and MVs directly via random access interfaces.
    • Convert to popular ML dataset formats (e.g., TFDS, HuggingFace, Ray).

Onboarding Examples

Space 101

  • Space uses Arrow in the API surface, e.g., schema, filter, data IO.
  • All file paths in Space are relative; datasets are immediately usable after downloading or moving.
  • Space stores either the data itself or a reference to the data in Parquet files. A reference can be the address of a row in an ArrayRecord file, or the path of a standalone file (limited support, see space.core.schema.types.files).
  • space.TfFeatures is a built-in field type providing serializers for nested dicts of numpy arrays, based on TFDS FeaturesDict.
  • Please find more information in the design and performance docs.
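For instance, schemas and filters are plain PyArrow objects. A minimal sketch (reusing the field names from the Quick Start below):

import pyarrow as pa
import pyarrow.compute as pc

# Schemas are pyarrow.Schema objects.
schema = pa.schema([("id", pa.int64()), ("feature", pa.binary())])

# Filters are pyarrow.compute expressions, pushed down to the readers.
id_filter = pc.field("id") < 10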

Quick Start

Install

Install:

pip install space-datasets

Or install from source:

cd python
pip install .[dev]

Cluster Setup and Performance Tuning

See the setup and performance doc.

Create and Load Datasets

Create a Space dataset with two index fields, id and image_name (stored in Parquet), and a record field, feature (stored in ArrayRecord files).

This example uses the plain binary type for the record field. Space also supports the space.TfFeatures type, which integrates with the TFDS feature serializer; see the TFDS example for details, and the sketch after the code below.

import pyarrow as pa
from space import Dataset

schema = pa.schema([
  ("id", pa.int64()),
  ("image_name", pa.string()),
  ("feature", pa.binary())])

ds = Dataset.create(
  "/path/to/<mybucket>/example_ds",
  schema,
  primary_keys=["id"],
  record_fields=["feature"])  # Store this field in ArrayRecord files

# Load the dataset from files later:
ds = Dataset.load("/path/to/<mybucket>/example_ds")
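As mentioned above, the record field can instead use the space.TfFeatures type. The sketch below is an assumption about its constructor (wrapping a TFDS FeaturesDict, which the type is documented to be based on); check the TFDS example for the exact usage:

import pyarrow as pa
import tensorflow_datasets as tfds
from space import TfFeatures  # the space.TfFeatures type mentioned in Space 101

# Assumption: TfFeatures is built from a TFDS FeaturesDict, which it uses to
# (de)serialize nested dicts of numpy arrays stored in the record field.
features = tfds.features.FeaturesDict({
  "objects": tfds.features.Sequence({"bbox": tfds.features.BBoxFeature()}),
})

tf_schema = pa.schema([
  ("id", pa.int64()),
  ("image_name", pa.string()),
  ("feature", TfFeatures(features))])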

Optionally, you can use catalogs to manage datasets by names instead of locations:

from space import DirCatalog

# DirCatalog manages datasets in a directory.
catalog = DirCatalog("/path/to/<mybucket>")

# Same as the creation above.
ds = catalog.create_dataset("example_ds", schema,
  primary_keys=["id"], record_fields=["feature"])

# Same as the load above.
ds = catalog.dataset("example_ds")

# List all datasets and materialized views.
print(catalog.datasets())

Write and Read

Append and delete some data. Each mutation generates a new version of the data, represented by an increasing integer ID. Users can add tags to version IDs as aliases.

import pyarrow.compute as pc
from space import RayOptions

# Create a local runner:
runner = ds.local()

# Or create a Ray runner:
runner = ds.ray(ray_options=RayOptions(max_parallelism=8))

# To avoid https://github.com/ray-project/ray/issues/41333, wrap the runner 
# with @ray.remote when running in a remote Ray cluster.
#
# @ray.remote
# def run():
#   return runner.read_all()
#

# Appending data generates a new dataset version `snapshot_id=1`
# Write methods:
# - append(...): no primary key check.
# - insert(...): fail if primary key exists.
# - upsert(...): overwrite if primary key exists.
ids = range(100)
runner.append({
  "id": ids,
  "image_name": [f"{i}.jpg" for i in ids],
  "feature": [f"somedata{i}".encode("utf-8") for i in ids]
})
ds.add_tag("after_append")  # Version management: add tag to snapshot

# Deletion generates a new version `snapshot_id=2`
runner.delete(pc.field("id") == 1)
ds.add_tag("after_delete")

# Show all versions
ds.versions().to_pandas()
# >>>
#    snapshot_id               create_time tag_or_branch
# 0            2 2024-01-12 20:23:57+00:00  after_delete
# 1            1 2024-01-12 20:23:38+00:00  after_append
# 2            0 2024-01-12 20:22:51+00:00          None

# Read options:
# - filter_: optional, apply a filter (pushed down to the reader).
# - fields: optional, field selection.
# - version: optional, a snapshot_id or tag, for time travel back to an old version.
# - batch_size: optional, output batch size.
runner.read_all(
  filter_=pc.field("image_name")=="2.jpg",
  fields=["feature"],
  version="after_add"  # or snapshot ID `1`
)

# Read the changes between version 0 and 2.
for change in runner.diff(0, "after_delete"):
  print(change.change_type)
  print(change.data)
  print("===============")

Create a new branch and make changes in the new branch:

# The default branch is "main"
ds.add_branch("dev")
ds.set_current_branch("dev")
# Make changes in the new branch, the main branch is not updated.
# Switch back to the main branch.
ds.set_current_branch("main")

Transform and Materialized Views

Space supports transforming a dataset to a view, and materializing the view to files. The transforms include:

  • Mapping batches using a user defined function (UDF).
  • Filtering rows using a UDF.
  • Joining two views/datasets.

When the source dataset is modified, refreshing the materialized view incrementally synchronizes the changes, which saves compute and I/O cost. See more details in the Segment Anything example. Reading or refreshing views requires the Ray runner, because they are implemented on top of Ray transforms.

A materialized view mv can be used as a view mv.view or as a dataset mv.dataset. The former always reads data from the source dataset's files and processes all data on the fly. The latter reads the already-processed data directly from the MV's files and skips the processing step.
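Concretely, for a materialized view mv (such as the one created in the map_batches example below), the two access paths look like this (a minimal sketch):

# mv.view: reads the source dataset's files and re-applies the transform on the fly.
mv.view.ray().read_all()

# mv itself (backed by mv.dataset): reads the already-processed rows from the MV's files.
mv.ray().read_all()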

Example of map_batches

# A sample transform UDF.
# Input is {"field_name": [values, ...], ...}
def modify_feature_udf(batch):
  batch["feature"] = [d + b"123" for d in batch["feature"]]
  return batch

# Create a view and materialize it.
view = ds.map_batches(
  fn=modify_feature_udf,
  output_schema=ds.schema,
  output_record_fields=["feature"]
)

view_runner = view.ray()
# Reading a view will read the source dataset and apply transforms on it.
# It processes all data using `modify_feature_udf` on the fly.
for d in view_runner.read():
  print(d)

mv = view.materialize("/path/to/<mybucket>/example_mv")
# Or use a catalog:
# mv = catalog.materialize("example_mv", view)

mv_runner = mv.ray()
# Refresh the MV up to version tag `after_append` of the source.
mv_runner.refresh("after_append", batch_size=64)  # Reading batch size
# Or mv_runner.refresh() refreshes to the latest version.

# Use the MV runner instead of the view runner to read directly from the
# materialized view files, with no further data processing.
mv_runner.read_all()

Example of join

See a full example in the Segment Anything example. Creating a materialized view of a join result is not supported yet.

# If an input is a materialized view, use `mv.dataset` instead of `mv.view`.
# Only one join key is supported; it must be a primary key of both the left and right sides.
joined_view = mv_left.dataset.join(mv_right.dataset, keys=["id"])

ML Frameworks Integration

There are several ways to integrate Space storage with ML frameworks. Space provides a random access data source for reading data in ArrayRecord files:

from space import RandomAccessDataSource

datasource = RandomAccessDataSource(
  # <field-name>: <storage-location>, for reading data from ArrayRecord files.
  {
    "feature": "/path/to/<mybucket>/example_mv",
  },
  # Don't auto-deserialize data, because it is stored as plain bytes.
  deserialize=False)

len(datasource)
datasource[2]

A dataset or view can also be read as a Ray dataset:

ray_ds = ds.ray_dataset()
ray_ds.take(2)

Data in Parquet files can be read as a HuggingFace dataset:

from datasets import load_dataset

huggingface_ds = load_dataset("parquet", data_files={"train": ds.index_files()})

Inspect Metadata

List the file paths of all index (Parquet) files:

ds.index_files()
# Or show more statistics of the Parquet files.
ds.storage.index_manifest()  # Accepts a filter and snapshot_id

Show statistics of all ArrayRecord files:

ds.storage.record_manifest()  # Accepts a filter and snapshot_id

Status

Space is a new project under active development.

🚧 Ongoing tasks:

  • Performance benchmark and improvement.

Disclaimer

This is not an officially supported Google product.


Known Issues

Loading from TFDS/Parquet without copying files results in files at two locations

The features for loading TFDS datasets (append_array_record) and Parquet files (append_parquet) don't copy or rewrite the source files. As a consequence, a Space dataset will be split across two locations: the original files and the new Space storage directory.

A possible improvement is to support an option that first copies or moves the source files into the Space storage directory. Such a copy should still be faster than writing files through the normal append methods.

Ray runner should offload all data processing to Ray

The Ray runner still performs many operations locally and only offloads the steps that are trivial to parallelize. Ideally, all data and metadata processing (including commit) in the Ray runner should run in the Ray cluster instead of on the client.
