I recently started working with Storm and, being more comfortable with Python, decided to use streamparse. I plan to receive a Twitter stream in a spout and perform some computations in a bolt, but I cannot figure out how to code the spout. I have gone through various streamparse tutorials, but they all show a spout emitting tuples from a static list; none of them consume a continuous stream like the one the Twitter Streaming API provides. This is my code for Storm:

    import itertools

    from streamparse import Spout


    class WordSpout(Spout):
        def initialize(self, stormconf, context):
            # Cycle endlessly over a fixed list of words.
            self.words = itertools.cycle(['dog', 'cat', 'zebra', 'elephant'])

        def next_tuple(self):
            word = next(self.words)
            self.emit([word])

This is my code for tweepy:

    from tweepy import OAuthHandler, Stream
    from tweepy.streaming import StreamListener


    class listener(StreamListener):
        def on_status(self, status):
            print(status.text)
            print("--------------------------------")
            return True

        def on_error(self, status):
            print("error")

        def on_connect(self):
            print("CONNECTED")


    # ckey, csecret, atoken and asecret are the Twitter API credentials.
    auth = OAuthHandler(ckey, csecret)
    auth.set_access_token(atoken, asecret)
    twitterStream = Stream(auth, listener())
    twitterStream.filter(track=["california"])

How should I integrate these two pieces of code?
To do this, I set up a Kafka queue. The tweepy listener writes each status.text into the queue using pykafka, and the spout continuously reads from the queue to perform the analytics. My code looks a bit like this:
listener.py:
Spout File:
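
As a rough illustration of that hand-off (this is not the original listener.py or spout file), here is a minimal sketch of the pattern. It makes some loud assumptions: an in-process `queue.Queue` stands in for the Kafka topic, and `TweetListener` and `TweetSpout` are hypothetical plain classes standing in for the tweepy `StreamListener` and the streamparse `Spout`, so that the snippet runs without Kafka, Storm, or Twitter credentials. In a real setup the `put` would be a pykafka producer write and the `get_nowait` a non-blocking pykafka consumer read.

```python
import queue


class TweetListener:
    """Hypothetical stand-in for the tweepy StreamListener."""

    def __init__(self, buffer):
        self.buffer = buffer

    def on_status(self, text):
        # Assumption: the real listener would write status.text to Kafka here.
        self.buffer.put(text)
        return True


class TweetSpout:
    """Hypothetical stand-in for a streamparse Spout."""

    def __init__(self, buffer):
        self.buffer = buffer
        self.emitted = []

    def emit(self, tup):
        # A real Spout.emit sends the tuple into the topology;
        # this stand-in only records it so the flow can be inspected.
        self.emitted.append(tup)

    def next_tuple(self):
        # Never block in next_tuple: if nothing is waiting, return
        # immediately and let the caller invoke next_tuple again later.
        try:
            text = self.buffer.get_nowait()
        except queue.Empty:
            return
        self.emit([text])


buffer = queue.Queue()  # stand-in for the Kafka topic
tweet_listener = TweetListener(buffer)
spout = TweetSpout(buffer)

tweet_listener.on_status("hello from california")
spout.next_tuple()  # emits the queued tweet
spout.next_tuple()  # queue is empty, so nothing is emitted
print(spout.emitted)
```

The point of the design is that the listener and the spout never call each other directly. The listener only produces, the spout only consumes, and the queue in between absorbs bursts from the stream.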