強化学習の実装例とその解説
強化学習の基礎シリーズで、基礎的な考え方と、それを実装するとどんな感じになるのか、というところを説明しました。 ただ、流れの理解の妨げにならないよう、長くなったり冗長になったりしそうな実装は省いて説明しました。
そこで、この記事では、強化学習の基礎シリーズで説明したことに基づいた解説をできるだけ挟みつつ、コピペしてそのまま動く「完成形」の実装例を示したいと思います。 従って、以下の4記事を読んでいることを前提として説明します。
今回作るゲーム
今回は、8パズルを作ろうと思います。 プログラミングの授業でこれを解くプログラムを作ったりしますね。 今回は、これを強化学習で解くプログラムを作ります。 ルールについては、Wikipediaの15パズルの記事とかを参照してください。
もちろん、本当は強化学習など使う必要もない問題ですが、例ということで許してください。
また、今回作ったプログラムは、Githubに置いてあります。
環境
まずは環境を作ります。 環境とはゲームそのものであるので、すなわち8パズルのゲームを実装するということになります。
以下で、実装の核となる部分を軽く説明してから、実装例を紹介します。 実装に伴う細かい部分や補足情報については、実装例の後に書きます。
観測
今回は、エージェントは環境(ゲーム)の全ての情報を得ることができます。 完全情報ゲームというやつですね。 将棋なんかもこれに当てはまり、割とよくあります。
というわけで、観測で得られる情報は盤面全体の情報になります。 これを、要素数9のリストで表します。 図の場合、
[2, 5, 0, 8, 4, 3, 6, 1, 7]
とします。
従って、observation_space
は
gym.spaces.Box(0, 8, (9, ), np.uint8)
となります。
ただし、紹介した実装では一般化しやすいよう、正方形の1辺のマスの数としてN
を使った実装としています。
2 | 5 | |
8 | 4 | 3 |
6 | 1 | 7 |
行動
行動は、空きマスを上下左右に動かすと考えます。
すなわち、4種類の行動が取れるものとします。
従って、action_space
は
gym.spaces.Discrete(4)
となります。
報酬
報酬は、
- パズルが完成したら1の報酬を得る
- 不可能な操作(空きマスが右下にあるのに、さらに右に動かそうとする場合など)をしようとした場合、-1の報酬を得る
- その他の場合の報酬は0である
とします。
従って、reward_range
は
(-1, 1)
となります。
終了条件
done
は、パズルが完成したときと、不可能な操作をした場合にのみTrue
になるものとします。
ディレクトリ構成
gym-puzzle/
README.md
setup.py
gym_puzzle/
__init__.py
envs/
__init__.py
puzzle_env.py
各ファイルの実装
setup.py
from setuptools import setup setup(name='gym_puzzle', version='1.0.0', install_requires=['gym', 'numpy'] )
gym_puzzle/__init__.py
from gym.envs.registration import register register(id='puzzle-v0', entry_point='gym_puzzle.envs:PuzzleEnv' )
gym_puzzle/envs/__init__.py
from gym_puzzle.envs.puzzle_env import PuzzleEnv
gym_puzzle/envs/puzzle_env.py
import gym import numpy as np from gym import spaces class PuzzleEnv(gym.Env): N = 3 # パズルの1辺の長さ metadata = {'render.modes': ['human', 'ansi']} observation_space = spaces.Box(0, N**2 - 1, (N**2, ), np.uint8) action_space = spaces.Discrete(4) reward_range = (-1, 1) goal_field = np.arange(N**2) # 盤面の完成形 def __init__(self): self.field = None self.empty_idx = None self.already_done = None self.rng = None self.seed() def step(self, action): if self.already_done: return self.field.copy(), 0, True, {} if self._cannot_act(action): return self.field.copy(), -1, True, {} if action == 0: # 空きマスを上に self._swap(self.empty_idx, self.empty_idx - self.N) elif action == 1: # 空きマスを右に self._swap(self.empty_idx, self.empty_idx + 1) elif action == 2: # 空きマスを下に self._swap(self.empty_idx, self.empty_idx + self.N) elif action == 3: # 空きマスを左に self._swap(self.empty_idx, self.empty_idx - 1) else: raise ValueError('未対応のaction') if self._clear(): reward = 1 done = True else: reward = 0 done = False return self.field.copy(), reward, done, {} def reset(self): # fieldをランダムに初期化(解が存在する盤面に必ずなる) self.field = self.goal_field.copy() self.rng.shuffle(self.field) self._set_empty_idx() if not self._solvable(): self._fliplr() self.already_done = False return self.reset() if self._clear() else self.field.copy() # 最初から完成形にならないようにしている def render(self, mode='human'): if mode == 'human': for i in range(self.N): print(self.field[self.N * i : self.N * (i+1)]) elif mode == 'ansi': return ','.join(map(str, self.field)) else: raise ValueError('未対応のmode') def seed(self, seed=None): self.rng, seed = gym.utils.seeding.np_random(seed) return [seed] def _solvable(self): nonzero_field = self.field[self.field.nonzero()] tmp = 0 for i in range(self.N**2 - 1): tmp += len(np.where(nonzero_field[i + 1:] < nonzero_field[i])) tmp += self.empty_idx // self.N return tmp % 2 == 1 def _fliplr(self): # 盤面を左右反転する for i in range(self.N): self.field[self.N * i : self.N * (i+1)] = np.flip(self.field[self.N * i : self.N * (i+1)]).copy() self._set_empty_idx() def _swap(self, idx1, idx2): self.field[idx1], self.field[idx2] = self.field[idx2], self.field[idx1] def _cannot_act(self, action): if action == 0: # 空きマスを上に impossible_idxes = [0, 1, 2] elif action == 1: # 空きマスを右に impossible_idxes = [2, 5, 8] elif action == 2: # 空きマスを下に impossible_idxes = [6, 7, 8] else: # 空きマスを左に impossible_idxes = [0, 3, 6] return self.empty_idx in impossible_idxes def _clear(self): return np.array_equal(self.field, self.goal_field) def _set_empty_idx(self): self.empty_idx = np.where(self.field == 0)[0][0]
reset
やstep
でリストを返すとき、環境内のリストが変更されないよう、コピーして返す。- 8パズルは完成不可能な場合があるので、
reset
時にはそうならないように注意する(こちらのサイトを参考にしました)。 reset
時に、盤面が最初から完成形にならないように注意する。
エージェント
さて、環境の次はエージェントです。 今回は、入力に対して施す演算部分であるニューラルネットワークを、全結合層3層のネットワークとします。
また、これは機械学習ではよくやる話なのですが、行動を直接出力するのではなく、各行動を取る確率を出力とします。
今回は行動の種類が4つあるので、[0.2, 0.4, 0.1, 0.3]
みたいな感じです。
これはsoftmax
関数を使うことでできます。
と言っておいてなんですが、さらに今回は確率をそのまま出力するのではなく、確率にlogを施した値を出力とします。
これもよくやる手法です。
PyTorchには、これ用の関数log_softmax
があるので、これを使います。
import torch import torch.nn as nn import torch.nn.functional as F class Agent(nn.Module): def __init__(self, n, hidden): super().__init__() self.fc1 = nn.Linear(n**2, hidden) self.fc2 = nn.Linear(hidden, hidden) self.fc3 = nn.Linear(hidden, 4) def forward(self, x): x = F.relu(self.fc1(x)) x = F.relu(self.fc2(x)) return F.log_softmax(self.fc3(x), dim=-1)
学習本体
最後に、環境とエージェントを用いて強化学習するコードを書きます。 今回は方策勾配法を用います。 gym環境の作り方とは違い、学習手法についてはいくらでも分かりやすい記事や本があるので、ここでは詳しく触れません。 機会とやる気があれば別の記事で書きます。
import gym import numpy as np import tensorboardX as tbx import torch import torch.optim as opt from tqdm import tqdm import gym_puzzle from agent import Agent # ハイパーパラメータ HIDDEN_NUM = 128 # エージェントの隠れ層のニューロン数 EPISODE_NUM = 10000 # エピソードを何回行うか MAX_STEPS = 1000 # 1エピソード内で最大何回行動するか GAMMA = .99 # 時間割引率 env = gym.make('puzzle-v0') agent = Agent(env.N, HIDDEN_NUM) optimizer = opt.Adam(agent.parameters()) def do_episode(): obs = env.reset() obss, actions, rewards = [], [], [] # 現在の方策で1エピソード行動する agent.eval() with torch.no_grad(): for step in range(1, MAX_STEPS + 1): obs = torch.tensor([obs], dtype=torch.float32) obss.append(obs) prob = agent(obs)[0].exp().numpy() action = np.random.choice(range(4), p=prob) obs, reward, done, _ = env.step(action) actions.append(torch.eye(4, dtype=torch.float32)[action]) # 後の都合でone-hot形式で保存 rewards.append(reward) if done: break # 割引報酬和を求める cum_rewards = [0] for i, r in enumerate(rewards[::-1]): cum_rewards.append(GAMMA*cum_rewards[i] + r) cum_rewards = cum_rewards[:0:-1] # lossを計算して返す agent.train() loss_sum = .0 log_pis = [agent(o)[0] * a for (o, a) in zip(obss, actions)] for log_pi, r in zip(log_pis, cum_rewards): loss_sum = loss_sum - (log_pi * r).sum() return loss_sum / len(obss) if __name__ == '__main__': with tbx.SummaryWriter() as writer: for episode in tqdm(range(1, EPISODE_NUM + 1)): loss = do_episode() # lossを用いて方策を更新する optimizer.zero_grad() loss.backward() optimizer.step() writer.add_scalar('loss/loss', loss.item(), episode) torch.save(agent.state_dict(), 'agent.tar')
学習結果
学習してみましたが、まったく学習が進みませんでした。 なので、実装を書いておいてなんなんですが、学習本体部分は間違っている可能性が結構あります。
考えられる原因は、
- (正の)報酬がまったくもらえず、学習の進みようがなかった
- 方策勾配法の実装が間違っている
- 学習時間不足
- ハイパーパラメータ
くらいですかね。 上から確率70%、25%、3%、2%くらいだと思います。
まとめ
というわけで、間違っているかもしれない実装を紹介しました。 このシリーズで本当に書きたかったのはgym環境の作り方なので、学習部分はおまけということで許してください。 何でもはしません。