{
"cells": [
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"%matplotlib inline"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"\n# PointMaze D4RL dataset\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"In this tutorial you will learn how to re-create the Maze2D datasets from [D4RL](https://sites.google.com/view/d4rl/home) [1] with Minari.\nWe will be using the refactored version of the PointMaze environments in [Gymnasium-Robotics](https://robotics.farama.org/envs/maze/point_maze/) which support the Gymnasium API as well as the latest\nMuJoCo python bindings.\n\nLets start by breaking down the steps to generate these datasets:\n 1. First we need to create a planner that outputs a trajectory of waypoints that the agent can follow to reach the goal from its initial location in the maze. We will be using\n [Q-Value Iteration](https://towardsdatascience.com/fundamental-iterative-methods-of-reinforcement-learning-df8ff078652a) [2] to solve the discrete grid maze, same as in D4RL.\n 2. Then we also need to generate the actions so that the agent can follow the waypoints of the trajectory. For this purpose D4RL implements a PD controller.\n 3. Finally, to create the Minari dataset, we will wrap the environment with a :class:`minari.DataCollector` and step through it by generating actions with the path planner and waypoint controller.\n\nFor this tutorial we will be using the ``pointmaze-medium-v3`` environment to collect transition data. However, any map implementation in the PointMaze environment group can be used.\nAnother important factor to take into account is that the environment is continuing, which means that it won't be ``terminated`` when reaching a goal. Instead a new goal target will be randomly selected and the agent\nwill start from the location it's currently at (no ``env.reset()`` required).\n\nLets start by importing the required modules for this tutorial:\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"import gymnasium as gym\nimport numpy as np\n\nfrom minari import DataCollector, StepDataCallback"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## WayPoint Planner\nOur first task is to create a method that generates a trajectory to the goal in the maze.\nWe have the advantage that the MuJoCo maze can be discretized into a grid of cells, which reduces\nthe size of the state space. The action space for this solver will also be reduced to ``UP``, ``DOWN``, ``LEFT``,\nand ``RIGHT``. The solution trajectories will then be a set of waypoints that the agent has to follow to reach the\ngoal.\nWe can simply use a variation of Dynamic Programming to generate the trajectories. The method chosen in the D4RL[1] publication\nis that of Value Iteration, specifically Q-Value Iteration[2]. We will obtain the optimal Q-values by doing a series of Bellman\nupdates (``50`` in total) of the form:\n\n\\begin{align}Q'(s, a) \\leftarrow \\sum_{s'}T(s,a,s')[R(s,a,s') + \\gamma\\max_{a'}Q(s',a')]\\end{align}\n\n**T(s,a,s')** is the transition matrix which gives the probability of reaching state **s'** when taking action **a** from state **s**.\nWe consider the grid maze a deterministic space which means that if **s'** is an empty cell **T(s,a,s')** will have a value of ``1`` since\nwe know that the agent will always reach that state. On the other hand, if the state **s'** is a wall the value of **T(s,a,s')** will be ``0``.\n\nOnce we have the optimal Q-values (**Q***) we can generate a waypoint trajectory with the following policy:\n\n\\begin{align}\\pi(s) = arg\\max_{a}Q^{*}(s,a)\\end{align}\n\nThe class below, ``QIteration``, gives access to the method ``generate_path(current_cell, goal_cell)``. This method returns a dictionary of waypoints\nsuch as:\n\n.. code:: py\n\n {(5, 1): (4, 1), (4, 1): (4, 2), (4, 2): (3, 2), (3, 2): (2, 2), (2, 2): (2, 1), (2, 1): (1, 1)}\n\nThe keys of this dictionary are the current state of the agent and the values the next state of the wapoint path.\n\n"
]
},
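{
"cell_type": "markdown",
"metadata": {},
"source": [
"Before diving into the full ``QIteration`` class, the Bellman update above can be sketched on a toy problem. The cell below is only an illustration (it is not part of the original D4RL code): it solves a 1-D corridor of four cells with deterministic ``LEFT``/``RIGHT`` actions, walls at both ends, and the goal in the last cell, using the same ``50`` Bellman updates:\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"import numpy as np\n\n# Toy sketch of Q-Value Iteration on a 1-D corridor of 4 cells (illustrative only).\nnum_states, num_actions, gamma = 4, 2, 0.99\ntransition_matrix = np.zeros((num_states, num_actions, num_states))\nrew_matrix = np.zeros((num_states, num_actions))\nfor s in range(num_states):\n    for a, step in enumerate((-1, 1)):  # 0 = LEFT, 1 = RIGHT\n        ns = min(max(s + step, 0), num_states - 1)  # walls at both ends\n        transition_matrix[s, a, ns] = 1.0  # deterministic, so T(s,a,s') is 1\n        rew_matrix[s, a] = 1.0 if ns == 3 else 0.0  # reward only at the goal cell\n\nq_fn = np.zeros((num_states, num_actions))\nfor _ in range(50):  # Bellman updates, as in the formula above\n    q_fn = rew_matrix + gamma * transition_matrix.dot(q_fn.max(axis=1))\n\nprint(q_fn.argmax(axis=1))  # greedy policy points RIGHT towards the goal"
]
},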
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"UP = 0\nDOWN = 1\nLEFT = 2\nRIGHT = 3\n\nEXPLORATION_ACTIONS = {UP: (0, 1), DOWN: (0, -1), LEFT: (-1, 0), RIGHT: (1, 0)}\n\n\nclass QIteration:\n \"\"\"Solves for optimal policy with Q-Value Iteration.\n\n Inspired by https://github.com/Farama-Foundation/D4RL/blob/master/d4rl/pointmaze/q_iteration.py\n \"\"\"\n\n def __init__(self, maze):\n self.maze = maze\n self.num_states = maze.map_length * maze.map_width\n self.num_actions = len(EXPLORATION_ACTIONS.keys())\n self.rew_matrix = np.zeros((self.num_states, self.num_actions))\n self.compute_transition_matrix()\n\n def generate_path(self, current_cell, goal_cell):\n self.compute_reward_matrix(goal_cell)\n q_values = self.get_q_values()\n current_state = self.cell_to_state(current_cell)\n waypoints = {}\n while True:\n action_id = np.argmax(q_values[current_state])\n next_state, _ = self.get_next_state(\n current_state, EXPLORATION_ACTIONS[action_id]\n )\n current_cell = self.state_to_cell(current_state)\n waypoints[current_cell] = self.state_to_cell(next_state)\n if waypoints[current_cell] == goal_cell:\n break\n\n current_state = next_state\n\n return waypoints\n\n def reward_function(self, desired_cell, current_cell):\n if desired_cell == current_cell:\n return 1.0\n else:\n return 0.0\n\n def state_to_cell(self, state):\n i = int(state / self.maze.map_width)\n j = state % self.maze.map_width\n return (i, j)\n\n def cell_to_state(self, cell):\n return cell[0] * self.maze.map_width + cell[1]\n\n def get_q_values(self, num_itrs=50, discount=0.99):\n q_fn = np.zeros((self.num_states, self.num_actions))\n for _ in range(num_itrs):\n v_fn = np.max(q_fn, axis=1)\n q_fn = self.rew_matrix + discount * self.transition_matrix.dot(v_fn)\n return q_fn\n\n def compute_reward_matrix(self, goal_cell):\n for state in range(self.num_states):\n for action in range(self.num_actions):\n next_state, _ = self.get_next_state(state, EXPLORATION_ACTIONS[action])\n next_cell = self.state_to_cell(next_state)\n self.rew_matrix[state, 
action] = self.reward_function(\n goal_cell, next_cell\n )\n\n def compute_transition_matrix(self):\n \"\"\"Constructs this environment's transition matrix.\n Returns:\n A dS x dA x dS array where the entry transition_matrix[s, a, ns]\n corresponds to the probability of transitioning into state ns after taking\n action a from state s.\n \"\"\"\n self.transition_matrix = np.zeros(\n (self.num_states, self.num_actions, self.num_states)\n )\n for state in range(self.num_states):\n for action_idx, action in EXPLORATION_ACTIONS.items():\n next_state, valid = self.get_next_state(state, action)\n if valid:\n self.transition_matrix[state, action_idx, next_state] = 1\n\n def get_next_state(self, state, action):\n cell = self.state_to_cell(state)\n\n next_cell = tuple(map(lambda i, j: int(i + j), cell, action))\n next_state = self.cell_to_state(next_cell)\n\n return next_state, self._check_valid_cell(next_cell)\n\n def _check_valid_cell(self, cell):\n # Out of map bounds\n if cell[0] >= self.maze.map_length:\n return False\n elif cell[1] >= self.maze.map_width:\n return False\n # Wall collision\n elif self.maze.maze_map[cell[0]][cell[1]] == 1:\n return False\n else:\n return True"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Waypoint Controller\nThe step will be to create a controller to allow the agent to follow the waypoint trajectory.\nD4RL uses a PD controller to output continuous force actions from position and velocity.\nA PD controller is a variation of the PID controller often used in classical Control Theory.\nPID combines three components: a Proportial Term(P), Integral Term(I) and Derivative Term (D)\n\n### 1. Proportional Term (P)\nThe proportional term in a PID controller adjusts the control action based on the current error, which\nis the difference between the desired value (setpoint) and the current value of the process variable.\nThe control action is directly proportional to the error. A higher error results in a stronger control action.\nHowever, the proportional term alone can lead to overshooting or instability. Note $\\tau$ is our control value.\n\n.. math ::\n \\tau = k_{p}(\\text{Error})\n\n### 2. Derivative Term (D)\nThe derivative term in a PD controller considers the rate of change of the error over time.\nIt helps to predict the future behavior of the error. By dampening the control action based\non the rate of change of the error, the derivative term contributes to system stability and reduces overshooting.\nIt also helps the system respond quickly to changes in the error.\n\n.. math ::\n \\tau = k_{d}(d(\\text{Error}) / dt)\n\nSo for a PD controller we have the equation below. We explain what the values $k_{d}$ and $k_{p}$ mean in a bit\n\n.. math ::\n \\tau = k_{p}(\\text{Error}) + k_{d}(d(\\text{Error}) / dt)\n\n### 3. Integral Term (I)\nThe integral term in a PID controller integrates the cumulative error over time.\nIt helps to address steady-state errors or biases that may exist in the system.\nThe integral term continuously adjusts the control action based on the accumulated error,\naiming to eliminate any long-term deviations between the desired setpoint and the actual process variable.\n\n.. 
math ::\n \\tau = k_{I}{\\int}_0^t(\\text{Error}) dt\n\nFinally for a PID controller we have the equation below\n\n.. math ::\n \\tau = k_{p}(\\text{Error}) + k_{d}(d(\\text{Error}) / dt) + k_{I}\\int_{0}^{t}(\\text{Error}) dt\n\nIn the PID controller formula, $K_p$, $K_i$, and $K_d$ are the respective gains for the proportional, integral, and derivative terms.\nThese gains determine the influence of each term on the control action.\nThe optimal values for these gains are typically determined through tuning, which involves adjusting\nthe gains to achieve the desired control performance.\n\nNow back to our controller as stated previously, for the D4RL task we use a PD controller and we\nfollow the same theme as what we have stated before as can be seen below. The $Error$ is equlivalent\nto the difference between the $\\text{goal}_\\text{pose}$ and $\\text{agent}_\\text{pose}$ and we replace the derivative term $(d(\\text{Error}) / dt)$ with\nthe velocity of the the agent $v_{\\text{agent}}$, we can think of this as a measure of the speed at which the agent\nis approaching the target position. When the agent is moving quickly towards the target, the\nderivative term will be larger, contributing to a stronger corrective action from the controller.\nOn the other hand, if the agent is already close to the target and moving slowly, the derivative term will be smaller,\nresulting in a less aggressive control action.\n\n.. math ::\n \\tau = k_{p}(p_{\\text{goal}} - p_{\\text{agent}}) + k_{d}v_{\\text{agent}}\n\nEach target position in the waypoint trajectory is converted from discrete to a continuous value and we also add some noise to\nthe $x$ and $y$ coordinates to add more variance in the trajectories generated for the offline dataset.\n\n"
]
},
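{
"cell_type": "markdown",
"metadata": {},
"source": [
"As a quick sanity check of the PD law above (purely illustrative, not part of the D4RL controller), the cell below drives a unit-mass 1-D point towards a goal with the same gains the waypoint controller uses, $k_{p}=10$ and $k_{d}=-1$, integrating with a simple Euler scheme:\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"import numpy as np\n\n# Illustrative 1-D point mass under tau = k_p * (p_goal - p) + k_d * v.\n# k_d is negative, so the velocity term damps the motion (unit mass assumed).\nk_p, k_d = 10.0, -1.0\ngoal, pos, vel, dt = 1.0, 0.0, 0.0, 0.01\nfor _ in range(2000):\n    force = np.clip(k_p * (goal - pos) + k_d * vel, -1, 1)  # clip like the env actions\n    vel += force * dt  # unit mass: acceleration equals force\n    pos += vel * dt\nprint(round(pos, 3))  # the mass settles at the goal"
]
},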
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"class WaypointController:\n \"\"\"Agent controller to follow waypoints in the maze.\n\n Inspired by https://github.com/Farama-Foundation/D4RL/blob/master/d4rl/pointmaze/waypoint_controller.py\n \"\"\"\n\n def __init__(self, maze, gains={\"p\": 10.0, \"d\": -1.0}, waypoint_threshold=0.1):\n self.global_target_xy = np.empty(2)\n self.maze = maze\n\n self.maze_solver = QIteration(maze=self.maze)\n\n self.gains = gains\n self.waypoint_threshold = waypoint_threshold\n self.waypoint_targets = None\n\n def compute_action(self, obs):\n # Check if we need to generate new waypoint path due to change in global target\n if (\n np.linalg.norm(self.global_target_xy - obs[\"desired_goal\"]) > 1e-3\n or self.waypoint_targets is None\n ):\n # Convert xy to cell id\n achieved_goal_cell = tuple(\n self.maze.cell_xy_to_rowcol(obs[\"achieved_goal\"])\n )\n self.global_target_id = tuple(\n self.maze.cell_xy_to_rowcol(obs[\"desired_goal\"])\n )\n self.global_target_xy = obs[\"desired_goal\"]\n\n self.waypoint_targets = self.maze_solver.generate_path(\n achieved_goal_cell, self.global_target_id\n )\n\n # Check if the waypoint dictionary is empty\n # If empty then the ball is already in the target cell location\n if self.waypoint_targets:\n self.current_control_target_id = self.waypoint_targets[\n achieved_goal_cell\n ]\n self.current_control_target_xy = self.maze.cell_rowcol_to_xy(\n np.array(self.current_control_target_id)\n )\n else:\n self.waypoint_targets[\n self.current_control_target_id\n ] = self.current_control_target_id\n self.current_control_target_id = self.global_target_id\n self.current_control_target_xy = self.global_target_xy\n\n # Check if we need to go to the next waypoint\n dist = np.linalg.norm(self.current_control_target_xy - obs[\"achieved_goal\"])\n if (\n dist <= self.waypoint_threshold\n and self.current_control_target_id != self.global_target_id\n ):\n self.current_control_target_id = self.waypoint_targets[\n self.current_control_target_id\n ]\n # If target is 
global goal go directly to goal position\n if self.current_control_target_id == self.global_target_id:\n self.current_control_target_xy = self.global_target_xy\n else:\n self.current_control_target_xy = (\n self.maze.cell_rowcol_to_xy(\n np.array(self.current_control_target_id)\n )\n - np.random.uniform(size=(2,)) * 0.2\n )\n\n action = (\n self.gains[\"p\"] * (self.current_control_target_xy - obs[\"achieved_goal\"])\n + self.gains[\"d\"] * obs[\"observation\"][2:]\n )\n action = np.clip(action, -1, 1)\n\n return action"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Modified StepDataCallback\n\nWe will also need to create our own custom callback function to record the data after each step.\nAs previously mentioned, the task is continuing and the environment won't be ``terminated`` or ``truncated`` when reaching a goal.\nInstead a new target will be randomly selected in the map and the agent will continue stepping to this new goal. For this reason, to divide the dataset into episodes,\nwe will have to truncate the dataset ourselves when a new goal is reached. This can be done by overriding the ``'truncations'`` key in the step data return when the\nagent returns ``success=True`` in the ``'infos'`` item.\n\nIn the :class:`minari.StepDataCallback` we can add new keys to infos that we would also want to save in our Minari dataset. For example in this\ncase we will be generating new hdf5 datasets ``qpos``, ``qvel``, and ``goal`` in the ``infos`` subgroup of each episode group.\n\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"class PointMazeStepDataCallback(StepDataCallback):\n \"\"\"Add environment state information to 'infos'.\n\n Also, since the environment generates a new target every time it reaches a goal, the environment is\n never terminated or truncated. This callback overrides the truncation value to True when the step\n returns a True 'succes' key in 'infos'. This way we can divide the Minari dataset into different trajectories.\n \"\"\"\n\n def __call__(\n self, env, obs, info, action=None, rew=None, terminated=None, truncated=None\n ):\n qpos = obs[\"observation\"][:2]\n qvel = obs[\"observation\"][2:]\n goal = obs[\"desired_goal\"]\n\n step_data = super().__call__(env, obs, info, action, rew, terminated, truncated)\n\n if step_data[\"infos\"][\"success\"]:\n step_data[\"truncations\"] = True\n step_data[\"infos\"][\"qpos\"] = qpos\n step_data[\"infos\"][\"qvel\"] = qvel\n step_data[\"infos\"][\"goal\"] = goal\n\n return step_data"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Collect Data and Create Minari Dataset\nNow we will finally perform our data collection and create the Minari dataset. This is as simple as wrapping the environment with\nthe :class:`minari.DataCollector` wrapper and add the custom callback methods. Once we've done this we can step the environment with the ``WayPointController``\nas our policy. For the tutorial, we collect 10,000 transitions. Thus, we initialize the environment with ``max_episode_steps=10,000`` since that's the total amount of steps we want to\ncollect for our dataset and we don't want the environment to get ``truncated`` during the data collection due to a time limit.\n\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"dataset_name = \"pointmaze-umaze-v0\"\ntotal_steps = 10_000\n\n# continuing task => the episode doesn't terminate or truncate when reaching a goal\n# it will generate a new target. For this reason we set the maximum episode steps to\n# the desired size of our Minari dataset (evade truncation due to time limit)\nenv = gym.make(\"PointMaze_Medium-v3\", continuing_task=True, max_episode_steps=total_steps)\n\n# Data collector wrapper to save temporary data while stepping. Characteristics:\n# * Custom StepDataCallback to add extra state information to 'infos' and divide dataset in different episodes by overridng\n# truncation value to True when target is reached\n# * Record the 'info' value of every step\ncollector_env = DataCollector(\n env, step_data_callback=PointMazeStepDataCallback, record_infos=True\n)\n\nobs, _ = collector_env.reset(seed=123)\n\nwaypoint_controller = WaypointController(maze=env.maze)\n\nfor n_step in range(int(total_steps)):\n action = waypoint_controller.compute_action(obs)\n # Add some noise to each step action\n action += np.random.randn(*action.shape) * 0.5\n action = np.clip(\n action, env.action_space.low, env.action_space.high, dtype=np.float32\n )\n\n obs, rew, terminated, truncated, info = collector_env.step(action)\n\ndataset = collector_env.create_dataset(\n dataset_id=dataset_name,\n algorithm_name=\"QIteration\",\n code_permalink=\"https://github.com/Farama-Foundation/Minari/blob/main/docs/tutorials/dataset_creation/point_maze_dataset.py\",\n author=\"Rodrigo Perez-Vicente\",\n author_email=\"rperezvicente@farama.org\",\n)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## References\n\n[1] Fu, Justin, et al. \u2018D4RL: Datasets for Deep Data-Driven Reinforcement Learning\u2019.\nCoRR, vol. abs/2004.07219, 2020, https://arxiv.org/abs/2004.07219..\n\n[2] Lambert, Nathan. \u2018Fundamental Iterative Methods of Reinforcement Learnin\u2019.\nApr 8, 2020, https://towardsdatascience.com/fundamental-iterative-methods-of-reinforcement-learning-df8ff078652a\n\n"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.9.18"
}
},
"nbformat": 4,
"nbformat_minor": 0
}