I want to use Flume to collect logs from Python scripts, so I followed the user guide to configure Flume with a netcat source. Testing it with telnet and nc works well.
My config code:
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
a1.sinks.k1.type = logger
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Then I connect to Flume from Python and send some lines to it like this:
import socket

def netcat(hostname, port):
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s.connect((hostname, port))
    # Bytes literals so the script runs on both Python 2 and Python 3
    s.send(b"test words 1\n")
    s.send(b"test words 2\n")
    s.send(b"test words 3\n")
    s.send(b"test words 4\n")
    s.shutdown(socket.SHUT_WR)
    s.close()

if __name__ == "__main__":
    netcat("127.0.0.1", 44444)
The problem: Flume only receives 2 of the 4 lines. Flume logs:
2016-12-28 16:44:32,248 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.source.NetcatSource.start(NetcatSource.java:169)] Created serverSocket:sun.nio.ch.ServerSocketChannelImpl[/127.0.0.1:44444]
2016-12-28 16:44:41,814 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 74 65 73 74 20 77 6F 72 64 73 20 31 test words 1 }
2016-12-28 16:44:41,815 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 74 65 73 74 20 77 6F 72 64 73 20 32 test words 2 }
I get the same result on both Ubuntu with Java 1.8 and CentOS with Java 1.7, and the same result with the telnet module in Python.
Is there anything wrong with my config or Python script? Does anyone have advice for this case?
This happens because your script does not wait for the responses to come back. By default, Flume's netcat source sends an "OK" message back for every event. You close the connection before those responses can be sent, which causes the processing of further messages to fail (the pipe has been broken from the client end).
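To fix this, add the following line to your flume.conf, which turns off the per-event acknowledgement on the netcat source:
a1.sources.r1.ack-every-event = false
With this set, the source no longer tries to send an "OK" for each event, so closing the connection early does not cause the failure.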
Alternatively, you can change your Python script to wait for each "OK" message before closing the connection. Adding a sleep statement before the close should also solve the problem, although you would be assuming how long it takes to process your messages. That is normally fine, but other circumstances could delay processing.
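A minimal sketch of the "wait for the OK" approach, assuming Python 3 and a netcat source left at its default of acknowledging every event with one response line. The function name `send_lines` and the `timeout` parameter are made up for illustration and are not part of Flume or of the original script:

```python
import socket


def send_lines(hostname, port, lines, timeout=5.0):
    """Send each line, then wait for the source's one-line response to it.

    Hypothetical helper: assumes the server answers every received line
    with exactly one newline-terminated response (e.g. "OK").
    """
    acks = []
    with socket.create_connection((hostname, port), timeout=timeout) as s, \
            s.makefile("rb") as reader:
        for line in lines:
            s.sendall(line.encode("utf-8") + b"\n")
            # Block here until the response for this event arrives
            ack = reader.readline()
            if not ack:
                raise ConnectionError("connection closed before acknowledgement")
            acks.append(ack.decode("utf-8").strip())
        # Only signal end-of-input once every response has been read
        s.shutdown(socket.SHUT_WR)
    return acks
```

Called as `send_lines("127.0.0.1", 44444, ["test words 1", "test words 2", "test words 3", "test words 4"])` against the agent configured in the question, this should return one response per line. Unlike a fixed sleep, it makes no guess about processing time, although the timeout still bounds how long it will wait.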