NEW Browse AI tools across categories — updated daily. See what's new →

Ray Data

Scalable data processing for ML workloads. Streaming execution across CPU/GPU, supports Parquet/CSV/JSON/images. Integrates with Ray Train, PyTorch, TensorFlow. Scales from single machine to 100s o...

Version1.0.0
LicenseMIT
Token count~1,825
UpdatedMay 27, 2026

Scalable data processing for ML workloads. Streaming execution across CPU/GPU, supports Parquet/CSV/JSON/images. Integrates with Ray Train, PyTorch, TensorFlow. Scales from single machine to 100s of nodes. Use for batch inference, data preprocessing, multi-modal data loading, or distributed ETL pipelines.

Install

Quick install

via npx skills · works with 57+ agents
npx skills add https://github.com/davila7/claude-code-templates/tree/main/cli-tool/components/skills/ai-research/data-processing-ray-data
Or pick agent:
npx skills add davila7/claude-code-templates --skill ray-data --agent claude-code
npx skills add davila7/claude-code-templates --skill ray-data --agent cursor
npx skills add davila7/claude-code-templates --skill ray-data --agent codex
npx skills add davila7/claude-code-templates --skill ray-data --agent opencode
npx skills add davila7/claude-code-templates --skill ray-data --agent github-copilot
npx skills add davila7/claude-code-templates --skill ray-data --agent windsurf
More install options

Shorthand — useful for multi-skill repos:

npx skills add davila7/claude-code-templates --skill ray-data

Manual — clone the repo and drop the folder into your agent's skills directory:

git clone https://github.com/davila7/claude-code-templates.git
cp -r claude-code-templates/cli-tool/components/skills/ai-research/data-processing-ray-data ~/.claude/skills/
How to use: Once installed, ask your agent to "use the ray-data skill" or describe what you want (e.g. "Scalable data processing for ML workloads. Streaming execution across CPU/GPU, s"). Requires Node.js 18+.

Ray Data - Scalable ML Data Processing

Distributed data processing library for ML and AI workloads.

When to use Ray Data

Use Ray Data when:


  • Processing large datasets (>100GB) for ML training

  • Need distributed data preprocessing across cluster

  • Building batch inference pipelines

  • Loading multi-modal data (images, audio, video)

  • Scaling data processing from laptop to cluster

Key features:


  • Streaming execution: Process data larger than memory

  • GPU support: Accelerate transforms with GPUs

  • Framework integration: PyTorch, TensorFlow, HuggingFace

  • Multi-modal: Images, Parquet, CSV, JSON, audio, video

Use alternatives instead:


  • Pandas: Small data (<1GB) on single machine

  • Dask: Tabular data, SQL-like operations

  • Spark: Enterprise ETL, SQL queries

Quick start

Installation

pip install -U 'ray[data]'

Load and transform data

import ray

# Read Parquet files
ds = ray.data.read_parquet("s3://bucket/data/*.parquet")

# Transform data (lazy execution)
ds = ds.map_batches(lambda batch: {"processed": batch["text"].str.lower()})

# Consume data
for batch in ds.iter_batches(batch_size=100):
    print(batch)

Integration with Ray Train

import ray
from ray.train import ScalingConfig
from ray.train.torch import TorchTrainer

# Create dataset
train_ds = ray.data.read_parquet("s3://bucket/train/*.parquet")

def train_func(config):
    # Access dataset in training
    train_ds = ray.train.get_dataset_shard("train")

    for epoch in range(10):
        for batch in train_ds.iter_batches(batch_size=32):
            # Train on batch
            pass

# Train with Ray
trainer = TorchTrainer(
    train_func,
    datasets={"train": train_ds},
    scaling_config=ScalingConfig(num_workers=4, use_gpu=True)
)
trainer.fit()

Reading data

From cloud storage

import ray

# Parquet (recommended for ML)
ds = ray.data.read_parquet("s3://bucket/data/*.parquet")

# CSV
ds = ray.data.read_csv("s3://bucket/data/*.csv")

# JSON
ds = ray.data.read_json("gs://bucket/data/*.json")

# Images
ds = ray.data.read_images("s3://bucket/images/")

From Python objects

# From list
ds = ray.data.from_items([{"id": i, "value": i * 2} for i in range(1000)])

# From range
ds = ray.data.range(1000000)  # Synthetic data

# From pandas
import pandas as pd
df = pd.DataFrame({"col1": [1, 2, 3], "col2": [4, 5, 6]})
ds = ray.data.from_pandas(df)

Transformations

Map batches (vectorized)

# Batch transformation (fast)
def process_batch(batch):
    batch["doubled"] = batch["value"] * 2
    return batch

