EvolvingObjects
|
00001 /* 00002 (c) Thales group, 2012 00003 00004 This library is free software; you can redistribute it and/or 00005 modify it under the terms of the GNU Lesser General Public 00006 License as published by the Free Software Foundation; 00007 version 2 of the License. 00008 00009 This library is distributed in the hope that it will be useful, 00010 but WITHOUT ANY WARRANTY; without even the implied warranty of 00011 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 00012 Lesser General Public License for more details. 00013 00014 You should have received a copy of the GNU Lesser General Public 00015 License along with this library; if not, write to the Free Software 00016 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA 00017 Contact: http://eodev.sourceforge.net 00018 00019 Authors: 00020 Benjamin Bouvier <benjamin.bouvier@gmail.com> 00021 */ 00022 # ifndef __EO_MPI_H__ 00023 # define __EO_MPI_H__ 00024 00025 # include <vector> // std::vector 00026 00027 # include <utils/eoLogger.h> 00028 # include <utils/eoTimer.h> 00029 # include <eoFunctor.h> 00030 # include <eoExceptions.h> 00031 00032 # include "eoMpiNode.h" 00033 # include "eoMpiAssignmentAlgorithm.h" 00034 00035 namespace eo 00036 { 00128 namespace mpi 00129 { 00133 extern eoTimerStat timerStat; 00134 00144 namespace Channel 00145 { 00146 const int Commands = 0; 00147 const int Messages = 1; 00148 } 00149 00158 namespace Message 00159 { 00160 const int Continue = 0; 00161 const int Finish = 1; 00162 const int Kill = 2; 00163 } 00164 00170 const int DEFAULT_MASTER = 0; 00171 00217 template< typename JobData, typename Wrapped > 00218 struct SharedDataFunction 00219 { 00225 SharedDataFunction( Wrapped * w = 0 ) : _data( 0 ), _wrapped( w ), _needDelete( false ) 00226 { 00227 // empty 00228 } 00229 00235 virtual ~SharedDataFunction() 00236 { 00237 if( _wrapped && _wrapped->needDelete() ) 00238 { 00239 delete _wrapped; 00240 } 00241 } 00242 00248 void wrapped( Wrapped * w ) 00249 { 00250 _wrapped = w; 00251 } 00252 00258 void data( JobData* d ) 00259 { 00260 _data = d; 00261 if( _wrapped ) 00262 { 00263 _wrapped->data( d ); 00264 } 00265 } 00266 00272 bool needDelete() { return _needDelete; } 00273 void needDelete( bool b ) { _needDelete = b; } 00274 00275 protected: 00276 JobData* _data; 00277 Wrapped* _wrapped; // Pointer and not a reference so as to be set at any time and to avoid affectation 00278 bool _needDelete; 00279 }; 00280 00294 template< typename JobData > 00295 struct SendTaskFunction : public eoUF<int, void>, public SharedDataFunction< JobData, SendTaskFunction<JobData> > 00296 { 00297 public: 00298 00299 SendTaskFunction( SendTaskFunction<JobData>* w = 0 ) : SharedDataFunction<JobData, SendTaskFunction<JobData> >( w ) 00300 { 00301 // empty 00302 } 00303 00304 virtual ~SendTaskFunction() {} // for inherited classes 00305 }; 00306 00319 template< typename JobData > 00320 struct HandleResponseFunction : public eoUF<int, void>, public SharedDataFunction< JobData, HandleResponseFunction<JobData> > 00321 { 00322 public: 00323 00324 HandleResponseFunction( HandleResponseFunction<JobData>* w = 0 ) : SharedDataFunction<JobData, HandleResponseFunction<JobData> >( w ) 00325 { 00326 // empty 00327 } 00328 00329 virtual ~HandleResponseFunction() {} // for inherited classes 00330 }; 00331 00346 template< typename JobData > 00347 struct ProcessTaskFunction : public eoF<void>, public SharedDataFunction< JobData, ProcessTaskFunction<JobData> > 00348 { 00349 public: 00350 00351 ProcessTaskFunction( ProcessTaskFunction<JobData>* w = 0 ) : SharedDataFunction<JobData, ProcessTaskFunction<JobData> >( w ) 00352 { 00353 // empty 00354 } 00355 00356 virtual ~ProcessTaskFunction() {} // for inherited classes 00357 }; 00358 00371 template< typename JobData > 00372 struct IsFinishedFunction : public eoF<bool>, public SharedDataFunction< JobData, IsFinishedFunction<JobData> > 00373 { 00374 public: 00375 00376 IsFinishedFunction( IsFinishedFunction<JobData>* w = 0 ) : SharedDataFunction<JobData, IsFinishedFunction<JobData> >( w ) 00377 { 00378 // empty 00379 } 00380 00381 virtual ~IsFinishedFunction() {} // for inherited classes 00382 }; 00383 00402 template< typename JobData > 00403 struct JobStore 00404 { 00408 JobStore( 00409 SendTaskFunction<JobData>* stf, 00410 HandleResponseFunction<JobData>* hrf, 00411 ProcessTaskFunction<JobData>* ptf, 00412 IsFinishedFunction<JobData>* iff 00413 ) : 00414 _stf( stf ), _hrf( hrf ), _ptf( ptf ), _iff( iff ) 00415 { 00416 // empty 00417 } 00418 00425 JobStore() 00426 { 00427 // empty 00428 } 00429 00435 ~JobStore() 00436 { 00437 if( _stf->needDelete() ) delete _stf; 00438 if( _hrf->needDelete() ) delete _hrf; 00439 if( _ptf->needDelete() ) delete _ptf; 00440 if( _iff->needDelete() ) delete _iff; 00441 } 00442 00443 // Getters 00444 SendTaskFunction<JobData> & sendTask() { return *_stf; } 00445 HandleResponseFunction<JobData> & handleResponse() { return *_hrf; } 00446 ProcessTaskFunction<JobData> & processTask() { return *_ptf; } 00447 IsFinishedFunction<JobData> & isFinished() { return *_iff; } 00448 00449 // Setters 00450 void sendTask( SendTaskFunction<JobData>* stf ) { _stf = stf; } 00451 void handleResponse( HandleResponseFunction<JobData>* hrf ) { _hrf = hrf; } 00452 void processTask( ProcessTaskFunction<JobData>* ptf ) { _ptf = ptf; } 00453 void isFinished( IsFinishedFunction<JobData>* iff ) { _iff = iff; } 00454 00458 void wrapSendTask( SendTaskFunction<JobData>* stf ) 00459 { 00460 if( stf ) 00461 { 00462 stf->wrapped( _stf ); 00463 _stf = stf; 00464 } 00465 } 00466 00470 void wrapHandleResponse( HandleResponseFunction<JobData>* hrf ) 00471 { 00472 if( hrf ) 00473 { 00474 hrf->wrapped( _hrf ); 00475 _hrf = hrf; 00476 } 00477 } 00478 00482 void wrapProcessTask( ProcessTaskFunction<JobData>* ptf ) 00483 { 00484 if( ptf ) 00485 { 00486 ptf->wrapped( _ptf ); 00487 _ptf = ptf; 00488 } 00489 } 00490 00494 void wrapIsFinished( IsFinishedFunction<JobData>* iff ) 00495 { 00496 if( iff ) 00497 { 00498 iff->wrapped( _iff ); 00499 _iff = iff; 00500 } 00501 } 00502 00503 virtual JobData* data() = 0; 00504 00505 protected: 00506 00507 SendTaskFunction< JobData >* _stf; 00508 HandleResponseFunction< JobData >* _hrf; 00509 ProcessTaskFunction< JobData >* _ptf; 00510 IsFinishedFunction< JobData >* _iff; 00511 }; 00512 00532 template< class JobData > 00533 class Job 00534 { 00535 public: 00554 Job( AssignmentAlgorithm& _algo, 00555 int _masterRank, 00556 int _workerStopCondition, 00557 JobStore<JobData> & _store 00558 ) : 00559 assignmentAlgo( _algo ), 00560 masterRank( _masterRank ), 00561 workerStopCondition( _workerStopCondition ), 00562 comm( Node::comm() ), 00563 // Functors 00564 store( _store ), 00565 sendTask( _store.sendTask() ), 00566 handleResponse( _store.handleResponse() ), 00567 processTask( _store.processTask() ), 00568 isFinished( _store.isFinished() ) 00569 { 00570 _isMaster = Node::comm().rank() == _masterRank; 00571 00572 sendTask.data( _store.data() ); 00573 handleResponse.data( _store.data() ); 00574 processTask.data( _store.data() ); 00575 isFinished.data( _store.data() ); 00576 } 00577 00578 protected: 00579 00590 struct FinallyBlock 00591 { 00592 FinallyBlock( 00593 int _totalWorkers, 00594 AssignmentAlgorithm& _algo, 00595 Job< JobData > & _that 00596 ) : 00597 totalWorkers( _totalWorkers ), 00598 assignmentAlgo( _algo ), 00599 that( _that ), 00600 // global field 00601 comm( Node::comm() ) 00602 { 00603 // empty 00604 } 00605 00606 ~FinallyBlock() 00607 { 00608 # ifndef NDEBUG 00609 eo::log << eo::debug; 00610 eo::log << "[M" << comm.rank() << "] Frees all the idle." << std::endl; 00611 # endif 00612 // frees all the idle workers 00613 timerStat.start("master_wait_for_idles"); 00614 std::vector<int> idles = assignmentAlgo.idles(); 00615 for(unsigned int i = 0; i < idles.size(); ++i) 00616 { 00617 comm.send( idles[i], Channel::Commands, Message::Finish ); 00618 } 00619 timerStat.stop("master_wait_for_idles"); 00620 00621 # ifndef NDEBUG 00622 eo::log << "[M" << comm.rank() << "] Waits for all responses." << std::endl; 00623 # endif 00624 // wait for all responses 00625 timerStat.start("master_wait_for_all_responses"); 00626 while( assignmentAlgo.availableWorkers() != totalWorkers ) 00627 { 00628 bmpi::status status = comm.probe( bmpi::any_source, bmpi::any_tag ); 00629 int wrkRank = status.source(); 00630 that.handleResponse( wrkRank ); 00631 comm.send( wrkRank, Channel::Commands, Message::Finish ); 00632 assignmentAlgo.confirm( wrkRank ); 00633 } 00634 timerStat.stop("master_wait_for_all_responses"); 00635 # ifndef NDEBUG 00636 eo::log << "[M" << comm.rank() << "] Leaving master task." << std::endl; 00637 # endif 00638 } 00639 00640 protected: 00641 00642 int totalWorkers; 00643 AssignmentAlgorithm& assignmentAlgo; 00644 Job< JobData > & that; 00645 00646 bmpi::communicator & comm; 00647 }; 00648 00658 void master( ) 00659 { 00660 int totalWorkers = assignmentAlgo.availableWorkers(); 00661 # ifndef NDEBUG 00662 eo::log << eo::debug; 00663 eo::log << "[M" << comm.rank() << "] Have " << totalWorkers << " workers." << std::endl; 00664 # endif 00665 try { 00666 FinallyBlock finally( totalWorkers, assignmentAlgo, *this ); 00667 while( ! isFinished() ) 00668 { 00669 timerStat.start("master_wait_for_assignee"); 00670 int assignee = assignmentAlgo.get( ); 00671 while( assignee <= 0 ) 00672 { 00673 # ifndef NDEBUG 00674 eo::log << "[M" << comm.rank() << "] Waitin' for node..." << std::endl; 00675 # endif 00676 bmpi::status status = comm.probe( bmpi::any_source, bmpi::any_tag ); 00677 int wrkRank = status.source(); 00678 # ifndef NDEBUG 00679 eo::log << "[M" << comm.rank() << "] Node " << wrkRank << " just terminated." << std::endl; 00680 # endif 00681 handleResponse( wrkRank ); 00682 assignmentAlgo.confirm( wrkRank ); 00683 assignee = assignmentAlgo.get( ); 00684 } 00685 timerStat.stop("master_wait_for_assignee"); 00686 # ifndef NDEBUG 00687 eo::log << "[M" << comm.rank() << "] Assignee : " << assignee << std::endl; 00688 # endif 00689 00690 timerStat.start("master_wait_for_send"); 00691 comm.send( assignee, Channel::Commands, Message::Continue ); 00692 sendTask( assignee ); 00693 timerStat.stop("master_wait_for_send"); 00694 } 00695 } catch( const std::exception & e ) 00696 { 00697 std::string s = e.what(); 00698 s.append( " in eoMpi loop"); 00699 throw std::runtime_error( s ); 00700 } 00701 } 00702 00709 void worker( ) 00710 { 00711 int order; 00712 # ifndef NDEBUG 00713 eo::log << eo::debug; 00714 # endif 00715 timerStat.start("worker_wait_for_order"); 00716 comm.recv( masterRank, Channel::Commands, order ); 00717 timerStat.stop("worker_wait_for_order"); 00718 00719 while( true ) 00720 { 00721 # ifndef NDEBUG 00722 eo::log << "[W" << comm.rank() << "] Waiting for an order..." << std::endl; 00723 # endif 00724 if ( order == workerStopCondition ) 00725 { 00726 # ifndef NDEBUG 00727 eo::log << "[W" << comm.rank() << "] Leaving worker task." << std::endl; 00728 # endif 00729 return; 00730 } else if( order == Message::Continue ) 00731 { 00732 # ifndef NDEBUG 00733 eo::log << "[W" << comm.rank() << "] Processing task..." << std::endl; 00734 # endif 00735 processTask( ); 00736 } 00737 00738 timerStat.start("worker_wait_for_order"); 00739 comm.recv( masterRank, Channel::Commands, order ); 00740 timerStat.stop("worker_wait_for_order"); 00741 } 00742 } 00743 00744 public: 00745 00750 void run( ) 00751 { 00752 ( _isMaster ) ? master( ) : worker( ); 00753 } 00754 00758 bool isMaster( ) 00759 { 00760 return _isMaster; 00761 } 00762 00763 protected: 00764 00765 AssignmentAlgorithm& assignmentAlgo; 00766 int masterRank; 00767 const int workerStopCondition; 00768 bmpi::communicator& comm; 00769 00770 JobStore<JobData>& store; 00771 SendTaskFunction<JobData> & sendTask; 00772 HandleResponseFunction<JobData> & handleResponse; 00773 ProcessTaskFunction<JobData> & processTask; 00774 IsFinishedFunction<JobData> & isFinished; 00775 00776 bool _isMaster; 00777 }; 00778 00790 template< class JobData > 00791 class OneShotJob : public Job< JobData > 00792 { 00793 public: 00794 OneShotJob( AssignmentAlgorithm& algo, 00795 int masterRank, 00796 JobStore<JobData> & store ) 00797 : Job<JobData>( algo, masterRank, Message::Finish, store ) 00798 { 00799 // empty 00800 } 00801 }; 00802 00817 template< class JobData > 00818 class MultiJob : public Job< JobData > 00819 { 00820 public: 00821 MultiJob ( AssignmentAlgorithm& algo, 00822 int masterRank, 00823 JobStore<JobData> & store ) 00824 : Job<JobData>( algo, masterRank, Message::Kill, store ) 00825 { 00826 // empty 00827 } 00828 }; 00829 } 00830 00834 } 00835 # endif // __EO_MPI_H__ 00836