I’m only a beginner, and my book doesn’t cover this subject. I have researched my problem and found that an implementation of the consumer-producer pattern is the ideal solution, and have Googled it, read what I could, tried to enact examples… but haven’t been lucky. I would really appreciate a bit of guidance.
I am writing in Ruby, if that makes a difference.
Background
I am writing a script that scrobbles my music library backlog to Last.FM. For anyone who isn’t familiar with that service, that simply means that I am making lots of POST requests to an HTTP API.
I start with an array in which each element is a hash/dictionary like {artist: "The Cure", track: "Siamese Twins"}. I make the calls by iterating over the array and issuing the simple method call lastfm.track.scrobble artist: song[:artist], track: song[:track].
Problem
Doing this in a straightforward one-at-a-time blocking style works perfectly, but is very very slow, because I’m waiting ~2 seconds for each HTTP request to travel the world and return. I could finish 5 times faster if I sent 5 HTTP requests simultaneously.
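The arithmetic is easy to see with a stub. This is a minimal sketch, assuming a hypothetical fake_scrobble method that just sleeps in place of the real lastfm.track.scrobble call (0.2 s instead of ~2 s so it runs quickly). The total time comes out at roughly the number of songs times the per-request latency.

```ruby
# Serial baseline: one blocking "request" after another.
# fake_scrobble is a hypothetical stand-in for lastfm.track.scrobble.
LATENCY = 0.2 # seconds per simulated request (the real calls took ~2 s)

def fake_scrobble(artist:, track:)
  sleep LATENCY # pretend to wait for the HTTP round trip
  "#{track} by #{artist}"
end

songs = Array.new(5) { |i| { artist: "Artist #{i}", track: "Track #{i}" } }

started = Process.clock_gettime(Process::CLOCK_MONOTONIC)
results = songs.map do |song|
  fake_scrobble(artist: song[:artist], track: song[:track])
end
elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started

# Roughly songs.size * LATENCY, i.e. about 1 second for 5 songs
puts "Scrobbled #{results.size} songs in #{elapsed.round(2)} s"
```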
I asked on Stack Overflow what the best solution would be. Split the array into five parts, and give each part a thread running one request at a time? Something like JavaScript’s XMLHttpRequest, which has an event loop and a callback function? They told me that this question would be better suited to Programmers SE, and that I probably want the consumer-producer pattern. I looked it up, and it does sound like the kind of thing I need.
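The first idea in the question, splitting the array into five parts and giving each part its own thread, can be sketched like this. It again uses a hypothetical fake_scrobble stub instead of the real Last.FM client, so treat it as an illustration of the shape rather than working scrobbler code.

```ruby
# "Split the array into five parts, one thread per part."
# fake_scrobble is a hypothetical stand-in for lastfm.track.scrobble.
def fake_scrobble(artist:, track:)
  sleep 0.1 # simulate network latency
  "#{track} by #{artist}"
end

songs = Array.new(10) { |i| { artist: "Artist #{i}", track: "Track #{i}" } }

parts = 5
slice_size = (songs.size / parts.to_f).ceil # 10 songs -> 5 slices of 2

threads = songs.each_slice(slice_size).map do |part|
  Thread.new do
    # Each thread works through its own slice one request at a time
    part.map { |song| fake_scrobble(artist: song[:artist], track: song[:track]) }
  end
end

# Thread#value waits for the thread to finish and returns its block's result;
# flat_map keeps the original song order.
results = threads.flat_map(&:value)
puts "Scrobbled #{results.size} songs using #{threads.size} threads"
```

One drawback of fixed slices: if one slice happens to contain slow requests, its thread finishes last while the others sit idle. A shared queue that several threads pull from, which is what the producer-consumer pattern gives you, balances the load automatically.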
My Attempt in Ruby, based off a StackOverflow post
# "lastfm" is an object representing my Last.FM profile,
# its .track.scrobble method handles a POST request and waits until it returns
# "songs" is an array of hashes like {artist: "The Cure", track: "One Hundred Years"}
queue = SizedQueue.new(10) # this will only allow 10 items on the queue at once
p1 = Thread.new do
  songs.each do |song|
    scrobble = lastfm.track.scrobble artist: song[:artist], track: song[:track]
    puts "Scrobbled #{song[:track]} by #{song[:artist]}."
    queue << scrobble
  end
  queue << "done"
end
consumer = Thread.new do
  blocker = queue.pop(true) # don't block when zero items are in queue
  Thread.exit if blocker == "done"
  process(blocker)
end
# wait for the consumer to finish
consumer.join
My Error
The “pop” method call fails with the error “queue empty”.
I don’t know enough about this stuff to really understand what’s happening. It looks like one thread is filling a queue with API calls to make, never more than 10 at a time, while another thread is consuming from that queue and performing them. But why pop? Where is it ever actually executed? Why am I appending “done” to the queue?
I’m concerned about the line scrobble = lastfm.track.scrobble artist: song[:artist], track: song[:track]. Won’t this call the lastfm.track.scrobble method outright, and store its return value as scrobble? Should I be using a proc or lambda instead, and calling that proc/lambda in the consumer?
The answer by scriptin gives a reasonable solution, but doesn’t answer your questions about why your implementation failed.
- You get a ‘queue empty’ error when you call pop on a Ruby queue in non-blocking mode (true passed) and there are no items on the queue.
- Your “producer” thread is created and your “consumer” thread is created, but they’ll run independently and start executing whenever Ruby feels like it.
- If your “consumer” runs before the “producer” has pushed anything onto the queue, then obviously the “consumer” will pop an empty queue and throw an exception.
The solution to this one is easy: the consumer shouldn’t use non-blocking mode. Don’t pass true, just call pop(). The whole point of the consumer thread is that it can block on a queue waiting for the producer, without the rest of the application halting.
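The difference between the two kinds of pop is easy to see in isolation. A minimal sketch using only Ruby’s built-in Queue (no Last.FM calls involved):

```ruby
queue = Queue.new

# Non-blocking pop on an empty queue raises ThreadError ("queue empty"),
# which is the error from the question.
begin
  queue.pop(true)
rescue ThreadError => e
  error_message = e.message
end

# A blocking pop simply waits until another thread pushes something.
producer = Thread.new do
  sleep 0.1 # simulate the producer being slow to get going
  queue << "first item"
end

item = queue.pop # blocks here for about 0.1 s instead of raising
producer.join

puts "non-blocking pop: #{error_message}"
puts "blocking pop got: #{item}"
```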
As for calling lastfm.track.scrobble: yes, this executes that method immediately and pushes the result onto the queue, which is exactly not what you want 😀. It is in fact the reason why your consumer thread “reliably” gets to pop its queue before the producer ever has a chance to push anything. Even if the producer thread runs first, it hits a slow ‘scrobble’ call and sits waiting for the HTTP request/response to complete, so the consumer thread executes and tries to read from the empty queue.
You want to push the parameters to scrobble onto the queue, then have the consumer thread do the scrobbling. It’s “nicer”/more structured to make little classes encapsulating the work and push instances of those onto your queue, but a cheap-and-nasty solution just uses the Hash which scrobble takes as input. We also need to remember to loop over the queue entries inside the consumer, instead of only processing one! Thus (UNTESTED) we might have:
queue = SizedQueue.new(10)
Thread.new do
  songs.each do |song|
    queue << { artist: song[:artist], track: song[:track] }
  end
  queue << "done"
end
consumer = Thread.new do
  loop do
    scrobble_args = queue.pop() # blocking pop: waits for the producer
    Thread.exit if scrobble_args == "done"
    # Pass the values as keywords, the same way the question calls scrobble
    lastfm.track.scrobble(artist: scrobble_args[:artist], track: scrobble_args[:track])
    puts "Scrobbled #{scrobble_args[:track]} by #{scrobble_args[:artist]}."
  end
end
consumer.join
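The “little classes encapsulating the work” option mentioned above also answers the proc/lambda question: anything that responds to call can go on the queue, and the consumer invokes it. This is a minimal sketch, assuming a hypothetical ScrobbleJob struct and a hypothetical FakeTrackApi class standing in for lastfm.track (it records calls instead of making HTTP requests).

```ruby
# Hypothetical stand-in for lastfm.track: records calls instead of POSTing.
class FakeTrackApi
  attr_reader :calls

  def initialize
    @calls = Queue.new # thread-safe, so any thread may record a call
  end

  def scrobble(artist:, track:)
    @calls << [artist, track]
    true
  end
end

# Hypothetical job object: holds the arguments and performs the request
# only when #call is invoked.
ScrobbleJob = Struct.new(:api, :artist, :track) do
  def call
    api.scrobble(artist: artist, track: track)
  end
end

api = FakeTrackApi.new
songs = [{ artist: "The Cure", track: "Siamese Twins" },
         { artist: "The Cure", track: "One Hundred Years" }]

queue = SizedQueue.new(10)

producer = Thread.new do
  songs.each { |song| queue << ScrobbleJob.new(api, song[:artist], song[:track]) }
  # A lambda also responds to #call, so it can share the same queue.
  queue << -> { api.scrobble(artist: "Example Artist", track: "Queued as a lambda") }
  queue << :done # sentinel: tells the consumer to stop
end

consumer = Thread.new do
  loop do
    job = queue.pop # blocking pop
    break if job == :done
    job.call # the (fake) request happens here, in the consumer thread
  end
end

[producer, consumer].each(&:join)
puts "#{api.calls.size} scrobbles performed"
```

Nothing is sent until job.call runs in the consumer, which is the deferral the question was asking about.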
This should technically work, but it is totally pointless, because that one lonely consumer thread is just processing the queue serially. So let’s simplify this. Really, you just want to dump all of ‘songs’ into the queue and have several consumer threads do the work. Since we want to run in parallel, we’ll make an array of threads.
# I'm not bothering with the producer thread here so the queue size is unlimited.
# You could just as well put the 'songs.each' inside a producer thread with
# a sized queue, as per your original code, if you wanted.
#
queue = Queue.new
songs.each do | song |
  queue << song
end
consumers = []
1.upto( 10 ) do
  consumers << Thread.new do
    # Keep looping until asked to exit
    #
    loop do
      song = queue.pop()
      Thread.exit if song == 'done'
      lastfm.track.scrobble( artist: song[ :artist ], track: song[ :track ] )
      puts "Scrobbled #{ song[ :track ] } by #{ song[ :artist ] }."
    end
  end
  # Push one 'done' exit instruction for each consumer
  #
  queue << 'done'
end
# Wait for all consumer threads to complete
#
consumers.each do | consumer |
  consumer.join()
end
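On the question of why “done” goes on the queue: it is a sentinel value telling each consumer there is no more work, which is why the code above pushes one per consumer. Ruby 2.3 and later also provide Queue#close, which does the same job without a special value. A minimal sketch, with a hypothetical fake_scrobble stub in place of the real API call and a sized queue fed by a producer thread:

```ruby
# Several consumers sharing one queue, stopped with Queue#close (Ruby 2.3+)
# instead of 'done' sentinels. fake_scrobble is a hypothetical stand-in
# for lastfm.track.scrobble.
def fake_scrobble(artist:, track:)
  sleep 0.1 # simulate a slow HTTP round trip
end

songs = Array.new(20) { |i| { artist: "Artist #{i}", track: "Track #{i}" } }
queue = SizedQueue.new(10) # producer blocks if it gets 10 songs ahead
finished = Queue.new       # thread-safe record of completed songs
started = Process.clock_gettime(Process::CLOCK_MONOTONIC)

producer = Thread.new do
  songs.each { |song| queue << song }
  queue.close # no more work is coming
end

consumers = Array.new(5) do
  Thread.new do
    # pop blocks while the queue is empty but open; once it is closed
    # and drained, pop returns nil and the loop ends
    while (song = queue.pop)
      fake_scrobble(artist: song[:artist], track: song[:track])
      finished << song
    end
  end
end

producer.join
consumers.each(&:join)
elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started

# 20 songs / 5 threads * 0.1 s is about 0.4 s, versus about 2 s serially
puts "Scrobbled #{finished.size} songs in #{elapsed.round(2)} s"
```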
…and that should do the trick. The “Parallel” gem will probably be doing much the same sort of thing under the hood, at least when told to use threads (by default it uses separate processes).
It seems like each song is processed independently, in which case you just need parallel collection. Quick googling found this gem: https://github.com/grosser/parallel
The code would be something like this:
Parallel.each(songs) do |song|
  lastfm.track.scrobble artist: song[:artist], track: song[:track]
  puts "Scrobbled #{song[:track]} by #{song[:artist]}."
end
or this, if you want to collect the results:
scrobbled_songs = Parallel.map(songs) do |song|
  scrobbled_song = lastfm.track.scrobble artist: song[:artist], track: song[:track]
  puts "Scrobbled #{song[:track]} by #{song[:artist]}."
  scrobbled_song
end
As the documentation says, you can adjust the number of processes/threads to match your number of CPU cores.
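If you’d rather not add a dependency, Ruby’s standard library is enough for a rough equivalent of Parallel.map with threads (the gem’s in_threads: option). This is a minimal sketch, assuming a hypothetical parallel_map helper that is not part of the gem. For network-bound work like scrobbling, CRuby releases its global lock while a thread waits, so the thread count need not be tied to the number of cores.

```ruby
# Rough stdlib-only stand-in for Parallel.map(items, in_threads: n).
# parallel_map is a hypothetical helper, not part of the Parallel gem.
def parallel_map(items, in_threads:, &block)
  indexes = Queue.new
  items.each_index { |i| indexes << i }
  indexes.close # workers stop once every index has been taken

  pairs = Queue.new # thread-safe collection of [index, result]
  workers = Array.new(in_threads) do
    Thread.new do
      while (i = indexes.pop)
        pairs << [i, block.call(items[i])]
      end
    end
  end
  workers.each(&:join) # re-raises here if a worker's block raised

  # Put results back in the same order as the input
  results = Array.new(items.size)
  until pairs.empty?
    i, value = pairs.pop
    results[i] = value
  end
  results
end

songs = Array.new(8) { |i| { artist: "Artist #{i}", track: "Track #{i}" } }

scrobbled = parallel_map(songs, in_threads: 4) do |song|
  sleep 0.1 # where lastfm.track.scrobble(...) would go
  "#{song[:track]} by #{song[:artist]}"
end
puts scrobbled.first
```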
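It is still blocking on each HTTP request (assuming lastfm.track.scrobble is a blocking operation); you’re just running several of them in parallel in separate processes or threads. Because most of the time is spent waiting on the network, those waits overlap even if you only have one core.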
If you need async HTTP requests, you might look for a good tool like em-http-request or similar.