duplicate messages with RabbitMQ

36 views Asked by At

I have a system that pushes messages to a RabbitMQ exchange and several services that consume that data.

The publishing code (F#) is the following:

    type RabbitPublisher(connectionString, exchangeType: RMQExchangeType, exchangeName) =

        let connection = createConnection connectionString
        let channel    = connection.CreateModel()

        do
            // set up the exchange
            channel.ExchangeDeclare(exchangeName, exchangeType.Converted, durable = true)
            info $"started publisher for exchange '{exchangeName}'"


        member this.Send(message: byte array, ?key: string) =
            try
                let key = defaultArg key String.Empty
                channel.BasicPublish(exchangeName, key, null, message)
            with ex ->
                error $"error while sending message to '{exchangeName}': {ex.Humanize()}"

        member this.Purge(queueName) =
            try
                channel.QueuePurge(queueName) |> ignore
            with ex ->
                error $"error while purging queue '{queueName}': {ex.Humanize()}"

and when consuming, I have this code that takes a callback for every message:

    type RabbitExchangeSubscriber(connectionString, exchangeType: RMQExchangeType, exchangeName, callback: string -> byte array -> unit, ?keys: string list, ?queueLength: int) =
        let keys        = defaultArg keys []
        let queueLength = defaultArg queueLength 10

        let connection = createConnection connectionString
        let channel    = connection.CreateModel()
        let consumer   = EventingBasicConsumer(channel)

        member this.Start () =
            // properties
            let properties =
                [
                    "x-max-length", box queueLength
                ]
                |> dict

            // set up the exchange
            channel.ExchangeDeclare(exchangeName, exchangeType.Converted, durable = true)
            let queue = channel.QueueDeclare(arguments = properties)
            match keys with
            | [] ->  channel.QueueBind(queue.QueueName, exchangeName, String.Empty)
            | k  ->  k |> List.iter (fun x -> channel.QueueBind(queue.QueueName, exchangeName, x))

            // purge the queue on startup
            channel.QueuePurge(queue.QueueName) |> ignore

            // register the callback
            consumer.Received.AddHandler(fun model ea -> callback ea.RoutingKey (ea.Body.ToArray()))

            // get ready to consume events
            channel.BasicConsume(queue.QueueName, true, consumer) |> ignore // auto-ack

        member this.Stop () =
            channel.Close ()

The publisher is constantly pushing data. It is important to note that the data CAN be lost, if it's not consumed, it can disappear.

The consumers will get a lot of duplicate messages on startup and I do not understand why.

Is there anything obviously wrong with this setup?

0

There are 0 answers

Related Questions in F#