diff --git a/applications/test/parallel-chunks/Test-parallel-chunks.C b/applications/test/parallel-chunks/Test-parallel-chunks.C
index cbb372298fcff11af4722ef68d8c738889116c59..b5181bccec94cbdeffd908884b86c33139a5b2d7 100644
--- a/applications/test/parallel-chunks/Test-parallel-chunks.C
+++ b/applications/test/parallel-chunks/Test-parallel-chunks.C
@@ -32,6 +32,8 @@ Description
 
 \*---------------------------------------------------------------------------*/
 
+#define Foam_PstreamExchange_debug_chunks
+
 #include "List.H"
 #include "argList.H"
 #include "Time.H"
@@ -371,6 +373,7 @@ int main(int argc, char *argv[])
     }
 
     // Manually
+    Info<< "perform list exchange" << endl;
     {
         labelListList sendBufs(UPstream::nProcs());
         labelListList recvBufs(UPstream::nProcs());
@@ -397,6 +400,34 @@ int main(int argc, char *argv[])
         );
     }
+
+    Info<< "perform Map exchange" << endl;
+    {
+        Map<labelList> sendBufs;
+        Map<labelList> recvBufs;
+        Map<label> recvSizes;
+
+        if (Pstream::master())
+        {
+            for (const int proci : Pstream::allProcs())
+            {
+                if (proci != Pstream::myProcNo())
+                {
+                    sendBufs(proci) = identity(500);
+                }
+            }
+        }
+
+        Pstream::exchangeSizes(sendBufs, recvSizes);
+
+        Pstream::exchange<labelList, label>
+        (
+            sendBufs,
+            recvSizes,
+            recvBufs
+        );
+    }
+
     Info<< "End\n" << endl;
 
     return 0;
 }
diff --git a/src/OpenFOAM/db/IOstreams/Pstreams/Pstream.H b/src/OpenFOAM/db/IOstreams/Pstreams/Pstream.H
index 07399cf3bddec7294500cbef1e9e1a833843ca26..c0865f2ce2b557e5dc19147e7fc8998395088ccb 100644
--- a/src/OpenFOAM/db/IOstreams/Pstreams/Pstream.H
+++ b/src/OpenFOAM/db/IOstreams/Pstreams/Pstream.H
@@ -496,6 +496,27 @@
             const label comm = UPstream::worldComm
         );
 
+        //- Exchange the \b non-zero sizes of sendBufs entries (sparse map)
+        //- with other ranks in the communicator
+        //- using non-blocking consensus exchange.
+        //
+        //  Since the recvData map is always cleared before receipt and sizes
+        //  of zero are never transmitted, a simple check
+        //  of its keys is sufficient to determine connectivity.
+        //
+        //  For \b non-parallel : copy size of rank (if it exists and non-empty)
+        //  from sendBufs to recvSizes.
+        //
+        //  \note The message tag is adjusted internally to improve uniqueness
+        template<class Container>
+        static void exchangeSizes
+        (
+            const Map<Container>& sendBufs,
+            Map<label>& recvSizes,
+            const label tag = UPstream::msgType(),
+            const label comm = UPstream::worldComm
+        );
+
         //- Helper: exchange \em contiguous data.
         //- Sends sendBufs, receives into recvBufs using predetermined receive
         //- sizing.
@@ -511,6 +532,22 @@
             const bool wait = true  //!< Wait for requests to complete
         );
 
+        //- Exchange \em contiguous data.
+        //- Sends sendBufs, receives into recvBufs.
+        //  Data provided and received as container.
+        //
+        //  No internal guards or resizing.
+        template<class Container, class Type>
+        static void exchange
+        (
+            const Map<Container>& sendBufs,
+            const Map<label>& recvSizes,  //!< Num of recv elements (not bytes)
+            Map<Container>& recvBufs,
+            const int tag = UPstream::msgType(),
+            const label comm = UPstream::worldComm,
+            const bool wait = true  //!< Wait for requests to complete
+        );
+
         //- Exchange \em contiguous data.
         //- Sends sendBufs, receives into recvBufs.
         //- Determines sizes to receive.
@@ -525,30 +562,23 @@
             const label comm = UPstream::worldComm,
             const bool wait = true  //!< Wait for requests to complete
         );
