EO's Parallelization with MPI

What is parallelization?

Shared memory (e.g. OpenMP)

Message Passing Interface (e.g. OpenMPI)

Memory isn't shared here: the manipulated objects are sent over a network, so the machines (called hosts) communicate by exchanging messages

Parallelization myths

A myth about speed: the car's enigma

A myth about speed: "He says he doesn't see the connection?"

A myth about data: the cat's enigma

A myth about data

A metric: speedup

Parallelization in EO

Objectives

Evaluation: Long story short


    int main( int argc, char **argv )
    {
        eo::mpi::Node::init( argc, argv );
        // PUT EO STUFF HERE
        // Let's make the assumption that pop is an eoPop<EOT>
        // and evalFunc is an evaluation functor
        eo::mpi::DynamicAssignmentAlgorithm assign;
        eoParallelPopLoopEval<EOT> popEval( assign, eo::mpi::DEFAULT_MASTER, evalFunc );
        popEval( pop, pop );
    }
    

Serializing EO objects

eoserial: principle

eoserial: the eoserial::Persistent interface

    
    # include <serial/eoSerial.h>

    class MyObject : public eoserial::Persistent {
        public:

        // A persistent class needs a default empty ctor.
        MyObject() {}

        int id;

        // Implementation of eoserial::Persistent::pack
        // What to save when making a serialized object?
        eoserial::Object* pack() const
        {
            eoserial::Object* obj = new eoserial::Object;
            // eoserial::make creates an eoserial::String from a basic type
            eoserial::String* idAsString = eoserial::make( id );
            // the key "saved_id" will be associated with the JSON object idAsString
            obj->add( "saved_id", idAsString );
            // This could also have been written
            // (*obj)["saved_id"] = idAsString;
            // as obj points to a std::map
            return obj;
        }

        // Implementation of eoserial::Persistent::unpack
        // What data to retrieve from a JSON object and where to put it?
        void unpack(const eoserial::Object* json)
        {
            // retrieve the value stored under key "saved_id" in the *json object and put it into member "id"
            eoserial::unpack( *json, "saved_id" , id );
        }
    };
    
    

eoserial: using it

    
    # include <eoSerial.h>
    # include <fstream>
    # include <sstream>
    # include <cassert>

    int main(void)
    {
        MyObject instance;
        instance.id = 42;

        // Writes
        eoserial::Object* obj = instance.pack();
        std::ofstream ofile("filename");
        obj->print( ofile );
        ofile.close();
        delete obj;

        // Reads
        std::ifstream ifile("filename");
        std::stringstream ss;
        while( ifile )
        {
            std::string s;
            ifile >> s;
            ss << s;
        }
        eoserial::Object* objCopy = eoserial::Parser::parse( ss.str() );
        MyObject instanceCopy;
        instanceCopy.unpack( objCopy );
        delete objCopy;

        assert( instanceCopy.id == instance.id );

        return 0;
    }
    
    

eoserial: more complex uses


    struct ComplexObject
    {
        bool someBooleanValue; // will be serialized into a string
        MyObject obj; // Objects can contain other objects too
        std::vector<int> array; // and arrays too!
    };

    int main(void)
    {
        ComplexObject co;
        // let's imagine we've set values of co.
        eoserial::Object* json = new eoserial::Object;
        // serialize basic type
        (*json)["some_boolean_value"] = eoserial::make( co.someBooleanValue );
        // MyObject is Persistent, so eoserial knows how to serialize it
        json->add( "my_object", &co.obj );
        // Instead of having a "for" loop, let's automatically serialize the content of the array
        json->add( "int_array",
            eoserial::makeArray< std::vector<int>, eoserial::MakeAlgorithm >( co.array ) );
        // Print everything on the standard output
        json->print( std::cout );
        delete json;
        return 0;
    }
    

MPI

Design of parallel algorithms

Some vocabulary

Evaluation (1/2)

