Why does an optional flume channel cause a non-optional flume channel to have problems?

I have what seems to be a simple Flume configuration that is giving me a lot of problems. Let me first describe the problem and then I'll list the configuration files.

I have 3 servers: Server1, Server2, Server3.

Server1: Netcat source / Syslogtcp source (I tested this with both netcat with no acks and syslogtcp), 2 memory channels, 2 Avro sinks (one per channel), replicating selector with the second memory channel optional.

Server2 and Server3: Avro source, memory channel, Kafka sink.

In my simulation, Server2 simulates "production" and thus cannot experience any data loss, whereas Server3 simulates "development" and data loss is fine. My assumption is that using 2 channels and 2 sinks will decouple the two servers from each other, so that if Server3 goes down, it won't affect Server2 (especially with the optional configuration option!). However, this is not the case. When I run my simulations and terminate Server3 with CTRL-C, Server2 slows down and its output to the Kafka sink slows to a crawl. When I resume the Flume agent on Server3, everything goes back to normal.

I didn't expect this behavior. What I expected was that because I have two channels and two sinks, if one channel and/or sink goes down, the other channel and/or sink shouldn't have a problem. Is this a limitation of Flume? Is this a limitation of my sources, sinks, or channels? Is there a way to run one Flume agent with multiple channels and sinks that are decoupled from each other? I really don't want to run a separate Flume agent on one machine for each "environment" (production and development). My config files are below so you can see exactly what I did:

SERVER1 (FIRST TIER AGENT)

#Describe the top level configuration    
agent.sources = mySource
agent.channels = defaultChannel1 defaultChannel2
agent.sinks = mySink1 mySink2

#Describe/configure the source
agent.sources.mySource.type = netcat
agent.sources.mySource.port = 6666
agent.sources.mySource.bind = 0.0.0.0
agent.sources.mySource.max-line-length = 150000
agent.sources.mySource.ack-every-event = false
#agent.sources.mySource.type = syslogtcp
#agent.sources.mySource.host = 0.0.0.0
#agent.sources.mySource.port = 7103
#agent.sources.mySource.eventSize = 150000
agent.sources.mySource.channels = defaultChannel1 defaultChannel2
agent.sources.mySource.selector.type = replicating
agent.sources.mySource.selector.optional = defaultChannel2

#Describe/configure the channel
agent.channels.defaultChannel1.type = memory
agent.channels.defaultChannel1.capacity = 5000
agent.channels.defaultChannel1.transactionCapacity = 200

agent.channels.defaultChannel2.type = memory
agent.channels.defaultChannel2.capacity = 5000
agent.channels.defaultChannel2.transactionCapacity = 200

#Avro Sink
agent.sinks.mySink1.channel = defaultChannel1
agent.sinks.mySink1.type = avro
agent.sinks.mySink1.hostname = Server2
agent.sinks.mySink1.port = 6666

agent.sinks.mySink2.channel = defaultChannel2
agent.sinks.mySink2.type = avro
agent.sinks.mySink2.hostname = Server3
agent.sinks.mySink2.port = 6666

SERVER2 "PROD" FLUME AGENT

#Describe the top level configuration
agent.sources = mySource
agent.channels = defaultChannel
agent.sinks = mySink

#Describe/configure the source
agent.sources.mySource.type = avro
agent.sources.mySource.port = 6666
agent.sources.mySource.bind = 0.0.0.0
agent.sources.mySource.max-line-length = 150000
agent.sources.mySource.channels = defaultChannel

#Describe/configure the interceptor
agent.sources.mySource.interceptors = myInterceptor
agent.sources.mySource.interceptors.myInterceptor.type = myInterceptor$Builder

#Describe/configure the channel
agent.channels.defaultChannel.type = memory
agent.channels.defaultChannel.capacity = 5000
agent.channels.defaultChannel.transactionCapacity = 200

#Describe/configure the sink
agent.sinks.mySink.type = org.apache.flume.sink.kafka.KafkaSink
agent.sinks.mySink.topic = Server2-topic
agent.sinks.mySink.brokerList = broker1:9092, broker2:9092
agent.sinks.mySink.requiredAcks = -1
agent.sinks.mySink.batchSize = 100
agent.sinks.mySink.channel = defaultChannel

SERVER3 "DEV" FLUME AGENT

#Describe the top level configuration
agent.sources = mySource
agent.channels = defaultChannel
agent.sinks = mySink

#Describe/configure the source
agent.sources.mySource.type = avro
agent.sources.mySource.port = 6666
agent.sources.mySource.bind = 0.0.0.0
agent.sources.mySource.max-line-length = 150000
agent.sources.mySource.channels = defaultChannel

#Describe/configure the interceptor
agent.sources.mySource.interceptors = myInterceptor
agent.sources.mySource.interceptors.myInterceptor.type = myInterceptor$Builder

#Describe/configure the channel
agent.channels.defaultChannel.type = memory
agent.channels.defaultChannel.capacity = 5000
agent.channels.defaultChannel.transactionCapacity = 200

#Describe/configure the sink
agent.sinks.mySink.type = org.apache.flume.sink.kafka.KafkaSink
agent.sinks.mySink.topic = Server3-topic
agent.sinks.mySink.brokerList = broker1:9092, broker2:9092
agent.sinks.mySink.requiredAcks = -1
agent.sinks.mySink.batchSize = 100
agent.sinks.mySink.channel = defaultChannel 

Thanks for your help!

There is 1 answer

miller2j

I would look at tweaking these configuration parameters, as they relate to the memory channel:

agent.channels.defaultChannel.capacity = 5000
agent.channels.defaultChannel.transactionCapacity = 200

Try doubling them first and run the test again; you should see improvements:

agent.channels.defaultChannel.capacity = 10000
agent.channels.defaultChannel.transactionCapacity = 400

It would also be good to observe the JVMs of the Apache Flume instances during your tests.
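
Applied to the first-tier agent from the question, whose channels are named defaultChannel1 and defaultChannel2 rather than defaultChannel, the doubling might look like the sketch below. The keep-alive line is an assumption that goes beyond the suggestion above: keep-alive is the memory channel's timeout in seconds for adding or removing an event (default 3), so lowering it on the optional channel may shorten how long a put waits when that channel is full. The values are illustrative and untested against this setup.

```properties
# Sketch for the Server1 (first tier) agent only; values are illustrative.
# Doubled capacities, applied to both memory channels.
agent.channels.defaultChannel1.type = memory
agent.channels.defaultChannel1.capacity = 10000
agent.channels.defaultChannel1.transactionCapacity = 400

agent.channels.defaultChannel2.type = memory
agent.channels.defaultChannel2.capacity = 10000
agent.channels.defaultChannel2.transactionCapacity = 400

# Assumption, not part of the original suggestion: shorten the wait
# (in seconds, default 3) for puts/takes on the optional channel.
agent.channels.defaultChannel2.keep-alive = 1
```

If the slowdown on Server2 persists with larger capacities, that would suggest the optional channel filling up is the bottleneck rather than its size.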