/*---------------------------------------------------------------------------*\
  =========                 |
  \\      /  F ield         | OpenFOAM: The Open Source CFD Toolbox
   \\    /   O peration     |
    \\  /    A nd           | Copyright (C) 2011-2016 OpenFOAM Foundation
     \\/     M anipulation  | Copyright (C) 2016 OpenCFD Ltd.
-------------------------------------------------------------------------------
License
    This file is part of OpenFOAM.

    OpenFOAM is free software: you can redistribute it and/or modify it
    under the terms of the GNU General Public License as published by
    the Free Software Foundation, either version 3 of the License, or
    (at your option) any later version.

    OpenFOAM is distributed in the hope that it will be useful, but WITHOUT
    ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
    FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
    for more details.

    You should have received a copy of the GNU General Public License
    along with OpenFOAM.  If not, see <http://www.gnu.org/licenses/>.

\*---------------------------------------------------------------------------*/
#include "Pstream.H"
#include "PstreamReduceOps.H"
#include "OSspecific.H"
#include <mpi.h>
#include <cstring>
#include <cstdlib>
#include <csignal>
#if defined(WM_SP)
    #define MPI_SCALAR MPI_FLOAT
#elif defined(WM_DP)
    #define MPI_SCALAR MPI_DOUBLE
#endif
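// Note (assumption): WM_SP/WM_DP are the usual wmake precision flags, so
// MPI_SCALAR is expected to match the compiled width of 'scalar' in the
// reduce/allReduce calls below.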
// * * * * * * * * * * * * * * Static Data Members * * * * * * * * * * * * * //
// file-scope: min value and default for mpiBufferSize
static const int minBufferSize = 20000000;
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
// NOTE:
// Valid parallel options vary between implementations, but flag common ones.
// If they are not removed by MPI_Init(), the subsequent argument processing
// will notice that they are wrong.
void Foam::UPstream::addValidParOptions(HashTable<string>& validParOptions)
{
validParOptions.insert("np", "");
validParOptions.insert("p4pg", "PI file");
validParOptions.insert("p4wd", "directory");
validParOptions.insert("p4amslave", "");
validParOptions.insert("p4yourname", "hostname");
validParOptions.insert("machinefile", "machine file");
}
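// Example (hedged sketch): implementation-specific launcher flags such as
// "-np" or "-machinefile" may survive on the command line; registering them
// above lets the later argument processing recognise and discard them, e.g.
//
//     mpirun -np 4 <solver> -parallel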
bool Foam::UPstream::init(int& argc, char**& argv)
{
MPI_Init(&argc, &argv);
int numprocs;
MPI_Comm_size(MPI_COMM_WORLD, &numprocs);
int myRank;
MPI_Comm_rank(MPI_COMM_WORLD, &myRank);
    if (debug)
    {
        Pout<< "UPstream::init : initialised with numProcs:" << numprocs
            << " myRank:" << myRank << endl;
    }

    if (numprocs <= 1)
    {
        FatalErrorInFunction
            << "attempt to run parallel on 1 processor"
            << Foam::abort(FatalError);
    }
// Initialise parallel structure
setParRun(numprocs);
// Normally use UPstream::mpiBufferSize (optimisationSwitch),
// but allow override with the MPI_BUFFER_SIZE env variable
// which has an int value
int bufSize = 0;
const std::string str = Foam::getEnv("MPI_BUFFER_SIZE");
if (str.empty() || !Foam::read(str.c_str(), bufSize) || bufSize <= 0)
    {
        bufSize = mpiBufferSize;
    }
if (bufSize < minBufferSize)
{
bufSize = minBufferSize;
}
if (debug)
{
Pout<< "UPstream::init : mpi-buffer-size " << bufSize << endl;
}
MPI_Buffer_attach(new char[bufSize], bufSize);
return true;
}
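// Example (hedged sketch): the attached bsend buffer defaults to the
// mpiBufferSize optimisationSwitch but can be enlarged at run-time through the
// environment; values below minBufferSize are raised to that minimum, e.g.
//
//     export MPI_BUFFER_SIZE=200000000
//     mpirun -np 4 <solver> -parallel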
void Foam::UPstream::exit(int errnum)
{
    if (debug)
{
Pout<< "UPstream::exit." << endl;
}
{
int size;
char* buff;
MPI_Buffer_detach(&buff, &size);
delete[] buff;
}
if (PstreamGlobals::outstandingRequests_.size())
{
label n = PstreamGlobals::outstandingRequests_.size();
PstreamGlobals::outstandingRequests_.clear();
<< "There are still " << n << " outstanding MPI_Requests." << endl
<< "This means that your code exited before doing a"
<< "This should not happen for a normal code exit."
<< endl;
}
// Clean mpi communicators
forAll(myProcNo_, communicator)
{
if (myProcNo_[communicator] != -1)
{
freePstreamCommunicator(communicator);
}
}
if (errnum == 0)
{
MPI_Finalize();
::exit(errnum);
}
else
{
MPI_Abort(MPI_COMM_WORLD, errnum);
}
}
void Foam::UPstream::abort()
{
MPI_Abort(MPI_COMM_WORLD, 1);
}
void Foam::reduce
(
scalar& Value,
const sumOp<scalar>& bop,
const int tag,
const label communicator
)
{
    if (UPstream::warnComm != -1 && communicator != UPstream::warnComm)
{
Pout<< "** reducing:" << Value << " with comm:" << communicator
<< " warnComm:" << UPstream::warnComm
<< endl;
error::printStack(Pout);
}
    allReduce(Value, 1, MPI_SCALAR, MPI_SUM, bop, tag, communicator);
}
void Foam::reduce
(
scalar& Value,
const minOp<scalar>& bop,
const int tag,
const label communicator
)
{
    if (UPstream::warnComm != -1 && communicator != UPstream::warnComm)
{
Pout<< "** reducing:" << Value << " with comm:" << communicator
<< " warnComm:" << UPstream::warnComm
<< endl;
error::printStack(Pout);
}
    allReduce(Value, 1, MPI_SCALAR, MPI_MIN, bop, tag, communicator);
}
void Foam::reduce
(
vector2D& Value,
const sumOp<vector2D>& bop,
const int tag,
const label communicator
)
{
    if (UPstream::warnComm != -1 && communicator != UPstream::warnComm)
{
Pout<< "** reducing:" << Value << " with comm:" << communicator
<< " warnComm:" << UPstream::warnComm
<< endl;
error::printStack(Pout);
}
    allReduce(Value, 2, MPI_SCALAR, MPI_SUM, bop, tag, communicator);
}
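// Example (hedged sketch): callers normally go through the convenience
// overloads in PstreamReduceOps.H (assumed here), which supply the default
// tag and communicator, e.g.
//
//     scalar s = localContribution;       // hypothetical per-processor value
//     Foam::reduce(s, sumOp<scalar>());   // s becomes the global sum on all ranks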
void Foam::sumReduce
(
scalar& Value,
label& Count,
const int tag,
const label communicator
)
{
    if (UPstream::warnComm != -1 && communicator != UPstream::warnComm)
{
Pout<< "** reducing:" << Value << " with comm:" << communicator
<< " warnComm:" << UPstream::warnComm
<< endl;
error::printStack(Pout);
}
vector2D twoScalars(Value, scalar(Count));
reduce(twoScalars, sumOp<vector2D>(), tag, communicator);
Value = twoScalars.x();
    Count = twoScalars.y();
}
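// Note: sumReduce packs the value and its count into a single vector2D so one
// allReduce serves both sums. Example (hedged sketch), e.g. forming a global
// average:
//
//     scalar v = localSum;     // hypothetical per-processor sum
//     label n = localCount;    // hypothetical per-processor count
//     sumReduce(v, n);         // v = global sum, n = global count
//     const scalar mean = v/n;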
void Foam::reduce
(
scalar& Value,
const sumOp<scalar>& bop,
const int tag,
const label communicator,
label& requestID
)
{
#ifdef MPIX_COMM_TYPE_SHARED
// Assume mpich2 with non-blocking collectives extensions. Once mpi3
// is available this will change.
MPI_Request request;
scalar v = Value;
MPIX_Ireduce
(
&v,
&Value,
1,
MPI_SCALAR,
MPI_SUM,
0, //root
PstreamGlobals::MPICommunicators_[communicator],
&request
);
requestID = PstreamGlobals::outstandingRequests_.size();
PstreamGlobals::outstandingRequests_.append(request);
    if (debug)
    {
        Pout<< "UPstream::allocateRequest for non-blocking reduce"
            << " : request:" << requestID
            << endl;
    }
#else
// Non-blocking not yet implemented in mpi
reduce(Value, bop, tag, communicator);
requestID = -1;
#endif
}
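// Example (hedged sketch): the requestID overload returns immediately; the
// caller should wait on the request (when one was issued) before using the
// reduced value, e.g.
//
//     scalar s = localContribution;   // hypothetical per-processor value
//     label req = -1;
//     Foam::reduce(s, sumOp<scalar>(), Pstream::msgType(), UPstream::worldComm, req);
//     // ... overlap other work ...
//     if (req != -1)
//     {
//         UPstream::waitRequest(req);
//     }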
void Foam::UPstream::allToAll
(
const labelUList& sendData,
labelUList& recvData,
const label communicator
)
{
label np = nProcs(communicator);
if (sendData.size() != np || recvData.size() != np)
{
FatalErrorInFunction
<< "Size of sendData " << sendData.size()
<< " or size of recvData " << recvData.size()
<< " is not equal to the number of processors in the domain "
<< np
<< Foam::abort(FatalError);
}
if (!UPstream::parRun())
{
recvData.deepCopy(sendData);
}
else
{
if
(
MPI_Alltoall
(
// NOTE: const_cast is a temporary hack for
// backward-compatibility with versions of OpenMPI < 1.7.4
const_cast<label*>(sendData.begin()),
sizeof(label),
MPI_BYTE,
recvData.begin(),
sizeof(label),
MPI_BYTE,
PstreamGlobals::MPICommunicators_[communicator]
)
)
{
FatalErrorInFunction
<< "MPI_Alltoall failed for " << sendData
<< " on communicator " << communicator
<< Foam::abort(FatalError);
}
}
}
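// Example (hedged sketch): allToAll is typically used to exchange per-processor
// transfer sizes ahead of the actual data exchange (assuming the default
// communicator argument), e.g.
//
//     labelList sendSizes(UPstream::nProcs(), 0);   // what this rank sends to each rank
//     labelList recvSizes(UPstream::nProcs());
//     // ... fill sendSizes ...
//     UPstream::allToAll(sendSizes, recvSizes);     // what each rank sends to this one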
void Foam::UPstream::allocatePstreamCommunicator
(
const label parentIndex,
const label index
)
{
if (index == PstreamGlobals::MPIGroups_.size())
{
// Extend storage with dummy values
MPI_Group newGroup = MPI_GROUP_NULL;
PstreamGlobals::MPIGroups_.append(newGroup);
MPI_Comm newComm = MPI_COMM_NULL;
PstreamGlobals::MPICommunicators_.append(newComm);
}
else if (index > PstreamGlobals::MPIGroups_.size())
{
FatalErrorInFunction
<< "PstreamGlobals out of sync with UPstream data. Problem."
<< Foam::exit(FatalError);
}
if (parentIndex == -1)
{
// Allocate world communicator
if (index != UPstream::worldComm)
{
FatalErrorInFunction
<< "world communicator should always be index "
<< UPstream::worldComm << Foam::exit(FatalError);
}
PstreamGlobals::MPICommunicators_[index] = MPI_COMM_WORLD;
MPI_Comm_group(MPI_COMM_WORLD, &PstreamGlobals::MPIGroups_[index]);
MPI_Comm_rank
(
PstreamGlobals::MPICommunicators_[index],
&myProcNo_[index]
);
// Set the number of processes to the actual number
int numProcs;
MPI_Comm_size(PstreamGlobals::MPICommunicators_[index], &numProcs);
//procIDs_[index] = identity(numProcs);
procIDs_[index].setSize(numProcs);
forAll(procIDs_[index], i)
{
procIDs_[index][i] = i;
}
}
else
{
// Create new group
MPI_Group_incl
(
PstreamGlobals::MPIGroups_[parentIndex],
procIDs_[index].size(),
procIDs_[index].begin(),
&PstreamGlobals::MPIGroups_[index]
);
// Create new communicator
MPI_Comm_create
(
PstreamGlobals::MPICommunicators_[parentIndex],
PstreamGlobals::MPIGroups_[index],
&PstreamGlobals::MPICommunicators_[index]
);
if (PstreamGlobals::MPICommunicators_[index] == MPI_COMM_NULL)
{
myProcNo_[index] = -1;
}
else
{
            if
            (
MPI_Comm_rank
(
PstreamGlobals::MPICommunicators_[index],
&myProcNo_[index]
)
)
{
FatalErrorInFunction
<< "Problem :"
<< " when allocating communicator at " << index
<< " from ranks " << procIDs_[index]
<< " of parent " << parentIndex
<< " cannot find my own rank"
<< Foam::exit(FatalError);
}
}
}
}
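// Example (hedged sketch): sub-communicators are normally requested through
// UPstream::allocateCommunicator, which ends up in the group/communicator
// creation above, e.g.
//
//     labelList subRanks(identity(UPstream::nProcs()/2));   // first half of the ranks
//     label subComm = UPstream::allocateCommunicator(UPstream::worldComm, subRanks);
//     // ... collectives restricted to subComm ...
//     UPstream::freeCommunicator(subComm);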
void Foam::UPstream::freePstreamCommunicator(const label communicator)
{
if (communicator != UPstream::worldComm)
{
if (PstreamGlobals::MPICommunicators_[communicator] != MPI_COMM_NULL)
{
// Free communicator. Sets communicator to MPI_COMM_NULL
MPI_Comm_free(&PstreamGlobals::MPICommunicators_[communicator]);
}
if (PstreamGlobals::MPIGroups_[communicator] != MPI_GROUP_NULL)
{
            // Free group. Sets group to MPI_GROUP_NULL
            MPI_Group_free(&PstreamGlobals::MPIGroups_[communicator]);
        }
    }
}
Foam::label Foam::UPstream::nRequests()
{
return PstreamGlobals::outstandingRequests_.size();
}
void Foam::UPstream::resetRequests(const label i)
{
if (i < PstreamGlobals::outstandingRequests_.size())
{
PstreamGlobals::outstandingRequests_.setSize(i);
}
}
void Foam::UPstream::waitRequests(const label start)
{
    if (debug)
{
Pout<< "UPstream::waitRequests : starting wait for "
            << PstreamGlobals::outstandingRequests_.size()-start
            << " outstanding requests starting at " << start << endl;
    }

    if (PstreamGlobals::outstandingRequests_.size())
{
        SubList<MPI_Request> waitRequests
        (
            PstreamGlobals::outstandingRequests_,
            PstreamGlobals::outstandingRequests_.size() - start,
            start
        );

        if
        (
            MPI_Waitall
            (
                waitRequests.size(),
                waitRequests.begin(),
                MPI_STATUSES_IGNORE
)
)
{
FatalErrorInFunction
<< "MPI_Waitall returned with error" << Foam::endl;
if (debug)
{
Pout<< "UPstream::waitRequests : finished wait." << endl;
    }
}
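// Example (hedged sketch): the usual non-blocking pattern records the request
// count first and then waits only for the requests it started itself, e.g.
//
//     const label startReq = UPstream::nRequests();
//     // ... start UOPstream/UIPstream transfers in nonBlocking mode ...
//     UPstream::waitRequests(startReq);   // wait only for the transfers above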
void Foam::UPstream::waitRequest(const label i)
{
if (debug)
{
Pout<< "UPstream::waitRequest : starting wait for request:" << i
<< endl;
}
if (i >= PstreamGlobals::outstandingRequests_.size())
{
FatalErrorInFunction
<< "There are " << PstreamGlobals::outstandingRequests_.size()
<< " outstanding send requests and you are asking for i=" << i
<< nl
<< "Maybe you are mixing blocking/non-blocking comms?"
<< Foam::abort(FatalError);
}
if
(
MPI_Wait
(
&PstreamGlobals::outstandingRequests_[i],
MPI_STATUS_IGNORE
)
)
{
FatalErrorInFunction
<< "MPI_Wait returned with error" << Foam::endl;
}
if (debug)
{
Pout<< "UPstream::waitRequest : finished wait for request:" << i
<< endl;
}
}
Pout<< "UPstream::finishedRequest : checking request:" << i
if (i >= PstreamGlobals::outstandingRequests_.size())
{
FatalErrorInFunction
<< "There are " << PstreamGlobals::outstandingRequests_.size()
<< " outstanding send requests and you are asking for i=" << i
<< nl
<< "Maybe you are mixing blocking/non-blocking comms?"
<< Foam::abort(FatalError);
}
int flag;
MPI_Test
(
&PstreamGlobals::outstandingRequests_[i],
&flag,
MPI_STATUS_IGNORE
);
Pout<< "UPstream::finishedRequest : finished request:" << i
int Foam::UPstream::allocateTag(const char* s)
{
int tag;
if (PstreamGlobals::freedTags_.size())
{
tag = PstreamGlobals::freedTags_.remove();
}
else
{
tag = PstreamGlobals::nTags_++;
}
if (debug)
{
//if (UPstream::lateBlocking > 0)
//{
// string& poutp = Pout.prefix();
// poutp[poutp.size()-2*(UPstream::lateBlocking+2)+tag] = 'X';
// Perr.prefix() = Pout.prefix();
//}
Pout<< "UPstream::allocateTag " << s
<< " : tag:" << tag
<< endl;
}
return tag;
}
int Foam::UPstream::allocateTag(const word& s)
{
int tag;
if (PstreamGlobals::freedTags_.size())
{
tag = PstreamGlobals::freedTags_.remove();
}
else
{
tag = PstreamGlobals::nTags_++;
}
if (debug)
{
//if (UPstream::lateBlocking > 0)
//{
// string& poutp = Pout.prefix();
// poutp[poutp.size()-2*(UPstream::lateBlocking+2)+tag] = 'X';
// Perr.prefix() = Pout.prefix();
//}
Pout<< "UPstream::allocateTag " << s
<< " : tag:" << tag
<< endl;
}
return tag;
}
void Foam::UPstream::freeTag(const char* s, const int tag)
{
if (debug)
{
//if (UPstream::lateBlocking > 0)
//{
// string& poutp = Pout.prefix();
// poutp[poutp.size()-2*(UPstream::lateBlocking+2)+tag] = ' ';
// Perr.prefix() = Pout.prefix();
//}
Pout<< "UPstream::freeTag " << s << " tag:" << tag << endl;
}
PstreamGlobals::freedTags_.append(tag);
}
void Foam::UPstream::freeTag(const word& s, const int tag)
{
if (debug)
{
//if (UPstream::lateBlocking > 0)
//{
// string& poutp = Pout.prefix();
// poutp[poutp.size()-2*(UPstream::lateBlocking+2)+tag] = ' ';
// Perr.prefix() = Pout.prefix();
//}
Pout<< "UPstream::freeTag " << s << " tag:" << tag << endl;
}
PstreamGlobals::freedTags_.append(tag);
}
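// Example (hedged sketch): tags are recycled via freedTags_, so each
// allocateTag should be paired with a freeTag once the associated
// communication has completed, e.g.
//
//     const int tag = UPstream::allocateTag("myExchange");   // hypothetical label
//     // ... sends/receives using this tag ...
//     UPstream::freeTag("myExchange", tag);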
// ************************************************************************* //