How does the consumer-producer solution work?

I’m only a beginner, and my book doesn’t cover this subject. I have researched my problem and found that an implementation of the consumer-producer pattern is the ideal solution, and have Googled it, read what I could, tried to enact examples… but haven’t been lucky. I would really appreciate a bit of guidance.

I am writing in Ruby, if that makes a difference.

Background

I am writing a script that scrobbles my music library backlog to Last.FM. For anyone who isn’t familiar with that service, that simply means that I am making lots of POST requests to an HTTP API.

I start with an array in which each element is a hash/dictionary like {artist: "The Cure", track: "Siamese Twins"}. I make the calls by iterating over the array and issuing the simple method call lastfm.track.scrobble artist: song[:artist], track: song[:track].

Problem

Doing this in a straightforward one-at-a-time blocking style works perfectly, but is very very slow, because I’m waiting ~2 seconds for each HTTP request to travel the world and return. I could finish 5 times faster if I sent 5 HTTP requests simultaneously.

I asked on Stack Overflow what the best solution would be. Split the array into five parts, and give each part a thread running one request at a time? Something like JavaScript’s XMLHttpRequest, which has an event loop and a callback function? They told me that this question would be better suited to Programmers SE, and that I probably want the consumer-producer pattern. I looked it up, and it does sound like the kind of thing I need.

My Attempt in Ruby, based off a StackOverflow post

# "lastfm" is an object representing my Last.FM profile, 
# its .track.scrobble method handles a POST request and waits until it returns
# "songs" is an array of hashes like {artist: "The Cure", track: "One Hundred Years"}

queue = SizedQueue.new(10) # this will only allow 10 items on the queue at once

p1 = Thread.new do 
  songs.each do |song|
      scrobble = lastfm.track.scrobble artist: song[:artist], track: song[:track]
      puts "Scrobbled #{song[:track]} by #{song[:artist]}."
      queue << scrobble
  end
  queue << "done"
end

consumer = Thread.new do
  blocker = queue.pop(true) # don't block when zero items are in queue
  Thread.exit if blocker == "done"
  process(blocker)
end

# wait for the consumer to finish
consumer.join

My Error

Failure on the “pop” method call with error “queue empty”.

I don’t know enough about this stuff to really understand what’s happening. It looks like one thread is filling a queue with API calls to make, never more than 10 at a time, while another thread is consuming from that queue and performing them. But why pop? Where is it ever actually executed? Why am I appending “done” to the queue?

I’m concerned about the line scrobble = lastfm.track.scrobble artist: song[:artist], track: song[:track]. Won’t this call the lastfm.track.scrobble method outright, and store its return value as scrobble? Should I be using a proc or lambda instead, and calling that proc/lambda in the consumer?

The answer by scriptin gives a reasonable solution, but doesn’t answer your questions about why the implementation you had failed.

  • You get a ‘queue empty’ error when you call pop on a Ruby queue in non-blocking mode (true passed) and there are no items on the queue.
  • Your “producer” thread is created and your “consumer” thread is created, but they’ll run independently and start executing whenever Ruby feels like it.
  • If your “consumer” runs before the “producer” has pushed anything onto the queue, then obviously the “consumer” will pop an empty queue and throw an exception.

The solution to this one is easy; the consumer shouldn’t use non-blocking mode. Don’t pass true, just call pop(). The whole point of the consumer thread is that it can block on a queue waiting for the producer, without the rest of the application halting.
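To see the two modes in isolation, here is a minimal sketch that uses only Ruby’s built-in Queue. The try_pop helper is a hypothetical name made up for this illustration; it is not part of your code or of the Last.FM library.

```ruby
# Queue is built in on modern Ruby; on older versions add: require 'thread'

# Hypothetical helper: a non-blocking pop that reports an empty queue
# instead of letting the ThreadError escape.
def try_pop(queue)
  queue.pop(true)            # non-blocking: raises ThreadError if empty
rescue ThreadError => e
  "error: #{e.message}"
end

queue = Queue.new
puts try_pop(queue)          # the queue is empty, so this prints the error

# Blocking pop: the consumer simply waits until something arrives.
consumer = Thread.new { queue.pop }
sleep 0.1                    # give the consumer time to park inside pop
queue << { artist: "The Cure", track: "Siamese Twins" }
puts consumer.value.inspect  # value joins the thread and returns what it popped
```

The first call fails the same way your consumer did; the second consumer just sleeps inside pop until the push wakes it up.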

As for calling lastfm.track.scrobble: yes, this executes that method immediately and pushes the result onto the queue, which is exactly what you don’t want 😀. It is also the reason your consumer thread “reliably” gets to pop its queue before the producer has had a chance to push anything. Even if the producer thread runs first, it hits a slow ‘scrobble’ call and sits waiting for the HTTP request/response to complete, so the consumer thread executes and tries to read from the still-empty queue.

You want to push the parameters to scrobble onto the queue, then have the consumer thread do the scrobbling. It’s “nicer”/more structured to make little classes encapsulating the work and push instances of those onto your queue, but a cheap-and-nasty solution just uses the Hash which scrobble takes as input. We also need to remember to loop over the queue entries inside the consumer, instead of only processing one! Thus (UNTESTED) we might have:

queue = SizedQueue.new(10)

Thread.new do 
  songs.each do |song|
      queue << { artist: song[:artist], track: song[:track] }
  end
  queue << "done"
end

consumer = Thread.new do
  loop do
    scrobble_args = queue.pop()
    Thread.exit if scrobble_args == "done"
    lastfm.track.scrobble(scrobble_args)
    # Each interpolated value needs its own #{...}
    puts "Scrobbled #{scrobble_args[:track]} by #{scrobble_args[:artist]}."
  end
end

consumer.join

This should technically work but is totally pointless, because that one lonely consumer thread is just processing the queue serially. So let’s simplify this. Really, you just want to dump all of ‘songs’ into the queue and have several consumer threads do the work. Since we want to run in parallel, we’ll make an array of threads.

# I'm not bothering with the producer thread here, so the queue size is unlimited.
# You could just as well put the 'songs.each' inside a producer thread with
# a sized queue, as per your original code, if you wanted.
#
queue = Queue.new
songs.each do | song |
  queue << song
end

consumers = []

1.upto( 10 ) do
  consumers << Thread.new do

    # Keep looping until asked to exit
    #
    loop do
      song = queue.pop()
      Thread.exit if song == 'done'
      lastfm.track.scrobble( artist: song[ :artist ], track: song[ :track ] )
      puts "Scrobbled #{ song[ :track ] } by #{ song[ :artist ] }."
    end
  end

  # Push one 'done' exit instruction for each consumer
  #
  queue << 'done'
end

# Wait for all consumer threads to complete
#
consumers.each do | consumer |
  consumer.join()
end

…and that should do the trick. The “Parallel” gem will probably be doing much the same sort of thing under the hood.
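As a variation on the code above, and assuming Ruby 2.3 or later, Queue#close can replace the per-consumer 'done' markers: once a closed queue is drained, pop returns nil and each worker’s loop ends by itself. This is a sketch of that alternative, not what the code above does. FakeScrobbler and scrobble_all are hypothetical names, and the fake class only stands in for lastfm.track so the example runs without network access.

```ruby
# Hypothetical stand-in for lastfm.track: it waits briefly and echoes its
# arguments instead of making a real HTTP request.
class FakeScrobbler
  def scrobble(artist:, track:)
    sleep 0.05
    "#{track} by #{artist}"
  end
end

# Scrobble every song using `workers` consumer threads; returns the results
# in completion order (not necessarily the input order).
def scrobble_all(songs, scrobbler, workers: 5)
  queue = Queue.new
  songs.each { |song| queue << song }
  queue.close            # no more pushes; pop returns nil once drained

  results = Queue.new    # Queue is thread-safe, so workers can share it
  threads = Array.new(workers) do
    Thread.new do
      while (song = queue.pop)
        results << scrobbler.scrobble(artist: song[:artist], track: song[:track])
      end
    end
  end
  threads.each(&:join)

  Array.new(results.size) { results.pop }
end

songs = [
  { artist: "The Cure", track: "Siamese Twins" },
  { artist: "The Cure", track: "One Hundred Years" }
]
puts scrobble_all(songs, FakeScrobbler.new)
```

With the real library you would pass in lastfm.track (or wrap it) instead of the fake, and pick a worker count that the API’s rate limits tolerate.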

It seems like each song is processed independently, in which case you just need a parallel collection. A quick search turned up this gem: https://github.com/grosser/parallel

The code would be something like this:

Parallel.each(songs) do |song|
  lastfm.track.scrobble artist: song[:artist], track: song[:track]
  puts "Scrobbled #{song[:track]} by #{song[:artist]}."
end

Or like this, if you want to see the results:

scrobbled_songs = Parallel.map(songs) do |song|
  scrobbled_song = lastfm.track.scrobble artist: song[:artist], track: song[:track]
  puts "Scrobbled #{song[:track]} by #{song[:artist]}."
  scrobbled_song
end

As the docs say, you can adjust the number of processes/threads to match your number of CPU cores.

It is still blocking on each HTTP request (assuming lastfm.track.scrobble is a blocking operation); you’re just doing it in parallel by using multiple cores (assuming you have more than one).
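Since the time here is spent waiting rather than computing, the waits themselves can overlap. A rough way to see that with only the standard library is to time a few blocking calls run one after another and then run in threads. In this sketch sleep stands in for the blocking request (an assumption; it is not real HTTP), and fake_request and timed are hypothetical helper names.

```ruby
# Stand-in for one blocking request; sleep is an assumption, not real HTTP.
def fake_request(seconds = 0.2)
  sleep seconds
end

# Run the block and return the elapsed wall-clock time in seconds.
def timed
  start = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  yield
  Process.clock_gettime(Process::CLOCK_MONOTONIC) - start
end

serial   = timed { 5.times { fake_request } }
threaded = timed { Array.new(5) { Thread.new { fake_request } }.each(&:join) }

puts format("serial: %.2fs, threaded: %.2fs", serial, threaded)
```

The serial run takes roughly the sum of the waits, while the threaded run takes roughly one wait, because a sleeping (or I/O-blocked) Ruby thread lets the others proceed.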

If you need async HTTP requests, you might just look for a good tool, like em-http-request or similar.
