Cannot connect to Rails database from Sneakers worker (RabbitMQ RPC call)

TL;DR

Why can't the Sneakers worker connect to the database or query it?

(General advice on do's and don'ts is also welcome in the comments.)

Full question:

I can execute an RPC call that returns a simple string, but not one that queries the database on the server side. I have read the docs and tried many SO posts and blog tutorials, but I am still missing a piece.

I have two services. The first service (Client) uses the Bunny gem and makes an RPC call to the second service (RPCServer), which listens with workers built on the Sneakers gem. Both services are Rails apps.

RabbitMQ is running in a Docker container:

docker run -p 5672:5672 -p 15672:15672 rabbitmq:3-management

The Postgres database is installed on the local machine.

Client service (mostly from Rabbitbunny docs):

# app/services/client.rb
class Client
  attr_accessor :call_id, :lock, :condition, :reply_queue, :exchange, :params, :response, :server_queue_name, :channel, :reply_queue_name

  def initialize(rpc_route:, params:)
    @channel = channel
    @exchange = channel.fanout("Client.Server.exchange.#{params[:controller]}")
    @server_queue_name = "Server.Client.queue.#{rpc_route}"
    @reply_queue_name = "Client.Server.queue.#{params[:controller]}"
    @params = params
    setup_reply_queue
  end

  def setup_reply_queue
    @lock = Mutex.new
    @condition = ConditionVariable.new
    that = self
    @reply_queue = channel.queue(reply_queue_name, durable: true)

    reply_queue.subscribe do |_delivery_info, properties, payload|
      if properties[:correlation_id] == that.call_id
        that.response = payload
        that.lock.synchronize { that.condition.signal }
      end
    end
  end

  def call
    @call_id = "NAIVE_RAND_#{rand}#{rand}#{rand}"
    exchange.publish(params.to_json,
                     routing_key: server_queue_name,
                     correlation_id: call_id,
                     reply_to: reply_queue.name)
    lock.synchronize { condition.wait(lock) }
    connection.close
    response
  end

  def channel
    @channel ||= connection.create_channel
  end

  def connection
    @connection ||= Bunny.new.tap { |c| c.start }
  end
end
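
The waiting logic in `call` can be tried out without a broker. The sketch below is not the Bunny API: `RpcWaiter` and its `deliver` and `wait` methods are hypothetical stand-ins. It only illustrates the correlation-id check together with a wait that loops on a predicate and has a timeout, so that a reply arriving before the caller starts waiting does not leave it blocked forever.

```ruby
require 'securerandom'

# Hypothetical stand-in for the reply-queue handling; not part of Bunny.
class RpcWaiter
  attr_reader :call_id

  def initialize
    @lock = Mutex.new
    @condition = ConditionVariable.new
    @call_id = SecureRandom.uuid
    @response = nil
  end

  # Called from the consumer thread for every delivery on the reply queue.
  def deliver(correlation_id, payload)
    return unless correlation_id == @call_id # ignore replies to other calls

    @lock.synchronize do
      @response = payload
      @condition.signal
    end
  end

  # Blocks until a matching reply arrives or the timeout expires.
  # Returns the payload, or nil on timeout.
  def wait(timeout: 5)
    deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
    @lock.synchronize do
      while @response.nil?
        remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
        break if remaining <= 0

        @condition.wait(@lock, remaining)
      end
      @response
    end
  end
end

# Simulated round trip: a thread plays the server and replies after a delay.
waiter = RpcWaiter.new
server = Thread.new do
  sleep 0.05
  waiter.deliver('some-other-call', 'ignored')
  waiter.deliver(waiter.call_id, '{"id":1}')
end
reply = waiter.wait(timeout: 2)
server.join
puts reply
```

In the original client, the same idea would mean checking `response` inside `lock.synchronize` before and after `condition.wait`, and passing a timeout as the second argument to `wait`.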

RPCServer service, using this gist (the comments here are the "meat" of my question):

# app/workers/posts_worker.rb
require 'sneakers'
require 'sneakers/runner'
require 'byebug'
require 'oj'

class RpcServer
  include Sneakers::Worker
  from_queue 'Client.Server.queue.v1/filters/posts', durable: true, env: nil

  def work_with_params(deserialized_msg, delivery_info, metadata)
    post = {}
    p "ActiveRecord::Base.connected?: #{ActiveRecord::Base.connected?}" # => true
    ##### This gets logged   
    Rails.logger.info "ActiveRecord::Base.connection_pool: #{ActiveRecord::Base.connection_pool}\n\n-------"
    ##### This never gets logged
    Rails.logger.info "ActiveRecord::Base.connection_pool.with_connection: #{ActiveRecord::Base.connection_pool.with_connection}\n\n--------" 

    ### interpreter never reaches this place when ActiveRecord methods like `with_connection`, `where`, `count` etc. are used
    ActiveRecord::Base.connection_pool.with_connection do
      post = Post.first.to_json
    end

    ##### first commented `publish()` works fine and RPC works when no ActiveRecord is involved (this is, assuming above code using ActiveRecord is commented out)
    ##### second publish is not working
    # publish("response from RPCServer", {
    publish(post.to_json, {      
      to_queue: metadata[:reply_to],
      correlation_id: metadata[:correlation_id],
      content_type: metadata[:content_type]
    })

    ack!
  end
end

Sneakers::Runner.new([RpcServer]).run
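
One detail in the worker above may contribute to the second log line never appearing: `with_connection` is called there without a block. Assuming it hands the connection to its block via `yield` (which is how it is used a few lines later), calling it without a block raises `LocalJumpError` before the log string is built. The sketch below reproduces that behaviour in plain Ruby; `FakePool` is a hypothetical stand-in, not ActiveRecord's connection pool.

```ruby
# Hypothetical stand-in for a connection pool; NOT ActiveRecord's implementation.
class FakePool
  # Checks out a connection, yields it, and always checks it back in.
  def with_connection
    conn = :checked_out_connection
    yield conn
  ensure
    @released = true
  end

  def released?
    @released == true
  end
end

pool = FakePool.new

# With a block: the block's return value becomes the method's return value.
result = pool.with_connection { |conn| "queried via #{conn}" }

# Without a block: `yield` raises, so the interpolation never completes.
error = begin
  "pool.with_connection: #{pool.with_connection}"
rescue LocalJumpError => e
  e
end

puts result
puts error.class
```

If the goal is only to log that a connection can be checked out, passing a block (for example one that calls `active?` on the yielded connection) should avoid the exception.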

RPCServer sneakers configuration:

# config/initializers/sneakers.rb
Sneakers.configure({
  amqp: "amqp://guest:guest@localhost:5672",
  vhost: '/',
  workers: 4,
  log: 'log/sneakers.log',
  pid_path: "tmp/pids/sneakers.pid",
  timeout_job_after: 5,
  prefetch: 10,
  threads: 10,
  durable: true,
  ack: true,
  heartbeat: 2,
  exchange: "",
  hooks: {
    before_fork: -> {
       Rails.logger.info('Worker: Disconnect from the database')
       ActiveRecord::Base.connection_pool.disconnect!
       Rails.logger.info("before_fork: ActiveRecord::Base.connected?: #{ActiveRecord::Base.connected?}") # => false
    },
    after_fork: -> {
      ActiveRecord::Base.connection
      Rails.logger.info("after_fork: ActiveRecord::Base.connected?: #{ActiveRecord::Base.connected?}") # => true
      Rails.logger.info('Worker: Reconnect to the database')
    },
    timeout_job_after: 60
  }
})
Sneakers.logger.level = Logger::INFO

RPCServer puma configuration:

# config/puma.rb
threads_count = ENV.fetch("RAILS_MAX_THREADS") { 5 }
threads threads_count, threads_count

port        ENV.fetch("PORT") { 3000 }
environment ENV.fetch("RAILS_ENV") { "development" }
workers ENV.fetch("WEB_CONCURRENCY") { 2 }

preload_app!

### tried and did not work
# on_worker_boot do
#   ActiveSupport.on_load(:active_record) do
#     ActiveRecord::Base.establish_connection
#   end
# end

before_fork do |server, worker|
  # other settings
  if defined?(ActiveRecord::Base)
    ActiveRecord::Base.connection.disconnect!
  end
end

after_worker_boot do |server, worker|
  if defined?(ActiveRecord::Base)
    ActiveRecord::Base.establish_connection
  end
end

plugin :tmp_restart

For completeness, I also have an external Rakefile that binds queues to exchanges (probably not important in this case):

namespace :rabbitmq do
  desc "Setup routing"
  task :setup do
    conn = start_bunny

    rpc_route service: :blog, from: 'v1/filters/posts_mappings', to: 'v1/filters/posts'

    conn.close
  end

  def rpc_route(service:, from:, to:)
    ...
  end

  def start_bunny
    ...
  end
end

I tried many Sneakers configurations and many sequences of launching RabbitMQ, resetting it, deleting queues and connections, etc. All of that is hard to list here and probably not relevant.

Why can't I connect to the database or execute ActiveRecord methods? What am I missing?

Best answer (ToTenMilan):

OK, I got it. The problem was the last line of the worker file in RPCServer:

Sneakers::Runner.new([RpcServer]).run

It was starting the worker outside of the Rails app. Commenting this line out solved the problem of the worker not being able to query the database.
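
With that line removed, the worker still has to be started somehow. One option, sketched here from the Sneakers README rather than from the answer itself, is to load the gem's Rake tasks and start workers through `rake sneakers:run`, which is the entry point the README describes for Rails. Whether this is how the worker was ultimately launched is an assumption, as is the standard Rails Rakefile layout shown.

```ruby
# Rakefile (sketch; assumes a standard Rails Rakefile)
require_relative 'config/application'
require 'sneakers/tasks' # adds the sneakers:run task

Rails.application.load_tasks

# Start the worker from the shell:
#   WORKERS=RpcServer bundle exec rake sneakers:run
```

The `WORKERS` variable takes a comma-separated list of worker class names, so `RpcServer` here refers to the class defined in app/workers/posts_worker.rb.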