I have written custom flume sink, named MySink, whose process method is indicated in the first snippet below. I am getting an IllegalStateException as follows (detailed stack trace is available in the 2nd snippet below):
Caused by: java.lang.IllegalStateException: begin() called when transaction is OPEN!
QUESTION: I have followed the KafkaSink and similar existing sink implementations in flume code base while writing the process method and I am applying the very same transaction handling logic with those exiting sinks. Could you please tell me what is wrong in my process method here? How can I fix the problem?
PROCESS method (I have marked where the exception is thrown):
@Override
public Status process() throws EventDeliveryException {
Status status = Status.READY;
Channel ch = getChannel();
Transaction txn = ch.getTransaction();
Event event = null;
try {
LOG.info(getName() + " BEFORE txn.begin()");
//!!!! EXCEPTION IS THROWN in the following LINE !!!!!!
txn.begin();
LOG.info(getName() + " AFTER txn.begin()");
LOG.info(getName() + " BEFORE ch.take()");
event = ch.take();
LOG.info(getName() + " AFTER ch.take()");
if (event == null) {
// No event found, request back-off semantics from the sink runner
LOG.info(getName() + " - EVENT is null! ");
return Status.BACKOFF;
}
Map<String, String> keyValueMapInTheMessage = event.getHeaders();
if (!keyValueMapInTheMessage.isEmpty()) {
mDBWriter.insertDataToDB(keyValueMapInTheMessage);
}
LOG.info(getName() + " - EVENT: " + EventHelper.dumpEvent(event));
if (txn != null) {
txn.commit();
}
} catch (Exception ex) {
String errMsg = getName() + " - Failed to publish events. Exception: ";
LOG.info(errMsg);
status = Status.BACKOFF;
if (txn != null) {
try {
txn.rollback();
} catch (Exception e) {
LOG.info(getName() + " - EVENT: " + EventHelper.dumpEvent(event));
throw Throwables.propagate(e);
}
}
throw new EventDeliveryException(errMsg, ex);
} finally {
if (txn != null) {
txn.close();
}
}
return status;
}
EXCEPTION STACK:
2016-01-22 14:01:15,440 (SinkRunner-PollingRunner-DefaultSinkProcessor) [ERROR - org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:160)]
Unable to deliver event. Exception follows.
org.apache.flume.EventDeliveryException: MySink - Failed to publish events. Exception: at com.XYZ.flume.maprdb.MySink.process(MySink.java:116)
at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.IllegalStateException: begin() called when transaction is OPEN!
at com.google.common.base.Preconditions.checkState(Preconditions.java:145)
at org.apache.flume.channel.BasicTransactionSemantics.begin(BasicTransactionSemantics.java:131)
at com.XYZ.flume.maprdb.MySink.process(MySink.java:82)
... 3 more
this code causes this problem. when event is null, you just return it.however, the correct way is to commit or rollback.a transaction should go through three stages: begin, commit or rollback, finally close.we can see the following source code to find how it implements.
BasicChannelSemantics:
when currentTransaction is null or its State is close, channel will create a new one, otherwise return the old one. this exception does not happen immediately. when the first time execute the process method, you get a new transaction, but the event is null, you just return and finally close, the close method does not work because of its implement.so the second time execute the process method, you don't get a new transaction, it is the old one.the following code is about how transaction implement.
BasicTransactionSemantics:
when create, the state is new.
when begin, the state must be new, then state become open.
when commit or rollback, the state must be open, then state become complete.
when close, the state must be complete, then state become close.
so when you execute the close method in a right way, the next time you will get a new transaction, otherwise the old one which state must not be new, so you can't execute transaction.begin(), it needs a new one.