Skip to main content

Overview

Data loaders provide efficient batching, shuffling, and parallel loading of datasets for training and evaluation.

DataLoader Class

class DataLoader:
    def __init__(
        self,
        dataset: Dataset,
        batch_size: int = 1,
        shuffle: bool = False,
        num_workers: int = 0,
        pin_memory: bool = False,
        drop_last: bool = False,
        collate_fn: Optional[Callable] = None,
    )

Parameters

dataset
Dataset
required
Dataset to load data from.
batch_size
int
default:"1"
Number of samples per batch.
shuffle
bool
default:"False"
Whether to shuffle the data at the beginning of each epoch.
num_workers
int
default:"0"
Number of worker processes for parallel data loading. 0 means data will be loaded in the main process.
pin_memory
bool
default:"False"
If True, the data loader will copy tensors into CUDA pinned memory before returning them. Useful for GPU training.
drop_last
bool
default:"False"
Whether to drop the last incomplete batch if the dataset size is not divisible by the batch size.
collate_fn
Optional[Callable]
default:"None"
Function to merge a list of samples into a batch. If None, uses default collation.

Methods

__iter__

def __iter__(self)
Return an iterator that yields batches from the dataset.

__len__

def __len__(self) -> int
Return the number of batches.
return
int
Number of batches in the data loader.

DistributedDataLoader

class DistributedDataLoader(DataLoader):
    def __init__(
        self,
        dataset: Dataset,
        batch_size: int = 1,
        shuffle: bool = False,
        num_workers: int = 0,
        rank: int = 0,
        world_size: int = 1,
        **kwargs
    )
Data loader for distributed training across multiple devices.

Additional Parameters

rank
int
default:"0"
Rank of the current process in distributed training.
world_size
int
default:"1"
Total number of processes in distributed training.

Utility Functions

default_collate

def default_collate(batch: List[Any]) -> Any
Default collation function that stacks samples into batches.
batch
List[Any]
required
List of samples to collate.
return
Any
Collated batch.

worker_init_fn

def worker_init_fn(worker_id: int) -> None
Initialization function for data loader workers.

Example Usage

import neurenix as nx
from neurenix.data import Dataset, DataLoader, load_dataset

# Load dataset
dataset = load_dataset("train_data.csv")

print(f"Dataset size: {len(dataset)}")

# Create data loader
train_loader = DataLoader(
    dataset,
    batch_size=32,
    shuffle=True,  # Reshuffle at the beginning of each epoch
    num_workers=4,  # Load data in 4 worker processes
    pin_memory=True,  # For GPU training
    drop_last=True  # Drop the last incomplete batch
)

print(f"Number of batches: {len(train_loader)}")

# Iterate over batches
# NOTE: model, criterion and optimizer are not defined in this example and
# are assumed to have been created beforehand.
for epoch in range(10):
    for batch_idx, batch in enumerate(train_loader):
        # batch is an (inputs, labels) pair; each element is presumably
        # batched along its first dimension, i.e. (batch_size, ...)
        inputs, labels = batch

        # Training step
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        # Clear gradients so they do not carry over into the next batch
        optimizer.zero_grad()

        # Log progress every 100 batches
        if batch_idx % 100 == 0:
            print(f"Epoch {epoch}, Batch {batch_idx}, Loss: {loss.item():.4f}")

# Custom collate function
def custom_collate(batch):
    """Merge a list of ``(data, label)`` samples into two stacked tensors.

    Args:
        batch: List of samples; element 0 of each sample is the data and
            element 1 is its label.

    Returns:
        A ``(data_tensor, labels_tensor)`` tuple built with
        ``nx.Tensor.stack``.
    """
    features, targets = [], []
    for sample in batch:
        features.append(sample[0])
        targets.append(sample[1])

    # Stack each column of the batch into a single tensor
    return nx.Tensor.stack(features), nx.Tensor.stack(targets)

# Use custom collate function
custom_loader = DataLoader(
    dataset,
    batch_size=64,
    shuffle=True,
    collate_fn=custom_collate
)

# Distributed training
from neurenix.data import DistributedDataLoader

rank = 0  # Rank of the current process (placeholder value for this example)
world_size = 4  # Total number of processes (here: one per GPU)

