Overview
Data loaders provide efficient batching, shuffling, and parallel loading of datasets for training and evaluation.
DataLoader Class
class DataLoader:
def __init__(
self,
dataset: Dataset,
batch_size: int = 1,
shuffle: bool = False,
num_workers: int = 0,
pin_memory: bool = False,
drop_last: bool = False,
collate_fn: Optional[Callable] = None,
)
Parameters
dataset (Dataset): Dataset to load data from.
batch_size (int, default 1): Number of samples per batch.
shuffle (bool, default False): Whether to shuffle the data at the beginning of each epoch.
num_workers (int, default 0): Number of worker processes for parallel data loading. 0 means data will be loaded in the main process.
pin_memory (bool, default False): If True, the data loader will copy tensors into CUDA pinned memory before returning them. Useful for GPU training.
drop_last (bool, default False): Whether to drop the last incomplete batch if the dataset size is not divisible by the batch size.
collate_fn (Optional[Callable], default None): Function to merge a list of samples into a batch. If None, uses default collation.
Methods
__iter__
Return an iterator over the dataset.
__len__
Return the number of batches.
Returns: Number of batches in the data loader.
DistributedDataLoader
class DistributedDataLoader(DataLoader):
def __init__(
self,
dataset: Dataset,
batch_size: int = 1,
shuffle: bool = False,
num_workers: int = 0,
rank: int = 0,
world_size: int = 1,
**kwargs
)
Data loader for distributed training across multiple devices.
Additional Parameters
rank (int, default 0): Rank of the current process in distributed training.
world_size (int, default 1): Total number of processes in distributed training.
Utility Functions
default_collate
def default_collate(batch: List[Any]) -> Any
Default collation function that stacks samples into batches.
batch (List[Any]): List of samples to collate.
worker_init_fn
def worker_init_fn(worker_id: int) -> None
Initialization function for data loader workers.
Example Usage
import neurenix as nx
from neurenix.data import Dataset, DataLoader, load_dataset
# Load dataset
dataset = load_dataset("train_data.csv")
print(f"Dataset size: {len(dataset)}")

# Create data loader
train_loader = DataLoader(
    dataset,
    batch_size=32,
    shuffle=True,
    num_workers=4,
    pin_memory=True,  # For GPU training
    drop_last=True,
)
print(f"Number of batches: {len(train_loader)}")

# Iterate over batches.
# NOTE: `model`, `criterion` and `optimizer` are not defined in this snippet;
# they are assumed to be created by the surrounding training script.
for epoch in range(10):
    for batch_idx, batch in enumerate(train_loader):
        # Each batch is unpacked as an (inputs, labels) pair
        inputs, labels = batch

        # Training step
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        optimizer.zero_grad()

        # Log progress every 100 batches
        if batch_idx % 100 == 0:
            print(f"Epoch {epoch}, Batch {batch_idx}, Loss: {loss.item():.4f}")
# Custom collate function
def custom_collate(batch):
    """Merge a list of samples into a pair of stacked tensors.

    Args:
        batch: List of samples. Element 0 of each sample is taken as the
            data and element 1 as the label.

    Returns:
        A ``(data_tensor, labels_tensor)`` tuple, each built with
        ``nx.Tensor.stack`` from the per-sample values.
    """
    # Custom batching logic: split samples into parallel lists
    data = [item[0] for item in batch]
    labels = [item[1] for item in batch]

    # Stack into tensors
    data_tensor = nx.Tensor.stack(data)
    labels_tensor = nx.Tensor.stack(labels)
    return data_tensor, labels_tensor
# Build a loader that batches samples with the custom collate function
custom_loader = DataLoader(
    dataset,
    batch_size=64,
    shuffle=True,
    collate_fn=custom_collate,
)
# Distributed training
from neurenix.data import DistributedDataLoader
# Hard-coded here for illustration; `rank` identifies the current process.
rank = 0  # Process rank
world_size = 4  # Total number of GPUs

dist_loader = DistributedDataLoader(
    dataset,
    batch_size=32,
    shuffle=True,
    num_workers=2,
    rank=rank,
    world_size=world_size,
)

# NOTE: `model` is assumed to be defined by the surrounding script.
for batch in dist_loader:
    # Each process gets a different subset
    outputs = model(batch)
# Variable length sequences
def pad_collate(batch):
    """Collate function for variable length sequences.

    Every sequence is extended along its first dimension with zeros so that
    all sequences reach the length of the longest one in the batch, then the
    padded sequences are stacked.

    Args:
        batch: List of samples. Element 0 of each sample is the sequence and
            element 1 is its label.

    Returns:
        A ``(padded_sequences, labels)`` tuple: the stacked padded sequences
        and the labels wrapped in ``nx.Tensor``.
    """
    sequences = [item[0] for item in batch]
    labels = [item[1] for item in batch]

    # Find max length
    max_len = max(len(seq) for seq in sequences)

    # Pad sequences
    padded = []
    for seq in sequences:
        pad_len = max_len - len(seq)
        # The zero block keeps the trailing dimensions of the sequence so
        # that only the first (length) dimension grows.
        padded_seq = nx.Tensor.cat([
            seq,
            nx.Tensor.zeros((pad_len,) + seq.shape[1:])
        ])
        padded.append(padded_seq)

    return nx.Tensor.stack(padded), nx.Tensor(labels)
# Loader for variable length sequences, batched with the padding collate
seq_loader = DataLoader(
    sequence_dataset,
    batch_size=16,
    collate_fn=pad_collate,
)
# Validation loader (no shuffle)
val_loader = DataLoader(
    val_dataset,
    batch_size=64,
    shuffle=False,
    num_workers=4,
)

# NOTE: `model` is assumed to be defined by the surrounding script.
model.eval()
with nx.Tensor.no_grad():
    for batch in val_loader:
        predictions = model(batch)
        # Evaluate predictions
num_workers: Use 2-8 workers for optimal performance. Too many workers can cause overhead.
pin_memory: Enable for GPU training to speed up host-to-device transfers.
prefetch: DataLoader automatically prefetches batches in the background for better throughput.
batch_size: Larger batch sizes improve GPU utilization but require more memory. Find the sweet spot for your hardware.
Common Patterns
Training Loop
# NOTE(review): nesting reconstructed from flattened text; validation is
# assumed to run once per epoch, after that epoch's training pass.
for epoch in range(num_epochs):
    model.train()
    for batch in train_loader:
        # Training step
        loss = train_step(model, batch)

    model.eval()
    with nx.Tensor.no_grad():
        for batch in val_loader:
            # Validation step
            val_loss = validate_step(model, batch)
Multi-GPU Training
# Wrap model for data parallel training
model = nx.DataParallel(model, device_ids=[0, 1, 2, 3])

# Data loader automatically distributes batches
train_loader = DataLoader(
    dataset,
    batch_size=128,  # Total batch size across all GPUs
    shuffle=True,
    num_workers=8,
)

for batch in train_loader:
    outputs = model(batch)  # Automatically parallelized