Trying to collect Twitter data using Flume and send it to Kafka, but getting an error regarding batch size

I believe this has something to do with my flume.conf file.

Flume_project.conf

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#  http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.


# The configuration file needs to define the sources, 
# the channels and the sinks.
# Sources, channels and sinks are defined per agent, 
# in this case called 'TwitterAgent'

TwitterAgent.sources = Twitter
TwitterAgent.channels = MemChannel
# Alternative sink (disabled): filesink
TwitterAgent.sinks = kafkasink

TwitterAgent.sources.Twitter.type = org.apache.flume.source.twitter.TwitterSource
#TwitterAgent.sources.Twitter.type = org.apache.flume.source.twitter.TwitterSource
TwitterAgent.sources.Twitter.channels = MemChannel
TwitterAgent.sources.Twitter.keywords = AWS Marketplace, GCP Marketplace, Azure Marketplace

#filesink
# TwitterAgent.sinks.filesink.type = file_roll
# TwitterAgent.sinks.filesink.channel = MemChannel
# TwitterAgent.sinks.filesink.sink.directory = /docker_share/twitter_sink
# TwitterAgent.sinks.filesink.sink.pathManager.extension = out
# TwitterAgent.sinks.filesink.sink.pathManager.prefix = project
# TwitterAgent.sinks.filesink.sink.rollInterval = 3600

#kafkasink
TwitterAgent.sinks.kafkasink.type = org.apache.flume.sink.kafka.KafkaSink
TwitterAgent.sinks.kafkasink.topic = fp_mc
TwitterAgent.sinks.kafkasink.brokerList = fp_mc-kafka-1:9092
TwitterAgent.sinks.kafkasink.channel = MemChannel
TwitterAgent.sinks.kafkasink.batchSize = 1
# add header and text to sink
# TwitterAgent.sinks.kafkasink.sink.serializer = header_and_text
# TwitterAgent.sinks.kafkasink.sink.serializer.appendNewline = true

TwitterAgent.channels.MemChannel.type = memory
TwitterAgent.channels.MemChannel.capacity = 10000
TwitterAgent.channels.MemChannel.transactionCapacity = 100

TwitterAgent.sources.Twitter.consumerKey = xxxx
TwitterAgent.sources.Twitter.consumerSecret = xxxx
TwitterAgent.sources.Twitter.accessToken = xxxx
TwitterAgent.sources.Twitter.accessTokenSecret = xxxx

Here is the error:

Info: Including Hive libraries found via () for Hive access
+ exec /usr/java/openjdk-18/bin/java -Xmx20m -Dflume.root.logger=DEBUG,console -cp '/docker_share/conf:/docker_share/apache-flume-1.11.0-bin/lib/*:/docker_share/apache-flume-1.11.0-bin/lib:/lib/*' -Djava.library.path= org.apache.flume.node.Application -f /docker_share/conf/flume_project.conf -n TwitterAgent
23:53:22.599 [main] ERROR org.apache.flume.node.AbstractConfigurationProvider - Source Twitter has been removed due to an error during configuration
java.lang.InstantiationException: Incompatible source and channel settings defined. source's batch size is greater than the channels transaction capacity. Source: Twitter, batch size = 1000, channel MemChannel, transaction capacity = 100
    at org.apache.flume.node.AbstractConfigurationProvider.checkSourceChannelCompatibility(AbstractConfigurationProvider.java:389) ~[flume-ng-node-1.11.0.jar:1.11.0]
    at org.apache.flume.node.AbstractConfigurationProvider.getSourceChannels(AbstractConfigurationProvider.java:370) ~[flume-ng-node-1.11.0.jar:1.11.0]
    at org.apache.flume.node.AbstractConfigurationProvider.loadSources(AbstractConfigurationProvider.java:332) ~[flume-ng-node-1.11.0.jar:1.11.0]
    at org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:108) ~[flume-ng-node-1.11.0.jar:1.11.0]
    at org.apache.flume.node.Application.main(Application.java:491) ~[flume-ng-node-1.11.0.jar:1.11.0]
23:53:22.632 [main] ERROR org.apache.flume.node.AbstractConfigurationProvider - Sink kafkasink has been removed due to an error during configuration
java.lang.InstantiationException: Incompatible sink and channel settings defined. sink's batch size is greater than the channels transaction capacity. Sink: kafkasink, batch size = 1000, channel MemChannel, transaction capacity = 100
    at org.apache.flume.node.AbstractConfigurationProvider.checkSinkChannelCompatibility(AbstractConfigurationProvider.java:406) ~[flume-ng-node-1.11.0.jar:1.11.0]
    at org.apache.flume.node.AbstractConfigurationProvider.loadSinks(AbstractConfigurationProvider.java:465) ~[flume-ng-node-1.11.0.jar:1.11.0]
    at org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:109) ~[flume-ng-node-1.11.0.jar:1.11.0]
    at org.apache.flume.node.Application.main(Application.java:491) ~[flume-ng-node-1.11.0.jar:1.11.0]

Am I missing something?

I am trying to collect Twitter data using Flume, send it to Kafka, and analyze it using Elasticsearch.

I have tried changing TwitterAgent.channels.MemChannel.transactionCapacity from 100 to 1000, but then I get the following error instead:

Info: Including Hive libraries found via () for Hive access
+ exec /usr/java/openjdk-18/bin/java -Xmx20m -Dflume.root.logger=DEBUG,console -cp '/docker_share/conf:/docker_share/apache-flume-1.11.0-bin/lib/*:/docker_share/apache-flume-1.11.0-bin/lib:/lib/*' -Djava.library.path= org.apache.flume.node.Application -f /docker_share/conf/flume_project.conf -n TwitterAgent
00:08:00.136 [Twitter Stream consumer /  [1][Establishing connection]] ERROR org.apache.flume.source.twitter.TwitterSource - Exception while streaming tweets
twitter4j.TwitterException: 403:The request is understood, but it has been refused. An accompanying error message will explain why. This code is used when requests are being denied due to update limits (https://support.twitter.com/articles/15364-about-twitter-limits-update-api-dm-and-following).
message - You currently have Essential access which includes access to Twitter API v2 endpoints only. If you need access to this endpoint, you’ll need to apply for Elevated access via the Developer Portal. You can learn more here: https://developer.twitter.com/en/docs/twitter-api/getting-started/about-twitter-api#v2-access-leve
code - 453

    at twitter4j.HttpClientImpl.handleRequest(HttpClientImpl.java:170) ~[twitter4j-core-4.0.7.jar:4.0.7]
    at twitter4j.HttpClientBase.request(HttpClientBase.java:57) ~[twitter4j-core-4.0.7.jar:4.0.7]
    at twitter4j.HttpClientBase.get(HttpClientBase.java:75) ~[twitter4j-core-4.0.7.jar:4.0.7]
    at twitter4j.TwitterStreamImpl.getSampleStream(TwitterStreamImpl.java:201) ~[twitter4j-stream-4.0.7.jar:4.0.7]
    at twitter4j.TwitterStreamImpl$4.getStream(TwitterStreamImpl.java:170) ~[twitter4j-stream-4.0.7.jar:4.0.7]
    at twitter4j.TwitterStreamImpl$TwitterStreamConsumer.run(TwitterStreamImpl.java:570) ~[twitter4j-stream-4.0.7.jar:4.0.7]

There is 1 answer

netdevmike

I was approved for Twitter Elevated API access, and the error is gone.
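
For the earlier batch-size error, a minimal sketch of the relevant settings follows. It keeps the channel's transactionCapacity at least as large as the batch size reported in the log (1000), which is the change described in the question. The commented-out alternative is an assumption and was not tested here: it relies on the `maxBatchSize` (Twitter source) and `flumeBatchSize` (Kafka sink) property names from the Flume user guide, so check them against the guide for your Flume version.

```properties
# Option 1 (the change tried in the question): raise the channel's
# transaction capacity so it is not smaller than the source and sink
# batch size (1000 according to the log).
TwitterAgent.channels.MemChannel.type = memory
TwitterAgent.channels.MemChannel.capacity = 10000
TwitterAgent.channels.MemChannel.transactionCapacity = 1000

# Option 2 (assumption, untested): keep transactionCapacity = 100 and
# lower the batch sizes instead. Property names as listed in the Flume
# user guide; verify them for your version before relying on this.
# TwitterAgent.sources.Twitter.maxBatchSize = 100
# TwitterAgent.sinks.kafkasink.flumeBatchSize = 100
```

As the second log in the question shows, raising transactionCapacity to 1000 replaced the configuration error with the 403, which is a separate matter of API access level.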