Running Python 2.7; dev environment is OS X but production is Linux.
I've got some code I'm trying to speed up with multiprocessing, and I got it working fine and observed the desired theoretical speedups. I then went to run the test suite on it, and after a few tests, started getting the above OSError on all subsequent tests. If I run the tests starting from the point where I first get the error, some number of them pass and then I get that error again; that's fairly logical, and was just a sanity check.
To try to figure out what was going wrong, I replaced __builtin__'s open and close calls with ones that print (following the advice in https://stackoverflow.com/a/2023709/3543200):
import __builtin__
import traceback
import sys

openfiles = set()
oldfile = __builtin__.file

class newfile(oldfile):
    def __init__(self, *args):
        self.x = args[0]
        print "### OPENING %s ###" % str(self.x)
        traceback.print_stack(limit=20)
        print
        sys.stdout.flush()
        oldfile.__init__(self, *args)
        openfiles.add(self)

    def close(self):
        print "### CLOSING %s ###" % str(self.x)
        oldfile.close(self)
        openfiles.remove(self)

oldopen = __builtin__.open
def newopen(*args):
    return newfile(*args)

__builtin__.file = newfile
__builtin__.open = newopen
and what did I see but hundreds and hundreds of lines of ### OPENING /dev/null ###.
When I do the same thing with the code that accomplishes the same task but without the multiprocessing, I see no such file openings, so it stands to reason that multiprocessing is at fault here. This is supported by the traceback output, which suggests that the culprit is here:
File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/process.py", line 250, in _bootstrap
sys.stdin = open(os.devnull)
Posting the code of the multiprocessing::process.py::_bootstrap function here, just in case it's helpful:
def _bootstrap(self):
    from . import util
    global _current_process

    try:
        self._children = set()
        self._counter = itertools.count(1)
        try:
            sys.stdin.close()
            sys.stdin = open(os.devnull)
        except (OSError, ValueError):
            pass
        _current_process = self
        util._finalizer_registry.clear()
        util._run_after_forkers()
        util.info('child process calling self.run()')
        try:
            self.run()
            exitcode = 0
        finally:
            util._exit_function()
    except SystemExit, e:
        if not e.args:
            exitcode = 1
        elif isinstance(e.args[0], int):
            exitcode = e.args[0]
        else:
            sys.stderr.write(str(e.args[0]) + '\n')
            sys.stderr.flush()
            exitcode = 1
    except:
        exitcode = 1
        import traceback
        sys.stderr.write('Process %s:\n' % self.name)
        sys.stderr.flush()
        traceback.print_exc()

    util.info('process exiting with exitcode %d' % exitcode)
    return exitcode
And, for what it's worth, I'm invoking multiprocessing with code that looks like this:
num_cpus = multiprocessing.cpu_count()
pool = multiprocessing.Pool(processes=num_cpus)
num_per_job = len(input_data) / num_cpus + 1
chunks = [input_data[num_per_job*i:num_per_job*(i+1)] for i in range(num_cpus)]
# TODO: ^^^ make this a list of generators
data = pool.map(get_output_from_input, chunks)
return itertools.chain.from_iterable(data)
So, question: is this a bug in multiprocessing, or am I doing something terribly wrong? I would really welcome the excuse to spend the next week digging through the multiprocessing code and figuring out how it works, but I would have trouble convincing the higher-ups that this is a valid use of my time. Appreciate anyone with experience helping out!
You need to close the pools to terminate the child processes and free the pipes used to communicate with them. Do it with contextlib.closing so that you don't have to worry about exceptions skipping the close. closing will close the pool at the end of the with block, including when it is exited with an exception, so you never need to call close yourself.

Also, Pool.map chunks its requests, so you don't have to do that yourself. I removed that bit of code, but the get_output_from_input signature may not be right (it will be called once per input item, not once with a list of input items), so you may need to do some fixups.
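A minimal sketch of what that could look like (process_all is just a placeholder name for your surrounding function; since get_output_from_input now receives single items, the itertools.chain.from_iterable step may no longer be needed):

import contextlib
import multiprocessing

def process_all(input_data):
    num_cpus = multiprocessing.cpu_count()
    # closing() calls pool.close() when the with block exits, even if an
    # exception is raised, so the child processes and the pipes used to
    # talk to them don't accumulate across tests.
    with contextlib.closing(multiprocessing.Pool(processes=num_cpus)) as pool:
        # Pool.map chunks input_data internally; get_output_from_input is
        # called once per item, not once per hand-built slice.
        results = pool.map(get_output_from_input, input_data)
    return results

If the tests need the workers fully gone before the next one starts, you can still call pool.join() after the with block, since close() has already been called by then.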