I am trying to teach an agent to navigate to a target in my custom environment. The agent learns with a neural network (two hidden Dense layers, one dropout layer, and one output layer of dimension 4). As input the agent uses a sensor that measures the distance to the surrounding obstacles, plus the angle and distance to the target if it is in view (I normalized all inputs to the range [0,1]). As output we get the mean and standard deviation of the next action (the angle and distance the robot will move). The action is then sampled from a normal distribution, i.e. a Gaussian policy. I used the Gaussian policy because it was the only way I saw to define a log-likelihood, which I then want to use during training. During training the agent walks a trajectory until it hits an obstacle or a wall, and afterwards calculates the discounted future reward for each step. I now want to use policy gradient ascent to train the agent towards greater reward. You can also find the idea here: https://spinningup.openai.com/en/latest/spinningup/rl_intro.html I will post my Python code below, but somehow it is not working. Maybe I missed something. I am really desperate and would be so thankful for any hint. (The crucial part is the method train() in class Agent.)
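Concretely, the update I am trying to implement is the simplest policy gradient estimator from that page, using the log-likelihood of my two-dimensional diagonal Gaussian policy (the means and log standard deviations are the four outputs of the network):

\nabla_\theta J(\pi_\theta) = \mathbb{E}_{\tau \sim \pi_\theta}\Big[\sum_{t=0}^{T} \nabla_\theta \log \pi_\theta(a_t \mid s_t)\, \hat{R}_t\Big],
\qquad
\log \pi_\theta(a \mid s) = -\frac{1}{2}\sum_{i=1}^{2}\Big(\frac{(a_i-\mu_i(s))^2}{\sigma_i(s)^2} + 2\log \sigma_i(s) + \log 2\pi\Big),

where \hat{R}_t is the discounted future reward from step t onwards.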
import math
import random
import numpy as np
import matplotlib.pyplot as plt
from numpy import arange, meshgrid
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers

class Maze:
    def __init__(self, length, height, target, obstacles=[]):
        self.length = length
        self.height = height
        self.obstacles = obstacles
        self.target = target

    def drawMaze(self, res=0.01):
        #plot maze outline:
        x = [0, 0, self.length, self.length, 0]
        y = [0, self.height, self.height, 0, 0]
        plt.plot(x, y)
        #plot the maze target
        plt.plot(self.target.pos_x, self.target.pos_y, 'ro', label='target')
        #plot the maze obstacles with resolution res
        x_mesh = arange(0, self.length, res)
        y_mesh = arange(0, self.height, res)
        X_mesh, Y_mesh = meshgrid(x_mesh, y_mesh)  #grid of points
        Z = np.zeros(X_mesh.size).reshape(y_mesh.size, x_mesh.size)
        for obstacle in self.obstacles:
            Z = np.maximum(Z, obstacle.func(X_mesh, Y_mesh))  #evaluate the obstacle function on the grid
        plt.imshow(np.heaviside(np.array([z.astype(float) for z in Z]), 0), extent=[0, self.length, 0, self.height], cmap='Pastel1', origin='lower')
        plt.title('MyMaze')
        plt.legend(loc='upper center', bbox_to_anchor=(0.5, -0.1), fancybox=True, shadow=True, ncol=5)  #legend below the plot
class Obstacle:
    def __init__(self, func):
        self.func = func  #func(x, y) > 0 if the point lies inside the obstacle and <= 0 if not; func should be continuous so that the algorithm can find the minimum of the function

    def isPointInObstacle(self, x, y):
        if self.func(x, y) > 0:
            return 1
        else:
            return 0
class Target:
    def __init__(self, pos_x, pos_y):
        self.pos_x = pos_x
        self.pos_y = pos_y
class Agent:
    def __init__(self, maze, pos_x=1, pos_y=1, rays=360, alpha=0.001):
        self.posvec = [[pos_x, pos_y]]
        self.rays = rays
        self.view = []  #distances to the next wall/obstacle along each ray
        self.points = []  #end points of each ray on the next wall/obstacle
        self.maze = maze
        self.memory = []
        self.memoryCounter = 0
        self.lastMove = np.array([0, 0])
        self.model = keras.Sequential()
        self.model.add(layers.Dense(64, batch_input_shape=(1, rays + 2), activation="relu"))
        self.model.add(layers.Dense(64, activation="relu"))
        self.model.add(layers.Dropout(.2))
        self.model.add(layers.Dense(4))  #outputs: [mean angle, mean distance, log std angle, log std distance]
        self.reward = []
        self.isEnd = False
        self.gamma = 0.3  #discount factor; gamma <= 0.5 so that it cannot pay off to run around forever and collect rewards
        self.learningRate = alpha
        self.model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=self.learningRate))
        self.actionMemory = []
        self.updateMemory()
    def look(self, res=0.01):
        #divides the 360 degrees into self.rays rays and returns the end point of each ray (where it hits the maze border or an obstacle)
        rays = self.rays
        angle = arange(0, 360, 360 / rays)
        points = []
        distance = []
        #intersection with the maze borders (0 degrees points up, angles grow clockwise)
        for alpha in angle:
            if (alpha == 0):
                t_min_maze = (self.maze.height - self.posvec[-1][1]) / math.cos(math.radians(alpha))
            if (alpha == 180):
                t_min_maze = (0 - self.posvec[-1][1]) / math.cos(math.radians(alpha))
            if (alpha == 90):
                t_min_maze = (self.maze.length - self.posvec[-1][0]) / math.sin(math.radians(alpha))
            if (alpha == 270):
                t_min_maze = (0 - self.posvec[-1][0]) / math.sin(math.radians(alpha))
            if (alpha > 0 and alpha < 90):
                t_min_maze = min((self.maze.length - self.posvec[-1][0]) / math.sin(math.radians(alpha)), (self.maze.height - self.posvec[-1][1]) / math.cos(math.radians(alpha)))
            if (alpha > 90 and alpha < 180):
                t_min_maze = min((self.maze.length - self.posvec[-1][0]) / math.sin(math.radians(alpha)), (0 - self.posvec[-1][1]) / math.cos(math.radians(alpha)))
            if (alpha > 180 and alpha < 270):
                t_min_maze = min((0 - self.posvec[-1][0]) / math.sin(math.radians(alpha)), (0 - self.posvec[-1][1]) / math.cos(math.radians(alpha)))
            if alpha > 270:
                t_min_maze = min((0 - self.posvec[-1][0]) / math.sin(math.radians(alpha)), (self.maze.height - self.posvec[-1][1]) / math.cos(math.radians(alpha)))
            #intersection with obstacles
            t_min_obstacle = t_min_maze
            for obstacle in self.maze.obstacles:
                def ray(t):
                    x = self.posvec[-1][0] + t * math.sin(math.radians(alpha))
                    y = self.posvec[-1][1] + t * math.cos(math.radians(alpha))
                    return obstacle.func(x, y)
                for t_ray in np.arange(0, t_min_obstacle + 1, res):
                    if (ray(t_ray) > 0 and t_ray < t_min_maze and t_ray > 0):
                        t_min_obstacle = t_ray
                        break
            t_min = t_min_obstacle
            #calculate the end point of the ray:
            distance.append(t_min)
            points.append([self.posvec[-1][0] + t_min * math.sin(math.radians(alpha)), self.posvec[-1][1] + t_min * math.cos(math.radians(alpha))])
        self.view = distance
        self.points = points
        return points
    def run(self, angle, distance):
        '''
        checks whether the path is free of obstacles; if so, walks the full path, otherwise only walks until it hits the obstacle
        updates self.memory, self.isEnd and self.posvec
        '''
        newaction = [angle, distance]
        self.actionMemory.append(newaction)
        if (self.isEnd):
            print("Agent has already crashed into an obstacle, please reset the agent with Agentname.reset()")
            return
        else:
            result = self.IsWalkPossible(angle, distance)
            isRunPossible = result[0]
            t_ray = result[1]
            if (not isRunPossible):
                print("Agent crashed into an obstacle, please reset the agent with Agentname.reset()")
            self.isEnd = not isRunPossible
            self.posvec.append([float(self.posvec[-1][0] + distance * t_ray * math.sin(math.radians(angle))), float(self.posvec[-1][1] + distance * t_ray * math.cos(math.radians(angle)))])  #float() also handles 1-element arrays/tensors
            self.lastMove = [angle, t_ray * distance]
            self.updateMemory()
            if (not isRunPossible):
                self.draw(0.01, False, True)
            return
    def draw(self, res=0.01, plotView=True, drawPath=True):
        self.maze.drawMaze(res)
        #plot the view rays in green
        if (self.points and plotView):
            for point in self.points:
                plt.plot([self.posvec[-1][0], point[0]], [self.posvec[-1][1], point[1]], 'g')
        #plot the agent:
        plt.plot(self.posvec[-1][0], self.posvec[-1][1], 'bx', label='agent')
        plt.legend(loc='upper center', bbox_to_anchor=(0.5, -0.1), fancybox=True, shadow=True, ncol=5)
        if (drawPath):
            self.drawPath()

    def drawPath(self):
        lastpoint = [self.posvec[0][0], self.posvec[0][1]]
        for point in self.posvec:
            plt.plot([lastpoint[0], point[0]], [lastpoint[1], point[1]], 'b')
            lastpoint = point
    def seeTarget(self, res=0.01):
        '''
        checks whether there is an obstacle on the direct line from the agent to the target
        returns True if the agent can see the target, False otherwise
        '''
        for obstacle in self.maze.obstacles:
            #parametrize the line from the agent to the target
            def ray(t):
                x = self.posvec[-1][0] + t * (self.maze.target.pos_x - self.posvec[-1][0])
                y = self.posvec[-1][1] + t * (self.maze.target.pos_y - self.posvec[-1][1])
                return obstacle.func(x, y)
            for t_ray in np.arange(0, 1, res):
                if ray(t_ray) > 0:
                    return False
            if ray(1) > 0:
                return False
        return True
    def IsWalkPossible(self, angle, distance, res=0.01):
        '''
        returns (False, t) if the agent would cross a wall or an obstacle while walking the track,
        where t is the fraction of the track that can still be walked; otherwise returns (True, 1)
        '''
        #check against the maze borders
        def pos(t):
            x = self.posvec[-1][0] + t * distance * (math.sin(math.radians(angle)))
            y = self.posvec[-1][1] + t * distance * (math.cos(math.radians(angle)))
            return x, y
        for t_ray in np.arange(0, 1, res):
            if pos(t_ray)[0] < 0 or pos(t_ray)[0] > self.maze.length or pos(t_ray)[1] < 0 or pos(t_ray)[1] > self.maze.height:
                return False, t_ray
        if pos(1)[0] < 0 or pos(1)[0] > self.maze.length or pos(1)[1] < 0 or pos(1)[1] > self.maze.height:
            return False, 1
        #check against the obstacles
        for obstacle in self.maze.obstacles:
            #parametrize the line along the planned move
            def ray(t):
                x = self.posvec[-1][0] + t * distance * (math.sin(math.radians(angle)))
                y = self.posvec[-1][1] + t * distance * (math.cos(math.radians(angle)))
                return obstacle.func(x, y)
            for t_ray in np.arange(0, 1, res):
                if ray(t_ray) > 0:
                    return False, t_ray
            if ray(1) > 0:
                return False, 1
        return True, 1
    def angleDistanceTarget(self):
        v1 = [0, 1]
        v2 = [self.maze.target.pos_x - self.posvec[-1][0], self.maze.target.pos_y - self.posvec[-1][1]]
        angle = math.acos(np.dot(v1, v2 / np.linalg.norm(v2)))
        if (self.maze.target.pos_x - self.posvec[-1][0] < 0):
            return 360 - np.rad2deg(angle), np.linalg.norm(v2)
        else:
            return np.rad2deg(angle), np.linalg.norm(v2)
    def updateMemory(self):
        if (not self.isEnd):
            self.look()
            newMemory = list(self.view)  #copy, so that appending the target information does not also modify self.view
            if (self.seeTarget()):
                angle, distance = self.angleDistanceTarget()
                newMemory.append(angle)
                newMemory.append(distance)
            else:
                newMemory.append(-1)
                newMemory.append(-1)
            self.memory.append(newMemory)
        if self.memoryCounter > 0:
            self.reward.append(self.rewardFunction())
        self.memoryCounter += 1
    def reset(self):
        self.posvec = [[random.random() * self.maze.length, random.random() * self.maze.height]]
        self.view = []  #distances to the next wall/obstacle along each ray
        self.points = []  #end points of each ray on the next wall/obstacle
        self.memory = []
        self.memoryCounter = 0
        self.lastMove = np.array([0, 0])
        self.isEnd = False
        self.reward = []
        self.updateMemory()

    def evaluate(self):
        return
    def rewardFunction(self):
        if (self.isEnd):
            print("crashed into an obstacle")
            return -1
        else:
            distance_target_squared = (self.posvec[-1][0] - self.maze.target.pos_x) ** 2 + (self.posvec[-1][1] - self.maze.target.pos_y) ** 2
            if distance_target_squared == 0:
                print("Agent reached target")
                self.isEnd = True
            return np.exp(-distance_target_squared)
    def train(self, num_weight_updates=50):
        #for every weight update: roll out one trajectory with the current Gaussian policy, record the gradient of the
        #log-likelihood of every sampled action with a GradientTape, then scale each of these gradients by the
        #discounted future reward of its step and pass them to the Adam optimizer
        print(self.model.summary())
        maze_diag = (self.maze.height ** 2 + self.maze.length ** 2) ** 0.5
        for i in range(num_weight_updates):
            print("weight update number:", i)
            #create a new full trajectory of the agent
            self.reset()
            gradient = []
            iteration = 0
            while (not self.isEnd and iteration < 20):
                with tf.GradientTape() as tape:
                    iteration += 1
                    print("iteration:", iteration)
                    insertMemory = np.expand_dims(self.memory[-1], axis=0)  #expand_dims copies, so the shape of self.memory itself is not changed
                    insertMemory = np.true_divide(insertMemory, maze_diag)  #so that all lengths are at most 1
                    insertMemory[0, -2] = insertMemory[0, -2] * maze_diag / 360  #rescale the angle entry so that it also lies in [0, 1]
                    probs = self.model(insertMemory)
                    meannewangle = probs[0, 0] * 360
                    meannewdistance = probs[0, 1] * maze_diag
                    stdnewangle = np.exp(probs[0, 2])
                    stdnewdistance = np.exp(probs[0, 3])
                    random_normal = tf.random.normal([1], 0, 1, tf.float32, seed=1).numpy()
                    run_angle = meannewangle + random_normal * stdnewangle
                    run_distance = meannewdistance + random_normal * stdnewdistance
                    log_likelihood = -0.5 * ((run_angle - meannewangle) ** 2 / stdnewangle ** 2 + 2 * math.log(stdnewangle) + (run_distance - meannewdistance) ** 2 / stdnewdistance ** 2 + 2 * math.log(stdnewdistance) + 2 * math.log(2 * math.pi))
                gradient.append(tape.gradient(log_likelihood, self.model.trainable_variables))
                run_distance = np.exp(run_distance)  #so that the distance is positive
                self.run(run_angle, run_distance)
            self.draw()
            #calculate the discounted future reward of every step:
            discountedReward = np.zeros_like(self.reward)
            for t in range(len(self.reward)):
                discount = 1
                tmp = 0
                for k in range(t, len(self.reward)):
                    tmp += self.reward[k] * discount
                    discount *= self.gamma
                discountedReward[t] = tmp
            #update the weights
            for i in range(iteration):
                grad = [old_grad * discountedReward[i] for old_grad in gradient[i]]  #multiplication with the learning rate is done by the Adam optimizer
                self.model.optimizer.apply_gradients(zip(grad, self.model.trainable_variables))
    def play(self):
        maze_diag = (self.maze.height ** 2 + self.maze.length ** 2) ** 0.5
        self.reset()
        iteration = 0
        while (not self.isEnd and iteration < 10):
            iteration += 1
            print("iteration:", iteration)
            insertMemory = np.expand_dims(self.memory[-1], axis=0)  #expand_dims copies, so the shape of self.memory itself is not changed
            insertMemory = np.true_divide(insertMemory, maze_diag)  #so that all lengths are at most 1
            insertMemory[0, -2] = insertMemory[0, -2] * maze_diag / 360  #rescale the angle entry so that it also lies in [0, 1]
            probs = self.model(insertMemory)
            meannewangle = probs[0, 0]
            meannewdistance = probs[0, 1]
            stdnewangle = np.exp(probs[0, 2])
            stdnewdistance = np.exp(probs[0, 3])
            random_normal = tf.random.normal([1], 0, 1, tf.float32, seed=1).numpy()
            run_angle = meannewangle + random_normal * stdnewangle
            run_distance = meannewdistance + random_normal * stdnewdistance
            run_distance = np.exp(run_distance)  #so that the distance is positive
            print("angle:", run_angle, "distance:", run_distance)
            self.run(run_angle, run_distance)
        self.draw()
I know this is really long, but I thought maybe the mistake is somewhere else, so I posted everything. If you want to test the code, it is enough to test it without obstacles; even in this easy scenario it is not working, even if I train for 500 iterations :(
Just run the following code:
listOfObstacles = []
myTarget=Target(1,2)
myMaze=Maze(5,3,myTarget,listOfObstacles)
myAgent=Agent(myMaze, 4,1, 10)
myAgent.train()
myAgent.play()
500 iterations is really, really low for a reinforcement learning problem, especially when neural networks are involved. Try increasing it by a lot (at least 100,000 iterations), and also look at the reward at the end of each episode to see whether it is increasing at all.
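For example, a minimal sketch of such a check (this assumes you append the undiscounted episode return, sum(self.reward), to a hypothetical list episode_returns at the end of every trajectory in train()):

import numpy as np
import matplotlib.pyplot as plt

window = 100  #moving-average window over episodes
if len(episode_returns) >= window:
    running_mean = np.convolve(episode_returns, np.ones(window) / window, mode='valid')
    plt.plot(running_mean)
    plt.xlabel('episode')
    plt.ylabel('mean return over the last 100 episodes')
    plt.show()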
Then, if after that many iterations you still see zero progress, check whether you are implementing the algorithm correctly, or whether there are hidden bugs in the environment.
To rule out the first possibility, try an RL library such as Stable Baselines (see the sketch below), so you can be sure there are no bugs in your algorithm implementation. Then, if it is still not working, look for bugs in your code, or rethink the way you are defining this RL problem.
Look into similar problems and check how they defined the MDP, especially the reward function.
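If you go the Stable Baselines route, a wrapper around your existing classes could look roughly like this (a hypothetical sketch, assuming gymnasium and stable-baselines3 are installed and that Maze, Target and Agent are the classes from the question; the action scaling and the missing observation normalization would still need tuning):

import numpy as np
import gymnasium as gym
from gymnasium import spaces
from stable_baselines3 import PPO

class MazeEnv(gym.Env):
    #thin wrapper that lets an off-the-shelf algorithm drive the existing Agent
    def __init__(self, agent):
        super().__init__()
        self.agent = agent
        self.observation_space = spaces.Box(low=-np.inf, high=np.inf, shape=(agent.rays + 2,), dtype=np.float32)
        self.action_space = spaces.Box(low=-1.0, high=1.0, shape=(2,), dtype=np.float32)

    def reset(self, seed=None, options=None):
        super().reset(seed=seed)
        self.agent.reset()
        return np.asarray(self.agent.memory[-1], dtype=np.float32), {}

    def step(self, action):
        angle = float(action[0] + 1.0) * 180.0   #map [-1, 1] to [0, 360]
        distance = float(action[1] + 1.0) * 0.5  #map [-1, 1] to [0, 1]
        self.agent.run(angle, distance)
        obs = np.asarray(self.agent.memory[-1], dtype=np.float32)
        reward = float(self.agent.reward[-1])
        return obs, reward, bool(self.agent.isEnd), False, {}

env = gym.wrappers.TimeLimit(MazeEnv(Agent(Maze(5, 3, Target(1, 2)), 4, 1, 10)), max_episode_steps=50)
model = PPO("MlpPolicy", env, verbose=1)
model.learn(total_timesteps=100_000)

If PPO learns to reach the target in this setup, the problem definition is probably fine and the bug is in the hand-written policy gradient; if not, the reward function or the observations are the more likely suspects.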
Also, delete that dropout layer: in supervised learning settings, this indeed often helps to reduce overfitting (by introducing some variance), but in RL, additional variance is not really what we're looking for.
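For example, keeping everything else the same, the network from the question without the Dropout layer would just be:

model = keras.Sequential([
    layers.Dense(64, batch_input_shape=(1, rays + 2), activation="relu"),
    layers.Dense(64, activation="relu"),
    layers.Dense(4),  #[mean angle, mean distance, log std angle, log std distance]
])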