/*---------------------------------------------------------------------------*\
  =========                 |
  \\      /  F ield         | OpenFOAM: The Open Source CFD Toolbox
   \\    /   O peration     |
    \\  /    A nd           | Copyright (C) 2011-2012 OpenFOAM Foundation
     \\/     M anipulation  |
-------------------------------------------------------------------------------
License
    This file is part of OpenFOAM.

    OpenFOAM is free software: you can redistribute it and/or modify it
    under the terms of the GNU General Public License as published by
    the Free Software Foundation, either version 3 of the License, or
    (at your option) any later version.

    OpenFOAM is distributed in the hope that it will be useful, but WITHOUT
    ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
    FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
    for more details.

    You should have received a copy of the GNU General Public License
    along with OpenFOAM.  If not, see <http://www.gnu.org/licenses/>.

\*---------------------------------------------------------------------------*/

#include "mpi.h"

#include "UPstream.H"
#include "PstreamReduceOps.H"
#include "OSspecific.H"
#include "PstreamGlobals.H"
#include "SubList.H"

#include <cstring>
#include <cstdlib>
#include <csignal>

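// Map OpenFOAM's compile-time scalar precision (WM_SP = single, WM_DP = double)
// onto the corresponding MPI datatype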
#if defined(WM_SP)
#   define MPI_SCALAR MPI_FLOAT
#elif defined(WM_DP)
#   define MPI_SCALAR MPI_DOUBLE
#endif

// * * * * * * * * * * * * * * Static Data Members * * * * * * * * * * * * * //

namespace Foam
{
    typedef struct
    {
        scalar value;
        label count;
    } CountAndValue;

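    // User-defined MPI reduction operation: element-wise sum of the value
    // and count fields of each CountAndValue (registered with MPI_Op_create
    // in sumReduce below)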
    void reduceSum
    (
        void *in,
        void *inOut,
        int *len,
        MPI_Datatype *dptr
    )
    {
        CountAndValue* inPtr =
            reinterpret_cast<CountAndValue*>(in);
        CountAndValue* inOutPtr =
            reinterpret_cast<CountAndValue*>(inOut);

        for (int i=0; i< *len; ++i)
        {
            inOutPtr->value += inPtr->value;
            inOutPtr->count += inPtr->count;
            inPtr++;
            inOutPtr++;
        }
    }
}


// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //

// NOTE:
// valid parallel options vary between implementations, but flag common ones.
// if they are not removed by MPI_Init(), the subsequent argument processing
// will notice that they are wrong
void Foam::UPstream::addValidParOptions(HashTable<string>& validParOptions)
{
    validParOptions.insert("np", "");
    validParOptions.insert("p4pg", "PI file");
    validParOptions.insert("p4wd", "directory");
    validParOptions.insert("p4amslave", "");
    validParOptions.insert("p4yourname", "hostname");
    validParOptions.insert("GAMMANP", "number of instances");
    validParOptions.insert("machinefile", "machine file");
}


bool Foam::UPstream::init(int& argc, char**& argv)
{
    MPI_Init(&argc, &argv);

    int numprocs;
    MPI_Comm_size(MPI_COMM_WORLD, &numprocs);
    MPI_Comm_rank(MPI_COMM_WORLD, &myProcNo_);

    if (debug)
    {
        Pout<< "UPstream::init : initialised with numProcs:" << numprocs
            << " myProcNo:" << myProcNo_ << endl;
    }

    if (numprocs <= 1)
    {
        FatalErrorIn("UPstream::init(int& argc, char**& argv)")
            << "bool IPstream::init(int& argc, char**& argv) : "
               "attempt to run parallel on 1 processor"
            << Foam::abort(FatalError);
    }

    procIDs_.setSize(numprocs);

    forAll(procIDs_, procNo)
    {
        procIDs_[procNo] = procNo;
    }

    setParRun();

#   ifndef SGIMPI
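    // Attach an MPI buffer for buffered sends, sized by the MPI_BUFFER_SIZE
    // environment variable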
    string bufferSizeName = getEnv("MPI_BUFFER_SIZE");

    if (bufferSizeName.size())
    {
        int bufferSize = atoi(bufferSizeName.c_str());

        if (bufferSize)
        {
            MPI_Buffer_attach(new char[bufferSize], bufferSize);
        }
    }
    else
    {
        FatalErrorIn("UPstream::init(int& argc, char**& argv)")
            << "UPstream::init(int& argc, char**& argv) : "
            << "environment variable MPI_BUFFER_SIZE not defined"
            << Foam::abort(FatalError);
    }
#   endif

    int processorNameLen;
    char processorName[MPI_MAX_PROCESSOR_NAME];

    MPI_Get_processor_name(processorName, &processorNameLen);

    //signal(SIGABRT, stop);

    // Now that nprocs is known construct communication tables.
    initCommunicationSchedule();

    return true;
}


void Foam::UPstream::exit(int errnum)
{
    if (debug)
    {
        Pout<< "UPstream::exit." << endl;
    }

#   ifndef SGIMPI
    int size;
    char* buff;
    MPI_Buffer_detach(&buff, &size);
    delete[] buff;
#   endif

    if (PstreamGlobals::outstandingRequests_.size())
    {
        label n = PstreamGlobals::outstandingRequests_.size();
        PstreamGlobals::outstandingRequests_.clear();

        WarningIn("UPstream::exit(int)")
            << "There are still " << n << " outstanding MPI_Requests." << endl
            << "This means that your code exited before doing a"
            << " UPstream::waitRequests()." << endl
            << "This should not happen for a normal code exit."
            << endl;
    }

    if (errnum == 0)
    {
        MPI_Finalize();
        ::exit(errnum);
    }
    else
    {
        MPI_Abort(MPI_COMM_WORLD, errnum);
    }
}


void Foam::UPstream::abort()
{
    MPI_Abort(MPI_COMM_WORLD, 1);
}


void Foam::reduce(scalar& Value, const sumOp<scalar>& bop, const int tag)
{
    if (Pstream::debug)
    {
        Pout<< "Foam::reduce : value:" << Value << endl;
    }

    if (!UPstream::parRun())
    {
        return;
    }

    if (UPstream::nProcs() <= UPstream::nProcsSimpleSum)
    {
        // Gather values from the slaves and accumulate on the master
        if (UPstream::master())
        {
            for
            (
                int slave=UPstream::firstSlave();
                slave<=UPstream::lastSlave();
                slave++
            )
            {
                scalar value;

                if
                (
                    MPI_Recv
                    (
                        &value,
                        1,
                        MPI_SCALAR,
                        UPstream::procID(slave),
                        tag,
                        MPI_COMM_WORLD,
                        MPI_STATUS_IGNORE
                    )
                )
                {
                    FatalErrorIn
                    (
                        "reduce(scalar& Value, const sumOp<scalar>& sumOp)"
                    )   << "MPI_Recv failed"
                        << Foam::abort(FatalError);
                }

                Value = bop(Value, value);
            }
        }
        else
        {
            if
            (
                MPI_Send
                (
                    &Value,
                    1,
                    MPI_SCALAR,
                    UPstream::procID(UPstream::masterNo()),
                    tag,
                    MPI_COMM_WORLD
                )
            )
            {
                FatalErrorIn
                (
                    "reduce(scalar& Value, const sumOp<scalar>& sumOp)"
                )   << "MPI_Send failed"
                    << Foam::abort(FatalError);
            }
        }


        // Distribute the reduced value from the master back to the slaves
        if (UPstream::master())
        {
            for
            (
                int slave=UPstream::firstSlave();
                slave<=UPstream::lastSlave();
                slave++
            )
            {
                if
                (
                    MPI_Send
                    (
                        &Value,
                        1,
                        MPI_SCALAR,
                        UPstream::procID(slave),
                        tag,
                        MPI_COMM_WORLD
                    )
                )
                {
                    FatalErrorIn
                    (
                        "reduce(scalar& Value, const sumOp<scalar>& sumOp)"
                    )   << "MPI_Send failed"
                        << Foam::abort(FatalError);
                }
            }
        }
        else
        {
            if
            (
                MPI_Recv
                (
                    &Value,
                    1,
                    MPI_SCALAR,
                    UPstream::procID(UPstream::masterNo()),
                    tag,
                    MPI_COMM_WORLD,
                    MPI_STATUS_IGNORE
                )
            )
            {
                FatalErrorIn
                (
                    "reduce(scalar& Value, const sumOp<scalar>& sumOp)"
                )   << "MPI_Recv failed"
                    << Foam::abort(FatalError);
            }
        }
    }
    else
    {
        scalar sum;
        MPI_Allreduce(&Value, &sum, 1, MPI_SCALAR, MPI_SUM, MPI_COMM_WORLD);
        Value = sum;
    }

    if (Pstream::debug)
    {
        Pout<< "Foam::reduce : reduced value:" << Value << endl;
    }
}


void Foam::reduce(vector2D& Value, const sumOp<vector2D>& bop, const int tag)
{
    if (Pstream::debug)
    {
        Pout<< "Foam::reduce : value:" << Value << endl;
    }

    if (!UPstream::parRun())
    {
        return;
    }

    if (UPstream::nProcs() <= UPstream::nProcsSimpleSum)
    {
        if (UPstream::master())
        {
            for
            (
                int slave=UPstream::firstSlave();
                slave<=UPstream::lastSlave();
                slave++
            )
            {
                vector2D value;

                if
                (
                    MPI_Recv
                    (
                        &value,
                        2,
                        MPI_SCALAR,
                        UPstream::procID(slave),
                        tag,
                        MPI_COMM_WORLD,
                        MPI_STATUS_IGNORE
                    )
                )
                {
                    FatalErrorIn
                    (
                        "reduce(vector2D& Value, const sumOp<vector2D>& sumOp)"
                    )   << "MPI_Recv failed"
                        << Foam::abort(FatalError);
                }

                Value = bop(Value, value);
            }
        }
        else
        {
            if
            (
                MPI_Send
                (
                    &Value,
                    2,
                    MPI_SCALAR,
                    UPstream::procID(UPstream::masterNo()),
                    tag,
                    MPI_COMM_WORLD
                )
            )
            {
                FatalErrorIn
                (
                    "reduce(vector2D& Value, const sumOp<vector2D>& sumOp)"
                )   << "MPI_Send failed"
                    << Foam::abort(FatalError);
            }
        }


        if (UPstream::master())
        {
            for
            (
                int slave=UPstream::firstSlave();
                slave<=UPstream::lastSlave();
                slave++
            )
            {
                if
                (
                    MPI_Send
                    (
                        &Value,
                        2,
                        MPI_SCALAR,
                        UPstream::procID(slave),
                        tag,
                        MPI_COMM_WORLD
                    )
                )
                {
                    FatalErrorIn
                    (
                        "reduce(vector2D& Value, const sumOp<vector2D>& sumOp)"
                    )   << "MPI_Send failed"
                        << Foam::abort(FatalError);
                }
            }
        }
        else
        {
            if
            (
                MPI_Recv
                (
                    &Value,
                    2,
                    MPI_SCALAR,
                    UPstream::procID(UPstream::masterNo()),
                    tag,
                    MPI_COMM_WORLD,
                    MPI_STATUS_IGNORE
                )
            )
            {
                FatalErrorIn
                (
                    "reduce(vector2D& Value, const sumOp<vector2D>& sumOp)"
                )   << "MPI_Recv failed"
                    << Foam::abort(FatalError);
            }
        }
    }
    else
    {
        vector2D sum;
        MPI_Allreduce(&Value, &sum, 2, MPI_SCALAR, MPI_SUM, MPI_COMM_WORLD);
        Value = sum;
    }

    if (Pstream::debug)
    {
        Pout<< "Foam::reduce : reduced value:" << Value << endl;
    }
}

void Foam::sumReduce
(
    scalar& Value,
    label& Count,
    const int tag
)
{
    static bool hasDataType_ = false;
    static MPI_Datatype mesg_mpi_strct_;
    static MPI_Op myOp_;

    if (Pstream::debug)
    {
        Pout<< "Foam::sumReduce : value:" << Value
            << " count:" << Count << endl;
    }

    if (!UPstream::parRun())
    {
        return;
    }

    if (UPstream::nProcs() <= UPstream::nProcsSimpleSum)
    {
        reduce(Value, sumOp<scalar>(), tag);
        reduce(Count, sumOp<label>(), tag);
    }
    else
    {
        CountAndValue in,out;

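        // On first use, build and commit an MPI struct datatype matching
        // CountAndValue and register the summing operation; both persist
        // for the rest of the run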
        if (!hasDataType_)
        {
            int lengths[2];
            lengths[0] = 1;
            lengths[1] = 1;
            MPI_Datatype types[2];
            types[0] = MPI_SCALAR;
            types[1] = MPI_INT;
            MPI_Aint addresses[2];
            MPI_Address(&in.value, &addresses[0]);
            MPI_Address(&in.count, &addresses[1]);
            MPI_Aint offsets[2];
            offsets[0] = 0;
            offsets[1] = addresses[1]-addresses[0];

            if
            (
                MPI_Type_create_struct
                (
                    2,
                    lengths,
                    offsets,
                    types,
                    &mesg_mpi_strct_
                )
            )
            {
                FatalErrorIn("Foam::sumReduce()")
                    << "MPI_Type_create_struct" << abort(FatalError);
            }
            if (MPI_Type_commit(&mesg_mpi_strct_))
            {
                FatalErrorIn("Foam::sumReduce()")
                    << "MPI_Type_commit" << abort(FatalError);
            }
            if (MPI_Op_create(reduceSum, true, &myOp_))
            {
                FatalErrorIn("Foam::sumReduce()")
                    << "MPI_Op_create" << abort(FatalError);
            }

            hasDataType_ = true;
        }

        in.value = Value;
        in.count = Count;
        if
        (
            MPI_Allreduce
            (
                &in,
                &out,
                1,
                mesg_mpi_strct_,
                myOp_,
                MPI_COMM_WORLD
            )
        )
        {
            FatalErrorIn("Foam::sumReduce(..)")
                << "Problem." << abort(FatalError);
        }
        Value = out.value;
        Count = out.count;
    }

    if (Pstream::debug)
    {
        Pout<< "Foam::reduce : reduced value:" << Value
            << " reduced count:" << Count << endl;
    }
}


void Foam::reduce
(
    scalar& Value,
    const sumOp<scalar>& bop,
    const int tag,
    label& requestID
)
{
#ifdef MPIX_COMM_TYPE_SHARED
    // Assume mpich2 with non-blocking collectives extensions. Once mpi3
    // is available this will change.
    MPI_Request request;
    scalar v = Value;
    MPIX_Ireduce
    (
        &v,
        &Value,
        1,
        MPI_SCALAR,
        MPI_SUM,
        0,              //root
        MPI_COMM_WORLD,
        &request
    );

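    // Record the request index so the caller can wait on it later via
    // UPstream::waitRequest(requestID)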
    requestID = PstreamGlobals::outstandingRequests_.size();
    PstreamGlobals::outstandingRequests_.append(request);
#else
    // Non-blocking reduce not available in this MPI; fall back to the
    // blocking version
    reduce(Value, bop, tag);
    requestID = -1;
#endif
}


Foam::label Foam::UPstream::nRequests()
{
    return PstreamGlobals::outstandingRequests_.size();
}


void Foam::UPstream::resetRequests(const label i)
{
    if (i < PstreamGlobals::outstandingRequests_.size())
    {
        PstreamGlobals::outstandingRequests_.setSize(i);
    }
}


void Foam::UPstream::waitRequests(const label start)
{
    if (debug)
    {
        Pout<< "UPstream::waitRequests : starting wait for "
            << PstreamGlobals::outstandingRequests_.size()-start
            << " outstanding requests starting at " << start << endl;
    if (PstreamGlobals::outstandingRequests_.size())
    {
        SubList<MPI_Request> waitRequests
        (
            PstreamGlobals::outstandingRequests_,
            PstreamGlobals::outstandingRequests_.size() - start,
            start
        );

        if
        (
            MPI_Waitall
            (
                waitRequests.size(),
                waitRequests.begin(),
                MPI_STATUSES_IGNORE
            )
        )
        {
            FatalErrorIn
            (
                "UPstream::waitRequests()"
            )   << "MPI_Waitall returned with error" << Foam::endl;
        }

        resetRequests(start);
    }

    if (debug)
    {
        Pout<< "UPstream::waitRequests : finished wait." << endl;
    }
}

void Foam::UPstream::waitRequest(const label i)
{
    if (debug)
    {
        Pout<< "UPstream::waitRequest : starting wait for request:" << i
            << endl;
    }

    if (i >= PstreamGlobals::outstandingRequests_.size())
    {
        FatalErrorIn
        (
            "UPstream::waitRequest(const label)"
        )   << "There are " << PstreamGlobals::outstandingRequests_.size()
            << " outstanding send requests and you are asking for i=" << i
            << nl
            << "Maybe you are mixing blocking/non-blocking comms?"
            << Foam::abort(FatalError);
    }

    if
    (
        MPI_Wait
        (
           &PstreamGlobals::outstandingRequests_[i],
            MPI_STATUS_IGNORE
        )
    )
    {
        FatalErrorIn
        (
            "UPstream::waitRequest()"
        )   << "MPI_Wait returned with error" << Foam::endl;
    }

    if (debug)
    {
        Pout<< "UPstream::waitRequest : finished wait for request:" << i
            << endl;
    }
}


bool Foam::UPstream::finishedRequest(const label i)
{
    if (debug)
    {
        Pout<< "UPstream::waitRequests : checking finishedRequest:" << i
    if (i >= PstreamGlobals::outstandingRequests_.size())
    {
        FatalErrorIn
        (
            "UPstream::finishedRequest(const label)"
        )   << "There are " << PstreamGlobals::outstandingRequests_.size()
            << " outstanding send requests and you are asking for i=" << i
            << nl
            << "Maybe you are mixing blocking/non-blocking comms?"
            << Foam::abort(FatalError);
    }

    int flag;
    MPI_Test
    (
       &PstreamGlobals::outstandingRequests_[i],
       &flag,
        MPI_STATUS_IGNORE
    );

    if (debug)
    {
        Pout<< "UPstream::waitRequests : finished finishedRequest:" << i
// ************************************************************************* //