Overview
Effective training of RL agents requires careful setup of the training loop, monitoring, and optimization. This guide covers best practices and advanced techniques.
Basic Training Loop
Agent Training Method
All agents provide a train() method:
from neurenix.rl import DQN
# Build a DQN agent for the given observation and action spaces.
agent = DQN(
    observation_space=obs_space,
    action_space=action_space,
    learning_rate=1e-3,
)

# Run the agent's own training routine; its result is kept in `metrics`.
metrics = agent.train(
    env=env,
    episodes=1000,
    max_steps=200,
    render=False,
    verbose=True,
    callback=None,
)
neurenix/rl/algorithms.py:107
Manual Training Loop
For more control, implement your own training loop:
from neurenix.rl.agent import Agent
import numpy as np
# Training loop: one iteration per episode.
for episode in range(episodes):
    # Reset environment and the per-episode counters.
    state = env.reset()
    episode_reward = 0
    episode_length = 0
    done = False

    # Episode loop: stop on termination or once the step cap is reached.
    while not done and episode_length < max_steps:
        # Select action
        action = agent.act(state)

        # Take action
        next_state, reward, done, info = env.step(action)

        # Update agent with the transition that was just observed.
        update_metrics = agent.update(
            state, action, reward, next_state, done
        )

        # Accumulate metrics
        episode_reward += reward
        episode_length += 1
        state = next_state

    # Log episode metrics
    print(f"Episode {episode}: Reward={episode_reward:.2f}")
neurenix/rl/agent.py:99
Value Functions
Q-Function
Estimates state-action values:
from neurenix.rl.value import QFunction
from neurenix.nn import Sequential, Linear, ReLU
from neurenix.optim import Adam
# Online Q-network: two 64-unit hidden layers, output width `action_dim`.
q_network = Sequential(
    Linear(state_dim, 64),
    ReLU(),
    Linear(64, 64),
    ReLU(),
    Linear(64, action_dim),
)

# The target network starts out as a clone of the online network.
target_network = q_network.clone()

# The optimizer is given the online network's parameters.
optimizer = Adam(q_network.parameters(), lr=0.001)

# Bundle both networks, the optimizer and the spaces into a Q-function.
q_function = QFunction(
    q_network=q_network,
    target_network=target_network,
    optimizer=optimizer,
    observation_space=obs_space,
    action_space=action_space,
    name="QFunction",
)
neurenix/rl/value.py:101
Update Q-Function
from neurenix.tensor import Tensor
# Split the batch of (state, action, reward, next_state, done) transitions
# into one column per field.
state_col, action_col, reward_col, next_state_col, done_col = [], [], [], [], []
for s, a, r, ns, d in batch:
    state_col.append(s)
    action_col.append(a)
    reward_col.append(r)
    next_state_col.append(ns)
    done_col.append(d)

# States are stacked; the remaining columns are wrapped as tensors directly.
states = Tensor.stack(state_col)
actions = Tensor(action_col)
rewards = Tensor(reward_col)
next_states = Tensor.stack(next_state_col)
dones = Tensor(done_col)

# Run one Q-function update on the prepared batch.
metrics = q_function.update(
    states=states,
    actions=actions,
    rewards=rewards,
    next_states=next_states,
    dones=dones,
    gamma=0.99,
)

print(f"Value loss: {metrics['value_loss']:.4f}")
neurenix/rl/value.py:157
Value Network Function
Estimates state values:
from neurenix.rl.value import ValueNetworkFunction
# State-value network: two 64-unit hidden layers and a single scalar output.
value_network = Sequential(
    Linear(state_dim, 64),
    ReLU(),
    Linear(64, 64),
    ReLU(),
    Linear(64, 1),  # Single value output
)

optimizer = Adam(value_network.parameters(), lr=0.001)

# Wrap the network and optimizer in a value function.
value_function = ValueNetworkFunction(
    value_network=value_network,
    optimizer=optimizer,
    observation_space=obs_space,
    name="ValueFunction",
)

