Getting an error in PySpark communication through RPC in RabbitMQ


I am new to RabbitMQ and am using it for PySpark communication from a remote PC through RPC. For testing purposes I have developed some test code, which gives me an error: [screenshot of the error]

I have followed the RabbitMQ documentation tutorial to implement RPC with PySpark.

Here is my Spark RPC server code:

import pika
from tkinter import*
from pyspark.sql import SparkSession
from pyspark import SparkConf,SparkContext
import json
import re



connectionparam=pika.ConnectionParameters(host="localhost")
connection=pika.BlockingConnection(connectionparam)

channel=connection.channel()

channel.queue_declare(queue='rpc_queue')







spark=SparkSession.builder.config("spark.sql.warehouse.dir", "C:\spark\spark-warehouse")\
    \
    .appName("TestApp").\
    enableHiveSupport().getOrCreate()

print("success")
# establishing character
#sqlstring="SELECT lflow1.LeaseType as LeaseType, lflow1.Status as Status, lflow1.Property as property, lflow1.City as City, lesflow2.DealType as DealType, lesflow2.Area as Area, lflow1.Did as DID, lesflow2.MID as MID from lflow1, lesflow2  WHERE lflow1.Did = lesflow2.MID"



def queryBuilder(sqlval):
    print("printing",sqlval)
    df=spark.sql(sqlval)
    print("printing data frame table")
    df.show()

    resultlist = df.toJSON().collect()
    dumpdata = re.sub(r"\'", "", str(resultlist))
    jsondata = json.dumps(dumpdata)
    #print(jsondata)
    return jsondata


def on_request(ch,method,props, body):
    n=body
    print("printing request body ",n)
    response=queryBuilder(n)
    ch.basic_publish(exchange='',
                     routing_key=props.reply_to,
                     properties=pika.BasicProperties(correlation_id=props.correlation_id),
                     body=response
                     )
    ch.basic_ack(delivery_tag=method.delivery_tag)


channel.basic_qos(prefetch_count=1)
channel.basic_consume(on_request,queue='rpc_queue')
print("[x] Awaiting RPC Request")

channel.start_consuming()

master=Tk()
entryval=Entry(master)
entryval.grid(row=0,column=1)
Button(master,text='Quit',command=master.quit).grid(row=3,column=1,sticky=W,pady=50)
mainloop()

And here is my RPC client code for the remote PySpark application:

import pika
import uuid

class SparkRpcClient(object):
    def __init__(self):
        self.connection = pika.BlockingConnection(pika.ConnectionParameters(
                host='localhost'))

        self.channel = self.connection.channel()

        result = self.channel.queue_declare(exclusive=True)
        self.callback_queue = result.method.queue

        self.channel.basic_consume(self.on_response, no_ack=True,
                                   queue=self.callback_queue)

    def on_response(self, ch, method, props, body):
        if self.corr_id == props.correlation_id:
            self.response = body

    def call(self, querymsg):
        self.response = None
        self.corr_id = str(uuid.uuid4())
        self.channel.basic_publish(exchange='',
                                   routing_key='rpc_queue',
                                   properties=pika.BasicProperties(
                                         reply_to = self.callback_queue,
                                         correlation_id = self.corr_id,
                                         ),
                                   body=querymsg)
        while self.response is None:
            self.connection.process_data_events()
        return int(self.response)

sparkrpc = SparkRpcClient()
sqlstring="SELECT lflow1.LeaseType as LeaseType, lflow1.Status as Status, lflow1.Property as property, lflow1.City as City, lesflow2.DealType as DealType, lesflow2.Area as Area, lflow1.Did as DID, lesflow2.MID as MID from lflow1, lesflow2  WHERE lflow1.Did = lesflow2.MID"


print(" [x] Requesting query")
response = sparkrpc.call(sqlstring)
print(" [.] Got %s" % response)

My server receives the request string from the client and prints it, but it fails in my queryBuilder() function, which processes the SQL string and returns JSON data. Moreover, I have sent the request multiple times, and it seems the individual requests are queued in rpc_queue but never cleared, because I get the same error even if I run only the server script. Maybe I am missing something here. Can anyone help me figure it out? I just want to return JSON data to the client.

Thanks in advance,
Kalyan

1 Answer

Answered by Alper t. Turker (best answer)

You're passing an incompatible type (it looks like either bytes or bytearray) where str is expected.

You should decode the content to a string first.

def queryBuilder(sqlval, enc):
    ...
    df = spark.sql(sqlval.decode(enc))
    df.show()
    ...
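As a minimal sketch of that decoding step, runnable without Spark or RabbitMQ: the helper names `decode_body` and `rows_to_json_array` are hypothetical (they appear nowhere in the question), and UTF-8 is only an assumed encoding for the message body. The second helper shows one possible way to turn the list of JSON strings returned by `df.toJSON().collect()` into a single JSON array; the asker's regex-based approach is not reproduced here.

```python
import json


def decode_body(body, enc="utf-8"):
    """Return the message body as str, decoding bytes/bytearray if needed.

    Hypothetical helper; `enc` is an assumption about how the client
    encoded the query string.
    """
    if isinstance(body, (bytes, bytearray)):
        return bytes(body).decode(enc)
    return body


def rows_to_json_array(json_rows):
    """Combine JSON-encoded row strings into one JSON array string.

    `json_rows` stands in for the list that df.toJSON().collect() returns.
    """
    return json.dumps([json.loads(row) for row in json_rows])


if __name__ == "__main__":
    # A bytes body, as a consumer callback might receive it.
    print(decode_body(b"SELECT 1"))
    # Two sample rows standing in for collected DataFrame output.
    print(rows_to_json_array(['{"City": "Pune"}', '{"City": "Delhi"}']))
```

In the server above, this would presumably mean calling `queryBuilder(decode_body(body))` inside `on_request`. The reply reaches the client as bytes as well, so the `int(self.response)` carried over from the tutorial would likely need to become something like `json.loads(self.response.decode("utf-8"))`.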