SimPy in combination with RL


I'm currently working on a project where I would like to do some reinforcement learning on a simulation.
To start, I made a simulation in SimPy. Side note: I'm currently not focused on the real-world correctness of the simulation, but more on the RL side, so some processes do not make sense at all.

# pip install stable-baselines3[extra]
# Python version 3.9.18
import os
import random
import statistics

import gym
from gym import Env
from gym.spaces import Discrete, Box, Dict, Tuple, MultiBinary, MultiDiscrete

import numpy as np
import matplotlib.pyplot as plt
import simpy

from stable_baselines3 import PPO
from stable_baselines3.common.vec_env import DummyVecEnv, SubprocVecEnv
from stable_baselines3.common.evaluation import evaluate_policy

battery_trend = []
wait_times = []
time_stamp = []
#Create a class that mimics a charging station
class Charging_station():
    #Init method for the charging station class
    def __init__(self,env,num_charging_stations):
        self.env = env
        self.charge_station = simpy.Resource(env,num_charging_stations)
        self.battery_amount = 150000
        self.amount = 2
        self.battery_drained = False
        self.solar_power = 0
        self.done = False
        self.debug = 0

    #This method controls the solar panel 
    def solar_panel(self):
        while True:
            print("Charging")
            yield self.env.timeout(0.016)
            #Add the solar power to the battery
            self.battery_amount += self.solar_power
            self.debug += 1

    def trend_battery(self):
        time = 0
        while True:
            #Trend the battery status every minute
            yield self.env.timeout(1)
            battery_trend.append(self.battery_amount)  
            time_stamp.append(time)
            time +=1


    #This method simulates the charging of a vehicle
    def charge_vechicle(self, vehicle):
        # Charge at the full rate while the station battery has power left
        if self.battery_amount >1:
            self.amount = 2
        else:
            if not self.battery_drained:
                print("Battery Drained")
                self.battery_drained = True 
            self.amount = 1
        
        self.battery_amount -= 1
        yield self.env.timeout(0.016)        



class Truck():
    #Init the truck class
    def __init__(self,charge_left,number,charge_station):
        self.charge = charge_left
        self.number = number
        self.charging_station = charge_station

    def charge_truck(self,env,truck):
        #Get the starting time
        begin_time = env.now
        #Charge the truck
        with self.charging_station.charge_station.request() as request:
            #Wait for a free charging station
            yield request
            #Start the charging
            while self.charge < 7200:
                yield env.process(self.charging_station.charge_vechicle(truck))
                self.charge += self.charging_station.amount
        #Determine the total time a truck driver needs for a charge
        wait_times.append(env.now - begin_time)



#This function runs the charging station
def run_station(env,num_chargers, charge_station):
    truck = []
    number = 1
    #Put 1 truck in the waiting line
    truck.append(Truck(50,1,charge_station))
    #Add the solar panel process to the environment
    env.process(charge_station.solar_panel())
    env.process(truck[0].charge_truck(env,truck))
    env.process(charge_station.trend_battery())
    while True:
        yield env.timeout(5)
        #truck.append(Truck(50,1))
        number += 1
        truck.append(Truck(50,number,charge_station))
        env.process(truck[number -1].charge_truck(env,truck))



def get_average_wait_time(wait_times):
    average_wait = statistics.mean(wait_times)
    # Pretty print the results: split into whole and fractional minutes
    minutes, frac_minutes = divmod(average_wait, 1)
    seconds = frac_minutes * 60
    return round(minutes), round(seconds)

#The main function for the system
def truck_sim(env,charge_station): 
    #Setup the main system
    random.seed(30)
    #Run the simulation
    print("Here")
    env.process(run_station(env,1,charge_station))
    
    #Run the simulation for 1 day
    begin_time = env.now
    env.run(until=1440)
    print(env.now - begin_time,"Wait_Time")
    #View the results
   # mins, secs = get_average_wait_time(wait_times)
    #print("Done!!!")
    print(f"\nThe battery amount is {charge_station.battery_amount}")
    charge_station.done = True

In the simulation I have a couple of subprocesses that simulate an electric vehicle charging station. One of those subprocesses is a solar panel method. Based on the solar strength, the battery is charged up by a certain amount. I have this part working, and when I change the solar strength I can see a change in the trend (all good here).

I made a debug print statement in the solar panel method that notifies me when the method is executed. When I execute the simulation by itself, it prints multiple "Charging" strings to the console.

I now want to predict with reinforcement learning what the solar strength should be to get an empty battery at the end of the day (this is the part that doesn't make sense in the real world).

#Create a truck environment that the model is going to perform in
class TruckEnv(Env):
    def __init__(self):
        self.action_space = Box(low = np.array([0]), high = np.array([10]))
        self.observation_space = Box(low = np.array([0]), high = np.array([25000]))
        self.state = 0
        self.done = False
        self.running = False
        self.env_sim = simpy.Environment()
        self.charge_station = Charging_station(self.env_sim, 1)

    def step(self,action):            
        if not self.running:
            #Run the simulation
            self.env_sim = simpy.Environment()
            self.charge_station.battery_amount = 160000
            self.charge_station.solar_power = action[0]
            self.running = True
            print("new_env")
            
        truck_sim(self.env_sim,self.charge_station)
        reward = 0
        #print(charge_station.solar_power )
        #Check if the battery is near zero
        #print(charge_station.done)
        if self.charge_station.done:
            #print(charge_station.debug,"debug_Amount")
            if self.charge_station.battery_amount < 1000 and self.charge_station.battery_amount >= 0:
                reward = 1
            else:
                reward = -1               
            done = True
        else:
            done = False
            print("not Done")
        info = {}
        return self.charge_station.battery_amount, reward, done, info
    
    def render(self):
        pass

    def reset(self):
        # Reset Charging_station
        self.charge_station.battery_amount = 160000
        self.charge_station.solar_power = 1.1  # reset to a default value
        self.charge_station.done = False
        self.done = False
        self.running = False
        return self.state
    
    #The main method for the system
    def truck_sim(self): 
        #Setup the main system
        random.seed(30)
        #Run the simulation
        print("Here")
        self.env_sim.process(run_station(self.env_sim,1,self.charge_station))

        #Run the simulation for 1 day
        begin_time = self.env_sim.now
        self.env_sim.run(until=1440)
        print(self.env_sim.now - begin_time,"Wait_Time")
        #View the results
        mins, secs = get_average_wait_time(wait_times)
        #print("Done!!!")
        print(f"\nThe battery amount is {self.charge_station.battery_amount}")
        self.charge_station.done = True

    
    
env = TruckEnv()
log_path = os.path.join('.','logs')
model = PPO('MlpPolicy', env, verbose = 1, tensorboard_log = log_path)

episodes = 5
for episode in range(1, episodes+1):
    state = env.reset()
    done = False
    score = 0 
    
    while not done:
        env.render()
        action = env.action_space.sample()
        n_state, reward, done, info = env.step(action)
        score += reward
    print('Episode:{} Score:{}'.format(episode, score))
env.close()

#Plot the battery trend over time
y = np.array(battery_trend)
x = np.array(time_stamp)
slope, intercept = np.polyfit(x, y, 1)
plt.plot(x, y)
plt.show()

When I implement it in the RL environment and run it, the "Charging" debug statement is only executed once, and that is my problem.

I have tried to alter def step(self,action) in a way where it only returns something when it is done with the simulation, but this results in a crash.

To summarise, I have no clue why the simulation is only executed once when it is running inside the RL environment.

I hope some of you can give me a push in the right direction.
Note that this is a completely new world for me, so maybe I did something stupid.

Thank you in advance

Alex


1 Answer

Answer from tsrufod:

Your problem is likely related to the fact that the SimPy simulation relies on global state that isn't properly reset between episodes and steps in the reinforcement learning (RL) environment. When you run the simulation outside of the RL environment, it gets a fresh start every time, but within the RL environment that global state persists, causing unexpected behavior.
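
For example, here is a hypothetical illustration of the leak: TruckEnv.reset() resets the charging station's attributes, but the module-level lists are never cleared, so a second episode starts with the first episode's data.

#Hypothetical illustration (not in the original code): the module-level
#statistics lists survive reset(), so stale state leaks into episode 2.
print(len(battery_trend))  # e.g. 1440 entries after the first episode
env.reset()                # TruckEnv.reset() never touches the globals
print(len(battery_trend))  # still the same length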

Here are a few suggestions on how to fix this:

Isolation of SimPy State: Ensure that any global state maintained by the SimPy simulation (like battery_trend, wait_times, time_stamp, charge_station, etc.) is initialized and isolated within the RL environment. This way, the state is fresh for every episode.

Resetting SimPy State: In the reset method of TruckEnv, ensure that all relevant states and variables associated with the SimPy simulation are reinitialized. This will guarantee that every episode starts from a clean slate.

Avoid Global Variables: The use of global variables can be problematic, especially when integrating with an RL environment. It's preferable to wrap the simulation in a class or some structure where all variables are encapsulated, avoiding unintended side effects (a sketch follows below).
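
As a minimal sketch of that encapsulation (an assumption about how you might restructure it, with the rest of Charging_station kept exactly as in the question), the three module-level lists can become per-instance attributes:

#Sketch (assumption): move the module-level lists into the class so every
#new Charging_station starts with empty statistics.
class Charging_station():
    def __init__(self,env,num_charging_stations):
        self.env = env
        self.charge_station = simpy.Resource(env,num_charging_stations)
        self.battery_amount = 150000
        self.amount = 2
        self.battery_drained = False
        self.solar_power = 0
        self.done = False
        self.debug = 0
        #Per-instance statistics instead of globals
        self.battery_trend = []
        self.wait_times = []
        self.time_stamp = []

    def trend_battery(self):
        time = 0
        while True:
            yield self.env.timeout(1)
            #Append to the instance lists, not the module-level ones
            self.battery_trend.append(self.battery_amount)
            self.time_stamp.append(time)
            time += 1

Anything that reads battery_trend or wait_times afterwards (the plotting code, get_average_wait_time) would then read them from the Charging_station instance instead.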

To address these, here are modifications to your code:

Convert Charging_station and truck_sim to be members of TruckEnv:

class TruckEnv(Env):
    def __init__(self):
        self.action_space = Box(low = np.array([0]), high = np.array([10]))
        self.observation_space = Box(low = np.array([0]), high = np.array([25000]))
        self.state = 0
        self.done = False

        # Initialize Charging_station within TruckEnv
        self.env_sim = simpy.Environment()
        self.charge_station = Charging_station(self.env_sim, 1)

    # ... rest of your methods

    def reset(self):
        # Reset Charging_station
        self.charge_station.battery_amount = 160000
        self.charge_station.solar_power = 1.1  # reset to a default value
        self.charge_station.done = False
        self.done = False
        return self.state

    # Ensure truck_sim uses the TruckEnv's charge_station and env_sim
    def truck_sim(self):
        # Your truck_sim logic here
        # Remember to replace every `charge_station` with `self.charge_station`
        # and `env_sim` with `self.env_sim`
        pass

Modify the step method to use self.charge_station and self.env_sim:

def step(self, action):
    # Reset your environment state if necessary
    self.charge_station.battery_amount = 160000
    self.charge_station.solar_power = action[0]
    self.truck_sim()
    # ... then compute the reward and return (observation, reward, done, info)

Ensure that all other methods and functions which depend on global variables (like truck_sim, run_station, etc.) are modified to avoid using global state, or they are converted into methods of TruckEnv that operate on the class's attributes. By reorganizing the code this way, you're ensuring that the SimPy simulation and the RL environment are properly isolated and the global state doesn't interfere between episodes or steps.
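
Putting it all together, here is a minimal sketch of a fully isolated step method. It treats one RL step as one simulated day and rebuilds the SimPy environment on every call, so nothing leaks between steps; the one-step-per-episode design and the reuse of run_station from the question are assumptions, not a prescribed implementation:

def step(self, action):
    #Sketch (assumption): build a completely fresh simulation every step
    self.env_sim = simpy.Environment()
    self.charge_station = Charging_station(self.env_sim, 1)
    self.charge_station.solar_power = float(action[0])
    #run_station comes from the question and takes (env, num_chargers, station)
    self.env_sim.process(run_station(self.env_sim, 1, self.charge_station))
    #Run the whole day to completion before returning an observation
    self.env_sim.run(until=1440)

    battery = self.charge_station.battery_amount
    #Reward ending the day with a nearly empty battery, as in the question
    reward = 1 if 0 <= battery < 1000 else -1
    #Note: the question's observation_space high of 25000 may need widening,
    #since battery_amount starts around 150000
    obs = np.array([battery], dtype=np.float32)
    return obs, reward, True, {}  # one step per episode, so done is True

With this layout, reset only needs to return an initial observation, because every call to step constructs its own simulation from scratch.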