Let's see how we could implement our parallelized evaluation.
It's feasible because evaluating one individual is independent of evaluating another.


        // On master side
        function parallel_evaluate( population p )
            foreach individual i in p,
                send i to a worker
                if there is no available worker,
                    wait for any response (return)
                    and retry
                endif
            endforeach
            inform all the available workers that they are done (yes, it's a centralized algorithm)
            wait for all remaining responses
        endfunction

        when receiving a response:
            replace the evaluated individual in the population

        // On worker side
        function parallel_evaluate( evaluation function f )
            wait for an individual i
            apply f on it
            send i to the master
        endfunction
        

Evaluation (2/2)

But parallelization pays off only if the processing time is higher than the communication time. If the processing time is too short relative to the communication time, we can batch individuals as follows:


        // On master side
        function parallel_evaluate( population p, number of elements to send each time packet_size )
            index = 0
            while index < size of p
                sentSize := how many individuals (<= packet_size) can we send to a worker?
                find a worker. If there is no one, wait for any response (return) and retry
                send the sentSize to the worker
                send the individuals to the worker
                index += sentSize
            endwhile
            inform all the available workers that they're done
            wait for all remaining responses
        endfunction

        when receiving a response:
            replace the evaluated individuals in the population

        // On worker side
        function parallel_evaluate( evaluation function f )
            size := wait for a sentSize as described above
            individuals := wait for size individuals
            apply f on each of them
            send back the individuals
        endfunction
        

Multi start

The idea behind multi-start is to run the same algorithm (for instance, eoEasyEA) many times, but with different seeds: the workers launch the algorithm and send their solutions to the master as they find them, and the master keeps the overall best solution.

        // On master side
        variable best_score (initialized to the worst possible value) // score can be fitness, for instance

        function parallel_multistart( integer runs )
            seeds = table of generated seeds, or fixed seeds, whose size is at least "runs"
            for i := 0; i < runs; ++i
                find a worker. If there is no one, wait for any response (return) and retry
                send to the worker a different seed
            endfor
            inform all the available workers that they're done
            wait for all remaining responses
        endfunction

        when receiving a response:
            received_score := receive score from the worker
            if received_score > best_score
                send the worker a message indicating that the master is interested in the solution
                receive the solution
                update best_score
            else
                send the worker a message indicating that the master isn't interested in the solution
            endif

        // On worker side
        function parallel_multistart( algorithm eoAlgo )
            seed := wait for a seed
            solution := eoAlgo( seed )
            send solution score to master
            master_is_interested := wait for the response
            if master_is_interested
                send solution to master
            endif
        endfunction
    

Common parts vs specific parts


        // On master side
        function parallel_evaluate( population p, number of elements to send each time packet_size )
            index = 0
            while index < size of p
                find a worker. If there is no one, wait for any response (return) and retry
                sentSize := how many individuals (<= packet_size) can we send to a worker?
                send the sentSize to the worker
                send the individuals to the worker
                index += sentSize
            endwhile
            inform all the available workers that they're done
            wait for all remaining responses
        endfunction

        when receiving a response:
            replace the evaluated individuals in the population

        // On worker side
        function parallel_evaluate( evaluation function f )
            size := wait for a sentSize as described above
            individuals := wait for size individuals
            apply f on each of them
            send back the individuals
        endfunction
        

Common parts

Specific parts

Generic parallel algorithm

The calls to the specific parts are the isFinished(), handleResponse(), sendTask() and processTask() hooks.


    // Master side
    function parallel_algorithm()
        while ! isFinished()
            worker := none
            while worker is none
                wait for a response and assign its origin to worker
                handleResponse( worker )
                worker = retrieve worker
            endwhile
            send worker a work order
            sendTask( worker )
        endwhile

        foreach available worker
            tell the worker it's done (send it a termination order)
        endforeach

        while all responses haven't been received
            worker := none
            wait for a response and assign its origin to worker
            handleResponse( worker )
            send worker a termination order
        endwhile
    endfunction

    // Worker side
    function parallel_algorithm()
        order := receive order
        while order is not termination order
            processTask( )
            order = receive order
        endwhile
    endfunction
    

TL;DR

Functors

Stores

Scheduling tasks between workers

Let's go back to evaluation in EO

Customizing evaluation: reminder


    int main( int argc, char **argv )
    {
        eo::mpi::Node::init( argc, argv );
        // PUT EO STUFF HERE
        // Let's make the assumption that pop is an eoPop<EOT>
        // and evalFunc is an evaluation functor
        eo::mpi::DynamicAssignmentAlgorithm assign;
        eoParallelPopLoopEval<EOT> popEval( assign, eo::mpi::DEFAULT_MASTER, evalFunc );
        // The store is hidden behind this call, but it can be given to the eoParallelPopLoopEval constructor!
        popEval( pop, pop );
    }
    

Customizing evaluation: the idea

Customizing evaluation: implementation!


    // Our objective is to minimize fitness, for instance
    struct CatBestAnswers : public eo::mpi::HandleResponseParallelApply<EOT>
    {
        CatBestAnswers()
        {
            best.fitness( 1000000000. );
        }

        void operator()(int wrkRank)
        {
            // Retrieve information about the slice processed by the worker
            int index = _data->assignedTasks[wrkRank].index;
            int size = _data->assignedTasks[wrkRank].size;
            // call to the wrapped function HERE
            (*_wrapped)( wrkRank );
            // Compare the fitness of each evaluated individual with the best saved so far
            // (with a minimizing fitness such as eoMinimizingFitness, operator< means "is worse than")
            for(int i = index; i < index+size; ++i)
            {
                if( best.fitness() < _data->table()[ i ].fitness() )
                {
                    eo::log << eo::quiet << "Better solution found:" << _data->table()[i].fitness() << std::endl;
                    best = _data->table()[ i ];
                }
            }
        }

        protected:

        EOT best;
    };
    

Using customized handler


        int main( int argc, char **argv )
        {
            eo::mpi::Node::init( argc, argv );
            // PUT EO STUFF HERE
            // Let's make the assumption that pop is an eoPop<EOT>
            // and evalFunc is an evaluation functor
            eo::mpi::DynamicAssignmentAlgorithm assign;
            // What was used before
            // eoParallelPopLoopEval<EOT> popEval( assign, eo::mpi::DEFAULT_MASTER, evalFunc );
            // What's new
            eo::mpi::ParallelApplyStore< EOT > store( evalFunc, eo::mpi::DEFAULT_MASTER );
            CatBestAnswers catBestAnswers;
            store.wrapHandleResponse( &catBestAnswers );

            eoParallelPopLoopEval< EOT > popEval( assign, eo::mpi::DEFAULT_MASTER, &store );
            // What doesn't change
            popEval( pop, pop );
        }
    

Thank you for your attention

Remarks
