EvolvingObjects
|
00001 /* 00002 (c) Thales group, 2012 00003 00004 This library is free software; you can redistribute it and/or 00005 modify it under the terms of the GNU Lesser General Public 00006 License as published by the Free Software Foundation; 00007 version 2 of the License. 00008 00009 This library is distributed in the hope that it will be useful, 00010 but WITHOUT ANY WARRANTY; without even the implied warranty of 00011 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 00012 Lesser General Public License for more details. 00013 00014 You should have received a copy of the GNU Lesser General Public 00015 License along with this library; if not, write to the Free Software 00016 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA 00017 Contact: http://eodev.sourceforge.net 00018 00019 Authors: 00020 Benjamin Bouvier <benjamin.bouvier@gmail.com> 00021 */ 00022 # ifndef __EO_PARALLEL_APPLY_H__ 00023 # define __EO_PARALLEL_APPLY_H__ 00024 00025 # include "eoMpi.h" 00026 00027 # include <eoFunctor.h> // eoUF 00028 # include <vector> // std::vector population 00029 00052 namespace eo 00053 { 00054 namespace mpi 00055 { 00061 struct ParallelApplyAssignment 00062 { 00063 int index; 00064 int size; 00065 }; 00066 00083 template<class EOT> 00084 struct ParallelApplyData 00085 { 00095 ParallelApplyData( 00096 eoUF<EOT&, void> & _proc, 00097 int _masterRank, 00098 int _packetSize, 00099 std::vector<EOT> * table = 0 00100 ) : 00101 _table( table ), func( _proc ), index( 0 ), packetSize( _packetSize ), masterRank( _masterRank ), comm( Node::comm() ) 00102 { 00103 if ( _packetSize <= 0 ) 00104 { 00105 throw std::runtime_error("Packet size should not be negative."); 00106 } 00107 00108 if( table ) 00109 { 00110 size = table->size(); 00111 } 00112 } 00113 00117 void init( std::vector<EOT>& table ) 00118 { 00119 index = 0; 00120 size = table.size(); 00121 _table = &table; 00122 assignedTasks.clear(); 00123 } 00124 00125 std::vector<EOT>& table() 00126 { 00127 return *_table; 00128 } 00129 00130 // All elements are public since functors will often use them. 00131 std::vector<EOT> * _table; 00132 eoUF<EOT&, void> & func; 00133 int index; 00134 int size; 00135 std::map< int /* worker rank */, ParallelApplyAssignment /* last assignment */> assignedTasks; 00136 int packetSize; 00137 std::vector<EOT> tempArray; 00138 00139 int masterRank; 00140 bmpi::communicator& comm; 00141 }; 00142 00152 template< class EOT > 00153 class SendTaskParallelApply : public SendTaskFunction< ParallelApplyData<EOT> > 00154 { 00155 public: 00156 using SendTaskFunction< ParallelApplyData<EOT> >::_data; 00157 00158 SendTaskParallelApply( SendTaskParallelApply<EOT> * w = 0 ) : SendTaskFunction< ParallelApplyData<EOT> >( w ) 00159 { 00160 // empty 00161 } 00162 00163 void operator()(int wrkRank) 00164 { 00165 int futureIndex; 00166 00167 if( _data->index + _data->packetSize < _data->size ) 00168 { 00169 futureIndex = _data->index + _data->packetSize; 00170 } else { 00171 futureIndex = _data->size; 00172 } 00173 00174 int sentSize = futureIndex - _data->index ; 00175 00176 _data->comm.send( wrkRank, 1, sentSize ); 00177 00178 eo::log << eo::progress << "Evaluating individual " << _data->index << std::endl; 00179 00180 _data->assignedTasks[ wrkRank ].index = _data->index; 00181 _data->assignedTasks[ wrkRank ].size = sentSize; 00182 00183 _data->comm.send( wrkRank, 1, & ( (_data->table())[ _data->index ] ) , sentSize ); 00184 _data->index = futureIndex; 00185 } 00186 }; 00187 00193 template< class EOT > 00194 class HandleResponseParallelApply : public HandleResponseFunction< ParallelApplyData<EOT> > 00195 { 00196 public: 00197 using HandleResponseFunction< ParallelApplyData<EOT> >::_data; 00198 00199 HandleResponseParallelApply( HandleResponseParallelApply<EOT> * w = 0 ) : HandleResponseFunction< ParallelApplyData<EOT> >( w ) 00200 { 00201 // empty 00202 } 00203 00204 void operator()(int wrkRank) 00205 { 00206 _data->comm.recv( wrkRank, 1, & (_data->table()[ _data->assignedTasks[wrkRank].index ] ), _data->assignedTasks[wrkRank].size ); 00207 } 00208 }; 00209 00218 template< class EOT > 00219 class ProcessTaskParallelApply : public ProcessTaskFunction< ParallelApplyData<EOT> > 00220 { 00221 public: 00222 using ProcessTaskFunction< ParallelApplyData<EOT> >::_data; 00223 00224 ProcessTaskParallelApply( ProcessTaskParallelApply<EOT> * w = 0 ) : ProcessTaskFunction< ParallelApplyData<EOT> >( w ) 00225 { 00226 // empty 00227 } 00228 00229 void operator()() 00230 { 00231 int recvSize; 00232 00233 _data->comm.recv( _data->masterRank, 1, recvSize ); 00234 _data->tempArray.resize( recvSize ); 00235 _data->comm.recv( _data->masterRank, 1, & _data->tempArray[0] , recvSize ); 00236 timerStat.start("worker_processes"); 00237 for( int i = 0; i < recvSize ; ++i ) 00238 { 00239 _data->func( _data->tempArray[ i ] ); 00240 } 00241 timerStat.stop("worker_processes"); 00242 _data->comm.send( _data->masterRank, 1, & _data->tempArray[0], recvSize ); 00243 } 00244 }; 00245 00252 template< class EOT > 00253 class IsFinishedParallelApply : public IsFinishedFunction< ParallelApplyData<EOT> > 00254 { 00255 public: 00256 using IsFinishedFunction< ParallelApplyData<EOT> >::_data; 00257 00258 IsFinishedParallelApply( IsFinishedParallelApply<EOT> * w = 0 ) : IsFinishedFunction< ParallelApplyData<EOT> >( w ) 00259 { 00260 // empty 00261 } 00262 00263 bool operator()() 00264 { 00265 return _data->index == _data->size; 00266 } 00267 }; 00268 00277 template< class EOT > 00278 struct ParallelApplyStore : public JobStore< ParallelApplyData<EOT> > 00279 { 00280 using JobStore< ParallelApplyData<EOT> >::_stf; 00281 using JobStore< ParallelApplyData<EOT> >::_hrf; 00282 using JobStore< ParallelApplyData<EOT> >::_ptf; 00283 using JobStore< ParallelApplyData<EOT> >::_iff; 00284 00296 ParallelApplyStore( 00297 eoUF<EOT&, void> & _proc, 00298 int _masterRank, 00299 int _packetSize = 1, 00300 // JobStore functors 00301 SendTaskParallelApply<EOT> * stpa = 0, 00302 HandleResponseParallelApply<EOT>* hrpa = 0, 00303 ProcessTaskParallelApply<EOT>* ptpa = 0, 00304 IsFinishedParallelApply<EOT>* ifpa = 0 00305 ) : 00306 _data( _proc, _masterRank, _packetSize ) 00307 { 00308 if( stpa == 0 ) { 00309 stpa = new SendTaskParallelApply<EOT>; 00310 stpa->needDelete( true ); 00311 } 00312 00313 if( hrpa == 0 ) { 00314 hrpa = new HandleResponseParallelApply<EOT>; 00315 hrpa->needDelete( true ); 00316 } 00317 00318 if( ptpa == 0 ) { 00319 ptpa = new ProcessTaskParallelApply<EOT>; 00320 ptpa->needDelete( true ); 00321 } 00322 00323 if( ifpa == 0 ) { 00324 ifpa = new IsFinishedParallelApply<EOT>; 00325 ifpa->needDelete( true ); 00326 } 00327 00328 _stf = stpa; 00329 _hrf = hrpa; 00330 _ptf = ptpa; 00331 _iff = ifpa; 00332 } 00333 00334 ParallelApplyData<EOT>* data() { return &_data; } 00335 00341 void data( std::vector<EOT>& _pop ) 00342 { 00343 _data.init( _pop ); 00344 } 00345 00346 virtual ~ParallelApplyStore() // for inheritance purposes only 00347 { 00348 } 00349 00350 protected: 00351 ParallelApplyData<EOT> _data; 00352 }; 00353 00363 template< typename EOT > 00364 class ParallelApply : public MultiJob< ParallelApplyData<EOT> > 00365 { 00366 public: 00367 00368 ParallelApply( 00369 AssignmentAlgorithm & algo, 00370 int _masterRank, 00371 ParallelApplyStore<EOT> & store 00372 ) : 00373 MultiJob< ParallelApplyData<EOT> >( algo, _masterRank, store ) 00374 { 00375 // empty 00376 } 00377 }; 00378 00383 } 00384 } 00385 # endif // __EO_PARALLEL_APPLY_H__ 00386 00387