Reinforcement Learning with PyTorch

Posted on Tue 19 March 2024 in Python • 3 min read

In our final exploration into machine learning with PyTorch, we're going to do something critical for lifeforms in our world: learn to walk!

This post took many trials and errors, a form of reinforcement learning I completed unsupervised as a human. The resulting code below is what ended up working on an M1/M2 MacBook Pro. As many other researchers have implemented far better training algorithms than I could develop on my own, we'll make use of the work from OpenAI, MuJoCo (Multi-Joint dynamics with Contact) and Stable Baselines3. If you're interested in how such an agent might be implemented by hand, there's a separate notebook in this blog's repository that uses PyTorch to implement a Deep Q-Learning agent to teach our model to walk.

Walking agent

As this work is very dependent on the environment setup, it was done using miniconda3, creating an environment with Python 3.11.3 and installing the following dependencies.

In [ ]:
# https://github.com/DLR-RM/stable-baselines3/pull/780
!pip install gymnasium
!pip install 'gymnasium[mujoco]'
!pip install matplotlib
!pip3 install torch torchvision torchaudio
!pip install "sb3_contrib>=2.0.0a1" --upgrade
!pip install moviepy

Next we import the necessary libraries. We'll use Stable Baselines3's implementation of the Proximal Policy Optimization (PPO) algorithm: given the reward returned by the environment at each step, the agent optimizes its choices (how to move its limbs) to receive the highest total reward. The reward function in MuJoCo's Ant environment combines several terms, reward = healthy_reward + forward_reward - ctrl_cost. The healthy reward is earned while the model's 'torso' isn't touching the ground, the forward reward measures how far the model has moved forward, and the control cost lessens the reward when the model tries to 'overwork' its joints (a quick check of these terms follows the imports).

In [ ]:
from stable_baselines3.common.monitor import Monitor
from stable_baselines3.common.results_plotter import load_results, ts2xy
from stable_baselines3.common.callbacks import BaseCallback
import gymnasium as gym
from stable_baselines3 import PPO
import os
import numpy as np
import time
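
Before building the training environment, it can help to see those reward terms concretely. The cell below isn't part of the original pipeline; it's a small sanity check that creates a throwaway Ant-v4 environment, takes a single random step, and prints the reward components reported in the step's info dictionary (the exact key names may differ between Gymnasium versions).

In [ ]:
# Optional sanity check of the reward terms described above: take one random
# step and print whatever reward components the environment reports.
# Note: the exact info keys (e.g. "reward_survive", "reward_forward",
# "reward_ctrl") can vary between Gymnasium versions.
check_env = gym.make("Ant-v4")
obs, info = check_env.reset(seed=0)
action = check_env.action_space.sample()
obs, reward, terminated, truncated, info = check_env.step(action)
print(f"total reward: {reward:.3f}")
print({k: v for k, v in info.items() if "reward" in k})
check_env.close()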

Here we create the environment we wish to train the model in, making use of the prebuilt 'Ant-v4' environment.

In [ ]:
log_dir = "./tmp/gym/"
timestr = time.strftime("%Y%m%d-%H%M%S")
os.makedirs(log_dir, exist_ok=True)
env_name = "Ant-v4"
env = gym.make(env_name, render_mode="human")
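
As a quick orientation (not something the original notebook prints), we can inspect what the policy network will consume and produce: a continuous observation vector of joint positions and velocities, and an 8-dimensional vector of joint torques. The exact observation size depends on the environment version and its options, so we print it rather than assume it.

In [ ]:
# Inspect the spaces the policy will work with. For Ant the action space is a
# Box of 8 joint torques; the observation size depends on the env version.
print("observation space:", env.observation_space)
print("action space:", env.action_space)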

To check on the progress of our model, we will wrap the environment in a Monitor that logs episode rewards, and create a callback that tracks the best mean reward through training and saves the network weights and biases to be used later on (we'll plot these logs once training is done).

In [ ]:
class SaveOnBestTrainingRewardCallback(BaseCallback):
    """
    Callback for saving a model (the check is done every ``check_freq`` steps)
    based on the training reward (in practice, we recommend using ``EvalCallback``).

    :param env_name: (str) Environment name, used in the save-file name.
    :param check_freq: (int) How often (in steps) to check the training reward.
    :param log_dir: (str) Path to the folder where the model will be saved.
      It must contain the file created by the ``Monitor`` wrapper.
    :param verbose: (int)
    """

    def __init__(self, env_name: str, check_freq: int, log_dir: str, verbose=1):
        super().__init__(verbose)
        self.check_freq = check_freq
        self.log_dir = log_dir
        self.save_path = os.path.join(log_dir, f"{timestr}_{env_name}")
        self.best_mean_reward = -np.inf

    def _init_callback(self) -> None:
        # Create folder if needed
        if self.save_path is not None:
            os.makedirs(self.save_path, exist_ok=True)

    def _on_step(self) -> bool:
        if self.n_calls % self.check_freq == 0:
            # Retrieve training reward
            x, y = ts2xy(load_results(self.log_dir), "timesteps")
            if len(x) > 0:
                # Mean training reward over the last 100 episodes
                mean_reward = np.mean(y[-100:])
                if self.verbose > 0:
                    print(f"Num timesteps: {self.num_timesteps}")
                    print(
                        f"Best mean reward: {self.best_mean_reward:.2f} - Last mean reward per episode: {mean_reward:.2f}"
                    )

                # New best model, you could save the agent here
                if mean_reward > self.best_mean_reward:
                    self.best_mean_reward = mean_reward
                    # Example for saving best model
                    if self.verbose > 0:
                        print(f"Saving new best model to {self.save_path}.zip")
                    self.model.save(self.save_path)

        return True


env = Monitor(env, log_dir)

This is arguably the most important step, where we lean on other researchers' work to find optimized hyperparameters. Hyperparameters control how the agent is built and updated by the PPO algorithm. The values below come from the RL Baselines3 Zoo and were likely found with a tool such as Optuna, which repeatedly trains candidate models and evaluates how well each hyperparameter combination performs.
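
For completeness, a tuning loop of that kind might look roughly like the sketch below. This is not the code that produced the values used here (those come from the RL Baselines3 Zoo linked in the next cell); it's a minimal illustration that assumes optuna is installed separately, samples a few PPO hyperparameters over illustrative ranges, gives each trial a short training budget, and scores it with evaluate_policy.

In [ ]:
import optuna
from stable_baselines3.common.evaluation import evaluate_policy


def objective(trial):
    # Sample a few PPO hyperparameters; the ranges here are illustrative only.
    params = {
        "learning_rate": trial.suggest_float("learning_rate", 1e-5, 1e-3, log=True),
        "gamma": trial.suggest_float("gamma", 0.95, 0.999),
        "clip_range": trial.suggest_categorical("clip_range", [0.1, 0.2, 0.3]),
    }
    trial_env = Monitor(gym.make(env_name))
    trial_model = PPO("MlpPolicy", trial_env, verbose=0, **params)
    trial_model.learn(total_timesteps=50_000)  # short budget per trial
    mean_reward, _ = evaluate_policy(trial_model, trial_env, n_eval_episodes=5)
    trial_env.close()
    return mean_reward


study = optuna.create_study(direction="maximize")
study.optimize(objective, n_trials=20)
print(study.best_params)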

Training the model below took ~2 hours and resulted in the video at the top of this blog post.

In [ ]:
# Create the callback: check every 1000 steps
callback = SaveOnBestTrainingRewardCallback(
    env_name=env_name, check_freq=1000, log_dir=log_dir
)

# https://github.com/DLR-RM/rl-baselines3-zoo/blob/master/hyperparams/ppo.yml
model = PPO(
    "MlpPolicy",
    env,
    batch_size=32,
    n_steps=512,
    gamma=0.98,
    learning_rate=1.90609e-05,
    ent_coef=4.9646e-07,
    clip_range=0.1,
    n_epochs=10,
    gae_lambda=0.8,
    max_grad_norm=0.6,
    vf_coef=0.677239,
)
# Train for 10 million timesteps; the callback saves the best model along the way.
model.learn(total_timesteps=10_000_000, callback=callback)
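
With the Monitor logs written to the log directory, we can also plot how the episode reward progressed over training. This plotting cell isn't in the original notebook; it's a small sketch that reuses the already-imported load_results and ts2xy helpers and smooths the noisy per-episode rewards with a moving average (the window size of 50 is arbitrary).

In [ ]:
import matplotlib.pyplot as plt

# Load the episode statistics written by the Monitor wrapper and plot a
# smoothed episode reward against the number of timesteps seen so far.
x, y = ts2xy(load_results(log_dir), "timesteps")

window = min(50, max(1, len(y)))  # arbitrary smoothing window
y_smooth = np.convolve(y, np.ones(window) / window, mode="valid")

plt.plot(x[window - 1:], y_smooth)
plt.xlabel("Timesteps")
plt.ylabel("Episode reward (moving average)")
plt.title(f"PPO on {env_name}")
plt.show()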

Finally, we load the saved model and visualise it, creating the video you see at the top of this blog post.

In [ ]:
# Re-create the environment with an offscreen renderer and wrap it so the
# rollout below is recorded as a video in the log directory.
env = gym.wrappers.RecordVideo(gym.make(env_name, render_mode="rgb_array"), log_dir)
# Load the best checkpoint saved by the callback during training.
model = PPO.load("./tmp/gym/20230417-224635_Ant-v4.zip", env=env)

vec_env = model.get_env()
obs = vec_env.reset()

# Roll the trained policy out for 1000 steps; the RecordVideo wrapper
# captures the rendered frames.
for i in range(1000):
    action, _state = model.predict(obs, deterministic=True)
    obs, reward, done, info = vec_env.step(action)
    vec_env.render()

# Close the environment so the video recorder finalises the file.
vec_env.close()