[lnkForumImage]
TotalShareware - Download Free Software

Confronta i prezzi di migliaia di prodotti.
Asp Forum
 Home | Login | Register | Search 


 

Forums >

comp.lang.ruby

Multi Threading

Sriram Varahan

4/17/2009 5:45:00 AM

Hello,

#*******************STARTCODE

start_time = Time.now
$count = 0
class Queue
def initialize *s # splat operator allows variable length argument
list
@mutex = Mutex.new
@queue = []
s.each { |e| @queue.push e }
end

def enq v
@queue.push v
end

def deq
@mutex.synchronize {item = @queue.shift}
end

def empty?
@mutex.synchronize{@queue.length == 0}
end

def count
@mutex.synchronize do
$count += 1
end
end
end


#*****Test

queue = Queue.new
500.times do |a|
queue.enq a
end
threads = []


# Create 5 threads which fetch values from the Q.
5.times do
threads << Thread.new {
until queue.empty?
queue.count
puts "Thread ID: #{Thread.current}.Job started"
puts "#{queue.deq}"
#sleep 0.0001
puts "Thread ID: #{Thread.current}.Job complete"
end
}
end


threads.each {|t| t.join }
puts "Count"
puts $count
puts "timeTaken:"
puts Time.now - start_time

# *************CODE ENDS******************


I have five threads which fetch values from a queue. The above code
works perfectly well in case of a single thread. But the issue arises
when there are more threads.

In case of 5 threads the number of times the block is executed is 503
where it should have been 500.

I know the reason why?
The "deq" and "empty?" methods are not synchronized.
So when the final item is removed from the thread, other threads access
the empty? method before the @queue.length becomes 0.

Hence the difference in count.

If the sleep is activated this problem is solved.

Any suggestion on how to get this working without the sleep?

Thanks.
--
Posted via http://www.ruby-....

5 Answers

Michael Neumann

4/17/2009 6:23:00 AM

0

Sriram Varahan wrote:

> Hello,
>
> #*******************STARTCODE
>
> start_time = Time.now
> $count = 0
> class Queue
> def initialize *s # splat operator allows variable length argument
> list
> @mutex = Mutex.new
> @queue = []
> s.each { |e| @queue.push e }
> end
>
> def enq v
> @queue.push v
> end
>
> def deq
> @mutex.synchronize {item = @queue.shift}
> end
>
> def empty?
> @mutex.synchronize{@queue.length == 0}
> end
>
> def count
> @mutex.synchronize do
> $count += 1
> end
> end
> end
>
>
> #*****Test
>
> queue = Queue.new
> 500.times do |a|
> queue.enq a
> end
> threads = []
>
>
> # Create 5 threads which fetch values from the Q.
> 5.times do
> threads << Thread.new {
> until queue.empty?
> queue.count
> puts "Thread ID: #{Thread.current}.Job started"
> puts "#{queue.deq}"
> #sleep 0.0001
> puts "Thread ID: #{Thread.current}.Job complete"
> end
> }
> end
>
>
> threads.each {|t| t.join }
> puts "Count"
> puts $count
> puts "timeTaken:"
> puts Time.now - start_time
>
> # *************CODE ENDS******************
>
>
> I have five threads which fetch values from a queue. The above code
> works perfectly well in case of a single thread. But the issue arises
> when there are more threads.
>
> In case of 5 threads the number of times the block is executed is 503
> where it should have been 500.
>
> I know the reason why?
> The "deq" and "empty?" methods are not synchronized.
> So when the final item is removed from the thread, other threads access
> the empty? method before the @queue.length becomes 0.
>
> Hence the difference in count.
>
> If the sleep is activated this problem is solved.
>
> Any suggestion on how to get this working without the sleep?