-
-
-    // Non-blocking exchange
-
-        //- Exchange the \b non-zero sizes of sendBufs entries (sparse map)
-        //- with all ranks in the communicator
-        //- using non-blocking consensus exchange.
-        //
-        //  Since the recvData map always cleared before receipt and sizes
-        //  of zero are never transmitted, a simple check
-        //  of its keys is sufficient to determine connectivity.
-        //
-        //  For \b non-parallel : copy size of rank (if it exists and non-empty)
-        //  from sendBufs to recvSizes.
-        //
-        //  \note The message tag is adjusted internally to improve uniqueness
-        template<class Container>
-        static void exchangeSizes
+        //- Exchange \em contiguous data.
+        //- Sends sendBufs, receives into recvBufs.
+        //- Determines sizes to receive.
+        //  If wait=true will wait for all transfers to finish.
+        template<class Container, class Type>
+        static void exchange
         (
             const Map<Container>& sendBufs,
-            Map<label>& recvSizes,
-            const label tag = UPstream::msgType(),
-            const label comm = UPstream::worldComm
+            Map<Container>& recvBufs,
+            const int tag = UPstream::msgType(),
+            const label comm = UPstream::worldComm,
+            const bool wait = true  //!< Wait for requests to complete
         );
+
+
+    // Non-blocking exchange
+
         //- Exchange \em contiguous data using non-blocking consensus
         //- Sends sendData, receives into recvData.
         //
diff --git a/src/OpenFOAM/db/IOstreams/Pstreams/PstreamExchange.C b/src/OpenFOAM/db/IOstreams/Pstreams/PstreamExchange.C
index 9979998a8c2eaf761f5448c1d58f0e2708049636..f98d47334c1d2eb8122b992983d6e6e676b9b2b7 100644
--- a/src/OpenFOAM/db/IOstreams/Pstreams/PstreamExchange.C
+++ b/src/OpenFOAM/db/IOstreams/Pstreams/PstreamExchange.C
@@ -366,6 +366,89 @@ void exchangeContainer
 }
 
 
+//- Exchange \em contiguous data using point-to-point communication.
+//- Sends sendBufs, receives into recvBufs.
+//  Data provided and received as container all of which have been
+//  properly sized before calling
+//
+//  No internal guards or resizing.
+template<class Container, class Type>
+void exchangeContainer
+(
+    const Map<Container>& sendBufs,
+    Map<Container>& recvBufs,
+    const int tag,
+    const label comm,
+    const bool wait  //!< Wait for requests to complete
+)
+{
+    const label startOfRequests = UPstream::nRequests();
+    const label myProci = UPstream::myProcNo(comm);
+
+    // Set up receives
+    // ~~~~~~~~~~~~~~~
+
+    forAllIters(recvBufs, iter)
+    {
+        const label proci = iter.key();
+        auto& recvData = iter.val();
+
+        if (proci != myProci && !recvData.empty())
+        {
+            UIPstream::read
+            (
+                UPstream::commsTypes::nonBlocking,
+                proci,
+                recvData.data_bytes(),
+                recvData.size_bytes(),
+                tag,
+                comm
+            );
+        }
+    }
+
+
+    // Set up sends
+    // ~~~~~~~~~~~~
+
+    forAllConstIters(sendBufs, iter)
+    {
+        const label proci = iter.key();
+        const auto& sendData = iter.val();
+
+        if (proci != myProci && !sendData.empty())
+        {
+            if
+            (
+                !UOPstream::write
+                (
+                    UPstream::commsTypes::nonBlocking,
+                    proci,
+                    sendData.cdata_bytes(),
+                    sendData.size_bytes(),
+                    tag,
+                    comm
+                )
+            )
+            {
+                FatalErrorInFunction
+                    << "Cannot send outgoing message to:"
+                    << proci << " nBytes:"
+                    << label(sendData.size_bytes())
+                    << Foam::abort(FatalError);
+            }
+        }
+    }
+
+    // Wait for all to finish
+    // ~~~~~~~~~~~~~~~~~~~~~~
+
+    if (wait)
+    {
+        UPstream::waitRequests(startOfRequests);
+    }
+}
+
 } // namespace PstreamDetail
 } // namespace Foam
 
@@ -489,6 +572,169 @@ void Foam::Pstream::exchange
 }
 
 
