    /*---------------------------------------------------------------------------*\
      =========                 |
      \\      /  F ield         | OpenFOAM: The Open Source CFD Toolbox
       \\    /   O peration     |
        \\  /    A nd           | Copyright (C) 2011-2016 OpenFOAM Foundation
         \\/     M anipulation  | Copyright (C) 2016 OpenCFD Ltd.
    -------------------------------------------------------------------------------
    License
        This file is part of OpenFOAM.

        OpenFOAM is free software: you can redistribute it and/or modify it
        under the terms of the GNU General Public License as published by
        the Free Software Foundation, either version 3 of the License, or
        (at your option) any later version.

        OpenFOAM is distributed in the hope that it will be useful, but WITHOUT
        ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
        FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
        for more details.

        You should have received a copy of the GNU General Public License
        along with OpenFOAM.  If not, see <http://www.gnu.org/licenses/>.

    \*---------------------------------------------------------------------------*/
    
    
    #include "PstreamReduceOps.H"
    #include "OSspecific.H"
    
    #include "PstreamGlobals.H"
    
    #include "SubList.H"
    
    #include "allReduce.H"
    
    #include <cstring>
    #include <cstdlib>
    #include <csignal>
    
    
    #if defined(WM_SP)
        #define MPI_SCALAR MPI_FLOAT
    #elif defined(WM_DP)
        #define MPI_SCALAR MPI_DOUBLE
    #endif

    // * * * * * * * * * * * * * * Static Data Members * * * * * * * * * * * * * //
    
    // file-scope: min value and default for mpiBufferSize
    static const int minBufferSize = 20000000;
    
    
    
    // * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
    
    
    // NOTE:
    // valid parallel options vary between implementations, but flag common ones.
    // if they are not removed by MPI_Init(), the subsequent argument processing
    // will notice that they are wrong
    
    void Foam::UPstream::addValidParOptions(HashTable<string>& validParOptions)
    {
        validParOptions.insert("np", "");
        validParOptions.insert("p4pg", "PI file");
        validParOptions.insert("p4wd", "directory");
        validParOptions.insert("p4amslave", "");
        validParOptions.insert("p4yourname", "hostname");
        validParOptions.insert("machinefile", "machine file");
    }
    
    
    
    bool Foam::UPstream::init(int& argc, char**& argv)
    {
        MPI_Init(&argc, &argv);
    
        int numprocs;
        MPI_Comm_size(MPI_COMM_WORLD, &numprocs);
    
        int myRank;
        MPI_Comm_rank(MPI_COMM_WORLD, &myRank);
    
        if (debug)
        {
            Pout<< "UPstream::init : initialised with numProcs:" << numprocs
                << " myRank:" << myRank << endl;
        }

        if (numprocs <= 1)
        {
            FatalErrorInFunction
                << "bool IPstream::init(int& argc, char**& argv) : "
                   "attempt to run parallel on 1 processor"
                << Foam::abort(FatalError);
        }
    
    
    
        // Initialise parallel structure
        setParRun(numprocs);

        {
            // Normally use UPstream::mpiBufferSize (optimisationSwitch),
            // but allow override with the MPI_BUFFER_SIZE env variable
            // which has an int value
            int bufSize = 0;

            const std::string str = Foam::getEnv("MPI_BUFFER_SIZE");
            if (str.empty() || !Foam::read(str.c_str(), bufSize) || bufSize <= 0)
            {
                bufSize = mpiBufferSize;
            }

            if (bufSize < minBufferSize)
            {
                bufSize = minBufferSize;
            }

            if (debug)
            {
                Pout<< "UPstream::init : mpi-buffer-size " << bufSize << endl;
            }

            MPI_Buffer_attach(new char[bufSize], bufSize);
        }

        return true;
    }


    void Foam::UPstream::exit(int errnum)
    {
        if (debug)
        {
            Pout<< "UPstream::exit." << endl;
        }
    
    
        {
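            // Detach and delete the MPI buffer that was attached in UPstream::init()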
            int size;
            char* buff;
            MPI_Buffer_detach(&buff, &size);
            delete[] buff;
        }
    
        if (PstreamGlobals::outstandingRequests_.size())
        {
            label n = PstreamGlobals::outstandingRequests_.size();
            PstreamGlobals::outstandingRequests_.clear();
    
    
                << "There are still " << n << " outstanding MPI_Requests." << endl
                << "This means that your code exited before doing a"
    
    mattijs's avatar
    mattijs committed
                << " UPstream::waitRequests()." << endl
    
                << "This should not happen for a normal code exit."
                << endl;
        }
    
    
        // Clean mpi communicators
        forAll(myProcNo_, communicator)
        {
            if (myProcNo_[communicator] != -1)
            {
                freePstreamCommunicator(communicator);
            }
        }
    
    
        if (errnum == 0)
        {
            MPI_Finalize();
            ::exit(errnum);
        }
        else
        {
            MPI_Abort(MPI_COMM_WORLD, errnum);
        }
    }
    
    
    
    void Foam::UPstream::abort()
    {
        MPI_Abort(MPI_COMM_WORLD, 1);
    }


    void Foam::reduce
    (
        scalar& Value,
        const sumOp<scalar>& bop,
        const int tag,
        const label communicator
    )
    {
        if (UPstream::warnComm != -1 && communicator != UPstream::warnComm)
        {
            Pout<< "** reducing:" << Value << " with comm:" << communicator
    
                << " warnComm:" << UPstream::warnComm
    
                << endl;
            error::printStack(Pout);
        }
    
        allReduce(Value, 1, MPI_SCALAR, MPI_SUM, bop, tag, communicator);
    }


    void Foam::reduce
    (
        scalar& Value,
        const minOp<scalar>& bop,
        const int tag,
        const label communicator
    )
    {
        if (UPstream::warnComm != -1 && communicator != UPstream::warnComm)
        {
            Pout<< "** reducing:" << Value << " with comm:" << communicator
    
                << " warnComm:" << UPstream::warnComm
    
                << endl;
            error::printStack(Pout);
        }
    
        allReduce(Value, 1, MPI_SCALAR, MPI_MIN, bop, tag, communicator);
    }


    void Foam::reduce
    (
        vector2D& Value,
        const sumOp<vector2D>& bop,
        const int tag,
        const label communicator
    )
    {
        if (UPstream::warnComm != -1 && communicator != UPstream::warnComm)
        {
            Pout<< "** reducing:" << Value << " with comm:" << communicator
    
                << " warnComm:" << UPstream::warnComm
    
                << endl;
            error::printStack(Pout);
        }
    
        allReduce(Value, 2, MPI_SCALAR, MPI_SUM, bop, tag, communicator);
    }


    void Foam::sumReduce
    (
        scalar& Value,
        label& Count,
    
        const int tag,
        const label communicator
    )
    {
        if (UPstream::warnComm != -1 && communicator != UPstream::warnComm)
        {
            Pout<< "** reducing:" << Value << " with comm:" << communicator
    
                << " warnComm:" << UPstream::warnComm
    
                << endl;
            error::printStack(Pout);
        }
    
        vector2D twoScalars(Value, scalar(Count));
    
        reduce(twoScalars, sumOp<vector2D>(), tag, communicator);
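        // After the reduction the x component holds the summed value and the
        // y component the summed count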
    
        Value = twoScalars.x();
        Count = twoScalars.y();
    }


    void Foam::reduce
    (
        scalar& Value,
        const sumOp<scalar>& bop,
        const int tag,
    
        const label communicator,
    
        label& requestID
    )
    {
    #ifdef MPIX_COMM_TYPE_SHARED
        // Assume mpich2 with non-blocking collectives extensions. Once mpi3
        // is available this will change.
        MPI_Request request;
        scalar v = Value;
        MPIX_Ireduce
        (
            &v,
            &Value,
            1,
            MPI_SCALAR,
            MPI_SUM,
            0,              //root
    
            PstreamGlobals::MPICommunicators_[communicator],
    
            &request
        );
    
        requestID = PstreamGlobals::outstandingRequests_.size();
        PstreamGlobals::outstandingRequests_.append(request);
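        // The request is stored globally; the caller can later wait on it,
        // e.g. via UPstream::waitRequest(requestID)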
    
        if (UPstream::debug)
    
        {
            Pout<< "UPstream::allocateRequest for non-blocking reduce"
                << " : request:" << requestID
                << endl;
        }
    #else
        // Non-blocking not yet implemented in mpi
    
        reduce(Value, bop, tag, communicator);
        requestID = -1;
    #endif
    }


    void Foam::UPstream::allToAll
    (
        const labelUList& sendData,
        labelUList& recvData,
        const label communicator
    )
    {
        label np = nProcs(communicator);
    
        if (sendData.size() != np || recvData.size() != np)
        {
            FatalErrorInFunction
                << "Size of sendData " << sendData.size()
                << " or size of recvData " << recvData.size()
                << " is not equal to the number of processors in the domain "
                << np
                << Foam::abort(FatalError);
        }
    
        if (!UPstream::parRun())
        {
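            // Not running in parallel: nothing to exchange, simply copy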
    
            recvData.deepCopy(sendData);
        }
        else
        {
            if
            (
                MPI_Alltoall
                (
                    // NOTE: const_cast is a temporary hack for
                    // backward-compatibility with versions of OpenMPI < 1.7.4
                    const_cast<label*>(sendData.begin()),
    
                    sizeof(label),
                    MPI_BYTE,
                    recvData.begin(),
                    sizeof(label),
                    MPI_BYTE,
                    PstreamGlobals::MPICommunicators_[communicator]
                )
            )
            {
                FatalErrorInFunction
                    << "MPI_Alltoall failed for " << sendData
                    << " on communicator " << communicator
                    << Foam::abort(FatalError);
            }
        }
    }
    
    
    
    void Foam::UPstream::allocatePstreamCommunicator
    (
        const label parentIndex,
        const label index
    )
    {
        if (index == PstreamGlobals::MPIGroups_.size())
        {
            // Extend storage with dummy values
    
            MPI_Group newGroup = MPI_GROUP_NULL;
    
            PstreamGlobals::MPIGroups_.append(newGroup);
            MPI_Comm newComm = MPI_COMM_NULL;
            PstreamGlobals::MPICommunicators_.append(newComm);
        }
        else if (index > PstreamGlobals::MPIGroups_.size())
        {
    
            FatalErrorInFunction
                << "PstreamGlobals out of sync with UPstream data. Problem."
    
                << Foam::exit(FatalError);
        }
    
    
        if (parentIndex == -1)
        {
            // Allocate world communicator
    
            if (index != UPstream::worldComm)
            {
    
                FatalErrorInFunction
                    << "world communicator should always be index "
    
                    << UPstream::worldComm << Foam::exit(FatalError);
            }
    
            PstreamGlobals::MPICommunicators_[index] = MPI_COMM_WORLD;
            MPI_Comm_group(MPI_COMM_WORLD, &PstreamGlobals::MPIGroups_[index]);
            MPI_Comm_rank
            (
                PstreamGlobals::MPICommunicators_[index],
               &myProcNo_[index]
            );
    
            // Set the number of processes to the actual number
            int numProcs;
            MPI_Comm_size(PstreamGlobals::MPICommunicators_[index], &numProcs);
    
    
            //procIDs_[index] = identity(numProcs);
            procIDs_[index].setSize(numProcs);
            forAll(procIDs_[index], i)
            {
                procIDs_[index][i] = i;
            }
    
        }
        else
        {
            // Create new group
            MPI_Group_incl
            (
                PstreamGlobals::MPIGroups_[parentIndex],
                procIDs_[index].size(),
                procIDs_[index].begin(),
               &PstreamGlobals::MPIGroups_[index]
            );
    
            // Create new communicator
            MPI_Comm_create
            (
                PstreamGlobals::MPICommunicators_[parentIndex],
                PstreamGlobals::MPIGroups_[index],
               &PstreamGlobals::MPICommunicators_[index]
            );
    
            if (PstreamGlobals::MPICommunicators_[index] == MPI_COMM_NULL)
            {
                myProcNo_[index] = -1;
            }
            else
        {
            if
            (
                MPI_Comm_rank
                    (
                        PstreamGlobals::MPICommunicators_[index],
                       &myProcNo_[index]
                    )
                )
                {
    
                    FatalErrorInFunction
                        << "Problem :"
    
                        << " when allocating communicator at " << index
                        << " from ranks " << procIDs_[index]
                        << " of parent " << parentIndex
                        << " cannot find my own rank"
                        << Foam::exit(FatalError);
                }
    
            }
        }
    }
    
    
    void Foam::UPstream::freePstreamCommunicator(const label communicator)
    {
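        // The world communicator is never freed here; it is managed by MPI itself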
        if (communicator != UPstream::worldComm)
        {
            if (PstreamGlobals::MPICommunicators_[communicator] != MPI_COMM_NULL)
            {
    
                // Free communicator. Sets communicator to MPI_COMM_NULL
    
                MPI_Comm_free(&PstreamGlobals::MPICommunicators_[communicator]);
            }
    
            if (PstreamGlobals::MPIGroups_[communicator] != MPI_GROUP_NULL)
            {
                // Free group. Sets group to MPI_GROUP_NULL
                MPI_Group_free(&PstreamGlobals::MPIGroups_[communicator]);
            }
        }
    }


    Foam::label Foam::UPstream::nRequests()
    {
        return PstreamGlobals::outstandingRequests_.size();
    }
    
    
    void Foam::UPstream::resetRequests(const label i)
    {
        if (i < PstreamGlobals::outstandingRequests_.size())
        {
            PstreamGlobals::outstandingRequests_.setSize(i);
        }
    }
    
    
    void Foam::UPstream::waitRequests(const label start)
    {
        if (debug)
        {
            Pout<< "UPstream::waitRequests : starting wait for "
    
                << PstreamGlobals::outstandingRequests_.size()-start
                << " outstanding requests starting at " << start << endl;
        }

        if (PstreamGlobals::outstandingRequests_.size())
        {
    
            SubList<MPI_Request> waitRequests
            (
                PstreamGlobals::outstandingRequests_,
                PstreamGlobals::outstandingRequests_.size() - start,
                start
            );
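            // waitRequests is a contiguous view of the outstanding requests from
            // 'start' onwards, so it can be passed directly to MPI_Waitall below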
    
    
            if
            (
                MPI_Waitall
                (
                    waitRequests.size(),
                    waitRequests.begin(),
    
                    MPI_STATUSES_IGNORE
                )
            )
            {
    
                FatalErrorInFunction
                    << "MPI_Waitall returned with error" << Foam::endl;
    
            }

            resetRequests(start);
        }

        if (debug)
        {
            Pout<< "UPstream::waitRequests : finished wait." << endl;
        }
    }


    void Foam::UPstream::waitRequest(const label i)
    {
        if (debug)
        {
            Pout<< "UPstream::waitRequest : starting wait for request:" << i
                << endl;
        }
    
        if (i >= PstreamGlobals::outstandingRequests_.size())
        {
    
            FatalErrorInFunction
                << "There are " << PstreamGlobals::outstandingRequests_.size()
    
                << " outstanding send requests and you are asking for i=" << i
                << nl
                << "Maybe you are mixing blocking/non-blocking comms?"
                << Foam::abort(FatalError);
        }
    
        if
        (
            MPI_Wait
            (
               &PstreamGlobals::outstandingRequests_[i],
                MPI_STATUS_IGNORE
            )
        )
        {
    
            FatalErrorInFunction
                << "MPI_Wait returned with error" << Foam::endl;
    
        }
    
        if (debug)
        {
            Pout<< "UPstream::waitRequest : finished wait for request:" << i
                << endl;
        }
    }
    
    
    
    bool Foam::UPstream::finishedRequest(const label i)
    {
        if (debug)
        {
            Pout<< "UPstream::finishedRequest : checking request:" << i
                << endl;
        }

        if (i >= PstreamGlobals::outstandingRequests_.size())
        {
    
            FatalErrorInFunction
                << "There are " << PstreamGlobals::outstandingRequests_.size()
    
                << " outstanding send requests and you are asking for i=" << i
                << nl
                << "Maybe you are mixing blocking/non-blocking comms?"
                << Foam::abort(FatalError);
        }
    
        int flag;
        MPI_Test
        (
           &PstreamGlobals::outstandingRequests_[i],
           &flag,
            MPI_STATUS_IGNORE
        );
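        // MPI_Test sets flag to non-zero if the request has completed; it never blocks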
    
    
        if (debug)
        {
    
            Pout<< "UPstream::finishedRequest : finished request:" << i
    
    int Foam::UPstream::allocateTag(const char* s)
    {
        int tag;
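        // Reuse a previously freed tag if one is available,
        // otherwise allocate the next new tag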
        if (PstreamGlobals::freedTags_.size())
        {
            tag = PstreamGlobals::freedTags_.remove();
        }
        else
        {
            tag = PstreamGlobals::nTags_++;
        }
    
        if (debug)
        {
            //if (UPstream::lateBlocking > 0)
            //{
            //    string& poutp = Pout.prefix();
            //    poutp[poutp.size()-2*(UPstream::lateBlocking+2)+tag] = 'X';
            //    Perr.prefix() = Pout.prefix();
            //}
            Pout<< "UPstream::allocateTag " << s
                << " : tag:" << tag
                << endl;
        }
    
        return tag;
    }
    
    
    int Foam::UPstream::allocateTag(const word& s)
    {
        int tag;
        if (PstreamGlobals::freedTags_.size())
        {
            tag = PstreamGlobals::freedTags_.remove();
        }
        else
        {
            tag = PstreamGlobals::nTags_++;
        }
    
        if (debug)
        {
            //if (UPstream::lateBlocking > 0)
            //{
            //    string& poutp = Pout.prefix();
            //    poutp[poutp.size()-2*(UPstream::lateBlocking+2)+tag] = 'X';
            //    Perr.prefix() = Pout.prefix();
            //}
            Pout<< "UPstream::allocateTag " << s
                << " : tag:" << tag
                << endl;
        }
    
        return tag;
    }
    
    
    void Foam::UPstream::freeTag(const char* s, const int tag)
    {
        if (debug)
        {
            //if (UPstream::lateBlocking > 0)
            //{
            //    string& poutp = Pout.prefix();
            //    poutp[poutp.size()-2*(UPstream::lateBlocking+2)+tag] = ' ';
            //    Perr.prefix() = Pout.prefix();
            //}
            Pout<< "UPstream::freeTag " << s << " tag:" << tag << endl;
        }
        PstreamGlobals::freedTags_.append(tag);
    }
    
    
    void Foam::UPstream::freeTag(const word& s, const int tag)
    {
        if (debug)
        {
            //if (UPstream::lateBlocking > 0)
            //{
            //    string& poutp = Pout.prefix();
            //    poutp[poutp.size()-2*(UPstream::lateBlocking+2)+tag] = ' ';
            //    Perr.prefix() = Pout.prefix();
            //}
            Pout<< "UPstream::freeTag " << s << " tag:" << tag << endl;
        }
        PstreamGlobals::freedTags_.append(tag);
    }
    
    
    
    // ************************************************************************* //