Iterate over a PySpark DataFrame and, for each row, interact with MongoDB


I have a PySpark DataFrame and now I want to iterate over each row and insert/update it into a MongoDB collection.

 #Did all the required imports
 #dataframe
 +---+----+
 |age|name|
 +---+----+
 | 30|   c|
 |  5|   e|
 |  6|   f|
 +---+----+
  db = mongodbclient['mydatabase']
  collection = db['mycollection']
   #created the function below to insert/update
   def customFunction(row):
       key = {'name': row.name}
       data = dict(zip(columns, [row[x] for x in columns]))  #columns holds the DataFrame column names
       collection.update(key, data, upsert=True)
       #return a_flag  #commented out for now; a_flag can be 0 or 1

If a name already exists in the MongoDB collection 'mycollection', it should update that record; otherwise it should insert the new record.
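In plain PyMongo (outside Spark), the upsert I am after would look something like the sketch below; it assumes pymongo 3.x and a local connection string, so treat those names as placeholders:

 from pymongo import MongoClient

 mongodbclient = MongoClient('mongodb://localhost:27017/')  #assumed connection string
 collection = mongodbclient['mydatabase']['mycollection']

 #upsert: update the document whose name matches, or insert it if it does not exist
 collection.update_one({'name': 'c'}, {'$set': {'name': 'c', 'age': 30}}, upsert=True)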

I am getting the following error when I tried to map this function over the Spark DataFrame:

 result = my_dataframe.rdd.map(customFunction)
 #.....TypeError: can't pickle _thread.lock objects....
 #AttributeError: 'TypeError' object has no attribute 'message'

Can anybody please point out what is wrong in that function (and/or anywhere else), or suggest an alternative approach to this type of task?

Basically, iterate over each row (without a collect() call; is that even possible??)

And, on each row, apply a function that does work outside Spark.

Please suggest. Thanks in advance! :)

My data in MongoDB:

name  age
 a    1
 b    2
 c    3 #the update should change this age to 30, and 2 more new records should be inserted

There are 2 answers

Alper t. Turker

It looks like the connection object cannot be pickled. I'd use foreachPartition:

def customFunction(rows):
    db = mongodbclient['mydatabase']
    collection = db['mycollection']

    for row in rows:
        key = {'name': row.name}
        data = dict(zip(columns, [row[x] for x in columns]))
        collection.update(key, data, upsert=True)

my_dataframe.rdd.foreachPartition(customFunction)

but keep in mind that a fatal failure might leave the database in an inconsistent state.
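Also note that `columns` is not defined inside the function; it has to exist on the driver so it gets captured in the closure. A minimal sketch of the call site, assuming `my_dataframe` from the question:

columns = my_dataframe.columns   #e.g. ['age', 'name'], captured by customFunction's closure
my_dataframe.rdd.foreachPartition(customFunction)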

Mariusz

If you have 500k records to be upserted into MongoDB, bulk mode will probably be a more efficient way to handle this. Executing the requests inside MongoDB requires much more power than what you actually do in Spark (just building the requests), and even issuing them in parallel may cause instability on the Mongo side (and end up slower than the "iterative" approach).

You can try the following code. It does not use collect(), so it is memory-efficient on the driver:

bulk = collection.initialize_unordered_bulk_op()
for row in my_dataframe.rdd.toLocalIterator():
    key = {'name': row.name}
    data = dict(zip(columns, [row[x] for x in columns]))
    bulk.find(key).upsert().replace_one(data)

print(bulk.execute())
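The initialize_unordered_bulk_op API is deprecated in newer PyMongo releases; a roughly equivalent sketch using bulk_write, assuming pymongo 3.x and the same collection and columns variables, would be:

from pymongo import ReplaceOne

#build one upsert request per row; for very large inputs you would batch these instead of one big list
requests = [
    ReplaceOne({'name': row.name},
               dict(zip(columns, [row[x] for x in columns])),
               upsert=True)
    for row in my_dataframe.rdd.toLocalIterator()
]

result = collection.bulk_write(requests, ordered=False)
print(result.bulk_api_result)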