Skip to main content

Overview

Effective training of RL agents requires careful setup of the training loop, monitoring, and optimization. This guide covers best practices and advanced techniques.

Basic Training Loop

Agent Training Method

All agents provide a train() method:
from neurenix.rl import DQN

# Create agent
# NOTE: obs_space and action_space are not defined in this snippet; they must
# already exist in the calling scope.
agent = DQN(
    observation_space=obs_space,
    action_space=action_space,
    learning_rate=0.001
)

# Train agent
# NOTE: env must likewise be created beforehand. callback accepts a callable
# that receives per-episode metrics (see "Training Callbacks" later in this
# guide); None is passed here, i.e. no callback is supplied.
metrics = agent.train(
    env=env,
    episodes=1000,
    max_steps=200,
    render=False,
    verbose=True,
    callback=None
)
Source: neurenix/rl/algorithms.py:107

Manual Training Loop

For more control, implement your own training loop:
from neurenix.rl.agent import Agent
import numpy as np

# Training loop
# NOTE: episodes, max_steps, env and agent are expected to be defined by the
# surrounding code; this snippet does not create them.
for episode in range(episodes):
    # Reset environment
    state = env.reset()
    episode_reward = 0
    episode_length = 0
    done = False
    
    # Episode loop
    # Ends when the environment reports done or after max_steps steps,
    # whichever happens first.
    while not done and episode_length < max_steps:
        # Select action
        action = agent.act(state)
        
        # Take action
        # env.step is unpacked as a 4-tuple: (next_state, reward, done, info).
        next_state, reward, done, info = env.step(action)
        
        # Update agent
        # The returned update_metrics is not read again in this snippet.
        update_metrics = agent.update(
            state, action, reward, next_state, done
        )
        
        # Accumulate metrics
        episode_reward += reward
        episode_length += 1
        state = next_state
    
    # Log episode metrics
    print(f"Episode {episode}: Reward={episode_reward:.2f}")
Source: neurenix/rl/agent.py:99

Value Functions

Q-Function

A Q-function estimates state-action values, Q(s, a):
from neurenix.rl.value import QFunction
from neurenix.nn import Sequential, Linear, ReLU
from neurenix.optim import Adam

# Create Q-network
# NOTE: state_dim, action_dim, obs_space and action_space are not defined in
# this snippet and must be provided by the caller.
# The final layer has action_dim outputs, i.e. one output per action.
q_network = Sequential(
    Linear(state_dim, 64),
    ReLU(),
    Linear(64, 64),
    ReLU(),
    Linear(64, action_dim)
)

# Create target network
# Presumably clone() returns an independent copy of the network and its
# current weights — confirm against the neurenix.nn API.
target_network = q_network.clone()

# Create optimizer
# Only q_network's parameters are handed to the optimizer; target_network's
# are not.
optimizer = Adam(q_network.parameters(), lr=0.001)

# Create Q-function
q_function = QFunction(
    q_network=q_network,
    target_network=target_network,
    optimizer=optimizer,
    observation_space=obs_space,
    action_space=action_space,
    name="QFunction"
)
Source: neurenix/rl/value.py:101

Update Q-Function

from neurenix.tensor import Tensor

# Prepare batch
# batch is expected to be an iterable of 5-tuples
# (state, action, reward, next_state, done); it is not created in this snippet.
# States and next states are combined with Tensor.stack, while actions,
# rewards and dones are wrapped from plain Python lists.
states = Tensor.stack([s for s, _, _, _, _ in batch])
actions = Tensor([a for _, a, _, _, _ in batch])
rewards = Tensor([r for _, _, r, _, _ in batch])
next_states = Tensor.stack([ns for _, _, _, ns, _ in batch])
dones = Tensor([d for _, _, _, _, d in batch])

# Update Q-function
# q_function is assumed to be the QFunction built earlier in this guide.
metrics = q_function.update(
    states=states,
    actions=actions,
    rewards=rewards,
    next_states=next_states,
    dones=dones,
    gamma=0.99
)

# The returned metrics mapping is read under the key "value_loss".
print(f"Value loss: {metrics['value_loss']:.4f}")
Source: neurenix/rl/value.py:157

Value Network Function

A value network estimates state values, V(s):
from neurenix.rl.value import ValueNetworkFunction

