diff --git a/applications/test/Tuple2/Make/files b/applications/test/Tuple2/Make/files index be0026f0cdf85e21a0302cdf3fc8711657cc30b5..f58d59d0f2cc61e99fad618bd69ee7ad608a4473 100644 --- a/applications/test/Tuple2/Make/files +++ b/applications/test/Tuple2/Make/files @@ -1,3 +1,3 @@ -Test-Tuple2.C +Test-Tuple2.cxx EXE = $(FOAM_USER_APPBIN)/Test-Tuple2 diff --git a/applications/test/Tuple2/Test-Tuple2.C b/applications/test/Tuple2/Test-Tuple2.cxx similarity index 97% rename from applications/test/Tuple2/Test-Tuple2.C rename to applications/test/Tuple2/Test-Tuple2.cxx index 9163f14e5488981e81d648ae335eabcf22f6b3a0..a2ebf44592b84c60cd8a704547bf166cf075b163 100644 --- a/applications/test/Tuple2/Test-Tuple2.C +++ b/applications/test/Tuple2/Test-Tuple2.cxx @@ -6,7 +6,7 @@ \\/ M anipulation | ------------------------------------------------------------------------------- Copyright (C) 2011 OpenFOAM Foundation - Copyright (C) 2019-2020 OpenCFD Ltd. + Copyright (C) 2019-2025 OpenCFD Ltd. ------------------------------------------------------------------------------- License This file is part of OpenFOAM. 
@@ -32,6 +32,7 @@ Description \*---------------------------------------------------------------------------*/ +#include "argList.H" #include "labelPair.H" #include "Tuple2.H" #include "label.H" @@ -102,8 +103,12 @@ void printTuple2(const Pair<word>& t) // * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * // // Main program: -int main() +int main(int argc, char *argv[]) { + argList::noCheckProcessorDirectories(); + + #include "setRootCase.H" + typedef Tuple2<label, scalar> indexedScalar; Info<< "Default constructed Tuple: " << indexedScalar() << nl; diff --git a/applications/test/nodeTopology/Test-nodeTopology.cxx b/applications/test/nodeTopology/Test-nodeTopology.cxx index db4a5eeaf4382fb6771438a9bb8b384a18fdebb7..9909d51d4f51b7b0e8443cd86bd6e01adaee588b 100644 --- a/applications/test/nodeTopology/Test-nodeTopology.cxx +++ b/applications/test/nodeTopology/Test-nodeTopology.cxx @@ -59,13 +59,13 @@ int main(int argc, char *argv[]) label nProcs = UPstream::nProcs(UPstream::worldComm); - List<int> interNodeProcs_fake; + DynamicList<int> fake_interNode_offsets; if (UPstream::parRun()) { if (args.found("numProcs")) { - InfoErr<< "ignoring -np option in parallel" << nl; + InfoErr<< "ignoring -numProcs option in parallel" << nl; } if (args.found("cores")) { @@ -78,25 +78,40 @@ int main(int argc, char *argv[]) nProcs = args.getOrDefault<label>("numProcs", 16); label nCores = args.getOrDefault<label>("cores", 4); + auto& interNode_offsets = fake_interNode_offsets; + if (nCores > 1 && nCores < nProcs) { - const label numNodes - = (nProcs/nCores) + ((nProcs % nCores) ? 
1 : 0); - - interNodeProcs_fake.resize(numNodes); + // Build the inter-node offsets + interNode_offsets.reserve((nProcs/nCores) + 4); + interNode_offsets.push_back(0); - for (label nodei = 0; nodei < numNodes; ++nodei) + for + ( + int count = interNode_offsets.back() + nCores; + count < nProcs; + count += nCores + ) { - interNodeProcs_fake[nodei] = nodei * nCores; + interNode_offsets.push_back(count); } + + interNode_offsets.push_back(nProcs); + } + else + { + // Some fallback + interNode_offsets.reserve(2); + interNode_offsets.push_back(0); + interNode_offsets.push_back(nProcs); } } - const List<int>& interNodeProcs = + const List<int>& interNodeOffsets = ( UPstream::parRun() - ? UPstream::procID(UPstream::commInterNode()) - : interNodeProcs_fake + ? UPstream::interNode_offsets() + : fake_interNode_offsets ); @@ -111,79 +126,31 @@ int main(int argc, char *argv[]) // Prefer left-to-right layout for large graphs os << indent << "rankdir=LR" << nl; - int pos = 0; + const label numNodes = interNodeOffsets.size()-1; // First level are the inter-node connections - const label parent = 0; - for (const auto proci : interNodeProcs) { - if (parent == proci) continue; - - if (pos) - { - os << " "; - } - else - { - os << indent; - } - os << parent << " -- " << proci; + os << indent << 0 << " -- " << token::LBRACE; - if (++pos >= 4) // Max 4 items per line + for (label nodei = 1; nodei < numNodes; ++nodei) { - pos = 0; - os << nl; + os << ' ' << interNodeOffsets[nodei]; } - } - if (pos) - { - pos = 0; - os << nl; + os << token::SPACE << token::RBRACE + << " // inter-node: " << flatOutput(interNodeOffsets) + << nl; } - // Next level are within the nodes - for (label nodei = 0; nodei < interNodeProcs.size(); ++nodei) + // Next level are the local-node connections + for (label nodei = 0; nodei < numNodes; ++nodei) { - pos = 0; - - label firstProc = interNodeProcs[nodei]; - const label lastProc = - ( - (nodei+1 < interNodeProcs.size()) - ? 
interNodeProcs[nodei+1] - : nProcs - ); + const auto firstProc = interNodeOffsets[nodei]; + const auto lastProc = interNodeOffsets[nodei+1]; - os << indent << "// inter-node " << nodei - << " [" << firstProc - << ".." << lastProc-1 << "]" << nl; - - for (label proci = firstProc; proci < lastProc; ++proci) - { - if (firstProc == proci) continue; - - if (pos) - { - os << " "; - } - else - { - os << indent; - } - os << firstProc << " -- " << proci; - - if (++pos >= 4) // Max 4 items per line - { - pos = 0; - os << nl; - } - } - if (pos) - { - pos = 0; - os << nl; - } + os << indent << firstProc << " -- " << token::DQUOTE + << (firstProc+1) << ".." << (lastProc-1) + << token::DQUOTE << nl; } os.endBlock(); diff --git a/src/OpenFOAM/db/IOobjects/decomposedBlockData/decomposedBlockData.C b/src/OpenFOAM/db/IOobjects/decomposedBlockData/decomposedBlockData.C index 8542952455c88a611f6ff8b68c931b0a5a5ca265..47d22fa427753c15d7c978fea68b75e6d36c063f 100644 --- a/src/OpenFOAM/db/IOobjects/decomposedBlockData/decomposedBlockData.C +++ b/src/OpenFOAM/db/IOobjects/decomposedBlockData/decomposedBlockData.C @@ -763,15 +763,15 @@ void Foam::decomposedBlockData::gather recvOffsets.setSize(nProcs); forAll(recvOffsets, proci) { - // Note: truncating long int to int since UPstream::gather limited - // to ints + // Note: truncating long int to int since + // UPstream::mpiGatherv is limited to ints recvOffsets[proci] = int(reinterpret_cast<char*>(&datas[proci]) - data0Ptr); } recvSizes.setSize(nProcs, sizeof(label)); } - UPstream::gather + UPstream::mpiGatherv ( reinterpret_cast<const char*>(&data), sizeof(label), @@ -824,11 +824,11 @@ void Foam::decomposedBlockData::gatherSlaveData } else if (fromProcs.contains(myProci)) { - // Note: UPstream::gather limited to int + // Note: UPstream::mpiGatherv limited to int nSendBytes = int(data.size_bytes()); } - UPstream::gather + UPstream::mpiGatherv ( data.cdata(), nSendBytes, diff --git a/src/OpenFOAM/db/IOstreams/Pstreams/Pstream.H 
b/src/OpenFOAM/db/IOstreams/Pstreams/Pstream.H index 05dc64eb5135757473d450a61390151621965a8f..0f7a17e3a3397808256cc427f76bcdf956da9810 100644 --- a/src/OpenFOAM/db/IOstreams/Pstreams/Pstream.H +++ b/src/OpenFOAM/db/IOstreams/Pstreams/Pstream.H @@ -6,7 +6,7 @@ \\/ M anipulation | ------------------------------------------------------------------------------- Copyright (C) 2011-2016 OpenFOAM Foundation - Copyright (C) 2016-2024 OpenCFD Ltd. + Copyright (C) 2016-2025 OpenCFD Ltd. ------------------------------------------------------------------------------- License This file is part of OpenFOAM. @@ -32,12 +32,11 @@ Description SourceFiles Pstream.C - PstreamBroadcast.C - PstreamGather.C - PstreamCombineGather.C - PstreamGatherList.C - PstreamExchangeConsensus.C - PstreamExchange.C + PstreamBroadcast.txx + PstreamGather.txx + PstreamGatherList.txx + PstreamExchange.txx + PstreamExchangeConsensus.txx \*---------------------------------------------------------------------------*/ @@ -125,14 +124,18 @@ public: ); - // Gather + // Gather/scatter : single value //- Gather (reduce) data, applying \c bop to combine \c value //- from different processors. The basis for Foam::reduce(). - // Uses linear/tree communication (with parallel guard). - template<class T, class BinaryOp> + // A no-op for non-parallel. + // + // \tparam InplaceMode indicates that the binary operator + // modifies values in-place, not using assignment + template<class T, class BinaryOp, bool InplaceMode=false> static void gather ( + //! [in,out] the result is only reliable on rank=0 T& value, const BinaryOp& bop, const int tag = UPstream::msgType(), @@ -168,17 +171,13 @@ public: ); - // Gather/combine data - // Inplace combine values from processors. - // (Uses construct from Istream instead of \c << operator) + // Inplace combine (gather) : single value - //- Gather data, applying \c cop to inplace combine \c value - //- from different processors. 
- // Uses linear/tree communication (with parallel guard). + //- Forwards to Pstream::gather with an \em in-place \c cop template<class T, class CombineOp> static void combineGather ( - //! [in,out] + //! [in,out] the result is only reliable on rank=0 T& value, const CombineOp& cop, const int tag = UPstream::msgType(), @@ -188,13 +187,10 @@ public: //- Reduce inplace (cf. MPI Allreduce) //- applying \c cop to inplace combine \c value //- from different processors. - //- After completion all processors have the same data. - // Uses linear/tree communication. - // Wraps combineGather/broadcast (may change in the future). template<class T, class CombineOp> static void combineReduce ( - //! [in,out] + //! [in,out] the result is consistent on all ranks T& value, const CombineOp& cop, const int tag = UPstream::msgType(), @@ -205,37 +201,65 @@ public: template<class T, class CombineOp> static void combineAllGather ( + //! [in,out] the result is consistent on all ranks T& value, const CombineOp& cop, const int tag = UPstream::msgType(), const label comm = UPstream::worldComm ) { Pstream::combineReduce(value, cop, tag, comm); } - // Combine variants working on whole List at a time. + // Gather/combine variants working on entire List - //- Combines List elements. - // Uses linear/tree communication (with parallel guard). + //- Gather (reduce) list elements, + //- applying \c bop to each list element + // + // \tparam InplaceMode indicates that the binary operator + // modifies values in-place, not using assignment + template<class T, class BinaryOp, bool InplaceMode=false> + static void listGather + ( + //! [in,out] the result is only reliable on rank=0 + UList<T>& values, + const BinaryOp& bop, + const int tag = UPstream::msgType(), + const label comm = UPstream::worldComm + ); + + //- Forwards to Pstream::listGather with an \em in-place \c cop template<class T, class CombineOp> static void listCombineGather ( - //! 
[in,out] + //! [in,out] the result is only reliable on rank=0 UList<T>& values, const CombineOp& cop, const int tag = UPstream::msgType(), const label comm = UPstream::worldComm ); - //- Combines List elements. - //- After completion all processors have the same data. - // Uses linear/tree communication (with parallel guard). + //- Gather (reduce) list elements, + //- applying \c bop to combine each list element. + // + // \tparam InplaceMode indicates that the binary operator + // modifies values in-place, not using assignment + template<class T, class BinaryOp, bool InplaceMode=false> + static void listGatherReduce + ( + //! [in,out] the result is consistent on all ranks + List<T>& values, + const BinaryOp& bop, + const int tag = UPstream::msgType(), + const label comm = UPstream::worldComm + ); + + //- Forwards to Pstream::listGatherReduce with an \em in-place \c cop template<class T, class CombineOp> static void listCombineReduce ( - //! [in,out] - List (not UList) due to broadcast() + //! [in,out] the result is consistent on all ranks List<T>& values, const CombineOp& cop, const int tag = UPstream::msgType(), @@ -246,7 +270,7 @@ public: template<class T, class CombineOp> static void listCombineAllGather ( - //! [in,out] - List (not UList) due to broadcast() + //! [in,out] the result is consistent on all ranks List<T>& values, const CombineOp& cop, const int tag = UPstream::msgType(), @@ -257,14 +281,28 @@ public: } - // Combine variants working on whole map at a time. - // Container needs iterators, find() and insert methods defined. + // Gather/combine variants working on Map/HashTable containers - //- Combine Map elements. - // Uses linear/tree communication (with parallel guard). + //- Gather (reduce) Map/HashTable containers, + //- applying \c bop to combine entries from different processors. 
+ // + // \tparam InplaceMode indicates that the binary operator + // modifies values in-place, not using assignment + template<class Container, class BinaryOp, bool InplaceMode=false> + static void mapGather + ( + //! [in,out] the result is only reliable on rank=0 + Container& values, + const BinaryOp& bop, + const int tag = UPstream::msgType(), + const label comm = UPstream::worldComm + ); + + //- Forwards to Pstream::mapGather with an \em in-place \c cop template<class Container, class CombineOp> static void mapCombineGather ( + //! [in,out] the result is only reliable on rank=0 Container& values, const CombineOp& cop, const int tag = UPstream::msgType(), @@ -272,15 +310,26 @@ public: ); //- Reduce inplace (cf. MPI Allreduce) - //- applying \c cop to inplace combine map \c values + //- applying \c bop to combine map \c values //- from different processors. //- After completion all processors have the same data. - // Uses the specified communication schedule. + // // Wraps mapCombineGather/broadcast (may change in the future). - //- After completion all processors have the same data. + template<class Container, class BinaryOp, bool InplaceMode=false> + static void mapGatherReduce + ( + //! [in,out] the result is consistent on all ranks + Container& values, + const BinaryOp& bop, + const int tag = UPstream::msgType(), + const label comm = UPstream::worldComm + ); + + //- Forwards to Pstream::mapGatherReduce with an \em in-place \c cop template<class Container, class CombineOp> static void mapCombineReduce ( + //! [in,out] the result is consistent on all ranks Container& values, const CombineOp& cop, const int tag = UPstream::msgType(), @@ -291,6 +340,7 @@ public: template<class Container, class CombineOp> static void mapCombineAllGather ( + //! 
[in,out] the result is consistent on all ranks Container& values, const CombineOp& cop, const int tag = UPstream::msgType(), @@ -301,24 +351,38 @@ public: } - // Gather/scatter keeping the individual processor data separate. - // The values is a List of size UPstream::nProcs() where - // values[UPstream::myProcNo()] is the data for the current processor. +private: + + // Private implementations //- Gather data, but keep individual values separate. - //- Uses the specified communication schedule. template<class T> - static void gatherList + static void gatherList_tree_algorithm + ( + //! [in,out] + UList<T>& values, + const int tag, + const label communicator + ); + + //- Inverse of gatherList_tree_algorithm + template<class T> + static void scatterList_tree_algorithm ( - const UPstream::commsStructList& comms, //! [in,out] UList<T>& values, const int tag, - const label comm + const label communicator ); + +public: + + // Gather/scatter keeping the individual processor data separate. + // The values is a List of size UPstream::nProcs() where + // values[UPstream::myProcNo()] is the data for the current processor. + //- Gather data, but keep individual values separate. - //- Uses linear/tree communication. template<class T> static void gatherList ( @@ -329,7 +393,7 @@ public: ); //- Gather data, but keep individual values separate. - //- Uses MPI_Allgather or manual linear/tree communication. + //- Uses MPI_Allgather or manual communication. // After completion all processors have the same data. // Wraps gatherList/scatterList (may change in the future). template<class T> @@ -345,21 +409,10 @@ public: // Scatter //- Inverse of gatherList. - //- Uses the specified communication schedule. - template<class T> - static void scatterList - ( - const UPstream::commsStructList& comms, - UList<T>& values, - const int tag, - const label comm - ); - - //- Inverse of gatherList. - //- Uses linear/tree communication. template<class T> static void scatterList ( + //! 
[in,out] UList<T>& values, const int tag = UPstream::msgType(), const label comm = UPstream::worldComm @@ -607,11 +660,10 @@ public: // * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * // #ifdef NoRepository - #include "PstreamBroadcast.C" - #include "PstreamGather.C" - #include "PstreamCombineGather.C" - #include "PstreamGatherList.C" - #include "PstreamExchange.C" + #include "PstreamBroadcast.txx" + #include "PstreamGather.txx" + #include "PstreamGatherList.txx" + #include "PstreamExchange.txx" #endif // * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * // diff --git a/src/OpenFOAM/db/IOstreams/Pstreams/PstreamBroadcast.C b/src/OpenFOAM/db/IOstreams/Pstreams/PstreamBroadcast.txx similarity index 100% rename from src/OpenFOAM/db/IOstreams/Pstreams/PstreamBroadcast.C rename to src/OpenFOAM/db/IOstreams/Pstreams/PstreamBroadcast.txx diff --git a/src/OpenFOAM/db/IOstreams/Pstreams/PstreamCombineGather.C b/src/OpenFOAM/db/IOstreams/Pstreams/PstreamCombineGather.C deleted file mode 100644 index 911bc8ebb8114eb0dfcef9eb63b96290c63a00a1..0000000000000000000000000000000000000000 --- a/src/OpenFOAM/db/IOstreams/Pstreams/PstreamCombineGather.C +++ /dev/null @@ -1,365 +0,0 @@ -/*---------------------------------------------------------------------------*\ - ========= | - \\ / F ield | OpenFOAM: The Open Source CFD Toolbox - \\ / O peration | - \\ / A nd | www.openfoam.com - \\/ M anipulation | -------------------------------------------------------------------------------- - Copyright (C) 2011-2017 OpenFOAM Foundation - Copyright (C) 2019-2025 OpenCFD Ltd. -------------------------------------------------------------------------------- -License - This file is part of OpenFOAM. - - OpenFOAM is free software: you can redistribute it and/or modify it - under the terms of the GNU General Public License as published by - the Free Software Foundation, either version 3 of the License, or - (at your option) any later version. 
- - OpenFOAM is distributed in the hope that it will be useful, but WITHOUT - ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License - for more details. - - You should have received a copy of the GNU General Public License - along with OpenFOAM. If not, see <http://www.gnu.org/licenses/>. - -Description - Variant of gather. - Normal gather uses: - - default construct and read (>>) from Istream - - binary operator and assignment operator to combine values - - combineGather uses: - - construct from Istream - - modify operator which modifies its lhs - -\*---------------------------------------------------------------------------*/ - -#include "IPstream.H" -#include "OPstream.H" -#include "IOstreams.H" -#include "contiguous.H" - -// * * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * // - -template<class T, class CombineOp> -void Foam::Pstream::combineGather -( - T& value, - const CombineOp& cop, - const int tag, - const label comm -) -{ - if (UPstream::is_parallel(comm)) - { - // Communication order - const auto& comms = UPstream::whichCommunication(comm); - // if (comms.empty()) return; // extra safety? 
- const auto& myComm = comms[UPstream::myProcNo(comm)]; - - // Receive from my downstairs neighbours - for (const label belowID : myComm.below()) - { - if constexpr (is_contiguous_v<T>) - { - T received; - - UIPstream::read - ( - UPstream::commsTypes::scheduled, - belowID, - reinterpret_cast<char*>(&received), - sizeof(T), - tag, - comm - ); - - if (debug & 2) - { - Perr<< " received from " - << belowID << " data:" << received << endl; - } - - cop(value, received); - } - else - { - IPstream fromBelow - ( - UPstream::commsTypes::scheduled, - belowID, - 0, // bufsize - tag, - comm - ); - T received(fromBelow); - - if (debug & 2) - { - Perr<< " received from " - << belowID << " data:" << received << endl; - } - - cop(value, received); - } - } - - // Send up value - if (myComm.above() >= 0) - { - if (debug & 2) - { - Perr<< " sending to " << myComm.above() - << " data:" << value << endl; - } - - if constexpr (is_contiguous_v<T>) - { - UOPstream::write - ( - UPstream::commsTypes::scheduled, - myComm.above(), - reinterpret_cast<const char*>(&value), - sizeof(T), - tag, - comm - ); - } - else - { - OPstream::send(value, myComm.above(), tag, comm); - } - } - } -} - - -template<class T, class CombineOp> -void Foam::Pstream::combineReduce -( - T& value, - const CombineOp& cop, - const int tag, - const label comm -) -{ - if (UPstream::is_parallel(comm)) - { - Pstream::combineGather(value, cop, tag, comm); - Pstream::broadcast(value, comm); - } -} - - -// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * // - -template<class T, class CombineOp> -void Foam::Pstream::listCombineGather -( - UList<T>& values, - const CombineOp& cop, - const int tag, - const label comm -) -{ - if (UPstream::is_parallel(comm)) - { - // Communication order - const auto& comms = UPstream::whichCommunication(comm); - // if (comms.empty()) return; // extra safety? 
- const auto& myComm = comms[UPstream::myProcNo(comm)]; - - // Receive from my downstairs neighbours - for (const label belowID : myComm.below()) - { - if constexpr (is_contiguous_v<T>) - { - List<T> received(values.size()); - - UIPstream::read - ( - UPstream::commsTypes::scheduled, - belowID, - received, - tag, - comm - ); - - if (debug & 2) - { - Perr<< " received from " - << belowID << " data:" << received << endl; - } - - forAll(values, i) - { - cop(values[i], received[i]); - } - } - else - { - IPstream fromBelow - ( - UPstream::commsTypes::scheduled, - belowID, - 0, // bufsize - tag, - comm - ); - List<T> received(fromBelow); - - if (debug & 2) - { - Perr<< " received from " - << belowID << " data:" << received << endl; - } - - forAll(values, i) - { - cop(values[i], received[i]); - } - } - } - - // Send up values - if (myComm.above() >= 0) - { - if (debug & 2) - { - Perr<< " sending to " << myComm.above() - << " data:" << values << endl; - } - - if constexpr (is_contiguous_v<T>) - { - UOPstream::write - ( - UPstream::commsTypes::scheduled, - myComm.above(), - values, - tag, - comm - ); - } - else - { - OPstream::send(values, myComm.above(), tag, comm); - } - } - } -} - - -template<class T, class CombineOp> -void Foam::Pstream::listCombineReduce -( - List<T>& values, - const CombineOp& cop, - const int tag, - const label comm -) -{ - if (UPstream::is_parallel(comm)) - { - Pstream::listCombineGather(values, cop, tag, comm); - Pstream::broadcast(values, comm); - } -} - - -// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * // - -template<class Container, class CombineOp> -void Foam::Pstream::mapCombineGather -( - Container& values, - const CombineOp& cop, - const int tag, - const label comm -) -{ - if (UPstream::is_parallel(comm)) - { - // Communication order - const auto& comms = UPstream::whichCommunication(comm); - // if (comms.empty()) return; // extra safety? 
- const auto& myComm = comms[UPstream::myProcNo(comm)]; - - // Receive from my downstairs neighbours - for (const label belowID : myComm.below()) - { - // Map/HashTable: non-contiguous - - IPstream fromBelow - ( - UPstream::commsTypes::scheduled, - belowID, - 0, // bufsize - tag, - comm - ); - Container received(fromBelow); - - if (debug & 2) - { - Perr<< " received from " - << belowID << " data:" << received << endl; - } - - for - ( - auto recvIter = received.cbegin(); - recvIter != received.cend(); - ++recvIter - ) - { - auto masterIter = values.find(recvIter.key()); - - if (masterIter.good()) - { - // Combine with existing - cop(masterIter.val(), recvIter.val()); - } - else - { - // Insert new key/value - values.insert(recvIter.key(), recvIter.val()); - } - } - } - - // Send up values - if (myComm.above() >= 0) - { - if (debug & 2) - { - Perr<< " sending to " << myComm.above() - << " data:" << values << endl; - } - - OPstream::send(values, myComm.above(), tag, comm); - } - } -} - - -template<class Container, class CombineOp> -void Foam::Pstream::mapCombineReduce -( - Container& values, - const CombineOp& cop, - const int tag, - const label comm -) -{ - if (UPstream::is_parallel(comm)) - { - Pstream::mapCombineGather(values, cop, tag, comm); - Pstream::broadcast(values, comm); - } -} - - -// ************************************************************************* // diff --git a/src/OpenFOAM/db/IOstreams/Pstreams/PstreamExchange.C b/src/OpenFOAM/db/IOstreams/Pstreams/PstreamExchange.txx similarity index 99% rename from src/OpenFOAM/db/IOstreams/Pstreams/PstreamExchange.C rename to src/OpenFOAM/db/IOstreams/Pstreams/PstreamExchange.txx index b8bc41861c4cc8428c599b6db18d5ead5c6e0e78..59c7a4824134aa5eaab6bc9aeba3679906112692 100644 --- a/src/OpenFOAM/db/IOstreams/Pstreams/PstreamExchange.C +++ b/src/OpenFOAM/db/IOstreams/Pstreams/PstreamExchange.txx @@ -616,7 +616,7 @@ void exchangeContainer } // namespace PstreamDetail } // namespace Foam -#include 
"PstreamExchangeConsensus.C" +#include "PstreamExchangeConsensus.txx" // * * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * // diff --git a/src/OpenFOAM/db/IOstreams/Pstreams/PstreamExchangeConsensus.C b/src/OpenFOAM/db/IOstreams/Pstreams/PstreamExchangeConsensus.txx similarity index 100% rename from src/OpenFOAM/db/IOstreams/Pstreams/PstreamExchangeConsensus.C rename to src/OpenFOAM/db/IOstreams/Pstreams/PstreamExchangeConsensus.txx diff --git a/src/OpenFOAM/db/IOstreams/Pstreams/PstreamGather.C b/src/OpenFOAM/db/IOstreams/Pstreams/PstreamGather.C deleted file mode 100644 index f3ec676e1ec06e52156055d0440fa7ac66be1c58..0000000000000000000000000000000000000000 --- a/src/OpenFOAM/db/IOstreams/Pstreams/PstreamGather.C +++ /dev/null @@ -1,234 +0,0 @@ -/*---------------------------------------------------------------------------*\ - ========= | - \\ / F ield | OpenFOAM: The Open Source CFD Toolbox - \\ / O peration | - \\ / A nd | www.openfoam.com - \\/ M anipulation | -------------------------------------------------------------------------------- - Copyright (C) 2011-2017 OpenFOAM Foundation - Copyright (C) 2019-2025 OpenCFD Ltd. -------------------------------------------------------------------------------- -License - This file is part of OpenFOAM. - - OpenFOAM is free software: you can redistribute it and/or modify it - under the terms of the GNU General Public License as published by - the Free Software Foundation, either version 3 of the License, or - (at your option) any later version. - - OpenFOAM is distributed in the hope that it will be useful, but WITHOUT - ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License - for more details. - - You should have received a copy of the GNU General Public License - along with OpenFOAM. If not, see <http://www.gnu.org/licenses/>. 
- -Description - Gather data from all processors onto single processor according to some - communication schedule (usually tree-to-master). - The gathered data will be a single value constructed from the values - on individual processors using a user-specified operator. - -\*---------------------------------------------------------------------------*/ - -#include "IPstream.H" -#include "OPstream.H" -#include "contiguous.H" - -// * * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * // - -template<class T, class BinaryOp> -void Foam::Pstream::gather -( - T& value, - const BinaryOp& bop, - const int tag, - const label comm -) -{ - if (UPstream::is_parallel(comm)) - { - // Communication order - const auto& comms = UPstream::whichCommunication(comm); - // if (comms.empty()) return; // extra safety? - const auto& myComm = comms[UPstream::myProcNo(comm)]; - - // Receive from my downstairs neighbours - for (const label belowID : myComm.below()) - { - T received; - - if constexpr (is_contiguous_v<T>) - { - UIPstream::read - ( - UPstream::commsTypes::scheduled, - belowID, - reinterpret_cast<char*>(&received), - sizeof(T), - tag, - comm - ); - } - else - { - IPstream::recv(received, belowID, tag, comm); - } - - value = bop(value, received); - } - - // Send up value - if (myComm.above() >= 0) - { - if constexpr (is_contiguous_v<T>) - { - UOPstream::write - ( - UPstream::commsTypes::scheduled, - myComm.above(), - reinterpret_cast<const char*>(&value), - sizeof(T), - tag, - comm - ); - } - else - { - OPstream::send(value, myComm.above(), tag, comm); - } - } - } -} - - -template<class T> -Foam::List<T> Foam::Pstream::listGatherValues -( - const T& localValue, - const label comm, - [[maybe_unused]] const int tag -) -{ - if constexpr (is_contiguous_v<T>) - { - // UPstream version is contiguous only - return UPstream::listGatherValues(localValue, comm); - } - else - { - List<T> allValues; - - if (UPstream::is_parallel(comm)) - { - const label numProc = 
UPstream::nProcs(comm); - - if (UPstream::master(comm)) - { - allValues.resize(numProc); - - // Non-trivial to manage non-blocking gather without a - // PEX/NBX approach (eg, PstreamBuffers). - // Leave with simple exchange for now - - allValues[0] = localValue; - - for (int proci = 1; proci < numProc; ++proci) - { - IPstream::recv(allValues[proci], proci, tag, comm); - } - } - else if (UPstream::is_rank(comm)) - { - OPstream::send(localValue, UPstream::masterNo(), tag, comm); - } - } - else - { - // non-parallel: return own value - // TBD: only when UPstream::is_rank(comm) as well? - allValues.resize(1); - allValues[0] = localValue; - } - - return allValues; - } -} - - -template<class T> -T Foam::Pstream::listScatterValues -( - const UList<T>& allValues, - const label comm, - [[maybe_unused]] const int tag -) -{ - if constexpr (is_contiguous_v<T>) - { - // UPstream version is contiguous only - return UPstream::listScatterValues(allValues, comm); - } - else - { - T localValue{}; - - if (UPstream::is_parallel(comm)) - { - const label numProc = UPstream::nProcs(comm); - - if (UPstream::master(comm) && allValues.size() < numProc) - { - FatalErrorInFunction - << "Attempting to send " << allValues.size() - << " values to " << numProc << " processors" << endl - << Foam::abort(FatalError); - } - - if (UPstream::master(comm)) - { - const label startOfRequests = UPstream::nRequests(); - - List<DynamicList<char>> sendBuffers(numProc); - - for (int proci = 1; proci < numProc; ++proci) - { - UOPstream toProc - ( - UPstream::commsTypes::nonBlocking, - proci, - sendBuffers[proci], - tag, - comm - ); - toProc << allValues[proci]; - } - - // Wait for outstanding requests - UPstream::waitRequests(startOfRequests); - - return allValues[0]; - } - else if (UPstream::is_rank(comm)) - { - IPstream::recv(localValue, UPstream::masterNo(), tag, comm); - } - } - else - { - // non-parallel: return first value - // TBD: only when UPstream::is_rank(comm) as well? 
- - if (!allValues.empty()) - { - return allValues[0]; - } - } - - return localValue; - } -} - - -// ************************************************************************* // diff --git a/src/OpenFOAM/db/IOstreams/Pstreams/PstreamGather.txx b/src/OpenFOAM/db/IOstreams/Pstreams/PstreamGather.txx new file mode 100644 index 0000000000000000000000000000000000000000..154aee8fb9865be7393aaec5cc04455f087f9f83 --- /dev/null +++ b/src/OpenFOAM/db/IOstreams/Pstreams/PstreamGather.txx @@ -0,0 +1,615 @@ +/*---------------------------------------------------------------------------*\ + ========= | + \\ / F ield | OpenFOAM: The Open Source CFD Toolbox + \\ / O peration | + \\ / A nd | www.openfoam.com + \\/ M anipulation | +------------------------------------------------------------------------------- + Copyright (C) 2011-2017 OpenFOAM Foundation + Copyright (C) 2019-2025 OpenCFD Ltd. +------------------------------------------------------------------------------- +License + This file is part of OpenFOAM. + + OpenFOAM is free software: you can redistribute it and/or modify it + under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + OpenFOAM is distributed in the hope that it will be useful, but WITHOUT + ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + for more details. + + You should have received a copy of the GNU General Public License + along with OpenFOAM. If not, see <http://www.gnu.org/licenses/>. + +Description + Gather data from all processors onto single processor according to some + communication schedule (usually tree-to-master). + The gathered data will be a single value constructed from the values + on individual processors using a user-specified operator. + +Note + Normal gather uses: + - binary operator that returns a value. 
+ So assign that return value to yield the new value + + Combine gather uses: + - binary operator modifies its first parameter in-place + +\*---------------------------------------------------------------------------*/ + +#include "contiguous.H" +#include "IPstream.H" +#include "OPstream.H" + +// * * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * // + +// Single value variants + +// Combine \c value along the communication schedule: each rank receives from +// the ranks directly below it, folds every received value into its own with +// \c bop, then sends the result to the rank above (if there is one). +// InplaceMode selects bop(value, received) instead of +// value = bop(value, received). No-op if the communicator is not parallel. +template<class T, class BinaryOp, bool InplaceMode> +void Foam::Pstream::gather +( + T& value, + const BinaryOp& bop, + const int tag, + const label communicator +) +{ + if (!UPstream::is_parallel(communicator)) + { + // Nothing to do + return; + } + else + { + // Communication order + const auto& comms = UPstream::whichCommunication(communicator); + // if (comms.empty()) return; // extra safety? + const auto& myComm = comms[UPstream::myProcNo(communicator)]; + const auto& below = myComm.below(); + + // Receive from my downstairs neighbours + for (const auto proci : below) + { + T received; + + if constexpr (is_contiguous_v<T>) + { + // Contiguous: read the raw sizeof(T) bytes into 'received' + UIPstream::read + ( + UPstream::commsTypes::scheduled, + proci, + reinterpret_cast<char*>(&received), + sizeof(T), + tag, + communicator + ); + } + else + { + IPstream::recv(received, proci, tag, communicator); + } + + // Debug output is only compiled in for the in-place mode + if constexpr (InplaceMode) + { + if (debug & 2) + { + Perr<< " received from " + << proci << " data:" << received << endl; + } + } + + if constexpr (InplaceMode) + { + // In-place binary operation + bop(value, received); + } + else + { + // Assign result of binary operation + value = bop(value, received); + } + } + + // Send up value + if (myComm.above() >= 0) + { + if constexpr (InplaceMode) + { + if (debug & 2) + { + Perr<< " sending to " << myComm.above() + << " data:" << value << endl; + } + } + + if constexpr (is_contiguous_v<T>) + { + // Contiguous: send the raw sizeof(T) bytes of 'value' + UOPstream::write + ( + UPstream::commsTypes::scheduled, + myComm.above(), + reinterpret_cast<const char*>(&value), + sizeof(T), + tag, + communicator + ); + } + else + { + 
OPstream::send(value, myComm.above(), tag, communicator); + } + } + } +} + + +// In-place form of gather(): forwards to gather<T, CombineOp, true> +template<class T, class CombineOp> +void Foam::Pstream::combineGather +( + T& value, + const CombineOp& cop, + const int tag, + const label comm +) +{ + // In-place binary operation + Pstream::gather<T, CombineOp, true>(value, cop, tag, comm); +} + + +// In-place gather() followed by a broadcast of the value. +// Does nothing if the communicator is not parallel. +template<class T, class CombineOp> +void Foam::Pstream::combineReduce +( + T& value, + const CombineOp& cop, + const int tag, + const label comm +) +{ + if (UPstream::is_parallel(comm)) + { + // In-place binary operation + Pstream::gather<T, CombineOp, true>(value, cop, tag, comm); + Pstream::broadcast(value, comm); + } +} + + +// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * // + +// List variants + +// Element-wise counterpart of gather(): \c bop is applied per list element. +// The local list length is used for the received data as well, so the lists +// are expected to have the same length on all ranks. +// No-op if the communicator is not parallel or the local list is empty. +template<class T, class BinaryOp, bool InplaceMode> +void Foam::Pstream::listGather +( + UList<T>& values, + const BinaryOp& bop, + const int tag, + const label communicator +) +{ + if (!UPstream::is_parallel(communicator) || values.empty()) + { + // Nothing to do + return; + } + else + { + // Communication order + const auto& comms = UPstream::whichCommunication(communicator); + // if (comms.empty()) return; // extra safety? + const auto& myComm = comms[UPstream::myProcNo(communicator)]; + const auto& below = myComm.below(); + + // Same length on all ranks + const label listLen = values.size(); + + List<T> received; + + if (!below.empty()) + { + // Pre-size for contiguous reading + if constexpr (is_contiguous_v<T>) + { + received.resize_nocopy(listLen); + } + } + + // Receive from my downstairs neighbours + for (const auto proci : below) + { + if constexpr (is_contiguous_v<T>) + { + UIPstream::read + ( + UPstream::commsTypes::scheduled, + proci, + received, + tag, + communicator + ); + } + else + { + 
+ IPstream::recv(received, proci, tag, communicator); + } + + if constexpr (InplaceMode) + { + if (debug & 2) + { + Perr<< " received from " + << proci << " data:" << received << endl; + } + } + + for (label i = 0; i < listLen; ++i) + { + if constexpr (InplaceMode) + { + // In-place binary operation + bop(values[i], received[i]); + } + else + { + // Assign result of binary operation + values[i] = bop(values[i], received[i]); + } + } + } + + // Send up values + if (myComm.above() >= 0) + { + if constexpr (InplaceMode) + { + if (debug & 2) + { + Perr<< " sending to " << myComm.above() + << " data:" << values << endl; + } + } + + if constexpr (is_contiguous_v<T>) + { + UOPstream::write + ( + UPstream::commsTypes::scheduled, + myComm.above(), + values, + tag, + communicator + ); + } + else + { + OPstream::send(values, myComm.above(), tag, communicator); + } + } + } +} + + +template<class T, class BinaryOp, bool InplaceMode> +void Foam::Pstream::listGatherReduce +( + List<T>& values, + const BinaryOp& bop, + const int tag, + const label comm +) +{ + Pstream::listGather<T, BinaryOp, InplaceMode>(values, bop, tag, comm); + if (!values.empty()) + { + Pstream::broadcast(values, comm); + } +} + + +template<class T, class CombineOp> +void Foam::Pstream::listCombineGather +( + UList<T>& values, + const CombineOp& cop, + const int tag, + const label comm +) +{ + // In-place binary operation + Pstream::listGather<T, CombineOp, true>(values, cop, tag, comm); +} + + +template<class T, class CombineOp> +void Foam::Pstream::listCombineReduce +( + List<T>& values, + const CombineOp& cop, + const int tag, + const label comm +) +{ + // In-place binary operation + Pstream::listGatherReduce<T, CombineOp, true>(values, cop, tag, comm); +} + + +// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * // + +// Map variants + +template<class Container, class BinaryOp, bool InplaceMode> +void Foam::Pstream::mapGather +( + Container& values, + const BinaryOp& bop, + const 
int tag, + const label communicator +) +{ + if (!UPstream::is_parallel(communicator)) + { + // Nothing to do + return; + } + else + { + // Communication order + const auto& comms = UPstream::whichCommunication(communicator); + // if (comms.empty()) return; // extra safety? + const auto& myComm = comms[UPstream::myProcNo(communicator)]; + const auto& below = myComm.below(); + + // Receive from my downstairs neighbours + for (const auto proci : below) + { + // Map/HashTable: non-contiguous + Container received; + IPstream::recv(received, proci, tag, communicator); + + if constexpr (InplaceMode) + { + if (debug & 2) + { + Perr<< " received from " + << proci << " data:" << received << endl; + } + } + + const auto last = received.end(); + + // Merge every received entry into the local container + for (auto iter = received.begin(); iter != last; ++iter) + { + auto slot = values.find(iter.key()); + + if (slot.good()) + { + // Combine with existing entry + + if constexpr (InplaceMode) + { + // In-place binary operation + bop(slot.val(), iter.val()); + } + else + { + // Assign result of binary operation + slot.val() = bop(slot.val(), iter.val()); + } + } + else + { + // Create a new entry + // ('received' is a local temporary, so its value is moved) + values.emplace(iter.key(), std::move(iter.val())); + } + + } + } + + // Send up values + if (myComm.above() >= 0) + { + if constexpr (InplaceMode) + { + if (debug & 2) + { + Perr<< " sending to " << myComm.above() + << " data:" << values << endl; + } + } + + OPstream::send(values, myComm.above(), tag, communicator); + } + } +} + + +// mapGather() followed by a broadcast of the container. +// Note: there is no is_parallel() guard at this level. +template<class Container, class BinaryOp, bool InplaceMode> +void Foam::Pstream::mapGatherReduce +( + Container& values, + const BinaryOp& bop, + const int tag, + const label comm +) +{ + Pstream::mapGather<Container, BinaryOp, InplaceMode> + ( + values, bop, tag, comm + ); + Pstream::broadcast(values, comm); +} + + +// In-place form of mapGather(): forwards to mapGather<Container, CombineOp, true> +template<class Container, class CombineOp> +void Foam::Pstream::mapCombineGather +( + Container& values, + const CombineOp& cop, + const int tag, + const label comm +) +{ + // In-place binary operation 
+ Pstream::mapGather<Container, CombineOp, true> + ( + values, cop, tag, comm + ); +} + + +// In-place form of mapGatherReduce(): +// forwards to mapGatherReduce<Container, CombineOp, true> +template<class Container, class CombineOp> +void Foam::Pstream::mapCombineReduce +( + Container& values, + const CombineOp& cop, + const int tag, + const label comm +) +{ + // In-place binary operation + Pstream::mapGatherReduce<Container, CombineOp, true> + ( + values, cop, tag, comm + ); +} + + +// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * // + +// Single values to/from a list + +// Collect one value per rank into a list. +// - non-parallel: returns a list holding only \c localValue +// - contiguous types: delegated to UPstream::listGatherValues() +// - otherwise: the master receives from each sub-process and returns the +// full list; the other ranks send their value and return an empty list +template<class T> +Foam::List<T> Foam::Pstream::listGatherValues +( + const T& localValue, + const label communicator, + [[maybe_unused]] const int tag +) +{ + if (!UPstream::is_parallel(communicator)) + { + // non-parallel: return own value + // TBD: only when UPstream::is_rank(communicator) as well? + List<T> allValues(1); + allValues[0] = localValue; + return allValues; + } + else if constexpr (is_contiguous_v<T>) + { + // UPstream version is contiguous only + return UPstream::listGatherValues(localValue, communicator); + } + else + { + // Standard gather (all to one) + + // The data are non-contiguous! + // + // Non-trivial to manage non-blocking gather without a + // PEX/NBX approach (eg, PstreamBuffers). 
+ // Leave with simple exchange for now + + List<T> allValues; + if (UPstream::master(communicator)) + { + allValues.resize(UPstream::nProcs(communicator)); + + for (const int proci : UPstream::subProcs(communicator)) + { + IPstream::recv(allValues[proci], proci, tag, communicator); + } + + // The master's own value occupies slot 0 + allValues[0] = localValue; + } + else if (UPstream::is_rank(communicator)) + { + OPstream::send(localValue, UPstream::masterNo(), tag, communicator); + } + + return allValues; + } +} + + +// Hand out one value per rank from the list held on the master. +// - non-parallel: returns allValues[0], or T{} if the list is empty +// - contiguous types: delegated to UPstream::listScatterValues() +// - otherwise: the master sends allValues[proci] to each sub-process and +// returns allValues[0]; the other ranks return the value they received +// Fatal error on the master if the list has fewer than nProcs entries. +template<class T> +T Foam::Pstream::listScatterValues +( + const UList<T>& allValues, + const label communicator, + [[maybe_unused]] const int tag +) +{ + if (!UPstream::is_parallel(communicator)) + { + // non-parallel: return first value + // TBD: only when UPstream::is_rank(communicator) as well? + + if (!allValues.empty()) + { + return allValues[0]; + } + + return T{}; // Fallback value + } + else if constexpr (is_contiguous_v<T>) + { + // UPstream version is contiguous only + return UPstream::listScatterValues(allValues, communicator); + } + else + { + // Standard scatter (one to all) + + T localValue{}; + + if (UPstream::master(communicator)) + { + const label numProc = UPstream::nProcs(communicator); + + if (allValues.size() < numProc) + { + FatalErrorInFunction + << "Attempting to send " << allValues.size() + << " values to " << numProc << " processors" << endl + << Foam::abort(FatalError); + } + + const label startOfRequests = UPstream::nRequests(); + + // One send buffer per destination, declared outside the loop + // (presumably so each outlives its non-blocking send - confirm) + List<DynamicList<char>> sendBuffers(numProc); + + for (const int proci : UPstream::subProcs(communicator)) + { + UOPstream toProc + ( + UPstream::commsTypes::nonBlocking, + proci, + sendBuffers[proci], + tag, + communicator + ); + toProc << allValues[proci]; + } + + // Wait for outstanding requests + UPstream::waitRequests(startOfRequests); + + return allValues[0]; + } + else if (UPstream::is_rank(communicator)) + { + IPstream::recv(localValue, UPstream::masterNo(), tag, communicator); + } + + return localValue; + } +} + + +// 
************************************************************************* // diff --git a/src/OpenFOAM/db/IOstreams/Pstreams/PstreamGatherList.C b/src/OpenFOAM/db/IOstreams/Pstreams/PstreamGatherList.C deleted file mode 100644 index edc269a067819e71fbf064bcbab83b3f7bb52ed5..0000000000000000000000000000000000000000 --- a/src/OpenFOAM/db/IOstreams/Pstreams/PstreamGatherList.C +++ /dev/null @@ -1,390 +0,0 @@ -/*---------------------------------------------------------------------------*\ - ========= | - \\ / F ield | OpenFOAM: The Open Source CFD Toolbox - \\ / O peration | - \\ / A nd | www.openfoam.com - \\/ M anipulation | -------------------------------------------------------------------------------- - Copyright (C) 2011-2017 OpenFOAM Foundation - Copyright (C) 2015-2025 OpenCFD Ltd. -------------------------------------------------------------------------------- -License - This file is part of OpenFOAM. - - OpenFOAM is free software: you can redistribute it and/or modify it - under the terms of the GNU General Public License as published by - the Free Software Foundation, either version 3 of the License, or - (at your option) any later version. - - OpenFOAM is distributed in the hope that it will be useful, but WITHOUT - ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License - for more details. - - You should have received a copy of the GNU General Public License - along with OpenFOAM. If not, see <http://www.gnu.org/licenses/>. - -Description - Gather data from all processors onto single processor according to some - communication schedule (usually tree-to-master). - The gathered data will be a list with element procID the data from processor - procID. Before calling every processor should insert its value into - values[UPstream::myProcNo(comm)]. - Note: after gather every processor only knows its own data and that of the - processors below it. 
Only the 'master' of the communication schedule holds - a fully filled List. Use scatter to distribute the data. - -\*---------------------------------------------------------------------------*/ - -#include "IPstream.H" -#include "OPstream.H" -#include "contiguous.H" - -// * * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * // - -template<class T> -void Foam::Pstream::gatherList -( - const UPstream::commsStructList& comms, - UList<T>& values, - const int tag, - const label comm -) -{ - if (!comms.empty() && UPstream::is_parallel(comm)) - { - const label myProci = UPstream::myProcNo(comm); - const label numProc = UPstream::nProcs(comm); - - if (values.size() < numProc) - { - FatalErrorInFunction - << "List of values:" << values.size() - << " < numProcs:" << numProc << nl - << Foam::abort(FatalError); - } - - // My communication order - const auto& myComm = comms[myProci]; - - // Receive from my downstairs neighbours - for (const label belowID : myComm.below()) - { - const labelList& belowLeaves = comms[belowID].allBelow(); - - if constexpr (is_contiguous_v<T>) - { - List<T> received(belowLeaves.size() + 1); - - UIPstream::read - ( - UPstream::commsTypes::scheduled, - belowID, - received, - tag, - comm - ); - - values[belowID] = received[0]; - - forAll(belowLeaves, leafI) - { - values[belowLeaves[leafI]] = received[leafI + 1]; - } - } - else - { - IPstream fromBelow - ( - UPstream::commsTypes::scheduled, - belowID, - 0, // bufsize - tag, - comm - ); - fromBelow >> values[belowID]; - - if (debug & 2) - { - Perr<< " received through " - << belowID << " data from:" << belowID - << " data:" << values[belowID] << endl; - } - - // Receive from all other processors below belowID - for (const label leafID : belowLeaves) - { - fromBelow >> values[leafID]; - - if (debug & 2) - { - Perr<< " received through " - << belowID << " data from:" << leafID - << " data:" << values[leafID] << endl; - } - } - } - } - - // Send up from values: - // - my own value 
first - // - all belowLeaves next - if (myComm.above() >= 0) - { - const labelList& belowLeaves = myComm.allBelow(); - - if (debug & 2) - { - Perr<< " sending to " << myComm.above() - << " data from me:" << myProci - << " data:" << values[myProci] << endl; - } - - if constexpr (is_contiguous_v<T>) - { - List<T> sending(belowLeaves.size() + 1); - sending[0] = values[myProci]; - - forAll(belowLeaves, leafI) - { - sending[leafI + 1] = values[belowLeaves[leafI]]; - } - - UOPstream::write - ( - UPstream::commsTypes::scheduled, - myComm.above(), - sending, - tag, - comm - ); - } - else - { - OPstream toAbove - ( - UPstream::commsTypes::scheduled, - myComm.above(), - 0, // bufsize - tag, - comm - ); - toAbove << values[myProci]; - - for (const label leafID : belowLeaves) - { - if (debug & 2) - { - Perr<< " sending to " - << myComm.above() << " data from:" << leafID - << " data:" << values[leafID] << endl; - } - toAbove << values[leafID]; - } - } - } - } -} - - -template<class T> -void Foam::Pstream::scatterList -( - const UPstream::commsStructList& comms, - UList<T>& values, - const int tag, - const label comm -) -{ - // Apart from the additional size check, the only difference - // between scatterList() and using broadcast(List<T>&) or a regular - // scatter(List<T>&) is that processor-local data is skipped. 
- - if (!comms.empty() && UPstream::is_parallel(comm)) - { - const label myProci = UPstream::myProcNo(comm); - const label numProc = UPstream::nProcs(comm); - - if (values.size() < numProc) - { - FatalErrorInFunction - << "List of values:" << values.size() - << " < numProcs:" << numProc << nl - << Foam::abort(FatalError); - } - - // My communication order - const auto& myComm = comms[myProci]; - - // Receive from up - if (myComm.above() >= 0) - { - const labelList& notBelowLeaves = myComm.allNotBelow(); - - if constexpr (is_contiguous_v<T>) - { - List<T> received(notBelowLeaves.size()); - - UIPstream::read - ( - UPstream::commsTypes::scheduled, - myComm.above(), - received, - tag, - comm - ); - - forAll(notBelowLeaves, leafI) - { - values[notBelowLeaves[leafI]] = received[leafI]; - } - } - else - { - IPstream fromAbove - ( - UPstream::commsTypes::scheduled, - myComm.above(), - 0, // bufsize - tag, - comm - ); - - for (const label leafID : notBelowLeaves) - { - fromAbove >> values[leafID]; - - if (debug & 2) - { - Perr<< " received through " - << myComm.above() << " data for:" << leafID - << " data:" << values[leafID] << endl; - } - } - } - } - - // Send to my downstairs neighbours - forAllReverse(myComm.below(), belowI) - { - const label belowID = myComm.below()[belowI]; - const labelList& notBelowLeaves = comms[belowID].allNotBelow(); - - if constexpr (is_contiguous_v<T>) - { - List<T> sending(notBelowLeaves.size()); - - forAll(notBelowLeaves, leafI) - { - sending[leafI] = values[notBelowLeaves[leafI]]; - } - - UOPstream::write - ( - UPstream::commsTypes::scheduled, - belowID, - sending, - tag, - comm - ); - } - else - { - OPstream toBelow - ( - UPstream::commsTypes::scheduled, - belowID, - 0, // bufsize - tag, - comm - ); - - // Send data destined for all other processors below belowID - for (const label leafID : notBelowLeaves) - { - toBelow << values[leafID]; - - if (debug & 2) - { - Perr<< " sent through " - << belowID << " data for:" << leafID - << " data:" 
<< values[leafID] << endl; - } - } - } - } - } -} - - -template<class T> -void Foam::Pstream::gatherList -( - UList<T>& values, - const int tag, - const label comm -) -{ - Pstream::gatherList - ( - UPstream::whichCommunication(comm), - values, - tag, - comm - ); -} - - -// Unused - slate for removal? (MAY-2023) -template<class T> -void Foam::Pstream::scatterList -( - UList<T>& values, - const int tag, - const label comm -) -{ - Pstream::scatterList - ( - UPstream::whichCommunication(comm), - values, - tag, - comm - ); -} - - -template<class T> -void Foam::Pstream::allGatherList -( - UList<T>& values, - [[maybe_unused]] const int tag, - const label comm -) -{ - if (UPstream::is_parallel(comm)) - { - if constexpr (is_contiguous_v<T>) - { - if (values.size() < UPstream::nProcs(comm)) - { - FatalErrorInFunction - << "List of values is too small:" << values.size() - << " vs numProcs:" << UPstream::nProcs(comm) << nl - << Foam::abort(FatalError); - } - - UPstream::mpiAllGather(values.data_bytes(), sizeof(T), comm); - } - else - { - const auto& comms = UPstream::whichCommunication(comm); - - Pstream::gatherList(comms, values, tag, comm); - Pstream::scatterList(comms, values, tag, comm); - } - } -} - - -// ************************************************************************* // diff --git a/src/OpenFOAM/db/IOstreams/Pstreams/PstreamGatherList.txx b/src/OpenFOAM/db/IOstreams/Pstreams/PstreamGatherList.txx new file mode 100644 index 0000000000000000000000000000000000000000..082e1688007df5ce70ef8a230cc806186abd4919 --- /dev/null +++ b/src/OpenFOAM/db/IOstreams/Pstreams/PstreamGatherList.txx @@ -0,0 +1,520 @@ +/*---------------------------------------------------------------------------*\ + ========= | + \\ / F ield | OpenFOAM: The Open Source CFD Toolbox + \\ / O peration | + \\ / A nd | www.openfoam.com + \\/ M anipulation | +------------------------------------------------------------------------------- + Copyright (C) 2011-2017 OpenFOAM Foundation + Copyright (C) 
2015-2025 OpenCFD Ltd. +------------------------------------------------------------------------------- +License + This file is part of OpenFOAM. + + OpenFOAM is free software: you can redistribute it and/or modify it + under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + OpenFOAM is distributed in the hope that it will be useful, but WITHOUT + ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + for more details. + + You should have received a copy of the GNU General Public License + along with OpenFOAM. If not, see <http://www.gnu.org/licenses/>. + +Description + Gather data from all processors onto single processor according to some + communication schedule (usually tree-to-master). + The gathered data will be a list with element procID the data from processor + procID. Before calling every processor should insert its value into + values[UPstream::myProcNo(comm)]. + Note: after gather every processor only knows its own data and that of the + processors below it. Only the 'master' of the communication schedule holds + a fully filled List. Use broadcast to distribute the data. 
+ +\*---------------------------------------------------------------------------*/ + +#include "contiguous.H" +#include "IPstream.H" +#include "OPstream.H" + +// * * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * // + +// Tree-based gather of one list entry per rank: each rank receives, from +// every rank directly below it, that rank's entry plus the entries of all +// ranks further below, then sends its own entry and everything collected +// to the rank above (if there is one). +// No-op if not parallel. Fatal error if \c values has fewer than nProcs +// entries. +template<class T> +void Foam::Pstream::gatherList_tree_algorithm +( + UList<T>& values, + const int tag, + const label communicator +) +{ + if (FOAM_UNLIKELY(!UPstream::is_parallel(communicator))) + { + // Nothing to do + return; + } + else + { + if (FOAM_UNLIKELY(values.size() < UPstream::nProcs(communicator))) + { + FatalErrorInFunction + << "List of values:" << values.size() + << " < numProcs:" << UPstream::nProcs(communicator) << nl + << Foam::abort(FatalError); + } + + const label myProci = UPstream::myProcNo(communicator); + + // Communication order + const auto& comms = UPstream::whichCommunication(communicator); + // if (comms.empty()) return; // extra safety? + const auto& myComm = comms[myProci]; + + + // Local buffer for send/recv of contiguous + [[maybe_unused]] DynamicList<T> buffer; + + // Presize buffer + // (largest message = the sender's own entry + all entries below it, + // hence the +1 on the reserve) + if constexpr (is_contiguous_v<T>) + { + label maxCount = 0; + + for (const auto belowID : myComm.below()) + { + auto count = comms[belowID].allBelow().size(); + maxCount = Foam::max(maxCount, count); + } + + if (myComm.above() >= 0) + { + auto count = myComm.allBelow().size(); + maxCount = Foam::max(maxCount, count); + } + + buffer.reserve(maxCount + 1); + } + + + // Receive from my downstairs neighbours + for (const auto belowID : myComm.below()) + { + const auto& leaves = comms[belowID].allBelow(); + + if constexpr (is_contiguous_v<T>) + { + if (leaves.empty()) + { + // Receive directly into destination + UIPstream::read + ( + UPstream::commsTypes::scheduled, + belowID, + values[belowID], + tag, + communicator + ); + } + else + { + // Receive via intermediate buffer + buffer.resize_nocopy(leaves.size() + 1); + + UIPstream::read + ( + UPstream::commsTypes::scheduled, + belowID, + buffer, + tag, + communicator + 
); + + // Unpack in sending order: belowID's own entry, then its leaves + label recvIdx(0); + values[belowID] = buffer[recvIdx++]; + + for (const auto leafID : leaves) + { + values[leafID] = buffer[recvIdx++]; + } + } + } + else + { + IPstream fromBelow + ( + UPstream::commsTypes::scheduled, + belowID, + 0, // bufsize + tag, + communicator + ); + fromBelow >> values[belowID]; + + if (debug & 2) + { + Perr<< " received through " + << belowID << " data from:" << belowID + << " data:" << values[belowID] << endl; + } + + // Receive from all other processors below belowID + for (const auto leafID : leaves) + { + fromBelow >> values[leafID]; + + if (debug & 2) + { + Perr<< " received through " + << belowID << " data from:" << leafID + << " data:" << values[leafID] << endl; + } + } + } + } + + // Send up from values: + // - my own value first + // - all leaves (the allBelow ranks) next + if (myComm.above() >= 0) + { + const auto& leaves = myComm.allBelow(); + + if (debug & 2) + { + Perr<< " sending to " << myComm.above() + << " data from me:" << myProci + << " data:" << values[myProci] << endl; + } + + if constexpr (is_contiguous_v<T>) + { + if (leaves.empty()) + { + // Send directly + UOPstream::write + ( + UPstream::commsTypes::scheduled, + myComm.above(), + values[myProci], + tag, + communicator + ); + } + else + { + // Send via intermediate buffer + buffer.resize_nocopy(leaves.size() + 1); + + label sendIdx(0); + buffer[sendIdx++] = values[myProci]; + + for (const auto leafID : leaves) + { + buffer[sendIdx++] = values[leafID]; + } + + UOPstream::write + ( + UPstream::commsTypes::scheduled, + myComm.above(), + buffer, + tag, + communicator + ); + } + } + else + { + OPstream toAbove + ( + UPstream::commsTypes::scheduled, + myComm.above(), + 0, // bufsize + tag, + communicator + ); + toAbove << values[myProci]; + + for (const auto leafID : leaves) + { + if (debug & 2) + { + Perr<< " sending to " + << myComm.above() << " data from:" << leafID + << " data:" << values[leafID] << endl; + } + toAbove << values[leafID]; + } + } + } + } +} + + +// Tree-based scatter of a per-rank list: a rank first receives from above +// the entries of all ranks that are not below it, then sends to each rank +// directly below (in reverse order) the entries of the ranks that are not +// below that rank. +// No-op if not parallel. Fatal error if \c values has fewer than nProcs +// entries. +template<class 
T> +void Foam::Pstream::scatterList_tree_algorithm +( + UList<T>& values, + const int tag, + const label communicator +) +{ + if (FOAM_UNLIKELY(!UPstream::is_parallel(communicator))) + { + // Nothing to do + return; + } + else + { + // Apart from the additional size check, the only difference + // between scatterList() and using broadcast(List<T>&) or a regular + // scatter(List<T>&) is that processor-local data is skipped. + + if (FOAM_UNLIKELY(values.size() < UPstream::nProcs(communicator))) + { + FatalErrorInFunction + << "List of values:" << values.size() + << " < numProcs:" << UPstream::nProcs(communicator) << nl + << Foam::abort(FatalError); + } + + const label myProci = UPstream::myProcNo(communicator); + + // Communication order + const auto& comms = UPstream::whichCommunication(communicator); + // if (comms.empty()) return; // extra safety? + const auto& myComm = comms[myProci]; + + + // Local buffer for send/recv of contiguous + [[maybe_unused]] DynamicList<T> buffer; + + // Presize buffer + // (sized for the largest allNotBelow set that is received or sent) + if constexpr (is_contiguous_v<T>) + { + label maxCount = 0; + + if (myComm.above() >= 0) + { + auto count = myComm.allNotBelow().size(); + maxCount = Foam::max(maxCount, count); + } + + for (const auto belowID : myComm.below()) + { + auto count = comms[belowID].allNotBelow().size(); + maxCount = Foam::max(maxCount, count); + } + + buffer.reserve(maxCount); + } + + + // Receive from up + if (myComm.above() >= 0) + { + const auto& leaves = myComm.allNotBelow(); + + if constexpr (is_contiguous_v<T>) + { + buffer.resize_nocopy(leaves.size()); + + UIPstream::read + ( + UPstream::commsTypes::scheduled, + myComm.above(), + buffer, + tag, + communicator + ); + + label recvIdx(0); + for (const auto leafID : leaves) + { + values[leafID] = buffer[recvIdx++]; + } + } + else + { + IPstream fromAbove + ( + UPstream::commsTypes::scheduled, + myComm.above(), + 0, // bufsize + tag, + communicator + ); + + for (const auto leafID : leaves) + { + fromAbove >> values[leafID]; + + if 
(debug & 2) + { + Perr<< " received through " + << myComm.above() << " data for:" << leafID + << " data:" << values[leafID] << endl; + } + } + } + } + + // Send to my downstairs neighbours + forAllReverse(myComm.below(), belowI) + { + const auto belowID = myComm.below()[belowI]; + const auto& leaves = comms[belowID].allNotBelow(); + + if constexpr (is_contiguous_v<T>) + { + buffer.resize_nocopy(leaves.size()); + + label sendIdx(0); + for (const auto leafID : leaves) + { + buffer[sendIdx++] = values[leafID]; + } + + UOPstream::write + ( + UPstream::commsTypes::scheduled, + belowID, + buffer, + tag, + communicator + ); + } + else + { + OPstream toBelow + ( + UPstream::commsTypes::scheduled, + belowID, + 0, // bufsize + tag, + communicator + ); + + // Send data destined for all other processors below belowID + for (const auto leafID : leaves) + { + toBelow << values[leafID]; + + if (debug & 2) + { + Perr<< " sent through " + << belowID << " data for:" << leafID + << " data:" << values[leafID] << endl; + } + } + } + } + } +} + + +// Gather of one list entry per rank. +// - contiguous types: in-place UPstream::mpiGather() of sizeof(T) bytes +// - otherwise: gatherList_tree_algorithm() +// No-op if not parallel. The contiguous path raises a fatal error if +// \c values has fewer than nProcs entries. +template<class T> +void Foam::Pstream::gatherList +( + UList<T>& values, + [[maybe_unused]] const int tag, + const label communicator +) +{ + if (!UPstream::is_parallel(communicator)) + { + // Nothing to do + return; + } + else if constexpr (is_contiguous_v<T>) + { + if (FOAM_UNLIKELY(values.size() < UPstream::nProcs(communicator))) + { + FatalErrorInFunction + << "List of values:" << values.size() + << " < numProcs:" << UPstream::nProcs(communicator) << nl + << Foam::abort(FatalError); + } + + // In-place gather for contiguous types + UPstream::mpiGather + ( + nullptr, + values.data_bytes(), + sizeof(T), + communicator + ); + } + else + { + Pstream::gatherList_tree_algorithm(values, tag, communicator); + } +} + + +// Scatter of a per-rank list. +// - contiguous types: in-place UPstream::mpiScatter() of sizeof(T) bytes +// - otherwise: scatterList_tree_algorithm() +// No-op if not parallel. The contiguous path raises a fatal error if +// \c values has fewer than nProcs entries. +template<class T> +void Foam::Pstream::scatterList +( + UList<T>& values, + [[maybe_unused]] const int tag, + const label communicator +) +{ + if (!UPstream::is_parallel(communicator)) + { + // Nothing to do + return; + } + else if 
constexpr (is_contiguous_v<T>) + { + // Same size requirement as gatherList(): one entry per rank. + // Checked before handing the raw byte pointer to MPI. + if (FOAM_UNLIKELY(values.size() < UPstream::nProcs(communicator))) + { + FatalErrorInFunction + << "List of values:" << values.size() + << " < numProcs:" << UPstream::nProcs(communicator) << nl + << Foam::abort(FatalError); + } + + // In-place scatter for contiguous types + UPstream::mpiScatter + ( + nullptr, + values.data_bytes(), + sizeof(T), + communicator + ); + } + else + { + Pstream::scatterList_tree_algorithm(values, tag, communicator); + } +} + + +// Combined gather/scatter of a per-rank list. +// - contiguous types: a single UPstream::mpiAllGather() of sizeof(T) bytes +// (fatal error if \c values has fewer than nProcs entries) +// - otherwise: gatherList() followed by scatterList() +// No-op if not parallel. +template<class T> +void Foam::Pstream::allGatherList +( + UList<T>& values, + [[maybe_unused]] const int tag, + const label comm +) +{ + if (!UPstream::is_parallel(comm)) + { + // Nothing to do + return; + } + else if constexpr (is_contiguous_v<T>) + { + if (FOAM_UNLIKELY(values.size() < UPstream::nProcs(comm))) + { + FatalErrorInFunction + << "List of values is too small:" << values.size() + << " vs numProcs:" << UPstream::nProcs(comm) << nl + << Foam::abort(FatalError); + } + + UPstream::mpiAllGather(values.data_bytes(), sizeof(T), comm); + } + else + { + Pstream::gatherList(values, tag, comm); + Pstream::scatterList(values, tag, comm); + } +} + + +// ************************************************************************* // diff --git a/src/OpenFOAM/db/IOstreams/Pstreams/UPstream.C b/src/OpenFOAM/db/IOstreams/Pstreams/UPstream.C index 4f47d5f98377194a56fa177de4bc9a8f18624094..cd2e1d3c84e4f1eeab4351cca128010a0a68d0f5 100644 --- a/src/OpenFOAM/db/IOstreams/Pstreams/UPstream.C +++ b/src/OpenFOAM/db/IOstreams/Pstreams/UPstream.C @@ -640,7 +640,7 @@ Foam::UPstream::treeCommunication(const label communicator) } -void Foam::UPstream::printCommTree(const label communicator) +void Foam::UPstream::printCommTree(int communicator) { const auto& comms = UPstream::whichCommunication(communicator); @@ -663,14 +663,60 @@ bool Foam::UPstream::usingNodeComms(const label communicator) ( parRun_ && (constWorldComm_ == communicator) && (nodeCommsControl_ > 0) + // More than one node and above defined threshold && (numNodes_ > 1) && (numNodes_ >= nodeCommsMin_) // Some processes do share nodes && (numNodes_ < procIDs_[constWorldComm_].size()) + + // Extra paranoid (guard against calling during startup) + && 
(commInterNode_ > constWorldComm_) + && (commLocalNode_ > constWorldComm_) ); } +// Begin/end offsets derived from procIDs_[commInterNode_]. +// The table is built on the first call that passes the startup guard and +// is cached from then on; before that the null list is returned. +const Foam::List<int>& Foam::UPstream::interNode_offsets() +{ + static std::unique_ptr<List<int>> cache; + + // Already built: reuse + if (cache) + { + return *cache; + } + + // Extra paranoid (guard against calling during startup) + const bool notReady = + ( + (commInterNode_ <= constWorldComm_) + || (commInterNode_ >= procIDs_.size()) + ); + + if (notReady) + { + // Nothing is cached, so the guard is re-evaluated on the next call + return List<int>::null(); + } + + cache = std::make_unique<List<int>>(); + + const auto& nodeProcs = procIDs_[commInterNode_]; + const auto nEntries = nodeProcs.size(); + + // The procIDs_ are already the offsets, but missing the end offset + if (nEntries > 0) + { + List<int>& table = *cache; + + table.resize(nEntries+1); + std::copy_n(nodeProcs.begin(), nEntries, table.begin()); + table[nEntries] = UPstream::nProcs(constWorldComm_); + } + + return *cache; +} + + // * * * * * * * * * * * * * * Static Data Members * * * * * * * * * * * * * // bool Foam::UPstream::parRun_(false); diff --git a/src/OpenFOAM/db/IOstreams/Pstreams/UPstream.H b/src/OpenFOAM/db/IOstreams/Pstreams/UPstream.H index b7df5c279ee4da7d96784d24025b54242e3783bb..a49db6dd8a1d845dfc9d368d648313e9eded3c93 100644 --- a/src/OpenFOAM/db/IOstreams/Pstreams/UPstream.H +++ b/src/OpenFOAM/db/IOstreams/Pstreams/UPstream.H @@ -32,8 +32,8 @@ Description SourceFiles UPstream.C + UPstream.txx UPstreamCommsStruct.C - UPstreamTemplates.C \*---------------------------------------------------------------------------*/ @@ -108,18 +108,18 @@ public: // Private Data //- The procID of the processor \em directly above - label above_; + int above_; //- The procIDs of processors \em directly below - labelList below_; + List<int> below_; //- The procIDs of all processors below myProcNo, //- not just directly below - labelList allBelow_; + List<int> allBelow_; //- The procIDs of all processors not below myProcNo - // (inverse of allBelow_ without myProcNo) - labelList allNotBelow_; + //- (inverse of allBelow_ without myProcNo) + List<int> 
allNotBelow_; public: @@ -132,20 +132,20 @@ public: //- Move construct from components commsStruct ( - const label above, - labelList&& below, - labelList&& allBelow, - labelList&& allNotBelow + const int above, + List<int>&& below, + List<int>&& allBelow, + List<int>&& allNotBelow ); //- Copy construct from below, allBelow components commsStruct ( - const label numProcs, - const label myProcID, - const label above, - const labelUList& below, - const labelUList& allBelow + const int numProcs, + const int myProcID, + const int above, + const UList<int>& below, + const UList<int>& allBelow ); @@ -153,26 +153,26 @@ public: // Access - //- The number of processors addressed by the structure - label nProcs() const noexcept; - //- The procID of the processor \em directly above - label above() const noexcept { return above_; } + int above() const noexcept { return above_; } //- The procIDs of the processors \em directly below - const labelList& below() const noexcept { return below_; } + const List<int>& below() const noexcept { return below_; } - //- The procIDs of all processors below + //- The procIDs of \em all processors below //- (so not just directly below) - const labelList& allBelow() const noexcept { return allBelow_; } + const List<int>& allBelow() const noexcept { return allBelow_; } //- The procIDs of all processors not below myProcNo. //- The inverse set of allBelow without myProcNo. 
- const labelList& allNotBelow() const noexcept + const List<int>& allNotBelow() const noexcept { return allNotBelow_; } + //- The number of processors addressed by the structure + int nProcs() const noexcept; + // Edit @@ -183,9 +183,9 @@ public: //- possibly with communicator-specific adjustments void reset ( - const label procID, - const label numProcs, - const label comm = -1 + const int myProci, + const int numProcs, + const int communicator ); @@ -203,7 +203,7 @@ public: // Private Data //- The communicator index - label comm_; + int comm_; //- The communication tree List<commsStruct> tree_; @@ -216,7 +216,7 @@ public: commsStructList() noexcept : comm_(-1) {} //- Construct empty with given communicator - commsStructList(label comm) noexcept : comm_(comm) {} + explicit commsStructList(int comm) noexcept : comm_(comm) {} // Static Functions @@ -230,8 +230,8 @@ public: //- True if communicator is non-negative (ie, was assigned) bool good() const noexcept { return (comm_ >= 0); } - //- The communicator label - label comm() const noexcept { return comm_; } + //- The communicator internal index + int comm() const noexcept { return comm_; } //- Clear the list void clear() { return tree_.clear(); } @@ -242,20 +242,23 @@ public: //- The number of entries label size() const noexcept { return tree_.size(); } - //- Reset communicator index and clear demand-driven entries - void init(const label comm); + //- Reset communicator index, fill tree with empty entries + void init(int communicator); + + //- Reset communicator index, clear tree entries + void reset(int communicator); //- Get existing or create (demand-driven) entry - const UPstream::commsStruct& get(const label proci) const; + const UPstream::commsStruct& get(int proci) const; //- Get existing or create (demand-driven) entry - const UPstream::commsStruct& operator[](const label proci) const + const UPstream::commsStruct& operator[](int proci) const { return get(proci); } //- Print un-directed graph in graphviz 
dot format - void printGraph(Ostream& os, label proci = 0) const; + void printGraph(Ostream& os, int proci = 0) const; }; @@ -1074,6 +1077,10 @@ public: return rangeType(1, static_cast<int>(nProcs(communicator)-1)); } + //- Processor offsets corresponding to the inter-node communicator + static const List<int>& interNode_offsets(); + + //- Communication schedule for linear all-to-master (proc 0) static const commsStructList& linearCommunication ( @@ -1105,7 +1112,7 @@ public: ( np <= 1 ? commsStructList::null() - : (np <= 2 || np < nProcsSimpleSum) + : (np <= 2 || np < UPstream::nProcsSimpleSum) ? linearCommunication(communicator) : treeCommunication(communicator) ); @@ -1164,20 +1171,20 @@ public: #undef Pstream_CommonRoutines - #define Pstream_CommonRoutines(Native) \ + #define Pstream_CommonRoutines(Type) \ \ - /*!\brief Exchange \c Native data with all ranks in communicator */ \ + /*!\brief Exchange \c Type data with all ranks in communicator */ \ /*! \em non-parallel : simple copy of \p sendData to \p recvData */ \ static void allToAll \ ( \ /*! [in] The value at [proci] is sent to proci */ \ - const UList<Native>& sendData, \ + const UList<Type>& sendData, \ /*! [out] The data received from the other ranks */ \ - UList<Native>& recvData, \ + UList<Type>& recvData, \ const label communicator = worldComm \ ); \ \ - /*!\brief Exchange \em non-zero \c Native data between ranks [NBX] */ \ + /*!\brief Exchange \em non-zero \c Type data between ranks [NBX] */ \ /*! \p recvData is always initially assigned zero and no non-zero */ \ /*! values are sent/received from other ranks. */ \ /*! \em non-parallel : simple copy of \p sendData to \p recvData */ \ @@ -1188,15 +1195,15 @@ public: static void allToAllConsensus \ ( \ /*! [in] The \em non-zero value at [proci] is sent to proci */ \ - const UList<Native>& sendData, \ + const UList<Type>& sendData, \ /*! [out] The non-zero value received from each rank */ \ - UList<Native>& recvData, \ + UList<Type>& recvData, \ /*! 
Message tag for the communication */ \ const int tag, \ const label communicator = worldComm \ ); \ \ - /*!\brief Exchange \c Native data between ranks [NBX] */ \ + /*!\brief Exchange \c Type data between ranks [NBX] */ \ /*! \p recvData map is always cleared initially so a simple check */ \ /*! of its keys is sufficient to determine connectivity. */ \ /*! \em non-parallel : copy own rank (if it exists) */ \ @@ -1204,26 +1211,26 @@ public: static void allToAllConsensus \ ( \ /*! [in] The value at [proci] is sent to proci. */ \ - const Map<Native>& sendData, \ + const Map<Type>& sendData, \ /*! [out] The values received from given ranks. */ \ - Map<Native>& recvData, \ + Map<Type>& recvData, \ /*! Message tag for the communication */ \ const int tag, \ const label communicator = worldComm \ ); \ \ - /*!\brief Exchange \c Native data between ranks [NBX] */ \ + /*!\brief Exchange \c Type data between ranks [NBX] */ \ /*! \returns any received data as a Map */ \ - static Map<Native> allToAllConsensus \ + static Map<Type> allToAllConsensus \ ( \ /*! [in] The value at [proci] is sent to proci. */ \ - const Map<Native>& sendData, \ + const Map<Type>& sendData, \ /*! Message tag for the communication */ \ const int tag, \ const label communicator = worldComm \ ) \ { \ - Map<Native> recvData; \ + Map<Type> recvData; \ allToAllConsensus(sendData, recvData, tag, communicator); \ return recvData; \ } @@ -1237,64 +1244,96 @@ public: // Low-level gather/scatter routines #undef Pstream_CommonRoutines - #define Pstream_CommonRoutines(Native) \ + #define Pstream_CommonRoutines(Type) \ \ - /*! \brief Receive identically-sized \c Native data from all ranks */ \ + /*! \brief Receive identically-sized \c Type data from all ranks */ \ static void mpiGather \ ( \ - /*! On rank: individual value to send */ \ - const Native* sendData, \ - /*! On master: receive buffer with all values */ \ - Native* recvData, \ + /*! 
On rank: individual value to send (or nullptr for inplace) */ \ + const Type* sendData, \ + /*! Master: receive buffer with all values */ \ + /*! Or for in-place send/recv when sendData is nullptr */ \ + Type* recvData, \ /*! Number of send/recv data per rank. Globally consistent! */ \ int count, \ const label communicator = worldComm \ ); \ \ - /*! \brief Send identically-sized \c Native data to all ranks */ \ + /*! \brief Send identically-sized \c Type data to all ranks */ \ static void mpiScatter \ ( \ - /*! On master: send buffer with all values */ \ - const Native* sendData, \ + /*! Master: send buffer with all values (nullptr for inplace) */ \ + const Type* sendData, \ /*! On rank: individual value to receive */ \ - Native* recvData, \ + /*! Or for in-place send/recv when sendData is nullptr */ \ + Type* recvData, \ /*! Number of send/recv data per rank. Globally consistent! */ \ int count, \ const label communicator = worldComm \ ); \ \ - /*! \brief Gather/scatter identically-sized \c Native data */ \ + /*! \brief Gather/scatter identically-sized \c Type data */ \ /*! Send data from proc slot, receive into all slots */ \ static void mpiAllGather \ ( \ /*! On all ranks: the base of the data locations */ \ - Native* allData, \ + Type* allData, \ /*! Number of send/recv data per rank. Globally consistent! */ \ int count, \ const label communicator = worldComm \ ); \ \ - /*! \brief Receive variable length \c Native data from all ranks */ \ - static void gather \ + /*! \brief Receive variable length \c Type data from all ranks */ \ + static void mpiGatherv \ ( \ - const Native* sendData, \ + const Type* sendData, \ int sendCount, /*!< Ignored on master if recvCount[0] == 0 */ \ - Native* recvData, /*!< Ignored on non-root rank */ \ + Type* recvData, /*!< Ignored on non-root rank */ \ const UList<int>& recvCounts, /*!< Ignored on non-root rank */ \ const UList<int>& recvOffsets, /*!< Ignored on non-root rank */ \ const label communicator = worldComm \ ); \ \ - /*! 
\brief Send variable length \c Native data to all ranks */ \ - static void scatter \ + /*! \brief Send variable length \c Type data to all ranks */ \ + static void mpiScatterv \ ( \ - const Native* sendData, /*!< Ignored on non-root rank */ \ + const Type* sendData, /*!< Ignored on non-root rank */ \ const UList<int>& sendCounts, /*!< Ignored on non-root rank */ \ const UList<int>& sendOffsets, /*!< Ignored on non-root rank */ \ - Native* recvData, \ + Type* recvData, \ int recvCount, \ const label communicator = worldComm \ - ); + ); \ + \ + /*! \deprecated(2025-02) prefer mpiGatherv */ \ + FOAM_DEPRECATED_FOR(2025-02, "mpiGatherv()") \ + inline static void gather \ + ( \ + const Type* send, \ + int count, \ + Type* recv, \ + const UList<int>& counts, \ + const UList<int>& offsets, \ + const label comm = worldComm \ + ) \ + { \ + UPstream::mpiGatherv(send, count, recv, counts, offsets, comm); \ + } \ + \ + /*! \deprecated(2025-02) prefer mpiScatterv */ \ + FOAM_DEPRECATED_FOR(2025-02, "mpiScatterv()") \ + inline static void scatter \ + ( \ + const Type* send, \ + const UList<int>& counts, \ + const UList<int>& offsets, \ + Type* recv, \ + int count, \ + const label comm = worldComm \ + ) \ + { \ + UPstream::mpiScatterv(send, counts, offsets, recv, count, comm); \ + } Pstream_CommonRoutines(char); Pstream_CommonRoutines(int32_t); @@ -1650,7 +1689,7 @@ Ostream& operator<<(Ostream&, const UPstream::commsStruct&); // * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * // #ifdef NoRepository - #include "UPstreamTemplates.C" + #include "UPstream.txx" #endif // * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * // diff --git a/src/OpenFOAM/db/IOstreams/Pstreams/UPstreamTemplates.C b/src/OpenFOAM/db/IOstreams/Pstreams/UPstream.txx similarity index 100% rename from src/OpenFOAM/db/IOstreams/Pstreams/UPstreamTemplates.C rename to src/OpenFOAM/db/IOstreams/Pstreams/UPstream.txx diff --git 
a/src/OpenFOAM/db/IOstreams/Pstreams/UPstreamCommsStruct.C b/src/OpenFOAM/db/IOstreams/Pstreams/UPstreamCommsStruct.C index 72a41bba26823d08b76b6fe08157c33000aa7804..e55eff05ed8b53bcfd0a4a3b219a617c31288c3a 100644 --- a/src/OpenFOAM/db/IOstreams/Pstreams/UPstreamCommsStruct.C +++ b/src/OpenFOAM/db/IOstreams/Pstreams/UPstreamCommsStruct.C @@ -28,6 +28,9 @@ License #include "UPstream.H" +#include <algorithm> +#include <numeric> + // * * * * * * * * * * * * * * * Local Functions * * * * * * * * * * * * * * // namespace Foam @@ -38,9 +41,9 @@ static void printGraph_impl ( Ostream& os, const UPstream::commsStructList& comms, - const label proci, - label depth, - const label maxDepth = 1024 + const int proci, + int depth, + const int maxDepth = 1024 ) { if (proci >= comms.size()) @@ -59,41 +62,80 @@ static void printGraph_impl // Prefer left-to-right layout for large graphs os << indent << "rankdir=LR" << nl; + } + - if (below.empty()) + // Output the immediate neighbours below + + if (below.empty()) + { + if (proci == 0) { // A graph with a single-node (eg, self-comm) os << indent << proci << nl; } } + else + { + os << indent << proci << " -- " << token::BEGIN_BLOCK; - int pos = 0; + // Accumulate into ranges whenever possible + IntRange<int> range; - for (const auto nbrProci : below) - { - if (pos) - { - os << " "; - } - else + // Print accumulated range and reset + auto emit_range = [&]() { - os << indent; - } - os << proci << " -- " << nbrProci; + if (!range.empty()) + { + os << ' '; + if (range.min() < range.max()) + { + os << '"' << range.min() << ".." 
<< range.max() << '"'; + } + else + { + os << range.min(); + } + range.reset(); + } + }; - if (++pos >= 4) // Max 4 items per line + for (const auto nbrProci : below) { - pos = 0; - os << nl; + const bool terminal = comms[nbrProci].below().empty(); + + if + ( + terminal + && (!range.empty() && (range.max()+1 == nbrProci)) + ) + { + // Accumulate + ++range; + continue; + } + + // Emit accumulated range + emit_range(); + + if (terminal) + { + range.reset(nbrProci, 1); + } + else + { + os << token::SPACE << nbrProci; + } } - } - if (pos) - { - os << nl; + // Emit accumulated range + emit_range(); + + os << token::SPACE << token::END_BLOCK << nl; } - // Limit the maximum depth + + // Recurse into below neighbours, but limit the maximum depth ++depth; if (depth >= maxDepth && (proci != 0)) { @@ -109,7 +151,6 @@ static void printGraph_impl if (proci == 0) { os.endBlock(); - os << "// end graph" << nl; } } @@ -150,46 +191,46 @@ static void printGraph_impl namespace Foam { -static label simpleTree +static int simpleTree ( - const label procID, - const label numProcs, + const int myProci, + const int numProcs, - DynamicList<label>& below, - DynamicList<label>& allBelow + DynamicList<int>& below, + DynamicList<int>& allBelow ) { - label above(-1); + int above(-1); - for (label mod = 2, step = 1; step < numProcs; step = mod) + for (int mod = 2, step = 1; step < numProcs; step = mod) { mod = step * 2; - if (procID % mod) + if (myProci % mod) { // The rank above - above = procID - (procID % mod); + above = myProci - (myProci % mod); break; } else { for ( - label j = procID + step; - j < numProcs && j < procID + mod; - j += step + int i = myProci + step; + i < numProcs && i < myProci + mod; + i += step ) { - below.push_back(j); + below.push_back(i); } for ( - label j = procID + step; - j < numProcs && j < procID + mod; - j++ + int i = myProci + step; + i < numProcs && i < myProci + mod; + ++i ) { - allBelow.push_back(j); + allBelow.push_back(i); } } } @@ -204,10 +245,10 @@ 
static label simpleTree Foam::UPstream::commsStruct::commsStruct ( - const label above, - labelList&& below, - labelList&& allBelow, - labelList&& allNotBelow + const int above, + List<int>&& below, + List<int>&& allBelow, + List<int>&& allNotBelow ) : above_(above), @@ -219,11 +260,11 @@ Foam::UPstream::commsStruct::commsStruct Foam::UPstream::commsStruct::commsStruct ( - const label numProcs, - const label myProcID, - const label above, - const labelUList& below, - const labelUList& allBelow + const int numProcs, + const int myProcID, + const int above, + const UList<int>& below, + const UList<int>& allBelow ) : above_(above), @@ -237,14 +278,14 @@ Foam::UPstream::commsStruct::commsStruct isNotBelow[myProcID] = false; // Exclude allBelow - for (const label proci : allBelow) + for (const auto proci : allBelow) { isNotBelow[proci] = false; } // Compacting to obtain allNotBelow_ - label nNotBelow = 0; - forAll(isNotBelow, proci) + int nNotBelow = 0; + for (int proci = 0; proci < numProcs; ++proci) { if (isNotBelow[proci]) { @@ -266,7 +307,7 @@ Foam::UPstream::commsStruct::commsStruct void Foam::UPstream::commsStructList::printGraph ( Ostream& os, - const label proci + const int proci ) const { // Print graph - starting at depth 0 @@ -282,9 +323,9 @@ void Foam::UPstream::commsStructList::printGraph // * * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * // -Foam::label Foam::UPstream::commsStruct::nProcs() const noexcept +int Foam::UPstream::commsStruct::nProcs() const noexcept { - return (1 + allBelow_.size() + allNotBelow_.size()); + return (1 + int(allBelow_.size() + allNotBelow_.size())); } @@ -299,46 +340,65 @@ void Foam::UPstream::commsStruct::reset() void Foam::UPstream::commsStruct::reset ( - const label procID, - const label numProcs, - [[maybe_unused]] const label comm + const int myProci, + const int numProcs, + const int communicator ) { reset(); - if (numProcs <= 2 || numProcs < UPstream::nProcsSimpleSum) + // Linear (flat) 
communication pattern + if + ( + // Trivially small domains + (numProcs <= 2 || numProcs < UPstream::nProcsSimpleSum) + + // local-node: assume that the local communication is low-latency + || ( + UPstream::commLocalNode() == communicator + && UPstream::commLocalNode() > UPstream::commConstWorld() + ) + // inter-node: presumably relatively few nodes and/or + // higher latency with larger messages being sent + || ( + UPstream::commInterNode() == communicator + && UPstream::commInterNode() > UPstream::commConstWorld() + ) + ) { // Linear communication pattern - label above(-1); - labelList below; + int above(-1); + List<int> below; - if (procID == 0) + if (myProci == 0) { - below = identity(numProcs-1, 1); + below.resize(numProcs-1); + std::iota(below.begin(), below.end(), 1); } else { above = 0; } - *this = UPstream::commsStruct(numProcs, procID, above, below, below); + *this = UPstream::commsStruct(numProcs, myProci, above, below, below); return; } - // Simple tree communication pattern - DynamicList<label> below; - DynamicList<label> allBelow; - label above = simpleTree + DynamicList<int> below; + DynamicList<int> allBelow; + + // Simple tree communication pattern + int above = simpleTree ( - procID, + myProci, numProcs, below, allBelow ); - *this = UPstream::commsStruct(numProcs, procID, above, below, allBelow); + *this = UPstream::commsStruct(numProcs, myProci, above, below, allBelow); } @@ -360,19 +420,36 @@ Foam::UPstream::commsStructList::null() // * * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * // -void Foam::UPstream::commsStructList::init(const label comm) +void Foam::UPstream::commsStructList::init(int communicator) { - comm_ = comm; + comm_ = communicator; + tree_.clear(); + if (comm_ >= 0) + { + tree_.resize(UPstream::nProcs(comm_)); + } +} + + +void Foam::UPstream::commsStructList::reset(int communicator) +{ + comm_ = communicator; tree_.clear(); - tree_.resize(UPstream::nProcs(comm)); } const Foam::UPstream::commsStruct& 
-Foam::UPstream::commsStructList::get(const label proci) const +Foam::UPstream::commsStructList::get(int proci) const { + const auto numProcs = UPstream::nProcs(comm_); + + // Only if reset(comm) instead of init(comm) was used + if (tree_.size() < numProcs) + { + const_cast<List<commsStruct>&>(tree_).resize(numProcs); + } + const UPstream::commsStruct& entry = tree_[proci]; - const auto numProcs = tree_.size(); if (entry.nProcs() != numProcs) { @@ -391,10 +468,8 @@ bool Foam::UPstream::commsStruct::operator==(const commsStruct& comm) const { return ( - (above_ == comm.above()) - && (below_ == comm.below()) - // && (allBelow_ == comm.allBelow()) - // && (allNotBelow_ == comm.allNotBelow()) + (above() == comm.above()) + && (below() == comm.below()) ); } @@ -409,10 +484,10 @@ bool Foam::UPstream::commsStruct::operator!=(const commsStruct& comm) const Foam::Ostream& Foam::operator<<(Ostream& os, const UPstream::commsStruct& comm) { - os << comm.above() << nl << token::SPACE << token::SPACE; - comm.below().writeList(os) << nl << token::SPACE << token::SPACE; - comm.allBelow().writeList(os) << nl << token::SPACE << token::SPACE; - comm.allNotBelow().writeList(os); + os << comm.above() << nl; + os << " "; comm.below().writeList(os) << nl; + os << " "; comm.allBelow().writeList(os) << nl; + os << " "; comm.allNotBelow().writeList(os); os.check(FUNCTION_NAME); return os; diff --git a/src/OpenFOAM/db/IOstreams/Pstreams/compat/PstreamBroadcast.C b/src/OpenFOAM/db/IOstreams/Pstreams/compat/PstreamBroadcast.C new file mode 100644 index 0000000000000000000000000000000000000000..7cec1b4541d1553571d30afa2aac3b841f6c3d51 --- /dev/null +++ b/src/OpenFOAM/db/IOstreams/Pstreams/compat/PstreamBroadcast.C @@ -0,0 +1 @@ +#warning File removed - left for old dependency check only diff --git a/src/OpenFOAM/db/IOstreams/Pstreams/compat/PstreamCombineGather.C b/src/OpenFOAM/db/IOstreams/Pstreams/compat/PstreamCombineGather.C new file mode 100644 index 
0000000000000000000000000000000000000000..7cec1b4541d1553571d30afa2aac3b841f6c3d51 --- /dev/null +++ b/src/OpenFOAM/db/IOstreams/Pstreams/compat/PstreamCombineGather.C @@ -0,0 +1 @@ +#warning File removed - left for old dependency check only diff --git a/src/OpenFOAM/db/IOstreams/Pstreams/PstreamCombineReduceOps.H b/src/OpenFOAM/db/IOstreams/Pstreams/compat/PstreamCombineReduceOps.H similarity index 95% rename from src/OpenFOAM/db/IOstreams/Pstreams/PstreamCombineReduceOps.H rename to src/OpenFOAM/db/IOstreams/Pstreams/compat/PstreamCombineReduceOps.H index 5c8f52d6acff4cef46f6729f6d8d87876704c474..905cd788c3d8cc8b502cf693d382916fe2b2afda 100644 --- a/src/OpenFOAM/db/IOstreams/Pstreams/PstreamCombineReduceOps.H +++ b/src/OpenFOAM/db/IOstreams/Pstreams/compat/PstreamCombineReduceOps.H @@ -20,6 +20,8 @@ Description #ifndef FoamCompat_PstreamCombineReduceOps_H #define FoamCompat_PstreamCombineReduceOps_H +#warning Deprecated header + #include "Pstream.H" #include "ops.H" @@ -32,6 +34,7 @@ namespace Foam //- Compatibility wrapper for Pstream::combineReduce template<class T, class CombineOp> +FOAM_DEPRECATED_FOR(2022-08, "Pstream::combineReduce()") void combineReduce ( T& value, diff --git a/src/OpenFOAM/db/IOstreams/Pstreams/compat/PstreamExchange.C b/src/OpenFOAM/db/IOstreams/Pstreams/compat/PstreamExchange.C new file mode 100644 index 0000000000000000000000000000000000000000..7cec1b4541d1553571d30afa2aac3b841f6c3d51 --- /dev/null +++ b/src/OpenFOAM/db/IOstreams/Pstreams/compat/PstreamExchange.C @@ -0,0 +1 @@ +#warning File removed - left for old dependency check only diff --git a/src/OpenFOAM/db/IOstreams/Pstreams/compat/PstreamExchangeConsensus.C b/src/OpenFOAM/db/IOstreams/Pstreams/compat/PstreamExchangeConsensus.C new file mode 100644 index 0000000000000000000000000000000000000000..7cec1b4541d1553571d30afa2aac3b841f6c3d51 --- /dev/null +++ b/src/OpenFOAM/db/IOstreams/Pstreams/compat/PstreamExchangeConsensus.C @@ -0,0 +1 @@ +#warning File removed - left for old 
dependency check only diff --git a/src/OpenFOAM/db/IOstreams/Pstreams/compat/PstreamGather.C b/src/OpenFOAM/db/IOstreams/Pstreams/compat/PstreamGather.C new file mode 100644 index 0000000000000000000000000000000000000000..7cec1b4541d1553571d30afa2aac3b841f6c3d51 --- /dev/null +++ b/src/OpenFOAM/db/IOstreams/Pstreams/compat/PstreamGather.C @@ -0,0 +1 @@ +#warning File removed - left for old dependency check only diff --git a/src/OpenFOAM/db/IOstreams/Pstreams/compat/PstreamGatherList.C b/src/OpenFOAM/db/IOstreams/Pstreams/compat/PstreamGatherList.C new file mode 100644 index 0000000000000000000000000000000000000000..7cec1b4541d1553571d30afa2aac3b841f6c3d51 --- /dev/null +++ b/src/OpenFOAM/db/IOstreams/Pstreams/compat/PstreamGatherList.C @@ -0,0 +1 @@ +#warning File removed - left for old dependency check only diff --git a/src/OpenFOAM/db/IOstreams/Pstreams/compat/UPstreamTemplates.C b/src/OpenFOAM/db/IOstreams/Pstreams/compat/UPstreamTemplates.C new file mode 100644 index 0000000000000000000000000000000000000000..7cec1b4541d1553571d30afa2aac3b841f6c3d51 --- /dev/null +++ b/src/OpenFOAM/db/IOstreams/Pstreams/compat/UPstreamTemplates.C @@ -0,0 +1 @@ +#warning File removed - left for old dependency check only diff --git a/src/OpenFOAM/global/argList/argList.C b/src/OpenFOAM/global/argList/argList.C index 8abb9a5457f4a07a0bb75a2073b0f79dc06b91c2..99bee6f6fd1a87665209d88886063a93b4377f2d 100644 --- a/src/OpenFOAM/global/argList/argList.C +++ b/src/OpenFOAM/global/argList/argList.C @@ -2110,21 +2110,33 @@ void Foam::argList::parse Info<< " (" << UPstream::nProcs() << " ranks, " << UPstream::numNodes() << " nodes)" << nl; - Info<< " floatTransfer : " - << Switch::name(UPstream::floatTransfer) << nl - << " maxCommsSize : " - << UPstream::maxCommsSize << nl - << " nProcsSimpleSum : " - << UPstream::nProcsSimpleSum << nl - << " nonBlockingExchange: " - << UPstream::nProcsNonblockingExchange - << " (tuning: " << UPstream::tuning_NBX_ << ')' << nl - << " exchange algorithm : " 
- << PstreamBuffers::algorithm << nl - << " commsType : " - << UPstream::commsTypeNames[UPstream::defaultCommsType] << nl - << " polling iterations : " - << UPstream::nPollProcInterfaces << nl; + if (UPstream::floatTransfer) + { + Info<< " floatTransfer : enabled" << nl; + } + if (UPstream::maxCommsSize) + { + Info<< " maxCommsSize : " + << UPstream::maxCommsSize << nl; + } + if (UPstream::nProcsSimpleSum > 2) + { + Info<< " nProcsSimpleSum : " + << UPstream::nProcsSimpleSum << nl; + } + { + const auto& commsType = + UPstream::commsTypeNames[UPstream::defaultCommsType]; + + Info<< " nonBlockingExchange: " + << UPstream::nProcsNonblockingExchange + << " (tuning: " << UPstream::tuning_NBX_ << ')' << nl + << " exchange algorithm : " + << PstreamBuffers::algorithm << nl + << " commsType : " << commsType << nl + << " polling iterations : " + << UPstream::nPollProcInterfaces << nl; + } if (UPstream::allWorlds().size() > 1) { diff --git a/src/OpenFOAM/matrices/lduMatrix/solvers/GAMG/GAMGAgglomerations/GAMGAgglomeration/GAMGAgglomerateLduAddressing.C b/src/OpenFOAM/matrices/lduMatrix/solvers/GAMG/GAMGAgglomerations/GAMGAgglomeration/GAMGAgglomerateLduAddressing.C index f231f5d778bb7b54658075e662c5c3da9f8e37c4..6cf566630adc7b1d4af54fce0732b767e62f3d88 100644 --- a/src/OpenFOAM/matrices/lduMatrix/solvers/GAMG/GAMGAgglomerations/GAMGAgglomeration/GAMGAgglomerateLduAddressing.C +++ b/src/OpenFOAM/matrices/lduMatrix/solvers/GAMG/GAMGAgglomerations/GAMGAgglomeration/GAMGAgglomerateLduAddressing.C @@ -520,7 +520,7 @@ void Foam::GAMGAgglomeration::procAgglomerateLduAddressing bMap.setSize(nOldInterfaces); // Scatter relevant section to originating processor - UPstream::scatter + UPstream::mpiScatterv ( data.values().cdata(), diff --git a/src/OpenFOAM/parallel/globalIndex/compat/globalIndexTemplates.C b/src/OpenFOAM/parallel/globalIndex/compat/globalIndexTemplates.C new file mode 100644 index 0000000000000000000000000000000000000000..7cec1b4541d1553571d30afa2aac3b841f6c3d51 --- 
/dev/null +++ b/src/OpenFOAM/parallel/globalIndex/compat/globalIndexTemplates.C @@ -0,0 +1 @@ +#warning File removed - left for old dependency check only diff --git a/src/OpenFOAM/parallel/globalIndex/globalIndex.H b/src/OpenFOAM/parallel/globalIndex/globalIndex.H index 19f6ef1a9d2a6a92790806bf521e8b983e19c4de..c203cedbd602c77879428dd67712ff17f8038ab2 100644 --- a/src/OpenFOAM/parallel/globalIndex/globalIndex.H +++ b/src/OpenFOAM/parallel/globalIndex/globalIndex.H @@ -37,7 +37,7 @@ Description SourceFiles globalIndex.C globalIndexI.H - globalIndexTemplates.C + globalIndex.txx \*---------------------------------------------------------------------------*/ @@ -1098,7 +1098,7 @@ public: // * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * // #ifdef NoRepository - #include "globalIndexTemplates.C" + #include "globalIndex.txx" #endif // * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * // diff --git a/src/OpenFOAM/parallel/globalIndex/globalIndexTemplates.C b/src/OpenFOAM/parallel/globalIndex/globalIndex.txx similarity index 99% rename from src/OpenFOAM/parallel/globalIndex/globalIndexTemplates.C rename to src/OpenFOAM/parallel/globalIndex/globalIndex.txx index 46b5cae1554140b72355671b1e54756116b0752f..6ada3d890c8ed4d637f91beea2e581f430622068 100644 --- a/src/OpenFOAM/parallel/globalIndex/globalIndexTemplates.C +++ b/src/OpenFOAM/parallel/globalIndex/globalIndex.txx @@ -789,7 +789,7 @@ void Foam::globalIndex::mpiGather { case 'b': // Byte-wise { - UPstream::gather + UPstream::mpiGatherv ( sendData.cdata_bytes(), sendData.size_bytes(), @@ -804,7 +804,7 @@ void Foam::globalIndex::mpiGather { typedef scalar cmptType; - UPstream::gather + UPstream::mpiGatherv ( reinterpret_cast<const cmptType*>(sendData.cdata()), (sendData.size()*nCmpts), @@ -819,7 +819,7 @@ void Foam::globalIndex::mpiGather { typedef label cmptType; - UPstream::gather + UPstream::mpiGatherv ( reinterpret_cast<const cmptType*>(sendData.cdata()), 
(sendData.size()*nCmpts), diff --git a/src/Pstream/dummy/UPstreamAllToAll.C b/src/Pstream/dummy/UPstreamAllToAll.C index 134808104f8c428a2bde9e2fdfdad7c02ed154b6..acc4152a62aa1b03985db4213d4cc1eeefd935fa 100644 --- a/src/Pstream/dummy/UPstreamAllToAll.C +++ b/src/Pstream/dummy/UPstreamAllToAll.C @@ -34,11 +34,11 @@ License // * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * // #undef Pstream_CommonRoutines -#define Pstream_CommonRoutines(Native) \ +#define Pstream_CommonRoutines(Type) \ void Foam::UPstream::allToAll \ ( \ - const UList<Native>& sendData, \ - UList<Native>& recvData, \ + const UList<Type>& sendData, \ + UList<Type>& recvData, \ const label comm \ ) \ { \ @@ -55,11 +55,11 @@ Pstream_CommonRoutines(int64_t); // * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * // #undef Pstream_CommonRoutines -#define Pstream_CommonRoutines(Native) \ +#define Pstream_CommonRoutines(Type) \ void Foam::UPstream::allToAllConsensus \ ( \ - const UList<Native>& sendData, \ - UList<Native>& recvData, \ + const UList<Type>& sendData, \ + UList<Type>& recvData, \ const int tag, \ const label comm \ ) \ @@ -68,8 +68,8 @@ void Foam::UPstream::allToAllConsensus \ } \ void Foam::UPstream::allToAllConsensus \ ( \ - const Map<Native>& sendData, \ - Map<Native>& recvData, \ + const Map<Type>& sendData, \ + Map<Type>& recvData, \ const int tag, \ const label comm \ ) \ @@ -87,13 +87,13 @@ Pstream_CommonRoutines(int64_t); // * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * // #undef Pstream_CommonRoutines -#define Pstream_CommonRoutines(Native) \ -void Foam::UPstream::allToAll \ +#define Pstream_CommonRoutines(Type) \ +void Foam::UPstream::allToAllv \ ( \ - const Native* sendData, \ + const Type* sendData, \ const UList<int>& sendCounts, \ const UList<int>& sendOffsets, \ - Native* recvData, \ + Type* recvData, \ const UList<int>& recvCounts, \ const UList<int>& recvOffsets, \ const label comm \ @@ -106,7 +106,7 
@@ void Foam::UPstream::allToAll \ << " does not equal number to receive " << recvCounts[0] \ << Foam::abort(FatalError); \ } \ - std::memmove(recvData, sendData, recvCounts[0]*sizeof(Native)); \ + std::memmove(recvData, sendData, recvCounts[0]*sizeof(Type)); \ } diff --git a/src/Pstream/dummy/UPstreamGatherScatter.C b/src/Pstream/dummy/UPstreamGatherScatter.C index 4f772b4c1d1d8f865e04713ba999b4616ca3ce21..9034fc75f39e4c6b9eb77ee84265a5f6c28a2fdc 100644 --- a/src/Pstream/dummy/UPstreamGatherScatter.C +++ b/src/Pstream/dummy/UPstreamGatherScatter.C @@ -5,7 +5,7 @@ \\ / A nd | www.openfoam.com \\/ M anipulation | ------------------------------------------------------------------------------- - Copyright (C) 2022-2023 OpenCFD Ltd. + Copyright (C) 2022-2025 OpenCFD Ltd. ------------------------------------------------------------------------------- License This file is part of OpenFOAM. @@ -31,68 +31,74 @@ License // * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * // #undef Pstream_CommonRoutines -#define Pstream_CommonRoutines(Native) \ +#define Pstream_CommonRoutines(Type) \ \ void Foam::UPstream::mpiGather \ ( \ - const Native* sendData, \ - Native* recvData, \ + const Type* sendData, \ + Type* recvData, \ int count, \ const label comm \ ) \ { \ - std::memmove(recvData, sendData, count*sizeof(Native)); \ + if (sendData && recvData) \ + { \ + std::memmove(recvData, sendData, count*sizeof(Type)); \ + } \ } \ \ \ void Foam::UPstream::mpiScatter \ ( \ - const Native* sendData, \ - Native* recvData, \ + const Type* sendData, \ + Type* recvData, \ int count, \ const label comm \ ) \ { \ - std::memmove(recvData, sendData, count*sizeof(Native)); \ + if (sendData && recvData) \ + { \ + std::memmove(recvData, sendData, count*sizeof(Type)); \ + } \ } \ \ \ void Foam::UPstream::mpiAllGather \ ( \ - Native* allData, \ + Type* allData, \ int count, \ const label comm \ ) \ {} \ \ \ -void Foam::UPstream::gather \ +void Foam::UPstream::mpiGatherv \ ( \ - 
const Native* sendData, \ + const Type* sendData, \ int sendCount, \ \ - Native* recvData, \ + Type* recvData, \ const UList<int>& recvCounts, \ const UList<int>& recvOffsets, \ const label comm \ ) \ { \ /* recvCounts[0] may be invalid - use sendCount instead */ \ - std::memmove(recvData, sendData, sendCount*sizeof(Native)); \ + std::memmove(recvData, sendData, sendCount*sizeof(Type)); \ } \ \ -void Foam::UPstream::scatter \ +void Foam::UPstream::mpiScatterv \ ( \ - const Native* sendData, \ + const Type* sendData, \ const UList<int>& sendCounts, \ const UList<int>& sendOffsets, \ \ - Native* recvData, \ + Type* recvData, \ int recvCount, \ const label comm \ ) \ { \ - std::memmove(recvData, sendData, recvCount*sizeof(Native)); \ + std::memmove(recvData, sendData, recvCount*sizeof(Type)); \ } diff --git a/src/Pstream/mpi/UPstreamAllToAll.C b/src/Pstream/mpi/UPstreamAllToAll.C index 98bc7192cdc21ed6fdbbfd873f49198713f0559d..63f3e3188e5e37c2d25e36f8b0842f9d5a786c42 100644 --- a/src/Pstream/mpi/UPstreamAllToAll.C +++ b/src/Pstream/mpi/UPstreamAllToAll.C @@ -5,7 +5,7 @@ \\ / A nd | www.openfoam.com \\/ M anipulation | ------------------------------------------------------------------------------- - Copyright (C) 2022-2023 OpenCFD Ltd. + Copyright (C) 2022-2025 OpenCFD Ltd. ------------------------------------------------------------------------------- License This file is part of OpenFOAM. 
@@ -101,7 +101,7 @@ Pstream_CommonRoutines(int64_t, MPI_INT64_T); #undef Pstream_CommonRoutines #define Pstream_CommonRoutines(Native, TaggedType) \ -void Foam::UPstream::allToAll \ +void Foam::UPstream::allToAllv \ ( \ const Native* sendData, \ const UList<int>& sendCounts, \ diff --git a/src/Pstream/mpi/UPstreamGatherScatter.C b/src/Pstream/mpi/UPstreamGatherScatter.C index 4bd49dcedb505b4867164ea0819173fe41065526..36cac86dd427d9df5a82961127c467d29b5de79c 100644 --- a/src/Pstream/mpi/UPstreamGatherScatter.C +++ b/src/Pstream/mpi/UPstreamGatherScatter.C @@ -81,7 +81,7 @@ void Foam::UPstream::mpiAllGather \ ); \ } \ \ -void Foam::UPstream::gather \ +void Foam::UPstream::mpiGatherv \ ( \ const Native* sendData, \ int sendCount, \ @@ -100,7 +100,7 @@ void Foam::UPstream::gather \ ); \ } \ \ -void Foam::UPstream::scatter \ +void Foam::UPstream::mpiScatterv \ ( \ const Native* sendData, \ const UList<int>& sendCounts, \ diff --git a/src/Pstream/mpi/UPstreamWrapping.H b/src/Pstream/mpi/UPstreamWrapping.H index 33092eda41ca4dadf7562cf34bf462aad75c3c2d..a76a2664b3a6f8df392c238201a968def1fd830a 100644 --- a/src/Pstream/mpi/UPstreamWrapping.H +++ b/src/Pstream/mpi/UPstreamWrapping.H @@ -6,7 +6,7 @@ \\/ M anipulation | ------------------------------------------------------------------------------- Copyright (C) 2012-2016 OpenFOAM Foundation - Copyright (C) 2022-2024 OpenCFD Ltd. + Copyright (C) 2022-2025 OpenCFD Ltd. ------------------------------------------------------------------------------- License This file is part of OpenFOAM. 
@@ -139,6 +139,7 @@ void allToAllConsensus // MPI_Gather or MPI_Igather +// Uses recvData as send/recv when sendData is nullptr template<class Type> void gather ( @@ -153,6 +154,7 @@ void gather // MPI_Scatter or MPI_Iscatter +// Uses recvData as send/recv when sendData is nullptr template<class Type> void scatter ( diff --git a/src/Pstream/mpi/UPstreamWrapping.txx b/src/Pstream/mpi/UPstreamWrapping.txx index 97b0a95b4c5622c0f21bdf3903d6de7a6695f042..cce0e9cdcb8c210ca9f27759cc872ed48c03aa1d 100644 --- a/src/Pstream/mpi/UPstreamWrapping.txx +++ b/src/Pstream/mpi/UPstreamWrapping.txx @@ -6,7 +6,7 @@ \\/ M anipulation | ------------------------------------------------------------------------------- Copyright (C) 2012-2015 OpenFOAM Foundation - Copyright (C) 2019-2023 OpenCFD Ltd. + Copyright (C) 2019-2025 OpenCFD Ltd. ------------------------------------------------------------------------------- License This file is part of OpenFOAM. @@ -30,6 +30,7 @@ License #include "profilingPstream.H" #include "PstreamGlobals.H" #include "Map.H" +#include <cstring> // memmove // * * * * * * * * * * * * * * * Global Functions * * * * * * * * * * * * * // @@ -902,9 +903,9 @@ void Foam::PstreamDetail::gather { return; } - if (!UPstream::is_parallel(comm)) + else if (!UPstream::is_parallel(comm)) { - if (recvData) + if (sendData && recvData) { std::memmove(recvData, sendData, count*sizeof(Type)); } @@ -923,6 +924,10 @@ void Foam::PstreamDetail::gather { Perr<< "** MPI_Gather (blocking):"; } + if (sendData == nullptr) + { + Perr<< " [inplace]"; + } Perr<< " numProc:" << numProc << " count:" << count << " with comm:" << comm @@ -931,6 +936,12 @@ void Foam::PstreamDetail::gather error::printStack(Perr); } + const void* send_buffer = sendData; + if (sendData == nullptr || (sendData == recvData)) + { + // Appears to be an in-place request + send_buffer = MPI_IN_PLACE; + } #if defined(MPI_VERSION) && (MPI_VERSION >= 3) if (immediate) @@ -943,7 +954,7 @@ void Foam::PstreamDetail::gather ( 
MPI_Igather ( - const_cast<Type*>(sendData), count, datatype, + send_buffer, count, datatype, recvData, count, datatype, 0, // root: UPstream::masterNo() PstreamGlobals::MPICommunicators_[comm], @@ -969,7 +980,7 @@ void Foam::PstreamDetail::gather ( MPI_Gather ( - const_cast<Type*>(sendData), count, datatype, + send_buffer, count, datatype, recvData, count, datatype, 0, // root: UPstream::masterNo() PstreamGlobals::MPICommunicators_[comm] @@ -1009,9 +1020,9 @@ void Foam::PstreamDetail::scatter { return; } - if (!UPstream::is_parallel(comm)) + else if (!UPstream::is_parallel(comm)) { - if (recvData) + if (sendData && recvData) { std::memmove(recvData, sendData, count*sizeof(Type)); } @@ -1030,6 +1041,10 @@ void Foam::PstreamDetail::scatter { Perr<< "** MPI_Scatter (blocking):"; } + if (sendData == nullptr) + { + Perr<< " [inplace]"; + } Perr<< " numProc:" << numProc << " count:" << count << " with comm:" << comm @@ -1039,6 +1054,13 @@ void Foam::PstreamDetail::scatter } + const void* send_buffer = sendData; + if (sendData == nullptr || (sendData == recvData)) + { + // Appears to be an in-place request + send_buffer = MPI_IN_PLACE; + } + #if defined(MPI_VERSION) && (MPI_VERSION >= 3) if (immediate) { @@ -1050,7 +1072,7 @@ void Foam::PstreamDetail::scatter ( MPI_Iscatter ( - const_cast<Type*>(sendData), count, datatype, + send_buffer, count, datatype, recvData, count, datatype, 0, // root: UPstream::masterNo() PstreamGlobals::MPICommunicators_[comm], @@ -1076,7 +1098,7 @@ void Foam::PstreamDetail::scatter ( MPI_Scatter ( - const_cast<Type*>(sendData), count, datatype, + send_buffer, count, datatype, recvData, count, datatype, 0, // root: UPstream::masterNo() PstreamGlobals::MPICommunicators_[comm] @@ -1119,10 +1141,13 @@ void Foam::PstreamDetail::gatherv { return; } - if (!UPstream::is_parallel(comm)) + else if (!UPstream::is_parallel(comm)) { // recvCounts[0] may be invalid - use sendCount instead - std::memmove(recvData, sendData, sendCount*sizeof(Type)); + if 
(sendData && recvData) + { + std::memmove(recvData, sendData, sendCount*sizeof(Type)); + } return; } @@ -1262,7 +1287,7 @@ void Foam::PstreamDetail::scatterv { return; } - if (!UPstream::is_parallel(comm)) + else if (!UPstream::is_parallel(comm)) { std::memmove(recvData, sendData, recvCount*sizeof(Type)); return; diff --git a/src/dynamicMesh/motionSolvers/displacement/pointSmoothing/hexMeshSmootherMotionSolver.C b/src/dynamicMesh/motionSolvers/displacement/pointSmoothing/hexMeshSmootherMotionSolver.C index 8795e1cfe02157e35cac87691ce3d5daa9dc017c..f8b250f690f7b1e964f4fd991d7cbe4b52725287 100644 --- a/src/dynamicMesh/motionSolvers/displacement/pointSmoothing/hexMeshSmootherMotionSolver.C +++ b/src/dynamicMesh/motionSolvers/displacement/pointSmoothing/hexMeshSmootherMotionSolver.C @@ -455,7 +455,7 @@ Foam::labelList Foam::hexMeshSmootherMotionSolver::countZeroOrPos const labelList& elems ) const { - labelList n(size, 0); + labelList n(size, Zero); for (const label elem : elems) { if (elem >= 0) @@ -463,8 +463,8 @@ Foam::labelList Foam::hexMeshSmootherMotionSolver::countZeroOrPos n[elem]++; } } - Pstream::listCombineGather(n, plusEqOp<label>()); - Pstream::broadcast(n); + + Pstream::listCombineReduce(n, plusEqOp<label>()); return n; }