[lnkForumImage]
TotalShareware - Download Free Software

Confronta i prezzi di migliaia di prodotti.
Asp Forum
 Home | Login | Register | Search 


 

Forums >

comp.lang.ruby

How to run background processes (more than 1 worker) parallely.

Deepak Gole

12/12/2008 1:59:00 PM

[Note: parts of this message were removed to make it a legal post.]

Hi

My requirement is as follows

1) I have around 200 feeds in the database that I need to parse (fetch
contents) *parallely* after some interval and store those feed items in
database.

2) Now I am using backgroundrb with 10 workers each worker has assigned a
job to parse data from 20 feeds (e.g 1st worker will fecth data from
feeds(1..20), 2nd from feeds(21..30) ..etc.....

3) But backgroundrb is not reliable and it fails after some time. So I have
tried Starling & Workling but those worker doesn't run *parallely.

( I need to run **parallely because those feeds will increase say 1000
feeds. So I can't run them sequentially. ) *
*
Pls I need a help on above problem.*


Thanks
Deepak

9 Answers

Chris Lowis

12/12/2008 2:13:00 PM

0

> 3) But backgroundrb is not reliable and it fails after some time. So I
> have tried Starling & Workling but those worker doesn't run *parallely.

Nanite (http://github.com/ezmobius/nanite/t...) maybe? Perhaps
some of the Rails and/or Merb groups will be able to help you more.

Good luck,

Chris

--
Posted via http://www.ruby-....

saurabh purnaye

12/12/2008 6:35:00 PM

0

[Note: parts of this message were removed to make it a legal post.]

hi Deepak,
You may go for Starfish <http://rufy.com/starfis... or
Skynet<http://skynet.rubyforg...,
which are based on google's map-reduce algorithm.
Even you may get more information on the rails cats episodes 127 to 129.

On Fri, Dec 12, 2008 at 7:28 PM, Deepak Gole <deepak.gole8@gmail.com> wrote:

> Hi
>
> My requirement is as follows
>
> 1) I have around 200 feeds in the database that I need to parse (fetch
> contents) *parallely* after some interval and store those feed items in
> database.
>
> 2) Now I am using backgroundrb with 10 workers each worker has assigned a
> job to parse data from 20 feeds (e.g 1st worker will fecth data from
> feeds(1..20), 2nd from feeds(21..30) ..etc.....
>
> 3) But backgroundrb is not reliable and it fails after some time. So I have
> tried Starling & Workling but those worker doesn't run *parallely.
>
> ( I need to run **parallely because those feeds will increase say 1000
> feeds. So I can't run them sequentially. ) *
> *
> Pls I need a help on above problem.*
>
>
> Thanks
> Deepak
>



--
--
Thanks and Regards
Saurabh Purnaye
+91-9922071155
skype: sorab_pune
yahoo & gtalk: saurabh.purnaye
msn: psaurabh@live.com

Saji N. Hameed

12/13/2008 12:39:00 PM

0

Hi Deepak,

As others mentioned, an adaptation of Google Map-Reduce technique
may be of use. To this end, you could you Ruby's Linda. For my needs
I wrote a small script that puts work descriptions on a tuple space.
This is taken up by one or more workers in parallel.

If you write distinct messages that are recognized by workers, you
could probably achieve your parallelism in a few lines without extra
libraries, perhaps except for DRBfire.

I attach it here (i am a novice Ruby programmer, the code may not
be optimal) - hope it helps.

saji

--queue code

require 'thread'
require 'sequel'
require 'rinda/tuplespace'
require 'drb'

ts = Rinda::TupleSpace.new
DRb.start_service("druby://:3130",ts)
puts "Drb server running at #{DRb.uri}"

dbname="sqlite://testQ.db"
db = Sequel.connect(dbname)
pause = 15

loop do
th1 = Thread.new do
job = db[:jobs].filter(:status => "queued").first
submit = job.merge(:status => "submitted")
ts.write [:q1, submit]
db[:jobs].filter(job).update(submit)
end
th2 = Thread.new do
result = ts.take [:rq1,nil,nil]
unless result[1]==nil
p "processing images"
p "finished image processing"
p "update job status in database"
db[:jobs].filter(result[1]).update(:status => "finished")
end
end
sleep(pause)
end
th1.join
th2.join

# connect to database
# create tuplespace
# thread1
# - collect from database
# - put on tuple
# - update db

# thread2
# - check tuple
# - download data
# - update db

---worker code

require 'drb'
require 'rinda/rinda'

DRb.start_service
ro = DRbObject.new_with_uri('druby://localhost:3130')
ts = Rinda::TupleSpaceProxy.new(ro)

def make_mme(job)
"This will be passed to AFS Server: don't worry yet"
p job
end

job = ts.take([:q1,nil])
msg = make_mme(job[1])
ts.write [:rq1,job,0] # write return status to tuplespace

DRb.thread.join

# worker takes job from tuple space (ts.take[:q1,..])
# executes job (make_mme)
# writes message on tuple space (ts.write[:rq1,..])
* Deepak Gole <deepak.gole8@gmail.com> [2008-12-12 22:58:58 +0900]:

> Hi
>
> My requirement is as follows
>
> 1) I have around 200 feeds in the database that I need to parse (fetch
> contents) *parallely* after some interval and store those feed items in
> database.
>
> 2) Now I am using backgroundrb with 10 workers each worker has assigned a
> job to parse data from 20 feeds (e.g 1st worker will fecth data from
> feeds(1..20), 2nd from feeds(21..30) ..etc.....
>
> 3) But backgroundrb is not reliable and it fails after some time. So I have
> tried Starling & Workling but those worker doesn't run *parallely.
>
> ( I need to run **parallely because those feeds will increase say 1000
> feeds. So I can't run them sequentially. ) *
> *
> Pls I need a help on above problem.*
>
>
> Thanks
> Deepak

--
Saji N. Hameed

APEC Climate Center +82 51 668 7470
National Pension Corporation Busan Building 12F
Yeonsan 2-dong, Yeonje-gu, BUSAN 611705 saji@apcc21.net
KOREA

Robert Klemme

12/13/2008 5:00:00 PM

0

On 12.12.2008 14:58, Deepak Gole wrote:

> My requirement is as follows
>
> 1) I have around 200 feeds in the database that I need to parse (fetch
> contents) *parallely* after some interval and store those feed items in
> database.
>
> 2) Now I am using backgroundrb with 10 workers each worker has assigned a
> job to parse data from 20 feeds (e.g 1st worker will fecth data from
> feeds(1..20), 2nd from feeds(21..30) ..etc.....
>
> 3) But backgroundrb is not reliable and it fails after some time.