# Query the value estimate for one state.
value = value_function(state)
print(f"State value: {value.item():.4f}")
neurenix/rl/value.py:263
Advantage Function
Combines value and Q-functions:
from neurenix.rl.value import AdvantageFunction
# Build the advantage function from the value function and the Q-function.
advantage_function = AdvantageFunction(
    value_function=value_function,
    q_function=q_function,
    name="Advantage",
)

# Without an `action` argument the call is used here to get all advantages.
advantages = advantage_function.estimate_advantage(state)
print(f"Advantages: {advantages}")

# With `action=0` the result is read as a single scalar via `.item()`.
advantage = advantage_function.estimate_advantage(state, action=0)
print(f"Advantage for action 0: {advantage.item():.4f}")
neurenix/rl/value.py:379
Experience Replay
Replay Buffer
from collections import deque
import numpy as np
class ReplayBuffer:
    """Fixed-capacity FIFO store of transitions with uniform random sampling."""

    def __init__(self, capacity=10000):
        """Create an empty buffer holding at most ``capacity`` transitions."""
        # A bounded deque drops its oldest entry once it is full.
        self.buffer = deque(maxlen=capacity)

    def add(self, state, action, reward, next_state, done):
        """Append one ``(state, action, reward, next_state, done)`` transition."""
        self.buffer.append((state, action, reward, next_state, done))

    def sample(self, batch_size):
        """Draw ``batch_size`` distinct transitions uniformly at random.

        Returns:
            A 5-tuple ``(states, actions, rewards, next_states, dones)``;
            each element is a tuple of length ``batch_size``.

        Raises:
            ValueError: If ``batch_size`` exceeds the number of stored
                transitions (sampling is without replacement).
        """
        available = len(self.buffer)
        if batch_size > available:
            raise ValueError(
                f"Cannot sample {batch_size} transitions from a buffer "
                f"holding {available}"
            )
        indices = np.random.choice(available, batch_size, replace=False)
        batch = [self.buffer[i] for i in indices]
        if not batch:
            # zip(*[]) yields nothing to unpack, so return empty columns.
            return (), (), (), (), ()
        states, actions, rewards, next_states, dones = zip(*batch)
        return states, actions, rewards, next_states, dones

    def __len__(self):
        """Return the number of transitions currently stored."""
        return len(self.buffer)
# Usage
buffer = ReplayBuffer(capacity=10000)

# Store experience
buffer.add(state, action, reward, next_state, done)

# Sample a batch only once enough transitions have been collected.
if len(buffer) >= batch_size:
    states, actions, rewards, next_states, dones = buffer.sample(batch_size)
