Here is my code; the key point is sharing the tweets variable between processes with multiprocessing:
import urllib, urllib2, json, re, datetime, sys, cookielib
from .. import models
from pyquery import PyQuery
from multiprocessing import Process, Pool, Queue, Manager
import os, time, random

def crawl_and_write(q, tweets):
    for tweetHTML in tweets:
        tweetPQ = PyQuery(tweetHTML)
        tweet = models.Tweet()
        usernameTweet = tweetPQ("span:first.username.u-dir b").text()
        txt = re.sub(r"\s+", " ", tweetPQ("p.js-tweet-text").text().replace('# ', '#').replace('@ ', '@'))
        retweets = int(tweetPQ("span.ProfileTweet-action--retweet span.ProfileTweet-actionCount").attr(
            "data-tweet-stat-count").replace(",", ""))
        favorites = int(tweetPQ("span.ProfileTweet-action--favorite span.ProfileTweet-actionCount").attr(
            "data-tweet-stat-count").replace(",", ""))
        dateSec = int(tweetPQ("small.time span.js-short-timestamp").attr("data-time"))
        id = tweetPQ.attr("data-tweet-id")
        permalink = tweetPQ.attr("data-permalink-path")
        geo = ''
        geoSpan = tweetPQ('span.Tweet-geo')
        if len(geoSpan) > 0:
            geo = geoSpan.attr('title')
        tweet.id = id
        tweet.permalink = 'https://twitter.com' + permalink
        tweet.username = usernameTweet
        tweet.text = txt
        tweet.date = datetime.datetime.fromtimestamp(dateSec)
        tweet.retweets = retweets
        tweet.favorites = favorites
        tweet.mentions = " ".join(re.compile('(@\\w*)').findall(tweet.text))
        tweet.hashtags = " ".join(re.compile('(#\\w*)').findall(tweet.text))
        tweet.geo = geo
        # put the parsed tweet on the shared queue
        q.put(tweet)

def read_result(q):
    while True:
        # get a tweet from the queue (blocks until one is available)
        tweet = q.get(True)
        results.append(tweet)
        resultsAux.append(tweet)
        if receiveBuffer and len(resultsAux) >= bufferLength:
            receiveBuffer(resultsAux)
            length += len(resultsAux)
            # report the completion percentage of the process
            percent = length / float(tweetCriteria.maxTweets) * 100
            resultsAux = []
            print '%.2f %% of the required tweets collected, we have %s tweets now' % (percent, length)
        if tweetCriteria.maxTweets > 0 and len(results) >= tweetCriteria.maxTweets:
            active = False

def getTweets(tweetCriteria, receiveBuffer=None, bufferLength=100, proxy=None):
    refreshCursor = ''
    length = 0
    results = []
    resultsAux = []
    cookieJar = cookielib.CookieJar()

    if hasattr(tweetCriteria, 'username') and (tweetCriteria.username.startswith("'") or tweetCriteria.username.startswith('"')) and (tweetCriteria.username.endswith("'") or tweetCriteria.username.endswith('"')):
        tweetCriteria.username = tweetCriteria.username[1:-1]

    active = True
    manager = Manager()

    while active:
        json = TweetManager.getJsonReponse(tweetCriteria, refreshCursor, cookieJar, proxy)
        if len(json['items_html'].strip()) == 0:
            break
        refreshCursor = json['min_position']
        tweets = PyQuery(json['items_html'])('div.js-stream-tweet')
        tweets = manager(tweets)
        if len(tweets) == 0:
            break
        pw = Process(target=crawl_and_write, args=(q, tweets))
        pr = Process(target=read_result, args=(q,))
        # start the producer (crawl) process
        pw.start()
        # start the consumer (read) process
        pr.start()
        # wait for the producer to finish
        pw.join()

However, this raises an error at the line tweets = manager(tweets):

pickle.PicklingError: Can't pickle <class 'pyquery.pyquery.NoDefault'>: attribute lookup pyquery.pyquery.NoDefault failed

I guess it is because of the pyquery type, but I still have no idea how to fix it. Any thoughts would be helpful.
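
A minimal sketch of what I believe is the underlying problem, without any of the crawling code (assuming only pyquery is installed; the exact exception type and message may differ between pyquery/lxml versions):

import pickle
from pyquery import PyQuery

tweets = PyQuery('<div class="js-stream-tweet">hi</div>')('div.js-stream-tweet')
# multiprocessing serializes shared data with pickle under the hood,
# and PyQuery objects wrap lxml elements, which are not picklable
pickle.dumps(tweets)  # raises a pickling error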

Manager uses pickle to serialize objects into a byte stream and then transfers that stream between processes. Since not every object can be serialized (PyQuery objects wrap lxml elements, which are not picklable), you may not be able to share the PyQuery result itself this way.
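
Note also that a Manager instance is not callable, so tweets = manager(tweets) would fail even for picklable data; manager.list(tweets) would be the usual spelling, but it still pickles each element. A common workaround is to pass only plain, picklable data between processes, e.g. each tweet's HTML as a string, and rebuild the PyQuery object inside the worker. A rough sketch of the idea (not a drop-in replacement for your getTweets; names like tweet_htmls are mine):

from multiprocessing import Process, Queue
from lxml import etree
from pyquery import PyQuery

def crawl_and_write(q, tweet_htmls):
    # tweet_htmls is a list of plain strings, which pickle handles fine
    for html in tweet_htmls:
        tweetPQ = PyQuery(html)  # rebuild the PyQuery object inside the worker
        q.put(tweetPQ.text())    # put only picklable values (here, strings) on the queue

if __name__ == '__main__':
    items_html = '<div class="js-stream-tweet"><p class="js-tweet-text">hello</p></div>'
    tweets = PyQuery(items_html)('div.js-stream-tweet')
    # serialize each matched element back to an HTML string *before*
    # crossing the process boundary
    tweet_htmls = [etree.tostring(el) for el in tweets]
    q = Queue()
    pw = Process(target=crawl_and_write, args=(q, tweet_htmls))
    pw.start()
    pw.join()
    # q.empty() is good enough for a small sketch; a sentinel value
    # on the queue is more robust in real code
    while not q.empty():
        print(q.get())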