Can you be more specific what you really mean by this? How does it fail?

> So I have
> tried Starling & Workling but those worker doesn't run *parallely.

Maybe you used it not in the proper way. From what I read on the web
site doing work concurrently is all that S+W is about.

Cheers

robert

Jeff Moore

12/13/2008 6:04:00 PM

0

Deepak Gole wrote:
> Hi
>
> Pls I need a help on above problem.*
>
>
> Thanks
> Deepak

Here's my approach to a similar problem. Still not as polished as I'd
like, but it maybe useful.

The core is the PoolQM class (the CircularBuffer class exists to catch
a limited number of exceptions).

=begin

NAME

class CircularBuffer

DESCRIPTION

A lightweight but (hopefully) thread-safe version of the circular
buffer

Designed primarily for intentionally limited in-memory event/error
logging.

URI



INSTALL



HISTORY

0.1

SYNOPSIS

cb = CircularBuffer.new(50) # create a new CircularBuffer that
holds 50 nil elements
cb << 'fnord' # append an element to the buffer
elements = cb.to_a # return elements as an array with
elements ordered from oldest to newest
cb.clear # force all entires to nil

CAVEATS

The CircularBuffer ignores nil elements and ignores attempts to append
them

2DOs



By Djief

=end

require 'thread'

class CircularBuffer

def initialize(max_size)
@max_size = max_size
@ra = Array.new(@max_size, nil)
@head = 0
@mutex = Mutex.new
end

private

def inc(index)
(index +1) % @max_size
end