neurenix/rl/agent.py:323
Prioritized Experience Replay
import numpy as np
class PrioritizedReplayBuffer:
    """Ring buffer of transitions sampled in proportion to their priority.

    A transition's priority is ``(|td_error| + 1e-5) ** alpha``; the small
    constant keeps every stored transition sampleable.
    """

    def __init__(self, capacity=10000, alpha=0.6):
        """Create an empty buffer.

        Args:
            capacity: Maximum number of transitions kept.
            alpha: Exponent applied to the TD-error magnitude.
        """
        self.capacity = capacity
        self.alpha = alpha
        self.buffer = []
        self.priorities = np.zeros(capacity, dtype=np.float32)
        self.position = 0  # next slot to write
        self.size = 0  # number of valid slots

    def _priority(self, td_error):
        """Map a TD error to its sampling priority."""
        return (abs(td_error) + 1e-5) ** self.alpha

    def add(self, state, action, reward, next_state, done, td_error):
        """Store a transition, overwriting the oldest slot once full."""
        transition = (state, action, reward, next_state, done)
        if self.size < self.capacity:
            self.buffer.append(transition)
        else:
            self.buffer[self.position] = transition
        self.priorities[self.position] = self._priority(td_error)
        self.position = (self.position + 1) % self.capacity
        self.size = min(self.size + 1, self.capacity)

    def sample(self, batch_size, beta=0.4):
        """Draw ``batch_size`` distinct transitions by priority.

        Args:
            batch_size: Number of transitions to draw (without replacement).
            beta: Exponent of the importance-sampling correction.

        Returns:
            ``(samples, indices, weights)``: the transitions, their buffer
            indices, and importance weights scaled so the largest is 1.

        Raises:
            ValueError: If the buffer is empty or ``batch_size`` exceeds the
                number of stored transitions.
        """
        if self.size == 0:
            raise ValueError("Cannot sample from an empty buffer")
        if batch_size > self.size:
            raise ValueError(
                f"Cannot sample {batch_size} transitions from a buffer "
                f"holding {self.size}"
            )
        priorities = self.priorities[:self.size]
        probs = priorities / priorities.sum()
        indices = np.random.choice(self.size, batch_size, p=probs, replace=False)
        samples = [self.buffer[i] for i in indices]
        # Importance sampling weights
        weights = (self.size * probs[indices]) ** (-beta)
        weights /= weights.max()
        return samples, indices, weights

    def update_priorities(self, indices, td_errors):
        """Recompute the priorities at ``indices`` from new TD errors."""
        for idx, td_error in zip(indices, td_errors):
            self.priorities[idx] = self._priority(td_error)
# Usage
buffer = PrioritizedReplayBuffer(capacity=10000, alpha=0.6)

# Calculate TD error
q_value = q_network(state)[action]
# Mask the bootstrap term on terminal transitions: once `done` is true
# there is no next state to take value from.
not_terminal = 1.0 - float(done)
target = reward + gamma * not_terminal * target_network(next_state).max()
td_error = target - q_value

# Store with priority
buffer.add(state, action, reward, next_state, done, td_error.item())

# Sample with importance sampling
samples, indices, weights = buffer.sample(batch_size, beta=0.4)

# Update priorities after training
buffer.update_priorities(indices, new_td_errors)
Training Callbacks
Custom Callbacks
from typing import Dict, Any
def training_callback(metrics: Dict[str, Any], target_reward: float = 195) -> bool:
    """
    Callback function called after each episode.

    Args:
        metrics: Dictionary containing episode metrics
            - episode: Episode number
            - reward: Episode reward
            - length: Episode length
            - Additional algorithm-specific metrics
        target_reward: Training stops once an episode's reward is strictly
            greater than this value.

    Returns:
        True to stop training, False to continue
    """
    episode = metrics["episode"]
    reward = metrics["reward"]
    length = metrics["length"]

    # Log metrics
    print(f"Episode {episode}: Reward={reward:.2f}, Length={length}")

    # Early stopping
    if reward > target_reward:
        print("Target reward reached!")
        return True

    # Continue training
    return False
# Pass the callback to the trainer.
metrics = agent.train(env=env, episodes=1000, callback=training_callback)
neurenix/rl/agent.py:106
Logging Integration
import wandb
class WandbCallback:
    """Episode callback that forwards metrics to Weights & Biases."""

    def __init__(self, project="rl-training"):
        """Initialise a W&B run under ``project``."""
        wandb.init(project=project)

    def __call__(self, metrics):
        """Log episode number, reward and length; never requests a stop."""
        # Log to Weights & Biases
        wandb.log({
            "episode": metrics["episode"],
            "reward": metrics["reward"],
            "length": metrics["length"],
        })
        return False


