From ea8d2901917cf8b4df23f030c79af416b39cc3ee Mon Sep 17 00:00:00 2001 From: mattijs <mattijs> Date: Mon, 4 Feb 2013 10:17:37 +0000 Subject: [PATCH] ENH: communicators: initial version - extended Pstream API --- .../Test-parallel-communicators.C | 61 +++-- .../extrude/extrudeMesh/extrudeMesh.C | 1 + .../decomposePar/domainDecomposition.C | 2 + etc/bashrc | 2 +- etc/cshrc | 2 +- .../IOobjects/IOdictionary/IOdictionaryIO.C | 5 +- src/OpenFOAM/db/IOstreams/Pstreams/IPstream.C | 2 + src/OpenFOAM/db/IOstreams/Pstreams/IPstream.H | 1 + src/OpenFOAM/db/IOstreams/Pstreams/OPstream.C | 3 +- src/OpenFOAM/db/IOstreams/Pstreams/OPstream.H | 1 + src/OpenFOAM/db/IOstreams/Pstreams/Pstream.H | 65 +++-- .../db/IOstreams/Pstreams/PstreamBuffers.C | 16 +- .../db/IOstreams/Pstreams/PstreamBuffers.H | 3 + .../Pstreams/PstreamCombineReduceOps.H | 42 ++- .../db/IOstreams/Pstreams/PstreamReduceOps.H | 65 +++-- .../db/IOstreams/Pstreams/UIPstream.C | 4 +- .../db/IOstreams/Pstreams/UIPstream.H | 6 +- .../db/IOstreams/Pstreams/UOPstream.C | 9 +- .../db/IOstreams/Pstreams/UOPstream.H | 6 +- src/OpenFOAM/db/IOstreams/Pstreams/UPstream.C | 187 +++++++++++-- src/OpenFOAM/db/IOstreams/Pstreams/UPstream.H | 175 +++++++++--- .../IOstreams/Pstreams/combineGatherScatter.C | 250 ++++++++++++++---- src/OpenFOAM/db/IOstreams/Pstreams/exchange.C | 27 +- .../db/IOstreams/Pstreams/gatherScatter.C | 68 +++-- .../db/IOstreams/Pstreams/gatherScatterList.C | 78 ++++-- src/OpenFOAM/db/regIOobject/regIOobjectRead.C | 5 +- .../LUscalarMatrix/procLduInterface.C | 10 +- .../LUscalarMatrix/procLduInterface.H | 3 +- .../lduInterface/processorLduInterface.H | 7 +- .../processorLduInterfaceTemplates.C | 24 +- .../processorLduInterfaceField.H | 3 + .../processorGAMGInterfaceField.H | 6 + .../processorGAMGInterface.H | 8 +- .../ProcessorTopology/ProcessorTopology.C | 16 +- .../ProcessorTopology/ProcessorTopology.H | 4 +- .../polyMesh/globalMeshData/globalMeshData.C | 2 +- .../mapPolyMesh/mapDistribute/mapDistribute.C | 6 +- .../constraint/processor/processorPolyPatch.C | 14 + .../constraint/processor/processorPolyPatch.H | 11 + .../processorCyclicPolyPatch.C | 2 + .../processorCyclicPolyPatch.H | 1 + src/Pstream/dummy/UIPread.C | 16 +- src/Pstream/dummy/UOPwrite.C | 6 +- src/Pstream/dummy/UPstream.C | 27 +- src/Pstream/mpi/PstreamGlobals.C | 31 ++- src/Pstream/mpi/PstreamGlobals.H | 11 +- src/Pstream/mpi/UIPread.C | 27 +- src/Pstream/mpi/UOPwrite.C | 21 +- src/Pstream/mpi/UPstream.C | 201 ++++++++++++-- src/Pstream/mpi/allReduce.H | 3 +- src/Pstream/mpi/allReduceTemplates.C | 39 +-- .../fvMeshDistribute/fvMeshDistribute.C | 2 + .../processor/processorFvPatchField.C | 26 +- .../processor/processorFvPatchField.H | 6 + .../processor/processorFvPatchScalarField.C | 10 +- .../constraint/processor/processorFvPatch.H | 10 +- 56 files changed, 1247 insertions(+), 392 deletions(-) diff --git a/applications/test/parallel-communicators/Test-parallel-communicators.C b/applications/test/parallel-communicators/Test-parallel-communicators.C index e5c4b996482..2b9277fb6e5 100644 --- a/applications/test/parallel-communicators/Test-parallel-communicators.C +++ b/applications/test/parallel-communicators/Test-parallel-communicators.C @@ -150,46 +150,49 @@ int main(int argc, char *argv[]) top.append(i); } - Pout<< "bottom:" << bottom << endl; - Pout<< "top:" << top << endl; + //Pout<< "bottom:" << bottom << endl; + Pout<< "top :" << top << endl; scalar localValue = 111*UPstream::myProcNo(UPstream::worldComm); - Pout<< "localValue:" << localValue << endl; + Pout<< "localValue :" << localValue << endl; - if (Pstream::myProcNo(UPstream::worldComm) < n/2) - { - label comm = Pstream::allocateCommunicator - ( - UPstream::worldComm, - bottom - ); - Pout<< "allocated bottom comm:" << comm << endl; - Pout<< "comm myproc :" << Pstream::myProcNo(comm) - << endl; - scalar sum = sumReduce(comm, localValue); - Pout<< "sum :" << sum << endl; + label comm = Pstream::allocateCommunicator + ( + UPstream::worldComm, + top + ); - Pstream::freeCommunicator(comm); - } - else + Pout<< "allocated comm :" << comm << endl; + Pout<< "comm myproc :" << Pstream::myProcNo(comm) + << endl; + + + if (Pstream::myProcNo(comm) != -1) { - label comm = Pstream::allocateCommunicator + //scalar sum = sumReduce(comm, localValue); + //scalar sum = localValue; + //reduce + //( + // UPstream::treeCommunication(comm), + // sum, + // sumOp<scalar>(), + // Pstream::msgType(), + // comm + //); + scalar sum = returnReduce ( - UPstream::worldComm, - top + localValue, + sumOp<scalar>(), + Pstream::msgType(), + comm ); - - Pout<< "allocated top comm:" << comm << endl; - Pout<< "comm myproc :" << Pstream::myProcNo(comm) - << endl; - scalar sum = sumReduce(comm, localValue); - Pout<< "sum :" << sum << endl; - - Pstream::freeCommunicator(comm); + Pout<< "sum :" << sum << endl; } + Pstream::freeCommunicator(comm); + Pout<< "End\n" << endl; diff --git a/applications/utilities/mesh/generation/extrude/extrudeMesh/extrudeMesh.C b/applications/utilities/mesh/generation/extrude/extrudeMesh/extrudeMesh.C index bafe3055046..6e6c02546da 100644 --- a/applications/utilities/mesh/generation/extrude/extrudeMesh/extrudeMesh.C +++ b/applications/utilities/mesh/generation/extrude/extrudeMesh/extrudeMesh.C @@ -496,6 +496,7 @@ int main(int argc, char *argv[]) mesh.nFaces(), // start patchI, // index mesh.boundaryMesh(),// polyBoundaryMesh + Pstream::worldComm, // communicator Pstream::myProcNo(),// myProcNo nbrProcI // neighbProcNo ) diff --git a/applications/utilities/parallelProcessing/decomposePar/domainDecomposition.C b/applications/utilities/parallelProcessing/decomposePar/domainDecomposition.C index f5dabf60770..04e1c18685b 100644 --- a/applications/utilities/parallelProcessing/decomposePar/domainDecomposition.C +++ b/applications/utilities/parallelProcessing/decomposePar/domainDecomposition.C @@ -448,6 +448,7 @@ bool Foam::domainDecomposition::writeDecomposition() curStart, nPatches, procMesh.boundaryMesh(), + Pstream::worldComm, procI, curNeighbourProcessors[procPatchI] ); @@ -475,6 +476,7 @@ bool Foam::domainDecomposition::writeDecomposition() curStart, nPatches, procMesh.boundaryMesh(), + Pstream::worldComm, procI, curNeighbourProcessors[procPatchI], referPatch, diff --git a/etc/bashrc b/etc/bashrc index f077631ac89..79cf4f47d89 100644 --- a/etc/bashrc +++ b/etc/bashrc @@ -32,7 +32,7 @@ #------------------------------------------------------------------------------ export WM_PROJECT=OpenFOAM -export WM_PROJECT_VERSION=dev +export WM_PROJECT_VERSION=dev.procAgglom ################################################################################ # USER EDITABLE PART: Changes made here may be lost with the next upgrade diff --git a/etc/cshrc b/etc/cshrc index 6b5eb166324..6453413b385 100644 --- a/etc/cshrc +++ b/etc/cshrc @@ -31,7 +31,7 @@ #------------------------------------------------------------------------------ setenv WM_PROJECT OpenFOAM -setenv WM_PROJECT_VERSION dev +setenv WM_PROJECT_VERSION dev.procAgglom ################################################################################ # USER EDITABLE PART: Changes made here may be lost with the next upgrade diff --git a/src/OpenFOAM/db/IOobjects/IOdictionary/IOdictionaryIO.C b/src/OpenFOAM/db/IOobjects/IOdictionary/IOdictionaryIO.C index e620b3b9423..39dd1093669 100644 --- a/src/OpenFOAM/db/IOobjects/IOdictionary/IOdictionaryIO.C +++ b/src/OpenFOAM/db/IOobjects/IOdictionary/IOdictionaryIO.C @@ -78,9 +78,10 @@ void Foam::IOdictionary::readFile(const bool masterOnly) ( comms, const_cast<word&>(headerClassName()), - Pstream::msgType() + Pstream::msgType(), + Pstream::worldComm ); - Pstream::scatter(comms, note(), Pstream::msgType()); + Pstream::scatter(comms, note(), Pstream::msgType(), Pstream::worldComm); // Get my communication order const Pstream::commsStruct& myComm = comms[Pstream::myProcNo()]; diff --git a/src/OpenFOAM/db/IOstreams/Pstreams/IPstream.C b/src/OpenFOAM/db/IOstreams/Pstreams/IPstream.C index 1431585bfd3..6871c3ff654 100644 --- a/src/OpenFOAM/db/IOstreams/Pstreams/IPstream.C +++ b/src/OpenFOAM/db/IOstreams/Pstreams/IPstream.C @@ -33,6 +33,7 @@ Foam::IPstream::IPstream const int fromProcNo, const label bufSize, const int tag, + const label comm, streamFormat format, versionNumber version ) @@ -45,6 +46,7 @@ Foam::IPstream::IPstream buf_, externalBufPosition_, tag, // tag + comm, false, // do not clear buf_ if at end format, version diff --git a/src/OpenFOAM/db/IOstreams/Pstreams/IPstream.H b/src/OpenFOAM/db/IOstreams/Pstreams/IPstream.H index eb236a0c782..3a0047df1a2 100644 --- a/src/OpenFOAM/db/IOstreams/Pstreams/IPstream.H +++ b/src/OpenFOAM/db/IOstreams/Pstreams/IPstream.H @@ -69,6 +69,7 @@ public: const int fromProcNo, const label bufSize = 0, const int tag = UPstream::msgType(), + const label comm = UPstream::worldComm, streamFormat format=BINARY, versionNumber version=currentVersion ); diff --git a/src/OpenFOAM/db/IOstreams/Pstreams/OPstream.C b/src/OpenFOAM/db/IOstreams/Pstreams/OPstream.C index 233fd10f6ed..a9fccf4f1b0 100644 --- a/src/OpenFOAM/db/IOstreams/Pstreams/OPstream.C +++ b/src/OpenFOAM/db/IOstreams/Pstreams/OPstream.C @@ -33,12 +33,13 @@ Foam::OPstream::OPstream const int toProcNo, const label bufSize, const int tag, + const label comm, streamFormat format, versionNumber version ) : Pstream(commsType, bufSize), - UOPstream(commsType, toProcNo, buf_, tag, true, format, version) + UOPstream(commsType, toProcNo, buf_, tag, comm, true, format, version) {} diff --git a/src/OpenFOAM/db/IOstreams/Pstreams/OPstream.H b/src/OpenFOAM/db/IOstreams/Pstreams/OPstream.H index edc8dce4889..2abff736b49 100644 --- a/src/OpenFOAM/db/IOstreams/Pstreams/OPstream.H +++ b/src/OpenFOAM/db/IOstreams/Pstreams/OPstream.H @@ -66,6 +66,7 @@ public: const int toProcNo, const label bufSize = 0, const int tag = UPstream::msgType(), + const label comm = UPstream::worldComm, streamFormat format=BINARY, versionNumber version=currentVersion ); diff --git a/src/OpenFOAM/db/IOstreams/Pstreams/Pstream.H b/src/OpenFOAM/db/IOstreams/Pstreams/Pstream.H index 9f39306ccce..0fcd5465827 100644 --- a/src/OpenFOAM/db/IOstreams/Pstreams/Pstream.H +++ b/src/OpenFOAM/db/IOstreams/Pstreams/Pstream.H @@ -98,7 +98,8 @@ public: const List<commsStruct>& comms, T& Value, const BinaryOp& bop, - const int tag + const int tag, + const label comm ); //- Like above but switches between linear/tree communication @@ -107,7 +108,8 @@ public: ( T& Value, const BinaryOp& bop, - const int tag = Pstream::msgType() + const int tag = Pstream::msgType(), + const label comm = Pstream::worldComm ); //- Scatter data. Distribute without modification. Reverse of gather @@ -116,12 +118,18 @@ public: ( const List<commsStruct>& comms, T& Value, - const int tag + const int tag, + const label comm ); //- Like above but switches between linear/tree communication template <class T> - static void scatter(T& Value, const int tag = Pstream::msgType()); + static void scatter + ( + T& Value, + const int tag = Pstream::msgType(), + const label comm = Pstream::worldComm + ); // Combine variants. Inplace combine values from processors. @@ -133,7 +141,8 @@ public: const List<commsStruct>& comms, T& Value, const CombineOp& cop, - const int tag + const int tag, + const label comm ); //- Like above but switches between linear/tree communication @@ -142,7 +151,8 @@ public: ( T& Value, const CombineOp& cop, - const int tag = Pstream::msgType() + const int tag = Pstream::msgType(), + const label comm = Pstream::worldComm ); //- Scatter data. Reverse of combineGather @@ -151,7 +161,8 @@ public: ( const List<commsStruct>& comms, T& Value, - const int tag + const int tag, + const label comm ); //- Like above but switches between linear/tree communication @@ -159,7 +170,8 @@ public: static void combineScatter ( T& Value, - const int tag = Pstream::msgType() + const int tag = Pstream::msgType(), + const label comm = Pstream::worldComm ); // Combine variants working on whole List at a time. @@ -170,7 +182,8 @@ public: const List<commsStruct>& comms, List<T>& Value, const CombineOp& cop, - const int tag + const int tag, + const label comm ); //- Like above but switches between linear/tree communication @@ -179,7 +192,8 @@ public: ( List<T>& Value, const CombineOp& cop, - const int tag = Pstream::msgType() + const int tag = Pstream::msgType(), + const label comm = Pstream::worldComm ); //- Scatter data. Reverse of combineGather @@ -188,7 +202,8 @@ public: ( const List<commsStruct>& comms, List<T>& Value, - const int tag + const int tag, + const label comm ); //- Like above but switches between linear/tree communication @@ -196,7 +211,8 @@ public: static void listCombineScatter ( List<T>& Value, - const int tag = Pstream::msgType() + const int tag = Pstream::msgType(), + const label comm = Pstream::worldComm ); // Combine variants working on whole map at a time. Container needs to @@ -208,7 +224,8 @@ public: const List<commsStruct>& comms, Container& Values, const CombineOp& cop, - const int tag + const int tag, + const label comm ); //- Like above but switches between linear/tree communication @@ -217,7 +234,8 @@ public: ( Container& Values, const CombineOp& cop, - const int tag = Pstream::msgType() + const int tag = Pstream::msgType(), + const label comm = UPstream::worldComm ); //- Scatter data. Reverse of combineGather @@ -226,7 +244,8 @@ public: ( const List<commsStruct>& comms, Container& Values, - const int tag + const int tag, + const label comm ); //- Like above but switches between linear/tree communication @@ -234,7 +253,8 @@ public: static void mapCombineScatter ( Container& Values, - const int tag = Pstream::msgType() + const int tag = Pstream::msgType(), + const label comm = UPstream::worldComm ); @@ -249,7 +269,8 @@ public: ( const List<commsStruct>& comms, List<T>& Values, - const int tag + const int tag, + const label comm ); //- Like above but switches between linear/tree communication @@ -257,7 +278,8 @@ public: static void gatherList ( List<T>& Values, - const int tag = Pstream::msgType() + const int tag = Pstream::msgType(), + const label comm = UPstream::worldComm ); //- Scatter data. Reverse of gatherList @@ -266,7 +288,8 @@ public: ( const List<commsStruct>& comms, List<T>& Values, - const int tag + const int tag, + const label comm ); //- Like above but switches between linear/tree communication @@ -274,7 +297,8 @@ public: static void scatterList ( List<T>& Values, - const int tag = Pstream::msgType() + const int tag = Pstream::msgType(), + const label comm = UPstream::worldComm ); @@ -291,6 +315,7 @@ public: List<Container >&, labelListList& sizes, const int tag = UPstream::msgType(), + const label comm = UPstream::worldComm, const bool block = true ); diff --git a/src/OpenFOAM/db/IOstreams/Pstreams/PstreamBuffers.C b/src/OpenFOAM/db/IOstreams/Pstreams/PstreamBuffers.C index 24eec63c352..cbb62ae55c5 100644 --- a/src/OpenFOAM/db/IOstreams/Pstreams/PstreamBuffers.C +++ b/src/OpenFOAM/db/IOstreams/Pstreams/PstreamBuffers.C @@ -40,17 +40,19 @@ Foam::PstreamBuffers::PstreamBuffers ( const UPstream::commsTypes commsType, const int tag, + const label comm, IOstream::streamFormat format, IOstream::versionNumber version ) : commsType_(commsType), tag_(tag), + comm_(comm), format_(format), version_(version), - sendBuf_(UPstream::nProcs()), - recvBuf_(UPstream::nProcs()), - recvBufPos_(UPstream::nProcs(), 0), + sendBuf_(UPstream::nProcs(comm)), + recvBuf_(UPstream::nProcs(comm)), + recvBufPos_(UPstream::nProcs(comm), 0), finishedSendsCalled_(false) {} @@ -90,6 +92,7 @@ void Foam::PstreamBuffers::finishedSends(const bool block) recvBuf_, sizes, tag_, + comm_, block ); } @@ -108,6 +111,7 @@ void Foam::PstreamBuffers::finishedSends(labelListList& sizes, const bool block) recvBuf_, sizes, tag_, + comm_, block ); } @@ -123,9 +127,9 @@ void Foam::PstreamBuffers::finishedSends(labelListList& sizes, const bool block) // Note: possible only if using different tag from write started // by ~UOPstream. Needs some work. - //sizes.setSize(UPstream::nProcs()); - //labelList& nsTransPs = sizes[UPstream::myProcNo()]; - //nsTransPs.setSize(UPstream::nProcs()); + //sizes.setSize(UPstream::nProcs(comm)); + //labelList& nsTransPs = sizes[UPstream::myProcNo(comm)]; + //nsTransPs.setSize(UPstream::nProcs(comm)); // //forAll(sendBuf_, procI) //{ diff --git a/src/OpenFOAM/db/IOstreams/Pstreams/PstreamBuffers.H b/src/OpenFOAM/db/IOstreams/Pstreams/PstreamBuffers.H index 0b88a644f0f..da6cb21acc3 100644 --- a/src/OpenFOAM/db/IOstreams/Pstreams/PstreamBuffers.H +++ b/src/OpenFOAM/db/IOstreams/Pstreams/PstreamBuffers.H @@ -95,6 +95,8 @@ class PstreamBuffers const int tag_; + const label comm_; + const IOstream::streamFormat format_; const IOstream::versionNumber version_; @@ -127,6 +129,7 @@ public: ( const UPstream::commsTypes commsType, const int tag = UPstream::msgType(), + const label comm = UPstream::worldComm, IOstream::streamFormat format=IOstream::BINARY, IOstream::versionNumber version=IOstream::currentVersion ); diff --git a/src/OpenFOAM/db/IOstreams/Pstreams/PstreamCombineReduceOps.H b/src/OpenFOAM/db/IOstreams/Pstreams/PstreamCombineReduceOps.H index c657e0a140f..4a2985ef3ad 100644 --- a/src/OpenFOAM/db/IOstreams/Pstreams/PstreamCombineReduceOps.H +++ b/src/OpenFOAM/db/IOstreams/Pstreams/PstreamCombineReduceOps.H @@ -53,11 +53,12 @@ void combineReduce const List<UPstream::commsStruct>& comms, T& Value, const CombineOp& cop, - const int tag + const int tag, + const label comm ) { - Pstream::combineGather(comms, Value, cop, tag); - Pstream::combineScatter(comms, Value, tag); + Pstream::combineGather(comms, Value, cop, tag, comm); + Pstream::combineScatter(comms, Value, tag, comm); } @@ -66,24 +67,45 @@ void combineReduce ( T& Value, const CombineOp& cop, - const int tag = Pstream::msgType() + const int tag = Pstream::msgType(), + const label comm = Pstream::worldComm ) { - if (UPstream::nProcs() < UPstream::nProcsSimpleSum) + if (UPstream::nProcs(comm) < UPstream::nProcsSimpleSum) { Pstream::combineGather ( - UPstream::linearCommunication(), + UPstream::linearCommunication(comm), Value, cop, - tag + tag, + comm + ); + Pstream::combineScatter + ( + UPstream::linearCommunication(comm), + Value, + tag, + comm ); - Pstream::combineScatter(UPstream::linearCommunication(), Value, tag); } else { - Pstream::combineGather(UPstream::treeCommunication(), Value, cop, tag); - Pstream::combineScatter(UPstream::treeCommunication(), Value, tag); + Pstream::combineGather + ( + UPstream::treeCommunication(comm), + Value, + cop, + tag, + comm + ); + Pstream::combineScatter + ( + UPstream::treeCommunication(comm), + Value, + tag, + comm + ); } } diff --git a/src/OpenFOAM/db/IOstreams/Pstreams/PstreamReduceOps.H b/src/OpenFOAM/db/IOstreams/Pstreams/PstreamReduceOps.H index 47c7c51acdd..bd8f5ab1dc5 100644 --- a/src/OpenFOAM/db/IOstreams/Pstreams/PstreamReduceOps.H +++ b/src/OpenFOAM/db/IOstreams/Pstreams/PstreamReduceOps.H @@ -44,11 +44,12 @@ void reduce const List<UPstream::commsStruct>& comms, T& Value, const BinaryOp& bop, - const int tag + const int tag, + const label comm ) { - Pstream::gather(comms, Value, bop, tag); - Pstream::scatter(comms, Value, tag); + Pstream::gather(comms, Value, bop, tag, comm); + Pstream::scatter(comms, Value, tag, comm); } @@ -58,16 +59,17 @@ void reduce ( T& Value, const BinaryOp& bop, - const int tag = Pstream::msgType() + const int tag = Pstream::msgType(), + const label comm = UPstream::worldComm ) { - if (UPstream::nProcs() < UPstream::nProcsSimpleSum) + if (UPstream::nProcs(comm) < UPstream::nProcsSimpleSum) { - reduce(UPstream::linearCommunication(), Value, bop, tag); + reduce(UPstream::linearCommunication(comm), Value, bop, tag, comm); } else { - reduce(UPstream::treeCommunication(), Value, bop, tag); + reduce(UPstream::treeCommunication(comm), Value, bop, tag, comm); } } @@ -78,18 +80,33 @@ T returnReduce ( const T& Value, const BinaryOp& bop, - const int tag = Pstream::msgType() + const int tag = Pstream::msgType(), + const label comm = UPstream::worldComm ) { T WorkValue(Value); - if (UPstream::nProcs() < UPstream::nProcsSimpleSum) + if (UPstream::nProcs(comm) < UPstream::nProcsSimpleSum) { - reduce(UPstream::linearCommunication(), WorkValue, bop, tag); + reduce + ( + UPstream::linearCommunication(comm), + WorkValue, + bop, + tag, + comm + ); } else { - reduce(UPstream::treeCommunication(), WorkValue, bop, tag); + reduce + ( + UPstream::treeCommunication(comm), + WorkValue, + bop, + tag, + comm + ); } return WorkValue; @@ -102,11 +119,12 @@ void sumReduce ( T& Value, label& Count, - const int tag = Pstream::msgType() + const int tag = Pstream::msgType(), + const label comm = UPstream::worldComm ) { - reduce(Value, sumOp<T>(), tag); - reduce(Count, sumOp<label>(), tag); + reduce(Value, sumOp<T>(), tag, comm); + reduce(Count, sumOp<label>(), tag, comm); } @@ -117,10 +135,14 @@ void reduce T& Value, const BinaryOp& bop, const int tag, + const label comm, label& request ) { - notImplemented("reduce(T&, const BinaryOp&, const int, label&"); + notImplemented + ( + "reduce(T&, const BinaryOp&, const int, const label, label&" + ); } @@ -129,28 +151,32 @@ void reduce ( scalar& Value, const sumOp<scalar>& bop, - const int tag = Pstream::msgType() + const int tag = Pstream::msgType(), + const label comm = UPstream::worldComm ); void reduce ( scalar& Value, const minOp<scalar>& bop, - const int tag = Pstream::msgType() + const int tag = Pstream::msgType(), + const label comm = UPstream::worldComm ); void reduce ( vector2D& Value, const sumOp<vector2D>& bop, - const int tag = Pstream::msgType() + const int tag = Pstream::msgType(), + const label comm = UPstream::worldComm ); void sumReduce ( scalar& Value, label& Count, - const int tag = Pstream::msgType() + const int tag = Pstream::msgType(), + const label comm = UPstream::worldComm ); void reduce @@ -158,6 +184,7 @@ void reduce scalar& Value, const sumOp<scalar>& bop, const int tag, + const label comm, label& request ); diff --git a/src/OpenFOAM/db/IOstreams/Pstreams/UIPstream.C b/src/OpenFOAM/db/IOstreams/Pstreams/UIPstream.C index 2167d15b6de..4e853274d8b 100644 --- a/src/OpenFOAM/db/IOstreams/Pstreams/UIPstream.C +++ b/src/OpenFOAM/db/IOstreams/Pstreams/UIPstream.C @@ -330,7 +330,9 @@ Foam::Istream& Foam::UIPstream::rewind() void Foam::UIPstream::print(Ostream& os) const { os << "Reading from processor " << fromProcNo_ - << " to processor " << myProcNo() << Foam::endl; + << " to processor " << myProcNo << " using communicator " << comm_ + << " and tag " << tag_ + << Foam::endl; } diff --git a/src/OpenFOAM/db/IOstreams/Pstreams/UIPstream.H b/src/OpenFOAM/db/IOstreams/Pstreams/UIPstream.H index 7b4a7bfada8..3e76c095525 100644 --- a/src/OpenFOAM/db/IOstreams/Pstreams/UIPstream.H +++ b/src/OpenFOAM/db/IOstreams/Pstreams/UIPstream.H @@ -66,6 +66,8 @@ class UIPstream const int tag_; + const label comm_; + const bool clearAtEnd_; int messageSize_; @@ -97,6 +99,7 @@ public: DynamicList<char>& externalBuf, label& externalBufPosition, const int tag = UPstream::msgType(), + const label comm = UPstream::worldComm, const bool clearAtEnd = false, // destroy externalBuf if at end streamFormat format=BINARY, versionNumber version=currentVersion @@ -131,7 +134,8 @@ public: const int fromProcNo, char* buf, const std::streamsize bufSize, - const int tag = UPstream::msgType() + const int tag = UPstream::msgType(), + const label communicator = 0 ); //- Return next token from stream diff --git a/src/OpenFOAM/db/IOstreams/Pstreams/UOPstream.C b/src/OpenFOAM/db/IOstreams/Pstreams/UOPstream.C index e01e8de01bd..2e6cdb0ec18 100644 --- a/src/OpenFOAM/db/IOstreams/Pstreams/UOPstream.C +++ b/src/OpenFOAM/db/IOstreams/Pstreams/UOPstream.C @@ -91,6 +91,7 @@ Foam::UOPstream::UOPstream const int toProcNo, DynamicList<char>& sendBuf, const int tag, + const label comm, const bool sendAtDestruct, streamFormat format, versionNumber version @@ -101,6 +102,7 @@ Foam::UOPstream::UOPstream toProcNo_(toProcNo), sendBuf_(sendBuf), tag_(tag), + comm_(comm), sendAtDestruct_(sendAtDestruct) { setOpened(); @@ -115,6 +117,7 @@ Foam::UOPstream::UOPstream(const int toProcNo, PstreamBuffers& buffers) toProcNo_(toProcNo), sendBuf_(buffers.sendBuf_[toProcNo]), tag_(buffers.tag_), + comm_(buffers.comm_), sendAtDestruct_(buffers.commsType_ != UPstream::nonBlocking) { setOpened(); @@ -136,7 +139,8 @@ Foam::UOPstream::~UOPstream() toProcNo_, sendBuf_.begin(), sendBuf_.size(), - tag_ + tag_, + comm_ ) ) { @@ -287,7 +291,8 @@ Foam::Ostream& Foam::UOPstream::write(const char* data, std::streamsize count) void Foam::UOPstream::print(Ostream& os) const { os << "Writing from processor " << toProcNo_ - << " to processor " << myProcNo() << Foam::endl; + << " to processor " << myProcNo() << " in communicator " << comm_ + << " and tag " << tag_ << Foam::endl; } diff --git a/src/OpenFOAM/db/IOstreams/Pstreams/UOPstream.H b/src/OpenFOAM/db/IOstreams/Pstreams/UOPstream.H index 5cbde9aee5b..4f2dcc9c6bf 100644 --- a/src/OpenFOAM/db/IOstreams/Pstreams/UOPstream.H +++ b/src/OpenFOAM/db/IOstreams/Pstreams/UOPstream.H @@ -65,6 +65,8 @@ class UOPstream const int tag_; + const label comm_; + const bool sendAtDestruct_; @@ -93,6 +95,7 @@ public: const int toProcNo, DynamicList<char>& sendBuf, const int tag = UPstream::msgType(), + const label comm = UPstream::worldComm, const bool sendAtDestruct = true, streamFormat format=BINARY, versionNumber version=currentVersion @@ -126,7 +129,8 @@ public: const int toProcNo, const char* buf, const std::streamsize bufSize, - const int tag = UPstream::msgType() + const int tag = UPstream::msgType(), + const label communicator = 0 ); //- Write next token to stream diff --git a/src/OpenFOAM/db/IOstreams/Pstreams/UPstream.C b/src/OpenFOAM/db/IOstreams/Pstreams/UPstream.C index 9cc6d9d6f5d..20d46092656 100644 --- a/src/OpenFOAM/db/IOstreams/Pstreams/UPstream.C +++ b/src/OpenFOAM/db/IOstreams/Pstreams/UPstream.C @@ -54,18 +54,33 @@ const Foam::NamedEnum<Foam::UPstream::commsTypes, 3> // * * * * * * * * * * * * * Private Member Functions * * * * * * * * * * * // -void Foam::UPstream::setParRun() +void Foam::UPstream::setParRun(const label nProcs) { parRun_ = true; - Pout.prefix() = '[' + name(myProcNo()) + "] "; - Perr.prefix() = '[' + name(myProcNo()) + "] "; + // Redo worldComm communicator (this has been created at static + // initialisation time) + freeCommunicator(UPstream::worldComm); + label comm = allocateCommunicator(-1, identity(nProcs), true); + if (comm != UPstream::worldComm) + { + FatalErrorIn("UPstream::setParRun(const label)") + << "problem : comm:" << comm + << " UPstream::worldComm:" << UPstream::worldComm + << Foam::exit(FatalError); + } + + Pout.prefix() = '[' + name(myProcNo(Pstream::worldComm)) + "] "; + Perr.prefix() = '[' + name(myProcNo(Pstream::worldComm)) + "] "; } -void Foam::UPstream::calcLinearComm(const label nProcs) +Foam::List<Foam::UPstream::commsStruct> Foam::UPstream::calcLinearComm +( + const label nProcs +) { - linearCommunication_.setSize(nProcs); + List<commsStruct> linearCommunication(nProcs); // Master labelList belowIDs(nProcs - 1); @@ -74,7 +89,7 @@ void Foam::UPstream::calcLinearComm(const label nProcs) belowIDs[i] = i + 1; } - linearCommunication_[0] = commsStruct + linearCommunication[0] = commsStruct ( nProcs, 0, @@ -86,7 +101,7 @@ void Foam::UPstream::calcLinearComm(const label nProcs) // Slaves. Have no below processors, only communicate up to master for (label procID = 1; procID < nProcs; procID++) { - linearCommunication_[procID] = commsStruct + linearCommunication[procID] = commsStruct ( nProcs, procID, @@ -95,6 +110,7 @@ void Foam::UPstream::calcLinearComm(const label nProcs) labelList(0) ); } + return linearCommunication; } @@ -142,7 +158,10 @@ void Foam::UPstream::collectReceives // 5 - 4 // 6 7 4 // 7 - 6 -void Foam::UPstream::calcTreeComm(label nProcs) +Foam::List<Foam::UPstream::commsStruct> Foam::UPstream::calcTreeComm +( + label nProcs +) { label nLevels = 1; while ((1 << nLevels) < nProcs) @@ -188,11 +207,11 @@ void Foam::UPstream::calcTreeComm(label nProcs) } - treeCommunication_.setSize(nProcs); + List<commsStruct> treeCommunication(nProcs); for (label procID = 0; procID < nProcs; procID++) { - treeCommunication_[procID] = commsStruct + treeCommunication[procID] = commsStruct ( nProcs, procID, @@ -201,37 +220,159 @@ void Foam::UPstream::calcTreeComm(label nProcs) allReceives[procID].shrink() ); } + return treeCommunication; } -// Callback from UPstream::init() : initialize linear and tree communication -// schedules now that nProcs is known. -void Foam::UPstream::initCommunicationSchedule() +//// Callback from UPstream::init() : initialize linear and tree communication +//// schedules now that nProcs is known. +//void Foam::UPstream::initCommunicationSchedule() +//{ +// calcLinearComm(nProcs()); +// calcTreeComm(nProcs()); +//} + + +Foam::label Foam::UPstream::allocateCommunicator +( + const label parentIndex, + const labelList& subRanks, + const bool doPstream +) { - calcLinearComm(nProcs()); - calcTreeComm(nProcs()); + label index; + if (!freeComms_.empty()) + { + index = freeComms_.pop(); + } + else + { + // Extend storage + index = parentCommunicator_.size(); + + myProcNo_.append(-1); + procIDs_.append(List<int>(0)); + parentCommunicator_.append(-1); + linearCommunication_.append(List<commsStruct>(0)); + treeCommunication_.append(List<commsStruct>(0)); + } + + Pout<< "Communicators : Allocating communicator " << index << endl + << " parent : " << parentIndex << endl + << " procs : " << subRanks << endl + << endl; + + // Initialise; overwritten by allocatePstreamCommunicator + myProcNo_[index] = 0; + + // Convert from label to int + procIDs_[index].setSize(subRanks.size()); + forAll(procIDs_[index], i) + { + procIDs_[index][i] = subRanks[i]; + } + parentCommunicator_[index] = parentIndex; + + linearCommunication_[index] = calcLinearComm(procIDs_[index].size()); + treeCommunication_[index] = calcTreeComm(procIDs_[index].size()); + + + if (doPstream) + { + allocatePstreamCommunicator(parentIndex, index); + } + + return index; } -// * * * * * * * * * * * * * * Static Data Members * * * * * * * * * * * * * // +void Foam::UPstream::freeCommunicator +( + const label communicator, + const bool doPstream +) +{ + Pout<< "Communicators : Freeing communicator " << communicator << endl + << " parent : " << parentCommunicator_[communicator] << endl + << " myProcNo : " << myProcNo_[communicator] << endl + << endl; + + if (doPstream) + { + freePstreamCommunicator(communicator); + } + myProcNo_[communicator] = -1; + //procIDs_[communicator].clear(); + parentCommunicator_[communicator] = -1; + linearCommunication_[communicator].clear(); + treeCommunication_[communicator].clear(); + + freeComms_.push(communicator); +} + + +void Foam::UPstream::freeCommunicators(const bool doPstream) +{ + Pout<< "Communicators : Freeing all communicators" << endl + << endl; + + forAll(myProcNo_, communicator) + { + if (myProcNo_[communicator] != -1) + { + freeCommunicator(communicator, doPstream); + } + } +} -// Initialise my process number to 0 (the master) -int Foam::UPstream::myProcNo_(0); + +// * * * * * * * * * * * * * * Static Data Members * * * * * * * * * * * * * // // By default this is not a parallel run bool Foam::UPstream::parRun_(false); +//// Initialise my process number to 0 (the master) +//int Foam::UPstream::myProcNo_(0); +// +//// List of process IDs +//Foam::List<int> Foam::UPstream::procIDs_(label(1), 0); + +// Free communicators +Foam::LIFOStack<Foam::label> Foam::UPstream::freeComms_; + +// My processor number +Foam::DynamicList<int> Foam::UPstream::myProcNo_(10); + // List of process IDs -Foam::List<int> Foam::UPstream::procIDs_(label(1), 0); +Foam::DynamicList<Foam::List<int> > Foam::UPstream::procIDs_(10); + +// Parent communicator +Foam::DynamicList<Foam::label> Foam::UPstream::parentCommunicator_(10); // Standard transfer message type int Foam::UPstream::msgType_(1); +//// Linear communication schedule +//Foam::List<Foam::UPstream::commsStruct> +// Foam::UPstream::linearCommunication_(0); +//// Multi level communication schedule +//Foam::List<Foam::UPstream::commsStruct> +// Foam::UPstream::treeCommunication_(0); + // Linear communication schedule -Foam::List<Foam::UPstream::commsStruct> Foam::UPstream::linearCommunication_(0); +Foam::DynamicList<Foam::List<Foam::UPstream::commsStruct> > +Foam::UPstream::linearCommunication_(10); // Multi level communication schedule -Foam::List<Foam::UPstream::commsStruct> Foam::UPstream::treeCommunication_(0); +Foam::DynamicList<Foam::List<Foam::UPstream::commsStruct> > +Foam::UPstream::treeCommunication_(10); + + +// Allocate a serial communicator. This gets overwritten in parallel mode +// (by UPstream::setParRun()) +Foam::UPstream::communicator serialComm(-1, Foam::labelList(1, 0), false); + + // Should compact transfer be used in which floats replace doubles // reducing the bandwidth requirement at the expense of some loss @@ -292,6 +433,10 @@ public: addcommsTypeToOpt addcommsTypeToOpt_("commsType"); +// Default communicator +Foam::label Foam::UPstream::worldComm(0); + + // Number of polling cycles in processor updates int Foam::UPstream::nPollProcInterfaces ( diff --git a/src/OpenFOAM/db/IOstreams/Pstreams/UPstream.H b/src/OpenFOAM/db/IOstreams/Pstreams/UPstream.H index 1ad4aa99ccc..2eaa66847e9 100644 --- a/src/OpenFOAM/db/IOstreams/Pstreams/UPstream.H +++ b/src/OpenFOAM/db/IOstreams/Pstreams/UPstream.H @@ -29,7 +29,6 @@ Description SourceFiles UPstream.C - UPstreamsPrint.C UPstreamCommsStruct.C gatherScatter.C combineGatherScatter.C @@ -45,6 +44,8 @@ SourceFiles #include "HashTable.H" #include "string.H" #include "NamedEnum.H" +#include "ListOps.H" +#include "LIFOStack.H" // * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * // @@ -180,26 +181,33 @@ private: // Private data - static int myProcNo_; static bool parRun_; - - static List<int> procIDs_; static int msgType_; - static List<commsStruct> linearCommunication_; - static List<commsStruct> treeCommunication_; +// static int myProcNo_; +// static List<int> procIDs_; +// static List<commsStruct> linearCommunication_; +// static List<commsStruct> treeCommunication_; + // Communicator specific data + static LIFOStack<label> freeComms_; + static DynamicList<int> myProcNo_; + static DynamicList<List<int> > procIDs_; + static DynamicList<label> parentCommunicator_; + + static DynamicList<List<commsStruct> > linearCommunication_; + static DynamicList<List<commsStruct> > treeCommunication_; // Private Member Functions //- Set data for parallel running - static void setParRun(); + static void setParRun(const label nProcs); //- Calculate linear communication schedule - static void calcLinearComm(const label nProcs); + static List<commsStruct> calcLinearComm(const label nProcs); //- Calculate tree communication schedule - static void calcTreeComm(const label nProcs); + static List<commsStruct> calcTreeComm(const label nProcs); //- Helper function for tree communication schedule determination // Collects all processorIDs below a processor @@ -210,9 +218,22 @@ private: DynamicList<label>& allReceives ); - //- Initialize all communication schedules. Callback from - // UPstream::init() - static void initCommunicationSchedule(); + //- Allocate a communicator with index + static void allocatePstreamCommunicator + ( + const label parentIndex, + const label index + ); + + //- Free a communicator + static void freePstreamCommunicator + ( + const label index + ); + +// //- Initialize all communication schedules. Callback from +// // UPstream::init() +// static void initCommunicationSchedule(); protected: @@ -245,6 +266,9 @@ public: //- Number of polling cycles in processor updates static int nPollProcInterfaces; + //- Default communicator (all processors) + static label worldComm; + // Constructors //- Construct given optional buffer size @@ -256,6 +280,69 @@ public: // Member functions + //- Allocate a new communicator + static label allocateCommunicator + ( + const label parent, + const labelList& subRanks, + const bool doPstream = true + ); + + //- Free a previously allocated communicator + static void freeCommunicator + ( + const label communicator, + const bool doPstream = true + ); + + //- Free all communicators + static void freeCommunicators(const bool doPstream); + + //- Helper class for allocating/freeing communicators + class communicator + { + label comm_; + + //- Disallow copy and assignment + communicator(const communicator&); + void operator=(const communicator&); + + public: + + communicator + ( + const label parent, + const labelList& subRanks, + const bool doPstream + ) + : + comm_(allocateCommunicator(parent, subRanks, doPstream)) + {} + + communicator(const label parent) + : + comm_ + ( + allocateCommunicator + ( + parent, + identity(UPstream::nProcs(parent)) + ) + ) + {} + + ~communicator() + { + freeCommunicator(comm_); + } + + operator label() const + { + return comm_; + } + }; + + //- Add the valid option this type of communications library // adds/requires on the command line static void addValidParOptions(HashTable<string>& validParOptions); @@ -281,6 +368,14 @@ public: //- Non-blocking comms: has request i finished? static bool finishedRequest(const label i); + static int allocateTag(const char*); + + static int allocateTag(const word&); + + static void freeTag(const char*, const int tag); + + static void freeTag(const word&, const int tag); + //- Is this a parallel run? static bool& parRun() @@ -289,15 +384,9 @@ public: } //- Number of processes in parallel run - static label nProcs() + static label nProcs(const label communicator = 0) { - return procIDs_.size(); - } - - //- Am I the master process - static bool master() - { - return myProcNo_ == 0; + return procIDs_[communicator].size(); } //- Process index of the master @@ -306,23 +395,29 @@ public: return 0; } - //- Number of this process (starting from masterNo() = 0) - static int myProcNo() + //- Am I the master process + static bool master(const label communicator = 0) { - return myProcNo_; + return myProcNo_[communicator] == masterNo(); } - //- Process IDs - static const List<int>& procIDs() + //- Number of this process (starting from masterNo() = 0) + static int myProcNo(const label communicator = 0) { - return procIDs_; + return myProcNo_[communicator]; } - //- Process ID of given process index - static int procID(int procNo) - { - return procIDs_[procNo]; - } +// //- Process IDs +// static const List<int>& procIDs() +// { +// return procIDs_; +// } +// +// //- Process ID of given process index +// static int procID(int procNo) +// { +// return procIDs_[procNo]; +// } //- Process index of first slave static int firstSlave() @@ -331,21 +426,27 @@ public: } //- Process index of last slave - static int lastSlave() + static int lastSlave(const label communicator = 0) { - return nProcs() - 1; + return nProcs(communicator) - 1; } //- Communication schedule for linear all-to-master (proc 0) - static const List<commsStruct>& linearCommunication() + static const List<commsStruct>& linearCommunication + ( + const label communicator = 0 + ) { - return linearCommunication_; + return linearCommunication_[communicator]; } //- Communication schedule for tree all-to-master (proc 0) - static const List<commsStruct>& treeCommunication() + static const List<commsStruct>& treeCommunication + ( + const label communicator = 0 + ) { - return treeCommunication_; + return treeCommunication_[communicator]; } //- Message tag of standard messages diff --git a/src/OpenFOAM/db/IOstreams/Pstreams/combineGatherScatter.C b/src/OpenFOAM/db/IOstreams/Pstreams/combineGatherScatter.C index fbe246f1eaf..952cfbb47d2 100644 --- a/src/OpenFOAM/db/IOstreams/Pstreams/combineGatherScatter.C +++ b/src/OpenFOAM/db/IOstreams/Pstreams/combineGatherScatter.C @@ -51,13 +51,14 @@ void Pstream::combineGather const List<UPstream::commsStruct>& comms, T& Value, const CombineOp& cop, - const int tag + const int tag, + const label comm ) { if (UPstream::parRun()) { // Get my communication order - const commsStruct& myComm = comms[UPstream::myProcNo()]; + const commsStruct& myComm = comms[UPstream::myProcNo(comm)]; // Receive from my downstairs neighbours forAll(myComm.below(), belowI) @@ -73,7 +74,8 @@ void Pstream::combineGather belowID, reinterpret_cast<char*>(&value), sizeof(T), - tag + tag, + comm ); if (debug & 2) @@ -86,7 +88,7 @@ void Pstream::combineGather } else { - IPstream fromBelow(UPstream::scheduled, belowID, 0, tag); + IPstream fromBelow(UPstream::scheduled, belowID, 0, tag, comm); T value(fromBelow); if (debug & 2) @@ -116,12 +118,20 @@ void Pstream::combineGather myComm.above(), reinterpret_cast<const char*>(&Value), sizeof(T), - tag + tag, + comm ); } else { - OPstream toAbove(UPstream::scheduled, myComm.above(), 0, tag); + OPstream toAbove + ( + UPstream::scheduled, + myComm.above(), + 0, + tag, + comm + ); toAbove << Value; } } @@ -130,15 +140,35 @@ void Pstream::combineGather template <class T, class CombineOp> -void Pstream::combineGather(T& Value, const CombineOp& cop, const int tag) +void Pstream::combineGather +( + T& Value, + const CombineOp& cop, + const int tag, + const label comm +) { - if (UPstream::nProcs() < UPstream::nProcsSimpleSum) + if (UPstream::nProcs(comm) < UPstream::nProcsSimpleSum) { - combineGather(UPstream::linearCommunication(), Value, cop, tag); + combineGather + ( + UPstream::linearCommunication(comm), + Value, + cop, + tag, + comm + ); } else { - combineGather(UPstream::treeCommunication(), Value, cop, tag); + combineGather + ( + UPstream::treeCommunication(comm), + Value, + cop, + tag, + comm + ); } } @@ -148,13 +178,14 @@ void Pstream::combineScatter ( const List<UPstream::commsStruct>& comms, T& Value, - const int tag + const int tag, + const label comm ) { if (UPstream::parRun()) { // Get my communication order - const UPstream::commsStruct& myComm = comms[UPstream::myProcNo()]; + const UPstream::commsStruct& myComm = comms[UPstream::myProcNo(comm)]; // Reveive from up if (myComm.above() != -1) @@ -167,12 +198,20 @@ void Pstream::combineScatter myComm.above(), reinterpret_cast<char*>(&Value), sizeof(T), - tag + tag, + comm ); } else { - IPstream fromAbove(UPstream::scheduled, myComm.above(), 0, tag); + IPstream fromAbove + ( + UPstream::scheduled, + myComm.above(), + 0, + tag, + comm + ); Value = T(fromAbove); } @@ -201,12 +240,13 @@ void Pstream::combineScatter belowID, reinterpret_cast<const char*>(&Value), sizeof(T), - tag + tag, + comm ); } else { - OPstream toBelow(UPstream::scheduled, belowID, 0, tag); + OPstream toBelow(UPstream::scheduled, belowID, 0, tag, comm); toBelow << Value; } } @@ -215,15 +255,20 @@ void Pstream::combineScatter template <class T> -void Pstream::combineScatter(T& Value, const int tag) +void Pstream::combineScatter +( + T& Value, + const int tag, + const label comm +) { - if (UPstream::nProcs() < UPstream::nProcsSimpleSum) + if (UPstream::nProcs(comm) < UPstream::nProcsSimpleSum) { - combineScatter(UPstream::linearCommunication(), Value, tag); + combineScatter(UPstream::linearCommunication(comm), Value, tag, comm); } else { - combineScatter(UPstream::treeCommunication(), Value, tag); + combineScatter(UPstream::treeCommunication(comm), Value, tag, comm); } } @@ -238,13 +283,14 @@ void Pstream::listCombineGather const List<UPstream::commsStruct>& comms, List<T>& Values, const CombineOp& cop, - const int tag + const int tag, + const label comm ) { if (UPstream::parRun()) { // Get my communication order - const commsStruct& myComm = comms[UPstream::myProcNo()]; + const commsStruct& myComm = comms[UPstream::myProcNo(comm)]; // Receive from my downstairs neighbours forAll(myComm.below(), belowI) @@ -261,7 +307,8 @@ void Pstream::listCombineGather belowID, reinterpret_cast<char*>(receivedValues.begin()), receivedValues.byteSize(), - tag + tag, + comm ); if (debug & 2) @@ -277,7 +324,7 @@ void Pstream::listCombineGather } else { - IPstream fromBelow(UPstream::scheduled, belowID, 0, tag); + IPstream fromBelow(UPstream::scheduled, belowID, 0, tag, comm); List<T> receivedValues(fromBelow); if (debug & 2) @@ -310,12 +357,20 @@ void Pstream::listCombineGather myComm.above(), reinterpret_cast<const char*>(Values.begin()), Values.byteSize(), - tag + tag, + comm ); } else { - OPstream toAbove(UPstream::scheduled, myComm.above(), 0, tag); + OPstream toAbove + ( + UPstream::scheduled, + myComm.above(), + 0, + tag, + comm + ); toAbove << Values; } } @@ -328,16 +383,31 @@ void Pstream::listCombineGather ( List<T>& Values, const CombineOp& cop, - const int tag + const int tag, + const label comm ) { - if (UPstream::nProcs() < UPstream::nProcsSimpleSum) + if (UPstream::nProcs(comm) < UPstream::nProcsSimpleSum) { - listCombineGather(UPstream::linearCommunication(), Values, cop, tag); + listCombineGather + ( + UPstream::linearCommunication(comm), + Values, + cop, + tag, + comm + ); } else { - listCombineGather(UPstream::treeCommunication(), Values, cop, tag); + listCombineGather + ( + UPstream::treeCommunication(comm), + Values, + cop, + tag, + comm + ); } } @@ -347,13 +417,14 @@ void Pstream::listCombineScatter ( const List<UPstream::commsStruct>& comms, List<T>& Values, - const int tag + const int tag, + const label comm ) { if (UPstream::parRun()) { // Get my communication order - const UPstream::commsStruct& myComm = comms[UPstream::myProcNo()]; + const UPstream::commsStruct& myComm = comms[UPstream::myProcNo(comm)]; // Reveive from up if (myComm.above() != -1) @@ -366,12 +437,20 @@ void Pstream::listCombineScatter myComm.above(), reinterpret_cast<char*>(Values.begin()), Values.byteSize(), - tag + tag, + comm ); } else { - IPstream fromAbove(UPstream::scheduled, myComm.above(), 0, tag); + IPstream fromAbove + ( + UPstream::scheduled, + myComm.above(), + 0, + tag, + comm + ); fromAbove >> Values; } @@ -400,12 +479,13 @@ void Pstream::listCombineScatter belowID, reinterpret_cast<const char*>(Values.begin()), Values.byteSize(), - tag + tag, + comm ); } else { - OPstream toBelow(UPstream::scheduled, belowID, 0, tag); + OPstream toBelow(UPstream::scheduled, belowID, 0, tag, comm); toBelow << Values; } } @@ -414,15 +494,32 @@ void Pstream::listCombineScatter template <class T> -void Pstream::listCombineScatter(List<T>& Values, const int tag) +void Pstream::listCombineScatter +( + List<T>& Values, + const int tag, + const label comm +) { - if (UPstream::nProcs() < UPstream::nProcsSimpleSum) + if (UPstream::nProcs(comm) < UPstream::nProcsSimpleSum) { - listCombineScatter(UPstream::linearCommunication(), Values, tag); + listCombineScatter + ( + UPstream::linearCommunication(comm), + Values, + tag, + comm + ); } else { - listCombineScatter(UPstream::treeCommunication(), Values, tag); + listCombineScatter + ( + UPstream::treeCommunication(comm), + Values, + tag, + comm + ); } } @@ -439,20 +536,21 @@ void Pstream::mapCombineGather const List<UPstream::commsStruct>& comms, Container& Values, const CombineOp& cop, - const int tag + const int tag, + const label comm ) { if (UPstream::parRun()) { // Get my communication order - const commsStruct& myComm = comms[UPstream::myProcNo()]; + const commsStruct& myComm = comms[UPstream::myProcNo(comm)]; // Receive from my downstairs neighbours forAll(myComm.below(), belowI) { label belowID = myComm.below()[belowI]; - IPstream fromBelow(UPstream::scheduled, belowID, 0, tag); + IPstream fromBelow(UPstream::scheduled, belowID, 0, tag, comm); Container receivedValues(fromBelow); if (debug & 2) @@ -492,7 +590,7 @@ void Pstream::mapCombineGather << " data:" << Values << endl; } - OPstream toAbove(UPstream::scheduled, myComm.above(), 0, tag); + OPstream toAbove(UPstream::scheduled, myComm.above(), 0, tag, comm); toAbove << Values; } } @@ -504,16 +602,31 @@ void Pstream::mapCombineGather ( Container& Values, const CombineOp& cop, - const int tag + const int tag, + const label comm ) { - if (UPstream::nProcs() < UPstream::nProcsSimpleSum) + if (UPstream::nProcs(comm) < UPstream::nProcsSimpleSum) { - mapCombineGather(UPstream::linearCommunication(), Values, cop, tag); + mapCombineGather + ( + UPstream::linearCommunication(comm), + Values, + cop, + tag, + comm + ); } else { - mapCombineGather(UPstream::treeCommunication(), Values, cop, tag); + mapCombineGather + ( + UPstream::treeCommunication(comm), + Values, + cop, + tag, + comm + ); } } @@ -523,18 +636,26 @@ void Pstream::mapCombineScatter ( const List<UPstream::commsStruct>& comms, Container& Values, - const int tag + const int tag, + const label comm ) { if (UPstream::parRun()) { // Get my communication order - const UPstream::commsStruct& myComm = comms[UPstream::myProcNo()]; + const UPstream::commsStruct& myComm = comms[UPstream::myProcNo(comm)]; // Reveive from up if (myComm.above() != -1) { - IPstream fromAbove(UPstream::scheduled, myComm.above(), 0, tag); + IPstream fromAbove + ( + UPstream::scheduled, + myComm.above(), + 0, + tag, + comm + ); fromAbove >> Values; if (debug & 2) @@ -554,7 +675,7 @@ void Pstream::mapCombineScatter Pout<< " sending to " << belowID << " data:" << Values << endl; } - OPstream toBelow(UPstream::scheduled, belowID, 0, tag); + OPstream toBelow(UPstream::scheduled, belowID, 0, tag, comm); toBelow << Values; } } @@ -562,15 +683,32 @@ void Pstream::mapCombineScatter template <class Container> -void Pstream::mapCombineScatter(Container& Values, const int tag) +void Pstream::mapCombineScatter +( + Container& Values, + const int tag, + const label comm +) { - if (UPstream::nProcs() < UPstream::nProcsSimpleSum) + if (UPstream::nProcs(comm) < UPstream::nProcsSimpleSum) { - mapCombineScatter(UPstream::linearCommunication(), Values, tag); + mapCombineScatter + ( + UPstream::linearCommunication(comm), + Values, + tag, + comm + ); } else { - mapCombineScatter(UPstream::treeCommunication(), Values, tag); + mapCombineScatter + ( + UPstream::treeCommunication(comm), + Values, + tag, + comm + ); } } diff --git a/src/OpenFOAM/db/IOstreams/Pstreams/exchange.C b/src/OpenFOAM/db/IOstreams/Pstreams/exchange.C index 48c166935a1..b64de740f92 100644 --- a/src/OpenFOAM/db/IOstreams/Pstreams/exchange.C +++ b/src/OpenFOAM/db/IOstreams/Pstreams/exchange.C @@ -46,6 +46,7 @@ void Pstream::exchange List<Container>& recvBufs, labelListList& sizes, const int tag, + const label comm, const bool block ) { @@ -57,20 +58,20 @@ void Pstream::exchange ) << "Continuous data only." << Foam::abort(FatalError); } - if (sendBufs.size() != UPstream::nProcs()) + if (sendBufs.size() != UPstream::nProcs(comm)) { FatalErrorIn ( "Pstream::exchange(..)" ) << "Size of list:" << sendBufs.size() << " does not equal the number of processors:" - << UPstream::nProcs() + << UPstream::nProcs(comm) << Foam::abort(FatalError); } - sizes.setSize(UPstream::nProcs()); - labelList& nsTransPs = sizes[UPstream::myProcNo()]; - nsTransPs.setSize(UPstream::nProcs()); + sizes.setSize(UPstream::nProcs(comm)); + labelList& nsTransPs = sizes[UPstream::myProcNo(comm)]; + nsTransPs.setSize(UPstream::nProcs(comm)); forAll(sendBufs, procI) { @@ -78,7 +79,7 @@ void Pstream::exchange } // Send sizes across. Note: blocks. - combineReduce(sizes, UPstream::listEq(), tag); + combineReduce(sizes, UPstream::listEq(), tag, comm); if (Pstream::parRun()) { @@ -90,9 +91,9 @@ void Pstream::exchange recvBufs.setSize(sendBufs.size()); forAll(sizes, procI) { - label nRecv = sizes[procI][UPstream::myProcNo()]; + label nRecv = sizes[procI][UPstream::myProcNo(comm)]; - if (procI != Pstream::myProcNo() && nRecv > 0) + if (procI != Pstream::myProcNo(comm) && nRecv > 0) { recvBufs[procI].setSize(nRecv); UIPstream::read @@ -101,7 +102,8 @@ void Pstream::exchange procI, reinterpret_cast<char*>(recvBufs[procI].begin()), nRecv*sizeof(T), - tag + tag, + comm ); } } @@ -112,7 +114,7 @@ void Pstream::exchange forAll(sendBufs, procI) { - if (procI != Pstream::myProcNo() && sendBufs[procI].size() > 0) + if (procI != Pstream::myProcNo(comm) && sendBufs[procI].size() > 0) { if ( @@ -122,7 +124,8 @@ void Pstream::exchange procI, reinterpret_cast<const char*>(sendBufs[procI].begin()), sendBufs[procI].size()*sizeof(T), - tag + tag, + comm ) ) { @@ -146,7 +149,7 @@ void Pstream::exchange } // Do myself - recvBufs[Pstream::myProcNo()] = sendBufs[Pstream::myProcNo()]; + recvBufs[Pstream::myProcNo(comm)] = sendBufs[Pstream::myProcNo(comm)]; } diff --git a/src/OpenFOAM/db/IOstreams/Pstreams/gatherScatter.C b/src/OpenFOAM/db/IOstreams/Pstreams/gatherScatter.C index 7065383d38c..3e412f416fc 100644 --- a/src/OpenFOAM/db/IOstreams/Pstreams/gatherScatter.C +++ b/src/OpenFOAM/db/IOstreams/Pstreams/gatherScatter.C @@ -48,13 +48,14 @@ void Pstream::gather const List<UPstream::commsStruct>& comms, T& Value, const BinaryOp& bop, - const int tag + const int tag, + const label comm ) { if (UPstream::parRun()) { // Get my communication order - const commsStruct& myComm = comms[UPstream::myProcNo()]; + const commsStruct& myComm = comms[UPstream::myProcNo(comm)]; // Receive from my downstairs neighbours forAll(myComm.below(), belowI) @@ -69,7 +70,8 @@ void Pstream::gather myComm.below()[belowI], reinterpret_cast<char*>(&value), sizeof(T), - tag + tag, + comm ); } else @@ -79,7 +81,8 @@ void Pstream::gather UPstream::scheduled, myComm.below()[belowI], 0, - tag + tag, + comm ); fromBelow >> value; } @@ -98,12 +101,20 @@ void Pstream::gather myComm.above(), reinterpret_cast<const char*>(&Value), sizeof(T), - tag + tag, + comm ); } else { - OPstream toAbove(UPstream::scheduled, myComm.above(), 0, tag); + OPstream toAbove + ( + UPstream::scheduled, + myComm.above(), + 0, + tag, + comm + ); toAbove << Value; } } @@ -112,15 +123,21 @@ void Pstream::gather template <class T, class BinaryOp> -void Pstream::gather(T& Value, const BinaryOp& bop, const int tag) +void Pstream::gather +( + T& Value, + const BinaryOp& bop, + const int tag, + const label comm +) { - if (UPstream::nProcs() < UPstream::nProcsSimpleSum) + if (UPstream::nProcs(comm) < UPstream::nProcsSimpleSum) { - gather(UPstream::linearCommunication(), Value, bop, tag); + gather(UPstream::linearCommunication(comm), Value, bop, tag, comm); } else { - gather(UPstream::treeCommunication(), Value, bop, tag); + gather(UPstream::treeCommunication(comm), Value, bop, tag, comm); } } @@ -130,13 +147,14 @@ void Pstream::scatter ( const List<UPstream::commsStruct>& comms, T& Value, - const int tag + const int tag, + const label comm ) { if (UPstream::parRun()) { // Get my communication order - const commsStruct& myComm = comms[UPstream::myProcNo()]; + const commsStruct& myComm = comms[UPstream::myProcNo(comm)]; // Reveive from up if (myComm.above() != -1) @@ -149,12 +167,20 @@ void Pstream::scatter myComm.above(), reinterpret_cast<char*>(&Value), sizeof(T), - tag + tag, + comm ); } else { - IPstream fromAbove(UPstream::scheduled, myComm.above(), 0, tag); + IPstream fromAbove + ( + UPstream::scheduled, + myComm.above(), + 0, + tag, + comm + ); fromAbove >> Value; } } @@ -170,7 +196,8 @@ void Pstream::scatter myComm.below()[belowI], reinterpret_cast<const char*>(&Value), sizeof(T), - tag + tag, + comm ); } else @@ -180,7 +207,8 @@ void Pstream::scatter UPstream::scheduled, myComm.below()[belowI], 0, - tag + tag, + comm ); toBelow << Value; } @@ -190,15 +218,15 @@ void Pstream::scatter template <class T> -void Pstream::scatter(T& Value, const int tag) +void Pstream::scatter(T& Value, const int tag, const label comm) { - if (UPstream::nProcs() < UPstream::nProcsSimpleSum) + if (UPstream::nProcs(comm) < UPstream::nProcsSimpleSum) { - scatter(UPstream::linearCommunication(), Value, tag); + scatter(UPstream::linearCommunication(comm), Value, tag, comm); } else { - scatter(UPstream::treeCommunication(), Value, tag); + scatter(UPstream::treeCommunication(comm), Value, tag, comm); } } diff --git a/src/OpenFOAM/db/IOstreams/Pstreams/gatherScatterList.C b/src/OpenFOAM/db/IOstreams/Pstreams/gatherScatterList.C index b309ea79321..c7e2d026b66 100644 --- a/src/OpenFOAM/db/IOstreams/Pstreams/gatherScatterList.C +++ b/src/OpenFOAM/db/IOstreams/Pstreams/gatherScatterList.C @@ -26,7 +26,7 @@ Description communication schedule (usually linear-to-master or tree-to-master). The gathered data will be a list with element procID the data from processor procID. Before calling every processor should insert its value into - Values[UPstream::myProcNo()]. + Values[UPstream::myProcNo(comm)]. Note: after gather every processor only knows its own data and that of the processors below it. Only the 'master' of the communication schedule holds a fully filled List. Use scatter to distribute the data. @@ -49,12 +49,13 @@ void Pstream::gatherList ( const List<UPstream::commsStruct>& comms, List<T>& Values, - const int tag + const int tag, + const label comm ) { if (UPstream::parRun()) { - if (Values.size() != UPstream::nProcs()) + if (Values.size() != UPstream::nProcs(comm)) { FatalErrorIn ( @@ -62,12 +63,12 @@ void Pstream::gatherList ", List<T>)" ) << "Size of list:" << Values.size() << " does not equal the number of processors:" - << UPstream::nProcs() + << UPstream::nProcs(comm) << Foam::abort(FatalError); } // Get my communication order - const commsStruct& myComm = comms[UPstream::myProcNo()]; + const commsStruct& myComm = comms[UPstream::myProcNo(comm)]; // Receive from my downstairs neighbours forAll(myComm.below(), belowI) @@ -85,7 +86,8 @@ void Pstream::gatherList belowID, reinterpret_cast<char*>(receivedValues.begin()), receivedValues.byteSize(), - tag + tag, + comm ); Values[belowID] = receivedValues[0]; @@ -97,7 +99,7 @@ void Pstream::gatherList } else { - IPstream fromBelow(UPstream::scheduled, belowID, 0, tag); + IPstream fromBelow(UPstream::scheduled, belowID, 0, tag, comm); fromBelow >> Values[belowID]; if (debug & 2) @@ -133,14 +135,14 @@ void Pstream::gatherList if (debug & 2) { Pout<< " sending to " << myComm.above() - << " data from me:" << UPstream::myProcNo() - << " data:" << Values[UPstream::myProcNo()] << endl; + << " data from me:" << UPstream::myProcNo(comm) + << " data:" << Values[UPstream::myProcNo(comm)] << endl; } if (contiguous<T>()) { List<T> sendingValues(belowLeaves.size() + 1); - sendingValues[0] = Values[UPstream::myProcNo()]; + sendingValues[0] = Values[UPstream::myProcNo(comm)]; forAll(belowLeaves, leafI) { @@ -153,13 +155,21 @@ void Pstream::gatherList myComm.above(), reinterpret_cast<const char*>(sendingValues.begin()), sendingValues.byteSize(), - tag + tag, + comm ); } else { - OPstream toAbove(UPstream::scheduled, myComm.above(), 0, tag); - toAbove << Values[UPstream::myProcNo()]; + OPstream toAbove + ( + UPstream::scheduled, + myComm.above(), + 0, + tag, + comm + ); + toAbove << Values[UPstream::myProcNo(comm)]; forAll(belowLeaves, leafI) { @@ -180,15 +190,15 @@ void Pstream::gatherList template <class T> -void Pstream::gatherList(List<T>& Values, const int tag) +void Pstream::gatherList(List<T>& Values, const int tag, const label comm) { - if (UPstream::nProcs() < UPstream::nProcsSimpleSum) + if (UPstream::nProcs(comm) < UPstream::nProcsSimpleSum) { - gatherList(UPstream::linearCommunication(), Values, tag); + gatherList(UPstream::linearCommunication(comm), Values, tag, comm); } else { - gatherList(UPstream::treeCommunication(), Values, tag); + gatherList(UPstream::treeCommunication(comm), Values, tag, comm); } } @@ -198,12 +208,13 @@ void Pstream::scatterList ( const List<UPstream::commsStruct>& comms, List<T>& Values, - const int tag + const int tag, + const label comm ) { if (UPstream::parRun()) { - if (Values.size() != UPstream::nProcs()) + if (Values.size() != UPstream::nProcs(comm)) { FatalErrorIn ( @@ -211,12 +222,12 @@ void Pstream::scatterList ", List<T>)" ) << "Size of list:" << Values.size() << " does not equal the number of processors:" - << UPstream::nProcs() + << UPstream::nProcs(comm) << Foam::abort(FatalError); } // Get my communication order - const commsStruct& myComm = comms[UPstream::myProcNo()]; + const commsStruct& myComm = comms[UPstream::myProcNo(comm)]; // Reveive from up if (myComm.above() != -1) @@ -233,7 +244,8 @@ void Pstream::scatterList myComm.above(), reinterpret_cast<char*>(receivedValues.begin()), receivedValues.byteSize(), - tag + tag, + comm ); forAll(notBelowLeaves, leafI) @@ -243,7 +255,14 @@ void Pstream::scatterList } else { - IPstream fromAbove(UPstream::scheduled, myComm.above(), 0, tag); + IPstream fromAbove + ( + UPstream::scheduled, + myComm.above(), + 0, + tag, + comm + ); forAll(notBelowLeaves, leafI) { @@ -281,12 +300,13 @@ void Pstream::scatterList belowID, reinterpret_cast<const char*>(sendingValues.begin()), sendingValues.byteSize(), - tag + tag, + comm ); } else { - OPstream toBelow(UPstream::scheduled, belowID, 0, tag); + OPstream toBelow(UPstream::scheduled, belowID, 0, tag, comm); // Send data destined for all other processors below belowID forAll(notBelowLeaves, leafI) @@ -308,15 +328,15 @@ void Pstream::scatterList template <class T> -void Pstream::scatterList(List<T>& Values, const int tag) +void Pstream::scatterList(List<T>& Values, const int tag, const label comm) { - if (UPstream::nProcs() < UPstream::nProcsSimpleSum) + if (UPstream::nProcs(comm) < UPstream::nProcsSimpleSum) { - scatterList(UPstream::linearCommunication(), Values, tag); + scatterList(UPstream::linearCommunication(comm), Values, tag, comm); } else { - scatterList(UPstream::treeCommunication(), Values, tag); + scatterList(UPstream::treeCommunication(comm), Values, tag, comm); } } diff --git a/src/OpenFOAM/db/regIOobject/regIOobjectRead.C b/src/OpenFOAM/db/regIOobject/regIOobjectRead.C index 39fba32a292..5b273d7a80b 100644 --- a/src/OpenFOAM/db/regIOobject/regIOobjectRead.C +++ b/src/OpenFOAM/db/regIOobject/regIOobjectRead.C @@ -215,9 +215,10 @@ bool Foam::regIOobject::read() ( comms, const_cast<word&>(headerClassName()), - Pstream::msgType() + Pstream::msgType(), + Pstream::worldComm ); - Pstream::scatter(comms, note(), Pstream::msgType()); + Pstream::scatter(comms, note(), Pstream::msgType(), Pstream::worldComm); // Get my communication order diff --git a/src/OpenFOAM/matrices/LUscalarMatrix/procLduInterface.C b/src/OpenFOAM/matrices/LUscalarMatrix/procLduInterface.C index 1d02976a154..6eab2e4cbda 100644 --- a/src/OpenFOAM/matrices/LUscalarMatrix/procLduInterface.C +++ b/src/OpenFOAM/matrices/LUscalarMatrix/procLduInterface.C @@ -40,7 +40,8 @@ Foam::procLduInterface::procLduInterface coeffs_(coeffs), myProcNo_(-1), neighbProcNo_(-1), - tag_(-1) + tag_(-1), + comm_(-1) { if (isA<processorLduInterface>(interface.interface())) { @@ -50,6 +51,7 @@ Foam::procLduInterface::procLduInterface myProcNo_ = pldui.myProcNo(); neighbProcNo_ = pldui.neighbProcNo(); tag_ = pldui.tag(); + comm_ = pldui.comm(); } else if (isA<cyclicLduInterface>(interface.interface())) { @@ -73,7 +75,8 @@ Foam::procLduInterface::procLduInterface(Istream& is) coeffs_(is), myProcNo_(readLabel(is)), neighbProcNo_(readLabel(is)), - tag_(readLabel(is)) + tag_(readLabel(is)), + comm_(readLabel(is)) {} @@ -85,7 +88,8 @@ Foam::Ostream& Foam::operator<<(Ostream& os, const procLduInterface& cldui) << cldui.coeffs_ << cldui.myProcNo_ << cldui.neighbProcNo_ - << cldui.tag_; + << cldui.tag_ + << cldui.comm_; return os; } diff --git a/src/OpenFOAM/matrices/LUscalarMatrix/procLduInterface.H b/src/OpenFOAM/matrices/LUscalarMatrix/procLduInterface.H index 7541b53b0fc..8965f989db3 100644 --- a/src/OpenFOAM/matrices/LUscalarMatrix/procLduInterface.H +++ b/src/OpenFOAM/matrices/LUscalarMatrix/procLduInterface.H @@ -25,7 +25,7 @@ Class Foam::procLduInterface Description - Foam::procLduInterface + IO interface for processorLduInterface SourceFiles procLduInterface.C @@ -58,6 +58,7 @@ class procLduInterface label myProcNo_; label neighbProcNo_; label tag_; + label comm_; // Private Member Functions diff --git a/src/OpenFOAM/matrices/lduMatrix/lduAddressing/lduInterface/processorLduInterface.H b/src/OpenFOAM/matrices/lduMatrix/lduAddressing/lduInterface/processorLduInterface.H index 26dc4c1eb84..ac0f8959006 100644 --- a/src/OpenFOAM/matrices/lduMatrix/lduAddressing/lduInterface/processorLduInterface.H +++ b/src/OpenFOAM/matrices/lduMatrix/lduAddressing/lduInterface/processorLduInterface.H @@ -84,10 +84,13 @@ public: // Access - //- Return processor number + //- Return communicator used for sending + virtual int comm() const = 0; + + //- Return processor number (rank in communicator) virtual int myProcNo() const = 0; - //- Return neigbour processor number + //- Return neigbour processor number (rank in communicator) virtual int neighbProcNo() const = 0; //- Return face transformation tensor diff --git a/src/OpenFOAM/matrices/lduMatrix/lduAddressing/lduInterface/processorLduInterfaceTemplates.C b/src/OpenFOAM/matrices/lduMatrix/lduAddressing/lduInterface/processorLduInterfaceTemplates.C index 396845ffe86..fe552501e12 100644 --- a/src/OpenFOAM/matrices/lduMatrix/lduAddressing/lduInterface/processorLduInterfaceTemplates.C +++ b/src/OpenFOAM/matrices/lduMatrix/lduAddressing/lduInterface/processorLduInterfaceTemplates.C @@ -46,7 +46,8 @@ void Foam::processorLduInterface::send neighbProcNo(), reinterpret_cast<const char*>(f.begin()), nBytes, - tag() + tag(), + comm() ); } else if (commsType == Pstream::nonBlocking) @@ -59,7 +60,8 @@ void Foam::processorLduInterface::send neighbProcNo(), receiveBuf_.begin(), nBytes, - tag() + tag(), + comm() ); resizeBuf(sendBuf_, nBytes); @@ -71,7 +73,8 @@ void Foam::processorLduInterface::send neighbProcNo(), sendBuf_.begin(), nBytes, - tag() + tag(), + comm() ); } else @@ -98,7 +101,8 @@ void Foam::processorLduInterface::receive neighbProcNo(), reinterpret_cast<char*>(f.begin()), f.byteSize(), - tag() + tag(), + comm() ); } else if (commsType == Pstream::nonBlocking) @@ -162,7 +166,8 @@ void Foam::processorLduInterface::compressedSend neighbProcNo(), sendBuf_.begin(), nBytes, - tag() + tag(), + comm() ); } else if (commsType == Pstream::nonBlocking) @@ -175,7 +180,8 @@ void Foam::processorLduInterface::compressedSend neighbProcNo(), receiveBuf_.begin(), nBytes, - tag() + tag(), + comm() ); OPstream::write @@ -184,7 +190,8 @@ void Foam::processorLduInterface::compressedSend neighbProcNo(), sendBuf_.begin(), nBytes, - tag() + tag(), + comm() ); } else @@ -225,7 +232,8 @@ void Foam::processorLduInterface::compressedReceive neighbProcNo(), receiveBuf_.begin(), nBytes, - tag() + tag(), + comm() ); } else if (commsType != Pstream::nonBlocking) diff --git a/src/OpenFOAM/matrices/lduMatrix/lduAddressing/lduInterfaceFields/processorLduInterfaceField/processorLduInterfaceField.H b/src/OpenFOAM/matrices/lduMatrix/lduAddressing/lduInterfaceFields/processorLduInterfaceField/processorLduInterfaceField.H index a5b9066b828..c1cc1c21881 100644 --- a/src/OpenFOAM/matrices/lduMatrix/lduAddressing/lduInterfaceFields/processorLduInterfaceField/processorLduInterfaceField.H +++ b/src/OpenFOAM/matrices/lduMatrix/lduAddressing/lduInterfaceFields/processorLduInterfaceField/processorLduInterfaceField.H @@ -71,6 +71,9 @@ public: // Access + //- Return communicator used for comms + virtual int comm() const = 0; + //- Return processor number virtual int myProcNo() const = 0; diff --git a/src/OpenFOAM/matrices/lduMatrix/solvers/GAMG/interfaceFields/processorGAMGInterfaceField/processorGAMGInterfaceField.H b/src/OpenFOAM/matrices/lduMatrix/solvers/GAMG/interfaceFields/processorGAMGInterfaceField/processorGAMGInterfaceField.H index c64f0738ee4..68007b2c819 100644 --- a/src/OpenFOAM/matrices/lduMatrix/solvers/GAMG/interfaceFields/processorGAMGInterfaceField/processorGAMGInterfaceField.H +++ b/src/OpenFOAM/matrices/lduMatrix/solvers/GAMG/interfaceFields/processorGAMGInterfaceField/processorGAMGInterfaceField.H @@ -146,6 +146,12 @@ public: //- Processor interface functions + //- Return communicator used for comms + virtual int comm() const + { + return procInterface_.comm(); + } + //- Return processor number virtual int myProcNo() const { diff --git a/src/OpenFOAM/matrices/lduMatrix/solvers/GAMG/interfaces/processorGAMGInterface/processorGAMGInterface.H b/src/OpenFOAM/matrices/lduMatrix/solvers/GAMG/interfaces/processorGAMGInterface/processorGAMGInterface.H index 8ef90c74fe1..6c332909cfb 100644 --- a/src/OpenFOAM/matrices/lduMatrix/solvers/GAMG/interfaces/processorGAMGInterface/processorGAMGInterface.H +++ b/src/OpenFOAM/matrices/lduMatrix/solvers/GAMG/interfaces/processorGAMGInterface/processorGAMGInterface.H @@ -55,7 +55,7 @@ class processorGAMGInterface { // Private data - //- Reference tor the processorLduInterface from which this is + //- Reference for the processorLduInterface from which this is // agglomerated const processorLduInterface& fineProcInterface_; @@ -137,6 +137,12 @@ public: { return fineProcInterface_.tag(); } + + //- Return communicator used for sending + virtual int comm() const + { + return fineProcInterface_.comm(); + } }; diff --git a/src/OpenFOAM/meshes/ProcessorTopology/ProcessorTopology.C b/src/OpenFOAM/meshes/ProcessorTopology/ProcessorTopology.C index f5a6c72fa80..7f099685bfd 100644 --- a/src/OpenFOAM/meshes/ProcessorTopology/ProcessorTopology.C +++ b/src/OpenFOAM/meshes/ProcessorTopology/ProcessorTopology.C @@ -34,6 +34,7 @@ License template<class Patch, class ProcPatch> Foam::labelList Foam::ProcessorTopology<Patch, ProcPatch>::procNeighbours ( + const label nProcs, const PtrList<Patch>& patches ) { @@ -43,7 +44,7 @@ Foam::labelList Foam::ProcessorTopology<Patch, ProcPatch>::procNeighbours label maxNb = 0; - boolList isNeighbourProc(Pstream::nProcs(), false); + boolList isNeighbourProc(nProcs, false); forAll(patches, patchi) { @@ -106,20 +107,21 @@ Foam::labelList Foam::ProcessorTopology<Patch, ProcPatch>::procNeighbours template<class Patch, class ProcPatch> Foam::ProcessorTopology<Patch, ProcPatch>::ProcessorTopology ( - const PtrList<Patch>& patches + const PtrList<Patch>& patches, + const label comm ) : - labelListList(Pstream::nProcs()), + labelListList(Pstream::nProcs(comm)), patchSchedule_(2*patches.size()) { if (Pstream::parRun()) { // Fill my 'slot' with my neighbours - operator[](Pstream::myProcNo()) = procNeighbours(patches); + operator[](Pstream::myProcNo()) = procNeighbours(this->size(), patches); // Distribute to all processors - Pstream::gatherList(*this); - Pstream::scatterList(*this); + Pstream::gatherList(*this, Pstream::msgType(), comm); + Pstream::scatterList(*this, Pstream::msgType(), comm); } if (Pstream::parRun() && Pstream::defaultCommsType == Pstream::scheduled) @@ -172,7 +174,7 @@ Foam::ProcessorTopology<Patch, ProcPatch>::ProcessorTopology ( commSchedule ( - Pstream::nProcs(), + Pstream::nProcs(comm), comms ).procSchedule()[Pstream::myProcNo()] ); diff --git a/src/OpenFOAM/meshes/ProcessorTopology/ProcessorTopology.H b/src/OpenFOAM/meshes/ProcessorTopology/ProcessorTopology.H index 4a46bfcbd0f..e39e9e19142 100644 --- a/src/OpenFOAM/meshes/ProcessorTopology/ProcessorTopology.H +++ b/src/OpenFOAM/meshes/ProcessorTopology/ProcessorTopology.H @@ -76,14 +76,14 @@ private: //- Return all neighbouring processors of this processor. Set // procPatchMap_. - labelList procNeighbours(const PtrList<Patch>&); + labelList procNeighbours(const label nProcs, const PtrList<Patch>&); public: // Constructors //- Construct from boundaryMesh - ProcessorTopology(const PtrList<Patch>& patches); + ProcessorTopology(const PtrList<Patch>& patches, const label comm); // Member Functions diff --git a/src/OpenFOAM/meshes/polyMesh/globalMeshData/globalMeshData.C b/src/OpenFOAM/meshes/polyMesh/globalMeshData/globalMeshData.C index 2c0ae4c102b..35a88a646d4 100644 --- a/src/OpenFOAM/meshes/polyMesh/globalMeshData/globalMeshData.C +++ b/src/OpenFOAM/meshes/polyMesh/globalMeshData/globalMeshData.C @@ -1740,7 +1740,7 @@ void Foam::globalMeshData::calcGlobalCoPointSlaves() const // Construct from polyMesh Foam::globalMeshData::globalMeshData(const polyMesh& mesh) : - processorTopology(mesh.boundaryMesh()), + processorTopology(mesh.boundaryMesh(), UPstream::worldComm), mesh_(mesh), nTotalPoints_(-1), nTotalFaces_(-1), diff --git a/src/OpenFOAM/meshes/polyMesh/mapPolyMesh/mapDistribute/mapDistribute.C b/src/OpenFOAM/meshes/polyMesh/mapPolyMesh/mapDistribute/mapDistribute.C index a4ababac70b..94c8d176dfa 100644 --- a/src/OpenFOAM/meshes/polyMesh/mapPolyMesh/mapDistribute/mapDistribute.C +++ b/src/OpenFOAM/meshes/polyMesh/mapPolyMesh/mapDistribute/mapDistribute.C @@ -548,7 +548,8 @@ void Foam::mapDistribute::exchangeAddressing wantedRemoteElements, subMap_, sendSizes, - tag + tag, + Pstream::worldComm //TBD ); // Renumber elements @@ -627,7 +628,8 @@ void Foam::mapDistribute::exchangeAddressing wantedRemoteElements, subMap_, sendSizes, - tag + tag, + Pstream::worldComm //TBD ); // Renumber elements diff --git a/src/OpenFOAM/meshes/polyMesh/polyPatches/constraint/processor/processorPolyPatch.C b/src/OpenFOAM/meshes/polyMesh/polyPatches/constraint/processor/processorPolyPatch.C index 5da47a6ce4c..ab44c8d45b9 100644 --- a/src/OpenFOAM/meshes/polyMesh/polyPatches/constraint/processor/processorPolyPatch.C +++ b/src/OpenFOAM/meshes/polyMesh/polyPatches/constraint/processor/processorPolyPatch.C @@ -54,12 +54,14 @@ Foam::processorPolyPatch::processorPolyPatch const label start, const label index, const polyBoundaryMesh& bm, + const label comm, const int myProcNo, const int neighbProcNo, const transformType transform ) : coupledPolyPatch(name, size, start, index, bm, typeName, transform), + comm_(comm), myProcNo_(myProcNo), neighbProcNo_(neighbProcNo), neighbFaceCentres_(), @@ -78,6 +80,10 @@ Foam::processorPolyPatch::processorPolyPatch ) : coupledPolyPatch(name, dict, index, bm, patchType), + comm_ + ( + dict.lookupOrDefault("communicator", UPstream::worldComm) + ), myProcNo_(readLabel(dict.lookup("myProcNo"))), neighbProcNo_(readLabel(dict.lookup("neighbProcNo"))), neighbFaceCentres_(), @@ -93,6 +99,7 @@ Foam::processorPolyPatch::processorPolyPatch ) : coupledPolyPatch(pp, bm), + comm_(pp.comm_), myProcNo_(pp.myProcNo_), neighbProcNo_(pp.neighbProcNo_), neighbFaceCentres_(), @@ -111,6 +118,7 @@ Foam::processorPolyPatch::processorPolyPatch ) : coupledPolyPatch(pp, bm, index, newSize, newStart), + comm_(pp.comm_), myProcNo_(pp.myProcNo_), neighbProcNo_(pp.neighbProcNo_), neighbFaceCentres_(), @@ -129,6 +137,7 @@ Foam::processorPolyPatch::processorPolyPatch ) : coupledPolyPatch(pp, bm, index, mapAddressing, newStart), + comm_(pp.comm_), myProcNo_(pp.myProcNo_), neighbProcNo_(pp.neighbProcNo_), neighbFaceCentres_(), @@ -1082,6 +1091,11 @@ bool Foam::processorPolyPatch::order void Foam::processorPolyPatch::write(Ostream& os) const { coupledPolyPatch::write(os); + if (comm_ != UPstream::worldComm) + { + os.writeKeyword("communicator") << comm_ + << token::END_STATEMENT << nl; + } os.writeKeyword("myProcNo") << myProcNo_ << token::END_STATEMENT << nl; os.writeKeyword("neighbProcNo") << neighbProcNo_ diff --git a/src/OpenFOAM/meshes/polyMesh/polyPatches/constraint/processor/processorPolyPatch.H b/src/OpenFOAM/meshes/polyMesh/polyPatches/constraint/processor/processorPolyPatch.H index f40f97c94f2..7c74f9f28e3 100644 --- a/src/OpenFOAM/meshes/polyMesh/polyPatches/constraint/processor/processorPolyPatch.H +++ b/src/OpenFOAM/meshes/polyMesh/polyPatches/constraint/processor/processorPolyPatch.H @@ -58,6 +58,9 @@ class processorPolyPatch { // Private data + //- Communicator to use + label comm_; + int myProcNo_; int neighbProcNo_; @@ -133,6 +136,7 @@ public: const label start, const label index, const polyBoundaryMesh& bm, + const label comm, const int myProcNo, const int neighbProcNo, const transformType transform = UNKNOWN // transformation type @@ -244,6 +248,13 @@ public: } } + + //- Return communicator used for communication + label comm() const + { + return comm_; + } + //- Return processor number int myProcNo() const { diff --git a/src/OpenFOAM/meshes/polyMesh/polyPatches/constraint/processorCyclic/processorCyclicPolyPatch.C b/src/OpenFOAM/meshes/polyMesh/polyPatches/constraint/processorCyclic/processorCyclicPolyPatch.C index 8988a06eb21..305f01826d1 100644 --- a/src/OpenFOAM/meshes/polyMesh/polyPatches/constraint/processorCyclic/processorCyclicPolyPatch.C +++ b/src/OpenFOAM/meshes/polyMesh/polyPatches/constraint/processorCyclic/processorCyclicPolyPatch.C @@ -49,6 +49,7 @@ Foam::processorCyclicPolyPatch::processorCyclicPolyPatch const label start, const label index, const polyBoundaryMesh& bm, + const label comm, const int myProcNo, const int neighbProcNo, const word& referPatchName, @@ -62,6 +63,7 @@ Foam::processorCyclicPolyPatch::processorCyclicPolyPatch start, index, bm, + comm, myProcNo, neighbProcNo, transform diff --git a/src/OpenFOAM/meshes/polyMesh/polyPatches/constraint/processorCyclic/processorCyclicPolyPatch.H b/src/OpenFOAM/meshes/polyMesh/polyPatches/constraint/processorCyclic/processorCyclicPolyPatch.H index fa59414586b..139080b35c8 100644 --- a/src/OpenFOAM/meshes/polyMesh/polyPatches/constraint/processorCyclic/processorCyclicPolyPatch.H +++ b/src/OpenFOAM/meshes/polyMesh/polyPatches/constraint/processorCyclic/processorCyclicPolyPatch.H @@ -121,6 +121,7 @@ public: const label start, const label index, const polyBoundaryMesh& bm, + const label comm, const int myProcNo, const int neighbProcNo, const word& referPatchName, diff --git a/src/Pstream/dummy/UIPread.C b/src/Pstream/dummy/UIPread.C index fc3b4466e2a..6fa253f38c7 100644 --- a/src/Pstream/dummy/UIPread.C +++ b/src/Pstream/dummy/UIPread.C @@ -37,6 +37,7 @@ Foam::UIPstream::UIPstream DynamicList<char>& externalBuf, label& externalBufPosition, const int tag, + const label comm, const bool clearAtEnd, streamFormat format, versionNumber version @@ -48,6 +49,7 @@ Foam::UIPstream::UIPstream externalBuf_(externalBuf), externalBufPosition_(externalBufPosition), tag_(tag), + comm_(comm), clearAtEnd_(clearAtEnd), messageSize_(0) { @@ -60,6 +62,7 @@ Foam::UIPstream::UIPstream "DynamicList<char>&,\n" "label&,\n" "const int,\n" + "const label,\n" "const bool,\n" "streamFormat,\n" "versionNumber\n" @@ -68,11 +71,7 @@ Foam::UIPstream::UIPstream } -Foam::UIPstream::UIPstream -( - const int fromProcNo, - PstreamBuffers& buffers -) +Foam::UIPstream::UIPstream(const int fromProcNo, PstreamBuffers& buffers) : UPstream(buffers.commsType_), Istream(buffers.format_, buffers.version_), @@ -80,6 +79,7 @@ Foam::UIPstream::UIPstream externalBuf_(buffers.recvBuf_[fromProcNo]), externalBufPosition_(buffers.recvBufPos_[fromProcNo]), tag_(buffers.tag_), + comm_(buffers.comm_), clearAtEnd_(true), messageSize_(0) { @@ -102,7 +102,8 @@ Foam::label Foam::UIPstream::read const int fromProcNo, char* buf, const std::streamsize bufSize, - const int tag + const int tag, + const label communicator ) { notImplemented @@ -113,7 +114,8 @@ Foam::label Foam::UIPstream::read "const int fromProcNo," "char* buf," "const label bufSize," - "const int tag" + "const int tag," + "const label communicator" ")" ); diff --git a/src/Pstream/dummy/UOPwrite.C b/src/Pstream/dummy/UOPwrite.C index 3766f82e7c1..79dfb9719bd 100644 --- a/src/Pstream/dummy/UOPwrite.C +++ b/src/Pstream/dummy/UOPwrite.C @@ -36,7 +36,8 @@ bool Foam::UOPstream::write const int toProcNo, const char* buf, const std::streamsize bufSize, - const int tag + const int tag, + const label communicator ) { notImplemented @@ -47,7 +48,8 @@ bool Foam::UOPstream::write "const int fromProcNo," "char* buf," "const label bufSize," - "const int tag" + "const int tag," + "const label communicator" ")" ); diff --git a/src/Pstream/dummy/UPstream.C b/src/Pstream/dummy/UPstream.C index 183a86cb9e4..1e4cd4899a6 100644 --- a/src/Pstream/dummy/UPstream.C +++ b/src/Pstream/dummy/UPstream.C @@ -55,28 +55,41 @@ void Foam::UPstream::abort() } -void Foam::reduce(scalar&, const sumOp<scalar>&, const int) +void Foam::reduce(scalar&, const sumOp<scalar>&, const int, const label) {} -void Foam::reduce(scalar&, const minOp<scalar>&, const int) +void Foam::reduce(scalar&, const minOp<scalar>&, const int, const label) {} -void Foam::reduce(vector2D&, const sumOp<vector2D>&, const int) +void Foam::reduce(vector2D&, const sumOp<vector2D>&, const int, const label) {} void Foam::sumReduce ( - scalar& Value, - label& Count, - const int tag + scalar&, + label&, + const int, + const label ) {} -void Foam::reduce(scalar&, const sumOp<scalar>&, const int, label&) +void Foam::reduce(scalar&, const sumOp<scalar>&, const int, const label, label&) +{} + + +void Foam::UPstream::allocatePstreamCommunicator +( + const label, + const label +) +{} + + +void Foam::UPstream::freePstreamCommunicator(const label) {} diff --git a/src/Pstream/mpi/PstreamGlobals.C b/src/Pstream/mpi/PstreamGlobals.C index 40eb6adacff..c57e7aef24d 100644 --- a/src/Pstream/mpi/PstreamGlobals.C +++ b/src/Pstream/mpi/PstreamGlobals.C @@ -2,7 +2,7 @@ ========= | \\ / F ield | OpenFOAM: The Open Source CFD Toolbox \\ / O peration | - \\ / A nd | Copyright (C) 2011 OpenFOAM Foundation + \\ / A nd | Copyright (C) 2013 OpenFOAM Foundation \\/ M anipulation | ------------------------------------------------------------------------------- License @@ -37,6 +37,35 @@ namespace Foam DynamicList<MPI_Request> PstreamGlobals::outstandingRequests_; //! \endcond +// Allocated communicators. +//! \cond fileScope +DynamicList<MPI_Comm> PstreamGlobals::MPICommunicators_; +DynamicList<MPI_Group> PstreamGlobals::MPIGroups_; +//! \endcond + +void PstreamGlobals::checkCommunicator +( + const label comm, + const label otherProcNo +) +{ + if + ( + comm < 0 + || comm >= PstreamGlobals::MPICommunicators_.size() + ) + { + FatalErrorIn + ( + "PstreamGlobals::checkCommunicator(const label, const label)" + ) << "otherProcNo:" << otherProcNo << " : illegal communicator " + << comm << endl + << "Communicator should be within range 0.." + << PstreamGlobals::MPICommunicators_.size()-1 << abort(FatalError); + } +} + + // * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * // } // End namespace Foam diff --git a/src/Pstream/mpi/PstreamGlobals.H b/src/Pstream/mpi/PstreamGlobals.H index 9d12ca9da06..9448a948a57 100644 --- a/src/Pstream/mpi/PstreamGlobals.H +++ b/src/Pstream/mpi/PstreamGlobals.H @@ -2,7 +2,7 @@ ========= | \\ / F ield | OpenFOAM: The Open Source CFD Toolbox \\ / O peration | - \\ / A nd | Copyright (C) 2011 OpenFOAM Foundation + \\ / A nd | Copyright (C) 2013 OpenFOAM Foundation \\/ M anipulation | ------------------------------------------------------------------------------- License @@ -26,7 +26,7 @@ Namespace Description Global functions and variables for working with parallel streams, - but principally for gamma/mpi + but principally for mpi SourceFiles PstreamGlobals.C @@ -54,6 +54,13 @@ namespace PstreamGlobals extern DynamicList<MPI_Request> outstandingRequests_; + +// Current communicators. First element will be MPI_COMM_WORLD +extern DynamicList<MPI_Comm> MPICommunicators_; +extern DynamicList<MPI_Group> MPIGroups_; + +void checkCommunicator(const label, const label procNo); + }; diff --git a/src/Pstream/mpi/UIPread.C b/src/Pstream/mpi/UIPread.C index 19d6b161122..c1cae71e164 100644 --- a/src/Pstream/mpi/UIPread.C +++ b/src/Pstream/mpi/UIPread.C @@ -41,6 +41,7 @@ Foam::UIPstream::UIPstream DynamicList<char>& externalBuf, label& externalBufPosition, const int tag, + const label comm, const bool clearAtEnd, streamFormat format, versionNumber version @@ -52,6 +53,7 @@ Foam::UIPstream::UIPstream externalBuf_(externalBuf), externalBufPosition_(externalBufPosition), tag_(tag), + comm_(comm), clearAtEnd_(clearAtEnd), messageSize_(0) { @@ -80,7 +82,7 @@ Foam::UIPstream::UIPstream // and set it if (!wantedSize) { - MPI_Probe(procID(fromProcNo_), tag_, MPI_COMM_WORLD, &status); + MPI_Probe(fromProcNo_, tag_, MPI_COMM_WORLD, &status); MPI_Get_count(&status, MPI_BYTE, &messageSize_); externalBuf_.setCapacity(messageSize_); @@ -99,7 +101,8 @@ Foam::UIPstream::UIPstream fromProcNo_, externalBuf_.begin(), wantedSize, - tag_ + tag_, + comm_ ); // Set addressed size. Leave actual allocated memory intact. @@ -121,6 +124,7 @@ Foam::UIPstream::UIPstream(const int fromProcNo, PstreamBuffers& buffers) externalBuf_(buffers.recvBuf_[fromProcNo]), externalBufPosition_(buffers.recvBufPos_[fromProcNo]), tag_(buffers.tag_), + comm_(buffers.comm_), clearAtEnd_(true), messageSize_(0) { @@ -167,7 +171,7 @@ Foam::UIPstream::UIPstream(const int fromProcNo, PstreamBuffers& buffers) // and set it if (!wantedSize) { - MPI_Probe(procID(fromProcNo_), tag_, MPI_COMM_WORLD, &status); + MPI_Probe(fromProcNo_, tag_, MPI_COMM_WORLD, &status); MPI_Get_count(&status, MPI_BYTE, &messageSize_); externalBuf_.setCapacity(messageSize_); @@ -186,7 +190,8 @@ Foam::UIPstream::UIPstream(const int fromProcNo, PstreamBuffers& buffers) fromProcNo_, externalBuf_.begin(), wantedSize, - tag_ + tag_, + comm_ ); // Set addressed size. Leave actual allocated memory intact. @@ -208,13 +213,15 @@ Foam::label Foam::UIPstream::read const int fromProcNo, char* buf, const std::streamsize bufSize, - const int tag + const int tag, + const label communicator ) { if (debug) { Pout<< "UIPstream::read : starting read from:" << fromProcNo - << " tag:" << tag << " wanted size:" << label(bufSize) + << " tag:" << tag << " comm:" << communicator + << " wanted size:" << label(bufSize) << " commsType:" << UPstream::commsTypeNames[commsType] << Foam::endl; } @@ -230,9 +237,9 @@ Foam::label Foam::UIPstream::read buf, bufSize, MPI_PACKED, - procID(fromProcNo), + fromProcNo, tag, - MPI_COMM_WORLD, + PstreamGlobals::MPICommunicators_[communicator], &status ) ) @@ -286,9 +293,9 @@ Foam::label Foam::UIPstream::read buf, bufSize, MPI_PACKED, - procID(fromProcNo), + fromProcNo, tag, - MPI_COMM_WORLD, + PstreamGlobals::MPICommunicators_[communicator], &request ) ) diff --git a/src/Pstream/mpi/UOPwrite.C b/src/Pstream/mpi/UOPwrite.C index 2a1b3f71950..24f425e38b6 100644 --- a/src/Pstream/mpi/UOPwrite.C +++ b/src/Pstream/mpi/UOPwrite.C @@ -39,17 +39,22 @@ bool Foam::UOPstream::write const int toProcNo, const char* buf, const std::streamsize bufSize, - const int tag + const int tag, + const label communicator ) { if (debug) { Pout<< "UOPstream::write : starting write to:" << toProcNo - << " tag:" << tag << " size:" << label(bufSize) + << " tag:" << tag + << " comm:" << communicator << " size:" << label(bufSize) << " commsType:" << UPstream::commsTypeNames[commsType] << Foam::endl; } + PstreamGlobals::checkCommunicator(communicator, toProcNo); + + bool transferFailed = true; if (commsType == blocking) @@ -59,9 +64,9 @@ bool Foam::UOPstream::write const_cast<char*>(buf), bufSize, MPI_PACKED, - procID(toProcNo), + toProcNo, //procID(toProcNo), tag, - MPI_COMM_WORLD + PstreamGlobals::MPICommunicators_[communicator] //MPI_COMM_WORLD ); if (debug) @@ -79,9 +84,9 @@ bool Foam::UOPstream::write const_cast<char*>(buf), bufSize, MPI_PACKED, - procID(toProcNo), + toProcNo, //procID(toProcNo), tag, - MPI_COMM_WORLD + PstreamGlobals::MPICommunicators_[communicator] //MPI_COMM_WORLD ); if (debug) @@ -101,9 +106,9 @@ bool Foam::UOPstream::write const_cast<char*>(buf), bufSize, MPI_PACKED, - procID(toProcNo), + toProcNo, //procID(toProcNo), tag, - MPI_COMM_WORLD, + PstreamGlobals::MPICommunicators_[communicator],//MPI_COMM_WORLD, &request ); diff --git a/src/Pstream/mpi/UPstream.C b/src/Pstream/mpi/UPstream.C index bf7af5dd12f..efe84884838 100644 --- a/src/Pstream/mpi/UPstream.C +++ b/src/Pstream/mpi/UPstream.C @@ -2,7 +2,7 @@ ========= | \\ / F ield | OpenFOAM: The Open Source CFD Toolbox \\ / O peration | - \\ / A nd | Copyright (C) 2011-2012 OpenFOAM Foundation + \\ / A nd | Copyright (C) 2011-2013 OpenFOAM Foundation \\/ M anipulation | ------------------------------------------------------------------------------- License @@ -55,7 +55,6 @@ void Foam::UPstream::addValidParOptions(HashTable<string>& validParOptions) validParOptions.insert("p4wd", "directory"); validParOptions.insert("p4amslave", ""); validParOptions.insert("p4yourname", "hostname"); - validParOptions.insert("GAMMANP", "number of instances"); validParOptions.insert("machinefile", "machine file"); } @@ -66,12 +65,13 @@ bool Foam::UPstream::init(int& argc, char**& argv) int numprocs; MPI_Comm_size(MPI_COMM_WORLD, &numprocs); - MPI_Comm_rank(MPI_COMM_WORLD, &myProcNo_); + int myRank; + MPI_Comm_rank(MPI_COMM_WORLD, &myRank); if (debug) { Pout<< "UPstream::init : initialised with numProcs:" << numprocs - << " myProcNo:" << myProcNo_ << endl; + << " myRank:" << myRank << endl; } if (numprocs <= 1) @@ -82,14 +82,9 @@ bool Foam::UPstream::init(int& argc, char**& argv) << Foam::abort(FatalError); } - procIDs_.setSize(numprocs); - forAll(procIDs_, procNo) - { - procIDs_[procNo] = procNo; - } - - setParRun(); + // Initialise parallel structure + setParRun(numprocs); # ifndef SGIMPI string bufferSizeName = getEnv("MPI_BUFFER_SIZE"); @@ -116,11 +111,9 @@ bool Foam::UPstream::init(int& argc, char**& argv) char processorName[MPI_MAX_PROCESSOR_NAME]; MPI_Get_processor_name(processorName, &processorNameLen); + processorName[processorNameLen] = '\0'; - //signal(SIGABRT, stop); - - // Now that nprocs is known construct communication tables. - initCommunicationSchedule(); + Pout<< "Processor name:" << processorName << endl; return true; } @@ -153,6 +146,15 @@ void Foam::UPstream::exit(int errnum) << endl; } + // Clean mpi communicators + forAll(myProcNo_, communicator) + { + if (myProcNo_[communicator] != -1) + { + freePstreamCommunicator(communicator); + } + } + if (errnum == 0) { MPI_Finalize(); @@ -171,21 +173,39 @@ void Foam::UPstream::abort() } -void Foam::reduce(scalar& Value, const sumOp<scalar>& bop, const int tag) +void Foam::reduce +( + scalar& Value, + const sumOp<scalar>& bop, + const int tag, + const label communicator +) { - allReduce(Value, 1, MPI_SCALAR, MPI_SUM, bop, tag); + allReduce(Value, 1, MPI_SCALAR, MPI_SUM, bop, tag, communicator); } -void Foam::reduce(scalar& Value, const minOp<scalar>& bop, const int tag) +void Foam::reduce +( + scalar& Value, + const minOp<scalar>& bop, + const int tag, + const label communicator +) { - allReduce(Value, 1, MPI_SCALAR, MPI_MIN, bop, tag); + allReduce(Value, 1, MPI_SCALAR, MPI_MIN, bop, tag, communicator); } -void Foam::reduce(vector2D& Value, const sumOp<vector2D>& bop, const int tag) +void Foam::reduce +( + vector2D& Value, + const sumOp<vector2D>& bop, + const int tag, + const label communicator +) { - allReduce(Value, 2, MPI_SCALAR, MPI_SUM, bop, tag); + allReduce(Value, 2, MPI_SCALAR, MPI_SUM, bop, tag, communicator); } @@ -193,11 +213,12 @@ void Foam::sumReduce ( scalar& Value, label& Count, - const int tag + const int tag, + const label communicator ) { vector2D twoScalars(Value, scalar(Count)); - reduce(twoScalars, sumOp<vector2D>()); + reduce(twoScalars, sumOp<vector2D>(), tag, communicator); Value = twoScalars.x(); Count = twoScalars.y(); @@ -209,6 +230,7 @@ void Foam::reduce scalar& Value, const sumOp<scalar>& bop, const int tag, + const label communicator, label& requestID ) { @@ -225,7 +247,7 @@ void Foam::reduce MPI_SCALAR, MPI_SUM, 0, //root - MPI_COMM_WORLD, + PstreamGlobals::MPICommunicators_[communicator], &request ); @@ -233,12 +255,141 @@ void Foam::reduce PstreamGlobals::outstandingRequests_.append(request); #else // Non-blocking not yet implemented in mpi - reduce(Value, bop, tag); + reduce(Value, bop, tag, communicator); requestID = -1; #endif } +void Foam::UPstream::allocatePstreamCommunicator +( + const label parentIndex, + const label index +) +{ + if (index == PstreamGlobals::MPIGroups_.size()) + { + // Extend storage with dummy values + MPI_Group newGroup; + PstreamGlobals::MPIGroups_.append(newGroup); + MPI_Comm newComm; + PstreamGlobals::MPICommunicators_.append(newComm); + } + else if (index > PstreamGlobals::MPIGroups_.size()) + { + FatalErrorIn + ( + "UPstream::allocatePstreamCommunicator\n" + "(\n" + " const label parentIndex,\n" + " const labelList& subRanks\n" + ")\n" + ) << "PstreamGlobals out of sync with UPstream data. Problem." + << Foam::exit(FatalError); + } + + + if (parentIndex == -1) + { + // Allocate world communicator + + //std::cout + // << "MPI : Allocating world communicator at index " << index + // << std::endl; + + if (index != UPstream::worldComm) + { + FatalErrorIn + ( + "UPstream::allocateCommunicator\n" + "(\n" + " const label parentIndex,\n" + " const labelList& subRanks\n" + ")\n" + ) << "world communicator should always be index " + << UPstream::worldComm << Foam::exit(FatalError); + } + + PstreamGlobals::MPICommunicators_[index] = MPI_COMM_WORLD; + MPI_Comm_group(MPI_COMM_WORLD, &PstreamGlobals::MPIGroups_[index]); + MPI_Comm_rank + ( + PstreamGlobals::MPICommunicators_[index], + &myProcNo_[index] + ); + + // Set the number of processes to the actual number + int numProcs; + MPI_Comm_size(PstreamGlobals::MPICommunicators_[index], &numProcs); + procIDs_[index] = identity(numProcs); + } + else + { + //std::cout + // << "MPI : Allocating new communicator at index " << index + // << " from parent " << parentIndex + // << std::endl; + + // Create new group + MPI_Group_incl + ( + PstreamGlobals::MPIGroups_[parentIndex], + procIDs_[index].size(), + procIDs_[index].begin(), + &PstreamGlobals::MPIGroups_[index] + ); + + //std::cout + // << "MPI : New group " << long(PstreamGlobals::MPIGroups_[index]) + // << std::endl; + + + // Create new communicator + MPI_Comm_create + ( + PstreamGlobals::MPICommunicators_[parentIndex], + PstreamGlobals::MPIGroups_[index], + &PstreamGlobals::MPICommunicators_[index] + ); + + if (PstreamGlobals::MPICommunicators_[index] == MPI_COMM_NULL) + { + //std::cout + // << "MPI : NULL : not in group" + // << std::endl; + myProcNo_[index] = -1; + } + else + { + //std::cout + // << "MPI : New comm " + // << long(PstreamGlobals::MPICommunicators_[index]) + // << std::endl; + MPI_Comm_rank + ( + PstreamGlobals::MPICommunicators_[index], + &myProcNo_[index] + ); + } + } + + //std::cout<< "MPI : I am rank " << myProcNo_[index] << std::endl; +} + + +void Foam::UPstream::freePstreamCommunicator(const label communicator) +{ + if (communicator != UPstream::worldComm) + { + if (PstreamGlobals::MPICommunicators_[communicator] != MPI_COMM_NULL) + { + MPI_Comm_free(&PstreamGlobals::MPICommunicators_[communicator]); + } + MPI_Group_free(&PstreamGlobals::MPIGroups_[communicator]); + } +} + + Foam::label Foam::UPstream::nRequests() { return PstreamGlobals::outstandingRequests_.size(); diff --git a/src/Pstream/mpi/allReduce.H b/src/Pstream/mpi/allReduce.H index bb4c6931136..53bd102ae12 100644 --- a/src/Pstream/mpi/allReduce.H +++ b/src/Pstream/mpi/allReduce.H @@ -52,7 +52,8 @@ void allReduce MPI_Datatype MPIType, MPI_Op op, const BinaryOp& bop, - const int tag + const int tag, + const label communicator ); // * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * // diff --git a/src/Pstream/mpi/allReduceTemplates.C b/src/Pstream/mpi/allReduceTemplates.C index 98b8d97b0aa..9271e8b90be 100644 --- a/src/Pstream/mpi/allReduceTemplates.C +++ b/src/Pstream/mpi/allReduceTemplates.C @@ -35,7 +35,8 @@ void Foam::allReduce MPI_Datatype MPIType, MPI_Op MPIOp, const BinaryOp& bop, - const int tag + const int tag, + const label communicator ) { if (!UPstream::parRun()) @@ -43,14 +44,14 @@ void Foam::allReduce return; } - if (UPstream::nProcs() <= UPstream::nProcsSimpleSum) + if (UPstream::nProcs(communicator) <= UPstream::nProcsSimpleSum) { - if (UPstream::master()) + if (UPstream::master(communicator)) { for ( int slave=UPstream::firstSlave(); - slave<=UPstream::lastSlave(); + slave<=UPstream::lastSlave(communicator); slave++ ) { @@ -63,9 +64,9 @@ void Foam::allReduce &value, MPICount, MPIType, - UPstream::procID(slave), + slave, //UPstream::procID(slave), tag, - MPI_COMM_WORLD, + PstreamGlobals::MPICommunicators_[communicator], MPI_STATUS_IGNORE ) ) @@ -97,9 +98,9 @@ void Foam::allReduce &Value, MPICount, MPIType, - UPstream::procID(UPstream::masterNo()), + UPstream::masterNo(),//UPstream::procID(masterNo()), tag, - MPI_COMM_WORLD + PstreamGlobals::MPICommunicators_[communicator] ) ) { @@ -120,12 +121,12 @@ void Foam::allReduce } - if (UPstream::master()) + if (UPstream::master(communicator)) { for ( int slave=UPstream::firstSlave(); - slave<=UPstream::lastSlave(); + slave<=UPstream::lastSlave(communicator); slave++ ) { @@ -136,9 +137,9 @@ void Foam::allReduce &Value, MPICount, MPIType, - UPstream::procID(slave), + slave, //UPstream::procID(slave), tag, - MPI_COMM_WORLD + PstreamGlobals::MPICommunicators_[communicator] ) ) { @@ -167,9 +168,9 @@ void Foam::allReduce &Value, MPICount, MPIType, - UPstream::procID(UPstream::masterNo()), + UPstream::masterNo(),//UPstream::procID(masterNo()), tag, - MPI_COMM_WORLD, + PstreamGlobals::MPICommunicators_[communicator], MPI_STATUS_IGNORE ) ) @@ -193,7 +194,15 @@ void Foam::allReduce else { Type sum; - MPI_Allreduce(&Value, &sum, MPICount, MPIType, MPIOp, MPI_COMM_WORLD); + MPI_Allreduce + ( + &Value, + &sum, + MPICount, + MPIType, + MPIOp, + PstreamGlobals::MPICommunicators_[communicator] + ); Value = sum; } } diff --git a/src/dynamicMesh/fvMeshDistribute/fvMeshDistribute.C b/src/dynamicMesh/fvMeshDistribute/fvMeshDistribute.C index a21cfb6300a..f3e97ba285c 100644 --- a/src/dynamicMesh/fvMeshDistribute/fvMeshDistribute.C +++ b/src/dynamicMesh/fvMeshDistribute/fvMeshDistribute.C @@ -945,6 +945,7 @@ void Foam::fvMeshDistribute::addProcPatches mesh_.nFaces(), mesh_.boundaryMesh().size(), mesh_.boundaryMesh(), + Pstream::worldComm, Pstream::myProcNo(), nbrProc[bFaceI] ); @@ -988,6 +989,7 @@ void Foam::fvMeshDistribute::addProcPatches mesh_.nFaces(), mesh_.boundaryMesh().size(), mesh_.boundaryMesh(), + Pstream::worldComm, Pstream::myProcNo(), nbrProc[bFaceI], cycName, diff --git a/src/finiteVolume/fields/fvPatchFields/constraint/processor/processorFvPatchField.C b/src/finiteVolume/fields/fvPatchFields/constraint/processor/processorFvPatchField.C index c9fb2563c16..b0c2cd26d3f 100644 --- a/src/finiteVolume/fields/fvPatchFields/constraint/processor/processorFvPatchField.C +++ b/src/finiteVolume/fields/fvPatchFields/constraint/processor/processorFvPatchField.C @@ -244,23 +244,25 @@ void processorFvPatchField<Type>::initEvaluate // Fast path. Receive into *this this->setSize(sendBuf_.size()); outstandingRecvRequest_ = UPstream::nRequests(); - IPstream::read + UIPstream::read ( Pstream::nonBlocking, procPatch_.neighbProcNo(), reinterpret_cast<char*>(this->begin()), this->byteSize(), - procPatch_.tag() + procPatch_.tag(), + procPatch_.comm() ); outstandingSendRequest_ = UPstream::nRequests(); - OPstream::write + UOPstream::write ( Pstream::nonBlocking, procPatch_.neighbProcNo(), reinterpret_cast<const char*>(sendBuf_.begin()), this->byteSize(), - procPatch_.tag() + procPatch_.tag(), + procPatch_.comm() ); } else @@ -342,23 +344,25 @@ void processorFvPatchField<Type>::initInterfaceMatrixUpdate scalarReceiveBuf_.setSize(scalarSendBuf_.size()); outstandingRecvRequest_ = UPstream::nRequests(); - IPstream::read + UIPstream::read ( Pstream::nonBlocking, procPatch_.neighbProcNo(), reinterpret_cast<char*>(scalarReceiveBuf_.begin()), scalarReceiveBuf_.byteSize(), - procPatch_.tag() + procPatch_.tag(), + procPatch_.comm() ); outstandingSendRequest_ = UPstream::nRequests(); - OPstream::write + UOPstream::write ( Pstream::nonBlocking, procPatch_.neighbProcNo(), reinterpret_cast<const char*>(scalarSendBuf_.begin()), scalarSendBuf_.byteSize(), - procPatch_.tag() + procPatch_.tag(), + procPatch_.comm() ); } else @@ -467,7 +471,8 @@ void processorFvPatchField<Type>::initInterfaceMatrixUpdate procPatch_.neighbProcNo(), reinterpret_cast<char*>(receiveBuf_.begin()), receiveBuf_.byteSize(), - procPatch_.tag() + procPatch_.tag(), + procPatch_.comm() ); outstandingSendRequest_ = UPstream::nRequests(); @@ -477,7 +482,8 @@ void processorFvPatchField<Type>::initInterfaceMatrixUpdate procPatch_.neighbProcNo(), reinterpret_cast<const char*>(sendBuf_.begin()), sendBuf_.byteSize(), - procPatch_.tag() + procPatch_.tag(), + procPatch_.comm() ); } else diff --git a/src/finiteVolume/fields/fvPatchFields/constraint/processor/processorFvPatchField.H b/src/finiteVolume/fields/fvPatchFields/constraint/processor/processorFvPatchField.H index a913aaf6067..848b8e51655 100644 --- a/src/finiteVolume/fields/fvPatchFields/constraint/processor/processorFvPatchField.H +++ b/src/finiteVolume/fields/fvPatchFields/constraint/processor/processorFvPatchField.H @@ -244,6 +244,12 @@ public: //- Processor coupled interface functions + //- Return communicator used for comms + virtual int comm() const + { + return procPatch_.comm(); + } + //- Return processor number virtual int myProcNo() const { diff --git a/src/finiteVolume/fields/fvPatchFields/constraint/processor/processorFvPatchScalarField.C b/src/finiteVolume/fields/fvPatchFields/constraint/processor/processorFvPatchScalarField.C index 82dcf7ca36e..2d51603100d 100644 --- a/src/finiteVolume/fields/fvPatchFields/constraint/processor/processorFvPatchScalarField.C +++ b/src/finiteVolume/fields/fvPatchFields/constraint/processor/processorFvPatchScalarField.C @@ -60,23 +60,25 @@ void processorFvPatchField<scalar>::initInterfaceMatrixUpdate scalarReceiveBuf_.setSize(scalarSendBuf_.size()); outstandingRecvRequest_ = UPstream::nRequests(); - IPstream::read + UIPstream::read ( Pstream::nonBlocking, procPatch_.neighbProcNo(), reinterpret_cast<char*>(scalarReceiveBuf_.begin()), scalarReceiveBuf_.byteSize(), - procPatch_.tag() + procPatch_.tag(), + procPatch_.comm() ); outstandingSendRequest_ = UPstream::nRequests(); - OPstream::write + UOPstream::write ( Pstream::nonBlocking, procPatch_.neighbProcNo(), reinterpret_cast<const char*>(scalarSendBuf_.begin()), scalarSendBuf_.byteSize(), - procPatch_.tag() + procPatch_.tag(), + procPatch_.comm() ); } else diff --git a/src/finiteVolume/fvMesh/fvPatches/constraint/processor/processorFvPatch.H b/src/finiteVolume/fvMesh/fvPatches/constraint/processor/processorFvPatch.H index d0f3b04c7cc..5df5a25f77b 100644 --- a/src/finiteVolume/fvMesh/fvPatches/constraint/processor/processorFvPatch.H +++ b/src/finiteVolume/fvMesh/fvPatches/constraint/processor/processorFvPatch.H @@ -84,14 +84,20 @@ public: // Member functions + //- Return communicator used for comms + virtual int comm() const + { + return procPolyPatch_.comm(); + } + //- Return processor number - int myProcNo() const + virtual int myProcNo() const { return procPolyPatch_.myProcNo(); } //- Return neigbour processor number - int neighbProcNo() const + virtual int neighbProcNo() const { return procPolyPatch_.neighbProcNo(); } -- GitLab