from neurenix.agent import Environment
import numpy as np
class GridWorld(Environment):
    """Rectangular grid world in which agents walk toward a fixed goal cell.

    Positions are ``[x, y]`` lists. The goal is the corner cell
    ``[width - 1, height - 1]`` and an agent that has not made a successful
    move yet is treated as standing on ``[0, 0]``. Action 0 ("up") decreases
    ``y``; action 1 ("down") increases it.

    Rewards handed out by :meth:`step`:

    * ``100.0`` for entering the goal cell,
    * ``-10.0`` for entering an obstacle cell (the move still happens),
    * ``-1.0`` for any other in-bounds move,
    * ``-5.0`` for an out-of-bounds move (the agent stays where it was).

    NOTE(review): ``self._state`` is not assigned in this class; it is
    presumably provided by ``Environment`` (likely filled by ``reset()`` from
    ``_get_initial_state()``) -- confirm against the base class.
    """

    # Cell an agent is assumed to occupy until it has a recorded position.
    _START_POSITION = (0, 0)

    # (dx, dy) offsets indexed by action id: 0=up, 1=down, 2=left, 3=right.
    _MOVES = ((0, -1), (0, 1), (-1, 0), (1, 0))

    # Reward values; the goal reward is also what ends the episode.
    _GOAL_REWARD = 100.0
    _OBSTACLE_REWARD = -10.0
    _STEP_REWARD = -1.0
    _OUT_OF_BOUNDS_REWARD = -5.0

    # Cell codes written into the local observation grid (0 means empty).
    _CELL_OUT_OF_BOUNDS = -1
    _CELL_OBSTACLE = 1
    _CELL_GOAL = 2

    def __init__(self, width=10, height=10, num_obstacles=5,
                 max_steps=100, view_range=3):
        """Store the grid configuration.

        Args:
            width: Number of cells along the x axis.
            height: Number of cells along the y axis.
            num_obstacles: Number of obstacle cells requested per episode.
            max_steps: Number of :meth:`step` calls after which the episode
                is reported as done even if nobody reached the goal.
            view_range: Half-width of the square window returned as
                ``"local_grid"`` by :meth:`observe`.
        """
        super().__init__()
        self.width = width
        self.height = height
        self.num_obstacles = num_obstacles
        self.max_steps = max_steps
        self.view_range = view_range
        self.goal_position = None
        self.obstacles = []

    def _get_initial_state(self):
        """Place the goal and obstacles and return a fresh state dict.

        Obstacles are sampled without replacement from the cells that are
        neither the start cell nor the goal cell, so they never overlap each
        other, hide the goal, or sit under a freshly spawned agent. If more
        obstacles are requested than such cells exist, every free cell
        becomes an obstacle.

        Returns:
            dict with keys ``"goal"``, ``"obstacles"``, ``"agent_positions"``
            (initially empty) and ``"step_count"`` (initially 0).
        """
        self.goal_position = [self.width - 1, self.height - 1]

        reserved = {self._START_POSITION, tuple(self.goal_position)}
        free_cells = [
            [x, y]
            for x in range(self.width)
            for y in range(self.height)
            if (x, y) not in reserved
        ]
        count = min(max(self.num_obstacles, 0), len(free_cells))
        if count:
            picks = np.random.choice(len(free_cells), size=count, replace=False)
            self.obstacles = [free_cells[int(index)] for index in picks]
        else:
            self.obstacles = []

        return {
            "goal": self.goal_position,
            "obstacles": self.obstacles,
            "agent_positions": {},
            "step_count": 0,
        }

    def step(self, actions):
        """Apply one action per agent and report rewards.

        Args:
            actions: Mapping of agent id to an action id in ``0..3``
                (0=up, 1=down, 2=left, 3=right).

        Returns:
            dict with ``"rewards"`` (agent id -> float), ``"done"`` (True once
            any agent entered the goal in this call or ``max_steps`` calls
            have been made) and ``"info"`` (contains ``"step_count"``).

        Raises:
            IndexError: If any action id is outside ``0..3``. All actions are
                checked before any agent is moved, so a bad action leaves the
                state untouched.
        """
        positions = self._state["agent_positions"]

        # Resolve every target first so an invalid action cannot leave the
        # environment with only some of the agents moved.
        targets = {}
        for agent_id, action in actions.items():
            current_pos = positions.get(agent_id, list(self._START_POSITION))
            targets[agent_id] = self._compute_new_position(current_pos, action)

        rewards = {}
        goal_reached = False
        for agent_id, new_pos in targets.items():
            if not self._is_valid_position(new_pos):
                # Out of bounds: penalize and keep the agent where it was.
                rewards[agent_id] = self._OUT_OF_BOUNDS_REWARD
                continue

            positions[agent_id] = new_pos
            if new_pos == self.goal_position:
                rewards[agent_id] = self._GOAL_REWARD
                goal_reached = True
            elif new_pos in self.obstacles:
                rewards[agent_id] = self._OBSTACLE_REWARD
            else:
                rewards[agent_id] = self._STEP_REWARD

        # One environment step per call, regardless of how many agents acted.
        self._state["step_count"] += 1

        done = goal_reached or self._state["step_count"] >= self.max_steps
        return {
            "rewards": rewards,
            "done": done,
            "info": {"step_count": self._state["step_count"]},
        }

    def observe(self, agent):
        """Return the observation for ``agent``.

        Args:
            agent: Object whose ``id`` attribute keys ``agent_positions``.

        Returns:
            dict with ``"position"``, ``"goal_direction"`` (goal minus
            position), ``"local_grid"`` (window of ``view_range`` cells in
            each direction) and ``"distance_to_goal"`` (Manhattan distance).
        """
        agent_pos = self._state["agent_positions"].get(
            agent.id, list(self._START_POSITION)
        )
        return {
            "position": agent_pos,
            "goal_direction": self._compute_direction(agent_pos, self.goal_position),
            "local_grid": self._get_local_grid(agent_pos, self.view_range),
            "distance_to_goal": self._compute_distance(agent_pos, self.goal_position),
        }

    def _compute_new_position(self, pos, action):
        """Return the cell reached from ``pos`` by ``action`` (bounds unchecked).

        Raises:
            IndexError: If ``action`` is not in ``0..3``. Negative ids are
                rejected explicitly; plain sequence indexing would otherwise
                wrap them around to a valid move.
        """
        if not 0 <= action < len(self._MOVES):
            raise IndexError(
                f"invalid action {action!r}; expected an integer in "
                f"0..{len(self._MOVES) - 1}"
            )
        dx, dy = self._MOVES[action]
        return [pos[0] + dx, pos[1] + dy]

    def _is_valid_position(self, pos):
        """Return True if ``pos`` lies inside the ``width`` x ``height`` grid."""
        return 0 <= pos[0] < self.width and 0 <= pos[1] < self.height

    def _get_local_grid(self, center, range_size):
        """Return the square window of cells around ``center``.

        The result is a nested list of floats with ``2 * range_size + 1``
        entries per side; the first index is the x offset and the second the
        y offset, both shifted by ``range_size``. Cell codes: ``-1`` out of
        bounds, ``1`` obstacle, ``2`` goal, ``0`` otherwise. The goal takes
        precedence over an obstacle on the same cell, matching the reward
        order used by :meth:`step`.
        """
        side = range_size * 2 + 1
        grid = np.zeros((side, side))

        # Hash the obstacles once instead of scanning the list per cell.
        obstacle_cells = {tuple(cell) for cell in self.obstacles}
        goal_cell = (
            tuple(self.goal_position) if self.goal_position is not None else None
        )

        for dx in range(-range_size, range_size + 1):
            for dy in range(-range_size, range_size + 1):
                cell = (center[0] + dx, center[1] + dy)
                if not self._is_valid_position(cell):
                    code = self._CELL_OUT_OF_BOUNDS
                elif cell == goal_cell:
                    code = self._CELL_GOAL
                elif cell in obstacle_cells:
                    code = self._CELL_OBSTACLE
                else:
                    continue  # Empty cells keep the zero fill.
                grid[dx + range_size, dy + range_size] = code
        return grid.tolist()

    def _compute_direction(self, from_pos, to_pos):
        """Return the ``[dx, dy]`` offset leading from ``from_pos`` to ``to_pos``."""
        return [to_pos[0] - from_pos[0], to_pos[1] - from_pos[1]]

    def _compute_distance(self, pos1, pos2):
        """Return the Manhattan distance between ``pos1`` and ``pos2``."""
        return abs(pos1[0] - pos2[0]) + abs(pos1[1] - pos2[1])
# Example usage: build a 10x10 grid world, reset it, and print what reset() returns.
env = GridWorld(height=10, width=10)
state = env.reset()
print(f"Initial state: {state}")