# Create value network
# NOTE: Sequential, Linear, ReLU and Adam are not imported in this snippet
# (only ValueNetworkFunction is); state_dim, obs_space and state must also be
# supplied by the caller.
value_network = Sequential(
    Linear(state_dim, 64),
    ReLU(),
    Linear(64, 64),
    ReLU(),
    Linear(64, 1)  # Single value output
)

optimizer = Adam(value_network.parameters(), lr=0.001)

# Create value function
value_function = ValueNetworkFunction(
    value_network=value_network,
    optimizer=optimizer,
    observation_space=obs_space,
    name="ValueFunction"
)

# Estimate value
# The value function object is called directly on a state; .item() is then
# used to obtain a scalar for formatting.
value = value_function(state)
print(f"State value: {value.item():.4f}")
Source: neurenix/rl/value.py:263

Advantage Function

Combines a value function and a Q-function to estimate advantages (conventionally A(s, a) = Q(s, a) − V(s)):
from neurenix.rl.value import AdvantageFunction

# Create advantage function
# value_function and q_function are assumed to be the objects created in the
# preceding sections; state is supplied by the caller.
advantage_function = AdvantageFunction(
    value_function=value_function,
    q_function=q_function,
    name="Advantage"
)

# Estimate advantage
# Called without an action; presumably this yields one advantage per action —
# confirm against AdvantageFunction.estimate_advantage.
advantages = advantage_function.estimate_advantage(state)
print(f"Advantages: {advantages}")

# For specific action
# With action=0 the result is converted via .item(), so a single value is
# expected here.
advantage = advantage_function.estimate_advantage(state, action=0)
print(f"Advantage for action 0: {advantage.item():.4f}")
Source: neurenix/rl/value.py:379

Experience Replay

Replay Buffer

from collections import deque
import numpy as np

class ReplayBuffer:
    """Bounded FIFO store of transitions with uniform random sampling.

    Once ``capacity`` transitions are held, adding another one evicts the
    oldest entry.
    """

    def __init__(self, capacity=10000):
        # deque(maxlen=...) performs the eviction of old entries for us.
        self.buffer = deque(maxlen=capacity)

    def add(self, state, action, reward, next_state, done):
        """Append one (state, action, reward, next_state, done) transition."""
        transition = (state, action, reward, next_state, done)
        self.buffer.append(transition)

    def sample(self, batch_size):
        """Draw ``batch_size`` distinct transitions uniformly at random.

        Returns:
            Five tuples (states, actions, rewards, next_states, dones), each
            of length ``batch_size`` and aligned by position.

        Raises:
            ValueError: If more transitions are requested than are stored
                (sampling is done without replacement).
        """
        picked = np.random.choice(len(self.buffer), batch_size, replace=False)
        # Transpose the list of transitions into per-field columns.
        columns = zip(*[self.buffer[idx] for idx in picked])
        states, actions, rewards, next_states, dones = columns
        return states, actions, rewards, next_states, dones

    def __len__(self):
        """Return the number of transitions currently stored."""
        return len(self.buffer)

# Usage
buffer = ReplayBuffer(capacity=10000)

# Store experience
# state, action, reward, next_state and done come from the caller's
# environment interaction; they are not defined in this snippet.
buffer.add(state, action, reward, next_state, done)

# Sample batch
# The length guard matters: sample() draws without replacement, so asking for
# more transitions than are stored would raise.
if len(buffer) >= batch_size:
    states, actions, rewards, next_states, dones = buffer.sample(batch_size)
Source: neurenix/rl/agent.py:323

Prioritized Experience Replay

import numpy as np

