Faye Websocket Ruby not working as expected

I am trying to use HAProxy to load-balance my WebSocket Rack application.

I publish a message to the channel 'rates' using redis-cli, and the line puts "sent" if ws.send(msg) prints "sent", so the send call itself succeeds.

The client does receive the 'Welcome! from server' message so I know the initial handshake is done.

But, the client never receives the published message in channel 'rates'.

web_socket.rb

require 'faye/websocket'
require 'redis'

module WebSocket
  class App
    KEEPALIVE_TIME = 15 # in seconds

    def initialize(app)
      @app     = app
      @mutex   = Mutex.new
      @clients = []
      # @redis   = Redis.new(host: 'rds', port: 6739)
      # Subscribe in a background thread; subscribe blocks while listening.
      Thread.new do
        @redis_sub = Redis.new(host: 'rds', port: 6379)
        @redis_sub.subscribe('rates') do |on|
          on.message do |channel, msg|
            p [msg, @clients.length]
            @mutex.synchronize do
              @clients.each do |ws|
                # ws.ping 'Mic check, one, two' do
                p ws
                puts "sent" if ws.send(msg)
                # end
              end
            end
          end
        end
      end
    end

    def call(env)
      if Faye::WebSocket.websocket?(env)
        # WebSockets logic goes here
        ws = Faye::WebSocket.new(env, nil) # {ping: KEEPALIVE_TIME }
        ws.on :open do |event|
          p [:open, ENV['APPID'], ws.object_id]
          ws.ping 'Mic check, one, two' do
            # fires when pong is received
            puts "Welcome sent" if ws.send('Welcome! from server')
            @mutex.synchronize do
              @clients << ws
            end
            p [@clients.length, ' Client Connected']
          end
        end
        ws.on :close do |event|
          p [:close, ENV['APPID'], ws.object_id, event.code, event.reason]
          @mutex.synchronize do
            @clients.delete(ws)
          end
          p @clients.length
          ws = nil
        end
        ws.on :message do |event|
          p [:message, event.data]
          # @clients.each {|client| client.send(event.data) }
        end
        # Return async Rack response
        ws.rack_response
      else
        # Not a WebSocket request: pass it down the Rack stack
        @app.call(env)
      end
    end
  end
end
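
The broadcast part of the middleware above (a mutex-guarded client list that calls send on each socket) can be exercised on its own, without Redis or a real connection. The following is only a minimal sketch under stated assumptions: ClientRegistry and FakeSocket are hypothetical names introduced here for illustration and are not part of faye-websocket, and FakeSocket only mimics a send method that returns true.

```ruby
# Hypothetical helper (not part of faye-websocket): a thread-safe list of
# connected sockets with a broadcast method.
class ClientRegistry
  def initialize
    @mutex   = Mutex.new
    @clients = []
  end

  def add(ws)
    @mutex.synchronize { @clients << ws }
  end

  def remove(ws)
    @mutex.synchronize { @clients.delete(ws) }
  end

  def size
    @mutex.synchronize { @clients.length }
  end

  # Calls send(msg) on every client and returns how many calls were truthy.
  def broadcast(msg)
    # Copy the list under the lock and send outside it, so a slow send
    # cannot block add/remove from other threads.
    targets = @mutex.synchronize { @clients.dup }
    targets.count { |ws| ws.send(msg) }
  end
end

# Hypothetical stand-in for a socket: records messages instead of
# writing them to the network.
class FakeSocket
  attr_reader :sent

  def initialize
    @sent = []
  end

  def send(msg)
    @sent << msg
    true
  end
end

registry = ClientRegistry.new
a = FakeSocket.new
b = FakeSocket.new
registry.add(a)
registry.add(b)
registry.remove(b)
delivered = registry.broadcast('rate update')
```

If this part behaves as expected in isolation, a truthy return from send in the real application does not by itself show that the message reached the browser, which matches the symptom described above.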

My haproxy.cfg

frontend http
  bind *:8080
  mode http
  timeout client 1000s
  use_backend all
backend all
  mode http
  timeout server 1000s
  timeout tunnel 1000s
  timeout connect 1000s
  server s1 app1:8080
  server s2 app2:8080
  server s3 app3:8080
  server s4 app4:8080

[Screenshot: Chrome dev tools]

Please help me!!!

EDIT: I have tried the suggestion from the question "Thread running in Middleware is using old version of parent's instance variable", but it does not work.

As mentioned earlier, the code below succeeds:

puts "sent" if ws.send(msg)

Answer by kamal patwa (best answer)

Okay, after a lot of searching and testing, I found that the issue was that no ping interval was set during WebSocket initialization on the server.

Change this

ws = Faye::WebSocket.new(env, nil) # {ping: KEEPALIVE_TIME }

to

ws = Faye::WebSocket.new(env, nil, {ping: KEEPALIVE_TIME })

My KEEPALIVE_TIME is 0.5 (seconds) because I am building a stock application where rates change very quickly. You can set it according to your needs.
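
To make the interval harder to leave out by accident, the options hash can be built in one place. This is a rough sketch, not the only way to do it: websocket_options is a hypothetical helper name introduced here, and the only faye-websocket detail it relies on is the ping option shown in the fix above.

```ruby
KEEPALIVE_TIME = 15 # seconds between ping frames

# Hypothetical helper: builds the options hash that is passed as the
# third argument to Faye::WebSocket.new.
def websocket_options(keepalive = KEEPALIVE_TIME)
  unless keepalive.is_a?(Numeric) && keepalive.positive?
    raise ArgumentError, "ping interval must be a positive number, got #{keepalive.inspect}"
  end

  { ping: keepalive }
end

options = websocket_options(0.5)

# In the middleware's call(env), the hash would then be used like this:
#   ws = Faye::WebSocket.new(env, nil, options)
```

Calling websocket_options with no argument falls back to KEEPALIVE_TIME, and a zero or non-numeric value raises ArgumentError instead of silently disabling the ping.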