I am streaming Twitter data with Flume, but I am getting an error that seems to come from the twitter4j JAR files:
01:42:03.075 [Twitter Stream consumer / [1][Establishing connection]] ERROR org.apache.flume.source.twitter.TwitterSource - Exception while streaming tweets
twitter4j.TwitterException: https://stream.twitter.com/1.1/statuses/sample.json?stall_warnings=true
at twitter4j.HttpClientImpl.handleRequest(HttpClientImpl.java:185) ~[twitter4j-core-4.0.7.jar:4.0.7]
at twitter4j.HttpClientBase.request(HttpClientBase.java:57) ~[twitter4j-core-4.0.7.jar:4.0.7]
at twitter4j.HttpClientBase.get(HttpClientBase.java:75) ~[twitter4j-core-4.0.7.jar:4.0.7]
at twitter4j.TwitterStreamImpl.getSampleStream(TwitterStreamImpl.java:201) ~[twitter4j-stream-4.0.7.jar:4.0.7]
at twitter4j.TwitterStreamImpl$4.getStream(TwitterStreamImpl.java:170) ~[twitter4j-stream-4.0.7.jar:4.0.7]
at twitter4j.TwitterStreamImpl$TwitterStreamConsumer.run(TwitterStreamImpl.java:570) ~[twitter4j-stream-4.0.7.jar:4.0.7]
Caused by: java.io.FileNotFoundException: https://stream.twitter.com/1.1/statuses/sample.json?stall_warnings=true
at jdk.internal.reflect.DirectConstructorHandleAccessor.newInstance(DirectConstructorHandleAccessor.java:67) ~[?:?]
at java.lang.reflect.Constructor.newInstanceWithCaller(Constructor.java:499) ~[?:?]
at java.lang.reflect.Constructor.newInstance(Constructor.java:483) ~[?:?]
at sun.net.www.protocol.http.HttpURLConnection$10.run(HttpURLConnection.java:2048) ~[?:?]
at sun.net.www.protocol.http.HttpURLConnection$10.run(HttpURLConnection.java:2043) ~[?:?]
at java.security.AccessController.doPrivileged(AccessController.java:569) ~[?:?]
at sun.net.www.protocol.http.HttpURLConnection.getChainedException(HttpURLConnection.java:2042) ~[?:?]
at sun.net.www.protocol.http.HttpURLConnection.getInputStream0(HttpURLConnection.java:1609) ~[?:?]
at sun.net.www.protocol.http.HttpURLConnection.getInputStream(HttpURLConnection.java:1589) ~[?:?]
at sun.net.www.protocol.https.HttpsURLConnectionImpl.getInputStream(HttpsURLConnectionImpl.java:224) ~[?:?]
at twitter4j.HttpResponseImpl.<init>(HttpResponseImpl.java:50) ~[twitter4j-core-4.0.7.jar:4.0.7]
at twitter4j.HttpClientImpl.handleRequest(HttpClientImpl.java:149) ~[twitter4j-core-4.0.7.jar:4.0.7]
... 5 more
Caused by: java.io.FileNotFoundException: https://stream.twitter.com/1.1/statuses/sample.json?stall_warnings=true
at sun.net.www.protocol.http.HttpURLConnection.getInputStream0(HttpURLConnection.java:1993) ~[?:?]
at sun.net.www.protocol.http.HttpURLConnection.getInputStream(HttpURLConnection.java:1589) ~[?:?]
at java.net.HttpURLConnection.getResponseCode(HttpURLConnection.java:529) ~[?:?]
at sun.net.www.protocol.https.HttpsURLConnectionImpl.getResponseCode(HttpsURLConnectionImpl.java:308) ~[?:?]
at twitter4j.HttpResponseImpl.<init>(HttpResponseImpl.java:35) ~[twitter4j-core-4.0.7.jar:4.0.7]
at twitter4j.HttpClientImpl.handleRequest(HttpClientImpl.java:149) ~[twitter4j-core-4.0.7.jar:4.0.7]
... 5 more
Here is my flume_project.conf:
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# The configuration file needs to define the sources,
# the channels and the sinks.
# Sources, channels and sinks are defined per agent,
# in this case called 'TwitterAgent'
TwitterAgent.sources = Twitter
TwitterAgent.channels = MemChannel
# Java .properties syntax (which Flume's config loader uses) only treats '#' as a
# comment at the START of a line. The previous value "kafkasink #filesink" therefore
# declared a second sink literally named "#filesink". To use the file sink instead:
# TwitterAgent.sinks = filesink
TwitterAgent.sinks = kafkasink