public

# set all elements to nil
#
def clear
@mutex.synchronize do
@ra.collect! { |element| element = nil }
end
end

# append a new element to the current 'end'
#
def <<(element)
unless element.nil?
@mutex.synchronize do
@ra[@head]=element
@head = inc(@head)
end
end
end

# return the entire buffer (except nil elements)
# as an array
#
def to_a
index = @head
result = []
@mutex.synchronize do
@max_size.times do
result << @ra[index] unless @ra[index].nil?
index = inc(index)
end
end
result
end

end

=begin

NAME

class PoolQM

DESCRIPTION

PoolQM extends an Array with MonitorMixin to create a queue with
an associated pool of worker threads that wait process any requests
that are added to the queue.

A dispatcher thread watches continuously for enqueued requests and
signals idle worker threads (if any) to dequeue and process the
request(s). If no idle workers exist, the request remains in the
queue until one is available.

During the creation of a new instance of PoolQM, the number of worker
threads is established and the request processing block is defined:

results = Queue.new
NUM_OF_WORKERS = 10
pqm = PoolQM.new(NUM_OF_WORKERS) do |request|
results << "Current request: #{request}" # processing request
here
end

Note that any output you expect to collect from your worker threads
should
be returned via some thread-safe mechanism or container (Queue is a
good
default).

Enqueuing a request is all that is necessary to initiate it's
processing:

pqm.enq("This is a test, this is only a test")

If a request causes an exception within the processing block, the
Exception
is appended to a circular buffer whose contents can be obtained as an
array
with the PoolQM#exceptions method.

If you're intested in logging exceptions, you'll have a bit more work
to
do but replacing the CircularBuffer with a Queue that has it's own
worker
to handle disk IO is probably a good bet.

Performance-wise this approach behaves more consistently than any I've
produced so far i.e. it's both fast and performs with repeatable
uniformity.

No doubt, there's still room for improvement.


URI



INSTALL



HISTORY

0.1 - genesis
0.2 - documentation and clean-up

SYNOPSIS

require 'thread'

results = Queue.new # thread-safe container
for results! <<<<<<<<<< IMPORTANT

NUM_OF_WORKERS = 10

pqm = PoolQM.new(NUM_OF_WORKERS) do |request|
results << "Current request: #{request}" # processing request
here
end

100.times do |index|
pqm.enq("Request number #{index}") # enqueuing requests
here
end

pqm.wait_until_idle # wait for all requests
to be processed

until results.empty? do # dump results
p results.pop
end

pqm.exceptions.each do |exception| # obtain exceptions
array and dump it
p exception
end

CAVEATS



2DOs



By Djief

=end


require 'monitor'

class PoolQM

# default size for the exceptions CircularBuffer
#
DEFAULT_EXCEPTION_BUFFER_SIZE = 10

# Create a new PoolQM with 'worker_count' worker threads to execute
# the associated block
#
def initialize(worker_count = 1)
raise 'block required: { |request| ... }' unless block_given?
@worker_count = worker_count
@request_q = []
@request_q.extend(MonitorMixin)
@request_ready = @request_q.new_cond
@exceptions = CircularBuffer.new(DEFAULT_EXCEPTION_BUFFER_SIZE)
@worker_count.times do
Thread.new do
loop do
request = nil
@request_q.synchronize do
@request_ready.wait
request = @request_q.delete_at(0)
end
begin
yield request
rescue Exception => e
@exceptions << e
end
Thread.pass
end
end
end
@dispatcher = Thread.new do
loop do
@request_q.synchronize do
@request_ready.signal unless @request_q.empty? ||
@request_ready.count_waiters == 0
end
Thread.pass
end
end

end

# enq the request data
#
def enq(request)
@request_q.synchronize do
@request_q << request
end
end

# Wait until all the queued requests have been removed
# from the request_q && then wait until all threads have
# compeleted their processing and are idle
#
def wait_until_idle(wait_resolution=0.3)
q_empty = false
until q_empty
@request_q.synchronize do
q_empty = @request_q.empty?
end
sleep(wait_resolution) unless q_empty
end
all_threads_idle = false
until all_threads_idle
@request_q.synchronize do
all_threads_idle = @request_ready.count_waiters == @worker_count
end
sleep(wait_resolution) unless all_threads_idle
end
end

