Making a Custom algorithm
ここでは本フレームワークの自作アルゴリズムを作成する方法を説明します。
1.概要
- 2.実装するクラス
2-1.Config
2-2.Memory
2-3.Parameter
2-4.Trainer
2-5.Worker
3.自作アルゴリズムの登録
4.型ヒント
5.Q学習実装例
1. 概要

# 学習単位の初期化
worker.on_setup()
trainer.on_setup()
for episode in range(N)
# 1エピソード最初の初期化
env.reset()
worker.on_reset()
# 1エピソードのループ
while not env.done:
# アクションを取得
action = worker.policy()
worker.render() # workerの描画
# 環境の1stepを実行
env.step(action)
worker.on_step()
# 学習
trainer.train()
# 終了後の描画
worker.render()
# 学習単位の終了
worker.on_teardown()
trainer.on_teardown()
Config |
|
Memory |
|
Parameter |
|
Trainer |
|
Worker |
|
分散学習は以下となり各クラスが非同期で動作します。

同期的な学習と以下の点が異なります。
WorkerがMemoryにサンプルを送るタイミングとTrainerが取り出すタイミングが異なる
ParameterがWorkerとTrainerで別インスタンス
各クラスの実装の仕方を見ていきます。
2. 実装する各クラスの説明
2-1. Config
強化学習アルゴリズムの種類やハイパーパラメータを管理するクラスです。 基底クラスは srl.base.rl.base.RLConfig でこれを継承して作成します。
RLConfig で実装が必要な関数・プロパティは以下です。
from dataclasses import dataclass
from srl.base.rl.config import RLConfig
from srl.base.define import RLBaseActTypes, RLBaseObsTypes
from srl.base.rl.processor import Processor
# 必ず dataclass で書いてください
@dataclass
class MyConfig(RLConfig):
# 任意のハイパーパラメータを定義
hoo_param: float = 0
def __post_init__(self):
super().__post_init__() # 親のコンストラクタも呼んでください
def get_name(self) -> str:
""" ユニークな名前を返す """
raise NotImplementedError()
def get_base_action_type(self) -> RLBaseActTypes:
"""
アルゴリズムが想定するアクションのタイプ(srl.base.define.RLBaseActTypes)を返してください。
"""
raise NotImplementedError()
def get_base_observation_type(self) -> RLBaseObsTypes:
"""
アルゴリズムが想定する環境から受け取る状態のタイプ(srl.base.define.RLBaseObsTypes)を返してください。
"""
raise NotImplementedError()
def get_framework(self) -> str:
"""
使うフレームワークを指定してください。
return "" : なし
return "tensorflow" : Tensorflow
return "torch" : Torch
"""
raise NotImplementedError()
# ------------------------------------
# 以下は option です。(なくても問題ありません)
# ------------------------------------
def assert_params(self) -> None:
""" パラメータのassertを記載 """
super().assert_params() # 親クラスも呼び出してください
def setup_from_env(self, env: EnvRun) -> None:
""" env初期化後に呼び出されます。env関係の初期化がある場合は記載してください。 """
pass
def setup_from_actor(self, actor_num: int, actor_id: int) -> None:
""" 分散学習でactorが指定されたときに呼び出されます。Actor関係の初期化がある場合は記載してください。 """
pass
def get_processors(self) -> List[Optional[RLProcessor]]:
""" 前処理を追加したい場合設定してください """
return []
def use_backup_restore(self) -> bool:
""" MCTSなど、envのbackup/restoreを使う場合はTrueを返してください"""
return False
# use_render_image_stateをTrueにするとworker.render_img_stateが有効になります
# これはenv.render_rgb_arrayの画像が入ります
def use_render_image_state(self) -> bool:
return False
def get_render_image_processors(self) -> List[RLProcessor]:
"""render_img_stateに対する前処理"""
return []
2-2. Memory
SequenceMemory |
来たサンプルを順序通りに取り出します。(Queueみたいな動作です) |
ExperienceReplayBuffer |
サンプルをランダムに取り出します。 |
PriorityExperienceReplay |
サンプルを優先順位に従い取り出します。 |
SequenceMemory
順番通りにサンプルを取り出しますMemoryです。サンプルは取り出すとなくなります。
from srl.rl.memories.sequence_memory import SequenceMemory
class MyRemoteMemory(SequenceMemory):
pass
# 実行例
memory = MyRemoteMemory(None)
memory.add([1, 2])
memory.add([2, 3])
memory.add([3, 4])
dat = memory.sample()
print(dat) # [[1, 2], [2, 3], [3, 4]]
ExperienceReplayBuffer
from dataclasses import dataclass
from srl.base.define import RLBaseActTypes, RLBaseObsTypes
from srl.base.rl.config import RLConfig
from srl.rl.memories.experience_replay_buffer import ExperienceReplayBuffer, RLConfigComponentExperienceReplayBuffer
@dataclass
class MyConfig(
RLConfig,
RLConfigComponentExperienceReplayBuffer,
):
# RLConfig に加え、RLConfigComponentExperienceReplayBuffer も継承する
# 順番は RLConfig -> RLConfigComponentExperienceReplayBuffer
def get_name(self) -> str:
return "MyConfig"
def get_base_action_type(self) -> RLBaseActTypes:
return RLBaseActTypes.DISCRETE
def get_base_observation_type(self) -> RLBaseObsTypes:
return RLBaseObsTypes.DISCRETE
def get_framework(self) -> str:
return ""
class MyRemoteMemory(ExperienceReplayBuffer):
pass
# 実行例
memory = MyRemoteMemory(MyConfig())
memory.add(1)
memory.add(2)
memory.add(3)
memory.add(4)
dat = memory.sample(batch_size=2)
print(dat) # [3, 2]
PriorityExperienceReplay
このアルゴリズムはConfigにより切り替えることができます。
クラス名 |
説明 |
---|---|
ReplayMemory |
ExperienceReplayBufferと同じで、ランダムに取得します。(優先順位はありません) |
ProportionalMemory |
サンプルの重要度によって確率が変わります。重要度が高いサンプルほど選ばれる確率が上がります。 |
RankBaseMemory |
サンプルの重要度のランキングによって確率が変わります。重要度が高いサンプルほど選ばれる確率が上がるのはProportionalと同じです。 |
from dataclasses import dataclass
import numpy as np
from srl.base.define import RLBaseActTypes, RLBaseObsTypes
from srl.base.rl.config import RLConfig
from srl.rl.memories.priority_experience_replay import (
PriorityExperienceReplay,
RLConfigComponentPriorityExperienceReplay,
)
@dataclass
class MyConfig(RLConfig, RLConfigComponentPriorityExperienceReplay):
# RLConfig に加え、RLConfigComponentPriorityExperienceReplay も継承する
# 順番は RLConfig -> RLConfigComponentPriorityExperienceReplay
def get_name(self) -> str:
return "MyConfig"
def get_base_action_type(self) -> RLBaseActTypes:
return RLBaseActTypes.DISCRETE
def get_base_observation_type(self) -> RLBaseObsTypes:
return RLBaseObsTypes.DISCRETE
def get_framework(self) -> str:
return ""
class MyRemoteMemory(PriorityExperienceReplay):
pass
# --- select memory
config = MyConfig()
# config.set_replay_memory()
config.set_proportional_memory()
# config.set_rankbase_memory()
# --- run memory
memory = MyRemoteMemory(config)
memory.add(1, priority=1)
memory.add(2, priority=2)
memory.add(3, priority=3)
memory.add(4, priority=4)
batchs, weights, update_args = memory.sample(batch_size=1, step=0)
print(batchs) # [2]
memory.update(update_args, np.array([5, 10, 15, 20, 11]))
2-3. Parameter
実装が必要な関数は以下です。
from srl.base.rl.parameter import RLParameter
import numpy as np
class MyParameter(RLParameter):
def __init__(self, *args):
""" コントラクタの引数は親に渡してください """
super().__init__(*args)
# self.config に上で定義した MyConfig が入ります
self.config: MyConfig
# call_restore/call_backupでパラメータが復元できるように作成
def call_restore(self, data, **kwargs) -> None:
raise NotImplementedError()
def call_backup(self, **kwargs):
raise NotImplementedError()
# その他任意の関数を作成できます。
# 分散学習ではTrainer/Worker間で値を保持できない点に注意(backup/restoreした値のみ共有されます)
2-4. Trainer
実装が必要な関数は以下です。
from srl.base.rl.trainer import RLTrainer
class MyTrainer(RLTrainer):
def __init__(self, *args):
""" コントラクタの引数は親に渡してください """
super().__init__(*args)
# 以下の変数を持ちます。
self.config: MyConfig
self.parameter: MyParameter
self.memory: MyMemory
def train(self) -> None:
"""
self.memory から batch を受け取り学習を定義します。
self.memory は以下の関数が定義されています。
self.memory.sample() : batchを返します
self.memory.update() : ProportionalMemory の場合 update で使います
・学習したら回数を数えてください
self.train_count += 1
・(option)必要に応じてinfoを設定します
self.info = {"loss": 0.0}
"""
raise NotImplementedError()
# --- 実装時に関数内で使う事を想定しているプロパティ・関数となります
trainer = MyTrainer()
trainer.distributed # property, bool : 分散実行中かどうかを返します
trainer.train_only # property, bool : 学習のみかどうかを返します
Worker
フローをすごく簡単に書くと以下です。
env.reset()
worker.on_reset()
while:
action = worker.policy()
env.step(action)
worker.on_step()
trainer.train()
from srl.base.rl.worker import RLWorker
from srl.base.rl.worker_run import WorkerRun
class MyWorker(RLWorker):
def __init__(self, *args):
""" コントラクタの引数は親に渡してください """
super().__init__(*args)
# 以下の変数が設定されます
self.config: MyConfig
self.parameter: MyParameter
self.memory: IRLMemoryWorker
def on_reset(self, worker: WorkerRun):
""" エピソードの最初に呼ばれる関数 """
raise NotImplementedError()
def policy(self, worker: WorkerRun) -> RLActionType:
""" このターンで実行するアクションを返す関数、この関数のみ実装が必須になります
Returns:
RLActionType : 実行するアクション
"""
raise NotImplementedError()
def on_step(self, worker: WorkerRun):
""" Envが1step実行した後に呼ばれる関数 """
raise NotImplementedError()
def render_terminal(self, worker, **kwargs) -> None:
"""
描画用の関数です。
実装するとrenderによる描画が可能になります。
"""
pass
def render_rgb_array(self, worker, **kwargs) -> Optional[np.ndarray]:
"""
描画用の関数です。
実装するとrenderによる描画が可能になります。
"""
return None
# --- 実装時に関数内で使う事を想定しているプロパティ・関数となります
worker = MyWorker()
worker.training # property, bool : training かどうかを返します
worker.distributed # property, bool : 分散実行中かどうかを返します
worker.rendering # property, bool : renderがあるエピソードかどうかを返します
worker.observation_space # property , SpaceBase : RLWorkerが受け取るobservation_spaceを返します
worker.action_space # property , SpaceBase : RLWorkerが受け取るaction_spaceを返します
worker.get_invalid_actions() # function , List[RLAction] : 有効でないアクションを返します(離散限定)
worker.sample_action() # function , RLAction : ランダムなアクションを返します
また、情報は WorkerRun から基本取り出して使います。 情報の例は以下です。
class MyWorker(RLWorker):
def on_reset(self, worker):
worker.state # 初期状態
worker.player_index # 初期プレイヤーのindex
worker.invalid_action # 初期有効ではないアクションリスト
def policy(self, worker) :
worker.state # 状態
worker.player_index # プレイヤーのindex
worker.invalid_action # 有効ではないアクションリスト
def on_step(self, worker: "WorkerRun") -> dict:
worker.prev_state # step前の状態(policyのworker.stateと等価)
worker.state # step後の状態
worker.prev_action # step前の前のアクション
worker.action # step前のアクション(policyで返したアクションと等価)
worker.reward # step後の即時報酬
worker.done # step後に終了フラグが立ったか
worker.terminated # step後にenvが終了フラグを立てたか
worker.player_index # 次のプレイヤーのindex
worker.prev_invalid_action # step前の有効ではないアクションリスト
worker.invalid_action # step後の有効ではないアクションリスト
3. 自作アルゴリズムの登録
以下で登録します。 第2引数以降の entry_point は、モジュールパス + ":" + クラス名`で、 モジュールパスは `importlib.import_module で呼び出せる形式である必要があります。
from srl.base.rl.registration import register
register(
MyConfig(),
__name__ + ":MyMemory",
__name__ + ":MyParameter",
__name__ + ":MyTrainer",
__name__ + ":MyWorker",
)
4. 型ヒント
動作に影響ないですが、可能な限り型ヒントが表示されるようにしています。(開発中の機能です) RLConfigとRLWorkerはimport先を変えることで型アノテーションが指定された状態になります。
# from srl.rl.config import RLConfig
# from srl.rl.worker import RLWorker
# ↓
from srl.base.rl.algorithms.base_dqn import RLConfig, RLWorker
# srl.base.rl.algorithms.base_XX の XX の部分を変更する事でアルゴリズムに合った型に変更できます
# XX の種類についてはソースコードを見てください。(開発中につき資料作成は後回し)
また、ジェネリック型を追加する事で各クラスの型を追加できます。
# RLParameter[TConfig]
# TConfig : RLConfigを継承したクラス
class Parameter(RLParameter[Config]):
pass
# RLTrainer[TConfig, TParameter, TMemory]
# TConfig : RLConfigを継承したクラス
# TParameter : RLParameterを継承したクラス
# TMemory : RLMemoryを継承したクラス
class Trainer(RLTrainer[Config, Parameter, Memory]):
pass
# RLWorker[TConfig, TParameter]
# TConfig : RLConfigを継承したクラス
# TParameter : RLParameterを継承したクラス
class Worker(RLWorker[Config, Parameter]):
pass
5. 実装例(Q学習)
import json
import random
from dataclasses import dataclass
import numpy as np
from srl.base.define import RLBaseActTypes, RLBaseObsTypes
from srl.base.rl.config import RLConfig
from srl.base.rl.parameter import RLParameter
from srl.base.rl.registration import register
from srl.base.rl.trainer import RLTrainer
from srl.base.rl.worker import RLWorker
from srl.base.spaces.array_discrete import ArrayDiscreteSpace
from srl.base.spaces.discrete import DiscreteSpace
from srl.rl.memories.sequence_memory import SequenceMemory
@dataclass
class Config(RLConfig[DiscreteSpace, ArrayDiscreteSpace]):
epsilon: float = 0.1
test_epsilon: float = 0
gamma: float = 0.9
lr: float = 0.1
def get_base_action_type(self) -> RLBaseActTypes:
return RLBaseActTypes.DISCRETE
def get_base_observation_type(self) -> RLBaseObsTypes:
return RLBaseObsTypes.DISCRETE
def get_framework(self) -> str:
return ""
def get_name(self) -> str:
return "MyRL"
class Memory(SequenceMemory):
pass
class Parameter(RLParameter[Config]):
def __init__(self, *args):
super().__init__(*args)
self.Q = {} # Q学習用のテーブル
def call_restore(self, data, **kwargs) -> None:
self.Q = json.loads(data)
def call_backup(self, **kwargs):
return json.dumps(self.Q)
# Q値を取得する関数
def get_action_values(self, state: str):
if state not in self.Q:
self.Q[state] = [0] * self.config.action_space.n
return self.Q[state]
class Trainer(RLTrainer[Config, Parameter, Memory]):
def __init__(self, *args):
super().__init__(*args)
def train(self) -> None:
if self.memory.is_warmup_needed():
return
batchs = self.memory.sample()
td_error = 0
for batch in batchs:
# 各batch毎にQテーブルを更新する
s = batch["state"]
n_s = batch["next_state"]
action = batch["action"]
reward = batch["reward"]
done = batch["done"]
q = self.parameter.get_action_values(s)
n_q = self.parameter.get_action_values(n_s)
if done:
target_q = reward
else:
target_q = reward + self.config.gamma * max(n_q)
td_error = target_q - q[action]
q[action] += self.config.lr * td_error
td_error += td_error
self.train_count += 1 # 学習回数
if len(batchs) > 0:
td_error /= len(batchs)
# 学習結果(任意)
self.info["Q"] = len(self.parameter.Q)
self.info["td_error"] = td_error
class Worker(RLWorker[Config, Parameter]):
def __init__(self, *args):
super().__init__(*args)
def on_reset(self, worker):
pass
def policy(self, worker) -> int:
self.state = self.config.observation_space.to_str(worker.state)
# 学習中かどうかで探索率を変える
if self.training:
epsilon = self.config.epsilon
else:
epsilon = self.config.test_epsilon
if random.random() < epsilon:
# epsilonより低いならランダムに移動
self.action = self.sample_action()
else:
q = self.parameter.get_action_values(self.state)
q = np.asarray(q)
# 最大値を選ぶ(複数あればランダム)
self.action = np.random.choice(np.where(q == q.max())[0])
return int(self.action)
def on_step(self, worker):
if not self.training:
return
batch = {
"state": self.state,
"next_state": self.config.observation_space.to_str(worker.state),
"action": self.action,
"reward": worker.reward,
"done": worker.terminated,
}
self.memory.add(batch) # memoryはaddのみ
# 強化学習の可視化用、今回ですとQテーブルを表示しています。
def render_terminal(self, worker, **kwargs):
q = self.parameter.get_action_values(self.state)
maxa = np.argmax(q)
for a in range(self.config.action_space.n):
if a == maxa:
s = "*"
else:
s = " "
s += f"{worker.env.action_to_str(a)}: {q[a]:7.5f}"
print(s)
# ---------------------------------
# 登録
# ---------------------------------
register(
Config(),
__name__ + ":Memory",
__name__ + ":Parameter",
__name__ + ":Trainer",
__name__ + ":Worker",
)
# ---------------------------------
# テスト
# ---------------------------------
from srl.test.rl import TestRL
tester = TestRL()
tester.test(Config())
# ---------------------------------
# Grid環境の学習
# ---------------------------------
import srl
runner = srl.Runner(srl.EnvConfig("Grid"), Config(lr=0.001))
# --- train
runner.train(timeout=10)
# --- test
rewards = runner.evaluate(max_episodes=100)
print("100エピソードの平均結果", np.mean(rewards))
runner.render_terminal()
runner.animation_save_gif("_MyRL-Grid.gif", render_scale=2)
renderの表示例
### 0, action None, rewards[0.000] (0.0s)
env {}
work0 None
......
. G.
. . X.
.P .
......
←: 0.17756
↓: 0.16355
→: 0.11174
*↑: 0.37473
### 1, action 3(↑), rewards[-0.040] (0.0s)
env {}
work0 {}
......
. G.
.P. X.
. .
......
←: 0.27779
↓: 0.20577
→: 0.27886
*↑: 0.49146
### 2, action 3(↑), rewards[-0.040] (0.0s)
env {}
work0 {}
......
.P G.
. . X.
. .
......
←: 0.34255
↓: 0.29609
*→: 0.61361
↑: 0.34684
### 3, action 2(→), rewards[-0.040] (0.0s)
env {}
work0 {}
......
. G.
.P. X.
. .
......
←: 0.27779
↓: 0.20577
→: 0.27886
*↑: 0.49146
### 4, action 3(↑), rewards[-0.040] (0.0s)
env {}
work0 {}
......
.P G.
. . X.
. .
......
←: 0.34255
↓: 0.29609
*→: 0.61361
↑: 0.34684
### 5, action 2(→), rewards[-0.040] (0.0s)
env {}
work0 {}
......
. P G.
. . X.
. .
......
←: 0.37910
↓: 0.44334
*→: 0.76733
↑: 0.46368
### 6, action 2(→), rewards[-0.040] (0.0s)
env {}
work0 {}
......
. PG.
. . X.
. .
......
←: 0.47941
↓: 0.39324
*→: 0.92425
↑: 0.59087
### 7, action 2(→), rewards[1.000], done(env) (0.0s)
env {}
work0 {}
......
. P.
. . X.
. .
......
[0.760000005364418]