+template<class Container, class Type>
+void Foam::Pstream::exchange
+(
+    const Map<Container>& sendBufs,
+    const Map<label>& recvSizes,
+    Map<Container>& recvBufs,
+    const int tag,
+    const label comm,
+    const bool wait
+)
+{
+    static_assert(is_contiguous<Type>::value, "Contiguous data only!");
+
+    const int myProci = UPstream::myProcNo(comm);
+    const label numProcs = UPstream::nProcs(comm);
+
+    // Initial: clear out receive 'slots'
+    // Preferable to clear out the map entries instead of the map itself
+    // since this can potentially preserve allocated space
+    // (eg DynamicList entries) between calls
+
+    forAllIters(recvBufs, iter)
+    {
+        iter.val().clear();
+    }
+
+    if (UPstream::parRun() && numProcs > 1)
+    {
+        // Presize all receive buffers
+        forAllIters(recvSizes, iter)
+        {
+            const label proci = iter.key();
+            const label count = iter.val();
+
+            if (proci != myProci && count > 0)
+            {
+                recvBufs(proci).resize_nocopy(count);
+            }
+        }
+
+        // Define the exchange sequences as a flattened list.
+        // We add an additional step of ordering the send/recv list
+        // by message size, which can help with transfer speeds.
+
+        typedef std::pair<int, stdFoam::span<const Type>> sendTuple;
+        typedef std::pair<int, stdFoam::span<Type>> recvTuple;
+
+        // Populate send sequences
+        DynamicList<sendTuple> sends(sendBufs.size());
+        forAllConstIters(sendBufs, iter)
+        {
+            const auto proci = iter.key();
+            const auto& sendData = iter.val();
+
+            if (proci != myProci && !sendData.empty())
+            {
+                sends.push_back
+                (
+                    sendTuple
+                    (
+                        proci,
+                        { sendData.cdata(), std::size_t(sendData.size()) }
+                    )
+                );
+            }
+        }
+
+        // Shorter messages first
+        std::sort
+        (
+            sends.begin(),
+            sends.end(),
+            [=](const sendTuple& a, const sendTuple& b)
+            {
+                return (a.second.size() < b.second.size());
+            }
+        );
+
+        // Populate recv sequences
+        DynamicList<recvTuple> recvs(recvBufs.size());
+        forAllIters(recvBufs, iter)
+        {
+            const auto proci = iter.key();
+            auto& recvData = recvBufs[proci];
+
+            if (proci != myProci && !recvData.empty())
+            {
+                recvs.push_back
+                (
+                    recvTuple
+                    (
+                        proci,
+                        { recvData.data(), std::size_t(recvData.size()) }
+                    )
+                );
+            }
+        }
+
+        // Shorter messages first
+        std::sort
+        (
+            recvs.begin(),
+            recvs.end(),
+            [=](const recvTuple& a, const recvTuple& b)
+            {
+                return (a.second.size() < b.second.size());
+            }
+        );
+
+
+        if (UPstream::maxCommsSize <= 0)
+        {
+            // Do the exchanging in a single go
+            PstreamDetail::exchangeBuf<Type>
+            (
+                sends,
+                recvs,
+                tag,
+                comm,
+                wait
+            );
+        }
+        else
+        {
+            // Exchange buffers in chunks
+            PstreamDetail::exchangeChunkedBuf<Type>
+            (
+                sends,
+                recvs,
+                tag,
+                comm,
+                wait
+            );
+        }
+    }
+
+
+    // Do myself
+    {
+        const auto iter = sendBufs.find(myProci);
+
+        bool needsCopy = iter.good();
+
+        if (needsCopy)
+        {
+            const auto& sendData = iter.val();
+
+            needsCopy = !sendData.empty();
+            if (needsCopy)
+            {
+                // insert_or_assign
+                recvBufs(myProci) = sendData;
+            }
+        }
+
+        if (!needsCopy)
+        {
+            recvBufs.erase(myProci);
+        }
+    }
+}
+
+
 template<class Container>
 void Foam::Pstream::exchangeSizes
 (
@@ -643,29 +889,9 @@ void Foam::Pstream::exchange
     const bool wait
 )
 {
-    if
-    (
-        wait
-     && UPstream::parRun()
-     && UPstream::nProcsNonblockingExchange > 1
-     && UPstream::nProcsNonblockingExchange <= UPstream::nProcs(comm)
-    )
-    {
-        // Use algorithm NBX: Nonblocking Consensus Exchange
-
-        Pstream::exchangeConsensus<Container, Type>
-        (
-            sendBufs,
-            recvBufs,
-            (tag + 314159),  // some unique tag?
-            comm
-        );
-        return;
-    }
-
     // Algorithm PEX: Personalized Exchange
     // - Step 1: each process writes the data sizes to each peer and
-    //   redistributes the vector (eg, MPI_Alltoall)
+    //   redistributes the vector (eg, MPI_Alltoall or non-blocking consensus)
     // - Step 2: size receive buffers and setup receives for all
     //   non-zero sendcounts. Post all sends and wait.
 
@@ -676,4 +902,24 @@ void Foam::Pstream::exchange
 }
 
 
+template<class Container, class Type>
+void Foam::Pstream::exchange
+(
+    const Map<Container>& sendBufs,
+    Map<Container>& recvBufs,
+    const int tag,
+    const label comm,
+    const bool wait
+)
+{
+    // Algorithm PEX: Personalized Exchange
+    // but using nonblocking consensus exchange for the sizes
+
+    Map<label> recvSizes;
+    exchangeSizes(sendBufs, recvSizes, tag, comm);
+
+    exchange<Container, Type>(sendBufs, recvSizes, recvBufs, tag, comm, wait);
+}
+
+
 // ************************************************************************* //