diff --git a/etc/controlDict b/etc/controlDict index 855d06fbd72b3a016d31d3e98be6ed0b89620a9a..74d85e59017bac8b49d082aaf6dcfcf5551bf012 100644 --- a/etc/controlDict +++ b/etc/controlDict @@ -147,6 +147,15 @@ OptimisationSwitches // >= 3 : when there are more than N nodes nodeComms.min 0; + // Selection of topology-aware routines (bitmask) + // 0: disabled [default] + // 1: broadcast [MPI] + // 4: gather/all-gather [MPI] + // 16: combine (reduction) [manual algorithm] + // 32: mapGather (reduction) [manual algorithm] + // 64: gatherList/scatterList [manual algorithm] + topoControl 0; + // Transfer double as float for processor boundaries. Mostly defunct. floatTransfer 0; diff --git a/src/OpenFOAM/db/IOstreams/Pstreams/Pstream.H b/src/OpenFOAM/db/IOstreams/Pstreams/Pstream.H index 70294111f3c130afade1807564a44c994be23ced..28d70dbf3af398616c3bd55fc82cd7061e834abd 100644 --- a/src/OpenFOAM/db/IOstreams/Pstreams/Pstream.H +++ b/src/OpenFOAM/db/IOstreams/Pstreams/Pstream.H @@ -155,6 +155,19 @@ public: const int communicator ); + //- Implementation: gather (reduce) single element data onto + //- UPstream::masterNo() using a topo algorithm. + // \returns True if topo algorithm was applied + template<class T, class BinaryOp, bool InplaceMode> + static bool gather_topo_algorithm + ( + //! [in,out] + T& value, + BinaryOp bop, + const int tag, + const int communicator + ); + //- Gather (reduce) data, applying \c bop to combine \c value //- from different processors. The basis for Foam::reduce(). // A no-op for non-parallel. @@ -256,6 +269,19 @@ public: const int communicator ); + //- Implementation: gather (reduce) list element data onto + //- UPstream::masterNo() using a topo algorithm. + // \returns True if topo algorithm was applied + template<class T, class BinaryOp, bool InplaceMode> + static bool listGather_topo_algorithm + ( + //! 
[in,out] + UList<T>& values, + BinaryOp bop, + const int tag, + const int communicator + ); + //- Gather (reduce) list elements, //- applying \c bop to each list element // @@ -337,6 +363,18 @@ public: const int communicator ); + //- Implementation: gather (reduce) Map/HashTable containers onto + //- UPstream::masterNo() using a topo algorithm. + // \returns True if topo algorithm was applied + template<class Container, class BinaryOp, bool InplaceMode> + static bool mapGather_topo_algorithm + ( + Container& values, + BinaryOp bop, + const int tag, + const int communicator + ); + //- Gather (reduce) Map/HashTable containers, //- applying \c bop to combine entries from different processors. // @@ -421,6 +459,17 @@ public: const int communicator ); + //- Gather data, keeping individual values separate. + // \returns True if topo algorithm was applied + template<class T> + static bool gatherList_topo_algorithm + ( + //! [in,out] + UList<T>& values, + const int tag, + const int communicator + ); + //- Implementation: inverse of gatherList_algorithm template<class T> static void scatterList_algorithm diff --git a/src/OpenFOAM/db/IOstreams/Pstreams/PstreamGather.txx b/src/OpenFOAM/db/IOstreams/Pstreams/PstreamGather.txx index 3b5e34c9739690489a92e56733048a444f67a254..6ddb82f2f9c56ed3e68496d433960542f463c611 100644 --- a/src/OpenFOAM/db/IOstreams/Pstreams/PstreamGather.txx +++ b/src/OpenFOAM/db/IOstreams/Pstreams/PstreamGather.txx @@ -147,6 +147,59 @@ void Foam::Pstream::gather_algorithm } +template<class T, class BinaryOp, bool InplaceMode> +bool Foam::Pstream::gather_topo_algorithm +( + T& value, + BinaryOp bop, + const int tag, + const int communicator +) +{ + const bool withTopo = + ( + UPstream::is_parallel(communicator) + && UPstream::usingTopoControl(UPstream::topoControls::combine) + && UPstream::usingNodeComms(communicator) + ); + + if (withTopo) + { + // Topological reduce + // - linear for local-node (assume communication is fast) + // - tree for inter-node (no 
assumption about speed) + + using control = std::pair<int, bool>; + + for + ( + auto [subComm, linear] : + { + // 1: within node + control{ UPstream::commLocalNode(), true }, + // 2: between nodes + control{ UPstream::commInterNode(), false } + } + ) + { + if (UPstream::is_parallel(subComm)) + { + Pstream::gather_algorithm<T, BinaryOp, InplaceMode> + ( + UPstream::whichCommunication(subComm, linear), + value, + bop, + tag, + subComm + ); + } + } + } + + return withTopo; +} + + template<class T, class BinaryOp, bool InplaceMode> void Foam::Pstream::gather ( @@ -172,7 +225,16 @@ void Foam::Pstream::gather communicator ); } - else + else if + ( + !Pstream::gather_topo_algorithm<T, BinaryOp, InplaceMode> + ( + value, + bop, + tag, + communicator + ) + ) { // Communication order const auto& commOrder = UPstream::whichCommunication(communicator); @@ -337,6 +399,59 @@ void Foam::Pstream::listGather_algorithm } + +template<class T, class BinaryOp, bool InplaceMode> +bool Foam::Pstream::listGather_topo_algorithm +( + UList<T>& values, + BinaryOp bop, + const int tag, + const int communicator +) +{ + const bool withTopo = + ( + UPstream::is_parallel(communicator) && !values.empty() + && UPstream::usingTopoControl(UPstream::topoControls::combine) + && UPstream::usingNodeComms(communicator) + ); + + if (withTopo) + { + // Topological reduce + // - linear for local-node (assume communication is fast) + // - tree for inter-node (no assumption about speed) + + using control = std::pair<int, bool>; + + for + ( + auto [subComm, linear] : + { + // 1: within node + control{ UPstream::commLocalNode(), true }, + // 2: between nodes + control{ UPstream::commInterNode(), false } + } + ) + { + if (UPstream::is_parallel(subComm)) + { + Pstream::listGather_algorithm<T, BinaryOp, InplaceMode> + ( + UPstream::whichCommunication(subComm, linear), + values, + bop, + tag, + subComm + ); + } + } + } + + return withTopo; +} + + template<class T, class BinaryOp, bool InplaceMode> void 
Foam::Pstream::listGather ( @@ -373,7 +488,16 @@ void Foam::Pstream::listGather communicator ); } - else + else if + ( + !Pstream::listGather_topo_algorithm<T, BinaryOp, InplaceMode> + ( + values, + bop, + tag, + communicator + ) + ) { // Communication order const auto& commOrder = UPstream::whichCommunication(communicator); @@ -548,6 +672,59 @@ void Foam::Pstream::mapGather_algorithm } +template<class Container, class BinaryOp, bool InplaceMode> +bool Foam::Pstream::mapGather_topo_algorithm +( + Container& values, + BinaryOp bop, + const int tag, + const int communicator +) +{ + const bool withTopo = + ( + UPstream::is_parallel(communicator) + && UPstream::usingTopoControl(UPstream::topoControls::mapGather) + && UPstream::usingNodeComms(communicator) + ); + + if (withTopo) + { + // Topological reduce + // - linear for local-node (assume communication is fast) + // - tree for inter-node (no assumption about speed) + + using control = std::pair<int, bool>; + + for + ( + auto [subComm, linear] : + { + // 1: within node + control{ UPstream::commLocalNode(), true }, + // 2: between nodes + control{ UPstream::commInterNode(), false } + } + ) + { + if (UPstream::is_parallel(subComm)) + { + Pstream::mapGather_algorithm<Container, BinaryOp, InplaceMode> + ( + UPstream::whichCommunication(subComm, linear), + values, + bop, + tag, + subComm + ); + } + } + } + + return withTopo; +} + + template<class Container, class BinaryOp, bool InplaceMode> void Foam::Pstream::mapGather ( @@ -562,7 +739,16 @@ void Foam::Pstream::mapGather // Nothing to do return; } - else + else if + ( + !Pstream::mapGather_topo_algorithm<Container, BinaryOp, InplaceMode> + ( + values, + bop, + tag, + communicator + ) + ) { // Communication order const auto& commOrder = UPstream::whichCommunication(communicator); diff --git a/src/OpenFOAM/db/IOstreams/Pstreams/PstreamGatherList.txx b/src/OpenFOAM/db/IOstreams/Pstreams/PstreamGatherList.txx index 
d850fe348e9e06356e743ae9e66da6ae2a08b43b..1e8c31a2170e84d8da117cde32263e7ab8e79b44 100644 --- a/src/OpenFOAM/db/IOstreams/Pstreams/PstreamGatherList.txx +++ b/src/OpenFOAM/db/IOstreams/Pstreams/PstreamGatherList.txx @@ -252,6 +252,127 @@ void Foam::Pstream::gatherList_algorithm } +template<class T> +bool Foam::Pstream::gatherList_topo_algorithm +( + UList<T>& values, + const int tag, + const int communicator +) +{ + const bool withTopo = + ( + UPstream::is_parallel(communicator) + && UPstream::usingTopoControl(UPstream::topoControls::gatherList) + && UPstream::usingNodeComms(communicator) + ); + + if (withTopo) + { + // Topological gathering + + if (FOAM_UNLIKELY(values.size() < UPstream::nProcs(communicator))) + { + FatalErrorInFunction + << "List of values:" << values.size() + << " < numProcs:" << UPstream::nProcs(communicator) << nl + << Foam::abort(FatalError); + } + + // Overall node-wise offsets + const auto& off = UPstream::interNode_offsets(); + + // The per-node processor range + const auto& nodeProcs = UPstream::localNode_parentProcs(); + + // The per-node sub-section of values + auto nodeValues = values.slice(nodeProcs.start(), nodeProcs.size()); + + // Stage 1: gather values within a node + // - linear for local-node (assume communication is fast) + + if (UPstream::is_parallel(UPstream::commLocalNode())) + { + const auto subComm = UPstream::commLocalNode(); + constexpr bool linear(true); + + Pstream::gatherList_algorithm<T> + ( + UPstream::whichCommunication(subComm, linear), + nodeValues, + tag, + subComm + ); + } + + // Stage 2: gather between node leaders + // - this unfortunately corresponds to a gatherv process + // (number of cores per node is not identical) + // - code strongly resembles globalIndex::gather + + if (UPstream::is_parallel(UPstream::commInterNode())) + { + const auto subComm = UPstream::commInterNode(); + + if (UPstream::master(subComm)) + { + for (const int proci : UPstream::subProcs(subComm)) + { + auto slot = + 
values.slice(off[proci], off[proci+1]-off[proci]); + + // Probably not contiguous though, + // otherwise would have used mpiGather() + + if constexpr (is_contiguous_v<T>) + { + UIPstream::read + ( + UPstream::commsTypes::scheduled, + proci, + slot, + tag, + subComm + ); + } + else + { + IPstream::recv(slot, proci, tag, subComm); + } + } + } + else + { + if constexpr (is_contiguous_v<T>) + { + UOPstream::write + ( + UPstream::commsTypes::scheduled, + UPstream::masterNo(), + nodeValues, + tag, + subComm + ); + } + else + { + OPstream::send + ( + nodeValues, + UPstream::commsTypes::scheduled, + UPstream::masterNo(), + tag, + subComm + ); + } + } + } + } + + return withTopo; +} + + template<class T> void Foam::Pstream::scatterList_algorithm ( @@ -448,7 +569,15 @@ void Foam::Pstream::gatherList auto* ptr = values.data() + UPstream::myProcNo(communicator); UPstream::mpiGather(ptr, ptr, 1, communicator); } - else + else if + ( + !Pstream::gatherList_topo_algorithm + ( + values, + tag, + communicator + ) + ) { // Communication order const auto& commOrder = UPstream::whichCommunication(communicator); @@ -486,6 +615,15 @@ void Foam::Pstream::allGatherList } else { + // IMPORTANT: always call the *_algorithm() versions here and + // never the base versions [eg, Pstream::gatherList()] since + // the communication order must be absolutely identical + // for the gatherList and scatterList, otherwise the results will + // not replicate the allGather behaviour. 
+ // + // This also means that we must avoid the gatherList_topo_algorithm() + // as well, since this does not pair well with scatterList_algorithm() + // Communication order const auto& commOrder = UPstream::whichCommunication(communicator); diff --git a/src/OpenFOAM/db/IOstreams/Pstreams/UPstream.C b/src/OpenFOAM/db/IOstreams/Pstreams/UPstream.C index 2c1d72b143acf4957a5e4c1f0de0300f83052d6c..8bd928338291ab8fe55c690b015b1b07f2185d65 100644 --- a/src/OpenFOAM/db/IOstreams/Pstreams/UPstream.C +++ b/src/OpenFOAM/db/IOstreams/Pstreams/UPstream.C @@ -106,7 +106,35 @@ void Foam::UPstream::printNodeCommsControl(Ostream& os) void Foam::UPstream::printTopoControl(Ostream& os) { - os << "none"; + unsigned count = 0; + + if (UPstream::topologyControl_ > 0) + { + #undef PrintControl + #define PrintControl(Ctrl, Name) \ + if (UPstream::usingTopoControl(topoControls::Ctrl)) \ + { \ + os << (count++ ? ' ' : '(') << Name; \ + } + + PrintControl(broadcast, "broadcast"); + PrintControl(reduce, "reduce"); + PrintControl(gather, "gather"); + PrintControl(combine, "combine"); + PrintControl(mapGather, "mapGather"); + PrintControl(gatherList, "gatherList"); + + #undef PrintControl + } + + if (count) + { + os << ')'; // End the list + } + else + { + os << "none"; + } } diff --git a/src/Pstream/mpi/UPstreamBroadcast.C b/src/Pstream/mpi/UPstreamBroadcast.C index 5a916441b15dd340ea709f217e18f38e89575c5a..8e9db320edc40c08f6939d1cc80345facbb18de4 100644 --- a/src/Pstream/mpi/UPstreamBroadcast.C +++ b/src/Pstream/mpi/UPstreamBroadcast.C @@ -59,19 +59,62 @@ bool Foam::UPstream::mpi_broadcast return false; } + const bool withTopo = + ( + UPstream::usingTopoControl(UPstream::topoControls::broadcast) + && UPstream::usingNodeComms(communicator) + ); + if (FOAM_UNLIKELY(UPstream::debug)) { Perr<< "[mpi_broadcast] :" << " type:" << int(dataTypeId) << " count:" << label(count) << " comm:" << communicator - << Foam::endl; + << " topo:" << withTopo << Foam::endl; } int returnCode = MPI_SUCCESS; 
profilingPstream::beginTiming(); + if (withTopo) + { + // Topological broadcast + + for + ( + const int subComm : + // std::initializer_list<int> + { + UPstream::commInterNode_, // Stage 1: between nodes + UPstream::commLocalNode_ // Stage 2: within a node + } + ) + { + if (UPstream::is_parallel(subComm)) + { + if (FOAM_UNLIKELY(UPstream::debug)) + { + Perr<< "[mpi_broadcast] :" + << " type:" << int(dataTypeId) + << " count:" << label(count) + << " comm:" << subComm + << " substage" << Foam::endl; + } + + returnCode = MPI_Bcast + ( + buf, + count, + datatype, + 0, // (root rank) == UPstream::masterNo() + PstreamGlobals::MPICommunicators_[subComm] + ); + } + } + } + else { // Regular broadcast // OR: PstreamDetail::broadcast(buf, count, datatype, communicator); diff --git a/src/Pstream/mpi/UPstreamReduce.C b/src/Pstream/mpi/UPstreamReduce.C index fc87380ba5313df8bb97660938d67c26bab07009..cefc3309453a1fe00fea22dbbf6b0b7de7bd7db4 100644 --- a/src/Pstream/mpi/UPstreamReduce.C +++ b/src/Pstream/mpi/UPstreamReduce.C @@ -172,13 +172,20 @@ void Foam::UPstream::mpi_reduce FatalError << Foam::abort(FatalError); } + const bool withTopo = + ( + (req == nullptr) + && UPstream::usingTopoControl(UPstream::topoControls::reduce) + && UPstream::usingNodeComms(communicator) + ); + if (FOAM_UNLIKELY(UPstream::debug)) { Perr<< "[mpi_reduce] : (inplace)" << " op:" << int(opCodeId) << " type:" << int(dataTypeId) << " count:" << count << " comm:" << communicator - << Foam::endl; + << " topo:" << withTopo << Foam::endl; // error::printStack(Perr); } @@ -207,10 +214,65 @@ void Foam::UPstream::mpi_reduce std::memcpy(send_buffer, values, num_bytes); #else - void* send_buffer(nullptr); // ie, in-place + void* send_buffer = values; // ie, in-place #endif - + if (withTopo) + { + // Topological reduce + + // Stage 1: local reduction within a node -> onto the node leader + if (UPstream::is_parallel(UPstream::commLocalNode_)) + { + if (FOAM_UNLIKELY(UPstream::debug)) + { + Perr<< "[mpi_reduce] : 
(inplace)" + << " op:" << int(opCodeId) + << " type:" << int(dataTypeId) << " count:" << count + << " comm:" << UPstream::commLocalNode_ + << " stage-1" << Foam::endl; + } + + PstreamDetail::reduce + ( + send_buffer, + values, + count, + datatype, + optype, + UPstream::commLocalNode_ + ); + } + + // Stage 2: reduce between node leaders -> world leader + if (UPstream::is_parallel(UPstream::commInterNode_)) + { + // Transcribe the previous results as input for this stage + #ifndef Foam_vendor_supports_INPLACE_REDUCE + std::memcpy(send_buffer, values, num_bytes); + #endif + + if (FOAM_UNLIKELY(UPstream::debug)) + { + Perr<< "[mpi_reduce] : (inplace)" + << " op:" << int(opCodeId) + << " type:" << int(dataTypeId) << " count:" << count + << " comm:" << UPstream::commInterNode_ + << " stage-2" << Foam::endl; + } + + PstreamDetail::reduce + ( + send_buffer, + values, + count, + datatype, + optype, + UPstream::commInterNode_ + ); + } + } + else { // Regular reduce @@ -261,15 +323,92 @@ void Foam::UPstream::mpi_allreduce FatalError << Foam::abort(FatalError); } + const bool withTopo = + ( + (req == nullptr) + && UPstream::usingTopoControl(UPstream::topoControls::reduce) + && UPstream::usingNodeComms(communicator) + ); + if (FOAM_UNLIKELY(UPstream::debug)) { Perr<< "[mpi_allreduce] :" << " op:" << int(opCodeId) << " type:" << int(dataTypeId) << " count:" << count << " comm:" << communicator - << Foam::endl; + << " topo:" << withTopo << Foam::endl; } + if (withTopo) + { + // Topological allReduce + + // Stage 1: local reduction within a node -> onto the node leader + if (UPstream::is_parallel(UPstream::commLocalNode_)) + { + if (FOAM_UNLIKELY(UPstream::debug)) + { + Perr<< "[mpi_allreduce] :" + << " op:" << int(opCodeId) + << " type:" << int(dataTypeId) << " count:" << count + << " comm:" << UPstream::commLocalNode_ + << " stage-1:reduce" << Foam::endl; + } + + UPstream::mpi_reduce + ( + values, + count, + dataTypeId, + opCodeId, + UPstream::commLocalNode_ + ); + } + + // 
Stage 2: all-reduce between node leaders + if (UPstream::is_parallel(UPstream::commInterNode_)) + { + if (FOAM_UNLIKELY(UPstream::debug)) + { + Perr<< "[mpi_allreduce] :" + << " op:" << int(opCodeId) + << " type:" << int(dataTypeId) << " count:" << count + << " comm:" << UPstream::commInterNode_ + << " stage-2:allreduce" << Foam::endl; + } + + PstreamDetail::allReduce + ( + values, + count, + datatype, + optype, + UPstream::commInterNode_ + ); + } + + // Finally, broadcast the information from each local node leader + if (UPstream::is_parallel(UPstream::commLocalNode_)) + { + if (FOAM_UNLIKELY(UPstream::debug)) + { + Perr<< "[mpi_allreduce] :" + << " op:" << int(opCodeId) + << " type:" << int(dataTypeId) << " count:" << count + << " comm:" << UPstream::commLocalNode_ + << " stage-3:broadcast" << Foam::endl; + } + + PstreamDetail::broadcast + ( + values, + count, + datatype, + UPstream::commLocalNode_ + ); + } + } + else { // Regular allReduce