diff --git a/applications/test/gatherValues1/Make/files b/applications/test/gatherValues1/Make/files index 31bbb718f07c5c733722d87a4605e1564e6da456..b5fc79cbe0972f19814ba5d7fbcf23d3a89a6599 100644 --- a/applications/test/gatherValues1/Make/files +++ b/applications/test/gatherValues1/Make/files @@ -1,3 +1,3 @@ -Test-gatherValues1.C +Test-gatherValues1.cxx EXE = $(FOAM_USER_APPBIN)/Test-gatherValues1 diff --git a/applications/test/gatherValues1/Test-gatherValues1.C b/applications/test/gatherValues1/Test-gatherValues1.cxx similarity index 81% rename from applications/test/gatherValues1/Test-gatherValues1.C rename to applications/test/gatherValues1/Test-gatherValues1.cxx index 69ce09ce1227a625a11eed11a936da14d96029aa..02bb8597edda92ea01fae581fd7a2a20fa6042ea 100644 --- a/applications/test/gatherValues1/Test-gatherValues1.C +++ b/applications/test/gatherValues1/Test-gatherValues1.cxx @@ -5,7 +5,7 @@ \\ / A nd | www.openfoam.com \\/ M anipulation | ------------------------------------------------------------------------------- - Copyright (C) 2021-2023 OpenCFD Ltd. + Copyright (C) 2021-2024 OpenCFD Ltd. ------------------------------------------------------------------------------- License This file is part of OpenFOAM. @@ -52,7 +52,7 @@ int main(int argc, char *argv[]) const labelList localValues ( - identity(2 *(Pstream::myProcNo()+1), -5*Pstream::myProcNo()) + identity(2 *(UPstream::myProcNo()+1), -5*UPstream::myProcNo()) ); // Test resize @@ -76,8 +76,8 @@ int main(int argc, char *argv[]) // One-sided sizing! master only const globalIndex allProcAddr ( - sendData.size(), - globalIndex::gatherOnly{} + globalIndex::gatherOnly{}, + sendData.size() ); Pout<< "listGather sizes: " << flatOutput(allProcAddr.sizes()) << nl; @@ -98,8 +98,8 @@ int main(int argc, char *argv[]) // One-sided sizing! master only const globalIndex allProcAddr ( - sendData.size(), - globalIndex::gatherOnly{} + globalIndex::gatherOnly{}, + sendData.size() ); Pout<< "listGather sizes: " << flatOutput(allProcAddr.sizes()) << nl; @@ -116,7 +116,7 @@ int main(int argc, char *argv[]) { const labelList::subList& sendData = ( - Pstream::master() + UPstream::master() ? SubList<label>(localValues, 0) // exclude : SubList<label>(localValues) ); @@ -147,11 +147,11 @@ int main(int argc, char *argv[]) << UPstream::listScatterValues(subProcAddr.offsets()) << nl; - Pout<< endl << "local list [" << Pstream::myProcNo() << "] " + Pout<< endl << "local list [" << UPstream::myProcNo() << "] " << flatOutput(localValues) << nl; - Pout<< endl << "local send [" << Pstream::myProcNo() << "] " + Pout<< endl << "local send [" << UPstream::myProcNo() << "] " << sendSize << nl; @@ -163,7 +163,7 @@ int main(int argc, char *argv[]) Pout<< "off-proc: " << allValues << endl; - if (Pstream::master()) + if (UPstream::master()) { Info<< "master: " << flatOutput(localValues) << nl; @@ -196,7 +196,7 @@ int main(int argc, char *argv[]) { globalIndex glob ( - globalIndex:gatherNone{}, + globalIndex::gatherNone{}, labelList(Foam::one{}, 0) ); Info<< "single:" << nl; @@ -208,35 +208,37 @@ int main(int argc, char *argv[]) } } - // This will likely fail - not declared as is_contiguous - // Cannot even catch since it triggers an abort() - - #if 0 + // Non-contiguous gather - use Pstream, not UPstream! { - std::pair<label,vector> sendData(Pstream::myProcNo(), vector::one); + typedef std::pair<label,vector> valueType; - const bool oldThrowingError = FatalError.throwing(true); + valueType sendData(UPstream::myProcNo(), vector::one); - try - { - List<std::pair<label,vector>> countValues - ( - UPstream::listGatherValues<std::pair<label, vector>> - ( - sendData - ) - ); + List<valueType> countValues + ( + Pstream::listGatherValues(sendData) + ); - Pout<< "listGather: " << flatOutput(countValues) << nl; - } - catch (const Foam::error& err) + Pout<< "listGather: " << flatOutput(countValues) << nl; + } + + // Non-contiguous scatter - use Pstream, not UPstream! + { + List<fileName> allValues; + + if (UPstream::master()) { - Info<< err.message().c_str() << nl; + allValues.resize(UPstream::nProcs()); + forAll(allValues, proci) + { + allValues[proci] = "processor" + Foam::name(proci); + } } - FatalError.throwing(oldThrowingError); + fileName procName = Pstream::listScatterValues(allValues); + + Pout<< "listScatter: " << procName << nl; } - #endif Info<< "\nEnd\n" << endl; diff --git a/src/OpenFOAM/db/IOstreams/Pstreams/Pstream.H b/src/OpenFOAM/db/IOstreams/Pstreams/Pstream.H index dbcfe69c56d8614ebd17fe06cea35e614e977050..fe251cbf91ec1659ec83ee75dca18cf93a0fc3a6 100644 --- a/src/OpenFOAM/db/IOstreams/Pstreams/Pstream.H +++ b/src/OpenFOAM/db/IOstreams/Pstreams/Pstream.H @@ -47,9 +47,6 @@ SourceFiles #include "UPstream.H" #include "DynamicList.H" -// Legacy -// #define Foam_Pstream_scatter_nobroadcast - // * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * // namespace Foam @@ -130,22 +127,9 @@ public: // Gather - //- Gather (reduce) data, appyling \c bop to combine \c value - //- from different processors. The basis for Foam::reduce(). - // Uses the specified communication schedule. - template<class T, class BinaryOp> - static void gather - ( - const List<commsStruct>& comms, - T& value, - const BinaryOp& bop, - const int tag, - const label comm - ); - //- Gather (reduce) data, applying \c bop to combine \c value //- from different processors. The basis for Foam::reduce(). - // Uses linear/tree communication. + // Uses linear/tree communication (with parallel guard). template<class T, class BinaryOp> static void gather ( @@ -155,6 +139,34 @@ public: const label comm = UPstream::worldComm ); + //- Gather individual values into list locations. + // On master list length == nProcs, otherwise zero length. + // \n + // For \b non-parallel : + // the returned list length is 1 with localValue. + template<class T> + static List<T> listGatherValues + ( + const T& localValue, + const label comm = UPstream::worldComm, + //! Only used for non-contiguous types + const int tag = UPstream::msgType() + ); + + //- Scatter individual values from list locations. + // On master input list length == nProcs, ignored on other procs. + // \n + // For \b non-parallel : + // returns the first list element (or default initialized). + template<class T> + static T listScatterValues + ( + const UList<T>& allValues, + const label comm = UPstream::worldComm, + //! Only used for non-contiguous types + const int tag = UPstream::msgType() + ); + // Gather/combine data // Inplace combine values from processors. @@ -162,20 +174,7 @@ public: //- Gather data, applying \c cop to inplace combine \c value //- from different processors. - // Uses the specified communication schedule. - template<class T, class CombineOp> - static void combineGather - ( - const List<commsStruct>& comms, - T& value, - const CombineOp& cop, - const int tag, - const label comm - ); - - //- Gather data, applying \c cop to inplace combine \c value - //- from different processors. - // Uses linear/tree communication. + // Uses linear/tree communication (with parallel guard). template<class T, class CombineOp> static void combineGather ( @@ -185,22 +184,6 @@ public: const label comm = UPstream::worldComm ); - //- Reduce inplace (cf. MPI Allreduce) - //- applying \c cop to inplace combine \c value - //- from different processors. - //- After completion all processors have the same data. - // Uses the specified communication schedule. - // Wraps combineGather/broadcast (may change in the future). - template<class T, class CombineOp> - static void combineReduce - ( - const List<commsStruct>& comms, - T& value, - const CombineOp& cop, - const int tag = UPstream::msgType(), - const label comm = UPstream::worldComm - ); - //- Reduce inplace (cf. MPI Allreduce) //- applying \c cop to inplace combine \c value //- from different processors. @@ -232,17 +215,8 @@ public: // Combine variants working on whole List at a time. - template<class T, class CombineOp> - static void listCombineGather - ( - const List<commsStruct>& comms, - List<T>& values, - const CombineOp& cop, - const int tag, - const label comm - ); - - //- Like above but switches between linear/tree communication + //- Combines List elements. + // Uses linear/tree communication (with parallel guard). template<class T, class CombineOp> static void listCombineGather ( @@ -252,7 +226,9 @@ public: const label comm = UPstream::worldComm ); + //- Combines List elements. //- After completion all processors have the same data. + // Uses linear/tree communication (with parallel guard). template<class T, class CombineOp> static void listCombineReduce ( @@ -279,17 +255,8 @@ public: // Combine variants working on whole map at a time. // Container needs iterators, find() and insert methods defined. - template<class Container, class CombineOp> - static void mapCombineGather - ( - const List<commsStruct>& comms, - Container& values, - const CombineOp& cop, - const int tag, - const label comm - ); - - //- Like above but switches between linear/tree communication + //- Combine Map elements. + // Uses linear/tree communication (with parallel guard). template<class Container, class CombineOp> static void mapCombineGather ( @@ -355,7 +322,7 @@ public: ); //- Gather data, but keep individual values separate. - //- Uses linear/tree communication. + //- Uses MPI_Allgather or manual linear/tree communication. // After completion all processors have the same data. // Wraps gatherList/scatterList (may change in the future). template<class T> @@ -369,91 +336,43 @@ public: // Scatter - //- Broadcast data: Distribute without modification. - // \note comms and tag parameters only used when - // Foam_Pstream_scatter_nobroadcast is defined + //- Broadcast data template<class T> + FOAM_DEPRECATED_FOR(2024-01, "broadcast()") static void scatter ( - const List<commsStruct>& comms, T& value, - const int tag, - const label comm - ); - - //- Broadcast data: Distribute without modification. - // \note tag parameter only used when - // Foam_Pstream_scatter_nobroadcast is defined - template<class T> - static void scatter - ( - T& value, - const int tag = UPstream::msgType(), + const int tag = UPstream::msgType(), //!< ignored const label comm = UPstream::worldComm ); - //- Broadcast data: Distribute without modification. - // \note tag parameter only used when - // Foam_Pstream_scatter_nobroadcast is defined + //- Broadcast data template<class T> + FOAM_DEPRECATED_FOR(2024-01, "broadcast()") static void combineScatter ( - const List<commsStruct>& comms, T& value, - const int tag, - const label comm - ); - - //- Broadcast data: Distribute without modification. - // \note tag parameter only used when - // Foam_Pstream_scatter_nobroadcast is defined - template<class T> - static void combineScatter - ( - T& value, - const int tag = UPstream::msgType(), + const int tag = UPstream::msgType(), //!< ignored const label comm = UPstream::worldComm ); - //- Broadcast data: Distribute without modification. - // \note comms and tag parameters only used when - // Foam_Pstream_scatter_nobroadcast is defined + //- Broadcast data template<class T> + FOAM_DEPRECATED_FOR(2024-01, "broadcast()") static void listCombineScatter ( - const List<commsStruct>& comms, List<T>& value, - const int tag, - const label comm - ); - - //- Broadcast data: Distribute without modification. - // \note comms and tag parameters only used when - // Foam_Pstream_scatter_nobroadcast is defined - template<class T> - static void listCombineScatter - ( - List<T>& value, - const int tag = UPstream::msgType(), + const int tag = UPstream::msgType(), //!< ignored const label comm = UPstream::worldComm ); - //- Broadcast data: Distribute without modification. - template<class Container> - static void mapCombineScatter - ( - const List<commsStruct>& comms, - Container& values, - const int tag, - const label comm - ); - - //- Like above but switches between linear/tree communication + //- Broadcast data template<class Container> + FOAM_DEPRECATED_FOR(2024-01, "broadcast()") static void mapCombineScatter ( Container& values, - const int tag = UPstream::msgType(), + const int tag = UPstream::msgType(), //!< ignored const label comm = UPstream::worldComm ); diff --git a/src/OpenFOAM/db/IOstreams/Pstreams/PstreamBroadcast.C b/src/OpenFOAM/db/IOstreams/Pstreams/PstreamBroadcast.C index 1f39cca6931c0068645d519a85ee96ac8447ff94..e9e2ce98bc748f7a0702ef5cc24010e003fe3cb0 100644 --- a/src/OpenFOAM/db/IOstreams/Pstreams/PstreamBroadcast.C +++ b/src/OpenFOAM/db/IOstreams/Pstreams/PstreamBroadcast.C @@ -5,7 +5,7 @@ \\ / A nd | www.openfoam.com \\/ M anipulation | ------------------------------------------------------------------------------- - Copyright (C) 2022-2023 OpenCFD Ltd. + Copyright (C) 2022-2024 OpenCFD Ltd. ------------------------------------------------------------------------------- License This file is part of OpenFOAM. @@ -137,4 +137,28 @@ void Foam::Pstream::broadcastList(ListType& list, const label comm) } +// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * // + +// Convenience wrappers - defined after all specialisations are known + +namespace Foam +{ + +// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * // + +//- Return a broadcasted value (uses a copy internally) +template<class Type> +Type returnBroadcast(const Type& value, const label comm) +{ + Type work(value); + Pstream::broadcast(work, comm); + return work; +} + + +// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * // + +} // End namespace Foam + + // ************************************************************************* // diff --git a/src/OpenFOAM/db/IOstreams/Pstreams/PstreamCombineGather.C b/src/OpenFOAM/db/IOstreams/Pstreams/PstreamCombineGather.C index 41d427635b6b127699a9abe4559f54784d2a1e57..4696d00592850ad091c4ff7eb958e590e00ca899 100644 --- a/src/OpenFOAM/db/IOstreams/Pstreams/PstreamCombineGather.C +++ b/src/OpenFOAM/db/IOstreams/Pstreams/PstreamCombineGather.C @@ -6,7 +6,7 @@ \\/ M anipulation | ------------------------------------------------------------------------------- Copyright (C) 2011-2017 OpenFOAM Foundation - Copyright (C) 2019-2023 OpenCFD Ltd. + Copyright (C) 2019-2024 OpenCFD Ltd. ------------------------------------------------------------------------------- License This file is part of OpenFOAM. @@ -25,7 +25,7 @@ License along with OpenFOAM. If not, see <http://www.gnu.org/licenses/>. Description - Variant of gather, scatter. + Variant of gather. Normal gather uses: - default construct and read (>>) from Istream - binary operator and assignment operator to combine values @@ -46,7 +46,6 @@ Description template<class T, class CombineOp> void Foam::Pstream::combineGather ( - const List<UPstream::commsStruct>& comms, T& value, const CombineOp& cop, const int tag, @@ -55,8 +54,10 @@ void Foam::Pstream::combineGather { if (UPstream::is_parallel(comm)) { - // My communication order - const commsStruct& myComm = comms[UPstream::myProcNo(comm)]; + // Communication order + const auto& comms = UPstream::whichCommunication(comm); + // if (comms.empty()) return; // extra safety? + const auto& myComm = comms[UPstream::myProcNo(comm)]; // Receive from my downstairs neighbours for (const label belowID : myComm.below()) @@ -89,7 +90,7 @@ void Foam::Pstream::combineGather ( UPstream::commsTypes::scheduled, belowID, - 0, + 0, // bufsize tag, comm ); @@ -132,7 +133,7 @@ void Foam::Pstream::combineGather ( UPstream::commsTypes::scheduled, myComm.above(), - 0, + 0, // bufsize tag, comm ); @@ -143,144 +144,6 @@ void Foam::Pstream::combineGather } -template<class T> -void Foam::Pstream::combineScatter -( - const List<UPstream::commsStruct>& comms, - T& value, - const int tag, - const label comm -) -{ - #ifndef Foam_Pstream_scatter_nobroadcast - Pstream::broadcast(value, comm); - #else - if (UPstream::is_parallel(comm)) - { - // My communication order - const UPstream::commsStruct& myComm = comms[UPstream::myProcNo(comm)]; - - // Receive from up - if (myComm.above() != -1) - { - if (is_contiguous<T>::value) - { - UIPstream::read - ( - UPstream::commsTypes::scheduled, - myComm.above(), - reinterpret_cast<char*>(&value), - sizeof(T), - tag, - comm - ); - } - else - { - IPstream fromAbove - ( - UPstream::commsTypes::scheduled, - myComm.above(), - 0, - tag, - comm - ); - value = T(fromAbove); - } - } - - // Send to my downstairs neighbours - forAllReverse(myComm.below(), belowI) - { - const label belowID = myComm.below()[belowI]; - - if (is_contiguous<T>::value) - { - UOPstream::write - ( - UPstream::commsTypes::scheduled, - belowID, - reinterpret_cast<const char*>(&value), - sizeof(T), - tag, - comm - ); - } - else - { - OPstream toBelow - ( - UPstream::commsTypes::scheduled, - belowID, - 0, - tag, - comm - ); - toBelow << value; - } - } - } - #endif -} - - -template<class T, class CombineOp> -void Foam::Pstream::combineGather -( - T& value, - const CombineOp& cop, - const int tag, - const label comm -) -{ - Pstream::combineGather - ( - UPstream::whichCommunication(comm), - value, - cop, - tag, - comm - ); -} - - -template<class T> -void Foam::Pstream::combineScatter -( - T& value, - const int tag, - const label comm -) -{ - #ifndef Foam_Pstream_scatter_nobroadcast - Pstream::broadcast(value, comm); - #else - Pstream::combineScatter - ( - UPstream::whichCommunication(comm), - value, - tag, - comm - ); - #endif -} - - -template<class T, class CombineOp> -void Foam::Pstream::combineReduce -( - const List<UPstream::commsStruct>& comms, - T& value, - const CombineOp& cop, - const int tag, - const label comm -) -{ - Pstream::combineGather(comms, value, cop, tag, comm); - Pstream::broadcast(value, comm); -} - - template<class T, class CombineOp> void Foam::Pstream::combineReduce ( @@ -292,9 +155,7 @@ void Foam::Pstream::combineReduce { if (UPstream::is_parallel(comm)) { - const auto& comms = UPstream::whichCommunication(comm); - - Pstream::combineGather(comms, value, cop, tag, comm); + Pstream::combineGather(value, cop, tag, comm); Pstream::broadcast(value, comm); } } @@ -305,7 +166,6 @@ void Foam::Pstream::combineReduce template<class T, class CombineOp> void Foam::Pstream::listCombineGather ( - const List<UPstream::commsStruct>& comms, List<T>& values, const CombineOp& cop, const int tag, @@ -314,8 +174,10 @@ void Foam::Pstream::listCombineGather { if (UPstream::is_parallel(comm)) { - // My communication order - const commsStruct& myComm = comms[UPstream::myProcNo(comm)]; + // Communication order + const auto& comms = UPstream::whichCommunication(comm); + // if (comms.empty()) return; // extra safety? + const auto& myComm = comms[UPstream::myProcNo(comm)]; // Receive from my downstairs neighbours for (const label belowID : myComm.below()) @@ -351,7 +213,7 @@ void Foam::Pstream::listCombineGather ( UPstream::commsTypes::scheduled, belowID, - 0, + 0, // bufsize tag, comm ); @@ -397,7 +259,7 @@ void Foam::Pstream::listCombineGather ( UPstream::commsTypes::scheduled, myComm.above(), - 0, + 0, // bufsize tag, comm ); @@ -408,129 +270,6 @@ void Foam::Pstream::listCombineGather } -template<class T> -void Foam::Pstream::listCombineScatter -( - const List<UPstream::commsStruct>& comms, - List<T>& values, - const int tag, - const label comm -) -{ - #ifndef Foam_Pstream_scatter_nobroadcast - Pstream::broadcast(values, comm); - #else - if (UPstream::is_parallel(comm)) - { - // My communication order - const UPstream::commsStruct& myComm = comms[UPstream::myProcNo(comm)]; - - // Receive from up - if (myComm.above() != -1) - { - if (is_contiguous<T>::value) - { - UIPstream::read - ( - UPstream::commsTypes::scheduled, - myComm.above(), - values.data_bytes(), - values.size_bytes(), - tag, - comm - ); - } - else - { - IPstream fromAbove - ( - UPstream::commsTypes::scheduled, - myComm.above(), - 0, - tag, - comm - ); - fromAbove >> values; - } - } - - // Send to my downstairs neighbours - forAllReverse(myComm.below(), belowI) - { - const label belowID = myComm.below()[belowI]; - - if (is_contiguous<T>::value) - { - UOPstream::write - ( - UPstream::commsTypes::scheduled, - belowID, - values.cdata_bytes(), - values.size_bytes(), - tag, - comm - ); - } - else - { - OPstream toBelow - ( - UPstream::commsTypes::scheduled, - belowID, - 0, - tag, - comm - ); - toBelow << values; - } - } - } - #endif -} - - -template<class T, class CombineOp> -void Foam::Pstream::listCombineGather -( - List<T>& values, - const CombineOp& cop, - const int tag, - const label comm -) -{ - Pstream::listCombineGather - ( - UPstream::whichCommunication(comm), - values, - cop, - tag, - comm - ); -} - - -template<class T> -void Foam::Pstream::listCombineScatter -( - List<T>& values, - const int tag, - const label comm -) -{ - #ifndef Foam_Pstream_scatter_nobroadcast - Pstream::broadcast(values, comm); - #else - Pstream::listCombineScatter - ( - UPstream::whichCommunication(comm), - values, - tag, - comm - ); - #endif -} - - template<class T, class CombineOp> void Foam::Pstream::listCombineReduce ( @@ -542,9 +281,7 @@ void Foam::Pstream::listCombineReduce { if (UPstream::is_parallel(comm)) { - const auto& comms = UPstream::whichCommunication(comm); - - Pstream::listCombineGather(comms, values, cop, tag, comm); + Pstream::listCombineGather(values, cop, tag, comm); Pstream::broadcast(values, comm); } } @@ -555,7 +292,6 @@ void Foam::Pstream::listCombineReduce template<class Container, class CombineOp> void Foam::Pstream::mapCombineGather ( - const List<UPstream::commsStruct>& comms, Container& values, const CombineOp& cop, const int tag, @@ -564,8 +300,10 @@ void Foam::Pstream::mapCombineGather { if (UPstream::is_parallel(comm)) { - // My communication order - const commsStruct& myComm = comms[UPstream::myProcNo(comm)]; + // Communication order + const auto& comms = UPstream::whichCommunication(comm); + // if (comms.empty()) return; // extra safety? + const auto& myComm = comms[UPstream::myProcNo(comm)]; // Receive from my downstairs neighbours for (const label belowID : myComm.below()) @@ -576,7 +314,7 @@ void Foam::Pstream::mapCombineGather ( UPstream::commsTypes::scheduled, belowID, - 0, + 0, // bufsize tag, comm ); @@ -623,7 +361,7 @@ void Foam::Pstream::mapCombineGather ( UPstream::commsTypes::scheduled, myComm.above(), - 0, + 0, // bufsize tag, comm ); @@ -633,110 +371,6 @@ void Foam::Pstream::mapCombineGather } -template<class Container> -void Foam::Pstream::mapCombineScatter -( - const List<UPstream::commsStruct>& comms, - Container& values, - const int tag, - const label comm -) -{ - #ifndef Foam_Pstream_scatter_nobroadcast - Pstream::broadcast(values, comm); - #else - if (UPstream::is_parallel(comm)) - { - // My communication order - const UPstream::commsStruct& myComm = comms[UPstream::myProcNo(comm)]; - - // Receive from up - if (myComm.above() != -1) - { - IPstream fromAbove - ( - UPstream::commsTypes::scheduled, - myComm.above(), - 0, - tag, - comm - ); - fromAbove >> values; - - if (debug & 2) - { - Pout<< " received from " - << myComm.above() << " data:" << values << endl; - } - } - - // Send to my downstairs neighbours - forAllReverse(myComm.below(), belowI) - { - const label belowID = myComm.below()[belowI]; - - if (debug & 2) - { - Pout<< " sending to " << belowID << " data:" << values << endl; - } - - OPstream toBelow - ( - UPstream::commsTypes::scheduled, - belowID, - 0, - tag, - comm - ); - toBelow << values; - } - } - #endif -} - - -template<class Container, class CombineOp> -void Foam::Pstream::mapCombineGather -( - Container& values, - const CombineOp& cop, - const int tag, - const label comm -) -{ - Pstream::mapCombineGather - ( - UPstream::whichCommunication(comm), - values, - cop, - tag, - comm - ); -} - - -template<class Container> -void Foam::Pstream::mapCombineScatter -( - Container& values, - const int tag, - const label comm -) -{ - #ifndef Foam_Pstream_scatter_nobroadcast - Pstream::broadcast(values, comm); - #else - Pstream::mapCombineScatter - ( - UPstream::whichCommunication(comm), - values, - tag, - comm - ); - #endif -} - - template<class Container, class CombineOp> void Foam::Pstream::mapCombineReduce ( @@ -748,9 +382,7 @@ void Foam::Pstream::mapCombineReduce { if (UPstream::is_parallel(comm)) { - const auto& comms = UPstream::whichCommunication(comm); - - Pstream::mapCombineGather(comms, values, cop, tag, comm); + Pstream::mapCombineGather(values, cop, tag, comm); Pstream::broadcast(values, comm); } } diff --git a/src/OpenFOAM/db/IOstreams/Pstreams/PstreamGather.C b/src/OpenFOAM/db/IOstreams/Pstreams/PstreamGather.C index daeb052876f00fe5ad961940cd308c612eef9779..f879f912838e839ff6328861fe952ab01280b638 100644 --- a/src/OpenFOAM/db/IOstreams/Pstreams/PstreamGather.C +++ b/src/OpenFOAM/db/IOstreams/Pstreams/PstreamGather.C @@ -6,7 +6,7 @@ \\/ M anipulation | ------------------------------------------------------------------------------- Copyright (C) 2011-2017 OpenFOAM Foundation - Copyright (C) 2019-2022 OpenCFD Ltd. + Copyright (C) 2019-2024 OpenCFD Ltd. ------------------------------------------------------------------------------- License This file is part of OpenFOAM. @@ -41,7 +41,6 @@ Description template<class T, class BinaryOp> void Foam::Pstream::gather ( - const List<UPstream::commsStruct>& comms, T& value, const BinaryOp& bop, const int tag, @@ -50,8 +49,10 @@ void Foam::Pstream::gather { if (UPstream::is_parallel(comm)) { - // My communication order - const commsStruct& myComm = comms[UPstream::myProcNo(comm)]; + // Communication order + const auto& comms = UPstream::whichCommunication(comm); + // if (comms.empty()) return; // extra safety? + const auto& myComm = comms[UPstream::myProcNo(comm)]; // Receive from my downstairs neighbours for (const label belowID : myComm.below()) @@ -76,7 +77,7 @@ void Foam::Pstream::gather ( UPstream::commsTypes::scheduled, belowID, - 0, + 0, // bufsize tag, comm ); @@ -107,7 +108,7 @@ void Foam::Pstream::gather ( UPstream::commsTypes::scheduled, myComm.above(), - 0, + 0, // bufsize tag, comm ); @@ -119,110 +120,181 @@ void Foam::Pstream::gather template<class T> -void Foam::Pstream::scatter +Foam::List<T> Foam::Pstream::listGatherValues ( - const List<UPstream::commsStruct>& comms, - T& value, - const int tag, - const label comm + const T& localValue, + const label comm, + const int tag ) { - #ifndef Foam_Pstream_scatter_nobroadcast - Pstream::broadcast(value, comm); - #else + // OR + // if (is_contiguous<T>::value) + // { + // return UPstream::listGatherValues(localValue, comm); + // } + + List<T> allValues; + if (UPstream::is_parallel(comm)) { - // My communication order - const commsStruct& myComm = comms[UPstream::myProcNo(comm)]; + const label numProc = UPstream::nProcs(comm); - // Receive from up - if (myComm.above() != -1) + if (UPstream::master(comm)) { - if (is_contiguous<T>::value) + allValues.resize(numProc); + } + + if (is_contiguous<T>::value) + { + UPstream::mpiGather + ( + reinterpret_cast<const char*>(&localValue), + allValues.data_bytes(), + sizeof(T), // The send/recv size per rank + comm + ); + } + else + { + if (UPstream::master(comm)) { - UIPstream::read - ( - UPstream::commsTypes::scheduled, - myComm.above(), - reinterpret_cast<char*>(&value), - sizeof(T), - tag, - comm - ); + // Non-trivial to manage non-blocking gather without a + // PEX/NBX approach (eg, PstreamBuffers) but leave with + // with simple exchange for now + + allValues[0] = localValue; + + for (int proci = 1; proci < numProc; ++proci) + { + IPstream fromProc + ( + UPstream::commsTypes::scheduled, + proci, + 0, // bufsize + tag, + comm + ); + fromProc >> allValues[proci]; + } } - else + else if (UPstream::is_rank(comm)) { - IPstream fromAbove + OPstream toProc ( UPstream::commsTypes::scheduled, - myComm.above(), - 0, + UPstream::masterNo(), + 0, // bufsize tag, comm ); - fromAbove >> value; + toProc << localValue; } } + } + else + { + // non-parallel: return own value + // TBD: only when UPstream::is_rank(comm) as well? + allValues.resize(1); + allValues[0] = localValue; + } + + return allValues; +} + + +template<class T> +T Foam::Pstream::listScatterValues +( + const UList<T>& allValues, + const label comm, + const int tag +) +{ + // OR + // if (is_contiguous<T>::value) + // { + // return UPstream::listScatterValues(allValues, comm); + // } + + T localValue{}; + + if (UPstream::is_parallel(comm)) + { + const label numProc = UPstream::nProcs(comm); - // Send to my downstairs neighbours. Note reverse order (compared to - // receiving). This is to make sure to send to the critical path - // (only when using a tree schedule!) first. - forAllReverse(myComm.below(), belowI) + if (UPstream::master(comm) && allValues.size() < numProc) { - const label belowID = myComm.below()[belowI]; + FatalErrorInFunction + << "Attempting to send " << allValues.size() + << " values to " << numProc << " processors" << endl + << Foam::abort(FatalError); + } - if (is_contiguous<T>::value) + if (is_contiguous<T>::value) + { + UPstream::mpiScatter + ( + allValues.cdata_bytes(), + reinterpret_cast<char*>(&localValue), + sizeof(T), // The send/recv size per rank + comm + ); + } + else + { + if (UPstream::master(comm)) { - UOPstream::write - ( - UPstream::commsTypes::scheduled, - belowID, - reinterpret_cast<const char*>(&value), - sizeof(T), - tag, - comm - ); + const label startOfRequests = UPstream::nRequests(); + + List<DynamicList<char>> sendBuffers(numProc); + + for (int proci = 1; proci < numProc; ++proci) + { + UOPstream toProc + ( + UPstream::commsTypes::nonBlocking, + proci, + sendBuffers[proci], + tag, + comm + ); + toProc << allValues[proci]; + } + + // Wait for outstanding requests + UPstream::waitRequests(startOfRequests); + + return allValues[0]; } - else + else if (UPstream::is_rank(comm)) { - OPstream toBelow + IPstream fromProc ( UPstream::commsTypes::scheduled, - belowID, - 0, + UPstream::masterNo(), + 0, // bufsize tag, comm ); - toBelow << value; + fromProc >> localValue; } } } - #endif -} + else + { + // non-parallel: return first value + // TBD: only when UPstream::is_rank(comm) as well? + if (!allValues.empty()) + { + return allValues[0]; + } + } -template<class T, class BinaryOp> -void Foam::Pstream::gather -( - T& value, - const BinaryOp& bop, - const int tag, - const label comm -) -{ - Pstream::gather(UPstream::whichCommunication(comm), value, bop, tag, comm); + return localValue; } -template<class T> -void Foam::Pstream::scatter(T& value, const int tag, const label comm) -{ - #ifndef Foam_Pstream_scatter_nobroadcast - Pstream::broadcast(value, comm); - #else - Pstream::scatter(UPstream::whichCommunication(comm), value, tag, comm); - #endif -} - // ************************************************************************* // diff --git a/src/OpenFOAM/db/IOstreams/Pstreams/PstreamGatherList.C b/src/OpenFOAM/db/IOstreams/Pstreams/PstreamGatherList.C index e4dd9f844011ce3cee1c4dc902a5938906905b81..d009e43eb8054ca2f5ac769e4c7c4a3b72e41d57 100644 --- a/src/OpenFOAM/db/IOstreams/Pstreams/PstreamGatherList.C +++ b/src/OpenFOAM/db/IOstreams/Pstreams/PstreamGatherList.C @@ -6,7 +6,7 @@ \\/ M anipulation | ------------------------------------------------------------------------------- Copyright (C) 2011-2017 OpenFOAM Foundation - Copyright (C) 2015-2023 OpenCFD Ltd. + Copyright (C) 2015-2024 OpenCFD Ltd. ------------------------------------------------------------------------------- License This file is part of OpenFOAM. @@ -51,7 +51,7 @@ void Foam::Pstream::gatherList const label comm ) { - if (UPstream::is_parallel(comm)) + if (!comms.empty() && UPstream::is_parallel(comm)) { if (values.size() < UPstream::nProcs(comm)) { @@ -62,7 +62,7 @@ void Foam::Pstream::gatherList } // My communication order - const commsStruct& myComm = comms[UPstream::myProcNo(comm)]; + const auto& myComm = comms[UPstream::myProcNo(comm)]; // Receive from my downstairs neighbours for (const label belowID : myComm.below()) @@ -199,7 +199,7 @@ void Foam::Pstream::scatterList // between scatterList() and using broadcast(List<T>&) or a regular // scatter(List<T>&) is that processor-local data is skipped. - if (UPstream::is_parallel(comm)) + if (!comms.empty() && UPstream::is_parallel(comm)) { if (values.size() < UPstream::nProcs(comm)) { @@ -210,7 +210,7 @@ void Foam::Pstream::scatterList } // My communication order - const commsStruct& myComm = comms[UPstream::myProcNo(comm)]; + const auto& myComm = comms[UPstream::myProcNo(comm)]; // Receive from up if (myComm.above() != -1) @@ -323,7 +323,13 @@ void Foam::Pstream::gatherList const label comm ) { - Pstream::gatherList(UPstream::whichCommunication(comm), values, tag, comm); + Pstream::gatherList + ( + UPstream::whichCommunication(comm), + values, + tag, + comm + ); } @@ -336,7 +342,13 @@ void Foam::Pstream::scatterList const label comm ) { - Pstream::scatterList(UPstream::whichCommunication(comm), values, tag, comm); + Pstream::scatterList + ( + UPstream::whichCommunication(comm), + values, + tag, + comm + ); } diff --git a/src/OpenFOAM/db/IOstreams/Pstreams/PstreamReduceOps.H b/src/OpenFOAM/db/IOstreams/Pstreams/PstreamReduceOps.H index 7c114a54e8746bc5da3c72fb90ee1545215463de..aff75a17c0a35104201f5af19fe6612a949b79f7 100644 --- a/src/OpenFOAM/db/IOstreams/Pstreams/PstreamReduceOps.H +++ b/src/OpenFOAM/db/IOstreams/Pstreams/PstreamReduceOps.H @@ -6,7 +6,7 @@ \\/ M anipulation | ------------------------------------------------------------------------------- Copyright (C) 2011-2016 OpenFOAM Foundation - Copyright (C) 2016-2023 OpenCFD Ltd. + Copyright (C) 2016-2024 OpenCFD Ltd. ------------------------------------------------------------------------------- License This file is part of OpenFOAM. @@ -46,28 +46,6 @@ namespace Foam // * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * // -//- Reduce inplace (cf. MPI Allreduce) -//- using specified communication schedule. -template<class T, class BinaryOp> -void reduce -( - const List<UPstream::commsStruct>& comms, - T& value, - const BinaryOp& bop, - const int tag, - const label comm -) -{ - if (UPstream::warnComm >= 0 && comm != UPstream::warnComm) - { - Pout<< "** reducing:" << value << " with comm:" << comm << endl; - error::printStack(Pout); - } - Pstream::gather(comms, value, bop, tag, comm); - Pstream::broadcast(value, comm); -} - - //- Reduce inplace (cf. MPI Allreduce) //- using linear/tree communication schedule template<class T, class BinaryOp> @@ -81,7 +59,13 @@ void reduce { if (UPstream::is_parallel(comm)) { - Foam::reduce(UPstream::whichCommunication(comm), value, bop, tag, comm); + if (UPstream::warnComm >= 0 && comm != UPstream::warnComm) + { + Pout<< "** reducing:" << value << " with comm:" << comm << endl; + error::printStack(Pout); + } + Pstream::gather(value, bop, tag, comm); + Pstream::broadcast(value, comm); } } @@ -436,8 +420,7 @@ Pstream_SumReduce(double); // * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * // -// Convenience wrappers for some reduction operations -// - defined after all specialisations are known +// Convenience wrappers - defined after all specialisations are known //- Perform reduction on a copy, using specified binary operation // \return the resulting value diff --git a/src/OpenFOAM/db/IOstreams/Pstreams/UPstream.C b/src/OpenFOAM/db/IOstreams/Pstreams/UPstream.C index 6e9fa98636f19dbf138b47fa858c0ad2e6e54cd9..5c96b9a38080a94b2965c24cd15ad92f4fa00525 100644 --- a/src/OpenFOAM/db/IOstreams/Pstreams/UPstream.C +++ b/src/OpenFOAM/db/IOstreams/Pstreams/UPstream.C @@ -52,6 +52,7 @@ Foam::UPstream::commsTypeNames ({ { commsTypes::blocking, "blocking" }, { commsTypes::scheduled, "scheduled" }, + // { commsTypes::nonBlocking, "non-blocking" }, { commsTypes::nonBlocking, "nonBlocking" }, }); diff --git a/src/OpenFOAM/db/IOstreams/Pstreams/UPstream.H b/src/OpenFOAM/db/IOstreams/Pstreams/UPstream.H index 83b24683fe861ac025e80c4dbab2fce9c5b32309..83760dfc3e19c89fb79c5fefb5221ed255297976 100644 --- a/src/OpenFOAM/db/IOstreams/Pstreams/UPstream.H +++ b/src/OpenFOAM/db/IOstreams/Pstreams/UPstream.H @@ -6,7 +6,7 @@ \\/ M anipulation | ------------------------------------------------------------------------------- Copyright (C) 2011-2017 OpenFOAM Foundation - Copyright (C) 2015-2023 OpenCFD Ltd. + Copyright (C) 2015-2024 OpenCFD Ltd. ------------------------------------------------------------------------------- License This file is part of OpenFOAM. @@ -895,21 +895,32 @@ public: ); //- Communication schedule for tree all-to-master (proc 0) - static const List<commsStruct>& treeCommunication + static const List<commsStruct>& + treeCommunication ( const label communicator = worldComm ); - //- Communication schedule for linear/tree all-to-master (proc 0). - //- Chooses based on the value of UPstream::nProcsSimpleSum + //- Communication schedule for all-to-master (proc 0) as + //- linear/tree/none with switching based on UPstream::nProcsSimpleSum + //- and the is_parallel() state static const List<commsStruct>& whichCommunication ( const label communicator = worldComm ) { + const label np + ( + parRun_ && is_rank(communicator) // cf. is_parallel() + ? nProcs(communicator) + : 0 + ); + return ( - nProcs(communicator) < nProcsSimpleSum + np <= 1 + ? List<commsStruct>::null() + : np < nProcsSimpleSum ? linearCommunication(communicator) : treeCommunication(communicator) ); @@ -1138,7 +1149,7 @@ public: // On master input list length == nProcs, ignored on other procs. // \n // For \b non-parallel : - // returns the first list element (or zero). + // returns the first list element (or default initialized). template<class T> static T listScatterValues ( diff --git a/src/OpenFOAM/db/IOstreams/Pstreams/UPstreamTemplates.C b/src/OpenFOAM/db/IOstreams/Pstreams/UPstreamTemplates.C index 10065bff3f60cda3f254a29d0eebe8d1e1a05696..6f71ec548f8428f285fd04290e2b8786077c0fa0 100644 --- a/src/OpenFOAM/db/IOstreams/Pstreams/UPstreamTemplates.C +++ b/src/OpenFOAM/db/IOstreams/Pstreams/UPstreamTemplates.C @@ -34,14 +34,6 @@ Foam::List<T> Foam::UPstream::allGatherValues const label comm ) { - if (!is_contiguous<T>::value) - { - FatalErrorInFunction - << "Cannot all-gather values for non-contiguous types" << endl - << Foam::abort(FatalError); - } - - List<T> allValues; if (UPstream::is_parallel(comm)) @@ -49,7 +41,17 @@ Foam::List<T> Foam::UPstream::allGatherValues allValues.resize(UPstream::nProcs(comm)); allValues[UPstream::myProcNo(comm)] = localValue; - UPstream::mpiAllGather(allValues.data_bytes(), sizeof(T), comm); + if (is_contiguous<T>::value) + { + UPstream::mpiAllGather(allValues.data_bytes(), sizeof(T), comm); + } + else + { + FatalErrorInFunction + << "Cannot all-gather values for non-contiguous types" + " - consider Pstream variant instead" << endl + << Foam::abort(FatalError); + } } else { @@ -70,14 +72,6 @@ Foam::List<T> Foam::UPstream::listGatherValues const label comm ) { - if (!is_contiguous<T>::value) - { - FatalErrorInFunction - << "Cannot gather values for non-contiguous types" << endl - << Foam::abort(FatalError); - } - - List<T> allValues; if (UPstream::is_parallel(comm)) @@ -87,13 +81,23 @@ Foam::List<T> Foam::UPstream::listGatherValues allValues.resize(UPstream::nProcs(comm)); } - UPstream::mpiGather - ( - reinterpret_cast<const char*>(&localValue), - allValues.data_bytes(), - sizeof(T), // The send/recv size per rank - comm - ); + if (is_contiguous<T>::value) + { + UPstream::mpiGather + ( + reinterpret_cast<const char*>(&localValue), + allValues.data_bytes(), + sizeof(T), // The send/recv size per rank + comm + ); + } + else + { + FatalErrorInFunction + << "Cannot gather values for non-contiguous types" + " - consider Pstream variant instead" << endl + << Foam::abort(FatalError); + } } else { @@ -114,47 +118,46 @@ T Foam::UPstream::listScatterValues const label comm ) { - if (!is_contiguous<T>::value) - { - FatalErrorInFunction - << "Cannot scatter values for non-contiguous types" << endl - << Foam::abort(FatalError); - } - - - T localValue; + T localValue{}; if (UPstream::is_parallel(comm)) { - const label nproc = UPstream::nProcs(comm); + const label numProc = UPstream::nProcs(comm); - if (UPstream::master(comm) && allValues.size() < nproc) + if (UPstream::master(comm) && allValues.size() < numProc) { FatalErrorInFunction << "Attempting to send " << allValues.size() - << " values to " << nproc << " processors" << endl + << " values to " << numProc << " processors" << endl << Foam::abort(FatalError); } - UPstream::mpiScatter - ( - allValues.cdata_bytes(), - reinterpret_cast<char*>(&localValue), - sizeof(T), // The send/recv size per rank - comm - ); + if (is_contiguous<T>::value) + { + UPstream::mpiScatter + ( + allValues.cdata_bytes(), + reinterpret_cast<char*>(&localValue), + sizeof(T), // The send/recv size per rank + comm + ); + } + else + { + FatalErrorInFunction + << "Cannot scatter values for non-contiguous types" + " - consider Pstream variant instead" << endl + << Foam::abort(FatalError); + } } else { - // non-parallel: return local value + // non-parallel: return first value + // TBD: only when UPstream::is_rank(comm) as well? - if (UPstream::is_rank(comm) && !allValues.empty()) - { - localValue = allValues[0]; - } - else + if (!allValues.empty()) { - localValue = Zero; + return allValues[0]; } }