comp.lang.ruby

Queue, with timeout. (Thread.notify...

Tim Becker

5/12/2007 1:39:00 PM

Hi,

I'm trying to build a message queue that can receive messages, blocks
until messages arrive, and can optionally block for only a limited
amount of time. Unfortunately, the built-in Queue doesn't support
timeout behaviour, so I went about implementing a queue myself. I
missed Java's Object.wait method, i.e. using a plain object to notify
the blocking thread to resume.

I ended up using `thread.join(timeout)` which blocks the current
thread until the thread that `join` was called on finishes (or until
timeout is reached). My queue doesn't need to join any particular
thread, though, it just needs to be notified when a new message is
added to the queue. So I needed to create a `dummy` thread solely to
provide notification. What I came up with is this:

require 'thread'

class MessageQueue
  def initialize
    @lock = Mutex.new
    @t = Thread.new{Thread.stop} # this thread exists solely for notification
    @queue = []
  end

  def enqueue msg
    @lock.synchronize {
      @queue.push msg
      t=@t
      @t=Thread.new{Thread.stop}
      t.kill # notify waiting thread
    }
  end

  def dequeue timeout=nil
    msg = nil
    stop = Time.now+timeout if timeout
    while msg == nil
      @lock.synchronize {msg = @queue.shift}
      break if timeout && Time.new >= stop
      @t.join(timeout) unless msg # wait for new message
    end #while
    msg
  end

end
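
The `join(timeout)` behaviour this code relies on can be checked in isolation. The following is only a minimal sketch (the variable names are made up for illustration): `Thread#join` with a limit returns nil if the limit expires first, and returns the thread itself once the thread has finished, which includes being killed.

```ruby
# A thread that would otherwise outlive the wait below.
sleeper = Thread.new { sleep 5 }

# The limit expires before the thread finishes, so join returns nil.
timed_out = sleeper.join(0.1)

# Killing the thread is what "notifies" anyone joined on it.
sleeper.kill

# The thread is now finished, so join returns the thread object.
finished = sleeper.join(1)

puts timed_out.inspect
puts finished.equal?(sleeper)
```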

While this works fine, it seems like a bit of a kludge because I have
to create (and misuse) threads solely to provide notification to the
blocking thread.

Any thoughts? Am I missing an easier or built-in way to block a thread
for only a limited amount of time?

Thanks,
-tim

3 Answers

Avdi Grimm

5/12/2007 7:18:00 PM

On 5/12/07, Tim Becker <a2800276@gmail.com> wrote:
> I'm trying to build a message queue that can receive messages, blocks
> until messages arrive, and can optionally block for only a limited
> amount of time. Unfortunately, the built-in Queue doesn't support
> timeout behaviour, so I went about implementing a queue myself. I
> missed Java's Object.wait method, i.e. using a plain object to notify
> the blocking thread to resume.

Use 'timeout':

require 'timeout'
require 'thread'

q = Queue.new

Thread.new do
  q.push "thing"
  sleep 10
end

begin
  Timeout::timeout(1) do
    q.pop # returns "thing"
    q.pop # queue is now empty, so this blocks until the timeout fires
  end
rescue Timeout::Error
  puts "timed out!"
end


--
Avdi

MenTaLguY

5/12/2007 8:14:00 PM

On Sat, 2007-05-12 at 22:39 +0900, Tim Becker wrote:
> Any thoughts? Am I missing an easier or built-in way to block a thread
> for only a limited amount of time?

Unless/until the built-in classes support timeouts, using a thread to
track the timeout is sadly your best option.

However, there are a number of problems with the specific approach
you've used here (e.g. not all accesses to @t are protected by the
mutex)... here's an alternate implementation...

require 'thread'
begin
  require 'fastthread'
rescue LoadError
end

class MessageQueue
  def initialize
    @lock = Mutex.new
    @messages = []
    @readers = []
  end

  def enqueue msg
    @lock.synchronize do
      unless @readers.empty?
        @readers.pop << msg
      else
        @messages.push msg
      end
    end
  end

  def dequeue timeout=nil
    timeout_thread = nil
    begin
      reader = nil
      @lock.synchronize do
        unless @messages.empty?
          # fast path
          return @messages.shift
        else
          reader = Queue.new
          @readers.push reader
          if timeout
            timeout_thread = Thread.new do
              sleep timeout
              @lock.synchronize do
                @readers.delete reader
                reader << nil
              end
            end
          end
        end
      end
      # either timeout or writer will send to us
      reader.shift
    ensure
      # (try to) clean up timeout thread
      timeout_thread.run if timeout_thread
    end
  end
end

Queue may seem sort of heavyweight to use for "callbacks" this way, but
if you use fastthread they are pretty inexpensive, and queues take care
of a _lot_ of the bookkeeping you would need to do to make this safe
otherwise.

One remaining issue is that it's possible for the timeout thread to
sleep _after_ the call to timeout_thread.run, so the cleanup isn't
necessarily that effective. However, Thread#kill (or Thread#raise)
isn't an option, since those can leave things in an inconsistent state.

A better solution would be to maintain a dedicated thread for tracking
timeouts which you don't have to worry about cleaning up -- I'll be
releasing a library to do that soon, but for now the above solution is
the best I can do.
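
For readers on later Ruby versions: `ConditionVariable#wait` there accepts an optional timeout, which is roughly the Java `Object.wait` analogue asked about at the top of the thread. The following is only a sketch under that assumption, not the library mentioned above; the class name `TimedQueue` and its internals are hypothetical.

```ruby
require 'thread'

# Hypothetical class; assumes ConditionVariable#wait(mutex, timeout) is available.
class TimedQueue
  def initialize
    @lock  = Mutex.new
    @cond  = ConditionVariable.new
    @items = []
  end

  def enqueue(msg)
    @lock.synchronize do
      @items.push msg
      @cond.signal # wake one waiting reader, if any
    end
  end

  # Returns the next message, or nil if none arrives within +timeout+ seconds.
  def dequeue(timeout = nil)
    now = lambda { Process.clock_gettime(Process::CLOCK_MONOTONIC) }
    deadline = timeout && now.call + timeout
    @lock.synchronize do
      # Loop because a wakeup does not guarantee the queue is non-empty.
      while @items.empty?
        if deadline
          remaining = deadline - now.call
          return nil if remaining <= 0
          @cond.wait(@lock, remaining)
        else
          @cond.wait(@lock)
        end
      end
      @items.shift
    end
  end
end

q = TimedQueue.new
Thread.new { sleep 0.1; q.enqueue "thing" }

first  = q.dequeue(1)   # the message arrives well within the timeout
second = q.dequeue(0.2) # nothing else is coming, so this gives up
```

Every access to the shared array happens under the mutex, and no helper thread is needed to track the timeout. As with the implementation above, a nil return cannot be told apart from an enqueued nil.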

-mental

MenTaLguY

5/12/2007 10:30:00 PM

On Sun, 2007-05-13 at 04:17 +0900, Avdi Grimm wrote:
> Use 'timeout':

I'd be cautious of using timeout, since not all of stdlib is safe with
respect to asynchronous exceptions (it's sometimes very difficult to do
properly). I think it's okay with the old thread.rb implementation of
Queue specifically, but I'm not 100% sure about the current C
implementation or other Ruby implementations like JRuby.
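
A minimal sketch of the concern, using a made-up `state` hash rather than any stdlib class: timeout raises its exception inside the block at whatever point the block has reached, so a multi-step update can be abandoned half way.

```ruby
require 'timeout'

# Hypothetical two-step update that is meant to be all-or-nothing.
state = { started: false, finished: false }

begin
  Timeout.timeout(0.1) do
    state[:started] = true
    sleep 1                 # stands in for work that gets interrupted
    state[:finished] = true # never reached
  end
rescue Timeout::Error
  # The exception was raised inside the block while it was sleeping.
end

p state
```

Here `started` ends up true and `finished` stays false. Code that updates internal data structures in several steps can be left in a similar half-updated state unless it was written with such interruptions in mind.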

-mental