What causes a promise to use the variable value from the first call?

I'm developing an AMQPClient class to abstract RPC calls. It works perfectly on the first call, but on the second call the correlationId inside the reply handler still has the value from the first call.

async RPC<T>(queue: string, message: string): Promise<T> {
    if (!this.channel) {
      throw new Error('Channel not initialized')
    }

    const replyTo = `${queue}.reply`
    await this.channel.assertQueue(replyTo)
    await this.channel.assertQueue(queue)

    return new Promise<T>((resolve) => {
      const correlationId = Math.random().toString(36).slice(2)
      console.log('generated correlationId: ', correlationId)
      const onMessage = (message: ConsumeMessage | null) => {
        console.log(
          correlationId,
          message?.properties.correlationId,
          correlationId === message?.properties.correlationId
        )
        if (message && message.properties.correlationId === correlationId) {
          resolve(JSON.parse(message.content.toString()))
          this.channel?.removeListener('message', onMessage)
        }
      }

      this.channel?.consume(replyTo, onMessage, { noAck: true })
      this.channel?.sendToQueue(queue, Buffer.from(message), {
        correlationId,
        replyTo
      })
    })
  }

Output:

generated correlationId:  lwfvgqym5ya
lwfvgqym5ya lwfvgqym5ya true

generated correlationId:  1m09k9jk2xm
lwfvgqym5ya 1m09k9jk2xm false

The correlationId printed by the handler on the second call matches the correlationId from the first call, which had already resolved. The second call was made after the first one resolved.

I already tried moving the const correlationId = Math.random().toString(36).slice(2) outside the new Promise(...). I also tried passing an anonymous function that calls onMessage as the consume callback, without success:

this.channel?.consume(replyTo, (msg) => onMessage(msg), { noAck: true })

I also tried passing the correlationId as a parameter. None of the above works: on the second call, the onMessage function always sees the correlationId from the previous call.

Full code:

import client, { Channel, Connection, ConsumeMessage } from 'amqplib'

class AMQPClient {
  private channel?: Channel

  constructor(private readonly amqpUrl: string) {
    client.connect(this.amqpUrl).then((connection) => {
      connection.createChannel().then((channel) => {
        this.channel = channel
      })

      process.on('SIGINT', () => this.close(connection))
      process.on('SIGTERM', () => this.close(connection))
    })
  }

  async RPC<T>(queue: string, message: string): Promise<T> {
    if (!this.channel) {
      throw new Error('Channel not initialized')
    }

    const replyTo = `${queue}.reply`
    await this.channel.assertQueue(replyTo)
    await this.channel.assertQueue(queue)

    return new Promise<T>((resolve) => {
      const correlationId = Math.random().toString(36).slice(2)
      console.log('generated correlationId: ', correlationId)
      const onMessage = (message: ConsumeMessage | null) => {
        console.log(
          correlationId,
          message?.properties.correlationId,
          correlationId === message?.properties.correlationId
        )
        if (message && message.properties.correlationId === correlationId) {
          resolve(JSON.parse(message.content.toString()))
          this.channel?.removeListener('message', onMessage)
        }
      }

      this.channel?.consume(replyTo, (msg) => onMessage(msg), { noAck: true })
      this.channel?.sendToQueue(queue, Buffer.from(message), {
        correlationId,
        replyTo
      })
    })
  }

  close(connection: Connection) {
    connection.close()
    process.exit(0)
  }
}

const amqpClient = new AMQPClient(process.env.AMQP_URL || 'amqp://localhost')
export { amqpClient, AMQPClient }

Call:

this.amqpClient.RPC<MerchantStatus>(
  'getMerchantStatus',
  JSON.stringify({ merchantId: 'test' })
)

Answer by Lincon Dias:

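As pointed out by @jfriend00, the onMessage callback was never removed from the consumer registered with consume, so the first call's callback stayed subscribed to the reply queue and kept the value of the first correlationId. Calling removeListener('message', onMessage) on the channel does not cancel a consumer, so each RPC call added one more consumer to the same queue, and the reply to the second call was delivered to the first call's callback.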
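The effect can be reproduced without a broker. The sketch below is a simplified in-memory model, assuming the queue hands each message to exactly one consumer and rotates between them; FakeQueue, call, and cancelWhenDone are hypothetical names for illustration, not amqplib APIs. In amqplib the corresponding operations would be channel.consume, whose promise resolves to an object carrying a consumerTag, and channel.cancel(consumerTag).

```typescript
type Handler = (correlationId: string) => void

// In-memory stand-in for a reply queue shared by several consumers.
class FakeQueue {
  private consumers = new Map<string, Handler>()
  private order: string[] = []
  private nextTag = 0

  // Registers a consumer and returns its tag.
  consume(handler: Handler): string {
    const tag = `ctag-${this.nextTag++}`
    this.consumers.set(tag, handler)
    this.order.push(tag)
    return tag
  }

  // Removes a consumer so it no longer takes messages from the queue.
  cancel(tag: string): void {
    this.consumers.delete(tag)
    this.order = this.order.filter((t) => t !== tag)
  }

  // Delivers the message to one consumer only, then moves it to the back of the line.
  publish(correlationId: string): void {
    const tag = this.order.shift()
    if (tag === undefined) return
    this.order.push(tag)
    this.consumers.get(tag)?.(correlationId)
  }
}

// One RPC round trip: add a consumer, then deliver a reply carrying `id`.
// Returns true only if the reply reached the handler that was waiting for `id`.
function call(queue: FakeQueue, id: string, cancelWhenDone: boolean): boolean {
  let matched = false
  const tag = queue.consume((received) => {
    if (received === id) {
      matched = true
      if (cancelWhenDone) queue.cancel(tag)
    }
  })
  queue.publish(id)
  return matched
}

const leaky = new FakeQueue()
console.log(call(leaky, 'first', false)) // true
console.log(call(leaky, 'second', false)) // false: the stale first consumer took the reply

const clean = new FakeQueue()
console.log(call(clean, 'first', true)) // true
console.log(call(clean, 'second', true)) // true
```

In the leaky run the second reply lands in the first call's handler, which compares it against its own captured id and drops it, matching the "false" line in the question's output. Cancelling the consumer after the reply arrives avoids that, at the cost of registering and cancelling a consumer on every call.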

The solution was to create a hash map that uses the correlationId as the key and a callback function as the value. When a message is consumed from the reply queue, the consumer looks up the correlationId sent with the message in the callback map; if it finds a registered callback, the callback is called, resolving the promise with the message value.
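The lookup can be shown in isolation, without a broker. This is a minimal sketch under stated assumptions: PendingReplies, wait, and dispatch are hypothetical names, and the reply body is assumed to be a JSON string.

```typescript
// Pending RPC calls keyed by correlationId.
class PendingReplies {
  private callbacks = new Map<string, (body: string) => void>()

  // Registers a callback for this id and returns a promise for the parsed reply.
  wait<T>(correlationId: string): Promise<T> {
    return new Promise<T>((resolve) => {
      this.callbacks.set(correlationId, (body) => {
        // Remove the entry first so each call is resolved at most once.
        this.callbacks.delete(correlationId)
        resolve(JSON.parse(body) as T)
      })
    })
  }

  // Meant to be called by the reply consumer for every incoming message.
  // Returns false when no call is waiting for this correlationId.
  dispatch(correlationId: string, body: string): boolean {
    const callback = this.callbacks.get(correlationId)
    if (!callback) return false
    callback(body)
    return true
  }

  // Number of calls still waiting for a reply.
  get size(): number {
    return this.callbacks.size
  }
}

// Replies may arrive in any order; each one resolves the call that sent its id.
const pending = new PendingReplies()
const firstCall = pending.wait<{ status: string }>('id-1')
const secondCall = pending.wait<{ status: string }>('id-2')
pending.dispatch('id-2', '{"status":"active"}')
pending.dispatch('id-1', '{"status":"blocked"}')
firstCall.then((reply) => console.log('id-1 ->', reply.status))
secondCall.then((reply) => console.log('id-2 ->', reply.status))
```

Because the consumer reads the correlationId from the message itself, it no longer matters which consumer receives a reply or which call registered it.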

Working code:

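import { Channel, ConsumeMessage } from 'amqplib'

class AMQPClient {
  // Assigned once the connection is ready (constructor omitted, same as in the question).
  private channel?: Channel
  // Pending calls: correlationId -> callback that resolves that call's promise.
  private callbacks: Record<string, (message: ConsumeMessage) => void> = {}
  // Reply queues that already have a consumer, so only one is registered per queue.
  private consuming = new Set<string>()

  async RPC<T>(queue: string, message: string): Promise<T> {
    if (!this.channel) {
      throw new Error('Channel not initialized')
    }
    const channel = this.channel

    const replyTo = `${queue}.reply`
    await channel.assertQueue(replyTo)
    await channel.assertQueue(queue)
    const correlationId = Math.random().toString(36).slice(2)

    if (!this.consuming.has(replyTo)) {
      this.consuming.add(replyTo)
      // The consumer looks up the callback by the reply's own correlationId
      // instead of closing over the id of one particular call.
      await channel.consume(
        replyTo,
        (reply) => {
          if (reply) {
            this.callbacks[reply.properties.correlationId]?.(reply)
          }
        },
        { noAck: true }
      )
    }

    return new Promise<T>((resolve) => {
      this.callbacks[correlationId] = (reply) => {
        delete this.callbacks[correlationId]
        resolve(JSON.parse(reply.content.toString()))
      }
      channel.sendToQueue(queue, Buffer.from(message), {
        correlationId,
        replyTo
      })
    })
  }
}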

For more details, see the RabbitMQ documentation on RPC.