Python policy gradient reinforcement learning with continuous action space is not working


I am trying to train an agent to navigate to a target in my custom environment. The agent learns with a neural net (two hidden Dense layers, one dropout layer and one output layer of dimension 4). As input the agent uses a sensor which measures the distance to the surrounding obstacles, plus the angle and distance to the target if it is in view (I normalized all inputs to the range [0, 1]). As output we get the mean and std of the next action (the angle and distance the robot will move). The action taken is then sampled from a normal distribution with these parameters (a Gaussian policy). I used a Gaussian policy because it was the only way I saw to define a log-likelihood, which I then want to use during training.

During training the agent walks a trajectory until it hits an obstacle or a wall, and afterwards calculates the discounted future reward for each step. I now want to use policy gradient ascent to train the agent towards greater reward. You can find the idea also here: https://spinningup.openai.com/en/latest/spinningup/rl_intro.html

I will post my Python code below, but somehow it is not working. Maybe I missed something. I am really desperate and would be thankful for any hint. (The crucial part is the method train() in class Agent.)
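For reference, here is a standalone sketch of the quantity I am trying to optimize: the log-likelihood of a 2-D diagonal Gaussian, weighted by the discounted return, as in the Spinning Up derivation. The names action, mean, log_std and ret are just placeholders, not taken from my class:

import math
import tensorflow as tf

# Minimal sketch of the diagonal-Gaussian log-likelihood and the REINFORCE-style
# surrogate objective; action, mean, log_std and ret are placeholder tensors.
def gaussian_log_likelihood(action, mean, log_std):
    # log N(action | mean, exp(log_std)^2), summed over the two action dimensions
    std = tf.exp(log_std)
    return tf.reduce_sum(
        -0.5 * ((action - mean) / std) ** 2 - log_std - 0.5 * math.log(2.0 * math.pi),
        axis=-1,
    )

def reinforce_objective(log_likelihood, ret):
    # surrogate objective whose gradient is the REINFORCE policy gradient;
    # since Keras optimizers minimize, one would descend on the negative of this
    return tf.reduce_mean(log_likelihood * ret)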

import math
import random
import numpy as np
import matplotlib.pyplot as plt
from numpy import arange, meshgrid
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers

class Maze:

def __init__(self, length, height, target, obstacles=[]):
    self.length =length
    self.height = height
    self.obstacles=obstacles
    self.target=target
def drawMaze(self, res=0.01):
   #plot maze outline:
   x = [0,0,self.length,self.length,0]
   y = [0,self.height, self.height,0,0]
   plt.plot(x, y) 
   #plotting maze targets
   plt.plot(self.target.pos_x, self.target.pos_y,'ro', label='target')
   #plotting maze obstacles with resolution res
   x_mesh = arange(0,self.length,res)
   y_mesh = arange(0,self.height,res)
   X_mesh,Y_mesh = meshgrid(x_mesh, y_mesh) # grid of point
   Z=np.zeros(X_mesh.size).reshape(y_mesh.size,x_mesh.size)
   for obstacle in self.obstacles:
      Z = np.maximum(Z, obstacle.func(X_mesh, Y_mesh)) # evaluation of the function on the grid
   plt.imshow(np.heaviside(np.array([z.astype(float) for z in Z]),0),extent=[0,self.length,0,self.height], cmap='Pastel1', origin='lower') #dmap='Greys'
   plt.title('MyMaze')  
   plt.legend(loc='upper center', bbox_to_anchor=(0.5, -0.1),fancybox=True, shadow=True, ncol=5) #legend below the plot
   
   

class Obstacle:

def __init__(self, func):
    self.func=func  #func(point)>0 if point lies in obstacle and <=0 if not; the function should be continuous so that the algorithm can find the minimum of the function
def isPointInObstacle(self, point):
    if self.func(point)>=0:
        return 1
    else:
        return 0 

class Target:

def __init__(self, pos_x, pos_y):
    self.pos_x=pos_x
    self.pos_y=pos_y 
    

class Agent:

def __init__(self, maze, pos_x=1, pos_y=1, rays=360, alpha=0.001):
    self.posvec=[[pos_x, pos_y]] #np.array([[pos_x, pos_y]])
    self.rays=rays
    self.view=[]  #distance vector to next wall/obstacle
    self.points=[] #point vector to next wall/obstacle
    self.maze=maze
    self.memory = [] #np.asarray([ [0]*(rays+4) for i in range(10)],dtype=np.float64)
    self.memoryCounter=0        
    self.lastMove=np.array([0,0])
    self.model = keras.Sequential()
    self.model.add(layers.Dense(64,batch_input_shape=(1,rays+2) ,activation="relu"))
    self.model.add(layers.Dense(64,batch_input_shape=(1,rays+2) ,activation="relu"))
    self.model.add(layers.Dropout(.2))
    self.model.add(layers.Dense(4))
    self.reward=[]
    self.isEnd=False
    self.gamma=0.3 #cumulative reward factor; gamma<=0.5 so that it cannot be efficient to run infinitely long and gather rewards
    self.learningRate=alpha
    self.model.compile(optimizer = tf.keras.optimizers.Adam(learning_rate=self.learningRate))    
    self.actionMemory = []
    self.updateMemory()
    
def look(self, res=0.01):  #divides 360 degrees into self.rays directions and returns the endpoints of each ray (where it hits the maze border or an obstacle)
    rays=self.rays
    angle=arange(0,360,360/rays)
    points=[]
    distance=[]
        #intersection with maze borders
    for alpha in angle:
        if (alpha==0):
            t_min_maze=(self.maze.height-self.posvec[-1][1])/math.cos(math.radians(alpha))
        if (alpha==180):
            t_min_maze=(0-self.posvec[-1][1])/math.cos(math.radians(alpha))
        if (alpha==90):
            t_min_maze=(self.maze.length-self.posvec[-1][0])/math.sin(math.radians(alpha))
        if (alpha==270):
            t_min_maze=(0-self.posvec[-1][0])/math.sin(math.radians(alpha))
        if (alpha>0 and alpha <90):
            t_min_maze=min((self.maze.length-self.posvec[-1][0])/math.sin(math.radians(alpha)),(self.maze.height-self.posvec[-1][1])/math.cos(math.radians(alpha)))
        if (alpha>90 and alpha <180):
            t_min_maze=min((self.maze.length-self.posvec[-1][0])/math.sin(math.radians(alpha)), (0-self.posvec[-1][1])/math.cos(math.radians(alpha)))
        if (alpha>180 and alpha <270):
            t_min_maze=min((0-self.posvec[-1][0])/math.sin(math.radians(alpha)),(0-self.posvec[-1][1])/math.cos(math.radians(alpha)))
        if alpha>270:
            t_min_maze=min((0-self.posvec[-1][0])/math.sin(math.radians(alpha)),(self.maze.height-self.posvec[-1][1])/math.cos(math.radians(alpha)))
            '''
            t_min_right=(self.maze.length-self.posvec[-1,0])/sin(alpha)
            t_min_left=(0-self.posvec[-1,0])/sin(alpha)
            t_min_up=(self.maze.height-self.posvec[-1,1])/cos(alpha)
            t_min_down=(0-self.posvec[-1,1])/cos(alpha)
            '''  
        #intersection with obstacles
        t_min_obstacle=t_min_maze
        for obstacle in self.maze.obstacles:
            def ray(t):
                x=self.posvec[-1][0]+t*math.sin(math.radians(alpha))
                y=self.posvec[-1][1]+t*math.cos(math.radians(alpha))
                return obstacle.func(x,y) 
            for t_ray in np.arange(0,t_min_obstacle+1,res):
                if (ray(t_ray)>0 and t_ray<t_min_maze and t_ray>0):
                    t_min_obstacle=t_ray
                    break
        t_min=t_min_obstacle
        #calculate end points of beam:
        distance.append(t_min)
        points.append([self.posvec[-1][0]+t_min*math.sin(math.radians(alpha)),self.posvec[-1][1]+t_min*math.cos(math.radians(alpha))])
        self.view=distance
        self.points=points
        
    return points
 
def run(self, angle, distance):
    newaction=[angle, distance]
    self.actionMemory.append(newaction)
    '''
    checks whether the path is free of obstacles; if so, walks the full path, otherwise only walks until it hits the obstacle
    updates self.memory
    updates self.isEnd
    updates self.posvec
    '''
    if(self.isEnd):
        print("Agent has already crashed with an Obstacle, please reset Agent with command Agentname.reset()")
        return
    else:
        result=self.IsWalkPossible(angle, distance)
        isRunPossible=result[0]
        t_ray=result[1]
        if(not isRunPossible):
            print("Agent crashed with an Obstacle, please reset Agent with command Agentname.reset()")
        self.isEnd=not isRunPossible
        self.posvec.append([(self.posvec[-1][0] + distance*t_ray*math.sin(math.radians(angle)))[0], (self.posvec[-1][1] + distance*t_ray*math.cos(math.radians(angle)))[0]])
        self.lastMove=[angle, t_ray*distance]
        self.updateMemory()
        if(not isRunPossible):
            self.draw(0.01, False, True)
        return

def draw(self, res=0.01, plotView=True, drawPath=True):
    self.maze.drawMaze(res)
    #plot green lines
    if (self.points and plotView):
       for point in self.points:
           plt.plot([self.posvec[-1][0],point[0]],[self.posvec[-1][1],point[1]],'g')
    #plot agent:
    plt.plot(self.posvec[-1][0],self.posvec[-1][1],'bx', label='agent')
    plt.legend(loc='upper center', bbox_to_anchor=(0.5, -0.1),fancybox=True, shadow=True, ncol=5) 
    if (drawPath):
        self.drawPath()
        
def drawPath(self):    
    lastpoint=[self.posvec[0][0],self.posvec[0][1]]
    for point in self.posvec:
        plt.plot([lastpoint[0],point[0]],[lastpoint[1],point[1]],'b')
        lastpoint=point
    
def seeTarget(self, res=0.01):
    '''
    check whether there is an obstacle in the direct line from agent to target
    if agent can see target return True
    Else: return False
    '''
    for obstacle in self.maze.obstacles:
    #parametrize line from agent to target
        def ray(t):
            x=self.posvec[-1][0]+t*(self.maze.target.pos_x-self.posvec[-1][0])
            y=self.posvec[-1][1]+t*(self.maze.target.pos_y-self.posvec[-1][1])
            return obstacle.func(x,y)
        for t_ray in np.arange(0,1,res):
            if ray(t_ray)>0:
                return False
        if ray(1)>0:
            return False
    return True
def IsWalkPossible(self, angle, distance, res=0.01):
    '''
    if the agent would cross a wall while walking the track, return (False, t)
    where t marks how far along the track the crossing happens
    '''
    def pos(t):
        x=self.posvec[-1][0]+t*distance*(math.sin(math.radians(angle)))
        y=self.posvec[-1][1]+t*distance*(math.cos(math.radians(angle)))
        return x,y

    for t_ray in np.arange(0,1,res):
        if pos(t_ray)[0]<0 or pos(t_ray)[0]>self.maze.length or pos(t_ray)[1]<0 or pos(t_ray)[1]>self.maze.height:
            return False, t_ray
    if pos(1)[0]<0 or pos(1)[0]>self.maze.length or pos(1)[1]<0 or pos(1)[1]>self.maze.height:
        return False, 1
    '''
    if the agent would cross an obstacle while walking the track, return (False, t)
    Else: return (True, 1)
    '''
    for obstacle in self.maze.obstacles:
    #parametrize line from agent to target
        def ray(t):
            x=self.posvec[-1][0]+t*distance*(math.sin(math.radians(angle)))
            y=self.posvec[-1][1]+t*distance*(math.cos(math.radians(angle)))
            return obstacle.func(x,y)
        for t_ray in np.arange(0,1,res):
            if ray(t_ray)>0:
                return False, t_ray
        if ray(1)>0:
            return False, 1
    return True, 1

def angleDistanceTarget(self):
    v1=[0,1]
    v2=[self.maze.target.pos_x-self.posvec[-1][0],self.maze.target.pos_y-self.posvec[-1][1]]
    angle=math.acos(np.dot(v1,v2/np.linalg.norm(v2)))
    if (self.maze.target.pos_x-self.posvec[-1][0]<0):
        return 360-np.rad2deg(angle),np.linalg.norm(v2)
    else:
        return np.rad2deg(angle), np.linalg.norm(v2)
    
def updateMemory(self):
    if (not self.isEnd):
        self.look()
        newMemory=self.view
        if (self.seeTarget()):
            angle, distance=self.angleDistanceTarget()
            newMemory.append(angle)
            newMemory.append(distance)
        else: 
            newMemory.append(-1)
            newMemory.append(-1)
        self.memory.append(newMemory)
    if self.memoryCounter>0:
        self.reward.append(self.rewardFunction())
    self.memoryCounter+=1
    
def reset(self):
    self.posvec=[[random.random()*self.maze.length, random.random()*self.maze.height]]
    self.view=[]  #distance vector to next wall/obstacle
    self.points=[] #point vector to next wall/obstacle
    self.memory = []
    self.memoryCounter=0        
    self.lastMove=np.array([0,0])
    self.isEnd=False
    self.reward=[]
    self.updateMemory()
    
def evaluate(self):
    return

def rewardFunction(self):
    if (self.isEnd):
        print("crashed Obstacle")
        return -1
    else:
        distance_target_squared=(self.posvec[-1][0]-self.maze.target.pos_x)**2+(self.posvec[-1][1]-self.maze.target.pos_y)**2
        if distance_target_squared==0:
            print("Agent reached target")
            self.isEnd=True
        return np.exp(-distance_target_squared)

def train(self, num_weight_updates=50):
    print(self.model.summary())
    maze_diag=(self.maze.height**2+self.maze.length**2)**0.5
    for i in range(num_weight_updates):
        print("weight Update Numer:", i)
        #create a new full trajectory of agent
        self.reset()
        gradient=[]
        iteration=0
        while(not self.isEnd and iteration<20):
            with tf.GradientTape() as tape:
                iteration +=1
                print("iteration:",iteration)
                insertMemory=np.expand_dims(self.memory[-1], axis=0)  #the shape of self.memory itself is not changed
                insertMemory=np.true_divide(insertMemory, maze_diag)  #so that lengths are at most 1
                insertMemory[0,-2]=insertMemory[0,-2]*maze_diag/360  #so that the angle is normalized to [0,1] (divide by 360 instead of maze_diag)
                probs=self.model(insertMemory)
                meannewangle=probs[0,0]*360
                meannewdistance=probs[0,1]*maze_diag
                stdnewangle=np.exp(probs[0,2])
                stdnewdistance=np.exp(probs[0,3])
                random_normal=tf.random.normal([1],0,1,tf.float32, seed=1).numpy()
                run_angle=meannewangle+random_normal*stdnewangle
                run_distance=meannewdistance+random_normal*stdnewdistance
                log_likelihood=-0.5*((run_angle-meannewangle)**2/stdnewangle**2+2*math.log(stdnewangle)+(run_distance-meannewdistance)**2/stdnewdistance**2+2*math.log(stdnewdistance)+2*math.log(2*math.pi))
                gradient.append(tape.gradient(log_likelihood,self.model.trainable_variables))
            run_distance=np.exp(run_distance) #such that it is positive
            self.run(run_angle, run_distance)
        self.draw()
        
        #calculate discounted reward:
        discountedReward = np.zeros_like(self.reward)
        for t in range(len(self.reward)):
            discount = 1
            tmp=0
            for k in range(t, len(self.reward)):
                tmp += self.reward[k] * discount
                discount *= self.gamma
            discountedReward[t] = tmp
            
        #update weights
        for i in range (iteration):
            grad=[old_grad*discountedReward[i] for old_grad in gradient[i]] #multiplication with learning rate is done by Adam optimizer
            self.model.optimizer.apply_gradients(zip(grad, self.model.trainable_variables))              
  
def play(self):
    maze_diag=(self.maze.height**2+self.maze.length**2)**0.5
    self.reset()
    iteration=0
    while(not self.isEnd and iteration<10):
        iteration +=1
        print("iteration:",iteration)
        insertMemory=np.expand_dims(self.memory[-1], axis=0)  #the shape of self.memory itself is not changed
        insertMemory=np.true_divide(insertMemory, maze_diag)  #so that lengths are at most 1
        insertMemory[0,-2]=insertMemory[0,-2]*maze_diag/360  #so that the angle is normalized to [0,1] (divide by 360 instead of maze_diag)
        probs=self.model(insertMemory)
        meannewangle=probs[0,0]
        meannewdistance=probs[0,1]
        stdnewangle=np.exp(probs[0,2])
        stdnewdistance=np.exp(probs[0,3])
        random_normal=tf.random.normal([1],0,1,tf.float32, seed=1).numpy()
        run_angle=meannewangle+random_normal*stdnewangle
        run_distance=meannewdistance+random_normal*stdnewdistance
        run_distance=np.exp(run_distance) #such that it is positive
        print("angle:",run_angle,"distance:",run_distance)
        self.run(run_angle, run_distance)
    self.draw()

I know this is really long, but I thought maybe there is a mistake somewhere else. If you want to test the code, it is enough to test it without obstacles, because even in this easy scenario it is not working, even if I train for 500 iterations :(

Just run the following code:

listOfObstacles = [] 
myTarget=Target(1,2)
myMaze=Maze(5,3,myTarget,listOfObstacles)
myAgent=Agent(myMaze, 4,1, 10)
myAgent.train()
myAgent.play()

There is 1 answer

Answer by Raoul Raftopoulos:

500 iterations is really, really low for a reinforcement learning problem, especially when neural networks are involved. Try increasing it by a lot (at least 100,000 iterations), and also look at the reward at the end of each episode to see whether it is increasing or not.
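For example, something along these lines (a minimal monitoring sketch; episode_returns and the helper functions are hypothetical, not part of your code):

import matplotlib.pyplot as plt

# Hypothetical monitoring sketch: store the undiscounted return of every
# episode and plot it, so a learning trend (or its absence) becomes visible.
episode_returns = []

def log_episode(rewards):
    # call once per finished episode with the list of per-step rewards
    episode_returns.append(sum(rewards))

def plot_returns():
    plt.plot(episode_returns)
    plt.xlabel("episode")
    plt.ylabel("undiscounted return")
    plt.show()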

Then, if after that many iterations you still see zero progress, you should check whether you are implementing the algorithm in the right way, or whether there are any hidden bugs in the environment.

To rule out the first problem, you should try an RL library such as Stable Baselines, so you can be sure there are no bugs in your algorithm implementation. Then, if it is still not working, look for bugs in your environment code, or rethink the way you are defining this RL problem.
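A rough sketch of what that could look like (assuming Stable Baselines3 and Gymnasium; MazeEnv is a hypothetical wrapper that you would still have to fill in with your Maze/Agent logic, and the spaces below are assumptions based on your description):

import numpy as np
import gymnasium as gym
from gymnasium import spaces
from stable_baselines3 import PPO

class MazeEnv(gym.Env):
    # Hypothetical wrapper around the custom maze; the real reset/step logic
    # would call the existing Maze/Agent code instead of the placeholders below.
    def __init__(self, n_rays=10):
        super().__init__()
        # n_rays distance readings plus angle and distance to the target
        self.observation_space = spaces.Box(-1.0, 1.0, shape=(n_rays + 2,), dtype=np.float32)
        # normalized (angle, step length) action
        self.action_space = spaces.Box(-1.0, 1.0, shape=(2,), dtype=np.float32)

    def reset(self, seed=None, options=None):
        super().reset(seed=seed)
        obs = np.zeros(self.observation_space.shape, dtype=np.float32)  # placeholder observation
        return obs, {}

    def step(self, action):
        # placeholder dynamics: the real version would move the agent and
        # compute the reward from the distance to the target
        obs = np.zeros(self.observation_space.shape, dtype=np.float32)
        reward, terminated, truncated = 0.0, False, False
        return obs, reward, terminated, truncated, {}

env = MazeEnv()
model = PPO("MlpPolicy", env, verbose=1)
model.learn(total_timesteps=100_000)  # far more environment steps than 500 weight updates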

Look into similar problems and check how they defined the MDP, especially the reward function.
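For instance, a common choice in navigation tasks is a dense reward based on the change in distance to the target, plus a terminal bonus or penalty. A hypothetical sketch (the constants are arbitrary assumptions, not taken from your code):

import numpy as np

# Hypothetical shaped reward: rewards progress towards the target instead of
# the very sparse exp(-distance^2) signal; the constants are arbitrary.
def shaped_reward(prev_pos, new_pos, target, crashed, goal_radius=0.1):
    d_prev = np.linalg.norm(np.subtract(prev_pos, target))
    d_new = np.linalg.norm(np.subtract(new_pos, target))
    if crashed:
        return -1.0
    if d_new < goal_radius:
        return 1.0
    return d_prev - d_new  # positive when the step moved the agent closer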

Also, delete that dropout layer: in supervised learning settings, this indeed often helps to reduce overfitting (by introducing some variance), but in RL, additional variance is not really what we're looking for.