comp.programming.threads
My efficient Threadpool engine tutorial
Ramine
5/25/2015 7:36:00 PM
Hello,
Here is my efficient Threadpool engine tutorial.
On a multicore system, your goal is to spread the work efficiently among
many cores so that it executes in parallel. Ideally, the performance gain
should be directly related to how many cores you have. So, if the work
parallelizes well, a quad-core system should be able to get the work done
up to 4 times faster than a single-core system, and a 16-core platform
should be up to 4 times faster than a quad-core system and 16 times
faster than a single core.
That's where my efficient Threadpool is useful: it spreads the work
efficiently among many cores. Threadpool (and Threadpool with
priorities) consists of a fast concurrent FIFO queue, so when you call
ThreadPool.execute(), your work item gets queued in the fast
concurrent queue. The worker threads pick the items out in First In
First Out (FIFO) order and execute them.
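To make the queue-plus-workers idea concrete, here is a minimal sketch in Free Pascal that uses only the standard Classes and SyncObjs units. It is not the engine's implementation: the lock-based ring buffer, the names Push, Pop, TWorker and CountJob, the queue capacity and the 50 ms re-check interval are all assumptions made for illustration. The real engine uses its own fast concurrent queue.

```pascal
program FifoPoolSketch;
{$mode objfpc}{$H+}
uses
  {$IFDEF UNIX}cthreads,{$ENDIF}
  Classes, SysUtils, SyncObjs;

const
  QueueSize = 64; // illustrative capacity
  JobCount = 8;

type
  TJobProc = procedure(arg: Pointer);
  TJob = record
    Proc: TJobProc;
    Arg: Pointer;
  end;

  TWorker = class(TThread)
  protected
    procedure Execute; override;
  end;

var
  Items: array[0..QueueSize - 1] of TJob; // ring buffer holding queued jobs
  Head: Integer = 0;
  Count: Integer = 0;
  Lock: TCriticalSection; // guards Items, Head and Count
  Wake: TEvent;           // signalled when a job is queued
  Done: LongInt = 0;      // jobs finished so far

function Push(const job: TJob): Boolean;
begin
  Lock.Enter;
  Result := Count < QueueSize;
  if Result then
  begin
    Items[(Head + Count) mod QueueSize] := job; // append at the tail
    Inc(Count);
  end;
  Lock.Leave;
  if Result then
    Wake.SetEvent; // wake an idle worker
end;

function Pop(out job: TJob): Boolean;
begin
  Lock.Enter;
  Result := Count > 0;
  if Result then
  begin
    job := Items[Head]; // oldest job first (FIFO)
    Head := (Head + 1) mod QueueSize;
    Dec(Count);
  end;
  Lock.Leave;
end;

procedure TWorker.Execute;
var
  job: TJob;
begin
  while not Terminated do
    if Pop(job) then
      job.Proc(job.Arg)
    else
      Wake.WaitFor(50); // idle: block until signalled, re-check after 50 ms
end;

procedure CountJob(arg: Pointer);
begin
  InterlockedIncrement(Done); // atomic, since several workers run jobs
end;

var
  workers: array[0..3] of TWorker;
  job: TJob;
  i: Integer;
begin
  Lock := TCriticalSection.Create;
  Wake := TEvent.Create(nil, False, False, ''); // auto-reset, initially unset
  for i := 0 to High(workers) do
    workers[i] := TWorker.Create(False);
  for i := 1 to JobCount do
  begin
    job.Proc := @CountJob;
    job.Arg := nil;
    Push(job);
  end;
  while Done < JobCount do
    Sleep(1); // crude wait, only for this sketch
  for i := 0 to High(workers) do
  begin
    workers[i].Terminate;
    workers[i].WaitFor;
    workers[i].Free;
  end;
  writeln('jobs done: ', Done);
  Wake.Free;
  Lock.Free;
end.
```

A single lock around the queue is the simplest correct choice, but it is also a contention point, which is exactly what a fast concurrent queue is meant to avoid.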
The following have been added to the efficient Threadpool engine:
* The worker threads enter a wait state when there is no job in
the concurrent FIFO queues, for more efficiency.
* You can distribute your jobs to the worker threads and call any
method with the threadpool's execute() method.
* You can wait for the jobs to finish with the wait() method.
My Threadpool allows load balancing and also minimizes contention.
Threadpool is very easy to use. Let's now look at an example in Object
Pascal:
program test;

uses
  {$IFDEF Delphi}
  cmem,
  {$ENDIF}
  ThreadPool, sysutils, syncobjs;

{$I defines.inc}

type
  TMyThread = class(TThreadPoolThread)
    //procedure ProcessRequest(obj: Pointer); override;
    procedure MyProc1(obj: Pointer);
    procedure MyProc2(obj: Pointer);
  end;

var
  myobj: TMyThread;
  TP: TThreadPool;
  obj: pointer;
  cs: TCriticalSection;
  tp1: TThreadPoolC;

procedure TMyThread.MyProc1(obj: Pointer);
begin
  cs.enter; // serialize console output between worker threads
  writeln('This is MyProc1 with parameter: ', integer(obj));
  cs.leave;
end;

procedure TMyThread.MyProc2(obj: Pointer);
begin
  cs.enter;
  writeln('This is MyProc2 with parameter: ', integer(obj));
  cs.leave;
end;

begin
  myobj := TMyThread.create;
  cs := TCriticalSection.create;
  TP := TThreadPool.Create(4, TMyThread); // 4 worker threads
  tp1 := TP.CreateThreadPoolC(true);
  obj := pointer(1);
  TP1.execute(myobj.myproc1, pointer(obj));
  obj := pointer(2);
  TP1.execute(myobj.myproc2, pointer(obj));
  TP1.setCounter();
  TP1.wait(INFINITE); // block until both jobs have finished
  readln;
  TP1.free;
  TP.Terminate;
  TP.Free;
end.
Let us look at the first lines:
uses
  {$IFDEF Delphi}
  cmem,
  {$ENDIF}
cmem is required for Delphi in order to use the TBB memory manager (from
Intel); this allows the Delphi memory manager to scale linearly.
Note: FPC doesn't need cmem, because it already scales linearly with threads.
ThreadPool is our threadpool unit.
syncobjs contains the synchronization primitives, such as critical
sections and events.
After that we have the following lines:
type
  TMyThread = class(TThreadPoolThread)
    //procedure ProcessRequest(obj: Pointer); override;
    procedure MyProc1(obj: Pointer);
    procedure MyProc2(obj: Pointer);
  end;
We declare a TMyThread class that inherits from TThreadPoolThread, and we
declare our two methods MyProc1 and MyProc2 that we want to be executed
by our threadpool's worker threads. Each method takes a pointer, obj, as
its parameter.
In the main body we create a TMyThread object like this:
myobj:=TMyThread.create;
and after that we create a TThreadPool object with 4 worker threads
like this:
TP := TThreadPool.Create(4, TMyThread); // 4 worker threads
After that you create a TThreadPoolC object so that you can distribute
your jobs to the ThreadPool by writing this:
tp1:=TP.CreateThreadPoolC(true);
If you pass true to CreateThreadPoolC(), wait() will wait for all the
jobs to finish; if you pass false to CreateThreadPoolC(), wait() will
not wait for the jobs to finish.
After that we distribute to our worker threads the methods to be
executed. We do it by calling the execute() method on TP1, and we
pass it myobj.myproc1 and myobj.myproc2 with their parameters:
obj:=pointer(1);
TP1.execute(myobj.myproc1,pointer(obj));
obj:=pointer(2);
TP1.execute(myobj.myproc2,pointer(obj));
After that you have to call setCounter() and wait() to wait for all the
jobs:
TP1.setCounter();
TP1.wait(INFINITE);
Calling setCounter() is mandatory. The first parameter of
wait() is the time to wait in milliseconds; if you set it to INFINITE,
it will wait until the jobs have finished.
After that you have to free the TP1 and TP objects.
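For readers who wonder how a wait with a millisecond timeout can be built, here is a generic sketch in Free Pascal using a pending-job counter and an event. It only illustrates the general technique; it makes no claim about what setCounter() and wait() do internally, and the names TJobThread, Pending, AllDone and WaitForJobs are hypothetical.

```pascal
program WaitSketch;
{$mode objfpc}{$H+}
uses
  {$IFDEF UNIX}cthreads,{$ENDIF}
  Classes, SysUtils, SyncObjs;

type
  // Hypothetical stand-in for a job running on a worker thread.
  TJobThread = class(TThread)
  protected
    procedure Execute; override;
  end;

var
  Pending: LongInt; // jobs not yet finished
  AllDone: TEvent;  // set when Pending reaches zero

procedure TJobThread.Execute;
begin
  Sleep(20); // pretend to work
  if InterlockedDecrement(Pending) = 0 then
    AllDone.SetEvent; // the last job to finish releases the waiter
end;

// Returns True if every job finished within timeoutMs milliseconds.
function WaitForJobs(timeoutMs: Cardinal): Boolean;
begin
  Result := AllDone.WaitFor(timeoutMs) = wrSignaled;
end;

var
  jobs: array[0..3] of TJobThread;
  i: Integer;
begin
  AllDone := TEvent.Create(nil, True, False, ''); // manual reset, initially unset
  Pending := Length(jobs); // record the job count before starting any job
  for i := 0 to High(jobs) do
    jobs[i] := TJobThread.Create(False);
  if WaitForJobs(INFINITE) then
    writeln('all jobs finished');
  for i := 0 to High(jobs) do
  begin
    jobs[i].WaitFor;
    jobs[i].Free;
  end;
  AllDone.Free;
end.
```

Passing a finite timeout instead of INFINITE makes WaitForJobs return False when the jobs are still running after that many milliseconds.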
As you see, Threadpool (and threadpool with priority) is very easy to use.
Let's now look at an example of a Threadpool with priority:
program test;

uses
  {$IFDEF Delphi}
  cmem,
  {$ENDIF}
  PThreadPool, sysutils, syncobjs;

{$I defines.inc}

type
  TMyThread = class(TPThreadPoolThread)
    //procedure ProcessRequest(obj: Pointer); override;
    procedure MyProc1(obj: Pointer);
    procedure MyProc2(obj: Pointer);
  end;

var
  myobj: TMyThread;
  TP: TPThreadPool;
  obj: pointer;
  cs: TCriticalSection;
  tp1: TPThreadPoolC;

procedure TMyThread.MyProc1(obj: Pointer);
begin
  cs.enter; // serialize console output between worker threads
  writeln('This is MyProc1 with parameter: ', integer(obj));
  cs.leave;
end;

procedure TMyThread.MyProc2(obj: Pointer);
begin
  cs.enter;
  writeln('This is MyProc2 with parameter: ', integer(obj));
  cs.leave;
end;

begin
  myobj := TMyThread.create;
  cs := TCriticalSection.create;
  // 4 worker threads and 2^20 items for each queue
  TP := TPThreadPool.Create(4, TMyThread, 20);
  tp1 := TP.CreateThreadPoolC(true);
  obj := pointer(1);
  TP1.execute(myobj.myproc1, pointer(obj), NORMAL_PRIORITY);
  obj := pointer(2);
  TP1.execute(myobj.myproc2, pointer(obj), NORMAL_PRIORITY);
  TP1.setCounter();
  TP1.wait(INFINITE); // block until both jobs have finished
  readln;
  TP1.free;
  TP.Terminate;
  TP.Free;
end.
As you have noticed, this is almost the same as the plain threadpool.
You use:
* PThreadPool (P for priority) rather than ThreadPool
* TPThreadPoolThread rather than TThreadPoolThread
* TPThreadPoolC rather than TThreadPoolC
* TPThreadPool.Create rather than TThreadPool.Create
And as you have noticed in
TP1.execute(myobj.myproc1,pointer(obj),NORMAL_PRIORITY) we are using
priorities.
You can give the following priorities to jobs:
* LOW_PRIORITY
* NORMAL_PRIORITY
* HIGH_PRIORITY
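One common way to serve jobs by priority is to keep one FIFO queue per priority level and always take from the highest non-empty one. The single-threaded Free Pascal sketch below shows that policy only as an illustration. How PThreadPool actually schedules between priorities is not described here, the numeric values given to the three constants are hypothetical (the real ones come from the PThreadPool unit), and TFifo, Enqueue and Dequeue are made-up names.

```pascal
program PrioritySketch;
{$mode objfpc}{$H+}

const
  // Hypothetical values; the real constants come from the PThreadPool unit.
  LOW_PRIORITY = 0;
  NORMAL_PRIORITY = 1;
  HIGH_PRIORITY = 2;
  MaxItems = 16; // illustrative capacity per queue

type
  TFifo = record
    Items: array[0..MaxItems - 1] of Integer;
    Head, Count: Integer;
  end;

var
  // One FIFO per priority level; global variables start zero-initialized.
  Queues: array[LOW_PRIORITY..HIGH_PRIORITY] of TFifo;

procedure Enqueue(job, prio: Integer);
begin
  with Queues[prio] do
    if Count < MaxItems then
    begin
      Items[(Head + Count) mod MaxItems] := job; // append at the tail
      Inc(Count);
    end;
end;

// Takes the oldest job from the highest non-empty priority level.
function Dequeue(out job: Integer): Boolean;
var
  p: Integer;
begin
  Result := False;
  for p := HIGH_PRIORITY downto LOW_PRIORITY do
    if Queues[p].Count > 0 then
    begin
      job := Queues[p].Items[Queues[p].Head];
      Queues[p].Head := (Queues[p].Head + 1) mod MaxItems;
      Dec(Queues[p].Count);
      Exit(True);
    end;
end;

var
  job: Integer;
begin
  Enqueue(1, LOW_PRIORITY);
  Enqueue(2, NORMAL_PRIORITY);
  Enqueue(3, HIGH_PRIORITY);
  Enqueue(4, NORMAL_PRIORITY);
  while Dequeue(job) do
    write(job, ' '); // prints: 3 2 4 1
  writeln;
end.
```

Within one priority level the order stays FIFO (2 before 4). Note that a strict policy like this can starve low-priority jobs while higher-priority work keeps arriving.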
That's all.
You can download threadpool (and threadpool with priority) from:
https://sites.google.com/site...
Sincerely,
Amine Moulay Ramdane.
1 Answer
Ramine
5/25/2015 7:55:00 PM
Hello,
Here is the new updated tutorial:
On a multicore system, your goal is to spread the work efficiently among
many cores so that it executes in parallel. Ideally, the performance gain
should be directly related to how many cores you have. So, if the work
parallelizes well, a quad-core system should be able to get the work done
up to 4 times faster than a single-core system, and a 16-core platform
should be up to 4 times faster than a quad-core system and 16 times
faster than a single core.
That's where my efficient Threadpool is useful: it spreads the work
efficiently among many cores. Threadpool (and Threadpool with
priorities) consists of a fast concurrent FIFO queue and a number of
worker threads that you have to start, so when you call
ThreadPool.execute(), your work item gets queued in the fast concurrent
queue. The worker threads pick the items out in First In First Out
(FIFO) order and execute them.
The following have been added to the efficient Threadpool engine:
* The worker threads enter a wait state when there is no job in
the concurrent FIFO queues, for more efficiency.
* You can distribute your jobs to the worker threads and call any
method with the threadpool's execute() method.
* You can wait for the jobs to finish with the wait() method.
My Threadpool allows load balancing and also minimizes contention.
Threadpool is very easy to use. Let's now look at an example in Object
Pascal:
program test;

uses
  {$IFDEF Delphi}
  cmem,
  {$ENDIF}
  ThreadPool, sysutils, syncobjs;

{$I defines.inc}

type
  TMyThread = class(TThreadPoolThread)
    //procedure ProcessRequest(obj: Pointer); override;
    procedure MyProc1(obj: Pointer);
    procedure MyProc2(obj: Pointer);
  end;

var
  myobj: TMyThread;
  TP: TThreadPool;
  obj: pointer;
  cs: TCriticalSection;
  tp1: TThreadPoolC;

procedure TMyThread.MyProc1(obj: Pointer);
begin
  cs.enter; // serialize console output between worker threads
  writeln('This is MyProc1 with parameter: ', integer(obj));
  cs.leave;
end;

procedure TMyThread.MyProc2(obj: Pointer);
begin
  cs.enter;
  writeln('This is MyProc2 with parameter: ', integer(obj));
  cs.leave;
end;

begin
  myobj := TMyThread.create;
  cs := TCriticalSection.create;
  TP := TThreadPool.Create(4, TMyThread); // 4 worker threads
  tp1 := TP.CreateThreadPoolC(true);
  obj := pointer(1);
  TP1.execute(myobj.myproc1, pointer(obj));
  obj := pointer(2);
  TP1.execute(myobj.myproc2, pointer(obj));
  TP1.setCounter();
  TP1.wait(INFINITE); // block until both jobs have finished
  readln;
  TP1.free;
  TP.Terminate;
  TP.Free;
end.
Let us look at the first lines:
uses
  {$IFDEF Delphi}
  cmem,
  {$ENDIF}
cmem is required for Delphi in order to use the TBB memory manager (from
Intel); this allows the Delphi memory manager to scale linearly.
Note: FPC doesn't need cmem, because it already scales linearly with threads.
ThreadPool is our threadpool unit.
syncobjs contains the synchronization primitives, such as critical
sections and events.
After that we have the following lines:
type
  TMyThread = class(TThreadPoolThread)
    //procedure ProcessRequest(obj: Pointer); override;
    procedure MyProc1(obj: Pointer);
    procedure MyProc2(obj: Pointer);
  end;
We declare a TMyThread class that inherits from TThreadPoolThread, and we
declare our two methods MyProc1 and MyProc2 that we want to be executed
by our threadpool's worker threads. Each method takes a pointer, obj, as
its parameter.
In the main body we create a TMyThread object like this:
myobj:=TMyThread.create;
and after that we create a TThreadPool object with 4 worker threads
like this:
TP := TThreadPool.Create(4, TMyThread); // 4 worker threads
After that you create a TThreadPoolC object so that you can distribute
your jobs to the ThreadPool by writing this:
tp1:=TP.CreateThreadPoolC(true);
If you pass true to CreateThreadPoolC(), wait() will wait for all the
jobs to finish; if you pass false to CreateThreadPoolC(), wait() will
not wait for the jobs to finish.
After that we distribute to our worker threads the methods to be
executed. We do it by calling the execute() method on TP1, and we
pass it myobj.myproc1 and myobj.myproc2 with their parameters:
obj:=pointer(1);
TP1.execute(myobj.myproc1,pointer(obj));
obj:=pointer(2);
TP1.execute(myobj.myproc2,pointer(obj));
After that you have to call setCounter() and wait() to wait for all the
jobs:
TP1.setCounter();
TP1.wait(INFINITE);
Calling setCounter() is mandatory. The first parameter of
wait() is the time to wait in milliseconds; if you set it to INFINITE,
it will wait until the jobs have finished.
After that you have to free the TP1 and TP objects.
As you see, Threadpool (and threadpool with priority) is very easy to use.
Let's now look at an example of a Threadpool with priority:
program test;

uses
  {$IFDEF Delphi}
  cmem,
  {$ENDIF}
  PThreadPool, sysutils, syncobjs;

{$I defines.inc}

type
  TMyThread = class(TPThreadPoolThread)
    //procedure ProcessRequest(obj: Pointer); override;
    procedure MyProc1(obj: Pointer);
    procedure MyProc2(obj: Pointer);
  end;

var
  myobj: TMyThread;
  TP: TPThreadPool;
  obj: pointer;
  cs: TCriticalSection;
  tp1: TPThreadPoolC;

procedure TMyThread.MyProc1(obj: Pointer);
begin
  cs.enter; // serialize console output between worker threads
  writeln('This is MyProc1 with parameter: ', integer(obj));
  cs.leave;
end;

procedure TMyThread.MyProc2(obj: Pointer);
begin
  cs.enter;
  writeln('This is MyProc2 with parameter: ', integer(obj));
  cs.leave;
end;

begin
  myobj := TMyThread.create;
  cs := TCriticalSection.create;
  // 4 worker threads and 2^20 items for each queue
  TP := TPThreadPool.Create(4, TMyThread, 20);
  tp1 := TP.CreateThreadPoolC(true);
  obj := pointer(1);
  TP1.execute(myobj.myproc1, pointer(obj), NORMAL_PRIORITY);
  obj := pointer(2);
  TP1.execute(myobj.myproc2, pointer(obj), NORMAL_PRIORITY);
  TP1.setCounter();
  TP1.wait(INFINITE); // block until both jobs have finished
  readln;
  TP1.free;
  TP.Terminate;
  TP.Free;
end.
As you have noticed, this is almost the same as the plain threadpool.
You use:
* PThreadPool (P for priority) rather than ThreadPool
* TPThreadPoolThread rather than TThreadPoolThread
* TPThreadPoolC rather than TThreadPoolC
* TPThreadPool.Create rather than TThreadPool.Create
And as you have noticed in
TP1.execute(myobj.myproc1,pointer(obj),NORMAL_PRIORITY) we are using
priorities.
You can give the following priorities to jobs:
* LOW_PRIORITY
* NORMAL_PRIORITY
* HIGH_PRIORITY
That's all.
You can download threadpool (and threadpool with priority) from:
https://sites.google.com/site...
Sincerely,
Amine Moulay Ramdane.