How to save numpy array from PySpark worker to HDFS or shared file system?


I would like to efficiently read and write numpy arrays between HDFS and the function that runs on each worker machine in PySpark. I have two machines, A and B: A runs the master and one worker, and B runs one worker. For example, I would like to achieve something like the following:

from pyspark import SparkConf, SparkContext

def func(iterator):
    P = << LOAD from HDFS or Shared Memory as numpy array >>
    for x in iterator:
        P = P + x

    << SAVE P (numpy array) to HDFS / shared file system >>

if __name__ == "__main__":
    conf = SparkConf().setMaster("local").setAppName("Test")
    sc = SparkContext(conf=conf)
    sc.parallelize([0, 1, 2, 3], 2).foreachPartition(func)

What would be a fast and efficient way to do this?

Answer by Derlin:

I ran into the same problem and ended up with a workaround that uses the HdfsCLI module (the hdfs package) and tempfiles, with Python 3.4.

  1. Imports:
import numpy
from hdfs import InsecureClient
from tempfile import TemporaryFile
  2. Create an HDFS client. In most cases it is better to keep a utility function somewhere in your script, like this one:
def get_hdfs_client():
    return InsecureClient("<your webhdfs uri>", user="<hdfs user>",
                          root="<hdfs base path>")
  3. Load and save your numpy array inside a worker function:
hdfs_client = get_hdfs_client()

# load from file.npy
path = "/whatever/hdfs/file.npy"
with TemporaryFile() as tf:
    with hdfs_client.read(path) as reader:
        tf.write(reader.read())
    tf.seek(0)  # important: set the cursor back to the beginning of the file
    np_array = numpy.load(tf)

...

# save to file.npy
with TemporaryFile() as tf:
    numpy.save(tf, np_array)
    tf.seek(0)  # important: set the cursor back to the beginning of the file
    # with overwrite=False, an exception is thrown if the file already exists
    hdfs_client.write("/whatever/output/file.npy", tf.read(), overwrite=True)

Notes:

  • the URI used to create the HDFS client begins with http://, because the client talks to HDFS through its WebHDFS (HTTP) interface;
  • make sure the user you pass to the HDFS client has read and write permissions on the paths involved;
  • in my experience, the overhead is not significant (at least in terms of execution time);
  • the advantage of using tempfiles (vs. regular files in /tmp) is that no garbage files are left on the cluster machines after the script ends, whether it ends normally or not.
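
To tie the load and save steps back to the question's func, here is a minimal sketch of a per-partition function. It is a variation on the answer, not the answer's exact method: it swaps the tempfile for an in-memory io.BytesIO buffer, and it assumes mapPartitionsWithIndex is used instead of foreachPartition so that each partition knows its own index. The names array_to_npy_bytes, npy_bytes_to_array, process_partition, client_factory and the part-N.npy naming scheme are all hypothetical, introduced only for illustration.

```python
import io

import numpy as np


def array_to_npy_bytes(arr):
    """Serialize an array to .npy-format bytes entirely in memory."""
    buf = io.BytesIO()
    np.save(buf, arr)
    return buf.getvalue()


def npy_bytes_to_array(data):
    """Rebuild an array from .npy-format bytes."""
    return np.load(io.BytesIO(data))


def process_partition(index, iterator, client_factory, in_path, out_dir):
    """Load P, add every element of the partition, save one file per partition.

    client_factory is any zero-argument callable returning an object with
    HdfsCLI-style read()/write() methods, e.g. get_hdfs_client from the answer.
    """
    # The client is created on the worker rather than shipped from the driver.
    client = client_factory()
    with client.read(in_path) as reader:
        P = npy_bytes_to_array(reader.read())
    for x in iterator:
        P = P + x
    # Hypothetical naming scheme: one output file per partition, so that
    # partitions running in parallel do not overwrite each other.
    out_path = "{}/part-{}.npy".format(out_dir, index)
    client.write(out_path, array_to_npy_bytes(P), overwrite=True)
    return [out_path]


# Possible wiring on the driver (not executed here; needs Spark and HDFS):
# paths = sc.parallelize([0, 1, 2, 3], 2).mapPartitionsWithIndex(
#     lambda i, it: process_partition(
#         i, it, get_hdfs_client,
#         "/whatever/hdfs/file.npy", "/whatever/output")
# ).collect()
```

Writing one file per partition is a design choice made for this sketch. If every partition wrote to the same output path with overwrite=True, only the result of whichever partition finished last would remain.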