Error occurring when running apache nifi processor integration test

66 views Asked by At

I've created a new apache NIFI processor for AWS Timestream, I know a timestream processor already exists but it does not currently support MultiMeasure values. As part of this I've attempted to upgrade the nifi AWS processor structure to aws V2 API but I've hit an issue that I believe to be an AWS version conflict (Although I could be wrong). I'm hoping someone can assist me with this issue.

The error occurring is:

java.lang.IllegalArgumentException: Invalid option: software.amazon.awssdk.awscore.client.config.AwsClientOption@71168c46. Required value of type interface software.amazon.awssdk.identity.spi.IdentityProvider, but was class org.apache.nifi.processors.aws.credentials.provider.PropertiesCredentialsProvider.

The integration test I'm running is:

    private final String CREDENTIALS_FILE = System.getProperty("user.home") + "/aws-credentials.properties";
    private final String DB_NAME = "example_db";
    private final String TBL_NAME = "example_tbl";
    private final String REGION = Region.EU_WEST_1.id();

    @Test
    public void testSimplePut() throws IOException {
        final TestRunner runner = TestRunners.newTestRunner(new PutTimestream());
        runner.setProperty(PutTimestream.TBL_NAME, TBL_NAME);
        runner.setProperty(PutTimestream.DB_NAME, DB_NAME);
        runner.setProperty(PutTimestream.REGION, REGION);
        runner.setProperty(PutTimestream.CREDENTIALS_FILE, CREDENTIALS_FILE);

        final Map<String, String> attrs = new HashMap<>();
        attrs.put("filename", "timestream_valid.json");
        runner.enqueue(Paths.get("src/test/resources/timestream_valid.json"), attrs);
        runner.run(1);

        runner.assertAllFlowFilesTransferred(PutTimestream.REL_SUCCESS, 1);
    }

This is my AbstractTimestreamProcessor:

public abstract class AbstractTimestreamProcessor extends AbstractAwsProcessor<TimestreamWriteClient, TimestreamWriteClientBuilder> {
    @Override
    protected TimestreamWriteClientBuilder createClientBuilder(ProcessContext processContext) {
        return TimestreamWriteClient.builder();
    }
}

Then this is the PutTimestream processor:

@Tags( { "amazon", "aws", "timestream", "put" } )
@CapabilityDescription( "AWS Timestream Put Processor." )
public class PutTimestream extends AbstractTimestreamProcessor {

    public static final PropertyDescriptor DB_NAME = new PropertyDescriptor.Builder()
            .name("Database Name")
            .displayName("Database Name")
            .description("Specifies the name of the Amazon Timestream Database")
            .required(true)
            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .build();
    public static final PropertyDescriptor TBL_NAME = new PropertyDescriptor.Builder()
            .name("Table Name")
            .displayName("Table Name")
            .description("Specifies the name of the Amazon Timestream Table")
            .required(true)
            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .build();

//  private Set<Relationship> relationships;
    WriteRecordsResponse writeRecordsResponse;
    Map<String, String> attributes;

    public static final List<PropertyDescriptor> properties = Collections
            .unmodifiableList(Arrays.asList(AWS_CREDENTIALS_PROVIDER_SERVICE, REGION, ACCESS_KEY, SECRET_KEY,
                    CREDENTIALS_FILE, TIMEOUT, DB_NAME, TBL_NAME));

    private volatile List<PropertyDescriptor> userDefinedProperties = Collections.emptyList();
    private TimestreamRecordConverter timestreamRecordConverter;
    
    @Override
    protected void init(ProcessorInitializationContext context) {
        timestreamRecordConverter = new TimestreamRecordConverter(getLogger());
    }

    @Override
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return properties;
    }

    private Map<String, String> getAttributes(FlowFile flowFile, WriteRecordsResponse writeRecordsResponse) {
        attributes = new HashMap();
        attributes.putAll(flowFile.getAttributes());
        attributes.put(CoreAttributes.MIME_TYPE.key(), "application/json");
        attributes.put("timestream.insert.status",
                String.valueOf(writeRecordsResponse.responseMetadata()));
        return attributes;

    }

    @Override
    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {

        FlowFile flowFile = session.get();
        if (flowFile == null) {
            return;
        }

        final long startNanos = System.nanoTime();

        String dbName = context.getProperty(DB_NAME).evaluateAttributeExpressions(flowFile).getValue();
        String tblName = context.getProperty(TBL_NAME).evaluateAttributeExpressions(flowFile).getValue();

        final List<Record> records;
        try {
            final byte[] content = new byte[(int) flowFile.getSize()];
            session.read(flowFile, in -> StreamUtils.fillBuffer(in, content, true));
            records = timestreamRecordConverter.convertJsonToTimestreamRecords(new String(content));

            WriteRecordsRequest writeRecordsRequest = WriteRecordsRequest.builder()
                    .databaseName(dbName)
                    .tableName(tblName)
                    .records(records)
                    .build();

            writeRecordsResponse = getClient(context).writeRecords(writeRecordsRequest);
            flowFile = session.putAllAttributes(flowFile, getAttributes(flowFile, writeRecordsResponse));
            session.transfer(flowFile, REL_SUCCESS);

            long transmissionMillis = java.util.concurrent.TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
            session.getProvenanceReporter().send(flowFile, tblName, transmissionMillis);
        } catch (Exception e) {
            StringWriter sw = new StringWriter();
            e.printStackTrace(new PrintWriter(sw));
            getLogger().info("Error processing timestream record. Check stacktrace " + sw);
            session.transfer(flowFile, REL_FAILURE);
        }

    }
}

The record gets processed successfully but when I call getClient it seems to fail on the line:

this.finalizeChildConfiguration(configuration)

Which is in the SdkDefaultClientBuilder class. Any help would be much appreaciated with this issue as there's very little resources to go off for this.

Thanks

1

There are 1 answers

0
Ciaran George On

It turns out some AWS V2 files made their way into apache NIFI V1.23.2 before they had upgraded to nifi 2.0.0. These files caused conflicts with the V1 implementation throughout the rest of the code. I've now upgraded to nifi 2.0.0-M1 and have been able to get it running.