I’m currently working on a project where I’d like to do some reinforcement learning on a simulation.
To start, I built the simulation in SimPy. Side note: I’m currently not focused on the real-world correctness of the simulation, but more on the RL side, so some processes do not make sense at all.
# pip install stable-baselines3[extra]
# Python version 3.9.18
import os
import random
import statistics

import gym
from gym import Env
from gym.spaces import Discrete, Box, Dict, Tuple, MultiBinary, MultiDiscrete
import numpy as np
import matplotlib.pyplot as plt
import simpy
from stable_baselines3 import PPO
from stable_baselines3.common.vec_env import DummyVecEnv, SubprocVecEnv
from stable_baselines3.common.evaluation import evaluate_policy
# Global lists used for trending and statistics (shared by the simulation)
battery_trend = []
wait_times = []
time_stamp = []
# Create a class that mimics a charging station
class Charging_station():
    # Init method for the charging station class
    def __init__(self, env, num_charging_stations):
        self.env = env
        self.charge_station = simpy.Resource(env, num_charging_stations)
        self.battery_amount = 150000
        self.amount = 2
        self.battery_drained = False
        self.solar_power = 0
        self.done = False
        self.debug = 0
    # This method controls the solar panel
    def solar_panel(self):
        while True:
            print("Charging")
            yield self.env.timeout(0.016)
            # Add the solar power to the battery
            self.battery_amount += self.solar_power
            self.debug += 1
    # This method records the battery level over time
    def trend_battery(self):
        time = 0
        while True:
            # Trend the battery status every minute
            yield self.env.timeout(1)
            battery_trend.append(self.battery_amount)
            time_stamp.append(time)
            time += 1
    # This method simulates the charging of a vehicle
    def charge_vechicle(self, vehicle):
        # Charge at the full rate while the battery has capacity left
        if self.battery_amount > 1:
            self.amount = 2
        else:
            if self.battery_drained == False:
                print("Battery Drained")
                self.battery_drained = True
            self.amount = 1
        self.battery_amount -= 1
        yield self.env.timeout(0.016)
class Truck():
    # Init the truck class
    def __init__(self, charge_left, number, charge_station):
        self.charge = charge_left
        self.number = number
        self.charging_station = charge_station

    def charge_truck(self, env, truck):
        # Get the starting time
        begin_time = env.now
        # Charge the truck
        with self.charging_station.charge_station.request() as request:
            # Wait for a free charging station
            yield request
            # Start the charging
            while self.charge < 7200:
                yield env.process(self.charging_station.charge_vechicle(truck))
                self.charge += self.charging_station.amount
        # Determine the total time a truck driver needs for a charge
        wait_times.append(env.now - begin_time)
# This function runs the charging station
def run_station(env, num_chargers, charge_station):
    truck = []
    number = 1
    # Put one truck in the waiting line
    truck.append(Truck(50, 1, charge_station))
    # Add the solar panel to the environment
    env.process(charge_station.solar_panel())
    env.process(truck[0].charge_truck(env, truck))
    env.process(charge_station.trend_battery())
    # Add a new truck to the queue every 5 minutes
    while True:
        yield env.timeout(5)
        number += 1
        truck.append(Truck(50, number, charge_station))
        env.process(truck[number - 1].charge_truck(env, truck))
def get_average_wait_time(wait_times):
    average_wait = statistics.mean(wait_times)
    # Pretty print the results: whole minutes plus the fractional remainder
    minutes, frac_minutes = divmod(average_wait, 1)
    seconds = frac_minutes * 60
    return round(minutes), round(seconds)
# The main method for the system
def truck_sim(env, charge_station):
    # Set up the main system
    random.seed(30)
    # Run the simulation
    print("Here")
    env.process(run_station(env, 1, charge_station))
    # Run the simulation for one day (1440 minutes)
    begin_time = env.now
    env.run(until=1440)
    print(env.now - begin_time, "Wait_Time")
    # View the results
    # mins, secs = get_average_wait_time(wait_times)
    print(f"\nThe battery amount is {charge_station.battery_amount}")
    charge_station.done = True
In the simulation I have a couple of subprocesses that simulate an electric vehicle charging station. One of those subprocesses is the solar panel method: based on the solar strength, the battery is charged up by a certain amount. I have this part working, and when I change the solar strength I can see a change in the trend (all good here).
I added a debug print statement to the solar panel method that notifies me when the method is executed. When I run the simulation by itself, it prints multiple "Charging" strings to the console.
I now want to use reinforcement learning to predict what the solar strength should be to end the day with an empty battery (this is the part that doesn’t make sense in the real world).
# Create a truck environment that the model is going to perform in
class TruckEnv(Env):
    def __init__(self):
        self.action_space = Box(low=np.array([0]), high=np.array([10]))
        self.observation_space = Box(low=np.array([0]), high=np.array([25000]))
        self.state = 0
        self.done = False
        self.running = False
        self.env_sim = simpy.Environment()
        self.charge_station = Charging_station(self.env_sim, 1)
    def step(self, action):
        if self.running == False:
            # Run the simulation
            self.env_sim = simpy.Environment()
            self.charge_station.battery_amount = 160000
            self.charge_station.solar_power = action[0]
            self.running = True
            print("new_env")
            truck_sim(self.env_sim, self.charge_station)
        reward = 0
        # Check if the battery is near zero
        if self.charge_station.done:
            if self.charge_station.battery_amount < 1000 and self.charge_station.battery_amount >= 0:
                reward = 1
            else:
                reward = -1
            done = True
        else:
            done = False
            print("not Done")
        info = {}
        return self.charge_station.battery_amount, reward, done, info
    def render(self):
        pass

    def reset(self):
        # Reset the charging station
        self.charge_station.battery_amount = 160000
        self.charge_station.solar_power = 1.1  # reset to a default value
        self.charge_station.done = False
        self.done = False
        self.running = False
        return self.state
    # The main method for the system, as a method of TruckEnv
    def truck_sim(self):
        # Set up the main system
        random.seed(30)
        # Run the simulation
        print("Here")
        self.env_sim.process(run_station(self.env_sim, 1, self.charge_station))
        # Run the simulation for one day (1440 minutes)
        begin_time = self.env_sim.now
        self.env_sim.run(until=1440)
        print(self.env_sim.now - begin_time, "Wait_Time")
        # View the results
        mins, secs = get_average_wait_time(wait_times)
        print(f"\nThe battery amount is {self.charge_station.battery_amount}")
        self.charge_station.done = True
env = TruckEnv()
log_path = os.path.join('.', 'logs')
model = PPO('MlpPolicy', env, verbose=1, tensorboard_log=log_path)

episodes = 5
for episode in range(1, episodes + 1):
    state = env.reset()
    done = False
    score = 0
    while not done:
        env.render()
        action = env.action_space.sample()
        n_state, reward, done, info = env.step(action)
        score += reward
    print('Episode:{} Score:{}'.format(episode, score))
env.close()
# Plot the battery trend over the day
y = np.array(battery_trend)
x = np.array(time_stamp)
slope, intercept = np.polyfit(x, y, 1)
plt.plot(x, y)
plt.show()
When I implement it in the RL environment and run it, the charging debug statement is only executed once, and that is my problem.
I have tried to alter def step(self, action) so that it only returns something when the simulation is done, but that results in a crash.
To summarise: I have no clue why the simulation is only executed once when it runs inside the RL environment.
I hope some of you can give me a push in the right direction.
Note that this is a completely new world for me, so maybe I did something stupid.
Thank you in advance,
Alex
Your problem is likely related to the fact that the SimPy environment has global state that isn't being properly reset between episodes and steps in the reinforcement learning (RL) environment. When you run the simulation outside of the RL environment, it's getting a fresh start every time. But within the RL environment, global states are being maintained, causing unexpected behaviors.
Here are a few suggestions on how to fix this:
1. Isolation of SimPy state: Ensure that any global state maintained by the SimPy simulation (like battery_trend, wait_times, time_stamp, charge_station, etc.) is initialized and isolated within the RL environment. This way, the state is fresh for every episode.
2. Resetting SimPy state: In the reset method of TruckEnv, ensure that all relevant states and variables associated with the SimPy simulation are reinitialized. This guarantees that every episode starts from a clean slate.
3. Avoid global variables: Global variables can be problematic, especially when integrating with an RL environment. It's preferable to wrap the simulation in a class or some structure where all variables are encapsulated, avoiding unintended side effects.
To address these, here are modifications to your code:
Convert Charging_station and truck_sim to be members of TruckEnv:
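Here is a minimal sketch of what that could look like. It assumes the Charging_station class and run_station function from your code; the helper name _make_sim is illustrative, not something you already have. The key point is to rebuild the SimPy environment and the station together, so the station's simpy.Resource is always bound to the environment that actually runs:

class TruckEnv(Env):
    def __init__(self):
        self.action_space = Box(low=np.array([0]), high=np.array([10]))
        self.observation_space = Box(low=np.array([0]), high=np.array([25000]))
        self._make_sim()

    def _make_sim(self):
        # Rebuild the SimPy environment AND the station together, so the
        # station's processes and Resource belong to the env that will run.
        self.env_sim = simpy.Environment()
        self.charge_station = Charging_station(self.env_sim, 1)
        self.done = False
        self.running = False

    def truck_sim(self):
        # Method version of your truck_sim, operating only on self attributes
        random.seed(30)
        self.env_sim.process(run_station(self.env_sim, 1, self.charge_station))
        self.env_sim.run(until=1440)  # one simulated day in minutes
        self.charge_station.done = True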
Modify the step method to use self.charge_station and self.env_sim:
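A sketch of step along the same lines, keeping your reward logic and battery thresholds unchanged and assuming the _make_sim helper from above:

    def step(self, action):
        if not self.running:
            self._make_sim()  # fresh environment and station every episode
            self.charge_station.battery_amount = 160000
            self.charge_station.solar_power = action[0]
            self.running = True
            self.truck_sim()  # runs the whole simulated day synchronously
        reward = 0
        if self.charge_station.done:
            if 0 <= self.charge_station.battery_amount < 1000:
                reward = 1
            else:
                reward = -1
            done = True
        else:
            done = False
        info = {}
        return self.charge_station.battery_amount, reward, done, info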
Finally, make sure that all other methods and functions that depend on global variables (truck_sim, run_station, etc.) are either modified to avoid global state or converted into methods of TruckEnv that operate on the class's attributes. By reorganizing the code this way, the SimPy simulation and the RL environment stay properly isolated, and global state doesn't interfere between episodes or steps.
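For the module-level lists specifically, one option (a sketch, not the only way) is to clear them in reset so trends and wait times don't leak between episodes:

    def reset(self):
        # Clear the shared lists so each episode starts with empty trends
        battery_trend.clear()
        wait_times.clear()
        time_stamp.clear()
        self._make_sim()
        return self.charge_station.battery_amount

Longer term, moving those lists into Charging_station itself would remove the globals entirely.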