ds = ds.map_batches(process_batch, batch_size=1000)

Row transformations

# Row-by-row (slower)
def process_row(row):
    row["squared"] = row["value"] ** 2
    return row

ds = ds.map(process_row)

Filter

# Filter rows
ds = ds.filter(lambda row: row["value"] > 100)

Group by and aggregate

# Group by column
ds = ds.groupby("category").count()

# Custom aggregation
ds = ds.groupby("category").map_groups(lambda group: {"sum": group["value"].sum()})

GPU-accelerated transforms

# Use GPU for preprocessing
def preprocess_images_gpu(batch):
    import torch
    images = torch.tensor(batch["image"]).cuda()
    # GPU preprocessing
    processed = images * 255
    return {"processed": processed.cpu().numpy()}

ds = ds.map_batches(
    preprocess_images_gpu,
    batch_size=64,
    num_gpus=1  # Request GPU
)

Writing data

# Write to Parquet
ds.write_parquet("s3://bucket/output/")

# Write to CSV
ds.write_csv("output/")

# Write to JSON
ds.write_json("output/")

Performance optimization

Repartition

# Control parallelism
ds = ds.repartition(100)  # 100 blocks for 100-core cluster

Batch size tuning

# Larger batches = faster vectorized ops
ds.map_batches(process_fn, batch_size=10000)  # vs batch_size=100

Streaming execution

# Process data larger than memory
ds = ray.data.read_parquet("s3://huge-dataset/")
for batch in ds.iter_batches(batch_size=1000):
    process(batch)  # Streamed, not loaded to memory

Common patterns

Batch inference

import ray

# Load model
def load_model():
    # Load once per worker
    return MyModel()

# Inference function
class BatchInference:
    def __init__(self):
        self.model = load_model()

    def __call__(self, batch):
        predictions = self.model(batch["input"])
        return {"prediction": predictions}

# Run distributed inference
ds = ray.data.read_parquet("s3://data/")
predictions = ds.map_batches(BatchInference, batch_size=32, num_gpus=1)
predictions.write_parquet("s3://output/")

Data preprocessing pipeline

# Multi-step pipeline
ds = (
    ray.data.read_parquet("s3://raw/")
    .map_batches(clean_data)
    .map_batches(tokenize)
    .map_batches(augment)
    .write_parquet("s3://processed/")
)

Integration with ML frameworks

PyTorch

# Convert to PyTorch
torch_ds = ds.to_torch(label_column="label", batch_size=32)

for batch in torch_ds:
    # batch is dict with tensors
    inputs, labels = batch["features"], batch["label"]

TensorFlow

# Convert to TensorFlow
tf_ds = ds.to_tf(feature_columns=["image"], label_column="label", batch_size=32)

for features, labels in tf_ds:
    # Train model
    pass

Supported data formats

| Format | Read | Write | Use Case |
|--------|------|-------|----------|
| Parquet | ✅ | ✅ | ML data (recommended) |
| CSV | ✅ | ✅ | Tabular data |
| JSON | ✅ | ✅ | Semi-structured |
| Images | ✅ | ❌ | Computer vision |
| NumPy | ✅ | ✅ | Arrays |
| Pandas | ✅ | ❌ | DataFrames |

Performance benchmarks

Scaling (processing 100GB data):


  • 1 node (16 cores): ~30 minutes

  • 4 nodes (64 cores): ~8 minutes

  • 16 nodes (256 cores): ~2 minutes

GPU acceleration (image preprocessing):


  • CPU only: 1,000 images/sec

  • 1 GPU: 5,000 images/sec

  • 4 GPUs: 18,000 images/sec

Use cases

Production deployments:


  • Pinterest: Last-mile data processing for model training

  • ByteDance: Scaling offline inference with multi-modal LLMs

  • Spotify: ML platform for batch inference

References

  • [Transformations Guide](references/transformations.md) - Map, filter, groupby operations
  • [Integration Guide](references/integration.md) - Ray Train, PyTorch, TensorFlow

Resources

  • Docs: https://docs.ray.io/en/latest/data/data.html
  • GitHub: https://github.com/ray-project/ray ⭐ 36,000+
  • Version: Ray 2.40.0+
  • Examples: https://docs.ray.io/en/latest/data/examples/overview.html

SKILL.md source

---
name: ray-data
description: Scalable data processing for ML workloads. Streaming execution across CPU/GPU, supports Parquet/CSV/JSON/images. Integrates with Ray Train, PyTorch, TensorFlow. Scales from single machine to 100s o...
---

# Ray Data - Scalable ML Data Processing

Distributed data processing library for ML and AI workloads.

## When to use Ray Data

