comp.lang.ruby

Threaded IO trouble

Michal Suchanek

8/6/2008 12:09:00 PM

Hello

I do not understand the internals of Ruby IO but what I get here looks
really awful.

I am interfacing an application that processes lines of text and
buffers part of the output until EOF (or EOF tag). I thought using an
additional thread would be the right thing to do in this case: one
thread writes stuff, and as the application starts to output stuff
another thread wakes up to read it.

There are two problems here:

1) only reading inside the separate thread is possible, writing causes
stdin to be read into the program

2) reading inside the thread produces deadlocks

Attaching the files required to reproduce both failures.

=================== mock.rb ===================
buf = []
while l = gets
  buf << l
  puts buf.shift if buf.length > 3
end
=================== garbage ===================
ilks nvd
sdh v
df bhl
dj fbo
fj wr;oug
rbfg
=================== testt.rb ===================
def unidentified l
  false
end

def try_analyze *words
  return [] if words.length < 1
  analyzer = IO.popen( 'ruby -w mock.rb', IO::RDWR | IO::SYNC )
  res = []
  t = Thread.new( (IO::for_fd analyzer.fileno) ,res){|fd,ary|
    while l = fd.gets do
      break if l =~ %r|^</csts>$|
      if l =~ /^$/
        STDERR << "try_analyze: Empty line in output! \n"
        next
      else
        ary.push l
      end
    end
    fd.close rescue nil # hopefully prevents zombie hordes
  }
  words.each{|w|
    STDERR.puts "try_analyze: #{w}"
    analyzer.puts "<f>"+w
  }
  analyzer.puts "</csts>"
  analyzer.close_write
  t.join
  res.map{|w|
    if unidentified w then
      nil
    else if w !~ /^<f/
      STDERR << "try_analyze: JUNK: #{w} \n"
      nil
    else
      w
    end end
  }
end

1.upto(10000){|_|
  try_analyze *%w(a b c d e f g h i)
}
=================== testt2.rb ===================
def unidentified l
  false
end

def try_analyze *words
  return [] if words.length < 1
  analyzer = IO.popen 'ruby -w mock.rb'
  res = []

  t = Thread.new( (IO::for_fd analyzer.fileno) ,words){|fd,ary|
    fd.sync = true
    ary.each{|w|
      STDERR.puts "try_analyze: #{w}"
      fd.puts "<f>"+w
    }
    fd.puts "</csts>"
    analyzer.close_write
  }
  analyzer.sync = true
  while l = analyzer.gets do
    break if l =~ %r|^</csts>$|
    if l =~ /^$/
      STDERR << "try_analyze: Empty line in output! \n"
      next
    else
      res.push l
    end
  end
  fd.close rescue nil # hopefully prevents zombie hordes
  res.map{|w|
    if unidentified w then
      nil
    else if w !~ /^<f/
      STDERR << "try_analyze: JUNK: #{w} \n"
      nil
    else
      w
    end end
  }
end

1.upto(10){|_|
  try_analyze *%w(a b c d e f g h i)
}
======================================

Use:

$ cat garbage | ruby -w testt2.rb
testt2.rb:47: warning: `*' interpreted as argument prefix
try_analyze: a
try_analyze: JUNK: ilks nvd

try_analyze: JUNK: sdh v

try_analyze: JUNK: df bhl

try_analyze: a
try_analyze: a
try_analyze: a
try_analyze: a
try_analyze: a
try_analyze: a
try_analyze: a
try_analyze: a
try_analyze: a

$ cat garbage | ruby -w testt.rb

[lots of output snipped]

deadlock 0x53d4: sleep:F(4) - testt.rb:14
deadlock 0x31704: sleep:S (main) - testt.rb:27
testt.rb:27:in `write': Thread(0x31704): deadlock (fatal)
from testt.rb:27:in `puts'
from testt.rb:27:in `try_analyze'
from testt.rb:25:in `each'
from testt.rb:25:in `try_analyze'
from testt.rb:45
from testt.rb:44:in `upto'
from testt.rb:44


Thanks

Michal

12 Answers

Michal Suchanek

8/6/2008 1:03:00 PM

On 06/08/2008, Michal Suchanek <hramrach@centrum.cz> wrote:
> =================== mock.rb ===================
> buf=[]
> while l=gets
> buf << l
> puts buf.shift if buf.length >3
> end
Actually the mockup misses a line here:

while l = buf.shift do puts l end

However, the program reproduces the errors anyway.

Robert Klemme

8/7/2008 8:05:00 AM

2008/8/6 Michal Suchanek <hramrach@centrum.cz>:
> On 06/08/2008, Michal Suchanek <hramrach@centrum.cz> wrote:
>> =================== mock.rb ===================
>> buf=[]
>> while l=gets
>> buf << l
>> puts buf.shift if buf.length >3
>> end
> Actually the mockup misses a line here:
>
> while l = buf.shift do puts l end

And it misses another line at the beginning:

$defout.sync = true

:-)

With that change for me this works as expected:

10:04:42 Temp$ ./ttest.rb
1218096292.04: main : 0
1218096293.07: main : 1
1218096297.08: main : 2
1218096298.08: main : 3
1218096298.08: reader : 0
1218096303.08: main : 4
1218096303.08: reader : 1
1218096306.09: reader : 2
1218096306.09: reader : 3
1218096306.09: reader : 4
10:05:06 Temp$ cat ttest.rb
#!/bin/env ruby

def log(m)
  t = Thread.current
  printf "%10.2f: %-10s: %s\n",
    Time.now,
    t[:label] || t.inspect,
    m
end

Thread.current[:label] = "main"

IO.popen "./mock.rb", "r+" do |io|
  io.sync = true

  t = Thread.new do
    Thread.current[:label] = "reader"
    io.each do |line|
      log line.chomp
    end
  end

  5.times do |i|
    log i
    io.puts i
    sleep(1 + rand(5))
  end

  io.close_write
  t.join
end
10:05:08 Temp$
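
For reference, here is a sketch of mock.rb with both corrections from this thread applied: the final flush of the queue that Michal mentions and the unbuffered output that Robert suggests. Wrapping the script in a method is only for illustration, and the name `run_mock` is hypothetical. The sketch uses `$stdout`, for which `$defout` is an older name.

```ruby
# Sketch of mock.rb with both fixes from this thread applied.
# The method wrapper and its name are hypothetical; the original is a bare script.
def run_mock(input = $stdin, output = $stdout)
  output.sync = true            # Robert's fix: do not hold lines in Ruby's write buffer
  buf = []
  while (line = input.gets)
    buf << line
    # Emit the oldest line only once three newer lines are queued behind it.
    output.puts buf.shift if buf.length > 3
  end
  # Michal's fix: at EOF, print whatever is still queued.
  while (line = buf.shift)
    output.puts line
  end
end
```

Calling `run_mock` with no arguments behaves like the original script reading standard input. Note that the last three lines still appear only after EOF, which is the behaviour the real application is said to have.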

Kind regards

robert

--
use.inject do |as, often| as.you_can - without end

Michal Suchanek

8/7/2008 9:06:00 AM

On 07/08/2008, Robert Klemme <shortcutter@googlemail.com> wrote:
>
> And it misses another line at the beginning:
>
> $defout.sync = true
>
> :-)

Fixing the mockup is not the right way. I need the Ruby script to work
with the application that the mockup represents. Also, 5 repetitions
are not enough to test for the problem: it fails in only a few percent
of cases.

Thanks

Michal

Robert Klemme

8/7/2008 11:17:00 AM

2008/8/7 Michal Suchanek <hramrach@centrum.cz>:
> On 07/08/2008, Robert Klemme <shortcutter@googlemail.com> wrote:
>>
>> And it misses another line at the beginning:
>>
>> $defout.sync = true
>>
>> :-)
>
> Fixing the mockup is not the right way,

Maybe I am missing your point but the buffering in the mock client is
certainly delaying the complete process. Also, I did not only add
that line to mock.rb but provided a different client implementation.
Did you actually try it out?

> I need the ruby script working
> with the application which is represented by the mockup. Also doing 5
> repetitions is insufficient to test for the problem. It fails only in
> few % of cases.

Well, you can imagine that I did not want to post 1,000,000 lines
here. Do the errors you report show up with my code when increasing
the number of lines?

I usually start with a simple bit to verify the basic functionality is
OK and build from there. I do not understand why you go to the trouble
of duplicating the fd. Also, I find your processor quite complex, and
it seems that a simpler program would be sufficient to reproduce the
error you are seeing.

Btw, what does "writing causes stdin to be read into the program" exactly mean?

Cheers

robert

--
use.inject do |as, often| as.you_can - without end

Michal Suchanek

8/7/2008 1:16:00 PM

On 07/08/2008, Robert Klemme <shortcutter@googlemail.com> wrote:
> 2008/8/7 Michal Suchanek <hramrach@centrum.cz>:
>
> > On 07/08/2008, Robert Klemme <shortcutter@googlemail.com> wrote:
> >>
> >> And it misses another line at the beginning:
> >>
> >> $defout.sync = true
> >>
> >> :-)
> >
> > Fixing the mockup is not the right way,
>
>
> Maybe I am missing your point but the buffering in the mock client is
> certainly delaying the complete process. Also, I did not only add
> that line to mock.rb but provided a different client implementation.
> Did you actually try it out?

Sorry, I did not try that yet.

>
>
> > I need the ruby script working
> > with the application which is represented by the mockup. Also doing 5
> > repetitions is insufficient to test for the problem. It fails only in
> > few % of cases.
>
>
> Well, you can imagine that I did not want to post 1,000,000 lines
> here. Do the errors you report show up with my code when increasing
> the number of lines?
>
> I usually start with a simple bit to verify the basic functionality is
> ok and build from there. I do not understand, why you go to all this
> fd duplicating stuff. Also, I find your processor quite complex and
> it seems that for the error you are seeing a simpler program will be
> sufficient.

I came from the other end: I have a script that used to work, so I
tried to extract a minimal piece of it. That way, if a modification
makes the example work, I can apply the same modification back to the
original script.

>
> Btw, what does "writing causes stdin to be read into the program" exactly mean?

Did you try testt2?

That prints out the stuff that you feed it on stdin for some reason I
don't understand at all.
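
One possible explanation, which nobody in this thread confirms: testt2.rb calls `IO.popen` without a mode argument, and the default mode is "r". In that mode only the child's standard output is connected to the parent, and the child inherits the parent's standard input, so mock.rb would read the piped `garbage` file directly. Writing to such an IO raises IOError, and an exception inside a thread that is never joined is easy to miss. The sketch below shows the difference between the two modes; both child commands are stand-ins, not the real application.

```ruby
require "rbconfig"

# Stand-in children for illustration; neither is the real application.
PRINT_CHILD = [RbConfig.ruby, "-e", "puts 'hello'"]
ECHO_CHILD  = [RbConfig.ruby, "-e", "$stdout.sync = true; while l = gets; puts l; end"]

# Default mode is "r": only the child's stdout is connected to us.
read_only_error = nil
IO.popen(PRINT_CHILD) do |io|
  begin
    io.puts "<f>a"              # there is no write end to write to
  rescue IOError => e
    read_only_error = e         # "not opened for writing"
  end
  io.read                       # drain the child's output
end

# Mode "r+" connects both the child's stdin and its stdout to the parent.
echoed = IO.popen(ECHO_CHILD, "r+") do |io|
  io.puts "<f>a"
  io.close_write                # signal EOF to the child
  io.read
end
```

If this reading is right, opening the pipe with "r+" in testt2.rb would at least stop the child from consuming the parent's input.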

Thanks

Michal

Michal Suchanek

8/7/2008 1:28:00 PM

On 07/08/2008, Michal Suchanek <hramrach@centrum.cz> wrote:
> On 07/08/2008, Robert Klemme <shortcutter@googlemail.com> wrote:
> > 2008/8/7 Michal Suchanek <hramrach@centrum.cz>:
> >
> > > On 07/08/2008, Robert Klemme <shortcutter@googlemail.com> wrote:
> > >>
> > >> And it misses another line at the beginning:
> > >>
> > >> $defout.sync = true
> > >>
> > >> :-)
> > >
> > > Fixing the mockup is not the right way,
> >
> >
> > Maybe I am missing your point but the buffering in the mock client is
> > certainly delaying the complete process. Also, I did not only add
> > that line to mock.rb but provided a different client implementation.
> > Did you actually try it out?
>
>
> Sorry, I did not try that yet.
>

Well, I did not quite understand how your client maps to my client,
and it is indeed completely different.

While your client does only one popen my client does numerous popens.
This is necessary to work around the buffering: the pipe is closed
every time so that the application knows it's time to print out the
rest of the output.

Your client is indeed very slow with the mockup and might fail
completely with the application I use.

I haven't explored the option of using an EOF marker instead of
closing the pipe too much. Maybe that would be the option that works
around limitations in both my application and Ruby.
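
A rough sketch of the EOF-marker idea, under a strong assumption: the child passes the marker line through as soon as it sees it. The stand-in child below simply echoes every line; whether the real application behaves this way is not known from this thread. The marker is the `</csts>` tag from testt.rb, and `analyze_chunk` is a hypothetical helper name.

```ruby
require "rbconfig"

MARKER = "</csts>"   # end-of-chunk tag, borrowed from testt.rb

# Stand-in for the real application (an assumption): echoes every line at once.
CHILD = [RbConfig.ruby, "-e", "$stdout.sync = true; while l = gets; puts l; end"]

# Send one chunk of words, then collect replies up to the marker line.
def analyze_chunk(io, words)
  # Write from a separate thread so a large chunk cannot block the reader.
  writer = Thread.new do
    words.each { |w| io.puts "<f>#{w}" }
    io.puts MARKER
  end
  result = []
  while (line = io.gets)
    line.chomp!
    break if line == MARKER
    result << line
  end
  writer.join
  result
end

# One child process serves several chunks; the pipe is closed only at the end.
results = IO.popen(CHILD, "r+") do |io|
  io.sync = true
  first  = analyze_chunk(io, %w[a b c])
  second = analyze_chunk(io, %w[d e])
  io.close_write
  [first, second]
end
```

The benefit is a single popen for the whole run. The cost is that it only works if the application really does flush at the marker.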

As for duplicating the IO using a fd: I was unsure whether it is safe to use a
single IO structure in two threads.

Thanks

Michal

Robert Klemme

8/7/2008 5:05:00 PM

On 07.08.2008 15:27, Michal Suchanek wrote:
> On 07/08/2008, Michal Suchanek <hramrach@centrum.cz> wrote:
>> On 07/08/2008, Robert Klemme <shortcutter@googlemail.com> wrote:
>> > 2008/8/7 Michal Suchanek <hramrach@centrum.cz>:
>> >
>> > > On 07/08/2008, Robert Klemme <shortcutter@googlemail.com> wrote:
>> > >>
>> > >> And it misses another line at the beginning:
>> > >>
>> > >> $defout.sync = true
>> > >>
>> > >> :-)
>> > >
>> > > Fixing the mockup is not the right way,
>> >
>> >
>> > Maybe I am missing your point but the buffering in the mock client is
>> > certainly delaying the complete process. Also, I did not only add
>> > that line to mock.rb but provided a different client implementation.
>> > Did you actually try it out?
>>
>>
>> Sorry, I did not try that yet.
>>
>
> Well, I did not quite understand how your client maps to my client,
> and it is indeed completely different.
>
> While your client does only one popen my client does numerous popens.
> This is necessary to work around the buffering: the pipe is closed
> every time so that the application knows it's time to print out the
> rest of the output.

Well, that obviously depends on the application you are using for
filtering. If it will send out the last output only on termination
there is no other way than to create new processes over and over again.
Of course, this is not really efficient - especially for short lived
processes - no matter what programming language you use.

> Your client is indeed very slow with the mockup and might fail
> completely with the application I use.

I am not sure what you mean by "slow". I hope you noticed the sleep in
there.

> I haven't explored the option of using an EOF marker instead of
> closing the pipe too much. Maybe that would be the option that works
> around limitations in both my application and Ruby.

So the application that replaces mock.rb is under your control? I was
assuming that it is not and you need to work with it as is. In that
case you should certainly introduce some kind of signaling for the end
of one chunk of data (if that is needed).

> As for duplication the IO using a fd: I was unsure it is safe to use a
> single IO structure in two threads.

It is because you are using it in one direction in each thread only.
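
A minimal sketch of that arrangement, using `cat` as the child as in the later posts: a single IO object from popen, with the main thread only writing and the reader thread only reading. It also sidesteps a hazard of `IO.for_fd`, namely that the second object wraps the same descriptor, so closing it closes the descriptor under the original object as well. Whether that is what triggers the failures reported above is an assumption. The helper name `run_through` is hypothetical.

```ruby
# Feed `words` to `cmd` line by line and return the child's output lines.
def run_through(cmd, words)
  lines = []
  IO.popen(cmd, "r+") do |io|
    reader = Thread.new do
      io.each_line { |line| lines << line.chomp }   # this thread only reads
    end
    words.each { |w| io.puts w }                    # main thread only writes
    io.close_write                                  # EOF for the child
    reader.join                                     # wait until all output is read
  end                                               # block form closes the pipe and reaps the child
  lines
end

out = run_through("cat", %w[a b c d e f g h i])
```

Because the block form of popen closes the pipe and waits for the child, no explicit `fd.close` is needed to avoid zombies.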

Cheers

robert

Michal Suchanek

8/7/2008 5:23:00 PM

On 07/08/2008, Robert Klemme <shortcutter@googlemail.com> wrote:

> Well, that obviously depends on the application you are using for
> filtering. If it will send out the last output only on termination there is
> no other way than to create new processes over and over again. Of course,
> this is not really efficient - especially for short lived processes - no
> matter what programming language you use.

I am under that impression. Using a single instance of the
application has not worked so far. However, there are at least two
markers that could possibly be used for input termination, so it might
be worth experimenting with that a bit.

Still, even if it is possible to run a single instance, Ruby should be
able to run multiple instances, too.

>
>
> > Your client is indeed very slow with the mockup and might fail
> > completely with the application I use.
> >
>
> I am not sure what you mean by "slow". I hope you noticed the sleep in
> there.
>
Oh, yeah, no ehmm..
>
> > I haven't explored the option of using an EOF marker instead of
> > closing the pipe too much. Maybe that would be the option that works
> > around limitations in both my application and Ruby.
> >
>
> So the application that replaces mock.rb is under your control? I was
> assuming that it is not and you need to work with it as is. In that case
> you should certainly introduce some kind of signaling for the end of one
> chunk of data (if that is needed).

Unfortunately it is not. If it was so I would use a saner interface in
the first place.

>
>
> > As for duplication the IO using a fd: I was unsure it is safe to use a
> > single IO structure in two threads.
> >
>
> It is because you are using it in one direction in each thread only.

Thanks

Michal

Roger Pack

8/7/2008 8:33:00 PM

> deadlock 0x53d4: sleep:F(4) - testt.rb:14
> deadlock 0x31704: sleep:S (main) - testt.rb:27
> testt.rb:27:in `write': Thread(0x31704): deadlock (fatal)
> from testt.rb:27:in `puts'
> from testt.rb:27:in `try_analyze'


I recently got something similar waiting for threads:
/lib/ruby_useful_here.rb:682:in `join': Thread(0x346fc): deadlock
(fatal)

In error, at least to my reckoning.
Answer for now: avoid threads and just use evented sockets, like Rev or
EventMachine.
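
Rev and EventMachine are external libraries. As a stand-in for the same idea, here is a single-threaded sketch that multiplexes reads and writes with `IO.select` from the core library. It is not how either library is used, and the helper name `filter_with_select` is hypothetical.

```ruby
# Feed `input` to `cmd` and collect its output from one thread,
# using IO.select to decide when to write and when to read.
def filter_with_select(cmd, input)
  output = +""
  IO.popen(cmd, "r+") do |io|
    pending = input.dup
    io.close_write if pending.empty?
    loop do
      writers = pending.empty? ? [] : [io]
      readable, writable = IO.select([io], writers)

      if writable.include?(io)
        written = io.write_nonblock(pending, exception: false)
        if written.is_a?(Integer)                 # otherwise :wait_writable, retry later
          pending = pending.byteslice(written..-1)
          io.close_write if pending.empty?        # all input sent: EOF for the child
        end
      end

      if readable.include?(io)
        chunk = io.read_nonblock(4096, exception: false)
        break if chunk.nil?                       # child closed its stdout
        output << chunk unless chunk == :wait_readable
      end
    end
  end
  output
end

result = filter_with_select("cat", "a\nb\nc\n")
```

Since nothing ever blocks on a write while output is waiting to be read, input larger than the pipe buffer should not stall the way a write-everything-then-read loop can.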

-R
--
Posted via http://www.ruby-....

Michal Suchanek

8/14/2008 4:35:00 PM

So it turns out the application I am using has some provisions for
running in a terminal. If I run just the application from a shell, it
behaves much like cat: it processes lines as I type them, except that it
buffers one line. If I wrap it in iconvs or cats, it buffers much more,
and the only way to get output is to close the input pipe.

So I would really like to get Ruby to use pipes properly.

I can reproduce deadlocks even when I replace the application with cat:

$ cat testt.rb
def try_analyze *words
  analyzer = IO.popen( 'cat', IO::RDWR | IO::SYNC )
  res = []

  t = Thread.new( (IO::for_fd analyzer.fileno) ,res){|fd,ary|
    while l = fd.gets do
      STDERR.putc 8
      ary.push l
    end
    fd.close rescue nil # hopefully prevents zombie hordes
  }

  words.each{|w|
    STDERR.putc '.'[0]
    analyzer.puts w
  }

  analyzer.close_write rescue nil
  t.join
end

1.upto(10000){|_|
  try_analyze *%w(a b c d e f g h i)
}

$ ruby -w testt.rb
testt.rb:23: warning: `*' interpreted as argument prefix
....cat: stdin: Bad file descriptor
deadlock 0x5154: sleep:F(4) - testt.rb:6
deadlock 0x318f8: sleep:S (main) - testt.rb:15
testt.rb:15:in `write': Thread(0x318f8): deadlock (fatal)
from testt.rb:15:in `puts'
from testt.rb:15:in `try_analyze'
from testt.rb:13:in `each'
from testt.rb:13:in `try_analyze'
from testt.rb:23
from testt.rb:22:in `upto'
from testt.rb:22

The other way around the code looks much simpler but it really locks up:

$ cat testt2.rb
def try_analyze *words
  res=[]
  analyzer = IO.popen 'cat', IO::RDWR + IO::SYNC

  t = Thread.new(analyzer, words){|fd,ary|
    ary.each{|w|
      STDERR.putc '.'[0]
      fd.puts w
    }
    fd.close_write
  }

  while l = analyzer.gets do
    STDERR.putc 8
    res.push l
  end
  fd.close rescue nil # hopefully prevents zombie hordes
  res
end

1.upto(10000){|_|
  try_analyze *%w(a b c d e f g h i)
}

$ ruby -w testt2.rb
testt2.rb:22: warning: `*' interpreted as argument prefix
^Ctestt2.rb:3:in `popen': Interrupt from testt2.rb:3:in `try_analyze'
from testt2.rb:22
from testt2.rb:21:in `upto'
from testt2.rb:21


Any suggestions how to make pipes work in ruby are welcome.
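
One option for the "new process per batch, close the pipe to get the output" pattern is `Open3.capture2` from the standard library. It writes the input, closes the child's stdin, reads all output, and waits for the child to exit. It is newer than the Ruby release this thread appears to be using, so treat the following as a sketch for current Ruby versions. `cat` again stands in for the real application.

```ruby
require "open3"

# Run one short-lived filter process per call, as in testt.rb.
def try_analyze(*words)
  return [] if words.empty?
  input = words.map { |w| "#{w}\n" }.join
  # capture2 sends stdin_data, closes the child's stdin, reads its stdout
  # to EOF and waits for the process, so no pipe or zombie is left behind.
  output, status = Open3.capture2("cat", stdin_data: input)
  raise "filter failed: #{status}" unless status.success?
  output.lines.map(&:chomp)
end

lines = try_analyze(*%w[a b c d e f g h i])
```

This keeps the per-call process cost that Robert points out, but it removes the hand-written thread and descriptor handling from the caller.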

Thanks

Michal