# create a new exceptions buffer of new_size
#
def exceptions_buffer_size=(new_size)
@exceptions = CircularBuffer.new(new_size)
end

# report the size of the current exceptions buffer
#
def exceptions_buffer_size
@exceptions.size
end

# return the current exceptions buffer as an ordered Array
#
def exceptions
@exceptions.to_a
end

end

if __FILE__ == $0

# the usual trivial example

require 'thread'

# >>>> thread-safe container for result <<<<
#
results = Queue.new

NUM_OF_WORKERS = 10

pqm = PoolQM.new(NUM_OF_WORKERS) do |request|
raise "Dummy Exception during #{request}" if rand(10) == 0 #
simulate random exceptions
results << "Current request: #{request}" # processing request
here
end

100.times do |index|
pqm.enq("Request number #{index}") # enqueuing requests
here
end

# wait for all requests to be processed
pqm.wait_until_idle

# dump results
until results.empty? do
p results.pop
end

# obtain exceptions array and dump it
pqm.exceptions.each do |exception|
p exception
end

end


Regards,

djief
--
Posted via http://www.ruby-....

Deepak Gole

12/15/2008 1:29:00 PM

0

[Note: parts of this message were removed to make it a legal post.]

Hello Robert



On Sat, Dec 13, 2008 at 10:22 PM, Robert Klemme
<shortcutter@googlemail.com>wrote:

> On 12.12.2008 14:58, Deepak Gole wrote:
>
> My requirement is as follows
>>
>> 1) I have around 200 feeds in the database that I need to parse (fetch
>> contents) *parallely* after some interval and store those feed items in
>> database.
>>
>> 2) Now I am using backgroundrb with 10 workers each worker has assigned a
>> job to parse data from 20 feeds (e.g 1st worker will fecth data from
>> feeds(1..20), 2nd from feeds(21..30) ..etc.....
>>
>> 3) But backgroundrb is not reliable and it fails after some time.
>>
>
> Can you be more specific what you really mean by this? How does it fail?
>

>>> Well when I start the backgroundrb processes then for next few or next
1,2 days everything works well, But after some time worker just gets hangs I
can see there process ID till active.
But No output. I examine logs also but didn't get anything in log files. I
am not getting a single clue of what went wrong?

>
>
> So I have
>> tried Starling & Workling but those worker doesn't run *parallely.
>>
>
> Maybe you used it not in the proper way. From what I read on the web site
> doing work concurrently is all that S+W is about.
>
>>> I have created 2 workling workers each worker has one method which just
logs the some o/p and then I ran them from console. When I examined the log
files I got those o/p sequentially

class MyWorker < Workling::Base

def sample_one(options)
5.times do |i|
logger.info "====Hi from 1st worker===============" end
end
end
end

class MySecondWorker < Workling::Base

def sample_twooptions)
5.times do |i|
logger.info "====Hi from 2nd worker===============" end
end
end

end

I got following o/p

====Hi from 1st worker===============
====Hi from 1st worker===============
====Hi from 1st worker===============
====Hi from 1st worker===============
====Hi from 1st worker===============
====Hi from 2nd worker===============
====Hi from 2nd worker===============
====Hi from 2nd worker===============
====Hi from 2nd worker===============
====Hi from 2nd worker===============


I was expecting something like this

====Hi from 1st worker===============
====Hi from 2nd worker===============
====Hi from 1st worker===============
====Hi from 2nd worker===============

> ........


Thanks For your help
Deepak

Robert Klemme

12/15/2008 9:26:00 PM

0

On 15.12.2008 14:28, Deepak Gole wrote:

> On Sat, Dec 13, 2008 at 10:22 PM, Robert Klemme
> <shortcutter@googlemail.com>wrote:

>> Can you be more specific what you really mean by this? How does it fail?
>
>>>> Well when I start the backgroundrb processes then for next few or next
> 1,2 days everything works well, But after some time worker just gets hangs I
> can see there process ID till active.
> But No output. I examine logs also but didn't get anything in log files. I
> am not getting a single clue of what went wrong?

