I've installed FIWARE Orion (1.13.0), FIWARE Cygnus (2.8.0) and Kafka in order to send data from Orion to Kafka via Cygnus.

I've used the configuration file suggested in the Cygnus documentation:

cygnus-ngsi.sources =http-source
cygnus-ngsi.sinks =kafka-sink
cygnus-ngsi.channels =kafka-channel

cygnus-ngsi.sources.http-source.channels = kafka-channel
cygnus-ngsi.sources.http-source.type = org.apache.flume.source.http.HTTPSource
cygnus-ngsi.sources.http-source.port = 5050
cygnus-ngsi.sources.http-source.handler = com.telefonica.iot.cygnus.handlers.NGSIRestHandler
cygnus-ngsi.sources.http-source.handler.notification_target = /notify
cygnus-ngsi.sources.http-source.handler.default_service = def_serv
cygnus-ngsi.sources.http-source.handler.default_service_path = /def_servpath
cygnus-ngsi.sources.http-source.handler.events_ttl = 2
cygnus-ngsi.sources.http-source.interceptors = ts gi
cygnus-ngsi.sources.http-source.interceptors.ts.type = timestamp
cygnus-ngsi.sources.http-source.interceptors.gi.type = com.telefonica.iot.cygnus.interceptors.NGSIGroupingInterceptor$Builder
cygnus-ngsi.sources.http-source.interceptors.gi.grouping_rules_conf_file = /opt/apache-flume/conf/grouping_rules.conf


cygnus-ngsi.channels.kafka-channel.type = memory
cygnus-ngsi.channels.kafka-channel.capacity = 1000
cygnus-ngsi.channels.kafka-channel.transactionCapacity = 100

cygnus-ngsi.sinks.kafka-sink.type = com.telefonica.iot.cygnus.sinks.NGSIKafkaSink
cygnus-ngsi.sinks.kafka-sink.channel = kafka-channel
cygnus-ngsi.sinks.kafka-sink.enable_grouping = false
cygnus-ngsi.sinks.kafka-sink.data_model = dm-by-entity
cygnus-ngsi.sinks.kafka-sink.broker_list = 192.168.1.142:9092
cygnus-ngsi.sinks.kafka-sink.zookeeper_endpoint = 192.168.1.142:2181
cygnus-ngsi.sinks.kafka-sink.batch_size = 1
cygnus-ngsi.sinks.kafka-sink.batch_timeout = 10

Everything is running under Docker Swarm:

ID             NAME              MODE         REPLICAS   IMAGE                       PORTS
c195zwpxparu   cygnus_cygnus     replicated   1/1        fiware/cygnus-ngsi:latest   *:5050->5050/tcp, *:5055->5055/tcp, *:5080->5080/tcp
pzjvud4q4ibc   kafka_kafka       replicated   1/1        bitnami/kafka:2             *:9092->9092/tcp
bpyaz4jphuhh   kafka_zookeeper   replicated   1/1        bitnami/zookeeper:3         *:2181->2181/tcp
jk4po3ofs3bm   orion_mongodb     replicated   1/1        mongo:3.6.5
mlknk6j7y5kd   orion_orion       replicated   1/1        fiware/orion:1.13.0         *:1026->1026/tcp

This is my docker-compose.yml for Cygnus:

version: "3"
services:

  cygnus:
    image: fiware/cygnus-ngsi
    ports:
      - "5050:5050"
      - "5055:5055"
      - "5080:5080"
    volumes:
      - ./conf/agent.conf:/opt/apache-flume/conf/agent.conf
    environment:
      - CYGNUS_SKIP_CONF_GENERATION=true
      - CYGNUS_MULTIAGENT=false
    deploy:
      replicas: 1
      restart_policy:
        condition: any
        delay: 5s
        max_attempts: 3
        window: 120s
  
networks:
  default:
    driver: overlay
    driver_opts:
      com.docker.network.driver.mtu: 1400

When a notification from Orion arrives at localhost:5050/notify, this error appears on the Cygnus console:


cygnus_cygnus.1.wfg7cz8r3e1v@docker-desktop    | time=2021-04-16T10:12:13.774Z | lvl=ERROR | corr=36cccd7a-9e9c-11eb-8e44-02420a000004; cbnotif=2 | trans=a6fe0530-c653-4e19-828a-67ec45b37a96 | srv=N/A | subsrv=N/A | comp=cygnus-ngsi | op=run | msg=org.apache.flume.SinkRunner$PollingRunner[158] : Unable to deliver event. Exception follows.
cygnus_cygnus.1.wfg7cz8r3e1v@docker-desktop    | java.lang.IllegalStateException: close() called when transaction is OPEN - you must either commit or rollback first
cygnus_cygnus.1.wfg7cz8r3e1v@docker-desktop    |        at com.google.common.base.Preconditions.checkState(Preconditions.java:172)
cygnus_cygnus.1.wfg7cz8r3e1v@docker-desktop    |        at org.apache.flume.channel.BasicTransactionSemantics.close(BasicTransactionSemantics.java:179)
cygnus_cygnus.1.wfg7cz8r3e1v@docker-desktop    |        at com.telefonica.iot.cygnus.sinks.NGSISink.processNewBatches(NGSISink.java:646)
cygnus_cygnus.1.wfg7cz8r3e1v@docker-desktop    |        at com.telefonica.iot.cygnus.sinks.NGSISink.process(NGSISink.java:373)
cygnus_cygnus.1.wfg7cz8r3e1v@docker-desktop    |        at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)
cygnus_cygnus.1.wfg7cz8r3e1v@docker-desktop    |        at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
cygnus_cygnus.1.wfg7cz8r3e1v@docker-desktop    |        at java.lang.Thread.run(Thread.java:748)
cygnus_cygnus.1.wfg7cz8r3e1v@docker-desktop    | time=2021-04-16T10:12:18.778Z | lvl=INFO | corr=36cccd7a-9e9c-11eb-8e44-02420a000004; cbnotif=2 | trans=a6fe0530-c653-4e19-828a-67ec45b37a96 | srv=N/A | subsrv=N/A | comp=cygnus-ngsi | op=processNewBatches | msg=com.telefonica.iot.cygnus.sinks.NGSISink[643] : Rollback transaction by Exception  (begin() called when transaction is OPEN!)

Any suggestions?

1 answer

Jason Fox

The current Cygnus release is indeed 2.8.0, but Orion 1.13.0 is several years old and unlikely to align with the latest Cygnus build.

The current Orion release is 3.0.0. Try using that instead.
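
As a rough sketch of what that upgrade could look like in the question's stack, the Orion compose file might pin the newer image as below. The service names `orion` and `mongodb` are inferred from the `orion_orion` and `orion_mongodb` entries in the service listing, and the `-dbhost mongodb` command line is an assumption about how the original stack was wired; the MongoDB version is carried over unchanged from the question and should be checked against the Orion 3.0.0 requirements.

```yaml
version: "3"
services:

  orion:
    image: fiware/orion:3.0.0     # pinned to the release named in this answer
    ports:
      - "1026:1026"
    command: -dbhost mongodb      # assumption: the MongoDB service is reachable as "mongodb"

  mongodb:
    image: mongo:3.6.5            # carried over from the question; verify compatibility with Orion 3.0.0
```

Pinning an explicit tag on both Orion and Cygnus (the question runs `fiware/cygnus-ngsi:latest`) also makes it easier to tell which pair of versions is actually deployed.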

A basic working example with the latest Orion and the latest Cygnus can be found in the Step-by-Step Tutorials for NGSI-v2.