I am newly using RabbitMq for pyspark communication from remote pc through RPC.For testing purpose i have developed a test code which is giving me the error
I have followed RabbitMq doc tutorial for implementing RPC over pyspark
Here is my spark RPC server code
import pika
from tkinter import*
from pyspark.sql import SparkSession
from pyspark import SparkConf,SparkContext
import json
import re
connectionparam=pika.ConnectionParameters(host="localhost")
connection=pika.BlockingConnection(connectionparam)
channel=connection.channel()
channel.queue_declare(queue='rpc_queue')
spark=SparkSession.builder.config("spark.sql.warehouse.dir", "C:\spark\spark-warehouse")\
\
.appName("TestApp").\
enableHiveSupport().getOrCreate()
print("success")
#establishhing chraracter
#sqlstring="SELECT lflow1.LeaseType as LeaseType, lflow1.Status as Status, lflow1.Property as property, lflow1.City as City, lesflow2.DealType as DealType, lesflow2.Area as Area, lflow1.Did as DID, lesflow2.MID as MID from lflow1, lesflow2 WHERE lflow1.Did = lesflow2.MID"
def queryBuilder(sqlval):
print("printing",sqlval)
df=spark.sql(sqlval)
print("printing data frame table")
df.show()
resultlist = df.toJSON().collect()
dumpdata = re.sub(r"\'", "", str(resultlist))
jsondata = json.dumps(dumpdata)
#print(jsondata)
return jsondata
def on_request(ch,method,props, body):
n=body
print("printing request body ",n)
response=queryBuilder(n)
ch.basic_publish(exchange='',
routing_key=props.reply_to,
properties=pika.BasicProperties(correlation_id=props.correlation_id),
body=response
)
ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(on_request,queue='rpc_queue')
print("[x] Awaiting RPC Request")
channel.start_consuming()
master=Tk()
entryval=Entry(master)
entryval.grid(row=0,column=1)
Button(master,text='Quit',command=master.quit).grid(row=3,column=1,sticky=W,pady=50)
mainloop()
and my following RPC client code for remote pyspark application is
import pika
import uuid
class SparkRpcClient(object):
def __init__(self):
self.connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
self.channel = self.connection.channel()
result = self.channel.queue_declare(exclusive=True)
self.callback_queue = result.method.queue
self.channel.basic_consume(self.on_response, no_ack=True,
queue=self.callback_queue)
def on_response(self, ch, method, props, body):
if self.corr_id == props.correlation_id:
self.response = body
def call(self, querymsg):
self.response = None
self.corr_id = str(uuid.uuid4())
self.channel.basic_publish(exchange='',
routing_key='rpc_queue',
properties=pika.BasicProperties(
reply_to = self.callback_queue,
correlation_id = self.corr_id,
),
body=querymsg)
while self.response is None:
self.connection.process_data_events()
return int(self.response)
sparkrpc = SparkRpcClient()
sqlstring="SELECT lflow1.LeaseType as LeaseType, lflow1.Status as Status, lflow1.Property as property, lflow1.City as City, lesflow2.DealType as DealType, lesflow2.Area as Area, lflow1.Did as DID, lesflow2.MID as MID from lflow1, lesflow2 WHERE lflow1.Did = lesflow2.MID"
print(" [x] Requesting query")
response = sparkrpc.call(sqlstring)
print(" [.] Got %s" % response)
My server has already received the request string from client and print it but it could not works on my querybuild() function which process the sqlstring and return json data. More over i have requested multiple times and it seems thats individual request has queued in rpc queue but not cleared out.Because if i run only server script i am getting same error. May be i am missing something here can anyone help me to figure it out. i just want to return json data to client Thanks in advance Kalyan
You're passing incompatible type (looks like either
bytes
orbytearray
) wherestr
is expected.You should
decode
the content to string first.