I'm trying to read raw data from a zipfile. The structure of that file is:
- zipfile
  - data
    - Spectral0.data
    - Spectral1.data
    - Spectral[...].data
    - Spectral300.data
  - Header
    - data
The goal is to read all Spectral[...].data files into a 2D numpy array (where Spectral0.data would be the first column). The single-threaded approach takes a long time, since reading one .data file takes several seconds.
import zipfile
import numpy as np

# dimY, dimX, fileCount and path are defined elsewhere in my script
spectralData = np.zeros(shape=(dimY, dimX), dtype=np.int16)

archive = zipfile.ZipFile(path, 'r')
for file in range(fileCount):
    spectralDataRaw = archive.read('data/Spectral' + str(file) + '.data')
    spectralData[:, file] = np.frombuffer(spectralDataRaw, np.short)
I thought that using multiprocessing could speed up the process, so I read some tutorials on how to set up a multiprocessing routine. This is what I came up with:
import zipfile
import numpy as np
import multiprocessing
from joblib import Parallel, delayed

archive = zipfile.ZipFile(path, 'r')
numCores = multiprocessing.cpu_count()

def testMult(file):
    spectralDataRaw = archive.read('data/Spectral' + str(file) + '.data')
    return np.frombuffer(spectralDataRaw, np.short)

output = Parallel(n_jobs=numCores)(delayed(testMult)(file) for file in range(fileCount))
output = np.flipud(np.rot90(np.array(output), 1, axes=(0, 2)))
Using this approach I get the following error:
_RemoteTraceback:
"""
Traceback (most recent call last):
  File "C:\ProgramData\Anaconda3\envs\devEnv2\lib\site-packages\joblib\externals\loky\backend\queues.py", line 153, in _feed
    obj_ = dumps(obj, reducers=reducers)
  File "C:\ProgramData\Anaconda3\envs\devEnv2\lib\site-packages\joblib\externals\loky\backend\reduction.py", line 271, in dumps
    dump(obj, buf, reducers=reducers, protocol=protocol)
  File "C:\ProgramData\Anaconda3\envs\devEnv2\lib\site-packages\joblib\externals\loky\backend\reduction.py", line 264, in dump
    _LokyPickler(file, reducers=reducers, protocol=protocol).dump(obj)
  File "C:\ProgramData\Anaconda3\envs\devEnv2\lib\site-packages\joblib\externals\cloudpickle\cloudpickle_fast.py", line 563, in dump
    return Pickler.dump(self, obj)
TypeError: cannot pickle '_io.BufferedReader' object
"""

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "<ipython-input-94-c4b007eea8e2>", line 8, in <module>
    output = Parallel(n_jobs=numCores)(delayed(testMult)(file)for file in range(fileCount))
  File "C:\ProgramData\Anaconda3\envs\devEnv2\lib\site-packages\joblib\parallel.py", line 1061, in __call__
    self.retrieve()
  File "C:\ProgramData\Anaconda3\envs\devEnv2\lib\site-packages\joblib\parallel.py", line 940, in retrieve
    self._output.extend(job.get(timeout=self.timeout))
  File "C:\ProgramData\Anaconda3\envs\devEnv2\lib\site-packages\joblib\_parallel_backends.py", line 542, in wrap_future_result
    return future.result(timeout=timeout)
  File "C:\ProgramData\Anaconda3\envs\devEnv2\lib\concurrent\futures\_base.py", line 432, in result
    return self.__get_result()
  File "C:\ProgramData\Anaconda3\envs\devEnv2\lib\concurrent\futures\_base.py", line 388, in __get_result
    raise self._exception
PicklingError: Could not pickle the task to send it to the workers.
My question is: how do I set up this parallelization correctly? I've read that zipfile is not thread-safe, so I might need a different approach to read the zip content into memory (RAM). I would rather not read the whole zipfile into memory, since the file can be quite large.
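One idea I had is to open the archive separately inside each worker, so that no open file handle ever has to be pickled and sent to the workers. This is only a rough sketch of what I mean (readSpectral is just a placeholder name, and path, fileCount, dimY are the same variables as above); I don't know whether reopening the zip for every task would eat up the speedup:

import zipfile
import numpy as np
import multiprocessing
from joblib import Parallel, delayed

def readSpectral(file):
    # open a fresh handle per task, so the worker never receives an open file object
    with zipfile.ZipFile(path, 'r') as archive:
        spectralDataRaw = archive.read('data/Spectral' + str(file) + '.data')
    return np.frombuffer(spectralDataRaw, np.short)

numCores = multiprocessing.cpu_count()
output = Parallel(n_jobs=numCores)(delayed(readSpectral)(file) for file in range(fileCount))
spectralData = np.column_stack(output)  # shape (dimY, fileCount), one .data file per column

But I'm not sure this is the intended way to combine joblib with zipfile.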
I also thought about using from numba import njit, prange, but the problem there is that the zipfile module is not supported by numba.
What else could I do to make this work?