How to accept a Twitter stream using tweepy in a streamparse spout and pass the tweets to a bolt?


I recently started working with Storm, and since I am more comfortable with Python, I decided to use streamparse. I plan to consume the Twitter stream in a spout and perform some computations in a bolt, but I cannot figure out how to code the spout. I have gone through various streamparse tutorials, but they all show a spout emitting tuples from a static list; none of them read from a live stream like the one the Twitter Streaming API provides. This is my code for Storm:

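import itertools

from streamparse.spout import Spout


class WordSpout(Spout):

    def initialize(self, stormconf, context):
        # Cycle through a fixed list of words forever
        self.words = itertools.cycle(['dog', 'cat', 'zebra', 'elephant'])

    def next_tuple(self):
        # Storm calls this repeatedly; emit one word per call
        word = next(self.words)
        self.emit([word])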
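For context, a streamparse spout is pull-based: Storm calls next_tuple() over and over, and each call should emit a tuple (or nothing) and return quickly. The stdlib-only sketch below imitates that driver loop with the same word-cycling logic; MiniSpout and run_spout are hypothetical stand-ins for streamparse's Spout and Storm's scheduler, not their real APIs.

```python
import itertools


class MiniSpout:
    """Hypothetical stand-in for streamparse's Spout: it just records emitted tuples."""

    def __init__(self):
        self.emitted = []

    def emit(self, tup):
        self.emitted.append(tup)


class WordSpout(MiniSpout):
    def initialize(self, stormconf, context):
        self.words = itertools.cycle(['dog', 'cat', 'zebra', 'elephant'])

    def next_tuple(self):
        # Called repeatedly by the driver; emits one word and returns immediately
        self.emit([next(self.words)])


def run_spout(spout, calls):
    """Hypothetical driver imitating Storm: initialize once, then poll next_tuple()."""
    spout.initialize({}, None)
    for _ in range(calls):
        spout.next_tuple()
    return spout.emitted


print(run_spout(WordSpout(), 5))  # [['dog'], ['cat'], ['zebra'], ['elephant'], ['dog']]
```

Because control has to return to the driver after every call, a blocking call such as tweepy's twitterStream.filter() cannot simply be placed inside next_tuple().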

This is my code for tweepy:

from tweepy import OAuthHandler, Stream, StreamListener  # tweepy 3.x API


class listener(StreamListener):

    def on_status(self, status):
        print(status.text)
        print("--------------------------------")
        return True

    def on_error(self, status):
        print("error", status)

    def on_connect(self):
        print("CONNECTED")


# ckey, csecret, atoken and asecret hold your Twitter app credentials
auth = OAuthHandler(ckey, csecret)
auth.set_access_token(atoken, asecret)

twitterStream = Stream(auth, listener())
twitterStream.filter(track=["california"])
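tweepy works the other way round: filter() keeps the connection open and pushes each tweet to the listener's on_status() callback, and in tweepy 3.x returning False from on_status() disconnects the stream. The sketch below imitates that push model with a plain list of strings; CollectingListener and fake_filter are hypothetical, not tweepy's API.

```python
class CollectingListener:
    """Hypothetical listener using the same callback name tweepy 3.x uses."""

    def __init__(self, limit):
        self.limit = limit
        self.received = []

    def on_status(self, status):
        self.received.append(status)
        # Returning False asks the stream to disconnect
        return len(self.received) < self.limit


def fake_filter(listener, incoming):
    """Hypothetical stand-in for Stream.filter(): push each item until the listener returns False."""
    for status in incoming:
        if listener.on_status(status) is False:
            break


demo = CollectingListener(limit=2)
fake_filter(demo, ["tweet one", "tweet two", "tweet three"])
print(demo.received)  # ['tweet one', 'tweet two']
```

The caller of filter() stays blocked until the stream disconnects, so the tweets have to be handed off through some buffer before a spout can pull them.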

How should I integrate these two pieces of code?

Answer by CoolestNerdIII:

To do this, I set up a Kafka queue: the tweepy listener splits each status.text into words and writes them to a Kafka topic using pykafka, and the spout continuously reads from that topic to perform the analytics. My code looks a bit like this:

listener.py:

import tweepy
from pykafka import KafkaClient

# API_KEY, API_SECRET, ACCESS_TOKEN and ACCESS_TOKEN_SECRET hold your Twitter app credentials


class MyStreamListener(tweepy.StreamListener):  # tweepy 3.x API

    def __init__(self, producer):
        super(MyStreamListener, self).__init__()
        # Reuse one Kafka producer instead of opening a new connection per tweet
        self.producer = producer

    def on_status(self, status):
        # Send every word of the tweet to Kafka as a separate message;
        # pykafka expects bytes, so encode the text explicitly
        for word in status.text.split():
            self.producer.produce(word.encode('utf-8'))

    def on_error(self, status_code):
        if status_code == 420:  # rate limited: disconnect rather than retry
            return False
        print("Failing with status code " + str(status_code))
        return False


client = KafkaClient(hosts='127.0.0.1:9092')
topic = client.topics['tweets']

auth = tweepy.OAuthHandler(API_KEY, API_SECRET)
auth.set_access_token(ACCESS_TOKEN, ACCESS_TOKEN_SECRET)

with topic.get_producer(delivery_reports=False) as producer:
    myStream = tweepy.Stream(auth=auth, listener=MyStreamListener(producer))
    myStream.filter(track=['is'])  # blocks while the stream is connected
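The listener's per-tweet work is just splitting the text into words and turning each word into bytes, since pykafka's producer only accepts bytes payloads. Pulled out into a plain function (tweet_to_messages is a hypothetical name, not part of tweepy or pykafka), that step can be checked without Twitter or Kafka:

```python
def tweet_to_messages(text):
    """Split a tweet into words and encode each word as UTF-8 bytes for a Kafka producer."""
    # str.split() with no argument also drops the empty strings left by repeated spaces
    return [word.encode('utf-8') for word in text.split()]


text = "Sunny  day in California \u2600"  # two spaces and one non-ASCII character
messages = tweet_to_messages(text)
print(messages)  # [b'Sunny', b'day', b'in', b'California', b'\xe2\x98\x80']

# The spout side reverses the encoding with message.value.decode('utf-8')
print([m.decode('utf-8') for m in messages] == text.split())  # True
```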

Spout File:

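from streamparse.spout import Spout
from pykafka import KafkaClient


class TweetSpout(Spout):

    def initialize(self, stormconf, context):
        client = KafkaClient(hosts='127.0.0.1:9092')
        self.topic = client.topics['tweets']
        # Create the consumer once and reuse it on every next_tuple() call
        self.consumer = self.topic.get_simple_consumer()

    def next_tuple(self):
        # next_tuple() must return quickly, so poll once without blocking;
        # consume(block=False) returns None when no message is waiting
        message = self.consumer.consume(block=False)
        if message is not None:
            self.emit([message.value.decode('utf-8')])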
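If running Kafka is more infrastructure than you need, one alternative (a different technique from the one in this answer) is to run the tweet stream in a background thread inside the spout's own process and hand statuses to next_tuple() through a thread-safe queue.Queue; in a real spout the thread would be started from initialize(self, stormconf, context) and a tweepy listener's on_status() would call put(). The stdlib-only sketch below shows just that hand-off: fake_stream stands in for a blocking tweepy stream and QueueSpout for a streamparse spout, and both are hypothetical. Unlike Kafka, an in-memory queue loses any buffered tweets if the worker process dies.

```python
import queue
import threading


def fake_stream(on_status, statuses):
    """Hypothetical stand-in for a blocking tweepy stream: pushes each status to a callback."""
    for status in statuses:
        on_status(status)


class QueueSpout:
    """Hypothetical spout: a background thread fills a queue, next_tuple() drains it."""

    def __init__(self):
        self.emitted = []

    def emit(self, tup):
        self.emitted.append(tup)

    def initialize(self, statuses):
        # Bounded queue: if the topology falls behind, put() blocks the stream thread
        self.buffer = queue.Queue(maxsize=1000)
        # Run the blocking stream in a daemon thread so next_tuple() never waits on it
        self.thread = threading.Thread(
            target=fake_stream, args=(self.buffer.put, statuses), daemon=True)
        self.thread.start()

    def next_tuple(self):
        try:
            status = self.buffer.get_nowait()
        except queue.Empty:
            return  # nothing buffered yet; the next call will try again
        self.emit([status])


spout = QueueSpout()
spout.initialize(["first tweet", "second tweet"])
spout.thread.join()  # demo only: wait until the fake stream has pushed everything
for _ in range(3):
    spout.next_tuple()
print(spout.emitted)  # [['first tweet'], ['second tweet']]
```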