#TwitterAgent.sources.Twitter.type = com.apache.flume.source.twitter.TwitterSource
# Flume's built-in Twitter source reads the Twitter v1.1 *sample* stream through
# twitter4j (GET https://stream.twitter.com/1.1/statuses/sample.json).
# The java.io.FileNotFoundException in the agent log is how HttpURLConnection
# reports an HTTP 404/410 from that URL. In other words, the endpoint itself is
# refusing the request, and swapping twitter4j jars will not fix it.
# NOTE(review): Twitter/X has retired the v1.1 streaming endpoints. Collecting
# tweets likely needs a source built on the current X API; verify against the
# X developer documentation for your access tier.
TwitterAgent.sources.Twitter.type = org.apache.flume.source.twitter.TwitterSource
TwitterAgent.sources.Twitter.channels = MemChannel
# This source type has no 'keywords' property (it always streams the sample
# feed), so the setting was ignored. Keyword filtering needs a different source
# implementation; the line is kept here for reference only.
#TwitterAgent.sources.Twitter.keywords = AWS Marketplace, GCP Marketplace, Azure Marketplace

#filesink
# TwitterAgent.sinks.filesink.type = file_roll
# TwitterAgent.sinks.filesink.channel = MemChannel
# TwitterAgent.sinks.filesink.sink.directory = /docker_share/twitter_sink
# TwitterAgent.sinks.filesink.sink.pathManager.extension = out
# TwitterAgent.sinks.filesink.sink.pathManager.prefix = project
# TwitterAgent.sinks.filesink.sink.rollInterval = 3600

#kafkasink
TwitterAgent.sinks.kafkasink.type = org.apache.flume.sink.kafka.KafkaSink
TwitterAgent.sinks.kafkasink.channel = MemChannel
# Current (Flume 1.7+) property names. 'topic', 'brokerList' and 'batchSize' are
# the deprecated aliases of kafka.topic, kafka.bootstrap.servers and flumeBatchSize.
TwitterAgent.sinks.kafkasink.kafka.topic = fp_mc
TwitterAgent.sinks.kafkasink.kafka.bootstrap.servers = fp_mc-kafka-1:9092
TwitterAgent.sinks.kafkasink.flumeBatchSize = 1
# add header and text to sink
# (sink.serializer is a file_roll sink option; KafkaSink does not read it)
# TwitterAgent.sinks.kafkasink.sink.serializer = header_and_text
# TwitterAgent.sinks.kafkasink.sink.serializer.appendNewline = true

TwitterAgent.channels.MemChannel.type = memory
TwitterAgent.channels.MemChannel.capacity = 10000
TwitterAgent.channels.MemChannel.transactionCapacity = 1000

# Twitter API credentials. Keep real values out of version control.
TwitterAgent.sources.Twitter.consumerKey = xxxx
TwitterAgent.sources.Twitter.consumerSecret = xxxx
TwitterAgent.sources.Twitter.accessToken = xxxx
TwitterAgent.sources.Twitter.accessTokenSecret = xxxx
I am using apache-flume-1.11.0.
Everything (ZooKeeper, Kafka and Flume) runs in Docker. I create the Kafka topic like this:
# 2. Create the Kafka topic from inside the first Kafka container.
# --replication-factor 2 requires at least two running brokers (presumably the
# kafka service scaled to two instances, fp_mc-kafka-1 and fp_mc-kafka-2).
# NOTE(review): --zookeeper was removed from kafka-topics.sh in Kafka 3.0; on a
# newer image use --bootstrap-server fp_mc-kafka-1:9092 instead.
docker exec -i fp_mc-kafka-1 kafka-topics.sh \
--create \
--zookeeper zookeeper:2181 \
--replication-factor 2 \
--partitions 2 \
--topic fp_mc
Flume itself runs in a JDK container. I use the following commands to start Flume in that container and collect the Twitter data:
# Open an interactive shell in the JDK container that has /docker_share mounted.
docker exec -it fp_mc-jdk-1 bash
# Inside that shell, start the Flume agent named TwitterAgent with console logging.
# The absolute path makes this work regardless of the shell's current directory
# (the original relative "docker_share/..." only worked from /).
# NOTE(review): -C adds the lib directory to the classpath; flume-ng presumably
# already includes $FLUME_HOME/lib, so this flag is likely redundant.
/docker_share/apache-flume-1.11.0-bin/bin/flume-ng agent \
  -c /docker_share/conf \
  -f /docker_share/conf/flume_project.conf \
  -n TwitterAgent \
  -C /docker_share/apache-flume-1.11.0-bin/lib \
  -Dflume.root.logger=DEBUG,console
