Question

I'm using AMQP / RabbitMQ for my Ruby on Rails app.

I put the following amqp.rb file under config/initializers (copied and adapted from a recipe: http://www.hiringthing.com/2011/11/04/eventmachine-with-rails.html#sthash.iqCWUtOn.dpbs):

require 'amqp'

# References:
#   1. Getting Started with AMQP and Ruby
#      http://rubyamqp.info/articles/getting_started/
#   2. EventMachine and Rails
#      http://www.hiringthing.com/2011/11/04/eventmachine-with-rails.html#sthash.iqCWUtOn.dpbs
#   3. Connecting to the broker, integrating with Ruby on Rails, Merb and Sinatra
#      http://rubyamqp.info/articles/connecting_to_broker/
module AppEventMachine
  def self.start
    if defined?(PhusionPassenger)
      Rails.logger.info "###############################################################################"
      Rails.logger.info "Running EventMachine/Rails with PhusionPassenger ......"
      Rails.logger.info "###############################################################################"
      PhusionPassenger.on_event(:starting_worker_process) do |forked|
        # for Passenger, we need to avoid orphaned threads
        if forked && EventMachine.reactor_running?
          EventMachine.stop
        end

        spawn_eventmachine_thread
        die_gracefully_on_signal
      end
    else
      Rails.logger.info "###############################################################################"
      Rails.logger.info "PhusionPassenger is not running.  Probably you are running Rails locally ......"
      Rails.logger.info "###############################################################################"

      # facilitates debugging
      Thread.abort_on_exception = true
      # just spawn a thread and start it up
      spawn_eventmachine_thread unless defined?(Thin)
      # Thin is built on EventMachine, doesn't need this thread
    end
  end

  def self.spawn_eventmachine_thread
    Thread.new {
      EventMachine.run do
        AMQP.channel ||= AMQP::Channel.new(AMQP.connect(:host => '127.0.0.1')) # Q_SERVER, :user=> Q_USER, :pass => Q_PASS, :vhost => Q_VHOST ))
        AMQP.channel.on_error(&method(:handle_channel_exception))
        AMQP.channel.queue(MixpanelJob::QUEUE_NAME, :exclusive => true)
                    .subscribe { |metadata, payload| MixpanelJob::handle_sending(metadata, payload) }
      end
    }
  end

  def self.handle_channel_exception(channel, channel_close)
    Rails.logger.error "###############################################################################"
    Rails.logger.error "Oops... a channel-level exception: code = #{channel_close.reply_code}, message = #{channel_close.reply_text}"
    Rails.logger.error "###############################################################################"
  end

  def self.die_gracefully_on_signal
    Signal.trap("INT") {
      Rails.logger.error "###############################################################################"
      Rails.logger.error "Stopping the EventMachine ......"
      EventMachine.stop
      Rails.logger.error "###############################################################################"
    }
    Signal.trap("TERM") {
      Rails.logger.error "###############################################################################"
      Rails.logger.error "Stopping the EventMachine ......"
      EventMachine.stop
      Rails.logger.error "###############################################################################"
    }
  end
end

AppEventMachine.start

After I started Rails under Phusion Passenger, the log confirmed that it was running with Passenger. I then sent several messages to the queue, but to my surprise the following handler did not behave as expected:

.subscribe { |metadata, payload| MixpanelJob::handle_sending(metadata, payload) }

The subscribe handler was executed only once: the first message was received, but for every later message (2nd, 3rd, ...) the handler was never called.

Solution

Thanks to Joshua's comment, I found the cause: MixpanelJob::handle_sending was failing, and that blocked the EventMachine thread, so no further messages reached the subscribe handler.
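
One way to keep a single failing message from affecting later ones is to rescue exceptions inside the handler and log them. The following is only a minimal sketch under stated assumptions: SafeSubscriber and its wrap method are hypothetical names that do not appear in the original initializer, and the sketch only guards against raised exceptions, not against a handler that hangs.

```ruby
require 'logger'

# Hypothetical helper (not part of the amqp gem or the original initializer):
# wraps a message handler so that an exception raised while processing one
# message is logged and swallowed instead of escaping into the caller.
module SafeSubscriber
  def self.wrap(logger, &handler)
    lambda do |metadata, payload|
      begin
        handler.call(metadata, payload)
      rescue StandardError => e
        # Log the failure and carry on with the next message.
        logger.error("Message handler failed: #{e.class}: #{e.message}")
      end
    end
  end
end

# Assumed usage inside spawn_eventmachine_thread (untested against a broker):
#   handler = SafeSubscriber.wrap(Rails.logger) do |metadata, payload|
#     MixpanelJob.handle_sending(metadata, payload)
#   end
#   AMQP.channel.queue(MixpanelJob::QUEUE_NAME, :exclusive => true).subscribe(&handler)
```

If handle_sending is slow rather than failing (for example, a synchronous HTTP call), rescuing alone will not help, because the reactor thread is still occupied while the call runs. In that case it may be worth handing the work to EventMachine.defer so it runs on EventMachine's thread pool.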

Licensed under: CC-BY-SA with attribution