class PrioritizedReplayBuffer:
    """Ring buffer of transitions sampled in proportion to their priority.

    A transition's priority is ``(|td_error| + 1e-5) ** alpha``; the small
    constant keeps zero-error transitions sampleable.
    """

    def __init__(self, capacity=10000, alpha=0.6):
        self.capacity = capacity
        self.alpha = alpha
        self.buffer = []
        # One float32 priority slot per buffer position.
        self.priorities = np.zeros(capacity, dtype=np.float32)
        self.position = 0  # next slot to write
        self.size = 0      # number of valid entries

    def add(self, state, action, reward, next_state, done, td_error):
        """Store a transition at the write cursor with a TD-error priority."""
        priority = (abs(td_error) + 1e-5) ** self.alpha
        transition = (state, action, reward, next_state, done)
        slot = self.position

        if self.size >= self.capacity:
            # Buffer is full: overwrite the entry under the cursor.
            self.buffer[slot] = transition
        else:
            self.buffer.append(transition)

        self.priorities[slot] = priority
        self.position = (slot + 1) % self.capacity
        self.size = min(self.size + 1, self.capacity)

    def sample(self, batch_size, beta=0.4):
        """Sample distinct transitions with probability proportional to priority.

        Returns:
            ``(samples, indices, weights)`` where ``weights`` are the
            importance-sampling weights ``(size * p_i) ** -beta`` scaled so
            that the largest weight equals 1.
        """
        active = self.priorities[:self.size]
        probs = active / active.sum()

        indices = np.random.choice(
            self.size, batch_size, p=probs, replace=False
        )
        samples = [self.buffer[slot] for slot in indices]

        # Importance sampling weights
        weights = (self.size * probs[indices]) ** (-beta)
        weights /= weights.max()

        return samples, indices, weights

    def update_priorities(self, indices, td_errors):
        """Recompute the priority of each given slot from its new TD error."""
        for slot, error in zip(indices, td_errors):
            self.priorities[slot] = (abs(error) + 1e-5) ** self.alpha

# Usage
buffer = PrioritizedReplayBuffer(capacity=10000, alpha=0.6)

# Calculate TD error
# NOTE: q_network, target_network, state, action, reward, next_state, done,
# gamma, batch_size and new_td_errors come from the surrounding training code.
q_value = q_network(state)[action]
# A terminal transition has no future return, so the bootstrap term must be
# masked out when done is true; otherwise the target (and therefore the
# priority) of every episode-ending transition is wrong.
not_done = 1.0 - float(done)
target = reward + gamma * target_network(next_state).max() * not_done
td_error = target - q_value

# Store with priority
buffer.add(state, action, reward, next_state, done, td_error.item())

# Sample with importance sampling
# sample() draws without replacement, so the buffer must already hold at
# least batch_size transitions when this runs.
samples, indices, weights = buffer.sample(batch_size, beta=0.4)

# Update priorities after training
buffer.update_priorities(indices, new_td_errors)

Training Callbacks

Custom Callbacks

from typing import Dict, Any

def training_callback(metrics: Dict[str, Any]) -> bool:
    """
    Per-episode hook: log progress and decide whether training should stop.

    Args:
        metrics: Dictionary of episode metrics. Must contain
            - episode: Episode number
            - reward: Episode reward
            - length: Episode length
            and may carry additional algorithm-specific entries.

    Returns:
        True to stop training, False to continue
    """
    episode, reward, length = (
        metrics[key] for key in ("episode", "reward", "length")
    )

    # Log metrics
    print(f"Episode {episode}: Reward={reward:.2f}, Length={length}")

    # Keep going until the target reward has been exceeded.
    if not (reward > 195):
        return False

    # Early stopping
    print("Target reward reached!")
    return True

# Use callback
metrics = agent.train(
    env=env,
    episodes=1000,
    callback=training_callback
)
Source: neurenix/rl/agent.py:106

Logging Integration

import wandb

class WandbCallback:
    """Training callback that forwards episode metrics to Weights & Biases."""

    def __init__(self, project="rl-training"):
        # Starts the W&B run as soon as the callback is constructed.
        wandb.init(project=project)

    def __call__(self, metrics):
        """Log episode, reward and length; never requests early stopping."""
        # Log to Weights & Biases
        payload = {
            key: metrics[key] for key in ("episode", "reward", "length")
        }
        wandb.log(payload)
        return False

# Use with agent
callback = WandbCallback(project="dqn-cartpole")
metrics = agent.train(env=env, episodes=1000, callback=callback)

Checkpoint Saving

import os

class CheckpointCallback:
    """Training callback that periodically saves the agent to disk.

    Args:
        save_dir: Directory that receives the checkpoints; created on
            construction if it does not exist.
        save_freq: A checkpoint is written whenever the episode number is a
            multiple of this value (episode 0 included).
        agent: The agent to save. When omitted, the callback falls back to a
            module-level variable named ``agent``, which keeps code written
            against the two-argument constructor working.
    """

    def __init__(self, save_dir="checkpoints", save_freq=100, agent=None):
        self.save_dir = save_dir
        self.save_freq = save_freq
        self.agent = agent
        os.makedirs(save_dir, exist_ok=True)

    def __call__(self, metrics):
        """Save a checkpoint if due; always returns False (continue training)."""
        episode = metrics["episode"]

        if episode % self.save_freq == 0:
            # Prefer the explicitly injected agent; the global lookup is only a
            # backward-compatibility fallback and raises NameError if absent.
            target = self.agent if self.agent is not None else agent
            path = os.path.join(self.save_dir, f"agent_ep{episode}")
            target.save(path)
            print(f"Saved checkpoint to {path}")

        return False

