I have a code that is returning a list of directions for a csv with the latitude and longitude of several vehicles and their stops from OpenRouteService (ORS), an open source API. The data is in the format "EqupimentID,", "Latitude", "Longitude". I'm doing this using an api call through the ORS module. This should return a geojson of the directions, which is converted to an ESRI shapefile:
import pandas as pd
import openrouteservice as ors
import fiona
from openrouteservice import exceptions
import requests
import time
csv_file = '24hourtrucks.csv'
api_key = 'your key here'
df = pd.read_csv(csv_file)
grouped_df = df.groupby('equipmentID')
def get_directions(group):
coords = list(zip(group['longitude'], group['latitude']))
client = ors.Client(key=api_key)
rate_limit = 40
counter = 0
while counter < rate_limit:
try:
# Calculate directions using OpenRouteService
directions = client.directions(
coordinates=coords,
profile='driving-car',
format='geojson',
)
break
except exceptions.ApiError as e:
# Handle cases where a route is not available
print(f"Error calculating route: {e}")
time.sleep(60)
return None
counter += 1
directions_df = grouped_df.apply(get_directions)
polylines = [directions['features'][0]['geometry']['coordinates'] for directions in directions_df]
equipment_ids = [str(i) for i in range(len(polylines))]
schema = {
'geometry': 'LineString',
'properties': {
'equipmentID': 'str',
},
}
with fiona.open('output.shp', 'w', 'ESRI Shapefile', schema) as shp:
features = [{
'geometry': {
'type': 'LineString',
'coordinates': polyline,
},
'properties': {
'equipmentID': equipment_id,
'directions': directions_df.iloc[i], # Add the associated set of directions
},
} for i, (polyline, equipment_id) in enumerate(zip(polylines, equipment_ids))]
shp.writerecords(features)
The ORS API allows 40 calls before enforcing a minute cooldown. I set a counter that counts 40 requests and then sleeps for 1 minute, but I'm still coming up on the API rate limit, which causes the tool to fail.
I also tried limiting the speed of the tool to force it to make less than 40 requests per minute in this way:
def get_directions(group):
coords = list(zip(group['longitude'], group['latitude']))
client = ors.Client(key=api_key)
try:
# Calculate directions using OpenRouteService
directions = client.directions(
coordinates=coords,
profile='driving-car',
format='geojson',
)
return directions
except exceptions.ApiError as e:
# Handle cases where a route is not available
print(f"Error calculating route: {e}")
return None, None
#pause for api limit
iteration = len(group)
if iteration % 2 == 0:
time.sleep(1)
But again, it hit the api limit. I also can't seem to call the api using the requests module, to be able to return status codes:
def get_directions(group):
coords = list(zip(group['longitude'], group['latitude']))[0]
url = 'https://api.openrouteservice.org/v2/directions/driving-car/geojson'
headers = {'Authorization': api_key}
params = {
'coordinates': coords,
}
try:
# Send request using requests library and proxy
response = requests.post(url, headers=headers, params=params, proxies=proxies)
response.raise_for_status()
directions = response.json()
except requests.exceptions.RequestException as e:
print(f"Error calculating route: {e}")
return None
Again receiving errors: "Error calculating route: 500 Server Error: Internal Server Error for url"
Ii probably am missing something pretty basic, but this is my first time using python (or any language for that matter) to access an API endpoint. What can I do to fix this error?