# Use with agent
callback = WandbCallback(project="dqn-cartpole")
metrics = agent.train(env=env, episodes=1000, callback=callback)
Checkpoint Saving
import os
class CheckpointCallback:
    """Episode callback that saves the agent every ``save_freq`` episodes."""

    def __init__(self, save_dir="checkpoints", save_freq=100, agent=None):
        """Prepare the checkpoint directory.

        Args:
            save_dir: Directory that receives the checkpoints; created if
                it does not exist.
            save_freq: Save whenever the episode number is a multiple of
                this value.
            agent: Object whose ``save(path)`` is called. When omitted, the
                module-level ``agent`` is looked up at call time.
        """
        self.save_dir = save_dir
        self.save_freq = save_freq
        self.agent = agent
        os.makedirs(save_dir, exist_ok=True)

    def __call__(self, metrics):
        """Save a checkpoint on matching episodes; never requests a stop."""
        episode = metrics["episode"]
        if episode % self.save_freq == 0:
            path = os.path.join(self.save_dir, f"agent_ep{episode}")
            # Prefer the injected agent; fall back to the global name only
            # when none was given.
            target = self.agent if self.agent is not None else agent
            target.save(path)
            print(f"Saved checkpoint to {path}")
        return False
# Save into "models" on every 100th episode while training.
callback = CheckpointCallback(
    save_dir="models",
    save_freq=100,
)
metrics = agent.train(
    env=env,
    episodes=1000,
    callback=callback,
)
Multi-Agent Training
Multi-Agent System
from neurenix.rl.agent import MultiAgentSystem
# Create multiple agents
agents = [
    DQN(obs_space, action_space, name="Agent1"),
    DQN(obs_space, action_space, name="Agent2"),
    DQN(obs_space, action_space, name="Agent3"),
]

# Create multi-agent system
mas = MultiAgentSystem(
    agents=agents,
    env=multi_agent_env,
    name="CooperativeAgents",
)

# Train all agents together
metrics = mas.train(
    episodes=1000,
    max_steps=200,
    verbose=True,
)

# Access per-agent rewards: one reward sequence per agent.
for i, agent_rewards in enumerate(metrics["agent_rewards"]):
    mean_reward = np.mean(agent_rewards)
    print(f"Agent {i} mean reward: {mean_reward:.2f}")
neurenix/rl/agent.py:393
Cooperative Learning
# Shared reward
class CooperativeEnv:
    """Environment sketch in which every agent receives the same reward."""

    def step(self, actions):
        """Return one copy of the team reward per submitted action."""
        # All agents get same reward
        shared_reward = self._compute_team_reward()
        rewards = [shared_reward] * len(actions)
        # NOTE(review): next_states, done and info are not defined in this
        # excerpt; presumably they come from transition logic omitted here.
        return next_states, rewards, done, info
Competitive Learning
# Opposing rewards
class CompetitiveEnv:
    """Two-agent environment sketch with zero-sum rewards."""

    def step(self, actions):
        """Reward the first agent and give the second the negated value."""
        # Zero-sum rewards
        reward1 = self._compute_reward(actions[0])
        reward2 = -reward1
        rewards = [reward1, reward2]
        # NOTE(review): next_states, done and info are not defined in this
        # excerpt; presumably they come from transition logic omitted here.
        return next_states, rewards, done, info
Performance Optimization
Vectorized Environments
import numpy as np
class VectorizedEnv:
    """Step several environments behind one batched interface.

    The environments are stepped one after another in a Python loop.
    """

    def __init__(self, env_fns):
        """Instantiate one environment per factory in ``env_fns``."""
        self.envs = [fn() for fn in env_fns]
        self.n_envs = len(self.envs)

    def reset(self):
        """Reset every environment and return the stacked initial states."""
        return np.array([env.reset() for env in self.envs])

    def step(self, actions):
        """Apply ``actions[i]`` to environment ``i``.

        Returns:
            ``(states, rewards, dones, infos)``: the first three are NumPy
            arrays with one entry per environment, ``infos`` is a tuple.

        Raises:
            ValueError: If the number of actions differs from the number of
                environments (``zip`` would otherwise silently skip some).
        """
        actions = list(actions)
        if len(actions) != self.n_envs:
            raise ValueError(
                f"Expected {self.n_envs} actions, got {len(actions)}"
            )
        results = [env.step(a) for env, a in zip(self.envs, actions)]
        states, rewards, dones, infos = zip(*results)
        return np.array(states), np.array(rewards), np.array(dones), infos

    def close(self):
        """Close every wrapped environment."""
        for env in self.envs:
            env.close()