dist_loader = DistributedDataLoader(
    dataset,
    batch_size=32,
    shuffle=True,
    num_workers=2,
    rank=rank,
    world_size=world_size
)

for batch in dist_loader:
    # Each process gets a different subset.
    # Batches drawn from this dataset are (inputs, labels) pairs, exactly as
    # in the training loop earlier in this example, so only the inputs are
    # fed to the model.
    inputs, labels = batch
    outputs = model(inputs)

# Variable length sequences
def pad_collate(batch):
    """Collate function for variable length sequences.

    Every sequence is padded with trailing zeros along its first dimension
    up to the length of the longest sequence in the batch, then all padded
    sequences are stacked into one tensor.

    Args:
        batch: Non-empty list of ``(sequence, label)`` samples. Each
            sequence must support ``len()`` and expose ``.shape``
            (presumably an ``nx.Tensor`` -- confirm against the dataset).

    Returns:
        A ``(padded_sequences, labels)`` tuple: the stacked padded
        sequences and a tensor built from the list of labels.

    Raises:
        ValueError: If ``batch`` is empty.
    """
    # Fail with a clear message instead of the opaque error max() raises
    # on an empty iterable.
    if not batch:
        raise ValueError("pad_collate received an empty batch")

    sequences = [item[0] for item in batch]
    labels = [item[1] for item in batch]

    # Find max length
    max_len = max(len(seq) for seq in sequences)

    # Pad sequences
    padded = []
    for seq in sequences:
        pad_len = max_len - len(seq)
        # Sequences already at max length need no padding; skipping them
        # avoids allocating and concatenating a zero-length tensor.
        if pad_len:
            seq = nx.Tensor.cat([
                seq,
                nx.Tensor.zeros((pad_len,) + seq.shape[1:])
            ])
        padded.append(seq)

    return nx.Tensor.stack(padded), nx.Tensor(labels)

# Loader for variable-length sequences; pad_collate pads each batch to the
# length of its longest sequence.
# NOTE: sequence_dataset is not defined in this example (placeholder).
seq_loader = DataLoader(
    sequence_dataset,
    batch_size=16,
    collate_fn=pad_collate
)

# Validation loader (no shuffle)
# NOTE: val_dataset is not defined in this example (placeholder).
val_loader = DataLoader(
    val_dataset,
    batch_size=64,
    shuffle=False,
    num_workers=4
)

# Switch the model to evaluation mode and run inference inside no_grad()
# (presumably disables gradient tracking -- confirm against the nx API).
model.eval()
with nx.Tensor.no_grad():
    for batch in val_loader:
        # NOTE(review): if val_dataset yields (inputs, labels) pairs like the
        # training dataset, batch must be unpacked before calling the model.
        predictions = model(batch)
        # Evaluate predictions

Performance Tips

num_workers: 2-8 workers is a good starting range. Too many workers add overhead instead of speeding up loading.
pin_memory: Enable for GPU training to speed up host-to-device transfers.
prefetch: DataLoader automatically prefetches batches in the background for better throughput.
batch_size: Larger batch sizes improve GPU utilization but require more memory. Find the sweet spot for your hardware.

Common Patterns

Training Loop

# One training pass followed by one validation pass per epoch.
# NOTE: num_epochs, train_step and validate_step are placeholders that are
# not defined in this snippet.
for epoch in range(num_epochs):
    # Training phase
    model.train()
    for batch in train_loader:
        # Training step
        loss = train_step(model, batch)

    # Validation phase: evaluation mode, run inside no_grad()
    model.eval()
    with nx.Tensor.no_grad():
        for batch in val_loader:
            # Validation step
            val_loss = validate_step(model, batch)

Multi-GPU Training

# Wrap model for data parallel training
model = nx.DataParallel(model, device_ids=[0, 1, 2, 3])

# A regular DataLoader is used here; splitting each batch across the devices
# is presumably done by the DataParallel wrapper -- confirm against the
# nx.DataParallel documentation.
train_loader = DataLoader(
    dataset,
    batch_size=128,  # Total batch size across all GPUs
    shuffle=True,
    num_workers=8
)

for batch in train_loader:
    outputs = model(batch)  # Automatically parallelized