My problem seems to be similar to this thread; however, while I think I am following the advised method, I still get a PicklingError. When I run my process locally, without sending it to an IPython cluster engine, the function works fine.
I am using zipline with IPython's notebook, so I first create a class based on zipline.TradingAlgorithm:
Cell [ 1 ]
from IPython.parallel import Client
rc = Client()
lview = rc.load_balanced_view()
Cell [ 2 ]
%%px --local # This ensures that the class and modules exist on each engine
import zipline as zpl
import numpy as np
class Agent(zpl.TradingAlgorithm): # must define initialize and handle_data methods
    def initialize(self):
        self.valueHistory = []  # final portfolio values, one per run
        pass
    def handle_data(self, data):
        for security in data.keys():
            ## Just randomly buy/sell/hold for each security
            coinflip = np.random.random()
            if coinflip < .25:
                self.order(security,100)
            elif coinflip > .75:
                self.order(security,-100)
        pass
Cell [ 3 ]
from zipline.utils.factory import load_from_yahoo
start = '2013-04-01'
end = '2013-06-01'
sidList = ['SPY','GOOG']
data = load_from_yahoo(stocks=sidList,start=start,end=end)
agentList = []
for i in range(3):
    agentList.append(Agent())
def testSystem(agent,data):
    results = agent.run(data) #-- This is how the zipline based class is executed
    #-- next I'm just storing the final value of the test so I can plot later
    agent.valueHistory.append(results['portfolio_value'][len(results['portfolio_value'])-1])
    return agent
for i in range(10):
    tasks = []
    for agent in agentList:
        #agent = testSystem(agent,data) ## On its own, this works!
        #-- To Test, uncomment the above line and comment out the next two
        tasks.append(lview.apply_async(testSystem,agent,data))
    agentList = [ar.get() for ar in tasks]
for agent in agentList:
    plot(agent.valueHistory)
Here is the Error produced:
PicklingError                             Traceback (most recent call last)
/Library/Python/2.7/site-packages/IPython/kernel/zmq/serialize.pyc in serialize_object(obj, buffer_threshold, item_threshold)
100 buffers.extend(_extract_buffers(cobj, buffer_threshold))
101
--> 102 buffers.insert(0, pickle.dumps(cobj,-1))
103 return buffers
104
PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed
If I override the run() method from zipline.TradingAlgorithm with something like:
def run(self, data):
    return 1
then the passing off to the engines works, but obviously the guts of the test are not performed. Since run is a method internal to zipline.TradingAlgorithm and I don't know everything that it does, how would I make sure it is passed through?
Trying something like this...
def run(self, data):
    return zpl.TradingAlgorithm.run(self,data)
results in the same PicklingError.
It looks like the zipline TradingAlgorithm object is not picklable after it has been run.
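One way to narrow down what breaks pickling is to try pickling each instance attribute separately, before and after a run. The sketch below uses a hypothetical stand-in class (`FakeAlgorithm`, not zipline code) whose `run()` attaches a function created at run time; whether zipline does exactly this is an assumption, and the helper name `unpicklable_attributes` is made up for illustration.

```python
import pickle


class FakeAlgorithm(object):
    """Hypothetical stand-in for a TradingAlgorithm; not zipline code."""

    def __init__(self):
        self.value_history = []

    def run(self, data):
        # Assumption for illustration only: running attaches a function
        # created at run time, which pickle cannot serialize by reference.
        self._callback = lambda x: x
        return sum(data)


def unpicklable_attributes(obj):
    """Return the sorted names of instance attributes that fail to pickle."""
    bad = []
    for name, value in vars(obj).items():
        try:
            pickle.dumps(value, -1)
        except Exception:
            # Any failure here means this attribute blocks pickling.
            bad.append(name)
    return sorted(bad)


agent = FakeAlgorithm()
before = unpicklable_attributes(agent)  # checked on a fresh object
agent.run([1, 2, 3])
after = unpicklable_attributes(agent)   # checked again after run()
print(before, after)
```

Calling the same helper on a real Agent before and after agent.run(data) might show which attributes only become a problem once the algorithm has been run.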
That suggests to me that you should create the Agents on the engines and pass only data and results back and forth (ideally not passing the data across at all, or at most once).
Minimizing data transfers might look something like this: define the class on each engine, load the data there, run the code there, and plot the results without ever fetching the agents themselves.
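A minimal sketch of those steps with a DirectView, assuming the Agent class from the %%px cell already exists on every engine. The names ENGINE_SETUP, ENGINE_STEP, value_history and run_on_engines are hypothetical, and the zipline calls are copied from the question rather than tested here.

```python
# Code strings that are executed on the engines, so the agent and the
# data never leave them. Agent is assumed to be defined on each engine.
ENGINE_SETUP = """
from zipline.utils.factory import load_from_yahoo
data = load_from_yahoo(stocks=['SPY', 'GOOG'], start='2013-04-01', end='2013-06-01')
agent = Agent()
value_history = []
"""

ENGINE_STEP = """
results = agent.run(data)
value_history.append(float(results['portfolio_value'][len(results['portfolio_value'])-1]))
"""


def run_on_engines(dview, n_rounds=10):
    """Run n_rounds tests on every engine; return one list of floats per engine.

    dview is expected to be an IPython.parallel DirectView such as rc[:].
    """
    # Create one agent per engine and load the data there, once.
    dview.execute(ENGINE_SETUP, block=True)
    for _ in range(n_rounds):
        dview.execute(ENGINE_STEP, block=True)
    # Pull only the plain lists of floats back to the client.
    return dview['value_history']
```

With the client from Cell 1, this would be called as histories = run_on_engines(rc[:]), followed by plot(h) for each h in histories.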
This is not quite the same as the code you shared: yours has three agents bouncing back and forth across all your engines, whereas this has one agent per engine. I don't know enough about zipline to say whether that's useful to you or not.