I wrote a custom processor similar to this one. In particular, the processor takes an InputDto and returns json. Following guidance in the Q&A, my custom processor has an application.properties file with this content:
spring.cloud.stream.bindings.input.content-type=application/x-java-object;com.company.InputDto
spring.cloud.stream.bindings.output.content-type=application/json
From this Q&A I created a spring.integration.properties
file with this line:
spring.integration.readOnly.headers=contentType
And I have working automated integration tests. So far so good.
I created a stream in the SCDF shell including my processor. time
and httpclient
are working well, so I didn't show detailed args for those. Added newlines here & throughout for readability.
stream create --name test --definition "time <args>
| httpclient <args>
| splitter --expression=\"#jsonPath(payload,'$..[0:]')\"
| myprocessor
| log"
I have debug logging enabled. httpclient
produces messages with contentType=text/plain
and payload:
payload=[{"name":"first","url":"url1"},{"name":"second","url":"url2"}]
The splitter
creates two messages like this, per logs (as expected):
message: GenericMessage [payload={name=first, url=url1},
headers={...contentType=text/plain, ... }]
message: GenericMessage [payload={name=second, url=url2},
headers={...contentType=text/plain, ... }]
And the custom processor I wrote fails with this exception:
org.springframework.messaging.converter.MessageConversionException: Cannot
convert from [java.util.LinkedHashMap] to [com.company.InputDto] for
GenericMessage [payload={name=first, url=url1}, headers={sequenceNumber=1,
kafka_offset=xx, sequenceSize=x, correlationId=yy, id=zzz,
kafka_receivedPartitionId=0, contentType=text/plain,
kafka_receivedTopic=test.splitter, timestamp=1503591622851}]
at
org.springframework.messaging.handler.annotation.support.PayloadArgumentResolver.resolveArgument(PayloadArgumentResolver.java:142) ~[spring-messaging-4.3.10.RELEASE.jar!/:4.3.10.RELEASE]
....
I'm not sure where the LinkedHashMap
is coming from. I tried changing the application.properties file to:
spring.cloud.stream.bindings.input.content-type=application/json;com.company.InputDto
but no help. I also tried adding
--spring.cloud.stream.bindings.output.contentType='application/json'
to the splitter
when creating the stream (following directions in a sample app), but still get the exception.
I've spent hours and just cannot see what I'm missing. Appreciate any assistance.
My custom processor uses Spring Cloud Dalston.SR3. I'm using SCDF Server and shell 1.3.0.M1. Using the Kafka binder.
Update, more info. I looked at the code in the Splitter starter and wrote a little test case to simulate what it's doing.
final ExpressionEvaluatingSplitter splitter = new ExpressionEvaluatingSplitter(
new SpelExpressionParser().parseExpression(
"T(com.jayway.jsonpath.JsonPath).read(payload, '$..[0:]')"));
final PollableChannel channel = new QueueChannel();
splitter.setOutputChannel(channel);
splitter.handleMessage(new GenericMessage<Object>(
"[{\"name\":\"first\",\"url\":\"url1\"},
{\"name\":\"second\",\"url\":\"url2\"}]"));
final Message<?> message = channel.receive(10);
System.out.println("payload class: " + message.getPayload().getClass());
System.out.println("payload: " + message.getPayload());
System.out.println(channel.receive(10));
This produces output:
payload class: class java.util.LinkedHashMap
payload: {name=first, url=url1}
GenericMessage [payload={name=second, url=url2}, headers={sequenceNumber=2,
correlationId=xx, id=yy, sequenceSize=2, timestamp=1503609649518}]
Aha, the LinkedHashMap
! Now I just need to convince the splitter to send the output as plain text or json, not a Map.
Update 2. I've been able to duplicate this issue without using any custom processors.
stream create --name test --definition "time <args>
| httpclient <args>
| splitter --expression=\"#jsonPath(payload,'$.[0:]')\"
--outputType=\"application/json\"
| transform --expression=\"new com.fasterxml.jackson.databind.ObjectMapper().writeValueAsString(payload) != null\"
--outputType=\"application/json\"
| log"
When run, the splitter log file contains this exception (abridged), the infamous "payload must not be null" error:
2017-08-25 13:09:30,322 DEBUG -kafka-listener-2 o.s.i.c.DirectChannel:411 - preSend on channel 'output', message: GenericMessage [payload={name=first, url=url1}, headers={sequenceNumber=1, kafka_offset=xx sequenceSize=2, correlationId=yy, id=zz, kafka_receivedPartitionId=0, contentType=text/plain, kafka_receivedTopic=test.httpclient, timestamp=1503680970322}]
2017-08-25 13:09:30,328 ERROR -kafka-listener-2 o.s.k.l.LoggingErrorHandler:37 - Error while processing: ConsumerRecord(topic = test.httpclient, partition = 0, offset = 52606, CreateTime = 1503680967030, checksum = 2166925613, serialized key size = -1, serialized value size = 470, key = null, value = [B@6567451e)
org.springframework.messaging.MessageDeliveryException: failed to send Message to channel 'output'; nested exception is java.lang.IllegalArgumentException: payload must not be null
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:449) ~[spring-integration-core-.3.8.RELEASE.jar!/:4.3.8.RELEASE]
It looks like the splitter is having difficulty converting a LinkedHashMap to JSON. Any way to make the conversion happen?
I also tried setting the outputType of the httpclient processor to be explicitly application/json
and it didn't seem to make a difference. (Following the docs. The example shows the outputType values with shell quotes, I also tried without, no difference.)
Apps were loaded into SCDF server using command (replaced '.' in bit-ly so SO would accept link)
app import --uri http://bit-ly/Bacon-RELEASE-stream-applications-kafka-10-maven
Noticed a few more things. The debug logging shows the content type as text/plain
throughout, so it appears to not be picking up the content type I set explicitly. Also, the errors go away if I remove the transform
processor. I see the data in the log, but not in JSON format, just this:
{name=first, url=url1}
I was able to get this working for both the pure Spring stream and a stream including my custom processor by explicitly setting both inputType and outputType as shown.
Gary Russell mentions that this issue has been corrected in Spring Cloud Stream Ditmars.M2 (1.3.0.M2).