Apparently. Since I don't know the code I cannot really make sense of
what you report. It does seem weird though that apparently you keep
your workers active for several days. Do you actually keep them busy or
do you just keep them around?

>> So I have
>>> tried Starling & Workling but those worker doesn't run *parallely.
>>>
>> Maybe you used it not in the proper way. From what I read on the web site
>> doing work concurrently is all that S+W is about.
>>
>>>> I have created 2 workling workers each worker has one method which just
> logs the some o/p and then I ran them from console. When I examined the log
> files I got those o/p sequentially
>
> class MyWorker < Workling::Base
>
> def sample_one(options)
> 5.times do |i|
> logger.info "====Hi from 1st worker===============" end
> end
> end
> end
>
> class MySecondWorker < Workling::Base
>
> def sample_twooptions)
> 5.times do |i|
> logger.info "====Hi from 2nd worker===============" end
> end
> end
>
> end
>
> I got following o/p
>
> ====Hi from 1st worker===============
> ====Hi from 1st worker===============
> ====Hi from 1st worker===============
> ====Hi from 1st worker===============
> ====Hi from 1st worker===============
> ====Hi from 2nd worker===============
> ====Hi from 2nd worker===============
> ====Hi from 2nd worker===============
> ====Hi from 2nd worker===============
> ====Hi from 2nd worker===============
>
>
> I was expecting something like this
>
> ====Hi from 1st worker===============
> ====Hi from 2nd worker===============
> ====Hi from 1st worker===============
> ====Hi from 2nd worker===============

Well, there is no guarantee that messages are actually intermixed as you
expect - especially not with Ruby's green threads - if that's what
Workling is using.

Cheers

robert

ara.t.howard

12/16/2008 4:45:00 PM

0


On Dec 12, 2008, at 6:58 AM, Deepak Gole wrote:

> Hi
>
> My requirement is as follows
>
> 1) I have around 200 feeds in the database that I need to parse (fetch
> contents) *parallely* after some interval and store those feed
> items in
> database.
>
> 2) Now I am using backgroundrb with 10 workers each worker has
> assigned a
> job to parse data from 20 feeds (e.g 1st worker will fecth data from
> feeds(1..20), 2nd from feeds(21..30) ..etc.....
>
> 3) But backgroundrb is not reliable and it fails after some time. So
> I have
> tried Starling & Workling but those worker doesn't run *parallely.
>
> ( I need to run **parallely because those feeds will increase say 1000
> feeds. So I can't run them sequentially. ) *
> *
> Pls I need a help on above problem.*
>
>
> Thanks
> Deepak


use bj


http://codeforpeople.rubyforge.org/svn/bj/tr...


it was written for engine yard and is under heavy use there. the
focus is on simplicity and robustness.

a @ http://codeforp...
--
we can deny everything, except that we have the possibility of being
better. simply reflect on that.
h.h. the 14th dalai lama




hemant

12/17/2008 5:22:00 PM

0

On Tue, Dec 16, 2008 at 10:15 PM, ara.t.howard <ara.t.howard@gmail.com> wrote:
>
> On Dec 12, 2008, at 6:58 AM, Deepak Gole wrote:
>
>> Hi
>>
>> My requirement is as follows
>>
>> 1) I have around 200 feeds in the database that I need to parse (fetch
>> contents) *parallely* after some interval and store those feed items in
>> database.
>>
>> 2) Now I am using backgroundrb with 10 workers each worker has assigned a
>> job to parse data from 20 feeds (e.g 1st worker will fecth data from
>> feeds(1..20), 2nd from feeds(21..30) ..etc.....
>>
>> 3) But backgroundrb is not reliable and it fails after some time. So I
>> have
>> tried Starling & Workling but those worker doesn't run *parallely.
>>
>> ( I need to run **parallely because those feeds will increase say 1000
>> feeds. So I can't run them sequentially. ) *

Do you have backtrace of any kind? Can you post your worker code?
Which version of BackgrounDRb you are running?