# Use with agent
callback = CheckpointCallback(save_dir="models", save_freq=100)
metrics = agent.train(env=env, episodes=1000, callback=callback)

Multi-Agent Training

Multi-Agent System

from neurenix.rl.agent import MultiAgentSystem

# Create multiple agents
# NOTE: DQN, np, obs_space, action_space and multi_agent_env are not imported
# or defined in this snippet; only MultiAgentSystem is.
agents = [
    DQN(obs_space, action_space, name="Agent1"),
    DQN(obs_space, action_space, name="Agent2"),
    DQN(obs_space, action_space, name="Agent3")
]

# Create multi-agent system
mas = MultiAgentSystem(
    agents=agents,
    env=multi_agent_env,
    name="CooperativeAgents"
)

# Train all agents together
# Unlike agent.train(), no env is passed here: it was given to the
# MultiAgentSystem constructor above.
metrics = mas.train(
    episodes=1000,
    max_steps=200,
    verbose=True
)

# Access per-agent rewards
# metrics["agent_rewards"] is iterated as one reward sequence per agent;
# presumably in the same order as the agents list — confirm.
for i, agent_rewards in enumerate(metrics["agent_rewards"]):
    mean_reward = np.mean(agent_rewards)
    print(f"Agent {i} mean reward: {mean_reward:.2f}")
Source: neurenix/rl/agent.py:393

Cooperative Learning

# Shared reward
class CooperativeEnv:
    """Sketch of an environment in which every agent receives the same reward."""

    def step(self, actions):
        # All agents get same reward
        # NOTE(review): _compute_team_reward is not defined in this snippet.
        shared_reward = self._compute_team_reward()
        # One copy of the team reward per submitted action.
        rewards = [shared_reward] * len(actions)
        # NOTE(review): next_states, done and info are never assigned here, so
        # this snippet is illustrative only and would raise NameError as-is.
        return next_states, rewards, done, info

Competitive Learning

# Opposing rewards
class CompetitiveEnv:
    """Sketch of a two-player environment with zero-sum rewards."""

    def step(self, actions):
        # Zero-sum rewards
        # Only actions[0] feeds the reward computation; the second player's
        # reward is its negation, so the two always sum to zero.
        # NOTE(review): _compute_reward is not defined in this snippet.
        reward1 = self._compute_reward(actions[0])
        reward2 = -reward1
        rewards = [reward1, reward2]
        # NOTE(review): next_states, done and info are never assigned here, so
        # this snippet is illustrative only and would raise NameError as-is.
        return next_states, rewards, done, info

Performance Optimization

Vectorized Environments

import numpy as np

class VectorizedEnv:
    """Batch several environments behind a single reset/step/close interface.

    The wrapped environments are driven one after another in the calling
    thread; results are stacked into NumPy arrays.
    """

    def __init__(self, env_fns):
        # Each entry of env_fns is a zero-argument factory returning an env.
        self.envs = [make_env() for make_env in env_fns]
        self.n_envs = len(self.envs)

    def reset(self):
        """Reset every environment and return the stacked initial states."""
        initial_states = [env.reset() for env in self.envs]
        return np.array(initial_states)

    def step(self, actions):
        """Apply ``actions[i]`` to environment ``i``.

        Returns:
            ``(states, rewards, dones, infos)`` where the first three are
            NumPy arrays and ``infos`` is a tuple with one entry per env.
        """
        transitions = [
            env.step(action) for env, action in zip(self.envs, actions)
        ]
        # Transpose per-env 4-tuples into per-field columns.
        states, rewards, dones, infos = zip(*transitions)
        stacked = tuple(np.array(column) for column in (states, rewards, dones))
        return stacked[0], stacked[1], stacked[2], infos

    def close(self):
        """Close every wrapped environment."""
        for env in self.envs:
            env.close()

