Streaming Twitter Data with Flume - Twitter4J error

I am streaming Twitter data using Flume, but I am getting an error thrown from the twitter4j library (the twitter4j-core and twitter4j-stream 4.0.7 JAR files):

01:42:03.075 [Twitter Stream consumer /  [1][Establishing connection]] ERROR org.apache.flume.source.twitter.TwitterSource - Exception while streaming tweets
twitter4j.TwitterException: https://stream.twitter.com/1.1/statuses/sample.json?stall_warnings=true
    at twitter4j.HttpClientImpl.handleRequest(HttpClientImpl.java:185) ~[twitter4j-core-4.0.7.jar:4.0.7]
    at twitter4j.HttpClientBase.request(HttpClientBase.java:57) ~[twitter4j-core-4.0.7.jar:4.0.7]
    at twitter4j.HttpClientBase.get(HttpClientBase.java:75) ~[twitter4j-core-4.0.7.jar:4.0.7]
    at twitter4j.TwitterStreamImpl.getSampleStream(TwitterStreamImpl.java:201) ~[twitter4j-stream-4.0.7.jar:4.0.7]
    at twitter4j.TwitterStreamImpl$4.getStream(TwitterStreamImpl.java:170) ~[twitter4j-stream-4.0.7.jar:4.0.7]
    at twitter4j.TwitterStreamImpl$TwitterStreamConsumer.run(TwitterStreamImpl.java:570) ~[twitter4j-stream-4.0.7.jar:4.0.7]
Caused by: java.io.FileNotFoundException: https://stream.twitter.com/1.1/statuses/sample.json?stall_warnings=true
    at jdk.internal.reflect.DirectConstructorHandleAccessor.newInstance(DirectConstructorHandleAccessor.java:67) ~[?:?]
    at java.lang.reflect.Constructor.newInstanceWithCaller(Constructor.java:499) ~[?:?]
    at java.lang.reflect.Constructor.newInstance(Constructor.java:483) ~[?:?]
    at sun.net.www.protocol.http.HttpURLConnection$10.run(HttpURLConnection.java:2048) ~[?:?]
    at sun.net.www.protocol.http.HttpURLConnection$10.run(HttpURLConnection.java:2043) ~[?:?]
    at java.security.AccessController.doPrivileged(AccessController.java:569) ~[?:?]
    at sun.net.www.protocol.http.HttpURLConnection.getChainedException(HttpURLConnection.java:2042) ~[?:?]
    at sun.net.www.protocol.http.HttpURLConnection.getInputStream0(HttpURLConnection.java:1609) ~[?:?]
    at sun.net.www.protocol.http.HttpURLConnection.getInputStream(HttpURLConnection.java:1589) ~[?:?]
    at sun.net.www.protocol.https.HttpsURLConnectionImpl.getInputStream(HttpsURLConnectionImpl.java:224) ~[?:?]
    at twitter4j.HttpResponseImpl.<init>(HttpResponseImpl.java:50) ~[twitter4j-core-4.0.7.jar:4.0.7]
    at twitter4j.HttpClientImpl.handleRequest(HttpClientImpl.java:149) ~[twitter4j-core-4.0.7.jar:4.0.7]
    ... 5 more
Caused by: java.io.FileNotFoundException: https://stream.twitter.com/1.1/statuses/sample.json?stall_warnings=true
    at sun.net.www.protocol.http.HttpURLConnection.getInputStream0(HttpURLConnection.java:1993) ~[?:?]
    at sun.net.www.protocol.http.HttpURLConnection.getInputStream(HttpURLConnection.java:1589) ~[?:?]
    at java.net.HttpURLConnection.getResponseCode(HttpURLConnection.java:529) ~[?:?]
    at sun.net.www.protocol.https.HttpsURLConnectionImpl.getResponseCode(HttpsURLConnectionImpl.java:308) ~[?:?]
    at twitter4j.HttpResponseImpl.<init>(HttpResponseImpl.java:35) ~[twitter4j-core-4.0.7.jar:4.0.7]
    at twitter4j.HttpClientImpl.handleRequest(HttpClientImpl.java:149) ~[twitter4j-core-4.0.7.jar:4.0.7]
    ... 5 more
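For context on the trace: the nested `Caused by: java.io.FileNotFoundException` is how the JDK's `HttpURLConnection` reports an HTTP 404 (Not Found) or 410 (Gone) response when the response body is read; other error statuses surface as a plain `IOException`. So the trace most likely means the request to `https://stream.twitter.com/1.1/statuses/sample.json` reached a server that answered 404 or 410, rather than that a twitter4j JAR is missing. The self-contained Java sketch below reproduces that behaviour offline. It assumes a throwaway local `com.sun.net.httpserver.HttpServer` stands in for Twitter; the class name `NotFoundDemo` and the local URL path are illustrative only.

```java
import com.sun.net.httpserver.HttpServer;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.net.HttpURLConnection;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URL;

public class NotFoundDemo {

    /**
     * Starts a throwaway local HTTP server that answers every request with the given
     * status code, then reports how HttpURLConnection behaves when the body is read.
     */
    static String probe(int status) {
        HttpServer server;
        try {
            // Port 0 lets the OS pick a free port on the loopback interface.
            server = HttpServer.create(new InetSocketAddress("127.0.0.1", 0), 0);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        server.createContext("/", exchange -> {
            exchange.sendResponseHeaders(status, -1); // -1: no response body
            exchange.close();
        });
        server.start();
        try {
            // Hypothetical local path that only mimics the shape of the Twitter URL.
            URL url = URI.create("http://127.0.0.1:" + server.getAddress().getPort()
                    + "/1.1/statuses/sample.json").toURL();
            HttpURLConnection conn = (HttpURLConnection) url.openConnection();
            int code = conn.getResponseCode(); // returns the status instead of throwing
            try (InputStream ignored = conn.getInputStream()) {
                return code + " -> body readable";
            } catch (FileNotFoundException e) {
                // Thrown for 404 and 410, matching the "Caused by" lines of the trace.
                return code + " -> FileNotFoundException";
            } catch (IOException e) {
                // Other error statuses surface as a generic IOException.
                return code + " -> IOException";
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } finally {
            server.stop(0);
        }
    }

    public static void main(String[] args) {
        System.out.println(probe(200)); // 200 -> body readable
        System.out.println(probe(404)); // 404 -> FileNotFoundException
        System.out.println(probe(410)); // 410 -> FileNotFoundException
        System.out.println(probe(403)); // 403 -> IOException
    }
}
```

The local server only shows which statuses produce this exception type; it does not tell you which of the two statuses the Twitter endpoint actually returned.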

Here is my flume_project.conf:

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#  http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.


# The configuration file needs to define the sources, 
# the channels and the sinks.
# Sources, channels and sinks are defined per agent, 
# in this case called 'TwitterAgent'

TwitterAgent.sources = Twitter
TwitterAgent.channels = MemChannel
# TwitterAgent.sinks = filesink
TwitterAgent.sinks = kafkasink

#TwitterAgent.sources.Twitter.type = com.apache.flume.source.twitter.TwitterSource
TwitterAgent.sources.Twitter.type = org.apache.flume.source.twitter.TwitterSource
TwitterAgent.sources.Twitter.channels = MemChannel
TwitterAgent.sources.Twitter.keywords = AWS Marketplace, GCP Marketplace, Azure Marketplace

#filesink
# TwitterAgent.sinks.filesink.type = file_roll
# TwitterAgent.sinks.filesink.channel = MemChannel
# TwitterAgent.sinks.filesink.sink.directory = /docker_share/twitter_sink
# TwitterAgent.sinks.filesink.sink.pathManager.extension = out
# TwitterAgent.sinks.filesink.sink.pathManager.prefix = project
# TwitterAgent.sinks.filesink.sink.rollInterval = 3600

#kafkasink
TwitterAgent.sinks.kafkasink.type = org.apache.flume.sink.kafka.KafkaSink
TwitterAgent.sinks.kafkasink.topic = fp_mc
TwitterAgent.sinks.kafkasink.brokerList = fp_mc-kafka-1:9092
TwitterAgent.sinks.kafkasink.channel = MemChannel
TwitterAgent.sinks.kafkasink.batchSize = 1
# add header and text to sink
# TwitterAgent.sinks.kafkasink.sink.serializer = header_and_text
# TwitterAgent.sinks.kafkasink.sink.serializer.appendNewline = true

TwitterAgent.channels.MemChannel.type = memory
TwitterAgent.channels.MemChannel.capacity = 10000
TwitterAgent.channels.MemChannel.transactionCapacity = 1000

TwitterAgent.sources.Twitter.consumerKey = xxxx
TwitterAgent.sources.Twitter.consumerSecret = xxxx
TwitterAgent.sources.Twitter.accessToken = xxxx
TwitterAgent.sources.Twitter.accessTokenSecret = xxxx
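A side note on the agent file, unrelated to the twitter4j error: Flume's properties-file configuration follows `java.util.Properties` syntax, where `#` starts a comment only at the beginning of a line, and list-valued keys such as `sinks` hold whitespace-separated names. A line such as `TwitterAgent.sinks = kafkasink #filesink` is therefore read as the value `kafkasink #filesink`, i.e. two sink names. The Java sketch below checks this with the standard `Properties` parser; the class and method names are hypothetical, and the whitespace split is an assumption that mirrors how Flume's documentation lists multiple sinks.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

public class InlineCommentDemo {

    /** Parses properties text with java.util.Properties and splits one key's value on whitespace. */
    static List<String> namesFor(String propertiesText, String key) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(propertiesText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        String value = props.getProperty(key, "").trim();
        // Assumed to mirror Flume's treatment of space-separated component lists.
        return value.isEmpty() ? List.of() : Arrays.asList(value.split("\\s+"));
    }

    public static void main(String[] args) {
        String inline = "TwitterAgent.sinks = kafkasink #filesink\n";
        String separate = "# TwitterAgent.sinks = filesink\nTwitterAgent.sinks = kafkasink\n";
        System.out.println(namesFor(inline, "TwitterAgent.sinks"));   // [kafkasink, #filesink]
        System.out.println(namesFor(separate, "TwitterAgent.sinks")); // [kafkasink]
    }
}
```

Keeping the alternative sink on its own commented-out line avoids the stray `#filesink` component.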

I am using apache-flume-1.11.0.

Everything runs in Docker. I create the Kafka topic from the Kafka container:

# 2. create topic using kafka container
docker exec -i fp_mc-kafka-1 kafka-topics.sh \
    --create \
    --zookeeper zookeeper:2181 \
    --replication-factor 2 \
    --partitions 2 \
    --topic fp_mc

Flume runs in a JDK container. I use the following commands to start Flume in the JDK container and collect the Twitter data:

docker exec -it fp_mc-jdk-1 bash

/docker_share/apache-flume-1.11.0-bin/bin/flume-ng agent \
    -c /docker_share/conf \
    -f /docker_share/conf/flume_project.conf \
    -n TwitterAgent \
    -C /docker_share/apache-flume-1.11.0-bin/lib \
    -Dflume.root.logger=DEBUG,console

Here is my Twitter_project.yaml (Docker Compose file):

version: '3.5'

networks:
  twitter-demo:
    name: twitter-demo-net
    driver: bridge

services:
  zookeeper:
    image: zookeeper
    ports:
      - "2181:2181"
    networks:
      - twitter-demo

  kafka:
    image: wurstmeister/kafka
    environment:
      # KAFKA_BROKER_ID: 1
      # (Hack for Mac)use this if you want to have docker host node to be used as broadcast ip
      HOSTNAME_COMMAND: "/sbin/ip route|awk '/src/ { print $$NF }'"
      # Use below for Linux
      # HOSTNAME_COMMAND: "ip route get 1.2.3.4 | awk '{print $$7}'"
      KAFKA_ADVERTISED_PORT: 9092
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENERS: PLAINTEXT://kafka:9092
      # KAFKA_CREATE_TOPICS: "varnish_raw_logs:10:1"
    depends_on:
      - zookeeper
    ports:
      - 9092
    networks:
      - twitter-demo
  
  #jdk container is used to run flume
  jdk:
    image: openjdk
    depends_on:
        - kafka
        - zookeeper
    volumes:
        - "./docker_share:/docker_share"
    command: 'tail -F anything'
    networks:
        - twitter-demo
  
#  elk container is used to run logstash, elasticsearch, and kibana
#  elk:
#    image: sebp/elk
#    volumes:
#      - "./docker_share:/docker_share"
#    depends_on:
#      - kafka
#      - zookeeper
#    ports:
#    - "5601:5601"
#    - "9200:9200"
#    - "5044:5044"
#    networks:
#      - twitter-demo

  elasticsearch:
    container_name: elasticsearch
    environment:
      - discovery.type=single-node
      - ELASTIC_USERNAME=elastic
      - ELASTIC_PASSWORD=elastic

    image: elasticsearch:8.5.2
    depends_on:
      - kafka
      - zookeeper
    ports:
      - 9200:9200
      - 9300:9300
    volumes:
      - "./docker_share:/docker_share"
    networks:
      - twitter-demo

  logstash:
    image: logstash:8.5.2
    volumes:
      - "./docker_share:/docker_share"
    ports:
      - "5044:5044"
    networks:
      - twitter-demo
    depends_on:
      - elasticsearch
    links:
      - elasticsearch

  kibana:
    image: kibana:8.5.2
    volumes:
      - "./docker_share:/docker_share"
    ports:
      - "5601:5601"
    networks:
      - twitter-demo
    depends_on:
      - elasticsearch
    links:
      - elasticsearch

I am trying to stream Twitter data using Flume. I expected to run the following command to start a Kafka console consumer and see the tweets:

docker exec -i fp_mc-kafka-2 kafka-console-consumer.sh \
    --bootstrap-server fp_mc-kafka-1:9092 \
    --topic fp_mc

Instead, I receive nothing because of the twitter4j error.
