I've built a custom environment and a DQN agent that runs in it.
The custom environment is a simple maze game.
The observation is the maze map, which encodes the agent's position and the goal position.
The agent receives a reward every time it moves closer to the goal.
It seems like a simple problem, but learning is not progressing, even though the program itself runs without errors.
When the agent hits a wall, the observation does not change, and I suspect this is why the network keeps producing the same output (see the sketch below).
Could someone explain why learning is not progressing (the loss is not decreasing)?
Thank you in advance.
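To make the wall case concrete, here is a small standalone sketch (the helper name and the penalty value are only illustrative; this is not part of the environment code below) showing how a blocked move leaves the position, and therefore the observation, unchanged, and how an extra penalty could be attached to such moves:

import numpy as np

def move_with_wall_penalty(agent_pos, delta, grid_size, wall_penalty=-1.0):
    # Apply a move and clip it to the grid, just like the np.clip calls in
    # _step() below. If a non-zero move is clipped back onto the same cell,
    # the observation will not change, so report an extra penalty for it.
    new_pos = np.clip(agent_pos + delta, 0, grid_size - 1)
    blocked = np.array_equal(new_pos, agent_pos) and np.any(delta != 0)
    return new_pos, (wall_penalty if blocked else 0.0)

# Trying to move off the edge of a 7x7 grid is a no-op and gets the penalty.
print(move_with_wall_penalty(np.array([0, 3]), np.array([-1, 0]), grid_size=7))
# -> (array([0, 3]), -1.0)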
The custom environment is below:
import numpy as np
import gym
from gym import spaces
import random
from tf_agents.environments import py_environment
from tf_agents.environments import tf_environment
from tf_agents.environments import tf_py_environment
from tf_agents.environments import utils
from tf_agents.specs import array_spec
from tf_agents.environments import wrappers
from tf_agents.environments import suite_gym
from tf_agents.trajectories import time_step as ts
class MyGameEnv(py_environment.PyEnvironment):
A_L = 0
A_R = 1
A_U = 2
A_D = 3
def __init__(self, grid_size=7):
# fields
self.grid_size = grid_size
self.fieldmap = np.zeros((self.grid_size, self.grid_size), dtype=np.int32)
        self.agent_pos = np.array([random.randint(0, self.grid_size-1), random.randint(0, self.grid_size-1)], dtype=np.int32)  # dtype matches _observation_spec
        self.goal_pos = np.array([random.randint(0, self.grid_size-1), random.randint(0, self.grid_size-1)], dtype=np.int32)
self.action = 0
self.reward = 0
self.action_num = 0
# spec
self._action_spec = array_spec.BoundedArraySpec(shape=(), dtype=np.int32, minimum=0, maximum=3, name='action')
        self._observation_spec = array_spec.BoundedArraySpec(shape=(self.grid_size, self.grid_size), dtype=np.int32, minimum=0, maximum=3, name='observation')  # cell values: 1=aisle, 2=goal, 3=player
self._episode_ended = False
        # build the initial field map
self.update_fieldmap()
    def update_fieldmap(self):
        # redraw the field map from the current goal and agent positions
        for i in range(self.grid_size):
            for j in range(self.grid_size):
                if self.goal_pos[0] == i and self.goal_pos[1] == j:
                    self.fieldmap[i][j] = 2  # goal
                elif self.agent_pos[0] == i and self.agent_pos[1] == j:
                    self.fieldmap[i][j] = 3  # player
                else:
                    self.fieldmap[i][j] = 1  # aisle
def action_spec(self):
return self._action_spec
def observation_spec(self):
return self._observation_spec
def _reset(self):
self._episode_ended = False
self.fieldmap = np.zeros((self.grid_size, self.grid_size), dtype=np.int32)
        self.agent_pos = np.array([random.randint(0, self.grid_size-1), random.randint(0, self.grid_size-1)], dtype=np.int32)  # dtype matches _observation_spec
        self.goal_pos = np.array([random.randint(0, self.grid_size-1), random.randint(0, self.grid_size-1)], dtype=np.int32)
self.action = 0
self.reward = 0
self.action_num = 0
self.update_fieldmap()
return ts.restart(self.fieldmap)
def _step(self, action):
if self._episode_ended:
# The last action ended the episode. Ignore the current action and start
# a new episode.
return self.reset()
        # Manhattan distance to the goal before the move
        distance_before_action = np.abs(self.goal_pos[0] - self.agent_pos[0])
        distance_before_action += np.abs(self.goal_pos[1] - self.agent_pos[1])
        # apply the action
if self.A_L == action:
self.agent_pos[0] -= 1
if self.A_R == action:
self.agent_pos[0] += 1
if self.A_U == action:
self.agent_pos[1] -= 1
if self.A_D == action:
self.agent_pos[1] += 1
        # keep the agent inside the grid: hitting a wall leaves the position (and observation) unchanged
self.agent_pos[0] = np.clip(self.agent_pos[0], 0, self.grid_size-1)
self.agent_pos[1] = np.clip(self.agent_pos[1], 0, self.grid_size-1)
        # update the field map (observation) after the move
self.update_fieldmap()
        # reward shaping: +1 for moving closer to the goal, -1 otherwise
        distance_after_action = np.abs(self.goal_pos[0] - self.agent_pos[0])
        distance_after_action += np.abs(self.goal_pos[1] - self.agent_pos[1])
        if distance_after_action < distance_before_action:
            reward = 1
        else:
            reward = -1
        # finished condition #1: the agent reached the goal
        if self.agent_pos[0] == self.goal_pos[0] and self.agent_pos[1] == self.goal_pos[1]:
            self._episode_ended = True
            reward = 2
        # finished condition #2: step limit reached (episodes last at most 10 steps)
        if self.action_num >= 9:
            self._episode_ended = True
            reward = -2
# fetch variables
self.action = action
self.reward = reward
self.action_num += 1
if self._episode_ended :
return ts.termination(self.fieldmap, reward=reward)
else:
return ts.transition(self.fieldmap, reward=reward, discount=1.0)
# draw
def render(self, mode='human', close=False):
if mode != 'human':
raise NotImplementedError()
draw_map = ""
for i in range(self.grid_size):
for j in range(self.grid_size):
if self.fieldmap[i][j] == 1:
draw_map += "X "
if self.fieldmap[i][j] == 2:
draw_map += "G "
if self.fieldmap[i][j] == 3:
draw_map += "P "
draw_map += "\n"
print("Num:", self.action_num, "Action:", self.action, "Goal", self.goal_pos, "Agent:", self.agent_pos, "Reward:", self.reward)
print(draw_map)
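# (Sketch, not part of my original file: the `utils` module imported above
# provides a spec checker for PyEnvironments. It runs a few episodes with
# random actions and raises an error if any time step does not match the
# declared observation/action specs.)
if __name__ == '__main__':
    utils.validate_py_environment(MyGameEnv(), episodes=5)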
The DQN agent is below:
from __future__ import absolute_import, division, print_function
import base64
import imageio
import matplotlib
import matplotlib.pyplot as plt
import numpy as np
import PIL.Image
import pyvirtualdisplay
import reverb
import tensorflow as tf
from tf_agents.agents.dqn import dqn_agent
from tf_agents.drivers import py_driver
from tf_agents.environments import suite_gym
from tf_agents.environments import tf_py_environment
from tf_agents.eval import metric_utils
from tf_agents.metrics import tf_metrics
from tf_agents.networks import sequential
from tf_agents.policies import py_tf_eager_policy
from tf_agents.policies import random_tf_policy
from tf_agents.replay_buffers import reverb_replay_buffer
from tf_agents.replay_buffers import reverb_utils
from tf_agents.trajectories import trajectory
from tf_agents.specs import tensor_spec
from tf_agents.utils import common
from simulator import MyGameEnv
from tensorflow.keras.models import Model
from tensorflow.keras.layers import *
from tensorflow.keras.optimizers import Adam
# Set up a virtual display for rendering OpenAI gym environments.
display = pyvirtualdisplay.Display(visible=0, size=(1400, 900)).start()
num_iterations = 5000
initial_collect_steps = 1000
collect_steps_per_iteration = 1
replay_buffer_max_length = 100000
batch_size = 64
learning_rate = 1e-3
log_interval = 100
num_eval_episodes = 10
eval_interval = 500
env = MyGameEnv()
env.reset()
env.render()
print(f"{env.time_step_spec().observation= }")
print(f"{env.time_step_spec().reward= }")
print(f"{env.action_spec()= }")
#action = np.array(0, dtype=np.int32)
#next_time_step = env.step(action)
#env.render()
#print(next_time_step)
train_py_env = MyGameEnv()
eval_py_env = MyGameEnv()
train_env = tf_py_environment.TFPyEnvironment(train_py_env)
eval_env = tf_py_environment.TFPyEnvironment(eval_py_env)
action_tensor_spec = tensor_spec.from_spec(env.action_spec())
num_actions = action_tensor_spec.maximum - action_tensor_spec.minimum + 1
print(f"{action_tensor_spec= }")
print(f"{num_actions= }")
# Define a helper function to create Dense layers configured with the right
# activation and kernel initializer.
#def dense_layer(num_units):
# return tf.keras.layers.Dense(
# num_units,
# activation=tf.keras.activations.relu,
# kernel_initializer=tf.keras.initializers.VarianceScaling(
# scale=2.0, mode='fan_in', distribution='truncated_normal'))
def dense_layer(num_units):
return tf.keras.layers.Dense(
num_units,
activation=tf.keras.activations.relu,
kernel_initializer='he_normal',
use_bias=True,
bias_initializer = 'zeros'
)
# QNetwork consists of a sequence of Dense layers followed by a dense layer
# with `num_actions` units to generate one q_value per available action as
# its output.
fc_layer_params = (100, 100)
dense_layers = [dense_layer(num_units) for num_units in fc_layer_params]
#q_values_layer = tf.keras.layers.Dense(
# num_actions,
# activation=None,
# kernel_initializer=tf.keras.initializers.RandomUniform(minval=-0.03, maxval=0.03),
# bias_initializer=tf.keras.initializers.Constant(0))
dropout_layer = tf.keras.layers.Dropout(0.5)
q_values_layer = tf.keras.layers.Dense(
num_actions,
activation=None,
kernel_initializer='he_normal',
use_bias=True,
bias_initializer = 'zeros'
)
input_layer = tf.keras.layers.Flatten()
q_net = sequential.Sequential([input_layer] + dense_layers + [dropout_layer, q_values_layer])
optimizer = tf.keras.optimizers.Adam(learning_rate=learning_rate)
train_step_counter = tf.Variable(0)
agent = dqn_agent.DqnAgent(
train_env.time_step_spec(),
train_env.action_spec(),
q_network=q_net,
optimizer=optimizer,
td_errors_loss_fn=common.element_wise_squared_loss,
train_step_counter=train_step_counter)
agent.initialize()
eval_policy = agent.policy
collect_policy = agent.collect_policy
random_policy = random_tf_policy.RandomTFPolicy(train_env.time_step_spec(), train_env.action_spec())
example_environment = tf_py_environment.TFPyEnvironment(MyGameEnv())
time_step = example_environment.reset()
random_policy.action(time_step)
def compute_avg_return(environment, policy, num_episodes=10):
total_return = 0.0
for i in range(num_episodes):
time_step = environment.reset()
episode_return = 0.0
#print(str(i) + "/" + str(num_episodes))
while not time_step.is_last():
action_step = policy.action(time_step)
time_step = environment.step(action_step.action)
#print(time_step)
episode_return += time_step.reward
total_return += episode_return
avg_return = total_return / num_episodes
return avg_return.numpy()[0]
# See also the metrics module for standard implementations of different metrics.
# https://github.com/tensorflow/agents/tree/master/tf_agents/metrics
result = compute_avg_return(eval_env, random_policy, num_eval_episodes)
print('Average return with a random policy:', result)
table_name = 'uniform_table'
replay_buffer_signature = tensor_spec.from_spec(agent.collect_data_spec)
replay_buffer_signature = tensor_spec.add_outer_dim(replay_buffer_signature)
table = reverb.Table(
table_name,
max_size=replay_buffer_max_length,
sampler=reverb.selectors.Uniform(),
remover=reverb.selectors.Fifo(),
rate_limiter=reverb.rate_limiters.MinSize(1),
signature=replay_buffer_signature)
reverb_server = reverb.Server([table])
replay_buffer = reverb_replay_buffer.ReverbReplayBuffer(
agent.collect_data_spec,
table_name=table_name,
sequence_length=2,
local_server=reverb_server)
rb_observer = reverb_utils.ReverbAddTrajectoryObserver(
replay_buffer.py_client,
table_name,
sequence_length=2)
py_driver.PyDriver(
    train_py_env,  # step the same py environment whose reset seeds run() below
py_tf_eager_policy.PyTFEagerPolicy(random_policy, use_tf_function=True),
[rb_observer],
max_steps=initial_collect_steps).run(train_py_env.reset())
# For the curious:
# Uncomment to peel one of these off and inspect it.
# iter(replay_buffer.as_dataset()).next()
# Dataset generates trajectories with shape [Bx2x...]
dataset = replay_buffer.as_dataset(
num_parallel_calls=3,
sample_batch_size=batch_size,
num_steps=2).prefetch(3)
# (Optional) Optimize by wrapping some of the code in a graph using TF function.
agent.train = common.function(agent.train)
# Reset the train step.
agent.train_step_counter.assign(0)
# Evaluate the agent's policy once before training.
avg_return = compute_avg_return(eval_env, agent.policy, num_eval_episodes)
returns = [avg_return]
# Reset the environment.
time_step = train_py_env.reset()
# Create a driver to collect experience.
collect_driver = py_driver.PyDriver(
    train_py_env,  # step the same py environment that produced time_step above
py_tf_eager_policy.PyTFEagerPolicy(agent.collect_policy, use_tf_function=True),
[rb_observer],
max_steps=collect_steps_per_iteration)
print("-----------Start training------------")
iterator = iter(dataset)
for _ in range(num_iterations):
# Collect a few steps and save to the replay buffer.
time_step, _ = collect_driver.run(time_step)
# Sample a batch of data from the buffer and update the agent's network.
experience, unused_info = next(iterator)
train_loss = agent.train(experience).loss
step = agent.train_step_counter.numpy()
if step % log_interval == 0:
print('step = {0}: loss = {1}'.format(step, train_loss))
if step % eval_interval == 0:
avg_return = compute_avg_return(eval_env, agent.policy, num_eval_episodes)
print('step = {0}: Average Return = {1}'.format(step, avg_return))
returns.append(avg_return)
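# (Sketch, not part of my original script: matplotlib is already imported and
# the evaluation returns are collected in `returns`, so they can be plotted
# once the training loop has finished. The output filename is just an example.)
iterations = range(0, num_iterations + 1, eval_interval)
plt.plot(iterations, returns)
plt.ylabel('Average Return')
plt.xlabel('Iterations')
plt.savefig('average_return.png')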
def env_render(environment, policy):
test_env = tf_py_environment.TFPyEnvironment(environment)
time_step = test_env.reset()
environment.render()
while not time_step.is_last():
action_step = policy.action(time_step)
time_step = test_env.step(action_step.action)
environment.render()
return
result = compute_avg_return(eval_env, agent.policy, num_eval_episodes)
print(result)
for i in range(5):
print("------------" + "case: " + str(i) + "------------")
env_render(MyGameEnv(), agent.policy)
exit()