Here is my Docker Compose file, Twitter_project.yaml:
version: '3.5'

# Stack: ZooKeeper + Kafka, a JDK container that runs Flume, and
# Elasticsearch/Logstash/Kibana, all on one user-defined bridge network.
networks:
  twitter-demo:
    name: twitter-demo-net
    driver: bridge

services:
  zookeeper:
    image: zookeeper
    ports:
      - "2181:2181"
    networks:
      - twitter-demo

  kafka:
    image: wurstmeister/kafka
    environment:
      # KAFKA_BROKER_ID: 1
      # (Hack for Mac) use this if you want the docker host node to be used as the broadcast IP
      HOSTNAME_COMMAND: "/sbin/ip route|awk '/src/ { print $$NF }'"
      # Use below for Linux
      # HOSTNAME_COMMAND: "ip route get 1.2.3.4 | awk '{print $$7}'"
      KAFKA_ADVERTISED_PORT: "9092"
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENERS: PLAINTEXT://kafka:9092
      # KAFKA_CREATE_TOPICS: "varnish_raw_logs:10:1"
    depends_on:
      - zookeeper
    ports:
      # Container port only; Docker picks a random host port, which lets the
      # service be scaled to more than one broker.
      - "9092"
    networks:
      - twitter-demo

  # jdk container is used to run flume
  jdk:
    image: openjdk
    depends_on:
      - kafka
      - zookeeper
    volumes:
      - "./docker_share:/docker_share"
    # Keep the container alive so Flume can be started with `docker exec`.
    command: 'tail -F anything'
    networks:
      - twitter-demo

  # elk container is used to run logstash, elasticsearch, and kibana
  # elk:
  #   image: sebp/elk
  #   volumes:
  #     - "./docker_share:/docker_share"
  #   depends_on:
  #     - kafka
  #     - zookeeper
  #   ports:
  #     - "5601:5601"
  #     - "9200:9200"
  #     - "5044:5044"
  #   networks:
  #     - twitter-demo

  elasticsearch:
    container_name: elasticsearch
    environment:
      - discovery.type=single-node
      # NOTE(review): the official image sets the built-in 'elastic' user's
      # password from ELASTIC_PASSWORD; ELASTIC_USERNAME appears unused. Confirm
      # against the Elasticsearch Docker documentation.
      - ELASTIC_USERNAME=elastic
      - ELASTIC_PASSWORD=elastic
    image: elasticsearch:8.5.2
    depends_on:
      - kafka
      - zookeeper
    ports:
      - "9200:9200"
      - "9300:9300"
    volumes:
      - "./docker_share:/docker_share"
    networks:
      - twitter-demo

  logstash:
    image: logstash:8.5.2
    volumes:
      - "./docker_share:/docker_share"
    ports:
      - "5044:5044"
    networks:
      - twitter-demo
    depends_on:
      - elasticsearch
    links:
      - elasticsearch

  kibana:
    image: kibana:8.5.2
    volumes:
      - "./docker_share:/docker_share"
    ports:
      - "5601:5601"
    networks:
      - twitter-demo
    depends_on:
      - elasticsearch
    links:
      - elasticsearch
I am trying to stream Twitter data using Flume. I expect to be able to run the following command to start a Kafka console consumer and see the tweets:
# Tail the fp_mc topic from the second Kafka container, connecting to the first
# broker. Without --from-beginning the console consumer only prints messages
# produced after it starts.
docker exec -i fp_mc-kafka-2 kafka-console-consumer.sh \
--bootstrap-server fp_mc-kafka-1:9092 \
--topic fp_mc
Instead, I receive nothing because of the twitter4j error shown above.