I am currently facing this problem:
I have a pandas (or Dask) DataFrame in which one column holds integer values, each the index of a node in a given NetworkX graph. The graph itself is moderately big (~100 MB; it is an OSMnx street network). For each of these nodes, I need to run a local Dijkstra search with nx.dijkstra_predecessor_and_distance up to a given distance from the source node, then compare the visited nodes with the indices in another list and return the ones that were reached.
First, I defined the function that should take the integer value of each row:
import networkx as nx
import pandas as pd

def get_all_walkable_stopID(node_index, walk_network, stops: pd.DataFrame, max_walk=1500):
    """
    Given an OSMnx walk network, the index of one node and a pd.DataFrame of stops with coordinates, return all stops within walkable distance.
    """
    # Local Dijkstra search from node_index, cut off at max_walk along the 'length' edge weight
    pred, distance = nx.dijkstra_predecessor_and_distance(walk_network, node_index, cutoff=max_walk, weight='length')
    # result is a DataFrame with a single column "distance"
    result = stops.join(pd.DataFrame.from_dict(distance, orient='index', columns=['distance']), on='nearest_osmnx_node', how='inner')[['distance']]
    return result.index.values.tolist()  # list of nodes of interest within walkable distance
As you can see, I pass the walk_network in as an argument of the function. This works well in local pandas, but when I try
walkers_dask.start_nearest_osmnx_node.apply(
    get_all_walkable_stopID,
    args=(walk_network, stops),
    meta=('x', 'object')
).compute()
I get an awfully long, cryptic CancelledError (as usual with Dask) telling me that it failed to deserialize:
UnicodeDecodeError: 'utf-8' codec can't decode byte 0xa3 in position 0: invalid start byte
I even get this error if I replace the output of the function with a simple 'test' string (and modify the meta accordingly); but if I apply a lambda function with no extra arguments
walkers_dask.start_nearest_osmnx_node.apply(
    lambda x: 'test',
    meta=('x', 'string')
).compute()
the computation succeeds and returns a Series of 'test' as expected. So there must be something wrong with how the arguments are passed, but I am at a loss for a better way to distribute this computation with Dask, and I have no idea how to fix this.
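Since the failure seems tied to the extra arguments, one thing that could be checked is whether they survive a plain pickle round trip on the client. Below is a minimal sketch of such a check. Note that pickle_roundtrip_report is a hypothetical helper written for this post, and toy_network is only a stand-in for walk_network so the snippet runs on its own. A successful local round trip would not rule out a worker-side problem, since this does not go through Dask itself.

```python
import pickle


def pickle_roundtrip_report(**named_objects):
    """Pickle and unpickle each object; return {name: (ok, detail)}.

    Hypothetical helper, not part of Dask or NetworkX.
    """
    report = {}
    for name, obj in named_objects.items():
        try:
            payload = pickle.dumps(obj, protocol=pickle.HIGHEST_PROTOCOL)
            pickle.loads(payload)
        except Exception as exc:  # report any failure instead of raising
            report[name] = (False, f"{type(exc).__name__}: {exc}")
        else:
            report[name] = (True, f"{len(payload)} bytes")
    return report


# Stand-in objects (assumptions): in the real case these would be
# walk_network and stops.
toy_network = {0: {1: {"length": 120.0}}, 1: {0: {"length": 120.0}}}
unpicklable = lambda x: x  # plain pickle cannot serialize a lambda

report = pickle_roundtrip_report(walk_network=toy_network, broken=unpicklable)
for name, (ok, detail) in report.items():
    print(name, "OK" if ok else "FAILED", detail)
```

In the real setting the call would be pickle_roundtrip_report(walk_network=walk_network, stops=stops); the reported payload size would also show how much data is shipped along with the tasks.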
Any help is appreciated!