**Use Ray Data when:**
- Processing large datasets (>100GB) for ML training
- Need distributed data preprocessing across cluster
- Building batch inference pipelines
- Loading multi-modal data (images, audio, video)
- Scaling data processing from laptop to cluster

**Key features**:
- **Streaming execution**: Process data larger than memory
- **GPU support**: Accelerate transforms with GPUs
- **Framework integration**: PyTorch, TensorFlow, HuggingFace
- **Multi-modal**: Images, Parquet, CSV, JSON, audio, video

**Use alternatives instead**:
- **Pandas**: Small data (<1GB) on single machine
- **Dask**: Tabular data, SQL-like operations
- **Spark**: Enterprise ETL, SQL queries

## Quick start

### Installation

```bash
pip install -U 'ray[data]'
```

### Load and transform data

```python
import ray

# Read Parquet files
ds = ray.data.read_parquet("s3://bucket/data/*.parquet")

# Transform data (lazy execution)
ds = ds.map_batches(lambda batch: {"processed": batch["text"].str.lower()})

# Consume data
for batch in ds.iter_batches(batch_size=100):
    print(batch)
```

### Integration with Ray Train

```python
import ray
from ray.train import ScalingConfig
from ray.train.torch import TorchTrainer

# Create dataset
train_ds = ray.data.read_parquet("s3://bucket/train/*.parquet")

def train_func(config):
    # Access dataset in training
    train_ds = ray.train.get_dataset_shard("train")

    for epoch in range(10):
        for batch in train_ds.iter_batches(batch_size=32):
            # Train on batch
            pass

# Train with Ray
trainer = TorchTrainer(
    train_func,
    datasets={"train": train_ds},
    scaling_config=ScalingConfig(num_workers=4, use_gpu=True)
)
trainer.fit()
```

## Reading data

### From cloud storage

```python
import ray

# Parquet (recommended for ML)
ds = ray.data.read_parquet("s3://bucket/data/*.parquet")

# CSV
ds = ray.data.read_csv("s3://bucket/data/*.csv")

# JSON
ds = ray.data.read_json("gs://bucket/data/*.json")

# Images
ds = ray.data.read_images("s3://bucket/images/")
```

### From Python objects

```python
# From list
ds = ray.data.from_items([{"id": i, "value": i * 2} for i in range(1000)])

# From range
ds = ray.data.range(1000000)  # Synthetic data

# From pandas
import pandas as pd
df = pd.DataFrame({"col1": [1, 2, 3], "col2": [4, 5, 6]})
ds = ray.data.from_pandas(df)
```

## Transformations

### Map batches (vectorized)

```python
# Batch transformation (fast)
def process_batch(batch):
    batch["doubled"] = batch["value"] * 2
    return batch

ds = ds.map_batches(process_batch, batch_size=1000)
```

### Row transformations

```python
# Row-by-row (slower)
def process_row(row):
    row["squared"] = row["value"] ** 2
    return row

ds = ds.map(process_row)
```

### Filter

```python
# Filter rows
ds = ds.filter(lambda row: row["value"] > 100)
```

### Group by and aggregate

```python
# Group by column
ds = ds.groupby("category").count()

# Custom aggregation
ds = ds.groupby("category").map_groups(lambda group: {"sum": group["value"].sum()})
```

## GPU-accelerated transforms

```python
# Use GPU for preprocessing
def preprocess_images_gpu(batch):
    import torch
    images = torch.tensor(batch["image"]).cuda()
    # GPU preprocessing
    processed = images * 255
    return {"processed": processed.cpu().numpy()}

ds = ds.map_batches(
    preprocess_images_gpu,
    batch_size=64,
    num_gpus=1  # Request GPU
)
```

## Writing data

```python
# Write to Parquet
ds.write_parquet("s3://bucket/output/")

# Write to CSV
ds.write_csv("output/")

# Write to JSON
ds.write_json("output/")
```

## Performance optimization

### Repartition

```python
# Control parallelism
ds = ds.repartition(100)  # 100 blocks for 100-core cluster
```

### Batch size tuning

```python
# Larger batches = faster vectorized ops
ds.map_batches(process_fn, batch_size=10000)  # vs batch_size=100
```

### Streaming execution

```python
# Process data larger than memory
ds = ray.data.read_parquet("s3://huge-dataset/")
for batch in ds.iter_batches(batch_size=1000):
    process(batch)  # Streamed, not loaded to memory
```

## Common patterns

### Batch inference

