
comp.lang.ruby

thread.rb

Adam Bender

3/25/2008 3:33:00 AM

I want to modify the Queue class in thread.rb, so I first searched for
the file. I found it in /usr/lib/ruby/1.8/thread.rb, both on a Mac
and a Linux machine. On the Mac, the file was as expected, but on the
Linux machine, it contained only this:
-----------------------
unless defined? Thread
fail "Thread not available for this ruby interpreter"
end

require 'thread.so'
-----------------------
Does this mean the Ruby code for Queue is compiled into a shared
library? Is it possible to edit the code? (I know I can modify the
class from outside the file, and that is probably what I will end up
doing, but I'm rather surprised by the above file contents and would
like to know more about it).

Thanks,

Adam

P.S. On a related note, I want to add a method to the Queue class that
is similar to pop, but takes a timeout value and returns nil if no
item is pushed before timeout seconds have elapsed. Can anyone
suggest something better than the following:

class Queue
  def pop_with_timeout(timeout)
    elapsed = 0.0
    while (Thread.critical = true; @que.empty?)
      @waiting.push Thread.current
      Thread.critical = false
      elapsed += sleep(timeout - elapsed)
      puts elapsed
      return nil if elapsed >= timeout
    end
    @que.shift
  ensure
    Thread.critical = false
  end
end

That was the first thing I came up with, and I'm (again) surprised
that it works - why does sleep ever return less than 1? What
interrupts it?
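
On the sleep question: Kernel#sleep returns the time actually slept as a whole number of seconds, and a sleeping thread can be woken early when another thread calls Thread#wakeup or Thread#run on it. Presumably that is what happens to the threads parked in @waiting when something is pushed, though that reading of thread.rb is an assumption. A minimal sketch of the effect, independent of Queue:

```ruby
# A thread that asks to sleep for 10 seconds.
sleeper = Thread.new do
  # The return value of sleep becomes the thread's value.
  sleep(10)
end

# Wait until the thread has actually gone to sleep.
sleep(0.01) until sleeper.status == "sleep"

sleeper.wakeup          # interrupt the sleep from another thread
slept = sleeper.value   # joins the thread and fetches sleep's return value

puts "sleep(10) returned #{slept.inspect}"
```

Because the return value is an integer, a sleep that is interrupted almost immediately reports a value below 1, which would also explain why adding it to elapsed undercounts the time spent waiting.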

11 Answers

Robert Klemme

3/25/2008 11:58:00 AM


2008/3/25, Adam Bender <abender@gmail.com>:
> I want to modify the Queue class in thread.rb, so I first searched for

Why?

> P.S. On a related note, I want to add a method to the Queue class that
> is similar to pop, but will take a timeout value and return nil if it
> no item is push'ed before timeout seconds have elapsed. Can anyone
> suggest something better than the following:
>
> class Queue
> def pop_with_timeout(timeout)
> elapsed = 0.0
> while (Thread.critical = true; @que.empty?)
> @waiting.push Thread.current
> Thread.critical = false
> elapsed += sleep(timeout - elapsed)
> puts elapsed
> return nil if elapsed >= timeout
> end
> @que.shift
> ensure
> Thread.critical = false
> end
> end
>
> That was the first thing I came up with, and I'm (again) surprised
> that it works - why does sleep ever return less than 1? What
> interrupts it?

I opt against using Thread.critical because this is a global exclusive
lock on all threads and it is unlikely to continue to exist in the
light of JRuby and others. I'd rather use Mutex#try_lock or Monitor's
#wait with timeout parameter.

Kind regards

robert

--
use.inject do |as, often| as.you_can - without end

Adam Bender

3/25/2008 5:05:00 PM


On Tue, Mar 25, 2008 at 7:57 AM, Robert Klemme
<shortcutter@googlemail.com> wrote:
> 2008/3/25, Adam Bender <abender@gmail.com>:
>
> > I want to modify the Queue class in thread.rb, so I first searched for
>
> Why?
>
Why what? I want to modify the class as described below; I searched
for the file so I could see how pop was implemented and the instance
variables it used.


> > P.S. On a related note, I want to add a method to the Queue class that
> > is similar to pop, but will take a timeout value and return nil if it
> > no item is push'ed before timeout seconds have elapsed. Can anyone
> > suggest something better than the following:
> >
> > class Queue
> > def pop_with_timeout(timeout)
> > elapsed = 0.0
> > while (Thread.critical = true; @que.empty?)
> > @waiting.push Thread.current
> > Thread.critical = false
> > elapsed += sleep(timeout - elapsed)
> > puts elapsed
> > return nil if elapsed >= timeout
> > end
> > @que.shift
> > ensure
> > Thread.critical = false
> > end
> > end
> >
> > That was the first thing I came up with, and I'm (again) surprised
> > that it works - why does sleep ever return less than 1? What
> > interrupts it?
>
> I opt against using Thread.critical because this is a global exclusive
> lock on all threads and it is unlikely to continue to exist in the
> light of JRuby and others. I'd rather use Mutex#try_lock or Monitor's
> #wait with timeout parameter.

The Queue class uses Thread.critical. Though I guess writing a queue
using a Monitor would be easy enough to do.

Adam

MenTaLguY

3/25/2008 6:01:00 PM


On Tue, 25 Mar 2008 12:32:38 +0900, "Adam Bender" <abender@gmail.com> wrote:
> require 'thread.so'
> -----------------------
> Does this mean the Ruby code for Queue is compiled into a shared
> library? Is it possible to edit the code? (I know I can modify the
> class from outside the file, and that is probably what I will end up
> doing, but I'm rather surprised by the above file contents and would
> like to know more about it).

Yes, it's implemented in C. In JRuby, it's implemented in Java.
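
One way to check where a method comes from, as a rough sketch: Method#source_location (available from Ruby 1.9 on, so not in the 1.8 discussed here) returns a file and line for methods defined in Ruby and nil for methods implemented natively. What it reports for Queue#pop depends on the interpreter and version, so the code handles both cases:

```ruby
require 'thread'  # needed on old interpreters; harmless on newer ones

location = Queue.instance_method(:pop).source_location

if location
  # Defined in Ruby source: location is [file, line].
  puts "Queue#pop is defined at #{location.join(':')}"
else
  # No Ruby source: most likely implemented natively (C, Java, ...).
  puts "Queue#pop has no Ruby source location"
end
```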

I do agree that the following thread.rb operations need to directly
support timeouts, though:

ConditionVariable#wait
Queue#pop
SizedQueue#pop
SizedQueue#push

Unfortunately, there is no _reliable_ way to patch timeouts in after
the fact. The support needs to be added to Ruby core.
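
For context, the usual after-the-fact approach is to wrap the blocking call in Timeout.timeout from the standard library. The sketch below shows it only to make the concern concrete; pop_with_stdlib_timeout is a hypothetical helper name. Timeout works by raising an exception asynchronously in the waiting thread, and that is presumably the kind of patching considered unreliable here, since the exception can arrive at an arbitrary point in the blocked code.

```ruby
require 'thread'
require 'timeout'

queue = Queue.new

# Hypothetical helper, for illustration only: returns nil on timeout.
def pop_with_stdlib_timeout(queue, seconds)
  Timeout.timeout(seconds) { queue.pop }
rescue Timeout::Error
  nil
end

# Nothing is ever pushed, so this gives up after about 0.1 seconds.
result = pop_with_stdlib_timeout(queue, 0.1)
puts result.inspect
```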

In the interim, I think the best you can do is to implement your own
queue data structure which supports timeouts using Mutex,
ConditionVariable and the 'scheduler' gem. For instance:

require 'thread'
require 'scheduler'

class MyQueue
  def initialize
    @lock = Mutex.new
    @values = []
    @readers = []
  end

  class Timeout < Interrupt
  end

  # One Reader represents one thread blocked in MyQueue#pop.
  class Reader
    def initialize
      @condition = ConditionVariable.new
      @state = :waiting
      @value = nil
    end

    def timeout
      @state = :timeout
      @condition.signal
      self
    end

    def value=(value)
      @value = value
      @state = :ready
      @condition.signal
      self
    end

    def wait_for_value(lock)
      @condition.wait(lock) while @state == :waiting
      raise Timeout, "timed out" unless @state == :ready
      @value
    end
  end

  def push(value)
    @lock.synchronize do
      if @readers.empty?
        @values.push value
      else
        # hand the value to the longest-waiting reader
        @readers.shift.value = value
      end
    end
    self
  end

  def pop(timeout=nil)
    @lock.synchronize do
      unless @values.empty?
        # shift rather than pop, so values come out in FIFO order
        @values.shift
      else
        reader = Reader.new
        @readers.push reader
        begin
          if timeout
            timeout = Scheduler.after_delay!(timeout) do
              @lock.synchronize do
                reader.timeout if @readers.delete reader
              end
            end
          end
          reader.wait_for_value(@lock)
        ensure
          @readers.delete reader
          timeout.cancel if timeout
        end
      end
    end
  end
end

(This is untested, but should basically do what you need and you're
welcome to use it. Feel free to ask any questions you might have.)

-mental


MenTaLguY

3/25/2008 6:12:00 PM


On Tue, 25 Mar 2008 20:57:48 +0900, "Robert Klemme" <shortcutter@googlemail.com> wrote:
> I opt against using Thread.critical because this is a global exclusive
> lock on all threads and it is unlikely to continue to exist in the
> light of JRuby and others. I'd rather use Mutex#try_lock or Monitor's
> #wait with timeout parameter.

Agreed, anything using Thread.critical is not very future-proof.

Note, however, that while Monitor is okay under JRuby, in MRI (1.9 at least)
it uses Timeout in an uncontrolled way and is not entirely reliable as a
result. To implement Monitor robustly in JRuby we ended up adding timeout
support to JRuby's ConditionVariable#wait.

*** I would strongly recommend _against_ using any concurrency primitive in
1.9 except for the primitives from thread.rb/thread.so. ***

Given that, the most directly portable way to get a robust queue that
supports timeouts is to roll your own using Mutex, ConditionVariable, and
some safe source of timed events like Scheduler.

-mental


MenTaLguY

3/25/2008 6:32:00 PM


On Wed, 26 Mar 2008 03:11:55 +0900, MenTaLguY <mental@rydia.net> wrote:
> Note, however, that while Monitor is okay under JRuby, in MRI (1.9 at
> least) it uses Timeout in an uncontrolled way and is not entirely reliable
> as a result.

I am wrong. Timeouts are currently unimplemented in 1.9's Monitor.

> *** I would strongly recommend _against_ using any concurrency primitive
> in 1.9 except for the primitives from thread.rb/thread.so. ***

Evidently more things have changed since 1.8 than I had thought, so
this may or may not still hold.

-mental


Adam Bender

3/25/2008 10:32:00 PM


On Tue, Mar 25, 2008 at 2:01 PM, MenTaLguY <mental@rydia.net> wrote:
> I do agree that the following thread.rb operations need to directly
> support timeouts, though:
>
> ConditionVariable#wait

ConditionVariable#wait appears to support timeouts
(http://www.ruby-doc.org/stdlib/libdoc/monitor/rdoc/classes/MonitorMixin/ConditionVariable.ht...).
Do they not work as expected?

I couldn't find the Scheduler gem, so I'm trying to hack something
together based on your code example, but using Monitors. It depends on
ConditionVariable#wait, so it could be totally broken. I didn't
understand the point of the Reader class in your code, since only one
thread at a time can acquire the lock in pop. This problem is closely
related to the example on page 146 of Pickaxe 2nd ed.

require 'monitor'

class MyQueue

  def initialize
    @values = []
    @values.extend(MonitorMixin)
    @cond = @values.new_cond
  end

  def push(value)
    @values.synchronize do
      @values.push value
      @cond.signal
    end
    self
  end

  def pop(timeout=nil)
    ret = nil
    @values.synchronize do
      t = Time.now
      @cond.wait(timeout)
      puts "waited #{Time.now - t}"
      ret = @values.shift unless @values.empty?
    end
    return ret
  end
end

q = MyQueue.new

consumers = (1..3).map do |i|
  Thread.new("consumer #{i}") do |name|
    begin
      obj = q.pop(5)
      puts "#{name} consumed #{obj.inspect}"
      sleep(rand(0.05))
    end until obj == :END_OF_WORK
  end
end

producers = (1..3).map do |i|
  Thread.new("producer #{i}") do |name|
    3.times do |j|
      sleep(1)
      q.push("Item #{j} from #{name}")
    end
  end
end

producers.each { |th| th.join }
consumers.size.times { q.push(:END_OF_WORK) }
consumers.each { |th| th.join }

The problem is that the wait() waits for timeout seconds, even when
something is in the queue.

Thanks for your help,

Adam

Adam Bender

3/25/2008 10:38:00 PM


On Tue, Mar 25, 2008 at 6:31 PM, Adam Bender <abender@gmail.com> wrote:
> On Tue, Mar 25, 2008 at 2:01 PM, MenTaLguY <mental@rydia.net> wrote:
> > I do agree that the following thread.rb operations need to directly
> > support timeouts, though:
> >
> > ConditionVariable#wait
>
> ConditionVariable#wait appears to support timeouts
> (http://www.ruby-doc.org/stdlib/libdoc/monitor/rdoc/classes/MonitorMixin/ConditionVariable.ht...).
> Do they not work as expected?

My apologies, I didn't read your previous replies thoroughly enough.
Anyways, I'm not worried about 1.9 portability just yet, I just need
something that works in 1.8.6.

Thanks again,

Adam

MenTaLguY

3/26/2008 12:06:00 AM


On Wed, 26 Mar 2008 07:32:27 +0900, "Adam Bender" <abender@gmail.com> wrote:
> I couldn't find the Scheduler gem

gem install scheduler

> I didn't understand the point of the Reader class in your code, since
> only one thread at a time can acquire the lock in pop.

The lock is released (by ConditionVariable#wait) while a reader is
waiting (and re-acquired before ConditionVariable#wait returns).
Otherwise writers couldn't get in to write.
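
A small sketch of that behaviour, using only Mutex and ConditionVariable (the variable names are made up for illustration): while one thread is parked in ConditionVariable#wait, another thread can take the same lock, which would be impossible if the waiter still held it.

```ruby
require 'thread'

lock  = Mutex.new
cond  = ConditionVariable.new
ready = false

waiter = Thread.new do
  lock.synchronize do
    # wait releases the lock while sleeping and re-acquires it on wakeup
    cond.wait(lock) until ready
    :woken
  end
end

# Wait until the waiter is parked inside cond.wait.
sleep(0.01) until waiter.status == "sleep"

# The waiter is still inside synchronize, yet the lock is free to take.
acquired = lock.try_lock
ready = true
cond.signal
lock.unlock

result = waiter.value
puts "acquired=#{acquired} result=#{result.inspect}"
```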

Conceptually, a Queue is actually two queues: a queue of "pushes" which
holds pushed values, and a queue of "pops" which holds threads waiting
for new values to be pushed.

If there have been more pushes than pops, the "pushes" queue will have
entries in it, and if there have been more pops than pushes, the "pops"
queue will have entries in it. Each push tries to take an entry from
the "pops" queue and vice-versa, so that when there have been an equal
number of pushes and pops, both queues will be empty.

Readers are just used to represent entries in the "pops" queue.

-mental


MenTaLguY

3/26/2008 12:10:00 AM


On Wed, 26 Mar 2008 07:32:27 +0900, "Adam Bender" <abender@gmail.com> wrote:
> def push(value)
> @values.synchronize do
> @values.push value
> @cond.signal
> end
> self
> end
>
> def pop(timeout=nil)
> ret = nil
> @values.synchronize do
> t = Time.now
> @cond.wait(timeout)
> puts "waited #{Time.now - t}"
> ret = @values.shift unless @values.empty?
> end
> return ret
> end

> The problem is that the wait() waits for timeout seconds, even when
> something is in the queue.

In this case, the problem is that you are always putting the thread to
sleep on the "pops" queue, even when there is already a pushed value
available on the "pushes" queue.

You need to check whether the "pushes" queue is empty and only sleep
when it is:

def pop
  @values.synchronize do
    while @values.empty?
      @cond.wait
    end
    @values.shift
  end
end

Note that I've used a while loop rather than a simple if test; this is
for several reasons, but mainly because it is possible for another
thread to "steal" a value from the "pushes" queue before we finish
waking up. Because of that, we have to check the predicate again after
@cond.wait returns, and potentially go back to sleep.

Implementing this in conjunction with timeout is harder, but possible:

def pop(timeout = nil)
  if timeout
    deadline = Time.now + timeout
  else
    deadline = nil
  end
  @values.synchronize do
    while @values.empty?
      if deadline
        timeout = deadline - Time.now
        return nil if timeout <= 0
      end
      @cond.wait(timeout)
    end
    @values.shift
  end
end

Since this uses the system clock rather than a monotonic timer, it does
still have a bug inasmuch as changing the system time can cause #pop to
wait for too long or too little, but Ruby 1.8 and 1.9 have that same bug
in their thread schedulers anyway.
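
On interpreters that provide Process.clock_gettime (Ruby 2.1 and later, so not the 1.8 or 1.9 discussed here), the deadline can be taken from a monotonic clock instead. A sketch of the same loop under that assumption, using Mutex and ConditionVariable directly; TimedQueue and monotonic_now are hypothetical names chosen for the example:

```ruby
require 'thread'

# Hypothetical class name, for illustration only.
class TimedQueue
  def initialize
    @lock = Mutex.new
    @cond = ConditionVariable.new
    @values = []
  end

  def push(value)
    @lock.synchronize do
      @values.push(value)
      @cond.signal
    end
    self
  end

  # Returns the next value, or nil if none arrives within timeout seconds.
  def pop(timeout = nil)
    deadline = timeout && monotonic_now + timeout
    @lock.synchronize do
      while @values.empty?
        remaining = nil
        if deadline
          remaining = deadline - monotonic_now
          return nil if remaining <= 0
        end
        # Re-check the predicate after every wakeup, as described above.
        @cond.wait(@lock, remaining)
      end
      @values.shift
    end
  end

  private

  # Seconds from a clock that is unaffected by system time changes.
  def monotonic_now
    Process.clock_gettime(Process::CLOCK_MONOTONIC)
  end
end

q = TimedQueue.new
empty_result = q.pop(0.05)   # nothing pushed yet, so this times out

producer = Thread.new { sleep(0.05); q.push(:item) }
item = q.pop(2)              # returns as soon as the producer pushes
producer.join

puts empty_result.inspect
puts item.inspect
```

As with the version above, a nil return cannot be told apart from a pushed nil, so callers that need to queue nil would want a different timeout signal.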

-mental


MenTaLguY

3/26/2008 12:13:00 AM


On Wed, 26 Mar 2008 07:37:32 +0900, "Adam Bender" <abender@gmail.com> wrote:
> My apologies, I didn't read your previous replies thoroughly enough.
> Anyways, I'm not worried about 1.9 portability just yet, I just need
> something that works in 1.8.6.

Monitor should work then, although the implementation of timeouts is
a little flaky in 1.8. It should work reliably in JRuby though.

-mental