I'm developing an error-handling mechanism for NiFi flow files. For example, a database sub-system refuses to write the data from a flow file because the data is not as expected, since the source system of the data is missing some master data.
This error handling writes the data into a MongoDB together with additional information about what went wrong.
One piece of that additional information is a kind of stack trace for the flow file, i.e. its data lineage. For this purpose I wrote an InvokeScriptedProcessor with a Groovy script.
Here is the important part of the script:
ArrayList getStacktrace(flowfileUuid) {
    def lineage = this.provenanceRepository.createLineageQuery(flowfileUuid)
    def lineageData = this.provenanceRepository.getLineageData(lineage.id)

    if (lineageData.results == null || lineageData.results.nodes.size() == 0) {
        println "cannot find stacktrace for ${flowfileUuid}."
        return []
    }

    // collect the IDs of all EVENT nodes in ascending order
    def eventIds = lineageData.results.nodes
        .findAll { n -> n.type == 'EVENT' }
        .collect { n -> n.id }
        .sort()

    // resolve every event to the name of the component that emitted it
    def provenanceEvents = []
    for (eventId in eventIds) {
        provenanceEvents << this.provenanceRepository.getProvenanceEvent(eventId).provenanceEvent.componentName
    }

    this.provenanceRepository.deleteLineageQuery(lineage.id)
    return provenanceEvents
}
For createLineageQuery I'm POSTing to the NiFi REST API at /nifi-api/provenance/lineage, with the UUID of the flow file in the body. The result contains, among other things, the ID of the query. I'm using this ID for getLineageData; the response also has a property finished, and I'm waiting until the query is finished. With this lineage data I call getProvenanceEvent and write the name of the component (processor) into an array. After that I deleteLineageQuery, as stated in the documentation. So this would be my stack trace.
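The REST round trip described above can be sketched roughly like this in plain Groovy. This is a hedged illustration, not my actual helper class: the base URL, the unauthenticated connection, and the exact JSON layout are assumptions to check against your NiFi version's REST API docs, and `flowfileUuid` is assumed to be in scope.

```groovy
import groovy.json.JsonOutput
import groovy.json.JsonSlurper

def baseUrl = 'http://localhost:8080/nifi-api'  // assumed NiFi instance
def slurper = new JsonSlurper()

// 1. POST the lineage query for the flow file UUID
def createConn = new URL("${baseUrl}/provenance/lineage").openConnection()
createConn.requestMethod = 'POST'
createConn.doOutput = true
createConn.setRequestProperty('Content-Type', 'application/json')
def body = JsonOutput.toJson(
    [lineage: [request: [uuid: flowfileUuid, lineageRequestType: 'FLOWFILE']]])
createConn.outputStream.withWriter { it << body }
def lineage = slurper.parse(createConn.inputStream).lineage

// 2. poll the query by its ID until 'finished' is true
while (!lineage.finished) {
    sleep(200)
    lineage = slurper.parse(
        new URL("${baseUrl}/provenance/lineage/${lineage.id}").openStream()).lineage
}

// 3. resolve every EVENT node to its component name, in event order
def componentNames = lineage.results.nodes
    .findAll { it.type == 'EVENT' }
    .sort { it.id }
    .collect { node ->
        slurper.parse(new URL("${baseUrl}/provenance-events/${node.id}").openStream())
            .provenanceEvent.componentName
    }

// 4. DELETE the lineage query, as the API documentation requires
def deleteConn = new URL("${baseUrl}/provenance/lineage/${lineage.id}").openConnection()
deleteConn.requestMethod = 'DELETE'
deleteConn.inputStream.close()
```

Note that the polling loop runs unbounded here; a real implementation should cap the number of retries so a query that never finishes doesn't hang the processor.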
The problem now is that the lineage data is empty when the flow file first hits this InvokeScriptedProcessor. I tried a lot of things, like waiting between requests, but it doesn't help.
The odd thing is that the lineage data is not empty when I replay the flow file through this processor.
So the behavior is not deterministic, contrary to what I expected: sometimes the lineage data is not empty even when the flow file is processed for the first time.
I also tried the same requests with Fiddler, and there it worked every time.
Is there a problem with my approach?
I'm currently using NiFi 1.6.0.
EDIT:
I'll accept Bryan's answer as the solution.
I'll investigate it as soon as I have time, but it sounds correct. Nevertheless, I tried my solution with NiFi 1.8.0 and it works as intended. So for now I'm fine with my first implementation, but I'll improve it with Bryan's suggestion.
I'm not totally sure what the problem is, but in general provenance data is not really meant to be accessed from a processor, which is why the session and context provide no API to retrieve provenance events; only creating events is allowed.
In order to run a provenance query the events need to be indexed, and there are no guarantees about when that indexing takes place relative to when the flow file is being processed. So it is possible the events are not visible yet.
A ReportingTask is the intended way to access provenance events; it can be used to push them out of NiFi to some external system for longer-term storage.
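As a rough illustration of this approach, the body of a scripted ReportingTask (e.g. NiFi's ScriptedReportingTask, where the script gets `context` and `log` bindings) could pull provenance events in batches via the event access API. This is a hedged sketch under those assumptions; the MongoDB write is a placeholder, and the batch size and state handling are simplified.

```groovy
// 'context' is the ReportingContext binding provided to the script
def eventAccess = context.getEventAccess()

// In a real task, persist the last processed event ID (e.g. via the state
// manager) so no events are lost across restarts; hard-coded here.
long lastEventId = 0L

// fetch the next batch of provenance events starting at lastEventId
def events = eventAccess.getProvenanceEvents(lastEventId, 1000)
events.each { event ->
    // componentId / componentType identify the processor that emitted the event
    log.info("event ${event.eventId}: ${event.eventType} " +
             "at ${event.componentType} for flow file ${event.flowFileUuid}")
    // ...push the event to MongoDB here...
}
if (!events.isEmpty()) {
    lastEventId = events.last().eventId + 1
}
```

Because the ReportingTask runs on its own schedule, it naturally tolerates the indexing delay that makes the in-processor query unreliable.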