You should also synchronize the enque operation (Queue#enq). Btw, there is
an existing Queue class that does this thread-safe:

require 'thread'
q = Queue.new
q.push 1
x = q.pop
q.pop # => would block the thread until a new element is available

q2 = SizedQueue.new(10) # bounded queue, which blocks when size > 10

Regards,

Michael



Robert Klemme

4/17/2009 6:56:00 AM

0

2009/4/17 Sriram Varahan <sriram.varahan@gmail.com>:
> Hello,
>
> #*******************STARTCODE
>
> start_time =3D Time.now
> $count =3D 0
> class Queue
> =A0def initialize *s # splat operator allows variable length argument
> list
> =A0 =A0@mutex =3D Mutex.new
> =A0 =A0@queue =3D []
> =A0 =A0s.each { |e| @queue.push e }
> =A0end
>
> =A0def enq v
> =A0 =A0 =A0@queue.push v
> =A0end
>
> =A0def deq
> =A0 =A0@mutex.synchronize {item =3D @queue.shift}
> =A0end
>
> =A0def empty?
> =A0 =A0@mutex.synchronize{@queue.length =3D=3D 0}
> =A0end
>
> =A0def count
> =A0 @mutex.synchronize do
> =A0 =A0 $count +=3D 1
> =A0 end
> =A0end
> end
>
>
> #*****Test
>
> queue =3D Queue.new
> 500.times do |a|
> =A0queue.enq a
> end
> threads =3D []
>
>
> # Create 5 threads which fetch values from the Q.
> =A05.times do
> =A0 =A0threads << Thread.new {
> =A0 =A0until queue.empty?
> =A0 =A0 =A0queue.count
> =A0 =A0 =A0puts "Thread ID: #{Thread.current}.Job =A0started"
> =A0 =A0 =A0puts "#{queue.deq}"
> =A0 =A0 =A0#sleep 0.0001
> =A0 =A0 =A0puts "Thread ID: #{Thread.current}.Job =A0complete"
> =A0 =A0end
> =A0 }
> =A0end
>
>
> threads.each {|t| t.join }
> puts "Count"
> puts $count
> puts "timeTaken:"
> puts Time.now - start_time
>
> # *************CODE ENDS******************
>
>
> I have five threads which fetch values from a queue. The above code
> works perfectly well in case of a single thread. But the issue arises
> when there are more threads.
>
> In case of 5 threads the number of times the block is executed is 503
> where it should have been 500.
>
> I know the reason why?
> The "deq" and "empty?" methods are not synchronized.
> So when the final item is removed from the thread, other threads =A0acces=
s
> the empty? method before the @queue.length becomes 0.
>
> Hence the difference in count.
>
> If the sleep is activated this problem is solved.
>
> Any suggestion on how to get this working without the sleep?

There are several options. You could use MonitorMixin instead of Mutex
and include it in initialize

def initialize *s
# @mutex =3D=3D self so you do not need to change sync code
@mutex =3D extend MonitorMixin
@queue =3D s.dup
end

Then you can do external synchronization, e.g.

queue.synchronize do
if queue.empty?
# finish
else
elm =3D deq
end
end

Much better though is this approach

require 'thread'

# use library class
queue =3D Queue.new

# _first_ start threads
# does not really matter but if filling
# the queue takes time work can
# begin immediately
threads =3D (1..5).map do
label =3D Thread.current.to_s.freeze

Thread.new queue do |q|
until ( job =3D q.deq ) =3D=3D :terminate
puts "Thread ID: #{label}.Job started"
puts job
puts "Thread ID: #{label}.Job complete"
end
end
end

# fill queues
500.times do |a|
queue.enq a
end

# "close" queues
threads.size.times { queue.enq :terminate }

# wait for termination
threads.each do |th|
th.join
end

Cheers

robert

--=20
remember.guy do |as, often| as.you_can - without end
http://blog.rubybestprac...

Sean O'Halpin

4/17/2009 7:21:00 AM

0

On Fri, Apr 17, 2009 at 7:55 AM, Robert Klemme
<shortcutter@googlemail.com> wrote:
>
> Much better though is this approach
>
> require 'thread'
>
> # use library class
> queue =3D Queue.new
>
> # _first_ start threads
> # does not really matter but if filling
> # the queue takes time work can
> # begin immediately
> threads =3D (1..5).map do
> =A0label =3D Thread.current.to_s.freeze
>
> =A0Thread.new queue do |q|
> =A0 =A0until ( job =3D q.deq ) =3D=3D :terminate
> =A0 =A0 =A0puts "Thread ID: #{label}.Job =A0started"
> =A0 =A0 =A0puts job
> =A0 =A0 =A0puts "Thread ID: #{label}.Job =A0complete"
> =A0 =A0end
> =A0end
> end
>
> # fill queues
> 500.times do |a|
> =A0queue.enq a
> end
>
> # "close" queues
> threads.size.times { queue.enq :terminate }
>
> # wait for termination
> threads.each do |th|
> =A0th.join
> end
>
> Cheers
>
> robert
>
> --
> remember.guy do |as, often| as.you_can - without end
> http://blog.rubybestprac...
>
>

Minor nitpick - these lines should be reversed:

> label =3D Thread.current.to_s.freeze
> Thread.new queue do |q|

i.e.

> Thread.new queue do |q|
> label =3D Thread.current.to_s.freeze

Regards,
Sean

Sriram Varahan

4/17/2009 7:43:00 AM

0

Hey Robert,

That was an amazing solution!Thanks a million :)


Thank you Michael and Sean for your time :)
--
Posted via http://www.ruby-....

Robert Klemme

4/17/2009 8:01:00 AM

0

2009/4/17 Sean O'Halpin <sean.ohalpin@gmail.com>:

> Minor nitpick - these lines should be reversed:
>
>> =A0label =3D Thread.current.to_s.freeze
>> =A0Thread.new queue do |q|
>
> i.e.
>
>> =A0Thread.new queue do |q|
>> =A0 =A0label =3D Thread.current.to_s.freeze

Oh yes, absolutely! Apparently I moved the line too high. Thanks for
catching that gotcha, Sean!

Kind regards

robert

--=20
remember.guy do |as, often| as.you_can - without end
http://blog.rubybestprac...