# Usage: eight GridWorld factories, one per wrapped environment.
env_fns = [(lambda: GridWorld()) for _ in range(8)]
vec_env = VectorizedEnv(env_fns)

# Reset all environments
states = vec_env.reset()  # Shape: (8, state_dim)

# Choose one action per environment state, then step them all together.
actions = list(map(agent.act, states))
next_states, rewards, dones, infos = vec_env.step(actions)
Gradient Accumulation
# Accumulate gradients over multiple batches
accumulation_steps = 4
optimizer.zero_grad()

for _ in range(accumulation_steps):
    # Sample batch
    batch = buffer.sample(batch_size)

    # Scale each loss so the accumulated gradient is the mean over batches.
    loss = compute_loss(batch) / accumulation_steps

    # Accumulate gradients
    loss.backward()

# Update parameters once, after all batches have contributed.
optimizer.step()
Mixed Precision Training
from neurenix import autocast
# Use mixed precision for faster training
# NOTE(review): the original indentation was lost; only the forward pass and
# loss are placed inside autocast() here, with backward/step outside, which is
# the usual convention -- confirm against the neurenix autocast documentation.
with autocast():
    q_values = q_network(states)
    target_values = compute_targets(next_states)
    loss = ((q_values - target_values) ** 2).mean()

# NOTE(review): gradients are not cleared in this snippet; call
# optimizer.zero_grad() first if it runs inside a training loop.
loss.backward()
optimizer.step()
Hyperparameter Tuning
Grid Search
import itertools
# Define hyperparameter grid
hyperparams = {
    "learning_rate": [0.0001, 0.001, 0.01],
    "gamma": [0.95, 0.99, 0.995],
    "buffer_size": [5000, 10000, 50000],
}

# Grid search
best_reward = -float("inf")
best_params = None

# product() yields one tuple per combination, in the dict's key order.
for lr, gamma, buffer_size in itertools.product(*hyperparams.values()):
    # Create agent with hyperparameters
    agent = DQN(
        observation_space=obs_space,
        action_space=action_space,
        learning_rate=lr,
        gamma=gamma,
        buffer_size=buffer_size,
    )

    # Train, then score the run by the mean of its last ten episode rewards.
    metrics = agent.train(env=env, episodes=100, verbose=False)
    mean_reward = np.mean(metrics["episode_rewards"][-10:])

    # Track best
    if mean_reward > best_reward:
        best_reward = mean_reward
        best_params = {"lr": lr, "gamma": gamma, "buffer_size": buffer_size}

print(f"Best params: {best_params}")
print(f"Best reward: {best_reward:.2f}")
Random Search
import numpy as np
def random_search(n_trials=50):
    """Sample random hyperparameters and keep the best-scoring set.

    Each trial trains a fresh DQN for 100 episodes and is scored by the mean
    of its last ten episode rewards.

    Args:
        n_trials: Number of random configurations to try.

    Returns:
        ``(best_params, best_reward)``; ``best_params`` is ``None`` if no
        trial scored above negative infinity.
    """
    best_reward = -float("inf")
    best_params = None

    for trial in range(n_trials):
        # Learning rate and buffer size are drawn log-uniformly
        # (uniform exponent); gamma and epsilon decay uniformly.
        params = {
            "learning_rate": 10 ** np.random.uniform(-5, -2),
            "gamma": np.random.uniform(0.9, 0.999),
            "buffer_size": int(10 ** np.random.uniform(3, 5)),
            "epsilon_decay": np.random.uniform(0.99, 0.999),
        }

        # Create and train agent
        agent = DQN(obs_space, action_space, **params)
        metrics = agent.train(env=env, episodes=100, verbose=False)
        mean_reward = np.mean(metrics["episode_rewards"][-10:])

        # Track best
        if mean_reward > best_reward:
            best_reward = mean_reward
            best_params = params
            print(f"Trial {trial}: New best reward {mean_reward:.2f}")

    return best_params, best_reward


