I’m working on a Ruby on Rails web project that runs on Puma server, also has several DelayedJob Workers on different servers.
The project generates some special logs that i want to send to AWS Sqs queue. Those logs are generated on both, frontend requests (run in the Puma server) and in different Delayed Jobs run in the Delayed::Worker servers.
One prerequisite is to send this information asyncronously son i’ve implemented a SQSManager class where different threads can write to a temporal local Queue (thread safe), and a second thread processes them every 10 seconds. This thread is launched on puma and workers start.
The relevant code is similar to this:
require 'aws-sdk-sqs'
class SQSManager
SEND_INTERVAL = 10 # seconds
MAX_BATCH_SIZE = 10000
@@sqs_queue_url = APP_CONFIG[:aws_sqs_queue_url]
@@local_queue = Queue.new # Threadsafe intermediate buffer
@@sqs_client = Aws::SQS::Client.new(region: ENV['AWS_REGION']) if ENV['AWS_REGION']
@@worker_thread = nil
# Add a log to the local queue
def self.add_log(log)
Rails.logger.warn "SQSManager (#{$$}): adding log, current # elements #{@@local_queue}"
@@local_queue << cal
end
end
def self.start
@@worker_thread = Thread.new do
Rails.logger.warn "SQSManager (#{$$}): Starting thread"
loop do
sleep(SEND_INTERVAL)
send_messages
if @@local_queue.empty?
Rails.logger.warn "SQSManager (#{$$}): Stopping thread"
break
end
end
end
end
private
def self.send_messages
Rails.logger.warn "SQSManager (#{$$}): processing local queues"
if @@local_queue.empty?
Rails.logger.warn "SQSManager (#{$$}): Messages empty"
else
Rails.logger.warn "SQSManager (#{$$}): Sending messages"
MAX_BATCH_SIZE.times do
unless @@local_queue.empty?
begin
@@sqs_client.send_message({queue_url: sqs_queue_url, message_body: @@local_queue.pop})
rescue Aws::SQS::Errors::ServiceError => e
Rails.logger.error "SQSManager (#{$$}): Error sending messages to #{sqs_queue_url}: #{e.message}.nBacktrace:nt#{e.backtrace.join("nt")}"
end
else
break
end
end
end
end
end
On puma i’ve configured to start the SQSManager thread on the after_fork hook and everything works fine. But with the DelayedJob i’m able to launch this thread, but something weird happens. It seems that the DelayedJob threads write on the local queue but the sending to SQS thread finds the local queue empty.
The log looks like this:
SQSManager (15928): Starting thread
SQSManager (15928): adding log, current # elements 1
SQSManager (15928): adding log, current # elements 2
SQSManager (15928): processing local queues
SQSManager (15928): Messages empty
SQSManager (15928): adding log, current # elements 3
SQSManager (15928): processing local queues
SQSManager (15928): Messages empty
The initializer for starting SQSManager on the Delaye::Worker it’s a bit tricky but it’s the best i found to make SQSManager thread run:
#config/initializer/delayed_job_worker.rb
require 'delayed_job'
module DelayedJobWorker
def self.included(base)
base.class_eval do
alias_method :original_stop, :stop
alias_method :original_start, :start
def start
Delayed::Worker.logger.warn "original_start worker!"
SQSManager.start # SQSManager initialization before the Workr original start
original_start
end
def stop
SQSManager.shutdown
original_stop
end
end
end
end
Delayed::Worker.include(DelayedJobWorker)
I think the key is in this initializer and also has tried many different things but i cant make it work properly. Any suggestions?
Technical data: Ruby 3.3, Rails 4.2.11.36 LTS, DelayedJob 4.1.11.