/*---------------------------------------------------------------------------*\
  =========                 |
  \\      /  F ield         | OpenFOAM: The Open Source CFD Toolbox
   \\    /   O peration     |
    \\  /    A nd           | Copyright (C) 2011-2017 OpenFOAM Foundation
     \\/     M anipulation  | Copyright (C) 2016 OpenCFD Ltd.
-------------------------------------------------------------------------------
License
This file is part of OpenFOAM.
OpenFOAM is free software: you can redistribute it and/or modify it
under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
OpenFOAM is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
for more details.
You should have received a copy of the GNU General Public License
along with OpenFOAM. If not, see <http://www.gnu.org/licenses/>.
\*---------------------------------------------------------------------------*/
#include "Pstream.H"
#include "PstreamReduceOps.H"
#include "OSspecific.H"
#include "collatedFileOperation.H"
#include <mpi.h>
#include <cstring>
#include <cstdlib>
#include <csignal>
#if defined(WM_SP)
    #define MPI_SCALAR MPI_FLOAT
#elif defined(WM_DP)
    #define MPI_SCALAR MPI_DOUBLE
#endif
// * * * * * * * * * * * * * * Static Data Members * * * * * * * * * * * * * //
// file-scope: min value and default for mpiBufferSize
static const int minBufferSize = 20000000;
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
// NOTE:
// valid parallel options vary between implementations, but flag common ones.
// if they are not removed by MPI_Init(), the subsequent argument processing
// will notice that they are wrong
void Foam::UPstream::addValidParOptions(HashTable<string>& validParOptions)
{
validParOptions.insert("np", "");
validParOptions.insert("p4pg", "PI file");
validParOptions.insert("p4wd", "directory");
validParOptions.insert("p4amslave", "");
validParOptions.insert("p4yourname", "hostname");
validParOptions.insert("machinefile", "machine file");
}
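// Minimal MPI start-up: error if MPI was already finalized, no-op if it is
// already initialized, otherwise call MPI_Init_thread with single-threaded
// support and no command-line arguments.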
bool Foam::UPstream::initNull()
{
int flag = 0;
MPI_Finalized(&flag);
if (flag)
{
// Already finalized - this is an error
FatalErrorInFunction
<< "MPI was already finalized - cannot perform MPI_Init" << endl
<< Foam::abort(FatalError);
return false;
}
MPI_Initialized(&flag);
if (flag)
{
// Already initialized - nothing to do
return true;
}
MPI_Init_thread
(
nullptr, // argc
nullptr, // argv
MPI_THREAD_SINGLE,
&flag // provided_thread_support
);
return true;
}
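// Full MPI initialisation for a parallel run: forwards argc/argv to
// MPI_Init_thread (requesting MPI_THREAD_MULTIPLE when needsThread is set),
// records rank and size, sets up the parallel run state and attaches a
// buffer for buffered sends.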
bool Foam::UPstream::init(int& argc, char**& argv, const bool needsThread)
{
int flag = 0;
MPI_Finalized(&flag);
if (flag)
{
// Already finalized - this is an error
FatalErrorInFunction
<< "MPI was already finalized - cannot perform MPI_Init" << endl
<< Foam::abort(FatalError);
return false;
}
MPI_Initialized(&flag);
if (flag)
{
// Already initialized - issue warning and skip the rest
WarningInFunction
<< "MPI was already initialized - cannot perform MPI_Init" << nl
<< "This could indicate an application programming error!" << endl;
return true;
}
//MPI_Init(&argc, &argv);
int provided_thread_support;
MPI_Init_thread
(
&argc,
&argv,
(
needsThread
? MPI_THREAD_MULTIPLE
: MPI_THREAD_SINGLE
),
&provided_thread_support
);
int numprocs;
MPI_Comm_size(MPI_COMM_WORLD, &numprocs);
int myRank;
MPI_Comm_rank(MPI_COMM_WORLD, &myRank);
if (debug)
{
Pout<< "UPstream::init : initialised with numProcs:" << numprocs
<< " myRank:" << myRank << endl;
}
if (numprocs <= 1)
{
FatalErrorInFunction
<< "attempt to run parallel on 1 processor"
<< Foam::abort(FatalError);
}
// Initialise parallel structure
setParRun(numprocs, provided_thread_support == MPI_THREAD_MULTIPLE);
// Normally use UPstream::mpiBufferSize (optimisationSwitch),
// but allow override with the MPI_BUFFER_SIZE env variable
// which has an int value
int bufSize = 0;
const std::string str = Foam::getEnv("MPI_BUFFER_SIZE");
if (str.empty() || !Foam::read(str, bufSize) || bufSize <= 0)
{
bufSize = mpiBufferSize;
}
if (bufSize < minBufferSize)
{
bufSize = minBufferSize;
}
if (debug)
{
Pout<< "UPstream::init : mpi-buffer-size " << bufSize << endl;
}
// TBD: could add error handling here.
// Delete allocated and leave if we fail to attach the buffer?
MPI_Buffer_attach(new char[bufSize], bufSize);
return true;
}
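// Clean shutdown: detach the send buffer, warn about any outstanding
// MPI_Requests, free the allocated communicators and either finalize
// (errnum == 0) or abort MPI.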
void Foam::UPstream::exit(int errnum)
{
if (debug)
{
Pout<< "UPstream::exit." << endl;
}
int flag = 0;
MPI_Initialized(&flag);
if (!flag)
{
// Not initialized - just exit
::exit(errnum);
return;
}
MPI_Finalized(&flag);
if (flag)
{
// Already finalized - warn and exit
WarningInFunction
<< "MPI was already finalized (perhaps by a connected program)"
<< endl;
::exit(1);
}
// Some MPI notes suggest that the return code is MPI_SUCCESS when
// no buffer is attached.
// Be extra careful and require a non-zero size as well.
int bufSize = 0;
char* buf = nullptr;
flag = MPI_Buffer_detach(&buf, &bufSize);
if (MPI_SUCCESS == flag && bufSize)
{
delete[] buf;
}
if (PstreamGlobals::outstandingRequests_.size())
{
label n = PstreamGlobals::outstandingRequests_.size();
PstreamGlobals::outstandingRequests_.clear();
WarningInFunction
<< "There are still " << n << " outstanding MPI_Requests." << endl
<< "This means that your code exited before doing a"
<< " UPstream::waitRequests()." << endl
<< "This should not happen for a normal code exit."
<< endl;
}
// Clean mpi communicators
forAll(myProcNo_, communicator)
{
if (myProcNo_[communicator] != -1)
{
freePstreamCommunicator(communicator);
}
}
if (errnum == 0)
{
MPI_Finalize();
::exit(errnum);
}
else
{
MPI_Abort(MPI_COMM_WORLD, errnum);
}
}
void Foam::UPstream::abort()
{
MPI_Abort(MPI_COMM_WORLD, 1);
}
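// Reduction wrappers. Each forwards to allReduce and, when UPstream::warnComm
// is set, traces reductions on other communicators (value, communicator and
// a call stack) to help debug communication problems.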
void Foam::reduce
(
scalar& Value,
const sumOp<scalar>& bop,
const int tag,
const label communicator
)
{
if (UPstream::warnComm != -1 && communicator != UPstream::warnComm)
{
Pout<< "** reducing:" << Value << " with comm:" << communicator
<< " warnComm:" << UPstream::warnComm
<< endl;
error::printStack(Pout);
}
allReduce(Value, 1, MPI_SCALAR, MPI_SUM, bop, tag, communicator);
}
void Foam::reduce
(
scalar& Value,
const minOp<scalar>& bop,
const int tag,
const label communicator
)
{
if (UPstream::warnComm != -1 && communicator != UPstream::warnComm)
{
Pout<< "** reducing:" << Value << " with comm:" << communicator
<< " warnComm:" << UPstream::warnComm
<< endl;
error::printStack(Pout);
}
allReduce(Value, 1, MPI_SCALAR, MPI_MIN, bop, tag, communicator);
}
void Foam::reduce
(
vector2D& Value,
const sumOp<vector2D>& bop,
const int tag,
const label communicator
)
{
if (UPstream::warnComm != -1 && communicator != UPstream::warnComm)
{
Pout<< "** reducing:" << Value << " with comm:" << communicator
<< " warnComm:" << UPstream::warnComm
<< endl;
error::printStack(Pout);
}
allReduce(Value, 2, MPI_SCALAR, MPI_SUM, bop, tag, communicator);
}
void Foam::sumReduce
(
scalar& Value,
label& Count,
const int tag,
const label communicator
)
{
if (UPstream::warnComm != -1 && communicator != UPstream::warnComm)
{
Pout<< "** reducing:" << Value << " with comm:" << communicator
<< " warnComm:" << UPstream::warnComm
<< endl;
error::printStack(Pout);
}
vector2D twoScalars(Value, scalar(Count));
reduce(twoScalars, sumOp<vector2D>(), tag, communicator);
Value = twoScalars.x();
Count = twoScalars.y();
}
void Foam::reduce
(
scalar& Value,
const sumOp<scalar>& bop,
const int tag,
const label communicator,
label& requestID
)
{
#ifdef MPIX_COMM_TYPE_SHARED
// Assume mpich2 with non-blocking collectives extensions. Once mpi3
// is available this will change.
MPI_Request request;
scalar v = Value;
MPIX_Ireduce
(
&v,
&Value,
1,
MPI_SCALAR,
MPI_SUM,
0, //root
PstreamGlobals::MPICommunicators_[communicator],
&request
);
requestID = PstreamGlobals::outstandingRequests_.size();
PstreamGlobals::outstandingRequests_.append(request);
if (debug)
{
Pout<< "UPstream::allocateRequest for non-blocking reduce"
<< " : request:" << requestID
<< endl;
}
#else
// Non-blocking not yet implemented in mpi
reduce(Value, bop, tag, communicator);
requestID = -1;
#endif
}
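// Exchange one label per processor pair. The send and receive lists must
// both have one entry per rank in the communicator.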
void Foam::UPstream::allToAll
(
const labelUList& sendData,
labelUList& recvData,
const label communicator
)
{
label np = nProcs(communicator);
if (sendData.size() != np || recvData.size() != np)
{
FatalErrorInFunction
<< "Size of sendData " << sendData.size()
<< " or size of recvData " << recvData.size()
<< " is not equal to the number of processors in the domain "
<< np
<< Foam::abort(FatalError);
}
if (!UPstream::parRun())
{
recvData.deepCopy(sendData);
}
else
{
if
(
MPI_Alltoall
(
// NOTE: const_cast is a temporary hack for
// backward-compatibility with versions of OpenMPI < 1.7.4
const_cast<label*>(sendData.begin()),
sizeof(label),
MPI_BYTE,
recvData.begin(),
sizeof(label),
MPI_BYTE,
PstreamGlobals::MPICommunicators_[communicator]
)
)
{
FatalErrorInFunction
<< "MPI_Alltoall failed for " << sendData
<< " on communicator " << communicator
<< Foam::abort(FatalError);
}
}
}
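// Variable-sized all-to-all on raw character data, driven by per-rank
// byte counts and offsets (MPI_Alltoallv).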
void Foam::UPstream::allToAll
(
const char* sendData,
const UList<int>& sendSizes,
const UList<int>& sendOffsets,
char* recvData,
const UList<int>& recvSizes,
const UList<int>& recvOffsets,
const label communicator
)
{
label np = nProcs(communicator);
if
(
sendSizes.size() != np
|| sendOffsets.size() != np
|| recvSizes.size() != np
|| recvOffsets.size() != np
)
{
FatalErrorInFunction
<< "Size of sendSize " << sendSizes.size()
<< ", sendOffsets " << sendOffsets.size()
<< ", recvSizes " << recvSizes.size()
<< " or recvOffsets " << recvOffsets.size()
<< " is not equal to the number of processors in the domain "
<< np
<< Foam::abort(FatalError);
}
if (!UPstream::parRun())
{
if (recvSizes[0] != sendSizes[0])
{
FatalErrorInFunction
<< "Bytes to send " << sendSizes[0]
<< " does not equal bytes to receive " << recvSizes[0]
<< Foam::abort(FatalError);
}
memmove(recvData, &sendData[sendOffsets[0]], recvSizes[0]);
}
else
{
if
(
MPI_Alltoallv
(
const_cast<char*>(sendData),
const_cast<int*>(sendSizes.begin()),
const_cast<int*>(sendOffsets.begin()),
MPI_BYTE,
recvData,
const_cast<int*>(recvSizes.begin()),
const_cast<int*>(recvOffsets.begin()),
MPI_BYTE,
PstreamGlobals::MPICommunicators_[communicator]
)
)
{
FatalErrorInFunction
<< "MPI_Alltoallv failed for sendSizes " << sendSizes
<< " recvSizes " << recvSizes
<< " communicator " << communicator
<< Foam::abort(FatalError);
}
}
}
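// Gather variable-sized chunks of character data onto the master (root 0)
// using MPI_Gatherv; recvSizes/recvOffsets are only required on the master.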
void Foam::UPstream::gather
(
const char* sendData,
int sendSize,
char* recvData,
const UList<int>& recvSizes,
const UList<int>& recvOffsets,
const label communicator
)
{
label np = nProcs(communicator);
if
(
UPstream::master(communicator)
&& (recvSizes.size() != np || recvOffsets.size() < np)
)
{
// Note: allow recvOffsets to be e.g. 1 larger than np so we
// can easily loop over the result
FatalErrorInFunction
<< "Size of recvSizes " << recvSizes.size()
<< " or recvOffsets " << recvOffsets.size()
<< " is not equal to the number of processors in the domain "
<< np
<< Foam::abort(FatalError);
}
if (!UPstream::parRun())
{
memmove(recvData, sendData, sendSize);
}
else
{
if
(
MPI_Gatherv
(
const_cast<char*>(sendData),
sendSize,
MPI_BYTE,
recvData,
const_cast<int*>(recvSizes.begin()),
const_cast<int*>(recvOffsets.begin()),
MPI_BYTE,
0,
MPI_Comm(PstreamGlobals::MPICommunicators_[communicator])
)
)
{
FatalErrorInFunction
<< "MPI_Gatherv failed for sendSize " << sendSize
<< " recvSizes " << recvSizes
<< " communicator " << communicator
<< Foam::abort(FatalError);
}
}
}
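// Scatter variable-sized chunks of character data from the master (root 0)
// using MPI_Scatterv; sendSizes/sendOffsets are only required on the master.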
void Foam::UPstream::scatter
(
const char* sendData,
const UList<int>& sendSizes,
const UList<int>& sendOffsets,
char* recvData,
int recvSize,
const label communicator
)
{
label np = nProcs(communicator);
if
(
UPstream::master(communicator)
&& (sendSizes.size() != np || sendOffsets.size() != np)
)
{
FatalErrorInFunction
<< "Size of sendSizes " << sendSizes.size()
<< " or sendOffsets " << sendOffsets.size()
<< " is not equal to the number of processors in the domain "
<< np
<< Foam::abort(FatalError);
}
if (!UPstream::parRun())
{
memmove(recvData, sendData, recvSize);
}
else
{
if
(
MPI_Scatterv
(
const_cast<char*>(sendData),
const_cast<int*>(sendSizes.begin()),
const_cast<int*>(sendOffsets.begin()),
MPI_BYTE,
recvData,
recvSize,
MPI_BYTE,
0,
MPI_Comm(PstreamGlobals::MPICommunicators_[communicator])
)
)
{
FatalErrorInFunction
<< "MPI_Scatterv failed for sendSizes " << sendSizes
<< " sendOffsets " << sendOffsets
<< " communicator " << communicator
<< Foam::abort(FatalError);
}
}
}
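// Create the MPI communicator/group backing a UPstream communicator index.
// Index worldComm maps onto MPI_COMM_WORLD; sub-communicators are built from
// the parent group with MPI_Group_incl + MPI_Comm_create.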
void Foam::UPstream::allocatePstreamCommunicator
(
const label parentIndex,
const label index
)
{
if (index == PstreamGlobals::MPIGroups_.size())
{
// Extend storage with dummy values
MPI_Group newGroup = MPI_GROUP_NULL;
PstreamGlobals::MPIGroups_.append(newGroup);
MPI_Comm newComm = MPI_COMM_NULL;
PstreamGlobals::MPICommunicators_.append(newComm);
}
else if (index > PstreamGlobals::MPIGroups_.size())
{
FatalErrorInFunction
<< "PstreamGlobals out of sync with UPstream data. Problem."
<< Foam::exit(FatalError);
}
if (parentIndex == -1)
{
// Allocate world communicator
if (index != UPstream::worldComm)
{
FatalErrorInFunction
<< "world communicator should always be index "
<< UPstream::worldComm << Foam::exit(FatalError);
}
PstreamGlobals::MPICommunicators_[index] = MPI_COMM_WORLD;
MPI_Comm_group(MPI_COMM_WORLD, &PstreamGlobals::MPIGroups_[index]);
MPI_Comm_rank
(
PstreamGlobals::MPICommunicators_[index],
&myProcNo_[index]
);
// Set the number of processes to the actual number
int numProcs;
MPI_Comm_size(PstreamGlobals::MPICommunicators_[index], &numProcs);
//procIDs_[index] = identity(numProcs);
procIDs_[index].setSize(numProcs);
forAll(procIDs_[index], i)
{
procIDs_[index][i] = i;
}
}
else
{
// Create new group
MPI_Group_incl
(
PstreamGlobals::MPIGroups_[parentIndex],
procIDs_[index].size(),
procIDs_[index].begin(),
&PstreamGlobals::MPIGroups_[index]
);
// Create new communicator
MPI_Comm_create
(
PstreamGlobals::MPICommunicators_[parentIndex],
PstreamGlobals::MPIGroups_[index],
&PstreamGlobals::MPICommunicators_[index]
);
if (PstreamGlobals::MPICommunicators_[index] == MPI_COMM_NULL)
{
myProcNo_[index] = -1;
}
else
{
if
(
MPI_Comm_rank
(
PstreamGlobals::MPICommunicators_[index],
&myProcNo_[index]
)
)
{
FatalErrorInFunction
<< "Problem :"
<< " when allocating communicator at " << index
<< " from ranks " << procIDs_[index]
<< " of parent " << parentIndex
<< " cannot find my own rank"
<< Foam::exit(FatalError);
}
}
}
}
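// Release the MPI communicator and group behind a UPstream communicator
// index; the world communicator itself is never freed.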
void Foam::UPstream::freePstreamCommunicator(const label communicator)
{
if (communicator != UPstream::worldComm)
{
if (PstreamGlobals::MPICommunicators_[communicator] != MPI_COMM_NULL)
{
// Free communicator. Sets communicator to MPI_COMM_NULL
MPI_Comm_free(&PstreamGlobals::MPICommunicators_[communicator]);
}
if (PstreamGlobals::MPIGroups_[communicator] != MPI_GROUP_NULL)
{
// Free group. Sets group to MPI_GROUP_NULL
MPI_Group_free(&PstreamGlobals::MPIGroups_[communicator]);
}
}
}
Foam::label Foam::UPstream::nRequests()
{
return PstreamGlobals::outstandingRequests_.size();
}
void Foam::UPstream::resetRequests(const label i)
{
if (i < PstreamGlobals::outstandingRequests_.size())
{
PstreamGlobals::outstandingRequests_.setSize(i);
}
}
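// Wait for completion of all outstanding requests from position 'start'
// onwards.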
void Foam::UPstream::waitRequests(const label start)
{
if (debug)
{
Pout<< "UPstream::waitRequests : starting wait for "
<< PstreamGlobals::outstandingRequests_.size()-start
<< " outstanding requests starting at " << start << endl;
if (PstreamGlobals::outstandingRequests_.size())
{
SubList<MPI_Request> waitRequests
(
PstreamGlobals::outstandingRequests_,
PstreamGlobals::outstandingRequests_.size() - start,
start
);
if
(
MPI_Waitall
(
waitRequests.size(),
waitRequests.begin(),
MPI_STATUSES_IGNORE
)
)
{
FatalErrorInFunction
<< "MPI_Waitall returned with error" << Foam::endl;
if (debug)
{
Pout<< "UPstream::waitRequests : finished wait." << endl;
}
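// Wait for (and consume) a single outstanding request by index.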
void Foam::UPstream::waitRequest(const label i)
{
if (debug)
{
Pout<< "UPstream::waitRequest : starting wait for request:" << i
<< endl;
}
if (i >= PstreamGlobals::outstandingRequests_.size())
{
FatalErrorInFunction
<< "There are " << PstreamGlobals::outstandingRequests_.size()
<< " outstanding send requests and you are asking for i=" << i
<< nl
<< "Maybe you are mixing blocking/non-blocking comms?"
<< Foam::abort(FatalError);
}
if
(
MPI_Wait
(
&PstreamGlobals::outstandingRequests_[i],
MPI_STATUS_IGNORE
)
)
{
FatalErrorInFunction
<< "MPI_Wait returned with error" << Foam::endl;
}
if (debug)
{
Pout<< "UPstream::waitRequest : finished wait for request:" << i
<< endl;
}
}
Pout<< "UPstream::finishedRequest : checking request:" << i
if (i >= PstreamGlobals::outstandingRequests_.size())
{
FatalErrorInFunction
<< "There are " << PstreamGlobals::outstandingRequests_.size()
<< " outstanding send requests and you are asking for i=" << i
<< nl
<< "Maybe you are mixing blocking/non-blocking comms?"
<< Foam::abort(FatalError);
}
int flag;
MPI_Test
(
&PstreamGlobals::outstandingRequests_[i],
&flag,
MPI_STATUS_IGNORE
);
Pout<< "UPstream::finishedRequest : finished request:" << i
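// Message-tag management: recycle a previously freed tag if available,
// otherwise hand out the next tag from PstreamGlobals::nTags_.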
int Foam::UPstream::allocateTag(const char* s)
{
int tag;
if (PstreamGlobals::freedTags_.size())
{
tag = PstreamGlobals::freedTags_.remove();
}
else
{
tag = PstreamGlobals::nTags_++;
}
if (debug)
{
//if (UPstream::lateBlocking > 0)
//{
// string& poutp = Pout.prefix();
// poutp[poutp.size()-2*(UPstream::lateBlocking+2)+tag] = 'X';
// Perr.prefix() = Pout.prefix();
//}
Pout<< "UPstream::allocateTag " << s
<< " : tag:" << tag
<< endl;
}
return tag;
}
int Foam::UPstream::allocateTag(const word& s)
{
int tag;
if (PstreamGlobals::freedTags_.size())
{
tag = PstreamGlobals::freedTags_.remove();
}
else
{
tag = PstreamGlobals::nTags_++;
}
if (debug)
{
//if (UPstream::lateBlocking > 0)
//{
// string& poutp = Pout.prefix();
// poutp[poutp.size()-2*(UPstream::lateBlocking+2)+tag] = 'X';
// Perr.prefix() = Pout.prefix();
//}
Pout<< "UPstream::allocateTag " << s
<< " : tag:" << tag
<< endl;
}
return tag;
}
void Foam::UPstream::freeTag(const char* s, const int tag)
{
if (debug)
{
//if (UPstream::lateBlocking > 0)
//{
// string& poutp = Pout.prefix();
// poutp[poutp.size()-2*(UPstream::lateBlocking+2)+tag] = ' ';
// Perr.prefix() = Pout.prefix();
//}
Pout<< "UPstream::freeTag " << s << " tag:" << tag << endl;
}
PstreamGlobals::freedTags_.append(tag);
}
void Foam::UPstream::freeTag(const word& s, const int tag)
{
if (debug)
{
//if (UPstream::lateBlocking > 0)
//{
// string& poutp = Pout.prefix();
// poutp[poutp.size()-2*(UPstream::lateBlocking+2)+tag] = ' ';
// Perr.prefix() = Pout.prefix();
//}
Pout<< "UPstream::freeTag " << s << " tag:" << tag << endl;
}
PstreamGlobals::freedTags_.append(tag);
}
// ************************************************************************* //