# Usage
# Eight zero-argument factories, one per environment instance.
# NOTE: GridWorld and agent are not defined or imported in this snippet.
env_fns = [lambda: GridWorld() for _ in range(8)]
vec_env = VectorizedEnv(env_fns)

# Reset all environments
states = vec_env.reset()  # Shape: (8, state_dim)

# Step all environments
# The agent is queried once per environment state, then all environments are
# stepped with the resulting list of actions.
actions = [agent.act(s) for s in states]
next_states, rewards, dones, infos = vec_env.step(actions)

Gradient Accumulation

# Accumulate gradients over multiple batches
# NOTE: optimizer, buffer, batch_size and compute_loss are supplied by the
# surrounding training code; they are not defined in this snippet.
accumulation_steps = 4
# Clear gradients once, before the accumulation loop starts.
optimizer.zero_grad()

for i in range(accumulation_steps):
    # Sample batch
    batch = buffer.sample(batch_size)
    
    # Compute loss
    # Dividing by accumulation_steps makes the summed gradients correspond to
    # the mean loss over all accumulated batches.
    loss = compute_loss(batch) / accumulation_steps
    
    # Accumulate gradients
    loss.backward()

# Update parameters
# A single optimizer step is taken after all batches have contributed.
optimizer.step()

Mixed Precision Training

from neurenix import autocast

# Use mixed precision for faster training
# NOTE: q_network, states, next_states, compute_targets and optimizer are
# supplied by the surrounding training code.
# Only the forward pass and loss computation run inside the autocast context.
with autocast():
    q_values = q_network(states)
    target_values = compute_targets(next_states)
    # Mean squared error between predicted and target values.
    loss = ((q_values - target_values) ** 2).mean()

# NOTE(review): optimizer.zero_grad() is not called in this snippet — confirm
# gradients are cleared elsewhere before each backward pass.
loss.backward()
optimizer.step()

Hyperparameter Tuning

Grid Search

import itertools

# Define hyperparameter grid
# 3 values per parameter -> 27 combinations, each trained for 100 episodes.
hyperparams = {
    "learning_rate": [0.0001, 0.001, 0.01],
    "gamma": [0.95, 0.99, 0.995],
    "buffer_size": [5000, 10000, 50000],
}

# Grid search
best_reward = -float("inf")
best_params = None

# The tuple unpacking below relies on the dict preserving insertion order
# (learning_rate, gamma, buffer_size).
# NOTE: DQN, np, obs_space, action_space and env must already be in scope.
for lr, gamma, buffer_size in itertools.product(*hyperparams.values()):
    # Create agent with hyperparameters
    agent = DQN(
        observation_space=obs_space,
        action_space=action_space,
        learning_rate=lr,
        gamma=gamma,
        buffer_size=buffer_size
    )
    
    # Train
    metrics = agent.train(env=env, episodes=100, verbose=False)
    # Score a configuration by the mean reward of its last 10 episodes.
    mean_reward = np.mean(metrics["episode_rewards"][-10:])
    
    # Track best
    # NOTE(review): the learning rate is stored under "lr" while DQN takes it
    # as learning_rate=, so best_params cannot be passed back as DQN(**best_params).
    if mean_reward > best_reward:
        best_reward = mean_reward
        best_params = {"lr": lr, "gamma": gamma, "buffer_size": buffer_size}

print(f"Best params: {best_params}")
print(f"Best reward: {best_reward:.2f}")

Random Search

import numpy as np

def random_search(n_trials=50):
    """Randomly sample DQN hyperparameters and keep the best configuration.

    Every trial trains a fresh agent for 100 episodes and is scored by the
    mean reward of its last 10 episodes. Learning rate and buffer size are
    sampled on a log scale; gamma and epsilon decay on a linear scale.

    Args:
        n_trials: Number of configurations to try.

    Returns:
        ``(best_params, best_reward)``; ``(None, -inf)`` if no trial ran or
        none improved on the initial score.
    """
    best_params, best_reward = None, float("-inf")

    for trial in range(n_trials):
        # Sample random hyperparameters
        candidate = {
            "learning_rate": 10 ** np.random.uniform(-5, -2),
            "gamma": np.random.uniform(0.9, 0.999),
            "buffer_size": int(10 ** np.random.uniform(3, 5)),
            "epsilon_decay": np.random.uniform(0.99, 0.999),
        }

        # Create and train agent
        agent = DQN(obs_space, action_space, **candidate)
        history = agent.train(env=env, episodes=100, verbose=False)
        score = np.mean(history["episode_rewards"][-10:])

        # Only a strict improvement replaces the current best.
        if not (score > best_reward):
            continue

        best_params, best_reward = candidate, score
        print(f"Trial {trial}: New best reward {score:.2f}")

    return best_params, best_reward