```python
import ray

# Load model
def load_model():
    # Load once per worker
    return MyModel()

# Inference function
class BatchInference:
    def __init__(self):
        self.model = load_model()

    def __call__(self, batch):
        predictions = self.model(batch["input"])
        return {"prediction": predictions}

# Run distributed inference
ds = ray.data.read_parquet("s3://data/")
predictions = ds.map_batches(BatchInference, batch_size=32, num_gpus=1)
predictions.write_parquet("s3://output/")
```

### Data preprocessing pipeline

```python
# Multi-step pipeline
ds = (
    ray.data.read_parquet("s3://raw/")
    .map_batches(clean_data)
    .map_batches(tokenize)
    .map_batches(augment)
    .write_parquet("s3://processed/")
)
```

## Integration with ML frameworks

### PyTorch

```python
# Convert to PyTorch
torch_ds = ds.to_torch(label_column="label", batch_size=32)

for batch in torch_ds:
    # batch is dict with tensors
    inputs, labels = batch["features"], batch["label"]
```

### TensorFlow

```python
# Convert to TensorFlow
tf_ds = ds.to_tf(feature_columns=["image"], label_column="label", batch_size=32)

for features, labels in tf_ds:
    # Train model
    pass
```

## Supported data formats

| Format | Read | Write | Use Case |
|--------|------|-------|----------|
| Parquet | ✅ | ✅ | ML data (recommended) |
| CSV | ✅ | ✅ | Tabular data |
| JSON | ✅ | ✅ | Semi-structured |
| Images | ✅ | ❌ | Computer vision |
| NumPy | ✅ | ✅ | Arrays |
| Pandas | ✅ | ❌ | DataFrames |

## Performance benchmarks

**Scaling** (processing 100GB data):
- 1 node (16 cores): ~30 minutes
- 4 nodes (64 cores): ~8 minutes
- 16 nodes (256 cores): ~2 minutes

**GPU acceleration** (image preprocessing):
- CPU only: 1,000 images/sec
- 1 GPU: 5,000 images/sec
- 4 GPUs: 18,000 images/sec

## Use cases

**Production deployments**:
- **Pinterest**: Last-mile data processing for model training
- **ByteDance**: Scaling offline inference with multi-modal LLMs
- **Spotify**: ML platform for batch inference

## References

- **[Transformations Guide](references/transformations.md)** - Map, filter, groupby operations
- **[Integration Guide](references/integration.md)** - Ray Train, PyTorch, TensorFlow

## Resources

- **Docs**: https://docs.ray.io/en/latest/data/data.html
- **GitHub**: https://github.com/ray-project/ray ⭐ 36,000+
- **Version**: Ray 2.40.0+
- **Examples**: https://docs.ray.io/en/latest/data/examples/overview.html

Related skills 6

caveman

★ Featured

Ultra-compressed communication mode. Cuts token usage ~75% by speaking like caveman while keeping full technical accuracy. Supports intensity levels: lite, full (default), ultra, wenyan-lite, wenyan-full, wenyan-ultra. Use when user says "caveman mode", "talk like caveman", "use caveman", "less tokens", "be brief", or invokes /caveman. Also auto-triggers when token efficiency is requested.

juliusbrussee 167k
Development

secure-linux-web-hosting

★ Featured

Use when setting up, hardening, or reviewing a cloud server for self-hosting, including DNS, SSH, firewalls, Nginx, static-site hosting, reverse-proxying an app, HTTPS with Let's Encrypt or ACME clients, safe HTTP-to-HTTPS redirects, or optional post-launch network tuning such as BBR.

xixu-me 155k
Development

readme-i18n

★ Featured

Use when the user wants to translate a repository README, make a repo multilingual, localize docs, add a language switcher, internationalize the README, or update localized README variants in a GitHub-style repository.

xixu-me 155k
Development

lark-shared

★ Featured

Use when first setting up lark-cli, running auth login, switching user/bot identity (--as), handling permission denied or scope errors, needing to update lark-cli, or seeing _notice in JSON output.

larksuite 155k
Development

improve-codebase-architecture

★ Featured

Find deepening opportunities in a codebase, informed by the domain language in CONTEXT.md and the decisions in docs/adr/. Use when the user wants to improve architecture, find refactoring opportunities, consolidate tightly-coupled modules, or make a codebase more testable and AI-navigable.

mattpocock 151k
Development

paper-context-resolver

★ Featured

Optional RigorPilot helper for README-first deep learning repo reproduction. Use only when the README and repository files leave a narrow reproduction-critical gap and the task is to resolve a specific paper detail such as dataset split, preprocessing, evaluation protocol, checkpoint mapping, or runtime assumption from primary paper sources while recording conflicts. Do not use for general paper summary, repo scanning, environment setup, command execution, title-only paper lookup, or replacin...

lllllllama 127k
Development