Michael Neumann
4/17/2009 6:23:00 AM
Sriram Varahan wrote:
> Hello,
>
> #*******************STARTCODE
>
> start_time = Time.now
> $count = 0
> class Queue
> def initialize *s # splat operator allows variable length argument
> list
> @mutex = Mutex.new
> @queue = []
> s.each { |e| @queue.push e }
> end
>
> def enq v
> @queue.push v
> end
>
> def deq
> @mutex.synchronize {item = @queue.shift}
> end
>
> def empty?
> @mutex.synchronize{@queue.length == 0}
> end
>
> def count
> @mutex.synchronize do
> $count += 1
> end
> end
> end
>
>
> #*****Test
>
> queue = Queue.new
> 500.times do |a|
> queue.enq a
> end
> threads = []
>
>
> # Create 5 threads which fetch values from the Q.
> 5.times do
> threads << Thread.new {
> until queue.empty?
> queue.count
> puts "Thread ID: #{Thread.current}.Job started"
> puts "#{queue.deq}"
> #sleep 0.0001
> puts "Thread ID: #{Thread.current}.Job complete"
> end
> }
> end
>
>
> threads.each {|t| t.join }
> puts "Count"
> puts $count
> puts "timeTaken:"
> puts Time.now - start_time
>
> # *************CODE ENDS******************
>
>
> I have five threads which fetch values from a queue. The above code
> works perfectly well in case of a single thread. But the issue arises
> when there are more threads.
>
> In case of 5 threads the number of times the block is executed is 503
> where it should have been 500.
>
> I know the reason why?
> The "deq" and "empty?" methods are not synchronized.
> So when the final item is removed from the thread, other threads access
> the empty? method before the @queue.length becomes 0.
>
> Hence the difference in count.
>
> If the sleep is activated this problem is solved.
>
> Any suggestion on how to get this working without the sleep?
You should also synchronize the enque operation (Queue#enq). Btw, there is
an existing Queue class that does this thread-safe:
require 'thread'
q = Queue.new
q.push 1
x = q.pop
q.pop # => would block the thread until a new element is available
q2 = SizedQueue.new(10) # bounded queue, which blocks when size > 10
Regards,
Michael