
comp.lang.ruby

Is there a standard pattern for threaded access to a file?

Jon Handler

10/12/2007 11:52:00 PM

I'm pretty new to ruby and this is one of those areas where I can't
quite seem to turn my head inside out as the language requires :-)

I have a log file that I want to process in parts, with multiple threads
working from the same file, each one getting a line at a time and doing
something with it.

I'd like something like

1. Open the file
2. Create 5 threads

Each thread should read a line of the file and process it, but no 2
threads should get the same line.

What I have looks like

open(ARGV.flags.log) do |logfile|
  logfile.each do |line|

    # blah blah blah...

  end
end

but that's inside out! How do I rubify this code?

Thanks,

Jon
--
Posted via http://www.ruby-....

35 Answers

ara.t.howard

10/13/2007 1:58:00 AM



On Oct 12, 2007, at 5:51 PM, Jon Handler wrote:

>
> 1. Open the file
> 2. Create 5 threads
>
> Each thread should read a line of the file and process it, but no 2
> threads should get the same line.
>

cfp:~ > cat a.rb
require 'thread'

q = Queue.new

threads = [] and 5.times{ threads << Thread.new{ puts q.pop } }

open(__FILE__){|fd| fd.each{|line| q.push line} }

threads.map{|t| t.join}


cfp:~ > ruby a.rb
require 'thread'

q = Queue.new

threads = [] and 5.times{ threads << Thread.new{ puts q.pop } }



a @ http://codeforp...
--
it is not enough to be compassionate. you must act.
h.h. the 14th dalai lama




7stud --

10/13/2007 2:35:00 AM


ara.t.howard wrote:
>
> threads.map{|t| t.join}
>
>

Skip that inappropriate suggestion and use the following instead:

threads.each {|t| t.join}


7stud --

10/13/2007 3:59:00 AM


Actually, the example provided won't even work in your case: each thread
pops only one line and then exits, so only the first five lines get
processed. You have to do some extra things.

>
> I'm pretty new to ruby
>

A Queue is a first in, first out container, which means the first items
you push() into one end of the Queue are the first items that pop() out
the other end. A Queue is also thread safe, which means that only one
thread can modify it at a time, so no two threads will ever pop() the
same item.

Therefore, you can push() the lines from your file into one end of the
Queue, and you can have each thread pop() a line off the other end of
the Queue.
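To make the two properties above concrete, here is a minimal sketch (the sample strings, the 100 integers and the choice of five threads are arbitrary). It shows items leaving a Queue in the order they were pushed, and five threads draining one queue without any item being handed out twice. It uses the non-blocking form pop(true), which raises ThreadError when the queue is empty, so these workers can stop without a terminator.

```ruby
require 'thread' # needed on Ruby 1.8; harmless on later versions

# FIFO: items come out in the order they were pushed.
q = Queue.new
q.push("line 1")
q.push("line 2")
q.push("line 3")
first  = q.pop   # => "line 1"
second = q.pop   # => "line 2"

# Thread safety: five threads drain one queue concurrently,
# and no item is handed to more than one thread.
work = Queue.new
100.times { |i| work.push(i) }
done = Queue.new

workers = Array.new(5) do
  Thread.new do
    loop do
      begin
        done.push(work.pop(true)) # non-blocking pop raises ThreadError when empty
      rescue ThreadError
        break
      end
    end
  end
end
workers.each { |t| t.join }

items = []
items << done.pop until done.empty?
puts items.size # => 100
```

The blocking pop() used in the rest of this thread is what you want when a producer is still adding lines; pop(true) is only convenient here because the queue is filled before the workers start.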

If there is nothing in the Queue, then a thread that tries to pop() a
line from the Queue will block until more data becomes available. As a
result, even after all the lines have been read from the Queue, each
thread will come back to the Queue and try to pop() another line, but
since there won't be any more lines left, the threads will block and
wait for more data. That means the threads will never end. To make
your threads stop trying to read more lines from the Queue once it's
exhausted, you will need to send each thread a string that acts as a
termination message.

You could first push() all the lines from your file into the Queue, and
then start the threads, but you might as well get the threads working on
the first lines while you are pushing the rest of the lines into the
Queue. So, start the threads and let them block, then start pushing
the lines from the file into the Queue.


require 'thread'

# Create some data:
File.open("data.txt", "w") do |file|
  (1..100).each do |num|
    file.puts("line #{num}")
  end
end


# Read data with 5 threads:
q = Queue.new

my_threads = (1..5).collect do |i|
  Thread.new do # returns a thread
    loop do
      line = q.pop

      if line == "END_OF_DATA"
        break
      end

      # process line:
      puts line.capitalize
    end
  end
end

# Threads are blocking while they
# await data. Give them some data:
IO.foreach("data.txt") do |line|
  q.push(line)
end

# Send each thread a signal that
# terminates the thread:
5.times {q.push("END_OF_DATA")}

# Wait for all the threads to finish
# executing:
my_threads.each {|t| t.join}



Robert Klemme

10/13/2007 10:34:00 AM



While this is all true and well, I have some additional remarks.

On 13.10.2007 05:59, 7stud -- wrote:
> Actually, the example provided won't even work in your case. You have
> to do some extra things.
>
>> I'm pretty new to ruby
>
> A Queue is a first in first out container, which means the items you
> push() into one end of the Queue are the first items that pop() out the
> other end. A Queue is also thread safe, which means that only one
> thread can access it at the same time.
>
> Therefore, you can push() the lines from your file into one end of the
> Queue, and you can have each thread pop() a line off the other end of
> the Queue.
>
> If there is nothing in the Queue, then a thread that tries to pop() a
> line from the Queue will block until more data becomes available. As a
> result, even after all the lines have been read from the Queue, each
> thread will come back to the Queue and try to pop() another line, but
> since there won't be any more lines left, the threads will block and
> wait for more data. That means the threads will never end. To make
> your threads stop trying to read more lines from the Queue once it's
> exhausted, you will need to send each thread a string that acts as a
> termination message.

There is a better option: send something down the queue that is
*not a String*; otherwise processing would stop early if the file
happened to contain the terminating line.
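The collision described above can be checked with a small sketch. The helper name drain and the sample lines are made up for illustration, and the lines are shown already chomped, which is when a plain-string marker is most likely to match a real line. Object.new is used here as one possible non-String terminator; Robert's version below uses the queue object itself.

```ruby
require 'thread'

# Hypothetical helper: push `lines` through a queue to one worker that stops
# when it sees `terminator`, and return the lines the worker processed.
def drain(lines, terminator)
  q = Queue.new
  worker = Thread.new do
    processed = []
    until (item = q.pop) == terminator
      processed << item
    end
    processed
  end
  lines.each { |line| q.push(line) }
  q.push(terminator)
  worker.value # waits for the thread and returns its block's result
end

# Sample lines, already chomped; the second one happens to look like the marker.
lines = ["line 1", "END_OF_DATA", "line 3"]

with_string = drain(lines, "END_OF_DATA") # => ["line 1"]  (stopped early)
with_object = drain(lines, Object.new)    # => all three lines
```

A freshly created Object is only == to itself, so no String read from a file can ever be mistaken for it.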

> You could first push() all the lines from your file into the Queue, and
> then start the threads,

That's a rather bad idea given that a file can be huge and you do not
need all lines in memory for line-wise processing.

That's the same reason why it's a good idea to use a bounded queue: if
processing is slower than reading, an unbounded queue will eventually
fill up with the complete file contents. If processing is faster than
reading then threads will have to wait either way.
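A minimal sketch of the bounded-queue behaviour, assuming a deliberately tiny limit of 3 so the effect is easy to observe (the 20 generated lines stand in for a file, and the :done terminator and the 1 ms sleep are illustrative choices). The producer's push blocks whenever 3 items are already waiting, so the backlog never grows past the limit even though the consumer is slower.

```ruby
require 'thread'

limit = 3                       # deliberately tiny so the bound is easy to observe
queue = SizedQueue.new(limit)   # push blocks once `limit` items are waiting
largest_backlog = 0
processed = 0

producer = Thread.new do
  20.times { |i| queue.push("line #{i}") }  # stands in for reading a file
  queue.push(:done)                         # non-String terminator
end

consumer = Thread.new do
  until queue.pop == :done
    largest_backlog = [largest_backlog, queue.size].max
    processed += 1
    sleep 0.001                 # simulate processing that is slower than reading
  end
end

[producer, consumer].each { |t| t.join }
puts "processed #{processed} lines, backlog never above #{largest_backlog}"
```

With a plain Queue in place of the SizedQueue, the producer would finish immediately and all 20 lines would sit in memory at once.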

> but you might as well get the threads working on
> the first lines while you are pushing the rest of the lines into the
> Queue. So, start the threads and let them block, then start pushing
> the lines from the file into the Queue.

Here's my version with all the remarks incorporated.

require 'thread'

MAX_IN_QUEUE = 1024
NUM_THREADS = 5

queue = SizedQueue.new MAX_IN_QUEUE

threads = (1..NUM_THREADS).map do
  # we use the mechanism to pass the queue through
  # the constructor to avoid nasty effects of
  # variable "queue" changing
  Thread.new queue do |q|
    # we use the queue itself as terminator
    until q == (item = q.deq)
      begin
        # whatever processing
      rescue Exception => e
        # whatever error handling
      end
    end
  end
end

# read from files on the command line
ARGF.each do |line|
  queue.enq line
end

threads.each do |th|
  # send the terminator and wait
  queue.enq queue
  th.join
end

Have fun!

robert

7stud --

10/13/2007 3:11:00 PM


Francis Cianfrocca wrote:
> On 10/12/07, Jon Handler <jhandler@shopping.com> wrote:
>> 1. Open the file
>> 2. Create 5 threads
>>
>> Each thread should read a line of the file and process it, but no 2
>> threads should get the same line.
>
>
>
> Why are you doing this in the first place? Do you have a computer with
> five
> processors and five memory buses?

According to pickaxe2, p. 135, your question is irrelevant:

"Finally, if your machine has more than one processor, Ruby threads
won't take advantage of that fact--because they run in a single process,
and in a single native thread, they are constrained to run on one
processor at a time."

Perhaps a better question for the op is: does your processing result in
any pauses in the code? For instance, do you use the information in the
log file to send requests to websites where you are waiting for a
response?

In Ruby 1.8 (the version pickaxe2 describes), threads do not actually
allow any code to run at the same time. What really happens is that
execution rapidly shifts from one thread to another, which gives the
appearance that the threads are executing at the same time.

If you have five methods that each take 2 seconds to execute, and you
run those five methods one after another, your program will take 10
seconds to execute. If you use five threads to execute those methods
instead, your program will still take 10 seconds. For example, suppose
each thread gets 1 second to execute before execution shifts to another
thread; then something like this will occur:

thread1: 1 sec -> thread2: 1 sec -> thread3: 1 sec -> thread4: 1 sec -> thread5: 1 sec ->
thread1: 1 sec -> thread2: 1 sec -> thread3: 1 sec -> thread4: 1 sec -> thread5: 1 sec


If you total up the time, it still takes 10 seconds for your program to
execute when using five threads. The only way threads help speed up
execution is if there are pauses in your code where nothing is
happening, for example while waiting for a response from a website.
During those pauses, threads allow execution to shift to other code that
is ready to execute.
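A small sketch of the "pauses" case, assuming a hypothetical waiting_task that just sleeps for 0.2 seconds to stand in for waiting on a website. Sleeping (like blocking I/O) lets other threads run, so five threads finish in roughly the time of one task, while running the tasks one after another takes about five times as long. Process.clock_gettime needs Ruby 2.1 or later.

```ruby
# Hypothetical stand-in for a task that spends its time waiting
# (on a web server, say) rather than computing.
def waiting_task
  sleep 0.2
end

# Measure the wall-clock time of a block.
def elapsed
  start = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  yield
  Process.clock_gettime(Process::CLOCK_MONOTONIC) - start
end

sequential = elapsed { 5.times { waiting_task } }
threaded   = elapsed { Array.new(5) { Thread.new { waiting_task } }.each { |t| t.join } }

puts format("sequential: %.2fs  threaded: %.2fs", sequential, threaded)
```

Replacing the sleep with pure computation (a long arithmetic loop, say) is the situation described above, where threads give no speedup.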


Eric I.

10/13/2007 3:36:00 PM


On Oct 13, 6:33 am, Robert Klemme <shortcut...@googlemail.com> wrote:
> There is a better option: rather send something down the queue that is
> *not a String* - otherwise processing would suddenly stop if the file
> contained the terminating line.

That's a very nice solution. It demonstrates a lot of accumulated
wisdom. I think I'd use a symbol in the queue, such as :end_of_data,
rather than the queue itself to mark the end of the data, if only to
avoid a "huh?" moment from those who read the code down the line.

Eric

----

On-site, hands-on Ruby training is available from http://Lea...
!

Simon Kröger

10/13/2007 3:46:00 PM


ara.t.howard wrote:
>[...]
> threads = [] and 5.times{ threads << Thread.new{ puts q.pop } }
>[...]

just a question of style:

while the above is quite clever, is there any (hidden) reason to use it over
such primitive constructs like

threads = Array.new(5){ Thread.new{ puts q.pop } }

or

threads = (1..5).map{ Thread.new{ puts q.pop } }

?

cheers

Simon
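Why ara's one-liner works at all: = binds more tightly than and, so the line parses as (threads = []) and (5.times { ... }), and the right-hand side runs because an empty array is truthy in Ruby. A small sketch comparing it with Simon's two alternatives; the 15 queued integers are arbitrary, chosen so each of the 15 threads has exactly one item to pop.

```ruby
require 'thread'

q = Queue.new
15.times { |i| q.push(i) }

# ara's form: parses as (threads = []) and (5.times { ... }),
# and the right side runs because [] is truthy.
threads = [] and 5.times { threads << Thread.new { q.pop } }

# Simon's alternatives build the same kind of array directly.
via_array_new = Array.new(5) { Thread.new { q.pop } }
via_map       = (1..5).map { Thread.new { q.pop } }

all    = threads + via_array_new + via_map
values = all.map { |t| t.value } # Thread#value joins and returns each popped item
puts values.sort == (0...15).to_a # => true
```

All three produce an array of five threads; the difference is purely one of style.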

Simon Kröger

10/13/2007 3:52:00 PM


Eric I. schrieb:
> That's a very nice solution. It demonstrates a lot of accumulated
> wisdom. I think I'd use a symbol in the queue, such as :end_of_data,
> rather than the queue itself to mark the end of the data, if only to
> avoid a "huh?" moment from those who read the code down the line.

I think there will be a big (and probably long) "huh?" moment when running the
code:

threads.each do |th|
  # send the terminator and wait
  queue.enq queue
  th.join
end

is more likely than not never to terminate. If a thread other than th eats
the terminator, th.join will wait for a long time (it can block forever,
since th itself may never receive a terminator).

> Eric

cheers

Simon

never ever think wisdom will protect you against threads :) except if wisdom
tells you not to use them.
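One way to avoid the hang Simon describes, keeping Robert's structure, is to send all the terminators first and only then join. This is a minimal sketch, not Robert's code: the three sample words stand in for ARGF input and upcase stands in for "whatever processing". With one terminator per worker and each worker exiting after the first terminator it sees, every worker ends up with exactly one, whichever order they take them in.

```ruby
require 'thread'

NUM_THREADS = 5
queue   = SizedQueue.new(1024)
results = Queue.new              # thread-safe collector for the demo

threads = Array.new(NUM_THREADS) do
  Thread.new(queue) do |q|
    # Robert's convention: the queue object itself is the terminator
    until q == (item = q.deq)
      results << item.upcase     # placeholder for "whatever processing"
    end
  end
end

%w[alpha beta gamma].each { |line| queue.enq line } # stands in for ARGF.each

# Send one terminator per worker first, then join.
NUM_THREADS.times { queue.enq queue }
threads.each { |th| th.join }

processed = []
processed << results.pop until results.empty?
p processed.sort # => ["ALPHA", "BETA", "GAMMA"]
```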

ara.t.howard

10/13/2007 4:27:00 PM



On Oct 13, 2007, at 9:50 AM, Simon Kröger wrote:

> ara.t.howard wrote:
>> [...]
>> threads = [] and 5.times{ threads << Thread.new{ puts q.pop } }
>> [...]
>
> just a question of style:
>
> while the above is quite clever, is there any (hidden) reason to
> use it over
> such primitive constructs like

not really. mostly i just like the '5' to be more prominent in the code

>
> threads = Array.new(5){ Thread.new{ puts q.pop } }
>

this works

> or
>
> threads = (1..5).map{ Thread.new{ puts q.pop } }

seems kinda heavyweight, but yeah

cheers.





ara.t.howard

10/13/2007 4:37:00 PM



On Oct 12, 2007, at 8:35 PM, 7stud -- wrote:

>
> Skip that inappropriate suggestion and use the following instead:
>
> threads.each {|t| t.join}

unless you use

Thread.current.abort_on_exception = true

you should *always* use 'map' and check the return value. otherwise
you have no idea if your threads have completed successfully and you
simply exit whether threads did their job or not.

regards.
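For reference, a sketch of how a failing worker shows up in current Ruby: Thread#value joins the thread and returns the value of its block, and both Thread#join and Thread#value re-raise an exception that killed the thread (unless abort_on_exception already handled it). The 10 / n worker is a made-up example, and report_on_exception= needs Ruby 2.4 or later.

```ruby
# Hypothetical workers; the one given 0 fails.
threads = [1, 2, 0].map do |n|
  Thread.new do
    Thread.current.report_on_exception = false # keep the demo's stderr quiet (Ruby 2.4+)
    10 / n                                     # raises ZeroDivisionError for n == 0
  end
end

results = threads.map do |t|
  begin
    t.value            # joins, then returns the block's value...
  rescue ZeroDivisionError => e
    e                  # ...or re-raises the exception that ended the thread
  end
end

p results.first(2) # => [10, 5]
```

Using value rather than join is what lets map collect each thread's actual result.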
