EvolvingObjects
eoMpi.h
00001 /*
00002 (c) Thales group, 2012
00003 
00004     This library is free software; you can redistribute it and/or
00005     modify it under the terms of the GNU Lesser General Public
00006     License as published by the Free Software Foundation;
00007     version 2 of the License.
00008 
00009     This library is distributed in the hope that it will be useful,
00010     but WITHOUT ANY WARRANTY; without even the implied warranty of
00011     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
00012     Lesser General Public License for more details.
00013 
00014     You should have received a copy of the GNU Lesser General Public
00015     License along with this library; if not, write to the Free Software
00016     Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
00017 Contact: http://eodev.sourceforge.net
00018 
00019 Authors:
00020     Benjamin Bouvier <benjamin.bouvier@gmail.com>
00021 */
00022 # ifndef __EO_MPI_H__
00023 # define __EO_MPI_H__
00024 
00025 # include <vector>  // std::vector
00026 
00027 # include <utils/eoLogger.h>
00028 # include <utils/eoTimer.h>
00029 # include <eoFunctor.h>
00030 # include <eoExceptions.h>
00031 
00032 # include "eoMpiNode.h"
00033 # include "eoMpiAssignmentAlgorithm.h"
00034 
00035 namespace eo
00036 {
00128     namespace mpi
00129     {
        /**
         * @brief Timer statistics shared by the MPI jobs of this header.
         *
         * Only declared here; the definition lives in another translation unit
         * (presumably the matching .cpp file — TODO confirm).
         * Job uses it to time named sections, e.g. "master_wait_for_assignee" and
         * "worker_wait_for_order", through timerStat.start( name ) / timerStat.stop( name ).
         */
        extern eoTimerStat timerStat;
00134 
00144         namespace Channel
00145         {
00146             const int Commands = 0;
00147             const int Messages = 1;
00148         }
00149 
00158         namespace Message
00159         {
00160             const int Continue = 0;
00161             const int Finish = 1;
00162             const int Kill = 2;
00163         }
00164 
00170         const int DEFAULT_MASTER = 0;
00171 
00217         template< typename JobData, typename Wrapped >
00218         struct SharedDataFunction
00219         {
00225             SharedDataFunction( Wrapped * w = 0 ) : _data( 0 ), _wrapped( w ), _needDelete( false )
00226             {
00227                 // empty
00228             }
00229 
00235             virtual ~SharedDataFunction()
00236             {
00237                 if( _wrapped && _wrapped->needDelete() )
00238                 {
00239                     delete _wrapped;
00240                 }
00241             }
00242 
00248             void wrapped( Wrapped * w )
00249             {
00250                 _wrapped = w;
00251             }
00252 
00258             void data( JobData* d )
00259             {
00260                 _data = d;
00261                 if( _wrapped )
00262                 {
00263                     _wrapped->data( d );
00264                 }
00265             }
00266 
00272             bool needDelete() { return _needDelete; }
00273             void needDelete( bool b ) { _needDelete = b; }
00274 
00275             protected:
00276             JobData* _data;
00277             Wrapped* _wrapped; // Pointer and not a reference so as to be set at any time and to avoid affectation
00278             bool _needDelete;
00279         };
00280 
00294         template< typename JobData >
00295         struct SendTaskFunction : public eoUF<int, void>, public SharedDataFunction< JobData, SendTaskFunction<JobData> >
00296         {
00297             public:
00298 
00299             SendTaskFunction( SendTaskFunction<JobData>* w = 0 ) : SharedDataFunction<JobData, SendTaskFunction<JobData> >( w )
00300             {
00301                 // empty
00302             }
00303 
00304             virtual ~SendTaskFunction() {} // for inherited classes
00305         };
00306 
00319         template< typename JobData >
00320         struct HandleResponseFunction : public eoUF<int, void>, public SharedDataFunction< JobData, HandleResponseFunction<JobData> >
00321         {
00322             public:
00323 
00324             HandleResponseFunction( HandleResponseFunction<JobData>* w = 0 ) : SharedDataFunction<JobData, HandleResponseFunction<JobData> >( w )
00325             {
00326                 // empty
00327             }
00328 
00329             virtual ~HandleResponseFunction() {} // for inherited classes
00330         };
00331 
00346         template< typename JobData >
00347         struct ProcessTaskFunction : public eoF<void>, public SharedDataFunction< JobData, ProcessTaskFunction<JobData> >
00348         {
00349             public:
00350 
00351             ProcessTaskFunction( ProcessTaskFunction<JobData>* w = 0 ) : SharedDataFunction<JobData, ProcessTaskFunction<JobData> >( w )
00352             {
00353                 // empty
00354             }
00355 
00356             virtual ~ProcessTaskFunction() {} // for inherited classes
00357         };
00358 
00371         template< typename JobData >
00372         struct IsFinishedFunction : public eoF<bool>, public SharedDataFunction< JobData, IsFinishedFunction<JobData> >
00373         {
00374             public:
00375 
00376             IsFinishedFunction( IsFinishedFunction<JobData>* w = 0 ) : SharedDataFunction<JobData, IsFinishedFunction<JobData> >( w )
00377             {
00378                 // empty
00379             }
00380 
00381             virtual ~IsFinishedFunction() {} // for inherited classes
00382         };
00383 
00402         template< typename JobData >
00403         struct JobStore
00404         {
00408             JobStore(
00409                 SendTaskFunction<JobData>* stf,
00410                 HandleResponseFunction<JobData>* hrf,
00411                 ProcessTaskFunction<JobData>* ptf,
00412                 IsFinishedFunction<JobData>* iff
00413             ) :
00414                 _stf( stf ), _hrf( hrf ), _ptf( ptf ), _iff( iff )
00415             {
00416                 // empty
00417             }
00418 
00425             JobStore()
00426             {
00427                 // empty
00428             }
00429 
00435             ~JobStore()
00436             {
00437                 if( _stf->needDelete() ) delete _stf;
00438                 if( _hrf->needDelete() ) delete _hrf;
00439                 if( _ptf->needDelete() ) delete _ptf;
00440                 if( _iff->needDelete() ) delete _iff;
00441             }
00442 
00443             // Getters
00444             SendTaskFunction<JobData> & sendTask() { return *_stf; }
00445             HandleResponseFunction<JobData> & handleResponse() { return *_hrf; }
00446             ProcessTaskFunction<JobData> & processTask() { return *_ptf; }
00447             IsFinishedFunction<JobData> & isFinished() { return *_iff; }
00448 
00449             // Setters
00450             void sendTask( SendTaskFunction<JobData>* stf ) { _stf = stf; }
00451             void handleResponse( HandleResponseFunction<JobData>* hrf ) { _hrf = hrf; }
00452             void processTask( ProcessTaskFunction<JobData>* ptf ) { _ptf = ptf; }
00453             void isFinished( IsFinishedFunction<JobData>* iff ) { _iff = iff; }
00454 
00458             void wrapSendTask( SendTaskFunction<JobData>* stf )
00459             {
00460                 if( stf )
00461                 {
00462                     stf->wrapped( _stf );
00463                     _stf = stf;
00464                 }
00465             }
00466 
00470             void wrapHandleResponse( HandleResponseFunction<JobData>* hrf )
00471             {
00472                 if( hrf )
00473                 {
00474                     hrf->wrapped( _hrf );
00475                     _hrf = hrf;
00476                 }
00477             }
00478 
00482             void wrapProcessTask( ProcessTaskFunction<JobData>* ptf )
00483             {
00484                 if( ptf )
00485                 {
00486                     ptf->wrapped( _ptf );
00487                     _ptf = ptf;
00488                 }
00489             }
00490 
00494             void wrapIsFinished( IsFinishedFunction<JobData>* iff )
00495             {
00496                 if( iff )
00497                 {
00498                     iff->wrapped( _iff );
00499                     _iff = iff;
00500                 }
00501             }
00502 
00503             virtual JobData* data() = 0;
00504 
00505             protected:
00506 
00507             SendTaskFunction< JobData >* _stf;
00508             HandleResponseFunction< JobData >* _hrf;
00509             ProcessTaskFunction< JobData >* _ptf;
00510             IsFinishedFunction< JobData >* _iff;
00511         };
00512 
00532         template< class JobData >
00533         class Job
00534         {
00535             public:
00554                 Job( AssignmentAlgorithm& _algo,
00555                      int _masterRank,
00556                      int _workerStopCondition,
00557                      JobStore<JobData> & _store
00558                     ) :
00559                     assignmentAlgo( _algo ),
00560                     masterRank( _masterRank ),
00561                     workerStopCondition( _workerStopCondition ),
00562                     comm( Node::comm() ),
00563                     // Functors
00564                     store( _store ),
00565                     sendTask( _store.sendTask() ),
00566                     handleResponse( _store.handleResponse() ),
00567                     processTask( _store.processTask() ),
00568                     isFinished( _store.isFinished() )
00569                 {
00570                     _isMaster = Node::comm().rank() == _masterRank;
00571 
00572                     sendTask.data( _store.data() );
00573                     handleResponse.data( _store.data() );
00574                     processTask.data( _store.data() );
00575                     isFinished.data( _store.data() );
00576                 }
00577 
00578             protected:
00579 
00590                 struct FinallyBlock
00591                 {
00592                     FinallyBlock(
00593                             int _totalWorkers,
00594                             AssignmentAlgorithm& _algo,
00595                             Job< JobData > & _that
00596                             ) :
00597                         totalWorkers( _totalWorkers ),
00598                         assignmentAlgo( _algo ),
00599                         that( _that ),
00600                         // global field
00601                         comm( Node::comm() )
00602                     {
00603                         // empty
00604                     }
00605 
00606                     ~FinallyBlock()
00607                     {
00608 # ifndef NDEBUG
00609                         eo::log << eo::debug;
00610                         eo::log << "[M" << comm.rank() << "] Frees all the idle." << std::endl;
00611 # endif
00612                         // frees all the idle workers
00613                         timerStat.start("master_wait_for_idles");
00614                         std::vector<int> idles = assignmentAlgo.idles();
00615                         for(unsigned int i = 0; i < idles.size(); ++i)
00616                         {
00617                             comm.send( idles[i], Channel::Commands, Message::Finish );
00618                         }
00619                         timerStat.stop("master_wait_for_idles");
00620 
00621 # ifndef NDEBUG
00622                         eo::log << "[M" << comm.rank() << "] Waits for all responses." << std::endl;
00623 # endif
00624                         // wait for all responses
00625                         timerStat.start("master_wait_for_all_responses");
00626                         while( assignmentAlgo.availableWorkers() != totalWorkers )
00627                         {
00628                             bmpi::status status = comm.probe( bmpi::any_source, bmpi::any_tag );
00629                             int wrkRank = status.source();
00630                             that.handleResponse( wrkRank );
00631                             comm.send( wrkRank, Channel::Commands, Message::Finish );
00632                             assignmentAlgo.confirm( wrkRank );
00633                         }
00634                         timerStat.stop("master_wait_for_all_responses");
00635 # ifndef NDEBUG
00636                         eo::log << "[M" << comm.rank() << "] Leaving master task." << std::endl;
00637 # endif
00638                     }
00639 
00640                     protected:
00641 
00642                     int totalWorkers;
00643                     AssignmentAlgorithm& assignmentAlgo;
00644                     Job< JobData > & that;
00645 
00646                     bmpi::communicator & comm;
00647                 };
00648 
00658                 void master( )
00659                 {
00660                     int totalWorkers = assignmentAlgo.availableWorkers();
00661 # ifndef NDEBUG
00662                     eo::log << eo::debug;
00663                     eo::log << "[M" << comm.rank() << "] Have " << totalWorkers << " workers." << std::endl;
00664 # endif
00665                     try {
00666                         FinallyBlock finally( totalWorkers, assignmentAlgo, *this );
00667                         while( ! isFinished() )
00668                         {
00669                             timerStat.start("master_wait_for_assignee");
00670                             int assignee = assignmentAlgo.get( );
00671                             while( assignee <= 0 )
00672                             {
00673 # ifndef NDEBUG
00674                                 eo::log << "[M" << comm.rank() << "] Waitin' for node..." << std::endl;
00675 # endif
00676                                 bmpi::status status = comm.probe( bmpi::any_source, bmpi::any_tag );
00677                                 int wrkRank = status.source();
00678 # ifndef NDEBUG
00679                                 eo::log << "[M" << comm.rank() << "] Node " << wrkRank << " just terminated." << std::endl;
00680 # endif
00681                                 handleResponse( wrkRank );
00682                                 assignmentAlgo.confirm( wrkRank );
00683                                 assignee = assignmentAlgo.get( );
00684                             }
00685                             timerStat.stop("master_wait_for_assignee");
00686 # ifndef NDEBUG
00687                             eo::log << "[M" << comm.rank() << "] Assignee : " << assignee << std::endl;
00688 # endif
00689 
00690                             timerStat.start("master_wait_for_send");
00691                             comm.send( assignee, Channel::Commands, Message::Continue );
00692                             sendTask( assignee );
00693                             timerStat.stop("master_wait_for_send");
00694                         }
00695                     } catch( const std::exception & e )
00696                     {
00697                         std::string s = e.what();
00698                         s.append( " in eoMpi loop");
00699                         throw std::runtime_error( s );
00700                     }
00701                 }
00702 
00709                 void worker( )
00710                 {
00711                     int order;
00712 # ifndef NDEBUG
00713                     eo::log << eo::debug;
00714 # endif
00715                     timerStat.start("worker_wait_for_order");
00716                     comm.recv( masterRank, Channel::Commands, order );
00717                     timerStat.stop("worker_wait_for_order");
00718 
00719                     while( true )
00720                     {
00721 # ifndef NDEBUG
00722                         eo::log << "[W" << comm.rank() << "] Waiting for an order..." << std::endl;
00723 # endif
00724                         if ( order == workerStopCondition )
00725                         {
00726 # ifndef NDEBUG
00727                             eo::log << "[W" << comm.rank() << "] Leaving worker task." << std::endl;
00728 # endif
00729                             return;
00730                         } else if( order == Message::Continue )
00731                         {
00732 # ifndef NDEBUG
00733                             eo::log << "[W" << comm.rank() << "] Processing task..." << std::endl;
00734 # endif
00735                             processTask( );
00736                         }
00737 
00738                         timerStat.start("worker_wait_for_order");
00739                         comm.recv( masterRank, Channel::Commands, order );
00740                         timerStat.stop("worker_wait_for_order");
00741                     }
00742                 }
00743 
00744             public:
00745 
00750                 void run( )
00751                 {
00752                     ( _isMaster ) ? master( ) : worker( );
00753                 }
00754 
00758                 bool isMaster( )
00759                 {
00760                     return _isMaster;
00761                 }
00762 
00763             protected:
00764 
00765                 AssignmentAlgorithm& assignmentAlgo;
00766                 int masterRank;
00767                 const int workerStopCondition;
00768                 bmpi::communicator& comm;
00769 
00770                 JobStore<JobData>& store;
00771                 SendTaskFunction<JobData> & sendTask;
00772                 HandleResponseFunction<JobData> & handleResponse;
00773                 ProcessTaskFunction<JobData> & processTask;
00774                 IsFinishedFunction<JobData> & isFinished;
00775 
00776                 bool _isMaster;
00777         };
00778 
00790         template< class JobData >
00791         class OneShotJob : public Job< JobData >
00792         {
00793             public:
00794                 OneShotJob( AssignmentAlgorithm& algo,
00795                             int masterRank,
00796                             JobStore<JobData> & store )
00797                     : Job<JobData>( algo, masterRank, Message::Finish, store )
00798                 {
00799                     // empty
00800                 }
00801         };
00802 
00817         template< class JobData >
00818         class MultiJob : public Job< JobData >
00819         {
00820             public:
00821                 MultiJob ( AssignmentAlgorithm& algo,
00822                             int masterRank,
00823                             JobStore<JobData> & store )
00824                     : Job<JobData>( algo, masterRank, Message::Kill, store )
00825                 {
00826                     // empty
00827                 }
00828         };
00829     }
00830 
00834 }
00835 # endif // __EO_MPI_H__
00836 
 All Classes Namespaces Files Functions Variables Typedefs Friends