I am trying to implement a reward system that a SARSA model can use to make better decisions for relieving traffic in all lanes of an intersection. This is what my reward function looks like:
def calculate_reward(self, old_dti, new_dti):
    # Weights for the three terms of the reward
    alpha = 0.5   # reduction in total congestion (DTI)
    beta = 0.3    # average vehicle count above the threshold
    gamma = 0.2   # cost of switching to a different action

    reduction_in_total_congestion = sum(old_dti.values()) - sum(new_dti.values())
    excess_vehicles = [max(0, count - self.vehicle_threshold) for count in self.vehicle_parameters["vehicle_count"].values()]
    avg_congestion_above_threshold = sum(excess_vehicles) / 4  # 4 lanes
    action_cost = 1 if self.action_changed else 0

    reward = alpha * reduction_in_total_congestion - beta * avg_congestion_above_threshold - gamma * action_cost
    return reward
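For context, the function is meant to be called once per decision step, roughly like this (a simplified sketch; controller, measure_dti, choose_action, and apply_action are placeholder names, not my actual code):

old_dti = controller.measure_dti()          # DTI per lane before acting
action = controller.choose_action(state)    # SARSA picks the next phase
controller.apply_action(action)             # simulation advances one step
new_dti = controller.measure_dti()          # DTI per lane after acting

reward = controller.calculate_reward(old_dti, new_dti)
# the reward is then used in the SARSA update for (state, action)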
dti (Delay Time Indicator)
: It is the sum of the waiting time of all vehicles in a lane
Example: old_dti = {"north": 4334, "south": 83, "east": 2332, "west": 432}
vehicle_threshold
: It is the maximum number of vehicles a lane can have. I have set it to 12
self.vehicle_parameters["vehicle_count"]
: It is the number of vehicles in each lane that are waiting at the red light.
Example: {"north": 12, "south": 0, "east": 2, "west": 2}
action_cost
: The cost is 1 if the SARSA model's decision differs from its previous decision, and 0 if the same decision is made
I have added weights to the above parameters to signify their importance. DTI has the highest weight because one lane can have 10 vehicles with a low DTI while another lane has only 5 vehicles with a high DTI; in that case, DTI should take priority over the vehicle_count.
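To make the weighting concrete, here is a small worked example with the numbers above (the new_dti values are made up for illustration):

old_dti = {"north": 4334, "south": 83, "east": 2332, "west": 432}   # sums to 7181
new_dti = {"north": 4000, "south": 83, "east": 2000, "west": 400}   # hypothetical, sums to 6483
vehicle_count = {"north": 12, "south": 0, "east": 2, "west": 2}
vehicle_threshold = 12

reduction = sum(old_dti.values()) - sum(new_dti.values())                  # 7181 - 6483 = 698
excess = [max(0, c - vehicle_threshold) for c in vehicle_count.values()]   # [0, 0, 0, 0]
avg_excess = sum(excess) / 4                                               # 0.0
action_cost = 1                                                            # assume the action changed

reward = 0.5 * reduction - 0.3 * avg_excess - 0.2 * action_cost            # 0.5*698 - 0 - 0.2 = 348.8

Note that with these example values the congestion-above-threshold term is zero while the DTI term is in the hundreds, so the three terms end up on very different scales.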
My earlier reward calculation function:
@staticmethod
def calculate_reward(old_dti, new_dti, vehicle_count):
    max_reward = 10
    max_penalty = -10
    delay_before = sum(old_dti.values())
    delay_after = sum(new_dti.values())

    if delay_before == 0:
        if delay_after > 0:
            # Introducing delay where there was none should be penalized
            return max_penalty
        else:
            # Maintaining no congestion could be a neutral or slightly positive outcome
            return 1  # or some small positive value
    else:
        improvement = delay_before - delay_after
        if improvement > 0:
            # Scale the reward based on the percentage improvement
            reward = (improvement / delay_before) * max_reward
        elif improvement < 0:
            # Scale the penalty based on the percentage worsening
            penalty_ratio = abs(improvement) / delay_before
            reward = penalty_ratio * max_penalty
        else:
            # No change in delay
            reward = 0
    return reward
In this implementation, I calculated the reward only on the basis of the DTI. However, after 20 generations the reward did not change significantly and the model did not learn properly.
Is my new way of calculating the reward better for relieving congestion in each lane? Also, on what basis should SARSA make its next decision? As of now, SARSA is making a decision every 0.5 seconds. I am thinking about using the vehicle_threshold: if a lane's vehicle count has crossed the preset threshold, SARSA should then make a decision.
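Something like this is what I have in mind for the threshold trigger (a rough sketch; should_make_decision is a placeholder name):

def should_make_decision(self):
    # Only let SARSA act when at least one lane has more waiting
    # vehicles than the preset threshold
    counts = self.vehicle_parameters["vehicle_count"]
    return any(count > self.vehicle_threshold for count in counts.values())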