Robert Klemme
10/13/2007 10:34:00 AM
While this is all well and good, I have some additional remarks.
On 13.10.2007 05:59, 7stud -- wrote:
> Actually, the example provided won't even work in your case. You have
> to do some extra things.
>
>> I'm pretty new to ruby
>
> A Queue is a first in first out container, which means the items you
> push() into one end of the Queue are the first items that pop() out the
> other end. A Queue is also thread safe, which means that only one
> thread can access it at the same time.
>
> Therefore, you can push() the lines from your file into one end of the
> Queue, and you can have each thread pop() a line off the other end of
> the Queue.
>
> If there is nothing in the Queue, then a thread that tries to pop() a
> line from the Queue will block until more data becomes available. As a
> result, even after all the lines have been read from the Queue, each
> thread will come back to the Queue and try to pop() another line, but
> since there won't be any more lines left, the threads will block and
> wait for more data. That means the threads will never end. To make
> your threads stop trying to read more lines from the Queue once it's
> exhausted, you will need to send each thread a string that acts as a
> termination message.
There is a better option: send something down the queue that is
*not a String*. Otherwise processing would stop prematurely if the file
happened to contain the terminating line.
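To illustrate the point, here is a minimal sketch, assuming some made-up sample data that happens to contain the text "END_OF_DATA". The names `lines`, `done` and `processed` are only for illustration; the terminator is a plain `Object`, which can never be identical to a line read from a file.

```ruby
require 'thread'

# Hypothetical sample data; note that it contains the text "END_OF_DATA".
lines = ["first line", "END_OF_DATA", "last line"]

done      = Object.new   # unique terminator, never identical to any String
queue     = Queue.new
processed = Queue.new    # thread-safe collector for the results

workers = (1..2).map do
  Thread.new(queue, done) do |q, stop|
    # stop only when the terminator object itself arrives
    until stop.equal?(item = q.deq)
      processed.enq item.upcase
    end
  end
end

lines.each { |line| queue.enq line }
workers.size.times { queue.enq done }   # one terminator per worker
workers.each { |th| th.join }

results = []
results << processed.deq until processed.empty?
puts results.sort
```

All three lines come out the other end, including the one that looks like a termination message.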
> You could first push() all the lines from your file into the Queue, and
> then start the threads,
That's a rather bad idea, given that a file can be huge and you do not
need all lines in memory for line-wise processing.
That's the same reason why it's a good idea to use a bounded queue: if
processing is slower than reading, an unbounded queue will eventually
fill up with the complete file contents. If processing is faster than
reading then threads will have to wait either way.
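A small sketch of that back-pressure effect, assuming a deliberately tiny limit and a consumer that is artificially slowed down with `sleep` (both are just for illustration, as are the names `max_in_queue`, `consumed` and `max_seen`). The producer records the largest queue size it ever observes:

```ruby
require 'thread'

max_in_queue = 4                        # deliberately tiny, for illustration
queue    = SizedQueue.new(max_in_queue)
consumed = []
max_seen = 0

consumer = Thread.new(queue) do |q|
  # the queue itself serves as terminator
  until q.equal?(item = q.deq)
    sleep 0.001                         # pretend processing is slow
    consumed << item
  end
end

100.times do |i|
  queue.enq "line #{i}"                 # blocks while the queue is full
  max_seen = [max_seen, queue.size].max
end
queue.enq queue                         # send the terminator
consumer.join

puts "lines consumed: #{consumed.size}, largest queue size seen: #{max_seen}"
```

With a plain Queue instead of SizedQueue the producer would never block, and the queue could grow to hold nearly all 100 lines.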
> but you might as well get the threads working on
> the first lines while you are pushing the rest of the lines into the
> Queue. So, start the threads and let them block, then start pushing
> the lines from the file into the Queue.
>
>
> require 'thread'
>
> #Create some data:
> File.open("data.txt", "w") do |file|
>   (1..100).each do |num|
>     file.puts("line #{num}")
>   end
> end
>
>
> #Read data with 5 threads:
> q = Queue.new
>
> my_threads = (1..5).collect do |i|
>   Thread.new do #returns a thread
>     loop do
>       line = q.pop
>
>       if line == "END_OF_DATA"
>         break
>       end
>
>       #process line:
>       puts line.capitalize
>     end
>   end
> end
>
> #Threads are blocking while they
> #await data. Give them some data:
> IO.foreach("data.txt") do |line|
>   q.push(line)
> end
>
> #Send each thread a signal that
> #terminates the thread:
> 5.times {q.push("END_OF_DATA")}
>
> #Wait for all the threads to finish
> #executing:
> my_threads.each {|t| t.join}
Here's my version with all the remarks incorporated.
require 'thread'

MAX_IN_QUEUE = 1024
NUM_THREADS = 5

queue = SizedQueue.new MAX_IN_QUEUE

threads = (1..NUM_THREADS).map do
  # we pass the queue through the constructor to avoid
  # nasty effects of the variable "queue" changing
  Thread.new queue do |q|
    # we use the queue itself as terminator
    until q == (item = q.deq)
      begin
        # whatever processing
      rescue Exception => e
        # whatever error handling
      end
    end
  end
end

# read from files on the command line
ARGF.each do |line|
  queue.enq line
end

# send one terminator per thread first; any worker may pick up
# any terminator, so joining before all are queued can deadlock
threads.size.times { queue.enq queue }

# wait for all workers to finish
threads.each { |th| th.join }
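To make the "whatever processing" and "whatever error handling" placeholders a bit more concrete, here is a minimal sketch under some assumptions of mine: the processing step is a made-up `Integer(item)` conversion, the input is a hard-coded array instead of ARGF, and I rescue StandardError rather than Exception. The point is only that the rescue sits inside the loop, so one bad line does not kill the worker.

```ruby
require 'thread'

queue  = SizedQueue.new(8)
errors = Queue.new                      # thread-safe place to record failures
sums   = Queue.new                      # one partial sum per worker

workers = (1..3).map do
  Thread.new(queue) do |q|
    total = 0
    until q.equal?(item = q.deq)
      begin
        total += Integer(item)          # raises ArgumentError on bad input
      rescue StandardError => e
        errors.enq "#{item.inspect}: #{e.class}"
      end
    end
    sums.enq total
  end
end

# hypothetical input; "oops" cannot be converted
["1", "2", "oops", "3", "4"].each { |line| queue.enq line }
workers.size.times { queue.enq queue }  # one terminator per worker
workers.each { |th| th.join }

grand_total = 0
grand_total += sums.deq until sums.empty?
puts "total=#{grand_total} errors=#{errors.size}"
```

The worker that hits "oops" records the failure and carries on with the next item.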
Have fun!
robert