
comp.lang.ruby

Thread-safe priority queue?

Sean O'Halpin

7/9/2008 1:09:00 PM

Hi,

Does anyone know of a solid, thread-safe priority queue implementation in Ruby?

The only one I can find is Joel VanderWerf's
(http://groups.google.com/group/comp.lang.ruby/browse_thread/thread/9d9db9...)
which doesn't work with more recent versions of Ruby (because the Queue
implementation changed from Ruby to C).

Cheers,
Sean

12 Answers

Robert Dober

7/9/2008 5:34:00 PM

Sean, I fail to understand why that change should have any
impact on Joël's work. Can you elaborate, please?

Cheers
Robert

--
http://ruby-smalltalk.blo...

---
AALST (n.) One who changes his name to be further to the front
D.Adams; The Meaning of LIFF

Trans

7/9/2008 5:50:00 PM


On Jul 9, 9:08 am, "Sean O'Halpin" <sean.ohal...@gmail.com> wrote:
> Hi,
>
> Does anyone know of a solid, thread-safe priority queue implementation in Ruby?
>
> The only one I can find is Joel Vanderwerf's
> (http://groups.google.com/group/comp.lang.ruby/browse_thread/thr...)
> which doesn't work with more recent versions of ruby (because Queue
> implementation changed from Ruby to C).

Hmm... I'm not sure if the Facets implementation is thread-safe. It may
be worth a look. If I recall correctly, Olivier Renaud was the last to
work on it, so he may know more. If it isn't thread-safe, btw, making it
so would be a nice patch.

T.

Sean O'Halpin

7/9/2008 7:09:00 PM

On Wed, Jul 9, 2008 at 6:34 PM, Robert Dober <robert.dober@gmail.com> wrote:
> Sean I seem to fail to understand why that change should have any
> impact on Joël's work, can you elaborate please?
>
> Cheers
> Robert
>

I didn't explain that very well, did I? Joel's version inherits from
Queue and directly references an instance variable (@waiting) which
isn't there in the C version.
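
To make the breakage concrete: when Queue is implemented natively, its state does not live in Ruby instance variables, so a subclass that reads @waiting finds nothing there. The public Queue#num_waiting method reports the number of blocked consumers instead. A minimal check, assuming a C-implemented Queue (other Ruby implementations may differ); the variable names are only illustrative:

```ruby
require 'thread'

q = Queue.new

# A natively implemented Queue keeps its state outside Ruby instance
# variables, so code that reaches for @waiting finds nothing there.
has_waiting_ivar = q.instance_variable_defined?(:@waiting)

# The supported way to ask how many threads are blocked in Queue#pop.
idle_waiters = q.num_waiting

consumer = Thread.new { q.pop }
sleep 0.01 until q.num_waiting == 1   # wait until the consumer is blocked
blocked_waiters = q.num_waiting

q << :item                            # wakes the blocked consumer
received = consumer.value
```

A subclass that only needs the waiter count can call num_waiting rather than touching internals, which is the route Joel's first rewrite further down takes.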

Sean O'Halpin

7/9/2008 9:25:00 PM

On Wed, Jul 9, 2008 at 6:50 PM, Trans <transfire@gmail.com> wrote:
>
> Hmm... I'm not sure if Facets implementation is thread safe. It may
> be worth a look. If I recall correctly, Olivier Renaud was the last to
> work on it, so he may know more. If it isn't thread safe, btw, it
> would make a nice patch.

Thanks for the pointer but the Facets version isn't thread-safe.
Still searching... :)

Roger Pack

7/9/2008 11:38:00 PM

Sean O'Halpin wrote:
> On Wed, Jul 9, 2008 at 6:50 PM, Trans <transfire@gmail.com> wrote:
>>
>> Hmm... I'm not sure if Facets implementation is thread safe. It may
>> be worth a look. If I recall correctly, Olivier Renaud was the last to
>> work on it, so he may know more. If it isn't thread safe, btw, it
>> would make a nice patch.
>
> Thanks for the pointer but the Facets version isn't thread-safe.
> Still searching... :)

You could throw a mutex around the Facets version.
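
A rough sketch of what that wrapping might look like. NaivePQ below is a hypothetical stand-in for any non-thread-safe priority queue (it is not the Facets API), and LockedPQ is likewise an illustrative name; the point is only that every call goes through one Mutex:

```ruby
require 'thread'

# Hypothetical stand-in for a non-thread-safe priority queue.
# This is NOT the Facets API, just enough to show the wrapper.
class NaivePQ
  def initialize
    @items = []
  end

  def push(obj, pri)
    @items << [pri, obj]
    self
  end

  # Removes and returns the object with the highest priority, or nil if empty.
  def pop
    return nil if @items.empty?
    idx = @items.each_index.max_by { |i| @items[i][0] }
    @items.delete_at(idx)[1]
  end

  def size
    @items.size
  end
end

# Wrapper that serializes every call to the inner queue through one Mutex.
class LockedPQ
  def initialize(inner)
    @inner = inner
    @mutex = Mutex.new
  end

  def push(obj, pri)
    @mutex.synchronize { @inner.push(obj, pri) }
    self
  end

  def pop
    @mutex.synchronize { @inner.pop }
  end

  def size
    @mutex.synchronize { @inner.size }
  end
end

pq = LockedPQ.new(NaivePQ.new)
threads = 4.times.map do |t|
  Thread.new { 250.times { |i| pq.push([t, i], i % 10) } }
end
threads.each(&:join)
```

This buys mutual exclusion only. pop returns nil on an empty queue instead of blocking the caller, so a consumer would have to poll; getting Queue-like blocking behaviour needs something more, such as a condition variable.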
--
Posted via http://www.ruby-....

Joel VanderWerf

7/10/2008 6:17:00 AM

Sean O'Halpin wrote:
> Hi,
>
> Does anyone know of a solid, thread-safe priority queue implementation in Ruby?
>
> The only one I can find is Joel Vanderwerf's
> (http://groups.google.com/group/comp.lang.ruby/browse_thread/thread/9d9db9...)
> which doesn't work with more recent versions of ruby (because Queue
> implementation changed from Ruby to C).

It's pretty easy to work around, I think. Try the following code. It's
based on something I'm using in live code and it seems to pass the test
referenced in the above link.

Btw, it's great that RBTree is a gem now. Thanks to whoever did that.



require 'thread'
require 'rbtree'

class PriorityQueue
  def size
    @tree.size
  end

  def initialize(*)
    super
    @tree = MultiRBTree.new
    @que = Queue.new
    @mutex = Mutex.new
  end

  # Push +obj+ with priority equal to +pri+ if given or, otherwise,
  # the result of sending #queue_priority to +obj+. Objects are
  # dequeued in priority order, and first-in-first-out among objects
  # with equal priorities.
  def push(obj, pri = obj.queue_priority)
    @mutex.synchronize do
      if @que.num_waiting > 0
        @que << obj
      else
        @tree.store(pri, obj)
      end
    end
  end

  def pop(non_block=false)
    @mutex.synchronize do
      if (last=@tree.last)
        return @tree.delete(last[0]) # highest key, oldest first
      end

      if non_block
        raise ThreadError, "priority queue empty"
      end
    end
    @que.pop # wait
  end
end


--
vjoel : Joel VanderWerf : path berkeley edu : 510 665 3407

Martin DeMello

7/10/2008 6:26:00 AM

On Wed, Jul 9, 2008 at 11:17 PM, Joel VanderWerf
<vjoel@path.berkeley.edu> wrote:
>
> Btw, it's great that RBTree is a gem now. Thanks to whoever did that.

Ooh, seconded! Stand up and be thanked :)

martin

Joel VanderWerf

7/10/2008 6:42:00 AM


Looks like a race condition in that...

Joel VanderWerf wrote:
> require 'thread'
> require 'rbtree'
>
> class PriorityQueue
>   def size
>     @tree.size
>   end
>
>   def initialize(*)
>     super
>     @tree = MultiRBTree.new
>     @que = Queue.new
>     @mutex = Mutex.new
>   end
>
>   # Push +obj+ with priority equal to +pri+ if given or, otherwise,
>   # the result of sending #queue_priority to +obj+. Objects are
>   # dequeued in priority order, and first-in-first-out among objects
>   # with equal priorities.
>   def push(obj, pri = obj.queue_priority)
>     @mutex.synchronize do
>       if @que.num_waiting > 0
>         @que << obj
>       else
>         @tree.store(pri, obj)
>       end
>     end
>   end
>
>   def pop(non_block=false)
>     @mutex.synchronize do
>       if (last=@tree.last)
>         return @tree.delete(last[0]) # highest key, oldest first
>       end
>
>       if non_block
>         raise ThreadError, "priority queue empty"
>       end
>     end

### Race happens here: if someone else calls #push, then
### this thread will wait even though data is available.

>     @que.pop # wait
>   end
> end

Will try to fix....

--
vjoel : Joel VanderWerf : path berkeley edu : 510 665 3407

Joel VanderWerf

7/10/2008 7:09:00 AM

Joel VanderWerf wrote:
>
> Looks like a race condition in that...

Proposed fix, using a condition var... still needs some eyeballing and
some tests:

require 'thread'
require 'rbtree'

class PriorityQueue
  def size
    @tree.size
  end

  def initialize(*)
    super
    @tree = MultiRBTree.new
    @que = [] # should never have more than one entry
    @num_waiting = 0
    @mutex = Mutex.new
    @cond = ConditionVariable.new
  end

  # Push +obj+ with priority equal to +pri+ if given or, otherwise,
  # the result of sending #queue_priority to +obj+. Objects are
  # dequeued in priority order, and first-in-first-out among objects
  # with equal priorities.
  def push(obj, pri = obj.queue_priority)
    @mutex.synchronize do
      if @num_waiting > 0
        @que << obj
        @cond.signal
      else
        @tree.store(pri, obj)
      end
    end
  end

  def pop(non_block=false)
    @mutex.synchronize do
      if (last=@tree.last)
        return @tree.delete(last[0]) # highest key, oldest first
      end

      if non_block
        raise ThreadError, "priority queue empty"
      end

      @num_waiting += 1
      @cond.wait(@mutex)
      @num_waiting -= 1
      @que.pop
    end
  end
end



--
vjoel : Joel VanderWerf : path berkeley edu : 510 665 3407

Joel VanderWerf

7/10/2008 7:03:00 PM

Joel VanderWerf wrote:
> Joel VanderWerf wrote:
>>
>> Looks like a race condition in that...
>
> Proposed fix, using a condition var... still needs some eyeballing and
> some tests:

That was not quite right either (because cond.signal only wakes the
waiter, and doesn't schedule it). The following seems to complete
without deadlocks or starvation.

require 'thread'
require 'rbtree'

class PriorityQueue
  def size
    @tree.size
  end

  def initialize(*)
    super
    @tree = MultiRBTree.new
    @mutex = Mutex.new
    @cond = ConditionVariable.new
  end

  # Push +obj+ with priority equal to +pri+ if given or, otherwise,
  # the result of sending #queue_priority to +obj+. Objects are
  # dequeued in priority order, and first-in-first-out among objects
  # with equal priorities.
  def push(obj, pri = obj.queue_priority)
    @mutex.synchronize do
      @tree.store(pri, obj)
      @cond.signal
    end
  end

  def pop(non_block=false)
    @mutex.synchronize do
      if (last=@tree.last)
        return @tree.delete(last[0]) # highest key, oldest first
      end

      if non_block
        raise ThreadError, "priority queue empty"
      end

      loop do
        @cond.wait(@mutex)
        if (last=@tree.last)
          return @tree.delete(last[0])
        end
      end
    end
  end
end


if __FILE__ == $0

  Thread.abort_on_exception = true

  pq = PriorityQueue.new

  n_items_per_thread = 1000
  n_writers = 10
  n_readers = 10

  writers = (0...n_writers).map do |i_thr|
    Thread.new do
      n_items_per_thread.times do |i|
        pri = rand(10)
        pq.push([pri, i, i_thr], pri)
        Thread.pass if rand(5) == 0
      end
    end
  end

  sleep 0.1 until pq.size > 100 # a little head start populating the tree

  results = Array.new(n_readers, 0)

  readers = (0...n_readers).map do |i|
    Thread.new do
      loop do
        pq.pop
        results[i] += 1
      end
    end
  end

  writers.each do |wr|
    wr.join
  end

  p results
  until pq.size == 0
    sleep 0.1
    p results
  end

  raise unless results.inject {|s,x|s+x} == n_items_per_thread * n_writers

end
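
For anyone who wants to try the wait-in-a-loop pattern without installing rbtree, here is a stdlib-only sketch of the same idea. It is not the code above: it swaps MultiRBTree for a sorted Array, and the class name ArrayPriorityQueue and the sequence counter used for first-in-first-out tie-breaking are assumptions made for illustration. It relies on Array#bsearch_index, so it needs Ruby 2.3 or later.

```ruby
require 'thread'

# Illustrative only: a sorted Array stands in for MultiRBTree.
class ArrayPriorityQueue
  def initialize
    @items = []   # [[pri, -seq], obj] pairs, sorted ascending by key
    @seq = 0      # insertion counter, used to break priority ties
    @mutex = Mutex.new
    @cond = ConditionVariable.new
  end

  def size
    @mutex.synchronize { @items.size }
  end

  def push(obj, pri)
    @mutex.synchronize do
      @seq += 1
      key = [pri, -@seq]  # older items sort later among equal priorities
      idx = @items.bsearch_index { |k, _| (k <=> key) >= 0 } || @items.size
      @items.insert(idx, [key, obj])
      @cond.signal
    end
    self
  end

  def pop(non_block = false)
    @mutex.synchronize do
      # Re-check after every wakeup: another thread may have taken the
      # item between the signal and this thread reacquiring the mutex.
      while @items.empty?
        raise ThreadError, "priority queue empty" if non_block
        @cond.wait(@mutex)
      end
      @items.pop.last  # highest priority, oldest first
    end
  end
end

pq = ArrayPriorityQueue.new
pq.push("low", 1).push("a", 5).push("b", 5).push("mid", 3)
order = 4.times.map { pq.pop }
```

Insertion into a sorted Array is O(n), so this is a teaching aid rather than a replacement for the tree-backed version. The part worth copying is the shape of pop: the emptiness test sits in a loop around the wait, so a wakeup never assumes an item is still there.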


--
vjoel : Joel VanderWerf : path berkeley edu : 510 665 3407