[lnkForumImage]
TotalShareware - Download Free Software

Confronta i prezzi di migliaia di prodotti.
Asp Forum
 Home | Login | Register | Search 


 

Forums >

comp.programming

My efficient Threadpool engine tutorial

Ramine

5/25/2015 7:37:00 PM

Hello...


Here is my efficient Threadpool engine tutorial


On a multicore system, your goal is to spread the work efficiently among
many cores so that it does executes simultaneously. And performance gain
should be directly related to how many cores you have. So, a quad core
system should be able to get the work done 4 times faster than a single
core system. A 16-core platform should be 4-times faster than a
quad-core system, and 16-times faster than a single core...

That's where my efficient Threadpool is useful , it spreads the work
efficiently among many cores. Threadpool (and Threadpool with
priorities) consist of a fast concurrent FIFO queue, so when you call
ThreadPool.execute() , your work item get queued in thethe fast
concurrent queue. The worker threads pick them out in a First In First
Out order (i.e., FIFO order), and execute them.


The following have been added to the efficient Threadpool engine:

* The worker threads enters in a wait state when there is no job in
the concurrent FIFO queues - for more efficiency -
* You can distribute your jobs to the worker threads and call any
method with the threadpool's execute() method.
* You can wait for the jobs to finish with the wait() method.


My Threadpool allows load balancing, and also minimize contention.


Threadpool is very easy to use, let's look now at an example in Object
Pascal...

program test;



uses

{$IFDEF Delphi}

cmem,

{$ENDIF}

ThreadPool,sysutils,syncobjs;

{$I defines.inc}

type

TMyThread = class (TThreadPoolThread)

//procedure ProcessRequest(obj: Pointer); override;

procedure MyProc1(obj: Pointer);

procedure MyProc2(obj: Pointer);

end;



var

myobj:TMyThread;

TP: TThreadPool;

obj:pointer;

cs: TCriticalSection;

tp1:TThreadPoolC;




procedure TMyThread.MyProc1(obj: Pointer);

begin

cs.enter;

writeln('This is MyProc1 with parameter: ',integer(obj));

cs.leave;

end;

procedure TMyThread.MyProc2(obj: Pointer);

begin

cs.enter;

writeln('This is MyProc2 with parameter: ',integer(obj));

cs.leave;

end;

begin

myobj:=TMyThread.create;

cs:=TCriticalSection.create;

TP := TThreadPool.Create(4, TMyThread); // 4 workers threads.

tp1:=TP.CreateThreadPoolC(true);

obj:=pointer(1);

TP1.execute(myobj.myproc1,pointer(obj));

obj:=pointer(2);

TP1.execute(myobj.myproc2,pointer(obj));

TP1.setCounter() ;

TP1.wait(INFINITE);

readln;

TP1.free;

TP.Terminate;

TP.Free;

end.



Let us look at the first line...

uses

{$IFDEF Delphi}

cmem,

{$ENDIF}

cmem is required for Delphi to use TBB memory manager (from Intel) ,
this will allow delphi memory manager to scale linearely...

Note: FPC doesn't need cmem, cause it scales linearely with threads...

ThreadPool: is our threadpool unit ..

syncobjs: contains all the sychronizations stuff like CriticalSections,
Events etc..

After that we have the following lines:

type

TMyThread = class (TThreadPoolThread)

//procedure ProcessRequest(obj: Pointer); override;

procedure MyProc1(obj: Pointer);

procedure MyProc2(obj: Pointer);

end;

We declare a TMyThread that ineherit from TThreadPoolThread, and we
declare our two methods MyProc1 and MyProc2 that we want to be executed
by our threadpool's worker threads. Each method has an obj as a paramater.

In the main body we create a TMyThread object like this:

myobj:=TMyThread.create;

and after that we create a TThreadPool object with 4 workers threads
like this:

TP := TThreadPool.Create(4, TMyThread); // 4 workers threads.

After that you create ThreadPoolC object so that you can distribute your
jobs to the ThreadPool by writing this:
tp1:=TP.CreateThreadPoolC(true);

If you pass true to CreateThreadPoolC() , wait() will wait for all the
jobs to finish, if you pass false to CreateThreadPoolC(), wait() will
not wait for the jobs to finish.

After that we distribute to our worker threads the methods to be
executed , we do it by calling the Threadpool's execute() method and we
pass it myobj.myproc1 and myobj.myproc2 with there parameters:.
After that you have to call setCounter() and wait() to wait for all the
threads..

TP.1.setCounter();

TP1.wait(INFINITE);
You have to call setCounter() , it is mandatory, the first parameter of
wait() is the time to wait in milliseconds, if you set it to INFINITE,
it will wait for the threads to finish.

After that you have to free TP1 and TP objects.