best_params, best_reward = random_search(n_trials=50)
Monitoring and Metrics
Training Metrics
# Train the agent and keep the returned metrics for plotting.
metrics = agent.train(env=env, episodes=1000, verbose=True)

# Figure 1: raw reward per episode.
import matplotlib.pyplot as plt

plt.figure(figsize=(10, 6))
plt.plot(metrics["episode_rewards"])
plt.xlabel("Episode")
plt.ylabel("Reward")
plt.title("Training Progress")
plt.show()

# Figure 2: rewards smoothed with a moving average over `window` episodes.
window = 100
rolling_mean = np.convolve(
    metrics["episode_rewards"], np.ones(window) / window, mode="valid"
)

plt.figure(figsize=(10, 6))
plt.plot(rolling_mean)
plt.xlabel("Episode")
plt.ylabel(f"Average Reward (last {window} episodes)")
plt.title("Training Progress (Smoothed)")
plt.show()
neurenix/rl/agent.py:99
Evaluation Metrics
def evaluate_agent(agent, env, n_episodes=100, max_steps=None):
    """Evaluate agent performance.

    Args:
        agent: Object exposing ``act(state)``.
        env: Environment exposing ``reset()`` and ``step(action)``, the
            latter returning ``(state, reward, done, info)``.
        n_episodes: Number of evaluation episodes; must be at least 1.
        max_steps: Optional cap on steps per episode. ``None`` (the default)
            runs each episode until the environment reports ``done``.

    Returns:
        Dict with ``mean_reward``, ``std_reward``, ``min_reward``,
        ``max_reward`` and ``mean_length``.

    Raises:
        ValueError: If ``n_episodes`` is smaller than 1.
    """
    if n_episodes < 1:
        raise ValueError("n_episodes must be at least 1")

    rewards = []
    lengths = []

    for _ in range(n_episodes):
        state = env.reset()
        episode_reward = 0
        episode_length = 0
        done = False

        # The cap guards against environments that never report `done`.
        while not done and (max_steps is None or episode_length < max_steps):
            action = agent.act(state)
            state, reward, done, _ = env.step(action)
            episode_reward += reward
            episode_length += 1

        rewards.append(episode_reward)
        lengths.append(episode_length)

    return {
        "mean_reward": np.mean(rewards),
        "std_reward": np.std(rewards),
        "min_reward": np.min(rewards),
        "max_reward": np.max(rewards),
        "mean_length": np.mean(lengths),
    }
# Run 100 evaluation episodes and report the summary statistics.
results = evaluate_agent(
    agent,
    env,
    n_episodes=100,
)

print(f"Mean reward: {results['mean_reward']:.2f} ± {results['std_reward']:.2f}")
print(f"Range: [{results['min_reward']:.2f}, {results['max_reward']:.2f}]")
print(f"Mean length: {results['mean_length']:.1f}")
Saving and Loading
Save Trained Agent
# Save the agent under the path prefix "models/my_agent".
agent.save("models/my_agent")
# This saves both the policy and the value function.
# Files created:
# - models/my_agent_policy
# - models/my_agent_value
neurenix/rl/agent.py:189
Load Trained Agent
# Rebuild an agent with the same architecture as the saved one.
agent = DQN(observation_space=obs_space, action_space=action_space)

# Restore the trained weights from the saved path prefix.
agent.load("models/my_agent")

# Inference: reset the environment and ask the agent for an action.
state = env.reset()
action = agent.act(state)
neurenix/rl/agent.py:204
Next Steps
Algorithms
Deep dive into RL algorithms
Overview
RL module overview