I've installed FIWARE Orion (v1.13.0), FIWARE Cygnus (2.8.0) and Kafka in order to send data from Orion to Kafka via Cygnus.
I've used the configuration file suggested in the Cygnus documentation:
cygnus-ngsi.sources =http-source
cygnus-ngsi.sinks =kafka-sink
cygnus-ngsi.channels =kafka-channel
cygnus-ngsi.sources.http-source.channels = kafka-channel
cygnus-ngsi.sources.http-source.type = org.apache.flume.source.http.HTTPSource
cygnus-ngsi.sources.http-source.port = 5050
cygnus-ngsi.sources.http-source.handler = com.telefonica.iot.cygnus.handlers.NGSIRestHandler
cygnus-ngsi.sources.http-source.handler.notification_target = /notify
cygnus-ngsi.sources.http-source.handler.default_service = def_serv
cygnus-ngsi.sources.http-source.handler.default_service_path = /def_servpath
cygnus-ngsi.sources.http-source.handler.events_ttl = 2
cygnus-ngsi.sources.http-source.interceptors = ts gi
cygnus-ngsi.sources.http-source.interceptors.ts.type = timestamp
cygnus-ngsi.sources.http-source.interceptors.gi.type = com.telefonica.iot.cygnus.interceptors.NGSIGroupingInterceptor$Builder
cygnus-ngsi.sources.http-source.interceptors.gi.grouping_rules_conf_file = /opt/apache-flume/conf/grouping_rules.conf
cygnus-ngsi.channels.kafka-channel.type = memory
cygnus-ngsi.channels.kafka-channel.capacity = 1000
cygnus-ngsi.channels.kafka-channel.trasactionCapacity = 100
cygnus-ngsi.sinks.kafka-sink.type = com.telefonica.iot.cygnus.sinks.NGSIKafkaSink
cygnus-ngsi.sinks.kafka-sink.channel = kafka-channel
cygnus-ngsi.sinks.kafka-sink.enable_grouping = false
cygnus-ngsi.sinks.kafka-sink.data_model = dm-by-entity
cygnus-ngsi.sinks.kafka-sink.broker_list = 192.168.1.142:9092
cygnus-ngsi.sinks.kafka-sink.zookeeper_endpoint = 192.168.1.142:2181
cygnus-ngsi.sinks.kafka-sink.batch_size = 1
cygnus-ngsi.sinks.kafka-sink.batch_timeout = 10
Everything is up and running on Docker Swarm:
ID             NAME              MODE         REPLICAS   IMAGE                       PORTS
c195zwpxparu   cygnus_cygnus     replicated   1/1        fiware/cygnus-ngsi:latest   *:5050->5050/tcp, *:5055->5055/tcp, *:5080->5080/tcp
pzjvud4q4ibc   kafka_kafka       replicated   1/1        bitnami/kafka:2             *:9092->9092/tcp
bpyaz4jphuhh   kafka_zookeeper   replicated   1/1        bitnami/zookeeper:3         *:2181->2181/tcp
jk4po3ofs3bm   orion_mongodb     replicated   1/1        mongo:3.6.5
mlknk6j7y5kd   orion_orion       replicated   1/1        fiware/orion:1.13.0         *:1026->1026/tcp
This is my docker-compose.yml for Cygnus:
version: "3"
services:
  cygnus:
    image: fiware/cygnus-ngsi
    ports:
      - "5050:5050"
      - "5055:5055"
      - "5080:5080"
    volumes:
      - ./conf/agent.conf:/opt/apache-flume/conf/agent.conf
    environment:
      - CYGNUS_SKIP_CONF_GENERATION=true
      - CYGNUS_MULTIAGENT=false
    deploy:
      replicas: 1
      restart_policy:
        condition: any
        delay: 5s
        max_attempts: 3
        window: 120s
networks:
  default:
    driver: overlay
    driver_opts:
      com.docker.network.driver.mtu: 1400
When the notification from Orion arrives at localhost:5050/notify, this error appears on the Cygnus console:
cygnus_cygnus.1.wfg7cz8r3e1v@docker-desktop | time=2021-04-16T10:12:13.774Z | lvl=ERROR | corr=36cccd7a-9e9c-11eb-8e44-02420a000004; cbnotif=2 | trans=a6fe0530-c653-4e19-828a-67ec45b37a96 | srv=N/A | subsrv=N/A | comp=cygnus-ngsi | op=run | msg=org.apache.flume.SinkRunner$PollingRunner[158] : Unable to deliver event. Exception follows.
cygnus_cygnus.1.wfg7cz8r3e1v@docker-desktop | java.lang.IllegalStateException: close() called when transaction is OPEN - you must either commit or rollback first
cygnus_cygnus.1.wfg7cz8r3e1v@docker-desktop | at com.google.common.base.Preconditions.checkState(Preconditions.java:172)
cygnus_cygnus.1.wfg7cz8r3e1v@docker-desktop | at org.apache.flume.channel.BasicTransactionSemantics.close(BasicTransactionSemantics.java:179)
cygnus_cygnus.1.wfg7cz8r3e1v@docker-desktop | at com.telefonica.iot.cygnus.sinks.NGSISink.processNewBatches(NGSISink.java:646)
cygnus_cygnus.1.wfg7cz8r3e1v@docker-desktop | at com.telefonica.iot.cygnus.sinks.NGSISink.process(NGSISink.java:373)
cygnus_cygnus.1.wfg7cz8r3e1v@docker-desktop | at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)
cygnus_cygnus.1.wfg7cz8r3e1v@docker-desktop | at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
cygnus_cygnus.1.wfg7cz8r3e1v@docker-desktop | at java.lang.Thread.run(Thread.java:748)
cygnus_cygnus.1.wfg7cz8r3e1v@docker-desktop | time=2021-04-16T10:12:18.778Z | lvl=INFO | corr=36cccd7a-9e9c-11eb-8e44-02420a000004; cbnotif=2 | trans=a6fe0530-c653-4e19-828a-67ec45b37a96 | srv=N/A | subsrv=N/A | comp=cygnus-ngsi | op=processNewBatches | msg=com.telefonica.iot.cygnus.sinks.NGSISink[643] : Rollback transaction by Exception (begin() called when transaction is OPEN!)
Any suggestions?
The current Cygnus release is indeed 2.8.0, but Orion 1.13.0 is several years old and is unlikely to be compatible with the latest Cygnus build.
The current Orion release is 3.0.0. Try using that instead.
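As a rough sketch of that upgrade, the change in the Orion compose file may be as small as the image tag. The service names `orion` and `mongodb` and the `command` line below are assumptions inferred from the `orion_orion` and `orion_mongodb` entries in the service listing; the question does not show the actual Orion compose file, so adapt this to whatever it contains.

```yaml
version: "3"
services:
  orion:
    # Only this tag changes: 1.13.0 -> 3.0.0
    image: fiware/orion:3.0.0
    ports:
      - "1026:1026"
    # Assumption: the MongoDB service is reachable under the name "mongodb"
    command: -dbhost mongodb
```

After redeploying the stack, the running version can be confirmed through Orion's `/version` endpoint on port 1026.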
A basic working example with the latest Orion and the latest Cygnus can be found in the Step-by-Step Tutorials for NGSI-v2.