As you see, Threadpool (and threadpool with priority) is very easy to use...

Let's look now at an example of a Threadpool with priority:.

program test;



uses

{$IFDEF Delphi}

cmem,

{$ENDIF}

PThreadPool,sysutils,syncobjs;

{$I defines.inc}

type

TMyThread = class (TPThreadPoolThread)

//procedure ProcessRequest(obj: Pointer); override;

procedure MyProc1(obj: Pointer);

procedure MyProc2(obj: Pointer);

end;



var

myobj:TMyThread;

TP: TPThreadPool;

obj:pointer;

cs:TCriticalSection;

tp1:TPThreadPoolC;

procedure TMyThread.MyProc1(obj: Pointer);

begin

cs.enter;

writeln('This is MyProc1 with parameter: ',integer(obj));

cs.leave;

end;

procedure TMyThread.MyProc2(obj: Pointer);

begin

cs.enter;

writeln('This is MyProc2 with parameter: ',integer(obj));

cs.leave;

end;

begin

myobj:=TMyThread.create;

cs:=TCriticalSection.create;

TP := TPThreadPool.Create(4,TMyThread, 20); // 4 workers threads and
2^20 items for each queue.

tp1:=TP.CreateThreadPoolC(true);

obj:=pointer(1);

TP1.execute(myobj.myproc1,pointer(obj),NORMAL_PRIORITY);

obj:=pointer(2);

TP1.execute(myobj.myproc2,pointer(obj),NORMAL_PRIORITY);

TP1.setCounter();

TP1.wait(INIFINITE);

readln;

TP.Terminate;

TP.Free;

end.


As you have noticed, this is almost the same as threadpool..

You use PThreadPool - P for priority - rather than Threadpool

TPThreadPoolThread rather than TThreadPoolThread

TPThreadPoolC rather than TThreadPoolC

TPThreadPool.Create rather than TThreadPool.Create

and as you have noticed in
TP.execute(myobj.myproc1,pointer(obj),NORMAL_PRIORITY) we are using
priorities.

You can give the following priorities to jobs:

LOW_PRIORITY
NORMAL_PRIORITY
HIGH_PRIORITY



That's all.

You can download threadpool (and threadpool with priority) from:

https://sites.google.com/site...

Sincerely,
Amine Moulay Ramdane.






2 Answers

Richard Heathfield

5/25/2015 4:44:00 PM

0

On 25/05/15 20:37, Ramine wrote:
> Hello...

Hello. Is all your Pascal code that badly-written?

--
Richard Heathfield
Email: rjh at cpax dot org dot uk
"Usenet is a strange place" - dmr 29 July 1999
Sig line 4 vacant - apply within

Ramine

5/25/2015 7:55:00 PM

0


Hello,


Here is the new updated tutorial:


On a multicore system, your goal is to spread the work efficiently among
many cores so that it does executes simultaneously. And performance gain
should be directly related to how many cores you have. So, a quad core
system should be able to get the work done 4 times faster than a single
core system. A 16-core platform should be 4-times faster than a
quad-core system, and 16-times faster than a single core...

That's where my efficient Threadpool is useful , it spreads the work
efficiently among many cores. Threadpool (and Threadpool with
priorities) consist of a fast concurrent FIFO queue and a number of
worker threads that you have to start, so when you call
ThreadPool.execute() , your work item get queued in the fast concurrent
queue. The worker threads pick them out in a First In First Out order
(i.e., FIFO order), and execute them.


The following have been added to the efficient Threadpool engine:

* The worker threads enters in a wait state when there is no job in
the concurrent FIFO queues - for more efficiency -
* You can distribute your jobs to the worker threads and call any
method with the threadpool's execute() method.
* You can wait for the jobs to finish with the wait() method.


My Threadpool allows load balancing, and also minimize contention.


Threadpool is very easy to use, let's look now at an example in Object
Pascal...

program test;



uses

{$IFDEF Delphi}

cmem,

{$ENDIF}

ThreadPool,sysutils,syncobjs;

{$I defines.inc}

type

TMyThread = class (TThreadPoolThread)

//procedure ProcessRequest(obj: Pointer); override;

procedure MyProc1(obj: Pointer);

procedure MyProc2(obj: Pointer);

end;



var

myobj:TMyThread;

TP: TThreadPool;

obj:pointer;

cs: TCriticalSection;

tp1:TThreadPoolC;




procedure TMyThread.MyProc1(obj: Pointer);

begin

cs.enter;

writeln('This is MyProc1 with parameter: ',integer(obj));

cs.leave;

end;

procedure TMyThread.MyProc2(obj: Pointer);

begin

cs.enter;

writeln('This is MyProc2 with parameter: ',integer(obj));

cs.leave;

end;

begin