best_params, best_reward = random_search(n_trials=50)

Monitoring and Metrics

Training Metrics

# Train agent
# NOTE: agent, env and np must already be in scope; only matplotlib is
# imported in this snippet.
metrics = agent.train(env=env, episodes=1000, verbose=True)

# Plot episode rewards
import matplotlib.pyplot as plt

plt.figure(figsize=(10, 6))
plt.plot(metrics["episode_rewards"])
plt.xlabel("Episode")
plt.ylabel("Reward")
plt.title("Training Progress")
plt.show()

# Rolling average
# Convolving with a uniform kernel of length `window` yields the moving mean.
# mode="valid" keeps only fully-overlapping positions, so with 1000 episodes
# and window=100 the result has 901 points.
window = 100
rolling_mean = np.convolve(
    metrics["episode_rewards"],
    np.ones(window) / window,
    mode="valid"
)

plt.figure(figsize=(10, 6))
plt.plot(rolling_mean)
plt.xlabel("Episode")
plt.ylabel(f"Average Reward (last {window} episodes)")
plt.title("Training Progress (Smoothed)")
plt.show()
Source: neurenix/rl/agent.py:99

Evaluation Metrics

def evaluate_agent(agent, env, n_episodes=100, max_steps=None):
    """Evaluate agent performance over a number of episodes.

    Args:
        agent: Object exposing ``act(state)``.
        env: Environment exposing ``reset()`` and ``step(action)``, where
            ``step`` returns ``(state, reward, done, info)``.
        n_episodes: Number of evaluation episodes; must be at least 1.
        max_steps: Optional cap on the number of steps per episode. ``None``
            (the default) runs each episode until the environment reports
            ``done``, which never returns if the environment never terminates.

    Returns:
        Dict with ``mean_reward``, ``std_reward``, ``min_reward``,
        ``max_reward`` and ``mean_length``.

    Raises:
        ValueError: If ``n_episodes`` is less than 1 (there would be nothing
            to aggregate).
    """
    if n_episodes < 1:
        raise ValueError("n_episodes must be at least 1")

    rewards = []
    lengths = []

    for _ in range(n_episodes):
        state = env.reset()
        episode_reward = 0
        episode_length = 0
        done = False

        # The step cap guards against environments that never signal done.
        while not done and (max_steps is None or episode_length < max_steps):
            action = agent.act(state)
            state, reward, done, _ = env.step(action)
            episode_reward += reward
            episode_length += 1

        rewards.append(episode_reward)
        lengths.append(episode_length)

    return {
        "mean_reward": np.mean(rewards),
        "std_reward": np.std(rewards),
        "min_reward": np.min(rewards),
        "max_reward": np.max(rewards),
        "mean_length": np.mean(lengths),
    }

# Evaluate
# agent and env must already exist; the result is the summary dict returned
# by evaluate_agent.
results = evaluate_agent(agent, env, n_episodes=100)
print(f"Mean reward: {results['mean_reward']:.2f} ± {results['std_reward']:.2f}")
print(f"Range: [{results['min_reward']:.2f}, {results['max_reward']:.2f}]")
print(f"Mean length: {results['mean_length']:.1f}")

Saving and Loading

Save Trained Agent

# Save agent
agent.save("models/my_agent")

# This saves both policy and value function
# Files created:
# - models/my_agent_policy
# - models/my_agent_value
Source: neurenix/rl/agent.py:189

Load Trained Agent

# Create agent with same architecture
# NOTE: DQN, obs_space, action_space and env are not defined in this snippet.
agent = DQN(
    observation_space=obs_space,
    action_space=action_space
)

# Load trained weights
# Uses the same path that was passed to agent.save() in the section above.
agent.load("models/my_agent")

# Use for inference
state = env.reset()
action = agent.act(state)
Source: neurenix/rl/agent.py:204

Next Steps

Algorithms

Deep dive into RL algorithms

Overview

RL module overview