EvolvingObjects
eoParallelApply.h
Go to the documentation of this file.
00001 /*
00002 (c) Thales group, 2012
00003 
00004     This library is free software; you can redistribute it and/or
00005     modify it under the terms of the GNU Lesser General Public
00006     License as published by the Free Software Foundation;
00007     version 2 of the License.
00008 
00009     This library is distributed in the hope that it will be useful,
00010     but WITHOUT ANY WARRANTY; without even the implied warranty of
00011     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
00012     Lesser General Public License for more details.
00013 
00014     You should have received a copy of the GNU Lesser General Public
00015     License along with this library; if not, write to the Free Software
00016     Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
00017 Contact: http://eodev.sourceforge.net
00018 
00019 Authors:
00020     Benjamin Bouvier <benjamin.bouvier@gmail.com>
00021 */
00022 # ifndef __EO_PARALLEL_APPLY_H__
00023 # define __EO_PARALLEL_APPLY_H__
00024 
00025 # include "eoMpi.h"
00026 
00027 # include <eoFunctor.h> // eoUF
00028 # include <vector> // std::vector population
00029 
00052 namespace eo
00053 {
00054     namespace mpi
00055     {
/**
 * @brief Describes the slice of the table that was handed to one worker.
 *
 * Kept as a plain aggregate (no constructor) so that it can be
 * value-initialized to zeros, e.g. when inserted into a std::map through
 * operator[].
 */
struct ParallelApplyAssignment
{
    /// Position in the table of the first element of the slice.
    int index;
    /// Number of consecutive elements contained in the slice.
    int size;
};
00066 
00083         template<class EOT>
00084         struct ParallelApplyData
00085         {
00095             ParallelApplyData(
00096                     eoUF<EOT&, void> & _proc,
00097                     int _masterRank,
00098                     int _packetSize,
00099                     std::vector<EOT> * table = 0
00100                    ) :
00101                 _table( table ), func( _proc ), index( 0 ), packetSize( _packetSize ), masterRank( _masterRank ), comm( Node::comm() )
00102             {
00103                 if ( _packetSize <= 0 )
00104                 {
00105                     throw std::runtime_error("Packet size should not be negative.");
00106                 }
00107 
00108                 if( table )
00109                 {
00110                     size = table->size();
00111                 }
00112             }
00113 
00117             void init( std::vector<EOT>& table )
00118             {
00119                 index = 0;
00120                 size = table.size();
00121                 _table = &table;
00122                 assignedTasks.clear();
00123             }
00124 
00125             std::vector<EOT>& table()
00126             {
00127                 return *_table;
00128             }
00129 
00130             // All elements are public since functors will often use them.
00131             std::vector<EOT> * _table;
00132             eoUF<EOT&, void> & func;
00133             int index;
00134             int size;
00135             std::map< int /* worker rank */, ParallelApplyAssignment /* last assignment */> assignedTasks;
00136             int packetSize;
00137             std::vector<EOT> tempArray;
00138 
00139             int masterRank;
00140             bmpi::communicator& comm;
00141         };
00142 
00152         template< class EOT >
00153         class SendTaskParallelApply : public SendTaskFunction< ParallelApplyData<EOT> >
00154         {
00155             public:
00156             using SendTaskFunction< ParallelApplyData<EOT> >::_data;
00157 
00158             SendTaskParallelApply( SendTaskParallelApply<EOT> * w = 0 ) : SendTaskFunction< ParallelApplyData<EOT> >( w )
00159             {
00160                 // empty
00161             }
00162 
00163             void operator()(int wrkRank)
00164             {
00165                 int futureIndex;
00166 
00167                 if( _data->index + _data->packetSize < _data->size )
00168                 {
00169                     futureIndex = _data->index + _data->packetSize;
00170                 } else {
00171                     futureIndex = _data->size;
00172                 }
00173 
00174                 int sentSize = futureIndex - _data->index ;
00175 
00176                 _data->comm.send( wrkRank, 1, sentSize );
00177 
00178                 eo::log << eo::progress << "Evaluating individual " << _data->index << std::endl;
00179 
00180                 _data->assignedTasks[ wrkRank ].index = _data->index;
00181                 _data->assignedTasks[ wrkRank ].size = sentSize;
00182 
00183                 _data->comm.send( wrkRank, 1, & ( (_data->table())[ _data->index ] ) , sentSize );
00184                 _data->index = futureIndex;
00185             }
00186         };
00187 
00193         template< class EOT >
00194         class HandleResponseParallelApply : public HandleResponseFunction< ParallelApplyData<EOT> >
00195         {
00196             public:
00197             using HandleResponseFunction< ParallelApplyData<EOT> >::_data;
00198 
00199             HandleResponseParallelApply( HandleResponseParallelApply<EOT> * w = 0 ) : HandleResponseFunction< ParallelApplyData<EOT> >( w )
00200             {
00201                 // empty
00202             }
00203 
00204             void operator()(int wrkRank)
00205             {
00206                 _data->comm.recv( wrkRank, 1, & (_data->table()[ _data->assignedTasks[wrkRank].index ] ), _data->assignedTasks[wrkRank].size );
00207             }
00208         };
00209 
00218         template< class EOT >
00219         class ProcessTaskParallelApply : public ProcessTaskFunction< ParallelApplyData<EOT> >
00220         {
00221             public:
00222             using ProcessTaskFunction< ParallelApplyData<EOT> >::_data;
00223 
00224             ProcessTaskParallelApply( ProcessTaskParallelApply<EOT> * w = 0 ) : ProcessTaskFunction< ParallelApplyData<EOT> >( w )
00225             {
00226                 // empty
00227             }
00228 
00229             void operator()()
00230             {
00231                 int recvSize;
00232 
00233                 _data->comm.recv( _data->masterRank, 1, recvSize );
00234                 _data->tempArray.resize( recvSize );
00235                 _data->comm.recv( _data->masterRank, 1, & _data->tempArray[0] , recvSize );
00236                 timerStat.start("worker_processes");
00237                 for( int i = 0; i < recvSize ; ++i )
00238                 {
00239                     _data->func( _data->tempArray[ i ] );
00240                 }
00241                 timerStat.stop("worker_processes");
00242                 _data->comm.send( _data->masterRank, 1, & _data->tempArray[0], recvSize );
00243             }
00244         };
00245 
00252         template< class EOT >
00253         class IsFinishedParallelApply : public IsFinishedFunction< ParallelApplyData<EOT> >
00254         {
00255             public:
00256             using IsFinishedFunction< ParallelApplyData<EOT> >::_data;
00257 
00258             IsFinishedParallelApply( IsFinishedParallelApply<EOT> * w = 0 ) : IsFinishedFunction< ParallelApplyData<EOT> >( w )
00259             {
00260                 // empty
00261             }
00262 
00263             bool operator()()
00264             {
00265                 return _data->index == _data->size;
00266             }
00267         };
00268 
00277         template< class EOT >
00278         struct ParallelApplyStore : public JobStore< ParallelApplyData<EOT> >
00279         {
00280             using JobStore< ParallelApplyData<EOT> >::_stf;
00281             using JobStore< ParallelApplyData<EOT> >::_hrf;
00282             using JobStore< ParallelApplyData<EOT> >::_ptf;
00283             using JobStore< ParallelApplyData<EOT> >::_iff;
00284 
00296             ParallelApplyStore(
00297                     eoUF<EOT&, void> & _proc,
00298                     int _masterRank,
00299                     int _packetSize = 1,
00300                     // JobStore functors
00301                     SendTaskParallelApply<EOT> * stpa = 0,
00302                     HandleResponseParallelApply<EOT>* hrpa = 0,
00303                     ProcessTaskParallelApply<EOT>* ptpa = 0,
00304                     IsFinishedParallelApply<EOT>* ifpa = 0
00305                    ) :
00306                 _data( _proc, _masterRank, _packetSize )
00307             {
00308                 if( stpa == 0 ) {
00309                     stpa = new SendTaskParallelApply<EOT>;
00310                     stpa->needDelete( true );
00311                 }
00312 
00313                 if( hrpa == 0 ) {
00314                     hrpa = new HandleResponseParallelApply<EOT>;
00315                     hrpa->needDelete( true );
00316                 }
00317 
00318                 if( ptpa == 0 ) {
00319                     ptpa = new ProcessTaskParallelApply<EOT>;
00320                     ptpa->needDelete( true );
00321                 }
00322 
00323                 if( ifpa == 0 ) {
00324                     ifpa = new IsFinishedParallelApply<EOT>;
00325                     ifpa->needDelete( true );
00326                 }
00327 
00328                 _stf = stpa;
00329                 _hrf = hrpa;
00330                 _ptf = ptpa;
00331                 _iff = ifpa;
00332             }
00333 
00334             ParallelApplyData<EOT>* data() { return &_data; }
00335 
00341             void data( std::vector<EOT>& _pop )
00342             {
00343                 _data.init( _pop );
00344             }
00345 
00346             virtual ~ParallelApplyStore() // for inheritance purposes only
00347             {
00348             }
00349 
00350             protected:
00351             ParallelApplyData<EOT> _data;
00352         };
00353 
00363         template< typename EOT >
00364         class ParallelApply : public MultiJob< ParallelApplyData<EOT> >
00365         {
00366             public:
00367 
00368             ParallelApply(
00369                     AssignmentAlgorithm & algo,
00370                     int _masterRank,
00371                     ParallelApplyStore<EOT> & store
00372                     ) :
00373                 MultiJob< ParallelApplyData<EOT> >( algo, _masterRank, store )
00374             {
00375                 // empty
00376             }
00377         };
00378 
00383     }
00384 }
00385 # endif // __EO_PARALLEL_APPLY_H__
00386 
00387 
 All Classes Namespaces Files Functions Variables Typedefs Friends