myobj:=TMyThread.create;

cs:=TCriticalSection.create;

TP := TThreadPool.Create(4, TMyThread); // 4 workers threads.

tp1:=TP.CreateThreadPoolC(true);

obj:=pointer(1);

TP1.execute(myobj.myproc1,pointer(obj));

obj:=pointer(2);

TP1.execute(myobj.myproc2,pointer(obj));

TP1.setCounter() ;

TP1.wait(INFINITE);

readln;

TP1.free;

TP.Terminate;

TP.Free;

end.



Let us look at the first line...

uses

{$IFDEF Delphi}

cmem,

{$ENDIF}

cmem is required for Delphi to use TBB memory manager (from Intel) ,
this will allow delphi memory manager to scale linearely...

Note: FPC doesn't need cmem, cause it scales linearely with threads...

ThreadPool: is our threadpool unit ..

syncobjs: contains all the sychronizations stuff like CriticalSections,
Events etc..

After that we have the following lines:

type

TMyThread = class (TThreadPoolThread)

//procedure ProcessRequest(obj: Pointer); override;

procedure MyProc1(obj: Pointer);

procedure MyProc2(obj: Pointer);

end;

We declare a TMyThread that ineherit from TThreadPoolThread, and we
declare our two methods MyProc1 and MyProc2 that we want to be executed
by our threadpool's worker threads. Each method has an obj as a paramater.

In the main body we create a TMyThread object like this:

myobj:=TMyThread.create;

and after that we create a TThreadPool object with 4 workers threads
like this:

TP := TThreadPool.Create(4, TMyThread); // 4 workers threads.

After that you create ThreadPoolC object so that you can distribute your
jobs to the ThreadPool by writing this:
tp1:=TP.CreateThreadPoolC(true);

If you pass true to CreateThreadPoolC() , wait() will wait for all the
jobs to finish, if you pass false to CreateThreadPoolC(), wait() will
not wait for the jobs to finish.

After that we distribute to our worker threads the methods to be
executed , we do it by calling the Threadpool's execute() method and we
pass it myobj.myproc1 and myobj.myproc2 with there parameters:.
After that you have to call setCounter() and wait() to wait for all the
jobs..

TP.1.setCounter();

TP1.wait(INFINITE);
You have to call setCounter() , it is mandatory, the first parameter of
wait() is the time to wait in milliseconds, if you set it to INFINITE,
it will wait for the jobs to finish.

After that you have to free TP1 and TP objects.

As you see, Threadpool (and threadpool with priority) is very easy to use...

Let's look now at an example of a Threadpool with priority:.

program test;



uses

{$IFDEF Delphi}

cmem,

{$ENDIF}

PThreadPool,sysutils,syncobjs;

{$I defines.inc}

type

TMyThread = class (TPThreadPoolThread)

//procedure ProcessRequest(obj: Pointer); override;

procedure MyProc1(obj: Pointer);

procedure MyProc2(obj: Pointer);

end;



var

myobj:TMyThread;

TP: TPThreadPool;

obj:pointer;

cs:TCriticalSection;

tp1:TPThreadPoolC;

procedure TMyThread.MyProc1(obj: Pointer);

begin

cs.enter;

writeln('This is MyProc1 with parameter: ',integer(obj));

cs.leave;

end;

procedure TMyThread.MyProc2(obj: Pointer);

begin

cs.enter;

writeln('This is MyProc2 with parameter: ',integer(obj));

cs.leave;

end;

begin

myobj:=TMyThread.create;

cs:=TCriticalSection.create;

TP := TPThreadPool.Create(4,TMyThread, 20); // 4 workers threads and
2^20 items for each queue.

tp1:=TP.CreateThreadPoolC(true);

obj:=pointer(1);

TP1.execute(myobj.myproc1,pointer(obj),NORMAL_PRIORITY);

obj:=pointer(2);

TP1.execute(myobj.myproc2,pointer(obj),NORMAL_PRIORITY);

TP1.setCounter();

TP1.wait(INIFINITE);

readln;

TP.Terminate;

TP.Free;

end.



As you have noticed, this is almost the same as threadpool..

You use PThreadPool - P for priority - rather than Threadpool

TPThreadPoolThread rather than TThreadPoolThread

TPThreadPoolC rather than TThreadPoolC

TPThreadPool.Create rather than TThreadPool.Create

and as you have noticed in
TP.execute(myobj.myproc1,pointer(obj),NORMAL_PRIORITY) we are using
priorities.

You can give the following priorities to jobs:

LOW_PRIORITY
NORMAL_PRIORITY
HIGH_PRIORITY



That's all.

You can download threadpool (and threadpool with priority) from:

https://sites.google.com/site...

Sincerely,
Amine Moulay Ramdane.